Skip to content

Commit bde2afd

Browse files
committed
[FLINK-38531][cdc-connector-mysql] Fix data loss when restoring from a checkpoint positioned in the middle of a bulk DML operation.
1 parent 7c7a74d commit bde2afd

File tree

3 files changed

+350
-7
lines changed

3 files changed

+350
-7
lines changed

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/offset/BinlogOffset.java

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -190,15 +190,19 @@ public int compareTo(BinlogOffset that) {
190190
// Both have GTIDs, so base the comparison entirely on the GTID sets.
191191
GtidSet gtidSet = new GtidSet(gtidSetStr);
192192
GtidSet targetGtidSet = new GtidSet(targetGtidSetStr);
193-
if (gtidSet.equals(targetGtidSet)) {
194-
long restartSkipEvents = this.getRestartSkipEvents();
195-
long targetRestartSkipEvents = that.getRestartSkipEvents();
193+
if (!gtidSet.equals(targetGtidSet)) {
194+
// The GTIDs are not an exact match, so figure out if this is a subset of the
195+
// target offset ...
196+
return gtidSet.isContainedWithin(targetGtidSet) ? -1 : 1;
197+
}
198+
// The GTIDs are the same, so compare the completed events in the transaction ...
199+
long restartSkipEvents = this.getRestartSkipEvents();
200+
long targetRestartSkipEvents = that.getRestartSkipEvents();
201+
if (restartSkipEvents != targetRestartSkipEvents) {
196202
return Long.compare(restartSkipEvents, targetRestartSkipEvents);
197203
}
198-
// The GTIDs are not an exact match, so figure out if this is a subset of the target
199-
// offset
200-
// ...
201-
return gtidSet.isContainedWithin(targetGtidSet) ? -1 : 1;
204+
// The completed events are the same, so compare the row number ...
205+
return Long.compare(this.getRestartSkipRows(), that.getRestartSkipRows());
202206
}
203207
// The target offset did use GTIDs while this did not use GTIDs. So, we assume
204208
// that this offset is older since GTIDs are often enabled but rarely disabled.

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@
6666
import org.junit.jupiter.api.AfterEach;
6767
import org.junit.jupiter.api.BeforeAll;
6868
import org.junit.jupiter.api.Test;
69+
import org.junit.jupiter.api.Timeout;
6970
import org.testcontainers.lifecycle.Startables;
7071

7172
import java.sql.Connection;
@@ -80,6 +81,7 @@
8081
import java.util.Map;
8182
import java.util.Optional;
8283
import java.util.Properties;
84+
import java.util.concurrent.TimeUnit;
8385
import java.util.function.Predicate;
8486
import java.util.stream.Collectors;
8587
import java.util.stream.Stream;
@@ -786,6 +788,94 @@ void testReadBinlogFromGtidSet() throws Exception {
786788
assertEqualsInOrder(Arrays.asList(expected), actual);
787789
}
788790

791+
/**
792+
* In the bad case, the reader skips the remaining records, which causes an infinite wait for
793+
* data that never arrives, so this test has a timeout to avoid hanging.
794+
*/
795+
@Test
796+
@Timeout(value = 600, unit = TimeUnit.SECONDS)
797+
void testRestoreFromCheckpointWithGtidSetAndSkippingEventsAndRows() throws Exception {
798+
// Preparations
799+
customerDatabase.createAndInitialize();
800+
MySqlSourceConfig connectionConfig = getConfig(new String[] {"customers"});
801+
binaryLogClient = DebeziumUtils.createBinaryClient(connectionConfig.getDbzConfiguration());
802+
mySqlConnection = DebeziumUtils.createMySqlConnection(connectionConfig);
803+
DataType dataType =
804+
DataTypes.ROW(
805+
DataTypes.FIELD("id", DataTypes.BIGINT()),
806+
DataTypes.FIELD("name", DataTypes.STRING()),
807+
DataTypes.FIELD("address", DataTypes.STRING()),
808+
DataTypes.FIELD("phone_number", DataTypes.STRING()));
809+
810+
// Capture the current binlog offset, and we will start the reader from here
811+
BinlogOffset startingOffset = DebeziumUtils.currentBinlogOffset(mySqlConnection);
812+
813+
// In this case, the binlog is:
814+
// Event 0: QUERY,BEGIN
815+
// Event 1: TABLE_MAP
816+
// Event 2: Update id = 101 and id = 102
817+
// ROW 1 : Update id=101
818+
// ROW 2 : Update id=102
819+
// Event 3: TABLE_MAP
820+
// Event 4: Update id = 103 and id = 109
821+
// ROW 1 : Update id=103
822+
// ROW 2 : Update id=109
823+
824+
// When a checkpoint is triggered
825+
// after id=103 and before id=109,
826+
// the position restored from checkpoint will be event=4 and row=1
827+
BinlogOffset checkpointOffset =
828+
BinlogOffset.builder()
829+
.setBinlogFilePosition("", 0)
830+
.setGtidSet(startingOffset.getGtidSet())
831+
// Because the position restored from checkpoint
832+
// will skip 4 events to drop the first update:
833+
// QUERY / TABLE_MAP / EXT_UPDATE_ROWS / TABLE_MAP
834+
.setSkipEvents(4)
835+
// The position restored from checkpoint
836+
// will skip 1 row to drop the first row update (id=103)
837+
.setSkipRows(1)
838+
.build();
839+
840+
// Create a new config to start reading from the offset captured above
841+
MySqlSourceConfig sourceConfig =
842+
getConfig(
843+
StartupOptions.specificOffset(checkpointOffset),
844+
new String[] {"customers"});
845+
846+
// Create reader and submit splits
847+
MySqlBinlogSplit split = createBinlogSplit(sourceConfig);
848+
BinlogSplitReader reader = createBinlogReader(sourceConfig);
849+
reader.submitSplit(split);
850+
851+
// Create some binlog events:
852+
// Event 0: QUERY,BEGIN
853+
// Event 1: TABLE_MAP
854+
// Event 2: Update id = 101 and id = 102
855+
// ROW 1 : Update id=101
856+
// ROW 2 : Update id=102
857+
// Event 3: TABLE_MAP
858+
// Event 4: Update id = 103 and id = 109
859+
// ROW 1 : Update id=103
860+
// ROW 2 : Update id=109
861+
// Events 0-3 will be dropped because skipEvents = 4.
862+
// Row 1 in event 4 will be dropped because skipRows = 1.
863+
// Only the update on 109 will be captured.
864+
updateCustomersTableInBulk(
865+
mySqlConnection, customerDatabase.qualifiedTableName("customers"));
866+
867+
// Read with binlog split reader and validate
868+
String[] expected =
869+
new String[] {
870+
"-U[109, user_4, Shanghai, 123567891234]",
871+
"+U[109, user_4, Pittsburgh, 123567891234]"
872+
};
873+
List<String> actual = readBinlogSplits(dataType, reader, expected.length);
874+
875+
reader.close();
876+
assertEqualsInOrder(Arrays.asList(expected), actual);
877+
}
878+
789879
@Test
790880
void testReadBinlogFromTimestamp() throws Exception {
791881
// Preparations
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,249 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.cdc.connectors.mysql.source.offset;
19+
20+
import org.assertj.core.api.Assertions;
21+
import org.junit.jupiter.api.Test;
22+
23+
/** Unit test for {@link BinlogOffset}. */
24+
public class BinlogOffsetTest {
25+
public static final String PART_OF_GTID_SET_1 = "abcd:1-4";
26+
public static final String PART_OF_GTID_SET_2 = "efgh:1-10";
27+
public static final String FULL_GTID_SET =
28+
String.join(",", PART_OF_GTID_SET_1, PART_OF_GTID_SET_2);
29+
30+
@Test
31+
public void testCompareToWithGtidSet() {
32+
// Test same GTID sets in different orders
33+
BinlogOffset offset1 = BinlogOffset.builder().setGtidSet(FULL_GTID_SET).build();
34+
BinlogOffset offset2 =
35+
BinlogOffset.builder()
36+
.setGtidSet(String.join(",", PART_OF_GTID_SET_2, PART_OF_GTID_SET_1))
37+
.build();
38+
assetCompareTo(offset1, offset2, 0);
39+
40+
// The test uses GTID instead of position for comparison.
41+
offset1 =
42+
BinlogOffset.builder()
43+
.setGtidSet(FULL_GTID_SET)
44+
.setBinlogFilePosition("binlog.001", 123)
45+
.build();
46+
offset2 =
47+
BinlogOffset.builder()
48+
.setGtidSet(String.join(",", PART_OF_GTID_SET_2, PART_OF_GTID_SET_1))
49+
.setBinlogFilePosition("binlog.001", 456)
50+
.build();
51+
assetCompareTo(offset1, offset2, 0);
52+
53+
// Test different GTID sets where one contains another
54+
BinlogOffset offset3 = BinlogOffset.builder().setGtidSet(PART_OF_GTID_SET_1).build();
55+
BinlogOffset offset4 =
56+
BinlogOffset.builder()
57+
.setGtidSet("abcd:1-5") // Contains offset3's GTID set
58+
.build();
59+
60+
// offset3 should be before offset4
61+
assetCompareTo(offset3, offset4, -1);
62+
assetCompareTo(offset4, offset3, 1);
63+
64+
// The test uses GTID instead of position for comparison.
65+
offset3 =
66+
BinlogOffset.builder()
67+
.setGtidSet(PART_OF_GTID_SET_1)
68+
.setBinlogFilePosition("binlog.001", 1000)
69+
.build();
70+
offset4 =
71+
BinlogOffset.builder()
72+
.setGtidSet("abcd:1-5") // Contains offset3's GTID set
73+
.setBinlogFilePosition("binlog.001", 23)
74+
.build();
75+
assetCompareTo(offset3, offset4, -1);
76+
assetCompareTo(offset4, offset3, 1);
77+
78+
// Test completely different GTID sets
79+
BinlogOffset offset5 = BinlogOffset.builder().setGtidSet(PART_OF_GTID_SET_1).build();
80+
BinlogOffset offset6 = BinlogOffset.builder().setGtidSet(PART_OF_GTID_SET_2).build();
81+
82+
// offsets don't contain each other, result is always 1
83+
assetCompareTo(offset5, offset6, 1);
84+
assetCompareTo(offset6, offset5, 1);
85+
}
86+
87+
@Test
88+
public void testCompareToWithGtidSetAndSkipEventsAndSkipRows() {
89+
// Test same GTID but different skip events
90+
BinlogOffset offset1 =
91+
BinlogOffset.builder().setGtidSet(FULL_GTID_SET).setSkipEvents(5).build();
92+
BinlogOffset offset2 =
93+
BinlogOffset.builder().setGtidSet(FULL_GTID_SET).setSkipEvents(10).build();
94+
95+
assetCompareTo(offset1, offset2, -1);
96+
assetCompareTo(offset2, offset1, 1);
97+
98+
// Test same GTID and skip events but different skip rows
99+
BinlogOffset offset3 =
100+
BinlogOffset.builder()
101+
.setGtidSet(FULL_GTID_SET)
102+
.setSkipEvents(5)
103+
.setSkipRows(10)
104+
.build();
105+
BinlogOffset offset4 =
106+
BinlogOffset.builder()
107+
.setGtidSet(FULL_GTID_SET)
108+
.setSkipEvents(5)
109+
.setSkipRows(20)
110+
.build();
111+
112+
assetCompareTo(offset3, offset4, -1);
113+
assetCompareTo(offset4, offset3, 1);
114+
}
115+
116+
@Test
117+
public void testCompareToWithGtidSetExistence() {
118+
// Test one offset has GTID set and another doesn't
119+
BinlogOffset offsetWithGtid =
120+
BinlogOffset.builder()
121+
.setGtidSet(PART_OF_GTID_SET_1)
122+
.setBinlogFilePosition("binlog.001", 123)
123+
.build();
124+
BinlogOffset offsetWithoutGtid =
125+
BinlogOffset.builder().setBinlogFilePosition("binlog.001", 456).build();
126+
127+
// When one has GTID and another doesn't, the one without GTID is considered older
128+
assetCompareTo(offsetWithGtid, offsetWithoutGtid, 1);
129+
assetCompareTo(offsetWithoutGtid, offsetWithGtid, -1);
130+
131+
// Test the reverse scenario
132+
BinlogOffset offsetWithGtid2 =
133+
BinlogOffset.builder().setGtidSet(PART_OF_GTID_SET_2).build();
134+
BinlogOffset offsetWithoutGtid2 =
135+
BinlogOffset.builder()
136+
.setBinlogFilePosition("binlog.002", 789)
137+
.setSkipEvents(5)
138+
.build();
139+
140+
assetCompareTo(offsetWithGtid2, offsetWithoutGtid2, 1);
141+
assetCompareTo(offsetWithoutGtid2, offsetWithGtid2, -1);
142+
}
143+
144+
@Test
145+
public void testCompareToWithFilePosition() {
146+
// Test same file position - should be equal
147+
BinlogOffset offset1 =
148+
BinlogOffset.builder().setBinlogFilePosition("binlog.001", 123).build();
149+
BinlogOffset offset2 =
150+
BinlogOffset.builder().setBinlogFilePosition("binlog.001", 123).build();
151+
assetCompareTo(offset1, offset2, 0);
152+
153+
// Test different file names
154+
BinlogOffset offset3 =
155+
BinlogOffset.builder().setBinlogFilePosition("binlog.001", 123).build();
156+
BinlogOffset offset4 =
157+
BinlogOffset.builder().setBinlogFilePosition("binlog.002", 123).build();
158+
assetCompareTo(offset3, offset4, -1);
159+
assetCompareTo(offset4, offset3, 1);
160+
161+
// Test different positions in same file
162+
BinlogOffset offset5 =
163+
BinlogOffset.builder().setBinlogFilePosition("binlog.001", 100).build();
164+
BinlogOffset offset6 =
165+
BinlogOffset.builder().setBinlogFilePosition("binlog.001", 200).build();
166+
assetCompareTo(offset5, offset6, -1);
167+
assetCompareTo(offset6, offset5, 1);
168+
}
169+
170+
@Test
171+
public void testCompareToWithFilePositionAndSkipEventsAndSkipRows() {
172+
// Test with skip events
173+
BinlogOffset offset1 =
174+
BinlogOffset.builder()
175+
.setBinlogFilePosition("binlog.001", 123)
176+
.setSkipEvents(5)
177+
.build();
178+
BinlogOffset offset2 =
179+
BinlogOffset.builder()
180+
.setBinlogFilePosition("binlog.001", 123)
181+
.setSkipEvents(10)
182+
.build();
183+
assetCompareTo(offset1, offset2, -1);
184+
assetCompareTo(offset2, offset1, 1);
185+
186+
// Test with skip rows
187+
BinlogOffset offset3 =
188+
BinlogOffset.builder()
189+
.setBinlogFilePosition("binlog.001", 123)
190+
.setSkipEvents(5)
191+
.setSkipRows(10)
192+
.build();
193+
BinlogOffset offset4 =
194+
BinlogOffset.builder()
195+
.setBinlogFilePosition("binlog.001", 123)
196+
.setSkipEvents(5)
197+
.setSkipRows(20)
198+
.build();
199+
assetCompareTo(offset3, offset4, -1);
200+
assetCompareTo(offset4, offset3, 1);
201+
}
202+
203+
@Test
204+
public void testCompareToTimestampWithDifferentServerId() {
205+
// Test different server IDs with different timestamps
206+
BinlogOffset offset1 =
207+
BinlogOffset.builder()
208+
.setServerId(1L)
209+
.setTimestampSec(1000L)
210+
.setBinlogFilePosition("binlog.001", 123)
211+
.build();
212+
BinlogOffset offset2 =
213+
BinlogOffset.builder()
214+
.setServerId(2L)
215+
.setTimestampSec(2000L)
216+
.setBinlogFilePosition("binlog.001", 123)
217+
.build();
218+
219+
// Should compare based on timestamp since server IDs are different
220+
assetCompareTo(offset1, offset2, -1);
221+
assetCompareTo(offset2, offset1, 1);
222+
223+
// Test same timestamps but different server IDs
224+
BinlogOffset offset3 =
225+
BinlogOffset.builder()
226+
.setServerId(1L)
227+
.setTimestampSec(1500L)
228+
.setBinlogFilePosition("binlog.001", 432)
229+
.build();
230+
BinlogOffset offset4 =
231+
BinlogOffset.builder()
232+
.setServerId(2L)
233+
.setTimestampSec(1500L)
234+
.setBinlogFilePosition("binlog.001", 123)
235+
.build();
236+
237+
// Same timestamp, different server IDs - the comparison is based on the timestamps (which are
238+
// equal).
239+
// Note that the file positions differ here (432 vs. 123), yet the expected result is 0:
240+
// when the server IDs are different, the file positions are not consulted,
241+
// so offsets with equal timestamps compare as equal.
242+
assetCompareTo(offset3, offset4, 0);
243+
}
244+
245+
private void assetCompareTo(BinlogOffset offset1, BinlogOffset offset2, int expected) {
246+
int actual = offset1.compareTo(offset2);
247+
Assertions.assertThat(expected).isEqualTo(actual);
248+
}
249+
}

0 commit comments

Comments
 (0)