diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java index e512d790a..159d03ff4 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java @@ -38,11 +38,18 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.regex.Matcher; +import java.util.regex.Pattern; /** cdc sync tools. */ public class CdcTools { private static final List EMPTY_KEYS = Collections.singletonList(DatabaseSyncConfig.PASSWORD); + + // Regex pattern to find environment variables like $VAR or ${VAR} + private static final Pattern ENV_VAR_PATTERN = + Pattern.compile("\\$(?:([A-Za-z_][A-Za-z0-9_]*)|\\{([A-Za-z_][A-Za-z0-9_]*)\\})"); + private static StreamExecutionEnvironment flinkEnvironmentForTesting; private static JobClient jobClient; @@ -221,7 +228,30 @@ public static Map getConfigMap(MultipleParameterTool params, Str for (String param : params.getMultiParameter(key)) { String[] kv = param.split("=", 2); if (kv.length == 2) { - map.put(kv[0].trim(), kv[1].trim()); + String originalValue = kv[1].trim(); + String resolvedValue = originalValue; + + // Use pre-compiled pattern to find environment variables like $VAR or ${VAR} + Matcher matcher = ENV_VAR_PATTERN.matcher(originalValue); + StringBuffer sb = new StringBuffer(); + boolean varFound = false; + while (matcher.find()) { + varFound = true; + String varName = matcher.group(1) != null ? matcher.group(1) : matcher.group(2); + String envValue = System.getenv(varName); + if (envValue != null) { + // Replace with environment variable value + matcher.appendReplacement(sb, Matcher.quoteReplacement(envValue)); + } else { + // If environment variable is not found, keep the original placeholder + matcher.appendReplacement(sb, Matcher.quoteReplacement(matcher.group(0))); + } + } + if (varFound) { + matcher.appendTail(sb); + resolvedValue = sb.toString(); + } + map.put(kv[0].trim(), resolvedValue); continue; } else if (kv.length == 1 && EMPTY_KEYS.contains(kv[0])) { map.put(kv[0].trim(), ""); diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/e2e/Mysql2DorisE2ECase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/e2e/Mysql2DorisE2ECase.java index e1c8c8072..531f32c96 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/e2e/Mysql2DorisE2ECase.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/e2e/Mysql2DorisE2ECase.java @@ -31,6 +31,7 @@ import java.sql.Statement; import java.util.Arrays; import java.util.List; +import java.util.Map; public class Mysql2DorisE2ECase extends AbstractE2EService { private static final Logger LOG = LoggerFactory.getLogger(Mysql2DorisE2ECase.class); @@ -581,6 +582,117 @@ public void testMySQL2DorisMultiDatabase2OneSync() throws Exception { cancelE2EJob(jobName); } + @Test + public void testMySQL2DorisEnvVar() throws Exception { + String jobName = "testMySQL2DorisEnvVar"; + String resourcePath = "container/e2e/mysql2doris/testMySQL2DorisEnvVar.txt"; + + // Set environment variables for testing + setEnvironmentVariable("TABLE_PREFIX", "env_"); + setEnvironmentVariable("TABLE_SUFFIX", "_test"); + + try { + initEnvironment(jobName, "container/e2e/mysql2doris/testMySQL2DorisEnvVar_init.sql"); + startMysql2DorisJob(jobName, resourcePath); + + // wait 2 times checkpoint + Thread.sleep(20000); + LOG.info("Start to verify create table result with environment variable resolution."); + String tblQuery = + String.format( + "SELECT TABLE_NAME \n" + + "FROM INFORMATION_SCHEMA.TABLES \n" + + "WHERE TABLE_SCHEMA = '%s'", + "test_e2e_mysql_env"); + // Verify that tables are created with env variable resolved prefix and suffix + List expectedTables = + Arrays.asList( + "env_tbl1_test", "env_tbl2_test", "env_tbl3_test", "env_tbl5_test"); + ContainerUtils.checkResult( + getDorisQueryConnection(), LOG, expectedTables, tblQuery, 1, false); + + LOG.info("Start to verify init result with environment variable resolution."); + List expected = + Arrays.asList( + "doris_env_1,1", "doris_env_2,2", "doris_env_3,3", "doris_env_5,5"); + String sql1 = + "select * from ( select * from test_e2e_mysql_env.env_tbl1_test union all select * from test_e2e_mysql_env.env_tbl2_test union all select * from test_e2e_mysql_env.env_tbl3_test union all select * from test_e2e_mysql_env.env_tbl5_test) res order by 1"; + ContainerUtils.checkResult(getDorisQueryConnection(), LOG, expected, sql1, 2); + + // add incremental data to verify CDC continues working with env vars + ContainerUtils.executeSQLStatement( + getMySQLQueryConnection(), + LOG, + "insert into test_e2e_mysql_env.tbl1 values ('doris_env_1_1',10)", + "insert into test_e2e_mysql_env.tbl2 values ('doris_env_2_1',11)", + "update test_e2e_mysql_env.tbl1 set age=18 where name='doris_env_1'", + "delete from test_e2e_mysql_env.tbl2 where name='doris_env_2'"); + Thread.sleep(20000); + + LOG.info( + "Start to verify incremental data result with environment variable resolution."); + List expected2 = + Arrays.asList( + "doris_env_1,18", + "doris_env_1_1,10", + "doris_env_2_1,11", + "doris_env_3,3"); + String sql2 = + "select * from ( select * from test_e2e_mysql_env.env_tbl1_test union all select * from test_e2e_mysql_env.env_tbl2_test union all select * from test_e2e_mysql_env.env_tbl3_test ) res order by 1"; + ContainerUtils.checkResult(getDorisQueryConnection(), LOG, expected2, sql2, 2); + + cancelE2EJob(jobName); + } finally { + // Clean up environment variables + unsetEnvironmentVariable("TABLE_PREFIX"); + unsetEnvironmentVariable("TABLE_SUFFIX"); + } + } + + /** Set environment variable using reflection (for testing purposes only) */ + @SuppressWarnings("unchecked") + private void setEnvironmentVariable(String key, String value) { + try { + Class processEnvironmentClass = Class.forName("java.lang.ProcessEnvironment"); + java.lang.reflect.Field theEnvironmentField = + processEnvironmentClass.getDeclaredField("theEnvironment"); + theEnvironmentField.setAccessible(true); + Map env = (Map) theEnvironmentField.get(null); + env.put(key, value); + + java.lang.reflect.Field theCaseInsensitiveEnvironmentField = + processEnvironmentClass.getDeclaredField("theCaseInsensitiveEnvironment"); + theCaseInsensitiveEnvironmentField.setAccessible(true); + Map cienv = + (Map) theCaseInsensitiveEnvironmentField.get(null); + cienv.put(key, value); + } catch (Exception e) { + LOG.warn("Failed to set environment variable: " + key + "=" + value, e); + } + } + + /** Unset environment variable using reflection (for testing purposes only) */ + @SuppressWarnings("unchecked") + private void unsetEnvironmentVariable(String key) { + try { + Class processEnvironmentClass = Class.forName("java.lang.ProcessEnvironment"); + java.lang.reflect.Field theEnvironmentField = + processEnvironmentClass.getDeclaredField("theEnvironment"); + theEnvironmentField.setAccessible(true); + Map env = (Map) theEnvironmentField.get(null); + env.remove(key); + + java.lang.reflect.Field theCaseInsensitiveEnvironmentField = + processEnvironmentClass.getDeclaredField("theCaseInsensitiveEnvironment"); + theCaseInsensitiveEnvironmentField.setAccessible(true); + Map cienv = + (Map) theCaseInsensitiveEnvironmentField.get(null); + cienv.remove(key); + } catch (Exception e) { + LOG.warn("Failed to unset environment variable: " + key, e); + } + } + @After public void close() { try { diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcToolsTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcToolsTest.java index 41bcb6213..081edf31e 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcToolsTest.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcToolsTest.java @@ -115,4 +115,91 @@ private void assertEquals( Assert.assertTrue(valueConf.contains(value)); } } + + @Test + public void testGetConfigMapWithEnvironmentVariables() { + // Test cases for environment variable substitution. + // We assume these environment variables are NOT set in the test environment, + // so they should resolve to their placeholder strings. + + // Case 1: Simple env var placeholder + MultipleParameterTool params1 = + MultipleParameterTool.fromArgs( + new String[] {"--test-conf", "db.user=$DB_USER_UNSET"}); + Map expected1 = new HashMap<>(); + expected1.put("db.user", "$DB_USER_UNSET"); + Assert.assertEquals(expected1, CdcTools.getConfigMap(params1, "test-conf")); + + // Case 2: Env var with braces placeholder + MultipleParameterTool params2 = + MultipleParameterTool.fromArgs( + new String[] {"--test-conf", "db.pass=${DB_PASS_UNSET}"}); + Map expected2 = new HashMap<>(); + expected2.put("db.pass", "${DB_PASS_UNSET}"); + Assert.assertEquals(expected2, CdcTools.getConfigMap(params2, "test-conf")); + + // Case 3: Mix of plain string and env var placeholder + MultipleParameterTool params3 = + MultipleParameterTool.fromArgs( + new String[] { + "--test-conf", + "db.host=localhost", + "--test-conf", + "db.port=$DB_PORT_UNSET" + }); + Map expected3 = new HashMap<>(); + expected3.put("db.host", "localhost"); + expected3.put("db.port", "$DB_PORT_UNSET"); + Assert.assertEquals(expected3, CdcTools.getConfigMap(params3, "test-conf")); + + // Case 4: Env var within a string + MultipleParameterTool params4 = + MultipleParameterTool.fromArgs( + new String[] { + "--test-conf", "conn.string=jdbc:mysql://$DB_HOST_UNSET:3306/mydb" + }); + Map expected4 = new HashMap<>(); + expected4.put("conn.string", "jdbc:mysql://$DB_HOST_UNSET:3306/mydb"); + Assert.assertEquals(expected4, CdcTools.getConfigMap(params4, "test-conf")); + + // Case 5: Multiple env vars in one string + MultipleParameterTool params5 = + MultipleParameterTool.fromArgs( + new String[] { + "--test-conf", "credentials=user:$USER_UNSET,pass:$PASS_UNSET" + }); + Map expected5 = new HashMap<>(); + expected5.put("credentials", "user:$USER_UNSET,pass:$PASS_UNSET"); + Assert.assertEquals(expected5, CdcTools.getConfigMap(params5, "test-conf")); + + // Case 6: No env vars (regular behavior) + MultipleParameterTool params6 = + MultipleParameterTool.fromArgs( + new String[] {"--test-conf", "key1=value1", "--test-conf", "key2=value2"}); + Map expected6 = new HashMap<>(); + expected6.put("key1", "value1"); + expected6.put("key2", "value2"); + Assert.assertEquals(expected6, CdcTools.getConfigMap(params6, "test-conf")); + + // Case 7: Env var for a key that allows empty value (e.g. password), resolves to + // placeholder + MultipleParameterTool params7 = + MultipleParameterTool.fromArgs( + new String[] {"--test-conf", "password=$PASSWORD_UNSET"}); + Map expected7 = new HashMap<>(); + expected7.put( + "password", "$PASSWORD_UNSET"); // DatabaseSyncConfig.PASSWORD is in EMPTY_KEYS + Assert.assertEquals(expected7, CdcTools.getConfigMap(params7, "test-conf")); + + // Case 8: Env var that resolves to an empty string (if it were set to empty) + // For this test, we simulate it by having the placeholder itself, as we can't set it to + // empty easily here. + // If $EMPTY_VAR was set to "", the result for "key" would be "". + // Since it's not set, it remains "$EMPTY_VAR". + MultipleParameterTool params8 = + MultipleParameterTool.fromArgs(new String[] {"--test-conf", "key=$EMPTY_VAR"}); + Map expected8 = new HashMap<>(); + expected8.put("key", "$EMPTY_VAR"); + Assert.assertEquals(expected8, CdcTools.getConfigMap(params8, "test-conf")); + } } diff --git a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisEnvVar.txt b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisEnvVar.txt new file mode 100644 index 000000000..1686fa3ff --- /dev/null +++ b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisEnvVar.txt @@ -0,0 +1,12 @@ +mysql-sync-database + --database test_e2e_mysql_env + --mysql-conf database-name=test_e2e_mysql_env + --table-prefix ${TABLE_PREFIX} + --table-suffix ${TABLE_SUFFIX} + --including-tables "tbl.*" + --excluding-tables "tbl4" + --sink-conf sink.ignore.update-before=false + --table-conf replication_num=1 + --single-sink true + --ignore-default-value false + --schema-change-mode sql_parser \ No newline at end of file diff --git a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisEnvVar_init.sql b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisEnvVar_init.sql new file mode 100644 index 000000000..fe6c19639 --- /dev/null +++ b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisEnvVar_init.sql @@ -0,0 +1,39 @@ +DROP DATABASE if EXISTS test_e2e_mysql_env; +CREATE DATABASE if NOT EXISTS test_e2e_mysql_env; +DROP TABLE IF EXISTS test_e2e_mysql_env.tbl1; +CREATE TABLE test_e2e_mysql_env.tbl1 ( + `name` varchar(256) primary key, + `age` int +); +insert into test_e2e_mysql_env.tbl1 values ('doris_env_1',1); + + +DROP TABLE IF EXISTS test_e2e_mysql_env.tbl2; +CREATE TABLE test_e2e_mysql_env.tbl2 ( + `name` varchar(256) primary key, + `age` int +); +insert into test_e2e_mysql_env.tbl2 values ('doris_env_2',2); + + +DROP TABLE IF EXISTS test_e2e_mysql_env.tbl3; +CREATE TABLE test_e2e_mysql_env.tbl3 ( + `name` varchar(256) primary key, + `age` int +); +insert into test_e2e_mysql_env.tbl3 values ('doris_env_3',3); + + +DROP TABLE IF EXISTS test_e2e_mysql_env.tbl4; +CREATE TABLE test_e2e_mysql_env.tbl4 ( + `name` varchar(256) primary key, + `age` int +); +insert into test_e2e_mysql_env.tbl4 values ('doris_env_4',4); + +DROP TABLE IF EXISTS test_e2e_mysql_env.tbl5; +CREATE TABLE test_e2e_mysql_env.tbl5 ( + `name` varchar(256) primary key, + `age` int +); +insert into test_e2e_mysql_env.tbl5 values ('doris_env_5',5); \ No newline at end of file