From 1998af6ded0f648196c2bef1beb6664e0e1dbe43 Mon Sep 17 00:00:00 2001 From: Antonio Silva Date: Mon, 6 May 2024 18:45:49 +0100 Subject: [PATCH 01/14] Initial commit with the change to respect initial position for new streams - disabled by default so that this can go in a minor release. Default can be adjusted for a major. --- .../kinesis/FlinkKinesisConsumer.java | 86 +++++++++++++++---- .../config/ConsumerConfigConstants.java | 10 +++ 2 files changed, 78 insertions(+), 18 deletions(-) diff --git a/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java b/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java index 06d0acc9d..fcf5c6b3e 100644 --- a/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java +++ b/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java @@ -58,7 +58,9 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Properties; +import java.util.Set; import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -148,6 +150,12 @@ public class FlinkKinesisConsumer extends RichParallelSourceFunction private transient HashMap sequenceNumsToRestore; + /** + * The streams present in the {@link #sequenceNumsToRestore} map, which means they were consumed + * by the application previously, so we know where to consume from. + */ + private transient Set knownStreams; + /** * Flag used to control reading from Kinesis: source will read data while value is true. Changed * to false after {@link #cancel()} has been called. @@ -323,14 +331,23 @@ public void run(SourceContext sourceContext) throws Exception { // initial discovery List allShards = fetcher.discoverNewShardsToSubscribe(); + boolean applyStreamInitialPositionForNewStreams = + Optional.ofNullable( + configProps.getProperty( + ConsumerConfigConstants + .APPLY_STREAM_INITIAL_POSITION_FOR_NEW_STREAMS)) + .map(Boolean::parseBoolean) + .orElse( + ConsumerConfigConstants + .DEFAULT_APPLY_STREAM_INITIAL_POSITION_FOR_NEW_STREAMS); + for (StreamShardHandle shard : allShards) { StreamShardMetadata.EquivalenceWrapper kinesisStreamShard = new StreamShardMetadata.EquivalenceWrapper( KinesisDataFetcher.convertToStreamShardMetadata(shard)); if (sequenceNumsToRestore != null) { - - if (sequenceNumsToRestore.containsKey(kinesisStreamShard)) { + if (knownStreams.contains(shard.getStreamName())) { // if the shard was already seen and is contained in the state, // just use the sequence number stored in the state fetcher.registerNewSubscribedShardState( @@ -348,24 +365,55 @@ public void run(SourceContext sourceContext) throws Exception { sequenceNumsToRestore.get(kinesisStreamShard)); } } else { - // the shard wasn't discovered in the previous run, therefore should be consumed - // from the beginning - fetcher.registerNewSubscribedShardState( - new KinesisStreamShardState( - kinesisStreamShard.getShardMetadata(), - shard, - SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get())); - - if (LOG.isInfoEnabled()) { - LOG.info( - "Subtask {} is seeding the fetcher with new discovered shard {}," - + " starting state set to the SENTINEL_EARLIEST_SEQUENCE_NUM", - getRuntimeContext().getIndexOfThisSubtask(), - shard.toString()); + if (applyStreamInitialPositionForNewStreams) { + // we're starting fresh (either for the whole consumer or for this stream); + // use the configured start position as initial state + SentinelSequenceNumber startingSeqNum = + InitialPosition.valueOf( + configProps.getProperty( + ConsumerConfigConstants + .STREAM_INITIAL_POSITION, + ConsumerConfigConstants + .DEFAULT_STREAM_INITIAL_POSITION)) + .toSentinelSequenceNumber(); + + fetcher.registerNewSubscribedShardState( + new KinesisStreamShardState( + kinesisStreamShard.getShardMetadata(), + shard, + startingSeqNum.get())); + + if (LOG.isInfoEnabled()) { + LOG.info( + "Subtask {} will be seeded with initial shard {}, starting state set as sequence number {}", + getRuntimeContext().getIndexOfThisSubtask(), + shard.toString(), + startingSeqNum.get()); + } + } else { + // the shard wasn't discovered in the previous run, therefore should be + // consumed + // from the beginning OR this is a new stream we haven't seen yet, and the + // applyStreamInitialPositionForNewStreams flag is false + fetcher.registerNewSubscribedShardState( + new KinesisStreamShardState( + kinesisStreamShard.getShardMetadata(), + shard, + SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM + .get())); + + if (LOG.isInfoEnabled()) { + LOG.info( + "Subtask {} is seeding the fetcher with new discovered shard {}," + + " starting state set to the SENTINEL_EARLIEST_SEQUENCE_NUM", + getRuntimeContext().getIndexOfThisSubtask(), + shard.toString()); + } } } } else { - // we're starting fresh; use the configured start position as initial state + // we're starting fresh (either for the whole consumer or for this stream); + // use the configured start position as initial state SentinelSequenceNumber startingSeqNum = InitialPosition.valueOf( configProps.getProperty( @@ -466,6 +514,7 @@ public void initializeState(FunctionInitializationContext context) throws Except sequenceNumsToRestore = new HashMap<>(); for (Tuple2 kinesisSequenceNumber : sequenceNumsStateForCheckpoint.get()) { + StreamShardMetadata streamShardMetadata = kinesisSequenceNumber.f0; sequenceNumsToRestore.put( // we wrap the restored metadata inside an equivalence wrapper that // checks only stream name and shard id, @@ -474,8 +523,9 @@ public void initializeState(FunctionInitializationContext context) throws Except // the savepoint and has a different metadata than what we last stored, // we will still be able to match it in sequenceNumsToRestore. Please // see FLINK-8484 for details. - new StreamShardMetadata.EquivalenceWrapper(kinesisSequenceNumber.f0), + new StreamShardMetadata.EquivalenceWrapper(streamShardMetadata), kinesisSequenceNumber.f1); + knownStreams.add(streamShardMetadata.getStreamName()); } LOG.info( diff --git a/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java b/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java index 710682266..5d2e01da7 100644 --- a/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java +++ b/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java @@ -320,12 +320,22 @@ public enum EFORegistrationType { public static final String EFO_HTTP_CLIENT_READ_TIMEOUT_MILLIS = "flink.stream.efo.http-client.read-timeout"; + /** + * Flag to configure whether {@link #STREAM_INITIAL_POSITION} should be considered for new + * streams, when the app is already consuming from other streams. + */ + public static final String APPLY_STREAM_INITIAL_POSITION_FOR_NEW_STREAMS = + "flink.stream.initpos-for-new-streams"; + // ------------------------------------------------------------------------ // Default values for consumer configuration // ------------------------------------------------------------------------ public static final String DEFAULT_STREAM_INITIAL_POSITION = InitialPosition.LATEST.toString(); + /** False for now so that we preserve old behaviour. TODO switch to true in the next major */ + public static final boolean DEFAULT_APPLY_STREAM_INITIAL_POSITION_FOR_NEW_STREAMS = false; + public static final String DEFAULT_STREAM_TIMESTAMP_DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSXXX"; From 7d435638d40ecaed708157004eda9b5a95c3f20a Mon Sep 17 00:00:00 2001 From: Antonio Silva Date: Tue, 7 May 2024 11:56:57 +0100 Subject: [PATCH 02/14] Add new property to allow reset of individual streams --- .../kinesis/FlinkKinesisConsumer.java | 205 ++++++++++-------- .../config/ConsumerConfigConstants.java | 13 ++ 2 files changed, 125 insertions(+), 93 deletions(-) diff --git a/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java b/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java index fcf5c6b3e..75946877b 100644 --- a/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java +++ b/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java @@ -42,6 +42,7 @@ import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber; import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle; import org.apache.flink.streaming.connectors.kinesis.model.StreamShardMetadata; +import org.apache.flink.streaming.connectors.kinesis.model.StreamShardMetadata.EquivalenceWrapper; import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema; import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchemaWrapper; import org.apache.flink.streaming.connectors.kinesis.table.DefaultShardAssignerFactory; @@ -54,6 +55,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -61,6 +63,7 @@ import java.util.Optional; import java.util.Properties; import java.util.Set; +import java.util.stream.Collectors; import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -332,108 +335,35 @@ public void run(SourceContext sourceContext) throws Exception { List allShards = fetcher.discoverNewShardsToSubscribe(); boolean applyStreamInitialPositionForNewStreams = - Optional.ofNullable( - configProps.getProperty( - ConsumerConfigConstants - .APPLY_STREAM_INITIAL_POSITION_FOR_NEW_STREAMS)) - .map(Boolean::parseBoolean) - .orElse( - ConsumerConfigConstants - .DEFAULT_APPLY_STREAM_INITIAL_POSITION_FOR_NEW_STREAMS); + getApplyStreamInitialPositionForNewStreamsFlag(); + + Set streamsToForceInitialPositionIn = getStreamsToForceInitialPositionIn(); for (StreamShardHandle shard : allShards) { StreamShardMetadata.EquivalenceWrapper kinesisStreamShard = new StreamShardMetadata.EquivalenceWrapper( KinesisDataFetcher.convertToStreamShardMetadata(shard)); + String stream = shard.getStreamName(); - if (sequenceNumsToRestore != null) { - if (knownStreams.contains(shard.getStreamName())) { + if (sequenceNumsToRestore == null || streamsToForceInitialPositionIn.contains(stream)) { + // we're starting fresh (either for the whole consumer or for this stream); + // use the configured start position as initial state + registerFromInitialPosition(fetcher, shard, kinesisStreamShard); + } else { + if (knownStreams.contains(stream)) { // if the shard was already seen and is contained in the state, // just use the sequence number stored in the state - fetcher.registerNewSubscribedShardState( - new KinesisStreamShardState( - kinesisStreamShard.getShardMetadata(), - shard, - sequenceNumsToRestore.get(kinesisStreamShard))); - - if (LOG.isInfoEnabled()) { - LOG.info( - "Subtask {} is seeding the fetcher with restored shard {}," - + " starting state set to the restored sequence number {}", - getRuntimeContext().getIndexOfThisSubtask(), - shard.toString(), - sequenceNumsToRestore.get(kinesisStreamShard)); - } + registerFromState(fetcher, shard, kinesisStreamShard); + } else if (applyStreamInitialPositionForNewStreams) { + // we're starting fresh (either for the whole consumer or for this stream); + // use the configured start position as initial state + registerFromInitialPosition(fetcher, shard, kinesisStreamShard); } else { - if (applyStreamInitialPositionForNewStreams) { - // we're starting fresh (either for the whole consumer or for this stream); - // use the configured start position as initial state - SentinelSequenceNumber startingSeqNum = - InitialPosition.valueOf( - configProps.getProperty( - ConsumerConfigConstants - .STREAM_INITIAL_POSITION, - ConsumerConfigConstants - .DEFAULT_STREAM_INITIAL_POSITION)) - .toSentinelSequenceNumber(); - - fetcher.registerNewSubscribedShardState( - new KinesisStreamShardState( - kinesisStreamShard.getShardMetadata(), - shard, - startingSeqNum.get())); - - if (LOG.isInfoEnabled()) { - LOG.info( - "Subtask {} will be seeded with initial shard {}, starting state set as sequence number {}", - getRuntimeContext().getIndexOfThisSubtask(), - shard.toString(), - startingSeqNum.get()); - } - } else { - // the shard wasn't discovered in the previous run, therefore should be - // consumed - // from the beginning OR this is a new stream we haven't seen yet, and the - // applyStreamInitialPositionForNewStreams flag is false - fetcher.registerNewSubscribedShardState( - new KinesisStreamShardState( - kinesisStreamShard.getShardMetadata(), - shard, - SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM - .get())); - - if (LOG.isInfoEnabled()) { - LOG.info( - "Subtask {} is seeding the fetcher with new discovered shard {}," - + " starting state set to the SENTINEL_EARLIEST_SEQUENCE_NUM", - getRuntimeContext().getIndexOfThisSubtask(), - shard.toString()); - } - } - } - } else { - // we're starting fresh (either for the whole consumer or for this stream); - // use the configured start position as initial state - SentinelSequenceNumber startingSeqNum = - InitialPosition.valueOf( - configProps.getProperty( - ConsumerConfigConstants.STREAM_INITIAL_POSITION, - ConsumerConfigConstants - .DEFAULT_STREAM_INITIAL_POSITION)) - .toSentinelSequenceNumber(); - - fetcher.registerNewSubscribedShardState( - new KinesisStreamShardState( - kinesisStreamShard.getShardMetadata(), - shard, - startingSeqNum.get())); - - if (LOG.isInfoEnabled()) { - LOG.info( - "Subtask {} will be seeded with initial shard {}, starting state set as sequence number {}", - getRuntimeContext().getIndexOfThisSubtask(), - shard.toString(), - startingSeqNum.get()); + // the shard wasn't discovered in the previous run, therefore should be + // consumed + // from the beginning OR this is a new stream we haven't seen yet, and the + // applyStreamInitialPositionForNewStreams flag is false + registerFromBeginning(fetcher, shard, kinesisStreamShard); } } } @@ -456,6 +386,95 @@ public void run(SourceContext sourceContext) throws Exception { sourceContext.close(); } + private Set getStreamsToForceInitialPositionIn() { + String streamsToForceInitialPositionInStr = + Optional.ofNullable( + configProps.getProperty( + ConsumerConfigConstants + .STREAMS_TO_APPLY_STREAM_INITIAL_POSITION_TO)) + .orElse( + ConsumerConfigConstants + .DEFAULT_STREAMS_TO_APPLY_STREAM_INITIAL_POSITION_TO); + + return Arrays.stream(streamsToForceInitialPositionInStr.split(",")) + .map(String::trim) + .collect(Collectors.toSet()); + } + + private Boolean getApplyStreamInitialPositionForNewStreamsFlag() { + return Optional.ofNullable( + configProps.getProperty( + ConsumerConfigConstants + .APPLY_STREAM_INITIAL_POSITION_FOR_NEW_STREAMS)) + .map(Boolean::parseBoolean) + .orElse( + ConsumerConfigConstants + .DEFAULT_APPLY_STREAM_INITIAL_POSITION_FOR_NEW_STREAMS); + } + + private void registerFromBeginning( + KinesisDataFetcher fetcher, + StreamShardHandle shard, + EquivalenceWrapper kinesisStreamShard) { + fetcher.registerNewSubscribedShardState( + new KinesisStreamShardState( + kinesisStreamShard.getShardMetadata(), + shard, + SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get())); + + if (LOG.isInfoEnabled()) { + LOG.info( + "Subtask {} is seeding the fetcher with new discovered shard {}," + + " starting state set to the SENTINEL_EARLIEST_SEQUENCE_NUM", + getRuntimeContext().getIndexOfThisSubtask(), + shard.toString()); + } + } + + private void registerFromInitialPosition( + KinesisDataFetcher fetcher, + StreamShardHandle shard, + EquivalenceWrapper kinesisStreamShard) { + SentinelSequenceNumber startingSeqNum = + InitialPosition.valueOf( + configProps.getProperty( + ConsumerConfigConstants.STREAM_INITIAL_POSITION, + ConsumerConfigConstants.DEFAULT_STREAM_INITIAL_POSITION)) + .toSentinelSequenceNumber(); + + fetcher.registerNewSubscribedShardState( + new KinesisStreamShardState( + kinesisStreamShard.getShardMetadata(), shard, startingSeqNum.get())); + + if (LOG.isInfoEnabled()) { + LOG.info( + "Subtask {} will be seeded with initial shard {}, starting state set as sequence number {}", + getRuntimeContext().getIndexOfThisSubtask(), + shard.toString(), + startingSeqNum.get()); + } + } + + private void registerFromState( + KinesisDataFetcher fetcher, + StreamShardHandle shard, + EquivalenceWrapper kinesisStreamShard) { + fetcher.registerNewSubscribedShardState( + new KinesisStreamShardState( + kinesisStreamShard.getShardMetadata(), + shard, + sequenceNumsToRestore.get(kinesisStreamShard))); + + if (LOG.isInfoEnabled()) { + LOG.info( + "Subtask {} is seeding the fetcher with restored shard {}," + + " starting state set to the restored sequence number {}", + getRuntimeContext().getIndexOfThisSubtask(), + shard.toString(), + sequenceNumsToRestore.get(kinesisStreamShard)); + } + } + @Override public void cancel() { running = false; diff --git a/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java b/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java index 5d2e01da7..ceb539cec 100644 --- a/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java +++ b/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java @@ -327,6 +327,17 @@ public enum EFORegistrationType { public static final String APPLY_STREAM_INITIAL_POSITION_FOR_NEW_STREAMS = "flink.stream.initpos-for-new-streams"; + /** + * Property that can be used to ignore the restore state for a particular stream and instead use + * the initial position. This is useful to reset a specific stream to consume from TRIM_HORIZON + * or LATEST if needed. Values must be passed in a comma separated list. + * + *

