@@ -222,8 +222,6 @@ public class LocalQueryRunner
222
222
private final SqlParser sqlParser ;
223
223
private final PlanFragmenter planFragmenter ;
224
224
private final InMemoryNodeManager nodeManager ;
225
- private final PageSorter pageSorter ;
226
- private final PageIndexerFactory pageIndexerFactory ;
227
225
private final MetadataManager metadata ;
228
226
private final StatsCalculator statsCalculator ;
229
227
private final CostCalculator costCalculator ;
@@ -251,45 +249,35 @@ public class LocalQueryRunner
251
249
private final TaskManagerConfig taskManagerConfig ;
252
250
private final boolean alwaysRevokeMemory ;
253
251
private final NodeSpillConfig nodeSpillConfig ;
254
- private final NodeSchedulerConfig nodeSchedulerConfig ;
255
252
private final FeaturesConfig featuresConfig ;
256
- private final Map <String , List <PropertyMetadata <?>>> defaultSessionProperties ;
257
253
private boolean printPlan ;
258
254
259
255
private final ReadWriteLock lock = new ReentrantReadWriteLock ();
260
256
261
- public LocalQueryRunner (Session defaultSession )
257
+ public static LocalQueryRunner create (Session defaultSession )
262
258
{
263
- this ( defaultSession , new FeaturesConfig (), new NodeSpillConfig (), false , false );
259
+ return builder ( defaultSession ). build ( );
264
260
}
265
261
266
- public LocalQueryRunner (Session defaultSession , Map < String , List < PropertyMetadata <?>>> defaultSessionProperties )
262
+ public static Builder builder (Session defaultSession )
267
263
{
268
- this ( defaultSession , new FeaturesConfig (), new NodeSpillConfig (), false , false , defaultSessionProperties );
264
+ return new Builder ( defaultSession );
269
265
}
270
266
271
- public LocalQueryRunner (Session defaultSession , FeaturesConfig featuresConfig )
272
- {
273
- this (defaultSession , featuresConfig , new NodeSpillConfig (), false , false );
274
- }
275
-
276
- public LocalQueryRunner (Session defaultSession , FeaturesConfig featuresConfig , NodeSpillConfig nodeSpillConfig , boolean withInitialTransaction , boolean alwaysRevokeMemory )
277
- {
278
- this (defaultSession , featuresConfig , nodeSpillConfig , withInitialTransaction , alwaysRevokeMemory , 1 , ImmutableMap .of ());
279
- }
280
-
281
- public LocalQueryRunner (Session defaultSession , FeaturesConfig featuresConfig , NodeSpillConfig nodeSpillConfig , boolean withInitialTransaction , boolean alwaysRevokeMemory , Map <String , List <PropertyMetadata <?>>> defaultSessionProperties )
282
- {
283
- this (defaultSession , featuresConfig , nodeSpillConfig , withInitialTransaction , alwaysRevokeMemory , 1 , defaultSessionProperties );
284
- }
285
-
286
- private LocalQueryRunner (Session defaultSession , FeaturesConfig featuresConfig , NodeSpillConfig nodeSpillConfig , boolean withInitialTransaction , boolean alwaysRevokeMemory , int nodeCountForStats , Map <String , List <PropertyMetadata <?>>> defaultSessionProperties )
267
+ private LocalQueryRunner (
268
+ Session defaultSession ,
269
+ FeaturesConfig featuresConfig ,
270
+ NodeSpillConfig nodeSpillConfig ,
271
+ boolean withInitialTransaction ,
272
+ boolean alwaysRevokeMemory ,
273
+ int nodeCountForStats ,
274
+ Map <String , List <PropertyMetadata <?>>> defaultSessionProperties )
287
275
{
288
276
requireNonNull (defaultSession , "defaultSession is null" );
277
+ requireNonNull (defaultSessionProperties , "defaultSessionProperties is null" );
289
278
checkArgument (!defaultSession .getTransactionId ().isPresent () || !withInitialTransaction , "Already in transaction" );
290
279
291
280
this .taskManagerConfig = new TaskManagerConfig ().setTaskConcurrency (4 );
292
- this .defaultSessionProperties = requireNonNull (defaultSessionProperties , "defaultSessionProperties is null" );
293
281
this .nodeSpillConfig = requireNonNull (nodeSpillConfig , "nodeSpillConfig is null" );
294
282
this .alwaysRevokeMemory = alwaysRevokeMemory ;
295
283
this .notificationExecutor = newCachedThreadPool (daemonThreadsNamed ("local-query-runner-executor-%s" ));
@@ -299,9 +287,9 @@ private LocalQueryRunner(Session defaultSession, FeaturesConfig featuresConfig,
299
287
300
288
this .sqlParser = new SqlParser ();
301
289
this .nodeManager = new InMemoryNodeManager ();
302
- this . pageSorter = new PagesIndexPageSorter (new PagesIndex .TestingFactory (false ));
290
+ PageSorter pageSorter = new PagesIndexPageSorter (new PagesIndex .TestingFactory (false ));
303
291
this .indexManager = new IndexManager ();
304
- this . nodeSchedulerConfig = new NodeSchedulerConfig ().setIncludeCoordinator (true );
292
+ NodeSchedulerConfig nodeSchedulerConfig = new NodeSchedulerConfig ().setIncludeCoordinator (true );
305
293
NodeScheduler nodeScheduler = new NodeScheduler (new UniformNodeSelectorFactory (nodeManager , nodeSchedulerConfig , new NodeTaskMap (finalizerService )));
306
294
this .featuresConfig = requireNonNull (featuresConfig , "featuresConfig is null" );
307
295
this .pageSinkManager = new PageSinkManager ();
@@ -324,7 +312,7 @@ private LocalQueryRunner(Session defaultSession, FeaturesConfig featuresConfig,
324
312
this .splitManager = new SplitManager (new QueryManagerConfig (), metadata );
325
313
this .planFragmenter = new PlanFragmenter (this .metadata , this .nodePartitioningManager , new QueryManagerConfig ());
326
314
this .joinCompiler = new JoinCompiler (metadata );
327
- this . pageIndexerFactory = new GroupByHashPageIndexerFactory (joinCompiler );
315
+ PageIndexerFactory pageIndexerFactory = new GroupByHashPageIndexerFactory (joinCompiler );
328
316
this .statsCalculator = createNewStatsCalculator (metadata , new TypeAnalyzer (sqlParser , metadata ));
329
317
this .taskCountEstimator = new TaskCountEstimator (() -> nodeCountForStats );
330
318
this .costCalculator = new CostCalculatorUsingExchanges (taskCountEstimator );
@@ -381,7 +369,7 @@ private LocalQueryRunner(Session defaultSession, FeaturesConfig featuresConfig,
381
369
382
370
// add bogus connector for testing session properties
383
371
catalogManager .registerCatalog (createBogusTestingCatalog (TESTING_CATALOG ));
384
- metadata .getSessionPropertyManager ().addConnectorSessionProperties (new CatalogName (TESTING_CATALOG ), this . defaultSessionProperties .getOrDefault (TESTING_CATALOG , ImmutableList .of ()));
372
+ metadata .getSessionPropertyManager ().addConnectorSessionProperties (new CatalogName (TESTING_CATALOG ), defaultSessionProperties .getOrDefault (TESTING_CATALOG , ImmutableList .of ()));
385
373
386
374
// rewrite session to use managed SessionPropertyMetadata
387
375
this .defaultSession = new Session (
@@ -434,17 +422,6 @@ private LocalQueryRunner(Session defaultSession, FeaturesConfig featuresConfig,
434
422
this .spillerFactory = new GenericSpillerFactory (singleStreamSpillerFactory );
435
423
}
436
424
437
- public static LocalQueryRunner queryRunnerWithInitialTransaction (Session defaultSession )
438
- {
439
- checkArgument (!defaultSession .getTransactionId ().isPresent (), "Already in transaction!" );
440
- return new LocalQueryRunner (defaultSession , new FeaturesConfig (), new NodeSpillConfig (), true , false );
441
- }
442
-
443
- public static LocalQueryRunner queryRunnerWithFakeNodeCountForStats (Session defaultSession , int nodeCount )
444
- {
445
- return new LocalQueryRunner (defaultSession , new FeaturesConfig (), new NodeSpillConfig (), false , false , nodeCount , ImmutableMap .of ());
446
- }
447
-
448
425
@ Override
449
426
public void close ()
450
427
{
@@ -877,4 +854,68 @@ private static List<TableScanNode> findTableScanNodes(PlanNode node)
877
854
.where (TableScanNode .class ::isInstance )
878
855
.findAll ();
879
856
}
857
+
858
+ public static class Builder
859
+ {
860
+ private Session defaultSession ;
861
+ private FeaturesConfig featuresConfig = new FeaturesConfig ();
862
+ private NodeSpillConfig nodeSpillConfig = new NodeSpillConfig ();
863
+ private boolean initialTransaction ;
864
+ private boolean alwaysRevokeMemory ;
865
+ private Map <String , List <PropertyMetadata <?>>> defaultSessionProperties = ImmutableMap .of ();
866
+ private int nodeCountForStats ;
867
+
868
+ private Builder (Session defaultSession )
869
+ {
870
+ this .defaultSession = requireNonNull (defaultSession , "defaultSession is null" );
871
+ }
872
+
873
+ public Builder withFeaturesConfig (FeaturesConfig featuresConfig )
874
+ {
875
+ this .featuresConfig = requireNonNull (featuresConfig , "featuresConfig is null" );
876
+ return this ;
877
+ }
878
+
879
+ public Builder withNodeSpillConfig (NodeSpillConfig nodeSpillConfig )
880
+ {
881
+ this .nodeSpillConfig = requireNonNull (nodeSpillConfig , "nodeSpillConfig is null" );
882
+ return this ;
883
+ }
884
+
885
+ public Builder withInitialTransaction ()
886
+ {
887
+ this .initialTransaction = true ;
888
+ return this ;
889
+ }
890
+
891
+ public Builder withAlwaysRevokeMemory ()
892
+ {
893
+ this .alwaysRevokeMemory = true ;
894
+ return this ;
895
+ }
896
+
897
+ public Builder withDefaultSessionProperties (Map <String , List <PropertyMetadata <?>>> defaultSessionProperties )
898
+ {
899
+ this .defaultSessionProperties = requireNonNull (defaultSessionProperties , "defaultSessionProperties is null" );
900
+ return this ;
901
+ }
902
+
903
+ public Builder withNodeCountForStats (int nodeCountForStats )
904
+ {
905
+ this .nodeCountForStats = nodeCountForStats ;
906
+ return this ;
907
+ }
908
+
909
+ public LocalQueryRunner build ()
910
+ {
911
+ return new LocalQueryRunner (
912
+ defaultSession ,
913
+ featuresConfig ,
914
+ nodeSpillConfig ,
915
+ initialTransaction ,
916
+ alwaysRevokeMemory ,
917
+ nodeCountForStats ,
918
+ defaultSessionProperties );
919
+ }
920
+ }
880
921
}
0 commit comments