Skip to content

Implement ingress client #42

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ hyper-util = { version = "0.1", features = ["tokio", "server", "server-graceful"
pin-project-lite = "0.2"
rand = { version = "0.8.5", optional = true }
regress = "0.10"
reqwest = { version = "0.12", features = ["json"] }
restate-sdk-macros = { version = "0.3.2", path = "macros" }
restate-sdk-shared-core = "0.1.0"
serde = "1.0"
Expand Down
168 changes: 168 additions & 0 deletions src/ingress/internal.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
use std::time::Duration;

use reqwest::{header::HeaderMap, Url};
use serde::{de::DeserializeOwned, Deserialize, Serialize};

use super::{
request::{IngressRequestOptions, SendResponse, SendStatus},
result::{IngressResultOptions, ResultOp, ResultTarget},
};
use crate::{context::RequestTarget, errors::TerminalError};

const IDEMPOTENCY_KEY_HEADER: &str = "Idempotency-Key";

#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct SendResponseSchema {
invocation_id: String,
status: SendStatusSchema,
}

#[derive(Deserialize)]
enum SendStatusSchema {
Accepted,
PreviouslyAccepted,
}

impl From<SendStatusSchema> for SendStatus {
fn from(value: SendStatusSchema) -> Self {
match value {
SendStatusSchema::Accepted => SendStatus::Accepted,
SendStatusSchema::PreviouslyAccepted => SendStatus::PreviouslyAccepted,
}
}
}

#[derive(Deserialize)]
struct TerminalErrorSchema {
code: Option<u16>,
message: String,
}

pub(super) struct IngressInternal {
pub(super) client: reqwest::Client,
pub(super) url: Url,
pub(super) headers: HeaderMap,
}

impl IngressInternal {
pub(super) async fn call<Req: Serialize, Res: DeserializeOwned>(
&self,
target: RequestTarget,
req: Req,
opts: IngressRequestOptions,
) -> Result<Result<Res, TerminalError>, reqwest::Error> {
let mut headers = self.headers.clone();
if let Some(key) = opts.idempotency_key {
headers.append(IDEMPOTENCY_KEY_HEADER, key);
}

let url = format!("{}/{target}", self.url.as_str().trim_end_matches("/"));

let mut builder = self.client.post(url).headers(headers).json(&req);

if let Some(timeout) = opts.timeout {
builder = builder.timeout(timeout);
}

let res = builder.send().await?;

if let Err(e) = res.error_for_status_ref() {
let status = res.status().as_u16();
if let Ok(e) = res.json::<TerminalErrorSchema>().await {

Check failure on line 72 in src/ingress/internal.rs

View workflow job for this annotation

GitHub Actions / Build and test (ubuntu-22.04)

this block may be rewritten with the `?` operator
Ok(Err(TerminalError::new_with_code(
e.code.unwrap_or(status),
e.message,
)))
} else {
Err(e)
}
} else {
Ok(Ok(res.json::<Res>().await?))
}
}

pub(super) async fn send<Req: Serialize>(
&self,
target: RequestTarget,
req: Req,
opts: IngressRequestOptions,
delay: Option<Duration>,
) -> Result<Result<SendResponse, TerminalError>, reqwest::Error> {
let mut headers = self.headers.clone();
let attachable = if let Some(key) = opts.idempotency_key {
headers.append(IDEMPOTENCY_KEY_HEADER, key);
true
} else {
false
};

let url = if let Some(delay) = delay {
format!(
"{}/{target}/send?delay={}ms",
self.url.as_str().trim_end_matches("/"),
delay.as_millis()
)
} else {
format!("{}/{target}/send", self.url.as_str().trim_end_matches("/"))
};

let mut builder = self.client.post(url).headers(headers).json(&req);

if let Some(timeout) = opts.timeout {
builder = builder.timeout(timeout);
}

let res = builder.send().await?;

if let Err(e) = res.error_for_status_ref() {
let status = res.status().as_u16();
if let Ok(e) = res.json::<TerminalErrorSchema>().await {

Check failure on line 120 in src/ingress/internal.rs

View workflow job for this annotation

GitHub Actions / Build and test (ubuntu-22.04)

this block may be rewritten with the `?` operator
Ok(Err(TerminalError::new_with_code(
e.code.unwrap_or(status),
e.message,
)))
} else {
Err(e)
}
} else {
let res = res.json::<SendResponseSchema>().await?;
Ok(Ok(SendResponse {
invocation_id: res.invocation_id,
status: res.status.into(),
attachable,
}))
}
}

pub(super) async fn result<Res: DeserializeOwned>(
&self,
target: ResultTarget,
op: ResultOp,
opts: IngressResultOptions,
) -> Result<Result<Res, TerminalError>, reqwest::Error> {
let url = format!("{}/{target}/{op}", self.url.as_str().trim_end_matches("/"));

let mut builder = self.client.get(url).headers(self.headers.clone());

if let Some(timeout) = opts.timeout {
builder = builder.timeout(timeout);
}

let res = builder.send().await?;

if let Err(e) = res.error_for_status_ref() {
let status = res.status().as_u16();
if let Ok(e) = res.json::<TerminalErrorSchema>().await {

Check failure on line 156 in src/ingress/internal.rs

View workflow job for this annotation

GitHub Actions / Build and test (ubuntu-22.04)

this block may be rewritten with the `?` operator
Ok(Err(TerminalError::new_with_code(
e.code.unwrap_or(status),
e.message,
)))
} else {
Err(e)
}
} else {
Ok(Ok(res.json::<Res>().await?))
}
}
}
130 changes: 130 additions & 0 deletions src/ingress/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
use reqwest::{header::HeaderMap, Url};

use self::{
internal::IngressInternal,
request::IngressRequest,
result::{IngressResult, ResultTarget},
};
use crate::context::RequestTarget;

pub mod internal;
pub mod request;
pub mod result;

/// A client for invoking handlers via the ingress.
pub struct IngressClient {
inner: IngressInternal,
}

impl IngressClient {
/// Create a new [`IngressClient`].
pub fn new(url: Url) -> Self {
Self {
inner: IngressInternal {
client: reqwest::Client::new(),
url,
headers: Default::default(),
},
}
}

/// Create a new [`IngressClient`] with custom headers.
pub fn new_with_headers(url: Url, headers: HeaderMap) -> Self {
Self {
inner: IngressInternal {
client: reqwest::Client::new(),
url,
headers,
},
}
}

/// Create a new [`IngressRequest`].
pub fn request<Req, Res>(&self, target: RequestTarget, req: Req) -> IngressRequest<Req, Res> {
IngressRequest::new(&self.inner, target, req)
}

/// Create a new [`IngressResult`].
pub fn result<Res>(&self, target: ResultTarget) -> IngressResult<Res> {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the naming result should be replaced by attach

Copy link
Author

@patrickariel patrickariel Feb 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem here is that the method returns a type that the user will have to either call .attach() or .output() on. So calling it attach will make the chain look a bit weird (e.g. .attach().attach()).

The actual usage will look something like this (see my first post for more examples):

let res = client
    .service_result::<MyServiceResult>()
    .my_handler("lorem_ipsum") // <- this is the idempotency key
    .attach() // or .output()
    .await?;

let res = client
    .workflow_result::<MyWorkflowResult>("Me")
    .attach() // or .output()
    .await?;

The term "result" was taken from this part of the docs:

Restate allows you to retrieve the result of workflows and invocations with an idempotency key.

Do you have any suggestions for a better term? Some ideas: response, answer.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ahhhh got it, i think then the name should be workflow_handle/invocation_handle, see here https://docs.restate.dev/javadocs/dev/restate/sdk/client/Client

IngressResult::new(&self.inner, target)
}

pub fn service_ingress<'a, I>(&'a self) -> I
where
I: IntoServiceIngress<'a>,
{
I::create_ingress(self)
}

pub fn object_ingress<'a, I>(&'a self, key: impl Into<String>) -> I
where
I: IntoObjectIngress<'a>,
{
I::create_ingress(self, key.into())
}

