Skip to content

Commit

Permalink
feat: don't log broken pipe error on server by default (#548)
Browse files Browse the repository at this point in the history
* feat: don't log broken pipe error on server by default

* fix ci fail on volo-http ws

* fix self test script on macOS
  • Loading branch information
PureWhiteWu authored Feb 17, 2025
1 parent 5a6018b commit 0b5d6be
Show file tree
Hide file tree
Showing 15 changed files with 811 additions and 717 deletions.
1,413 changes: 731 additions & 682 deletions Cargo.lock

Large diffs are not rendered by default.

20 changes: 10 additions & 10 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,17 +48,17 @@ chrono = { version = "0.4", default-features = false, features = [
"clock",
] }
clap = "4"
colored = "2"
colored = "3"
cookie = "0.18"
cookie_store = "0.21"
dashmap = "6"
dirs = "5"
dirs = "6"
faststr = { version = "0.2.21", features = ["serde"] }
futures = "0.3"
futures-util = "0.3"
flate2 = "1"
git2 = { version = "0.19", default-features = false }
governor = "0.7"
git2 = { version = "0.20", default-features = false }
governor = "0.8"
h2 = "0.4"
heck = "0.5"
hex = "0.4"
Expand All @@ -70,7 +70,7 @@ hyper = "1"
hyper-timeout = "0.5"
hyper-util = "0.1"
ipnet = "2"
itertools = "0.13"
itertools = "0.14"
itoa = "1"
libc = "0.2"
linkedbytes = "0.1"
Expand All @@ -85,7 +85,7 @@ mockall_double = "0.3"
multer = "3"
mur3 = "0.1"
nix = "0.29"
nom = "7"
nom = "8"
normpath = "1"
num_enum = "0.7"
once_cell = "1"
Expand All @@ -97,7 +97,7 @@ pin-project = "1"
pretty_env_logger = "0.5"
proc-macro2 = "1"
quote = "1"
rand = "0.8"
rand = "0.9"
regex = "1"
reqwest = "0.12"
run_script = "0.11"
Expand All @@ -112,7 +112,7 @@ simdutf8 = "0.1"
socket2 = "0.5"
sonic-rs = "0.3"
syn = "2"
sysinfo = "0.32"
sysinfo = "0.33"
tempfile = "3"
thiserror = "2"
tokio = "1"
Expand All @@ -139,8 +139,8 @@ tokio-rustls = "0.26"
native-tls = "0.2"
tokio-native-tls = "0.3"

tungstenite = "0.24"
tokio-tungstenite = "0.24"
tungstenite = "0.26"
tokio-tungstenite = "0.26"

