Skip to content

Commit 2273fb0

Browse files
feat: simple read write new json type values
Signed-off-by: luofucong <[email protected]>
1 parent 6caff50 commit 2273fb0

File tree

27 files changed

+297
-223
lines changed

27 files changed

+297
-223
lines changed

Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ etcd-client = { git = "https://github.com/GreptimeTeam/etcd-client", rev = "f62d
148148
fst = "0.4.7"
149149
futures = "0.3"
150150
futures-util = "0.3"
151-
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "14b9dc40bdc8288742b0cefc7bb024303b7429ef" }
151+
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "0aa6641051bd273fba2c8e2421381179f00b212d" }
152152
hex = "0.4"
153153
http = "1"
154154
humantime = "2.1"

src/api/src/helper.rs

Lines changed: 33 additions & 151 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,7 @@ use datatypes::prelude::{ConcreteDataType, ValueRef};
2424
use datatypes::types::{
2525
IntervalType, JsonFormat, StructField, StructType, TimeType, TimestampType,
2626
};
27-
use datatypes::value::{
28-
ListValue, ListValueRef, OrderedF32, OrderedF64, StructValue, StructValueRef, Value,
29-
};
27+
use datatypes::value::{ListValueRef, OrderedF32, OrderedF64, StructValueRef, Value};
3028
use datatypes::vectors::VectorRef;
3129
use greptime_proto::v1::column_data_type_extension::TypeExt;
3230
use greptime_proto::v1::ddl_request::Expr;
@@ -762,18 +760,7 @@ pub fn pb_value_to_value_ref<'a>(
762760
}
763761

