Skip to content

Commit e9a235c

Browse files
New route for subscribing to events via websockets (#503)
1 parent 03c17c0 commit e9a235c

File tree

6 files changed

+255
-149
lines changed

6 files changed

+255
-149
lines changed

NBXplorer.Client/ExplorerClient.cs

+11
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,17 @@ public async Task<WebsocketNotificationSession> CreateWebsocketNotificationSessi
229229
await session.ConnectAsync(cancellation).ConfigureAwait(false);
230230
return session;
231231
}
232+
public WebsocketNotificationSessionLegacy CreateWebsocketNotificationSessionLegacy(CancellationToken cancellation = default)
233+
{
234+
return CreateWebsocketNotificationSessionLegacyAsync(cancellation).GetAwaiter().GetResult();
235+
}
236+
237+
public async Task<WebsocketNotificationSessionLegacy> CreateWebsocketNotificationSessionLegacyAsync(CancellationToken cancellation = default)
238+
{
239+
var session = new WebsocketNotificationSessionLegacy(this);
240+
await session.ConnectAsync(cancellation).ConfigureAwait(false);
241+
return session;
242+
}
232243

233244
public UTXOChanges GetUTXOs(TrackedSource trackedSource, CancellationToken cancellation = default)
234245
{

NBXplorer.Client/WebsocketNotificationSession.cs

+69-60
Original file line numberDiff line numberDiff line change
@@ -7,71 +7,16 @@
77
using System.Text;
88
using System.Threading;
99
using System.Threading.Tasks;
10+
using NBitcoin.Protocol;
1011

1112
namespace NBXplorer
1213
{
13-
public class WebsocketNotificationSession : NotificationSessionBase, IDisposable
14+
public class WebsocketNotificationSessionLegacy : WebsocketNotificationSession
1415
{
15-
16-
private readonly ExplorerClient _Client;
17-
public ExplorerClient Client
18-
{
19-
get
20-
{
21-
return _Client;
22-
}
23-
}
24-
internal WebsocketNotificationSession(ExplorerClient client)
25-
{
26-
if(client == null)
27-
throw new ArgumentNullException(nameof(client));
28-
_Client = client;
29-
}
30-
31-
internal async Task ConnectAsync(CancellationToken cancellation)
32-
{
33-
var uri = _Client.GetFullUri($"v1/cryptos/{_Client.CryptoCode}/connect");
34-
uri = ToWebsocketUri(uri);
35-
WebSocket socket = null;
36-
try
37-
{
38-
socket = await ConnectAsyncCore(uri, cancellation);
39-
}
40-
catch(WebSocketException) // For some reason the ErrorCode is not properly set, so we can check for error 401
41-
{
42-
if(!_Client.Auth.RefreshCache())
43-
throw;
44-
socket = await ConnectAsyncCore(uri, cancellation);
45-
}
46-
JsonSerializerSettings settings = new JsonSerializerSettings();
47-
new Serializer(_Client.Network).ConfigureSerializer(settings);
48-
_MessageListener = new WebsocketMessageListener(socket, settings);
49-
}
50-
51-
private async Task<ClientWebSocket> ConnectAsyncCore(string uri, CancellationToken cancellation)
52-
{
53-
var socket = new ClientWebSocket();
54-
_Client.Auth.SetWebSocketAuth(socket);
55-
try
56-
{
57-
await socket.ConnectAsync(new Uri(uri, UriKind.Absolute), cancellation).ConfigureAwait(false);
58-
}
59-
catch { socket.Dispose(); throw; }
60-
return socket;
61-
}
62-
63-
private static string ToWebsocketUri(string uri)
16+
protected override FormattableString GetConnectPath() => $"v1/cryptos/{_Client.CryptoCode}/connect";
17+
internal WebsocketNotificationSessionLegacy(ExplorerClient client) : base(client)
6418
{
65-
if(uri.StartsWith("https://", StringComparison.OrdinalIgnoreCase))
66-
uri = uri.Replace("https://", "wss://");
67-
if(uri.StartsWith("http://", StringComparison.OrdinalIgnoreCase))
68-
uri = uri.Replace("http://", "ws://");
69-
return uri;
7019
}
71-
72-
WebsocketMessageListener _MessageListener;
73-
UTF8Encoding UTF8 = new UTF8Encoding(false, true);
74-
7520
public void ListenNewBlock(CancellationToken cancellation = default)
7621
{
7722
ListenNewBlockAsync(cancellation).GetAwaiter().GetResult();
@@ -128,7 +73,7 @@ public void ListenDerivationSchemes(DerivationStrategyBase[] derivationSchemes,
12873

12974
public Task ListenDerivationSchemesAsync(DerivationStrategyBase[] derivationSchemes, CancellationToken cancellation = default)
13075
{
131-
return _MessageListener.Send(new Models.NewTransactionEventRequest() { DerivationSchemes = derivationSchemes.Select(d=>d.ToString()).ToArray(), CryptoCode = _Client.CryptoCode }, null, cancellation);
76+
return _MessageListener.Send(new Models.NewTransactionEventRequest() { DerivationSchemes = derivationSchemes.Select(d => d.ToString()).ToArray(), CryptoCode = _Client.CryptoCode }, null, cancellation);
13277
}
13378

13479
public void ListenTrackedSources(TrackedSource[] trackedSources, CancellationToken cancellation = default)
@@ -141,6 +86,70 @@ public Task ListenTrackedSourcesAsync(TrackedSource[] trackedSources, Cancellati
14186
return _MessageListener.Send(new Models.NewTransactionEventRequest() { TrackedSources = trackedSources.Select(d => d.ToString()).ToArray(), CryptoCode = _Client.CryptoCode }, null, cancellation);
14287
}
14388

89+
}
90+
public class WebsocketNotificationSession : NotificationSessionBase, IDisposable
91+
{
92+
93+
protected readonly ExplorerClient _Client;
94+
public ExplorerClient Client
95+
{
96+
get
97+
{
98+
return _Client;
99+
}
100+
}
101+
internal WebsocketNotificationSession(ExplorerClient client)
102+
{
103+
if(client == null)
104+
throw new ArgumentNullException(nameof(client));
105+
_Client = client;
106+
}
107+
108+
internal async Task ConnectAsync(CancellationToken cancellation)
109+
{
110+
var uri = _Client.GetFullUri(GetConnectPath());
111+
uri = ToWebsocketUri(uri);
112+
WebSocket socket = null;
113+
try
114+
{
115+
socket = await ConnectAsyncCore(uri, cancellation);
116+
}
117+
catch (WebSocketException) // For some reason the ErrorCode is not properly set, so we can check for error 401
118+
{
119+
if (!_Client.Auth.RefreshCache())
120+
throw;
121+
socket = await ConnectAsyncCore(uri, cancellation);
122+
}
123+
JsonSerializerSettings settings = new JsonSerializerSettings();
124+
new Serializer(_Client.Network).ConfigureSerializer(settings);
125+
_MessageListener = new WebsocketMessageListener(socket, settings);
126+
}
127+
128+
protected virtual FormattableString GetConnectPath() => $"v1/cryptos/connect?cryptoCode={_Client.Network.CryptoCode}";
129+
130+
private async Task<ClientWebSocket> ConnectAsyncCore(string uri, CancellationToken cancellation)
131+
{
132+
var socket = new ClientWebSocket();
133+
_Client.Auth.SetWebSocketAuth(socket);
134+
try
135+
{
136+
await socket.ConnectAsync(new Uri(uri, UriKind.Absolute), cancellation).ConfigureAwait(false);
137+
}
138+
catch { socket.Dispose(); throw; }
139+
return socket;
140+
}
141+
142+
private static string ToWebsocketUri(string uri)
143+
{
144+
if(uri.StartsWith("https://", StringComparison.OrdinalIgnoreCase))
145+
uri = uri.Replace("https://", "wss://");
146+
if(uri.StartsWith("http://", StringComparison.OrdinalIgnoreCase))
147+
uri = uri.Replace("http://", "ws://");
148+
return uri;
149+
}
150+
151+
protected WebsocketMessageListener _MessageListener;
152+
144153
public override Task<NewEventBase> NextEventAsync(CancellationToken cancellation = default)
145154
{
146155
return _MessageListener.NextMessageAsync(cancellation);

NBXplorer.Tests/UnitTest1.cs

+41-27
Original file line numberDiff line numberDiff line change
@@ -2015,39 +2015,38 @@ public async Task CanPrune2()
20152015
}
20162016
}
20172017

2018-
[FactWithTimeout]
2019-
public async Task CanUseWebSockets()
2018+
[TheoryWithTimeout]
2019+
[InlineData(false)]
2020+
[InlineData(true)]
2021+
public async Task CanUseWebSockets(bool legacyAPI)
20202022
{
20212023
using (var tester = ServerTester.Create())
20222024
{
20232025
tester.Client.WaitServerStarted();
20242026
var key = new BitcoinExtKey(new ExtKey(), tester.Network);
20252027
var pubkey = tester.CreateDerivationStrategy(key.Neuter(), true);
20262028
await tester.Client.TrackAsync(pubkey);
2027-
using (var connected = tester.Client.CreateWebsocketNotificationSession())
2029+
using (var connected = CreateNotificationSession(tester, legacyAPI))
20282030
{
2029-
connected.ListenNewBlock();
2031+
var legacy = connected as WebsocketNotificationSessionLegacy;
2032+
legacy?.ListenNewBlock();
20302033
var expectedBlockId = tester.Explorer.CreateRPCClient().Generate(1)[0];
2031-
var blockEvent = (Models.NewBlockEvent)connected.NextEvent(Cancel);
2032-
// Sometimes Postgres backend emit one more block during warmup. That's not a bug,
2033-
// but make test flaky.
2034-
if (blockEvent.Hash != expectedBlockId)
2035-
blockEvent = (Models.NewBlockEvent)connected.NextEvent(Cancel);
2034+
var blockEvent = await WaitBlock(connected, expectedBlockId, Cancel);
20362035

20372036
Assert.True(blockEvent.EventId != 0);
20382037
Assert.Equal(expectedBlockId, blockEvent.Hash);
20392038
Assert.NotEqual(0, blockEvent.Height);
20402039

20412040
Assert.Equal(1, blockEvent.Confirmations);
20422041

2043-
connected.ListenDerivationSchemes(new[] { pubkey });
2042+
legacy?.ListenDerivationSchemes(new[] { pubkey });
20442043
await tester.SendToAddressAsync(tester.AddressOf(pubkey, "0/1"), Money.Coins(1.0m));
20452044

20462045
var txEvent = (Models.NewTransactionEvent)connected.NextEvent(Cancel);
20472046
Assert.Equal(txEvent.DerivationStrategy, pubkey);
20482047
}
20492048

2050-
using (var connected = tester.Client.CreateWebsocketNotificationSession())
2049+
using (var connected = tester.Client.CreateWebsocketNotificationSessionLegacy())
20512050
{
20522051
connected.ListenAllDerivationSchemes();
20532052
await tester.SendToAddressAsync(tester.AddressOf(pubkey, "0/1"), Money.Coins(1.0m));
@@ -2056,7 +2055,7 @@ public async Task CanUseWebSockets()
20562055
Assert.Equal(txEvent.DerivationStrategy, pubkey);
20572056
}
20582057

2059-
using (var connected = tester.Client.CreateWebsocketNotificationSession())
2058+
using (var connected = tester.Client.CreateWebsocketNotificationSessionLegacy())
20602059
{
20612060
connected.ListenAllTrackedSource();
20622061
await tester.SendToAddressAsync(tester.AddressOf(pubkey, "0/1"), Money.Coins(1.0m));
@@ -2123,7 +2122,7 @@ public async Task CanUseWebSockets2()
21232122

21242123
(var pubkey, var pubkey2) = (wLegacy.DerivationScheme, wSegwit.DerivationScheme);
21252124

2126-
using (var connected = tester.Client.CreateWebsocketNotificationSession())
2125+
using (var connected = tester.Client.CreateWebsocketNotificationSessionLegacy())
21272126
{
21282127
connected.ListenAllDerivationSchemes();
21292128
tester.Explorer.CreateRPCClient().SendCommand(RPCOperations.sendmany, "",
@@ -2313,7 +2312,7 @@ public async Task CanTrack3()
23132312
var key = new BitcoinExtKey(new ExtKey(), tester.Network);
23142313
var pubkey = tester.CreateDerivationStrategy(key.Neuter());
23152314
await tester.Client.TrackAsync(pubkey);
2316-
var events = tester.Client.CreateWebsocketNotificationSession();
2315+
var events = tester.Client.CreateWebsocketNotificationSessionLegacy();
23172316
events.ListenDerivationSchemes(new[] { pubkey });
23182317

23192318
Logs.Tester.LogInformation("Let's send to 0/0, 0/1, 0/2, 0, 1");
@@ -2418,35 +2417,37 @@ public async Task CanTrackSeveralTransactions()
24182417
}
24192418
}
24202419

2421-
[FactWithTimeout]
2422-
public async void CanUseWebSocketsOnAddress()
2420+
[TheoryWithTimeout]
2421+
[InlineData(false)]
2422+
[InlineData(true)]
2423+
public async Task CanUseWebSocketsOnAddress(bool legacyAPI)
24232424
{
24242425
using (var tester = ServerTester.Create())
24252426
{
24262427
tester.Client.WaitServerStarted();
2427-
await Task.Delay(500);
24282428
var key = new Key();
24292429
var pubkey = TrackedSource.Create(key.PubKey.GetAddress(ScriptPubKeyType.Legacy, tester.Network));
24302430
tester.Client.Track(pubkey);
2431-
using (var connected = tester.Client.CreateWebsocketNotificationSession())
2431+
using (var connected = CreateNotificationSession(tester, legacyAPI))
24322432
{
2433-
connected.ListenNewBlock();
2434-
var expectedBlockId = tester.Explorer.CreateRPCClient().Generate(1)[0];
2435-
var blockEvent = (Models.NewBlockEvent)connected.NextEvent(Cancel);
2433+
var legacy = connected as WebsocketNotificationSessionLegacy;
2434+
legacy?.ListenNewBlock();
2435+
var expectedBlockId = (await tester.Explorer.CreateRPCClient().GenerateAsync(1))[0];
2436+
var blockEvent = await WaitBlock(connected, expectedBlockId, Cancel);
24362437
Assert.Equal(expectedBlockId, blockEvent.Hash);
24372438
Assert.NotEqual(0, blockEvent.Height);
24382439

2439-
connected.ListenTrackedSources(new[] { pubkey });
2440+
legacy?.ListenTrackedSources(new[] { pubkey });
24402441
tester.SendToAddress(pubkey.Address, Money.Coins(1.0m));
24412442

2442-
var txEvent = (Models.NewTransactionEvent)connected.NextEvent(Cancel);
2443+
var txEvent = (Models.NewTransactionEvent)await connected.NextEventAsync(Cancel);
24432444
Assert.NotEmpty(txEvent.Outputs);
24442445
Assert.Equal(pubkey.Address.ScriptPubKey, txEvent.Outputs[0].ScriptPubKey);
24452446
Assert.Equal(pubkey.Address, txEvent.Outputs[0].Address);
24462447
Assert.Equal(txEvent.TrackedSource, pubkey);
24472448
}
24482449

2449-
using (var connected = tester.Client.CreateWebsocketNotificationSession())
2450+
using (var connected = tester.Client.CreateWebsocketNotificationSessionLegacy())
24502451
{
24512452
connected.ListenAllTrackedSource();
24522453
tester.SendToAddress(pubkey.Address, Money.Coins(1.0m));
@@ -2457,6 +2458,19 @@ public async void CanUseWebSocketsOnAddress()
24572458
}
24582459
}
24592460

2461+
private WebsocketNotificationSession CreateNotificationSession(ServerTester tester, bool legacyAPI)
2462+
=> legacyAPI ? tester.Client.CreateWebsocketNotificationSessionLegacy() : tester.Client.CreateWebsocketNotificationSession();
2463+
2464+
private async Task<Models.NewBlockEvent> WaitBlock(WebsocketNotificationSession connected, uint256 expectedBlockId, CancellationToken cancel)
2465+
{
2466+
var evt = await connected.NextEventAsync(Cancel);
2467+
while (evt is not Models.NewBlockEvent b || b.Hash != expectedBlockId)
2468+
{
2469+
evt = await connected.NextEventAsync(Cancel);
2470+
}
2471+
return (Models.NewBlockEvent)evt;
2472+
}
2473+
24602474
[FactWithTimeout]
24612475
public async Task CanUseWebSocketsOnAddress2()
24622476
{
@@ -2471,7 +2485,7 @@ public async Task CanUseWebSocketsOnAddress2()
24712485

24722486
await tester.Client.TrackAsync(pubkey);
24732487
await tester.Client.TrackAsync(pubkey2);
2474-
using (var connected = tester.Client.CreateWebsocketNotificationSession())
2488+
using (var connected = tester.Client.CreateWebsocketNotificationSessionLegacy())
24752489
{
24762490
connected.ListenAllTrackedSource();
24772491
tester.Explorer.CreateRPCClient().SendCommand(RPCOperations.sendmany, "",
@@ -2920,7 +2934,7 @@ public async Task CanRescan()
29202934
var txId3 = tester.SendToAddress(tester.AddressOf(key, "0/0"), Money.Coins(1.0m));
29212935
var txId4 = tester.SendToAddress(tester.AddressOf(key, "0/0"), Money.Coins(1.0m));
29222936
var tx4 = tester.RPC.GetRawTransaction(txId4);
2923-
var notify = tester.Client.CreateWebsocketNotificationSession();
2937+
var notify = tester.Client.CreateWebsocketNotificationSessionLegacy();
29242938
notify.ListenNewBlock();
29252939
var blockId = tester.RPC.Generate(1)[0];
29262940
var blockId2 = tester.RPC.Generate(1)[0];
@@ -3916,7 +3930,7 @@ public async Task ElementsTests()
39163930
DerivationFeature.Deposit).Address);
39173931

39183932
Assert.IsType<BitcoinBlindedAddress>(tester.Client.GetKeyInformation(userDerivationScheme, address.ScriptPubKey).Address);
3919-
using (var session = await tester.Client.CreateWebsocketNotificationSessionAsync(Timeout))
3933+
using (var session = await tester.Client.CreateWebsocketNotificationSessionLegacyAsync(Timeout))
39203934
{
39213935
await session.ListenAllTrackedSourceAsync(cancellation: Timeout);
39223936

0 commit comments

Comments
 (0)