Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
3df4ce8
Provide syncer session call back handlers that can be intercepted
mjfh Jun 2, 2025
e204951
Provide tracer framework with intercepting syncer session handlers
mjfh Jul 23, 2025
c4066a0
Provide command line tracer tool
mjfh Aug 28, 2025
609c0bf
Provide capture inspection framework, part of replay
mjfh Jun 6, 2025
9005334
Provide command line capture inspection tool
mjfh Aug 28, 2025
5446e81
Update replay framework for full capture replay
mjfh Sep 2, 2025
0f6abba
Provide command line capture replay tool
mjfh Aug 28, 2025
effddef
Fix copyright years
mjfh Sep 8, 2025
d796adc
Merge branch 'master' into Beacon-sync-trace-replay-test-tools
mjfh Sep 16, 2025
a460c9a
Merge branch 'master' into Beacon-sync-trace-replay-test-tools
mjfh Sep 22, 2025
765d0ee
Use `confutils` for command line options management
mjfh Sep 9, 2025
4fceca7
Bumb nim-zlib
mjfh Sep 10, 2025
2c02803
Using gunzip from updated `nim-zlib` package
mjfh Sep 9, 2025
aa21670
Refactor capture records for JSON format read/write
mjfh Sep 10, 2025
9c8b527
Simplify/prettify code
mjfh Sep 12, 2025
0d154f0
Replace bit-mask controlled optional capture fields by `Opt[]` struct…
mjfh Sep 16, 2025
457cfab
Re-factor `ReplayRef` inheritable by `ReplayRunnerRef`
mjfh Sep 18, 2025
9c25749
Remove `ReplayMsgRef` types and use `ReplayPayloadRef` instead
mjfh Sep 17, 2025
0ef32ac
Merge branch 'master' into Beacon-sync-trace-replay-test-tools
mjfh Sep 23, 2025
c371563
Adjust to changed main API
mjfh Sep 29, 2025
70f7e33
Docu for setting up network tunnel for testing
mjfh Sep 30, 2025
b26204e
Merge branch 'master' into Beacon-sync-trace-replay-test-tools
mjfh Sep 30, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -397,8 +397,19 @@ evmstate_test: | build deps evmstate
txparse: | build deps
$(ENV_SCRIPT) nim c $(NIM_PARAMS) "tools/txparse/[email protected]"

# build syncer debugging and analysis tools
SYNCER_TOOLS_DIR := tools/syncer
SYNCER_TOOLS := $(foreach name,trace inspect replay,syncer_test_client_$(name))
.PHONY: syncer-tools syncer-tools-clean $(SYNCER_TOOLS)
syncer-tools: $(SYNCER_TOOLS)
syncer-tools-clean:
rm -f $(foreach exe,$(SYNCER_TOOLS),build/$(exe))
$(SYNCER_TOOLS): | build deps rocksdb
echo -e $(BUILD_MSG) "build/$@"
$(ENV_SCRIPT) nim c $(NIM_PARAMS) -o:build/$@ "$(SYNCER_TOOLS_DIR)/[email protected]"

# usual cleaning
clean: | clean-common
clean: | clean-common syncer-tools-clean
rm -rf build/{nimbus,nimbus_execution_client,nimbus_portal_client,fluffy,portal_bridge,libverifproxy,nimbus_verified_proxy,$(TOOLS_CSV),$(PORTAL_TOOLS_CSV),all_tests,test_kvstore_rocksdb,test_rpc,all_portal_tests,all_history_network_custom_chain_tests,test_portal_testnet,utp_test_app,utp_test,*.dSYM}
rm -rf tools/t8n/{t8n,t8n_test}
rm -rf tools/evmstate/{evmstate,evmstate_test}
Expand Down
76 changes: 66 additions & 10 deletions execution_chain/sync/beacon.nim
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@ import
pkg/stew/[interval_set, sorted_set],
../core/chain,
../networking/p2p,
./beacon/worker/headers/headers_target,
./beacon/[beacon_desc, worker],
./beacon/worker/blocks/[blocks_fetch, blocks_import],
./beacon/worker/headers/[headers_fetch, headers_target],
./beacon/worker/update,
./[sync_sched, wire_protocol]

export
Expand All @@ -25,33 +27,62 @@ export
logScope:
topics = "beacon sync"

# ------------------------------------------------------------------------------
# Interceptable handlers
# ------------------------------------------------------------------------------

proc schedDaemonCB(
ctx: BeaconCtxRef;
): Future[Duration]
{.async: (raises: []).} =
return worker.runDaemon(ctx, "RunDaemon") # async/template

