Skip to content

Commit 8acd706

Browse files
committed
refine
1 parent b345914 commit 8acd706

File tree

4 files changed

+70
-56
lines changed

4 files changed

+70
-56
lines changed

src/query/service/src/servers/http/v1/http_query_handlers.rs

Lines changed: 38 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ use databend_common_base::runtime::ParentMemStat;
2727
use databend_common_base::runtime::ThreadTracker;
2828
use databend_common_base::runtime::GLOBAL_MEM_STAT;
2929
use databend_common_config::GlobalConfig;
30-
use databend_common_expression::DataSchemaRef;
30+
use databend_common_expression::DataSchema;
3131
use databend_common_management::WorkloadGroupResourceManager;
3232
use databend_common_metrics::http::metrics_incr_http_response_errors_count;
3333
use fastrace::prelude::*;
@@ -60,6 +60,7 @@ use super::query::ExecuteStateKind;
6060
use super::query::HttpQuery;
6161
use super::query::HttpQueryRequest;
6262
use super::query::HttpQueryResponseInternal;
63+
use super::query::ResponseState;
6364
use crate::clusters::ClusterDiscovery;
6465
use crate::servers::http::error::HttpErrorCode;
6566
use crate::servers::http::error::QueryError;
@@ -123,7 +124,7 @@ pub struct QueryResponseField {
123124
}
124125

