diff --git a/cloud/storage/core/libs/throttling/tablet_throttler.cpp b/cloud/storage/core/libs/throttling/tablet_throttler.cpp index 94f28f3d4cf..ed1d7bd403e 100644 --- a/cloud/storage/core/libs/throttling/tablet_throttler.cpp +++ b/cloud/storage/core/libs/throttling/tablet_throttler.cpp @@ -62,6 +62,10 @@ class TTabletThrottler final void OnShutDown(const NActors::TActorContext&) override { + if (PostponedQueueFlushInProgress) { + // We are already in the process of flushing postponed requests + return; + } PostponedQueueFlushScheduled = false; while (PostponedRequests.size()) { @@ -71,6 +75,8 @@ class TTabletThrottler final TAutoPtr ev = x.Event.release(); Owner.Receive(ev); + // When shutting down, we do not expect that the actor will try to + // schedule flushing again Y_ABORT_UNLESS(!PostponedQueueFlushScheduled); PostponedRequests.pop_front(); } diff --git a/cloud/storage/core/libs/throttling/tablet_throttler_ut.cpp b/cloud/storage/core/libs/throttling/tablet_throttler_ut.cpp new file mode 100644 index 00000000000..9bfc4550592 --- /dev/null +++ b/cloud/storage/core/libs/throttling/tablet_throttler_ut.cpp @@ -0,0 +1,200 @@ +#include "tablet_throttler.h" + +#include "tablet_throttler_logger.h" +#include "tablet_throttler_policy.h" + +#include +#include + +#include +#include +#include + +#include + +namespace NCloud { + +//////////////////////////////////////////////////////////////////////////////// + +namespace { + +//////////////////////////////////////////////////////////////////////////////// + +using namespace NActors; + +class TSampleActorWithThrottler final: public TActor +{ +public: + TSampleActorWithThrottler() + : TActor(&TThis::StateWork) + {} + + void ResetThrottler(ITabletThrottlerPtr throttler) + { + Throttler = std::move(throttler); + } + + STRICT_STFUNC( + StateWork, HFunc(NActors::TEvents::TEvWakeup, HandleWakeUp); + HFunc(NActors::TEvents::TEvFlushLog, HandleFlush)); + + STRICT_STFUNC( + StateZombie, HFunc(NActors::TEvents::TEvWakeup, RejectRequest);) + + void HandleWakeUp( + const NActors::TEvents::TEvWakeup::TPtr& ev, + const NActors::TActorContext& ctx) + { + if (RequestsCount++ == 0) { + // The first request should be postponed + + auto callContext = + MakeIntrusive(static_cast(0)); + auto requestInfo = TThrottlingRequestInfo{}; + + Throttler->Throttle( + ctx, + callContext, + requestInfo, + [ev]() -> NActors::IEventHandlePtr + { return NActors::IEventHandlePtr(ev.Release()); }, + "TestMethod"); + } else { + Become(&TThis::StateZombie); + + Throttler->OnShutDown(ctx); + } + } + + void HandleFlush( + const NActors::TEvents::TEvFlushLog::TPtr& ev, + const NActors::TActorContext& ctx) + { + Y_UNUSED(ev); + Throttler->StartFlushing(ctx); + } + + static void RejectRequest( + const NActors::TEvents::TEvWakeup::TPtr& ev, + const NActors::TActorContext& ctx) + { + auto response = std::make_unique(); + NCloud::Reply(ctx, *ev, std::move(response)); + } + +private: + ITabletThrottlerPtr Throttler; + + ui64 RequestsCount = 0; +}; + +///////////////////////////////////////////////////////////////////////////// + +struct TTabletThrottlerLoggerStub: public ITabletThrottlerLogger +{ + void LogRequestPostponedBeforeSchedule( + const NActors::TActorContext& ctx, + TCallContextBase& callContext, + TDuration delay, + const char* methodName) const override + { + Y_UNUSED(ctx, callContext, delay, methodName); + } + + void LogRequestPostponedAfterSchedule( + const NActors::TActorContext& ctx, + TCallContextBase& callContext, + ui32 postponedCount, + const char* methodName) const override + { + Y_UNUSED(ctx, callContext, postponedCount, methodName); + } + + void LogRequestAdvanced( + const NActors::TActorContext& ctx, + TCallContextBase& callContext, + const char* methodName, + ui32 opType, + TDuration delay) const override + { + Y_UNUSED(ctx, callContext, methodName, opType, delay); + } +}; + +////////////////////////////////////////////////////////////////////////////// + +struct TTabletThrottlerPolicyAlwaysPostpone: public ITabletThrottlerPolicy +{ + bool TryPostpone( + TInstant ts, + const TThrottlingRequestInfo& requestInfo) override + { + Y_UNUSED(ts, requestInfo); + return true; + } + + TMaybe SuggestDelay( + TInstant ts, + TDuration queueTime, + const TThrottlingRequestInfo& requestInfo) override + { + Y_UNUSED(ts, queueTime, requestInfo); + return TDuration::Seconds(1); + } + + void OnPostponedEvent( + TInstant ts, + const TThrottlingRequestInfo& requestInfo) override + { + Y_UNUSED(ts, requestInfo); + } +}; + +} // namespace + +Y_UNIT_TEST_SUITE(TTabletThrottlerTest) +{ + /** + * Scenario that caused a crash: + * 1. A request is present in the postponed queue + * 2. Throttler flush is initiated + * 3. During the processing of the postponed queue, an actor shutdown is + * initiated + * 4. During the shutdown, Throttle->OnShutDown is called. It is not + * supposed to process the request that was initially in the postponed + * queue the second time. + */ + Y_UNIT_TEST(ShouldNotFlushNullEventOnShutdown) + { + TTabletThrottlerLoggerStub logger; + TTabletThrottlerPolicyAlwaysPostpone policy; + + std::unique_ptr actor = + std::make_unique(); + auto throttler = CreateTabletThrottler(*actor, logger, policy); + actor->ResetThrottler(std::move(throttler)); + + TTestActorRuntimeBase runtime; + runtime.Initialize(); + + auto senderId = runtime.AllocateEdgeActor(); + + auto actorId = runtime.Register(actor.release()); + + // One request is postponed + runtime.Send( + TAutoPtr(new IEventHandle( + actorId, + senderId, + new NActors::TEvents::TEvWakeup()))); + + // Flush is initiated + runtime.Send( + TAutoPtr(new IEventHandle( + actorId, + senderId, + new NActors::TEvents::TEvFlushLog()))); + } +} + +} // namespace NCloud diff --git a/cloud/storage/core/libs/throttling/ut/ya.make b/cloud/storage/core/libs/throttling/ut/ya.make index db19a6ce765..91c13ddfcd2 100644 --- a/cloud/storage/core/libs/throttling/ut/ya.make +++ b/cloud/storage/core/libs/throttling/ut/ya.make @@ -1,8 +1,19 @@ UNITTEST_FOR(cloud/storage/core/libs/throttling) +INCLUDE(${ARCADIA_ROOT}/cloud/storage/core/tests/recipes/small.inc) + +SPLIT_FACTOR(1) + SRCS( helpers_ut.cpp leaky_bucket_ut.cpp + tablet_throttler_ut.cpp +) + +PEERDIR( + contrib/ydb/core/testlib + contrib/ydb/core/testlib/basics + contrib/ydb/core/testlib/default ) END()