Skip to content

Commit 8a1ded6

Browse files
author
Arun Lakshman
committed
[FLINK-37627][BugFix][Connectors/Kinesis] Restarting from a checkpoint/savepoint which coincides with shard split causes data loss
1 parent ca96d84 commit 8a1ded6

File tree

11 files changed

+399
-19
lines changed

11 files changed

+399
-19
lines changed

flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/enumerator/KinesisStreamsSourceEnumerator.java

+18-8
Original file line numberDiff line numberDiff line change
@@ -137,16 +137,26 @@ public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
137137
}
138138
}
139139

140-
private void handleFinishedSplits(int subtask, SplitsFinishedEvent splitsFinishedEvent) {
140+
private void handleFinishedSplits(int subtaskId, SplitsFinishedEvent splitsFinishedEvent) {
141141
splitTracker.markAsFinished(splitsFinishedEvent.getFinishedSplitIds());
142-
splitAssignment
143-
.get(subtask)
144-
.removeIf(
145-
split ->
146-
splitsFinishedEvent
147-
.getFinishedSplitIds()
148-
.contains(split.splitId()));
142+
Set<KinesisShardSplit> splitsAssignment = splitAssignment.get(subtaskId);
143+
// during recovery, splitAssignment may return null since there might be no split assigned
144+
// to the subtask, but there might still be a SplitsFinishedEvent from that subtask.
145+
// We will not do child shard assignment if that is the case since that might lead to child
146+
// shards being assigned before any readers have registered.
147+
if (splitsAssignment == null) {
148+
LOG.info(
149+
"handleFinishedSplits called for subtask: {} which doesn't have any "
150+
+ "assigned splits right now. This might happen due to job restarts. "
151+
+ "Child shard discovery might be delayed until we have enough readers."
152+
+ "Finished split ids: {}",
153+
subtaskId,
154+
splitsFinishedEvent.getFinishedSplitIds());
155+
return;
156+
}
149157

158+
splitsAssignment.removeIf(
159+
split -> splitsFinishedEvent.getFinishedSplitIds().contains(split.splitId()));
150160
assignSplits();
151161
}
152162

flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/enumerator/assigner/UniformShardAssigner.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,8 @@ public class UniformShardAssigner implements KinesisShardAssigner {
4444
public int assign(KinesisShardSplit split, Context context) {
4545
Preconditions.checkArgument(
4646
!context.getRegisteredReaders().isEmpty(),
47-
"Expected at least one registered reader. Unable to assign split.");
47+
"Expected at least one registered reader. Unable to assign split with id: %s.",
48+
split.splitId());
4849
BigInteger hashKeyStart = new BigInteger(split.getStartingHashKey());
4950
BigInteger hashKeyEnd = new BigInteger(split.getEndingHashKey());
5051
BigInteger hashKeyMid = hashKeyStart.add(hashKeyEnd).divide(TWO);

flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/enumerator/tracker/SplitTracker.java

+6
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,12 @@ private boolean verifyAllParentSplitsAreFinished(KinesisShardSplit split) {
159159
return allParentsFinished;
160160
}
161161

162+
/**
163+
* Checks if split with specified id is finished.
164+
*
165+
* @param splitId Id of the split to check
166+
* @return true if split is finished, otherwise false
167+
*/
162168
private boolean isFinished(String splitId) {
163169
return !knownSplits.containsKey(splitId);
164170
}

flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/KinesisStreamsSourceReader.java

+70-2
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,14 @@
3333
import org.slf4j.LoggerFactory;
3434
import software.amazon.awssdk.services.kinesis.model.Record;
3535

36+
import java.util.ArrayList;
37+
import java.util.Collections;
3638
import java.util.HashSet;
3739
import java.util.List;
3840
import java.util.Map;
41+
import java.util.NavigableMap;
42+
import java.util.Set;
43+
import java.util.TreeMap;
3944

4045
/**
4146
* Coordinates the reading from assigned splits. Runs on the TaskManager.
@@ -49,6 +54,8 @@ public class KinesisStreamsSourceReader<T>
4954

5055
private static final Logger LOG = LoggerFactory.getLogger(KinesisStreamsSourceReader.class);
5156
private final Map<String, KinesisShardMetrics> shardMetricGroupMap;
57+
private final NavigableMap<Long, Set<KinesisShardSplit>> splitFinishedEvents;
58+
private long currentCheckpointId;
5259

5360
public KinesisStreamsSourceReader(
5461
SingleThreadFetcherManager<Record, KinesisShardSplit> splitFetcherManager,
@@ -58,15 +65,67 @@ public KinesisStreamsSourceReader(
5865
Map<String, KinesisShardMetrics> shardMetricGroupMap) {
5966
super(splitFetcherManager, recordEmitter, config, context);
6067
this.shardMetricGroupMap = shardMetricGroupMap;
68+
this.splitFinishedEvents = new TreeMap<>();
69+
this.currentCheckpointId = Long.MIN_VALUE;
6170
}
6271

6372
@Override
6473
protected void onSplitFinished(Map<String, KinesisShardSplitState> finishedSplitIds) {
74+
if (finishedSplitIds.isEmpty()) {
75+
return;
76+
}
77+
splitFinishedEvents.computeIfAbsent(currentCheckpointId, k -> new HashSet<>());
78+
finishedSplitIds.values().stream()
79+
.map(
80+
finishedSplit ->
81+
new KinesisShardSplit(
82+
finishedSplit.getStreamArn(),
83+
finishedSplit.getShardId(),
84+
finishedSplit.getNextStartingPosition(),
85+
finishedSplit.getKinesisShardSplit().getParentShardIds(),
86+
finishedSplit.getKinesisShardSplit().getStartingHashKey(),
87+
finishedSplit.getKinesisShardSplit().getEndingHashKey(),
88+
true))
89+
.forEach(split -> splitFinishedEvents.get(currentCheckpointId).add(split));
90+
6591
context.sendSourceEventToCoordinator(
6692
new SplitsFinishedEvent(new HashSet<>(finishedSplitIds.keySet())));
6793
finishedSplitIds.keySet().forEach(this::unregisterShardMetricGroup);
6894
}
6995

96+
/**
97+
* At snapshot, we also store the pending finished splits in the current checkpoint so that,
98+
* if we have to restore the reader from state, the finished split ids are sent again.
99+
* Otherwise, we run a risk of data loss during restarts of the source because of the
100+
* SplitsFinishedEvent going missing.
101+
*
102+
* @param checkpointId the checkpoint id
103+
* @return the splits snapshotted by the base reader plus all pending finished splits
104+
*/
105+
@Override
106+
public List<KinesisShardSplit> snapshotState(long checkpointId) {
107+
this.currentCheckpointId = checkpointId;
108+
List<KinesisShardSplit> splits = new ArrayList<>(super.snapshotState(checkpointId));
109+
110+
if (!splitFinishedEvents.isEmpty()) {
111+
// Add all finished splits to the snapshot
112+
splitFinishedEvents.values().forEach(splits::addAll);
113+
}
114+
115+
return splits;
116+
}
117+
118+
/**
119+
* During notifyCheckpointComplete, we should clean up the state of finished splits that are
120+
* less than or equal to the checkpoint id.
121+
*
122+
* @param checkpointId the checkpoint id
123+
*/
124+
@Override
125+
public void notifyCheckpointComplete(long checkpointId) {
126+
splitFinishedEvents.headMap(checkpointId, true).clear();
127+
}
128+
70129
@Override
71130
protected KinesisShardSplitState initializedState(KinesisShardSplit split) {
72131
return new KinesisShardSplitState(split);
@@ -79,8 +138,17 @@ protected KinesisShardSplit toSplitType(String splitId, KinesisShardSplitState s
79138

80139
@Override
81140
public void addSplits(List<KinesisShardSplit> splits) {
82-
splits.forEach(this::registerShardMetricGroup);
83-
super.addSplits(splits);
141+
List<KinesisShardSplit> kinesisShardSplits = new ArrayList<>();
142+
for (KinesisShardSplit split : splits) {
143+
if (split.isFinished()) {
144+
context.sendSourceEventToCoordinator(
145+
new SplitsFinishedEvent(Collections.singleton(split.splitId())));
146+
} else {
147+
kinesisShardSplits.add(split);
148+
}
149+
}
150+
kinesisShardSplits.forEach(this::registerShardMetricGroup);
151+
super.addSplits(kinesisShardSplits);
84152
}
85153

86154
@Override

flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/split/KinesisShardSplit.java

+30-2
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ public final class KinesisShardSplit implements SourceSplit {
4444
private final Set<String> parentShardIds;
4545
private final String startingHashKey;
4646
private final String endingHashKey;
47+
private final boolean finished;
4748

4849
public KinesisShardSplit(
4950
String streamArn,
@@ -52,6 +53,24 @@ public KinesisShardSplit(
5253
Set<String> parentShardIds,
5354
String startingHashKey,
5455
String endingHashKey) {
56+
this(
57+
streamArn,
58+
shardId,
59+
startingPosition,
60+
parentShardIds,
61+
startingHashKey,
62+
endingHashKey,
63+
false);
64+
}
65+
66+
public KinesisShardSplit(
67+
String streamArn,
68+
String shardId,
69+
StartingPosition startingPosition,
70+
Set<String> parentShardIds,
71+
String startingHashKey,
72+
String endingHashKey,
73+
boolean finished) {
5574
checkNotNull(streamArn, "streamArn cannot be null");
5675
checkNotNull(shardId, "shardId cannot be null");
5776
checkNotNull(startingPosition, "startingPosition cannot be null");
@@ -65,6 +84,11 @@ public KinesisShardSplit(
6584
this.parentShardIds = new HashSet<>(parentShardIds);
6685
this.startingHashKey = startingHashKey;
6786
this.endingHashKey = endingHashKey;
87+
this.finished = finished;
88+
}
89+
90+
public boolean isFinished() {
91+
return finished;
6892
}
6993

7094
@Override
@@ -116,6 +140,8 @@ public String toString() {
116140
+ ", endingHashKey='"
117141
+ endingHashKey
118142
+ '\''
143+
+ ", finished="
144+
+ finished
119145
+ '}';
120146
}
121147

@@ -133,7 +159,8 @@ public boolean equals(Object o) {
133159
&& Objects.equals(startingPosition, that.startingPosition)
134160
&& Objects.equals(parentShardIds, that.parentShardIds)
135161
&& Objects.equals(startingHashKey, that.startingHashKey)
136-
&& Objects.equals(endingHashKey, that.endingHashKey);
162+
&& Objects.equals(endingHashKey, that.endingHashKey)
163+
&& Objects.equals(finished, that.finished);
137164
}
138165

139166
@Override
@@ -144,6 +171,7 @@ public int hashCode() {
144171
startingPosition,
145172
parentShardIds,
146173
startingHashKey,
147-
endingHashKey);
174+
endingHashKey,
175+
finished);
148176
}
149177
}

flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/split/KinesisShardSplitSerializer.java

+50-5
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,8 @@
4242
@Internal
4343
public class KinesisShardSplitSerializer implements SimpleVersionedSerializer<KinesisShardSplit> {
4444

45-
private static final int CURRENT_VERSION = 1;
46-
private static final Set<Integer> COMPATIBLE_VERSIONS = new HashSet<>(Arrays.asList(0, 1));
45+
private static final int CURRENT_VERSION = 2;
46+
private static final Set<Integer> COMPATIBLE_VERSIONS = new HashSet<>(Arrays.asList(0, 1, 2));
4747

4848
@Override
4949
public int getVersion() {
@@ -78,6 +78,7 @@ public byte[] serialize(KinesisShardSplit split) throws IOException {
7878
}
7979
out.writeUTF(split.getStartingHashKey());
8080
out.writeUTF(split.getEndingHashKey());
81+
out.writeBoolean(split.isFinished());
8182

8283
out.flush();
8384
return baos.toByteArray();
@@ -112,6 +113,41 @@ byte[] serializeV0(KinesisShardSplit split) throws IOException {
112113
}
113114
}
114115

116+
/** This method is used only to test backwards compatibility of the deserialization logic. */
117+
@VisibleForTesting
118+
byte[] serializeV1(KinesisShardSplit split) throws IOException {
119+
try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
120+
DataOutputStream out = new DataOutputStream(baos)) {
121+
122+
out.writeUTF(split.getStreamArn());
123+
out.writeUTF(split.getShardId());
124+
out.writeUTF(split.getStartingPosition().getShardIteratorType().toString());
125+
if (split.getStartingPosition().getStartingMarker() == null) {
126+
out.writeBoolean(false);
127+
} else {
128+
out.writeBoolean(true);
129+
Object startingMarker = split.getStartingPosition().getStartingMarker();
130+
out.writeBoolean(startingMarker instanceof Instant);
131+
if (startingMarker instanceof Instant) {
132+
out.writeLong(((Instant) startingMarker).toEpochMilli());
133+
}
134+
out.writeBoolean(startingMarker instanceof String);
135+
if (startingMarker instanceof String) {
136+
out.writeUTF((String) startingMarker);
137+
}
138+
}
139+
out.writeInt(split.getParentShardIds().size());
140+
for (String parentShardId : split.getParentShardIds()) {
141+
out.writeUTF(parentShardId);
142+
}
143+
out.writeUTF(split.getStartingHashKey());
144+
out.writeUTF(split.getEndingHashKey());
145+
146+
out.flush();
147+
return baos.toByteArray();
148+
}
149+
}
150+
115151
@Override
116152
public KinesisShardSplit deserialize(int version, byte[] serialized) throws IOException {
117153
try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
@@ -140,7 +176,8 @@ public KinesisShardSplit deserialize(int version, byte[] serialized) throws IOEx
140176
}
141177

142178
Set<String> parentShardIds = new HashSet<>();
143-
if (version == CURRENT_VERSION) {
179+
// parentShardIds was added in V1
180+
if (version >= 1) {
144181
int parentShardCount = in.readInt();
145182
for (int i = 0; i < parentShardCount; i++) {
146183
parentShardIds.add(in.readUTF());
@@ -149,21 +186,29 @@ public KinesisShardSplit deserialize(int version, byte[] serialized) throws IOEx
149186

150187
String startingHashKey;
151188
String endingHashKey;
152-
if (version == CURRENT_VERSION) {
189+
// startingHashKey and endingHashKey were added in V1
190+
if (version >= 1) {
153191
startingHashKey = in.readUTF();
154192
endingHashKey = in.readUTF();
155193
} else {
156194
startingHashKey = "-1";
157195
endingHashKey = "0";
158196
}
159197

198+
boolean finished = false;
199+
// isFinished was added in V2
200+
if (version >= 2) {
201+
finished = in.readBoolean();
202+
}
203+
160204
return new KinesisShardSplit(
161205
streamArn,
162206
shardId,
163207
new StartingPosition(shardIteratorType, startingMarker),
164208
parentShardIds,
165209
startingHashKey,
166-
endingHashKey);
210+
endingHashKey,
211+
finished);
167212
}
168213
}
169214
}

flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/enumerator/assigner/UniformShardAssignerTest.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,9 @@ void testNoRegisteredReaders() {
101101
assertThatExceptionOfType(IllegalArgumentException.class)
102102
.isThrownBy(() -> assigner.assign(split, assignerContext))
103103
.withMessageContaining(
104-
"Expected at least one registered reader. Unable to assign split.");
104+
String.format(
105+
"Expected at least one registered reader. Unable to assign split with id: %s.",
106+
split.splitId()));
105107
}
106108

107109
private void createReaderWithAssignedSplits(

0 commit comments

Comments
 (0)