Skip to content

[FLINK-35989][Connectors/AWS] Log errors on partially failed requests for AWS Kinesis Stream sink #201

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.connector.kinesis.sink;

import org.apache.flink.annotation.Internal;

import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;

import java.io.Closeable;

/**
* Provider interface for {@link KinesisAsyncClient} instances. This is primarily used for testing to
* inject mock clients.
*
* <p>The interface extends {@link Closeable}; an implementation is expected to release whatever
* resources back the client it hands out when {@code close()} is called.
*/
@Internal
interface KinesisClientProvider extends Closeable {
/**
* Returns a {@link KinesisAsyncClient} instance for the caller to issue Kinesis requests with.
*
* <p>NOTE(review): the interface itself does not state whether repeated calls return the same
* instance - confirm against the concrete implementation before caching or closing the result.
*
* @return The KinesisAsyncClient instance
*/
KinesisAsyncClient get();

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.flink.connector.kinesis.sink;

import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.connector.aws.util.AWSClientUtil;
import org.apache.flink.connector.aws.util.AWSGeneralUtil;
Expand All @@ -31,6 +32,7 @@
import org.apache.flink.connector.base.sink.writer.strategy.RateLimitingStrategy;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
import org.apache.flink.util.Preconditions;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -42,10 +44,13 @@
import software.amazon.awssdk.services.kinesis.model.PutRecordsResultEntry;
import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
Expand Down Expand Up @@ -96,11 +101,8 @@ class KinesisStreamsSinkWriter<InputT> extends AsyncSinkWriter<InputT, PutRecord
/* The sink writer metric group */
private final SinkWriterMetricGroup metrics;

/* The asynchronous http client for the asynchronous Kinesis client */
private final SdkAsyncHttpClient httpClient;

/* The asynchronous Kinesis client - construction is by kinesisClientProperties */
private final KinesisAsyncClient kinesisClient;
/* The client provider */
private KinesisClientProvider kinesisClientProvider;

/* Flag to whether fatally fail any time we encounter an exception when persisting records */
private final boolean failOnError;
Expand Down Expand Up @@ -148,6 +150,36 @@ class KinesisStreamsSinkWriter<InputT> extends AsyncSinkWriter<InputT, PutRecord
String streamArn,
Properties kinesisClientProperties,
Collection<BufferedRequestState<PutRecordsRequestEntry>> states) {
this(
elementConverter,
context,
maxBatchSize,
maxInFlightRequests,
maxBufferedRequests,
maxBatchSizeInBytes,
maxTimeInBufferMS,
maxRecordSizeInBytes,
failOnError,
streamName,
streamArn,
states,
createDefaultClientProvider(kinesisClientProperties));
}

