Skip to content

Commit da16552

Browse files
committed
Create Redshift Connection and Utilites for sink
1 parent cc41ecc commit da16552

File tree

6 files changed

+380
-9
lines changed

6 files changed

+380
-9
lines changed
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,69 @@
1-
package org.apache.flink.connector.redshift.internal.connection;public interface JdbcConnectionProvider {
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.connector.redshift.internal.connection;
19+
20+
import org.apache.flink.annotation.Internal;
21+
22+
import com.amazon.redshift.core.BaseConnection;
23+
24+
import javax.annotation.Nullable;
25+
26+
import java.sql.Connection;
27+
import java.sql.SQLException;
28+
29+
/** Redshift connection provider. */
30+
@Internal
31+
public interface RedshiftConnectionProvider {
32+
/**
33+
* Get existing connection.
34+
*
35+
* @return existing connection
36+
*/
37+
@Nullable
38+
BaseConnection getConnection();
39+
40+
/**
41+
* Check whether possible existing connection is valid or not through {@link
42+
* Connection#isValid(int)}.
43+
*
44+
* @return true if existing connection is valid
45+
* @throws SQLException sql exception throw from {@link Connection#isValid(int)}
46+
*/
47+
boolean isConnectionValid() throws SQLException;
48+
49+
/**
50+
* Get existing connection or establish a new one if there is none.
51+
*
52+
* @return existing connection or newly established connection
53+
* @throws SQLException sql exception
54+
* @throws ClassNotFoundException driver class not found
55+
*/
56+
BaseConnection getOrEstablishConnection() throws SQLException, ClassNotFoundException;
57+
58+
/** Close possible existing connection. */
59+
void closeConnection();
60+
61+
/**
62+
* Close possible existing connection and establish an new one.
63+
*
64+
* @return newly established connection
65+
* @throws SQLException sql exception
66+
* @throws ClassNotFoundException driver class not found
67+
*/
68+
BaseConnection reestablishConnection() throws SQLException, ClassNotFoundException;
269
}
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,141 @@
1-
package org.apache.flink.connector.redshift.internal.connection;public class RedshiftJdbcConnectionProvider {
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.connector.redshift.internal.connection;
19+
20+
import org.apache.flink.annotation.Internal;
21+
import org.apache.flink.configuration.Configuration;
22+
import org.apache.flink.connector.redshift.sink.config.RedshiftSinkConfigConstants;
23+
import org.apache.flink.util.FlinkRuntimeException;
24+
25+
import com.amazon.redshift.core.BaseConnection;
26+
import com.amazon.redshift.jdbc.RedshiftConnectionImpl;
27+
import org.slf4j.Logger;
28+
import org.slf4j.LoggerFactory;
29+
30+
import javax.annotation.concurrent.NotThreadSafe;
31+
32+
import java.io.Serializable;
33+
import java.sql.DriverManager;
34+
import java.sql.SQLException;
35+
36+
/** Redshift connection provider. */
37+
@Internal
38+
@NotThreadSafe
39+
public class RedshiftJdbcConnectionProvider implements RedshiftConnectionProvider, Serializable {
40+
private static final long serialVersionUID = 1L;
41+
42+
private static final Logger LOG = LoggerFactory.getLogger(RedshiftJdbcConnectionProvider.class);
43+
44+
private static final String REDSHIFT_DRIVER_NAME = "com.amazon.redshift.Driver";
45+
46+
private static final String REDSHIFT_JDBC_IDENTIFIER = "jdbc:redshift://";
47+
48+
private transient BaseConnection connection;
49+
50+
private final Configuration options;
51+
52+
public RedshiftJdbcConnectionProvider(Configuration config) {
53+
this.options = config;
54+
}
55+
56+
@Override
57+
public BaseConnection getConnection() {
58+
if (connection == null) {
59+
try {
60+
connection =
61+
createConnection(
62+
options.getString(RedshiftSinkConfigConstants.HOSTNAME),
63+
options.getInteger(RedshiftSinkConfigConstants.PORT),
64+
options.getString(RedshiftSinkConfigConstants.DATABASE_NAME));
65+
} catch (SQLException e) {
66+
throw new FlinkRuntimeException(e);
67+
}
68+
}
69+
return connection;
70+
}
71+
72+
private BaseConnection createConnection(String hostname, int portNumber, String databaseName)
73+
throws SQLException {
74+
RedshiftConnectionImpl redshiftConnection;
75+
String url = REDSHIFT_JDBC_IDENTIFIER + hostname + ":" + portNumber + "/" + databaseName;
76+
LOG.info("connection to {}", url);
77+
78+
try {
79+
Class.forName(REDSHIFT_DRIVER_NAME);
80+
} catch (ClassNotFoundException e) {
81+
throw new SQLException(e);
82+
}
83+
84+
final String username = options.getString(RedshiftSinkConfigConstants.USERNAME);
85+
final String password = options.getString(RedshiftSinkConfigConstants.PASSWORD);
86+
if (username != null
87+
&& !username.matches("")
88+
&& password != null
89+
&& !password.matches("")) {
90+
redshiftConnection =
91+
(RedshiftConnectionImpl) DriverManager.getConnection(url, username, password);
92+
} else {
93+
redshiftConnection = (RedshiftConnectionImpl) DriverManager.getConnection(url);
94+
}
95+
96+
return redshiftConnection;
97+
}
98+
99+
@Override
100+
public boolean isConnectionValid() throws SQLException {
101+
return connection.isValid(
102+
(int) options.get(RedshiftSinkConfigConstants.TIMEOUT).getSeconds());
103+
}
104+
105+
@Override
106+
public BaseConnection getOrEstablishConnection() throws SQLException, ClassNotFoundException {
107+
if (!connection.isClosed()) {
108+
return connection;
109+
}
110+
connection =
111+
createConnection(
112+
options.getString(RedshiftSinkConfigConstants.HOSTNAME),
113+
options.getInteger(RedshiftSinkConfigConstants.PORT),
114+
options.getString(RedshiftSinkConfigConstants.DATABASE_NAME));
115+
return connection;
116+
}
117+
118+
@Override
119+
public void closeConnection() {
120+
if (connection != null) {
121+
try {
122+
connection.close();
123+
} catch (SQLException e) {
124+
LOG.warn("Redshift connection close failed.", e);
125+
} finally {
126+
connection = null;
127+
}
128+
}
129+
}
130+
131+
@Override
132+
public BaseConnection reestablishConnection() throws SQLException, ClassNotFoundException {
133+
closeConnection();
134+
connection =
135+
createConnection(
136+
options.getString(RedshiftSinkConfigConstants.HOSTNAME),
137+
options.getInteger(RedshiftSinkConfigConstants.PORT),
138+
options.getString(RedshiftSinkConfigConstants.DATABASE_NAME));
139+
return connection;
140+
}
2141
}

flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/mode/jdbc/RedshiftJDBCModeRowConverterImpl.java

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,26 @@
1-
package org.apache.flink.connector.redshift.jdbc.converter;
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
218

19+
package org.apache.flink.connector.redshift.mode.jdbc;
20+
21+
import org.apache.flink.connector.redshift.converter.AbstractRedshiftRowConverter;
22+
import org.apache.flink.connector.redshift.converter.DeserializationConverter;
23+
import org.apache.flink.connector.redshift.converter.SerializationConverter;
324
import org.apache.flink.table.data.GenericRowData;
425
import org.apache.flink.table.data.RowData;
526
import org.apache.flink.table.types.logical.LogicalType;
@@ -12,7 +33,7 @@
1233
import java.sql.SQLException;
1334

1435
/** Redshift Converter Implementation. */
15-
public class RedshiftRowConverterImpl extends AbstractRedshiftRowConverter {
36+
public class RedshiftJDBCModeRowConverterImpl extends AbstractRedshiftRowConverter {
1637
private static final long serialVersionUID = 1L;
1738

1839
private final RowType rowType;
@@ -21,7 +42,7 @@ public class RedshiftRowConverterImpl extends AbstractRedshiftRowConverter {
2142

2243
private final SerializationConverter[] toExternalConverters;
2344

24-
public RedshiftRowConverterImpl(RowType rowType) {
45+
public RedshiftJDBCModeRowConverterImpl(RowType rowType) {
2546
this.rowType = Preconditions.checkNotNull(rowType);
2647
LogicalType[] logicalTypes =
2748
rowType.getFields().stream()
@@ -49,9 +70,11 @@ public RowData toInternal(ResultSet resultSet) throws SQLException {
4970
return genericRowData;
5071
}
5172

73+
RedshiftPreparedStatement insertStatement;
74+
5275
@Override
53-
public RedshiftPreparedStatement toExternal(
54-
RowData rowData, RedshiftPreparedStatement insertStatement) throws SQLException {
76+
public Object toExternal(RowData rowData) throws SQLException {
77+
5578
for (int index = 0; index < rowData.getArity(); index++) {
5679
if (!rowData.isNullAt(index)) {
5780
toExternalConverters[index].serialize(rowData, index, insertStatement);

flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/sink/RedshiftSinkConfigConstants.java

Lines changed: 0 additions & 2 deletions
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.connector.redshift.sink.config;
20+
21+
import org.apache.flink.configuration.ConfigOption;
22+
import org.apache.flink.configuration.ConfigOptions;
23+
24+
import java.time.Duration;
25+
26+
import static java.time.temporal.ChronoUnit.SECONDS;
27+
28+
/** Constants to be used with the RedshiftSink. */
29+
public class RedshiftSinkConfigConstants {
30+
31+
/** Enum to Define different Modes of Sink supported in Redshift. */
32+
public enum SinkMode {
33+
JDBC,
34+
COPY
35+
}
36+
37+
public static final ConfigOption<String> HOSTNAME =
38+
ConfigOptions.key("hostname")
39+
.stringType()
40+
.noDefaultValue()
41+
.withDescription("Hostname of Redshift Cluster");
42+
43+
public static final ConfigOption<Integer> PORT =
44+
ConfigOptions.key("port")
45+
.intType()
46+
.defaultValue(5439)
47+
.withDescription("Port of the Redshift Cluster");
48+
49+
public static final ConfigOption<String> USERNAME =
50+
ConfigOptions.key("username")
51+
.stringType()
52+
.noDefaultValue()
53+
.withDescription("Username of Redshift Cluster");
54+
55+
public static final ConfigOption<String> PASSWORD =
56+
ConfigOptions.key("password")
57+
.stringType()
58+
.noDefaultValue()
59+
.withDescription("Password of Redshift Cluster");
60+
61+
public static final ConfigOption<String> DATABASE_NAME =
62+
ConfigOptions.key("database-name")
63+
.stringType()
64+
.noDefaultValue()
65+
.withDescription("Database to which it needs to be connected");
66+
67+
public static final ConfigOption<String> TABLE_NAME =
68+
ConfigOptions.key("table-name")
69+
.stringType()
70+
.noDefaultValue()
71+
.withDescription("Table to which it needs to be connected");
72+
73+
public static final ConfigOption<Integer> BATCH_SIZE =
74+
ConfigOptions.key("sink.batch-size")
75+
.intType()
76+
.defaultValue(1000)
77+
.withDescription("Port of the Redshift Cluster");
78+
79+
public static final ConfigOption<SinkMode> SINK_MODE =
80+
ConfigOptions.key("sink.write.mode")
81+
.enumType(SinkMode.class)
82+
.defaultValue(SinkMode.JDBC)
83+
.withDescription("Mode of sink");
84+
85+
public static final ConfigOption<String> IAM_ROLE_ARN =
86+
ConfigOptions.key("aws.iam-role")
87+
.stringType()
88+
.noDefaultValue()
89+
.withDescription("IAM ROLE");
90+
91+
public static final ConfigOption<String> S3_URI =
92+
ConfigOptions.key("sink.write.temp.s3-uri")
93+
.stringType()
94+
.noDefaultValue()
95+
.withDescription("Temporary S3 URI to store data");
96+
97+
public static final ConfigOption<Integer> MAX_RETIRES =
98+
ConfigOptions.key("sink.max.retries")
99+
.intType()
100+
.defaultValue(2)
101+
.withDescription("Maximum number of Retries in case of Failure");
102+
103+
public static final ConfigOption<Duration> FLUSH_INTERVAL =
104+
ConfigOptions.key("sink.flush.interval")
105+
.durationType()
106+
.defaultValue(Duration.of(10, SECONDS))
107+
.withDescription("Maximum number of Retries in case of Failure");
108+
109+
public static final ConfigOption<Duration> TIMEOUT =
110+
ConfigOptions.key("sink.connection.timeout")
111+
.durationType()
112+
.defaultValue(Duration.ofMinutes(5))
113+
.withDescription("Duration of Connection Timeout (in minutes)");
114+
}

0 commit comments

Comments
 (0)