Skip to content

Commit b5e02d4

Browse files
Sort aggregation support for unsorted partitioned table
1 parent 69f00c5 commit b5e02d4

27 files changed

+259
-61
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
remote exchange (GATHER, SINGLE, [])
2+
final hashaggregation over (wr_account_credit, wr_net_loss)
3+
local exchange (GATHER, SINGLE, [])
4+
remote exchange (REPARTITION, HASH, ["wr_account_credit", "wr_net_loss"])
5+
partial sortaggregate over (wr_account_credit, wr_net_loss)
6+
scan web_returns_partiotion_netloss_returneddatesk
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
select count(${database}.${schema}.web_returns_partiotion_netloss_returneddatesk.wr_net_loss),${database}.${schema}.web_returns_partiotion_netloss_returneddatesk.wr_net_loss
2+
from ${database}.${schema}.web_returns_partiotion_netloss_returneddatesk
3+
group by ${database}.${schema}.web_returns_partiotion_netloss_returneddatesk.wr_account_credit, ${database}.${schema}.web_returns_partiotion_netloss_returneddatesk.wr_net_loss
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
remote exchange (GATHER, SINGLE, [])
2+
final hashaggregation over (wr_account_credit, wr_net_loss)
3+
local exchange (GATHER, SINGLE, [])
4+
remote exchange (REPARTITION, HASH, ["wr_account_credit", "wr_net_loss"])
5+
partial sortaggregate over (wr_account_credit, wr_net_loss)
6+
scan web_returns_partiotion_netloss_returneddatesk_bucket8
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
select count(${database}.${schema}.web_returns_partiotion_netloss_returneddatesk_bucket8.wr_net_loss),${database}.${schema}.web_returns_partiotion_netloss_returneddatesk_bucket8.wr_net_loss
2+
from ${database}.${schema}.web_returns_partiotion_netloss_returneddatesk_bucket8
3+
group by ${database}.${schema}.web_returns_partiotion_netloss_returneddatesk_bucket8.wr_account_credit, ${database}.${schema}.web_returns_partiotion_netloss_returneddatesk_bucket8.wr_net_loss
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
remote exchange (GATHER, SINGLE, [])
2+
final hashaggregation over (wr_account_credit, wr_net_loss)
3+
local exchange (GATHER, SINGLE, [])
4+
remote exchange (REPARTITION, HASH, ["wr_account_credit", "wr_net_loss"])
5+
partial sortaggregate over (wr_account_credit, wr_net_loss)
6+
scan web_returns_partiotion_netloss_returneddatesk_bucket1_sort
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
select count(${database}.${schema}.web_returns_partiotion_netloss_returneddatesk_bucket1_sort.wr_net_loss),${database}.${schema}.web_returns_partiotion_netloss_returneddatesk_bucket1_sort.wr_net_loss
2+
from ${database}.${schema}.web_returns_partiotion_netloss_returneddatesk_bucket1_sort
3+
group by ${database}.${schema}.web_returns_partiotion_netloss_returneddatesk_bucket1_sort.wr_account_credit, ${database}.${schema}.web_returns_partiotion_netloss_returneddatesk_bucket1_sort.wr_net_loss
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
remote exchange (GATHER, SINGLE, [])
2+
final hashaggregation over (wr_account_credit, wr_net_loss)
3+
local exchange (GATHER, SINGLE, [])
4+
remote exchange (REPARTITION, HASH, ["wr_account_credit", "wr_net_loss"])
5+
partial sortaggregate over (wr_account_credit, wr_net_loss)
6+
scan web_returns_partiotion_netloss_returneddatesk_bucket8_sort
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
select count(${database}.${schema}.web_returns_partiotion_netloss_returneddatesk_bucket8_sort.wr_net_loss),${database}.${schema}.web_returns_partiotion_netloss_returneddatesk_bucket8_sort.wr_net_loss
2+
from ${database}.${schema}.web_returns_partiotion_netloss_returneddatesk_bucket8_sort
3+
group by ${database}.${schema}.web_returns_partiotion_netloss_returneddatesk_bucket8_sort.wr_account_credit, ${database}.${schema}.web_returns_partiotion_netloss_returneddatesk_bucket8_sort.wr_net_loss

presto-benchto-benchmarks/src/test/java/io/prestosql/sql/planner/TestSortBasedAggregationPlan.java

+30-4
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ public TestSortBasedAggregationPlan()
7777

7878
Map<String, String> hiveCatalogConfig = ImmutableMap.<String, String>builder()
7979
.put("hive.orc-predicate-pushdown-enabled", "false")
80+
.put("hive.max-partitions-per-writers", "9999")
8081
.build();
8182

