Skip to content

Commit 0926de1

Browse files
committed
Precharge in EvWrite
1 parent 52cd0e1 commit 0926de1

File tree

4 files changed

+69
-2
lines changed

4 files changed

+69
-2
lines changed

ydb/core/kqp/ut/query/kqp_query_ut.cpp

+30
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
#include <fmt/format.h>
12
#include <ydb/core/kqp/ut/common/kqp_ut_common.h>
23

34
#include <ydb/core/tx/datashard/datashard_failpoints.h>
@@ -2634,6 +2635,35 @@ Y_UNIT_TEST_SUITE(KqpQuery) {
26342635
}
26352636
}
26362637

2638+
Y_UNIT_TEST(ExecuteWriteQuery) {
2639+
using namespace fmt::literals;
2640+
2641+
TKikimrRunner kikimr;
2642+
auto client = kikimr.GetQueryClient();
2643+
2644+
{ // Just generate table
2645+
const auto sql = fmt::format(R"(
2646+
CREATE TABLE test_table (
2647+
PRIMARY KEY (id)
2648+
) AS SELECT
2649+
ROW_NUMBER() OVER w AS id, data
2650+
FROM
2651+
AS_TABLE(ListReplicate(<|data: '{data}'|>, 500000))
2652+
WINDOW
2653+
w AS (ORDER BY data))",
2654+
"data"_a = std::string(137, 'a')
2655+
);
2656+
const auto result = client.ExecuteQuery(sql, NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
2657+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
2658+
}
2659+
2660+
Cerr << TInstant::Now() << " --------------- Start update ---------------\n";
2661+
2662+
const auto hangingResult = client.ExecuteQuery(R"(
2663+
UPDATE test_table SET data = "a"
2664+
)", NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
2665+
UNIT_ASSERT_VALUES_EQUAL_C(hangingResult.GetStatus(), EStatus::SUCCESS, hangingResult.GetIssues().ToString());
2666+
}
26372667
}
26382668

26392669
} // namespace NKqp

ydb/core/tx/datashard/datashard_user_db.cpp

+10
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,16 @@ void TDataShardUserDb::EraseRow(
178178
Counters.EraseRowBytes += keyBytes + 8;
179179
}
180180

181+
bool TDataShardUserDb::PrechargeRow(
182+
const TTableId& tableId,
183+
const TArrayRef<const TRawTypeValue> key)
184+
{
185+
auto localTableId = Self.GetLocalTableId(tableId);
186+
Y_ENSURE(localTableId != 0, "Unexpected PrechargeRow for an unknown table");
187+
188+
return Db.Precharge(localTableId, key, key, {}, 0, Max<ui64>(), Max<ui64>());
189+
}
190+
181191
void TDataShardUserDb::IncreaseUpdateCounters(
182192
const TArrayRef<const TRawTypeValue> key,
183193
const TArrayRef<const NIceDb::TUpdateOp> ops)

ydb/core/tx/datashard/datashard_user_db.h

+4
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,10 @@ class TDataShardUserDb final
176176
NMiniKQL::TEngineHostCounters& GetCounters();
177177
const NMiniKQL::TEngineHostCounters& GetCounters() const;
178178

179+
bool PrechargeRow(
180+
const TTableId& tableId,
181+
const TArrayRef<const TRawTypeValue> key);
182+
179183
private:
180184
static TSmallVec<TCell> ConvertTableKeys(const TArrayRef<const TRawTypeValue> key);
181185

ydb/core/tx/datashard/execute_write_unit.cpp

+25-2
Original file line numberDiff line numberDiff line change
@@ -131,8 +131,7 @@ class TExecuteWriteUnit : public TExecutionUnit {
131131
}
132132
};
133133

134-
for (ui32 rowIdx = 0; rowIdx < matrix.GetRowCount(); ++rowIdx)
135-
{
134+
auto fillKey = [&](ui32 rowIdx, TSmallVec<TRawTypeValue>& key) {
136135
key.clear();
137136
key.reserve(userTable.KeyColumnIds.size());
138137
for (ui16 keyColIdx = 0; keyColIdx < userTable.KeyColumnIds.size(); ++keyColIdx) {
@@ -145,6 +144,28 @@ class TExecuteWriteUnit : public TExecutionUnit {
145144
key.emplace_back(cell.Data(), cell.Size(), vtypeId);
146145
}
147146
}
147+
};
148+
149+
// Precharge
150+
151+
switch (operationType) {
152+
case NKikimrDataEvents::TEvWrite::TOperation::OPERATION_INSERT:
153+
case NKikimrDataEvents::TEvWrite::TOperation::OPERATION_UPDATE: {
154+
for (ui32 rowIdx = 0; rowIdx < matrix.GetRowCount(); ++rowIdx) {
155+
fillKey(rowIdx, key);
156+
userDb.PrechargeRow(fullTableId, key);
157+
}
158+
break;
159+
}
160+
default:
161+
break;
162+
}
163+
164+
// Main update cycle
165+
166+
for (ui32 rowIdx = 0; rowIdx < matrix.GetRowCount(); ++rowIdx)
167+
{
168+
fillKey(rowIdx, key);
148169

149170
switch (operationType) {
150171
case NKikimrDataEvents::TEvWrite::TOperation::OPERATION_UPSERT: {
@@ -177,6 +198,8 @@ class TExecuteWriteUnit : public TExecutionUnit {
177198
}
178199
}
179200

201+
// Counters
202+
180203
switch (operationType) {
181204
case NKikimrDataEvents::TEvWrite::TOperation::OPERATION_UPSERT:
182205
case NKikimrDataEvents::TEvWrite::TOperation::OPERATION_REPLACE:

0 commit comments

Comments
 (0)