Skip to content

Commit bca3401

Browse files
authored
fix: enhance AvroInputStreamImpl with improved buffer management and exception boundary handling (#143)
1 parent 1a1c34e commit bca3401

File tree

8 files changed

+92
-71
lines changed

8 files changed

+92
-71
lines changed

LICENSE

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -268,6 +268,7 @@ This product includes code from Apache Iceberg C++.
268268
* src/paimon/format/avro/avro_direct_decoder.h
269269
* src/paimon/format/avro/avro_direct_encoder.cpp
270270
* src/paimon/format/avro/avro_direct_encoder.h
271+
* Avro input stream in src/paimon/format/avro/avro_direct_decoder.cpp
271272

272273
Copyright: 2024-2025 The Apache Software Foundation.
273274
Home page: https://iceberg.apache.org/

src/paimon/core/utils/duration.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,9 @@ class Duration {
3232
}
3333

3434
uint64_t Reset() {
35-
uint64_t dura = Get();
35+
uint64_t duration = Get();
3636
start_ = std::chrono::high_resolution_clock::now();
37-
return dura;
37+
return duration;
3838
}
3939

4040
private:

src/paimon/format/avro/avro_direct_encoder.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -289,7 +289,7 @@ Status AvroDirectEncoder::EncodeArrowToAvro(const ::avro::NodePtr& avro_node,
289289
element_node->leaves() != 2)) {
290290
return Status::Invalid(
291291
fmt::format("Expected AVRO_RECORD for map key-value pair, got {}",
292-
AvroUtils::ToString(avro_node)));
292+
AvroUtils::ToString(element_node)));
293293
}
294294

295295
const auto& map_array =

src/paimon/format/avro/avro_input_stream_impl.cpp

Lines changed: 60 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,13 @@
1414
* limitations under the License.
1515
*/
1616

17+
// Adapted from Apache Iceberg C++
18+
// https://github.com/apache/iceberg-cpp/blob/main/src/iceberg/avro/avro_stream_internal.cc
19+
1720
#include "paimon/format/avro/avro_input_stream_impl.h"
1821

1922
#include <algorithm>
23+
#include <memory>
2024
#include <string>
2125
#include <utility>
2226

@@ -39,82 +43,85 @@ AvroInputStreamImpl::AvroInputStreamImpl(const std::shared_ptr<paimon::InputStre
3943
size_t buffer_size, const uint64_t total_length,
4044
const std::shared_ptr<MemoryPool>& pool)
4145
: pool_(pool),
46+
in_(input_stream),
4247
buffer_size_(buffer_size),
4348
total_length_(total_length),
44-
buffer_(reinterpret_cast<uint8_t*>(pool_->Malloc(buffer_size))),
45-
in_(input_stream),
46-
byte_count_(0),
47-
next_(buffer_),
48-
available_(0) {}
49+
buffer_(reinterpret_cast<uint8_t*>(pool_->Malloc(buffer_size))) {}
4950

5051
AvroInputStreamImpl::~AvroInputStreamImpl() {
5152
pool_->Free(buffer_, buffer_size_);
5253
}
5354

