[FLINK-35299] Respect initial position for new streams #140

Open
wants to merge 14 commits into main
32 changes: 32 additions & 0 deletions docs/content/docs/connectors/datastream/kinesis.md
@@ -217,6 +217,38 @@ properties by providing a value for `ConsumerConfigConstants.STREAM_INITIAL_TIME
If `ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT` is not defined, the default pattern is `yyyy-MM-dd'T'HH:mm:ss.SSSXXX`
(for example, a timestamp value of `2016-04-04` with a user-supplied pattern of `yyyy-MM-dd`, or a timestamp value of `2016-04-04T19:58:46.480-00:00` with no pattern given).

### Configuring starting position for new streams

By default, the Flink Kinesis Consumer handles a new stream the same way it handles a new shard of an existing stream: it starts consuming from the earliest record (the same behaviour as `TRIM_HORIZON`).

This behaviour is fine when you are consuming from a stream that you don't want to lose any data from. However, if you are consuming from a stream with a long retention period and it is acceptable to start consuming from "now", or more generally from whatever position is defined in `ConsumerConfigConstants.STREAM_INITIAL_POSITION`, that was previously not possible.

You can now enable this by setting the `ConsumerConfigConstants.APPLY_STREAM_INITIAL_POSITION_FOR_NEW_STREAMS` flag to true, which makes ALL new streams start ("reset") from the configured initial position
instead of from the beginning.
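
For example, assuming a `consumerConfig` properties object as used in the other examples on this page, a minimal sketch that makes any stream not yet tracked in state start from `LATEST` would be:
```
consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");
consumerConfig.put(ConsumerConfigConstants.APPLY_STREAM_INITIAL_POSITION_FOR_NEW_STREAMS, "true");
```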

If you just want to force a particular new stream to start consuming from the defined `ConsumerConfigConstants.STREAM_INITIAL_POSITION`, you can use the `ConsumerConfigConstants.STREAMS_TO_APPLY_STREAM_INITIAL_POSITION_TO` property (described below) instead.

### Resetting specific streams to the configured initial position

One of the features of the Flink Kinesis Consumer is that it keeps track of the offset each shard has been consumed up to, so that when the application is restarted from a snapshot it can resume
consuming from that offset.

This is the ideal behaviour most of the time, but what if you want to jump to `LATEST` or go back to `TRIM_HORIZON` for a stream that is already being tracked by the Flink Kinesis Consumer?

You can now do this via the `ConsumerConfigConstants.STREAMS_TO_APPLY_STREAM_INITIAL_POSITION_TO` property, which expects a comma-separated list of the names of the Kinesis streams to reset.

For example, if you configure your application with
```
consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");
consumerConfig.put(ConsumerConfigConstants.STREAMS_TO_APPLY_STREAM_INITIAL_POSITION_TO, "streamA, streamB");
```
then `streamA` and `streamB` would start consuming from LATEST, even if they are already being tracked by the application.

{{< hint warning >}}
Note that you need to remove this property once the streams have been reset and a savepoint has been taken; otherwise the Flink Kinesis Consumer will keep resetting those streams to the configured initial position on every restart.
{{< /hint >}}

Review comment:

I was wondering if we are able to set different streams with different INITIAL POSITION. Let's say we would add streamA, streamB and streamC as new streams; I want streamA and streamB to consume from LATEST and streamC from AT_TIMESTAMP. Is this possible?

Author:

This was not possible before and is still not possible now 😅

Even though it would be possible to do that with minimal changes, I feel like that is another feature request on its own and probably doesn't belong in this one.
The main goal of this change is to allow users to say when the INITIAL position should be used, regardless of the stored state.

### Fault Tolerance for Exactly-Once User-Defined State Update Semantics

With Flink's checkpointing enabled, the Flink Kinesis Consumer will consume records from shards in Kinesis streams and
FlinkKinesisConsumer.java
@@ -42,6 +42,7 @@
import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
import org.apache.flink.streaming.connectors.kinesis.model.StreamShardMetadata;
import org.apache.flink.streaming.connectors.kinesis.model.StreamShardMetadata.EquivalenceWrapper;
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchemaWrapper;
import org.apache.flink.streaming.connectors.kinesis.table.DefaultShardAssignerFactory;
@@ -54,11 +55,16 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;

import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -148,6 +154,12 @@ public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T>
private transient HashMap<StreamShardMetadata.EquivalenceWrapper, SequenceNumber>
sequenceNumsToRestore;

/**
* The streams present in the {@link #sequenceNumsToRestore} map, i.e. streams that were
* previously consumed by the application, so we know where to resume consuming from.
*/
private transient Set<String> knownStreams;

/**
* Flag used to control reading from Kinesis: source will read data while value is true. Changed
* to false after {@link #cancel()} has been called.
@@ -323,70 +335,49 @@ public void run(SourceContext<T> sourceContext) throws Exception {
// initial discovery
List<StreamShardHandle> allShards = fetcher.discoverNewShardsToSubscribe();

boolean applyStreamInitialPositionForNewStreams =
getApplyStreamInitialPositionForNewStreamsFlag();

Set<String> streamsToForceInitialPositionIn = getStreamsToForceInitialPositionIn();
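
// How each discovered shard is registered (see the branches below):
//   - no restored state, empty restored state, or the stream is listed in
//     STREAMS_TO_APPLY_STREAM_INITIAL_POSITION_TO -> use the configured initial position
//   - shard present in the restored state -> resume from the restored sequence number
//   - new shard of a stream we already know -> consume from the beginning (TRIM_HORIZON)
//   - shard of an entirely new stream -> use the initial position if
//     APPLY_STREAM_INITIAL_POSITION_FOR_NEW_STREAMS is true, otherwise consume from the beginning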

for (StreamShardHandle shard : allShards) {
StreamShardMetadata.EquivalenceWrapper kinesisStreamShard =
new StreamShardMetadata.EquivalenceWrapper(
KinesisDataFetcher.convertToStreamShardMetadata(shard));

if (sequenceNumsToRestore != null) {

String stream = shard.getStreamName();

if (sequenceNumsToRestore == null
|| sequenceNumsToRestore.isEmpty()
|| streamsToForceInitialPositionIn.contains(stream)) {
// we're starting fresh (either for the whole consumer or for this stream);
// use the configured start position as initial state
registerFromInitialPosition(fetcher, shard, kinesisStreamShard);
} else {
if (sequenceNumsToRestore.containsKey(kinesisStreamShard)) {
// if the shard was already seen and is contained in the state,
// just use the sequence number stored in the state
fetcher.registerNewSubscribedShardState(
new KinesisStreamShardState(
kinesisStreamShard.getShardMetadata(),
shard,
sequenceNumsToRestore.get(kinesisStreamShard)));

if (LOG.isInfoEnabled()) {
LOG.info(
"Subtask {} is seeding the fetcher with restored shard {},"
+ " starting state set to the restored sequence number {}",
getRuntimeContext().getIndexOfThisSubtask(),
shard.toString(),
sequenceNumsToRestore.get(kinesisStreamShard));
}
registerFromState(fetcher, shard, kinesisStreamShard);
} else {
// the shard wasn't discovered in the previous run, therefore should be consumed
// from the beginning
fetcher.registerNewSubscribedShardState(
new KinesisStreamShardState(
kinesisStreamShard.getShardMetadata(),
shard,
SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get()));

if (LOG.isInfoEnabled()) {
LOG.info(
"Subtask {} is seeding the fetcher with new discovered shard {},"
+ " starting state set to the SENTINEL_EARLIEST_SEQUENCE_NUM",
getRuntimeContext().getIndexOfThisSubtask(),
shard.toString());
// it's either a new shard for a stream that was already seen or a new stream
if (knownStreams.contains(stream)) {
// this stream was seen before, but this particular shard wasn't in the restored
// state (e.g. it is the result of a reshard), so consume it from the beginning
registerFromBeginning(fetcher, shard, kinesisStreamShard);
} else {
// it's a new stream
if (applyStreamInitialPositionForNewStreams) {
// the flag is true, so we respect the initial position for the new
// stream
registerFromInitialPosition(fetcher, shard, kinesisStreamShard);
} else {
// the flag is false, so we continue existing behaviour of registering
// from the beginning
registerFromBeginning(fetcher, shard, kinesisStreamShard);
}
}
}
} else {
// we're starting fresh; use the configured start position as initial state
SentinelSequenceNumber startingSeqNum =
InitialPosition.valueOf(
configProps.getProperty(
ConsumerConfigConstants.STREAM_INITIAL_POSITION,
ConsumerConfigConstants
.DEFAULT_STREAM_INITIAL_POSITION))
.toSentinelSequenceNumber();

fetcher.registerNewSubscribedShardState(
new KinesisStreamShardState(
kinesisStreamShard.getShardMetadata(),
shard,
startingSeqNum.get()));

if (LOG.isInfoEnabled()) {
LOG.info(
"Subtask {} will be seeded with initial shard {}, starting state set as sequence number {}",
getRuntimeContext().getIndexOfThisSubtask(),
shard.toString(),
startingSeqNum.get());
}
}
}

@@ -408,6 +399,94 @@ public void run(SourceContext<T> sourceContext) throws Exception {
sourceContext.close();
}

private Set<String> getStreamsToForceInitialPositionIn() {
String streamsToForceInitialPositionInStr =
configProps.getProperty(
ConsumerConfigConstants.STREAMS_TO_APPLY_STREAM_INITIAL_POSITION_TO);

if (streamsToForceInitialPositionInStr == null) {
return Collections.emptySet();
}

return Arrays.stream(streamsToForceInitialPositionInStr.split(","))
.map(String::trim)
.collect(Collectors.toSet());
}

private Boolean getApplyStreamInitialPositionForNewStreamsFlag() {
return Optional.ofNullable(
configProps.getProperty(
ConsumerConfigConstants
.APPLY_STREAM_INITIAL_POSITION_FOR_NEW_STREAMS))
.map(Boolean::parseBoolean)
.orElse(
ConsumerConfigConstants
.DEFAULT_APPLY_STREAM_INITIAL_POSITION_FOR_NEW_STREAMS);
}

private void registerFromBeginning(
KinesisDataFetcher<T> fetcher,
StreamShardHandle shard,
EquivalenceWrapper kinesisStreamShard) {
fetcher.registerNewSubscribedShardState(
new KinesisStreamShardState(
kinesisStreamShard.getShardMetadata(),
shard,
SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get()));

if (LOG.isInfoEnabled()) {
LOG.info(
"Subtask {} is seeding the fetcher with new discovered shard {},"
+ " starting state set to the SENTINEL_EARLIEST_SEQUENCE_NUM",
getRuntimeContext().getIndexOfThisSubtask(),
shard.toString());
}
}

private void registerFromInitialPosition(
KinesisDataFetcher<T> fetcher,
StreamShardHandle shard,
EquivalenceWrapper kinesisStreamShard) {
SentinelSequenceNumber startingSeqNum =
InitialPosition.valueOf(
configProps.getProperty(
ConsumerConfigConstants.STREAM_INITIAL_POSITION,
ConsumerConfigConstants.DEFAULT_STREAM_INITIAL_POSITION))
.toSentinelSequenceNumber();

fetcher.registerNewSubscribedShardState(
new KinesisStreamShardState(
kinesisStreamShard.getShardMetadata(), shard, startingSeqNum.get()));

if (LOG.isInfoEnabled()) {
LOG.info(
"Subtask {} will be seeded with initial shard {}, starting state set as sequence number {}",
getRuntimeContext().getIndexOfThisSubtask(),
shard.toString(),
startingSeqNum.get());
}
}

private void registerFromState(
KinesisDataFetcher<T> fetcher,
StreamShardHandle shard,
EquivalenceWrapper kinesisStreamShard) {
fetcher.registerNewSubscribedShardState(
new KinesisStreamShardState(
kinesisStreamShard.getShardMetadata(),
shard,
sequenceNumsToRestore.get(kinesisStreamShard)));

if (LOG.isInfoEnabled()) {
LOG.info(
"Subtask {} is seeding the fetcher with restored shard {},"
+ " starting state set to the restored sequence number {}",
getRuntimeContext().getIndexOfThisSubtask(),
shard.toString(),
sequenceNumsToRestore.get(kinesisStreamShard));
}
}

@Override
public void cancel() {
running = false;
@@ -464,8 +543,10 @@ public void initializeState(FunctionInitializationContext context) throws Except
if (context.isRestored()) {
if (sequenceNumsToRestore == null) {
sequenceNumsToRestore = new HashMap<>();
knownStreams = new HashSet<>();
for (Tuple2<StreamShardMetadata, SequenceNumber> kinesisSequenceNumber :
sequenceNumsStateForCheckpoint.get()) {
StreamShardMetadata streamShardMetadata = kinesisSequenceNumber.f0;
sequenceNumsToRestore.put(
// we wrap the restored metadata inside an equivalence wrapper that
// checks only stream name and shard id,
@@ -474,8 +555,9 @@ public void initializeState(FunctionInitializationContext context) throws Except
// the savepoint and has a different metadata than what we last stored,
// we will still be able to match it in sequenceNumsToRestore. Please
// see FLINK-8484 for details.
new StreamShardMetadata.EquivalenceWrapper(kinesisSequenceNumber.f0),
new StreamShardMetadata.EquivalenceWrapper(streamShardMetadata),
kinesisSequenceNumber.f1);
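// remember which streams were present in the restored state so that run() can tell new
// shards of already-known streams apart from shards of entirely new streams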
knownStreams.add(streamShardMetadata.getStreamName());
}

LOG.info(
ConsumerConfigConstants.java
@@ -320,12 +320,39 @@ public enum EFORegistrationType {
public static final String EFO_HTTP_CLIENT_READ_TIMEOUT_MILLIS =
"flink.stream.efo.http-client.read-timeout";

/**
* Flag to configure whether {@link #STREAM_INITIAL_POSITION} should be considered for new
* streams, when the app is already consuming from other streams. If set to true, then any
* stream that doesn't have any shard tracked by state yet will use the initial position. If
* false (default), it is assumed that we should consume from the beginning, which is
* appropriate when you want to ensure no data is lost if the stream is already being used by
* the data producers.
*/
public static final String APPLY_STREAM_INITIAL_POSITION_FOR_NEW_STREAMS =
"flink.stream.initpos-for-new-streams";

/**
* Property that can be used to ignore the restore state for a particular stream and instead use
* the initial position. This is useful to reset a specific stream to consume from TRIM_HORIZON
or LATEST if needed. Values must be passed as a comma-separated list. If a stream is in this
* list, it will use initial position regardless of the value of the {@link
* #APPLY_STREAM_INITIAL_POSITION_FOR_NEW_STREAMS} property.
*/
public static final String STREAMS_TO_APPLY_STREAM_INITIAL_POSITION_TO =
"flink.stream.initpos-streams";

// ------------------------------------------------------------------------
// Default values for consumer configuration
// ------------------------------------------------------------------------

public static final String DEFAULT_STREAM_INITIAL_POSITION = InitialPosition.LATEST.toString();

/**
* False for now so that we preserve the old behaviour. TODO: switch to true in the next major
* release? If so, update the javadoc.
*/
public static final boolean DEFAULT_APPLY_STREAM_INITIAL_POSITION_FOR_NEW_STREAMS = false;

public static final String DEFAULT_STREAM_TIMESTAMP_DATE_FORMAT =
"yyyy-MM-dd'T'HH:mm:ss.SSSXXX";
