Skip to content

Commit 833b161

Browse files
committed
Added parallel compilation
1 parent ac47d76 commit 833b161

File tree

9 files changed

+194
-26
lines changed

9 files changed

+194
-26
lines changed

ydb/core/fq/libs/config/protos/row_dispatcher.proto

+5
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,17 @@ message TJsonParserConfig {
2525
uint64 BufferCellCount = 3; // (number rows) * (number columns) limit, default 10^6
2626
}
2727

28+
message TCompileServiceConfig {
29+
uint64 ParallelCompilationLimit = 1; // 1 by default, 0 <=> unlimited
30+
}
31+
2832
message TRowDispatcherConfig {
2933
bool Enabled = 1;
3034
uint64 TimeoutBeforeStartSessionSec = 2;
3135
uint64 SendStatusPeriodSec = 3;
3236
uint64 MaxSessionUsedMemory = 4;
3337
bool WithoutConsumer = 5;
3438
TJsonParserConfig JsonParser = 7;
39+
TCompileServiceConfig CompileService = 8;
3540
TRowDispatcherCoordinatorConfig Coordinator = 6;
3641
}

ydb/core/fq/libs/row_dispatcher/events/data_plane.h

+3
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ struct TEvRowDispatcher {
5151
EvGetInternalStateResponse,
5252
EvPurecalcCompileRequest,
5353
EvPurecalcCompileResponse,
54+
EvPurecalcCompileAbort,
5455
EvEnd,
5556
};
5657

@@ -197,6 +198,8 @@ struct TEvRowDispatcher {
197198
NYql::NDqProto::StatusIds::StatusCode Status;
198199
NYql::TIssues Issues;
199200
};
201+
202+
struct TEvPurecalcCompileAbort : public NActors::TEventLocal<TEvPurecalcCompileAbort, EEv::EvPurecalcCompileAbort> {};
200203
};
201204

202205
} // namespace NFq

ydb/core/fq/libs/row_dispatcher/format_handler/filters/filters_set.cpp

