diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/generator/StringGenerator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/generator/StringGenerator.java index ab461dd46acf..35b323182889 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/generator/StringGenerator.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/generator/StringGenerator.java @@ -20,7 +20,6 @@ import com.google.common.base.Preconditions; import java.util.Random; -import org.apache.commons.lang3.RandomStringUtils; import org.apache.commons.lang3.StringUtils; @@ -51,8 +50,8 @@ public StringGenerator(Integer cardinality, Double numberOfValuesPerEntry, Integ int initValueSize = lengthOfEachString - _counterLength; Preconditions.checkState(initValueSize >= 0, String.format("Cannot generate %d unique string with length %d", _cardinality, lengthOfEachString)); - _initialValue = RandomStringUtils.randomAlphabetic(initValueSize); - _rand = new Random(System.currentTimeMillis()); + _rand = new Random(0L); + _initialValue = generateAlphabetic(initValueSize); } @Override @@ -75,6 +74,18 @@ private String getNextString() { return _initialValue + StringUtils.leftPad(String.valueOf(_counter), _counterLength, '0'); } + private String generateAlphabetic(int length) { + if (length <= 0) { + return ""; + } + final char[] alphabet = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ".toCharArray(); + StringBuilder sb = new StringBuilder(length); + for (int i = 0; i < length; i++) { + sb.append(alphabet[_rand.nextInt(alphabet.length)]); + } + return sb.toString(); + } + public static void main(String[] args) { final StringGenerator gen = new StringGenerator(10000, null, null); gen.init(); diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/recommender/realtime/provisioning/MemoryEstimatorTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/recommender/realtime/provisioning/MemoryEstimatorTest.java index 7b8eadc508b7..abaef091f954 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/recommender/realtime/provisioning/MemoryEstimatorTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/recommender/realtime/provisioning/MemoryEstimatorTest.java @@ -50,7 +50,7 @@ public void testSegmentGenerator() assertEquals(extract(metadata, "column.colFloatMV.cardinality = (\\d+)"), "250"); assertEquals(extract(metadata, "column.colString.cardinality = (\\d+)"), "300"); assertEquals(extract(metadata, "column.colStringMV.cardinality = (\\d+)"), "350"); - assertEquals(extract(metadata, "column.colBytes.cardinality = (\\d+)"), "400"); + assertEquals(extract(metadata, "column.colBytes.cardinality = (\\d+)"), "416"); assertEquals(extract(metadata, "column.colLong.cardinality = (\\d+)"), "500"); assertEquals(extract(metadata, "column.colLongMV.cardinality = (\\d+)"), "550"); assertEquals(extract(metadata, "column.colDouble.cardinality = (\\d+)"), "600"); @@ -83,7 +83,7 @@ public void testSegmentGeneratorWithDateTimeFieldSpec() assertEquals(extract(metadata, "column.colInt.cardinality = (\\d+)"), "500"); assertEquals(extract(metadata, "column.colFloat.cardinality = (\\d+)"), "600"); assertEquals(extract(metadata, "column.colString.cardinality = (\\d+)"), "700"); - assertEquals(extract(metadata, "column.colBytes.cardinality = (\\d+)"), "800"); + assertEquals(extract(metadata, "column.colBytes.cardinality = (\\d+)"), "841"); 
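+        // Note: the expected colBytes cardinalities change from 400/800 to 416/841 because, with this change, raw
+        // (no-dictionary) columns report the approximate UltraLogLog-based cardinality from
+        // NoDictColumnStatisticsCollector (the estimate is scaled by 1.05 and capped at the total number of entries)
+        // instead of an exact distinct count.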
assertEquals(extract(metadata, "column.colMetric.cardinality = (\\d+)"), "900"); assertEquals(extract(metadata, "column.colTime.cardinality = (\\d+)"), "250"); assertEquals(extract(metadata, "column.colTime2.cardinality = (\\d+)"), "750"); diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/BaseQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/BaseQueriesTest.java index 5bc6636d15eb..f47e78f97f7a 100644 --- a/pinot-core/src/test/java/org/apache/pinot/queries/BaseQueriesTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/queries/BaseQueriesTest.java @@ -57,6 +57,7 @@ import org.intellij.lang.annotations.Language; import static org.mockito.Mockito.mock; +import static org.testng.Assert.assertEquals; /** @@ -330,4 +331,16 @@ private BrokerResponseNative getBrokerResponseDistinctInstances(PinotQuery pinot serverPinotQuery == pinotQuery ? brokerRequest : CalciteSqlCompiler.convertToBrokerRequest(serverPinotQuery); return reduceOnDataTable(brokerRequest, serverBrokerRequest, dataTableMap); } + + protected void validateBeforeAfterQueryResults(List beforeResults, List afterResults) { + assertEquals(beforeResults.size(), afterResults.size()); + for (int i = 0; i < beforeResults.size(); i++) { + Object[] resultRow1 = beforeResults.get(i); + Object[] resultRow2 = afterResults.get(i); + assertEquals(resultRow1.length, resultRow2.length); + for (int j = 0; j < resultRow1.length; j++) { + assertEquals(resultRow1[j], resultRow2[j]); + } + } + } } diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/CustomReloadQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/CustomReloadQueriesTest.java new file mode 100644 index 000000000000..16820c99d86c --- /dev/null +++ b/pinot-core/src/test/java/org/apache/pinot/queries/CustomReloadQueriesTest.java @@ -0,0 +1,212 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+package org.apache.pinot.queries;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
+import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
+import org.apache.pinot.segment.local.segment.index.loader.SegmentPreProcessor;
+import org.apache.pinot.segment.local.segment.store.SegmentLocalFSDirectory;
+import org.apache.pinot.segment.spi.ColumnMetadata;
+import org.apache.pinot.segment.spi.ImmutableSegment;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
+import org.apache.pinot.segment.spi.creator.SegmentIndexCreationDriver;
+import org.apache.pinot.segment.spi.store.SegmentDirectory;
+import org.apache.pinot.spi.config.table.FieldConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.FileFormat;
+import org.apache.pinot.spi.utils.ReadMode;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+
+
+public class CustomReloadQueriesTest extends BaseQueriesTest {
+
+  private static final File INDEX_DIR =
+      new File(FileUtils.getTempDirectory(), CustomReloadQueriesTest.class.getSimpleName());
+  private static final String RAW_TABLE_NAME = "testTable";
+  private static final String SEGMENT_NAME = "testSegment";
+
+  private IndexSegment _indexSegment;
+  private List<IndexSegment> _indexSegments;
+
+  @BeforeMethod
+  public void setUp()
+      throws Exception {
+    FileUtils.deleteQuietly(INDEX_DIR);
+  }
+
+  private TableConfig createTableConfig(List<String> noDictionaryColumns, List<String> invertedIndexColumns,
+      List<String> rangeIndexColumns, List<FieldConfig> fieldConfigs) {
+    return new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
+        .setNoDictionaryColumns(noDictionaryColumns).setInvertedIndexColumns(invertedIndexColumns)
+        .setRangeIndexColumns(rangeIndexColumns).setFieldConfigList(fieldConfigs).build();
+  }
+
+  @AfterMethod
+  public void tearDown() {
+    _indexSegment.destroy();
+    FileUtils.deleteQuietly(INDEX_DIR);
+  }
+
+  @Override
+  protected String getFilter() {
+    return "";
+  }
+
+  @Override
+  protected IndexSegment getIndexSegment() {
+    return _indexSegment;
+  }
+
+  @Override
+  protected List<IndexSegment> getIndexSegments() {
+    return _indexSegments;
+  }
+
+  @DataProvider(name = "alphabets")
+  public static Object[][] alphabets() {
+    // 2 sets of input data - sorted and unsorted
+    return new Object[][] {
+        { new String[]{"a", "b", "c", "d", "e"} },
+        { new String[]{"b", "c", "a", "e", "d"} }
+    };
+  }
+
+  /**
+   * If a column's approximate cardinality is less than its actual cardinality, and its bits per element is reduced
+   * because of this, then enabling a dictionary for that column should result in updating both the bits per element
+   * and the cardinality. In this test, we verify both the segment metadata and the query results.
+   * @throws Exception
+   */
+  @Test(dataProvider = "alphabets")
+  public void testReducedBitsPerElementWithNoDictCardinalityApproximation(String[] alphabets)
+      throws Exception {
+
+    // Common variables - schema, data file, etc
+    File csvFile = new File(FileUtils.getTempDirectory(), "data.csv");
+    List<String> values = new ArrayList<>(Arrays.asList(alphabets));
+    String columnName = "column1";
+    writeCsv(csvFile, values, columnName);
+    Schema schema = new Schema.SchemaBuilder().setSchemaName(RAW_TABLE_NAME)
+        .addSingleValueDimension(columnName, FieldSpec.DataType.STRING)
+        .build();
+
+    // Load segment with no dictionary column and get segment metadata
+    List<FieldConfig> fieldConfigs = new ArrayList<>();
+    fieldConfigs.add(new FieldConfig(
+        columnName, FieldConfig.EncodingType.RAW, List.of(), FieldConfig.CompressionCodec.SNAPPY, null));
+    TableConfig tableConfig = createTableConfig(List.of(), List.of(), List.of(), fieldConfigs);
+    ImmutableSegment segment = buildNewSegment(tableConfig, schema, csvFile.getAbsolutePath());
+    Map<String, ColumnMetadata> columnMetadataMap = segment.getSegmentMetadata().getColumnMetadataMap();
+
+    ColumnMetadata columnMetadata1 = columnMetadataMap.get(columnName);
+    assertFalse(columnMetadata1.hasDictionary());
+    assertNull(segment.getDictionary(columnName));
+
+    String query = "SELECT column1, count(*) FROM testTable GROUP BY column1 ORDER BY column1";
+    BrokerResponseNative brokerResponseNative = getBrokerResponse(query);
+    List<Object[]> resultRows1 = brokerResponseNative.getResultTable().getRows();
+    assertEquals(resultRows1.size(), 5);
+
+    // Make column1 dictionary encoded and reload table
+    fieldConfigs = new ArrayList<>();
+    fieldConfigs.add(new FieldConfig(
+        columnName, FieldConfig.EncodingType.DICTIONARY, List.of(), FieldConfig.CompressionCodec.SNAPPY, null));
+    tableConfig = createTableConfig(List.of(), List.of(), List.of(), fieldConfigs);
+    segment = reloadSegment(tableConfig, schema);
+    columnMetadataMap = segment.getSegmentMetadata().getColumnMetadataMap();
+
+    ColumnMetadata columnMetadata2 = columnMetadataMap.get(columnName);
+    assertEquals(columnMetadata2.getCardinality(), 5); // actual cardinality
+    assertEquals(columnMetadata2.getBitsPerElement(), 3); // actual required bits per element
+    assertTrue(columnMetadata2.hasDictionary());
+    assertNotNull(segment.getDictionary(columnName));
+
+    brokerResponseNative = getBrokerResponse(query);
+    List<Object[]> resultRows2 = brokerResponseNative.getResultTable().getRows();
+    validateBeforeAfterQueryResults(resultRows1, resultRows2);
+  }
+
+  private ImmutableSegment buildNewSegment(TableConfig tableConfig, Schema schema, String inputFile)
+      throws Exception {
+    SegmentGeneratorConfig generatorConfig = new SegmentGeneratorConfig(tableConfig, schema);
+    generatorConfig.setInputFilePath(inputFile);
+    generatorConfig.setFormat(FileFormat.CSV);
+    generatorConfig.setOutDir(INDEX_DIR.getAbsolutePath());
+    generatorConfig.setSegmentName(SEGMENT_NAME);
+
+    SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl();
+    driver.init(generatorConfig);
+    driver.build();
+
+    ImmutableSegment segment = ImmutableSegmentLoader.load(
+        new File(INDEX_DIR, SEGMENT_NAME), new IndexLoadingConfig(tableConfig, schema));
+    if (_indexSegment != null) {
+      _indexSegment.destroy();
+    }
+    _indexSegment = segment;
+    _indexSegments = List.of(segment, segment);
+    return segment;
+  }
+
+  private ImmutableSegment reloadSegment(TableConfig tableConfig, Schema schema)
+      throws Exception {
+    IndexLoadingConfig
loadingConfig = new IndexLoadingConfig(tableConfig, schema); + File indexDir = new File(INDEX_DIR, SEGMENT_NAME); + SegmentDirectory segmentDirectory = new SegmentLocalFSDirectory(indexDir, ReadMode.mmap); + try (SegmentPreProcessor preProcessor = new SegmentPreProcessor(segmentDirectory, loadingConfig)) { + preProcessor.process(); + } + // Replace in-memory segment with reloaded one + _indexSegment.destroy(); + ImmutableSegment segment = ImmutableSegmentLoader.load(indexDir, loadingConfig); + _indexSegment = segment; + _indexSegments = List.of(segment, segment); + return segment; + } + + private static void writeCsv(File file, List values, String columnName) throws IOException { + try (FileWriter writer = new FileWriter(file, false)) { + writer.append(columnName).append('\n'); + for (String v : values) { + writer.append(v).append('\n'); + } + } + } +} diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/ForwardIndexHandlerReloadQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/ForwardIndexHandlerReloadQueriesTest.java index fc5b40d7c688..c8411e549dd7 100644 --- a/pinot-core/src/test/java/org/apache/pinot/queries/ForwardIndexHandlerReloadQueriesTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/queries/ForwardIndexHandlerReloadQueriesTest.java @@ -621,18 +621,6 @@ public void testRangeIndexAfterReload() validateBeforeAfterQueryResults(resultRows1, resultRows2); } - private void validateBeforeAfterQueryResults(List beforeResults, List afterResults) { - assertEquals(beforeResults.size(), afterResults.size()); - for (int i = 0; i < beforeResults.size(); i++) { - Object[] resultRow1 = beforeResults.get(i); - Object[] resultRow2 = afterResults.get(i); - assertEquals(resultRow1.length, resultRow2.length); - for (int j = 0; j < resultRow1.length; j++) { - assertEquals(resultRow1[j], resultRow2[j]); - } - } - } - /** * As a part of segmentReload, the ForwardIndexHandler will perform the following operations: * diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/stats/RealtimeSegmentStatsContainer.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/stats/RealtimeSegmentStatsContainer.java index ab950a475056..bbc1e143e2eb 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/stats/RealtimeSegmentStatsContainer.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/stats/RealtimeSegmentStatsContainer.java @@ -106,6 +106,7 @@ public RealtimeSegmentStatsContainer(MutableSegment mutableSegment, @Nullable in private ColumnStatistics createMapColumnStatistics(DataSource dataSource, boolean useCompactedStatistics, ThreadSafeMutableRoaringBitmap validDocIds, StatsCollectorConfig statsCollectorConfig) { ForwardIndexReader reader = dataSource.getForwardIndex(); + // TODO - Can we use NoDictColumnStatisticsCollector here for non dictionary columns ? 
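+    // Elsewhere in this change (SegmentPreIndexStatsCollectorImpl, ForwardIndexHandler), the no-dict collector is
+    // only used when the column has no dictionary, its stored type is not MAP, and
+    // IndexingConfig.canOptimiseNoDictStatsCollection() is true; presumably the same guard would be needed here.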
MapColumnPreIndexStatsCollector mapColumnPreIndexStatsCollector = new MapColumnPreIndexStatsCollector(dataSource.getColumnName(), statsCollectorConfig); diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java index 262fbb24ff48..16eef8b9b1e9 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java @@ -602,9 +602,16 @@ private void writeMetadata() convertedStartTime = TimeUtils.getValidMinTimeMillis(); timeUnit = TimeUnit.MILLISECONDS; } else { - timeUnit = Preconditions.checkNotNull(_config.getSegmentTimeUnit()); - convertedEndTime = timeUnit.convert(now, TimeUnit.MILLISECONDS); - convertedStartTime = timeUnit.convert(TimeUtils.getValidMinTimeMillis(), TimeUnit.MILLISECONDS); + timeUnit = _config.getSegmentTimeUnit(); + if (timeUnit != null) { + convertedEndTime = timeUnit.convert(now, TimeUnit.MILLISECONDS); + convertedStartTime = timeUnit.convert(TimeUtils.getValidMinTimeMillis(), TimeUnit.MILLISECONDS); + } else { + // Use millis as the time unit if not able to infer from config + timeUnit = TimeUnit.MILLISECONDS; + convertedEndTime = now; + convertedStartTime = TimeUtils.getValidMinTimeMillis(); + } } LOGGER.warn( "Caught exception while writing time metadata for segment: {}, time column: {}, total docs: {}. " @@ -648,10 +655,16 @@ public static void addColumnMetadataInfo(PropertiesConfiguration properties, Str properties.setProperty(getKeyFor(column, TOTAL_DOCS), String.valueOf(totalDocs)); DataType dataType = fieldSpec.getDataType(); properties.setProperty(getKeyFor(column, DATA_TYPE), String.valueOf(dataType)); + // TODO: When the column is raw (no dictionary), we should set BITS_PER_ELEMENT to -1 (invalid). Currently we set it + // regardless of whether dictionary is created or not for backward compatibility because ForwardIndexHandler + // doesn't update this value when converting a raw column to dictionary encoded. + // Consider changing it after releasing 1.5.0. 
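+      // For example, a dictionary-encoded column with cardinality 5 has dictionary ids 0..4 and therefore needs
+      // PinotDataBitSet.getNumBitsPerValue(4) = 3 bits per element (see CustomReloadQueriesTest above).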
+ // See https://github.com/apache/pinot/pull/16921 for details properties.setProperty(getKeyFor(column, BITS_PER_ELEMENT), String.valueOf(PinotDataBitSet.getNumBitsPerValue(cardinality - 1))); + FieldType fieldType = fieldSpec.getFieldType(); properties.setProperty(getKeyFor(column, DICTIONARY_ELEMENT_SIZE), String.valueOf(dictionaryElementSize)); - properties.setProperty(getKeyFor(column, COLUMN_TYPE), String.valueOf(fieldSpec.getFieldType())); + properties.setProperty(getKeyFor(column, COLUMN_TYPE), String.valueOf(fieldType)); properties.setProperty(getKeyFor(column, IS_SORTED), String.valueOf(columnIndexCreationInfo.isSorted())); properties.setProperty(getKeyFor(column, HAS_DICTIONARY), String.valueOf(hasDictionary)); properties.setProperty(getKeyFor(column, IS_SINGLE_VALUED), String.valueOf(fieldSpec.isSingleValueField())); @@ -661,7 +674,8 @@ public static void addColumnMetadataInfo(PropertiesConfiguration properties, Str String.valueOf(columnIndexCreationInfo.getTotalNumberOfEntries())); properties.setProperty(getKeyFor(column, IS_AUTO_GENERATED), String.valueOf(columnIndexCreationInfo.isAutoGenerated())); - if (dataType.equals(DataType.STRING) || dataType.equals(DataType.BYTES) || dataType.equals(DataType.JSON)) { + DataType storedType = dataType.getStoredType(); + if (storedType == DataType.STRING || storedType == DataType.BYTES) { properties.setProperty(getKeyFor(column, SCHEMA_MAX_LENGTH), fieldSpec.getEffectiveMaxLength()); // TODO let's revisit writing effective maxLengthStrategy into metadata, as changing it right now may affect // segment's CRC value @@ -685,15 +699,28 @@ public static void addColumnMetadataInfo(PropertiesConfiguration properties, Str } } - // datetime field - if (fieldSpec.getFieldType() == FieldType.DATE_TIME) { + // Datetime field + if (fieldType == FieldType.DATE_TIME) { DateTimeFieldSpec dateTimeFieldSpec = (DateTimeFieldSpec) fieldSpec; properties.setProperty(getKeyFor(column, DATETIME_FORMAT), dateTimeFieldSpec.getFormat()); properties.setProperty(getKeyFor(column, DATETIME_GRANULARITY), dateTimeFieldSpec.getGranularity()); } - // complex field - if (fieldSpec.getFieldType() == FieldType.COMPLEX) { + if (fieldType != FieldType.COMPLEX) { + // Regular (non-complex) field + if (totalDocs > 0) { + Object min = columnIndexCreationInfo.getMin(); + Object max = columnIndexCreationInfo.getMax(); + // NOTE: + // Min/max could be null for real-time aggregate metrics. We don't directly call addColumnMinMaxValueInfo() to + // avoid setting MIN_MAX_VALUE_INVALID flag, which will prevent ColumnMinMaxValueGenerator from generating them + // when loading the segment. + if (min != null && max != null) { + addColumnMinMaxValueInfo(properties, column, min, max, storedType); + } + } + } else { + // Complex field ComplexFieldSpec complexFieldSpec = (ComplexFieldSpec) fieldSpec; properties.setProperty(getKeyFor(column, COMPLEX_CHILD_FIELD_NAMES), new ArrayList<>(complexFieldSpec.getChildFieldSpecs().keySet())); @@ -702,17 +729,9 @@ public static void addColumnMetadataInfo(PropertiesConfiguration properties, Str } } - // NOTE: Min/max could be null for real-time aggregate metrics. 
- if ((fieldSpec.getFieldType() != FieldType.COMPLEX) && (totalDocs > 0)) { - Object min = columnIndexCreationInfo.getMin(); - Object max = columnIndexCreationInfo.getMax(); - if (min != null && max != null) { - addColumnMinMaxValueInfo(properties, column, min, max, dataType.getStoredType()); - } - } - + // TODO: Revisit whether we should set default null value for complex field String defaultNullValue = columnIndexCreationInfo.getDefaultNullValue().toString(); - if (dataType.getStoredType() == DataType.STRING) { + if (storedType == DataType.STRING) { // NOTE: Do not limit length of default null value because we need exact value to determine whether the default // null value changes defaultNullValue = CommonsConfigurationUtils.replaceSpecialCharacterInPropertyValue(defaultNullValue); diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/MapColumnPreIndexStatsCollector.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/MapColumnPreIndexStatsCollector.java index 92c58e929b42..1972bce38f5e 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/MapColumnPreIndexStatsCollector.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/MapColumnPreIndexStatsCollector.java @@ -23,6 +23,9 @@ import java.util.Map; import org.apache.pinot.common.utils.PinotDataType; import org.apache.pinot.segment.spi.creator.StatsCollectorConfig; +import org.apache.pinot.segment.spi.index.FieldIndexConfigs; +import org.apache.pinot.segment.spi.index.FieldIndexConfigsUtil; +import org.apache.pinot.segment.spi.index.StandardIndexes; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.data.ComplexFieldSpec; @@ -51,16 +54,24 @@ public class MapColumnPreIndexStatsCollector extends AbstractColumnStatisticsCol private final Object2ObjectOpenHashMap _keyStats = new Object2ObjectOpenHashMap<>(INITIAL_HASH_SET_SIZE); private final Map _keyFrequencies = new Object2ObjectOpenHashMap<>(INITIAL_HASH_SET_SIZE); - private String[] _sortedValues; + private String[] _sortedKeys; private int _minLength = Integer.MAX_VALUE; private int _maxLength = 0; private boolean _sealed = false; private ComplexFieldSpec _colFieldSpec; + private boolean _createNoDictCollectorsForKeys = false; public MapColumnPreIndexStatsCollector(String column, StatsCollectorConfig statsCollectorConfig) { super(column, statsCollectorConfig); _sorted = false; _colFieldSpec = (ComplexFieldSpec) statsCollectorConfig.getFieldSpecForColumn(column); + Map indexConfigsByCol = FieldIndexConfigsUtil.createIndexConfigsByColName( + statsCollectorConfig.getTableConfig(), statsCollectorConfig.getSchema()); + boolean isDictionaryEnabled = indexConfigsByCol.get(column).getConfig(StandardIndexes.dictionary()).isEnabled(); + if (!isDictionaryEnabled) { + _createNoDictCollectorsForKeys = statsCollectorConfig.getTableConfig().getIndexingConfig() + .canOptimiseNoDictStatsCollection(); + } } public AbstractColumnStatisticsCollector getKeyStatistics(String key) { @@ -105,7 +116,7 @@ public void collect(Object entry) { @Override public String getMinValue() { if (_sealed) { - return _sortedValues[0]; + return _sortedKeys[0]; } throw new IllegalStateException("you must seal the collector first before asking for min value"); } @@ -113,7 +124,7 @@ public String getMinValue() { @Override public String getMaxValue() { if 
(_sealed) { - return _sortedValues[_sortedValues.length - 1]; + return _sortedKeys[_sortedKeys.length - 1]; } throw new IllegalStateException("you must seal the collector first before asking for max value"); } @@ -121,7 +132,7 @@ public String getMaxValue() { @Override public String[] getUniqueValuesSet() { if (_sealed) { - return _sortedValues; + return _sortedKeys; } throw new IllegalStateException("you must seal the collector first before asking for unique values set"); } @@ -156,8 +167,8 @@ public void seal() { _keyStats.get(entry.getKey()).collect(_keyStats.get(entry.getKey())._fieldSpec.getDefaultNullValue()); } } - _sortedValues = _keyStats.keySet().toArray(new String[0]); - Arrays.sort(_sortedValues); + _sortedKeys = _keyStats.keySet().toArray(new String[0]); + Arrays.sort(_sortedKeys); // Iterate through every key stats collector and seal them for (AbstractColumnStatisticsCollector keyStatsCollector : _keyStats.values()) { @@ -182,6 +193,10 @@ private AbstractColumnStatisticsCollector createKeyStatsCollector(String key, Ob .addField(new DimensionFieldSpec(key, convertToDataType(type), false)).build(); StatsCollectorConfig config = new StatsCollectorConfig(tableConfig, keySchema, null); + if (_createNoDictCollectorsForKeys) { + return new NoDictColumnStatisticsCollector(key, config); + } + switch (type) { case INTEGER: return new IntColumnPreIndexStatsCollector(key, config); @@ -200,7 +215,7 @@ private AbstractColumnStatisticsCollector createKeyStatsCollector(String key, Ob } } - static FieldSpec.DataType convertToDataType(PinotDataType ty) { + private static FieldSpec.DataType convertToDataType(PinotDataType ty) { // TODO: I've been told that we already have a function to do this, so find that function and replace this switch (ty) { case BOOLEAN: diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/NoDictColumnStatisticsCollector.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/NoDictColumnStatisticsCollector.java new file mode 100644 index 000000000000..2c970de44f59 --- /dev/null +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/NoDictColumnStatisticsCollector.java @@ -0,0 +1,226 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.segment.local.segment.creator.impl.stats; + +import com.dynatrace.hash4j.distinctcount.UltraLogLog; +import java.math.BigDecimal; +import java.nio.charset.StandardCharsets; +import org.apache.commons.lang3.NotImplementedException; +import org.apache.pinot.segment.local.utils.UltraLogLogUtils; +import org.apache.pinot.segment.spi.creator.StatsCollectorConfig; +import org.apache.pinot.spi.utils.BigDecimalUtils; +import org.apache.pinot.spi.utils.ByteArray; +import org.apache.pinot.spi.utils.CommonConstants; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Column statistics collector for no-dictionary columns that avoids storing unique values and thus reduces memory + * Behavior: + * - getUniqueValuesSet() throws NotImplementedException + * - getCardinality() returns approximate cardinality using ULL + * - Doesn't handle cases where values are of different types (e.g. int and long). This is expected. + * Individual type collectors (e.g. IntColumnPreIndexStatsCollector) also don't handle this case. + * At this point in the Pinot process, the type consistency of a key should already be enforced. + * So if such a case is encountered, it will be raised as an exception during collect() + * Doesn't handle MAP data type as MapColumnPreIndexStatsCollector is optimized for no-dictionary collection + */ +@SuppressWarnings({"rawtypes"}) +public class NoDictColumnStatisticsCollector extends AbstractColumnStatisticsCollector { + private static final Logger LOGGER = LoggerFactory.getLogger(NoDictColumnStatisticsCollector.class); + private Comparable _minValue; + private Comparable _maxValue; + private int _minLength = Integer.MAX_VALUE; + private int _maxLength = -1; // default return value is -1 + private int _maxRowLength = -1; // default return value is -1 + private boolean _sealed = false; + private final UltraLogLog _ull; + + public NoDictColumnStatisticsCollector(String column, StatsCollectorConfig statsCollectorConfig) { + super(column, statsCollectorConfig); + // Use default p; can be made configurable via StatsCollectorConfig later if needed + _ull = UltraLogLog.create(CommonConstants.Helix.DEFAULT_ULTRALOGLOG_P); + LOGGER.info("Initialized NoDictColumnStatisticsCollector for column: {}", column); + } + + @Override + public void collect(Object entry) { + assert !_sealed; + if (entry instanceof Object[]) { + Object[] values = (Object[]) entry; + int rowLength = 0; + for (Object value : values) { + if (value instanceof BigDecimal) { + // BigDecimalColumnPreIndexStatsCollector doesn't support multi-value + throw new UnsupportedOperationException(); + } + updateMinMax(value); + updateUll(value); + int len = getValueLength(value); + _minLength = Math.min(_minLength, len); + _maxLength = Math.max(_maxLength, len); + rowLength += len; + } + _maxNumberOfMultiValues = Math.max(_maxNumberOfMultiValues, values.length); + _maxRowLength = Math.max(_maxRowLength, rowLength); + updateTotalNumberOfEntries(values); + } else if (entry instanceof int[] || entry instanceof long[] + || entry instanceof float[] || entry instanceof double[]) { + // Native multi-value types don't require length calculation because they're not variable-length + int length; + if (entry instanceof int[]) { + int[] values = (int[]) entry; + for (int value : values) { + updateMinMax(value); + updateUll(value); + } + length = values.length; + } else if (entry instanceof long[]) { + long[] values = (long[]) entry; + for (long value : values) { + updateMinMax(value); + updateUll(value); + } + 
length = values.length; + } else if (entry instanceof float[]) { + float[] values = (float[]) entry; + for (float value : values) { + updateMinMax(value); + updateUll(value); + } + length = values.length; + } else { + double[] values = (double[]) entry; + for (double value : values) { + updateMinMax(value); + updateUll(value); + } + length = values.length; + } + _maxNumberOfMultiValues = Math.max(_maxNumberOfMultiValues, length); + updateTotalNumberOfEntries(length); + } else { + Comparable value = toComparable(entry); + addressSorted(value); + updateMinMax(entry); + updateUll(entry); + int len = getValueLength(entry); + _minLength = Math.min(_minLength, len); + _maxLength = Math.max(_maxLength, len); + if (isPartitionEnabled()) { + updatePartition(value.toString()); + } + _maxRowLength = Math.max(_maxRowLength, len); + _totalNumberOfEntries++; + } + } + + private void updateMinMax(Object value) { + Comparable comp = toComparable(value); + if (_minValue == null || comp.compareTo(_minValue) < 0) { + _minValue = comp; + } + if (_maxValue == null || comp.compareTo(_maxValue) > 0) { + _maxValue = comp; + } + } + + private Comparable toComparable(Object value) { + if (value instanceof byte[]) { + return new ByteArray((byte[]) value); + } + if (value instanceof Comparable) { + return (Comparable) value; + } + throw new IllegalStateException("Unsupported value type " + value.getClass()); + } + + private int getValueLength(Object value) { + if (value instanceof byte[]) { + return ((byte[]) value).length; + } + if (value instanceof CharSequence) { + return ((CharSequence) value).toString().getBytes(StandardCharsets.UTF_8).length; + } + if (value instanceof BigDecimal) { + return BigDecimalUtils.byteSize((BigDecimal) value); + } + if (value instanceof Number) { + return 8; // fixed-width approximation as it's not actually required for numeric fields which are of fixed length + } + throw new IllegalStateException("Unsupported value type " + value.getClass()); + } + + @Override + public Object getMinValue() { + if (_sealed) { + return _minValue; + } + throw new IllegalStateException("you must seal the collector first before asking for min value"); + } + + @Override + public Object getMaxValue() { + if (_sealed) { + return _maxValue; + } + throw new IllegalStateException("you must seal the collector first before asking for max value"); + } + + @Override + public Object getUniqueValuesSet() { + throw new NotImplementedException("getUniqueValuesSet is not supported in NoDictColumnStatisticsCollector"); + } + + @Override + public int getLengthOfShortestElement() { + return _minLength == Integer.MAX_VALUE ? -1 : _minLength; + } + + @Override + public int getLengthOfLargestElement() { + return _maxLength; + } + + @Override + public int getMaxRowLengthInBytes() { + return _maxRowLength; + } + + @Override + public int getCardinality() { + // Get approximate distinct count estimate + // Increase by 5% to increase probability of not returning lower than actual cardinality + long estimate = Math.round(_ull.getDistinctCountEstimate() * 1.05); + // There are cases where ULL can overshoot the actual number of entries. + // Returning a cardinality greater than total entries can break assumptions. + return estimate > getTotalNumberOfEntries() ? 
getTotalNumberOfEntries() : (int) estimate; + } + + @Override + public void seal() { + _sealed = true; + } + + private void updateUll(Object value) { + // Hash and add to ULL using shared utility to ensure deterministic canonicalization + UltraLogLogUtils.hashObject(value).ifPresent(_ull::add); + } +} diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/SegmentPreIndexStatsCollectorImpl.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/SegmentPreIndexStatsCollectorImpl.java index 45c75e0c490f..737d519a8884 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/SegmentPreIndexStatsCollectorImpl.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/SegmentPreIndexStatsCollectorImpl.java @@ -23,6 +23,9 @@ import org.apache.pinot.segment.spi.creator.ColumnStatistics; import org.apache.pinot.segment.spi.creator.SegmentPreIndexStatsCollector; import org.apache.pinot.segment.spi.creator.StatsCollectorConfig; +import org.apache.pinot.segment.spi.index.FieldIndexConfigs; +import org.apache.pinot.segment.spi.index.FieldIndexConfigsUtil; +import org.apache.pinot.segment.spi.index.StandardIndexes; import org.apache.pinot.spi.data.FieldSpec; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.data.readers.GenericRow; @@ -44,10 +47,22 @@ public SegmentPreIndexStatsCollectorImpl(StatsCollectorConfig statsCollectorConf @Override public void init() { _columnStatsCollectorMap = new HashMap<>(); + Map indexConfigsByCol = FieldIndexConfigsUtil.createIndexConfigsByColName( + _statsCollectorConfig.getTableConfig(), _statsCollectorConfig.getSchema()); Schema dataSchema = _statsCollectorConfig.getSchema(); for (FieldSpec fieldSpec : dataSchema.getAllFieldSpecs()) { String column = fieldSpec.getName(); + boolean dictionaryEnabled = indexConfigsByCol.get(column).getConfig(StandardIndexes.dictionary()).isEnabled(); + if (!dictionaryEnabled) { + // MAP collector is optimised for no-dictionary collection + if (!fieldSpec.getDataType().getStoredType().equals(FieldSpec.DataType.MAP)) { + if (_statsCollectorConfig.getTableConfig().getIndexingConfig().canOptimiseNoDictStatsCollection()) { + _columnStatsCollectorMap.put(column, new NoDictColumnStatisticsCollector(column, _statsCollectorConfig)); + continue; + } + } + } switch (fieldSpec.getDataType().getStoredType()) { case INT: _columnStatsCollectorMap.put(column, new IntColumnPreIndexStatsCollector(column, _statsCollectorConfig)); diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java index f63e71f54e0d..a7398f6ae8f7 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java @@ -31,6 +31,7 @@ import java.util.Set; import javax.annotation.Nullable; import org.apache.commons.io.FileUtils; +import org.apache.pinot.segment.local.io.util.PinotDataBitSet; import org.apache.pinot.segment.local.segment.creator.impl.SegmentDictionaryCreator; import org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueVarByteRawIndexCreator; import 
org.apache.pinot.segment.local.segment.creator.impl.stats.AbstractColumnStatisticsCollector; @@ -41,6 +42,7 @@ import org.apache.pinot.segment.local.segment.creator.impl.stats.IntColumnPreIndexStatsCollector; import org.apache.pinot.segment.local.segment.creator.impl.stats.LongColumnPreIndexStatsCollector; import org.apache.pinot.segment.local.segment.creator.impl.stats.MapColumnPreIndexStatsCollector; +import org.apache.pinot.segment.local.segment.creator.impl.stats.NoDictColumnStatisticsCollector; import org.apache.pinot.segment.local.segment.creator.impl.stats.StringColumnPreIndexStatsCollector; import org.apache.pinot.segment.local.segment.index.dictionary.DictionaryIndexType; import org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader; @@ -48,6 +50,7 @@ import org.apache.pinot.segment.spi.V1Constants; import org.apache.pinot.segment.spi.compression.ChunkCompressionType; import org.apache.pinot.segment.spi.compression.DictIdCompressionType; +import org.apache.pinot.segment.spi.creator.ColumnIndexCreationInfo; import org.apache.pinot.segment.spi.creator.IndexCreationContext; import org.apache.pinot.segment.spi.creator.SegmentVersion; import org.apache.pinot.segment.spi.creator.StatsCollectorConfig; @@ -59,6 +62,7 @@ import org.apache.pinot.segment.spi.index.IndexType; import org.apache.pinot.segment.spi.index.StandardIndexes; import org.apache.pinot.segment.spi.index.creator.ForwardIndexCreator; +import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl; import org.apache.pinot.segment.spi.index.reader.Dictionary; import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader; import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext; @@ -72,10 +76,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.apache.pinot.segment.spi.V1Constants.MetadataKeys.Column.CARDINALITY; -import static org.apache.pinot.segment.spi.V1Constants.MetadataKeys.Column.DICTIONARY_ELEMENT_SIZE; -import static org.apache.pinot.segment.spi.V1Constants.MetadataKeys.Column.HAS_DICTIONARY; -import static org.apache.pinot.segment.spi.V1Constants.MetadataKeys.Column.getKeyFor; +import static org.apache.pinot.segment.spi.V1Constants.MetadataKeys.Column.*; /** @@ -342,6 +343,8 @@ private void removeDictionaryFromForwardIndexDisabledColumn(String column, Segme Map metadataProperties = new HashMap<>(); metadataProperties.put(getKeyFor(column, HAS_DICTIONARY), String.valueOf(false)); metadataProperties.put(getKeyFor(column, DICTIONARY_ELEMENT_SIZE), String.valueOf(0)); + // TODO: See https://github.com/apache/pinot/pull/16921 for details + // metadataProperties.put(getKeyFor(column, BITS_PER_ELEMENT), String.valueOf(-1)); SegmentMetadataUtils.updateMetadataProperties(_segmentDirectory, metadataProperties); // Remove the inverted index, FST index and range index @@ -795,15 +798,16 @@ private void forwardIndexReadRawWriteDictHelper(String column, ColumnMetadata ex private void createDictBasedForwardIndex(String column, SegmentDirectory.Writer segmentWriter) throws Exception { - ColumnMetadata existingColMetadata = _segmentDirectory.getSegmentMetadata().getColumnMetadataFor(column); - boolean isSingleValue = existingColMetadata.isSingleValue(); - - File indexDir = _segmentDirectory.getSegmentMetadata().getIndexDir(); - String segmentName = _segmentDirectory.getSegmentMetadata().getName(); + SegmentMetadataImpl segmentMetadata = _segmentDirectory.getSegmentMetadata(); + File indexDir = segmentMetadata.getIndexDir(); + String segmentName = 
segmentMetadata.getName(); File inProgress = new File(indexDir, column + ".dict.inprogress"); File dictionaryFile = new File(indexDir, column + V1Constants.Dict.FILE_EXTENSION); + + ColumnMetadata existingColMetadata = segmentMetadata.getColumnMetadataFor(column); + FieldSpec fieldSpec = existingColMetadata.getFieldSpec(); String fwdIndexFileExtension; - if (isSingleValue) { + if (fieldSpec.isSingleValueField()) { if (existingColMetadata.isSorted()) { fwdIndexFileExtension = V1Constants.Indexes.SORTED_SV_FORWARD_INDEX_FILE_EXTENSION; } else { @@ -825,22 +829,62 @@ private void createDictBasedForwardIndex(String column, SegmentDirectory.Writer FileUtils.deleteQuietly(dictionaryFile); } - LOGGER.info("Creating a new dictionary for segment={} and column={}", segmentName, column); - AbstractColumnStatisticsCollector statsCollector = - getStatsCollector(column, existingColMetadata.getDataType().getStoredType()); - SegmentDictionaryCreator dictionaryCreator = - buildDictionary(column, existingColMetadata, segmentWriter, statsCollector); - LoaderUtils.writeIndexToV3Format(segmentWriter, column, dictionaryFile, StandardIndexes.dictionary()); + AbstractColumnStatisticsCollector statsCollector; + SegmentDictionaryCreator dictionaryCreator; + try (ForwardIndexReader reader = StandardIndexes.forward() + .getReaderFactory() + .createIndexReader(segmentWriter, _fieldIndexConfigs.get(column), existingColMetadata)) { + assert reader != null; + + LOGGER.info("Creating a new dictionary for segment={} and column={}", segmentName, column); + int numDocs = existingColMetadata.getTotalDocs(); + statsCollector = getStatsCollector(column, fieldSpec.getDataType().getStoredType()); + // NOTE: + // Special null handling is not necessary here. This is because, the existing default null value in the raw + // forwardIndex will be retained as such while created the dictionary and dict-based forward index. Also, null + // value vectors maintain a bitmap of docIds. No handling is necessary there. + try (PinotSegmentColumnReader columnReader = new PinotSegmentColumnReader(reader, null, null, + existingColMetadata.getMaxNumberOfMultiValues())) { + for (int i = 0; i < numDocs; i++) { + statsCollector.collect(columnReader.getValue(i)); + } + statsCollector.seal(); + } + DictionaryIndexConfig dictConf = _fieldIndexConfigs.get(column).getConfig(StandardIndexes.dictionary()); + boolean optimizeDictionaryType = _tableConfig.getIndexingConfig().isOptimizeDictionaryType(); + boolean useVarLength = dictConf.getUseVarLengthDictionary() || DictionaryIndexType.shouldUseVarLengthDictionary( + reader.getStoredType(), statsCollector) || (optimizeDictionaryType + && DictionaryIndexType.optimizeTypeShouldUseVarLengthDictionary(reader.getStoredType(), statsCollector)); + dictionaryCreator = new SegmentDictionaryCreator(fieldSpec, segmentMetadata.getIndexDir(), useVarLength); + dictionaryCreator.build(statsCollector.getUniqueValuesSet()); + + LOGGER.info("Built dictionary. 
Rewriting dictionary enabled forward index for segment={} and column={}", + segmentName, column); + ColumnIndexCreationInfo creationInfo = + new ColumnIndexCreationInfo(statsCollector, true, useVarLength, false, fieldSpec.getDefaultNullValue()); + IndexCreationContext context = IndexCreationContext.builder() + .withIndexDir(indexDir) + .withFieldSpec(fieldSpec) + .withColumnIndexCreationInfo(creationInfo) + .withTotalDocs(numDocs) + .withDictionary(true) + .withTableNameWithType(_tableConfig.getTableName()) + .withContinueOnError( + _tableConfig.getIngestionConfig() != null && _tableConfig.getIngestionConfig().isContinueOnError()) + .build(); + ForwardIndexConfig config = _fieldIndexConfigs.get(column).getConfig(StandardIndexes.forward()); + try (ForwardIndexCreator creator = StandardIndexes.forward().createIndexCreator(context, config)) { + forwardIndexRewriteHelper(column, existingColMetadata, reader, creator, numDocs, dictionaryCreator, null); + } + } - LOGGER.info("Built dictionary. Rewriting dictionary enabled forward index for segment={} and column={}", - segmentName, column); - writeDictEnabledForwardIndex(column, existingColMetadata, segmentWriter, indexDir, dictionaryCreator); // We used the existing forward index to generate a new forward index. The existing forward index will be in V3 // format and the new forward index will be in V1 format. Remove the existing forward index as it is not needed // anymore. Note that removeIndex() will only mark an index for removal and remove the in-memory state. The // actual cleanup from columns.psf file will happen when singleFileIndexDirectory.cleanupRemovedIndices() is // called during segmentWriter.close(). segmentWriter.removeIndex(column, StandardIndexes.forward()); + LoaderUtils.writeIndexToV3Format(segmentWriter, column, dictionaryFile, StandardIndexes.dictionary()); LoaderUtils.writeIndexToV3Format(segmentWriter, column, fwdIndexFile, StandardIndexes.forward()); LOGGER.info("Created forwardIndex. Updating metadata properties for segment={} and column={}", segmentName, column); @@ -851,7 +895,10 @@ private void createDictBasedForwardIndex(String column, SegmentDirectory.Writer // If realtime segments were completed when the column was RAW, the cardinality value is populated as Integer // .MIN_VALUE. When dictionary is enabled for this column later, cardinality value should be rightly populated so // that the dictionary can be loaded. - metadataProperties.put(getKeyFor(column, CARDINALITY), String.valueOf(statsCollector.getCardinality())); + int cardinality = statsCollector.getCardinality(); + metadataProperties.put(getKeyFor(column, CARDINALITY), String.valueOf(cardinality)); + metadataProperties.put(getKeyFor(column, BITS_PER_ELEMENT), + String.valueOf(PinotDataBitSet.getNumBitsPerValue(cardinality - 1))); SegmentMetadataUtils.updateMetadataProperties(_segmentDirectory, metadataProperties); // We remove indexes that have to be rewritten when a dictEnabled is toggled. 
Note that the respective index @@ -864,64 +911,6 @@ private void createDictBasedForwardIndex(String column, SegmentDirectory.Writer LOGGER.info("Created dictionary based forward index for segment: {}, column: {}", segmentName, column); } - private SegmentDictionaryCreator buildDictionary(String column, ColumnMetadata existingColMetadata, - SegmentDirectory.Writer segmentWriter, AbstractColumnStatisticsCollector statsCollector) - throws Exception { - int numDocs = existingColMetadata.getTotalDocs(); - - // Get the forward index reader factory and create a reader - IndexReaderFactory readerFactory = StandardIndexes.forward().getReaderFactory(); - try (ForwardIndexReader reader = readerFactory.createIndexReader(segmentWriter, _fieldIndexConfigs.get(column), - existingColMetadata)) { - // Note: Special Null handling is not necessary here. This is because, the existing default null value in the - // raw forwardIndex will be retained as such while created the dictionary and dict-based forward index. Also, - // null value vectors maintain a bitmap of docIds. No handling is necessary there. - PinotSegmentColumnReader columnReader = - new PinotSegmentColumnReader(reader, null, null, existingColMetadata.getMaxNumberOfMultiValues()); - for (int i = 0; i < numDocs; i++) { - Object obj = columnReader.getValue(i); - statsCollector.collect(obj); - } - statsCollector.seal(); - - DictionaryIndexConfig dictConf = _fieldIndexConfigs.get(column).getConfig(StandardIndexes.dictionary()); - - boolean optimizeDictionaryType = _tableConfig.getIndexingConfig().isOptimizeDictionaryType(); - boolean useVarLength = dictConf.getUseVarLengthDictionary() || DictionaryIndexType.shouldUseVarLengthDictionary( - reader.getStoredType(), statsCollector) || (optimizeDictionaryType - && DictionaryIndexType.optimizeTypeShouldUseVarLengthDictionary(reader.getStoredType(), statsCollector)); - SegmentDictionaryCreator dictionaryCreator = new SegmentDictionaryCreator(existingColMetadata.getFieldSpec(), - _segmentDirectory.getSegmentMetadata().getIndexDir(), useVarLength); - - dictionaryCreator.build(statsCollector.getUniqueValuesSet()); - return dictionaryCreator; - } - } - - private void writeDictEnabledForwardIndex(String column, ColumnMetadata existingColMetadata, - SegmentDirectory.Writer segmentWriter, File indexDir, SegmentDictionaryCreator dictionaryCreator) - throws Exception { - // Get the forward index reader factory and create a reader - IndexReaderFactory readerFactory = StandardIndexes.forward().getReaderFactory(); - try (ForwardIndexReader reader = readerFactory.createIndexReader(segmentWriter, _fieldIndexConfigs.get(column), - existingColMetadata)) { - IndexCreationContext.Builder builder = - IndexCreationContext.builder().withIndexDir(indexDir).withColumnMetadata(existingColMetadata) - .withTableNameWithType(_tableConfig.getTableName()) - .withContinueOnError(_tableConfig.getIngestionConfig() != null - && _tableConfig.getIngestionConfig().isContinueOnError()); - // existingColMetadata has dictEnable=false. Overwrite the value. 
- builder.withDictionary(true); - IndexCreationContext context = builder.build(); - ForwardIndexConfig config = _fieldIndexConfigs.get(column).getConfig(StandardIndexes.forward()); - - try (ForwardIndexCreator creator = StandardIndexes.forward().createIndexCreator(context, config)) { - int numDocs = existingColMetadata.getTotalDocs(); - forwardIndexRewriteHelper(column, existingColMetadata, reader, creator, numDocs, dictionaryCreator, null); - } - } - } - static void removeDictRelatedIndexes(String column, SegmentDirectory.Writer segmentWriter) { // TODO: Move this logic as a static function in each index creator. @@ -965,6 +954,8 @@ private void disableDictionaryAndCreateRawForwardIndex(String column, SegmentDir Map metadataProperties = new HashMap<>(); metadataProperties.put(getKeyFor(column, HAS_DICTIONARY), String.valueOf(false)); metadataProperties.put(getKeyFor(column, DICTIONARY_ELEMENT_SIZE), String.valueOf(0)); + // TODO: See https://github.com/apache/pinot/pull/16921 for details + // metadataProperties.put(getKeyFor(column, BITS_PER_ELEMENT), String.valueOf(-1)); SegmentMetadataUtils.updateMetadataProperties(_segmentDirectory, metadataProperties); // Remove range index, inverted index and FST index. @@ -1036,6 +1027,13 @@ private int getMaxRowLength(ColumnMetadata columnMetadata, ForwardIndexReader private AbstractColumnStatisticsCollector getStatsCollector(String column, DataType storedType) { StatsCollectorConfig statsCollectorConfig = new StatsCollectorConfig(_tableConfig, _schema, null); + boolean dictionaryEnabled = hasIndex(column, StandardIndexes.dictionary()); + // MAP collector is optimised for no-dictionary collection + if (!dictionaryEnabled && storedType != DataType.MAP) { + if (_tableConfig.getIndexingConfig().canOptimiseNoDictStatsCollection()) { + return new NoDictColumnStatisticsCollector(column, statsCollectorConfig); + } + } switch (storedType) { case INT: return new IntColumnPreIndexStatsCollector(column, statsCollectorConfig); diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/InvertedIndexAndDictionaryBasedForwardIndexCreator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/InvertedIndexAndDictionaryBasedForwardIndexCreator.java index cf3d4b47e2f1..9e267f90ef40 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/InvertedIndexAndDictionaryBasedForwardIndexCreator.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/InvertedIndexAndDictionaryBasedForwardIndexCreator.java @@ -216,9 +216,11 @@ public void regenerateForwardIndex() // MV columns, in addition to dictionary related metadata, MAX_MULTI_VALUE_ELEMENTS and TOTAL_NUMBER_OF_ENTRIES // may be modified which can be left behind in the modified state even on forward index deletion. LOGGER.info("Created forward index from inverted index and dictionary. 
Updating metadata properties for " - + "segment: {}, column: {}, property list: {}, is temporary: {}", segmentName, _columnName, + + "segment: {}, column: {}, property list: {}, is temporary: {}", segmentName, _columnName, metadataProperties, _isTemporaryForwardIndex); - _segmentMetadata = SegmentMetadataUtils.updateMetadataProperties(_segmentDirectory, metadataProperties); + if (!metadataProperties.isEmpty()) { + _segmentMetadata = SegmentMetadataUtils.updateMetadataProperties(_segmentDirectory, metadataProperties); + } } catch (Exception e) { throw new IOException( String.format("Failed to update metadata properties for segment: %s, column: %s", segmentName, _columnName), @@ -280,11 +282,16 @@ private Map createForwardIndexForSVColumn() writeToForwardIndex(dictionary, context); // Setup and return the metadata properties to update - Map metadataProperties = new HashMap<>(); - metadataProperties.put(getKeyFor(_columnName, HAS_DICTIONARY), String.valueOf(_dictionaryEnabled)); - metadataProperties.put(getKeyFor(_columnName, DICTIONARY_ELEMENT_SIZE), - String.valueOf(_dictionaryEnabled ? _columnMetadata.getColumnMaxLength() : 0)); - return metadataProperties; + if (_dictionaryEnabled) { + return Map.of(); + } else { + return Map.of( + getKeyFor(_columnName, HAS_DICTIONARY), String.valueOf(false), + getKeyFor(_columnName, DICTIONARY_ELEMENT_SIZE), String.valueOf(0) + // TODO: See https://github.com/apache/pinot/pull/16921 for details + // getKeyFor(_columnName, BITS_PER_ELEMENT), String.valueOf(-1) + ); + } } } @@ -368,13 +375,15 @@ private Map createForwardIndexForMVColumn() // Setup and return the metadata properties to update Map metadataProperties = new HashMap<>(); - metadataProperties.put(getKeyFor(_columnName, HAS_DICTIONARY), String.valueOf(_dictionaryEnabled)); - metadataProperties.put(getKeyFor(_columnName, DICTIONARY_ELEMENT_SIZE), - String.valueOf(_dictionaryEnabled ? 
_columnMetadata.getColumnMaxLength() : 0)); metadataProperties.put(getKeyFor(_columnName, MAX_MULTI_VALUE_ELEMENTS), String.valueOf(maxNumberOfMultiValues[0])); - metadataProperties.put(getKeyFor(_columnName, TOTAL_NUMBER_OF_ENTRIES), - String.valueOf(_nextValueId)); + metadataProperties.put(getKeyFor(_columnName, TOTAL_NUMBER_OF_ENTRIES), String.valueOf(_nextValueId)); + if (!_dictionaryEnabled) { + metadataProperties.put(getKeyFor(_columnName, HAS_DICTIONARY), String.valueOf(false)); + metadataProperties.put(getKeyFor(_columnName, DICTIONARY_ELEMENT_SIZE), String.valueOf(0)); + // TODO: See https://github.com/apache/pinot/pull/16921 for details + // metadataProperties.put(getKeyFor(_columnName, BITS_PER_ELEMENT), String.valueOf(-1)); + } return metadataProperties; } } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java index 45bb854516ba..7875dca531a0 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java @@ -41,11 +41,13 @@ import org.apache.pinot.segment.local.segment.creator.impl.fwd.SingleValueSortedForwardIndexCreator; import org.apache.pinot.segment.local.segment.creator.impl.inv.OffHeapBitmapInvertedIndexCreator; import org.apache.pinot.segment.local.segment.creator.impl.nullvalue.NullValueVectorCreator; +import org.apache.pinot.segment.local.segment.creator.impl.stats.AbstractColumnStatisticsCollector; import org.apache.pinot.segment.local.segment.creator.impl.stats.BytesColumnPredIndexStatsCollector; import org.apache.pinot.segment.local.segment.creator.impl.stats.DoubleColumnPreIndexStatsCollector; import org.apache.pinot.segment.local.segment.creator.impl.stats.FloatColumnPreIndexStatsCollector; import org.apache.pinot.segment.local.segment.creator.impl.stats.IntColumnPreIndexStatsCollector; import org.apache.pinot.segment.local.segment.creator.impl.stats.LongColumnPreIndexStatsCollector; +import org.apache.pinot.segment.local.segment.creator.impl.stats.NoDictColumnStatisticsCollector; import org.apache.pinot.segment.local.segment.creator.impl.stats.StringColumnPreIndexStatsCollector; import org.apache.pinot.segment.local.segment.index.dictionary.DictionaryIndexType; import org.apache.pinot.segment.local.segment.index.forward.ForwardIndexCreatorFactory; @@ -668,6 +670,10 @@ private void createDerivedColumnV1Indices(String column, FunctionEvaluator funct fieldIndexConfigs != null ? 
fieldIndexConfigs.getConfig(StandardIndexes.dictionary()) : DictionaryIndexConfig.DEFAULT; boolean createDictionary = dictionaryIndexConfig.isEnabled(); + boolean useNoDictColumnStatsCollector = false; + if (!dictionaryIndexConfig.isEnabled()) { + useNoDictColumnStatsCollector = _tableConfig.getIndexingConfig().canOptimiseNoDictStatsCollection(); + } StatsCollectorConfig statsCollectorConfig = new StatsCollectorConfig(_tableConfig, _schema, null); ColumnIndexCreationInfo indexCreationInfo; boolean isSingleValue = fieldSpec.isSingleValueField(); @@ -677,8 +683,9 @@ private void createDerivedColumnV1Indices(String column, FunctionEvaluator funct outputValues[i] = getIntOutputValue(outputValues[i], isSingleValue, outputValueType, (Integer) fieldSpec.getDefaultNullValue(), createDictionary); } - IntColumnPreIndexStatsCollector statsCollector = - new IntColumnPreIndexStatsCollector(column, statsCollectorConfig); + AbstractColumnStatisticsCollector statsCollector = !useNoDictColumnStatsCollector + ? new IntColumnPreIndexStatsCollector(column, statsCollectorConfig) + : new NoDictColumnStatisticsCollector(column, statsCollectorConfig); for (Object value : outputValues) { statsCollector.collect(value); } @@ -693,8 +700,9 @@ private void createDerivedColumnV1Indices(String column, FunctionEvaluator funct outputValues[i] = getLongOutputValue(outputValues[i], isSingleValue, outputValueType, (Long) fieldSpec.getDefaultNullValue(), createDictionary); } - LongColumnPreIndexStatsCollector statsCollector = - new LongColumnPreIndexStatsCollector(column, statsCollectorConfig); + AbstractColumnStatisticsCollector statsCollector = !useNoDictColumnStatsCollector + ? new LongColumnPreIndexStatsCollector(column, statsCollectorConfig) + : new NoDictColumnStatisticsCollector(column, statsCollectorConfig); for (Object value : outputValues) { statsCollector.collect(value); } @@ -709,8 +717,9 @@ private void createDerivedColumnV1Indices(String column, FunctionEvaluator funct outputValues[i] = getFloatOutputValue(outputValues[i], isSingleValue, outputValueType, (Float) fieldSpec.getDefaultNullValue(), createDictionary); } - FloatColumnPreIndexStatsCollector statsCollector = - new FloatColumnPreIndexStatsCollector(column, statsCollectorConfig); + AbstractColumnStatisticsCollector statsCollector = !useNoDictColumnStatsCollector + ? new FloatColumnPreIndexStatsCollector(column, statsCollectorConfig) + : new NoDictColumnStatisticsCollector(column, statsCollectorConfig); for (Object value : outputValues) { statsCollector.collect(value); } @@ -725,8 +734,9 @@ private void createDerivedColumnV1Indices(String column, FunctionEvaluator funct outputValues[i] = getDoubleOutputValue(outputValues[i], isSingleValue, outputValueType, (Double) fieldSpec.getDefaultNullValue(), createDictionary); } - DoubleColumnPreIndexStatsCollector statsCollector = - new DoubleColumnPreIndexStatsCollector(column, statsCollectorConfig); + AbstractColumnStatisticsCollector statsCollector = !useNoDictColumnStatsCollector + ? 
new DoubleColumnPreIndexStatsCollector(column, statsCollectorConfig) + : new NoDictColumnStatisticsCollector(column, statsCollectorConfig); for (Object value : outputValues) { statsCollector.collect(value); } @@ -747,8 +757,9 @@ private void createDerivedColumnV1Indices(String column, FunctionEvaluator funct outputValues[i] = outputValueType.toBigDecimal(outputValues[i]); } } - DoubleColumnPreIndexStatsCollector statsCollector = - new DoubleColumnPreIndexStatsCollector(column, statsCollectorConfig); + AbstractColumnStatisticsCollector statsCollector = !useNoDictColumnStatsCollector + ? new DoubleColumnPreIndexStatsCollector(column, statsCollectorConfig) + : new NoDictColumnStatisticsCollector(column, statsCollectorConfig); for (Object value : outputValues) { statsCollector.collect(value); } @@ -763,8 +774,9 @@ private void createDerivedColumnV1Indices(String column, FunctionEvaluator funct outputValues[i] = getStringOutputValue(outputValues[i], isSingleValue, outputValueType, (String) fieldSpec.getDefaultNullValue()); } - StringColumnPreIndexStatsCollector statsCollector = - new StringColumnPreIndexStatsCollector(column, statsCollectorConfig); + AbstractColumnStatisticsCollector statsCollector = !useNoDictColumnStatsCollector + ? new StringColumnPreIndexStatsCollector(column, statsCollectorConfig) + : new NoDictColumnStatisticsCollector(column, statsCollectorConfig); for (Object value : outputValues) { statsCollector.collect(value); } @@ -778,8 +790,9 @@ private void createDerivedColumnV1Indices(String column, FunctionEvaluator funct outputValues[i] = getBytesOutputValue(outputValues[i], isSingleValue, outputValueType, (byte[]) fieldSpec.getDefaultNullValue()); } - BytesColumnPredIndexStatsCollector statsCollector = - new BytesColumnPredIndexStatsCollector(column, statsCollectorConfig); + AbstractColumnStatisticsCollector statsCollector = !useNoDictColumnStatsCollector + ? 
new BytesColumnPredIndexStatsCollector(column, statsCollectorConfig) + : new NoDictColumnStatisticsCollector(column, statsCollectorConfig); for (Object value : outputValues) { statsCollector.collect(value); } diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/creator/DictionariesTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/creator/DictionariesTest.java index 8514571826e7..d60bb1c9f9a6 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/creator/DictionariesTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/creator/DictionariesTest.java @@ -69,8 +69,8 @@ import org.apache.pinot.spi.utils.ReadMode; import org.apache.pinot.util.TestUtils; import org.testng.Assert; -import org.testng.annotations.AfterClass; -import org.testng.annotations.BeforeClass; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -83,12 +83,12 @@ public class DictionariesTest implements PinotBuffersAfterMethodCheckRule { private static TableConfig _tableConfig; - @AfterClass + @AfterMethod public static void cleanup() { FileUtils.deleteQuietly(INDEX_DIR); } - @BeforeClass + @BeforeMethod public static void before() throws Exception { final String filePath = diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/creator/impl/stats/MapColumnPreIndexStatsCollectorTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/creator/impl/stats/MapColumnPreIndexStatsCollectorTest.java index f7751eeb20ef..56e6fdcb4963 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/creator/impl/stats/MapColumnPreIndexStatsCollectorTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/creator/impl/stats/MapColumnPreIndexStatsCollectorTest.java @@ -19,9 +19,14 @@ package org.apache.pinot.segment.local.segment.creator.impl.stats; import java.math.BigDecimal; +import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.Set; import org.apache.pinot.segment.spi.creator.StatsCollectorConfig; +import org.apache.pinot.segment.spi.partition.PartitionFunction; +import org.apache.pinot.spi.config.table.ColumnPartitionConfig; +import org.apache.pinot.spi.config.table.SegmentPartitionConfig; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.data.ComplexFieldSpec; import org.apache.pinot.spi.data.DimensionFieldSpec; @@ -35,9 +40,13 @@ public class MapColumnPreIndexStatsCollectorTest { - private static StatsCollectorConfig newConfig() { + private static StatsCollectorConfig newConfig(boolean optimiseNoDictStatsCollection) { TableConfig tableConfig = new TableConfigBuilder(org.apache.pinot.spi.config.table.TableType.OFFLINE) .setTableName("testTable") + .setOptimiseNoDictStatsCollection(optimiseNoDictStatsCollection) + .setSegmentPartitionConfig(new SegmentPartitionConfig( + Collections.singletonMap("col", new ColumnPartitionConfig("murmur", 4)))) + .setNoDictionaryColumns(java.util.List.of("col")) .build(); Map children = new HashMap<>(); @@ -76,7 +85,7 @@ public void testMapCollector() { r3.put("kFloat", 3.5f); r3.put("kBigDec", new BigDecimal("5.25")); - StatsCollectorConfig statsCollectorConfig = newConfig(); + StatsCollectorConfig statsCollectorConfig = newConfig(false); MapColumnPreIndexStatsCollector mapCollector = new 
MapColumnPreIndexStatsCollector("col", statsCollectorConfig); @@ -155,4 +164,88 @@ public void testMapCollector() { assertFalse(keyBigDecStats.isSorted()); assertTrue(keyBigDecStats instanceof BigDecimalColumnPreIndexStatsCollector); } + + @Test + public void testKeyCollectorsUseNoDictWhenEnabledAndMatchOutputs() { + // Prepare mixed-type values across keys + Map r1 = new HashMap<>(); + r1.put("kStr", "alpha"); + r1.put("kInt", 3); + r1.put("kLong", 7L); + r1.put("kFloat", 1.5f); + r1.put("kDouble", 2.25d); + r1.put("kBigDec", new BigDecimal("10.01")); + + Map r2 = new HashMap<>(); + r2.put("kStr", "beta"); + r2.put("kInt", 3); // duplicate for cardinality checks + r2.put("kLong", 2L); + r2.put("kFloat", 1.5f); // duplicate for cardinality checks + r2.put("kDouble", 0.75d); + r2.put("kBigDec", new BigDecimal("10.01")); // duplicate for cardinality checks + + Map r3 = new HashMap<>(); + r3.put("kStr", "alpha"); + r3.put("kInt", 3); + r3.put("kFloat", 3.5f); + r3.put("kBigDec", new BigDecimal("5.25")); + + StatsCollectorConfig cfgNoDict = newConfig(true); + StatsCollectorConfig cfgDict = newConfig(false); + + MapColumnPreIndexStatsCollector mapNoDict = new MapColumnPreIndexStatsCollector("col", cfgNoDict); + MapColumnPreIndexStatsCollector mapDict = new MapColumnPreIndexStatsCollector("col", cfgDict); + + mapNoDict.collect(r1); + mapNoDict.collect(r2); + mapNoDict.collect(r3); + mapNoDict.seal(); + + mapDict.collect(r1); + mapDict.collect(r2); + mapDict.collect(r3); + mapDict.seal(); + + // Compare public outputs on the map collectors + assertEquals(mapNoDict.getCardinality(), mapDict.getCardinality()); + assertEquals(mapNoDict.getMinValue(), mapDict.getMinValue()); + assertEquals(mapNoDict.getMaxValue(), mapDict.getMaxValue()); + assertEquals(mapNoDict.getTotalNumberOfEntries(), mapDict.getTotalNumberOfEntries()); + assertEquals(mapNoDict.getMaxNumberOfMultiValues(), mapDict.getMaxNumberOfMultiValues()); + assertEquals(mapNoDict.isSorted(), mapDict.isSorted()); + assertEquals(mapNoDict.getLengthOfShortestElement(), mapDict.getLengthOfShortestElement()); + assertEquals(mapNoDict.getLengthOfLargestElement(), mapDict.getLengthOfLargestElement()); + assertEquals(mapNoDict.getMaxRowLengthInBytes(), mapDict.getMaxRowLengthInBytes()); + + // Partition metadata + PartitionFunction pfNoDict = mapNoDict.getPartitionFunction(); + PartitionFunction pfDict = mapDict.getPartitionFunction(); + if (pfNoDict == null || pfDict == null) { + assertNull(pfNoDict); + assertNull(pfDict); + } else { + assertEquals(pfNoDict.getName(), pfDict.getName()); + assertEquals(mapNoDict.getNumPartitions(), mapDict.getNumPartitions()); + Set partsNoDict = mapNoDict.getPartitions(); + Set partsDict = mapDict.getPartitions(); + assertEquals(partsNoDict, partsDict); + } + + // Compare per-key collectors exposed via getKeyStatistics + for (String key : mapNoDict.getAllKeyFrequencies().keySet()) { + AbstractColumnStatisticsCollector keyNoDict = mapNoDict.getKeyStatistics(key); + AbstractColumnStatisticsCollector keyDict = mapDict.getKeyStatistics(key); + assertNotNull(keyNoDict, "missing key in no-dict collector: " + key); + assertNotNull(keyDict, "missing key in dict collector: " + key); + + assertEquals(keyNoDict.getCardinality(), keyDict.getCardinality(), "cardinality mismatch for key " + key); + assertEquals(keyNoDict.getMinValue(), keyDict.getMinValue(), "min mismatch for key " + key); + assertEquals(keyNoDict.getMaxValue(), keyDict.getMaxValue(), "max mismatch for key " + key); + 
assertEquals(keyNoDict.getTotalNumberOfEntries(), keyDict.getTotalNumberOfEntries(), + "entries mismatch for key " + key); + assertEquals(keyNoDict.getMaxNumberOfMultiValues(), keyDict.getMaxNumberOfMultiValues(), + "max MV mismatch for key " + key); + assertEquals(keyNoDict.isSorted(), keyDict.isSorted(), "sorted mismatch for key " + key); + } + } } diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/creator/impl/stats/NoDictColumnStatisticsCollectorTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/creator/impl/stats/NoDictColumnStatisticsCollectorTest.java new file mode 100644 index 000000000000..2ccdbe3b3d95 --- /dev/null +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/creator/impl/stats/NoDictColumnStatisticsCollectorTest.java @@ -0,0 +1,379 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.local.segment.creator.impl.stats; + +import java.math.BigDecimal; +import java.util.Collections; +import org.apache.pinot.segment.spi.creator.StatsCollectorConfig; +import org.apache.pinot.spi.config.table.ColumnPartitionConfig; +import org.apache.pinot.spi.config.table.SegmentPartitionConfig; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.data.DimensionFieldSpec; +import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.utils.builder.TableConfigBuilder; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +import static org.testng.Assert.*; + + +public class NoDictColumnStatisticsCollectorTest { + + private static StatsCollectorConfig newConfig(FieldSpec.DataType dataType, boolean isSingleValue) { + TableConfig tableConfig = new TableConfigBuilder(org.apache.pinot.spi.config.table.TableType.OFFLINE) + .setTableName("testTable") + .setSegmentPartitionConfig(new SegmentPartitionConfig( + Collections.singletonMap("col", new ColumnPartitionConfig("murmur", 4)))) + .build(); + Schema schema = new Schema(); + schema.addField(new DimensionFieldSpec("col", dataType, isSingleValue)); + + return new StatsCollectorConfig(tableConfig, schema, tableConfig.getIndexingConfig().getSegmentPartitionConfig()); + } + + @DataProvider(name = "primitiveTypeTestData") + public Object[][] primitiveTypeTestData() { + return new Object[][] { + // Ensure data has exactly 1 duplicate entry and total 4 entries + + // Sorted data + {FieldSpec.DataType.INT, new Object[]{5, 5, 10, 20}, true}, + {FieldSpec.DataType.LONG, new Object[]{1L, 1L, 15L, 25L}, true}, + {FieldSpec.DataType.FLOAT, new Object[]{1.5f, 1.5f, 3.5f, 7.5f}, true}, + {FieldSpec.DataType.DOUBLE, new Object[]{2.5, 2.5, 4.5, 8.5}, true}, + + // Unsorted 
data + {FieldSpec.DataType.INT, new Object[]{10, 5, 20, 5}, false}, + {FieldSpec.DataType.LONG, new Object[]{15L, 1L, 25L, 1L}, false}, + {FieldSpec.DataType.FLOAT, new Object[]{3.5f, 1.5f, 7.5f, 1.5f}, false}, + {FieldSpec.DataType.DOUBLE, new Object[]{4.5, 2.5, 8.5, 2.5}, false} + }; + } + + @DataProvider(name = "stringTypeTestData") + public Object[][] stringTypeTestData() { + return new Object[][] { + // Ensure data has exactly 1 duplicate entry and total 4 entries + // Sorted data + {new String[]{"a", "a", "bbb", "ccc"}, true}, + // Unsorted data + {new String[]{"bbb", "a", "ccc", "a"}, false} + }; + } + + @DataProvider(name = "bytesTypeTestData") + public Object[][] bytesTypeTestData() { + return new Object[][] { + // Ensure data has exactly 1 duplicate entry and total 4 entries + // Sorted data + {new byte[][]{new byte[]{1}, new byte[]{1}, new byte[]{2}, new byte[]{3}}, true}, + // Unsorted data + {new byte[][]{new byte[]{2}, new byte[]{1}, new byte[]{1}, new byte[]{3}}, false} + }; + } + + @DataProvider(name = "bigDecimalTypeTestData") + public Object[][] bigDecimalTypeTestData() { + return new Object[][] { + // Ensure data has exactly 1 duplicate entry and total 4 entries + // Sorted data + {new BigDecimal[]{ + new BigDecimal("1.23"), new BigDecimal("1.23"), new BigDecimal("2.34"), new BigDecimal("9.99")}, true}, + // Unsorted data + {new BigDecimal[]{ + new BigDecimal("2.34"), new BigDecimal("1.23"), new BigDecimal("9.99"), new BigDecimal("1.23")}, false} + }; + } + + @Test(dataProvider = "primitiveTypeTestData") + public void testSVPrimitiveTypes(FieldSpec.DataType dataType, Object[] entries, boolean isSorted) { + NoDictColumnStatisticsCollector c = new NoDictColumnStatisticsCollector("col", + newConfig(dataType, true)); + for (Object entry : entries) { + c.collect(entry); + } + c.seal(); + + AbstractColumnStatisticsCollector expectedStatsCollector = null; + switch (dataType) { + case INT: + expectedStatsCollector = new IntColumnPreIndexStatsCollector("col", newConfig(dataType, true)); + break; + case LONG: + expectedStatsCollector = new LongColumnPreIndexStatsCollector("col", newConfig(dataType, true)); + break; + case FLOAT: + expectedStatsCollector = new FloatColumnPreIndexStatsCollector("col", newConfig(dataType, true)); + break; + case DOUBLE: + expectedStatsCollector = new DoubleColumnPreIndexStatsCollector("col", newConfig(dataType, true)); + break; + default: + throw new IllegalArgumentException("Unsupported data type: " + dataType); + } + for (Object entry : entries) { + expectedStatsCollector.collect(entry); + } + expectedStatsCollector.seal(); + + assertEquals(c.getCardinality(), 3); + assertEquals(c.getMinValue(), expectedStatsCollector.getMinValue()); + assertEquals(c.getMaxValue(), expectedStatsCollector.getMaxValue()); + assertEquals(c.getTotalNumberOfEntries(), 4); + assertEquals(c.getMaxNumberOfMultiValues(), 0); + assertEquals(c.isSorted(), isSorted); + assertEquals(c.getLengthOfShortestElement(), 8); + assertEquals(c.getLengthOfLargestElement(), 8); + assertEquals(c.getMaxRowLengthInBytes(), 8); + assertEquals(c.getPartitions(), expectedStatsCollector.getPartitions()); + } + + @Test(dataProvider = "stringTypeTestData") + public void testSVString(String[] entries, boolean isSorted) { + NoDictColumnStatisticsCollector c = new NoDictColumnStatisticsCollector("col", + newConfig(FieldSpec.DataType.STRING, true)); + for (String e : entries) { + c.collect(e); + } + c.seal(); + + StringColumnPreIndexStatsCollector stringStats = new 
StringColumnPreIndexStatsCollector("col", + newConfig(FieldSpec.DataType.STRING, true)); + for (String e : entries) { + stringStats.collect(e); + } + stringStats.seal(); + + assertEquals(c.getCardinality(), 3); + assertEquals(c.getMinValue(), "a"); + assertEquals(c.getMaxValue(), "ccc"); + assertEquals(c.getTotalNumberOfEntries(), entries.length); + assertEquals(c.getMaxNumberOfMultiValues(), 0); + assertEquals(c.isSorted(), isSorted); + assertEquals(c.getLengthOfShortestElement(), 1); + assertEquals(c.getLengthOfLargestElement(), 3); + assertEquals(c.getMaxRowLengthInBytes(), 3); + assertEquals(c.getPartitions(), stringStats.getPartitions()); + } + + @Test(dataProvider = "bytesTypeTestData") + public void testSVBytes(byte[][] entries, boolean isSorted) { + NoDictColumnStatisticsCollector c = new NoDictColumnStatisticsCollector("col", + newConfig(FieldSpec.DataType.BYTES, true)); + for (byte[] e : entries) { + c.collect(e); + } + c.seal(); + + BytesColumnPredIndexStatsCollector bytesStats = new BytesColumnPredIndexStatsCollector("col", + newConfig(FieldSpec.DataType.BYTES, true)); + for (byte[] e : entries) { + bytesStats.collect(e); + } + bytesStats.seal(); + + assertEquals(c.getCardinality(), bytesStats.getCardinality()); + assertEquals(c.getMinValue(), bytesStats.getMinValue()); + assertEquals(c.getMaxValue(), bytesStats.getMaxValue()); + assertEquals(c.getTotalNumberOfEntries(), entries.length); + assertEquals(c.getMaxNumberOfMultiValues(), 0); + assertEquals(c.isSorted(), isSorted); + assertEquals(c.getLengthOfShortestElement(), bytesStats.getLengthOfShortestElement()); + assertEquals(c.getLengthOfLargestElement(), bytesStats.getLengthOfLargestElement()); + assertEquals(c.getMaxRowLengthInBytes(), bytesStats.getMaxRowLengthInBytes()); + assertEquals(c.getPartitions(), bytesStats.getPartitions()); + } + + @Test(dataProvider = "bigDecimalTypeTestData") + public void testSVBigDecimal(BigDecimal[] entries, boolean isSorted) { + NoDictColumnStatisticsCollector c = new NoDictColumnStatisticsCollector("col", + newConfig(FieldSpec.DataType.BIG_DECIMAL, true)); + for (BigDecimal e : entries) { + c.collect(e); + } + c.seal(); + + BigDecimalColumnPreIndexStatsCollector bigDecimalStats = new BigDecimalColumnPreIndexStatsCollector("col", + newConfig(FieldSpec.DataType.BIG_DECIMAL, true)); + for (BigDecimal e : entries) { + bigDecimalStats.collect(e); + } + bigDecimalStats.seal(); + + assertEquals(c.getCardinality(), 3); + assertEquals(c.getMinValue(), bigDecimalStats.getMinValue()); + assertEquals(c.getMaxValue(), bigDecimalStats.getMaxValue()); + assertEquals(c.getTotalNumberOfEntries(), entries.length); + assertEquals(c.getMaxNumberOfMultiValues(), 0); + assertEquals(c.isSorted(), isSorted); + assertEquals(c.getLengthOfShortestElement(), bigDecimalStats.getLengthOfShortestElement()); + assertEquals(c.getLengthOfLargestElement(), bigDecimalStats.getLengthOfLargestElement()); + assertEquals(c.getMaxRowLengthInBytes(), bigDecimalStats.getMaxRowLengthInBytes()); + assertEquals(c.getPartitions(), bigDecimalStats.getPartitions()); + } + + @DataProvider(name = "primitiveMVTypeTestData") + public Object[][] primitiveMVTypeTestData() { + return new Object[][] { + // Two MV rows with one duplicate across total 4 values -> cardinality 3 + {FieldSpec.DataType.INT, new int[][]{new int[]{5, 10}, new int[]{5, 20}}}, + {FieldSpec.DataType.LONG, new long[][]{new long[]{1L, 15L}, new long[]{1L, 25L}}}, + {FieldSpec.DataType.FLOAT, new float[][]{new float[]{1.5f, 3.5f}, new float[]{1.5f, 7.5f}}}, + 
{FieldSpec.DataType.DOUBLE, new double[][]{new double[]{2.5, 4.5}, new double[]{2.5, 8.5}}} + }; + } + + @Test(dataProvider = "primitiveMVTypeTestData") + public void testMVPrimitiveTypes(FieldSpec.DataType dataType, Object entries) { + // Validate MV behavior for numeric primitives using native arrays + // - isSorted should be false for MV columns + // - lengths are not tracked for native MV primitives (expect -1) + // - maxNumberOfMultiValues should reflect per-row MV length + NoDictColumnStatisticsCollector c; + AbstractColumnStatisticsCollector expectedStatsCollector; + + c = new NoDictColumnStatisticsCollector("col", newConfig(dataType, false)); + + if (entries instanceof int[][]) { + expectedStatsCollector = new IntColumnPreIndexStatsCollector("col", newConfig(dataType, false)); + for (int[] row : (int[][]) entries) { + c.collect(row); + expectedStatsCollector.collect(row); + } + } else if (entries instanceof long[][]) { + expectedStatsCollector = new LongColumnPreIndexStatsCollector("col", newConfig(dataType, false)); + for (long[] row : (long[][]) entries) { + c.collect(row); + expectedStatsCollector.collect(row); + } + } else if (entries instanceof float[][]) { + expectedStatsCollector = new FloatColumnPreIndexStatsCollector("col", newConfig(dataType, false)); + for (float[] row : (float[][]) entries) { + c.collect(row); + expectedStatsCollector.collect(row); + } + } else { + expectedStatsCollector = new DoubleColumnPreIndexStatsCollector("col", newConfig(dataType, false)); + for (double[] row : (double[][]) entries) { + c.collect(row); + expectedStatsCollector.collect(row); + } + } + + c.seal(); + expectedStatsCollector.seal(); + + assertEquals(c.getCardinality(), expectedStatsCollector.getCardinality()); + assertEquals(c.getMinValue(), expectedStatsCollector.getMinValue()); + assertEquals(c.getMaxValue(), expectedStatsCollector.getMaxValue()); + assertEquals(c.getTotalNumberOfEntries(), 4); + assertEquals(c.getMaxNumberOfMultiValues(), 2); + assertFalse(c.isSorted()); + assertEquals(c.getLengthOfShortestElement(), expectedStatsCollector.getLengthOfShortestElement()); + assertEquals(c.getLengthOfLargestElement(), expectedStatsCollector.getLengthOfLargestElement()); + assertEquals(c.getMaxRowLengthInBytes(), expectedStatsCollector.getMaxRowLengthInBytes()); + assertEquals(c.getPartitions(), expectedStatsCollector.getPartitions()); + } + + @DataProvider(name = "stringMVTypeTestData") + public Object[][] stringMVTypeTestData() { + return new Object[][] { + // Two MV rows with one duplicate across total 4 values -> cardinality 3 + {new String[][]{new String[]{"a", "bbb"}, new String[]{"a", "ccc"}}} + }; + } + + @Test(dataProvider = "stringMVTypeTestData") + public void testMVString(String[][] rows) { + NoDictColumnStatisticsCollector c = new NoDictColumnStatisticsCollector("col", + newConfig(FieldSpec.DataType.STRING, false)); + for (String[] r : rows) { + c.collect(r); + } + c.seal(); + + StringColumnPreIndexStatsCollector stringStats = new StringColumnPreIndexStatsCollector("col", + newConfig(FieldSpec.DataType.STRING, false)); + for (String[] r : rows) { + stringStats.collect(r); + } + stringStats.seal(); + + assertEquals(c.getCardinality(), stringStats.getCardinality()); + assertEquals(c.getMinValue(), stringStats.getMinValue()); + assertEquals(c.getMaxValue(), stringStats.getMaxValue()); + assertEquals(c.getTotalNumberOfEntries(), 4); + assertEquals(c.getMaxNumberOfMultiValues(), 2); + assertFalse(c.isSorted()); + assertEquals(c.getLengthOfShortestElement(), 1); + 
assertEquals(c.getLengthOfLargestElement(), 3); + assertEquals(c.getMaxRowLengthInBytes(), 4); + assertEquals(c.getPartitions(), stringStats.getPartitions()); + } + + @DataProvider(name = "bytesMVTypeTestData") + public Object[][] bytesMVTypeTestData() { + return new Object[][] { + // Two MV rows with one duplicate across total 4 values + {new byte[][][]{new byte[][]{new byte[]{1}, new byte[]{2}}, new byte[][]{new byte[]{1}, new byte[]{3}}}} + }; + } + + @Test(dataProvider = "bytesMVTypeTestData") + public void testMVBytes(byte[][][] rows) { + NoDictColumnStatisticsCollector c = new NoDictColumnStatisticsCollector("col", + newConfig(FieldSpec.DataType.BYTES, false)); + for (byte[][] r : rows) { + c.collect(r); + } + c.seal(); + + BytesColumnPredIndexStatsCollector bytesStats = new BytesColumnPredIndexStatsCollector("col", + newConfig(FieldSpec.DataType.BYTES, false)); + for (byte[][] r : rows) { + bytesStats.collect(r); + } + bytesStats.seal(); + + assertEquals(c.getCardinality(), bytesStats.getCardinality()); + assertEquals(c.getMinValue(), bytesStats.getMinValue()); + assertEquals(c.getMaxValue(), bytesStats.getMaxValue()); + assertEquals(c.getTotalNumberOfEntries(), 4); + assertEquals(c.getMaxNumberOfMultiValues(), 2); + assertFalse(c.isSorted()); + assertEquals(c.getLengthOfShortestElement(), bytesStats.getLengthOfShortestElement()); + assertEquals(c.getLengthOfLargestElement(), bytesStats.getLengthOfLargestElement()); + assertEquals(c.getMaxRowLengthInBytes(), bytesStats.getMaxRowLengthInBytes()); + assertEquals(c.getPartitions(), bytesStats.getPartitions()); + } + + @Test + public void testMVBigDecimal() + throws Exception { + NoDictColumnStatisticsCollector c = new NoDictColumnStatisticsCollector("col", + newConfig(FieldSpec.DataType.BIG_DECIMAL, false)); + try { + c.collect(new Object[] {new BigDecimal("1.1"), new BigDecimal("2.2")}); + fail("Expected UnsupportedOperationException"); + } catch (UnsupportedOperationException expected) { + // expected: BigDecimal MV not supported by NoDict collector by design + } + } +} diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/creator/impl/stats/SegmentPreIndexStatsCollectorImplTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/creator/impl/stats/SegmentPreIndexStatsCollectorImplTest.java new file mode 100644 index 000000000000..caf1c986f1a9 --- /dev/null +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/creator/impl/stats/SegmentPreIndexStatsCollectorImplTest.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.segment.local.segment.creator.impl.stats; + +import org.apache.pinot.segment.spi.creator.StatsCollectorConfig; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.utils.builder.TableConfigBuilder; +import org.testng.annotations.Test; + +import static org.testng.Assert.*; + + +public class SegmentPreIndexStatsCollectorImplTest { + private StatsCollectorConfig newConfig(Schema schema, TableConfig tableConfig) { + return new StatsCollectorConfig(tableConfig, schema, null); + } + + @Test + public void testNoDictCollector() { + Schema schema = new Schema.SchemaBuilder().addSingleValueDimension("c1", FieldSpec.DataType.STRING).build(); + TableConfig tableConfig = new TableConfigBuilder(org.apache.pinot.spi.config.table.TableType.OFFLINE) + .setTableName("t").setNoDictionaryColumns(java.util.List.of("c1")).build(); + SegmentPreIndexStatsCollectorImpl impl = new SegmentPreIndexStatsCollectorImpl(newConfig(schema, tableConfig)); + impl.init(); + assertTrue(impl.getColumnProfileFor("c1") instanceof NoDictColumnStatisticsCollector); + } + + @Test + public void testNoDictCollectorDisabled() { + Schema schema = new Schema.SchemaBuilder().addSingleValueDimension("c1", FieldSpec.DataType.STRING).build(); + TableConfig tableConfig = new TableConfigBuilder(org.apache.pinot.spi.config.table.TableType.OFFLINE) + .setTableName("t").setNoDictionaryColumns(java.util.List.of("c1")) + .setOptimiseNoDictStatsCollection(false).build(); + SegmentPreIndexStatsCollectorImpl impl = new SegmentPreIndexStatsCollectorImpl(newConfig(schema, tableConfig)); + impl.init(); + assertTrue(impl.getColumnProfileFor("c1") instanceof StringColumnPreIndexStatsCollector); + } + + @Test + public void testDictCollector() { + Schema schema = new Schema.SchemaBuilder().addSingleValueDimension("c1", FieldSpec.DataType.STRING).build(); + TableConfig tableConfig = new TableConfigBuilder(org.apache.pinot.spi.config.table.TableType.OFFLINE) + .setTableName("t").build(); + SegmentPreIndexStatsCollectorImpl impl = new SegmentPreIndexStatsCollectorImpl(newConfig(schema, tableConfig)); + impl.init(); + assertTrue(impl.getColumnProfileFor("c1") instanceof StringColumnPreIndexStatsCollector); + } +} diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/ColumnMetadataTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/ColumnMetadataTest.java index 2b8a59b5a7e0..83cb02a81a06 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/ColumnMetadataTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/ColumnMetadataTest.java @@ -22,6 +22,7 @@ import java.net.URL; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -46,25 +47,32 @@ import org.apache.pinot.segment.spi.partition.BoundedColumnValuePartitionFunction; import org.apache.pinot.spi.config.table.ColumnPartitionConfig; import org.apache.pinot.spi.config.table.SegmentPartitionConfig; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.data.ComplexFieldSpec; +import org.apache.pinot.spi.data.DateTimeFieldSpec; import org.apache.pinot.spi.data.DimensionFieldSpec; import 
org.apache.pinot.spi.data.FieldSpec; import org.apache.pinot.spi.data.FieldSpec.DataType; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.data.readers.FileFormat; import org.apache.pinot.spi.env.CommonsConfigurationUtils; import org.apache.pinot.spi.utils.TimeUtils; +import org.apache.pinot.spi.utils.builder.TableConfigBuilder; import org.apache.pinot.util.TestUtils; -import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; import static org.apache.pinot.segment.spi.V1Constants.MetadataKeys.Segment.SEGMENT_PADDING_CHARACTER; +import static org.testng.Assert.*; public class ColumnMetadataTest { private static final String AVRO_DATA = "data/test_data-mv.avro"; private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), "ColumnMetadataTest"); private static final String CREATOR_VERSION = "TestHadoopJar.1.1.1"; + private static final String RAW_TABLE_NAME = "testTable"; @BeforeMethod public void setUp() @@ -77,67 +85,102 @@ public void tearDown() { FileUtils.deleteQuietly(INDEX_DIR); } - public SegmentGeneratorConfig createSegmentConfigWithoutCreator() - throws Exception { - final String filePath = - TestUtils.getFileFromResourceUrl(ColumnMetadataTest.class.getClassLoader().getResource(AVRO_DATA)); - // Intentionally changed this to TimeUnit.Hours to make it non-default for testing. + public SegmentGeneratorConfig createSegmentConfigWithoutCreator() { + URL resource = getClass().getClassLoader().getResource(AVRO_DATA); + assertNotNull(resource); + String filePath = TestUtils.getFileFromResourceUrl(resource); + TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME) + .setNoDictionaryColumns(List.of("column4", "column7")) + .build(); + Schema schema = new Schema.SchemaBuilder().setSchemaName(RAW_TABLE_NAME) + .addSingleValueDimension("column3", DataType.STRING) + .addSingleValueDimension("column4", DataType.STRING) + .addMultiValueDimension("column6", DataType.INT) + .addMultiValueDimension("column7", DataType.INT) + .addDateTime("daysSinceEpoch", DataType.INT, "EPOCH|HOURS", "1:HOURS") + .build(); SegmentGeneratorConfig config = - SegmentTestUtils.getSegmentGenSpecWithSchemAndProjectedColumns(new File(filePath), INDEX_DIR, "daysSinceEpoch", - TimeUnit.HOURS, "testTable"); + SegmentTestUtils.getSegmentGeneratorConfig(new File(filePath), FileFormat.AVRO, INDEX_DIR, RAW_TABLE_NAME, + tableConfig, schema); config.setSegmentNamePostfix("1"); return config; } - public SegmentGeneratorConfig createSegmentConfigWithCreator() - throws Exception { + public SegmentGeneratorConfig createSegmentConfigWithCreator() { SegmentGeneratorConfig config = createSegmentConfigWithoutCreator(); config.setCreatorVersion(CREATOR_VERSION); return config; } public void verifySegmentAfterLoading(SegmentMetadata segmentMetadata) { - // Multi-value numeric dimension column. 
- ColumnMetadata col7Meta = segmentMetadata.getColumnMetadataFor("column7"); - Assert.assertEquals(col7Meta.getFieldSpec(), new DimensionFieldSpec("column7", DataType.INT, false)); - Assert.assertEquals(col7Meta.getCardinality(), 359); - Assert.assertEquals(col7Meta.getTotalDocs(), 100000); - Assert.assertEquals(col7Meta.getBitsPerElement(), 9); - Assert.assertEquals(col7Meta.getColumnMaxLength(), 0); - Assert.assertFalse(col7Meta.isSorted()); - Assert.assertTrue(col7Meta.hasDictionary()); - Assert.assertEquals(col7Meta.getMaxNumberOfMultiValues(), 24); - Assert.assertEquals(col7Meta.getTotalNumberOfEntries(), 134090); - Assert.assertFalse(col7Meta.isAutoGenerated()); - - // Single-value string dimension column. + // Single-value dictionary-encoded string dimension column ColumnMetadata col3Meta = segmentMetadata.getColumnMetadataFor("column3"); - Assert.assertEquals(col3Meta.getFieldSpec(), + assertEquals(col3Meta.getFieldSpec(), new DimensionFieldSpec("column3", DataType.STRING, true, FieldSpec.DEFAULT_MAX_LENGTH, null)); - Assert.assertEquals(col3Meta.getCardinality(), 5); - Assert.assertEquals(col3Meta.getTotalDocs(), 100000); - Assert.assertEquals(col3Meta.getBitsPerElement(), 3); - Assert.assertEquals(col3Meta.getColumnMaxLength(), 4); - Assert.assertFalse(col3Meta.isSorted()); - Assert.assertTrue(col3Meta.hasDictionary()); - Assert.assertEquals(col3Meta.getMaxNumberOfMultiValues(), 0); - Assert.assertEquals(col3Meta.getTotalNumberOfEntries(), 100000); - Assert.assertFalse(col3Meta.isAutoGenerated()); - - // Time column. - // FIXME: Currently it is modeled as dimension in the auto-generated schema + assertEquals(col3Meta.getCardinality(), 5); + assertEquals(col3Meta.getTotalDocs(), 100000); + assertEquals(col3Meta.getBitsPerElement(), 3); + assertEquals(col3Meta.getColumnMaxLength(), 4); + assertFalse(col3Meta.isSorted()); + assertTrue(col3Meta.hasDictionary()); + assertEquals(col3Meta.getMaxNumberOfMultiValues(), 0); + assertEquals(col3Meta.getTotalNumberOfEntries(), 100000); + assertFalse(col3Meta.isAutoGenerated()); + + // Single-value raw string dimension column + ColumnMetadata col4Meta = segmentMetadata.getColumnMetadataFor("column4"); + assertEquals(col4Meta.getFieldSpec(), + new DimensionFieldSpec("column4", DataType.STRING, true, FieldSpec.DEFAULT_MAX_LENGTH, null)); + assertEquals(col4Meta.getCardinality(), 5); + assertEquals(col4Meta.getTotalDocs(), 100000); + assertEquals(col4Meta.getBitsPerElement(), 3); + assertEquals(col4Meta.getColumnMaxLength(), 0); + assertFalse(col4Meta.isSorted()); + assertFalse(col4Meta.hasDictionary()); + assertEquals(col4Meta.getMaxNumberOfMultiValues(), 0); + assertEquals(col4Meta.getTotalNumberOfEntries(), 100000); + assertFalse(col4Meta.isAutoGenerated()); + + // Multi-value dictionary-encoded int dimension column + ColumnMetadata col6Meta = segmentMetadata.getColumnMetadataFor("column6"); + assertEquals(col6Meta.getFieldSpec(), new DimensionFieldSpec("column6", DataType.INT, false)); + assertEquals(col6Meta.getCardinality(), 18499); + assertEquals(col6Meta.getTotalDocs(), 100000); + assertEquals(col6Meta.getBitsPerElement(), 15); + assertEquals(col6Meta.getColumnMaxLength(), 0); + assertFalse(col6Meta.isSorted()); + assertTrue(col6Meta.hasDictionary()); + assertEquals(col6Meta.getMaxNumberOfMultiValues(), 13); + assertEquals(col6Meta.getTotalNumberOfEntries(), 106688); + assertFalse(col6Meta.isAutoGenerated()); + + // Multi-value raw int dimension column + ColumnMetadata col7Meta = segmentMetadata.getColumnMetadataFor("column7"); + 
assertEquals(col7Meta.getFieldSpec(), new DimensionFieldSpec("column7", DataType.INT, false)); + assertEquals(col7Meta.getCardinality(), 359); + assertEquals(col7Meta.getTotalDocs(), 100000); + assertEquals(col7Meta.getBitsPerElement(), 9); + assertEquals(col7Meta.getColumnMaxLength(), 0); + assertFalse(col7Meta.isSorted()); + assertFalse(col7Meta.hasDictionary()); + assertEquals(col7Meta.getMaxNumberOfMultiValues(), 24); + assertEquals(col7Meta.getTotalNumberOfEntries(), 134090); + assertFalse(col7Meta.isAutoGenerated()); + + // Date-time column ColumnMetadata timeColumn = segmentMetadata.getColumnMetadataFor("daysSinceEpoch"); - Assert.assertEquals(timeColumn.getFieldSpec(), new DimensionFieldSpec("daysSinceEpoch", DataType.INT, true)); - Assert.assertEquals(timeColumn.getColumnName(), "daysSinceEpoch"); - Assert.assertEquals(timeColumn.getCardinality(), 1); - Assert.assertEquals(timeColumn.getTotalDocs(), 100000); - Assert.assertEquals(timeColumn.getBitsPerElement(), 1); - Assert.assertEquals(timeColumn.getColumnMaxLength(), 0); - Assert.assertTrue(timeColumn.isSorted()); - Assert.assertTrue(timeColumn.hasDictionary()); - Assert.assertEquals(timeColumn.getMaxNumberOfMultiValues(), 0); - Assert.assertEquals(timeColumn.getTotalNumberOfEntries(), 100000); - Assert.assertFalse(timeColumn.isAutoGenerated()); + assertEquals(timeColumn.getFieldSpec(), + new DateTimeFieldSpec("daysSinceEpoch", DataType.INT, "EPOCH|HOURS", "1:HOURS")); + assertEquals(timeColumn.getColumnName(), "daysSinceEpoch"); + assertEquals(timeColumn.getCardinality(), 1); + assertEquals(timeColumn.getTotalDocs(), 100000); + assertEquals(timeColumn.getBitsPerElement(), 1); + assertEquals(timeColumn.getColumnMaxLength(), 0); + assertTrue(timeColumn.isSorted()); + assertTrue(timeColumn.hasDictionary()); + assertEquals(timeColumn.getMaxNumberOfMultiValues(), 0); + assertEquals(timeColumn.getTotalNumberOfEntries(), 100000); + assertFalse(timeColumn.isAutoGenerated()); } @Test @@ -155,7 +198,7 @@ public void testAllFieldsInitialized() // Make sure we got the creator name as well. String creatorName = segmentMetadata.getCreatorName(); - Assert.assertEquals(creatorName, CREATOR_VERSION); + assertEquals(creatorName, CREATOR_VERSION); } @Test @@ -172,7 +215,7 @@ public void testAllFieldsExceptCreatorName() verifySegmentAfterLoading(segmentMetadata); // Make sure we get null for creator name. - Assert.assertNull(segmentMetadata.getCreatorName()); + assertNull(segmentMetadata.getCreatorName()); } @Test @@ -208,15 +251,15 @@ public void testSegmentPartitionedWithBoundedColumnValue() SegmentMetadata segmentMetadata = new SegmentMetadataImpl(INDEX_DIR.listFiles()[0]); verifySegmentAfterLoading(segmentMetadata); // Make sure we get null for creator name. - Assert.assertNull(segmentMetadata.getCreatorName()); + assertNull(segmentMetadata.getCreatorName()); // Verify segment partitioning metadata. 
ColumnMetadata col3Meta = segmentMetadata.getColumnMetadataFor("column3"); - Assert.assertNotNull(col3Meta.getPartitionFunction()); - Assert.assertTrue(col3Meta.getPartitionFunction() instanceof BoundedColumnValuePartitionFunction); - Assert.assertEquals(col3Meta.getPartitionFunction().getNumPartitions(), 4); - Assert.assertEquals(col3Meta.getPartitionFunction().getFunctionConfig(), functionConfig); - Assert.assertEquals(col3Meta.getPartitions(), Stream.of(0, 1, 2, 3).collect(Collectors.toSet())); + assertNotNull(col3Meta.getPartitionFunction()); + assertTrue(col3Meta.getPartitionFunction() instanceof BoundedColumnValuePartitionFunction); + assertEquals(col3Meta.getPartitionFunction().getNumPartitions(), 4); + assertEquals(col3Meta.getPartitionFunction().getFunctionConfig(), functionConfig); + assertEquals(col3Meta.getPartitions(), Stream.of(0, 1, 2, 3).collect(Collectors.toSet())); } @Test @@ -229,16 +272,15 @@ public void testMetadataWithEscapedValue() PropertiesConfiguration propertiesConfiguration = CommonsConfigurationUtils.fromFile(metadataFile); ColumnMetadataImpl installationOutput = ColumnMetadataImpl.fromPropertiesConfiguration("installation_output", propertiesConfiguration); - Assert.assertEquals(installationOutput.getMinValue(), + assertEquals(installationOutput.getMinValue(), "\r\n\r\n utils em::C:\\dir\\utils\r\nPSParentPath : Mi"); } @Test public void testComplexFieldSpec() { - ComplexFieldSpec intMapFieldSpec = new ComplexFieldSpec("intMap", DataType.MAP, true, Map.of( - "key", new DimensionFieldSpec("key", DataType.STRING, true), - "value", new DimensionFieldSpec("value", DataType.INT, true) - )); + ComplexFieldSpec intMapFieldSpec = new ComplexFieldSpec("intMap", DataType.MAP, true, + Map.of("key", new DimensionFieldSpec("key", DataType.STRING, true), "value", + new DimensionFieldSpec("value", DataType.INT, true))); ColumnIndexCreationInfo columnIndexCreationInfo = new ColumnIndexCreationInfo(new DefaultColumnStatistics(null, null, null, false, 1, 1), false, false, false, Map.of()); @@ -247,7 +289,7 @@ public void testComplexFieldSpec() { SegmentColumnarIndexCreator.addColumnMetadataInfo(config, "intMap", columnIndexCreationInfo, 1, intMapFieldSpec, false, -1); ColumnMetadataImpl intMapColumnMetadata = ColumnMetadataImpl.fromPropertiesConfiguration("intMap", config); - Assert.assertEquals(intMapColumnMetadata.getFieldSpec(), intMapFieldSpec); + assertEquals(intMapColumnMetadata.getFieldSpec(), intMapFieldSpec); } @Test @@ -257,18 +299,17 @@ public void testSetAndCheckIndexSizes() { meta.addIndexSize(IndexService.getInstance().getNumericId(StandardIndexes.h3()), 0xffffffffffffL); meta.addIndexSize(IndexService.getInstance().getNumericId(StandardIndexes.vector()), 0); - Assert.assertEquals(meta.getNumIndexes(), 3); - Assert.assertEquals(meta.getIndexSizeFor(StandardIndexes.json()), 12345L); - Assert.assertEquals(meta.getIndexSizeFor(StandardIndexes.h3()), 0xffffffffffffL); - Assert.assertEquals(meta.getIndexSizeFor(StandardIndexes.vector()), 0); - Assert.assertEquals(meta.getIndexSizeFor(StandardIndexes.inverted()), ColumnMetadata.INDEX_NOT_FOUND); + assertEquals(meta.getNumIndexes(), 3); + assertEquals(meta.getIndexSizeFor(StandardIndexes.json()), 12345L); + assertEquals(meta.getIndexSizeFor(StandardIndexes.h3()), 0xffffffffffffL); + assertEquals(meta.getIndexSizeFor(StandardIndexes.vector()), 0); + assertEquals(meta.getIndexSizeFor(StandardIndexes.inverted()), ColumnMetadata.INDEX_NOT_FOUND); try { 
meta.addIndexSize(IndexService.getInstance().getNumericId(StandardIndexes.fst()), -1); - Assert.fail(); + fail(); } catch (IllegalArgumentException e) { - Assert.assertEquals(e.getMessage(), - "Index size should be a non-negative integer value between 0 and 281474976710655"); + assertEquals(e.getMessage(), "Index size should be a non-negative integer value between 0 and 281474976710655"); } } @@ -281,13 +322,14 @@ public void testBadTimeColumnWithoutContinueOnError() config.setTimeColumnName("column4"); SegmentIndexCreationDriver driver = SegmentCreationDriverFactory.get(null); driver.init(config); - Assert.assertThrows(NumberFormatException.class, driver::build); + assertThrows(NumberFormatException.class, driver::build); } @Test public void testBadTimeColumnWithContinueOnError() throws Exception { SegmentGeneratorConfig config = createSegmentConfigWithCreator(); + // column4 is not a time column and should cause an exception to be thrown when the segment is sealed and time // metadata is being parsed and written config.setTimeColumnName("column4"); @@ -295,12 +337,10 @@ public void testBadTimeColumnWithContinueOnError() SegmentIndexCreationDriver driver = SegmentCreationDriverFactory.get(null); driver.init(config); driver.build(); + SegmentMetadata segmentMetadata = new SegmentMetadataImpl(INDEX_DIR.listFiles()[0]); - // The time unit being used is hours since epoch. - long hoursSinceEpoch = System.currentTimeMillis() / TimeUnit.HOURS.toMillis(1); - // Use tolerance of 1 hour to eliminate any flakiness in the test due to time boundaries. - Assert.assertTrue(hoursSinceEpoch - segmentMetadata.getEndTime() <= 1); - Assert.assertEquals(segmentMetadata.getStartTime(), - TimeUnit.MILLISECONDS.toHours(TimeUtils.getValidMinTimeMillis())); + assertEquals(segmentMetadata.getTimeUnit(), TimeUnit.MILLISECONDS); + assertEquals(segmentMetadata.getStartTime(), TimeUtils.getValidMinTimeMillis()); + assertTrue(System.currentTimeMillis() - segmentMetadata.getEndTime() < 60_000L); } } diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandlerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandlerTest.java index c7d7b733aa47..1e626f493a8b 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandlerTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandlerTest.java @@ -298,6 +298,8 @@ public class ForwardIndexHandlerTest { //@formatter:on private static final Random RANDOM = new Random(); + private static int _cardinalityOfSvEntries; + private static int _cardinalityOfMvEntries; private static final List TEST_DATA; @@ -326,21 +328,23 @@ public class ForwardIndexHandlerTest { Long[][] tempMVLongRowsForwardIndexDisabled = new Long[numRows][maxNumberOfMVEntries]; byte[][][] tempMVByteRowsForwardIndexDisabled = new byte[numRows][maxNumberOfMVEntries][]; + _cardinalityOfSvEntries = 1; // Start with 1 to account for static "testRow" entry + _cardinalityOfMvEntries = maxNumberOfMVEntries + 1; // Add 1 to account for static "testRow" entry for (int i = 0; i < numRows; i++) { // Adding a fixed value to check for filter queries if (i % 10 == 0) { String str = "testRow"; tempStringRows[i] = str; - tempIntRows[i] = 1001; - tempLongRows[i] = 1001L; + tempIntRows[i] = numRows + 1; + tempLongRows[i] = (long) (numRows + 1); tempBytesRows[i] = str.getBytes(); - tempBigDecimalRows[i] = 
BigDecimal.valueOf(1001); + tempBigDecimalRows[i] = BigDecimal.valueOf(numRows + 1); // Avoid creating empty arrays. int numMVElements = RANDOM.nextInt(maxNumberOfMVEntries) + 1; for (int j = 0; j < numMVElements; j++) { - tempMVIntRows[i][j] = 1001; - tempMVLongRows[i][j] = 1001L; + tempMVIntRows[i][j] = numRows + 1; + tempMVLongRows[i][j] = (long) (numRows + 1); tempMVStringRows[i][j] = str; tempMVByteRows[i][j] = str.getBytes(); } @@ -351,9 +355,11 @@ public class ForwardIndexHandlerTest { tempLongRows[i] = (long) i; tempBytesRows[i] = str.getBytes(); tempBigDecimalRows[i] = BigDecimal.valueOf(i); + _cardinalityOfSvEntries += 1; // Avoid creating empty arrays. - int numMVElements = RANDOM.nextInt(maxNumberOfMVEntries) + 1; + // To test total cardinality, for at least 1 row, have the number of MV entries = maxNumberOfMVEntries + int numMVElements = (i == 1) ? maxNumberOfMVEntries : (RANDOM.nextInt(maxNumberOfMVEntries) + 1); for (int j = 0; j < numMVElements; j++) { tempMVIntRows[i][j] = j; tempMVLongRows[i][j] = (long) j; @@ -364,7 +370,7 @@ public class ForwardIndexHandlerTest { // Populate data for the MV columns with forward index disabled to have unique entries per row. // Avoid creating empty arrays. - int numMVElements = RANDOM.nextInt(maxNumberOfMVEntries) + 1; + int numMVElements = (i == 1) ? maxNumberOfMVEntries : (RANDOM.nextInt(maxNumberOfMVEntries) + 1); for (int j = 0; j < numMVElements; j++) { String str = "n" + i + j; tempMVIntRowsForwardIndexDisabled[i][j] = j; @@ -1241,7 +1247,11 @@ public void testEnableDictionaryForSingleColumn() } else if (dataType == DataType.BIG_DECIMAL) { dictionaryElementSize = 4; } - validateMetadataProperties(column, true, dictionaryElementSize, metadata.getCardinality(), + int expectedCardinality = column.equals(DIM_MV_PASS_THROUGH_INTEGER) || column.equals(DIM_MV_PASS_THROUGH_LONG) + ? _cardinalityOfMvEntries : _cardinalityOfSvEntries; + // DIM_RAW_SORTED_INTEGER has all unique values + expectedCardinality = column.equals(DIM_RAW_SORTED_INTEGER) ? TEST_DATA.size() : expectedCardinality; + validateMetadataProperties(column, true, dictionaryElementSize, expectedCardinality, metadata.getTotalDocs(), dataType, metadata.getFieldType(), metadata.isSorted(), metadata.isSingleValue(), metadata.getMaxNumberOfMultiValues(), metadata.getTotalNumberOfEntries(), metadata.isAutoGenerated(), metadata.getMinValue(), metadata.getMaxValue(), false); @@ -1283,7 +1293,11 @@ public void testEnableDictionaryForMultipleColumns() } else if (dataType == DataType.BIG_DECIMAL) { dictionaryElementSize = 4; } - validateMetadataProperties(column1, true, dictionaryElementSize, metadata.getCardinality(), metadata.getTotalDocs(), + int expectedCardinality = column1.equals(DIM_MV_PASS_THROUGH_INTEGER) || column1.equals(DIM_MV_PASS_THROUGH_LONG) + ? _cardinalityOfMvEntries : _cardinalityOfSvEntries; + // DIM_RAW_SORTED_INTEGER has all unique values + expectedCardinality = column1.equals(DIM_RAW_SORTED_INTEGER) ?
TEST_DATA.size() : expectedCardinality; + validateMetadataProperties(column1, true, dictionaryElementSize, expectedCardinality, metadata.getTotalDocs(), dataType, metadata.getFieldType(), metadata.isSorted(), metadata.isSingleValue(), metadata.getMaxNumberOfMultiValues(), metadata.getTotalNumberOfEntries(), metadata.isAutoGenerated(), metadata.getMinValue(), metadata.getMaxValue(), false); @@ -1303,7 +1317,11 @@ public void testEnableDictionaryForMultipleColumns() } else if (dataType == DataType.BIG_DECIMAL) { dictionaryElementSize = 4; } - validateMetadataProperties(column2, true, dictionaryElementSize, metadata.getCardinality(), metadata.getTotalDocs(), + expectedCardinality = column2.equals(DIM_MV_PASS_THROUGH_INTEGER) || column2.equals(DIM_MV_PASS_THROUGH_LONG) + ? _cardinalityOfMvEntries : _cardinalityOfSvEntries; + // DIM_RAW_SORTED_INTEGER has all unique values + expectedCardinality = column2.equals(DIM_RAW_SORTED_INTEGER) ? TEST_DATA.size() : expectedCardinality; + validateMetadataProperties(column2, true, dictionaryElementSize, expectedCardinality, metadata.getTotalDocs(), dataType, metadata.getFieldType(), metadata.isSorted(), metadata.isSingleValue(), metadata.getMaxNumberOfMultiValues(), metadata.getTotalNumberOfEntries(), metadata.isAutoGenerated(), metadata.getMinValue(), metadata.getMaxValue(), false); @@ -1498,7 +1516,9 @@ public void testDisableForwardIndexForSingleRawColumn() } else if (dataType == DataType.BIG_DECIMAL) { dictionaryElementSize = 4; } - validateMetadataProperties(column, true, dictionaryElementSize, metadata.getCardinality(), + int expectedCardinality = column.equals(DIM_MV_PASS_THROUGH_INTEGER) || column.equals(DIM_MV_PASS_THROUGH_LONG) + ? _cardinalityOfMvEntries : _cardinalityOfSvEntries; + validateMetadataProperties(column, true, dictionaryElementSize, expectedCardinality, metadata.getTotalDocs(), dataType, metadata.getFieldType(), metadata.isSorted(), metadata.isSingleValue(), metadata.getMaxNumberOfMultiValues(), metadata.getTotalNumberOfEntries(), metadata.isAutoGenerated(), metadata.getMinValue(), metadata.getMaxValue(), false); @@ -1544,7 +1564,9 @@ public void testDisableForwardIndexForMultipleRawColumns() } else if (dataType == DataType.BIG_DECIMAL) { dictionaryElementSize = 4; } - validateMetadataProperties(column1, true, dictionaryElementSize, metadata.getCardinality(), metadata.getTotalDocs(), + int expectedCardinality = column1.equals(DIM_MV_PASS_THROUGH_INTEGER) || column1.equals(DIM_MV_PASS_THROUGH_LONG) + ? _cardinalityOfMvEntries : _cardinalityOfSvEntries; + validateMetadataProperties(column1, true, dictionaryElementSize, expectedCardinality, metadata.getTotalDocs(), dataType, metadata.getFieldType(), metadata.isSorted(), metadata.isSingleValue(), metadata.getMaxNumberOfMultiValues(), metadata.getTotalNumberOfEntries(), metadata.isAutoGenerated(), metadata.getMinValue(), metadata.getMaxValue(), false); @@ -1562,7 +1584,9 @@ public void testDisableForwardIndexForMultipleRawColumns() } else if (dataType == DataType.BIG_DECIMAL) { dictionaryElementSize = 4; } - validateMetadataProperties(column2, true, dictionaryElementSize, metadata.getCardinality(), metadata.getTotalDocs(), + expectedCardinality = column2.equals(DIM_MV_PASS_THROUGH_INTEGER) || column2.equals(DIM_MV_PASS_THROUGH_LONG) + ? 
_cardinalityOfMvEntries : _cardinalityOfSvEntries; + validateMetadataProperties(column2, true, dictionaryElementSize, expectedCardinality, metadata.getTotalDocs(), dataType, metadata.getFieldType(), metadata.isSorted(), metadata.isSingleValue(), metadata.getMaxNumberOfMultiValues(), metadata.getTotalNumberOfEntries(), metadata.isAutoGenerated(), metadata.getMinValue(), metadata.getMaxValue(), false); @@ -1637,7 +1661,9 @@ public void testDisableForwardIndexForInvertedIndexDisabledColumns() } else if (dataType == DataType.BIG_DECIMAL) { dictionaryElementSize = 4; } - validateMetadataProperties(column, true, dictionaryElementSize, metadata.getCardinality(), + int expectedCardinality = column.equals(DIM_MV_PASS_THROUGH_INTEGER) || column.equals(DIM_MV_PASS_THROUGH_LONG) + ? _cardinalityOfMvEntries : _cardinalityOfSvEntries; + validateMetadataProperties(column, true, dictionaryElementSize, expectedCardinality, metadata.getTotalDocs(), dataType, metadata.getFieldType(), metadata.isSorted(), metadata.isSingleValue(), metadata.getMaxNumberOfMultiValues(), metadata.getTotalNumberOfEntries(), metadata.isAutoGenerated(), metadata.getMinValue(), metadata.getMaxValue(), false); diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java index f53f3ba35698..a114577511e4 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java @@ -386,12 +386,14 @@ public void testEnableFSTIndexOnExistingColumnDictEncoded(SegmentVersion segment @Test public void testSimpleEnableDictionarySV() throws Exception { + int approxCardinality = 44319; // derived via ULL in NoDictColumnStatisticsCollector // TEST 1. Check running forwardIndexHandler on a V1 segment. No-op for all existing raw columns. buildV1Segment(); checkForwardIndexCreation(EXISTING_STRING_COL_RAW, 5, 3, _schema, false, false, false, 0, ChunkCompressionType.LZ4, true, 0, DataType.STRING, 100000); - validateIndex(StandardIndexes.forward(), EXISTING_INT_COL_RAW, 42242, 16, false, false, false, 0, true, 0, - ChunkCompressionType.LZ4, false, DataType.INT, 100000); + // since dictionary is disabled, the cardinality will be approximate cardinality. + validateIndex(StandardIndexes.forward(), EXISTING_INT_COL_RAW, approxCardinality, 16, false, false, false, 0, + true, 0, ChunkCompressionType.LZ4, false, DataType.INT, 100000); // Convert the segment to V3. convertV1SegmentToV3(); @@ -410,9 +412,11 @@ public void testSimpleEnableDictionarySV() @Test public void testSimpleEnableDictionaryMV() throws Exception { + int approxCardinality = 19613; // derived via ULL in NoDictColumnStatisticsCollector // TEST 1. Check running forwardIndexHandler on a V1 segment. No-op for all existing raw columns. buildV1Segment(); - checkForwardIndexCreation(EXISTING_INT_COL_RAW_MV, 18499, 15, _schema, false, false, false, 0, + // since dictionary is disabled, the cardinality will be approximate cardinality. + checkForwardIndexCreation(EXISTING_INT_COL_RAW_MV, approxCardinality, 15, _schema, false, false, false, 0, ChunkCompressionType.LZ4, false, 13, DataType.INT, 106688); // Convert the segment to V3. 
@@ -427,6 +431,8 @@ public void testSimpleEnableDictionaryMV()
   @Test
   public void testEnableDictAndOtherIndexesSV()
       throws Exception {
+    int approxCardinality = 44319; // derived via ULL in NoDictColumnStatisticsCollector
+
     // TEST 1: EXISTING_STRING_COL_RAW. Enable dictionary. Also add inverted index and text index. Reload code path
     // will create dictionary, inverted index and text index.
     buildV3Segment();
@@ -465,7 +471,8 @@ public void testEnableDictAndOtherIndexesSV()
     resetIndexConfigs();
     _rangeIndexColumns.add(EXISTING_INT_COL_RAW);
     buildV3Segment();
-    validateIndex(StandardIndexes.range(), EXISTING_INT_COL_RAW, 42242, 16, false, false, false, 0, true, 0,
+    // Since dictionary is disabled, the cardinality will be approximate cardinality.
+    validateIndex(StandardIndexes.range(), EXISTING_INT_COL_RAW, approxCardinality, 16, false, false, false, 0, true, 0,
         ChunkCompressionType.LZ4, false, DataType.INT, 100000);
     long oldRangeIndexSize = new SegmentMetadataImpl(INDEX_DIR).getColumnMetadataFor(EXISTING_INT_COL_RAW)
         .getIndexSizeFor(StandardIndexes.range());
@@ -484,6 +491,8 @@ public void testEnableDictAndOtherIndexesSV()
   @Test
   public void testEnableDictAndOtherIndexesMV()
       throws Exception {
+    int approxCardinality = 19613; // derived via ULL in NoDictColumnStatisticsCollector
+
     // TEST 1: EXISTING_INT_COL_RAW_MV. Enable dictionary for an MV column. Also enable inverted index and range index.
     buildV3Segment();
     _noDictionaryColumns.remove(EXISTING_INT_COL_RAW_MV);
@@ -500,10 +509,11 @@ public void testEnableDictAndOtherIndexesMV()
     resetIndexConfigs();
     _rangeIndexColumns.add(EXISTING_INT_COL_RAW_MV);
     buildV3Segment();
-    validateIndex(StandardIndexes.forward(), EXISTING_INT_COL_RAW_MV, 18499, 15, false, false, false, 0, false, 13,
-        ChunkCompressionType.LZ4, false, DataType.INT, 106688);
-    validateIndex(StandardIndexes.range(), EXISTING_INT_COL_RAW_MV, 18499, 15, false, false, false, 0, false, 13,
-        ChunkCompressionType.LZ4, false, DataType.INT, 106688);
+    // Since dictionary is disabled, the cardinality will be approximate cardinality.
+    validateIndex(StandardIndexes.forward(), EXISTING_INT_COL_RAW_MV, approxCardinality, 15, false, false, false, 0,
+        false, 13, ChunkCompressionType.LZ4, false, DataType.INT, 106688);
+    validateIndex(StandardIndexes.range(), EXISTING_INT_COL_RAW_MV, approxCardinality, 15, false, false, false, 0,
+        false, 13, ChunkCompressionType.LZ4, false, DataType.INT, 106688);
 
     // Enable dictionary.
     _noDictionaryColumns.remove(EXISTING_INT_COL_RAW_MV);
@@ -623,6 +633,8 @@ public void testDisableDictAndOtherIndexesMV()
   @Test
   public void testForwardIndexHandlerChangeCompression()
       throws Exception {
+    int approximateCardinality = 19613; // derived via ULL in NoDictColumnStatisticsCollector
+
     // Test1: Rewriting forward index will be a no-op for v1 segments. Default LZ4 compressionType will be retained.
     buildV1Segment();
     _fieldConfigMap.put(EXISTING_STRING_COL_RAW,
@@ -665,7 +677,8 @@ public void testForwardIndexHandlerChangeCompression()
     _fieldConfigMap.put(EXISTING_INT_COL_RAW_MV,
         new FieldConfig(EXISTING_INT_COL_RAW_MV, FieldConfig.EncodingType.RAW, List.of(), CompressionCodec.ZSTANDARD,
            null));
-    checkForwardIndexCreation(EXISTING_INT_COL_RAW_MV, 18499, 15, _schema, false, false, false, 0,
+    // Since dictionary is disabled, the cardinality will be approximate cardinality.
+    checkForwardIndexCreation(EXISTING_INT_COL_RAW_MV, approximateCardinality, 15, _schema, false, false, false, 0,
         ChunkCompressionType.ZSTANDARD, false, 13, DataType.INT, 106688);
   }
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/ColumnIndexCreationInfo.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/ColumnIndexCreationInfo.java
index 9e47502098ef..5bd2e4db73dc 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/ColumnIndexCreationInfo.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/ColumnIndexCreationInfo.java
@@ -23,6 +23,7 @@
 import java.util.Set;
 import javax.annotation.Nullable;
 import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.NotImplementedException;
 import org.apache.pinot.segment.spi.Constants;
 import org.apache.pinot.segment.spi.partition.PartitionFunction;
@@ -64,11 +65,23 @@ public Object getMax() {
   }
 
   public Object getSortedUniqueElementsArray() {
-    return _columnStatistics.getUniqueValuesSet();
+    try {
+      return _columnStatistics.getUniqueValuesSet();
+    } catch (NotImplementedException e) {
+      return null;
+    }
   }
 
   public int getDistinctValueCount() {
-    Object uniqueValArray = _columnStatistics.getUniqueValuesSet();
+    Object uniqueValArray;
+    try {
+      uniqueValArray = _columnStatistics.getUniqueValuesSet();
+    } catch (NotImplementedException e) {
+      // For no-dictionary columns, we don't retain unique values in collectors to save memory.
+      // Fall back to the collectors' cardinality (tracked as total entries) so downstream components retain
+      // a non-negative effective cardinality for optimizations like scan-based AND reordering.
+      return _columnStatistics.getCardinality();
+    }
     if (uniqueValArray == null) {
       return Constants.UNKNOWN_CARDINALITY;
     }
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfig.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfig.java
index eafc92484a8f..706d1ce360ec 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfig.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfig.java
@@ -150,11 +150,10 @@ public SegmentGeneratorConfig(TableConfig tableConfig, Schema schema, boolean cr
     // NOTE: SegmentGeneratorConfig#setSchema doesn't set the time column anymore. timeColumnName is expected to be
     //       read from table config.
-    String timeColumnName = null;
-    if (tableConfig.getValidationConfig() != null) {
-      timeColumnName = tableConfig.getValidationConfig().getTimeColumnName();
+    String timeColumnName = tableConfig.getValidationConfig().getTimeColumnName();
+    if (timeColumnName != null) {
+      setTime(timeColumnName, schema);
     }
-    setTime(timeColumnName, schema);
 
     IndexingConfig indexingConfig = tableConfig.getIndexingConfig();
     String segmentVersion = indexingConfig.getSegmentFormatVersion();
@@ -229,14 +228,12 @@ public Map<String, Map<String, String>> getColumnProperties() {
   /**
    * Set time column details using the given time column
    */
-  private void setTime(@Nullable String timeColumnName, Schema schema) {
-    if (timeColumnName != null) {
-      DateTimeFieldSpec dateTimeFieldSpec = schema.getSpecForTimeColumn(timeColumnName);
-      if (dateTimeFieldSpec != null) {
-        _segmentTimeColumnDataType = dateTimeFieldSpec.getDataType();
-        setTimeColumnName(dateTimeFieldSpec.getName());
-        setDateTimeFormatSpec(dateTimeFieldSpec.getFormatSpec());
-      }
+  private void setTime(String timeColumnName, Schema schema) {
+    DateTimeFieldSpec dateTimeFieldSpec = schema.getSpecForTimeColumn(timeColumnName);
+    if (dateTimeFieldSpec != null) {
+      _segmentTimeColumnDataType = dateTimeFieldSpec.getDataType();
+      setTimeColumnName(dateTimeFieldSpec.getName());
+      setDateTimeFormatSpec(dateTimeFieldSpec.getFormatSpec());
+    }
   }
 
@@ -260,6 +257,7 @@ public void setDateTimeFormatSpec(DateTimeFormatSpec formatSpec) {
     }
   }
 
+  @Nullable
   public DateTimeFormatSpec getDateTimeFormatSpec() {
     return _dateTimeFormatSpec;
   }
@@ -387,6 +385,7 @@ public void setSequenceId(int sequenceId) {
     _sequenceId = sequenceId;
   }
 
+  @Nullable
   public TimeUnit getSegmentTimeUnit() {
     return _segmentTimeUnit;
   }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/IndexingConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/IndexingConfig.java
index 85b224c205b9..de4ef87b8499 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/IndexingConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/IndexingConfig.java
@@ -65,6 +65,9 @@ public class IndexingConfig extends BaseJsonConfig {
   private boolean _nullHandlingEnabled;
   private boolean _columnMajorSegmentBuilderEnabled = true;
   private boolean _skipSegmentPreprocess;
+  // Use NoDictColumnStatisticsCollector for stats collection for no-dictionary columns
+  // We can deprecate this config once we are confident about the stability of NoDictColumnStatisticsCollector
+  private boolean _optimiseNoDictStatsCollection = true;
 
   /**
    * If `optimizeDictionary` enabled, dictionary is not created for the high-cardinality
@@ -354,6 +357,14 @@ public void setSkipSegmentPreprocess(boolean skipSegmentPreprocess) {
     _skipSegmentPreprocess = skipSegmentPreprocess;
   }
 
+  public boolean canOptimiseNoDictStatsCollection() {
+    return _optimiseNoDictStatsCollection;
+  }
+
+  public void setOptimiseNoDictStatsCollection(boolean optimiseNoDictStatsCollection) {
+    _optimiseNoDictStatsCollection = optimiseNoDictStatsCollection;
+  }
+
   public boolean isOptimizeDictionary() {
     return _optimizeDictionary;
   }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/DateTimeFieldSpec.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/DateTimeFieldSpec.java
index dbb92090d177..83f5ef67c555 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/DateTimeFieldSpec.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/DateTimeFieldSpec.java
@@ -191,7 +191,7 @@ public ObjectNode toJsonObject() {
 
   @Override
   public String toString() {
-    return "< field type: DATE_TIME, field name: " + _name + ", datatype: " + _dataType + ", time column format: "
+    return "< field type: DATE_TIME, field name: " + _name + ", data type: " + _dataType + ", time column format: "
         + _format + ", time field granularity: " + _granularity + " >";
   }
 
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/TableConfigBuilder.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/TableConfigBuilder.java
index e5388131f860..0b6e6cce6f31 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/TableConfigBuilder.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/TableConfigBuilder.java
@@ -107,6 +107,7 @@ public class TableConfigBuilder {
   private boolean _nullHandlingEnabled;
   private boolean _columnMajorSegmentBuilderEnabled = true;
   private boolean _skipSegmentPreprocess;
+  private boolean _optimiseNoDictStatsCollection = true;
   private List<String> _varLengthDictionaryColumns;
   private List<StarTreeIndexConfig> _starTreeIndexConfigs;
   private List<String> _jsonIndexColumns;
@@ -381,6 +382,11 @@ public TableConfigBuilder setSkipSegmentPreprocess(boolean skipSegmentPreprocess
     return this;
   }
 
+  public TableConfigBuilder setOptimiseNoDictStatsCollection(boolean optimiseNoDictStatsCollection) {
+    _optimiseNoDictStatsCollection = optimiseNoDictStatsCollection;
+    return this;
+  }
+
   public TableConfigBuilder setCustomConfig(TableCustomConfig customConfig) {
     _customConfig = customConfig;
     return this;
@@ -511,6 +517,7 @@ public TableConfig build() {
     indexingConfig.setNullHandlingEnabled(_nullHandlingEnabled);
     indexingConfig.setColumnMajorSegmentBuilderEnabled(_columnMajorSegmentBuilderEnabled);
     indexingConfig.setSkipSegmentPreprocess(_skipSegmentPreprocess);
+    indexingConfig.setOptimiseNoDictStatsCollection(_optimiseNoDictStatsCollection);
     indexingConfig.setVarLengthDictionaryColumns(_varLengthDictionaryColumns);
     indexingConfig.setStarTreeIndexConfigs(_starTreeIndexConfigs);
     indexingConfig.setMultiColumnTextIndexConfig(_multiColumnTextIndexConfig);
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/DictionaryToRawIndexConverter.java b/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/DictionaryToRawIndexConverter.java
index 065bd27d85fa..a9406a012e23 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/DictionaryToRawIndexConverter.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/DictionaryToRawIndexConverter.java
@@ -34,6 +34,9 @@
 import org.apache.pinot.segment.local.segment.index.forward.ForwardIndexCreatorFactory;
 import org.apache.pinot.segment.spi.IndexSegment;
 import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.segment.spi.V1Constants.MetadataKeys;
+import org.apache.pinot.segment.spi.V1Constants.MetadataKeys.Column;
+import org.apache.pinot.segment.spi.V1Constants.MetadataKeys.Segment;
 import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
 import org.apache.pinot.segment.spi.datasource.DataSource;
 import org.apache.pinot.segment.spi.datasource.DataSourceMetadata;
@@ -252,20 +255,19 @@ public boolean convertSegment(File segmentDir, String[] columns, File outputDir,
    */
   private void updateMetadata(File segmentDir, String[] columns, String tableName)
       throws ConfigurationException {
-    File metadataFile = new File(segmentDir, V1Constants.MetadataKeys.METADATA_FILE_NAME);
+    File metadataFile = new File(segmentDir, MetadataKeys.METADATA_FILE_NAME);
     PropertiesConfiguration properties = CommonsConfigurationUtils.fromFile(metadataFile);
 
     if (tableName != null) {
-      properties
-          .setProperty(V1Constants.MetadataKeys.Segment.TABLE_NAME, TableNameBuilder.extractRawTableName(tableName));
+      properties.setProperty(Segment.TABLE_NAME, TableNameBuilder.extractRawTableName(tableName));
     }
 
     for (String column : columns) {
-      properties.setProperty(
-          V1Constants.MetadataKeys.Column.getKeyFor(column, V1Constants.MetadataKeys.Column.HAS_DICTIONARY), false);
-      properties.setProperty(
-          V1Constants.MetadataKeys.Column.getKeyFor(column, V1Constants.MetadataKeys.Column.BITS_PER_ELEMENT), -1);
+      properties.setProperty(Column.getKeyFor(column, Column.HAS_DICTIONARY), false);
+      properties.setProperty(Column.getKeyFor(column, Column.DICTIONARY_ELEMENT_SIZE), 0);
+      properties.setProperty(Column.getKeyFor(column, Column.BITS_PER_ELEMENT), -1);
     }
+
     CommonsConfigurationUtils.saveToFile(properties, metadataFile);
   }
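
Reviewer note (not part of the patch): a minimal sketch of how the optimiseNoDictStatsCollection flag introduced in this change could be toggled through the new TableConfigBuilder setter and read back from IndexingConfig. The table name and no-dictionary column names below are hypothetical; only setOptimiseNoDictStatsCollection / canOptimiseNoDictStatsCollection and the default of true come from this patch.

    import java.util.List;
    import org.apache.pinot.spi.config.table.TableConfig;
    import org.apache.pinot.spi.config.table.TableType;
    import org.apache.pinot.spi.utils.builder.TableConfigBuilder;

    public class NoDictStatsCollectionExample {
      public static void main(String[] args) {
        // "myTable" and the column names are placeholders for illustration only.
        TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE)
            .setTableName("myTable")
            .setNoDictionaryColumns(List.of("colInt", "colIntMV"))
            // Opt out of the ULL-based approximate stats collection for no-dictionary columns;
            // when the setter is not called, the flag stays at its default of true.
            .setOptimiseNoDictStatsCollection(false)
            .build();
        // The flag is carried on IndexingConfig, mirroring how _skipSegmentPreprocess is plumbed through.
        System.out.println(tableConfig.getIndexingConfig().canOptimiseNoDictStatsCollection()); // prints false
      }
    }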