Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 76 additions & 28 deletions src/Simplex/Messaging/Agent.hs
Original file line number Diff line number Diff line change
Expand Up @@ -859,7 +859,7 @@ switchConnectionAsync' c corrId connId =
rq1 <- withStore' c $ \db -> setRcvSwitchStatus db rq $ Just RSSwitchStarted
enqueueCommand c corrId connId Nothing $ AClientCommand SWCH
let rqs' = updatedQs rq1 rqs
pure . connectionStats $ DuplexConnection cData rqs' sqs
connectionStats c (DuplexConnection cData rqs' sqs)
_ -> throwE $ CMD PROHIBITED "switchConnectionAsync: not duplex"

newConn :: ConnectionModeI c => AgentClient -> NetworkRequestMode -> UserId -> Bool -> SConnectionMode c -> Maybe UserLinkData -> Maybe CRClientData -> CR.InitialKeys -> SubscriptionMode -> AM (ConnId, (CreatedConnLink c, Maybe ClientServiceId))
Expand Down Expand Up @@ -1704,7 +1704,8 @@ runCommandProcessing c@AgentClient {subQ} connId server_ Worker {doWork} = do
rq1' <- withStore' c $ \db -> setRcvSwitchStatus db rq1 $ Just RSSendingQUSE
let rqs' = updatedQs rq1' rqs
conn' = DuplexConnection cData rqs' sqs
notify . SWITCH QDRcv SPSecured $ connectionStats conn'
cStats <- connectionStats c conn'
notify $ SWITCH QDRcv SPSecured cStats
_ -> internalErr "ICQSecure: no switching queue found"
_ -> internalErr "ICQSecure: queue address not found in connection"
ICQDelete rId -> do
Expand All @@ -1727,7 +1728,8 @@ runCommandProcessing c@AgentClient {subQ} connId server_ Worker {doWork} = do
ns <- asks ntfSupervisor
liftIO $ sendNtfSubCommand ns (NSCCreate, [connId])
let conn' = DuplexConnection cData (rq'' :| rqs') sqs
notify $ SWITCH QDRcv SPCompleted $ connectionStats conn'
cStats <- connectionStats c conn'
notify $ SWITCH QDRcv SPCompleted cStats
_ -> internalErr "ICQDelete: cannot delete the only queue in connection"
where
ack srv rId srvMsgId = do
Expand Down Expand Up @@ -2016,7 +2018,8 @@ runSmpQueueMsgDelivery c@AgentClient {subQ} sq@SndQueue {userId, connId, server,
AM_QADD_ -> pure ()
AM_QKEY_ -> do
SomeConn _ conn <- withStore c (`getConn` connId)
notify . SWITCH QDSnd SPConfirmed $ connectionStats conn
cStats <- connectionStats c conn
notify $ SWITCH QDSnd SPConfirmed cStats
AM_QUSE_ -> pure ()
AM_QTEST_ -> withConnLock c connId "runSmpQueueMsgDelivery AM_QTEST_" $ do
withStore' c $ \db -> setSndQueueStatus db sq Active
Expand All @@ -2041,7 +2044,8 @@ runSmpQueueMsgDelivery c@AgentClient {subQ} sq@SndQueue {userId, connId, server,
deleteConnSndQueue db connId sq'
let sqs'' = sq'' :| sqs'
conn' = DuplexConnection cData' rqs sqs''
notify . SWITCH QDSnd SPCompleted $ connectionStats conn'
cStats <- connectionStats c conn'
notify $ SWITCH QDSnd SPCompleted cStats
_ -> internalErr msgId "sent QTEST: there is only one queue in connection"
_ -> internalErr msgId "sent QTEST: queue not in connection or not replacing another queue"
_ -> internalErr msgId "QTEST sent not in duplex connection"
Expand Down Expand Up @@ -2152,7 +2156,7 @@ switchDuplexConnection c nm (DuplexConnection cData@ConnData {connId, userId} rq
void . enqueueMessages c cData sqs SMP.noMsgFlags $ QADD [(qUri, Just (server, sndId))]
rq1 <- withStore' c $ \db -> setRcvSwitchStatus db rq $ Just RSSendingQADD
let rqs' = updatedQs rq1 rqs <> [rq'']
pure . connectionStats $ DuplexConnection cData rqs' sqs
connectionStats c $ DuplexConnection cData rqs' sqs

abortConnectionSwitch' :: AgentClient -> ConnId -> AM ConnectionStats
abortConnectionSwitch' c connId =
Expand All @@ -2172,7 +2176,7 @@ abortConnectionSwitch' c connId =
forM_ delRqs $ \RcvQueue {server, rcvId} -> enqueueCommand c "" connId (Just server) $ AInternalCommand $ ICDeleteRcvQueue rcvId
let rqs'' = updatedQs rq' rqs'
conn' = DuplexConnection cData rqs'' sqs
pure $ connectionStats conn'
connectionStats c conn'
_ -> throwE $ INTERNAL "won't delete all rcv queues in connection"
| otherwise -> throwE $ CMD PROHIBITED "abortConnectionSwitch: no rcv queues left"
_ -> throwE $ CMD PROHIBITED "abortConnectionSwitch: not allowed"
Expand All @@ -2195,7 +2199,7 @@ synchronizeRatchet' c connId pqSupport' force = withConnLock c connId "synchroni
setRatchetX3dhKeys db connId pk1 pk2 pKem
let cData'' = cData' {ratchetSyncState = RSStarted} :: ConnData
conn' = DuplexConnection cData'' rqs sqs
pure $ connectionStats conn'
connectionStats c conn'
| otherwise -> throwE $ CMD PROHIBITED "synchronizeRatchet: not allowed"
_ -> throwE $ CMD PROHIBITED "synchronizeRatchet: not duplex"

Expand Down Expand Up @@ -2363,34 +2367,71 @@ deleteConnections_ getConnections ntf waitDelivery c nm connIds = do
getConnectionServers' :: AgentClient -> ConnId -> AM ConnectionStats
getConnectionServers' c connId = do
SomeConn _ conn <- withStore c (`getConn` connId)
pure $ connectionStats conn
connectionStats c conn

getConnectionRatchetAdHash' :: AgentClient -> ConnId -> AM ByteString
getConnectionRatchetAdHash' c connId = do
CR.Ratchet {rcAD = Str rcAD} <- withStore c (`getRatchet` connId)
pure $ C.sha256Hash rcAD

connectionStats :: Connection c -> ConnectionStats
connectionStats = \case
RcvConnection cData rq ->
(stats cData) {rcvQueuesInfo = [rcvQueueInfo rq]}
SndConnection cData sq ->
(stats cData) {sndQueuesInfo = [sndQueueInfo sq]}
DuplexConnection cData rqs sqs ->
(stats cData) {rcvQueuesInfo = map rcvQueueInfo $ L.toList rqs, sndQueuesInfo = map sndQueueInfo $ L.toList sqs}
ContactConnection cData rq ->
(stats cData) {rcvQueuesInfo = [rcvQueueInfo rq]}
connectionStats :: AgentClient -> Connection c -> AM ConnectionStats
connectionStats c = \case
RcvConnection cData rq -> do
rcvQueuesInfo <- (: []) <$> rcvQueueInfo rq
pure (stats cData) {rcvQueuesInfo, subStatus = connSubStatus rcvQueuesInfo}
SndConnection cData sq -> do
pure (stats cData) {sndQueuesInfo = [sndQueueInfo sq]}
DuplexConnection cData rqs sqs -> do
rcvQueuesInfo <- mapM rcvQueueInfo (L.toList rqs)
pure
(stats cData)
{ rcvQueuesInfo,
sndQueuesInfo = map sndQueueInfo $ L.toList sqs,
subStatus = connSubStatus rcvQueuesInfo
}
ContactConnection cData rq -> do
rcvQueuesInfo <- (: []) <$> rcvQueueInfo rq
pure (stats cData) {rcvQueuesInfo, subStatus = connSubStatus rcvQueuesInfo}
NewConnection cData ->
stats cData
pure $ stats cData
where
stats :: ConnData -> ConnectionStats
stats ConnData {connAgentVersion, ratchetSyncState} =
ConnectionStats
{ connAgentVersion,
rcvQueuesInfo = [],
sndQueuesInfo = [],
ratchetSyncState,
ratchetSyncSupported = connAgentVersion >= ratchetSyncSMPAgentVersion
ratchetSyncSupported = connAgentVersion >= ratchetSyncSMPAgentVersion,
subStatus = Nothing
}
rcvQueueInfo :: RcvQueue -> AM RcvQueueInfo
rcvQueueInfo rq@RcvQueue {server, status, rcvSwchStatus} = do
subStatus <- checkQueueSubStatus
pure $ RcvQueueInfo {rcvServer = server, status, rcvSwitchStatus = rcvSwchStatus, canAbortSwitch = canAbortRcvSwitch rq, subStatus}
where
checkQueueSubStatus :: AM SubscriptionStatus
checkQueueSubStatus =
atomically $
hasActiveSubscription c rq >>= \case
True -> pure SSActive
False ->
hasPendingSubscription c rq >>= \case
True -> pure SSPending
False ->
hasRemovedSubscription c rq >>= \case
Just err -> pure $ SSRemoved (show err)
Nothing -> pure SSNoSub
sndQueueInfo :: SndQueue -> SndQueueInfo
sndQueueInfo SndQueue {server, status, sndSwchStatus} =
SndQueueInfo {sndServer = server, status, sndSwitchStatus = sndSwchStatus}
connSubStatus :: [RcvQueueInfo] -> Maybe SubscriptionStatus
connSubStatus rqis = do
let activeRqis = filter (\RcvQueueInfo {status} -> status == Active) rqis
rqisForStatus = if null activeRqis then rqis else activeRqis
if null rqisForStatus
then Nothing
else Just $ minimum $ map (\RcvQueueInfo {subStatus} -> subStatus) rqisForStatus

-- | Change servers to be used for creating new queues.
-- This function will set all servers as enabled in case all passed servers are disabled.
Expand Down Expand Up @@ -2903,7 +2944,8 @@ processSMPTransmissions c@AgentClient {subQ} (tSess@(userId, srv, _), _v, sessId
| rss `notElem` ([RSOk, RSStarted] :: [RatchetSyncState]) = do
let cData'' = (toConnData conn') {ratchetSyncState = RSOk} :: ConnData
conn'' = updateConnection cData'' conn'
notify . RSYNC RSOk Nothing $ connectionStats conn''
cStats <- connectionStats c conn''
notify $ RSYNC RSOk Nothing cStats
withStore' c $ \db -> setConnRatchetSync db connId RSOk
pure conn''
| otherwise = pure conn'
Expand Down Expand Up @@ -2933,7 +2975,8 @@ processSMPTransmissions c@AgentClient {subQ} (tSess@(userId, srv, _), _v, sessId
when (rss `elem` ([RSOk, RSAllowed, RSRequired] :: [RatchetSyncState])) $ do
let cData'' = (toConnData conn') {ratchetSyncState = rss'} :: ConnData
conn'' = updateConnection cData'' connDuplex
notify . RSYNC rss' (Just e) $ connectionStats conn''
cStats <- connectionStats c conn''
notify $ RSYNC rss' (Just e) cStats
withStore' c $ \db -> setConnRatchetSync db connId rss'
Left e -> do
atomically $ incSMPServerStat c userId srv recvErrs
Expand Down Expand Up @@ -3188,7 +3231,8 @@ processSMPTransmissions c@AgentClient {subQ} (tSess@(userId, srv, _), _v, sessId
sq1 <- withStore' c $ \db -> setSndSwitchStatus db sq $ Just SSSendingQKEY
let sqs'' = updatedQs sq1 sqs' <> [sq2]
conn' = DuplexConnection cData' rqs sqs''
notify . SWITCH QDSnd SPStarted $ connectionStats conn'
cStats <- connectionStats c conn'
notify $ SWITCH QDSnd SPStarted cStats
_ -> qError "QADD: won't delete all snd queues in connection"
_ -> qError "QADD: replaced queue address is not found in connection"
_ -> throwE $ AGENT A_VERSION
Expand All @@ -3207,7 +3251,8 @@ processSMPTransmissions c@AgentClient {subQ} (tSess@(userId, srv, _), _v, sessId
let dhSecret = C.dh' dhPublicKey dhPrivKey
withStore' c $ \db -> setRcvQueueConfirmedE2E db rq' dhSecret $ min cVer cVer'
enqueueCommand c "" connId (Just smpServer) $ AInternalCommand $ ICQSecure rcvId senderKey
notify . SWITCH QDRcv SPConfirmed $ connectionStats conn'
cStats <- connectionStats c conn'
notify $ SWITCH QDRcv SPConfirmed cStats
| otherwise -> qError "QKEY: queue already secured"
_ -> qError "QKEY: queue address not found in connection"
where
Expand All @@ -3232,7 +3277,8 @@ processSMPTransmissions c@AgentClient {subQ} (tSess@(userId, srv, _), _v, sessId
sq1' <- withStore' c $ \db -> setSndSwitchStatus db sq1 $ Just SSSendingQTEST
let sqs' = updatedQs sq1' sqs
conn' = DuplexConnection cData' rqs sqs'
notify . SWITCH QDSnd SPSecured $ connectionStats conn'
cStats <- connectionStats c conn'
notify $ SWITCH QDSnd SPSecured cStats
_ -> qError "QUSE: switching SndQueue not found in connection"
_ -> qError "QUSE: switched queue address not found in connection"

Expand Down Expand Up @@ -3308,12 +3354,14 @@ processSMPTransmissions c@AgentClient {subQ} (tSess@(userId, srv, _), _v, sessId
notifyRatchetSyncError = do
let cData'' = cData' {ratchetSyncState = RSRequired} :: ConnData
conn'' = updateConnection cData'' conn'
notify $ RSYNC RSRequired (Just RATCHET_SYNC) (connectionStats conn'')
cStats <- connectionStats c conn''
notify $ RSYNC RSRequired (Just RATCHET_SYNC) cStats
notifyAgreed :: AM ()
notifyAgreed = do
let cData'' = cData' {ratchetSyncState = RSAgreed} :: ConnData
conn'' = updateConnection cData'' conn'
notify . RSYNC RSAgreed Nothing $ connectionStats conn''
cStats <- connectionStats c conn''
notify $ RSYNC RSAgreed Nothing cStats
recreateRatchet :: CR.Ratchet 'C.X448 -> AM ()
recreateRatchet rc = withStore' c $ \db -> do
setConnRatchetSync db connId RSAgreed
Expand Down
12 changes: 12 additions & 0 deletions src/Simplex/Messaging/Agent/Client.hs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ module Simplex.Messaging.Agent.Client
removeSubscription,
removeSubscriptions,
hasActiveSubscription,
hasPendingSubscription,
hasRemovedSubscription,
hasGetLock,
releaseGetLock,
activeClientSession,
Expand Down Expand Up @@ -1688,6 +1690,16 @@ hasActiveSubscription c rq = do
SS.hasActiveSub tSess (queueId rq) $ currentSubs c
{-# INLINE hasActiveSubscription #-}

hasPendingSubscription :: SomeRcvQueue q => AgentClient -> q -> STM Bool
hasPendingSubscription c rq = do
tSess <- mkSMPTransportSession c rq
SS.hasPendingSub tSess (queueId rq) $ currentSubs c
{-# INLINE hasPendingSubscription #-}

hasRemovedSubscription :: SomeRcvQueue q => AgentClient -> q -> STM (Maybe SMPClientError)
hasRemovedSubscription c rq = do
TM.lookup (qUserId rq, qServer rq) (removedSubs c) $>>= TM.lookup (queueId rq)

removeSubscription :: SomeRcvQueue q => AgentClient -> SMPTransportSession -> ConnId -> q -> STM ()
removeSubscription c tSess connId rq = do
modifyTVar' (subscrConns c) $ S.delete connId
Expand Down
20 changes: 18 additions & 2 deletions src/Simplex/Messaging/Agent/Protocol.hs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ module Simplex.Messaging.Agent.Protocol
MsgMeta (..),
RcvQueueInfo (..),
SndQueueInfo (..),
SubscriptionStatus (..),
ConnectionStats (..),
SwitchPhase (..),
RcvSwitchStatus (..),
Expand Down Expand Up @@ -643,23 +644,34 @@ instance FromJSON RatchetSyncState where

data RcvQueueInfo = RcvQueueInfo
{ rcvServer :: SMPServer,
status :: QueueStatus,
rcvSwitchStatus :: Maybe RcvSwitchStatus,
canAbortSwitch :: Bool
canAbortSwitch :: Bool,
subStatus :: SubscriptionStatus
}
deriving (Eq, Show)

data SndQueueInfo = SndQueueInfo
{ sndServer :: SMPServer,
status :: QueueStatus,
sndSwitchStatus :: Maybe SndSwitchStatus
}
deriving (Eq, Show)

data SubscriptionStatus
= SSActive
| SSPending
| SSRemoved {subError :: String}
| SSNoSub
deriving (Eq, Ord, Show)

data ConnectionStats = ConnectionStats
{ connAgentVersion :: VersionSMPA,
rcvQueuesInfo :: [RcvQueueInfo],
sndQueuesInfo :: [SndQueueInfo],
ratchetSyncState :: RatchetSyncState,
ratchetSyncSupported :: Bool
ratchetSyncSupported :: Bool,
subStatus :: Maybe SubscriptionStatus
}
deriving (Eq, Show)

Expand Down Expand Up @@ -2000,6 +2012,10 @@ serializeCommand = \case
serializeBinary :: ByteString -> ByteString
serializeBinary body = bshow (B.length body) <> "\n" <> body

$(J.deriveJSON (enumJSON fstToLower) ''QueueStatus)

$(J.deriveJSON (sumTypeJSON $ dropPrefix "SS") ''SubscriptionStatus)

$(J.deriveJSON defaultJSON ''RcvQueueInfo)

$(J.deriveJSON defaultJSON ''SndQueueInfo)
Expand Down
8 changes: 0 additions & 8 deletions src/Simplex/Messaging/Agent/Store.hs
Original file line number Diff line number Diff line change
Expand Up @@ -135,10 +135,6 @@ clientServiceId :: RcvQueue -> Maybe ClientServiceId
clientServiceId = fmap dbServiceId . clientService
{-# INLINE clientServiceId #-}

rcvQueueInfo :: RcvQueue -> RcvQueueInfo
rcvQueueInfo rq@RcvQueue {server, rcvSwchStatus} =
RcvQueueInfo {rcvServer = server, rcvSwitchStatus = rcvSwchStatus, canAbortSwitch = canAbortRcvSwitch rq}

rcvSMPQueueAddress :: RcvQueue -> SMPQueueAddress
rcvSMPQueueAddress RcvQueue {server, sndId, e2ePrivKey, queueMode} =
SMPQueueAddress server sndId (C.publicKey e2ePrivKey) queueMode
Expand Down Expand Up @@ -211,10 +207,6 @@ data StoredSndQueue (q :: DBStored) = SndQueue
}
deriving (Show)

sndQueueInfo :: SndQueue -> SndQueueInfo
sndQueueInfo SndQueue {server, sndSwchStatus} =
SndQueueInfo {sndServer = server, sndSwitchStatus = sndSwchStatus}

instance SMPQueue RcvQueue where
qServer RcvQueue {server} = server
{-# INLINE qServer #-}
Expand Down
Loading