Commit 0c013d4

Final Redshift Connector Working Change

1 parent 4047b55

21 files changed: +664 -252 lines

flink-connector-aws/flink-connector-redshift/pom.xml (+10 -6)

@@ -27,6 +27,13 @@
             <scope>provided</scope>
         </dependency>

+        <dependency>
+            <groupId>software.amazon.awssdk</groupId>
+            <artifactId>s3</artifactId>
+            <version>${aws.sdkv2.version}</version>
+        </dependency>
+
+
         <dependency>
             <groupId>org.apache.commons</groupId>
             <artifactId>commons-csv</artifactId>
@@ -43,8 +50,6 @@
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-table-common</artifactId>
-            <version>${flink.version}</version>
-            <scope>provided</scope>
         </dependency>

     </dependencies>
@@ -53,17 +58,16 @@
         <plugins>
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-shade-plugin</artifactId>
+                <artifactId>maven-jar-plugin</artifactId>
                 <executions>
                     <execution>
-                        <id>shade-flink</id>
-                        <phase>package</phase>
                         <goals>
-                            <goal>shade</goal>
+                            <goal>test-jar</goal>
                         </goals>
                     </execution>
                 </executions>
             </plugin>
         </plugins>
     </build>
+
 </project>

flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/config/RedshiftSinkConfigConstants.java (+14 -11)

@@ -33,49 +33,51 @@ public class RedshiftSinkConfigConstants {
             ConfigOptions.key("hostname")
                     .stringType()
                     .noDefaultValue()
-                    .withDescription("Hostname of Redshift Cluster");
+                    .withDescription("Redshift Cluster's Hostname");

     public static final ConfigOption<Integer> PORT =
             ConfigOptions.key("port")
                     .intType()
                     .defaultValue(5439)
-                    .withDescription("Port of the Redshift Cluster");
+                    .withDescription(
+                            "Redshift's cluster Port. By default, AWS console set value to 5439 for redshift cluster.");

     public static final ConfigOption<String> USERNAME =
             ConfigOptions.key("username")
                     .stringType()
                     .noDefaultValue()
-                    .withDescription("Username of Redshift Cluster");
+                    .withDescription("username for Redshift Cluster connection.");

     public static final ConfigOption<String> PASSWORD =
             ConfigOptions.key("password")
                     .stringType()
                     .noDefaultValue()
-                    .withDescription("Password of Redshift Cluster");
+                    .withDescription("Password of Redshift Cluster associated with username.");

     public static final ConfigOption<String> DATABASE_NAME =
             ConfigOptions.key("database-name")
                     .stringType()
                     .noDefaultValue()
-                    .withDescription("Database to which it needs to be connected");
+                    .withDescription("Name of Database to which connector is intended to connect.");

     public static final ConfigOption<String> TABLE_NAME =
             ConfigOptions.key("table-name")
                     .stringType()
                     .noDefaultValue()
-                    .withDescription("Table to which it needs to be connected");
+                    .withDescription("Table Name to which sink/source to be setup.");

     public static final ConfigOption<Integer> BATCH_SIZE =
             ConfigOptions.key("sink.batch-size")
                     .intType()
                     .defaultValue(1000)
-                    .withDescription("Port of the Redshift Cluster");
+                    .withDescription(
+                            "Sink Property. Batch size to be added as while writing to redshift");

     public static final ConfigOption<SinkMode> SINK_MODE =
             ConfigOptions.key("sink.write.mode")
                     .enumType(SinkMode.class)
                     .defaultValue(SinkMode.JDBC)
-                    .withDescription("Mode of sink");
+                    .withDescription("Mode of sink. Currently it only supports JDBC / COPY ");

     public static final ConfigOption<String> IAM_ROLE_ARN =
             ConfigOptions.key("aws.iam-role")
@@ -87,19 +89,20 @@ public class RedshiftSinkConfigConstants {
             ConfigOptions.key("sink.write.temp.s3-uri")
                     .stringType()
                     .noDefaultValue()
-                    .withDescription("Temporary S3 URI to store data");
+                    .withDescription(
+                            "Temporary S3 URI to store data. In COPY mode, Redshift architecture uses s3 to store intermittent data. Reference :https://docs.aws.amazon.com/redshift/latest/dg/r_COPY.html");

     public static final ConfigOption<Integer> MAX_RETIRES =
             ConfigOptions.key("sink.max.retries")
                     .intType()
                     .defaultValue(2)
-                    .withDescription("Maximum number of Retries in case of Failure");
+                    .withDescription("Maximum number of Retries");

     public static final ConfigOption<Duration> FLUSH_INTERVAL =
             ConfigOptions.key("sink.flush.interval")
                     .durationType()
                     .defaultValue(Duration.of(10, SECONDS))
-                    .withDescription("Maximum number of Retries in case of Failure");
+                    .withDescription("Flush Interval for Sink");

     public static final ConfigOption<Duration> TIMEOUT =
             ConfigOptions.key("sink.connection.timeout")
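For orientation, here is a minimal sketch of how these option keys might be populated programmatically. The keys and defaults come from the diff above; the endpoint, credentials, table, and bucket values are purely illustrative.

import org.apache.flink.configuration.Configuration;

public class RedshiftSinkConfigExample {
    public static void main(String[] args) {
        Configuration config = new Configuration();
        // Connection options (hostname and credentials here are hypothetical).
        config.setString("hostname", "example-cluster.abc123.us-east-1.redshift.amazonaws.com");
        config.setInteger("port", 5439); // AWS console default for Redshift clusters
        config.setString("username", "awsuser");
        config.setString("password", "example-password");
        config.setString("database-name", "dev");
        config.setString("table-name", "orders");
        // Sink behavior: JDBC (the default) or COPY, per the SINK_MODE option.
        config.setString("sink.write.mode", "COPY");
        config.setInteger("sink.batch-size", 1000); // default per BATCH_SIZE
        // COPY mode stages intermediate data in S3 (bucket is hypothetical).
        config.setString("sink.write.temp.s3-uri", "s3://example-bucket/redshift-tmp/");
        System.out.println(config);
    }
}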

flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/config/RedshiftSinkConfigUtil.java (-59)
This file was deleted.

flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/internal/connection/RedshiftConnectionProvider.java (+7 -1)

@@ -26,9 +26,15 @@
 import java.sql.Connection;
 import java.sql.SQLException;

-/** Redshift connection provider. */
+/**
+ * Redshift connection provider Interface. Redshift can be configured using both JDBC and ODBC.
+ * Reference : <a
+ * href="https://docs.aws.amazon.com/redshift/latest/mgmt/configuring-connections.html">Redshift
+ * Connection Configuration</a>.
+ */
 @Internal
 public interface RedshiftConnectionProvider {
+
     /**
      * Get existing connection.
      *
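The rest of the interface is not shown in this commit, but piecing together the @Override methods that appear in the providers and executors below, the contract plausibly looks like the following sketch. The method set is inferred, not confirmed by the diff, and the Javadoc is abbreviated.

import com.amazon.redshift.core.BaseConnection;

import java.sql.SQLException;

public interface RedshiftConnectionProvider {

    /** Get existing connection, if any. */
    BaseConnection getConnection();

    /** Check whether the current connection is still usable. */
    boolean isConnectionValid() throws SQLException;

    /** Return the existing connection, or open a new one if none is held. */
    BaseConnection getOrEstablishConnection() throws SQLException;

    /** Close the held connection, if open. */
    void closeConnection();

    /** Close and re-open the connection. */
    BaseConnection reestablishConnection() throws SQLException;
}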

flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/internal/connection/RedshiftJdbcConnectionProvider.java (+7 -5)

@@ -33,7 +33,7 @@
 import java.sql.DriverManager;
 import java.sql.SQLException;

-/** Redshift connection provider. */
+/** Redshift JDBC connection provider. */
 @Internal
 @NotThreadSafe
 public class RedshiftJdbcConnectionProvider implements RedshiftConnectionProvider, Serializable {
@@ -103,8 +103,9 @@ public boolean isConnectionValid() throws SQLException {
     }

     @Override
-    public BaseConnection getOrEstablishConnection() throws SQLException, ClassNotFoundException {
-        if (!connection.isClosed()) {
+    public BaseConnection getOrEstablishConnection() throws SQLException {
+
+        if (connection != null && !connection.isClosed()) {
             return connection;
         }
         connection =
@@ -121,15 +122,16 @@ public void closeConnection() {
             try {
                 connection.close();
             } catch (SQLException e) {
-                LOG.warn("Redshift connection close failed.", e);
+                LOG.warn("Redshift connection failed to close.", e);
             } finally {
                 connection = null;
             }
         }
+        LOG.info("Redshift Connection is already closed.");
     }

     @Override
-    public BaseConnection reestablishConnection() throws SQLException, ClassNotFoundException {
+    public BaseConnection reestablishConnection() throws SQLException {
         closeConnection();
         connection =
                 createConnection(
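A small usage sketch of the revised contract: after this change, getOrEstablishConnection() no longer declares ClassNotFoundException and tolerates a provider whose connection field is still null. The helper class below is hypothetical, and how the provider itself is constructed is not part of this diff.

import com.amazon.redshift.core.BaseConnection;

import java.sql.SQLException;

public final class ConnectionRecovery {

    private ConnectionRecovery() {}

    /** Get a usable connection, falling back to one reconnect attempt. */
    public static BaseConnection connect(RedshiftJdbcConnectionProvider provider)
            throws SQLException {
        try {
            // Safe on a fresh provider now that a null check guards isClosed().
            return provider.getOrEstablishConnection();
        } catch (SQLException firstAttempt) {
            // reestablishConnection() closes any stale connection, then reopens it.
            return provider.reestablishConnection();
        }
    }
}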

flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/internal/executor/RedshiftBatchExecutor.java (+11 -6)

@@ -18,15 +18,15 @@

 package org.apache.flink.connector.redshift.internal.executor;

-import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.redshift.config.RedshiftSinkConfigConstants;
 import org.apache.flink.connector.redshift.converter.RedshiftRowConverter;
 import org.apache.flink.connector.redshift.internal.connection.RedshiftConnectionProvider;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;

-import com.amazon.redshift.RedshiftConnection;
+import com.amazon.redshift.core.BaseConnection;
 import com.amazon.redshift.jdbc.RedshiftConnectionImpl;
 import com.amazon.redshift.jdbc.RedshiftPreparedStatement;
 import org.slf4j.Logger;
@@ -59,16 +59,19 @@ public RedshiftBatchExecutor(
     }

     @Override
-    public void prepareStatement(RedshiftConnection connection) throws SQLException {
+    public void prepareStatement(BaseConnection connection) throws SQLException {
         if (connection instanceof RedshiftConnectionImpl) {
             RedshiftConnectionImpl redshiftConnection = (RedshiftConnectionImpl) connection;
             statement = (RedshiftPreparedStatement) redshiftConnection.prepareStatement(sql);
         }
+        LOG.warn(
+                "Unable to create Redshift Statement for Execution. File a JIRA in case of issue.");
     }

     @Override
     public void prepareStatement(RedshiftConnectionProvider connectionProvider)
             throws SQLException {
+        Preconditions.checkNotNull(connectionProvider, "Connection Provider cannot be Null");
         this.connectionProvider = connectionProvider;
         try {
             prepareStatement(connectionProvider.getOrEstablishConnection());
@@ -77,9 +80,6 @@ public void prepareStatement(RedshiftConnectionProvider connectionProvider)
         }
     }

-    @Override
-    public void setRuntimeContext(RuntimeContext context) {}
-
     @Override
     public void addToBatch(RowData record) throws SQLException {
         switch (record.getRowKind()) {
@@ -117,6 +117,11 @@ public void closeStatement() {
         }
     }

+    @Override
+    public String getName() {
+        return "redshift-batch-executor";
+    }
+
     @Override
     public String toString() {
         return "RedshiftBatchExecutor{"
