@@ -242,6 +242,11 @@ using detail::SaveStagesController;
242242using  http::StringResponse;
243243using  strings::HumanReadableNumBytes;
244244
245+ //  Initialized by REPLICAOF
246+ thread_local  std::shared_ptr<Replica> tl_replica = nullptr ;
247+ //  Initialized by ADDREPLICAOF
248+ thread_local  std::vector<std::shared_ptr<Replica>> tl_cluster_replicas;
249+ 
245250namespace  {
246251
247252//  TODO these should be configurable as command line flag and at runtime via config set
@@ -1228,6 +1233,11 @@ void ServerFamily::Shutdown() {
12281233    dfly_cmd_->Shutdown ();
12291234    DebugCmd::Shutdown ();
12301235  });
1236+ 
1237+   service_.proactor_pool ().AwaitFiberOnAll ([](auto  index, auto * cntx) {
1238+     tl_replica = nullptr ;
1239+     tl_cluster_replicas.clear ();
1240+   });
12311241}
12321242
12331243bool  ServerFamily::HasPrivilegedInterface () {
@@ -3130,12 +3140,15 @@ string ServerFamily::FormatInfoMetrics(const Metrics& m, std::string_view sectio
31303140        append (" psync_attempts" psync_attempts );
31313141        append (" psync_successes" psync_successes );
31323142      };
3133-       fb2::LockGuard lk (replicaof_mu_);
3143+       //  Deep copy because tl_replica might be overwritten inbetween
3144+       auto  replica = tl_replica;
31343145
3135-       replication_info_cb (replica_ ->GetSummary ());
3146+       replication_info_cb (replica ->GetSummary ());
31363147
3148+       //  Deep copy because tl_cluster_replicas might be overwritten inbetween
3149+       auto  cluster_replicas = tl_cluster_replicas;
31373150      //  Special case, when multiple masters replicate to a single replica.
3138-       for  (const  auto & replica : cluster_replicas_ ) {
3151+       for  (const  auto & replica : cluster_replicas ) {
31393152        replication_info_cb (replica->GetSummary ());
31403153      }
31413154    }
@@ -3417,7 +3430,7 @@ void ServerFamily::AddReplicaOf(CmdArgList args, const CommandContext& cmd_cntx)
34173430  }
34183431  LOG (INFO) << " Add Replica " 
34193432
3420-   auto  add_replica = make_unique <Replica>(replicaof_args->host , replicaof_args->port , &service_,
3433+   auto  add_replica = make_shared <Replica>(replicaof_args->host , replicaof_args->port , &service_,
34213434                                          master_replid (), replicaof_args->slot_range );
34223435  GenericError ec = add_replica->Start ();
34233436  if  (ec) {
@@ -3426,77 +3439,76 @@ void ServerFamily::AddReplicaOf(CmdArgList args, const CommandContext& cmd_cntx)
34263439  }
34273440  add_replica->StartMainReplicationFiber (nullopt );
34283441  cluster_replicas_.push_back (std::move (add_replica));
3429-   cmd_cntx.rb ->SendOk ();
3430- }
34313442
3432- void  ServerFamily::ReplicaOfInternal (CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
3433-                                      ActionOnConnectionFail on_err) {
3434-   std::shared_ptr<Replica> new_replica;
3435-   std::optional<Replica::LastMasterSyncData> last_master_data;
3436-   {
3437-     util::fb2::LockGuard lk (replicaof_mu_);  //  Only one REPLICAOF command can run at a time
3443+   service_.proactor_pool ().AwaitFiberOnAll (
3444+       [this ](auto  index, auto * cntx)
3445+           ABSL_NO_THREAD_SAFETY_ANALYSIS { tl_cluster_replicas = cluster_replicas_; });
34383446
3439-     //  We should not execute replica of command while loading from snapshot.
3440-     ServerState* ss = ServerState::tlocal ();
3441-     if  (ss->is_master  && ss->gstate () == GlobalState::LOADING) {
3442-       builder->SendError (kLoadingErr );
3443-       return ;
3444-     }
3445- 
3446-     auto  replicaof_args = ReplicaOfArgs::FromCmdArgs (args, builder);
3447-     if  (!replicaof_args.has_value ()) {
3448-       return ;
3449-     }
3450- 
3451-     LOG (INFO) << " Replicating " 
3452- 
3453-     //  If NO ONE was supplied, just stop the current replica (if it exists)
3454-     if  (replicaof_args->IsReplicaOfNoOne ()) {
3455-       if  (!ss->is_master ) {
3456-         CHECK (replica_);
3457- 
3458-         SetMasterFlagOnAllThreads (true );  //  Flip flag before clearing replica
3459-         last_master_data_ = replica_->Stop ();
3460-         replica_.reset ();
3447+   cmd_cntx.rb ->SendOk ();
3448+ }
34613449
3462-         StopAllClusterReplicas ();
3463-       }
3450+ void  ServerFamily::ReplicaOfNoOne (SinkReplyBuilder* builder) {
3451+   util::fb2::LockGuard lk (replicaof_mu_);
3452+   ServerState* ss = ServerState::tlocal ();
34643453
3465-       //  May not switch to ACTIVE if the process is, for example, shutting down at the same time.
3466-       service_.SwitchState (GlobalState::LOADING, GlobalState::ACTIVE);
3454+   if  (!ss->is_master ) {
3455+     CHECK (replica_);
3456+     //  flip flag before clearing replica_
3457+     SetMasterFlagOnAllThreads (true );
3458+     //  TODO we should not allow partial sync after NO-ONE. Only after Takeover.
3459+     last_master_data_ = replica_->Stop ();
3460+     //  TODO set thread locals to nullptr
3461+     replica_.reset ();
3462+     StopAllClusterReplicas ();
3463+     service_.proactor_pool ().AwaitFiberOnAll ([](auto  index, auto * cntx) { tl_replica = nullptr ; });
3464+   }
34673465
3468-        return  builder-> SendOk (); 
3469-     } 
3466+   //  May not switch to ACTIVE if the process is, for example, shutting down at the same time. 
3467+   service_. SwitchState (GlobalState::LOADING, GlobalState::ACTIVE); 
34703468
3471-     //  If any replication is in progress, stop it, cancellation should kick in immediately
3469+   return  builder->SendOk ();
3470+ }
34723471
3473-      if  (replica_) 
3474-       last_master_data = replica_-> Stop ( );
3475-      StopAllClusterReplicas ();
3472+ bool   ServerFamily::IsDragonflyLoadingAtomic () { 
3473+   util::fb2::LockGuard  lk (replicaof_mu_ );
3474+   ServerState* ss =  ServerState::tlocal ();
34763475
3477-     const  GlobalState gstate = ServerState::tlocal ()->gstate ();
3478-     if  (gstate == GlobalState::TAKEN_OVER) {
3479-       service_.SwitchState (GlobalState::TAKEN_OVER, GlobalState::LOADING);
3480-     } else  if  (auto  new_state = service_.SwitchState (GlobalState::ACTIVE, GlobalState::LOADING);
3481-                new_state != GlobalState::LOADING) {
3482-       LOG (WARNING) << new_state << "  in progress, ignored" 
3483-       builder->SendError (" Invalid state" 
3484-       return ;
3485-     }
3476+   return  ss->is_master  && ss->gstate () == GlobalState::LOADING;
3477+ }
34863478
3487-      //  Create a new replica and assign it 
3488-     new_replica = make_shared<Replica>(replicaof_args-> host , replicaof_args-> port , &service_, 
3489-                                         master_replid (), replicaof_args-> slot_range ) ;
3479+ void   ServerFamily::ReplicaOfInternal (CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, 
3480+                                      ActionOnConnectionFail on_err) { 
3481+   std::optional<Replica::LastMasterSyncData> last_master_data ;
34903482
3491-     replica_ = new_replica;
3483+   auto  replicaof_args = ReplicaOfArgs::FromCmdArgs (args, builder);
3484+   if  (!replicaof_args.has_value ()) {
3485+     return ;
3486+   }
34923487
3493-     //  TODO: disconnect pending blocked clients (pubsub, blocking commands)
3494-     SetMasterFlagOnAllThreads (false );  //  Flip flag after assiging replica
3488+   LOG (INFO) << " Initiate replication with: " 
3489+   //  This is a "weak" check. For example, if the node is already a replica,
3490+   //  it could be the case that one of the flows disconnects. The MainReplicationFiber
3491+   //  will then loop and if it can't partial sync it will enter LOADING state because of
3492+   //  full sync. Note that the fiber is not aware of the replicaof_mu_ so even
3493+   //  if that mutex is locked below before any state check we can't really enforce
3494+   //  that the old replication fiber won't try to full sync and update the state to LOADING.
3495+   //  What is more here is that we always call `replica->Stop()`. So even if we end up in the
3496+   //  scenario described, the semantics are well defined. First, cancel the old replica and
3497+   //  move on with the new one. Cancelation will be slower and ReplicaOf() will
3498+   //  induce higher latency -- but that's ok because it's an highly improbable flow with
3499+   //  well defined semantics.
3500+   if  (IsDragonflyLoadingAtomic ()) {
3501+     builder->SendError (kLoadingErr );
3502+     return ;
3503+   }
34953504
3496-   }  //  release the lock, lk.unlock()
3497-   //  We proceed connecting below without the lock to allow interrupting the replica immediately.
3498-   //  From this point and onward, it should be highly responsive.
3505+   //  replicaof no one
3506+   if  (replicaof_args->IsReplicaOfNoOne ()) {
3507+     return  ReplicaOfNoOne (builder);
3508+   }
34993509
3510+   auto  new_replica = make_shared<Replica>(replicaof_args->host , replicaof_args->port , &service_,
3511+                                           master_replid (), replicaof_args->slot_range );
35003512  GenericError ec{};
35013513  switch  (on_err) {
35023514    case  ActionOnConnectionFail::kReturnOnError :
@@ -3507,30 +3519,31 @@ void ServerFamily::ReplicaOfInternal(CmdArgList args, Transaction* tx, SinkReply
35073519      break ;
35083520  };
35093521
3510-   //  If the replication attempt failed, clean up global state. The replica should have stopped
3511-   //  internally.
3512-   util::fb2::LockGuard lk (replicaof_mu_);  //  Only one REPLICAOF command can run at a time
3513- 
3514-   //  If there was an error above during Start we must not start the main replication fiber.
3515-   //  However, it could be the case that Start() above connected succefully and by the time
3516-   //  we acquire the lock, the context got cancelled because another ReplicaOf command
3517-   //  executed and acquired the replicaof_mu_ before us.
3518-   const  bool  cancelled = new_replica->IsContextCancelled ();
3519-   if  (ec || cancelled) {
3520-     if  (replica_ == new_replica) {
3521-       service_.SwitchState (GlobalState::LOADING, GlobalState::ACTIVE);
3522-       SetMasterFlagOnAllThreads (true );
3523-       replica_.reset ();
3524-     }
3525-     builder->SendError (ec ? ec.Format () : " replication cancelled" 
3526-     return ;
3522+   if  (ec) {
3523+     return  builder->SendError (ec.Format ());
35273524  }
3528-   //  Successfully connected now we flush
3529-   //  If we are called by "Replicate", tx will be null but we do not need
3530-   //  to flush anything.
3525+ 
3526+   util::fb2::LockGuard lk (replicaof_mu_);
3527+   if  (replica_)
3528+     last_master_data = replica_->Stop ();
3529+ 
3530+   StopAllClusterReplicas ();
3531+ 
3532+   if  (ServerState::tlocal ()->gstate () == GlobalState::TAKEN_OVER)
3533+     service_.SwitchState (GlobalState::TAKEN_OVER, GlobalState::LOADING);
3534+ 
3535+   //  Update thread locals. That way INFO never blocks
3536+   replica_ = new_replica;
3537+   service_.proactor_pool ().AwaitFiberOnAll ([new_replica](auto  index, auto * context) {
3538+     tl_replica = new_replica;
3539+     tl_cluster_replicas.clear ();
3540+   });
3541+   SetMasterFlagOnAllThreads (false );
3542+ 
35313543  if  (on_err == ActionOnConnectionFail::kReturnOnError ) {
3532-     new_replica ->StartMainReplicationFiber (last_master_data);
3544+     replica_ ->StartMainReplicationFiber (last_master_data);
35333545  }
3546+ 
35343547  builder->SendOk ();
35353548}
35363549
@@ -3608,6 +3621,9 @@ void ServerFamily::ReplTakeOver(CmdArgList args, const CommandContext& cmd_cntx)
36083621  SetMasterFlagOnAllThreads (true );
36093622  last_master_data_ = replica_->Stop ();
36103623  replica_.reset ();
3624+ 
3625+   service_.proactor_pool ().AwaitFiberOnAll ([](auto  index, auto * context) { tl_replica = nullptr ; });
3626+ 
36113627  return  builder->SendOk ();
36123628}
36133629
0 commit comments