Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,6 @@ public interface IStreamChannel
Task ConnectAsync(string conversationId);
Task<StreamReceiveResult> ReceiveAsync(ArraySegment<byte> buffer, CancellationToken cancellation);
Task SendAsync(byte[] data, CancellationToken cancellation);
void ClearBuffer();
Task CloseAsync(StreamChannelStatus status, string description, CancellationToken cancellation);
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ public class RealtimeHubConnection
public string KeypadInputBuffer { get; set; } = string.Empty;
public string CurrentAgentId { get; set; } = null!;
public string ConversationId { get; set; } = null!;
public Func<string> OnModelReady { get; set; } = () => string.Empty;
public Func<string, string> OnModelMessageReceived { get; set; } = null!;
public Func<string> OnModelAudioResponseDone { get; set; } = null!;
public Func<string> OnModelUserInterrupted { get; set; } = null!;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,10 @@ await _completer.Connect(_conn,
{
// Not TriggerModelInference, waiting for user utter.
var instruction = await _completer.UpdateSession(_conn);

var data = _conn.OnModelReady();
await responseToUser(data);
await HookEmitter.Emit<IRealtimeHook>(_services, async hook => await hook.OnModeReady(agent, _completer));

},
onModelAudioDeltaReceived: async (audioDeltaData, itemId) =>
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
using System.Collections.Concurrent;
using BotSharp.Abstraction.Realtime.Enums;
using NAudio.Wave;
using System.Collections.Concurrent;
using System.IO;

namespace BotSharp.Core.Realtime.Services;

Expand All @@ -11,7 +10,7 @@ public class WaveStreamChannel : IStreamChannel
private WaveInEvent _waveIn;
private WaveOutEvent _waveOut;
private BufferedWaveProvider _bufferedWaveProvider;
private readonly ConcurrentQueue<byte[]> _audioBufferQueue = new ConcurrentQueue<byte[]>();
private readonly ConcurrentQueue<byte[]> _audioBufferQueue = [];
private readonly ILogger _logger;

public WaveStreamChannel(IServiceProvider services, ILogger<WaveStreamChannel> logger)
Expand Down Expand Up @@ -39,7 +38,7 @@ public async Task ConnectAsync(string conversationId)
// Initialize audio output for streaming
var waveFormat = new WaveFormat(24000, 16, 1); // 24000 Hz, 16-bit PCM, Mono
_bufferedWaveProvider = new BufferedWaveProvider(waveFormat);
_bufferedWaveProvider.BufferLength = 1024 * 512; // Buffer length
_bufferedWaveProvider.BufferLength = 1024 * 1024; // Buffer length
_bufferedWaveProvider.DiscardOnBufferOverflow = true;

_waveOut = new WaveOutEvent();
Expand Down Expand Up @@ -83,6 +82,12 @@ public Task SendAsync(byte[] data, CancellationToken cancellation)
return Task.CompletedTask;
}

public void ClearBuffer()
{
_bufferedWaveProvider?.ClearBuffer();
_audioBufferQueue?.Clear();
}

private void WaveIn_DataAvailable(object? sender, WaveInEventArgs e)
{
// Add the buffer to the queue
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,21 @@ public class RealTimeCompletionProvider : IRealTimeCompletion
protected readonly OpenAiSettings _settings;
protected readonly IServiceProvider _services;
protected readonly ILogger<RealTimeCompletionProvider> _logger;
private readonly BotSharpOptions _options;

protected string _model = "gpt-4o-mini-realtime-preview";
private ClientWebSocket _webSocket;

public RealTimeCompletionProvider(
OpenAiSettings settings,
ILogger<RealTimeCompletionProvider> logger,
IServiceProvider services)
IServiceProvider services,
BotSharpOptions options)
{
_settings = settings;
_logger = logger;
_services = services;
_options = options;
}

