Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions simplexmq.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ library
Simplex.Messaging.Agent.Store.Postgres.Migrations.M20250702_conn_invitations_remove_cascade_delete
Simplex.Messaging.Agent.Store.Postgres.Migrations.M20251009_queue_to_subscribe
Simplex.Messaging.Agent.Store.Postgres.Migrations.M20251010_client_notices
Simplex.Messaging.Agent.Store.Postgres.Migrations.M20251020_service_certs
else
exposed-modules:
Simplex.Messaging.Agent.Store.SQLite
Expand Down Expand Up @@ -223,6 +224,7 @@ library
Simplex.Messaging.Agent.Store.Postgres.Common
Simplex.Messaging.Agent.Store.Postgres.DB
Simplex.Messaging.Agent.Store.Postgres.Migrations
Simplex.Messaging.Agent.Store.Postgres.Migrations.Util
Simplex.Messaging.Agent.Store.Postgres.Util
if !flag(client_library)
exposed-modules:
Expand Down
10 changes: 5 additions & 5 deletions src/Simplex/Messaging/Agent.hs
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,6 @@ import Simplex.Messaging.Protocol
ErrorType (AUTH),
MsgBody,
MsgFlags (..),
IdsHash,
NtfServer,
ProtoServerWithAuth (..),
ProtocolServer (..),
Expand All @@ -222,6 +221,7 @@ import Simplex.Messaging.Protocol
SMPMsgMeta,
SParty (..),
SProtocolType (..),
ServiceSub (..),
SndPublicAuthKey,
SubscriptionMode (..),
UserProtocol,
Expand Down Expand Up @@ -500,7 +500,7 @@ resubscribeConnections :: AgentClient -> [ConnId] -> AE (Map ConnId (Either Agen
resubscribeConnections c = withAgentEnv c . resubscribeConnections' c
{-# INLINE resubscribeConnections #-}

subscribeClientServices :: AgentClient -> UserId -> AE (Map SMPServer (Either AgentErrorType (Int64, IdsHash)))
subscribeClientServices :: AgentClient -> UserId -> AE (Map SMPServer (Either AgentErrorType ServiceSub))
subscribeClientServices c = withAgentEnv c . subscribeClientServices' c
{-# INLINE subscribeClientServices #-}

Expand Down Expand Up @@ -1507,15 +1507,15 @@ resubscribeConnections' c connIds = do
[] -> pure True
rqs' -> anyM $ map (atomically . hasActiveSubscription c) rqs'

-- TODO [certs rcv] compare hash with lock
subscribeClientServices' :: AgentClient -> UserId -> AM (Map SMPServer (Either AgentErrorType (Int64, IdsHash)))
-- TODO [certs rcv] compare hash. possibly, it should return both expected and returned counts
subscribeClientServices' :: AgentClient -> UserId -> AM (Map SMPServer (Either AgentErrorType ServiceSub))
subscribeClientServices' c userId =
ifM useService subscribe $ throwError $ CMD PROHIBITED "no user service allowed"
where
useService = liftIO $ (Just True ==) <$> TM.lookupIO userId (useClientServices c)
subscribe = do
srvs <- withStore' c (`getClientServiceServers` userId)
lift $ M.fromList . zip srvs <$> mapConcurrently (tryAllErrors' . subscribeClientService c userId) srvs
lift $ M.fromList <$> mapConcurrently (\(srv, ServiceSub _ n idsHash) -> fmap (srv,) $ tryAllErrors' $ subscribeClientService c userId srv n idsHash) srvs

-- requesting messages sequentially, to reduce memory usage
getConnectionMessages' :: AgentClient -> NonEmpty ConnMsgReq -> AM' (NonEmpty (Either AgentErrorType (Maybe SMPMsgMeta)))
Expand Down
12 changes: 7 additions & 5 deletions src/Simplex/Messaging/Agent/Client.hs
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,7 @@ import Simplex.Messaging.Protocol
RcvNtfPublicDhKey,
SMPMsgMeta (..),
SProtocolType (..),
ServiceSub,
SndPublicAuthKey,
SubscriptionMode (..),
NewNtfCreds (..),
Expand Down Expand Up @@ -1689,10 +1690,10 @@ processClientNotices c@AgentClient {presetServers} tSess notices = do
logError $ "processClientNotices error: " <> tshow e
notifySub' c "" $ ERR e

subscribeClientService :: AgentClient -> UserId -> SMPServer -> AM (Int64, IdsHash)
subscribeClientService c userId srv =
withLogClient c NRMBackground (userId, srv, Nothing) B.empty "SUBS" $
(`subscribeService` SMP.SRecipientService) . connectedClient
subscribeClientService :: AgentClient -> UserId -> SMPServer -> Int64 -> IdsHash -> AM ServiceSub
subscribeClientService c userId srv n idsHash =
withLogClient c NRMBackground (userId, srv, Nothing) B.empty "SUBS" $ \(SMPConnectedClient smp _) ->
subscribeService smp SMP.SRecipientService n idsHash

activeClientSession :: AgentClient -> SMPTransportSession -> SessionId -> STM Bool
activeClientSession c tSess sessId = sameSess <$> tryReadSessVar tSess (smpClients c)
Expand Down Expand Up @@ -2301,7 +2302,8 @@ withStore c action = do
[ E.Handler $ \(e :: SQL.SQLError) ->
let se = SQL.sqlError e
busy = se == SQL.ErrorBusy || se == SQL.ErrorLocked
in pure . Left . (if busy then SEDatabaseBusy else SEInternal) $ bshow se,
err = tshow se <> ": " <> SQL.sqlErrorDetails e <> ", " <> SQL.sqlErrorContext e
in pure . Left . (if busy then SEDatabaseBusy else SEInternal) $ encodeUtf8 err,
E.Handler $ \(E.SomeException e) -> pure . Left $ SEInternal $ bshow e
]
#endif
Expand Down
2 changes: 1 addition & 1 deletion src/Simplex/Messaging/Agent/NtfSubSupervisor.hs
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ runNtfWorker c srv Worker {doWork} =
_ -> ((ntfSubConnId sub, INTERNAL "NSACheck - no subscription ID") : errs, subs, subIds)
updateSub :: DB.Connection -> NtfServer -> UTCTime -> UTCTime -> (NtfSubscription, NtfSubStatus) -> IO (Maybe SMPServer)
updateSub db ntfServer ts nextCheckTs (sub, status)
| ntfShouldSubscribe status =
| status `elem` subscribeNtfStatuses =
let sub' = sub {ntfSubStatus = NASCreated status}
in Nothing <$ updateNtfSubscription db sub' (NSANtf NSACheck) nextCheckTs
-- ntf server stopped subscribing to this queue
Expand Down
13 changes: 7 additions & 6 deletions src/Simplex/Messaging/Agent/Store/AgentStore.hs
Original file line number Diff line number Diff line change
Expand Up @@ -431,19 +431,20 @@ getClientService db userId srv =
where
toService (kh, cert, pk, serviceId_) = ((kh, (cert, pk)), serviceId_)

getClientServiceServers :: DB.Connection -> UserId -> IO [SMPServer]
getClientServiceServers :: DB.Connection -> UserId -> IO [(SMPServer, ServiceSub)]
getClientServiceServers db userId =
map toServer
<$> DB.query
db
[sql|
SELECT c.host, c.port, s.key_hash
SELECT c.host, c.port, s.key_hash, c.rcv_service_id, c.rcv_service_queue_count, c.rcv_service_queue_ids_hash
FROM client_services c
JOIN servers s ON s.host = c.host AND s.port = c.port
|]
(Only userId)
where
toServer (host, port, kh) = SMPServer host port kh
toServer (host, port, kh, serviceId, n, Binary idsHash) =
(SMPServer host port kh, ServiceSub serviceId n (IdsHash idsHash))

setClientServiceId :: DB.Connection -> UserId -> SMPServer -> ServiceId -> IO ()
setClientServiceId db userId srv serviceId =
Expand Down Expand Up @@ -2099,7 +2100,7 @@ insertRcvQueue_ db connId' rq@RcvQueue {..} subMode serverKeyHash_ = do
ntf_public_key, ntf_private_key, ntf_id, rcv_ntf_dh_secret
) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?);
|]
( (host server, port server, rcvId, rcvServiceAssoc, connId', rcvPrivateKey, rcvDhSecret, e2ePrivKey, e2eDhSecret)
( (host server, port server, rcvId, BI rcvServiceAssoc, connId', rcvPrivateKey, rcvDhSecret, e2ePrivKey, e2eDhSecret)
:. (sndId, queueMode, status, BI toSubscribe, qId, BI primary, dbReplaceQueueId, smpClientVersion, serverKeyHash_)
:. (shortLinkId <$> shortLink, shortLinkKey <$> shortLink, linkPrivSigKey <$> shortLink, linkEncFixedData <$> shortLink)
:. ntfCredsFields
Expand Down Expand Up @@ -2468,13 +2469,13 @@ rcvQueueQuery =

toRcvQueue ::
(UserId, C.KeyHash, ConnId, NonEmpty TransportHost, ServiceName, SMP.RecipientId, SMP.RcvPrivateAuthKey, SMP.RcvDhSecret, C.PrivateKeyX25519, Maybe C.DhSecretX25519, SMP.SenderId, Maybe QueueMode)
:. (QueueStatus, Maybe BoolInt, Maybe NoticeId, DBEntityId, BoolInt, Maybe Int64, Maybe RcvSwitchStatus, Maybe VersionSMPC, Int, ServiceAssoc)
:. (QueueStatus, Maybe BoolInt, Maybe NoticeId, DBEntityId, BoolInt, Maybe Int64, Maybe RcvSwitchStatus, Maybe VersionSMPC, Int, BoolInt)
:. (Maybe SMP.NtfPublicAuthKey, Maybe SMP.NtfPrivateAuthKey, Maybe SMP.NotifierId, Maybe RcvNtfDhSecret)
:. (Maybe SMP.LinkId, Maybe LinkKey, Maybe C.PrivateKeyEd25519, Maybe EncDataBytes) ->
RcvQueue
toRcvQueue
( (userId, keyHash, connId, host, port, rcvId, rcvPrivateKey, rcvDhSecret, e2ePrivKey, e2eDhSecret, sndId, queueMode)
:. (status, enableNtfs_, clientNoticeId, dbQueueId, BI primary, dbReplaceQueueId, rcvSwchStatus, smpClientVersion_, deleteErrors, rcvServiceAssoc)
:. (status, enableNtfs_, clientNoticeId, dbQueueId, BI primary, dbReplaceQueueId, rcvSwchStatus, smpClientVersion_, deleteErrors, BI rcvServiceAssoc)
:. (ntfPublicKey_, ntfPrivateKey_, notifierId_, rcvNtfDhSecret_)
:. (shortLinkId_, shortLinkKey_, linkPrivSigKey_, linkEncFixedData_)
) =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import Simplex.Messaging.Agent.Store.Postgres.Migrations.M20250322_short_links
import Simplex.Messaging.Agent.Store.Postgres.Migrations.M20250702_conn_invitations_remove_cascade_delete
import Simplex.Messaging.Agent.Store.Postgres.Migrations.M20251009_queue_to_subscribe
import Simplex.Messaging.Agent.Store.Postgres.Migrations.M20251010_client_notices
import Simplex.Messaging.Agent.Store.Postgres.Migrations.M20251020_service_certs
import Simplex.Messaging.Agent.Store.Shared (Migration (..))

schemaMigrations :: [(String, Text, Maybe Text)]
Expand All @@ -19,7 +20,8 @@ schemaMigrations =
("20250322_short_links", m20250322_short_links, Just down_m20250322_short_links),
("20250702_conn_invitations_remove_cascade_delete", m20250702_conn_invitations_remove_cascade_delete, Just down_m20250702_conn_invitations_remove_cascade_delete),
("20251009_queue_to_subscribe", m20251009_queue_to_subscribe, Just down_m20251009_queue_to_subscribe),
("20251010_client_notices", m20251010_client_notices, Just down_m20251010_client_notices)
("20251010_client_notices", m20251010_client_notices, Just down_m20251010_client_notices),
("20251020_service_certs", m20251020_service_certs, Just down_m20251020_service_certs)
]

-- | The list of migrations in ascending order by date
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
{-# LANGUAGE QuasiQuotes #-}

module Simplex.Messaging.Agent.Store.Postgres.Migrations.M20251020_service_certs where

import Data.Text (Text)
import qualified Data.Text as T
import Text.RawString.QQ (r)

m20251020_service_certs :: Text
m20251020_service_certs =
T.pack
[r|
CREATE TABLE client_services(
user_id BIGINT NOT NULL REFERENCES users ON UPDATE RESTRICT ON DELETE CASCADE,
host TEXT NOT NULL,
port TEXT NOT NULL,
service_cert BYTEA NOT NULL,
service_cert_hash BYTEA NOT NULL,
service_priv_key BYTEA NOT NULL,
service_id BYTEA,
service_queue_count BIGINT NOT NULL DEFAULT 0,
service_queue_ids_hash BYTEA NOT NULL DEFAULT '\x00000000000000000000000000000000',
FOREIGN KEY(host, port) REFERENCES servers ON UPDATE CASCADE ON DELETE RESTRICT
);

CREATE UNIQUE INDEX idx_server_certs_user_id_host_port ON client_services(user_id, host, port);
CREATE INDEX idx_server_certs_host_port ON client_services(host, port);

ALTER TABLE rcv_queues ADD COLUMN rcv_service_assoc SMALLINT NOT NULL DEFAULT 0;

CREATE FUNCTION update_aggregates(p_conn_id BYTEA, p_host TEXT, p_port TEXT, p_change BIGINT, p_rcv_id BYTEA) RETURNS VOID
LANGUAGE plpgsql
AS $$
DECLARE q_user_id BIGINT;
BEGIN
SELECT user_id INTO q_user_id FROM connections WHERE conn_id = p_conn_id;
UPDATE client_services
SET service_queue_count = service_queue_count + p_change,
service_queue_ids_hash = xor_combine(service_queue_ids_hash, public.digest(p_rcv_id, 'md5'))
WHERE user_id = q_user_id AND host = p_host AND port = p_port;
END;
$$;

CREATE FUNCTION on_rcv_queue_insert() RETURNS TRIGGER
LANGUAGE plpgsql
AS $$
BEGIN
IF NEW.rcv_service_assoc != 0 AND NEW.deleted = 0 THEN
PERFORM update_aggregates(NEW.conn_id, NEW.host, NEW.port, 1, NEW.rcv_id);
END IF;
RETURN NEW;
END;
$$;

CREATE FUNCTION on_rcv_queue_delete() RETURNS TRIGGER
LANGUAGE plpgsql
AS $$
BEGIN
IF OLD.rcv_service_assoc != 0 AND OLD.deleted = 0 THEN
PERFORM update_aggregates(OLD.conn_id, OLD.host, OLD.port, -1, OLD.rcv_id);
END IF;
RETURN OLD;
END;
$$;

CREATE FUNCTION on_rcv_queue_update() RETURNS TRIGGER
LANGUAGE plpgsql
AS $$
BEGIN
IF OLD.rcv_service_assoc != 0 AND OLD.deleted = 0 THEN
IF NOT (NEW.rcv_service_assoc != 0 AND NEW.deleted = 0) THEN
PERFORM update_aggregates(OLD.conn_id, OLD.host, OLD.port, -1, OLD.rcv_id);
END IF;
ELSIF NEW.rcv_service_assoc != 0 AND NEW.deleted = 0 THEN
PERFORM update_aggregates(NEW.conn_id, NEW.host, NEW.port, 1, NEW.rcv_id);
END IF;
RETURN NEW;
END;
$$;

CREATE TRIGGER tr_rcv_queue_insert
AFTER INSERT ON rcv_queues
FOR EACH ROW EXECUTE PROCEDURE on_rcv_queue_insert();

CREATE TRIGGER tr_rcv_queue_delete
AFTER DELETE ON rcv_queues
FOR EACH ROW EXECUTE PROCEDURE on_rcv_queue_delete();

CREATE TRIGGER tr_rcv_queue_update
AFTER UPDATE ON rcv_queues
FOR EACH ROW EXECUTE PROCEDURE on_rcv_queue_update();
|]

down_m20251020_service_certs :: Text
down_m20251020_service_certs =
T.pack
[r|
DROP TRIGGER tr_rcv_queue_insert ON rcv_queues;
DROP TRIGGER tr_rcv_queue_delete ON rcv_queues;
DROP TRIGGER tr_rcv_queue_update ON rcv_queues;

DROP FUNCTION on_rcv_queue_insert;
DROP FUNCTION on_rcv_queue_delete;
DROP FUNCTION on_rcv_queue_update;

DROP FUNCTION update_aggregates;

ALTER TABLE rcv_queues DROP COLUMN rcv_service_assoc;

DROP INDEX idx_server_certs_host_port;
DROP INDEX idx_server_certs_user_id_host_port;
DROP TABLE client_services;
|]
46 changes: 46 additions & 0 deletions src/Simplex/Messaging/Agent/Store/Postgres/Migrations/Util.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
{-# LANGUAGE QuasiQuotes #-}

module Simplex.Messaging.Agent.Store.Postgres.Migrations.Util where

import Data.Text (Text)
import qualified Data.Text as T
import Text.RawString.QQ (r)

-- xor_combine is only applied to locally computed md5 hashes (128 bits/16 bytes),
-- so it is safe to require that all values are of the same length.
createXorHashFuncs :: Text
createXorHashFuncs =
T.pack
[r|
CREATE OR REPLACE FUNCTION xor_combine(state BYTEA, value BYTEA) RETURNS BYTEA
LANGUAGE plpgsql IMMUTABLE STRICT
AS $$
DECLARE
result BYTEA := state;
i INTEGER;
len INTEGER := octet_length(value);
BEGIN
IF octet_length(state) != len THEN
RAISE EXCEPTION 'Inputs must be equal length (% != %)', octet_length(state), len;
END IF;
FOR i IN 0..len-1 LOOP
result := set_byte(result, i, get_byte(state, i) # get_byte(value, i));
END LOOP;
RETURN result;
END;
$$;

CREATE OR REPLACE AGGREGATE xor_aggregate(BYTEA) (
SFUNC = xor_combine,
STYPE = BYTEA,
INITCOND = '\x00000000000000000000000000000000' -- 16 bytes
);
|]

dropXorHashFuncs :: Text
dropXorHashFuncs =
T.pack
[r|
DROP AGGREGATE xor_aggregate(BYTEA);
DROP FUNCTION xor_combine;
|]
Loading
Loading