Skip to content

Commit e258c7d

Browse files
authored
Add MV support to LONG specific aggregation functions (#17007)
1 parent 17dfafe commit e258c7d

File tree

10 files changed

+621
-26
lines changed

10 files changed

+621
-26
lines changed

pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/MaxLongAggregationFunction.java

Lines changed: 118 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -68,17 +68,37 @@ public GroupByResultHolder createGroupByResultHolder(int initialCapacity, int ma
6868
public void aggregate(int length, AggregationResultHolder aggregationResultHolder,
6969
Map<ExpressionContext, BlockValSet> blockValSetMap) {
7070
BlockValSet blockValSet = blockValSetMap.get(_expression);
71-
long[] values = blockValSet.getLongValuesSV();
7271

73-
Long max = foldNotNull(length, blockValSet, null, (acum, from, to) -> {
74-
long innerMax = values[from];
75-
for (int i = from; i < to; i++) {
76-
innerMax = Math.max(innerMax, values[i]);
77-
}
78-
return acum == null ? innerMax : Math.max(acum, innerMax);
79-
});
72+
if (blockValSet.isSingleValue()) {
73+
long[] values = blockValSet.getLongValuesSV();
74+
75+
Long max = foldNotNull(length, blockValSet, null, (acum, from, to) -> {
76+
long innerMax = values[from];
77+
for (int i = from; i < to; i++) {
78+
innerMax = Math.max(innerMax, values[i]);
79+
}
80+
return acum == null ? innerMax : Math.max(acum, innerMax);
81+
});
82+
83+
updateAggregationResultHolder(aggregationResultHolder, max);
84+
} else {
85+
long[][] valuesArray = blockValSet.getLongValuesMV();
86+
87+
Long max = foldNotNull(length, blockValSet, null, (acum, from, to) -> {
88+
long innerMax = DEFAULT_INITIAL_VALUE;
89+
for (int i = from; i < to; i++) {
90+
long[] values = valuesArray[i];
91+
for (long value : values) {
92+
if (value > innerMax) {
93+
innerMax = value;
94+
}
95+
}
96+
}
97+
return acum == null ? innerMax : Math.max(acum, innerMax);
98+
});
8099

81-
updateAggregationResultHolder(aggregationResultHolder, max);
100+
updateAggregationResultHolder(aggregationResultHolder, max);
101+
}
82102
}
83103

84104
protected void updateAggregationResultHolder(AggregationResultHolder aggregationResultHolder, Long max) {
@@ -97,6 +117,16 @@ protected void updateAggregationResultHolder(AggregationResultHolder aggregation
97117
public void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder,
98118
Map<ExpressionContext, BlockValSet> blockValSetMap) {
99119
BlockValSet blockValSet = blockValSetMap.get(_expression);
120+
121+
if (blockValSet.isSingleValue()) {
122+
aggregateSvGroupBySV(blockValSet, length, groupKeyArray, groupByResultHolder);
123+
} else {
124+
aggregateMvGroupBySV(blockValSet, length, groupKeyArray, groupByResultHolder);
125+
}
126+
}
127+
128+
private void aggregateSvGroupBySV(BlockValSet blockValSet, int length, int[] groupKeyArray,
129+
GroupByResultHolder groupByResultHolder) {
100130
long[] valueArray = blockValSet.getLongValuesSV();
101131

102132
if (_nullHandlingEnabled) {
@@ -121,10 +151,51 @@ public void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHol
121151
}
122152
}
123153

154+
private void aggregateMvGroupBySV(BlockValSet blockValSet, int length, int[] groupKeyArray,
155+
GroupByResultHolder groupByResultHolder) {
156+
long[][] valuesArray = blockValSet.getLongValuesMV();
157+
158+
if (_nullHandlingEnabled) {
159+
forEachNotNull(length, blockValSet, (from, to) -> {
160+
for (int i = from; i < to; i++) {
161+
int groupKey = groupKeyArray[i];
162+
Long max = groupByResultHolder.getResult(groupKey);
163+
for (long value : valuesArray[i]) {
164+
if (max == null || value > max) {
165+
max = value;
166+
}
167+
}
168+
groupByResultHolder.setValueForKey(groupKey, max);
169+
}
170+
});
171+
} else {
172+
for (int i = 0; i < length; i++) {
173+
int groupKey = groupKeyArray[i];
174+
long max = groupByResultHolder.getLongResult(groupKey);
175+
for (long value : valuesArray[i]) {
176+
if (value > max) {
177+
max = value;
178+
}
179+
}
180+
groupByResultHolder.setValueForKey(groupKey, max);
181+
}
182+
}
183+
}
184+
124185
@Override
125186
public void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder,
126187
Map<ExpressionContext, BlockValSet> blockValSetMap) {
127188
BlockValSet blockValSet = blockValSetMap.get(_expression);
189+
190+
if (blockValSet.isSingleValue()) {
191+
aggregateSvGroupByMV(blockValSet, length, groupKeysArray, groupByResultHolder);
192+
} else {
193+
aggregateMvGroupByMV(blockValSet, length, groupKeysArray, groupByResultHolder);
194+
}
195+
}
196+
197+
private void aggregateSvGroupByMV(BlockValSet blockValSet, int length, int[][] groupKeysArray,
198+
GroupByResultHolder groupByResultHolder) {
128199
long[] valueArray = blockValSet.getLongValuesSV();
129200

130201
if (_nullHandlingEnabled) {
@@ -151,6 +222,44 @@ public void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResult
151222
}
152223
}
153224

225+
private void aggregateMvGroupByMV(BlockValSet blockValSet, int length, int[][] groupKeysArray,
226+
GroupByResultHolder groupByResultHolder) {
227+
long[][] valuesArray = blockValSet.getLongValuesMV();
228+
229+
if (_nullHandlingEnabled) {
230+
forEachNotNull(length, blockValSet, (from, to) -> {
231+
for (int i = from; i < to; i++) {
232+
Long max = null;
233+
for (long value : valuesArray[i]) {
234+
if (max == null || value > max) {
235+
max = value;
236+
}
237+
}
238+
239+
for (int groupKey : groupKeysArray[i]) {
240+
Long currentMax = groupByResultHolder.getResult(groupKey);
241+
if (currentMax == null || (max != null && max > currentMax)) {
242+
groupByResultHolder.setValueForKey(groupKey, max);
243+
}
244+
}
245+
}
246+
});
247+
} else {
248+
for (int i = 0; i < length; i++) {
249+
long[] values = valuesArray[i];
250+
for (int groupKey : groupKeysArray[i]) {
251+
long max = groupByResultHolder.getLongResult(groupKey);
252+
for (long value : values) {
253+
if (value > max) {
254+
max = value;
255+
}
256+
}
257+
groupByResultHolder.setValueForKey(groupKey, max);
258+
}
259+
}
260+
}
261+
}
262+
154263
@Override
155264
public Long extractAggregationResult(AggregationResultHolder aggregationResultHolder) {
156265
if (_nullHandlingEnabled) {

pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/MinLongAggregationFunction.java

Lines changed: 118 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -68,17 +68,37 @@ public GroupByResultHolder createGroupByResultHolder(int initialCapacity, int ma
6868
public void aggregate(int length, AggregationResultHolder aggregationResultHolder,
6969
Map<ExpressionContext, BlockValSet> blockValSetMap) {
7070
BlockValSet blockValSet = blockValSetMap.get(_expression);
71-
long[] values = blockValSet.getLongValuesSV();
7271

73-
Long min = foldNotNull(length, blockValSet, null, (acum, from, to) -> {
74-
long innerMin = values[from];
75-
for (int i = from; i < to; i++) {
76-
innerMin = Math.min(innerMin, values[i]);
77-
}
78-
return acum == null ? innerMin : Math.min(acum, innerMin);
79-
});
72+
if (blockValSet.isSingleValue()) {
73+
long[] values = blockValSet.getLongValuesSV();
74+
75+
Long min = foldNotNull(length, blockValSet, null, (acum, from, to) -> {
76+
long innerMin = values[from];
77+
for (int i = from; i < to; i++) {
78+
innerMin = Math.min(innerMin, values[i]);
79+
}
80+
return acum == null ? innerMin : Math.min(acum, innerMin);
81+
});
8082

81-
updateAggregationResultHolder(aggregationResultHolder, min);
83+
updateAggregationResultHolder(aggregationResultHolder, min);
84+
} else {
85+
long[][] valuesArray = blockValSet.getLongValuesMV();
86+
87+
Long min = foldNotNull(length, blockValSet, null, (acum, from, to) -> {
88+
long innerMin = DEFAULT_VALUE;
89+
for (int i = from; i < to; i++) {
90+
long[] values = valuesArray[i];
91+
for (long value : values) {
92+
if (value < innerMin) {
93+
innerMin = value;
94+
}
95+
}
96+
}
97+
return acum == null ? innerMin : Math.min(acum, innerMin);
98+
});
99+
100+
updateAggregationResultHolder(aggregationResultHolder, min);
101+
}
82102
}
83103

84104
protected void updateAggregationResultHolder(AggregationResultHolder aggregationResultHolder, Long min) {
@@ -97,8 +117,17 @@ protected void updateAggregationResultHolder(AggregationResultHolder aggregation
97117
public void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder,
98118
Map<ExpressionContext, BlockValSet> blockValSetMap) {
99119
BlockValSet blockValSet = blockValSetMap.get(_expression);
100-
long[] valueArray = blockValSet.getLongValuesSV();
101120

121+
if (blockValSet.isSingleValue()) {
122+
aggregateSvGroupBySv(blockValSet, length, groupKeyArray, groupByResultHolder);
123+
} else {
124+
aggregateMvGroupBySv(blockValSet, length, groupKeyArray, groupByResultHolder);
125+
}
126+
}
127+
128+
private void aggregateSvGroupBySv(BlockValSet blockValSet, int length, int[] groupKeyArray,
129+
GroupByResultHolder groupByResultHolder) {
130+
long[] valueArray = blockValSet.getLongValuesSV();
102131
if (_nullHandlingEnabled) {
103132
forEachNotNull(length, blockValSet, (from, to) -> {
104133
for (int i = from; i < to; i++) {
@@ -121,10 +150,51 @@ public void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHol
121150
}
122151
}
123152

153+
private void aggregateMvGroupBySv(BlockValSet blockValSet, int length, int[] groupKeyArray,
154+
GroupByResultHolder groupByResultHolder) {
155+
long[][] valuesArray = blockValSet.getLongValuesMV();
156+
157+
if (_nullHandlingEnabled) {
158+
forEachNotNull(length, blockValSet, (from, to) -> {
159+
for (int i = from; i < to; i++) {
160+
int groupKey = groupKeyArray[i];
161+
Long min = groupByResultHolder.getResult(groupKey);
162+
for (long value : valuesArray[i]) {
163+
if (min == null || value < min) {
164+
min = value;
165+
}
166+
}
167+
groupByResultHolder.setValueForKey(groupKey, min);
168+
}
169+
});
170+
} else {
171+
for (int i = 0; i < length; i++) {
172+
int groupKey = groupKeyArray[i];
173+
long min = groupByResultHolder.getLongResult(groupKey);
174+
for (long value : valuesArray[i]) {
175+
if (value < min) {
176+
min = value;
177+
}
178+
}
179+
groupByResultHolder.setValueForKey(groupKey, min);
180+
}
181+
}
182+
}
183+
124184
@Override
125185
public void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder,
126186
Map<ExpressionContext, BlockValSet> blockValSetMap) {
127187
BlockValSet blockValSet = blockValSetMap.get(_expression);
188+
189+
if (blockValSet.isSingleValue()) {
190+
aggregateSvGroupByMv(blockValSet, length, groupKeysArray, groupByResultHolder);
191+
} else {
192+
aggregateMvGroupByMv(blockValSet, length, groupKeysArray, groupByResultHolder);
193+
}
194+
}
195+
196+
private void aggregateSvGroupByMv(BlockValSet blockValSet, int length, int[][] groupKeysArray,
197+
GroupByResultHolder groupByResultHolder) {
128198
long[] valueArray = blockValSet.getLongValuesSV();
129199

130200
if (_nullHandlingEnabled) {
@@ -151,6 +221,44 @@ public void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResult
151221
}
152222
}
153223

224+
private void aggregateMvGroupByMv(BlockValSet blockValSet, int length, int[][] groupKeysArray,
225+
GroupByResultHolder groupByResultHolder) {
226+
long[][] valuesArray = blockValSet.getLongValuesMV();
227+
228+
if (_nullHandlingEnabled) {
229+
forEachNotNull(length, blockValSet, (from, to) -> {
230+
for (int i = from; i < to; i++) {
231+
Long min = null;
232+
for (long value : valuesArray[i]) {
233+
if (min == null || value < min) {
234+
min = value;
235+
}
236+
}
237+
238+
for (int groupKey : groupKeysArray[i]) {
239+
Long currentMin = groupByResultHolder.getResult(groupKey);
240+
if (currentMin == null || (min != null && min < currentMin)) {
241+
groupByResultHolder.setValueForKey(groupKey, min);
242+
}
243+
}
244+
}
245+
});
246+
} else {
247+
for (int i = 0; i < length; i++) {
248+
long[] values = valuesArray[i];
249+
for (int groupKey : groupKeysArray[i]) {
250+
long min = groupByResultHolder.getLongResult(groupKey);
251+
for (long value : values) {
252+
if (value < min) {
253+
min = value;
254+
}
255+
}
256+
groupByResultHolder.setValueForKey(groupKey, min);
257+
}
258+
}
259+
}
260+
}
261+
154262
@Override
155263
public Long extractAggregationResult(AggregationResultHolder aggregationResultHolder) {
156264
if (_nullHandlingEnabled) {

0 commit comments

Comments
 (0)