Skip to content

Commit a9a3e0b

Browse files
authored
fix: prom ql logical plan use column index not name (#7109)
* feat: use index not col name Signed-off-by: discord9 <[email protected]> * fix: use name without qualifier&output schema fix Signed-off-by: discord9 <[email protected]> * proto Signed-off-by: discord9 <[email protected]> * refactor: resolve column name/index Signed-off-by: discord9 <[email protected]> * pcr Signed-off-by: discord9 <[email protected]> * chore: update proto Signed-off-by: discord9 <[email protected]> * chore: update proto Signed-off-by: discord9 <[email protected]> --------- Signed-off-by: discord9 <[email protected]>
1 parent 41ce100 commit a9a3e0b

File tree

11 files changed

+576
-110
lines changed

11 files changed

+576
-110
lines changed

Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ etcd-client = { git = "https://github.com/GreptimeTeam/etcd-client", rev = "f62d
147147
fst = "0.4.7"
148148
futures = "0.3"
149149
futures-util = "0.3"
150-
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "69a6089933daa573c96808ec4bbc48f447ec6e8c" }
150+
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "72a0d22e0f5f716b2ee21bca091f87a88c36e5ca" }
151151
hex = "0.4"
152152
http = "1"
153153
humantime = "2.1"

src/promql/src/extension_plan.rs

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ mod union_distinct_on;
2727

2828
pub use absent::{Absent, AbsentExec, AbsentStream};
2929
use datafusion::arrow::datatypes::{ArrowPrimitiveType, TimestampMillisecondType};
30+
use datafusion::common::DFSchemaRef;
31+
use datafusion::error::{DataFusionError, Result as DataFusionResult};
3032
pub use empty_metric::{EmptyMetric, EmptyMetricExec, EmptyMetricStream, build_special_time_expr};
3133
pub use histogram_fold::{HistogramFold, HistogramFoldExec, HistogramFoldStream};
3234
pub use instant_manipulate::{InstantManipulate, InstantManipulateExec, InstantManipulateStream};
@@ -40,3 +42,44 @@ pub use union_distinct_on::{UnionDistinctOn, UnionDistinctOnExec, UnionDistinctO
4042
pub type Millisecond = <TimestampMillisecondType as ArrowPrimitiveType>::Native;
4143

4244
const METRIC_NUM_SERIES: &str = "num_series";
45+
46+
/// Utilities for handling unfix logic in extension plans
47+
/// Convert column name to index for serialization
48+
pub fn serialize_column_index(schema: &DFSchemaRef, column_name: &str) -> u64 {
49+
schema
50+
.index_of_column_by_name(None, column_name)
51+
.map(|idx| idx as u64)
52+
.unwrap_or(u64::MAX) // make sure if not found, it will report error in deserialization
53+
}
54+
55+
/// Convert index back to column name for deserialization
56+
pub fn resolve_column_name(
57+
index: u64,
58+
schema: &DFSchemaRef,
59+
context: &str,
60+
column_type: &str,
61+
) -> DataFusionResult<String> {
62+
let columns = schema.columns();
63+
columns
64+
.get(index as usize)
65+
.ok_or_else(|| {
66+
DataFusionError::Internal(format!(
67+
"Failed to get {} column at idx {} during unfixing {} with columns:{:?}",
68+
column_type, index, context, columns
69+
))
70+
})
71+
.map(|field| field.name().to_string())
72+
}
73+
74+
/// Batch process multiple column indices
75+
pub fn resolve_column_names(
76+
indices: &[u64],
77+
schema: &DFSchemaRef,
78+
context: &str,
79+
column_type: &str,
80+
) -> DataFusionResult<Vec<String>> {
81+
indices
82+
.iter()
83+
.map(|idx| resolve_column_name(*idx, schema, context, column_type))
84+
.collect()
85+
}

src/promql/src/extension_plan/absent.rs

Lines changed: 72 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ use prost::Message;
4747
use snafu::ResultExt;
4848

4949
use crate::error::DeserializeSnafu;
50-
use crate::extension_plan::Millisecond;
50+
use crate::extension_plan::{Millisecond, resolve_column_name, serialize_column_index};
5151

5252
/// Maximum number of rows per output batch
5353
const ABSENT_BATCH_SIZE: usize = 8192;
@@ -62,6 +62,13 @@ pub struct Absent {
6262
fake_labels: Vec<(String, String)>,
6363
input: LogicalPlan,
6464
output_schema: DFSchemaRef,
65+
unfix: Option<UnfixIndices>,
66+
}
67+
68+
#[derive(Debug, PartialEq, Eq, Hash, PartialOrd)]
69+
struct UnfixIndices {
70+
pub time_index_column_idx: u64,
71+
pub value_column_idx: u64,
6572
}
6673

6774
impl PartialOrd for Absent {
@@ -122,16 +129,44 @@ impl UserDefinedLogicalNodeCore for Absent {
122129
));
123130
}
124131

