Skip to content

Commit 3e2f78d

Browse files
refactor: remove genserver behavior from pending blocks (#1141)
Co-authored-by: Avila Gastón <[email protected]>
1 parent b4d14f1 commit 3e2f78d

24 files changed

+625
-330
lines changed

docs/architecture.md

+31-1
Original file line numberDiff line numberDiff line change
@@ -264,7 +264,37 @@ Asynchronously, a new task is started to recompute the new head, as this takes a
264264

265265
## Request-Response
266266

267-
**TO DO**: document how ports work for this.
267+
Request-response is an on-demand protocol where a node asks for information directly to a peer and expects a response. This may be to request metadata that corresponds to that peer for discovery purposes, or to request information from the past that will not appear on when listening to gossip (useful for checkpoint sync).
268+
269+
It's implemented in the following way:
270+
271+
```mermaid
272+
sequenceDiagram
273+
274+
participant req as Requesting Process
275+
participant p2p as Libp2pPort
276+
participant gomain as go libp2p main
277+
participant goreq as request goroutine
278+
279+
req ->> req: send_request(peer_id, protocol_id, message)
280+
req ->> p2p: send_protobuf(from: self())
281+
activate p2p
282+
p2p ->> gomain: %Command{}
283+
deactivate p2p
284+
req ->>+ req: receive_response()
285+
286+
gomain ->> gomain: handle_command()
287+
gomain ->>+ goreq: go sendAsyncRequest()
288+
goreq ->>- p2p: SendNotification(%Result{from, response, err})
289+
290+
p2p ->>p2p: handle_notification(%Result{from: from})
291+
p2p ->> req: {:response, result}
292+
deactivate req
293+
```
294+
295+
Explained, a process that wants to request something from Libp2pPort sends a request with its own pid, which is then included in the Command payload. The request is handled asynchronously in the go side, and eventually, the pid is included in the response, and sent back to LibP2PPort, who now knows to which process it needs to be dispatched.
296+
297+
The specific kind of command (a request) is specified, but there's nothing identifying this is a response vs any other kind of result, or the specific kind of response (e.g. a block download vs a blob download). Currently the only way this is handled differentially is because the pid is waiting for a specific kind of response and for nothing else at a time.
268298

269299
### Checkpoint sync
270300

lib/lambda_ethereum_consensus/beacon/beacon_node.ex

-1
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@ defmodule LambdaEthereumConsensus.Beacon.BeaconNode do
4343
{LambdaEthereumConsensus.Beacon.Clock, {store.genesis_time, time}},
4444
{LambdaEthereumConsensus.Libp2pPort, libp2p_args},
4545
LambdaEthereumConsensus.P2P.IncomingRequests,
46-
LambdaEthereumConsensus.Beacon.PendingBlocks,
4746
LambdaEthereumConsensus.Beacon.SyncBlocks,
4847
{Task.Supervisor, name: PruneStatesSupervisor},
4948
{Task.Supervisor, name: PruneBlocksSupervisor},

lib/lambda_ethereum_consensus/beacon/clock.ex

+2-2
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ defmodule LambdaEthereumConsensus.Beacon.Clock do
33

44
use GenServer
55

6-
alias LambdaEthereumConsensus.Beacon.PendingBlocks
6+
alias LambdaEthereumConsensus.Libp2pPort
77
alias LambdaEthereumConsensus.Validator.ValidatorManager
88

99
require Logger
@@ -50,7 +50,7 @@ defmodule LambdaEthereumConsensus.Beacon.Clock do
5050
new_state = %{state | time: time}
5151

5252
if time >= state.genesis_time do
53-
PendingBlocks.on_tick(time)
53+
Libp2pPort.on_tick(time)
5454
# TODO: reduce time between ticks to account for gnosis' 5s slot time.
5555
old_logical_time = compute_logical_time(state)
5656
new_logical_time = compute_logical_time(new_state)

lib/lambda_ethereum_consensus/beacon/pending_blocks.ex

+68-138
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,10 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do
44
55
The main purpose of this module is making sure that a blocks parent is already in the fork choice. If it's not, it will request it to the block downloader.
66
"""
7-
8-
use GenServer
97
require Logger
108

119
alias LambdaEthereumConsensus.ForkChoice
1210
alias LambdaEthereumConsensus.P2P.BlobDownloader
13-
alias LambdaEthereumConsensus.P2P.BlockDownloader
1411
alias LambdaEthereumConsensus.Store.BlobDb
1512
alias LambdaEthereumConsensus.Store.Blocks
1613
alias Types.BlockInfo
@@ -22,137 +19,39 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do
2219
| {nil, :invalid | :download}
2320
@type state :: nil
2421

25-
##########################
26-
### Public API
27-
##########################
22+
@doc """
23+
If the block is not present, it will be stored as pending.
2824
29-
def start_link(opts) do
30-
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
31-
end
25+
In case it's ready to be processed
26+
(the parent is present and already transitioned), then the block's state transition will be
27+
calculated, resulting in a new saved block.
28+
29+
If the new state enables older blocks that were pending to be processed, they will be processed
30+
immediately.
3231
32+
If blobs are missing, they will be requested.
33+
"""
3334
@spec add_block(SignedBeaconBlock.t()) :: :ok
3435
def add_block(signed_block) do
35-
GenServer.cast(__MODULE__, {:add_block, signed_block})
36-
end
37-
38-
@spec on_tick(Types.uint64()) :: :ok
39-
def on_tick(time) do
40-
GenServer.cast(__MODULE__, {:on_tick, time})
41-
end
42-
43-
##########################
44-
### GenServer Callbacks
45-
##########################
46-
47-
@impl true
48-
@spec init(any) :: {:ok, state()}
49-
def init(_opts) do
50-
schedule_blocks_processing()
51-
schedule_blocks_download()
52-
schedule_blobs_download()
36+
block_info = BlockInfo.from_block(signed_block)
5337

54-
{:ok, nil}
55-
end
38+
# If the block is new or was to be downloaded, we store it.
39+
loaded_block = Blocks.get_block_info(block_info.root)
5640

57-
@spec handle_cast(any(), state()) :: {:noreply, state()}
41+
if is_nil(loaded_block) or loaded_block.status == :download do
42+
missing_blobs = missing_blobs(block_info)
5843

59-
@impl true
60-
def handle_cast({:add_block, %SignedBeaconBlock{} = signed_block}, _state) do
61-
block_info = BlockInfo.from_block(signed_block)
44+
if Enum.empty?(missing_blobs) do
45+
Blocks.new_block_info(block_info)
46+
process_block_and_check_children(block_info)
47+
else
48+
BlobDownloader.request_blobs_by_root(missing_blobs, &process_blobs/1, 30)
6249

63-
# If already processing or processed, ignore it
64-
if not Blocks.has_block?(block_info.root) do
65-
if Enum.empty?(missing_blobs(block_info)) do
6650
block_info
67-
else
68-
block_info |> BlockInfo.change_status(:download_blobs)
51+
|> BlockInfo.change_status(:download_blobs)
52+
|> Blocks.new_block_info()
6953
end
70-
|> Blocks.new_block_info()
71-
end
72-
73-
{:noreply, nil}
74-
end
75-
76-
@impl true
77-
def handle_cast({:on_tick, time}, state) do
78-
ForkChoice.on_tick(time)
79-
{:noreply, state}
80-
end
81-
82-
@doc """
83-
Iterates through the pending blocks and adds them to the fork choice if their parent is already in the fork choice.
84-
"""
85-
@impl true
86-
@spec handle_info(atom(), state()) :: {:noreply, state()}
87-
def handle_info(:process_blocks, _state) do
88-
schedule_blocks_processing()
89-
process_blocks()
90-
{:noreply, nil}
91-
end
92-
93-
@impl true
94-
def handle_info(:download_blocks, _state) do
95-
case Blocks.get_blocks_with_status(:download) do
96-
{:ok, blocks_to_download} ->
97-
blocks_to_download
98-
|> Enum.take(16)
99-
|> Enum.map(& &1.root)
100-
|> BlockDownloader.request_blocks_by_root()
101-
|> case do
102-
{:ok, signed_blocks} ->
103-
signed_blocks
104-
105-
{:error, reason} ->
106-
Logger.debug("Block download failed: '#{reason}'")
107-
[]
108-
end
109-
|> Enum.each(fn signed_block ->
110-
signed_block
111-
|> BlockInfo.from_block(:download)
112-
|> Blocks.change_status(:download_blobs)
113-
end)
114-
115-
{:error, reason} ->
116-
Logger.error("[PendingBlocks] Failed to get blocks to download. Reason: #{reason}")
11754
end
118-
119-
schedule_blocks_download()
120-
{:noreply, nil}
121-
end
122-
123-
@impl true
124-
def handle_info(:download_blobs, _state) do
125-
case Blocks.get_blocks_with_status(:download_blobs) do
126-
{:ok, blocks_with_missing_blobs} ->
127-
blocks_with_blobs =
128-
blocks_with_missing_blobs
129-
|> Enum.sort_by(fn %BlockInfo{} = block_info -> block_info.signed_block.message.slot end)
130-
|> Enum.take(16)
131-
132-
blobs_to_download = Enum.flat_map(blocks_with_blobs, &missing_blobs/1)
133-
134-
downloaded_blobs =
135-
blobs_to_download
136-
|> BlobDownloader.request_blobs_by_root()
137-
|> case do
138-
{:ok, blobs} ->
139-
blobs
140-
141-
{:error, reason} ->
142-
Logger.debug("Blob download failed: '#{reason}'")
143-
[]
144-
end
145-
146-
Enum.each(downloaded_blobs, &BlobDb.store_blob/1)
147-
148-
# TODO: is it not possible that blobs were downloaded for one and not for another?
149-
if length(downloaded_blobs) == length(blobs_to_download) do
150-
Enum.each(blocks_with_blobs, &Blocks.change_status(&1, :pending))
151-
end
152-
end
153-
154-
schedule_blobs_download()
155-
{:noreply, nil}
15655
end
15756

15857
##########################
@@ -173,23 +72,37 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do
17372
end
17473
end
17574

75+
# Processes a block. If it was transitioned or declared invalid, then process_blocks
76+
# is called to check if there's any children that can now be processed. This function
77+
# is only to be called when a new block is saved as pending, not when processing blocks
78+
# in batch, to avoid unneeded recursion.
79+
defp process_block_and_check_children(block_info) do
80+
if process_block(block_info) in [:transitioned, :invalid] do
81+
process_blocks()
82+
end
83+
end
84+
17685
defp process_block(block_info) do
86+
if block_info.status != :pending do
87+
Logger.error("Called process block for a block that's not ready: #{block_info}")
88+
end
89+
17790
parent_root = block_info.signed_block.message.parent_root
17891

17992
case Blocks.get_block_info(parent_root) do
18093
nil ->
18194
Blocks.add_block_to_download(parent_root)
95+
:download_pending
18296

18397
%BlockInfo{status: :invalid} ->
18498
Blocks.change_status(block_info, :invalid)
99+
:invalid
185100

186101
%BlockInfo{status: :transitioned} ->
187102
case ForkChoice.on_block(block_info) do
188103
:ok ->
189104
Blocks.change_status(block_info, :transitioned)
190-
191-
# Block is valid. We immediately check if we can process another block.
192-
# process_blocks()
105+
:transitioned
193106

194107
{:error, reason} ->
195108
Logger.error("[PendingBlocks] Saving block as invalid #{reason}",
@@ -198,13 +111,42 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do
198111
)
199112

200113
Blocks.change_status(block_info, :invalid)
114+
:invalid
201115
end
202116

203117
_other ->
204118
:ok
205119
end
206120
end
207121

122+
defp process_blobs({:ok, blobs}), do: add_blobs(blobs)
123+
124+
defp process_blobs({:error, reason}) do
125+
Logger.error("Error downloading blobs: #{inspect(reason)}")
126+
127+
# We might want to declare a block invalid here.
128+
end
129+
130+
# To be used when a series of blobs are downloaded. Stores each blob.
131+
# If there are blocks that can be processed, does so immediately.
132+
defp add_blobs(blobs) do
133+
Enum.map(blobs, fn blob ->
134+
BlobDb.store_blob(blob)
135+
Ssz.hash_tree_root!(blob.signed_block_header.message)
136+
end)
137+
|> Enum.uniq()
138+
|> Enum.each(fn root ->
139+
with %BlockInfo{} = block_info <- Blocks.get_block_info(root) do
140+
# TODO: add a new missing blobs call if some blobs are still missing for a block.
141+
if Enum.empty?(missing_blobs(block_info)) do
142+
block_info
143+
|> Blocks.change_status(:pending)
144+
|> process_block_and_check_children()
145+
end
146+
end
147+
end)
148+
end
149+
208150
@spec missing_blobs(BlockInfo.t()) :: [Types.BlobIdentifier.t()]
209151
defp missing_blobs(%BlockInfo{root: root, signed_block: signed_block}) do
210152
signed_block.message.body.blob_kzg_commitments
@@ -222,16 +164,4 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do
222164
true
223165
end
224166
end
225-
226-
defp schedule_blocks_processing() do
227-
Process.send_after(__MODULE__, :process_blocks, 500)
228-
end
229-
230-
defp schedule_blobs_download() do
231-
Process.send_after(__MODULE__, :download_blobs, 500)
232-
end
233-
234-
defp schedule_blocks_download() do
235-
Process.send_after(__MODULE__, :download_blocks, 1000)
236-
end
237167
end

0 commit comments

Comments
 (0)