
Commit 1ca4677

smp server: messaging services (#1565)
* smp server: refactor message delivery to always respond SOK to subscriptions
* refactor ntf subscribe
* cancel subscription thread and reduce service subscription count when queue is deleted
* subscribe rcv service, deliver sent messages to subscribed service
* subscribe rcv service to messages (TODO delivery on subscription)
* WIP
* efficient initial delivery of messages to subscribed service
* test: delivery to client with service certificate
* test: upgrade/downgrade to/from service subscriptions
* remove service association from agent API, add per-user flag to use the service
* agent client (WIP)
* service certificates in the client
* rfc about drift detection, and SALL to mark end of message delivery
* fix test
* fix test
* add function for postgresql message storage
* update migration
1 parent 3016b92 commit 1ca4677

31 files changed: +969 −305 lines

Lines changed: 101 additions & 0 deletions
@@ -0,0 +1,101 @@

# Detecting and fixing state with service subscriptions

## Problem

While service certificates and subscriptions hugely decrease startup time and delivery delays on server restarts, they introduce the risk of losing subscriptions in case of state drift. They also do not provide an efficient mechanism for validating that the list of subscribed queues is in sync.

How can state drift happen?

There are several possibilities:

- A lost broker response would make the broker consider the queue associated, but the client won't know it and will have to re-associate. While this is not a problem in itself, as it will be resolved, it would make drift be detected more frequently (regardless of the detection logic used). That service certificates are used on clients with good connections makes this less likely, though.
- Server state restored from a backup after some failure. Nothing can be done to recover lost queues, but we may restore lost service associations.
- Queue blocking or removal by the server operator because of a policy violation.
- Server downgrade (when the server loses all service associations) with a subsequent upgrade - the client would think the queues are associated while they are not, and won't receive any messages at all in this scenario.
- Any other server-side error or logic error.

In addition to the possibility of drift, we simply need to have confidence that service subscriptions work as intended, without skipping queues. We ignored this consideration for notifications, as the tolerance to lost notifications is higher, but we can't ignore it for messages.

## Solution

The previously considered approach of sending NIL to all queues without messages is very expensive in traffic (most queues don't have messages), and it is also very expensive to detect and validate drift in the client because of asynchronous / concurrent events.

We cannot read all queues into memory, we cannot aggregate all responses in memory, and we cannot create database writes on every single service subscription to, say, 1M queues (a realistic number) - it simply won't work well even at the current scale.

An approach with an efficient way to detect drift that loads the full list of IDs when drift is detected also won't work well, as drifts may be common - we need an efficient way both to detect that there is a difference and to reconcile it.

### Drift detection

Both client and server would maintain the number of associated queues and a "symmetric" hash over the set of queue IDs. The requirements for this hash algorithm are:

- not cryptographically strong, to be fast.
- 128 bits, to minimize collisions over a large set of millions of queues.
- symmetric - the result should not depend on ID order.
- allows fast additions and removals.

In this way, every time an association is added or removed (including when a queue is marked as deleted), both peers would recompute this hash in the same transaction.

The client would suspend sending and processing any other commands on the server and the queues of this server until the SOKS response is received from this server, to prevent drift. This can be achieved with per-server semaphores/locks in memory. UI clients need to become responsive sooner than these responses are received, but we do not use service certificates on UI clients, and chat relays may prevent operations on server queues until the SOKS response is received.
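
A minimal sketch of such a per-server gate, using STM (all names here are hypothetical, not the agent's actual API):

```haskell
import Control.Concurrent.STM
import Control.Monad (when)

-- Hypothetical per-server gate: SUBS sets it, the SOKS handler clears it,
-- and any other command for this server first waits until it is clear.
newtype ServerGate = ServerGate (TVar Bool)

newServerGate :: IO ServerGate
newServerGate = ServerGate <$> newTVarIO False

beginServiceSubs :: ServerGate -> IO () -- call before sending SUBS
beginServiceSubs (ServerGate g) = atomically $ writeTVar g True

completeServiceSubs :: ServerGate -> IO () -- call when SOKS is received
completeServiceSubs (ServerGate g) = atomically $ writeTVar g False

waitServerAvailable :: ServerGate -> IO () -- call before any other command
waitServerAvailable (ServerGate g) =
  atomically $ readTVar g >>= \pending -> when pending retry
```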

The SOKS response would include both the count of associated queues (as now) and the hash over all associated queue IDs (to be added). If both count and hash match, the client will not do anything. If either does not match, the client would perform a full sync (see below).

There is value in doing the same in the notification server as well, to detect and "fix" drifts.

The algorithm to compute hashes can be the following:

1. Compute the hash of each queue ID using xxHash3_128 ([xxhash-ffi](https://hackage.haskell.org/package/xxhash-ffi) library). The hashes don't need to be stored or loaded all at once - initially, this can be done with streaming if it is detected on start that there is no pre-computed hash.
2. Combine the hashes using XOR. XOR is both commutative and associative, so it would produce the same aggregate hash irrespective of the ID order.
3. Adding a queue ID to the pre-computed hash requires a single XOR with the ID hash: `new_aggregate = aggregate XOR hash(queue_id)`.
4. Removing a queue ID from the pre-computed hash requires the same XOR (XOR is involutory, it undoes itself): `new_aggregate = aggregate XOR hash(queue_id)`.
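
A minimal sketch of this aggregate, keeping the 128-bit value as two `Word64` halves. The per-ID hash below is a stand-in built from two seeded `xxh64` calls from xxhash-ffi; the real code would use that library's xxHash3_128 binding, and all names here are illustrative:

```haskell
import Data.Bits (xor)
import Data.ByteString (ByteString)
import Data.Digest.XXHash.FFI (xxh64)
import Data.Word (Word64)

-- 128-bit aggregate kept as two Word64 halves.
data AggHash = AggHash !Word64 !Word64 deriving (Eq, Show)

emptyAgg :: AggHash
emptyAgg = AggHash 0 0

-- Per-ID 128-bit hash. Stand-in: two differently seeded xxh64 calls;
-- the real code would use xxHash3_128.
hashId :: ByteString -> AggHash
hashId qId = AggHash (xxh64 qId 0) (xxh64 qId 1)

-- XOR-combining is commutative and associative, so the aggregate does not
-- depend on the order in which IDs are combined, and it is involutory, so
-- adding and removing a queue ID are the same operation:
-- updateAgg (updateAgg agg qId) qId == agg
updateAgg :: AggHash -> ByteString -> AggHash
updateAgg (AggHash h1 h2) qId =
  let AggHash i1 i2 = hashId qId in AggHash (h1 `xor` i1) (h2 `xor` i2)
```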

These hashes need to be computed per user/server in the client and per service certificate in the server - on startup, both have to validate them and compute them once if necessary.

There can also be a start-up option to recompute the hash(es) to detect and fix any errors.

This is all rather simple and would help detect drifts.

### Synchronization when drift is detected

The assumption here is that in most cases drifts are rare and isolated to a few IDs (e.g., this is the case with the notification server).

But the algorithm should be resilient to losing all associations, and it should not be substantially worse than simply restoring all associations or loading all IDs.

We have `c_n` and `c_hash` for the client-side count and hash of queue IDs, and `s_n` and `s_hash` for the server-side ones, returned in the SOKS response to the SUBS command. The decision logic (sketched after this list) is:

1. If `c_n /= s_n || c_hash /= s_hash`, the client must perform sync.
2. If `abs(c_n - s_n) / max(c_n, s_n) > 0.5`, the client will request the full list of queues (more than half of the queues are different) and will perform a diff with the queues it has. While performing the diff the client will continue to block operations with this user/server.
3. Otherwise the client would perform some algorithm for determining the difference between client and server queue IDs. This algorithm can be made efficient (`O(log N)`) by relying on efficient sorting of IDs and database loading of ranges, via computing and communicating hashes of ranges, and performing a binary search on ranges, with batching to optimize network traffic.
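
A minimal sketch of this decision (the `SyncAction` type and all names are illustrative; any 128-bit hash type with equality works here):

```haskell
data SyncAction
  = InSync -- counts and hashes match, nothing to do
  | FullSync -- request the full list of queue IDs and diff it
  | RangeSync -- reconcile via range hashes and binary search on ranges
  deriving (Eq, Show)

syncAction :: Eq hash => Int -> hash -> Int -> hash -> SyncAction
syncAction cN cHash sN sHash
  | cN == sN && cHash == sHash = InSync
  | diffRatio > 0.5 = FullSync -- more than half of the queues differ
  | otherwise = RangeSync
  where
    diffRatio :: Double
    diffRatio = fromIntegral (abs (cN - sN)) / fromIntegral (max cN sN)
```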

This algorithm is similar to Merkle tree reconciliation, but it is optimized for database reading of ordered ranges and for our 16KB block size, to minimize network requests.

The algorithm:

1. The client would request all ranges from the server.
2. The server would compute hashes for N ranges of IDs and send them to the client. Each range would include start_id, an optional end_id (absent for single-ID ranges) and the XOR-hash of the range (see the sketch after this list). N is determined based on the block size and the range size.
3. The client would perform the same computation for the same ranges and compare them with the ranges returned from the server, while detecting any gaps between ranges and missing range boundaries.
4. If more than half of the ranges don't match, the client would request the full list. Otherwise it would repeat the same algorithm for each mismatched range and for the gaps.

It can be further optimized by merging adjacent ranges and by batching all range requests; this is quite simple.
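
For illustration, a possible shape of the range records from step 2, with range hashes computed over a sorted list of IDs (the record, all names, and the 64-bit hash stand-in are assumptions, not the actual protocol encoding):

```haskell
import Data.Bits (xor)
import Data.List (foldl')
import Data.Word (Word64)

-- Hypothetical range record: start ID, optional end ID (absent for
-- single-ID ranges), and the XOR-hash of all ID hashes in the range.
data IdRange id = IdRange id (Maybe id) Word64
  deriving (Eq, Show)

-- Split a sorted list of IDs into ranges of the given size (>= 1) and
-- hash each range; hashId is the per-ID hash (xxHash3_128 in this RFC,
-- Word64 here to keep the sketch short).
toRanges :: Int -> (id -> Word64) -> [id] -> [IdRange id]
toRanges rangeSize hashId = go
  where
    go [] = []
    go ids = case splitAt rangeSize ids of
      ([i], rest) -> IdRange i Nothing (hashId i) : go rest
      (chunk, rest) ->
        IdRange (head chunk) (Just (last chunk)) (foldl' xor 0 (map hashId chunk)) : go rest
```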

Once the client determines the lists of missing and extra queues it can:

- create associations (via SUB) for the missing queues,
- request removal of associations (a new command, e.g. BUS) for the extra queues on the server.

The pseudocode for the algorithm, for the server to return all ranges or sub-ranges of the requested range:

```haskell
-- Pseudocode: M stands for the server's handler monad, and the
-- snake_case names are placeholders rather than real functions.
getSubRanges :: Maybe (RecipientId, RecipientId) -> M [(RecipientId, Maybe RecipientId, Hash)]
getSubRanges range_ = do
  ((min_id, max_id), s_n) <- case range_ of
    Nothing -> getAssociatedQueueRange -- with the certificate in the client session
    Just range -> (range,) <$> getAssociatedQueueCount range
  if
    | s_n <= max_N -> reply_with_single_queue_ranges
    | otherwise -> do
        let range_size = s_n `div` max_N
        read_all_ranges -- in a recursive loop, with max_id, range_hash and next_min_id in each step
        reply_ranges
```

We don't need to implement this synchronization logic right now, so client logic is not included here - it is sufficient to implement drift detection, and the action to fix a drift would be to disable and re-enable certificates via some command-line parameter of the CLI.

simplexmq.cabal

Lines changed: 2 additions & 0 deletions
@@ -216,6 +216,7 @@ library
  Simplex.Messaging.Agent.Store.SQLite.Migrations.M20250702_conn_invitations_remove_cascade_delete
  Simplex.Messaging.Agent.Store.SQLite.Migrations.M20251009_queue_to_subscribe
  Simplex.Messaging.Agent.Store.SQLite.Migrations.M20251010_client_notices
+ Simplex.Messaging.Agent.Store.SQLite.Migrations.M20251020_service_certs
  if flag(client_postgres) || flag(server_postgres)
    exposed-modules:
      Simplex.Messaging.Agent.Store.Postgres
@@ -553,6 +554,7 @@ test-suite simplexmq-test
  , text
  , time
  , timeit ==2.0.*
+ , tls >=1.9.0 && <1.10
  , transformers
  , unliftio
  , unliftio-core
