Skip to content

Commit 9967641

Browse files
it-is-a-robot, gitee-org
authored and committed
!1086 Sort aggregation support for unsorted partitioned table
Merge pull request !1086 from NeerajUnnikrishnan/sortAggr
2 parents baeb788 + b5e02d4 commit 9967641

27 files changed

+259
-61
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
remote exchange (GATHER, SINGLE, [])
2+
final hashaggregation over (wr_account_credit, wr_net_loss)
3+
local exchange (GATHER, SINGLE, [])
4+
remote exchange (REPARTITION, HASH, ["wr_account_credit", "wr_net_loss"])
5+
partial sortaggregate over (wr_account_credit, wr_net_loss)
6+
scan web_returns_partiotion_netloss_returneddatesk
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
select count(${database}.${schema}.web_returns_partiotion_netloss_returneddatesk.wr_net_loss),${database}.${schema}.web_returns_partiotion_netloss_returneddatesk.wr_net_loss
2+
from ${database}.${schema}.web_returns_partiotion_netloss_returneddatesk
3+
group by ${database}.${schema}.web_returns_partiotion_netloss_returneddatesk.wr_account_credit, ${database}.${schema}.web_returns_partiotion_netloss_returneddatesk.wr_net_loss
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
remote exchange (GATHER, SINGLE, [])
2+
final hashaggregation over (wr_account_credit, wr_net_loss)
3+
local exchange (GATHER, SINGLE, [])
4+
remote exchange (REPARTITION, HASH, ["wr_account_credit", "wr_net_loss"])
5+
partial sortaggregate over (wr_account_credit, wr_net_loss)
6+
scan web_returns_partiotion_netloss_returneddatesk_bucket8
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
select count(${database}.${schema}.web_returns_partiotion_netloss_returneddatesk_bucket8.wr_net_loss),${database}.${schema}.web_returns_partiotion_netloss_returneddatesk_bucket8.wr_net_loss
2+
from ${database}.${schema}.web_returns_partiotion_netloss_returneddatesk_bucket8
3+
group by ${database}.${schema}.web_returns_partiotion_netloss_returneddatesk_bucket8.wr_account_credit, ${database}.${schema}.web_returns_partiotion_netloss_returneddatesk_bucket8.wr_net_loss
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
remote exchange (GATHER, SINGLE, [])
2+
final hashaggregation over (wr_account_credit, wr_net_loss)
3+
local exchange (GATHER, SINGLE, [])
4+
remote exchange (REPARTITION, HASH, ["wr_account_credit", "wr_net_loss"])
5+
partial sortaggregate over (wr_account_credit, wr_net_loss)
6+
scan web_returns_partiotion_netloss_returneddatesk_bucket1_sort
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
select count(${database}.${schema}.web_returns_partiotion_netloss_returneddatesk_bucket1_sort.wr_net_loss),${database}.${schema}.web_returns_partiotion_netloss_returneddatesk_bucket1_sort.wr_net_loss
2+
from ${database}.${schema}.web_returns_partiotion_netloss_returneddatesk_bucket1_sort
3+
group by ${database}.${schema}.web_returns_partiotion_netloss_returneddatesk_bucket1_sort.wr_account_credit, ${database}.${schema}.web_returns_partiotion_netloss_returneddatesk_bucket1_sort.wr_net_loss
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
remote exchange (GATHER, SINGLE, [])
2+
final hashaggregation over (wr_account_credit, wr_net_loss)
3+
local exchange (GATHER, SINGLE, [])
4+
remote exchange (REPARTITION, HASH, ["wr_account_credit", "wr_net_loss"])
5+
partial sortaggregate over (wr_account_credit, wr_net_loss)
6+
scan web_returns_partiotion_netloss_returneddatesk_bucket8_sort
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
select count(${database}.${schema}.web_returns_partiotion_netloss_returneddatesk_bucket8_sort.wr_net_loss),${database}.${schema}.web_returns_partiotion_netloss_returneddatesk_bucket8_sort.wr_net_loss
2+
from ${database}.${schema}.web_returns_partiotion_netloss_returneddatesk_bucket8_sort
3+
group by ${database}.${schema}.web_returns_partiotion_netloss_returneddatesk_bucket8_sort.wr_account_credit, ${database}.${schema}.web_returns_partiotion_netloss_returneddatesk_bucket8_sort.wr_net_loss

