Skip to content

Commit fe162f0

Browse files
mjfh and jordan authored
Beacon sync update peer connection mgmnt (#3776)
* Update error register names why: More intuitive names for assigned error types * Cosmetics, update logging * Update `worker` module (with basic tasks managed by scheduler) why: Explicitly check for block headers or blocks to unstage queued block headers or blocks. Not explicitly unstaging might lead to unnecessary (maybe small) delays when ending the header or block sync phase. * Classify syncer peers and select `faster` ones for downloading why: When there are more than enough peers available to fetch data simultaneously, only those are selected that are unknown or have shown higher throughput when fetching. * Relax fetch response timeouts and error threshold why: There is no need to zombify sync peers unless they deliver ostensibly bogus data. With zero response, only throughput statistics will be affected if a peer responds in time. Low throughput will leave a peer available but ignored if there are other peers with higher throughput, unless all peers can run in parallel. * Fix fringe case when all higher throughput measurements are equal why: When most higher throughputs are equal, then all ranks must be the topmost rank (and not the least.) Otherwise one might have delays until the situation is resolved. * Degrade ranks of useless peers as well as increasing their error count why: Previously, syncer peers that did not deliver and ran into a timeout were not marked degraded (for ranking) but rather the error count increased. As a consequence, these peers did not lose rank. This led to these peers lingering on and being tried again until the error count threshold was reached. --------- Co-authored-by: jordan <jordan@curd>
1 parent fe3abfe commit fe162f0

File tree

13 files changed

+299
-158
lines changed

13 files changed

+299
-158
lines changed

execution_chain/sync/beacon/worker.nim

Lines changed: 54 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -18,19 +18,7 @@ import
1818
../../networking/p2p,
1919
./worker/headers/headers_target,
2020
./worker/update/metrics,
21-
./worker/[blocks, headers, start_stop, update, worker_desc]
22-
23-
# ------------------------------------------------------------------------------
24-
# Private functions
25-
# ------------------------------------------------------------------------------
26-
27-
proc somethingToCollect(buddy: BeaconBuddyRef): bool =
28-
if buddy.ctx.hibernate: # not activated yet?
29-
return false
30-
if buddy.headersCollectOk() or # something on TODO list
31-
buddy.blocksCollectOk():
32-
return true
33-
false
21+
./worker/[blocks, classify, headers, start_stop, update, worker_desc]
3422

3523
# ------------------------------------------------------------------------------
3624
# Public start/stop and admin functions
@@ -68,7 +56,7 @@ proc stop*(buddy: BeaconBuddyRef; info: static[string]) =
6856
## Clean up this peer
6957
if not buddy.ctx.hibernate: debug info & ": release peer", peer=buddy.peer,
7058
throughput=buddy.only.thruPutStats.toMeanVar.psStr,
71-
nSyncPeers=(buddy.ctx.pool.nBuddies-1), syncState=($buddy.syncState)
59+
nSyncPeers=(buddy.ctx.pool.nBuddies-1), state=($buddy.syncState)
7260
buddy.stopBuddy()
7361

7462
# ------------------------------------------------------------------------------
@@ -151,53 +139,67 @@ template runPeer*(buddy: BeaconBuddyRef; info: static[string]): Duration =
151139
##
152140
var bodyRc = chronos.nanoseconds(0)
153141
block body:
154-
if buddy.somethingToCollect():
155-
156-
# Download and process headers and blocks
157-
while buddy.headersCollectOk():
142+
if buddy.somethingToCollectOrUnstage():
158143

159-
# Collect headers and either stash them on the header chain cache
160-
# directly, or stage on the header queue to get them serialised and
161-
# stashed, later.
162-
buddy.headersCollect info # async/template
144+
# Classify sync peer (aka buddy) performance
145+
let (fetchPerf {.inject.}, rank) = buddy.classifyForFetching()
163146

164-
# Store serialised headers from the `staged` queue onto the header
165-
# chain cache.
166-
if not buddy.headersUnstage info:
167-
# Need to proceed with another peer (e.g. gap between queue and
168-
# header chain cache.)
169-
bodyRc = workerIdleWaitInterval
170-
break body
147+
trace info & ": start processing", peer=buddy.peer,
148+
throughput=buddy.only.thruPutStats.toMeanVar.psStr,
149+
fetchPerf, rank=(if rank < 0: "n/a" else: $rank),
150+
nSyncPeers=buddy.ctx.pool.nBuddies, state=($buddy.syncState)
171151

172-
# End `while()`
152+
if fetchPerf == rankingTooLow:
153+
bodyRc = workerIdleWaitInterval
154+
break body # done, exit
173155

174-
# Fetch bodies and combine them with headers to blocks to be staged.
175-
# These staged blocks are then excuted by the daemon process (no `peer`
176-
# needed.)
177-
while buddy.blocksCollectOk():
178-
179-
# Collect bodies and either import them via `FC` module, or stage on
180-
# the blocks queue to get them serialised and imported, later.
181-
buddy.blocksCollect info # async/template
182-
183-
# Import bodies from the `staged` queue.
184-
if not buddy.blocksUnstage info: # async/template
185-
# Need to proceed with another peer (e.g. gap between top imported
186-
# block and blocks queue.)
187-
bodyRc = workerIdleWaitInterval
188-
break body
189-
190-
# End `while()`
156+
# Download and process headers and blocks
157+
block downloadAndProcess:
158+
while buddy.headersCollectOk():
159+
160+
# Collect headers and either stash them on the header chain cache
161+
# directly, or stage on the header queue to get them serialised and
162+
# stashed, later.
163+
buddy.headersCollect info # async/template
164+
165+
# Store serialised headers from the `staged` queue onto the header
166+
# chain cache.
167+
if not buddy.headersUnstage info: # async/template
168+
# Need to proceed with another peer (e.g. gap between queue and
169+
# header chain cache.)
170+
bodyRc = workerIdleWaitInterval
171+
break downloadAndProcess
172+
173+
# End `while()`
174+
175+
# Fetch bodies and combine them with headers to blocks to be staged.
176+
# These staged blocks are then executed by the daemon process (no `peer`
177+
# needed.)
178+
while buddy.blocksCollectOk():
179+
# Collect bodies and either import them via `FC` module, or stage on
180+
# the blocks queue to get them serialised and imported, later.
181+
buddy.blocksCollect info # async/template
182+
183+
# Import bodies from the `staged` queue.
184+
if not buddy.blocksUnstage info: # async/template
185+
# Need to proceed with another peer (e.g. gap between top imported
186+
# block and blocks queue.)
187+
bodyRc = workerIdleWaitInterval
188+
break downloadAndProcess
189+
190+
# End `while()`
191+
192+
# End block: `downloadAndProcess`
191193

192194
else:
193-
# Potential manual target set up
195+
# Potentially a manual sync target set up
194196
buddy.headersTargetActivate info
195197

196-
# End block: `body`
198+
# Idle sleep unless there is something to do
199+
if not buddy.somethingToCollectOrUnstage():
200+
bodyRc = workerIdleWaitInterval
197201

198-
# Idle sleep unless there is something to do
199-
if not buddy.somethingToCollect():
200-
bodyRc = workerIdleWaitInterval
202+
# End block: `body`
201203

202204
bodyRc
203205

execution_chain/sync/beacon/worker/blocks.nim

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,7 @@ template blocksCollect*(
191191

192192
debug info & ": no blocks yet (failed peer)", peer,
193193
failedPeers=ctx.pool.failedPeers.len,
194-
syncState=($buddy.syncState), bdyErrors=buddy.bdyErrors
194+
state=($buddy.syncState), nErrors=buddy.blkErrors()
195195
break body # return
196196

197197
# This message might run in addition to the `chronicles.info` part
@@ -205,9 +205,10 @@ template blocksCollect*(
205205

206206
# --------------
207207

208-
proc blocksUnstageOk*(ctx: BeaconCtxRef): bool =
208+
proc blocksUnstageOk*(buddy: BeaconBuddyRef): bool =
209209
## Check whether import processing is possible
210210
##
211+
let ctx = buddy.ctx
211212
not ctx.poolMode and
212213
0 < ctx.blk.staged.len
213214

execution_chain/sync/beacon/worker/blocks/blocks_blocks.nim

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ template blocksFetchCheckImpl(
5151
iv {.inject,used.} = iv
5252
peer {.inject,used.} = buddy.peer
5353

54-
# Preset/append headers to be completed with bodies. Also collect block
54+
# Preset headers to be completed with bodies. Also collect block
5555
# hashes for fetching missing blocks.
5656
var
5757
request = BlockBodiesRequest(blockHashes: newSeqUninit[Hash32](iv.len))
@@ -98,7 +98,8 @@ template blocksFetchCheckImpl(
9898
blocks.setLen(n) # curb off junk
9999
buddy.bdyFetchRegisterError()
100100
trace info & ": Cut off junk blocks", peer, iv, n=n,
101-
nTxs=bodies[n].transactions.len, nBodies, bdyErrors=buddy.bdyErrors
101+
nTxs=bodies[n].transactions.len, nBodies,
102+
nErrors=buddy.nErrors.fetch.bdy
102103
break loop
103104

104105
# In order to avoid extensive checking here and also within the `FC`
@@ -117,7 +118,7 @@ template blocksFetchCheckImpl(
117118
if 0 < blocks.len.uint64:
118119
bodyRc = Opt[seq[EthBlock]].ok(blocks) # return ok()
119120

120-
buddy.only.nProcErrors.blk.inc
121+
buddy.nErrors.apply.blk.inc
121122
break body # return err()
122123

123124
bodyRc # return
@@ -141,7 +142,7 @@ template blocksFetch*(
141142
block body:
142143
# Make sure that this sync peer is not banned from block processing,
143144
# already.
144-
if nProcBlocksErrThreshold < buddy.only.nProcErrors.blk:
145+
if nProcBlocksErrThreshold < buddy.nErrors.apply.blk:
145146
buddy.ctrl.zombie = true
146147
break body # return err()
147148

@@ -195,7 +196,7 @@ template blocksImport*(
195196

196197
var isError = false
197198
block loop:
198-
trace info & ": Start importing blocks", peer, iv,
199+
trace info & ": start importing blocks", peer, iv,
199200
nBlocks=iv.len, base=ctx.chain.baseNumber.bnStr,
200201
head=ctx.chain.latestNumber.bnStr
201202

@@ -208,7 +209,7 @@ template blocksImport*(
208209
# Mark peer that produced that unusable headers list as a zombie
209210
let srcPeer = buddy.getPeer peerID
210211
if not srcPeer.isNil:
211-
srcPeer.only.nProcErrors.blk = nProcBlocksErrThreshold + 1
212+
srcPeer.only.nErrors.apply.blk = nProcBlocksErrThreshold + 1
212213

213214
# Check whether it is enough to skip the current blocks list, only
214215
if ctx.subState.procFailNum != nBn:
@@ -248,7 +249,7 @@ template blocksImport*(
248249
if not isError:
249250
let srcPeer = buddy.getPeer peerID
250251
if not srcPeer.isNil:
251-
srcPeer.only.nProcErrors.blk = 0
252+
srcPeer.only.nErrors.apply.blk = 0
252253

253254
nBlocks = ctx.subState.top - iv.minPt + 1 # number of blocks imported
254255

execution_chain/sync/beacon/worker/blocks/blocks_fetch.nim

Lines changed: 21 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ template fetchBodies*(
6969
nReq {.inject,used.} = request.blockHashes.len
7070

7171
trace trEthSendSendingGetBlockBodies,
72-
peer, nReq, bdyErrors=buddy.bdyErrors
72+
peer, nReq, nErrors=buddy.nErrors.fetch.bdy
7373

7474
let rc = await buddy.getBlockBodies(request)
7575
var elapsed: Duration
@@ -82,47 +82,53 @@ template fetchBodies*(
8282
of ENoException:
8383
break evalError
8484
of EPeerDisconnected, ECancelledError:
85-
buddy.only.nRespErrors.blk.inc
85+
buddy.nErrors.fetch.bdy.inc
8686
buddy.ctrl.zombie = true
8787
of ECatchableError:
8888
buddy.bdyFetchRegisterError()
8989

9090
chronicles.info trEthRecvReceivedBlockBodies & " error", peer, nReq,
91-
elapsed=rc.error.elapsed.toStr, syncState=($buddy.syncState),
92-
error=rc.error.name, msg=rc.error.msg, bdyErrors=buddy.bdyErrors
91+
elapsed=rc.error.elapsed.toStr, state=($buddy.syncState),
92+
error=rc.error.name, msg=rc.error.msg, nErrors=buddy.nErrors.fetch.bdy
9393
break body # return err()
9494

9595
# Evaluate result
9696
if rc.isErr or buddy.ctrl.stopped:
9797
buddy.bdyFetchRegisterError()
9898
trace trEthRecvReceivedBlockBodies, peer, nReq, nResp=0,
99-
elapsed=elapsed.toStr, syncState=($buddy.syncState),
100-
bdyErrors=buddy.bdyErrors
99+
elapsed=elapsed.toStr, state=($buddy.syncState),
100+
nErrors=buddy.nErrors.fetch.bdy
101101
break body # return err()
102102

103+
# Verify the correct number of block bodies received
103104
let b = rc.value.packet.bodies
104105
if b.len == 0 or nReq < b.len:
105-
buddy.bdyFetchRegisterError()
106+
if nReq < b.len:
107+
# Bogus peer returning additional rubbish
108+
buddy.bdyFetchRegisterError(forceZombie=true)
109+
else:
110+
# Data not available but fast enough answer: degrade throughput stats only
111+
discard buddy.only.thruPutStats.blk.bpsSample(elapsed, 0)
112+
if fetchBodiesErrTimeout <= elapsed:
113+
buddy.bdyFetchRegisterError(slowPeer=true)
106114
trace trEthRecvReceivedBlockBodies, peer, nReq, nResp=b.len,
107-
elapsed=elapsed.toStr, syncState=($buddy.syncState),
108-
nRespErrors=buddy.only.nRespErrors.blk
115+
elapsed=elapsed.toStr, state=($buddy.syncState),
116+
nErrors=buddy.nErrors.fetch.bdy
109117
break body # return err()
110118

111119
# Update download statistics
112120
let bps = buddy.only.thruPutStats.blk.bpsSample(elapsed, b.getEncodedLength)
113121

114-
# Ban an overly slow peer for a while when seen in a row. Also there is a
115-
# mimimum share of the number of requested headers expected, typically 10%.
116-
if fetchBodiesErrTimeout < elapsed or
117-
b.len.uint64 * 100 < nReq.uint64 * fetchBodiesMinResponsePC:
122+
# Ban an overly slow peer for a while when observed consecutively.
123+
if fetchBodiesErrTimeout < elapsed:
118124
buddy.bdyFetchRegisterError(slowPeer=true)
119125
else:
120-
buddy.only.nRespErrors.blk = 0 # reset error count
126+
buddy.nErrors.fetch.bdy = 0 # reset error count
121127
buddy.ctx.pool.lastSlowPeer = Opt.none(Hash) # not last one or not error
122128

123129
trace trEthRecvReceivedBlockBodies, peer, nReq, nResp=b.len,
124130
elapsed=elapsed.toStr, throughput=(bps.toIECb(1) & "ps"),
125-
syncState=($buddy.syncState), bdyErrors=buddy.bdyErrors
131+
state=($buddy.syncState), nErrors=buddy.nErrors.fetch.bdy
126132

127133
bodyRc = Opt[seq[BlockBody]].ok(b)
128134

execution_chain/sync/beacon/worker/blocks/blocks_helpers.nim

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,16 @@ import
1717
# Public functions
1818
# ------------------------------------------------------------------------------
1919

20-
func bdyErrors*(buddy: BeaconBuddyRef): string =
21-
$buddy.only.nRespErrors.blk & "/" & $buddy.only.nProcErrors.blk
22-
23-
proc bdyFetchRegisterError*(buddy: BeaconBuddyRef, slowPeer = false) =
24-
buddy.only.nRespErrors.blk.inc
25-
if nFetchBodiesErrThreshold < buddy.only.nRespErrors.blk:
26-
if buddy.ctx.pool.nBuddies == 1 and slowPeer:
20+
func blkErrors*(buddy: BeaconBuddyRef): string =
21+
$buddy.nErrors.fetch.bdy & "/" & $buddy.nErrors.apply.blk
22+
23+
proc bdyFetchRegisterError*(
24+
buddy: BeaconBuddyRef;
25+
slowPeer = false;
26+
forceZombie = false) =
27+
buddy.nErrors.fetch.bdy.inc
28+
if nFetchBodiesErrThreshold < buddy.nErrors.fetch.bdy:
29+
if not forceZombie and buddy.ctx.pool.nBuddies == 1 and slowPeer:
2730
# Remember that the current peer is the last one and is labelled slow.
2831
# It would have been zombified if it were not the last one. This can be
2932
# used in functions -- depending on context -- that will trigger if the

0 commit comments

Comments
 (0)