diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisClientProvider.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisClientProvider.java new file mode 100644 index 000000000..8e310007b --- /dev/null +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisClientProvider.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.kinesis.sink; + +import org.apache.flink.annotation.Internal; + +import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; + +import java.io.Closeable; + +/** + * Provider interface for KinesisAsyncClient instances. This is primarily used for testing to inject + * mock clients. + */ +@Internal +interface KinesisClientProvider extends Closeable { + /** + * Returns a KinesisAsyncClient instance. + * + * @return The KinesisAsyncClient instance + */ + KinesisAsyncClient get(); + +} diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkWriter.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkWriter.java index 684d018aa..70404f8c3 100644 --- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkWriter.java +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkWriter.java @@ -18,6 +18,7 @@ package org.apache.flink.connector.kinesis.sink; import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.connector.sink2.Sink; import org.apache.flink.connector.aws.util.AWSClientUtil; import org.apache.flink.connector.aws.util.AWSGeneralUtil; @@ -31,6 +32,7 @@ import org.apache.flink.connector.base.sink.writer.strategy.RateLimitingStrategy; import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.groups.SinkWriterMetricGroup; +import org.apache.flink.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,10 +44,13 @@ import software.amazon.awssdk.services.kinesis.model.PutRecordsResultEntry; import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException; +import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Properties; import java.util.concurrent.CompletableFuture; import java.util.function.Consumer; @@ -96,11 +101,8 @@ class KinesisStreamsSinkWriter extends AsyncSinkWriter extends AsyncSinkWriter> states) { + this( + elementConverter, + context, + maxBatchSize, + maxInFlightRequests, + maxBufferedRequests, + maxBatchSizeInBytes, + maxTimeInBufferMS, + maxRecordSizeInBytes, + failOnError, + streamName, + streamArn, + states, + createDefaultClientProvider(kinesisClientProperties)); + } + + KinesisStreamsSinkWriter( + ElementConverter elementConverter, + Sink.InitContext context, + int maxBatchSize, + int maxInFlightRequests, + int maxBufferedRequests, + long maxBatchSizeInBytes, + long maxTimeInBufferMS, + long maxRecordSizeInBytes, + boolean failOnError, + String streamName, + String streamArn, + Collection> states, + KinesisClientProvider kinesisClientProvider) { super( elementConverter, context, @@ -167,11 +199,48 @@ class KinesisStreamsSinkWriter extends AsyncSinkWriter future = kinesisClient.putRecords(batchRequest); future.whenComplete( @@ -244,34 +314,121 @@ private void handleFullyFailedRequest( @Override public void close() { - AWSGeneralUtil.closeResources(httpClient, kinesisClient); + try { + kinesisClientProvider.close(); + } catch (IOException e) { + throw new RuntimeException("Failed to close the kinesisClientProvider", e); + } } private void handlePartiallyFailedRequest( PutRecordsResponse response, List requestEntries, Consumer> requestResult) { - LOG.warn( - "KDS Sink failed to write and will retry {} entries to KDS", - response.failedRecordCount()); - numRecordsOutErrorsCounter.inc(response.failedRecordCount()); + int failedRecordCount = response.failedRecordCount(); + LOG.warn("KDS Sink failed to write and will retry {} entries to KDS", failedRecordCount); + numRecordsOutErrorsCounter.inc(failedRecordCount); if (failOnError) { getFatalExceptionCons() .accept(new KinesisStreamsException.KinesisStreamsFailFastException()); return; } - List failedRequestEntries = - new ArrayList<>(response.failedRecordCount()); + + List failedRequestEntries = new ArrayList<>(failedRecordCount); List records = response.records(); + // Collect error information and build the list of failed entries + Map errorSummaries = + collectErrorSummaries(records, requestEntries, failedRequestEntries); + + // Log aggregated error information + logErrorSummaries(errorSummaries); + + requestResult.accept(failedRequestEntries); + } + + /** + * Collect error summaries from failed records and build a list of failed request entries. + * + * @param records The result entries from the Kinesis response + * @param requestEntries The original request entries + * @param failedRequestEntries List to populate with failed entries (modified as a side effect) + * @return A map of error codes to their summaries + */ + private Map collectErrorSummaries( + List records, + List requestEntries, + List failedRequestEntries) { + + // We capture error info while minimizing logging overhead in the data path, + // which is critical for maintaining throughput performance + Map errorSummaries = new HashMap<>(); + for (int i = 0; i < records.size(); i++) { - if (records.get(i).errorCode() != null) { + PutRecordsResultEntry resultEntry = records.get(i); + String errorCode = resultEntry.errorCode(); + + if (errorCode != null) { + // Track the frequency of each error code to identify patterns + ErrorSummary summary = + errorSummaries.computeIfAbsent( + errorCode, code -> new ErrorSummary(resultEntry.errorMessage())); + summary.incrementCount(); + failedRequestEntries.add(requestEntries.get(i)); } } - requestResult.accept(failedRequestEntries); + return errorSummaries; + } + + /** + * Log aggregated error information at WARN level. + * + * @param errorSummaries Map of error codes to their summaries + */ + private void logErrorSummaries(Map errorSummaries) { + // We log aggregated error information at WARN level to ensure visibility in production + // while avoiding the performance impact of logging each individual failure + if (!errorSummaries.isEmpty()) { + StringBuilder errorSummary = new StringBuilder("Kinesis errors summary: "); + errorSummaries.forEach( + (code, summary) -> + errorSummary.append( + String.format( + "[%s: %d records, example: %s] ", + code, + summary.getCount(), + summary.getExampleMessage()))); + + // Using a single WARN log with aggregated information provides operational + // visibility into errors without flooding logs in high-throughput scenarios + LOG.warn("KDS Sink failed to write, " + errorSummary.toString()); + } + } + + /** Helper class to store error summary information. */ + private static class ErrorSummary { + private final String exampleMessage; + private int count; + + ErrorSummary(String exampleMessage) { + this.exampleMessage = exampleMessage; + this.count = 0; + } + + void incrementCount() { + count++; + } + + int getCount() { + return count; + } + + String getExampleMessage() { + return exampleMessage; + } } private boolean isRetryable(Throwable err) { diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkWriterTest.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkWriterTest.java index f3c13d427..af948830b 100644 --- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkWriterTest.java +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkWriterTest.java @@ -19,18 +19,30 @@ import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.connector.aws.testutils.AWSServicesTestUtils; +import org.apache.flink.connector.aws.util.AWSGeneralUtil; import org.apache.flink.connector.base.sink.writer.ElementConverter; import org.apache.flink.connector.base.sink.writer.TestSinkInitContext; import org.apache.flink.connector.base.sink.writer.strategy.AIMDScalingStrategy; import org.apache.flink.connector.base.sink.writer.strategy.CongestionControlRateLimitingStrategy; import org.junit.jupiter.api.Test; +import software.amazon.awssdk.core.SdkBytes; +import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; +import software.amazon.awssdk.services.kinesis.model.PutRecordsRequest; import software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry; +import software.amazon.awssdk.services.kinesis.model.PutRecordsResponse; +import software.amazon.awssdk.services.kinesis.model.PutRecordsResultEntry; import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; import java.util.Properties; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; -import static org.assertj.core.api.AssertionsForClassTypes.assertThat; +import static org.assertj.core.api.Assertions.assertThat; /** Test class for {@link KinesisStreamsSinkWriter}. */ public class KinesisStreamsSinkWriterTest { @@ -44,6 +56,9 @@ public class KinesisStreamsSinkWriterTest { private static final long MAX_TIME_IN_BUFFER = 5000; private static final long MAX_RECORD_SIZE = 1000 * 1024; private static final boolean FAIL_ON_ERROR = false; + private static final String STREAM_NAME = "streamName"; + private static final String STREAM_ARN = + "arn:aws:kinesis:us-east-1:000000000000:stream/" + STREAM_NAME; private KinesisStreamsSinkWriter sinkWriter; @@ -54,24 +69,122 @@ public class KinesisStreamsSinkWriterTest { .setPartitionKeyGenerator(element -> String.valueOf(element.hashCode())) .build(); + // Helper method to create a sink with default settings + private KinesisStreamsSink createDefaultSink() { + Properties sinkProperties = AWSServicesTestUtils.createConfig("https://kds-fake-endpoint"); + return new KinesisStreamsSink<>( + ELEMENT_CONVERTER_PLACEHOLDER, + MAX_BATCH_SIZE, + MAX_INFLIGHT_REQUESTS, + MAX_BUFFERED_REQUESTS, + MAX_BATCH_SIZE_IN_BYTES, + MAX_TIME_IN_BUFFER, + MAX_RECORD_SIZE, + FAIL_ON_ERROR, + STREAM_NAME, + STREAM_ARN, + sinkProperties); + } + + // Helper method to create a simple test request entry + private PutRecordsRequestEntry createTestEntry(String data, String partitionKey) { + return PutRecordsRequestEntry.builder() + .data(SdkBytes.fromUtf8String(data)) + .partitionKey(partitionKey) + .build(); + } + + // Helper method to create a list of test request entries + private List createTestEntries(int count) { + List entries = new ArrayList<>(count); + for (int i = 0; i < count; i++) { + entries.add(createTestEntry("record" + i, "key" + i)); + } + return entries; + } + + // Helper method to create a tracking client that reports if it was used + private KinesisAsyncClient createTrackingClient(AtomicBoolean wasUsed) { + return new KinesisAsyncClient() { + @Override + public String serviceName() { + return "Kinesis"; + } + + @Override + public void close() {} + + @Override + public CompletableFuture putRecords( + PutRecordsRequest putRecordsRequest) { + wasUsed.set(true); + return CompletableFuture.completedFuture( + PutRecordsResponse.builder() + .failedRecordCount(0) + .records(Collections.emptyList()) + .build()); + } + }; + } + + // Helper method to create a client that returns partial failures with different error codes + private KinesisAsyncClient createPartialFailureClient() { + return new KinesisAsyncClient() { + @Override + public String serviceName() { + return "Kinesis"; + } + + @Override + public void close() {} + + @Override + public CompletableFuture putRecords( + PutRecordsRequest putRecordsRequest) { + // Create a response with 5 records, 3 of which failed with different error codes + List resultEntries = new ArrayList<>(); + + // Success record + resultEntries.add(PutRecordsResultEntry.builder().build()); + + // Failed records with different error codes + resultEntries.add( + PutRecordsResultEntry.builder() + .errorCode("ProvisionedThroughputExceededException") + .errorMessage("Rate exceeded for shard 0000") + .build()); + + resultEntries.add( + PutRecordsResultEntry.builder() + .errorCode("InternalFailure") + .errorMessage("Internal service failure") + .build()); + + resultEntries.add( + PutRecordsResultEntry.builder() + .errorCode("ProvisionedThroughputExceededException") + .errorMessage("Rate exceeded for shard 0001") + .build()); + + // Success record + resultEntries.add(PutRecordsResultEntry.builder().build()); + + PutRecordsResponse response = + PutRecordsResponse.builder() + .failedRecordCount(3) + .records(resultEntries) + .build(); + + return CompletableFuture.completedFuture(response); + } + }; + } + @Test void testCreateKinesisStreamsSinkWriterInitializesRateLimitingStrategyWithExpectedParameters() throws IOException { TestSinkInitContext sinkInitContext = new TestSinkInitContext(); - Properties sinkProperties = AWSServicesTestUtils.createConfig("https://fake_aws_endpoint"); - KinesisStreamsSink sink = - new KinesisStreamsSink<>( - ELEMENT_CONVERTER_PLACEHOLDER, - MAX_BATCH_SIZE, - MAX_INFLIGHT_REQUESTS, - MAX_BUFFERED_REQUESTS, - MAX_BATCH_SIZE_IN_BYTES, - MAX_TIME_IN_BUFFER, - MAX_RECORD_SIZE, - FAIL_ON_ERROR, - "streamName", - "arn:aws:kinesis:us-east-1:000000000000:stream/streamName", - sinkProperties); + KinesisStreamsSink sink = createDefaultSink(); sinkWriter = (KinesisStreamsSinkWriter) sink.createWriter(sinkInitContext); assertThat(sinkWriter) @@ -95,4 +208,132 @@ void testCreateKinesisStreamsSinkWriterInitializesRateLimitingStrategyWithExpect .extracting("decreaseFactor") .isEqualTo(EXPECTED_AIMD_DEC_FACTOR); } + + @Test + public void testCustomKinesisClientProviderIsUsed() throws IOException { + // Create a tracking client + AtomicBoolean clientWasUsed = new AtomicBoolean(false); + KinesisAsyncClient mockClient = createTrackingClient(clientWasUsed); + + // Create a client provider with the mock client + TestKinesisClientProvider clientProvider = new TestKinesisClientProvider(mockClient); + + // Create the sink + KinesisStreamsSink sink = createDefaultSink(); + + // Create the writer and set the client provider + TestSinkInitContext sinkInitContext = new TestSinkInitContext(); + KinesisStreamsSinkWriter writer = + (KinesisStreamsSinkWriter) sink.createWriter(sinkInitContext); + writer.setKinesisClientProvider(clientProvider); + + // Submit a request to trigger the client + List requestEntries = + Collections.singletonList(createTestEntry("test", "test-key")); + + CompletableFuture> result = new CompletableFuture<>(); + writer.submitRequestEntries(requestEntries, result::complete); + + // Verify the mock client was used + assertThat(clientWasUsed.get()).isTrue(); + assertThat(result.join()).isEmpty(); // No failures + } + + @Test + public void testCustomKinesisClientIsClosedWithWriter() throws IOException { + // Create a client provider with a mock client + TestKinesisClientProvider clientProvider = + new TestKinesisClientProvider(createTrackingClient(new AtomicBoolean())); + + // Create the sink + KinesisStreamsSink sink = createDefaultSink(); + + // Create with a custom client provider and close the writer + TestSinkInitContext sinkInitContext = new TestSinkInitContext(); + KinesisStreamsSinkWriter writer = + (KinesisStreamsSinkWriter) sink.createWriter(sinkInitContext); + writer.setKinesisClientProvider(clientProvider); + writer.close(); + + // Verify the client provider was closed + assertThat(clientProvider.getCloseCount()).isEqualTo(1); + } + + @Test + public void testDefaultClientProviderIsUsed() throws IOException { + // Create the sink without setting a custom client provider + KinesisStreamsSink sink = createDefaultSink(); + + // Create the writer + TestSinkInitContext sinkInitContext = new TestSinkInitContext(); + KinesisStreamsSinkWriter writer = + (KinesisStreamsSinkWriter) sink.createWriter(sinkInitContext); + + // Verify that the writer has a non-null client provider and client + assertThat(writer) + .extracting("kinesisClientProvider") + .isNotNull(); + // Close the writer and verify no exceptions are thrown + writer.close(); + } + + @Test + public void testErrorLoggingWithCustomClient() throws IOException { + // Create a client provider with a partial failure client + TestKinesisClientProvider clientProvider = + new TestKinesisClientProvider(createPartialFailureClient()); + + // Create the sink + KinesisStreamsSink sink = createDefaultSink(); + + // Create the writer and set the client provider + TestSinkInitContext sinkInitContext = new TestSinkInitContext(); + KinesisStreamsSinkWriter writer = + (KinesisStreamsSinkWriter) sink.createWriter(sinkInitContext); + writer.setKinesisClientProvider(clientProvider); + + // Create test request entries + List requestEntries = createTestEntries(5); + + // Call submitRequestEntries and capture the result + CompletableFuture> failedRequests = new CompletableFuture<>(); + Consumer> requestResult = failedRequests::complete; + + writer.submitRequestEntries(requestEntries, requestResult); + + // Verify that the correct records are returned for retry + List result = failedRequests.join(); + assertThat(result).hasSize(3); + assertThat(result) + .contains(requestEntries.get(1), requestEntries.get(2), requestEntries.get(3)); + + // Verify that the error counter was incremented correctly + assertThat(sinkInitContext.metricGroup().getNumRecordsOutErrorsCounter().getCount()) + .isEqualTo(3); + } + + /** A test implementation of KinesisClientProvider that returns a mock KinesisAsyncClient. */ + private static class TestKinesisClientProvider implements KinesisClientProvider { + private final KinesisAsyncClient kinesisClient; + private int closeCount = 0; + + private TestKinesisClientProvider(KinesisAsyncClient kinesisClient) { + this.kinesisClient = kinesisClient; + } + + @Override + public KinesisAsyncClient get() { + return kinesisClient; + } + + @Override + public void close() { + AWSGeneralUtil.closeResources(kinesisClient); + closeCount++; + } + + public int getCloseCount() { + return closeCount; + } + } }