@@ -84,8 +84,67 @@ class TExecuteWriteUnit : public TExecutionUnit {
84
84
return false ;
85
85
}
86
86
87
- EExecutionStatus OnTabletNotReadyException (TDataShardUserDb& userDb, TWriteOperation& writeOp, TTransactionContext& txc, const TActorContext& ctx) {
87
+ void FillOps (const NTable::TScheme& scheme, const TUserTable& userTable, const NTable::TScheme::TTableInfo& tableInfo, const TValidatedWriteTxOperation& validatedOperation, ui32 rowIdx, TSmallVec<NTable::TUpdateOp>& ops) {
88
+ const TSerializedCellMatrix& matrix = validatedOperation.GetMatrix ();
89
+ const auto & columnIds = validatedOperation.GetColumnIds ();
90
+
91
+ ops.clear ();
92
+ Y_ENSURE (matrix.GetColCount () >= userTable.KeyColumnIds .size ());
93
+ ops.reserve (matrix.GetColCount () - userTable.KeyColumnIds .size ());
94
+
95
+ for (ui16 valueColIdx = userTable.KeyColumnIds .size (); valueColIdx < matrix.GetColCount (); ++valueColIdx) {
96
+ ui32 columnTag = columnIds[valueColIdx];
97
+ const TCell& cell = matrix.GetCell (rowIdx, valueColIdx);
98
+
99
+ const NScheme::TTypeId vtypeId = scheme.GetColumnInfo (&tableInfo, columnTag)->PType .GetTypeId ();
100
+ ops.emplace_back (columnTag, NTable::ECellOp::Set, cell.IsNull () ? TRawTypeValue () : TRawTypeValue (cell.Data (), cell.Size (), vtypeId));
101
+ }
102
+ };
103
+
104
+ void FillKey (const NTable::TScheme& scheme, const TUserTable& userTable, const NTable::TScheme::TTableInfo& tableInfo, const TValidatedWriteTxOperation& validatedOperation, ui32 rowIdx, TSmallVec<TRawTypeValue>& key) {
105
+ const TSerializedCellMatrix& matrix = validatedOperation.GetMatrix ();
106
+
107
+ key.clear ();
108
+ key.reserve (userTable.KeyColumnIds .size ());
109
+ for (ui16 keyColIdx = 0 ; keyColIdx < userTable.KeyColumnIds .size (); ++keyColIdx) {
110
+ const TCell& cell = matrix.GetCell (rowIdx, keyColIdx);
111
+ ui32 keyCol = tableInfo.KeyColumns [keyColIdx];
112
+ if (cell.IsNull ()) {
113
+ key.emplace_back ();
114
+ } else {
115
+ NScheme::TTypeId vtypeId = scheme.GetColumnInfo (&tableInfo, keyCol)->PType .GetTypeId ();
116
+ key.emplace_back (cell.Data (), cell.Size (), vtypeId);
117
+ }
118
+ }
119
+ };
120
+
121
+ EExecutionStatus OnTabletNotReadyException (TDataShardUserDb& userDb, TWriteOperation& writeOp, const TValidatedWriteTxOperation* validatedOperation, TTransactionContext& txc, const TActorContext& ctx) {
88
122
LOG_TRACE_S (ctx, NKikimrServices::TX_DATASHARD, " Tablet " << DataShard.TabletID () << " is not ready for " << writeOp << " execution" );
123
+
124
+ if (validatedOperation) {
125
+ const ui64 tableId = validatedOperation->GetTableId ().PathId .LocalPathId ;
126
+ const TTableId fullTableId (DataShard.GetPathOwnerId (), tableId);
127
+ const TUserTable& userTable = *DataShard.GetUserTables ().at (tableId);
128
+
129
+ const NTable::TScheme& scheme = txc.DB .GetScheme ();
130
+ const NTable::TScheme::TTableInfo& tableInfo = *scheme.GetTableInfo (userTable.LocalTid );
131
+
132
+ const TSerializedCellMatrix& matrix = validatedOperation->GetMatrix ();
133
+ const auto operationType = validatedOperation->GetOperationType ();
134
+
135
+ TSmallVec<TRawTypeValue> key;
136
+
137
+ // Precharge
138
+ if (operationType == NKikimrDataEvents::TEvWrite::TOperation::OPERATION_INSERT ||
139
+ operationType == NKikimrDataEvents::TEvWrite::TOperation::OPERATION_UPDATE ||
140
+ userDb.NeedToReadBeforeWrite (fullTableId))
141
+ {
142
+ for (ui32 rowIdx = 0 ; rowIdx < matrix.GetRowCount (); ++rowIdx) {
143
+ FillKey (scheme, userTable, tableInfo, *validatedOperation, rowIdx, key);
144
+ userDb.PrechargeRow (fullTableId, key);
145
+ }
146
+ }
147
+ }
89
148
90
149
DataShard.IncCounter (COUNTER_TX_TABLET_NOT_READY);
91
150
@@ -109,72 +168,28 @@ class TExecuteWriteUnit : public TExecutionUnit {
109
168
const TUserTable& userTable = *DataShard.GetUserTables ().at (tableId);
110
169
111
170
const NTable::TScheme& scheme = txc.DB .GetScheme ();
112
- const NTable::TScheme::TTableInfo* tableInfo = scheme.GetTableInfo (userTable.LocalTid );
113
-
114
- TSmallVec<TRawTypeValue> key;
115
- TSmallVec<NTable::TUpdateOp> ops;
171
+ const NTable::TScheme::TTableInfo& tableInfo = *scheme.GetTableInfo (userTable.LocalTid );
116
172
117
173
const TSerializedCellMatrix& matrix = validatedOperation.GetMatrix ();
118
174
const auto operationType = validatedOperation.GetOperationType ();
119
175
120
- auto fillOps = [&](ui32 rowIdx) {
121
- ops.clear ();
122
- Y_ENSURE (matrix.GetColCount () >= userTable.KeyColumnIds .size ());
123
- ops.reserve (matrix.GetColCount () - userTable.KeyColumnIds .size ());
124
-
125
- for (ui16 valueColIdx = userTable.KeyColumnIds .size (); valueColIdx < matrix.GetColCount (); ++valueColIdx) {
126
- ui32 columnTag = validatedOperation.GetColumnIds ()[valueColIdx];
127
- const TCell& cell = matrix.GetCell (rowIdx, valueColIdx);
128
-
129
- const NScheme::TTypeId vtypeId = scheme.GetColumnInfo (tableInfo, columnTag)->PType .GetTypeId ();
130
- ops.emplace_back (columnTag, NTable::ECellOp::Set, cell.IsNull () ? TRawTypeValue () : TRawTypeValue (cell.Data (), cell.Size (), vtypeId));
131
- }
132
- };
133
-
134
- auto fillKey = [&](ui32 rowIdx, TSmallVec<TRawTypeValue>& key) {
135
- key.clear ();
136
- key.reserve (userTable.KeyColumnIds .size ());
137
- for (ui16 keyColIdx = 0 ; keyColIdx < userTable.KeyColumnIds .size (); ++keyColIdx) {
138
- const TCell& cell = matrix.GetCell (rowIdx, keyColIdx);
139
- ui32 keyCol = tableInfo->KeyColumns [keyColIdx];
140
- if (cell.IsNull ()) {
141
- key.emplace_back ();
142
- } else {
143
- NScheme::TTypeId vtypeId = scheme.GetColumnInfo (tableInfo, keyCol)->PType .GetTypeId ();
144
- key.emplace_back (cell.Data (), cell.Size (), vtypeId);
145
- }
146
- }
147
- };
148
-
149
- // Precharge
150
-
151
- switch (operationType) {
152
- case NKikimrDataEvents::TEvWrite::TOperation::OPERATION_INSERT:
153
- case NKikimrDataEvents::TEvWrite::TOperation::OPERATION_UPDATE: {
154
- for (ui32 rowIdx = 0 ; rowIdx < matrix.GetRowCount (); ++rowIdx) {
155
- fillKey (rowIdx, key);
156
- userDb.PrechargeRow (fullTableId, key);
157
- }
158
- break ;
159
- }
160
- default :
161
- break ;
162
- }
176
+ TSmallVec<TRawTypeValue> key;
177
+ TSmallVec<NTable::TUpdateOp> ops;
163
178
164
179
// Main update cycle
165
180
166
181
for (ui32 rowIdx = 0 ; rowIdx < matrix.GetRowCount (); ++rowIdx)
167
182
{
168
- fillKey ( rowIdx, key);
183
+ FillKey (scheme, userTable, tableInfo, validatedOperation, rowIdx, key);
169
184
170
185
switch (operationType) {
171
186
case NKikimrDataEvents::TEvWrite::TOperation::OPERATION_UPSERT: {
172
- fillOps ( rowIdx);
187
+ FillOps (scheme, userTable, tableInfo, validatedOperation, rowIdx, ops );
173
188
userDb.UpsertRow (fullTableId, key, ops);
174
189
break ;
175
190
}
176
191
case NKikimrDataEvents::TEvWrite::TOperation::OPERATION_REPLACE: {
177
- fillOps ( rowIdx);
192
+ FillOps (scheme, userTable, tableInfo, validatedOperation, rowIdx, ops );
178
193
userDb.ReplaceRow (fullTableId, key, ops);
179
194
break ;
180
195
}
@@ -183,12 +198,12 @@ class TExecuteWriteUnit : public TExecutionUnit {
183
198
break ;
184
199
}
185
200
case NKikimrDataEvents::TEvWrite::TOperation::OPERATION_INSERT: {
186
- fillOps ( rowIdx);
201
+ FillOps (scheme, userTable, tableInfo, validatedOperation, rowIdx, ops );
187
202
userDb.InsertRow (fullTableId, key, ops);
188
203
break ;
189
204
}
190
205
case NKikimrDataEvents::TEvWrite::TOperation::OPERATION_UPDATE: {
191
- fillOps ( rowIdx);
206
+ FillOps (scheme, userTable, tableInfo, validatedOperation, rowIdx, ops );
192
207
userDb.UpdateRow (fullTableId, key, ops);
193
208
break ;
194
209
}
@@ -239,6 +254,7 @@ class TExecuteWriteUnit : public TExecutionUnit {
239
254
}
240
255
241
256
const TValidatedWriteTx::TPtr& writeTx = writeOp->GetWriteTx ();
257
+ const TValidatedWriteTxOperation* validatedOperation = nullptr ;
242
258
243
259
DataShard.ReleaseCache (*writeOp);
244
260
@@ -383,11 +399,11 @@ class TExecuteWriteUnit : public TExecutionUnit {
383
399
384
400
KqpCommitLocks (tabletId, kqpLocks, sysLocks, writeVersion, userDb);
385
401
386
- TValidatedWriteTx::TPtr& writeTx = writeOp->GetWriteTx ();
387
402
if (writeTx->HasOperations ()) {
388
- for (const auto & validatedOperation : writeTx->GetOperations ()) {
389
- DoUpdateToUserDb (userDb, validatedOperation, txc);
390
- LOG_DEBUG_S (ctx, NKikimrServices::TX_DATASHARD, " Executed write operation for " << *writeOp << " at " << DataShard.TabletID () << " , row count=" << validatedOperation.GetMatrix ().GetRowCount ());
403
+ for (size_t i = 0 ; i < writeTx->GetOperations ().size (); ++i) {
404
+ validatedOperation = &writeTx->GetOperations ()[i];
405
+ DoUpdateToUserDb (userDb, *validatedOperation, txc);
406
+ LOG_DEBUG_S (ctx, NKikimrServices::TX_DATASHARD, " Executed write operation for " << *writeOp << " at " << DataShard.TabletID () << " , row count=" << validatedOperation->GetMatrix ().GetRowCount ());
391
407
}
392
408
} else {
393
409
LOG_DEBUG_S (ctx, NKikimrServices::TX_DATASHARD, " Skip empty write operation for " << *writeOp << " at " << DataShard.TabletID ());
@@ -474,7 +490,7 @@ class TExecuteWriteUnit : public TExecutionUnit {
474
490
}
475
491
return EExecutionStatus::Continue;
476
492
} catch (const TNotReadyTabletException&) {
477
- return OnTabletNotReadyException (userDb, *writeOp, txc, ctx);
493
+ return OnTabletNotReadyException (userDb, *writeOp, validatedOperation, txc, ctx);
478
494
} catch (const TLockedWriteLimitException&) {
479
495
userDb.ResetCollectedChanges ();
480
496
0 commit comments