diff --git a/Cargo.lock b/Cargo.lock
index c314ead..86c1a42 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -64,6 +64,12 @@ version = "2.9.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "1b8e56985ec62d17e9c1001dc89c88ecd7dc08e47eba5ec7c29c7b5eeecde967"
 
+[[package]]
+name = "byteorder"
+version = "1.5.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b"
+
 [[package]]
 name = "cfg-if"
 version = "1.0.1"
@@ -458,6 +464,7 @@ dependencies = [
 name = "windows-capture"
 version = "1.5.0"
 dependencies = [
+ "byteorder",
  "clap",
  "ctrlc",
  "parking_lot",
diff --git a/Cargo.toml b/Cargo.toml
index 567bcf8..054853b 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -59,6 +59,9 @@ rayon = "1.10.0"
 # Error handling
 thiserror = "2.0.12"
 
+# Network serialization
+byteorder = "1.5"
+
 [dev-dependencies]
 clap = { version = "4.5.40", features = ["derive"] }
 ctrlc = "3.4.7"
@@ -75,5 +78,9 @@ doc-scrape-examples = false
 name = "cli"
 doc-scrape-examples = false
 
+[[example]]
+name = "streaming"
+doc-scrape-examples = false
+
 [workspace]
 members = ["windows-capture-python"]
diff --git a/README.md b/README.md
index c56bb26..e0bcc46 100644
--- a/README.md
+++ b/README.md
@@ -150,6 +150,99 @@ fn main() {
 }
 ```
 
+## Real-time Streaming
+
+Windows Capture now supports real-time video streaming without writing to files: encoded video frames are delivered to a callback as they are produced, so they can be transmitted over the network for live streaming applications.
+
+### Streaming Example
+
+```rust
+use windows_capture::capture::{Context, GraphicsCaptureApiHandler};
+use windows_capture::encoder::{
+    AudioSettingsBuilder, ContainerSettingsBuilder, StreamingVideoEncoder, VideoSettingsBuilder,
+};
+use windows_capture::frame::Frame;
+use windows_capture::graphics_capture_api::InternalCaptureControl;
+use windows_capture::monitor::Monitor;
+use windows_capture::network::{NetworkCallback, NetworkConfig, Protocol};
+
+struct StreamingCapture {
+    encoder: Option<StreamingVideoEncoder>,
+    start: std::time::Instant,
+}
+
+impl GraphicsCaptureApiHandler for StreamingCapture {
+    type Flags = String;
+    type Error = Box<dyn std::error::Error + Send + Sync>;
+
+    fn new(_ctx: Context<Self::Flags>) -> Result<Self, Self::Error> {
+        let monitor = Monitor::primary()?;
+        let width = monitor.width()?;
+        let height = monitor.height()?;
+
+        let video_settings = VideoSettingsBuilder::new(width, height);
+        let audio_settings = AudioSettingsBuilder::default().disabled(true);
+        let container_settings = ContainerSettingsBuilder::default();
+
+        // Create a network callback for TCP streaming.
+        let config = NetworkConfig {
+            protocol: Protocol::Tcp,
+            address: "127.0.0.1:8080".to_string(),
+            frame_rate: 30,
+            ..Default::default()
+        };
+        let callback = Box::new(NetworkCallback::new(config)?);
+
+        let encoder = StreamingVideoEncoder::new(
+            video_settings,
+            audio_settings,
+            container_settings,
+            callback,
+        )?;
+
+        Ok(Self {
+            encoder: Some(encoder),
+            start: std::time::Instant::now(),
+        })
+    }
+
+    fn on_frame_arrived(
+        &mut self,
+        frame: &mut Frame,
+        _capture_control: InternalCaptureControl,
+    ) -> Result<(), Self::Error> {
+        // Send the frame to the streaming encoder.
+        self.encoder.as_mut().unwrap().send_frame(frame)?;
+        Ok(())
+    }
+}
+```
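+
+### Receiving the Stream
+
+Each frame goes over the wire as a little-endian header (data length as `u32`, timestamp as `i64`, frame type as `u32`, width as `u32`, height as `u32`) followed by the frame bytes; see `serialize_frame` in `src/network.rs`. The sketch below is a minimal TCP client that parses this framing. It assumes the `byteorder` crate on the receiving side; the address and the endless read loop are illustrative, not part of the library API.
+
+```rust
+use std::io::Read;
+use std::net::TcpStream;
+
+use byteorder::{LittleEndian, ReadBytesExt};
+
+fn main() -> std::io::Result<()> {
+    // Connect to the TcpStreamServer hosted by the capture process.
+    let mut stream = TcpStream::connect("127.0.0.1:8080")?;
+
+    loop {
+        // Read the frame header written by `serialize_frame`.
+        let len = stream.read_u32::<LittleEndian>()? as usize;
+        let timestamp = stream.read_i64::<LittleEndian>()?;
+        let frame_type = stream.read_u32::<LittleEndian>()?;
+        let width = stream.read_u32::<LittleEndian>()?;
+        let height = stream.read_u32::<LittleEndian>()?;
+
+        // Read exactly `len` bytes of frame data.
+        let mut data = vec![0u8; len];
+        stream.read_exact(&mut data)?;
+
+        println!("frame: {len} bytes, ts {timestamp}, type {frame_type}, {width}x{height}");
+    }
+}
+```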
+
+### Supported Protocols
+
+- **TCP**: Reliable transmission for local network streaming
+- **UDP**: Fast transmission with potential packet loss
+- **File**: Save encoded frames to files for debugging
+- **WebRTC**: Real-time communication (planned)
+- **RTMP**: Streaming protocol (planned)
+
+### Running the Streaming Example
+
+```bash
+# Serve the stream over TCP (clients connect to 127.0.0.1:8080)
+cargo run --example streaming tcp
+
+# Send the stream over UDP (to 127.0.0.1:8080)
+cargo run --example streaming udp
+
+# Save encoded frames to files
+cargo run --example streaming file
+```
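+
+To check that the TCP stream is flowing, any raw TCP client can dump it to disk. For example, assuming `nc` (netcat) is available:
+
+```bash
+# Connect to the capture process and dump the raw stream to a file
+nc 127.0.0.1 8080 > stream.bin
+```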
+
 ## Documentation
 
 Detailed documentation for each API and type can be found [here](https://docs.rs/windows-capture).
diff --git a/examples/streaming.rs b/examples/streaming.rs
new file mode 100644
index 0000000..a397c0d
--- /dev/null
+++ b/examples/streaming.rs
@@ -0,0 +1,205 @@
+use std::io::{self, Write};
+use std::sync::Arc;
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::time::{Duration, Instant};
+
+use windows_capture::capture::{Context, GraphicsCaptureApiHandler};
+use windows_capture::encoder::{
+    AudioSettingsBuilder, ContainerSettingsBuilder, FrameCallback, StreamingVideoEncoder,
+    VideoSettingsBuilder,
+};
+use windows_capture::frame::Frame;
+use windows_capture::graphics_capture_api::InternalCaptureControl;
+use windows_capture::monitor::Monitor;
+use windows_capture::network::{FileCallback, NetworkCallback, NetworkConfig, Protocol};
+use windows_capture::settings::{
+    ColorFormat, CursorCaptureSettings, DirtyRegionSettings, DrawBorderSettings,
+    MinimumUpdateIntervalSettings, SecondaryWindowSettings, Settings,
+};
+
+/// Handles the streaming capture events.
+struct StreamingCapture {
+    /// The streaming video encoder used to encode the frames.
+    encoder: Option<StreamingVideoEncoder>,
+    /// The timestamp of when the capture started, used to calculate the recording duration.
+    start: Instant,
+    /// A flag to signal the capture thread to stop.
+    stop_flag: Arc<AtomicBool>,
+    /// The number of frames captured since the last FPS calculation.
+    frame_count_since_reset: u64,
+    /// The timestamp of the last FPS calculation, used to measure the interval.
+    last_reset: Instant,
+}
+
+impl GraphicsCaptureApiHandler for StreamingCapture {
+    /// The type of flags used to pass settings to the `new` function.
+    type Flags = (Arc<AtomicBool>, String);
+
+    /// The error type that can be returned from the capture handlers.
+    type Error = Box<dyn std::error::Error + Send + Sync>;
+
+    /// Called by the library to create a new instance of the handler.
+    fn new(ctx: Context<Self::Flags>) -> Result<Self, Self::Error> {
+        println!("Streaming capture started. Press Ctrl+C to stop.");
+
+        let (stop_flag, protocol) = ctx.flags;
+
+        let monitor = Monitor::primary()?;
+        let width = monitor.width()?;
+        let height = monitor.height()?;
+
+        let video_settings = VideoSettingsBuilder::new(width, height);
+        let audio_settings = AudioSettingsBuilder::default().disabled(true);
+        let container_settings = ContainerSettingsBuilder::default();
+
+        // Create the frame callback based on the selected protocol.
+        let callback: Box<dyn FrameCallback> = match protocol.as_str() {
+            "tcp" => {
+                let config = NetworkConfig {
+                    protocol: Protocol::Tcp,
+                    address: "127.0.0.1:8080".to_string(),
+                    frame_rate: 30,
+                    ..Default::default()
+                };
+                Box::new(NetworkCallback::new(config)?)
+            }
+            "udp" => {
+                let config = NetworkConfig {
+                    protocol: Protocol::Udp,
+                    address: "127.0.0.1:8080".to_string(),
+                    frame_rate: 30,
+                    ..Default::default()
+                };
+                Box::new(NetworkCallback::new(config)?)
+            }
+            "file" => Box::new(FileCallback::new("streaming_output".to_string())),
+            _ => {
+                eprintln!("Unknown protocol: {}. Using file callback as fallback.", protocol);
+                Box::new(FileCallback::new("streaming_output".to_string()))
+            }
+        };
+
+        let encoder = StreamingVideoEncoder::new(
+            video_settings,
+            audio_settings,
+            container_settings,
+            callback,
+        )?;
+
+        Ok(Self {
+            encoder: Some(encoder),
+            start: Instant::now(),
+            stop_flag,
+            frame_count_since_reset: 0,
+            last_reset: Instant::now(),
+        })
+    }
+
+    /// Called for each new frame that is captured.
+    fn on_frame_arrived(
+        &mut self,
+        frame: &mut Frame,
+        capture_control: InternalCaptureControl,
+    ) -> Result<(), Self::Error> {
+        self.frame_count_since_reset += 1;
+
+        // Calculate the time elapsed since the last FPS reset.
+        let elapsed_since_reset = self.last_reset.elapsed();
+        // Calculate the current FPS.
+        let fps = self.frame_count_since_reset as f64 / elapsed_since_reset.as_secs_f64();
+
+        // Print the streaming duration and current FPS on a single status line.
+        print!(
+            "\rStreaming for: {:.2}s | FPS: {:.2}",
+            self.start.elapsed().as_secs_f64(),
+            fps
+        );
+        io::stdout().flush()?;
+
+        // Send the frame to the streaming video encoder.
+        self.encoder.as_mut().unwrap().send_frame(frame)?;
+
+        // Check if the stop flag has been set (e.g., by Ctrl+C).
+        if self.stop_flag.load(Ordering::SeqCst) {
+            println!("\nStopping streaming...");
+
+            // Finalize the encoding.
+            self.encoder.take().unwrap().finish()?;
+
+            // Signal the capture loop to stop.
+            capture_control.stop();
+
+            println!("Streaming stopped.");
+        }
+
+        // Reset the FPS counter every second.
+        if elapsed_since_reset >= Duration::from_secs(1) {
+            self.frame_count_since_reset = 0;
+            self.last_reset = Instant::now();
+        }
+
+        Ok(())
+    }
+
+    /// Optional handler for when the capture item (e.g., a window) is closed.
+    fn on_closed(&mut self) -> Result<(), Self::Error> {
+        println!("\nCapture item closed, stopping streaming.");
+
+        // Stop the capture gracefully.
+        self.stop_flag.store(true, Ordering::SeqCst);
+
+        Ok(())
+    }
+}
+
+fn main() {
+    // Parse command line arguments.
+    let args: Vec<String> = std::env::args().collect();
+    let protocol = if args.len() > 1 {
+        args[1].clone()
+    } else {
+        "file".to_string() // Default to file output for safety.
+    };
+
+    println!("Streaming protocol: {}", protocol);
+
+    // Gets the primary monitor.
+    let primary_monitor = Monitor::primary().expect("There is no primary monitor");
+    let monitor_name = primary_monitor.name().expect("Failed to get monitor name");
+    println!("Capturing monitor: {}", monitor_name);
+
+    // Create an atomic boolean flag to signal the capture to stop.
+    let stop_flag = Arc::new(AtomicBool::new(false));
+
+    // Set up a Ctrl+C handler to gracefully stop the capture.
+    {
+        let stop_flag = stop_flag.clone();
+        ctrlc::set_handler(move || {
+            stop_flag.store(true, Ordering::SeqCst);
+        })
+        .expect("Failed to set Ctrl-C handler");
+    }
+
+    let settings = Settings::new(
+        // The item to capture.
+        primary_monitor,
+        // The cursor capture settings.
+        CursorCaptureSettings::Default,
+        // The draw border settings.
+        DrawBorderSettings::Default,
+        // The secondary window settings.
+        SecondaryWindowSettings::Default,
+        // The minimum update interval.
+        MinimumUpdateIntervalSettings::Default,
+        // The dirty region settings.
+        DirtyRegionSettings::Default,
+        // The desired color format for the captured frame.
+        ColorFormat::Bgra8,
+        // The flags to pass to the `new` function of the handler.
+        (stop_flag, protocol),
+    );
+
+    // Starts the capture and takes control of the current thread.
+    // The errors from the handler trait will end up here.
+    StreamingCapture::start(settings).expect("Screen capture failed");
+}
\ No newline at end of file
diff --git a/src/encoder.rs b/src/encoder.rs
index edbb664..3b26c78 100644
--- a/src/encoder.rs
+++ b/src/encoder.rs
@@ -24,12 +24,68 @@ use windows::Storage::Streams::{
     Buffer, DataReader, IRandomAccessStream, InMemoryRandomAccessStream, InputStreamOptions,
 };
 use windows::Storage::{FileAccessMode, StorageFile};
-use windows::core::{HSTRING, Interface};
+use windows::core::{Array, HSTRING, Interface};
 
 use crate::d3d11::SendDirectX;
 use crate::frame::{Frame, ImageFormat};
 use crate::settings::ColorFormat;
 
+/// Represents an encoded video frame with metadata.
+#[derive(Debug, Clone)]
+pub struct EncodedFrame {
+    /// The encoded frame data.
+    pub data: Vec<u8>,
+    /// The timestamp of the frame in 100-nanosecond units.
+    pub timestamp: i64,
+    /// The frame type (keyframe, delta frame, etc.).
+    pub frame_type: FrameType,
+    /// The width of the original frame.
+    pub width: u32,
+    /// The height of the original frame.
+    pub height: u32,
+}
+
+/// Represents the type of an encoded frame.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum FrameType {
+    /// Key frame (I-frame): contains complete frame data.
+    KeyFrame,
+    /// Delta frame (P-frame): contains only changes from the previous frame.
+    DeltaFrame,
+    /// Bidirectional frame (B-frame): depends on both past and future frames.
+    BidirectionalFrame,
+}
+
+/// Represents an encoded audio frame with metadata.
+#[derive(Debug, Clone)]
+pub struct EncodedAudioFrame {
+    /// The encoded audio data.
+    pub data: Vec<u8>,
+    /// The timestamp of the audio frame in 100-nanosecond units.
+    pub timestamp: i64,
+    /// The number of audio samples in this frame.
+    pub sample_count: u32,
+}
+
+/// Callback trait for handling encoded frames in real-time.
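+///
+/// # Example
+///
+/// A minimal sketch of a custom callback; the `CountingCallback` struct and its
+/// counter are illustrative, not part of the library:
+///
+/// ```no_run
+/// use windows_capture::encoder::{EncodedAudioFrame, EncodedFrame, FrameCallback};
+///
+/// struct CountingCallback {
+///     frames: u64,
+/// }
+///
+/// impl FrameCallback for CountingCallback {
+///     fn on_video_frame(
+///         &mut self,
+///         frame: EncodedFrame,
+///     ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
+///         // Count each encoded video frame as it is delivered.
+///         self.frames += 1;
+///         println!("frame {}: {} bytes", self.frames, frame.data.len());
+///         Ok(())
+///     }
+///
+///     fn on_audio_frame(
+///         &mut self,
+///         _frame: EncodedAudioFrame,
+///     ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
+///         Ok(())
+///     }
+/// }
+/// ```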
+pub trait FrameCallback: Send + Sync {
+    /// Called when a new encoded video frame is available.
+    fn on_video_frame(
+        &mut self,
+        frame: EncodedFrame,
+    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>>;
+
+    /// Called when a new encoded audio frame is available.
+    fn on_audio_frame(
+        &mut self,
+        frame: EncodedAudioFrame,
+    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>>;
+
+    /// Called when the stream starts.
+    fn on_stream_start(&mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
+        Ok(())
+    }
+
+    /// Called when the stream ends.
+    fn on_stream_end(&mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
+        Ok(())
+    }
+}
+
 #[derive(thiserror::Error, Eq, PartialEq, Clone, Debug)]
 pub enum ImageEncoderError {
     #[error("This color format is not supported for saving as an image")]
@@ -1208,3 +1264,487 @@ impl Drop for VideoEncoder {
 
 #[allow(clippy::non_send_fields_in_send_ty)]
 unsafe impl Send for VideoEncoder {}
+
+/// The `StreamingVideoEncoder` struct is used to encode video frames and stream them in real-time
+/// without writing to files. It uses a callback mechanism to deliver encoded frames.
+pub struct StreamingVideoEncoder {
+    first_timestamp: Option<TimeSpan>,
+    frame_sender: mpsc::Sender<Option<(VideoEncoderSource, TimeSpan)>>,
+    audio_sender: mpsc::Sender<Option<(AudioEncoderSource, TimeSpan)>>,
+    sample_requested: i64,
+    media_stream_source: MediaStreamSource,
+    starting: i64,
+    transcode_thread: Option<JoinHandle<Result<(), VideoEncoderError>>>,
+    frame_notify: Arc<(Mutex<bool>, Condvar)>,
+    audio_notify: Arc<(Mutex<bool>, Condvar)>,
+    error_notify: Arc<AtomicBool>,
+    is_video_disabled: bool,
+    is_audio_disabled: bool,
+    callback: Arc<Mutex<Box<dyn FrameCallback>>>,
+    encoded_frame_sender: mpsc::Sender<EncodedFrame>,
+    encoded_audio_sender: mpsc::Sender<EncodedAudioFrame>,
+}
+
+impl StreamingVideoEncoder {
+    /// Creates a new `StreamingVideoEncoder` instance with the specified parameters.
+    ///
+    /// # Arguments
+    ///
+    /// * `video_settings` - The video encoder settings.
+    /// * `audio_settings` - The audio encoder settings.
+    /// * `container_settings` - The container settings.
+    /// * `callback` - The callback for handling encoded frames.
+    ///
+    /// # Returns
+    ///
+    /// Returns a `Result` containing the `StreamingVideoEncoder` instance if successful, or a
+    /// `VideoEncoderError` if an error occurs.
+    #[inline]
+    pub fn new(
+        video_settings: VideoSettingsBuilder,
+        audio_settings: AudioSettingsBuilder,
+        container_settings: ContainerSettingsBuilder,
+        callback: Box<dyn FrameCallback>,
+    ) -> Result<Self, VideoEncoderError> {
+        let media_encoding_profile = MediaEncodingProfile::new()?;
+
+        let (video_encoding_properties, is_video_disabled) = video_settings.build()?;
+        media_encoding_profile.SetVideo(&video_encoding_properties)?;
+        let (audio_encoding_properties, is_audio_disabled) = audio_settings.build()?;
+        media_encoding_profile.SetAudio(&audio_encoding_properties)?;
+        let container_encoding_properties = container_settings.build()?;
+        media_encoding_profile.SetContainer(&container_encoding_properties)?;
+
+        let video_encoding_properties = VideoEncodingProperties::CreateUncompressed(
+            &MediaEncodingSubtypes::Bgra8()?,
+            video_encoding_properties.Width()?,
+            video_encoding_properties.Height()?,
+        )?;
+        let video_stream_descriptor = VideoStreamDescriptor::Create(&video_encoding_properties)?;
+
+        let audio_encoding_properties = AudioEncodingProperties::CreateAac(
+            audio_encoding_properties.SampleRate()?,
+            audio_encoding_properties.ChannelCount()?,
+            audio_encoding_properties.Bitrate()?,
+        )?;
+        let audio_stream_descriptor = AudioStreamDescriptor::Create(&audio_encoding_properties)?;
+
+        let media_stream_source = MediaStreamSource::CreateFromDescriptors(
+            &video_stream_descriptor,
+            &audio_stream_descriptor,
+        )?;
+        media_stream_source.SetBufferTime(TimeSpan::default())?;
+
+        let starting = media_stream_source.Starting(&TypedEventHandler::<
+            MediaStreamSource,
+            MediaStreamSourceStartingEventArgs,
+        >::new(move |_, stream_start| {
+            let stream_start = stream_start
+                .as_ref()
+                .expect("MediaStreamSource Starting parameter was None. This should not happen.");
+
+            stream_start.Request()?.SetActualStartPosition(TimeSpan { Duration: 0 })?;
+            Ok(())
+        }))?;
+
+        let (frame_sender, frame_receiver) =
+            mpsc::channel::<Option<(VideoEncoderSource, TimeSpan)>>();
+
+        let (audio_sender, audio_receiver) =
+            mpsc::channel::<Option<(AudioEncoderSource, TimeSpan)>>();
+
+        let frame_notify = Arc::new((Mutex::new(false), Condvar::new()));
+        let audio_notify = Arc::new((Mutex::new(false), Condvar::new()));
+
+        let (encoded_frame_sender, encoded_frame_receiver) = mpsc::channel::<EncodedFrame>();
+        let (encoded_audio_sender, encoded_audio_receiver) = mpsc::channel::<EncodedAudioFrame>();
+
+        let callback = Arc::new(Mutex::new(callback));
+
+        let sample_requested = media_stream_source.SampleRequested(&TypedEventHandler::<
+            MediaStreamSource,
+            MediaStreamSourceSampleRequestedEventArgs,
+        >::new({
+            let frame_receiver = frame_receiver;
+            let frame_notify = frame_notify.clone();
+            let audio_receiver = audio_receiver;
+            let audio_notify = audio_notify.clone();
+            let encoded_frame_sender = encoded_frame_sender.clone();
+            let encoded_audio_sender = encoded_audio_sender.clone();
+
+            move |_, sample_requested| {
+                let sample_requested = sample_requested.as_ref().expect(
+                    "MediaStreamSource SampleRequested parameter was None. This should not happen.",
+                );
+
+                if sample_requested
+                    .Request()?
+                    .StreamDescriptor()?
+                    .cast::<AudioStreamDescriptor>()
+                    .is_ok()
+                {
+                    if is_audio_disabled {
+                        sample_requested.Request()?.SetSample(None)?;
+                        return Ok(());
+                    }
+
+                    let audio = match audio_receiver.recv() {
+                        Ok(audio) => audio,
+                        Err(e) => panic!("Failed to receive audio from the audio sender: {e}"),
+                    };
+
+                    match audio {
+                        Some((source, timestamp)) => {
+                            let sample = match source {
+                                AudioEncoderSource::Buffer(buffer_data) => {
+                                    let buffer = buffer_data.0;
+                                    let buffer =
+                                        unsafe { slice::from_raw_parts(buffer.0, buffer_data.1) };
+                                    let buffer = CryptographicBuffer::CreateFromByteArray(buffer)?;
+                                    MediaStreamSample::CreateFromBuffer(&buffer, timestamp)?
+                                }
+                            };
+
+                            // Copy the sample's buffer and forward it to the callback thread.
+                            let audio_data = sample.Buffer()?;
+                            let mut audio_array: Array<u8> = Array::new();
+                            CryptographicBuffer::CopyToByteArray(&audio_data, &mut audio_array)?;
+
+                            let encoded_audio = EncodedAudioFrame {
+                                data: audio_array.as_slice().to_vec(),
+                                timestamp: timestamp.Duration,
+                                sample_count: audio_encoding_properties.SampleRate()? / 1000, // Approximate
+                            };
+
+                            if let Err(e) = encoded_audio_sender.send(encoded_audio) {
+                                eprintln!("Failed to send encoded audio frame: {e}");
+                            }
+
+                            sample_requested.Request()?.SetSample(&sample)?;
+                        }
+                        None => {
+                            sample_requested.Request()?.SetSample(None)?;
+                        }
+                    }
+
+                    let (lock, cvar) = &*audio_notify;
+                    *lock.lock() = true;
+                    cvar.notify_one();
+                } else {
+                    if is_video_disabled {
+                        sample_requested.Request()?.SetSample(None)?;
+                        return Ok(());
+                    }
+
+                    let frame = match frame_receiver.recv() {
+                        Ok(frame) => frame,
+                        Err(e) => panic!("Failed to receive a frame from the frame sender: {e}"),
+                    };
+
+                    match frame {
+                        Some((source, timestamp)) => {
+                            let sample = match source {
+                                VideoEncoderSource::DirectX(surface) => {
+                                    MediaStreamSample::CreateFromDirect3D11Surface(
+                                        &surface.0, timestamp,
+                                    )?
+                                }
+                                VideoEncoderSource::Buffer(buffer_data) => {
+                                    let buffer = buffer_data.0;
+                                    let buffer =
+                                        unsafe { slice::from_raw_parts(buffer.0, buffer_data.1) };
+                                    let buffer = CryptographicBuffer::CreateFromByteArray(buffer)?;
+                                    MediaStreamSample::CreateFromBuffer(&buffer, timestamp)?
+                                }
+                            };
+
+                            // Copy the sample's buffer and forward it to the callback thread.
+                            let video_data = sample.Buffer()?;
+                            let mut video_array: Array<u8> = Array::new();
+                            CryptographicBuffer::CopyToByteArray(&video_data, &mut video_array)?;
+
+                            let encoded_frame = EncodedFrame {
+                                data: video_array.as_slice().to_vec(),
+                                timestamp: timestamp.Duration,
+                                frame_type: FrameType::DeltaFrame, // Default; could be determined from sample properties
+                                width: video_encoding_properties.Width()?,
+                                height: video_encoding_properties.Height()?,
+                            };
+
+                            if let Err(e) = encoded_frame_sender.send(encoded_frame) {
+                                eprintln!("Failed to send encoded video frame: {e}");
+                            }
+
+                            sample_requested.Request()?.SetSample(&sample)?;
+                        }
+                        None => {
+                            sample_requested.Request()?.SetSample(None)?;
+                        }
+                    }
+
+                    let (lock, cvar) = &*frame_notify;
+                    *lock.lock() = true;
+                    cvar.notify_one();
+                }
+
+                Ok(())
+            }
+        }))?;
+
+        let media_transcoder = MediaTranscoder::new()?;
+        media_transcoder.SetHardwareAccelerationEnabled(true)?;
+
+        // Create an in-memory stream as the transcode sink; frames are delivered
+        // through the callback instead of being read back from this stream.
+        let stream = InMemoryRandomAccessStream::new()?;
+
+        let transcode = media_transcoder
+            .PrepareMediaStreamSourceTranscodeAsync(
+                &media_stream_source,
+                &stream,
+                &media_encoding_profile,
+            )?
+            .get()?;
+
+        let error_notify = Arc::new(AtomicBool::new(false));
+        let transcode_thread = thread::spawn({
+            let error_notify = error_notify.clone();
+            let callback = callback.clone();
+
+            move || -> Result<(), VideoEncoderError> {
+                // Notify the callback that the stream is starting.
+                if let Err(e) = callback.lock().on_stream_start() {
+                    eprintln!("Failed to notify stream start: {e}");
+                }
+
+                let result = transcode.TranscodeAsync();
+
+                if result.is_err() {
+                    error_notify.store(true, atomic::Ordering::Relaxed);
+                }
+
+                result?.get()?;
+
+                // Notify the callback that the stream is ending.
+                if let Err(e) = callback.lock().on_stream_end() {
+                    eprintln!("Failed to notify stream end: {e}");
+                }
+
+                drop(media_transcoder);
+
+                Ok(())
+            }
+        });
+
+        // Start the callback processing threads; they drain the encoded frame channels
+        // and deliver the frames to the user callback.
+        thread::spawn({
+            let callback = callback.clone();
+            move || {
+                while let Ok(encoded_frame) = encoded_frame_receiver.recv() {
+                    if let Err(e) = callback.lock().on_video_frame(encoded_frame) {
+                        eprintln!("Failed to process video frame: {e}");
+                    }
+                }
+            }
+        });
+
+        thread::spawn({
+            let callback = callback.clone();
+            move || {
+                while let Ok(encoded_audio) = encoded_audio_receiver.recv() {
+                    if let Err(e) = callback.lock().on_audio_frame(encoded_audio) {
+                        eprintln!("Failed to process audio frame: {e}");
+                    }
+                }
+            }
+        });
+
+        Ok(Self {
+            first_timestamp: None,
+            frame_sender,
+            audio_sender,
+            sample_requested,
+            media_stream_source,
+            starting,
+            transcode_thread: Some(transcode_thread),
+            frame_notify,
+            audio_notify,
+            error_notify,
+            is_video_disabled,
+            is_audio_disabled,
+            callback,
+            encoded_frame_sender,
+            encoded_audio_sender,
+        })
+    }
+
+    /// Sends a video frame to the streaming video encoder for encoding.
+    ///
+    /// # Arguments
+    ///
+    /// * `frame` - A mutable reference to the `Frame` to be encoded.
+    ///
+    /// # Returns
+    ///
+    /// Returns `Ok(())` if the frame is successfully sent for encoding, or a `VideoEncoderError`
+    /// if an error occurs.
+    #[inline]
+    pub fn send_frame(&mut self, frame: &mut Frame) -> Result<(), VideoEncoderError> {
+        if self.is_video_disabled {
+            return Err(VideoEncoderError::VideoDisabled);
+        }
+
+        let timestamp = match self.first_timestamp {
+            Some(timestamp) => {
+                TimeSpan { Duration: frame.timestamp().Duration - timestamp.Duration }
+            }
+            None => {
+                let timestamp = frame.timestamp();
+                self.first_timestamp = Some(timestamp);
+                TimeSpan { Duration: 0 }
+            }
+        };
+
+        self.frame_sender.send(Some((
+            VideoEncoderSource::DirectX(SendDirectX::new(unsafe {
+                frame.as_raw_surface().clone()
+            })),
+            timestamp,
+        )))?;
+
+        let (lock, cvar) = &*self.frame_notify;
+        let mut processed = lock.lock();
+        if !*processed {
+            cvar.wait(&mut processed);
+        }
+        *processed = false;
+        drop(processed);
+
+        if self.error_notify.load(atomic::Ordering::Relaxed) {
+            if let Some(transcode_thread) = self.transcode_thread.take() {
+                transcode_thread.join().expect("Failed to join transcode thread")?;
+            }
+        }
+
+        Ok(())
+    }
+
+    /// Sends a video frame with audio to the streaming video encoder for encoding.
+    ///
+    /// # Arguments
+    ///
+    /// * `frame` - A mutable reference to the `Frame` to be encoded.
+    /// * `audio_buffer` - A reference to the audio byte slice to be encoded.
+    ///
+    /// # Returns
+    ///
+    /// Returns `Ok(())` if the frame is successfully sent for encoding, or a `VideoEncoderError`
+    /// if an error occurs.
+    #[inline]
+    pub fn send_frame_with_audio(
+        &mut self,
+        frame: &mut Frame,
+        audio_buffer: &[u8],
+    ) -> Result<(), VideoEncoderError> {
+        if self.is_video_disabled {
+            return Err(VideoEncoderError::VideoDisabled);
+        }
+
+        if self.is_audio_disabled {
+            return Err(VideoEncoderError::AudioDisabled);
+        }
+
+        let timestamp = match self.first_timestamp {
+            Some(timestamp) => {
+                TimeSpan { Duration: frame.timestamp().Duration - timestamp.Duration }
+            }
+            None => {
+                let timestamp = frame.timestamp();
+                self.first_timestamp = Some(timestamp);
+                TimeSpan { Duration: 0 }
+            }
+        };
+
+        self.frame_sender.send(Some((
+            VideoEncoderSource::DirectX(SendDirectX::new(unsafe {
+                frame.as_raw_surface().clone()
+            })),
+            timestamp,
+        )))?;
+
+        let (lock, cvar) = &*self.frame_notify;
+        let mut processed = lock.lock();
+        if !*processed {
+            cvar.wait(&mut processed);
+        }
+        *processed = false;
+        drop(processed);
+
+        if self.error_notify.load(atomic::Ordering::Relaxed) {
+            if let Some(transcode_thread) = self.transcode_thread.take() {
+                transcode_thread.join().expect("Failed to join transcode thread")?;
+            }
+        }
+
+        self.audio_sender.send(Some((
+            AudioEncoderSource::Buffer((
+                SendDirectX::new(audio_buffer.as_ptr()),
+                audio_buffer.len(),
+            )),
+            timestamp,
+        )))?;
+
+        let (lock, cvar) = &*self.audio_notify;
+        let mut processed = lock.lock();
+        if !*processed {
+            cvar.wait(&mut processed);
+        }
+        *processed = false;
+        drop(processed);
+
+        if self.error_notify.load(atomic::Ordering::Relaxed) {
+            if let Some(transcode_thread) = self.transcode_thread.take() {
+                transcode_thread.join().expect("Failed to join transcode thread")?;
+            }
+        }
+
+        Ok(())
+    }
+
+    /// Finishes encoding the video and performs any necessary cleanup.
+    ///
+    /// # Returns
+    ///
+    /// Returns `Ok(())` if the encoding is successfully finished, or a `VideoEncoderError` if an
+    /// error occurs.
+    #[inline]
+    pub fn finish(mut self) -> Result<(), VideoEncoderError> {
+        self.frame_sender.send(None)?;
+        self.audio_sender.send(None)?;
+
+        if let Some(transcode_thread) = self.transcode_thread.take() {
+            transcode_thread.join().expect("Failed to join transcode thread")?;
+        }
+
+        self.media_stream_source.RemoveStarting(self.starting)?;
+        self.media_stream_source.RemoveSampleRequested(self.sample_requested)?;
+
+        Ok(())
+    }
+}
+
+impl Drop for StreamingVideoEncoder {
+    #[inline]
+    fn drop(&mut self) {
+        let _ = self.frame_sender.send(None);
+        let _ = self.audio_sender.send(None);
+
+        if let Some(transcode_thread) = self.transcode_thread.take() {
+            let _ = transcode_thread.join();
+        }
+    }
+}
+
+#[allow(clippy::non_send_fields_in_send_ty)]
+unsafe impl Send for StreamingVideoEncoder {}
\ No newline at end of file
diff --git a/src/lib.rs b/src/lib.rs
index f793fbc..bd60e3e 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -167,3 +167,6 @@ pub mod monitor;
 pub mod settings;
 /// Contains the functionality for working with windows and capturing specific windows.
 pub mod window;
+
+/// Contains network transmission utilities for streaming encoded frames.
+pub mod network;
diff --git a/src/network.rs b/src/network.rs
new file mode 100644
index 0000000..c8bd600
--- /dev/null
+++ b/src/network.rs
@@ -0,0 +1,385 @@
+use std::collections::VecDeque;
+use std::io::Write;
+use std::net::{TcpListener, TcpStream, UdpSocket};
+use std::sync::{Arc, Mutex};
+use std::thread;
+use std::time::{Duration, Instant};
+
+use crate::encoder::{EncodedAudioFrame, EncodedFrame, FrameCallback};
+
+/// Network transmission protocols.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum Protocol {
+    /// TCP protocol for reliable transmission.
+    Tcp,
+    /// UDP protocol for fast transmission with potential packet loss.
+    Udp,
+    /// WebRTC protocol for real-time communication (not yet implemented).
+    WebRtc,
+    /// RTMP protocol for streaming (not yet implemented).
+    Rtmp,
+}
+
+/// Configuration for network transmission.
+#[derive(Debug, Clone)]
+pub struct NetworkConfig {
+    /// The protocol to use for transmission.
+    pub protocol: Protocol,
+    /// The target address (e.g., "127.0.0.1:8080").
+    pub address: String,
+    /// Maximum frame buffer size.
+    pub max_buffer_size: usize,
+    /// Frame rate for transmission.
+    pub frame_rate: u32,
+    /// Quality settings.
+    pub quality: Quality,
+}
+
+/// Quality settings for network transmission.
+#[derive(Debug, Clone)]
+pub struct Quality {
+    /// Video bitrate in bits per second.
+    pub video_bitrate: u32,
+    /// Audio bitrate in bits per second.
+    pub audio_bitrate: u32,
+    /// Maximum frame size in bytes.
+    pub max_frame_size: usize,
+}
+
+impl Default for Quality {
+    fn default() -> Self {
+        Self {
+            video_bitrate: 2_000_000,    // 2 Mbps
+            audio_bitrate: 128_000,      // 128 kbps
+            max_frame_size: 1024 * 1024, // 1 MiB
+        }
+    }
+}
+
+impl Default for NetworkConfig {
+    fn default() -> Self {
+        Self {
+            protocol: Protocol::Tcp,
+            address: "127.0.0.1:8080".to_string(),
+            max_buffer_size: 100,
+            frame_rate: 30,
+            quality: Quality::default(),
+        }
+    }
+}
+
+/// A simple TCP server for streaming encoded frames.
+pub struct TcpStreamServer {
+    listener: TcpListener,
+    clients: Arc<Mutex<Vec<TcpStream>>>,
+    frame_buffer: Arc<Mutex<VecDeque<EncodedFrame>>>,
+    audio_buffer: Arc<Mutex<VecDeque<EncodedAudioFrame>>>,
+    config: NetworkConfig,
+}
+
+impl TcpStreamServer {
+    /// Creates a new TCP streaming server.
+    ///
+    /// # Arguments
+    ///
+    /// * `config` - The network configuration.
+    ///
+    /// # Returns
+    ///
+    /// Returns a `Result` containing the server instance if successful.
+    pub fn new(config: NetworkConfig) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
+        let listener = TcpListener::bind(&config.address)?;
+        println!("TCP streaming server started on {}", config.address);
+
+        Ok(Self {
+            listener,
+            clients: Arc::new(Mutex::new(Vec::new())),
+            frame_buffer: Arc::new(Mutex::new(VecDeque::new())),
+            audio_buffer: Arc::new(Mutex::new(VecDeque::new())),
+            config,
+        })
+    }
+
+    /// Starts the server and begins accepting connections.
+    pub fn start(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
+        let clients = self.clients.clone();
+        let listener = self.listener.try_clone()?;
+
+        // Accept connections on a separate thread.
+        thread::spawn(move || {
+            for stream in listener.incoming() {
+                match stream {
+                    Ok(stream) => {
+                        println!("New client connected");
+                        if let Ok(mut clients) = clients.lock() {
+                            clients.push(stream);
+                        }
+                    }
+                    Err(e) => eprintln!("Failed to accept connection: {e}"),
+                }
+            }
+        });
+
+        Ok(())
+    }
+
+    /// Broadcasts a frame to all connected clients.
+    pub fn broadcast_frame(
+        &self,
+        frame: EncodedFrame,
+    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
+        let mut clients = self.clients.lock().unwrap();
+        let frame_data = self.serialize_frame(&frame)?;
+
+        // Drop clients whose connection has failed.
+        clients.retain(|mut client| client.write_all(&frame_data).is_ok());
+
+        Ok(())
+    }
+
+    /// Serializes a frame for network transmission.
+    ///
+    /// The wire format is a little-endian header (data length as `u32`, timestamp as `i64`,
+    /// frame type as `u32`, width as `u32`, height as `u32`) followed by the frame data.
+    fn serialize_frame(
+        &self,
+        frame: &EncodedFrame,
+    ) -> Result<Vec<u8>, Box<dyn std::error::Error + Send + Sync>> {
+        use std::io::Cursor;
+
+        use byteorder::{LittleEndian, WriteBytesExt};
+
+        let mut buffer = Vec::new();
+        let mut cursor = Cursor::new(&mut buffer);
+
+        // Write the frame header.
+        cursor.write_u32::<LittleEndian>(frame.data.len() as u32)?;
+        cursor.write_i64::<LittleEndian>(frame.timestamp)?;
+        cursor.write_u32::<LittleEndian>(frame.frame_type as u32)?;
+        cursor.write_u32::<LittleEndian>(frame.width)?;
+        cursor.write_u32::<LittleEndian>(frame.height)?;
+
+        // Write the frame data.
+        cursor.write_all(&frame.data)?;
+
+        Ok(buffer)
+    }
+}
+
+/// A UDP streaming client for sending encoded frames.
+pub struct UdpStreamClient {
+    socket: UdpSocket,
+    config: NetworkConfig,
+    frame_count: u64,
+    last_frame_time: Instant,
+}
+
+impl UdpStreamClient {
+    /// Creates a new UDP streaming client.
+    ///
+    /// # Arguments
+    ///
+    /// * `config` - The network configuration.
+    ///
+    /// # Returns
+    ///
+    /// Returns a `Result` containing the client instance if successful.
+    pub fn new(config: NetworkConfig) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
+        let socket = UdpSocket::bind("0.0.0.0:0")?;
+        socket.connect(&config.address)?;
+        println!("UDP streaming client connected to {}", config.address);
+
+        Ok(Self {
+            socket,
+            config,
+            frame_count: 0,
+            last_frame_time: Instant::now(),
+        })
+    }
+
+    /// Sends a frame over UDP, dropping frames that arrive faster than the configured
+    /// frame rate or exceed the maximum frame size.
+    pub fn send_frame(
+        &mut self,
+        frame: EncodedFrame,
+    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
+        // Rate limiting: skip the frame if it arrives before the next send slot.
+        let now = Instant::now();
+        let frame_interval = Duration::from_secs_f64(1.0 / self.config.frame_rate as f64);
+
+        if now.duration_since(self.last_frame_time) < frame_interval {
+            return Ok(());
+        }
+
+        let frame_data = self.serialize_frame(&frame)?;
+
+        if frame_data.len() > self.config.quality.max_frame_size {
+            eprintln!("Frame too large: {} bytes", frame_data.len());
+            return Ok(());
+        }
+
+        self.socket.send(&frame_data)?;
+        self.frame_count += 1;
+        self.last_frame_time = now;
+
+        Ok(())
+    }
+
+    /// Serializes a frame for UDP transmission using the same header layout as the TCP server.
+    fn serialize_frame(
+        &self,
+        frame: &EncodedFrame,
+    ) -> Result<Vec<u8>, Box<dyn std::error::Error + Send + Sync>> {
+        use std::io::Cursor;
+
+        use byteorder::{LittleEndian, WriteBytesExt};
+
+        let mut buffer = Vec::new();
+        let mut cursor = Cursor::new(&mut buffer);
+
+        // Write the frame header.
+        cursor.write_u32::<LittleEndian>(frame.data.len() as u32)?;
+        cursor.write_i64::<LittleEndian>(frame.timestamp)?;
+        cursor.write_u32::<LittleEndian>(frame.frame_type as u32)?;
+        cursor.write_u32::<LittleEndian>(frame.width)?;
+        cursor.write_u32::<LittleEndian>(frame.height)?;
+
+        // Write the frame data.
+        cursor.write_all(&frame.data)?;
+
+        Ok(buffer)
+    }
+}
+
+/// A network callback that implements `FrameCallback` for streaming.
+pub struct NetworkCallback {
+    tcp_server: Option<TcpStreamServer>,
+    udp_client: Option<UdpStreamClient>,
+    config: NetworkConfig,
+}
+
+impl NetworkCallback {
+    /// Creates a new network callback.
+    ///
+    /// # Arguments
+    ///
+    /// * `config` - The network configuration.
+    ///
+    /// # Returns
+    ///
+    /// Returns a `Result` containing the callback instance if successful.
+    pub fn new(config: NetworkConfig) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
+        let tcp_server = if config.protocol == Protocol::Tcp {
+            Some(TcpStreamServer::new(config.clone())?)
+        } else {
+            None
+        };
+
+        let udp_client = if config.protocol == Protocol::Udp {
+            Some(UdpStreamClient::new(config.clone())?)
+        } else {
+            None
+        };
+
+        Ok(Self {
+            tcp_server,
+            udp_client,
+            config,
+        })
+    }
+
+    /// Starts the network services.
+    pub fn start(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
+        if let Some(ref server) = self.tcp_server {
+            server.start()?;
+        }
+        Ok(())
+    }
+}
+
+impl FrameCallback for NetworkCallback {
+    fn on_video_frame(
+        &mut self,
+        frame: EncodedFrame,
+    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
+        match self.config.protocol {
+            Protocol::Tcp => {
+                if let Some(ref server) = self.tcp_server {
+                    server.broadcast_frame(frame)?;
+                }
+            }
+            Protocol::Udp => {
+                if let Some(ref mut client) = self.udp_client {
+                    client.send_frame(frame)?;
+                }
+            }
+            Protocol::WebRtc => {
+                // A WebRTC implementation would go here.
+                eprintln!("WebRTC protocol not yet implemented");
+            }
+            Protocol::Rtmp => {
+                // An RTMP implementation would go here.
+                eprintln!("RTMP protocol not yet implemented");
+            }
+        }
+        Ok(())
+    }
+
+    fn on_audio_frame(
+        &mut self,
+        frame: EncodedAudioFrame,
+    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
+        // Audio transmission is not implemented yet; log the frame for now.
+        println!(
+            "Received audio frame: {} bytes, timestamp: {}",
+            frame.data.len(),
+            frame.timestamp
+        );
+        Ok(())
+    }
+
+    fn on_stream_start(&mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
+        println!("Network stream started");
+        self.start()?;
+        Ok(())
+    }
+
+    fn on_stream_end(&mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
+        println!("Network stream ended");
+        Ok(())
+    }
+}
+
+/// A simple file callback for debugging and testing.
+pub struct FileCallback {
+    output_dir: String,
+    frame_count: u64,
+}
+
+impl FileCallback {
+    /// Creates a new file callback for saving encoded frames.
+    ///
+    /// # Arguments
+    ///
+    /// * `output_dir` - The directory to save frames to.
+    ///
+    /// # Returns
+    ///
+    /// Returns a new `FileCallback` instance.
+    pub fn new(output_dir: String) -> Self {
+        if let Err(e) = std::fs::create_dir_all(&output_dir) {
+            eprintln!("Failed to create output directory {output_dir}: {e}");
+        }
+
+        Self {
+            output_dir,
+            frame_count: 0,
+        }
+    }
+}
+
+impl FrameCallback for FileCallback {
+    fn on_video_frame(
+        &mut self,
+        frame: EncodedFrame,
+    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
+        let filename = format!("{}/frame_{:06}.h264", self.output_dir, self.frame_count);
+        std::fs::write(&filename, &frame.data)?;
+        self.frame_count += 1;
+        println!("Saved frame {} to {}", self.frame_count, filename);
+        Ok(())
+    }
+
+    fn on_audio_frame(
+        &mut self,
+        frame: EncodedAudioFrame,
+    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
+        let filename = format!("{}/audio_{:06}.aac", self.output_dir, self.frame_count);
+        std::fs::write(&filename, &frame.data)?;
+        println!("Saved audio frame to {}", filename);
+        Ok(())
+    }
+
+    fn on_stream_start(&mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
+        println!("File stream started, saving to: {}", self.output_dir);
+        Ok(())
+    }
+
+    fn on_stream_end(&mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
+        println!("File stream ended, saved {} frames", self.frame_count);
+        Ok(())
+    }
+}
\ No newline at end of file
diff --git a/test_implementation b/test_implementation
new file mode 100755
index 0000000..28e6261
Binary files /dev/null and b/test_implementation differ
diff --git a/test_windows_integration b/test_windows_integration
new file mode 100755
index 0000000..16681b5
Binary files /dev/null and b/test_windows_integration differ