From f45d8102093c809245f87204c9753af0bcd82415 Mon Sep 17 00:00:00 2001 From: coldWater Date: Fri, 24 Oct 2025 20:16:16 +0800 Subject: [PATCH 01/11] refine --- .../servers/http/v1/http_query_handlers.rs | 60 ++++++++++++------- .../servers/http/v1/query/execute_state.rs | 37 ++++++------ .../src/servers/http/v1/query/http_query.rs | 28 ++++----- .../service/src/servers/http/v1/query/mod.rs | 1 + 4 files changed, 70 insertions(+), 56 deletions(-) diff --git a/src/query/service/src/servers/http/v1/http_query_handlers.rs b/src/query/service/src/servers/http/v1/http_query_handlers.rs index b8d71a8bca746..daa776d872dcc 100644 --- a/src/query/service/src/servers/http/v1/http_query_handlers.rs +++ b/src/query/service/src/servers/http/v1/http_query_handlers.rs @@ -27,7 +27,7 @@ use databend_common_base::runtime::ParentMemStat; use databend_common_base::runtime::ThreadTracker; use databend_common_base::runtime::GLOBAL_MEM_STAT; use databend_common_config::GlobalConfig; -use databend_common_expression::DataSchemaRef; +use databend_common_expression::DataSchema; use databend_common_management::WorkloadGroupResourceManager; use databend_common_metrics::http::metrics_incr_http_response_errors_count; use fastrace::prelude::*; @@ -60,6 +60,7 @@ use super::query::ExecuteStateKind; use super::query::HttpQuery; use super::query::HttpQueryRequest; use super::query::HttpQueryResponseInternal; +use super::query::ResponseState; use crate::clusters::ClusterDiscovery; use crate::servers::http::error::HttpErrorCode; use crate::servers::http::error::QueryError; @@ -123,7 +124,7 @@ pub struct QueryResponseField { } impl QueryResponseField { - pub fn from_schema(schema: DataSchemaRef) -> Vec { + pub fn from_schema(schema: &DataSchema) -> Vec { schema .fields() .iter() @@ -167,15 +168,31 @@ pub struct QueryResponse { impl QueryResponse { pub(crate) fn from_internal( id: String, - r: HttpQueryResponseInternal, + HttpQueryResponseInternal { + data, + session_id, + session, + node_id, + result_timeout_secs, + state: + ResponseState { + has_result_set, + schema, + running_time_ms, + progresses, + state, + affect, + error, + warnings, + }, + }: HttpQueryResponseInternal, is_final: bool, ) -> impl IntoResponse { - let state = r.state.clone(); let (data, next_uri) = if is_final { (Arc::new(BlocksSerializer::empty()), None) } else { - match state.state { - ExecuteStateKind::Running | ExecuteStateKind::Starting => match r.data { + match state { + ExecuteStateKind::Running | ExecuteStateKind::Starting => match data { None => ( Arc::new(BlocksSerializer::empty()), Some(make_state_uri(&id)), @@ -192,7 +209,7 @@ impl QueryResponse { Arc::new(BlocksSerializer::empty()), Some(make_final_uri(&id)), ), - ExecuteStateKind::Succeeded => match r.data { + ExecuteStateKind::Succeeded => match data { None => ( Arc::new(BlocksSerializer::empty()), Some(make_final_uri(&id)), @@ -208,38 +225,37 @@ impl QueryResponse { } }; - if let Some(err) = &r.state.error { + if let Some(err) = &error { metrics_incr_http_response_errors_count(err.name(), err.code()); } - let session_id = r.session_id.clone(); let stats = QueryStats { - progresses: state.progresses.clone(), - running_time_ms: state.running_time_ms, + progresses, + running_time_ms, }; let rows = data.num_rows(); Json(QueryResponse { data, - state: state.state, - schema: state.schema.clone(), + state, + schema: QueryResponseField::from_schema(&schema), session_id: Some(session_id), - node_id: r.node_id, - session: r.session, + node_id, + session, stats, - affect: state.affect, - warnings: r.state.warnings, + affect, + warnings, id: id.clone(), next_uri, stats_uri: Some(make_state_uri(&id)), final_uri: Some(make_final_uri(&id)), kill_uri: Some(make_kill_uri(&id)), - error: r.state.error.map(QueryError::from_error_code), - has_result_set: r.state.has_result_set, - result_timeout_secs: Some(r.result_timeout_secs), + error: error.map(QueryError::from_error_code), + has_result_set, + result_timeout_secs: Some(result_timeout_secs), }) - .with_header(HEADER_QUERY_ID, id.clone()) - .with_header(HEADER_QUERY_STATE, state.state.to_string()) + .with_header(HEADER_QUERY_ID, id) + .with_header(HEADER_QUERY_STATE, state.to_string()) .with_header(HEADER_QUERY_PAGE_ROWS, rows) } } diff --git a/src/query/service/src/servers/http/v1/query/execute_state.rs b/src/query/service/src/servers/http/v1/query/execute_state.rs index 9babedec3f03d..585d14ff990c3 100644 --- a/src/query/service/src/servers/http/v1/query/execute_state.rs +++ b/src/query/service/src/servers/http/v1/query/execute_state.rs @@ -38,13 +38,12 @@ use serde::Deserialize; use serde::Serialize; use ExecuteState::*; +use super::http_query::ResponseState; +use super::sized_spsc::SizedChannelSender; use crate::interpreters::interpreter_plan_sql; use crate::interpreters::Interpreter; use crate::interpreters::InterpreterFactory; use crate::interpreters::InterpreterQueryLog; -use crate::servers::http::v1::http_query_handlers::QueryResponseField; -use crate::servers::http::v1::query::http_query::ResponseState; -use crate::servers::http::v1::query::sized_spsc::SizedChannelSender; use crate::sessions::AcquireQueueGuard; use crate::sessions::QueryAffect; use crate::sessions::QueryContext; @@ -129,14 +128,14 @@ pub struct ExecuteRunning { session: Arc, // mainly used to get progress for now pub(crate) ctx: Arc, - schema: Vec, + schema: DataSchemaRef, has_result_set: bool, #[allow(dead_code)] queue_guard: AcquireQueueGuard, } pub struct ExecuteStopped { - pub schema: Vec, + pub schema: DataSchemaRef, pub has_result_set: Option, pub stats: Progresses, pub affect: Option, @@ -192,6 +191,12 @@ impl ExecutorSessionState { impl Executor { pub fn get_response_state(&self) -> ResponseState { let (exe_state, err) = self.state.extract(); + let schema = match &self.state { + Starting(_) => Default::default(), + Running(r) => r.schema.clone(), + Stopped(f) => f.schema.clone(), + }; + ResponseState { running_time_ms: self.get_query_duration_ms(), progresses: self.get_progress(), @@ -199,17 +204,10 @@ impl Executor { error: err, warnings: self.get_warnings(), affect: self.get_affect(), - schema: self.get_schema(), + schema, has_result_set: self.has_result_set(), } } - pub fn get_schema(&self) -> Vec { - match &self.state { - Starting(_) => Default::default(), - Running(r) => r.schema.clone(), - Stopped(f) => f.schema.clone(), - } - } pub fn has_result_set(&self) -> Option { match &self.state { @@ -252,11 +250,10 @@ impl Executor { } pub fn update_schema(this: &Arc>, schema: DataSchemaRef) { - let mut guard = this.lock(); - match &mut guard.state { + match &mut this.lock().state { Starting(_) => {} - Running(r) => r.schema = QueryResponseField::from_schema(schema), - Stopped(f) => f.schema = QueryResponseField::from_schema(schema), + Running(r) => r.schema = schema, + Stopped(f) => f.schema = schema, } } @@ -311,7 +308,7 @@ impl Executor { } ExecuteStopped { stats: Default::default(), - schema: vec![], + schema: Default::default(), has_result_set: None, reason: reason.clone(), session_state: ExecutorSessionState::new(s.ctx.get_current_session()), @@ -388,9 +385,9 @@ impl ExecuteState { let is_dynamic_schema = plan.is_dynamic_schema(); let schema = if has_result_set && !is_dynamic_schema { // check has_result_set first for safety - QueryResponseField::from_schema(plan.schema()) + plan.schema() } else { - vec![] + Default::default() }; let running_state = ExecuteRunning { diff --git a/src/query/service/src/servers/http/v1/query/http_query.rs b/src/query/service/src/servers/http/v1/query/http_query.rs index 47e3d68f9841e..d6a5cebeb76b1 100644 --- a/src/query/service/src/servers/http/v1/query/http_query.rs +++ b/src/query/service/src/servers/http/v1/query/http_query.rs @@ -33,6 +33,7 @@ use databend_common_config::GlobalConfig; use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_exception::ResultExt; +use databend_common_expression::DataSchemaRef; use databend_common_expression::Scalar; use databend_common_metrics::http::metrics_incr_http_response_errors_count; use databend_common_settings::ScopeLevel; @@ -51,22 +52,21 @@ use serde::Deserializer; use serde::Serialize; use serde::Serializer; +use super::blocks_serializer::BlocksSerializer; +use super::execute_state::ExecuteStarting; +use super::execute_state::ExecuteStopped; use super::execute_state::ExecutionError; +use super::execute_state::ExecutorSessionState; +use super::execute_state::Progresses; use super::CloseReason; +use super::ExecuteState; +use super::ExecuteStateKind; +use super::Executor; use super::HttpQueryContext; +use super::PageManager; +use super::ResponseData; +use super::Wait; use crate::servers::http::error::QueryError; -use crate::servers::http::v1::http_query_handlers::QueryResponseField; -use crate::servers::http::v1::query::blocks_serializer::BlocksSerializer; -use crate::servers::http::v1::query::execute_state::ExecuteStarting; -use crate::servers::http::v1::query::execute_state::ExecuteStopped; -use crate::servers::http::v1::query::execute_state::ExecutorSessionState; -use crate::servers::http::v1::query::execute_state::Progresses; -use crate::servers::http::v1::query::ExecuteState; -use crate::servers::http::v1::query::ExecuteStateKind; -use crate::servers::http::v1::query::Executor; -use crate::servers::http::v1::query::PageManager; -use crate::servers::http::v1::query::ResponseData; -use crate::servers::http::v1::query::Wait; use crate::servers::http::v1::ClientSessionManager; use crate::servers::http::v1::HttpQueryManager; use crate::servers::http::v1::QueryResponse; @@ -488,7 +488,7 @@ pub struct StageAttachmentConf { #[derive(Debug, Clone)] pub struct ResponseState { pub has_result_set: Option, - pub schema: Vec, + pub schema: DataSchemaRef, pub running_time_ms: i64, pub progresses: Progresses, pub state: ExecuteStateKind, @@ -844,7 +844,7 @@ impl HttpQuery { .ok(); let state = ExecuteStopped { stats: Progresses::default(), - schema: vec![], + schema: Default::default(), has_result_set: None, reason: Err(e.clone()), session_state: ExecutorSessionState::new( diff --git a/src/query/service/src/servers/http/v1/query/mod.rs b/src/query/service/src/servers/http/v1/query/mod.rs index 559b7e67b4914..358d8ad20ddb0 100644 --- a/src/query/service/src/servers/http/v1/query/mod.rs +++ b/src/query/service/src/servers/http/v1/query/mod.rs @@ -33,6 +33,7 @@ pub use http_query::HttpQueryRequest; pub use http_query::HttpQueryResponseInternal; pub use http_query::HttpSessionConf; pub use http_query::HttpSessionStateInternal; +pub use http_query::ResponseState; pub use http_query_context::HttpQueryContext; pub(crate) use http_query_manager::CloseReason; pub use http_query_manager::HttpQueryManager; From 0d4279698abead0337fcc04243585fa86e9bc13a Mon Sep 17 00:00:00 2001 From: coldWater Date: Fri, 24 Oct 2025 22:09:05 +0800 Subject: [PATCH 02/11] BodyFormat --- .../servers/http/v1/http_query_handlers.rs | 39 ++++++++++++++++++- 1 file changed, 38 insertions(+), 1 deletion(-) diff --git a/src/query/service/src/servers/http/v1/http_query_handlers.rs b/src/query/service/src/servers/http/v1/http_query_handlers.rs index daa776d872dcc..82846260e306e 100644 --- a/src/query/service/src/servers/http/v1/http_query_handlers.rs +++ b/src/query/service/src/servers/http/v1/http_query_handlers.rs @@ -31,6 +31,7 @@ use databend_common_expression::DataSchema; use databend_common_management::WorkloadGroupResourceManager; use databend_common_metrics::http::metrics_incr_http_response_errors_count; use fastrace::prelude::*; +use headers::Header; use http::HeaderMap; use http::HeaderValue; use http::StatusCode; @@ -499,13 +500,16 @@ async fn query_page_handler( #[poem::handler] #[async_backtrace::framed] -#[fastrace::trace] pub(crate) async fn query_handler( ctx: &HttpQueryContext, Json(mut req): Json, ) -> PoemResult { let session = ctx.session.clone(); + // poem::http::Request + + // poem::web::TypedHeader; + let query_handle = async { let agent_info = ctx .user_agent @@ -940,3 +944,36 @@ pub(crate) fn get_http_tracing_span( Span::root(name, SpanContext::new(trace_id, SpanId(rand::random()))) .with_properties(|| ctx.to_fastrace_properties()) } + +enum BodyFormat { + Json, + Arrow, +} + +impl Header for BodyFormat { + fn name() -> &'static http::HeaderName { + &http::header::ACCEPT + } + + fn decode<'i, I>(values: &mut I) -> Result + where + Self: Sized, + I: Iterator, + { + if let Some(v) = values.next() { + match v.to_str() { + Ok("application/vnd.apache.arrow.file") => return Ok(BodyFormat::Arrow), + Err(_) => return Err(headers::Error::invalid()), + _ => {} + }; + } + Ok(BodyFormat::Json) + } + + fn encode>(&self, values: &mut E) { + values.extend([match self { + BodyFormat::Json => HeaderValue::from_static("application/json"), + BodyFormat::Arrow => HeaderValue::from_static("application/vnd.apache.arrow.file"), + }]); + } +} From 343a4a2197e236ced2fcbcd3cfade5fe39a47a17 Mon Sep 17 00:00:00 2001 From: coldWater Date: Sat, 25 Oct 2025 14:35:55 +0800 Subject: [PATCH 03/11] from_internal --- .../servers/http/v1/http_query_handlers.rs | 107 +++++++++++++----- .../http/v1/query/blocks_serializer.rs | 17 +++ 2 files changed, 95 insertions(+), 29 deletions(-) diff --git a/src/query/service/src/servers/http/v1/http_query_handlers.rs b/src/query/service/src/servers/http/v1/http_query_handlers.rs index 82846260e306e..bedadc6efc3a8 100644 --- a/src/query/service/src/servers/http/v1/http_query_handlers.rs +++ b/src/query/service/src/servers/http/v1/http_query_handlers.rs @@ -27,6 +27,7 @@ use databend_common_base::runtime::ParentMemStat; use databend_common_base::runtime::ThreadTracker; use databend_common_base::runtime::GLOBAL_MEM_STAT; use databend_common_config::GlobalConfig; +use databend_common_exception::ErrorCode; use databend_common_expression::DataSchema; use databend_common_management::WorkloadGroupResourceManager; use databend_common_metrics::http::metrics_incr_http_response_errors_count; @@ -47,6 +48,7 @@ use poem::post; use poem::put; use poem::web::Json; use poem::web::Path; +use poem::web::TypedHeader; use poem::EndpointExt; use poem::IntoResponse; use poem::Request; @@ -167,7 +169,7 @@ pub struct QueryResponse { } impl QueryResponse { - pub(crate) fn from_internal( + fn from_internal( id: String, HttpQueryResponseInternal { data, @@ -188,7 +190,8 @@ impl QueryResponse { }, }: HttpQueryResponseInternal, is_final: bool, - ) -> impl IntoResponse { + body_format: BodyFormat, + ) -> Response { let (data, next_uri) = if is_final { (Arc::new(BlocksSerializer::empty()), None) } else { @@ -230,23 +233,21 @@ impl QueryResponse { metrics_incr_http_response_errors_count(err.name(), err.code()); } - let stats = QueryStats { - progresses, - running_time_ms, - }; let rows = data.num_rows(); - - Json(QueryResponse { - data, - state, - schema: QueryResponseField::from_schema(&schema), + let mut res = QueryResponse { + id: id.clone(), session_id: Some(session_id), node_id, + state, session, - stats, + stats: QueryStats { + progresses, + running_time_ms, + }, + schema: vec![], + data: Arc::new(BlocksSerializer::empty()), affect, warnings, - id: id.clone(), next_uri, stats_uri: Some(make_state_uri(&id)), final_uri: Some(make_final_uri(&id)), @@ -254,10 +255,40 @@ impl QueryResponse { error: error.map(QueryError::from_error_code), has_result_set, result_timeout_secs: Some(result_timeout_secs), - }) - .with_header(HEADER_QUERY_ID, id) - .with_header(HEADER_QUERY_STATE, state.to_string()) - .with_header(HEADER_QUERY_PAGE_ROWS, rows) + }; + + match body_format { + BodyFormat::Json => { + res.data = data; + res.schema = QueryResponseField::from_schema(&schema); + Json(res) + .with_header(HEADER_QUERY_ID, id) + .with_header(HEADER_QUERY_STATE, state.to_string()) + .with_header(HEADER_QUERY_PAGE_ROWS, rows) + .into_response() + } + BodyFormat::Arrow => { + let buf: Result<_, ErrorCode> = try { + const META_KEY: &str = "response_header"; + let json_res = serde_json::to_string(&res)?; + let mut schema = arrow_schema::Schema::from(&*schema); + schema.metadata.insert(META_KEY.to_string(), json_res); + data.to_arrow_ipc(schema)? + }; + + match buf { + Ok(buf) => Response::builder() + .header(HEADER_QUERY_ID, id) + .header(HEADER_QUERY_STATE, state.to_string()) + .header(HEADER_QUERY_PAGE_ROWS, rows) + .content_type(body_format.content_type()) + .body(buf), + Err(err) => Response::builder() + .status(StatusCode::INTERNAL_SERVER_ERROR) + .body(err.to_string()), + } + } + } } } @@ -324,6 +355,7 @@ impl StateResponse { #[poem::handler] async fn query_final_handler( ctx: &HttpQueryContext, + TypedHeader(body_format): TypedHeader, Path(query_id): Path, ) -> PoemResult { ctx.check_node_id(&query_id)?; @@ -352,7 +384,12 @@ async fn query_final_handler( // it is safe to set these 2 fields to None, because client now check for null/None first. response.session = None; response.state.affect = None; - Ok(QueryResponse::from_internal(query_id, response, true)) + Ok(QueryResponse::from_internal( + query_id, + response, + true, + body_format, + )) } None => Err(query_id_not_found(&query_id, &ctx.node_id)), } @@ -425,6 +462,7 @@ async fn query_state_handler( #[poem::handler] async fn query_page_handler( ctx: &HttpQueryContext, + TypedHeader(body_format): TypedHeader, Path((query_id, page_no)): Path<(String, usize)>, ) -> PoemResult { ctx.check_node_id(&query_id)?; @@ -476,7 +514,12 @@ async fn query_page_handler( query .update_expire_time(false, resp.is_data_drained()) .await; - Ok(QueryResponse::from_internal(query_id, resp, false)) + Ok(QueryResponse::from_internal( + query_id, + resp, + false, + body_format, + )) } } }; @@ -502,14 +545,10 @@ async fn query_page_handler( #[async_backtrace::framed] pub(crate) async fn query_handler( ctx: &HttpQueryContext, + TypedHeader(body_format): TypedHeader, Json(mut req): Json, ) -> PoemResult { let session = ctx.session.clone(); - - // poem::http::Request - - // poem::web::TypedHeader; - let query_handle = async { let agent_info = ctx .user_agent @@ -573,7 +612,10 @@ pub(crate) async fn query_handler( query .update_expire_time(false, resp.is_data_drained()) .await; - Ok(QueryResponse::from_internal(query.id.to_string(), resp, false).into_response()) + Ok( + QueryResponse::from_internal(query.id.to_string(), resp, false, body_format) + .into_response(), + ) } } }; @@ -945,6 +987,7 @@ pub(crate) fn get_http_tracing_span( .with_properties(|| ctx.to_fastrace_properties()) } +#[derive(Debug, Clone, Copy)] enum BodyFormat { Json, Arrow, @@ -971,9 +1014,15 @@ impl Header for BodyFormat { } fn encode>(&self, values: &mut E) { - values.extend([match self { - BodyFormat::Json => HeaderValue::from_static("application/json"), - BodyFormat::Arrow => HeaderValue::from_static("application/vnd.apache.arrow.file"), - }]); + values.extend([HeaderValue::from_static(self.content_type())]); + } +} + +impl BodyFormat { + pub fn content_type(&self) -> &'static str { + match self { + BodyFormat::Json => "application/json", + BodyFormat::Arrow => "application/vnd.apache.arrow.stream", + } } } diff --git a/src/query/service/src/servers/http/v1/query/blocks_serializer.rs b/src/query/service/src/servers/http/v1/query/blocks_serializer.rs index b9b51a336ce15..bcecaab23e26d 100644 --- a/src/query/service/src/servers/http/v1/query/blocks_serializer.rs +++ b/src/query/service/src/servers/http/v1/query/blocks_serializer.rs @@ -15,6 +15,8 @@ use std::cell::RefCell; use std::ops::DerefMut; +use arrow_ipc::writer::StreamWriter; +use databend_common_exception::Result; use databend_common_expression::types::date::date_to_string; use databend_common_expression::types::interval::interval_to_string; use databend_common_expression::types::timestamp::timestamp_to_string; @@ -100,6 +102,21 @@ impl BlocksSerializer { pub fn num_rows(&self) -> usize { self.columns.iter().map(|(_, num_rows)| *num_rows).sum() } + + pub fn to_arrow_ipc(&self, schema: arrow_schema::Schema) -> Result> { + let mut buf = Vec::new(); + let mut writer = StreamWriter::try_new(&mut buf, &schema)?; + + let mut data_schema = None; + for (block, _) in &self.columns { + let block = DataBlock::new_from_columns(block.clone()); + let data_schema = data_schema.get_or_insert_with(|| block.infer_schema()); + let batch = block.to_record_batch_with_dataschema(data_schema)?; + writer.write(&batch)?; + } + writer.finish()?; + Ok(buf) + } } impl serde::Serialize for BlocksSerializer { From 4e80bf379996a060200df80cb62a2dfe1ccda84f Mon Sep 17 00:00:00 2001 From: coldWater Date: Sat, 25 Oct 2025 16:11:05 +0800 Subject: [PATCH 04/11] test --- .../servers/http/v1/http_query_handlers.rs | 4 +- .../http/v1/query/blocks_serializer.rs | 7 ++- tests/nox/noxfile.py | 2 +- .../09_http_handler/test_09_0015_arrow_ipc.py | 59 +++++++++++++++++++ 4 files changed, 68 insertions(+), 4 deletions(-) create mode 100644 tests/nox/suites/1_stateful/09_http_handler/test_09_0015_arrow_ipc.py diff --git a/src/query/service/src/servers/http/v1/http_query_handlers.rs b/src/query/service/src/servers/http/v1/http_query_handlers.rs index bedadc6efc3a8..ef7665527eaa4 100644 --- a/src/query/service/src/servers/http/v1/http_query_handlers.rs +++ b/src/query/service/src/servers/http/v1/http_query_handlers.rs @@ -1005,7 +1005,7 @@ impl Header for BodyFormat { { if let Some(v) = values.next() { match v.to_str() { - Ok("application/vnd.apache.arrow.file") => return Ok(BodyFormat::Arrow), + Ok(s) if s == BodyFormat::Arrow.content_type() => return Ok(BodyFormat::Arrow), Err(_) => return Err(headers::Error::invalid()), _ => {} }; @@ -1019,7 +1019,7 @@ impl Header for BodyFormat { } impl BodyFormat { - pub fn content_type(&self) -> &'static str { + pub const fn content_type(&self) -> &'static str { match self { BodyFormat::Json => "application/json", BodyFormat::Arrow => "application/vnd.apache.arrow.stream", diff --git a/src/query/service/src/servers/http/v1/query/blocks_serializer.rs b/src/query/service/src/servers/http/v1/query/blocks_serializer.rs index bcecaab23e26d..9d9fcbfa3f1e9 100644 --- a/src/query/service/src/servers/http/v1/query/blocks_serializer.rs +++ b/src/query/service/src/servers/http/v1/query/blocks_serializer.rs @@ -15,7 +15,10 @@ use std::cell::RefCell; use std::ops::DerefMut; +use arrow_ipc::writer::IpcWriteOptions; use arrow_ipc::writer::StreamWriter; +use arrow_ipc::CompressionType; +use arrow_ipc::MetadataVersion; use databend_common_exception::Result; use databend_common_expression::types::date::date_to_string; use databend_common_expression::types::interval::interval_to_string; @@ -105,7 +108,9 @@ impl BlocksSerializer { pub fn to_arrow_ipc(&self, schema: arrow_schema::Schema) -> Result> { let mut buf = Vec::new(); - let mut writer = StreamWriter::try_new(&mut buf, &schema)?; + let opts = IpcWriteOptions::try_new(8, false, MetadataVersion::V5)? + .try_with_compression(Some(CompressionType::LZ4_FRAME))?; + let mut writer = StreamWriter::try_new_with_options(&mut buf, &schema, opts)?; let mut data_schema = None; for (block, _) in &self.columns { diff --git a/tests/nox/noxfile.py b/tests/nox/noxfile.py index 82afa4d934ef6..9396aead8c87e 100644 --- a/tests/nox/noxfile.py +++ b/tests/nox/noxfile.py @@ -61,7 +61,7 @@ def run_jdbc_test(session, driver_version, main_version): @nox.session def test_suites(session): - session.install("pytest", "requests", "pytest-asyncio") + session.install("pytest", "requests", "pytest-asyncio", "pyarrow") # Usage: nox -s test_suites -- suites/1_stateful/09_http_handler/test_09_0007_session.py::test_session session.run("pytest", *session.posargs) diff --git a/tests/nox/suites/1_stateful/09_http_handler/test_09_0015_arrow_ipc.py b/tests/nox/suites/1_stateful/09_http_handler/test_09_0015_arrow_ipc.py new file mode 100644 index 0000000000000..9f12273b266d4 --- /dev/null +++ b/tests/nox/suites/1_stateful/09_http_handler/test_09_0015_arrow_ipc.py @@ -0,0 +1,59 @@ +import json + +import requests +import pyarrow.ipc as ipc + +auth = ("root", "") + + +def do_query(query, session, pagination): + url = f"http://localhost:8000/v1/query" + payload = { + "sql": query, + } + if session: + payload["session"] = session + if pagination: + payload["pagination"] = pagination + headers = { + "Accept": "application/vnd.apache.arrow.stream", + "Content-Type": "application/json", + } + + return requests.post(url, headers=headers, json=payload, auth=auth) + + +def test_arrow_ipc(): + pagination = { + "max_rows_per_page": 20, + } + resp = do_query("select * from numbers(97)", session=None, pagination=pagination) + + # print("content", len(resp.content)) + # IpcWriteOptions(alignment 64 compression None) content: 1672 + # IpcWriteOptions(alignment 8 compression lz4) content: 1448 + + rows = 0 + with ipc.open_stream(resp.content) as reader: + header = json.loads(reader.schema.metadata[b"response_header"]) + assert header["error"] == None + for batch in reader: + rows += batch.num_rows + + for _ in range(30): + if header.get("next_uri") == None: + break + + uri = f"http://localhost:8000/{header['next_uri']}" + resp = requests.get( + uri, auth=auth, headers={"Accept": "application/vnd.apache.arrow.stream"} + ) + with ipc.open_stream(resp.content) as reader: + header = json.loads(reader.schema.metadata[b"response_header"]) + assert header["error"] == None + for batch in reader: + rows += batch.num_rows + if rows < 96: + assert batch.num_rows == 20 + + assert rows == 97 From 4c4e65fd753d02f79a4a24a3742171791fa5ff05 Mon Sep 17 00:00:00 2001 From: coldWater Date: Sat, 25 Oct 2025 16:39:40 +0800 Subject: [PATCH 05/11] fix --- Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Makefile b/Makefile index b1c3b2fb37324..5436309cd03f9 100644 --- a/Makefile +++ b/Makefile @@ -20,7 +20,7 @@ lint: # Cargo.toml file formatter(make setup to install) taplo fmt # Python file formatter(make setup to install) - ruff format tests/ + # ruff format tests/ # Bash file formatter(make setup to install) shfmt -l -w scripts/* From 3158e08770b928222e16dd52acc5459bcbef6d69 Mon Sep 17 00:00:00 2001 From: coldWater Date: Sat, 25 Oct 2025 23:02:46 +0800 Subject: [PATCH 06/11] fix --- .../servers/http/v1/http_query_handlers.rs | 23 +++++++++++++------ .../http/v1/query/blocks_serializer.rs | 12 +++++++--- src/query/service/src/test_kits/fixture.rs | 1 + .../09_http_handler/test_09_0015_arrow_ipc.py | 2 -- 4 files changed, 26 insertions(+), 12 deletions(-) diff --git a/src/query/service/src/servers/http/v1/http_query_handlers.rs b/src/query/service/src/servers/http/v1/http_query_handlers.rs index ef7665527eaa4..330062236077c 100644 --- a/src/query/service/src/servers/http/v1/http_query_handlers.rs +++ b/src/query/service/src/servers/http/v1/http_query_handlers.rs @@ -33,6 +33,7 @@ use databend_common_management::WorkloadGroupResourceManager; use databend_common_metrics::http::metrics_incr_http_response_errors_count; use fastrace::prelude::*; use headers::Header; +use headers::HeaderMapExt; use http::HeaderMap; use http::HeaderValue; use http::StatusCode; @@ -48,10 +49,11 @@ use poem::post; use poem::put; use poem::web::Json; use poem::web::Path; -use poem::web::TypedHeader; use poem::EndpointExt; +use poem::FromRequest; use poem::IntoResponse; use poem::Request; +use poem::RequestBody; use poem::Response; use poem::Route; use serde::Deserialize; @@ -271,9 +273,7 @@ impl QueryResponse { let buf: Result<_, ErrorCode> = try { const META_KEY: &str = "response_header"; let json_res = serde_json::to_string(&res)?; - let mut schema = arrow_schema::Schema::from(&*schema); - schema.metadata.insert(META_KEY.to_string(), json_res); - data.to_arrow_ipc(schema)? + data.to_arrow_ipc(&schema, vec![(META_KEY.to_string(), json_res)])? }; match buf { @@ -355,7 +355,7 @@ impl StateResponse { #[poem::handler] async fn query_final_handler( ctx: &HttpQueryContext, - TypedHeader(body_format): TypedHeader, + body_format: BodyFormat, Path(query_id): Path, ) -> PoemResult { ctx.check_node_id(&query_id)?; @@ -462,7 +462,7 @@ async fn query_state_handler( #[poem::handler] async fn query_page_handler( ctx: &HttpQueryContext, - TypedHeader(body_format): TypedHeader, + body_format: BodyFormat, Path((query_id, page_no)): Path<(String, usize)>, ) -> PoemResult { ctx.check_node_id(&query_id)?; @@ -545,7 +545,7 @@ async fn query_page_handler( #[async_backtrace::framed] pub(crate) async fn query_handler( ctx: &HttpQueryContext, - TypedHeader(body_format): TypedHeader, + body_format: BodyFormat, Json(mut req): Json, ) -> PoemResult { let session = ctx.session.clone(); @@ -1026,3 +1026,12 @@ impl BodyFormat { } } } + +impl<'a> FromRequest<'a> for BodyFormat { + async fn from_request(req: &'a Request, _body: &mut RequestBody) -> Result { + Ok(req + .headers() + .typed_get::() + .unwrap_or(BodyFormat::Json)) + } +} diff --git a/src/query/service/src/servers/http/v1/query/blocks_serializer.rs b/src/query/service/src/servers/http/v1/query/blocks_serializer.rs index 9d9fcbfa3f1e9..2052feabd5727 100644 --- a/src/query/service/src/servers/http/v1/query/blocks_serializer.rs +++ b/src/query/service/src/servers/http/v1/query/blocks_serializer.rs @@ -26,6 +26,7 @@ use databend_common_expression::types::timestamp::timestamp_to_string; use databend_common_expression::BlockEntry; use databend_common_expression::Column; use databend_common_expression::DataBlock; +use databend_common_expression::DataSchema; use databend_common_formats::field_encoder::FieldEncoderValues; use databend_common_io::ewkb_to_geo; use databend_common_io::geo_to_ewkb; @@ -106,16 +107,21 @@ impl BlocksSerializer { self.columns.iter().map(|(_, num_rows)| *num_rows).sum() } - pub fn to_arrow_ipc(&self, schema: arrow_schema::Schema) -> Result> { + pub fn to_arrow_ipc( + &self, + data_schema: &DataSchema, + ext_meta: Vec<(String, String)>, + ) -> Result> { + let mut schema = arrow_schema::Schema::from(data_schema); + schema.metadata.extend(ext_meta); + let mut buf = Vec::new(); let opts = IpcWriteOptions::try_new(8, false, MetadataVersion::V5)? .try_with_compression(Some(CompressionType::LZ4_FRAME))?; let mut writer = StreamWriter::try_new_with_options(&mut buf, &schema, opts)?; - let mut data_schema = None; for (block, _) in &self.columns { let block = DataBlock::new_from_columns(block.clone()); - let data_schema = data_schema.get_or_insert_with(|| block.infer_schema()); let batch = block.to_record_batch_with_dataschema(data_schema)?; writer.write(&batch)?; } diff --git a/src/query/service/src/test_kits/fixture.rs b/src/query/service/src/test_kits/fixture.rs index 9f838a389545d..05c745e971dd9 100644 --- a/src/query/service/src/test_kits/fixture.rs +++ b/src/query/service/src/test_kits/fixture.rs @@ -117,6 +117,7 @@ impl Drop for TestGuard { fn drop(&mut self) { #[cfg(debug_assertions)] { + log::set_max_level(log::LevelFilter::Off); databend_common_base::runtime::drop_guard(move || { databend_common_base::base::GlobalInstance::drop_testing(&self._thread_name); }) diff --git a/tests/nox/suites/1_stateful/09_http_handler/test_09_0015_arrow_ipc.py b/tests/nox/suites/1_stateful/09_http_handler/test_09_0015_arrow_ipc.py index 9f12273b266d4..fcdc5c4b1685f 100644 --- a/tests/nox/suites/1_stateful/09_http_handler/test_09_0015_arrow_ipc.py +++ b/tests/nox/suites/1_stateful/09_http_handler/test_09_0015_arrow_ipc.py @@ -53,7 +53,5 @@ def test_arrow_ipc(): assert header["error"] == None for batch in reader: rows += batch.num_rows - if rows < 96: - assert batch.num_rows == 20 assert rows == 97 From 7e5a444afecd5d55fb35b2dd979fbd9e51208a90 Mon Sep 17 00:00:00 2001 From: coldWater Date: Mon, 27 Oct 2025 12:18:27 +0800 Subject: [PATCH 07/11] test_arrow_ipc_no_data --- .../09_http_handler/test_09_0015_arrow_ipc.py | 36 +++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/tests/nox/suites/1_stateful/09_http_handler/test_09_0015_arrow_ipc.py b/tests/nox/suites/1_stateful/09_http_handler/test_09_0015_arrow_ipc.py index fcdc5c4b1685f..586788853f98c 100644 --- a/tests/nox/suites/1_stateful/09_http_handler/test_09_0015_arrow_ipc.py +++ b/tests/nox/suites/1_stateful/09_http_handler/test_09_0015_arrow_ipc.py @@ -55,3 +55,39 @@ def test_arrow_ipc(): rows += batch.num_rows assert rows == 97 + + +def test_arrow_ipc_no_data(): + pagination = { + "max_rows_per_page": 20, + } + resp = do_query( + "drop table if exists not_exists", session=None, pagination=pagination + ) + + # print("content", len(resp.content)) + # IpcWriteOptions(alignment 64 compression None) content: 1672 + # IpcWriteOptions(alignment 8 compression lz4) content: 1448 + + rows = 0 + with ipc.open_stream(resp.content) as reader: + header = json.loads(reader.schema.metadata[b"response_header"]) + assert header["error"] == None + for batch in reader: + rows += batch.num_rows + + for _ in range(30): + if header.get("next_uri") == None: + break + + uri = f"http://localhost:8000/{header['next_uri']}" + resp = requests.get( + uri, auth=auth, headers={"Accept": "application/vnd.apache.arrow.stream"} + ) + with ipc.open_stream(resp.content) as reader: + header = json.loads(reader.schema.metadata[b"response_header"]) + assert header["error"] == None + for batch in reader: + rows += batch.num_rows + + assert rows == 0 From ccd38fd1e4596454d1f8b785797d84ad5cd6f1b1 Mon Sep 17 00:00:00 2001 From: coldWater Date: Mon, 27 Oct 2025 12:42:54 +0800 Subject: [PATCH 08/11] json if no data --- .../servers/http/v1/http_query_handlers.rs | 20 +++++------ .../09_http_handler/test_09_0015_arrow_ipc.py | 36 ------------------- 2 files changed, 10 insertions(+), 46 deletions(-) diff --git a/src/query/service/src/servers/http/v1/http_query_handlers.rs b/src/query/service/src/servers/http/v1/http_query_handlers.rs index 330062236077c..f5deeba8de328 100644 --- a/src/query/service/src/servers/http/v1/http_query_handlers.rs +++ b/src/query/service/src/servers/http/v1/http_query_handlers.rs @@ -260,16 +260,7 @@ impl QueryResponse { }; match body_format { - BodyFormat::Json => { - res.data = data; - res.schema = QueryResponseField::from_schema(&schema); - Json(res) - .with_header(HEADER_QUERY_ID, id) - .with_header(HEADER_QUERY_STATE, state.to_string()) - .with_header(HEADER_QUERY_PAGE_ROWS, rows) - .into_response() - } - BodyFormat::Arrow => { + BodyFormat::Arrow if !schema.fields.is_empty() && !data.is_empty() => { let buf: Result<_, ErrorCode> = try { const META_KEY: &str = "response_header"; let json_res = serde_json::to_string(&res)?; @@ -288,6 +279,15 @@ impl QueryResponse { .body(err.to_string()), } } + _ => { + res.data = data; + res.schema = QueryResponseField::from_schema(&schema); + Json(res) + .with_header(HEADER_QUERY_ID, id) + .with_header(HEADER_QUERY_STATE, state.to_string()) + .with_header(HEADER_QUERY_PAGE_ROWS, rows) + .into_response() + } } } } diff --git a/tests/nox/suites/1_stateful/09_http_handler/test_09_0015_arrow_ipc.py b/tests/nox/suites/1_stateful/09_http_handler/test_09_0015_arrow_ipc.py index 586788853f98c..fcdc5c4b1685f 100644 --- a/tests/nox/suites/1_stateful/09_http_handler/test_09_0015_arrow_ipc.py +++ b/tests/nox/suites/1_stateful/09_http_handler/test_09_0015_arrow_ipc.py @@ -55,39 +55,3 @@ def test_arrow_ipc(): rows += batch.num_rows assert rows == 97 - - -def test_arrow_ipc_no_data(): - pagination = { - "max_rows_per_page": 20, - } - resp = do_query( - "drop table if exists not_exists", session=None, pagination=pagination - ) - - # print("content", len(resp.content)) - # IpcWriteOptions(alignment 64 compression None) content: 1672 - # IpcWriteOptions(alignment 8 compression lz4) content: 1448 - - rows = 0 - with ipc.open_stream(resp.content) as reader: - header = json.loads(reader.schema.metadata[b"response_header"]) - assert header["error"] == None - for batch in reader: - rows += batch.num_rows - - for _ in range(30): - if header.get("next_uri") == None: - break - - uri = f"http://localhost:8000/{header['next_uri']}" - resp = requests.get( - uri, auth=auth, headers={"Accept": "application/vnd.apache.arrow.stream"} - ) - with ipc.open_stream(resp.content) as reader: - header = json.loads(reader.schema.metadata[b"response_header"]) - assert header["error"] == None - for batch in reader: - rows += batch.num_rows - - assert rows == 0 From f0b0e5689399198f3a8a1922e1dde23ab6bb957e Mon Sep 17 00:00:00 2001 From: coldWater Date: Mon, 27 Oct 2025 14:12:24 +0800 Subject: [PATCH 09/11] fix --- .../09_http_handler/test_09_0015_arrow_ipc.py | 33 +++++++++++++------ 1 file changed, 23 insertions(+), 10 deletions(-) diff --git a/tests/nox/suites/1_stateful/09_http_handler/test_09_0015_arrow_ipc.py b/tests/nox/suites/1_stateful/09_http_handler/test_09_0015_arrow_ipc.py index fcdc5c4b1685f..7a0c9f12cbd72 100644 --- a/tests/nox/suites/1_stateful/09_http_handler/test_09_0015_arrow_ipc.py +++ b/tests/nox/suites/1_stateful/09_http_handler/test_09_0015_arrow_ipc.py @@ -23,6 +23,19 @@ def do_query(query, session, pagination): return requests.post(url, headers=headers, json=payload, auth=auth) +def read_response(resp): + if resp.headers["Content-Type"] != "application/vnd.apache.arrow.stream": + header = resp.json() + assert len(header["data"]) == 0 + assert header["error"] == None + return (header, None) + else: + reader = ipc.open_stream(resp.content) + header = json.loads(reader.schema.metadata[b"response_header"]) + assert header["error"] == None + return (header, reader) + + def test_arrow_ipc(): pagination = { "max_rows_per_page": 20, @@ -33,12 +46,12 @@ def test_arrow_ipc(): # IpcWriteOptions(alignment 64 compression None) content: 1672 # IpcWriteOptions(alignment 8 compression lz4) content: 1448 + (header, reader) = read_response(resp) rows = 0 - with ipc.open_stream(resp.content) as reader: - header = json.loads(reader.schema.metadata[b"response_header"]) - assert header["error"] == None - for batch in reader: - rows += batch.num_rows + if reader: + with reader: + for batch in reader: + rows += batch.num_rows for _ in range(30): if header.get("next_uri") == None: @@ -48,10 +61,10 @@ def test_arrow_ipc(): resp = requests.get( uri, auth=auth, headers={"Accept": "application/vnd.apache.arrow.stream"} ) - with ipc.open_stream(resp.content) as reader: - header = json.loads(reader.schema.metadata[b"response_header"]) - assert header["error"] == None - for batch in reader: - rows += batch.num_rows + (header, reader) = read_response(resp) + if reader: + with reader: + for batch in reader: + rows += batch.num_rows assert rows == 97 From bd09287b9ea7ea30397ec3a92219ca3d4d4a8cac Mon Sep 17 00:00:00 2001 From: coldWater Date: Tue, 28 Oct 2025 08:59:18 +0800 Subject: [PATCH 10/11] fix --- .../09_http_handler/test_09_0015_arrow_ipc.py | 46 +++++++++---------- 1 file changed, 23 insertions(+), 23 deletions(-) diff --git a/tests/nox/suites/1_stateful/09_http_handler/test_09_0015_arrow_ipc.py b/tests/nox/suites/1_stateful/09_http_handler/test_09_0015_arrow_ipc.py index 7a0c9f12cbd72..96e4ad40dfb8a 100644 --- a/tests/nox/suites/1_stateful/09_http_handler/test_09_0015_arrow_ipc.py +++ b/tests/nox/suites/1_stateful/09_http_handler/test_09_0015_arrow_ipc.py @@ -23,19 +23,6 @@ def do_query(query, session, pagination): return requests.post(url, headers=headers, json=payload, auth=auth) -def read_response(resp): - if resp.headers["Content-Type"] != "application/vnd.apache.arrow.stream": - header = resp.json() - assert len(header["data"]) == 0 - assert header["error"] == None - return (header, None) - else: - reader = ipc.open_stream(resp.content) - header = json.loads(reader.schema.metadata[b"response_header"]) - assert header["error"] == None - return (header, reader) - - def test_arrow_ipc(): pagination = { "max_rows_per_page": 20, @@ -46,12 +33,28 @@ def test_arrow_ipc(): # IpcWriteOptions(alignment 64 compression None) content: 1672 # IpcWriteOptions(alignment 8 compression lz4) content: 1448 + def read_response(resp): + if resp.headers["Content-Type"] != "application/vnd.apache.arrow.stream": + header = resp.json() + assert len(header["data"]) == 0 + assert header["error"] == None + return (header, None) + else: + reader = ipc.open_stream(resp.content) + header = json.loads(reader.schema.metadata[b"response_header"]) + assert header["error"] == None + return (header, reader) + + rows = [] + + def drain_reader(reader): + if reader: + with reader: + for batch in reader: + rows.extend([x.as_py() for x in batch["number"]]) + (header, reader) = read_response(resp) - rows = 0 - if reader: - with reader: - for batch in reader: - rows += batch.num_rows + drain_reader(reader) for _ in range(30): if header.get("next_uri") == None: @@ -62,9 +65,6 @@ def test_arrow_ipc(): uri, auth=auth, headers={"Accept": "application/vnd.apache.arrow.stream"} ) (header, reader) = read_response(resp) - if reader: - with reader: - for batch in reader: - rows += batch.num_rows + drain_reader(reader) - assert rows == 97 + assert rows == [x for x in range(97)] From b276136d2252ac1303b77d026a8c5e146206b05e Mon Sep 17 00:00:00 2001 From: coldWater Date: Tue, 28 Oct 2025 14:53:30 +0800 Subject: [PATCH 11/11] fix --- .../suites/1_stateful/09_http_handler/test_09_0015_arrow_ipc.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/nox/suites/1_stateful/09_http_handler/test_09_0015_arrow_ipc.py b/tests/nox/suites/1_stateful/09_http_handler/test_09_0015_arrow_ipc.py index 96e4ad40dfb8a..bb3535ee5308c 100644 --- a/tests/nox/suites/1_stateful/09_http_handler/test_09_0015_arrow_ipc.py +++ b/tests/nox/suites/1_stateful/09_http_handler/test_09_0015_arrow_ipc.py @@ -67,4 +67,5 @@ def drain_reader(reader): (header, reader) = read_response(resp) drain_reader(reader) + rows.sort() assert rows == [x for x in range(97)]