public async Task Connect(RealtimeHubConnection conn,
Expand All @@ -45,6 +48,7 @@ public async Task Connect(RealtimeHubConnection conn,
var settingsService = _services.GetRequiredService<ILlmProviderService>();
var settings = settingsService.GetSetting(Provider, _model);

_webSocket?.Dispose();
_webSocket = new ClientWebSocket();
_webSocket.Options.SetRequestHeader("Authorization", $"Bearer {settings.ApiKey}");
_webSocket.Options.SetRequestHeader("OpenAI-Beta", "realtime=v1");
Expand Down Expand Up @@ -141,7 +145,7 @@ private async Task ReceiveMessage(RealtimeHubConnection conn,
Action<RoleDialogModel> onUserAudioTranscriptionCompleted,
Action onInterruptionDetected)
{
var buffer = new byte[1024 * 32];
var buffer = new byte[1024 * 1024 * 32];
// Model response timeout
var settings = _services.GetRequiredService<RealtimeModelSettings>();
var timeout = settings.ModelResponseTimeout;
Expand Down Expand Up @@ -259,7 +263,7 @@ public async Task SendEventToModel(object message)

if (message is not string data)
{
data = JsonSerializer.Serialize(message, BotSharpOptions.defaultJsonOptions);
data = JsonSerializer.Serialize(message, _options.JsonSerializerOptions);
}

var buffer = Encoding.UTF8.GetBytes(data);
Expand All @@ -274,12 +278,9 @@ public async Task<string> UpdateSession(RealtimeHubConnection conn)

var agentService = _services.GetRequiredService<IAgentService>();
var agent = await agentService.LoadAgent(conn.CurrentAgentId);

var client = ProviderHelper.GetClient(Provider, _model, _services);
var chatClient = client.GetChatClient(_model);
var (prompt, messages, options) = PrepareOptions(agent, []);

var instruction = messages.FirstOrDefault()?.Content.FirstOrDefault()?.Text ?? agent.Description;
var instruction = messages.FirstOrDefault()?.Content.FirstOrDefault()?.Text ?? agent?.Description ?? string.Empty;
var functions = options.Tools.Select(x =>
{
var fn = new FunctionDef
Expand Down
60 changes: 42 additions & 18 deletions tests/BotSharp.Test.RealtimeVoice/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,22 +22,14 @@

await channel.ConnectAsync(conv.Id);

var settings = services.GetRequiredService<RealtimeModelSettings>();
var hub = services.GetRequiredService<IRealtimeHub>();
var conn = hub.SetHubConnection(conv.Id);

await hub.ConnectToModel(async data =>
{
var response = JsonSerializer.Deserialize<ModelResponseEvent>(data);
if (response.Event == "media")
conn.OnModelReady = () =>
JsonSerializer.Serialize(new
{
var message = JsonSerializer.Deserialize<ModelResponseMediaEvent>(data);
await channel.SendAsync(Convert.FromBase64String(message.Media), CancellationToken.None);
}
});

StreamReceiveResult result;
var buffer = new byte[1024 * 8];
@event = "init"
});

conn.OnModelMessageReceived = message =>
JsonSerializer.Serialize(new
Expand All @@ -59,6 +51,23 @@ await hub.ConnectToModel(async data =>
@event = "clear"
});

await hub.ConnectToModel(async data =>
{
var response = JsonSerializer.Deserialize<ModelResponseEvent>(data);
if (response.Event == "clear")
{
channel.ClearBuffer();
}
else if (response.Event == "media")
{
var message = JsonSerializer.Deserialize<ModelResponseMediaEvent>(data);
await channel.SendAsync(Convert.FromBase64String(message.Media), CancellationToken.None);
}
});

StreamReceiveResult result;
var buffer = new byte[1024 * 8];

do
{
var seg = new ArraySegment<byte>(buffer);
Expand All @@ -74,29 +83,44 @@ await hub.ConnectToModel(async data =>
int CalculateAudioLevel(byte[] buffer, int bytesRecorded)
{
// Simple audio level calculation (RMS)
int sum = 0;
int bytesPerSample = 2; // 16-bit PCM = 2 bytes per sample
int sampleCount = bytesRecorded / bytesPerSample;
if (sampleCount == 0) return 0;

double sum = 0;
for (int i = 0; i < bytesRecorded; i += 2)
{
if (i + 1 < bytesRecorded)
{
short sample = (short)((buffer[i + 1] << 8) | buffer[i]);
sum += Math.Abs(sample);
double normalized = sample / (short.MaxValue * 1.0 + 1);
sum += normalized * normalized;
}
}
return bytesRecorded > 0 ? sum / (bytesRecorded / 2) : 0;

double rms = Math.Sqrt(sum / sampleCount);
double db = 20 * Math.Log10(rms);

if (double.IsInfinity(db) || double.IsNaN(db))
{
return 0;
}

db = Math.Clamp(db, -100, 0);
return (int)((db + 100) * 1);
}

void DisplayAudioLevel(int level)
{
const int sep = 50;
// Normalize level to 0-50 range for display
int displayLevel = Math.Min(50, level / 100);
int displayLevel = (level * sep) / 100;

// Clear the current line
Console.Write("\r" + new string(' ', 60));

// Display audio level as a bar
Console.Write("\rMicrophone: [");
Console.Write(new string('#', displayLevel));
Console.Write(new string(' ', 50 - displayLevel));
Console.Write(new string('#', displayLevel).PadRight(sep, ' '));
Console.Write("]");
}
Loading