Skip to content

Commit 6737125

Browse files
buu-nguyenTheAltoClef
authored andcommitted
✨ feat(tests): add environment variable support for MySQL to Doris E2E tests
- Introduces a new test case `testMySQL2DorisEnvVar` that validates environment variable resolution during data syncing from MySQL to Doris. - Implements methods to set and unset environment variables for testing purposes. - Adds initialization SQL and configuration files to support the new test scenario.
1 parent 785f7c8 commit 6737125

File tree

3 files changed

+162
-0
lines changed

3 files changed

+162
-0
lines changed

flink-doris-connector/src/test/java/org/apache/doris/flink/container/e2e/Mysql2DorisE2ECase.java

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -581,6 +581,117 @@ public void testMySQL2DorisMultiDatabase2OneSync() throws Exception {
581581
cancelE2EJob(jobName);
582582
}
583583

584+
@Test
585+
public void testMySQL2DorisEnvVar() throws Exception {
586+
String jobName = "testMySQL2DorisEnvVar";
587+
String resourcePath = "container/e2e/mysql2doris/testMySQL2DorisEnvVar.txt";
588+
589+
// Set environment variables for testing
590+
setEnvironmentVariable("TABLE_PREFIX", "env_");
591+
setEnvironmentVariable("TABLE_SUFFIX", "_test");
592+
593+
try {
594+
initEnvironment(jobName, "container/e2e/mysql2doris/testMySQL2DorisEnvVar_init.sql");
595+
startMysql2DorisJob(jobName, resourcePath);
596+
597+
// wait 2 times checkpoint
598+
Thread.sleep(20000);
599+
LOG.info("Start to verify create table result with environment variable resolution.");
600+
String tblQuery =
601+
String.format(
602+
"SELECT TABLE_NAME \n"
603+
+ "FROM INFORMATION_SCHEMA.TABLES \n"
604+
+ "WHERE TABLE_SCHEMA = '%s'",
605+
"test_e2e_mysql_env");
606+
// Verify that tables are created with env variable resolved prefix and suffix
607+
List<String> expectedTables =
608+
Arrays.asList(
609+
"env_tbl1_test", "env_tbl2_test", "env_tbl3_test", "env_tbl5_test");
610+
ContainerUtils.checkResult(
611+
getDorisQueryConnection(), LOG, expectedTables, tblQuery, 1, false);
612+
613+
LOG.info("Start to verify init result with environment variable resolution.");
614+
List<String> expected =
615+
Arrays.asList(
616+
"doris_env_1,1", "doris_env_2,2", "doris_env_3,3", "doris_env_5,5");
617+
String sql1 =
618+
"select * from ( select * from test_e2e_mysql_env.env_tbl1_test union all select * from test_e2e_mysql_env.env_tbl2_test union all select * from test_e2e_mysql_env.env_tbl3_test union all select * from test_e2e_mysql_env.env_tbl5_test) res order by 1";
619+
ContainerUtils.checkResult(getDorisQueryConnection(), LOG, expected, sql1, 2);
620+
621+
// add incremental data to verify CDC continues working with env vars
622+
ContainerUtils.executeSQLStatement(
623+
getMySQLQueryConnection(),
624+
LOG,
625+
"insert into test_e2e_mysql_env.tbl1 values ('doris_env_1_1',10)",
626+
"insert into test_e2e_mysql_env.tbl2 values ('doris_env_2_1',11)",
627+
"update test_e2e_mysql_env.tbl1 set age=18 where name='doris_env_1'",
628+
"delete from test_e2e_mysql_env.tbl2 where name='doris_env_2'");
629+
Thread.sleep(20000);
630+
631+
LOG.info(
632+
"Start to verify incremental data result with environment variable resolution.");
633+
List<String> expected2 =
634+
Arrays.asList(
635+
"doris_env_1,18",
636+
"doris_env_1_1,10",
637+
"doris_env_2_1,11",
638+
"doris_env_3,3");
639+
String sql2 =
640+
"select * from ( select * from test_e2e_mysql_env.env_tbl1_test union all select * from test_e2e_mysql_env.env_tbl2_test union all select * from test_e2e_mysql_env.env_tbl3_test ) res order by 1";
641+
ContainerUtils.checkResult(getDorisQueryConnection(), LOG, expected2, sql2, 2);
642+
643+
cancelE2EJob(jobName);
644+
} finally {
645+
// Clean up environment variables
646+
unsetEnvironmentVariable("TABLE_PREFIX");
647+
unsetEnvironmentVariable("TABLE_SUFFIX");
648+
}
649+
}
650+
651+
/** Set environment variable using reflection (for testing purposes only) */
652+
@SuppressWarnings("unchecked")
653+
private void setEnvironmentVariable(String key, String value) {
654+
try {
655+
Class<?> processEnvironmentClass = Class.forName("java.lang.ProcessEnvironment");
656+
java.lang.reflect.Field theEnvironmentField =
657+
processEnvironmentClass.getDeclaredField("theEnvironment");
658+
theEnvironmentField.setAccessible(true);
659+
Map<String, String> env = (Map<String, String>) theEnvironmentField.get(null);
660+
env.put(key, value);
661+
662+
java.lang.reflect.Field theCaseInsensitiveEnvironmentField =
663+
processEnvironmentClass.getDeclaredField("theCaseInsensitiveEnvironment");
664+
theCaseInsensitiveEnvironmentField.setAccessible(true);
665+
Map<String, String> cienv =
666+
(Map<String, String>) theCaseInsensitiveEnvironmentField.get(null);
667+
cienv.put(key, value);
668+
} catch (Exception e) {
669+
LOG.warn("Failed to set environment variable: " + key + "=" + value, e);
670+
}
671+
}
672+
673+
/** Unset environment variable using reflection (for testing purposes only) */
674+
@SuppressWarnings("unchecked")
675+
private void unsetEnvironmentVariable(String key) {
676+
try {
677+
Class<?> processEnvironmentClass = Class.forName("java.lang.ProcessEnvironment");
678+
java.lang.reflect.Field theEnvironmentField =
679+
processEnvironmentClass.getDeclaredField("theEnvironment");
680+
theEnvironmentField.setAccessible(true);
681+
Map<String, String> env = (Map<String, String>) theEnvironmentField.get(null);
682+
env.remove(key);
683+
684+
java.lang.reflect.Field theCaseInsensitiveEnvironmentField =
685+
processEnvironmentClass.getDeclaredField("theCaseInsensitiveEnvironment");
686+
theCaseInsensitiveEnvironmentField.setAccessible(true);
687+
Map<String, String> cienv =
688+
(Map<String, String>) theCaseInsensitiveEnvironmentField.get(null);
689+
cienv.remove(key);
690+
} catch (Exception e) {
691+
LOG.warn("Failed to unset environment variable: " + key, e);
692+
}
693+
}
694+
584695
@After
585696
public void close() {
586697
try {
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
mysql-sync-database
2+
--database test_e2e_mysql_env
3+
--mysql-conf database-name=test_e2e_mysql_env
4+
--table-prefix ${TABLE_PREFIX}
5+
--table-suffix ${TABLE_SUFFIX}
6+
--including-tables "tbl.*"
7+
--excluding-tables "tbl4"
8+
--sink-conf sink.ignore.update-before=false
9+
--table-conf replication_num=1
10+
--single-sink true
11+
--ignore-default-value false
12+
--schema-change-mode sql_parser
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
DROP DATABASE if EXISTS test_e2e_mysql_env;
2+
CREATE DATABASE if NOT EXISTS test_e2e_mysql_env;
3+
DROP TABLE IF EXISTS test_e2e_mysql_env.tbl1;
4+
CREATE TABLE test_e2e_mysql_env.tbl1 (
5+
`name` varchar(256) primary key,
6+
`age` int
7+
);
8+
insert into test_e2e_mysql_env.tbl1 values ('doris_env_1',1);
9+
10+
11+
DROP TABLE IF EXISTS test_e2e_mysql_env.tbl2;
12+
CREATE TABLE test_e2e_mysql_env.tbl2 (
13+
`name` varchar(256) primary key,
14+
`age` int
15+
);
16+
insert into test_e2e_mysql_env.tbl2 values ('doris_env_2',2);
17+
18+
19+
DROP TABLE IF EXISTS test_e2e_mysql_env.tbl3;
20+
CREATE TABLE test_e2e_mysql_env.tbl3 (
21+
`name` varchar(256) primary key,
22+
`age` int
23+
);
24+
insert into test_e2e_mysql_env.tbl3 values ('doris_env_3',3);
25+
26+
27+
DROP TABLE IF EXISTS test_e2e_mysql_env.tbl4;
28+
CREATE TABLE test_e2e_mysql_env.tbl4 (
29+
`name` varchar(256) primary key,
30+
`age` int
31+
);
32+
insert into test_e2e_mysql_env.tbl4 values ('doris_env_4',4);
33+
34+
DROP TABLE IF EXISTS test_e2e_mysql_env.tbl5;
35+
CREATE TABLE test_e2e_mysql_env.tbl5 (
36+
`name` varchar(256) primary key,
37+
`age` int
38+
);
39+
insert into test_e2e_mysql_env.tbl5 values ('doris_env_5',5);

0 commit comments

Comments
 (0)