54-
bool AvroInputStreamImpl::next(const uint8_t** data, size_t* size) {
55-
if (available_ == 0 && !fill()) {
56-
return false;
55+
bool AvroInputStreamImpl::next(const uint8_t** data, size_t* len) {
56+
// Return all unconsumed data in the buffer
57+
if (buffer_pos_ < available_bytes_) {
58+
*data = buffer_ + buffer_pos_;
59+
*len = available_bytes_ - buffer_pos_;
60+
byte_count_ += available_bytes_ - buffer_pos_;
61+
buffer_pos_ = available_bytes_;
62+
return true;
63+
}
64+
65+
// Read from the input stream when the buffer is empty
66+
uint64_t remaining = total_length_ - stream_pos_;
67+
if (remaining == 0) {
68+
return false; // eof
69+
}
70+
auto read_length =
71+
in_->Read(reinterpret_cast<char*>(buffer_), std::min(buffer_size_, remaining));
72+
if (!read_length.ok()) {
73+
throw ::avro::Exception("Read failed: {}", read_length.status().ToString());
5774
}
58-
*data = next_;
59-
*size = available_;
60-
next_ += available_;
61-
byte_count_ += available_;
62-
available_ = 0;
75+
available_bytes_ = read_length.value();
76+
stream_pos_ += available_bytes_;
77+
buffer_pos_ = 0;
78+
79+
// Return the whole buffer
80+
*data = buffer_;
81+
*len = available_bytes_;
82+
byte_count_ += available_bytes_;
83+
buffer_pos_ = available_bytes_;
84+
6385
return true;
6486
}
6587

6688
void AvroInputStreamImpl::backup(size_t len) {
67-
next_ -= len;
68-
available_ += len;
89+
if (len > buffer_pos_) {
90+
throw ::avro::Exception("Cannot backup {} bytes, only {} bytes available", len,
91+
buffer_pos_);
92+
}
93+
94+
buffer_pos_ -= len;
6995
byte_count_ -= len;
7096
}
7197

7298
void AvroInputStreamImpl::skip(size_t len) {
73-
while (len > 0) {
74-
if (available_ == 0) {
75-
auto s = in_->Seek(len, paimon::FS_SEEK_CUR);
76-
if (!s.ok()) {
77-
throw ::avro::Exception(s.ToString());
78-
}
79-
byte_count_ += len;
80-
total_read_len_ += len;
81-
return;
82-
}
83-
size_t n = std::min(available_, len);
84-
available_ -= n;
85-
next_ += n;
86-
len -= n;
87-
byte_count_ += n;
99+
// The range to skip is within the buffer
100+
if (buffer_pos_ + len <= available_bytes_) {
101+
buffer_pos_ += len;
102+
byte_count_ += len;
103+
return;
88104
}
105+
seek(byte_count_ + len);
89106
}
90107

91-
void AvroInputStreamImpl::seek(int64_t position) {
92-
auto s = in_->Seek(position - byte_count_ - available_, paimon::FS_SEEK_CUR);
93-
if (!s.ok()) {
94-
throw ::avro::Exception(s.ToString());
95-
}
96-
byte_count_ = position;
97-
total_read_len_ = position;
98-
available_ = 0;
108+
size_t AvroInputStreamImpl::byteCount() const {
109+
return byte_count_;
99110
}
100111

101-
bool AvroInputStreamImpl::fill() {
102-
if (static_cast<uint64_t>(total_read_len_) >= total_length_) {
103-
// eof
104-
return false;
105-
}
106-
Result<int32_t> actual_len = in_->Read(reinterpret_cast<char*>(buffer_),
107-
std::min(buffer_size_, total_length_ - total_read_len_));
108-
if (!actual_len.ok()) {
109-
throw ::avro::Exception(actual_len.status().ToString());
112+
void AvroInputStreamImpl::seek(int64_t position) {
113+
if (static_cast<uint64_t>(position) > total_length_) {
114+
throw ::avro::Exception("Cannot seek to {}, total length is {}", position, total_length_);
110115
}
111-
total_read_len_ += actual_len.value();
112-
if (actual_len.value() != 0) {
113-
next_ = buffer_;
114-
available_ = actual_len.value();
115-
return true;
116+
auto status = in_->Seek(position, SeekOrigin::FS_SEEK_SET);
117+
if (!status.ok()) {
118+
throw ::avro::Exception("Failed to seek to {}, got {}", position, status.ToString());
116119
}
117-
return false;
120+
121+
stream_pos_ = position;
122+
buffer_pos_ = 0;
123+
available_bytes_ = 0;
124+
byte_count_ = position;
118125
}
119126

120127
} // namespace paimon::avro

src/paimon/format/avro/avro_input_stream_impl.h

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -39,29 +39,26 @@ class AvroInputStreamImpl : public ::avro::SeekableInputStream {
3939

4040
~AvroInputStreamImpl() override;
4141

42-
bool next(const uint8_t** data, size_t* size) override;
42+
bool next(const uint8_t** data, size_t* len) override;
4343
void backup(size_t len) override;
4444
void skip(size_t len) override;
45-
size_t byteCount() const override {
46-
return byte_count_;
47-
}
45+
size_t byteCount() const override;
4846
void seek(int64_t position) override;
4947

5048
private:
5149
AvroInputStreamImpl(const std::shared_ptr<paimon::InputStream>& input_stream,
5250
size_t buffer_size, const uint64_t length,
5351
const std::shared_ptr<MemoryPool>& pool);
54-
bool fill();
5552

5653
std::shared_ptr<MemoryPool> pool_;
54+
std::shared_ptr<paimon::InputStream> in_;
5755
const size_t buffer_size_;
5856
const uint64_t total_length_;
5957
uint8_t* const buffer_;
60-
std::shared_ptr<paimon::InputStream> in_;
61-
size_t byte_count_;
62-
uint8_t* next_;
63-
size_t available_;
64-
int32_t total_read_len_ = 0;
58+
size_t byte_count_ = 0; // bytes position in the avro input stream
59+
size_t stream_pos_ = 0; // current position in the paimon input stream
60+
size_t buffer_pos_ = 0; // next position to read in the buffer
61+
size_t available_bytes_ = 0; // bytes available in the buffer
6562
};
6663

6764
} // namespace paimon::avro

src/paimon/format/avro/avro_input_stream_impl_test.cpp

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -105,9 +105,21 @@ TEST(AvroInputStreamImplTest, TestSkip) {
105105
ASSERT_TRUE(stream->next(&data, &size));
106106
ASSERT_EQ(size, 5);
107107
ASSERT_EQ(std::string(reinterpret_cast<const char*>(data), size), "fghij");
108-
stream->skip(5);
109-
ASSERT_EQ(stream->byteCount(), 15);
110-
ASSERT_FALSE(stream->next(&data, &size));
108+
ASSERT_THROW(stream->skip(5), ::avro::Exception); // already eof, cannot skip more
109+
ASSERT_EQ(stream->byteCount(), 10);
110+
ASSERT_FALSE(stream->next(&data, &size)); // reach eof
111+
112+
ASSERT_THROW(stream->backup(7), ::avro::Exception); // buffer item is 5, cannot backup 7
113+
stream->backup(4);
114+
ASSERT_EQ(stream->byteCount(), 6);
115+
stream->skip(2); // skip 2 bytes from the available buffer data
116+
ASSERT_EQ(stream->byteCount(), 8);
117+
118+
// verify we can read the remaining 2 bytes from buffer
119+
ASSERT_TRUE(stream->next(&data, &size));
120+
ASSERT_EQ(size, 2);
121+
ASSERT_EQ(std::string(reinterpret_cast<const char*>(data), size), "ij");
122+
ASSERT_EQ(stream->byteCount(), 10);
111123
}
112124

113125
TEST(AvroInputStreamImplTest, TestSkipWithAvailableData) {
@@ -169,6 +181,10 @@ TEST(AvroInputStreamImplTest, TestSeek) {
169181
ASSERT_EQ(size, 5);
170182
ASSERT_EQ(std::string(reinterpret_cast<const char*>(data), size), "abcde");
171183
stream->seek(2);
184+
ASSERT_EQ(stream->byteCount(), 2);
185+
186+
// after seek, buffer will be cleared, cannot backup
187+
ASSERT_THROW(stream->backup(2), ::avro::Exception);
172188
ASSERT_TRUE(stream->next(&data, &size));
173189
ASSERT_EQ(std::string(reinterpret_cast<const char*>(data), size), "cdefg");
174190
ASSERT_EQ(stream->byteCount(), 7);

src/paimon/format/lance/lance_format_reader_writer_test.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -387,7 +387,7 @@ TEST_F(LanceFileReaderWriterTest, TestReachTargetSize) {
387387
}
388388
ASSERT_OK(writer->Flush());
389389
ASSERT_OK(writer->Finish());
390-
// test reach targe size
390+
// test reach target size
391391
ASSERT_TRUE(reach_target_size);
392392
auto fs = std::make_shared<LocalFileSystem>();
393393
ASSERT_OK_AND_ASSIGN(auto file_status, fs->GetFileStatus(file_path));

src/paimon/global_index/lucene/lucene_global_index_test.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -306,7 +306,7 @@ TEST_P(LuceneGlobalIndexTest, TestSimpleChinese) {
306306
["最近开源了一个新项目叫qianwen(全角字符),功能类似之前的 Qianwen,是一个面向 AI 应用的智能助手。它不仅支持 Machine Learning 和 NLP 技术,还提供了可扩展的开发框架,便于开发者构建自己的智能助手系统。"],
307307
["我们在测试 qianwen-core v1.2 和 ai-engine-alpha 中的 bug,重点优化了 qianwen 的响应速度和稳定性。本次更新增强了核心模块的功能,提升了智能助手的开发效率,并修复了与 NLP 模块相关的多个问题。"],
308308
["AI 助手开发中常用的技术包括 Speech Recognition、Natural Language Processing 和 Recommendation System。我们使用 TensorFlow 和 PyTorch 构建模型,开发了多个智能助手原型,支持语音交互和上下文理解功能,是当前热门的人工智能发展应用方向。"],
309-
["新一代的 AI 助手代号为「千问」,内部命名为 QianwenX-2024,计划在 next quarter 发布。QianwenX 将集成更强的 multimodal 能力,支持图像和文本联合处理,进一步提升智能助手的理解能力和交互体验,是未来智能助手的重要发展方向。"]
309+
["新一代的 AI 助手代号为「千问」,内部命名为 QianwenX-2024,计划在 next quarter 发布。QianwenX 将集成更强的 multimodel 能力,支持图像和文本联合处理,进一步提升智能助手的理解能力和交互体验,是未来智能助手的重要发展方向。"]
310310
])")
311311
.ValueOrDie();
312312

0 commit comments

Comments
 (0)