Skip to content

Commit d2465c3

Browse files
GH-698: Improve and fix Avro read consumers (#718)
## What's Changed This PR relates to #698 and is the second in a series intended to provide full Avro read / write support in native Java. It adds round-trip tests for both schemas (Arrow schema -> Avro -> Arrow) and data (Arrow VSR -> Avro block -> Arrow VSR). It also adds a number of fixes and improvements to the Avro Consumers so that data arrives back in its original form after a round trip. The main changes are: * Added a top level method in AvroToArrow to convert Avro schema directly to Arrow schema (this may exist elsewhere, but is needed to provide an API that matches the logic of this implementation) * Avro unions of [ type, null ] or [ null, type ] now have special handling, these are interpreted as a single nullable type rather than a union. Setting legacyMode = false in the AvroToArrowConfig object is required to enable this behaviour, otherwise unions are interpreted literally. Unions with more than 2 elements are always interpreted literally (but, per #108, in practice Java's current Union implementation is probably not usable with Avro atm). * Added support for new logical types (decimal 256, timestamp nano and 3 local timestamp types) * Existing timestamp-mills and timestamp-micros times now interpreted as zone-aware (previously they were interpreted as local, but now the local timestamp types are interpreted as local - I think this is correct per the [Avro spec](https://avro.apache.org/docs/1.12.0/specification/#timestamps)). Requires setting legacyMode = false. * Removed namespaces from generated Arrow field names in complex types. E.g. the Avro field myNamepsace.outerRecord.structField.intField should be called just "intField" inside the Arrow struct. This doesn't affect the skip field logic, which still works using the qualified names. This requires setting legacyMode = false. * Remove unexpected metadata in generated Arrow fields (empty alias lists and attributes interpreted as part of the field schema). This requires setting legacyMode = false. * Use the expected child vector names for Arrow LIST and MAP types when reading. For LIST, the default child vector is called "$data$" which is illegal in Avro, so the child field name is also changed to "item" in the producers. This requires setting legacyMode = false. Breaking changes have been removed from this PR. Per discussion below, all breaking changes are now behind a "legacyMode" flag in the AvroToArrowConfig object, which is enabled by default in all the original code paths. Closes #698 . This change is meant to allow for round trip of schemas and individual Avro data blocks (one Avro data block -> one VSR). File-level capabilities are not included. I have not included anything to recycle the VSR as part of the read API, this feels like it belongs with the file-level piece. Also I have not done anything specific for enums / dict encoding as of yet.
1 parent 4af464c commit d2465c3

22 files changed

+2870
-105
lines changed

adapter/avro/src/main/java/org/apache/arrow/adapter/avro/ArrowToAvroUtils.java

+11-1
Original file line numberDiff line numberDiff line change
@@ -332,7 +332,17 @@ private static <T> T buildBaseTypeSchema(
332332

333333
case List:
334334
case FixedSizeList:
335-
return buildArraySchema(builder.array(), field, namespace);
335+
// Arrow uses "$data$" as the field name for list items, that is not a valid Avro name
336+
Field itemField = field.getChildren().get(0);
337+
if (ListVector.DATA_VECTOR_NAME.equals(itemField.getName())) {
338+
Field safeItemField =
339+
new Field("item", itemField.getFieldType(), itemField.getChildren());
340+
Field safeListField =
341+
new Field(field.getName(), field.getFieldType(), List.of(safeItemField));
342+
return buildArraySchema(builder.array(), safeListField, namespace);
343+
} else {
344+
return buildArraySchema(builder.array(), field, namespace);
345+
}
336346

337347
case Map:
338348
return buildMapSchema(builder.map(), field, namespace);

adapter/avro/src/main/java/org/apache/arrow/adapter/avro/AvroToArrow.java

+19
Original file line numberDiff line numberDiff line change
@@ -59,4 +59,23 @@ public static AvroToArrowVectorIterator avroToArrowIterator(
5959

6060
return AvroToArrowVectorIterator.create(decoder, schema, config);
6161
}
62+
63+
/**
64+
* Convert an Avro schema to its Arrow equivalent.
65+
*
66+
* <p>The resulting set of Arrow fields matches what would be set in the VSR after calling
67+
* avroToArrow() or avroToArrowIterator(), respecting the configuration in the config parameter.
68+
*
69+
* @param schema The Avro schema to convert
70+
* @param config Configuration options for conversion
71+
* @return The equivalent Arrow schema
72+
*/
73+
public static org.apache.arrow.vector.types.pojo.Schema avroToAvroSchema(
74+
Schema schema, AvroToArrowConfig config) {
75+
76+
Preconditions.checkNotNull(schema, "Avro schema object cannot be null");
77+
Preconditions.checkNotNull(config, "config cannot be null");
78+
79+
return AvroToArrowUtils.createArrowSchema(schema, config);
80+
}
6281
}

adapter/avro/src/main/java/org/apache/arrow/adapter/avro/AvroToArrowConfig.java

+41
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,12 @@ public class AvroToArrowConfig {
4141
/** The field names which to skip when reading decoder values. */
4242
private final Set<String> skipFieldNames;
4343

44+
/**
45+
* Use legacy-mode to keep compatibility with old behavior (pre-2025), enabled by default. This
46+
* affects how the AvroToArrow code interprets the Avro schema.
47+
*/
48+
private final boolean legacyMode;
49+
4450
/**
4551
* Instantiate an instance.
4652
*
@@ -64,6 +70,37 @@ public class AvroToArrowConfig {
6470
this.targetBatchSize = targetBatchSize;
6571
this.provider = provider;
6672
this.skipFieldNames = skipFieldNames;
73+
74+
// Default values for optional parameters
75+
legacyMode = true; // Keep compatibility with old behavior by default
76+
}
77+
78+
/**
79+
* Instantiate an instance.
80+
*
81+
* @param allocator The memory allocator to construct the Arrow vectors with.
82+
* @param targetBatchSize The maximum rowCount to read each time when partially convert data.
83+
* @param provider The dictionary provider used for enum type, adapter will update this provider.
84+
* @param skipFieldNames Field names which to skip.
85+
* @param legacyMode Keep compatibility with old behavior (pre-2025)
86+
*/
87+
AvroToArrowConfig(
88+
BufferAllocator allocator,
89+
int targetBatchSize,
90+
DictionaryProvider.MapDictionaryProvider provider,
91+
Set<String> skipFieldNames,
92+
boolean legacyMode) {
93+
94+
Preconditions.checkArgument(
95+
targetBatchSize == AvroToArrowVectorIterator.NO_LIMIT_BATCH_SIZE || targetBatchSize > 0,
96+
"invalid targetBatchSize: %s",
97+
targetBatchSize);
98+
99+
this.allocator = allocator;
100+
this.targetBatchSize = targetBatchSize;
101+
this.provider = provider;
102+
this.skipFieldNames = skipFieldNames;
103+
this.legacyMode = legacyMode;
67104
}
68105

69106
public BufferAllocator getAllocator() {
@@ -81,4 +118,8 @@ public DictionaryProvider.MapDictionaryProvider getProvider() {
81118
public Set<String> getSkipFieldNames() {
82119
return skipFieldNames;
83120
}
121+
122+
public boolean isLegacyMode() {
123+
return legacyMode;
124+
}
84125
}

adapter/avro/src/main/java/org/apache/arrow/adapter/avro/AvroToArrowUtils.java

+328-86
Large diffs are not rendered by default.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.arrow.adapter.avro.consumers;
18+
19+
import java.io.IOException;
20+
import org.apache.arrow.vector.FieldVector;
21+
import org.apache.avro.io.Decoder;
22+
23+
/**
24+
* Consumer wrapper which consumes nullable type values from avro decoder. Write the data to the
25+
* underlying {@link FieldVector}.
26+
*
27+
* @param <T> The vector within consumer or its delegate.
28+
*/
29+
public class AvroNullableConsumer<T extends FieldVector> extends BaseAvroConsumer<T> {
30+
31+
private final Consumer<T> delegate;
32+
private final int nullIndex;
33+
34+
/** Instantiate a AvroNullableConsumer. */
35+
@SuppressWarnings("unchecked")
36+
public AvroNullableConsumer(Consumer<T> delegate, int nullIndex) {
37+
super((T) delegate.getVector());
38+
this.delegate = delegate;
39+
this.nullIndex = nullIndex;
40+
}
41+
42+
@Override
43+
public void consume(Decoder decoder) throws IOException {
44+
int typeIndex = decoder.readInt();
45+
if (typeIndex == nullIndex) {
46+
decoder.readNull();
47+
delegate.addNull();
48+
} else {
49+
delegate.consume(decoder);
50+
}
51+
currentIndex++;
52+
}
53+
54+
@Override
55+
public void addNull() {
56+
// Can be called by containers of nullable types
57+
delegate.addNull();
58+
currentIndex++;
59+
}
60+
61+
@Override
62+
public void setPosition(int index) {
63+
if (index < 0 || index > vector.getValueCount()) {
64+
throw new IllegalArgumentException("Index out of bounds");
65+
}
66+
delegate.setPosition(index);
67+
super.setPosition(index);
68+
}
69+
70+
@Override
71+
public boolean resetValueVector(T vector) {
72+
boolean delegateOk = delegate.resetValueVector(vector);
73+
boolean thisOk = super.resetValueVector(vector);
74+
return thisOk && delegateOk;
75+
}
76+
77+
@Override
78+
public void close() throws Exception {
79+
super.close();
80+
delegate.close();
81+
}
82+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.arrow.adapter.avro.consumers.logical;
18+
19+
import java.io.IOException;
20+
import java.nio.ByteBuffer;
21+
import org.apache.arrow.adapter.avro.consumers.BaseAvroConsumer;
22+
import org.apache.arrow.util.Preconditions;
23+
import org.apache.arrow.vector.Decimal256Vector;
24+
import org.apache.avro.io.Decoder;
25+
26+
/**
27+
* Consumer which consume 256-bit decimal type values from avro decoder. Write the data to {@link
28+
* Decimal256Vector}.
29+
*/
30+
public abstract class AvroDecimal256Consumer extends BaseAvroConsumer<Decimal256Vector> {
31+
32+
protected AvroDecimal256Consumer(Decimal256Vector vector) {
33+
super(vector);
34+
}
35+
36+
/** Consumer for decimal logical type with 256 bit width and original bytes type. */
37+
public static class BytesDecimal256Consumer extends AvroDecimal256Consumer {
38+
39+
private ByteBuffer cacheBuffer;
40+
41+
/** Instantiate a BytesDecimal256Consumer. */
42+
public BytesDecimal256Consumer(Decimal256Vector vector) {
43+
super(vector);
44+
}
45+
46+
@Override
47+
public void consume(Decoder decoder) throws IOException {
48+
cacheBuffer = decoder.readBytes(cacheBuffer);
49+
byte[] bytes = new byte[cacheBuffer.limit()];
50+
Preconditions.checkArgument(bytes.length <= 32, "Decimal bytes length should <= 32.");
51+
cacheBuffer.get(bytes);
52+
vector.setBigEndian(currentIndex++, bytes);
53+
}
54+
}
55+
56+
/** Consumer for decimal logical type with 256 bit width and original fixed type. */
57+
public static class FixedDecimal256Consumer extends AvroDecimal256Consumer {
58+
59+
private final byte[] reuseBytes;
60+
61+
/** Instantiate a FixedDecimal256Consumer. */
62+
public FixedDecimal256Consumer(Decimal256Vector vector, int size) {
63+
super(vector);
64+
Preconditions.checkArgument(size <= 32, "Decimal bytes length should <= 32.");
65+
reuseBytes = new byte[size];
66+
}
67+
68+
@Override
69+
public void consume(Decoder decoder) throws IOException {
70+
decoder.readFixed(reuseBytes);
71+
vector.setBigEndian(currentIndex++, reuseBytes);
72+
}
73+
}
74+
}

adapter/avro/src/main/java/org/apache/arrow/adapter/avro/consumers/logical/AvroTimestampMicrosConsumer.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
import org.apache.avro.io.Decoder;
2323

2424
/**
25-
* Consumer which consume date timestamp-micro values from avro decoder. Write the data to {@link
25+
* Consumer which consumes local-timestamp-micros values from avro decoder. Write the data to {@link
2626
* TimeStampMicroVector}.
2727
*/
2828
public class AvroTimestampMicrosConsumer extends BaseAvroConsumer<TimeStampMicroVector> {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.arrow.adapter.avro.consumers.logical;
18+
19+
import java.io.IOException;
20+
import org.apache.arrow.adapter.avro.consumers.BaseAvroConsumer;
21+
import org.apache.arrow.vector.TimeStampMicroTZVector;
22+
import org.apache.avro.io.Decoder;
23+
24+
/**
25+
* Consumer which consumes timestamp-micros values from avro decoder. Write the data to {@link
26+
* TimeStampMicroTZVector}.
27+
*/
28+
public class AvroTimestampMicrosTzConsumer extends BaseAvroConsumer<TimeStampMicroTZVector> {
29+
30+
/** Instantiate a AvroTimestampMicrosTzConsumer. */
31+
public AvroTimestampMicrosTzConsumer(TimeStampMicroTZVector vector) {
32+
super(vector);
33+
}
34+
35+
@Override
36+
public void consume(Decoder decoder) throws IOException {
37+
vector.set(currentIndex++, decoder.readLong());
38+
}
39+
}

adapter/avro/src/main/java/org/apache/arrow/adapter/avro/consumers/logical/AvroTimestampMillisConsumer.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
import org.apache.avro.io.Decoder;
2323

2424
/**
25-
* Consumer which consume date timestamp-millis values from avro decoder. Write the data to {@link
25+
* Consumer which consume local-timestamp-millis values from avro decoder. Write the data to {@link
2626
* TimeStampMilliVector}.
2727
*/
2828
public class AvroTimestampMillisConsumer extends BaseAvroConsumer<TimeStampMilliVector> {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.arrow.adapter.avro.consumers.logical;
18+
19+
import java.io.IOException;
20+
import org.apache.arrow.adapter.avro.consumers.BaseAvroConsumer;
21+
import org.apache.arrow.vector.TimeStampMilliTZVector;
22+
import org.apache.avro.io.Decoder;
23+
24+
/**
25+
* Consumer which consume timestamp-millis values from avro decoder. Write the data to {@link
26+
* TimeStampMilliTZVector}.
27+
*/
28+
public class AvroTimestampMillisTzConsumer extends BaseAvroConsumer<TimeStampMilliTZVector> {
29+
30+
/** Instantiate a AvroTimestampMillisTzConsumer. */
31+
public AvroTimestampMillisTzConsumer(TimeStampMilliTZVector vector) {
32+
super(vector);
33+
}
34+
35+
@Override
36+
public void consume(Decoder decoder) throws IOException {
37+
vector.set(currentIndex++, decoder.readLong());
38+
}
39+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.arrow.adapter.avro.consumers.logical;
18+
19+
import java.io.IOException;
20+
import org.apache.arrow.adapter.avro.consumers.BaseAvroConsumer;
21+
import org.apache.arrow.vector.TimeStampNanoVector;
22+
import org.apache.avro.io.Decoder;
23+
24+
/**
25+
* Consumer which consume local-timestamp-nanos values from avro decoder. Write the data to {@link
26+
* TimeStampNanoVector}.
27+
*/
28+
public class AvroTimestampNanosConsumer extends BaseAvroConsumer<TimeStampNanoVector> {
29+
30+
/** Instantiate a AvroTimestampNanosConsumer. */
31+
public AvroTimestampNanosConsumer(TimeStampNanoVector vector) {
32+
super(vector);
33+
}
34+
35+
@Override
36+
public void consume(Decoder decoder) throws IOException {
37+
vector.set(currentIndex++, decoder.readLong());
38+
}
39+
}

0 commit comments

Comments
 (0)