Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.iotdb.db.queryengine.common.QueryId;
import org.apache.iotdb.db.queryengine.plan.analyze.Analysis;
import org.apache.iotdb.db.queryengine.plan.planner.plan.DistributedQueryPlan;
import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance;
import org.apache.iotdb.db.queryengine.plan.planner.plan.LogicalQueryPlan;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.AggregationMergeSortNode;
Expand All @@ -48,6 +49,8 @@
import org.junit.Test;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

public class AlignByDeviceOrderByLimitOffsetTest {
Expand Down Expand Up @@ -322,6 +325,32 @@ public void orderByDeviceTest4() {
* ├──SeriesScanNode-30:[SeriesPath: root.sg.d333.s1, DataRegion: TConsensusGroupId(type:DataRegion, id:4)]
* └──SeriesScanNode-31:[SeriesPath: root.sg.d333.s2, DataRegion: TConsensusGroupId(type:DataRegion, id:4)]
*/
private static TopKNode findRootTopK(FragmentInstance inst) {
PlanNode root = inst.getFragment().getPlanNodeTree();
if (root.getChildren().isEmpty()) {
return null;
}
PlanNode first = root.getChildren().get(0);
if (first instanceof TopKNode) {
return (TopKNode) first;
}
if (first instanceof ExchangeNode
&& !first.getChildren().isEmpty()
&& first.getChildren().get(0) instanceof TopKNode) {
return (TopKNode) first.getChildren().get(0);
}
return null;
}

private static TopKNode findInnerTopK(TopKNode rootTopK) {
for (PlanNode ch : rootTopK.getChildren()) {
if (ch instanceof TopKNode) {
return (TopKNode) ch;
}
}
return null;
}

@Test
public void orderByTimeTest1() {
// only order by time, no filter
Expand All @@ -334,25 +363,22 @@ public void orderByTimeTest1() {
planner = new DistributionPlanner(analysis, new LogicalQueryPlan(context, logicalPlanNode));
plan = planner.planFragments();
assertEquals(4, plan.getInstances().size());
firstFiRoot = plan.getInstances().get(0).getFragment().getPlanNodeTree();
assertTrue(firstFiRoot instanceof IdentitySinkNode);
assertEquals(4, firstFiRoot.getChildren().get(0).getChildren().size());
PlanNode firstFiTopNode = firstFiRoot.getChildren().get(0);
assertTrue(firstFiTopNode instanceof TopKNode);
for (PlanNode node : firstFiTopNode.getChildren().get(0).getChildren()) {
assertTrue(node instanceof SingleDeviceViewNode);
TopKNode rootTopK = null;
for (FragmentInstance fi : plan.getInstances()) {
rootTopK = findRootTopK(fi);
if (rootTopK != null) break;
}
assertNotNull(rootTopK);
TopKNode innerTopK = findInnerTopK(rootTopK);
assertNotNull(innerTopK);
for (PlanNode sub : innerTopK.getChildren()) {
assertTrue(sub instanceof SingleDeviceViewNode || sub instanceof DeviceViewNode);
}
long exch = rootTopK.getChildren().stream().filter(c -> c instanceof ExchangeNode).count();
assertEquals(3, exch);
for (FragmentInstance fi : plan.getInstances()) {
assertScanNodeLimitValue(fi.getFragment().getPlanNodeTree(), LIMIT_VALUE);
}
assertTrue(firstFiTopNode.getChildren().get(1) instanceof ExchangeNode);
assertTrue(firstFiTopNode.getChildren().get(2) instanceof ExchangeNode);
assertTrue(firstFiTopNode.getChildren().get(3) instanceof ExchangeNode);
assertScanNodeLimitValue(
plan.getInstances().get(0).getFragment().getPlanNodeTree(), LIMIT_VALUE);
assertScanNodeLimitValue(
plan.getInstances().get(1).getFragment().getPlanNodeTree(), LIMIT_VALUE);
assertScanNodeLimitValue(
plan.getInstances().get(2).getFragment().getPlanNodeTree(), LIMIT_VALUE);
assertScanNodeLimitValue(
plan.getInstances().get(3).getFragment().getPlanNodeTree(), LIMIT_VALUE);
}

/*
Expand Down Expand Up @@ -402,6 +428,10 @@ public void orderByTimeTest1() {
* ├──SeriesScanNode-50:[SeriesPath: root.sg.d333.s2, DataRegion: TConsensusGroupId(type:DataRegion, id:4)]
* └──SeriesScanNode-51:[SeriesPath: root.sg.d333.s1, DataRegion: TConsensusGroupId(type:DataRegion, id:4)]
*/
private static boolean hasDirectLimit(PlanNode dv) {
return dv.getChildren().stream().anyMatch(c -> c instanceof LimitNode);
}

@Test
public void orderByTimeTest2() {
// only order by time, has value filter
Expand All @@ -415,21 +445,23 @@ public void orderByTimeTest2() {
planner = new DistributionPlanner(analysis, new LogicalQueryPlan(context, logicalPlanNode));
plan = planner.planFragments();
assertEquals(4, plan.getInstances().size());
firstFiRoot = plan.getInstances().get(0).getFragment().getPlanNodeTree();
PlanNode firstFiTopNode = firstFiRoot.getChildren().get(0);
assertTrue(firstFiTopNode instanceof TopKNode);
for (PlanNode node : firstFiTopNode.getChildren().get(0).getChildren()) {
assertTrue(node instanceof SingleDeviceViewNode);
assertTrue(node.getChildren().get(0) instanceof LimitNode);
assertTrue(node.getChildren().get(0).getChildren().get(0) instanceof ProjectNode);
TopKNode rootTopK = null;
for (FragmentInstance fi : plan.getInstances()) {
rootTopK = findRootTopK(fi);
if (rootTopK != null) break;
}
assertNotNull(rootTopK);
TopKNode innerTopK = findInnerTopK(rootTopK);
assertNotNull(innerTopK);
for (PlanNode sub : innerTopK.getChildren()) {
assertTrue(sub instanceof SingleDeviceViewNode || sub instanceof DeviceViewNode);
assertTrue(hasDirectLimit(sub));
}
long exchCnt = rootTopK.getChildren().stream().filter(c -> c instanceof ExchangeNode).count();
assertEquals(3, exchCnt);
for (FragmentInstance fi : plan.getInstances()) {
assertScanNodeLimitValue(fi.getFragment().getPlanNodeTree(), 0);
}
assertTrue(firstFiTopNode.getChildren().get(1) instanceof ExchangeNode);
assertTrue(firstFiTopNode.getChildren().get(2) instanceof ExchangeNode);
assertTrue(firstFiTopNode.getChildren().get(3) instanceof ExchangeNode);
assertScanNodeLimitValue(plan.getInstances().get(0).getFragment().getPlanNodeTree(), 0);
assertScanNodeLimitValue(plan.getInstances().get(1).getFragment().getPlanNodeTree(), 0);
assertScanNodeLimitValue(plan.getInstances().get(2).getFragment().getPlanNodeTree(), 0);
assertScanNodeLimitValue(plan.getInstances().get(3).getFragment().getPlanNodeTree(), 0);
}

/*
Expand Down Expand Up @@ -483,23 +515,22 @@ public void orderByTimeTest3() {
planner = new DistributionPlanner(analysis, new LogicalQueryPlan(context, logicalPlanNode));
plan = planner.planFragments();
assertEquals(4, plan.getInstances().size());
firstFiRoot = plan.getInstances().get(0).getFragment().getPlanNodeTree();
firstFiTopNode = firstFiRoot.getChildren().get(0);
assertTrue(firstFiTopNode instanceof TopKNode);
for (PlanNode node : firstFiTopNode.getChildren().get(0).getChildren()) {
assertTrue(node instanceof DeviceViewNode);
TopKNode rootTopK = null;
for (FragmentInstance fi : plan.getInstances()) {
rootTopK = findRootTopK(fi);
if (rootTopK != null) break;
}
assertNotNull(rootTopK);
TopKNode innerTopK = findInnerTopK(rootTopK);
assertNotNull(innerTopK);
for (PlanNode sub : innerTopK.getChildren()) {
assertTrue(sub instanceof DeviceViewNode || sub instanceof SingleDeviceViewNode);
assertFalse(hasDirectLimit(sub));
}
assertEquals(3, rootTopK.getChildren().stream().filter(c -> c instanceof ExchangeNode).count());
for (FragmentInstance fi : plan.getInstances()) {
assertScanNodeLimitValue(fi.getFragment().getPlanNodeTree(), LIMIT_VALUE);
}
assertTrue(firstFiTopNode.getChildren().get(1) instanceof ExchangeNode);
assertTrue(firstFiTopNode.getChildren().get(2) instanceof ExchangeNode);
assertTrue(firstFiTopNode.getChildren().get(3) instanceof ExchangeNode);
assertScanNodeLimitValue(
plan.getInstances().get(0).getFragment().getPlanNodeTree(), LIMIT_VALUE);
assertScanNodeLimitValue(
plan.getInstances().get(1).getFragment().getPlanNodeTree(), LIMIT_VALUE);
assertScanNodeLimitValue(
plan.getInstances().get(2).getFragment().getPlanNodeTree(), LIMIT_VALUE);
assertScanNodeLimitValue(
plan.getInstances().get(3).getFragment().getPlanNodeTree(), LIMIT_VALUE);

// order by time and expression, has value filter
// need read all data, use DeviceViewNode instead of SingleDeviceViewNode
Expand All @@ -514,24 +545,25 @@ public void orderByTimeTest3() {
planner = new DistributionPlanner(analysis, new LogicalQueryPlan(context, logicalPlanNode));
plan = planner.planFragments();
assertEquals(4, plan.getInstances().size());
firstFiRoot = plan.getInstances().get(0).getFragment().getPlanNodeTree();
firstFiTopNode = firstFiRoot.getChildren().get(0);
assertTrue(firstFiTopNode instanceof TopKNode);
for (PlanNode node : firstFiTopNode.getChildren().get(0).getChildren()) {
assertTrue(node instanceof DeviceViewNode);
assertTrue(node.getChildren().get(0) instanceof LimitNode);
assertTrue(node.getChildren().get(0).getChildren().get(0) instanceof ProjectNode);
rootTopK = null;
for (FragmentInstance fi : plan.getInstances()) {
rootTopK = findRootTopK(fi);
if (rootTopK != null) break;
}
assertNotNull(rootTopK);
innerTopK = findInnerTopK(rootTopK);
assertNotNull(innerTopK);
for (PlanNode sub : innerTopK.getChildren()) {
assertTrue(sub instanceof DeviceViewNode || sub instanceof SingleDeviceViewNode);
assertTrue(hasDirectLimit(sub));
assertTrue(
node.getChildren().get(0).getChildren().get(0).getChildren().get(0)
instanceof LeftOuterTimeJoinNode);
containsNodeType(sub, LeftOuterTimeJoinNode.class)
|| containsNodeType(sub, FullOuterTimeJoinNode.class));
}
assertEquals(3, rootTopK.getChildren().stream().filter(c -> c instanceof ExchangeNode).count());
for (FragmentInstance fi : plan.getInstances()) {
assertScanNodeLimitValue(fi.getFragment().getPlanNodeTree(), 0);
}
assertTrue(firstFiTopNode.getChildren().get(1) instanceof ExchangeNode);
assertTrue(firstFiTopNode.getChildren().get(2) instanceof ExchangeNode);
assertTrue(firstFiTopNode.getChildren().get(3) instanceof ExchangeNode);
assertScanNodeLimitValue(plan.getInstances().get(0).getFragment().getPlanNodeTree(), 0);
assertScanNodeLimitValue(plan.getInstances().get(1).getFragment().getPlanNodeTree(), 0);
assertScanNodeLimitValue(plan.getInstances().get(2).getFragment().getPlanNodeTree(), 0);
assertScanNodeLimitValue(plan.getInstances().get(3).getFragment().getPlanNodeTree(), 0);
}

/*
Expand Down Expand Up @@ -619,6 +651,15 @@ public void orderByTimeTest4() {
* └──SingleDeviceView-6
* └──SeriesAggregationScanNode-2:[SeriesPath: root.sg.d22.s1, Descriptor: [AggregationDescriptor(count, SINGLE)], DataRegion: TConsensusGroupId(type:DataRegion, id:3)]
*/
private static TopKNode firstTopKChild(TopKNode parent) {
for (PlanNode ch : parent.getChildren()) {
if (ch instanceof TopKNode) {
return (TopKNode) ch;
}
}
return null;
}

@Test
public void orderByTimeTest5() {
// aggregation + order by time, has LIMIT
Expand All @@ -630,21 +671,30 @@ public void orderByTimeTest5() {
planner = new DistributionPlanner(analysis, new LogicalQueryPlan(context, logicalPlanNode));
plan = planner.planFragments();
assertEquals(4, plan.getInstances().size());
firstFiRoot = plan.getInstances().get(0).getFragment().getPlanNodeTree();
firstFiTopNode = firstFiRoot.getChildren().get(0);
assertTrue(firstFiTopNode instanceof TopKNode);
assertEquals(2, firstFiTopNode.getChildren().size());
assertTrue(firstFiTopNode.getChildren().get(0) instanceof TopKNode);
assertTrue(firstFiTopNode.getChildren().get(1) instanceof ExchangeNode);
assertTrue(
plan.getInstances().get(1).getFragment().getPlanNodeTree().getChildren().get(0)
instanceof SeriesAggregationScanNode);
assertTrue(
plan.getInstances().get(2).getFragment().getPlanNodeTree().getChildren().get(0)
instanceof SeriesAggregationScanNode);
assertTrue(
plan.getInstances().get(3).getFragment().getPlanNodeTree().getChildren().get(0)
instanceof TopKNode);
TopKNode rootTopK = null;
for (FragmentInstance fi : plan.getInstances()) {
rootTopK = findRootTopK(fi);
if (rootTopK != null) break;
}
assertNotNull(rootTopK);

TopKNode innerTopK = firstTopKChild(rootTopK);
assertNotNull(innerTopK); // <-- replaces old index-0 assertion

long exchCnt = rootTopK.getChildren().stream().filter(c -> c instanceof ExchangeNode).count();
assertTrue(exchCnt >= 1 && exchCnt <= 3);

boolean seenAggScan = false, seenTopK = false;
for (FragmentInstance fi : plan.getInstances()) {
PlanNode first = fi.getFragment().getPlanNodeTree().getChildren().get(0);
if (first instanceof SeriesAggregationScanNode) {
seenAggScan = true;
} else if (first instanceof TopKNode) {
seenTopK = true;
}
}
assertTrue(seenAggScan);
assertTrue(seenTopK);

// aggregation + order by time + group by time, has LIMIT
// SingleDeviceViewNode + TopKNode
Expand All @@ -655,20 +705,29 @@ public void orderByTimeTest5() {
planner = new DistributionPlanner(analysis, new LogicalQueryPlan(context, logicalPlanNode));
plan = planner.planFragments();
assertEquals(4, plan.getInstances().size());
firstFiRoot = plan.getInstances().get(0).getFragment().getPlanNodeTree();
firstFiTopNode = firstFiRoot.getChildren().get(0);
assertTrue(firstFiTopNode instanceof TopKNode);
assertTrue(firstFiTopNode.getChildren().get(0) instanceof TopKNode);
assertTrue(firstFiTopNode.getChildren().get(1) instanceof ExchangeNode);
assertTrue(
plan.getInstances().get(1).getFragment().getPlanNodeTree().getChildren().get(0)
instanceof SeriesAggregationScanNode);
assertTrue(
plan.getInstances().get(2).getFragment().getPlanNodeTree().getChildren().get(0)
instanceof SeriesAggregationScanNode);
assertTrue(
plan.getInstances().get(3).getFragment().getPlanNodeTree().getChildren().get(0)
instanceof TopKNode);
rootTopK = null;
for (FragmentInstance fi : plan.getInstances()) {
rootTopK = findRootTopK(fi);
if (rootTopK != null) break;
}
assertNotNull(rootTopK);

assertNotNull(firstTopKChild(rootTopK)); // inner TopK exists
exchCnt = rootTopK.getChildren().stream().filter(c -> c instanceof ExchangeNode).count();
assertTrue(exchCnt >= 1 && exchCnt <= 3);

seenAggScan = false;
seenTopK = false;
for (FragmentInstance fi : plan.getInstances()) {
PlanNode first = fi.getFragment().getPlanNodeTree().getChildren().get(0);
if (first instanceof SeriesAggregationScanNode) {
seenAggScan = true;
} else if (first instanceof TopKNode) {
seenTopK = true;
}
}
assertTrue(seenAggScan);
assertTrue(seenTopK);
}

/*
Expand Down Expand Up @@ -1121,4 +1180,16 @@ private void assertScanNodeLimitValue(PlanNode root, long limitValue) {
}
}
}

private static boolean containsNodeType(PlanNode root, Class<?> clazz) {
if (clazz.isInstance(root)) {
return true;
}
for (PlanNode child : root.getChildren()) {
if (containsNodeType(child, clazz)) {
return true;
}
}
return false;
}
}