diff --git a/docs/content.zh/docs/connectors/table/dynamodb.md b/docs/content.zh/docs/connectors/table/dynamodb.md
index ada2a4632..d9daf424d 100644
--- a/docs/content.zh/docs/connectors/table/dynamodb.md
+++ b/docs/content.zh/docs/connectors/table/dynamodb.md
@@ -302,6 +302,46 @@ WITH (
 );
 ```

+## Specifying Primary Key For Deletes
+
+The DynamoDB sink supports Delete requests to DynamoDB, but AWS requires that a DynamoDB Delete request contain **only** the key field(s); otherwise the Delete request fails with `DynamoDbException: The provided key element does not match the schema`.
+Thus, if a changelog stream that contains DELETEs is being written to DynamoDB, you must specify the `PRIMARY KEY` on the table.
+The `PRIMARY KEY` specified for the Flink SQL table must match the actual primary key of the DynamoDB table - so it must be either just the Partition Key, or, in the case of a composite primary key, the Partition Key and Sort Key.
+
+Example for the Partition Key as the only Primary Key:
+```sql
+CREATE TABLE DynamoDbTable (
+  `user_id` BIGINT,
+  `item_id` BIGINT,
+  `category_id` BIGINT,
+  `behavior` STRING,
+  PRIMARY KEY (user_id) NOT ENFORCED
+)
+WITH (
+  'connector' = 'dynamodb',
+  'table-name' = 'user_behavior',
+  'aws.region' = 'us-east-2'
+);
+```
+
+Example for the Partition Key and Sort Key as a Composite Primary Key:
+```sql
+CREATE TABLE DynamoDbTable (
+  `user_id` BIGINT,
+  `item_id` BIGINT,
+  `category_id` BIGINT,
+  `behavior` STRING,
+  PRIMARY KEY (user_id, item_id) NOT ENFORCED
+)
+WITH (
+  'connector' = 'dynamodb',
+  'table-name' = 'user_behavior',
+  'aws.region' = 'us-east-2'
+);
+```
+
+Note that this Primary Key functionality, specified by `PRIMARY KEY`, can be used alongside the sink partitioning mentioned above via `PARTITIONED BY` to deduplicate data and support DELETEs.
+
 ## Notice

 The current implementation of the DynamoDB SQL connector is write-only and doesn't provide an implementation for source queries.
diff --git a/docs/content/docs/connectors/table/dynamodb.md b/docs/content/docs/connectors/table/dynamodb.md
index e4952b220..e35677b23 100644
--- a/docs/content/docs/connectors/table/dynamodb.md
+++ b/docs/content/docs/connectors/table/dynamodb.md
@@ -303,6 +303,46 @@ WITH (
 );
 ```

+## Specifying Primary Key For Deletes
+
+The DynamoDB sink supports Delete requests to DynamoDB, but AWS requires that a DynamoDB Delete request contain **only** the key field(s); otherwise the Delete request fails with `DynamoDbException: The provided key element does not match the schema`.
+Thus, if a changelog stream that contains DELETEs is being written to DynamoDB, you must specify the `PRIMARY KEY` on the table.
+The `PRIMARY KEY` specified for the Flink SQL table must match the actual primary key of the DynamoDB table - so it must be either just the Partition Key, or, in the case of a composite primary key, the Partition Key and Sort Key.
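+
+In addition to the examples below, the `PRIMARY KEY` clause can be combined with the `PARTITIONED BY` sink partitioning described earlier on this page (see also the note at the end of this section). The following is only an illustrative sketch reusing the example schema from this page, not an officially documented configuration:
+```sql
+CREATE TABLE DynamoDbTable (
+  `user_id` BIGINT,
+  `item_id` BIGINT,
+  `category_id` BIGINT,
+  `behavior` STRING,
+  PRIMARY KEY (user_id) NOT ENFORCED
+)
+PARTITIONED BY (user_id)
+WITH (
+  'connector' = 'dynamodb',
+  'table-name' = 'user_behavior',
+  'aws.region' = 'us-east-2'
+);
+```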
+
+Example for the Partition Key as the only Primary Key:
+```sql
+CREATE TABLE DynamoDbTable (
+  `user_id` BIGINT,
+  `item_id` BIGINT,
+  `category_id` BIGINT,
+  `behavior` STRING,
+  PRIMARY KEY (user_id) NOT ENFORCED
+)
+WITH (
+  'connector' = 'dynamodb',
+  'table-name' = 'user_behavior',
+  'aws.region' = 'us-east-2'
+);
+```
+
+Example for the Partition Key and Sort Key as a Composite Primary Key:
+```sql
+CREATE TABLE DynamoDbTable (
+  `user_id` BIGINT,
+  `item_id` BIGINT,
+  `category_id` BIGINT,
+  `behavior` STRING,
+  PRIMARY KEY (user_id, item_id) NOT ENFORCED
+)
+WITH (
+  'connector' = 'dynamodb',
+  'table-name' = 'user_behavior',
+  'aws.region' = 'us-east-2'
+);
+```
+
+Note that this Primary Key functionality, specified by `PRIMARY KEY`, can be used alongside the sink partitioning mentioned above via `PARTITIONED BY` to deduplicate data and support DELETEs.
+
 ## Notice

 The current implementation of the DynamoDB SQL connector is write-only and doesn't provide an implementation for source queries.
diff --git a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/DynamoDbDynamicSink.java b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/DynamoDbDynamicSink.java
index d79cd786a..98bfc242e 100644
--- a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/DynamoDbDynamicSink.java
+++ b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/DynamoDbDynamicSink.java
@@ -51,6 +51,7 @@ public class DynamoDbDynamicSink extends AsyncDynamicTableSink
     private final Set<String> overwriteByPartitionKeys;
+    private final Set<String> primaryKeys;

     protected DynamoDbDynamicSink(
             @Nullable Integer maxBatchSize,
@@ -62,7 +63,8 @@ protected DynamoDbDynamicSink(
             boolean failOnError,
             Properties dynamoDbClientProperties,
             DataType physicalDataType,
-            Set<String> overwriteByPartitionKeys) {
+            Set<String> overwriteByPartitionKeys,
+            Set<String> primaryKeys) {
         super(
                 maxBatchSize,
                 maxInFlightRequests,
@@ -74,6 +76,7 @@ protected DynamoDbDynamicSink(
         this.dynamoDbClientProperties = dynamoDbClientProperties;
         this.physicalDataType = physicalDataType;
         this.overwriteByPartitionKeys = overwriteByPartitionKeys;
+        this.primaryKeys = primaryKeys;
     }

     @Override
@@ -89,7 +92,8 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
                         .setFailOnError(failOnError)
                         .setOverwriteByPartitionKeys(new ArrayList<>(overwriteByPartitionKeys))
                         .setDynamoDbProperties(dynamoDbClientProperties)
-                        .setElementConverter(new RowDataElementConverter(physicalDataType));
+                        .setElementConverter(
+                                new RowDataElementConverter(physicalDataType, primaryKeys));

         addAsyncOptionsToSinkBuilder(builder);

@@ -108,7 +112,8 @@ public DynamicTableSink copy() {
                 failOnError,
                 dynamoDbClientProperties,
                 physicalDataType,
-                overwriteByPartitionKeys);
+                overwriteByPartitionKeys,
+                primaryKeys);
     }

     @Override
@@ -136,6 +141,7 @@ public static class DynamoDbDynamicTableSinkBuilder
         private Properties dynamoDbClientProperties;
         private DataType physicalDataType;
         private Set<String> overwriteByPartitionKeys;
+        private Set<String> primaryKeys;

         public DynamoDbDynamicTableSinkBuilder setTableName(String tableName) {
             this.tableName = tableName;
@@ -164,6 +170,11 @@ public DynamoDbDynamicTableSinkBuilder setOverwriteByPartitionKeys(
             return this;
         }

+        public DynamoDbDynamicTableSinkBuilder setPrimaryKeys(Set<String> primaryKeys) {
+            this.primaryKeys = primaryKeys;
+            return this;
+        }
+
         @Override
         public AsyncDynamicTableSink<DynamoDbWriteRequest> build() {
             return new DynamoDbDynamicSink(
@@ -176,7 +187,8 @@ public AsyncDynamicTableSink<DynamoDbWriteRequest> build() {
                     failOnError,
                     dynamoDbClientProperties,
                     physicalDataType,
-                    overwriteByPartitionKeys);
+                    overwriteByPartitionKeys,
+                    primaryKeys);
         }
     }
 }
diff --git a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/DynamoDbDynamicSinkFactory.java b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/DynamoDbDynamicSinkFactory.java
index 008828cc2..b68753b12 100644
--- a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/DynamoDbDynamicSinkFactory.java
+++ b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/DynamoDbDynamicSinkFactory.java
@@ -58,6 +58,17 @@ public DynamicTableSink createDynamicTableSink(Context context) {
                         .setDynamoDbClientProperties(
                                 dynamoDbConfiguration.getSinkClientProperties());

+        if (catalogTable.getResolvedSchema().getPrimaryKey().isPresent()) {
+            builder =
+                    builder.setPrimaryKeys(
+                            new HashSet<>(
+                                    catalogTable
+                                            .getResolvedSchema()
+                                            .getPrimaryKey()
+                                            .get()
+                                            .getColumns()));
+        }
+
         addAsyncOptionsToBuilder(dynamoDbConfiguration.getAsyncSinkProperties(), builder);

         return builder.build();
diff --git a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/RowDataElementConverter.java b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/RowDataElementConverter.java
index d0c4c90fd..9b17c13fb 100644
--- a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/RowDataElementConverter.java
+++ b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/RowDataElementConverter.java
@@ -27,6 +27,8 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.DataType;

+import java.util.Set;
+
 /**
  * Implementation of an {@link ElementConverter} for the DynamoDb Table sink.
  * The element converter maps the Flink internal type of {@link RowData} to a {@link DynamoDbWriteRequest} to be used by
@@ -36,19 +38,21 @@ public class RowDataElementConverter implements ElementConverter<RowData, DynamoDbWriteRequest> {

     private final DataType physicalDataType;
+    private final Set<String> primaryKeys;
     private transient RowDataToAttributeValueConverter rowDataToAttributeValueConverter;

-    public RowDataElementConverter(DataType physicalDataType) {
+    public RowDataElementConverter(DataType physicalDataType, Set<String> primaryKeys) {
         this.physicalDataType = physicalDataType;
+        this.primaryKeys = primaryKeys;
         this.rowDataToAttributeValueConverter =
-                new RowDataToAttributeValueConverter(physicalDataType);
+                new RowDataToAttributeValueConverter(physicalDataType, primaryKeys);
     }

     @Override
     public DynamoDbWriteRequest apply(RowData element, SinkWriter.Context context) {
         if (rowDataToAttributeValueConverter == null) {
             rowDataToAttributeValueConverter =
-                    new RowDataToAttributeValueConverter(physicalDataType);
+                    new RowDataToAttributeValueConverter(physicalDataType, primaryKeys);
         }

         DynamoDbWriteRequest.Builder builder =
diff --git a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/RowDataToAttributeValueConverter.java b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/RowDataToAttributeValueConverter.java
index 1b7a05e45..8db4c5cbc 100644
--- a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/RowDataToAttributeValueConverter.java
+++ b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/RowDataToAttributeValueConverter.java
@@ -21,10 +21,12 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.connector.dynamodb.table.converter.ArrayAttributeConverterProvider;
 import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.conversion.DataStructureConverters;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.KeyValueDataType;
+import org.apache.flink.types.RowKind;

 import software.amazon.awssdk.enhanced.dynamodb.AttributeConverterProvider;
 import software.amazon.awssdk.enhanced.dynamodb.EnhancedType;
@@ -32,8 +34,11 @@ import software.amazon.awssdk.enhanced.dynamodb.mapper.StaticTableSchema;
 import software.amazon.awssdk.services.dynamodb.model.AttributeValue;

+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;

 import static org.apache.flink.table.data.RowData.createFieldGetter;

@@ -43,14 +48,39 @@ public class RowDataToAttributeValueConverter {

     private final DataType physicalDataType;
     private final TableSchema tableSchema;
+    private final Set<String> primaryKeys;

     public RowDataToAttributeValueConverter(DataType physicalDataType) {
+        this(physicalDataType, Collections.emptySet());
+    }
+
+    public RowDataToAttributeValueConverter(DataType physicalDataType, Set<String> primaryKeys) {
         this.physicalDataType = physicalDataType;
+        this.primaryKeys = primaryKeys;
         this.tableSchema = createTableSchema();
     }

     public Map<String, AttributeValue> convertRowData(RowData row) {
-        return tableSchema.itemToMap(row, false);
+        Map<String, AttributeValue> itemMap = tableSchema.itemToMap(row, false);
+
+        // In case of DELETE, only the primary key field(s) should be sent in the request.
+        // In order to accomplish this, the PRIMARY KEY fields must have
been set in Table + // definition. + if (row.getRowKind() == RowKind.DELETE) { + if (primaryKeys == null || primaryKeys.isEmpty()) { + throw new TableException( + "PRIMARY KEY on Table must be set for DynamoDB DELETE operation"); + } + Map pkOnlyMap = new HashMap(); + for (String key : primaryKeys) { + AttributeValue value = itemMap.get(key); + pkOnlyMap.put(key, value); + } + return pkOnlyMap; + } else { + return itemMap; + } } private StaticTableSchema createTableSchema() { diff --git a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/table/RowDataElementConverterTest.java b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/table/RowDataElementConverterTest.java index 1f92bf180..79db51176 100644 --- a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/table/RowDataElementConverterTest.java +++ b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/table/RowDataElementConverterTest.java @@ -33,6 +33,8 @@ import org.junit.jupiter.api.Test; import java.io.IOException; +import java.util.Collections; +import java.util.Set; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatExceptionOfType; @@ -45,10 +47,16 @@ public class RowDataElementConverterTest { DataTypes.FIELD("partition_key", DataTypes.STRING()), DataTypes.FIELD("payload", DataTypes.STRING())); private static final RowDataElementConverter elementConverter = - new RowDataElementConverter(DATA_TYPE); + new RowDataElementConverter(DATA_TYPE, null); private static final SinkWriter.Context context = new UnusedSinkWriterContext(); private static final RowDataToAttributeValueConverter rowDataToAttributeValueConverter = - new RowDataToAttributeValueConverter(DATA_TYPE); + new RowDataToAttributeValueConverter(DATA_TYPE, null); + + private static final Set primaryKeys = Collections.singleton("partition_key"); + private static final RowDataElementConverter elementConverterWithPK = + new RowDataElementConverter(DATA_TYPE, primaryKeys); + private static final RowDataToAttributeValueConverter rowDataToAttributeValueConverterWithPK = + new RowDataToAttributeValueConverter(DATA_TYPE, primaryKeys); @Test void testInsert() { @@ -91,11 +99,12 @@ void testUpdateBeforeIsUnsupported() { @Test void testDelete() { RowData rowData = createElement(RowKind.DELETE); - DynamoDbWriteRequest actualWriteRequest = elementConverter.apply(rowData, context); + // In case of DELETE, a set of Primary Key(s) is required. 
+        DynamoDbWriteRequest actualWriteRequest = elementConverterWithPK.apply(rowData, context);
         DynamoDbWriteRequest expectedWriterequest =
                 DynamoDbWriteRequest.builder()
                         .setType(DynamoDbWriteRequestType.DELETE)
-                        .setItem(rowDataToAttributeValueConverter.convertRowData(rowData))
+                        .setItem(rowDataToAttributeValueConverterWithPK.convertRowData(rowData))
                         .build();

         assertThat(actualWriteRequest).usingRecursiveComparison().isEqualTo(expectedWriterequest);
@@ -106,7 +115,7 @@ void testAttributeConverterReinitializedAfterSerialization()
             throws IOException, ClassNotFoundException {
         RowData rowData = createElement(RowKind.INSERT);
-        RowDataElementConverter originalConverter = new RowDataElementConverter(DATA_TYPE);
+        RowDataElementConverter originalConverter = new RowDataElementConverter(DATA_TYPE, null);
         RowDataElementConverter transformedConverter =
                 InstantiationUtil.deserializeObject(
                         InstantiationUtil.serializeObject(originalConverter),
diff --git a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/table/RowDataToAttributeValueConverterTest.java b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/table/RowDataToAttributeValueConverterTest.java
index 5566ae103..4ec61ff19 100644
--- a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/table/RowDataToAttributeValueConverterTest.java
+++ b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/table/RowDataToAttributeValueConverterTest.java
@@ -19,6 +19,7 @@ package org.apache.flink.connector.dynamodb.table;

 import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.data.DecimalData;
 import org.apache.flink.table.data.GenericArrayData;
 import org.apache.flink.table.data.GenericRowData;
@@ -26,6 +27,7 @@ import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.data.TimestampData;
 import org.apache.flink.table.types.DataType;
+import org.apache.flink.types.RowKind;

 import org.junit.jupiter.api.Test;
 import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
@@ -34,12 +36,17 @@ import java.time.Instant;
 import java.time.LocalDateTime;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;

 import static java.util.Collections.singletonMap;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertThrows;

 /** Test for {@link RowDataToAttributeValueConverter}. */
 public class RowDataToAttributeValueConverterTest {
@@ -547,12 +554,176 @@ void testInstantArray() {
         assertThat(actualResult).containsAllEntriesOf(expectedResult);
     }

+    @Test
+    void testDeleteExceptionWhenNoPrimaryKey() {
+        String key = "key";
+        String value = "some_string";
+        String otherField = "other_field";
+        String otherValue = "other_value";
+        Set<String> primaryKeys = Collections.emptySet();
+
+        // Create a Row with two fields - "key" and "otherField" - but configure no primary keys.
+        DataType dataType =
+                DataTypes.ROW(
+                        DataTypes.FIELD(key, DataTypes.STRING()),
+                        DataTypes.FIELD(otherField, DataTypes.STRING()));
+        RowDataToAttributeValueConverter rowDataToAttributeValueConverter =
+                new RowDataToAttributeValueConverter(dataType, primaryKeys);
+        RowData rowData =
+                createElementWithMultipleFields(
+                        StringData.fromString(value), StringData.fromString(otherValue));
+        rowData.setRowKind(RowKind.DELETE);
+
+        assertThrows(
+                TableException.class,
+                () -> {
+                    rowDataToAttributeValueConverter.convertRowData(rowData);
+                });
+    }
+
+    @Test
+    void testDeleteOnlyPrimaryKey() {
+        String key = "key";
+        String value = "some_string";
+        String otherField = "other_field";
+        String otherValue = "other_value";
+        Set<String> primaryKeys = new HashSet<>(Collections.singletonList(key));
+
+        // Create a Row with two fields - "key" and "otherField". "key" is the single primary key.
+        // For a Delete request, only "key" should be included in the expectedResult, and not
+        // "otherField".
+        DataType dataType =
+                DataTypes.ROW(
+                        DataTypes.FIELD(key, DataTypes.STRING()),
+                        DataTypes.FIELD(otherField, DataTypes.STRING()));
+        RowDataToAttributeValueConverter rowDataToAttributeValueConverter =
+                new RowDataToAttributeValueConverter(dataType, primaryKeys);
+        RowData rowData =
+                createElementWithMultipleFields(
+                        StringData.fromString(value), StringData.fromString(otherValue));
+        rowData.setRowKind(RowKind.DELETE);
+
+        Map<String, AttributeValue> actualResult =
+                rowDataToAttributeValueConverter.convertRowData(rowData);
+        Map<String, AttributeValue> expectedResult =
+                singletonMap(key, AttributeValue.builder().s(value).build());
+
+        assertThat(actualResult).containsExactlyInAnyOrderEntriesOf(expectedResult);
+    }
+
+    @Test
+    void testDeleteOnlyPrimaryKeys() {
+        String key = "key";
+        String value = "some_string";
+        String additionalKey = "additional_key";
+        String additionalValue = "additional_value";
+        String otherField = "other_field";
+        String otherValue = "other_value";
+        Set<String> primaryKeys = new HashSet<>();
+        primaryKeys.add(key);
+        primaryKeys.add(additionalKey);
+
+        // Create a Row with three fields - "key", "additional_key", and "otherField".
+        // "key" and "additional_key" make up the composite primary key.
+        // For a Delete request, only "key" and "additional_key" should be included in the
+        // expectedResult, and not "otherField".
+        DataType dataType =
+                DataTypes.ROW(
+                        DataTypes.FIELD(key, DataTypes.STRING()),
+                        DataTypes.FIELD(additionalKey, DataTypes.STRING()),
+                        DataTypes.FIELD(otherField, DataTypes.STRING()));
+        RowDataToAttributeValueConverter rowDataToAttributeValueConverter =
+                new RowDataToAttributeValueConverter(dataType, primaryKeys);
+        RowData rowData =
+                createElementWithMultipleFields(
+                        StringData.fromString(value),
+                        StringData.fromString(additionalValue),
+                        StringData.fromString(otherValue));
+        rowData.setRowKind(RowKind.DELETE);
+
+        Map<String, AttributeValue> actualResult =
+                rowDataToAttributeValueConverter.convertRowData(rowData);
+        Map<String, AttributeValue> expectedResult = new HashMap<>();
+        expectedResult.put(key, AttributeValue.builder().s(value).build());
+        expectedResult.put(additionalKey, AttributeValue.builder().s(additionalValue).build());
+
+        assertThat(actualResult).containsExactlyInAnyOrderEntriesOf(expectedResult);
+    }
+
+    @Test
+    void testPKIgnoredForInsert() {
+        String key = "key";
+        String value = "some_string";
+        String otherField = "other_field";
+        String otherValue = "other_value";
+        Set<String> primaryKeys = new HashSet<>(Collections.singletonList(key));
+
+        // Create a Row with two fields - "key" and "otherField". "key" is the primary key.
+        // For an Insert request, all fields should be included regardless of the Primary Key.
+        DataType dataType =
+                DataTypes.ROW(
+                        DataTypes.FIELD(key, DataTypes.STRING()),
+                        DataTypes.FIELD(otherField, DataTypes.STRING()));
+        RowDataToAttributeValueConverter rowDataToAttributeValueConverter =
+                new RowDataToAttributeValueConverter(dataType, primaryKeys);
+        RowData rowData =
+                createElementWithMultipleFields(
+                        StringData.fromString(value), StringData.fromString(otherValue));
+        rowData.setRowKind(RowKind.INSERT);
+
+        Map<String, AttributeValue> actualResult =
+                rowDataToAttributeValueConverter.convertRowData(rowData);
+        Map<String, AttributeValue> expectedResult = new HashMap<>();
+        expectedResult.put(key, AttributeValue.builder().s(value).build());
+        expectedResult.put(otherField, AttributeValue.builder().s(otherValue).build());
+
+        assertThat(actualResult).containsExactlyInAnyOrderEntriesOf(expectedResult);
+    }
+
+    @Test
+    void testPKIgnoredForUpdateAfter() {
+        String key = "key";
+        String value = "some_string";
+        String otherField = "other_field";
+        String otherValue = "other_value";
+        Set<String> primaryKeys = Collections.singleton(key);
+
+        // Create a Row with two fields - "key" and "otherField". "key" is the primary key.
+        // For an UPDATE_AFTER request, all fields should be included regardless of the Primary
+        // Key.
+        DataType dataType =
+                DataTypes.ROW(
+                        DataTypes.FIELD(key, DataTypes.STRING()),
+                        DataTypes.FIELD(otherField, DataTypes.STRING()));
+        RowDataToAttributeValueConverter rowDataToAttributeValueConverter =
+                new RowDataToAttributeValueConverter(dataType, primaryKeys);
+        RowData rowData =
+                createElementWithMultipleFields(
+                        StringData.fromString(value), StringData.fromString(otherValue));
+        rowData.setRowKind(RowKind.UPDATE_AFTER);
+
+        Map<String, AttributeValue> actualResult =
+                rowDataToAttributeValueConverter.convertRowData(rowData);
+        Map<String, AttributeValue> expectedResult = new HashMap<>();
+        expectedResult.put(key, AttributeValue.builder().s(value).build());
+        expectedResult.put(otherField, AttributeValue.builder().s(otherValue).build());
+
+        assertThat(actualResult).containsExactlyInAnyOrderEntriesOf(expectedResult);
+    }
+
     private RowData createElement(Object value) {
         GenericRowData element = new GenericRowData(1);
         element.setField(0, value);
         return element;
     }

+    private RowData createElementWithMultipleFields(Object... values) {
+        GenericRowData element = new GenericRowData(values.length);
+        for (int i = 0; i < values.length; i++) {
+            element.setField(i, values[i]);
+        }
+        return element;
+    }
+
     private <T> RowData createArray(T[] value, Function<T, Object> elementConverter) {
         return createElement(
                 new GenericArrayData(Arrays.stream(value).map(elementConverter).toArray()));