Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 32 additions & 17 deletions cfu-service/src/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@ use embedded_cfu_protocol::protocol_definitions::*;
use embedded_services::{
cfu::{
self,
component::{CfuDevice, InternalResponseData, RequestData},
component::{CfuDevice, InternalResponse, InternalResponseData, RequestData},
},
error, intrusive_list, trace,
error, intrusive_list,
ipc::deferred::RequestId,
trace,
};

/// Internal state for [`Buffer`]
Expand Down Expand Up @@ -148,24 +150,37 @@ impl<'a> Buffer<'a> {
state.pending_response = None;
}

let mut have_id = false;
let mut req_id: RequestId = RequestId::default();
if state.component_busy {
// Buffer the content if the component is busy
// If the buffer is full, this will block until space is available
trace!("Component is busy, buffering content");
self.buffer_sender.send(*content).await;
} else {
// Buffered component can accept new content, send it
if let Err(e) = cfu::send_device_request(self.buffered_id, RequestData::GiveContent(*content)).await {
error!(
"Failed to send content to buffered component {:?}: {:?}",
self.buffered_id, e
);
return Self::create_content_rejection(content.header.sequence_num);
match cfu::send_device_request(self.buffered_id, RequestData::GiveContent(*content)).await {
Ok(id) => {
have_id = true;
req_id = id;
}
Err(e) => {
error!(
"Failed to send content to buffered component {:?}: {:?}",
self.buffered_id, e
);
return Self::create_content_rejection(content.header.sequence_num);
}
}
}

// Wait for a response from the buffered component
match with_timeout(self.config.buffer_timeout, cfu::wait_device_response(self.buffered_id)).await {
let fut = if have_id {
cfu::wait_device_response(self.buffered_id, Some(req_id))
} else {
cfu::wait_device_response(self.buffered_id, None)
};
match with_timeout(self.config.buffer_timeout, fut).await {
Err(TimeoutError) => {
// Component didn't respond in time
state.component_busy = true;
Expand Down Expand Up @@ -195,7 +210,7 @@ impl<'a> Buffer<'a> {
Ok(response) => {
trace!("Buffered component responded");
state.component_busy = false;
match response {
match response.unwrap() {
Ok(InternalResponseData::ContentResponse(mut response)) => {
response.sequence = content.header.sequence_num;
InternalResponseData::ContentResponse(response)
Expand Down Expand Up @@ -233,9 +248,9 @@ impl<'a> Buffer<'a> {
// Wait for a buffered content request
self.wait_buffered_content(is_busy),
// Wait for a request from the host
self.cfu_device.wait_request(),
self.cfu_device.receive(),
// Wait for response from the buffered component
cfu::wait_device_response(self.buffered_id),
cfu::wait_device_response(self.buffered_id, None),
)
.await
{
Expand All @@ -244,11 +259,11 @@ impl<'a> Buffer<'a> {
Event::BufferedContent(content)
}
Either3::Second(request) => {
trace!("Request received: {:?}", request);
Event::CfuRequest(request)
trace!("Request received: {:?}", request.command);
Event::CfuRequest(request.command.data)
}
Either3::Third(response) => {
if let Ok(response) = response {
if let Ok(Ok(response)) = response {
trace!("Response received: {:?}", response);
Event::ComponentResponse(response)
} else {
Expand Down Expand Up @@ -315,8 +330,8 @@ impl<'a> Buffer<'a> {
}

/// Send a response to the CFU message
pub async fn send_response(&self, response: InternalResponseData) {
self.cfu_device.send_response(response).await;
pub async fn send_response(&self, response: InternalResponse, request_id: RequestId) {
self.cfu_device.send_response(request_id, response).await
}

/// Register the buffer with all relevant services
Expand Down
28 changes: 14 additions & 14 deletions cfu-service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,16 @@ impl CfuClient {
tp: comms::Endpoint::uninit(comms::EndpointID::Internal(comms::Internal::Nonvol)),
})
}
pub async fn process_request(&self) -> Result<(), CfuError> {
let request = self.context.wait_request().await;
pub async fn process_request(&self) -> InternalResponse {
let request = self.context.receive().await;
//let device = self.context.get_device(request.id).await?;
let comp = request.id;
let comp = request.command.id;

match request.data {
match request.command.data {
RequestData::FwVersionRequest => {
info!("Received FwVersionRequest, comp {}", comp);
if let Ok(device) = self.context.get_device(comp).await {
let resp = device
.execute_device_request(request.data)
.await
.map_err(CfuError::ProtocolError)?;
let resp = device.execute_device_request(request.command).await?;

// TODO replace with signal to component to get its own fw version
//cfu::send_request(comp, RequestData::FwVersionRequest).await?;
Expand All @@ -54,15 +51,18 @@ impl CfuClient {
return Err(CfuError::ProtocolError(CfuProtocolError::BadResponse));
}
}
self.context.send_response(resp).await;
return Ok(());
return Ok(resp);
}
Err(CfuError::InvalidComponent)
}
RequestData::GiveContent(_content_cmd) => Ok(()),
RequestData::GiveOffer(_offer_cmd) => Ok(()),
RequestData::PrepareComponentForUpdate => Ok(()),
RequestData::FinalizeUpdate => Ok(()),
RequestData::GiveContent(_content_cmd) => {
Ok(InternalResponseData::ContentResponse(FwUpdateContentResponse::default()))
}
RequestData::GiveOffer(_offer_cmd) => {
Ok(InternalResponseData::OfferResponse(FwUpdateOfferResponse::default()))
}
RequestData::PrepareComponentForUpdate => Ok(InternalResponseData::ComponentPrepared),
RequestData::FinalizeUpdate => Ok(InternalResponseData::ComponentPrepared),
}
}
}
Expand Down
14 changes: 8 additions & 6 deletions cfu-service/src/splitter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@ use embedded_cfu_protocol::protocol_definitions::*;
use embedded_services::{
cfu::{
self,
component::{CfuDevice, InternalResponseData, RequestData},
component::{CfuDevice, CfuRequest, InternalResponseData, RequestData},
},
error, intrusive_list, trace,
error, intrusive_list,
ipc::deferred,
trace,
};

/// Trait containing customization functionality for [`Splitter`]
Expand Down Expand Up @@ -156,8 +158,8 @@ impl<'a, C: Customization> Splitter<'a, C> {
}

/// Wait for a CFU message
pub async fn wait_request(&self) -> RequestData {
self.cfu_device.wait_request().await
pub async fn wait_request(&self) -> CfuRequest {
self.cfu_device.receive().await.command
}

/// Process a CFU message and produce a response
Expand Down Expand Up @@ -187,8 +189,8 @@ impl<'a, C: Customization> Splitter<'a, C> {
}

/// Send a response to the CFU message
pub async fn send_response(&self, response: InternalResponseData) {
self.cfu_device.send_response(response).await;
pub async fn send_response(&self, response: InternalResponseData, request_id: deferred::RequestId) {
self.cfu_device.send_response(request_id, Ok(response)).await;
}

pub async fn register(&'static self) -> Result<(), intrusive_list::Error> {
Expand Down
83 changes: 52 additions & 31 deletions embedded-service/src/cfu/component.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
//! Device struct and methods for component communication
use core::future::Future;

#[cfg(feature = "defmt")]
use defmt::error;

use embassy_sync::blocking_mutex::raw::NoopRawMutex;
use embassy_sync::channel::Channel;
use embassy_sync::mutex::Mutex;
use embedded_cfu_protocol::components::{CfuComponentInfo, CfuComponentStorage, CfuComponentTraits};
use embedded_cfu_protocol::protocol_definitions::*;
Expand All @@ -12,6 +14,7 @@ use heapless::Vec;
use super::CfuError;
use crate::cfu::route_request;
use crate::intrusive_list;
use crate::ipc::deferred;

/// Component internal update state
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
Expand Down Expand Up @@ -57,6 +60,16 @@ impl Default for InternalState {
}
}

/// Request to the cfu service
#[derive(Debug, PartialEq)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub struct Request {
/// Component that sent this request
pub id: ComponentId,
/// Request data
pub data: RequestData,
}

/// CFU Request types and necessary data
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
Expand Down Expand Up @@ -93,17 +106,16 @@ pub enum InternalResponseData {
ComponentPrepared,
}

/// Channel size for device requests
pub const DEVICE_CHANNEL_SIZE: usize = 1;
/// Wrapper type to make code cleaner
pub type InternalResponse = Result<InternalResponseData, CfuError>;

/// CfuDevice struct
/// Can be inserted in an intrusive-list+
pub struct CfuDevice {
node: intrusive_list::Node,
component_id: ComponentId,
state: Mutex<NoopRawMutex, InternalState>,
request: Channel<NoopRawMutex, RequestData, DEVICE_CHANNEL_SIZE>,
response: Channel<NoopRawMutex, InternalResponseData, DEVICE_CHANNEL_SIZE>,
request: deferred::Channel<NoopRawMutex, Request, InternalResponse>,
}

impl intrusive_list::NodeContainer for CfuDevice {
Expand All @@ -124,15 +136,17 @@ impl CfuDeviceContainer for CfuDevice {
}
}

/// Convenience type for CFU request
pub type CfuRequest<'a> = deferred::Request<'a, NoopRawMutex, Request, InternalResponse>;

impl CfuDevice {
/// Constructor for CfuDevice
pub fn new(component_id: ComponentId) -> Self {
Self {
node: intrusive_list::Node::uninit(),
component_id,
state: Mutex::new(InternalState::default()),
request: Channel::new(),
response: Channel::new(),
request: deferred::Channel::new(),
}
}
/// Getter for component id
Expand All @@ -145,30 +159,34 @@ impl CfuDevice {
*self.state.lock().await
}

/// Send a request to this device
pub async fn send_request(&self, request: RequestData) {
self.request.send(request).await;
}

/// Sends a request to this device and returns a response
pub async fn execute_device_request(&self, request: RequestData) -> Result<InternalResponseData, CfuProtocolError> {
self.send_request(request).await;
Ok(self.wait_response().await)
pub async fn execute_device_request(&self, request: Request) -> InternalResponse {
self.request.execute(request).await
}

/// Wait for a request
pub async fn wait_request(&self) -> RequestData {
/// Send a response
pub async fn receive(&self) -> CfuRequest {
self.request.receive().await
}

/// Send a response
pub async fn send_response(&self, response: InternalResponseData) {
self.response.send(response).await;
/// Send a request to the device, returns a request ID for later tracking if needed
pub async fn send_request(&self, request: Request) -> deferred::RequestId {
self.request.send_command(request).await
}

/// Wait for a response for a specific request ID
pub async fn wait_response(&self, id: deferred::RequestId) -> InternalResponse {
self.request.wait_for_response(id).await
}

/// Waits for a response
pub async fn wait_response(&self) -> InternalResponseData {
self.response.receive().await
/// Wait for the next response, regardless of request ID
pub async fn wait_any_response(&self) -> InternalResponse {
self.request.wait_for_next_response().await
}

/// Send a response
pub async fn send_response(&self, id: deferred::RequestId, response: InternalResponse) {
self.request.send_response(response, id).await
}
}

Expand Down Expand Up @@ -213,7 +231,8 @@ impl<W: CfuWriter> CfuComponentDefault<W> {
}
/// wait for a request and process it
pub async fn process_request(&self) -> Result<(), CfuError> {
match self.device.wait_request().await {
let request = self.device.receive().await;
match request.command.data {
RequestData::FwVersionRequest => {
let fwv = self.get_fw_version().await.map_err(CfuError::ProtocolError)?;
let dev_inf = FwVerComponentInfo::new(fwv, self.get_component_id());
Expand Down Expand Up @@ -250,9 +269,7 @@ impl<W: CfuWriter> CfuComponentDefault<W> {
),
component_info: comp_info,
};
self.device
.send_response(InternalResponseData::FwVersionResponse(resp))
.await;
request.respond(Ok(InternalResponseData::FwVersionResponse(resp)));
}
RequestData::PrepareComponentForUpdate => {
self.storage_prepare()
Expand All @@ -263,9 +280,7 @@ impl<W: CfuWriter> CfuComponentDefault<W> {
// accept any and all offers regardless of what version it is
if buf.component_info.component_id == self.get_component_id() {
let resp = FwUpdateOfferResponse::new_accept(HostToken::Driver);
self.device
.send_response(InternalResponseData::OfferResponse(resp))
.await;
request.respond(Ok(InternalResponseData::OfferResponse(resp)));
}
}
RequestData::GiveContent(buf) => {
Expand All @@ -276,8 +291,14 @@ impl<W: CfuWriter> CfuComponentDefault<W> {
.cfu_write(Some(offset), &buf.data)
.await
.map_err(|e| CfuError::ProtocolError(CfuProtocolError::WriterError(e)))?;
request.respond(Ok(InternalResponseData::ContentResponse(FwUpdateContentResponse::new(
buf.header.sequence_num,
CfuUpdateContentResponseStatus::Success,
))));
}
RequestData::FinalizeUpdate => {
request.respond(Ok(InternalResponseData::ComponentPrepared));
}
RequestData::FinalizeUpdate => {}
}
Ok(())
}
Expand Down
Loading