Skip to content

Commit 0854fe1

Browse files
authored
Ensure subject is completed when unsubscribed (#1135)
1 parent 911c4a4 commit 0854fe1

File tree

2 files changed

+44
-29
lines changed

2 files changed

+44
-29
lines changed

.changeset/afraid-glasses-cross.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@livekit/components-core": patch
3+
---
4+
5+
Ensure subject is completed when unsubscribed

packages/core/src/observables/room.ts

Lines changed: 39 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import type { Participant, TrackPublication } from 'livekit-client';
22
import { LocalParticipant, Room, RoomEvent, Track } from 'livekit-client';
33
import type { Subscriber, Subscription } from 'rxjs';
4-
import { concat, filter, finalize, map, Observable, startWith, Subject } from 'rxjs';
4+
import { concat, filter, map, Observable, startWith } from 'rxjs';
55
// @ts-ignore some module resolutions (other than 'node') choke on this
66
import type { RoomEventCallbacks } from 'livekit-client/dist/src/room/Room';
77
import { log } from '../logger';
@@ -182,38 +182,48 @@ export function createMediaDeviceObserver(
182182
onError?: (e: Error) => void,
183183
requestPermissions = true,
184184
) {
185-
const onDeviceChange = async () => {
186-
try {
187-
const newDevices = await Room.getLocalDevices(kind, requestPermissions);
188-
deviceSubject.next(newDevices);
189-
} catch (e: any) {
190-
onError?.(e);
185+
// Initial devices fetch observable
186+
const initialDevices$ = new Observable<MediaDeviceInfo[]>((subscriber) => {
187+
Room.getLocalDevices(kind, requestPermissions)
188+
.then((devices) => {
189+
subscriber.next(devices);
190+
subscriber.complete();
191+
})
192+
.catch((e) => {
193+
onError?.(e);
194+
subscriber.next([] as MediaDeviceInfo[]);
195+
subscriber.complete();
196+
});
197+
});
198+
199+
// Device change observable
200+
const deviceChanges$ = new Observable<MediaDeviceInfo[]>((subscriber) => {
201+
const onDeviceChange = async () => {
202+
try {
203+
const newDevices = await Room.getLocalDevices(kind, requestPermissions);
204+
subscriber.next(newDevices);
205+
} catch (e: any) {
206+
onError?.(e);
207+
}
208+
};
209+
210+
if (typeof window !== 'undefined') {
211+
if (!window.isSecureContext) {
212+
throw new Error(
213+
`Accessing media devices is available only in secure contexts (HTTPS and localhost), in some or all supporting browsers. See: https://developer.mozilla.org/en-US/docs/Web/API/Navigator/mediaDevices`,
214+
);
215+
}
216+
navigator?.mediaDevices?.addEventListener('devicechange', onDeviceChange);
191217
}
192-
};
193-
const deviceSubject = new Subject<MediaDeviceInfo[]>();
194218

195-
const observable = deviceSubject.pipe(
196-
finalize(() => {
219+
// Return unsubscribe function that cleans up when the observable is unsubscribed
220+
return () => {
197221
navigator?.mediaDevices?.removeEventListener('devicechange', onDeviceChange);
198-
}),
199-
);
222+
};
223+
});
200224

201-
if (typeof window !== 'undefined') {
202-
if (!window.isSecureContext) {
203-
throw new Error(
204-
`Accessing media devices is available only in secure contexts (HTTPS and localhost), in some or all supporting browsers. See: https://developer.mozilla.org/en-US/docs/Web/API/Navigator/mediaDevices`,
205-
);
206-
}
207-
navigator?.mediaDevices?.addEventListener('devicechange', onDeviceChange);
208-
}
209-
// because we rely on an async function, concat the promise to retrieve the initial values with the observable
210-
return concat(
211-
Room.getLocalDevices(kind, requestPermissions).catch((e) => {
212-
onError?.(e);
213-
return [] as MediaDeviceInfo[];
214-
}),
215-
observable,
216-
);
225+
// Combine both observables
226+
return concat(initialDevices$, deviceChanges$);
217227
}
218228

219229
export function createDataObserver(room: Room) {

0 commit comments

Comments
 (0)