proc schedStartCB(buddy: BeaconBuddyRef): bool =
return worker.start(buddy, "RunStart")

proc schedStopCB(buddy: BeaconBuddyRef) =
worker.stop(buddy, "RunStop")

proc schedPoolCB(buddy: BeaconBuddyRef; last: bool; laps: int): bool =
return worker.runPool(buddy, last, laps, "RunPool")

proc schedPeerCB(
buddy: BeaconBuddyRef;
): Future[Duration]
{.async: (raises: []).} =
return worker.runPeer(buddy, "RunPeer") # async/template

proc noOpFn(buddy: BeaconBuddyRef) = discard
proc noOpEx(self: BeaconHandlersSyncRef) = discard

# ------------------------------------------------------------------------------
# Virtual methods/interface, `mixin` functions
# ------------------------------------------------------------------------------

proc runSetup(ctx: BeaconCtxRef): bool =
worker.setup(ctx, "RunSetup")
return worker.setup(ctx, "RunSetup")

proc runRelease(ctx: BeaconCtxRef) =
worker.release(ctx, "RunRelease")

proc runDaemon(ctx: BeaconCtxRef): Future[Duration] {.async: (raises: []).} =
return worker.runDaemon(ctx, "RunDaemon")

proc runTicker(ctx: BeaconCtxRef) =
worker.runTicker(ctx, "RunTicker")


proc runDaemon(ctx: BeaconCtxRef): Future[Duration] {.async: (raises: []).} =
return await ctx.handler.schedDaemon(ctx)

proc runStart(buddy: BeaconBuddyRef): bool =
worker.start(buddy, "RunStart")
return buddy.ctx.handler.schedStart(buddy)

proc runStop(buddy: BeaconBuddyRef) =
worker.stop(buddy, "RunStop")
buddy.ctx.handler.schedStop(buddy)

proc runPool(buddy: BeaconBuddyRef; last: bool; laps: int): bool =
worker.runPool(buddy, last, laps, "RunPool")
return buddy.ctx.handler.schedPool(buddy, last, laps)

proc runPeer(buddy: BeaconBuddyRef): Future[Duration] {.async: (raises: []).} =
return worker.runPeer(buddy, "RunPeer")
return await buddy.ctx.handler.schedPeer(buddy)

