Skip to content

Commit 234aeb8

Browse files
agent: add subscription status to connection stats (#1658)
* agent: add subscription status to connection stats * wip * conn status * format * refactor * refactor * m * shorter * shorter --------- Co-authored-by: Evgeny <[email protected]>
1 parent 1dbc15b commit 234aeb8

File tree

5 files changed

+99
-39
lines changed

5 files changed

+99
-39
lines changed

src/Simplex/Messaging/Agent.hs

Lines changed: 68 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,7 @@ module Simplex.Messaging.Agent
137137
)
138138
where
139139

140+
import Control.Applicative ((<|>))
140141
import Control.Concurrent.STM (retry)
141142
import Control.Logger.Simple
142143
import Control.Monad
@@ -859,7 +860,7 @@ switchConnectionAsync' c corrId connId =
859860
rq1 <- withStore' c $ \db -> setRcvSwitchStatus db rq $ Just RSSwitchStarted
860861
enqueueCommand c corrId connId Nothing $ AClientCommand SWCH
861862
let rqs' = updatedQs rq1 rqs
862-
pure . connectionStats $ DuplexConnection cData rqs' sqs
863+
connectionStats c $ DuplexConnection cData rqs' sqs
863864
_ -> throwE $ CMD PROHIBITED "switchConnectionAsync: not duplex"
864865