pub fn workflow_ingress<'a, I>(&'a self, id: impl Into<String>) -> I
where
I: IntoWorkflowIngress<'a>,
{
I::create_ingress(self, id.into())
}

pub fn invocation_result<'a, Res>(

Check failure on line 73 in src/ingress/mod.rs

View workflow job for this annotation

GitHub Actions / Build and test (ubuntu-22.04)

the following explicit lifetimes could be elided: 'a
&'a self,
invocation_id: impl Into<String>,
) -> IngressResult<'a, Res> {
self.result(ResultTarget::invocation(invocation_id))
}

pub fn service_result<'a, R>(&'a self) -> R
where
R: IntoServiceResult<'a>,
{
R::create_result(self)
}

pub fn object_result<'a, R>(&'a self, key: impl Into<String>) -> R
where
R: IntoObjectResult<'a>,
{
R::create_result(self, key.into())
}

pub fn workflow_result<'a, R>(&'a self, id: impl Into<String>) -> R
where
R: IntoWorkflowResult<'a>,
{
R::create_result(self, id.into())
}
}

/// Trait used by codegen to use the service ingress.
pub trait IntoServiceIngress<'a>: Sized {
fn create_ingress(client: &'a IngressClient) -> Self;
}

/// Trait used by codegen to use the object ingress.
pub trait IntoObjectIngress<'a>: Sized {
fn create_ingress(client: &'a IngressClient, key: String) -> Self;
}

/// Trait used by codegen to use the workflow ingress.
pub trait IntoWorkflowIngress<'a>: Sized {
fn create_ingress(client: &'a IngressClient, id: String) -> Self;
}

/// Trait used by codegen to retrieve the service result.
pub trait IntoServiceResult<'a>: Sized {
fn create_result(client: &'a IngressClient) -> Self;
}

/// Trait used by codegen to retrieve the object result.
pub trait IntoObjectResult<'a>: Sized {
fn create_result(client: &'a IngressClient, key: String) -> Self;
}

/// Trait used by codegen to retrieve the workflow result.
pub trait IntoWorkflowResult<'a>: Sized {
fn create_result(client: &'a IngressClient, id: String) -> Self;
}
Loading
Loading