Skip to content

Commit 86e8e6e

Browse files
authored
Added BeforeSwitching() and AfterSwitching() for ISwitchableBlockStore (#4540)
Сontinuing the #2999
1 parent a21dda1 commit 86e8e6e

File tree

4 files changed

+488
-58
lines changed

4 files changed

+488
-58
lines changed

cloud/blockstore/libs/client/switchable_client.cpp

Lines changed: 238 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
#include <cloud/storage/core/libs/diagnostics/logging.h>
77

88
#include <util/generic/vector.h>
9+
#include <util/system/spinlock.h>
910

1011
#include <utility>
1112

@@ -17,35 +18,103 @@ namespace {
1718

1819
////////////////////////////////////////////////////////////////////////////////
1920

20-
template <typename T>
21-
concept HasSetDiskId = requires(T& obj) {
22-
{
23-
obj.SetDiskId(TString())
24-
} -> std::same_as<void>;
21+
// The adapter helps to execute the request via the generalized overloaded
22+
// Execute() method.
23+
class TBlockStoreRequestAdapter
24+
{
25+
public:
26+
#define BLOCKSTORE_DECLARE_METHOD(name, ...) \
27+
static NThreading::TFuture<NProto::T##name##Response> Execute( \
28+
IBlockStore* blockstore, \
29+
TCallContextPtr callContext, \
30+
std::shared_ptr<NProto::T##name##Request> request) \
31+
{ \
32+
return blockstore->name(std::move(callContext), std::move(request)); \
33+
} \
34+
// BLOCKSTORE_DECLARE_METHOD
35+
36+
BLOCKSTORE_SERVICE(BLOCKSTORE_DECLARE_METHOD)
37+
38+
#undef BLOCKSTORE_DECLARE_METHOD
39+
}; // namespace
40+
41+
////////////////////////////////////////////////////////////////////////////////
42+
struct TClientInfo
43+
{
44+
IBlockStorePtr Client;
45+
TString DiskId;
46+
TString SessionId;
2547
};
2648

27-
template <typename T>
28-
void SetDiskIdIfExists(T& obj, const TString& diskId)
49+
template <typename TRequest, typename TResponse>
50+
class TDeferredRequestsHolder
2951
{
30-
if constexpr (HasSetDiskId<T>) {
31-
obj.SetDiskId(diskId);
52+
private:
53+
struct TRequestInfo
54+
{
55+
TPromise<TResponse> Promise;
56+
TCallContextPtr CallContext;
57+
std::shared_ptr<TRequest> Request;
58+
};
59+
60+
TVector<TRequestInfo> Requests;
61+
62+
public:
63+
TFuture<TResponse> SaveRequest(
64+
TCallContextPtr callContext,
65+
std::shared_ptr<TRequest> request)
66+
{
67+
Requests.emplace_back(
68+
TRequestInfo{
69+
.Promise = NewPromise<TResponse>(),
70+
.CallContext = std::move(callContext),
71+
.Request = std::move(request)});
72+
return Requests.back().Promise;
3273
}
33-
}
3474

35-
template <typename T>
36-
concept HasSetSessionId = requires(T& obj) {
75+
void ExecuteSavedRequests(const TClientInfo& clientInfo)
3776
{
38-
obj.SetSessionId(TString())
39-
} -> std::same_as<void>;
40-
};
77+
Y_ABORT_UNLESS(clientInfo.Client);
4178

42-
template <typename T>
43-
void SetSessionIdIfExists(T& obj, const TString& sessionId)
44-
{
45-
if constexpr (HasSetSessionId<T>) {
46-
obj.SetSessionId(sessionId);
79+
for (auto& requestInfo: Requests) {
80+
if (clientInfo.DiskId) {
81+
requestInfo.Request->SetDiskId(clientInfo.DiskId);
82+
}
83+
if (clientInfo.SessionId) {
84+
requestInfo.Request->SetSessionId(clientInfo.SessionId);
85+
}
86+
87+
auto future = TBlockStoreRequestAdapter::Execute(
88+
clientInfo.Client.get(),
89+
std::move(requestInfo.CallContext),
90+
std::move(requestInfo.Request));
91+
future.Subscribe(
92+
[promise = std::move(requestInfo.Promise)](
93+
TFuture<TResponse> f) mutable
94+
{
95+
promise.SetValue(f.ExtractValue()); //
96+
});
97+
}
98+
Requests.clear();
4799
}
48-
}
100+
};
101+
102+
using TDeferredRequestsHolders = std::tuple<
103+
TDeferredRequestsHolder<
104+
NProto::TReadBlocksRequest,
105+
NProto::TReadBlocksResponse>,
106+
TDeferredRequestsHolder<
107+
NProto::TReadBlocksLocalRequest,
108+
NProto::TReadBlocksLocalResponse>,
109+
TDeferredRequestsHolder<
110+
NProto::TWriteBlocksRequest,
111+
NProto::TWriteBlocksResponse>,
112+
TDeferredRequestsHolder<
113+
NProto::TWriteBlocksLocalRequest,
114+
NProto::TWriteBlocksLocalResponse>,
115+
TDeferredRequestsHolder<
116+
NProto::TZeroBlocksRequest,
117+
NProto::TZeroBlocksResponse>>;
49118

50119
////////////////////////////////////////////////////////////////////////////////
51120

@@ -56,17 +125,23 @@ class TSwitchableBlockStore final
56125
private:
57126
TLog Log;
58127

59-
struct TClientInfo
60-
{
61-
IBlockStorePtr Client;
62-
TString DiskId;
63-
TString SessionId;
64-
};
65-
66128
TClientInfo PrimaryClientInfo;
67129
TClientInfo SecondaryClientInfo;
130+
131+
// BeforeSwitching() sets the WillSwitchToSecondary to true. After that,
132+
// all data-plane requests are saved and not sent for execution. Calling
133+
// AfterSwitching() sets WillSwitchToSecondary to false and sends all saved
134+
// requests for execution.
135+
std::atomic_bool WillSwitchToSecondary{false};
136+
137+
// Switch() sets the SwitchedToSecondary to true. After that, all data-plane
138+
// requests executed with client from SecondaryClientInfo.
139+
// Reverse switching is not possible.
68140
std::atomic_bool SwitchedToSecondary{false};
69141

142+
TAdaptiveLock DifferedRequestsLock;
143+
TDeferredRequestsHolders DeferredRequests;
144+
70145
public:
71146
TSwitchableBlockStore(
72147
ILoggingServicePtr logging,
@@ -79,22 +154,61 @@ class TSwitchableBlockStore final
79154
.SessionId = {}})
80155
{}
81156

157+
void BeforeSwitching() override
158+
{
159+
Y_ABORT_UNLESS(!WillSwitchToSecondary);
160+
STORAGE_INFO("Will switch from " << PrimaryClientInfo.DiskId.Quote());
161+
WillSwitchToSecondary = true;
162+
}
163+
82164
void Switch(
83165
IBlockStorePtr newClient,
84166
const TString& newDiskId,
85167
const TString& newSessionId) override
86168
{
169+
Y_ABORT_UNLESS(WillSwitchToSecondary);
87170
Y_ABORT_UNLESS(!SwitchedToSecondary);
88171

172+
STORAGE_INFO(
173+
"Switched from " << PrimaryClientInfo.DiskId.Quote() << " to "
174+
<< newDiskId.Quote());
175+
89176
SecondaryClientInfo = {
90177
.Client = std::move(newClient),
91178
.DiskId = newDiskId,
92179
.SessionId = newSessionId};
93180
SwitchedToSecondary = true;
181+
}
94182

95-
STORAGE_INFO(
96-
"Switched from " << PrimaryClientInfo.DiskId.Quote() << " to "
97-
<< newDiskId.Quote());
183+
void AfterSwitching() override
184+
{
185+
Y_ABORT_UNLESS(WillSwitchToSecondary);
186+
187+
with_lock (DifferedRequestsLock) {
188+
WillSwitchToSecondary = false;
189+
190+
if (SwitchedToSecondary) {
191+
STORAGE_INFO(
192+
"Switching from "
193+
<< PrimaryClientInfo.DiskId.Quote() << " to "
194+
<< SecondaryClientInfo.DiskId.Quote() << " is completed");
195+
} else {
196+
STORAGE_INFO(
197+
"Switching from " << PrimaryClientInfo.DiskId.Quote()
198+
<< " is interrupted");
199+
}
200+
201+
const TClientInfo& currentClientInfo =
202+
SwitchedToSecondary ? SecondaryClientInfo : PrimaryClientInfo;
203+
204+
std::apply(
205+
[currentClientInfo](auto&... deferredRequests)
206+
{
207+
(deferredRequests.ExecuteSavedRequests(currentClientInfo),
208+
...);
209+
},
210+
DeferredRequests);
211+
}
98212
}
99213

100214
void Start() override
@@ -112,40 +226,103 @@ class TSwitchableBlockStore final
112226
return PrimaryClientInfo.Client->AllocateBuffer(bytesCount);
113227
}
114228

115-
#define BLOCKSTORE_IMPLEMENT_METHOD(name, ...) \
116-
TFuture<NProto::T##name##Response> name( \
117-
TCallContextPtr callContext, \
118-
std::shared_ptr<NProto::T##name##Request> request) override \
119-
{ \
120-
constexpr bool isSwitchableRequest = IsReadWriteRequest( \
121-
GetBlockStoreRequest<NProto::T##name##Request>()); \
122-
if constexpr (isSwitchableRequest) { \
123-
if (SwitchedToSecondary) { \
124-
STORAGE_TRACE( \
125-
"Forward " << #name << " from " \
126-
<< PrimaryClientInfo.DiskId.Quote() << " to " \
127-
<< SecondaryClientInfo.DiskId.Quote()); \
128-
SetDiskIdIfExists(*request, SecondaryClientInfo.DiskId); \
129-
SetSessionIdIfExists(*request, SecondaryClientInfo.SessionId); \
130-
return SecondaryClientInfo.Client->name( \
131-
std::move(callContext), \
132-
std::move(request)); \
133-
} \
134-
} \
135-
return PrimaryClientInfo.Client->name( \
136-
std::move(callContext), \
137-
std::move(request)); \
229+
#define BLOCKSTORE_IMPLEMENT_METHOD(name, ...) \
230+
TFuture<NProto::T##name##Response> name( \
231+
TCallContextPtr callContext, \
232+
std::shared_ptr<NProto::T##name##Request> request) override \
233+
{ \
234+
constexpr bool isReadWriteRequest = IsReadWriteRequest( \
235+
GetBlockStoreRequest<NProto::T##name##Request>()); \
236+
if constexpr (isReadWriteRequest) { \
237+
return ExecuteReadWriteRequest<NProto::T##name##Response>( \
238+
std::move(callContext), \
239+
std::move(request)); \
240+
} \
241+
return PrimaryClientInfo.Client->name( \
242+
std::move(callContext), \
243+
std::move(request)); \
138244
}
139245

140246
BLOCKSTORE_SERVICE(BLOCKSTORE_IMPLEMENT_METHOD)
141247

142248
#undef BLOCKSTORE_IMPLEMENT_METHOD
249+
250+
private:
251+
template <typename TResponse, typename TRequest>
252+
TFuture<TResponse> ExecuteReadWriteRequest(
253+
TCallContextPtr callContext,
254+
std::shared_ptr<TRequest> request)
255+
{
256+
if (SwitchedToSecondary) {
257+
STORAGE_TRACE(
258+
"Forward " << typeid(TRequest).name() << " from "
259+
<< PrimaryClientInfo.DiskId.Quote() << " to "
260+
<< SecondaryClientInfo.DiskId.Quote());
261+
262+
request->SetDiskId(SecondaryClientInfo.DiskId);
263+
request->SetSessionId(SecondaryClientInfo.SessionId);
264+
265+
return TBlockStoreRequestAdapter::Execute(
266+
SecondaryClientInfo.Client.get(),
267+
std::move(callContext),
268+
std::move(request));
269+
}
270+
271+
if (WillSwitchToSecondary) {
272+
with_lock (DifferedRequestsLock) {
273+
// A double check is necessary to avoid a race when a switch is
274+
// cancelled.
275+
if (WillSwitchToSecondary) {
276+
STORAGE_TRACE(
277+
"Save " << typeid(TRequest).name() << " from "
278+
<< PrimaryClientInfo.DiskId.Quote());
279+
280+
return std::get<
281+
TDeferredRequestsHolder<TRequest, TResponse>>(
282+
DeferredRequests)
283+
.SaveRequest(
284+
std::move(callContext),
285+
std::move(request));
286+
}
287+
}
288+
return TBlockStoreRequestAdapter::Execute(
289+
SwitchedToSecondary ? SecondaryClientInfo.Client.get()
290+
: PrimaryClientInfo.Client.get(),
291+
std::move(callContext),
292+
std::move(request));
293+
}
294+
295+
return TBlockStoreRequestAdapter::Execute(
296+
PrimaryClientInfo.Client.get(),
297+
std::move(callContext),
298+
std::move(request));
299+
}
143300
};
144301

145302
} // namespace
146303

147304
////////////////////////////////////////////////////////////////////////////////
148305

306+
class TSessionSwitchingGuard
307+
{
308+
ISwitchableBlockStorePtr SwitchableDataClient;
309+
310+
public:
311+
explicit TSessionSwitchingGuard(
312+
ISwitchableBlockStorePtr switchableDataClient)
313+
: SwitchableDataClient(std::move(switchableDataClient))
314+
{
315+
SwitchableDataClient->BeforeSwitching();
316+
}
317+
318+
~TSessionSwitchingGuard()
319+
{
320+
SwitchableDataClient->AfterSwitching();
321+
}
322+
};
323+
324+
////////////////////////////////////////////////////////////////////////////////
325+
149326
ISwitchableBlockStorePtr CreateSwitchableClient(
150327
ILoggingServicePtr logging,
151328
TString diskId,
@@ -157,4 +334,11 @@ ISwitchableBlockStorePtr CreateSwitchableClient(
157334
std::move(client));
158335
}
159336

337+
TSessionSwitchingGuardPtr CreateSessionSwitchingGuard(
338+
ISwitchableBlockStorePtr switchableDataClient)
339+
{
340+
return std::make_shared<TSessionSwitchingGuard>(
341+
std::move(switchableDataClient));
342+
}
343+
160344
} // namespace NCloud::NBlockStore

0 commit comments

Comments
 (0)