764762
ValueData::StructValue(struct_value) => {
765-
let struct_datatype_ext = datatype_ext
766-
.as_ref()
767-
.and_then(|ext| {
768-
if let Some(TypeExt::StructType(s)) = &ext.type_ext {
769-
Some(s)
770-
} else {
771-
None
772-
}
773-
})
774-
.expect("struct must contain datatype ext");
775-
776-
let struct_fields = struct_datatype_ext
763+
let struct_fields = struct_value
777764
.fields
778765
.iter()
779766
.map(|field| {
@@ -789,7 +776,7 @@ pub fn pb_value_to_value_ref<'a>(
789776
let items = struct_value
790777
.items
791778
.iter()
792-
.zip(struct_datatype_ext.fields.iter())
779+
.zip(struct_value.fields.iter())
793780
.map(|(item, field)| pb_value_to_value_ref(item, field.datatype_extension.as_ref()))
794781
.collect::<Vec<ValueRef>>();
795782

@@ -834,132 +821,11 @@ pub fn is_column_type_value_eq(
834821
ColumnDataTypeWrapper::try_new(type_value, type_extension)
835822
.map(|wrapper| {
836823
let datatype = ConcreteDataType::from(wrapper);
837-
expect_type == &datatype
824+
expect_type.is_json() && datatype.is_json() || &datatype == expect_type
838825
})
839826
.unwrap_or(false)
840827
}
841828

842-
/// Convert value into proto's value.
843-
pub fn to_proto_value(value: Value) -> v1::Value {
844-
match value {
845-
Value::Null => v1::Value { value_data: None },
846-
Value::Boolean(v) => v1::Value {
847-
value_data: Some(ValueData::BoolValue(v)),
848-
},
849-
Value::UInt8(v) => v1::Value {
850-
value_data: Some(ValueData::U8Value(v.into())),
851-
},
852-
Value::UInt16(v) => v1::Value {
853-
value_data: Some(ValueData::U16Value(v.into())),
854-
},
855-
Value::UInt32(v) => v1::Value {
856-
value_data: Some(ValueData::U32Value(v)),
857-
},
858-
Value::UInt64(v) => v1::Value {
859-
value_data: Some(ValueData::U64Value(v)),
860-
},
861-
Value::Int8(v) => v1::Value {
862-
value_data: Some(ValueData::I8Value(v.into())),
863-
},
864-
Value::Int16(v) => v1::Value {
865-
value_data: Some(ValueData::I16Value(v.into())),
866-
},
867-
Value::Int32(v) => v1::Value {
868-
value_data: Some(ValueData::I32Value(v)),
869-
},
870-
Value::Int64(v) => v1::Value {
871-
value_data: Some(ValueData::I64Value(v)),
872-
},
873-
Value::Float32(v) => v1::Value {
874-
value_data: Some(ValueData::F32Value(*v)),
875-
},
876-
Value::Float64(v) => v1::Value {
877-
value_data: Some(ValueData::F64Value(*v)),
878-
},
879-
Value::String(v) => v1::Value {
880-
value_data: Some(ValueData::StringValue(v.as_utf8().to_string())),
881-
},
882-
Value::Binary(v) => v1::Value {
883-
value_data: Some(ValueData::BinaryValue(v.to_vec())),
884-
},
885-
Value::Date(v) => v1::Value {
886-
value_data: Some(ValueData::DateValue(v.val())),
887-
},
888-
Value::Timestamp(v) => match v.unit() {
889-
TimeUnit::Second => v1::Value {
890-
value_data: Some(ValueData::TimestampSecondValue(v.value())),
891-
},
892-
TimeUnit::Millisecond => v1::Value {
893-
value_data: Some(ValueData::TimestampMillisecondValue(v.value())),
894-
},
895-
TimeUnit::Microsecond => v1::Value {
896-
value_data: Some(ValueData::TimestampMicrosecondValue(v.value())),
897-
},
898-
TimeUnit::Nanosecond => v1::Value {
899-
value_data: Some(ValueData::TimestampNanosecondValue(v.value())),
900-
},
901-
},
902-
Value::Time(v) => match v.unit() {
903-
TimeUnit::Second => v1::Value {
904-
value_data: Some(ValueData::TimeSecondValue(v.value())),
905-
},
906-
TimeUnit::Millisecond => v1::Value {
907-
value_data: Some(ValueData::TimeMillisecondValue(v.value())),
908-
},
909-
TimeUnit::Microsecond => v1::Value {
910-
value_data: Some(ValueData::TimeMicrosecondValue(v.value())),
911-
},
912-
TimeUnit::Nanosecond => v1::Value {
913-
value_data: Some(ValueData::TimeNanosecondValue(v.value())),
914-
},
915-
},
916-
Value::IntervalYearMonth(v) => v1::Value {
917-
value_data: Some(ValueData::IntervalYearMonthValue(v.to_i32())),
918-
},
919-
Value::IntervalDayTime(v) => v1::Value {
920-
value_data: Some(ValueData::IntervalDayTimeValue(v.to_i64())),
921-
},
922-
Value::IntervalMonthDayNano(v) => v1::Value {
923-
value_data: Some(ValueData::IntervalMonthDayNanoValue(
924-
convert_month_day_nano_to_pb(v),
925-
)),
926-
},
927-
Value::Decimal128(v) => v1::Value {
928-
value_data: Some(ValueData::Decimal128Value(convert_to_pb_decimal128(v))),
929-
},
930-
Value::List(list_value) => v1::Value {
931-
value_data: Some(ValueData::ListValue(v1::ListValue {
932-
items: convert_list_to_pb_values(list_value),
933-
})),
934-
},
935-
Value::Struct(struct_value) => v1::Value {
936-
value_data: Some(ValueData::StructValue(v1::StructValue {
937-
items: convert_struct_to_pb_values(struct_value),
938-
})),
939-
},
940-
Value::Json(v) => v1::Value {
941-
value_data: Some(ValueData::JsonValue(Box::new(to_proto_value(*v)))),
942-
},
943-
Value::Duration(_) => v1::Value { value_data: None },
944-
}
945-
}
946-
947-
fn convert_list_to_pb_values(list_value: ListValue) -> Vec<v1::Value> {
948-
list_value
949-
.take_items()
950-
.into_iter()
951-
.map(to_proto_value)
952-
.collect()
953-
}
954-
955-
fn convert_struct_to_pb_values(struct_value: StructValue) -> Vec<v1::Value> {
956-
struct_value
957-
.take_items()
958-
.into_iter()
959-
.map(to_proto_value)
960-
.collect()
961-
}
962-
963829
/// Returns the [ColumnDataTypeWrapper] of the value.
964830
///
965831
/// If value is null, returns `None`.
@@ -1002,19 +868,19 @@ pub fn proto_value_type(value: &v1::Value) -> Option<ColumnDataType> {
1002868
pub fn vectors_to_rows<'a>(
1003869
columns: impl Iterator<Item = &'a VectorRef>,
1004870
row_count: usize,
1005-
) -> Vec<Row> {
871+
) -> Result<Vec<Row>> {
1006872
let mut rows = vec![Row { values: vec![] }; row_count];
1007873
for column in columns {
1008874
for (row_index, row) in rows.iter_mut().enumerate() {
1009-
row.values.push(value_to_grpc_value(column.get(row_index)))
875+
row.values.push(value_to_grpc_value(column.get(row_index))?)
1010876
}
1011877
}
1012878

1013-
rows
879+
Ok(rows)
1014880
}
1015881

1016-
pub fn value_to_grpc_value(value: Value) -> GrpcValue {
1017-
GrpcValue {
882+
pub fn value_to_grpc_value(value: Value) -> Result<GrpcValue> {
883+
Ok(GrpcValue {
1018884
value_data: match value {
1019885
Value::Null => None,
1020886
Value::Boolean(v) => Some(ValueData::BoolValue(v)),
@@ -1054,23 +920,38 @@ pub fn value_to_grpc_value(value: Value) -> GrpcValue {
1054920
.take_items()
1055921
.into_iter()
1056922
.map(value_to_grpc_value)
1057-
.collect();
923+
.collect::<Result<Vec<GrpcValue>>>()?;
1058924
Some(ValueData::ListValue(v1::ListValue { items }))
1059925
}
1060926
Value::Struct(struct_value) => {
927+
let fields = struct_value.struct_type().fields();
928+
let fields = fields
929+
.iter()
930+
.map(|f| {
931+
let (datatype, datatype_extension) =
932+
ColumnDataTypeWrapper::try_from(f.data_type().clone())
933+
.map(|x| x.to_parts())?;
934+
Ok(v1::StructField {
935+
name: f.name().to_string(),
936+
datatype: datatype as i32,
937+
datatype_extension,
938+
})
939+
})
940+
.collect::<Result<Vec<_>>>()?;
941+
1061942
let items = struct_value
1062943
.take_items()
1063944
.into_iter()
1064945
.map(value_to_grpc_value)
1065-
.collect();
1066-
Some(ValueData::StructValue(v1::StructValue { items }))
946+
.collect::<Result<Vec<_>>>()?;
947+
Some(ValueData::StructValue(v1::StructValue { fields, items }))
1067948
}
1068949
Value::Json(inner_value) => Some(ValueData::JsonValue(Box::new(value_to_grpc_value(
1069950
*inner_value,
1070-
)))),
951+
)?))),
1071952
Value::Duration(_) => unreachable!(),
1072953
},
1073-
}
954+
})
1074955
}
1075956