KinesisStreamsSinkWriter(
ElementConverter<InputT, PutRecordsRequestEntry> elementConverter,
Sink.InitContext context,
int maxBatchSize,
int maxInFlightRequests,
int maxBufferedRequests,
long maxBatchSizeInBytes,
long maxTimeInBufferMS,
long maxRecordSizeInBytes,
boolean failOnError,
String streamName,
String streamArn,
Collection<BufferedRequestState<PutRecordsRequestEntry>> states,
KinesisClientProvider kinesisClientProvider) {
super(
elementConverter,
context,
Expand All @@ -167,11 +199,48 @@ class KinesisStreamsSinkWriter<InputT> extends AsyncSinkWriter<InputT, PutRecord
this.streamArn = streamArn;
this.metrics = context.metricGroup();
this.numRecordsOutErrorsCounter = metrics.getNumRecordsOutErrorsCounter();
this.httpClient = AWSGeneralUtil.createAsyncHttpClient(kinesisClientProperties);
this.kinesisClient = buildClient(kinesisClientProperties, this.httpClient);
setKinesisClientProvider(kinesisClientProvider);
}

/**
 * Create a default KinesisClientProvider to manage the Kinesis client and HTTP client.
 *
 * <p>The HTTP client is created first and then handed to {@code buildClient}. If building the
 * Kinesis client fails, the HTTP client is closed before the failure is propagated, because no
 * provider exists yet that could release it later.
 *
 * @param kinesisClientProperties Properties for configuring the Kinesis client
 * @return A KinesisClientProvider implementation that always returns the same client and closes
 *     both the HTTP client and the Kinesis client when it is closed
 */
private static KinesisClientProvider createDefaultClientProvider(
        Properties kinesisClientProperties) {
    final SdkAsyncHttpClient httpClient =
            AWSGeneralUtil.createAsyncHttpClient(kinesisClientProperties);

    final KinesisAsyncClient kinesisClient;
    try {
        kinesisClient = buildClient(kinesisClientProperties, httpClient);
    } catch (RuntimeException | Error buildFailure) {
        // Nothing else holds a reference to the HTTP client at this point, so release it here
        // instead of leaking it; keep the original failure as the primary exception.
        try {
            httpClient.close();
        } catch (RuntimeException closeFailure) {
            buildFailure.addSuppressed(closeFailure);
        }
        throw buildFailure;
    }

    return new KinesisClientProvider() {
        @Override
        public KinesisAsyncClient get() {
            return kinesisClient;
        }

        @Override
        public void close() {
            AWSGeneralUtil.closeResources(httpClient, kinesisClient);
        }
    };
}

/**
 * Installs the provider that supplies the KinesisAsyncClient used by this writer. The constructor
 * stores its provider through this method; beyond that it exists so tests can swap in a custom
 * provider.
 *
 * <p>The provider that was installed before the call, if any, is simply replaced; it is not
 * closed here.
 *
 * @param kinesisClientProvider The provider that supplies the KinesisAsyncClient, must not be
 *     null
 */
@VisibleForTesting
void setKinesisClientProvider(KinesisClientProvider kinesisClientProvider) {
    // Validate first so a null argument never overwrites the current provider.
    Preconditions.checkNotNull(
            kinesisClientProvider, "The kinesisClientProvider must not be null.");
    this.kinesisClientProvider = kinesisClientProvider;
}

private KinesisAsyncClient buildClient(
private static KinesisAsyncClient buildClient(
Properties kinesisClientProperties, SdkAsyncHttpClient httpClient) {
AWSGeneralUtil.validateAwsCredentials(kinesisClientProperties);

Expand Down Expand Up @@ -208,6 +277,7 @@ protected void submitRequestEntries(
.streamARN(streamArn)
.build();

KinesisAsyncClient kinesisClient = kinesisClientProvider.get();
CompletableFuture<PutRecordsResponse> future = kinesisClient.putRecords(batchRequest);

future.whenComplete(
Expand Down Expand Up @@ -244,34 +314,121 @@ private void handleFullyFailedRequest(

/**
 * Closes this writer by closing the {@code kinesisClientProvider}, which owns the client
 * resources.
 *
 * @throws RuntimeException if the provider fails to close; the original {@link IOException} is
 *     kept as the cause
 */
@Override
public void close() {
    try {
        // The provider is the single owner of the clients, so it is the only thing closed here.
        kinesisClientProvider.close();
    } catch (IOException e) {
        throw new RuntimeException("Failed to close the kinesisClientProvider", e);
    }
}

/**
 * Handles a PutRecords response in which some, but not all, records failed.
 *
 * <p>The failed record count is logged once at WARN level and added once to the
 * {@code numRecordsOutErrorsCounter}. When {@code failOnError} is set, a
 * {@code KinesisStreamsFailFastException} is passed to the fatal exception consumer and nothing
 * is handed to {@code requestResult}. Otherwise the failed entries are collected, an aggregated
 * per-error-code summary is logged, and the failed entries are passed to {@code requestResult}.
 *
 * @param response The response returned by Kinesis for the batch
 * @param requestEntries The entries that were sent in the batch
 * @param requestResult Callback that receives the entries that failed
 */
private void handlePartiallyFailedRequest(
        PutRecordsResponse response,
        List<PutRecordsRequestEntry> requestEntries,
        Consumer<List<PutRecordsRequestEntry>> requestResult) {
    // Read the count once so the log line, the metric and the list capacity all agree.
    int failedRecordCount = response.failedRecordCount();
    LOG.warn("KDS Sink failed to write and will retry {} entries to KDS", failedRecordCount);
    numRecordsOutErrorsCounter.inc(failedRecordCount);

    if (failOnError) {
        getFatalExceptionCons()
                .accept(new KinesisStreamsException.KinesisStreamsFailFastException());
        return;
    }

    List<PutRecordsRequestEntry> failedRequestEntries = new ArrayList<>(failedRecordCount);
    List<PutRecordsResultEntry> records = response.records();

    // Collect error information and build the list of failed entries
    Map<String, ErrorSummary> errorSummaries =
            collectErrorSummaries(records, requestEntries, failedRequestEntries);

    // Log aggregated error information
    logErrorSummaries(errorSummaries);

    requestResult.accept(failedRequestEntries);
}

/**
* Collect error summaries from failed records and build a list of failed request entries.
*
* @param records The result entries from the Kinesis response
* @param requestEntries The original request entries
* @param failedRequestEntries List to populate with failed entries (modified as a side effect)
* @return A map of error codes to their summaries
*/
private Map<String, ErrorSummary> collectErrorSummaries(
List<PutRecordsResultEntry> records,
List<PutRecordsRequestEntry> requestEntries,
List<PutRecordsRequestEntry> failedRequestEntries) {

// We capture error info while minimizing logging overhead in the data path,
// which is critical for maintaining throughput performance
Map<String, ErrorSummary> errorSummaries = new HashMap<>();

for (int i = 0; i < records.size(); i++) {
if (records.get(i).errorCode() != null) {
PutRecordsResultEntry resultEntry = records.get(i);
String errorCode = resultEntry.errorCode();

if (errorCode != null) {
// Track the frequency of each error code to identify patterns
ErrorSummary summary =
errorSummaries.computeIfAbsent(
errorCode, code -> new ErrorSummary(resultEntry.errorMessage()));
summary.incrementCount();

failedRequestEntries.add(requestEntries.get(i));
}
}

requestResult.accept(failedRequestEntries);
return errorSummaries;
}

/**
 * Log aggregated error information at WARN level.
 *
 * <p>One log line is emitted for the whole map, listing for each error code the number of
 * records and one example message. Nothing is built or logged when the map is empty or when
 * WARN logging is disabled.
 *
 * @param errorSummaries Map of error codes to their summaries
 */
private void logErrorSummaries(Map<String, ErrorSummary> errorSummaries) {
    // Skip the string building entirely when there is nothing to report or nobody listening;
    // this method sits on the data path.
    if (errorSummaries.isEmpty() || !LOG.isWarnEnabled()) {
        return;
    }

    StringBuilder errorSummary = new StringBuilder("Kinesis errors summary: ");
    for (Map.Entry<String, ErrorSummary> entry : errorSummaries.entrySet()) {
        ErrorSummary summary = entry.getValue();
        errorSummary
                .append('[')
                .append(entry.getKey())
                .append(": ")
                .append(summary.getCount())
                .append(" records, example: ")
                .append(summary.getExampleMessage())
                .append("] ");
    }

    // Using a single WARN log with aggregated information provides operational
    // visibility into errors without flooding logs in high-throughput scenarios
    LOG.warn("KDS Sink failed to write, {}", errorSummary);
}

/**
 * Mutable tally kept per error code: one example message, fixed at construction, plus a running
 * count of how many times the code was seen.
 */
private static class ErrorSummary {
    /* Number of occurrences recorded so far; an int field starts at zero */
    private int count;

    /* Example message supplied when the summary was created */
    private final String exampleMessage;

    ErrorSummary(String exampleMessage) {
        this.exampleMessage = exampleMessage;
    }

    /** Returns the example message given at construction. */
    String getExampleMessage() {
        return exampleMessage;
    }

    /** Returns the number of occurrences recorded so far. */
    int getCount() {
        return count;
    }

    /** Records one more occurrence. */
    void incrementCount() {
        count += 1;
    }
}

private boolean isRetryable(Throwable err) {
Expand Down
Loading