From ce57ec6ea17e60690906be91f812a00b17f24797 Mon Sep 17 00:00:00 2001 From: chirag-parmar Date: Mon, 13 Oct 2025 16:55:31 +0530 Subject: [PATCH 01/16] light client init --- nimbus_verified_proxy/lc/lc.nim | 158 ++++++++++ nimbus_verified_proxy/lc/lc_manager.nim | 387 ++++++++++++++++++++++++ 2 files changed, 545 insertions(+) create mode 100644 nimbus_verified_proxy/lc/lc.nim create mode 100644 nimbus_verified_proxy/lc/lc_manager.nim diff --git a/nimbus_verified_proxy/lc/lc.nim b/nimbus_verified_proxy/lc/lc.nim new file mode 100644 index 0000000000..476b70e9cf --- /dev/null +++ b/nimbus_verified_proxy/lc/lc.nim @@ -0,0 +1,158 @@ +# nimbus_verified_proxy +# Copyright (c) 2025 Status Research & Development GmbH +# Licensed and distributed under either of +# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). +# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). +# at your option. This file may not be copied, modified, or distributed except according to those terms. 
+ +{.push raises: [].} + +import + chronicles, + chronos, + beacon_chain/gossip_processing/light_client_processor, + beacon_chain/beacon_clock, + ./lc_manager # use the modified light client manager + + +type + LightClientHeaderCallback* = proc( + lightClient: LightClient, header: ForkedLightClientHeader + ) {.gcsafe, raises: [].} + + LightClient* = ref object + cfg: RuntimeConfig + forkDigests: ref ForkDigests + getBeaconTime*: GetBeaconTimeFn + store*: ref ForkedLightClientStore + processor*: ref LightClientProcessor + manager: LightClientManager + onFinalizedHeader*, onOptimisticHeader*: LightClientHeaderCallback + trustedBlockRoot*: Option[Eth2Digest] + +func getFinalizedHeader*(lightClient: LightClient): ForkedLightClientHeader = + withForkyStore(lightClient.store[]): + when lcDataFork > LightClientDataFork.None: + var header = ForkedLightClientHeader(kind: lcDataFork) + header.forky(lcDataFork) = forkyStore.finalized_header + header + else: + default(ForkedLightClientHeader) + +func getOptimisticHeader*(lightClient: LightClient): ForkedLightClientHeader = + withForkyStore(lightClient.store[]): + when lcDataFork > LightClientDataFork.None: + var header = ForkedLightClientHeader(kind: lcDataFork) + header.forky(lcDataFork) = forkyStore.optimistic_header + header + else: + default(ForkedLightClientHeader) + +proc new*( + T: type LightClient, + rng: ref HmacDrbgContext, + cfg: RuntimeConfig, + forkDigests: ref ForkDigests, + getBeaconTime: GetBeaconTimeFn, + genesis_validators_root: Eth2Digest, + trustedBlockRoot: Option[Eth2Digest], + finalizationMode: LightClientFinalizationMode, +): T = + let lightClient = LightClient( + cfg: cfg, + forkDigests: forkDigests, + getBeaconTime: getBeaconTime, + store: (ref ForkedLightClientStore)(), + trustedBlockRoot: trustedBlockRoot + ) + + func getTrustedBlockRoot(): Option[Eth2Digest] = + lightClient.trustedBlockRoot + + proc onStoreInitialized() = + discard + + proc onFinalizedHeader() = + if lightClient.onFinalizedHeader 
!= nil: + lightClient.onFinalizedHeader(lightClient, lightClient.getFinalizedHeader) + + proc onOptimisticHeader() = + if lightClient.onOptimisticHeader != nil: + lightClient.onOptimisticHeader(lightClient, lightClient.getOptimisticHeader) + + # initialize without dumping + lightClient.processor = LightClientProcessor.new( + false, ".", ".", cfg, genesis_validators_root, + finalizationMode, lightClient.store, getBeaconTime, trustedBlockRoot, + onStoreInitialized, onFinalizedHeader, onOptimisticHeader, + ) + + proc lightClientVerifier( + obj: SomeForkedLightClientObject + ): Future[Result[void, LightClientVerifierError]] {. + async: (raises: [CancelledError], raw: true) + .} = + let resfut = Future[Result[void, LightClientVerifierError]] + .Raising([CancelledError]) + .init("lightClientVerifier") + lightClient.processor[].addObject(MsgSource.gossip, obj, resfut) + resfut + + proc bootstrapVerifier(obj: ForkedLightClientBootstrap): auto = + lightClientVerifier(obj) + + proc updateVerifier(obj: ForkedLightClientUpdate): auto = + lightClientVerifier(obj) + + proc finalityVerifier(obj: ForkedLightClientFinalityUpdate): auto = + lightClientVerifier(obj) + + proc optimisticVerifier(obj: ForkedLightClientOptimisticUpdate): auto = + lightClientVerifier(obj) + + func isLightClientStoreInitialized(): bool = + lightClient.store[].kind > LightClientDataFork.None + + func isNextSyncCommitteeKnown(): bool = + withForkyStore(lightClient.store[]): + when lcDataFork > LightClientDataFork.None: + forkyStore.is_next_sync_committee_known + else: + false + + func getFinalizedSlot(): Slot = + withForkyStore(lightClient.store[]): + when lcDataFork > LightClientDataFork.None: + forkyStore.finalized_header.beacon.slot + else: + GENESIS_SLOT + + func getOptimisticSlot(): Slot = + withForkyStore(lightClient.store[]): + when lcDataFork > LightClientDataFork.None: + forkyStore.optimistic_header.beacon.slot + else: + GENESIS_SLOT + + lightClient.manager = LightClientManager.init( + rng, 
getTrustedBlockRoot, bootstrapVerifier, updateVerifier, + finalityVerifier, optimisticVerifier, isLightClientStoreInitialized, + isNextSyncCommitteeKnown, getFinalizedSlot, getOptimisticSlot, getBeaconTime, + ) + + lightClient + +proc start*(lightClient: LightClient) = + info "Starting beacon light client", trusted_block_root = lightClient.trustedBlockRoot() + lightClient.manager.start() + +proc stop*(lightClient: LightClient) {.async: (raises: []).} = + info "Stopping beacon light client" + await lightClient.manager.stop() + +proc resetToFinalizedHeader*( + lightClient: LightClient, + header: ForkedLightClientHeader, + current_sync_committee: SyncCommittee, +) = + lightClient.processor[].resetToFinalizedHeader(header, current_sync_committee) diff --git a/nimbus_verified_proxy/lc/lc_manager.nim b/nimbus_verified_proxy/lc/lc_manager.nim new file mode 100644 index 0000000000..a2d22391f9 --- /dev/null +++ b/nimbus_verified_proxy/lc/lc_manager.nim @@ -0,0 +1,387 @@ +# nimbus_verified_proxy +# Copyright (c) 2025 Status Research & Development GmbH +# Licensed and distributed under either of +# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). +# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). +# at your option. This file may not be copied, modified, or distributed except according to those terms. 
+ +{.push raises: [].} + +import chronos, chronicles +import + beacon_chain/beacon_clock, + beacon_chain/networking/peer_scores, + beacon_chain/[light_client_sync_helpers, sync_manager] + +logScope: + topics = "lcman" + +type + Nothing = object + ResponseError = object of CatchableError + Endpoint[K, V] = + (K, V) # https://github.com/nim-lang/Nim/issues/19531 + Bootstrap = + Endpoint[Eth2Digest, ForkedLightClientBootstrap] + UpdatesByRange = + Endpoint[ + tuple[startPeriod: SyncCommitteePeriod, count: uint64], + ForkedLightClientUpdate] + FinalityUpdate = + Endpoint[Nothing, ForkedLightClientFinalityUpdate] + OptimisticUpdate = + Endpoint[Nothing, ForkedLightClientOptimisticUpdate] + + ValueVerifier[V] = + proc(v: V): Future[Result[void, LightClientVerifierError]] {.async: (raises: [CancelledError]).} + BootstrapVerifier* = + ValueVerifier[ForkedLightClientBootstrap] + UpdateVerifier* = + ValueVerifier[ForkedLightClientUpdate] + FinalityUpdateVerifier* = + ValueVerifier[ForkedLightClientFinalityUpdate] + OptimisticUpdateVerifier* = + ValueVerifier[ForkedLightClientOptimisticUpdate] + + GetTrustedBlockRootCallback* = + proc(): Option[Eth2Digest] {.gcsafe, raises: [].} + GetBoolCallback* = + proc(): bool {.gcsafe, raises: [].} + GetSyncCommitteePeriodCallback* = + proc(): SyncCommitteePeriod {.gcsafe, raises: [].} + + LightClientBootstrapProc = proc(id: uint64, blockRoot: Eth2Digest): Future[NetRes[ForkedLightClientBootstrap]] + LightClientUpdatesByRangeProc = proc(id: uint64, startPeriod: SyncCommitteePeriod, count: uint64): Future[NetRes[ForkedLightClientUpdateList]] + LightClientFinalityUpdateProc = proc(id: uint64): Future[NetRes[ForkedLightClientFinalityUpdate]] + LightClientOptimisticUpdateProc = proc(id: uint64): Future[NetRes[ForkedLightClientOptimisticUpdate]] + ReportResponseQualityProc = proc(id: uint64, value: int) + + EthLCBackend* = object + getLightClientBootstrap: LightClientBootstrapProc + getLightClientUpdatesByRange: LightClientUpdatesByRangeProc 
+ getLightClientFinalityUpdate: LightClientFInalityUpdateProc + getLightClientOptimisticUpdate: LightClientOptimisticUpdateProc + reportRequestQuality: ReportRequestQualityProc + + LightClientManager* = object + rng: ref HmacDrbgContext + backend*: EthLCBackend + getTrustedBlockRoot: GetTrustedBlockRootCallback + bootstrapVerifier: BootstrapVerifier + updateVerifier: UpdateVerifier + finalityUpdateVerifier: FinalityUpdateVerifier + optimisticUpdateVerifier: OptimisticUpdateVerifier + isLightClientStoreInitialized: GetBoolCallback + isNextSyncCommitteeKnown: GetBoolCallback + getFinalizedPeriod: GetSyncCommitteePeriodCallback + getOptimisticPeriod: GetSyncCommitteePeriodCallback + getBeaconTime: GetBeaconTimeFn + loopFuture: Future[void].Raising([CancelledError]) + +func init*( + T: type LightClientManager, + rng: ref HmacDrbgContext, + getTrustedBlockRoot: GetTrustedBlockRootCallback, + bootstrapVerifier: BootstrapVerifier, + updateVerifier: UpdateVerifier, + finalityUpdateVerifier: FinalityUpdateVerifier, + optimisticUpdateVerifier: OptimisticUpdateVerifier, + isLightClientStoreInitialized: GetBoolCallback, + isNextSyncCommitteeKnown: GetBoolCallback, + getFinalizedPeriod: GetSyncCommitteePeriodCallback, + getOptimisticPeriod: GetSyncCommitteePeriodCallback, + getBeaconTime: GetBeaconTimeFn, +): LightClientManager = + ## Initialize light client manager. 
+ LightClientManager( + rng: rng, + getTrustedBlockRoot: getTrustedBlockRoot, + bootstrapVerifier: bootstrapVerifier, + updateVerifier: updateVerifier, + finalityUpdateVerifier: finalityUpdateVerifier, + optimisticUpdateVerifier: optimisticUpdateVerifier, + isLightClientStoreInitialized: isLightClientStoreInitialized, + isNextSyncCommitteeKnown: isNextSyncCommitteeKnown, + getFinalizedPeriod: getFinalizedPeriod, + getOptimisticPeriod: getOptimisticPeriod, + getBeaconTime: getBeaconTime) + +# https://github.com/ethereum/consensus-specs/blob/v1.6.0-alpha.3/specs/altair/light-client/p2p-interface.md#getlightclientbootstrap +proc doRequest( + e: typedesc[Bootstrap], + backend: EthLCBackend, + reqId: uint64, + blockRoot: Eth2Digest +): Future[NetRes[ForkedLightClientBootstrap]] {.async: (raises: [CancelledError], raw: true).} = + backend.getLightClientBootstrap(reqId, blockRoot) + +# https://github.com/ethereum/consensus-specs/blob/v1.6.0-alpha.3/specs/altair/light-client/p2p-interface.md#lightclientupdatesbyrange +type LightClientUpdatesByRangeResponse = + NetRes[List[ForkedLightClientUpdate, MAX_REQUEST_LIGHT_CLIENT_UPDATES]] +proc doRequest( + e: typedesc[UpdatesByRange], + backend: EthLCBackend, + reqId: uint64, + key: tuple[startPeriod: SyncCommitteePeriod, count: uint64] +): Future[LightClientUpdatesByRangeResponse] {.async: (raises: [ResponseError, CancelledError]).} = + let (startPeriod, count) = key + doAssert count > 0 and count <= MAX_REQUEST_LIGHT_CLIENT_UPDATES + let response = await backend.getLightClientUpdatesByRange(reqId, startPeriod, count) + if response.isOk: + let e = distinctBase(response.get) + .checkLightClientUpdates(startPeriod, count) + if e.isErr: + raise newException(ResponseError, e.error) + return response + +# https://github.com/ethereum/consensus-specs/blob/v1.6.0-alpha.3/specs/altair/light-client/p2p-interface.md#getlightclientfinalityupdate +proc doRequest( + e: typedesc[FinalityUpdate], + backend: EthLCBackend, + reqId: uint64 +): 
Future[NetRes[ForkedLightClientFinalityUpdate]] {.async: (raises: [CancelledError], raw: true).} = + backend.getLightClientFinalityUpdate(reqId) + +# https://github.com/ethereum/consensus-specs/blob/v1.4.0-beta.5/specs/altair/light-client/p2p-interface.md#getlightclientoptimisticupdate +proc doRequest( + e: typedesc[OptimisticUpdate], + backend: EthLCBackend, + reqId: uint64 +): Future[NetRes[ForkedLightClientOptimisticUpdate]] {.async: (raises: [CancelledError], raw: true).} = + backend.getLightClientOptimisticUpdate(reqId) + +template valueVerifier[E]( + self: LightClientManager, + e: typedesc[E] +): ValueVerifier[E.V] = + when E.V is ForkedLightClientBootstrap: + self.bootstrapVerifier + elif E.V is ForkedLightClientUpdate: + self.updateVerifier + elif E.V is ForkedLightClientFinalityUpdate: + self.finalityUpdateVerifier + elif E.V is ForkedLightClientOptimisticUpdate: + self.optimisticUpdateVerifier + else: static: doAssert false + +iterator values(v: auto): auto = + ## Local helper for `workerTask` to share the same implementation for both + ## scalar and aggregate values, by treating scalars as 1-length aggregates. 
+ when v is List: + for i in v: + yield i + else: + yield v + +proc workerTask[E]( + self: LightClientManager, + e: typedesc[E], + key: E.K +): Future[bool] {.async: (raises: [CancelledError]).} = + var + didProgress = false + try: + var reqId: uint64 + self.rng[].generate(reqId) + + let value = + when E.K is Nothing: + await E.doRequest(self.backend, reqId) + else: + await E.doRequest(self.backend, reqId, key) + if value.isOk: + var applyReward = false + for val in value.get().values: + let res = await self.valueVerifier(E)(val) + if res.isErr: + case res.error + of LightClientVerifierError.MissingParent: + # Stop, requires different request to progress + return didProgress + of LightClientVerifierError.Duplicate: + # Ignore, a concurrent request may have already fulfilled this + when E.V is ForkedLightClientBootstrap: + didProgress = true + else: + discard + of LightClientVerifierError.UnviableFork: + # Descore, peer is on an incompatible fork version + withForkyObject(val): + when lcDataFork > LightClientDataFork.None: + notice "Received value from an unviable fork", value = forkyObject, endpoint = E.name + else: + notice "Received value from an unviable fork", endpoint = E.name + self.backend.reportRequestQuality(reqId, PeerScoreUnviableFork) + return didProgress + of LightClientVerifierError.Invalid: + # Descore, received data is malformed + withForkyObject(val): + when lcDataFork > LightClientDataFork.None: + warn "Received invalid value", value = forkyObject.shortLog, endpoint = E.name + else: + warn "Received invalid value", endpoint = E.name + self.backend.reportRequestQuality(reqId, PeerScoreBadValues) + return didProgress + else: + # Reward, peer returned something useful + applyReward = true + didProgress = true + if applyReward: + self.backend.reportRequestQuality(reqId, PeerScoreGoodValues) + else: + self.backend.reportRequestQuality(reqId, PeerScoreNoValues) + debug "Failed to receive value on request", value, endpoint = E.name + except ResponseError 
as exc: + warn "Received invalid response", error = exc.msg, endpoint = E.name + self.backend.reportRequestQuality(reqId, PeerScoreBadValues) + except CancelledError as exc: + raise exc + + return didProgress + +proc query[E]( + self: LightClientManager, + e: typedesc[E], + key: E.K +): Future[bool] {.async: (raises: [CancelledError]).} = + const PARALLEL_REQUESTS = 2 + var workers: array[PARALLEL_REQUESTS, Future[bool]] + + let + progressFut = Future[void].Raising([CancelledError]).init("lcmanProgress") + doneFut = Future[void].Raising([CancelledError]).init("lcmanDone") + var + numCompleted = 0 + maxCompleted = workers.len + + proc handleFinishedWorker(future: pointer) = + try: + let didProgress = cast[Future[bool]](future).read() + if didProgress and not progressFut.finished: + progressFut.complete() + except CancelledError: + if not progressFut.finished: + progressFut.cancelSoon() + except CatchableError: + discard + finally: + inc numCompleted + if numCompleted == maxCompleted: + doneFut.complete() + + try: + # Start concurrent workers + for i in 0 ..< workers.len: + try: + workers[i] = self.workerTask(e, key) + workers[i].addCallback(handleFinishedWorker) + except CancelledError as exc: + raise exc + except CatchableError: + workers[i] = newFuture[bool]() + workers[i].complete(false) + + # Wait for any worker to report progress, or for all workers to finish + try: + discard await race(progressFut, doneFut) + except ValueError: + raiseAssert "race API invariant" + finally: + for i in 0 ..< maxCompleted: + if workers[i] == nil: + maxCompleted = i + if numCompleted == maxCompleted: + doneFut.complete() + break + if not workers[i].finished: + workers[i].cancelSoon() + while true: + try: + await allFutures(workers[0 ..< maxCompleted]) + break + except CancelledError: + continue + while true: + try: + await doneFut + break + except CancelledError: + continue + + if not progressFut.finished: + progressFut.cancelSoon() + return progressFut.completed + +template 
query[E]( + self: LightClientManager, + e: typedesc[E] +): Future[bool].Raising([CancelledError]) = + self.query(e, Nothing()) + +# https://github.com/ethereum/consensus-specs/blob/v1.5.0-beta.0/specs/altair/light-client/light-client.md#light-client-sync-process +proc loop(self: LightClientManager) {.async: (raises: [CancelledError]).} = + var nextSyncTaskTime = self.getBeaconTime() + + while true: + # Periodically wake and check for changes + let wallTime = self.getBeaconTime() + if wallTime < nextSyncTaskTime: + await sleepAsync(chronos.seconds(2)) + continue + + # Obtain bootstrap data once a trusted block root is supplied + if not self.isLightClientStoreInitialized(): + let trustedBlockRoot = self.getTrustedBlockRoot() + if trustedBlockRoot.isNone: + await sleepAsync(chronos.seconds(2)) + continue + + let didProgress = await self.query(Bootstrap, trustedBlockRoot.get) + nextSyncTaskTime = + if didProgress: + wallTime + else: + wallTime + self.rng.computeDelayWithJitter(chronos.seconds(0)) + continue + + # Fetch updates + let + current = wallTime.slotOrZero().sync_committee_period + + syncTask = nextLightClientSyncTask( + current = current, + finalized = self.getFinalizedPeriod(), + optimistic = self.getOptimisticPeriod(), + isNextSyncCommitteeKnown = self.isNextSyncCommitteeKnown()) + + didProgress = + case syncTask.kind + of LcSyncKind.UpdatesByRange: + await self.query(UpdatesByRange, + (startPeriod: syncTask.startPeriod, count: syncTask.count)) + of LcSyncKind.FinalityUpdate: + haveFinalityUpdate = true + await self.query(FinalityUpdate) + of LcSyncKind.OptimisticUpdate: + await self.query(OptimisticUpdate) + + nextSyncTaskTime = + wallTime + + self.rng.nextLcSyncTaskDelay( + wallTime, + finalized = self.getFinalizedPeriod(), + optimistic = self.getOptimisticPeriod(), + isNextSyncCommitteeKnown = self.isNextSyncCommitteeKnown(), + didLatestSyncTaskProgress = didProgress + ) + +proc start*(self: var LightClientManager) = + ## Start light client manager's 
loop. + doAssert self.loopFuture == nil + self.loopFuture = self.loop() + +proc stop*(self: var LightClientManager) {.async: (raises: []).} = + ## Stop light client manager's loop. + if self.loopFuture != nil: + await noCancel self.loopFuture.cancelAndWait() + self.loopFuture = nil From eaa3aca3b12ce21bc8a196bc733e97baaea14b94 Mon Sep 17 00:00:00 2001 From: chirag-parmar Date: Mon, 13 Oct 2025 19:47:19 +0530 Subject: [PATCH 02/16] builds --- nimbus_verified_proxy/lc/lc.nim | 22 ++-- nimbus_verified_proxy/lc/lc_manager.nim | 27 +++-- .../nimbus_verified_proxy.nim | 103 ++---------------- 3 files changed, 31 insertions(+), 121 deletions(-) diff --git a/nimbus_verified_proxy/lc/lc.nim b/nimbus_verified_proxy/lc/lc.nim index 476b70e9cf..18c73de1d2 100644 --- a/nimbus_verified_proxy/lc/lc.nim +++ b/nimbus_verified_proxy/lc/lc.nim @@ -14,7 +14,6 @@ import beacon_chain/beacon_clock, ./lc_manager # use the modified light client manager - type LightClientHeaderCallback* = proc( lightClient: LightClient, header: ForkedLightClientHeader @@ -55,7 +54,6 @@ proc new*( forkDigests: ref ForkDigests, getBeaconTime: GetBeaconTimeFn, genesis_validators_root: Eth2Digest, - trustedBlockRoot: Option[Eth2Digest], finalizationMode: LightClientFinalizationMode, ): T = let lightClient = LightClient( @@ -63,7 +61,6 @@ proc new*( forkDigests: forkDigests, getBeaconTime: getBeaconTime, store: (ref ForkedLightClientStore)(), - trustedBlockRoot: trustedBlockRoot ) func getTrustedBlockRoot(): Option[Eth2Digest] = @@ -83,7 +80,7 @@ proc new*( # initialize without dumping lightClient.processor = LightClientProcessor.new( false, ".", ".", cfg, genesis_validators_root, - finalizationMode, lightClient.store, getBeaconTime, trustedBlockRoot, + finalizationMode, lightClient.store, getBeaconTime, getTrustedBlockRoot, onStoreInitialized, onFinalizedHeader, onOptimisticHeader, ) @@ -119,31 +116,30 @@ proc new*( forkyStore.is_next_sync_committee_known else: false - - func getFinalizedSlot(): Slot = + func 
getFinalizedPeriod(): SyncCommitteePeriod = withForkyStore(lightClient.store[]): when lcDataFork > LightClientDataFork.None: - forkyStore.finalized_header.beacon.slot + forkyStore.finalized_header.beacon.slot.sync_committee_period else: - GENESIS_SLOT + GENESIS_SLOT.sync_committee_period - func getOptimisticSlot(): Slot = + func getOptimisticPeriod(): SyncCommitteePeriod = withForkyStore(lightClient.store[]): when lcDataFork > LightClientDataFork.None: - forkyStore.optimistic_header.beacon.slot + forkyStore.optimistic_header.beacon.slot.sync_committee_period else: - GENESIS_SLOT + GENESIS_SLOT.sync_committee_period lightClient.manager = LightClientManager.init( rng, getTrustedBlockRoot, bootstrapVerifier, updateVerifier, finalityVerifier, optimisticVerifier, isLightClientStoreInitialized, - isNextSyncCommitteeKnown, getFinalizedSlot, getOptimisticSlot, getBeaconTime, + isNextSyncCommitteeKnown, getFinalizedPeriod, getOptimisticPeriod, getBeaconTime, ) lightClient proc start*(lightClient: LightClient) = - info "Starting beacon light client", trusted_block_root = lightClient.trustedBlockRoot() + info "Starting beacon light client", trusted_block_root = lightClient.trustedBlockRoot lightClient.manager.start() proc stop*(lightClient: LightClient) {.async: (raises: []).} = diff --git a/nimbus_verified_proxy/lc/lc_manager.nim b/nimbus_verified_proxy/lc/lc_manager.nim index a2d22391f9..eb9e216ec2 100644 --- a/nimbus_verified_proxy/lc/lc_manager.nim +++ b/nimbus_verified_proxy/lc/lc_manager.nim @@ -11,11 +11,14 @@ import chronos, chronicles import beacon_chain/beacon_clock, beacon_chain/networking/peer_scores, - beacon_chain/[light_client_sync_helpers, sync_manager] + beacon_chain/sync/[light_client_sync_helpers, sync_manager] logScope: topics = "lcman" +const + MAX_REQUEST_LIGHT_CLIENT_UPDATES = 128 + type Nothing = object ResponseError = object of CatchableError @@ -32,6 +35,7 @@ type OptimisticUpdate = Endpoint[Nothing, ForkedLightClientOptimisticUpdate] + NetRes*[T] = 
Result[T, void] ValueVerifier[V] = proc(v: V): Future[Result[void, LightClientVerifierError]] {.async: (raises: [CancelledError]).} BootstrapVerifier* = @@ -50,16 +54,18 @@ type GetSyncCommitteePeriodCallback* = proc(): SyncCommitteePeriod {.gcsafe, raises: [].} - LightClientBootstrapProc = proc(id: uint64, blockRoot: Eth2Digest): Future[NetRes[ForkedLightClientBootstrap]] - LightClientUpdatesByRangeProc = proc(id: uint64, startPeriod: SyncCommitteePeriod, count: uint64): Future[NetRes[ForkedLightClientUpdateList]] - LightClientFinalityUpdateProc = proc(id: uint64): Future[NetRes[ForkedLightClientFinalityUpdate]] - LightClientOptimisticUpdateProc = proc(id: uint64): Future[NetRes[ForkedLightClientOptimisticUpdate]] - ReportResponseQualityProc = proc(id: uint64, value: int) + LightClientUpdatesByRangeResponse = NetRes[List[ForkedLightClientUpdate, MAX_REQUEST_LIGHT_CLIENT_UPDATES]] + + LightClientBootstrapProc = proc(id: uint64, blockRoot: Eth2Digest): Future[NetRes[ForkedLightClientBootstrap]] {.async: (raises: [CancelledError]).} + LightClientUpdatesByRangeProc = proc(id: uint64, startPeriod: SyncCommitteePeriod, count: uint64): Future[LightClientUpdatesByRangeResponse] {.async: (raises: [CancelledError]).} + LightClientFinalityUpdateProc = proc(id: uint64): Future[NetRes[ForkedLightClientFinalityUpdate]] {.async: (raises: [CancelledError]).} + LightClientOptimisticUpdateProc = proc(id: uint64): Future[NetRes[ForkedLightClientOptimisticUpdate]] {.async: (raises: [CancelledError]).} + ReportRequestQualityProc = proc(id: uint64, value: int) {.gcsafe, raises: [].} EthLCBackend* = object getLightClientBootstrap: LightClientBootstrapProc getLightClientUpdatesByRange: LightClientUpdatesByRangeProc - getLightClientFinalityUpdate: LightClientFInalityUpdateProc + getLightClientFinalityUpdate: LightClientFinalityUpdateProc getLightClientOptimisticUpdate: LightClientOptimisticUpdateProc reportRequestQuality: ReportRequestQualityProc @@ -116,8 +122,6 @@ proc doRequest( 
backend.getLightClientBootstrap(reqId, blockRoot) # https://github.com/ethereum/consensus-specs/blob/v1.6.0-alpha.3/specs/altair/light-client/p2p-interface.md#lightclientupdatesbyrange -type LightClientUpdatesByRangeResponse = - NetRes[List[ForkedLightClientUpdate, MAX_REQUEST_LIGHT_CLIENT_UPDATES]] proc doRequest( e: typedesc[UpdatesByRange], backend: EthLCBackend, @@ -180,8 +184,8 @@ proc workerTask[E]( ): Future[bool] {.async: (raises: [CancelledError]).} = var didProgress = false + reqId: uint64 try: - var reqId: uint64 self.rng[].generate(reqId) let value = @@ -232,8 +236,8 @@ proc workerTask[E]( self.backend.reportRequestQuality(reqId, PeerScoreNoValues) debug "Failed to receive value on request", value, endpoint = E.name except ResponseError as exc: - warn "Received invalid response", error = exc.msg, endpoint = E.name self.backend.reportRequestQuality(reqId, PeerScoreBadValues) + warn "Received invalid response", error = exc.msg, endpoint = E.name except CancelledError as exc: raise exc @@ -360,7 +364,6 @@ proc loop(self: LightClientManager) {.async: (raises: [CancelledError]).} = await self.query(UpdatesByRange, (startPeriod: syncTask.startPeriod, count: syncTask.count)) of LcSyncKind.FinalityUpdate: - haveFinalityUpdate = true await self.query(FinalityUpdate) of LcSyncKind.OptimisticUpdate: await self.query(OptimisticUpdate) diff --git a/nimbus_verified_proxy/nimbus_verified_proxy.nim b/nimbus_verified_proxy/nimbus_verified_proxy.nim index 43460dc519..9ffab73e98 100644 --- a/nimbus_verified_proxy/nimbus_verified_proxy.nim +++ b/nimbus_verified_proxy/nimbus_verified_proxy.nim @@ -14,17 +14,18 @@ import confutils, eth/common/[keys, eth_types_rlp], json_rpc/rpcproxy, - beacon_chain/gossip_processing/optimistic_processor, + beacon_chain/gossip_processing/light_client_processor, beacon_chain/networking/network_metadata, - beacon_chain/networking/topic_params, beacon_chain/spec/beaconstate, - beacon_chain/[beacon_clock, buildinfo, light_client, 
nimbus_binary_common], + beacon_chain/conf, + beacon_chain/[beacon_clock, buildinfo, nimbus_binary_common], ../execution_chain/common/common, ./nimbus_verified_proxy_conf, ./engine/engine, ./engine/header_store, ./engine/utils, ./engine/types, + ./lc/lc, ./json_rpc_backend, ./json_rpc_frontend, ../execution_chain/version_info @@ -78,7 +79,7 @@ proc run*( try: notice "Launching Nimbus verified proxy", - version = fullVersionStr, cmdParams = commandLineParams(), config + version = FullVersionStr, cmdParams = commandLineParams(), config except Exception: notice "commandLineParams() exception" @@ -156,27 +157,11 @@ proc run*( let rng = keys.newRng() - netKeys = getRandomNetKeys(rng[]) - - network = createEth2Node( - rng, lcConfig, netKeys, cfg, forkDigests, getBeaconTime, genesis_validators_root - ) - # light client is set to optimistic finalization mode - lightClient = createLightClient( - network, rng, lcConfig, cfg, forkDigests, getBeaconTime, genesis_validators_root, - LightClientFinalizationMode.Optimistic, + lightClient = LightClient.new( + rng, cfg, forkDigests, getBeaconTime, genesis_validators_root, LightClientFinalizationMode.Optimistic, ) - # registerbasic p2p protocols for maintaing peers ping/status/get_metadata/... etc. 
- network.registerProtocol( - PeerSync, - PeerSync.NetworkState.init(cfg, forkDigests, genesisBlockRoot, getBeaconTime), - ) - - # start the p2p network and rpcProxy - waitFor network.startListening() - waitFor network.start() # verify chain id that the proxy is connected to waitFor engine.verifyChaindId() @@ -221,89 +206,15 @@ proc run*( lightClient.onFinalizedHeader = onFinalizedHeader lightClient.onOptimisticHeader = onOptimisticHeader lightClient.trustedBlockRoot = some config.trustedBlockRoot - lightClient.installMessageValidators() - func shouldSyncOptimistically(wallSlot: Slot): bool = - let optimisticHeader = lightClient.optimisticHeader - withForkyHeader(optimisticHeader): - when lcDataFork > LightClientDataFork.None: - # Check whether light client has synced sufficiently close to wall slot - const maxAge = 2 * SLOTS_PER_EPOCH - forkyHeader.beacon.slot >= max(wallSlot, maxAge.Slot) - maxAge - else: - false - - var blocksGossipState: GossipState - proc updateBlocksGossipStatus(slot: Slot) = - let - isBehind = not shouldSyncOptimistically(slot) - - targetGossipState = getTargetGossipState(slot.epoch, cfg, isBehind) - - template currentGossipState(): auto = - blocksGossipState - - if currentGossipState == targetGossipState: - return - - if currentGossipState.card == 0 and targetGossipState.card > 0: - debug "Enabling blocks topic subscriptions", wallSlot = slot, targetGossipState - elif currentGossipState.card > 0 and targetGossipState.card == 0: - debug "Disabling blocks topic subscriptions", wallSlot = slot - else: - # Individual forks added / removed - discard - - let - newGossipEpochs = targetGossipState - currentGossipState - oldGossipEpochs = currentGossipState - targetGossipState - - for gossipEpoch in oldGossipEpochs: - let forkDigest = forkDigests[].atEpoch(gossipEpoch, cfg) - network.unsubscribe(getBeaconBlocksTopic(forkDigest)) - - for gossipEpoch in newGossipEpochs: - let forkDigest = forkDigests[].atEpoch(gossipEpoch, cfg) - network.subscribe( - 
getBeaconBlocksTopic(forkDigest), - getBlockTopicParams(cfg.timeParams), - enableTopicMetrics = true, - ) - - blocksGossipState = targetGossipState - - proc updateGossipStatus(time: Moment) = - let wallSlot = getBeaconTime().slotOrZero(cfg.timeParams) - updateBlocksGossipStatus(wallSlot + 1) - lightClient.updateGossipStatus(wallSlot + 1) - - # updates gossip status every second every second - proc runOnSecondLoop() {.async.} = - let sleepTime = chronos.seconds(1) - while true: - let start = chronos.now(chronos.Moment) - await chronos.sleepAsync(sleepTime) - let afterSleep = chronos.now(chronos.Moment) - let sleepTime = afterSleep - start - updateGossipStatus(start) - let finished = chronos.now(chronos.Moment) - let processingTime = finished - afterSleep - trace "onSecond task completed", sleepTime, processingTime - - # update gossip status before starting the light client - updateGossipStatus(Moment.now()) # start the light client lightClient.start() - # launch a async routine - asyncSpawn runOnSecondLoop() - # run an infinite loop and wait for a stop signal while true: poll() if ctx != nil and ctx.stop: # Cleanup - waitFor network.stop() waitFor jsonRpcClient.stop() waitFor jsonRpcServer.stop() ctx.cleanup() From 4db88daa923ec9d9859b6b01bc72d9c349f26d38 Mon Sep 17 00:00:00 2001 From: chirag-parmar Date: Tue, 14 Oct 2025 19:27:37 +0530 Subject: [PATCH 03/16] download per slot --- nimbus_verified_proxy/json_lc_backend.nim | 118 +++++++++++++++ nimbus_verified_proxy/lc/lc.nim | 18 ++- nimbus_verified_proxy/lc/lc_manager.nim | 142 ++++++++++-------- .../nimbus_verified_proxy.nim | 18 ++- .../nimbus_verified_proxy_conf.nim | 94 +----------- 5 files changed, 225 insertions(+), 165 deletions(-) create mode 100644 nimbus_verified_proxy/json_lc_backend.nim diff --git a/nimbus_verified_proxy/json_lc_backend.nim b/nimbus_verified_proxy/json_lc_backend.nim new file mode 100644 index 0000000000..8908fd34a5 --- /dev/null +++ b/nimbus_verified_proxy/json_lc_backend.nim @@ -0,0 
+1,118 @@ +# nimbus_verified_proxy +# Copyright (c) 2025 Status Research & Development GmbH +# Licensed and distributed under either of +# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). +# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). +# at your option. This file may not be copied, modified, or distributed except according to those terms. + +{.push raises: [], gcsafe.} + +import + stint, + chronos, + chronicles, + presto/client, + beacon_chain/spec/eth2_apis/rest_light_client_calls, + beacon_chain/spec/presets, + beacon_chain/spec/forks, + ./lc/lc_manager, + ./nimbus_verified_proxy_conf + +logScope: + topics = "SSZLCRestClient" + +const + MaxMessageBodyBytes* = 128 * 1024 * 1024 # 128 MB (JSON encoded) + BASE_URL="/eth/v1/beacon/light_client" + +type + LCRestPeer = ref object + score: int + restClient: RestClientRef + + LCRestClient* = ref object + cfg: RuntimeConfig + forkDigests: ref ForkDigests + peers: seq[LCRestPeer] + urls: seq[string] + +func new*(T: type LCRestClient, cfg: RuntimeConfig, forkDigests: ref ForkDigests): LCRestClient = + LCRestClient(cfg: cfg, forkDigests: forkDigests, peers: @[]) + +proc addEndpoint*(client: LCRestClient, endpoint: string) {.raises: [ValueError].} = + if endpoint in client.urls: + raise newException(ValueError, "Endpoint already added") + + let restClient = RestClientRef.new(endpoint).valueOr: + raise newException(ValueError, $error) + + client.peers.add(LCRestPeer(score: 0, restClient: restClient)) + client.urls.add(endpoint) + +proc closeAll*(client: LCRestClient) {.async: (raises: []).} = + for peer in client.peers: + await peer.restClient.closeWait() + + client.peers.setLen(0) + client.urls.setLen(0) + +proc getEthLCBackend*(client: LCRestClient): EthLCBackend = + + let + getLCBootstrapProc = proc(reqId: uint64, blockRoot: Eth2Digest): Future[NetRes[ForkedLightClientBootstrap]] {.async: (raises: 
[CancelledError]).} = + let + peer = client.peers[reqId mod uint64(client.peers.len)] + res = + try: + await peer.restClient.getLightClientBootstrap(blockRoot, client.cfg, client.forkDigests) + except CatchableError as e: + raise newException(CancelledError, e.msg) + + ok(res) + + getLCUpdatesProc = proc(reqId: uint64, startPeriod: SyncCommitteePeriod, count: uint64): Future[LightClientUpdatesByRangeResponse] {.async: (raises: [CancelledError]).} = + let + peer = client.peers[reqId mod uint64(client.peers.len)] + res = + try: + await peer.restClient.getLightClientUpdatesByRange(startPeriod, count, client.cfg, client.forkDigests) + except CatchableError as e: + raise newException(CancelledError, e.msg) + + ok(res) + + getLCFinalityProc = proc(reqId: uint64): Future[NetRes[ForkedLightClientFinalityUpdate]] {.async: (raises: [CancelledError]).} = + let + peer = client.peers[reqId mod uint64(client.peers.len)] + res = + try: + await peer.restClient.getLightClientFinalityUpdate(client.cfg, client.forkDigests) + except CatchableError as e: + raise newException(CancelledError, e.msg) + + ok(res) + + getLCOptimisticProc = proc(reqId: uint64): Future[NetRes[ForkedLightClientOptimisticUpdate]] {.async: (raises: [CancelledError]).} = + let + peer = client.peers[reqId mod uint64(client.peers.len)] + res = + try: + await peer.restClient.getLightClientOptimisticUpdate(client.cfg, client.forkDigests) + except CatchableError as e: + raise newException(CancelledError, e.msg) + + ok(res) + + updateScoreProc = proc(reqId: uint64, value: int) = + let peer = client.peers[reqId mod uint64(client.peers.len)] + peer.score += value + + EthLCBackend( + getLightClientBootstrap: getLCBootstrapProc, + getLightClientUpdatesByRange: getLCUpdatesProc, + getLightClientFinalityUpdate: getLCFinalityProc, + getLightClientOptimisticUpdate: getLCOptimisticProc, + updateScore: updateScoreProc, + ) + + diff --git a/nimbus_verified_proxy/lc/lc.nim b/nimbus_verified_proxy/lc/lc.nim index 
18c73de1d2..9ed807b820 100644 --- a/nimbus_verified_proxy/lc/lc.nim +++ b/nimbus_verified_proxy/lc/lc.nim @@ -116,28 +116,32 @@ proc new*( forkyStore.is_next_sync_committee_known else: false - func getFinalizedPeriod(): SyncCommitteePeriod = + + func getFinalizedSlot(): Slot = withForkyStore(lightClient.store[]): when lcDataFork > LightClientDataFork.None: - forkyStore.finalized_header.beacon.slot.sync_committee_period + forkyStore.finalized_header.beacon.slot else: - GENESIS_SLOT.sync_committee_period + GENESIS_SLOT - func getOptimisticPeriod(): SyncCommitteePeriod = + func getOptimisticSlot(): Slot = withForkyStore(lightClient.store[]): when lcDataFork > LightClientDataFork.None: - forkyStore.optimistic_header.beacon.slot.sync_committee_period + forkyStore.optimistic_header.beacon.slot else: - GENESIS_SLOT.sync_committee_period + GENESIS_SLOT lightClient.manager = LightClientManager.init( rng, getTrustedBlockRoot, bootstrapVerifier, updateVerifier, finalityVerifier, optimisticVerifier, isLightClientStoreInitialized, - isNextSyncCommitteeKnown, getFinalizedPeriod, getOptimisticPeriod, getBeaconTime, + isNextSyncCommitteeKnown, getFinalizedSlot, getOptimisticSlot, getBeaconTime, ) lightClient +proc setBackend*(lightClient: LightClient, backend: EthLCBackend) = + lightClient.manager.backend = backend + proc start*(lightClient: LightClient) = info "Starting beacon light client", trusted_block_root = lightClient.trustedBlockRoot lightClient.manager.start() diff --git a/nimbus_verified_proxy/lc/lc_manager.nim b/nimbus_verified_proxy/lc/lc_manager.nim index eb9e216ec2..9c4900eb44 100644 --- a/nimbus_verified_proxy/lc/lc_manager.nim +++ b/nimbus_verified_proxy/lc/lc_manager.nim @@ -51,23 +51,23 @@ type proc(): Option[Eth2Digest] {.gcsafe, raises: [].} GetBoolCallback* = proc(): bool {.gcsafe, raises: [].} - GetSyncCommitteePeriodCallback* = - proc(): SyncCommitteePeriod {.gcsafe, raises: [].} + GetSlotCallback* = + proc(): Slot {.gcsafe, raises: [].} - 
LightClientUpdatesByRangeResponse = NetRes[List[ForkedLightClientUpdate, MAX_REQUEST_LIGHT_CLIENT_UPDATES]] + LightClientUpdatesByRangeResponse* = NetRes[seq[ForkedLightClientUpdate]] LightClientBootstrapProc = proc(id: uint64, blockRoot: Eth2Digest): Future[NetRes[ForkedLightClientBootstrap]] {.async: (raises: [CancelledError]).} LightClientUpdatesByRangeProc = proc(id: uint64, startPeriod: SyncCommitteePeriod, count: uint64): Future[LightClientUpdatesByRangeResponse] {.async: (raises: [CancelledError]).} LightClientFinalityUpdateProc = proc(id: uint64): Future[NetRes[ForkedLightClientFinalityUpdate]] {.async: (raises: [CancelledError]).} LightClientOptimisticUpdateProc = proc(id: uint64): Future[NetRes[ForkedLightClientOptimisticUpdate]] {.async: (raises: [CancelledError]).} - ReportRequestQualityProc = proc(id: uint64, value: int) {.gcsafe, raises: [].} + UpdateScoreProc = proc(id: uint64, value: int) {.gcsafe, raises: [].} EthLCBackend* = object - getLightClientBootstrap: LightClientBootstrapProc - getLightClientUpdatesByRange: LightClientUpdatesByRangeProc - getLightClientFinalityUpdate: LightClientFinalityUpdateProc - getLightClientOptimisticUpdate: LightClientOptimisticUpdateProc - reportRequestQuality: ReportRequestQualityProc + getLightClientBootstrap*: LightClientBootstrapProc + getLightClientUpdatesByRange*: LightClientUpdatesByRangeProc + getLightClientFinalityUpdate*: LightClientFinalityUpdateProc + getLightClientOptimisticUpdate*: LightClientOptimisticUpdateProc + updateScore*: UpdateScoreProc LightClientManager* = object rng: ref HmacDrbgContext @@ -79,8 +79,8 @@ type optimisticUpdateVerifier: OptimisticUpdateVerifier isLightClientStoreInitialized: GetBoolCallback isNextSyncCommitteeKnown: GetBoolCallback - getFinalizedPeriod: GetSyncCommitteePeriodCallback - getOptimisticPeriod: GetSyncCommitteePeriodCallback + getFinalizedSlot: GetSlotCallback + getOptimisticSlot: GetSlotCallback getBeaconTime: GetBeaconTimeFn loopFuture: 
Future[void].Raising([CancelledError]) @@ -94,8 +94,8 @@ func init*( optimisticUpdateVerifier: OptimisticUpdateVerifier, isLightClientStoreInitialized: GetBoolCallback, isNextSyncCommitteeKnown: GetBoolCallback, - getFinalizedPeriod: GetSyncCommitteePeriodCallback, - getOptimisticPeriod: GetSyncCommitteePeriodCallback, + getFinalizedSlot: GetSlotCallback, + getOptimisticSlot: GetSlotCallback, getBeaconTime: GetBeaconTimeFn, ): LightClientManager = ## Initialize light client manager. @@ -108,8 +108,8 @@ func init*( optimisticUpdateVerifier: optimisticUpdateVerifier, isLightClientStoreInitialized: isLightClientStoreInitialized, isNextSyncCommitteeKnown: isNextSyncCommitteeKnown, - getFinalizedPeriod: getFinalizedPeriod, - getOptimisticPeriod: getOptimisticPeriod, + getFinalizedSlot: getFinalizedSlot, + getOptimisticSlot: getOptimisticSlot, getBeaconTime: getBeaconTime) # https://github.com/ethereum/consensus-specs/blob/v1.6.0-alpha.3/specs/altair/light-client/p2p-interface.md#getlightclientbootstrap @@ -171,7 +171,7 @@ template valueVerifier[E]( iterator values(v: auto): auto = ## Local helper for `workerTask` to share the same implementation for both ## scalar and aggregate values, by treating scalars as 1-length aggregates. 
- when v is List: + when v is seq: for i in v: yield i else: @@ -215,7 +215,7 @@ proc workerTask[E]( notice "Received value from an unviable fork", value = forkyObject, endpoint = E.name else: notice "Received value from an unviable fork", endpoint = E.name - self.backend.reportRequestQuality(reqId, PeerScoreUnviableFork) + self.backend.updateScore(reqId, PeerScoreUnviableFork) return didProgress of LightClientVerifierError.Invalid: # Descore, received data is malformed @@ -224,19 +224,19 @@ proc workerTask[E]( warn "Received invalid value", value = forkyObject.shortLog, endpoint = E.name else: warn "Received invalid value", endpoint = E.name - self.backend.reportRequestQuality(reqId, PeerScoreBadValues) + self.backend.updateScore(reqId, PeerScoreBadValues) return didProgress else: # Reward, peer returned something useful applyReward = true didProgress = true if applyReward: - self.backend.reportRequestQuality(reqId, PeerScoreGoodValues) + self.backend.updateScore(reqId, PeerScoreGoodValues) else: - self.backend.reportRequestQuality(reqId, PeerScoreNoValues) + self.backend.updateScore(reqId, PeerScoreNoValues) debug "Failed to receive value on request", value, endpoint = E.name except ResponseError as exc: - self.backend.reportRequestQuality(reqId, PeerScoreBadValues) + self.backend.updateScore(reqId, PeerScoreBadValues) warn "Received invalid response", error = exc.msg, endpoint = E.name except CancelledError as exc: raise exc @@ -324,59 +324,77 @@ template query[E]( # https://github.com/ethereum/consensus-specs/blob/v1.5.0-beta.0/specs/altair/light-client/light-client.md#light-client-sync-process proc loop(self: LightClientManager) {.async: (raises: [CancelledError]).} = - var nextSyncTaskTime = self.getBeaconTime() + var + downloadOptimistic = true + downloadFinality = false + didOptimisticProgress = false + didFinalityProgress = false while true: - # Periodically wake and check for changes - let wallTime = self.getBeaconTime() - if wallTime < nextSyncTaskTime: 
- await sleepAsync(chronos.seconds(2)) - continue + let + wallTime = self.getBeaconTime() + currentSlot = wallTime.slotOrZero() + currentEpoch = currentSlot.epoch + currentPeriod = currentSlot.sync_committee_period + finalizedSlot = self.getFinalizedSlot() + finalizedPeriod = finalizedSlot.sync_committee_period + finalizedEpoch = finalizedSlot.epoch + optimisticSlot = self.getOptimisticSlot() + optimisticPeriod = optimisticSlot.sync_committee_period + optimisticEpoch = optimisticSlot.epoch # Obtain bootstrap data once a trusted block root is supplied if not self.isLightClientStoreInitialized(): let trustedBlockRoot = self.getTrustedBlockRoot() + + # reattempt bootstrap download in 2 seconds if trustedBlockRoot.isNone: + debug "TrustedBlockRoot unavailable, re-attempting bootstrap download" await sleepAsync(chronos.seconds(2)) continue let didProgress = await self.query(Bootstrap, trustedBlockRoot.get) - nextSyncTaskTime = - if didProgress: - wallTime - else: - wallTime + self.rng.computeDelayWithJitter(chronos.seconds(0)) - continue - # Fetch updates - let - current = wallTime.slotOrZero().sync_committee_period - - syncTask = nextLightClientSyncTask( - current = current, - finalized = self.getFinalizedPeriod(), - optimistic = self.getOptimisticPeriod(), - isNextSyncCommitteeKnown = self.isNextSyncCommitteeKnown()) - - didProgress = - case syncTask.kind - of LcSyncKind.UpdatesByRange: - await self.query(UpdatesByRange, - (startPeriod: syncTask.startPeriod, count: syncTask.count)) - of LcSyncKind.FinalityUpdate: - await self.query(FinalityUpdate) - of LcSyncKind.OptimisticUpdate: - await self.query(OptimisticUpdate) - - nextSyncTaskTime = - wallTime + - self.rng.nextLcSyncTaskDelay( - wallTime, - finalized = self.getFinalizedPeriod(), - optimistic = self.getOptimisticPeriod(), - isNextSyncCommitteeKnown = self.isNextSyncCommitteeKnown(), - didLatestSyncTaskProgress = didProgress - ) + # reattempt bootstrap download in 2
seconds + if not didProgress: + debug "Re-attempting bootstrap download" + await sleepAsync(chronos.seconds(2)) + continue + + # check and download sync committee updates + if finalizedPeriod == optimisticPeriod and not self.isNextSyncCommitteeKnown(): + if finalizedPeriod >= currentPeriod: + debug "Downloading light client sync committee updates", start_period=finalizedPeriod, count=1 + discard await self.query(UpdatesByRange, (startPeriod: finalizedPeriod, count: uint64(1))) + else: + let count = min(currentPeriod - finalizedPeriod, MAX_REQUEST_LIGHT_CLIENT_UPDATES) + debug "Downloading light client sync committee updates", start_period=finalizedPeriod, count=count + discard await self.query(UpdatesByRange, (startPeriod: finalizedPeriod, count: uint64(count))) + elif finalizedPeriod + 1 < currentPeriod: + let count = min(currentPeriod - (finalizedPeriod + 1), MAX_REQUEST_LIGHT_CLIENT_UPDATES) + debug "Downloading light client sync committee updates", start_period=finalizedPeriod, count=count + discard await self.query(UpdatesByRange, (startPeriod: finalizedPeriod, count: uint64(count))) + + # check and download optimistic update + if optimisticSlot < currentSlot: + debug "Downloading light client optimistic updates", slot=currentSlot + let didProgress = await self.query(OptimisticUpdate) + if not didProgress: + # retry in 2 seconds + await sleepAsync(chronos.seconds(2)) + continue + + # check and download finality update + if currentEpoch > finalizedEpoch + 2: + debug "Downloading light client finality updates", slot=currentSlot + let didProgress = await self.query(FinalityUpdate) + if not didProgress: + # retry in two seconds + await sleepAsync(chronos.seconds(2)) + continue + + # check for updates every slot + await sleepAsync(chronos.seconds(int64(SECONDS_PER_SLOT))) proc start*(self: var LightClientManager) = ## Start light client manager's loop. 
diff --git a/nimbus_verified_proxy/nimbus_verified_proxy.nim b/nimbus_verified_proxy/nimbus_verified_proxy.nim index 9ffab73e98..024db5ddf4 100644 --- a/nimbus_verified_proxy/nimbus_verified_proxy.nim +++ b/nimbus_verified_proxy/nimbus_verified_proxy.nim @@ -26,6 +26,7 @@ import ./engine/utils, ./engine/types, ./lc/lc, + ./json_lc_backend, ./json_rpc_backend, ./json_rpc_frontend, ../execution_chain/version_info @@ -148,13 +149,6 @@ proc run*( genesisBlockRoot = get_initial_beacon_block(genesisState[]).root - # transform the config to fit as a light client config and as a p2p node(Eth2Node) config - var lcConfig = config.asLightClientConf() - for node in metadata.bootstrapNodes: - lcConfig.bootstrapNodes.add node - - # create new network keys, create a p2p node(Eth2Node) and create a light client - let rng = keys.newRng() # light client is set to optimistic finalization mode @@ -162,6 +156,15 @@ proc run*( rng, cfg, forkDigests, getBeaconTime, genesis_validators_root, LightClientFinalizationMode.Optimistic, ) + # REST client for json LC updates + lcRestClient = LCRestClient.new(cfg, forkDigests) + + debugEcho config.lcEndpoint + + # add endpoints to the client + lcRestClient.addEndpoint(config.lcEndpoint) + lightClient.setBackend(lcRestClient.getEthLCBackend()) + # verify chain id that the proxy is connected to waitFor engine.verifyChaindId() @@ -215,6 +218,7 @@ proc run*( poll() if ctx != nil and ctx.stop: # Cleanup + waitFor lcRestClient.closeAll() waitFor jsonRpcClient.stop() waitFor jsonRpcServer.stop() ctx.cleanup() diff --git a/nimbus_verified_proxy/nimbus_verified_proxy_conf.nim b/nimbus_verified_proxy/nimbus_verified_proxy_conf.nim index 3579a2fe2a..a6dbed3ccf 100644 --- a/nimbus_verified_proxy/nimbus_verified_proxy_conf.nim +++ b/nimbus_verified_proxy/nimbus_verified_proxy_conf.nim @@ -114,71 +114,13 @@ type VerifiedProxyConf* = object name: "frontend-url" .}: Web3Url - # Libp2p - bootstrapNodes* {. 
- desc: "Specifies one or more bootstrap nodes to use when connecting to the network" - abbr: "b" - name: "bootstrap-node" .}: seq[string] - - bootstrapNodesFile* {. - desc: "Specifies a line-delimited file of bootstrap Ethereum network addresses" - defaultValue: "" - name: "bootstrap-file" .}: InputFile - - listenAddress* {. - desc: "Listening address for the Ethereum LibP2P and Discovery v5 traffic" - name: "listen-address" .}: Option[IpAddress] - - tcpPort* {. - desc: "Listening TCP port for Ethereum LibP2P traffic" - defaultValue: defaultEth2TcpPort - defaultValueDesc: $defaultEth2TcpPortDesc - name: "tcp-port" .}: Port - - udpPort* {. - desc: "Listening UDP port for node discovery" - defaultValue: defaultEth2TcpPort - defaultValueDesc: $defaultEth2TcpPortDesc - name: "udp-port" .}: Port - - # TODO: Select a lower amount of peers. - maxPeers* {. - desc: "The target number of peers to connect to", - defaultValue: 160, # 5 (fanout) * 64 (subnets) / 2 (subs) for a healthy mesh - name: "max-peers" - .}: int - - hardMaxPeers* {. - desc: "The maximum number of peers to connect to. Defaults to maxPeers * 1.5" - name: "hard-max-peers" .}: Option[int] - - nat* {. - desc: "Specify method to use for determining public address. " & - "Must be one of: any, none, upnp, pmp, extip:" - defaultValue: NatConfig(hasExtIp: false, nat: NatAny) - defaultValueDesc: "any" - name: "nat" .}: NatConfig - - enrAutoUpdate* {. - desc: "Discovery can automatically update its ENR with the IP address " & - "and UDP port as seen by other nodes it communicates with. " & - "This option allows to enable/disable this functionality" - defaultValue: false - name: "enr-auto-update" .}: bool - - agentString* {. - defaultValue: "nimbus", - desc: "Node agent string which is used as identifier in the LibP2P network", - name: "agent-string" + # (Untrusted) web3 provider + # No default - Needs to be provided by the user + lcEndpoint* {. 
+ desc: "comma separated URLs of the light client data provider", + name: "lc-endpoint" .}: string - discv5Enabled* {.desc: "Enable Discovery v5", defaultValue: true, name: "discv5".}: - bool - - directPeers* {. - desc: "The list of priviledged, secure and known peers to connect and maintain the connection to, this requires a not random netkey-file. In the complete multiaddress format like: /ip4/
/tcp//p2p/. Peering agreements are established out of band and must be reciprocal." - name: "direct-peer" .}: seq[string] - #!fmt: on proc parseCmdArg*(T: type Web3Url, p: string): T {.raises: [ValueError].} = @@ -198,32 +140,6 @@ proc parseCmdArg*(T: type Web3Url, p: string): T {.raises: [ValueError].} = proc completeCmdArg*(T: type Web3Url, val: string): seq[string] = return @[] -func asLightClientConf*(pc: VerifiedProxyConf): LightClientConf = - return LightClientConf( - configFile: pc.configFile, - logLevel: pc.logLevel, - logStdout: pc.logStdout, - logFile: none(OutFile), - dataDirFlag: pc.dataDirFlag, - eth2Network: pc.eth2Network, - bootstrapNodes: pc.bootstrapNodes, - bootstrapNodesFile: pc.bootstrapNodesFile, - listenAddress: pc.listenAddress, - tcpPort: pc.tcpPort, - udpPort: pc.udpPort, - maxPeers: pc.maxPeers, - hardMaxPeers: pc.hardMaxPeers, - nat: pc.nat, - enrAutoUpdate: pc.enrAutoUpdate, - agentString: pc.agentString, - discv5Enabled: pc.discv5Enabled, - directPeers: pc.directPeers, - trustedBlockRoot: pc.trustedBlockRoot, - web3Urls: @[EngineApiUrlConfigValue(url: pc.backendUrl.web3Url)], - jwtSecret: none(InputFile), - stopAtEpoch: 0, - ) - # TODO: Cannot use ClientConfig in VerifiedProxyConf due to the fact that # it contain `set[TLSFlags]` which does not have proper toml serialization func asClientConfig*(url: Web3Url): ClientConfig = From 4cc71f9c5131c4fc96b6642c1e817c738974cdeb Mon Sep 17 00:00:00 2001 From: chirag-parmar Date: Tue, 21 Oct 2025 15:47:39 +0530 Subject: [PATCH 04/16] format --- nimbus_verified_proxy/json_lc_backend.nim | 59 +++--- nimbus_verified_proxy/lc/lc.nim | 12 +- nimbus_verified_proxy/lc/lc_manager.nim | 168 ++++++++++-------- .../nimbus_verified_proxy.nim | 3 +- 4 files changed, 136 insertions(+), 106 deletions(-) diff --git a/nimbus_verified_proxy/json_lc_backend.nim b/nimbus_verified_proxy/json_lc_backend.nim index 8908fd34a5..abcf584c5c 100644 --- a/nimbus_verified_proxy/json_lc_backend.nim +++ 
b/nimbus_verified_proxy/json_lc_backend.nim @@ -21,9 +21,9 @@ import logScope: topics = "SSZLCRestClient" -const - MaxMessageBodyBytes* = 128 * 1024 * 1024 # 128 MB (JSON encoded) - BASE_URL="/eth/v1/beacon/light_client" +const + MaxMessageBodyBytes* = 128 * 1024 * 1024 # 128 MB (JSON encoded) + BASE_URL = "/eth/v1/beacon/light_client" type LCRestPeer = ref object @@ -36,7 +36,9 @@ type peers: seq[LCRestPeer] urls: seq[string] -func new*(T: type LCRestClient, cfg: RuntimeConfig, forkDigests: ref ForkDigests): LCRestClient = +func new*( + T: type LCRestClient, cfg: RuntimeConfig, forkDigests: ref ForkDigests +): LCRestClient = LCRestClient(cfg: cfg, forkDigests: forkDigests, peers: @[]) proc addEndpoint*(client: LCRestClient, endpoint: string) {.raises: [ValueError].} = @@ -57,53 +59,72 @@ proc closeAll*(client: LCRestClient) {.async: (raises: []).} = client.urls.setLen(0) proc getEthLCBackend*(client: LCRestClient): EthLCBackend = - let - getLCBootstrapProc = proc(reqId: uint64, blockRoot: Eth2Digest): Future[NetRes[ForkedLightClientBootstrap]] {.async: (raises: [CancelledError]).} = + getLCBootstrapProc = proc( + reqId: uint64, blockRoot: Eth2Digest + ): Future[NetRes[ForkedLightClientBootstrap]] {.async: (raises: [CancelledError]).} = let peer = client.peers[reqId mod uint64(client.peers.len)] - res = + res = try: - await peer.restClient.getLightClientBootstrap(blockRoot, client.cfg, client.forkDigests) + await peer.restClient.getLightClientBootstrap( + blockRoot, client.cfg, client.forkDigests + ) except CatchableError as e: raise newException(CancelledError, e.msg) ok(res) - getLCUpdatesProc = proc(reqId: uint64, startPeriod: SyncCommitteePeriod, count: uint64): Future[LightClientUpdatesByRangeResponse] {.async: (raises: [CancelledError]).} = + getLCUpdatesProc = proc( + reqId: uint64, startPeriod: SyncCommitteePeriod, count: uint64 + ): Future[LightClientUpdatesByRangeResponse] {.async: (raises: [CancelledError]).} = let peer = client.peers[reqId mod 
uint64(client.peers.len)] - res = + res = try: - await peer.restClient.getLightClientUpdatesByRange(startPeriod, count, client.cfg, client.forkDigests) + await peer.restClient.getLightClientUpdatesByRange( + startPeriod, count, client.cfg, client.forkDigests + ) except CatchableError as e: raise newException(CancelledError, e.msg) ok(res) - getLCFinalityProc = proc(reqId: uint64): Future[NetRes[ForkedLightClientFinalityUpdate]] {.async: (raises: [CancelledError]).} = + getLCFinalityProc = proc( + reqId: uint64 + ): Future[NetRes[ForkedLightClientFinalityUpdate]] {. + async: (raises: [CancelledError]) + .} = let peer = client.peers[reqId mod uint64(client.peers.len)] - res = + res = try: - await peer.restClient.getLightClientFinalityUpdate(client.cfg, client.forkDigests) + await peer.restClient.getLightClientFinalityUpdate( + client.cfg, client.forkDigests + ) except CatchableError as e: raise newException(CancelledError, e.msg) ok(res) - getLCOptimisticProc = proc(reqId: uint64): Future[NetRes[ForkedLightClientOptimisticUpdate]] {.async: (raises: [CancelledError]).} = + getLCOptimisticProc = proc( + reqId: uint64 + ): Future[NetRes[ForkedLightClientOptimisticUpdate]] {. 
+ async: (raises: [CancelledError]) + .} = let peer = client.peers[reqId mod uint64(client.peers.len)] - res = + res = try: - await peer.restClient.getLightClientOptimisticUpdate(client.cfg, client.forkDigests) + await peer.restClient.getLightClientOptimisticUpdate( + client.cfg, client.forkDigests + ) except CatchableError as e: raise newException(CancelledError, e.msg) ok(res) - updateScoreProc = proc(reqId: uint64, value: int) = + updateScoreProc = proc(reqId: uint64, value: int) = let peer = client.peers[reqId mod uint64(client.peers.len)] peer.score += value @@ -114,5 +135,3 @@ proc getEthLCBackend*(client: LCRestClient): EthLCBackend = getLightClientOptimisticUpdate: getLCOptimisticProc, updateScore: updateScoreProc, ) - - diff --git a/nimbus_verified_proxy/lc/lc.nim b/nimbus_verified_proxy/lc/lc.nim index 9ed807b820..a46f35af61 100644 --- a/nimbus_verified_proxy/lc/lc.nim +++ b/nimbus_verified_proxy/lc/lc.nim @@ -79,9 +79,9 @@ proc new*( # initialize without dumping lightClient.processor = LightClientProcessor.new( - false, ".", ".", cfg, genesis_validators_root, - finalizationMode, lightClient.store, getBeaconTime, getTrustedBlockRoot, - onStoreInitialized, onFinalizedHeader, onOptimisticHeader, + false, ".", ".", cfg, genesis_validators_root, finalizationMode, lightClient.store, + getBeaconTime, getTrustedBlockRoot, onStoreInitialized, onFinalizedHeader, + onOptimisticHeader, ) proc lightClientVerifier( @@ -132,9 +132,9 @@ proc new*( GENESIS_SLOT lightClient.manager = LightClientManager.init( - rng, getTrustedBlockRoot, bootstrapVerifier, updateVerifier, - finalityVerifier, optimisticVerifier, isLightClientStoreInitialized, - isNextSyncCommitteeKnown, getFinalizedSlot, getOptimisticSlot, getBeaconTime, + rng, getTrustedBlockRoot, bootstrapVerifier, updateVerifier, finalityVerifier, + optimisticVerifier, isLightClientStoreInitialized, isNextSyncCommitteeKnown, + getFinalizedSlot, getOptimisticSlot, getBeaconTime, ) lightClient diff --git 
a/nimbus_verified_proxy/lc/lc_manager.nim b/nimbus_verified_proxy/lc/lc_manager.nim index 9c4900eb44..1e5af2272c 100644 --- a/nimbus_verified_proxy/lc/lc_manager.nim +++ b/nimbus_verified_proxy/lc/lc_manager.nim @@ -16,50 +16,50 @@ import logScope: topics = "lcman" -const - MAX_REQUEST_LIGHT_CLIENT_UPDATES = 128 +const MAX_REQUEST_LIGHT_CLIENT_UPDATES = 128 type Nothing = object ResponseError = object of CatchableError - Endpoint[K, V] = - (K, V) # https://github.com/nim-lang/Nim/issues/19531 - Bootstrap = - Endpoint[Eth2Digest, ForkedLightClientBootstrap] - UpdatesByRange = - Endpoint[ - tuple[startPeriod: SyncCommitteePeriod, count: uint64], - ForkedLightClientUpdate] - FinalityUpdate = - Endpoint[Nothing, ForkedLightClientFinalityUpdate] - OptimisticUpdate = - Endpoint[Nothing, ForkedLightClientOptimisticUpdate] + Endpoint[K, V] = (K, V) # https://github.com/nim-lang/Nim/issues/19531 + Bootstrap = Endpoint[Eth2Digest, ForkedLightClientBootstrap] + UpdatesByRange = Endpoint[ + tuple[startPeriod: SyncCommitteePeriod, count: uint64], ForkedLightClientUpdate + ] + FinalityUpdate = Endpoint[Nothing, ForkedLightClientFinalityUpdate] + OptimisticUpdate = Endpoint[Nothing, ForkedLightClientOptimisticUpdate] NetRes*[T] = Result[T, void] - ValueVerifier[V] = - proc(v: V): Future[Result[void, LightClientVerifierError]] {.async: (raises: [CancelledError]).} - BootstrapVerifier* = - ValueVerifier[ForkedLightClientBootstrap] - UpdateVerifier* = - ValueVerifier[ForkedLightClientUpdate] - FinalityUpdateVerifier* = - ValueVerifier[ForkedLightClientFinalityUpdate] - OptimisticUpdateVerifier* = - ValueVerifier[ForkedLightClientOptimisticUpdate] - - GetTrustedBlockRootCallback* = - proc(): Option[Eth2Digest] {.gcsafe, raises: [].} - GetBoolCallback* = - proc(): bool {.gcsafe, raises: [].} - GetSlotCallback* = - proc(): Slot {.gcsafe, raises: [].} + ValueVerifier[V] = proc(v: V): Future[Result[void, LightClientVerifierError]] {. 
+ async: (raises: [CancelledError]) + .} + BootstrapVerifier* = ValueVerifier[ForkedLightClientBootstrap] + UpdateVerifier* = ValueVerifier[ForkedLightClientUpdate] + FinalityUpdateVerifier* = ValueVerifier[ForkedLightClientFinalityUpdate] + OptimisticUpdateVerifier* = ValueVerifier[ForkedLightClientOptimisticUpdate] + + GetTrustedBlockRootCallback* = proc(): Option[Eth2Digest] {.gcsafe, raises: [].} + GetBoolCallback* = proc(): bool {.gcsafe, raises: [].} + GetSlotCallback* = proc(): Slot {.gcsafe, raises: [].} LightClientUpdatesByRangeResponse* = NetRes[seq[ForkedLightClientUpdate]] - LightClientBootstrapProc = proc(id: uint64, blockRoot: Eth2Digest): Future[NetRes[ForkedLightClientBootstrap]] {.async: (raises: [CancelledError]).} - LightClientUpdatesByRangeProc = proc(id: uint64, startPeriod: SyncCommitteePeriod, count: uint64): Future[LightClientUpdatesByRangeResponse] {.async: (raises: [CancelledError]).} - LightClientFinalityUpdateProc = proc(id: uint64): Future[NetRes[ForkedLightClientFinalityUpdate]] {.async: (raises: [CancelledError]).} - LightClientOptimisticUpdateProc = proc(id: uint64): Future[NetRes[ForkedLightClientOptimisticUpdate]] {.async: (raises: [CancelledError]).} + LightClientBootstrapProc = proc( + id: uint64, blockRoot: Eth2Digest + ): Future[NetRes[ForkedLightClientBootstrap]] {.async: (raises: [CancelledError]).} + LightClientUpdatesByRangeProc = proc( + id: uint64, startPeriod: SyncCommitteePeriod, count: uint64 + ): Future[LightClientUpdatesByRangeResponse] {.async: (raises: [CancelledError]).} + LightClientFinalityUpdateProc = proc( + id: uint64 + ): Future[NetRes[ForkedLightClientFinalityUpdate]] {. + async: (raises: [CancelledError]) + .} + LightClientOptimisticUpdateProc = proc( + id: uint64 + ): Future[NetRes[ForkedLightClientOptimisticUpdate]] {. 
+ async: (raises: [CancelledError]) + .} UpdateScoreProc = proc(id: uint64, value: int) {.gcsafe, raises: [].} EthLCBackend* = object @@ -110,53 +110,53 @@ func init*( isNextSyncCommitteeKnown: isNextSyncCommitteeKnown, getFinalizedSlot: getFinalizedSlot, getOptimisticSlot: getOptimisticSlot, - getBeaconTime: getBeaconTime) + getBeaconTime: getBeaconTime, + ) # https://github.com/ethereum/consensus-specs/blob/v1.6.0-alpha.3/specs/altair/light-client/p2p-interface.md#getlightclientbootstrap proc doRequest( - e: typedesc[Bootstrap], - backend: EthLCBackend, - reqId: uint64, - blockRoot: Eth2Digest -): Future[NetRes[ForkedLightClientBootstrap]] {.async: (raises: [CancelledError], raw: true).} = + e: typedesc[Bootstrap], backend: EthLCBackend, reqId: uint64, blockRoot: Eth2Digest +): Future[NetRes[ForkedLightClientBootstrap]] {. + async: (raises: [CancelledError], raw: true) +.} = backend.getLightClientBootstrap(reqId, blockRoot) # https://github.com/ethereum/consensus-specs/blob/v1.6.0-alpha.3/specs/altair/light-client/p2p-interface.md#lightclientupdatesbyrange proc doRequest( e: typedesc[UpdatesByRange], backend: EthLCBackend, - reqId: uint64, - key: tuple[startPeriod: SyncCommitteePeriod, count: uint64] -): Future[LightClientUpdatesByRangeResponse] {.async: (raises: [ResponseError, CancelledError]).} = + reqId: uint64, + key: tuple[startPeriod: SyncCommitteePeriod, count: uint64], +): Future[LightClientUpdatesByRangeResponse] {. 
+ async: (raises: [ResponseError, CancelledError]) +.} = let (startPeriod, count) = key doAssert count > 0 and count <= MAX_REQUEST_LIGHT_CLIENT_UPDATES let response = await backend.getLightClientUpdatesByRange(reqId, startPeriod, count) if response.isOk: - let e = distinctBase(response.get) - .checkLightClientUpdates(startPeriod, count) + let e = distinctBase(response.get).checkLightClientUpdates(startPeriod, count) if e.isErr: raise newException(ResponseError, e.error) return response # https://github.com/ethereum/consensus-specs/blob/v1.6.0-alpha.3/specs/altair/light-client/p2p-interface.md#getlightclientfinalityupdate proc doRequest( - e: typedesc[FinalityUpdate], - backend: EthLCBackend, - reqId: uint64 -): Future[NetRes[ForkedLightClientFinalityUpdate]] {.async: (raises: [CancelledError], raw: true).} = + e: typedesc[FinalityUpdate], backend: EthLCBackend, reqId: uint64 +): Future[NetRes[ForkedLightClientFinalityUpdate]] {. + async: (raises: [CancelledError], raw: true) +.} = backend.getLightClientFinalityUpdate(reqId) # https://github.com/ethereum/consensus-specs/blob/v1.4.0-beta.5/specs/altair/light-client/p2p-interface.md#getlightclientoptimisticupdate proc doRequest( - e: typedesc[OptimisticUpdate], - backend: EthLCBackend, - reqId: uint64 -): Future[NetRes[ForkedLightClientOptimisticUpdate]] {.async: (raises: [CancelledError], raw: true).} = + e: typedesc[OptimisticUpdate], backend: EthLCBackend, reqId: uint64 +): Future[NetRes[ForkedLightClientOptimisticUpdate]] {. 
+ async: (raises: [CancelledError], raw: true) +.} = backend.getLightClientOptimisticUpdate(reqId) template valueVerifier[E]( - self: LightClientManager, - e: typedesc[E] + self: LightClientManager, e: typedesc[E] ): ValueVerifier[E.V] = when E.V is ForkedLightClientBootstrap: self.bootstrapVerifier @@ -166,7 +166,9 @@ template valueVerifier[E]( self.finalityUpdateVerifier elif E.V is ForkedLightClientOptimisticUpdate: self.optimisticUpdateVerifier - else: static: doAssert false + else: + static: + doAssert false iterator values(v: auto): auto = ## Local helper for `workerTask` to share the same implementation for both @@ -178,9 +180,7 @@ iterator values(v: auto): auto = yield v proc workerTask[E]( - self: LightClientManager, - e: typedesc[E], - key: E.K + self: LightClientManager, e: typedesc[E], key: E.K ): Future[bool] {.async: (raises: [CancelledError]).} = var didProgress = false @@ -212,7 +212,8 @@ proc workerTask[E]( # Descore, peer is on an incompatible fork version withForkyObject(val): when lcDataFork > LightClientDataFork.None: - notice "Received value from an unviable fork", value = forkyObject, endpoint = E.name + notice "Received value from an unviable fork", + value = forkyObject, endpoint = E.name else: notice "Received value from an unviable fork", endpoint = E.name self.backend.updateScore(reqId, PeerScoreUnviableFork) @@ -221,7 +222,8 @@ proc workerTask[E]( # Descore, received data is malformed withForkyObject(val): when lcDataFork > LightClientDataFork.None: - warn "Received invalid value", value = forkyObject.shortLog, endpoint = E.name + warn "Received invalid value", + value = forkyObject.shortLog, endpoint = E.name else: warn "Received invalid value", endpoint = E.name self.backend.updateScore(reqId, PeerScoreBadValues) @@ -244,9 +246,7 @@ proc workerTask[E]( return didProgress proc query[E]( - self: LightClientManager, - e: typedesc[E], - key: E.K + self: LightClientManager, e: typedesc[E], key: E.K ): Future[bool] {.async: (raises: 
[CancelledError]).} = const PARALLEL_REQUESTS = 2 var workers: array[PARALLEL_REQUESTS, Future[bool]] @@ -317,14 +317,13 @@ proc query[E]( return progressFut.completed template query[E]( - self: LightClientManager, - e: typedesc[E] + self: LightClientManager, e: typedesc[E] ): Future[bool].Raising([CancelledError]) = self.query(e, Nothing()) # https://github.com/ethereum/consensus-specs/blob/v1.5.0-beta.0/specs/altair/light-client/light-client.md#light-client-sync-process proc loop(self: LightClientManager) {.async: (raises: [CancelledError]).} = - var + var downloadOptimistic = true downloadFinality = false didOptimisticProgress = false @@ -364,20 +363,31 @@ proc loop(self: LightClientManager) {.async: (raises: [CancelledError]).} = # check and download sync committee updates if finalizedPeriod == optimisticPeriod and not self.isNextSyncCommitteeKnown(): if finalizedPeriod >= currentPeriod: - debug "Downloading light client sync committee updates", start_period=finalizedPeriod, count=1 - discard await self.query(UpdatesByRange, (startPeriod: finalizedPeriod, count: uint64(1))) + debug "Downloading light client sync committee updates", + start_period = finalizedPeriod, count = 1 + discard await self.query( + UpdatesByRange, (startPeriod: finalizedPeriod, count: uint64(1)) + ) else: - let count = min(currentPeriod - finalizedPeriod, MAX_REQUEST_LIGHT_CLIENT_UPDATES) - debug "Downloading light client sync committee updates", start_period=finalizedPeriod, count=count - discard await self.query(UpdatesByRange, (startPeriod: finalizedPeriod, count: uint64(count))) + let count = + min(currentPeriod - finalizedPeriod, MAX_REQUEST_LIGHT_CLIENT_UPDATES) + debug "Downloading light client sync committee updates", + start_period = finalizedPeriod, count = count + discard await self.query( + UpdatesByRange, (startPeriod: finalizedPeriod, count: uint64(count)) + ) elif finalizedPeriod + 1 < currentPeriod: - let count = min(currentPeriod - (finalizedPeriod + 1), 
MAX_REQUEST_LIGHT_CLIENT_UPDATES) - debug "Downloading light client sync committee updates", start_period=finalizedPeriod, count=count - discard await self.query(UpdatesByRange, (startPeriod: finalizedPeriod, count: uint64(count))) + let count = + min(currentPeriod - (finalizedPeriod + 1), MAX_REQUEST_LIGHT_CLIENT_UPDATES) + debug "Downloading light client sync committee updates", + start_period = finalizedPeriod, count = count + discard await self.query( + UpdatesByRange, (startPeriod: finalizedPeriod, count: uint64(count)) + ) # check and download optimistic update if optimisticSlot < currentSlot: - debug "Downloading light client optimistic updates", slot=currentSlot + debug "Downloading light client optimistic updates", slot = currentSlot let didProgress = await self.query(OptimisticUpdate) if not didProgress: # retry in 2 seconds @@ -386,8 +396,8 @@ proc loop(self: LightClientManager) {.async: (raises: [CancelledError]).} = # check and download finality update if currentEpoch > finalizedEpoch + 2: - debug "Downloading light client finality updates", slot=currentSlot - let didProgress = await self.query(FinalityUpdate) + debug "Downloading light client finality updates", slot = currentSlot + let didProgress = await self.query(FinalityUpdate) if not didProgress: # retry in two seconds await sleepAsync(chronos.seconds(2)) diff --git a/nimbus_verified_proxy/nimbus_verified_proxy.nim b/nimbus_verified_proxy/nimbus_verified_proxy.nim index 024db5ddf4..9e413f0e57 100644 --- a/nimbus_verified_proxy/nimbus_verified_proxy.nim +++ b/nimbus_verified_proxy/nimbus_verified_proxy.nim @@ -153,7 +153,8 @@ proc run*( # light client is set to optimistic finalization mode lightClient = LightClient.new( - rng, cfg, forkDigests, getBeaconTime, genesis_validators_root, LightClientFinalizationMode.Optimistic, + rng, cfg, forkDigests, getBeaconTime, genesis_validators_root, + LightClientFinalizationMode.Optimistic, ) # REST client for json LC updates From 
0ed7634856e0c106c37c31d30f23d6265c75f9ce Mon Sep 17 00:00:00 2001 From: chirag-parmar Date: Tue, 21 Oct 2025 15:56:05 +0530 Subject: [PATCH 05/16] fix library --- nimbus_verified_proxy/libverifproxy/verifproxy.nim | 13 +++---------- 1 file changed, 3 insertions(+), 10 deletions(-) diff --git a/nimbus_verified_proxy/libverifproxy/verifproxy.nim b/nimbus_verified_proxy/libverifproxy/verifproxy.nim index 5a70b0cfad..47eb08a2a4 100644 --- a/nimbus_verified_proxy/libverifproxy/verifproxy.nim +++ b/nimbus_verified_proxy/libverifproxy/verifproxy.nim @@ -7,7 +7,6 @@ import std/[atomics, json, net], - eth/net/nat, beacon_chain/spec/[digest, network], beacon_chain/nimbus_binary_common, ../nimbus_verified_proxy, @@ -38,20 +37,14 @@ proc runContext(ctx: ptr Context) {.thread.} = let rpcAddr = jsonNode["RpcAddress"].getStr() let myConfig = VerifiedProxyConf( - listenAddress: some(defaultListenAddress), eth2Network: some(jsonNode["Eth2Network"].getStr()), trustedBlockRoot: Eth2Digest.fromHex(jsonNode["TrustedBlockRoot"].getStr()), - backendUrl: parseCmdArg(Web3Url, jsonNode["Web3Url"].getStr()), - frontendUrl: parseCmdArg(Web3Url, jsonNode["Web3Url"].getStr()), + backendUrl: parseCmdArg(Web3Url, jsonNode["backendUrl"].getStr()), + frontendUrl: parseCmdArg(Web3Url, jsonNode["frontendUrl"].getStr()), + lcEndpoint: jsonNode["lcEndpoint"].getStr(), logLevel: jsonNode["LogLevel"].getStr(), - maxPeers: 160, - nat: NatConfig(hasExtIp: false, nat: NatAny), logStdout: StdoutLogKind.Auto, dataDirFlag: none(OutDir), - tcpPort: Port(defaultEth2TcpPort), - udpPort: Port(defaultEth2TcpPort), - agentString: "nimbus", - discv5Enabled: true, ) run(myConfig, ctx) From 321289cb4edf43dab4ccb4a4e37ac5c0c2705b6c Mon Sep 17 00:00:00 2001 From: chirag-parmar Date: Tue, 21 Oct 2025 16:13:09 +0530 Subject: [PATCH 06/16] nits --- nimbus_verified_proxy/c_frontend.nim | 197 --------------------------- 1 file changed, 197 deletions(-) delete mode 100644 nimbus_verified_proxy/c_frontend.nim diff --git 
a/nimbus_verified_proxy/c_frontend.nim b/nimbus_verified_proxy/c_frontend.nim deleted file mode 100644 index 0daad6cba0..0000000000 --- a/nimbus_verified_proxy/c_frontend.nim +++ /dev/null @@ -1,197 +0,0 @@ -# nimbus_verified_proxy -# Copyright (c) 2025 Status Research & Development GmbH -# Licensed and distributed under either of -# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). -# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). -# at your option. This file may not be copied, modified, or distributed except according to those terms. - -{.push raises: [], gcsafe.} - -import - stint, - std/strutils, - json_rpc/[rpcserver, rpcproxy], - web3/[eth_api, eth_api_types], - ../execution_chain/rpc/cors, - ./engine/types, - ./nimbus_verified_proxy_conf - -type JsonRpcServer* = ref object - case kind*: ClientKind #we reuse clientKind for servers also - of Http: - httpServer: RpcHttpServer - of WebSocket: - wsServer: RpcWebSocketServer - -proc init*( - T: type JsonRpcServer, url: Web3Url -): JsonRpcServer {.raises: [JsonRpcError, ValueError, TransportAddressError].} = - let - auth = @[httpCors(@[])] # TODO: for now we serve all cross origin requests - parsedUrl = parseUri(url.web3Url) - hostname = if parsedUrl.hostname == "": "127.0.0.1" else: parsedUrl.hostname - port = - if parsedUrl.port == "": - 8545 - else: - parseInt(parsedUrl.port) - listenAddress = initTAddress(hostname, port) - - case url.kind - of HttpUrl: - JsonRpcServer( - kind: Http, httpServer: newRpcHttpServer([listenAddress], RpcRouter.init(), auth) - ) - of WsUrl: - let server = - JsonRpcServer(kind: WebSocket, wsServer: newRpcWebSocketServer(listenAddress)) - - server.wsServer.router = RpcRouter.init() - server - -func getServer(server: JsonRpcServer): RpcServer = - case server.kind - of Http: server.httpServer - of WebSocket: server.wsServer - -proc start*(server: JsonRpcServer): Result[void, string] 
= - try: - case server.kind - of Http: - server.httpServer.start() - of WebSocket: - server.wsServer.start() - except CatchableError as e: - return err(e.msg) - - ok() - -proc injectEngineFrontend*(server: JsonRpcServer, frontend: EthApiFrontend) = - server.getServer().rpc("eth_blockNumber") do() -> uint64: - await frontend.eth_blockNumber() - - server.getServer().rpc("eth_getBalance") do( - address: Address, quantityTag: BlockTag - ) -> UInt256: - await frontend.eth_getBalance(address, quantityTag) - - server.getServer().rpc("eth_getStorageAt") do( - address: Address, slot: UInt256, quantityTag: BlockTag - ) -> FixedBytes[32]: - await frontend.eth_getStorageAt(address, slot, quantityTag) - - server.getServer().rpc("eth_getTransactionCount") do( - address: Address, quantityTag: BlockTag - ) -> Quantity: - await frontend.eth_getTransactionCount(address, quantityTag) - - server.getServer().rpc("eth_getCode") do( - address: Address, quantityTag: BlockTag - ) -> seq[byte]: - await frontend.eth_getCode(address, quantityTag) - - server.getServer().rpc("eth_getBlockByHash") do( - blockHash: Hash32, fullTransactions: bool - ) -> BlockObject: - await frontend.eth_getBlockByHash(blockHash, fullTransactions) - - server.getServer().rpc("eth_getBlockByNumber") do( - blockTag: BlockTag, fullTransactions: bool - ) -> BlockObject: - await frontend.eth_getBlockByNumber(blockTag, fullTransactions) - - server.getServer().rpc("eth_getUncleCountByBlockNumber") do( - blockTag: BlockTag - ) -> Quantity: - await frontend.eth_getUncleCountByBlockNumber(blockTag) - - server.getServer().rpc("eth_getUncleCountByBlockHash") do( - blockHash: Hash32 - ) -> Quantity: - await frontend.eth_getUncleCountByBlockHash(blockHash) - - server.getServer().rpc("eth_getBlockTransactionCountByNumber") do( - blockTag: BlockTag - ) -> Quantity: - await frontend.eth_getBlockTransactionCountByNumber(blockTag) - - server.getServer().rpc("eth_getBlockTransactionCountByHash") do( - blockHash: Hash32 - ) -> Quantity: 
- await frontend.eth_getBlockTransactionCountByHash(blockHash) - - server.getServer().rpc("eth_getTransactionByBlockNumberAndIndex") do( - blockTag: BlockTag, index: Quantity - ) -> TransactionObject: - await frontend.eth_getTransactionByBlockNumberAndIndex(blockTag, index) - - server.getServer().rpc("eth_getTransactionByBlockHashAndIndex") do( - blockHash: Hash32, index: Quantity - ) -> TransactionObject: - await frontend.eth_getTransactionByBlockHashAndIndex(blockHash, index) - - server.getServer().rpc("eth_call") do( - tx: TransactionArgs, blockTag: BlockTag, optimisticStateFetch: Opt[bool] - ) -> seq[byte]: - await frontend.eth_call(tx, blockTag, optimisticStateFetch.get(true)) - - server.getServer().rpc("eth_createAccessList") do( - tx: TransactionArgs, blockTag: BlockTag, optimisticStateFetch: Opt[bool] - ) -> AccessListResult: - await frontend.eth_createAccessList(tx, blockTag, optimisticStateFetch.get(true)) - - server.getServer().rpc("eth_estimateGas") do( - tx: TransactionArgs, blockTag: BlockTag, optimisticStateFetch: Opt[bool] - ) -> Quantity: - await frontend.eth_estimateGas(tx, blockTag, optimisticStateFetch.get(true)) - - server.getServer().rpc("eth_getTransactionByHash") do( - txHash: Hash32 - ) -> TransactionObject: - await frontend.eth_getTransactionByHash(txHash) - - server.getServer().rpc("eth_getBlockReceipts") do( - blockTag: BlockTag - ) -> Opt[seq[ReceiptObject]]: - await frontend.eth_getBlockReceipts(blockTag) - - server.getServer().rpc("eth_getTransactionReceipt") do( - txHash: Hash32 - ) -> ReceiptObject: - await frontend.eth_getTransactionReceipt(txHash) - - server.getServer().rpc("eth_getLogs") do( - filterOptions: FilterOptions - ) -> seq[LogObject]: - await frontend.eth_getLogs(filterOptions) - - server.getServer().rpc("eth_newFilter") do(filterOptions: FilterOptions) -> string: - await frontend.eth_newFilter(filterOptions) - - server.getServer().rpc("eth_uninstallFilter") do(filterId: string) -> bool: - await 
frontend.eth_uninstallFilter(filterId) - - server.getServer().rpc("eth_getFilterLogs") do(filterId: string) -> seq[LogObject]: - await frontend.eth_getFilterLogs(filterId) - - server.getServer().rpc("eth_getFilterChanges") do(filterId: string) -> seq[LogObject]: - await frontend.eth_getFilterChanges(filterId) - - server.getServer().rpc("eth_blobBaseFee") do() -> UInt256: - await frontend.eth_blobBaseFee() - - server.getServer().rpc("eth_gasPrice") do() -> Quantity: - await frontend.eth_gasPrice() - - server.getServer().rpc("eth_maxPriorityFeePerGas") do() -> Quantity: - await frontend.eth_maxPriorityFeePerGas() - -proc stop*(server: JsonRpcServer) {.async: (raises: [CancelledError]).} = - try: - case server.kind - of Http: - await server.httpServer.closeWait() - of WebSocket: - await server.wsServer.closeWait() - except CatchableError as e: - raise newException(CancelledError, e.msg) From b63749d459d66fc0f2ada2d820c958b233dc3a5d Mon Sep 17 00:00:00 2001 From: chirag-parmar Date: Tue, 21 Oct 2025 18:03:16 +0530 Subject: [PATCH 07/16] multiple endpoints in config --- nimbus_verified_proxy/json_lc_backend.nim | 15 ++-- .../nimbus_verified_proxy.nim | 4 +- .../nimbus_verified_proxy_conf.nim | 70 +++++++++++++------ 3 files changed, 57 insertions(+), 32 deletions(-) diff --git a/nimbus_verified_proxy/json_lc_backend.nim b/nimbus_verified_proxy/json_lc_backend.nim index abcf584c5c..65223ca721 100644 --- a/nimbus_verified_proxy/json_lc_backend.nim +++ b/nimbus_verified_proxy/json_lc_backend.nim @@ -41,15 +41,16 @@ func new*( ): LCRestClient = LCRestClient(cfg: cfg, forkDigests: forkDigests, peers: @[]) -proc addEndpoint*(client: LCRestClient, endpoint: string) {.raises: [ValueError].} = - if endpoint in client.urls: - raise newException(ValueError, "Endpoint already added") +proc addEndpoints*(client: LCRestClient, urlList: UrlList) {.raises: [ValueError].} = + for endpoint in urlList.urls: + if endpoint in client.urls: + continue - let restClient = 
RestClientRef.new(endpoint).valueOr: - raise newException(ValueError, $error) + let restClient = RestClientRef.new(endpoint).valueOr: + raise newException(ValueError, $error) - client.peers.add(LCRestPeer(score: 0, restClient: restClient)) - client.urls.add(endpoint) + client.peers.add(LCRestPeer(score: 0, restClient: restClient)) + client.urls.add(endpoint) proc closeAll*(client: LCRestClient) {.async: (raises: []).} = for peer in client.peers: diff --git a/nimbus_verified_proxy/nimbus_verified_proxy.nim b/nimbus_verified_proxy/nimbus_verified_proxy.nim index 9e413f0e57..140d110776 100644 --- a/nimbus_verified_proxy/nimbus_verified_proxy.nim +++ b/nimbus_verified_proxy/nimbus_verified_proxy.nim @@ -160,10 +160,8 @@ proc run*( # REST client for json LC updates lcRestClient = LCRestClient.new(cfg, forkDigests) - debugEcho config.lcEndpoint - # add endpoints to the client - lcRestClient.addEndpoint(config.lcEndpoint) + lcRestClient.addEndpoints(config.lcEndpoints) lightClient.setBackend(lcRestClient.getEthLCBackend()) # verify chain id that the proxy is connected to diff --git a/nimbus_verified_proxy/nimbus_verified_proxy_conf.nim b/nimbus_verified_proxy/nimbus_verified_proxy_conf.nim index a6dbed3ccf..b549eb1e04 100644 --- a/nimbus_verified_proxy/nimbus_verified_proxy_conf.nim +++ b/nimbus_verified_proxy/nimbus_verified_proxy_conf.nim @@ -12,7 +12,8 @@ import json_rpc/rpcproxy, # must be early (compilation annoyance) json_serialization/std/net, beacon_chain/conf_light_client, - beacon_chain/nimbus_binary_common + beacon_chain/nimbus_binary_common, + std/strutils export net @@ -25,37 +26,45 @@ type kind*: Web3UrlKind web3Url*: string + UrlList* = object + urls*: seq[string] + #!fmt: off type VerifiedProxyConf* = object # Config configFile* {. - desc: "Loads the configuration from a TOML file" - name: "config-file" .}: Option[InputFile] + desc: "Loads the configuration from a TOML file", + name: "config-file" + .}: Option[InputFile] # Logging logLevel* {. 
- desc: "Sets the log level" - defaultValue: "INFO" - name: "log-level" .}: string + desc: "Sets the log level", + defaultValue: "INFO", + name: "log-level" + .}: string logStdout* {. - hidden - desc: "Specifies what kind of logs should be written to stdout (auto, colors, nocolors, json)" - defaultValueDesc: "auto" - defaultValue: StdoutLogKind.Auto - name: "log-format" .}: StdoutLogKind + hidden, + desc: "Specifies what kind of logs should be written to stdout (auto, colors, nocolors, json)", + defaultValueDesc: "auto", + defaultValue: StdoutLogKind.Auto, + name: "log-format" + .}: StdoutLogKind # Storage dataDirFlag* {. - desc: "The directory where nimbus will store all blockchain data" - abbr: "d" - name: "data-dir" .}: Option[OutDir] + desc: "The directory where nimbus will store all blockchain data", + abbr: "d", + name: "data-dir" + .}: Option[OutDir] # Network eth2Network* {. - desc: "The Eth2 network to join" - defaultValueDesc: "mainnet" - name: "network" .}: Option[string] + desc: "The Eth2 network to join", + defaultValueDesc: "mainnet", + name: "network" + .}: Option[string] accountCacheLen* {. hidden, @@ -95,8 +104,9 @@ type VerifiedProxyConf* = object # Consensus light sync # No default - Needs to be provided by the user trustedBlockRoot* {. - desc: "Recent trusted finalized block root to initialize light client from" - name: "trusted-block-root" .}: Eth2Digest + desc: "Recent trusted finalized block root to initialize light client from", + name: "trusted-block-root" + .}: Eth2Digest # (Untrusted) web3 provider # No default - Needs to be provided by the user @@ -116,10 +126,10 @@ type VerifiedProxyConf* = object # (Untrusted) web3 provider # No default - Needs to be provided by the user - lcEndpoint* {. + lcEndpoints* {. 
desc: "command seperated URLs of the light client data provider", - name: "lc-endpoint" - .}: string + name: "lc-endpoints" + .}: UrlList #!fmt: on @@ -137,9 +147,25 @@ proc parseCmdArg*(T: type Web3Url, p: string): T {.raises: [ValueError].} = ValueError, "Web3 url should have defined scheme (http/https/ws/wss)" ) +proc parseCmdArg*(T: type UrlList, p: string): T {.raises: [ValueError].} = + let urls = p.split(',') + + for u in urls: + let + parsed = parseUri(u) + normalizedScheme = parsed.scheme.toLowerAscii() + + if not (normalizedScheme == "http" or normalizedScheme == "https"): + raise newException(ValueError, "Light Client Endpoint should be a http(s) url") + + UrlList(urls: urls) + proc completeCmdArg*(T: type Web3Url, val: string): seq[string] = return @[] +proc completeCmdArg*(T: type UrlList, val: string): seq[string] = + return @[] + # TODO: Cannot use ClientConfig in VerifiedProxyConf due to the fact that # it contain `set[TLSFlags]` which does not have proper toml serialization func asClientConfig*(url: Web3Url): ClientConfig = From 3dd8ae77cbf8c7e81da34eea2245342c39d62a64 Mon Sep 17 00:00:00 2001 From: chirag-parmar Date: Tue, 21 Oct 2025 20:30:15 +0530 Subject: [PATCH 08/16] fix worker issue --- nimbus_verified_proxy/lc/lc_manager.nim | 72 +++++++------------ .../libverifproxy/verifproxy.nim | 2 +- 2 files changed, 25 insertions(+), 49 deletions(-) diff --git a/nimbus_verified_proxy/lc/lc_manager.nim b/nimbus_verified_proxy/lc/lc_manager.nim index 1e5af2272c..a3255c6690 100644 --- a/nimbus_verified_proxy/lc/lc_manager.nim +++ b/nimbus_verified_proxy/lc/lc_manager.nim @@ -248,14 +248,14 @@ proc workerTask[E]( proc query[E]( self: LightClientManager, e: typedesc[E], key: E.K ): Future[bool] {.async: (raises: [CancelledError]).} = - const PARALLEL_REQUESTS = 2 - var workers: array[PARALLEL_REQUESTS, Future[bool]] + const NUM_WORKERS = 2 + var workers: array[NUM_WORKERS, Future[bool]] let progressFut = 
Future[void].Raising([CancelledError]).init("lcmanProgress") - doneFut = Future[void].Raising([CancelledError]).init("lcmanDone") var numCompleted = 0 + success = false maxCompleted = workers.len proc handleFinishedWorker(future: pointer) = @@ -263,58 +263,33 @@ proc query[E]( let didProgress = cast[Future[bool]](future).read() if didProgress and not progressFut.finished: progressFut.complete() - except CancelledError: - if not progressFut.finished: - progressFut.cancelSoon() + success = true except CatchableError: discard finally: inc numCompleted if numCompleted == maxCompleted: - doneFut.complete() + progressFut.cancelSoon() - try: - # Start concurrent workers - for i in 0 ..< workers.len: - try: - workers[i] = self.workerTask(e, key) - workers[i].addCallback(handleFinishedWorker) - except CancelledError as exc: - raise exc - except CatchableError: - workers[i] = newFuture[bool]() - workers[i].complete(false) - - # Wait for any worker to report progress, or for all workers to finish + # Start concurrent workers + for i in 0 ..< workers.len: try: - discard await race(progressFut, doneFut) - except ValueError: - raiseAssert "race API invariant" - finally: - for i in 0 ..< maxCompleted: - if workers[i] == nil: - maxCompleted = i - if numCompleted == maxCompleted: - doneFut.complete() - break - if not workers[i].finished: - workers[i].cancelSoon() - while true: - try: - await allFutures(workers[0 ..< maxCompleted]) - break - except CancelledError: - continue - while true: - try: - await doneFut - break - except CancelledError: - continue + workers[i] = self.workerTask(e, key) + workers[i].addCallback(handleFinishedWorker) + except CancelledError as exc: + raise exc + except CatchableError: + workers[i] = newFuture[bool]() + workers[i].complete(false) + + # Wait for any worker to report progress, or for all workers to finish + waitFor progressFut + + # cancel all workers + for i in 0 ..< NUM_WORKERS: + workers[i].cancelSoon() - if not progressFut.finished: - 
progressFut.cancelSoon() - return progressFut.completed + return success template query[E]( self: LightClientManager, e: typedesc[E] @@ -358,7 +333,8 @@ proc loop(self: LightClientManager) {.async: (raises: [CancelledError]).} = if not didProgress: debug "Re-attempting bootstrap download" await sleepAsync(chronos.seconds(2)) - continue + + continue # check and download sync committee updates if finalizedPeriod == optimisticPeriod and not self.isNextSyncCommitteeKnown(): diff --git a/nimbus_verified_proxy/libverifproxy/verifproxy.nim b/nimbus_verified_proxy/libverifproxy/verifproxy.nim index 47eb08a2a4..cab39ed901 100644 --- a/nimbus_verified_proxy/libverifproxy/verifproxy.nim +++ b/nimbus_verified_proxy/libverifproxy/verifproxy.nim @@ -41,7 +41,7 @@ proc runContext(ctx: ptr Context) {.thread.} = trustedBlockRoot: Eth2Digest.fromHex(jsonNode["TrustedBlockRoot"].getStr()), backendUrl: parseCmdArg(Web3Url, jsonNode["backendUrl"].getStr()), frontendUrl: parseCmdArg(Web3Url, jsonNode["frontendUrl"].getStr()), - lcEndpoint: jsonNode["lcEndpoint"].getStr(), + lcEndpoints: jsonNode["lcEndpoints"].getStr(), logLevel: jsonNode["LogLevel"].getStr(), logStdout: StdoutLogKind.Auto, dataDirFlag: none(OutDir), From 5b6e98c27a3baee744bfbac35df681be15d32c5b Mon Sep 17 00:00:00 2001 From: chirag-parmar Date: Wed, 22 Oct 2025 09:48:46 +0530 Subject: [PATCH 09/16] fix --- nimbus_verified_proxy/libverifproxy/verifproxy.nim | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nimbus_verified_proxy/libverifproxy/verifproxy.nim b/nimbus_verified_proxy/libverifproxy/verifproxy.nim index cab39ed901..b17ffd556d 100644 --- a/nimbus_verified_proxy/libverifproxy/verifproxy.nim +++ b/nimbus_verified_proxy/libverifproxy/verifproxy.nim @@ -41,7 +41,7 @@ proc runContext(ctx: ptr Context) {.thread.} = trustedBlockRoot: Eth2Digest.fromHex(jsonNode["TrustedBlockRoot"].getStr()), backendUrl: parseCmdArg(Web3Url, jsonNode["backendUrl"].getStr()), frontendUrl: parseCmdArg(Web3Url, 
jsonNode["frontendUrl"].getStr()), - lcEndpoints: jsonNode["lcEndpoints"].getStr(), + lcEndpoints: parseCmdArg(UrlList, jsonNode["lcEndpoints"].getStr()), logLevel: jsonNode["LogLevel"].getStr(), logStdout: StdoutLogKind.Auto, dataDirFlag: none(OutDir), From bf129cf5bde1f0cc53cd5d3cad460c069a46cab8 Mon Sep 17 00:00:00 2001 From: chirag-parmar Date: Wed, 22 Oct 2025 09:52:27 +0530 Subject: [PATCH 10/16] format --- nimbus_verified_proxy/lc/lc_manager.nim | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/nimbus_verified_proxy/lc/lc_manager.nim b/nimbus_verified_proxy/lc/lc_manager.nim index a3255c6690..acce8ab69e 100644 --- a/nimbus_verified_proxy/lc/lc_manager.nim +++ b/nimbus_verified_proxy/lc/lc_manager.nim @@ -251,8 +251,7 @@ proc query[E]( const NUM_WORKERS = 2 var workers: array[NUM_WORKERS, Future[bool]] - let - progressFut = Future[void].Raising([CancelledError]).init("lcmanProgress") + let progressFut = Future[void].Raising([CancelledError]).init("lcmanProgress") var numCompleted = 0 success = false From b7373d7ae73dc42d0882d0667a0668021840049a Mon Sep 17 00:00:00 2001 From: chirag-parmar Date: Wed, 29 Oct 2025 13:15:59 +0530 Subject: [PATCH 11/16] reviews --- nimbus_verified_proxy/lc/lc.nim | 2 +- nimbus_verified_proxy/lc/lc_manager.nim | 75 +++++++++---------- .../{json_lc_backend.nim => lc_backend.nim} | 32 ++++---- .../nimbus_verified_proxy.nim | 17 +++-- .../nimbus_verified_proxy_conf.nim | 16 ++-- 5 files changed, 72 insertions(+), 70 deletions(-) rename nimbus_verified_proxy/{json_lc_backend.nim => lc_backend.nim} (79%) diff --git a/nimbus_verified_proxy/lc/lc.nim b/nimbus_verified_proxy/lc/lc.nim index a46f35af61..dd3fc0534f 100644 --- a/nimbus_verified_proxy/lc/lc.nim +++ b/nimbus_verified_proxy/lc/lc.nim @@ -132,7 +132,7 @@ proc new*( GENESIS_SLOT lightClient.manager = LightClientManager.init( - rng, getTrustedBlockRoot, bootstrapVerifier, updateVerifier, finalityVerifier, + rng, cfg.timeParams, getTrustedBlockRoot, 
bootstrapVerifier, updateVerifier, finalityVerifier, optimisticVerifier, isLightClientStoreInitialized, isNextSyncCommitteeKnown, getFinalizedSlot, getOptimisticSlot, getBeaconTime, ) diff --git a/nimbus_verified_proxy/lc/lc_manager.nim b/nimbus_verified_proxy/lc/lc_manager.nim index acce8ab69e..4ffdc19494 100644 --- a/nimbus_verified_proxy/lc/lc_manager.nim +++ b/nimbus_verified_proxy/lc/lc_manager.nim @@ -72,6 +72,7 @@ type LightClientManager* = object rng: ref HmacDrbgContext backend*: EthLCBackend + timeParams: TimeParams getTrustedBlockRoot: GetTrustedBlockRootCallback bootstrapVerifier: BootstrapVerifier updateVerifier: UpdateVerifier @@ -87,6 +88,7 @@ type func init*( T: type LightClientManager, rng: ref HmacDrbgContext, + timeParams: TimeParams, getTrustedBlockRoot: GetTrustedBlockRootCallback, bootstrapVerifier: BootstrapVerifier, updateVerifier: UpdateVerifier, @@ -101,6 +103,7 @@ func init*( ## Initialize light client manager. LightClientManager( rng: rng, + timeParams: timeParams, getTrustedBlockRoot: getTrustedBlockRoot, bootstrapVerifier: bootstrapVerifier, updateVerifier: updateVerifier, @@ -113,7 +116,7 @@ func init*( getBeaconTime: getBeaconTime, ) -# https://github.com/ethereum/consensus-specs/blob/v1.6.0-alpha.3/specs/altair/light-client/p2p-interface.md#getlightclientbootstrap +# https://github.com/ethereum/consensus-specs/blob/v1.6.0-beta.1/specs/altair/light-client/p2p-interface.md#getlightclientbootstrap proc doRequest( e: typedesc[Bootstrap], backend: EthLCBackend, reqId: uint64, blockRoot: Eth2Digest ): Future[NetRes[ForkedLightClientBootstrap]] {. 
@@ -121,7 +124,7 @@ proc doRequest( .} = backend.getLightClientBootstrap(reqId, blockRoot) -# https://github.com/ethereum/consensus-specs/blob/v1.6.0-alpha.3/specs/altair/light-client/p2p-interface.md#lightclientupdatesbyrange +# https://github.com/ethereum/consensus-specs/blob/v1.6.0-beta.1/specs/altair/light-client/p2p-interface.md#lightclientupdatesbyrange proc doRequest( e: typedesc[UpdatesByRange], backend: EthLCBackend, @@ -139,7 +142,7 @@ proc doRequest( raise newException(ResponseError, e.error) return response -# https://github.com/ethereum/consensus-specs/blob/v1.6.0-alpha.3/specs/altair/light-client/p2p-interface.md#getlightclientfinalityupdate +# https://github.com/ethereum/consensus-specs/blob/v1.6.0-beta.1/specs/altair/light-client/p2p-interface.md#getlightclientfinalityupdate proc doRequest( e: typedesc[FinalityUpdate], backend: EthLCBackend, reqId: uint64 ): Future[NetRes[ForkedLightClientFinalityUpdate]] {. @@ -147,7 +150,7 @@ proc doRequest( .} = backend.getLightClientFinalityUpdate(reqId) -# https://github.com/ethereum/consensus-specs/blob/v1.4.0-beta.5/specs/altair/light-client/p2p-interface.md#getlightclientoptimisticupdate +# https://github.com/ethereum/consensus-specs/blob/v1.6.0-beta.1/specs/altair/light-client/p2p-interface.md#getlightclientoptimisticupdate proc doRequest( e: typedesc[OptimisticUpdate], backend: EthLCBackend, reqId: uint64 ): Future[NetRes[ForkedLightClientOptimisticUpdate]] {. @@ -170,6 +173,7 @@ template valueVerifier[E]( static: doAssert false +# NOTE: Do not export this iterator it is just for shorthand convenience iterator values(v: auto): auto = ## Local helper for `workerTask` to share the same implementation for both ## scalar and aggregate values, by treating scalars as 1-length aggregates. 
@@ -295,7 +299,7 @@ template query[E]( ): Future[bool].Raising([CancelledError]) = self.query(e, Nothing()) -# https://github.com/ethereum/consensus-specs/blob/v1.5.0-beta.0/specs/altair/light-client/light-client.md#light-client-sync-process +# https://github.com/ethereum/consensus-specs/blob/v1.6.0-beta.1/specs/altair/light-client/light-client.md#light-client-sync-process proc loop(self: LightClientManager) {.async: (raises: [CancelledError]).} = var downloadOptimistic = true @@ -303,83 +307,78 @@ proc loop(self: LightClientManager) {.async: (raises: [CancelledError]).} = didOptimisticProgress = false didFinalityProgress = false + # try atleast twice + let + NUM_RETRIES = 2 + RETRY_TIMEOUT = chronos.seconds(int64(self.timeParams.SECONDS_PER_SLOT) div (NUM_RETRIES + 1)) + while true: let wallTime = self.getBeaconTime() - currentSlot = wallTime.slotOrZero() - currentEpoch = (currentSlot mod SLOTS_PER_EPOCH) - currentPeriod = currentSlot.sync_committee_period - finalizedSlot = self.getFinalizedSlot() - finalizedPeriod = finalizedSlot.sync_committee_period - finalizedEpoch = (finalizedSlot mod SLOTS_PER_EPOCH) - optimisticSlot = self.getOptimisticSlot() - optimisticPeriod = optimisticSlot.sync_committee_period - optimisitcEpoch = (optimisticSlot mod SLOTS_PER_EPOCH) + current = wallTime.slotOrZero(self.timeParams) + finalized = self.getFinalizedSlot() + optimistic = self.getOptimisticSlot() # Obtain bootstrap data once a trusted block root is supplied if not self.isLightClientStoreInitialized(): let trustedBlockRoot = self.getTrustedBlockRoot() - # reattempt bootstrap download in 2 seconds if trustedBlockRoot.isNone: debug "TrustedBlockRoot unavaialble re-attempting bootstrap download" - await sleepAsync(chronos.seconds(2)) + await sleepAsync(RETRY_TIMEOUT) continue let didProgress = await self.query(Bootstrap, trustedBlockRoot.get) - # reattempt bootstrap download in 2 seconds if not didProgress: debug "Re-attempting bootstrap download" - await 
sleepAsync(chronos.seconds(2)) + await sleepAsync(RETRY_TIMEOUT) continue # check and download sync committee updates - if finalizedPeriod == optimisticPeriod and not self.isNextSyncCommitteeKnown(): - if finalizedPeriod >= currentPeriod: + if finalized.sync_committee_period == optimistic.sync_committee_period and not self.isNextSyncCommitteeKnown(): + if finalized.sync_committee_period >= current.sync_committee_period: debug "Downloading light client sync committee updates", - start_period = finalizedPeriod, count = 1 + start_period = finalized.sync_committee_period, count = 1 discard await self.query( - UpdatesByRange, (startPeriod: finalizedPeriod, count: uint64(1)) + UpdatesByRange, (startPeriod: finalized.sync_committee_period, count: uint64(1)) ) else: let count = - min(currentPeriod - finalizedPeriod, MAX_REQUEST_LIGHT_CLIENT_UPDATES) + min(current.sync_committee_period - finalized.sync_committee_period, MAX_REQUEST_LIGHT_CLIENT_UPDATES) debug "Downloading light client sync committee updates", - start_period = finalizedPeriod, count = count + start_period = finalized.sync_committee_period, count = count discard await self.query( - UpdatesByRange, (startPeriod: finalizedPeriod, count: uint64(count)) + UpdatesByRange, (startPeriod: finalized.sync_committee_period, count: uint64(count)) ) - elif finalizedPeriod + 1 < currentPeriod: + elif finalized.sync_committee_period + 1 < current.sync_committee_period: let count = - min(currentPeriod - (finalizedPeriod + 1), MAX_REQUEST_LIGHT_CLIENT_UPDATES) + min(current.sync_committee_period - (finalized.sync_committee_period + 1), MAX_REQUEST_LIGHT_CLIENT_UPDATES) debug "Downloading light client sync committee updates", - start_period = finalizedPeriod, count = count + start_period = finalized.sync_committee_period, count = count discard await self.query( - UpdatesByRange, (startPeriod: finalizedPeriod, count: uint64(count)) + UpdatesByRange, (startPeriod: finalized.sync_committee_period, count: uint64(count)) ) # check 
and download optimistic update - if optimisticSlot < currentSlot: - debug "Downloading light client optimistic updates", slot = currentSlot + if optimistic < current: + debug "Downloading light client optimistic updates", slot = current let didProgress = await self.query(OptimisticUpdate) if not didProgress: - # retry in 2 seconds - await sleepAsync(chronos.seconds(2)) + await sleepAsync(RETRY_TIMEOUT) continue # check and download finality update - if currentEpoch > finalizedEpoch + 2: - debug "Downloading light client finality updates", slot = currentSlot + if current.epoch > finalized.epoch + 2: + debug "Downloading light client finality updates", slot = current let didProgress = await self.query(FinalityUpdate) if not didProgress: - # retry in two seconds - await sleepAsync(chronos.seconds(2)) + await sleepAsync(RETRY_TIMEOUT) continue # check for updates every slot - await sleepAsync(chronos.seconds(int64(SECONDS_PER_SLOT))) + await sleepAsync(chronos.seconds(int64(self.timeParams.SECONDS_PER_SLOT))) proc start*(self: var LightClientManager) = ## Start light client manager's loop. 
diff --git a/nimbus_verified_proxy/json_lc_backend.nim b/nimbus_verified_proxy/lc_backend.nim similarity index 79% rename from nimbus_verified_proxy/json_lc_backend.nim rename to nimbus_verified_proxy/lc_backend.nim index 65223ca721..e5d103323f 100644 --- a/nimbus_verified_proxy/json_lc_backend.nim +++ b/nimbus_verified_proxy/lc_backend.nim @@ -19,29 +19,29 @@ import ./nimbus_verified_proxy_conf logScope: - topics = "SSZLCRestClient" + topics = "LCRestClientPool" const MaxMessageBodyBytes* = 128 * 1024 * 1024 # 128 MB (JSON encoded) BASE_URL = "/eth/v1/beacon/light_client" type - LCRestPeer = ref object + LCRestClient = ref object score: int restClient: RestClientRef - LCRestClient* = ref object + LCRestClientPool* = ref object cfg: RuntimeConfig forkDigests: ref ForkDigests - peers: seq[LCRestPeer] + peers: seq[LCRestClient] urls: seq[string] func new*( - T: type LCRestClient, cfg: RuntimeConfig, forkDigests: ref ForkDigests -): LCRestClient = - LCRestClient(cfg: cfg, forkDigests: forkDigests, peers: @[]) + T: type LCRestClientPool, cfg: RuntimeConfig, forkDigests: ref ForkDigests +): LCRestClientPool = + LCRestClientPool(cfg: cfg, forkDigests: forkDigests, peers: @[]) -proc addEndpoints*(client: LCRestClient, urlList: UrlList) {.raises: [ValueError].} = +proc addEndpoints*(client: LCRestClientPool, urlList: UrlList) {.raises: [ValueError].} = for endpoint in urlList.urls: if endpoint in client.urls: continue @@ -49,23 +49,23 @@ proc addEndpoints*(client: LCRestClient, urlList: UrlList) {.raises: [ValueError let restClient = RestClientRef.new(endpoint).valueOr: raise newException(ValueError, $error) - client.peers.add(LCRestPeer(score: 0, restClient: restClient)) + client.peers.add(LCRestClient(score: 0, restClient: restClient)) client.urls.add(endpoint) -proc closeAll*(client: LCRestClient) {.async: (raises: []).} = +proc closeAll*(client: LCRestClientPool) {.async: (raises: []).} = for peer in client.peers: await peer.restClient.closeWait() 
client.peers.setLen(0) client.urls.setLen(0) -proc getEthLCBackend*(client: LCRestClient): EthLCBackend = +proc getEthLCBackend*(client: LCRestClientPool): EthLCBackend = let getLCBootstrapProc = proc( reqId: uint64, blockRoot: Eth2Digest ): Future[NetRes[ForkedLightClientBootstrap]] {.async: (raises: [CancelledError]).} = let - peer = client.peers[reqId mod uint64(client.peers.len)] + peer = client.peers[reqId mod client.peers.lenu64] res = try: await peer.restClient.getLightClientBootstrap( @@ -80,7 +80,7 @@ proc getEthLCBackend*(client: LCRestClient): EthLCBackend = reqId: uint64, startPeriod: SyncCommitteePeriod, count: uint64 ): Future[LightClientUpdatesByRangeResponse] {.async: (raises: [CancelledError]).} = let - peer = client.peers[reqId mod uint64(client.peers.len)] + peer = client.peers[reqId mod client.peers.lenu64] res = try: await peer.restClient.getLightClientUpdatesByRange( @@ -97,7 +97,7 @@ proc getEthLCBackend*(client: LCRestClient): EthLCBackend = async: (raises: [CancelledError]) .} = let - peer = client.peers[reqId mod uint64(client.peers.len)] + peer = client.peers[reqId mod client.peers.lenu64] res = try: await peer.restClient.getLightClientFinalityUpdate( @@ -114,7 +114,7 @@ proc getEthLCBackend*(client: LCRestClient): EthLCBackend = async: (raises: [CancelledError]) .} = let - peer = client.peers[reqId mod uint64(client.peers.len)] + peer = client.peers[reqId mod client.peers.lenu64] res = try: await peer.restClient.getLightClientOptimisticUpdate( @@ -126,7 +126,7 @@ proc getEthLCBackend*(client: LCRestClient): EthLCBackend = ok(res) updateScoreProc = proc(reqId: uint64, value: int) = - let peer = client.peers[reqId mod uint64(client.peers.len)] + let peer = client.peers[reqId mod client.peers.lenu64] peer.score += value EthLCBackend( diff --git a/nimbus_verified_proxy/nimbus_verified_proxy.nim b/nimbus_verified_proxy/nimbus_verified_proxy.nim index 140d110776..4c56c92826 100644 --- a/nimbus_verified_proxy/nimbus_verified_proxy.nim +++ 
b/nimbus_verified_proxy/nimbus_verified_proxy.nim @@ -26,7 +26,7 @@ import ./engine/utils, ./engine/types, ./lc/lc, - ./json_lc_backend, + ./lc_backend, ./json_rpc_backend, ./json_rpc_frontend, ../execution_chain/version_info @@ -158,11 +158,11 @@ proc run*( ) # REST client for json LC updates - lcRestClient = LCRestClient.new(cfg, forkDigests) + lcRestClientPool = LCRestClientPool.new(cfg, forkDigests) # add endpoints to the client - lcRestClient.addEndpoints(config.lcEndpoints) - lightClient.setBackend(lcRestClient.getEthLCBackend()) + lcRestClientPool.addEndpoints(config.beaconApiUrls) + lightClient.setBackend(lcRestClientPool.getEthLCBackend()) # verify chain id that the proxy is connected to waitFor engine.verifyChaindId() @@ -212,14 +212,17 @@ proc run*( # start the light client lightClient.start() + proc stopProxy() {.async: (raises: [CancelledError]).} = + await lcRestClientPool.closeAll() + await jsonRpcClient.stop() + await jsonRpcServer.stop() + # run an infinite loop and wait for a stop signal while true: poll() if ctx != nil and ctx.stop: + waitFor stopProxy() # Cleanup - waitFor lcRestClient.closeAll() - waitFor jsonRpcClient.stop() - waitFor jsonRpcServer.stop() ctx.cleanup() # Notify client that cleanup is finished ctx.onHeader(nil, 2) diff --git a/nimbus_verified_proxy/nimbus_verified_proxy_conf.nim b/nimbus_verified_proxy/nimbus_verified_proxy_conf.nim index b549eb1e04..f4e460c72a 100644 --- a/nimbus_verified_proxy/nimbus_verified_proxy_conf.nim +++ b/nimbus_verified_proxy/nimbus_verified_proxy_conf.nim @@ -9,11 +9,11 @@ import std/os, + std/strutils, json_rpc/rpcproxy, # must be early (compilation annoyance) json_serialization/std/net, beacon_chain/conf_light_client, - beacon_chain/nimbus_binary_common, - std/strutils + beacon_chain/nimbus_binary_common export net @@ -126,9 +126,9 @@ type VerifiedProxyConf* = object # (Untrusted) web3 provider # No default - Needs to be provided by the user - lcEndpoints* {. 
- desc: "command seperated URLs of the light client data provider", - name: "lc-endpoints" + beaconApiUrls* {. + desc: "command separated URLs of the light client data provider", + name: "external-beacon-api-urls" .}: UrlList #!fmt: on @@ -156,15 +156,15 @@ proc parseCmdArg*(T: type UrlList, p: string): T {.raises: [ValueError].} = normalizedScheme = parsed.scheme.toLowerAscii() if not (normalizedScheme == "http" or normalizedScheme == "https"): - raise newException(ValueError, "Light Client Endpoint should be a http(s) url") + raise newException(ValueError, "Light Client Endpoint should be a http(s) URL") UrlList(urls: urls) proc completeCmdArg*(T: type Web3Url, val: string): seq[string] = - return @[] + @[] proc completeCmdArg*(T: type UrlList, val: string): seq[string] = - return @[] + @[] # TODO: Cannot use ClientConfig in VerifiedProxyConf due to the fact that # it contain `set[TLSFlags]` which does not have proper toml serialization From f1d1d81082f9e7aa388d8466eaa7333fd0cc1572 Mon Sep 17 00:00:00 2001 From: chirag-parmar Date: Wed, 29 Oct 2025 13:28:05 +0530 Subject: [PATCH 12/16] change casing and fix lib --- nimbus_verified_proxy/libverifproxy/verifproxy.nim | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/nimbus_verified_proxy/libverifproxy/verifproxy.nim b/nimbus_verified_proxy/libverifproxy/verifproxy.nim index b17ffd556d..2bb9a3cdd1 100644 --- a/nimbus_verified_proxy/libverifproxy/verifproxy.nim +++ b/nimbus_verified_proxy/libverifproxy/verifproxy.nim @@ -35,10 +35,9 @@ proc runContext(ctx: ptr Context) {.thread.} = try: let jsonNode = parseJson(str) - let rpcAddr = jsonNode["RpcAddress"].getStr() let myConfig = VerifiedProxyConf( - eth2Network: some(jsonNode["Eth2Network"].getStr()), - trustedBlockRoot: Eth2Digest.fromHex(jsonNode["TrustedBlockRoot"].getStr()), + eth2Network: some(jsonNode["eth2Network"].getStr()), + trustedBlockRoot: Eth2Digest.fromHex(jsonNode["trustedBlockRoot"].getStr()), backendUrl: parseCmdArg(Web3Url, 
jsonNode["backendUrl"].getStr()), frontendUrl: parseCmdArg(Web3Url, jsonNode["frontendUrl"].getStr()), lcEndpoints: parseCmdArg(UrlList, jsonNode["lcEndpoints"].getStr()), From 8d4605379a07bc8f6ca346749af356493176f554 Mon Sep 17 00:00:00 2001 From: chirag-parmar Date: Wed, 29 Oct 2025 13:29:31 +0530 Subject: [PATCH 13/16] formatting --- nimbus_verified_proxy/lc/lc.nim | 6 ++--- nimbus_verified_proxy/lc/lc_manager.nim | 29 ++++++++++++++++--------- nimbus_verified_proxy/lc_backend.nim | 4 +++- 3 files changed, 25 insertions(+), 14 deletions(-) diff --git a/nimbus_verified_proxy/lc/lc.nim b/nimbus_verified_proxy/lc/lc.nim index dd3fc0534f..caf6e44006 100644 --- a/nimbus_verified_proxy/lc/lc.nim +++ b/nimbus_verified_proxy/lc/lc.nim @@ -132,9 +132,9 @@ proc new*( GENESIS_SLOT lightClient.manager = LightClientManager.init( - rng, cfg.timeParams, getTrustedBlockRoot, bootstrapVerifier, updateVerifier, finalityVerifier, - optimisticVerifier, isLightClientStoreInitialized, isNextSyncCommitteeKnown, - getFinalizedSlot, getOptimisticSlot, getBeaconTime, + rng, cfg.timeParams, getTrustedBlockRoot, bootstrapVerifier, updateVerifier, + finalityVerifier, optimisticVerifier, isLightClientStoreInitialized, + isNextSyncCommitteeKnown, getFinalizedSlot, getOptimisticSlot, getBeaconTime, ) lightClient diff --git a/nimbus_verified_proxy/lc/lc_manager.nim b/nimbus_verified_proxy/lc/lc_manager.nim index 4ffdc19494..7b83543588 100644 --- a/nimbus_verified_proxy/lc/lc_manager.nim +++ b/nimbus_verified_proxy/lc/lc_manager.nim @@ -308,9 +308,10 @@ proc loop(self: LightClientManager) {.async: (raises: [CancelledError]).} = didFinalityProgress = false # try atleast twice - let + let NUM_RETRIES = 2 - RETRY_TIMEOUT = chronos.seconds(int64(self.timeParams.SECONDS_PER_SLOT) div (NUM_RETRIES + 1)) + RETRY_TIMEOUT = + chronos.seconds(int64(self.timeParams.SECONDS_PER_SLOT) div (NUM_RETRIES + 1)) while true: let @@ -337,28 +338,36 @@ proc loop(self: LightClientManager) {.async: (raises: 
[CancelledError]).} = continue # check and download sync committee updates - if finalized.sync_committee_period == optimistic.sync_committee_period and not self.isNextSyncCommitteeKnown(): + if finalized.sync_committee_period == optimistic.sync_committee_period and + not self.isNextSyncCommitteeKnown(): if finalized.sync_committee_period >= current.sync_committee_period: debug "Downloading light client sync committee updates", start_period = finalized.sync_committee_period, count = 1 discard await self.query( - UpdatesByRange, (startPeriod: finalized.sync_committee_period, count: uint64(1)) + UpdatesByRange, + (startPeriod: finalized.sync_committee_period, count: uint64(1)), ) else: - let count = - min(current.sync_committee_period - finalized.sync_committee_period, MAX_REQUEST_LIGHT_CLIENT_UPDATES) + let count = min( + current.sync_committee_period - finalized.sync_committee_period, + MAX_REQUEST_LIGHT_CLIENT_UPDATES, + ) debug "Downloading light client sync committee updates", start_period = finalized.sync_committee_period, count = count discard await self.query( - UpdatesByRange, (startPeriod: finalized.sync_committee_period, count: uint64(count)) + UpdatesByRange, + (startPeriod: finalized.sync_committee_period, count: uint64(count)), ) elif finalized.sync_committee_period + 1 < current.sync_committee_period: - let count = - min(current.sync_committee_period - (finalized.sync_committee_period + 1), MAX_REQUEST_LIGHT_CLIENT_UPDATES) + let count = min( + current.sync_committee_period - (finalized.sync_committee_period + 1), + MAX_REQUEST_LIGHT_CLIENT_UPDATES, + ) debug "Downloading light client sync committee updates", start_period = finalized.sync_committee_period, count = count discard await self.query( - UpdatesByRange, (startPeriod: finalized.sync_committee_period, count: uint64(count)) + UpdatesByRange, + (startPeriod: finalized.sync_committee_period, count: uint64(count)), ) # check and download optimistic update diff --git 
a/nimbus_verified_proxy/lc_backend.nim b/nimbus_verified_proxy/lc_backend.nim index e5d103323f..fe1bf9f186 100644 --- a/nimbus_verified_proxy/lc_backend.nim +++ b/nimbus_verified_proxy/lc_backend.nim @@ -41,7 +41,9 @@ func new*( ): LCRestClientPool = LCRestClientPool(cfg: cfg, forkDigests: forkDigests, peers: @[]) -proc addEndpoints*(client: LCRestClientPool, urlList: UrlList) {.raises: [ValueError].} = +proc addEndpoints*( + client: LCRestClientPool, urlList: UrlList +) {.raises: [ValueError].} = for endpoint in urlList.urls: if endpoint in client.urls: continue From f1515f69cab201ed3695adba71bd2bf2ad9af532 Mon Sep 17 00:00:00 2001 From: chirag-parmar Date: Wed, 29 Oct 2025 13:37:20 +0530 Subject: [PATCH 14/16] reviews --- nimbus_verified_proxy/lc/lc.nim | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/nimbus_verified_proxy/lc/lc.nim b/nimbus_verified_proxy/lc/lc.nim index caf6e44006..711dc3ec60 100644 --- a/nimbus_verified_proxy/lc/lc.nim +++ b/nimbus_verified_proxy/lc/lc.nim @@ -77,11 +77,16 @@ proc new*( if lightClient.onOptimisticHeader != nil: lightClient.onOptimisticHeader(lightClient, lightClient.getOptimisticHeader) + const + dumpEnabled = false + dumpDirInvalid = "." + dumpDirIncoming = "." 
+ # initialize without dumping lightClient.processor = LightClientProcessor.new( - false, ".", ".", cfg, genesis_validators_root, finalizationMode, lightClient.store, - getBeaconTime, getTrustedBlockRoot, onStoreInitialized, onFinalizedHeader, - onOptimisticHeader, + dumpEnabled, dumpDirInvalid, dumpDirIncoming, cfg, genesis_validators_root, + finalizationMode, lightClient.store, getBeaconTime, getTrustedBlockRoot, + onStoreInitialized, onFinalizedHeader, onOptimisticHeader, ) proc lightClientVerifier( From 43cbbba8c1d4fade472553fb74a54829ed080bd3 Mon Sep 17 00:00:00 2001 From: chirag-parmar Date: Wed, 29 Oct 2025 13:53:17 +0530 Subject: [PATCH 15/16] fix --- nimbus_verified_proxy/libverifproxy/verifproxy.nim | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/nimbus_verified_proxy/libverifproxy/verifproxy.nim b/nimbus_verified_proxy/libverifproxy/verifproxy.nim index 2bb9a3cdd1..20f3452500 100644 --- a/nimbus_verified_proxy/libverifproxy/verifproxy.nim +++ b/nimbus_verified_proxy/libverifproxy/verifproxy.nim @@ -40,8 +40,8 @@ proc runContext(ctx: ptr Context) {.thread.} = trustedBlockRoot: Eth2Digest.fromHex(jsonNode["trustedBlockRoot"].getStr()), backendUrl: parseCmdArg(Web3Url, jsonNode["backendUrl"].getStr()), frontendUrl: parseCmdArg(Web3Url, jsonNode["frontendUrl"].getStr()), - lcEndpoints: parseCmdArg(UrlList, jsonNode["lcEndpoints"].getStr()), - logLevel: jsonNode["LogLevel"].getStr(), + beaconApiUrls: parseCmdArg(UrlList, jsonNode["beaconApiUrls"].getStr()), + logLevel: jsonNode["logLevel"].getStr(), logStdout: StdoutLogKind.Auto, dataDirFlag: none(OutDir), ) From 1aa5db7d341053919c39b59e196e5558c21bc08f Mon Sep 17 00:00:00 2001 From: chirag-parmar Date: Wed, 29 Oct 2025 15:03:14 +0530 Subject: [PATCH 16/16] reraise cancelled --- nimbus_verified_proxy/lc_backend.nim | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/nimbus_verified_proxy/lc_backend.nim b/nimbus_verified_proxy/lc_backend.nim index 
fe1bf9f186..cec2beb91e 100644 --- a/nimbus_verified_proxy/lc_backend.nim +++ b/nimbus_verified_proxy/lc_backend.nim @@ -73,8 +73,10 @@ proc getEthLCBackend*(client: LCRestClientPool): EthLCBackend = await peer.restClient.getLightClientBootstrap( blockRoot, client.cfg, client.forkDigests ) + except CancelledError as e: + raise e except CatchableError as e: - raise newException(CancelledError, e.msg) + err(e.msg) ok(res) @@ -88,8 +90,10 @@ proc getEthLCBackend*(client: LCRestClientPool): EthLCBackend = await peer.restClient.getLightClientUpdatesByRange( startPeriod, count, client.cfg, client.forkDigests ) + except CancelledError as e: + raise e except CatchableError as e: - raise newException(CancelledError, e.msg) + err(e.msg) ok(res) @@ -105,8 +109,10 @@ proc getEthLCBackend*(client: LCRestClientPool): EthLCBackend = await peer.restClient.getLightClientFinalityUpdate( client.cfg, client.forkDigests ) + except CancelledError as e: + raise e except CatchableError as e: - raise newException(CancelledError, e.msg) + err(e.msg) ok(res) @@ -122,8 +128,10 @@ proc getEthLCBackend*(client: LCRestClientPool): EthLCBackend = await peer.restClient.getLightClientOptimisticUpdate( client.cfg, client.forkDigests ) + except CancelledError as e: + raise e except CatchableError as e: - raise newException(CancelledError, e.msg) + err(e.msg) ok(res)