[profile.release]
opt-level = 3
Expand Down
2 changes: 1 addition & 1 deletion benchmark/src/perf/cpu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ pub async fn record_usage(cpu_usage_list: &mut Vec<f32>, cancel: CancellationTok
loop {
tokio::select! {
_ = tokio::time::sleep(DEFAULT_INTERVAL) => {
system.refresh_processes_specifics(sysinfo::ProcessesToUpdate::Some(&[pid]), true, ProcessRefreshKind::new().with_cpu());
system.refresh_processes_specifics(sysinfo::ProcessesToUpdate::Some(&[pid]), true, ProcessRefreshKind::nothing().with_cpu());
let cpu_usage = system
.process(pid)
.unwrap()
Expand Down
2 changes: 1 addition & 1 deletion benchmark/src/perf/mem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ pub async fn record_usage(mem_usage_list: &mut Vec<u64>, cancel: CancellationTok
loop {
tokio::select! {
_ = tokio::time::sleep(DEFAULT_INTERVAL) => {
system.refresh_processes_specifics(sysinfo::ProcessesToUpdate::Some(&[pid]), true, ProcessRefreshKind::new().with_memory());
system.refresh_processes_specifics(sysinfo::ProcessesToUpdate::Some(&[pid]), true, ProcessRefreshKind::nothing().with_memory());
let mem_usage = system
.process(pid)
.unwrap()
Expand Down
8 changes: 4 additions & 4 deletions scripts/volo-cli-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ escape_tmp_dir() {

init() {
export VOLO_DIR="$PWD"
echo_command cargo build -p volo-cli -j `nproc`
echo_command cargo build -p volo-cli
export VOLO_CLI="$PWD/target/debug/volo"
trap 'echo "Failed to run $LINENO: $BASH_COMMAND (exit code: $?)" && exit 1' ERR
}
Expand Down Expand Up @@ -68,7 +68,7 @@ thrift_test() {

echo_command "${VOLO_CLI}" init thrift-test "${idl_path}"
patch_cargo_toml
echo_command cargo build -j `nproc`
echo_command cargo build

escape_tmp_dir
}
Expand All @@ -81,7 +81,7 @@ grpc_test() {

echo_command "${VOLO_CLI}" init --includes "${idl_dir}" grpc-test "${idl_path}"
patch_cargo_toml
echo_command cargo build -j `nproc`
echo_command cargo build

escape_tmp_dir
}
Expand All @@ -91,7 +91,7 @@ http_test() {

echo_command "${VOLO_CLI}" http init http-test
patch_cargo_toml
echo_command cargo build -j `nproc`
echo_command cargo build

escape_tmp_dir
}
Expand Down
1 change: 1 addition & 0 deletions volo-build/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ mod outer {

use anyhow::bail;

#[allow(dead_code)]
pub fn get_repo_latest_commit_id(repo: &str, r#ref: &str) -> anyhow::Result<String> {
let commit_list = match Command::new("git")
.arg("ls-remote")
Expand Down
6 changes: 3 additions & 3 deletions volo-http/src/server/utils/ws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -765,14 +765,14 @@ mod websocket_tests {
let (mut ws_stream, _) =
run_ws_handler(test_helpers::to_service(handler), None, 25231).await;

let input = Message::Text("foobar".to_owned());
let input = Message::Text("foobar".into());
ws_stream.send(input.clone()).await.unwrap();
let output = ws_stream.next().await.unwrap().unwrap();
assert_eq!(input, output);

let input = Message::Ping("foobar".to_owned().into_bytes());
let input = Message::Ping("foobar".into());
ws_stream.send(input).await.unwrap();
let output = ws_stream.next().await.unwrap().unwrap();
assert_eq!(output, Message::Pong("foobar".to_owned().into_bytes()));
assert_eq!(output, Message::Pong("foobar".into()));
}
}
2 changes: 1 addition & 1 deletion volo-thrift/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "volo-thrift"
version = "0.10.6"
version = "0.10.7"
edition.workspace = true
homepage.workspace = true
repository.workspace = true
Expand Down
18 changes: 17 additions & 1 deletion volo-thrift/src/transport/multiplex/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,20 @@ pub async fn serve<Svc, Req, Resp, E, D>(
)
.await
{
stat_tracer.iter().for_each(|f| f(&cx));
if let ThriftException::Transport(te) = &e {
if volo::util::server_remote_error::is_remote_closed_error(te.io_error())
&& !volo::util::server_remote_error::remote_closed_error_log_enabled()
{
return;
}
}
// log it
error!(
"[VOLO] server send response error: {:?}, cx: \
{:?}, peer_addr: {:?}",
e, cx, peer_addr
);
stat_tracer.iter().for_each(|f| f(&cx));
return;
}
stat_tracer.iter().for_each(|f| f(&cx));
Expand All @@ -94,12 +101,21 @@ pub async fn serve<Svc, Req, Resp, E, D>(
.encode::<DummyMessage, ServerContext>(&mut cx, msg)
.await
{
stat_tracer.iter().for_each(|f| f(&cx));
if let ThriftException::Transport(te) = &e {
if volo::util::server_remote_error::is_remote_closed_error(te.io_error())
&& !volo::util::server_remote_error::remote_closed_error_log_enabled()
{
return;
}
}
// log it
error!(
"[VOLO] server send error error: {:?}, cx: {:?}, \
peer_addr: {:?}",
e, cx, peer_addr
);
return;
}
stat_tracer.iter().for_each(|f| f(&cx));
return;
Expand Down
8 changes: 8 additions & 0 deletions volo-thrift/src/transport/pingpong/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,14 @@ pub async fn serve<Svc, Req, Resp, E, D, SP>(
.instrument(span_provider.on_encode(tracing_cx))
.await
{
if let ThriftException::Transport(te) = &e {
if volo::util::server_remote_error::is_remote_closed_error(te.io_error())
&& !volo::util::server_remote_error::remote_closed_error_log_enabled()
{
stat_tracer.iter().for_each(|f| f(&cx));
return Err(());
}
}
error!(
"[VOLO] server send response error: {:?}, cx: {:?}, \
peer_addr: {:?}",
Expand Down
2 changes: 1 addition & 1 deletion volo/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "volo"
version = "0.10.4"
version = "0.10.5"
edition.workspace = true
homepage.workspace = true
repository.workspace = true
Expand Down
14 changes: 7 additions & 7 deletions volo/src/loadbalance/consistent_hash.rs
Original file line number Diff line number Diff line change
Expand Up @@ -465,12 +465,12 @@ mod tests {

async fn consistent_hash_balance_tests() {
// TODO: Using standard deviation to evaluate load balancing is better?
let mut rng = rand::thread_rng();
let mut rng = rand::rng();
let mut instances = vec![];
for _ in 0..50 {
let w = rng.gen_range(10..=100);
let sub_net = rng.gen_range(0..=255);
let port = rng.gen_range(1000..=65535);
let w = rng.random_range(10..=100);
let sub_net = rng.random_range(0..=255);
let port = rng.random_range(1000..=65535);
instances.push(new_instance(format!("172.17.0.{}:{}", sub_net, port), w));
instances.push(new_instance(format!("192.168.32.{}:{}", sub_net, port), w));
}
Expand Down Expand Up @@ -508,9 +508,9 @@ mod tests {
virtual_factor: 100,
weighted: true,
};
let mut rng = rand::thread_rng();
let mut rng = rand::rng();
for i in 0..30 {
let w = rng.gen_range(10..=100);
let w = rng.random_range(10..=100);
instances.push(new_instance(format!("127.0.0.1:{}", i), w));
}
let discovery = StaticDiscover::new(instances.clone());
Expand All @@ -519,7 +519,7 @@ mod tests {
let virtual_nodes = lb.build_weighted_instances(instances.clone()).virtual_nodes;
let virtual_nodes: BTreeSet<_> = virtual_nodes.into_iter().collect();

let remove_index = rng.gen_range(0..instances.len());
let remove_index = rng.random_range(0..instances.len());
let _remove_instance = instances.remove(remove_index);
let new_virtual_nodes = lb.build_weighted_instances(instances.clone()).virtual_nodes;
for node in new_virtual_nodes {
Expand Down
12 changes: 6 additions & 6 deletions volo/src/loadbalance/random.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@ use crate::{
};

#[inline]
fn pick_one(weight: isize, iter: &[Arc<Instance>]) -> Option<(usize, Arc<Instance>)> {
fn pick_one(weight: usize, iter: &[Arc<Instance>]) -> Option<(usize, Arc<Instance>)> {
if weight == 0 {
return None;
}
let mut weight = rand::thread_rng().gen_range(0..weight);
let mut weight = rand::rng().random_range(0..weight) as isize;
for (offset, instance) in iter.iter().enumerate() {
weight -= instance.weight as isize;
if weight <= 0 {
Expand All @@ -29,7 +29,7 @@ fn pick_one(weight: isize, iter: &[Arc<Instance>]) -> Option<(usize, Arc<Instanc
#[derive(Debug)]
pub struct InstancePicker {
shared_instances: Arc<WeightedInstances>,
sum_of_weights: isize,
sum_of_weights: usize,
owned_instances: OnceCell<Vec<Arc<Instance>>>,
last_pick: Option<(usize, Arc<Instance>)>,
}
Expand All @@ -54,7 +54,7 @@ impl Iterator for InstancePicker {
.get_or_init(|| shared_instances.to_vec());
let owned = self.owned_instances.get_mut().unwrap();

self.sum_of_weights -= last_pick.weight as isize;
self.sum_of_weights -= last_pick.weight as usize;
owned.remove(*last_offset);

(*last_offset, *last_pick) = pick_one(self.sum_of_weights, owned)?;
Expand All @@ -67,15 +67,15 @@ impl Iterator for InstancePicker {

#[derive(Debug, Clone)]
struct WeightedInstances {
sum_of_weights: isize,
sum_of_weights: usize,
instances: Vec<Arc<Instance>>,
}

impl From<Vec<Arc<Instance>>> for WeightedInstances {
fn from(instances: Vec<Arc<Instance>>) -> Self {
let sum_of_weights = instances
.iter()
.fold(0, |lhs, rhs| lhs + rhs.weight as isize);
.fold(0, |lhs, rhs| lhs + rhs.weight as usize);
Self {
instances,
sum_of_weights,
Expand Down
4 changes: 4 additions & 0 deletions volo/src/util/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
pub mod buf_reader;

// used internally.
#[doc(hidden)]
pub mod server_remote_error;

use std::{borrow::Borrow, fmt, sync::Arc};

#[derive(Debug, PartialEq, PartialOrd, Eq, Hash)]
Expand Down
16 changes: 16 additions & 0 deletions volo/src/util/server_remote_error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
//! This module is used internally.
pub const ENABLE_REMOTE_CLOSED_ERROR_LOG_ENV_KEY: &str = "VOLO_ENABLE_REMOTE_CLOSED_ERROR_LOG";

pub static ENABLE_REMOTE_CLOSE_ERROR_LOG: std::sync::LazyLock<bool> =
std::sync::LazyLock::new(|| std::env::var(ENABLE_REMOTE_CLOSED_ERROR_LOG_ENV_KEY).is_ok());

pub fn remote_closed_error_log_enabled() -> bool {
*ENABLE_REMOTE_CLOSE_ERROR_LOG
}

pub fn is_remote_closed_error(err: &std::io::Error) -> bool {
err.kind() == std::io::ErrorKind::ConnectionReset
|| err.kind() == std::io::ErrorKind::ConnectionAborted
|| err.kind() == std::io::ErrorKind::BrokenPipe
}

0 comments on commit 0b5d6be

Please sign in to comment.