@@ -1498,15 +1498,15 @@ client
14981498 | clntIds -> pure $ ERR AUTH -- no retry on collision if sender ID is client-supplied
14991499 | otherwise -> tryCreate (n - 1 )
15001500 Left e -> pure $ ERR e
1501- Right q -> do
1501+ Right _q -> do
15021502 stats <- asks serverStats
15031503 incStat $ qCreated stats
15041504 incStat $ qCount stats
15051505 -- TODO [notifications]
15061506 -- when (isJust ntf) $ incStat $ ntfCreated stats
15071507 case subMode of
15081508 SMOnlyCreate -> pure ()
1509- SMSubscribe -> void $ subscribeQueue q qr -- no need to check if message is available, it's a new queue
1509+ SMSubscribe -> subscribeNewQueue rcvId qr -- no need to check if message is available, it's a new queue
15101510 pure $ IDS QIK {rcvId, sndId, rcvPublicDhKey, queueMode, linkId = fst <$> queueData, serviceId = rcvServiceId} -- , serverNtfCreds = snd <$> ntf
15111511 (corrId,entId,) <$> tryCreate (3 :: Int )
15121512
@@ -1569,32 +1569,35 @@ client
15691569
15701570 -- TODO [certs rcv] if serviceId is passed, associate with the service and respond with SOK
15711571 subscribeQueueAndDeliver :: StoreQueue s -> QueueRec -> M s ResponseAndMessage
1572- subscribeQueueAndDeliver q qr =
1573- liftIO (TM. lookupIO rId $ subscriptions clnt) >>= \ case
1574- Nothing -> subscribeQueue q qr >>= deliver True
1572+ subscribeQueueAndDeliver q qr@ QueueRec {rcvServiceId} =
1573+ liftIO (TM. lookupIO entId $ subscriptions clnt) >>= \ case
1574+ Nothing ->
1575+ sharedSubscribeQueue SRecipientService q rcvServiceId subscribers subscriptions serviceSubsCount (newSubscription NoSub ) rcvServices >>= \ case
1576+ Left e -> pure (err e, Nothing )
1577+ Right s -> deliver s
15751578 Just s@ Sub {subThread} -> do
15761579 stats <- asks serverStats
15771580 case subThread of
15781581 ProhibitSub -> do
15791582 -- cannot use SUB in the same connection where GET was used
15801583 incStat $ qSubProhibited stats
1581- pure ((corrId, rId, ERR $ CMD PROHIBITED ), Nothing )
1584+ pure (err ( CMD PROHIBITED ), Nothing )
15821585 _ -> do
15831586 incStat $ qSubDuplicate stats
1584- atomically (tryTakeTMVar $ delivered s) >> deliver False (Just s)
1587+ let clntServiceId = (\ THClientService {serviceId} -> serviceId) <$> service
1588+ atomically (tryTakeTMVar $ delivered s) >> deliver (True , Just s, clntServiceId)
15851589 where
1586- rId = recipientId q
1587- deliver :: Bool -> Maybe Sub -> M s ResponseAndMessage
1588- deliver inc sub_ = do
1590+ deliver :: (Bool , Maybe Sub , Maybe ServiceId ) -> M s ResponseAndMessage
1591+ deliver (hasSub, sub_, serviceId) = do
15891592 stats <- asks serverStats
1590- fmap (either (\ e -> ((corrId, rId, ERR e), Nothing )) id ) $ liftIO $ runExceptT $ do
1593+ fmap (either ((, Nothing ) . err ) id ) $ liftIO $ runExceptT $ do
15911594 msg_ <- tryPeekMsg ms q
15921595 msg' <- forM msg_ $ \ msg -> do
1593- sub <- maybe (atomically $ getSub rId ) pure sub_
1596+ sub <- maybe (atomically $ getSub entId ) pure sub_
15941597 void $ atomically $ setDelivered sub msg
1595- when inc $ incStat $ qSub stats
1596- pure (rId , encryptMsg qr msg)
1597- pure ((corrId, rId , SOK Nothing ), msg')
1598+ unless hasSub $ incStat $ qSub stats
1599+ pure (entId , encryptMsg qr msg)
1600+ pure ((corrId, entId , SOK serviceId ), msg')
15981601
15991602 getSub :: RecipientId -> STM Sub
16001603 getSub rId =
@@ -1605,14 +1608,14 @@ client
16051608 TM. insert rId sub $ subscriptions clnt
16061609 pure sub
16071610
1608- subscribeQueue :: StoreQueue s -> QueueRec -> M s (Maybe Sub ) -- (Either ErrorType (Bool, Maybe Sub, Maybe ServiceId) )
1609- subscribeQueue q QueueRec {rcvServiceId} = do
1610- -- sharedSubscribeQueue SRecipientService q rcvServiceId subscribers subscriptions serviceSubsCount (newSubscription NoSub) rcvServices
1611- let rId = recipientId q
1612- sub <- atomically $ newSubscription NoSub
1613- atomically $ TM. insert rId sub $ subscriptions clnt
1614- atomically $ writeTQueue (subQ subscribers) ( CSClient rId rcvServiceId Nothing , clientId)
1615- pure $ Just sub
1611+ subscribeNewQueue :: RecipientId -> QueueRec -> M s ()
1612+ subscribeNewQueue rId QueueRec {rcvServiceId} = do
1613+ case rcvServiceId of
1614+ Just _ -> atomically $ modifyTVar' (serviceSubsCount clnt) ( + 1 )
1615+ Nothing -> do
1616+ sub <- atomically $ newSubscription NoSub
1617+ atomically $ TM. insert rId sub $ subscriptions clnt
1618+ atomically $ writeTQueue (subQ subscribers) ( CSClient rId rcvServiceId rcvServiceId, clientId)
16161619
16171620 -- clients that use GET are not added to server subscribers
16181621 getMessage :: StoreQueue s -> QueueRec -> M s (Transmission BrokerMsg )
@@ -1867,10 +1870,13 @@ client
18671870 tryDeliverMessage msg =
18681871 -- the subscribed client var is read outside of STM to avoid transaction cost
18691872 -- in case no client is subscribed.
1870- getSubscribedClient rId (queueSubscribers subscribers)
1873+ getSubscribed
18711874 $>>= atomically . deliverToSub
18721875 >>= mapM_ forkDeliver
18731876 where
1877+ getSubscribed = case rcvServiceId qr of
1878+ Just serviceId -> getSubscribedClient serviceId $ serviceSubscribers subscribers
1879+ Nothing -> getSubscribedClient rId $ queueSubscribers subscribers
18741880 rId = recipientId q
18751881 deliverToSub rcv =
18761882 -- reading client TVar in the same transaction,
@@ -1879,6 +1885,7 @@ client
18791885 -- the new client will receive message in response to SUB.
18801886 readTVar rcv
18811887 $>>= \ rc@ Client {subscriptions = subs, sndQ = sndQ'} -> TM. lookup rId subs
1888+ >>= maybe (newServiceDeliverySub subs) (pure . Just )
18821889 $>>= \ s@ Sub {subThread, delivered} -> case subThread of
18831890 ProhibitSub -> pure Nothing
18841891 ServerSub st -> readTVar st >>= \ case
@@ -1891,6 +1898,12 @@ client
18911898 (writeTVar st SubPending $> Just (rc, s, st))
18921899 (deliver sndQ' s $> Nothing )
18931900 _ -> pure Nothing
1901+ newServiceDeliverySub subs
1902+ | isJust (rcvServiceId qr) = do
1903+ sub <- newSubscription NoSub
1904+ TM. insert rId sub subs
1905+ pure $ Just sub
1906+ | otherwise = pure Nothing
18941907 deliver sndQ' s = do
18951908 let encMsg = encryptMsg qr msg
18961909 writeTBQueue sndQ' ([(NoCorrId , rId, MSG encMsg)], [] )
0 commit comments