diff --git a/pinot-plugins/pinot-input-format/pinot-arrow/pom.xml b/pinot-plugins/pinot-input-format/pinot-arrow/pom.xml new file mode 100644 index 000000000000..746c69b49b8a --- /dev/null +++ b/pinot-plugins/pinot-input-format/pinot-arrow/pom.xml @@ -0,0 +1,74 @@ + + + + 4.0.0 + + pinot-input-format + org.apache.pinot + 1.5.0-SNAPSHOT + + + pinot-arrow + Pinot Arrow + https://pinot.apache.org/ + + ${basedir}/../../.. + package + + + + + org.apache.arrow + arrow-compression + ${arrow.version} + + + org.apache.arrow + arrow-format + ${arrow.version} + + + org.apache.arrow + arrow-memory-core + ${arrow.version} + + + org.apache.arrow + arrow-memory-netty + ${arrow.version} + + + org.apache.arrow + arrow-vector + ${arrow.version} + + + + + + pinot-fastdev + + none + + + + diff --git a/pinot-plugins/pinot-input-format/pinot-arrow/src/main/java/org/apache/pinot/plugin/inputformat/arrow/ArrowMessageDecoder.java b/pinot-plugins/pinot-input-format/pinot-arrow/src/main/java/org/apache/pinot/plugin/inputformat/arrow/ArrowMessageDecoder.java new file mode 100644 index 000000000000..297e04638299 --- /dev/null +++ b/pinot-plugins/pinot-input-format/pinot-arrow/src/main/java/org/apache/pinot/plugin/inputformat/arrow/ArrowMessageDecoder.java @@ -0,0 +1,114 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.plugin.inputformat.arrow; + + +import java.io.ByteArrayInputStream; +import java.nio.channels.Channels; +import java.nio.channels.ReadableByteChannel; +import java.util.Arrays; +import java.util.Map; +import java.util.Set; +import javax.annotation.Nullable; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.ipc.ArrowStreamReader; +import org.apache.pinot.spi.data.readers.GenericRow; +import org.apache.pinot.spi.stream.StreamMessageDecoder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * ArrowMessageDecoder is used to decode Apache Arrow IPC format messages into Pinot GenericRow. + * This decoder handles Arrow streaming format and converts Arrow data to Pinot's columnar format. + */ +public class ArrowMessageDecoder implements StreamMessageDecoder { + public static final String ARROW_ALLOCATOR_LIMIT = "arrow.allocator.limit"; + public static final String DEFAULT_ALLOCATOR_LIMIT = "268435456"; // 256MB default + + private static final Logger logger = LoggerFactory.getLogger(ArrowMessageDecoder.class); + + private String _streamTopicName; + private RootAllocator _allocator; + private ArrowToGenericRowConverter _converter; + + @Override + public void init(Map props, Set fieldsToRead, String topicName) + throws Exception { + _streamTopicName = topicName; + + // Initialize Arrow allocator with configurable memory limit + long allocatorLimit = + Long.parseLong(props.getOrDefault(ARROW_ALLOCATOR_LIMIT, DEFAULT_ALLOCATOR_LIMIT)); + _allocator = new RootAllocator(allocatorLimit); + + // Initialize Arrow to GenericRow converter (processes all fields) + _converter = new ArrowToGenericRowConverter(); + + logger.info( + "Initialized ArrowMessageDecoder for topic: {} with allocator limit: {} bytes", + topicName, + allocatorLimit); + } + + @Nullable + @Override + public GenericRow decode(byte[] payload, GenericRow destination) { + try (ByteArrayInputStream inputStream = new ByteArrayInputStream(payload); + ReadableByteChannel channel = Channels.newChannel(inputStream); + ArrowStreamReader reader = new ArrowStreamReader(channel, _allocator)) { + + // Read the Arrow schema and data + VectorSchemaRoot root = reader.getVectorSchemaRoot(); + if (!reader.loadNextBatch()) { + logger.warn("No data found in Arrow message for topic: {}", _streamTopicName); + return null; + } + + // Convert Arrow data to GenericRow using converter + GenericRow row = _converter.convert(reader, root, destination); + + return row; + } catch (Exception e) { + logger.error( + "Error decoding Arrow message for stream topic {} : {}", + _streamTopicName, + Arrays.toString(payload), + e); + return null; + } + } + + @Nullable + @Override + public GenericRow decode(byte[] payload, int offset, int length, GenericRow destination) { + return decode(Arrays.copyOfRange(payload, offset, offset + length), destination); + } + + /** Clean up resources */ + public void close() { + if (_allocator != null) { + try { + _allocator.close(); + } catch (Exception e) { + logger.warn("Error closing Arrow allocator", e); + } + } + } +} diff --git a/pinot-plugins/pinot-input-format/pinot-arrow/src/main/java/org/apache/pinot/plugin/inputformat/arrow/ArrowToGenericRowConverter.java b/pinot-plugins/pinot-input-format/pinot-arrow/src/main/java/org/apache/pinot/plugin/inputformat/arrow/ArrowToGenericRowConverter.java new file mode 100644 index 000000000000..2cdafc494786 --- /dev/null +++ b/pinot-plugins/pinot-input-format/pinot-arrow/src/main/java/org/apache/pinot/plugin/inputformat/arrow/ArrowToGenericRowConverter.java @@ -0,0 +1,226 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.plugin.inputformat.arrow; + +import java.sql.Timestamp; +import java.time.LocalDateTime; +import java.time.ZoneOffset; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import javax.annotation.Nullable; +import org.apache.arrow.vector.FieldVector; +import org.apache.arrow.vector.ValueVector; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.complex.MapVector; +import org.apache.arrow.vector.dictionary.DictionaryEncoder; +import org.apache.arrow.vector.ipc.ArrowStreamReader; +import org.apache.arrow.vector.util.Text; +import org.apache.pinot.spi.data.readers.GenericRow; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Utility class for converting Apache Arrow VectorSchemaRoot to Pinot {@code GenericRow}. Processes + * all fields and handles multiple rows from Arrow batch. + */ +public class ArrowToGenericRowConverter { + private static final Logger logger = LoggerFactory.getLogger(ArrowToGenericRowConverter.class); + + /** Default constructor that processes all fields from Arrow batch. */ + public ArrowToGenericRowConverter() { + logger.debug("ArrowToGenericRowConverter created for processing all fields"); + } + + /** + * Converts an Arrow VectorSchemaRoot to a Pinot {@code GenericRow}. Processes ALL rows from the + * Arrow batch and stores them as a list using MULTIPLE_RECORDS_KEY. + * + * @param reader ArrowStreamReader containing the data + * @param root Arrow VectorSchemaRoot containing the data + * @param destination Optional destination {@code GenericRow}, will create new if null + * @return {@code GenericRow} containing {@code List} with all converted rows, or null + * if no data available + */ + @Nullable + public GenericRow convert( + ArrowStreamReader reader, VectorSchemaRoot root, GenericRow destination) { + if (root == null) { + logger.warn("Cannot convert null VectorSchemaRoot"); + return null; + } + + if (destination == null) { + destination = new GenericRow(); + } + + int rowCount = root.getRowCount(); + if (rowCount == 0) { + logger.warn("No rows found in Arrow data"); + return destination; + } + + List rows = new ArrayList<>(rowCount); + + // Process all rows from the Arrow batch + for (int rowIndex = 0; rowIndex < rowCount; rowIndex++) { + GenericRow row = convertSingleRow(reader, root, rowIndex); + if (row != null) { + rows.add(row); + } + } + + if (!rows.isEmpty()) { + // Use Pinot's MULTIPLE_RECORDS_KEY to store the list of rows + destination.putValue(GenericRow.MULTIPLE_RECORDS_KEY, rows); + logger.debug("Converted {} rows from Arrow batch", rows.size()); + } + + return destination; + } + + /** + * Converts a single row from Arrow VectorSchemaRoot. + * + * @param reader ArrowStreamReader containing the data + * @param root Arrow VectorSchemaRoot containing the data + * @param rowIndex Index of the row to convert (0-based) + * @return {@code GenericRow} with converted data, or null if row index is invalid + */ + @Nullable + private GenericRow convertSingleRow( + ArrowStreamReader reader, VectorSchemaRoot root, int rowIndex) { + GenericRow row = new GenericRow(); + int convertedFields = 0; + + // Process all fields in the Arrow schema + for (int i = 0; i < root.getFieldVectors().size(); i++) { + Object value; + + FieldVector fieldVector = root.getFieldVectors().get(i); + String fieldName = fieldVector.getField().getName(); + try { + if (fieldVector.getField().getDictionary() != null) { + long dictionaryId = fieldVector.getField().getDictionary().getId(); + try (ValueVector realFieldVector = + DictionaryEncoder.decode( + fieldVector, reader.getDictionaryVectors().get(dictionaryId))) { + value = realFieldVector.getObject(rowIndex); + } + } else { + value = fieldVector.getObject(rowIndex); + } + if (value != null) { + // Convert Arrow-specific types to Pinot-compatible types + Object pinotCompatibleValue = convertArrowTypeToPinotCompatible(value); + row.putValue(fieldName, pinotCompatibleValue); + convertedFields++; + } + } catch (Exception e) { + logger.error("Error extracting value for field: {} at row {}", fieldName, rowIndex, e); + } + } + + logger.debug("Converted {} fields from Arrow row {} to GenericRow", convertedFields, rowIndex); + return row; + } + + /** + * Converts Arrow-specific data types to Pinot-compatible types. This method handles the + * incompatibility issues between Arrow's native data types and what Pinot expects. + * + * @param value The raw value from Arrow fieldVector.getObject() + * @return A Pinot-compatible version of the value + */ + @Nullable + private Object convertArrowTypeToPinotCompatible(@Nullable Object value) { + if (value == null) { + return null; + } + + // Handle nested List and Map values, including Arrow MapVector's representation + if (value instanceof List) { + List originalList = (List) value; + if (!originalList.isEmpty()) { + boolean looksLikeMapEntries = true; + boolean sawNonNull = false; + for (Object entryObj : originalList) { + if (entryObj == null) { + continue; + } + sawNonNull = true; + if (!(entryObj instanceof Map)) { + looksLikeMapEntries = false; + break; + } + @SuppressWarnings("unchecked") + Map entryMap = (Map) entryObj; + if (!entryMap.containsKey(MapVector.KEY_NAME)) { + looksLikeMapEntries = false; + break; + } + } + if (looksLikeMapEntries && sawNonNull) { + Map flattened = new LinkedHashMap<>(originalList.size()); + for (Object entryObj : originalList) { + if (entryObj == null) { + continue; + } + @SuppressWarnings("unchecked") + Map entryMap = (Map) entryObj; + Object rawKey = entryMap.get(MapVector.KEY_NAME); + Object rawVal = entryMap.get(MapVector.VALUE_NAME); + Object convertedKey = convertArrowTypeToPinotCompatible(rawKey); + Object convertedVal = convertArrowTypeToPinotCompatible(rawVal); + flattened.put(String.valueOf(convertedKey), convertedVal); + } + return flattened; + } + } + + List convertedList = new ArrayList<>(originalList.size()); + for (Object element : originalList) { + convertedList.add(convertArrowTypeToPinotCompatible(element)); + } + return convertedList; + } + + // Handle Arrow Text type -> String conversion + if (value instanceof Text) { + // Arrow VarCharVector.getObject() returns Text objects, but Pinot expects String + return value.toString(); + } + + // Handle Arrow LocalDateTime -> java.sql.Timestamp conversion + if (value instanceof LocalDateTime) { + // Arrow TimeStampMilliVector.getObject() returns LocalDateTime, but Pinot expects + // java.sql.Timestamp objects for proper timestamp handling and native support + LocalDateTime dateTime = (LocalDateTime) value; + return Timestamp.from(dateTime.toInstant(ZoneOffset.UTC)); + } + + // Handle other potential Arrow-specific types that might cause issues + + // For primitive types (Integer, Double, Boolean) and other Java standard types, + // Arrow returns standard Java objects that are already Pinot-compatible + return value; + } +} diff --git a/pinot-plugins/pinot-input-format/pinot-arrow/src/test/java/org/apache/pinot/plugin/inputformat/arrow/ArrowMessageDecoderTest.java b/pinot-plugins/pinot-input-format/pinot-arrow/src/test/java/org/apache/pinot/plugin/inputformat/arrow/ArrowMessageDecoderTest.java new file mode 100644 index 000000000000..a6b8adc9ca9c --- /dev/null +++ b/pinot-plugins/pinot-input-format/pinot-arrow/src/test/java/org/apache/pinot/plugin/inputformat/arrow/ArrowMessageDecoderTest.java @@ -0,0 +1,762 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.plugin.inputformat.arrow; + +import com.google.common.collect.Sets; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.pinot.plugin.inputformat.arrow.util.ArrowTestDataUtil; +import org.apache.pinot.spi.data.readers.GenericRow; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.annotations.Test; + +import static org.testng.Assert.*; + + +public class ArrowMessageDecoderTest { + private static final Logger LOGGER = LoggerFactory.getLogger(ArrowMessageDecoderTest.class); + + @Test + public void testArrowMessageDecoderWithDifferentAllocatorLimits() + throws Exception { + ArrowMessageDecoder decoder = new ArrowMessageDecoder(); + + // Test with custom allocator limit + Map props = new HashMap<>(); + props.put(ArrowMessageDecoder.ARROW_ALLOCATOR_LIMIT, "67108864"); // 64MB + + Set fieldsToRead = Sets.newHashSet("field1"); + String topicName = "test-topic-custom"; + + decoder.init(props, fieldsToRead, topicName); + decoder.close(); + + // Test with default allocator limit + ArrowMessageDecoder decoder2 = new ArrowMessageDecoder(); + Map props2 = new HashMap<>(); // No allocator limit set + + decoder2.init(props2, fieldsToRead, topicName); + decoder2.close(); + } + + @Test + public void testArrowMessageDecoderMultipleInits() + throws Exception { + ArrowMessageDecoder decoder = new ArrowMessageDecoder(); + + Map props = new HashMap<>(); + Set fieldsToRead = Sets.newHashSet("id"); + String topicName = "test-multiple-init"; + + // Test multiple initializations (should work without issues) + decoder.init(props, fieldsToRead, topicName); + decoder.init(props, fieldsToRead, topicName); + + decoder.close(); + } + + @Test + public void testArrowMessageDecodingWithInvalidData() + throws Exception { + ArrowMessageDecoder decoder = new ArrowMessageDecoder(); + + Map props = new HashMap<>(); + Set fieldsToRead = Sets.newHashSet("id", "name", "age"); + String topicName = "test-arrow-topic"; + + decoder.init(props, fieldsToRead, topicName); + + // Test various invalid data scenarios + byte[] invalidData1 = "invalid arrow data".getBytes(); + byte[] invalidData2 = new byte[]{1, 2, 3, 4, 5}; + byte[] emptyData = new byte[0]; + + GenericRow destination = new GenericRow(); + + // Should return null for all invalid data types and null + assertNull(decoder.decode(null, destination)); + assertNull(decoder.decode(invalidData1, destination)); + assertNull(decoder.decode(invalidData2, destination)); + assertNull(decoder.decode(emptyData, destination)); + + // Test with null destination + assertNull(decoder.decode(invalidData1, null)); + + // Clean up + decoder.close(); + } + + @Test + public void testArrowMessageDecoderCloseMultipleTimes() + throws Exception { + ArrowMessageDecoder decoder = new ArrowMessageDecoder(); + + Map props = new HashMap<>(); + Set fieldsToRead = Sets.newHashSet("id"); + String topicName = "test-multiple-close"; + + decoder.init(props, fieldsToRead, topicName); + + // Close multiple times should not cause issues + decoder.close(); + decoder.close(); + decoder.close(); + } + + @Test + public void testArrowMessageDecoderWithArrowDataAndDestination() + throws Exception { + ArrowMessageDecoder decoder = new ArrowMessageDecoder(); + + Map props = new HashMap<>(); + Set fieldsToRead = Sets.newHashSet("id", "name"); + String topicName = "test-real-arrow-with-destination"; + + decoder.init(props, fieldsToRead, topicName); + + // Create real Arrow IPC data + byte[] realArrowData = ArrowTestDataUtil.createValidArrowIpcData(1); + + // Test with provided destination containing existing data + GenericRow destination = new GenericRow(); + destination.putValue("existing_field", "existing_value"); + + GenericRow result = decoder.decode(realArrowData, destination); + + // Should return the same destination object (testing ArrowToGenericRowConverter destination + // handling) + assertSame(destination, result); + + // Should preserve existing data + assertEquals("existing_value", result.getValue("existing_field")); + + // Should contain new converted Arrow data + @SuppressWarnings("unchecked") + List rows = (List) result.getValue(GenericRow.MULTIPLE_RECORDS_KEY); + assertNotNull(rows); + assertEquals(1, rows.size()); + assertEquals(1, rows.get(0).getValue("id")); + assertEquals("name_1", rows.get(0).getValue("name")); + + decoder.close(); + } + + @Test + public void testArrowMessageDecoderWithEmptyData() + throws Exception { + ArrowMessageDecoder decoder = new ArrowMessageDecoder(); + + Map props = new HashMap<>(); + Set fieldsToRead = Sets.newHashSet("id", "name"); + String topicName = "test-empty-arrow-data"; + + decoder.init(props, fieldsToRead, topicName); + + // Test with empty Arrow data (zero batches) + byte[] emptyArrowData = ArrowTestDataUtil.createEmptyArrowIpcData(); + GenericRow result = decoder.decode(emptyArrowData, null); + + // Should handle empty data gracefully - might return null or empty result + // This tests the edge case of zero batches + if (result != null) { + @SuppressWarnings("unchecked") + List rows = (List) result.getValue(GenericRow.MULTIPLE_RECORDS_KEY); + if (rows != null) { + assertEquals(0, rows.size()); + } + } + + decoder.close(); + } + + @Test + public void testArrowMessageDecoderWithMultipleDataTypes() + throws Exception { + ArrowMessageDecoder decoder = new ArrowMessageDecoder(); + + Map props = new HashMap<>(); + Set fieldsToRead = Sets.newHashSet("id", "name", "price", "active", "timestamp"); + String topicName = "test-multi-type-arrow-data"; + + decoder.init(props, fieldsToRead, topicName); + + // Create Arrow data with multiple data types + byte[] multiTypeArrowData = ArrowTestDataUtil.createMultiTypeArrowIpcData(3); + GenericRow result = decoder.decode(multiTypeArrowData, null); + + assertNotNull(result); + @SuppressWarnings("unchecked") + List rows = (List) result.getValue(GenericRow.MULTIPLE_RECORDS_KEY); + assertNotNull(rows); + assertEquals(3, rows.size()); + + // Verify different data types are correctly handled + GenericRow row0 = rows.get(0); + assertEquals(1, row0.getValue("id")); + assertEquals("product_1", row0.getValue("name").toString()); + assertEquals(10.99, (Double) row0.getValue("price"), 0.01); + assertEquals(true, row0.getValue("active")); // BitVector returns boolean + assertNotNull(row0.getValue("timestamp")); // Timestamp should be present + + GenericRow row1 = rows.get(1); + assertEquals(2, row1.getValue("id")); + assertEquals("product_2", row1.getValue("name").toString()); + assertEquals(15.99, (Double) row1.getValue("price"), 0.01); + assertEquals(false, row1.getValue("active")); + + decoder.close(); + } + + @Test + public void testArrowMessageDecoderWithBatchContainingMultipleRows() + throws Exception { + ArrowMessageDecoder decoder = new ArrowMessageDecoder(); + + Map props = new HashMap<>(); + Set fieldsToRead = Sets.newHashSet("id", "batch_num", "value"); + String topicName = "test-multi-batch-arrow-data"; + + decoder.init(props, fieldsToRead, topicName); + + // Create Arrow data with multiple batches - but note: ArrowMessageDecoder processes one batch + // per decode() call + // So we test with a single batch containing multiple rows instead + byte[] multiBatchArrowData = + ArrowTestDataUtil.createMultiBatchArrowIpcData(1, 3); // 1 batch, 3 rows + GenericRow result = decoder.decode(multiBatchArrowData, null); + + assertNotNull(result); + @SuppressWarnings("unchecked") + List rows = (List) result.getValue(GenericRow.MULTIPLE_RECORDS_KEY); + assertNotNull(rows); + assertEquals(3, rows.size()); // 1 batch × 3 rows = 3 total rows + + // Verify data from the batch + GenericRow row0 = rows.get(0); + assertEquals(1, row0.getValue("id")); + assertEquals(0, row0.getValue("batch_num")); + assertEquals("batch_0_row_0", row0.getValue("value").toString()); + + GenericRow row1 = rows.get(1); + assertEquals(2, row1.getValue("id")); + assertEquals(0, row1.getValue("batch_num")); + assertEquals("batch_0_row_1", row1.getValue("value").toString()); + + GenericRow row2 = rows.get(2); + assertEquals(3, row2.getValue("id")); + assertEquals(0, row2.getValue("batch_num")); + assertEquals("batch_0_row_2", row2.getValue("value").toString()); + + decoder.close(); + } + + @Test + public void testArrowMessageDecoderWithDictionaryEncodedData() + throws Exception { + ArrowMessageDecoder decoder = new ArrowMessageDecoder(); + + Map props = new HashMap<>(); + Set fieldsToRead = Sets.newHashSet("id", "category", "price"); + String topicName = "test-dictionary-encoded-arrow-data"; + + decoder.init(props, fieldsToRead, topicName); + + // Create Arrow data with real dictionary encoding + byte[] dictionaryArrowData = ArrowTestDataUtil.createDictionaryEncodedArrowIpcData(8); + GenericRow result = decoder.decode(dictionaryArrowData, null); + + assertNotNull(result); + @SuppressWarnings("unchecked") + List rows = (List) result.getValue(GenericRow.MULTIPLE_RECORDS_KEY); + assertNotNull(rows); + assertEquals(8, rows.size()); + + // Verify dictionary-encoded values are properly decoded by ArrowToGenericRowConverter + // Dictionary: id=1 -> "Electronics", id=2 -> "Books", id=3 -> "Clothing", id=4 -> "Home" + // Data cycles through indices 0,1,2,3,0,1,2,3 which should be resolved to string values + + GenericRow row0 = rows.get(0); + assertEquals(1, row0.getValue("id")); + assertEquals("Electronics", row0.getValue("category")); + assertEquals(19.99, (Double) row0.getValue("price"), 0.01); + + GenericRow row1 = rows.get(1); + assertEquals(2, row1.getValue("id")); + assertEquals("Books", row1.getValue("category")); + assertEquals(29.99, (Double) row1.getValue("price"), 0.01); + + GenericRow row2 = rows.get(2); + assertEquals(3, row2.getValue("id")); + assertEquals("Clothing", row2.getValue("category")); + assertEquals(39.99, (Double) row2.getValue("price"), 0.01); + + GenericRow row3 = rows.get(3); + assertEquals(4, row3.getValue("id")); + assertEquals("Home", row3.getValue("category")); + assertEquals(49.99, (Double) row3.getValue("price"), 0.01); + + // Verify cycling continues - row 4 should have same category as row 0 + GenericRow row4 = rows.get(4); + assertEquals(5, row4.getValue("id")); + assertEquals("Electronics", row4.getValue("category")); + assertEquals(59.99, (Double) row4.getValue("price"), 0.01); + + decoder.close(); + } + + @Test + public void testArrowDataTypeCompatibility() + throws Exception { + ArrowMessageDecoder decoder = new ArrowMessageDecoder(); + + Map props = new HashMap<>(); + Set fieldsToRead = Sets.newHashSet("id", "name", "price", "active", "timestamp"); + String topicName = "test-data-type-compatibility"; + + decoder.init(props, fieldsToRead, topicName); + + // Create Arrow data with multiple data types to verify compatibility + byte[] multiTypeArrowData = ArrowTestDataUtil.createMultiTypeArrowIpcData(3); + GenericRow result = decoder.decode(multiTypeArrowData, null); + + assertNotNull(result); + @SuppressWarnings("unchecked") + List rows = (List) result.getValue(GenericRow.MULTIPLE_RECORDS_KEY); + assertNotNull(rows); + assertEquals(3, rows.size()); + + // Check the actual data types returned by Arrow and verify Pinot compatibility + GenericRow row0 = rows.get(0); + + // Verify each field's type and compatibility + Object idValue = row0.getValue("id"); + assertNotNull(idValue, "ID should not be null"); + assertTrue(idValue instanceof Integer, "ID should be Integer compatible"); + + Object nameValue = row0.getValue("name"); + assertNotNull(nameValue, "Name should not be null"); + // After conversion, Arrow Text should be converted to String for Pinot compatibility + assertTrue(nameValue instanceof String, "Name should be String after conversion"); + assertEquals("product_1", nameValue); + LOGGER.info("Arrow name field successfully converted to String: {}", nameValue); + + Object priceValue = row0.getValue("price"); + assertNotNull(priceValue, "Price should not be null"); + assertTrue(priceValue instanceof Double, "Price should be Double compatible"); + + Object activeValue = row0.getValue("active"); + assertNotNull(activeValue, "Active should not be null"); + // BitVector.getObject() returns Boolean + assertTrue(activeValue instanceof Boolean, "Active should be Boolean compatible"); + + Object timestampValue = row0.getValue("timestamp"); + assertNotNull(timestampValue, "Timestamp should not be null"); + // After conversion, Arrow LocalDateTime should be converted to java.sql.Timestamp for Pinot + // compatibility + assertTrue( + timestampValue instanceof java.sql.Timestamp, + "Timestamp should be java.sql.Timestamp after conversion"); + java.sql.Timestamp ts = (java.sql.Timestamp) timestampValue; + assertTrue(ts.getTime() > 0, "Timestamp should be a positive value"); + LOGGER.info( + "Arrow timestamp field successfully converted to java.sql.Timestamp: {}", timestampValue); + + decoder.close(); + } + + @Test + public void testArrowMessageDecoderWithListVectors() + throws Exception { + ArrowMessageDecoder decoder = new ArrowMessageDecoder(); + + Map props = new HashMap<>(); + Set fieldsToRead = Sets.newHashSet("id", "numbers", "tags"); + String topicName = "test-list-vectors"; + + decoder.init(props, fieldsToRead, topicName); + + // Create Arrow data with List vectors + byte[] listArrowData = ArrowTestDataUtil.createListArrowIpcData(3); + GenericRow result = decoder.decode(listArrowData, null); + + assertNotNull(result); + @SuppressWarnings("unchecked") + List rows = (List) result.getValue(GenericRow.MULTIPLE_RECORDS_KEY); + assertNotNull(rows); + assertEquals(3, rows.size()); + + // Verify first row - should have 1 number and 2 tags + GenericRow row0 = rows.get(0); + assertEquals(1, row0.getValue("id")); + Object numbersValue0 = row0.getValue("numbers"); + assertNotNull(numbersValue0, "Numbers should not be null"); + assertTrue(numbersValue0 instanceof List); + @SuppressWarnings("unchecked") + List numbersList0 = (List) numbersValue0; + assertEquals(1, numbersList0.size()); + assertEquals(10, numbersList0.get(0)); + + Object tagsValue0 = row0.getValue("tags"); + assertNotNull(tagsValue0, "Tags should not be null"); + assertTrue(tagsValue0 instanceof List); + @SuppressWarnings("unchecked") + List tagsList0 = (List) tagsValue0; + assertEquals(2, tagsList0.size()); + assertEquals("tag_0_0", tagsList0.get(0).toString()); + assertEquals("tag_0_1", tagsList0.get(1).toString()); + + // Verify second row - should have 2 numbers and 2 tags + GenericRow row1 = rows.get(1); + assertEquals(2, row1.getValue("id")); + Object numbersValue1 = row1.getValue("numbers"); + assertNotNull(numbersValue1); + @SuppressWarnings("unchecked") + List numbersList1 = (List) numbersValue1; + assertEquals(2, numbersList1.size()); + assertEquals(20, numbersList1.get(0)); + assertEquals(21, numbersList1.get(1)); + + // Verify third row - should have 3 numbers + GenericRow row2 = rows.get(2); + assertEquals(3, row2.getValue("id")); + Object numbersValue2 = row2.getValue("numbers"); + @SuppressWarnings("unchecked") + List numbersList2 = (List) numbersValue2; + assertEquals(3, numbersList2.size()); + assertEquals(30, numbersList2.get(0)); + assertEquals(31, numbersList2.get(1)); + assertEquals(32, numbersList2.get(2)); + + LOGGER.info("List vector test completed successfully with {} rows", rows.size()); + decoder.close(); + } + + @Test + public void testArrowMessageDecoderWithStructVectors() + throws Exception { + ArrowMessageDecoder decoder = new ArrowMessageDecoder(); + + Map props = new HashMap<>(); + Set fieldsToRead = Sets.newHashSet("id", "person"); + String topicName = "test-struct-vectors"; + + decoder.init(props, fieldsToRead, topicName); + + // Create Arrow data with Struct vectors + byte[] structArrowData = ArrowTestDataUtil.createStructArrowIpcData(2); + GenericRow result = decoder.decode(structArrowData, null); + + assertNotNull(result); + @SuppressWarnings("unchecked") + List rows = (List) result.getValue(GenericRow.MULTIPLE_RECORDS_KEY); + assertNotNull(rows); + assertEquals(2, rows.size()); + + // Verify first row with nested struct + GenericRow row0 = rows.get(0); + assertEquals(1, row0.getValue("id")); + Object personValue0 = row0.getValue("person"); + assertNotNull(personValue0); + assertTrue(personValue0 instanceof Map); + @SuppressWarnings("unchecked") + Map personMap0 = (Map) personValue0; + assertEquals("Person_1", personMap0.get("name").toString()); + assertEquals(25, personMap0.get("age")); + @SuppressWarnings("unchecked") + Map address0 = (Map) personMap0.get("address"); + assertEquals("1 Main St", address0.get("street").toString()); + assertEquals("City_1", address0.get("city").toString()); + + // Verify second row + GenericRow row1 = rows.get(1); + assertEquals(2, row1.getValue("id")); + Object personValue1 = row1.getValue("person"); + assertNotNull(personValue1); + assertTrue(personValue1 instanceof Map); + @SuppressWarnings("unchecked") + Map personMap1 = (Map) personValue1; + assertEquals("Person_2", personMap1.get("name").toString()); + assertEquals(26, personMap1.get("age")); + @SuppressWarnings("unchecked") + Map address1 = (Map) personMap1.get("address"); + assertEquals("2 Main St", address1.get("street").toString()); + assertEquals("City_2", address1.get("city").toString()); + + LOGGER.info("Struct vector test completed successfully with {} rows", rows.size()); + decoder.close(); + } + + @Test + public void testArrowMessageDecoderWithMapVectors() + throws Exception { + ArrowMessageDecoder decoder = new ArrowMessageDecoder(); + + Map props = new HashMap<>(); + Set fieldsToRead = Sets.newHashSet("id", "metadata"); + String topicName = "test-map-vectors"; + + decoder.init(props, fieldsToRead, topicName); + + // Create Arrow data with Map vectors + byte[] mapArrowData = ArrowTestDataUtil.createMapArrowIpcData(2); + GenericRow result = decoder.decode(mapArrowData, null); + + assertNotNull(result); + @SuppressWarnings("unchecked") + List rows = (List) result.getValue(GenericRow.MULTIPLE_RECORDS_KEY); + assertNotNull(rows); + assertEquals(2, rows.size()); + + // Verify first row with map data + GenericRow row0 = rows.get(0); + assertEquals(1, row0.getValue("id")); + Object metadataValue0 = row0.getValue("metadata"); + assertNotNull(metadataValue0); + assertTrue(metadataValue0 instanceof Map); + @SuppressWarnings("unchecked") + Map meta0 = (Map) metadataValue0; + assertTrue(meta0.values().contains(100)); + assertTrue(meta0.values().contains(101)); + + // Verify second row - should have 3 entries (2 + (1%2) = 3) + GenericRow row1 = rows.get(1); + assertEquals(2, row1.getValue("id")); + Object metadataValue1 = row1.getValue("metadata"); + assertNotNull(metadataValue1); + assertTrue(metadataValue1 instanceof Map); + @SuppressWarnings("unchecked") + Map meta1 = (Map) metadataValue1; + assertTrue(meta1.values().contains(200)); + assertTrue(meta1.values().contains(201)); + assertTrue(meta1.values().contains(202)); + + LOGGER.info("Map vector test completed successfully with {} rows", rows.size()); + decoder.close(); + } + + @Test + public void testArrowMessageDecoderWithNestedMapValues() + throws Exception { + ArrowMessageDecoder decoder = new ArrowMessageDecoder(); + + Map props = new HashMap<>(); + Set fieldsToRead = Sets.newHashSet("id", "metadata"); + String topicName = "test-nested-map-values"; + + decoder.init(props, fieldsToRead, topicName); + + // Create Arrow data with Map values that are themselves Maps + byte[] nestedMapArrowData = ArrowTestDataUtil.createNestedMapArrowIpcData(2); + GenericRow result = decoder.decode(nestedMapArrowData, null); + + assertNotNull(result); + @SuppressWarnings("unchecked") + List rows = (List) result.getValue(GenericRow.MULTIPLE_RECORDS_KEY); + assertNotNull(rows); + assertEquals(2, rows.size()); + + // Verify first row: metadata is a Map> + GenericRow row0 = rows.get(0); + assertEquals(1, row0.getValue("id")); + Object metadataValue0 = row0.getValue("metadata"); + assertNotNull(metadataValue0); + assertTrue(metadataValue0 instanceof Map); + @SuppressWarnings("unchecked") + Map outer0 = (Map) metadataValue0; + assertTrue(outer0.size() >= 2); + for (Object innerMapObj : outer0.values()) { + assertTrue(innerMapObj instanceof Map); + @SuppressWarnings("unchecked") + Map inner = (Map) innerMapObj; + assertTrue(inner.size() >= 2); + // Values should be integers from generator + for (Object v : inner.values()) { + assertTrue(v instanceof Integer); + } + } + + // Verify second row similarly + GenericRow row1 = rows.get(1); + assertEquals(2, row1.getValue("id")); + Object metadataValue1 = row1.getValue("metadata"); + assertNotNull(metadataValue1); + assertTrue(metadataValue1 instanceof Map); + @SuppressWarnings("unchecked") + Map outer1 = (Map) metadataValue1; + assertTrue(outer1.size() >= 2); + boolean sawThreeInner = false; + for (Object innerMapObj : outer1.values()) { + assertTrue(innerMapObj instanceof Map); + @SuppressWarnings("unchecked") + Map inner = (Map) innerMapObj; + if (inner.size() == 3) { + sawThreeInner = true; + } + } + assertTrue(sawThreeInner); + + decoder.close(); + } + + @Test + public void testArrowMessageDecoderWithNestedListStruct() + throws Exception { + ArrowMessageDecoder decoder = new ArrowMessageDecoder(); + + Map props = new HashMap<>(); + Set fieldsToRead = Sets.newHashSet("id", "items"); + String topicName = "test-nested-list-struct"; + + decoder.init(props, fieldsToRead, topicName); + + // Create Arrow data with nested List of Structs + byte[] nestedArrowData = ArrowTestDataUtil.createNestedListStructArrowIpcData(3); + GenericRow result = decoder.decode(nestedArrowData, null); + + assertNotNull(result); + @SuppressWarnings("unchecked") + List rows = (List) result.getValue(GenericRow.MULTIPLE_RECORDS_KEY); + assertNotNull(rows); + assertEquals(3, rows.size()); + + // Verify first row - should have 1 item (1 + (0%3) = 1) + GenericRow row0 = rows.get(0); + assertEquals(1, row0.getValue("id")); + Object itemsValue0 = row0.getValue("items"); + assertNotNull(itemsValue0); + assertTrue(itemsValue0 instanceof List); + @SuppressWarnings("unchecked") + List items0 = (List) itemsValue0; + assertEquals(1, items0.size()); + @SuppressWarnings("unchecked") + Map item00 = (Map) items0.get(0); + assertEquals("item_0_0", item00.get("item_name").toString()); + assertEquals(10.99, (Double) item00.get("item_price"), 0.01); + + // Verify second row - should have 2 items (1 + (1%3) = 2) + GenericRow row1 = rows.get(1); + assertEquals(2, row1.getValue("id")); + Object itemsValue1 = row1.getValue("items"); + assertNotNull(itemsValue1); + @SuppressWarnings("unchecked") + List items1 = (List) itemsValue1; + assertEquals(2, items1.size()); + @SuppressWarnings("unchecked") + Map item10 = (Map) items1.get(0); + assertEquals("item_1_0", item10.get("item_name").toString()); + assertEquals(15.99, (Double) item10.get("item_price"), 0.01); + @SuppressWarnings("unchecked") + Map item11 = (Map) items1.get(1); + assertEquals("item_1_1", item11.get("item_name").toString()); + assertEquals(16.99, (Double) item11.get("item_price"), 0.01); + + // Verify third row - should have 3 items (1 + (2%3) = 3) + GenericRow row2 = rows.get(2); + assertEquals(3, row2.getValue("id")); + Object itemsValue2 = row2.getValue("items"); + assertNotNull(itemsValue2); + @SuppressWarnings("unchecked") + List items2 = (List) itemsValue2; + assertEquals(3, items2.size()); + + LOGGER.info("Nested List-Struct test completed successfully with {} rows", rows.size()); + decoder.close(); + } + + @Test + public void testArrowNestedStructureCompatibilityWithPinot() + throws Exception { + ArrowMessageDecoder decoder = new ArrowMessageDecoder(); + + Map props = new HashMap<>(); + Set fieldsToRead = Sets.newHashSet("id", "numbers", "person", "metadata", "items"); + String topicName = "test-nested-compatibility"; + + decoder.init(props, fieldsToRead, topicName); + + // Test each nested structure type individually for compatibility + // Test List compatibility + byte[] listData = ArrowTestDataUtil.createListArrowIpcData(1); + GenericRow listResult = decoder.decode(listData, null); + assertNotNull(listResult, "List data should be decodable"); + + // Test Struct compatibility + byte[] structData = ArrowTestDataUtil.createStructArrowIpcData(1); + GenericRow structResult = decoder.decode(structData, null); + assertNotNull(structResult, "Struct data should be decodable"); + + // Test Map compatibility + byte[] mapData = ArrowTestDataUtil.createMapArrowIpcData(1); + GenericRow mapResult = decoder.decode(mapData, null); + assertNotNull(mapResult, "Map data should be decodable"); + + // Test complex nested structures + byte[] nestedData = ArrowTestDataUtil.createNestedListStructArrowIpcData(1); + GenericRow nestedResult = decoder.decode(nestedData, null); + assertNotNull(nestedResult, "Nested List-Struct data should be decodable"); + + // Verify that all simulated nested structures produce valid GenericRow objects + @SuppressWarnings("unchecked") + List listRows = + (List) listResult.getValue(GenericRow.MULTIPLE_RECORDS_KEY); + assertNotNull(listRows, "List result should contain rows"); + assertTrue(listRows.size() > 0, "List result should have at least one row"); + + // Verify nested list data is accessible + GenericRow firstListRow = listRows.get(0); + assertNotNull(firstListRow.getValue("numbers"), "List row should have numbers"); + + @SuppressWarnings("unchecked") + List structRows = + (List) structResult.getValue(GenericRow.MULTIPLE_RECORDS_KEY); + assertNotNull(structRows, "Struct result should contain rows"); + assertTrue(structRows.size() > 0, "Struct result should have at least one row"); + + // Verify struct data is accessible + GenericRow firstStructRow = structRows.get(0); + assertNotNull(firstStructRow.getValue("person"), "Struct row should have person"); + + @SuppressWarnings("unchecked") + List mapRows = + (List) mapResult.getValue(GenericRow.MULTIPLE_RECORDS_KEY); + assertNotNull(mapRows, "Map result should contain rows"); + assertTrue(mapRows.size() > 0, "Map result should have at least one row"); + + // Verify map data is accessible + GenericRow firstMapRow = mapRows.get(0); + assertNotNull(firstMapRow.getValue("metadata"), "Map row should have metadata"); + + @SuppressWarnings("unchecked") + List nestedRows = + (List) nestedResult.getValue(GenericRow.MULTIPLE_RECORDS_KEY); + assertNotNull(nestedRows, "Nested result should contain rows"); + assertTrue(nestedRows.size() > 0, "Nested result should have at least one row"); + + // Verify nested list-struct data is accessible + GenericRow firstNestedRow = nestedRows.get(0); + assertNotNull(firstNestedRow.getValue("items"), "Nested row should have items"); + + LOGGER.info( + "All nested structure types are compatible with ArrowMessageDecoder and produce valid GenericRow objects"); + decoder.close(); + } +} diff --git a/pinot-plugins/pinot-input-format/pinot-arrow/src/test/java/org/apache/pinot/plugin/inputformat/arrow/util/ArrowTestDataUtil.java b/pinot-plugins/pinot-input-format/pinot-arrow/src/test/java/org/apache/pinot/plugin/inputformat/arrow/util/ArrowTestDataUtil.java new file mode 100644 index 000000000000..206dc7d85a56 --- /dev/null +++ b/pinot-plugins/pinot-input-format/pinot-arrow/src/test/java/org/apache/pinot/plugin/inputformat/arrow/util/ArrowTestDataUtil.java @@ -0,0 +1,607 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.plugin.inputformat.arrow.util; + +import java.io.ByteArrayOutputStream; +import java.nio.channels.Channels; +import java.nio.channels.WritableByteChannel; +import java.util.Arrays; +import java.util.List; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.BitVector; +import org.apache.arrow.vector.Float8Vector; +import org.apache.arrow.vector.IntVector; +import org.apache.arrow.vector.TimeStampMilliVector; +import org.apache.arrow.vector.VarCharVector; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.complex.ListVector; +import org.apache.arrow.vector.complex.MapVector; +import org.apache.arrow.vector.complex.StructVector; +import org.apache.arrow.vector.dictionary.Dictionary; +import org.apache.arrow.vector.dictionary.DictionaryEncoder; +import org.apache.arrow.vector.dictionary.DictionaryProvider; +import org.apache.arrow.vector.ipc.ArrowStreamWriter; +import org.apache.arrow.vector.types.FloatingPointPrecision; +import org.apache.arrow.vector.types.TimeUnit; +import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.arrow.vector.types.pojo.DictionaryEncoding; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.FieldType; +import org.apache.arrow.vector.types.pojo.Schema; + + +public class ArrowTestDataUtil { + + private ArrowTestDataUtil() { + } + + public static byte[] createValidArrowIpcData(int numRows) + throws Exception { + try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE)) { + Field idField = new Field("id", FieldType.nullable(new ArrowType.Int(32, true)), null); + Field nameField = new Field("name", FieldType.nullable(new ArrowType.Utf8()), null); + Schema schema = new Schema(Arrays.asList(idField, nameField)); + + try (VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) { + IntVector idVector = (IntVector) root.getVector("id"); + VarCharVector nameVector = (VarCharVector) root.getVector("name"); + + root.allocateNew(); + idVector.allocateNew(numRows); + nameVector.allocateNew(numRows * 10, numRows); + + for (int i = 0; i < numRows; i++) { + idVector.set(i, i + 1); + nameVector.set(i, ("name_" + (i + 1)).getBytes()); + } + + idVector.setValueCount(numRows); + nameVector.setValueCount(numRows); + root.setRowCount(numRows); + + return writeArrowDataToBytes(root, null); + } + } + } + + public static byte[] createMultiTypeArrowIpcData(int numRows) + throws Exception { + try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE)) { + Field idField = new Field("id", FieldType.nullable(new ArrowType.Int(32, true)), null); + Field nameField = new Field("name", FieldType.nullable(new ArrowType.Utf8()), null); + Field priceField = + new Field( + "price", + FieldType.nullable(new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE)), + null); + Field activeField = new Field("active", FieldType.nullable(new ArrowType.Bool()), null); + Field timestampField = + new Field( + "timestamp", + FieldType.nullable(new ArrowType.Timestamp(TimeUnit.MILLISECOND, null)), + null); + + Schema schema = + new Schema(Arrays.asList(idField, nameField, priceField, activeField, timestampField)); + + try (VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) { + IntVector idVector = (IntVector) root.getVector("id"); + VarCharVector nameVector = (VarCharVector) root.getVector("name"); + Float8Vector priceVector = (Float8Vector) root.getVector("price"); + BitVector activeVector = (BitVector) root.getVector("active"); + TimeStampMilliVector timestampVector = (TimeStampMilliVector) root.getVector("timestamp"); + + root.allocateNew(); + idVector.allocateNew(numRows); + nameVector.allocateNew(numRows * 20, numRows); + priceVector.allocateNew(numRows); + activeVector.allocateNew(numRows); + timestampVector.allocateNew(numRows); + + long baseTime = System.currentTimeMillis(); + for (int i = 0; i < numRows; i++) { + idVector.set(i, i + 1); + nameVector.set(i, ("product_" + (i + 1)).getBytes()); + priceVector.set(i, 10.99 + (i * 5.0)); + activeVector.set(i, i % 2 == 0 ? 1 : 0); + timestampVector.set(i, baseTime + (i * 1000L)); + } + + idVector.setValueCount(numRows); + nameVector.setValueCount(numRows); + priceVector.setValueCount(numRows); + activeVector.setValueCount(numRows); + timestampVector.setValueCount(numRows); + root.setRowCount(numRows); + + return writeArrowDataToBytes(root, null); + } + } + } + + public static byte[] createMultiBatchArrowIpcData(int batchCount, int rowsPerBatch) + throws Exception { + try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE)) { + Field idField = new Field("id", FieldType.nullable(new ArrowType.Int(32, true)), null); + Field batchField = + new Field("batch_num", FieldType.nullable(new ArrowType.Int(32, true)), null); + Field valueField = new Field("value", FieldType.nullable(new ArrowType.Utf8()), null); + Schema schema = new Schema(Arrays.asList(idField, batchField, valueField)); + + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + try (WritableByteChannel channel = Channels.newChannel(outputStream); + VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator); + ArrowStreamWriter writer = new ArrowStreamWriter(root, null, channel)) { + + writer.start(); + + IntVector idVector = (IntVector) root.getVector("id"); + IntVector batchVector = (IntVector) root.getVector("batch_num"); + VarCharVector valueVector = (VarCharVector) root.getVector("value"); + + int totalRowId = 1; + for (int batch = 0; batch < batchCount; batch++) { + root.allocateNew(); + idVector.allocateNew(rowsPerBatch); + batchVector.allocateNew(rowsPerBatch); + valueVector.allocateNew(rowsPerBatch * 15, rowsPerBatch); + + for (int row = 0; row < rowsPerBatch; row++) { + idVector.set(row, totalRowId++); + batchVector.set(row, batch); + valueVector.set(row, ("batch_" + batch + "_row_" + row).getBytes()); + } + + idVector.setValueCount(rowsPerBatch); + batchVector.setValueCount(rowsPerBatch); + valueVector.setValueCount(rowsPerBatch); + root.setRowCount(rowsPerBatch); + + writer.writeBatch(); + } + + writer.end(); + return outputStream.toByteArray(); + } + } + } + + public static byte[] createEmptyArrowIpcData() + throws Exception { + try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE)) { + Field idField = new Field("id", FieldType.nullable(new ArrowType.Int(32, true)), null); + Field nameField = new Field("name", FieldType.nullable(new ArrowType.Utf8()), null); + Schema schema = new Schema(Arrays.asList(idField, nameField)); + + try (VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) { + root.setRowCount(0); + + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + try (WritableByteChannel channel = Channels.newChannel(outputStream); + ArrowStreamWriter writer = new ArrowStreamWriter(root, null, channel)) { + + writer.start(); + writer.end(); + } + + return outputStream.toByteArray(); + } + } + } + + public static byte[] createDictionaryEncodedArrowIpcData(int numRows) + throws Exception { + List dictionaryValues = Arrays.asList("Electronics", "Books", "Clothing", "Home"); + DictionaryEncoding dictionaryEncoding = + new DictionaryEncoding(1L, false, new ArrowType.Int(32, true)); + + try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE); + VarCharVector dictionaryVector = new VarCharVector("category_dict", allocator); + IntVector idVector = new IntVector("id", allocator); + Float8Vector priceVector = new Float8Vector("price", allocator); + VarCharVector categoryUnencoded = + new VarCharVector( + "category", + new FieldType(true, new ArrowType.Utf8(), dictionaryEncoding), + allocator)) { + + dictionaryVector.allocateNew(); + for (int i = 0; i < dictionaryValues.size(); i++) { + dictionaryVector.set(i, dictionaryValues.get(i).getBytes()); + } + dictionaryVector.setValueCount(dictionaryValues.size()); + + Dictionary dictionary = new Dictionary(dictionaryVector, dictionaryEncoding); + DictionaryProvider.MapDictionaryProvider dictionaryProvider = + new DictionaryProvider.MapDictionaryProvider(); + dictionaryProvider.put(dictionary); + + idVector.allocateNew(numRows); + priceVector.allocateNew(numRows); + categoryUnencoded.allocateNew(numRows); + + for (int i = 0; i < numRows; i++) { + idVector.set(i, i + 1); + categoryUnencoded.set(i, dictionaryValues.get(i % dictionaryValues.size()).getBytes()); + priceVector.set(i, 19.99 + (i * 10.0)); + } + idVector.setValueCount(numRows); + priceVector.setValueCount(numRows); + categoryUnencoded.setValueCount(numRows); + + try (org.apache.arrow.vector.FieldVector encodedCategoryVector = + (org.apache.arrow.vector.FieldVector) + DictionaryEncoder.encode(categoryUnencoded, dictionary)) { + List fields = + Arrays.asList( + idVector.getField(), encodedCategoryVector.getField(), priceVector.getField()); + List vectors = + Arrays.asList(idVector, encodedCategoryVector, priceVector); + try (VectorSchemaRoot root = new VectorSchemaRoot(fields, vectors)) { + return writeArrowDataToBytes(root, dictionaryProvider); + } + } + } + } + + public static byte[] createListArrowIpcData(int numRows) + throws Exception { + try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE)) { + Field numbersElementField = + new Field("$data$", FieldType.nullable(new ArrowType.Int(32, true)), null); + Field numbersField = + new Field( + "numbers", + FieldType.nullable(new ArrowType.List()), + Arrays.asList(numbersElementField)); + + Field tagsElementField = new Field("$data$", FieldType.nullable(new ArrowType.Utf8()), null); + Field tagsField = + new Field( + "tags", FieldType.nullable(new ArrowType.List()), Arrays.asList(tagsElementField)); + + Field idField = new Field("id", FieldType.nullable(new ArrowType.Int(32, true)), null); + Schema schema = new Schema(Arrays.asList(idField, numbersField, tagsField)); + + try (VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) { + IntVector idVector = (IntVector) root.getVector("id"); + ListVector numbersVector = (ListVector) root.getVector("numbers"); + ListVector tagsVector = (ListVector) root.getVector("tags"); + IntVector numbersChild = (IntVector) numbersVector.getDataVector(); + VarCharVector tagsChild = (VarCharVector) tagsVector.getDataVector(); + + root.allocateNew(); + idVector.allocateNew(numRows); + numbersVector.allocateNew(); + tagsVector.allocateNew(); + + int numbersElemIndex = 0; + int tagsElemIndex = 0; + + for (int i = 0; i < numRows; i++) { + idVector.set(i, i + 1); + + numbersVector.startNewValue(i); + for (int j = 0; j <= i; j++) { + numbersChild.setSafe(numbersElemIndex++, (i + 1) * 10 + j); + } + numbersVector.endValue(i, i + 1); + + tagsVector.startNewValue(i); + for (int j = 0; j < 2; j++) { + tagsChild.setSafe(tagsElemIndex++, ("tag_" + i + "_" + j).getBytes()); + } + tagsVector.endValue(i, 2); + } + + idVector.setValueCount(numRows); + numbersChild.setValueCount(numbersElemIndex); + numbersVector.setValueCount(numRows); + tagsChild.setValueCount(tagsElemIndex); + tagsVector.setValueCount(numRows); + root.setRowCount(numRows); + + return writeArrowDataToBytes(root, null); + } + } + } + + public static byte[] createStructArrowIpcData(int numRows) + throws Exception { + try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE)) { + Field nameField = new Field("name", FieldType.nullable(new ArrowType.Utf8()), null); + Field ageField = new Field("age", FieldType.nullable(new ArrowType.Int(32, true)), null); + Field streetField = new Field("street", FieldType.nullable(new ArrowType.Utf8()), null); + Field cityField = new Field("city", FieldType.nullable(new ArrowType.Utf8()), null); + Field addressField = + new Field( + "address", + FieldType.nullable(new ArrowType.Struct()), + Arrays.asList(streetField, cityField)); + + Field personField = + new Field( + "person", + FieldType.nullable(new ArrowType.Struct()), + Arrays.asList(nameField, ageField, addressField)); + + Field idField = new Field("id", FieldType.nullable(new ArrowType.Int(32, true)), null); + Schema schema = new Schema(Arrays.asList(idField, personField)); + + try (VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) { + IntVector idVector = (IntVector) root.getVector("id"); + StructVector personVector = (StructVector) root.getVector("person"); + + root.allocateNew(); + idVector.allocateNew(numRows); + personVector.allocateNew(); + + VarCharVector nameVector = (VarCharVector) personVector.getChild("name"); + IntVector ageVector = (IntVector) personVector.getChild("age"); + StructVector addressVector = (StructVector) personVector.getChild("address"); + VarCharVector streetVector = (VarCharVector) addressVector.getChild("street"); + VarCharVector cityVector = (VarCharVector) addressVector.getChild("city"); + + for (int i = 0; i < numRows; i++) { + idVector.set(i, i + 1); + personVector.setIndexDefined(i); + addressVector.setIndexDefined(i); + nameVector.setSafe(i, ("Person_" + (i + 1)).getBytes()); + ageVector.setSafe(i, 25 + i); + streetVector.setSafe(i, ((i + 1) + " Main St").getBytes()); + cityVector.setSafe(i, ("City_" + (i + 1)).getBytes()); + } + + idVector.setValueCount(numRows); + personVector.setValueCount(numRows); + nameVector.setValueCount(numRows); + ageVector.setValueCount(numRows); + addressVector.setValueCount(numRows); + streetVector.setValueCount(numRows); + cityVector.setValueCount(numRows); + root.setRowCount(numRows); + + return writeArrowDataToBytes(root, null); + } + } + } + + public static byte[] createMapArrowIpcData(int numRows) + throws Exception { + try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE)) { + Field keyField = + new Field(MapVector.KEY_NAME, FieldType.notNullable(new ArrowType.Utf8()), null); + Field valField = + new Field(MapVector.VALUE_NAME, FieldType.nullable(new ArrowType.Int(32, true)), null); + Field entriesField = + new Field( + MapVector.DATA_VECTOR_NAME, + FieldType.notNullable(new ArrowType.Struct()), + Arrays.asList(keyField, valField)); + Field mapField = + new Field( + "metadata", + FieldType.nullable(new ArrowType.Map(false)), + Arrays.asList(entriesField)); + + Field idField = new Field("id", FieldType.nullable(new ArrowType.Int(32, true)), null); + Schema schema = new Schema(Arrays.asList(idField, mapField)); + + try (VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) { + IntVector idVector = (IntVector) root.getVector("id"); + MapVector mapVector = (MapVector) root.getVector("metadata"); + StructVector entries = (StructVector) mapVector.getDataVector(); + VarCharVector keyVector = (VarCharVector) entries.getChild(MapVector.KEY_NAME); + IntVector valueVector = (IntVector) entries.getChild(MapVector.VALUE_NAME); + + root.allocateNew(); + idVector.allocateNew(numRows); + mapVector.allocateNew(); + + int entryIndex = 0; + for (int i = 0; i < numRows; i++) { + idVector.set(i, i + 1); + int entriesCount = 2 + (i % 2); + mapVector.startNewValue(i); + for (int j = 0; j < entriesCount; j++) { + keyVector.setSafe(entryIndex, ("key_" + i + "_" + j).getBytes()); + valueVector.setSafe(entryIndex, (i + 1) * 100 + j); + entries.setIndexDefined(entryIndex); + entryIndex++; + } + mapVector.endValue(i, entriesCount); + } + + idVector.setValueCount(numRows); + keyVector.setValueCount(entryIndex); + valueVector.setValueCount(entryIndex); + entries.setValueCount(entryIndex); + mapVector.setValueCount(numRows); + root.setRowCount(numRows); + + return writeArrowDataToBytes(root, null); + } + } + } + + public static byte[] createNestedMapArrowIpcData(int numRows) + throws Exception { + try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE)) { + // Define inner map (value of outer map) + Field innerKeyField = + new Field(MapVector.KEY_NAME, FieldType.notNullable(new ArrowType.Utf8()), null); + Field innerValField = + new Field(MapVector.VALUE_NAME, FieldType.nullable(new ArrowType.Int(32, true)), null); + Field innerEntriesField = + new Field( + MapVector.DATA_VECTOR_NAME, + FieldType.notNullable(new ArrowType.Struct()), + Arrays.asList(innerKeyField, innerValField)); + Field innerMapField = + new Field( + MapVector.VALUE_NAME, + FieldType.nullable(new ArrowType.Map(false)), + Arrays.asList(innerEntriesField)); + + // Define outer map with value as the inner map + Field outerKeyField = + new Field(MapVector.KEY_NAME, FieldType.notNullable(new ArrowType.Utf8()), null); + Field outerEntriesField = + new Field( + MapVector.DATA_VECTOR_NAME, + FieldType.notNullable(new ArrowType.Struct()), + Arrays.asList(outerKeyField, innerMapField)); + Field outerMapField = + new Field( + "metadata", + FieldType.nullable(new ArrowType.Map(false)), + Arrays.asList(outerEntriesField)); + + Field idField = new Field("id", FieldType.nullable(new ArrowType.Int(32, true)), null); + Schema schema = new Schema(Arrays.asList(idField, outerMapField)); + + try (VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) { + IntVector idVector = (IntVector) root.getVector("id"); + MapVector outerMapVector = (MapVector) root.getVector("metadata"); + StructVector outerEntries = (StructVector) outerMapVector.getDataVector(); + VarCharVector outerKeyVector = (VarCharVector) outerEntries.getChild(MapVector.KEY_NAME); + MapVector innerMapVector = (MapVector) outerEntries.getChild(MapVector.VALUE_NAME); + StructVector innerEntries = (StructVector) innerMapVector.getDataVector(); + VarCharVector innerKeyVector = (VarCharVector) innerEntries.getChild(MapVector.KEY_NAME); + IntVector innerValueVector = (IntVector) innerEntries.getChild(MapVector.VALUE_NAME); + + root.allocateNew(); + idVector.allocateNew(numRows); + outerMapVector.allocateNew(); + + int outerEntryIndex = 0; + int innerEntryIndex = 0; + for (int i = 0; i < numRows; i++) { + idVector.set(i, i + 1); + + int outerEntriesCount = 2 + (i % 2); // 2 or 3 outer entries + outerMapVector.startNewValue(i); + for (int j = 0; j < outerEntriesCount; j++) { + // Set outer key + outerKeyVector.setSafe(outerEntryIndex, ("outer_key_" + i + "_" + j).getBytes()); + + // Populate inner map for this outer entry at aligned index + innerMapVector.startNewValue(outerEntryIndex); + int innerEntriesCount = 2 + (j % 2); // 2 or 3 inner entries + for (int k = 0; k < innerEntriesCount; k++) { + innerKeyVector.setSafe( + innerEntryIndex, ("inner_key_" + i + "_" + j + "_" + k).getBytes()); + innerValueVector.setSafe(innerEntryIndex, (i + 1) * 1000 + j * 10 + k); + innerEntries.setIndexDefined(innerEntryIndex); + innerEntryIndex++; + } + innerMapVector.endValue(outerEntryIndex, innerEntriesCount); + + outerEntries.setIndexDefined(outerEntryIndex); + outerEntryIndex++; + } + outerMapVector.endValue(i, outerEntriesCount); + } + + idVector.setValueCount(numRows); + outerKeyVector.setValueCount(outerEntryIndex); + innerKeyVector.setValueCount(innerEntryIndex); + innerValueVector.setValueCount(innerEntryIndex); + innerEntries.setValueCount(innerEntryIndex); + innerMapVector.setValueCount(outerEntryIndex); + outerEntries.setValueCount(outerEntryIndex); + outerMapVector.setValueCount(numRows); + root.setRowCount(numRows); + + return writeArrowDataToBytes(root, null); + } + } + } + + public static byte[] createNestedListStructArrowIpcData(int numRows) + throws Exception { + try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE)) { + Field itemNameField = new Field("item_name", FieldType.nullable(new ArrowType.Utf8()), null); + Field itemPriceField = + new Field( + "item_price", + FieldType.nullable(new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE)), + null); + Field itemStructField = + new Field( + "$data$", + FieldType.nullable(new ArrowType.Struct()), + Arrays.asList(itemNameField, itemPriceField)); + + Field itemsField = + new Field( + "items", FieldType.nullable(new ArrowType.List()), Arrays.asList(itemStructField)); + + Field idField = new Field("id", FieldType.nullable(new ArrowType.Int(32, true)), null); + Schema schema = new Schema(Arrays.asList(idField, itemsField)); + + try (VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) { + IntVector idVector = (IntVector) root.getVector("id"); + ListVector itemsVector = (ListVector) root.getVector("items"); + StructVector itemStructVector = (StructVector) itemsVector.getDataVector(); + VarCharVector itemNameVector = (VarCharVector) itemStructVector.getChild("item_name"); + Float8Vector itemPriceVector = (Float8Vector) itemStructVector.getChild("item_price"); + + root.allocateNew(); + idVector.allocateNew(numRows); + itemsVector.allocateNew(); + + int structIndex = 0; + for (int i = 0; i < numRows; i++) { + idVector.set(i, i + 1); + int itemsCount = 1 + (i % 3); + itemsVector.startNewValue(i); + for (int j = 0; j < itemsCount; j++) { + itemNameVector.setSafe(structIndex, ("item_" + i + "_" + j).getBytes()); + itemPriceVector.setSafe(structIndex, 10.99 + (i * 5.0) + j); + itemStructVector.setIndexDefined(structIndex); + structIndex++; + } + itemsVector.endValue(i, itemsCount); + } + + idVector.setValueCount(numRows); + itemsVector.setValueCount(numRows); + itemNameVector.setValueCount(structIndex); + itemPriceVector.setValueCount(structIndex); + root.setRowCount(numRows); + + return writeArrowDataToBytes(root, null); + } + } + } + + private static byte[] writeArrowDataToBytes( + VectorSchemaRoot root, DictionaryProvider dictionaryProvider) + throws Exception { + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + try (WritableByteChannel channel = Channels.newChannel(outputStream); + ArrowStreamWriter writer = new ArrowStreamWriter(root, dictionaryProvider, channel)) { + writer.start(); + writer.writeBatch(); + writer.end(); + } + return outputStream.toByteArray(); + } +} diff --git a/pinot-plugins/pinot-input-format/pom.xml b/pinot-plugins/pinot-input-format/pom.xml index e9aabe0079ff..78c6057fe0f9 100644 --- a/pinot-plugins/pinot-input-format/pom.xml +++ b/pinot-plugins/pinot-input-format/pom.xml @@ -37,6 +37,7 @@ + pinot-arrow pinot-avro pinot-avro-base pinot-clp-log