Skip to content

Commit 9cda203

Browse files
authored
agent: subscribe all connections (#1655)
* agent: subscribe all connections * query, version * BoolInt * add query to errors * Revert "add query to errors" This reverts commit 32a1f7f. * fix optional field * version * limit number of in-flight subscriptions to 35000
1 parent 318ddf6 commit 9cda203

File tree

9 files changed

+273
-121
lines changed

9 files changed

+273
-121
lines changed

src/Simplex/Messaging/Agent.hs

Lines changed: 112 additions & 47 deletions
Large diffs are not rendered by default.

src/Simplex/Messaging/Agent/Client.hs

Lines changed: 48 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ module Simplex.Messaging.Agent.Client
4848
newRcvQueue,
4949
newRcvQueue_,
5050
subscribeQueues,
51+
subscribeUserServerQueues,
5152
getQueueMessage,
5253
decryptSMPMessage,
5354
failSubscription,
@@ -427,7 +428,7 @@ getAgentWorker' toW fromW name hasWork c@AgentClient {agentEnv} key ws work = do
427428
newWorker :: AgentClient -> STM Worker
428429
newWorker c = do
429430
workerId <- stateTVar (workerSeq c) $ \next -> (next, next + 1)
430-
doWork <- newTMVar ()
431+
doWork <- newTMVar () -- new worker is created with "some work to do" (indicated by () in TMVar)
431432
action <- newTMVar Nothing
432433
restarts <- newTVar $ RestartCount 0 0
433434
pure Worker {workerId, doWork, action, restarts}
@@ -733,7 +734,7 @@ smpClientDisconnected c@AgentClient {active, smpClients, smpProxiedRelays} tSess
733734
mode <- getSessionModeIO c
734735
let resubscribe
735736
| (mode == TSMEntity) == isJust cId = resubscribeSMPSession c tSess
736-
| otherwise = void $ subscribeQueues c qs True
737+
| otherwise = void $ subscribeQueues c True qs
737738
runReaderT resubscribe env
738739

739740
resubscribeSMPSession :: AgentClient -> SMPTransportSession -> AM' ()
@@ -1401,6 +1402,7 @@ newRcvQueue_ c nm userId connId (ProtoServerWithAuth srv auth) vRange cqrd enabl
14011402
shortLink,
14021403
clientService = ClientService DBNewEntity <$> serviceId,
14031404
status = New,
1405+
enableNtfs,
14041406
dbQueueId = DBNewEntity,
14051407
primary = True,
14061408
dbReplaceQueueId = Nothing,
@@ -1509,30 +1511,51 @@ serverHostError = \case
15091511
_ -> False
15101512

15111513
-- | Batch by transport session and subscribe queues. The list of results can have a different order.
1512-
subscribeQueues :: AgentClient -> [RcvQueueSub] -> Bool -> AM' [(RcvQueueSub, Either AgentErrorType (Maybe ServiceId))]
1513-
subscribeQueues c qs withEvents = do
1514+
subscribeQueues :: AgentClient -> Bool -> [RcvQueueSub] -> AM' [(RcvQueueSub, Either AgentErrorType (Maybe ServiceId))]
1515+
subscribeQueues c withEvents qs = do
15141516
(errs, qs') <- checkQueues c qs
15151517
atomically $ modifyTVar' (subscrConns c) (`S.union` S.fromList (map qConnId qs'))
15161518
qss <- batchQueues mkSMPTSession qs' <$> getSessionModeIO c
1517-
mapM_ addPendingSubs qss
1518-
rs <- mapConcurrently subscribeQueues_ qss
1519+
mapM_ (addPendingSubs c) qss
1520+
rs <- mapConcurrently (subscribeQueues_ c withEvents) qss
15191521
when withEvents $ forM_ (L.nonEmpty errs) $ notifySub c . ERRS . L.map (first qConnId)
15201522
pure $ map (second Left) errs <> concatMap L.toList rs
1523+
1524+
addPendingSubs :: AgentClient -> (SMPTransportSession, NonEmpty RcvQueueSub) -> AM' ()
1525+
addPendingSubs c (tSess, qs') = atomically $ SS.batchAddPendingSubs tSess (L.toList qs') $ currentSubs c
1526+
1527+
subscribeQueues_ :: AgentClient -> Bool -> (SMPTransportSession, NonEmpty RcvQueueSub) -> AM' (BatchResponses RcvQueueSub AgentErrorType (Maybe ServiceId))
1528+
subscribeQueues_ c withEvents qs'@(tSess@(_, srv, _), _) = do
1529+
(rs, active) <- subscribeSessQueues_ c withEvents qs'
1530+
if active
1531+
then when (hasTempErrors rs) resubscribe $> rs
1532+
else do
1533+
logWarn "subcription batch result for replaced SMP client, resubscribing"
1534+
-- we use BROKER NETWORK error here instead of the original error, so it becomes temporary.
1535+
resubscribe $> L.map (second $ Left . toNESubscribeError) rs
15211536
where
1522-
addPendingSubs (tSess, qs') = atomically $ SS.batchAddPendingSubs tSess (L.toList qs') $ currentSubs c
1523-
subscribeQueues_ qs'@(tSess@(_, srv, _), _) = do
1524-
(rs, active) <- subscribeSessQueues_ c qs' withEvents
1525-
if active
1526-
then when (hasTempErrors rs) resubscribe $> rs
1527-
else do
1528-
logWarn "subcription batch result for replaced SMP client, resubscribing"
1529-
-- we use BROKER NETWORK error here instead of the original error, so it becomes temporary.
1530-
resubscribe $> L.map (second $ Left . toNESubscribeError) rs
1531-
where
1532-
-- treating host errors as temporary here as well
1533-
hasTempErrors = any (either temporaryOrHostError (const False) . snd)
1534-
toNESubscribeError = BROKER (B.unpack $ strEncode srv) . NETWORK . NESubscribeError . show
1535-
resubscribe = resubscribeSMPSession c tSess
1537+
-- treating host errors as temporary here as well
1538+
hasTempErrors = any (either temporaryOrHostError (const False) . snd)
1539+
toNESubscribeError = BROKER (B.unpack $ strEncode srv) . NETWORK . NESubscribeError . show
1540+
resubscribe = resubscribeSMPSession c tSess
1541+
1542+
subscribeUserServerQueues :: AgentClient -> UserId -> SMPServer -> [RcvQueueSub] -> AM' [(RcvQueueSub, Either AgentErrorType (Maybe ServiceId))]
1543+
subscribeUserServerQueues c userId srv qs = do
1544+
mode <- getSessionModeIO c
1545+
if mode == TSMEntity
1546+
then subscribeQueues c True qs
1547+
else do
1548+
let tSess = (userId, srv, Nothing)
1549+
(errs, qs_) <- checkQueues c qs
1550+
forM_ (L.nonEmpty errs) $ notifySub c . ERRS . L.map (first qConnId)
1551+
let errs' = map (second Left) errs
1552+
case L.nonEmpty qs_ of
1553+
Just qs' -> do
1554+
atomically $ modifyTVar' (subscrConns c) (`S.union` S.fromList (map qConnId $ L.toList qs'))
1555+
addPendingSubs c (tSess, qs')
1556+
rs <- subscribeQueues_ c True (tSess, qs')
1557+
pure $ errs' <> L.toList rs
1558+
Nothing -> pure errs'
15361559

15371560
-- only "checked" queues are subscribed
15381561
checkQueues :: AgentClient -> [RcvQueueSub] -> AM' ([(RcvQueueSub, AgentErrorType)], [RcvQueueSub])
@@ -1547,14 +1570,14 @@ checkQueues c = fmap partitionEithers . mapM checkQueue
15471570
resubscribeSessQueues :: AgentClient -> SMPTransportSession -> [RcvQueueSub] -> AM' ()
15481571
resubscribeSessQueues c tSess qs = do
15491572
(errs, qs_) <- checkQueues c qs
1550-
forM_ (L.nonEmpty qs_) $ \qs' -> void $ subscribeSessQueues_ c (tSess, qs') True
1573+
forM_ (L.nonEmpty qs_) $ \qs' -> void $ subscribeSessQueues_ c True (tSess, qs')
15511574
forM_ (L.nonEmpty errs) $ notifySub c . ERRS . L.map (first qConnId)
15521575

