diff --git a/audio_streamer_glue.cpp b/audio_streamer_glue.cpp index 139bcaa9..f6199228 100644 --- a/audio_streamer_glue.cpp +++ b/audio_streamer_glue.cpp @@ -11,6 +11,10 @@ #include #include #include +#include +#include +#include +#include #include "base64.h" #define FRAME_SIZE_8000 320 /* 1000x0.02 (20ms)= 160 x(16bit= 2 bytes) 320 frame size*/ @@ -22,7 +26,8 @@ class AudioStreamer bool suppressLog, const char *extra_headers, bool no_reconnect, const char *tls_cafile, const char *tls_keyfile, const char *tls_certfile, bool tls_disable_hostname_validation) : m_sessionId(uuid), m_notify(callback), - m_suppress_log(suppressLog), m_extra_headers(extra_headers), m_playFile(0) + m_suppress_log(suppressLog), m_extra_headers(extra_headers), m_playFile(0), + m_shutdown(false) { WebSocketHeaders hdrs; @@ -134,8 +139,33 @@ class AudioStreamer cJSON_Delete(root); switch_safe_free(json_str); }); + // Start the worker thread to process callback events + m_worker_thread = std::thread(&AudioStreamer::workerThread, this); + // Now that our callback is setup, we can start our background thread and receive messages - client.connect(); + try + { + client.connect(); + } + catch (...) + { + // If connect() throws, we must clean up the worker thread before + // the exception propagates, otherwise std::terminate() will be called + // when m_worker_thread's destructor runs on a joinable thread. + { + std::lock_guard lock(m_queue_mutex); + m_shutdown = true; + } + m_queue_cv.notify_all(); + + if (m_worker_thread.joinable()) + { + m_worker_thread.join(); + } + + // Re-throw the exception so the constructor fails properly + throw; + } } switch_media_bug_t *get_media_bug(switch_core_session_t *session) @@ -177,38 +207,20 @@ class AudioStreamer void eventCallback(notifyEvent_t event, const char *message) { - switch_core_session_t *psession = switch_core_session_locate(m_sessionId.c_str()); - if (psession) + // Enqueue event for processing by the worker thread + // This avoids blocking the event loop, enabling full-duplex communication + std::lock_guard lock(m_queue_mutex); + if (m_shutdown.load()) { - switch (event) - { - case CONNECT_SUCCESS: - send_initial_metadata(psession); - m_notify(psession, EVENT_CONNECT, message); - break; - case CONNECTION_DROPPED: - switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(psession), SWITCH_LOG_INFO, "connection closed\n"); - m_notify(psession, EVENT_DISCONNECT, message); - break; - case CONNECT_ERROR: - switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(psession), SWITCH_LOG_INFO, "connection error\n"); - m_notify(psession, EVENT_ERROR, message); - - media_bug_close(psession); - - break; - case MESSAGE: - std::string msg(message); - if (processMessage(psession, msg) != SWITCH_TRUE) - { - m_notify(psession, EVENT_JSON, msg.c_str()); - } - if (!m_suppress_log) - switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(psession), SWITCH_LOG_DEBUG, "response: %s\n", msg.c_str()); - break; - } - switch_core_session_rwunlock(psession); + return; // Don't queue events if we're shutting down } + + EventItem item; + item.event = event; + item.message = message ? std::string(message) : std::string(); + + m_event_queue.push(std::move(item)); + m_queue_cv.notify_one(); } switch_bool_t processMessage(switch_core_session_t *session, std::string &message) @@ -388,7 +400,20 @@ class AudioStreamer return status; } - ~AudioStreamer() = default; + ~AudioStreamer() + { + // Signal shutdown and wait for worker thread to finish + { + std::lock_guard lock(m_queue_mutex); + m_shutdown = true; + } + m_queue_cv.notify_all(); + + if (m_worker_thread.joinable()) + { + m_worker_thread.join(); + } + } void disconnect() { @@ -405,6 +430,9 @@ class AudioStreamer { if (!this->isConnected()) return; + if (!m_suppress_log) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "request writeBinary len: %zu\n", len); + } client.sendBinary(buffer, len); } @@ -439,6 +467,91 @@ class AudioStreamer } private: + // Event queue structure for worker thread + struct EventItem + { + notifyEvent_t event; + std::string message; + }; + + // Worker thread function that processes events from the queue + void workerThread() + { + while (true) + { + std::unique_lock lock(m_queue_mutex); + + // Wait for events or shutdown signal + m_queue_cv.wait(lock, [this] { + return !m_event_queue.empty() || m_shutdown.load(); + }); + + // Check if we should shutdown + if (m_shutdown.load() && m_event_queue.empty()) + { + break; + } + + // Process all queued events + while (!m_event_queue.empty()) + { + EventItem item = std::move(m_event_queue.front()); + m_event_queue.pop(); + + // Release lock while processing to allow new events to be queued + lock.unlock(); + + // Process the event + switch_core_session_t *psession = switch_core_session_locate(m_sessionId.c_str()); + if (psession) + { + try + { + switch (item.event) + { + case CONNECT_SUCCESS: + send_initial_metadata(psession); + m_notify(psession, EVENT_CONNECT, item.message.c_str()); + break; + case CONNECTION_DROPPED: + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(psession), SWITCH_LOG_INFO, "connection closed\n"); + m_notify(psession, EVENT_DISCONNECT, item.message.c_str()); + break; + case CONNECT_ERROR: + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(psession), SWITCH_LOG_INFO, "connection error\n"); + m_notify(psession, EVENT_ERROR, item.message.c_str()); + media_bug_close(psession); + break; + case MESSAGE: + { + std::string msg = item.message; + if (processMessage(psession, msg) != SWITCH_TRUE) + { + m_notify(psession, EVENT_JSON, msg.c_str()); + } + if (!m_suppress_log) + { + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(psession), SWITCH_LOG_DEBUG, "response: %s\n", msg.c_str()); + } + break; + } + } + } + catch (...) + { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, + "(%s) Exception in worker thread event processing\n", + m_sessionId.c_str()); + } + switch_core_session_rwunlock(psession); + } + + // Re-acquire lock for next iteration + lock.lock(); + } + } + } + std::string m_sessionId; responseHandler_t m_notify; WebSocketClient client; @@ -447,6 +560,13 @@ class AudioStreamer int m_playFile; std::unordered_set m_Files; std::atomic m_cleanedUp{false}; + + // Worker thread and event queue + std::thread m_worker_thread; + std::queue m_event_queue; + std::mutex m_queue_mutex; + std::condition_variable m_queue_cv; + std::atomic m_shutdown; }; namespace diff --git a/libs/libwsc b/libs/libwsc index 09d18e5d..366fa041 160000 --- a/libs/libwsc +++ b/libs/libwsc @@ -1 +1 @@ -Subproject commit 09d18e5db9243a996692261a7a0839c9c346bc47 +Subproject commit 366fa04186cda066165f10326002681adaf65d98