Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
43 commits
Select a commit. Hold Shift and click to select a range.
4f11b8c
Add ingestion skip filter for upsert SRT task
krishan1390 Jul 14, 2025
59670e6
Merge remote-tracking branch 'upstream/master'
krishan1390 Jul 15, 2025
d6fa313
Merge remote-tracking branch 'upstream/master'
krishan1390 Jul 28, 2025
5db44e2
Merge remote-tracking branch 'upstream/master'
krishan1390 Jul 30, 2025
184e769
Merge remote-tracking branch 'upstream/master'
krishan1390 Aug 6, 2025
0cddd95
Merge remote-tracking branch 'upstream/master'
krishan1390 Aug 6, 2025
59d46e6
Merge remote-tracking branch 'upstream/master'
krishan1390 Aug 14, 2025
0adec23
Merge remote-tracking branch 'upstream/master'
krishan1390 Aug 29, 2025
cf0a077
Merge remote-tracking branch 'upstream/master'
krishan1390 Sep 1, 2025
da2519f
Merge remote-tracking branch 'upstream/master'
krishan1390 Sep 3, 2025
3f7b8c0
Merge remote-tracking branch 'upstream/master'
krishan1390 Sep 9, 2025
81388db
Merge remote-tracking branch 'upstream/master'
krishan1390 Sep 15, 2025
d4b7c4a
Merge remote-tracking branch 'upstream/master'
krishan1390 Sep 17, 2025
afc7245
Optimise index stats collector for no dict columns
krishan1390 Sep 18, 2025
5b27291
Test case fixes
krishan1390 Sep 22, 2025
7f62ad7
Add nodict collector to avoid changes to each collector to optimise n…
krishan1390 Sep 22, 2025
0771453
Add nodict collector to avoid changes to each collector to optimise n…
krishan1390 Sep 22, 2025
d2c7cc5
Checkstyle fixes
krishan1390 Sep 22, 2025
1fed6a2
Fix SegmentPreProcessorTest
krishan1390 Sep 22, 2025
8ff6f29
Linter fix
krishan1390 Sep 22, 2025
2a1858f
Add support for native memory arrays
krishan1390 Sep 23, 2025
6b29413
Merge remote-tracking branch 'upstream/master' into optimise_index_st…
krishan1390 Sep 23, 2025
5aa3e05
Add ULL to approximate cardinality in NoDictColumnStatisticsCollector
krishan1390 Sep 24, 2025
2b86882
Revert "Fix SegmentPreProcessorTest"
krishan1390 Sep 24, 2025
6f30b13
Fix tests after ULL approximation
krishan1390 Sep 24, 2025
9dbc174
cstyle fixes
krishan1390 Sep 24, 2025
2e104af
Add tests related to no dict column
krishan1390 Sep 24, 2025
97f2294
cstyle fixes
krishan1390 Sep 24, 2025
faff37d
Add support for map type
krishan1390 Sep 24, 2025
975fc4a
cstyle fixes
krishan1390 Sep 24, 2025
0a1166d
Merge remote-tracking branch 'upstream/master' into optimise_index_st…
krishan1390 Sep 24, 2025
5db2d32
1. Add config to disable nodict column stats.
krishan1390 Sep 25, 2025
651b62e
Fix test case
krishan1390 Sep 25, 2025
a39a365
Optimise map support for no dict columns
krishan1390 Sep 25, 2025
e0f7b03
Add documentation
krishan1390 Sep 25, 2025
8938c41
Update bits per element when cardinality changes
krishan1390 Sep 25, 2025
d6c7883
Merge remote-tracking branch 'upstream/master' into optimise_index_st…
krishan1390 Sep 25, 2025
fa518fb
Update cardinality from actual stats collector
krishan1390 Sep 29, 2025
283828a
cstyle fixes
krishan1390 Sep 29, 2025
1a86699
test case fixes
krishan1390 Sep 29, 2025
5f1ad22
Merge remote-tracking branch 'upstream/master' into optimise_index_st…
krishan1390 Sep 29, 2025
5f13b1e
Fix some properties in ColumnMetadata
Jackie-Jiang Sep 30, 2025
8d5e86e
Merge branch 'fix_bits_per_value' of github.com:Jackie-Jiang/pinot in…
krishan1390 Sep 30, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import com.google.common.base.Preconditions;
import java.util.Random;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.StringUtils;


