Skip to content

Commit a38c2fc

Browse files
authored
Merge pull request #146 from zmstone/251027-support-process-alias-as-request-ref
feat: support process alias as request reference
2 parents a3babac + c97d891 commit a38c2fc

File tree

7 files changed

+63
-22
lines changed

7 files changed

+63
-22
lines changed

changelog.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,9 @@
1+
* 4.3.0
2+
- Allow process alias as request reference.
3+
Previously `kpro_req_lib:produce/5` creates a reference for the caller,
4+
now new API `kpro_req_lib:produce/6` can be used with pre-made reference.
5+
If the reference is passed as `{alias, Ref}`, the Kafka response is sent to the alias, but not the `kpro:send` caller.
6+
17
* 4.2.9
28
- Improve message encoding performance.
39
- Allow `{magic_v2, Size, IoList}` as batch input for `produce` request.

include/kpro.hrl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,15 +21,15 @@
2121
-include("kpro_error_codes.hrl").
2222

2323
-record(kpro_req,
24-
{ ref :: reference()
24+
{ ref :: kpro:req_ref()
2525
, api :: kpro:api()
2626
, vsn :: kpro:vsn()
2727
, no_ack = false :: boolean() %% set to true for fire-n-forget requests
2828
, msg :: iodata() | kpro:struct()
2929
}).
3030

3131
-record(kpro_rsp,
32-
{ ref :: false | reference()
32+
{ ref :: false | kpro:req_ref()
3333
, api :: kpro:api()
3434
, vsn :: kpro:vsn()
3535
, msg :: binary() | kpro:struct()

src/kpro.erl

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,7 @@
118118
, producer_id/0
119119
, protocol/0
120120
, req/0
121+
, req_ref/0
121122
, required_acks/0
122123
, rsp/0
123124
, schema/0
@@ -194,6 +195,7 @@
194195
| [{field_name(), field_value()}].
195196
-type api() :: atom().
196197
-type req() :: #kpro_req{}.
198+
-type req_ref() :: reference() | {alias, reference()}.
197199
-type rsp() :: #kpro_rsp{}.
198200
-type compress_option() :: ?no_compression
199201
| ?gzip

src/kpro_connection.erl

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -405,7 +405,7 @@ handle_msg({_, Sock, Bin}, #state{ sock = Sock
405405
{CorrId, Body} = kpro_lib:decode_corr_id(Bin),
406406
{Caller, Ref, API, Vsn} = kpro_sent_reqs:get_req(Requests, CorrId),
407407
Rsp = kpro_rsp_lib:decode(API, Vsn, Body, Ref),
408-
ok = cast(Caller, {msg, self(), Rsp}),
408+
ok = kafka_reply(Caller, {msg, self(), Rsp}),
409409
NewRequests = kpro_sent_reqs:del(Requests, CorrId),
410410
State1 = maybe_flush_backlog(State#state{requests = NewRequests}),
411411
?MODULE:loop(State1, Debug);
@@ -534,13 +534,12 @@ sasl_authenticate(#state{client_id = ClientId, mod = Mod, sock = Sock, remote =
534534
ok = setopts(Sock, Mod, [{active, once}]),
535535
State.
536536

537-
cast(Pid, Msg) ->
538-
try
539-
Pid ! Msg,
540-
ok
541-
catch _ : _ ->
542-
ok
543-
end.
537+
kafka_reply(_CallerPid, {msg, _SelfPid, #kpro_rsp{ref = {alias, Ref}}} = Msg) ->
538+
erlang:send(Ref, Msg),
539+
ok;
540+
kafka_reply(Pid, Msg) when is_pid(Pid) ->
541+
_ = erlang:send(Pid, Msg),
542+
ok.
544543

545544
system_continue(_Parent, Debug, State) ->
546545
?MODULE:loop(State, Debug).

src/kpro_req_lib.erl

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828

2929
-export([ produce/4
3030
, produce/5
31+
, produce/6
3132
]).
3233

3334
-export([ metadata/2
@@ -51,6 +52,7 @@
5152

5253
-export([ encode/3
5354
, make/3
55+
, make/4
5456
]).
5557

5658
-export_type([ fetch_opts/0
@@ -250,17 +252,20 @@ fetch(Vsn, Topic, Partition, Offset, Opts) ->
250252
make(fetch, Vsn, Fields).
251253

252254
%% @doc Help function to construct a non-transactional produce request.
253-
%% `Batch' arg can be be a `[map()]' like `[#{key => Key, value => Value, ts => Ts}]'.
254-
%% Current system time will be taken if `ts' is missing in batch input.
255-
%% It may also be `binary()' or `{magic_v2, Bytes, iolist()}' if user choose to encode
256-
%% a batch beforehand which could be helpful when a large batch can be encoded
257-
%% in other processes.
255+
%% @see produce/6.
258256
-spec produce(vsn(), topic(), partition(),
259257
binary() | batch_input() | {magic_v2, non_neg_integer(), iolist()}) -> req().
260258
produce(Vsn, Topic, Partition, Batch) ->
261259
produce(Vsn, Topic, Partition, Batch, #{}).
262260

263261
%% @doc Help function to construct a produce request.
262+
%% @see produce/6.
263+
-spec produce(vsn(), topic(), partition(),
264+
binary() | batch_input() | {magic_v2, non_neg_integer(), iolist()}, produce_opts()) -> req().
265+
produce(Vsn, Topic, Partition, Batch, Opts) when is_map(Opts) ->
266+
produce(Vsn, Topic, Partition, Batch, Opts, make_ref()).
267+
268+
%% @doc Help function to construct a produce request with a specified request reference.
264269
%% By default, it constructs a non-transactional produce request.
265270
%% `Batch' arg can be be a `[map()]' like `[#{key => Key, value => Value, ts => Ts}]'.
266271
%% Current system time will be taken if `ts' is missing in batch input.
@@ -274,8 +279,8 @@ produce(Vsn, Topic, Partition, Batch) ->
274279
%% monotonically increasing, with one sequence number per topic-partition.
275280
%% - `txn_ctx' (which is of spec `kpro:txn_ctx()') must exist in `Opts'
276281
-spec produce(vsn(), topic(), partition(),
277-
binary() | batch_input() | {magic_v2, non_neg_integer(), iolist()}, produce_opts()) -> req().
278-
produce(Vsn, Topic, Partition, Batch, Opts) ->
282+
binary() | batch_input() | {magic_v2, non_neg_integer(), iolist()}, produce_opts(), kpro:req_ref()) -> req().
283+
produce(Vsn, Topic, Partition, Batch, Opts, Ref) ->
279284
ok = assert_known_api_and_vsn(produce, Vsn),
280285
RequiredAcks = required_acks(maps:get(required_acks, Opts, all_isr)),
281286
Compression = maps:get(compression, Opts, ?no_compression),
@@ -313,7 +318,7 @@ produce(Vsn, Topic, Partition, Batch, Opts) ->
313318
#kpro_req{ api = produce
314319
, vsn = Vsn
315320
, msg = Msg
316-
, ref = make_ref()
321+
, ref = Ref
317322
, no_ack = RequiredAcks =:= 0
318323
}.
319324

@@ -434,11 +439,17 @@ alter_configs(Vsn, Resources, Opts) ->
434439
%% @doc Help function to make a request body.
435440
-spec make(api(), vsn(), struct()) -> req().
436441
make(API, Vsn, Fields) ->
442+
make(API, Vsn, Fields, make_ref()).
443+
444+
%% @doc Help function to make a request body with provided reference or process alias.
445+
%% The refreence can be an alias to receive response this request.
446+
-spec make(api(), vsn(), struct(), reference()| {alias, reference()}) -> req().
447+
make(API, Vsn, Fields, Ref) ->
437448
ok = assert_known_api_and_vsn(API, Vsn),
438449
#kpro_req{ api = API
439450
, vsn = Vsn
440451
, msg = encode_struct(API, Vsn, Fields)
441-
, ref = make_ref()
452+
, ref = Ref
442453
}.
443454

444455
%% @doc Encode a request to bytes that can be sent on wire.

src/kpro_sent_reqs.erl

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@
3939

4040
-type corr_id() :: kpro:corr_id().
4141
-define(REQ(Caller, Ref, API, Vsn, Ts), {Caller, Ref, API, Vsn, Ts}).
42-
-type req() :: ?REQ(pid(), reference(), kpro:api(), kpro:vsn(), integer()).
42+
-type req() :: ?REQ(pid(), kpro:req_ref(), kpro:api(), kpro:vsn(), integer()).
4343

4444
-record(requests,
4545
{ corr_id = 0
@@ -61,7 +61,7 @@ is_empty(#requests{sent = Sent}) -> maps:size(Sent) == 0.
6161

6262
%% @doc Add a new request to sent collection.
6363
%% Return the last corrlation ID and the new collection.
64-
-spec add(requests(), pid(), reference(), kpro:api(), kpro:vsn()) ->
64+
-spec add(requests(), pid(), kpro:req_ref(), kpro:api(), kpro:vsn()) ->
6565
{corr_id(), requests()}.
6666
add(#requests{ corr_id = CorrId
6767
, sent = Sent
@@ -82,7 +82,7 @@ del(#requests{sent = Sent} = Requests, CorrId) ->
8282
%% @doc Get caller of a request having the given correlation ID.
8383
%% Crash if the request is not found.
8484
-spec get_req(requests(), corr_id()) ->
85-
{pid(), reference(), kpro:api(), kpro:vsn()}.
85+
{pid(), kpro:req_ref(), kpro:api(), kpro:vsn()}.
8686
get_req(#requests{sent = Sent}, CorrId) ->
8787
?REQ(Caller, Ref, API, Vsn, _Ts) = maps:get(CorrId, Sent),
8888
{Caller, Ref, API, Vsn}.

test/kpro_produce_tests.erl

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,29 @@ async_send_test() ->
171171
receive Msg -> erlang:throw({unexpected, Msg}) after 10 -> ok end
172172
end).
173173

174+
%% async send with alias test
175+
async_send_alias_test() ->
176+
{_, Vsn} = get_api_vsn_range(),
177+
Batch1 = [#{ts => kpro_lib:now_ts(), value => make_value(?LINE)}],
178+
Batch2 = [#{ts => kpro_lib:now_ts(), value => make_value(?LINE)}],
179+
Ref1 = {alias, erlang:alias([reply])},
180+
Req1 = kpro_req_lib:produce(Vsn, topic(), ?PARTI, Batch1, #{}, Ref1),
181+
Ref2 = {alias, erlang:alias([reply])},
182+
Req2 = kpro_req_lib:produce(Vsn, topic(), ?PARTI, Batch2, #{}, Ref2),
183+
with_connection(
184+
fun(Pid) ->
185+
{_, MRef} = spawn_monitor(fun() -> kpro:send(Pid, Req1) end),
186+
receive {'DOWN', MRef, _, _, _} -> ok end,
187+
_ = spawn(fun() -> kpro:request_async(Pid, Req2) end),
188+
Assert = fun(Req, Rsp) ->
189+
?ASSERT_RESPONSE_NO_ERROR(Vsn, Rsp),
190+
?assertEqual(Req#kpro_req.ref, Rsp#kpro_rsp.ref)
191+
end,
192+
receive {msg, Pid, Rsp1} -> Assert(Req1, Rsp1) end,
193+
receive {msg, Pid, Rsp2} -> Assert(Req2, Rsp2) end,
194+
receive Msg -> erlang:throw({unexpected, Msg}) after 10 -> ok end
195+
end).
196+
174197
make_req(Vsn) ->
175198
Batch = make_batch(Vsn),
176199
kpro_req_lib:produce(Vsn, topic(), ?PARTI, Batch).

0 commit comments

Comments
 (0)