Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

#include <util/random/random.h>

#include <utility>

namespace NCloud::NBlockStore::NServer {

namespace {
Expand Down Expand Up @@ -162,9 +164,15 @@ class TChaosStorage final: public IStorage
if constexpr (IsExactlyWriteRequest<TRequest>) {
damageData = TryChaosLuck(ChaosConfig.GetDataDamageProbability());
if (damageData) {
auto& buffers = *request->MutableBlocks()->MutableBuffers();
for (auto& block: buffers) {
ui64 trash = RandomNumber<ui64>();
TGuardedSgList::TGuard guard = request->Sglist.Acquire();
if (!guard) {
return NThreading::MakeFuture<TResponse>(TErrorResponse(
E_CANCELLED,
"failed to acquire sglist in ChaosStorage"));
}
const auto& buffers = guard.Get();
for (const auto& block: buffers) {
const ui64 trash = RandomNumber<ui64>();
Y_ABORT_UNLESS(block.size() >= sizeof(trash));
memcpy(
const_cast<char*>(block.data()),
Expand Down Expand Up @@ -211,11 +219,19 @@ class TChaosStorage final: public IStorage
std::move(request));

return future.Apply(
[reply = std::move(reply)](NThreading::TFuture<TResponse> f)
[damageData,
reply = std::move(reply)](NThreading::TFuture<TResponse> f)
-> NThreading::TFuture<TResponse>
{
auto originalResponse = f.ExtractValue();
if (HasError(originalResponse)) {
if (damageData) {
NProto::TError* error = originalResponse.MutableError();
error->MutableMessage()->append(
TStringBuilder()
<< "(The data were damaged by chaos!)");
}

return NThreading::MakeFuture<TResponse>(
std::move(originalResponse));
}
Expand All @@ -234,10 +250,10 @@ class TChaosStorageProvider final: public IStorageProvider

public:
TChaosStorageProvider(
IStorageProviderPtr storageProvider,
const NProto::TChaosConfig& chaosConfig)
IStorageProviderPtr storageProvider,
NProto::TChaosConfig chaosConfig)
: StorageProvider(std::move(storageProvider))
, ChaosConfig(chaosConfig)
, ChaosConfig(std::move(chaosConfig))
{}

NThreading::TFuture<IStoragePtr> CreateStorage(
Expand Down Expand Up @@ -265,11 +281,11 @@ class TChaosStorageProvider final: public IStorageProvider

// Factory function: constructs a TChaosStorageProvider from the supplied
// underlying storage provider and chaos configuration (via
// std::make_shared) and returns it as an IStorageProviderPtr.
// storageProvider is passed on with std::move, so the caller's copy of the
// pointer is given up to the newly created provider.
IStorageProviderPtr CreateChaosStorageProvider(
IStorageProviderPtr storageProvider,
const NProto::TChaosConfig& chaosConfig)
NProto::TChaosConfig chaosConfig)
{
return std::make_shared<TChaosStorageProvider>(
std::move(storageProvider),
chaosConfig);
std::move(chaosConfig));
}

} // namespace NCloud::NBlockStore::NServer
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,6 @@ namespace NCloud::NBlockStore::NServer {

IStorageProviderPtr CreateChaosStorageProvider(
IStorageProviderPtr storageProvider,
const NProto::TChaosConfig& chaosConfig);
NProto::TChaosConfig chaosConfig);

} // namespace NCloud::NBlockStore::NServer
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,10 @@ Y_UNIT_TEST_SUITE(TChaosStorageProviderTest)
auto request = std::make_shared<NProto::TWriteBlocksLocalRequest>();
const TString data(8, 'a');
request->MutableBlocks()->AddBuffers(data.c_str(), data.size());
TSgList sglist = {
{request->GetBlocks().GetBuffers(0).data(),
request->GetBlocks().GetBuffers(0).size()}};
request->Sglist = TGuardedSgList(std::move(sglist));

auto response =
chaosStorage
Expand Down Expand Up @@ -246,6 +250,10 @@ Y_UNIT_TEST_SUITE(TChaosStorageProviderTest)
auto request = std::make_shared<NProto::TWriteBlocksLocalRequest>();
const TString data(8, 'a');
request->MutableBlocks()->AddBuffers(data.c_str(), data.size());
TSgList sglist = {
{request->GetBlocks().GetBuffers(0).data(),
request->GetBlocks().GetBuffers(0).size()}};
request->Sglist = TGuardedSgList(std::move(sglist));

auto response =
chaosStorage
Expand Down Expand Up @@ -334,6 +342,10 @@ Y_UNIT_TEST_SUITE(TChaosStorageProviderTest)
auto request = std::make_shared<NProto::TWriteBlocksLocalRequest>();
const TString data(8, 'a');
request->MutableBlocks()->AddBuffers(data.c_str(), data.size());
TSgList sglist = {
{request->GetBlocks().GetBuffers(0).data(),
request->GetBlocks().GetBuffers(0).size()}};
request->Sglist = TGuardedSgList(std::move(sglist));

auto response =
chaosStorage
Expand Down
125 changes: 88 additions & 37 deletions cloud/blockstore/tests/loadtest/local-data-integrity/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from enum import Enum

from cloud.blockstore.config.client_pb2 import TClientAppConfig, TClientConfig
from cloud.blockstore.config.disk_pb2 import TDiskAgentConfig
from cloud.blockstore.config.disk_pb2 import TDiskAgentConfig, TChaosConfig
from cloud.blockstore.config.server_pb2 import TServerConfig, TServerAppConfig, \
TKikimrServiceConfig, TChecksumFlags
from cloud.blockstore.config.storage_pb2 import TStorageServiceConfig
Expand Down Expand Up @@ -33,6 +33,7 @@
# Data-integrity validation mode of a test case. The members are used as keys
# of the validation_rules table in __check_data_integrity_counters, which
# decides which counter set ("direct" or "copied") is treated as active and
# what each set is expected to contain.
class ValidationMode(Enum):
# The direct counters are the active set.
DIRECT = 0
# The copied counters are the active set.
COPIED = 1
# The copied counters are the active set, while the direct counters are
# still expected to record requests; used by the "mirror2-chaos" test case.
DIRECT_TO_COPIED = 2


class TestCase(object):
Expand All @@ -45,12 +46,14 @@ def __init__(
storage_media_kind,
agent_count=1,
validation_mode=ValidationMode.DIRECT,
enable_chaos=False,
use_rdma=False):
self.name = name
self.config_path = config_path
self.storage_media_kind = storage_media_kind
self.agent_count = agent_count
self.validation_mode = validation_mode
self.enable_chaos = enable_chaos
self.use_rdma = use_rdma


Expand Down Expand Up @@ -88,6 +91,14 @@ def __init__(
agent_count=2,
validation_mode=ValidationMode.COPIED,
),
TestCase(
"mirror2-chaos",
"cloud/blockstore/tests/loadtest/local-data-integrity/local-mirror2.txt",
storage_media_kind=STORAGE_MEDIA_SSD_MIRROR2,
agent_count=2,
validation_mode=ValidationMode.DIRECT_TO_COPIED,
enable_chaos=True,
),
]


Expand Down Expand Up @@ -187,43 +198,77 @@ def __check_data_integrity_counters(test_case, mon_port):
# Define validation rules for each mode
validation_rules = {
ValidationMode.DIRECT: {
'active': direct_counters,
'inactive': copied_counters,
'active_should_have_requests': True,
'active_should_have_clients': True,
'inactive_should_have_requests': False,
'inactive_should_have_clients': False
'active': {
'mode': ValidationMode.DIRECT,
'counters': direct_counters,
'should_have_requests': True,
'should_have_clients': True,
'should_have_mismatches': False,
},
'inactive': {
'mode': ValidationMode.COPIED,
'counters': copied_counters,
'should_have_requests': False,
'should_have_clients': False,
'should_have_mismatches': False,
}
},
ValidationMode.COPIED: {
'active': copied_counters,
'inactive': direct_counters,
'active_should_have_requests': False,
'active_should_have_clients': True,
'inactive_should_have_requests': False,
'inactive_should_have_clients': False
'active': {
'mode': ValidationMode.COPIED,
'counters': copied_counters,
'should_have_requests': True,
'should_have_clients': True,
'should_have_mismatches': False,
},
'inactive': {
'mode': ValidationMode.DIRECT,
'counters': direct_counters,
'should_have_requests': False,
'should_have_clients': False,
'should_have_mismatches': False,
}
},
ValidationMode.DIRECT_TO_COPIED: {
'active': {
'mode': ValidationMode.COPIED,
'counters': copied_counters,
'should_have_requests': True,
'should_have_clients': True,
'should_have_mismatches': True,
},
'inactive': {
'mode': ValidationMode.DIRECT,
'counters': direct_counters,
'should_have_requests': True,
'should_have_clients': False,
'should_have_mismatches': True,
}
}
}

rules = validation_rules[test_case.validation_mode]

# Validate active mode counters
__validate_counters(
rules['active'],
mode_name=test_case.validation_mode.name,
should_have_requests=rules['active_should_have_requests'],
should_have_clients=rules['active_should_have_clients']
rules['active']['counters'],
mode_name=rules['active']['mode'].name,
should_have_requests=rules['active']['should_have_requests'],
should_have_clients=rules['active']['should_have_clients'],
should_have_mismatches=rules['active']['should_have_mismatches']
)

# Validate inactive mode counters
__validate_counters(
rules['inactive'],
mode_name=__get_opposite_mode(test_case.validation_mode).name,
should_have_requests=rules['inactive_should_have_requests'],
should_have_clients=rules['inactive_should_have_clients']
rules['inactive']['counters'],
mode_name=rules['inactive']['mode'].name,
should_have_requests=rules['inactive']['should_have_requests'],
should_have_clients=rules['inactive']['should_have_clients'],
should_have_mismatches=rules['inactive']['should_have_mismatches']
)


def __validate_counters(counters, mode_name, should_have_requests, should_have_clients):
def __validate_counters(counters, mode_name, should_have_requests, should_have_clients, should_have_mismatches):
read_requests = counters["read_local_requests"]
write_requests = counters["write_local_requests"]
read_mismatches = counters["read_checksum_mismatch"]
Expand All @@ -232,7 +277,7 @@ def __validate_counters(counters, mode_name, should_have_requests, should_have_c

# Validate request counts
if should_have_requests:
if read_requests == 0 or write_requests == 0:
if read_requests == 0 and write_requests == 0:
raise Exception(
f"{mode_name} mode should have requests: "
f"read_requests={read_requests}, write_requests={write_requests}"
Expand All @@ -244,12 +289,19 @@ def __validate_counters(counters, mode_name, should_have_requests, should_have_c
f"read_requests={read_requests}, write_requests={write_requests}"
)

# Validate checksum mismatches (should always be 0)
if read_mismatches != 0 or write_mismatches != 0:
raise Exception(
f"Checksum mismatches in {mode_name} mode: "
f"read_mismatches={read_mismatches}, write_mismatches={write_mismatches}"
)
# Validate mismatch counts
if should_have_mismatches:
if read_mismatches == 0 and write_mismatches == 0:
raise Exception(
f"{mode_name} mode should have mismatches: "
f"read_mismatches={read_mismatches}, write_mismatches={write_mismatches}"
)
else:
if read_mismatches != 0 or write_mismatches != 0:
raise Exception(
f"{mode_name} mode should not have mismatches: "
f"read_mismatches={read_mismatches}, write_mismatches={write_mismatches}"
)

# Validate client counts
if should_have_clients:
Expand All @@ -260,11 +312,6 @@ def __validate_counters(counters, mode_name, should_have_requests, should_have_c
raise Exception(f"{mode_name} mode shouldn't have {clients} clients")


def __get_opposite_mode(validation_mode):
    """Return COPIED for DIRECT; any other mode maps to DIRECT."""
    if validation_mode == ValidationMode.DIRECT:
        return ValidationMode.COPIED
    return ValidationMode.DIRECT


def __run_test(test_case):
kikimr_binary_path = yatest_common.binary_path("contrib/ydb/apps/ydbd/ydbd")

Expand Down Expand Up @@ -308,12 +355,17 @@ def __run_test(test_case):

try:
if test_case.agent_count > 0:
chaos_config = TChaosConfig()
if test_case.enable_chaos:
chaos_config.ChaosProbability = 0.2
chaos_config.DataDamageProbability = 0.5
setup_nonreplicated(
kikimr_cluster.client,
devices_per_agent,
disk_agent_config_patch=TDiskAgentConfig(
DedicatedDiskAgent=True,
EnableDataIntegrityValidationForDrBasedDisks=True),
EnableDataIntegrityValidationForDrBasedDisks=True,
ChaosConfig=chaos_config),
agent_count=test_case.agent_count,
)

Expand Down Expand Up @@ -425,8 +477,7 @@ def __run_test(test_case):
env_processes=disk_agents + [nbs],
)

if ret == 0:
__check_data_integrity_counters(test_case, nbs.mon_port)
__check_data_integrity_counters(test_case, nbs.mon_port)

finally:
for disk_agent in disk_agents:
Expand Down
Loading