8283
queryRunner.createCatalog(catalog, hiveConnectorFactory, hiveCatalogConfig);
@@ -118,14 +119,38 @@ public TestSortBasedAggregationPlan()
118119
"bucketed_by=array['wr_return_quantity'], bucket_count = 1, " +
119120
"partitioned_by = ARRAY['wr_net_loss'], " +
120121
"sorted_by = ARRAY['wr_return_quantity','wr_returned_time_sk']) " +
121-
"as select * from tpcds.tiny.web_returns limit 100");
122+
"as select * from tpcds.tiny.web_returns limit 500");
122123

123124
queryRunner.execute("create table web_returns_partition_bucketCount32 with (" +
124125
"bucketed_by=array['wr_return_quantity'], bucket_count = 1, " +
125126
"partitioned_by = ARRAY['wr_net_loss'], " +
126127
"sorted_by = ARRAY['wr_return_quantity','wr_returned_time_sk']) " +
127-
"as select * from tpcds.tiny.web_returns limit 100");
128-
128+
"as select * from tpcds.tiny.web_returns limit 1000");
129+
130+
queryRunner.execute("create table web_returns_partiotion_netloss_returneddatesk with (" +
131+
"transactional = false, format='orc', " +
132+
"partitioned_by = ARRAY['wr_account_credit', 'wr_net_loss']) " +
133+
"as select * from tpcds.tiny.web_returns limit 500");
134+
135+
queryRunner.execute("create table web_returns_partiotion_netloss_returneddatesk_bucket8 with (" +
136+
"transactional = false, format='orc', " +
137+
"bucket_count = 8, bucketed_by = ARRAY['wr_return_quantity','wr_returned_time_sk'], " +
138+
"partitioned_by = ARRAY['wr_account_credit', 'wr_net_loss']) " +
139+
"as select * from tpcds.tiny.web_returns limit 500");
140+
141+
queryRunner.execute("create table web_returns_partiotion_netloss_returneddatesk_bucket8_sort with (" +
142+
"transactional = false, format='orc', " +
143+
"bucket_count = 8, bucketed_by = ARRAY['wr_return_quantity','wr_returned_time_sk'], " +
144+
"partitioned_by = ARRAY['wr_account_credit', 'wr_net_loss'], " +
145+
"sorted_by = ARRAY['wr_return_quantity','wr_returned_time_sk'])" +
146+
"as select * from tpcds.tiny.web_returns limit 500");
147+
148+
queryRunner.execute("create table web_returns_partiotion_netloss_returneddatesk_bucket1_sort with (" +
149+
"transactional = false, format='orc', " +
150+
"bucket_count = 1, bucketed_by = ARRAY['wr_return_quantity','wr_returned_time_sk'], " +
151+
"sorted_by = ARRAY['wr_return_quantity','wr_returned_time_sk'], " +
152+
"partitioned_by = ARRAY['wr_account_credit', 'wr_net_loss']) " +
153+
"as select * from tpcds.tiny.web_returns limit 500");
129154
return queryRunner;
130155
}, false, false, false, true);
131156
}
@@ -136,7 +161,8 @@ protected Stream<String> getQueryResourcePaths()
136161
return Stream.of("q_InnerJoin", "q_leftJoin", "q_rightjoin", "q_rightjoin_wrong_order", "q_Inner_LeftJoin", "q_sort_groupby_notsameOrder",
137162
"q_sort_groupby_notsameOrder1", "q_groupByHavingMore", "q_InnerJoinWrongOrder", "q_InnerJoinLessCriterias",
138163
"q_InnerJoinGroupLessCriterias", "q_bucketAndSortDifferentOrder", "BucketAndGroupSameSortIsMore", "BucketGroupAreDifferent",
139-
"groupsAreMoreThanBucket", "ColNameEndWithInt", "q_PartitionBucketCount1", "q_PartitionBucketCount32")
164+
"groupsAreMoreThanBucket", "ColNameEndWithInt", "q_sortedPartitionBucketCount1", "q_sortedPartitionBucketCount32",
165+
"q_partition2Colu", "q_partition2ColuBucket8", "q_useOnlyPartition2ColuBucket8OnSortTable", "q_useOnlyPartition2ColuBucket1OnSortTable")
140166
.flatMap(i -> {
141167
String queryId = format("%s", i);
142168
System.out.println("query ID: " + queryId);

presto-hive/src/main/java/io/prestosql/plugin/hive/HiveMetadata.java

+48-30
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import io.prestosql.plugin.hive.statistics.HiveStatisticsProvider;
4848
import io.prestosql.plugin.hive.util.ConfigurationUtils;
4949
import io.prestosql.plugin.hive.util.Statistics;
50+
import io.prestosql.spi.PartialAndFinalAggregationType;
5051
import io.prestosql.spi.PrestoException;
5152
import io.prestosql.spi.block.Block;
5253
import io.prestosql.spi.connector.ColumnHandle;
@@ -3127,36 +3128,44 @@ public List<ConnectorVacuumTableInfo> getTablesForVacuum()
31273128
}
31283129

31293130
@Override
3130-
public boolean canPerformSortBasedAggregation(ConnectorSession session, ConnectorTableHandle tableHandle, List<String> groupKeyNames)
3131+
public PartialAndFinalAggregationType validateAndGetSortAggregationType(ConnectorSession session, ConnectorTableHandle tableHandle, List<String> groupKeyNames)
31313132
{
3133+
PartialAndFinalAggregationType partialAndFinalAggregationType = new PartialAndFinalAggregationType();
31323134
ConnectorTableMetadata connectorTableMetadata = getTableMetadata(session, ((HiveTableHandle) tableHandle).getSchemaTableName());
31333135
List<SortingColumn> sortingColumn = (List<SortingColumn>) connectorTableMetadata.getProperties().get(HiveTableProperties.SORTED_BY_PROPERTY);
3134-
if ((sortingColumn == null) || (sortingColumn.size() == 0)) {
3135-
return false;
3136-
}
3136+
boolean isSortingColumnsNotPresent = (sortingColumn == null) || (sortingColumn.size() == 0);
3137+
31373138
List<String> partitionedBy = new ArrayList<>();
3138-
List<String> partitionedBytemp = (List<String>) connectorTableMetadata.getProperties().get(HiveTableProperties.PARTITIONED_BY_PROPERTY);
3139-
if (null != partitionedBytemp) {
3140-
partitionedBy.addAll(partitionedBytemp);
3139+
List<String> partitionedByTemp = (List<String>) connectorTableMetadata.getProperties().get(HiveTableProperties.PARTITIONED_BY_PROPERTY);
3140+
if ((partitionedByTemp != null) && (partitionedByTemp.size() != 0)) {
3141+
partitionedBy.addAll(partitionedByTemp);
3142+
if (isSortingColumnsNotPresent && (partitionedByTemp.size() != groupKeyNames.size())) {
3143+
return partialAndFinalAggregationType;
3144+
}
3145+
}
3146+
else if (isSortingColumnsNotPresent) {
3147+
return partialAndFinalAggregationType;
31413148
}
31423149
int bucketCount = 0;
3143-
List<String> bucketedColumns = (List<String>) connectorTableMetadata.getProperties().get(HiveTableProperties.BUCKETED_BY_PROPERTY);
3144-
if (null != bucketedColumns) {
3145-
bucketCount = (int) connectorTableMetadata.getProperties().get(HiveTableProperties.BUCKET_COUNT_PROPERTY);
3150+
List<String> bucketedColumns = new ArrayList<>();
3151+
if (!isSortingColumnsNotPresent) {
3152+
bucketedColumns.addAll((List<String>) connectorTableMetadata.getProperties().get(HiveTableProperties.BUCKETED_BY_PROPERTY));
3153+
if (null != bucketedColumns) {
3154+
bucketCount = (int) connectorTableMetadata.getProperties().get(HiveTableProperties.BUCKET_COUNT_PROPERTY);
3155+
}
31463156
}
31473157

3148-
List<String> sortedColumnNames = sortingColumn.stream().map(column -> column.getColumnName()).collect(Collectors.toList());
3149-
List<String> partitionAndSortedColumnNames = new ArrayList<>();
3150-
// when there is partition by , sorted data is arranged 'partition by' followed by 'sorted by'
3151-
partitionAndSortedColumnNames.addAll(partitionedBy);
3152-
partitionAndSortedColumnNames.addAll(sortedColumnNames);
3158+
List<String> sortedColumnNames = new ArrayList<>();
3159+
if ((sortingColumn != null) && (sortingColumn.size() != 0)) {
3160+
sortedColumnNames.addAll(sortingColumn.stream().map(column -> column.getColumnName()).collect(Collectors.toList()));
3161+
}
31533162

31543163
//grouping key should be sub set of sorted By and it should match all partition by columns
3155-
if ((sortedColumnNames.size() + partitionedBy.size() < groupKeyNames.size()) ||
3164+
if ((partitionedBy.size() + sortedColumnNames.size() < groupKeyNames.size()) ||
31563165
(partitionedBy.size() > groupKeyNames.size())) {
31573166
//sorted columns are less than join criteria columns
31583167
log.debug("number of sorted columns " + sortedColumnNames.size() + "are less join column size " + groupKeyNames.size());
3159-
return false;
3168+
return partialAndFinalAggregationType;
31603169
}
31613170

31623171
// bucketby columns and groupby Columns should be same.
@@ -3167,43 +3176,52 @@ public boolean canPerformSortBasedAggregation(ConnectorSession session, Connecto
31673176
(groupKeyNames.get(partitionedByCount).equals(bucketedColumns.get(0)))) || (bucketCount == 0));
31683177

31693178
if ((bucketCount == 1) && (bucketedColumns.size() > 1)) {
3170-
boolean notMatching = false;
31713179
int minSize = Math.min(groupKeyNames.size() - partitionedBy.size(), bucketedColumns.size());
31723180
int partSize = partitionedBy.size();
31733181
for (int keyIdx = 0; keyIdx < minSize; keyIdx++) {
31743182
if (!groupKeyNames.get(keyIdx + partSize).equals(bucketedColumns.get(keyIdx))) {
3175-
notMatching = true;
3176-
break;
3183+
return partialAndFinalAggregationType;
31773184
}
31783185
}
3179-
if (!notMatching) {
3180-
singleOrZeroBucketedColumn = true;
3186+
singleOrZeroBucketedColumn = true;
3187+
}
3188+
3189+
for (int numOfComparedKeys = 0; numOfComparedKeys < partitionedBy.size(); numOfComparedKeys++) {
3190+
if ((!groupKeyNames.get(numOfComparedKeys).equals(partitionedBy.get(numOfComparedKeys)))) {
3191+
return partialAndFinalAggregationType;
31813192
}
31823193
}
3194+
3195+
if (groupKeyNames.size() == partitionedBy.size()) {
3196+
partialAndFinalAggregationType.setPartialAsSortAndFinalAsHashAggregation(true);
3197+
return partialAndFinalAggregationType;
3198+
}
3199+
31833200
if (singleOrZeroBucketedColumn || (groupKeyNames.size() == (bucketedColumns.size() + partitionedBy.size()))) {
3184-
for (int numOfComparedKeys = 0; numOfComparedKeys < groupKeyNames.size(); numOfComparedKeys++) {
3185-
boolean bucketedColumnsResult = numOfComparedKeys < partitionedBy.size() ? false : !singleOrZeroBucketedColumn &&
3186-
(!groupKeyNames.get(numOfComparedKeys).equals(bucketedColumns.get(numOfComparedKeys - partitionedBy.size())));
3187-
if ((!groupKeyNames.get(numOfComparedKeys).equals(partitionAndSortedColumnNames.get(numOfComparedKeys))) ||
3201+
int numOfCmpKeysAfterPartitionedBy = partitionedBy.size();
3202+
for (int numOfComparedKeys = 0; numOfComparedKeys < groupKeyNames.size() - partitionedBy.size(); numOfComparedKeys++, numOfCmpKeysAfterPartitionedBy++) {
3203+
boolean bucketedColumnsResult = !singleOrZeroBucketedColumn && (!groupKeyNames.get(numOfComparedKeys).equals(bucketedColumns.get(numOfComparedKeys)));
3204+
if ((!groupKeyNames.get(numOfCmpKeysAfterPartitionedBy).equals(sortedColumnNames.get(numOfComparedKeys))) ||
31883205
(!singleOrZeroBucketedColumn && bucketedColumnsResult)) {
31893206
if (log.isDebugEnabled()) {
31903207
final String[] dbgGroupKeyNames = {new String("")};
31913208
groupKeyNames.stream().forEach(k -> dbgGroupKeyNames[0] = dbgGroupKeyNames[0].concat(k + " , "));
31923209
final String[] dbgSortedColumnNames = {new String("")};
3193-
partitionAndSortedColumnNames.stream().forEach(k -> dbgSortedColumnNames[0] = dbgSortedColumnNames[0].concat(k + " , "));
3210+
sortedColumnNames.stream().forEach(k -> dbgSortedColumnNames[0] = dbgSortedColumnNames[0].concat(k + " , "));
31943211
if ((null != bucketedColumns) && (bucketedColumns.size() > 0)) {
31953212
final String[] dbgbucketedColumns = {new String("")};
31963213
bucketedColumns.stream().forEach(k -> dbgbucketedColumns[0] = dbgbucketedColumns[0].concat(k + " , "));
31973214
log.debug("Not matching sortedColumnNames: " + dbgSortedColumnNames + " group columns name: " + dbgGroupKeyNames + " bucketedColumns :" + dbgbucketedColumns);
31983215
}
31993216
log.debug("Not matching sortedColumnNames: " + dbgSortedColumnNames + " group columns name: " + dbgGroupKeyNames);
32003217
}
3201-
return false;
3218+
return partialAndFinalAggregationType;
32023219
}
32033220
}
3204-
return true;
3221+
partialAndFinalAggregationType.setSortAggregation(true);
3222+
return partialAndFinalAggregationType;
32053223
}
3206-
return false;
3224+
return partialAndFinalAggregationType;
32073225
}
32083226

32093227
@Override

presto-hive/src/test/java/io/prestosql/plugin/hive/TestHiveIntegrationSmokeTest.java

+46
Original file line numberDiff line numberDiff line change
@@ -6609,6 +6609,52 @@ public void testCachedPlanForTablesWithSameName()
66096609
assertUpdate(String.format("DROP TABLE %s.%s", schema, table));
66106610
}
66116611

6612+
@Test
6613+
public void UnSortAggrePartitionBucketCount1()
6614+
{
6615+
initSortBasedAggregation();
6616+
computeActual("create table lineitem_partition_shipmode_comment_bucket1 with(transactional = false, " +
6617+
"format = 'ORC', partitioned_by = ARRAY['shipmode', 'comment'], bucketed_by=array['orderkey', 'partkey'], bucket_count=1)" +
6618+
" as select * from tpch.tiny.lineitem limit 100");
6619+
6620+
String query = "select lineitem_partition_shipmode_comment_bucket1.shipmode, lineitem_partition_shipmode_comment_bucket1.comment " +
6621+
"from lineitem_partition_shipmode_comment_bucket1 " +
6622+
"group by shipmode, comment " +
6623+
"order by shipmode, comment ";
6624+
6625+
MaterializedResult sortResult = computeActual(testSessionSort, query);
6626+
MaterializedResult hashResult = computeActual(query);
6627+
assertEquals(sortResult.getMaterializedRows(), hashResult.getMaterializedRows());
6628+
sortResult = computeActual(testSessionSortPrcntDrv50, query);
6629+
assertEquals(sortResult.getMaterializedRows(), hashResult.getMaterializedRows());
6630+
sortResult = computeActual(testSessionSortPrcntDrv40, query);
6631+
assertEquals(sortResult.getMaterializedRows(), hashResult.getMaterializedRows());
6632+
assertUpdate("DROP TABLE lineitem_partition_shipmode_comment_bucket1");
6633+
}
6634+
6635+
@Test
6636+
public void SortAggreGroupOnlyPartitionColumns()
6637+
{
6638+
initSortBasedAggregation();
6639+
computeActual("create table lineitem_sort_partition_shipmode_comment_bucket1 with(transactional = false, " +
6640+
"format = 'ORC', partitioned_by = ARRAY['shipmode', 'comment'], bucketed_by=array['orderkey', 'partkey'], bucket_count=1, sorted_by = ARRAY['orderkey', 'partkey'])" +
6641+
" as select * from tpch.tiny.lineitem limit 100");
6642+
6643+
String query = "select lineitem_sort_partition_shipmode_comment_bucket1.shipmode, lineitem_sort_partition_shipmode_comment_bucket1.comment " +
6644+
"from lineitem_sort_partition_shipmode_comment_bucket1 " +
6645+
"group by shipmode, comment " +
6646+
"order by shipmode, comment ";
6647+
6648+
MaterializedResult sortResult = computeActual(testSessionSort, query);
6649+
MaterializedResult hashResult = computeActual(query);
6650+
assertEquals(sortResult.getMaterializedRows(), hashResult.getMaterializedRows());
6651+
sortResult = computeActual(testSessionSortPrcntDrv50, query);
6652+
assertEquals(sortResult.getMaterializedRows(), hashResult.getMaterializedRows());
6653+
sortResult = computeActual(testSessionSortPrcntDrv40, query);
6654+
assertEquals(sortResult.getMaterializedRows(), hashResult.getMaterializedRows());
6655+
assertUpdate("DROP TABLE lineitem_sort_partition_shipmode_comment_bucket1");
6656+
}
6657+
66126658
@Test
66136659
public void testAcidFormatColumnNameConflict()
66146660
{

0 commit comments

Comments
 (0)