
Conversation

lnbest0707-uber (Contributor)

Labels: feature, ingestion

Issue: #16643

Add the initial version of an Arrow decoder to Pinot. With this decoder, Pinot can decode streaming data in the basic Apache Arrow format. This is part of the first-stage delivery of the proposal above.

Performance and Improvements:

  • With around 200 messages per batch, Kafka data volume can be reduced by 20-30%.
  • If the data benefits from Arrow dictionary encoding, data volume can be reduced by up to 70-80%.
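To see why dictionary encoding can cut volume that much, consider a low-cardinality string column: each repeated value is replaced by a small integer index into a one-time dictionary. The sketch below is illustrative only (class and method names are made up, and the byte accounting is simplified to 4-byte indices with no validity buffers; it is not Arrow's actual wire layout):

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class DictionaryEncodingSketch {
  /**
   * Returns {plainBytes, encodedBytes} for a column of strings, assuming
   * 4-byte indices and a UTF-8 dictionary. Illustrative accounting only.
   */
  static long[] estimateSizes(List<String> column) {
    long plain = 0;
    Map<String, Integer> dict = new LinkedHashMap<>();
    for (String v : column) {
      plain += v.getBytes(StandardCharsets.UTF_8).length;
      dict.putIfAbsent(v, dict.size());  // assign each distinct value an index
    }
    long dictBytes = 0;
    for (String v : dict.keySet()) {
      dictBytes += v.getBytes(StandardCharsets.UTF_8).length;
    }
    // Encoded size = one copy of the dictionary + a 4-byte index per row.
    long encoded = dictBytes + 4L * column.size();
    return new long[]{plain, encoded};
  }

  public static void main(String[] args) {
    // A low-cardinality column: 1000 rows, only 3 distinct values.
    List<String> column = new java.util.ArrayList<>();
    String[] regions = {"us-east-1", "us-west-2", "eu-central-1"};
    for (int i = 0; i < 1000; i++) {
      column.add(regions[i % 3]);
    }
    long[] sizes = estimateSizes(column);
    System.out.println("plain=" + sizes[0] + " encoded=" + sizes[1]);
  }
}
```

With ~10-byte values and only 3 distinct strings, the encoded form is roughly 40% of the plain form in this toy accounting; the fewer distinct values relative to rows, the bigger the win.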

Some limitations and TODOs:

  • Arrow is a columnar data format, so representing rows efficiently requires batching them together; sending a single message in Arrow format would be very inefficient. Batching means slightly higher latency and larger individual messages (the average per-row size is smaller, but each sent message is bigger).
  • The first version handles common data structures with dictionary support on flat keys/values. Nested dictionary encoding is not yet supported.
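The encode/decode round trip through the Arrow IPC streaming format can be sketched with the Arrow Java API as below. This is a minimal, self-contained illustration, not the PR's actual ArrowMessageDecoder; the class name and the "name" column are made up for the example:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.nio.channels.Channels;
import java.nio.charset.StandardCharsets;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowStreamReader;
import org.apache.arrow.vector.ipc.ArrowStreamWriter;

public class ArrowIpcRoundTrip {
  /** Encodes a single-column batch of strings in Arrow IPC streaming format. */
  static byte[] encode(BufferAllocator allocator, String... values) throws Exception {
    try (VarCharVector vector = new VarCharVector("name", allocator)) {
      vector.allocateNew();
      for (int i = 0; i < values.length; i++) {
        vector.setSafe(i, values[i].getBytes(StandardCharsets.UTF_8));
      }
      vector.setValueCount(values.length);
      try (VectorSchemaRoot root = VectorSchemaRoot.of(vector)) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (ArrowStreamWriter writer =
            new ArrowStreamWriter(root, null, Channels.newChannel(out))) {
          writer.start();
          writer.writeBatch();
          writer.end();
        }
        return out.toByteArray();
      }
    }
  }

  /** Decodes the payload batch by batch, printing each row of the "name" column. */
  static void decode(BufferAllocator allocator, byte[] payload) throws Exception {
    try (ArrowStreamReader reader =
        new ArrowStreamReader(new ByteArrayInputStream(payload), allocator)) {
      while (reader.loadNextBatch()) {
        VectorSchemaRoot root = reader.getVectorSchemaRoot();
        for (int i = 0; i < root.getRowCount(); i++) {
          System.out.println(root.getVector("name").getObject(i));
        }
      }
    }
  }

  public static void main(String[] args) throws Exception {
    try (BufferAllocator allocator = new RootAllocator()) {
      decode(allocator, encode(allocator, "alpha", "beta"));
    }
  }
}
```

In the PR, the per-row values extracted from the VectorSchemaRoot would be placed into Pinot's GenericRow instead of printed.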


codecov-commenter commented Oct 16, 2025

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 63.49%. Comparing base (9d32f37) to head (dd9e5d9).

Additional details and impacted files
@@             Coverage Diff              @@
##             master   #17031      +/-   ##
============================================
- Coverage     63.51%   63.49%   -0.03%     
  Complexity     1419     1419              
============================================
  Files          3082     3082              
  Lines        181844   181844              
  Branches      27916    27916              
============================================
- Hits         115500   115459      -41     
- Misses        57458    57501      +43     
+ Partials       8886     8884       -2     
Flag Coverage Δ
custom-integration1 100.00% <ø> (ø)
integration 100.00% <ø> (ø)
integration1 100.00% <ø> (ø)
integration2 0.00% <ø> (ø)
java-11 63.47% <ø> (-0.03%) ⬇️
java-21 63.45% <ø> (+<0.01%) ⬆️
temurin 63.49% <ø> (-0.03%) ⬇️
unittests 63.49% <ø> (-0.03%) ⬇️
unittests1 56.32% <ø> (-0.01%) ⬇️
unittests2 33.66% <ø> (-0.03%) ⬇️

Flags with carried forward coverage won't be shown.


Copilot AI left a comment

Pull Request Overview

This PR introduces Apache Arrow format decoding support to Pinot, enabling the ingestion of streaming data in Arrow IPC format. The decoder converts Arrow batches into Pinot's GenericRow format, supporting primitive types, complex nested structures (lists, maps, structs), and dictionary encoding for improved data compression.

Key Changes

  • Arrow decoder implementation with dictionary encoding support reduces Kafka data volume by 20-80%
  • Comprehensive type conversion handling between Arrow and Pinot data types
  • Support for nested data structures (lists, maps, structs) with recursive conversion

Reviewed Changes

Copilot reviewed 6 out of 6 changed files in this pull request and generated 5 comments.

  • pinot-plugins/pinot-input-format/pom.xml: Added pinot-arrow module to the build
  • pinot-plugins/pinot-input-format/pinot-arrow/pom.xml: Defined dependencies for Apache Arrow libraries (version 18.0.0)
  • pinot-plugins/pinot-input-format/pinot-arrow/src/main/java/org/apache/pinot/plugin/inputformat/arrow/ArrowMessageDecoder.java: Implements StreamMessageDecoder to decode Arrow IPC messages into GenericRow
  • pinot-plugins/pinot-input-format/pinot-arrow/src/main/java/org/apache/pinot/plugin/inputformat/arrow/ArrowToGenericRowConverter.java: Handles conversion logic from Arrow VectorSchemaRoot to GenericRow with type compatibility handling
  • pinot-plugins/pinot-input-format/pinot-arrow/src/test/java/org/apache/pinot/plugin/inputformat/arrow/util/ArrowTestDataUtil.java: Utility for generating test Arrow data with various data types and structures
  • pinot-plugins/pinot-input-format/pinot-arrow/src/test/java/org/apache/pinot/plugin/inputformat/arrow/ArrowMessageDecoderTest.java: Comprehensive test suite covering decoder functionality, data types, and edge cases

Comment on lines +227 to +237
private Object flattenArrowMap(MapVector fieldVector, int rowIndex) {
  Map<String, Object> flattened = new LinkedHashMap<>();
  UnionMapReader reader = fieldVector.getReader();
  reader.setPosition(rowIndex);
  while (reader.next()) {
    flattened.put(
        reader.key().readObject().toString(),
        convertArrowTypeToPinotCompatible(reader.value().readObject()));
  }
  return flattened;
}
Copilot AI commented Oct 17, 2025

The method flattenArrowMap is defined but never called within this class. This is dead code that should either be removed or integrated into the conversion logic if it was intended to handle MapVector types.

Suggested change: remove the flattenArrowMap method.


Comment on lines +107 to +111
private String buildEventLatencyMetricName(String kafkaTopicName) {
  return "ServerMetrics.realtime.arrow.consumer.delay."
      + kafkaTopicName.replace(".", "_");
}

Copilot AI commented Oct 17, 2025

The method buildEventLatencyMetricName is defined but never called. This appears to be dead code that should be removed unless metrics collection is planned for future implementation.

Suggested change: remove the buildEventLatencyMetricName method.


* This decoder handles Arrow streaming format and converts Arrow data to Pinot's columnar format.
*/
public class ArrowMessageDecoder implements StreamMessageDecoder<byte[]> {
public static final String ARROW_SCHEMA_CONFIG = "arrow.schema.config";
Copilot AI commented Oct 17, 2025

The constant ARROW_SCHEMA_CONFIG is defined but never used in this class or the test files. Consider removing this unused constant or documenting its intended future use.

Suggested change: remove the ARROW_SCHEMA_CONFIG constant.


Comment on lines +49 to +57
private Set<String> _fieldsToRead;
private RootAllocator _allocator;
private ArrowToGenericRowConverter _converter;

@Override
public void init(Map<String, String> props, Set<String> fieldsToRead, String topicName)
    throws Exception {
  _kafkaTopicName = topicName;
  _fieldsToRead = fieldsToRead;
Copilot AI commented Oct 17, 2025

The field _fieldsToRead is assigned in the init method but never used elsewhere in the class. Either implement field filtering logic based on this set or remove it if all fields should always be read.

Suggested change: remove the unused _fieldsToRead field and the fieldsToRead parameter from init:

private RootAllocator _allocator;
private ArrowToGenericRowConverter _converter;

@Override
public void init(Map<String, String> props, String topicName)
    throws Exception {
  _kafkaTopicName = topicName;

Comment on lines +92 to +96
logger.error(
    "Error decoding Arrow message for kafka topic {} : {}",
    _kafkaTopicName,
    Arrays.toString(payload),
    e);
Copilot AI commented Oct 17, 2025

Logging the full payload with Arrays.toString(payload) could expose sensitive data in logs. Consider logging only payload metadata like length or a hash instead of the actual content.
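One way to act on this suggestion is to log only the payload length plus a short digest, which is enough to correlate a bad message across logs without leaking its contents. A sketch using only the JDK; describePayload is a hypothetical helper name:

```java
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

public class PayloadLogging {
  /** Summarizes a payload as "<length> bytes, sha256=<first 8 hex chars>..." for safe logging. */
  static String describePayload(byte[] payload) {
    try {
      byte[] digest = MessageDigest.getInstance("SHA-256").digest(payload);
      StringBuilder hex = new StringBuilder();
      for (int i = 0; i < 4; i++) {
        hex.append(String.format("%02x", digest[i]));
      }
      return payload.length + " bytes, sha256=" + hex + "...";
    } catch (NoSuchAlgorithmException e) {
      // SHA-256 is required on every JVM; this fallback is purely defensive.
      return payload.length + " bytes";
    }
  }
}
```

The error log in the catch block could then pass describePayload(payload) instead of Arrays.toString(payload).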


<properties>
<pinot.root>${basedir}/../../..</pinot.root>
<shade.phase.prop>package</shade.phase.prop>
<arrow.version>18.0.0</arrow.version>
A Contributor commented:

The Pinot root pom already defines an Arrow version.

xiangfu0 (Contributor) left a comment

overall lgtm

  return row;
} catch (Exception e) {
  logger.error(
      "Error decoding Arrow message for kafka topic {} : {}",
Contributor commented:

not specific to kafka
