Skip to content

Commit 80aa56c

Browse files
authored
agent: track which queues need subscribing for iOS NSE (#1657)
* agent: track which queues need subscribing for iOS NSE * fix down migration * fix, cleanup
1 parent 9cda203 commit 80aa56c

File tree

11 files changed

+121
-52
lines changed

11 files changed

+121
-52
lines changed

simplexmq.cabal

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,7 @@ library
163163
Simplex.Messaging.Agent.Store.Postgres.Migrations.M20250203_msg_bodies
164164
Simplex.Messaging.Agent.Store.Postgres.Migrations.M20250322_short_links
165165
Simplex.Messaging.Agent.Store.Postgres.Migrations.M20250702_conn_invitations_remove_cascade_delete
166+
Simplex.Messaging.Agent.Store.Postgres.Migrations.M20251009_queue_to_subscribe
166167
else
167168
exposed-modules:
168169
Simplex.Messaging.Agent.Store.SQLite
@@ -210,6 +211,7 @@ library
210211
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20250203_msg_bodies
211212
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20250322_short_links
212213
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20250702_conn_invitations_remove_cascade_delete
214+
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20251009_queue_to_subscribe
213215
if flag(client_postgres) || flag(server_postgres)
214216
exposed-modules:
215217
Simplex.Messaging.Agent.Store.Postgres

src/Simplex/Messaging/Agent.hs

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ import Data.Functor.Identity
152152
import Data.Int (Int64)
153153
import Data.IntMap.Strict (IntMap)
154154
import qualified Data.IntMap.Strict as IM
155-
import Data.List (find)
155+
import Data.List (find, sortOn)
156156
import Data.List.NonEmpty (NonEmpty (..))
157157
import qualified Data.List.NonEmpty as L
158158
import Data.Map.Strict (Map)
@@ -445,8 +445,8 @@ subscribeConnections c = withAgentEnv c . subscribeConnections' c
445445
{-# INLINE subscribeConnections #-}
446446

447447
-- | Subscribe to all connections
448-
subscribeAllConnections :: AgentClient -> AE ()
449-
subscribeAllConnections c = withAgentEnv c $ subscribeAllConnections' c
448+
subscribeAllConnections :: AgentClient -> Bool -> Maybe UserId -> AE ()
449+
subscribeAllConnections c = withAgentEnv c .: subscribeAllConnections' c
450450

451451
-- | Get messages for connections (GET commands)
452452
getConnectionMessages :: AgentClient -> NonEmpty ConnMsgReq -> IO (NonEmpty (Either AgentErrorType (Maybe SMPMsgMeta)))
@@ -972,7 +972,7 @@ newRcvConnSrv c nm userId connId enableNtfs cMode userData_ clientData pqInitKey
972972
ntfServer_ <- if enableNtfs then newQueueNtfServer else pure Nothing
973973
(rq, qUri, tSess, sessId) <- newRcvQueue_ c nm userId connId srvWithAuth vr qd (isJust ntfServer_) subMode nonce_ e2eKeys `catchAllErrors` \e -> liftIO (print e) >> throwE e
974974
atomically $ incSMPServerStat c userId srv connCreated
975-
rq' <- withStore c $ \db -> updateNewConnRcv db connId rq
975+
rq' <- withStore c $ \db -> updateNewConnRcv db connId rq subMode
976976
lift . when (subMode == SMSubscribe) $ addNewQueueSubscription c rq' tSess sessId
977977
mapM_ (newQueueNtfSubscription c rq') ntfServer_
978978
pure (rq', qUri)
@@ -1224,7 +1224,7 @@ createReplyQueue c nm ConnData {userId, connId, enableNtfs} SndQueue {smpClientV
12241224
(rq, qUri, tSess, sessId) <- newRcvQueue c nm userId connId srv (versionToRange smpClientVersion) SCMInvitation (isJust ntfServer_) subMode
12251225
atomically $ incSMPServerStat c userId (qServer rq) connCreated
12261226
let qInfo = toVersionT qUri smpClientVersion
1227-
rq' <- withStore c $ \db -> upgradeSndConnToDuplex db connId rq
1227+
rq' <- withStore c $ \db -> upgradeSndConnToDuplex db connId rq subMode
12281228
lift . when (subMode == SMSubscribe) $ addNewQueueSubscription c rq' tSess sessId
12291229
mapM_ (newQueueNtfSubscription c rq') ntfServer_
12301230
pure (qInfo, clientServiceId rq')
@@ -1357,25 +1357,30 @@ subscribeConnections_ c conns = do
13571357
when (actual /= expected) . atomically $
13581358
writeTBQueue (subQ c) ("", "", AEvt SAEConn $ ERR $ INTERNAL $ "subscribeConnections result size: " <> show actual <> ", expected " <> show expected)
13591359

1360-
subscribeAllConnections' :: AgentClient -> AM ()
1361-
subscribeAllConnections' c = do
1362-
userSrvs <- withStore' c getSubscriptionServers
1363-
maxPending <- asks $ maxPendingSubscriptions . config
1364-
currPending <- newTVarIO 0
1360+
subscribeAllConnections' :: AgentClient -> Bool -> Maybe UserId -> AM ()
1361+
subscribeAllConnections' c onlyNeeded activeUserId_ = handleErr $ do
1362+
userSrvs <- withStore' c (`getSubscriptionServers` onlyNeeded)
13651363
unless (null userSrvs) $ do
1366-
rs <- lift $ mapConcurrently (subscribeUserServer maxPending currPending) userSrvs
1364+
maxPending <- asks $ maxPendingSubscriptions . config
1365+
currPending <- newTVarIO 0
1366+
let userSrvs' = case activeUserId_ of
1367+
Just activeUserId -> sortOn (\(uId, _) -> if uId == activeUserId then 0 else 1 :: Int) userSrvs
1368+
Nothing -> userSrvs
1369+
rs <- lift $ mapConcurrently (subscribeUserServer maxPending currPending) userSrvs'
13671370
let (errs, oks) = partitionEithers rs
13681371
logInfo $ "subscribed " <> tshow (sum oks) <> " queues"
13691372
forM_ (L.nonEmpty errs) $ notifySub c . ERRS . L.map ("",)
1373+
withStore' c unsetQueuesToSubscribe
13701374
resumeAllDelivery
13711375
resumeAllCommands c
13721376
where
1377+
handleErr = (`catchAllErrors` \e -> notifySub' c "" (ERR e) >> throwE e)
13731378
subscribeUserServer :: Int -> TVar Int -> (UserId, SMPServer) -> AM' (Either AgentErrorType Int)
13741379
subscribeUserServer maxPending currPending (userId, srv) = do
13751380
atomically $ whenM ((maxPending <=) <$> readTVar currPending) retry
13761381
tryAllErrors' $ do
13771382
qs <- withStore' c $ \db -> do
1378-
qs <- getUserServerRcvQueueSubs db userId srv
1383+
qs <- getUserServerRcvQueueSubs db userId srv onlyNeeded
13791384
atomically $ modifyTVar' currPending (+ length qs) -- update before leaving transaction
13801385
pure qs
13811386
let n = length qs
@@ -2097,7 +2102,7 @@ switchDuplexConnection c nm (DuplexConnection cData@ConnData {connId, userId} rq
20972102
-- The problem is that currently subscription already exists, and we do not support queues with credentials but without subscriptions.
20982103
(q, qUri, tSess, sessId) <- newRcvQueue c nm userId connId srv' clientVRange SCMInvitation False SMSubscribe
20992104
let rq' = (q :: NewRcvQueue) {primary = True, dbReplaceQueueId = Just dbQueueId}
2100-
rq'' <- withStore c $ \db -> addConnRcvQueue db connId rq'
2105+
rq'' <- withStore c $ \db -> addConnRcvQueue db connId rq' SMSubscribe
21012106
lift $ addNewQueueSubscription c rq'' tSess sessId
21022107
void . enqueueMessages c cData sqs SMP.noMsgFlags $ QADD [(qUri, Just (server, sndId))]
21032108
rq1 <- withStore' c $ \db -> setRcvSwitchStatus db rq $ Just RSSendingQADD

src/Simplex/Messaging/Agent/Client.hs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,7 @@ module Simplex.Messaging.Agent.Client
158158
unsafeWithStore,
159159
storeError,
160160
notifySub,
161+
notifySub',
161162
userServers,
162163
pickServer,
163164
getNextServer,

src/Simplex/Messaging/Agent/Store/AgentStore.hs

Lines changed: 40 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ module Simplex.Messaging.Agent.Store.AgentStore
4141
createSndConn,
4242
getSubscriptionServers,
4343
getUserServerRcvQueueSubs,
44+
unsetQueuesToSubscribe,
4445
getConn,
4546
getDeletedConn,
4647
getConns,
@@ -392,15 +393,15 @@ createNewConn db gVar cData cMode = do
392393
fst <$$> createConn_ gVar cData (\connId -> createConnRecord db connId cData cMode)
393394

394395
-- TODO [certs rcv] store clientServiceId from NewRcvQueue
395-
updateNewConnRcv :: DB.Connection -> ConnId -> NewRcvQueue -> IO (Either StoreError RcvQueue)
396-
updateNewConnRcv db connId rq =
396+
updateNewConnRcv :: DB.Connection -> ConnId -> NewRcvQueue -> SubscriptionMode -> IO (Either StoreError RcvQueue)
397+
updateNewConnRcv db connId rq subMode =
397398
getConn db connId $>>= \case
398399
(SomeConn _ NewConnection {}) -> updateConn
399400
(SomeConn _ RcvConnection {}) -> updateConn -- to allow retries
400401
(SomeConn c _) -> pure . Left . SEBadConnType "updateNewConnRcv" $ connType c
401402
where
402403
updateConn :: IO (Either StoreError RcvQueue)
403-
updateConn = Right <$> addConnRcvQueue_ db connId rq
404+
updateConn = Right <$> addConnRcvQueue_ db connId rq subMode
404405

405406
updateNewConnSnd :: DB.Connection -> ConnId -> NewSndQueue -> IO (Either StoreError SndQueue)
406407
updateNewConnSnd db connId sq =
@@ -482,25 +483,25 @@ upgradeRcvConnToDuplex db connId sq =
482483
(SomeConn c _) -> pure . Left . SEBadConnType "upgradeRcvConnToDuplex" $ connType c
483484

484485
-- TODO [certs rcv] store clientServiceId from NewRcvQueue
485-
upgradeSndConnToDuplex :: DB.Connection -> ConnId -> NewRcvQueue -> IO (Either StoreError RcvQueue)
486-
upgradeSndConnToDuplex db connId rq =
486+
upgradeSndConnToDuplex :: DB.Connection -> ConnId -> NewRcvQueue -> SubscriptionMode -> IO (Either StoreError RcvQueue)
487+
upgradeSndConnToDuplex db connId rq subMode =
487488
getConn db connId >>= \case
488-
Right (SomeConn _ SndConnection {}) -> Right <$> addConnRcvQueue_ db connId rq
489+
Right (SomeConn _ SndConnection {}) -> Right <$> addConnRcvQueue_ db connId rq subMode
489490
Right (SomeConn c _) -> pure . Left . SEBadConnType "upgradeSndConnToDuplex" $ connType c
490491
_ -> pure $ Left SEConnNotFound
491492

492493
-- TODO [certs rcv] store clientServiceId from NewRcvQueue
493-
addConnRcvQueue :: DB.Connection -> ConnId -> NewRcvQueue -> IO (Either StoreError RcvQueue)
494-
addConnRcvQueue db connId rq =
494+
addConnRcvQueue :: DB.Connection -> ConnId -> NewRcvQueue -> SubscriptionMode -> IO (Either StoreError RcvQueue)
495+
addConnRcvQueue db connId rq subMode =
495496
getConn db connId >>= \case
496-
Right (SomeConn _ DuplexConnection {}) -> Right <$> addConnRcvQueue_ db connId rq
497+
Right (SomeConn _ DuplexConnection {}) -> Right <$> addConnRcvQueue_ db connId rq subMode
497498
Right (SomeConn c _) -> pure . Left . SEBadConnType "addConnRcvQueue" $ connType c
498499
_ -> pure $ Left SEConnNotFound
499500

500-
addConnRcvQueue_ :: DB.Connection -> ConnId -> NewRcvQueue -> IO RcvQueue
501-
addConnRcvQueue_ db connId rq@RcvQueue {server} = do
501+
addConnRcvQueue_ :: DB.Connection -> ConnId -> NewRcvQueue -> SubscriptionMode -> IO RcvQueue
502+
addConnRcvQueue_ db connId rq@RcvQueue {server} subMode = do
502503
serverKeyHash_ <- createServer_ db server
503-
insertRcvQueue_ db connId rq serverKeyHash_
504+
insertRcvQueue_ db connId rq subMode serverKeyHash_
504505

505506
addConnSndQueue :: DB.Connection -> ConnId -> NewSndQueue -> IO (Either StoreError SndQueue)
506507
addConnSndQueue db connId sq =
@@ -1983,8 +1984,8 @@ upsertNtfServer_ db ProtocolServer {host, port, keyHash} = do
19831984

19841985
-- * createRcvConn helpers
19851986

1986-
insertRcvQueue_ :: DB.Connection -> ConnId -> NewRcvQueue -> Maybe C.KeyHash -> IO RcvQueue
1987-
insertRcvQueue_ db connId' rq@RcvQueue {..} serverKeyHash_ = do
1987+
insertRcvQueue_ :: DB.Connection -> ConnId -> NewRcvQueue -> SubscriptionMode -> Maybe C.KeyHash -> IO RcvQueue
1988+
insertRcvQueue_ db connId' rq@RcvQueue {..} subMode serverKeyHash_ = do
19881989
-- to preserve ID if the queue already exists.
19891990
-- possibly, it can be done in one query.
19901991
currQId_ <- maybeFirstRow fromOnly $ DB.query db "SELECT rcv_queue_id FROM rcv_queues WHERE conn_id = ? AND host = ? AND port = ? AND snd_id = ?" (connId', host server, port server, sndId)
@@ -1994,19 +1995,20 @@ insertRcvQueue_ db connId' rq@RcvQueue {..} serverKeyHash_ = do
19941995
[sql|
19951996
INSERT INTO rcv_queues
19961997
( host, port, rcv_id, conn_id, rcv_private_key, rcv_dh_secret, e2e_priv_key, e2e_dh_secret,
1997-
snd_id, queue_mode, status, rcv_queue_id, rcv_primary, replace_rcv_queue_id, smp_client_version, server_key_hash,
1998+
snd_id, queue_mode, status, to_subscribe, rcv_queue_id, rcv_primary, replace_rcv_queue_id, smp_client_version, server_key_hash,
19981999
link_id, link_key, link_priv_sig_key, link_enc_fixed_data,
19992000
ntf_public_key, ntf_private_key, ntf_id, rcv_ntf_dh_secret
2000-
) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?);
2001+
) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?);
20012002
|]
20022003
( (host server, port server, rcvId, connId', rcvPrivateKey, rcvDhSecret, e2ePrivKey, e2eDhSecret)
2003-
:. (sndId, queueMode, status, qId, BI primary, dbReplaceQueueId, smpClientVersion, serverKeyHash_)
2004+
:. (sndId, queueMode, status, BI toSubscribe, qId, BI primary, dbReplaceQueueId, smpClientVersion, serverKeyHash_)
20042005
:. (shortLinkId <$> shortLink, shortLinkKey <$> shortLink, linkPrivSigKey <$> shortLink, linkEncFixedData <$> shortLink)
20052006
:. ntfCredsFields
20062007
)
20072008
-- TODO [certs rcv] save client service
20082009
pure (rq :: NewRcvQueue) {connId = connId', dbQueueId = qId, clientService = Nothing}
20092010
where
2011+
toSubscribe = subMode == SMOnlyCreate
20102012
ntfCredsFields = case clientNtfCreds of
20112013
Just ClientNtfCreds {ntfPublicKey, ntfPrivateKey, notifierId, rcvNtfDhSecret} ->
20122014
(Just ntfPublicKey, Just ntfPrivateKey, Just notifierId, Just rcvNtfDhSecret)
@@ -2053,27 +2055,37 @@ newQueueId_ (Only maxId : _) = DBEntityId (maxId + 1)
20532055

20542056
-- * subscribe all connections
20552057

2056-
getSubscriptionServers :: DB.Connection -> IO [(UserId, SMPServer)]
2057-
getSubscriptionServers db =
2058-
map toUserServer
2059-
<$> DB.query_
2060-
db
2058+
getSubscriptionServers :: DB.Connection -> Bool -> IO [(UserId, SMPServer)]
2059+
getSubscriptionServers db onlyNeeded =
2060+
map toUserServer <$> DB.query_ db (select <> toSubscribe <> " c.deleted = 0 AND q.deleted = 0")
2061+
where
2062+
select =
20612063
[sql|
20622064
SELECT DISTINCT c.user_id, q.host, q.port, COALESCE(q.server_key_hash, s.key_hash)
20632065
FROM rcv_queues q
20642066
JOIN servers s ON q.host = s.host AND q.port = s.port
20652067
JOIN connections c ON q.conn_id = c.conn_id
2066-
WHERE c.deleted = 0 AND q.deleted = 0
20672068
|]
2068-
where
2069+
toSubscribe
2070+
| onlyNeeded = " WHERE q.to_subscribe = 1 AND "
2071+
| otherwise = " WHERE "
20692072
toUserServer :: (UserId, NonEmpty TransportHost, ServiceName, C.KeyHash) -> (UserId, SMPServer)
20702073
toUserServer (userId, host, port, keyHash) = (userId, SMPServer host port keyHash)
20712074

2072-
getUserServerRcvQueueSubs :: DB.Connection -> UserId -> SMPServer -> IO [RcvQueueSub]
2073-
getUserServerRcvQueueSubs db userId srv =
2074-
map toRcvQueueSub <$> DB.query db (rcvQueueSubQuery <> condition) (userId, host srv, port srv)
2075+
getUserServerRcvQueueSubs :: DB.Connection -> UserId -> SMPServer -> Bool -> IO [RcvQueueSub]
2076+
getUserServerRcvQueueSubs db userId srv onlyNeeded =
2077+
map toRcvQueueSub
2078+
<$> DB.query
2079+
db
2080+
(rcvQueueSubQuery <> toSubscribe <> " c.deleted = 0 AND q.deleted = 0 AND c.user_id = ? AND q.host = ? AND q.port = ?")
2081+
(userId, host srv, port srv)
20752082
where
2076-
condition = " WHERE c.deleted = 0 AND q.deleted = 0 AND c.user_id = ? AND q.host = ? AND q.port = ?"
2083+
toSubscribe
2084+
| onlyNeeded = " WHERE q.to_subscribe = 1 AND "
2085+
| otherwise = " WHERE "
2086+
2087+
unsetQueuesToSubscribe :: DB.Connection -> IO ()
2088+
unsetQueuesToSubscribe db = DB.execute_ db "UPDATE rcv_queues SET to_subscribe = 0 WHERE to_subscribe = 1"
20772089

20782090
-- * getConn helpers
20792091

src/Simplex/Messaging/Agent/Store/Postgres/Migrations/App.hs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,16 @@ import Simplex.Messaging.Agent.Store.Postgres.Migrations.M20241210_initial
88
import Simplex.Messaging.Agent.Store.Postgres.Migrations.M20250203_msg_bodies
99
import Simplex.Messaging.Agent.Store.Postgres.Migrations.M20250322_short_links
1010
import Simplex.Messaging.Agent.Store.Postgres.Migrations.M20250702_conn_invitations_remove_cascade_delete
11+
import Simplex.Messaging.Agent.Store.Postgres.Migrations.M20251009_queue_to_subscribe
1112
import Simplex.Messaging.Agent.Store.Shared (Migration (..))
1213

1314
schemaMigrations :: [(String, Text, Maybe Text)]
1415
schemaMigrations =
1516
[ ("20241210_initial", m20241210_initial, Nothing),
1617
("20250203_msg_bodies", m20250203_msg_bodies, Just down_m20250203_msg_bodies),
1718
("20250322_short_links", m20250322_short_links, Just down_m20250322_short_links),
18-
("20250702_conn_invitations_remove_cascade_delete", m20250702_conn_invitations_remove_cascade_delete, Just down_m20250702_conn_invitations_remove_cascade_delete)
19+
("20250702_conn_invitations_remove_cascade_delete", m20250702_conn_invitations_remove_cascade_delete, Just down_m20250702_conn_invitations_remove_cascade_delete),
20+
("20251009_queue_to_subscribe", m20251009_queue_to_subscribe, Just down_m20251009_queue_to_subscribe)
1921
]
2022

2123
-- | The list of migrations in ascending order by date
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
{-# LANGUAGE QuasiQuotes #-}
2+
3+
module Simplex.Messaging.Agent.Store.Postgres.Migrations.M20251009_queue_to_subscribe where
4+
5+
import Data.Text (Text)
6+
import qualified Data.Text as T
7+
import Text.RawString.QQ (r)
8+
9+
m20251009_queue_to_subscribe :: Text
10+
m20251009_queue_to_subscribe =
11+
T.pack
12+
[r|
13+
ALTER TABLE rcv_queues ADD COLUMN to_subscribe SMALLINT NOT NULL DEFAULT 0;
14+
CREATE INDEX idx_rcv_queues_to_subscribe ON rcv_queues(to_subscribe);
15+
|]
16+
17+
down_m20251009_queue_to_subscribe :: Text
18+
down_m20251009_queue_to_subscribe =
19+
T.pack
20+
[r|
21+
DROP INDEX idx_rcv_queues_to_subscribe;
22+
ALTER TABLE rcv_queues DROP COLUMN to_subscribe;
23+
|]

src/Simplex/Messaging/Agent/Store/SQLite/Migrations/App.hs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20241224_ratchet_e2e_snd
4444
import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20250203_msg_bodies
4545
import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20250322_short_links
4646
import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20250702_conn_invitations_remove_cascade_delete
47+
import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20251009_queue_to_subscribe
4748
import Simplex.Messaging.Agent.Store.Shared (Migration (..))
4849

4950
schemaMigrations :: [(String, Query, Maybe Query)]
@@ -87,7 +88,8 @@ schemaMigrations =
8788
("m20241224_ratchet_e2e_snd_params", m20241224_ratchet_e2e_snd_params, Just down_m20241224_ratchet_e2e_snd_params),
8889
("m20250203_msg_bodies", m20250203_msg_bodies, Just down_m20250203_msg_bodies),
8990
("m20250322_short_links", m20250322_short_links, Just down_m20250322_short_links),
90-
("m20250702_conn_invitations_remove_cascade_delete", m20250702_conn_invitations_remove_cascade_delete, Just down_m20250702_conn_invitations_remove_cascade_delete)
91+
("m20250702_conn_invitations_remove_cascade_delete", m20250702_conn_invitations_remove_cascade_delete, Just down_m20250702_conn_invitations_remove_cascade_delete),
92+
("20251009_queue_to_subscribe", m20251009_queue_to_subscribe, Just down_m20251009_queue_to_subscribe)
9193
]
9294

9395
-- | The list of migrations in ascending order by date
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
{-# LANGUAGE QuasiQuotes #-}
2+
3+
module Simplex.Messaging.Agent.Store.SQLite.Migrations.M20251009_queue_to_subscribe where
4+
5+
import Database.SQLite.Simple (Query)
6+
import Database.SQLite.Simple.QQ (sql)
7+
8+
m20251009_queue_to_subscribe :: Query
9+
m20251009_queue_to_subscribe =
10+
[sql|
11+
ALTER TABLE rcv_queues ADD COLUMN to_subscribe INTEGER NOT NULL DEFAULT 0;
12+
CREATE INDEX idx_rcv_queues_to_subscribe ON rcv_queues(to_subscribe);
13+
|]
14+
15+
down_m20251009_queue_to_subscribe :: Query
16+
down_m20251009_queue_to_subscribe =
17+
[sql|
18+
DROP INDEX idx_rcv_queues_to_subscribe;
19+
ALTER TABLE rcv_queues DROP COLUMN to_subscribe;
20+
|]

src/Simplex/Messaging/Agent/Store/SQLite/Migrations/agent_schema.sql

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ CREATE TABLE rcv_queues(
4747
ntf_private_key BLOB,
4848
ntf_id BLOB,
4949
rcv_ntf_dh_secret BLOB,
50+
to_subscribe INTEGER NOT NULL DEFAULT 0,
5051
rcv_queue_id INTEGER CHECK(rcv_queue_id NOT NULL),
5152
rcv_primary INTEGER CHECK(rcv_primary NOT NULL),
5253
replace_rcv_queue_id INTEGER NULL,
@@ -437,6 +438,7 @@ CREATE TABLE inv_short_links(
437438
FOREIGN KEY(host, port) REFERENCES servers ON DELETE RESTRICT ON UPDATE CASCADE
438439
);
439440
CREATE UNIQUE INDEX idx_rcv_queues_ntf ON rcv_queues(host, port, ntf_id);
441+
CREATE INDEX idx_rcv_queues_to_subscribe ON rcv_queues(to_subscribe);
440442
CREATE UNIQUE INDEX idx_rcv_queue_id ON rcv_queues(conn_id, rcv_queue_id);
441443
CREATE UNIQUE INDEX idx_snd_queue_id ON snd_queues(conn_id, snd_queue_id);
442444
CREATE INDEX idx_snd_message_deliveries ON snd_message_deliveries(

tests/AgentTests/FunctionalAPITests.hs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2437,7 +2437,7 @@ testBatchedSubscriptions nCreate nDel ps@(t, ASType qsType _) = do
24372437
liftIO $ S.fromList (cs1 ++ cs2) `shouldBe` S.fromList cs
24382438
subscribe :: AgentClient -> [ConnId] -> ExceptT AgentErrorType IO ()
24392439
subscribe c cs = do
2440-
subscribeAllConnections c
2440+
subscribeAllConnections c False Nothing
24412441
liftIO $ up c cs
24422442
delete :: AgentClient -> [ConnId] -> ExceptT AgentErrorType IO ()
24432443
delete c cs = do
@@ -2469,7 +2469,7 @@ testBatchedPendingMessages nCreate nMsgs =
24692469
replicateM_ nMsgs $ get a =##> \case ("", cId, SENT _) -> isJust $ find ((cId ==) . snd) msgConns; _ -> False
24702470
withB $ \b -> runRight_ $ do
24712471
let aIds = map fst conns
2472-
subscribeAllConnections b
2472+
subscribeAllConnections b False Nothing
24732473
("", "", UP _ aIds') <- nGet b
24742474
liftIO $ S.fromList aIds' `shouldBe` S.fromList aIds
24752475
replicateM_ nMsgs $ do

0 commit comments

Comments
 (0)