Skip to content

Commit 1b05a4c

Browse files
committed
Add Configuration Validation
1 parent da16552 commit 1b05a4c

File tree

3 files changed

+54
-7
lines changed

3 files changed

+54
-7
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.connector.redshift.mode;
20+
21+
/** Enum to Define different Modes of Sink supported in Redshift. */
22+
public enum SinkMode {
23+
JDBC,
24+
COPY
25+
}

flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/sink/config/RedshiftSinkConfigConstants.java

+1-6
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import org.apache.flink.configuration.ConfigOption;
2222
import org.apache.flink.configuration.ConfigOptions;
23+
import org.apache.flink.connector.redshift.mode.SinkMode;
2324

2425
import java.time.Duration;
2526

@@ -28,12 +29,6 @@
2829
/** Constants to be used with the RedshiftSink. */
2930
public class RedshiftSinkConfigConstants {
3031

31-
/** Enum to Define different Modes of Sink supported in Redshift. */
32-
public enum SinkMode {
33-
JDBC,
34-
COPY
35-
}
36-
3732
public static final ConfigOption<String> HOSTNAME =
3833
ConfigOptions.key("hostname")
3934
.stringType()

flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/sink/config/RedshiftSinkConfigUtil.java

+28-1
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,39 @@
1919
package org.apache.flink.connector.redshift.sink.config;
2020

2121
import org.apache.flink.configuration.Configuration;
22+
import org.apache.flink.util.FlinkRuntimeException;
2223

2324
/** Utility functions to use with {@link RedshiftSinkConfigConstants}. */
2425
public class RedshiftSinkConfigUtil {
2526

2627
// private constructor to prevent initialization of utility class.
2728
private RedshiftSinkConfigUtil() {}
2829

29-
public static void validateConfigs(final Configuration sinkConfig) {}
30+
public static void validateConfigs(final Configuration sinkConfig)
31+
throws FlinkRuntimeException {
32+
switch (sinkConfig.get(RedshiftSinkConfigConstants.SINK_MODE).toString()) {
33+
case "JDBC":
34+
validateJdbcAssociatedConfigs(sinkConfig);
35+
break;
36+
case "COPY":
37+
validateCopyAssociatedConfigs(sinkConfig);
38+
break;
39+
default:
40+
throw new FlinkRuntimeException("Invalid Sink Mode");
41+
}
42+
}
43+
44+
private static void validateJdbcAssociatedConfigs(Configuration sinkConfig) {
45+
if (sinkConfig.getString(RedshiftSinkConfigConstants.HOSTNAME).trim().length() < 3
46+
|| sinkConfig.getInteger(RedshiftSinkConfigConstants.PORT) < 0) {
47+
throw new FlinkRuntimeException("Invalid configuration");
48+
}
49+
}
50+
51+
private static void validateCopyAssociatedConfigs(Configuration sinkConfig) {
52+
if (sinkConfig.getString(RedshiftSinkConfigConstants.S3_URI) == null
53+
|| sinkConfig.getString(RedshiftSinkConfigConstants.IAM_ROLE_ARN) == null) {
54+
throw new FlinkRuntimeException("Invalid Configuration");
55+
}
56+
}
3057
}

0 commit comments

Comments
 (0)