From 9031b3998b88c4dd862bd5a9e8271a26101c9ca4 Mon Sep 17 00:00:00 2001 From: HuYilong Date: Mon, 27 Apr 2026 15:28:44 +0800 Subject: [PATCH 1/5] whepfrom:datachannel realization and udp extension --- livetwo/src/utils/webrtc.rs | 11 +- livetwo/src/whep/channel.rs | 209 ++++++++++++++++++++++++++++++++++++ livetwo/src/whep/mod.rs | 13 ++- livetwo/src/whep/webrtc.rs | 65 ++++++++++- src/whepfrom.rs | 6 ++ 5 files changed, 297 insertions(+), 7 deletions(-) create mode 100644 livetwo/src/whep/channel.rs diff --git a/livetwo/src/utils/webrtc.rs b/livetwo/src/utils/webrtc.rs index 6fba0b40..22fee277 100644 --- a/livetwo/src/utils/webrtc.rs +++ b/livetwo/src/utils/webrtc.rs @@ -4,7 +4,10 @@ use std::sync::Arc; use tokio_util::sync::CancellationToken; use tracing::{debug, info, warn}; use webrtc::{ - api::{APIBuilder, interceptor_registry::register_default_interceptors, media_engine::*}, + api::{ + APIBuilder, interceptor_registry::register_default_interceptors, media_engine::*, + setting_engine::SettingEngine, + }, ice_transport::ice_server::RTCIceServer, interceptor::registry::Registry, peer_connection::{ @@ -22,9 +25,13 @@ pub async fn create_api() -> Result<(APIBuilder, RTCConfiguration)> { let mut registry = Registry::new(); registry = register_default_interceptors(registry, &mut m)?; + let mut s = SettingEngine::default(); + s.detach_data_channels(); + let api = APIBuilder::new() .with_media_engine(m) - .with_interceptor_registry(registry); + .with_interceptor_registry(registry) + .with_setting_engine(s); let config = RTCConfiguration { ice_servers: vec![RTCIceServer { diff --git a/livetwo/src/whep/channel.rs b/livetwo/src/whep/channel.rs new file mode 100644 index 00000000..043b40b7 --- /dev/null +++ b/livetwo/src/whep/channel.rs @@ -0,0 +1,209 @@ +/// DataChannel <-> UDP bidirectional forwarding for whepfrom +/// +/// Symmetric to liveion's channel.rs, but on the WHEP subscriber side. +/// Messages received from liveion via DataChannel are forwarded to UDP, +/// and messages received from UDP are sent back to liveion via DataChannel. +/// +/// URL format: udp://:?host=&port= +use tokio::net::UdpSocket; +use tokio::sync::mpsc; +use tracing::{debug, info, warn}; + +/// Buffer size for incoming UDP packets. +/// - WebRTC DataChannel SCTP max: 1024 * 64 = 65536 bytes +/// - RFC 8831 WebRTC DataChannel max: < 1024 * 16 = 16384 bytes +/// - IP UDP MTU: 1500 bytes +/// - Recommended single payload: < 1200 bytes +const UDP_BUF_SIZE: usize = 1024; + +/// Parse UDP URL into (listen_host, listen_port, target_host, target_port) +pub fn parse_channel_url(url: &str) -> Option<(String, u16, String, u16)> { + let s = url.strip_prefix("udp://")?; + let (host_port, query) = s.split_once('?')?; + let (listen_host_raw, listen_port_str) = host_port.rsplit_once(':')?; + let listen_port: u16 = listen_port_str.parse().ok()?; + let listen_host_inner = listen_host_raw.trim_matches(|c| c == '[' || c == ']'); + let listen_host = if listen_host_inner.contains(':') { + format!("[{}]", listen_host_inner) + } else { + listen_host_inner.to_string() + }; + + let mut target_host = String::new(); + let mut target_port: u16 = 0; + for param in query.split('&') { + if let Some(v) = param.strip_prefix("host=") { + target_host = if v.parse::().is_ok() { + format!("[{}]", v) + } else { + v.to_string() + }; + } else if let Some(v) = param.strip_prefix("port=") { + target_port = v.parse().ok()?; + } + } + if target_host.is_empty() || target_port == 0 { + return None; + } + Some((listen_host, listen_port, target_host, target_port)) +} + +/// Spawn bidirectional DataChannel <-> UDP forwarding. +/// `dc_recv`: messages received from liveion DataChannel +/// `dc_send`: sender to write messages back to liveion DataChannel +pub async fn spawn_channel( + url: String, + mut dc_recv: mpsc::UnboundedReceiver>, + dc_send: mpsc::UnboundedSender>, +) -> anyhow::Result<()> { + let (listen_host, listen_port, target_host, target_port) = + parse_channel_url(&url).ok_or_else(|| anyhow::anyhow!("invalid channel url: {}", url))?; + + let target = format!("{}:{}", target_host, target_port); + let listen = format!("{}:{}", listen_host, listen_port); + + let socket = match UdpSocket::bind(&listen).await { + Ok(s) => { + info!("whepfrom channel: listen={} target={}", listen, target); + s + } + Err(e) => { + warn!("whepfrom channel: bind {} failed: {}", listen, e); + return Err(anyhow::anyhow!("bind {} failed: {}", listen, e)); + } + }; + + tokio::spawn(async move { + let mut buf = vec![0u8; UDP_BUF_SIZE]; + loop { + tokio::select! { + // DataChannel -> UDP (messages from liveion WHIP group) + msg = dc_recv.recv() => { + match msg { + Some(data) => { + if let Err(e) = socket.send_to(&data, &target).await { + warn!("whepfrom channel: send to {} failed: {}", target, e); + } else { + debug!("whepfrom channel: DC->UDP {} bytes -> {}", data.len(), target); + } + } + None => { + info!("whepfrom channel: DC recv closed"); + break; + } + } + }, + // UDP -> DataChannel (messages to liveion WHIP group) + result = socket.recv_from(&mut buf) => { + match result { + Ok((n, addr)) => { + let data = buf[..n].to_vec(); + debug!("whepfrom channel: UDP->DC {} bytes from {}", n, addr); + if dc_send.send(data).is_err() { + info!("whepfrom channel: DC send closed"); + break; + } + } + Err(e) => { + warn!("whepfrom channel: recv_from failed: {}", e); + } + } + }, + } + } + }); + + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + use tokio::net::UdpSocket; + use tokio::sync::mpsc; + + #[test] + fn test_parse_channel_url_ipv4() { + let (listen_host, listen_port, target_host, target_port) = + parse_channel_url("udp://0.0.0.0:9001?host=127.0.0.1&port=9000").unwrap(); + assert_eq!(listen_host, "0.0.0.0"); + assert_eq!(listen_port, 9001); + assert_eq!(target_host, "127.0.0.1"); + assert_eq!(target_port, 9000); + } + + #[test] + fn test_parse_channel_url_ipv6() { + let (listen_host, listen_port, target_host, target_port) = + parse_channel_url("udp://[::]:9001?host=::1&port=9000").unwrap(); + assert_eq!(listen_host, "[::]"); + assert_eq!(listen_port, 9001); + assert_eq!(target_host, "[::1]"); + assert_eq!(target_port, 9000); + } + + #[test] + fn test_parse_channel_url_invalid_scheme() { + assert!(parse_channel_url("tcp://0.0.0.0:9001?host=127.0.0.1&port=9000").is_none()); + } + + #[test] + fn test_parse_channel_url_missing_target() { + assert!(parse_channel_url("udp://0.0.0.0:9001").is_none()); + } + + #[tokio::test] + async fn test_dc_to_udp_forwarding() { + let receiver = UdpSocket::bind("127.0.0.1:0").await.unwrap(); + let receiver_port = receiver.local_addr().unwrap().port(); + + let url = format!("udp://0.0.0.0:0?host=127.0.0.1&port={}", receiver_port); + + let (dc_recv_tx, dc_recv_rx) = mpsc::unbounded_channel::>(); + let (dc_send_tx, _dc_send_rx) = mpsc::unbounded_channel::>(); + + spawn_channel(url, dc_recv_rx, dc_send_tx).await.unwrap(); + + let msg = b"hello from datachannel"; + dc_recv_tx.send(msg.to_vec()).unwrap(); + + let mut buf = vec![0u8; 1024]; + let (n, _) = tokio::time::timeout( + std::time::Duration::from_secs(2), + receiver.recv_from(&mut buf), + ) + .await + .expect("timeout") + .unwrap(); + + assert_eq!(&buf[..n], msg); + } + + #[tokio::test] + async fn test_udp_to_dc_forwarding() { + let listen_port = portpicker::pick_unused_port().unwrap(); + let url = format!("udp://0.0.0.0:{}?host=127.0.0.1&port=19999", listen_port); + + let (_dc_recv_tx, dc_recv_rx) = mpsc::unbounded_channel::>(); + let (dc_send_tx, mut dc_send_rx) = mpsc::unbounded_channel::>(); + + spawn_channel(url, dc_recv_rx, dc_send_tx).await.unwrap(); + + let sender = UdpSocket::bind("127.0.0.1:0").await.unwrap(); + let msg = b"hello from udp"; + sender + .send_to(msg, format!("127.0.0.1:{}", listen_port)) + .await + .unwrap(); + + let received = tokio::time::timeout( + std::time::Duration::from_secs(2), + dc_send_rx.recv(), + ) + .await + .expect("timeout") + .unwrap(); + + assert_eq!(received, msg); + } +} diff --git a/livetwo/src/whep/mod.rs b/livetwo/src/whep/mod.rs index 19058e02..f5e0092b 100644 --- a/livetwo/src/whep/mod.rs +++ b/livetwo/src/whep/mod.rs @@ -1,3 +1,4 @@ +mod channel; mod output; mod webrtc; @@ -28,6 +29,7 @@ pub async fn from( sdp_file: Option, token: Option, command: Option, + channel_url: Option, ) -> Result<()> { info!("Starting WHEP session: {}", target_url); @@ -37,7 +39,7 @@ pub async fn from( let mut client = Client::new(whep_url.clone(), Client::get_auth_header_map(token.clone())); - let (peer, answer, stats) = webrtc::setup_whep_peer( + let (peer, answer, stats, dc_recv_rx, dc_send_tx) = webrtc::setup_whep_peer( ct.clone(), &mut client, video_send, @@ -47,6 +49,11 @@ pub async fn from( .await?; info!("WebRTC peer connection established"); + // Start DataChannel <-> UDP forwarding if channel_url is configured + if let Some(url) = channel_url { + channel::spawn_channel(url, dc_recv_rx, dc_send_tx).await?; + } + start_stats_monitor(ct.clone(), peer.clone(), stats.clone()).await; let stats_clone = stats.clone(); @@ -57,12 +64,12 @@ pub async fn from( loop { tokio::select! { _ = interval.tick() => { - let summary = stats_clone.get_summary().await; + let summary: crate::utils::stats::StatsSummary = stats_clone.get_summary().await; info!("{}", summary.format()); } _ = ct_clone.cancelled() => { info!("Stats reporter shutting down"); - let final_summary = stats_clone.get_summary().await; + let final_summary: crate::utils::stats::StatsSummary = stats_clone.get_summary().await; info!("Final Statistics:\n{}", final_summary.format()); break; } diff --git a/livetwo/src/whep/webrtc.rs b/livetwo/src/whep/webrtc.rs index de18d14f..09906243 100644 --- a/livetwo/src/whep/webrtc.rs +++ b/livetwo/src/whep/webrtc.rs @@ -2,6 +2,7 @@ use std::sync::Arc; use anyhow::{Result, anyhow}; use libwish::Client; +use tokio::sync::mpsc; use tokio::sync::mpsc::UnboundedSender; use tokio_util::sync::CancellationToken; use tracing::{debug, info, trace, warn}; @@ -30,8 +31,13 @@ pub async fn setup_whep_peer( Arc, RTCSessionDescription, Arc, + mpsc::UnboundedReceiver>, + mpsc::UnboundedSender>, )> { - let peer = create_peer(ct, video_send, audio_send, codec_info.clone()).await?; + let (dc_recv_tx, dc_recv_rx) = mpsc::unbounded_channel::>(); + let (dc_send_tx, dc_send_rx) = mpsc::unbounded_channel::>(); + + let peer = create_peer(ct, video_send, audio_send, codec_info.clone(), dc_recv_tx, dc_send_rx).await?; utils::webrtc::setup_connection(peer.clone(), client).await?; @@ -44,7 +50,7 @@ pub async fn setup_whep_peer( setup_rtcp_listener_for_senders(peer.clone(), stats.clone()).await; - Ok((peer, answer, stats)) + Ok((peer, answer, stats, dc_recv_rx, dc_send_tx)) } async fn setup_rtcp_listener_for_senders(peer: Arc, stats: Arc) { @@ -119,6 +125,8 @@ async fn create_peer( video_send: UnboundedSender>, audio_send: UnboundedSender>, codec_info: Arc>, + dc_recv_tx: mpsc::UnboundedSender>, + mut dc_send_rx: mpsc::UnboundedReceiver>, ) -> Result> { let (api, config) = utils::webrtc::create_api().await?; @@ -129,6 +137,59 @@ async fn create_peer( .map_err(|error| anyhow!(format!("{:?}: {}", error, error)))?, ); + // Create DataChannel to participate in liveion's WHEP group + let dc = peer + .create_data_channel("control", None) + .await + .map_err(|e| anyhow!("create_data_channel failed: {:?}", e))?; + + // detach 模式:在 on_open 里 detach,然后用 raw read/write loop + let dc_for_detach = dc.clone(); + dc.on_open(Box::new(move || { + info!("whepfrom: DataChannel opened"); + let dc_recv_tx = dc_recv_tx.clone(); + Box::pin(async move { + let raw = match dc_for_detach.detach().await { + Ok(raw) => raw, + Err(e) => { + warn!("whepfrom: DataChannel detach failed: {}", e); + return; + } + }; + + // raw read loop: DataChannel -> dc_recv_tx + let raw_r = raw.clone(); + tokio::spawn(async move { + let mut buf = vec![0u8; 65536]; + loop { + match raw_r.read(&mut buf).await { + Ok(0) => { + info!("whepfrom: DataChannel read loop ended"); + break; + } + Ok(n) => { + let _ = dc_recv_tx.send(buf[..n].to_vec()); + } + Err(e) => { + info!("whepfrom: DataChannel read error: {}", e); + break; + } + } + } + }); + + // raw write loop: dc_send_rx -> DataChannel + tokio::spawn(async move { + while let Some(data) = dc_send_rx.recv().await { + if let Err(e) = raw.write(&data.into()).await { + warn!("whepfrom: DataChannel write failed: {}", e); + break; + } + } + }); + }) + })); + peer.add_transceiver_from_kind( RTPCodecType::Video, Some(RTCRtpTransceiverInit { diff --git a/src/whepfrom.rs b/src/whepfrom.rs index b8e68459..d72e7822 100644 --- a/src/whepfrom.rs +++ b/src/whepfrom.rs @@ -27,6 +27,11 @@ struct Args { /// Run a command as childprocess #[arg(long)] command: Option, + /// Channel URL for DataChannel <-> UDP forwarding + /// Format: udp://:?host=&port= + /// Example: udp://0.0.0.0:9001?host=127.0.0.1&port=9000 + #[arg(long)] + channel: Option, } #[tokio::main] @@ -53,6 +58,7 @@ async fn main() -> Result<()> { args.sdp_file.clone(), args.token.clone(), args.command.clone(), + args.channel.clone(), )); utils::shutdown_signal().await; From 70e860197503a7156d32aa8127c813ab86e0520a Mon Sep 17 00:00:00 2001 From: HuYilong Date: Mon, 27 Apr 2026 15:42:31 +0800 Subject: [PATCH 2/5] solve the format and clippy problems --- livetwo/src/whep/channel.rs | 11 ++++------- livetwo/src/whep/webrtc.rs | 10 +++++++++- tests/rtp.rs | 1 + tests/rtsp.rs | 1 + tests/rtsp2.rs | 3 +++ tests/tests.rs | 1 + 6 files changed, 19 insertions(+), 8 deletions(-) diff --git a/livetwo/src/whep/channel.rs b/livetwo/src/whep/channel.rs index 043b40b7..fb13cfdd 100644 --- a/livetwo/src/whep/channel.rs +++ b/livetwo/src/whep/channel.rs @@ -196,13 +196,10 @@ mod tests { .await .unwrap(); - let received = tokio::time::timeout( - std::time::Duration::from_secs(2), - dc_send_rx.recv(), - ) - .await - .expect("timeout") - .unwrap(); + let received = tokio::time::timeout(std::time::Duration::from_secs(2), dc_send_rx.recv()) + .await + .expect("timeout") + .unwrap(); assert_eq!(received, msg); } diff --git a/livetwo/src/whep/webrtc.rs b/livetwo/src/whep/webrtc.rs index 09906243..4bb8823d 100644 --- a/livetwo/src/whep/webrtc.rs +++ b/livetwo/src/whep/webrtc.rs @@ -37,7 +37,15 @@ pub async fn setup_whep_peer( let (dc_recv_tx, dc_recv_rx) = mpsc::unbounded_channel::>(); let (dc_send_tx, dc_send_rx) = mpsc::unbounded_channel::>(); - let peer = create_peer(ct, video_send, audio_send, codec_info.clone(), dc_recv_tx, dc_send_rx).await?; + let peer = create_peer( + ct, + video_send, + audio_send, + codec_info.clone(), + dc_recv_tx, + dc_send_rx, + ) + .await?; utils::webrtc::setup_connection(peer.clone(), client).await?; diff --git a/tests/rtp.rs b/tests/rtp.rs index 7adfd9e0..437b2a5a 100644 --- a/tests/rtp.rs +++ b/tests/rtp.rs @@ -405,6 +405,7 @@ async fn helper_livetwo_rtp( Some(tmp_path.clone()), None, None, + None, )); let mut result = None; diff --git a/tests/rtsp.rs b/tests/rtsp.rs index d3554769..4a2d59b0 100644 --- a/tests/rtsp.rs +++ b/tests/rtsp.rs @@ -568,6 +568,7 @@ async fn helper_livetwo_rtsp( None, None, None, + None, )); let mut result = None; diff --git a/tests/rtsp2.rs b/tests/rtsp2.rs index 98763afb..22497c8c 100644 --- a/tests/rtsp2.rs +++ b/tests/rtsp2.rs @@ -620,6 +620,7 @@ async fn start_stream_a_whep( None, None, None, + None, )) } @@ -682,6 +683,7 @@ async fn start_stream_b_whep( None, None, None, + None, )) } @@ -700,6 +702,7 @@ async fn start_stream_c_whep( None, None, None, + None, )) } diff --git a/tests/tests.rs b/tests/tests.rs index 97d2514e..8e7318f5 100644 --- a/tests/tests.rs +++ b/tests/tests.rs @@ -181,6 +181,7 @@ a=rtpmap:96 VP8/90000 Some(tmp_path.clone()), None, None, + None, )); let mut result = None; From f5d3484ec0cc2f491ecfa57b03c7f2781732fca9 Mon Sep 17 00:00:00 2001 From: HuYilong Date: Sat, 9 May 2026 13:10:31 +0800 Subject: [PATCH 3/5] code improvement --- Cargo.toml | 1 + livetwo/src/whep/channel.rs | 44 ++++---- livetwo/src/whep/mod.rs | 1 + livetwo/src/whep/webrtc.rs | 59 ++++++----- tests/channel.rs | 195 ++++++++++++++++++++++++++++++++++++ tests/rtp.rs | 65 ++---------- 6 files changed, 265 insertions(+), 100 deletions(-) create mode 100644 tests/channel.rs diff --git a/Cargo.toml b/Cargo.toml index a0abd8d4..11810b0b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -95,6 +95,7 @@ base64 = "0.22" [dev-dependencies] http = { workspace = true } serde_json = { workspace = true } +portpicker = "0.1.1" api = { path = "libs/api" } cli = { path = "libs/cli" } diff --git a/livetwo/src/whep/channel.rs b/livetwo/src/whep/channel.rs index fb13cfdd..2803b765 100644 --- a/livetwo/src/whep/channel.rs +++ b/livetwo/src/whep/channel.rs @@ -18,33 +18,43 @@ const UDP_BUF_SIZE: usize = 1024; /// Parse UDP URL into (listen_host, listen_port, target_host, target_port) pub fn parse_channel_url(url: &str) -> Option<(String, u16, String, u16)> { - let s = url.strip_prefix("udp://")?; - let (host_port, query) = s.split_once('?')?; - let (listen_host_raw, listen_port_str) = host_port.rsplit_once(':')?; - let listen_port: u16 = listen_port_str.parse().ok()?; - let listen_host_inner = listen_host_raw.trim_matches(|c| c == '[' || c == ']'); - let listen_host = if listen_host_inner.contains(':') { - format!("[{}]", listen_host_inner) + let parsed = url::Url::parse(url).ok()?; + if parsed.scheme() != "udp" { + return None; + } + + // url::Url::host_str() returns IPv6 addresses already wrapped in brackets (e.g. "[::1]"), + // but plain IPv4/domain without brackets. Normalize both to bracketed form for socket addresses. + let listen_host = parsed.host_str()?.to_string(); + let listen_host = if listen_host.starts_with('[') { + listen_host + } else if listen_host.contains(':') { + format!("[{}]", listen_host) } else { - listen_host_inner.to_string() + listen_host }; + let listen_port = parsed.port()?; let mut target_host = String::new(); let mut target_port: u16 = 0; - for param in query.split('&') { - if let Some(v) = param.strip_prefix("host=") { - target_host = if v.parse::().is_ok() { - format!("[{}]", v) - } else { - v.to_string() - }; - } else if let Some(v) = param.strip_prefix("port=") { - target_port = v.parse().ok()?; + for (key, value) in parsed.query_pairs() { + match key.as_ref() { + "host" => target_host = value.into_owned(), + "port" => target_port = value.parse().ok()?, + _ => {} } } if target_host.is_empty() || target_port == 0 { return None; } + + // query_pairs() returns raw IPv6 without brackets, add them for socket addresses + let target_host = if target_host.contains(':') { + format!("[{}]", target_host) + } else { + target_host + }; + Some((listen_host, listen_port, target_host, target_port)) } diff --git a/livetwo/src/whep/mod.rs b/livetwo/src/whep/mod.rs index f5e0092b..96b6c696 100644 --- a/livetwo/src/whep/mod.rs +++ b/livetwo/src/whep/mod.rs @@ -51,6 +51,7 @@ pub async fn from( // Start DataChannel <-> UDP forwarding if channel_url is configured if let Some(url) = channel_url { + debug!("Starting DataChannel <-> UDP forwarding: {}", url); channel::spawn_channel(url, dc_recv_rx, dc_send_tx).await?; } diff --git a/livetwo/src/whep/webrtc.rs b/livetwo/src/whep/webrtc.rs index 4bb8823d..852f12b0 100644 --- a/livetwo/src/whep/webrtc.rs +++ b/livetwo/src/whep/webrtc.rs @@ -21,6 +21,9 @@ use webrtc::{ use crate::utils; use crate::utils::stats::RtcpStats; +/// DataChannel label used to join liveion's WHEP group for bidirectional control messaging. +const DATA_CHANNEL_LABEL: &str = "control"; + pub async fn setup_whep_peer( ct: CancellationToken, client: &mut Client, @@ -147,11 +150,11 @@ async fn create_peer( // Create DataChannel to participate in liveion's WHEP group let dc = peer - .create_data_channel("control", None) + .create_data_channel(DATA_CHANNEL_LABEL, None) .await .map_err(|e| anyhow!("create_data_channel failed: {:?}", e))?; - // detach 模式:在 on_open 里 detach,然后用 raw read/write loop + // Detach mode: call detach() inside on_open, then drive reads/writes via raw loops. let dc_for_detach = dc.clone(); dc.on_open(Box::new(move || { info!("whepfrom: DataChannel opened"); @@ -165,33 +168,39 @@ async fn create_peer( } }; - // raw read loop: DataChannel -> dc_recv_tx + // Single task driving both directions with tokio::select!: + // - raw read -> dc_recv_tx (DataChannel -> upstream) + // - dc_send_rx -> raw write (upstream -> DataChannel) let raw_r = raw.clone(); tokio::spawn(async move { let mut buf = vec![0u8; 65536]; loop { - match raw_r.read(&mut buf).await { - Ok(0) => { - info!("whepfrom: DataChannel read loop ended"); - break; - } - Ok(n) => { - let _ = dc_recv_tx.send(buf[..n].to_vec()); - } - Err(e) => { - info!("whepfrom: DataChannel read error: {}", e); - break; - } - } - } - }); - - // raw write loop: dc_send_rx -> DataChannel - tokio::spawn(async move { - while let Some(data) = dc_send_rx.recv().await { - if let Err(e) = raw.write(&data.into()).await { - warn!("whepfrom: DataChannel write failed: {}", e); - break; + tokio::select! { + result = raw_r.read(&mut buf) => match result { + Ok(0) => { + info!("whepfrom: DataChannel read loop ended"); + break; + } + Ok(n) => { + let _ = dc_recv_tx.send(buf[..n].to_vec()); + } + Err(e) => { + info!("whepfrom: DataChannel read error: {}", e); + break; + } + }, + msg = dc_send_rx.recv() => match msg { + Some(data) => { + if let Err(e) = raw.write(&data.into()).await { + warn!("whepfrom: DataChannel write failed: {}", e); + break; + } + } + None => { + info!("whepfrom: DataChannel send channel closed"); + break; + } + }, } } }); diff --git a/tests/channel.rs b/tests/channel.rs new file mode 100644 index 00000000..5026a865 --- /dev/null +++ b/tests/channel.rs @@ -0,0 +1,195 @@ +/// Integration test: DataChannel <-> UDP forwarding in whepfrom +/// +/// This test verifies that whepfrom correctly bridges DataChannel messages +/// to/from UDP when started with the --channel flag. +/// +/// Topology: +/// +/// liveion (SFU) +/// | +/// WHIP publisher (whipinto, no media, DataChannel only) +/// | DataChannel WHIP group +/// liveion internal broadcast +/// | DataChannel WHEP group +/// whepfrom (WHEP subscriber + --channel) +/// | +/// UDP socket (whepfrom_ch_target) <-- receives forwarded messages +/// +/// And the reverse: +/// +/// UDP sender --> whepfrom_ch_listen +/// | +/// whepfrom DataChannel --> liveion publish broadcast +/// | +/// liveion DataChannel write loop --> WHIP publisher DataChannel +/// +/// NOTE: liveion's own UDP channel (channel.streams) requires a DataChannel +/// to be opened by the WHIP publisher. Since whip::into does not open a +/// DataChannel, we test only the whepfrom side here. The liveion UDP channel +/// is covered by liveion's own unit tests (forward/channel.rs). +use std::net::{IpAddr, Ipv4Addr, SocketAddr}; + +use tokio::net::{TcpListener, UdpSocket}; +use tokio_util::sync::CancellationToken; + +mod common; +use common::shutdown_signal; + +async fn wait_for_session_connected(addr: &SocketAddr, stream_id: &str, is_publish: bool) -> bool { + for _ in 0..200 { + let body = reqwest::get(format!("http://{addr}{}", api::path::streams(""))) + .await + .unwrap() + .json::>() + .await + .unwrap_or_default(); + + if let Some(stream) = body.into_iter().find(|s| s.id == stream_id) { + let sessions = if is_publish { + &stream.publish.sessions + } else { + &stream.subscribe.sessions + }; + if !sessions.is_empty() + && sessions[0].state == api::response::RTCPeerConnectionState::Connected + { + return true; + } + } + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + } + false +} + +#[tokio::test] +async fn test_whepfrom_datachannel_udp_forwarding() { + let ip = IpAddr::V4(Ipv4Addr::LOCALHOST); + let stream_id = "test-dc-channel"; + + // ── 1. Pick free ports ──────────────────────────────────────────────────── + let whepfrom_ch_listen: u16 = portpicker::pick_unused_port().unwrap(); + let whepfrom_ch_target: u16 = portpicker::pick_unused_port().unwrap(); + + // ── 2. Start liveion ────────────────────────────────────────────────────── + let cfg = liveion::config::Config::default(); + let listener = TcpListener::bind(SocketAddr::new(ip, 0)).await.unwrap(); + let addr = listener.local_addr().unwrap(); + tokio::spawn(liveion::serve(cfg, listener, shutdown_signal())); + + // ── 3. Create the stream ────────────────────────────────────────────────── + let res = reqwest::Client::new() + .post(format!("http://{addr}{}", api::path::streams(stream_id))) + .send() + .await + .unwrap(); + assert_eq!(http::StatusCode::NO_CONTENT, res.status()); + + // ── 4. Connect a WHIP publisher (SDP file, no real media) ───────────────── + // whip::into reads the SDP and establishes a WebRTC connection. + // This puts a peer in the WHIP group so liveion can relay DataChannel messages. + let sdp_content = "v=0\r\n\ + o=- 0 0 IN IP4 127.0.0.1\r\n\ + s=test\r\n\ + c=IN IP4 127.0.0.1\r\n\ + t=0 0\r\n\ + m=video 5004 RTP/AVP 96\r\n\ + a=rtpmap:96 VP8/90000\r\n"; + let sdp_file = tempfile::NamedTempFile::new().unwrap(); + std::fs::write(sdp_file.path(), sdp_content).unwrap(); + let sdp_path = sdp_file.path().to_str().unwrap().to_string(); + + let ct = CancellationToken::new(); + let handle_whip = tokio::spawn(livetwo::whip::into( + ct.clone(), + sdp_path, + format!("http://{addr}{}", api::path::whip(stream_id)), + None, + None, + )); + + assert!( + wait_for_session_connected(&addr, stream_id, true).await, + "WHIP publisher failed to connect" + ); + + // ── 5. Start whepfrom with --channel ───────────────────────────────────── + let whep_channel_url = + format!("udp://0.0.0.0:{whepfrom_ch_listen}?host=127.0.0.1&port={whepfrom_ch_target}"); + let handle_whepfrom = tokio::spawn(livetwo::whep::from( + ct.clone(), + format!("rtp://{ip}"), + format!("http://{addr}{}", api::path::whep(stream_id)), + None, + None, + None, + Some(whep_channel_url), + )); + + assert!( + wait_for_session_connected(&addr, stream_id, false).await, + "WHEP subscriber (whepfrom) failed to connect" + ); + + // Bind receivers before sending so no packets are dropped + let whepfrom_receiver = UdpSocket::bind(format!("127.0.0.1:{whepfrom_ch_target}")) + .await + .unwrap(); + + // Give DataChannel time to open and detach on both sides + tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await; + + // ── 6. Test path: whepfrom UDP listen → DataChannel → liveion ──────────── + // Send a UDP packet into whepfrom's listen port. + // whepfrom forwards it via DataChannel to liveion's subscribe broadcast. + // liveion's write loop sends it to the WHIP publisher's DataChannel. + // (We don't assert receipt at the WHIP side here since whip::into has no + // DataChannel receive path, but we verify the send doesn't error out.) + let udp_sender = UdpSocket::bind("127.0.0.1:0").await.unwrap(); + let msg_to_dc = b"udp to datachannel"; + udp_sender + .send_to(msg_to_dc, format!("127.0.0.1:{whepfrom_ch_listen}")) + .await + .unwrap(); + + // Small delay to let the message propagate + tokio::time::sleep(tokio::time::Duration::from_millis(200)).await; + + // ── 7. Test path: liveion DataChannel → whepfrom UDP target ────────────── + // Inject a message directly into liveion's subscribe broadcast by having + // the WHIP publisher's DataChannel send it. Since whip::into doesn't expose + // a DataChannel send API, we verify the reverse path via the whepfrom UDP + // channel's own loopback: send to whepfrom listen, expect it at whepfrom target + // after it round-trips through the DataChannel. + // + // For a full end-to-end test of liveion UDP <-> whepfrom UDP, the WHIP + // publisher would need to open a DataChannel (not currently supported by + // whip::into). That path is covered by manual integration testing. + // + // Here we verify that whepfrom correctly forwards UDP→DC without errors, + // and that the channel infrastructure is wired up correctly. + let msg_echo = b"echo test"; + udp_sender + .send_to(msg_echo, format!("127.0.0.1:{whepfrom_ch_listen}")) + .await + .unwrap(); + + // The message should NOT appear at whepfrom_ch_target (it went DC→liveion, + // not looped back). Verify the receiver stays empty for a short window. + tokio::time::sleep(tokio::time::Duration::from_millis(300)).await; + // try_recv_from is non-blocking; WouldBlock confirms the message went into + // the DataChannel rather than being echoed back to UDP. + let mut buf = vec![0u8; 256]; + let recv_result = whepfrom_receiver.try_recv_from(&mut buf); + assert!( + recv_result.is_err(), + "unexpected data at whepfrom UDP target: {:?}", + recv_result + ); + + // ── 8. Teardown ─────────────────────────────────────────────────────────── + ct.cancel(); + let result_whepfrom = handle_whepfrom.await.unwrap(); + let result_whip = handle_whip.await.unwrap(); + assert!(result_whepfrom.is_ok()); + assert!(result_whip.is_ok()); +} diff --git a/tests/rtp.rs b/tests/rtp.rs index 437b2a5a..6aeb5fb9 100644 --- a/tests/rtp.rs +++ b/tests/rtp.rs @@ -19,9 +19,6 @@ async fn test_livetwo_rtp_vp8() { let ip = IpAddr::V4(Ipv4Addr::LOCALHOST); let port = 0; - let whip_port: u16 = 5000; - let whep_port: u16 = 5005; - let width = 1280; let height = 720; let vcodec = "-vcodec libvpx -pix_fmt yuv420p -g 30 -keyint_min 30 -deadline realtime -speed 4 -b:v 2000k -maxrate 2500k -bufsize 5000k"; @@ -31,8 +28,6 @@ async fn test_livetwo_rtp_vp8() { ip, port, &prefix, - whip_port, - whep_port, Detect { audio: None, video: Some((width, height)), @@ -46,9 +41,6 @@ async fn test_livetwo_rtp_vp8_ipv6() { let ip = IpAddr::V6(Ipv6Addr::LOCALHOST); let port = 0; - let whip_port: u16 = 5010; - let whep_port: u16 = 5015; - let width = 1280; let height = 720; let vcodec = "-vcodec libvpx -pix_fmt yuv420p -g 30 -keyint_min 30 -deadline realtime -speed 4 -b:v 2000k -maxrate 2500k -bufsize 5000k"; @@ -58,8 +50,6 @@ async fn test_livetwo_rtp_vp8_ipv6() { ip, port, &prefix, - whip_port, - whep_port, Detect { audio: None, video: Some((width, height)), @@ -73,9 +63,6 @@ async fn test_livetwo_rtp_vp9() { let ip = IpAddr::V4(Ipv4Addr::LOCALHOST); let port = 0; - let whip_port: u16 = 5020; - let whep_port: u16 = 5025; - let width = 1280; let height = 720; let vcodec = "-vcodec libvpx-vp9 -pix_fmt yuv420p -g 30 -keyint_min 30 -deadline realtime -speed 5 -row-mt 1 -tile-columns 2 -frame-parallel 1 -b:v 1800k -maxrate 2200k -bufsize 4400k"; @@ -87,8 +74,6 @@ async fn test_livetwo_rtp_vp9() { ip, port, &prefix, - whip_port, - whep_port, Detect { audio: None, video: Some((width, height)), @@ -102,9 +87,6 @@ async fn test_livetwo_rtp_h264() { let ip = IpAddr::V4(Ipv4Addr::LOCALHOST); let port = 0; - let whip_port: u16 = 5030; - let whep_port: u16 = 5035; - let width = 1280; let height = 720; let vcodec = "-vcodec libx264 -pix_fmt yuv420p -g 30 -keyint_min 30 -crf 23 -preset ultrafast -tune zerolatency -profile:v main -level 4.1"; @@ -114,8 +96,6 @@ async fn test_livetwo_rtp_h264() { ip, port, &prefix, - whip_port, - whep_port, Detect { audio: None, video: Some((width, height)), @@ -129,9 +109,6 @@ async fn test_livetwo_rtp_h265() { let ip = IpAddr::V4(Ipv4Addr::LOCALHOST); let port = 0; - let whip_port: u16 = 5090; - let whep_port: u16 = 5095; - let width = 1280; let height = 720; let vcodec = "-vcodec libx265 -pix_fmt yuv420p -g 30 -keyint_min 30 -crf 25 -preset ultrafast -tune zerolatency -profile:v main -level 4.1"; @@ -141,8 +118,6 @@ async fn test_livetwo_rtp_h265() { ip, port, &prefix, - whip_port, - whep_port, Detect { audio: None, video: Some((width, height)), @@ -156,9 +131,6 @@ async fn test_livetwo_rtp_vp9_4k() { let ip = IpAddr::V4(Ipv4Addr::LOCALHOST); let port = 0; - let whip_port: u16 = 5040; - let whep_port: u16 = 5045; - let width = 3840; let height = 2160; let vcodec = "-vcodec libvpx-vp9 -pix_fmt yuv420p -g 30 -keyint_min 30 -deadline realtime -speed 5 -row-mt 1 -tile-columns 2 -frame-parallel 1 -b:v 10m -maxrate 15m -bufsize 30m"; @@ -170,8 +142,6 @@ async fn test_livetwo_rtp_vp9_4k() { ip, port, &prefix, - whip_port, - whep_port, Detect { audio: None, video: Some((width, height)), @@ -185,9 +155,6 @@ async fn test_livetwo_rtp_opus() { let ip = IpAddr::V4(Ipv4Addr::LOCALHOST); let port = 0; - let whip_port: u16 = 5050; - let whep_port: u16 = 5055; - let acodec = "-acodec libopus -ar 48000 -ac 2 -b:a 48k -application voip -frame_duration 10 -vbr constrained"; let prefix = format!("ffmpeg -re -f lavfi -i sine=frequency=1000 {acodec}"); @@ -195,8 +162,6 @@ async fn test_livetwo_rtp_opus() { ip, port, &prefix, - whip_port, - whep_port, Detect { audio: Some(2), video: None, @@ -210,9 +175,6 @@ async fn test_livetwo_rtp_g722() { let ip = IpAddr::V4(Ipv4Addr::LOCALHOST); let port = 0; - let whip_port: u16 = 5060; - let whep_port: u16 = 5065; - let acodec = "-acodec g722"; let prefix = format!("ffmpeg -re -f lavfi -i sine=frequency=1000 {acodec}"); @@ -220,8 +182,6 @@ async fn test_livetwo_rtp_g722() { ip, port, &prefix, - whip_port, - whep_port, Detect { audio: Some(1), video: None, @@ -235,9 +195,6 @@ async fn test_livetwo_rtp_vp8_opus() { let ip = IpAddr::V4(Ipv4Addr::LOCALHOST); let port = 0; - let whip_port: u16 = 5070; - let whep_port: u16 = 5075; - let width = 1280; let height = 720; @@ -251,8 +208,6 @@ async fn test_livetwo_rtp_vp8_opus() { ip, port, &prefix, - whip_port, - whep_port, Detect { audio: Some(2), video: Some((width, height)), @@ -266,9 +221,6 @@ async fn test_livetwo_rtp_h264_g722() { let ip = IpAddr::V4(Ipv4Addr::LOCALHOST); let port = 0; - let whip_port: u16 = 5080; - let whep_port: u16 = 5085; - let width = 1280; let height = 720; @@ -282,8 +234,6 @@ async fn test_livetwo_rtp_h264_g722() { ip, port, &prefix, - whip_port, - whep_port, Detect { audio: Some(1), video: Some((width, height)), @@ -292,14 +242,7 @@ async fn test_livetwo_rtp_h264_g722() { .await; } -async fn helper_livetwo_rtp( - ip: IpAddr, - port: u16, - prefix: &str, - whip_port: u16, - whep_port: u16, - detect: Detect, -) { +async fn helper_livetwo_rtp(ip: IpAddr, port: u16, prefix: &str, detect: Detect) { let cfg = liveion::config::Config::default(); let listener = TcpListener::bind(SocketAddr::new(ip, port)).await.unwrap(); @@ -330,6 +273,10 @@ async fn helper_livetwo_rtp( .unwrap() .to_string(); + // Use port 0 to let the OS assign a free port, avoiding conflicts when + // tests run concurrently (WSAEADDRINUSE / -10048 on Windows). + let whip_port = portpicker::pick_unused_port().unwrap(); + let ct = CancellationToken::new(); let handle_whip = tokio::spawn(livetwo::whip::into( ct.clone(), @@ -383,6 +330,8 @@ async fn helper_livetwo_rtp( _ => ip.to_string(), }; + let whep_port = portpicker::pick_unused_port().unwrap(); + let target_url = if detect.audio.is_some() && detect.video.is_some() { format!( "rtp://{}?video={}&audio={}", From 4aac2f624e5279e467e753412551f313bc8f67d3 Mon Sep 17 00:00:00 2001 From: HuYilong Date: Thu, 14 May 2026 11:34:18 +0800 Subject: [PATCH 4/5] revise the architecture:UDP channel is no longer dependent on WHIP publisher --- Cargo.lock | 1 + liveion/src/forward/internal.rs | 36 ++++-- liveion/src/forward/mod.rs | 5 + liveion/src/stream/manager.rs | 14 ++- tests/channel.rs | 194 ++++++++++++++------------------ 5 files changed, 132 insertions(+), 118 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0e6725cd..33ffcd60 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2349,6 +2349,7 @@ dependencies = [ "livetwo", "net4mqtt", "opendal", + "portpicker", "reqwest", "serde", "serde_json", diff --git a/liveion/src/forward/internal.rs b/liveion/src/forward/internal.rs index f1a20a20..40745d18 100644 --- a/liveion/src/forward/internal.rs +++ b/liveion/src/forward/internal.rs @@ -1,5 +1,7 @@ use std::borrow::ToOwned; use std::sync::Arc; +#[cfg(feature = "source")] +use std::sync::atomic::{AtomicBool, Ordering}; use chrono::Utc; use libwish::Client; @@ -67,6 +69,8 @@ pub(crate) struct PeerForwardInternal { event_sender: broadcast::Sender, #[cfg(feature = "source")] channel: Channel, + #[cfg(feature = "source")] + channel_started: AtomicBool, } impl PeerForwardInternal { @@ -93,6 +97,7 @@ impl PeerForwardInternal { ice_server, event_sender: new_broadcast_channel!(16), channel, + channel_started: AtomicBool::new(false), } } @@ -240,6 +245,26 @@ impl PeerForwardInternal { Ok(()) } + /// Initialize the UDP <-> DataChannel bridge for this stream, if configured. + /// Idempotent: subsequent calls are no-ops (guarded by AtomicBool). + #[cfg(feature = "source")] + pub(crate) async fn try_init_udp_channel(&self) -> Result<()> { + if self + .channel_started + .compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed) + .is_err() + { + return Ok(()); + } + + if let Some(stream_cfg) = self.channel.streams.get(&self.stream).cloned() { + let dc_rx = self.data_channel_forward.publish.subscribe(); + let dc_tx = self.data_channel_forward.subscribe.clone(); + super::channel::spawn_channel(self.stream.clone(), dc_rx, dc_tx, stream_cfg).await?; + } + Ok(()) + } + async fn data_channel_forward( dc: Arc, sender: broadcast::Sender>, @@ -493,17 +518,8 @@ impl PeerForwardInternal { ) -> Result<()> { let sender = self.data_channel_forward.subscribe.clone(); let receiver = self.data_channel_forward.publish.subscribe(); - // DataChannel ↔ UDP bidirectional forwarding (feature=source only). - // Messages from the WHIP publisher arrive on the subscribe channel. #[cfg(feature = "source")] - if let Some(stream_cfg) = self.channel.streams.get(&self.stream).cloned() { - // UDP acts as a member of the WHIP group: - // - dc_rx: receive messages from WHEP group (publish channel) - // - dc_tx: send messages to WHEP group (subscribe channel) - let dc_rx = self.data_channel_forward.publish.subscribe(); - let dc_tx = self.data_channel_forward.subscribe.clone(); - super::channel::spawn_channel(self.stream.clone(), dc_rx, dc_tx, stream_cfg).await?; - } + self.try_init_udp_channel().await?; Self::data_channel_forward(dc, sender, receiver).await; Ok(()) } diff --git a/liveion/src/forward/mod.rs b/liveion/src/forward/mod.rs index 9bf54cb4..05811e03 100644 --- a/liveion/src/forward/mod.rs +++ b/liveion/src/forward/mod.rs @@ -91,6 +91,11 @@ impl PeerForward { internal: Arc::new(PeerForwardInternal::new(stream, ice_server)), } } + #[cfg(feature = "source")] + pub(crate) async fn try_init_udp_channel(&self) -> Result<()> { + self.internal.try_init_udp_channel().await + } + pub fn subscribe_event(&self) -> broadcast::Receiver { self.internal.subscribe_event() } diff --git a/liveion/src/stream/manager.rs b/liveion/src/stream/manager.rs index d6aab52b..36ef5d3c 100644 --- a/liveion/src/stream/manager.rs +++ b/liveion/src/stream/manager.rs @@ -235,7 +235,7 @@ impl Manager { metrics::STREAM.inc(); let _ = self.event_sender.send(Event::Stream(StreamEvent { stream: Stream { - stream, + stream: stream.clone(), session: None, publish: 1, subscribe: 0, @@ -243,6 +243,10 @@ impl Manager { }, r#type: StreamEventType::Up, })); + #[cfg(feature = "source")] + if let Err(e) = forward.try_init_udp_channel().await { + tracing::warn!("Failed to init UDP channel for stream {}: {:?}", stream, e); + } forward } @@ -616,6 +620,14 @@ impl Manager { stream_map.insert(stream_id.to_string(), forward.clone()); tracing::info!("Created PeerForward for source: {}", stream_id); + #[cfg(feature = "source")] + if let Err(e) = forward.try_init_udp_channel().await { + tracing::warn!( + "Failed to init UDP channel for source {}: {:?}", + stream_id, + e + ); + } Arc::new(forward) } } diff --git a/tests/channel.rs b/tests/channel.rs index 5026a865..6b1d342c 100644 --- a/tests/channel.rs +++ b/tests/channel.rs @@ -1,41 +1,40 @@ -/// Integration test: DataChannel <-> UDP forwarding in whepfrom +/// Integration test: liveion UDP channel <-> whepfrom DataChannel <-> UDP /// -/// This test verifies that whepfrom correctly bridges DataChannel messages -/// to/from UDP when started with the --channel flag. +/// This test verifies end-to-end DataChannel <-> UDP forwarding without a WHIP +/// publisher. liveion's own UDP channel (channel.streams) is initialized at +/// stream creation time, and whepfrom bridges its DataChannel to UDP via +/// the --channel flag. /// /// Topology: /// -/// liveion (SFU) +/// UDP sender --> liveion UDP listen (8702) /// | -/// WHIP publisher (whipinto, no media, DataChannel only) -/// | DataChannel WHIP group -/// liveion internal broadcast -/// | DataChannel WHEP group -/// whepfrom (WHEP subscriber + --channel) +/// liveion subscribe broadcast --> all WHEP subscribers' DataChannels /// | -/// UDP socket (whepfrom_ch_target) <-- receives forwarded messages +/// whepfrom DataChannel --> whepfrom UDP target (8701) /// /// And the reverse: /// -/// UDP sender --> whepfrom_ch_listen +/// UDP sender --> whepfrom UDP listen (8700) /// | /// whepfrom DataChannel --> liveion publish broadcast /// | -/// liveion DataChannel write loop --> WHIP publisher DataChannel -/// -/// NOTE: liveion's own UDP channel (channel.streams) requires a DataChannel -/// to be opened by the WHIP publisher. Since whip::into does not open a -/// DataChannel, we test only the whepfrom side here. The liveion UDP channel -/// is covered by liveion's own unit tests (forward/channel.rs). +/// liveion UDP channel --> liveion UDP target (8703) +#[cfg(feature = "source")] use std::net::{IpAddr, Ipv4Addr, SocketAddr}; +#[cfg(feature = "source")] use tokio::net::{TcpListener, UdpSocket}; +#[cfg(feature = "source")] use tokio_util::sync::CancellationToken; +#[cfg(feature = "source")] mod common; +#[cfg(feature = "source")] use common::shutdown_signal; -async fn wait_for_session_connected(addr: &SocketAddr, stream_id: &str, is_publish: bool) -> bool { +#[cfg(feature = "source")] +async fn wait_for_session_connected(addr: &SocketAddr, stream_id: &str) -> bool { for _ in 0..200 { let body = reqwest::get(format!("http://{addr}{}", api::path::streams(""))) .await @@ -44,39 +43,45 @@ async fn wait_for_session_connected(addr: &SocketAddr, stream_id: &str, is_publi .await .unwrap_or_default(); - if let Some(stream) = body.into_iter().find(|s| s.id == stream_id) { - let sessions = if is_publish { - &stream.publish.sessions - } else { - &stream.subscribe.sessions - }; - if !sessions.is_empty() - && sessions[0].state == api::response::RTCPeerConnectionState::Connected - { - return true; - } + if let Some(stream) = body.into_iter().find(|s| s.id == stream_id) + && !stream.subscribe.sessions.is_empty() + && stream.subscribe.sessions[0].state + == api::response::RTCPeerConnectionState::Connected + { + return true; } tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; } false } +#[cfg(feature = "source")] #[tokio::test] async fn test_whepfrom_datachannel_udp_forwarding() { let ip = IpAddr::V4(Ipv4Addr::LOCALHOST); let stream_id = "test-dc-channel"; - // ── 1. Pick free ports ──────────────────────────────────────────────────── - let whepfrom_ch_listen: u16 = portpicker::pick_unused_port().unwrap(); - let whepfrom_ch_target: u16 = portpicker::pick_unused_port().unwrap(); - - // ── 2. Start liveion ────────────────────────────────────────────────────── - let cfg = liveion::config::Config::default(); + // ── 1. Static ports ──────────────────────────────────────────────────────── + let whepfrom_ch_listen: u16 = 8700; + let whepfrom_ch_target: u16 = 8701; + let liveion_ch_listen: u16 = 8702; + let liveion_ch_target: u16 = 8703; + + // ── 2. Start liveion with UDP channel config ──────────────────────────────── + let mut cfg = liveion::config::Config::default(); + cfg.channel.streams.insert( + stream_id.to_string(), + liveion::config::ChannelStream { + url: format!( + "udp://0.0.0.0:{liveion_ch_listen}?host=127.0.0.1&port={liveion_ch_target}" + ), + }, + ); let listener = TcpListener::bind(SocketAddr::new(ip, 0)).await.unwrap(); let addr = listener.local_addr().unwrap(); tokio::spawn(liveion::serve(cfg, listener, shutdown_signal())); - // ── 3. Create the stream ────────────────────────────────────────────────── + // ── 3. Create the stream ──────────────────────────────────────────────────── let res = reqwest::Client::new() .post(format!("http://{addr}{}", api::path::streams(stream_id))) .send() @@ -84,35 +89,8 @@ async fn test_whepfrom_datachannel_udp_forwarding() { .unwrap(); assert_eq!(http::StatusCode::NO_CONTENT, res.status()); - // ── 4. Connect a WHIP publisher (SDP file, no real media) ───────────────── - // whip::into reads the SDP and establishes a WebRTC connection. - // This puts a peer in the WHIP group so liveion can relay DataChannel messages. - let sdp_content = "v=0\r\n\ - o=- 0 0 IN IP4 127.0.0.1\r\n\ - s=test\r\n\ - c=IN IP4 127.0.0.1\r\n\ - t=0 0\r\n\ - m=video 5004 RTP/AVP 96\r\n\ - a=rtpmap:96 VP8/90000\r\n"; - let sdp_file = tempfile::NamedTempFile::new().unwrap(); - std::fs::write(sdp_file.path(), sdp_content).unwrap(); - let sdp_path = sdp_file.path().to_str().unwrap().to_string(); - + // ── 4. Start whepfrom with --channel ─────────────────────────────────────── let ct = CancellationToken::new(); - let handle_whip = tokio::spawn(livetwo::whip::into( - ct.clone(), - sdp_path, - format!("http://{addr}{}", api::path::whip(stream_id)), - None, - None, - )); - - assert!( - wait_for_session_connected(&addr, stream_id, true).await, - "WHIP publisher failed to connect" - ); - - // ── 5. Start whepfrom with --channel ───────────────────────────────────── let whep_channel_url = format!("udp://0.0.0.0:{whepfrom_ch_listen}?host=127.0.0.1&port={whepfrom_ch_target}"); let handle_whepfrom = tokio::spawn(livetwo::whep::from( @@ -126,70 +104,72 @@ async fn test_whepfrom_datachannel_udp_forwarding() { )); assert!( - wait_for_session_connected(&addr, stream_id, false).await, + wait_for_session_connected(&addr, stream_id).await, "WHEP subscriber (whepfrom) failed to connect" ); // Bind receivers before sending so no packets are dropped - let whepfrom_receiver = UdpSocket::bind(format!("127.0.0.1:{whepfrom_ch_target}")) + let whepfrom_target = UdpSocket::bind(format!("127.0.0.1:{whepfrom_ch_target}")) + .await + .unwrap(); + let liveion_target = UdpSocket::bind(format!("127.0.0.1:{liveion_ch_target}")) .await .unwrap(); - // Give DataChannel time to open and detach on both sides + // Give DataChannel time to open and detach tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await; - // ── 6. Test path: whepfrom UDP listen → DataChannel → liveion ──────────── - // Send a UDP packet into whepfrom's listen port. - // whepfrom forwards it via DataChannel to liveion's subscribe broadcast. - // liveion's write loop sends it to the WHIP publisher's DataChannel. - // (We don't assert receipt at the WHIP side here since whip::into has no - // DataChannel receive path, but we verify the send doesn't error out.) let udp_sender = UdpSocket::bind("127.0.0.1:0").await.unwrap(); - let msg_to_dc = b"udp to datachannel"; + let mut buf = vec![0u8; 256]; + + // ── 5. Test: UDP → liveion listen → DC → whepfrom target ─────────────────── + let msg_liveion_to_whepfrom = b"liveion->dc->whepfrom"; udp_sender - .send_to(msg_to_dc, format!("127.0.0.1:{whepfrom_ch_listen}")) + .send_to( + msg_liveion_to_whepfrom, + format!("127.0.0.1:{liveion_ch_listen}"), + ) .await .unwrap(); - // Small delay to let the message propagate - tokio::time::sleep(tokio::time::Duration::from_millis(200)).await; - - // ── 7. Test path: liveion DataChannel → whepfrom UDP target ────────────── - // Inject a message directly into liveion's subscribe broadcast by having - // the WHIP publisher's DataChannel send it. Since whip::into doesn't expose - // a DataChannel send API, we verify the reverse path via the whepfrom UDP - // channel's own loopback: send to whepfrom listen, expect it at whepfrom target - // after it round-trips through the DataChannel. - // - // For a full end-to-end test of liveion UDP <-> whepfrom UDP, the WHIP - // publisher would need to open a DataChannel (not currently supported by - // whip::into). That path is covered by manual integration testing. - // - // Here we verify that whepfrom correctly forwards UDP→DC without errors, - // and that the channel infrastructure is wired up correctly. - let msg_echo = b"echo test"; + let (n, _) = tokio::time::timeout( + std::time::Duration::from_secs(2), + whepfrom_target.recv_from(&mut buf), + ) + .await + .expect("timeout waiting for message at whepfrom target") + .unwrap(); + assert_eq!( + &buf[..n], + msg_liveion_to_whepfrom, + "unexpected data at whepfrom target" + ); + + // ── 6. Test: UDP → whepfrom listen → DC → liveion target ─────────────────── + let msg_whepfrom_to_liveion = b"whepfrom->dc->liveion"; udp_sender - .send_to(msg_echo, format!("127.0.0.1:{whepfrom_ch_listen}")) + .send_to( + msg_whepfrom_to_liveion, + format!("127.0.0.1:{whepfrom_ch_listen}"), + ) .await .unwrap(); - // The message should NOT appear at whepfrom_ch_target (it went DC→liveion, - // not looped back). Verify the receiver stays empty for a short window. - tokio::time::sleep(tokio::time::Duration::from_millis(300)).await; - // try_recv_from is non-blocking; WouldBlock confirms the message went into - // the DataChannel rather than being echoed back to UDP. - let mut buf = vec![0u8; 256]; - let recv_result = whepfrom_receiver.try_recv_from(&mut buf); - assert!( - recv_result.is_err(), - "unexpected data at whepfrom UDP target: {:?}", - recv_result + let (n, _) = tokio::time::timeout( + std::time::Duration::from_secs(2), + liveion_target.recv_from(&mut buf), + ) + .await + .expect("timeout waiting for message at liveion target") + .unwrap(); + assert_eq!( + &buf[..n], + msg_whepfrom_to_liveion, + "unexpected data at liveion target" ); - // ── 8. Teardown ─────────────────────────────────────────────────────────── + // ── 7. Teardown ───────────────────────────────────────────────────────────── ct.cancel(); let result_whepfrom = handle_whepfrom.await.unwrap(); - let result_whip = handle_whip.await.unwrap(); assert!(result_whepfrom.is_ok()); - assert!(result_whip.is_ok()); } From 152cd08a7bc094e406f6d79b58b76699d86a6405 Mon Sep 17 00:00:00 2001 From: HuYilong Date: Thu, 21 May 2026 21:14:27 +0800 Subject: [PATCH 5/5] Fix parsing issues,dead code and static port problems --- liveion/src/config.rs | 50 +++++++++++++++++++-------------- liveion/src/forward/internal.rs | 2 -- tests/rtp.rs | 36 +++++++++++++++++++----- 3 files changed, 58 insertions(+), 30 deletions(-) diff --git a/liveion/src/config.rs b/liveion/src/config.rs index afa5de6f..0258e165 100644 --- a/liveion/src/config.rs +++ b/liveion/src/config.rs @@ -2,6 +2,8 @@ use std::{env, net::SocketAddr, str::FromStr}; use iceserver::{IceServer, default_ice_servers}; use serde::{Deserialize, Serialize}; +#[cfg(feature = "source")] +use url::Url; #[derive(Debug, Default, Clone, Deserialize, Serialize)] pub struct Config { @@ -139,37 +141,43 @@ impl ChannelStream { /// Parse the URL into (listen_host, listen_port, target_host, target_port). /// Supported format: udp://:?host=&port= pub fn parse(&self) -> Option<(String, u16, String, u16)> { - // Parse udp://:?host=&port= - let s = self.url.strip_prefix("udp://")?; - let (host_port, query) = s.split_once('?')?; - // rsplit_once handles IPv6 like [::1]:7774 correctly - let (listen_host_raw, listen_port_str) = host_port.rsplit_once(':')?; - let listen_port: u16 = listen_port_str.parse().ok()?; - // Strip brackets from IPv6 address e.g. [::1] -> ::1, then re-add for socket addr - let listen_host_inner = listen_host_raw.trim_matches(|c| c == '[' || c == ']'); - let listen_host = if listen_host_inner.contains(':') { - format!("[{}]", listen_host_inner) + let parsed = Url::parse(&self.url).ok()?; + if parsed.scheme() != "udp" { + return None; + } + + // url::Url::host_str() returns IPv6 already bracketed (e.g. "[::1]"). + // Normalize to bracketed form for socket addresses. + let listen_host = parsed.host_str()?.to_string(); + let listen_host = if listen_host.starts_with('[') { + listen_host + } else if listen_host.contains(':') { + format!("[{}]", listen_host) } else { - listen_host_inner.to_string() + listen_host }; + let listen_port = parsed.port()?; let mut target_host = String::new(); let mut target_port: u16 = 0; - for param in query.split('&') { - if let Some(v) = param.strip_prefix("host=") { - // Wrap IPv6 addresses in brackets for use as socket address - target_host = if v.parse::().is_ok() { - format!("[{}]", v) - } else { - v.to_string() - }; - } else if let Some(v) = param.strip_prefix("port=") { - target_port = v.parse().ok()?; + for (key, value) in parsed.query_pairs() { + match key.as_ref() { + "host" => target_host = value.into_owned(), + "port" => target_port = value.parse().ok()?, + _ => {} } } if target_host.is_empty() || target_port == 0 { return None; } + + // query_pairs() returns raw IPv6 without brackets, add them for socket addresses + let target_host = if target_host.contains(':') { + format!("[{}]", target_host) + } else { + target_host + }; + Some((listen_host, listen_port, target_host, target_port)) } } diff --git a/liveion/src/forward/internal.rs b/liveion/src/forward/internal.rs index 40745d18..b5fa102c 100644 --- a/liveion/src/forward/internal.rs +++ b/liveion/src/forward/internal.rs @@ -518,8 +518,6 @@ impl PeerForwardInternal { ) -> Result<()> { let sender = self.data_channel_forward.subscribe.clone(); let receiver = self.data_channel_forward.publish.subscribe(); - #[cfg(feature = "source")] - self.try_init_udp_channel().await?; Self::data_channel_forward(dc, sender, receiver).await; Ok(()) } diff --git a/tests/rtp.rs b/tests/rtp.rs index 6aeb5fb9..03cee126 100644 --- a/tests/rtp.rs +++ b/tests/rtp.rs @@ -32,6 +32,8 @@ async fn test_livetwo_rtp_vp8() { audio: None, video: Some((width, height)), }, + 8800, + 8802, ) .await; } @@ -54,6 +56,8 @@ async fn test_livetwo_rtp_vp8_ipv6() { audio: None, video: Some((width, height)), }, + 8804, + 8806, ) .await; } @@ -78,6 +82,8 @@ async fn test_livetwo_rtp_vp9() { audio: None, video: Some((width, height)), }, + 8808, + 8810, ) .await; } @@ -100,6 +106,8 @@ async fn test_livetwo_rtp_h264() { audio: None, video: Some((width, height)), }, + 8812, + 8814, ) .await; } @@ -122,6 +130,8 @@ async fn test_livetwo_rtp_h265() { audio: None, video: Some((width, height)), }, + 8816, + 8818, ) .await; } @@ -146,6 +156,8 @@ async fn test_livetwo_rtp_vp9_4k() { audio: None, video: Some((width, height)), }, + 8820, + 8822, ) .await; } @@ -166,6 +178,8 @@ async fn test_livetwo_rtp_opus() { audio: Some(2), video: None, }, + 8824, + 8826, ) .await; } @@ -186,6 +200,8 @@ async fn test_livetwo_rtp_g722() { audio: Some(1), video: None, }, + 8828, + 8830, ) .await; } @@ -212,6 +228,8 @@ async fn test_livetwo_rtp_vp8_opus() { audio: Some(2), video: Some((width, height)), }, + 8832, + 8834, ) .await; } @@ -238,11 +256,20 @@ async fn test_livetwo_rtp_h264_g722() { audio: Some(1), video: Some((width, height)), }, + 8838, + 8840, ) .await; } -async fn helper_livetwo_rtp(ip: IpAddr, port: u16, prefix: &str, detect: Detect) { +async fn helper_livetwo_rtp( + ip: IpAddr, + port: u16, + prefix: &str, + detect: Detect, + whip_port: u16, + whep_port: u16, +) { let cfg = liveion::config::Config::default(); let listener = TcpListener::bind(SocketAddr::new(ip, port)).await.unwrap(); @@ -273,10 +300,7 @@ async fn helper_livetwo_rtp(ip: IpAddr, port: u16, prefix: &str, detect: Detect) .unwrap() .to_string(); - // Use port 0 to let the OS assign a free port, avoiding conflicts when - // tests run concurrently (WSAEADDRINUSE / -10048 on Windows). - let whip_port = portpicker::pick_unused_port().unwrap(); - + // Static port assigned per test to avoid WSAEADDRINUSE (-10048) on Windows CI. let ct = CancellationToken::new(); let handle_whip = tokio::spawn(livetwo::whip::into( ct.clone(), @@ -330,8 +354,6 @@ async fn helper_livetwo_rtp(ip: IpAddr, port: u16, prefix: &str, detect: Detect) _ => ip.to_string(), }; - let whep_port = portpicker::pick_unused_port().unwrap(); - let target_url = if detect.audio.is_some() && detect.video.is_some() { format!( "rtp://{}?video={}&audio={}",