Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ base64 = "0.22"
[dev-dependencies]
http = { workspace = true }
serde_json = { workspace = true }
portpicker = "0.1.1"

api = { path = "libs/api" }
cli = { path = "libs/cli" }
Expand Down
50 changes: 29 additions & 21 deletions liveion/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ use std::{env, net::SocketAddr, str::FromStr};

use iceserver::{IceServer, default_ice_servers};
use serde::{Deserialize, Serialize};
#[cfg(feature = "source")]
use url::Url;

#[derive(Debug, Default, Clone, Deserialize, Serialize)]
pub struct Config {
Expand Down Expand Up @@ -139,37 +141,43 @@ impl ChannelStream {
/// Parse the URL into (listen_host, listen_port, target_host, target_port).
/// Supported format: udp://<listen_host>:<listen_port>?host=<target_host>&port=<target_port>
pub fn parse(&self) -> Option<(String, u16, String, u16)> {
// Parse udp://<listen_host>:<listen_port>?host=<target_host>&port=<target_port>
let s = self.url.strip_prefix("udp://")?;
let (host_port, query) = s.split_once('?')?;
// rsplit_once handles IPv6 like [::1]:7774 correctly
let (listen_host_raw, listen_port_str) = host_port.rsplit_once(':')?;
let listen_port: u16 = listen_port_str.parse().ok()?;
// Strip brackets from IPv6 address e.g. [::1] -> ::1, then re-add for socket addr
let listen_host_inner = listen_host_raw.trim_matches(|c| c == '[' || c == ']');
let listen_host = if listen_host_inner.contains(':') {
format!("[{}]", listen_host_inner)
let parsed = Url::parse(&self.url).ok()?;
if parsed.scheme() != "udp" {
return None;
}

// url::Url::host_str() returns IPv6 already bracketed (e.g. "[::1]").
// Normalize to bracketed form for socket addresses.
let listen_host = parsed.host_str()?.to_string();
let listen_host = if listen_host.starts_with('[') {
listen_host
} else if listen_host.contains(':') {
format!("[{}]", listen_host)
} else {
listen_host_inner.to_string()
listen_host
};
let listen_port = parsed.port()?;

let mut target_host = String::new();
let mut target_port: u16 = 0;
for param in query.split('&') {
if let Some(v) = param.strip_prefix("host=") {
// Wrap IPv6 addresses in brackets for use as socket address
target_host = if v.parse::<std::net::Ipv6Addr>().is_ok() {
format!("[{}]", v)
} else {
v.to_string()
};
} else if let Some(v) = param.strip_prefix("port=") {
target_port = v.parse().ok()?;
for (key, value) in parsed.query_pairs() {
match key.as_ref() {
"host" => target_host = value.into_owned(),
"port" => target_port = value.parse().ok()?,
_ => {}
}
}
if target_host.is_empty() || target_port == 0 {
return None;
}

// query_pairs() returns raw IPv6 without brackets, add them for socket addresses
let target_host = if target_host.contains(':') {
format!("[{}]", target_host)
} else {
target_host
};

Some((listen_host, listen_port, target_host, target_port))
}
}
Expand Down
36 changes: 25 additions & 11 deletions liveion/src/forward/internal.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use std::borrow::ToOwned;
use std::sync::Arc;
#[cfg(feature = "source")]
use std::sync::atomic::{AtomicBool, Ordering};

use chrono::Utc;
use libwish::Client;
Expand Down Expand Up @@ -67,6 +69,8 @@ pub(crate) struct PeerForwardInternal {
event_sender: broadcast::Sender<ForwardEvent>,
#[cfg(feature = "source")]
channel: Channel,
#[cfg(feature = "source")]
channel_started: AtomicBool,
}

impl PeerForwardInternal {
Expand All @@ -93,6 +97,7 @@ impl PeerForwardInternal {
ice_server,
event_sender: new_broadcast_channel!(16),
channel,
channel_started: AtomicBool::new(false),
}
}

Expand Down Expand Up @@ -240,6 +245,26 @@ impl PeerForwardInternal {
Ok(())
}

/// Initialize the UDP <-> DataChannel bridge for this stream, if configured.
/// Idempotent: subsequent calls are no-ops (guarded by AtomicBool).
#[cfg(feature = "source")]
pub(crate) async fn try_init_udp_channel(&self) -> Result<()> {
if self
.channel_started
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed)
.is_err()
{
return Ok(());
}

if let Some(stream_cfg) = self.channel.streams.get(&self.stream).cloned() {
let dc_rx = self.data_channel_forward.publish.subscribe();
let dc_tx = self.data_channel_forward.subscribe.clone();
super::channel::spawn_channel(self.stream.clone(), dc_rx, dc_tx, stream_cfg).await?;
}
Ok(())
}

async fn data_channel_forward(
dc: Arc<RTCDataChannel>,
sender: broadcast::Sender<Vec<u8>>,
Expand Down Expand Up @@ -493,17 +518,6 @@ impl PeerForwardInternal {
) -> Result<()> {
let sender = self.data_channel_forward.subscribe.clone();
let receiver = self.data_channel_forward.publish.subscribe();
// DataChannel ↔ UDP bidirectional forwarding (feature=source only).
// Messages from the WHIP publisher arrive on the subscribe channel.
#[cfg(feature = "source")]
if let Some(stream_cfg) = self.channel.streams.get(&self.stream).cloned() {
// UDP acts as a member of the WHIP group:
// - dc_rx: receive messages from WHEP group (publish channel)
// - dc_tx: send messages to WHEP group (subscribe channel)
let dc_rx = self.data_channel_forward.publish.subscribe();
let dc_tx = self.data_channel_forward.subscribe.clone();
super::channel::spawn_channel(self.stream.clone(), dc_rx, dc_tx, stream_cfg).await?;
}
Self::data_channel_forward(dc, sender, receiver).await;
Ok(())
}
Expand Down
5 changes: 5 additions & 0 deletions liveion/src/forward/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,11 @@ impl PeerForward {
internal: Arc::new(PeerForwardInternal::new(stream, ice_server)),
}
}
#[cfg(feature = "source")]
pub(crate) async fn try_init_udp_channel(&self) -> Result<()> {
self.internal.try_init_udp_channel().await
}

pub fn subscribe_event(&self) -> broadcast::Receiver<ForwardEvent> {
self.internal.subscribe_event()
}
Expand Down
14 changes: 13 additions & 1 deletion liveion/src/stream/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,14 +235,18 @@ impl Manager {
metrics::STREAM.inc();
let _ = self.event_sender.send(Event::Stream(StreamEvent {
stream: Stream {
stream,
stream: stream.clone(),
session: None,
publish: 1,
subscribe: 0,
reforward: 0,
},
r#type: StreamEventType::Up,
}));
#[cfg(feature = "source")]
if let Err(e) = forward.try_init_udp_channel().await {
tracing::warn!("Failed to init UDP channel for stream {}: {:?}", stream, e);
}
forward
}

Expand Down Expand Up @@ -616,6 +620,14 @@ impl Manager {
stream_map.insert(stream_id.to_string(), forward.clone());

tracing::info!("Created PeerForward for source: {}", stream_id);
#[cfg(feature = "source")]
if let Err(e) = forward.try_init_udp_channel().await {
tracing::warn!(
"Failed to init UDP channel for source {}: {:?}",
stream_id,
e
);
}
Arc::new(forward)
}
}
Expand Down
11 changes: 9 additions & 2 deletions livetwo/src/utils/webrtc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@ use std::sync::Arc;
use tokio_util::sync::CancellationToken;
use tracing::{debug, info, warn};
use webrtc::{
api::{APIBuilder, interceptor_registry::register_default_interceptors, media_engine::*},
api::{
APIBuilder, interceptor_registry::register_default_interceptors, media_engine::*,
setting_engine::SettingEngine,
},
ice_transport::ice_server::RTCIceServer,
interceptor::registry::Registry,
peer_connection::{
Expand All @@ -22,9 +25,13 @@ pub async fn create_api() -> Result<(APIBuilder, RTCConfiguration)> {
let mut registry = Registry::new();
registry = register_default_interceptors(registry, &mut m)?;

let mut s = SettingEngine::default();
s.detach_data_channels();

let api = APIBuilder::new()
.with_media_engine(m)
.with_interceptor_registry(registry);
.with_interceptor_registry(registry)
.with_setting_engine(s);

let config = RTCConfiguration {
ice_servers: vec![RTCIceServer {
Expand Down
Loading
Loading