Skip to content

Commit 55d5932

Browse files
authored
Fix/aligned tv page seal (#734)
* fix readme logo * fix readme logo * fix readme badge * tmp * add ut * mvn spotless:apply * tmp * try to fix ut * Align C++ aligned-model page sealing with the Java behavior and fix reader handling of null-only value pages so that Debug builds pass. * fix ut * Add strict_page_size switch to optimize aligned tablet writing. In non-strict mode, disable per-write auto page sealing and seal value pages at time-page boundaries to reduce overhead while preserving aligned page semantics. * fix QueryByRowFasterThanManualNext tolerance * fix time_page_row_ends.reserve
1 parent aa77798 commit 55d5932

16 files changed

+867
-45
lines changed

cpp/src/common/config/config.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,12 @@ typedef struct ConfigValue {
4646
TSEncoding double_encoding_type_;
4747
TSEncoding string_encoding_type_;
4848
CompressionType default_compression_type_;
49+
// When true, aligned writer enforces page size limit strictly by
50+
// interleaving time/value writes and sealing pages together when any side
51+
// becomes full.
52+
// When false, aligned writer may disable some page-size checks to improve
53+
// write performance.
54+
bool strict_page_size_ = true;
4955
} ConfigValue;
5056

5157
extern void init_config_value();
@@ -57,6 +63,7 @@ extern void set_config_value();
5763
extern void config_set_page_max_point_count(uint32_t page_max_point_count);
5864
extern void config_set_max_degree_of_index_node(
5965
uint32_t max_degree_of_index_node);
66+
extern void config_set_strict_page_size(bool strict_page_size);
6067

6168
} // namespace common
6269

cpp/src/common/global.cc

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,8 @@ void init_config_value() {
6060
#else
6161
g_config_value_.default_compression_type_ = UNCOMPRESSED;
6262
#endif
63+
// Enforce aligned page size limits strictly by default.
64+
g_config_value_.strict_page_size_ = true;
6365
}
6466

