Skip to content

Commit 658b347

Browse files
committed
Live migration to the new leader volume.
1 parent ffd016b commit 658b347

File tree

13 files changed

+542
-46
lines changed

13 files changed

+542
-46
lines changed

cloud/blockstore/apps/client/lib/command.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -351,7 +351,8 @@ NProto::TMountVolumeResponse TCommand::MountVolume(
351351
VolumeStats,
352352
endpoint,
353353
ClientConfig,
354-
sessionConfig);
354+
sessionConfig,
355+
ISessionSwitcherWeakPtr());
355356

356357
auto response = SafeExecute<NProto::TMountVolumeResponse>([&] {
357358
return WaitFor(session->MountVolume());

cloud/blockstore/libs/client/session.cpp

Lines changed: 33 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88
#include <cloud/blockstore/libs/service/context.h>
99
#include <cloud/blockstore/libs/service/request_helpers.h>
1010
#include <cloud/blockstore/libs/service/service.h>
11+
#include <cloud/blockstore/libs/storage/model/volume_label.h>
12+
1113
#include <cloud/storage/core/libs/common/error.h>
1214
#include <cloud/storage/core/libs/common/scheduler.h>
1315
#include <cloud/storage/core/libs/common/timer.h>
@@ -122,6 +124,7 @@ class TSession final
122124
const IBlockStorePtr Client;
123125
const TClientAppConfigPtr Config;
124126
TSessionConfig SessionConfig;
127+
ISessionSwitcherWeakPtr SessionSwitcherPtr;
125128

126129
TLog Log;
127130
TSessionInfo SessionInfo;
@@ -135,7 +138,8 @@ class TSession final
135138
IVolumeStatsPtr volumeStats,
136139
IBlockStorePtr client,
137140
TClientAppConfigPtr config,
138-
const TSessionConfig& sessionConfig);
141+
const TSessionConfig& sessionConfig,
142+
ISessionSwitcherWeakPtr sessionSwitcherPtr);
139143

140144
ui32 GetMaxTransfer() const override;
141145

@@ -251,7 +255,8 @@ TSession::TSession(
251255
IVolumeStatsPtr volumeStats,
252256
IBlockStorePtr client,
253257
TClientAppConfigPtr config,
254-
const TSessionConfig& sessionConfig)
258+
const TSessionConfig& sessionConfig,
259+
ISessionSwitcherWeakPtr sessionSwitcherPtr)
255260
: Timer(std::move(timer))
256261
, Scheduler(std::move(scheduler))
257262
, Logging(std::move(logging))
@@ -260,6 +265,7 @@ TSession::TSession(
260265
, Client(std::move(client))
261266
, Config(std::move(config))
262267
, SessionConfig(sessionConfig)
268+
, SessionSwitcherPtr(std::move(sessionSwitcherPtr))
263269
, Log(Logging->CreateLog("BLOCKSTORE_CLIENT"))
264270
{}
265271

@@ -629,6 +635,18 @@ void TSession::ProcessMountResponse(
629635
SessionConfig.InstanceId)
630636
<< " complete request");
631637

638+
if (response.GetVolume().GetPrincipalDiskId()) {
639+
Y_DEBUG_ABORT_UNLESS(
640+
response.GetVolume().GetPrincipalDiskId() ==
641+
NStorage::GetNextDiskId(response.GetVolume().GetDiskId()));
642+
643+
if (auto switcher = SessionSwitcherPtr.lock()) {
644+
switcher->SwitchSession(
645+
response.GetVolume().GetDiskId(),
646+
response.GetVolume().GetPrincipalDiskId());
647+
}
648+
}
649+
632650
SessionInfo.MountState = EMountState::MountCompleted;
633651
SessionInfo.BlockSize = response.GetVolume().GetBlockSize();
634652
SessionInfo.SessionId = response.GetSessionId();
@@ -964,6 +982,15 @@ void TSession::HandleResponse(
964982

965983
ForceVolumeRemount(sessionId);
966984
HandleRequest<T>(std::move(callContext), std::move(request), response);
985+
986+
if (HasProtoFlag(errorFlags, NProto::EF_OUTDATED_VOLUME)) {
987+
if (auto switcher = SessionSwitcherPtr.lock()) {
988+
switcher->SwitchSession(
989+
SessionConfig.DiskId,
990+
NStorage::GetNextDiskId(SessionConfig.DiskId));
991+
}
992+
}
993+
967994
return;
968995
}
969996