# ------------------------------------------------------------------------------
# Public functions
Expand Down Expand Up @@ -83,6 +114,25 @@ proc config*(
desc.initSync(ethNode, maxPeers)
desc.ctx.pool.chain = chain

# Set up handlers so they can be overlayed
desc.ctx.pool.handlers = BeaconHandlersSyncRef(
version: 0,
activate: updateActivateCB,
suspend: updateSuspendCB,
schedDaemon: schedDaemonCB,
schedStart: schedStartCB,
schedStop: schedStopCB,
schedPool: schedPoolCB,
schedPeer: schedPeerCB,
getBlockHeaders: getBlockHeadersCB,
syncBlockHeaders: noOpFn,
getBlockBodies: getBlockBodiesCB,
syncBlockBodies: noOpFn,
importBlock: importBlockCB,
syncImportBlock: noOpFn,
startSync: noOpEx,
stopSync: noOpEx)

if not desc.lazyConfigHook.isNil:
desc.lazyConfigHook(desc)
desc.lazyConfigHook = nil
Expand All @@ -99,10 +149,16 @@ proc configTarget*(desc: BeaconSyncRef; hex: string; isFinal: bool): bool =

proc start*(desc: BeaconSyncRef): bool =
doAssert not desc.ctx.isNil
desc.startSync()
if desc.startSync():
let w = BeaconHandlersSyncRef(desc.ctx.pool.handlers)
w.startSync(w)
return true
# false

proc stop*(desc: BeaconSyncRef) {.async.} =
doAssert not desc.ctx.isNil
let w = BeaconHandlersSyncRef(desc.ctx.pool.handlers)
w.stopSync(w)
await desc.stopSync()

# ------------------------------------------------------------------------------
Expand Down
6 changes: 6 additions & 0 deletions execution_chain/sync/beacon/beacon_desc.nim
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,10 @@ type
## Instance descriptor, extends scheduler object
lazyConfigHook*: BeaconSyncConfigHook

BeaconHandlersSyncRef* = ref object of BeaconHandlersRef
## Add start/stop helpers to function list. By default, this functiona
## are no-ops.
startSync*: proc(self: BeaconHandlersSyncRef) {.gcsafe, raises: [].}
stopSync*: proc(self: BeaconHandlersSyncRef) {.gcsafe, raises: [].}

# End
19 changes: 17 additions & 2 deletions execution_chain/sync/beacon/worker/blocks/blocks_blocks.nim
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,27 @@ import
../../../../networking/p2p,
../../../wire_protocol/types,
../[update, worker_desc],
./[blocks_fetch, blocks_helpers, blocks_import, blocks_unproc]
./[blocks_fetch, blocks_helpers, blocks_unproc]

# ------------------------------------------------------------------------------
# Private helpers
# ------------------------------------------------------------------------------

template importBlock(
buddy: BeaconBuddyRef;
blk: EthBlock;
effPeerID: Hash;
): Result[Duration,BeaconError] =
## Async/template
##
## Wrapper around `importBlock()` handler
##
let
ctx = buddy.ctx
rc = await ctx.handler.importBlock(buddy, blk, effPeerID)
ctx.handler.syncImportBlock(buddy) # debugging, trace, replay
rc

proc getNthHash(ctx: BeaconCtxRef; blocks: seq[EthBlock]; n: int): Hash32 =
ctx.hdrCache.getHash(blocks[n].header.number).valueOr:
return zeroHash32
Expand Down Expand Up @@ -201,7 +216,7 @@ template blocksImport*(

for n in 0 ..< blocks.len:
let nBn = blocks[n].header.number
discard (await buddy.importBlock(blocks[n], peerID)).valueOr:
buddy.importBlock(blocks[n], peerID).isOkOr:
if error.excp != ECancelledError:
isError = true

Expand Down
25 changes: 22 additions & 3 deletions execution_chain/sync/beacon/worker/blocks/blocks_fetch.nim
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,30 @@ import
../worker_desc,
./blocks_helpers

logScope:
topics = "beacon sync"

# ------------------------------------------------------------------------------
# Private helper
# -----------------------------------------------------------------------------

template getBlockBodies(
buddy: BeaconBuddyRef;
req: BlockBodiesRequest;
): Result[FetchBodiesData,BeaconError] =
## Async/template
##
## Wrapper around `getBlockBodies()` handler
##
let rc = await buddy.ctx.handler.getBlockBodies(buddy, req)
buddy.ctx.handler.syncBlockBodies(buddy) # debugging, sync, replay
rc

# ------------------------------------------------------------------------------
# Private helpers
# Public handler
# -----------------------------------------------------------------------------

proc getBlockBodies(
proc getBlockBodiesCB*(
buddy: BeaconBuddyRef;
req: BlockBodiesRequest;
): Future[Result[FetchBodiesData,BeaconError]]
Expand Down Expand Up @@ -70,7 +89,7 @@ template fetchBodies*(
trace trEthSendSendingGetBlockBodies,
peer, nReq, bdyErrors=buddy.bdyErrors

let rc = await buddy.getBlockBodies(request)
let rc = buddy.getBlockBodies(request)
var elapsed: Duration
if rc.isOk:
elapsed = rc.value.elapsed
Expand Down
7 changes: 5 additions & 2 deletions execution_chain/sync/beacon/worker/blocks/blocks_import.nim
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,14 @@ import
../../../wire_protocol,
../worker_desc

logScope:
topics = "beacon sync"

# ------------------------------------------------------------------------------
# Public function
# Public handler
# ------------------------------------------------------------------------------

proc importBlock*(
proc importBlockCB*(
buddy: BeaconBuddyRef;
blk: EthBlock;
effPeerID: Hash;
Expand Down
23 changes: 21 additions & 2 deletions execution_chain/sync/beacon/worker/headers/headers_fetch.nim
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,30 @@ import
../worker_desc,
./headers_helpers

logScope:
topics = "beacon sync"

# ------------------------------------------------------------------------------
# Private helpers
# -----------------------------------------------------------------------------

template getBlockHeaders(
buddy: BeaconBuddyRef;
req: BlockHeadersRequest;
): Result[FetchHeadersData,BeaconError] =
## Async/template
##
## Wrapper around `getBlockHeaders()` handler
##
let rc = await buddy.ctx.handler.getBlockHeaders(buddy, req)
buddy.ctx.handler.syncBlockHeaders(buddy) # debugging, sync, replay
rc

# ------------------------------------------------------------------------------
# Public handler
# ------------------------------------------------------------------------------

proc getBlockHeaders(
proc getBlockHeadersCB*(
buddy: BeaconBuddyRef;
req: BlockHeadersRequest;
): Future[Result[FetchHeadersData,BeaconError]]
Expand Down Expand Up @@ -88,7 +107,7 @@ template fetchHeadersReversed*(
trace trEthSendSendingGetBlockHeaders & " reverse", peer, req=ivReq,
nReq=req.maxResults, hash=topHash.toStr, hdrErrors=buddy.hdrErrors

let rc = await buddy.getBlockHeaders(req)
let rc = buddy.getBlockHeaders(req)
var elapsed: Duration
if rc.isOk:
elapsed = rc.value.elapsed
Expand Down
6 changes: 3 additions & 3 deletions execution_chain/sync/beacon/worker/start_stop.nim
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import
pkg/[chronicles, chronos, eth/common, metrics],
../../../networking/p2p,
../../wire_protocol,
./[blocks, headers, update, worker_desc]
./[blocks, headers, worker_desc]

type
SyncStateData = tuple
Expand Down Expand Up @@ -59,8 +59,8 @@ proc setupServices*(ctx: BeaconCtxRef; info: static[string]) =

# Set up the notifier informing when a new syncer session has started.
ctx.hdrCache.start proc() =
# Activates the syncer. Work will be picked up by peers when available.
ctx.updateActivateSyncer()
# This directive captures `ctx` for calling the activation handler.
ctx.handler.activate(ctx)

# Provide progress info call back handler
ctx.pool.chain.com.beaconSyncerProgress = proc(): SyncStateData =
Expand Down
39 changes: 20 additions & 19 deletions execution_chain/sync/beacon/worker/update.nim
Original file line number Diff line number Diff line change
Expand Up @@ -30,23 +30,6 @@ declareGauge nec_sync_head, "" &
# Private functions, state handler helpers
# ------------------------------------------------------------------------------

proc updateSuspendSyncer(ctx: BeaconCtxRef) =
## Clean up sync target buckets, stop syncer activity, and and get ready
## for awaiting a new request from the `CL`.
##
ctx.hdrCache.clear()

ctx.pool.failedPeers.clear()
ctx.pool.seenData = false

ctx.hibernate = true

metrics.set(nec_sync_last_block_imported, 0)
metrics.set(nec_sync_head, 0)

info "Suspending syncer", base=ctx.chain.baseNumber.bnStr,
head=ctx.chain.latestNumber.bnStr, nSyncPeers=ctx.pool.nBuddies

proc commitCollectHeaders(ctx: BeaconCtxRef; info: static[string]): bool =
## Link header chain into `FC` module. Gets ready for block import.
##
Expand Down Expand Up @@ -227,7 +210,7 @@ proc updateSyncState*(ctx: BeaconCtxRef; info: static[string]) =

# Final sync scrum layout reached or inconsistent/impossible state
if newState == idle:
ctx.updateSuspendSyncer()
ctx.handler.suspend(ctx)


proc updateLastBlockImported*(ctx: BeaconCtxRef; bn: BlockNumber) =
Expand All @@ -238,7 +221,7 @@ proc updateLastBlockImported*(ctx: BeaconCtxRef; bn: BlockNumber) =
# Public functions, call-back handler ready
# ------------------------------------------------------------------------------

proc updateActivateSyncer*(ctx: BeaconCtxRef) =
proc updateActivateCB*(ctx: BeaconCtxRef) =
## If in hibernate mode, accept a cache session and activate syncer
##
if ctx.hibernate and # only in idle mode
Expand Down Expand Up @@ -277,6 +260,24 @@ proc updateActivateSyncer*(ctx: BeaconCtxRef) =
head=ctx.chain.latestNumber.bnStr, state=ctx.hdrCache.state,
initTarget=ctx.pool.initTarget.isSome(), nSyncPeers=ctx.pool.nBuddies


proc updateSuspendCB*(ctx: BeaconCtxRef) =
## Clean up sync target buckets, stop syncer activity, and and get ready
## for a new sync request from the `CL`.
##
ctx.hdrCache.clear()

ctx.pool.failedPeers.clear()
ctx.pool.seenData = false

ctx.hibernate = true

metrics.set(nec_sync_last_block_imported, 0)
metrics.set(nec_sync_head, 0)

info "Suspending syncer", base=ctx.chain.baseNumber.bnStr,
head=ctx.chain.latestNumber.bnStr, nSyncPeers=ctx.pool.nBuddies

# ------------------------------------------------------------------------------
# End
# ------------------------------------------------------------------------------
Loading