Skip to content

Commit 8d30b72

Browse files
GrigoriyPA and asmyasnikov
authored and committed
YQ-3926 RD added parallel purecalc compilation (ydb-platform#12505)
1 parent ba3cff7 commit 8d30b72

File tree

9 files changed

+198
-26
lines changed

9 files changed

+198
-26
lines changed

ydb/core/fq/libs/config/protos/row_dispatcher.proto

+5
Original file line number | Diff line number | Diff line change
@@ -25,12 +25,17 @@ message TJsonParserConfig {
2525
uint64 BufferCellCount = 3; // (number rows) * (number columns) limit, default 10^6
2626
}
2727

28+
message TCompileServiceConfig {
29+
uint64 ParallelCompilationLimit = 1; // 1 by default
30+
}
31+
2832
message TRowDispatcherConfig {
2933
bool Enabled = 1;
3034
uint64 TimeoutBeforeStartSessionSec = 2;
3135
uint64 SendStatusPeriodSec = 3;
3236
uint64 MaxSessionUsedMemory = 4;
3337
bool WithoutConsumer = 5;
3438
TJsonParserConfig JsonParser = 7;
39+
TCompileServiceConfig CompileService = 8;
3540
TRowDispatcherCoordinatorConfig Coordinator = 6;
3641
}

ydb/core/fq/libs/row_dispatcher/events/data_plane.h

+3
Original file line number | Diff line number | Diff line change
@@ -51,6 +51,7 @@ struct TEvRowDispatcher {
5151
EvGetInternalStateResponse,
5252
EvPurecalcCompileRequest,
5353
EvPurecalcCompileResponse,
54+
EvPurecalcCompileAbort,
5455
EvEnd,
5556
};
5657

@@ -197,6 +198,8 @@ struct TEvRowDispatcher {
197198
NYql::NDqProto::StatusIds::StatusCode Status;
198199
NYql::TIssues Issues;
199200
};
201+
202+
struct TEvPurecalcCompileAbort : public NActors::TEventLocal<TEvPurecalcCompileAbort, EEv::EvPurecalcCompileAbort> {};
200203
};
201204

202205
} // namespace NFq

ydb/core/fq/libs/row_dispatcher/format_handler/filters/filters_set.cpp

+21-2
Original file line number | Diff line number | Diff line change
@@ -14,7 +14,7 @@ class TTopicFilters : public ITopicFilters {
1414
NMonitoring::TDynamicCounters::TCounterPtr InFlightCompileRequests;
1515
NMonitoring::TDynamicCounters::TCounterPtr CompileErrors;
1616

17-
TCounters(NMonitoring::TDynamicCounterPtr counters)
17+
explicit TCounters(NMonitoring::TDynamicCounterPtr counters)
1818
: Counters(counters)
1919
{
2020
Register();
@@ -83,6 +83,18 @@ class TTopicFilters : public ITopicFilters {
8383
NActors::TActivationContext::ActorSystem()->Send(new NActors::IEventHandle(Self.Config.CompileServiceId, Self.Owner, PurecalcFilter->GetCompileRequest().release(), 0, InFlightCompilationId));
8484
}
8585

86+
void AbortCompilation() {
87+
if (!InFlightCompilationId) {
88+
return;
89+
}
90+
91+
LOG_ROW_DISPATCHER_TRACE("Send abort compile request with id " << InFlightCompilationId);
92+
NActors::TActivationContext::ActorSystem()->Send(new NActors::IEventHandle(Self.Config.CompileServiceId, Self.Owner, new TEvRowDispatcher::TEvPurecalcCompileAbort(), 0, InFlightCompilationId));
93+
94+
InFlightCompilationId = 0;
95+
Self.Counters.InFlightCompileRequests->Dec();
96+
}
97+
8698
void OnCompileResponse(TEvRowDispatcher::TEvPurecalcCompileResponse::TPtr ev) {
8799
if (ev->Cookie != InFlightCompilationId) {
88100
LOG_ROW_DISPATCHER_DEBUG("Outdated compiler response ignored for id " << ev->Cookie << ", current compile id " << InFlightCompilationId);
@@ -205,7 +217,14 @@ class TTopicFilters : public ITopicFilters {
205217

206218
void RemoveFilter(NActors::TActorId filterId) override {
207219
LOG_ROW_DISPATCHER_TRACE("Remove filter with id " << filterId);
208-
Filters.erase(filterId);
220+
221+
const auto it = Filters.find(filterId);
222+
if (it == Filters.end()) {
223+
return;
224+
}
225+
226+
it->second.AbortCompilation();
227+
Filters.erase(it);
209228
}
210229

211230
TFiltersStatistic GetStatistics() override {

ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.cpp

+5-2
Original file line number | Diff line number | Diff line change
@@ -333,8 +333,11 @@ class TTopicFormatHandler : public NActors::TActor<TTopicFormatHandler>, public
333333
}
334334

335335
void Handle(NActors::TEvents::TEvPoison::TPtr&) {
336-
with_lock(Alloc) {
337-
Clients.clear();
336+
if (Filters) {
337+
for (const auto& [clientId, _] : Clients) {
338+
Filters->RemoveFilter(clientId);
339+
}
340+
Filters.Reset();
338341
}
339342
PassAway();
340343
}

ydb/core/fq/libs/row_dispatcher/format_handler/ut/topic_filter_ut.cpp

+1-1
Original file line number | Diff line number | Diff line change
@@ -92,7 +92,7 @@ class TFiterFixture : public TBaseFixture {
9292
virtual void SetUp(NUnitTest::TTestContext& ctx) override {
9393
TBase::SetUp(ctx);
9494

95-
CompileServiceActorId = Runtime.Register(CreatePurecalcCompileService());
95+
CompileServiceActorId = Runtime.Register(CreatePurecalcCompileService({}, MakeIntrusive<NMonitoring::TDynamicCounters>()));
9696
}
9797

9898
virtual void TearDown(NUnitTest::TTestContext& ctx) override {

ydb/core/fq/libs/row_dispatcher/purecalc_compilation/compile_service.cpp

+158-19
Original file line number | Diff line number | Diff line change
@@ -4,6 +4,7 @@
44
#include <ydb/core/fq/libs/row_dispatcher/events/data_plane.h>
55
#include <ydb/core/fq/libs/row_dispatcher/format_handler/common/common.h>
66

7+
#include <ydb/library/actors/core/actor_bootstrapped.h>
78
#include <ydb/library/actors/core/hfunc.h>
89

910
#include <yql/essentials/public/purecalc/common/interface.h>
@@ -12,26 +13,49 @@ namespace NFq::NRowDispatcher {
1213

1314
namespace {
1415

15-
class TPurecalcCompileService : public NActors::TActor<TPurecalcCompileService> {
16-
using TBase = NActors::TActor<TPurecalcCompileService>;
16+
struct TEvPrivate {
17+
// Event ids
18+
enum EEv : ui32 {
19+
EvCompileFinished = EventSpaceBegin(NActors::TEvents::ES_PRIVATE),
20+
EvEnd
21+
};
22+
23+
static_assert(EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE), "expect EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE)");
24+
25+
// Events
26+
struct TEvCompileFinished : public NActors::TEventLocal<TEvCompileFinished, EvCompileFinished> {
27+
TEvCompileFinished(NActors::TActorId requestActor, ui64 requestId)
28+
: RequestActor(requestActor)
29+
, RequestId(requestId)
30+
{}
31+
32+
const NActors::TActorId RequestActor;
33+
const ui64 RequestId;
34+
};
35+
};
1736

37+
class TPurecalcCompileActor : public NActors::TActorBootstrapped<TPurecalcCompileActor> {
1838
public:
19-
TPurecalcCompileService()
20-
: TBase(&TPurecalcCompileService::StateFunc)
21-
, LogPrefix("TPurecalcCompileService: ")
39+
TPurecalcCompileActor(NActors::TActorId owner, NYql::NPureCalc::IProgramFactoryPtr factory, TEvRowDispatcher::TEvPurecalcCompileRequest::TPtr request)
40+
: Owner(owner)
41+
, Factory(factory)
42+
, LogPrefix(TStringBuilder() << "TPurecalcCompileActor " << request->Sender << " [id " << request->Cookie << "]: ")
43+
, Request(std::move(request))
2244
{}
2345

24-
STRICT_STFUNC(StateFunc,
25-
hFunc(TEvRowDispatcher::TEvPurecalcCompileRequest, Handle);
26-
)
46+
static constexpr char ActorName[] = "FQ_ROW_DISPATCHER_COMPILE_ACTOR";
2747

28-
void Handle(TEvRowDispatcher::TEvPurecalcCompileRequest::TPtr& ev) {
29-
LOG_ROW_DISPATCHER_TRACE("Got compile request with id: " << ev->Cookie);
30-
IProgramHolder::TPtr programHolder = std::move(ev->Get()->ProgramHolder);
48+
void Bootstrap() {
49+
Y_DEFER {
50+
Finish();
51+
};
52+
53+
LOG_ROW_DISPATCHER_TRACE("Started compile request");
54+
IProgramHolder::TPtr programHolder = std::move(Request->Get()->ProgramHolder);
3155

3256
TStatus status = TStatus::Success();
3357
try {
34-
programHolder->CreateProgram(GetOrCreateFactory(ev->Get()->Settings));
58+
programHolder->CreateProgram(Factory);
3559
} catch (const NYql::NPureCalc::TCompileError& error) {
3660
status = TStatus::Fail(EStatusId::INTERNAL_ERROR, TStringBuilder() << "Compile issues: " << error.GetIssues())
3761
.AddIssue(TStringBuilder() << "Final yql: " << error.GetYql())
@@ -41,15 +65,122 @@ class TPurecalcCompileService : public NActors::TActor<TPurecalcCompileService>
4165
}
4266

4367
if (status.IsFail()) {
44-
LOG_ROW_DISPATCHER_ERROR("Compilation failed for request with id: " << ev->Cookie);
45-
Send(ev->Sender, new TEvRowDispatcher::TEvPurecalcCompileResponse(status.GetStatus(), status.GetErrorDescription()), 0, ev->Cookie);
68+
LOG_ROW_DISPATCHER_ERROR("Compilation failed for request");
69+
Send(Request->Sender, new TEvRowDispatcher::TEvPurecalcCompileResponse(status.GetStatus(), status.GetErrorDescription()), 0, Request->Cookie);
4670
} else {
47-
LOG_ROW_DISPATCHER_TRACE("Compilation completed for request with id: " << ev->Cookie);
48-
Send(ev->Sender, new TEvRowDispatcher::TEvPurecalcCompileResponse(std::move(programHolder)), 0, ev->Cookie);
71+
LOG_ROW_DISPATCHER_TRACE("Compilation completed for request");
72+
Send(Request->Sender, new TEvRowDispatcher::TEvPurecalcCompileResponse(std::move(programHolder)), 0, Request->Cookie);
73+
}
74+
}
75+
76+
private:
77+
void Finish() {
78+
Send(Owner, new TEvPrivate::TEvCompileFinished(Request->Sender, Request->Cookie));
79+
PassAway();
80+
}
81+
82+
private:
83+
const NActors::TActorId Owner;
84+
const NYql::NPureCalc::IProgramFactoryPtr Factory;
85+
const TString LogPrefix;
86+
87+
TEvRowDispatcher::TEvPurecalcCompileRequest::TPtr Request;
88+
};
89+
90+
class TPurecalcCompileService : public NActors::TActor<TPurecalcCompileService> {
91+
using TBase = NActors::TActor<TPurecalcCompileService>;
92+
93+
struct TCounters {
94+
const NMonitoring::TDynamicCounterPtr Counters;
95+
96+
NMonitoring::TDynamicCounters::TCounterPtr ActiveCompileActors;
97+
NMonitoring::TDynamicCounters::TCounterPtr CompileQueueSize;
98+
99+
explicit TCounters(NMonitoring::TDynamicCounterPtr counters)
100+
: Counters(counters)
101+
{
102+
Register();
49103
}
104+
105+
private:
106+
void Register() {
107+
ActiveCompileActors = Counters->GetCounter("ActiveCompileActors", false);
108+
CompileQueueSize = Counters->GetCounter("CompileQueueSize", false);
109+
}
110+
};
111+
112+
public:
113+
TPurecalcCompileService(const NConfig::TCompileServiceConfig& config, NMonitoring::TDynamicCounterPtr counters)
114+
: TBase(&TPurecalcCompileService::StateFunc)
115+
, Config(config)
116+
, InFlightLimit(Config.GetParallelCompilationLimit() ? Config.GetParallelCompilationLimit() : 1)
117+
, LogPrefix("TPurecalcCompileService: ")
118+
, Counters(counters)
119+
{}
120+
121+
static constexpr char ActorName[] = "FQ_ROW_DISPATCHER_COMPILE_SERVICE";
122+
123+
STRICT_STFUNC(StateFunc,
124+
hFunc(TEvRowDispatcher::TEvPurecalcCompileRequest, Handle);
125+
hFunc(TEvRowDispatcher::TEvPurecalcCompileAbort, Handle)
126+
hFunc(TEvPrivate::TEvCompileFinished, Handle);
127+
)
128+
129+
void Handle(TEvRowDispatcher::TEvPurecalcCompileRequest::TPtr& ev) {
130+
const auto requestActor = ev->Sender;
131+
const ui64 requestId = ev->Cookie;
132+
LOG_ROW_DISPATCHER_TRACE("Add to compile queue request with id " << requestId << " from " << requestActor);
133+
134+
// Remove old compile request
135+
RemoveRequest(requestActor, requestId);
136+
137+
// Add new request
138+
RequestsQueue.emplace_back(std::move(ev));
139+
Y_ENSURE(RequestsIndex.emplace(std::make_pair(requestActor, requestId), --RequestsQueue.end()).second);
140+
Counters.CompileQueueSize->Inc();
141+
142+
StartCompilation();
143+
}
144+
145+
void Handle(TEvRowDispatcher::TEvPurecalcCompileAbort::TPtr& ev) {
146+
LOG_ROW_DISPATCHER_TRACE("Abort compile request with id " << ev->Cookie << " from " << ev->Sender);
147+
148+
RemoveRequest(ev->Sender, ev->Cookie);
149+
}
150+
151+
void Handle(TEvPrivate::TEvCompileFinished::TPtr& ev) {
152+
LOG_ROW_DISPATCHER_TRACE("Compile finished for request with id " << ev->Get()->RequestId << " from " << ev->Get()->RequestActor);
153+
154+
InFlightCompilations.erase(ev->Sender);
155+
Counters.ActiveCompileActors->Dec();
156+
157+
StartCompilation();
50158
}
51159

52160
private:
161+
void RemoveRequest(NActors::TActorId requestActor, ui64 requestId) {
162+
const auto it = RequestsIndex.find(std::make_pair(requestActor, requestId));
163+
if (it == RequestsIndex.end()) {
164+
return;
165+
}
166+
167+
RequestsQueue.erase(it->second);
168+
RequestsIndex.erase(it);
169+
Counters.CompileQueueSize->Dec();
170+
}
171+
172+
void StartCompilation() {
173+
while (!RequestsQueue.empty() && InFlightCompilations.size() < InFlightLimit) {
174+
auto request = std::move(RequestsQueue.front());
175+
RemoveRequest(request->Sender, request->Cookie);
176+
177+
const auto factory = GetOrCreateFactory(request->Get()->Settings);
178+
const auto compileActor = Register(new TPurecalcCompileActor(SelfId(), factory, std::move(request)));
179+
Y_ENSURE(InFlightCompilations.emplace(compileActor).second);
180+
Counters.ActiveCompileActors->Inc();
181+
}
182+
}
183+
53184
NYql::NPureCalc::IProgramFactoryPtr GetOrCreateFactory(const TPurecalcCompileSettings& settings) {
54185
const auto it = ProgramFactories.find(settings);
55186
if (it != ProgramFactories.end()) {
@@ -62,15 +193,23 @@ class TPurecalcCompileService : public NActors::TActor<TPurecalcCompileService>
62193
}
63194

64195
private:
196+
const NConfig::TCompileServiceConfig Config;
197+
const ui64 InFlightLimit;
65198
const TString LogPrefix;
66199

200+
std::list<TEvRowDispatcher::TEvPurecalcCompileRequest::TPtr> RequestsQueue;
201+
THashMap<std::pair<NActors::TActorId, ui64>, std::list<TEvRowDispatcher::TEvPurecalcCompileRequest::TPtr>::iterator> RequestsIndex;
202+
std::unordered_set<NActors::TActorId> InFlightCompilations;
203+
67204
std::map<TPurecalcCompileSettings, NYql::NPureCalc::IProgramFactoryPtr> ProgramFactories;
205+
206+
const TCounters Counters;
68207
};
69208

70-
} // namespace {
209+
} // anonymous namespace
71210

72-
NActors::IActor* CreatePurecalcCompileService() {
73-
return new TPurecalcCompileService();
211+
NActors::IActor* CreatePurecalcCompileService(const NConfig::TCompileServiceConfig& config, NMonitoring::TDynamicCounterPtr counters) {
212+
return new TPurecalcCompileService(config, counters);
74213
}
75214

76215
} // namespace NFq::NRowDispatcher
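(file name missing from capture; presumably ydb/core/fq/libs/row_dispatcher/purecalc_compilation/compile_service.h — the header declaring CreatePurecalcCompileService)

Original file line number | Diff line number | Diff line change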
@@ -1,9 +1,11 @@
11
#pragma once
22

3+
#include <ydb/core/fq/libs/config/protos/row_dispatcher.pb.h>
4+
35
#include <ydb/library/actors/core/actor.h>
46

57
namespace NFq::NRowDispatcher {
68

7-
NActors::IActor* CreatePurecalcCompileService();
9+
NActors::IActor* CreatePurecalcCompileService(const NConfig::TCompileServiceConfig& config, NMonitoring::TDynamicCounterPtr counters);
810

911
} // namespace NFq::NRowDispatcher

ydb/core/fq/libs/row_dispatcher/purecalc_compilation/ya.make

+1
Original file line number | Diff line number | Diff line change
@@ -6,6 +6,7 @@ SRCS(
66

77
PEERDIR(
88
ydb/core/fq/libs/actors/logging
9+
ydb/core/fq/libs/config/protos
910
ydb/core/fq/libs/row_dispatcher/events
1011
ydb/core/fq/libs/row_dispatcher/format_handler/common
1112
ydb/core/fq/libs/row_dispatcher/purecalc_no_pg_wrapper

ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp

+1-1
Original file line number | Diff line number | Diff line change
@@ -471,7 +471,7 @@ void TRowDispatcher::Bootstrap() {
471471
auto coordinatorId = Register(NewCoordinator(SelfId(), config, YqSharedResources, Tenant, Counters).release());
472472
Register(NewLeaderElection(SelfId(), coordinatorId, config, CredentialsProviderFactory, YqSharedResources, Tenant, Counters).release());
473473

474-
CompileServiceActorId = Register(NRowDispatcher::CreatePurecalcCompileService());
474+
CompileServiceActorId = Register(NRowDispatcher::CreatePurecalcCompileService(Config.GetCompileService(), Counters));
475475

476476
Schedule(TDuration::Seconds(CoordinatorPingPeriodSec), new TEvPrivate::TEvCoordinatorPing());
477477
Schedule(TDuration::Seconds(UpdateMetricsPeriodSec), new NFq::TEvPrivate::TEvUpdateMetrics());

0 commit comments

Comments
 (0)