From 07e57461ba2432ae59c9b6376205da5ef284ae9b Mon Sep 17 00:00:00 2001 From: Rob Goretsky Date: Wed, 24 Jul 2024 15:47:24 -0400 Subject: [PATCH 1/6] [FLINK-35500][Connectors/DynamoDB] DynamoDb Table API Sink fails to delete elements due to key not found --- .../dynamodb/table/DynamoDbDynamicSink.java | 19 +- .../table/DynamoDbDynamicSinkFactory.java | 4 + .../table/RowDataElementConverter.java | 10 +- .../RowDataToAttributeValueConverter.java | 29 ++- .../table/RowDataElementConverterTest.java | 20 +- .../RowDataToAttributeValueConverterTest.java | 171 +++++++++++++++--- 6 files changed, 216 insertions(+), 37 deletions(-) diff --git a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/DynamoDbDynamicSink.java b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/DynamoDbDynamicSink.java index d79cd786a..995333656 100644 --- a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/DynamoDbDynamicSink.java +++ b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/DynamoDbDynamicSink.java @@ -51,6 +51,7 @@ public class DynamoDbDynamicSink extends AsyncDynamicTableSink<DynamoDbWriteRequest> private final Set<String> overwriteByPartitionKeys; + private final Set<String> primaryKeys; protected DynamoDbDynamicSink( @Nullable Integer maxBatchSize, @@ -62,7 +63,8 @@ protected DynamoDbDynamicSink( boolean failOnError, Properties dynamoDbClientProperties, DataType physicalDataType, - Set<String> overwriteByPartitionKeys) { + Set<String> overwriteByPartitionKeys, + Set<String> primaryKeys) { super( maxBatchSize, maxInFlightRequests, @@ -74,6 +76,7 @@ protected DynamoDbDynamicSink( this.dynamoDbClientProperties = dynamoDbClientProperties; this.physicalDataType = physicalDataType; this.overwriteByPartitionKeys = overwriteByPartitionKeys; + this.primaryKeys = primaryKeys; } @Override @@ -89,7 +92,7 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) { .setFailOnError(failOnError) .setOverwriteByPartitionKeys(new ArrayList<>(overwriteByPartitionKeys)) .setDynamoDbProperties(dynamoDbClientProperties) - .setElementConverter(new RowDataElementConverter(physicalDataType)); + .setElementConverter(new RowDataElementConverter(physicalDataType, primaryKeys)); addAsyncOptionsToSinkBuilder(builder); @@ -108,7 +111,8 @@ public DynamicTableSink copy() { failOnError, dynamoDbClientProperties, physicalDataType, - overwriteByPartitionKeys); + overwriteByPartitionKeys, + primaryKeys); } @Override @@ -136,6 +140,7 @@ public static class DynamoDbDynamicTableSinkBuilder private Properties dynamoDbClientProperties; private DataType physicalDataType; private Set<String> overwriteByPartitionKeys; + private Set<String> primaryKeys; public DynamoDbDynamicTableSinkBuilder setTableName(String tableName) { this.tableName = tableName; @@ -164,6 +169,11 @@ public DynamoDbDynamicTableSinkBuilder setOverwriteByPartitionKeys( return this; } + public DynamoDbDynamicTableSinkBuilder setPrimaryKeys(Set<String> primaryKeys) { + this.primaryKeys = primaryKeys; + return this; + } + @Override public AsyncDynamicTableSink<DynamoDbWriteRequest> build() { return new DynamoDbDynamicSink( @@ -176,7 +186,8 @@ public AsyncDynamicTableSink<DynamoDbWriteRequest> build() { failOnError, dynamoDbClientProperties, physicalDataType, - overwriteByPartitionKeys); + overwriteByPartitionKeys, + primaryKeys); } } } diff --git a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/DynamoDbDynamicSinkFactory.java 
b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/DynamoDbDynamicSinkFactory.java index 008828cc2..5aa9bac9b 100644 --- a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/DynamoDbDynamicSinkFactory.java +++ b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/DynamoDbDynamicSinkFactory.java @@ -58,6 +58,10 @@ public DynamicTableSink createDynamicTableSink(Context context) { .setDynamoDbClientProperties( dynamoDbConfiguration.getSinkClientProperties()); + if (catalogTable.getResolvedSchema().getPrimaryKey().isPresent()){ + builder = builder.setPrimaryKeys(new HashSet<>(catalogTable.getResolvedSchema().getPrimaryKey().get().getColumns())); + } + addAsyncOptionsToBuilder(dynamoDbConfiguration.getAsyncSinkProperties(), builder); return builder.build(); diff --git a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/RowDataElementConverter.java b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/RowDataElementConverter.java index d0c4c90fd..9b17c13fb 100644 --- a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/RowDataElementConverter.java +++ b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/RowDataElementConverter.java @@ -27,6 +27,8 @@ import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.DataType; +import java.util.Set; + /** * Implementation of an {@link ElementConverter} for the DynamoDb Table sink. The element converter * maps the Flink internal type of {@link RowData} to a {@link DynamoDbWriteRequest} to be used by * @@ -36,19 +38,21 @@ public class RowDataElementConverter implements ElementConverter<RowData, DynamoDbWriteRequest> { private final DataType physicalDataType; + private final Set<String> primaryKeys; private transient RowDataToAttributeValueConverter rowDataToAttributeValueConverter; - public RowDataElementConverter(DataType physicalDataType) { + public RowDataElementConverter(DataType physicalDataType, Set<String> primaryKeys) { this.physicalDataType = physicalDataType; + this.primaryKeys = primaryKeys; this.rowDataToAttributeValueConverter = - new RowDataToAttributeValueConverter(physicalDataType); + new RowDataToAttributeValueConverter(physicalDataType, primaryKeys); } @Override public DynamoDbWriteRequest apply(RowData element, SinkWriter.Context context) { if (rowDataToAttributeValueConverter == null) { rowDataToAttributeValueConverter = - new RowDataToAttributeValueConverter(physicalDataType); + new RowDataToAttributeValueConverter(physicalDataType, primaryKeys); } DynamoDbWriteRequest.Builder builder = diff --git a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/RowDataToAttributeValueConverter.java b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/RowDataToAttributeValueConverter.java index 1b7a05e45..0fbc485ce 100644 --- a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/RowDataToAttributeValueConverter.java +++ b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/RowDataToAttributeValueConverter.java @@ -21,10 +21,12 @@ import org.apache.flink.annotation.Internal; import 
org.apache.flink.connector.dynamodb.table.converter.ArrayAttributeConverterProvider; import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.TableException; import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.conversion.DataStructureConverters; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.KeyValueDataType; +import org.apache.flink.types.RowKind; import software.amazon.awssdk.enhanced.dynamodb.AttributeConverterProvider; import software.amazon.awssdk.enhanced.dynamodb.EnhancedType; @@ -32,8 +34,10 @@ import software.amazon.awssdk.enhanced.dynamodb.mapper.StaticTableSchema; import software.amazon.awssdk.services.dynamodb.model.AttributeValue; +import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import static org.apache.flink.table.data.RowData.createFieldGetter; @@ -43,14 +47,35 @@ public class RowDataToAttributeValueConverter { private final DataType physicalDataType; private final TableSchema<RowData> tableSchema; + private final Set<String> primaryKeys; - public RowDataToAttributeValueConverter(DataType physicalDataType) { + public RowDataToAttributeValueConverter(DataType physicalDataType, Set<String> primaryKeys) { this.physicalDataType = physicalDataType; + this.primaryKeys = primaryKeys; this.tableSchema = createTableSchema(); } public Map<String, AttributeValue> convertRowData(RowData row) { - return tableSchema.itemToMap(row, false); + Map<String, AttributeValue> itemMap = new HashMap<>(); + itemMap = tableSchema.itemToMap(row, false); + + // In case of DELETE, only the primary key field(s) should be sent in the request + // In order to accomplish this, we need PRIMARY KEY fields to have been set in Table definition. + if (row.getRowKind() == RowKind.DELETE){ + if (primaryKeys == null || primaryKeys.isEmpty()) { + throw new TableException("PRIMARY KEY on Table must be set for DynamoDB DELETE operation"); + } + Map<String, AttributeValue> pkOnlyMap = new HashMap<String, AttributeValue>(); + for (String key : primaryKeys) { + AttributeValue value = itemMap.get(key); + pkOnlyMap.put(key, value); + } + return pkOnlyMap; + } + else { + return itemMap; + } + } private StaticTableSchema<RowData> createTableSchema() { diff --git a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/table/RowDataElementConverterTest.java b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/table/RowDataElementConverterTest.java index 1f92bf180..998cfd7be 100644 --- a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/table/RowDataElementConverterTest.java +++ b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/table/RowDataElementConverterTest.java @@ -33,6 +33,9 @@ import org.junit.jupiter.api.Test; import java.io.IOException; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatExceptionOfType; @@ -45,10 +48,16 @@ public class RowDataElementConverterTest { DataTypes.FIELD("partition_key", DataTypes.STRING()), DataTypes.FIELD("payload", DataTypes.STRING())); private static final RowDataElementConverter elementConverter = - new RowDataElementConverter(DATA_TYPE); + new RowDataElementConverter(DATA_TYPE, null); private static final SinkWriter.Context context = new UnusedSinkWriterContext(); private static final RowDataToAttributeValueConverter rowDataToAttributeValueConverter = - new 
RowDataToAttributeValueConverter(DATA_TYPE); + new RowDataToAttributeValueConverter(DATA_TYPE, null); + + private static final Set<String> primaryKeys = new HashSet<>(Collections.singletonList("partition_key")); + private static final RowDataElementConverter elementConverterWithPK = + new RowDataElementConverter(DATA_TYPE, primaryKeys); + private static final RowDataToAttributeValueConverter rowDataToAttributeValueConverterWithPK = + new RowDataToAttributeValueConverter(DATA_TYPE, primaryKeys); @Test void testInsert() { @@ -91,11 +100,12 @@ void testUpdateBeforeIsUnsupported() { @Test void testDelete() { RowData rowData = createElement(RowKind.DELETE); - DynamoDbWriteRequest actualWriteRequest = elementConverter.apply(rowData, context); + // In case of DELETE, a set of Primary Key(s) is required. + DynamoDbWriteRequest actualWriteRequest = elementConverterWithPK.apply(rowData, context); DynamoDbWriteRequest expectedWriterequest = DynamoDbWriteRequest.builder() .setType(DynamoDbWriteRequestType.DELETE) - .setItem(rowDataToAttributeValueConverter.convertRowData(rowData)) + .setItem(rowDataToAttributeValueConverterWithPK.convertRowData(rowData)) .build(); assertThat(actualWriteRequest).usingRecursiveComparison().isEqualTo(expectedWriterequest); @@ -106,7 +116,7 @@ void testAttributeConverterReinitializedAfterSerialization() throws IOException, ClassNotFoundException { RowData rowData = createElement(RowKind.INSERT); - RowDataElementConverter originalConverter = new RowDataElementConverter(DATA_TYPE); + RowDataElementConverter originalConverter = new RowDataElementConverter(DATA_TYPE, null); RowDataElementConverter transformedConverter = InstantiationUtil.deserializeObject( InstantiationUtil.serializeObject(originalConverter), diff --git a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/table/RowDataToAttributeValueConverterTest.java b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/table/RowDataToAttributeValueConverterTest.java index 5566ae103..d81c428f1 100644 --- a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/table/RowDataToAttributeValueConverterTest.java +++ b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/table/RowDataToAttributeValueConverterTest.java @@ -26,6 +26,7 @@ import org.apache.flink.table.data.StringData; import org.apache.flink.table.data.TimestampData; import org.apache.flink.table.types.DataType; +import org.apache.flink.types.RowKind; import org.junit.jupiter.api.Test; import software.amazon.awssdk.services.dynamodb.model.AttributeValue; @@ -34,7 +35,11 @@ import java.time.Instant; import java.time.LocalDateTime; import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; import java.util.Map; +import java.util.Set; import java.util.function.Function; import java.util.stream.Collectors; @@ -51,7 +56,7 @@ void testChar() { DataType dataType = DataTypes.ROW(DataTypes.FIELD(key, DataTypes.CHAR(9))); RowDataToAttributeValueConverter rowDataToAttributeValueConverter = - new RowDataToAttributeValueConverter(dataType); + new RowDataToAttributeValueConverter(dataType, null); Map actualResult = rowDataToAttributeValueConverter.convertRowData( createElement(StringData.fromString(value))); @@ -68,7 +73,7 @@ void testVarChar() { DataType dataType = DataTypes.ROW(DataTypes.FIELD(key, DataTypes.VARCHAR(13))); RowDataToAttributeValueConverter 
rowDataToAttributeValueConverter = - new RowDataToAttributeValueConverter(dataType); + new RowDataToAttributeValueConverter(dataType, null); Map actualResult = rowDataToAttributeValueConverter.convertRowData( createElement(StringData.fromString(value))); @@ -85,7 +90,7 @@ void testString() { DataType dataType = DataTypes.ROW(DataTypes.FIELD(key, DataTypes.STRING())); RowDataToAttributeValueConverter rowDataToAttributeValueConverter = - new RowDataToAttributeValueConverter(dataType); + new RowDataToAttributeValueConverter(dataType, null); Map actualResult = rowDataToAttributeValueConverter.convertRowData( createElement(StringData.fromString(value))); @@ -102,7 +107,7 @@ void testBoolean() { DataType dataType = DataTypes.ROW(DataTypes.FIELD(key, DataTypes.BOOLEAN())); RowDataToAttributeValueConverter rowDataToAttributeValueConverter = - new RowDataToAttributeValueConverter(dataType); + new RowDataToAttributeValueConverter(dataType, null); Map actualResult = rowDataToAttributeValueConverter.convertRowData(createElement(value)); Map expectedResult = @@ -118,7 +123,7 @@ void testDecimal() { DataType dataType = DataTypes.ROW(DataTypes.FIELD(key, DataTypes.DECIMAL(5, 4))); RowDataToAttributeValueConverter rowDataToAttributeValueConverter = - new RowDataToAttributeValueConverter(dataType); + new RowDataToAttributeValueConverter(dataType, null); Map actualResult = rowDataToAttributeValueConverter.convertRowData( createElement(DecimalData.fromBigDecimal(value, 5, 4))); @@ -135,7 +140,7 @@ void testTinyInt() { DataType dataType = DataTypes.ROW(DataTypes.FIELD(key, DataTypes.TINYINT())); RowDataToAttributeValueConverter rowDataToAttributeValueConverter = - new RowDataToAttributeValueConverter(dataType); + new RowDataToAttributeValueConverter(dataType, null); Map actualResult = rowDataToAttributeValueConverter.convertRowData(createElement(value)); Map expectedResult = @@ -151,7 +156,7 @@ void testSmallInt() { DataType dataType = DataTypes.ROW(DataTypes.FIELD(key, DataTypes.SMALLINT())); RowDataToAttributeValueConverter rowDataToAttributeValueConverter = - new RowDataToAttributeValueConverter(dataType); + new RowDataToAttributeValueConverter(dataType, null); Map actualResult = rowDataToAttributeValueConverter.convertRowData(createElement(value)); Map expectedResult = @@ -167,7 +172,7 @@ void testInt() { DataType dataType = DataTypes.ROW(DataTypes.FIELD(key, DataTypes.INT())); RowDataToAttributeValueConverter rowDataToAttributeValueConverter = - new RowDataToAttributeValueConverter(dataType); + new RowDataToAttributeValueConverter(dataType, null); Map actualResult = rowDataToAttributeValueConverter.convertRowData(createElement(value)); Map expectedResult = @@ -183,7 +188,7 @@ void testBigInt() { DataType dataType = DataTypes.ROW(DataTypes.FIELD(key, DataTypes.BIGINT())); RowDataToAttributeValueConverter rowDataToAttributeValueConverter = - new RowDataToAttributeValueConverter(dataType); + new RowDataToAttributeValueConverter(dataType, null); Map actualResult = rowDataToAttributeValueConverter.convertRowData(createElement(value)); Map expectedResult = @@ -199,7 +204,7 @@ void testFloat() { DataType dataType = DataTypes.ROW(DataTypes.FIELD(key, DataTypes.FLOAT())); RowDataToAttributeValueConverter rowDataToAttributeValueConverter = - new RowDataToAttributeValueConverter(dataType); + new RowDataToAttributeValueConverter(dataType, null); Map actualResult = rowDataToAttributeValueConverter.convertRowData(createElement(value)); Map expectedResult = @@ -215,7 +220,7 @@ void testDouble() { DataType dataType = 
DataTypes.ROW(DataTypes.FIELD(key, DataTypes.DOUBLE())); RowDataToAttributeValueConverter rowDataToAttributeValueConverter = - new RowDataToAttributeValueConverter(dataType); + new RowDataToAttributeValueConverter(dataType, null); Map actualResult = rowDataToAttributeValueConverter.convertRowData(createElement(value)); Map expectedResult = @@ -231,7 +236,7 @@ void testTimestamp() { DataType dataType = DataTypes.ROW(DataTypes.FIELD(key, DataTypes.TIMESTAMP())); RowDataToAttributeValueConverter rowDataToAttributeValueConverter = - new RowDataToAttributeValueConverter(dataType); + new RowDataToAttributeValueConverter(dataType, null); Map actualResult = rowDataToAttributeValueConverter.convertRowData( createElement(TimestampData.fromLocalDateTime(value))); @@ -249,7 +254,7 @@ void testStringArray() { DataType dataType = DataTypes.ROW(DataTypes.FIELD(key, DataTypes.ARRAY(DataTypes.STRING()))); RowDataToAttributeValueConverter rowDataToAttributeValueConverter = - new RowDataToAttributeValueConverter(dataType); + new RowDataToAttributeValueConverter(dataType, null); Map actualResult = rowDataToAttributeValueConverter.convertRowData( createArray(value, StringData::fromString)); @@ -274,7 +279,7 @@ void testBooleanArray() { DataType dataType = DataTypes.ROW(DataTypes.FIELD(key, DataTypes.ARRAY(DataTypes.BOOLEAN()))); RowDataToAttributeValueConverter rowDataToAttributeValueConverter = - new RowDataToAttributeValueConverter(dataType); + new RowDataToAttributeValueConverter(dataType, null); Map actualResult = rowDataToAttributeValueConverter.convertRowData(createArray(value, t -> t)); Map expectedResult = @@ -302,7 +307,7 @@ void testBigDecimalArray() { DataType dataType = DataTypes.ROW(DataTypes.FIELD(key, DataTypes.ARRAY(DataTypes.DECIMAL(1, 0)))); RowDataToAttributeValueConverter rowDataToAttributeValueConverter = - new RowDataToAttributeValueConverter(dataType); + new RowDataToAttributeValueConverter(dataType, null); Map actualResult = rowDataToAttributeValueConverter.convertRowData( createArray(value, d -> DecimalData.fromBigDecimal(d, 1, 0))); @@ -331,7 +336,7 @@ void testByteArray() { DataType dataType = DataTypes.ROW(DataTypes.FIELD(key, DataTypes.ARRAY(DataTypes.TINYINT()))); RowDataToAttributeValueConverter rowDataToAttributeValueConverter = - new RowDataToAttributeValueConverter(dataType); + new RowDataToAttributeValueConverter(dataType, null); Map actualResult = rowDataToAttributeValueConverter.convertRowData(createArray(value, t -> t)); Map expectedResult = @@ -359,7 +364,7 @@ void testShortArray() { DataType dataType = DataTypes.ROW(DataTypes.FIELD(key, DataTypes.ARRAY(DataTypes.SMALLINT()))); RowDataToAttributeValueConverter rowDataToAttributeValueConverter = - new RowDataToAttributeValueConverter(dataType); + new RowDataToAttributeValueConverter(dataType, null); Map actualResult = rowDataToAttributeValueConverter.convertRowData(createArray(value, t -> t)); Map expectedResult = @@ -386,7 +391,7 @@ void testIntegerArray() { DataType dataType = DataTypes.ROW(DataTypes.FIELD(key, DataTypes.ARRAY(DataTypes.INT()))); RowDataToAttributeValueConverter rowDataToAttributeValueConverter = - new RowDataToAttributeValueConverter(dataType); + new RowDataToAttributeValueConverter(dataType, null); Map actualResult = rowDataToAttributeValueConverter.convertRowData(createArray(value, t -> t)); Map expectedResult = @@ -414,7 +419,7 @@ void testLongArray() { DataType dataType = DataTypes.ROW(DataTypes.FIELD(key, DataTypes.ARRAY(DataTypes.BIGINT()))); RowDataToAttributeValueConverter 
rowDataToAttributeValueConverter = - new RowDataToAttributeValueConverter(dataType); + new RowDataToAttributeValueConverter(dataType, null); Map actualResult = rowDataToAttributeValueConverter.convertRowData(createArray(value, t -> t)); Map expectedResult = @@ -441,7 +446,7 @@ void testFloatArray() { DataType dataType = DataTypes.ROW(DataTypes.FIELD(key, DataTypes.ARRAY(DataTypes.FLOAT()))); RowDataToAttributeValueConverter rowDataToAttributeValueConverter = - new RowDataToAttributeValueConverter(dataType); + new RowDataToAttributeValueConverter(dataType, null); Map actualResult = rowDataToAttributeValueConverter.convertRowData(createArray(value, t -> t)); Map expectedResult = @@ -469,7 +474,7 @@ void testDoubleArray() { DataType dataType = DataTypes.ROW(DataTypes.FIELD(key, DataTypes.ARRAY(DataTypes.DOUBLE()))); RowDataToAttributeValueConverter rowDataToAttributeValueConverter = - new RowDataToAttributeValueConverter(dataType); + new RowDataToAttributeValueConverter(dataType, null); Map actualResult = rowDataToAttributeValueConverter.convertRowData(createArray(value, t -> t)); Map expectedResult = @@ -497,7 +502,7 @@ void testLocalDateTimeArray() { DataType dataType = DataTypes.ROW(DataTypes.FIELD(key, DataTypes.ARRAY(DataTypes.TIMESTAMP()))); RowDataToAttributeValueConverter rowDataToAttributeValueConverter = - new RowDataToAttributeValueConverter(dataType); + new RowDataToAttributeValueConverter(dataType, null); Map actualResult = rowDataToAttributeValueConverter.convertRowData( createArray(value, TimestampData::fromLocalDateTime)); @@ -526,7 +531,7 @@ void testInstantArray() { DataType dataType = DataTypes.ROW(DataTypes.FIELD(key, DataTypes.ARRAY(DataTypes.TIMESTAMP_LTZ()))); RowDataToAttributeValueConverter rowDataToAttributeValueConverter = - new RowDataToAttributeValueConverter(dataType); + new RowDataToAttributeValueConverter(dataType, null); Map actualResult = rowDataToAttributeValueConverter.convertRowData( createArray(value, TimestampData::fromInstant)); @@ -547,12 +552,132 @@ void testInstantArray() { assertThat(actualResult).containsAllEntriesOf(expectedResult); } + @Test + void testDeleteOnlyPrimaryKey() { + String key = "key"; + String value = "some_string"; + String otherField = "other_field"; + String otherValue = "other_value"; + Set<String> primaryKeys = new HashSet<>(Collections.singletonList(key)); + + // Create a Row with two fields - "key" and "otherField". "key" is the single primary key. + // For a Delete request, only "key" should be included in the expectedResult, and not "otherField". 
+ DataType dataType = DataTypes.ROW(DataTypes.FIELD(key, DataTypes.STRING()), DataTypes.FIELD(otherField, DataTypes.STRING())); + RowDataToAttributeValueConverter rowDataToAttributeValueConverter = + new RowDataToAttributeValueConverter(dataType, primaryKeys); + RowData rowData = createElementWithMultipleFields(StringData.fromString(value), StringData.fromString(otherValue)); + rowData.setRowKind(RowKind.DELETE); + + Map<String, AttributeValue> actualResult = + rowDataToAttributeValueConverter.convertRowData(rowData); + Map<String, AttributeValue> expectedResult = + singletonMap(key, AttributeValue.builder().s(value).build()); + + assertThat(actualResult).containsAllEntriesOf(expectedResult); + assertThat(expectedResult).containsAllEntriesOf(actualResult); + } + + @Test + void testDeleteOnlyPrimaryKeys() { + String key = "key"; + String value = "some_string"; + String additionalKey = "additional_key"; + String additionalValue = "additional_value"; + String otherField = "other_field"; + String otherValue = "other_value"; + Set<String> primaryKeys = new HashSet<>(); + primaryKeys.add(key); + primaryKeys.add(additionalKey); + + // Create a Row with three fields - "key", "additional_key", and "otherField". + // "key" and "additional_key" make up the composite primary key. + // For a Delete request, only "key" and "additional_key" should be included in the expectedResult, and not "otherField". + DataType dataType = DataTypes.ROW( + DataTypes.FIELD(key, DataTypes.STRING()), + DataTypes.FIELD(additionalKey, DataTypes.STRING()), + DataTypes.FIELD(otherField, DataTypes.STRING())); + RowDataToAttributeValueConverter rowDataToAttributeValueConverter = + new RowDataToAttributeValueConverter(dataType, primaryKeys); + RowData rowData = createElementWithMultipleFields( + StringData.fromString(value), StringData.fromString(additionalValue), StringData.fromString(otherValue)); + rowData.setRowKind(RowKind.DELETE); + + Map<String, AttributeValue> actualResult = + rowDataToAttributeValueConverter.convertRowData(rowData); + Map<String, AttributeValue> expectedResult = new HashMap<>(); + expectedResult.put(key, AttributeValue.builder().s(value).build()); + expectedResult.put(additionalKey, AttributeValue.builder().s(additionalValue).build()); + + assertThat(actualResult).containsAllEntriesOf(expectedResult); + assertThat(expectedResult).containsAllEntriesOf(actualResult); + } + + @Test + void testPKIgnoredForInsert() { + String key = "key"; + String value = "some_string"; + String otherField = "other_field"; + String otherValue = "other_value"; + Set<String> primaryKeys = new HashSet<>(Collections.singletonList(key)); + + // Create a Row with two fields - "key" and "otherField". "key" is the primary key. + // For an Insert request, all fields should be included regardless of the Primary Key. 
+ DataType dataType = DataTypes.ROW(DataTypes.FIELD(key, DataTypes.STRING()), DataTypes.FIELD(otherField, DataTypes.STRING())); + RowDataToAttributeValueConverter rowDataToAttributeValueConverter = + new RowDataToAttributeValueConverter(dataType, primaryKeys); + RowData rowData = createElementWithMultipleFields(StringData.fromString(value), StringData.fromString(otherValue)); + rowData.setRowKind(RowKind.INSERT); + + Map<String, AttributeValue> actualResult = + rowDataToAttributeValueConverter.convertRowData(rowData); + Map<String, AttributeValue> expectedResult = new HashMap<>(); + expectedResult.put(key, AttributeValue.builder().s(value).build()); + expectedResult.put(otherField, AttributeValue.builder().s(otherValue).build()); + + assertThat(actualResult).containsAllEntriesOf(expectedResult); + assertThat(expectedResult).containsAllEntriesOf(actualResult); + } + + @Test + void testPKIgnoredForUpdateAfter() { + String key = "key"; + String value = "some_string"; + String otherField = "other_field"; + String otherValue = "other_value"; + Set<String> primaryKeys = new HashSet<>(Collections.singletonList(key)); + + // Create a Row with two fields - "key" and "otherField". "key" is the primary key. + // For an UPDATE_AFTER request, all fields should be included regardless of the Primary Key. + DataType dataType = DataTypes.ROW(DataTypes.FIELD(key, DataTypes.STRING()), DataTypes.FIELD(otherField, DataTypes.STRING())); + RowDataToAttributeValueConverter rowDataToAttributeValueConverter = + new RowDataToAttributeValueConverter(dataType, primaryKeys); + RowData rowData = createElementWithMultipleFields(StringData.fromString(value), StringData.fromString(otherValue)); + rowData.setRowKind(RowKind.UPDATE_AFTER); + + Map<String, AttributeValue> actualResult = + rowDataToAttributeValueConverter.convertRowData(rowData); + Map<String, AttributeValue> expectedResult = new HashMap<>(); + expectedResult.put(key, AttributeValue.builder().s(value).build()); + expectedResult.put(otherField, AttributeValue.builder().s(otherValue).build()); + + assertThat(actualResult).containsAllEntriesOf(expectedResult); + assertThat(expectedResult).containsAllEntriesOf(actualResult); + } private RowData createElement(Object value) { GenericRowData element = new GenericRowData(1); element.setField(0, value); return element; } + private RowData createElementWithMultipleFields(Object... 
values) { + GenericRowData element = new GenericRowData(values.length); + for (int i = 0; i < values.length; i++) { + element.setField(i, values[i]); + } + return element; + } + private <T> RowData createArray(T[] value, Function<T, Object> elementConverter) { return createElement( new GenericArrayData(Arrays.stream(value).map(elementConverter).toArray())); From 68f12537c0e34f795216debe1dfa0a2efcb3af08 Mon Sep 17 00:00:00 2001 From: Rob Goretsky Date: Wed, 24 Jul 2024 16:51:47 -0400 Subject: [PATCH 2/6] Add documentation updates --- .../docs/connectors/table/dynamodb.md | 40 +++++++++++++++++++ .../content/docs/connectors/table/dynamodb.md | 40 +++++++++++++++++++ 2 files changed, 80 insertions(+) diff --git a/docs/content.zh/docs/connectors/table/dynamodb.md b/docs/content.zh/docs/connectors/table/dynamodb.md index ada2a4632..d9daf424d 100644 --- a/docs/content.zh/docs/connectors/table/dynamodb.md +++ b/docs/content.zh/docs/connectors/table/dynamodb.md @@ -302,6 +302,46 @@ WITH ( ); ``` +## Specifying Primary Key For Deletes + +The DynamoDB sink supports Delete requests to DynamoDB, but AWS requires that a DynamoDB Delete request contain **only** the key field(s), or else the Delete request will fail with `DynamoDbException: The provided key element does not match the schema`. +Thus, if a changelog stream that contains DELETEs is being written to DynamoDB, you must specify the `PRIMARY KEY` on the table. +This `PRIMARY KEY` specified for the Flink SQL table must match the actual primary key of the DynamoDB table: either just the Partition Key, or, in the case of a composite primary key, the Partition Key and Sort Key. + +Example for a Partition Key as the only Primary Key: +```sql +CREATE TABLE DynamoDbTable ( + `user_id` BIGINT, + `item_id` BIGINT, + `category_id` BIGINT, + `behavior` STRING, + PRIMARY KEY (user_id) NOT ENFORCED +) +WITH ( + 'connector' = 'dynamodb', + 'table-name' = 'user_behavior', + 'aws.region' = 'us-east-2' +); +``` + +Example for a Partition Key and Sort Key as a Composite Primary Key: +```sql +CREATE TABLE DynamoDbTable ( + `user_id` BIGINT, + `item_id` BIGINT, + `category_id` BIGINT, + `behavior` STRING, + PRIMARY KEY (user_id, item_id) NOT ENFORCED +) +WITH ( + 'connector' = 'dynamodb', + 'table-name' = 'user_behavior', + 'aws.region' = 'us-east-2' +); +``` + +Note that this Primary Key functionality, specified by `PRIMARY KEY`, can be used alongside the Sink Partitioning mentioned above via `PARTITIONED BY` to deduplicate data and support DELETEs. + ## Notice The current implementation of the DynamoDB SQL connector is write-only and doesn't provide an implementation for source queries. diff --git a/docs/content/docs/connectors/table/dynamodb.md b/docs/content/docs/connectors/table/dynamodb.md index e4952b220..e35677b23 100644 --- a/docs/content/docs/connectors/table/dynamodb.md +++ b/docs/content/docs/connectors/table/dynamodb.md @@ -303,6 +303,46 @@ WITH ( ); ``` +## Specifying Primary Key For Deletes + +The DynamoDB sink supports Delete requests to DynamoDB, but AWS requires that a DynamoDB Delete request contain **only** the key field(s), or else the Delete request will fail with `DynamoDbException: The provided key element does not match the schema`. +Thus, if a changelog stream that contains DELETEs is being written to DynamoDB, you must specify the `PRIMARY KEY` on the table. 
+This `PRIMARY KEY` specified for the Flink SQL table must match the actual primary key of the DynamoDB table: either just the Partition Key, or, in the case of a composite primary key, the Partition Key and Sort Key. + +Example for a Partition Key as the only Primary Key: +```sql +CREATE TABLE DynamoDbTable ( + `user_id` BIGINT, + `item_id` BIGINT, + `category_id` BIGINT, + `behavior` STRING, + PRIMARY KEY (user_id) NOT ENFORCED +) +WITH ( + 'connector' = 'dynamodb', + 'table-name' = 'user_behavior', + 'aws.region' = 'us-east-2' +); +``` + +Example for a Partition Key and Sort Key as a Composite Primary Key: +```sql +CREATE TABLE DynamoDbTable ( + `user_id` BIGINT, + `item_id` BIGINT, + `category_id` BIGINT, + `behavior` STRING, + PRIMARY KEY (user_id, item_id) NOT ENFORCED +) +WITH ( + 'connector' = 'dynamodb', + 'table-name' = 'user_behavior', + 'aws.region' = 'us-east-2' +); +``` + +Note that this Primary Key functionality, specified by `PRIMARY KEY`, can be used alongside the Sink Partitioning mentioned above via `PARTITIONED BY` to deduplicate data and support DELETEs. + ## Notice The current implementation of the DynamoDB SQL connector is write-only and doesn't provide an implementation for source queries. From aa030f3bed7b937b9de78e80543b8a31b7b8abc0 Mon Sep 17 00:00:00 2001 From: Rob Goretsky Date: Thu, 1 Aug 2024 08:54:38 -0400 Subject: [PATCH 3/6] Apply spotless formatting updates --- .../dynamodb/table/DynamoDbDynamicSink.java | 3 +- .../table/DynamoDbDynamicSinkFactory.java | 11 +++- .../RowDataToAttributeValueConverter.java | 30 +++++------ .../table/RowDataElementConverterTest.java | 3 +- .../RowDataToAttributeValueConverterTest.java | 52 +++++++++++++------ 5 files changed, 65 insertions(+), 34 deletions(-) diff --git a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/DynamoDbDynamicSink.java b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/DynamoDbDynamicSink.java index 995333656..98bfc242e 100644 --- a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/DynamoDbDynamicSink.java +++ b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/DynamoDbDynamicSink.java @@ -92,7 +92,8 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) { .setFailOnError(failOnError) .setOverwriteByPartitionKeys(new ArrayList<>(overwriteByPartitionKeys)) .setDynamoDbProperties(dynamoDbClientProperties) - .setElementConverter(new RowDataElementConverter(physicalDataType, primaryKeys)); + .setElementConverter( + new RowDataElementConverter(physicalDataType, primaryKeys)); addAsyncOptionsToSinkBuilder(builder); diff --git a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/DynamoDbDynamicSinkFactory.java b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/DynamoDbDynamicSinkFactory.java index 5aa9bac9b..b68753b12 100644 --- a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/DynamoDbDynamicSinkFactory.java +++ b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/DynamoDbDynamicSinkFactory.java @@ -58,8 +58,15 @@ public DynamicTableSink createDynamicTableSink(Context context) { .setDynamoDbClientProperties( dynamoDbConfiguration.getSinkClientProperties()); - if 
(catalogTable.getResolvedSchema().getPrimaryKey().isPresent()){ - builder = builder.setPrimaryKeys(new HashSet<>(catalogTable.getResolvedSchema().getPrimaryKey().get().getColumns())); + if (catalogTable.getResolvedSchema().getPrimaryKey().isPresent()) { + builder = + builder.setPrimaryKeys( + new HashSet<>( + catalogTable + .getResolvedSchema() + .getPrimaryKey() + .get() + .getColumns())); } addAsyncOptionsToBuilder(dynamoDbConfiguration.getAsyncSinkProperties(), builder); diff --git a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/RowDataToAttributeValueConverter.java b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/RowDataToAttributeValueConverter.java index 0fbc485ce..9b9cc635c 100644 --- a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/RowDataToAttributeValueConverter.java +++ b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/RowDataToAttributeValueConverter.java @@ -60,22 +60,22 @@ public Map<String, AttributeValue> convertRowData(RowData row) { itemMap = tableSchema.itemToMap(row, false); // In case of DELETE, only the primary key field(s) should be sent in the request - // In order to accomplish this, we need PRIMARY KEY fields to have been set in Table definition. - if (row.getRowKind() == RowKind.DELETE){ - if (primaryKeys == null || primaryKeys.isEmpty()) { - throw new TableException("PRIMARY KEY on Table must be set for DynamoDB DELETE operation"); - } - Map<String, AttributeValue> pkOnlyMap = new HashMap<String, AttributeValue>(); - for (String key : primaryKeys) { - AttributeValue value = itemMap.get(key); - pkOnlyMap.put(key, value); - } - return pkOnlyMap; - } - else { - return itemMap; + // In order to accomplish this, we need PRIMARY KEY fields to have been set in Table + // definition. 
+ if (row.getRowKind() == RowKind.DELETE) { + if (primaryKeys == null || primaryKeys.isEmpty()) { + throw new TableException( + "PRIMARY KEY on Table must be set for DynamoDB DELETE operation"); + } + Map<String, AttributeValue> pkOnlyMap = new HashMap<String, AttributeValue>(); + for (String key : primaryKeys) { + AttributeValue value = itemMap.get(key); + pkOnlyMap.put(key, value); + } + return pkOnlyMap; + } else { + return itemMap; } - } private StaticTableSchema<RowData> createTableSchema() { diff --git a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/table/RowDataElementConverterTest.java b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/table/RowDataElementConverterTest.java index 998cfd7be..f02d5de11 100644 --- a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/table/RowDataElementConverterTest.java +++ b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/table/RowDataElementConverterTest.java @@ -53,7 +53,8 @@ public class RowDataElementConverterTest { private static final RowDataToAttributeValueConverter rowDataToAttributeValueConverter = new RowDataToAttributeValueConverter(DATA_TYPE, null); - private static final Set<String> primaryKeys = new HashSet<>(Collections.singletonList("partition_key")); + private static final Set<String> primaryKeys = + new HashSet<>(Collections.singletonList("partition_key")); private static final RowDataElementConverter elementConverterWithPK = new RowDataElementConverter(DATA_TYPE, primaryKeys); diff --git a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/table/RowDataToAttributeValueConverterTest.java b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/table/RowDataToAttributeValueConverterTest.java index d81c428f1..a7817085a 100644 --- a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/table/RowDataToAttributeValueConverterTest.java +++ b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/table/RowDataToAttributeValueConverterTest.java @@ -561,11 +561,17 @@ void testDeleteOnlyPrimaryKey() { Set<String> primaryKeys = new HashSet<>(Collections.singletonList(key)); // Create a Row with two fields - "key" and "otherField". "key" is the single primary key. - // For a Delete request, only "key" should be included in the expectedResult, and not "otherField". - DataType dataType = DataTypes.ROW(DataTypes.FIELD(key, DataTypes.STRING()), DataTypes.FIELD(otherField, DataTypes.STRING())); + // For a Delete request, only "key" should be included in the expectedResult, and not + // "otherField". + DataType dataType = + DataTypes.ROW( + DataTypes.FIELD(key, DataTypes.STRING()), + DataTypes.FIELD(otherField, DataTypes.STRING())); RowDataToAttributeValueConverter rowDataToAttributeValueConverter = new RowDataToAttributeValueConverter(dataType, primaryKeys); - RowData rowData = createElementWithMultipleFields(StringData.fromString(value), StringData.fromString(otherValue)); + RowData rowData = + createElementWithMultipleFields( + StringData.fromString(value), StringData.fromString(otherValue)); rowData.setRowKind(RowKind.DELETE); Map<String, AttributeValue> actualResult = @@ -591,15 +597,20 @@ void testDeleteOnlyPrimaryKeys() { // Create a Row with three fields - "key", "additional_key", and "otherField". 
// "key" and "additional_key" make up the composite primary key. - // For a Delete request, only "key" and "additional_key" should be included in the expectedResult, and not "otherField". - DataType dataType = DataTypes.ROW( - DataTypes.FIELD(key, DataTypes.STRING()), - DataTypes.FIELD(additionalKey, DataTypes.STRING()), - DataTypes.FIELD(otherField, DataTypes.STRING())); + // For a Delete request, only "key" and "additional_key" should be included in the + // expectedResult, and not "otherField". + DataType dataType = + DataTypes.ROW( + DataTypes.FIELD(key, DataTypes.STRING()), + DataTypes.FIELD(additionalKey, DataTypes.STRING()), + DataTypes.FIELD(otherField, DataTypes.STRING())); RowDataToAttributeValueConverter rowDataToAttributeValueConverter = new RowDataToAttributeValueConverter(dataType, primaryKeys); - RowData rowData = createElementWithMultipleFields( - StringData.fromString(value), StringData.fromString(additionalValue), StringData.fromString(otherValue)); + RowData rowData = + createElementWithMultipleFields( + StringData.fromString(value), + StringData.fromString(additionalValue), + StringData.fromString(otherValue)); rowData.setRowKind(RowKind.DELETE); Map actualResult = @@ -622,10 +633,15 @@ void testPKIgnoredForInsert() { // Create a Row with two fields - "key" and "otherField". "key" is the primary key. // For an Insert request, all fields should be included regardless of the Primary Key. - DataType dataType = DataTypes.ROW(DataTypes.FIELD(key, DataTypes.STRING()), DataTypes.FIELD(otherField, DataTypes.STRING())); + DataType dataType = + DataTypes.ROW( + DataTypes.FIELD(key, DataTypes.STRING()), + DataTypes.FIELD(otherField, DataTypes.STRING())); RowDataToAttributeValueConverter rowDataToAttributeValueConverter = new RowDataToAttributeValueConverter(dataType, primaryKeys); - RowData rowData = createElementWithMultipleFields(StringData.fromString(value), StringData.fromString(otherValue)); + RowData rowData = + createElementWithMultipleFields( + StringData.fromString(value), StringData.fromString(otherValue)); rowData.setRowKind(RowKind.INSERT); Map actualResult = @@ -647,11 +663,17 @@ void testPKIgnoredForUpdateAfter() { Set primaryKeys = new HashSet<>(Collections.singletonList(key)); // Create a Row with two fields - "key" and "otherField". "key" is the primary key. - // For an UPDATE_BEFORE request, all fields should be included regardless of the Primary Key. - DataType dataType = DataTypes.ROW(DataTypes.FIELD(key, DataTypes.STRING()), DataTypes.FIELD(otherField, DataTypes.STRING())); + // For an UPDATE_BEFORE request, all fields should be included regardless of the Primary + // Key. 
+ DataType dataType = + DataTypes.ROW( + DataTypes.FIELD(key, DataTypes.STRING()), + DataTypes.FIELD(otherField, DataTypes.STRING())); RowDataToAttributeValueConverter rowDataToAttributeValueConverter = new RowDataToAttributeValueConverter(dataType, primaryKeys); - RowData rowData = createElementWithMultipleFields(StringData.fromString(value), StringData.fromString(otherValue)); + RowData rowData = + createElementWithMultipleFields( + StringData.fromString(value), StringData.fromString(otherValue)); rowData.setRowKind(RowKind.UPDATE_AFTER); Map<String, AttributeValue> actualResult = From 5372665be78e3fbba69ffd35263f307e9e58192a Mon Sep 17 00:00:00 2001 From: Rob Goretsky Date: Thu, 1 Aug 2024 11:12:35 -0400 Subject: [PATCH 4/6] Use overridden constructor, add test for exception --- .../RowDataToAttributeValueConverter.java | 7 +++++ .../RowDataToAttributeValueConverterTest.java | 27 +++++++++++++++++++ 2 files changed, 34 insertions(+) diff --git a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/RowDataToAttributeValueConverter.java b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/RowDataToAttributeValueConverter.java index 9b9cc635c..3dc5b6def 100644 --- a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/RowDataToAttributeValueConverter.java +++ b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/RowDataToAttributeValueConverter.java @@ -34,6 +34,7 @@ import software.amazon.awssdk.enhanced.dynamodb.mapper.StaticTableSchema; import software.amazon.awssdk.services.dynamodb.model.AttributeValue; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -49,6 +50,12 @@ public class RowDataToAttributeValueConverter { private final TableSchema<RowData> tableSchema; private final Set<String> primaryKeys; + public RowDataToAttributeValueConverter(DataType physicalDataType) { + this.physicalDataType = physicalDataType; + this.primaryKeys = Collections.emptySet(); + this.tableSchema = createTableSchema(); + } + public RowDataToAttributeValueConverter(DataType physicalDataType, Set<String> primaryKeys) { this.physicalDataType = physicalDataType; this.primaryKeys = primaryKeys; diff --git a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/table/RowDataToAttributeValueConverterTest.java b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/table/RowDataToAttributeValueConverterTest.java index a7817085a..6ee458763 100644 --- a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/table/RowDataToAttributeValueConverterTest.java +++ b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/table/RowDataToAttributeValueConverterTest.java @@ -19,6 +19,7 @@ package org.apache.flink.connector.dynamodb.table; import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.TableException; import org.apache.flink.table.data.DecimalData; import org.apache.flink.table.data.GenericArrayData; import org.apache.flink.table.data.GenericRowData; @@ -45,6 +46,7 @@ import static java.util.Collections.singletonMap; import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; /** Test for {@link RowDataToAttributeValueConverter}. 
*/ public class RowDataToAttributeValueConverterTest { @@ -553,6 +555,31 @@ void testInstantArray() { } @Test + void testDeleteExceptionWhenNoPrimaryKey() { + String key = "key"; + String value = "some_string"; + String otherField = "other_field"; + String otherValue = "other_value"; + Set<String> primaryKeys = Collections.emptySet(); + + // Create a Row with two fields - "key" and "otherField". "key" is the single primary key. + DataType dataType = + DataTypes.ROW( + DataTypes.FIELD(key, DataTypes.STRING()), + DataTypes.FIELD(otherField, DataTypes.STRING())); + RowDataToAttributeValueConverter rowDataToAttributeValueConverter = + new RowDataToAttributeValueConverter(dataType, primaryKeys); + RowData rowData = + createElementWithMultipleFields( + StringData.fromString(value), StringData.fromString(otherValue)); + rowData.setRowKind(RowKind.DELETE); + + assertThrows(TableException.class, () -> { + rowDataToAttributeValueConverter.convertRowData(rowData); + }); + } + + @Test void testDeleteOnlyPrimaryKey() { String key = "key"; String value = "some_string"; From cdfcd05caed4f134f892e40cae959edbcc7cface Mon Sep 17 00:00:00 2001 From: Rob Goretsky Date: Thu, 1 Aug 2024 11:14:49 -0400 Subject: [PATCH 5/6] formatting --- .../table/RowDataToAttributeValueConverterTest.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/table/RowDataToAttributeValueConverterTest.java b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/table/RowDataToAttributeValueConverterTest.java index 6ee458763..9c61d3df1 100644 --- a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/table/RowDataToAttributeValueConverterTest.java +++ b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/table/RowDataToAttributeValueConverterTest.java @@ -574,9 +574,11 @@ void testDeleteExceptionWhenNoPrimaryKey() { StringData.fromString(value), StringData.fromString(otherValue)); rowData.setRowKind(RowKind.DELETE); - assertThrows(TableException.class, () -> { - rowDataToAttributeValueConverter.convertRowData(rowData); - }); + assertThrows( + TableException.class, + () -> { + rowDataToAttributeValueConverter.convertRowData(rowData); + }); } @Test From 621df92cd44a8602bcc91f7540d5c184fb376e2b Mon Sep 17 00:00:00 2001 From: Rob Goretsky Date: Tue, 13 Aug 2024 17:25:55 -0400 Subject: [PATCH 6/6] Updates as per PR feedback --- .../RowDataToAttributeValueConverter.java | 4 +- .../table/RowDataElementConverterTest.java | 4 +- .../RowDataToAttributeValueConverterTest.java | 60 +++++++++---------- 3 files changed, 30 insertions(+), 38 deletions(-) diff --git a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/RowDataToAttributeValueConverter.java b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/RowDataToAttributeValueConverter.java index 3dc5b6def..8db4c5cbc 100644 --- a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/RowDataToAttributeValueConverter.java +++ b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/RowDataToAttributeValueConverter.java @@ -51,9 +51,7 @@ public class RowDataToAttributeValueConverter { private final Set<String> primaryKeys; public 
RowDataToAttributeValueConverter(DataType physicalDataType) { - this.physicalDataType = physicalDataType; - this.primaryKeys = Collections.emptySet(); - this.tableSchema = createTableSchema(); + this(physicalDataType, Collections.emptySet()); } public RowDataToAttributeValueConverter(DataType physicalDataType, Set<String> primaryKeys) { diff --git a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/table/RowDataElementConverterTest.java b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/table/RowDataElementConverterTest.java index f02d5de11..79db51176 100644 --- a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/table/RowDataElementConverterTest.java +++ b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/table/RowDataElementConverterTest.java @@ -34,7 +34,6 @@ import java.io.IOException; import java.util.Collections; -import java.util.HashSet; import java.util.Set; import static org.assertj.core.api.Assertions.assertThat; @@ -53,8 +52,7 @@ public class RowDataElementConverterTest { private static final RowDataToAttributeValueConverter rowDataToAttributeValueConverter = new RowDataToAttributeValueConverter(DATA_TYPE, null); - private static final Set<String> primaryKeys = - new HashSet<>(Collections.singletonList("partition_key")); + private static final Set<String> primaryKeys = Collections.singleton("partition_key"); private static final RowDataElementConverter elementConverterWithPK = new RowDataElementConverter(DATA_TYPE, primaryKeys); diff --git a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/table/RowDataToAttributeValueConverterTest.java b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/table/RowDataToAttributeValueConverterTest.java index 9c61d3df1..4ec61ff19 100644 --- a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/table/RowDataToAttributeValueConverterTest.java +++ b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/table/RowDataToAttributeValueConverterTest.java @@ -58,7 +58,7 @@ void testChar() { DataType dataType = DataTypes.ROW(DataTypes.FIELD(key, DataTypes.CHAR(9))); RowDataToAttributeValueConverter rowDataToAttributeValueConverter = - new RowDataToAttributeValueConverter(dataType, null); + new RowDataToAttributeValueConverter(dataType); Map actualResult = rowDataToAttributeValueConverter.convertRowData( createElement(StringData.fromString(value))); @@ -75,7 +75,7 @@ void testVarChar() { DataType dataType = DataTypes.ROW(DataTypes.FIELD(key, DataTypes.VARCHAR(13))); RowDataToAttributeValueConverter rowDataToAttributeValueConverter = - new RowDataToAttributeValueConverter(dataType, null); + new RowDataToAttributeValueConverter(dataType); Map actualResult = rowDataToAttributeValueConverter.convertRowData( createElement(StringData.fromString(value))); @@ -92,7 +92,7 @@ void testString() { DataType dataType = DataTypes.ROW(DataTypes.FIELD(key, DataTypes.STRING())); RowDataToAttributeValueConverter rowDataToAttributeValueConverter = - new RowDataToAttributeValueConverter(dataType, null); + new RowDataToAttributeValueConverter(dataType); Map actualResult = rowDataToAttributeValueConverter.convertRowData( 
                         createElement(StringData.fromString(value)));
@@ -109,7 +109,7 @@ void testBoolean() {
         DataType dataType = DataTypes.ROW(DataTypes.FIELD(key, DataTypes.BOOLEAN()));
         RowDataToAttributeValueConverter rowDataToAttributeValueConverter =
-                new RowDataToAttributeValueConverter(dataType, null);
+                new RowDataToAttributeValueConverter(dataType);
         Map<String, AttributeValue> actualResult =
                 rowDataToAttributeValueConverter.convertRowData(createElement(value));
         Map<String, AttributeValue> expectedResult =
@@ -125,7 +125,7 @@ void testDecimal() {
         DataType dataType = DataTypes.ROW(DataTypes.FIELD(key, DataTypes.DECIMAL(5, 4)));
         RowDataToAttributeValueConverter rowDataToAttributeValueConverter =
-                new RowDataToAttributeValueConverter(dataType, null);
+                new RowDataToAttributeValueConverter(dataType);
         Map<String, AttributeValue> actualResult =
                 rowDataToAttributeValueConverter.convertRowData(
                         createElement(DecimalData.fromBigDecimal(value, 5, 4)));
@@ -142,7 +142,7 @@ void testTinyInt() {
         DataType dataType = DataTypes.ROW(DataTypes.FIELD(key, DataTypes.TINYINT()));
         RowDataToAttributeValueConverter rowDataToAttributeValueConverter =
-                new RowDataToAttributeValueConverter(dataType, null);
+                new RowDataToAttributeValueConverter(dataType);
         Map<String, AttributeValue> actualResult =
                 rowDataToAttributeValueConverter.convertRowData(createElement(value));
         Map<String, AttributeValue> expectedResult =
@@ -158,7 +158,7 @@ void testSmallInt() {
         DataType dataType = DataTypes.ROW(DataTypes.FIELD(key, DataTypes.SMALLINT()));
         RowDataToAttributeValueConverter rowDataToAttributeValueConverter =
-                new RowDataToAttributeValueConverter(dataType, null);
+                new RowDataToAttributeValueConverter(dataType);
         Map<String, AttributeValue> actualResult =
                 rowDataToAttributeValueConverter.convertRowData(createElement(value));
         Map<String, AttributeValue> expectedResult =
@@ -174,7 +174,7 @@ void testInt() {
         DataType dataType = DataTypes.ROW(DataTypes.FIELD(key, DataTypes.INT()));
         RowDataToAttributeValueConverter rowDataToAttributeValueConverter =
-                new RowDataToAttributeValueConverter(dataType, null);
+                new RowDataToAttributeValueConverter(dataType);
         Map<String, AttributeValue> actualResult =
                 rowDataToAttributeValueConverter.convertRowData(createElement(value));
         Map<String, AttributeValue> expectedResult =
@@ -190,7 +190,7 @@ void testBigInt() {
         DataType dataType = DataTypes.ROW(DataTypes.FIELD(key, DataTypes.BIGINT()));
         RowDataToAttributeValueConverter rowDataToAttributeValueConverter =
-                new RowDataToAttributeValueConverter(dataType, null);
+                new RowDataToAttributeValueConverter(dataType);
         Map<String, AttributeValue> actualResult =
                 rowDataToAttributeValueConverter.convertRowData(createElement(value));
         Map<String, AttributeValue> expectedResult =
@@ -206,7 +206,7 @@ void testFloat() {
         DataType dataType = DataTypes.ROW(DataTypes.FIELD(key, DataTypes.FLOAT()));
         RowDataToAttributeValueConverter rowDataToAttributeValueConverter =
-                new RowDataToAttributeValueConverter(dataType, null);
+                new RowDataToAttributeValueConverter(dataType);
         Map<String, AttributeValue> actualResult =
                 rowDataToAttributeValueConverter.convertRowData(createElement(value));
         Map<String, AttributeValue> expectedResult =
@@ -222,7 +222,7 @@ void testDouble() {
         DataType dataType = DataTypes.ROW(DataTypes.FIELD(key, DataTypes.DOUBLE()));
         RowDataToAttributeValueConverter rowDataToAttributeValueConverter =
-                new RowDataToAttributeValueConverter(dataType, null);
+                new RowDataToAttributeValueConverter(dataType);
         Map<String, AttributeValue> actualResult =
                 rowDataToAttributeValueConverter.convertRowData(createElement(value));
         Map<String, AttributeValue> expectedResult =
@@ -238,7 +238,7 @@ void testTimestamp() {
         DataType dataType = DataTypes.ROW(DataTypes.FIELD(key, DataTypes.TIMESTAMP()));
         RowDataToAttributeValueConverter rowDataToAttributeValueConverter =
-                new RowDataToAttributeValueConverter(dataType, null);
+                new RowDataToAttributeValueConverter(dataType);
         Map<String, AttributeValue> actualResult =
                 rowDataToAttributeValueConverter.convertRowData(
                         createElement(TimestampData.fromLocalDateTime(value)));
@@ -256,7 +256,7 @@ void testStringArray() {
         DataType dataType =
                 DataTypes.ROW(DataTypes.FIELD(key, DataTypes.ARRAY(DataTypes.STRING())));
         RowDataToAttributeValueConverter rowDataToAttributeValueConverter =
-                new RowDataToAttributeValueConverter(dataType, null);
+                new RowDataToAttributeValueConverter(dataType);
         Map<String, AttributeValue> actualResult =
                 rowDataToAttributeValueConverter.convertRowData(
                         createArray(value, StringData::fromString));
@@ -281,7 +281,7 @@ void testBooleanArray() {
         DataType dataType =
                 DataTypes.ROW(DataTypes.FIELD(key, DataTypes.ARRAY(DataTypes.BOOLEAN())));
         RowDataToAttributeValueConverter rowDataToAttributeValueConverter =
-                new RowDataToAttributeValueConverter(dataType, null);
+                new RowDataToAttributeValueConverter(dataType);
         Map<String, AttributeValue> actualResult =
                 rowDataToAttributeValueConverter.convertRowData(createArray(value, t -> t));
         Map<String, AttributeValue> expectedResult =
@@ -309,7 +309,7 @@ void testBigDecimalArray() {
         DataType dataType =
                 DataTypes.ROW(DataTypes.FIELD(key, DataTypes.ARRAY(DataTypes.DECIMAL(1, 0))));
         RowDataToAttributeValueConverter rowDataToAttributeValueConverter =
-                new RowDataToAttributeValueConverter(dataType, null);
+                new RowDataToAttributeValueConverter(dataType);
         Map<String, AttributeValue> actualResult =
                 rowDataToAttributeValueConverter.convertRowData(
                         createArray(value, d -> DecimalData.fromBigDecimal(d, 1, 0)));
@@ -338,7 +338,7 @@ void testByteArray() {
         DataType dataType =
                 DataTypes.ROW(DataTypes.FIELD(key, DataTypes.ARRAY(DataTypes.TINYINT())));
         RowDataToAttributeValueConverter rowDataToAttributeValueConverter =
-                new RowDataToAttributeValueConverter(dataType, null);
+                new RowDataToAttributeValueConverter(dataType);
         Map<String, AttributeValue> actualResult =
                 rowDataToAttributeValueConverter.convertRowData(createArray(value, t -> t));
         Map<String, AttributeValue> expectedResult =
@@ -366,7 +366,7 @@ void testShortArray() {
         DataType dataType =
                 DataTypes.ROW(DataTypes.FIELD(key, DataTypes.ARRAY(DataTypes.SMALLINT())));
         RowDataToAttributeValueConverter rowDataToAttributeValueConverter =
-                new RowDataToAttributeValueConverter(dataType, null);
+                new RowDataToAttributeValueConverter(dataType);
         Map<String, AttributeValue> actualResult =
                 rowDataToAttributeValueConverter.convertRowData(createArray(value, t -> t));
         Map<String, AttributeValue> expectedResult =
@@ -393,7 +393,7 @@ void testIntegerArray() {
         DataType dataType = DataTypes.ROW(DataTypes.FIELD(key, DataTypes.ARRAY(DataTypes.INT())));
         RowDataToAttributeValueConverter rowDataToAttributeValueConverter =
-                new RowDataToAttributeValueConverter(dataType, null);
+                new RowDataToAttributeValueConverter(dataType);
         Map<String, AttributeValue> actualResult =
                 rowDataToAttributeValueConverter.convertRowData(createArray(value, t -> t));
         Map<String, AttributeValue> expectedResult =
@@ -421,7 +421,7 @@ void testLongArray() {
         DataType dataType =
                 DataTypes.ROW(DataTypes.FIELD(key, DataTypes.ARRAY(DataTypes.BIGINT())));
         RowDataToAttributeValueConverter rowDataToAttributeValueConverter =
-                new RowDataToAttributeValueConverter(dataType, null);
+                new RowDataToAttributeValueConverter(dataType);
         Map<String, AttributeValue> actualResult =
                 rowDataToAttributeValueConverter.convertRowData(createArray(value, t -> t));
         Map<String, AttributeValue> expectedResult =
@@ -448,7 +448,7 @@ void testFloatArray() {
         DataType dataType =
                 DataTypes.ROW(DataTypes.FIELD(key, DataTypes.ARRAY(DataTypes.FLOAT())));
         RowDataToAttributeValueConverter rowDataToAttributeValueConverter =
-                new RowDataToAttributeValueConverter(dataType, null);
+                new RowDataToAttributeValueConverter(dataType);
         Map<String, AttributeValue> actualResult =
                 rowDataToAttributeValueConverter.convertRowData(createArray(value, t -> t));
         Map<String, AttributeValue> expectedResult =
@@ -476,7 +476,7 @@ void testDoubleArray() {
         DataType dataType =
                 DataTypes.ROW(DataTypes.FIELD(key, DataTypes.ARRAY(DataTypes.DOUBLE())));
         RowDataToAttributeValueConverter rowDataToAttributeValueConverter =
-                new RowDataToAttributeValueConverter(dataType, null);
+                new RowDataToAttributeValueConverter(dataType);
         Map<String, AttributeValue> actualResult =
                 rowDataToAttributeValueConverter.convertRowData(createArray(value, t -> t));
         Map<String, AttributeValue> expectedResult =
@@ -504,7 +504,7 @@ void testLocalDateTimeArray() {
         DataType dataType =
                 DataTypes.ROW(DataTypes.FIELD(key, DataTypes.ARRAY(DataTypes.TIMESTAMP())));
         RowDataToAttributeValueConverter rowDataToAttributeValueConverter =
-                new RowDataToAttributeValueConverter(dataType, null);
+                new RowDataToAttributeValueConverter(dataType);
         Map<String, AttributeValue> actualResult =
                 rowDataToAttributeValueConverter.convertRowData(
                         createArray(value, TimestampData::fromLocalDateTime));
@@ -533,7 +533,7 @@ void testInstantArray() {
         DataType dataType =
                 DataTypes.ROW(DataTypes.FIELD(key, DataTypes.ARRAY(DataTypes.TIMESTAMP_LTZ())));
         RowDataToAttributeValueConverter rowDataToAttributeValueConverter =
-                new RowDataToAttributeValueConverter(dataType, null);
+                new RowDataToAttributeValueConverter(dataType);
         Map<String, AttributeValue> actualResult =
                 rowDataToAttributeValueConverter.convertRowData(
                         createArray(value, TimestampData::fromInstant));
@@ -607,8 +607,7 @@ void testDeleteOnlyPrimaryKey() {
         Map<String, AttributeValue> expectedResult =
                 singletonMap(key, AttributeValue.builder().s(value).build());
 
-        assertThat(actualResult).containsAllEntriesOf(expectedResult);
-        assertThat(expectedResult).containsAllEntriesOf(actualResult);
+        assertThat(actualResult).containsExactlyInAnyOrderEntriesOf(expectedResult);
     }
 
     @Test
@@ -647,8 +646,7 @@ void testDeleteOnlyPrimaryKeys() {
         expectedResult.put(key, AttributeValue.builder().s(value).build());
         expectedResult.put(additionalKey, AttributeValue.builder().s(additionalValue).build());
 
-        assertThat(actualResult).containsAllEntriesOf(expectedResult);
-        assertThat(expectedResult).containsAllEntriesOf(actualResult);
+        assertThat(actualResult).containsExactlyInAnyOrderEntriesOf(expectedResult);
     }
 
     @Test
@@ -678,8 +676,7 @@ void testPKIgnoredForInsert() {
         expectedResult.put(key, AttributeValue.builder().s(value).build());
         expectedResult.put(otherField, AttributeValue.builder().s(otherValue).build());
 
-        assertThat(actualResult).containsAllEntriesOf(expectedResult);
-        assertThat(expectedResult).containsAllEntriesOf(actualResult);
+        assertThat(actualResult).containsExactlyInAnyOrderEntriesOf(expectedResult);
     }
 
     @Test
@@ -688,7 +685,7 @@ void testPKIgnoredForUpdateAfter() {
         String value = "some_string";
         String otherField = "other_field";
         String otherValue = "other_value";
-        Set<String> primaryKeys = new HashSet<>(Collections.singletonList(key));
+        Set<String> primaryKeys = Collections.singleton(key);
 
         // Create a Row with two fields - "key" and "otherField". "key" is the primary key.
         // For an UPDATE_BEFORE request, all fields should be included regardless of the Primary
@@ -710,8 +707,7 @@
         expectedResult.put(key, AttributeValue.builder().s(value).build());
         expectedResult.put(otherField, AttributeValue.builder().s(otherValue).build());
 
-        assertThat(actualResult).containsAllEntriesOf(expectedResult);
-        assertThat(expectedResult).containsAllEntriesOf(actualResult);
+        assertThat(actualResult).containsExactlyInAnyOrderEntriesOf(expectedResult);
     }
 
     private RowData createElement(Object value) {
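
Note on the behavior the test hunks above exercise: a DynamoDB delete request may
carry only the table's primary key attributes (testDeleteOnlyPrimaryKey /
testDeleteOnlyPrimaryKeys), while INSERT and UPDATE_AFTER rows keep every column
(testPKIgnoredForInsert / testPKIgnoredForUpdateAfter). A minimal sketch of that
key-filtering step follows; it is illustrative only, not the patch's actual
implementation, and the class, method, and allFields parameter names are
hypothetical:

    import java.util.Map;
    import java.util.Set;
    import java.util.stream.Collectors;

    import org.apache.flink.table.data.RowData;
    import org.apache.flink.types.RowKind;

    import software.amazon.awssdk.services.dynamodb.model.AttributeValue;

    final class KeyFilterSketch {
        /**
         * Keeps only primary-key attributes for DELETE rows and passes all other
         * row kinds through unchanged. allFields is assumed to hold one
         * AttributeValue per column of the row.
         */
        static Map<String, AttributeValue> filterForWrite(
                RowData row, Set<String> primaryKeys, Map<String, AttributeValue> allFields) {
            if (row.getRowKind() == RowKind.DELETE && !primaryKeys.isEmpty()) {
                // DynamoDB rejects a delete whose key map contains non-key
                // attributes - the "key not found" failure this patch addresses.
                return allFields.entrySet().stream()
                        .filter(entry -> primaryKeys.contains(entry.getKey()))
                        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
            }
            return allFields;
        }
    }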