Skip to content

Commit eb62b69

Browse files
feat: basic json read and write
Signed-off-by: luofucong <[email protected]>
1 parent ff99bce commit eb62b69

File tree

29 files changed

+618
-246
lines changed

29 files changed

+618
-246
lines changed

src/api/src/helper.rs

Lines changed: 13 additions & 131 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,9 @@ use common_time::{Date, IntervalDayTime, IntervalMonthDayNano, IntervalYearMonth
2323
use datatypes::json::value::{JsonNumber, JsonValue, JsonValueRef, JsonVariant};
2424
use datatypes::prelude::{ConcreteDataType, ValueRef};
2525
use datatypes::types::{
26-
IntervalType, JsonFormat, StructField, StructType, TimeType, TimestampType,
27-
};
28-
use datatypes::value::{
29-
ListValue, ListValueRef, OrderedF32, OrderedF64, StructValue, StructValueRef, Value,
26+
IntervalType, JsonFormat, JsonType, StructField, StructType, TimeType, TimestampType,
3027
};
28+
use datatypes::value::{ListValueRef, OrderedF32, OrderedF64, StructValueRef, Value};
3129
use datatypes::vectors::VectorRef;
3230
use greptime_proto::v1::column_data_type_extension::TypeExt;
3331
use greptime_proto::v1::ddl_request::Expr;
@@ -127,6 +125,7 @@ impl From<ColumnDataTypeWrapper> for ConcreteDataType {
127125
};
128126
ConcreteDataType::json_native_datatype(inner_type.into())
129127
}
128+
None => ConcreteDataType::Json(JsonType::empty()),
130129
_ => {
131130
// invalid state, type extension is missing or invalid
132131
ConcreteDataType::null_datatype()
@@ -441,19 +440,22 @@ impl TryFrom<ConcreteDataType> for ColumnDataTypeWrapper {
441440
JsonFormat::Jsonb => Some(ColumnDataTypeExtension {
442441
type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())),
443442
}),
444-
JsonFormat::Native(inner) => {
443+
JsonFormat::Native(inner) => (!ConcreteDataType::from(inner.as_ref())
444+
.is_null())
445+
.then(|| {
445446
let inner_type = ColumnDataTypeWrapper::try_from(
446447
ConcreteDataType::from(inner.as_ref()),
447448
)?;
448-
Some(ColumnDataTypeExtension {
449+
Ok(ColumnDataTypeExtension {
449450
type_ext: Some(TypeExt::JsonNativeType(Box::new(
450451
JsonNativeTypeExtension {
451452
datatype: inner_type.datatype.into(),
452453
datatype_extension: inner_type.datatype_ext.map(Box::new),
453454
},
454455
))),
455456
})
456-
}
457+
})
458+
.transpose()?,
457459
}
458460
} else {
459461
None
@@ -882,116 +884,11 @@ pub fn is_column_type_value_eq(
882884
ColumnDataTypeWrapper::try_new(type_value, type_extension)
883885
.map(|wrapper| {
884886
let datatype = ConcreteDataType::from(wrapper);
885-
expect_type == &datatype
887+
expect_type.is_json() && datatype.is_json() || &datatype == expect_type
886888
})
887889
.unwrap_or(false)
888890
}
889891

890-
/// Convert value into proto's value.
891-
pub fn to_proto_value(value: Value) -> v1::Value {
892-
match value {
893-
Value::Null => v1::Value { value_data: None },
894-
Value::Boolean(v) => v1::Value {
895-
value_data: Some(ValueData::BoolValue(v)),
896-
},
897-
Value::UInt8(v) => v1::Value {
898-
value_data: Some(ValueData::U8Value(v.into())),
899-
},
900-
Value::UInt16(v) => v1::Value {
901-
value_data: Some(ValueData::U16Value(v.into())),
902-
},
903-
Value::UInt32(v) => v1::Value {
904-
value_data: Some(ValueData::U32Value(v)),
905-
},
906-
Value::UInt64(v) => v1::Value {
907-
value_data: Some(ValueData::U64Value(v)),
908-
},
909-
Value::Int8(v) => v1::Value {
910-
value_data: Some(ValueData::I8Value(v.into())),
911-
},
912-
Value::Int16(v) => v1::Value {
913-
value_data: Some(ValueData::I16Value(v.into())),
914-
},
915-
Value::Int32(v) => v1::Value {
916-
value_data: Some(ValueData::I32Value(v)),
917-
},
918-
Value::Int64(v) => v1::Value {
919-
value_data: Some(ValueData::I64Value(v)),
920-
},
921-
Value::Float32(v) => v1::Value {
922-
value_data: Some(ValueData::F32Value(*v)),
923-
},
924-
Value::Float64(v) => v1::Value {
925-
value_data: Some(ValueData::F64Value(*v)),
926-
},
927-
Value::String(v) => v1::Value {
928-
value_data: Some(ValueData::StringValue(v.as_utf8().to_string())),
929-
},
930-
Value::Binary(v) => v1::Value {
931-
value_data: Some(ValueData::BinaryValue(v.to_vec())),
932-
},
933-
Value::Date(v) => v1::Value {
934-
value_data: Some(ValueData::DateValue(v.val())),
935-
},
936-
Value::Timestamp(v) => match v.unit() {
937-
TimeUnit::Second => v1::Value {
938-
value_data: Some(ValueData::TimestampSecondValue(v.value())),
939-
},
940-
TimeUnit::Millisecond => v1::Value {
941-
value_data: Some(ValueData::TimestampMillisecondValue(v.value())),
942-
},
943-
TimeUnit::Microsecond => v1::Value {
944-
value_data: Some(ValueData::TimestampMicrosecondValue(v.value())),
945-
},
946-
TimeUnit::Nanosecond => v1::Value {
947-
value_data: Some(ValueData::TimestampNanosecondValue(v.value())),
948-
},
949-
},
950-
Value::Time(v) => match v.unit() {
951-
TimeUnit::Second => v1::Value {
952-
value_data: Some(ValueData::TimeSecondValue(v.value())),
953-
},
954-
TimeUnit::Millisecond => v1::Value {
955-
value_data: Some(ValueData::TimeMillisecondValue(v.value())),
956-
},
957-
TimeUnit::Microsecond => v1::Value {
958-
value_data: Some(ValueData::TimeMicrosecondValue(v.value())),
959-
},
960-
TimeUnit::Nanosecond => v1::Value {
961-
value_data: Some(ValueData::TimeNanosecondValue(v.value())),
962-
},
963-
},
964-
Value::IntervalYearMonth(v) => v1::Value {
965-
value_data: Some(ValueData::IntervalYearMonthValue(v.to_i32())),
966-
},
967-
Value::IntervalDayTime(v) => v1::Value {
968-
value_data: Some(ValueData::IntervalDayTimeValue(v.to_i64())),
969-
},
970-
Value::IntervalMonthDayNano(v) => v1::Value {
971-
value_data: Some(ValueData::IntervalMonthDayNanoValue(
972-
convert_month_day_nano_to_pb(v),
973-
)),
974-
},
975-
Value::Decimal128(v) => v1::Value {
976-
value_data: Some(ValueData::Decimal128Value(convert_to_pb_decimal128(v))),
977-
},
978-
Value::List(list_value) => v1::Value {
979-
value_data: Some(ValueData::ListValue(v1::ListValue {
980-
items: convert_list_to_pb_values(list_value),
981-
})),
982-
},
983-
Value::Struct(struct_value) => v1::Value {
984-
value_data: Some(ValueData::StructValue(v1::StructValue {
985-
items: convert_struct_to_pb_values(struct_value),
986-
})),
987-
},
988-
Value::Json(v) => v1::Value {
989-
value_data: Some(ValueData::JsonValue(encode_json_value(*v))),
990-
},
991-
Value::Duration(_) => v1::Value { value_data: None },
992-
}
993-
}
994-
995892
fn encode_json_value(value: JsonValue) -> v1::JsonValue {
996893
fn helper(json: JsonVariant) -> v1::JsonValue {
997894
let value = match json {
@@ -1052,22 +949,6 @@ fn decode_json_value(value: &v1::JsonValue) -> JsonValueRef<'_> {
1052949
}
1053950
}
1054951

1055-
fn convert_list_to_pb_values(list_value: ListValue) -> Vec<v1::Value> {
1056-
list_value
1057-
.take_items()
1058-
.into_iter()
1059-
.map(to_proto_value)
1060-
.collect()
1061-
}
1062-
1063-
fn convert_struct_to_pb_values(struct_value: StructValue) -> Vec<v1::Value> {
1064-
struct_value
1065-
.take_items()
1066-
.into_iter()
1067-
.map(to_proto_value)
1068-
.collect()
1069-
}
1070-
1071952
/// Returns the [ColumnDataTypeWrapper] of the value.
1072953
///
1073954
/// If value is null, returns `None`.
@@ -1269,6 +1150,7 @@ mod tests {
12691150
use common_time::interval::IntervalUnit;
12701151
use datatypes::scalars::ScalarVector;
12711152
use datatypes::types::{Int8Type, Int32Type, UInt8Type, UInt32Type};
1153+
use datatypes::value::{ListValue, StructValue};
12721154
use datatypes::vectors::{
12731155
BooleanVector, DateVector, Float32Vector, PrimitiveVector, StringVector,
12741156
};
@@ -1872,7 +1754,7 @@ mod tests {
18721754
Arc::new(ConcreteDataType::boolean_datatype()),
18731755
));
18741756

1875-
let pb_value = to_proto_value(value);
1757+
let pb_value = value_to_grpc_value(value);
18761758

18771759
match pb_value.value_data.unwrap() {
18781760
ValueData::ListValue(pb_list_value) => {
@@ -1901,7 +1783,7 @@ mod tests {
19011783
.unwrap(),
19021784
);
19031785

1904-
let pb_value = to_proto_value(value);
1786+
let pb_value = value_to_grpc_value(value);
19051787

19061788
match pb_value.value_data.unwrap() {
19071789
ValueData::StructValue(pb_struct_value) => {

src/common/recordbatch/src/error.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,13 @@ pub enum Error {
188188
#[snafu(implicit)]
189189
location: Location,
190190
},
191+
192+
#[snafu(display("Failed to align JSON array, reason: {reason}"))]
193+
AlignJsonArray {
194+
reason: String,
195+
#[snafu(implicit)]
196+
location: Location,
197+
},
191198
}
192199

193200
impl ErrorExt for Error {
@@ -203,7 +210,8 @@ impl ErrorExt for Error {
203210
| Error::ToArrowScalar { .. }
204211
| Error::ProjectArrowRecordBatch { .. }
205212
| Error::PhysicalExpr { .. }
206-
| Error::RecordBatchSliceIndexOverflow { .. } => StatusCode::Internal,
213+
| Error::RecordBatchSliceIndexOverflow { .. }
214+
| Error::AlignJsonArray { .. } => StatusCode::Internal,
207215

208216
Error::PollStream { .. } => StatusCode::EngineExecuteQuery,
209217

src/common/recordbatch/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ pub mod adapter;
1818
pub mod cursor;
1919
pub mod error;
2020
pub mod filter;
21-
mod recordbatch;
21+
pub mod recordbatch;
2222
pub mod util;
2323

2424
use std::fmt;

src/common/recordbatch/src/recordbatch.rs

Lines changed: 94 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,8 @@ use datafusion::arrow::util::pretty::pretty_format_batches;
2020
use datafusion_common::arrow::array::ArrayRef;
2121
use datafusion_common::arrow::compute;
2222
use datafusion_common::arrow::datatypes::{DataType as ArrowDataType, SchemaRef as ArrowSchemaRef};
23-
use datatypes::arrow::array::{Array, AsArray, RecordBatchOptions};
23+
use datatypes::arrow::array::{Array, AsArray, RecordBatchOptions, StructArray, new_null_array};
24+
use datatypes::extension::json::is_json_extension_type;
2425
use datatypes::prelude::DataType;
2526
use datatypes::schema::SchemaRef;
2627
use datatypes::vectors::{Helper, VectorRef};
@@ -30,8 +31,8 @@ use snafu::{OptionExt, ResultExt, ensure};
3031

3132
use crate::DfRecordBatch;
3233
use crate::error::{
33-
self, ArrowComputeSnafu, ColumnNotExistsSnafu, DataTypesSnafu, ProjectArrowRecordBatchSnafu,
34-
Result,
34+
self, AlignJsonArraySnafu, ArrowComputeSnafu, ColumnNotExistsSnafu, DataTypesSnafu,
35+
NewDfRecordBatchSnafu, ProjectArrowRecordBatchSnafu, Result,
3536
};
3637

3738
/// A two-dimensional batch of column-oriented data with a defined schema.
@@ -59,6 +60,8 @@ impl RecordBatch {
5960
// TODO(LFC): Remove the casting here once `Batch` is no longer used.
6061
let arrow_arrays = Self::cast_view_arrays(schema.arrow_schema(), arrow_arrays)?;
6162

63+
let arrow_arrays = maybe_align_json_array_with_schema(schema.arrow_schema(), arrow_arrays)?;
64+
6265
let df_record_batch = DfRecordBatch::try_new(schema.arrow_schema().clone(), arrow_arrays)
6366
.context(error::NewDfRecordBatchSnafu)?;
6467

@@ -327,6 +330,94 @@ pub fn merge_record_batches(schema: SchemaRef, batches: &[RecordBatch]) -> Resul
327330
Ok(RecordBatch::from_df_record_batch(schema, record_batch))
328331
}
329332

333+
/// Align a json array `json_array` to the json type `schema_type`. The `schema_type` is often the
334+
/// "largest" json type after some insertions in the table schema, while the json array previously
335+
/// written in the SST could lag behind it. So it's important to "amend" the json array's
336+
/// missing fields with null arrays, to align the array's data type with the provided one.
337+
///
338+
/// # Panics
339+
///
340+
/// - The json array is not an Arrow [StructArray], or the provided data type `schema_type` is not
341+
/// of Struct type. Neither should happen unless we switch our implementation of how
342+
/// the json array is physically stored.
343+
pub fn align_json_array(json_array: &ArrayRef, schema_type: &ArrowDataType) -> Result<ArrayRef> {
344+
let json_type = json_array.data_type();
345+
if json_type == schema_type {
346+
return Ok(json_array.clone());
347+
}
348+
349+
let json_array = json_array.as_struct();
350+
let array_fields = json_array.fields();
351+
let array_columns = json_array.columns();
352+
let ArrowDataType::Struct(schema_fields) = schema_type else {
353+
unreachable!()
354+
};
355+
let mut aligned = Vec::with_capacity(schema_fields.len());
356+
357+
let mut i = 0; // point to the schema fields
358+
let mut j = 0; // point to the array fields
359+
while i < schema_fields.len() && j < array_fields.len() {
360+
let schema_field = &schema_fields[i];
361+
let array_field = &array_fields[j];
362+
if schema_field.name() == array_field.name() {
363+
if matches!(schema_field.data_type(), ArrowDataType::Struct(_)) {
364+
// A `StructArray` in a json array must be another json array. (Like a nested json
365+
// object in a json value.)
366+
aligned.push(align_json_array(
367+
&array_columns[j],
368+
schema_field.data_type(),
369+
)?);
370+
} else {
371+
aligned.push(array_columns[j].clone());
372+
}
373+
j += 1;
374+
} else {
375+
aligned.push(new_null_array(schema_field.data_type(), json_array.len()));
376+
}
377+
i += 1;
378+
}
379+
if i < schema_fields.len() {
380+
for field in &schema_fields[i..] {
381+
aligned.push(new_null_array(field.data_type(), json_array.len()));
382+
}
383+
}
384+
ensure!(
385+
j == array_fields.len(),
386+
AlignJsonArraySnafu {
387+
reason: format!(
388+
"Json array {:?} has more fields than schema {:?}",
389+
json_array, schema_fields
390+
)
391+
}
392+
);
393+
394+
let json_array =
395+
StructArray::try_new(schema_fields.clone(), aligned, json_array.nulls().cloned())
396+
.context(NewDfRecordBatchSnafu)?;
397+
Ok(Arc::new(json_array))
398+
}
399+
400+
fn maybe_align_json_array_with_schema(
401+
schema: &ArrowSchemaRef,
402+
arrays: Vec<ArrayRef>,
403+
) -> Result<Vec<ArrayRef>> {
404+
if schema.fields().iter().all(|f| !is_json_extension_type(f)) {
405+
return Ok(arrays);
406+
}
407+
408+
let mut aligned = Vec::with_capacity(arrays.len());
409+
for (field, array) in schema.fields().iter().zip(arrays.into_iter()) {
410+
if !is_json_extension_type(field) {
411+
aligned.push(array);
412+
continue;
413+
}
414+
415+
let json_array = align_json_array(&array, field.data_type())?;
416+
aligned.push(json_array);
417+
}
418+
Ok(aligned)
419+
}
420+
330421
#[cfg(test)]
331422
mod tests {
332423
use std::sync::Arc;

src/common/sql/src/convert.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -231,13 +231,15 @@ pub fn sql_value_to_value(
231231
}
232232
}
233233

234-
if value.data_type() != *data_type {
234+
let value_datatype = value.data_type();
235+
// The datatype of a json value is determined by its actual data, so we can't simply "cast" it here.
236+
if value_datatype.is_json() || value_datatype == *data_type {
237+
Ok(value)
238+
} else {
235239
datatypes::types::cast(value, data_type).with_context(|_| InvalidCastSnafu {
236240
sql_value: sql_val.clone(),
237241
datatype: data_type,
238242
})
239-
} else {
240-
Ok(value)
241243
}
242244
}
243245

0 commit comments

Comments
 (0)