@@ -1426,27 +1426,10 @@ private static SharedResult extractSharedOptimizationInfoForRoot(ParseContext pc
1426
1426
if (equalOp1 .getNumChild () > 1 || equalOp2 .getNumChild () > 1 ) {
1427
1427
// TODO: Support checking multiple child operators to merge further.
1428
1428
discardableInputOps .addAll (gatherDPPBranchOps (pctx , optimizerCache , discardableOps ));
1429
-
1430
- // Accumulate InMemoryDataSize of unmerged MapJoin operators.
1431
- Set <Operator <?>> opsWork1 = findWorkOperators (optimizerCache , retainableTsOp );
1432
- for (Operator <?> op : opsWork1 ) {
1433
- if (op instanceof MapJoinOperator ) {
1434
- MapJoinOperator mop = (MapJoinOperator ) op ;
1435
- dataSize = StatsUtils .safeAdd (dataSize , mop .getConf ().getInMemoryDataSize ());
1436
- maxDataSize = mop .getConf ().getMemoryMonitorInfo ().getAdjustedNoConditionalTaskSize ();
1437
- }
1438
- }
1439
- Set <Operator <?>> opsWork2 = findWorkOperators (optimizerCache , discardableTsOp );
1440
- for (Operator <?> op : opsWork2 ) {
1441
- if (op instanceof MapJoinOperator ) {
1442
- MapJoinOperator mop = (MapJoinOperator ) op ;
1443
- dataSize = StatsUtils .safeAdd (dataSize , mop .getConf ().getInMemoryDataSize ());
1444
- maxDataSize = mop .getConf ().getMemoryMonitorInfo ().getAdjustedNoConditionalTaskSize ();
1445
- }
1446
- }
1447
-
1448
- return new SharedResult (retainableOps , discardableOps , discardableInputOps , dataSize , maxDataSize );
1429
+ return createSharedResultForRoot (optimizerCache , retainableTsOp , discardableTsOp ,
1430
+ retainableOps , discardableOps , discardableInputOps );
1449
1431
}
1432
+
1450
1433
if (retainableTsOp .getChildOperators ().size () == 0 || discardableTsOp .getChildOperators ().size () == 0 ) {
1451
1434
return new SharedResult (retainableOps , discardableOps , discardableInputOps , dataSize , maxDataSize );
1452
1435
}
@@ -1469,18 +1452,18 @@ private static SharedResult extractSharedOptimizationInfoForRoot(ParseContext pc
1469
1452
}
1470
1453
}
1471
1454
1472
- boolean bailOut = false ;
1473
1455
if (equalFilters ) {
1474
1456
equalOp1 = currentOp1 ;
1475
1457
equalOp2 = currentOp2 ;
1476
1458
retainableOps .add (equalOp1 );
1477
1459
discardableOps .add (equalOp2 );
1478
- if (currentOp1 .getChildOperators (). size () > 1 || currentOp2 .getChildOperators (). size () > 1 ) {
1460
+ if (currentOp1 .getNumChild () > 1 || currentOp2 .getNumChild () > 1 ) {
1479
1461
// TODO: Support checking multiple child operators to merge further.
1480
1462
discardableInputOps .addAll (gatherDPPBranchOps (pctx , optimizerCache , discardableOps ));
1481
1463
discardableInputOps .addAll (gatherDPPBranchOps (pctx , optimizerCache , retainableOps ,
1482
1464
discardableInputOps ));
1483
- bailOut = true ;
1465
+ return createSharedResultForRoot (optimizerCache , retainableTsOp , discardableTsOp ,
1466
+ retainableOps , discardableOps , discardableInputOps );
1484
1467
}
1485
1468
currentOp1 = currentOp1 .getChildOperators ().get (0 );
1486
1469
currentOp2 = currentOp2 .getChildOperators ().get (0 );
@@ -1489,37 +1472,54 @@ private static SharedResult extractSharedOptimizationInfoForRoot(ParseContext pc
1489
1472
discardableInputOps .addAll (gatherDPPBranchOps (pctx , optimizerCache , discardableOps ));
1490
1473
discardableInputOps .addAll (gatherDPPBranchOps (pctx , optimizerCache , retainableOps ,
1491
1474
discardableInputOps ));
1492
- bailOut = true ;
1475
+ return createSharedResultForRoot (optimizerCache , retainableTsOp , discardableTsOp ,
1476
+ retainableOps , discardableOps , discardableInputOps );
1493
1477
}
1478
+ }
1494
1479
1495
- if (bailOut ) {
1496
- // Accumulate InMemoryDataSize of unmerged MapJoin operators.
1497
- Set <Operator <?>> opsWork1 = findWorkOperators (optimizerCache , currentOp1 );
1498
- for (Operator <?> op : opsWork1 ) {
1499
- if (op instanceof MapJoinOperator ) {
1500
- MapJoinOperator mop = (MapJoinOperator ) op ;
1501
- dataSize = StatsUtils .safeAdd (dataSize , mop .getConf ().getInMemoryDataSize ());
1502
- maxDataSize = mop .getConf ().getMemoryMonitorInfo ().getAdjustedNoConditionalTaskSize ();
1503
- }
1480
+ return extractSharedOptimizationInfo (pctx , optimizerCache , equalOp1 , equalOp2 ,
1481
+ currentOp1 , currentOp2 , retainableOps , discardableOps , discardableInputOps , mayRemoveDownStreamOperators ,
1482
+ mayRemoveInputOps );
1483
+ }
1484
+
1485
+ private static SharedResult createSharedResultForRoot (
1486
+ SharedWorkOptimizerCache optimizerCache ,
1487
+ Operator <?> retainableOp ,
1488
+ Operator <?> discardableOp ,
1489
+ LinkedHashSet <Operator <?>> retainableOps ,
1490
+ LinkedHashSet <Operator <?>> discardableOps ,
1491
+ Set <Operator <?>> discardableInputOps ) {
1492
+ // Assertion: retainableOps and discardableOps do not contain MapJoinOperator.
1493
+
1494
+ // Accumulate InMemoryDataSize of unmerged MapJoin operators.
1495
+ long dataSize = 0L ;
1496
+ long maxDataSize = 0L ;
1497
+
1498
+ Set <Operator <?>> opsWork1 = findWorkOperators (optimizerCache , retainableOp );
1499
+ for (Operator <?> op : opsWork1 ) {
1500
+ if (op instanceof MapJoinOperator ) {
1501
+ MapJoinOperator mop = (MapJoinOperator ) op ;
1502
+ dataSize = StatsUtils .safeAdd (dataSize , mop .getConf ().getInMemoryDataSize ());
1503
+ if (maxDataSize < mop .getConf ().getMemoryMonitorInfo ().getAdjustedNoConditionalTaskSize ()) {
1504
+ maxDataSize = mop .getConf ().getMemoryMonitorInfo ().getAdjustedNoConditionalTaskSize ();
1504
1505
}
1505
- Set <Operator <?>> opsWork2 = findWorkOperators (optimizerCache , currentOp2 );
1506
- for (Operator <?> op : opsWork2 ) {
1507
- if (op instanceof MapJoinOperator ) {
1508
- MapJoinOperator mop = (MapJoinOperator ) op ;
1509
- dataSize = StatsUtils .safeAdd (dataSize , mop .getConf ().getInMemoryDataSize ());
1510
- maxDataSize = mop .getConf ().getMemoryMonitorInfo ().getAdjustedNoConditionalTaskSize ();
1511
- }
1506
+ }
1507
+ }
1508
+ Set <Operator <?>> opsWork2 = findWorkOperators (optimizerCache , discardableOp );
1509
+ for (Operator <?> op : opsWork2 ) {
1510
+ if (op instanceof MapJoinOperator ) {
1511
+ MapJoinOperator mop = (MapJoinOperator ) op ;
1512
+ dataSize = StatsUtils .safeAdd (dataSize , mop .getConf ().getInMemoryDataSize ());
1513
+ if (maxDataSize < mop .getConf ().getMemoryMonitorInfo ().getAdjustedNoConditionalTaskSize ()) {
1514
+ maxDataSize = mop .getConf ().getMemoryMonitorInfo ().getAdjustedNoConditionalTaskSize ();
1512
1515
}
1513
-
1514
- return new SharedResult (retainableOps , discardableOps , discardableInputOps , dataSize , maxDataSize );
1515
1516
}
1516
1517
}
1517
1518
1518
- return extractSharedOptimizationInfo (pctx , optimizerCache , equalOp1 , equalOp2 ,
1519
- currentOp1 , currentOp2 , retainableOps , discardableOps , discardableInputOps , mayRemoveDownStreamOperators ,
1520
- mayRemoveInputOps );
1519
+ return new SharedResult (retainableOps , discardableOps , discardableInputOps , dataSize , maxDataSize );
1521
1520
}
1522
1521
1522
+
1523
1523
private static SharedResult extractSharedOptimizationInfo (ParseContext pctx ,
1524
1524
SharedWorkOptimizerCache optimizerCache ,
1525
1525
Operator <?> retainableOpEqualParent ,
@@ -1597,7 +1597,9 @@ private static SharedResult extractSharedOptimizationInfo(ParseContext pctx,
1597
1597
if (equalOp1 instanceof MapJoinOperator ) {
1598
1598
MapJoinOperator mop = (MapJoinOperator ) equalOp1 ;
1599
1599
dataSize = StatsUtils .safeAdd (dataSize , mop .getConf ().getInMemoryDataSize ());
1600
- maxDataSize = mop .getConf ().getMemoryMonitorInfo ().getAdjustedNoConditionalTaskSize ();
1600
+ if (maxDataSize < mop .getConf ().getMemoryMonitorInfo ().getAdjustedNoConditionalTaskSize ()) {
1601
+ maxDataSize = mop .getConf ().getMemoryMonitorInfo ().getAdjustedNoConditionalTaskSize ();
1602
+ }
1601
1603
}
1602
1604
if (currentOp1 .getChildOperators ().size () > 1 ||
1603
1605
currentOp2 .getChildOperators ().size () > 1 ) {
@@ -1615,15 +1617,19 @@ private static SharedResult extractSharedOptimizationInfo(ParseContext pctx,
1615
1617
if (op instanceof MapJoinOperator && !retainableOps .contains (op )) {
1616
1618
MapJoinOperator mop = (MapJoinOperator ) op ;
1617
1619
dataSize = StatsUtils .safeAdd (dataSize , mop .getConf ().getInMemoryDataSize ());
1618
- maxDataSize = mop .getConf ().getMemoryMonitorInfo ().getAdjustedNoConditionalTaskSize ();
1620
+ if (maxDataSize < mop .getConf ().getMemoryMonitorInfo ().getAdjustedNoConditionalTaskSize ()) {
1621
+ maxDataSize = mop .getConf ().getMemoryMonitorInfo ().getAdjustedNoConditionalTaskSize ();
1622
+ }
1619
1623
}
1620
1624
}
1621
1625
Set <Operator <?>> opsWork2 = findWorkOperators (optimizerCache , currentOp2 );
1622
1626
for (Operator <?> op : opsWork2 ) {
1623
1627
if (op instanceof MapJoinOperator && !discardableOps .contains (op )) {
1624
1628
MapJoinOperator mop = (MapJoinOperator ) op ;
1625
1629
dataSize = StatsUtils .safeAdd (dataSize , mop .getConf ().getInMemoryDataSize ());
1626
- maxDataSize = mop .getConf ().getMemoryMonitorInfo ().getAdjustedNoConditionalTaskSize ();
1630
+ if (maxDataSize < mop .getConf ().getMemoryMonitorInfo ().getAdjustedNoConditionalTaskSize ()) {
1631
+ maxDataSize = mop .getConf ().getMemoryMonitorInfo ().getAdjustedNoConditionalTaskSize ();
1632
+ }
1627
1633
}
1628
1634
}
1629
1635
0 commit comments