update multi versions
fix checkstyle
XBaith committed Feb 21, 2025
1 parent d9679ad commit c9dfece
Showing 23 changed files with 193 additions and 70 deletions.
27 changes: 27 additions & 0 deletions .palantir/revapi.yml
@@ -1201,6 +1201,33 @@ acceptedBreaks:
old: "method void org.apache.iceberg.MetadataUpdate.SetStatistics::<init>(long,\
\ org.apache.iceberg.StatisticsFile)"
justification: "Removing deprecated code"
org.apache.iceberg:iceberg-data:
- code: "java.method.numberOfParametersChanged"
old: "method void org.apache.iceberg.data.GenericAppenderFactory::<init>(org.apache.iceberg.Schema,\
\ org.apache.iceberg.PartitionSpec, int[], org.apache.iceberg.Schema, org.apache.iceberg.Schema)"
new: "method void org.apache.iceberg.data.GenericAppenderFactory::<init>(org.apache.iceberg.Table,\
\ int[], org.apache.iceberg.Schema, org.apache.iceberg.Schema)"
justification: "The previous API should be deprecated because, by default, it\
\ prevents table properties from being used by the metric config."
- code: "java.method.parameterTypeChanged"
old: "parameter void org.apache.iceberg.data.GenericAppenderFactory::<init>(===org.apache.iceberg.Schema===)"
new: "parameter void org.apache.iceberg.data.GenericAppenderFactory::<init>(===org.apache.iceberg.Table===)"
justification: "The previous API should be deprecated because, by default, it\
\ prevents table properties from being used by the metric config."
- code: "java.method.parameterTypeChanged"
old: "parameter void org.apache.iceberg.data.GenericAppenderFactory::<init>(===org.apache.iceberg.Schema===,\
\ org.apache.iceberg.PartitionSpec)"
new: "parameter void org.apache.iceberg.data.GenericAppenderFactory::<init>(===org.apache.iceberg.Table===,\
\ org.apache.iceberg.Schema)"
justification: "The previous API should be deprecated because, by default, it\
\ prevents table properties from being used by the metric config."
- code: "java.method.parameterTypeChanged"
old: "parameter void org.apache.iceberg.data.GenericAppenderFactory::<init>(org.apache.iceberg.Schema,\
\ ===org.apache.iceberg.PartitionSpec===)"
new: "parameter void org.apache.iceberg.data.GenericAppenderFactory::<init>(org.apache.iceberg.Table,\
\ ===org.apache.iceberg.Schema===)"
justification: "The previous API should be deprecated because, by default, it\
\ prevents table properties from being used by the metric config."
org.apache.iceberg:iceberg-parquet:
- code: "java.class.visibilityReduced"
old: "class org.apache.iceberg.data.parquet.BaseParquetReaders<T extends java.lang.Object>"
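
The GenericAppenderFactory entries above track a constructor migration from Schema/PartitionSpec parameters to a Table parameter, so that table properties can drive the factory's metric config. A minimal sketch of an affected call site, assuming a Hadoop table at a hypothetical local path (an illustration only, not code from this commit):

import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.data.GenericAppenderFactory;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.types.Types;

public class AppenderFactoryMigrationSketch {
  public static void main(String[] args) {
    Schema schema =
        new Schema(Types.NestedField.required(1, "id", Types.LongType.get()));

    // Hypothetical local table location used only for this sketch.
    Table table = new HadoopTables().create(schema, "/tmp/appender_factory_demo");

    // Before this change, call sites built the factory from a bare schema, e.g.
    //   new GenericAppenderFactory(schema, PartitionSpec.unpartitioned())
    // so metric-related table properties never reached the factory.

    // After this change, the factory is built from the Table itself, which lets it
    // read the table's properties when deriving its metrics configuration.
    GenericAppenderFactory appenderFactory = new GenericAppenderFactory(table);
  }
}
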
@@ -331,8 +331,8 @@ public void testRewriteLargeTableHasResiduals() throws IOException {
@TestTemplate
public void testRewriteAvoidRepeateCompress() throws IOException {
List<Record> expected = Lists.newArrayList();
Schema schema = icebergTableUnPartitioned.schema();
GenericAppenderFactory genericAppenderFactory = new GenericAppenderFactory(schema);
GenericAppenderFactory genericAppenderFactory =
new GenericAppenderFactory(icebergTableUnPartitioned);
File file = File.createTempFile("junit", null, temp.toFile());
int count = 0;
try (FileAppender<Record> fileAppender =
@@ -21,6 +21,7 @@
import static org.apache.iceberg.types.Types.NestedField.required;
import static org.assertj.core.api.Assertions.assertThat;

import java.io.File;
import java.io.IOException;
import java.time.Instant;
import java.time.LocalDateTime;
@@ -32,6 +33,7 @@
import java.util.stream.IntStream;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.data.GenericAppenderFactory;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.RandomGenericData;
@@ -42,15 +44,28 @@
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
import org.apache.iceberg.flink.source.split.SerializableComparator;
import org.apache.iceberg.flink.source.split.SplitComparators;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.SerializationUtil;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

public class TestWatermarkBasedSplitAssigner extends SplitAssignerTestBase {
public static final Schema SCHEMA =
new Schema(required(1, "timestamp_column", Types.TimestampType.withoutZone()));
private static final GenericAppenderFactory APPENDER_FACTORY = new GenericAppenderFactory(SCHEMA);

@TempDir private static File tableLocation;

private static GenericAppenderFactory appenderFactory;

@BeforeAll
public static void beforeAll() {
HadoopTables tables = new HadoopTables();
Table table = tables.create(SCHEMA, tableLocation.toPath().toString());
appenderFactory = new GenericAppenderFactory(table);
}

@Override
protected SplitAssigner splitAssigner() {
@@ -138,7 +153,7 @@ private IcebergSourceSplit splitFromRecords(List<List<Record>> records) {
try {
return IcebergSourceSplit.fromCombinedScanTask(
ReaderUtil.createCombinedScanTask(
records, temporaryFolder, FileFormat.PARQUET, APPENDER_FACTORY));
records, temporaryFolder, FileFormat.PARQUET, appenderFactory));
} catch (IOException e) {
throw new RuntimeException("Split creation exception", e);
}
@@ -34,8 +34,10 @@
import org.apache.iceberg.data.Record;
import org.apache.iceberg.flink.TestFixtures;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;
@@ -52,7 +54,7 @@ public static Object[][] parameters() {
};
}

@TempDir protected Path temporaryFolder;
@TempDir protected static Path temporaryFolder;

protected abstract ReaderFunction<T> readerFunction();

@@ -61,8 +63,14 @@ public static Object[][] parameters() {
@Parameter(index = 0)
private FileFormat fileFormat;

private final GenericAppenderFactory appenderFactory =
new GenericAppenderFactory(TestFixtures.SCHEMA);
private static GenericAppenderFactory appenderFactory;

@BeforeAll
public static void beforeAll() {
HadoopTables tables = new HadoopTables();
appenderFactory =
new GenericAppenderFactory(tables.create(TestFixtures.SCHEMA, temporaryFolder.toString()));
}

private void assertRecordsAndPosition(
List<Record> expectedRecords,
@@ -32,31 +32,45 @@
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Table;
import org.apache.iceberg.data.GenericAppenderFactory;
import org.apache.iceberg.data.RandomGenericData;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.flink.FlinkConfigOptions;
import org.apache.iceberg.flink.HadoopCatalogExtension;
import org.apache.iceberg.flink.TestFixtures;
import org.apache.iceberg.flink.TestHelpers;
import org.apache.iceberg.flink.source.DataIterator;
import org.apache.iceberg.io.CloseableIterator;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.api.io.TempDir;

public class TestArrayPoolDataIteratorBatcherRowData {

@RegisterExtension
private static final HadoopCatalogExtension CATALOG_EXTENSION =
new HadoopCatalogExtension(TestFixtures.DATABASE, TestFixtures.TABLE);

@TempDir protected Path temporaryFolder;
private static final FileFormat FILE_FORMAT = FileFormat.PARQUET;
private final Configuration config =
new Configuration()
.set(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY, 1)
.set(FlinkConfigOptions.SOURCE_READER_FETCH_BATCH_RECORD_COUNT, 2);

private final GenericAppenderFactory appenderFactory =
new GenericAppenderFactory(TestFixtures.SCHEMA);
private static GenericAppenderFactory appenderFactory;
private final DataIteratorBatcher<RowData> batcher =
new ArrayPoolDataIteratorBatcher<>(config, new RowDataRecordFactory(TestFixtures.ROW_TYPE));

@BeforeEach
public void createTable() {
Table testTable =
CATALOG_EXTENSION.catalog().createTable(TestFixtures.TABLE_IDENTIFIER, TestFixtures.SCHEMA);
appenderFactory = new GenericAppenderFactory(testTable);
}

/** Read a CombinedScanTask that contains a single file with less than a full batch of records */
@Test
public void testSingleFileLessThanOneFullBatch() throws Exception {
@@ -18,12 +18,12 @@
*/
package org.apache.iceberg.flink.source.reader;

import static org.apache.iceberg.flink.TestFixtures.DATABASE;
import static org.apache.iceberg.types.Types.NestedField.required;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.assertj.core.api.Assumptions.assumeThat;

import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.time.LocalDateTime;
@@ -38,19 +38,18 @@
import org.apache.iceberg.ParameterizedTestExtension;
import org.apache.iceberg.Parameters;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.data.GenericAppenderFactory;
import org.apache.iceberg.data.RandomGenericData;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.flink.HadoopTableExtension;
import org.apache.iceberg.flink.TestFixtures;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Types;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.api.io.TempDir;

@ExtendWith(ParameterizedTestExtension.class)
@@ -62,7 +61,10 @@ public class TestColumnStatsWatermarkExtractor {
required(3, "long_column", Types.LongType.get()),
required(4, "string_column", Types.StringType.get()));

private static final GenericAppenderFactory APPENDER_FACTORY = new GenericAppenderFactory(SCHEMA);
@TempDir protected static File tableDir = null;
private static final HadoopTables TABLES = new HadoopTables();

private static GenericAppenderFactory appenderFactory;

private static final List<List<Record>> TEST_RECORDS =
ImmutableList.of(
@@ -73,15 +75,13 @@ public class TestColumnStatsWatermarkExtractor {

@TempDir protected Path temporaryFolder;

@RegisterExtension
private static final HadoopTableExtension SOURCE_TABLE_EXTENSION =
new HadoopTableExtension(DATABASE, TestFixtures.TABLE, SCHEMA);

@Parameter(index = 0)
private String columnName;

@BeforeAll
public static void updateMinValue() {
Table testTable = TABLES.create(SCHEMA, tableDir.toString());
appenderFactory = new GenericAppenderFactory(testTable);
for (int i = 0; i < TEST_RECORDS.size(); ++i) {
for (Record r : TEST_RECORDS.get(i)) {
Map<String, Long> minValues = MIN_VALUES.get(i);
@@ -131,7 +131,7 @@ public void testMultipleFiles() throws IOException {
IcebergSourceSplit combinedSplit =
IcebergSourceSplit.fromCombinedScanTask(
ReaderUtil.createCombinedScanTask(
TEST_RECORDS, temporaryFolder, FileFormat.PARQUET, APPENDER_FACTORY));
TEST_RECORDS, temporaryFolder, FileFormat.PARQUET, appenderFactory));

ColumnStatsWatermarkExtractor extractor =
new ColumnStatsWatermarkExtractor(SCHEMA, columnName, null);
@@ -171,6 +171,6 @@ private IcebergSourceSplit split(int id) throws IOException {
ImmutableList.of(TEST_RECORDS.get(id)),
temporaryFolder,
FileFormat.PARQUET,
APPENDER_FACTORY));
appenderFactory));
}
}
@@ -32,6 +32,7 @@
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Table;
import org.apache.iceberg.data.GenericAppenderFactory;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.encryption.PlaintextEncryptionManager;
@@ -40,14 +41,22 @@
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
import org.apache.iceberg.flink.source.split.SerializableComparator;
import org.apache.iceberg.hadoop.HadoopFileIO;
import org.apache.iceberg.hadoop.HadoopTables;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

public class TestIcebergSourceReader {
@TempDir protected Path temporaryFolder;
@TempDir protected static Path temporaryFolder;

private final GenericAppenderFactory appenderFactory =
new GenericAppenderFactory(TestFixtures.SCHEMA);
private static GenericAppenderFactory appenderFactory = null;

@BeforeAll
public static void beforeAll() {
HadoopTables tables = new HadoopTables();
Table testTable = tables.create(TestFixtures.SCHEMA, temporaryFolder.toFile().toString());
appenderFactory = new GenericAppenderFactory(testTable);
}

@Test
public void testReaderMetrics() throws Exception {
@@ -26,13 +26,15 @@
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Table;
import org.apache.iceberg.data.GenericAppenderFactory;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.encryption.PlaintextEncryptionManager;
import org.apache.iceberg.flink.TestFixtures;
import org.apache.iceberg.flink.source.RowDataFileScanTaskReader;
import org.apache.iceberg.hadoop.HadoopFileIO;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.io.TempDir;
@@ -53,7 +55,9 @@ public class TestLimitableDataIterator {

@BeforeAll
public static void beforeClass() throws Exception {
GenericAppenderFactory appenderFactory = new GenericAppenderFactory(TestFixtures.SCHEMA);
HadoopTables tables = new HadoopTables();
Table testTable = tables.create(TestFixtures.SCHEMA, TestFixtures.TABLE);
GenericAppenderFactory appenderFactory = new GenericAppenderFactory(testTable);
List<List<Record>> recordBatchList =
ReaderUtil.createRecordBatchList(TestFixtures.SCHEMA, 3, 2);
combinedScanTask =
@@ -331,8 +331,8 @@ public void testRewriteLargeTableHasResiduals() throws IOException {
@TestTemplate
public void testRewriteAvoidRepeateCompress() throws IOException {
List<Record> expected = Lists.newArrayList();
Schema schema = icebergTableUnPartitioned.schema();
GenericAppenderFactory genericAppenderFactory = new GenericAppenderFactory(schema);
GenericAppenderFactory genericAppenderFactory =
new GenericAppenderFactory(icebergTableUnPartitioned);
File file = File.createTempFile("junit", null, temp.toFile());
int count = 0;
try (FileAppender<Record> fileAppender =
@@ -21,6 +21,7 @@
import static org.apache.iceberg.types.Types.NestedField.required;
import static org.assertj.core.api.Assertions.assertThat;

import java.io.File;
import java.io.IOException;
import java.time.Instant;
import java.time.LocalDateTime;
@@ -32,6 +33,7 @@
import java.util.stream.IntStream;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.data.GenericAppenderFactory;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.RandomGenericData;
@@ -42,15 +44,28 @@
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
import org.apache.iceberg.flink.source.split.SerializableComparator;
import org.apache.iceberg.flink.source.split.SplitComparators;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.SerializationUtil;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

public class TestWatermarkBasedSplitAssigner extends SplitAssignerTestBase {
public static final Schema SCHEMA =
new Schema(required(1, "timestamp_column", Types.TimestampType.withoutZone()));
private static final GenericAppenderFactory APPENDER_FACTORY = new GenericAppenderFactory(SCHEMA);

@TempDir private static File tableLocation;

private static GenericAppenderFactory appenderFactory;

@BeforeAll
public static void beforeAll() {
HadoopTables tables = new HadoopTables();
Table table = tables.create(SCHEMA, tableLocation.toPath().toString());
appenderFactory = new GenericAppenderFactory(table);
}

@Override
protected SplitAssigner splitAssigner() {
@@ -138,7 +153,7 @@ private IcebergSourceSplit splitFromRecords(List<List<Record>> records) {
try {
return IcebergSourceSplit.fromCombinedScanTask(
ReaderUtil.createCombinedScanTask(
records, temporaryFolder, FileFormat.PARQUET, APPENDER_FACTORY));
records, temporaryFolder, FileFormat.PARQUET, appenderFactory));
} catch (IOException e) {
throw new RuntimeException("Split creation exception", e);
}
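
The test updates above all follow the same pattern: create a real table (through HadoopTables or a catalog extension) and pass it to GenericAppenderFactory instead of a bare schema. A hedged end-to-end sketch of why that matters, using a hypothetical table location and the write.metadata.metrics.default table property, which the Table-based factory is intended to honor:

import java.io.File;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Files;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.data.GenericAppenderFactory;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.types.Types;

public class MetricsFromTablePropertiesSketch {
  public static void main(String[] args) throws Exception {
    Schema schema =
        new Schema(
            Types.NestedField.required(1, "id", Types.LongType.get()),
            Types.NestedField.required(2, "data", Types.StringType.get()));

    // Hypothetical location; the metrics mode is stored as a table property.
    Table table =
        new HadoopTables()
            .create(
                schema,
                PartitionSpec.unpartitioned(),
                ImmutableMap.of("write.metadata.metrics.default", "counts"),
                "/tmp/metrics_config_demo");

    // With the Table-based constructor, the factory can see those properties.
    GenericAppenderFactory appenderFactory = new GenericAppenderFactory(table);

    File dataFile = File.createTempFile("metrics-demo", ".parquet");
    // The Parquet writer creates the file itself, so drop the placeholder first.
    if (!dataFile.delete()) {
      throw new IllegalStateException("Could not delete placeholder file");
    }

    try (FileAppender<Record> appender =
        appenderFactory.newAppender(Files.localOutput(dataFile), FileFormat.PARQUET)) {
      GenericRecord record = GenericRecord.create(schema);
      record.setField("id", 1L);
      record.setField("data", "a");
      appender.add(record);
    }
    // The intent of this change is that the appender's collected metrics now
    // follow the "counts" mode from the table properties rather than defaults.
  }
}
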