Skip to content

Commit 1329fc7

Browse files
authored
smp: support client notices (#1659)
* agent: support client notices * improve * fix, test * rename * cleanup * send and process notices in more cases * dont delete * dont remove notice on other permanent errors * dont remove notice if there is no notice ID in queue * add server to error * allow deleting * only use notice if key hash matches
1 parent 234aeb8 commit 1329fc7

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

54 files changed

+653
-221
lines changed

simplexmq.cabal

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ library
103103
Simplex.Messaging.Agent.Store.AgentStore
104104
Simplex.Messaging.Agent.Store.Common
105105
Simplex.Messaging.Agent.Store.DB
106+
Simplex.Messaging.Agent.Store.Entity
106107
Simplex.Messaging.Agent.Store.Interface
107108
Simplex.Messaging.Agent.Store.Migrations
108109
Simplex.Messaging.Agent.Store.Migrations.App
@@ -130,12 +131,13 @@ library
130131
Simplex.Messaging.Notifications.Types
131132
Simplex.Messaging.Parsers
132133
Simplex.Messaging.Protocol
134+
Simplex.Messaging.Protocol.Types
133135
Simplex.Messaging.Server.Expiration
134136
Simplex.Messaging.Server.QueueStore.Postgres.Config
135137
Simplex.Messaging.Server.QueueStore.QueueInfo
136138
Simplex.Messaging.ServiceScheme
137139
Simplex.Messaging.Session
138-
Simplex.Messaging.Agent.Store.Entity
140+
Simplex.Messaging.SystemTime
139141
Simplex.Messaging.TMap
140142
Simplex.Messaging.Transport
141143
Simplex.Messaging.Transport.Buffer
@@ -164,6 +166,7 @@ library
164166
Simplex.Messaging.Agent.Store.Postgres.Migrations.M20250322_short_links
165167
Simplex.Messaging.Agent.Store.Postgres.Migrations.M20250702_conn_invitations_remove_cascade_delete
166168
Simplex.Messaging.Agent.Store.Postgres.Migrations.M20251009_queue_to_subscribe
169+
Simplex.Messaging.Agent.Store.Postgres.Migrations.M20251010_client_notices
167170
else
168171
exposed-modules:
169172
Simplex.Messaging.Agent.Store.SQLite
@@ -212,6 +215,7 @@ library
212215
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20250322_short_links
213216
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20250702_conn_invitations_remove_cascade_delete
214217
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20251009_queue_to_subscribe
218+
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20251010_client_notices
215219
if flag(client_postgres) || flag(server_postgres)
216220
exposed-modules:
217221
Simplex.Messaging.Agent.Store.Postgres

src/Simplex/FileTransfer/Server.hs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -58,8 +58,9 @@ import Simplex.Messaging.Protocol (BlockingInfo, EntityId (..), RcvPublicAuthKey
5858
import Simplex.Messaging.Server (controlPortAuth, dummyVerifyCmd, verifyCmdAuthorization)
5959
import Simplex.Messaging.Server.Control (CPClientRole (..))
6060
import Simplex.Messaging.Server.Expiration
61-
import Simplex.Messaging.Server.QueueStore (RoundedSystemTime, ServerEntityStatus (..), getRoundedSystemTime)
61+
import Simplex.Messaging.Server.QueueStore (ServerEntityStatus (..))
6262
import Simplex.Messaging.Server.Stats
63+
import Simplex.Messaging.SystemTime
6364
import Simplex.Messaging.TMap (TMap)
6465
import qualified Simplex.Messaging.TMap as TM
6566
import Simplex.Messaging.Transport (CertChainPubKey (..), SessionId, THandleAuth (..), THandleParams (..), TransportPeer (..), defaultSupportedParams)
@@ -451,7 +452,7 @@ processXFTPRequest HTTP2Body {bodyPart} = \case
451452
let rIds = L.map (\(FileRecipient rId _) -> rId) rcps
452453
pure $ FRSndIds sId rIds
453454
pure $ either FRErr id r
454-
addFileRetry :: FileStore -> FileInfo -> Int -> RoundedSystemTime -> M (Either XFTPErrorType XFTPFileId)
455+
addFileRetry :: FileStore -> FileInfo -> Int -> RoundedFileTime -> M (Either XFTPErrorType XFTPFileId)
455456
addFileRetry st file n ts =
456457
retryAdd n $ \sId -> runExceptT $ do
457458
ExceptT $ addFile st sId file ts EntityActive
@@ -579,8 +580,8 @@ deleteOrBlockServerFile_ FileRec {filePath, fileInfo} stat storeAction = runExce
579580
liftIO $ atomicModifyIORef'_ (filesCount stats) (subtract 1)
580581
liftIO $ atomicModifyIORef'_ (filesSize stats) (subtract $ fromIntegral $ size fileInfo)
581582

582-
getFileTime :: IO RoundedSystemTime
583-
getFileTime = getRoundedSystemTime fileTimePrecision
583+
getFileTime :: IO RoundedFileTime
584+
getFileTime = getRoundedSystemTime
584585

585586
expireServerFiles :: Maybe Int -> ExpirationConfig -> M ()
586587
expireServerFiles itemDelay expCfg = do

src/Simplex/FileTransfer/Server/Store.hs

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
{-# LANGUAGE DataKinds #-}
12
{-# LANGUAGE GADTs #-}
23
{-# LANGUAGE LambdaCase #-}
34
{-# LANGUAGE NamedFieldPuns #-}
@@ -8,6 +9,7 @@ module Simplex.FileTransfer.Server.Store
89
( FileStore (..),
910
FileRec (..),
1011
FileRecipient (..),
12+
RoundedFileTime,
1113
newFileStore,
1214
addFile,
1315
setFilePath,
@@ -33,7 +35,8 @@ import Simplex.FileTransfer.Transport (XFTPErrorType (..))
3335
import qualified Simplex.Messaging.Crypto as C
3436
import Simplex.Messaging.Encoding.String
3537
import Simplex.Messaging.Protocol (BlockingInfo, RcvPublicAuthKey, RecipientId, SenderId)
36-
import Simplex.Messaging.Server.QueueStore (RoundedSystemTime (..), ServerEntityStatus (..))
38+
import Simplex.Messaging.Server.QueueStore (ServerEntityStatus (..))
39+
import Simplex.Messaging.SystemTime
3740
import Simplex.Messaging.TMap (TMap)
3841
import qualified Simplex.Messaging.TMap as TM
3942
import Simplex.Messaging.Util (ifM, ($>>=))
@@ -49,10 +52,12 @@ data FileRec = FileRec
4952
fileInfo :: FileInfo,
5053
filePath :: TVar (Maybe FilePath),
5154
recipientIds :: TVar (Set RecipientId),
52-
createdAt :: RoundedSystemTime,
55+
createdAt :: RoundedFileTime,
5356
fileStatus :: TVar ServerEntityStatus
5457
}
5558

59+
type RoundedFileTime = RoundedSystemTime 3600
60+
5661
fileTimePrecision :: Int64
5762
fileTimePrecision = 3600 -- truncate creation time to 1 hour
5863

@@ -70,14 +75,14 @@ newFileStore = do
7075
usedStorage <- newTVarIO 0
7176
pure FileStore {files, recipients, usedStorage}
7277

73-
addFile :: FileStore -> SenderId -> FileInfo -> RoundedSystemTime -> ServerEntityStatus -> STM (Either XFTPErrorType ())
78+
addFile :: FileStore -> SenderId -> FileInfo -> RoundedFileTime -> ServerEntityStatus -> STM (Either XFTPErrorType ())
7479
addFile FileStore {files} sId fileInfo createdAt status =
7580
ifM (TM.member sId files) (pure $ Left DUPLICATE_) $ do
7681
f <- newFileRec sId fileInfo createdAt status
7782
TM.insert sId f files
7883
pure $ Right ()
7984

80-
newFileRec :: SenderId -> FileInfo -> RoundedSystemTime -> ServerEntityStatus -> STM FileRec
85+
newFileRec :: SenderId -> FileInfo -> RoundedFileTime -> ServerEntityStatus -> STM FileRec
8186
newFileRec senderId fileInfo createdAt status = do
8287
recipientIds <- newTVar S.empty
8388
filePath <- newTVar Nothing

src/Simplex/FileTransfer/Server/StoreLog.hs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,13 +34,13 @@ import Simplex.FileTransfer.Protocol (FileInfo (..))
3434
import Simplex.FileTransfer.Server.Store
3535
import Simplex.Messaging.Encoding.String
3636
import Simplex.Messaging.Protocol (BlockingInfo, RcvPublicAuthKey, RecipientId, SenderId)
37-
import Simplex.Messaging.Server.QueueStore (RoundedSystemTime, ServerEntityStatus (..))
37+
import Simplex.Messaging.Server.QueueStore (ServerEntityStatus (..))
3838
import Simplex.Messaging.Server.StoreLog
3939
import Simplex.Messaging.Util (bshow)
4040
import System.IO
4141

4242
data FileStoreLogRecord
43-
= AddFile SenderId FileInfo RoundedSystemTime ServerEntityStatus
43+
= AddFile SenderId FileInfo RoundedFileTime ServerEntityStatus
4444
| PutFile SenderId FilePath
4545
| AddRecipients SenderId (NonEmpty FileRecipient)
4646
| DeleteFile SenderId
@@ -69,7 +69,7 @@ instance StrEncoding FileStoreLogRecord where
6969
logFileStoreRecord :: StoreLog 'WriteMode -> FileStoreLogRecord -> IO ()
7070
logFileStoreRecord = writeStoreLogRecord
7171

72-
logAddFile :: StoreLog 'WriteMode -> SenderId -> FileInfo -> RoundedSystemTime -> ServerEntityStatus -> IO ()
72+
logAddFile :: StoreLog 'WriteMode -> SenderId -> FileInfo -> RoundedFileTime -> ServerEntityStatus -> IO ()
7373
logAddFile s = logFileStoreRecord s .:: AddFile
7474

7575
logPutFile :: StoreLog 'WriteMode -> SenderId -> FilePath -> IO ()

src/Simplex/FileTransfer/Types.hs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,14 @@ import Data.Text.Encoding (encodeUtf8)
1515
import Data.Word (Word32)
1616
import Simplex.FileTransfer.Client (XFTPChunkSpec (..))
1717
import Simplex.FileTransfer.Description
18+
import Simplex.Messaging.Agent.Store.DB (FromField (..), ToField (..), fromTextField_)
1819
import qualified Simplex.Messaging.Crypto as C
1920
import Simplex.Messaging.Crypto.File (CryptoFile (..))
2021
import Simplex.Messaging.Encoding
2122
import Simplex.Messaging.Encoding.String
2223
import Simplex.Messaging.Parsers
2324
import Simplex.Messaging.Protocol (XFTPServer)
2425
import System.FilePath ((</>))
25-
import Simplex.Messaging.Agent.Store.DB (FromField (..), ToField (..), fromTextField_)
2626

2727
type RcvFileId = ByteString -- Agent entity ID
2828

src/Simplex/Messaging/Agent.hs

Lines changed: 43 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,7 @@ import qualified Data.Aeson.TH as JQ
151151
import Data.Bifunctor (bimap, first)
152152
import Data.ByteString.Char8 (ByteString)
153153
import qualified Data.ByteString.Char8 as B
154-
import Data.Composition ((.:), (.:.), (.::), (.::.))
154+
import Data.Composition
155155
import Data.Either (isRight, partitionEithers, rights)
156156
import Data.Foldable (foldl', toList)
157157
import Data.Functor (($>))
@@ -189,10 +189,11 @@ import Simplex.Messaging.Agent.Store
189189
import Simplex.Messaging.Agent.Store.AgentStore
190190
import Simplex.Messaging.Agent.Store.Common (DBStore)
191191
import qualified Simplex.Messaging.Agent.Store.DB as DB
192+
import Simplex.Messaging.Agent.Store.Entity
192193
import Simplex.Messaging.Agent.Store.Interface (closeDBStore, execSQL, getCurrentMigrations)
193194
import Simplex.Messaging.Agent.Store.Shared (UpMigration (..), upMigration)
194195
import qualified Simplex.Messaging.Agent.TSessionSubs as SS
195-
import Simplex.Messaging.Client (NetworkRequestMode (..), SMPClientError, ServerTransmission (..), ServerTransmissionBatch, nonBlockingWriteTBQueue, temporaryClientError, unexpectedResponse)
196+
import Simplex.Messaging.Client (NetworkRequestMode (..), SMPClientError, ServerTransmission (..), ServerTransmissionBatch, nonBlockingWriteTBQueue, smpErrorClientNotice, temporaryClientError, unexpectedResponse)
196197
import qualified Simplex.Messaging.Crypto as C
197198
import Simplex.Messaging.Crypto.File (CryptoFile, CryptoFileArgs)
198199
import Simplex.Messaging.Crypto.Ratchet (PQEncryption, PQSupport (..), pattern PQEncOff, pattern PQEncOn, pattern PQSupportOff, pattern PQSupportOn)
@@ -227,7 +228,7 @@ import Simplex.Messaging.Protocol
227228
)
228229
import qualified Simplex.Messaging.Protocol as SMP
229230
import Simplex.Messaging.ServiceScheme (ServiceScheme (..))
230-
import Simplex.Messaging.Agent.Store.Entity
231+
import Simplex.Messaging.SystemTime
231232
import qualified Simplex.Messaging.TMap as TM
232233
import Simplex.Messaging.Transport (SMPVersion)
233234
import Simplex.Messaging.Util
@@ -251,13 +252,14 @@ getSMPAgentClient = getSMPAgentClient_ 1
251252
{-# INLINE getSMPAgentClient #-}
252253

253254
getSMPAgentClient_ :: Int -> AgentConfig -> InitialAgentServers -> DBStore -> Bool -> IO AgentClient
254-
getSMPAgentClient_ clientId cfg initServers@InitialAgentServers {smp, xftp} store backgroundMode =
255+
getSMPAgentClient_ clientId cfg initServers@InitialAgentServers {smp, xftp, presetServers} store backgroundMode =
255256
newSMPAgentEnv cfg store >>= runReaderT runAgent
256257
where
257258
runAgent = do
258259
liftIO $ checkServers "SMP" smp >> checkServers "XFTP" xftp
259260
currentTs <- liftIO getCurrentTime
260-
c@AgentClient {acThread} <- liftIO . newAgentClient clientId initServers currentTs =<< ask
261+
notices <- liftIO $ withTransaction store (`getClientNotices` presetServers) `catchAll_` pure []
262+
c@AgentClient {acThread} <- liftIO . newAgentClient clientId initServers currentTs notices =<< ask
261263
t <- runAgentThreads c `forkFinally` const (liftIO $ disconnectAgentClient c)
262264
atomically . writeTVar acThread . Just =<< mkWeakThreadId t
263265
pure c
@@ -379,8 +381,8 @@ deleteConnectionsAsync c waitDelivery = withAgentEnv c . deleteConnectionsAsync'
379381
{-# INLINE deleteConnectionsAsync #-}
380382

381383
-- | Create SMP agent connection (NEW command)
382-
createConnection :: ConnectionModeI c => AgentClient -> NetworkRequestMode -> UserId -> Bool -> SConnectionMode c -> Maybe UserLinkData -> Maybe CRClientData -> CR.InitialKeys -> SubscriptionMode -> AE (ConnId, (CreatedConnLink c, Maybe ClientServiceId))
383-
createConnection c nm userId enableNtfs = withAgentEnv c .::. newConn c nm userId enableNtfs
384+
createConnection :: ConnectionModeI c => AgentClient -> NetworkRequestMode -> UserId -> Bool -> Bool -> SConnectionMode c -> Maybe UserLinkData -> Maybe CRClientData -> CR.InitialKeys -> SubscriptionMode -> AE (ConnId, (CreatedConnLink c, Maybe ClientServiceId))
385+
createConnection c nm userId enableNtfs checkNotices = withAgentEnv c .::. newConn c nm userId enableNtfs checkNotices
384386
{-# INLINE createConnection #-}
385387

386388
-- | Create or update user's contact connection short link
@@ -863,13 +865,27 @@ switchConnectionAsync' c corrId connId =
863865
connectionStats c $ DuplexConnection cData rqs' sqs
864866
_ -> throwE $ CMD PROHIBITED "switchConnectionAsync: not duplex"
865867

866-
newConn :: ConnectionModeI c => AgentClient -> NetworkRequestMode -> UserId -> Bool -> SConnectionMode c -> Maybe UserLinkData -> Maybe CRClientData -> CR.InitialKeys -> SubscriptionMode -> AM (ConnId, (CreatedConnLink c, Maybe ClientServiceId))
867-
newConn c nm userId enableNtfs cMode userData_ clientData pqInitKeys subMode = do
868+
newConn :: ConnectionModeI c => AgentClient -> NetworkRequestMode -> UserId -> Bool -> Bool -> SConnectionMode c -> Maybe UserLinkData -> Maybe CRClientData -> CR.InitialKeys -> SubscriptionMode -> AM (ConnId, (CreatedConnLink c, Maybe ClientServiceId))
869+
newConn c nm userId enableNtfs checkNotices cMode userData_ clientData pqInitKeys subMode = do
868870
srv <- getSMPServer c userId
871+
when (checkNotices && connMode cMode == CMContact) $ checkClientNotices c srv
869872
connId <- newConnNoQueues c userId enableNtfs cMode (CR.connPQEncryption pqInitKeys)
870873
(connId,) <$> newRcvConnSrv c nm userId connId enableNtfs cMode userData_ clientData pqInitKeys subMode srv
871874
`catchE` \e -> withStore' c (`deleteConnRecord` connId) >> throwE e
872875

876+
checkClientNotices :: AgentClient -> SMPServerWithAuth -> AM ()
877+
checkClientNotices AgentClient {clientNotices, presetServers} (ProtoServerWithAuth srv@(ProtocolServer {host}) _) = do
878+
notices <- readTVarIO clientNotices
879+
unless (M.null notices) $ checkNotices notices =<< liftIO getSystemSeconds
880+
where
881+
srvKey
882+
| isPresetServer srv presetServers = Nothing -- Nothing is used as key for preset servers
883+
| otherwise = Just srv
884+
checkNotices notices ts =
885+
forM_ (M.lookup srvKey notices) $ \expires_ ->
886+
when (maybe True (ts <) expires_) $
887+
throwError NOTICE {server = safeDecodeUtf8 $ strEncode $ L.head host, preset = isNothing srvKey, expiresAt = roundedToUTCTime <$> expires_}
888+
873889
setConnShortLink' :: AgentClient -> NetworkRequestMode -> ConnId -> SConnectionMode c -> UserLinkData -> Maybe CRClientData -> AM (ConnShortLink c)
874890
setConnShortLink' c nm connId cMode userData clientData =
875891
withConnLock c connId "setConnShortLink" $ do
@@ -2794,18 +2810,20 @@ processSMPTransmissions c@AgentClient {subQ} (tSess@(userId, srv, _), _v, sessId
27942810
STEvent msgOrErr ->
27952811
withRcvConn entId $ \rq@RcvQueue {connId} conn -> case msgOrErr of
27962812
Right msg -> runProcessSMP rq conn (toConnData conn) msg
2797-
Left e -> lift $ notifyErr connId e
2813+
Left e -> lift $ do
2814+
processClientNotice rq e
2815+
notifyErr connId e
27982816
STResponse (Cmd SRecipient cmd) respOrErr ->
27992817
withRcvConn entId $ \rq conn -> case cmd of
28002818
SMP.SUB -> case respOrErr of
2801-
Right SMP.OK -> processSubOk rq upConnIds
2819+
Right SMP.OK -> liftIO $ processSubOk rq upConnIds
28022820
-- TODO [certs rcv] associate queue with the service
2803-
Right (SMP.SOK serviceId_) -> processSubOk rq upConnIds
2821+
Right (SMP.SOK serviceId_) -> liftIO $ processSubOk rq upConnIds
28042822
Right msg@SMP.MSG {} -> do
2805-
processSubOk rq upConnIds -- the connection is UP even when processing this particular message fails
2823+
liftIO $ processSubOk rq upConnIds -- the connection is UP even when processing this particular message fails
28062824
runProcessSMP rq conn (toConnData conn) msg
2807-
Right r -> processSubErr rq $ unexpectedResponse r
2808-
Left e -> unless (temporaryClientError e) $ processSubErr rq e -- timeout/network was already reported
2825+
Right r -> lift $ processSubErr rq $ unexpectedResponse r
2826+
Left e -> lift $ unless (temporaryClientError e) $ processSubErr rq e -- timeout/network was already reported
28092827
SMP.ACK _ -> case respOrErr of
28102828
Right msg@SMP.MSG {} -> runProcessSMP rq conn (toConnData conn) msg
28112829
_ -> pure () -- TODO process OK response to ACK
@@ -2827,21 +2845,28 @@ processSMPTransmissions c@AgentClient {subQ} (tSess@(userId, srv, _), _v, sessId
28272845
tryAllErrors' (a rq conn) >>= \case
28282846
Left e -> notify' connId (ERR e)
28292847
Right () -> pure ()
2830-
processSubOk :: RcvQueue -> TVar [ConnId] -> AM ()
2848+
processSubOk :: RcvQueue -> TVar [ConnId] -> IO ()
28312849
processSubOk rq@RcvQueue {connId} upConnIds =
28322850
atomically . whenM (isPendingSub rq) $ do
28332851
SS.addActiveSub tSess sessId (rcvQueueSub rq) $ currentSubs c
28342852
modifyTVar' upConnIds (connId :)
2835-
processSubErr :: RcvQueue -> SMPClientError -> AM ()
2853+
processSubErr :: RcvQueue -> SMPClientError -> AM' ()
28362854
processSubErr rq@RcvQueue {connId} e = do
28372855
atomically . whenM (isPendingSub rq) $
28382856
failSubscription c tSess rq e >> incSMPServerStat c userId srv connSubErrs
2839-
lift $ notifyErr connId e
2857+
processClientNotice rq e
2858+
notifyErr connId e
28402859
isPendingSub :: RcvQueue -> STM Bool
28412860
isPendingSub rq = do
28422861
pending <- (&&) <$> SS.hasPendingSub tSess (queueId rq) (currentSubs c) <*> activeClientSession c tSess sessId
28432862
unless pending $ incSMPServerStat c userId srv connSubIgnored
28442863
pure pending
2864+
processClientNotice rq e =
2865+
forM_ (smpErrorClientNotice e) $ \notice_ ->
2866+
E.bracket_
2867+
(atomically $ takeTMVar $ clientNoticesLock c)
2868+
(atomically $ putTMVar (clientNoticesLock c) ())
2869+
(processClientNotices c tSess [(rcvQueueSub rq, notice_)])
28452870
notify' :: forall e m. (AEntityI e, MonadIO m) => ConnId -> AEvent e -> m ()
28462871
notify' connId msg = atomically $ writeTBQueue subQ ("", connId, AEvt (sAEntity @e) msg)
28472872
notifyErr :: ConnId -> SMPClientError -> AM' ()

0 commit comments

Comments
 (0)