presto-benchto-benchmarks/src/test/java/io/prestosql/sql/planner/TestSortBasedAggregationPlan.java

+30-4
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ public TestSortBasedAggregationPlan()
7777

7878
Map<String, String> hiveCatalogConfig = ImmutableMap.<String, String>builder()
7979
.put("hive.orc-predicate-pushdown-enabled", "false")
80+
.put("hive.max-partitions-per-writers", "9999")
8081
.build();
8182

8283
queryRunner.createCatalog(catalog, hiveConnectorFactory, hiveCatalogConfig);
@@ -118,14 +119,38 @@ public TestSortBasedAggregationPlan()
118119
"bucketed_by=array['wr_return_quantity'], bucket_count = 1, " +
119120
"partitioned_by = ARRAY['wr_net_loss'], " +
120121
"sorted_by = ARRAY['wr_return_quantity','wr_returned_time_sk']) " +
121-
"as select * from tpcds.tiny.web_returns limit 100");
122+
"as select * from tpcds.tiny.web_returns limit 500");
122123

123124
queryRunner.execute("create table web_returns_partition_bucketCount32 with (" +
124125
"bucketed_by=array['wr_return_quantity'], bucket_count = 1, " +
125126
"partitioned_by = ARRAY['wr_net_loss'], " +
126127
"sorted_by = ARRAY['wr_return_quantity','wr_returned_time_sk']) " +
127-
"as select * from tpcds.tiny.web_returns limit 100");
128-
128+
"as select * from tpcds.tiny.web_returns limit 1000");
129+
130+
queryRunner.execute("create table web_returns_partiotion_netloss_returneddatesk with (" +
131+
"transactional = false, format='orc', " +
132+
"partitioned_by = ARRAY['wr_account_credit', 'wr_net_loss']) " +
133+
"as select * from tpcds.tiny.web_returns limit 500");
134+
135+
queryRunner.execute("create table web_returns_partiotion_netloss_returneddatesk_bucket8 with (" +
136+
"transactional = false, format='orc', " +
137+
"bucket_count = 8, bucketed_by = ARRAY['wr_return_quantity','wr_returned_time_sk'], " +
138+
"partitioned_by = ARRAY['wr_account_credit', 'wr_net_loss']) " +
139+
"as select * from tpcds.tiny.web_returns limit 500");
140+
141+
queryRunner.execute("create table web_returns_partiotion_netloss_returneddatesk_bucket8_sort with (" +
142+
"transactional = false, format='orc', " +
143+
"bucket_count = 8, bucketed_by = ARRAY['wr_return_quantity','wr_returned_time_sk'], " +
144+
"partitioned_by = ARRAY['wr_account_credit', 'wr_net_loss'], " +
145+
"sorted_by = ARRAY['wr_return_quantity','wr_returned_time_sk'])" +
146+
"as select * from tpcds.tiny.web_returns limit 500");
147+
148+
queryRunner.execute("create table web_returns_partiotion_netloss_returneddatesk_bucket1_sort with (" +
149+
"transactional = false, format='orc', " +
150+
"bucket_count = 1, bucketed_by = ARRAY['wr_return_quantity','wr_returned_time_sk'], " +
151+
"sorted_by = ARRAY['wr_return_quantity','wr_returned_time_sk'], " +
152+
"partitioned_by = ARRAY['wr_account_credit', 'wr_net_loss']) " +
153+
"as select * from tpcds.tiny.web_returns limit 500");
129154
return queryRunner;
130155
}, false, false, false, true);
131156
}
@@ -136,7 +161,8 @@ protected Stream<String> getQueryResourcePaths()
136161
return Stream.of("q_InnerJoin", "q_leftJoin", "q_rightjoin", "q_rightjoin_wrong_order", "q_Inner_LeftJoin", "q_sort_groupby_notsameOrder",
137162
"q_sort_groupby_notsameOrder1", "q_groupByHavingMore", "q_InnerJoinWrongOrder", "q_InnerJoinLessCriterias",
138163
"q_InnerJoinGroupLessCriterias", "q_bucketAndSortDifferentOrder", "BucketAndGroupSameSortIsMore", "BucketGroupAreDifferent",
139-
"groupsAreMoreThanBucket", "ColNameEndWithInt", "q_PartitionBucketCount1", "q_PartitionBucketCount32")
164+
"groupsAreMoreThanBucket", "ColNameEndWithInt", "q_sortedPartitionBucketCount1", "q_sortedPartitionBucketCount32",
165+
"q_partition2Colu", "q_partition2ColuBucket8", "q_useOnlyPartition2ColuBucket8OnSortTable", "q_useOnlyPartition2ColuBucket1OnSortTable")
140166
.flatMap(i -> {
141167
String queryId = format("%s", i);
142168
System.out.println("query ID: " + queryId);

presto-hive/src/main/java/io/prestosql/plugin/hive/HiveMetadata.java

+48-30
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import io.prestosql.plugin.hive.statistics.HiveStatisticsProvider;
4848
import io.prestosql.plugin.hive.util.ConfigurationUtils;
4949
import io.prestosql.plugin.hive.util.Statistics;
50+
import io.prestosql.spi.PartialAndFinalAggregationType;
5051
import io.prestosql.spi.PrestoException;
5152
import io.prestosql.spi.block.Block;
5253
import io.prestosql.spi.connector.ColumnHandle;
@@ -3129,36 +3130,44 @@ public List<ConnectorVacuumTableInfo> getTablesForVacuum()
31293130
}
31303131

31313132
@Override
3132-
public boolean canPerformSortBasedAggregation(ConnectorSession session, ConnectorTableHandle tableHandle, List<String> groupKeyNames)
3133+
public PartialAndFinalAggregationType validateAndGetSortAggregationType(ConnectorSession session, ConnectorTableHandle tableHandle, List<String> groupKeyNames)
31333134
{
3135+
PartialAndFinalAggregationType partialAndFinalAggregationType = new PartialAndFinalAggregationType();
31343136
ConnectorTableMetadata connectorTableMetadata = getTableMetadata(session, ((HiveTableHandle) tableHandle).getSchemaTableName());
31353137
List<SortingColumn> sortingColumn = (List<SortingColumn>) connectorTableMetadata.getProperties().get(HiveTableProperties.SORTED_BY_PROPERTY);
3136-
if ((sortingColumn == null) || (sortingColumn.size() == 0)) {
3137-
return false;
3138-
}
3138+
boolean isSortingColumnsNotPresent = (sortingColumn == null) || (sortingColumn.size() == 0);
3139+
31393140
List<String> partitionedBy = new ArrayList<>();
3140-
List<String> partitionedBytemp = (List<String>) connectorTableMetadata.getProperties().get(HiveTableProperties.PARTITIONED_BY_PROPERTY);
3141-
if (null != partitionedBytemp) {
3142-
partitionedBy.addAll(partitionedBytemp);
3141+
List<String> partitionedByTemp = (List<String>) connectorTableMetadata.getProperties().get(HiveTableProperties.PARTITIONED_BY_PROPERTY);
3142+
if ((partitionedByTemp != null) && (partitionedByTemp.size() != 0)) {
3143+
partitionedBy.addAll(partitionedByTemp);
3144+
if (isSortingColumnsNotPresent && (partitionedByTemp.size() != groupKeyNames.size())) {
3145+
return partialAndFinalAggregationType;
3146+
}
3147+
}
3148+
else if (isSortingColumnsNotPresent) {
3149+
return partialAndFinalAggregationType;
31433150
}
31443151
int bucketCount = 0;
3145-
List<String> bucketedColumns = (List<String>) connectorTableMetadata.getProperties().get(HiveTableProperties.BUCKETED_BY_PROPERTY);
3146-
if (null != bucketedColumns) {
3147-
bucketCount = (int) connectorTableMetadata.getProperties().get(HiveTableProperties.BUCKET_COUNT_PROPERTY);
3152+
List<String> bucketedColumns = new ArrayList<>();
3153+
if (!isSortingColumnsNotPresent) {
3154+
bucketedColumns.addAll((List<String>) connectorTableMetadata.getProperties().get(HiveTableProperties.BUCKETED_BY_PROPERTY));
3155+
if (null != bucketedColumns) {
3156+
bucketCount = (int) connectorTableMetadata.getProperties().get(HiveTableProperties.BUCKET_COUNT_PROPERTY);
3157+
}
31483158
}
31493159

3150-
List<String> sortedColumnNames = sortingColumn.stream().map(column -> column.getColumnName()).collect(Collectors.toList());
3151-
List<String> partitionAndSortedColumnNames = new ArrayList<>();
3152-
// when there is partition by , sorted data is arranged 'partition by' followed by 'sorted by'
3153-
partitionAndSortedColumnNames.addAll(partitionedBy);
3154-
partitionAndSortedColumnNames.addAll(sortedColumnNames);
3160+
List<String> sortedColumnNames = new ArrayList<>();
3161+
if ((sortingColumn != null) && (sortingColumn.size() != 0)) {
3162+
sortedColumnNames.addAll(sortingColumn.stream().map(column -> column.getColumnName()).collect(Collectors.toList()));
3163+
}
31553164

31563165
//grouping key should be sub set of sorted By and it should match all partition by columns
3157-
if ((sortedColumnNames.size() + partitionedBy.size() < groupKeyNames.size()) ||
3166+
if ((partitionedBy.size() + sortedColumnNames.size() < groupKeyNames.size()) ||
31583167
(partitionedBy.size() > groupKeyNames.size())) {
31593168
//sorted columns are less than join criteria columns
31603169
log.debug("number of sorted columns " + sortedColumnNames.size() + "are less join column size " + groupKeyNames.size());
3161-
return false;
3170+
return partialAndFinalAggregationType;
31623171
}
31633172

31643173
// bucketby columns and groupby Columns should be same.
@@ -3169,43 +3178,52 @@ public boolean canPerformSortBasedAggregation(ConnectorSession session, Connecto
31693178
(groupKeyNames.get(partitionedByCount).equals(bucketedColumns.get(0)))) || (bucketCount == 0));
31703179

31713180
if ((bucketCount == 1) && (bucketedColumns.size() > 1)) {
3172-
boolean notMatching = false;
31733181
int minSize = Math.min(groupKeyNames.size() - partitionedBy.size(), bucketedColumns.size());
31743182
int partSize = partitionedBy.size();
31753183
for (int keyIdx = 0; keyIdx < minSize; keyIdx++) {
31763184
if (!groupKeyNames.get(keyIdx + partSize).equals(bucketedColumns.get(keyIdx))) {
3177-
notMatching = true;
3178-
break;
3185+
return partialAndFinalAggregationType;
31793186
}
31803187
}
3181-
if (!notMatching) {
3182-
singleOrZeroBucketedColumn = true;
3188+
singleOrZeroBucketedColumn = true;
3189+
}
3190+
3191+
for (int numOfComparedKeys = 0; numOfComparedKeys < partitionedBy.size(); numOfComparedKeys++) {
3192+
if ((!groupKeyNames.get(numOfComparedKeys).equals(partitionedBy.get(numOfComparedKeys)))) {
3193+
return partialAndFinalAggregationType;
31833194
}
31843195
}
3196+
3197+
if (groupKeyNames.size() == partitionedBy.size()) {
3198+
partialAndFinalAggregationType.setPartialAsSortAndFinalAsHashAggregation(true);
3199+
return partialAndFinalAggregationType;
3200+
}
3201+
31853202
if (singleOrZeroBucketedColumn || (groupKeyNames.size() == (bucketedColumns.size() + partitionedBy.size()))) {
3186-
for (int numOfComparedKeys = 0; numOfComparedKeys < groupKeyNames.size(); numOfComparedKeys++) {
3187-
boolean bucketedColumnsResult = numOfComparedKeys < partitionedBy.size() ? false : !singleOrZeroBucketedColumn &&
3188-
(!groupKeyNames.get(numOfComparedKeys).equals(bucketedColumns.get(numOfComparedKeys - partitionedBy.size())));
3189-
if ((!groupKeyNames.get(numOfComparedKeys).equals(partitionAndSortedColumnNames.get(numOfComparedKeys))) ||
3203+
int numOfCmpKeysAfterPartitionedBy = partitionedBy.size();
3204+
for (int numOfComparedKeys = 0; numOfComparedKeys < groupKeyNames.size() - partitionedBy.size(); numOfComparedKeys++, numOfCmpKeysAfterPartitionedBy++) {
3205+
boolean bucketedColumnsResult = !singleOrZeroBucketedColumn && (!groupKeyNames.get(numOfComparedKeys).equals(bucketedColumns.get(numOfComparedKeys)));
3206+
if ((!groupKeyNames.get(numOfCmpKeysAfterPartitionedBy).equals(sortedColumnNames.get(numOfComparedKeys))) ||
31903207
(!singleOrZeroBucketedColumn && bucketedColumnsResult)) {
31913208
if (log.isDebugEnabled()) {
31923209
final String[] dbgGroupKeyNames = {new String("")};
31933210
groupKeyNames.stream().forEach(k -> dbgGroupKeyNames[0] = dbgGroupKeyNames[0].concat(k + " , "));
31943211
final String[] dbgSortedColumnNames = {new String("")};
3195-
partitionAndSortedColumnNames.stream().forEach(k -> dbgSortedColumnNames[0] = dbgSortedColumnNames[0].concat(k + " , "));
3212+
sortedColumnNames.stream().forEach(k -> dbgSortedColumnNames[0] = dbgSortedColumnNames[0].concat(k + " , "));
31963213
if ((null != bucketedColumns) && (bucketedColumns.size() > 0)) {
31973214
final String[] dbgbucketedColumns = {new String("")};
31983215
bucketedColumns.stream().forEach(k -> dbgbucketedColumns[0] = dbgbucketedColumns[0].concat(k + " , "));
31993216
log.debug("Not matching sortedColumnNames: " + dbgSortedColumnNames + " group columns name: " + dbgGroupKeyNames + " bucketedColumns :" + dbgbucketedColumns);
32003217
}
32013218
log.debug("Not matching sortedColumnNames: " + dbgSortedColumnNames + " group columns name: " + dbgGroupKeyNames);
32023219
}
3203-
return false;
3220+
return partialAndFinalAggregationType;
32043221
}
32053222
}
3206-
return true;
3223+
partialAndFinalAggregationType.setSortAggregation(true);
3224+
return partialAndFinalAggregationType;
32073225
}
3208-
return false;
3226+
return partialAndFinalAggregationType;
32093227
}
32103228

32113229
@Override

presto-hive/src/test/java/io/prestosql/plugin/hive/TestHiveIntegrationSmokeTest.java

+46
Original file line numberDiff line numberDiff line change
@@ -6609,6 +6609,52 @@ public void testCachedPlanForTablesWithSameName()
66096609
assertUpdate(String.format("DROP TABLE %s.%s", schema, table));
66106610
}
66116611

6612+
// Compares the result of a GROUP BY over exactly the partition columns of a
// partitioned, bucketed table that declares no sorted_by, run under three
// alternative test sessions, against the result of the same query under the
// default session. The table is created at the start and dropped at the end.
// NOTE(review): testSessionSort, testSessionSortPrcntDrv50 and testSessionSortPrcntDrv40 are
// defined outside this view — presumably sessions with sort-based aggregation enabled; confirm.
@Test
6613+
public void UnSortAggrePartitionBucketCount1()
6614+
{
6615+
// Helper defined elsewhere in the class; presumably prepares the sort-aggregation test sessions — confirm.
initSortBasedAggregation();
6616+
// Partitioned by (shipmode, comment), bucketed by (orderkey, partkey) into one bucket, 100 rows of tpch.tiny.lineitem.
computeActual("create table lineitem_partition_shipmode_comment_bucket1 with(transactional = false, " +
6617+
"format = 'ORC', partitioned_by = ARRAY['shipmode', 'comment'], bucketed_by=array['orderkey', 'partkey'], bucket_count=1)" +
6618+
" as select * from tpch.tiny.lineitem limit 100");
6619+
6620+
// Group keys are the two partition columns; ORDER BY makes the row order comparable between runs.
String query = "select lineitem_partition_shipmode_comment_bucket1.shipmode, lineitem_partition_shipmode_comment_bucket1.comment " +
6621+
"from lineitem_partition_shipmode_comment_bucket1 " +
6622+
"group by shipmode, comment " +
6623+
"order by shipmode, comment ";
6624+
6625+
MaterializedResult sortResult = computeActual(testSessionSort, query);
6626+
// Baseline: same query under the default session.
MaterializedResult hashResult = computeActual(query);
6627+
assertEquals(sortResult.getMaterializedRows(), hashResult.getMaterializedRows());
6628+
sortResult = computeActual(testSessionSortPrcntDrv50, query);
6629+
assertEquals(sortResult.getMaterializedRows(), hashResult.getMaterializedRows());
6630+
sortResult = computeActual(testSessionSortPrcntDrv40, query);
6631+
assertEquals(sortResult.getMaterializedRows(), hashResult.getMaterializedRows());
6632+
// Reached only when every assertion above passed; a failing assertion leaves the table behind.
assertUpdate("DROP TABLE lineitem_partition_shipmode_comment_bucket1");
6633+
}
6634+
6635+
// Compares the result of a GROUP BY over only the partition columns of a
// partitioned, bucketed table that also declares sorted_by, run under three
// alternative test sessions, against the result of the same query under the
// default session. The table is created at the start and dropped at the end.
// NOTE(review): testSessionSort, testSessionSortPrcntDrv50 and testSessionSortPrcntDrv40 are
// defined outside this view — presumably sessions with sort-based aggregation enabled; confirm.
@Test
6636+
public void SortAggreGroupOnlyPartitionColumns()
6637+
{
6638+
// Helper defined elsewhere in the class; presumably prepares the sort-aggregation test sessions — confirm.
initSortBasedAggregation();
6639+
// Partitioned by (shipmode, comment); bucketed and sorted by (orderkey, partkey) with one bucket; 100 rows of tpch.tiny.lineitem.
computeActual("create table lineitem_sort_partition_shipmode_comment_bucket1 with(transactional = false, " +
6640+
"format = 'ORC', partitioned_by = ARRAY['shipmode', 'comment'], bucketed_by=array['orderkey', 'partkey'], bucket_count=1, sorted_by = ARRAY['orderkey', 'partkey'])" +
6641+
" as select * from tpch.tiny.lineitem limit 100");
6642+
6643+
// Group keys are the two partition columns only (none of the sorted_by columns); ORDER BY makes the row order comparable between runs.
String query = "select lineitem_sort_partition_shipmode_comment_bucket1.shipmode, lineitem_sort_partition_shipmode_comment_bucket1.comment " +
6644+
"from lineitem_sort_partition_shipmode_comment_bucket1 " +
6645+
"group by shipmode, comment " +
6646+
"order by shipmode, comment ";
6647+
6648+
MaterializedResult sortResult = computeActual(testSessionSort, query);
6649+
// Baseline: same query under the default session.
MaterializedResult hashResult = computeActual(query);
6650+
assertEquals(sortResult.getMaterializedRows(), hashResult.getMaterializedRows());
6651+
sortResult = computeActual(testSessionSortPrcntDrv50, query);
6652+
assertEquals(sortResult.getMaterializedRows(), hashResult.getMaterializedRows());
6653+
sortResult = computeActual(testSessionSortPrcntDrv40, query);
6654+
assertEquals(sortResult.getMaterializedRows(), hashResult.getMaterializedRows());
6655+
// Reached only when every assertion above passed; a failing assertion leaves the table behind.
assertUpdate("DROP TABLE lineitem_sort_partition_shipmode_comment_bucket1");
6656+
}
6657+
66126658
@Test
66136659
public void testAcidFormatColumnNameConflict()
66146660
{

0 commit comments

Comments
 (0)