Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
186 changes: 153 additions & 33 deletions audio_streamer_glue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@
#include <memory>
#include <thread>
#include <vector>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <atomic>
#include "base64.h"

#define FRAME_SIZE_8000 320 /* 1000x0.02 (20ms)= 160 x(16bit= 2 bytes) 320 frame size*/
Expand All @@ -22,7 +26,8 @@ class AudioStreamer
bool suppressLog, const char *extra_headers, bool no_reconnect,
const char *tls_cafile, const char *tls_keyfile, const char *tls_certfile,
bool tls_disable_hostname_validation) : m_sessionId(uuid), m_notify(callback),
m_suppress_log(suppressLog), m_extra_headers(extra_headers), m_playFile(0)
m_suppress_log(suppressLog), m_extra_headers(extra_headers), m_playFile(0),
m_shutdown(false)
{

WebSocketHeaders hdrs;
Expand Down Expand Up @@ -134,8 +139,33 @@ class AudioStreamer
cJSON_Delete(root);
switch_safe_free(json_str); });

// Start the worker thread to process callback events
m_worker_thread = std::thread(&AudioStreamer::workerThread, this);

// Now that our callback is setup, we can start our background thread and receive messages
client.connect();
try
{
client.connect();
}
catch (...)
{
// If connect() throws, we must clean up the worker thread before
// the exception propagates, otherwise std::terminate() will be called
// when m_worker_thread's destructor runs on a joinable thread.
{
std::lock_guard<std::mutex> lock(m_queue_mutex);
m_shutdown = true;
}
m_queue_cv.notify_all();

if (m_worker_thread.joinable())
{
m_worker_thread.join();
}

// Re-throw the exception so the constructor fails properly
throw;
}
}

switch_media_bug_t *get_media_bug(switch_core_session_t *session)
Expand Down Expand Up @@ -177,38 +207,20 @@ class AudioStreamer

void eventCallback(notifyEvent_t event, const char *message)
{
switch_core_session_t *psession = switch_core_session_locate(m_sessionId.c_str());
if (psession)
// Enqueue event for processing by the worker thread
// This avoids blocking the event loop, enabling full-duplex communication
std::lock_guard<std::mutex> lock(m_queue_mutex);
if (m_shutdown.load())
{
switch (event)
{
case CONNECT_SUCCESS:
send_initial_metadata(psession);
m_notify(psession, EVENT_CONNECT, message);
break;
case CONNECTION_DROPPED:
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(psession), SWITCH_LOG_INFO, "connection closed\n");
m_notify(psession, EVENT_DISCONNECT, message);
break;
case CONNECT_ERROR:
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(psession), SWITCH_LOG_INFO, "connection error\n");
m_notify(psession, EVENT_ERROR, message);

media_bug_close(psession);

break;
case MESSAGE:
std::string msg(message);
if (processMessage(psession, msg) != SWITCH_TRUE)
{
m_notify(psession, EVENT_JSON, msg.c_str());
}
if (!m_suppress_log)
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(psession), SWITCH_LOG_DEBUG, "response: %s\n", msg.c_str());
break;
}
switch_core_session_rwunlock(psession);
return; // Don't queue events if we're shutting down
}

EventItem item;
item.event = event;
item.message = message ? std::string(message) : std::string();

m_event_queue.push(std::move(item));
m_queue_cv.notify_one();
}

switch_bool_t processMessage(switch_core_session_t *session, std::string &message)
Expand Down Expand Up @@ -388,7 +400,20 @@ class AudioStreamer
return status;
}

~AudioStreamer() = default;
~AudioStreamer()
{
// Signal shutdown and wait for worker thread to finish
{
std::lock_guard<std::mutex> lock(m_queue_mutex);
m_shutdown = true;
}
m_queue_cv.notify_all();

if (m_worker_thread.joinable())
{
m_worker_thread.join();
}
}

void disconnect()
{
Expand All @@ -405,6 +430,9 @@ class AudioStreamer
{
if (!this->isConnected())
return;
if (!m_suppress_log) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "request writeBinary len: %zu\n", len);
}
client.sendBinary(buffer, len);
}

Expand Down Expand Up @@ -439,6 +467,91 @@ class AudioStreamer
}

private:
// Event queue structure for worker thread
struct EventItem
{
notifyEvent_t event;
std::string message;
};

// Worker thread function that processes events from the queue
void workerThread()
{
while (true)
{
std::unique_lock<std::mutex> lock(m_queue_mutex);

// Wait for events or shutdown signal
m_queue_cv.wait(lock, [this] {
return !m_event_queue.empty() || m_shutdown.load();
});

// Check if we should shutdown
if (m_shutdown.load() && m_event_queue.empty())
{
break;
}

// Process all queued events
while (!m_event_queue.empty())
{
EventItem item = std::move(m_event_queue.front());
m_event_queue.pop();

// Release lock while processing to allow new events to be queued
lock.unlock();

// Process the event
switch_core_session_t *psession = switch_core_session_locate(m_sessionId.c_str());
if (psession)
{
try
{
switch (item.event)
{
case CONNECT_SUCCESS:
send_initial_metadata(psession);
m_notify(psession, EVENT_CONNECT, item.message.c_str());
break;
case CONNECTION_DROPPED:
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(psession), SWITCH_LOG_INFO, "connection closed\n");
m_notify(psession, EVENT_DISCONNECT, item.message.c_str());
break;
case CONNECT_ERROR:
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(psession), SWITCH_LOG_INFO, "connection error\n");
m_notify(psession, EVENT_ERROR, item.message.c_str());
media_bug_close(psession);
break;
case MESSAGE:
{
std::string msg = item.message;
if (processMessage(psession, msg) != SWITCH_TRUE)
{
m_notify(psession, EVENT_JSON, msg.c_str());
}
if (!m_suppress_log)
{
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(psession), SWITCH_LOG_DEBUG, "response: %s\n", msg.c_str());
}
break;
}
}
}
catch (...)
{
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR,
"(%s) Exception in worker thread event processing\n",
m_sessionId.c_str());
}
switch_core_session_rwunlock(psession);
}

// Re-acquire lock for next iteration
lock.lock();
}
}
}

std::string m_sessionId;
responseHandler_t m_notify;
WebSocketClient client;
Expand All @@ -447,6 +560,13 @@ class AudioStreamer
int m_playFile;
std::unordered_set<std::string> m_Files;
std::atomic<bool> m_cleanedUp{false};

// Worker thread and event queue
std::thread m_worker_thread;
std::queue<EventItem> m_event_queue;
std::mutex m_queue_mutex;
std::condition_variable m_queue_cv;
std::atomic<bool> m_shutdown;
};

namespace
Expand Down
2 changes: 1 addition & 1 deletion libs/libwsc
Submodule libwsc updated 1 files
+7 −4 README.md