18
18
package org .apache .flink .connector .kinesis .sink ;
19
19
20
20
import org .apache .flink .annotation .Internal ;
21
+ import org .apache .flink .annotation .VisibleForTesting ;
21
22
import org .apache .flink .api .connector .sink2 .Sink ;
22
23
import org .apache .flink .connector .aws .util .AWSClientUtil ;
23
24
import org .apache .flink .connector .aws .util .AWSGeneralUtil ;
31
32
import org .apache .flink .connector .base .sink .writer .strategy .RateLimitingStrategy ;
32
33
import org .apache .flink .metrics .Counter ;
33
34
import org .apache .flink .metrics .groups .SinkWriterMetricGroup ;
35
+ import org .apache .flink .util .Preconditions ;
34
36
35
37
import org .slf4j .Logger ;
36
38
import org .slf4j .LoggerFactory ;
42
44
import software .amazon .awssdk .services .kinesis .model .PutRecordsResultEntry ;
43
45
import software .amazon .awssdk .services .kinesis .model .ResourceNotFoundException ;
44
46
47
+ import java .io .IOException ;
45
48
import java .util .ArrayList ;
46
49
import java .util .Collection ;
47
50
import java .util .Collections ;
51
+ import java .util .HashMap ;
48
52
import java .util .List ;
53
+ import java .util .Map ;
49
54
import java .util .Properties ;
50
55
import java .util .concurrent .CompletableFuture ;
51
56
import java .util .function .Consumer ;
@@ -96,11 +101,8 @@ class KinesisStreamsSinkWriter<InputT> extends AsyncSinkWriter<InputT, PutRecord
96
101
/* The sink writer metric group */
97
102
private final SinkWriterMetricGroup metrics ;
98
103
99
- /* The asynchronous http client for the asynchronous Kinesis client */
100
- private final SdkAsyncHttpClient httpClient ;
101
-
102
- /* The asynchronous Kinesis client - construction is by kinesisClientProperties */
103
- private final KinesisAsyncClient kinesisClient ;
104
+ /* The client provider */
105
+ private KinesisClientProvider kinesisClientProvider ;
104
106
105
107
/* Flag to whether fatally fail any time we encounter an exception when persisting records */
106
108
private final boolean failOnError ;
@@ -148,6 +150,36 @@ class KinesisStreamsSinkWriter<InputT> extends AsyncSinkWriter<InputT, PutRecord
148
150
String streamArn ,
149
151
Properties kinesisClientProperties ,
150
152
Collection <BufferedRequestState <PutRecordsRequestEntry >> states ) {
153
+ this (
154
+ elementConverter ,
155
+ context ,
156
+ maxBatchSize ,
157
+ maxInFlightRequests ,
158
+ maxBufferedRequests ,
159
+ maxBatchSizeInBytes ,
160
+ maxTimeInBufferMS ,
161
+ maxRecordSizeInBytes ,
162
+ failOnError ,
163
+ streamName ,
164
+ streamArn ,
165
+ states ,
166
+ createDefaultClientProvider (kinesisClientProperties ));
167
+ }
168
+
169
+ KinesisStreamsSinkWriter (
170
+ ElementConverter <InputT , PutRecordsRequestEntry > elementConverter ,
171
+ Sink .InitContext context ,
172
+ int maxBatchSize ,
173
+ int maxInFlightRequests ,
174
+ int maxBufferedRequests ,
175
+ long maxBatchSizeInBytes ,
176
+ long maxTimeInBufferMS ,
177
+ long maxRecordSizeInBytes ,
178
+ boolean failOnError ,
179
+ String streamName ,
180
+ String streamArn ,
181
+ Collection <BufferedRequestState <PutRecordsRequestEntry >> states ,
182
+ KinesisClientProvider kinesisClientProvider ) {
151
183
super (
152
184
elementConverter ,
153
185
context ,
@@ -167,11 +199,48 @@ class KinesisStreamsSinkWriter<InputT> extends AsyncSinkWriter<InputT, PutRecord
167
199
this .streamArn = streamArn ;
168
200
this .metrics = context .metricGroup ();
169
201
this .numRecordsOutErrorsCounter = metrics .getNumRecordsOutErrorsCounter ();
170
- this .httpClient = AWSGeneralUtil .createAsyncHttpClient (kinesisClientProperties );
171
- this .kinesisClient = buildClient (kinesisClientProperties , this .httpClient );
202
+ setKinesisClientProvider (kinesisClientProvider );
203
+ }
204
+
205
+ /**
206
+ * Create a default KinesisClientProvider to manage the Kinesis client and HTTP client.
207
+ *
208
+ * @param kinesisClientProperties Properties for configuring the Kinesis client
209
+ * @return A KinesisClientProvider implementation
210
+ */
211
+ private static KinesisClientProvider createDefaultClientProvider (Properties kinesisClientProperties ) {
212
+ return new KinesisClientProvider () {
213
+ private final SdkAsyncHttpClient httpClient =
214
+ AWSGeneralUtil .createAsyncHttpClient (kinesisClientProperties );
215
+ private final KinesisAsyncClient kinesisClient =
216
+ buildClient (kinesisClientProperties , httpClient );
217
+
218
+ @ Override
219
+ public KinesisAsyncClient get () {
220
+ return kinesisClient ;
221
+ }
222
+
223
+ @ Override
224
+ public void close () {
225
+ AWSGeneralUtil .closeResources (httpClient , kinesisClient );
226
+ }
227
+ };
228
+ }
229
+
230
+ /**
231
+ * Set a custom KinesisAsyncClient provider for testing purposes. This method is only intended
232
+ * to be used in tests.
233
+ *
234
+ * @param kinesisClientProvider The provider that supplies the KinesisAsyncClient
235
+ */
236
+ @ VisibleForTesting
237
+ void setKinesisClientProvider (KinesisClientProvider kinesisClientProvider ) {
238
+ this .kinesisClientProvider =
239
+ Preconditions .checkNotNull (
240
+ kinesisClientProvider , "The kinesisClientProvider must not be null." );
172
241
}
173
242
174
- private KinesisAsyncClient buildClient (
243
+ private static KinesisAsyncClient buildClient (
175
244
Properties kinesisClientProperties , SdkAsyncHttpClient httpClient ) {
176
245
AWSGeneralUtil .validateAwsCredentials (kinesisClientProperties );
177
246
@@ -208,6 +277,7 @@ protected void submitRequestEntries(
208
277
.streamARN (streamArn )
209
278
.build ();
210
279
280
+ KinesisAsyncClient kinesisClient = kinesisClientProvider .get ();
211
281
CompletableFuture <PutRecordsResponse > future = kinesisClient .putRecords (batchRequest );
212
282
213
283
future .whenComplete (
@@ -244,34 +314,121 @@ private void handleFullyFailedRequest(
244
314
245
315
@ Override
246
316
public void close () {
247
- AWSGeneralUtil .closeResources (httpClient , kinesisClient );
317
+ try {
318
+ kinesisClientProvider .close ();
319
+ } catch (IOException e ) {
320
+ throw new RuntimeException ("Failed to close the kinesisClientProvider" , e );
321
+ }
248
322
}
249
323
250
324
private void handlePartiallyFailedRequest (
251
325
PutRecordsResponse response ,
252
326
List <PutRecordsRequestEntry > requestEntries ,
253
327
Consumer <List <PutRecordsRequestEntry >> requestResult ) {
254
- LOG .warn (
255
- "KDS Sink failed to write and will retry {} entries to KDS" ,
256
- response .failedRecordCount ());
257
- numRecordsOutErrorsCounter .inc (response .failedRecordCount ());
328
+ int failedRecordCount = response .failedRecordCount ();
329
+ LOG .warn ("KDS Sink failed to write and will retry {} entries to KDS" , failedRecordCount );
330
+ numRecordsOutErrorsCounter .inc (failedRecordCount );
258
331
259
332
if (failOnError ) {
260
333
getFatalExceptionCons ()
261
334
.accept (new KinesisStreamsException .KinesisStreamsFailFastException ());
262
335
return ;
263
336
}
264
- List < PutRecordsRequestEntry > failedRequestEntries =
265
- new ArrayList <>(response . failedRecordCount () );
337
+
338
+ List < PutRecordsRequestEntry > failedRequestEntries = new ArrayList <>(failedRecordCount );
266
339
List <PutRecordsResultEntry > records = response .records ();
267
340
341
+ // Collect error information and build the list of failed entries
342
+ Map <String , ErrorSummary > errorSummaries =
343
+ collectErrorSummaries (records , requestEntries , failedRequestEntries );
344
+
345
+ // Log aggregated error information
346
+ logErrorSummaries (errorSummaries );
347
+
348
+ requestResult .accept (failedRequestEntries );
349
+ }
350
+
351
+ /**
352
+ * Collect error summaries from failed records and build a list of failed request entries.
353
+ *
354
+ * @param records The result entries from the Kinesis response
355
+ * @param requestEntries The original request entries
356
+ * @param failedRequestEntries List to populate with failed entries (modified as a side effect)
357
+ * @return A map of error codes to their summaries
358
+ */
359
+ private Map <String , ErrorSummary > collectErrorSummaries (
360
+ List <PutRecordsResultEntry > records ,
361
+ List <PutRecordsRequestEntry > requestEntries ,
362
+ List <PutRecordsRequestEntry > failedRequestEntries ) {
363
+
364
+ // We capture error info while minimizing logging overhead in the data path,
365
+ // which is critical for maintaining throughput performance
366
+ Map <String , ErrorSummary > errorSummaries = new HashMap <>();
367
+
268
368
for (int i = 0 ; i < records .size (); i ++) {
269
- if (records .get (i ).errorCode () != null ) {
369
+ PutRecordsResultEntry resultEntry = records .get (i );
370
+ String errorCode = resultEntry .errorCode ();
371
+
372
+ if (errorCode != null ) {
373
+ // Track the frequency of each error code to identify patterns
374
+ ErrorSummary summary =
375
+ errorSummaries .computeIfAbsent (
376
+ errorCode , code -> new ErrorSummary (resultEntry .errorMessage ()));
377
+ summary .incrementCount ();
378
+
270
379
failedRequestEntries .add (requestEntries .get (i ));
271
380
}
272
381
}
273
382
274
- requestResult .accept (failedRequestEntries );
383
+ return errorSummaries ;
384
+ }
385
+
386
+ /**
387
+ * Log aggregated error information at WARN level.
388
+ *
389
+ * @param errorSummaries Map of error codes to their summaries
390
+ */
391
+ private void logErrorSummaries (Map <String , ErrorSummary > errorSummaries ) {
392
+ // We log aggregated error information at WARN level to ensure visibility in production
393
+ // while avoiding the performance impact of logging each individual failure
394
+ if (!errorSummaries .isEmpty ()) {
395
+ StringBuilder errorSummary = new StringBuilder ("Kinesis errors summary: " );
396
+ errorSummaries .forEach (
397
+ (code , summary ) ->
398
+ errorSummary .append (
399
+ String .format (
400
+ "[%s: %d records, example: %s] " ,
401
+ code ,
402
+ summary .getCount (),
403
+ summary .getExampleMessage ())));
404
+
405
+ // Using a single WARN log with aggregated information provides operational
406
+ // visibility into errors without flooding logs in high-throughput scenarios
407
+ LOG .warn ("KDS Sink failed to write, " + errorSummary .toString ());
408
+ }
409
+ }
410
+
411
+ /** Helper class to store error summary information. */
412
+ private static class ErrorSummary {
413
+ private final String exampleMessage ;
414
+ private int count ;
415
+
416
+ ErrorSummary (String exampleMessage ) {
417
+ this .exampleMessage = exampleMessage ;
418
+ this .count = 0 ;
419
+ }
420
+
421
+ void incrementCount () {
422
+ count ++;
423
+ }
424
+
425
+ int getCount () {
426
+ return count ;
427
+ }
428
+
429
+ String getExampleMessage () {
430
+ return exampleMessage ;
431
+ }
275
432
}
276
433
277
434
private boolean isRetryable (Throwable err ) {
0 commit comments