1076957
pub fn from_pb_time_unit(unit: v1::TimeUnit) -> TimeUnit {
@@ -1163,6 +1044,7 @@ mod tests {
11631044
use common_time::interval::IntervalUnit;
11641045
use datatypes::scalars::ScalarVector;
11651046
use datatypes::types::{Int8Type, Int32Type, UInt8Type, UInt32Type};
1047+
use datatypes::value::{ListValue, StructValue};
11661048
use datatypes::vectors::{
11671049
BooleanVector, DateVector, Float32Vector, PrimitiveVector, StringVector,
11681050
};
@@ -1653,7 +1535,7 @@ mod tests {
16531535
Arc::new(string_vec),
16541536
];
16551537

1656-
let result = vectors_to_rows(vector_refs.iter(), 3);
1538+
let result = vectors_to_rows(vector_refs.iter(), 3).unwrap();
16571539

16581540
assert_eq!(result.len(), 3);
16591541

@@ -1740,7 +1622,7 @@ mod tests {
17401622
Arc::new(ConcreteDataType::boolean_datatype()),
17411623
));
17421624

1743-
let pb_value = to_proto_value(value);
1625+
let pb_value = value_to_grpc_value(value).unwrap();
17441626

17451627
match pb_value.value_data.unwrap() {
17461628
ValueData::ListValue(pb_list_value) => {
@@ -1769,7 +1651,7 @@ mod tests {
17691651
.unwrap(),
17701652
);
17711653

1772-
let pb_value = to_proto_value(value);
1654+
let pb_value = value_to_grpc_value(value).unwrap();
17731655

17741656
match pb_value.value_data.unwrap() {
17751657
ValueData::StructValue(pb_struct_value) => {

src/common/recordbatch/src/recordbatch.rs

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,8 @@ use datafusion::arrow::util::pretty::pretty_format_batches;
2020
use datafusion_common::arrow::array::ArrayRef;
2121
use datafusion_common::arrow::compute;
2222
use datafusion_common::arrow::datatypes::{DataType as ArrowDataType, SchemaRef as ArrowSchemaRef};
23-
use datatypes::arrow::array::RecordBatchOptions;
23+
use datatypes::arrow::array::{Array, RecordBatchOptions};
24+
use datatypes::arrow::datatypes::{Field, Schema};
2425
use datatypes::prelude::DataType;
2526
use datatypes::schema::SchemaRef;
2627
use datatypes::vectors::{Helper, VectorRef};
@@ -31,7 +32,7 @@ use snafu::{OptionExt, ResultExt, ensure};
3132
use crate::DfRecordBatch;
3233
use crate::error::{
3334
self, ArrowComputeSnafu, CastVectorSnafu, ColumnNotExistsSnafu, DataTypesSnafu,
34-
ProjectArrowRecordBatchSnafu, Result,
35+
ProjectArrowRecordBatchSnafu, Result, SchemaConversionSnafu,
3536
};
3637

3738
/// A two-dimensional batch of column-oriented data with a defined schema.
@@ -60,6 +61,8 @@ impl RecordBatch {
6061
// TODO(LFC): Remove the casting here once `Batch` is no longer used.
6162
let arrow_arrays = Self::cast_view_arrays(schema.arrow_schema(), arrow_arrays)?;
6263

64+
let schema = maybe_amend_with_struct_arrays(schema, &arrow_arrays)?;
65+
6366
let df_record_batch = DfRecordBatch::try_new(schema.arrow_schema().clone(), arrow_arrays)
6467
.context(error::NewDfRecordBatchSnafu)?;
6568

@@ -249,6 +252,33 @@ impl RecordBatch {
249252
}
250253
}
251254

255+
fn maybe_amend_with_struct_arrays(schema: SchemaRef, arrays: &[ArrayRef]) -> Result<SchemaRef> {
256+
if arrays
257+
.iter()
258+
.any(|x| matches!(x.data_type(), ArrowDataType::Struct(_)))
259+
{
260+
let schema = schema.arrow_schema();
261+
let mut fields = Vec::with_capacity(schema.fields().len());
262+
for (f, a) in schema.fields().iter().zip(arrays.iter()) {
263+
let field_type = f.data_type();
264+
let array_type = a.data_type();
265+
match (field_type, array_type) {
266+
(ArrowDataType::Struct(_), ArrowDataType::Struct(x)) => {
267+
let field = Field::new_struct(f.name(), x.clone(), f.is_nullable());
268+
fields.push(Arc::new(field));
269+
}
270+
_ => fields.push(f.clone()),
271+
}
272+
}
273+
let schema = Arc::new(Schema::new_with_metadata(fields, schema.metadata().clone()));
274+
let schema = schema.try_into().context(SchemaConversionSnafu)?;
275+
Ok(Arc::new(schema))
276+
} else {
277+
// Fast path: must not need to amend if there are no struct arrays.
278+
Ok(schema)
279+
}
280+
}
281+
252282
impl Serialize for RecordBatch {
253283
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
254284
where

src/common/sql/src/convert.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -228,13 +228,15 @@ pub fn sql_value_to_value(
228228
}
229229
}
230230

231-
if value.data_type() != *data_type {
231+
let value_datatype = value.data_type();
232+
// The datatype of json value is determined by its actual data, so we can't simply "cast" it here.
233+
if value_datatype.is_json() || value_datatype == *data_type {
234+
Ok(value)
235+
} else {
232236
datatypes::types::cast(value, data_type).with_context(|_| InvalidCastSnafu {
233237
sql_value: sql_val.clone(),
234238
datatype: data_type,
235239
})
236-
} else {
237-
Ok(value)
238240
}
239241
}
240242

src/datatypes/src/types.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ mod decimal_type;
2020
mod dictionary_type;
2121
mod duration_type;
2222
mod interval_type;
23-
mod json_type;
23+
pub(crate) mod json_type;
2424
mod list_type;
2525
mod null_type;
2626
mod primitive_type;

src/datatypes/src/types/json_type.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ use crate::vectors::json::builder::JsonVectorBuilder;
3535
use crate::vectors::{BinaryVectorBuilder, MutableVector};
3636

3737
pub const JSON_TYPE_NAME: &str = "Json";
38-
const JSON_PLAIN_FIELD_NAME: &str = "__plain__";
38+
const JSON_PLAIN_FIELD_NAME: &str = "__json_plain__";
3939
const JSON_PLAIN_FIELD_METADATA_KEY: &str = "is_plain_json";
4040

4141
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize, Default)]

0 commit comments

Comments
 (0)