Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pinot-common/src/main/proto/plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ enum JoinStrategy {
HASH = 0;
LOOKUP = 1;
AS_OF = 2;
SORTED = 3;
}

message JoinNode {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ public static class JoinHintOptions {
public static final String DYNAMIC_BROADCAST_JOIN_STRATEGY = "dynamic_broadcast";
// "lookup" can be used when the right table is a dimension table replicated to all workers
public static final String LOOKUP_JOIN_STRATEGY = "lookup";
public static final String SORTED_JOIN_STRATEGY = "sorted";

public static final String LEFT_DISTRIBUTION_TYPE = "left_distribution_type";
public static final String RIGHT_DISTRIBUTION_TYPE = "right_distribution_type";
Expand Down Expand Up @@ -128,6 +129,10 @@ public static boolean useLookupJoinStrategy(Join join) {
return LOOKUP_JOIN_STRATEGY.equalsIgnoreCase(getJoinStrategyHint(join));
}

/**
 * Tells whether the join-strategy hint attached to the given join selects the "sorted" strategy.
 * The hint value is matched case-insensitively; a missing hint yields {@code false}.
 *
 * @param join the join whose hints are inspected
 * @return {@code true} if the join-strategy hint equals {@link #SORTED_JOIN_STRATEGY}, ignoring case
 */
public static boolean useSortedJoinStrategy(Join join) {
  String strategyHint = getJoinStrategyHint(join);
  return SORTED_JOIN_STRATEGY.equalsIgnoreCase(strategyHint);
}

@Nullable
public static DistributionType getLeftDistributionType(Map<String, String> joinHintOptions) {
return DistributionType.fromHint(joinHintOptions.get(LEFT_DISTRIBUTION_TYPE));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelCollation;
import org.apache.calcite.rel.RelCollations;
Expand All @@ -37,7 +38,7 @@
import org.apache.calcite.rel.core.Union;
import org.apache.calcite.rel.core.Window;
import org.apache.calcite.util.ImmutableIntList;
import org.apache.pinot.calcite.rel.hint.PinotHintOptions;
import org.apache.pinot.calcite.rel.hint.PinotHintOptions.JoinHintOptions;
import org.apache.pinot.query.context.PhysicalPlannerContext;
import org.apache.pinot.query.planner.physical.v2.PRelNode;
import org.apache.pinot.query.planner.physical.v2.nodes.PhysicalAggregate;
Expand Down Expand Up @@ -135,7 +136,7 @@ RelNode assignSort(PhysicalSort sort) {
@VisibleForTesting
RelNode assignJoin(Join join) {
// Case-1: Handle lookup joins.
if (PinotHintOptions.JoinHintOptions.useLookupJoinStrategy(join)) {
if (JoinHintOptions.useLookupJoinStrategy(join)) {
return assignLookupJoin(join);
}
// Case-2: Handle dynamic filter for semi joins.
Expand All @@ -148,14 +149,25 @@ RelNode assignJoin(Join join) {
Preconditions.checkState(joinInfo.leftKeys.size() == joinInfo.rightKeys.size(),
"Always expect left and right keys to be same size. Found: %s and %s",
joinInfo.leftKeys, joinInfo.rightKeys);
// Case-3: Default case.
Join sortedJoin = assignSortedJoin(join);
RelDistribution rightDistribution = !joinInfo.rightKeys.isEmpty() ? RelDistributions.hash(joinInfo.rightKeys)
: RelDistributions.BROADCAST_DISTRIBUTED;
RelDistribution leftDistribution;
if (joinInfo.leftKeys.isEmpty() || rightDistribution == RelDistributions.BROADCAST_DISTRIBUTED) {
leftDistribution = RelDistributions.RANDOM_DISTRIBUTED;
} else {
if (sortedJoin != null) {
// We support sorted join only for single key conditions. And we push down collation and hash-dist trait on
// both sides.
join = sortedJoin;
joinInfo = sortedJoin.analyzeCondition();
Preconditions.checkState(joinInfo.leftKeys.size() == 1,
"Expecting single key for sorted join. Found: %s", joinInfo.leftKeys.size());
leftDistribution = RelDistributions.hash(joinInfo.leftKeys);
rightDistribution = RelDistributions.hash(joinInfo.rightKeys);
} else {
if (joinInfo.leftKeys.isEmpty() || rightDistribution == RelDistributions.BROADCAST_DISTRIBUTED) {
leftDistribution = RelDistributions.RANDOM_DISTRIBUTED;
} else {
leftDistribution = RelDistributions.hash(joinInfo.leftKeys);
}
}
// left-input
RelNode leftInput = join.getInput(0);
Expand Down Expand Up @@ -280,12 +292,36 @@ private RelNode assignDynamicFilterSemiJoin(PhysicalJoin join) {
Preconditions.checkState(rightInput.getTraitSet().getDistribution() == null,
"Found existing dist trait on right input of semi-join");
RelDistribution distribution = RelDistributions.BROADCAST_DISTRIBUTED;
if (Boolean.TRUE.equals(PinotHintOptions.JoinHintOptions.isColocatedByJoinKeys(join))) {
if (Boolean.TRUE.equals(JoinHintOptions.isColocatedByJoinKeys(join))) {
distribution = RelDistributions.hash(joinInfo.rightKeys);
}
RelTraitSet rightTraitSet = rightInput.getTraitSet().plus(distribution)
.plus(PinotExecStrategyTrait.PIPELINE_BREAKER);
rightInput = rightInput.copy(rightTraitSet, rightInput.getInputs());
return join.copy(join.getTraitSet(), ImmutableList.of(leftInput, rightInput));
}

/**
 * Prepares a join for the sorted join strategy by adding a collation trait on the join key to both inputs.
 *
 * <p>Returns {@code null}, meaning the sorted strategy is not applied, when any of the following holds:
 * <ul>
 *   <li>the join does not carry the "sorted" join-strategy hint;</li>
 *   <li>the join condition does not produce exactly one left key;</li>
 *   <li>either input already has a collation trait.</li>
 * </ul>
 *
 * @param join the join to inspect
 * @return a copy of the join whose inputs carry a collation on their respective join key, or {@code null}
 */
@Nullable
private Join assignSortedJoin(Join join) {
  // Reuse the shared hint helper so that every "is this a sorted join?" check stays in one place.
  if (!JoinHintOptions.useSortedJoinStrategy(join)) {
    return null;
  }
  JoinInfo joinInfo = join.analyzeCondition();
  if (joinInfo.leftKeys.size() != 1) {
    return null;
  }
  RelNode leftInput = join.getInput(0);
  RelNode rightInput = join.getInput(1);
  RelTraitSet leftTraitSet = leftInput.getTraitSet();
  RelTraitSet rightTraitSet = rightInput.getTraitSet();
  // Do not overwrite a collation that is already present on either side.
  if (leftTraitSet.getCollation() != null || rightTraitSet.getCollation() != null) {
    return null;
  }
  RelCollation leftCollation = RelCollations.of(joinInfo.leftKeys.get(0));
  RelCollation rightCollation = RelCollations.of(joinInfo.rightKeys.get(0));
  RelNode newLeftInput = leftInput.copy(leftTraitSet.plus(leftCollation), leftInput.getInputs());
  RelNode newRightInput = rightInput.copy(rightTraitSet.plus(rightCollation), rightInput.getInputs());
  return join.copy(join.getTraitSet(), ImmutableList.of(newLeftInput, newRightInput));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,8 @@ public static JoinNode convertJoin(PhysicalJoin join) {
JoinNode.JoinStrategy joinStrategy;
if (PinotHintOptions.JoinHintOptions.useLookupJoinStrategy(join)) {
joinStrategy = JoinNode.JoinStrategy.LOOKUP;
} else if (PinotHintOptions.JoinHintOptions.useSortedJoinStrategy(join)) {
joinStrategy = JoinNode.JoinStrategy.SORTED;
} else {
joinStrategy = JoinNode.JoinStrategy.HASH;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.pinot.query.planner.physical.v2.opt.rules.AggregatePushdownRule;
import org.apache.pinot.query.planner.physical.v2.opt.rules.LeafStageAggregateRule;
import org.apache.pinot.query.planner.physical.v2.opt.rules.LeafStageBoundaryRule;
import org.apache.pinot.query.planner.physical.v2.opt.rules.LeafStageSortJoinRule;
import org.apache.pinot.query.planner.physical.v2.opt.rules.LeafStageWorkerAssignmentRule;
import org.apache.pinot.query.planner.physical.v2.opt.rules.LiteModeSortInsertRule;
import org.apache.pinot.query.planner.physical.v2.opt.rules.LiteModeWorkerAssignmentRule;
Expand All @@ -43,6 +44,7 @@ public static List<PRelNodeTransformer> create(PhysicalPlannerContext context, T
transformers.add(create(new LeafStageWorkerAssignmentRule(context, tableCache), RuleExecutors.Type.POST_ORDER,
context));
transformers.add(create(new LeafStageAggregateRule(context), RuleExecutors.Type.POST_ORDER, context));
transformers.add(create(new LeafStageSortJoinRule(context), RuleExecutors.Type.POST_ORDER, context));
transformers.add(createWorkerAssignmentRule(context));
transformers.add(create(new AggregatePushdownRule(context), RuleExecutors.Type.POST_ORDER, context));
transformers.add(create(new SortPushdownRule(context), RuleExecutors.Type.POST_ORDER, context));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package org.apache.pinot.query.planner.physical.v2.opt.rules;

import java.util.List;
import javax.annotation.Nullable;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelCollation;
import org.apache.calcite.rel.RelCollations;
import org.apache.calcite.rel.RelDistribution;
import org.apache.calcite.rel.core.Filter;
import org.apache.calcite.rel.core.JoinInfo;
import org.apache.calcite.rel.core.Project;
import org.apache.calcite.rel.core.TableScan;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pinot.calcite.rel.hint.PinotHintOptions;
import org.apache.pinot.query.context.PhysicalPlannerContext;
import org.apache.pinot.query.planner.physical.v2.PRelNode;
import org.apache.pinot.query.planner.physical.v2.PinotDataDistribution;
import org.apache.pinot.query.planner.physical.v2.nodes.PhysicalJoin;
import org.apache.pinot.query.planner.physical.v2.nodes.PhysicalSort;
import org.apache.pinot.query.planner.physical.v2.opt.PRelOptRule;
import org.apache.pinot.query.planner.physical.v2.opt.PRelOptRuleCall;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
 * When the user requests a sorted join on top of leaf stage inputs, this rule adds a Sort on the join key
 * to each leaf stage, with the aim of avoiding an unnecessary exchange. The rule only rewrites the plan when
 * all of the following conditions are met:
 * <ul>
 *   <li>The user has explicitly requested the sorted join strategy.</li>
 *   <li>The join itself is not in the leaf stage, and both of its inputs are leaf stage nodes.</li>
 *   <li>The join is an equi join with exactly one key. This key becomes the sort key on each side.</li>
 *   <li>Both inputs satisfy their distribution constraint and carry a collation trait.</li>
 *   <li>The root of each leaf stage is a table scan, a project or a filter.</li>
 * </ul>
 * If any condition fails, the join is returned unchanged.
 */
public class LeafStageSortJoinRule extends PRelOptRule {
  private static final Logger LOGGER = LoggerFactory.getLogger(LeafStageSortJoinRule.class);
  private final PhysicalPlannerContext _physicalPlannerContext;

  public LeafStageSortJoinRule(PhysicalPlannerContext physicalPlannerContext) {
    _physicalPlannerContext = physicalPlannerContext;
  }

  /**
   * Matches non-leaf joins that carry the sorted join-strategy hint and whose two inputs are both leaf stage nodes.
   */
  @Override
  public boolean matches(PRelOptRuleCall call) {
    if (call._currentNode instanceof PhysicalJoin) {
      PhysicalJoin join = (PhysicalJoin) call._currentNode;
      if (PinotHintOptions.JoinHintOptions.useSortedJoinStrategy(join)) {
        return !join.isLeafStage() && join.getPRelInput(0).isLeafStage() && join.getPRelInput(1).isLeafStage();
      }
    }
    return false;
  }

  /**
   * Wraps both join inputs in a leaf stage sort on the join key, or returns the join unchanged when the join
   * condition or either input does not qualify.
   */
  @Override
  public PRelNode onMatch(PRelOptRuleCall call) {
    PhysicalJoin join = (PhysicalJoin) call._currentNode;
    Pair<RelCollation, RelCollation> sortConstraints = getSortConstraints(join);
    if (sortConstraints == null) {
      return join;
    }
    PRelNode leftInput = join.getPRelInput(0);
    PRelNode rightInput = join.getPRelInput(1);
    // Validate both sides before building anything: creating a sort draws a node id from the shared generator,
    // so a sort that would be thrown away because the other side does not qualify must never be created.
    if (!canAddSortToLeafStage(leftInput) || !canAddSortToLeafStage(rightInput)) {
      return join;
    }
    PRelNode newLeft = addSortToLeafStage(leftInput, sortConstraints.getLeft());
    PRelNode newRight = addSortToLeafStage(rightInput, sortConstraints.getRight());
    return join.with(List.of(newLeft, newRight));
  }

  /**
   * Returns whether a sort may be placed on top of the given node: the node must be in the leaf stage, satisfy
   * its distribution constraint, carry a collation trait, and be a table scan, filter or project.
   */
  private boolean canAddSortToLeafStage(PRelNode node) {
    if (!node.isLeafStage() || !satisfiesDistTrait(node) || node.unwrap().getTraitSet().getCollation() == null) {
      return false;
    }
    return node.unwrap() instanceof TableScan
        || node.unwrap() instanceof Filter
        || node.unwrap() instanceof Project;
  }

  /**
   * Creates a leaf stage sort with the given collation on top of the node. Callers must have verified the node
   * with {@link #canAddSortToLeafStage(PRelNode)} first.
   */
  private PRelNode addSortToLeafStage(PRelNode node, RelCollation collation) {
    PinotDataDistribution pdd = node.getPinotDataDistributionOrThrow().withCollation(collation);
    return new PhysicalSort(node.unwrap().getCluster(), RelTraitSet.createEmpty(), List.of(), collation, null, null,
        node, _physicalPlannerContext.getNodeIdGenerator().get(), pdd, true);
  }

  /**
   * Returns whether the node's assigned data distribution satisfies the distribution trait on its rel node.
   */
  private boolean satisfiesDistTrait(PRelNode node) {
    PinotDataDistribution pdd = node.getPinotDataDistributionOrThrow();
    RelDistribution distConstraint = node.unwrap().getTraitSet().getDistribution();
    return pdd.satisfies(distConstraint);
  }

  /**
   * Derives the collations required on the left and right inputs from the join condition.
   *
   * @return a pair of (left collation, right collation) on the single join key of each side, or {@code null}
   *     when the join is not an equi join with exactly one key
   */
  @Nullable
  private Pair<RelCollation, RelCollation> getSortConstraints(PhysicalJoin join) {
    JoinInfo joinInfo = join.analyzeCondition();
    if (!joinInfo.isEqui() || joinInfo.leftKeys.size() != 1) {
      return null;
    }
    int leftKey = joinInfo.leftKeys.get(0);
    int rightKey = joinInfo.rightKeys.get(0);
    RelCollation leftCollation = RelCollations.of(leftKey);
    RelCollation rightCollation = RelCollations.of(rightKey);
    return Pair.of(leftCollation, rightCollation);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,6 @@ public int hashCode() {
}

public enum JoinStrategy {
HASH, LOOKUP, ASOF
HASH, LOOKUP, ASOF, SORTED
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,8 @@ private static JoinNode.JoinStrategy convertJoinStrategy(Plan.JoinStrategy joinS
return JoinNode.JoinStrategy.LOOKUP;
case AS_OF:
return JoinNode.JoinStrategy.ASOF;
case SORTED:
return JoinNode.JoinStrategy.SORTED;
default:
throw new IllegalStateException("Unsupported JoinStrategy: " + joinStrategy);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,8 @@ private static Plan.JoinStrategy convertJoinStrategy(JoinNode.JoinStrategy joinS
return Plan.JoinStrategy.LOOKUP;
case ASOF:
return Plan.JoinStrategy.AS_OF;
case SORTED:
return Plan.JoinStrategy.SORTED;
default:
throw new IllegalStateException("Unsupported JoinStrategy: " + joinStrategy);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,9 @@ public ObjectNode visitFilter(FilterNode node, Context context) {
public ObjectNode visitJoin(JoinNode node, Context context) {
if (node.getJoinStrategy() == JoinNode.JoinStrategy.HASH) {
return recursiveCase(node, MultiStageOperator.Type.HASH_JOIN, context);
} else if (node.getJoinStrategy() == JoinNode.JoinStrategy.SORTED) {
// TODO: Create state type for SORTED_JOIN
return recursiveCase(node, MultiStageOperator.Type.HASH_JOIN, context);
} else {
assert node.getJoinStrategy() == JoinNode.JoinStrategy.LOOKUP;
return recursiveCase(node, MultiStageOperator.Type.LOOKUP_JOIN, context);
Expand Down
Loading
Loading