1553-
subscribeSessQueues_ :: AgentClient -> (SMPTransportSession, NonEmpty RcvQueueSub) -> Bool -> AM' (BatchResponses RcvQueueSub AgentErrorType (Maybe ServiceId), Bool)
1554-
subscribeSessQueues_ c qs withEvents = sendClientBatch_ "SUB" False subscribeQueues_ c NRMBackground qs
1576+
subscribeSessQueues_ :: AgentClient -> Bool -> (SMPTransportSession, NonEmpty RcvQueueSub) -> AM' (BatchResponses RcvQueueSub AgentErrorType (Maybe ServiceId), Bool)
1577+
subscribeSessQueues_ c withEvents qs = sendClientBatch_ "SUB" False subscribe_ c NRMBackground qs
15551578
where
1556-
subscribeQueues_ :: SMPClient -> NonEmpty RcvQueueSub -> IO (BatchResponses RcvQueueSub SMPClientError (Maybe ServiceId), Bool)
1557-
subscribeQueues_ smp qs' = do
1579+
subscribe_ :: SMPClient -> NonEmpty RcvQueueSub -> IO (BatchResponses RcvQueueSub SMPClientError (Maybe ServiceId), Bool)
1580+
subscribe_ smp qs' = do
15581581
let (userId, srv, _) = tSess
15591582
atomically $ incSMPServerStat' c userId srv connSubAttempts $ length qs'
15601583
rs <- sendBatch (\smp' _ -> subscribeSMPQueues smp') smp NRMBackground qs'
@@ -1646,12 +1669,6 @@ getRemovedSubs AgentClient {removedSubs} k = TM.lookup k removedSubs >>= maybe n
16461669
TM.insert k s removedSubs
16471670
pure s
16481671