6567
extern TSEncoding get_value_encoder(TSDataType data_type) {
@@ -104,6 +106,10 @@ void config_set_max_degree_of_index_node(uint32_t max_degree_of_index_node) {
104106
g_config_value_.max_degree_of_index_node_ = max_degree_of_index_node;
105107
}
106108

109+
void config_set_strict_page_size(bool strict_page_size) {
110+
g_config_value_.strict_page_size_ = strict_page_size;
111+
}
112+
107113
void set_config_value() {}
108114
const char* s_data_type_names[8] = {"BOOLEAN", "INT32", "INT64", "FLOAT",
109115
"DOUBLE", "TEXT", "VECTOR", "STRING"};

cpp/src/reader/aligned_chunk_reader.cc

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -562,7 +562,6 @@ int AlignedChunkReader::decode_time_value_buf_into_tsblock(
562562
row_appender.append_null(1); \
563563
continue; \
564564
} \
565-
assert(value_decoder_->has_remaining(value_in)); \
566565
if (!value_decoder_->has_remaining(value_in)) { \
567566
return common::E_DATA_INCONSISTENCY; \
568567
} \
@@ -609,7 +608,6 @@ int AlignedChunkReader::i32_DECODE_TYPED_TV_INTO_TSBLOCK(
609608
row_appender.append_null(1);
610609
continue;
611610
}
612-
assert(value_decoder_->has_remaining(value_in));
613611
if (!value_decoder_->has_remaining(value_in)) {
614612
return common::E_DATA_INCONSISTENCY;
615613
}
@@ -695,7 +693,6 @@ int AlignedChunkReader::STRING_DECODE_TYPED_TV_INTO_TSBLOCK(
695693
}
696694

697695
if (should_read_data) {
698-
assert(value_decoder_->has_remaining(value_in));
699696
if (!value_decoder_->has_remaining(value_in)) {
700697
return E_DATA_INCONSISTENCY;
701698
}

cpp/src/reader/qds_without_timegenerator.cc

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -167,11 +167,14 @@ int QDSWithoutTimeGenerator::next(bool& has_next) {
167167

168168
uint32_t len = 0;
169169
uint32_t idx = heap_time_.begin()->second;
170+
bool is_null_val = false;
170171
auto val_datatype = value_iters_[idx]->get_data_type();
171-
void* val_ptr = value_iters_[idx]->read(&len);
172+
void* val_ptr = value_iters_[idx]->read(&len, &is_null_val);
172173
if (!skip_row) {
173-
row_record_->get_field(idx + 1)->set_value(val_datatype,
174-
val_ptr, len, pa_);
174+
if (!is_null_val) {
175+
row_record_->get_field(idx + 1)->set_value(
176+
val_datatype, val_ptr, len, pa_);
177+
}
175178
}
176179
value_iters_[idx]->next();
177180

@@ -219,10 +222,14 @@ int QDSWithoutTimeGenerator::next(bool& has_next) {
219222
std::multimap<int64_t, uint32_t>::iterator iter = heap_time_.find(time);
220223
for (uint32_t i = 0; i < count; ++i) {
221224
uint32_t len = 0;
225+
bool is_null_val = false;
222226
auto val_datatype = value_iters_[iter->second]->get_data_type();
223-
void* val_ptr = value_iters_[iter->second]->read(&len);
224-
row_record_->get_field(iter->second + 1)
225-
->set_value(val_datatype, val_ptr, len, pa_);
227+
void* val_ptr =
228+
value_iters_[iter->second]->read(&len, &is_null_val);
229+
if (!is_null_val) {
230+
row_record_->get_field(iter->second + 1)
231+
->set_value(val_datatype, val_ptr, len, pa_);
232+
}
226233
value_iters_[iter->second]->next();
227234
if (!time_iters_[iter->second]->end()) {
228235
int64_t timev =

cpp/src/writer/time_chunk_writer.cc

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,9 @@ int TimeChunkWriter::end_encode_chunk() {
173173
chunk_header_.data_size_ = chunk_data_.total_size();
174174
chunk_header_.num_of_pages_ = num_of_pages_;
175175
}
176+
} else if (num_of_pages_ > 0) {
177+
chunk_header_.data_size_ = chunk_data_.total_size();
178+
chunk_header_.num_of_pages_ = num_of_pages_;
176179
}
177180
#if DEBUG_SE
178181
std::cout << "end_encode_time_chunk: num_of_pages_=" << num_of_pages_

cpp/src/writer/time_chunk_writer.h

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,8 @@ class TimeChunkWriter {
4242
first_page_data_(),
4343
first_page_statistic_(nullptr),
4444
chunk_header_(),
45-
num_of_pages_(0) {}
45+
num_of_pages_(0),
46+
enable_page_seal_if_full_(true) {}
4647
~TimeChunkWriter() { destroy(); }
4748
int init(const common::ColumnSchema& col_schema);
4849
int init(const std::string& measurement_name, common::TSEncoding encoding,
@@ -57,8 +58,12 @@ class TimeChunkWriter {
5758
if (RET_FAIL(time_page_writer_.write(timestamp))) {
5859
return ret;
5960
}
60-
if (RET_FAIL(seal_cur_page_if_full())) {
61+
if (UNLIKELY(!enable_page_seal_if_full_)) {
6162
return ret;
63+
} else {
64+
if (RET_FAIL(seal_cur_page_if_full())) {
65+
return ret;
66+
}
6267
}
6368
return ret;
6469
}
@@ -68,10 +73,33 @@ class TimeChunkWriter {
6873
Statistic* get_chunk_statistic() { return chunk_statistic_; }
6974
FORCE_INLINE int32_t num_of_pages() const { return num_of_pages_; }
7075

76+
// Current (unsealed) page point count.
77+
FORCE_INLINE uint32_t get_point_numer() const {
78+
return time_page_writer_.get_point_numer();
79+
}
80+
7181
int64_t estimate_max_series_mem_size();
7282

7383
bool hasData();
7484

85+
/** True if the current (unsealed) page has at least one point. */
86+
bool has_current_page_data() const {
87+
return time_page_writer_.get_point_numer() > 0;
88+
}
89+
90+
/**
91+
* Force seal the current page (for aligned model: when any aligned page
92+
* seals due to memory/point threshold, all pages must seal together).
93+
* @return E_OK on success.
94+
*/
95+
int seal_current_page() { return seal_cur_page(false); }
96+
97+
// For aligned writer: allow disabling the automatic page-size/point-number
98+
// check so the caller can seal pages at chosen boundaries.
99+
FORCE_INLINE void set_enable_page_seal_if_full(bool enable) {
100+
enable_page_seal_if_full_ = enable;
101+
}
102+
75103
private:
76104
FORCE_INLINE bool is_cur_page_full() const {
77105
// FIXME
@@ -110,6 +138,8 @@ class TimeChunkWriter {
110138

111139
ChunkHeader chunk_header_;
112140
int32_t num_of_pages_;
141+
// If false, write() won't auto-seal when the current page becomes full.
142+
bool enable_page_seal_if_full_;
113143
};
114144

115145
} // end namespace storage

0 commit comments

Comments
 (0)