From bf087da58e3ab54cfb98f517606f64c9cf4ba797 Mon Sep 17 00:00:00 2001
From: kostas
Date: Mon, 1 Sep 2025 19:34:40 +0300
Subject: [PATCH 1/7] chore: almost mutex-free replication

Signed-off-by: kostas
---
 src/server/replica.cc               |   7 --
 src/server/server_family.cc         | 182 +++++++++++++++-------------
 src/server/server_family.h          |   5 +-
 tests/dragonfly/replication_test.py |  11 +-
 4 files changed, 108 insertions(+), 97 deletions(-)

diff --git a/src/server/replica.cc b/src/server/replica.cc
index 1b41a19b01fa..46631b46056b 100644
--- a/src/server/replica.cc
+++ b/src/server/replica.cc
@@ -542,13 +542,6 @@ error_code Replica::InitiateDflySync(std::optional<LastMasterSyncData> last_mast
   JoinDflyFlows();
   if (sync_type == "full") {
     service_.RemoveLoadingState();
-  } else if (service_.IsLoadingExclusively()) {
-    // We need this check. We originally set the state unconditionally to LOADING
-    // when we call ReplicaOf command. If for some reason we fail to start full sync below
-    // or cancel the context, we still need to switch to ACTIVE state.
-    // TODO(kostasrim) we can remove this once my proposed changes for replication move forward
-    // as the state transitions for ReplicaOf command will be much clearer.
-    service_.SwitchState(GlobalState::LOADING, GlobalState::ACTIVE);
   }
   state_mask_ &= ~R_SYNCING;
   last_journal_LSNs_.reset();
diff --git a/src/server/server_family.cc b/src/server/server_family.cc
index 7f00361e2345..8ae8bb6610f0 100644
--- a/src/server/server_family.cc
+++ b/src/server/server_family.cc
@@ -242,6 +242,11 @@ using detail::SaveStagesController;
 using http::StringResponse;
 using strings::HumanReadableNumBytes;
 
+// Initialized by REPLICAOF
+thread_local std::shared_ptr<Replica> tl_replica = nullptr;
+// Initialized by ADDREPLICAOF
+thread_local std::vector<std::shared_ptr<Replica>> tl_cluster_replicas;
+
 namespace {
 
 // TODO these should be configurable as command line flag and at runtime via config set
@@ -1228,6 +1233,11 @@ void ServerFamily::Shutdown() {
     dfly_cmd_->Shutdown();
     DebugCmd::Shutdown();
   });
+
+  service_.proactor_pool().AwaitFiberOnAll([](auto index, auto* cntx) {
+    tl_replica = nullptr;
+    tl_cluster_replicas.clear();
+  });
 }
 
 bool ServerFamily::HasPrivilegedInterface() {
@@ -3130,12 +3140,15 @@ string ServerFamily::FormatInfoMetrics(const Metrics& m, std::string_view sectio
       append("psync_attempts", rinfo.psync_attempts);
       append("psync_successes", rinfo.psync_successes);
     };
-    fb2::LockGuard lk(replicaof_mu_);
+    // Deep copy because tl_replica might be overwritten in between
+    auto replica = tl_replica;
 
-    replication_info_cb(replica_->GetSummary());
+    replication_info_cb(replica->GetSummary());
 
+    // Deep copy because tl_cluster_replicas might be overwritten in between
+    auto cluster_replicas = tl_cluster_replicas;
     // Special case, when multiple masters replicate to a single replica.
-    for (const auto& replica : cluster_replicas_) {
+    for (const auto& replica : cluster_replicas) {
       replication_info_cb(replica->GetSummary());
     }
   }
@@ -3417,7 +3430,7 @@ void ServerFamily::AddReplicaOf(CmdArgList args, const CommandContext& cmd_cntx)
   }
 
   LOG(INFO) << "Add Replica " << *replicaof_args;
 
-  auto add_replica = make_unique<Replica>(replicaof_args->host, replicaof_args->port, &service_,
+  auto add_replica = make_shared<Replica>(replicaof_args->host, replicaof_args->port, &service_,
                                           master_replid(), replicaof_args->slot_range);
   GenericError ec = add_replica->Start();
   if (ec) {
@@ -3426,77 +3439,76 @@ void ServerFamily::AddReplicaOf(CmdArgList args, const CommandContext& cmd_cntx)
   }
   add_replica->StartMainReplicationFiber(nullopt);
   cluster_replicas_.push_back(std::move(add_replica));
-  cmd_cntx.rb->SendOk();
-}
 
-void ServerFamily::ReplicaOfInternal(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
-                                     ActionOnConnectionFail on_err) {
-  std::shared_ptr<Replica> new_replica;
-  std::optional<Replica::LastMasterSyncData> last_master_data;
-  {
-    util::fb2::LockGuard lk(replicaof_mu_);  // Only one REPLICAOF command can run at a time
+  service_.proactor_pool().AwaitFiberOnAll(
+      [this](auto index, auto* cntx)
+          ABSL_NO_THREAD_SAFETY_ANALYSIS { tl_cluster_replicas = cluster_replicas_; });
 
-    // We should not execute replica of command while loading from snapshot.
-    ServerState* ss = ServerState::tlocal();
-    if (ss->is_master && ss->gstate() == GlobalState::LOADING) {
-      builder->SendError(kLoadingErr);
-      return;
-    }
-
-    auto replicaof_args = ReplicaOfArgs::FromCmdArgs(args, builder);
-    if (!replicaof_args.has_value()) {
-      return;
-    }
-
-    LOG(INFO) << "Replicating " << *replicaof_args;
-
-    // If NO ONE was supplied, just stop the current replica (if it exists)
-    if (replicaof_args->IsReplicaOfNoOne()) {
-      if (!ss->is_master) {
-        CHECK(replica_);
-
-        SetMasterFlagOnAllThreads(true);  // Flip flag before clearing replica
-        last_master_data_ = replica_->Stop();
-        replica_.reset();
+  cmd_cntx.rb->SendOk();
+}
 
-        StopAllClusterReplicas();
-      }
+void ServerFamily::ReplicaOfNoOne(SinkReplyBuilder* builder) {
+  util::fb2::LockGuard lk(replicaof_mu_);
+  ServerState* ss = ServerState::tlocal();
 
-      // May not switch to ACTIVE if the process is, for example, shutting down at the same time.
+  if (!ss->is_master) {
+    CHECK(replica_);
+    // flip flag before clearing replica_
+    SetMasterFlagOnAllThreads(true);
+    // TODO we should not allow partial sync after NO-ONE. Only after Takeover.
+    last_master_data_ = replica_->Stop();
+    // TODO set thread locals to nullptr
+    replica_.reset();
+    StopAllClusterReplicas();
+    service_.proactor_pool().AwaitFiberOnAll([](auto index, auto* cntx) { tl_replica = nullptr; });
+  }
 
+  // May not switch to ACTIVE if the process is, for example, shutting down at the same time.
+  service_.SwitchState(GlobalState::LOADING, GlobalState::ACTIVE);
 
-      return builder->SendOk();
-    }
+  return builder->SendOk();
+}
 
-    // If any replication is in progress, stop it, cancellation should kick in immediately
+bool ServerFamily::IsDragonflyLoadingAtomic() {
+  util::fb2::LockGuard lk(replicaof_mu_);
+  ServerState* ss = ServerState::tlocal();
 
-    if (replica_)
-      last_master_data = replica_->Stop();
-    StopAllClusterReplicas();
+  return ss->is_master && ss->gstate() == GlobalState::LOADING;
+}
 
-    const GlobalState gstate = ServerState::tlocal()->gstate();
-    if (gstate == GlobalState::TAKEN_OVER) {
-      service_.SwitchState(GlobalState::TAKEN_OVER, GlobalState::LOADING);
-    } else if (auto new_state = service_.SwitchState(GlobalState::ACTIVE, GlobalState::LOADING);
-               new_state != GlobalState::LOADING) {
-      LOG(WARNING) << new_state << " in progress, ignored";
-      builder->SendError("Invalid state");
-      return;
-    }
+void ServerFamily::ReplicaOfInternal(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
+                                     ActionOnConnectionFail on_err) {
+  std::optional<Replica::LastMasterSyncData> last_master_data;
 
-    // Create a new replica and assign it
-    new_replica = make_shared<Replica>(replicaof_args->host, replicaof_args->port, &service_,
-                                       master_replid(), replicaof_args->slot_range);
+  auto replicaof_args = ReplicaOfArgs::FromCmdArgs(args, builder);
+  if (!replicaof_args.has_value()) {
+    return;
+  }
 
-    replica_ = new_replica;
+  LOG(INFO) << "Initiate replication with: " << *replicaof_args;
 
-    // TODO: disconnect pending blocked clients (pubsub, blocking commands)
-    SetMasterFlagOnAllThreads(false);  // Flip flag after assiging replica
+  // This is a "weak" check. For example, if the node is already a replica,
+  // it could be the case that one of the flows disconnects. The MainReplicationFiber
+  // will then loop and if it can't partial sync it will enter LOADING state because of
+  // full sync. Note that the fiber is not aware of the replicaof_mu_ so even
+  // if that mutex is locked below before any state check we can't really enforce
+  // that the old replication fiber won't try to full sync and update the state to LOADING.
+  // What is more, we always call `replica->Stop()`. So even if we end up in the
+  // scenario described, the semantics are well defined. First, cancel the old replica and
+  // move on with the new one. Cancellation will be slower and ReplicaOf() will
+  // induce higher latency -- but that's ok because it's a highly improbable flow with
+  // well defined semantics.
+  if (IsDragonflyLoadingAtomic()) {
+    builder->SendError(kLoadingErr);
+    return;
+  }
 
-  }  // release the lock, lk.unlock()
-  // We proceed connecting below without the lock to allow interrupting the replica immediately.
-  // From this point and onward, it should be highly responsive.
+  // replicaof no one
+  if (replicaof_args->IsReplicaOfNoOne()) {
+    return ReplicaOfNoOne(builder);
+  }
 
+  auto new_replica = make_shared<Replica>(replicaof_args->host, replicaof_args->port, &service_,
+                                          master_replid(), replicaof_args->slot_range);
   GenericError ec{};
   switch (on_err) {
     case ActionOnConnectionFail::kReturnOnError:
@@ -3507,30 +3519,31 @@ void ServerFamily::ReplicaOfInternal(CmdArgList args, Transaction* tx, SinkReply
       break;
   };
 
-  // If the replication attempt failed, clean up global state. The replica should have stopped
-  // internally.
-  util::fb2::LockGuard lk(replicaof_mu_);  // Only one REPLICAOF command can run at a time
-
-  // If there was an error above during Start we must not start the main replication fiber.
-  // However, it could be the case that Start() above connected succefully and by the time
-  // we acquire the lock, the context got cancelled because another ReplicaOf command
-  // executed and acquired the replicaof_mu_ before us.
-  const bool cancelled = new_replica->IsContextCancelled();
-  if (ec || cancelled) {
-    if (replica_ == new_replica) {
-      service_.SwitchState(GlobalState::LOADING, GlobalState::ACTIVE);
-      SetMasterFlagOnAllThreads(true);
-      replica_.reset();
-    }
-    builder->SendError(ec ? ec.Format() : "replication cancelled");
-    return;
+  if (ec || new_replica->IsContextCancelled()) {
+    return builder->SendError(ec ? ec.Format() : "replication cancelled");
   }
-  // Successfully connected now we flush
-  // If we are called by "Replicate", tx will be null but we do not need
-  // to flush anything.
+
+  util::fb2::LockGuard lk(replicaof_mu_);
+  if (replica_)
+    last_master_data = replica_->Stop();
+
+  StopAllClusterReplicas();
+
+  if (ServerState::tlocal()->gstate() == GlobalState::TAKEN_OVER)
+    service_.SwitchState(GlobalState::TAKEN_OVER, GlobalState::LOADING);
+
+  // Update thread locals. That way INFO never blocks
+  replica_ = new_replica;
+  service_.proactor_pool().AwaitFiberOnAll([new_replica](auto index, auto* context) {
+    tl_replica = new_replica;
+    tl_cluster_replicas.clear();
+  });
+  SetMasterFlagOnAllThreads(false);
+
   if (on_err == ActionOnConnectionFail::kReturnOnError) {
-    new_replica->StartMainReplicationFiber(last_master_data);
+    replica_->StartMainReplicationFiber(last_master_data);
   }
+
   builder->SendOk();
 }
 
@@ -3608,6 +3621,9 @@ void ServerFamily::ReplTakeOver(CmdArgList args, const CommandContext& cmd_cntx)
   SetMasterFlagOnAllThreads(true);
   last_master_data_ = replica_->Stop();
   replica_.reset();
+
+  service_.proactor_pool().AwaitFiberOnAll([](auto index, auto* context) { tl_replica = nullptr; });
+
   return builder->SendOk();
 }
 
diff --git a/src/server/server_family.h b/src/server/server_family.h
index d04a15f66884..980f421b2f04 100644
--- a/src/server/server_family.h
+++ b/src/server/server_family.h
@@ -412,6 +412,9 @@ class ServerFamily {
   // Set accepting_connections_ and update listners according to it
   void ChangeConnectionAccept(bool accept);
 
+  void ReplicaOfNoOne(SinkReplyBuilder* builder);
+  bool IsDragonflyLoadingAtomic();
+
   util::fb2::Fiber snapshot_schedule_fb_;
   util::fb2::Fiber load_fiber_;
 
@@ -424,7 +427,7 @@ class ServerFamily {
   mutable util::fb2::Mutex replicaof_mu_, save_mu_;
   std::shared_ptr<Replica> replica_ ABSL_GUARDED_BY(replicaof_mu_);
 
-  std::vector<std::unique_ptr<Replica>> cluster_replicas_
+  std::vector<std::shared_ptr<Replica>> cluster_replicas_
       ABSL_GUARDED_BY(replicaof_mu_);  // used to replicating multiple nodes to single dragonfly
 
   std::unique_ptr<ScriptMgr> script_mgr_;
diff --git a/tests/dragonfly/replication_test.py b/tests/dragonfly/replication_test.py
index 8a3f0aaa66ad..259cd7860b8e 100644
--- a/tests/dragonfly/replication_test.py
+++ b/tests/dragonfly/replication_test.py
@@ -448,9 +448,6 @@ async def test_cancel_replication_immediately(df_factory, df_seeder_factory: Dfl
     """
     Issue 100 replication commands. This checks that the replication state machine can
     handle cancellation well.
-    We assert that at least one command was cancelled.
-    After we finish the 'fuzzing' part, replicate the first master and check that
-    all the data is correct.
     """
 
     COMMANDS_TO_ISSUE = 100
 
@@ -484,7 +481,7 @@ async def replicate():
         num_successes += await result
 
     logging.info(f"succeses: {num_successes}")
-    assert COMMANDS_TO_ISSUE > num_successes, "At least one REPLICAOF must be cancelled"
+    assert COMMANDS_TO_ISSUE == num_successes
 
     await wait_available_async(c_replica)
     capture = await seeder.capture()
@@ -493,6 +490,9 @@ async def replicate():
 
     ping_job.cancel()
 
+    replica.stop()
+    lines = replica.find_in_logs("Stopping replication")
+
 
 """
 Test flushall command. Set data to master send flashall and set more data.
@@ -2967,8 +2967,7 @@ async def replicate_inside_multi():
         num_successes += await result
 
     logging.info(f"succeses: {num_successes}")
-    assert MULTI_COMMANDS_TO_ISSUE > num_successes, "At least one REPLICAOF must be cancelled"
-    assert num_successes > 0, "At least one REPLICAOF must success"
+    assert MULTI_COMMANDS_TO_ISSUE == num_successes
 
 
 async def test_preempt_in_atomic_section_of_heartbeat(df_factory: DflyInstanceFactory):

From 95ca6928611a1e4769aab090d091d5431ba6c978 Mon Sep 17 00:00:00 2001
From: kostas
Date: Tue, 2 Sep 2025 19:45:00 +0300
Subject: [PATCH 2/7] clean up

---
 src/server/server_family.cc         | 27 ++++++++++++++++-----------
 src/server/server_family.h          |  1 +
 tests/dragonfly/replication_test.py | 11 +++++------
 3 files changed, 22 insertions(+), 17 deletions(-)

diff --git a/src/server/server_family.cc b/src/server/server_family.cc
index 8ae8bb6610f0..31be63d2a072 100644
--- a/src/server/server_family.cc
+++ b/src/server/server_family.cc
@@ -1234,10 +1234,7 @@ void ServerFamily::Shutdown() {
     DebugCmd::Shutdown();
   });
 
-  service_.proactor_pool().AwaitFiberOnAll([](auto index, auto* cntx) {
-    tl_replica = nullptr;
-    tl_cluster_replicas.clear();
-  });
+  UpdateReplicationThreadLocals(nullptr);
 }
 
 bool ServerFamily::HasPrivilegedInterface() {
@@ -3457,10 +3454,9 @@ void ServerFamily::ReplicaOfNoOne(SinkReplyBuilder* builder) {
     SetMasterFlagOnAllThreads(true);
     // TODO we should not allow partial sync after NO-ONE. Only after Takeover.
     last_master_data_ = replica_->Stop();
-    // TODO set thread locals to nullptr
    replica_.reset();
     StopAllClusterReplicas();
-    service_.proactor_pool().AwaitFiberOnAll([](auto index, auto* cntx) { tl_replica = nullptr; });
+    UpdateReplicationThreadLocals(nullptr);
   }
 
   // May not switch to ACTIVE if the process is, for example, shutting down at the same time.
@@ -3523,6 +3519,11 @@ void ServerFamily::ReplicaOfInternal(CmdArgList args, Transaction* tx, SinkReply
     return builder->SendError(ec ? ec.Format() : "replication cancelled");
   }
 
+  // Critical section.
+  // 1. Stop the old replica_ if it exists
+  // 2. Update all the pointers to the new replica and update master flag
+  // 3. Start the main replication fiber
+  // 4. Send OK
   util::fb2::LockGuard lk(replicaof_mu_);
   if (replica_)
     last_master_data = replica_->Stop();
 
   StopAllClusterReplicas();
 
   if (ServerState::tlocal()->gstate() == GlobalState::TAKEN_OVER)
     service_.SwitchState(GlobalState::TAKEN_OVER, GlobalState::LOADING);
 
   // Update thread locals. That way INFO never blocks
   replica_ = new_replica;
-  service_.proactor_pool().AwaitFiberOnAll([new_replica](auto index, auto* context) {
-    tl_replica = new_replica;
-    tl_cluster_replicas.clear();
-  });
+  UpdateReplicationThreadLocals(new_replica);
   SetMasterFlagOnAllThreads(false);
 
   if (on_err == ActionOnConnectionFail::kReturnOnError) {
@@ -3622,7 +3620,7 @@ void ServerFamily::ReplTakeOver(CmdArgList args, const CommandContext& cmd_cntx)
   last_master_data_ = replica_->Stop();
   replica_.reset();
 
-  service_.proactor_pool().AwaitFiberOnAll([](auto index, auto* context) { tl_replica = nullptr; });
+  UpdateReplicationThreadLocals(nullptr);
 
   return builder->SendOk();
 }
 
@@ -3916,6 +3914,13 @@ void ServerFamily::ClientPauseCmd(CmdArgList args, SinkReplyBuilder* builder,
   }
 }
 
+void ServerFamily::UpdateReplicationThreadLocals(std::shared_ptr<Replica> repl) {
+  service_.proactor_pool().AwaitFiberOnAll([repl](auto index, auto* context) {
+    tl_replica = repl;
+    tl_cluster_replicas.clear();
+  });
+}
+
 #define HFUNC(x) SetHandler(HandlerFunc(this, &ServerFamily::x))
 
 namespace acl {
diff --git a/src/server/server_family.h b/src/server/server_family.h
index 980f421b2f04..db861901e55f 100644
--- a/src/server/server_family.h
+++ b/src/server/server_family.h
@@ -414,6 +414,7 @@ class ServerFamily {
 
   void ReplicaOfNoOne(SinkReplyBuilder* builder);
   bool IsDragonflyLoadingAtomic();
+  void UpdateReplicationThreadLocals(std::shared_ptr<Replica> repl);
 
   util::fb2::Fiber snapshot_schedule_fb_;
   util::fb2::Fiber load_fiber_;
 
diff --git a/tests/dragonfly/replication_test.py b/tests/dragonfly/replication_test.py
index 259cd7860b8e..e98c944e8d26 100644
--- a/tests/dragonfly/replication_test.py
+++ b/tests/dragonfly/replication_test.py
@@ -466,12 +466,8 @@ async def ping_status():
             await asyncio.sleep(0.05)
 
     async def replicate():
-        try:
-            await c_replica.execute_command(f"REPLICAOF localhost {master.port}")
-            return True
-        except redis.exceptions.ResponseError as e:
-            assert e.args[0] == "replication cancelled"
-            return False
+        await c_replica.execute_command(f"REPLICAOF localhost {master.port}")
+        return True
 
     ping_job = asyncio.create_task(ping_status())
     replication_commands = [asyncio.create_task(replicate()) for _ in range(COMMANDS_TO_ISSUE)]
@@ -484,6 +481,9 @@ async def replicate():
 
     replica.stop()
     lines = replica.find_in_logs("Stopping replication")
+    # Cancelled 99 times by REPLICAOF command and once by Shutdown() because
+    # we stopped the instance
+    assert len(lines) == COMMANDS_TO_ISSUE
 
 
 """

From 6ed0f0cd06ee8b5ae1b9557cebd46a5497f5da91 Mon Sep 17 00:00:00 2001
From: kostas
Date: Wed, 3 Sep 2025 09:59:38 +0300
Subject: [PATCH 3/7] fix race condition in ProtocolClient::Socket()

Signed-off-by: kostas
---
 src/server/protocol_client.cc |  7 +++++++
 src/server/protocol_client.h  |  3 ++-
 src/server/replica.cc         | 14 +-------------
 3 files changed, 10 insertions(+), 14 deletions(-)

diff --git a/src/server/protocol_client.cc b/src/server/protocol_client.cc
index d20f73856677..8f1486286e86 100644
--- a/src/server/protocol_client.cc
+++ b/src/server/protocol_client.cc
@@ -109,11 +109,17 @@ ProtocolClient::ProtocolClient(string host, uint16_t port) {
 #ifdef DFLY_USE_SSL
   MaybeInitSslCtx();
 #endif
+
+  // We initialize the proactor thread here such that it never races with Sock().
+  // ProtocolClient is never migrated to a different thread, so this is safe.
+  socket_thread_ = ProactorBase::me();
 }
 
+
 ProtocolClient::ProtocolClient(ServerContext context) : server_context_(std::move(context)) {
 #ifdef DFLY_USE_SSL
   MaybeInitSslCtx();
 #endif
+  socket_thread_ = ProactorBase::me();
 }
 
 ProtocolClient::~ProtocolClient() {
@@ -162,6 +168,7 @@ error_code ProtocolClient::ResolveHostDns() {
 error_code ProtocolClient::ConnectAndAuth(std::chrono::milliseconds connect_timeout_ms,
                                           ExecutionState* cntx) {
   ProactorBase* mythread = ProactorBase::me();
+  DCHECK(mythread == socket_thread_);
   CHECK(mythread);
   {
     unique_lock lk(sock_mu_);
diff --git a/src/server/protocol_client.h b/src/server/protocol_client.h
index 7e7ddda036b7..70829d688ef1 100644
--- a/src/server/protocol_client.h
+++ b/src/server/protocol_client.h
@@ -107,7 +107,7 @@ class ProtocolClient {
   }
 
   auto* Proactor() const {
-    return sock_->proactor();
+    return socket_thread_;
   }
 
   util::FiberSocketBase* Sock() const {
@@ -142,6 +142,7 @@ class ProtocolClient {
 #else
   void* ssl_ctx_{nullptr};
 #endif
+  util::fb2::ProactorBase* socket_thread_;
 };
 
 }  // namespace dfly
diff --git a/src/server/replica.cc b/src/server/replica.cc
index 46631b46056b..30031fb723b5 100644
--- a/src/server/replica.cc
+++ b/src/server/replica.cc
@@ -1207,19 +1207,7 @@ auto Replica::GetSummary() const -> Summary {
     return res;
   };
 
-  if (Sock())
-    return Proactor()->AwaitBrief(f);
-
-  /**
-   * when this branch happens: there is a very short grace period
-   * where Sock() is not initialized, yet the server can
-   * receive ROLE/INFO commands. That period happens when launching
-   * an instance with '--replicaof' and then immediately
-   * sending a command.
-   *
-   * In that instance, we have to run f() on the current fiber.
-   */
-  return f();
+  return Proactor()->AwaitBrief(f);
 }
 
 std::vector<uint64_t> Replica::GetReplicaOffset() const {

From 4b48d9a5f40add1a2c34a3ba1ec34c4256fd0b99 Mon Sep 17 00:00:00 2001
From: kostas
Date: Wed, 3 Sep 2025 11:32:35 +0300
Subject: [PATCH 4/7] one to rule them all

---
 src/server/replica.cc       | 35 ++++++++++++++++++++++----------
 src/server/server_family.cc | 40 +++++++++++++++++--------------------
 2 files changed, 42 insertions(+), 33 deletions(-)

diff --git a/src/server/replica.cc b/src/server/replica.cc
index 30031fb723b5..a8f1a3d6b58b 100644
--- a/src/server/replica.cc
+++ b/src/server/replica.cc
@@ -167,9 +167,12 @@ std::optional<Replica::LastMasterSyncData> Replica::Stop() {
   sync_fb_.JoinIfNeeded();
   DVLOG(1) << "MainReplicationFb stopped " << this;
   acks_fb_.JoinIfNeeded();
-  for (auto& flow : shard_flows_) {
-    flow.reset();
-  }
+
+  proactor_->Await([this]() {
+    for (auto& flow : shard_flows_) {
+      flow.reset();
+    }
+  });
 
   if (last_journal_LSNs_.has_value()) {
     return LastMasterSyncData{master_context_.master_repl_id, last_journal_LSNs_.value()};
@@ -505,14 +508,23 @@ error_code Replica::InitiateDflySync(std::optional<LastMasterSyncData> last_mast
   // Initialize MultiShardExecution.
   multi_shard_exe_.reset(new MultiShardExecution());
 
-  // Initialize shard flows.
-  shard_flows_.resize(master_context_.num_flows);
-  DCHECK(!shard_flows_.empty());
-  for (unsigned i = 0; i < shard_flows_.size(); ++i) {
-    shard_flows_[i].reset(
-        new DflyShardReplica(server(), master_context_, i, &service_, multi_shard_exe_));
-  }
-  thread_flow_map_ = Partition(shard_flows_.size());
+  // Initialize shard flows. The update to the shard_flows_ should be done by this thread.
+  // Otherwise, there is a race condition between GetSummary() and the shard_flows_[i].reset()
+  // below.
+  decltype(shard_flows_) shard_flows_copy;
+  shard_flows_copy.resize(master_context_.num_flows);
+  DCHECK(!shard_flows_copy.empty());
+  thread_flow_map_ = Partition(shard_flows_copy.size());
+  const size_t pool_sz = shard_set->pool()->size();
+
+  shard_set->pool()->AwaitFiberOnAll([pool_sz, this, &shard_flows_copy](auto index, auto* ctx) {
+    for (unsigned i = index; i < shard_flows_copy.size(); i += pool_sz) {
+      shard_flows_copy[i].reset(
+          new DflyShardReplica(server(), master_context_, i, &service_, multi_shard_exe_));
+    }
+  });
+  // now update shard_flows on proactor thread
+  shard_flows_ = std::move(shard_flows_copy);
 
   // Blocked on until all flows got full sync cut.
   BlockingCounter sync_block{unsigned(shard_flows_.size())};
@@ -1171,6 +1183,7 @@ error_code Replica::ParseReplicationHeader(base::IoBuf* io_buf, PSyncResponse* d
 
 auto Replica::GetSummary() const -> Summary {
   auto f = [this]() {
+    DCHECK(Proactor() == ProactorBase::me());
     auto last_io_time = LastIoTime();
 
     // Note: we access LastIoTime from foreigh thread in unsafe manner. However, specifically here
diff --git a/src/server/server_family.cc b/src/server/server_family.cc
index 31be63d2a072..d75b7edb25e3 100644
--- a/src/server/server_family.cc
+++ b/src/server/server_family.cc
@@ -1970,11 +1970,11 @@ vector<facade::Listener*> ServerFamily::GetNonPriviligedListeners() const {
 }
 
 optional<Replica::Summary> ServerFamily::GetReplicaSummary() const {
-  util::fb2::LockGuard lk(replicaof_mu_);
-  if (replica_ == nullptr) {
+  if (tl_replica == nullptr) {
     return nullopt;
   } else {
-    return replica_->GetSummary();
+    auto replica = tl_replica;
+    return replica->GetSummary();
   }
 }
 
@@ -3095,15 +3095,12 @@ string ServerFamily::FormatInfoMetrics(const Metrics& m, std::string_view sectio
   auto add_repl_info = [&] {
     bool is_master = true;
-    // Thread local var is_master is updated under mutex replicaof_mu_ together with replica_,
-    // ensuring eventual consistency of is_master. When determining if the server is a replica and
-    // accessing the replica_ object, we must lock replicaof_mu_. Using is_master alone is
-    // insufficient in this scenario.
-    // Please note that we we do not use Metrics object here.
-    {
-      fb2::LockGuard lk(replicaof_mu_);
-      is_master = !replica_;
-    }
+
+    // Deep copy because tl_replica might be overwritten in between
+    auto replica = tl_replica;
+    auto cluster_replicas = tl_cluster_replicas;
+    is_master = !replica;
+
     if (is_master) {
       vector<ReplicaRoleInfo> replicas_info = dfly_cmd_->GetReplicasRoleInfo();
       append("role", "master");
@@ -3137,13 +3134,9 @@ string ServerFamily::FormatInfoMetrics(const Metrics& m, std::string_view sectio
       append("psync_attempts", rinfo.psync_attempts);
       append("psync_successes", rinfo.psync_successes);
     };
-    // Deep copy because tl_replica might be overwritten in between
-    auto replica = tl_replica;
 
     replication_info_cb(replica->GetSummary());
 
-    // Deep copy because tl_cluster_replicas might be overwritten in between
-    auto cluster_replicas = tl_cluster_replicas;
     // Special case, when multiple masters replicate to a single replica.
     for (const auto& replica : cluster_replicas) {
       replication_info_cb(replica->GetSummary());
     }
   }
@@ -3721,12 +3714,11 @@ void ServerFamily::ReplConf(CmdArgList args, const CommandContext& cmd_cntx) {
 
 void ServerFamily::Role(CmdArgList args, const CommandContext& cmd_cntx) {
   auto* rb = static_cast<RedisReplyBuilder*>(cmd_cntx.rb);
-  util::fb2::LockGuard lk(replicaof_mu_);
   // Thread local var is_master is updated under mutex replicaof_mu_ together with replica_,
   // ensuring eventual consistency of is_master. When determining if the server is a replica and
   // accessing the replica_ object, we must lock replicaof_mu_. Using is_master alone is
   // insufficient in this scenario.
-  if (!replica_) {
+  if (!tl_replica) {
     rb->StartArray(2);
     rb->SendBulkString("master");
     auto vec = dfly_cmd_->GetReplicasRoleInfo();
@@ -3739,7 +3731,11 @@ void ServerFamily::Role(CmdArgList args, const CommandContext& cmd_cntx)
     }
 
   } else {
-    rb->StartArray(4 + cluster_replicas_.size() * 3);
+    // Deep copy because tl_replica might be overwritten in between
+    auto replica = tl_replica;
+    auto cluster_replicas = tl_cluster_replicas;
+
+    rb->StartArray(4 + cluster_replicas.size() * 3);
     rb->SendBulkString(GetFlag(FLAGS_info_replication_valkey_compatible) ? "slave" : "replica");
 
     auto send_replica_info = [rb](Replica::Summary rinfo) {
@@ -3756,9 +3752,9 @@ void ServerFamily::Role(CmdArgList args, const CommandContext& cmd_cntx)
         rb->SendBulkString("connecting");
       }
     };
-    send_replica_info(replica_->GetSummary());
-    for (const auto& replica : cluster_replicas_) {
-      send_replica_info(replica->GetSummary());
+    send_replica_info(replica->GetSummary());
+    for (const auto& repl : cluster_replicas) {
+      send_replica_info(repl->GetSummary());
     }
   }
 }

From 7db60ebdf9a17b2e9421a58cffc30b6a89230425 Mon Sep 17 00:00:00 2001
From: kostas
Date: Thu, 4 Sep 2025 09:37:48 +0300
Subject: [PATCH 5/7] proper clean up semantics

---
 src/server/protocol_client.cc | 10 +++-------
 src/server/replica.cc         |  2 ++
 src/server/server_family.cc   |  3 +--
 3 files changed, 6 insertions(+), 9 deletions(-)

diff --git a/src/server/protocol_client.cc b/src/server/protocol_client.cc
index 8f1486286e86..07e5192073d9 100644
--- a/src/server/protocol_client.cc
+++ b/src/server/protocol_client.cc
@@ -125,13 +125,6 @@ ProtocolClient::ProtocolClient(ServerContext context) : server_context_(std::mov
 
 ProtocolClient::~ProtocolClient() {
   exec_st_.JoinErrorHandler();
-  // FIXME: We should close the socket explictly outside of the destructor. This currently
-  // breaks test_cancel_replication_immediately.
-  if (sock_) {
-    std::error_code ec;
-    sock_->proactor()->Await([this, &ec]() { ec = sock_->Close(); });
-    LOG_IF(ERROR, ec) << "Error closing socket " << ec;
-  }
 #ifdef DFLY_USE_SSL
   if (ssl_ctx_) {
     SSL_CTX_free(ssl_ctx_);
@@ -242,6 +235,9 @@ void ProtocolClient::CloseSocket() {
         auto ec = sock_->Shutdown(SHUT_RDWR);
         LOG_IF(ERROR, ec) << "Could not shutdown socket " << ec;
       }
+      error_code ec = sock_->Close();  // Quietly close.
+
+      LOG_IF(WARNING, ec) << "Error closing socket " << ec << "/" << ec.message();
     });
   }
 }
diff --git a/src/server/replica.cc b/src/server/replica.cc
index a8f1a3d6b58b..56be8af11f54 100644
--- a/src/server/replica.cc
+++ b/src/server/replica.cc
@@ -172,6 +172,7 @@ std::optional<Replica::LastMasterSyncData> Replica::Stop() {
     for (auto& flow : shard_flows_) {
       flow.reset();
     }
+    shard_flows_.clear();
   });
 
   if (last_journal_LSNs_.has_value()) {
@@ -1227,6 +1228,7 @@ std::vector<uint64_t> Replica::GetReplicaOffset() const {
   std::vector<uint64_t> flow_rec_count;
   flow_rec_count.resize(shard_flows_.size());
   for (const auto& flow : shard_flows_) {
+    DCHECK(flow.get());
     uint32_t flow_id = flow->FlowId();
     uint64_t rec_count = flow->JournalExecutedCount();
     DCHECK_LT(flow_id, shard_flows_.size());
diff --git a/src/server/server_family.cc b/src/server/server_family.cc
index d75b7edb25e3..02d37a7705c3 100644
--- a/src/server/server_family.cc
+++ b/src/server/server_family.cc
@@ -1229,12 +1229,11 @@ void ServerFamily::Shutdown() {
       replica_->Stop();
     }
     StopAllClusterReplicas();
+    UpdateReplicationThreadLocals(nullptr);
 
     dfly_cmd_->Shutdown();
     DebugCmd::Shutdown();
   });
-
-  UpdateReplicationThreadLocals(nullptr);
 }
 
 bool ServerFamily::HasPrivilegedInterface() {

From 6f06bba00fc809ee9714c8fcaef8e2936b51b110 Mon Sep 17 00:00:00 2001
From: kostas
Date: Thu, 4 Sep 2025 10:54:19 +0300
Subject: [PATCH 6/7] cluster: do not close socket on destructor

---
 src/server/cluster/outgoing_slot_migration.cc | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/src/server/cluster/outgoing_slot_migration.cc b/src/server/cluster/outgoing_slot_migration.cc
index dd6ea922f789..46a09c64d7bb 100644
--- a/src/server/cluster/outgoing_slot_migration.cc
+++ b/src/server/cluster/outgoing_slot_migration.cc
@@ -166,6 +166,7 @@ void OutgoingMigration::Finish(const GenericError& error) {
   switch (state_) {
     case MigrationState::C_FATAL:
     case MigrationState::C_FINISHED:
+      CloseSocket();
       return;  // Already finished, nothing else to do
 
     case MigrationState::C_CONNECTING:
@@ -192,6 +193,9 @@ void OutgoingMigration::Finish(const GenericError& error) {
     });
     exec_st_.JoinErrorHandler();
   }
+
+  // Close socket for clean disconnect.
+  CloseSocket();
 }
 
 MigrationState OutgoingMigration::GetState() const {

From f932141d06c6119bfcf53b0b09fd26c84607dc85 Mon Sep 17 00:00:00 2001
From: kostas
Date: Thu, 4 Sep 2025 12:19:42 +0300
Subject: [PATCH 7/7] cluster socket!

---
 src/server/cluster/outgoing_slot_migration.cc | 1 +
 1 file changed, 1 insertion(+)

diff --git a/src/server/cluster/outgoing_slot_migration.cc b/src/server/cluster/outgoing_slot_migration.cc
index 46a09c64d7bb..05972685561d 100644
--- a/src/server/cluster/outgoing_slot_migration.cc
+++ b/src/server/cluster/outgoing_slot_migration.cc
@@ -318,6 +318,7 @@ void OutgoingMigration::SyncFb() {
   }
 
   VLOG(1) << "Exiting outgoing migration fiber for migration " << migration_info_.ToString();
+  CloseSocket();
 }
 
 bool OutgoingMigration::FinalizeMigration(long attempt) {
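
Reviewer note, illustration only. The series replaces the replicaof_mu_-guarded reads with per-thread copies of the replica pointer that are re-published to every proactor thread via AwaitFiberOnAll, so INFO/ROLE-style readers only copy their own thread-local shared_ptr and never touch the mutex. The sketch below is a simplified, self-contained model of that pattern: the Replica struct, the Worker "proactor", and the main() flow are hypothetical stand-ins, not Dragonfly code or APIs.

// Illustrative only: stand-in types, not the Dragonfly classes.
#include <condition_variable>
#include <functional>
#include <iostream>
#include <memory>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <vector>

struct Replica {  // stand-in for the real Replica object
  std::string host;
  int port;
};

// Minimal "proactor": one worker thread with a task queue, mimicking AwaitFiberOnAll
// by letting a coordinator post the same task to every worker.
class Worker {
 public:
  Worker() : th_([this] { Loop(); }) {}
  ~Worker() {
    Post({});  // empty task == stop
    th_.join();
  }
  void Post(std::function<void()> f) {
    {
      std::lock_guard<std::mutex> lk(mu_);
      q_.push(std::move(f));
    }
    cv_.notify_one();
  }

 private:
  void Loop() {
    for (;;) {
      std::function<void()> f;
      {
        std::unique_lock<std::mutex> lk(mu_);
        cv_.wait(lk, [&] { return !q_.empty(); });
        f = std::move(q_.front());
        q_.pop();
      }
      if (!f)
        return;
      f();
    }
  }
  std::mutex mu_;
  std::condition_variable cv_;
  std::queue<std::function<void()>> q_;
  std::thread th_;
};

// Each worker thread keeps its own copy; readers on that thread never take a lock.
thread_local std::shared_ptr<Replica> tl_replica;

int main() {
  std::vector<std::unique_ptr<Worker>> pool;
  for (int i = 0; i < 4; ++i)
    pool.push_back(std::make_unique<Worker>());

  // "REPLICAOF host port": publish the new pointer to every thread (AwaitFiberOnAll analogue).
  auto new_replica = std::make_shared<Replica>(Replica{"localhost", 6379});
  for (auto& w : pool)
    w->Post([new_replica] { tl_replica = new_replica; });

  // "INFO replication": runs on some worker thread and copies the thread-local pointer,
  // so it never blocks on the REPLICAOF mutex even while a new replica is being set up.
  pool[0]->Post([] {
    if (auto replica = tl_replica)  // the copy keeps the object alive while we format output
      std::cout << "slave of " << replica->host << ":" << replica->port << "\n";
  });
}

The trade-off mirrored from the patches: the writer pays a broadcast to all threads on every REPLICAOF, while readers get lock-free access and keep the old Replica object alive through their shared_ptr copy until they finish with it.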