diff --git a/Cargo.toml b/Cargo.toml index ae96dcb..7b6ce8d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,6 +11,7 @@ rust-version = "1.76.0" default = ["http_server", "rand", "uuid"] hyper = ["dep:hyper", "http-body-util", "restate-sdk-shared-core/http"] http_server = ["hyper", "hyper/server", "hyper/http2", "hyper-util", "tokio/net", "tokio/signal", "tokio/macros"] +ingress_client = ["dep:reqwest", "restate-sdk-macros/ingress_client"] [dependencies] bytes = "1.6.1" @@ -22,6 +23,7 @@ hyper-util = { version = "0.1", features = ["tokio", "server", "server-graceful" pin-project-lite = "0.2" rand = { version = "0.8.5", optional = true } regress = "0.10" +reqwest = { version = "0.12", optional = true, features = ["json"] } restate-sdk-macros = { version = "0.3.2", path = "macros" } restate-sdk-shared-core = "0.1.0" serde = "1.0" diff --git a/macros/Cargo.toml b/macros/Cargo.toml index 5f125c8..3aa09d7 100644 --- a/macros/Cargo.toml +++ b/macros/Cargo.toml @@ -9,6 +9,9 @@ repository = "https://github.com/restatedev/sdk-rust" [lib] proc-macro = true +[features] +ingress_client = [] + [dependencies] proc-macro2 = "1.0" quote = "1.0" diff --git a/macros/src/gen.rs b/macros/src/gen.rs index a882b4d..5002f32 100644 --- a/macros/src/gen.rs +++ b/macros/src/gen.rs @@ -1,7 +1,7 @@ use crate::ast::{Handler, Object, Service, ServiceInner, ServiceType, Workflow}; use proc_macro2::TokenStream as TokenStream2; use proc_macro2::{Ident, Literal}; -use quote::{format_ident, quote, ToTokens}; +use quote::{format_ident, quote, quote_spanned, ToTokens}; use syn::{Attribute, PatType, Visibility}; pub(crate) struct ServiceGenerator<'a> { @@ -9,6 +9,10 @@ pub(crate) struct ServiceGenerator<'a> { pub(crate) restate_name: &'a str, pub(crate) service_ident: &'a Ident, pub(crate) client_ident: Ident, + #[cfg(feature = "ingress_client")] + pub(crate) ingress_ident: Ident, + #[cfg(feature = "ingress_client")] + pub(crate) handle_ident: Ident, pub(crate) serve_ident: Ident, pub(crate) vis: &'a Visibility, pub(crate) attrs: &'a [Attribute], @@ -22,6 +26,10 @@ impl<'a> ServiceGenerator<'a> { restate_name: &s.restate_name, service_ident: &s.ident, client_ident: format_ident!("{}Client", s.ident), + #[cfg(feature = "ingress_client")] + ingress_ident: format_ident!("{}Ingress", s.ident), + #[cfg(feature = "ingress_client")] + handle_ident: format_ident!("{}Handle", s.ident), serve_ident: format_ident!("Serve{}", s.ident), vis: &s.vis, attrs: &s.attrs, @@ -336,6 +344,271 @@ impl<'a> ServiceGenerator<'a> { } } } + + #[cfg(feature = "ingress_client")] + fn struct_ingress(&self) -> TokenStream2 { + let &Self { + vis, + ref ingress_ident, + ref service_ty, + service_ident, + .. + } = self; + + let key_field = match service_ty { + ServiceType::Service => quote! {}, + ServiceType::Object | ServiceType::Workflow => quote! { + key: String, + }, + }; + + let into_client_impl = match service_ty { + ServiceType::Service => { + quote! { + impl<'client> ::restate_sdk::ingress_client::IntoServiceIngress<'client> for #ingress_ident<'client> { + fn create_ingress(client: &'client ::restate_sdk::ingress_client::IngressClient) -> Self { + Self { client } + } + } + } + } + ServiceType::Object => quote! { + impl<'client> ::restate_sdk::ingress_client::IntoObjectIngress<'client> for #ingress_ident<'client> { + fn create_ingress(client: &'client ::restate_sdk::ingress_client::IngressClient, key: String) -> Self { + Self { client, key } + } + } + }, + ServiceType::Workflow => quote! { + impl<'client> ::restate_sdk::ingress_client::IntoWorkflowIngress<'client> for #ingress_ident<'client> { + fn create_ingress(client: &'client ::restate_sdk::ingress_client::IngressClient, key: String) -> Self { + Self { client, key } + } + } + }, + }; + + let doc_msg = format!( + "Struct exposing the ingress client to invoke [`{service_ident}`] without a context." + ); + quote! { + #[doc = #doc_msg] + #vis struct #ingress_ident<'client> { + client: &'client ::restate_sdk::ingress_client::IngressClient, + #key_field + } + + #into_client_impl + } + } + + #[cfg(feature = "ingress_client")] + fn impl_ingress(&self) -> TokenStream2 { + let &Self { + vis, + ref ingress_ident, + handlers, + restate_name, + service_ty, + .. + } = self; + + let service_literal = Literal::string(restate_name); + + let handlers_fns = handlers.iter().map(|handler| { + let handler_ident = &handler.ident; + let handler_literal = Literal::string(&handler.restate_name); + + let argument = match &handler.arg { + None => quote! {}, + Some(PatType { + ty, .. + }) => quote! { req: #ty } + }; + let argument_ty = match &handler.arg { + None => quote! { () }, + Some(PatType { + ty, .. + }) => quote! { #ty } + }; + let res_ty = &handler.output_ok; + let input = match &handler.arg { + None => quote! { () }, + Some(_) => quote! { req } + }; + let request_target = match service_ty { + ServiceType::Service => quote! { + ::restate_sdk::context::RequestTarget::service(#service_literal, #handler_literal) + }, + ServiceType::Object => quote! { + ::restate_sdk::context::RequestTarget::object(#service_literal, &self.key, #handler_literal) + }, + ServiceType::Workflow => quote! { + ::restate_sdk::context::RequestTarget::workflow(#service_literal, &self.key, #handler_literal) + } + }; + + quote! { + #vis fn #handler_ident(&self, #argument) -> ::restate_sdk::ingress_client::request::IngressRequest<'client, #argument_ty, #res_ty> { + self.client.request(#request_target, #input) + } + } + }); + + quote! { + impl<'client> #ingress_ident<'client> { + #( #handlers_fns )* + } + } + } + + #[cfg(feature = "ingress_client")] + fn struct_handle(&self) -> TokenStream2 { + let &Self { + vis, + ref service_ty, + service_ident, + restate_name, + ref handle_ident, + handlers, + .. + } = self; + + let service_literal = Literal::string(restate_name); + + let key_field = match service_ty { + ServiceType::Service | ServiceType::Workflow => quote! {}, + ServiceType::Object => quote! { + key: String, + }, + }; + + let into_client_impl = match service_ty { + ServiceType::Service => quote! { + impl<'client> ::restate_sdk::ingress_client::IntoServiceHandle<'client> for #handle_ident<'client> { + fn create_handle(client: &'client ::restate_sdk::ingress_client::IngressClient) -> Self { + Self { client } + } + } + }, + ServiceType::Object => quote! { + impl<'client> ::restate_sdk::ingress_client::IntoObjectHandle<'client> for #handle_ident<'client> { + fn create_handle(client: &'client ::restate_sdk::ingress_client::IngressClient, key: String) -> Self { + Self { client, key } + } + } + }, + ServiceType::Workflow => quote! { + impl<'client> ::restate_sdk::ingress_client::IntoWorkflowHandle<'client> for #handle_ident<'client> { + fn create_handle(client: &'client ::restate_sdk::ingress_client::IngressClient, id: String) -> Self { + Self { result: client.handle(::restate_sdk::ingress_client::handle::HandleTarget::workflow(#service_literal, id)) } + } + } + }, + }; + + let doc_msg = + format!("Struct exposing the handle to retrieve the result of [`{service_ident}`]."); + match service_ty { + ServiceType::Service | ServiceType::Object => quote! { + #[doc = #doc_msg] + #vis struct #handle_ident<'client> { + client: &'client ::restate_sdk::ingress_client::IngressClient, + #key_field + } + + #into_client_impl + }, + ServiceType::Workflow => { + let Some(handler) = &handlers + .iter() + .find(|handler| handler.restate_name == "run") + else { + return quote_spanned! { + service_ident.span() => compile_error!("A workflow definition must contain a `run` handler"); + }; + }; + let res_ty = &handler.output_ok; + + quote! { + #[doc = #doc_msg] + #vis struct #handle_ident<'client> { + result: ::restate_sdk::ingress_client::handle::IngressHandle<'client, #res_ty>, + } + + #into_client_impl + } + } + } + } + + #[cfg(feature = "ingress_client")] + fn impl_handle(&self) -> TokenStream2 { + let &Self { + vis, + ref handle_ident, + handlers, + restate_name, + service_ty, + service_ident, + .. + } = self; + + let service_literal = Literal::string(restate_name); + + if let ServiceType::Service | ServiceType::Object = service_ty { + let handlers_fns = handlers.iter().map(|handler| { + let handler_ident = &handler.ident; + let handler_literal = Literal::string(&handler.restate_name); + let res_ty = &handler.output_ok; + + let handle_target = match service_ty { + ServiceType::Service => quote! { + ::restate_sdk::ingress_client::handle::HandleTarget::service(#service_literal, #handler_literal, idempotency_key) + }, + ServiceType::Object => quote! { + ::restate_sdk::ingress_client::handle::HandleTarget::object(#service_literal, &self.key, #handler_literal, idempotency_key) + }, + ServiceType::Workflow => quote! { + ::restate_sdk::ingress_client::handle::HandleTarget::workflow(#service_literal, &self.key, #handler_literal) + } + }; + quote! { + #vis fn #handler_ident(&self, idempotency_key: impl Into) -> ::restate_sdk::ingress_client::handle::IngressHandle<'client, #res_ty> { + self.client.handle(#handle_target) + } + } + }); + + quote! { + impl<'client> #handle_ident<'client> { + #( #handlers_fns )* + } + } + } else { + let Some(handler) = &handlers + .iter() + .find(|handler| handler.restate_name == "run") + else { + return quote_spanned! { + service_ident.span() => compile_error!("A workflow definition must contain a `run` handler"); + }; + }; + let res_ty = &handler.output_ok; + + quote! { + impl<'client> #handle_ident<'client> { + #vis async fn attach(self) -> Result<#res_ty, ::restate_sdk::ingress_client::internal::IngressClientError> { + self.result.attach().await + } + + #vis async fn output(self) -> Result<#res_ty, ::restate_sdk::ingress_client::internal::IngressClientError> { + self.result.output().await + } + } + } + } + } } impl<'a> ToTokens for ServiceGenerator<'a> { @@ -347,6 +620,14 @@ impl<'a> ToTokens for ServiceGenerator<'a> { self.impl_discoverable(), self.struct_client(), self.impl_client(), + #[cfg(feature = "ingress_client")] + self.struct_ingress(), + #[cfg(feature = "ingress_client")] + self.impl_ingress(), + #[cfg(feature = "ingress_client")] + self.struct_handle(), + #[cfg(feature = "ingress_client")] + self.impl_handle(), ]); } } diff --git a/src/ingress_client/awakeable.rs b/src/ingress_client/awakeable.rs new file mode 100644 index 0000000..fa812ae --- /dev/null +++ b/src/ingress_client/awakeable.rs @@ -0,0 +1,49 @@ +use std::time::Duration; + +use crate::serde::Serialize; + +use super::{internal::IngressClientError, IngressInternal}; + +pub struct IngressAwakeable<'a> { + inner: &'a IngressInternal, + key: String, + opts: IngressAwakeableOptions, +} + +#[derive(Default, Clone)] +pub(super) struct IngressAwakeableOptions { + pub(super) timeout: Option, +} + +impl<'a> IngressAwakeable<'a> { + pub(super) fn new(inner: &'a IngressInternal, key: impl Into) -> Self { + Self { + inner, + key: key.into(), + opts: Default::default(), + } + } + + /// Set the timeout for the request. + pub fn timeout(mut self, value: Duration) -> Self { + self.opts.timeout = Some(value); + self + } + + /// Resolve the awakeable with a payload + pub async fn resolve( + self, + payload: T, + ) -> Result<(), IngressClientError> { + self.inner + .resolve_awakeable(&self.key, payload, self.opts) + .await + } + + /// Reject the awakeable with a failure message + pub async fn reject(self, message: impl Into) -> Result<(), IngressClientError> { + self.inner + .reject_awakeable(&self.key, &message.into(), self.opts) + .await + } +} diff --git a/src/ingress_client/handle.rs b/src/ingress_client/handle.rs new file mode 100644 index 0000000..5175002 --- /dev/null +++ b/src/ingress_client/handle.rs @@ -0,0 +1,165 @@ +use std::{ + fmt::{self, Display, Formatter}, + marker::PhantomData, + time::Duration, +}; + +use super::internal::{IngressClientError, IngressInternal}; +use crate::serde::Deserialize; + +/// The target invocation or workflow to retrieve the handle of. +#[derive(Debug, Clone)] +pub enum HandleTarget { + Invocation { + id: String, + }, + Service { + name: String, + handler: String, + idempotency_key: String, + }, + Object { + name: String, + key: String, + handler: String, + idempotency_key: String, + }, + Workflow { + name: String, + id: String, + }, +} + +impl HandleTarget { + pub fn invocation(id: impl Into) -> Self { + Self::Invocation { id: id.into() } + } + + pub fn service( + name: impl Into, + handler: impl Into, + idempotency_key: impl Into, + ) -> Self { + Self::Service { + name: name.into(), + handler: handler.into(), + idempotency_key: idempotency_key.into(), + } + } + + pub fn object( + name: impl Into, + key: impl Into, + handler: impl Into, + idempotency_key: impl Into, + ) -> Self { + Self::Object { + name: name.into(), + key: key.into(), + handler: handler.into(), + idempotency_key: idempotency_key.into(), + } + } + + pub fn workflow(name: impl Into, id: impl Into) -> Self { + Self::Workflow { + name: name.into(), + id: id.into(), + } + } +} + +impl Display for HandleTarget { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + match self { + HandleTarget::Invocation { id } => { + write!(f, "restate/invocation/{id}") + } + HandleTarget::Service { + name, + handler, + idempotency_key, + } => { + write!(f, "restate/invocation/{name}/{handler}/{idempotency_key}") + } + HandleTarget::Object { + name, + key, + handler, + idempotency_key, + } => write!( + f, + "restate/invocation/{name}/{key}/{handler}/{idempotency_key}" + ), + HandleTarget::Workflow { name, id } => { + write!(f, "restate/workflow/{name}/{id}") + } + } + } +} + +/// The mode of operation to use on the handle of the invocation or workflow. +#[derive(Debug, Clone, Copy)] +pub enum HandleOp { + Attach, + Output, +} + +impl Display for HandleOp { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + match self { + HandleOp::Attach => write!(f, "attach"), + HandleOp::Output => write!(f, "output"), + } + } +} + +/// This struct encapsulates the parameters for operating on the handle of an invocation or workflow. +pub struct IngressHandle<'a, Res = ()> { + inner: &'a IngressInternal, + target: HandleTarget, + res: PhantomData, + opts: IngressHandleOptions, +} + +#[derive(Default)] +pub(super) struct IngressHandleOptions { + pub(super) timeout: Option, +} + +impl<'a, Res> IngressHandle<'a, Res> { + pub(super) fn new(inner: &'a IngressInternal, target: HandleTarget) -> Self { + Self { + inner, + target, + res: PhantomData, + opts: Default::default(), + } + } + + /// Set the timeout for the request. + pub fn timeout(mut self, timeout: Duration) -> Self { + self.opts.timeout = Some(timeout); + self + } + + /// Attach to the invocation or workflow and wait for it to finish. + pub async fn attach(self) -> Result + where + Res: Deserialize + 'static, + { + self.inner + .handle(self.target, HandleOp::Attach, self.opts) + .await + } + + /// Peek at the output of the invocation or workflow. + pub async fn output(self) -> Result + where + Res: Deserialize + 'static, + { + self.inner + .handle(self.target, HandleOp::Output, self.opts) + .await + } +} diff --git a/src/ingress_client/internal.rs b/src/ingress_client/internal.rs new file mode 100644 index 0000000..aa42948 --- /dev/null +++ b/src/ingress_client/internal.rs @@ -0,0 +1,272 @@ +use std::time::Duration; + +use http::HeaderValue; +use reqwest::Url; +use thiserror::Error; + +use super::{ + awakeable::IngressAwakeableOptions, + handle::{HandleOp, HandleTarget, IngressHandleOptions}, + request::{IngressRequestOptions, SendResponse, SendStatus}, +}; +use crate::{ + context::RequestTarget, + errors::TerminalError, + serde::{Deserialize, Serialize}, +}; + +const IDEMPOTENCY_KEY_HEADER: &str = "Idempotency-Key"; +const APPLICATION_JSON: HeaderValue = HeaderValue::from_static("application/json"); +const TEXT_PLAIN: HeaderValue = HeaderValue::from_static("text/plain"); + +#[derive(serde::Deserialize)] +#[serde(rename_all = "camelCase")] +struct SendResponseSchema { + invocation_id: String, + status: SendStatusSchema, +} + +#[derive(serde::Deserialize)] +enum SendStatusSchema { + Accepted, + PreviouslyAccepted, +} + +impl From for SendStatus { + fn from(value: SendStatusSchema) -> Self { + match value { + SendStatusSchema::Accepted => SendStatus::Accepted, + SendStatusSchema::PreviouslyAccepted => SendStatus::PreviouslyAccepted, + } + } +} + +#[derive(serde::Deserialize)] +struct TerminalErrorSchema { + code: Option, + message: String, +} + +pub(super) struct IngressInternal { + pub(super) client: reqwest::Client, + pub(super) url: Url, +} + +#[derive(Debug, Error)] +pub enum IngressClientError { + #[error(transparent)] + Http(#[from] reqwest::Error), + #[error("{0}")] + Terminal(TerminalError), + #[error(transparent)] + Serde(Box), +} + +impl From for IngressClientError { + fn from(value: TerminalError) -> Self { + Self::Terminal(value) + } +} + +impl IngressInternal { + pub(super) async fn call( + &self, + target: RequestTarget, + req: Req, + opts: IngressRequestOptions, + ) -> Result { + let url = format!("{}/{target}", self.url.as_str().trim_end_matches("/")); + + let mut builder = self + .client + .post(url) + .header(http::header::CONTENT_TYPE, APPLICATION_JSON) + .body( + req.serialize() + .map_err(|e| IngressClientError::Serde(Box::new(e)))?, + ); + + if let Some(key) = opts.idempotency_key { + builder = builder.header(IDEMPOTENCY_KEY_HEADER, key); + } + + if let Some(timeout) = opts.timeout { + builder = builder.timeout(timeout); + } + + let res = builder.send().await?; + + if let Err(e) = res.error_for_status_ref() { + let status = res.status().as_u16(); + if let Ok(e) = res.json::().await { + Err(TerminalError::new_with_code(e.code.unwrap_or(status), e.message).into()) + } else { + Err(e.into()) + } + } else { + Ok(Res::deserialize(&mut res.bytes().await?) + .map_err(|e| IngressClientError::Serde(Box::new(e)))?) + } + } + + pub(super) async fn send( + &self, + target: RequestTarget, + req: Req, + opts: IngressRequestOptions, + delay: Option, + ) -> Result { + let url = if let Some(delay) = delay { + format!( + "{}/{target}/send?delay={}ms", + self.url.as_str().trim_end_matches("/"), + delay.as_millis() + ) + } else { + format!("{}/{target}/send", self.url.as_str().trim_end_matches("/")) + }; + + let mut builder = self + .client + .post(url) + .header(http::header::CONTENT_TYPE, APPLICATION_JSON) + .body( + req.serialize() + .map_err(|e| IngressClientError::Serde(Box::new(e)))?, + ); + + let attachable = if let Some(key) = opts.idempotency_key { + builder = builder.header(IDEMPOTENCY_KEY_HEADER, key); + true + } else { + false + }; + + if let Some(timeout) = opts.timeout { + builder = builder.timeout(timeout); + } + + let res = builder.send().await?; + + if let Err(e) = res.error_for_status_ref() { + let status = res.status().as_u16(); + if let Ok(e) = res.json::().await { + Err(TerminalError::new_with_code(e.code.unwrap_or(status), e.message).into()) + } else { + Err(e.into()) + } + } else { + let res = res.json::().await?; + Ok(SendResponse { + invocation_id: res.invocation_id, + status: res.status.into(), + attachable, + }) + } + } + + pub(super) async fn handle( + &self, + target: HandleTarget, + op: HandleOp, + opts: IngressHandleOptions, + ) -> Result { + let url = format!("{}/{target}/{op}", self.url.as_str().trim_end_matches("/")); + + let mut builder = self.client.get(url); + + if let Some(timeout) = opts.timeout { + builder = builder.timeout(timeout); + } + + let res = builder.send().await?; + + if let Err(e) = res.error_for_status_ref() { + let status = res.status().as_u16(); + if let Ok(e) = res.json::().await { + Err(TerminalError::new_with_code(e.code.unwrap_or(status), e.message).into()) + } else { + Err(e.into()) + } + } else { + Ok(Res::deserialize(&mut res.bytes().await?) + .map_err(|e| IngressClientError::Serde(Box::new(e)))?) + } + } + + pub(super) async fn resolve_awakeable( + &self, + key: &str, + payload: T, + opts: IngressAwakeableOptions, + ) -> Result<(), IngressClientError> { + let url = format!( + "{}/restate/awakeables/{}/resolve", + self.url.as_str().trim_end_matches("/"), + key + ); + + let mut builder = self + .client + .post(url) + .header(http::header::CONTENT_TYPE, APPLICATION_JSON) + .body( + payload + .serialize() + .map_err(|e| IngressClientError::Serde(Box::new(e)))?, + ); + + if let Some(timeout) = opts.timeout { + builder = builder.timeout(timeout); + } + + let res = builder.send().await?; + + if let Err(e) = res.error_for_status_ref() { + let status = res.status().as_u16(); + if let Ok(e) = res.json::().await { + Err(TerminalError::new_with_code(e.code.unwrap_or(status), e.message).into()) + } else { + Err(e.into()) + } + } else { + Ok(()) + } + } + + pub(super) async fn reject_awakeable( + &self, + key: &str, + message: &str, + opts: IngressAwakeableOptions, + ) -> Result<(), IngressClientError> { + let url = format!( + "{}/restate/awakeables/{}/reject", + self.url.as_str().trim_end_matches("/"), + key + ); + + let mut builder = self + .client + .post(url) + .header(http::header::CONTENT_TYPE, TEXT_PLAIN) + .body(message.to_string()); + + if let Some(timeout) = opts.timeout { + builder = builder.timeout(timeout); + } + + let res = builder.send().await?; + + if let Err(e) = res.error_for_status_ref() { + let status = res.status().as_u16(); + if let Ok(e) = res.json::().await { + Err(TerminalError::new_with_code(e.code.unwrap_or(status), e.message).into()) + } else { + Err(e.into()) + } + } else { + Ok(()) + } + } +} diff --git a/src/ingress_client/mod.rs b/src/ingress_client/mod.rs new file mode 100644 index 0000000..c749f22 --- /dev/null +++ b/src/ingress_client/mod.rs @@ -0,0 +1,132 @@ +use awakeable::IngressAwakeable; +use reqwest::Url; + +use self::{ + handle::{HandleTarget, IngressHandle}, + internal::IngressInternal, + request::IngressRequest, +}; +use crate::context::RequestTarget; + +pub mod awakeable; +pub mod handle; +pub mod internal; +pub mod request; + +/// A client for invoking handlers via the ingress. +pub struct IngressClient { + inner: IngressInternal, +} + +impl IngressClient { + /// Create a new [`IngressClient`]. + pub fn new(url: Url) -> Self { + Self { + inner: IngressInternal { + client: reqwest::Client::new(), + url, + }, + } + } + + /// Create a new [`IngressClient`] with a custom client. + pub fn new_with_client(url: Url, client: reqwest::Client) -> Self { + Self { + inner: IngressInternal { client, url }, + } + } + + /// Create a new [`IngressRequest`]. + pub fn request(&self, target: RequestTarget, req: Req) -> IngressRequest { + IngressRequest::new(&self.inner, target, req) + } + + /// Create a new [`IngressHandle`]. + pub fn handle(&self, target: HandleTarget) -> IngressHandle { + IngressHandle::new(&self.inner, target) + } + + pub fn service_ingress<'a, I>(&'a self) -> I + where + I: IntoServiceIngress<'a>, + { + I::create_ingress(self) + } + + pub fn object_ingress<'a, I>(&'a self, key: impl Into) -> I + where + I: IntoObjectIngress<'a>, + { + I::create_ingress(self, key.into()) + } + + pub fn workflow_ingress<'a, I>(&'a self, id: impl Into) -> I + where + I: IntoWorkflowIngress<'a>, + { + I::create_ingress(self, id.into()) + } + + pub fn invocation_handle( + &self, + invocation_id: impl Into, + ) -> IngressHandle<'_, Res> { + self.handle(HandleTarget::invocation(invocation_id)) + } + + pub fn service_handle<'a, H>(&'a self) -> H + where + H: IntoServiceHandle<'a>, + { + H::create_handle(self) + } + + pub fn object_handle<'a, H>(&'a self, key: impl Into) -> H + where + H: IntoObjectHandle<'a>, + { + H::create_handle(self, key.into()) + } + + pub fn workflow_handle<'a, H>(&'a self, id: impl Into) -> H + where + H: IntoWorkflowHandle<'a>, + { + H::create_handle(self, id.into()) + } + + /// Create a new [`IngressAwakeable`]. + pub fn awakeable(&self, key: impl Into) -> IngressAwakeable<'_> { + IngressAwakeable::new(&self.inner, key) + } +} + +/// Trait used by codegen to use the service ingress. +pub trait IntoServiceIngress<'a>: Sized { + fn create_ingress(client: &'a IngressClient) -> Self; +} + +/// Trait used by codegen to use the object ingress. +pub trait IntoObjectIngress<'a>: Sized { + fn create_ingress(client: &'a IngressClient, key: String) -> Self; +} + +/// Trait used by codegen to use the workflow ingress. +pub trait IntoWorkflowIngress<'a>: Sized { + fn create_ingress(client: &'a IngressClient, id: String) -> Self; +} + +/// Trait used by codegen to use the service handle. +pub trait IntoServiceHandle<'a>: Sized { + fn create_handle(client: &'a IngressClient) -> Self; +} + +/// Trait used by codegen to use the object handle. +pub trait IntoObjectHandle<'a>: Sized { + fn create_handle(client: &'a IngressClient, key: String) -> Self; +} + +/// Trait used by codegen to use the workflow handle. +pub trait IntoWorkflowHandle<'a>: Sized { + fn create_handle(client: &'a IngressClient, id: String) -> Self; +} diff --git a/src/ingress_client/request.rs b/src/ingress_client/request.rs new file mode 100644 index 0000000..9f93ae2 --- /dev/null +++ b/src/ingress_client/request.rs @@ -0,0 +1,93 @@ +use std::{marker::PhantomData, time::Duration}; + +use super::internal::{IngressClientError, IngressInternal}; +use crate::{ + context::RequestTarget, + serde::{Deserialize, Serialize}, +}; + +/// A send response. +#[derive(Debug, Clone)] +pub struct SendResponse { + pub invocation_id: String, + pub status: SendStatus, + pub attachable: bool, +} + +/// The status of the send. +#[derive(Debug, Clone, Copy)] +pub enum SendStatus { + Accepted, + PreviouslyAccepted, +} + +/// This struct encapsulates the parameters for a request to an ingress. +pub struct IngressRequest<'a, Req, Res = ()> { + inner: &'a IngressInternal, + target: RequestTarget, + req: Req, + res: PhantomData, + opts: IngressRequestOptions, +} + +#[derive(Default, Clone)] +pub(super) struct IngressRequestOptions { + pub(super) idempotency_key: Option, + pub(super) timeout: Option, +} + +impl<'a, Req, Res> IngressRequest<'a, Req, Res> { + pub(super) fn new(inner: &'a IngressInternal, target: RequestTarget, req: Req) -> Self { + Self { + inner, + target, + req, + res: PhantomData, + opts: Default::default(), + } + } + + /// Set the idempotency key for the request. + pub fn idempotency_key(mut self, value: impl Into) -> Self { + self.opts.idempotency_key = Some(value.into()); + self + } + + /// Set the timeout for the request. + pub fn timeout(mut self, value: Duration) -> Self { + self.opts.timeout = Some(value); + self + } + + /// Call a service via the ingress. This returns a future encapsulating the response. + pub async fn call(self) -> Result + where + Req: Serialize + 'static, + Res: Deserialize + 'static, + { + self.inner.call(self.target, self.req, self.opts).await + } + + /// Send the request to the ingress, without waiting for the response. + pub async fn send(self) -> Result + where + Req: Serialize + 'static, + { + self.inner + .send(self.target, self.req, self.opts, None) + .await + } + + /// Schedule the request to the ingress, without waiting for the response. + pub async fn send_with_delay( + self, + duration: Duration, + ) -> Result + where + Req: Serialize + 'static, + { + self.inner + .send(self.target, self.req, self.opts, Some(duration)) + .await + } +} diff --git a/src/lib.rs b/src/lib.rs index 930e9d6..454bf63 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -222,6 +222,8 @@ pub mod errors; pub mod http_server; #[cfg(feature = "hyper")] pub mod hyper; +#[cfg(feature = "ingress_client")] +pub mod ingress_client; pub mod serde; /// Entry-point macro to define a Restate [Service](https://docs.restate.dev/concepts/services#services-1).