If a stream is in this list, it will use initial position regardless of the value of the + * {@link #APPLY_STREAM_INITIAL_POSITION_FOR_NEW_STREAMS} property. + */ + public static final String STREAMS_TO_APPLY_STREAM_INITIAL_POSITION_TO = + "flink.stream.initpos-streams"; + // ------------------------------------------------------------------------ // Default values for consumer configuration // ------------------------------------------------------------------------ @@ -336,6 +347,8 @@ public enum EFORegistrationType { /** False for now so that we preserve old behaviour. TODO switch to true in the next major */ public static final boolean DEFAULT_APPLY_STREAM_INITIAL_POSITION_FOR_NEW_STREAMS = false; + public static final String DEFAULT_STREAMS_TO_APPLY_STREAM_INITIAL_POSITION_TO = ""; + public static final String DEFAULT_STREAM_TIMESTAMP_DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSXXX"; From 49f4bc4f30311a8e990cffb909a3b4eda7e76372 Mon Sep 17 00:00:00 2001 From: Antonio Silva Date: Tue, 7 May 2024 12:25:08 +0100 Subject: [PATCH 03/14] add docs --- .../docs/connectors/datastream/kinesis.md | 25 +++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/docs/content/docs/connectors/datastream/kinesis.md b/docs/content/docs/connectors/datastream/kinesis.md index 72bd5d738..90c63b24a 100644 --- a/docs/content/docs/connectors/datastream/kinesis.md +++ b/docs/content/docs/connectors/datastream/kinesis.md @@ -217,6 +217,31 @@ properties by providing a value for `ConsumerConfigConstants.STREAM_INITIAL_TIME If `ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT` is not defined then the default pattern will be `yyyy-MM-dd'T'HH:mm:ss.SSSXXX` (for example, timestamp value is `2016-04-04` and pattern is `yyyy-MM-dd` given by user or timestamp value is `2016-04-04T19:58:46.480-00:00` without given a pattern). +### Configuring starting position for new streams + +By default, the Flink Kinesis Consumer handles new streams the same way it handles a new shard for an existing stream, and it starts consuming from the earliest record (same behaviour as TRIM_HORIZON). + +This is usually not what you want for new streams, where it makes more sense to respect what is defined in `ConsumerConfigConstants.STREAM_INITIAL_POSITION`. +This behaviour can now be enabled by setting the `ConsumerConfigConstants.APPLY_STREAM_INITIAL_POSITION_FOR_NEW_STREAMS` flag to true. + +### Resetting specific streams to the starting position + +One of the features of the Flink Kinesis Consumer is that it keeps track of the offset that the application is at for each shard, so that if the application is restarted we can start consuming from that offset +when restoring from snapshot. + +This is the ideal behaviour most of the time, but what if you want to jump to `LATEST` or go back to `TRIM_HORIZON` for a stream that is already being tracked by the Flink Kinesis Consumer? + +You can now do this via the `ConsumerConfigConstants.STREAMS_TO_APPLY_STREAM_INITIAL_POSITION_TO` property, which expects a comma separated list of strings referring to the names of the Kinesis Streams to reset. + +For example, if you configure your application with +``` +consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST"); +consumerConfig.put(ConsumerConfigConstants.STREAMS_TO_APPLY_STREAM_INITIAL_POSITION_TO, "streamA, streamB"); +``` +then `streamA` and `streamB` would start consuming from LATEST, even if they are already being tracked by the application. + +Note that you would need to reset this property afterwards, otherwise the Flink Kinesis Consumer will always be resetting those streams to LATEST. + ### Fault Tolerance for Exactly-Once User-Defined State Update Semantics With Flink's checkpointing enabled, the Flink Kinesis Consumer will consume records from shards in Kinesis streams and From 224b9ad3f6792c1019ce49fc106126ab2a9c69d0 Mon Sep 17 00:00:00 2001 From: Antonio Silva Date: Tue, 7 May 2024 12:41:42 +0100 Subject: [PATCH 04/14] fix missing initialization of set --- .../streaming/connectors/kinesis/FlinkKinesisConsumer.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java b/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java index 75946877b..deeeec112 100644 --- a/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java +++ b/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java @@ -58,6 +58,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; @@ -531,6 +532,7 @@ public void initializeState(FunctionInitializationContext context) throws Except if (context.isRestored()) { if (sequenceNumsToRestore == null) { sequenceNumsToRestore = new HashMap<>(); + knownStreams = new HashSet<>(); for (Tuple2 kinesisSequenceNumber : sequenceNumsStateForCheckpoint.get()) { StreamShardMetadata streamShardMetadata = kinesisSequenceNumber.f0; From f4cac88d1612d91905daf8c61e2c99f675d10b5c Mon Sep 17 00:00:00 2001 From: Antonio Silva Date: Tue, 7 May 2024 12:59:43 +0100 Subject: [PATCH 05/14] fix conditions --- .../kinesis/FlinkKinesisConsumer.java | 31 ++++++++++++------- 1 file changed, 20 insertions(+), 11 deletions(-) diff --git a/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java b/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java index deeeec112..c85f4adf7 100644 --- a/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java +++ b/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java @@ -51,7 +51,6 @@ import org.apache.flink.streaming.connectors.kinesis.util.StreamConsumerRegistrarUtil; import org.apache.flink.streaming.connectors.kinesis.util.WatermarkTracker; import org.apache.flink.util.InstantiationUtil; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -351,20 +350,30 @@ public void run(SourceContext sourceContext) throws Exception { // use the configured start position as initial state registerFromInitialPosition(fetcher, shard, kinesisStreamShard); } else { - if (knownStreams.contains(stream)) { + if (sequenceNumsToRestore.containsKey(kinesisStreamShard)) { // if the shard was already seen and is contained in the state, // just use the sequence number stored in the state registerFromState(fetcher, shard, kinesisStreamShard); - } else if (applyStreamInitialPositionForNewStreams) { - // we're starting fresh (either for the whole consumer or for this stream); - // use the configured start position as initial state - registerFromInitialPosition(fetcher, shard, kinesisStreamShard); } else { - // the shard wasn't discovered in the previous run, therefore should be - // consumed - // from the beginning OR this is a new stream we haven't seen yet, and the - // applyStreamInitialPositionForNewStreams flag is false - registerFromBeginning(fetcher, shard, kinesisStreamShard); + // it's either a new shard for a stream that was already seen or a new stream + if (knownStreams.contains(stream)) { + // the shard wasn't discovered in the previous run, therefore should be + // consumed + // from the beginning OR this is a new stream we haven't seen yet, and the + // applyStreamInitialPositionForNewStreams flag is false + registerFromBeginning(fetcher, shard, kinesisStreamShard); + } else { + // it's a new stream + if (applyStreamInitialPositionForNewStreams) { + // the flag is true, so we respect the initial position for the new + // stream + registerFromInitialPosition(fetcher, shard, kinesisStreamShard); + } else { + // the flag is false, so we continue existing behaviour of registering + // from the beginning + registerFromBeginning(fetcher, shard, kinesisStreamShard); + } + } } } } From 1b62ab1abdcaf12029a211376f71b3e65f74a98b Mon Sep 17 00:00:00 2001 From: antsilva Date: Mon, 8 Jul 2024 09:42:25 +0100 Subject: [PATCH 06/14] Minor update to comment in javadoc --- .../connectors/kinesis/config/ConsumerConfigConstants.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java b/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java index ceb539cec..c8a4dae37 100644 --- a/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java +++ b/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java @@ -330,9 +330,8 @@ public enum EFORegistrationType { /** * Property that can be used to ignore the restore state for a particular stream and instead use * the initial position. This is useful to reset a specific stream to consume from TRIM_HORIZON - * or LATEST if needed. Values must be passed in a comma separated list. - * - *

If a stream is in this list, it will use initial position regardless of the value of the + * or LATEST if needed. Values must be passed in a comma separated list. If a stream is in this list, + * it will use initial position regardless of the value of the * {@link #APPLY_STREAM_INITIAL_POSITION_FOR_NEW_STREAMS} property. */ public static final String STREAMS_TO_APPLY_STREAM_INITIAL_POSITION_TO = From 1230ee093e5a17228e809eff35eb98c5def034f5 Mon Sep 17 00:00:00 2001 From: antsilva Date: Mon, 8 Jul 2024 09:51:16 +0100 Subject: [PATCH 07/14] Add more detail to javadoc comment --- .../connectors/kinesis/config/ConsumerConfigConstants.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java b/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java index c8a4dae37..2e2c578dd 100644 --- a/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java +++ b/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java @@ -322,7 +322,11 @@ public enum EFORegistrationType { /** * Flag to configure whether {@link #STREAM_INITIAL_POSITION} should be considered for new - * streams, when the app is already consuming from other streams. + * streams, when the app is already consuming from other streams. If set to true, then any + * stream that doesn't have any shard tracked by state yet will use the initial position. + * If false (default), it is assumed that we should consume from the beginning, which is + * appropriate when you want to ensure no data is lost if the stream is already being used + * by the data producers. */ public static final String APPLY_STREAM_INITIAL_POSITION_FOR_NEW_STREAMS = "flink.stream.initpos-for-new-streams"; From 5284bb3d9a8aded0ce441b5d532dd128f17d2630 Mon Sep 17 00:00:00 2001 From: antsilva Date: Mon, 8 Jul 2024 09:52:45 +0100 Subject: [PATCH 08/14] update TODO --- .../connectors/kinesis/config/ConsumerConfigConstants.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java b/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java index 2e2c578dd..251be4db2 100644 --- a/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java +++ b/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java @@ -347,7 +347,7 @@ public enum EFORegistrationType { public static final String DEFAULT_STREAM_INITIAL_POSITION = InitialPosition.LATEST.toString(); - /** False for now so that we preserve old behaviour. TODO switch to true in the next major */ + /** False for now so that we preserve old behaviour. TODO switch to true in the next major? If so update the javadoc. */ public static final boolean DEFAULT_APPLY_STREAM_INITIAL_POSITION_FOR_NEW_STREAMS = false; public static final String DEFAULT_STREAMS_TO_APPLY_STREAM_INITIAL_POSITION_TO = ""; From cc56278997d477506370567b67df29dc665b0a01 Mon Sep 17 00:00:00 2001 From: antsilva Date: Mon, 8 Jul 2024 10:59:22 +0100 Subject: [PATCH 09/14] Add initial test - not ready yet but to have a point to come back to if needed --- .../kinesis/FlinkKinesisConsumer.java | 56 +++++---- .../config/ConsumerConfigConstants.java | 2 - .../kinesis/FlinkKinesisConsumerTest.java | 115 ++++++++++++++++++ 3 files changed, 147 insertions(+), 26 deletions(-) diff --git a/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java b/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java index c85f4adf7..820ccf799 100644 --- a/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java +++ b/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java @@ -120,7 +120,9 @@ public class FlinkKinesisConsumer extends RichParallelSourceFunction // Consumer properties // ------------------------------------------------------------------------ - /** The names of the Kinesis streams that we will be consuming from. */ + /** + * The names of the Kinesis streams that we will be consuming from. + */ private final List streams; /** @@ -129,10 +131,14 @@ public class FlinkKinesisConsumer extends RichParallelSourceFunction */ private final Properties configProps; - /** User supplied deserialization schema to convert Kinesis byte messages to Flink objects. */ + /** + * User supplied deserialization schema to convert Kinesis byte messages to Flink objects. + */ private final KinesisDeserializationSchema deserializer; - /** The function that determines which subtask a shard should be assigned to. */ + /** + * The function that determines which subtask a shard should be assigned to. + */ private KinesisShardAssigner shardAssigner = new DefaultShardAssignerFactory().getShardAssigner(); @@ -149,7 +155,9 @@ public class FlinkKinesisConsumer extends RichParallelSourceFunction */ private transient KinesisDataFetcher fetcher; - /** The sequence numbers to restore to upon restore from failure. */ + /** + * The sequence numbers to restore to upon restore from failure. + */ private transient HashMap sequenceNumsToRestore; @@ -175,7 +183,9 @@ public class FlinkKinesisConsumer extends RichParallelSourceFunction // State for Checkpoint // ------------------------------------------------------------------------ - /** State name to access shard sequence number states; cannot be changed. */ + /** + * State name to access shard sequence number states; cannot be changed. + */ private static final String sequenceNumsStateStoreName = "Kinesis-Stream-Shard-State"; private transient ListState> @@ -191,11 +201,11 @@ public class FlinkKinesisConsumer extends RichParallelSourceFunction *

The AWS credentials to be used, AWS region of the Kinesis streams, initial position to * start streaming from are configured with a {@link Properties} instance. * - * @param stream The single AWS Kinesis stream to read from. + * @param stream The single AWS Kinesis stream to read from. * @param deserializer The deserializer used to convert raw bytes of Kinesis records to Java - * objects (without key). - * @param configProps The properties used to configure AWS credentials, AWS region, and initial - * starting position. + * objects (without key). + * @param configProps The properties used to configure AWS credentials, AWS region, and initial + * starting position. */ public FlinkKinesisConsumer( String stream, DeserializationSchema deserializer, Properties configProps) { @@ -208,11 +218,11 @@ public FlinkKinesisConsumer( *

The AWS credentials to be used, AWS region of the Kinesis streams, initial position to * start streaming from are configured with a {@link Properties} instance. * - * @param stream The single AWS Kinesis stream to read from. + * @param stream The single AWS Kinesis stream to read from. * @param deserializer The keyed deserializer used to convert raw bytes of Kinesis records to - * Java objects. - * @param configProps The properties used to configure AWS credentials, AWS region, and initial - * starting position. + * Java objects. + * @param configProps The properties used to configure AWS credentials, AWS region, and initial + * starting position. */ public FlinkKinesisConsumer( String stream, KinesisDeserializationSchema deserializer, Properties configProps) { @@ -225,11 +235,11 @@ public FlinkKinesisConsumer( *

The AWS credentials to be used, AWS region of the Kinesis streams, initial position to * start streaming from are configured with a {@link Properties} instance. * - * @param streams The AWS Kinesis streams to read from. + * @param streams The AWS Kinesis streams to read from. * @param deserializer The keyed deserializer used to convert raw bytes of Kinesis records to - * Java objects. - * @param configProps The properties used to configure AWS credentials, AWS region, and initial - * starting position. + * Java objects. + * @param configProps The properties used to configure AWS credentials, AWS region, and initial + * starting position. */ public FlinkKinesisConsumer( List streams, @@ -398,13 +408,11 @@ public void run(SourceContext sourceContext) throws Exception { private Set getStreamsToForceInitialPositionIn() { String streamsToForceInitialPositionInStr = - Optional.ofNullable( - configProps.getProperty( - ConsumerConfigConstants - .STREAMS_TO_APPLY_STREAM_INITIAL_POSITION_TO)) - .orElse( - ConsumerConfigConstants - .DEFAULT_STREAMS_TO_APPLY_STREAM_INITIAL_POSITION_TO); + configProps.getProperty(ConsumerConfigConstants.STREAMS_TO_APPLY_STREAM_INITIAL_POSITION_TO); + + if (streamsToForceInitialPositionInStr == null) { + return Collections.emptySet(); + } return Arrays.stream(streamsToForceInitialPositionInStr.split(",")) .map(String::trim) diff --git a/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java b/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java index 251be4db2..efbd91281 100644 --- a/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java +++ b/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java @@ -350,8 +350,6 @@ public enum EFORegistrationType { /** False for now so that we preserve old behaviour. TODO switch to true in the next major? If so update the javadoc. */ public static final boolean DEFAULT_APPLY_STREAM_INITIAL_POSITION_FOR_NEW_STREAMS = false; - public static final String DEFAULT_STREAMS_TO_APPLY_STREAM_INITIAL_POSITION_TO = ""; - public static final String DEFAULT_STREAM_TIMESTAMP_DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSXXX"; diff --git a/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java b/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java index 82836dffe..7880c2da8 100644 --- a/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java +++ b/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java @@ -70,6 +70,7 @@ import com.amazonaws.services.kinesis.model.HashKeyRange; import com.amazonaws.services.kinesis.model.SequenceNumberRange; import com.amazonaws.services.kinesis.model.Shard; +import org.jetbrains.annotations.NotNull; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.MockedStatic; @@ -95,6 +96,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; +import java.util.stream.Collectors; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; @@ -1332,4 +1334,117 @@ static void assertGlobalWatermark(long expected) { assertThat(WATERMARK.get()).isEqualTo(expected); } } + + + + /* =========================================================================== + Tests for FLINK-35299 - TODO: Rename and move where appropriate + The setup for these tests will always be the same: + - stream A with discovered shards 0 and 1 + - stream B with discovered shard 0 + + Then a few things will be discovered: + - stream C with shards 0 and 1 + - new shard (1) for stream B + ==============================================================================*/ + + /** + * Tests the default values of the properties introduced in FLINK-35299: + * - IF there is some state already + * - new streams should start from EARLIEST + * - new shards for existing streams start from EARLIEST + * - existing shards should continue from the state value + * - IF there is no state at all, new streams should start from INITIAL POSITION. + */ + @Test + @SuppressWarnings("unchecked") + public void testDefaultsWhenThereIsState() throws Exception { + Properties config = TestUtils.getStandardProperties(); + KinesisDataFetcher mockedFetcher = mockKinesisDataFetcher(); + + List> existingState = new ArrayList<>(); + existingState.add(createShardState("stream-A", 0, "A0")); + existingState.add(createShardState("stream-A", 1, "A1")); + existingState.add(createShardState("stream-B", 0, "B0")); + + List shardsToSubscribe = new ArrayList<>(); + StreamShardHandle streamShardA0 = getStreamShard("stream-A", 0); + StreamShardHandle streamShardA1 = getStreamShard("stream-A", 1); + StreamShardHandle streamShardB0 = getStreamShard("stream-B", 0); + StreamShardHandle streamShardB1 = getStreamShard("stream-B", 1); + StreamShardHandle streamShardC0 = getStreamShard("stream-C", 0); + shardsToSubscribe.add(streamShardA0); + shardsToSubscribe.add(streamShardA1); + shardsToSubscribe.add(streamShardB0); + shardsToSubscribe.add(streamShardB1); // new shard for existing stream + shardsToSubscribe.add(streamShardC0); // new stream + + Map expectedResults = new HashMap<>(); + expectedResults.put(streamShardA0, new SequenceNumber("A0")); + expectedResults.put(streamShardA1, new SequenceNumber("A1")); + expectedResults.put(streamShardB0, new SequenceNumber("B0")); + expectedResults.put(streamShardB1, new SequenceNumber("EARLIEST_SEQUENCE_NUM")); + expectedResults.put(streamShardC0, new SequenceNumber("EARLIEST_SEQUENCE_NUM")); + + runAndValidate(existingState, mockedFetcher, shardsToSubscribe, config, expectedResults); + } + + private void runAndValidate(List> existingState, KinesisDataFetcher mockedFetcher, List shardsToSubscribe, Properties config, Map expectedResults) throws Exception { + TestingListState> listState = new TestingListState<>(); + listState.addAll(existingState); + when(mockedFetcher.discoverNewShardsToSubscribe()).thenReturn(shardsToSubscribe); + + List streamsToConsume = shardsToSubscribe.stream().map(StreamShardHandle::getStreamName).distinct().collect(Collectors.toList()); + FlinkKinesisConsumer consumer = new FlinkKinesisConsumer<>(streamsToConsume, new KinesisDeserializationSchemaWrapper<>(new SimpleStringSchema()), config); + RuntimeContext context = new MockStreamingRuntimeContext(true, 2, 0); + consumer.setRuntimeContext(context); + + OperatorStateStore operatorStateStore = mock(OperatorStateStore.class); + when(operatorStateStore.getUnionListState(any(ListStateDescriptor.class))).thenReturn(listState); + + StateInitializationContext initializationContext = mock(StateInitializationContext.class); + when(initializationContext.getOperatorStateStore()).thenReturn(operatorStateStore); + when(initializationContext.isRestored()).thenReturn(true); + + consumer.initializeState(initializationContext); + + consumer.open(new Configuration()); + consumer.run(Mockito.mock(SourceFunction.SourceContext.class)); + + // check interactions with fetched + expectedResults.forEach((streamShardHandle, sequenceNumber) -> verifyRegisterNewSubscribedShard(mockedFetcher, streamShardHandle, sequenceNumber)); + + // arbitrary checkpoint to validate new state + consumer.snapshotState(new StateSnapshotContextSynchronousImpl(123, 123)); + assertThat(listState.isClearCalled()).isTrue(); + List> list = listState.getList(); + for (Tuple2 entry : list) { + StreamShardMetadata streamShardMetadata = entry.f0; + SequenceNumber sequenceNumber = entry.f1; + + SequenceNumber expectedSequenceNumber = expectedResults.get(getStreamShard(streamShardMetadata)); + assertThat(sequenceNumber).isEqualTo(expectedSequenceNumber); + } + } + + private static void verifyRegisterNewSubscribedShard(KinesisDataFetcher mockedFetcher, StreamShardHandle streamShardHandle, SequenceNumber sequenceNumber) { + Mockito.verify(mockedFetcher).registerNewSubscribedShardState( + new KinesisStreamShardState( + KinesisDataFetcher.convertToStreamShardMetadata(streamShardHandle), + streamShardHandle, + sequenceNumber) + ); + } + + private StreamShardHandle getStreamShard(StreamShardMetadata streamShardMetadata) { + return getStreamShard(streamShardMetadata.getStreamName(), streamShardMetadata.getShardId()); + } + + private static StreamShardHandle getStreamShard(String streamName, int shardId) { + return getStreamShard(streamName, KinesisShardIdGenerator.generateFromShardOrder(shardId)); + } + + private static StreamShardHandle getStreamShard(String streamName, String shardId) { + return new StreamShardHandle(streamName, new Shard().withShardId(shardId)); + } } From 7739ee40517a7613687f25e35abd67c4d8513462 Mon Sep 17 00:00:00 2001 From: antsilva Date: Mon, 8 Jul 2024 11:01:20 +0100 Subject: [PATCH 10/14] minor refactor to test setup --- .../connectors/kinesis/FlinkKinesisConsumerTest.java | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java b/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java index 7880c2da8..e2bcc8edd 100644 --- a/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java +++ b/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java @@ -70,7 +70,6 @@ import com.amazonaws.services.kinesis.model.HashKeyRange; import com.amazonaws.services.kinesis.model.SequenceNumberRange; import com.amazonaws.services.kinesis.model.Shard; -import org.jetbrains.annotations.NotNull; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.MockedStatic; @@ -1360,7 +1359,6 @@ static void assertGlobalWatermark(long expected) { @SuppressWarnings("unchecked") public void testDefaultsWhenThereIsState() throws Exception { Properties config = TestUtils.getStandardProperties(); - KinesisDataFetcher mockedFetcher = mockKinesisDataFetcher(); List> existingState = new ArrayList<>(); existingState.add(createShardState("stream-A", 0, "A0")); @@ -1386,10 +1384,15 @@ public void testDefaultsWhenThereIsState() throws Exception { expectedResults.put(streamShardB1, new SequenceNumber("EARLIEST_SEQUENCE_NUM")); expectedResults.put(streamShardC0, new SequenceNumber("EARLIEST_SEQUENCE_NUM")); - runAndValidate(existingState, mockedFetcher, shardsToSubscribe, config, expectedResults); + runAndValidate(config, existingState, shardsToSubscribe, expectedResults); } - private void runAndValidate(List> existingState, KinesisDataFetcher mockedFetcher, List shardsToSubscribe, Properties config, Map expectedResults) throws Exception { + private void runAndValidate(Properties config, + List> existingState, + List shardsToSubscribe, + Map expectedResults) throws Exception { + KinesisDataFetcher mockedFetcher = mockKinesisDataFetcher(); + TestingListState> listState = new TestingListState<>(); listState.addAll(existingState); when(mockedFetcher.discoverNewShardsToSubscribe()).thenReturn(shardsToSubscribe); From 1d166df082abb25b713191134c772a9504e7353f Mon Sep 17 00:00:00 2001 From: antsilva Date: Mon, 8 Jul 2024 11:33:23 +0100 Subject: [PATCH 11/14] add remaining tests --- .../kinesis/FlinkKinesisConsumer.java | 2 +- .../kinesis/FlinkKinesisConsumerTest.java | 260 +++++++++++++++++- 2 files changed, 246 insertions(+), 16 deletions(-) diff --git a/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java b/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java index 820ccf799..808ad78ae 100644 --- a/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java +++ b/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java @@ -355,7 +355,7 @@ public void run(SourceContext sourceContext) throws Exception { KinesisDataFetcher.convertToStreamShardMetadata(shard)); String stream = shard.getStreamName(); - if (sequenceNumsToRestore == null || streamsToForceInitialPositionIn.contains(stream)) { + if (sequenceNumsToRestore == null || sequenceNumsToRestore.isEmpty() || streamsToForceInitialPositionIn.contains(stream)) { // we're starting fresh (either for the whole consumer or for this stream); // use the configured start position as initial state registerFromInitialPosition(fetcher, shard, kinesisStreamShard); diff --git a/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java b/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java index e2bcc8edd..d5c3e694b 100644 --- a/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java +++ b/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java @@ -43,6 +43,7 @@ import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants; +import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.InitialPosition; import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher; import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard; import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState; @@ -97,6 +98,8 @@ import java.util.function.Supplier; import java.util.stream.Collectors; +import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.APPLY_STREAM_INITIAL_POSITION_FOR_NEW_STREAMS; +import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.STREAMS_TO_APPLY_STREAM_INITIAL_POSITION_TO; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; @@ -1334,31 +1337,172 @@ static void assertGlobalWatermark(long expected) { } } - - /* =========================================================================== - Tests for FLINK-35299 - TODO: Rename and move where appropriate + Tests for FLINK-35299 The setup for these tests will always be the same: - - stream A with discovered shards 0 and 1 - - stream B with discovered shard 0 + - stream A with state for shards 0 and 1 + - stream B with state for shard 0 - Then a few things will be discovered: - - stream C with shards 0 and 1 + Then new shards will be discovered: - new shard (1) for stream B + - new shard (0) for stream C - since stream C is not in state yet, it qualifies as a "new stream". ==============================================================================*/ + /** + * Tests FLINK-35299 with the default config values: + * - IF there is no state at all, all new streams/shards should start from INITIAL POSITION. + */ + @Test + @SuppressWarnings("unchecked") + public void testFLINK35299DefaultsWhenThereIsNoState() throws Exception { + Properties config = TestUtils.getStandardProperties(); + + List> existingState = null; + + List shardsToSubscribe = new ArrayList<>(); + StreamShardHandle streamShardA0 = getStreamShard("stream-A", 0); + StreamShardHandle streamShardA1 = getStreamShard("stream-A", 1); + StreamShardHandle streamShardB0 = getStreamShard("stream-B", 0); + StreamShardHandle streamShardB1 = getStreamShard("stream-B", 1); + StreamShardHandle streamShardC0 = getStreamShard("stream-C", 0); + shardsToSubscribe.add(streamShardA0); + shardsToSubscribe.add(streamShardA1); + shardsToSubscribe.add(streamShardB0); + shardsToSubscribe.add(streamShardB1); // new shard for existing stream + shardsToSubscribe.add(streamShardC0); // new stream + + Map expectedResults = new HashMap<>(); + SequenceNumber defaultInitialPositionSeqNumber = getSequenceNumber(InitialPosition.valueOf(ConsumerConfigConstants.DEFAULT_STREAM_INITIAL_POSITION)); + expectedResults.put(streamShardA0, defaultInitialPositionSeqNumber); + expectedResults.put(streamShardA1, defaultInitialPositionSeqNumber); + expectedResults.put(streamShardB0, defaultInitialPositionSeqNumber); + expectedResults.put(streamShardB1, defaultInitialPositionSeqNumber); + expectedResults.put(streamShardC0, defaultInitialPositionSeqNumber); + + runAndValidate(config, existingState, shardsToSubscribe, expectedResults); + } + + /** + * Tests FLINK-35299 with the {@link ConsumerConfigConstants#APPLY_STREAM_INITIAL_POSITION_FOR_NEW_STREAMS} + * flag is set to true. + * - IF there is no state at all, all new streams/shards should start from INITIAL POSITION. + */ + @Test + @SuppressWarnings("unchecked") + public void testFLINK35299ApplyStreamInitialPositionForNewStreamsWhenThereIsNoState() throws Exception { + Properties config = TestUtils.getStandardProperties(); + config.setProperty(APPLY_STREAM_INITIAL_POSITION_FOR_NEW_STREAMS, "true"); + + List> existingState = null; + + List shardsToSubscribe = new ArrayList<>(); + StreamShardHandle streamShardA0 = getStreamShard("stream-A", 0); + StreamShardHandle streamShardA1 = getStreamShard("stream-A", 1); + StreamShardHandle streamShardB0 = getStreamShard("stream-B", 0); + StreamShardHandle streamShardB1 = getStreamShard("stream-B", 1); + StreamShardHandle streamShardC0 = getStreamShard("stream-C", 0); + shardsToSubscribe.add(streamShardA0); + shardsToSubscribe.add(streamShardA1); + shardsToSubscribe.add(streamShardB0); + shardsToSubscribe.add(streamShardB1); // new shard for existing stream + shardsToSubscribe.add(streamShardC0); // new stream + + Map expectedResults = new HashMap<>(); + SequenceNumber defaultInitialPositionSeqNumber = getSequenceNumber(InitialPosition.valueOf(ConsumerConfigConstants.DEFAULT_STREAM_INITIAL_POSITION)); + expectedResults.put(streamShardA0, defaultInitialPositionSeqNumber); + expectedResults.put(streamShardA1, defaultInitialPositionSeqNumber); + expectedResults.put(streamShardB0, defaultInitialPositionSeqNumber); + expectedResults.put(streamShardB1, defaultInitialPositionSeqNumber); + expectedResults.put(streamShardC0, defaultInitialPositionSeqNumber); + + runAndValidate(config, existingState, shardsToSubscribe, expectedResults); + } + + /** + * Tests FLINK-35299 with the {@link ConsumerConfigConstants#STREAMS_TO_APPLY_STREAM_INITIAL_POSITION_TO} + * list contains some values. + * - IF there is no state at all, all new streams/shards should start from INITIAL POSITION. + */ + @Test + @SuppressWarnings("unchecked") + public void testFLINK35299StreamsToApplyStreamInitialPositionToWhenThereIsNoState() throws Exception { + Properties config = TestUtils.getStandardProperties(); + config.setProperty(STREAMS_TO_APPLY_STREAM_INITIAL_POSITION_TO, "stream-A"); + + List> existingState = null; + + List shardsToSubscribe = new ArrayList<>(); + StreamShardHandle streamShardA0 = getStreamShard("stream-A", 0); + StreamShardHandle streamShardA1 = getStreamShard("stream-A", 1); + StreamShardHandle streamShardB0 = getStreamShard("stream-B", 0); + StreamShardHandle streamShardB1 = getStreamShard("stream-B", 1); + StreamShardHandle streamShardC0 = getStreamShard("stream-C", 0); + shardsToSubscribe.add(streamShardA0); + shardsToSubscribe.add(streamShardA1); + shardsToSubscribe.add(streamShardB0); + shardsToSubscribe.add(streamShardB1); // new shard for existing stream + shardsToSubscribe.add(streamShardC0); // new stream + + Map expectedResults = new HashMap<>(); + SequenceNumber defaultInitialPositionSeqNumber = getSequenceNumber(InitialPosition.valueOf(ConsumerConfigConstants.DEFAULT_STREAM_INITIAL_POSITION)); + expectedResults.put(streamShardA0, defaultInitialPositionSeqNumber); + expectedResults.put(streamShardA1, defaultInitialPositionSeqNumber); + expectedResults.put(streamShardB0, defaultInitialPositionSeqNumber); + expectedResults.put(streamShardB1, defaultInitialPositionSeqNumber); + expectedResults.put(streamShardC0, defaultInitialPositionSeqNumber); + + runAndValidate(config, existingState, shardsToSubscribe, expectedResults); + } + /** * Tests the default values of the properties introduced in FLINK-35299: * - IF there is some state already * - new streams should start from EARLIEST * - new shards for existing streams start from EARLIEST * - existing shards should continue from the state value - * - IF there is no state at all, new streams should start from INITIAL POSITION. */ @Test @SuppressWarnings("unchecked") - public void testDefaultsWhenThereIsState() throws Exception { + public void testFLINK35299DefaultsWhenThereIsState() throws Exception { + Properties config = TestUtils.getStandardProperties(); + + List> existingState = new ArrayList<>(); + existingState.add(createShardState("stream-A", 0, "A0")); + existingState.add(createShardState("stream-A", 1, "A1")); + existingState.add(createShardState("stream-B", 0, "B0")); + + List shardsToSubscribe = new ArrayList<>(); + StreamShardHandle streamShardA0 = getStreamShard("stream-A", 0); + StreamShardHandle streamShardA1 = getStreamShard("stream-A", 1); + StreamShardHandle streamShardB0 = getStreamShard("stream-B", 0); + StreamShardHandle streamShardB1 = getStreamShard("stream-B", 1); + StreamShardHandle streamShardC0 = getStreamShard("stream-C", 0); + shardsToSubscribe.add(streamShardA0); + shardsToSubscribe.add(streamShardA1); + shardsToSubscribe.add(streamShardB0); + shardsToSubscribe.add(streamShardB1); // new shard for existing stream + shardsToSubscribe.add(streamShardC0); // new stream + + Map expectedResults = new HashMap<>(); + expectedResults.put(streamShardA0, getSequenceNumber("A0")); + expectedResults.put(streamShardA1, getSequenceNumber("A1")); + expectedResults.put(streamShardB0, getSequenceNumber("B0")); + expectedResults.put(streamShardB1, getSequenceNumber(InitialPosition.TRIM_HORIZON)); + expectedResults.put(streamShardC0, getSequenceNumber(InitialPosition.TRIM_HORIZON)); + + runAndValidate(config, existingState, shardsToSubscribe, expectedResults); + } + + /** + * Tests FLINK-35299 when the {@link ConsumerConfigConstants#APPLY_STREAM_INITIAL_POSITION_FOR_NEW_STREAMS} + * flag is set to true. In this case any NEW streams should start from the initial position configured and + * everything else should stay as it was. + */ + @Test + @SuppressWarnings("unchecked") + public void testFLINK35299ApplyStreamInitialPositionForNewStreamsSetToTrue() throws Exception { Properties config = TestUtils.getStandardProperties(); + config.setProperty(APPLY_STREAM_INITIAL_POSITION_FOR_NEW_STREAMS, "true"); List> existingState = new ArrayList<>(); existingState.add(createShardState("stream-A", 0, "A0")); @@ -1378,11 +1522,87 @@ public void testDefaultsWhenThereIsState() throws Exception { shardsToSubscribe.add(streamShardC0); // new stream Map expectedResults = new HashMap<>(); - expectedResults.put(streamShardA0, new SequenceNumber("A0")); - expectedResults.put(streamShardA1, new SequenceNumber("A1")); - expectedResults.put(streamShardB0, new SequenceNumber("B0")); - expectedResults.put(streamShardB1, new SequenceNumber("EARLIEST_SEQUENCE_NUM")); - expectedResults.put(streamShardC0, new SequenceNumber("EARLIEST_SEQUENCE_NUM")); + expectedResults.put(streamShardA0, getSequenceNumber("A0")); + expectedResults.put(streamShardA1, getSequenceNumber("A1")); + expectedResults.put(streamShardB0, getSequenceNumber("B0")); + expectedResults.put(streamShardB1, getSequenceNumber(InitialPosition.TRIM_HORIZON)); + expectedResults.put(streamShardC0, getSequenceNumber(InitialPosition.valueOf(ConsumerConfigConstants.DEFAULT_STREAM_INITIAL_POSITION))); + + runAndValidate(config, existingState, shardsToSubscribe, expectedResults); + } + + /** + * Tests FLINK-35299 when the {@link ConsumerConfigConstants#STREAMS_TO_APPLY_STREAM_INITIAL_POSITION_TO} + * flag is set to a non-null list. In this case the stream used in that list should use the initial position + * from the config instead of using the state value. Everything else should behave as before. + */ + @Test + @SuppressWarnings("unchecked") + public void testFLINK35299StreamsToApplyStreamInitialPositionTo() throws Exception { + Properties config = TestUtils.getStandardProperties(); + config.setProperty(STREAMS_TO_APPLY_STREAM_INITIAL_POSITION_TO, "stream-A"); + + List> existingState = new ArrayList<>(); + existingState.add(createShardState("stream-A", 0, "A0")); + existingState.add(createShardState("stream-A", 1, "A1")); + existingState.add(createShardState("stream-B", 0, "B0")); + + List shardsToSubscribe = new ArrayList<>(); + StreamShardHandle streamShardA0 = getStreamShard("stream-A", 0); + StreamShardHandle streamShardA1 = getStreamShard("stream-A", 1); + StreamShardHandle streamShardB0 = getStreamShard("stream-B", 0); + StreamShardHandle streamShardB1 = getStreamShard("stream-B", 1); + StreamShardHandle streamShardC0 = getStreamShard("stream-C", 0); + shardsToSubscribe.add(streamShardA0); + shardsToSubscribe.add(streamShardA1); + shardsToSubscribe.add(streamShardB0); + shardsToSubscribe.add(streamShardB1); // new shard for existing stream + shardsToSubscribe.add(streamShardC0); // new stream + + Map expectedResults = new HashMap<>(); + expectedResults.put(streamShardA0, getSequenceNumber(InitialPosition.valueOf(ConsumerConfigConstants.DEFAULT_STREAM_INITIAL_POSITION))); + expectedResults.put(streamShardA1, getSequenceNumber(InitialPosition.valueOf(ConsumerConfigConstants.DEFAULT_STREAM_INITIAL_POSITION))); + expectedResults.put(streamShardB0, getSequenceNumber("B0")); + expectedResults.put(streamShardB1, getSequenceNumber(InitialPosition.TRIM_HORIZON)); + expectedResults.put(streamShardC0, getSequenceNumber(InitialPosition.TRIM_HORIZON)); + + runAndValidate(config, existingState, shardsToSubscribe, expectedResults); + } + + /** + * Tests FLINK-35299 when the {@link ConsumerConfigConstants#APPLY_STREAM_INITIAL_POSITION_FOR_NEW_STREAMS} flag + * is set to true and the {@link ConsumerConfigConstants#STREAMS_TO_APPLY_STREAM_INITIAL_POSITION_TO} + * list is set to a non-null value. + */ + @SuppressWarnings("unchecked") + public void testFLINK35299BothNewPropertiesBeingUsed() throws Exception { + Properties config = TestUtils.getStandardProperties(); + config.setProperty(APPLY_STREAM_INITIAL_POSITION_FOR_NEW_STREAMS, "true"); + config.setProperty(STREAMS_TO_APPLY_STREAM_INITIAL_POSITION_TO, "stream-B"); + + List> existingState = new ArrayList<>(); + existingState.add(createShardState("stream-A", 0, "A0")); + existingState.add(createShardState("stream-A", 1, "A1")); + existingState.add(createShardState("stream-B", 0, "B0")); + + List shardsToSubscribe = new ArrayList<>(); + StreamShardHandle streamShardA0 = getStreamShard("stream-A", 0); + StreamShardHandle streamShardA1 = getStreamShard("stream-A", 1); + StreamShardHandle streamShardB0 = getStreamShard("stream-B", 0); + StreamShardHandle streamShardB1 = getStreamShard("stream-B", 1); + StreamShardHandle streamShardC0 = getStreamShard("stream-C", 0); + shardsToSubscribe.add(streamShardA0); + shardsToSubscribe.add(streamShardA1); + shardsToSubscribe.add(streamShardB0); + shardsToSubscribe.add(streamShardB1); // new shard for existing stream + shardsToSubscribe.add(streamShardC0); // new stream + + Map expectedResults = new HashMap<>(); + expectedResults.put(streamShardA0, getSequenceNumber("A0")); + expectedResults.put(streamShardA1, getSequenceNumber("A1")); + expectedResults.put(streamShardB0, getSequenceNumber(InitialPosition.valueOf(ConsumerConfigConstants.DEFAULT_STREAM_INITIAL_POSITION))); + expectedResults.put(streamShardB1, getSequenceNumber(InitialPosition.TRIM_HORIZON)); + expectedResults.put(streamShardC0, getSequenceNumber(InitialPosition.valueOf(ConsumerConfigConstants.DEFAULT_STREAM_INITIAL_POSITION))); runAndValidate(config, existingState, shardsToSubscribe, expectedResults); } @@ -1394,7 +1614,9 @@ private void runAndValidate(Properties config, KinesisDataFetcher mockedFetcher = mockKinesisDataFetcher(); TestingListState> listState = new TestingListState<>(); - listState.addAll(existingState); + if (existingState != null) { + listState.addAll(existingState); + } when(mockedFetcher.discoverNewShardsToSubscribe()).thenReturn(shardsToSubscribe); List streamsToConsume = shardsToSubscribe.stream().map(StreamShardHandle::getStreamName).distinct().collect(Collectors.toList()); @@ -1430,6 +1652,14 @@ private void runAndValidate(Properties config, } } + private static SequenceNumber getSequenceNumber(String seqNumber) { + return new SequenceNumber(seqNumber); + } + + private static SequenceNumber getSequenceNumber(InitialPosition initialPosition) { + return initialPosition.toSentinelSequenceNumber().get(); + } + private static void verifyRegisterNewSubscribedShard(KinesisDataFetcher mockedFetcher, StreamShardHandle streamShardHandle, SequenceNumber sequenceNumber) { Mockito.verify(mockedFetcher).registerNewSubscribedShardState( new KinesisStreamShardState( From 190941c03a4523fe8bd99626544e42ea8da42655 Mon Sep 17 00:00:00 2001 From: antsilva Date: Mon, 8 Jul 2024 11:45:57 +0100 Subject: [PATCH 12/14] Add new test, improve docs --- .../docs/connectors/datastream/kinesis.md | 13 +++++-- .../kinesis/FlinkKinesisConsumerTest.java | 38 +++++++++++++++++++ 2 files changed, 48 insertions(+), 3 deletions(-) diff --git a/docs/content/docs/connectors/datastream/kinesis.md b/docs/content/docs/connectors/datastream/kinesis.md index 90c63b24a..68762d555 100644 --- a/docs/content/docs/connectors/datastream/kinesis.md +++ b/docs/content/docs/connectors/datastream/kinesis.md @@ -221,8 +221,13 @@ properties by providing a value for `ConsumerConfigConstants.STREAM_INITIAL_TIME By default, the Flink Kinesis Consumer handles new streams the same way it handles a new shard for an existing stream, and it starts consuming from the earliest record (same behaviour as TRIM_HORIZON). -This is usually not what you want for new streams, where it makes more sense to respect what is defined in `ConsumerConfigConstants.STREAM_INITIAL_POSITION`. -This behaviour can now be enabled by setting the `ConsumerConfigConstants.APPLY_STREAM_INITIAL_POSITION_FOR_NEW_STREAMS` flag to true. +This behaviour is fine if you're consuming from a stream that you don't want to lose any data from, but if you're consuming from a stream with a large retention and where it is fine to start consuming from "now", +or more generally started from that is defined in `ConsumerConfigConstants.STREAM_INITIAL_POSITION`, this was not possible before. + +This behaviour can now be enabled by setting the `ConsumerConfigConstants.APPLY_STREAM_INITIAL_POSITION_FOR_NEW_STREAMS` flag to true, which will make ALL new streams "reset" to consume from the initial position +instead of starting from the beginning. + +If you just want to force a particular new stream to start consuming from the defined `ConsumerConfigConstants.STREAM_INITIAL_POSITION`, you can use the `ConsumerConfigConstants.STREAMS_TO_APPLY_STREAM_INITIAL_POSITION_TO` property (described below) instead. ### Resetting specific streams to the starting position @@ -240,7 +245,9 @@ consumerConfig.put(ConsumerConfigConstants.STREAMS_TO_APPLY_STREAM_INITIAL_POSIT ``` then `streamA` and `streamB` would start consuming from LATEST, even if they are already being tracked by the application. -Note that you would need to reset this property afterwards, otherwise the Flink Kinesis Consumer will always be resetting those streams to LATEST. +{{< hint warning >}} +Note that you need to remove this property after the value is reset and a savepoint is taken, otherwise the Flink Kinesis Consumer will always be resetting those streams to the configured initial position. +{{< /hint >}} ### Fault Tolerance for Exactly-Once User-Defined State Update Semantics diff --git a/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java b/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java index d5c3e694b..294c0303a 100644 --- a/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java +++ b/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java @@ -1569,6 +1569,44 @@ public void testFLINK35299StreamsToApplyStreamInitialPositionTo() throws Excepti runAndValidate(config, existingState, shardsToSubscribe, expectedResults); } + /** + * Tests FLINK-35299 when the {@link ConsumerConfigConstants#STREAMS_TO_APPLY_STREAM_INITIAL_POSITION_TO} + * flag contains streams that are not tracked yet. + * This is an edge case of {@link #testFLINK35299StreamsToApplyStreamInitialPositionTo()}. + */ + @Test + @SuppressWarnings("unchecked") + public void testFLINK35299StreamsToApplyStreamInitialPositionToForANewStream() throws Exception { + Properties config = TestUtils.getStandardProperties(); + config.setProperty(STREAMS_TO_APPLY_STREAM_INITIAL_POSITION_TO, "stream-C"); + + List> existingState = new ArrayList<>(); + existingState.add(createShardState("stream-A", 0, "A0")); + existingState.add(createShardState("stream-A", 1, "A1")); + existingState.add(createShardState("stream-B", 0, "B0")); + + List shardsToSubscribe = new ArrayList<>(); + StreamShardHandle streamShardA0 = getStreamShard("stream-A", 0); + StreamShardHandle streamShardA1 = getStreamShard("stream-A", 1); + StreamShardHandle streamShardB0 = getStreamShard("stream-B", 0); + StreamShardHandle streamShardB1 = getStreamShard("stream-B", 1); + StreamShardHandle streamShardC0 = getStreamShard("stream-C", 0); + shardsToSubscribe.add(streamShardA0); + shardsToSubscribe.add(streamShardA1); + shardsToSubscribe.add(streamShardB0); + shardsToSubscribe.add(streamShardB1); // new shard for existing stream + shardsToSubscribe.add(streamShardC0); // new stream + + Map expectedResults = new HashMap<>(); + expectedResults.put(streamShardA0, getSequenceNumber("A0")); + expectedResults.put(streamShardA1, getSequenceNumber("A1")); + expectedResults.put(streamShardB0, getSequenceNumber("B0")); + expectedResults.put(streamShardB1, getSequenceNumber(InitialPosition.TRIM_HORIZON)); + expectedResults.put(streamShardC0, getSequenceNumber(InitialPosition.valueOf(ConsumerConfigConstants.DEFAULT_STREAM_INITIAL_POSITION))); + + runAndValidate(config, existingState, shardsToSubscribe, expectedResults); + } + /** * Tests FLINK-35299 when the {@link ConsumerConfigConstants#APPLY_STREAM_INITIAL_POSITION_FOR_NEW_STREAMS} flag * is set to true and the {@link ConsumerConfigConstants#STREAMS_TO_APPLY_STREAM_INITIAL_POSITION_TO} From 194744e4647949eee57a84702575a5c972e29341 Mon Sep 17 00:00:00 2001 From: antsilva Date: Tue, 9 Jul 2024 10:59:43 +0100 Subject: [PATCH 13/14] Apply code style --- .../kinesis/FlinkKinesisConsumer.java | 52 +++-- .../config/ConsumerConfigConstants.java | 19 +- .../kinesis/FlinkKinesisConsumerTest.java | 185 ++++++++++++------ 3 files changed, 156 insertions(+), 100 deletions(-) diff --git a/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java b/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java index 808ad78ae..c0cb93b90 100644 --- a/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java +++ b/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java @@ -51,6 +51,7 @@ import org.apache.flink.streaming.connectors.kinesis.util.StreamConsumerRegistrarUtil; import org.apache.flink.streaming.connectors.kinesis.util.WatermarkTracker; import org.apache.flink.util.InstantiationUtil; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -120,9 +121,7 @@ public class FlinkKinesisConsumer extends RichParallelSourceFunction // Consumer properties // ------------------------------------------------------------------------ - /** - * The names of the Kinesis streams that we will be consuming from. - */ + /** The names of the Kinesis streams that we will be consuming from. */ private final List streams; /** @@ -131,14 +130,10 @@ public class FlinkKinesisConsumer extends RichParallelSourceFunction */ private final Properties configProps; - /** - * User supplied deserialization schema to convert Kinesis byte messages to Flink objects. - */ + /** User supplied deserialization schema to convert Kinesis byte messages to Flink objects. */ private final KinesisDeserializationSchema deserializer; - /** - * The function that determines which subtask a shard should be assigned to. - */ + /** The function that determines which subtask a shard should be assigned to. */ private KinesisShardAssigner shardAssigner = new DefaultShardAssignerFactory().getShardAssigner(); @@ -155,9 +150,7 @@ public class FlinkKinesisConsumer extends RichParallelSourceFunction */ private transient KinesisDataFetcher fetcher; - /** - * The sequence numbers to restore to upon restore from failure. - */ + /** The sequence numbers to restore to upon restore from failure. */ private transient HashMap sequenceNumsToRestore; @@ -183,9 +176,7 @@ public class FlinkKinesisConsumer extends RichParallelSourceFunction // State for Checkpoint // ------------------------------------------------------------------------ - /** - * State name to access shard sequence number states; cannot be changed. - */ + /** State name to access shard sequence number states; cannot be changed. */ private static final String sequenceNumsStateStoreName = "Kinesis-Stream-Shard-State"; private transient ListState> @@ -201,11 +192,11 @@ public class FlinkKinesisConsumer extends RichParallelSourceFunction *

The AWS credentials to be used, AWS region of the Kinesis streams, initial position to * start streaming from are configured with a {@link Properties} instance. * - * @param stream The single AWS Kinesis stream to read from. + * @param stream The single AWS Kinesis stream to read from. * @param deserializer The deserializer used to convert raw bytes of Kinesis records to Java - * objects (without key). - * @param configProps The properties used to configure AWS credentials, AWS region, and initial - * starting position. + * objects (without key). + * @param configProps The properties used to configure AWS credentials, AWS region, and initial + * starting position. */ public FlinkKinesisConsumer( String stream, DeserializationSchema deserializer, Properties configProps) { @@ -218,11 +209,11 @@ public FlinkKinesisConsumer( *

The AWS credentials to be used, AWS region of the Kinesis streams, initial position to * start streaming from are configured with a {@link Properties} instance. * - * @param stream The single AWS Kinesis stream to read from. + * @param stream The single AWS Kinesis stream to read from. * @param deserializer The keyed deserializer used to convert raw bytes of Kinesis records to - * Java objects. - * @param configProps The properties used to configure AWS credentials, AWS region, and initial - * starting position. + * Java objects. + * @param configProps The properties used to configure AWS credentials, AWS region, and initial + * starting position. */ public FlinkKinesisConsumer( String stream, KinesisDeserializationSchema deserializer, Properties configProps) { @@ -235,11 +226,11 @@ public FlinkKinesisConsumer( *

The AWS credentials to be used, AWS region of the Kinesis streams, initial position to * start streaming from are configured with a {@link Properties} instance. * - * @param streams The AWS Kinesis streams to read from. + * @param streams The AWS Kinesis streams to read from. * @param deserializer The keyed deserializer used to convert raw bytes of Kinesis records to - * Java objects. - * @param configProps The properties used to configure AWS credentials, AWS region, and initial - * starting position. + * Java objects. + * @param configProps The properties used to configure AWS credentials, AWS region, and initial + * starting position. */ public FlinkKinesisConsumer( List streams, @@ -355,7 +346,9 @@ public void run(SourceContext sourceContext) throws Exception { KinesisDataFetcher.convertToStreamShardMetadata(shard)); String stream = shard.getStreamName(); - if (sequenceNumsToRestore == null || sequenceNumsToRestore.isEmpty() || streamsToForceInitialPositionIn.contains(stream)) { + if (sequenceNumsToRestore == null + || sequenceNumsToRestore.isEmpty() + || streamsToForceInitialPositionIn.contains(stream)) { // we're starting fresh (either for the whole consumer or for this stream); // use the configured start position as initial state registerFromInitialPosition(fetcher, shard, kinesisStreamShard); @@ -408,7 +401,8 @@ public void run(SourceContext sourceContext) throws Exception { private Set getStreamsToForceInitialPositionIn() { String streamsToForceInitialPositionInStr = - configProps.getProperty(ConsumerConfigConstants.STREAMS_TO_APPLY_STREAM_INITIAL_POSITION_TO); + configProps.getProperty( + ConsumerConfigConstants.STREAMS_TO_APPLY_STREAM_INITIAL_POSITION_TO); if (streamsToForceInitialPositionInStr == null) { return Collections.emptySet(); diff --git a/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java b/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java index efbd91281..e6d0fb84c 100644 --- a/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java +++ b/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java @@ -323,10 +323,10 @@ public enum EFORegistrationType { /** * Flag to configure whether {@link #STREAM_INITIAL_POSITION} should be considered for new * streams, when the app is already consuming from other streams. If set to true, then any - * stream that doesn't have any shard tracked by state yet will use the initial position. - * If false (default), it is assumed that we should consume from the beginning, which is - * appropriate when you want to ensure no data is lost if the stream is already being used - * by the data producers. + * stream that doesn't have any shard tracked by state yet will use the initial position. If + * false (default), it is assumed that we should consume from the beginning, which is + * appropriate when you want to ensure no data is lost if the stream is already being used by + * the data producers. */ public static final String APPLY_STREAM_INITIAL_POSITION_FOR_NEW_STREAMS = "flink.stream.initpos-for-new-streams"; @@ -334,9 +334,9 @@ public enum EFORegistrationType { /** * Property that can be used to ignore the restore state for a particular stream and instead use * the initial position. This is useful to reset a specific stream to consume from TRIM_HORIZON - * or LATEST if needed. Values must be passed in a comma separated list. If a stream is in this list, - * it will use initial position regardless of the value of the - * {@link #APPLY_STREAM_INITIAL_POSITION_FOR_NEW_STREAMS} property. + * or LATEST if needed. Values must be passed in a comma separated list. If a stream is in this + * list, it will use initial position regardless of the value of the {@link + * #APPLY_STREAM_INITIAL_POSITION_FOR_NEW_STREAMS} property. */ public static final String STREAMS_TO_APPLY_STREAM_INITIAL_POSITION_TO = "flink.stream.initpos-streams"; @@ -347,7 +347,10 @@ public enum EFORegistrationType { public static final String DEFAULT_STREAM_INITIAL_POSITION = InitialPosition.LATEST.toString(); - /** False for now so that we preserve old behaviour. TODO switch to true in the next major? If so update the javadoc. */ + /** + * False for now so that we preserve old behaviour. TODO switch to true in the next major? If so + * update the javadoc. + */ public static final boolean DEFAULT_APPLY_STREAM_INITIAL_POSITION_FOR_NEW_STREAMS = false; public static final String DEFAULT_STREAM_TIMESTAMP_DATE_FORMAT = diff --git a/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java b/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java index 294c0303a..e86587c5d 100644 --- a/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java +++ b/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java @@ -1338,19 +1338,19 @@ static void assertGlobalWatermark(long expected) { } /* =========================================================================== - Tests for FLINK-35299 - The setup for these tests will always be the same: - - stream A with state for shards 0 and 1 - - stream B with state for shard 0 + Tests for FLINK-35299 + The setup for these tests will always be the same: + - stream A with state for shards 0 and 1 + - stream B with state for shard 0 - Then new shards will be discovered: - - new shard (1) for stream B - - new shard (0) for stream C - since stream C is not in state yet, it qualifies as a "new stream". - ==============================================================================*/ + Then new shards will be discovered: + - new shard (1) for stream B + - new shard (0) for stream C - since stream C is not in state yet, it qualifies as a "new stream". + ==============================================================================*/ /** - * Tests FLINK-35299 with the default config values: - * - IF there is no state at all, all new streams/shards should start from INITIAL POSITION. + * Tests FLINK-35299 with the default config values: - IF there is no state at all, all new + * streams/shards should start from INITIAL POSITION. */ @Test @SuppressWarnings("unchecked") @@ -1372,7 +1372,10 @@ public void testFLINK35299DefaultsWhenThereIsNoState() throws Exception { shardsToSubscribe.add(streamShardC0); // new stream Map expectedResults = new HashMap<>(); - SequenceNumber defaultInitialPositionSeqNumber = getSequenceNumber(InitialPosition.valueOf(ConsumerConfigConstants.DEFAULT_STREAM_INITIAL_POSITION)); + SequenceNumber defaultInitialPositionSeqNumber = + getSequenceNumber( + InitialPosition.valueOf( + ConsumerConfigConstants.DEFAULT_STREAM_INITIAL_POSITION)); expectedResults.put(streamShardA0, defaultInitialPositionSeqNumber); expectedResults.put(streamShardA1, defaultInitialPositionSeqNumber); expectedResults.put(streamShardB0, defaultInitialPositionSeqNumber); @@ -1383,13 +1386,14 @@ public void testFLINK35299DefaultsWhenThereIsNoState() throws Exception { } /** - * Tests FLINK-35299 with the {@link ConsumerConfigConstants#APPLY_STREAM_INITIAL_POSITION_FOR_NEW_STREAMS} - * flag is set to true. - * - IF there is no state at all, all new streams/shards should start from INITIAL POSITION. + * Tests FLINK-35299 with the {@link + * ConsumerConfigConstants#APPLY_STREAM_INITIAL_POSITION_FOR_NEW_STREAMS} flag is set to true. - + * IF there is no state at all, all new streams/shards should start from INITIAL POSITION. */ @Test @SuppressWarnings("unchecked") - public void testFLINK35299ApplyStreamInitialPositionForNewStreamsWhenThereIsNoState() throws Exception { + public void testFLINK35299ApplyStreamInitialPositionForNewStreamsWhenThereIsNoState() + throws Exception { Properties config = TestUtils.getStandardProperties(); config.setProperty(APPLY_STREAM_INITIAL_POSITION_FOR_NEW_STREAMS, "true"); @@ -1408,7 +1412,10 @@ public void testFLINK35299ApplyStreamInitialPositionForNewStreamsWhenThereIsNoSt shardsToSubscribe.add(streamShardC0); // new stream Map expectedResults = new HashMap<>(); - SequenceNumber defaultInitialPositionSeqNumber = getSequenceNumber(InitialPosition.valueOf(ConsumerConfigConstants.DEFAULT_STREAM_INITIAL_POSITION)); + SequenceNumber defaultInitialPositionSeqNumber = + getSequenceNumber( + InitialPosition.valueOf( + ConsumerConfigConstants.DEFAULT_STREAM_INITIAL_POSITION)); expectedResults.put(streamShardA0, defaultInitialPositionSeqNumber); expectedResults.put(streamShardA1, defaultInitialPositionSeqNumber); expectedResults.put(streamShardB0, defaultInitialPositionSeqNumber); @@ -1419,13 +1426,15 @@ public void testFLINK35299ApplyStreamInitialPositionForNewStreamsWhenThereIsNoSt } /** - * Tests FLINK-35299 with the {@link ConsumerConfigConstants#STREAMS_TO_APPLY_STREAM_INITIAL_POSITION_TO} - * list contains some values. - * - IF there is no state at all, all new streams/shards should start from INITIAL POSITION. + * Tests FLINK-35299 with the {@link + * ConsumerConfigConstants#STREAMS_TO_APPLY_STREAM_INITIAL_POSITION_TO} list contains some + * values. - IF there is no state at all, all new streams/shards should start from INITIAL + * POSITION. */ @Test @SuppressWarnings("unchecked") - public void testFLINK35299StreamsToApplyStreamInitialPositionToWhenThereIsNoState() throws Exception { + public void testFLINK35299StreamsToApplyStreamInitialPositionToWhenThereIsNoState() + throws Exception { Properties config = TestUtils.getStandardProperties(); config.setProperty(STREAMS_TO_APPLY_STREAM_INITIAL_POSITION_TO, "stream-A"); @@ -1444,7 +1453,10 @@ public void testFLINK35299StreamsToApplyStreamInitialPositionToWhenThereIsNoStat shardsToSubscribe.add(streamShardC0); // new stream Map expectedResults = new HashMap<>(); - SequenceNumber defaultInitialPositionSeqNumber = getSequenceNumber(InitialPosition.valueOf(ConsumerConfigConstants.DEFAULT_STREAM_INITIAL_POSITION)); + SequenceNumber defaultInitialPositionSeqNumber = + getSequenceNumber( + InitialPosition.valueOf( + ConsumerConfigConstants.DEFAULT_STREAM_INITIAL_POSITION)); expectedResults.put(streamShardA0, defaultInitialPositionSeqNumber); expectedResults.put(streamShardA1, defaultInitialPositionSeqNumber); expectedResults.put(streamShardB0, defaultInitialPositionSeqNumber); @@ -1455,11 +1467,9 @@ public void testFLINK35299StreamsToApplyStreamInitialPositionToWhenThereIsNoStat } /** - * Tests the default values of the properties introduced in FLINK-35299: - * - IF there is some state already - * - new streams should start from EARLIEST - * - new shards for existing streams start from EARLIEST - * - existing shards should continue from the state value + * Tests the default values of the properties introduced in FLINK-35299: - IF there is some + * state already - new streams should start from EARLIEST - new shards for existing streams + * start from EARLIEST - existing shards should continue from the state value */ @Test @SuppressWarnings("unchecked") @@ -1494,9 +1504,10 @@ public void testFLINK35299DefaultsWhenThereIsState() throws Exception { } /** - * Tests FLINK-35299 when the {@link ConsumerConfigConstants#APPLY_STREAM_INITIAL_POSITION_FOR_NEW_STREAMS} - * flag is set to true. In this case any NEW streams should start from the initial position configured and - * everything else should stay as it was. + * Tests FLINK-35299 when the {@link + * ConsumerConfigConstants#APPLY_STREAM_INITIAL_POSITION_FOR_NEW_STREAMS} flag is set to true. + * In this case any NEW streams should start from the initial position configured and everything + * else should stay as it was. */ @Test @SuppressWarnings("unchecked") @@ -1526,15 +1537,20 @@ public void testFLINK35299ApplyStreamInitialPositionForNewStreamsSetToTrue() thr expectedResults.put(streamShardA1, getSequenceNumber("A1")); expectedResults.put(streamShardB0, getSequenceNumber("B0")); expectedResults.put(streamShardB1, getSequenceNumber(InitialPosition.TRIM_HORIZON)); - expectedResults.put(streamShardC0, getSequenceNumber(InitialPosition.valueOf(ConsumerConfigConstants.DEFAULT_STREAM_INITIAL_POSITION))); + expectedResults.put( + streamShardC0, + getSequenceNumber( + InitialPosition.valueOf( + ConsumerConfigConstants.DEFAULT_STREAM_INITIAL_POSITION))); runAndValidate(config, existingState, shardsToSubscribe, expectedResults); } /** - * Tests FLINK-35299 when the {@link ConsumerConfigConstants#STREAMS_TO_APPLY_STREAM_INITIAL_POSITION_TO} - * flag is set to a non-null list. In this case the stream used in that list should use the initial position - * from the config instead of using the state value. Everything else should behave as before. + * Tests FLINK-35299 when the {@link + * ConsumerConfigConstants#STREAMS_TO_APPLY_STREAM_INITIAL_POSITION_TO} flag is set to a + * non-null list. In this case the stream used in that list should use the initial position from + * the config instead of using the state value. Everything else should behave as before. */ @Test @SuppressWarnings("unchecked") @@ -1560,8 +1576,16 @@ public void testFLINK35299StreamsToApplyStreamInitialPositionTo() throws Excepti shardsToSubscribe.add(streamShardC0); // new stream Map expectedResults = new HashMap<>(); - expectedResults.put(streamShardA0, getSequenceNumber(InitialPosition.valueOf(ConsumerConfigConstants.DEFAULT_STREAM_INITIAL_POSITION))); - expectedResults.put(streamShardA1, getSequenceNumber(InitialPosition.valueOf(ConsumerConfigConstants.DEFAULT_STREAM_INITIAL_POSITION))); + expectedResults.put( + streamShardA0, + getSequenceNumber( + InitialPosition.valueOf( + ConsumerConfigConstants.DEFAULT_STREAM_INITIAL_POSITION))); + expectedResults.put( + streamShardA1, + getSequenceNumber( + InitialPosition.valueOf( + ConsumerConfigConstants.DEFAULT_STREAM_INITIAL_POSITION))); expectedResults.put(streamShardB0, getSequenceNumber("B0")); expectedResults.put(streamShardB1, getSequenceNumber(InitialPosition.TRIM_HORIZON)); expectedResults.put(streamShardC0, getSequenceNumber(InitialPosition.TRIM_HORIZON)); @@ -1570,13 +1594,15 @@ public void testFLINK35299StreamsToApplyStreamInitialPositionTo() throws Excepti } /** - * Tests FLINK-35299 when the {@link ConsumerConfigConstants#STREAMS_TO_APPLY_STREAM_INITIAL_POSITION_TO} - * flag contains streams that are not tracked yet. - * This is an edge case of {@link #testFLINK35299StreamsToApplyStreamInitialPositionTo()}. + * Tests FLINK-35299 when the {@link + * ConsumerConfigConstants#STREAMS_TO_APPLY_STREAM_INITIAL_POSITION_TO} flag contains streams + * that are not tracked yet. This is an edge case of {@link + * #testFLINK35299StreamsToApplyStreamInitialPositionTo()}. */ @Test @SuppressWarnings("unchecked") - public void testFLINK35299StreamsToApplyStreamInitialPositionToForANewStream() throws Exception { + public void testFLINK35299StreamsToApplyStreamInitialPositionToForANewStream() + throws Exception { Properties config = TestUtils.getStandardProperties(); config.setProperty(STREAMS_TO_APPLY_STREAM_INITIAL_POSITION_TO, "stream-C"); @@ -1602,15 +1628,20 @@ public void testFLINK35299StreamsToApplyStreamInitialPositionToForANewStream() t expectedResults.put(streamShardA1, getSequenceNumber("A1")); expectedResults.put(streamShardB0, getSequenceNumber("B0")); expectedResults.put(streamShardB1, getSequenceNumber(InitialPosition.TRIM_HORIZON)); - expectedResults.put(streamShardC0, getSequenceNumber(InitialPosition.valueOf(ConsumerConfigConstants.DEFAULT_STREAM_INITIAL_POSITION))); + expectedResults.put( + streamShardC0, + getSequenceNumber( + InitialPosition.valueOf( + ConsumerConfigConstants.DEFAULT_STREAM_INITIAL_POSITION))); runAndValidate(config, existingState, shardsToSubscribe, expectedResults); } /** - * Tests FLINK-35299 when the {@link ConsumerConfigConstants#APPLY_STREAM_INITIAL_POSITION_FOR_NEW_STREAMS} flag - * is set to true and the {@link ConsumerConfigConstants#STREAMS_TO_APPLY_STREAM_INITIAL_POSITION_TO} - * list is set to a non-null value. + * Tests FLINK-35299 when the {@link + * ConsumerConfigConstants#APPLY_STREAM_INITIAL_POSITION_FOR_NEW_STREAMS} flag is set to true + * and the {@link ConsumerConfigConstants#STREAMS_TO_APPLY_STREAM_INITIAL_POSITION_TO} list is + * set to a non-null value. */ @SuppressWarnings("unchecked") public void testFLINK35299BothNewPropertiesBeingUsed() throws Exception { @@ -1638,32 +1669,52 @@ public void testFLINK35299BothNewPropertiesBeingUsed() throws Exception { Map expectedResults = new HashMap<>(); expectedResults.put(streamShardA0, getSequenceNumber("A0")); expectedResults.put(streamShardA1, getSequenceNumber("A1")); - expectedResults.put(streamShardB0, getSequenceNumber(InitialPosition.valueOf(ConsumerConfigConstants.DEFAULT_STREAM_INITIAL_POSITION))); + expectedResults.put( + streamShardB0, + getSequenceNumber( + InitialPosition.valueOf( + ConsumerConfigConstants.DEFAULT_STREAM_INITIAL_POSITION))); expectedResults.put(streamShardB1, getSequenceNumber(InitialPosition.TRIM_HORIZON)); - expectedResults.put(streamShardC0, getSequenceNumber(InitialPosition.valueOf(ConsumerConfigConstants.DEFAULT_STREAM_INITIAL_POSITION))); + expectedResults.put( + streamShardC0, + getSequenceNumber( + InitialPosition.valueOf( + ConsumerConfigConstants.DEFAULT_STREAM_INITIAL_POSITION))); runAndValidate(config, existingState, shardsToSubscribe, expectedResults); } - private void runAndValidate(Properties config, - List> existingState, - List shardsToSubscribe, - Map expectedResults) throws Exception { + private void runAndValidate( + Properties config, + List> existingState, + List shardsToSubscribe, + Map expectedResults) + throws Exception { KinesisDataFetcher mockedFetcher = mockKinesisDataFetcher(); - TestingListState> listState = new TestingListState<>(); + TestingListState> listState = + new TestingListState<>(); if (existingState != null) { listState.addAll(existingState); } when(mockedFetcher.discoverNewShardsToSubscribe()).thenReturn(shardsToSubscribe); - List streamsToConsume = shardsToSubscribe.stream().map(StreamShardHandle::getStreamName).distinct().collect(Collectors.toList()); - FlinkKinesisConsumer consumer = new FlinkKinesisConsumer<>(streamsToConsume, new KinesisDeserializationSchemaWrapper<>(new SimpleStringSchema()), config); + List streamsToConsume = + shardsToSubscribe.stream() + .map(StreamShardHandle::getStreamName) + .distinct() + .collect(Collectors.toList()); + FlinkKinesisConsumer consumer = + new FlinkKinesisConsumer<>( + streamsToConsume, + new KinesisDeserializationSchemaWrapper<>(new SimpleStringSchema()), + config); RuntimeContext context = new MockStreamingRuntimeContext(true, 2, 0); consumer.setRuntimeContext(context); OperatorStateStore operatorStateStore = mock(OperatorStateStore.class); - when(operatorStateStore.getUnionListState(any(ListStateDescriptor.class))).thenReturn(listState); + when(operatorStateStore.getUnionListState(any(ListStateDescriptor.class))) + .thenReturn(listState); StateInitializationContext initializationContext = mock(StateInitializationContext.class); when(initializationContext.getOperatorStateStore()).thenReturn(operatorStateStore); @@ -1675,7 +1726,10 @@ private void runAndValidate(Properties config, consumer.run(Mockito.mock(SourceFunction.SourceContext.class)); // check interactions with fetched - expectedResults.forEach((streamShardHandle, sequenceNumber) -> verifyRegisterNewSubscribedShard(mockedFetcher, streamShardHandle, sequenceNumber)); + expectedResults.forEach( + (streamShardHandle, sequenceNumber) -> + verifyRegisterNewSubscribedShard( + mockedFetcher, streamShardHandle, sequenceNumber)); // arbitrary checkpoint to validate new state consumer.snapshotState(new StateSnapshotContextSynchronousImpl(123, 123)); @@ -1685,7 +1739,8 @@ private void runAndValidate(Properties config, StreamShardMetadata streamShardMetadata = entry.f0; SequenceNumber sequenceNumber = entry.f1; - SequenceNumber expectedSequenceNumber = expectedResults.get(getStreamShard(streamShardMetadata)); + SequenceNumber expectedSequenceNumber = + expectedResults.get(getStreamShard(streamShardMetadata)); assertThat(sequenceNumber).isEqualTo(expectedSequenceNumber); } } @@ -1698,17 +1753,21 @@ private static SequenceNumber getSequenceNumber(InitialPosition initialPosition) return initialPosition.toSentinelSequenceNumber().get(); } - private static void verifyRegisterNewSubscribedShard(KinesisDataFetcher mockedFetcher, StreamShardHandle streamShardHandle, SequenceNumber sequenceNumber) { - Mockito.verify(mockedFetcher).registerNewSubscribedShardState( - new KinesisStreamShardState( - KinesisDataFetcher.convertToStreamShardMetadata(streamShardHandle), - streamShardHandle, - sequenceNumber) - ); + private static void verifyRegisterNewSubscribedShard( + KinesisDataFetcher mockedFetcher, + StreamShardHandle streamShardHandle, + SequenceNumber sequenceNumber) { + Mockito.verify(mockedFetcher) + .registerNewSubscribedShardState( + new KinesisStreamShardState( + KinesisDataFetcher.convertToStreamShardMetadata(streamShardHandle), + streamShardHandle, + sequenceNumber)); } private StreamShardHandle getStreamShard(StreamShardMetadata streamShardMetadata) { - return getStreamShard(streamShardMetadata.getStreamName(), streamShardMetadata.getShardId()); + return getStreamShard( + streamShardMetadata.getStreamName(), streamShardMetadata.getShardId()); } private static StreamShardHandle getStreamShard(String streamName, int shardId) { From 636f1f2c1df223247ea6ff4bc17e59ff94cdd36e Mon Sep 17 00:00:00 2001 From: antsilva Date: Tue, 9 Jul 2024 11:01:09 +0100 Subject: [PATCH 14/14] minor reword to make docs clearer --- docs/content/docs/connectors/datastream/kinesis.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/content/docs/connectors/datastream/kinesis.md b/docs/content/docs/connectors/datastream/kinesis.md index 68762d555..b5f021166 100644 --- a/docs/content/docs/connectors/datastream/kinesis.md +++ b/docs/content/docs/connectors/datastream/kinesis.md @@ -229,7 +229,7 @@ instead of starting from the beginning. If you just want to force a particular new stream to start consuming from the defined `ConsumerConfigConstants.STREAM_INITIAL_POSITION`, you can use the `ConsumerConfigConstants.STREAMS_TO_APPLY_STREAM_INITIAL_POSITION_TO` property (described below) instead. -### Resetting specific streams to the starting position +### Resetting specific streams to the configured initial position One of the features of the Flink Kinesis Consumer is that it keeps track of the offset that the application is at for each shard, so that if the application is restarted we can start consuming from that offset when restoring from snapshot.