Expand Down Expand Up @@ -51,8 +50,8 @@ public StringGenerator(Integer cardinality, Double numberOfValuesPerEntry, Integ
int initValueSize = lengthOfEachString - _counterLength;
Preconditions.checkState(initValueSize >= 0,
String.format("Cannot generate %d unique string with length %d", _cardinality, lengthOfEachString));
_initialValue = RandomStringUtils.randomAlphabetic(initValueSize);
_rand = new Random(System.currentTimeMillis());
_rand = new Random(0L);
_initialValue = generateAlphabetic(initValueSize);
}

@Override
Expand All @@ -75,6 +74,18 @@ private String getNextString() {
return _initialValue + StringUtils.leftPad(String.valueOf(_counter), _counterLength, '0');
}

/**
 * Deterministically generates a random alphabetic string of the given length using the
 * seeded {@code _rand} instance, drawing uniformly from [a-zA-Z]. Returns an empty
 * string for non-positive lengths.
 */
private String generateAlphabetic(int length) {
  if (length <= 0) {
    return "";
  }
  final String alphabet = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ";
  char[] buffer = new char[length];
  for (int pos = 0; pos < length; pos++) {
    // One nextInt(52) draw per character, matching the original call sequence exactly
    buffer[pos] = alphabet.charAt(_rand.nextInt(alphabet.length()));
  }
  return new String(buffer);
}

