diff --git a/cloud/blockstore/libs/client/session.cpp b/cloud/blockstore/libs/client/session.cpp index b728eeaa606..d5fd2aeb630 100644 --- a/cloud/blockstore/libs/client/session.cpp +++ b/cloud/blockstore/libs/client/session.cpp @@ -8,6 +8,7 @@ #include #include #include + #include #include #include diff --git a/cloud/blockstore/libs/client/switchable_client.cpp b/cloud/blockstore/libs/client/switchable_client.cpp index 2819f04ee1b..9d6a4d9fa08 100644 --- a/cloud/blockstore/libs/client/switchable_client.cpp +++ b/cloud/blockstore/libs/client/switchable_client.cpp @@ -1,9 +1,9 @@ #include "switchable_client.h" #include -#include #include +#include #include #include @@ -43,6 +43,11 @@ class TDeferredRequestsHolder TVector Requests; public: + ~TDeferredRequestsHolder() + { + Y_DEBUG_ABORT_UNLESS(Requests.empty()); + } + TFuture SaveRequest( TCallContextPtr callContext, std::shared_ptr request) @@ -97,8 +102,8 @@ class TSwitchableBlockStore final { private: TLog Log; - - TClientInfo PrimaryClientInfo; + const ISessionSwitcherWeakPtr SessionSwitcher; + const TClientInfo PrimaryClientInfo; TClientInfo SecondaryClientInfo; // BeforeSwitching() sets the WillSwitchToSecondary to true. 
After that, @@ -118,9 +123,11 @@ class TSwitchableBlockStore final public: TSwitchableBlockStore( ILoggingServicePtr logging, + ISessionSwitcherWeakPtr sessionSwitcher, TString diskId, IBlockStorePtr client) : Log(logging->CreateLog("BLOCKSTORE_CLIENT")) + , SessionSwitcher(std::move(sessionSwitcher)) , PrimaryClientInfo( {.Client = std::move(client), .DiskId = std::move(diskId), @@ -209,6 +216,11 @@ class TSwitchableBlockStore final std::move(callContext), std::move(request)); } + if constexpr (TMethod::IsMountRequest()) { + return ExecuteMountRequest( + std::move(callContext), + std::move(request)); + } return TMethod::Execute( PrimaryClientInfo.Client.get(), std::move(callContext), @@ -267,6 +279,36 @@ class TSwitchableBlockStore final std::move(callContext), std::move(request)); } + + TFuture ExecuteMountRequest( + TCallContextPtr callContext, + std::shared_ptr request) + { + TFuture future = + PrimaryClientInfo.Client->MountVolume( + std::move(callContext), + std::move(request)); + + return future.Apply( + [sessionSwitcher = SessionSwitcher] // + (TFuture future) + -> NProto::TMountVolumeResponse + { + NProto::TMountVolumeResponse response = future.ExtractValue(); + + if (!HasError(response) && + response.GetVolume().GetPrincipalDiskId()) + { + if (auto switcher = sessionSwitcher.lock()) { + switcher->SwitchSession( + response.GetVolume().GetDiskId(), + response.GetVolume().GetPrincipalDiskId()); + } + } + + return response; + }); + } }; } // namespace @@ -295,11 +337,13 @@ class TSessionSwitchingGuard ISwitchableBlockStorePtr CreateSwitchableClient( ILoggingServicePtr logging, + ISessionSwitcherWeakPtr sessionSwitcher, TString diskId, IBlockStorePtr client) { return std::make_shared( std::move(logging), + std::move(sessionSwitcher), std::move(diskId), std::move(client)); } diff --git a/cloud/blockstore/libs/client/switchable_client.h b/cloud/blockstore/libs/client/switchable_client.h index 439233faf57..7202dc6f2f3 100644 --- 
a/cloud/blockstore/libs/client/switchable_client.h +++ b/cloud/blockstore/libs/client/switchable_client.h @@ -34,6 +34,17 @@ struct ISwitchableBlockStore: public IBlockStore //////////////////////////////////////////////////////////////////////////////// +struct ISessionSwitcher +{ + virtual ~ISessionSwitcher() = default; + + virtual void SwitchSession( + const TString& diskId, + const TString& newDiskId) = 0; +}; + +//////////////////////////////////////////////////////////////////////////////// + // ISwitchableBlockStore is used for dynamic switching between clients. By // default, all requests are sent to the first client. After switching to the // second client using the Switch() method, the read/write/zero requests start @@ -45,6 +56,7 @@ struct ISwitchableBlockStore: public IBlockStore ISwitchableBlockStorePtr CreateSwitchableClient( ILoggingServicePtr logging, + ISessionSwitcherWeakPtr sessionSwitcher, TString diskId, IBlockStorePtr client); diff --git a/cloud/blockstore/libs/client/switchable_client_ut.cpp b/cloud/blockstore/libs/client/switchable_client_ut.cpp index 7ae8315d6a5..66fbb4f38f2 100644 --- a/cloud/blockstore/libs/client/switchable_client_ut.cpp +++ b/cloud/blockstore/libs/client/switchable_client_ut.cpp @@ -2,7 +2,9 @@ #include #include +#include +#include #include #include @@ -32,6 +34,20 @@ concept HasSessionId = requires(T& obj) { { obj.GetSessionId() } -> std::convertible_to; }; +struct TTestSessionSwitcher + : public ISessionSwitcher + , public std::enable_shared_from_this +{ + TString DiskId; + TString NewDiskId; + + void SwitchSession(const TString& diskId, const TString& newDiskId) override + { + DiskId = diskId; + NewDiskId = newDiskId; + } +}; + template class TTestMethod { @@ -201,9 +217,11 @@ Y_UNIT_TEST_SUITE(TSwitchableClientTest) { auto client1 = std::make_shared(); auto client2 = std::make_shared(); + auto sessionSwitcher = std::make_shared(); auto switchableClient = CreateSwitchableClient( CreateLoggingService("console"), + 
sessionSwitcher, PrimaryDiskId, client1); @@ -249,9 +267,11 @@ Y_UNIT_TEST_SUITE(TSwitchableClientTest) { auto client1 = std::make_shared(); auto client2 = std::make_shared(); + auto sessionSwitcher = std::make_shared(); auto switchableClient = CreateSwitchableClient( CreateLoggingService("console"), + sessionSwitcher, PrimaryDiskId, client1); @@ -335,9 +355,11 @@ Y_UNIT_TEST_SUITE(TSwitchableClientTest) { auto client1 = std::make_shared(); auto client2 = std::make_shared(); + auto sessionSwitcher = std::make_shared(); auto switchableClient = CreateSwitchableClient( CreateLoggingService("console"), + sessionSwitcher, PrimaryDiskId, client1); @@ -390,9 +412,11 @@ Y_UNIT_TEST_SUITE(TSwitchableClientTest) { auto client1 = std::make_shared(); auto client2 = std::make_shared(); + auto sessionSwitcher = std::make_shared(); auto switchableClient = CreateSwitchableClient( CreateLoggingService("console"), + sessionSwitcher, PrimaryDiskId, client1); @@ -459,6 +483,69 @@ Y_UNIT_TEST_SUITE(TSwitchableClientTest) switchableClient, EHandledOn::Primary); } + + Y_UNIT_TEST(ShouldSwitchSessionIfMountResponseContainsPrincipalDiskId) + { + auto client1 = std::make_shared(); + auto sessionSwitcher = std::make_shared(); + + auto switchableClient = CreateSwitchableClient( + CreateLoggingService("console"), + sessionSwitcher, + PrimaryDiskId, + client1); + + size_t sessionNum = 0; + bool setPrincipalDisk = false; + client1->MountVolumeHandler = + [&](std::shared_ptr request) + { + UNIT_ASSERT_EQUAL(request->GetDiskId(), PrimaryDiskId); + + NProto::TMountVolumeResponse response; + response.SetSessionId(ToString(++sessionNum)); + + auto& volume = *response.MutableVolume(); + volume.SetDiskId(request->GetDiskId()); + volume.SetBlockSize(DefaultBlockSize); + volume.SetBlocksCount(100); + if (setPrincipalDisk) { + volume.SetPrincipalDiskId( + NStorage::GetNextDiskId(request->GetDiskId())); + } + + return MakeFuture(response); + }; + + { // If the mount response does not contain the 
PrincipalDiskId, then + // the call SwitchSession of ISessionSwitcher should not occur. + TCallContextPtr callContext = MakeIntrusive(); + auto request = std::make_shared(); + request->SetDiskId(PrimaryDiskId); + auto future = + switchableClient->MountVolume(callContext, std::move(request)); + const auto& response = future.GetValue(); + UNIT_ASSERT(!HasError(response)); + UNIT_ASSERT_VALUES_EQUAL("", sessionSwitcher->DiskId); + UNIT_ASSERT_VALUES_EQUAL("", sessionSwitcher->NewDiskId); + } + + { // If the mount response contains the PrincipalDiskId, then call + // SwitchSession of ISessionSwitcher should occur. + setPrincipalDisk = true; + TCallContextPtr callContext = MakeIntrusive(); + auto request = std::make_shared(); + request->SetDiskId(PrimaryDiskId); + auto future = + switchableClient->MountVolume(callContext, std::move(request)); + const auto& response = future.GetValue(); + UNIT_ASSERT(!HasError(response)); + UNIT_ASSERT_VALUES_EQUAL(PrimaryDiskId, sessionSwitcher->DiskId); + UNIT_ASSERT_VALUES_EQUAL( + NStorage::GetNextDiskId(PrimaryDiskId), + sessionSwitcher->NewDiskId); + } + } } } // namespace NCloud::NBlockStore::NClient diff --git a/cloud/blockstore/libs/client/switchable_session_ut.cpp b/cloud/blockstore/libs/client/switchable_session_ut.cpp index af0e5da1f07..42512be648c 100644 --- a/cloud/blockstore/libs/client/switchable_session_ut.cpp +++ b/cloud/blockstore/libs/client/switchable_session_ut.cpp @@ -238,7 +238,11 @@ TTestSession CreateTestSession(const TString& diskId, const TString& sessionId) }; } - auto switchableClient = CreateSwitchableClient(logging, diskId, client); + auto switchableClient = CreateSwitchableClient( + logging, + ISessionSwitcherWeakPtr(), + diskId, + client); auto durable = CreateDurableClient( config, diff --git a/cloud/blockstore/libs/client/ya.make b/cloud/blockstore/libs/client/ya.make index a357648e187..247b53588ce 100644 --- a/cloud/blockstore/libs/client/ya.make +++ b/cloud/blockstore/libs/client/ya.make @@ -1,5 +1,7 
@@ LIBRARY() +INCLUDE(${ARCADIA_ROOT}/cloud/storage/deny_ydb_dependency.inc) + SRCS( client.cpp config.cpp @@ -20,6 +22,7 @@ PEERDIR( cloud/blockstore/libs/common cloud/blockstore/libs/diagnostics cloud/blockstore/libs/service + cloud/blockstore/libs/storage/model cloud/blockstore/libs/throttling cloud/storage/core/libs/grpc cloud/storage/core/libs/throttling diff --git a/cloud/blockstore/libs/endpoints/session_manager.cpp b/cloud/blockstore/libs/endpoints/session_manager.cpp index 86f66b5b3d4..0c181342ba8 100644 --- a/cloud/blockstore/libs/endpoints/session_manager.cpp +++ b/cloud/blockstore/libs/endpoints/session_manager.cpp @@ -5,6 +5,8 @@ #include #include #include +#include +#include #include #include #include @@ -16,6 +18,7 @@ #include #include #include +#include #include #include @@ -28,6 +31,8 @@ #include #include +#include + namespace NCloud::NBlockStore::NServer { using namespace NClient; @@ -41,26 +46,38 @@ class TEndpoint { private: TExecutor& Executor; + ISwitchableSessionPtr SwitchableSession; const ISessionPtr Session; const IBlockStorePtr DataClient; + const ISwitchableBlockStorePtr SwitchableDataClient; const IThrottlerProviderPtr ThrottlerProvider; const TString ClientId; const TString DiskId; + const NProto::TStartEndpointRequest StartRequest; + + std::weak_ptr SwitchingGuard; + TString SessionId; public: TEndpoint( TExecutor& executor, + ISwitchableSessionPtr switchableSession, ISessionPtr session, IBlockStorePtr dataClient, + ISwitchableBlockStorePtr switchableDataClient, IThrottlerProviderPtr throttlerProvider, TString clientId, - TString diskId) + TString diskId, + NProto::TStartEndpointRequest startRequest) : Executor(executor) + , SwitchableSession(std::move(switchableSession)) , Session(std::move(session)) , DataClient(std::move(dataClient)) + , SwitchableDataClient(std::move(switchableDataClient)) , ThrottlerProvider(std::move(throttlerProvider)) , ClientId(std::move(clientId)) , DiskId(std::move(diskId)) + , 
StartRequest(std::move(startRequest)) {} NProto::TError Start(TCallContextPtr callContext, NProto::THeaders headers) @@ -73,6 +90,8 @@ class TEndpoint if (HasError(response)) { DataClient->Stop(); + } else { + SessionId = response.GetSessionId(); } return response.GetError(); @@ -106,20 +125,54 @@ class TEndpoint return response.GetError(); } - ISessionPtr GetSession() + ISessionPtr GetSession() const { - return Session; + // For consumers, we return the top-level session, which is the + // SwitchableSession. + return SwitchableSession; } - TString GetDiskId() + TString GetDiskId() const { return DiskId; } - NProto::TClientPerformanceProfile GetPerformanceProfile() + NProto::TClientPerformanceProfile GetPerformanceProfile() const { return ThrottlerProvider->GetPerformanceProfile(ClientId); } + + NProto::TStartEndpointRequest GetStartRequest() const + { + return StartRequest; + } + + TFuture SwitchSession(TEndpoint& oldEndpoint) + { + SwitchableSession = std::move(oldEndpoint.SwitchableSession); + + return SwitchableSession->SwitchSession( + GetDiskId(), + SessionId, + Session, + SwitchableDataClient); + } + + // The acquire of the session switching guard must be performed under + // EndpointLock. + // If nullptr is returned, then the session switch is already in progress. + // The caller must keep the guard at all times while the session is being + // switched. + [[nodiscard]] TSessionSwitchingGuardPtr AcquireSwitchingGuard() + { + if (SwitchingGuard.lock()) { + // Session switch is already in progress. 
+ return {}; + } + auto result = CreateSessionSwitchingGuard(SwitchableDataClient); + SwitchingGuard = result; + return result; + } }; using TEndpointPtr = std::shared_ptr; @@ -301,6 +354,7 @@ class TStorageDataClient final class TSessionManager final : public ISessionManager + , public ISessionSwitcher , public std::enable_shared_from_this { private: @@ -358,6 +412,7 @@ class TSessionManager final Log = Logging->CreateLog("BLOCKSTORE_SERVER"); } + // implementation ISessionManager TFuture CreateSession( TCallContextPtr callContext, const NProto::TStartEndpointRequest& request) override; @@ -383,10 +438,15 @@ class TSessionManager final TResultOrError GetProfile( const TString& socketPath) override; + // implementation ISessionSwitcher + void SwitchSession( + const TString& diskId, + const TString& newDiskId) override; + private: TSessionOrError CreateSessionImpl( TCallContextPtr callContext, - const NProto::TStartEndpointRequest& request); + NProto::TStartEndpointRequest request); NProto::TError RemoveSessionImpl( TCallContextPtr callContext, @@ -417,22 +477,32 @@ class TSessionManager final const TString& cellId); TClientAppConfigPtr CreateClientConfig( - const NProto::TStartEndpointRequest& request); + const NProto::TStartEndpointRequest& request) const; TClientAppConfigPtr CreateClientConfig( const NProto::TStartEndpointRequest& request, TString host, - ui32 port); + ui32 port) const; TResultOrError CreateStorageDataClient( const TString& cellId, const TClientAppConfigPtr& clientConfig, const NProto::TVolume& volume, const TString& clientId, - NProto::EVolumeAccessMode accessMode); + NProto::EVolumeAccessMode accessMode) const; static TSessionConfig CreateSessionConfig( const NProto::TStartEndpointRequest& request); + + void SwitchSessionForEndpoint( + const TString& socketPath, + TEndpointPtr endpoint, + const TString& diskId, + const TString& newDiskId); + void StopEndpoint(TCallContextPtr callContext, TEndpointPtr oldEndpoint); + void StopEndpointImpl( + 
TCallContextPtr callContext, + TEndpointPtr oldEndpoint); }; //////////////////////////////////////////////////////////////////////////////// @@ -451,7 +521,7 @@ TFuture TSessionManager::CreateSession( TSessionManager::TSessionOrError TSessionManager::CreateSessionImpl( TCallContextPtr callContext, - const NProto::TStartEndpointRequest& request) + NProto::TStartEndpointRequest request) { auto describeResponse = DescribeVolume( callContext, @@ -463,6 +533,15 @@ TSessionManager::TSessionOrError TSessionManager::CreateSessionImpl( const auto& volume = describeResponse.GetVolume(); const auto& cellId = describeResponse.GetCellId(); + if (volume.GetDiskId() != request.GetDiskId()) { + // The original volume no longer exists. Use principal volume instead. + request.SetDiskId(volume.GetDiskId()); + } else if (volume.GetPrincipalDiskId()) { + // The original volume has lost leadership. Use principal volume instead. + request.SetDiskId(volume.GetPrincipalDiskId()); + return CreateSessionImpl(std::move(callContext), std::move(request)); + } + auto result = CreateEndpoint(request, volume, cellId); if (HasError(result)) { return TErrorResponse(result.GetError()); @@ -670,12 +749,59 @@ TResultOrError TSessionManager::GetProfile( return endpoint->GetPerformanceProfile(); } +void TSessionManager::SwitchSession( + const TString& diskId, + const TString& newDiskId) +{ + bool endpointFound = false; + with_lock (EndpointLock) { + for (const auto& [socketPath, endpoint]: Endpoints) { + if (endpoint->GetDiskId() != diskId) { + continue; + } + endpointFound = true; + + auto switchingGuard = endpoint->AcquireSwitchingGuard(); + if (!switchingGuard) { + STORAGE_INFO( + "Session swithing in progress: " << diskId.Quote() << " -> " + << newDiskId.Quote()); + continue; + } + + Executor->Execute( + [socketPath = socketPath, + endpoint = endpoint, + switchingGuard = std::move(switchingGuard), + diskId = diskId, + newDiskId = newDiskId, + weakSelf = weak_from_this()]() mutable + { + if (auto 
self = weakSelf.lock()) { + self->SwitchSessionForEndpoint( + socketPath, + std::move(endpoint), + diskId, + newDiskId); + } + switchingGuard.reset(); + }); + } + } + + if (!endpointFound) { + STORAGE_WARN( + "Session for " << diskId.Quote() << " not found. Can't switch to " + << newDiskId.Quote()); + } +} + TResultOrError TSessionManager::CreateStorageDataClient( const TString& cellId, const TClientAppConfigPtr& clientConfig, const NProto::TVolume& volume, const TString& clientId, - NProto::EVolumeAccessMode accessMode) + NProto::EVolumeAccessMode accessMode) const { auto service = Service; IStoragePtr storage; @@ -716,9 +842,6 @@ TResultOrError TSessionManager::CreateEndpoint( const auto& clientId = request.GetClientId(); auto accessMode = request.GetVolumeAccessMode(); - auto service = Service; - IStoragePtr storage; - auto clientConfig = CreateClientConfig(request); auto [client, error] = CreateStorageDataClient( @@ -732,6 +855,13 @@ TResultOrError TSessionManager::CreateEndpoint( return error; } + auto switchableClient = CreateSwitchableClient( + Logging, + weak_from_this(), + volume.GetDiskId(), + std::move(client)); + client = switchableClient; + if (Options.TemporaryServer) { client = CreateErrorTransformService( std::move(client), @@ -773,7 +903,7 @@ TResultOrError TSessionManager::CreateEndpoint( auto encryptionFuture = EncryptionClientFactory->CreateEncryptionClient( std::move(client), request.GetEncryptionSpec(), - request.GetDiskId()); + NStorage::GetLogicalDiskId(request.GetDiskId())); const auto& clientOrError = Executor->WaitFor(encryptionFuture); if (HasError(clientOrError)) { @@ -815,19 +945,29 @@ TResultOrError TSessionManager::CreateEndpoint( std::move(clientConfig), CreateSessionConfig(request)); + auto switchableSession = CreateSwitchableSession( + Logging, + Scheduler, + volume.GetDiskId(), + session, + switchableClient); + return std::make_shared( *Executor, + std::move(switchableSession), std::move(session), std::move(client), + 
std::move(switchableClient), ThrottlerProvider, clientId, - volume.GetDiskId()); + volume.GetDiskId(), + request); } TClientAppConfigPtr TSessionManager::CreateClientConfig( const NProto::TStartEndpointRequest& request, TString host, - ui32 port) + ui32 port) const { NProto::TClientAppConfig clientAppConfig; auto& config = *clientAppConfig.MutableClientConfig(); @@ -842,7 +982,7 @@ TClientAppConfigPtr TSessionManager::CreateClientConfig( } TClientAppConfigPtr TSessionManager::CreateClientConfig( - const NProto::TStartEndpointRequest& request) + const NProto::TStartEndpointRequest& request) const { NProto::TClientAppConfig clientAppConfig; auto& config = *clientAppConfig.MutableClientConfig(); @@ -854,6 +994,7 @@ TClientAppConfigPtr TSessionManager::CreateClientConfig( return std::make_shared(std::move(clientAppConfig)); } +// static TSessionConfig TSessionManager::CreateSessionConfig( const NProto::TStartEndpointRequest& request) { @@ -869,6 +1010,125 @@ TSessionConfig TSessionManager::CreateSessionConfig( return config; } +void TSessionManager::SwitchSessionForEndpoint( + const TString& socketPath, + TEndpointPtr endpoint, + const TString& diskId, + const TString& newDiskId) +{ + STORAGE_INFO( + "Start session switching: " << diskId.Quote() << " -> " + << newDiskId.Quote()); + + auto callContext = MakeIntrusive(CreateRequestId()); + + // Prepare request for new endpoint + auto newStartRequest = endpoint->GetStartRequest(); + newStartRequest.SetDiskId(newDiskId); + + // Describe new disk + auto describeResponse = + DescribeVolume(callContext, newDiskId, newStartRequest.GetHeaders()); + if (HasError(describeResponse)) { + STORAGE_WARN( + "Describe volume " << newDiskId.Quote() << " failed: " + << describeResponse.GetError().GetMessage()); + return; + } + + // Start new endpoint + auto result = CreateEndpoint( + newStartRequest, + describeResponse.GetVolume(), + describeResponse.GetCellId()); + + if (HasError(result)) { + STORAGE_WARN( + "Can't create new Session for 
" << newDiskId.Quote() << " " + << FormatError(result.GetError())); + return; + } + + auto newEndpoint = std::move(result.ExtractResult()); + auto error = newEndpoint->Start( + callContext, + newEndpoint->GetStartRequest().GetHeaders()); + + if (HasError(error)) { + STORAGE_WARN( + "Can't start new endpoint for " << newEndpoint->GetDiskId().Quote() + << " " << FormatError(error)); + return; + } + + // Switch to new endpoint + with_lock (EndpointLock) { + if (!Endpoints.contains(socketPath)) { + STORAGE_WARN("Endpoint " << socketPath.Quote() << " not found"); + return; + } + + if (Endpoints[socketPath] != endpoint) { + STORAGE_WARN( + "Endpoint on " << socketPath.Quote() + << " is not the same as before"); + return; + } + + Endpoints[socketPath] = newEndpoint; + auto future = newEndpoint->SwitchSession(*endpoint); + future.Subscribe( + [callContext = std::move(callContext), + oldEndpoint = std::move(endpoint), + weakSelf = weak_from_this()](const TFuture& future) mutable + { + Y_UNUSED(future); + // Stop old endpoint when switch finished + if (auto self = weakSelf.lock()) { + self->StopEndpoint( + std::move(callContext), + std::move(oldEndpoint)); + } + }); + } + + ThrottlerProvider->Clean(); +} + +void TSessionManager::StopEndpoint( + TCallContextPtr callContext, + TEndpointPtr oldEndpoint) +{ + Executor->Execute( + [callContext = std::move(callContext), + oldEndpoint = std::move(oldEndpoint), + weakSelf = weak_from_this()]() + { + if (auto self = weakSelf.lock()) { + self->StopEndpointImpl(callContext, oldEndpoint); + } + }); +} + +void TSessionManager::StopEndpointImpl( + TCallContextPtr callContext, + TEndpointPtr oldEndpoint) +{ + auto error = oldEndpoint->Stop( + std::move(callContext), + oldEndpoint->GetStartRequest().GetHeaders()); + + if (HasError(error)) { + STORAGE_WARN( + "Stop old endpoint for " << oldEndpoint->GetDiskId().Quote() + << "error :" << FormatError(error)); + } else { + STORAGE_INFO( + "Stop old endpoint for " << 
oldEndpoint->GetDiskId().Quote() + << " success"); + } +} + } // namespace //////////////////////////////////////////////////////////////////////////////// diff --git a/cloud/blockstore/libs/endpoints/session_manager_ut.cpp b/cloud/blockstore/libs/endpoints/session_manager_ut.cpp index dea4182ae84..4879594a9ea 100644 --- a/cloud/blockstore/libs/endpoints/session_manager_ut.cpp +++ b/cloud/blockstore/libs/endpoints/session_manager_ut.cpp @@ -12,6 +12,7 @@ #include #include #include +#include #include #include @@ -124,8 +125,9 @@ Y_UNIT_TEST_SUITE(TSessionManagerTest) auto service = std::make_shared(); service->DescribeVolumeHandler = [&] (std::shared_ptr request) { - Y_UNUSED(request); - return MakeFuture(NProto::TDescribeVolumeResponse()); + auto response = NProto::TDescribeVolumeResponse(); + response.MutableVolume()->SetDiskId(request->GetDiskId()); + return MakeFuture(std::move(response)); }; service->MountVolumeHandler = [&] (std::shared_ptr request) { @@ -230,8 +232,9 @@ Y_UNIT_TEST_SUITE(TSessionManagerTest) auto service = std::make_shared(); service->DescribeVolumeHandler = [&] (std::shared_ptr request) { - Y_UNUSED(request); - return MakeFuture(NProto::TDescribeVolumeResponse()); + auto response = NProto::TDescribeVolumeResponse(); + response.MutableVolume()->SetDiskId(request->GetDiskId()); + return MakeFuture(std::move(response)); }; service->MountVolumeHandler = [&] (std::shared_ptr request) { @@ -333,8 +336,9 @@ Y_UNIT_TEST_SUITE(TSessionManagerTest) service->DescribeVolumeHandler = [&](std::shared_ptr request) { - Y_UNUSED(request); - return MakeFuture(NProto::TDescribeVolumeResponse()); + auto response = NProto::TDescribeVolumeResponse(); + response.MutableVolume()->SetDiskId(request->GetDiskId()); + return MakeFuture(std::move(response)); }; service->MountVolumeHandler = [&](std::shared_ptr request) @@ -479,6 +483,173 @@ Y_UNIT_TEST_SUITE(TSessionManagerTest) { ShouldDisableClientThrottler(false); } + + 
Y_UNIT_TEST(ShouldSwitchSessionByMountResponseWithPrincipalDiskId) + { + TString socketPath = "testSocket"; + TString diskId = "testDiskId"; + + auto timer = CreateCpuCycleTimer(); + auto scheduler = CreateScheduler(timer); + scheduler->Start(); + + auto service = std::make_shared(); + service->DescribeVolumeHandler = + [&] (std::shared_ptr request) { + auto response = NProto::TDescribeVolumeResponse(); + response.MutableVolume()->SetDiskId(request->GetDiskId()); + return MakeFuture(std::move(response)); + }; + // Setting up the handler for the mount request, which will add the + // PrincipalDiskId when mounting testDiskId, which will lead to switch + // session. + service->MountVolumeHandler = + [&] (std::shared_ptr request) { + NProto::TMountVolumeResponse response; + response.MutableVolume()->SetDiskId(request->GetDiskId()); + if (request->GetDiskId() == diskId) { + // Response with filled PrincipalDiskId will switch session. + response.MutableVolume()->SetPrincipalDiskId( + NStorage::GetNextDiskId(diskId)); + } + response.SetInactiveClientsTimeout(100); + return MakeFuture(response); + }; + service->UnmountVolumeHandler = + [&] (std::shared_ptr request) { + Y_UNUSED(request); + return MakeFuture(NProto::TUnmountVolumeResponse()); + }; + + // Setting up the handler to respond E_REJECTED if the request handled + // by testDiskId and S_OK if the request handled by disk testDiskId-copy + service->ReadBlocksLocalHandler = + [&](std::shared_ptr request) + -> TFuture + { + if (request->GetDiskId() == diskId) { + return MakeFuture( + TErrorResponse(E_REJECTED)); + } + if (request->GetDiskId() == NStorage::GetNextDiskId(diskId)) { + return MakeFuture( + TErrorResponse(S_OK)); + } + return MakeFuture( + TErrorResponse(E_ARGUMENT, "Unexpected disk id")); + }; + + auto executor = TExecutor::Create("TestService"); + auto logging = CreateLoggingService("console"); + auto encryptionClientFactory = CreateEncryptionClientFactory( + logging, + 
CreateDefaultEncryptionKeyProvider(), + NProto::EZP_WRITE_ENCRYPTED_ZEROS); + + auto sessionManager = CreateSessionManager( + CreateWallClockTimer(), + scheduler, + logging, + CreateMonitoringServiceStub(), + CreateRequestStatsStub(), + CreateVolumeStatsStub(), + CreateServerStatsStub(), + service, + CreateCellManagerStub(), + CreateDefaultStorageProvider(service), + encryptionClientFactory, + executor, + TSessionManagerOptions()); + + + executor->Start(); + Y_DEFER { + executor->Stop(); + }; + + NProto::TStartEndpointRequest request; + request.SetUnixSocketPath(socketPath); + request.SetDiskId(diskId); + request.SetClientId("testClientId"); + request.SetIpcType(NProto::IPC_VHOST); + + // Create session to testDiskId + NClient::ISessionPtr session; + { + auto future = sessionManager->CreateSession( + MakeIntrusive(), + request); + + auto sessionOrError = future.GetValue(TDuration::Seconds(3)); + UNIT_ASSERT_C(!HasError(sessionOrError), sessionOrError.GetError()); + auto sessionInfo = sessionOrError.ExtractResult(); + session = sessionInfo.Session; + } + + // Switch from testDiskId to testDiskId-copy. + + { + // Make sure that the read response processed by testDiskId-copy + // that responds with S_OK. + auto future = session->ReadBlocksLocal( + MakeIntrusive(), + std::make_shared()); + auto response = future.GetValue(TDuration::Seconds(1)); + UNIT_ASSERT_VALUES_EQUAL_C( + S_OK, + response.GetError().GetCode(), + FormatError(response.GetError())); + } + + // Switch from testDiskId-copy to testDiskId. + + // Setting up the handler for the mount request, which will add the + // PrincipalDiskId when mounting testDiskId-copy, which will lead to + // switch session back to testDiskId. 
+ service->MountVolumeHandler = + [&](std::shared_ptr request) + { + NProto::TMountVolumeResponse response; + response.MutableVolume()->SetDiskId(request->GetDiskId()); + if (request->GetDiskId() == NStorage::GetNextDiskId(diskId)) { + // Response with filled PrincipalDiskId will switch session. + response.MutableVolume()->SetPrincipalDiskId(diskId); + } + response.SetInactiveClientsTimeout(100); + return MakeFuture(response); + }; + + // Setting up the handler to respond S_OK if the request is handled + // by testDiskId and E_REJECTED if it is handled by testDiskId-copy + service->ReadBlocksLocalHandler = + [&](std::shared_ptr request) + -> TFuture + { + if (request->GetDiskId() == diskId) { + return MakeFuture( + TErrorResponse(S_OK)); + } + if (request->GetDiskId() == NStorage::GetNextDiskId(diskId)) { + return MakeFuture( + TErrorResponse(E_REJECTED)); + } + return MakeFuture( + TErrorResponse(E_ARGUMENT, "Unexpected disk id")); + }; + + { + // Make sure that the read request is now processed by testDiskId, + // which responds with S_OK. 
+ auto future = session->ReadBlocksLocal( + MakeIntrusive(), + std::make_shared()); + auto response = future.GetValue(TDuration::Seconds(1)); + UNIT_ASSERT_VALUES_EQUAL_C( + S_OK, + response.GetError().GetCode(), + FormatError(response.GetError())); + } + } } } // namespace NCloud::NBlockStore::NServer diff --git a/cloud/blockstore/libs/service/public.h b/cloud/blockstore/libs/service/public.h index 896391d8120..266d3b8255c 100644 --- a/cloud/blockstore/libs/service/public.h +++ b/cloud/blockstore/libs/service/public.h @@ -17,6 +17,9 @@ using IBlockStorePtr = std::shared_ptr; struct ISwitchableBlockStore; using ISwitchableBlockStorePtr = std::shared_ptr; +struct ISessionSwitcher; +using ISessionSwitcherWeakPtr = std::weak_ptr; + struct IAuthProvider; using IAuthProviderPtr = std::shared_ptr; diff --git a/cloud/blockstore/libs/service/service_method.h b/cloud/blockstore/libs/service/service_method.h index 9d219af6d42..1e20d114365 100644 --- a/cloud/blockstore/libs/service/service_method.h +++ b/cloud/blockstore/libs/service/service_method.h @@ -29,6 +29,11 @@ struct TBlockStoreMethodTraits { return IsReadRequest() || IsWriteRequest(); } + + static constexpr bool IsMountRequest() + { + return std::is_same_v; + } }; //////////////////////////////////////////////////////////////////////////////// diff --git a/cloud/blockstore/libs/storage/volume/actors/follower_disk_actor.cpp b/cloud/blockstore/libs/storage/volume/actors/follower_disk_actor.cpp index 600aa7e006a..e25cbf0e78e 100644 --- a/cloud/blockstore/libs/storage/volume/actors/follower_disk_actor.cpp +++ b/cloud/blockstore/libs/storage/volume/actors/follower_disk_actor.cpp @@ -89,7 +89,7 @@ void TFollowerDiskActor::OnBootstrap(const NActors::TActorContext& ctx) LOG_INFO( ctx, TBlockStoreComponents::PARTITION, - "%s Follower created %s", + "%s Follower actor created %s", LogTitle.GetWithTime().c_str(), ToString(FollowerDiskInfo.State).Quote().c_str()); @@ -378,6 +378,9 @@ void 
TFollowerDiskActor::HandleUpdateFollowerStateResponse( const auto* msg = ev->Get(); FollowerDiskInfo = msg->Follower; ApplyLinkState(ctx); + if (State == EState::LeadershipTransferredAndPersisted) { + RebootLeaderVolume(ctx, TDuration()); + } } void TFollowerDiskActor::HandlePropagateLeadershipToFollowerResponse( diff --git a/cloud/blockstore/libs/storage/volume/testlib/test_env.cpp b/cloud/blockstore/libs/storage/volume/testlib/test_env.cpp index 49e473a60a3..9148cac3a64 100644 --- a/cloud/blockstore/libs/storage/volume/testlib/test_env.cpp +++ b/cloud/blockstore/libs/storage/volume/testlib/test_env.cpp @@ -347,9 +347,12 @@ TVolumeClient::CreateUpdateVolumeConfigRequest( return request; } -std::unique_ptr TVolumeClient::CreateWaitReadyRequest() +std::unique_ptr +TVolumeClient::CreateWaitReadyRequest(TString diskId) { - return std::make_unique(); + auto request = std::make_unique(); + request->Record.SetDiskId(std::move(diskId)); + return request; } std::unique_ptr TVolumeClient::CreateAddClientRequest( diff --git a/cloud/blockstore/libs/storage/volume/testlib/test_env.h b/cloud/blockstore/libs/storage/volume/testlib/test_env.h index 57bae640d69..cd29f864c3e 100644 --- a/cloud/blockstore/libs/storage/volume/testlib/test_env.h +++ b/cloud/blockstore/libs/storage/volume/testlib/test_env.h @@ -383,7 +383,8 @@ class TVolumeClient }; } - std::unique_ptr CreateWaitReadyRequest(); + std::unique_ptr CreateWaitReadyRequest( + TString diskId = {}); std::unique_ptr CreateAddClientRequest( const NProto::TVolumeClientInfo& info); diff --git a/cloud/blockstore/libs/storage/volume/volume_ut_linked.cpp b/cloud/blockstore/libs/storage/volume/volume_ut_linked.cpp index 2a89761a966..3b76b8eb692 100644 --- a/cloud/blockstore/libs/storage/volume/volume_ut_linked.cpp +++ b/cloud/blockstore/libs/storage/volume/volume_ut_linked.cpp @@ -230,8 +230,12 @@ struct TFixture: public NUnitTest::TBaseFixture status->Record.GetCheckpointStatus()); } - [[nodiscard]] bool WriteBlocks(TBlockRange64 
range) const + [[nodiscard]] bool WriteBlocks( + TBlockRange64 range, + std::optional* followerState = nullptr) const { + using EState = TFollowerDiskInfo::EState; + const auto* volume = Volumes.FindPtr("vol1"); if (!volume) { UNIT_ASSERT_C(false, "Volume not found"); @@ -243,6 +247,12 @@ struct TFixture: public NUnitTest::TBaseFixture range, volume->VolumeClientInfo->GetClientId(), 'b'); + if (followerState && followerState->has_value() && + (followerState->value() == EState::DataReady || + followerState->value() == EState::LeadershipTransferred)) + { + return false; + } auto response = volume->VolumeClient->RecvWriteBlocksResponse(); if (response->GetStatus() == S_OK) { return true; @@ -809,6 +819,10 @@ Y_UNIT_TEST_SUITE(TLinkedVolumeTest) UNIT_ASSERT_EQUAL( TFollowerDiskInfo::EState::LeadershipTransferred, followerState); + // It is necessary to reconnect the pipe as the previous one broke + // when the volume was restarted. + volume1.ReconnectPipe(); + volume1.WaitReady(); } if (checkpoint == ECheckpointBehaviour::CreateBeforeLink || @@ -819,7 +833,6 @@ Y_UNIT_TEST_SUITE(TLinkedVolumeTest) // Check volumes content match. volume1.UnlinkLeaderVolumeFromFollower(link); - volume1.ReconnectPipe(); fixture.CheckVolumesDataMatch(); } @@ -1055,7 +1068,8 @@ Y_UNIT_TEST_SUITE(TLinkedVolumeTest) for (ui64 pos = 0;; pos += 2048) { bool success = fixture.WriteBlocks( TBlockRange64::MakeOneBlock( - ((pos + 1) % fixture.VolumeBlockCount))); + ((pos + 1) % fixture.VolumeBlockCount)), + &followerState); writtenBlockCount += success ? 1 : 0; TDispatchOptions options; @@ -1103,6 +1117,9 @@ Y_UNIT_TEST_SUITE(TLinkedVolumeTest) UNIT_ASSERT_VALUES_EQUAL(true, deletionOfTheSourceHasBeenInitiated); } + volume1.ReconnectPipe(); + volume1.WaitReady(); + // Check volumes content match. 
volume1.UnlinkLeaderVolumeFromFollower(link); volume1.ReconnectPipe(); @@ -1153,7 +1170,6 @@ Y_UNIT_TEST_SUITE(TLinkedVolumeTest) auto* msg = event->Get< TEvVolumePrivate::TEvUpdateFollowerStateResponse>(); followerState = msg->Follower.State; - leaderVolumeActorId = event->Sender; } if (event->GetTypeRewrite() == @@ -1164,6 +1180,13 @@ Y_UNIT_TEST_SUITE(TLinkedVolumeTest) propagatedAction = msg->Record.GetAction(); } + if (event->GetTypeRewrite() == TEvVolume::EvWaitReadyRequest) { + auto* msg = event->Get(); + if (msg->Record.GetDiskId() == "vol1") { + leaderVolumeActorId = event->Recipient; + } + } + if (event->GetTypeRewrite() == TEvService::EvDestroyVolumeRequest) { ++volumeDestructionRequestCount; auto* msg = event->Get(); @@ -1207,7 +1230,7 @@ Y_UNIT_TEST_SUITE(TLinkedVolumeTest) NProto::STORAGE_MEDIA_SSD_NONREPLICATED, VolumeBlockCount, // block count per partition "vol1")); - volume1.WaitReady(); + volume1.WaitReady("vol1"); // registering a writer auto clientInfo1 = CreateVolumeClientInfo( @@ -1269,6 +1292,16 @@ Y_UNIT_TEST_SUITE(TLinkedVolumeTest) UNIT_ASSERT_EQUAL( TFollowerDiskInfo::EState::LeadershipTransferred, followerState); + + // Leader volume actor should be poisoned + UNIT_ASSERT_VALUES_EQUAL(true, leaderPoisoned); + leaderPoisoned = false; + + // It is necessary to reconnect the pipe as the previous one broke + // when the partition was restarted. + volume1.ReconnectPipe(); + volume1.WaitReady("vol1"); + volume1.AddClient(clientInfo1); } // Waiting for the follower disk to become a leader. @@ -1298,34 +1331,36 @@ Y_UNIT_TEST_SUITE(TLinkedVolumeTest) UNIT_ASSERT_VALUES_EQUAL(1, volumeDestructionRequestCount); } - // Waiting for the leader disk destruction retried after reject. + // Waiting for the leader volume actor poisoned by + // FollowerPartitionActor after 30 seconds. 
{ + UNIT_ASSERT_VALUES_EQUAL(false, leaderPoisoned); TDispatchOptions options; options.CustomFinalCondition = [&] { - return volumeDestructionRequestCount == 2; + return leaderPoisoned; }; Runtime->AdvanceCurrentTime(TDuration::Seconds(30)); Runtime->DispatchEvents(options, TDuration::Seconds(1)); Runtime->AdvanceCurrentTime(TDuration::Seconds(30)); Runtime->DispatchEvents(options, TDuration::Seconds(1)); - UNIT_ASSERT_VALUES_EQUAL(2, volumeDestructionRequestCount); + UNIT_ASSERT_VALUES_EQUAL(true, leaderPoisoned); } - // Waiting for the leader volume actor poisoned by - // FollowerPartitionActor. + // Waiting for the leader disk destruction retried after reject. { - UNIT_ASSERT_VALUES_EQUAL(false, leaderPoisoned); TDispatchOptions options; - Runtime->AdvanceCurrentTime(TDuration::Seconds(30)); options.CustomFinalCondition = [&] { - return leaderPoisoned; + return volumeDestructionRequestCount == 2; }; + Runtime->AdvanceCurrentTime(TDuration::Seconds(30)); + Runtime->DispatchEvents(options, TDuration::Seconds(1)); + Runtime->AdvanceCurrentTime(TDuration::Seconds(30)); Runtime->DispatchEvents(options, TDuration::Seconds(1)); - UNIT_ASSERT_VALUES_EQUAL(false, leaderPoisoned); + UNIT_ASSERT_VALUES_EQUAL(2, volumeDestructionRequestCount); } } @@ -1772,6 +1807,11 @@ Y_UNIT_TEST_SUITE(TLinkedVolumeTest) UNIT_ASSERT_EQUAL( TFollowerDiskInfo::EState::LeadershipTransferred, followerState); + + // It is necessary to reconnect the pipe as the previous one broke + // when the volume was restarted. 
+ volume1.ReconnectPipe(); + volume1.WaitReady(); } { // State of leader ELeadershipStatus::LeadershipTransferring diff --git a/example/8-backward.sh b/example/8-backward.sh new file mode 100755 index 00000000000..325415ac5a4 --- /dev/null +++ b/example/8-backward.sh @@ -0,0 +1,6 @@ +./5-create_disk.sh -k nonreplicated -d nrd1 +#./blockstore-client.sh CreateVolume --disk-id=nrd1 --storage-media-kind nonreplicated --blocks-count=131072 --block-size=8192 + +./blockstore-client.sh ExecuteAction --action=modifytags --input-bytes='{"DiskId":"nrd1","TagsToAdd":"source-disk-id=nrd1-copy","TagsToRemove":""}' + +./blockstore-client.sh createvolumelink --leader-disk-id=nrd1-copy --follower-disk-id=nrd1 diff --git a/example/8-forward.sh b/example/8-forward.sh new file mode 100755 index 00000000000..b43fbaec018 --- /dev/null +++ b/example/8-forward.sh @@ -0,0 +1,6 @@ +./5-create_disk.sh -k nonreplicated -d nrd1-copy +#./blockstore-client.sh CreateVolume --disk-id=nrd1-copy --storage-media-kind nonreplicated --blocks-count=131072 --block-size=8192 + +./blockstore-client.sh ExecuteAction --action=modifytags --input-bytes='{"DiskId":"nrd1-copy","TagsToAdd":"source-disk-id=nrd1","TagsToRemove":""}' + +./blockstore-client.sh createvolumelink --leader-disk-id=nrd1 --follower-disk-id=nrd1-copy diff --git a/example/8-start.sh b/example/8-start.sh new file mode 100755 index 00000000000..268a4f9eff7 --- /dev/null +++ b/example/8-start.sh @@ -0,0 +1,2 @@ +./5-create_disk.sh -k nonreplicated -d nrd1 +#./blockstore-client.sh CreateVolume --disk-id=nrd1 --storage-media-kind nonreplicated --blocks-count=131072 --block-size=8192 diff --git a/example/8-stop.sh b/example/8-stop.sh new file mode 100755 index 00000000000..3df4a713e26 --- /dev/null +++ b/example/8-stop.sh @@ -0,0 +1,3 @@ +./blockstore-client.sh stopendpoint --socket /tmp/nrd1.sock +./blockstore-client.sh destroyvolume --disk-id=nrd1 +./blockstore-client.sh destroyvolume --disk-id=nrd1-copy