865866
newConn :: ConnectionModeI c => AgentClient -> NetworkRequestMode -> UserId -> Bool -> SConnectionMode c -> Maybe UserLinkData -> Maybe CRClientData -> CR.InitialKeys -> SubscriptionMode -> AM (ConnId, (CreatedConnLink c, Maybe ClientServiceId))
@@ -1704,7 +1705,8 @@ runCommandProcessing c@AgentClient {subQ} connId server_ Worker {doWork} = do
17041705
rq1' <- withStore' c $ \db -> setRcvSwitchStatus db rq1 $ Just RSSendingQUSE
17051706
let rqs' = updatedQs rq1' rqs
17061707
conn' = DuplexConnection cData rqs' sqs
1707-
notify . SWITCH QDRcv SPSecured $ connectionStats conn'
1708+
cStats <- connectionStats c conn'
1709+
notify $ SWITCH QDRcv SPSecured cStats
17081710
_ -> internalErr "ICQSecure: no switching queue found"
17091711
_ -> internalErr "ICQSecure: queue address not found in connection"
17101712
ICQDelete rId -> do
@@ -1727,7 +1729,8 @@ runCommandProcessing c@AgentClient {subQ} connId server_ Worker {doWork} = do
17271729
ns <- asks ntfSupervisor
17281730
liftIO $ sendNtfSubCommand ns (NSCCreate, [connId])
17291731
let conn' = DuplexConnection cData (rq'' :| rqs') sqs
1730-
notify $ SWITCH QDRcv SPCompleted $ connectionStats conn'
1732+
cStats <- connectionStats c conn'
1733+
notify $ SWITCH QDRcv SPCompleted cStats
17311734
_ -> internalErr "ICQDelete: cannot delete the only queue in connection"
17321735
where
17331736
ack srv rId srvMsgId = do
@@ -2016,7 +2019,8 @@ runSmpQueueMsgDelivery c@AgentClient {subQ} sq@SndQueue {userId, connId, server,
20162019
AM_QADD_ -> pure ()
20172020
AM_QKEY_ -> do
20182021
SomeConn _ conn <- withStore c (`getConn` connId)
2019-
notify . SWITCH QDSnd SPConfirmed $ connectionStats conn
2022+
cStats <- connectionStats c conn
2023+
notify $ SWITCH QDSnd SPConfirmed cStats
20202024
AM_QUSE_ -> pure ()
20212025
AM_QTEST_ -> withConnLock c connId "runSmpQueueMsgDelivery AM_QTEST_" $ do
20222026
withStore' c $ \db -> setSndQueueStatus db sq Active
@@ -2041,7 +2045,8 @@ runSmpQueueMsgDelivery c@AgentClient {subQ} sq@SndQueue {userId, connId, server,
20412045
deleteConnSndQueue db connId sq'
20422046
let sqs'' = sq'' :| sqs'
20432047
conn' = DuplexConnection cData' rqs sqs''
2044-
notify . SWITCH QDSnd SPCompleted $ connectionStats conn'
2048+
cStats <- connectionStats c conn'
2049+
notify $ SWITCH QDSnd SPCompleted cStats
20452050
_ -> internalErr msgId "sent QTEST: there is only one queue in connection"
20462051
_ -> internalErr msgId "sent QTEST: queue not in connection or not replacing another queue"
20472052
_ -> internalErr msgId "QTEST sent not in duplex connection"
@@ -2152,7 +2157,7 @@ switchDuplexConnection c nm (DuplexConnection cData@ConnData {connId, userId} rq
21522157
void . enqueueMessages c cData sqs SMP.noMsgFlags $ QADD [(qUri, Just (server, sndId))]
21532158
rq1 <- withStore' c $ \db -> setRcvSwitchStatus db rq $ Just RSSendingQADD
21542159
let rqs' = updatedQs rq1 rqs <> [rq'']
2155-
pure . connectionStats $ DuplexConnection cData rqs' sqs
2160+
connectionStats c $ DuplexConnection cData rqs' sqs
21562161

21572162
abortConnectionSwitch' :: AgentClient -> ConnId -> AM ConnectionStats
21582163
abortConnectionSwitch' c connId =
@@ -2172,7 +2177,7 @@ abortConnectionSwitch' c connId =
21722177
forM_ delRqs $ \RcvQueue {server, rcvId} -> enqueueCommand c "" connId (Just server) $ AInternalCommand $ ICDeleteRcvQueue rcvId
21732178
let rqs'' = updatedQs rq' rqs'
21742179
conn' = DuplexConnection cData rqs'' sqs
2175-
pure $ connectionStats conn'
2180+
connectionStats c conn'
21762181
_ -> throwE $ INTERNAL "won't delete all rcv queues in connection"
21772182
| otherwise -> throwE $ CMD PROHIBITED "abortConnectionSwitch: no rcv queues left"
21782183
_ -> throwE $ CMD PROHIBITED "abortConnectionSwitch: not allowed"
@@ -2195,7 +2200,7 @@ synchronizeRatchet' c connId pqSupport' force = withConnLock c connId "synchroni
21952200
setRatchetX3dhKeys db connId pk1 pk2 pKem
21962201
let cData'' = cData' {ratchetSyncState = RSStarted} :: ConnData
21972202
conn' = DuplexConnection cData'' rqs sqs
2198-
pure $ connectionStats conn'
2203+
connectionStats c conn'
21992204
| otherwise -> throwE $ CMD PROHIBITED "synchronizeRatchet: not allowed"
22002205
_ -> throwE $ CMD PROHIBITED "synchronizeRatchet: not duplex"
22012206

@@ -2363,34 +2368,62 @@ deleteConnections_ getConnections ntf waitDelivery c nm connIds = do
23632368
getConnectionServers' :: AgentClient -> ConnId -> AM ConnectionStats
23642369
getConnectionServers' c connId = do
23652370
SomeConn _ conn <- withStore c (`getConn` connId)
2366-
pure $ connectionStats conn
2371+
connectionStats c conn
23672372

23682373
getConnectionRatchetAdHash' :: AgentClient -> ConnId -> AM ByteString
23692374
getConnectionRatchetAdHash' c connId = do
23702375
CR.Ratchet {rcAD = Str rcAD} <- withStore c (`getRatchet` connId)
23712376
pure $ C.sha256Hash rcAD
23722377

2373-
connectionStats :: Connection c -> ConnectionStats
2374-
connectionStats = \case
2375-
RcvConnection cData rq ->
2376-
(stats cData) {rcvQueuesInfo = [rcvQueueInfo rq]}
2377-
SndConnection cData sq ->
2378-
(stats cData) {sndQueuesInfo = [sndQueueInfo sq]}
2379-
DuplexConnection cData rqs sqs ->
2380-
(stats cData) {rcvQueuesInfo = map rcvQueueInfo $ L.toList rqs, sndQueuesInfo = map sndQueueInfo $ L.toList sqs}
2381-
ContactConnection cData rq ->
2382-
(stats cData) {rcvQueuesInfo = [rcvQueueInfo rq]}
2378+
connectionStats :: AgentClient -> Connection c -> AM ConnectionStats
2379+
connectionStats c = \case
2380+
RcvConnection cData rq -> do
2381+
rcvQueuesInfo <- (: []) <$> rcvQueueInfo rq
2382+
pure (stats cData) {rcvQueuesInfo, subStatus = connSubStatus rcvQueuesInfo}
2383+
SndConnection cData sq -> do
2384+
pure (stats cData) {sndQueuesInfo = [sndQueueInfo sq]}
2385+
DuplexConnection cData rqs sqs -> do
2386+
rcvQueuesInfo <- mapM rcvQueueInfo (L.toList rqs)
2387+
pure
2388+
(stats cData)
2389+
{ rcvQueuesInfo,
2390+
sndQueuesInfo = map sndQueueInfo $ L.toList sqs,
2391+
subStatus = connSubStatus rcvQueuesInfo
2392+
}
2393+
ContactConnection cData rq -> do
2394+
rcvQueuesInfo <- (: []) <$> rcvQueueInfo rq
2395+
pure (stats cData) {rcvQueuesInfo, subStatus = connSubStatus rcvQueuesInfo}
23832396
NewConnection cData ->
2384-
stats cData
2397+
pure $ stats cData
23852398
where
2399+
stats :: ConnData -> ConnectionStats
23862400
stats ConnData {connAgentVersion, ratchetSyncState} =
23872401
ConnectionStats
23882402
{ connAgentVersion,
23892403
rcvQueuesInfo = [],
23902404
sndQueuesInfo = [],
23912405
ratchetSyncState,
2392-
ratchetSyncSupported = connAgentVersion >= ratchetSyncSMPAgentVersion
2406+
ratchetSyncSupported = connAgentVersion >= ratchetSyncSMPAgentVersion,
2407+
subStatus = Nothing
23932408
}
2409+
rcvQueueInfo :: RcvQueue -> AM RcvQueueInfo
2410+
rcvQueueInfo rq@RcvQueue {server, status, rcvSwchStatus} = do
2411+
subStatus <- atomically checkQueueSubStatus
2412+
pure $ RcvQueueInfo {rcvServer = server, status, rcvSwitchStatus = rcvSwchStatus, canAbortSwitch = canAbortRcvSwitch rq, subStatus}
2413+
where
2414+
checkQueueSubStatus :: STM SubscriptionStatus
2415+
checkQueueSubStatus =
2416+
ifM (hasActiveSubscription c rq) (pure SSActive) $
2417+
ifM (hasPendingSubscription c rq) (pure SSPending) $
2418+
maybe SSNoSub (SSRemoved . show) <$> hasRemovedSubscription c rq
2419+
sndQueueInfo :: SndQueue -> SndQueueInfo
2420+
sndQueueInfo SndQueue {server, status, sndSwchStatus} =
2421+
SndQueueInfo {sndServer = server, status, sndSwitchStatus = sndSwchStatus}
2422+
connSubStatus :: [RcvQueueInfo] -> Maybe SubscriptionStatus
2423+
connSubStatus rqs =
2424+
let isActive RcvQueueInfo {status} = status == Active
2425+
subStatus' RcvQueueInfo {subStatus} = subStatus
2426+
in minimum . L.map subStatus' <$> (L.nonEmpty (filter isActive rqs) <|> L.nonEmpty rqs)
23942427

23952428
-- | Change servers to be used for creating new queues.
23962429
-- This function will set all servers as enabled in case all passed servers are disabled.
@@ -2903,7 +2936,8 @@ processSMPTransmissions c@AgentClient {subQ} (tSess@(userId, srv, _), _v, sessId
29032936
| rss `notElem` ([RSOk, RSStarted] :: [RatchetSyncState]) = do
29042937
let cData'' = (toConnData conn') {ratchetSyncState = RSOk} :: ConnData
29052938
conn'' = updateConnection cData'' conn'
2906-
notify . RSYNC RSOk Nothing $ connectionStats conn''
2939+
cStats <- connectionStats c conn''
2940+
notify $ RSYNC RSOk Nothing cStats
29072941
withStore' c $ \db -> setConnRatchetSync db connId RSOk
29082942
pure conn''
29092943
| otherwise = pure conn'
@@ -2933,7 +2967,8 @@ processSMPTransmissions c@AgentClient {subQ} (tSess@(userId, srv, _), _v, sessId
29332967
when (rss `elem` ([RSOk, RSAllowed, RSRequired] :: [RatchetSyncState])) $ do
29342968
let cData'' = (toConnData conn') {ratchetSyncState = rss'} :: ConnData
29352969
conn'' = updateConnection cData'' connDuplex
2936-
notify . RSYNC rss' (Just e) $ connectionStats conn''
2970+
cStats <- connectionStats c conn''
2971+
notify $ RSYNC rss' (Just e) cStats
29372972
withStore' c $ \db -> setConnRatchetSync db connId rss'
29382973
Left e -> do
29392974
atomically $ incSMPServerStat c userId srv recvErrs
@@ -3188,7 +3223,8 @@ processSMPTransmissions c@AgentClient {subQ} (tSess@(userId, srv, _), _v, sessId
31883223
sq1 <- withStore' c $ \db -> setSndSwitchStatus db sq $ Just SSSendingQKEY
31893224
let sqs'' = updatedQs sq1 sqs' <> [sq2]
31903225
conn' = DuplexConnection cData' rqs sqs''
3191-
notify . SWITCH QDSnd SPStarted $ connectionStats conn'
3226+
cStats <- connectionStats c conn'
3227+
notify $ SWITCH QDSnd SPStarted cStats
31923228
_ -> qError "QADD: won't delete all snd queues in connection"
31933229
_ -> qError "QADD: replaced queue address is not found in connection"
31943230
_ -> throwE $ AGENT A_VERSION
@@ -3207,7 +3243,8 @@ processSMPTransmissions c@AgentClient {subQ} (tSess@(userId, srv, _), _v, sessId
32073243
let dhSecret = C.dh' dhPublicKey dhPrivKey
32083244
withStore' c $ \db -> setRcvQueueConfirmedE2E db rq' dhSecret $ min cVer cVer'
32093245
enqueueCommand c "" connId (Just smpServer) $ AInternalCommand $ ICQSecure rcvId senderKey
3210-
notify . SWITCH QDRcv SPConfirmed $ connectionStats conn'
3246+
cStats <- connectionStats c conn'
3247+
notify $ SWITCH QDRcv SPConfirmed cStats
32113248
| otherwise -> qError "QKEY: queue already secured"
32123249
_ -> qError "QKEY: queue address not found in connection"
32133250
where
@@ -3232,7 +3269,8 @@ processSMPTransmissions c@AgentClient {subQ} (tSess@(userId, srv, _), _v, sessId
32323269
sq1' <- withStore' c $ \db -> setSndSwitchStatus db sq1 $ Just SSSendingQTEST
32333270
let sqs' = updatedQs sq1' sqs
32343271
conn' = DuplexConnection cData' rqs sqs'
3235-
notify . SWITCH QDSnd SPSecured $ connectionStats conn'
3272+
cStats <- connectionStats c conn'
3273+
notify $ SWITCH QDSnd SPSecured cStats
32363274
_ -> qError "QUSE: switching SndQueue not found in connection"
32373275
_ -> qError "QUSE: switched queue address not found in connection"
32383276

@@ -3308,12 +3346,14 @@ processSMPTransmissions c@AgentClient {subQ} (tSess@(userId, srv, _), _v, sessId
33083346
notifyRatchetSyncError = do
33093347
let cData'' = cData' {ratchetSyncState = RSRequired} :: ConnData
33103348
conn'' = updateConnection cData'' conn'
3311-
notify $ RSYNC RSRequired (Just RATCHET_SYNC) (connectionStats conn'')
3349+
cStats <- connectionStats c conn''
3350+
notify $ RSYNC RSRequired (Just RATCHET_SYNC) cStats
33123351
notifyAgreed :: AM ()
33133352
notifyAgreed = do
33143353
let cData'' = cData' {ratchetSyncState = RSAgreed} :: ConnData
33153354
conn'' = updateConnection cData'' conn'
3316-
notify . RSYNC RSAgreed Nothing $ connectionStats conn''
3355+
cStats <- connectionStats c conn''
3356+
notify $ RSYNC RSAgreed Nothing cStats
33173357
recreateRatchet :: CR.Ratchet 'C.X448 -> AM ()
33183358
recreateRatchet rc = withStore' c $ \db -> do
33193359
setConnRatchetSync db connId RSAgreed

src/Simplex/Messaging/Agent/Client.hs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,8 @@ module Simplex.Messaging.Agent.Client
101101
removeSubscription,
102102
removeSubscriptions,
103103
hasActiveSubscription,
104+
hasPendingSubscription,
105+
hasRemovedSubscription,
104106
hasGetLock,
105107
releaseGetLock,
106108
activeClientSession,
@@ -1688,6 +1690,16 @@ hasActiveSubscription c rq = do
16881690
SS.hasActiveSub tSess (queueId rq) $ currentSubs c
16891691
{-# INLINE hasActiveSubscription #-}
16901692

1693+
hasPendingSubscription :: SomeRcvQueue q => AgentClient -> q -> STM Bool
1694+
hasPendingSubscription c rq = do
1695+
tSess <- mkSMPTransportSession c rq
1696+
SS.hasPendingSub tSess (queueId rq) $ currentSubs c
1697+
{-# INLINE hasPendingSubscription #-}
1698+
1699+
hasRemovedSubscription :: SomeRcvQueue q => AgentClient -> q -> STM (Maybe SMPClientError)
1700+
hasRemovedSubscription c rq = do
1701+
TM.lookup (qUserId rq, qServer rq) (removedSubs c) $>>= TM.lookup (queueId rq)
1702+
16911703
removeSubscription :: SomeRcvQueue q => AgentClient -> SMPTransportSession -> ConnId -> q -> STM ()
16921704
removeSubscription c tSess connId rq = do
16931705
modifyTVar' (subscrConns c) $ S.delete connId

src/Simplex/Messaging/Agent/Protocol.hs

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ module Simplex.Messaging.Agent.Protocol
7070
MsgMeta (..),
7171
RcvQueueInfo (..),
7272
SndQueueInfo (..),
73+
SubscriptionStatus (..),
7374
ConnectionStats (..),
7475
SwitchPhase (..),
7576
RcvSwitchStatus (..),
@@ -643,23 +644,34 @@ instance FromJSON RatchetSyncState where
643644

644645
data RcvQueueInfo = RcvQueueInfo
645646
{ rcvServer :: SMPServer,
647+
status :: QueueStatus,
646648
rcvSwitchStatus :: Maybe RcvSwitchStatus,
647-
canAbortSwitch :: Bool
649+
canAbortSwitch :: Bool,
650+
subStatus :: SubscriptionStatus
648651
}
649652
deriving (Eq, Show)
650653

651654
data SndQueueInfo = SndQueueInfo
652655
{ sndServer :: SMPServer,
656+
status :: QueueStatus,
653657
sndSwitchStatus :: Maybe SndSwitchStatus
654658
}
655659
deriving (Eq, Show)
656660

661+
data SubscriptionStatus
662+
= SSActive
663+
| SSPending
664+
| SSRemoved {subError :: String}
665+
| SSNoSub
666+
deriving (Eq, Ord, Show)
667+
657668
data ConnectionStats = ConnectionStats
658669
{ connAgentVersion :: VersionSMPA,
659670
rcvQueuesInfo :: [RcvQueueInfo],
660671
sndQueuesInfo :: [SndQueueInfo],
661672
ratchetSyncState :: RatchetSyncState,
662-
ratchetSyncSupported :: Bool
673+
ratchetSyncSupported :: Bool,
674+
subStatus :: Maybe SubscriptionStatus
663675
}
664676
deriving (Eq, Show)
665677

@@ -2000,6 +2012,10 @@ serializeCommand = \case
20002012
serializeBinary :: ByteString -> ByteString
20012013
serializeBinary body = bshow (B.length body) <> "\n" <> body
20022014

2015+
$(J.deriveJSON (enumJSON fstToLower) ''QueueStatus)
2016+
2017+
$(J.deriveJSON (sumTypeJSON $ dropPrefix "SS") ''SubscriptionStatus)
2018+
20032019
$(J.deriveJSON defaultJSON ''RcvQueueInfo)
20042020

20052021
$(J.deriveJSON defaultJSON ''SndQueueInfo)

src/Simplex/Messaging/Agent/Store.hs

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -135,10 +135,6 @@ clientServiceId :: RcvQueue -> Maybe ClientServiceId
135135
clientServiceId = fmap dbServiceId . clientService
136136
{-# INLINE clientServiceId #-}
137137

138-
rcvQueueInfo :: RcvQueue -> RcvQueueInfo
139-
rcvQueueInfo rq@RcvQueue {server, rcvSwchStatus} =
140-
RcvQueueInfo {rcvServer = server, rcvSwitchStatus = rcvSwchStatus, canAbortSwitch = canAbortRcvSwitch rq}
141-
142138
rcvSMPQueueAddress :: RcvQueue -> SMPQueueAddress
143139
rcvSMPQueueAddress RcvQueue {server, sndId, e2ePrivKey, queueMode} =
144140
SMPQueueAddress server sndId (C.publicKey e2ePrivKey) queueMode
@@ -211,10 +207,6 @@ data StoredSndQueue (q :: DBStored) = SndQueue
211207
}
212208
deriving (Show)
213209

214-
sndQueueInfo :: SndQueue -> SndQueueInfo
215-
sndQueueInfo SndQueue {server, sndSwchStatus} =
216-
SndQueueInfo {sndServer = server, sndSwitchStatus = sndSwchStatus}
217-
218210
instance SMPQueue RcvQueue where
219211
qServer RcvQueue {server} = server
220212
{-# INLINE qServer #-}

src/Simplex/Messaging/Agent/Store/SQLite/Migrations/App.hs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ schemaMigrations =
8989
("m20250203_msg_bodies", m20250203_msg_bodies, Just down_m20250203_msg_bodies),
9090
("m20250322_short_links", m20250322_short_links, Just down_m20250322_short_links),
9191
("m20250702_conn_invitations_remove_cascade_delete", m20250702_conn_invitations_remove_cascade_delete, Just down_m20250702_conn_invitations_remove_cascade_delete),
92-
("20251009_queue_to_subscribe", m20251009_queue_to_subscribe, Just down_m20251009_queue_to_subscribe)
92+
("m20251009_queue_to_subscribe", m20251009_queue_to_subscribe, Just down_m20251009_queue_to_subscribe)
9393
]
9494

9595
-- | The list of migrations in ascending order by date

0 commit comments

Comments
 (0)