@@ -1020,7 +1047,8 @@ ISessionPtr CreateSession(
10201047
IVolumeStatsPtr volumeStats,
10211048
IBlockStorePtr client,
10221049
TClientAppConfigPtr config,
1023-
const TSessionConfig& sessionConfig)
1050+
const TSessionConfig& sessionConfig,
1051+
ISessionSwitcherWeakPtr sessionSwitcherPtr)
10241052
{
10251053
return std::make_shared<TSession>(
10261054
std::move(timer),
@@ -1030,7 +1058,8 @@ ISessionPtr CreateSession(
10301058
std::move(volumeStats),
10311059
std::move(client),
10321060
std::move(config),
1033-
sessionConfig);
1061+
sessionConfig,
1062+
std::move(sessionSwitcherPtr));
10341063
}
10351064

10361065
} // namespace NCloud::NBlockStore::NClient

cloud/blockstore/libs/client/session.h

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,18 @@ struct ISession
4343

4444
////////////////////////////////////////////////////////////////////////////////
4545

46+
struct ISessionSwitcher
47+
{
48+
virtual ~ISessionSwitcher() = default;
49+
50+
virtual void SwitchSession(
51+
const TString& diskId,
52+
const TString& newDiskId) = 0;
53+
};
54+
using ISessionSwitcherWeakPtr = std::weak_ptr<ISessionSwitcher>;
55+
56+
////////////////////////////////////////////////////////////////////////////////
57+
4658
struct TSessionConfig
4759
{
4860
TString DiskId;
@@ -73,6 +85,7 @@ ISessionPtr CreateSession(
7385
IVolumeStatsPtr volumeStats,
7486
IBlockStorePtr client,
7587
TClientAppConfigPtr config,
76-
const TSessionConfig& sessionConfig);
88+
const TSessionConfig& sessionConfig,
89+
ISessionSwitcherWeakPtr sessionSwitcherPtr);
7790

7891
} // namespace NCloud::NBlockStore::NClient

cloud/blockstore/libs/client/session_ut.cpp

Lines changed: 186 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,12 @@
33
#include "client.h"
44
#include "config.h"
55

6+
#include <cloud/storage/core/libs/common/helpers.h>
67
#include <cloud/blockstore/libs/diagnostics/request_stats.h>
78
#include <cloud/blockstore/libs/diagnostics/volume_stats.h>
89
#include <cloud/blockstore/libs/service/context.h>
910
#include <cloud/blockstore/libs/service/service_test.h>
11+
#include <cloud/blockstore/libs/storage/model/volume_label.h>
1012

1113
#include <cloud/storage/core/libs/common/error.h>
1214
#include <cloud/storage/core/libs/common/scheduler_test.h>
@@ -35,6 +37,26 @@ static constexpr ui32 DefaultBlocksCount = 1024;
3537

3638
////////////////////////////////////////////////////////////////////////////////
3739

40+
struct TTestSessionSwitcher
41+
: public ISessionSwitcher
42+
, public std::enable_shared_from_this<TTestSessionSwitcher>
43+
{
44+
struct TSwitchInfo
45+
{
46+
TString DiskId;
47+
TString NewDiskId;
48+
};
49+
TVector<TSwitchInfo> Switches;
50+
51+
void SwitchSession(const TString& diskId, const TString& newDiskId) override
52+
{
53+
Switches.push_back(
54+
TSwitchInfo{.DiskId = diskId, .NewDiskId = newDiskId});
55+
}
56+
};
57+
using TTestSessionSwitcherPtr = std::shared_ptr<TTestSessionSwitcher>;
58+
59+
////////////////////////////////////////////////////////////////////////////////
3860
class TBootstrap
3961
{
4062
ITimerPtr Timer;
@@ -44,6 +66,9 @@ class TBootstrap
4466

4567
IBlockStorePtr Client;
4668

69+
TTestSessionSwitcherPtr SessionSwitcher =
70+
std::make_shared<TTestSessionSwitcher>();
71+
4772
ISessionPtr Session;
4873

4974
public:
@@ -59,19 +84,19 @@ class TBootstrap
5984
, Config(std::make_shared<TClientAppConfig>())
6085
, Client(std::move(client))
6186
, Session(CreateSession(
62-
Timer,
63-
Scheduler,
64-
Logging,
65-
CreateRequestStatsStub(),
66-
CreateVolumeStatsStub(),
67-
Client,
68-
Config,
69-
TSessionConfig{
70-
.DiskId = DefaultDiskId,
71-
.MountToken = DefaultMountToken,
72-
.ClientVersionInfo = std::move(clientVersionInfo),
73-
.MountSeqNumber = mountSeqNumber
74-
}))
87+
Timer,
88+
Scheduler,
89+
Logging,
90+
CreateRequestStatsStub(),
91+
CreateVolumeStatsStub(),
92+
Client,
93+
Config,
94+
TSessionConfig{
95+
.DiskId = DefaultDiskId,
96+
.MountToken = DefaultMountToken,
97+
.ClientVersionInfo = std::move(clientVersionInfo),
98+
.MountSeqNumber = mountSeqNumber},
99+
SessionSwitcher))
75100
{}
76101

