Skip to content

Commit 83fc358

Browse files
authored
[YQ-4797] Watermark: RD: reduce CPU consumption, fix (#27563)
1 parent 2284193 commit 83fc358

File tree

2 files changed

+23
-19
lines changed

2 files changed

+23
-19
lines changed

ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.cpp

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -251,17 +251,17 @@ class TTopicFormatHandler : public NActors::TActor<TTopicFormatHandler>, public
251251
Y_ENSURE(false, "Expected embedded or list from purecalc");
252252
}
253253

254-
const auto offset = Self.Offsets->at(rowId);
255-
if (const auto nextOffset = Client->GetNextMessageOffset(); nextOffset && offset < *nextOffset) {
256-
LOG_ROW_DISPATCHER_TRACE("OnData, skip historical offset: " << offset << ", next message offset: " << *nextOffset);
254+
Offset = Self.Offsets->at(rowId);
255+
if (const auto nextOffset = Client->GetNextMessageOffset(); nextOffset && Offset < *nextOffset) {
256+
LOG_ROW_DISPATCHER_TRACE("OnData, skip historical offset: " << Offset << ", next message offset: " << *nextOffset);
257257
return;
258258
}
259259

260-
FilteredOffsets.push_back(offset);
261-
262260
auto newNumberRows = NumberRows;
263261
auto newDataPackerSize = DataPackerSize;
264262
if (filter) {
263+
FilteredOffsets.push_back(Offset);
264+
265265
Y_DEFER {
266266
// Values allocated on parser allocator and should be released
267267
FilteredRow.assign(Columns.size(), NYql::NUdf::TUnboxedValue());
@@ -280,7 +280,7 @@ class TTopicFormatHandler : public NActors::TActor<TTopicFormatHandler>, public
280280
newDataPackerSize = DataPacker->PackedSizeEstimate();
281281
}
282282

283-
OnWatermark(offset, maybeWatermark);
283+
OnWatermark(Offset, maybeWatermark);
284284

285285
const auto numberRows = newNumberRows - NumberRows;
286286
const auto rowSize = newDataPackerSize - DataPackerSize;
@@ -289,9 +289,9 @@ class TTopicFormatHandler : public NActors::TActor<TTopicFormatHandler>, public
289289
return;
290290
}
291291

292-
LOG_ROW_DISPATCHER_TRACE("OnBatchFinish, offset: " << offset << ", number rows: " << numberRows << ", row size: " << rowSize << ", watermark: " << Watermark);
292+
LOG_ROW_DISPATCHER_TRACE("OnBatchFinish, offset: " << Offset << ", number rows: " << numberRows << ", row size: " << rowSize << ", watermark: " << Watermark);
293293

294-
Client->AddDataToClient(offset, numberRows, rowSize, Watermark);
294+
Client->AddDataToClient(Offset, numberRows, rowSize, Watermark);
295295

296296
NumberRows = newNumberRows;
297297
DataPackerSize = newDataPackerSize;
@@ -322,6 +322,9 @@ class TTopicFormatHandler : public NActors::TActor<TTopicFormatHandler>, public
322322
void FinishPacking() {
323323
if (!DataPacker->IsEmpty() || !Watermark.Empty()) {
324324
LOG_ROW_DISPATCHER_TRACE("FinishPacking, batch size: " << DataPackerSize << ", number rows: " << FilteredOffsets.size());
325+
if (FilteredOffsets.empty()) {
326+
FilteredOffsets.push_back(Offset);
327+
}
325328
ClientData.emplace(NYql::MakeReadOnlyRope(DataPacker->Finish()), std::move(FilteredOffsets), Watermark);
326329
NumberRows = 0;
327330
DataPackerSize = 0;
@@ -340,6 +343,7 @@ class TTopicFormatHandler : public NActors::TActor<TTopicFormatHandler>, public
340343
bool ClientStarted = false;
341344

342345
// Filtered data
346+
ui64 Offset;
343347
ui64 NumberRows = 0;
344348
ui64 DataPackerSize = 0;
345349
TVector<NYql::NUdf::TUnboxedValue> FilteredRow; // Temporary value holder for DataPacket

ydb/core/fq/libs/row_dispatcher/format_handler/ut/format_handler_ut.cpp

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -342,8 +342,8 @@ Y_UNIT_TEST_SUITE(TestFormatHandler) {
342342
));
343343

344344
messages = TVector<TMessages>{
345-
{{firstOffset + 0, firstOffset + 1}, {}, TBatch().AddRow(TRow().AddString("event1").AddString("str_first__large__"))},
346-
{{firstOffset + 2, firstOffset + 3}, {}, TBatch().AddRow(TRow().AddString("event3").AddString("str_first__large__"))},
345+
{{firstOffset + 1}, {}, TBatch().AddRow(TRow().AddString("event1").AddString("str_first__large__"))},
346+
{{firstOffset + 3}, {}, TBatch().AddRow(TRow().AddString("event3").AddString("str_first__large__"))},
347347
};
348348
CheckSuccess(MakeClient(
349349
{commonColumn, {"column_0", "[DataType; String]"}},
@@ -354,7 +354,7 @@ Y_UNIT_TEST_SUITE(TestFormatHandler) {
354354
));
355355

356356
messages = TVector<TMessages>{
357-
{{firstOffset + 0, firstOffset + 1}, {}, TBatch().AddRow(TRow().AddString("event0").AddString("str_second"))},
357+
{{firstOffset + 0}, {}, TBatch().AddRow(TRow().AddString("event0").AddString("str_second"))},
358358
};
359359
CheckSuccess(MakeClient(
360360
{commonColumn, {"column_1", "[DataType; String]"}},
@@ -656,13 +656,13 @@ Y_UNIT_TEST_SUITE(TestFormatHandler) {
656656

657657
auto messages = TVector<TMessages>{
658658
{
659-
{firstOffset + 2, firstOffset + 3},
659+
{firstOffset + 2},
660660
TInstant::Seconds(40),
661661
TBatch()
662662
.AddRow(TRow().AddString("1970-01-01T00:00:44Z").AddUint64(1))
663663
},
664664
{
665-
{firstOffset + 4, firstOffset + 5},
665+
{firstOffset + 4},
666666
TInstant::Seconds(42),
667667
TBatch()
668668
.AddRow(TRow().AddString("1970-01-01T00:00:46Z").AddUint64(1))
@@ -683,13 +683,13 @@ Y_UNIT_TEST_SUITE(TestFormatHandler) {
683683

684684
messages = TVector<TMessages>{
685685
{
686-
{firstOffset + 4, firstOffset + 5},
686+
{firstOffset + 4},
687687
TInstant::Seconds(42),
688688
TBatch()
689689
.AddRow(TRow().AddString("1970-01-01T00:00:46Z").AddUint64(1))
690690
},
691691
{
692-
{firstOffset + 6, firstOffset + 7},
692+
{firstOffset + 6},
693693
TInstant::Seconds(44),
694694
TBatch()
695695
.AddRow(TRow().AddString("1970-01-01T00:00:48Z").AddUint64(1))
@@ -733,12 +733,12 @@ Y_UNIT_TEST_SUITE(TestFormatHandler) {
733733

734734
auto messages = TVector<TMessages>{
735735
{
736-
{firstOffset + 2, firstOffset + 3},
736+
{firstOffset + 3},
737737
TInstant::Seconds(40),
738738
TBatch()
739739
},
740740
{
741-
{firstOffset + 4, firstOffset + 5},
741+
{firstOffset + 5},
742742
TInstant::Seconds(42),
743743
TBatch()
744744
},
@@ -758,12 +758,12 @@ Y_UNIT_TEST_SUITE(TestFormatHandler) {
758758

759759
messages = TVector<TMessages>{
760760
{
761-
{firstOffset + 4, firstOffset + 5},
761+
{firstOffset + 5},
762762
TInstant::Seconds(42),
763763
TBatch()
764764
},
765765
{
766-
{firstOffset + 6, firstOffset + 7},
766+
{firstOffset + 7},
767767
TInstant::Seconds(44),
768768
TBatch()
769769
},

0 commit comments

Comments
 (0)