Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 104 additions & 0 deletions examples/std/src/bin/direct-async-calls/io_expander.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
use std::{future::Future, pin::pin};

use embassy_futures::select::select_slice;
use embassy_sync::{
blocking_mutex::raw::NoopRawMutex,
mutex::Mutex,
pubsub::{DynImmediatePublisher, DynSubscriber, WaitResult},
};
use log::{info, warn};

pub trait EventReceiver {
fn wait_next(&mut self) -> impl Future<Output = InterruptEvent>;
}

pub trait EventSender {
fn on_interrupt(&self, pin: u8, level: bool);
}

/// IO expander device trait
pub trait Device {
fn name(&self) -> &str;

fn set_level(&mut self, pin: u8, value: bool) -> impl Future<Output = ()>;
}

pub struct Sender<'channel> {
Copy link
Contributor Author

@RobertZ2011 RobertZ2011 Oct 15, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The corresponding traits could be made more generic to make these implementations generic over the message type.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I considered that but to what end? The generic service impl would just no-op? The only purpose I could foresee would be vendor-defined messaging, but in that case, should the vendor simply add a side channel to their own types that implements the sender and receiver traits?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The receiver and sender traits in this example are specialized versions of generic EventReceiver<T> and EventSender<T>. This would allow doing blanket implementations like impl<T> EventReceiver<T> for DynPublisher<T> to simplify development. I would also argue that the message type is the real API here. So different services/devices can share EventReceiver<ConcreteType> instead of creating their own incompatible traits. Though I think we'd also want an Event trait, even if it's just a marker trait.

publisher: DynImmediatePublisher<'channel, InterruptEvent>,
}

impl<'channel> Sender<'channel> {
pub fn new(publisher: DynImmediatePublisher<'channel, InterruptEvent>) -> Self {
Self { publisher }
}
}

pub struct Receiver<'channel> {
subscriber: DynSubscriber<'channel, InterruptEvent>,
}

impl<'channel> Receiver<'channel> {
pub fn new(subscriber: DynSubscriber<'channel, InterruptEvent>) -> Self {
Self { subscriber }
}
}

impl EventReceiver for Receiver<'_> {
async fn wait_next(&mut self) -> InterruptEvent {
loop {
match self.subscriber.next_message().await {
WaitResult::Message(msg) => return msg,
WaitResult::Lagged(n) => {
warn!("Receiver lagged by {n} messages");
}
}
}
}
}

impl EventSender for Sender<'_> {
fn on_interrupt(&self, pin: u8, level: bool) {
self.publisher.publish_immediate(InterruptEvent { pin, level });
}
}

#[derive(Copy, Clone)]
pub struct InterruptEvent {
pub pin: u8,
pub level: bool,
}

const MAX_SUPPORTED_DEVICES: usize = 4;

pub struct ServiceImplementation<'storage, 'device, D: Device, R: EventReceiver> {
devices: &'storage mut [(R, &'device Mutex<NoopRawMutex, D>)],
}

impl<'storage, 'device, D: Device, R: EventReceiver> ServiceImplementation<'storage, 'device, D, R> {
pub fn new(devices: &'storage mut [(R, &'device Mutex<NoopRawMutex, D>)]) -> Self {
Self { devices }
}

pub async fn wait_next(&mut self) -> (&'device Mutex<NoopRawMutex, D>, InterruptEvent) {
let futures =
heapless::Vec::<_, MAX_SUPPORTED_DEVICES>::from_iter(self.devices.iter_mut().map(|(r, _)| r.wait_next()));

let (event, index) = select_slice(pin!(futures)).await;
(self.devices[index].1, event)
}

pub async fn process_event(&mut self, event: (&'device Mutex<NoopRawMutex, D>, InterruptEvent)) {
let mut device = event.0.lock().await;
info!(
"Interrupt from device {}: pin {}, level {}",
device.name(),
event.1.pin,
event.1.level
);
if event.1.pin == 0 {
info!("Asserting INT_OUT pin");
}

device.set_level(1, event.1.level).await;
}
}
124 changes: 124 additions & 0 deletions examples/std/src/bin/direct-async-calls/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
use embassy_executor::Executor;
use embassy_futures::join::join3;
use embassy_sync::{blocking_mutex::raw::NoopRawMutex, mutex::Mutex, pubsub::PubSubChannel};
use embassy_time::Timer;
use static_cell::StaticCell;

use crate::{io_expander::EventSender as _, power::EventSender as _};

mod io_expander;
mod power;

pub struct Device<I: io_expander::EventSender, P: power::EventSender> {
name: &'static str,
io_sender: I,
power_sender: P,
}

pub struct DeviceContainer<I: io_expander::EventSender, P: power::EventSender> {
inner: Mutex<NoopRawMutex, Device<I, P>>,
}

impl<I: io_expander::EventSender, P: power::EventSender> DeviceContainer<I, P> {
pub fn new(name: &'static str, io_sender: I, power_sender: P) -> Self {
Self {
inner: Mutex::new(Device {
name,
io_sender,
power_sender,
}),
}
}
}

