@@ -31,6 +31,7 @@ extension WebRTCCoordinator.StateMachine.Stage {
3131 @Injected ( \. internetConnectionObserver) private var internetConnectionObserver
3232
3333 private let disposableBag = DisposableBag ( )
34+ private var updateSubscriptionsAdapter : WebRTCUpdateSubscriptionsAdapter ?
3435
3536 /// Initializes a new instance of `JoinedStage`.
3637 /// - Parameter context: The context for the joined stage.
@@ -97,10 +98,6 @@ extension WebRTCCoordinator.StateMachine.Stage {
9798
9899 try Task . checkCancellation ( )
99100
100- await observeForSubscriptionUpdates ( )
101-
102- try Task . checkCancellation ( )
103-
104101 await observeConnection ( )
105102
106103 try Task . checkCancellation ( )
@@ -121,10 +118,6 @@ extension WebRTCCoordinator.StateMachine.Stage {
121118
122119 try Task . checkCancellation ( )
123120
124- await observeIncomingVideoQualitySettingsUpdates ( )
125-
126- try Task . checkCancellation ( )
127-
128121 await observeCallSettingsUpdates ( )
129122
130123 try Task . checkCancellation ( )
@@ -134,6 +127,10 @@ extension WebRTCCoordinator.StateMachine.Stage {
134127 try Task . checkCancellation ( )
135128
136129 await configureStatsCollectionAndDelivery ( )
130+
131+ try Task . checkCancellation ( )
132+
133+ await configureUpdateSubscriptions ( )
137134 } catch {
138135 await cleanUpPreviousSessionIfRequired ( )
139136 transitionDisconnectOrError ( error)
@@ -327,24 +324,6 @@ extension WebRTCCoordinator.StateMachine.Stage {
327324 . store ( in: disposableBag)
328325 }
329326
330- /// Observes changes to the list of participants and triggers subscription
331- /// updates when the participant list changes. This ensures that subscriptions
332- /// are kept in sync with the current participants.
333- private func observeForSubscriptionUpdates( ) async {
334- guard
335- let stateAdapter = context. coordinator? . stateAdapter
336- else {
337- return
338- }
339-
340- await stateAdapter
341- . $participants
342- . removeDuplicates ( )
343- . log ( . debug) { " \( $0. count) Participants updated and we update subscriptions now. " }
344- . sinkTask ( storeIn: disposableBag) { [ weak self] _ in await self ? . updateSubscriptions ( ) }
345- . store ( in: disposableBag) // Store the Combine subscription in the disposable bag.
346- }
347-
348327 /// Observes updates to the `callSettings` and ensures that any changes are
349328 /// reflected in the publisher. This ensures that updates to audio, video, and
350329 /// audio output settings are applied correctly during a WebRTC session.
@@ -423,30 +402,6 @@ extension WebRTCCoordinator.StateMachine.Stage {
423402 . store ( in: disposableBag)
424403 }
425404
426- /// Observes changes to the `incomingVideoQualitySettings` and triggers updates when
427- /// the settings change. It ensures that the video subscriptions are updated
428- /// accordingly and forces a participant update to refresh the UI.
429- private func observeIncomingVideoQualitySettingsUpdates( ) async {
430- guard
431- let stateAdapter = context. coordinator? . stateAdapter
432- else {
433- return
434- }
435-
436- await stateAdapter
437- . $incomingVideoQualitySettings
438- . removeDuplicates ( )
439- . log ( . debug) { " Incoming video quality settings updated \( $0) and we update subscriptions now. " }
440- . sinkTask ( storeIn: disposableBag) { [ weak self] _ in
441- guard let self else { return }
442-
443- await updateSubscriptions ( )
444- /// Force a participant update to ensure the UI reflects the new policy.
445- await stateAdapter. enqueue { $0 }
446- }
447- . store ( in: disposableBag)
448- }
449-
450405 /// Configures the collection and delivery of WebRTC statistics by setting up
451406 /// or updating the `WebRTCStatsReporter` for the current session. This ensures
452407 /// that statistics such as network quality, peer connection status, and SFU
@@ -527,58 +482,29 @@ extension WebRTCCoordinator.StateMachine.Stage {
527482 . store ( in: disposableBag)
528483 }
529484
530- // MARK: - Private helpers
531-
532- /// Updates the WebRTC subscriptions based on the current state, including the
533- /// incoming video policy and the list of participants. The method communicates
534- /// with the SFU (Selective Forwarding Unit) adapter to adjust track subscriptions.
485+ /// Configures the subscription adapter responsible for managing WebRTC
486+ /// track subscriptions.
535487 ///
536- /// - Throws: An error if the subscriptions cannot be updated or if the task
537- /// is cancelled during execution.
538- private func updateSubscriptions( ) async {
488+ /// This function initializes the `WebRTCUpdateSubscriptionsAdapter` using
489+ /// the current participants and incoming video quality settings. It ensures
490+ /// that subscription updates are properly set up for the active SFU adapter
491+ /// and session.
492+ private func configureUpdateSubscriptions( ) async {
539493 guard
540- let coordinator = context. coordinator,
541- let sfuAdapter = await coordinator . stateAdapter. sfuAdapter
494+ let stateAdapter = context. coordinator? . stateAdapter ,
495+ let sfuAdapter = await stateAdapter. sfuAdapter
542496 else {
543497 return
544498 }
545499
546- let incomingVideoQualitySettings = await coordinator
547- . stateAdapter
548- . incomingVideoQualitySettings
549-
550- let tracks = await WebRTCJoinRequestFactory ( )
551- . buildSubscriptionDetails (
552- nil ,
553- coordinator: coordinator,
554- incomingVideoQualitySettings: incomingVideoQualitySettings
555- )
556-
557- do {
558- try Task . checkCancellation ( )
559-
560- let participants = await coordinator. stateAdapter. participants
561-
562- log. debug (
563- """
564- Updating subscriptions for \( participants. count - 1 ) participants
565- with incomingVideoQualitySettings: \( incomingVideoQualitySettings) .
566- """ ,
567- subsystems: . webRTC
568- )
569-
570- try await sfuAdapter. updateSubscriptions (
571- tracks: tracks,
572- for: await coordinator. stateAdapter. sessionID
573- )
574- } catch {
575- log. warning (
576- """
577- UpdateSubscriptions failed with error: \( error) .
578- """ ,
579- subsystems: . webRTC
580- )
581- }
500+ updateSubscriptionsAdapter = . init(
501+ participantsPublisher: await stateAdapter. $participants. eraseToAnyPublisher ( ) ,
502+ incomingVideoQualitySettingsPublisher: await stateAdapter
503+ . $incomingVideoQualitySettings
504+ . eraseToAnyPublisher ( ) ,
505+ sfuAdapter: sfuAdapter,
506+ sessionID: await stateAdapter. sessionID
507+ )
582508 }
583509 }
584510}
0 commit comments