Skip to content

Commit a398afd

Browse files
committed
test: add coverage for large magic-v2 batch encoding
1 parent 637dbbb commit a398afd

File tree

3 files changed

+85
-2
lines changed

3 files changed

+85
-2
lines changed

scripts/produce_req_encoding_benchmark.sh

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -104,4 +104,4 @@ main() {
104104
}
105105

106106
# Run main function
107-
main "$@"
107+
main "$@"

src/kpro_batch.erl

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -37,6 +37,7 @@
3737
-type headers() :: kpro:headers().
3838
-type batch_meta() :: kpro:batch_meta().
3939
-define(NO_META, ?KPRO_NO_BATCH_META).
40+
-define(CRC32C_NON_DIRTY_BYTES_THRESHOLD, 1024 * 1024).
4041

4142
%% @doc Encode a list of batch inputs into byte stream.
4243
-spec encode(magic(), batch_input(), compress_option()) -> {non_neg_integer(), iodata()}.
@@ -112,7 +113,7 @@ encode_tx([FirstMsg | _] = Batch, Compression, FirstSequence,
112113
%% the cost of crc32c computation is negligible comparing to the batch
113114
%% encoding part, so we do not call dirty-scheduler.
114115
%% Otherwise we respect iolist and call dirty-scheduler (_d flavor API).
115-
crc32c(Bytes, IoData) when Bytes =< 1024 * 1024 ->
116+
crc32c(Bytes, IoData) when Bytes =< ?CRC32C_NON_DIRTY_BYTES_THRESHOLD ->
116117
crc32cer:nif(IoData);
117118
crc32c(_, IoData) ->
118119
crc32cer:nif_iolist_d(IoData).

test/kpro_batch_tests.erl

Lines changed: 82 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -46,6 +46,88 @@ encode_decode_test_() ->
4646
CompressionOpt <- CompressionOpts,
4747
MagicV <- MagicVersions].
4848

49+
encode_large_batch_test() ->
50+
%% Test encoding large batches (>1MB) with different compression options
51+
LargeBatch = make_large_batch(),
52+
53+
%% Test with no compression
54+
{SizeNoComp, EncodedNoComp} = kpro_batch:encode(2, LargeBatch, no_compression),
55+
?assert(SizeNoComp > 1024 * 1024), %% Should be >1MB
56+
?assert(is_binary(bin(EncodedNoComp))),
57+
58+
%% Test with gzip compression
59+
{SizeGzip, EncodedGzip} = kpro_batch:encode(2, LargeBatch, gzip),
60+
%% Compression may not always be smaller due to overhead, so just verify it's reasonable
61+
?assert(SizeGzip > 0),
62+
?assert(SizeGzip < SizeNoComp * 2), %% Should not be more than 2x larger
63+
?assert(is_binary(bin(EncodedGzip))),
64+
65+
%% Test with snappy compression
66+
{SizeSnappy, EncodedSnappy} = kpro_batch:encode(2, LargeBatch, snappy),
67+
%% Compression may not always be smaller due to overhead, so just verify it's reasonable
68+
?assert(SizeSnappy > 0),
69+
?assert(SizeSnappy < SizeNoComp * 2), %% Should not be more than 2x larger
70+
?assert(is_binary(bin(EncodedSnappy))),
71+
72+
%% Test with lz4 compression (if available)
73+
try
74+
{SizeLz4, EncodedLz4} = kpro_batch:encode(2, LargeBatch, lz4),
75+
?assert(SizeLz4 > 0),
76+
?assert(SizeLz4 < SizeNoComp * 2), %% Should not be more than 2x larger
77+
?assert(is_binary(bin(EncodedLz4)))
78+
catch
79+
error:undef ->
80+
%% LZ4 not available, skip test
81+
ok
82+
end,
83+
84+
%% Test with zstd compression (if available)
85+
try
86+
{SizeZstd, EncodedZstd} = kpro_batch:encode(2, LargeBatch, zstd),
87+
?assert(SizeZstd > 0),
88+
?assert(SizeZstd < SizeNoComp * 2), %% Should not be more than 2x larger
89+
?assert(is_binary(bin(EncodedZstd)))
90+
catch
91+
error:undef ->
92+
%% ZSTD not available, skip test
93+
ok
94+
end,
95+
96+
%% Verify we can decode the uncompressed batch
97+
[{_Meta, DecodedMessages}] = kpro_batch:decode(bin(EncodedNoComp)),
98+
?assertEqual(length(LargeBatch), length(DecodedMessages)),
99+
100+
%% Verify message content matches
101+
lists:foreach(fun({Original, Decoded}) ->
102+
#kafka_message{key = DecodedKey, value = DecodedValue, ts = DecodedTs} = Decoded,
103+
#{key := OriginalKey, value := OriginalValue, ts := OriginalTs} = Original,
104+
?assertEqual(OriginalKey, DecodedKey),
105+
?assertEqual(OriginalValue, DecodedValue),
106+
?assertEqual(OriginalTs, DecodedTs)
107+
end, lists:zip(LargeBatch, DecodedMessages)),
108+
109+
%% Test performance: encoding should complete in reasonable time
110+
StartTime = erlang:system_time(microsecond),
111+
_ = kpro_batch:encode(2, LargeBatch, no_compression),
112+
EndTime = erlang:system_time(microsecond),
113+
EncodingTime = EndTime - StartTime,
114+
?assert(EncodingTime < 1000000), %% Should complete in <1 second
115+
116+
ok.
117+
118+
make_large_batch() ->
119+
%% Create a batch that will be >1MB total
120+
%% Each message is ~100KB, so 11 messages = ~1.1MB total
121+
MessageCount = 11,
122+
MessageSize = 100 * 1024, %% 100KB per message
123+
124+
[#{ ts => kpro_lib:now_ts() + N * 1000 %% Different timestamps
125+
, key => <<"large_msg_key_", (integer_to_binary(N))/binary>>
126+
, value => crypto:strong_rand_bytes(MessageSize)
127+
, headers => [{<<"header_key_", (integer_to_binary(N))/binary>>,
128+
<<"header_value_", (integer_to_binary(N))/binary>>}]
129+
} || N <- lists:seq(1, MessageCount)].
130+
49131
provide_compression_test() ->
50132
kpro:provide_compression([{snappy, ?MODULE}]),
51133
try

0 commit comments

Comments (0)