diff --git a/src/Infrastructure/BotSharp.Abstraction/Realtime/IStreamChannel.cs b/src/Infrastructure/BotSharp.Abstraction/Realtime/IStreamChannel.cs index 4dfc3e7da..86b9c36e3 100644 --- a/src/Infrastructure/BotSharp.Abstraction/Realtime/IStreamChannel.cs +++ b/src/Infrastructure/BotSharp.Abstraction/Realtime/IStreamChannel.cs @@ -9,5 +9,6 @@ public interface IStreamChannel Task ConnectAsync(string conversationId); Task ReceiveAsync(ArraySegment buffer, CancellationToken cancellation); Task SendAsync(byte[] data, CancellationToken cancellation); + void ClearBuffer(); Task CloseAsync(StreamChannelStatus status, string description, CancellationToken cancellation); } diff --git a/src/Infrastructure/BotSharp.Abstraction/Realtime/Models/RealtimeHubConnection.cs b/src/Infrastructure/BotSharp.Abstraction/Realtime/Models/RealtimeHubConnection.cs index 6201967a9..b3132df71 100644 --- a/src/Infrastructure/BotSharp.Abstraction/Realtime/Models/RealtimeHubConnection.cs +++ b/src/Infrastructure/BotSharp.Abstraction/Realtime/Models/RealtimeHubConnection.cs @@ -9,6 +9,7 @@ public class RealtimeHubConnection public string KeypadInputBuffer { get; set; } = string.Empty; public string CurrentAgentId { get; set; } = null!; public string ConversationId { get; set; } = null!; + public Func OnModelReady { get; set; } = () => string.Empty; public Func OnModelMessageReceived { get; set; } = null!; public Func OnModelAudioResponseDone { get; set; } = null!; public Func OnModelUserInterrupted { get; set; } = null!; diff --git a/src/Infrastructure/BotSharp.Core.Realtime/Services/RealtimeHub.cs b/src/Infrastructure/BotSharp.Core.Realtime/Services/RealtimeHub.cs index 25bdbf2e0..294d77d7e 100644 --- a/src/Infrastructure/BotSharp.Core.Realtime/Services/RealtimeHub.cs +++ b/src/Infrastructure/BotSharp.Core.Realtime/Services/RealtimeHub.cs @@ -56,8 +56,10 @@ await _completer.Connect(_conn, { // Not TriggerModelInference, waiting for user utter. var instruction = await _completer.UpdateSession(_conn); - + var data = _conn.OnModelReady(); + await responseToUser(data); await HookEmitter.Emit(_services, async hook => await hook.OnModeReady(agent, _completer)); + }, onModelAudioDeltaReceived: async (audioDeltaData, itemId) => { diff --git a/src/Infrastructure/BotSharp.Core.Realtime/Services/WaveStreamChannel.cs b/src/Infrastructure/BotSharp.Core.Realtime/Services/WaveStreamChannel.cs index ca8f3eeb7..4b194efbe 100644 --- a/src/Infrastructure/BotSharp.Core.Realtime/Services/WaveStreamChannel.cs +++ b/src/Infrastructure/BotSharp.Core.Realtime/Services/WaveStreamChannel.cs @@ -1,7 +1,6 @@ +using System.Collections.Concurrent; using BotSharp.Abstraction.Realtime.Enums; using NAudio.Wave; -using System.Collections.Concurrent; -using System.IO; namespace BotSharp.Core.Realtime.Services; @@ -11,7 +10,7 @@ public class WaveStreamChannel : IStreamChannel private WaveInEvent _waveIn; private WaveOutEvent _waveOut; private BufferedWaveProvider _bufferedWaveProvider; - private readonly ConcurrentQueue _audioBufferQueue = new ConcurrentQueue(); + private readonly ConcurrentQueue _audioBufferQueue = []; private readonly ILogger _logger; public WaveStreamChannel(IServiceProvider services, ILogger logger) @@ -39,7 +38,7 @@ public async Task ConnectAsync(string conversationId) // Initialize audio output for streaming var waveFormat = new WaveFormat(24000, 16, 1); // 24000 Hz, 16-bit PCM, Mono _bufferedWaveProvider = new BufferedWaveProvider(waveFormat); - _bufferedWaveProvider.BufferLength = 1024 * 512; // Buffer length + _bufferedWaveProvider.BufferLength = 1024 * 1024; // Buffer length _bufferedWaveProvider.DiscardOnBufferOverflow = true; _waveOut = new WaveOutEvent(); @@ -83,6 +82,12 @@ public Task SendAsync(byte[] data, CancellationToken cancellation) return Task.CompletedTask; } + public void ClearBuffer() + { + _bufferedWaveProvider?.ClearBuffer(); + _audioBufferQueue?.Clear(); + } + private void WaveIn_DataAvailable(object? sender, WaveInEventArgs e) { // Add the buffer to the queue diff --git a/src/Plugins/BotSharp.Plugin.OpenAI/Providers/Realtime/RealTimeCompletionProvider.cs b/src/Plugins/BotSharp.Plugin.OpenAI/Providers/Realtime/RealTimeCompletionProvider.cs index 513d5f93e..f4f3120f0 100644 --- a/src/Plugins/BotSharp.Plugin.OpenAI/Providers/Realtime/RealTimeCompletionProvider.cs +++ b/src/Plugins/BotSharp.Plugin.OpenAI/Providers/Realtime/RealTimeCompletionProvider.cs @@ -15,6 +15,7 @@ public class RealTimeCompletionProvider : IRealTimeCompletion protected readonly OpenAiSettings _settings; protected readonly IServiceProvider _services; protected readonly ILogger _logger; + private readonly BotSharpOptions _options; protected string _model = "gpt-4o-mini-realtime-preview"; private ClientWebSocket _webSocket; @@ -22,11 +23,13 @@ public class RealTimeCompletionProvider : IRealTimeCompletion public RealTimeCompletionProvider( OpenAiSettings settings, ILogger logger, - IServiceProvider services) + IServiceProvider services, + BotSharpOptions options) { _settings = settings; _logger = logger; _services = services; + _options = options; } public async Task Connect(RealtimeHubConnection conn, @@ -45,6 +48,7 @@ public async Task Connect(RealtimeHubConnection conn, var settingsService = _services.GetRequiredService(); var settings = settingsService.GetSetting(Provider, _model); + _webSocket?.Dispose(); _webSocket = new ClientWebSocket(); _webSocket.Options.SetRequestHeader("Authorization", $"Bearer {settings.ApiKey}"); _webSocket.Options.SetRequestHeader("OpenAI-Beta", "realtime=v1"); @@ -141,7 +145,7 @@ private async Task ReceiveMessage(RealtimeHubConnection conn, Action onUserAudioTranscriptionCompleted, Action onInterruptionDetected) { - var buffer = new byte[1024 * 32]; + var buffer = new byte[1024 * 1024 * 32]; // Model response timeout var settings = _services.GetRequiredService(); var timeout = settings.ModelResponseTimeout; @@ -259,7 +263,7 @@ public async Task SendEventToModel(object message) if (message is not string data) { - data = JsonSerializer.Serialize(message, BotSharpOptions.defaultJsonOptions); + data = JsonSerializer.Serialize(message, _options.JsonSerializerOptions); } var buffer = Encoding.UTF8.GetBytes(data); @@ -274,12 +278,9 @@ public async Task UpdateSession(RealtimeHubConnection conn) var agentService = _services.GetRequiredService(); var agent = await agentService.LoadAgent(conn.CurrentAgentId); - - var client = ProviderHelper.GetClient(Provider, _model, _services); - var chatClient = client.GetChatClient(_model); var (prompt, messages, options) = PrepareOptions(agent, []); - var instruction = messages.FirstOrDefault()?.Content.FirstOrDefault()?.Text ?? agent.Description; + var instruction = messages.FirstOrDefault()?.Content.FirstOrDefault()?.Text ?? agent?.Description ?? string.Empty; var functions = options.Tools.Select(x => { var fn = new FunctionDef diff --git a/tests/BotSharp.Test.RealtimeVoice/Program.cs b/tests/BotSharp.Test.RealtimeVoice/Program.cs index be45e079c..1c03d21dd 100644 --- a/tests/BotSharp.Test.RealtimeVoice/Program.cs +++ b/tests/BotSharp.Test.RealtimeVoice/Program.cs @@ -22,22 +22,14 @@ await channel.ConnectAsync(conv.Id); -var settings = services.GetRequiredService(); var hub = services.GetRequiredService(); var conn = hub.SetHubConnection(conv.Id); -await hub.ConnectToModel(async data => -{ - var response = JsonSerializer.Deserialize(data); - if (response.Event == "media") +conn.OnModelReady = () => + JsonSerializer.Serialize(new { - var message = JsonSerializer.Deserialize(data); - await channel.SendAsync(Convert.FromBase64String(message.Media), CancellationToken.None); - } -}); - -StreamReceiveResult result; -var buffer = new byte[1024 * 8]; + @event = "init" + }); conn.OnModelMessageReceived = message => JsonSerializer.Serialize(new @@ -59,6 +51,23 @@ await hub.ConnectToModel(async data => @event = "clear" }); +await hub.ConnectToModel(async data => +{ + var response = JsonSerializer.Deserialize(data); + if (response.Event == "clear") + { + channel.ClearBuffer(); + } + else if (response.Event == "media") + { + var message = JsonSerializer.Deserialize(data); + await channel.SendAsync(Convert.FromBase64String(message.Media), CancellationToken.None); + } +}); + +StreamReceiveResult result; +var buffer = new byte[1024 * 8]; + do { var seg = new ArraySegment(buffer); @@ -74,29 +83,44 @@ await hub.ConnectToModel(async data => int CalculateAudioLevel(byte[] buffer, int bytesRecorded) { // Simple audio level calculation (RMS) - int sum = 0; + int bytesPerSample = 2; // 16-bit PCM = 2 bytes per sample + int sampleCount = bytesRecorded / bytesPerSample; + if (sampleCount == 0) return 0; + + double sum = 0; for (int i = 0; i < bytesRecorded; i += 2) { if (i + 1 < bytesRecorded) { short sample = (short)((buffer[i + 1] << 8) | buffer[i]); - sum += Math.Abs(sample); + double normalized = sample / (short.MaxValue * 1.0 + 1); + sum += normalized * normalized; } } - return bytesRecorded > 0 ? sum / (bytesRecorded / 2) : 0; + + double rms = Math.Sqrt(sum / sampleCount); + double db = 20 * Math.Log10(rms); + + if (double.IsInfinity(db) || double.IsNaN(db)) + { + return 0; + } + + db = Math.Clamp(db, -100, 0); + return (int)((db + 100) * 1); } void DisplayAudioLevel(int level) { + const int sep = 50; // Normalize level to 0-50 range for display - int displayLevel = Math.Min(50, level / 100); + int displayLevel = (level * sep) / 100; // Clear the current line Console.Write("\r" + new string(' ', 60)); // Display audio level as a bar Console.Write("\rMicrophone: ["); - Console.Write(new string('#', displayLevel)); - Console.Write(new string(' ', 50 - displayLevel)); + Console.Write(new string('#', displayLevel).PadRight(sep, ' ')); Console.Write("]"); }