Skip to content

Commit 5369d7a

Browse files
committed
Fix: NBX would sometimes stay stuck in case of a reorg (Fix #461 #409)
1 parent e83cfdb commit 5369d7a

File tree

2 files changed

+100
-80
lines changed

2 files changed

+100
-80
lines changed

NBXplorer.Tests/UnitTest1.cs

+30
Original file line numberDiff line numberDiff line change
@@ -4501,5 +4501,35 @@ public async Task CanUseRPCProxy(Backend backend)
45014501
await tester.Client.RPCClient.GetTxOutAsync(uint256.One, 0);
45024502
}
45034503
}
4504+
4505+
// Regression test: the indexer must keep following the node when the best chain
// switches away from the indexed chain and then switches back to it.
// The test has no explicit assertion; it relies on the Notifications.WaitFor* helpers
// to fail when an expected notification never arrives (presumably via a timeout -- confirm).
[Fact]
public async Task DoNotHangDuringReorg()
{
    using var tester = ServerTester.Create(Backend.Postgres);
    var wallet = await tester.Client.GenerateWalletAsync(new GenerateWalletRequest());
    var addr = await tester.Client.GetUnusedAsync(wallet.DerivationScheme, DerivationFeature.Deposit);
    var txId = tester.SendToAddress(addr.Address, Money.Coins(1.0m));
    tester.Notifications.WaitForTransaction(wallet.DerivationScheme, txId);

    // Chain 1: four blocks mined on top of the current tip.
    var blocks = await tester.RPC.GenerateAsync(4);
    for (int i = 0; i < blocks.Length; i++)
    {
        Logs.Tester.LogInformation($"Chain1: [{i}]: {blocks[i]}");
    }
    tester.Notifications.WaitForBlocks(blocks[^1]);

    // Chain 2: invalidate the first block of chain 1 and mine three replacement blocks.
    // Awaiting the async RPC variants avoids blocking a thread inside this async test.
    Logs.Tester.LogInformation("Invalidate the first block which confirmed the transaction " + blocks[0]);
    await tester.RPC.InvalidateBlockAsync(blocks[0]);
    var blocks2 = await tester.RPC.GenerateAsync(3);
    for (int i = 0; i < blocks2.Length; i++)
    {
        Logs.Tester.LogInformation($"Chain2: [{i}]: {blocks2[i]}");
    }
    tester.Notifications.WaitForBlocks(blocks2[^1]);

    // Chain 1 has four blocks against chain 2's three, so reconsidering its first block
    // is expected to make the node switch back to chain 1.
    Logs.Tester.LogInformation("Reconsider the block " + blocks[0]);
    await tester.RPC.SendCommandAsync("reconsiderblock", blocks[0]);

    Logs.Tester.LogInformation("Waiting for the first chain to be processed again");
    tester.Notifications.WaitForBlocks(blocks[^1]);
}
45044534
}
45054535
}

NBXplorer/Backends/Postgres/PostgresIndexers.cs

+70-80
Original file line numberDiff line numberDiff line change
@@ -46,9 +46,6 @@ public PostgresIndexer(
4646
CancellationTokenSource cts;
4747
Task _indexerLoop;
4848
Task _watchdogLoop;
49-
Node _Node;
50-
Channel<object> _Channel;
51-
Channel<Block> _DownloadedBlocks;
5249

5350
// This one will check if the indexer is "stuck" and disconnect the node if it is the case
5451
async Task WatchdogLoop()
@@ -58,14 +55,14 @@ async Task WatchdogLoop()
5855
try
5956
{
6057
await Task.Delay(TimeSpan.FromMinutes(5.0), cancellationToken);
61-
var height = await SeemsStuck(cancellationToken);
62-
if (height is null)
58+
var lastBlock = await SeemsStuck(cancellationToken);
59+
if (lastBlock is null)
6360
goto wait;
6461
await Task.Delay(TimeSpan.FromMinutes(2.0), cancellationToken);
65-
var height2 = await SeemsStuck(cancellationToken);
66-
if (height != height2)
62+
var lastBlock2 = await SeemsStuck(cancellationToken);
63+
if (lastBlock != lastBlock2)
6764
goto wait;
68-
_Node?.DisconnectAsync($"Sync seems stuck at height {height.Value}, restarting the connection.");
65+
_Connection?.Dispose($"Sync seems stuck after block {lastBlock.Hash} ({lastBlock.Hash}), restarting the connection.");
6966
goto wait;
7067
}
7168
catch when (cts.Token.IsCancellationRequested)
@@ -80,18 +77,17 @@ async Task WatchdogLoop()
8077
end:;
8178
}
8279