+21-2
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ class TTopicFilters : public ITopicFilters {
1414
NMonitoring::TDynamicCounters::TCounterPtr InFlightCompileRequests;
1515
NMonitoring::TDynamicCounters::TCounterPtr CompileErrors;
1616

17-
TCounters(NMonitoring::TDynamicCounterPtr counters)
17+
explicit TCounters(NMonitoring::TDynamicCounterPtr counters)
1818
: Counters(counters)
1919
{
2020
Register();
@@ -83,6 +83,18 @@ class TTopicFilters : public ITopicFilters {
8383
NActors::TActivationContext::ActorSystem()->Send(new NActors::IEventHandle(Self.Config.CompileServiceId, Self.Owner, PurecalcFilter->GetCompileRequest().release(), 0, InFlightCompilationId));
8484
}
8585

86+
void AbortCompilation() {
87+
if (!InFlightCompilationId) {
88+
return;
89+
}
90+
91+
LOG_ROW_DISPATCHER_TRACE("Send abort compile request with id " << InFlightCompilationId);
92+
NActors::TActivationContext::ActorSystem()->Send(new NActors::IEventHandle(Self.Config.CompileServiceId, Self.Owner, new TEvRowDispatcher::TEvPurecalcCompileAbort(), 0, InFlightCompilationId));
93+
94+
InFlightCompilationId = 0;
95+
Self.Counters.InFlightCompileRequests->Dec();
96+
}
97+
8698
void OnCompileResponse(TEvRowDispatcher::TEvPurecalcCompileResponse::TPtr ev) {
8799
if (ev->Cookie != InFlightCompilationId) {
88100
LOG_ROW_DISPATCHER_DEBUG("Outdated compiler response ignored for id " << ev->Cookie << ", current compile id " << InFlightCompilationId);
@@ -205,7 +217,14 @@ class TTopicFilters : public ITopicFilters {
205217

206218
void RemoveFilter(NActors::TActorId filterId) override {
207219
LOG_ROW_DISPATCHER_TRACE("Remove filter with id " << filterId);
208-
Filters.erase(filterId);
220+
221+
const auto it = Filters.find(filterId);
222+
if (it == Filters.end()) {
223+
return;
224+
}
225+
226+
it->second.AbortCompilation();
227+
Filters.erase(it);
209228
}
210229

211230
TFiltersStatistic GetStatistics() override {

ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.cpp

+5-2
Original file line numberDiff line numberDiff line change
@@ -333,8 +333,11 @@ class TTopicFormatHandler : public NActors::TActor<TTopicFormatHandler>, public
333333
}
334334

335335
void Handle(NActors::TEvents::TEvPoison::TPtr&) {
336-
with_lock(Alloc) {
337-
Clients.clear();
336+
if (Filters) {
337+
for (const auto& [clientId, _] : Clients) {
338+
Filters->RemoveFilter(clientId);
339+
}
340+
Filters.Reset();
338341
}
339342
PassAway();
340343
}

ydb/core/fq/libs/row_dispatcher/format_handler/ut/topic_filter_ut.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ class TFiterFixture : public TBaseFixture {
9292
virtual void SetUp(NUnitTest::TTestContext& ctx) override {
9393
TBase::SetUp(ctx);
9494

95-
CompileServiceActorId = Runtime.Register(CreatePurecalcCompileService());
95+
CompileServiceActorId = Runtime.Register(CreatePurecalcCompileService({}, MakeIntrusive<NMonitoring::TDynamicCounters>()));
9696
}
9797

9898
virtual void TearDown(NUnitTest::TTestContext& ctx) override {

ydb/core/fq/libs/row_dispatcher/purecalc_compilation/compile_service.cpp

+154-19
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
#include <ydb/core/fq/libs/row_dispatcher/events/data_plane.h>
55
#include <ydb/core/fq/libs/row_dispatcher/format_handler/common/common.h>
66

7+
#include <ydb/library/actors/core/actor_bootstrapped.h>
78
#include <ydb/library/actors/core/hfunc.h>
89

910
#include <yql/essentials/public/purecalc/common/interface.h>
@@ -12,26 +13,47 @@ namespace NFq::NRowDispatcher {
1213

1314
namespace {
1415

15-
class TPurecalcCompileService : public NActors::TActor<TPurecalcCompileService> {
16-
using TBase = NActors::TActor<TPurecalcCompileService>;
16+
struct TEvPrivate {
17+
// Event ids
18+
enum EEv : ui32 {
19+
EvCompileFinished = EventSpaceBegin(NActors::TEvents::ES_PRIVATE),
20+
EvEnd
21+
};
22+
23+
static_assert(EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE), "expect EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE)");
24+
25+
// Events
26+
struct TEvCompileFinished : public NActors::TEventLocal<TEvCompileFinished, EvCompileFinished> {
27+
TEvCompileFinished(NActors::TActorId requestActor, ui64 requestId)
28+
: RequestActor(requestActor)
29+
, RequestId(requestId)
30+
{}
31+
32+
const NActors::TActorId RequestActor;
33+
const ui64 RequestId;
34+
};
35+
};
1736

37+
class TPurecalcCompileActor : public NActors::TActorBootstrapped<TPurecalcCompileActor> {
1838
public:
19-
TPurecalcCompileService()
20-
: TBase(&TPurecalcCompileService::StateFunc)
21-
, LogPrefix("TPurecalcCompileService: ")
39+
TPurecalcCompileActor(NActors::TActorId owner, NYql::NPureCalc::IProgramFactoryPtr factory, TEvRowDispatcher::TEvPurecalcCompileRequest::TPtr request)
40+
: Owner(owner)
41+
, Factory(factory)
42+
, LogPrefix(TStringBuilder() << "TPurecalcCompileActor " << request->Sender << " [id " << request->Cookie << "]: ")
43+
, Request(std::move(request))
2244
{}
2345

24-
STRICT_STFUNC(StateFunc,
25-
hFunc(TEvRowDispatcher::TEvPurecalcCompileRequest, Handle);
26-
)
46+
void Bootstrap() {
47+
Y_DEFER {
48+
Finish();
49+
};
2750

28-
void Handle(TEvRowDispatcher::TEvPurecalcCompileRequest::TPtr& ev) {
29-
LOG_ROW_DISPATCHER_TRACE("Got compile request with id: " << ev->Cookie);
30-
IProgramHolder::TPtr programHolder = std::move(ev->Get()->ProgramHolder);
51+
LOG_ROW_DISPATCHER_TRACE("Started compile request");
52+
IProgramHolder::TPtr programHolder = std::move(Request->Get()->ProgramHolder);
3153

3254
TStatus status = TStatus::Success();
3355
try {
34-
programHolder->CreateProgram(GetOrCreateFactory(ev->Get()->Settings));
56+
programHolder->CreateProgram(Factory);
3557
} catch (const NYql::NPureCalc::TCompileError& error) {
3658
status = TStatus::Fail(EStatusId::INTERNAL_ERROR, TStringBuilder() << "Compile issues: " << error.GetIssues())
3759
.AddIssue(TStringBuilder() << "Final yql: " << error.GetYql())
@@ -41,15 +63,120 @@ class TPurecalcCompileService : public NActors::TActor<TPurecalcCompileService>
4163
}
4264

4365
if (status.IsFail()) {
44-
LOG_ROW_DISPATCHER_ERROR("Compilation failed for request with id: " << ev->Cookie);
45-
Send(ev->Sender, new TEvRowDispatcher::TEvPurecalcCompileResponse(status.GetStatus(), status.GetErrorDescription()), 0, ev->Cookie);
66+
LOG_ROW_DISPATCHER_ERROR("Compilation failed for request");
67+
Send(Request->Sender, new TEvRowDispatcher::TEvPurecalcCompileResponse(status.GetStatus(), status.GetErrorDescription()), 0, Request->Cookie);
4668
} else {
47-
LOG_ROW_DISPATCHER_TRACE("Compilation completed for request with id: " << ev->Cookie);
48-
Send(ev->Sender, new TEvRowDispatcher::TEvPurecalcCompileResponse(std::move(programHolder)), 0, ev->Cookie);
69+
LOG_ROW_DISPATCHER_TRACE("Compilation completed for request");
70+
Send(Request->Sender, new TEvRowDispatcher::TEvPurecalcCompileResponse(std::move(programHolder)), 0, Request->Cookie);
4971
}
5072
}
5173

5274
private:
75+
void Finish() {
76+
Send(Owner, new TEvPrivate::TEvCompileFinished(Request->Sender, Request->Cookie));
77+
PassAway();
78+
}
79+
80+
private:
81+
const NActors::TActorId Owner;
82+
const NYql::NPureCalc::IProgramFactoryPtr Factory;
83+
const TString LogPrefix;
84+
85+
TEvRowDispatcher::TEvPurecalcCompileRequest::TPtr Request;
86+
};
87+
88+
class TPurecalcCompileService : public NActors::TActor<TPurecalcCompileService> {
89+
using TBase = NActors::TActor<TPurecalcCompileService>;
90+
91+
struct TCounters {
92+
const NMonitoring::TDynamicCounterPtr Counters;
93+
94+
NMonitoring::TDynamicCounters::TCounterPtr ActiveCompileActors;
95+
NMonitoring::TDynamicCounters::TCounterPtr CompileQueueSize;
96+
97+
explicit TCounters(NMonitoring::TDynamicCounterPtr counters)
98+
: Counters(counters)
99+
{
100+
Register();
101+
}
102+
103+
private:
104+
void Register() {
105+
ActiveCompileActors = Counters->GetCounter("ActiveCompileActors", false);
106+
CompileQueueSize = Counters->GetCounter("CompileQueueSize", false);
107+
}
108+
};
109+
110+
public:
111+
TPurecalcCompileService(const NConfig::TCompileServiceConfig& config, NMonitoring::TDynamicCounterPtr counters)
112+
: TBase(&TPurecalcCompileService::StateFunc)
113+
, Config(config)
114+
, InFlightLimit(Config.GetParallelCompilationLimit() ? Config.GetParallelCompilationLimit() : std::numeric_limits<ui64>::max())
115+
, LogPrefix("TPurecalcCompileService: ")
116+
, Counters(counters)
117+
{}
118+
119+
STRICT_STFUNC(StateFunc,
120+
hFunc(TEvRowDispatcher::TEvPurecalcCompileRequest, Handle);
121+
hFunc(TEvRowDispatcher::TEvPurecalcCompileAbort, Handle)
122+
hFunc(TEvPrivate::TEvCompileFinished, Handle);
123+
)
124+
125+
void Handle(TEvRowDispatcher::TEvPurecalcCompileRequest::TPtr& ev) {
126+
const auto requestActor = ev->Sender;
127+
const ui64 requestId = ev->Cookie;
128+
LOG_ROW_DISPATCHER_TRACE("Add to compile queue request with id " << requestId << " from " << requestActor);
129+
130+
// Remove old compile request
131+
RemoveRequest(requestActor, requestId);
132+
133+
// Add new request
134+
RequestsQueue.emplace_back(std::move(ev));
135+
Y_ENSURE(RequestsIndex.emplace(std::make_pair(requestActor, requestId), --RequestsQueue.end()).second);
136+
Counters.CompileQueueSize->Inc();
137+
138+
StartCompilation();
139+
}
140+
141+
void Handle(TEvRowDispatcher::TEvPurecalcCompileAbort::TPtr& ev) {
142+
LOG_ROW_DISPATCHER_TRACE("Abort compile request with id " << ev->Cookie << " from " << ev->Sender);
143+
144+
RemoveRequest(ev->Sender, ev->Cookie);
145+
}
146+
147+
void Handle(TEvPrivate::TEvCompileFinished::TPtr& ev) {
148+
LOG_ROW_DISPATCHER_TRACE("Compile finished for request with id " << ev->Get()->RequestId << " from " << ev->Get()->RequestActor);
149+
150+
InFlightCompilations.erase(ev->Sender);
151+
Counters.ActiveCompileActors->Dec();
152+
153+
StartCompilation();
154+
}
155+
156+
private:
157+
void RemoveRequest(NActors::TActorId requestActor, ui64 requestId) {
158+
const auto it = RequestsIndex.find(std::make_pair(requestActor, requestId));
159+
if (it == RequestsIndex.end()) {
160+
return;
161+
}
162+
163+
RequestsQueue.erase(it->second);
164+
RequestsIndex.erase(it);
165+
Counters.CompileQueueSize->Dec();
166+
}
167+
168+
void StartCompilation() {
169+
while (!RequestsQueue.empty() && InFlightCompilations.size() < InFlightLimit) {
170+
auto request = std::move(RequestsQueue.front());
171+
RemoveRequest(request->Sender, request->Cookie);
172+
173+
const auto factory = GetOrCreateFactory(request->Get()->Settings);
174+
const auto compileActor = Register(new TPurecalcCompileActor(SelfId(), factory, std::move(request)));
175+
Y_ENSURE(InFlightCompilations.emplace(compileActor).second);
176+
Counters.ActiveCompileActors->Inc();
177+
}
178+
}
179+
53180
NYql::NPureCalc::IProgramFactoryPtr GetOrCreateFactory(const TPurecalcCompileSettings& settings) {
54181
const auto it = ProgramFactories.find(settings);
55182
if (it != ProgramFactories.end()) {
@@ -62,15 +189,23 @@ class TPurecalcCompileService : public NActors::TActor<TPurecalcCompileService>
62189
}
63190

64191
private:
192+
const NConfig::TCompileServiceConfig Config;
193+
const ui64 InFlightLimit;
65194
const TString LogPrefix;
66195

196+
std::list<TEvRowDispatcher::TEvPurecalcCompileRequest::TPtr> RequestsQueue;
197+
THashMap<std::pair<NActors::TActorId, ui64>, std::list<TEvRowDispatcher::TEvPurecalcCompileRequest::TPtr>::iterator> RequestsIndex;
198+
std::unordered_set<NActors::TActorId> InFlightCompilations;
199+
67200
std::map<TPurecalcCompileSettings, NYql::NPureCalc::IProgramFactoryPtr> ProgramFactories;
201+
202+
const TCounters Counters;
68203
};
69204

70-
} // namespace {
205+
} // anonymous namespace
71206

72-
NActors::IActor* CreatePurecalcCompileService() {
73-
return new TPurecalcCompileService();
207+
NActors::IActor* CreatePurecalcCompileService(const NConfig::TCompileServiceConfig& config, NMonitoring::TDynamicCounterPtr counters) {
208+
return new TPurecalcCompileService(config, counters);
74209
}
75210

76211
} // namespace NFq::NRowDispatcher
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
#pragma once
22

3+
#include <ydb/core/fq/libs/config/protos/row_dispatcher.pb.h>
4+
35
#include <ydb/library/actors/core/actor.h>
46

57
namespace NFq::NRowDispatcher {
68

7-
NActors::IActor* CreatePurecalcCompileService();
9+
NActors::IActor* CreatePurecalcCompileService(const NConfig::TCompileServiceConfig& config, NMonitoring::TDynamicCounterPtr counters);
810

911
} // namespace NFq::NRowDispatcher

ydb/core/fq/libs/row_dispatcher/purecalc_compilation/ya.make

+1
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ SRCS(
66

77
PEERDIR(
88
ydb/core/fq/libs/actors/logging
9+
ydb/core/fq/libs/config/protos
910
ydb/core/fq/libs/row_dispatcher/events
1011
ydb/core/fq/libs/row_dispatcher/format_handler/common
1112
ydb/core/fq/libs/row_dispatcher/purecalc_no_pg_wrapper

ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -471,7 +471,7 @@ void TRowDispatcher::Bootstrap() {
471471
auto coordinatorId = Register(NewCoordinator(SelfId(), config, YqSharedResources, Tenant, Counters).release());
472472
Register(NewLeaderElection(SelfId(), coordinatorId, config, CredentialsProviderFactory, YqSharedResources, Tenant, Counters).release());
473473

474-
CompileServiceActorId = Register(NRowDispatcher::CreatePurecalcCompileService());
474+
CompileServiceActorId = Register(NRowDispatcher::CreatePurecalcCompileService(Config.GetCompileService(), Counters));
475475

476476
Schedule(TDuration::Seconds(CoordinatorPingPeriodSec), new TEvPrivate::TEvCoordinatorPing());
477477
Schedule(TDuration::Seconds(UpdateMetricsPeriodSec), new NFq::TEvPrivate::TEvUpdateMetrics());

0 commit comments

Comments
 (0)