
Commit e2abb51

alter column's json type on demand
Signed-off-by: luofucong <[email protected]>
1 parent 7ec6570 commit e2abb51

15 files changed (+274 −65 lines)

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default.

src/common/recordbatch/Cargo.toml

Lines changed: 1 addition & 0 deletions
@@ -9,6 +9,7 @@ workspace = true
 
 [dependencies]
 arc-swap = "1.6"
+arrow-schema.workspace = true
 common-base.workspace = true
 common-error.workspace = true
 common-macro.workspace = true

src/common/recordbatch/src/lib.rs

Lines changed: 1 addition & 1 deletion
@@ -18,7 +18,7 @@ pub mod adapter;
 pub mod cursor;
 pub mod error;
 pub mod filter;
-mod recordbatch;
+pub mod recordbatch;
 pub mod util;
 
 use std::pin::Pin;

src/common/recordbatch/src/recordbatch.rs

Lines changed: 33 additions & 18 deletions
@@ -12,16 +12,19 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::borrow::Cow;
 use std::collections::HashMap;
 use std::slice;
 use std::sync::Arc;
 
+use arrow_schema::extension::ExtensionType;
 use datafusion::arrow::util::pretty::pretty_format_batches;
 use datafusion_common::arrow::array::ArrayRef;
 use datafusion_common::arrow::compute;
 use datafusion_common::arrow::datatypes::{DataType as ArrowDataType, SchemaRef as ArrowSchemaRef};
 use datatypes::arrow::array::{Array, RecordBatchOptions};
 use datatypes::arrow::datatypes::{Field, Schema};
+use datatypes::extension::json::JsonExtensionType;
 use datatypes::prelude::DataType;
 use datatypes::schema::SchemaRef;
 use datatypes::vectors::{Helper, VectorRef};
@@ -32,7 +35,7 @@ use snafu::{OptionExt, ResultExt, ensure};
 use crate::DfRecordBatch;
 use crate::error::{
     self, ArrowComputeSnafu, CastVectorSnafu, ColumnNotExistsSnafu, DataTypesSnafu,
-    ProjectArrowRecordBatchSnafu, Result, SchemaConversionSnafu,
+    ProjectArrowRecordBatchSnafu, Result,
 };
 
 /// A two-dimensional batch of column-oriented data with a defined schema.
@@ -61,8 +64,6 @@ impl RecordBatch {
         // TODO(LFC): Remove the casting here once `Batch` is no longer used.
         let arrow_arrays = Self::cast_view_arrays(schema.arrow_schema(), arrow_arrays)?;
 
-        let schema = maybe_amend_with_struct_arrays(schema, &arrow_arrays)?;
-
         let df_record_batch = DfRecordBatch::try_new(schema.arrow_schema().clone(), arrow_arrays)
             .context(error::NewDfRecordBatchSnafu)?;
 
@@ -252,30 +253,44 @@
     }
 }
 
-fn maybe_amend_with_struct_arrays(schema: SchemaRef, arrays: &[ArrayRef]) -> Result<SchemaRef> {
-    if arrays
+/// Align the schema's datatypes with the actual arrays', only if there are json arrays.
+/// Because the actual datatype of json array is determined by its values, not by the pre-defined
+/// schema. So the datatype in schema could be lagged behind the actual one in the array.
+pub fn maybe_align_schema_with_json_arrays<'a>(
+    schema: &'a ArrowSchemaRef,
+    arrays: &[ArrayRef],
+) -> Cow<'a, ArrowSchemaRef> {
+    if schema
+        .fields()
         .iter()
-        .any(|x| matches!(x.data_type(), ArrowDataType::Struct(_)))
+        .any(|f| f.extension_type_name() == Some(JsonExtensionType::NAME))
     {
-        let schema = schema.arrow_schema();
         let mut fields = Vec::with_capacity(schema.fields().len());
         for (f, a) in schema.fields().iter().zip(arrays.iter()) {
-            let field_type = f.data_type();
-            let array_type = a.data_type();
-            match (field_type, array_type) {
-                (ArrowDataType::Struct(_), ArrowDataType::Struct(x)) => {
-                    let field = Field::new_struct(f.name(), x.clone(), f.is_nullable());
-                    fields.push(Arc::new(field));
+            if f.extension_type_name() == Some(JsonExtensionType::NAME) {
+                let field_type = f.data_type();
+                let array_type = a.data_type();
+                match (field_type, array_type) {
+                    (ArrowDataType::Struct(x), ArrowDataType::Struct(y)) if x != y => {
+                        common_telemetry::debug!(
+                            "Align field {} datatype: {field_type} => {array_type}",
+                            f.name(),
+                        );
+                        let field = Field::new_struct(f.name(), y.clone(), f.is_nullable())
+                            .with_metadata(f.metadata().clone());
+                        fields.push(Arc::new(field));
+                    }
+                    _ => fields.push(f.clone()),
                 }
-            _ => fields.push(f.clone()),
+            } else {
+                fields.push(f.clone())
             }
         }
        let schema = Arc::new(Schema::new_with_metadata(fields, schema.metadata().clone()));
-        let schema = schema.try_into().context(SchemaConversionSnafu)?;
-        Ok(Arc::new(schema))
+        Cow::Owned(schema)
    } else {
-        // Fast path: must not need to amend if there are no struct arrays.
-        Ok(schema)
+        // Fast path: must not need to align if there are no json types.
+        Cow::Borrowed(schema)
    }
 }
 
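
For orientation, here is a minimal sketch of the intended call pattern for the new helper (the change to src/mito2/src/sst/parquet/format.rs further down does essentially this); the wrapper function and the plain arrow import paths are illustrative assumptions, not code from this commit.

// Sketch only: assumes the workspace crates (common_recordbatch, arrow) are available.
use arrow::array::ArrayRef;
use arrow::datatypes::SchemaRef;
use arrow::error::Result as ArrowResult;
use arrow::record_batch::RecordBatch;
use common_recordbatch::recordbatch::maybe_align_schema_with_json_arrays;

// Hypothetical wrapper showing where the alignment step slots in.
fn new_record_batch(schema: &SchemaRef, columns: Vec<ArrayRef>) -> ArrowResult<RecordBatch> {
    // Cow::Borrowed(schema) when no field carries the JSON extension type (fast path);
    // Cow::Owned(..) with JSON struct fields rewritten to match the arrays otherwise.
    let schema = maybe_align_schema_with_json_arrays(schema, &columns);
    RecordBatch::try_new(schema.as_ref().clone(), columns)
}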

src/datatypes/src/data_type.rs

Lines changed: 4 additions & 3 deletions
@@ -15,7 +15,6 @@
 use std::fmt;
 use std::sync::Arc;
 
-use arrow::compute::cast as arrow_array_cast;
 use arrow::datatypes::{
     DataType as ArrowDataType, IntervalUnit as ArrowIntervalUnit, TimeUnit as ArrowTimeUnit,
 };
@@ -368,8 +367,10 @@ impl ConcreteDataType {
 
     /// Checks if the data type can cast to another data type.
     pub fn can_arrow_type_cast_to(&self, to_type: &ConcreteDataType) -> bool {
-        let array = arrow_array::new_empty_array(&self.as_arrow_type());
-        arrow_array_cast(array.as_ref(), &to_type.as_arrow_type()).is_ok()
+        match (self, to_type) {
+            (ConcreteDataType::Json(this), ConcreteDataType::Json(that)) => that.is_include(this),
+            _ => arrow::compute::can_cast_types(&self.as_arrow_type(), &to_type.as_arrow_type()),
+        }
     }
 
     /// Try to cast data type as a [`DurationType`].
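
In other words, JSON-to-JSON casts are no longer answered by arrow at all: the target type must include every field of the source type, while all other type pairs now go through arrow's static can_cast_types check instead of building an empty array and attempting a real cast. A rough standalone illustration follows; the JSON arm is described only in comments because constructing a native JsonType is outside this diff.

// Standalone sketch of the two arms of the new check; uses the arrow crate directly.
use arrow::compute::can_cast_types;
use arrow::datatypes::DataType;

fn main() {
    // Non-JSON pairs: same outcome as before, but via a cheap static lookup.
    assert!(can_cast_types(&DataType::Int64, &DataType::Utf8));
    assert!(can_cast_types(&DataType::Int32, &DataType::Int64));

    // JSON pairs: decided by JsonType::is_include on the target type, e.g. a column of
    // type Json{a: Int64} may be altered to Json{a: Int64, b: Utf8} (the target includes
    // every existing field), but not the other way around.
}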

src/datatypes/src/schema.rs

Lines changed: 3 additions & 2 deletions
@@ -273,8 +273,9 @@ fn collect_fields(column_schemas: &[ColumnSchema]) -> Result<FieldsAndIndices> {
             _ => None,
         };
         if let Some(extype) = extype {
-            let metadata = HashMap::from([(TYPE_KEY.to_string(), extype.to_string())]);
-            field = field.with_metadata(metadata);
+            field
+                .metadata_mut()
+                .insert(TYPE_KEY.to_string(), extype.to_string());
         }
         fields.push(field);
         ensure!(

src/datatypes/src/types/json_type.rs

Lines changed: 55 additions & 5 deletions
@@ -13,11 +13,11 @@
 // limitations under the License.
 
 use std::collections::{BTreeMap, HashMap};
+use std::fmt::{Display, Formatter};
 use std::str::FromStr;
 use std::sync::Arc;
 
 use arrow::datatypes::DataType as ArrowDataType;
-use arrow_schema::Fields;
 use common_base::bytes::Bytes;
 use serde::{Deserialize, Serialize};
 use snafu::ResultExt;
@@ -68,7 +68,7 @@ impl JsonType {
     /// - if not, the json is one of bool, number, string or array, make it a special field called
     ///   [JSON_PLAIN_FIELD_NAME] with metadata [JSON_PLAIN_FIELD_METADATA_KEY] = `"true"` in a
     ///   struct with only that field.
-    pub(crate) fn as_struct_type(&self) -> StructType {
+    pub fn as_struct_type(&self) -> StructType {
         match &self.format {
             JsonFormat::Jsonb => StructType::default(),
             JsonFormat::Native(inner) => match inner.as_ref() {
@@ -98,7 +98,7 @@
     }
 
     /// Try to merge this json type with others, error on datatype conflict.
-    pub(crate) fn merge(&mut self, other: &JsonType) -> Result<()> {
+    pub fn merge(&mut self, other: &JsonType) -> Result<()> {
         match (&self.format, &other.format) {
             (JsonFormat::Jsonb, JsonFormat::Jsonb) => Ok(()),
             (JsonFormat::Native(this), JsonFormat::Native(that)) => {
@@ -113,7 +113,8 @@
         }
     }
 
-    pub(crate) fn is_mergeable(&self, other: &JsonType) -> bool {
+    /// Check if it can merge with `other` json type.
+    pub fn is_mergeable(&self, other: &JsonType) -> bool {
         match (&self.format, &other.format) {
             (JsonFormat::Jsonb, JsonFormat::Jsonb) => true,
             (JsonFormat::Native(this), JsonFormat::Native(that)) => {
@@ -122,6 +123,49 @@
             _ => false,
         }
     }
+
+    /// Check if it includes all fields in `other` json type.
+    pub fn is_include(&self, other: &JsonType) -> bool {
+        match (&self.format, &other.format) {
+            (JsonFormat::Jsonb, JsonFormat::Jsonb) => true,
+            (JsonFormat::Native(this), JsonFormat::Native(that)) => {
+                is_include(this.as_ref(), that.as_ref())
+            }
+            _ => false,
+        }
+    }
+}
+
+fn is_include(this: &ConcreteDataType, that: &ConcreteDataType) -> bool {
+    fn is_include_struct(this: &StructType, that: &StructType) -> bool {
+        let this_fields = this.fields();
+        let this_fields = this_fields
+            .iter()
+            .map(|x| (x.name(), x))
+            .collect::<HashMap<_, _>>();
+
+        for that_field in that.fields().iter() {
+            let Some(this_field) = this_fields.get(that_field.name()) else {
+                return false;
+            };
+            if !is_include(this_field.data_type(), that_field.data_type()) {
+                return false;
+            }
+        }
+        true
+    }
+
+    match (this, that) {
+        (this, that) if this == that => true,
+        (ConcreteDataType::List(this), ConcreteDataType::List(that)) => {
+            is_include(this.item_type(), that.item_type())
+        }
+        (ConcreteDataType::Struct(this), ConcreteDataType::Struct(that)) => {
+            is_include_struct(this, that)
+        }
+        (_, ConcreteDataType::Null(_)) => true,
+        _ => false,
+    }
 }
 
 fn is_mergeable(this: &ConcreteDataType, that: &ConcreteDataType) -> bool {
@@ -223,7 +267,7 @@ impl DataType for JsonType {
     fn as_arrow_type(&self) -> ArrowDataType {
         match self.format {
             JsonFormat::Jsonb => ArrowDataType::Binary,
-            JsonFormat::Native(_) => ArrowDataType::Struct(Fields::empty()),
+            JsonFormat::Native(_) => self.as_struct_type().as_arrow_type(),
         }
     }
 
@@ -242,6 +286,12 @@
     }
 }
 
+impl Display for JsonType {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{}", self.name())
+    }
+}
+
 /// Converts a json type value to string
 pub fn jsonb_to_string(val: &[u8]) -> Result<String> {
     match jsonb::from_slice(val) {
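
To make the inclusion rule concrete, here is a small self-contained model of the recursion above on a simplified type enum; it mirrors the logic of is_include but is an illustration only, not the datatypes crate's actual API.

// Simplified, standalone model of the inclusion check (illustration only).
use std::collections::HashMap;

#[derive(PartialEq)]
enum Ty {
    Null,
    Int64,
    Utf8,
    List(Box<Ty>),
    Struct(Vec<(String, Ty)>),
}

fn is_include(this: &Ty, that: &Ty) -> bool {
    match (this, that) {
        (this, that) if this == that => true,
        (Ty::List(a), Ty::List(b)) => is_include(a, b),
        (Ty::Struct(a), Ty::Struct(b)) => {
            let a: HashMap<_, _> = a.iter().map(|(n, t)| (n.as_str(), t)).collect();
            // `this` includes `that` only if every field of `that` exists in `this`
            // with a type that itself includes the field's type.
            b.iter()
                .all(|(n, t)| a.get(n.as_str()).is_some_and(|x| is_include(x, t)))
        }
        // A null-typed value fits into any concrete type.
        (_, Ty::Null) => true,
        _ => false,
    }
}

fn main() {
    let existing = Ty::Struct(vec![("a".to_string(), Ty::Int64)]);
    let altered = Ty::Struct(vec![("a".to_string(), Ty::Int64), ("b".to_string(), Ty::Utf8)]);
    assert!(is_include(&altered, &existing)); // widening the JSON column is allowed
    assert!(!is_include(&existing, &altered)); // narrowing would drop field "b"
}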

src/datatypes/src/vectors/struct_vector.rs

Lines changed: 13 additions & 10 deletions
@@ -379,10 +379,8 @@
             },
             StructValueRef::Ref(val) => self.push_struct_value(val)?,
             StructValueRef::RefList { val, fields } => {
-                let struct_value = StructValue::try_new(
-                    val.iter().map(|v| Value::from(v.clone())).collect(),
-                    fields.clone(),
-                )?;
+                let struct_value =
+                    StructValue::try_new(val.into_iter().map(Value::from).collect(), fields)?;
                 self.push_struct_value(&struct_value)?;
             }
         }
@@ -429,12 +427,17 @@
             .value_builders
             .iter_mut()
             .map(|b| b.to_vector().to_arrow_array())
-            .collect();
-        let struct_array = StructArray::new(
-            self.fields.as_arrow_fields(),
-            arrays,
-            self.null_buffer.finish(),
-        );
+            .collect::<Vec<_>>();
+
+        let struct_array = if arrays.is_empty() {
+            StructArray::new_empty_fields(self.len(), self.null_buffer.finish())
+        } else {
+            StructArray::new(
+                self.fields.as_arrow_fields(),
+                arrays,
+                self.null_buffer.finish(),
+            )
+        };
 
         StructVector::try_new(self.fields.clone(), struct_array).unwrap()
     }
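
The new branch is needed because StructArray::new takes its row count from the child arrays, and a builder with zero fields has no children to take it from; new_empty_fields carries the length explicitly. A small standalone sketch against the arrow crate (API names follow current arrow-rs and are assumptions, not code from this commit):

// Standalone sketch of the arrow APIs involved (assumes a recent arrow-rs).
use std::sync::Arc;

use arrow::array::{Array, ArrayRef, Int64Array, StructArray};
use arrow::datatypes::{DataType, Field, Fields};

fn main() {
    // A struct array with zero fields still needs a row count; it is given explicitly.
    let empty = StructArray::new_empty_fields(3, None);
    assert_eq!(empty.len(), 3);
    assert_eq!(empty.num_columns(), 0);

    // The regular constructor takes its length from the child arrays instead.
    let fields = Fields::from(vec![Field::new("a", DataType::Int64, true)]);
    let children: Vec<ArrayRef> = vec![Arc::new(Int64Array::from(vec![1, 2, 3]))];
    let non_empty = StructArray::new(fields, children, None);
    assert_eq!(non_empty.len(), 3);
}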

src/mito2/src/sst/parquet/format.rs

Lines changed: 6 additions & 1 deletion
@@ -96,6 +96,7 @@ impl PrimaryKeyWriteFormat {
     }
 
     /// Gets the arrow schema to store in parquet.
+    #[cfg(test)]
     pub(crate) fn arrow_schema(&self) -> &SchemaRef {
         &self.arrow_schema
     }
@@ -135,7 +136,11 @@
         }
         columns.push(batch.op_types().to_arrow_array());
 
-        RecordBatch::try_new(self.arrow_schema.clone(), columns).context(NewRecordBatchSnafu)
+        let schema = common_recordbatch::recordbatch::maybe_align_schema_with_json_arrays(
+            &self.arrow_schema,
+            &columns,
+        );
+        RecordBatch::try_new(schema.as_ref().clone(), columns).context(NewRecordBatchSnafu)
     }
 }
 

src/mito2/src/sst/parquet/writer.rs

Lines changed: 1 addition & 1 deletion
@@ -411,7 +411,7 @@ where
             let arrow_batch = write_format.convert_batch(&batch)?;
 
             let start = Instant::now();
-            self.maybe_init_writer(write_format.arrow_schema(), opts)
+            self.maybe_init_writer(arrow_batch.schema_ref(), opts)
                 .await?
                 .write(&arrow_batch)
                 .await