125-
Ok(Self {
126-
start: self.start,
127-
end: self.end,
128-
step: self.step,
129-
time_index_column: self.time_index_column.clone(),
130-
value_column: self.value_column.clone(),
131-
fake_labels: self.fake_labels.clone(),
132-
input: inputs[0].clone(),
133-
output_schema: self.output_schema.clone(),
134-
})
132+
let input: LogicalPlan = inputs[0].clone();
133+
let input_schema = input.schema();
134+
135+
if let Some(unfix) = &self.unfix {
136+
// transform indices to names
137+
let time_index_column = resolve_column_name(
138+
unfix.time_index_column_idx,
139+
input_schema,
140+
"Absent",
141+
"time index",
142+
)?;
143+
144+
let value_column =
145+
resolve_column_name(unfix.value_column_idx, input_schema, "Absent", "value")?;
146+
147+
// Recreate output schema with actual field names
148+
Self::try_new(
149+
self.start,
150+
self.end,
151+
self.step,
152+
time_index_column,
153+
value_column,
154+
self.fake_labels.clone(),
155+
input,
156+
)
157+
} else {
158+
Ok(Self {
159+
start: self.start,
160+
end: self.end,
161+
step: self.step,
162+
time_index_column: self.time_index_column.clone(),
163+
value_column: self.value_column.clone(),
164+
fake_labels: self.fake_labels.clone(),
165+
input,
166+
output_schema: self.output_schema.clone(),
167+
unfix: None,
168+
})
169+
}
135170
}
136171
}
137172

@@ -179,6 +214,7 @@ impl Absent {
179214
fake_labels,
180215
input,
181216
output_schema,
217+
unfix: None,
182218
})
183219
}
184220

@@ -209,12 +245,17 @@ impl Absent {
209245
}
210246

211247
pub fn serialize(&self) -> Vec<u8> {
248+
let time_index_column_idx =
249+
serialize_column_index(self.input.schema(), &self.time_index_column);
250+
251+
let value_column_idx = serialize_column_index(self.input.schema(), &self.value_column);
252+
212253
pb::Absent {
213254
start: self.start,
214255
end: self.end,
215256
step: self.step,
216-
time_index_column: self.time_index_column.clone(),
217-
value_column: self.value_column.clone(),
257+
time_index_column_idx,
258+
value_column_idx,
218259
fake_labels: self
219260
.fake_labels
220261
.iter()
@@ -223,6 +264,7 @@ impl Absent {
223264
value: value.clone(),
224265
})
225266
.collect(),
267+
..Default::default()
226268
}
227269
.encode_to_vec()
228270
}
@@ -233,19 +275,27 @@ impl Absent {
233275
produce_one_row: false,
234276
schema: Arc::new(DFSchema::empty()),
235277
});
236-
Self::try_new(
237-
pb_absent.start,
238-
pb_absent.end,
239-
pb_absent.step,
240-
pb_absent.time_index_column,
241-
pb_absent.value_column,
242-
pb_absent
278+
279+
let unfix = UnfixIndices {
280+
time_index_column_idx: pb_absent.time_index_column_idx,
281+
value_column_idx: pb_absent.value_column_idx,
282+
};
283+
284+
Ok(Self {
285+
start: pb_absent.start,
286+
end: pb_absent.end,
287+
step: pb_absent.step,
288+
time_index_column: String::new(),
289+
value_column: String::new(),
290+
fake_labels: pb_absent
243291
.fake_labels
244292
.iter()
245293
.map(|label| (label.key.clone(), label.value.clone()))
246294
.collect(),
247-
placeholder_plan,
248-
)
295+
input: placeholder_plan,
296+
output_schema: Arc::new(DFSchema::empty()),
297+
unfix: Some(unfix),
298+
})
249299
}
250300
}
251301

src/promql/src/extension_plan/instant_manipulate.rs

Lines changed: 75 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,9 @@ use prost::Message;
4141
use snafu::ResultExt;
4242

4343
use crate::error::{DeserializeSnafu, Result};
44-
use crate::extension_plan::{METRIC_NUM_SERIES, Millisecond};
44+
use crate::extension_plan::{
45+
METRIC_NUM_SERIES, Millisecond, resolve_column_name, serialize_column_index,
46+
};
4547
use crate::metrics::PROMQL_SERIES_COUNT;
4648

