-
Notifications
You must be signed in to change notification settings - Fork 17
/
Copy pathmain.rs
246 lines (211 loc) · 8.9 KB
/
main.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
use anyhow::Result;
use clap::{App, AppSettings, Arg};
use hyper::{Body, Client, Method, Request};
use std::io::Write;
use std::sync::Arc;
use tokio::time::Duration;
use webrtc::api::interceptor_registry::register_default_interceptors;
use webrtc::api::media_engine::{MediaEngine, MIME_TYPE_VP8};
use webrtc::api::APIBuilder;
use webrtc::ice_transport::ice_candidate::RTCIceCandidate;
use webrtc::ice_transport::ice_connection_state::RTCIceConnectionState;
use webrtc::ice_transport::ice_server::RTCIceServer;
use webrtc::interceptor::registry::Registry;
use webrtc::peer_connection::configuration::RTCConfiguration;
use webrtc::peer_connection::peer_connection_state::RTCPeerConnectionState;
use webrtc::peer_connection::sdp::session_description::RTCSessionDescription;
use webrtc::peer_connection::signaling_state::RTCSignalingState;
use webrtc::rtp_transceiver::rtp_codec::{
RTCRtpCodecCapability, RTCRtpCodecParameters, RTPCodecType,
};
use webrtc::track::track_local::track_local_static_rtp::TrackLocalStaticRTP;
use webrtc::track::track_local::TrackLocal;
// The echo server URL is no longer a constant: it is supplied through the
// `server_url` command-line argument, whose default is the value below.
//const ECHO_TEST_SERVER_URL: &str = "http://localhost:8080/offer";
/// Maximum time, in seconds, to wait for the peer connection to report
/// `Connected` after the remote description has been applied.
const CONNECTION_ATTEMPT_TIMEOUT_SECONDS: u64 = 10;
/// Process exit code used when the peer connection state handler terminates
/// the process.
const SUCCESS_RETURN_VALUE: i32 = 0;
// Kept for reference only; not referenced by the code visible in this file.
//const ERROR_RETURN_VALUE: i32 = 1;
/// Entry point of the webrtc-rs echo client.
///
/// The client:
/// 1. parses the command line (`--fullhelp`, `--debug`/`-d`, optional `server_url`),
/// 2. builds a peer connection that offers a single VP8 video track,
/// 3. waits for ICE gathering to complete (non-trickle ICE),
/// 4. POSTs the local offer as JSON to the echo server and applies the JSON
///    answer it returns as the remote description,
/// 5. waits until the peer connection reports `Connected` or
///    `CONNECTION_ATTEMPT_TIMEOUT_SECONDS` elapse, then closes the connection.
///
/// # Errors
///
/// Returns an error if any WebRTC setup step fails, if the local description
/// is unavailable after ICE gathering, if the HTTP request fails or returns a
/// non-success status, or if the response body is not a valid session
/// description.
#[tokio::main]
async fn main() -> Result<()> {
    let mut app = App::new("webrtc-rs echo client")
        .version("0.1.0")
        .author("Rain Liu <[email protected]>")
        .about("An example of webrtc-rs echo client.")
        .setting(AppSettings::DeriveDisplayOrder)
        .setting(AppSettings::SubcommandsNegateReqs)
        .arg(
            Arg::with_name("FULLHELP")
                .help("Prints more detailed help information")
                .long("fullhelp"),
        )
        .arg(
            Arg::with_name("debug")
                .long("debug")
                .short("d")
                .help("Prints debug log information"),
        )
        .arg(
            Arg::with_name("server_url")
                .default_value("http://localhost:8080/offer")
                .help("Echo HTTP server is hosted on."),
        );

    let matches = app.clone().get_matches();

    if matches.is_present("FULLHELP") {
        // Propagate a failure to write the help text instead of panicking.
        app.print_long_help()?;
        return Ok(());
    }

    if matches.is_present("debug") {
        // Trace-level logging with file/line and a microsecond timestamp.
        env_logger::Builder::new()
            .format(|buf, record| {
                writeln!(
                    buf,
                    "{}:{} [{}] {} - {}",
                    record.file().unwrap_or("unknown"),
                    record.line().unwrap_or(0),
                    record.level(),
                    chrono::Local::now().format("%H:%M:%S.%6f"),
                    record.args()
                )
            })
            .filter(None, log::LevelFilter::Trace)
            .init();
    }

    let echo_test_server_url = matches
        .value_of("server_url")
        .expect("server_url always has a value because it declares a default")
        .to_owned();

    // Everything below is the WebRTC.rs API.

    // Create a MediaEngine object to configure the supported codecs.
    let mut m = MediaEngine::default();

    // Register the codecs we want to use; this client only offers VP8.
    m.register_codec(
        RTCRtpCodecParameters {
            capability: RTCRtpCodecCapability {
                mime_type: MIME_TYPE_VP8.to_owned(),
                clock_rate: 90000,
                channels: 0,
                sdp_fmtp_line: "".to_owned(),
                rtcp_feedback: vec![],
            },
            payload_type: 96,
            ..Default::default()
        },
        RTPCodecType::Video,
    )?;

    // Create an interceptor registry (the configurable RTP/RTCP pipeline) and
    // populate it with the default set of interceptors.
    let mut registry = Registry::new();
    registry = register_default_interceptors(registry, &mut m)?;

    // Create the API object with the MediaEngine and the interceptors.
    let api = APIBuilder::new()
        .with_media_engine(m)
        .with_interceptor_registry(registry)
        .build();

    // Prepare the configuration: a single public STUN server.
    let config = RTCConfiguration {
        ice_servers: vec![RTCIceServer {
            urls: vec!["stun:stun.l.google.com:19302".to_owned()],
            ..Default::default()
        }],
        ..Default::default()
    };

    // Create a new RTCPeerConnection.
    let peer_connection = Arc::new(api.new_peer_connection(config).await?);

    // Create the local video track that is offered to the echo server.
    let output_track = Arc::new(TrackLocalStaticRTP::new(
        RTCRtpCodecCapability {
            mime_type: MIME_TYPE_VP8.to_owned(),
            ..Default::default()
        },
        "video".to_owned(),
        "webrtc-rs".to_owned(),
    ));

    // Add this newly created track to the PeerConnection.
    let rtp_sender = peer_connection
        .add_track(Arc::clone(&output_track) as Arc<dyn TrackLocal + Send + Sync>)
        .await?;

    // Drain incoming RTCP packets in the background. The packets themselves
    // are discarded; the loop ends on the first read error.
    tokio::spawn(async move {
        let mut rtcp_buf = vec![0u8; 1500];
        while let Ok((_, _)) = rtp_sender.read(&mut rtcp_buf).await {}
        Result::<()>::Ok(())
    });

    // Candidates are not trickled: the complete offer is read from the local
    // description once gathering has finished (see below), so this handler
    // intentionally does nothing.
    peer_connection.on_ice_candidate(Box::new(|_candidate: Option<RTCIceCandidate>| {
        Box::pin(async {})
    }));

    // Create an offer to send to the echo server.
    let offer = peer_connection.create_offer(None).await?;

    // Set the local description.
    peer_connection.set_local_description(offer).await?;

    // Log ICE connection state transitions.
    peer_connection.on_ice_connection_state_change(Box::new(
        |connection_state: RTCIceConnectionState| {
            println!("ICE connection state has changed {}.", connection_state);
            Box::pin(async {})
        },
    ));

    // `done_rx` is signalled once the peer connection reports `Connected`.
    let (done_tx, mut done_rx) = tokio::sync::mpsc::channel::<()>(1);
    let done_tx = Arc::new(done_tx);

    peer_connection.on_peer_connection_state_change(Box::new(
        move |state: RTCPeerConnectionState| {
            println!("Peer connection state has changed to {}.", state);
            let done_tx = Arc::clone(&done_tx);
            Box::pin(async move {
                match state {
                    RTCPeerConnectionState::Connected => {
                        // The receiver may already be gone; that is not an error.
                        let _ = done_tx.send(()).await;
                    }
                    // NOTE(review): `Failed` also exits with the success code;
                    // confirm this is what the test harness expects.
                    RTCPeerConnectionState::Disconnected
                    | RTCPeerConnectionState::Failed
                    | RTCPeerConnectionState::Closed => {
                        std::process::exit(SUCCESS_RETURN_VALUE);
                    }
                    _ => {}
                }
            })
        },
    ));

    // Log signaling state transitions.
    peer_connection.on_signaling_state_change(Box::new(|state: RTCSignalingState| {
        println!("Signaling state has changed to {}.", state);
        Box::pin(async {})
    }));

    // Block until ICE gathering is complete, disabling trickle ICE. We do this
    // because only a single signaling message (the offer) is exchanged.
    let mut gather_complete = peer_connection.gathering_complete_promise().await;
    let _ = gather_complete.recv().await;

    println!("Attempting to POST offer to {}.", echo_test_server_url);

    // POST the offer, including the gathered candidates, to the echo server.
    let offer_with_ice = peer_connection.local_description().await.ok_or_else(|| {
        std::io::Error::new(
            std::io::ErrorKind::Other,
            "local description is not available after ICE gathering",
        )
    })?;
    let offer_buf = serde_json::to_string(&offer_with_ice)?;
    let req = Request::builder()
        .method(Method::POST)
        .uri(echo_test_server_url.as_str())
        .header("content-type", "application/json")
        .body(Body::from(offer_buf))?;

    let resp = Client::new().request(req).await?;
    let status = resp.status();
    println!("POST offer response Status: {}", status);
    if !status.is_success() {
        // Fail with a clear message rather than a JSON error on an error page.
        return Err(std::io::Error::new(
            std::io::ErrorKind::Other,
            format!("echo server rejected the offer with status {}", status),
        )
        .into());
    }

    // Decode the answer straight from the response bytes.
    let buf = hyper::body::to_bytes(resp.into_body()).await?;
    let answer = serde_json::from_slice::<RTCSessionDescription>(&buf)?;

    // Set the remote SessionDescription.
    peer_connection.set_remote_description(answer).await?;

    // Wait for either the connection to be established or the timeout.
    // NOTE(review): both outcomes currently end with `Ok(())`, i.e. exit code 0.
    let timeout = tokio::time::sleep(Duration::from_secs(CONNECTION_ATTEMPT_TIMEOUT_SECONDS));
    tokio::pin!(timeout);

    tokio::select! {
        _ = timeout.as_mut() => {
            println!("Peer Connnection TimeOut! Exit echo client now...");
        }
        _ = done_rx.recv() => {
            println!("Peer Connnection Connected! Exit echo client now...");
        }
    }

    peer_connection.close().await?;

    Ok(())
}