Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cloud/blockstore/libs/client/session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include <cloud/blockstore/libs/service/context.h>
#include <cloud/blockstore/libs/service/request_helpers.h>
#include <cloud/blockstore/libs/service/service.h>

#include <cloud/storage/core/libs/common/error.h>
#include <cloud/storage/core/libs/common/scheduler.h>
#include <cloud/storage/core/libs/common/timer.h>
Expand Down
50 changes: 47 additions & 3 deletions cloud/blockstore/libs/client/switchable_client.cpp
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
#include "switchable_client.h"

#include <cloud/blockstore/libs/service/context.h>
#include <cloud/blockstore/libs/service/request_helpers.h>
#include <cloud/blockstore/libs/service/service_method.h>

#include <cloud/storage/core/libs/common/helpers.h>
#include <cloud/storage/core/libs/diagnostics/logging.h>

#include <util/generic/vector.h>
Expand Down Expand Up @@ -43,6 +43,11 @@ class TDeferredRequestsHolder
TVector<TRequestInfo> Requests;

public:
~TDeferredRequestsHolder()
{
Y_DEBUG_ABORT_UNLESS(Requests.empty());
}

TFuture<TResponse> SaveRequest(
TCallContextPtr callContext,
std::shared_ptr<TRequest> request)
Expand Down Expand Up @@ -97,8 +102,8 @@ class TSwitchableBlockStore final
{
private:
TLog Log;

TClientInfo PrimaryClientInfo;
const ISessionSwitcherWeakPtr SessionSwitcher;
const TClientInfo PrimaryClientInfo;
TClientInfo SecondaryClientInfo;

// BeforeSwitching() sets the WillSwitchToSecondary to true. After that,
Expand All @@ -118,9 +123,11 @@ class TSwitchableBlockStore final
public:
TSwitchableBlockStore(
ILoggingServicePtr logging,
ISessionSwitcherWeakPtr sessionSwitcher,
TString diskId,
IBlockStorePtr client)
: Log(logging->CreateLog("BLOCKSTORE_CLIENT"))
, SessionSwitcher(std::move(sessionSwitcher))
, PrimaryClientInfo(
{.Client = std::move(client),
.DiskId = std::move(diskId),
Expand Down Expand Up @@ -209,6 +216,11 @@ class TSwitchableBlockStore final
std::move(callContext),
std::move(request));
}
if constexpr (TMethod::IsMountRequest()) {
return ExecuteMountRequest(
std::move(callContext),
std::move(request));
}
return TMethod::Execute(
PrimaryClientInfo.Client.get(),
std::move(callContext),
Expand Down Expand Up @@ -267,6 +279,36 @@ class TSwitchableBlockStore final
std::move(callContext),
std::move(request));
}

TFuture<NProto::TMountVolumeResponse> ExecuteMountRequest(
TCallContextPtr callContext,
std::shared_ptr<NProto::TMountVolumeRequest> request)
{
TFuture<NProto::TMountVolumeResponse> future =
PrimaryClientInfo.Client->MountVolume(
std::move(callContext),
std::move(request));

return future.Apply(
[sessionSwitcher = SessionSwitcher] //
(TFuture<NProto::TMountVolumeResponse> future)
-> NProto::TMountVolumeResponse
{
NProto::TMountVolumeResponse response = future.ExtractValue();

if (!HasError(response) &&
response.GetVolume().GetPrincipalDiskId())
{
if (auto switcher = sessionSwitcher.lock()) {
switcher->SwitchSession(
response.GetVolume().GetDiskId(),
response.GetVolume().GetPrincipalDiskId());
}
}

return response;
});
}
};

} // namespace
Expand Down Expand Up @@ -295,11 +337,13 @@ class TSessionSwitchingGuard

