Skip to content

Improve error handling in engine_api client using thiserror. Fixes #326 #338

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 18 commits into from
Apr 24, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
150 changes: 94 additions & 56 deletions crates/engine-api/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::{ffi::CString, os::unix::prelude::FileTypeExt};

use anyhow::{Context, Result, anyhow, bail, ensure};
use futures::{Stream, StreamExt};
use http_body_util::{BodyExt, Either, Empty, Full};
use hyper::body::{Buf, Bytes};
Expand All @@ -13,7 +12,7 @@ use tokio_tungstenite::{client_async, tungstenite::Message};

use crate::{
dto::{ConfigKV, ModuleConfigKVs},
error::WebsocketError,
error::{EngineClientError, WebsocketError},
};

#[derive(Debug, Clone)]
Expand All @@ -23,44 +22,43 @@ pub struct EngineApiClient {
}

impl EngineApiClient {
pub fn new() -> Result<Self> {
pub fn new() -> Result<Self, EngineClientError> {
Self::unix(super::DEFAULT_UDS.to_owned())
}

pub fn unix(socket: String) -> Result<Self> {
pub fn unix(socket: String) -> Result<Self, EngineClientError> {
// The is a bug using Metadata in combination with OpenOption when using pulsar cli via sudo so I had to use the libc to verify the permissions
// Probably https://stackoverflow.com/questions/71505367/no-such-a-file-or-directory-even-path-is-file-true it's a reference
// TODO: Should be investigated

// Check if input exists and if it is a unix socket
match std::fs::metadata(&socket) {
Err(err) => match err.kind() {
std::io::ErrorKind::NotFound => {
bail!("'{socket}' not found. Check if the daemon is running")
}
std::io::ErrorKind::PermissionDenied => {
bail!("No write permission on '{socket}'")
}
_ => {
bail!(
anyhow::Error::new(err)
.context(format!("Failed to get '{socket}' metadata"))
)
}
},
Err(err) => {
return match err.kind() {
std::io::ErrorKind::NotFound => Err(EngineClientError::SocketNotFound(socket)),
std::io::ErrorKind::PermissionDenied => {
Err(EngineClientError::NoReadPermission(socket))
}
_ => Err(EngineClientError::FailedToGetMetadata(socket)),
};
}
Ok(metadata) => {
ensure!(
metadata.file_type().is_socket(),
"'{socket}' is not a unix socket"
);
if !metadata.file_type().is_socket() {
return Err(EngineClientError::NotASocket(socket));
}
}
};

// Check for write permission on socket
let cstring = CString::new(socket.as_str())
.with_context(|| format!("Can't convert '{socket}' to a valid string "))?;
let cstring = match CString::new(socket.as_str()) {
Ok(cs) => cs,
Err(err) => return Err(EngineClientError::CStringConversion(err)),
};

let write_permission = unsafe { libc::access(cstring.as_ptr(), libc::W_OK) } == 0;
ensure!(write_permission, "No write permission on '{socket}'");
if !write_permission {
return Err(EngineClientError::NoWritePermission(socket));
}

Ok(Self {
socket,
Expand All @@ -72,52 +70,82 @@ impl EngineApiClient {
hyperlocal::Uri::new(self.socket.clone(), path.as_ref()).into()
}

async fn get<T: DeserializeOwned>(&self, uri: Uri) -> Result<T> {
async fn get<T: DeserializeOwned>(&self, uri: Uri) -> Result<T, EngineClientError> {
let req = Request::builder()
.method(Method::GET)
.uri(uri)
.body(Either::Right(Empty::<Bytes>::new()))
.map_err(|err| anyhow!("Error building the request. Reason: {}", err))?;
.map_err(EngineClientError::RequestBuilderError)?;

let res = self
.client
.request(req)
.await
.map_err(|err| anyhow!("Error during the http request: reason {}", err))?;
.map_err(EngineClientError::HyperError)?;

let buf = res.collect().await?.aggregate();
let status = res.status();

match status.is_success() {
true => {
let buf = res
.collect()
.await
.map_err(EngineClientError::CollectResponseError)?
.aggregate();

let output = serde_json::from_reader(buf.reader())
.map_err(|err| EngineClientError::DeserializeError(err.to_string()))?;

Ok(output)
}
false => {
let error = res
.collect()
.await
.map_err(EngineClientError::CollectResponseError)?
.to_bytes();

let output = serde_json::from_reader(buf.reader())?;
let error_str =
std::str::from_utf8(&error).map_err(EngineClientError::Utf8Error)?;

Ok(output)
Err(EngineClientError::UnexpectedResponse(format!(
"HTTP error {}: {}",
status.as_u16(),
error_str
)))
}
}
}

pub async fn list_modules(&self) -> Result<Vec<ModuleOverview>> {
pub async fn list_modules(&self) -> Result<Vec<ModuleOverview>, EngineClientError> {
let url = self.uri("/modules");
self.get(url).await
}

pub async fn get_configs(&self) -> Result<Vec<ModuleConfigKVs>> {
pub async fn get_configs(&self) -> Result<Vec<ModuleConfigKVs>, EngineClientError> {
let url = self.uri("/configs");
self.get(url).await
}

pub async fn start(&self, module_name: &str) -> Result<()> {
pub async fn start(&self, module_name: &str) -> Result<(), EngineClientError> {
let url = self.uri(format!("/modules/{module_name}/start"));
self.empty_post(url).await
}

pub async fn stop(&self, module_name: &str) -> Result<()> {
pub async fn stop(&self, module_name: &str) -> Result<(), EngineClientError> {
let url = self.uri(format!("/modules/{module_name}/stop"));
self.empty_post(url).await
}

pub async fn restart(&self, module_name: &str) -> Result<()> {
pub async fn restart(&self, module_name: &str) -> Result<(), EngineClientError> {
let url = self.uri(format!("/modules/{module_name}/restart"));
self.empty_post(url).await
}

pub async fn get_module_config(&self, module_name: &str) -> Result<Vec<ConfigKV>> {
pub async fn get_module_config(
&self,
module_name: &str,
) -> Result<Vec<ConfigKV>, EngineClientError> {
let url = self.uri(format!("/modules/{module_name}/config"));
self.get(url).await
}
Expand All @@ -127,27 +155,27 @@ impl EngineApiClient {
module_name: &str,
config_key: String,
config_value: String,
) -> Result<()> {
) -> Result<(), EngineClientError> {
let url = self.uri(format!("/modules/{module_name}/config"));

let body_string = serde_json::to_string(&ConfigKV {
key: config_key,
value: config_value,
})
.map_err(|err| anyhow!("Error during object serialization. Reason: {err}"))?;
.map_err(|err| EngineClientError::SerializeError(err.to_string()))?;

let req = Request::builder()
.method(Method::PATCH)
.uri(url)
.header("content-type", "application/json")
.body(Either::Left(Full::from(body_string)))
.map_err(|err| anyhow!("Error building the request. Reason: {}", err))?;
.map_err(EngineClientError::RequestBuilderError)?;

let res = self
.client
.request(req)
.await
.map_err(|err| anyhow!("Error during the http request. Reason: {}", err))?;
.map_err(EngineClientError::HyperError)?;

let status = res.status();

Expand All @@ -157,27 +185,29 @@ impl EngineApiClient {
let error = res
.collect()
.await
.map_err(|err| anyhow!("Error to bytes. Reason: {}", err))?
.map_err(EngineClientError::CollectResponseError)?
.to_bytes();
let error = std::str::from_utf8(&error)
.map_err(|err| anyhow!("Cannot parse error str. Reason: {}", err))?;
Err(anyhow!("Error during request. {error}"))

let error_str =
std::str::from_utf8(&error).map_err(EngineClientError::Utf8Error)?;

Err(EngineClientError::UnexpectedResponse(error_str.to_string()))
}
}
}

async fn empty_post(&self, uri: Uri) -> Result<()> {
async fn empty_post(&self, uri: Uri) -> Result<(), EngineClientError> {
let req = Request::builder()
.method(Method::POST)
.uri(uri)
.body(Either::Right(Empty::<Bytes>::new()))
.map_err(|err| anyhow!("Error building the request. Reason: {}", err))?;
.map_err(EngineClientError::RequestBuilderError)?;

let res = self
.client
.request(req)
.await
.map_err(|err| anyhow!("Error during the http request. Reason: {}", err))?;
.map_err(EngineClientError::HyperError)?;

let status = res.status();

Expand All @@ -187,25 +217,33 @@ impl EngineApiClient {
let error = res
.collect()
.await
.map_err(|err| anyhow!("Error to bytes. Reason: {}", err))?
.map_err(EngineClientError::CollectResponseError)?
.to_bytes();
let error = std::str::from_utf8(&error)
.map_err(|err| anyhow!("Cannot parse error str. Reason: {}", err))?;
Err(anyhow!("Error during request. {error}"))

let error_str =
std::str::from_utf8(&error).map_err(EngineClientError::Utf8Error)?;

Err(EngineClientError::UnexpectedResponse(error_str.to_string()))
}
}
}

pub async fn event_monitor(&self) -> Result<impl Stream<Item = Result<Event, WebsocketError>>> {
let stream = tokio::net::UnixStream::connect(&self.socket).await?;
pub async fn event_monitor(
&self,
) -> Result<impl Stream<Item = Result<Event, WebsocketError>>, EngineClientError> {
let stream = tokio::net::UnixStream::connect(&self.socket)
.await
.map_err(EngineClientError::SocketConnectionError)?;

// The `localhost` domain is simply a placeholder for the url. It's not used because is already present a stream
let (ws_stream, _) = client_async("ws://localhost/monitor", stream).await?;
let (ws_stream, _) = client_async("ws://localhost/monitor", stream)
.await
.map_err(EngineClientError::from)?;

let (_, read_stream) = ws_stream.split();

let events_stream = read_stream.map(|item| {
item.map_err(|e| e.into()).and_then(|msg| {
item.map_err(|err| err.into()).and_then(|msg| {
if let Message::Text(json) = msg {
let event: Event =
serde_json::from_str(&json).map_err(WebsocketError::JsonError)?;
Expand Down
Loading