Skip to content

Commit 26c47c4

Browse files
feat: basic json read and write
Signed-off-by: luofucong <[email protected]>
1 parent 641659b commit 26c47c4

File tree

29 files changed

+618
-246
lines changed

29 files changed

+618
-246
lines changed

src/api/src/helper.rs

Lines changed: 13 additions & 131 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,9 @@ use common_time::{Date, IntervalDayTime, IntervalMonthDayNano, IntervalYearMonth
2323
use datatypes::json::value::{JsonNumber, JsonValue, JsonValueRef, JsonVariant};
2424
use datatypes::prelude::{ConcreteDataType, ValueRef};
2525
use datatypes::types::{
26-
IntervalType, JsonFormat, StructField, StructType, TimeType, TimestampType,
27-
};
28-
use datatypes::value::{
29-
ListValue, ListValueRef, OrderedF32, OrderedF64, StructValue, StructValueRef, Value,
26+
IntervalType, JsonFormat, JsonType, StructField, StructType, TimeType, TimestampType,
3027
};
28+
use datatypes::value::{ListValueRef, OrderedF32, OrderedF64, StructValueRef, Value};
3129
use datatypes::vectors::VectorRef;
3230
use greptime_proto::v1::column_data_type_extension::TypeExt;
3331
use greptime_proto::v1::ddl_request::Expr;
@@ -127,6 +125,7 @@ impl From<ColumnDataTypeWrapper> for ConcreteDataType {
127125
};
128126
ConcreteDataType::json_native_datatype(inner_type.into())
129127
}
128+
None => ConcreteDataType::Json(JsonType::empty()),
130129
_ => {
131130
// invalid state, type extension is missing or invalid
132131
ConcreteDataType::null_datatype()
@@ -405,19 +404,22 @@ impl TryFrom<ConcreteDataType> for ColumnDataTypeWrapper {
405404
JsonFormat::Jsonb => Some(ColumnDataTypeExtension {
406405
type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())),
407406
}),
408-
JsonFormat::Native(inner) => {
407+
JsonFormat::Native(inner) => (!ConcreteDataType::from(inner.as_ref())
408+
.is_null())
409+
.then(|| {
409410
let inner_type = ColumnDataTypeWrapper::try_from(
410411
ConcreteDataType::from(inner.as_ref()),
411412
)?;
412-
Some(ColumnDataTypeExtension {
413+
Ok(ColumnDataTypeExtension {
413414
type_ext: Some(TypeExt::JsonNativeType(Box::new(
414415
JsonNativeTypeExtension {
415416
datatype: inner_type.datatype.into(),
416417
datatype_extension: inner_type.datatype_ext.map(Box::new),
417418
},
418419
))),
419420
})
420-
}
421+
})
422+
.transpose()?,
421423
}
422424
} else {
423425
None
@@ -824,116 +826,11 @@ pub fn is_column_type_value_eq(
824826
ColumnDataTypeWrapper::try_new(type_value, type_extension)
825827
.map(|wrapper| {
826828
let datatype = ConcreteDataType::from(wrapper);
827-
expect_type == &datatype
829+
expect_type.is_json() && datatype.is_json() || &datatype == expect_type
828830
})
829831
.unwrap_or(false)
830832
}
831833

832-
/// Convert value into proto's value.
833-
pub fn to_proto_value(value: Value) -> v1::Value {
834-
match value {
835-
Value::Null => v1::Value { value_data: None },
836-
Value::Boolean(v) => v1::Value {
837-
value_data: Some(ValueData::BoolValue(v)),
838-
},
839-
Value::UInt8(v) => v1::Value {
840-
value_data: Some(ValueData::U8Value(v.into())),
841-
},
842-
Value::UInt16(v) => v1::Value {
843-
value_data: Some(ValueData::U16Value(v.into())),
844-
},
845-
Value::UInt32(v) => v1::Value {
846-
value_data: Some(ValueData::U32Value(v)),
847-
},
848-
Value::UInt64(v) => v1::Value {
849-
value_data: Some(ValueData::U64Value(v)),
850-
},
851-
Value::Int8(v) => v1::Value {
852-
value_data: Some(ValueData::I8Value(v.into())),
853-
},
854-
Value::Int16(v) => v1::Value {
855-
value_data: Some(ValueData::I16Value(v.into())),
856-
},
857-
Value::Int32(v) => v1::Value {
858-
value_data: Some(ValueData::I32Value(v)),
859-
},
860-
Value::Int64(v) => v1::Value {
861-
value_data: Some(ValueData::I64Value(v)),
862-
},
863-
Value::Float32(v) => v1::Value {
864-
value_data: Some(ValueData::F32Value(*v)),
865-
},
866-
Value::Float64(v) => v1::Value {
867-
value_data: Some(ValueData::F64Value(*v)),
868-
},
869-
Value::String(v) => v1::Value {
870-
value_data: Some(ValueData::StringValue(v.as_utf8().to_string())),
871-
},
872-
Value::Binary(v) => v1::Value {
873-
value_data: Some(ValueData::BinaryValue(v.to_vec())),
874-
},
875-
Value::Date(v) => v1::Value {
876-
value_data: Some(ValueData::DateValue(v.val())),
877-
},
878-
Value::Timestamp(v) => match v.unit() {
879-
TimeUnit::Second => v1::Value {
880-
value_data: Some(ValueData::TimestampSecondValue(v.value())),
881-
},
882-
TimeUnit::Millisecond => v1::Value {
883-
value_data: Some(ValueData::TimestampMillisecondValue(v.value())),
884-
},
885-
TimeUnit::Microsecond => v1::Value {
886-
value_data: Some(ValueData::TimestampMicrosecondValue(v.value())),
887-
},
888-
TimeUnit::Nanosecond => v1::Value {
889-
value_data: Some(ValueData::TimestampNanosecondValue(v.value())),
890-
},
891-
},
892-
Value::Time(v) => match v.unit() {
893-
TimeUnit::Second => v1::Value {
894-
value_data: Some(ValueData::TimeSecondValue(v.value())),
895-
},
896-
TimeUnit::Millisecond => v1::Value {
897-
value_data: Some(ValueData::TimeMillisecondValue(v.value())),
898-
},
899-
TimeUnit::Microsecond => v1::Value {
900-
value_data: Some(ValueData::TimeMicrosecondValue(v.value())),
901-
},
902-
TimeUnit::Nanosecond => v1::Value {
903-
value_data: Some(ValueData::TimeNanosecondValue(v.value())),
904-
},
905-
},
906-
Value::IntervalYearMonth(v) => v1::Value {
907-
value_data: Some(ValueData::IntervalYearMonthValue(v.to_i32())),
908-
},
909-
Value::IntervalDayTime(v) => v1::Value {
910-
value_data: Some(ValueData::IntervalDayTimeValue(v.to_i64())),
911-
},
912-
Value::IntervalMonthDayNano(v) => v1::Value {
913-
value_data: Some(ValueData::IntervalMonthDayNanoValue(
914-
convert_month_day_nano_to_pb(v),
915-
)),
916-
},
917-
Value::Decimal128(v) => v1::Value {
918-
value_data: Some(ValueData::Decimal128Value(convert_to_pb_decimal128(v))),
919-
},
920-
Value::List(list_value) => v1::Value {
921-
value_data: Some(ValueData::ListValue(v1::ListValue {
922-
items: convert_list_to_pb_values(list_value),
923-
})),
924-
},
925-
Value::Struct(struct_value) => v1::Value {
926-
value_data: Some(ValueData::StructValue(v1::StructValue {
927-
items: convert_struct_to_pb_values(struct_value),
928-
})),
929-
},
930-
Value::Json(v) => v1::Value {
931-
value_data: Some(ValueData::JsonValue(encode_json_value(*v))),
932-
},
933-
Value::Duration(_) => v1::Value { value_data: None },
934-
}
935-
}
936-
937834
fn encode_json_value(value: JsonValue) -> v1::JsonValue {
938835
fn helper(json: JsonVariant) -> v1::JsonValue {
939836
let value = match json {
@@ -994,22 +891,6 @@ fn decode_json_value(value: &v1::JsonValue) -> JsonValueRef<'_> {
994891
}
995892
}
996893

997-
fn convert_list_to_pb_values(list_value: ListValue) -> Vec<v1::Value> {
998-
list_value
999-
.take_items()
1000-
.into_iter()
1001-
.map(to_proto_value)
1002-
.collect()
1003-
}
1004-
1005-
fn convert_struct_to_pb_values(struct_value: StructValue) -> Vec<v1::Value> {
1006-
struct_value
1007-
.take_items()
1008-
.into_iter()
1009-
.map(to_proto_value)
1010-
.collect()
1011-
}
1012-
1013894
/// Returns the [ColumnDataTypeWrapper] of the value.
1014895
///
1015896
/// If value is null, returns `None`.
@@ -1211,6 +1092,7 @@ mod tests {
12111092
use common_time::interval::IntervalUnit;
12121093
use datatypes::scalars::ScalarVector;
12131094
use datatypes::types::{Int8Type, Int32Type, UInt8Type, UInt32Type};
1095+
use datatypes::value::{ListValue, StructValue};
12141096
use datatypes::vectors::{
12151097
BooleanVector, DateVector, Float32Vector, PrimitiveVector, StringVector,
12161098
};
@@ -1788,7 +1670,7 @@ mod tests {
17881670
Arc::new(ConcreteDataType::boolean_datatype()),
17891671
));
17901672

1791-
let pb_value = to_proto_value(value);
1673+
let pb_value = value_to_grpc_value(value);
17921674

17931675
match pb_value.value_data.unwrap() {
17941676
ValueData::ListValue(pb_list_value) => {
@@ -1817,7 +1699,7 @@ mod tests {
18171699
.unwrap(),
18181700
);
18191701

1820-
let pb_value = to_proto_value(value);
1702+
let pb_value = value_to_grpc_value(value);
18211703

18221704
match pb_value.value_data.unwrap() {
18231705
ValueData::StructValue(pb_struct_value) => {

src/common/recordbatch/src/error.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,13 @@ pub enum Error {
188188
#[snafu(implicit)]
189189
location: Location,
190190
},
191+
192+
#[snafu(display("Failed to align JSON array, reason: {reason}"))]
193+
AlignJsonArray {
194+
reason: String,
195+
#[snafu(implicit)]
196+
location: Location,
197+
},
191198
}
192199

193200
impl ErrorExt for Error {
@@ -203,7 +210,8 @@ impl ErrorExt for Error {
203210
| Error::ToArrowScalar { .. }
204211
| Error::ProjectArrowRecordBatch { .. }
205212
| Error::PhysicalExpr { .. }
206-
| Error::RecordBatchSliceIndexOverflow { .. } => StatusCode::Internal,
213+
| Error::RecordBatchSliceIndexOverflow { .. }
214+
| Error::AlignJsonArray { .. } => StatusCode::Internal,
207215

208216
Error::PollStream { .. } => StatusCode::EngineExecuteQuery,
209217

src/common/recordbatch/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ pub mod adapter;
1818
pub mod cursor;
1919
pub mod error;
2020
pub mod filter;
21-
mod recordbatch;
21+
pub mod recordbatch;
2222
pub mod util;
2323

2424
use std::fmt;

src/common/recordbatch/src/recordbatch.rs

Lines changed: 94 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,8 @@ use datafusion::arrow::util::pretty::pretty_format_batches;
2020
use datafusion_common::arrow::array::ArrayRef;
2121
use datafusion_common::arrow::compute;
2222
use datafusion_common::arrow::datatypes::{DataType as ArrowDataType, SchemaRef as ArrowSchemaRef};
23-
use datatypes::arrow::array::{Array, AsArray, RecordBatchOptions};
23+
use datatypes::arrow::array::{Array, AsArray, RecordBatchOptions, StructArray, new_null_array};
24+
use datatypes::extension::json::is_json_extension_type;
2425
use datatypes::prelude::DataType;
2526
use datatypes::schema::SchemaRef;
2627
use datatypes::vectors::{Helper, VectorRef};
@@ -30,8 +31,8 @@ use snafu::{OptionExt, ResultExt, ensure};
3031

3132
use crate::DfRecordBatch;
3233
use crate::error::{
33-
self, ArrowComputeSnafu, ColumnNotExistsSnafu, DataTypesSnafu, ProjectArrowRecordBatchSnafu,
34-
Result,
34+
self, AlignJsonArraySnafu, ArrowComputeSnafu, ColumnNotExistsSnafu, DataTypesSnafu,
35+
NewDfRecordBatchSnafu, ProjectArrowRecordBatchSnafu, Result,
3536
};
3637

3738
/// A two-dimensional batch of column-oriented data with a defined schema.
@@ -59,6 +60,8 @@ impl RecordBatch {
5960
// TODO(LFC): Remove the casting here once `Batch` is no longer used.
6061
let arrow_arrays = Self::cast_view_arrays(schema.arrow_schema(), arrow_arrays)?;
6162

63+
let arrow_arrays = maybe_align_json_array_with_schema(schema.arrow_schema(), arrow_arrays)?;
64+
6265
let df_record_batch = DfRecordBatch::try_new(schema.arrow_schema().clone(), arrow_arrays)
6366
.context(error::NewDfRecordBatchSnafu)?;
6467

@@ -327,6 +330,94 @@ pub fn merge_record_batches(schema: SchemaRef, batches: &[RecordBatch]) -> Resul
327330
Ok(RecordBatch::from_df_record_batch(schema, record_batch))
328331
}
329332

333+
/// Align a json array `json_array` to the json type `schema_type`. The `schema_type` is often the
334+
/// "largest" json type after some insertions in the table schema, while the json array previously
335+
/// written in the SST could lag behind it. So it's important to "amend" the json array's
336+
/// missing fields with null arrays, to align the array's data type with the provided one.
337+
///
338+
/// # Panics
339+
///
340+
/// - The json array is not an Arrow [StructArray], or the provided data type `schema_type` is not
341+
/// of Struct type. Neither should happen unless we switch our implementation of how
342+
/// json arrays are physically stored.
343+
pub fn align_json_array(json_array: &ArrayRef, schema_type: &ArrowDataType) -> Result<ArrayRef> {
344+
let json_type = json_array.data_type();
345+
if json_type == schema_type {
346+
return Ok(json_array.clone());
347+
}
348+
349+
let json_array = json_array.as_struct();
350+
let array_fields = json_array.fields();
351+
let array_columns = json_array.columns();
352+
let ArrowDataType::Struct(schema_fields) = schema_type else {
353+
unreachable!()
354+
};
355+
let mut aligned = Vec::with_capacity(schema_fields.len());
356+
357+
let mut i = 0; // point to the schema fields
358+
let mut j = 0; // point to the array fields
359+
while i < schema_fields.len() && j < array_fields.len() {
360+
let schema_field = &schema_fields[i];
361+
let array_field = &array_fields[j];
362+
if schema_field.name() == array_field.name() {
363+
if matches!(schema_field.data_type(), ArrowDataType::Struct(_)) {
364+
// A `StructArray` in a json array must be another json array. (Like a nested json
365+
// object in a json value.)
366+
aligned.push(align_json_array(
367+
&array_columns[j],
368+
schema_field.data_type(),
369+
)?);
370+
} else {
371+
aligned.push(array_columns[j].clone());
372+
}
373+
j += 1;
374+
} else {
375+
aligned.push(new_null_array(schema_field.data_type(), json_array.len()));
376+
}
377+
i += 1;
378+
}
379+
if i < schema_fields.len() {
380+
for field in &schema_fields[i..] {
381+
aligned.push(new_null_array(field.data_type(), json_array.len()));
382+
}
383+
}
384+
ensure!(
385+
j == array_fields.len(),
386+
AlignJsonArraySnafu {
387+
reason: format!(
388+
"Json array {:?} has more fields than schema {:?}",
389+
json_array, schema_fields
390+
)
391+
}
392+
);
393+
394+
let json_array =
395+
StructArray::try_new(schema_fields.clone(), aligned, json_array.nulls().cloned())
396+
.context(NewDfRecordBatchSnafu)?;
397+
Ok(Arc::new(json_array))
398+
}
399+
400+
fn maybe_align_json_array_with_schema(
401+
schema: &ArrowSchemaRef,
402+
arrays: Vec<ArrayRef>,
403+
) -> Result<Vec<ArrayRef>> {
404+
if schema.fields().iter().all(|f| !is_json_extension_type(f)) {
405+
return Ok(arrays);
406+
}
407+
408+
let mut aligned = Vec::with_capacity(arrays.len());
409+
for (field, array) in schema.fields().iter().zip(arrays.into_iter()) {
410+
if !is_json_extension_type(field) {
411+
aligned.push(array);
412+
continue;
413+
}
414+
415+
let json_array = align_json_array(&array, field.data_type())?;
416+
aligned.push(json_array);
417+
}
418+
Ok(aligned)
419+
}
420+
330421
#[cfg(test)]
331422
mod tests {
332423
use std::sync::Arc;

src/common/sql/src/convert.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -231,13 +231,15 @@ pub fn sql_value_to_value(
231231
}
232232
}
233233

234-
if value.data_type() != *data_type {
234+
let value_datatype = value.data_type();
235+
// The datatype of json value is determined by its actual data, so we can't simply "cast" it here.
236+
if value_datatype.is_json() || value_datatype == *data_type {
237+
Ok(value)
238+
} else {
235239
datatypes::types::cast(value, data_type).with_context(|_| InvalidCastSnafu {
236240
sql_value: sql_val.clone(),
237241
datatype: data_type,
238242
})
239-
} else {
240-
Ok(value)
241243
}
242244
}
243245

0 commit comments

Comments
 (0)