1649-
addPendingSubscription :: AgentClient -> RcvQueueSub -> STM ()
1650-
addPendingSubscription c rq = do
1651-
modifyTVar' (subscrConns c) $ S.insert $ qConnId rq
1652-
tSess <- mkSMPTransportSession c rq
1653-
SS.addPendingSub tSess rq $ currentSubs c
1654-
16551672
addNewQueueSubscription :: AgentClient -> RcvQueue -> SMPTransportSession -> SessionId -> AM' ()
16561673
addNewQueueSubscription c rq' tSess sessId = do
16571674
let rq = rcvQueueSub rq'

src/Simplex/Messaging/Agent/Env/SQLite.hs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,7 @@ data AgentConfig = AgentConfig
166166
ntfBatchSize :: Int,
167167
ntfSubFirstCheckInterval :: NominalDiffTime,
168168
ntfSubCheckInterval :: NominalDiffTime,
169+
maxPendingSubscriptions :: Int,
169170
caCertificateFile :: FilePath,
170171
privateKeyFile :: FilePath,
171172
certificateFile :: FilePath,
@@ -237,6 +238,7 @@ defaultAgentConfig =
237238
ntfBatchSize = 150,
238239
ntfSubFirstCheckInterval = nominalDay,
239240
ntfSubCheckInterval = 3 * nominalDay,
241+
maxPendingSubscriptions = 35000,
240242
-- CA certificate private key is not needed for initialization
241243
-- ! we do not generate these
242244
caCertificateFile = "/etc/opt/simplex-agent/ca.crt",

