diff --git a/examples/std/src/bin/direct-async-calls/io_expander.rs b/examples/std/src/bin/direct-async-calls/io_expander.rs new file mode 100644 index 00000000..2abee43e --- /dev/null +++ b/examples/std/src/bin/direct-async-calls/io_expander.rs @@ -0,0 +1,104 @@ +use std::{future::Future, pin::pin}; + +use embassy_futures::select::select_slice; +use embassy_sync::{ + blocking_mutex::raw::NoopRawMutex, + mutex::Mutex, + pubsub::{DynImmediatePublisher, DynSubscriber, WaitResult}, +}; +use log::{info, warn}; + +pub trait EventReceiver { + fn wait_next(&mut self) -> impl Future; +} + +pub trait EventSender { + fn on_interrupt(&self, pin: u8, level: bool); +} + +/// IO expander device trait +pub trait Device { + fn name(&self) -> &str; + + fn set_level(&mut self, pin: u8, value: bool) -> impl Future; +} + +pub struct Sender<'channel> { + publisher: DynImmediatePublisher<'channel, InterruptEvent>, +} + +impl<'channel> Sender<'channel> { + pub fn new(publisher: DynImmediatePublisher<'channel, InterruptEvent>) -> Self { + Self { publisher } + } +} + +pub struct Receiver<'channel> { + subscriber: DynSubscriber<'channel, InterruptEvent>, +} + +impl<'channel> Receiver<'channel> { + pub fn new(subscriber: DynSubscriber<'channel, InterruptEvent>) -> Self { + Self { subscriber } + } +} + +impl EventReceiver for Receiver<'_> { + async fn wait_next(&mut self) -> InterruptEvent { + loop { + match self.subscriber.next_message().await { + WaitResult::Message(msg) => return msg, + WaitResult::Lagged(n) => { + warn!("Receiver lagged by {n} messages"); + } + } + } + } +} + +impl EventSender for Sender<'_> { + fn on_interrupt(&self, pin: u8, level: bool) { + self.publisher.publish_immediate(InterruptEvent { pin, level }); + } +} + +#[derive(Copy, Clone)] +pub struct InterruptEvent { + pub pin: u8, + pub level: bool, +} + +const MAX_SUPPORTED_DEVICES: usize = 4; + +pub struct ServiceImplementation<'storage, 'device, D: Device, R: EventReceiver> { + devices: &'storage mut [(R, &'device Mutex)], +} + +impl<'storage, 'device, D: Device, R: EventReceiver> ServiceImplementation<'storage, 'device, D, R> { + pub fn new(devices: &'storage mut [(R, &'device Mutex)]) -> Self { + Self { devices } + } + + pub async fn wait_next(&mut self) -> (&'device Mutex, InterruptEvent) { + let futures = + heapless::Vec::<_, MAX_SUPPORTED_DEVICES>::from_iter(self.devices.iter_mut().map(|(r, _)| r.wait_next())); + + let (event, index) = select_slice(pin!(futures)).await; + (self.devices[index].1, event) + } + + pub async fn process_event(&mut self, event: (&'device Mutex, InterruptEvent)) { + let mut device = event.0.lock().await; + info!( + "Interrupt from device {}: pin {}, level {}", + device.name(), + event.1.pin, + event.1.level + ); + if event.1.pin == 0 { + info!("Asserting INT_OUT pin"); + } + + device.set_level(1, event.1.level).await; + } +} diff --git a/examples/std/src/bin/direct-async-calls/main.rs b/examples/std/src/bin/direct-async-calls/main.rs new file mode 100644 index 00000000..bf91f561 --- /dev/null +++ b/examples/std/src/bin/direct-async-calls/main.rs @@ -0,0 +1,124 @@ +use embassy_executor::Executor; +use embassy_futures::join::join3; +use embassy_sync::{blocking_mutex::raw::NoopRawMutex, mutex::Mutex, pubsub::PubSubChannel}; +use embassy_time::Timer; +use static_cell::StaticCell; + +use crate::{io_expander::EventSender as _, power::EventSender as _}; + +mod io_expander; +mod power; + +pub struct Device { + name: &'static str, + io_sender: I, + power_sender: P, +} + +pub struct DeviceContainer { + inner: Mutex>, +} + +impl DeviceContainer { + pub fn new(name: &'static str, io_sender: I, power_sender: P) -> Self { + Self { + inner: Mutex::new(Device { + name, + io_sender, + power_sender, + }), + } + } +} + +impl power::Device for Device { + fn name(&self) -> &str { + self.name + } + + async fn accept_contract(&mut self) { + log::info!("{}: Contract accepted", self.name); + } + + async fn disconnect(&mut self) { + log::info!("{}: Device disconnected", self.name); + } +} + +impl io_expander::Device for Device { + fn name(&self) -> &str { + self.name + } + + async fn set_level(&mut self, pin: u8, value: bool) { + log::info!("{}: Set pin {} to level {}", self.name, pin, value); + } +} + +#[embassy_executor::task] +async fn run() { + let power_channel0: PubSubChannel = PubSubChannel::new(); + let power_receiver0 = power::Receiver::new(power_channel0.dyn_subscriber().unwrap()); + let power_sender0 = power::Sender::new(power_channel0.dyn_immediate_publisher()); + + let io_channel0: PubSubChannel = PubSubChannel::new(); + let io_receiver0 = io_expander::Receiver::new(io_channel0.dyn_subscriber().unwrap()); + let io_sender0 = io_expander::Sender::new(io_channel0.dyn_immediate_publisher()); + let device0 = DeviceContainer::new("Device0", io_sender0, power_sender0); + + let power_channel1: PubSubChannel = PubSubChannel::new(); + let power_receiver1 = power::Receiver::new(power_channel1.dyn_subscriber().unwrap()); + let power_sender1 = power::Sender::new(power_channel1.dyn_immediate_publisher()); + + let io_channel1: PubSubChannel = PubSubChannel::new(); + let io_receiver1 = io_expander::Receiver::new(io_channel1.dyn_subscriber().unwrap()); + let io_sender1 = io_expander::Sender::new(io_channel1.dyn_immediate_publisher()); + let device1 = DeviceContainer::new("Device1", io_sender1, power_sender1); + + let mut power_devices = [(power_receiver0, &device0.inner), (power_receiver1, &device1.inner)]; + let mut power_service = power::ServiceImplementation::new(&mut power_devices); + + let mut io_devices = [(io_receiver0, &device0.inner), (io_receiver1, &device1.inner)]; + let mut io_service = io_expander::ServiceImplementation::new(&mut io_devices); + + join3( + async { + loop { + let event = io_service.wait_next().await; + io_service.process_event(event).await; + } + }, + async { + loop { + let event = power_service.wait_next().await; + power_service.process_event(event).await; + } + }, + async { + device0.inner.lock().await.power_sender.on_plug(1000); + device0.inner.lock().await.io_sender.on_interrupt(0, true); + Timer::after_millis(500).await; + + device1.inner.lock().await.power_sender.on_plug(2000); + Timer::after_millis(500).await; + + device0.inner.lock().await.power_sender.on_unplug(); + Timer::after_millis(500).await; + + device1.inner.lock().await.io_sender.on_interrupt(0, true); + device1.inner.lock().await.power_sender.on_unplug(); + Timer::after_millis(500).await; + }, + ) + .await; +} + +pub fn main() { + env_logger::builder().filter_level(log::LevelFilter::Trace).init(); + + static EXECUTOR: StaticCell = StaticCell::new(); + let executor = EXECUTOR.init(Executor::new()); + executor.run(|spawner| { + spawner.must_spawn(run()); + }); +} diff --git a/examples/std/src/bin/direct-async-calls/power.rs b/examples/std/src/bin/direct-async-calls/power.rs new file mode 100644 index 00000000..10a1404c --- /dev/null +++ b/examples/std/src/bin/direct-async-calls/power.rs @@ -0,0 +1,152 @@ +use std::{future::Future, pin::pin}; + +use embassy_futures::select::select_slice; +use embassy_sync::{ + blocking_mutex::raw::NoopRawMutex, + mutex::Mutex, + pubsub::{DynImmediatePublisher, DynSubscriber, WaitResult}, +}; + +use log::{info, warn}; + +/// Receive events from a [`Device`] +pub trait EventReceiver { + fn wait_next(&mut self) -> impl Future; +} + +pub trait EventSender { + fn on_plug(&self, power_mw: i32); + fn on_unplug(&self); +} + +/// Power device trait +pub trait Device { + fn name(&self) -> &str; + + fn accept_contract(&mut self) -> impl Future; + fn disconnect(&mut self) -> impl Future; +} + +#[derive(Copy, Clone, Debug)] +pub struct NewContract { + pub power_mw: i32, +} + +#[derive(Copy, Clone, Debug)] +pub enum Event { + Plug(NewContract), + Unplug, +} + +struct CurrentContract<'device, D: Device> { + power_mw: i32, + connected_device: &'device Mutex, +} + +pub struct Sender<'channel> { + publisher: DynImmediatePublisher<'channel, Event>, +} + +impl<'channel> Sender<'channel> { + pub fn new(publisher: DynImmediatePublisher<'channel, Event>) -> Self { + Self { publisher } + } +} + +pub struct Receiver<'channel> { + subscriber: DynSubscriber<'channel, Event>, +} + +impl<'channel> Receiver<'channel> { + pub fn new(subscriber: DynSubscriber<'channel, Event>) -> Self { + Self { subscriber } + } +} + +impl EventReceiver for Receiver<'_> { + async fn wait_next(&mut self) -> Event { + loop { + match self.subscriber.next_message().await { + WaitResult::Message(msg) => return msg, + WaitResult::Lagged(n) => { + warn!("Receiver lagged by {n} messages"); + } + } + } + } +} + +impl EventSender for Sender<'_> { + fn on_plug(&self, power_mw: i32) { + self.publisher.publish_immediate(Event::Plug(NewContract { power_mw })); + } + + fn on_unplug(&self) { + self.publisher.publish_immediate(Event::Unplug); + } +} + +pub struct ServiceImplementation<'storage, 'device, D: Device, R: EventReceiver> { + current_connection: Option>, + devices: &'storage mut [(R, &'device Mutex)], +} + +const MAX_SUPPORTED_DEVICES: usize = 4; + +impl<'storage, 'device, D: Device, R: EventReceiver> ServiceImplementation<'storage, 'device, D, R> { + pub fn new(devices: &'storage mut [(R, &'device Mutex)]) -> Self { + Self { + devices, + current_connection: None, + } + } + + pub async fn wait_next(&mut self) -> (&'device Mutex, Event) { + let futures = + heapless::Vec::<_, MAX_SUPPORTED_DEVICES>::from_iter(self.devices.iter_mut().map(|(r, _)| r.wait_next())); + + let (event, index) = select_slice(pin!(futures)).await; + (self.devices[index].1, event) + } + + pub async fn process_event(&mut self, event: (&'device Mutex, Event)) { + let mut event_device = event.0.lock().await; + match event.1 { + Event::Plug(data) => { + info!("{} connected with contract: {:?}", event_device.name(), data.power_mw); + if let Some(current) = &self.current_connection { + if data.power_mw > current.power_mw { + info!("New contract has higher power, switching"); + current.connected_device.lock().await.disconnect().await; + + self.current_connection = Some(CurrentContract { + power_mw: data.power_mw, + connected_device: event.0, + }); + event_device.accept_contract().await; + } else { + info!("New contract has lower or equal power, not switching"); + } + } else { + info!("No current contract, accepting new one"); + self.current_connection = Some(CurrentContract { + power_mw: data.power_mw, + connected_device: event.0, + }); + event_device.accept_contract().await; + } + } + Event::Unplug => { + info!("{} disconnected", event_device.name()); + if let Some(current) = &self.current_connection { + if std::ptr::eq(current.connected_device, event.0) { + info!("Current device disconnected"); + self.current_connection = None; + } else { + info!("A non-connected device unplugged, nothing to do"); + } + } + } + } + } +}