Skip to content

Commit 2b35310

Browse files
authored
servers: maintain xor-hash of all associated queue IDs in PostgreSQL (#1615)
* ntf server: maintain xor-hash of all associated queue IDs via PostgreSQL triggers * smp server: xor hash with triggers * fix sql and using pgcrypto extension in tests * track counts and hashes in smp/ntf servers via triggers, smp server stats for service subscription, update SMP protocol to pass expected count and hash in SSUB/NSSUB commands * agent migrations with functions/triggers * remove agent triggers * try tracking service subs in the agent (WIP, does not compile) * Revert "try tracking service subs in the agent (WIP, does not compile)" This reverts commit 59e9081. * comment
1 parent cdcb8e7 commit 2b35310

36 files changed

+2437
-239
lines changed

simplexmq.cabal

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,7 @@ library
167167
Simplex.Messaging.Agent.Store.Postgres.Migrations.M20250702_conn_invitations_remove_cascade_delete
168168
Simplex.Messaging.Agent.Store.Postgres.Migrations.M20251009_queue_to_subscribe
169169
Simplex.Messaging.Agent.Store.Postgres.Migrations.M20251010_client_notices
170+
Simplex.Messaging.Agent.Store.Postgres.Migrations.M20251020_service_certs
170171
else
171172
exposed-modules:
172173
Simplex.Messaging.Agent.Store.SQLite
@@ -223,6 +224,7 @@ library
223224
Simplex.Messaging.Agent.Store.Postgres.Common
224225
Simplex.Messaging.Agent.Store.Postgres.DB
225226
Simplex.Messaging.Agent.Store.Postgres.Migrations
227+
Simplex.Messaging.Agent.Store.Postgres.Migrations.Util
226228
Simplex.Messaging.Agent.Store.Postgres.Util
227229
if !flag(client_library)
228230
exposed-modules:

src/Simplex/Messaging/Agent.hs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -211,7 +211,6 @@ import Simplex.Messaging.Protocol
211211
ErrorType (AUTH),
212212
MsgBody,
213213
MsgFlags (..),
214-
IdsHash,
215214
NtfServer,
216215
ProtoServerWithAuth (..),
217216
ProtocolServer (..),
@@ -222,6 +221,7 @@ import Simplex.Messaging.Protocol
222221
SMPMsgMeta,
223222
SParty (..),
224223
SProtocolType (..),
224+
ServiceSub (..),
225225
SndPublicAuthKey,
226226
SubscriptionMode (..),
227227
UserProtocol,
@@ -500,7 +500,7 @@ resubscribeConnections :: AgentClient -> [ConnId] -> AE (Map ConnId (Either Agen
500500
resubscribeConnections c = withAgentEnv c . resubscribeConnections' c
501501
{-# INLINE resubscribeConnections #-}
502502

503-
subscribeClientServices :: AgentClient -> UserId -> AE (Map SMPServer (Either AgentErrorType (Int64, IdsHash)))
503+
subscribeClientServices :: AgentClient -> UserId -> AE (Map SMPServer (Either AgentErrorType ServiceSub))
504504
subscribeClientServices c = withAgentEnv c . subscribeClientServices' c
505505
{-# INLINE subscribeClientServices #-}
506506

@@ -1507,15 +1507,15 @@ resubscribeConnections' c connIds = do
15071507
[] -> pure True
15081508
rqs' -> anyM $ map (atomically . hasActiveSubscription c) rqs'
15091509

1510-
-- TODO [certs rcv] compare hash with lock
1511-
subscribeClientServices' :: AgentClient -> UserId -> AM (Map SMPServer (Either AgentErrorType (Int64, IdsHash)))
1510+
-- TODO [certs rcv] compare hash. possibly, it should return both expected and returned counts
1511+
subscribeClientServices' :: AgentClient -> UserId -> AM (Map SMPServer (Either AgentErrorType ServiceSub))
15121512
subscribeClientServices' c userId =
15131513
ifM useService subscribe $ throwError $ CMD PROHIBITED "no user service allowed"
15141514
where
15151515
useService = liftIO $ (Just True ==) <$> TM.lookupIO userId (useClientServices c)
15161516
subscribe = do
15171517
srvs <- withStore' c (`getClientServiceServers` userId)
1518-
lift $ M.fromList . zip srvs <$> mapConcurrently (tryAllErrors' . subscribeClientService c userId) srvs
1518+
lift $ M.fromList <$> mapConcurrently (\(srv, ServiceSub _ n idsHash) -> fmap (srv,) $ tryAllErrors' $ subscribeClientService c userId srv n idsHash) srvs
15191519

15201520
-- requesting messages sequentially, to reduce memory usage
15211521
getConnectionMessages' :: AgentClient -> NonEmpty ConnMsgReq -> AM' (NonEmpty (Either AgentErrorType (Maybe SMPMsgMeta)))

src/Simplex/Messaging/Agent/Client.hs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -279,6 +279,7 @@ import Simplex.Messaging.Protocol
279279
RcvNtfPublicDhKey,
280280
SMPMsgMeta (..),
281281
SProtocolType (..),
282+
ServiceSub,
282283
SndPublicAuthKey,
283284
SubscriptionMode (..),
284285
NewNtfCreds (..),
@@ -1689,10 +1690,10 @@ processClientNotices c@AgentClient {presetServers} tSess notices = do
16891690
logError $ "processClientNotices error: " <> tshow e
16901691
notifySub' c "" $ ERR e
16911692

1692-
subscribeClientService :: AgentClient -> UserId -> SMPServer -> AM (Int64, IdsHash)
1693-
subscribeClientService c userId srv =
1694-
withLogClient c NRMBackground (userId, srv, Nothing) B.empty "SUBS" $
1695-
(`subscribeService` SMP.SRecipientService) . connectedClient
1693+
subscribeClientService :: AgentClient -> UserId -> SMPServer -> Int64 -> IdsHash -> AM ServiceSub
1694+
subscribeClientService c userId srv n idsHash =
1695+
withLogClient c NRMBackground (userId, srv, Nothing) B.empty "SUBS" $ \(SMPConnectedClient smp _) ->
1696+
subscribeService smp SMP.SRecipientService n idsHash
16961697

16971698
activeClientSession :: AgentClient -> SMPTransportSession -> SessionId -> STM Bool
16981699
activeClientSession c tSess sessId = sameSess <$> tryReadSessVar tSess (smpClients c)

src/Simplex/Messaging/Agent/NtfSubSupervisor.hs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -314,7 +314,7 @@ runNtfWorker c srv Worker {doWork} =
314314
_ -> ((ntfSubConnId sub, INTERNAL "NSACheck - no subscription ID") : errs, subs, subIds)
315315
updateSub :: DB.Connection -> NtfServer -> UTCTime -> UTCTime -> (NtfSubscription, NtfSubStatus) -> IO (Maybe SMPServer)
316316
updateSub db ntfServer ts nextCheckTs (sub, status)
317-
| ntfShouldSubscribe status =
317+
| status `elem` subscribeNtfStatuses =
318318
let sub' = sub {ntfSubStatus = NASCreated status}
319319
in Nothing <$ updateNtfSubscription db sub' (NSANtf NSACheck) nextCheckTs
320320
-- ntf server stopped subscribing to this queue

src/Simplex/Messaging/Agent/Store/AgentStore.hs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -431,19 +431,20 @@ getClientService db userId srv =
431431
where
432432
toService (kh, cert, pk, serviceId_) = ((kh, (cert, pk)), serviceId_)
433433

434-
getClientServiceServers :: DB.Connection -> UserId -> IO [SMPServer]
434+
getClientServiceServers :: DB.Connection -> UserId -> IO [(SMPServer, ServiceSub)]
435435
getClientServiceServers db userId =
436436
map toServer
437437
<$> DB.query
438438
db
439439
[sql|
440-
SELECT c.host, c.port, s.key_hash
440+
SELECT c.host, c.port, s.key_hash, c.rcv_service_id, c.rcv_service_queue_count, c.rcv_service_queue_ids_hash
441441
FROM client_services c
442442
JOIN servers s ON s.host = c.host AND s.port = c.port
443443
|]
444444
(Only userId)
445445
where
446-
toServer (host, port, kh) = SMPServer host port kh
446+
toServer (host, port, kh, serviceId, n, Binary idsHash) =
447+
(SMPServer host port kh, ServiceSub serviceId n (IdsHash idsHash))
447448

448449
setClientServiceId :: DB.Connection -> UserId -> SMPServer -> ServiceId -> IO ()
449450
setClientServiceId db userId srv serviceId =
@@ -2099,7 +2100,7 @@ insertRcvQueue_ db connId' rq@RcvQueue {..} subMode serverKeyHash_ = do
20992100
ntf_public_key, ntf_private_key, ntf_id, rcv_ntf_dh_secret
21002101
) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?);
21012102
|]
2102-
( (host server, port server, rcvId, rcvServiceAssoc, connId', rcvPrivateKey, rcvDhSecret, e2ePrivKey, e2eDhSecret)
2103+
( (host server, port server, rcvId, BI rcvServiceAssoc, connId', rcvPrivateKey, rcvDhSecret, e2ePrivKey, e2eDhSecret)
21032104
:. (sndId, queueMode, status, BI toSubscribe, qId, BI primary, dbReplaceQueueId, smpClientVersion, serverKeyHash_)
21042105
:. (shortLinkId <$> shortLink, shortLinkKey <$> shortLink, linkPrivSigKey <$> shortLink, linkEncFixedData <$> shortLink)
21052106
:. ntfCredsFields
@@ -2468,13 +2469,13 @@ rcvQueueQuery =
24682469

24692470
toRcvQueue ::
24702471
(UserId, C.KeyHash, ConnId, NonEmpty TransportHost, ServiceName, SMP.RecipientId, SMP.RcvPrivateAuthKey, SMP.RcvDhSecret, C.PrivateKeyX25519, Maybe C.DhSecretX25519, SMP.SenderId, Maybe QueueMode)
2471-
:. (QueueStatus, Maybe BoolInt, Maybe NoticeId, DBEntityId, BoolInt, Maybe Int64, Maybe RcvSwitchStatus, Maybe VersionSMPC, Int, ServiceAssoc)
2472+
:. (QueueStatus, Maybe BoolInt, Maybe NoticeId, DBEntityId, BoolInt, Maybe Int64, Maybe RcvSwitchStatus, Maybe VersionSMPC, Int, BoolInt)
24722473
:. (Maybe SMP.NtfPublicAuthKey, Maybe SMP.NtfPrivateAuthKey, Maybe SMP.NotifierId, Maybe RcvNtfDhSecret)
24732474
:. (Maybe SMP.LinkId, Maybe LinkKey, Maybe C.PrivateKeyEd25519, Maybe EncDataBytes) ->
24742475
RcvQueue
24752476
toRcvQueue
24762477
( (userId, keyHash, connId, host, port, rcvId, rcvPrivateKey, rcvDhSecret, e2ePrivKey, e2eDhSecret, sndId, queueMode)
2477-
:. (status, enableNtfs_, clientNoticeId, dbQueueId, BI primary, dbReplaceQueueId, rcvSwchStatus, smpClientVersion_, deleteErrors, rcvServiceAssoc)
2478+
:. (status, enableNtfs_, clientNoticeId, dbQueueId, BI primary, dbReplaceQueueId, rcvSwchStatus, smpClientVersion_, deleteErrors, BI rcvServiceAssoc)
24782479
:. (ntfPublicKey_, ntfPrivateKey_, notifierId_, rcvNtfDhSecret_)
24792480
:. (shortLinkId_, shortLinkKey_, linkPrivSigKey_, linkEncFixedData_)
24802481
) =

src/Simplex/Messaging/Agent/Store/Postgres/Migrations/App.hs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import Simplex.Messaging.Agent.Store.Postgres.Migrations.M20250322_short_links
1010
import Simplex.Messaging.Agent.Store.Postgres.Migrations.M20250702_conn_invitations_remove_cascade_delete
1111
import Simplex.Messaging.Agent.Store.Postgres.Migrations.M20251009_queue_to_subscribe
1212
import Simplex.Messaging.Agent.Store.Postgres.Migrations.M20251010_client_notices
13+
import Simplex.Messaging.Agent.Store.Postgres.Migrations.M20251020_service_certs
1314
import Simplex.Messaging.Agent.Store.Shared (Migration (..))
1415

1516
schemaMigrations :: [(String, Text, Maybe Text)]
@@ -19,7 +20,8 @@ schemaMigrations =
1920
("20250322_short_links", m20250322_short_links, Just down_m20250322_short_links),
2021
("20250702_conn_invitations_remove_cascade_delete", m20250702_conn_invitations_remove_cascade_delete, Just down_m20250702_conn_invitations_remove_cascade_delete),
2122
("20251009_queue_to_subscribe", m20251009_queue_to_subscribe, Just down_m20251009_queue_to_subscribe),
22-
("20251010_client_notices", m20251010_client_notices, Just down_m20251010_client_notices)
23+
("20251010_client_notices", m20251010_client_notices, Just down_m20251010_client_notices),
24+
("20251020_service_certs", m20251020_service_certs, Just down_m20251020_service_certs)
2325
]
2426

2527
-- | The list of migrations in ascending order by date
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
{-# LANGUAGE QuasiQuotes #-}
2+
3+
module Simplex.Messaging.Agent.Store.Postgres.Migrations.M20251020_service_certs where
4+
5+
import Data.Text (Text)
6+
import qualified Data.Text as T
7+
import Text.RawString.QQ (r)
8+
9+
m20251020_service_certs :: Text
10+
m20251020_service_certs =
11+
T.pack
12+
[r|
13+
CREATE TABLE client_services(
14+
user_id BIGINT NOT NULL REFERENCES users ON UPDATE RESTRICT ON DELETE CASCADE,
15+
host TEXT NOT NULL,
16+
port TEXT NOT NULL,
17+
service_cert BYTEA NOT NULL,
18+
service_cert_hash BYTEA NOT NULL,
19+
service_priv_key BYTEA NOT NULL,
20+
service_id BYTEA,
21+
service_queue_count BIGINT NOT NULL DEFAULT 0,
22+
service_queue_ids_hash BYTEA NOT NULL DEFAULT '\x00000000000000000000000000000000',
23+
FOREIGN KEY(host, port) REFERENCES servers ON UPDATE CASCADE ON DELETE RESTRICT
24+
);
25+
26+
CREATE UNIQUE INDEX idx_server_certs_user_id_host_port ON client_services(user_id, host, port);
27+
CREATE INDEX idx_server_certs_host_port ON client_services(host, port);
28+
29+
ALTER TABLE rcv_queues ADD COLUMN rcv_service_assoc SMALLINT NOT NULL DEFAULT 0;
30+
|]
31+
32+
down_m20251020_service_certs :: Text
33+
down_m20251020_service_certs =
34+
T.pack
35+
[r|
36+
ALTER TABLE rcv_queues DROP COLUMN rcv_service_assoc;
37+
38+
DROP INDEX idx_server_certs_host_port;
39+
DROP INDEX idx_server_certs_user_id_host_port;
40+
DROP TABLE client_services;
41+
|]
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
{-# LANGUAGE QuasiQuotes #-}
2+
3+
module Simplex.Messaging.Agent.Store.Postgres.Migrations.Util where
4+
5+
import Data.Text (Text)
6+
import qualified Data.Text as T
7+
import Text.RawString.QQ (r)
8+
9+
-- xor_combine is only applied to locally computed md5 hashes (128 bits/16 bytes),
10+
-- so it is safe to require that all values are of the same length.
11+
createXorHashFuncs :: Text
12+
createXorHashFuncs =
13+
T.pack
14+
[r|
15+
CREATE OR REPLACE FUNCTION xor_combine(state BYTEA, value BYTEA) RETURNS BYTEA
16+
LANGUAGE plpgsql IMMUTABLE STRICT
17+
AS $$
18+
DECLARE
19+
result BYTEA := state;
20+
i INTEGER;
21+
len INTEGER := octet_length(value);
22+
BEGIN
23+
IF octet_length(state) != len THEN
24+
RAISE EXCEPTION 'Inputs must be equal length (% != %)', octet_length(state), len;
25+
END IF;
26+
FOR i IN 0..len-1 LOOP
27+
result := set_byte(result, i, get_byte(state, i) # get_byte(value, i));
28+
END LOOP;
29+
RETURN result;
30+
END;
31+
$$;
32+
33+
CREATE OR REPLACE AGGREGATE xor_aggregate(BYTEA) (
34+
SFUNC = xor_combine,
35+
STYPE = BYTEA,
36+
INITCOND = '\x00000000000000000000000000000000' -- 16 bytes
37+
);
38+
|]
39+
40+
dropXorHashFuncs :: Text
41+
dropXorHashFuncs =
42+
T.pack
43+
[r|
44+
DROP AGGREGATE xor_aggregate(BYTEA);
45+
DROP FUNCTION xor_combine;
46+
|]

0 commit comments

Comments
 (0)