125126
impl QueryResponseField {
126-
pub fn from_schema(schema: DataSchemaRef) -> Vec<Self> {
127+
pub fn from_schema(schema: &DataSchema) -> Vec<Self> {
127128
schema
128129
.fields()
129130
.iter()
@@ -167,15 +168,31 @@ pub struct QueryResponse {
167168
impl QueryResponse {
168169
pub(crate) fn from_internal(
169170
id: String,
170-
r: HttpQueryResponseInternal,
171+
HttpQueryResponseInternal {
172+
data,
173+
session_id,
174+
session,
175+
node_id,
176+
result_timeout_secs,
177+
state:
178+
ResponseState {
179+
has_result_set,
180+
schema,
181+
running_time_ms,
182+
progresses,
183+
state,
184+
affect,
185+
error,
186+
warnings,
187+
},
188+
}: HttpQueryResponseInternal,
171189
is_final: bool,
172190
) -> impl IntoResponse {
173-
let state = r.state.clone();
174191
let (data, next_uri) = if is_final {
175192
(Arc::new(BlocksSerializer::empty()), None)
176193
} else {
177-
match state.state {
178-
ExecuteStateKind::Running | ExecuteStateKind::Starting => match r.data {
194+
match state {
195+
ExecuteStateKind::Running | ExecuteStateKind::Starting => match data {
179196
None => (
180197
Arc::new(BlocksSerializer::empty()),
181198
Some(make_state_uri(&id)),
@@ -192,7 +209,7 @@ impl QueryResponse {
192209
Arc::new(BlocksSerializer::empty()),
193210
Some(make_final_uri(&id)),
194211
),
195-
ExecuteStateKind::Succeeded => match r.data {
212+
ExecuteStateKind::Succeeded => match data {
196213
None => (
197214
Arc::new(BlocksSerializer::empty()),
198215
Some(make_final_uri(&id)),
@@ -208,38 +225,37 @@ impl QueryResponse {
208225
}
209226
};
210227

211-
if let Some(err) = &r.state.error {
228+
if let Some(err) = &error {
212229
metrics_incr_http_response_errors_count(err.name(), err.code());
213230
}
214231

215-
let session_id = r.session_id.clone();
216232
let stats = QueryStats {
217-
progresses: state.progresses.clone(),
218-
running_time_ms: state.running_time_ms,
233+
progresses,
234+
running_time_ms,
219235
};
220236
let rows = data.num_rows();
221237

222238
Json(QueryResponse {
223239
data,
224-
state: state.state,
225-
schema: state.schema.clone(),
240+
state,
241+
schema: QueryResponseField::from_schema(&schema),
226242
session_id: Some(session_id),
227-
node_id: r.node_id,
228-
session: r.session,
243+
node_id,
244+
session,
229245
stats,
230-
affect: state.affect,
231-
warnings: r.state.warnings,
246+
affect,
247+
warnings,
232248
id: id.clone(),
233249
next_uri,
234250
stats_uri: Some(make_state_uri(&id)),
235251
final_uri: Some(make_final_uri(&id)),
236252
kill_uri: Some(make_kill_uri(&id)),
237-
error: r.state.error.map(QueryError::from_error_code),
238-
has_result_set: r.state.has_result_set,
239-
result_timeout_secs: Some(r.result_timeout_secs),
253+
error: error.map(QueryError::from_error_code),
254+
has_result_set,
255+
result_timeout_secs: Some(result_timeout_secs),
240256
})
241-
.with_header(HEADER_QUERY_ID, id.clone())
242-
.with_header(HEADER_QUERY_STATE, state.state.to_string())
257+
.with_header(HEADER_QUERY_ID, id)
258+
.with_header(HEADER_QUERY_STATE, state.to_string())
243259
.with_header(HEADER_QUERY_PAGE_ROWS, rows)
244260
}
245261
}

src/query/service/src/servers/http/v1/query/execute_state.rs

Lines changed: 17 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -38,13 +38,12 @@ use serde::Deserialize;
3838
use serde::Serialize;
3939
use ExecuteState::*;
4040

41+
use super::http_query::ResponseState;
42+
use super::sized_spsc::SizedChannelSender;
4143
use crate::interpreters::interpreter_plan_sql;
4244
use crate::interpreters::Interpreter;
4345
use crate::interpreters::InterpreterFactory;
4446
use crate::interpreters::InterpreterQueryLog;
45-
use crate::servers::http::v1::http_query_handlers::QueryResponseField;
46-
use crate::servers::http::v1::query::http_query::ResponseState;
47-
use crate::servers::http::v1::query::sized_spsc::SizedChannelSender;
4847
use crate::sessions::AcquireQueueGuard;
4948
use crate::sessions::QueryAffect;
5049
use crate::sessions::QueryContext;
@@ -129,14 +128,14 @@ pub struct ExecuteRunning {
129128
session: Arc<Session>,
130129
// mainly used to get progress for now
131130
pub(crate) ctx: Arc<QueryContext>,
132-
schema: Vec<QueryResponseField>,
131+
schema: DataSchemaRef,
133132
has_result_set: bool,
134133
#[allow(dead_code)]
135134
queue_guard: AcquireQueueGuard,
136135
}
137136

138137
pub struct ExecuteStopped {
139-
pub schema: Vec<QueryResponseField>,
138+
pub schema: DataSchemaRef,
140139
pub has_result_set: Option<bool>,
141140
pub stats: Progresses,
142141
pub affect: Option<QueryAffect>,
@@ -192,24 +191,23 @@ impl ExecutorSessionState {
192191
impl Executor {
193192
pub fn get_response_state(&self) -> ResponseState {
194193
let (exe_state, err) = self.state.extract();
194+
let schema = match &self.state {
195+
Starting(_) => Default::default(),
196+
Running(r) => r.schema.clone(),
197+
Stopped(f) => f.schema.clone(),
198+
};
199+
195200
ResponseState {
196201
running_time_ms: self.get_query_duration_ms(),
197202
progresses: self.get_progress(),
198203
state: exe_state,
199204
error: err,
200205
warnings: self.get_warnings(),
201206
affect: self.get_affect(),
202-
schema: self.get_schema(),
207+
schema,
203208
has_result_set: self.has_result_set(),
204209
}
205210
}
206-
pub fn get_schema(&self) -> Vec<QueryResponseField> {
207-
match &self.state {
208-
Starting(_) => Default::default(),
209-
Running(r) => r.schema.clone(),
210-
Stopped(f) => f.schema.clone(),
211-
}
212-
}
213211

214212
pub fn has_result_set(&self) -> Option<bool> {
215213
match &self.state {
@@ -252,11 +250,10 @@ impl Executor {
252250
}
253251

254252
pub fn update_schema(this: &Arc<Mutex<Executor>>, schema: DataSchemaRef) {
255-
let mut guard = this.lock();
256-
match &mut guard.state {
253+
match &mut this.lock().state {
257254
Starting(_) => {}
258-
Running(r) => r.schema = QueryResponseField::from_schema(schema),
259-
Stopped(f) => f.schema = QueryResponseField::from_schema(schema),
255+
Running(r) => r.schema = schema,
256+
Stopped(f) => f.schema = schema,
260257
}
261258
}
262259

@@ -311,7 +308,7 @@ impl Executor {
311308
}
312309
ExecuteStopped {
313310
stats: Default::default(),
314-
schema: vec![],
311+
schema: Default::default(),
315312
has_result_set: None,
316313
reason: reason.clone(),
317314
session_state: ExecutorSessionState::new(s.ctx.get_current_session()),
@@ -388,9 +385,9 @@ impl ExecuteState {
388385
let is_dynamic_schema = plan.is_dynamic_schema();
389386
let schema = if has_result_set && !is_dynamic_schema {
390387
// check has_result_set first for safety
391-
QueryResponseField::from_schema(plan.schema())
388+
plan.schema()
392389
} else {
393-
vec![]
390+
Default::default()
394391
};
395392

396393
let running_state = ExecuteRunning {

src/query/service/src/servers/http/v1/query/http_query.rs

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ use databend_common_config::GlobalConfig;
3333
use databend_common_exception::ErrorCode;
3434
use databend_common_exception::Result;
3535
use databend_common_exception::ResultExt;
36+
use databend_common_expression::DataSchemaRef;
3637
use databend_common_expression::Scalar;
3738
use databend_common_metrics::http::metrics_incr_http_response_errors_count;
3839
use databend_common_settings::ScopeLevel;
@@ -51,22 +52,21 @@ use serde::Deserializer;
5152
use serde::Serialize;
5253
use serde::Serializer;
5354

55+
use super::blocks_serializer::BlocksSerializer;
56+
use super::execute_state::ExecuteStarting;
57+
use super::execute_state::ExecuteStopped;
5458
use super::execute_state::ExecutionError;
59+
use super::execute_state::ExecutorSessionState;
60+
use super::execute_state::Progresses;
5561
use super::CloseReason;
62+
use super::ExecuteState;
63+
use super::ExecuteStateKind;
64+
use super::Executor;
5665
use super::HttpQueryContext;
66+
use super::PageManager;
67+
use super::ResponseData;
68+
use super::Wait;
5769
use crate::servers::http::error::QueryError;
58-
use crate::servers::http::v1::http_query_handlers::QueryResponseField;
59-
use crate::servers::http::v1::query::blocks_serializer::BlocksSerializer;
60-
use crate::servers::http::v1::query::execute_state::ExecuteStarting;
61-
use crate::servers::http::v1::query::execute_state::ExecuteStopped;
62-
use crate::servers::http::v1::query::execute_state::ExecutorSessionState;
63-
use crate::servers::http::v1::query::execute_state::Progresses;
64-
use crate::servers::http::v1::query::ExecuteState;
65-
use crate::servers::http::v1::query::ExecuteStateKind;
66-
use crate::servers::http::v1::query::Executor;
67-
use crate::servers::http::v1::query::PageManager;
68-
use crate::servers::http::v1::query::ResponseData;
69-
use crate::servers::http::v1::query::Wait;
7070
use crate::servers::http::v1::ClientSessionManager;
7171
use crate::servers::http::v1::HttpQueryManager;
7272
use crate::servers::http::v1::QueryResponse;
@@ -488,7 +488,7 @@ pub struct StageAttachmentConf {
488488
#[derive(Debug, Clone)]
489489
pub struct ResponseState {
490490
pub has_result_set: Option<bool>,
491-
pub schema: Vec<QueryResponseField>,
491+
pub schema: DataSchemaRef,
492492
pub running_time_ms: i64,
493493
pub progresses: Progresses,
494494
pub state: ExecuteStateKind,
@@ -844,7 +844,7 @@ impl HttpQuery {
844844
.ok();
845845
let state = ExecuteStopped {
846846
stats: Progresses::default(),
847-
schema: vec![],
847+
schema: Default::default(),
848848
has_result_set: None,
849849
reason: Err(e.clone()),
850850
session_state: ExecutorSessionState::new(

src/query/service/src/servers/http/v1/query/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ pub use http_query::HttpQueryRequest;
3333
pub use http_query::HttpQueryResponseInternal;
3434
pub use http_query::HttpSessionConf;
3535
pub use http_query::HttpSessionStateInternal;
36+
pub use http_query::ResponseState;
3637
pub use http_query_context::HttpQueryContext;
3738
pub(crate) use http_query_manager::CloseReason;
3839
pub use http_query_manager::HttpQueryManager;

0 commit comments

Comments
 (0)