src/Simplex/Messaging/Agent/Store.hs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,8 @@ data StoredRcvQueue (q :: DBStored) = RcvQueue
8888
clientService :: Maybe (StoredClientService q),
8989
-- | queue status
9090
status :: QueueStatus,
91+
-- | to enable notifications for this queue - this field is duplicated from ConnData
92+
enableNtfs :: Bool,
9193
-- | database queue ID (within connection)
9294
dbQueueId :: DBEntityId' q,
9395
-- | True for a primary or a next primary queue of the connection (next if dbReplaceQueueId is set)
@@ -110,15 +112,16 @@ data RcvQueueSub = RcvQueueSub
110112
rcvId :: SMP.RecipientId,
111113
rcvPrivateKey :: RcvPrivateAuthKey,
112114
status :: QueueStatus,
115+
enableNtfs :: Bool,
113116
dbQueueId :: Int64,
114117
primary :: Bool,
115118
dbReplaceQueueId :: Maybe Int64
116119
}
117120
deriving (Show)
118121

119122
rcvQueueSub :: RcvQueue -> RcvQueueSub
120-
rcvQueueSub RcvQueue {userId, connId, server, rcvId, rcvPrivateKey, status, dbQueueId = DBEntityId dbQueueId, primary, dbReplaceQueueId} =
121-
RcvQueueSub {userId, connId, server, rcvId, rcvPrivateKey, status, dbQueueId, primary, dbReplaceQueueId}
123+
rcvQueueSub RcvQueue {userId, connId, server, rcvId, rcvPrivateKey, status, enableNtfs, dbQueueId = DBEntityId dbQueueId, primary, dbReplaceQueueId} =
124+
RcvQueueSub {userId, connId, server, rcvId, rcvPrivateKey, status, enableNtfs, dbQueueId, primary, dbReplaceQueueId}
122125

