|
12 | 12 | #include <cloud/blockstore/libs/encryption/encryption_key.h> |
13 | 13 | #include <cloud/blockstore/libs/service/service_test.h> |
14 | 14 | #include <cloud/blockstore/libs/service/storage_provider.h> |
| 15 | +#include <cloud/blockstore/libs/storage/model/volume_label.h> |
15 | 16 |
|
16 | 17 | #include <cloud/storage/core/libs/common/scheduler_test.h> |
17 | 18 | #include <cloud/storage/core/libs/common/thread_pool.h> |
@@ -482,6 +483,197 @@ Y_UNIT_TEST_SUITE(TSessionManagerTest) |
482 | 483 | { |
483 | 484 | ShouldDisableClientThrottler(false); |
484 | 485 | } |
| 486 | + |
| 487 | + Y_UNIT_TEST(ShouldSwitchSessionBy_EF_OUTDATED_VOLUME) |
| 488 | + { |
| 489 | + TString socketPath = "testSocket"; |
| 490 | + TString diskId = "testDiskId"; |
| 491 | + |
| 492 | + auto timer = CreateCpuCycleTimer(); |
| 493 | + auto scheduler = CreateScheduler(timer); |
| 494 | + scheduler->Start(); |
| 495 | + |
| 496 | + auto service = std::make_shared<TTestService>(); |
| 497 | + service->DescribeVolumeHandler = |
| 498 | + [&] (std::shared_ptr<NProto::TDescribeVolumeRequest> request) { |
| 499 | + auto response = NProto::TDescribeVolumeResponse(); |
| 500 | + response.MutableVolume()->SetDiskId(request->GetDiskId()); |
| 501 | + return MakeFuture(std::move(response)); |
| 502 | + }; |
| 503 | + service->MountVolumeHandler = |
| 504 | + [&] (std::shared_ptr<NProto::TMountVolumeRequest> request) { |
| 505 | + NProto::TMountVolumeResponse response; |
| 506 | + response.MutableVolume()->SetDiskId(request->GetDiskId()); |
| 507 | + response.SetInactiveClientsTimeout(100); |
| 508 | + return MakeFuture(response); |
| 509 | + }; |
| 510 | + service->UnmountVolumeHandler = |
| 511 | + [&] (std::shared_ptr<NProto::TUnmountVolumeRequest> request) { |
| 512 | + Y_UNUSED(request); |
| 513 | + return MakeFuture(NProto::TUnmountVolumeResponse()); |
| 514 | + }; |
| 515 | + |
| 516 | + // Need to respond to the request asynchronously. |
| 517 | + auto scheduleReplyVolumeOutdated = |
| 518 | + [scheduler]() -> TFuture<NProto::TReadBlocksLocalResponse> |
| 519 | + { |
| 520 | + TPromise<NProto::TReadBlocksLocalResponse> readPromise = |
| 521 | + NewPromise<NProto::TReadBlocksLocalResponse>(); |
| 522 | + TFuture<NProto::TReadBlocksLocalResponse> response = readPromise; |
| 523 | + scheduler->Schedule( |
| 524 | + nullptr, |
| 525 | + TInstant::Now(), |
| 526 | + [readPromise = std::move(readPromise)]() mutable |
| 527 | + { |
| 528 | + // We set the E_BS_INVALID_SESSION flag to give a signal to |
| 529 | + // switch the session to a new disk. |
| 530 | + ui32 flags = 0; |
| 531 | + SetProtoFlag(flags, NProto::EF_OUTDATED_VOLUME); |
| 532 | + readPromise.SetValue(TErrorResponse( |
| 533 | + E_BS_INVALID_SESSION, |
| 534 | + "Volume outdated", |
| 535 | + flags)); |
| 536 | + }); |
| 537 | + return response; |
| 538 | + }; |
| 539 | + |
| 540 | + auto executor = TExecutor::Create("TestService"); |
| 541 | + auto logging = CreateLoggingService("console"); |
| 542 | + auto encryptionClientFactory = CreateEncryptionClientFactory( |
| 543 | + logging, |
| 544 | + CreateDefaultEncryptionKeyProvider(), |
| 545 | + NProto::EZP_WRITE_ENCRYPTED_ZEROS); |
| 546 | + |
| 547 | + auto sessionManager = CreateSessionManager( |
| 548 | + CreateWallClockTimer(), |
| 549 | + scheduler, |
| 550 | + logging, |
| 551 | + CreateMonitoringServiceStub(), |
| 552 | + CreateRequestStatsStub(), |
| 553 | + CreateVolumeStatsStub(), |
| 554 | + CreateServerStatsStub(), |
| 555 | + service, |
| 556 | + CreateCellManagerStub(), |
| 557 | + CreateDefaultStorageProvider(service), |
| 558 | + encryptionClientFactory, |
| 559 | + executor, |
| 560 | + TSessionManagerOptions()); |
| 561 | + |
| 562 | + |
| 563 | + executor->Start(); |
| 564 | + Y_DEFER { |
| 565 | + executor->Stop(); |
| 566 | + }; |
| 567 | + |
| 568 | + NProto::TStartEndpointRequest request; |
| 569 | + request.SetUnixSocketPath(socketPath); |
| 570 | + request.SetDiskId(diskId); |
| 571 | + request.SetClientId("testClientId"); |
| 572 | + request.SetIpcType(NProto::IPC_VHOST); |
| 573 | + |
| 574 | + // Create session to testDiskId |
| 575 | + NClient::ISessionPtr session; |
| 576 | + { |
| 577 | + auto future = sessionManager->CreateSession( |
| 578 | + MakeIntrusive<TCallContext>(), |
| 579 | + request); |
| 580 | + |
| 581 | + auto sessionOrError = future.GetValue(TDuration::Seconds(3)); |
| 582 | + UNIT_ASSERT_C(!HasError(sessionOrError), sessionOrError.GetError()); |
| 583 | + auto sessionInfo = sessionOrError.ExtractResult(); |
| 584 | + session = sessionInfo.Session; |
| 585 | + } |
| 586 | + |
| 587 | + // Setup read responses for switching from testDiskId to |
| 588 | + // testDiskId-copy. |
| 589 | + service->ReadBlocksLocalHandler = |
| 590 | + [&](std::shared_ptr<NProto::TReadBlocksLocalRequest> request) |
| 591 | + -> TFuture<NProto::TReadBlocksLocalResponse> |
| 592 | + { |
| 593 | + if (request->GetDiskId() == diskId) { |
| 594 | + // Response with EF_OUTDATED_VOLUME will switch session. |
| 595 | + return scheduleReplyVolumeOutdated(); |
| 596 | + } |
| 597 | + |
| 598 | + // After switching the session, reading from the testDiskId-copy |
| 599 | + // disk will return S_OK. |
| 600 | + UNIT_ASSERT_VALUES_EQUAL_C( |
| 601 | + NStorage::GetNextDiskId(diskId), |
| 602 | + request->GetDiskId(), |
| 603 | + request->GetDiskId()); |
| 604 | + |
| 605 | + return MakeFuture<NProto::TReadBlocksLocalResponse>( |
| 606 | + TErrorResponse(E_ARGUMENT)); |
| 607 | + }; |
| 608 | + |
| 609 | + { |
| 610 | + // Check that the read response is E_ARGUMENT. |
| 611 | + auto future = session->ReadBlocksLocal( |
| 612 | + MakeIntrusive<TCallContext>(), |
| 613 | + std::make_shared<NProto::TReadBlocksLocalRequest>()); |
| 614 | + auto response = future.GetValue(TDuration::Seconds(1)); |
| 615 | + UNIT_ASSERT_VALUES_EQUAL_C( |
| 616 | + E_ARGUMENT, |
| 617 | + response.GetError().GetCode(), |
| 618 | + FormatError(response.GetError())); |
| 619 | + } |
| 620 | + { |
| 621 | + // Check that the reading after switching. |
| 622 | + auto future = session->ReadBlocksLocal( |
| 623 | + MakeIntrusive<TCallContext>(), |
| 624 | + std::make_shared<NProto::TReadBlocksLocalRequest>()); |
| 625 | + auto response = future.GetValue(TDuration::Seconds(1)); |
| 626 | + UNIT_ASSERT_VALUES_EQUAL_C( |
| 627 | + E_ARGUMENT, |
| 628 | + response.GetError().GetCode(), |
| 629 | + FormatError(response.GetError())); |
| 630 | + } |
| 631 | + |
| 632 | + // Setup read responses for switching from testDiskId-copy to |
| 633 | + // testDiskId. |
| 634 | + service->ReadBlocksLocalHandler = |
| 635 | + [&](std::shared_ptr<NProto::TReadBlocksLocalRequest> request) |
| 636 | + -> TFuture<NProto::TReadBlocksLocalResponse> |
| 637 | + { |
| 638 | + if (request->GetDiskId() == NStorage::GetNextDiskId(diskId)) { |
| 639 | + // Response with EF_OUTDATED_VOLUME will switch session. |
| 640 | + return scheduleReplyVolumeOutdated(); |
| 641 | + } |
| 642 | + |
| 643 | + // After switching the session, reading from the testDiskId-copy |
| 644 | + // disk will return S_OK. |
| 645 | + UNIT_ASSERT_VALUES_EQUAL_C( |
| 646 | + diskId, |
| 647 | + request->GetDiskId(), |
| 648 | + request->GetDiskId()); |
| 649 | + |
| 650 | + return MakeFuture<NProto::TReadBlocksLocalResponse>( |
| 651 | + TErrorResponse(S_OK)); |
| 652 | + }; |
| 653 | + |
| 654 | + { |
| 655 | + // We check that the reading leading to the switch is successful. |
| 656 | + auto future = session->ReadBlocksLocal( |
| 657 | + MakeIntrusive<TCallContext>(), |
| 658 | + std::make_shared<NProto::TReadBlocksLocalRequest>()); |
| 659 | + auto response = future.GetValue(TDuration::Seconds(1)); |
| 660 | + UNIT_ASSERT_VALUES_EQUAL_C( |
| 661 | + S_OK, |
| 662 | + response.GetError().GetCode(), |
| 663 | + FormatError(response.GetError())); |
| 664 | + } |
| 665 | + { |
| 666 | + // Check that the reading is successful after switching. |
| 667 | + auto future = session->ReadBlocksLocal( |
| 668 | + MakeIntrusive<TCallContext>(), |
| 669 | + std::make_shared<NProto::TReadBlocksLocalRequest>()); |
| 670 | + auto response = future.GetValue(TDuration::Seconds(1)); |
| 671 | + UNIT_ASSERT_VALUES_EQUAL_C( |
| 672 | + S_OK, |
| 673 | + response.GetError().GetCode(), |
| 674 | + FormatError(response.GetError())); |
| 675 | + } |
| 676 | + } |
485 | 677 | } |
486 | 678 |
|
487 | 679 | } // namespace NCloud::NBlockStore::NServer |
0 commit comments