Skip to content

Commit 5625aa4

Browse files
authored
chore: Add debug logs to help tracking transactional deadlocks (#4669)
* chore: reproduce a bug related to #4663. Add various debug logs to help track the deadlock. Add more assertions in helio and provide state time for fibers when printing stack traces. --------- Signed-off-by: Roman Gershman <[email protected]>
1 parent 028e080 commit 5625aa4

9 files changed

+46
-19
lines changed

Diff for: .pre-commit-config.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
default_stages: [commit]
1+
default_stages: [pre-commit]
22
exclude: |
33
(?x)(
44
src/redis/.* |

Diff for: src/server/db_slice.cc

+2
Original file line numberDiff line numberDiff line change
@@ -898,6 +898,8 @@ void DbSlice::FlushDbIndexes(const std::vector<DbIndex>& indexes) {
898898
}
899899

900900
void DbSlice::FlushDb(DbIndex db_ind) {
901+
DVLOG(1) << "Flushing db " << db_ind;
902+
901903
// clear client tracking map.
902904
client_tracking_map_.clear();
903905

Diff for: src/server/debugcmd.cc

+8-6
Original file line numberDiff line numberDiff line change
@@ -1066,12 +1066,7 @@ void DebugCmd::TxAnalysis(facade::SinkReplyBuilder* builder) {
10661066
string result;
10671067
for (unsigned i = 0; i < shard_set->size(); ++i) {
10681068
const auto& info = shard_info[i];
1069-
StrAppend(&result, "shard", i, ":\n", " tx armed ", info.tx_armed, ", total: ", info.tx_total,
1070-
",global:", info.tx_global, ",runnable:", info.tx_runnable, "\n");
1071-
StrAppend(&result, " locks total:", info.total_locks, ",contended:", info.contended_locks,
1072-
"\n");
1073-
StrAppend(&result, " max contention score: ", info.max_contention_score,
1074-
",lock_name:", info.max_contention_lock, "\n");
1069+
StrAppend(&result, "shard", i, ":\n", info.Format(), "\n");
10751070
}
10761071
auto* rb = static_cast<RedisReplyBuilder*>(builder);
10771072
rb->SendVerbatimString(result);
@@ -1114,7 +1109,14 @@ void DebugCmd::ObjHist(facade::SinkReplyBuilder* builder) {
11141109
void DebugCmd::Stacktrace(facade::SinkReplyBuilder* builder) {
11151110
fb2::Mutex m;
11161111
shard_set->pool()->AwaitFiberOnAll([&m](unsigned index, ProactorBase* base) {
1112+
EngineShard* es = EngineShard::tlocal();
1113+
string txq;
1114+
if (es) {
1115+
EngineShard::TxQueueInfo txq_info = es->AnalyzeTxQueue();
1116+
txq = txq_info.Format();
1117+
}
11171118
std::unique_lock lk(m);
1119+
LOG_IF(INFO, !txq.empty()) << "Shard" << index << ": " << txq;
11181120
fb2::detail::FiberInterface::PrintAllFiberStackTraces();
11191121
});
11201122
base::FlushLogs();

Diff for: src/server/engine_shard.cc

+21-1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
#include "server/engine_shard.h"
66

77
#include <absl/strings/match.h>
8+
#include <absl/strings/str_cat.h>
89

910
#include "base/flags.h"
1011
#include "io/proc_reader.h"
@@ -276,6 +277,25 @@ ShardId Shard(string_view v, ShardId shard_num) {
276277
return hash % shard_num;
277278
}
278279

280+
// Renders this tx-queue snapshot as human-readable, newline-terminated lines.
//
// A section is emitted only when its leading counter is positive:
//   - transaction summary (armed / total / global / runnable) followed by the
//     debug id of the queue head, when tx_total > 0;
//   - lock totals, when total_locks > 0;
//   - the most contended lock, when max_contention_score > 0.
// Returns an empty string when none of the sections apply.
string EngineShard::TxQueueInfo::Format() const {
  string res;

  if (tx_total > 0) {
    // Keep the head info on the summary line: it is introduced by ", head: ",
    // so emitting it after a line break would start a line with a stray comma.
    absl::StrAppend(&res, "tx armed ", tx_armed, ", total: ", tx_total, ",global:", tx_global,
                    ",runnable:", tx_runnable, ", head: ", head.debug_id_info, "\n");
  }
  if (total_locks > 0) {
    absl::StrAppend(&res, "locks total:", total_locks, ",contended:", contended_locks, "\n");
  }
  if (max_contention_score > 0) {
    absl::StrAppend(&res, "max contention score: ", max_contention_score,
                    ", lock: ", max_contention_lock, "\n");
  }

  return res;
}
298+
279299
EngineShard::Stats& EngineShard::Stats::operator+=(const EngineShard::Stats& o) {
280300
static_assert(sizeof(Stats) == 64);
281301

@@ -706,7 +726,7 @@ void EngineShard::RemoveContTx(Transaction* tx) {
706726
}
707727

708728
void EngineShard::Heartbeat() {
709-
DVLOG(2) << " Hearbeat";
729+
DVLOG(3) << " Hearbeat";
710730
DCHECK(namespaces);
711731

712732
CacheStats();

Diff for: src/server/engine_shard.h

+2
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,8 @@ class EngineShard {
181181

182182
// We can use a vector to hold debug info for all items in the txqueue
183183
TxQueueItem head;
184+
185+
std::string Format() const;
184186
};
185187

186188
TxQueueInfo AnalyzeTxQueue() const;

Diff for: src/server/protocol_client.cc

+1
Original file line numberDiff line numberDiff line change
@@ -347,6 +347,7 @@ bool ProtocolClient::CheckRespFirstTypes(initializer_list<RespExpr::Type> types)
347347

348348
error_code ProtocolClient::SendCommand(string_view command) {
349349
string formatted_command = RedisReplyBuilderBase::SerializeCommand(command);
350+
DCHECK(sock_->proactor() == ProactorBase::me());
350351
auto ec = sock_->Write(io::Buffer(formatted_command));
351352
if (!ec)
352353
TouchIoTime();

Diff for: src/server/replica.cc

+7-3
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ Replica::~Replica() {
8787
static const char kConnErr[] = "could not connect to master: ";
8888

8989
GenericError Replica::Start() {
90-
VLOG(1) << "Starting replication";
90+
VLOG(1) << "Starting replication " << this;
9191
ProactorBase* mythread = ProactorBase::me();
9292
CHECK(mythread);
9393

@@ -138,7 +138,7 @@ void Replica::EnableReplication(facade::SinkReplyBuilder* builder) {
138138
}
139139

140140
void Replica::Stop() {
141-
VLOG(1) << "Stopping replication";
141+
VLOG(1) << "Stopping replication " << this;
142142
// Stops the loop in MainReplicationFb.
143143

144144
proactor_->Await([this] {
@@ -149,6 +149,7 @@ void Replica::Stop() {
149149
// Make sure the replica fully stopped and did all cleanup,
150150
// so we can freely release resources (connections).
151151
sync_fb_.JoinIfNeeded();
152+
DVLOG(1) << "MainReplicationFb stopped " << this;
152153
acks_fb_.JoinIfNeeded();
153154
for (auto& flow : shard_flows_) {
154155
flow.reset();
@@ -183,7 +184,7 @@ std::error_code Replica::TakeOver(std::string_view timeout, bool save_flag) {
183184
}
184185

185186
void Replica::MainReplicationFb() {
186-
VLOG(1) << "Main replication fiber started";
187+
VLOG(1) << "Main replication fiber started " << this;
187188
// Switch shard states to replication.
188189
SetShardStates(true);
189190

@@ -546,11 +547,14 @@ error_code Replica::InitiateDflySync() {
546547
std::accumulate(is_full_sync.get(), is_full_sync.get() + num_df_flows, 0);
547548

548549
if (num_full_flows == num_df_flows) {
550+
DVLOG(1) << "Calling Flush on all slots " << this;
551+
549552
if (slot_range_.has_value()) {
550553
JournalExecutor{&service_}.FlushSlots(slot_range_.value());
551554
} else {
552555
JournalExecutor{&service_}.FlushAll();
553556
}
557+
DVLOG(1) << "Flush on all slots ended " << this;
554558
} else if (num_full_flows == 0) {
555559
sync_type = "partial";
556560
} else {

Diff for: src/server/server_family.cc

+1
Original file line numberDiff line numberDiff line change
@@ -2903,6 +2903,7 @@ void ServerFamily::ReplicaOfInternal(CmdArgList args, Transaction* tx, SinkReply
29032903
// If the replication attempt failed, clean up global state. The replica should have stopped
29042904
// internally.
29052905
util::fb2::LockGuard lk(replicaof_mu_); // Only one REPLICAOF command can run at a time
2906+
29062907
// If there was an error above during Start we must not start the main replication fiber.
29072908
// However, it could be the case that Start() above connected successfully and by the time
29082909
// we acquire the lock, the context got cancelled because another ReplicaOf command

Diff for: src/server/transaction.cc

+3-8
Original file line numberDiff line numberDiff line change
@@ -49,19 +49,14 @@ void AnalyzeTxQueue(const EngineShard* shard, const TxQueue* txq) {
4949
if (now >= last_log_time + 10) {
5050
last_log_time = now;
5151
EngineShard::TxQueueInfo info = shard->AnalyzeTxQueue();
52-
string msg =
53-
StrCat("TxQueue is too long. Tx count:", info.tx_total, ", armed:", info.tx_armed,
54-
", runnable:", info.tx_runnable, ", total locks: ", info.total_locks,
55-
", contended locks: ", info.contended_locks, "\n");
56-
absl::StrAppend(&msg, "max contention score: ", info.max_contention_score,
57-
", lock: ", info.max_contention_lock,
58-
", poll_executions:", shard->stats().poll_execution_total);
52+
string msg = StrCat("TxQueue is too long. ", info.Format());
53+
absl::StrAppend(&msg, "poll_executions:", shard->stats().poll_execution_total);
54+
5955
const Transaction* cont_tx = shard->GetContTx();
6056
if (cont_tx) {
6157
absl::StrAppend(&msg, " continuation_tx: ", cont_tx->DebugId(shard->shard_id()), " ",
6258
cont_tx->DEBUG_IsArmedInShard(shard->shard_id()) ? " armed" : "");
6359
}
64-
absl::StrAppend(&msg, "\nTxQueue head debug info ", info.head.debug_id_info);
6560

6661
LOG(WARNING) << msg;
6762
}

0 commit comments

Comments
 (0)