public static void main(String[] args) {
final StringGenerator gen = new StringGenerator(10000, null, null);
gen.init();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public void testSegmentGenerator()
assertEquals(extract(metadata, "column.colFloatMV.cardinality = (\\d+)"), "250");
assertEquals(extract(metadata, "column.colString.cardinality = (\\d+)"), "300");
assertEquals(extract(metadata, "column.colStringMV.cardinality = (\\d+)"), "350");
assertEquals(extract(metadata, "column.colBytes.cardinality = (\\d+)"), "400");
assertEquals(extract(metadata, "column.colBytes.cardinality = (\\d+)"), "416");
assertEquals(extract(metadata, "column.colLong.cardinality = (\\d+)"), "500");
assertEquals(extract(metadata, "column.colLongMV.cardinality = (\\d+)"), "550");
assertEquals(extract(metadata, "column.colDouble.cardinality = (\\d+)"), "600");
Expand Down Expand Up @@ -83,7 +83,7 @@ public void testSegmentGeneratorWithDateTimeFieldSpec()
assertEquals(extract(metadata, "column.colInt.cardinality = (\\d+)"), "500");
assertEquals(extract(metadata, "column.colFloat.cardinality = (\\d+)"), "600");
assertEquals(extract(metadata, "column.colString.cardinality = (\\d+)"), "700");
assertEquals(extract(metadata, "column.colBytes.cardinality = (\\d+)"), "800");
assertEquals(extract(metadata, "column.colBytes.cardinality = (\\d+)"), "841");
assertEquals(extract(metadata, "column.colMetric.cardinality = (\\d+)"), "900");
assertEquals(extract(metadata, "column.colTime.cardinality = (\\d+)"), "250");
assertEquals(extract(metadata, "column.colTime2.cardinality = (\\d+)"), "750");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import org.intellij.lang.annotations.Language;

import static org.mockito.Mockito.mock;
import static org.testng.Assert.assertEquals;


/**
Expand Down Expand Up @@ -330,4 +331,16 @@ private BrokerResponseNative getBrokerResponseDistinctInstances(PinotQuery pinot
serverPinotQuery == pinotQuery ? brokerRequest : CalciteSqlCompiler.convertToBrokerRequest(serverPinotQuery);
return reduceOnDataTable(brokerRequest, serverBrokerRequest, dataTableMap);
}

/**
 * Asserts that two result sets are identical: same number of rows, same number of columns
 * per row, and element-wise equal cell values in order. Used to verify that query results
 * are unchanged across a segment reload.
 */
protected void validateBeforeAfterQueryResults(List<Object[]> beforeResults, List<Object[]> afterResults) {
  assertEquals(beforeResults.size(), afterResults.size());
  int numRows = beforeResults.size();
  for (int rowId = 0; rowId < numRows; rowId++) {
    Object[] beforeRow = beforeResults.get(rowId);
    Object[] afterRow = afterResults.get(rowId);
    // Row widths must match before comparing individual cells
    assertEquals(beforeRow.length, afterRow.length);
    for (int colId = 0; colId < beforeRow.length; colId++) {
      assertEquals(beforeRow[colId], afterRow[colId]);
    }
  }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.queries;

import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.response.broker.BrokerResponseNative;
import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
import org.apache.pinot.segment.local.segment.index.loader.SegmentPreProcessor;
import org.apache.pinot.segment.local.segment.store.SegmentLocalFSDirectory;
import org.apache.pinot.segment.spi.ColumnMetadata;
import org.apache.pinot.segment.spi.ImmutableSegment;
import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
import org.apache.pinot.segment.spi.creator.SegmentIndexCreationDriver;
import org.apache.pinot.segment.spi.store.SegmentDirectory;
import org.apache.pinot.spi.config.table.FieldConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.FileFormat;
import org.apache.pinot.spi.utils.ReadMode;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

import static org.testng.Assert.*;
import static org.testng.Assert.assertEquals;


/**
 * Queries test that verifies segment metadata (cardinality, bits per element) and query results
 * stay correct when a raw (no-dictionary) column is switched to dictionary encoding and the
 * segment is reloaded.
 */
public class CustomReloadQueriesTest extends BaseQueriesTest {

  private static final File INDEX_DIR =
      new File(FileUtils.getTempDirectory(), CustomReloadQueriesTest.class.getSimpleName());
  private static final String RAW_TABLE_NAME = "testTable";
  private static final String SEGMENT_NAME = "testSegment";

  // Segment currently under test; replaced on build/reload and destroyed in tearDown.
  private IndexSegment _indexSegment;
  // Two references to the same segment so server-side queries exercise the merge path.
  private List<IndexSegment> _indexSegments;

  @BeforeMethod
  public void setUp()
      throws Exception {
    FileUtils.deleteQuietly(INDEX_DIR);
  }

  /**
   * Builds an OFFLINE table config for the test table with the given index settings.
   */
  private TableConfig createTableConfig(List<String> noDictionaryColumns, List<String> invertedIndexColumns,
      List<String> rangeIndexColumns, List<FieldConfig> fieldConfigs) {
    return new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
        .setNoDictionaryColumns(noDictionaryColumns).setInvertedIndexColumns(invertedIndexColumns)
        .setRangeIndexColumns(rangeIndexColumns).setFieldConfigList(fieldConfigs).build();
  }

  @AfterMethod
  public void tearDown() {
    // _indexSegment is null when segment build failed before assignment; guard so the
    // cleanup NPE does not mask the original test failure.
    if (_indexSegment != null) {
      _indexSegment.destroy();
      _indexSegment = null;
    }
    FileUtils.deleteQuietly(INDEX_DIR);
  }

  @Override
  protected String getFilter() {
    return "";
  }

  @Override
  protected IndexSegment getIndexSegment() {
    return _indexSegment;
  }

  @Override
  protected List<IndexSegment> getIndexSegments() {
    return _indexSegments;
  }

  @DataProvider(name = "alphabets")
  public static Object[][] alphabets() {
    // 2 sets of input data - sorted and unsorted
    return new Object[][]{
        {new String[]{"a", "b", "c", "d", "e"}},
        {new String[]{"b", "c", "a", "e", "d"}}
    };
  }

  /**
   * If a column's approximate cardinality is lesser than the actual cardinality and its bits per
   * element also reduces because of this, then enabling dictionary for that column should result
   * in updating both bits per element and cardinality. This test verifies both the segment
   * metadata and the query results.
   */
  @Test(dataProvider = "alphabets")
  public void testReducedBitsPerElementWithNoDictCardinalityApproximation(String[] alphabets)
      throws Exception {
    // Common variables - schema, data file, etc
    File csvFile = new File(FileUtils.getTempDirectory(), "data.csv");
    try {
      List<String> values = new ArrayList<>(Arrays.asList(alphabets));
      String columnName = "column1";
      writeCsv(csvFile, values, columnName);
      Schema schema = new Schema.SchemaBuilder().setSchemaName(RAW_TABLE_NAME)
          .addSingleValueDimension(columnName, FieldSpec.DataType.STRING)
          .build();

      // Load segment with no dictionary column and get segment metadata
      List<FieldConfig> fieldConfigs = new ArrayList<>();
      fieldConfigs.add(new FieldConfig(
          columnName, FieldConfig.EncodingType.RAW, List.of(), FieldConfig.CompressionCodec.SNAPPY, null));
      TableConfig tableConfig = createTableConfig(List.of(), List.of(), List.of(), fieldConfigs);
      ImmutableSegment segment = buildNewSegment(tableConfig, schema, csvFile.getAbsolutePath());
      Map<String, ColumnMetadata> columnMetadataMap = segment.getSegmentMetadata().getColumnMetadataMap();

      ColumnMetadata columnMetadata1 = columnMetadataMap.get(columnName);
      assertFalse(columnMetadata1.hasDictionary());
      assertNull(segment.getDictionary(columnName));

      String query = "SELECT column1, count(*) FROM testTable GROUP BY column1 ORDER BY column1";
      BrokerResponseNative brokerResponseNative = getBrokerResponse(query);
      List<Object[]> resultRows1 = brokerResponseNative.getResultTable().getRows();
      assertEquals(resultRows1.size(), 5);

      // Make column1 dictionary encoded and reload table
      fieldConfigs = new ArrayList<>();
      fieldConfigs.add(new FieldConfig(
          columnName, FieldConfig.EncodingType.DICTIONARY, List.of(), FieldConfig.CompressionCodec.SNAPPY, null));
      tableConfig = createTableConfig(List.of(), List.of(), List.of(), fieldConfigs);
      segment = reloadSegment(tableConfig, schema);
      columnMetadataMap = segment.getSegmentMetadata().getColumnMetadataMap();

      ColumnMetadata columnMetadata2 = columnMetadataMap.get(columnName);
      assertEquals(columnMetadata2.getCardinality(), 5); // actual cardinality
      assertEquals(columnMetadata2.getBitsPerElement(), 3); // actual required bits per element
      assertTrue(columnMetadata2.hasDictionary());
      assertNotNull(segment.getDictionary(columnName));

      brokerResponseNative = getBrokerResponse(query);
      List<Object[]> resultRows2 = brokerResponseNative.getResultTable().getRows();
      validateBeforeAfterQueryResults(resultRows1, resultRows2);
    } finally {
      // The CSV lives outside INDEX_DIR, so delete it explicitly to avoid leaking temp files
      FileUtils.deleteQuietly(csvFile);
    }
  }

  /**
   * Builds a fresh segment from the given input file and loads it as the current test segment,
   * destroying any previously loaded segment first.
   */
  private ImmutableSegment buildNewSegment(TableConfig tableConfig, Schema schema, String inputFile)
      throws Exception {
    SegmentGeneratorConfig generatorConfig = new SegmentGeneratorConfig(tableConfig, schema);
    generatorConfig.setInputFilePath(inputFile);
    generatorConfig.setFormat(FileFormat.CSV);
    generatorConfig.setOutDir(INDEX_DIR.getAbsolutePath());
    generatorConfig.setSegmentName(SEGMENT_NAME);

    SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl();
    driver.init(generatorConfig);
    driver.build();

    // Clear the stale reference before loading so tearDown never double-destroys
    // if the load below throws.
    if (_indexSegment != null) {
      _indexSegment.destroy();
      _indexSegment = null;
    }
    ImmutableSegment segment = ImmutableSegmentLoader.load(
        new File(INDEX_DIR, SEGMENT_NAME), new IndexLoadingConfig(tableConfig, schema));
    _indexSegment = segment;
    _indexSegments = List.of(segment, segment);
    return segment;
  }

  /**
   * Reprocesses the on-disk segment with the new table config (simulating a table reload) and
   * swaps it in as the current test segment.
   */
  private ImmutableSegment reloadSegment(TableConfig tableConfig, Schema schema)
      throws Exception {
    IndexLoadingConfig loadingConfig = new IndexLoadingConfig(tableConfig, schema);
    File indexDir = new File(INDEX_DIR, SEGMENT_NAME);
    SegmentDirectory segmentDirectory = new SegmentLocalFSDirectory(indexDir, ReadMode.mmap);
    try (SegmentPreProcessor preProcessor = new SegmentPreProcessor(segmentDirectory, loadingConfig)) {
      preProcessor.process();
    }
    // Replace in-memory segment with reloaded one; null the reference first so tearDown
    // cannot destroy an already-destroyed segment if the load below throws.
    _indexSegment.destroy();
    _indexSegment = null;
    ImmutableSegment segment = ImmutableSegmentLoader.load(indexDir, loadingConfig);
    _indexSegment = segment;
    _indexSegments = List.of(segment, segment);
    return segment;
  }

  /**
   * Writes a single-column CSV file with a header row followed by one value per line,
   * overwriting any existing file.
   */
  private static void writeCsv(File file, List<String> values, String columnName)
      throws IOException {
    try (FileWriter writer = new FileWriter(file, false)) {
      writer.append(columnName).append('\n');
      for (String v : values) {
        writer.append(v).append('\n');
      }
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -621,18 +621,6 @@ public void testRangeIndexAfterReload()
validateBeforeAfterQueryResults(resultRows1, resultRows2);
}

/**
 * Asserts that two result sets are identical: same row count, same number of columns per row,
 * and element-wise equal cell values in order.
 */
private void validateBeforeAfterQueryResults(List<Object[]> beforeResults, List<Object[]> afterResults) {
assertEquals(beforeResults.size(), afterResults.size());
for (int i = 0; i < beforeResults.size(); i++) {
Object[] resultRow1 = beforeResults.get(i);
Object[] resultRow2 = afterResults.get(i);
// Row widths must match before comparing individual cells
assertEquals(resultRow1.length, resultRow2.length);
for (int j = 0; j < resultRow1.length; j++) {
assertEquals(resultRow1[j], resultRow2[j]);
}
}
}

/**
* As a part of segmentReload, the ForwardIndexHandler will perform the following operations:
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ public RealtimeSegmentStatsContainer(MutableSegment mutableSegment, @Nullable in
private ColumnStatistics createMapColumnStatistics(DataSource dataSource, boolean useCompactedStatistics,
ThreadSafeMutableRoaringBitmap validDocIds, StatsCollectorConfig statsCollectorConfig) {
ForwardIndexReader reader = dataSource.getForwardIndex();
// TODO - Can we use NoDictColumnStatisticsCollector here for non dictionary columns ?
MapColumnPreIndexStatsCollector mapColumnPreIndexStatsCollector =
new MapColumnPreIndexStatsCollector(dataSource.getColumnName(), statsCollectorConfig);

Expand Down
Loading
Loading