impl<I: io_expander::EventSender, P: power::EventSender> power::Device for Device<I, P> {
fn name(&self) -> &str {
self.name
}

async fn accept_contract(&mut self) {
log::info!("{}: Contract accepted", self.name);
}

async fn disconnect(&mut self) {
log::info!("{}: Device disconnected", self.name);
}
}

impl<I: io_expander::EventSender, P: power::EventSender> io_expander::Device for Device<I, P> {
fn name(&self) -> &str {
self.name
}

async fn set_level(&mut self, pin: u8, value: bool) {
log::info!("{}: Set pin {} to level {}", self.name, pin, value);
}
}

#[embassy_executor::task]
async fn run() {
let power_channel0: PubSubChannel<NoopRawMutex, power::Event, 4, 1, 1> = PubSubChannel::new();
let power_receiver0 = power::Receiver::new(power_channel0.dyn_subscriber().unwrap());
let power_sender0 = power::Sender::new(power_channel0.dyn_immediate_publisher());

let io_channel0: PubSubChannel<NoopRawMutex, io_expander::InterruptEvent, 4, 1, 1> = PubSubChannel::new();
let io_receiver0 = io_expander::Receiver::new(io_channel0.dyn_subscriber().unwrap());
let io_sender0 = io_expander::Sender::new(io_channel0.dyn_immediate_publisher());
let device0 = DeviceContainer::new("Device0", io_sender0, power_sender0);

let power_channel1: PubSubChannel<NoopRawMutex, power::Event, 4, 1, 1> = PubSubChannel::new();
let power_receiver1 = power::Receiver::new(power_channel1.dyn_subscriber().unwrap());
let power_sender1 = power::Sender::new(power_channel1.dyn_immediate_publisher());

let io_channel1: PubSubChannel<NoopRawMutex, io_expander::InterruptEvent, 4, 1, 1> = PubSubChannel::new();
let io_receiver1 = io_expander::Receiver::new(io_channel1.dyn_subscriber().unwrap());
let io_sender1 = io_expander::Sender::new(io_channel1.dyn_immediate_publisher());
let device1 = DeviceContainer::new("Device1", io_sender1, power_sender1);

let mut power_devices = [(power_receiver0, &device0.inner), (power_receiver1, &device1.inner)];
let mut power_service = power::ServiceImplementation::new(&mut power_devices);

let mut io_devices = [(io_receiver0, &device0.inner), (io_receiver1, &device1.inner)];
let mut io_service = io_expander::ServiceImplementation::new(&mut io_devices);

join3(
async {
loop {
let event = io_service.wait_next().await;
io_service.process_event(event).await;
}
},
async {
loop {
let event = power_service.wait_next().await;
power_service.process_event(event).await;
}
},
async {
device0.inner.lock().await.power_sender.on_plug(1000);
device0.inner.lock().await.io_sender.on_interrupt(0, true);
Timer::after_millis(500).await;

device1.inner.lock().await.power_sender.on_plug(2000);
Timer::after_millis(500).await;

device0.inner.lock().await.power_sender.on_unplug();
Timer::after_millis(500).await;

device1.inner.lock().await.io_sender.on_interrupt(0, true);
device1.inner.lock().await.power_sender.on_unplug();
Timer::after_millis(500).await;
},
)
.await;
}

pub fn main() {
env_logger::builder().filter_level(log::LevelFilter::Trace).init();

static EXECUTOR: StaticCell<Executor> = StaticCell::new();
let executor = EXECUTOR.init(Executor::new());
executor.run(|spawner| {
spawner.must_spawn(run());
});
}
152 changes: 152 additions & 0 deletions examples/std/src/bin/direct-async-calls/power.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
use std::{future::Future, pin::pin};

use embassy_futures::select::select_slice;
use embassy_sync::{
blocking_mutex::raw::NoopRawMutex,
mutex::Mutex,
pubsub::{DynImmediatePublisher, DynSubscriber, WaitResult},
};

use log::{info, warn};

/// Receive events from a [`Device`]
pub trait EventReceiver {
fn wait_next(&mut self) -> impl Future<Output = Event>;
}

pub trait EventSender {
fn on_plug(&self, power_mw: i32);
fn on_unplug(&self);
}

/// Power device trait
pub trait Device {
fn name(&self) -> &str;

fn accept_contract(&mut self) -> impl Future<Output = ()>;
fn disconnect(&mut self) -> impl Future<Output = ()>;
}

#[derive(Copy, Clone, Debug)]
pub struct NewContract {
pub power_mw: i32,
}

#[derive(Copy, Clone, Debug)]
pub enum Event {
Plug(NewContract),
Unplug,
}

struct CurrentContract<'device, D: Device> {
power_mw: i32,
connected_device: &'device Mutex<NoopRawMutex, D>,
}

pub struct Sender<'channel> {
publisher: DynImmediatePublisher<'channel, Event>,
}

impl<'channel> Sender<'channel> {
pub fn new(publisher: DynImmediatePublisher<'channel, Event>) -> Self {
Self { publisher }
}
}

pub struct Receiver<'channel> {
subscriber: DynSubscriber<'channel, Event>,
}