ISwitchableBlockStorePtr CreateSwitchableClient(
ILoggingServicePtr logging,
ISessionSwitcherWeakPtr sessionSwitcher,
TString diskId,
IBlockStorePtr client)
{
return std::make_shared<TSwitchableBlockStore>(
std::move(logging),
std::move(sessionSwitcher),
std::move(diskId),
std::move(client));
}
Expand Down
12 changes: 12 additions & 0 deletions cloud/blockstore/libs/client/switchable_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,17 @@ struct ISwitchableBlockStore: public IBlockStore

////////////////////////////////////////////////////////////////////////////////

struct ISessionSwitcher
{
virtual ~ISessionSwitcher() = default;

virtual void SwitchSession(
const TString& diskId,
const TString& newDiskId) = 0;
};

////////////////////////////////////////////////////////////////////////////////

// ISwitchableBlockStore is used for dynamic switching between clients. By
// default, all requests are sent to the first client. After switching to the
// second client using the Switch() method, the read/write/zero requests start
Expand All @@ -45,6 +56,7 @@ struct ISwitchableBlockStore: public IBlockStore

ISwitchableBlockStorePtr CreateSwitchableClient(
ILoggingServicePtr logging,
ISessionSwitcherWeakPtr sessionSwitcher,
TString diskId,
IBlockStorePtr client);

Expand Down
87 changes: 87 additions & 0 deletions cloud/blockstore/libs/client/switchable_client_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@

#include <cloud/blockstore/libs/service/service_method.h>
#include <cloud/blockstore/libs/service/service_test.h>
#include <cloud/blockstore/libs/storage/model/volume_label.h>

#include <cloud/storage/core/libs/common/helpers.h>
#include <cloud/storage/core/libs/diagnostics/logging.h>

#include <library/cpp/testing/unittest/registar.h>
Expand Down Expand Up @@ -32,6 +34,20 @@ concept HasSessionId = requires(T& obj) {
{ obj.GetSessionId() } -> std::convertible_to<TString>;
};

struct TTestSessionSwitcher
: public ISessionSwitcher
, public std::enable_shared_from_this<TTestSessionSwitcher>
{
TString DiskId;
TString NewDiskId;

void SwitchSession(const TString& diskId, const TString& newDiskId) override
{
DiskId = diskId;
NewDiskId = newDiskId;
}
};

