Skip to content

Commit 4b6ab7f

Browse files
refactor: remove IncomingRequests GenServer (#1195)
Co-authored-by: Tomás Arjovsky <[email protected]>
1 parent 2bfe826 commit 4b6ab7f

File tree

10 files changed

+100
-164
lines changed

10 files changed

+100
-164
lines changed

lib/lambda_ethereum_consensus/beacon/beacon_node.ex

+2-2
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@ defmodule LambdaEthereumConsensus.Beacon.BeaconNode do
4242
[
4343
{LambdaEthereumConsensus.Beacon.Clock, {store.genesis_time, time}},
4444
{LambdaEthereumConsensus.Libp2pPort, libp2p_args},
45-
LambdaEthereumConsensus.P2P.IncomingRequests,
4645
LambdaEthereumConsensus.Beacon.SyncBlocks,
4746
{Task.Supervisor, name: PruneStatesSupervisor},
4847
{Task.Supervisor, name: PruneBlocksSupervisor},
@@ -83,7 +82,8 @@ defmodule LambdaEthereumConsensus.Beacon.BeaconNode do
8382
enable_discovery: true,
8483
discovery_addr: "0.0.0.0:#{port}",
8584
bootnodes: bootnodes,
86-
join_init_topics: true
85+
join_init_topics: true,
86+
enable_request_handlers: true
8787
]
8888
end
8989

lib/lambda_ethereum_consensus/p2p/incoming_requests/incoming_requests.ex

-22
This file was deleted.

lib/lambda_ethereum_consensus/p2p/incoming_requests/receiver.ex

-58
This file was deleted.

lib/lambda_ethereum_consensus/p2p/incoming_requests/handler.ex renamed to lib/lambda_ethereum_consensus/p2p/incoming_requests_handler.ex

+37-22
Original file line numberDiff line numberDiff line change
@@ -1,55 +1,71 @@
1-
defmodule LambdaEthereumConsensus.P2P.IncomingRequests.Handler do
1+
defmodule LambdaEthereumConsensus.P2P.IncomingRequestsHandler do
22
@moduledoc """
33
This module handles Req/Resp domain requests.
44
"""
5-
require Logger
65

76
alias LambdaEthereumConsensus.ForkChoice
8-
alias LambdaEthereumConsensus.Libp2pPort
97
alias LambdaEthereumConsensus.P2P.Metadata
108
alias LambdaEthereumConsensus.P2P.ReqResp
119
alias LambdaEthereumConsensus.Store.BlockDb
1210
alias LambdaEthereumConsensus.Store.Blocks
1311

1412
require Logger
1513

16-
@spec handle(String.t(), String.t(), binary()) :: any()
17-
def handle(name, message_id, message) do
18-
case handle_req(name, message_id, message) do
19-
:ok -> :ok
20-
{:error, error} -> Logger.error("[#{name}] Request error: #{inspect(error)}")
14+
@request_prefix "/eth2/beacon_chain/req/"
15+
@request_names [
16+
"status/1",
17+
"goodbye/1",
18+
"ping/1",
19+
"beacon_blocks_by_range/2",
20+
"beacon_blocks_by_root/2",
21+
"metadata/2"
22+
]
23+
24+
@spec protocol_ids() :: list(String.t())
25+
def protocol_ids() do
26+
@request_names |> Enum.map(&Enum.join([@request_prefix, &1, "/ssz_snappy"]))
27+
end
28+
29+
@spec handle(String.t(), String.t(), binary()) :: {:ok, any()} | {:error, String.t()}
30+
def handle(@request_prefix <> name, message_id, message) do
31+
Logger.debug("'#{name}' request received")
32+
33+
result =
34+
:telemetry.span([:port, :request], %{}, fn ->
35+
{handle_req(name, message_id, message), %{module: "handler", request: inspect(name)}}
36+
end)
37+
38+
case result do
39+
{:error, error} -> {:error, "[#{name}] Request error: #{inspect(error)}"}
40+
result -> result
2141
end
2242
end
2343

2444
@spec handle_req(String.t(), String.t(), binary()) ::
25-
:ok | {:error, String.t()}
45+
{:ok, any()} | {:error, String.t()}
2646
defp handle_req(protocol_name, message_id, message)
2747

2848
defp handle_req("status/1/ssz_snappy", message_id, message) do
2949
with {:ok, request} <- ReqResp.decode_request(message, Types.StatusMessage) do
3050
Logger.debug("[Status] '#{inspect(request)}'")
3151
payload = ForkChoice.get_current_status_message() |> ReqResp.encode_ok()
32-
Libp2pPort.send_response(message_id, payload)
52+
{:ok, {message_id, payload}}
3353
end
3454
end
3555

3656
defp handle_req("goodbye/1/ssz_snappy", _, "") do
3757
# ignore empty messages
38-
Logger.debug("[Goodbye] empty message")
58+
{:error, "Empty message"}
3959
end
4060

4161
defp handle_req("goodbye/1/ssz_snappy", message_id, message) do
4262
case ReqResp.decode_request(message, TypeAliases.uint64()) do
4363
{:ok, goodbye_reason} ->
4464
Logger.debug("[Goodbye] reason: #{goodbye_reason}")
4565
payload = ReqResp.encode_ok({0, TypeAliases.uint64()})
46-
Libp2pPort.send_response(message_id, payload)
66+
{:ok, {message_id, payload}}
4767

4868
# Ignore read errors, since some peers eagerly disconnect.
49-
{:error, "failed to read"} ->
50-
Logger.debug("[Goodbye] failed to read")
51-
:ok
52-
5369
err ->
5470
err
5571
end
@@ -61,14 +77,14 @@ defmodule LambdaEthereumConsensus.P2P.IncomingRequests.Handler do
6177
Logger.debug("[Ping] seq_number: #{seq_num}")
6278
seq_number = Metadata.get_seq_number()
6379
payload = ReqResp.encode_ok({seq_number, TypeAliases.uint64()})
64-
Libp2pPort.send_response(message_id, payload)
80+
{:ok, {message_id, payload}}
6581
end
6682
end
6783

6884
defp handle_req("metadata/2/ssz_snappy", message_id, _message) do
6985
# NOTE: there's no request content so we just ignore it
7086
payload = Metadata.get_metadata() |> ReqResp.encode_ok()
71-
Libp2pPort.send_response(message_id, payload)
87+
{:ok, {message_id, payload}}
7288
end
7389

7490
defp handle_req("beacon_blocks_by_range/2/ssz_snappy", message_id, message) do
@@ -89,7 +105,7 @@ defmodule LambdaEthereumConsensus.P2P.IncomingRequests.Handler do
89105
|> Enum.reject(&(&1 == :skip))
90106
|> ReqResp.encode_response()
91107

92-
Libp2pPort.send_response(message_id, response_chunk)
108+
{:ok, {message_id, response_chunk}}
93109
end
94110
end
95111

@@ -108,14 +124,13 @@ defmodule LambdaEthereumConsensus.P2P.IncomingRequests.Handler do
108124
|> Enum.reject(&(&1 == :skip))
109125
|> ReqResp.encode_response()
110126

111-
Libp2pPort.send_response(message_id, response_chunk)
127+
{:ok, {message_id, response_chunk}}
112128
end
113129
end
114130

115131
defp handle_req(protocol, _message_id, _message) do
116132
# This should never happen, since Libp2p only accepts registered protocols
117-
Logger.error("Unsupported protocol: #{protocol}")
118-
:ok
133+
{:error, "Unsupported protocol: #{protocol}"}
119134
end
120135

121136
defp map_block_result(:not_found), do: map_block_result(nil)

lib/lambda_ethereum_consensus/telemetry.ex

+8
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,14 @@ defmodule LambdaEthereumConsensus.Telemetry do
8585
counter("network.pubsub_topics_un_deliverable_message.count", tags: [:topic]),
8686
counter("network.pubsub_topics_validate_message.count", tags: [:topic]),
8787
counter("port.message.count", tags: [:function, :direction]),
88+
last_value("port.request.stop.duration",
89+
unit: {:native, :millisecond},
90+
tags: [:module, :request]
91+
),
92+
last_value("port.request.exception.duration",
93+
unit: {:native, :millisecond},
94+
tags: [:module, :request]
95+
),
8896
sum("network.request.blocks", tags: [:result, :type, :reason]),
8997

9098
# Sync metrics

lib/libp2p_port.ex

+35-31
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ defmodule LambdaEthereumConsensus.Libp2pPort do
1515
alias LambdaEthereumConsensus.P2P.Gossip.BeaconBlock
1616
alias LambdaEthereumConsensus.P2P.Gossip.BlobSideCar
1717
alias LambdaEthereumConsensus.P2P.Gossip.OperationsCollector
18+
alias LambdaEthereumConsensus.P2P.IncomingRequestsHandler
1819
alias LambdaEthereumConsensus.P2P.Peerbook
1920
alias LambdaEthereumConsensus.P2p.Requests
2021
alias LambdaEthereumConsensus.StateTransition.Misc
@@ -62,6 +63,7 @@ defmodule LambdaEthereumConsensus.Libp2pPort do
6263
| {:discovery_addr, String.t()}
6364
| {:bootnodes, [String.t()]}
6465
| {:join_init_topics, boolean()}
66+
| {:enable_request_handlers, boolean()}
6567

6668
@type node_identity() :: %{
6769
peer_id: binary(),
@@ -116,15 +118,15 @@ defmodule LambdaEthereumConsensus.Libp2pPort do
116118
|> Map.take([:peer_id, :pretty_peer_id, :enr, :p2p_addresses, :discovery_addresses])
117119
end
118120

119-
@doc """
120-
Sets a Req/Resp handler for the given protocol ID. After this call,
121-
peer requests are sent to the current process' mailbox. To handle them,
122-
use `handle_request/0`.
123-
"""
124-
@spec set_handler(GenServer.server(), String.t()) :: :ok | {:error, String.t()}
125-
def set_handler(pid \\ __MODULE__, protocol_id) do
121+
# Sets libp2pport as the Req/Resp handler for the given protocol ID.
122+
@spec set_handler(String.t(), port()) :: boolean()
123+
defp set_handler(protocol_id, port) do
126124
:telemetry.execute([:port, :message], %{}, %{function: "set_handler", direction: "elixir->"})
127-
call_command(pid, {:set_handler, %SetHandler{protocol_id: protocol_id}})
125+
126+
c = {:set_handler, %SetHandler{protocol_id: protocol_id}}
127+
data = Command.encode(%Command{c: c})
128+
129+
send_data(port, data)
128130
end
129131

130132
@doc """
@@ -166,27 +168,15 @@ defmodule LambdaEthereumConsensus.Libp2pPort do
166168
GenServer.cast(pid, {:send_request, peer_id, protocol_id, message, handler})
167169
end
168170

169-
@doc """
170-
Returns the next request received by the server for registered handlers
171-
on the current process. If there are no requests, it waits for one.
172-
"""
173-
@spec handle_request() :: {String.t(), String.t(), binary()}
174-
def handle_request() do
175-
receive do
176-
{:request, {_protocol_id, _message_id, _message} = request} -> request
177-
end
178-
end
179-
180-
@doc """
181-
Sends a response for the request with the given message ID.
182-
"""
183-
@spec send_response(GenServer.server(), String.t(), binary()) ::
184-
:ok | {:error, String.t()}
185-
def send_response(pid \\ __MODULE__, request_id, response) do
171+
# Sends a response for the request with the given message ID.
172+
@spec send_response({String.t(), binary()}, port()) :: boolean()
173+
defp send_response({request_id, response}, port) do
186174
:telemetry.execute([:port, :message], %{}, %{function: "send_response", direction: "elixir->"})
187175

188-
c = %SendResponse{request_id: request_id, message: response}
189-
call_command(pid, {:send_response, c})
176+
c = {:send_response, %SendResponse{request_id: request_id, message: response}}
177+
data = Command.encode(%Command{c: c})
178+
179+
send_data(port, data)
190180
end
191181

192182
@doc """
@@ -299,6 +289,12 @@ defmodule LambdaEthereumConsensus.Libp2pPort do
299289
OperationsCollector.init()
300290
end
301291

292+
@spec enable_request_handlers(port()) :: :ok | {:error, String.t()}
293+
defp enable_request_handlers(port) do
294+
IncomingRequestsHandler.protocol_ids()
295+
|> Enum.each(fn protocol_id -> set_handler(protocol_id, port) end)
296+
end
297+
302298
def add_block(pid \\ __MODULE__, block), do: GenServer.cast(pid, {:add_block, block})
303299

304300
########################
@@ -308,6 +304,7 @@ defmodule LambdaEthereumConsensus.Libp2pPort do
308304
@impl GenServer
309305
def init(args) do
310306
{join_init_topics, args} = Keyword.pop(args, :join_init_topics, false)
307+
{enable_request_handlers, args} = Keyword.pop(args, :enable_request_handlers, false)
311308

312309
port = Port.open({:spawn, @port_name}, [:binary, {:packet, 4}, :exit_status])
313310

@@ -319,6 +316,7 @@ defmodule LambdaEthereumConsensus.Libp2pPort do
319316
|> then(&send_data(port, &1))
320317

321318
if join_init_topics, do: join_init_topics(port)
319+
if enable_request_handlers, do: enable_request_handlers(port)
322320

323321
Peerbook.init()
324322

@@ -412,15 +410,21 @@ defmodule LambdaEthereumConsensus.Libp2pPort do
412410
defp handle_notification(
413411
%Request{
414412
protocol_id: protocol_id,
415-
handler: handler,
416413
request_id: request_id,
417414
message: message
418415
},
419-
state
416+
%{port: port} = state
420417
) do
421418
:telemetry.execute([:port, :message], %{}, %{function: "request", direction: "->elixir"})
422-
handler_pid = :erlang.binary_to_term(handler)
423-
send(handler_pid, {:request, {protocol_id, request_id, message}})
419+
420+
case IncomingRequestsHandler.handle(protocol_id, request_id, message) do
421+
{:ok, response} ->
422+
send_response(response, port)
423+
424+
{:error, reason} ->
425+
Logger.error("[Libp2pPort] Error handling request. Reason: #{inspect(reason)}")
426+
end
427+
424428
state
425429
end
426430

native/libp2p_port/internal/proto_helpers/proto_helpers.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -123,8 +123,8 @@ func NewPeerNotification(id []byte) proto_defs.Notification {
123123
return proto_defs.Notification{N: &proto_defs.Notification_NewPeer{NewPeer: newPeerNotification}}
124124
}
125125

126-
func RequestNotification(protocolId string, handler []byte, requestId string, message []byte) proto_defs.Notification {
127-
requestNotification := &proto_defs.Request{ProtocolId: []byte(protocolId), Handler: handler, RequestId: []byte(requestId), Message: message}
126+
func RequestNotification(protocolId string, requestId string, message []byte) proto_defs.Notification {
127+
requestNotification := &proto_defs.Request{ProtocolId: []byte(protocolId), RequestId: []byte(requestId), Message: message}
128128
return proto_defs.Notification{N: &proto_defs.Notification_Request{Request: requestNotification}}
129129
}
130130

0 commit comments

Comments
 (0)