Skip to content

Commit c65752b

Browse files
authored
sync: coreth PR #1323: wait for tx pool event loop (#1871)
Signed-off-by: Yacov Manevich <[email protected]>
1 parent e90a3a3 commit c65752b

File tree

3 files changed

+223
-54
lines changed

3 files changed

+223
-54
lines changed

plugin/evm/block_builder.go

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,9 @@ type blockBuilder struct {
4747
// but at least after a minimum delay of minBlockBuildingRetryDelay.
4848
lastBuildParentHash common.Hash
4949
lastBuildTime time.Time
50+
51+
chainHeadHash common.Hash
52+
mempoolHeadHash common.Hash
5053
}
5154

5255
func (vm *VM) NewBlockBuilder() *blockBuilder {
@@ -94,9 +97,13 @@ func (b *blockBuilder) awaitSubmittedTxs() {
9497
txSubmitChan := make(chan core.NewTxsEvent)
9598
b.txPool.SubscribeTransactions(txSubmitChan, true)
9699

100+
events := make(chan core.NewTxPoolReorgEvent)
101+
sub := b.txPool.SubscribeNewReorgEvent(events)
102+
97103
b.shutdownWg.Add(1)
98104
go b.ctx.Log.RecoverAndPanic(func() {
99105
defer b.shutdownWg.Done()
106+
defer sub.Unsubscribe()
100107

101108
for {
102109
select {
@@ -105,6 +112,12 @@ func (b *blockBuilder) awaitSubmittedTxs() {
105112
b.signalCanBuild()
106113
case <-b.shutdownChan:
107114
return
115+
case event := <-events:
116+
if event.Head == nil || event.Head.Number == nil {
117+
log.Warn("nil head or block number in tx pool reorg event")
118+
continue
119+
}
120+
b.setMempoolHeadHash(event.Head.Hash())
108121
}
109122
}
110123
})
@@ -143,7 +156,7 @@ func (b *blockBuilder) waitForEvent(ctx context.Context, currentHeader *types.He
143156
func (b *blockBuilder) waitForNeedToBuild(ctx context.Context) (time.Time, common.Hash, error) {
144157
b.buildBlockLock.Lock()
145158
defer b.buildBlockLock.Unlock()
146-
for !b.needToBuild() {
159+
for !b.needToBuild() || b.pendingPoolUpdate() {
147160
if err := b.pendingSignal.Wait(ctx); err != nil {
148161
return time.Time{}, common.Hash{}, err
149162
}
@@ -186,3 +199,33 @@ func minNextBlockTime(parent *types.Header) time.Time {
186199
requiredDelay := time.Duration(acp226DelayExcess.Delay()) * time.Millisecond
187200
return parentTime.Add(requiredDelay)
188201
}
202+
203+
func (b *blockBuilder) setChainHeadHash(hash common.Hash) {
204+
b.buildBlockLock.Lock()
205+
defer b.buildBlockLock.Unlock()
206+
207+
b.chainHeadHash = hash
208+
209+
if b.pendingPoolUpdate() {
210+
return
211+
}
212+
213+
b.pendingSignal.Broadcast()
214+
}
215+
216+
func (b *blockBuilder) setMempoolHeadHash(hash common.Hash) {
217+
b.buildBlockLock.Lock()
218+
defer b.buildBlockLock.Unlock()
219+
220+
b.mempoolHeadHash = hash
221+
222+
if b.pendingPoolUpdate() {
223+
return
224+
}
225+
226+
b.pendingSignal.Broadcast()
227+
}
228+
229+
func (b *blockBuilder) pendingPoolUpdate() bool {
230+
return b.chainHeadHash != b.mempoolHeadHash
231+
}

plugin/evm/vm.go

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -891,6 +891,8 @@ func (vm *VM) onNormalOperationsStarted() error {
891891
// NOTE: gossip network must be initialized first otherwise ETH tx gossip will not work.
892892
vm.builderLock.Lock()
893893
vm.builder = vm.NewBlockBuilder()
894+
vm.builder.setChainHeadHash(vm.blockChain.CurrentBlock().Hash())
895+
vm.builder.setMempoolHeadHash(vm.blockChain.CurrentBlock().Hash())
894896
vm.builder.awaitSubmittedTxs()
895897
vm.builderLock.Unlock()
896898

@@ -1130,7 +1132,14 @@ func (vm *VM) SetPreference(ctx context.Context, blkID ids.ID) error {
11301132
return fmt.Errorf("failed to set preference to %s: %w", blkID, err)
11311133
}
11321134

1133-
return vm.blockChain.SetPreference(block.(*wrappedBlock).ethBlock)
1135+
wb, isWrappedBlock := block.(*wrappedBlock)
1136+
if !isWrappedBlock {
1137+
return fmt.Errorf("expected block %s to be of type *wrappedBlock but got %T", blkID, block)
1138+
}
1139+
1140+
vm.setPendingBlock(wb.GetEthBlock().Hash())
1141+
1142+
return vm.blockChain.SetPreference(wb.ethBlock)
11341143
}
11351144

11361145
// GetBlockIDAtHeight returns the canonical block at [height].
@@ -1370,6 +1379,17 @@ func attachEthService(handler *rpc.Server, apis []rpc.API, names []string) error
13701379
return nil
13711380
}
13721381

1382+
func (vm *VM) setPendingBlock(hash common.Hash) {
1383+
vm.builderLock.Lock()
1384+
defer vm.builderLock.Unlock()
1385+
1386+
if vm.builder == nil {
1387+
return
1388+
}
1389+
1390+
vm.builder.setChainHeadHash(hash)
1391+
}
1392+
13731393
func (vm *VM) Connected(ctx context.Context, nodeID ids.NodeID, version *version.Application) error {
13741394
vm.vmLock.Lock()
13751395
defer vm.vmLock.Unlock()

0 commit comments

Comments
 (0)