Skip to content

Commit 6cfa13d

Browse files
z3d1khlteoh37
authored andcommitted
[FLINK-35815][Connector/Kinesis] Fix detection of recoverable exceptions for EFO operations
1 parent 4258ec4 commit 6cfa13d

File tree

2 files changed

+22
-4
lines changed

2 files changed

+22
-4
lines changed

flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AwsV2Util.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.flink.streaming.connectors.kinesis.util;
1919

2020
import org.apache.flink.annotation.Internal;
21+
import org.apache.flink.util.ExceptionUtils;
2122

2223
import software.amazon.awssdk.http.SdkHttpConfigurationOption;
2324
import software.amazon.awssdk.services.kinesis.model.LimitExceededException;
@@ -74,8 +75,12 @@ public static boolean isNoneEfoRegistrationType(final Properties properties) {
7475
}
7576

7677
public static boolean isRecoverableException(Exception e) {
77-
Throwable cause = e.getCause();
78-
return cause instanceof LimitExceededException
79-
|| cause instanceof ProvisionedThroughputExceededException;
78+
return ExceptionUtils.findThrowable(
79+
e,
80+
throwable ->
81+
throwable instanceof LimitExceededException
82+
|| throwable
83+
instanceof ProvisionedThroughputExceededException)
84+
.isPresent();
8085
}
8186
}

flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/AwsV2UtilTest.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.junit.Test;
2121
import software.amazon.awssdk.http.SdkHttpConfigurationOption;
2222
import software.amazon.awssdk.services.kinesis.model.LimitExceededException;
23+
import software.amazon.awssdk.services.kinesis.model.ProvisionedThroughputExceededException;
2324
import software.amazon.awssdk.utils.AttributeMap;
2425

2526
import java.time.Duration;
@@ -133,7 +134,19 @@ public void testIsNoneEfoRegistrationType() {
133134
}
134135

135136
@Test
136-
public void testIsRecoverableExceptionForRecoverable() {
137+
public void testIsRecoverableExceptionForRecoverableLimitExceeded() {
138+
Exception recoverable = LimitExceededException.builder().build();
139+
assertThat(AwsV2Util.isRecoverableException(recoverable)).isTrue();
140+
}
141+
142+
@Test
143+
public void testIsRecoverableExceptionForRecoverableProvisionedThroughputExceeded() {
144+
Exception recoverable = ProvisionedThroughputExceededException.builder().build();
145+
assertThat(AwsV2Util.isRecoverableException(recoverable)).isTrue();
146+
}
147+
148+
@Test
149+
public void testIsRecoverableExceptionForRecoverableWrapped() {
137150
Exception recoverable = LimitExceededException.builder().build();
138151
assertThat(AwsV2Util.isRecoverableException(new ExecutionException(recoverable))).isTrue();
139152
}

0 commit comments

Comments
 (0)