123126
data ShortLinkCreds = ShortLinkCreds
124127
{ shortLinkId :: SMP.LinkId,

src/Simplex/Messaging/Agent/Store/AgentStore.hs

Lines changed: 64 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ module Simplex.Messaging.Agent.Store.AgentStore
3939
updateNewConnRcv,
4040
updateNewConnSnd,
4141
createSndConn,
42+
getSubscriptionServers,
43+
getUserServerRcvQueueSubs,
4244
getConn,
4345
getDeletedConn,
4446
getConns,
@@ -110,6 +112,7 @@ module Simplex.Messaging.Agent.Store.AgentStore
110112
updateSndMsgRcpt,
111113
getPendingQueueMsg,
112114
getConnectionsForDelivery,
115+
getAllSndQueuesForDelivery,
113116
updatePendingMsgRIState,
114117
deletePendingMsgs,
115118
getExpiredSndMessages,
@@ -137,6 +140,7 @@ module Simplex.Messaging.Agent.Store.AgentStore
137140
-- Async commands
138141
createCommand,
139142
getPendingCommandServers,
143+
getAllPendingCommandConns,
140144
getPendingServerCommand,
141145
updateCommandServer,
142146
deleteCommand,
@@ -884,8 +888,8 @@ createSndMsg db connId sndMsgData@SndMsgData {internalSndId, internalHash} = do
884888
insertSndMsgDetails_ db connId sndMsgData
885889
updateSndMsgHash db connId internalSndId internalHash
886890

887-
createSndMsgDelivery :: DB.Connection -> ConnId -> SndQueue -> InternalId -> IO ()
888-
createSndMsgDelivery db connId SndQueue {dbQueueId} msgId =
891+
createSndMsgDelivery :: DB.Connection -> SndQueue -> InternalId -> IO ()
892+
createSndMsgDelivery db SndQueue {connId, dbQueueId} msgId =
889893
DB.execute db "INSERT INTO snd_message_deliveries (conn_id, snd_queue_id, internal_id) VALUES (?, ?, ?)" (connId, dbQueueId, msgId)
890894

891895
getSndMsgViaRcpt :: DB.Connection -> ConnId -> InternalSndId -> IO (Either StoreError SndMsg)
@@ -917,6 +921,15 @@ getConnectionsForDelivery :: DB.Connection -> IO [ConnId]
917921
getConnectionsForDelivery db =
918922
map fromOnly <$> DB.query_ db "SELECT DISTINCT conn_id FROM snd_message_deliveries WHERE failed = 0"
919923

924+
getAllSndQueuesForDelivery :: DB.Connection -> IO [SndQueue]
925+
getAllSndQueuesForDelivery db = map toSndQueue <$> DB.query_ db (sndQueueQuery <> " " <> delivery)
926+
where
927+
delivery = [sql|
928+
JOIN (SELECT DISTINCT conn_id, snd_queue_id FROM snd_message_deliveries WHERE failed = 0) d
929+
ON d.conn_id = q.conn_id AND d.snd_queue_id = q.snd_queue_id
930+
WHERE c.deleted = 0
931+
|]
932+
920933
getPendingQueueMsg :: DB.Connection -> ConnId -> SndQueue -> IO (Either StoreError (Maybe (Maybe RcvQueue, PendingMsgData)))
921934
getPendingQueueMsg db connId SndQueue {dbQueueId} =
922935
getWorkItem "message" getMsgId getMsgData markMsgFailed
@@ -1319,6 +1332,21 @@ getPendingCommandServers db connIds =
13191332
smpServer (host, port, keyHash) = SMPServer <$> host <*> port <*> keyHash
13201333
conns = S.fromList connIds
13211334

1335+
getAllPendingCommandConns :: DB.Connection -> IO [(ConnId, Maybe SMPServer)]
1336+
getAllPendingCommandConns db =
1337+
map toResult
1338+
<$> DB.query_
1339+
db
1340+
[sql|
1341+
SELECT DISTINCT c.conn_id, c.host, c.port, COALESCE(c.server_key_hash, s.key_hash)
1342+
FROM commands c
1343+
JOIN connections cs ON c.conn_id = cs.conn_id
1344+
LEFT JOIN servers s ON s.host = c.host AND s.port = c.port
1345+
WHERE cs.deleted = 0
1346+
|]
1347+
where
1348+
toResult (connId, host, port, keyHash) = (connId, SMPServer <$> host <*> port <*> keyHash)
1349+
13221350
getPendingServerCommand :: DB.Connection -> ConnId -> Maybe SMPServer -> IO (Either StoreError (Maybe PendingCommand))
13231351
getPendingServerCommand db connId srv_ = getWorkItem "command" getCmdId getCommand markCommandFailed
13241352
where
@@ -2023,6 +2051,30 @@ newQueueId_ :: [Only Int64] -> DBEntityId
20232051
newQueueId_ [] = DBEntityId 1
20242052
newQueueId_ (Only maxId : _) = DBEntityId (maxId + 1)
20252053

2054+
-- * subscribe all connections
2055+
2056+
getSubscriptionServers :: DB.Connection -> IO [(UserId, SMPServer)]
2057+
getSubscriptionServers db =
2058+
map toUserServer
2059+
<$> DB.query_
2060+
db
2061+
[sql|
2062+
SELECT DISTINCT c.user_id, q.host, q.port, COALESCE(q.server_key_hash, s.key_hash)
2063+
FROM rcv_queues q
2064+
JOIN servers s ON q.host = s.host AND q.port = s.port
2065+
JOIN connections c ON q.conn_id = c.conn_id
2066+
WHERE c.deleted = 0 AND q.deleted = 0
2067+
|]
2068+
where
2069+
toUserServer :: (UserId, NonEmpty TransportHost, ServiceName, C.KeyHash) -> (UserId, SMPServer)
2070+
toUserServer (userId, host, port, keyHash) = (userId, SMPServer host port keyHash)
2071+
2072+
getUserServerRcvQueueSubs :: DB.Connection -> UserId -> SMPServer -> IO [RcvQueueSub]
2073+
getUserServerRcvQueueSubs db userId srv =
2074+
map toRcvQueueSub <$> DB.query db (rcvQueueSubQuery <> condition) (userId, host srv, port srv)
2075+
where
2076+
condition = " WHERE c.deleted = 0 AND q.deleted = 0 AND c.user_id = ? AND q.host = ? AND q.port = ?"
2077+
20262078
-- * getConn helpers
20272079

20282080
getConn :: DB.Connection -> ConnId -> IO (Either StoreError SomeConn)
@@ -2229,7 +2281,7 @@ rcvQueueQuery :: Query
22292281
rcvQueueQuery =
22302282
[sql|
22312283
SELECT c.user_id, COALESCE(q.server_key_hash, s.key_hash), q.conn_id, q.host, q.port, q.rcv_id, q.rcv_private_key, q.rcv_dh_secret,
2232-
q.e2e_priv_key, q.e2e_dh_secret, q.snd_id, q.queue_mode, q.status,
2284+
q.e2e_priv_key, q.e2e_dh_secret, q.snd_id, q.queue_mode, q.status, c.enable_ntfs,
22332285
q.rcv_queue_id, q.rcv_primary, q.replace_rcv_queue_id, q.switch_status, q.smp_client_version, q.delete_errors,
22342286
q.ntf_public_key, q.ntf_private_key, q.ntf_id, q.rcv_ntf_dh_secret,
22352287
q.link_id, q.link_key, q.link_priv_sig_key, q.link_enc_fixed_data
@@ -2240,13 +2292,13 @@ rcvQueueQuery =
22402292

22412293
toRcvQueue ::
22422294
(UserId, C.KeyHash, ConnId, NonEmpty TransportHost, ServiceName, SMP.RecipientId, SMP.RcvPrivateAuthKey, SMP.RcvDhSecret, C.PrivateKeyX25519, Maybe C.DhSecretX25519, SMP.SenderId, Maybe QueueMode)
2243-
:. (QueueStatus, DBEntityId, BoolInt, Maybe Int64, Maybe RcvSwitchStatus, Maybe VersionSMPC, Int)
2295+
:. (QueueStatus, Maybe BoolInt, DBEntityId, BoolInt, Maybe Int64, Maybe RcvSwitchStatus, Maybe VersionSMPC, Int)
22442296
:. (Maybe SMP.NtfPublicAuthKey, Maybe SMP.NtfPrivateAuthKey, Maybe SMP.NotifierId, Maybe RcvNtfDhSecret)
22452297
:. (Maybe SMP.LinkId, Maybe LinkKey, Maybe C.PrivateKeyEd25519, Maybe EncDataBytes) ->
22462298
RcvQueue
22472299
toRcvQueue
22482300
( (userId, keyHash, connId, host, port, rcvId, rcvPrivateKey, rcvDhSecret, e2ePrivKey, e2eDhSecret, sndId, queueMode)
2249-
:. (status, dbQueueId, BI primary, dbReplaceQueueId, rcvSwchStatus, smpClientVersion_, deleteErrors)
2301+
:. (status, enableNtfs_, dbQueueId, BI primary, dbReplaceQueueId, rcvSwchStatus, smpClientVersion_, deleteErrors)
22502302
:. (ntfPublicKey_, ntfPrivateKey_, notifierId_, rcvNtfDhSecret_)
22512303
:. (shortLinkId_, shortLinkKey_, linkPrivSigKey_, linkEncFixedData_)
22522304
) =
@@ -2258,8 +2310,9 @@ toRcvQueue
22582310
shortLink = case (shortLinkId_, shortLinkKey_, linkPrivSigKey_, linkEncFixedData_) of
22592311
(Just shortLinkId, Just shortLinkKey, Just linkPrivSigKey, Just linkEncFixedData) -> Just ShortLinkCreds {shortLinkId, shortLinkKey, linkPrivSigKey, linkEncFixedData}
22602312
_ -> Nothing
2313+
enableNtfs = maybe True unBI enableNtfs_
22612314
-- TODO [certs rcv] read client service
2262-
in RcvQueue {userId, connId, server, rcvId, rcvPrivateKey, rcvDhSecret, e2ePrivKey, e2eDhSecret, sndId, queueMode, shortLink, clientService = Nothing, status, dbQueueId, primary, dbReplaceQueueId, rcvSwchStatus, smpClientVersion, clientNtfCreds, deleteErrors}
2315+
in RcvQueue {userId, connId, server, rcvId, rcvPrivateKey, rcvDhSecret, e2ePrivKey, e2eDhSecret, sndId, queueMode, shortLink, clientService = Nothing, status, enableNtfs, dbQueueId, primary, dbReplaceQueueId, rcvSwchStatus, smpClientVersion, clientNtfCreds, deleteErrors}
22632316

22642317
-- | returns all connection queue credentials, the first queue is the primary one
22652318
getRcvQueueSubsByConnId_ :: DB.Connection -> ConnId -> IO (Maybe (NonEmpty RcvQueueSub))
@@ -2270,16 +2323,17 @@ getRcvQueueSubsByConnId_ db connId =
22702323
rcvQueueSubQuery :: Query
22712324
rcvQueueSubQuery =
22722325
[sql|
2273-
SELECT c.user_id, q.conn_id, q.host, q.port, COALESCE(q.server_key_hash, s.key_hash), q.rcv_id, q.rcv_private_key, q.status,
2326+
SELECT c.user_id, q.conn_id, q.host, q.port, COALESCE(q.server_key_hash, s.key_hash), q.rcv_id, q.rcv_private_key, q.status, c.enable_ntfs,
22742327
q.rcv_queue_id, q.rcv_primary, q.replace_rcv_queue_id
22752328
FROM rcv_queues q
22762329
JOIN servers s ON q.host = s.host AND q.port = s.port
22772330
JOIN connections c ON q.conn_id = c.conn_id
22782331
|]
22792332

2280-
toRcvQueueSub :: (UserId, ConnId, NonEmpty TransportHost, ServiceName, C.KeyHash, SMP.RecipientId, SMP.RcvPrivateAuthKey, QueueStatus, Int64, BoolInt, Maybe Int64) -> RcvQueueSub
2281-
toRcvQueueSub (userId, connId, host, port, keyHash, rcvId, rcvPrivateKey, status, dbQueueId, BI primary, dbReplaceQueueId) =
2282-
RcvQueueSub {userId, connId, server = SMPServer host port keyHash, rcvId, dbQueueId, primary, dbReplaceQueueId, rcvPrivateKey, status}
2333+
toRcvQueueSub :: (UserId, ConnId, NonEmpty TransportHost, ServiceName, C.KeyHash, SMP.RecipientId, SMP.RcvPrivateAuthKey, QueueStatus, Maybe BoolInt, Int64, BoolInt, Maybe Int64) -> RcvQueueSub
2334+
toRcvQueueSub (userId, connId, host, port, keyHash, rcvId, rcvPrivateKey, status, enableNtfs_, dbQueueId, BI primary, dbReplaceQueueId) =
2335+
let enableNtfs = maybe True unBI enableNtfs_
2336+
in RcvQueueSub {userId, connId, server = SMPServer host port keyHash, rcvId, rcvPrivateKey, status, enableNtfs, dbQueueId, primary, dbReplaceQueueId}
22832337

22842338
getRcvQueueById :: DB.Connection -> ConnId -> Int64 -> IO (Either StoreError RcvQueue)
22852339
getRcvQueueById db connId dbRcvId =

0 commit comments

Comments
 (0)