Skip to content

Commit 1dbc15b

Browse files
agent: sync connections (#1654)
* agent: sync subscriptions * remove comment * add shouldDelete flag * compare api * remove instance * query * rename * refactor * functor * JSON instances --------- Co-authored-by: Evgeny Poberezkin <[email protected]>
1 parent 80aa56c commit 1dbc15b

File tree

2 files changed

+64
-1
lines changed

2 files changed

+64
-1
lines changed

src/Simplex/Messaging/Agent.hs

Lines changed: 55 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
{-# LANGUAGE PatternSynonyms #-}
1414
{-# LANGUAGE RankNTypes #-}
1515
{-# LANGUAGE ScopedTypeVariables #-}
16+
{-# LANGUAGE TemplateHaskell #-}
1617
{-# LANGUAGE TupleSections #-}
1718
{-# LANGUAGE TypeApplications #-}
1819
{-# OPTIONS_GHC -fno-warn-ambiguous-fields #-}
@@ -67,6 +68,9 @@ module Simplex.Messaging.Agent
6768
allowConnection,
6869
acceptContact,
6970
rejectContact,
71+
DatabaseDiff (..),
72+
compareConnections,
73+
syncConnections,
7074
subscribeConnection,
7175
subscribeConnections,
7276
subscribeAllConnections,
@@ -140,7 +144,9 @@ import Control.Monad.Except
140144
import Control.Monad.Reader
141145
import Control.Monad.Trans.Except
142146
import Crypto.Random (ChaChaDRG)
147+
import Data.Aeson (FromJSON (..), ToJSON (..))
143148
import qualified Data.Aeson as J
149+
import qualified Data.Aeson.TH as JQ
144150
import Data.Bifunctor (bimap, first)
145151
import Data.ByteString.Char8 (ByteString)
146152
import qualified Data.ByteString.Char8 as B
@@ -195,7 +201,7 @@ import Simplex.Messaging.Encoding
195201
import Simplex.Messaging.Encoding.String
196202
import Simplex.Messaging.Notifications.Protocol (DeviceToken, NtfRegCode (NtfRegCode), NtfTknStatus (..), NtfTokenId, PNMessageData (..), pnMessagesP)
197203
import Simplex.Messaging.Notifications.Types
198-
import Simplex.Messaging.Parsers (parse)
204+
import Simplex.Messaging.Parsers (defaultJSON, parse)
199205
import Simplex.Messaging.Protocol
200206
( BrokerMsg,
201207
Cmd (..),
@@ -434,6 +440,24 @@ rejectContact :: AgentClient -> ConfirmationId -> AE ()
434440
rejectContact c = withAgentEnv c . rejectContact' c
435441
{-# INLINE rejectContact #-}
436442

443+
data DatabaseDiff a = DatabaseDiff
444+
{ missingIds :: [a],
445+
extraIds :: [a]
446+
}
447+
deriving (Show)
448+
449+
instance Functor DatabaseDiff where
450+
fmap f DatabaseDiff {missingIds, extraIds} =
451+
DatabaseDiff {missingIds = map f missingIds, extraIds = map f extraIds}
452+
453+
compareConnections :: AgentClient -> [UserId] -> [ConnId] -> AE (DatabaseDiff UserId, DatabaseDiff ConnId)
454+
compareConnections c = withAgentEnv c .: compareConnections' c
455+
{-# INLINE compareConnections #-}
456+
457+
syncConnections :: AgentClient -> [UserId] -> [ConnId] -> AE (DatabaseDiff UserId, DatabaseDiff ConnId)
458+
syncConnections c = withAgentEnv c .: syncConnections' c
459+
{-# INLINE syncConnections #-}
460+
437461
-- | Subscribe to receive connection messages (SUB command)
438462
subscribeConnection :: AgentClient -> ConnId -> AE (Maybe ClientServiceId)
439463
subscribeConnection c = withAgentEnv c . subscribeConnection' c
@@ -1253,6 +1277,27 @@ rejectContact' c invId =
12531277
withStore' c $ \db -> deleteInvitation db invId
12541278
{-# INLINE rejectContact' #-}
12551279

1280+
syncConnections' :: AgentClient -> [UserId] -> [ConnId] -> AM (DatabaseDiff UserId, DatabaseDiff ConnId)
1281+
syncConnections' c userIds connIds = do
1282+
r@(DatabaseDiff {extraIds = uIds}, DatabaseDiff {extraIds = cIds}) <- compareConnections' c userIds connIds
1283+
forM_ uIds $ \uid -> deleteUser' c uid False
1284+
deleteConnectionsAsync' c False cIds
1285+
pure r
1286+
1287+
compareConnections' :: AgentClient -> [UserId] -> [ConnId] -> AM (DatabaseDiff UserId, DatabaseDiff ConnId)
1288+
compareConnections' c userIds connIds = do
1289+
knownUserIds <- withStore' c getUserIds
1290+
knownConnIds <- withStore' c getConnIds
1291+
pure (databaseDiff userIds knownUserIds, databaseDiff connIds knownConnIds)
1292+
1293+
databaseDiff :: Ord a => [a] -> [a] -> DatabaseDiff a
1294+
databaseDiff passed known =
1295+
let passedSet = S.fromList passed
1296+
knownSet = S.fromList known
1297+
missingIds = S.toList $ passedSet `S.difference` knownSet
1298+
extraIds = S.toList $ knownSet `S.difference` passedSet
1299+
in DatabaseDiff {missingIds, extraIds}
1300+
12561301
-- | Subscribe to receive connection messages (SUB command) in Reader monad
12571302
subscribeConnection' :: AgentClient -> ConnId -> AM (Maybe ClientServiceId)
12581303
subscribeConnection' c connId = toConnResult connId =<< subscribeConnections' c [connId]
@@ -3478,3 +3523,12 @@ newSndQueue userId connId (Compatible (SMPQueueInfo smpClientVersion SMPQueueAdd
34783523
smpClientVersion
34793524
}
34803525
pure (sq, e2ePubKey)
3526+
3527+
$(pure [])
3528+
3529+
instance FromJSON a => FromJSON (DatabaseDiff a) where
3530+
parseJSON = $(JQ.mkParseJSON defaultJSON ''DatabaseDiff)
3531+
3532+
instance ToJSON a => ToJSON (DatabaseDiff a) where
3533+
toEncoding = $(JQ.mkToEncoding defaultJSON ''DatabaseDiff)
3534+
toJSON = $(JQ.mkToJSON defaultJSON ''DatabaseDiff)

src/Simplex/Messaging/Agent/Store/AgentStore.hs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
module Simplex.Messaging.Agent.Store.AgentStore
2929
( -- * Users
3030
createUserRecord,
31+
getUserIds,
3132
deleteUserRecord,
3233
setUserDeleted,
3334
deleteUserWithoutConns,
@@ -42,6 +43,7 @@ module Simplex.Messaging.Agent.Store.AgentStore
4243
getSubscriptionServers,
4344
getUserServerRcvQueueSubs,
4445
unsetQueuesToSubscribe,
46+
getConnIds,
4547
getConn,
4648
getDeletedConn,
4749
getConns,
@@ -330,6 +332,10 @@ createUserRecord db = do
330332
DB.execute_ db "INSERT INTO users DEFAULT VALUES"
331333
insertedRowId db
332334

335+
getUserIds :: DB.Connection -> IO [UserId]
336+
getUserIds db =
337+
map fromOnly <$> DB.query_ db "SELECT user_id FROM users WHERE deleted = 0"
338+
333339
checkUser :: DB.Connection -> UserId -> IO (Either StoreError ())
334340
checkUser db userId =
335341
firstRow (\(_ :: Only Int64) -> ()) SEUserNotFound $
@@ -2089,6 +2095,9 @@ unsetQueuesToSubscribe db = DB.execute_ db "UPDATE rcv_queues SET to_subscribe =
20892095

20902096
-- * getConn helpers
20912097

2098+
getConnIds :: DB.Connection -> IO [ConnId]
2099+
getConnIds db = map fromOnly <$> DB.query_ db "SELECT conn_id FROM connections WHERE deleted = 0"
2100+
20922101
getConn :: DB.Connection -> ConnId -> IO (Either StoreError SomeConn)
20932102
getConn = getAnyConn False
20942103
{-# INLINE getConn #-}

0 commit comments

Comments
 (0)