diff --git a/src/api/src/helper.rs b/src/api/src/helper.rs index 7508358552c4..f53f3f162b16 100644 --- a/src/api/src/helper.rs +++ b/src/api/src/helper.rs @@ -23,11 +23,9 @@ use common_time::{Date, IntervalDayTime, IntervalMonthDayNano, IntervalYearMonth use datatypes::json::value::{JsonNumber, JsonValue, JsonValueRef, JsonVariant}; use datatypes::prelude::{ConcreteDataType, ValueRef}; use datatypes::types::{ - IntervalType, JsonFormat, StructField, StructType, TimeType, TimestampType, -}; -use datatypes::value::{ - ListValue, ListValueRef, OrderedF32, OrderedF64, StructValue, StructValueRef, Value, + IntervalType, JsonFormat, JsonType, StructField, StructType, TimeType, TimestampType, }; +use datatypes::value::{ListValueRef, OrderedF32, OrderedF64, StructValueRef, Value}; use datatypes::vectors::VectorRef; use greptime_proto::v1::column_data_type_extension::TypeExt; use greptime_proto::v1::ddl_request::Expr; @@ -82,6 +80,10 @@ impl ColumnDataTypeWrapper { pub fn to_parts(&self) -> (ColumnDataType, Option<ColumnDataTypeExtension>) { (self.datatype, self.datatype_ext.clone()) } + + pub fn into_parts(self) -> (ColumnDataType, Option<ColumnDataTypeExtension>) { + (self.datatype, self.datatype_ext) + } } impl From<ColumnDataTypeWrapper> for ConcreteDataType { @@ -127,6 +129,7 @@ impl From<ColumnDataTypeWrapper> for ConcreteDataType { }; ConcreteDataType::json_native_datatype(inner_type.into()) } + None => ConcreteDataType::Json(JsonType::null()), _ => { // invalid state, type extension is missing or invalid ConcreteDataType::null_datatype() } @@ -441,18 +444,22 @@ impl TryFrom<ConcreteDataType> for ColumnDataTypeWrapper { JsonFormat::Jsonb => Some(ColumnDataTypeExtension { type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())), }), - JsonFormat::Native(inner) => { - let inner_type = ColumnDataTypeWrapper::try_from( - ConcreteDataType::from(inner.as_ref()), - )?; - Some(ColumnDataTypeExtension { - type_ext: Some(TypeExt::JsonNativeType(Box::new( - JsonNativeTypeExtension { - datatype: inner_type.datatype.into(), - datatype_extension: inner_type.datatype_ext.map(Box::new), - }, - ))), - }) + JsonFormat::Native(native_type) => { + if native_type.is_null() { + None + } else { + let native_type = ConcreteDataType::from(native_type.as_ref()); + let (datatype, datatype_extension) = + ColumnDataTypeWrapper::try_from(native_type)?.into_parts(); + Some(ColumnDataTypeExtension { + type_ext: Some(TypeExt::JsonNativeType(Box::new( + JsonNativeTypeExtension { + datatype: datatype as i32, + datatype_extension: datatype_extension.map(Box::new), + }, + ))), + }) + } } } } else { @@ -887,111 +894,6 @@ pub fn is_column_type_value_eq( .unwrap_or(false) } -/// Convert value into proto's value.
-pub fn to_proto_value(value: Value) -> v1::Value { - match value { - Value::Null => v1::Value { value_data: None }, - Value::Boolean(v) => v1::Value { - value_data: Some(ValueData::BoolValue(v)), - }, - Value::UInt8(v) => v1::Value { - value_data: Some(ValueData::U8Value(v.into())), - }, - Value::UInt16(v) => v1::Value { - value_data: Some(ValueData::U16Value(v.into())), - }, - Value::UInt32(v) => v1::Value { - value_data: Some(ValueData::U32Value(v)), - }, - Value::UInt64(v) => v1::Value { - value_data: Some(ValueData::U64Value(v)), - }, - Value::Int8(v) => v1::Value { - value_data: Some(ValueData::I8Value(v.into())), - }, - Value::Int16(v) => v1::Value { - value_data: Some(ValueData::I16Value(v.into())), - }, - Value::Int32(v) => v1::Value { - value_data: Some(ValueData::I32Value(v)), - }, - Value::Int64(v) => v1::Value { - value_data: Some(ValueData::I64Value(v)), - }, - Value::Float32(v) => v1::Value { - value_data: Some(ValueData::F32Value(*v)), - }, - Value::Float64(v) => v1::Value { - value_data: Some(ValueData::F64Value(*v)), - }, - Value::String(v) => v1::Value { - value_data: Some(ValueData::StringValue(v.as_utf8().to_string())), - }, - Value::Binary(v) => v1::Value { - value_data: Some(ValueData::BinaryValue(v.to_vec())), - }, - Value::Date(v) => v1::Value { - value_data: Some(ValueData::DateValue(v.val())), - }, - Value::Timestamp(v) => match v.unit() { - TimeUnit::Second => v1::Value { - value_data: Some(ValueData::TimestampSecondValue(v.value())), - }, - TimeUnit::Millisecond => v1::Value { - value_data: Some(ValueData::TimestampMillisecondValue(v.value())), - }, - TimeUnit::Microsecond => v1::Value { - value_data: Some(ValueData::TimestampMicrosecondValue(v.value())), - }, - TimeUnit::Nanosecond => v1::Value { - value_data: Some(ValueData::TimestampNanosecondValue(v.value())), - }, - }, - Value::Time(v) => match v.unit() { - TimeUnit::Second => v1::Value { - value_data: Some(ValueData::TimeSecondValue(v.value())), - }, - TimeUnit::Millisecond => v1::Value { - value_data: Some(ValueData::TimeMillisecondValue(v.value())), - }, - TimeUnit::Microsecond => v1::Value { - value_data: Some(ValueData::TimeMicrosecondValue(v.value())), - }, - TimeUnit::Nanosecond => v1::Value { - value_data: Some(ValueData::TimeNanosecondValue(v.value())), - }, - }, - Value::IntervalYearMonth(v) => v1::Value { - value_data: Some(ValueData::IntervalYearMonthValue(v.to_i32())), - }, - Value::IntervalDayTime(v) => v1::Value { - value_data: Some(ValueData::IntervalDayTimeValue(v.to_i64())), - }, - Value::IntervalMonthDayNano(v) => v1::Value { - value_data: Some(ValueData::IntervalMonthDayNanoValue( - convert_month_day_nano_to_pb(v), - )), - }, - Value::Decimal128(v) => v1::Value { - value_data: Some(ValueData::Decimal128Value(convert_to_pb_decimal128(v))), - }, - Value::List(list_value) => v1::Value { - value_data: Some(ValueData::ListValue(v1::ListValue { - items: convert_list_to_pb_values(list_value), - })), - }, - Value::Struct(struct_value) => v1::Value { - value_data: Some(ValueData::StructValue(v1::StructValue { - items: convert_struct_to_pb_values(struct_value), - })), - }, - Value::Json(v) => v1::Value { - value_data: Some(ValueData::JsonValue(encode_json_value(*v))), - }, - Value::Duration(_) => v1::Value { value_data: None }, - } -} - fn encode_json_value(value: JsonValue) -> v1::JsonValue { fn helper(json: JsonVariant) -> v1::JsonValue { let value = match json { @@ -1052,22 +954,6 @@ fn decode_json_value(value: &v1::JsonValue) -> JsonValueRef<'_> { } } -fn convert_list_to_pb_values(list_value: 
ListValue) -> Vec<v1::Value> { - list_value - .take_items() - .into_iter() - .map(to_proto_value) - .collect() -} - -fn convert_struct_to_pb_values(struct_value: StructValue) -> Vec<v1::Value> { - struct_value - .take_items() - .into_iter() - .map(to_proto_value) - .collect() -} - /// Returns the [ColumnDataTypeWrapper] of the value. /// /// If value is null, returns `None`. @@ -1114,14 +1000,14 @@ pub fn vectors_to_rows<'a>( let mut rows = vec![Row { values: vec![] }; row_count]; for column in columns { for (row_index, row) in rows.iter_mut().enumerate() { - row.values.push(value_to_grpc_value(column.get(row_index))) + row.values.push(to_grpc_value(column.get(row_index))) } } rows } -pub fn value_to_grpc_value(value: Value) -> GrpcValue { +pub fn to_grpc_value(value: Value) -> GrpcValue { GrpcValue { value_data: match value { Value::Null => None, @@ -1161,7 +1047,7 @@ pub fn value_to_grpc_value(value: Value) -> GrpcValue { let items = list_value .take_items() .into_iter() - .map(value_to_grpc_value) + .map(to_grpc_value) .collect(); Some(ValueData::ListValue(v1::ListValue { items })) } @@ -1169,7 +1055,7 @@ pub fn value_to_grpc_value(value: Value) -> GrpcValue { let items = struct_value .take_items() .into_iter() - .map(value_to_grpc_value) + .map(to_grpc_value) .collect(); Some(ValueData::StructValue(v1::StructValue { items })) } @@ -1269,6 +1155,7 @@ mod tests { use common_time::interval::IntervalUnit; use datatypes::scalars::ScalarVector; use datatypes::types::{Int8Type, Int32Type, UInt8Type, UInt32Type}; + use datatypes::value::{ListValue, StructValue}; use datatypes::vectors::{ BooleanVector, DateVector, Float32Vector, PrimitiveVector, StringVector, }; @@ -1872,7 +1759,7 @@ mod tests { Arc::new(ConcreteDataType::boolean_datatype()), )); - let pb_value = to_proto_value(value); + let pb_value = to_grpc_value(value); match pb_value.value_data.unwrap() { ValueData::ListValue(pb_list_value) => { @@ -1901,7 +1788,7 @@ .unwrap(), ); - let pb_value = to_proto_value(value); + let pb_value = to_grpc_value(value); match pb_value.value_data.unwrap() { ValueData::StructValue(pb_struct_value) => { diff --git a/src/common/recordbatch/src/error.rs b/src/common/recordbatch/src/error.rs index 00d4291ead29..6d794463a03f 100644 --- a/src/common/recordbatch/src/error.rs +++ b/src/common/recordbatch/src/error.rs @@ -188,6 +188,13 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + + #[snafu(display("Failed to align JSON array, reason: {reason}"))] + AlignJsonArray { + reason: String, + #[snafu(implicit)] + location: Location, + }, } impl ErrorExt for Error { @@ -203,7 +210,8 @@ impl ErrorExt for Error { | Error::ToArrowScalar { .. } | Error::ProjectArrowRecordBatch { .. } | Error::PhysicalExpr { .. } - | Error::RecordBatchSliceIndexOverflow { .. } => StatusCode::Internal, + | Error::RecordBatchSliceIndexOverflow { .. } + | Error::AlignJsonArray { .. } => StatusCode::Internal, Error::PollStream { ..
} => StatusCode::EngineExecuteQuery, diff --git a/src/common/recordbatch/src/lib.rs b/src/common/recordbatch/src/lib.rs index 4804e9040edb..c1253cfa1c0f 100644 --- a/src/common/recordbatch/src/lib.rs +++ b/src/common/recordbatch/src/lib.rs @@ -18,7 +18,7 @@ pub mod adapter; pub mod cursor; pub mod error; pub mod filter; -mod recordbatch; +pub mod recordbatch; pub mod util; use std::fmt; diff --git a/src/common/recordbatch/src/recordbatch.rs b/src/common/recordbatch/src/recordbatch.rs index f04fd49dd2f8..a9dd663c2c0e 100644 --- a/src/common/recordbatch/src/recordbatch.rs +++ b/src/common/recordbatch/src/recordbatch.rs @@ -20,7 +20,8 @@ use datafusion::arrow::util::pretty::pretty_format_batches; use datafusion_common::arrow::array::ArrayRef; use datafusion_common::arrow::compute; use datafusion_common::arrow::datatypes::{DataType as ArrowDataType, SchemaRef as ArrowSchemaRef}; -use datatypes::arrow::array::{Array, AsArray, RecordBatchOptions}; +use datatypes::arrow::array::{Array, AsArray, RecordBatchOptions, StructArray, new_null_array}; +use datatypes::extension::json::is_json_extension_type; use datatypes::prelude::DataType; use datatypes::schema::SchemaRef; use datatypes::vectors::{Helper, VectorRef}; @@ -30,8 +31,8 @@ use snafu::{OptionExt, ResultExt, ensure}; use crate::DfRecordBatch; use crate::error::{ - self, ArrowComputeSnafu, ColumnNotExistsSnafu, DataTypesSnafu, ProjectArrowRecordBatchSnafu, - Result, + self, AlignJsonArraySnafu, ArrowComputeSnafu, ColumnNotExistsSnafu, DataTypesSnafu, + NewDfRecordBatchSnafu, ProjectArrowRecordBatchSnafu, Result, }; /// A two-dimensional batch of column-oriented data with a defined schema. @@ -59,6 +60,8 @@ impl RecordBatch { // TODO(LFC): Remove the casting here once `Batch` is no longer used. let arrow_arrays = Self::cast_view_arrays(schema.arrow_schema(), arrow_arrays)?; + let arrow_arrays = maybe_align_json_array_with_schema(schema.arrow_schema(), arrow_arrays)?; + let df_record_batch = DfRecordBatch::try_new(schema.arrow_schema().clone(), arrow_arrays) .context(error::NewDfRecordBatchSnafu)?; @@ -327,12 +330,111 @@ pub fn merge_record_batches(schema: SchemaRef, batches: &[RecordBatch]) -> Resul Ok(RecordBatch::from_df_record_batch(schema, record_batch)) } +/// Align a json array `json_array` to the json type `schema_type`. The `schema_type` is often the +/// "largest" json type in the table schema after some insertions, while a json array previously +/// written in the SST could lag behind it. So it's important to "amend" the json array's +/// missing fields with null arrays, to align the array's data type with the provided one. +/// +/// # Panics +/// +/// - The json array is not an Arrow [StructArray], or the provided data type `schema_type` is not +/// of Struct type. Neither of which should happen unless we switch our implementation of how +/// a json array is physically stored. +pub fn align_json_array(json_array: &ArrayRef, schema_type: &ArrowDataType) -> Result<ArrayRef> { let json_type = json_array.data_type(); if json_type == schema_type { return Ok(json_array.clone()); } + let json_array = json_array.as_struct(); let array_fields = json_array.fields(); let array_columns = json_array.columns(); let ArrowDataType::Struct(schema_fields) = schema_type else { unreachable!() }; let mut aligned = Vec::with_capacity(schema_fields.len()); + + // Compare the fields in the json array and the to-be-aligned schema, amending with null arrays + // on the way.
It's very important to note that fields in the json array and in the json type + // are both SORTED. + + let mut i = 0; // point to the schema fields + let mut j = 0; // point to the array fields + while i < schema_fields.len() && j < array_fields.len() { + let schema_field = &schema_fields[i]; + let array_field = &array_fields[j]; + if schema_field.name() == array_field.name() { + if matches!(schema_field.data_type(), ArrowDataType::Struct(_)) { + // A `StructArray` in a json array must be another json array. (Like a nested json + // object in a json value.) + aligned.push(align_json_array( + &array_columns[j], + schema_field.data_type(), + )?); + } else { + aligned.push(array_columns[j].clone()); + } + j += 1; + } else { + aligned.push(new_null_array(schema_field.data_type(), json_array.len())); + } + i += 1; + } + if i < schema_fields.len() { + for field in &schema_fields[i..] { + aligned.push(new_null_array(field.data_type(), json_array.len())); + } + } + ensure!( + j == array_fields.len(), + AlignJsonArraySnafu { + reason: format!( + "this json array has more fields {:?}", + array_fields[j..] + .iter() + .map(|x| x.name()) + .collect::<Vec<_>>(), + ) + } + ); + + let json_array = + StructArray::try_new(schema_fields.clone(), aligned, json_array.nulls().cloned()) + .context(NewDfRecordBatchSnafu)?; + Ok(Arc::new(json_array)) +} + +fn maybe_align_json_array_with_schema( + schema: &ArrowSchemaRef, + arrays: Vec<ArrayRef>, +) -> Result<Vec<ArrayRef>> { + if schema.fields().iter().all(|f| !is_json_extension_type(f)) { + return Ok(arrays); + } + + let mut aligned = Vec::with_capacity(arrays.len()); + for (field, array) in schema.fields().iter().zip(arrays.into_iter()) { + if !is_json_extension_type(field) { + aligned.push(array); + continue; + } + + let json_array = align_json_array(&array, field.data_type())?; + aligned.push(json_array); + } + Ok(aligned) +} + #[cfg(test)] mod tests { use std::sync::Arc; - use datatypes::arrow::array::{AsArray, UInt32Array}; - use datatypes::arrow::datatypes::{DataType, Field, Schema as ArrowSchema, UInt32Type}; + use datatypes::arrow::array::{ + AsArray, BooleanArray, Float64Array, Int64Array, ListArray, UInt32Array, + }; + use datatypes::arrow::datatypes::{ + DataType, Field, Fields, Int64Type, Schema as ArrowSchema, UInt32Type, + }; use datatypes::arrow_array::StringArray; use datatypes::data_type::ConcreteDataType; use datatypes::schema::{ColumnSchema, Schema}; @@ -340,6 +442,165 @@ use super::*; + #[test] + fn test_align_json_array() -> Result<()> { + struct TestCase { + json_array: ArrayRef, + schema_type: DataType, + expected: std::result::Result<ArrayRef, String>, + } + + impl TestCase { + fn new( + json_array: StructArray, + schema_type: Fields, + expected: std::result::Result<Vec<ArrayRef>, String>, + ) -> Self { + Self { + json_array: Arc::new(json_array), + schema_type: DataType::Struct(schema_type.clone()), + expected: expected + .map(|x| Arc::new(StructArray::new(schema_type, x, None)) as ArrayRef), + } + } + + fn test(self) -> Result<()> { + let result = align_json_array(&self.json_array, &self.schema_type); + match (result, self.expected) { + (Ok(json_array), Ok(expected)) => assert_eq!(&json_array, &expected), + (Ok(json_array), Err(e)) => { + panic!("expecting error {e} but actually get: {json_array:?}") + } + (Err(e), Err(expected)) => assert_eq!(e.to_string(), expected), + (Err(e), Ok(_)) => return Err(e), + } + Ok(()) + } + } + + // Test empty json array can be aligned with a complex json type.
+ TestCase::new( + StructArray::new_empty_fields(2, None), + Fields::from(vec![ + Field::new("int", DataType::Int64, true), + Field::new_struct( + "nested", + vec![Field::new("bool", DataType::Boolean, true)], + true, + ), + Field::new("string", DataType::Utf8, true), + ]), + Ok(vec![ + Arc::new(Int64Array::new_null(2)) as ArrayRef, + Arc::new(StructArray::new_null( + Fields::from(vec![Arc::new(Field::new("bool", DataType::Boolean, true))]), + 2, + )), + Arc::new(StringArray::new_null(2)), + ]), + ) + .test()?; + + // Test simple json array alignment. + TestCase::new( + StructArray::from(vec![( + Arc::new(Field::new("float", DataType::Float64, true)), + Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0])) as ArrayRef, + )]), + Fields::from(vec![ + Field::new("float", DataType::Float64, true), + Field::new("string", DataType::Utf8, true), + ]), + Ok(vec![ + Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0])) as ArrayRef, + Arc::new(StringArray::new_null(3)), + ]), + ) + .test()?; + + // Test complex json array alignment. + TestCase::new( + StructArray::from(vec![ + ( + Arc::new(Field::new_list( + "list", + Field::new_list_field(DataType::Int64, true), + true, + )), + Arc::new(ListArray::from_iter_primitive::<Int64Type, _, _>(vec![ + Some(vec![Some(1)]), + None, + Some(vec![Some(2), Some(3)]), + ])) as ArrayRef, + ), + ( + Arc::new(Field::new_struct( + "nested", + vec![Field::new("int", DataType::Int64, true)], + true, + )), + Arc::new(StructArray::from(vec![( + Arc::new(Field::new("int", DataType::Int64, true)), + Arc::new(Int64Array::from(vec![-1, -2, -3])) as ArrayRef, + )])), + ), + ( + Arc::new(Field::new("string", DataType::Utf8, true)), + Arc::new(StringArray::from(vec!["a", "b", "c"])), + ), + ]), + Fields::from(vec![ + Field::new("bool", DataType::Boolean, true), + Field::new_list("list", Field::new_list_field(DataType::Int64, true), true), + Field::new_struct( + "nested", + vec![ + Field::new("float", DataType::Float64, true), + Field::new("int", DataType::Int64, true), + ], + true, + ), + Field::new("string", DataType::Utf8, true), + ]), + Ok(vec![ + Arc::new(BooleanArray::new_null(3)) as ArrayRef, + Arc::new(ListArray::from_iter_primitive::<Int64Type, _, _>(vec![ + Some(vec![Some(1)]), + None, + Some(vec![Some(2), Some(3)]), + ])), + Arc::new(StructArray::from(vec![ + ( + Arc::new(Field::new("float", DataType::Float64, true)), + Arc::new(Float64Array::new_null(3)) as ArrayRef, + ), + ( + Arc::new(Field::new("int", DataType::Int64, true)), + Arc::new(Int64Array::from(vec![-1, -2, -3])), + ), + ])), + Arc::new(StringArray::from(vec!["a", "b", "c"])), + ]), + ) + .test()?; + + // Test align failed. + TestCase::new( + StructArray::try_from(vec![ + ("i", Arc::new(Int64Array::from(vec![1])) as ArrayRef), + ("j", Arc::new(Int64Array::from(vec![2])) as ArrayRef), + ]) + .unwrap(), + Fields::from(vec![Field::new("i", DataType::Int64, true)]), + Err( + r#"Failed to align JSON array, reason: this json array has more fields ["j"]"# + .to_string(), + ), + ) + .test()?; + Ok(()) + } + #[test] fn test_record_batch() { let arrow_schema = Arc::new(ArrowSchema::new(vec![ diff --git a/src/common/sql/src/convert.rs b/src/common/sql/src/convert.rs index 487e84abb809..edb793baf6f7 100644 --- a/src/common/sql/src/convert.rs +++ b/src/common/sql/src/convert.rs @@ -231,13 +231,15 @@ pub fn sql_value_to_value( } } - if value.data_type() != *data_type { + let value_datatype = value.data_type(); + // The datatype of a json value is determined by its actual data, so we can't simply "cast" it here.
+ if value_datatype.is_json() || value_datatype == *data_type { + Ok(value) + } else { datatypes::types::cast(value, data_type).with_context(|_| InvalidCastSnafu { sql_value: sql_val.clone(), datatype: data_type, }) - } else { - Ok(value) } } diff --git a/src/common/sql/src/default_constraint.rs b/src/common/sql/src/default_constraint.rs index 0366f9aec3c3..e2a57337a53b 100644 --- a/src/common/sql/src/default_constraint.rs +++ b/src/common/sql/src/default_constraint.rs @@ -16,6 +16,7 @@ use common_time::timezone::Timezone; use datatypes::prelude::ConcreteDataType; use datatypes::schema::ColumnDefaultConstraint; use datatypes::schema::constraint::{CURRENT_TIMESTAMP, CURRENT_TIMESTAMP_FN}; +use snafu::ensure; use sqlparser::ast::ValueWithSpan; pub use sqlparser::ast::{ BinaryOperator, ColumnDef, ColumnOption, ColumnOptionDef, DataType, Expr, Function, @@ -37,6 +38,14 @@ pub fn parse_column_default_constraint( .iter() .find(|o| matches!(o.option, ColumnOption::Default(_))) { + ensure!( + !data_type.is_json(), + UnsupportedDefaultValueSnafu { + column_name, + reason: "json column cannot have a default value", + } + ); + let default_constraint = match &opt.option { ColumnOption::Default(Expr::Value(v)) => ColumnDefaultConstraint::Value( sql_value_to_value(column_name, data_type, &v.value, timezone, None, false)?, @@ -82,7 +91,7 @@ pub fn parse_column_default_constraint( } else { return UnsupportedDefaultValueSnafu { column_name, - expr: *expr.clone(), + reason: format!("expr '{expr}' not supported"), } .fail(); } @@ -90,14 +99,14 @@ pub fn parse_column_default_constraint( ColumnOption::Default(others) => { return UnsupportedDefaultValueSnafu { column_name, - expr: others.clone(), + reason: format!("expr '{others}' not supported"), } .fail(); } _ => { return UnsupportedDefaultValueSnafu { column_name, - expr: Expr::Value(SqlValue::Null.into()), + reason: format!("option '{}' not supported", opt.option), } .fail(); } diff --git a/src/common/sql/src/error.rs b/src/common/sql/src/error.rs index b777b5410352..ed23df0cc197 100644 --- a/src/common/sql/src/error.rs +++ b/src/common/sql/src/error.rs @@ -55,13 +55,11 @@ pub enum Error { }, #[snafu(display( - "Unsupported expr in default constraint: {} for column: {}", - expr, - column_name + "Unsupported default constraint for column: '{column_name}', reason: {reason}" ))] UnsupportedDefaultValue { column_name: String, - expr: Expr, + reason: String, #[snafu(implicit)] location: Location, }, diff --git a/src/datatypes/src/data_type.rs b/src/datatypes/src/data_type.rs index 78fa1f16bb2d..25fd095a9f97 100644 --- a/src/datatypes/src/data_type.rs +++ b/src/datatypes/src/data_type.rs @@ -15,7 +15,6 @@ use std::fmt; use std::sync::Arc; -use arrow::compute::cast as arrow_array_cast; use arrow::datatypes::{ DataType as ArrowDataType, IntervalUnit as ArrowIntervalUnit, TimeUnit as ArrowTimeUnit, }; @@ -368,8 +367,10 @@ impl ConcreteDataType { /// Checks if the data type can cast to another data type. pub fn can_arrow_type_cast_to(&self, to_type: &ConcreteDataType) -> bool { - let array = arrow_array::new_empty_array(&self.as_arrow_type()); - arrow_array_cast(array.as_ref(), &to_type.as_arrow_type()).is_ok() + match (self, to_type) { + (ConcreteDataType::Json(this), ConcreteDataType::Json(that)) => that.is_include(this), + _ => arrow::compute::can_cast_types(&self.as_arrow_type(), &to_type.as_arrow_type()), + } } /// Try to cast data type as a [`DurationType`]. 
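For context on the new JSON arm of `can_arrow_type_cast_to` above: a JSON-to-JSON cast is only allowed when the target json type includes the source, i.e. the target's field tree is a superset of the source's, with `Null` acting as the bottom type. That is the `is_include` recursion added in `json_type.rs` further down this diff. A minimal self-contained model of the same recursion (the `JsonTy` enum here is a simplified hypothetical stand-in, not the crate's `JsonNativeType`):

```rust
use std::collections::BTreeMap;

// Simplified stand-in for a JSON value type tree.
#[derive(PartialEq)]
enum JsonTy {
    Null,
    Number,
    String,
    Array(Box<JsonTy>),
    Object(BTreeMap<String, JsonTy>),
}

// `this` includes `that` when every typed field path in `that` exists in
// `this` with an including type; `Null` is included by every type.
fn is_include(this: &JsonTy, that: &JsonTy) -> bool {
    match (this, that) {
        (_, JsonTy::Null) => true,
        (JsonTy::Array(a), JsonTy::Array(b)) => is_include(a, b),
        (JsonTy::Object(a), JsonTy::Object(b)) => b
            .iter()
            .all(|(k, tb)| a.get(k).is_some_and(|ta| is_include(ta, tb))),
        (a, b) => a == b,
    }
}

fn main() {
    let narrow = JsonTy::Object(BTreeMap::from([("a".to_string(), JsonTy::Number)]));
    let wide = JsonTy::Object(BTreeMap::from([
        ("a".to_string(), JsonTy::Number),
        ("b".to_string(), JsonTy::String),
    ]));
    assert!(is_include(&wide, &narrow)); // widening cast is allowed
    assert!(!is_include(&narrow, &wide)); // shrinking cast would drop "b"
    // Arrays include arrays whose element type they include.
    let ints = JsonTy::Array(Box::new(JsonTy::Number));
    let nulls = JsonTy::Array(Box::new(JsonTy::Null));
    assert!(is_include(&ints, &nulls));
}
```

The asymmetry is what makes the check safe: widening only ever adds nullable fields, so all existing data stays representable.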
diff --git a/src/datatypes/src/extension/json.rs b/src/datatypes/src/extension/json.rs index bd3bd9471232..abc75bb35b5b 100644 --- a/src/datatypes/src/extension/json.rs +++ b/src/datatypes/src/extension/json.rs @@ -15,7 +15,7 @@ use std::sync::Arc; use arrow_schema::extension::ExtensionType; -use arrow_schema::{ArrowError, DataType}; +use arrow_schema::{ArrowError, DataType, FieldRef}; use serde::{Deserialize, Serialize}; use crate::json::JsonStructureSettings; @@ -102,3 +102,8 @@ impl ExtensionType for JsonExtensionType { Ok(json) } } + +/// Check if this field is to be treated as json extension type. +pub fn is_json_extension_type(field: &FieldRef) -> bool { + field.extension_type_name() == Some(JsonExtensionType::NAME) +} diff --git a/src/datatypes/src/json/value.rs b/src/datatypes/src/json/value.rs index 30e1573de336..acff194e122a 100644 --- a/src/datatypes/src/json/value.rs +++ b/src/datatypes/src/json/value.rs @@ -260,7 +260,7 @@ impl JsonValue { ConcreteDataType::Json(self.json_type().clone()) } - pub(crate) fn json_type(&self) -> &JsonType { + pub fn json_type(&self) -> &JsonType { self.json_type.get_or_init(|| self.json_variant.json_type()) } @@ -268,6 +268,14 @@ impl JsonValue { matches!(self.json_variant, JsonVariant::Null) } + /// Check if this JSON value is an empty object. + pub fn is_empty_object(&self) -> bool { + match &self.json_variant { + JsonVariant::Object(object) => object.is_empty(), + _ => false, + } + } + pub(crate) fn as_i64(&self) -> Option<i64> { match self.json_variant { JsonVariant::Number(n) => n.as_i64(), diff --git a/src/datatypes/src/schema.rs b/src/datatypes/src/schema.rs index 6bdf321137ee..812b3c3b2255 100644 --- a/src/datatypes/src/schema.rs +++ b/src/datatypes/src/schema.rs @@ -273,8 +273,9 @@ fn collect_fields(column_schemas: &[ColumnSchema]) -> Result { _ => None, }; if let Some(extype) = extype { - let metadata = HashMap::from([(TYPE_KEY.to_string(), extype.to_string())]); - field = field.with_metadata(metadata); + field + .metadata_mut() + .insert(TYPE_KEY.to_string(), extype.to_string()); } fields.push(field); ensure!( diff --git a/src/datatypes/src/types.rs b/src/datatypes/src/types.rs index 61366079fb4f..597bbb673b8b 100644 --- a/src/datatypes/src/types.rs +++ b/src/datatypes/src/types.rs @@ -20,7 +20,7 @@ mod decimal_type; mod dictionary_type; mod duration_type; mod interval_type; -pub(crate) mod json_type; +pub mod json_type; mod list_type; mod null_type; mod primitive_type; diff --git a/src/datatypes/src/types/json_type.rs b/src/datatypes/src/types/json_type.rs index 77bfa35a777d..4c838b78d126 100644 --- a/src/datatypes/src/types/json_type.rs +++ b/src/datatypes/src/types/json_type.rs @@ -18,7 +18,6 @@ use std::str::FromStr; use std::sync::Arc; use arrow::datatypes::DataType as ArrowDataType; -use arrow_schema::Fields; use common_base::bytes::Bytes; use serde::{Deserialize, Serialize}; use snafu::ResultExt; @@ -36,7 +35,7 @@ use crate::vectors::json::builder::JsonVectorBuilder; use crate::vectors::{BinaryVectorBuilder, MutableVector}; pub const JSON_TYPE_NAME: &str = "Json"; -const JSON_PLAIN_FIELD_NAME: &str = "__plain__"; +const JSON_PLAIN_FIELD_NAME: &str = "__json_plain__"; const JSON_PLAIN_FIELD_METADATA_KEY: &str = "is_plain_json"; pub type JsonObjectType = BTreeMap<String, JsonNativeType>; @@ -59,6 +58,10 @@ pub enum JsonNativeType { } impl JsonNativeType { + pub fn is_null(&self) -> bool { + matches!(self, JsonNativeType::Null) + } + pub fn u64() -> Self { Self::Number(JsonNumberType::U64) } @@ -187,7 +190,7 @@ impl JsonType { } } - pub(crate) fn empty() ->
Self { + pub fn null() -> Self { Self { format: JsonFormat::Native(Box::new(JsonNativeType::Null)), } @@ -208,7 +211,7 @@ impl JsonType { } /// Try to merge this json type with others, error on datatype conflict. - pub(crate) fn merge(&mut self, other: &JsonType) -> Result<()> { + pub fn merge(&mut self, other: &JsonType) -> Result<()> { match (&self.format, &other.format) { (JsonFormat::Jsonb, JsonFormat::Jsonb) => Ok(()), (JsonFormat::Native(this), JsonFormat::Native(that)) => { @@ -223,7 +226,8 @@ impl JsonType { } } - pub(crate) fn is_mergeable(&self, other: &JsonType) -> bool { + /// Check if it can merge with `other` json type. + pub fn is_mergeable(&self, other: &JsonType) -> bool { match (&self.format, &other.format) { (JsonFormat::Jsonb, JsonFormat::Jsonb) => true, (JsonFormat::Native(this), JsonFormat::Native(that)) => { @@ -232,6 +236,43 @@ impl JsonType { _ => false, } } + + /// Check if it includes all fields in `other` json type. + pub fn is_include(&self, other: &JsonType) -> bool { + match (&self.format, &other.format) { + (JsonFormat::Jsonb, JsonFormat::Jsonb) => true, + (JsonFormat::Native(this), JsonFormat::Native(that)) => { + is_include(this.as_ref(), that.as_ref()) + } + _ => false, + } + } +} + +fn is_include(this: &JsonNativeType, that: &JsonNativeType) -> bool { + fn is_include_object(this: &JsonObjectType, that: &JsonObjectType) -> bool { + for (type_name, that_type) in that { + let Some(this_type) = this.get(type_name) else { + return false; + }; + if !is_include(this_type, that_type) { + return false; + } + } + true + } + + match (this, that) { + (this, that) if this == that => true, + (JsonNativeType::Array(this), JsonNativeType::Array(that)) => { + is_include(this.as_ref(), that.as_ref()) + } + (JsonNativeType::Object(this), JsonNativeType::Object(that)) => { + is_include_object(this, that) + } + (_, JsonNativeType::Null) => true, + _ => false, + } } /// A special struct type for denoting "plain"(not object) json value. 
It has only one field, with @@ -317,14 +358,14 @@ impl DataType for JsonType { fn as_arrow_type(&self) -> ArrowDataType { match self.format { JsonFormat::Jsonb => ArrowDataType::Binary, - JsonFormat::Native(_) => ArrowDataType::Struct(Fields::empty()), + JsonFormat::Native(_) => self.as_struct_type().as_arrow_type(), } } fn create_mutable_vector(&self, capacity: usize) -> Box<dyn MutableVector> { - match self.format { + match &self.format { JsonFormat::Jsonb => Box::new(BinaryVectorBuilder::with_capacity(capacity)), - JsonFormat::Native(_) => Box::new(JsonVectorBuilder::with_capacity(capacity)), + JsonFormat::Native(x) => Box::new(JsonVectorBuilder::new(*x.clone(), capacity)), } } @@ -336,6 +377,12 @@ } } +impl Display for JsonType { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.name()) + } +} + /// Converts a json type value to string pub fn jsonb_to_string(val: &[u8]) -> Result<String> { match jsonb::from_slice(val) { @@ -366,6 +413,204 @@ mod tests { use super::*; use crate::json::JsonStructureSettings; + #[test] + fn test_json_type_include() { + fn test(this: &JsonNativeType, that: &JsonNativeType, expected: bool) { + assert_eq!(is_include(this, that), expected); + } + + test(&JsonNativeType::Null, &JsonNativeType::Null, true); + test(&JsonNativeType::Null, &JsonNativeType::Bool, false); + + test(&JsonNativeType::Bool, &JsonNativeType::Null, true); + test(&JsonNativeType::Bool, &JsonNativeType::Bool, true); + test(&JsonNativeType::Bool, &JsonNativeType::u64(), false); + + test(&JsonNativeType::u64(), &JsonNativeType::Null, true); + test(&JsonNativeType::u64(), &JsonNativeType::u64(), true); + test(&JsonNativeType::u64(), &JsonNativeType::String, false); + + test(&JsonNativeType::String, &JsonNativeType::Null, true); + test(&JsonNativeType::String, &JsonNativeType::String, true); + test( + &JsonNativeType::String, + &JsonNativeType::Array(Box::new(JsonNativeType::f64())), + false, + ); + + test( + &JsonNativeType::Array(Box::new(JsonNativeType::f64())), + &JsonNativeType::Null, + true, + ); + test( + &JsonNativeType::Array(Box::new(JsonNativeType::f64())), + &JsonNativeType::Array(Box::new(JsonNativeType::Null)), + true, + ); + test( + &JsonNativeType::Array(Box::new(JsonNativeType::f64())), + &JsonNativeType::Array(Box::new(JsonNativeType::f64())), + true, + ); + test( + &JsonNativeType::Array(Box::new(JsonNativeType::f64())), + &JsonNativeType::String, + false, + ); + test( + &JsonNativeType::Array(Box::new(JsonNativeType::f64())), + &JsonNativeType::Object(JsonObjectType::new()), + false, + ); + + let simple_json_object = &JsonNativeType::Object(JsonObjectType::from([( + "foo".to_string(), + JsonNativeType::String, + )])); + test(simple_json_object, &JsonNativeType::Null, true); + test(simple_json_object, simple_json_object, true); + test(simple_json_object, &JsonNativeType::i64(), false); + test( + simple_json_object, + &JsonNativeType::Object(JsonObjectType::from([( + "bar".to_string(), + JsonNativeType::i64(), + )])), + false, + ); + + let complex_json_object = &JsonNativeType::Object(JsonObjectType::from([ + ( + "nested".to_string(), + JsonNativeType::Object(JsonObjectType::from([( + "a".to_string(), + JsonNativeType::Object(JsonObjectType::from([( + "b".to_string(), + JsonNativeType::Object(JsonObjectType::from([( + "c".to_string(), + JsonNativeType::String, + )])), + )])), + )])), + ), + ("bar".to_string(), JsonNativeType::i64()), + ])); + test(complex_json_object, &JsonNativeType::Null, true); + test(complex_json_object,
&JsonNativeType::String, false); + test(complex_json_object, complex_json_object, true); + test( + complex_json_object, + &JsonNativeType::Object(JsonObjectType::from([( + "bar".to_string(), + JsonNativeType::i64(), + )])), + true, + ); + test( + complex_json_object, + &JsonNativeType::Object(JsonObjectType::from([ + ( + "nested".to_string(), + JsonNativeType::Object(JsonObjectType::from([( + "a".to_string(), + JsonNativeType::Null, + )])), + ), + ("bar".to_string(), JsonNativeType::i64()), + ])), + true, + ); + test( + complex_json_object, + &JsonNativeType::Object(JsonObjectType::from([ + ( + "nested".to_string(), + JsonNativeType::Object(JsonObjectType::from([( + "a".to_string(), + JsonNativeType::String, + )])), + ), + ("bar".to_string(), JsonNativeType::i64()), + ])), + false, + ); + test( + complex_json_object, + &JsonNativeType::Object(JsonObjectType::from([ + ( + "nested".to_string(), + JsonNativeType::Object(JsonObjectType::from([( + "a".to_string(), + JsonNativeType::Object(JsonObjectType::from([( + "b".to_string(), + JsonNativeType::String, + )])), + )])), + ), + ("bar".to_string(), JsonNativeType::i64()), + ])), + false, + ); + test( + complex_json_object, + &JsonNativeType::Object(JsonObjectType::from([ + ( + "nested".to_string(), + JsonNativeType::Object(JsonObjectType::from([( + "a".to_string(), + JsonNativeType::Object(JsonObjectType::from([( + "b".to_string(), + JsonNativeType::Object(JsonObjectType::from([( + "c".to_string(), + JsonNativeType::Null, + )])), + )])), + )])), + ), + ("bar".to_string(), JsonNativeType::i64()), + ])), + true, + ); + test( + complex_json_object, + &JsonNativeType::Object(JsonObjectType::from([ + ( + "nested".to_string(), + JsonNativeType::Object(JsonObjectType::from([( + "a".to_string(), + JsonNativeType::Object(JsonObjectType::from([( + "b".to_string(), + JsonNativeType::Object(JsonObjectType::from([( + "c".to_string(), + JsonNativeType::Bool, + )])), + )])), + )])), + ), + ("bar".to_string(), JsonNativeType::i64()), + ])), + false, + ); + test( + complex_json_object, + &JsonNativeType::Object(JsonObjectType::from([( + "nested".to_string(), + JsonNativeType::Object(JsonObjectType::from([( + "a".to_string(), + JsonNativeType::Object(JsonObjectType::from([( + "b".to_string(), + JsonNativeType::Object(JsonObjectType::from([( + "c".to_string(), + JsonNativeType::String, + )])), + )])), + )])), + )])), + true, + ); + } + #[test] fn test_merge_json_type() -> Result<()> { fn test( diff --git a/src/datatypes/src/vectors/json/builder.rs b/src/datatypes/src/vectors/json/builder.rs index 4f63cb4bc48e..3a32dda171df 100644 --- a/src/datatypes/src/vectors/json/builder.rs +++ b/src/datatypes/src/vectors/json/builder.rs @@ -20,6 +20,7 @@ use crate::data_type::ConcreteDataType; use crate::error::{Result, TryFromValueSnafu, UnsupportedOperationSnafu}; use crate::json::value::JsonValueRef; use crate::prelude::{ValueRef, Vector, VectorRef}; +use crate::types::json_type::JsonNativeType; use crate::types::{JsonType, json_type}; use crate::value::StructValueRef; use crate::vectors::{MutableVector, StructVectorBuilder}; @@ -181,9 +182,9 @@ pub(crate) struct JsonVectorBuilder { } impl JsonVectorBuilder { - pub(crate) fn with_capacity(capacity: usize) -> Self { + pub(crate) fn new(json_type: JsonNativeType, capacity: usize) -> Self { Self { - merged_type: JsonType::empty(), + merged_type: JsonType::new_native(json_type), capacity, builders: vec![], } @@ -326,18 +327,18 @@ mod tests { "Failed to merge JSON datatype: datatypes have conflict, this: Number(I64), that: 
Array[Bool]", ), ]; - let mut builder = JsonVectorBuilder::with_capacity(1); + let mut builder = JsonVectorBuilder::new(JsonNativeType::Null, 1); for (json, result) in jsons.into_iter().zip(results.into_iter()) { push(json, &mut builder, result); } let vector = builder.to_vector(); let expected = r#" -+----------------+ -| StructVector | -+----------------+ -| {__plain__: 1} | -| {__plain__: 2} | -+----------------+"#; ++---------------------+ +| StructVector | ++---------------------+ +| {__json_plain__: 1} | +| {__json_plain__: 2} | ++---------------------+"#; assert_eq!(pretty_print(vector), expected.trim()); Ok(()) } @@ -386,7 +387,7 @@ mod tests { "object": {"timestamp": 1761523203000} }"#, ]; - let mut builder = JsonVectorBuilder::with_capacity(1); + let mut builder = JsonVectorBuilder::new(JsonNativeType::Null, 1); for json in jsons { push(json, &mut builder, Ok(())); } diff --git a/src/datatypes/src/vectors/struct_vector.rs b/src/datatypes/src/vectors/struct_vector.rs index d9490a63bb53..44de9abf5ecb 100644 --- a/src/datatypes/src/vectors/struct_vector.rs +++ b/src/datatypes/src/vectors/struct_vector.rs @@ -379,10 +379,8 @@ impl MutableVector for StructVectorBuilder { }, StructValueRef::Ref(val) => self.push_struct_value(val)?, StructValueRef::RefList { val, fields } => { - let struct_value = StructValue::try_new( - val.iter().map(|v| Value::from(v.clone())).collect(), - fields.clone(), - )?; + let struct_value = + StructValue::try_new(val.into_iter().map(Value::from).collect(), fields)?; self.push_struct_value(&struct_value)?; } } @@ -429,12 +427,17 @@ impl ScalarVectorBuilder for StructVectorBuilder { .value_builders .iter_mut() .map(|b| b.to_vector().to_arrow_array()) - .collect(); - let struct_array = StructArray::new( - self.fields.as_arrow_fields(), - arrays, - self.null_buffer.finish(), - ); + .collect::>(); + + let struct_array = if arrays.is_empty() { + StructArray::new_empty_fields(self.len(), self.null_buffer.finish()) + } else { + StructArray::new( + self.fields.as_arrow_fields(), + arrays, + self.null_buffer.finish(), + ) + }; StructVector::try_new(self.fields.clone(), struct_array).unwrap() } diff --git a/src/flow/src/repr.rs b/src/flow/src/repr.rs index 301431aff5be..715f60594b7f 100644 --- a/src/flow/src/repr.rs +++ b/src/flow/src/repr.rs @@ -17,7 +17,7 @@ mod relation; -use api::helper::{pb_value_to_value_ref, value_to_grpc_value}; +use api::helper::{pb_value_to_value_ref, to_grpc_value}; use api::v1::Row as ProtoRow; use datatypes::data_type::ConcreteDataType; use datatypes::types::cast; @@ -201,11 +201,7 @@ impl From for Row { impl From for ProtoRow { fn from(row: Row) -> Self { - let values = row - .unpack() - .into_iter() - .map(value_to_grpc_value) - .collect_vec(); + let values = row.unpack().into_iter().map(to_grpc_value).collect_vec(); ProtoRow { values } } } diff --git a/src/mito2/src/memtable/bulk/part.rs b/src/mito2/src/memtable/bulk/part.rs index d820a3522635..aa2278cb7814 100644 --- a/src/mito2/src/memtable/bulk/part.rs +++ b/src/mito2/src/memtable/bulk/part.rs @@ -18,7 +18,7 @@ use std::collections::{HashMap, HashSet, VecDeque}; use std::sync::Arc; use std::time::{Duration, Instant}; -use api::helper::{ColumnDataTypeWrapper, value_to_grpc_value}; +use api::helper::{ColumnDataTypeWrapper, to_grpc_value}; use api::v1::bulk_wal_entry::Body; use api::v1::{ArrowIpc, BulkWalEntry, Mutation, OpType, bulk_wal_entry}; use bytes::Bytes; @@ -268,7 +268,7 @@ impl BulkPart { let values = (0..self.batch.num_columns()) .map(|col_idx| { if let Some(v) = 
&vectors[col_idx] { - value_to_grpc_value(v.get(row_idx)) + to_grpc_value(v.get(row_idx)) } else { api::v1::Value { value_data: None } } diff --git a/src/mito2/src/read/compat.rs b/src/mito2/src/read/compat.rs index 8bc24a49536a..8a69b1856f04 100644 --- a/src/mito2/src/read/compat.rs +++ b/src/mito2/src/read/compat.rs @@ -18,6 +18,7 @@ use std::collections::HashMap; use std::sync::Arc; use api::v1::SemanticType; +use common_recordbatch::recordbatch::align_json_array; use datatypes::arrow::array::{ Array, ArrayRef, BinaryArray, BinaryBuilder, DictionaryArray, UInt32Array, }; @@ -27,7 +28,7 @@ use datatypes::arrow::record_batch::RecordBatch; use datatypes::data_type::ConcreteDataType; use datatypes::prelude::DataType; use datatypes::value::Value; -use datatypes::vectors::VectorRef; +use datatypes::vectors::{Helper, VectorRef}; use mito_codec::row_converter::{ CompositeValues, PrimaryKeyCodec, SortField, build_primary_key_codec, build_primary_key_codec_with_fields, }; @@ -38,8 +39,9 @@ use store_api::metadata::{RegionMetadata, RegionMetadataRef}; use store_api::storage::ColumnId; use crate::error::{ - CompatReaderSnafu, ComputeArrowSnafu, CreateDefaultSnafu, DecodeSnafu, EncodeSnafu, - NewRecordBatchSnafu, Result, UnexpectedSnafu, UnsupportedOperationSnafu, + CastVectorSnafu, CompatReaderSnafu, ComputeArrowSnafu, ConvertVectorSnafu, CreateDefaultSnafu, + DecodeSnafu, EncodeSnafu, NewRecordBatchSnafu, RecordBatchSnafu, Result, UnexpectedSnafu, + UnsupportedOperationSnafu, }; use crate::read::flat_projection::{FlatProjectionMapper, flat_projected_columns}; use crate::read::projection::{PrimaryKeyProjectionMapper, ProjectionMapper}; @@ -150,7 +152,7 @@ impl PrimaryKeyCompatBatch { batch = compat_pk.compat(batch)?; } if let Some(compat_fields) = &self.compat_fields { - batch = compat_fields.compat(batch); + batch = compat_fields.compat(batch)?; } Ok(batch) } @@ -351,11 +353,13 @@ impl FlatCompatBatch { let old_column = batch.column(*pos); if let Some(ty) = cast_type { - // Safety: We ensure type can be converted and the new batch should be valid. - // Tips: `safe` must be true in `CastOptions`, which will replace the specific value with null when it cannot be converted. - let casted = + let casted = if let Some(json_type) = ty.as_json() { + align_json_array(old_column, &json_type.as_arrow_type()) + .context(RecordBatchSnafu)? + } else { datatypes::arrow::compute::cast(old_column, &ty.as_arrow_type()) - .context(ComputeArrowSnafu)?; + .context(ComputeArrowSnafu)? + }; Ok(casted) } else { Ok(old_column.clone()) } @@ -452,8 +456,7 @@ struct CompatFields { impl CompatFields { /// Make fields of the `batch` compatible. - #[must_use] - fn compat(&self, batch: Batch) -> Batch { + fn compat(&self, batch: Batch) -> Result<Batch> { debug_assert_eq!(self.actual_fields.len(), batch.fields().len()); debug_assert!( self.actual_fields .iter() ); let len = batch.num_rows(); - let fields = self - .index_or_defaults + self.index_or_defaults .iter() .map(|index_or_default| match index_or_default { IndexOrDefault::Index { pos, cast_type } => { let old_column = &batch.fields()[*pos]; let data = if let Some(ty) = cast_type { - // Safety: We ensure type can be converted and the new batch should be valid. - // Tips: `safe` must be true in `CastOptions`, which will replace the specific value with null when it cannot be converted.
- old_column.data.cast(ty).unwrap() + if let Some(json_type) = ty.as_json() { + let json_array = old_column.data.to_arrow_array(); + let json_array = + align_json_array(&json_array, &json_type.as_arrow_type()) + .context(RecordBatchSnafu)?; + Helper::try_into_vector(&json_array).context(ConvertVectorSnafu)? + } else { + old_column.data.cast(ty).with_context(|_| CastVectorSnafu { + from: old_column.data.data_type(), + to: ty.clone(), + })? + } } else { old_column.data.clone() }; - BatchColumn { + Ok(BatchColumn { column_id: old_column.column_id, data, - } + }) } IndexOrDefault::DefaultValue { column_id, @@ -488,16 +499,14 @@ semantic_type: _, } => { let data = default_vector.replicate(&[len]); - BatchColumn { + Ok(BatchColumn { column_id: *column_id, data, - } + }) } }) - .collect(); - - // Safety: We ensure all columns have the same length and the new batch should be valid. - batch.with_fields(fields).unwrap() + .collect::<Result<Vec<_>>>() + .and_then(|fields| batch.with_fields(fields)) } } diff --git a/src/mito2/src/request.rs b/src/mito2/src/request.rs index 4bac6b6266e1..29061fae3d4f 100644 --- a/src/mito2/src/request.rs +++ b/src/mito2/src/request.rs @@ -20,7 +20,6 @@ use std::time::Instant; use api::helper::{ ColumnDataTypeWrapper, is_column_type_value_eq, is_semantic_type_eq, proto_value_type, - to_proto_value, }; use api::v1::column_def::options_from_column_schema; use api::v1::{ColumnDataType, ColumnSchema, OpType, Rows, SemanticType, Value, WriteHint}; @@ -414,7 +413,7 @@ impl WriteRequest { }; // Convert default value into proto's value. - Ok(to_proto_value(default_value)) + Ok(api::helper::to_grpc_value(default_value)) } } diff --git a/src/operator/src/expr_helper.rs b/src/operator/src/expr_helper.rs index 3fa9a0ae1f69..4b7e0946cd16 100644 --- a/src/operator/src/expr_helper.rs +++ b/src/operator/src/expr_helper.rs @@ -762,7 +762,8 @@ pub(crate) fn to_alter_table_expr( target_type, } => { let target_type = - sql_data_type_to_concrete_data_type(&target_type).context(ParseSqlSnafu)?; + sql_data_type_to_concrete_data_type(&target_type, &Default::default()) + .context(ParseSqlSnafu)?; let (target_type, target_type_extension) = ColumnDataTypeWrapper::try_from(target_type) .map(|w| w.to_parts()) .context(ColumnDataTypeSnafu)?; diff --git a/src/operator/src/insert.rs b/src/operator/src/insert.rs index c323677f435b..201d5d99f4ed 100644 --- a/src/operator/src/insert.rs +++ b/src/operator/src/insert.rs @@ -353,10 +353,11 @@ impl Inserter { &self, insert: &Insert, ctx: &QueryContextRef, + statement_executor: &StatementExecutor, ) -> Result<Output> { let (inserts, table_info) = StatementToRegion::new(self.catalog_manager.as_ref(), &self.partition_manager, ctx) - .convert(insert, ctx) + .convert(insert, ctx, statement_executor) .await?; let table_infos = diff --git a/src/operator/src/req_convert/insert/fill_impure_default.rs b/src/operator/src/req_convert/insert/fill_impure_default.rs index 2029b8b96f68..0e39bc724144 100644 --- a/src/operator/src/req_convert/insert/fill_impure_default.rs +++ b/src/operator/src/req_convert/insert/fill_impure_default.rs @@ -63,7 +63,7 @@ impl ImpureDefaultFiller { column.default_constraint() ), })?; - let grpc_default_value = api::helper::to_proto_value(default_value); + let grpc_default_value = api::helper::to_grpc_value(default_value); let def = column_schemas_to_defs(vec![column], &pk_names)?.swap_remove(0); let grpc_column_schema = api::v1::ColumnSchema { column_name: def.name, diff --git a/src/operator/src/req_convert/insert/stmt_to_region.rs
b/src/operator/src/req_convert/insert/stmt_to_region.rs index aca31b289a37..ef4e7cac8e23 100644 --- a/src/operator/src/req_convert/insert/stmt_to_region.rs +++ b/src/operator/src/req_convert/insert/stmt_to_region.rs @@ -12,13 +12,24 @@ // See the License for the specific language governing permissions and // limitations under the License. -use api::helper::{ColumnDataTypeWrapper, value_to_grpc_value}; +use std::cell::LazyCell; +use std::collections::HashMap; + +use api::helper::{ColumnDataTypeWrapper, to_grpc_value}; +use api::v1::alter_table_expr::Kind; use api::v1::column_def::options_from_column_schema; use api::v1::region::InsertRequests as RegionInsertRequests; -use api::v1::{ColumnSchema as GrpcColumnSchema, Row, Rows, Value as GrpcValue}; +use api::v1::{ + AlterTableExpr, ColumnSchema as GrpcColumnSchema, ModifyColumnType, ModifyColumnTypes, Row, + Rows, +}; use catalog::CatalogManager; +use common_telemetry::info; use common_time::Timezone; +use datatypes::data_type::ConcreteDataType; use datatypes::schema::{ColumnSchema, SchemaRef}; +use datatypes::types::JsonType; +use datatypes::value::Value; use partition::manager::PartitionRuleManager; use session::context::{QueryContext, QueryContextRef}; use snafu::{OptionExt, ResultExt, ensure}; @@ -30,12 +41,13 @@ use table::metadata::TableInfoRef; use crate::error::{ CatalogSnafu, ColumnDataTypeSnafu, ColumnDefaultValueSnafu, ColumnNoneDefaultValueSnafu, - ColumnNotFoundSnafu, InvalidSqlSnafu, MissingInsertBodySnafu, ParseSqlSnafu, Result, - SchemaReadOnlySnafu, TableNotFoundSnafu, + ColumnNotFoundSnafu, InvalidInsertRequestSnafu, InvalidSqlSnafu, MissingInsertBodySnafu, + ParseSqlSnafu, Result, SchemaReadOnlySnafu, TableNotFoundSnafu, }; use crate::insert::InstantAndNormalInsertRequests; use crate::req_convert::common::partitioner::Partitioner; use crate::req_convert::insert::semantic_type; +use crate::statement::StatementExecutor; const DEFAULT_PLACEHOLDER_VALUE: &str = "default"; @@ -62,12 +74,12 @@ impl<'a> StatementToRegion<'a> { &self, stmt: &Insert, query_ctx: &QueryContextRef, + statement_executor: &StatementExecutor, ) -> Result<(InstantAndNormalInsertRequests, TableInfoRef)> { let name = stmt.table_name().context(ParseSqlSnafu)?; let (catalog, schema, table_name) = self.get_full_name(name)?; - let table = self.get_table(&catalog, &schema, &table_name).await?; + let mut table = self.get_table(&catalog, &schema, &table_name).await?; let table_schema = table.schema(); - let table_info = table.table_info(); ensure!( !common_catalog::consts::is_readonly_schema(&schema), @@ -94,7 +106,6 @@ Ok(()) })?; - let mut schema = Vec::with_capacity(column_count); let mut rows = vec![ Row { values: Vec::with_capacity(column_count) }; row_count ]; - for (i, column_name) in column_names.into_iter().enumerate() { - let column_schema = table_schema - .column_schema_by_name(column_name) - .with_context(|| ColumnNotFoundSnafu { - msg: format!("Column {} not found in table {}", column_name, &table_name), - })?; + fn find_insert_columns<'a>( + table: &'a TableRef, + column_names: &[&String], + ) -> Result<Vec<&'a ColumnSchema>> { + let schema = table.schema_ref(); + column_names + .iter() + .map(|name| { + schema + .column_schema_by_name(name) + .context(ColumnNotFoundSnafu { msg: *name }) + }) + .collect::<Result<Vec<_>>>() + } + + let mut insert_columns = find_insert_columns(&table, &column_names)?; + let converter = SqlRowConverter::new(&insert_columns, query_ctx); + // Convert the SQL values to
GreptimeDB values, and merge the "largest" JSON type of all + // values along the way via `JsonColumnTypeUpdater`. + let mut updater = JsonColumnTypeUpdater::new(statement_executor, query_ctx); + let value_rows = converter.convert(&mut updater, &sql_rows)?; + + // If the JSON values have a "larger" json type than the one in the table schema, modify + // the column's json type first, by executing an "alter table" DDL. + if updater + .maybe_update_column_type(&catalog, &schema, &table_name, &insert_columns) + .await? + { + // Update with the latest schema, if changed. + table = self.get_table(&catalog, &schema, &table_name).await?; + insert_columns = find_insert_columns(&table, &column_names)?; + } + + // Finally convert GreptimeDB values to GRPC values, ready for insertion on the Datanode. + for (i, row) in value_rows.into_iter().enumerate() { + for value in row { + let grpc_value = to_grpc_value(value); + rows[i].values.push(grpc_value); + } + } + + let table_info = table.table_info(); + let mut schema = Vec::with_capacity(column_count); + for column_schema in insert_columns { let (datatype, datatype_extension) = ColumnDataTypeWrapper::try_from(column_schema.data_type.clone()) .context(ColumnDataTypeSnafu)? .to_parts(); + + let column_name = &column_schema.name; let semantic_type = semantic_type(&table_info, column_name)?; let grpc_column_schema = GrpcColumnSchema { @@ -123,16 +174,6 @@ options: options_from_column_schema(column_schema), }; schema.push(grpc_column_schema); - - for (sql_row, grpc_row) in sql_rows.iter().zip(rows.iter_mut()) { - let value = sql_value_to_grpc_value( - column_schema, - &sql_row[i], - Some(&query_ctx.timezone()), - query_ctx.auto_string_to_numeric(), - )?; - grpc_row.values.push(value); - } } let requests = Partitioner::new(self.partition_manager) @@ -194,6 +235,147 @@ } } +struct SqlRowConverter<'a, 'b> { + insert_columns: &'a [&'a ColumnSchema], + query_context: &'b QueryContextRef, +} + +impl<'a, 'b> SqlRowConverter<'a, 'b> { + fn new(insert_columns: &'a [&'a ColumnSchema], query_context: &'b QueryContextRef) -> Self { + Self { + insert_columns, + query_context, + } + } + + fn convert( + &self, + updater: &mut JsonColumnTypeUpdater<'_, 'a>, + sql_rows: &[Vec<SqlValue>], + ) -> Result<Vec<Vec<Value>>> { + let timezone = Some(&self.query_context.timezone()); + let auto_string_to_numeric = self.query_context.auto_string_to_numeric(); + + let mut value_rows = Vec::with_capacity(sql_rows.len()); + for sql_row in sql_rows { + let mut value_row = Vec::with_capacity(self.insert_columns.len()); + + for (insert_column, sql_value) in self.insert_columns.iter().zip(sql_row) { + let value = + sql_value_to_value(insert_column, sql_value, timezone, auto_string_to_numeric)?; + + updater.merge_types(insert_column, &value)?; + + value_row.push(value); + } + value_rows.push(value_row); + } + Ok(value_rows) + } +} + +struct JsonColumnTypeUpdater<'a, 'b> { + statement_executor: &'a StatementExecutor, + query_context: &'a QueryContextRef, + merged_value_types: LazyCell<HashMap<&'b String, JsonType>>, +} + +impl<'a, 'b> JsonColumnTypeUpdater<'a, 'b> { + fn new(statement_executor: &'a StatementExecutor, query_context: &'a QueryContextRef) -> Self { + Self { + statement_executor, + query_context, + merged_value_types: LazyCell::new(Default::default), + } + } + + fn merge_types(&mut self, column_schema: &'b ColumnSchema, value: &Value) -> Result<()> { + if !matches!(value, Value::Json(_)) { + return Ok(()); + } + + if let ConcreteDataType::Json(value_type) = value.data_type() {
let merged_type = self + .merged_value_types + .entry(&column_schema.name) + .or_insert_with(|| value_type.clone()); + + if !merged_type.is_include(&value_type) { + merged_type.merge(&value_type).map_err(|e| { + InvalidInsertRequestSnafu { + reason: format!(r#"cannot merge "{value_type}" into "{merged_type}": {e}"#), + } + .build() + })?; + } + } + Ok(()) + } + + async fn maybe_update_column_type( + self, + catalog: &str, + schema: &str, + table: &str, + insert_columns: &[&ColumnSchema], + ) -> Result<bool> { + let mut has_update = false; + for (column_name, merged_type) in self.merged_value_types.iter() { + let Some(column_type) = insert_columns + .iter() + .find_map(|x| (&x.name == column_name).then(|| x.data_type.as_json())) + .flatten() + else { + continue; + }; + if column_type.is_include(merged_type) { + continue; + } + + let new_column_type = { + let mut x = column_type.clone(); + x.merge(merged_type) + .map_err(|e| { + InvalidInsertRequestSnafu { + reason: format!( + r#"cannot merge "{merged_type}" into "{column_type}": {e}"# + ), + } + .build() + }) + .map(|()| x) + }?; + info!( + "updating table {}.{}.{} column {} json type: {} => {}", + catalog, schema, table, column_name, column_type, new_column_type, ); + + let (target_type, target_type_extension) = + ColumnDataTypeWrapper::try_from(ConcreteDataType::Json(new_column_type)) + .context(ColumnDataTypeSnafu)? + .into_parts(); + let alter_expr = AlterTableExpr { + catalog_name: catalog.to_string(), + schema_name: schema.to_string(), + table_name: table.to_string(), + kind: Some(Kind::ModifyColumnTypes(ModifyColumnTypes { + modify_column_types: vec![ModifyColumnType { + column_name: column_name.to_string(), + target_type: target_type as i32, + target_type_extension, + }], + })), + }; + self.statement_executor + .alter_table_inner(alter_expr, self.query_context.clone()) + .await?; + + has_update = true; + } + Ok(has_update) + } +} + fn column_names<'a>(stmt: &'a Insert, table_schema: &'a SchemaRef) -> Vec<&'a String> { if !stmt.columns().is_empty() { stmt.columns() @@ -209,12 +391,12 @@ } /// Converts SQL value to gRPC value according to the column schema. /// If `auto_string_to_numeric` is true, tries to cast the string value to numeric values, /// and fills the default value if the cast fails. -fn sql_value_to_grpc_value( +fn sql_value_to_value( column_schema: &ColumnSchema, sql_val: &SqlValue, timezone: Option<&Timezone>, auto_string_to_numeric: bool, -) -> Result<GrpcValue> { +) -> Result<Value> { let column = &column_schema.name; let value = if replace_default(sql_val) { let default_value = column_schema @@ -237,9 +419,25 @@ ) .context(crate::error::SqlCommonSnafu)? }; + validate(&value)?; + Ok(value) +} - let grpc_value = value_to_grpc_value(value); - Ok(grpc_value) +fn validate(value: &Value) -> Result<()> { + match value { + Value::Json(value) => { + // A json object is stored as an Arrow struct in parquet, which has the restriction: + // "Parquet does not support writing empty structs".
+ ensure!( + !value.is_empty_object(), + InvalidInsertRequestSnafu { + reason: "empty json object is not supported, consider adding a dummy field" + } + ); + Ok(()) + } + _ => Ok(()), + } } fn replace_default(sql_val: &SqlValue) -> bool { diff --git a/src/operator/src/statement/dml.rs b/src/operator/src/statement/dml.rs index 827bfd8b6631..41169398abce 100644 --- a/src/operator/src/statement/dml.rs +++ b/src/operator/src/statement/dml.rs @@ -28,7 +28,7 @@ impl StatementExecutor { if insert.can_extract_values() { // Fast path: plain insert ("insert with literal values") is executed directly self.inserter - .handle_statement_insert(insert.as_ref(), &query_ctx) + .handle_statement_insert(insert.as_ref(), &query_ctx, self) .await } else { // Slow path: insert with subquery. Execute using query engine. diff --git a/src/sql/src/parser.rs b/src/sql/src/parser.rs index 6c2a7e11ab2d..8558db6fcb32 100644 --- a/src/sql/src/parser.rs +++ b/src/sql/src/parser.rs @@ -353,7 +353,8 @@ mod tests { let ts_col = columns.first().unwrap(); assert_eq!( expected_type, - sql_data_type_to_concrete_data_type(ts_col.data_type()).unwrap() + sql_data_type_to_concrete_data_type(ts_col.data_type(), &Default::default()) + .unwrap() ); } _ => unreachable!(), diff --git a/src/sql/src/parsers/create_parser.rs b/src/sql/src/parsers/create_parser.rs index 157f5540719c..53dcdb0e0312 100644 --- a/src/sql/src/parsers/create_parser.rs +++ b/src/sql/src/parsers/create_parser.rs @@ -669,8 +669,7 @@ impl<'a> ParserContext<'a> { // Must immediately parse the JSON datatype format because it is closely after the "JSON" // datatype, like this: "JSON(format = ...)". if matches!(data_type, DataType::JSON) { - let options = json::parse_json_datatype_options(parser)?; - extensions.json_datatype_options = Some(options); + extensions.json_datatype_options = json::parse_json_datatype_options(parser)?; } let mut options = vec![]; @@ -856,7 +855,7 @@ impl<'a> ParserContext<'a> { ); let column_type = get_unalias_type(column_type); - let data_type = sql_data_type_to_concrete_data_type(&column_type)?; + let data_type = sql_data_type_to_concrete_data_type(&column_type, column_extensions)?; ensure!( data_type == ConcreteDataType::string_datatype(), InvalidColumnOptionSnafu { diff --git a/src/sql/src/parsers/create_parser/json.rs b/src/sql/src/parsers/create_parser/json.rs index 1556205fef4b..649a91106a35 100644 --- a/src/sql/src/parsers/create_parser/json.rs +++ b/src/sql/src/parsers/create_parser/json.rs @@ -20,7 +20,7 @@ use crate::error::{Result, SyntaxSnafu}; use crate::statements::OptionMap; use crate::util; -pub(super) fn parse_json_datatype_options(parser: &mut Parser<'_>) -> Result<OptionMap> { +pub(super) fn parse_json_datatype_options(parser: &mut Parser<'_>) -> Result<Option<OptionMap>> { if parser.consume_token(&Token::LParen) { let result = parser .parse_comma_separated0(Parser::parse_sql_option, Token::RParen) @@ -32,9 +32,9 @@ pub(super) fn parse_json_datatype_options(parser: &mut Parser<'_>) -> Result<OptionMap> { .collect::<Result<Vec<_>>>() })?; parser.expect_token(&Token::RParen).context(SyntaxSnafu)?; - Ok(OptionMap::new(result)) + Ok(Some(OptionMap::new(result))) } else { - Ok(OptionMap::default()) + Ok(None) } } @@ -53,7 +53,7 @@ mod tests { #[test] fn test_parse_json_datatype_options() { - fn parse(sql: &str) -> OptionMap { + fn parse(sql: &str) -> Option<OptionMap> { let Statement::CreateTable(mut create_table) = ParserContext::create_with_dialect( sql, &GreptimeDbDialect {}, ..., ).unwrap() ... ; assert_eq!(column_def.data_type, DataType::JSON); assert!(column_def.options.is_empty()); -
-            assert!(extensions.json_datatype_options.is_some());
-            extensions.json_datatype_options.unwrap()
+            extensions.json_datatype_options
         }
 
         let sql = r#"
 CREATE TABLE json_data (
     my_json JSON(format = "partial", unstructured_keys = ["k", "foo.bar", "a.b.c"]),
     ts TIMESTAMP TIME INDEX,
 )"#;
-        let options = parse(sql);
+        let options = parse(sql).unwrap();
         assert_eq!(options.len(), 2);
         assert_eq!(
             options.value(JSON_OPT_FORMAT).and_then(|x| x.as_string()),
@@ -100,7 +99,7 @@ CREATE TABLE json_data (
     my_json JSON(format = "structured"),
     ts TIMESTAMP TIME INDEX,
 )"#;
-        let options = parse(sql);
+        let options = parse(sql).unwrap();
         assert_eq!(options.len(), 1);
         assert_eq!(
             options.value(JSON_OPT_FORMAT).and_then(|x| x.as_string()),
@@ -112,7 +111,7 @@ CREATE TABLE json_data (
     my_json JSON(format = "raw"),
     ts TIMESTAMP TIME INDEX,
 )"#;
-        let options = parse(sql);
+        let options = parse(sql).unwrap();
         assert_eq!(options.len(), 1);
         assert_eq!(
             options.value(JSON_OPT_FORMAT).and_then(|x| x.as_string()),
@@ -124,7 +123,7 @@ CREATE TABLE json_data (
     my_json JSON(),
     ts TIMESTAMP TIME INDEX,
 )"#;
-        let options = parse(sql);
+        let options = parse(sql).unwrap();
         assert!(options.is_empty());
 
         let sql = r#"
@@ -133,6 +132,6 @@ CREATE TABLE json_data (
     ts TIMESTAMP TIME INDEX,
 )"#;
         let options = parse(sql);
-        assert!(options.is_empty());
+        assert!(options.is_none());
     }
 }
diff --git a/src/sql/src/statements.rs b/src/sql/src/statements.rs
index b4933753738b..10fcac9b11b3 100644
--- a/src/sql/src/statements.rs
+++ b/src/sql/src/statements.rs
@@ -41,7 +41,8 @@ use common_time::timezone::Timezone;
 use datatypes::extension::json::{JsonExtensionType, JsonMetadata};
 use datatypes::prelude::ConcreteDataType;
 use datatypes::schema::{COMMENT_KEY, ColumnDefaultConstraint, ColumnSchema};
-use datatypes::types::TimestampType;
+use datatypes::types::json_type::JsonNativeType;
+use datatypes::types::{JsonFormat, JsonType, TimestampType};
 use datatypes::value::Value;
 use snafu::ResultExt;
 use sqlparser::ast::{ExactNumberInfo, Ident};
@@ -55,7 +56,7 @@ use crate::error::{
     SerializeColumnDefaultConstraintSnafu, SetFulltextOptionSnafu, SetJsonStructureSettingsSnafu,
     SetSkippingIndexOptionSnafu, SqlCommonSnafu,
 };
-use crate::statements::create::Column;
+use crate::statements::create::{Column, ColumnExtensions};
 pub use crate::statements::option_map::OptionMap;
 pub(crate) use crate::statements::transform::transform_statements;
@@ -109,7 +110,7 @@ pub fn column_to_schema(
         && !is_time_index;
 
     let name = column.name().value.clone();
-    let data_type = sql_data_type_to_concrete_data_type(column.data_type())?;
+    let data_type = sql_data_type_to_concrete_data_type(column.data_type(), &column.extensions)?;
     let default_constraint =
         parse_column_default_constraint(&name, &data_type, column.options(), timezone)
             .context(SqlCommonSnafu)?;
@@ -171,7 +172,7 @@ pub fn sql_column_def_to_grpc_column_def(
     timezone: Option<&Timezone>,
 ) -> Result<ColumnDef> {
     let name = col.name.value.clone();
-    let data_type = sql_data_type_to_concrete_data_type(&col.data_type)?;
+    let data_type = sql_data_type_to_concrete_data_type(&col.data_type, &Default::default())?;
 
     let is_nullable = col
         .options
@@ -217,7 +218,10 @@ pub fn sql_column_def_to_grpc_column_def(
     })
 }
 
-pub fn sql_data_type_to_concrete_data_type(data_type: &SqlDataType) -> Result<ConcreteDataType> {
+pub fn sql_data_type_to_concrete_data_type(
+    data_type: &SqlDataType,
+    column_extensions: &ColumnExtensions,
+) -> Result<ConcreteDataType> {
     match data_type {
         SqlDataType::BigInt(_) | SqlDataType::Int64 => Ok(ConcreteDataType::int64_datatype()),
         SqlDataType::BigIntUnsigned(_) => Ok(ConcreteDataType::uint64_datatype()),
@@ -269,7 +273,14 @@ pub fn sql_data_type_to_concrete_data_type(data_type: &SqlDataType) -> Result<Co
-        SqlDataType::JSON => Ok(ConcreteDataType::json_datatype()),
+        SqlDataType::JSON => {
+            let format = if column_extensions.json_datatype_options.is_some() {
+                JsonFormat::Native(Box::new(JsonNativeType::Null))
+            } else {
+                JsonFormat::Jsonb
+            };
+            Ok(ConcreteDataType::Json(JsonType::new(format)))
+        }
         // Vector type
         SqlDataType::Custom(name, d)
             if name.0.as_slice().len() == 1
@@ -354,7 +365,7 @@ mod tests {
     fn check_type(sql_type: SqlDataType, data_type: ConcreteDataType) {
         assert_eq!(
             data_type,
-            sql_data_type_to_concrete_data_type(&sql_type).unwrap()
+            sql_data_type_to_concrete_data_type(&sql_type, &Default::default()).unwrap()
         );
     }
diff --git a/src/sql/src/statements/transform/type_alias.rs b/src/sql/src/statements/transform/type_alias.rs
index 2d78855f2aa1..f76eb13ba64e 100644
--- a/src/sql/src/statements/transform/type_alias.rs
+++ b/src/sql/src/statements/transform/type_alias.rs
@@ -117,7 +117,9 @@ impl TransformRule for TypeAliasTransformRule {
             } if get_type_by_alias(data_type).is_some() => {
                 // Safety: checked in the match arm.
                 let new_type = get_type_by_alias(data_type).unwrap();
-                if let Ok(new_type) = sql_data_type_to_concrete_data_type(&new_type) {
+                if let Ok(new_type) =
+                    sql_data_type_to_concrete_data_type(&new_type, &Default::default())
+                {
                     *expr = Expr::Function(cast_expr_to_arrow_cast_func(
                         (**cast_expr).clone(),
                         new_type.as_arrow_type().to_string(),
@@ -132,9 +134,10 @@ impl TransformRule for TypeAliasTransformRule {
                 expr: cast_expr,
                 ..
             } => {
-                if let Ok(concrete_type) =
-                    sql_data_type_to_concrete_data_type(&DataType::Timestamp(*precision, *zone))
-                {
+                if let Ok(concrete_type) = sql_data_type_to_concrete_data_type(
+                    &DataType::Timestamp(*precision, *zone),
+                    &Default::default(),
+                ) {
                     let new_type = concrete_type.as_arrow_type();
                     *expr = Expr::Function(cast_expr_to_arrow_cast_func(
                         (**cast_expr).clone(),
diff --git a/src/table/src/table.rs b/src/table/src/table.rs
index ce36544b73dc..0ae7d580d8ab 100644
--- a/src/table/src/table.rs
+++ b/src/table/src/table.rs
@@ -118,6 +118,10 @@ impl Table {
         self.table_info.meta.schema.clone()
     }
 
+    pub fn schema_ref(&self) -> &SchemaRef {
+        &self.table_info.meta.schema
+    }
+
     /// Get a reference to the table info.
     pub fn table_info(&self) -> TableInfoRef {
         self.table_info.clone()
diff --git a/tests-integration/tests/sql.rs b/tests-integration/tests/sql.rs
index 14c8796e527a..b3d981b1b0de 100644
--- a/tests-integration/tests/sql.rs
+++ b/tests-integration/tests/sql.rs
@@ -230,7 +230,7 @@ pub async fn test_mysql_crud(store_type: StorageType) {
         .unwrap();
 
     sqlx::query(
-        "create table demo(i bigint, ts timestamp time index default current_timestamp, d date default null, dt timestamp(3) default null, b blob default null, j json default null, v vector(3) default null)",
+        "create table demo(i bigint, ts timestamp time index default current_timestamp, d date default null, dt timestamp(3) default null, b blob default null, j json, v vector(3) default null)",
     )
     .execute(&pool)
     .await
diff --git a/tests/cases/standalone/common/alter/alter_table_alter_column_set_default.result b/tests/cases/standalone/common/alter/alter_table_alter_column_set_default.result
index 30d82e100055..c92f6a4acff2 100644
--- a/tests/cases/standalone/common/alter/alter_table_alter_column_set_default.result
+++ b/tests/cases/standalone/common/alter/alter_table_alter_column_set_default.result
@@ -116,7 +116,7 @@ SHOW CREATE TABLE test1;
 
 ALTER TABLE test1 MODIFY COLUMN o SET DEFAULT "not allow";
 
-Error: 1001(Unsupported), Unsupported expr in default constraint: "not allow" for column: o
+Error: 1001(Unsupported), Unsupported default constraint for column: 'o', reason: expr '"not allow"' not supported
 
 ALTER TABLE test1 MODIFY COLUMN o SET DEFAULT NULL;
 
diff --git a/tests/cases/standalone/common/create/current_timestamp.result b/tests/cases/standalone/common/create/current_timestamp.result
index 09cb8a01a5a9..8571b62d30cc 100644
--- a/tests/cases/standalone/common/create/current_timestamp.result
+++ b/tests/cases/standalone/common/create/current_timestamp.result
@@ -54,7 +54,7 @@ show create table t3;
 
 create table t4 (ts timestamp time index default now);
 
-Error: 1001(Unsupported), Unsupported expr in default constraint: now for column: ts
+Error: 1001(Unsupported), Unsupported default constraint for column: 'ts', reason: expr 'now' not supported
 
 drop table t1;
 
diff --git a/tests/cases/standalone/common/types/json/json-structured.result b/tests/cases/standalone/common/types/json/json-structured.result
new file mode 100644
index 000000000000..0553831e9019
--- /dev/null
+++ b/tests/cases/standalone/common/types/json/json-structured.result
@@ -0,0 +1,82 @@
+CREATE TABLE t (ts TIMESTAMP TIME INDEX, j JSON(format = "structured") DEFAULT '{"foo": "bar"}');
+
+Error: 1001(Unsupported), Unsupported default constraint for column: 'j', reason: json column cannot have a default value
+
+CREATE TABLE t (ts TIMESTAMP TIME INDEX, j JSON(format = "structured"));
+
+Affected Rows: 0
+
+DESC TABLE t;
+
++--------+----------------------+-----+------+---------+---------------+
+| Column | Type                 | Key | Null | Default | Semantic Type |
++--------+----------------------+-----+------+---------+---------------+
+| ts     | TimestampMillisecond | PRI | NO   |         | TIMESTAMP     |
+| j      | Json                 |     | YES  |         | FIELD         |
++--------+----------------------+-----+------+---------+---------------+
+
+INSERT INTO t VALUES
+(1762128001000, '{"int": 1}'),
+(1762128002000, '{"int": 2, "list": [0.1, 0.2, 0.3]}'),
+(1762128003000, '{"int": 3, "list": [0.4, 0.5, 0.6], "nested": {"a": {"x": "hello"}, "b": {"y": -1}}}');
+
+Affected Rows: 3
+
+DESC TABLE t;
+
++--------+----------------------+-----+------+---------+---------------+
+| Column | Type                 | Key | Null | Default | Semantic Type |
++--------+----------------------+-----+------+---------+---------------+
+| ts     | TimestampMillisecond | PRI | NO   |         | TIMESTAMP     |
+| j      | Json                 |     | YES  |         | FIELD         |
++--------+----------------------+-----+------+---------+---------------+
+
+INSERT INTO t VALUES
+(1762128004000, '{"int": 4, "bool": true, "nested": {"a": {"y": 1}}}'),
+(1762128005000, '{"int": 5, "bool": false, "nested": {"b": {"x": "world"}}}');
+
+Affected Rows: 2
+
+DESC TABLE t;
+
++--------+----------------------+-----+------+---------+---------------+
+| Column | Type                 | Key | Null | Default | Semantic Type |
++--------+----------------------+-----+------+---------+---------------+
+| ts     | TimestampMillisecond | PRI | NO   |         | TIMESTAMP     |
+| j      | Json                 |     | YES  |         | FIELD         |
++--------+----------------------+-----+------+---------+---------------+
+
+INSERT INTO t VALUES (1762128006000, '{"int": 6, "list": [-6.0], "bool": true, "nested": {"a": {"x": "ax", "y": 66}, "b": {"y": -66, "x": "bx"}}}');
+
+Affected Rows: 1
+
+DESC TABLE t;
+
++--------+----------------------+-----+------+---------+---------------+
+| Column | Type                 | Key | Null | Default | Semantic Type |
++--------+----------------------+-----+------+---------+---------------+
+| ts     | TimestampMillisecond | PRI | NO   |         | TIMESTAMP     |
+| j      | Json                 |     | YES  |         | FIELD         |
++--------+----------------------+-----+------+---------+---------------+
+
+INSERT INTO t VALUES (1762128011000, '{}');
+
+Error: 1004(InvalidArguments), Invalid InsertRequest, reason: empty json object is not supported, consider adding a dummy field
+
+SELECT ts, j FROM t order by ts;
+
++---------------------+----------------------------------------------------------------------------------------+
+| ts                  | j                                                                                      |
++---------------------+----------------------------------------------------------------------------------------+
+| 2025-11-03T00:00:01 | {bool: , int: 1, list: , nested: }                                                     |
+| 2025-11-03T00:00:02 | {bool: , int: 2, list: [0.1, 0.2, 0.3], nested: }                                      |
+| 2025-11-03T00:00:03 | {bool: , int: 3, list: [0.4, 0.5, 0.6], nested: {a: {x: hello, y: }, b: {x: , y: -1}}} |
+| 2025-11-03T00:00:04 | {bool: true, int: 4, list: , nested: {a: {x: , y: 1}, b: }}                            |
+| 2025-11-03T00:00:05 | {bool: false, int: 5, list: , nested: {a: , b: {x: world, y: }}}                       |
+| 2025-11-03T00:00:06 | {bool: true, int: 6, list: [-6.0], nested: {a: {x: ax, y: 66}, b: {x: bx, y: -66}}}    |
++---------------------+----------------------------------------------------------------------------------------+
+
+DROP table t;
+
+Affected Rows: 0
+
diff --git a/tests/cases/standalone/common/types/json/json-structured.sql b/tests/cases/standalone/common/types/json/json-structured.sql
new file mode 100644
index 000000000000..8bb10b4b0e2b
--- /dev/null
+++ b/tests/cases/standalone/common/types/json/json-structured.sql
@@ -0,0 +1,28 @@
+CREATE TABLE t (ts TIMESTAMP TIME INDEX, j JSON(format = "structured") DEFAULT '{"foo": "bar"}');
+
+CREATE TABLE t (ts TIMESTAMP TIME INDEX, j JSON(format = "structured"));
+
+DESC TABLE t;
+
+INSERT INTO t VALUES
+(1762128001000, '{"int": 1}'),
+(1762128002000, '{"int": 2, "list": [0.1, 0.2, 0.3]}'),
+(1762128003000, '{"int": 3, "list": [0.4, 0.5, 0.6], "nested": {"a": {"x": "hello"}, "b": {"y": -1}}}');
+
+DESC TABLE t;
+
+INSERT INTO t VALUES
+(1762128004000, '{"int": 4, "bool": true, "nested": {"a": {"y": 1}}}'),
+(1762128005000, '{"int": 5, "bool": false, "nested": {"b": {"x": "world"}}}');
+
+DESC TABLE t;
+
+INSERT INTO t VALUES (1762128006000, '{"int": 6, "list": [-6.0], "bool": true, "nested": {"a": {"x": "ax", "y": 66}, "b": {"y": -66, "x": "bx"}}}');
+
+DESC TABLE t;
+
+INSERT INTO t VALUES (1762128011000, '{}');
+
+SELECT ts, j FROM t order by ts;
+
+DROP table t;
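
A note on the JSON column grammar touched by this patch: besides `format = "structured"` used in the new test files, the parser changes also accept the other formats exercised by the create_parser unit tests. A sketch of the "partial" form, syntax only and copied from those tests (end-to-end storage support for this format is not shown in this diff):

CREATE TABLE json_data (
    my_json JSON(format = "partial", unstructured_keys = ["k", "foo.bar", "a.b.c"]),
    ts TIMESTAMP TIME INDEX,
);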