Skip to content

Commit e90a3a3

Browse files
JonathanOppenheimeralarso16StephenButtolph
authored
sync: coreth PR #1380: fix: Flake in txindexer test (#1862)
Signed-off-by: Austin Larson <[email protected]> Signed-off-by: Jonathan Oppenheimer <[email protected]> Co-authored-by: Austin Larson <[email protected]> Co-authored-by: Stephen Buttolph <[email protected]>
1 parent 0d227ec commit e90a3a3

File tree

2 files changed

+32
-64
lines changed

2 files changed

+32
-64
lines changed

core/txindexer.go

Lines changed: 32 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -36,17 +36,6 @@ import (
3636
"github.com/ava-labs/libevm/log"
3737
)
3838

39-
// TxIndexProgress is the struct describing the progress for transaction indexing.
40-
type TxIndexProgress struct {
41-
Indexed uint64 // number of blocks whose transactions are indexed
42-
Remaining uint64 // number of blocks whose transactions are not indexed yet
43-
}
44-
45-
// Done returns an indicator if the transaction indexing is finished.
46-
func (progress TxIndexProgress) Done() bool {
47-
return progress.Remaining == 0
48-
}
49-
5039
// txIndexer is the module responsible for maintaining transaction indexes
5140
// according to the configured indexing range by users.
5241
type txIndexer struct {
@@ -55,24 +44,22 @@ type txIndexer struct {
5544
// * 0: means the entire chain should be indexed
5645
// * N: means the latest N blocks [HEAD-N+1, HEAD] should be indexed
5746
// and all others shouldn't.
58-
limit uint64
59-
db ethdb.Database
60-
progress chan chan TxIndexProgress
61-
term chan chan struct{}
62-
closed chan struct{}
47+
limit uint64
48+
db ethdb.Database
49+
term chan chan struct{}
50+
closed chan struct{}
6351

6452
chain *BlockChain
6553
}
6654

6755
// newTxIndexer initializes the transaction indexer.
6856
func newTxIndexer(limit uint64, chain *BlockChain) *txIndexer {
6957
indexer := &txIndexer{
70-
limit: limit,
71-
db: chain.db,
72-
progress: make(chan chan TxIndexProgress),
73-
term: make(chan chan struct{}),
74-
closed: make(chan struct{}),
75-
chain: chain,
58+
limit: limit,
59+
db: chain.db,
60+
term: make(chan chan struct{}),
61+
closed: make(chan struct{}),
62+
chain: chain,
7663
}
7764
chain.wg.Add(1)
7865
go func() {
@@ -125,9 +112,10 @@ func (indexer *txIndexer) loop(chain *BlockChain) {
125112
defer close(indexer.closed)
126113
// Listening to chain events and manipulate the transaction indexes.
127114
var (
128-
stop chan struct{} // Non-nil if background routine is active.
129-
done chan struct{} // Non-nil if background routine is active.
130-
lastHead uint64 // The latest announced chain head (whose tx indexes are assumed created)
115+
stop chan struct{} // Non-nil if background routine is active.
116+
done chan struct{} // Non-nil if background routine is active.
117+
lastHead uint64 // The latest announced chain head (whose tx indexes are assumed created)
118+
runningHead uint64 // The head number being processed in background.
131119

132120
headCh = make(chan ChainEvent)
133121
sub = chain.SubscribeChainAcceptedEvent(headCh)
@@ -138,18 +126,21 @@ func (indexer *txIndexer) loop(chain *BlockChain) {
138126
}
139127
defer sub.Unsubscribe()
140128

129+
// startRun launches the background unindexing task.
130+
startRun := func(newHead uint64) {
131+
stop = make(chan struct{})
132+
done = make(chan struct{})
133+
runningHead = newHead
134+
indexer.chain.wg.Add(1)
135+
go indexer.lockedRun(runningHead, stop, done)
136+
}
137+
141138
log.Info("Initialized transaction unindexer", "limit", indexer.limit)
142139

143140
// Launch the initial processing if chain is not empty (head != genesis).
144141
// This step is useful in these scenarios that chain has no progress.
145142
if head := indexer.chain.CurrentBlock(); head != nil && head.Number.Uint64() > indexer.limit {
146-
stop = make(chan struct{})
147-
done = make(chan struct{})
148-
lastHead = head.Number.Uint64()
149-
indexer.chain.wg.Add(1)
150-
go func() {
151-
indexer.lockedRun(head.Number.Uint64(), stop, done)
152-
}()
143+
startRun(head.Number.Uint64())
153144
}
154145
for {
155146
select {
@@ -159,20 +150,20 @@ func (indexer *txIndexer) loop(chain *BlockChain) {
159150
break
160151
}
161152

153+
// If no background task is running, start a new one.
154+
// We cannot block on the subscription channel because it can
155+
// cause a fatal error.
162156
if done == nil {
163-
stop = make(chan struct{})
164-
done = make(chan struct{})
165-
indexer.chain.wg.Add(1)
166-
go func() {
167-
indexer.lockedRun(headNum, stop, done)
168-
}()
157+
startRun(headNum)
169158
}
170-
lastHead = head.Block.NumberU64()
159+
lastHead = headNum
171160
case <-done:
172161
stop = nil
173162
done = nil
174-
case ch := <-indexer.progress:
175-
ch <- indexer.report(lastHead, rawdb.ReadTxIndexTail(indexer.db))
163+
// If there is a new head arrived during the last run, start a new one.
164+
if runningHead < lastHead {
165+
startRun(lastHead)
166+
}
176167
case ch := <-indexer.term:
177168
if stop != nil {
178169
close(stop)
@@ -187,28 +178,6 @@ func (indexer *txIndexer) loop(chain *BlockChain) {
187178
}
188179
}
189180

190-
// report returns the tx indexing progress.
191-
func (indexer *txIndexer) report(head uint64, tail *uint64) TxIndexProgress {
192-
total := indexer.limit
193-
if indexer.limit == 0 || total > head {
194-
total = head + 1 // genesis included
195-
}
196-
var indexed uint64
197-
if tail != nil {
198-
indexed = head - *tail + 1
199-
}
200-
// The value of indexed might be larger than total if some blocks need
201-
// to be unindexed, avoiding a negative remaining.
202-
var remaining uint64
203-
if indexed < total {
204-
remaining = total - indexed
205-
}
206-
return TxIndexProgress{
207-
Indexed: indexed,
208-
Remaining: remaining,
209-
}
210-
}
211-
212181
// close shutdown the indexer. Safe to be called for multiple times.
213182
func (indexer *txIndexer) close() {
214183
ch := make(chan struct{})

scripts/known_flakes.txt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
TestChainIndexerWithChildren
22
TestClientCancelWebsocket
33
TestClientWebsocketLargeMessage
4-
TestTransactionSkipIndexing
54
TestVMShutdownWhileSyncing
65
TestWebsocketLargeRead

0 commit comments

Comments
 (0)