// Checks whether the indexer appears to be behind the node.
// Returns the last indexed block when the node's best block hash differs from it.
// Returns null when both match, or when the check cannot run: the state is neither
// NBXplorerSynching nor Ready, no block has been indexed yet, or no RPC client is connected.
async Task<SlimChainedBlock> SeemsStuck(CancellationToken cancellationToken)
{
    var indexerIsRunning = State is BitcoinDWaiterState.NBXplorerSynching or BitcoinDWaiterState.Ready;
    if (!indexerIsRunning)
        return null;
    if (lastIndexedBlock is not { } indexedTip)
        return null;
    if (GetConnectedClient() is not RPCClient client)
        return null;

    var chainInfo = await client.GetBlockchainInfoAsyncEx(cancellationToken);
    if (chainInfo.BestBlockHash != indexedTip.Hash)
        return indexedTip;
    return null;
}
9692

9793
async Task IndexerLoop()
@@ -121,24 +117,53 @@ async Task IndexerLoop()
121117
}
122118
}
123119

// Groups a P2P node with the two channels fed by its message handler, so that the
// three are created together and torn down together.
class Connection : IDisposable
{
    // Items produced by the node's message handler (header announcements, transactions)
    // and consumed by the indexer loop.
    public readonly Channel<object> Events;
    // Blocks received from the node in response to block download requests.
    public readonly Channel<Block> Blocks;
    // The node this connection wraps.
    public readonly Node Node;

    // 0 while live, 1 once disposal has started. Kept as an int so it can be claimed
    // atomically with Interlocked.
    int _Disposed;

    public Connection(Node node)
    {
        Node = node;
        Events = Channel.CreateUnbounded<object>(new() { AllowSynchronousContinuations = false });
        Blocks = Channel.CreateUnbounded<Block>(new() { AllowSynchronousContinuations = false });
    }

    // Disposes the connection without giving a disconnect reason.
    public void Dispose()
    {
        Dispose(null);
    }

