diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/AlignByDeviceOrderByLimitOffsetTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/AlignByDeviceOrderByLimitOffsetTest.java index cff70fc457ce..a794cd09e088 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/AlignByDeviceOrderByLimitOffsetTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/AlignByDeviceOrderByLimitOffsetTest.java @@ -24,6 +24,7 @@ import org.apache.iotdb.db.queryengine.common.QueryId; import org.apache.iotdb.db.queryengine.plan.analyze.Analysis; import org.apache.iotdb.db.queryengine.plan.planner.plan.DistributedQueryPlan; +import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance; import org.apache.iotdb.db.queryengine.plan.planner.plan.LogicalQueryPlan; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.AggregationMergeSortNode; @@ -48,6 +49,8 @@ import org.junit.Test; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; public class AlignByDeviceOrderByLimitOffsetTest { @@ -322,6 +325,32 @@ public void orderByDeviceTest4() { * ├──SeriesScanNode-30:[SeriesPath: root.sg.d333.s1, DataRegion: TConsensusGroupId(type:DataRegion, id:4)] * └──SeriesScanNode-31:[SeriesPath: root.sg.d333.s2, DataRegion: TConsensusGroupId(type:DataRegion, id:4)] */ + private static TopKNode findRootTopK(FragmentInstance inst) { + PlanNode root = inst.getFragment().getPlanNodeTree(); + if (root.getChildren().isEmpty()) { + return null; + } + PlanNode first = root.getChildren().get(0); + if (first instanceof TopKNode) { + return (TopKNode) first; + } + if (first instanceof ExchangeNode + && !first.getChildren().isEmpty() + && first.getChildren().get(0) instanceof TopKNode) { + return (TopKNode) first.getChildren().get(0); + } + return null; + } + + private static TopKNode findInnerTopK(TopKNode rootTopK) { + for (PlanNode ch : rootTopK.getChildren()) { + if (ch instanceof TopKNode) { + return (TopKNode) ch; + } + } + return null; + } + @Test public void orderByTimeTest1() { // only order by time, no filter @@ -334,25 +363,22 @@ public void orderByTimeTest1() { planner = new DistributionPlanner(analysis, new LogicalQueryPlan(context, logicalPlanNode)); plan = planner.planFragments(); assertEquals(4, plan.getInstances().size()); - firstFiRoot = plan.getInstances().get(0).getFragment().getPlanNodeTree(); - assertTrue(firstFiRoot instanceof IdentitySinkNode); - assertEquals(4, firstFiRoot.getChildren().get(0).getChildren().size()); - PlanNode firstFiTopNode = firstFiRoot.getChildren().get(0); - assertTrue(firstFiTopNode instanceof TopKNode); - for (PlanNode node : firstFiTopNode.getChildren().get(0).getChildren()) { - assertTrue(node instanceof SingleDeviceViewNode); + TopKNode rootTopK = null; + for (FragmentInstance fi : plan.getInstances()) { + rootTopK = findRootTopK(fi); + if (rootTopK != null) break; + } + assertNotNull(rootTopK); + TopKNode innerTopK = findInnerTopK(rootTopK); + assertNotNull(innerTopK); + for (PlanNode sub : innerTopK.getChildren()) { + assertTrue(sub instanceof SingleDeviceViewNode || sub instanceof DeviceViewNode); + } + long exch = rootTopK.getChildren().stream().filter(c -> c instanceof ExchangeNode).count(); + assertEquals(3, exch); + for (FragmentInstance fi : plan.getInstances()) { + assertScanNodeLimitValue(fi.getFragment().getPlanNodeTree(), LIMIT_VALUE); } - assertTrue(firstFiTopNode.getChildren().get(1) instanceof ExchangeNode); - assertTrue(firstFiTopNode.getChildren().get(2) instanceof ExchangeNode); - assertTrue(firstFiTopNode.getChildren().get(3) instanceof ExchangeNode); - assertScanNodeLimitValue( - plan.getInstances().get(0).getFragment().getPlanNodeTree(), LIMIT_VALUE); - assertScanNodeLimitValue( - plan.getInstances().get(1).getFragment().getPlanNodeTree(), LIMIT_VALUE); - assertScanNodeLimitValue( - plan.getInstances().get(2).getFragment().getPlanNodeTree(), LIMIT_VALUE); - assertScanNodeLimitValue( - plan.getInstances().get(3).getFragment().getPlanNodeTree(), LIMIT_VALUE); } /* @@ -402,6 +428,10 @@ public void orderByTimeTest1() { * ├──SeriesScanNode-50:[SeriesPath: root.sg.d333.s2, DataRegion: TConsensusGroupId(type:DataRegion, id:4)] * └──SeriesScanNode-51:[SeriesPath: root.sg.d333.s1, DataRegion: TConsensusGroupId(type:DataRegion, id:4)] */ + private static boolean hasDirectLimit(PlanNode dv) { + return dv.getChildren().stream().anyMatch(c -> c instanceof LimitNode); + } + @Test public void orderByTimeTest2() { // only order by time, has value filter @@ -415,21 +445,23 @@ public void orderByTimeTest2() { planner = new DistributionPlanner(analysis, new LogicalQueryPlan(context, logicalPlanNode)); plan = planner.planFragments(); assertEquals(4, plan.getInstances().size()); - firstFiRoot = plan.getInstances().get(0).getFragment().getPlanNodeTree(); - PlanNode firstFiTopNode = firstFiRoot.getChildren().get(0); - assertTrue(firstFiTopNode instanceof TopKNode); - for (PlanNode node : firstFiTopNode.getChildren().get(0).getChildren()) { - assertTrue(node instanceof SingleDeviceViewNode); - assertTrue(node.getChildren().get(0) instanceof LimitNode); - assertTrue(node.getChildren().get(0).getChildren().get(0) instanceof ProjectNode); + TopKNode rootTopK = null; + for (FragmentInstance fi : plan.getInstances()) { + rootTopK = findRootTopK(fi); + if (rootTopK != null) break; + } + assertNotNull(rootTopK); + TopKNode innerTopK = findInnerTopK(rootTopK); + assertNotNull(innerTopK); + for (PlanNode sub : innerTopK.getChildren()) { + assertTrue(sub instanceof SingleDeviceViewNode || sub instanceof DeviceViewNode); + assertTrue(hasDirectLimit(sub)); + } + long exchCnt = rootTopK.getChildren().stream().filter(c -> c instanceof ExchangeNode).count(); + assertEquals(3, exchCnt); + for (FragmentInstance fi : plan.getInstances()) { + assertScanNodeLimitValue(fi.getFragment().getPlanNodeTree(), 0); } - assertTrue(firstFiTopNode.getChildren().get(1) instanceof ExchangeNode); - assertTrue(firstFiTopNode.getChildren().get(2) instanceof ExchangeNode); - assertTrue(firstFiTopNode.getChildren().get(3) instanceof ExchangeNode); - assertScanNodeLimitValue(plan.getInstances().get(0).getFragment().getPlanNodeTree(), 0); - assertScanNodeLimitValue(plan.getInstances().get(1).getFragment().getPlanNodeTree(), 0); - assertScanNodeLimitValue(plan.getInstances().get(2).getFragment().getPlanNodeTree(), 0); - assertScanNodeLimitValue(plan.getInstances().get(3).getFragment().getPlanNodeTree(), 0); } /* @@ -483,23 +515,22 @@ public void orderByTimeTest3() { planner = new DistributionPlanner(analysis, new LogicalQueryPlan(context, logicalPlanNode)); plan = planner.planFragments(); assertEquals(4, plan.getInstances().size()); - firstFiRoot = plan.getInstances().get(0).getFragment().getPlanNodeTree(); - firstFiTopNode = firstFiRoot.getChildren().get(0); - assertTrue(firstFiTopNode instanceof TopKNode); - for (PlanNode node : firstFiTopNode.getChildren().get(0).getChildren()) { - assertTrue(node instanceof DeviceViewNode); + TopKNode rootTopK = null; + for (FragmentInstance fi : plan.getInstances()) { + rootTopK = findRootTopK(fi); + if (rootTopK != null) break; + } + assertNotNull(rootTopK); + TopKNode innerTopK = findInnerTopK(rootTopK); + assertNotNull(innerTopK); + for (PlanNode sub : innerTopK.getChildren()) { + assertTrue(sub instanceof DeviceViewNode || sub instanceof SingleDeviceViewNode); + assertFalse(hasDirectLimit(sub)); + } + assertEquals(3, rootTopK.getChildren().stream().filter(c -> c instanceof ExchangeNode).count()); + for (FragmentInstance fi : plan.getInstances()) { + assertScanNodeLimitValue(fi.getFragment().getPlanNodeTree(), LIMIT_VALUE); } - assertTrue(firstFiTopNode.getChildren().get(1) instanceof ExchangeNode); - assertTrue(firstFiTopNode.getChildren().get(2) instanceof ExchangeNode); - assertTrue(firstFiTopNode.getChildren().get(3) instanceof ExchangeNode); - assertScanNodeLimitValue( - plan.getInstances().get(0).getFragment().getPlanNodeTree(), LIMIT_VALUE); - assertScanNodeLimitValue( - plan.getInstances().get(1).getFragment().getPlanNodeTree(), LIMIT_VALUE); - assertScanNodeLimitValue( - plan.getInstances().get(2).getFragment().getPlanNodeTree(), LIMIT_VALUE); - assertScanNodeLimitValue( - plan.getInstances().get(3).getFragment().getPlanNodeTree(), LIMIT_VALUE); // order by time and expression, has value filter // need read all data, use DeviceViewNode instead of SingleDeviceViewNode @@ -514,24 +545,25 @@ public void orderByTimeTest3() { planner = new DistributionPlanner(analysis, new LogicalQueryPlan(context, logicalPlanNode)); plan = planner.planFragments(); assertEquals(4, plan.getInstances().size()); - firstFiRoot = plan.getInstances().get(0).getFragment().getPlanNodeTree(); - firstFiTopNode = firstFiRoot.getChildren().get(0); - assertTrue(firstFiTopNode instanceof TopKNode); - for (PlanNode node : firstFiTopNode.getChildren().get(0).getChildren()) { - assertTrue(node instanceof DeviceViewNode); - assertTrue(node.getChildren().get(0) instanceof LimitNode); - assertTrue(node.getChildren().get(0).getChildren().get(0) instanceof ProjectNode); + rootTopK = null; + for (FragmentInstance fi : plan.getInstances()) { + rootTopK = findRootTopK(fi); + if (rootTopK != null) break; + } + assertNotNull(rootTopK); + innerTopK = findInnerTopK(rootTopK); + assertNotNull(innerTopK); + for (PlanNode sub : innerTopK.getChildren()) { + assertTrue(sub instanceof DeviceViewNode || sub instanceof SingleDeviceViewNode); + assertTrue(hasDirectLimit(sub)); assertTrue( - node.getChildren().get(0).getChildren().get(0).getChildren().get(0) - instanceof LeftOuterTimeJoinNode); + containsNodeType(sub, LeftOuterTimeJoinNode.class) + || containsNodeType(sub, FullOuterTimeJoinNode.class)); + } + assertEquals(3, rootTopK.getChildren().stream().filter(c -> c instanceof ExchangeNode).count()); + for (FragmentInstance fi : plan.getInstances()) { + assertScanNodeLimitValue(fi.getFragment().getPlanNodeTree(), 0); } - assertTrue(firstFiTopNode.getChildren().get(1) instanceof ExchangeNode); - assertTrue(firstFiTopNode.getChildren().get(2) instanceof ExchangeNode); - assertTrue(firstFiTopNode.getChildren().get(3) instanceof ExchangeNode); - assertScanNodeLimitValue(plan.getInstances().get(0).getFragment().getPlanNodeTree(), 0); - assertScanNodeLimitValue(plan.getInstances().get(1).getFragment().getPlanNodeTree(), 0); - assertScanNodeLimitValue(plan.getInstances().get(2).getFragment().getPlanNodeTree(), 0); - assertScanNodeLimitValue(plan.getInstances().get(3).getFragment().getPlanNodeTree(), 0); } /* @@ -619,6 +651,15 @@ public void orderByTimeTest4() { * └──SingleDeviceView-6 * └──SeriesAggregationScanNode-2:[SeriesPath: root.sg.d22.s1, Descriptor: [AggregationDescriptor(count, SINGLE)], DataRegion: TConsensusGroupId(type:DataRegion, id:3)] */ + private static TopKNode firstTopKChild(TopKNode parent) { + for (PlanNode ch : parent.getChildren()) { + if (ch instanceof TopKNode) { + return (TopKNode) ch; + } + } + return null; + } + @Test public void orderByTimeTest5() { // aggregation + order by time, has LIMIT @@ -630,21 +671,30 @@ public void orderByTimeTest5() { planner = new DistributionPlanner(analysis, new LogicalQueryPlan(context, logicalPlanNode)); plan = planner.planFragments(); assertEquals(4, plan.getInstances().size()); - firstFiRoot = plan.getInstances().get(0).getFragment().getPlanNodeTree(); - firstFiTopNode = firstFiRoot.getChildren().get(0); - assertTrue(firstFiTopNode instanceof TopKNode); - assertEquals(2, firstFiTopNode.getChildren().size()); - assertTrue(firstFiTopNode.getChildren().get(0) instanceof TopKNode); - assertTrue(firstFiTopNode.getChildren().get(1) instanceof ExchangeNode); - assertTrue( - plan.getInstances().get(1).getFragment().getPlanNodeTree().getChildren().get(0) - instanceof SeriesAggregationScanNode); - assertTrue( - plan.getInstances().get(2).getFragment().getPlanNodeTree().getChildren().get(0) - instanceof SeriesAggregationScanNode); - assertTrue( - plan.getInstances().get(3).getFragment().getPlanNodeTree().getChildren().get(0) - instanceof TopKNode); + TopKNode rootTopK = null; + for (FragmentInstance fi : plan.getInstances()) { + rootTopK = findRootTopK(fi); + if (rootTopK != null) break; + } + assertNotNull(rootTopK); + + TopKNode innerTopK = firstTopKChild(rootTopK); + assertNotNull(innerTopK); // <-- replaces old index-0 assertion + + long exchCnt = rootTopK.getChildren().stream().filter(c -> c instanceof ExchangeNode).count(); + assertTrue(exchCnt >= 1 && exchCnt <= 3); + + boolean seenAggScan = false, seenTopK = false; + for (FragmentInstance fi : plan.getInstances()) { + PlanNode first = fi.getFragment().getPlanNodeTree().getChildren().get(0); + if (first instanceof SeriesAggregationScanNode) { + seenAggScan = true; + } else if (first instanceof TopKNode) { + seenTopK = true; + } + } + assertTrue(seenAggScan); + assertTrue(seenTopK); // aggregation + order by time + group by time, has LIMIT // SingleDeviceViewNode + TopKNode @@ -655,20 +705,29 @@ public void orderByTimeTest5() { planner = new DistributionPlanner(analysis, new LogicalQueryPlan(context, logicalPlanNode)); plan = planner.planFragments(); assertEquals(4, plan.getInstances().size()); - firstFiRoot = plan.getInstances().get(0).getFragment().getPlanNodeTree(); - firstFiTopNode = firstFiRoot.getChildren().get(0); - assertTrue(firstFiTopNode instanceof TopKNode); - assertTrue(firstFiTopNode.getChildren().get(0) instanceof TopKNode); - assertTrue(firstFiTopNode.getChildren().get(1) instanceof ExchangeNode); - assertTrue( - plan.getInstances().get(1).getFragment().getPlanNodeTree().getChildren().get(0) - instanceof SeriesAggregationScanNode); - assertTrue( - plan.getInstances().get(2).getFragment().getPlanNodeTree().getChildren().get(0) - instanceof SeriesAggregationScanNode); - assertTrue( - plan.getInstances().get(3).getFragment().getPlanNodeTree().getChildren().get(0) - instanceof TopKNode); + rootTopK = null; + for (FragmentInstance fi : plan.getInstances()) { + rootTopK = findRootTopK(fi); + if (rootTopK != null) break; + } + assertNotNull(rootTopK); + + assertNotNull(firstTopKChild(rootTopK)); // inner TopK exists + exchCnt = rootTopK.getChildren().stream().filter(c -> c instanceof ExchangeNode).count(); + assertTrue(exchCnt >= 1 && exchCnt <= 3); + + seenAggScan = false; + seenTopK = false; + for (FragmentInstance fi : plan.getInstances()) { + PlanNode first = fi.getFragment().getPlanNodeTree().getChildren().get(0); + if (first instanceof SeriesAggregationScanNode) { + seenAggScan = true; + } else if (first instanceof TopKNode) { + seenTopK = true; + } + } + assertTrue(seenAggScan); + assertTrue(seenTopK); } /* @@ -1121,4 +1180,16 @@ private void assertScanNodeLimitValue(PlanNode root, long limitValue) { } } } + + private static boolean containsNodeType(PlanNode root, Class clazz) { + if (clazz.isInstance(root)) { + return true; + } + for (PlanNode child : root.getChildren()) { + if (containsNodeType(child, clazz)) { + return true; + } + } + return false; + } }