Skip to content

Commit 080b5b0

Browse files
committed
test
1 parent 658b347 commit 080b5b0

File tree

4 files changed

+365
-4
lines changed

4 files changed

+365
-4
lines changed

cloud/blockstore/libs/endpoints/session_manager_ut.cpp

Lines changed: 359 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
#include <cloud/blockstore/libs/encryption/encryption_key.h>
1313
#include <cloud/blockstore/libs/service/service_test.h>
1414
#include <cloud/blockstore/libs/service/storage_provider.h>
15+
#include <cloud/blockstore/libs/storage/model/volume_label.h>
1516

1617
#include <cloud/storage/core/libs/common/scheduler_test.h>
1718
#include <cloud/storage/core/libs/common/thread_pool.h>
@@ -482,6 +483,364 @@ Y_UNIT_TEST_SUITE(TSessionManagerTest)
482483
{
483484
ShouldDisableClientThrottler(false);
484485
}
486+
487+
Y_UNIT_TEST(ShouldSwitchSessionBy_EF_OUTDATED_VOLUME)
488+
{
489+
TString socketPath = "testSocket";
490+
TString diskId = "testDiskId";
491+
492+
auto timer = CreateCpuCycleTimer();
493+
auto scheduler = CreateScheduler(timer);
494+
scheduler->Start();
495+
496+
auto service = std::make_shared<TTestService>();
497+
service->DescribeVolumeHandler =
498+
[&] (std::shared_ptr<NProto::TDescribeVolumeRequest> request) {
499+
auto response = NProto::TDescribeVolumeResponse();
500+
response.MutableVolume()->SetDiskId(request->GetDiskId());
501+
return MakeFuture(std::move(response));
502+
};
503+
service->MountVolumeHandler =
504+
[&] (std::shared_ptr<NProto::TMountVolumeRequest> request) {
505+
NProto::TMountVolumeResponse response;
506+
response.MutableVolume()->SetDiskId(request->GetDiskId());
507+
response.SetInactiveClientsTimeout(100);
508+
return MakeFuture(response);
509+
};
510+
service->UnmountVolumeHandler =
511+
[&] (std::shared_ptr<NProto::TUnmountVolumeRequest> request) {
512+
Y_UNUSED(request);
513+
return MakeFuture(NProto::TUnmountVolumeResponse());
514+
};
515+
516+
// Need to respond to the request asynchronously.
517+
auto scheduleReplyVolumeOutdated =
518+
[scheduler]() -> TFuture<NProto::TReadBlocksLocalResponse>
519+
{
520+
TPromise<NProto::TReadBlocksLocalResponse> readPromise =
521+
NewPromise<NProto::TReadBlocksLocalResponse>();
522+
TFuture<NProto::TReadBlocksLocalResponse> response = readPromise;
523+
scheduler->Schedule(
524+
nullptr,
525+
TInstant::Now(),
526+
[readPromise = std::move(readPromise)]() mutable
527+
{
528+
// We set the E_BS_INVALID_SESSION flag to give a signal to
529+
// switch the session to a new disk.
530+
ui32 flags = 0;
531+
SetProtoFlag(flags, NProto::EF_OUTDATED_VOLUME);
532+
readPromise.SetValue(TErrorResponse(
533+
E_BS_INVALID_SESSION,
534+
"Volume outdated",
535+
flags));
536+
});
537+
return response;
538+
};
539+
540+
auto executor = TExecutor::Create("TestService");
541+
auto logging = CreateLoggingService("console");
542+
auto encryptionClientFactory = CreateEncryptionClientFactory(
543+
logging,
544+
CreateDefaultEncryptionKeyProvider(),
545+
NProto::EZP_WRITE_ENCRYPTED_ZEROS);
546+
547+
auto sessionManager = CreateSessionManager(
548+
CreateWallClockTimer(),
549+
scheduler,
550+
logging,
551+
CreateMonitoringServiceStub(),
552+
CreateRequestStatsStub(),
553+
CreateVolumeStatsStub(),
554+
CreateServerStatsStub(),
555+
service,
556+
CreateCellManagerStub(),
557+
CreateDefaultStorageProvider(service),
558+
encryptionClientFactory,
559+
executor,
560+
TSessionManagerOptions());
561+
562+
563+
executor->Start();
564+
Y_DEFER {
565+
executor->Stop();
566+
};
567+
568+
NProto::TStartEndpointRequest request;
569+
request.SetUnixSocketPath(socketPath);
570+
request.SetDiskId(diskId);
571+
request.SetClientId("testClientId");
572+
request.SetIpcType(NProto::IPC_VHOST);
573+
574+
// Create session to testDiskId
575+
NClient::ISessionPtr session;
576+
{
577+
auto future = sessionManager->CreateSession(
578+
MakeIntrusive<TCallContext>(),
579+
request);
580+
581+
auto sessionOrError = future.GetValue(TDuration::Seconds(3));
582+
UNIT_ASSERT_C(!HasError(sessionOrError), sessionOrError.GetError());
583+
auto sessionInfo = sessionOrError.ExtractResult();
584+
session = sessionInfo.Session;
585+
}
586+
587+
// Setup read responses for switching from testDiskId to
588+
// testDiskId-copy.
589+
service->ReadBlocksLocalHandler =
590+
[&](std::shared_ptr<NProto::TReadBlocksLocalRequest> request)
591+
-> TFuture<NProto::TReadBlocksLocalResponse>
592+
{
593+
if (request->GetDiskId() == diskId) {
594+
// Response with EF_OUTDATED_VOLUME will switch session.
595+
return scheduleReplyVolumeOutdated();
596+
}
597+
598+
// After switching the session, reading from the testDiskId-copy
599+
// disk will return S_OK.
600+
UNIT_ASSERT_VALUES_EQUAL_C(
601+
NStorage::GetNextDiskId(diskId),
602+
request->GetDiskId(),
603+
request->GetDiskId());
604+
605+
return MakeFuture<NProto::TReadBlocksLocalResponse>(
606+
TErrorResponse(E_ARGUMENT));
607+
};
608+
609+
{
610+
// Check that the read response is E_ARGUMENT.
611+
auto future = session->ReadBlocksLocal(
612+
MakeIntrusive<TCallContext>(),
613+
std::make_shared<NProto::TReadBlocksLocalRequest>());
614+
auto response = future.GetValue(TDuration::Seconds(1));
615+
UNIT_ASSERT_VALUES_EQUAL_C(
616+
E_ARGUMENT,
617+
response.GetError().GetCode(),
618+
FormatError(response.GetError()));
619+
}
620+
{
621+
// Check that the reading after switching.
622+
auto future = session->ReadBlocksLocal(
623+
MakeIntrusive<TCallContext>(),
624+
std::make_shared<NProto::TReadBlocksLocalRequest>());
625+
auto response = future.GetValue(TDuration::Seconds(1));
626+
UNIT_ASSERT_VALUES_EQUAL_C(
627+
E_ARGUMENT,
628+
response.GetError().GetCode(),
629+
FormatError(response.GetError()));
630+
}
631+
632+
// Setup read responses for switching from testDiskId-copy to
633+
// testDiskId.
634+
service->ReadBlocksLocalHandler =
635+
[&](std::shared_ptr<NProto::TReadBlocksLocalRequest> request)
636+
-> TFuture<NProto::TReadBlocksLocalResponse>
637+
{
638+
if (request->GetDiskId() == NStorage::GetNextDiskId(diskId)) {
639+
// Response with EF_OUTDATED_VOLUME will switch session.
640+
return scheduleReplyVolumeOutdated();
641+
}
642+
643+
// After switching the session, reading from the testDiskId-copy
644+
// disk will return S_OK.
645+
UNIT_ASSERT_VALUES_EQUAL_C(
646+
diskId,
647+
request->GetDiskId(),
648+
request->GetDiskId());
649+
650+
return MakeFuture<NProto::TReadBlocksLocalResponse>(
651+
TErrorResponse(S_OK));
652+
};
653+
654+
{
655+
// We check that the reading leading to the switch is successful.
656+
auto future = session->ReadBlocksLocal(
657+
MakeIntrusive<TCallContext>(),
658+
std::make_shared<NProto::TReadBlocksLocalRequest>());
659+
auto response = future.GetValue(TDuration::Seconds(1));
660+
UNIT_ASSERT_VALUES_EQUAL_C(
661+
S_OK,
662+
response.GetError().GetCode(),
663+
FormatError(response.GetError()));
664+
}
665+
{
666+
// Check that the reading is successful after switching.
667+
auto future = session->ReadBlocksLocal(
668+
MakeIntrusive<TCallContext>(),
669+
std::make_shared<NProto::TReadBlocksLocalRequest>());
670+
auto response = future.GetValue(TDuration::Seconds(1));
671+
UNIT_ASSERT_VALUES_EQUAL_C(
672+
S_OK,
673+
response.GetError().GetCode(),
674+
FormatError(response.GetError()));
675+
}
676+
}
677+
678+
Y_UNIT_TEST(ShouldSwitchSessionByMountResponse)
679+
{
680+
TString socketPath = "testSocket";
681+
TString diskId = "testDiskId";
682+
683+
auto timer = CreateCpuCycleTimer();
684+
auto scheduler = CreateScheduler(timer);
685+
scheduler->Start();
686+
687+
auto service = std::make_shared<TTestService>();
688+
service->DescribeVolumeHandler =
689+
[&] (std::shared_ptr<NProto::TDescribeVolumeRequest> request) {
690+
auto response = NProto::TDescribeVolumeResponse();
691+
response.MutableVolume()->SetDiskId(request->GetDiskId());
692+
return MakeFuture(std::move(response));
693+
};
694+
// Setting up the handler for the mount request, which will add the
695+
// PrincipalDiskId when mounting testDiskId, which will lead to switch
696+
// session.
697+
service->MountVolumeHandler =
698+
[&] (std::shared_ptr<NProto::TMountVolumeRequest> request) {
699+
NProto::TMountVolumeResponse response;
700+
response.MutableVolume()->SetDiskId(request->GetDiskId());
701+
if (request->GetDiskId() == diskId) {
702+
// Response with filled PrincipalDiskId will switch session.
703+
response.MutableVolume()->SetPrincipalDiskId(
704+
NStorage::GetNextDiskId(diskId));
705+
}
706+
response.SetInactiveClientsTimeout(100);
707+
return MakeFuture(response);
708+
};
709+
service->UnmountVolumeHandler =
710+
[&] (std::shared_ptr<NProto::TUnmountVolumeRequest> request) {
711+
Y_UNUSED(request);
712+
return MakeFuture(NProto::TUnmountVolumeResponse());
713+
};
714+
715+
// Setting up the handler to respond E_REJECTED if the request handled
716+
// by testDiskId and S_OK if the request handled by disk testDiskId-copy
717+
service->ReadBlocksLocalHandler =
718+
[&](std::shared_ptr<NProto::TReadBlocksLocalRequest> request)
719+
-> TFuture<NProto::TReadBlocksLocalResponse>
720+
{
721+
if (request->GetDiskId() == diskId) {
722+
return MakeFuture<NProto::TReadBlocksLocalResponse>(
723+
TErrorResponse(E_REJECTED));
724+
}
725+
if (request->GetDiskId() == NStorage::GetNextDiskId(diskId)) {
726+
return MakeFuture<NProto::TReadBlocksLocalResponse>(
727+
TErrorResponse(S_OK));
728+
}
729+
return MakeFuture<NProto::TReadBlocksLocalResponse>(
730+
TErrorResponse(E_ARGUMENT, "Unexpected disk id"));
731+
};
732+
733+
auto executor = TExecutor::Create("TestService");
734+
auto logging = CreateLoggingService("console");
735+
auto encryptionClientFactory = CreateEncryptionClientFactory(
736+
logging,
737+
CreateDefaultEncryptionKeyProvider(),
738+
NProto::EZP_WRITE_ENCRYPTED_ZEROS);
739+
740+
auto sessionManager = CreateSessionManager(
741+
CreateWallClockTimer(),
742+
scheduler,
743+
logging,
744+
CreateMonitoringServiceStub(),
745+
CreateRequestStatsStub(),
746+
CreateVolumeStatsStub(),
747+
CreateServerStatsStub(),
748+
service,
749+
CreateCellManagerStub(),
750+
CreateDefaultStorageProvider(service),
751+
encryptionClientFactory,
752+
executor,
753+
TSessionManagerOptions());
754+
755+
756+
executor->Start();
757+
Y_DEFER {
758+
executor->Stop();
759+
};
760+
761+
NProto::TStartEndpointRequest request;
762+
request.SetUnixSocketPath(socketPath);
763+
request.SetDiskId(diskId);
764+
request.SetClientId("testClientId");
765+
request.SetIpcType(NProto::IPC_VHOST);
766+
767+
// Create session to testDiskId
768+
NClient::ISessionPtr session;
769+
{
770+
auto future = sessionManager->CreateSession(
771+
MakeIntrusive<TCallContext>(),
772+
request);
773+
774+
auto sessionOrError = future.GetValue(TDuration::Seconds(3));
775+
UNIT_ASSERT_C(!HasError(sessionOrError), sessionOrError.GetError());
776+
auto sessionInfo = sessionOrError.ExtractResult();
777+
session = sessionInfo.Session;
778+
}
779+
780+
// Switch from testDiskId to testDiskId-copy.
781+
782+
{
783+
// Make sure that the read response processed by testDiskId-copy
784+
// that responds with S_OK.
785+
auto future = session->ReadBlocksLocal(
786+
MakeIntrusive<TCallContext>(),
787+
std::make_shared<NProto::TReadBlocksLocalRequest>());
788+
auto response = future.GetValue(TDuration::Seconds(1));
789+
UNIT_ASSERT_VALUES_EQUAL_C(
790+
S_OK,
791+
response.GetError().GetCode(),
792+
FormatError(response.GetError()));
793+
}
794+
795+
// Switch from testDiskId-copy to testDiskId.
796+
797+
// Setting up the handler for the mount request, which will add the
798+
// PrincipalDiskId when mounting testDiskId-copy, which will lead to
799+
// switch session back to testDiskId.
800+
service->MountVolumeHandler =
801+
[&](std::shared_ptr<NProto::TMountVolumeRequest> request)
802+
{
803+
NProto::TMountVolumeResponse response;
804+
response.MutableVolume()->SetDiskId(request->GetDiskId());
805+
if (request->GetDiskId() == NStorage::GetNextDiskId(diskId)) {
806+
// Response with filled PrincipalDiskId will switch session.
807+
response.MutableVolume()->SetPrincipalDiskId(diskId);
808+
}
809+
response.SetInactiveClientsTimeout(100);
810+
return MakeFuture(response);
811+
};
812+
813+
// Setting up the handler to respond E_REJECTED if the request handled
814+
// by testDiskId and S_OK if the request handled by disk testDiskId-copy
815+
service->ReadBlocksLocalHandler =
816+
[&](std::shared_ptr<NProto::TReadBlocksLocalRequest> request)
817+
-> TFuture<NProto::TReadBlocksLocalResponse>
818+
{
819+
if (request->GetDiskId() == diskId) {
820+
return MakeFuture<NProto::TReadBlocksLocalResponse>(
821+
TErrorResponse(S_OK));
822+
}
823+
if (request->GetDiskId() == NStorage::GetNextDiskId(diskId)) {
824+
return MakeFuture<NProto::TReadBlocksLocalResponse>(
825+
TErrorResponse(E_REJECTED));
826+
}
827+
return MakeFuture<NProto::TReadBlocksLocalResponse>(
828+
TErrorResponse(E_ARGUMENT, "Unexpected disk id"));
829+
};
830+
831+
{
832+
// Make sure that the read response processed by testDiskId-copy
833+
// that responds with S_OK.
834+
auto future = session->ReadBlocksLocal(
835+
MakeIntrusive<TCallContext>(),
836+
std::make_shared<NProto::TReadBlocksLocalRequest>());
837+
auto response = future.GetValue(TDuration::Seconds(1));
838+
UNIT_ASSERT_VALUES_EQUAL_C(
839+
S_OK,
840+
response.GetError().GetCode(),
841+
FormatError(response.GetError()));
842+
}
843+
}
485844
}
486845

487846
} // namespace NCloud::NBlockStore::NServer

cloud/blockstore/tools/testing/loadtest/lib/compare_data_action_runner.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,8 +77,8 @@ int TCompareDataActionRunner::Run(
7777
VolumeStats,
7878
TestContext.Client,
7979
ClientConfig,
80-
sessionConfig
81-
);
80+
sessionConfig,
81+
NClient::ISessionSwitcherWeakPtr());
8282
STORAGE_INFO("Mount volume: " << volumeName);
8383
auto response = WaitForCompletion(
8484
"MountVolume",

cloud/blockstore/tools/testing/loadtest/lib/load_test_runner.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -333,7 +333,8 @@ void TLoadTestRunner::SetupTest(
333333
VolumeStats,
334334
TestContext.DataClient,
335335
ClientConfig,
336-
sessionConfig);
336+
sessionConfig,
337+
ISessionSwitcherWeakPtr());
337338

338339
STORAGE_INFO("Mount volume: " << volumeName);
339340
auto response = WaitForCompletion(

0 commit comments

Comments
 (0)