    // Disconnects the node, passing along the reason, and completes both channels so
    // that readers stop waiting. Only the first call has any effect.
    public void Dispose(string reason)
    {
        // Dispose is reachable from the watchdog loop, the node's Disconnected handler,
        // ConnectNode and StopAsync. Claiming the flag atomically *before* tearing down
        // guarantees a single teardown even when those calls race or re-enter (for
        // example if disconnecting the node raises Disconnected, which disposes again).
        if (Interlocked.Exchange(ref _Disposed, 1) != 0)
            return;
        Node.DisconnectAsync(reason);
        Events.Writer.TryComplete();
        Blocks.Writer.TryComplete();
    }
}
147+
Connection _Connection;
124148
private async Task IndexerLoopCore(CancellationToken token)
125149
{
126-
await ConnectNode(token, true);
127-
await foreach (var item in _Channel.Reader.ReadAllAsync(token))
150+
await ConnectNode(token);
151+
var connection = _Connection;
152+
await foreach (var item in connection.Events.Reader.ReadAllAsync(token))
128153
{
129154
await using var conn = await ConnectionFactory.CreateConnectionHelper(Network);
130155
if (item is PullBlocks pb)
131156
{
132-
var headers = ConsolidatePullBlocks(_Channel.Reader, pb);
157+
var headers = ConsolidatePullBlocks(connection.Events.Reader, pb);
133158
foreach (var batch in headers.Chunk(maxinflight))
134159
{
135-
_ = _Node.SendMessageAsync(
160+
_ = connection.Node.SendMessageAsync(
136161
new GetDataPayload(
137-
batch.Select(b => new InventoryVector(_Node.AddSupportedOptions(InventoryType.MSG_BLOCK), b.GetHash())
162+
batch.Select(b => new InventoryVector(connection.Node.AddSupportedOptions(InventoryType.MSG_BLOCK), b.GetHash())
138163
).ToArray()));
139164
var remaining = batch.Select(b => b.GetHash()).ToHashSet();
140165
List<Block> unorderedBlocks = new List<Block>();
141-
await foreach (var block in _DownloadedBlocks.Reader.ReadAllAsync(token))
166+
await foreach (var block in connection.Blocks.Reader.ReadAllAsync(token))
142167
{
143168
if (!remaining.Remove(block.Header.GetHash()))
144169
continue;
@@ -188,17 +213,14 @@ private async Task IndexerLoopCore(CancellationToken token)
188213
}
189214
}
190215
await SaveProgress(conn);
191-
await UpdateState();
216+
await UpdateState(connection.Node);
192217
}
193-
await AskNextHeaders(token);
194-
}
195-
if (item is NodeDisconnected)
196-
{
197-
await ConnectNode(token, false);
218+
if (connection.Node.State != NodeState.HandShaked)
219+
await AskNextHeaders(connection.Node, token);
198220
}
199221
if (item is Transaction tx)
200222
{
201-
var txs = PullTransactions(_Channel.Reader, tx);
223+
var txs = PullTransactions(connection.Events.Reader, tx);
202224
await SaveMatches(conn, txs, null, true);
203225
}
204226
}
@@ -255,15 +277,8 @@ private IList<BlockHeader> ConsolidatePullBlocks(ChannelReader<object> reader, P
255277
}
256278

257279

258-
private async Task ConnectNode(CancellationToken token, bool forceRestart)
280+
private async Task ConnectNode(CancellationToken token)
259281
{
260-
if (_Node is not null)
261-
{
262-
if (!forceRestart && _Node.State == NodeState.HandShaked)
263-
return;
264-
_Node.DisconnectAsync("Restarting");
265-
_Node = null;
266-
}
267282
State = BitcoinDWaiterState.NotStarted;
268283
using (var handshakeTimeout = CancellationTokenSource.CreateLinkedTokenSource(token))
269284
{
@@ -344,35 +359,35 @@ private async Task ConnectNode(CancellationToken token, bool forceRestart)
344359
State = BitcoinDWaiterState.NBXplorerSynching;
345360
// Refresh the NetworkInfo that may have become different while it was synching.
346361
NetworkInfo = await RPCClient.GetNetworkInfoAsync();
347-
_Node = node;
348-
_Channel?.Writer.Complete();
349-
_Channel = Channel.CreateUnbounded<object>();
350-
_DownloadedBlocks?.Writer.Complete();
351-
_DownloadedBlocks = Channel.CreateUnbounded<Block>();
362+
363+
_Connection?.Dispose("Creating new connection");
364+
_Connection = new Connection(node);
352365
node.MessageReceived += Node_MessageReceived;
353366
node.Disconnected += Node_Disconnected;
354-
355-
var locator = await AskNextHeaders(token);
367+
var locator = await AskNextHeaders(node, token);
356368
lastIndexedBlock = await Repository.GetLastIndexedSlimChainedBlock(locator);
357369
if (lastIndexedBlock is null)
358370
{
359371
var locatorTip = await RPCClient.GetBlockHeaderAsyncEx(locator.Blocks[0], token);
360372
lastIndexedBlock = locatorTip?.ToSlimChainedBlock();
361373
}
362-
await UpdateState();
374+
await UpdateState(node);
363375
}
364376
}
365377

366-
367378
bool firstConnect = true;
// Sends a getheaders request to the given node, starting from the saved index
// progress, or from the default current location when no progress has been saved.
// Logs every hash of the locator and returns the locator that was sent.
private async Task<BlockLocator> AskNextHeaders(Node node, CancellationToken token)
{
    var locator = await Repository.GetIndexProgress()
                  ?? await GetDefaultCurrentLocation(token);

    foreach (var hash in locator.Blocks)
        Logger.LogInformation($"Asking for block {hash}");

    var request = new GetHeadersPayload(locator);
    await node.SendMessageAsync(request);
    return locator;
}
378393

@@ -391,8 +406,10 @@ private async Task SaveProgress(DbConnectionHelper conn)
391406
await Repository.SetIndexProgress(conn.Connection, locator);
392407
}
393408

394-
private async Task UpdateState()
409+
private async Task UpdateState(Node node)
395410
{
411+
if (node.State != NodeState.HandShaked)
412+
return;
396413
var blockchainInfo = await RPCClient.GetBlockchainInfoAsyncEx();
397414
if (blockchainInfo.IsSynching(Network))
398415
{
@@ -508,18 +525,16 @@ private async Task SaveMatches(DbConnectionHelper conn, List<Transaction> transa
508525

509526
SlimChainedBlock lastIndexedBlock;
510527
record PullBlocks(IList<BlockHeader> headers);
511-
record NodeDisconnected();
512528
private void Node_MessageReceived(Node node, IncomingMessage message)
513529
{
514-
var channel = _Channel;
515-
var downloadedBlocks = _DownloadedBlocks;
530+
var connection = _Connection;
516531
if (message.Message.Payload is HeadersPayload h && h.Headers.Count != 0)
517532
{
518-
channel.Writer.TryWrite(new PullBlocks(h.Headers));
533+
connection.Events.Writer.TryWrite(new PullBlocks(h.Headers));
519534
}
520535
else if (message.Message.Payload is BlockPayload b)
521536
{
522-
downloadedBlocks.Writer.TryWrite(b.Object);
537+
connection.Blocks.Writer.TryWrite(b.Object);
523538
}
524539
else if (message.Message.Payload is InvPayload invs)
525540
{
@@ -535,41 +550,17 @@ private void Node_MessageReceived(Node node, IncomingMessage message)
535550
{
536551
node.SendMessageAsync(data);
537552
}
538-
// DOGE coin doing doge things forget we want header first sync... reboot the connection
539-
else
540-
{
541-
if (invs.Inventory.Where(t => t.Type.HasFlag(InventoryType.MSG_BLOCK)).Any())
542-
{
543-
node.DisconnectAsync("Not sending headers first anymore");
544-
}
545-
}
546553
}
547554
else if (message.Message.Payload is TxPayload tx)
548555
{
549-
channel.Writer.TryWrite(tx.Object);
556+
connection.Events.Writer.TryWrite(tx.Object);
550557
}
551558
}
552559

553560
private void Node_Disconnected(Node node)
554561
{
555-
var channel = _Channel;
556-
if (node.DisconnectReason.Reason != "Restarting")
557-
{
558-
if (!cts.IsCancellationRequested)
559-
{
560-
var exception = node.DisconnectReason.Exception?.Message;
561-
if (!string.IsNullOrEmpty(exception))
562-
exception = $" ({exception})";
563-
else
564-
exception = String.Empty;
565-
Logger.LogWarning($"Node disconnected for reason: {node.DisconnectReason.Reason}{exception}");
566-
}
567-
channel.Writer.TryWrite(new NodeDisconnected());
568-
}
569-
else
570-
{
571-
Logger.LogInformation($"Restarting node connection...");
572-
}
562+
Logger.LogInformation($"Node disconnected ({node.DisconnectReason.Reason})");
563+
_Connection?.Dispose();
573564
node.MessageReceived -= Node_MessageReceived;
574565
node.Disconnected -= Node_Disconnected;
575566
State = BitcoinDWaiterState.NotStarted;
@@ -589,12 +580,11 @@ public async Task StartAsync(CancellationToken cancellationToken)
// Stops the indexer: signals cancellation, disposes the current connection (if any),
// then waits for the indexer loop and the watchdog loop to finish, in that order.
public async Task StopAsync(CancellationToken cancellationToken)
{
    cts?.Cancel();
    _Connection?.Dispose("NBXplorer stopping...");

    if (_indexerLoop is { } indexerLoop)
    {
        await indexerLoop;
    }

    if (_watchdogLoop is { } watchdogLoop)
    {
        await watchdogLoop;
    }
}
599589
public NBXplorerNetwork Network => network;
600590

0 commit comments

Comments
 (0)