@@ -1579,23 +1579,36 @@ client
15791579 pure ((corrId, rId, ERR $ CMD PROHIBITED ), Nothing )
15801580 _ -> do
15811581 incStat $ qSubDuplicate stats
1582- atomically (tryTakeTMVar $ delivered s) >> deliver False s
1582+ atomically (tryTakeTMVar $ delivered s) >> deliver False ( Just s)
15831583 where
15841584 rId = recipientId q
1585- deliver :: Bool -> Sub -> M s ResponseAndMessage
1586- deliver inc sub = do
1585+ deliver :: Bool -> Maybe Sub -> M s ResponseAndMessage
1586+ deliver inc sub_ = do
15871587 stats <- asks serverStats
15881588 fmap (either (\ e -> ((corrId, rId, ERR e), Nothing )) id ) $ liftIO $ runExceptT $ do
15891589 msg_ <- tryPeekMsg ms q
1590- mapM_ (\ msg -> atomically (setDelivered sub msg) >> when inc (incStat $ qSub stats)) msg_
1591- pure ((corrId, rId, SOK Nothing ), (rId,) . encryptMsg qr <$> msg_)
1590+ msg' <- forM msg_ $ \ msg -> do
1591+ sub <- maybe (atomically $ getSub rId) pure sub_
1592+ void $ atomically $ setDelivered sub msg
1593+ when inc $ incStat $ qSub stats
1594+ pure (rId, encryptMsg qr msg)
1595+ pure ((corrId, rId, SOK Nothing ), msg')
1596+
1597+ getSub :: RecipientId -> STM Sub
1598+ getSub rId =
1599+ TM. lookup rId subscriptions >>= \ case
1600+ Just sub -> pure sub
1601+ Nothing -> do
1602+ sub <- newSubscription NoSub
1603+ TM. insert rId sub subscriptions
1604+ pure sub
15921605
1593- subscribeQueue :: RecipientId -> QueueRec -> M s Sub
1594- subscribeQueue rId QueueRec {rcvServiceId} = atomically $ do
1595- writeTQueue (subQ subscribers) ( CSClient rId rcvServiceId Nothing , clientId)
1596- sub <- newSubscription NoSub
1597- TM. insert rId sub subscriptions
1598- pure sub
1606+ subscribeQueue :: RecipientId -> QueueRec -> M s ( Maybe Sub )
1607+ subscribeQueue rId QueueRec {rcvServiceId} = do
1608+ sub <- atomically $ newSubscription NoSub
1609+ atomically $ TM. insert rId sub subscriptions
1610+ atomically $ writeTQueue (subQ subscribers) ( CSClient rId rcvServiceId Nothing , clientId)
1611+ pure $ Just sub
15991612
16001613 -- clients that use GET are not added to server subscribers
16011614 getMessage :: StoreQueue s -> QueueRec -> M s (Transmission BrokerMsg )
@@ -1653,58 +1666,75 @@ client
16531666 else liftIO (updateQueueTime (queueStore ms) q t) >>= either (pure . err') (action q)
16541667
16551668 subscribeNotifications :: StoreQueue s -> NtfCreds -> M s BrokerMsg
1656- subscribeNotifications q NtfCreds {ntfServiceId} = do
1669+ subscribeNotifications q NtfCreds {ntfServiceId} =
1670+ sharedSubscribeQueue SNotifierService q ntfServiceId ntfSubscribers ntfSubscriptions ntfServiceSubsCount (pure () ) ntfServices >>= \ case
1671+ Left e -> pure $ ERR e
1672+ Right (hasSub, _, serviceId) -> do
1673+ when (isNothing serviceId) $
1674+ asks serverStats >>= incStat . (if hasSub then ntfSubDuplicate else ntfSub)
1675+ pure $ SOK serviceId
1676+
1677+ sharedSubscribeQueue ::
1678+ (PartyI p , ServiceParty p ) =>
1679+ SParty p ->
1680+ StoreQueue s ->
1681+ Maybe ServiceId ->
1682+ ServerSubscribers s ->
1683+ TMap QueueId sub ->
1684+ TVar Int64 ->
1685+ STM sub ->
1686+ (ServerStats -> ServiceStats ) ->
1687+ M s (Either ErrorType (Bool , Maybe sub , Maybe ServiceId ))
1688+ sharedSubscribeQueue party q queueServiceId srvSubscribers clientSubs clientServiceSubs mkSub servicesSel = do
16571689 stats <- asks serverStats
1658- let incNtfSrvStat sel = incStat $ sel $ ntfServices stats
1690+ let incSrvStat sel = incStat $ sel $ servicesSel stats
16591691 case service of
16601692 Just THClientService {serviceId}
1661- | ntfServiceId == Just serviceId -> do
1693+ | queueServiceId == Just serviceId -> do
16621694 -- duplicate queue-service association - can only happen in case of response error/timeout
16631695 hasSub <- atomically $ ifM hasServiceSub (pure True ) (False <$ newServiceQueueSub)
16641696 unless hasSub $ do
1665- incNtfSrvStat srvSubCount
1666- incNtfSrvStat srvSubQueues
1667- incNtfSrvStat srvAssocDuplicate
1668- pure $ SOK $ Just serviceId
1669- | otherwise ->
1670- -- new or updated queue-service association
1671- liftIO (setQueueService (queueStore ms) q SNotifierService ( Just serviceId)) >>= \ case
1672- Left e -> pure $ ERR e
1673- Right () -> do
1674- hasSub <- atomically $ ( <$ newServiceQueueSub) =<< hasServiceSub
1675- unless hasSub $ incNtfSrvStat srvSubCount
1676- incNtfSrvStat srvSubQueues
1677- incNtfSrvStat $ maybe srvAssocNew (const srvAssocUpdated) ntfServiceId
1678- pure $ SOK $ Just serviceId
1697+ incSrvStat srvSubCount
1698+ incSrvStat srvSubQueues
1699+ incSrvStat srvAssocDuplicate
1700+ pure $ Right (hasSub, Nothing , Just serviceId)
1701+ | otherwise -> do
1702+ runExceptT $ do
1703+ -- new or updated queue-service association
1704+ ExceptT $ liftIO $ setQueueService (queueStore ms) q party ( Just serviceId)
1705+ hasSub <- atomically $ ( <$ newServiceQueueSub) =<< hasServiceSub
1706+ lift $ do
1707+ unless hasSub $ incSrvStat srvSubCount
1708+ incSrvStat srvSubQueues
1709+ incSrvStat $ maybe srvAssocNew (const srvAssocUpdated) queueServiceId
1710+ pure (hasSub, Nothing , Just serviceId)
16791711 where
1680- hasServiceSub = (0 /= ) <$> readTVar ntfServiceSubsCount
1712+ hasServiceSub = (0 /= ) <$> readTVar clientServiceSubs
16811713 -- This function is used when queue is associated with the service.
16821714 newServiceQueueSub = do
1683- writeTQueue (subQ ntfSubscribers) (CSClient entId ntfServiceId (Just serviceId), clientId)
1684- modifyTVar' ntfServiceSubsCount (+ 1 ) -- service count
1685- modifyTVar' (totalServiceSubs ntfSubscribers) (+ 1 ) -- server count for all services
1686- Nothing -> case ntfServiceId of
1687- Just _ ->
1688- liftIO (setQueueService (queueStore ms) q SNotifierService Nothing ) >>= \ case
1689- Left e -> pure $ ERR e
1690- Right () -> do
1691- -- hasSubscription should never be True in this branch, because queue was associated with service.
1692- -- So unless storage and session states diverge, this check is redundant.
1693- hasSub <- atomically $ hasSubscription >>= newSub
1694- incNtfSrvStat srvAssocRemoved
1695- sok hasSub
1696- Nothing -> do
1697- hasSub <- atomically $ ifM hasSubscription (pure True ) (newSub False )
1698- sok hasSub
1715+ writeTQueue (subQ srvSubscribers) (CSClient entId queueServiceId (Just serviceId), clientId)
1716+ modifyTVar' clientServiceSubs (+ 1 ) -- service count
1717+ modifyTVar' (totalServiceSubs srvSubscribers) (+ 1 ) -- server count for all services
1718+ Nothing -> case queueServiceId of
1719+ Just _ -> runExceptT $ do
1720+ -- getSubscription should never be Just in this branch, because queue was associated with service.
1721+ -- So unless storage and session states diverge, this check is redundant.
1722+ ExceptT $ liftIO $ setQueueService (queueStore ms) q party Nothing
1723+ lift $ incSrvStat srvAssocRemoved
1724+ atomically $ getSubscription >>= newSub
1725+ Nothing ->
1726+ atomically $ fmap Right $
1727+ getSubscription >>= maybe (newSub Nothing ) (\ sub -> pure (True , Just sub, Nothing ))
16991728 where
1700- hasSubscription = TM. member entId ntfSubscriptions
1701- newSub hasSub = do
1702- writeTQueue (subQ ntfSubscribers) (CSClient entId ntfServiceId Nothing , clientId)
1703- unless (hasSub) $ TM. insert entId () ntfSubscriptions
1704- pure hasSub
1705- sok hasSub = do
1706- incStat $ if hasSub then ntfSubDuplicate stats else ntfSub stats
1707- pure $ SOK Nothing
1729+ getSubscription = TM. lookup entId clientSubs
1730+ newSub sub_ = do
1731+ writeTQueue (subQ srvSubscribers) (CSClient entId queueServiceId Nothing , clientId)
1732+ case sub_ of
1733+ Just sub -> pure (True , Just sub, Nothing )
1734+ Nothing -> do
1735+ sub <- mkSub
1736+ TM. insert entId sub clientSubs
1737+ pure (False , Just sub, Nothing )
17081738
17091739 subscribeServiceNotifications :: THPeerClientService -> M s BrokerMsg
17101740 subscribeServiceNotifications THClientService {serviceId} = do
0 commit comments