77102
void Start()
@@ -133,6 +158,11 @@ class TBootstrap
133158
{
134159
return Session.get();
135160
}
161+
162+
TTestSessionSwitcher& GetSessionSwitcher()
163+
{
164+
return *SessionSwitcher;
165+
}
136166
};
137167

138168
////////////////////////////////////////////////////////////////////////////////
@@ -321,6 +351,9 @@ Y_UNIT_TEST_SUITE(TSessionTest)
321351
{
322352
auto res = session->MountVolume().GetValueSync();
323353
UNIT_ASSERT_C(!HasError(res), res);
354+
UNIT_ASSERT_VALUES_EQUAL(
355+
0ul,
356+
bootstrap->GetSessionSwitcher().Switches.size());
324357
}
325358

326359
{
@@ -1818,6 +1851,146 @@ Y_UNIT_TEST_SUITE(TSessionTest)
18181851

18191852
bootstrap->Stop();
18201853
}
1854+
1855+
Y_UNIT_TEST(ShouldSwitchSessionIfMountContainsPrincipalDiskId)
1856+
{
1857+
auto client = std::make_shared<TTestService>();
1858+
1859+
size_t sessionNum = 0;
1860+
client->MountVolumeHandler =
1861+
[&](std::shared_ptr<NProto::TMountVolumeRequest> request)
1862+
{
1863+
UNIT_ASSERT_EQUAL(request->GetDiskId(), DefaultDiskId);
1864+
UNIT_ASSERT_EQUAL(request->GetToken(), DefaultMountToken);
1865+
1866+
NProto::TMountVolumeResponse response;
1867+
response.SetSessionId(ToString(++sessionNum));
1868+
1869+
auto& volume = *response.MutableVolume();
1870+
volume.SetDiskId(request->GetDiskId());
1871+
volume.SetBlockSize(DefaultBlockSize);
1872+
volume.SetBlocksCount(DefaultBlocksCount);
1873+
volume.SetPrincipalDiskId(
1874+
NStorage::GetNextDiskId(request->GetDiskId()));
1875+
1876+
return MakeFuture(response);
1877+
};
1878+
1879+
client->UnmountVolumeHandler =
1880+
[](std::shared_ptr<NProto::TUnmountVolumeRequest> request)
1881+
{
1882+
UNIT_ASSERT_EQUAL(request->GetDiskId(), DefaultDiskId);
1883+
UNIT_ASSERT_EQUAL(request->GetSessionId(), "2");
1884+
1885+
return MakeFuture<NProto::TUnmountVolumeResponse>();
1886+
};
1887+
1888+
auto bootstrap = CreateBootstrap(client);
1889+
1890+
auto* session = bootstrap->GetSession();
1891+
1892+
bootstrap->Start();
1893+
1894+
auto res = session->MountVolume().GetValueSync();
1895+
UNIT_ASSERT_C(!HasError(res), res);
1896+
UNIT_ASSERT_EQUAL(sessionNum, 1);
1897+
1898+
UNIT_ASSERT_VALUES_EQUAL(
1899+
1ul,
1900+
bootstrap->GetSessionSwitcher().Switches.size());
1901+
1902+
auto switchInfo = bootstrap->GetSessionSwitcher().Switches[0];
1903+
UNIT_ASSERT_VALUES_EQUAL(DefaultDiskId, switchInfo.DiskId);
1904+
UNIT_ASSERT_VALUES_EQUAL(
1905+
NStorage::GetNextDiskId(DefaultDiskId),
1906+
switchInfo.NewDiskId);
1907+
1908+
bootstrap->Stop();
1909+
}
1910+
1911+
Y_UNIT_TEST(ShouldSwitchSessionIfResponseContains_EF_OUTDATED_VOLUME)
1912+
{
1913+
auto client = std::make_shared<TTestService>();
1914+
1915+
size_t sessionNum = 0;
1916+
client->MountVolumeHandler =
1917+
[&](std::shared_ptr<NProto::TMountVolumeRequest> request)
1918+
{
1919+
UNIT_ASSERT_EQUAL(request->GetDiskId(), DefaultDiskId);
1920+
UNIT_ASSERT_EQUAL(request->GetToken(), DefaultMountToken);
1921+
1922+
NProto::TMountVolumeResponse response;
1923+
response.SetSessionId(ToString(++sessionNum));
1924+
1925+
auto& volume = *response.MutableVolume();
1926+
volume.SetDiskId(request->GetDiskId());
1927+
volume.SetBlockSize(DefaultBlockSize);
1928+
volume.SetBlocksCount(DefaultBlocksCount);
1929+
1930+
return MakeFuture(response);
1931+
};
1932+
1933+
client->UnmountVolumeHandler =
1934+
[](std::shared_ptr<NProto::TUnmountVolumeRequest> request)
1935+
{
1936+
UNIT_ASSERT_EQUAL(request->GetDiskId(), DefaultDiskId);
1937+
UNIT_ASSERT_EQUAL(request->GetSessionId(), "2");
1938+
1939+
return MakeFuture<NProto::TUnmountVolumeResponse>();
1940+
};
1941+
1942+
client->WriteBlocksLocalHandler =
1943+
[](std::shared_ptr<NProto::TWriteBlocksLocalRequest> request)
1944+
{
1945+
UNIT_ASSERT_EQUAL(request->GetDiskId(), DefaultDiskId);
1946+
if (request->GetSessionId() == "1") {
1947+
ui32 flags = 0;
1948+
SetProtoFlag(flags, NProto::EF_OUTDATED_VOLUME);
1949+
return MakeFuture<NProto::TWriteBlocksLocalResponse>(
1950+
TErrorResponse(
1951+
E_BS_INVALID_SESSION,
1952+
"reconnect to principal disk",
1953+
flags));
1954+
}
1955+
1956+
return MakeFuture<NProto::TWriteBlocksLocalResponse>();
1957+
};
1958+
1959+
auto bootstrap = CreateBootstrap(client);
1960+
1961+
auto* session = bootstrap->GetSession();
1962+
1963+
bootstrap->Start();
1964+
1965+
{
1966+
auto res = session->MountVolume().GetValueSync();
1967+
UNIT_ASSERT_C(!HasError(res), res);
1968+
UNIT_ASSERT_EQUAL(sessionNum, 1);
1969+
}
1970+
1971+
{
1972+
auto res = WriteBlocks(session);
1973+
UNIT_ASSERT_C(!HasError(res), res);
1974+
UNIT_ASSERT_EQUAL(sessionNum, 2);
1975+
1976+
UNIT_ASSERT_VALUES_EQUAL(
1977+
1ul,
1978+
bootstrap->GetSessionSwitcher().Switches.size());
1979+
1980+
auto switchInfo = bootstrap->GetSessionSwitcher().Switches[0];
1981+
UNIT_ASSERT_VALUES_EQUAL(DefaultDiskId, switchInfo.DiskId);
1982+
UNIT_ASSERT_VALUES_EQUAL(
1983+
NStorage::GetNextDiskId(DefaultDiskId),
1984+
switchInfo.NewDiskId);
1985+
}
1986+
1987+
{
1988+
auto res = session->UnmountVolume().GetValueSync();
1989+
UNIT_ASSERT_C(!HasError(res), res);
1990+
}
1991+
1992+
bootstrap->Stop();
1993+
}
18211994
}
18221995

18231996
} // namespace NCloud::NBlockStore::NClient

cloud/blockstore/libs/client/switchable_session_ut.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -261,7 +261,8 @@ TTestSession CreateTestSession(const TString& diskId, const TString& sessionId)
261261
TSessionConfig{
262262
.DiskId = diskId,
263263
.MountToken = DefaultMountToken,
264-
.ClientVersionInfo = DefaultClientVersionInfo});
264+
.ClientVersionInfo = DefaultClientVersionInfo},
265+
ISessionSwitcherWeakPtr());
265266
{
266267
NProto::THeaders headers;
267268
headers.SetClientId(DefaultClientId);

0 commit comments

Comments
 (0)