Skip to content

Commit 54aa97d

Browse files
Late messages TEvProposeTransaction (#16216) (#16288)
1 parent 9fb5522 commit 54aa97d

File tree

2 files changed

+88
-1
lines changed

2 files changed

+88
-1
lines changed

ydb/core/persqueue/pq_impl.cpp

+26-1
Original file line numberDiff line numberDiff line change
@@ -1685,7 +1685,7 @@ void TPersQueue::Handle(TEvPersQueue::TEvDropTablet::TPtr& ev, const TActorConte
16851685
{
16861686
PQ_LOG_D("Handle TEvPersQueue::TEvDropTablet");
16871687

1688-
auto& record = ev->Get()->Record;
1688+
const auto& record = ev->Get()->Record;
16891689
ui64 txId = record.GetTxId();
16901690

16911691
TChangeNotification stateRequest(ev->Sender, txId);
@@ -4560,6 +4560,31 @@ void TPersQueue::InitMediatorTimeCast(const TActorContext& ctx)
45604560

45614561
bool TPersQueue::AllTransactionsHaveBeenProcessed() const
45624562
{
4563+
bool existDataTx = false;
4564+
bool existPlannedConfigTx = false;
4565+
bool existUnplannedConfigTx = false;
4566+
4567+
for (const auto& [_, tx] : Txs) {
4568+
switch (tx.Kind) {
4569+
case NKikimrPQ::TTransaction::KIND_CONFIG:
4570+
((tx.Step == Max<ui64>()) ? existUnplannedConfigTx : existPlannedConfigTx) = true;
4571+
break;
4572+
case NKikimrPQ::TTransaction::KIND_DATA:
4573+
existDataTx = true;
4574+
break;
4575+
case NKikimrPQ::TTransaction::KIND_UNKNOWN:
4576+
Y_ABORT_UNLESS(false);
4577+
}
4578+
}
4579+
4580+
if (existDataTx || existPlannedConfigTx) {
4581+
return false;
4582+
}
4583+
4584+
if (existUnplannedConfigTx) {
4585+
return true;
4586+
}
4587+
45634588
return EvProposeTransactionQueue.empty() && Txs.empty();
45644589
}
45654590

ydb/core/persqueue/ut/pqtablet_ut.cpp

+62
Original file line numberDiff line numberDiff line change
@@ -1259,6 +1259,68 @@ Y_UNIT_TEST_F(DropTablet_Before_Write, TPQTabletFixture)
12591259
.Status=NKikimrPQ::TEvProposeTransactionResult::ABORTED});
12601260
}
12611261

1262+
Y_UNIT_TEST_F(DropTablet_And_UnplannedConfigTransaction, TPQTabletFixture)
1263+
{
1264+
PQTabletPrepare({.partitions=2}, {}, *Ctx);
1265+
1266+
const ui64 txId = 67890;
1267+
1268+
auto tabletConfig =
1269+
NHelpers::MakeConfig(2, {
1270+
{.Consumer="client-1", .Generation=0},
1271+
{.Consumer="client-3", .Generation=7}},
1272+
2);
1273+
1274+
SendProposeTransactionRequest({.TxId=txId,
1275+
.Configs=NHelpers::TConfigParams{
1276+
.Tablet=tabletConfig,
1277+
.Bootstrap=NHelpers::MakeBootstrapConfig(),
1278+
}});
1279+
WaitProposeTransactionResponse({.TxId=txId,
1280+
.Status=NKikimrPQ::TEvProposeTransactionResult::PREPARED});
1281+
1282+
// The 'TEvDropTablet` message arrives when the transaction has not yet received a PlanStep. We know that SS
1283+
// performs no more than one operation at a time. Therefore, we believe that no one is waiting for this
1284+
// transaction anymore.
1285+
SendDropTablet({.TxId=12345});
1286+
WaitDropTabletReply({.Status=NKikimrProto::EReplyStatus::OK, .TxId=12345, .TabletId=Ctx->TabletId, .State=NKikimrPQ::EDropped});
1287+
}
1288+
1289+
Y_UNIT_TEST_F(DropTablet_And_PlannedConfigTransaction, TPQTabletFixture)
1290+
{
1291+
PQTabletPrepare({.partitions=2}, {}, *Ctx);
1292+
1293+
const ui64 txId = 67890;
1294+
1295+
auto tabletConfig =
1296+
NHelpers::MakeConfig(2, {
1297+
{.Consumer="client-1", .Generation=0},
1298+
{.Consumer="client-3", .Generation=7}},
1299+
2);
1300+
1301+
SendProposeTransactionRequest({.TxId=txId,
1302+
.Configs=NHelpers::TConfigParams{
1303+
.Tablet=tabletConfig,
1304+
.Bootstrap=NHelpers::MakeBootstrapConfig(),
1305+
}});
1306+
WaitProposeTransactionResponse({.TxId=txId,
1307+
.Status=NKikimrPQ::TEvProposeTransactionResult::PREPARED});
1308+
1309+
SendPlanStep({.Step=100, .TxIds={txId}});
1310+
WaitPlanStepAck({.Step=100, .TxIds={txId}});
1311+
1312+
// The 'TEvDropTablet` message arrives when the transaction has already received a PlanStep.
1313+
// We will receive the response when the transaction is executed.
1314+
SendDropTablet({.TxId=12345});
1315+
1316+
WaitPlanStepAccepted({.Step=100});
1317+
1318+
WaitProposeTransactionResponse({.TxId=txId,
1319+
.Status=NKikimrPQ::TEvProposeTransactionResult::COMPLETE});
1320+
1321+
WaitDropTabletReply({.Status=NKikimrProto::EReplyStatus::OK, .TxId=12345, .TabletId=Ctx->TabletId, .State=NKikimrPQ::EDropped});
1322+
}
1323+
12621324
Y_UNIT_TEST_F(UpdateConfig_1, TPQTabletFixture)
12631325
{
12641326
PQTabletPrepare({.partitions=2}, {}, *Ctx);

0 commit comments

Comments
 (0)