Skip to content

Commit f39c679

Browse files
refactor: remove Vectors from RecordBatch completely
Signed-off-by: luofucong <[email protected]>
1 parent 605f327 commit f39c679

File tree

41 files changed

+762
-922
lines changed

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

41 files changed

+762
-922
lines changed

src/client/src/database.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -435,10 +435,10 @@ impl Database {
435435
.context(ExternalSnafu)?;
436436
match flight_message {
437437
FlightMessage::RecordBatch(arrow_batch) => {
438-
yield RecordBatch::try_from_df_record_batch(
438+
yield Ok(RecordBatch::from_df_record_batch(
439439
schema_cloned.clone(),
440440
arrow_batch,
441-
)
441+
))
442442
}
443443
FlightMessage::Metrics(_) => {}
444444
FlightMessage::AffectedRows(_) | FlightMessage::Schema(_) => {

src/client/src/region.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -182,10 +182,8 @@ impl RegionRequester {
182182

183183
match flight_message {
184184
FlightMessage::RecordBatch(record_batch) => {
185-
let result_to_yield = RecordBatch::try_from_df_record_batch(
186-
schema_cloned.clone(),
187-
record_batch,
188-
);
185+
let result_to_yield =
186+
RecordBatch::from_df_record_batch(schema_cloned.clone(), record_batch);
189187

190188
// get the next message from the stream. normally it should be a metrics message.
191189
if let Some(next_flight_message_result) = flight_message_stream.next().await
@@ -219,7 +217,7 @@ impl RegionRequester {
219217
stream_ended = true;
220218
}
221219

222-
yield result_to_yield;
220+
yield Ok(result_to_yield);
223221
}
224222
FlightMessage::Metrics(s) => {
225223
// just a branch in case of some metrics message comes after other things.

src/common/query/src/error.rs

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -52,9 +52,6 @@ pub enum Error {
5252
data_type: ArrowDatatype,
5353
},
5454

55-
#[snafu(display("Failed to downcast vector: {}", err_msg))]
56-
DowncastVector { err_msg: String },
57-
5855
#[snafu(display("Invalid input type: {}", err_msg))]
5956
InvalidInputType {
6057
#[snafu(implicit)]
@@ -209,8 +206,7 @@ pub type Result<T> = std::result::Result<T, Error>;
209206
impl ErrorExt for Error {
210207
fn status_code(&self) -> StatusCode {
211208
match self {
212-
Error::DowncastVector { .. }
213-
| Error::InvalidInputState { .. }
209+
Error::InvalidInputState { .. }
214210
| Error::ToScalarValue { .. }
215211
| Error::GetScalarVector { .. }
216212
| Error::ArrowCompute { .. }

src/common/recordbatch/src/adapter.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -314,10 +314,10 @@ impl Stream for RecordBatchStreamAdapter {
314314
metric_collector.record_batch_metrics,
315315
);
316316
}
317-
Poll::Ready(Some(RecordBatch::try_from_df_record_batch(
317+
Poll::Ready(Some(Ok(RecordBatch::from_df_record_batch(
318318
self.schema(),
319319
df_record_batch,
320-
)))
320+
))))
321321
}
322322
Poll::Ready(None) => {
323323
if let Metrics::Unresolved(df_plan) | Metrics::PartialResolved(df_plan, _) =

src/common/recordbatch/src/error.rs

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -133,18 +133,6 @@ pub enum Error {
133133
source: datatypes::error::Error,
134134
},
135135

136-
#[snafu(display(
137-
"Failed to downcast vector of type '{:?}' to type '{:?}'",
138-
from_type,
139-
to_type
140-
))]
141-
DowncastVector {
142-
from_type: ConcreteDataType,
143-
to_type: ConcreteDataType,
144-
#[snafu(implicit)]
145-
location: Location,
146-
},
147-
148136
#[snafu(display("Error occurs when performing arrow computation"))]
149137
ArrowCompute {
150138
#[snafu(source)]
@@ -217,8 +205,6 @@ impl ErrorExt for Error {
217205
| Error::PhysicalExpr { .. }
218206
| Error::RecordBatchSliceIndexOverflow { .. } => StatusCode::Internal,
219207

220-
Error::DowncastVector { .. } => StatusCode::Unexpected,
221-
222208
Error::PollStream { .. } => StatusCode::EngineExecuteQuery,
223209

224210
Error::ArrowCompute { .. } => StatusCode::IllegalState,

src/common/recordbatch/src/lib.rs

Lines changed: 21 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -30,19 +30,20 @@ use adapter::RecordBatchMetrics;
3030
use arc_swap::ArcSwapOption;
3131
use common_base::readable_size::ReadableSize;
3232
pub use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
33+
use datatypes::arrow::array::{ArrayRef, AsArray, StringBuilder};
3334
use datatypes::arrow::compute::SortOptions;
3435
pub use datatypes::arrow::record_batch::RecordBatch as DfRecordBatch;
3536
use datatypes::arrow::util::pretty;
3637
use datatypes::prelude::{ConcreteDataType, VectorRef};
37-
use datatypes::scalars::{ScalarVector, ScalarVectorBuilder};
3838
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
3939
use datatypes::types::{JsonFormat, jsonb_to_string};
40-
use datatypes::vectors::{BinaryVector, StringVectorBuilder};
4140
use error::Result;
4241
use futures::task::{Context, Poll};
4342
use futures::{Stream, TryStreamExt};
4443
pub use recordbatch::RecordBatch;
45-
use snafu::{OptionExt, ResultExt, ensure};
44+
use snafu::{ResultExt, ensure};
45+
46+
use crate::error::NewDfRecordBatchSnafu;
4647

4748
pub trait RecordBatchStream: Stream<Item = Result<RecordBatch>> {
4849
fn name(&self) -> &str {
@@ -92,32 +93,26 @@ pub fn map_json_type_to_string(
9293
mapped_schema: &SchemaRef,
9394
) -> Result<RecordBatch> {
9495
let mut vectors = Vec::with_capacity(original_schema.column_schemas().len());
95-
for (vector, schema) in batch.columns.iter().zip(original_schema.column_schemas()) {
96+
for (vector, schema) in batch.columns().iter().zip(original_schema.column_schemas()) {
9697
if let ConcreteDataType::Json(j) = &schema.data_type {
9798
if matches!(&j.format, JsonFormat::Jsonb) {
98-
let mut string_vector_builder = StringVectorBuilder::with_capacity(vector.len());
99-
let binary_vector = vector
100-
.as_any()
101-
.downcast_ref::<BinaryVector>()
102-
.with_context(|| error::DowncastVectorSnafu {
103-
from_type: schema.data_type.clone(),
104-
to_type: ConcreteDataType::binary_datatype(),
105-
})?;
106-
for value in binary_vector.iter_data() {
99+
let mut string_vector_builder = StringBuilder::new();
100+
let binary_vector = vector.as_binary::<i32>();
101+
for value in binary_vector.iter() {
107102
let Some(value) = value else {
108-
string_vector_builder.push(None);
103+
string_vector_builder.append_null();
109104
continue;
110105
};
111106
let string_value =
112107
jsonb_to_string(value).with_context(|_| error::CastVectorSnafu {
113108
from_type: schema.data_type.clone(),
114109
to_type: ConcreteDataType::string_datatype(),
115110
})?;
116-
string_vector_builder.push(Some(string_value.as_str()));
111+
string_vector_builder.append_value(string_value);
117112
}
118113

119114
let string_vector = string_vector_builder.finish();
120-
vectors.push(Arc::new(string_vector) as VectorRef);
115+
vectors.push(Arc::new(string_vector) as ArrayRef);
121116
} else {
122117
vectors.push(vector.clone());
123118
}
@@ -126,7 +121,15 @@ pub fn map_json_type_to_string(
126121
}
127122
}
128123

129-
RecordBatch::new(mapped_schema.clone(), vectors)
124+
let record_batch = datatypes::arrow::record_batch::RecordBatch::try_new(
125+
mapped_schema.arrow_schema().clone(),
126+
vectors,
127+
)
128+
.context(NewDfRecordBatchSnafu)?;
129+
Ok(RecordBatch::from_df_record_batch(
130+
mapped_schema.clone(),
131+
record_batch,
132+
))
130133
}
131134

132135
/// Maps the json type to string in the schema.
@@ -758,7 +761,7 @@ impl Stream for MemoryTrackedStream {
758761
let additional = batch
759762
.columns()
760763
.iter()
761-
.map(|c| c.memory_size())
764+
.map(|c| c.get_array_memory_size())
762765
.sum::<usize>();
763766

764767
if let Err(e) = self.permit.track(additional, self.total_tracked) {

0 commit comments

Comments
 (0)