Skip to content

Commit ba652f1

Browse files
authored
Pre-connect final API (#685)
1 parent 2530485 commit ba652f1

File tree

15 files changed

+614
-251
lines changed

15 files changed

+614
-251
lines changed

Sources/LiveKit/Core/PreConnectAudioBuffer.swift

Lines changed: 102 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -17,63 +17,93 @@
1717
import AVFAudio
1818
import Foundation
1919

20-
/// A buffer that captures audio before connecting to the server,
21-
/// and sends it on certain ``RoomDelegate`` events.
20+
/// A buffer that captures audio before connecting to the server.
2221
@objc
23-
public final class PreConnectAudioBuffer: NSObject, Loggable {
24-
/// The default participant attribute key used to indicate that the audio buffer is active.
25-
@objc
26-
public static let attributeKey = "lk.agent.pre-connect-audio"
22+
public final class PreConnectAudioBuffer: NSObject, Sendable, Loggable {
23+
public typealias OnError = @Sendable (Error) -> Void
24+
25+
/// Default values used when the buffer creates its own recorder and timeout.
public enum Constants {
    /// Maximum recording size in bytes (10MB).
    public static let maxSize: Int = 10 * 1024 * 1024
    /// Sample rate passed to the recorder that is created by default.
    public static let sampleRate: Int = 24000
    /// Default timeout, in seconds, used by `startRecording`.
    public static let timeout: TimeInterval = 10
}
2730

2831
/// The default data topic used to send the audio buffer.
2932
@objc
3033
public static let dataTopic = "lk.agent.pre-connect-audio-buffer"
3134

32-
/// The room instance to listen for events.
35+
/// The room instance to send the audio buffer to.
3336
@objc
34-
public let room: Room?
37+
public var room: Room? { state.room }
3538

3639
/// The audio recorder instance.
3740
@objc
38-
public let recorder: LocalAudioTrackRecorder
41+
public var recorder: LocalAudioTrackRecorder? { state.recorder }
3942

4043
private let state = StateSync<State>(State())
4144
/// Mutable values kept inside the `StateSync` wrapper.
private struct State {
    /// Room the buffer sends to; held weakly.
    weak var room: Room?
    /// Recorder used by the current (or most recent) capture.
    var recorder: LocalAudioTrackRecorder?
    /// Stream returned by the recorder when capturing started.
    var audioStream: LocalAudioTrackRecorder.Stream?
    /// Task that stops and flushes the recording once the timeout elapses.
    var timeoutTask: Task<Void, Error>?
    /// Set once `sendAudioData` has been entered for the current recording.
    var sent = false
    /// Handler invoked when sending the audio buffer fails.
    var onError: OnError?
}
4452

4553
/// Initialize the audio buffer with a room instance.
4654
/// - Parameters:
47-
/// - room: The room instance to listen for events.
48-
/// - recorder: The audio recorder to use for capturing.
55+
/// - room: The room instance to send the audio buffer to.
56+
/// - onError: The error handler to call when an error occurs while sending the audio buffer.
4957
@objc
50-
public init(room: Room?,
51-
recorder: LocalAudioTrackRecorder = LocalAudioTrackRecorder(
52-
track: LocalAudioTrack.createTrack(),
53-
format: .pcmFormatInt16, // supported by agent plugins
54-
sampleRate: 24000, // supported by agent plugins
55-
maxSize: 10 * 1024 * 1024 // arbitrary max recording size of 10MB
56-
))
57-
{
58-
self.room = room
59-
self.recorder = recorder
58+
public init(room: Room?, onError: OnError? = nil) {
    // `state` has a default value, so it can be filled in before `super.init()`.
    state.mutate { initial in
        initial.room = room
        initial.onError = onError
    }
    super.init()
}
6265

6366
deinit {
    // Stop a capture that is still running; the default `flush: false` leaves the stream untouched.
    stopRecording()
}
6769

68-
/// Start capturing audio and listening to ``RoomDelegate`` events.
6970
@objc
70-
public func startRecording() async throws {
71+
public func setErrorHandler(_ onError: OnError?) {
    // Replaces the stored handler; passing `nil` clears it.
    state.mutate { current in
        current.onError = onError
    }
}
74+
75+
/// Start capturing audio.
/// - Parameters:
///   - timeout: The timeout, in seconds, for the remote participant to subscribe to the audio track.
///   The room connection needs to be established and the remote participant needs to subscribe to the audio track
///   before the timeout is reached. Otherwise, the audio stream will be flushed without sending.
///   Fractional values are honored; negative or NaN values are treated as `0`.
///   - recorder: Optional custom recorder instance. If not provided, a new one will be created.
/// - Throws: Any error thrown while starting the recorder.
@objc
public func startRecording(timeout: TimeInterval = Constants.timeout, recorder: LocalAudioTrackRecorder? = nil) async throws {
    room?.add(delegate: self)

    let roomOptions = room?._state.roomOptions
    let newRecorder = recorder ?? LocalAudioTrackRecorder(
        track: LocalAudioTrack.createTrack(options: roomOptions?.defaultAudioCaptureOptions,
                                           reportStatistics: roomOptions?.reportRemoteTrackStatistics ?? false),
        format: .pcmFormatInt16,
        sampleRate: Constants.sampleRate,
        maxSize: Constants.maxSize
    )

    let stream = try await newRecorder.start()
    log("Started capturing audio", .info)

    // Scale to nanoseconds BEFORE converting to an integer so fractional seconds are kept
    // (`UInt64(0.5)` is 0), and clamp so a negative, NaN or infinite value cannot trap
    // in the `UInt64` conversion. The upper bound is capped at `Int64.max` nanoseconds.
    let requestedNanoseconds = timeout * Double(NSEC_PER_SEC)
    let maxSleepNanoseconds = UInt64(Int64.max)
    let timeoutNanoseconds: UInt64
    if requestedNanoseconds.isNaN || requestedNanoseconds <= 0 {
        timeoutNanoseconds = 0
    } else if requestedNanoseconds >= Double(maxSleepNanoseconds) {
        timeoutNanoseconds = maxSleepNanoseconds
    } else {
        timeoutNanoseconds = UInt64(requestedNanoseconds)
    }

    state.mutate { state in
        // Cancel and replace the previous timeout in the same critical section,
        // so a concurrent call cannot leave a stale timeout task running.
        state.timeoutTask?.cancel()
        state.recorder = newRecorder
        state.audioStream = stream
        state.timeoutTask = Task { [weak self] in
            try await Task.sleep(nanoseconds: timeoutNanoseconds)
            try Task.checkCancellation()
            self?.stopRecording(flush: true)
        }
        state.sent = false
    }
}
79109

@@ -82,66 +112,79 @@ public final class PreConnectAudioBuffer: NSObject, Loggable {
82112
/// - flush: If `true`, the audio stream will be flushed immediately without sending.
83113
@objc
84114
public func stopRecording(flush: Bool = false) {
    // Nothing to do unless a recorder exists and is currently recording.
    guard let activeRecorder = recorder, activeRecorder.isRecording else { return }

    activeRecorder.stop()
    log("Stopped capturing audio", .info)

    // Without `flush` the captured stream is left in place.
    guard flush, let pendingStream = state.audioStream else { return }

    log("Flushing audio stream", .info)
    // Drain the stream in the background; the elements are discarded.
    Task {
        for await _ in pendingStream {}
    }
    room?.remove(delegate: self)
}
93-
}
94-
95-
// MARK: - RoomDelegate
96-
97-
extension PreConnectAudioBuffer: RoomDelegate {
98-
public func roomDidConnect(_ room: Room) {
99-
Task {
100-
try? await setParticipantAttribute(room: room)
101-
}
102-
}
103-
104-
public func room(_ room: Room, participant _: LocalParticipant, remoteDidSubscribeTrack _: LocalTrackPublication) {
105-
stopRecording()
106-
Task {
107-
try? await sendAudioData(to: room)
108-
}
109-
}
110-
111-
/// Set the participant attribute to indicate that the audio buffer is active.
112-
/// - Parameters:
113-
/// - key: The key to set the attribute.
114-
/// - room: The room instance to set the attribute.
115-
@objc
116-
public func setParticipantAttribute(key _: String = attributeKey, room: Room) async throws {
117-
var attributes = room.localParticipant.attributes
118-
attributes[Self.attributeKey] = "true"
119-
try await room.localParticipant.set(attributes: attributes)
120-
log("Set participant attribute", .info)
121-
}
122128

123129
/// Send the audio data to the room.
///
/// Only the first call after a recording is started proceeds; later calls return
/// without doing anything. A call with no agents returns before claiming the send.
/// - Parameters:
///   - room: The room instance to send the audio data.
///   - agents: The agents to send the audio data to.
///   - topic: The topic to send the audio data.
/// - Throws: `LiveKitError` with `.invalidState` when the recorder or the audio stream is missing,
///   `LiveKitError` with `.unknown` when 1024 bytes or fewer were captured,
///   or any error thrown while collecting the stream or writing it to the room.
@objc
public func sendAudioData(to room: Room, agents: [Participant.Identity], on topic: String = dataTopic) async throws {
    guard !agents.isEmpty else { return }

    // Read and set `sent` in a single critical section. A separate check followed by
    // a later write would let two concurrent callers both pass the check.
    var alreadySent = false
    state.mutate {
        alreadySent = $0.sent
        $0.sent = true
    }
    guard !alreadySent else { return }

    guard let recorder else {
        throw LiveKitError(.invalidState, message: "Recorder is nil")
    }

    guard let audioStream = state.audioStream else {
        throw LiveKitError(.invalidState, message: "Audio stream is nil")
    }

    // Collect up front so the total size can be announced in the stream options.
    let audioData = try await audioStream.collect()
    guard audioData.count > 1024 else {
        throw LiveKitError(.unknown, message: "Audio data size too small, nothing to send")
    }

    let streamOptions = StreamByteOptions(
        topic: topic,
        attributes: [
            "sampleRate": "\(recorder.sampleRate)",
            "channels": "\(recorder.channels)",
            "trackId": recorder.track.sid?.stringValue ?? "",
        ],
        destinationIdentities: agents,
        totalSize: audioData.count
    )
    let writer = try await room.localParticipant.streamBytes(options: streamOptions)
    try await writer.write(audioData)
    try await writer.close()
    log("Sent \(recorder.duration(audioData.count))s = \(audioData.count / 1024)KB of audio data to \(agents.count) agent(s) \(agents)", .info)
}
169+
}
144170

145-
room.remove(delegate: self)
171+
extension PreConnectAudioBuffer: RoomDelegate {
    /// Stops capturing once a remote participant has subscribed to the local track.
    public func room(_: Room, participant _: LocalParticipant, remoteDidSubscribeTrack _: LocalTrackPublication) {
        log("Subscribed by remote participant, stopping audio", .info)
        stopRecording()
    }

    /// Sends the captured audio to an agent participant when it becomes active.
    /// Failures are logged and forwarded to the error handler, if one is set.
    public func room(_ room: Room, participant: Participant, didUpdateState state: ParticipantState) {
        let isActiveAgent = participant.kind == .agent && state == .active
        guard isActiveAgent, let agent = participant.identity else { return }

        log("Detected active agent participant: \(agent), sending audio", .info)

        Task {
            do {
                try await sendAudioData(to: room, agents: [agent])
            } catch {
                log("Unable to send preconnect audio: \(error)", .error)
                // `self.state` is the buffer's own state; the `state` parameter shadows it here.
                self.state.onError?(error)
            }
        }
    }
}

Sources/LiveKit/Core/Room+PreConnect.swift

Lines changed: 45 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,51 @@
1717
import Foundation
1818

1919
public extension Room {
    /// Starts a pre-connect audio sequence that will automatically be cleaned up
    /// when the operation fails.
    ///
    /// - Parameters:
    ///   - timeout: The timeout for the remote participant to subscribe to the audio track.
    ///   The room connection needs to be established and the remote participant needs to subscribe to the audio track
    ///   before the timeout is reached. Otherwise, the audio stream will be flushed without sending.
    ///   Defaults to ``PreConnectAudioBuffer/Constants/timeout``.
    ///   - operation: The operation to perform while audio is being captured.
    ///   - onError: The error handler to call when an error occurs while sending the audio buffer.
    /// - Returns: The result of the operation.
    /// - Throws: Any error thrown while starting the recording, or the error thrown by `operation`
    ///   (in which case the recording is stopped and flushed first).
    ///
    /// - Example:
    /// ```swift
    /// try await room.withPreConnectAudio {
    ///     // Audio is being captured automatically
    ///     // Perform any other (async) setup here
    ///     guard let connectionDetails = try await tokenService.fetchConnectionDetails(roomName: roomName, participantName: participantName) else {
    ///         return
    ///     }
    ///     try await room.connect(url: connectionDetails.serverUrl, token: connectionDetails.participantToken)
    /// } onError: { error in
    ///     print("Error sending audio buffer: \(error)")
    /// }
    /// ```
    ///
    /// - See: ``PreConnectAudioBuffer``
    /// - Important: Call ``AudioManager/setRecordingAlwaysPreparedMode(_:)`` during app launch sequence to request microphone permissions early.
    ///
    func withPreConnectAudio<T>(timeout: TimeInterval = PreConnectAudioBuffer.Constants.timeout,
                                _ operation: @Sendable @escaping () async throws -> T,
                                onError: PreConnectAudioBuffer.OnError? = nil) async throws -> T
    {
        preConnectBuffer.setErrorHandler(onError)
        try await preConnectBuffer.startRecording(timeout: timeout)

        do {
            return try await operation()
        } catch {
            // The operation failed, so stop capturing and discard what was recorded.
            preConnectBuffer.stopRecording(flush: true)
            throw error
        }
    }

    /// Starts capturing audio before connecting, without any automatic cleanup on failure.
    /// - Parameter timeout: Passed through to `PreConnectAudioBuffer.startRecording(timeout:)`.
    ///   Defaults to ``PreConnectAudioBuffer/Constants/timeout``.
    /// - Throws: Any error thrown while starting the recording.
    @available(*, deprecated, message: "Use withPreConnectAudio instead")
    func startCapturingBeforeConnecting(timeout: TimeInterval = PreConnectAudioBuffer.Constants.timeout) async throws {
        try await preConnectBuffer.startRecording(timeout: timeout)
    }
}

Sources/LiveKit/Core/Room.swift

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -354,20 +354,30 @@ public class Room: NSObject, @unchecked Sendable, ObservableObject, Loggable {
354354
let enableMicrophone = _state.connectOptions.enableMicrophone
355355
log("Concurrent enable microphone mode: \(enableMicrophone)")
356356

357-
let createMicrophoneTrackTask: Task<LocalTrack, any Error>? = enableMicrophone ? Task {
358-
let localTrack = LocalAudioTrack.createTrack(options: _state.roomOptions.defaultAudioCaptureOptions,
359-
reportStatistics: _state.roomOptions.reportRemoteTrackStatistics)
360-
// Initializes AudioDeviceModule's recording
361-
try await localTrack.start()
362-
return localTrack
363-
} : nil
357+
let createMicrophoneTrackTask: Task<LocalTrack, any Error>? = {
358+
if let recorder = preConnectBuffer.recorder, recorder.isRecording {
359+
return Task {
360+
recorder.track
361+
}
362+
} else if enableMicrophone {
363+
return Task {
364+
let localTrack = LocalAudioTrack.createTrack(options: _state.roomOptions.defaultAudioCaptureOptions,
365+
reportStatistics: _state.roomOptions.reportRemoteTrackStatistics)
366+
// Initializes AudioDeviceModule's recording
367+
try await localTrack.start()
368+
return localTrack
369+
}
370+
} else {
371+
return nil
372+
}
373+
}()
364374

365375
do {
366376
try await fullConnectSequence(url, token)
367377

368378
if let createMicrophoneTrackTask, !createMicrophoneTrackTask.isCancelled {
369379
let track = try await createMicrophoneTrackTask.value
370-
try await localParticipant._publish(track: track)
380+
try await localParticipant._publish(track: track, options: _state.roomOptions.defaultAudioPublishOptions.withPreconnect(preConnectBuffer.recorder?.isRecording ?? false))
371381
}
372382

373383
// Connect sequence successful
@@ -437,10 +447,6 @@ extension Room {
437447
e2eeManager.cleanUp()
438448
}
439449

440-
if disconnectError != nil {
441-
preConnectBuffer.stopRecording(flush: true)
442-
}
443-
444450
// Reset state
445451
_state.mutate {
446452
// if isFullReconnect, keep connection related states

Sources/LiveKit/Extensions/AVAudioPCMBuffer.swift

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,3 +167,16 @@ public extension AVAudioPCMBuffer {
167167
}
168168
}
169169
}
170+
171+
extension AVAudioCommonFormat {
    /// Size in bytes of one sample in this format.
    /// Returns `0` for `.otherFormat` and for any case not known to this build.
    var bytesPerSample: Int {
        switch self {
        case .pcmFormatInt16:
            return 2
        case .pcmFormatInt32, .pcmFormatFloat32:
            return 4
        case .pcmFormatFloat64:
            return 8
        case .otherFormat:
            return 0
        @unknown default:
            return 0
        }
    }
}

0 commit comments

Comments
 (0)