Skip to content

Commit 343a4a2

Browse files
committed
from_internal
1 parent 0d42796 commit 343a4a2

File tree

2 files changed

+95
-29
lines changed

2 files changed

+95
-29
lines changed

src/query/service/src/servers/http/v1/http_query_handlers.rs

Lines changed: 78 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ use databend_common_base::runtime::ParentMemStat;
2727
use databend_common_base::runtime::ThreadTracker;
2828
use databend_common_base::runtime::GLOBAL_MEM_STAT;
2929
use databend_common_config::GlobalConfig;
30+
use databend_common_exception::ErrorCode;
3031
use databend_common_expression::DataSchema;
3132
use databend_common_management::WorkloadGroupResourceManager;
3233
use databend_common_metrics::http::metrics_incr_http_response_errors_count;
@@ -47,6 +48,7 @@ use poem::post;
4748
use poem::put;
4849
use poem::web::Json;
4950
use poem::web::Path;
51+
use poem::web::TypedHeader;
5052
use poem::EndpointExt;
5153
use poem::IntoResponse;
5254
use poem::Request;
@@ -167,7 +169,7 @@ pub struct QueryResponse {
167169
}
168170

169171
impl QueryResponse {
170-
pub(crate) fn from_internal(
172+
fn from_internal(
171173
id: String,
172174
HttpQueryResponseInternal {
173175
data,
@@ -188,7 +190,8 @@ impl QueryResponse {
188190
},
189191
}: HttpQueryResponseInternal,
190192
is_final: bool,
191-
) -> impl IntoResponse {
193+
body_format: BodyFormat,
194+
) -> Response {
192195
let (data, next_uri) = if is_final {
193196
(Arc::new(BlocksSerializer::empty()), None)
194197
} else {
@@ -230,34 +233,62 @@ impl QueryResponse {
230233
metrics_incr_http_response_errors_count(err.name(), err.code());
231234
}
232235

233-
let stats = QueryStats {
234-
progresses,
235-
running_time_ms,
236-
};
237236
let rows = data.num_rows();
238-
239-
Json(QueryResponse {
240-
data,
241-
state,
242-
schema: QueryResponseField::from_schema(&schema),
237+
let mut res = QueryResponse {
238+
id: id.clone(),
243239
session_id: Some(session_id),
244240
node_id,
241+
state,
245242
session,
246-
stats,
243+
stats: QueryStats {
244+
progresses,
245+
running_time_ms,
246+
},
247+
schema: vec![],
248+
data: Arc::new(BlocksSerializer::empty()),
247249
affect,
248250
warnings,
249-
id: id.clone(),
250251
next_uri,
251252
stats_uri: Some(make_state_uri(&id)),
252253
final_uri: Some(make_final_uri(&id)),
253254
kill_uri: Some(make_kill_uri(&id)),
254255
error: error.map(QueryError::from_error_code),
255256
has_result_set,
256257
result_timeout_secs: Some(result_timeout_secs),
257-
})
258-
.with_header(HEADER_QUERY_ID, id)
259-
.with_header(HEADER_QUERY_STATE, state.to_string())
260-
.with_header(HEADER_QUERY_PAGE_ROWS, rows)
258+
};
259+
260+
match body_format {
261+
BodyFormat::Json => {
262+
res.data = data;
263+
res.schema = QueryResponseField::from_schema(&schema);
264+
Json(res)
265+
.with_header(HEADER_QUERY_ID, id)
266+
.with_header(HEADER_QUERY_STATE, state.to_string())
267+
.with_header(HEADER_QUERY_PAGE_ROWS, rows)
268+
.into_response()
269+
}
270+
BodyFormat::Arrow => {
271+
let buf: Result<_, ErrorCode> = try {
272+
const META_KEY: &str = "response_header";
273+
let json_res = serde_json::to_string(&res)?;
274+
let mut schema = arrow_schema::Schema::from(&*schema);
275+
schema.metadata.insert(META_KEY.to_string(), json_res);
276+
data.to_arrow_ipc(schema)?
277+
};
278+
279+
match buf {
280+
Ok(buf) => Response::builder()
281+
.header(HEADER_QUERY_ID, id)
282+
.header(HEADER_QUERY_STATE, state.to_string())
283+
.header(HEADER_QUERY_PAGE_ROWS, rows)
284+
.content_type(body_format.content_type())
285+
.body(buf),
286+
Err(err) => Response::builder()
287+
.status(StatusCode::INTERNAL_SERVER_ERROR)
288+
.body(err.to_string()),
289+
}
290+
}
291+
}
261292
}
262293
}
263294

@@ -324,6 +355,7 @@ impl StateResponse {
324355
#[poem::handler]
325356
async fn query_final_handler(
326357
ctx: &HttpQueryContext,
358+
TypedHeader(body_format): TypedHeader<BodyFormat>,
327359
Path(query_id): Path<String>,
328360
) -> PoemResult<impl IntoResponse> {
329361
ctx.check_node_id(&query_id)?;
@@ -352,7 +384,12 @@ async fn query_final_handler(
352384
// It is safe to set these 2 fields to None, because the client now checks for null/None first.
353385
response.session = None;
354386
response.state.affect = None;
355-
Ok(QueryResponse::from_internal(query_id, response, true))
387+
Ok(QueryResponse::from_internal(
388+
query_id,
389+
response,
390+
true,
391+
body_format,
392+
))
356393
}
357394
None => Err(query_id_not_found(&query_id, &ctx.node_id)),
358395
}
@@ -425,6 +462,7 @@ async fn query_state_handler(
425462
#[poem::handler]
426463
async fn query_page_handler(
427464
ctx: &HttpQueryContext,
465+
TypedHeader(body_format): TypedHeader<BodyFormat>,
428466
Path((query_id, page_no)): Path<(String, usize)>,
429467
) -> PoemResult<impl IntoResponse> {
430468
ctx.check_node_id(&query_id)?;
@@ -476,7 +514,12 @@ async fn query_page_handler(
476514
query
477515
.update_expire_time(false, resp.is_data_drained())
478516
.await;
479-
Ok(QueryResponse::from_internal(query_id, resp, false))
517+
Ok(QueryResponse::from_internal(
518+
query_id,
519+
resp,
520+
false,
521+
body_format,
522+
))
480523
}
481524
}
482525
};
@@ -502,14 +545,10 @@ async fn query_page_handler(
502545
#[async_backtrace::framed]
503546
pub(crate) async fn query_handler(
504547
ctx: &HttpQueryContext,
548+
TypedHeader(body_format): TypedHeader<BodyFormat>,
505549
Json(mut req): Json<HttpQueryRequest>,
506550
) -> PoemResult<impl IntoResponse> {
507551
let session = ctx.session.clone();
508-
509-
// poem::http::Request
510-
511-
// poem::web::TypedHeader;
512-
513552
let query_handle = async {
514553
let agent_info = ctx
515554
.user_agent
@@ -573,7 +612,10 @@ pub(crate) async fn query_handler(
573612
query
574613
.update_expire_time(false, resp.is_data_drained())
575614
.await;
576-
Ok(QueryResponse::from_internal(query.id.to_string(), resp, false).into_response())
615+
Ok(
616+
QueryResponse::from_internal(query.id.to_string(), resp, false, body_format)
617+
.into_response(),
618+
)
577619
}
578620
}
579621
};
@@ -945,6 +987,7 @@ pub(crate) fn get_http_tracing_span(
945987
.with_properties(|| ctx.to_fastrace_properties())
946988
}
947989

990+
#[derive(Debug, Clone, Copy)]
948991
enum BodyFormat {
949992
Json,
950993
Arrow,
@@ -971,9 +1014,15 @@ impl Header for BodyFormat {
9711014
}
9721015

9731016
fn encode<E: Extend<HeaderValue>>(&self, values: &mut E) {
974-
values.extend([match self {
975-
BodyFormat::Json => HeaderValue::from_static("application/json"),
976-
BodyFormat::Arrow => HeaderValue::from_static("application/vnd.apache.arrow.file"),
977-
}]);
1017+
values.extend([HeaderValue::from_static(self.content_type())]);
1018+
}
1019+
}
1020+
1021+
impl BodyFormat {
1022+
/// Returns the MIME type string for this body format:
/// `"application/json"` for `Json` and
/// `"application/vnd.apache.arrow.stream"` for `Arrow`.
pub fn content_type(&self) -> &'static str {
1023+
match self {
1024+
BodyFormat::Json => "application/json",
1025+
BodyFormat::Arrow => "application/vnd.apache.arrow.stream",
1026+
}
9781027
}
9791028
}

src/query/service/src/servers/http/v1/query/blocks_serializer.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
use std::cell::RefCell;
1616
use std::ops::DerefMut;
1717

18+
use arrow_ipc::writer::StreamWriter;
19+
use databend_common_exception::Result;
1820
use databend_common_expression::types::date::date_to_string;
1921
use databend_common_expression::types::interval::interval_to_string;
2022
use databend_common_expression::types::timestamp::timestamp_to_string;
@@ -100,6 +102,21 @@ impl BlocksSerializer {
100102
pub fn num_rows(&self) -> usize {
101103
self.columns.iter().map(|(_, num_rows)| *num_rows).sum()
102104
}
105+
106+
/// Serializes the blocks held in `self.columns` into an in-memory buffer
/// using an Arrow IPC `StreamWriter` created with the given `schema`.
///
/// Each entry is rebuilt into a `DataBlock`, converted to a record batch and
/// written to the stream; the writer is finished before the buffer is
/// returned.
///
/// # Errors
///
/// Any error from creating the writer, converting a block, writing a batch
/// or finishing the stream is propagated with `?`.
pub fn to_arrow_ipc(&self, schema: arrow_schema::Schema) -> Result<Vec<u8>> {
107+
let mut buf = Vec::new();
108+
let mut writer = StreamWriter::try_new(&mut buf, &schema)?;
109+
110+
// Inferred from the first block on demand and reused for every later block.
// NOTE(review): batches are built from this inferred schema, not from the
// `schema` handed to the writer — confirm the two always agree.
let mut data_schema = None;
111+
for (block, _) in &self.columns {
112+
let block = DataBlock::new_from_columns(block.clone());
113+
let data_schema = data_schema.get_or_insert_with(|| block.infer_schema());
114+
let batch = block.to_record_batch_with_dataschema(data_schema)?;
115+
writer.write(&batch)?;
116+
}
117+
writer.finish()?;
118+
Ok(buf)
119+
}
103120
}
104121

105122
impl serde::Serialize for BlocksSerializer {

0 commit comments

Comments
 (0)