4749
/// Manipulate the input record batch to make it suitable for Instant Operator.
@@ -59,6 +61,13 @@ pub struct InstantManipulate {
5961
/// A optional column for validating staleness
6062
field_column: Option<String>,
6163
input: LogicalPlan,
64+
unfix: Option<UnfixIndices>,
65+
}
66+
67+
#[derive(Debug, PartialEq, Eq, Hash, PartialOrd)]
68+
struct UnfixIndices {
69+
pub time_index_idx: u64,
70+
pub field_index_idx: u64,
6271
}
6372

6473
impl UserDefinedLogicalNodeCore for InstantManipulate {
@@ -97,15 +106,51 @@ impl UserDefinedLogicalNodeCore for InstantManipulate {
97106
));
98107
}
99108

100-
Ok(Self {
101-
start: self.start,
102-
end: self.end,
103-
lookback_delta: self.lookback_delta,
104-
interval: self.interval,
105-
time_index_column: self.time_index_column.clone(),
106-
field_column: self.field_column.clone(),
107-
input: inputs.into_iter().next().unwrap(),
108-
})
109+
let input: LogicalPlan = inputs.into_iter().next().unwrap();
110+
let input_schema = input.schema();
111+
112+
if let Some(unfix) = &self.unfix {
113+
// transform indices to names
114+
let time_index_column = resolve_column_name(
115+
unfix.time_index_idx,
116+
input_schema,
117+
"InstantManipulate",
118+
"time index",
119+
)?;
120+
121+
let field_column = if unfix.field_index_idx == u64::MAX {
122+
None
123+
} else {
124+
Some(resolve_column_name(
125+
unfix.field_index_idx,
126+
input_schema,
127+
"InstantManipulate",
128+
"field",
129+
)?)
130+
};
131+
132+
Ok(Self {
133+
start: self.start,
134+
end: self.end,
135+
lookback_delta: self.lookback_delta,
136+
interval: self.interval,
137+
time_index_column,
138+
field_column,
139+
input,
140+
unfix: None,
141+
})
142+
} else {
143+
Ok(Self {
144+
start: self.start,
145+
end: self.end,
146+
lookback_delta: self.lookback_delta,
147+
interval: self.interval,
148+
time_index_column: self.time_index_column.clone(),
149+
field_column: self.field_column.clone(),
150+
input,
151+
unfix: None,
152+
})
153+
}
109154
}
110155
}
111156

@@ -127,6 +172,7 @@ impl InstantManipulate {
127172
time_index_column,
128173
field_column,
129174
input,
175+
unfix: None,
130176
}
131177
}
132178

@@ -148,13 +194,22 @@ impl InstantManipulate {
148194
}
149195

150196
pub fn serialize(&self) -> Vec<u8> {
197+
let time_index_idx = serialize_column_index(self.input.schema(), &self.time_index_column);
198+
199+
let field_index_idx = self
200+
.field_column
201+
.as_ref()
202+
.map(|name| serialize_column_index(self.input.schema(), name))
203+
.unwrap_or(u64::MAX);
204+
151205
pb::InstantManipulate {
152206
start: self.start,
153207
end: self.end,
154208
interval: self.interval,
155209
lookback_delta: self.lookback_delta,
156-
time_index: self.time_index_column.clone(),
157-
field_index: self.field_column.clone().unwrap_or_default(),
210+
time_index_idx,
211+
field_index_idx,
212+
..Default::default()
158213
}
159214
.encode_to_vec()
160215
}
@@ -166,19 +221,21 @@ impl InstantManipulate {
166221
produce_one_row: false,
167222
schema: Arc::new(DFSchema::empty()),
168223
});
169-
let field_column = if pb_instant_manipulate.field_index.is_empty() {
170-
None
171-
} else {
172-
Some(pb_instant_manipulate.field_index)
224+
225+
let unfix = UnfixIndices {
226+
time_index_idx: pb_instant_manipulate.time_index_idx,
227+
field_index_idx: pb_instant_manipulate.field_index_idx,
173228
};
229+
174230
Ok(Self {
175231
start: pb_instant_manipulate.start,
176232
end: pb_instant_manipulate.end,
177233
lookback_delta: pb_instant_manipulate.lookback_delta,
178234
interval: pb_instant_manipulate.interval,
179-
time_index_column: pb_instant_manipulate.time_index,
180-
field_column,
235+
time_index_column: String::new(),
236+
field_column: None,
181237
input: placeholder_plan,
238+
unfix: Some(unfix),
182239
})
183240
}
184241
}

0 commit comments

Comments
 (0)