-
Notifications
You must be signed in to change notification settings - Fork 10
rust testcontainer framework #41
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 4 commits
0521ff2
b46ad68
9451953
cffc3fa
475ae7d
44a4255
4c46cdc
bf7f48d
24129fa
88ef9a4
af3613a
a4856c7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,21 @@ | ||
[package] | ||
name = "restate-test-utils" | ||
version = "0.3.2" | ||
edition = "2021" | ||
description = "Test Utilities for Restate SDK for Rust" | ||
license = "MIT" | ||
repository = "https://github.com/restatedev/sdk-rust/test-utils" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just |
||
rust-version = "1.76.0" | ||
|
||
|
||
[dependencies] | ||
futures = "0.3.31" | ||
http = "1.2.0" | ||
nu-ansi-term = "0.50.1" | ||
reqwest = {version= "0.12.12", features = ["json"]} | ||
restate-sdk = { version = "0.3.2", path = ".." } | ||
restate-sdk-shared-core = "0.2.0" | ||
serde = "1.0.217" | ||
serde_json = "1.0.138" | ||
testcontainers = "0.23.1" | ||
tokio = "1.43.0" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could you also re-check which dependencies you need here? I think some of those are not needed |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
pub mod test_utils; |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,211 @@ | ||
use nu_ansi_term::Style; | ||
use reqwest::Response; | ||
use testcontainers::{core::{IntoContainerPort, WaitFor}, runners::AsyncRunner, ContainerAsync, ContainerRequest, GenericImage, ImageExt}; | ||
use serde::{Serialize, Deserialize}; | ||
use restate_sdk::{discovery::Service, errors::HandlerError, prelude::{Endpoint, HttpServer}}; | ||
use tokio::{io::{self, AsyncWriteExt}, task::{self, JoinHandle}}; | ||
use std::time::Duration; | ||
|
||
// addapted from from restate-admin-rest-model crate version 1.1.6 | ||
#[derive(Serialize, Deserialize, Debug)] | ||
pub struct RegisterDeploymentRequestHttp { | ||
uri: String, | ||
additional_headers:Option<Vec<(String, String)>>, | ||
use_http_11: bool, | ||
force: bool, | ||
dry_run: bool | ||
} | ||
|
||
#[derive(Serialize, Deserialize, Debug)] | ||
pub struct RegisterDeploymentRequestLambda { | ||
arn: String, | ||
assume_role_arn: Option<String>, | ||
force: bool, | ||
dry_run: bool, | ||
} | ||
|
||
#[derive(Serialize, Deserialize, Debug)] | ||
struct VersionResponse { | ||
version:String, | ||
min_admin_api_version:u32, | ||
max_admin_api_version:u32 | ||
} | ||
|
||
pub struct TestContainer { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It would be nice if the test infra bootstraps the service endpoint server too, and then on |
||
container:ContainerAsync<GenericImage>, | ||
stdout_logging:JoinHandle<()>, | ||
stderr_logging:JoinHandle<()>, | ||
endpoint:Option<JoinHandle<()>> | ||
} | ||
|
||
|
||
impl TestContainer { | ||
|
||
//"docker.io/restatedev/restate", "latest" | ||
pub async fn new(image:&str, version:&str) -> Result<Self, HandlerError> { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You should use |
||
|
||
let image = GenericImage::new(image, version) | ||
.with_exposed_port(9070.tcp()) | ||
.with_exposed_port(8080.tcp()) | ||
.with_wait_for(WaitFor::message_on_stdout("Ingress HTTP listening")); | ||
|
||
// have to expose entire host network because testcontainer-rs doesn't implement selective SSH port forward from host | ||
// see https://github.com/testcontainers/testcontainers-rs/issues/535 | ||
let container = ContainerRequest::from(image) | ||
.with_host("host.docker.internal" , testcontainers::core::Host::HostGateway) | ||
slinkydeveloper marked this conversation as resolved.
Show resolved
Hide resolved
|
||
.start() | ||
.await?; | ||
|
||
let mut container_stdout = container.stdout(true); | ||
// Spawn a task to copy data from the AsyncBufRead to stdout | ||
let stdout_logging = task::spawn(async move { | ||
let mut stdout = io::stdout(); | ||
if let Err(e) = io::copy(&mut container_stdout, &mut stdout).await { | ||
eprintln!("Error copying data: {}", e); | ||
} | ||
}); | ||
|
||
let mut container_stderr = container.stderr(true); | ||
// Spawn a task to copy data from the AsyncBufRead to stderr | ||
let stderr_logging = task::spawn(async move { | ||
let mut stderr = io::stderr(); | ||
if let Err(e) = io::copy(&mut container_stderr, &mut stderr).await { | ||
eprintln!("Error copying data: {}", e); | ||
} | ||
}); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. IMO this should be printed using the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I took a shot at doing this via tracing::debug and thought it was way too verbose. As an alternative I added a |
||
|
||
let host = container.get_host().await?; | ||
let ports = container.ports().await?; | ||
|
||
let admin_port = ports.map_to_host_port_ipv4(9070.tcp()).unwrap(); | ||
|
||
let admin_url = format!("http://{}:{}/version", host, admin_port); | ||
reqwest::get(admin_url) | ||
.await? | ||
.json::<VersionResponse>() | ||
.await?; | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe add a log line here (with |
||
Ok(TestContainer {container, stdout_logging, stderr_logging, endpoint:None}) | ||
} | ||
|
||
pub async fn serve_endpoint(&mut self, endpoint:Endpoint) { | ||
|
||
println!("\n\n{}\n\n", Style::new().bold().paint(format!("starting enpoint server..."))); | ||
// uses higher port number to avoid collisions | ||
// with non-test instances running locally | ||
let host_port:u16 = 19080; | ||
let host_address = format!("0.0.0.0:{}", host_port); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You need to start the TcpListener on port |
||
|
||
// boot restate server | ||
let endpoint = tokio::spawn(async move { | ||
HttpServer::new(endpoint) | ||
.listen_and_serve(host_address.parse().unwrap()).await; | ||
}); | ||
|
||
let registered = self.register(host_port).await; | ||
|
||
assert!(registered.is_ok()); | ||
|
||
self.endpoint = Some(endpoint); | ||
} | ||
|
||
async fn register(&self, server_port:u16) -> Result<(), HandlerError> { | ||
|
||
println!("\n\n{}\n\n", Style::new().bold().paint(format!("registering server..."))); | ||
|
||
let host = self.container.get_host().await?; | ||
let ports = self.container.ports().await?; | ||
|
||
let admin_port = ports.map_to_host_port_ipv4(9070.tcp()).unwrap(); | ||
let server_url = format!("http://localhost:{}", admin_port); | ||
|
||
let client = reqwest::Client::builder().http2_prior_knowledge().build()?; | ||
|
||
// wait for server to respond | ||
while let Err(_) = client.get(format!("{}/health", server_url)) | ||
.header("accept", "application/vnd.restate.endpointmanifest.v1+json") | ||
.send().await { | ||
tokio::time::sleep(Duration::from_secs(1)).await; | ||
} | ||
|
||
client.get(format!("{}/health", server_url)) | ||
.header("accept", "application/vnd.restate.endpointmanifest.v1+json") | ||
.send().await?; | ||
|
||
let deployment_uri:String = format!("http://host.docker.internal:{}/", server_port); | ||
let deployment_payload = RegisterDeploymentRequestHttp { | ||
uri:deployment_uri, | ||
additional_headers:None, | ||
use_http_11: false, | ||
force: false, | ||
dry_run: false }; //, additional_headers: (), use_http_11: (), force: (), dry_run: () } | ||
|
||
let register_admin_url = format!("http://{}:{}/deployments", host, admin_port); | ||
|
||
client.post(register_admin_url) | ||
.json(&deployment_payload) | ||
.send().await?; | ||
|
||
let ingress_port = ports.map_to_host_port_ipv4(8080.tcp()).unwrap(); | ||
let ingress_host = format!("http://{}:{}", host, ingress_port); | ||
|
||
println!("\n\n{}\n\n", Style::new().bold().paint(format!("ingress url: {}", ingress_host, ))); | ||
|
||
return Ok(()); | ||
} | ||
|
||
pub async fn delay(milliseconds:u64) { | ||
tokio::time::sleep(Duration::from_millis(milliseconds)).await; | ||
} | ||
|
||
pub async fn invoke(&self, service:Service, handler:&str) -> Result<Response, HandlerError> { | ||
|
||
let host = self.container.get_host().await?; | ||
let ports = self.container.ports().await?; | ||
|
||
let client = reqwest::Client::builder().http2_prior_knowledge().build().unwrap(); | ||
|
||
let service_name:String = service.name.to_string(); | ||
let handler_names:Vec<String> = service.handlers.iter().map(|h|h.name.to_string()).collect(); | ||
|
||
assert!(handler_names.contains(&handler.to_string())); | ||
|
||
println!("\n\n{}\n\n", Style::new().bold().paint(format!("invoking {}/{}", service_name, handler))); | ||
|
||
let admin_port = ports.map_to_host_port_ipv4(9070.tcp()).unwrap(); | ||
let admin_host = format!("http://{}:{}", host, admin_port); | ||
|
||
let service_discovery_url = format!("{}/services/{}/handlers", admin_host, service_name); | ||
|
||
client.get(service_discovery_url) | ||
.send().await?; | ||
|
||
// todo verify discovery response contains service/handler | ||
|
||
let ingress_port = ports.map_to_host_port_ipv4(8080.tcp()).unwrap(); | ||
let ingress_host = format!("http://{}:{}", host, ingress_port); | ||
|
||
let ingress_handler_url = format!("{}/{}/{}", ingress_host, service_name, handler); | ||
|
||
let ingress_resopnse = client.post(ingress_handler_url) | ||
.send().await?; | ||
|
||
return Ok(ingress_resopnse); | ||
} | ||
} | ||
|
||
impl Drop for TestContainer { | ||
fn drop(&mut self) { | ||
|
||
// todo cleanup on drop? | ||
// testcontainers-rs already implements stop/rm on drop] | ||
// https://docs.rs/testcontainers/latest/testcontainers/ | ||
// | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. you probably need to stop the hyper server here on drop |
||
|
||
} | ||
} | ||
|
||
// #[tokio::test] | ||
// async fn boot_test_container() { | ||
// let _test_comtainer = crate::test_utils::TestContainer::new("docker.io/restatedev/restate".to_string(), "latest".to_string()).await.unwrap(); | ||
// } |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,58 @@ | ||
use restate_test_utils::test_utils::TestContainer; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could you move this test in |
||
use restate_sdk::{discovery::{self, Service}, prelude::*}; | ||
|
||
// Should compile | ||
#[restate_sdk::service] | ||
trait MyService { | ||
async fn my_handler() -> HandlerResult<String>; | ||
} | ||
|
||
#[restate_sdk::object] | ||
trait MyObject { | ||
async fn my_handler(input: String) -> HandlerResult<String>; | ||
#[shared] | ||
async fn my_shared_handler(input: String) -> HandlerResult<String>; | ||
} | ||
|
||
#[restate_sdk::workflow] | ||
trait MyWorkflow { | ||
async fn my_handler(input: String) -> HandlerResult<String>; | ||
#[shared] | ||
async fn my_shared_handler(input: String) -> HandlerResult<String>; | ||
} | ||
|
||
|
||
struct MyServiceImpl; | ||
|
||
impl MyService for MyServiceImpl { | ||
async fn my_handler(&self, _: Context<'_>) -> HandlerResult<String> { | ||
let result = "hello!"; | ||
Ok(result.to_string()) | ||
} | ||
} | ||
|
||
#[tokio::test] | ||
async fn test_container_image() { | ||
|
||
let mut test_container = TestContainer::new("docker.io/restatedev/restate", "latest").await.unwrap(); | ||
|
||
let endpoint = Endpoint::builder() | ||
.bind(MyServiceImpl.serve()) | ||
.build(); | ||
|
||
test_container.serve_endpoint(endpoint).await; | ||
|
||
// optionally insert a delays via tokio sleep | ||
TestContainer::delay(1000).await; | ||
|
||
// optionally call invoke on service handlers | ||
use restate_sdk::service::Discoverable; | ||
let my_service:Service = ServeMyService::<MyServiceImpl>::discover(); | ||
let invoke_response = test_container.invoke(my_service, "my_handler").await; | ||
|
||
assert!(invoke_response.is_ok()); | ||
|
||
println!("invoke response:"); | ||
println!("{}", invoke_response.unwrap().text().await.unwrap()); | ||
|
||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we're getting there with the API, maybe let me describe better what I had in mind that aligns with the other SDKs:
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe rename the package to
restate-test-env
and the containing directory totest-env