template <typename TRequest, typename TResponse>
class TTestMethod
{
Expand Down Expand Up @@ -201,9 +217,11 @@ Y_UNIT_TEST_SUITE(TSwitchableClientTest)
{
auto client1 = std::make_shared<TTestService>();
auto client2 = std::make_shared<TTestService>();
auto sessionSwitcher = std::make_shared<TTestSessionSwitcher>();

auto switchableClient = CreateSwitchableClient(
CreateLoggingService("console"),
sessionSwitcher,
PrimaryDiskId,
client1);

Expand Down Expand Up @@ -249,9 +267,11 @@ Y_UNIT_TEST_SUITE(TSwitchableClientTest)
{
auto client1 = std::make_shared<TTestService>();
auto client2 = std::make_shared<TTestService>();
auto sessionSwitcher = std::make_shared<TTestSessionSwitcher>();

auto switchableClient = CreateSwitchableClient(
CreateLoggingService("console"),
sessionSwitcher,
PrimaryDiskId,
client1);

Expand Down Expand Up @@ -335,9 +355,11 @@ Y_UNIT_TEST_SUITE(TSwitchableClientTest)
{
auto client1 = std::make_shared<TTestService>();
auto client2 = std::make_shared<TTestService>();
auto sessionSwitcher = std::make_shared<TTestSessionSwitcher>();

auto switchableClient = CreateSwitchableClient(
CreateLoggingService("console"),
sessionSwitcher,
PrimaryDiskId,
client1);

Expand Down Expand Up @@ -390,9 +412,11 @@ Y_UNIT_TEST_SUITE(TSwitchableClientTest)
{
auto client1 = std::make_shared<TTestService>();
auto client2 = std::make_shared<TTestService>();
auto sessionSwitcher = std::make_shared<TTestSessionSwitcher>();

auto switchableClient = CreateSwitchableClient(
CreateLoggingService("console"),
sessionSwitcher,
PrimaryDiskId,
client1);

Expand Down Expand Up @@ -459,6 +483,69 @@ Y_UNIT_TEST_SUITE(TSwitchableClientTest)
switchableClient,
EHandledOn::Primary);
}

Y_UNIT_TEST(ShouldSwitchSessionIfMountResponseContainsPrincipalDiskId)
{
auto client1 = std::make_shared<TTestService>();
auto sessionSwitcher = std::make_shared<TTestSessionSwitcher>();

auto switchableClient = CreateSwitchableClient(
CreateLoggingService("console"),
sessionSwitcher,
PrimaryDiskId,
client1);

size_t sessionNum = 0;
bool setPrincipalDisk = false;
client1->MountVolumeHandler =
[&](std::shared_ptr<NProto::TMountVolumeRequest> request)
{
UNIT_ASSERT_EQUAL(request->GetDiskId(), PrimaryDiskId);

NProto::TMountVolumeResponse response;
response.SetSessionId(ToString(++sessionNum));

auto& volume = *response.MutableVolume();
volume.SetDiskId(request->GetDiskId());
volume.SetBlockSize(DefaultBlockSize);
volume.SetBlocksCount(100);
if (setPrincipalDisk) {
volume.SetPrincipalDiskId(
NStorage::GetNextDiskId(request->GetDiskId()));
}

return MakeFuture(response);
};

{ // If the mount response does not contain the PrincipalDiskId, then
// the call SwitchSession of ISessionSwitcher should not occur.
TCallContextPtr callContext = MakeIntrusive<TCallContext>();
auto request = std::make_shared<NProto::TMountVolumeRequest>();
request->SetDiskId(PrimaryDiskId);
auto future =
switchableClient->MountVolume(callContext, std::move(request));
const auto& response = future.GetValue();
UNIT_ASSERT(!HasError(response));
UNIT_ASSERT_VALUES_EQUAL("", sessionSwitcher->DiskId);
UNIT_ASSERT_VALUES_EQUAL("", sessionSwitcher->NewDiskId);
}

{ // If the mount response contains the PrincipalDiskId, then call
// SwitchSession of ISessionSwitcher should occur.
setPrincipalDisk = true;
TCallContextPtr callContext = MakeIntrusive<TCallContext>();
auto request = std::make_shared<NProto::TMountVolumeRequest>();
request->SetDiskId(PrimaryDiskId);
auto future =
switchableClient->MountVolume(callContext, std::move(request));
const auto& response = future.GetValue();
UNIT_ASSERT(!HasError(response));
UNIT_ASSERT_VALUES_EQUAL(PrimaryDiskId, sessionSwitcher->DiskId);
UNIT_ASSERT_VALUES_EQUAL(
NStorage::GetNextDiskId(PrimaryDiskId),
sessionSwitcher->NewDiskId);
}
}
}

} // namespace NCloud::NBlockStore::NClient
6 changes: 5 additions & 1 deletion cloud/blockstore/libs/client/switchable_session_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,11 @@ TTestSession CreateTestSession(const TString& diskId, const TString& sessionId)
};
}

auto switchableClient = CreateSwitchableClient(logging, diskId, client);
auto switchableClient = CreateSwitchableClient(
logging,
ISessionSwitcherWeakPtr(),
diskId,
client);

auto durable = CreateDurableClient(
config,
Expand Down
3 changes: 3 additions & 0 deletions cloud/blockstore/libs/client/ya.make
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
LIBRARY()

INCLUDE(${ARCADIA_ROOT}/cloud/storage/deny_ydb_dependency.inc)

SRCS(
client.cpp
config.cpp
Expand All @@ -20,6 +22,7 @@ PEERDIR(
cloud/blockstore/libs/common
cloud/blockstore/libs/diagnostics
cloud/blockstore/libs/service
cloud/blockstore/libs/storage/model
cloud/blockstore/libs/throttling
cloud/storage/core/libs/grpc
cloud/storage/core/libs/throttling
Expand Down
Loading
Loading