task + async misc #67

Draft: wants to merge 15 commits into main
3 changes: 2 additions & 1 deletion .github/workflows/docs.yml
@@ -45,7 +45,8 @@ jobs:
working-directory: zephyr-lang-rust
run: |
# Note that the above build doesn't set ZEPHYR_BASE, so we'll need to do that here.
-west build -t rustdoc -b qemu_cortex_m3 docgen
+west build -b qemu_cortex_m3 docgen
+west build -t rustdoc
mkdir rustdocs
mv build/rust/target/thumbv7m-none-eabi/doc rustdocs/nostd

74 changes: 31 additions & 43 deletions samples/bench/src/lib.rs
@@ -10,17 +10,16 @@
extern crate alloc;

use core::mem;
-use core::pin::Pin;

use alloc::collections::vec_deque::VecDeque;
use alloc::vec;
use executor::AsyncTests;
use static_cell::StaticCell;
-use zephyr::kobj_define;
+use zephyr::define_work_queue;
use zephyr::raw::k_yield;
-use zephyr::sync::{PinWeak, SpinMutex};
+use zephyr::sync::{SpinMutex, Weak};
use zephyr::time::NoWait;
-use zephyr::work::{SimpleAction, Work, WorkQueueBuilder};
+use zephyr::work::{ArcWork, SimpleAction, Work};
use zephyr::{
kconfig::CONFIG_SYS_CLOCK_HW_CYCLES_PER_SEC,
printkln,
@@ -80,7 +79,7 @@ extern "C" fn rust_main() {
spin_bench();
sem_bench();

-let simple = Simple::new(tester.workq.clone());
+let simple = Simple::new(tester.workq);
let mut num = 6;
while num < 250 {
simple.run(num, TOTAL_ITERS / num);
@@ -147,7 +146,7 @@ struct ThreadTests {
high_command: Sender<Command>,

/// A work queue for the main runners.
-workq: Arc<WorkQueue>,
+workq: &'static WorkQueue,

/// The tests all return their results to the main thread: the threads send, and the main
/// thread receives.
@@ -163,15 +162,7 @@ impl ThreadTests {
let (low_send, low_recv) = bounded(1);
let (high_send, high_recv) = bounded(1);

-let workq = Arc::new(
-    WorkQueueBuilder::new()
-        .set_priority(5)
-        .set_no_yield(true)
-        .start(WORK_STACK.init_once(()).unwrap()),
-);
-
-// Leak the workqueue so it doesn't get dropped.
-let _ = Arc::into_raw(workq.clone());
+let workq = WORKQ.start();

let mut result = Self {
sems: &SEMS,
@@ -581,32 +572,32 @@ enum TestResult {

/// The Simple test just does a ping pong test using manually submitted work.
struct Simple {
-workq: Arc<WorkQueue>,
+workq: &'static WorkQueue,
}

impl Simple {
-fn new(workq: Arc<WorkQueue>) -> Self {
+fn new(workq: &'static WorkQueue) -> Self {
Self { workq }
}

fn run(&self, workers: usize, iterations: usize) {
// printkln!("Running Simple");
-let main = Work::new(SimpleMain::new(workers * iterations, self.workq.clone()));
+let main = Work::new_arc(SimpleMain::new(workers * iterations, self.workq));

let children: VecDeque<_> = (0..workers)
-.map(|n| Work::new(SimpleWorker::new(main.clone(), self.workq.clone(), n)))
+.map(|n| Work::new_arc(SimpleWorker::new(main.0.clone(), self.workq, n)).0)
.collect();

-let mut locked = main.action().locked.lock().unwrap();
+let mut locked = main.0.action().locked.lock().unwrap();
let _ = mem::replace(&mut locked.works, children);
drop(locked);

let start = now();
// Fire off main, which will run everything.
-Work::submit_to_queue(main.clone(), &self.workq).unwrap();
+main.clone().submit_to_queue(self.workq).unwrap();

// And wait for the completion semaphore.
-main.action().done.take(Forever).unwrap();
+main.0.action().done.take(Forever).unwrap();

let stop = now();
let time = (stop - start) as f64 / (CONFIG_SYS_CLOCK_HW_CYCLES_PER_SEC as f64) * 1000.0;
@@ -627,40 +618,39 @@ impl Simple {
);

// Before we go away, make sure that there aren't any leaked workers.
-/*
-let mut locked = main.action().locked.lock().unwrap();
+let mut locked = main.0.action().locked.lock().unwrap();
while let Some(other) = locked.works.pop_front() {
-    // Portable atomic's Arc seems to be a problem here.
-    let other = unsafe { Pin::into_inner_unchecked(other) };
    assert_eq!(Arc::strong_count(&other), 1);
    // printkln!("Child: {} refs", Arc::strong_count(&other));
}
-*/
+drop(locked);

+// And nothing has leaked main, either.
+assert_eq!(Arc::strong_count(&main.0), 1);
}
}

/// A simple worker. When run, it submits the main worker to do the next work.
struct SimpleWorker {
-main: PinWeak<Work<SimpleMain>>,
-workq: Arc<WorkQueue>,
+main: Weak<Work<SimpleMain>>,
+workq: &'static WorkQueue,
_id: usize,
}

impl SimpleWorker {
-fn new(main: Pin<Arc<Work<SimpleMain>>>, workq: Arc<WorkQueue>, id: usize) -> Self {
+fn new(main: Arc<Work<SimpleMain>>, workq: &'static WorkQueue, id: usize) -> Self {
Self {
-main: PinWeak::downgrade(main),
+main: Arc::downgrade(&main),
workq,
_id: id,
}
}
}

impl SimpleAction for SimpleWorker {
-fn act(self: Pin<&Self>) {
+fn act(self: &Self) {
// Each time we are run, fire the main worker back up.
-let main = self.main.upgrade().unwrap();
-Work::submit_to_queue(main.clone(), &self.workq).unwrap();
+let main = ArcWork(self.main.upgrade().unwrap());
+main.clone().submit_to_queue(self.workq).unwrap();
}
}

@@ -670,12 +660,12 @@ impl SimpleAction for SimpleWorker {
struct SimpleMain {
/// All of the work items.
locked: SpinMutex<Locked>,
-workq: Arc<WorkQueue>,
+workq: &'static WorkQueue,
done: Semaphore,
}

impl SimpleAction for SimpleMain {
-fn act(self: Pin<&Self>) {
+fn act(self: &Self) {
// Each time, take a worker from the queue, and submit it.
let mut lock = self.locked.lock().unwrap();

@@ -690,12 +680,12 @@ impl SimpleAction for SimpleMain {
lock.count -= 1;
drop(lock);

-Work::submit_to_queue(worker.clone(), &self.workq).unwrap();
+ArcWork(worker.clone()).submit_to_queue(self.workq).unwrap();
}
}

impl SimpleMain {
-fn new(count: usize, workq: Arc<WorkQueue>) -> Self {
+fn new(count: usize, workq: &'static WorkQueue) -> Self {
Self {
locked: SpinMutex::new(Locked::new(count)),
done: Semaphore::new(0, 1),
@@ -705,7 +695,7 @@ impl SimpleMain {
}

struct Locked {
-works: VecDeque<Pin<Arc<Work<SimpleWorker>>>>,
+works: VecDeque<Arc<Work<SimpleWorker>>>,
count: usize,
}

@@ -812,9 +802,7 @@ impl<'a> BenchTimer<'a> {
}
}

-kobj_define! {
-    static WORK_STACK: ThreadStack<WORK_STACK_SIZE>;
-}
+define_work_queue!(WORKQ, WORK_STACK_SIZE, priority = 5, no_yield = true);

static SEMS: [Semaphore; NUM_THREADS] = [const { Semaphore::new(0, u32::MAX) }; NUM_THREADS];
static BACK_SEMS: [Semaphore; NUM_THREADS] = [const { Semaphore::new(0, u32::MAX) }; NUM_THREADS];
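Taken together, the bench changes above show the new work-queue pattern: `define_work_queue!` statically allocates the queue and its stack, `start()` hands back a `&'static WorkQueue`, and `Work::new_arc` replaces the old `Pin<Arc<...>>` handling. A minimal sketch of that pattern, using the same `zephyr` items as this diff (the `Hello` action, queue name, and stack size are illustrative):

```rust
use zephyr::define_work_queue;
use zephyr::printkln;
use zephyr::work::{SimpleAction, Work};

// Statically allocate the queue: no WorkQueueBuilder, no kobj_define! stack,
// and no Arc leak needed to keep it alive.
define_work_queue!(HELLO_WORKQ, 2048, priority = 5, no_yield = true);

/// Illustrative action that just prints each time the work item runs.
struct Hello;

impl SimpleAction for Hello {
    fn act(self: &Self) {
        printkln!("hello from the work queue");
    }
}

fn demo() {
    // `start` returns a `&'static WorkQueue`, so it is freely copyable.
    let workq = HELLO_WORKQ.start();

    // `new_arc` wraps the action in a reference-counted work item; the handle
    // is cloned per submission, mirroring `main.clone().submit_to_queue(...)` above.
    let hello = Work::new_arc(Hello);
    hello.clone().submit_to_queue(workq).unwrap();
}
```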
9 changes: 4 additions & 5 deletions samples/embassy/Cargo.toml
@@ -28,17 +28,16 @@ features = [

[dependencies.embassy-futures]
version = "0.1.1"
# path = "../../embassy/embassy-futures"

[dependencies.embassy-sync]
version = "0.6.2"
# path = "../../embassy/embassy-sync"

[dependencies.embassy-time]
version = "0.4.0"
# path = "../../embassy/embassy-time"
# This is board specific.
features = ["tick-hz-10_000"]
# For real builds, you should figure out your target's tick rate and set the appropriate feature,
# like in these examples. Without this, embassy-time will assume a 1Mhz tick rate, and every time
# operation will involve a conversion.
#features = ["tick-hz-10_000"]

[dependencies.critical-section]
version = "1.2"
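As a concrete sketch, if the board's Zephyr configuration sets `CONFIG_SYS_CLOCK_TICKS_PER_SEC=10000`, the matching selection would be the following (the tick rate here is illustrative; pick the `tick-hz-*` feature that equals your target's rate):

```toml
[dependencies.embassy-time]
version = "0.4.0"
# Matches CONFIG_SYS_CLOCK_TICKS_PER_SEC=10000, so Embassy ticks map 1:1
# onto Zephyr ticks and no conversion is needed.
features = ["tick-hz-10_000"]
```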
6 changes: 6 additions & 0 deletions tests/drivers/gpio-async/README.md
@@ -0,0 +1,6 @@
# Async gpio

This demo makes use of the GPIO `wait_for_high()` and `wait_for_low()` async operations.

Unfortunately, not all GPIO controllers support level-triggered interrupts; notably, most of the
stm32 line does not.
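A rough sketch of how a task in this test can use those operations. The trait below is only a stand-in mirroring the two methods named above; the real pin type and its configuration come from the zephyr crate's GPIO support:

```rust
use zephyr::printkln;

/// Stand-in for the async pin interface; only `wait_for_high`/`wait_for_low`
/// are the operations this test exercises.
trait AsyncLevel {
    async fn wait_for_high(&mut self);
    async fn wait_for_low(&mut self);
}

async fn watch_line(mut pin: impl AsyncLevel) {
    loop {
        // Completes once the line is high; this is where level-triggered
        // interrupt support in the GPIO controller matters.
        pin.wait_for_high().await;
        printkln!("line went high");

        pin.wait_for_low().await;
        printkln!("line went low");
    }
}
```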
6 changes: 6 additions & 0 deletions zephyr-build/src/devicetree/augment.rs
@@ -28,6 +28,12 @@ pub trait Augment {
/// The default implementation checks if this node matches and calls a generator if it does, or
/// does nothing if not.
fn augment(&self, node: &Node, tree: &DeviceTree) -> TokenStream {
+// If there is a status field present, and it is not set to "okay", don't augment this node.
+if let Some(status) = node.get_single_string("status") {
+    if status != "okay" {
+        return TokenStream::new();
+    }
+}
if self.is_compatible(node) {
self.generate(node, tree)
} else {
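For example, given a devicetree fragment like this (illustrative labels and compatible), the first node is still augmented while the second now generates nothing:

```dts
/* Illustrative fragment only. */
uart0: serial@40000000 {
    compatible = "vnd,serial";
    status = "okay";      /* matched and augmented as before */
};

uart1: serial@40001000 {
    compatible = "vnd,serial";
    status = "disabled";  /* skipped by the new status check */
};
```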
2 changes: 1 addition & 1 deletion zephyr-macros/Cargo.toml
@@ -9,7 +9,7 @@ descriptions = "Macros for managing tasks and work queues in Zephyr"
proc-macro = true

[dependencies]
syn = { version = "2.0.85", features = ["full", "visit"] }
syn = { version = "2.0.79", features = ["full", "visit"] }
quote = "1.0.37"
proc-macro2 = "1.0.86"
darling = "0.20.1"
2 changes: 1 addition & 1 deletion zephyr/src/embassy.rs
@@ -71,7 +71,7 @@
//!
//! ## Caveats
//!
-//! [`Semaphore::take_async`]: crate::sys::sync::Semaphore::take_async
+//! The executor currently doesn't support async waits on Zephyr primitives, such as Semaphore.

#[cfg(feature = "time-driver")]
mod time_driver;
60 changes: 53 additions & 7 deletions zephyr/src/embassy/time_driver.rs
@@ -14,6 +14,18 @@ use embassy_time_queue_utils::Queue;
use crate::raw::{k_timeout_t, k_timer, k_timer_init, k_timer_start};
use crate::sys::K_FOREVER;

+/// The time base configured into Zephyr.
+pub const ZEPHYR_TICK_HZ: u64 = crate::time::SYS_FREQUENCY as u64;
+
+/// The configured Embassy time tick rate.
+pub const EMBASSY_TICK_HZ: u64 = embassy_time_driver::TICK_HZ;
+
+/// When the Zephyr and Embassy rates differ, use this intermediate type. This could be selected
+/// by feature. In the worst case, with Embassy's tick at 1 MHz and Zephyr's at 50 kHz, a u64
+/// intermediate would overflow after a little over 11 years; a higher rate for either reduces
+/// that further. But 128-bit arithmetic is fairly inefficient.
+type InterTime = u128;

embassy_time_driver::time_driver_impl!(static DRIVER: ZephyrTimeDriver = ZephyrTimeDriver {
queue: Mutex::new(RefCell::new(Queue::new())),
timer: Mutex::new(RefCell::new(unsafe { mem::zeroed() })),
@@ -63,20 +75,54 @@ impl ZTimer {
}
}

+/// Convert from a Zephyr tick count to an Embassy tick count.
+///
+/// This is done using the intermediate type defined above.
+/// This conversion truncates.
+fn zephyr_to_embassy(ticks: u64) -> u64 {
+    if ZEPHYR_TICK_HZ == EMBASSY_TICK_HZ {
+        // This should happen at compile time.
+        return ticks;
+    }
+
+    // Otherwise do the intermediate conversion.
+    let prod = (ticks as InterTime) * (EMBASSY_TICK_HZ as InterTime);
+    (prod / (ZEPHYR_TICK_HZ as InterTime)) as u64
+}
+
+/// Convert from an Embassy tick count to a Zephyr tick count.
+///
+/// This conversion uses ceiling division so that values are always large enough.
+fn embassy_to_zephyr(ticks: u64) -> u64 {
+    if ZEPHYR_TICK_HZ == EMBASSY_TICK_HZ {
+        return ticks;
+    }
+
+    let prod = (ticks as InterTime) * (ZEPHYR_TICK_HZ as InterTime);
+    prod.div_ceil(EMBASSY_TICK_HZ as InterTime) as u64
+}
+
+fn zephyr_now() -> u64 {
+    crate::time::now().ticks()
+}

impl Driver for ZephyrTimeDriver {
fn now(&self) -> u64 {
-crate::time::now().ticks()
+zephyr_to_embassy(zephyr_now())
}

fn schedule_wake(&self, at: u64, waker: &core::task::Waker) {
critical_section::with(|cs| {
let mut queue = self.queue.borrow(cs).borrow_mut();
let mut timer = self.timer.borrow(cs).borrow_mut();

+// All times below are in Zephyr units.
+let at = embassy_to_zephyr(at);

if queue.schedule_wake(at, waker) {
-let mut next = queue.next_expiration(self.now());
-while !timer.set_alarm(next, self.now()) {
-    next = queue.next_expiration(self.now());
+let mut next = queue.next_expiration(zephyr_now());
+while !timer.set_alarm(next, zephyr_now()) {
+    next = queue.next_expiration(zephyr_now());
}
}
})
@@ -89,9 +135,9 @@ impl ZephyrTimeDriver {
let mut queue = self.queue.borrow(cs).borrow_mut();
let mut timer = self.timer.borrow(cs).borrow_mut();

-let mut next = queue.next_expiration(self.now());
-while !timer.set_alarm(next, self.now()) {
-    next = queue.next_expiration(self.now());
+let mut next = queue.next_expiration(zephyr_now());
+while !timer.set_alarm(next, zephyr_now()) {
+    next = queue.next_expiration(zephyr_now());
}
})
}
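As a standalone check of the conversion arithmetic (a sketch; the constants mirror the worst case named in the comment above, Zephyr at 50 kHz and Embassy at 1 MHz):

```rust
type InterTime = u128;

const ZEPHYR_TICK_HZ: u64 = 50_000;
const EMBASSY_TICK_HZ: u64 = 1_000_000;

fn main() {
    // Zephyr -> Embassy truncates: 7 * 1_000_000 / 50_000 = 140 exactly.
    let prod = (7u64 as InterTime) * (EMBASSY_TICK_HZ as InterTime);
    assert_eq!((prod / (ZEPHYR_TICK_HZ as InterTime)) as u64, 140);

    // Embassy -> Zephyr rounds up so a wake is never scheduled early:
    // 150 Embassy ticks is 7.5 Zephyr ticks, which becomes 8.
    let prod = (150u64 as InterTime) * (ZEPHYR_TICK_HZ as InterTime);
    assert_eq!(prod.div_ceil(EMBASSY_TICK_HZ as InterTime) as u64, 8);
}
```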
2 changes: 1 addition & 1 deletion zephyr/src/lib.rs
@@ -39,7 +39,7 @@
//! level operation that is still quite useful in regular code.
//! - [`timer`]: Rust interfaces to Zephyr timers. These timers can be used either by registering a
//! callback, or polled or waited on for an elapsed time.
-//! - [`work`]: Zephyr work queues for Rust. The [`work::WorkQueueBuilder`] and resulting
+//! - [`work`]: Zephyr work queues for Rust. The [`define_work_queue`] macro and resulting
//! [`work::WorkQueue`] allow creation of Zephyr work queues to be used from Rust. The
//! [`work::Work`] item has an action that will be invoked by the work queue, and can be manually
//! submitted when needed.
15 changes: 15 additions & 0 deletions zephyr/src/sync.rs
@@ -57,6 +57,21 @@ mod pinweak {
.upgrade()
.map(|arc| unsafe { Pin::new_unchecked(arc) })
}

+    /// Equivalent to [`Weak::strong_count`]
+    pub fn strong_count(&self) -> usize {
+        self.0.strong_count()
+    }
+
+    /// Equivalent to [`Weak::weak_count`]
+    pub fn weak_count(&self) -> usize {
+        self.0.weak_count()
+    }
+
+    /// Equivalent to [`Weak::ptr_eq`]
+    pub fn ptr_eq(&self, other: &Self) -> bool {
+        self.0.ptr_eq(&other.0)
+    }
}
}

Expand Down
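A small sketch of the new accessors (the `Data` type is illustrative, `Pin::new` works here because `Data` is `Unpin`, and `downgrade` is used by value as in the bench code above):

```rust
use core::pin::Pin;
use zephyr::sync::{Arc, PinWeak};

struct Data(u32);

fn demo() {
    let item = Pin::new(Arc::new(Data(42)));
    let weak = PinWeak::downgrade(item.clone());

    // Mirrors Weak: one strong reference (item), one weak reference (weak).
    assert_eq!(weak.strong_count(), 1);
    assert_eq!(weak.weak_count(), 1);

    // ptr_eq compares the underlying allocation, as with Weak::ptr_eq.
    assert!(weak.ptr_eq(&PinWeak::downgrade(item.clone())));
}
```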