impl<'channel> Receiver<'channel> {
pub fn new(subscriber: DynSubscriber<'channel, Event>) -> Self {
Self { subscriber }
}
}

impl EventReceiver for Receiver<'_> {
async fn wait_next(&mut self) -> Event {
loop {
match self.subscriber.next_message().await {
WaitResult::Message(msg) => return msg,
WaitResult::Lagged(n) => {
warn!("Receiver lagged by {n} messages");
}
}
}
}
}

impl EventSender for Sender<'_> {
fn on_plug(&self, power_mw: i32) {
self.publisher.publish_immediate(Event::Plug(NewContract { power_mw }));
}

fn on_unplug(&self) {
self.publisher.publish_immediate(Event::Unplug);
}
}
Comment on lines +46 to +87
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think you actually need a new type, just to implement the trait on the inner itself

Suggested change
pub struct Sender<'channel> {
publisher: DynImmediatePublisher<'channel, Event>,
}
impl<'channel> Sender<'channel> {
pub fn new(publisher: DynImmediatePublisher<'channel, Event>) -> Self {
Self { publisher }
}
}
pub struct Receiver<'channel> {
subscriber: DynSubscriber<'channel, Event>,
}
impl<'channel> Receiver<'channel> {
pub fn new(subscriber: DynSubscriber<'channel, Event>) -> Self {
Self { subscriber }
}
}
impl EventReceiver for Receiver<'_> {
async fn wait_next(&mut self) -> Event {
loop {
match self.subscriber.next_message().await {
WaitResult::Message(msg) => return msg,
WaitResult::Lagged(n) => {
warn!("Receiver lagged by {n} messages");
}
}
}
}
}
impl EventSender for Sender<'_> {
fn on_plug(&self, power_mw: i32) {
self.publisher.publish_immediate(Event::Plug(NewContract { power_mw }));
}
fn on_unplug(&self) {
self.publisher.publish_immediate(Event::Unplug);
}
}
impl EventReceiver for &DynSubscriber<'_, Event> {
async fn wait_next(&mut self) -> Event {
loop {
match self.next_message().await {
WaitResult::Message(msg) => return msg,
WaitResult::Lagged(n) => {
warn!("Receiver lagged by {n} messages");
}
}
}
}
}
impl EventSender for DynImmediatePublisher<'_, Event> {
fn on_plug(&self, power_mw: i32) {
self.publish_immediate(Event::Plug(NewContract { power_mw }));
}
fn on_unplug(&self) {
self.publish_immediate(Event::Unplug);
}
}


pub struct ServiceImplementation<'storage, 'device, D: Device, R: EventReceiver> {
current_connection: Option<CurrentContract<'device, D>>,
devices: &'storage mut [(R, &'device Mutex<NoopRawMutex, D>)],
}

const MAX_SUPPORTED_DEVICES: usize = 4;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems to me that ServiceImplementation should be generic on const N: usize


impl<'storage, 'device, D: Device, R: EventReceiver> ServiceImplementation<'storage, 'device, D, R> {
pub fn new(devices: &'storage mut [(R, &'device Mutex<NoopRawMutex, D>)]) -> Self {
Self {
devices,
current_connection: None,
}
}

pub async fn wait_next(&mut self) -> (&'device Mutex<NoopRawMutex, D>, Event) {
let futures =
heapless::Vec::<_, MAX_SUPPORTED_DEVICES>::from_iter(self.devices.iter_mut().map(|(r, _)| r.wait_next()));
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Futures can end up arbitrarily large so I'm worried that this could blow the stack and we wouldn't know until runtime. Thoughts on possibly introducing a macro/wrapper type that uses size_of to check the total size of the vec at compile time?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this a different risk than when calling any other generic future?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not really, though I think the risk is more acute here because of the multiplication. But I guess that's a broader discussion to have.


let (event, index) = select_slice(pin!(futures)).await;
(self.devices[index].1, event)
}

pub async fn process_event(&mut self, event: (&'device Mutex<NoopRawMutex, D>, Event)) {
let mut event_device = event.0.lock().await;
match event.1 {
Event::Plug(data) => {
info!("{} connected with contract: {:?}", event_device.name(), data.power_mw);
if let Some(current) = &self.current_connection {
if data.power_mw > current.power_mw {
info!("New contract has higher power, switching");
current.connected_device.lock().await.disconnect().await;

self.current_connection = Some(CurrentContract {
power_mw: data.power_mw,
connected_device: event.0,
});
event_device.accept_contract().await;
} else {
info!("New contract has lower or equal power, not switching");
}
} else {
info!("No current contract, accepting new one");
self.current_connection = Some(CurrentContract {
power_mw: data.power_mw,
connected_device: event.0,
});
event_device.accept_contract().await;
}
}
Event::Unplug => {
info!("{} disconnected", event_device.name());
if let Some(current) = &self.current_connection {
if std::ptr::eq(current.connected_device, event.0) {
info!("Current device disconnected");
self.current_connection = None;
} else {
info!("A non-connected device unplugged, nothing to do");
}
}
}
}
}
}