Skip to content

Commit c96ea6c

Browse files
committed
Introducing proper abstraction in the code
1 parent 0c013d4 commit c96ea6c

13 files changed

+472
-238
lines changed

flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/internal/connection/RedshiftJdbcConnectionProvider.java

Lines changed: 16 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,7 @@
1818
package org.apache.flink.connector.redshift.internal.connection;
1919

2020
import org.apache.flink.annotation.Internal;
21-
import org.apache.flink.configuration.Configuration;
22-
import org.apache.flink.connector.redshift.config.RedshiftSinkConfigConstants;
21+
import org.apache.flink.connector.redshift.options.InternalRedshiftConfigOptions;
2322
import org.apache.flink.util.FlinkRuntimeException;
2423

2524
import com.amazon.redshift.core.BaseConnection;
@@ -47,10 +46,10 @@ public class RedshiftJdbcConnectionProvider implements RedshiftConnectionProvide
4746

4847
private transient BaseConnection connection;
4948

50-
private final Configuration options;
49+
private final InternalRedshiftConfigOptions connectionProperties;
5150

52-
public RedshiftJdbcConnectionProvider(Configuration config) {
53-
this.options = config;
51+
public RedshiftJdbcConnectionProvider(InternalRedshiftConfigOptions configOptions) {
52+
this.connectionProperties = configOptions;
5453
}
5554

5655
@Override
@@ -59,9 +58,9 @@ public BaseConnection getConnection() {
5958
try {
6059
connection =
6160
createConnection(
62-
options.getString(RedshiftSinkConfigConstants.HOSTNAME),
63-
options.getInteger(RedshiftSinkConfigConstants.PORT),
64-
options.getString(RedshiftSinkConfigConstants.DATABASE_NAME));
61+
connectionProperties.getHostName(),
62+
connectionProperties.getPort(),
63+
connectionProperties.getDatabaseName());
6564
} catch (SQLException e) {
6665
throw new FlinkRuntimeException(e);
6766
}
@@ -81,8 +80,8 @@ private BaseConnection createConnection(String hostname, int portNumber, String
8180
throw new SQLException(e);
8281
}
8382

84-
final String username = options.getString(RedshiftSinkConfigConstants.USERNAME);
85-
final String password = options.getString(RedshiftSinkConfigConstants.PASSWORD);
83+
final String username = connectionProperties.getUserName();
84+
final String password = connectionProperties.getPassword();
8685
if (username != null
8786
&& !username.matches("")
8887
&& password != null
@@ -98,8 +97,7 @@ private BaseConnection createConnection(String hostname, int portNumber, String
9897

9998
@Override
10099
public boolean isConnectionValid() throws SQLException {
101-
return connection.isValid(
102-
(int) options.get(RedshiftSinkConfigConstants.TIMEOUT).getSeconds());
100+
return connection.isValid((int) connectionProperties.getTimeout().getSeconds());
103101
}
104102

105103
@Override
@@ -110,9 +108,9 @@ public BaseConnection getOrEstablishConnection() throws SQLException {
110108
}
111109
connection =
112110
createConnection(
113-
options.getString(RedshiftSinkConfigConstants.HOSTNAME),
114-
options.getInteger(RedshiftSinkConfigConstants.PORT),
115-
options.getString(RedshiftSinkConfigConstants.DATABASE_NAME));
111+
connectionProperties.getHostName(),
112+
connectionProperties.getPort(),
113+
connectionProperties.getDatabaseName());
116114
return connection;
117115
}
118116

@@ -135,9 +133,9 @@ public BaseConnection reestablishConnection() throws SQLException {
135133
closeConnection();
136134
connection =
137135
createConnection(
138-
options.getString(RedshiftSinkConfigConstants.HOSTNAME),
139-
options.getInteger(RedshiftSinkConfigConstants.PORT),
140-
options.getString(RedshiftSinkConfigConstants.DATABASE_NAME));
136+
connectionProperties.getHostName(),
137+
connectionProperties.getPort(),
138+
connectionProperties.getDatabaseName());
141139
return connection;
142140
}
143141
}
Lines changed: 14 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -18,19 +18,16 @@
1818

1919
package org.apache.flink.connector.redshift.internal.executor;
2020

21-
import org.apache.flink.configuration.Configuration;
22-
import org.apache.flink.connector.redshift.config.RedshiftSinkConfigConstants;
2321
import org.apache.flink.connector.redshift.converter.RedshiftRowConverter;
2422
import org.apache.flink.connector.redshift.internal.connection.RedshiftConnectionProvider;
2523
import org.apache.flink.connector.redshift.internal.statement.RedshiftStatement;
2624
import org.apache.flink.connector.redshift.mode.copy.RedshiftCopyModeRowConverterImpl;
25+
import org.apache.flink.connector.redshift.options.InternalRedshiftConfigOptions;
2726
import org.apache.flink.connector.redshift.util.S3Util;
2827
import org.apache.flink.table.data.RowData;
2928
import org.apache.flink.table.types.logical.LogicalType;
3029
import org.apache.flink.util.FlinkRuntimeException;
3130

32-
import com.amazon.redshift.core.BaseConnection;
33-
import com.amazon.redshift.jdbc.RedshiftConnectionImpl;
3431
import com.amazon.redshift.jdbc.RedshiftPreparedStatement;
3532
import org.slf4j.Logger;
3633
import org.slf4j.LoggerFactory;
@@ -41,10 +38,10 @@
4138
import java.util.List;
4239

4340
/** Executor for Redshift Upload batch Operation. */
44-
public class RedshiftUploadBatchExecutor implements RedshiftExecutor {
41+
public class RedshiftCopyModeBatchExecutor implements RedshiftExecutor {
4542
private static final long serialVersionUID = 1L;
4643

47-
private static final Logger LOG = LoggerFactory.getLogger(RedshiftUploadBatchExecutor.class);
44+
private static final Logger LOG = LoggerFactory.getLogger(RedshiftCopyModeBatchExecutor.class);
4845

4946
private final int maxRetries;
5047

@@ -68,35 +65,27 @@ public class RedshiftUploadBatchExecutor implements RedshiftExecutor {
6865

6966
private transient RedshiftConnectionProvider connectionProvider;
7067

71-
public RedshiftUploadBatchExecutor(
72-
String[] fieldNames, LogicalType[] fieldTypes, Configuration options) {
73-
this.tableName = options.getString(RedshiftSinkConfigConstants.TABLE_NAME);
68+
public RedshiftCopyModeBatchExecutor(
69+
String[] fieldNames, LogicalType[] fieldTypes, InternalRedshiftConfigOptions options) {
70+
this.tableName = options.getTableName();
7471
this.fieldNames = fieldNames;
75-
this.maxRetries = options.getInteger(RedshiftSinkConfigConstants.MAX_RETIRES);
72+
this.maxRetries = options.getMaxRetries();
7673
this.csvData = new ArrayList<>();
7774
this.s3Client = S3Client.create();
7875
this.copyRowConverter = new RedshiftCopyModeRowConverterImpl(fieldTypes);
7976

80-
this.tempS3Uri =
81-
S3Util.getS3UriWithFileName(options.getString(RedshiftSinkConfigConstants.S3_URI));
82-
this.iamRoleArn = options.getString(RedshiftSinkConfigConstants.IAM_ROLE_ARN);
77+
this.tempS3Uri = S3Util.getS3UriWithFileName(options.getS3Uri());
78+
this.iamRoleArn = options.getIamRoleArn();
8379
}
8480

8581
@Override
86-
public void prepareStatement(BaseConnection connection) throws SQLException {
87-
sql = RedshiftStatement.getTableCopyStatement(tableName, tempS3Uri, fieldNames, iamRoleArn);
88-
if (connection instanceof RedshiftConnectionImpl) {
89-
RedshiftConnectionImpl redshiftConnection = (RedshiftConnectionImpl) connection;
90-
statement = (RedshiftPreparedStatement) redshiftConnection.prepareStatement(sql);
91-
}
92-
}
93-
94-
@Override
95-
public void prepareStatement(RedshiftConnectionProvider connectionProvider)
82+
public void prepareStatements(RedshiftConnectionProvider connectionProvider)
9683
throws SQLException {
97-
this.connectionProvider = connectionProvider;
84+
sql = RedshiftStatement.getTableCopyStatement(tableName, tempS3Uri, fieldNames, iamRoleArn);
9885
try {
99-
prepareStatement(connectionProvider.getOrEstablishConnection());
86+
statement =
87+
(RedshiftPreparedStatement)
88+
connectionProvider.getOrEstablishConnection().prepareStatement(sql);
10089
} catch (ClassNotFoundException e) {
10190
throw new FlinkRuntimeException(e);
10291
}
Lines changed: 26 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -18,18 +18,15 @@
1818

1919
package org.apache.flink.connector.redshift.internal.executor;
2020

21-
import org.apache.flink.configuration.Configuration;
22-
import org.apache.flink.connector.redshift.config.RedshiftSinkConfigConstants;
2321
import org.apache.flink.connector.redshift.converter.RedshiftRowConverter;
2422
import org.apache.flink.connector.redshift.internal.connection.RedshiftConnectionProvider;
2523
import org.apache.flink.connector.redshift.internal.statement.RedshiftStatement;
2624
import org.apache.flink.connector.redshift.mode.copy.RedshiftCopyModeRowConverterImpl;
25+
import org.apache.flink.connector.redshift.options.InternalRedshiftConfigOptions;
2726
import org.apache.flink.connector.redshift.util.S3Util;
2827
import org.apache.flink.table.data.RowData;
2928
import org.apache.flink.table.types.logical.LogicalType;
30-
import org.apache.flink.util.FlinkRuntimeException;
3129

32-
import com.amazon.redshift.core.BaseConnection;
3330
import com.amazon.redshift.jdbc.RedshiftConnectionImpl;
3431
import com.amazon.redshift.jdbc.RedshiftPreparedStatement;
3532
import org.slf4j.Logger;
@@ -43,11 +40,11 @@
4340
import java.util.function.Function;
4441

4542
/** Executor for Redshift Upload Upsert Operation. */
46-
public class RedshiftUploadUpsertExecutor implements RedshiftExecutor {
43+
public class RedshiftCopyModeUpsertExecutor implements RedshiftExecutor {
4744

4845
private static final long serialVersionUID = 1L;
4946

50-
private static final Logger LOG = LoggerFactory.getLogger(RedshiftUploadUpsertExecutor.class);
47+
private static final Logger LOG = LoggerFactory.getLogger(RedshiftCopyModeUpsertExecutor.class);
5148

5249
private final int maxRetries;
5350

@@ -87,43 +84,43 @@ public class RedshiftUploadUpsertExecutor implements RedshiftExecutor {
8784

8885
private final transient S3Client s3Client;
8986

90-
private RedshiftConnectionProvider connectionProvider;
91-
92-
public RedshiftUploadUpsertExecutor(
87+
public RedshiftCopyModeUpsertExecutor(
9388
String[] fieldNames,
9489
String[] keyFields,
9590
LogicalType[] fieldTypes,
9691
RedshiftRowConverter deleteConverter,
9792
Function<RowData, RowData> deleteExtractor,
98-
Configuration options) {
93+
InternalRedshiftConfigOptions options) {
9994

100-
this.maxRetries = options.getInteger(RedshiftSinkConfigConstants.MAX_RETIRES);
95+
this.maxRetries = options.getMaxRetries();
10196
this.fieldNames = fieldNames;
10297
this.keyFields = keyFields;
10398
this.deleteConverter = deleteConverter;
10499
this.deleteExtractor = deleteExtractor;
105100
this.csvInsertData = new ArrayList<>();
106101
this.csvUpdateData = new ArrayList<>();
107102

108-
this.tableName = options.getString(RedshiftSinkConfigConstants.TABLE_NAME);
109-
this.iamRoleArn = options.getString(RedshiftSinkConfigConstants.IAM_ROLE_ARN);
103+
this.tableName = options.getTableName();
104+
this.iamRoleArn = options.getIamRoleArn();
110105
this.s3Client = S3Client.create();
111106
this.copyRowConverter = new RedshiftCopyModeRowConverterImpl(fieldTypes);
112107
this.stageTableName = "_" + tableName + "_stage";
113-
this.tempS3Uri =
114-
S3Util.getS3UriWithFileName(options.getString(RedshiftSinkConfigConstants.S3_URI));
108+
this.tempS3Uri = S3Util.getS3UriWithFileName(options.getS3Uri());
115109
}
116110

117111
@Override
118-
public void prepareStatement(BaseConnection connection) throws SQLException {
112+
public void prepareStatements(RedshiftConnectionProvider connectionProvider)
113+
throws SQLException {
119114
final String createTableSql =
120115
RedshiftStatement.getCreateTempTableAsStatement(tableName, stageTableName);
121116
final String insertSql =
122-
RedshiftStatement.getInsertFromStageTable(tableName, stageTableName, fieldNames);
117+
RedshiftStatement.getInsertFromStageTableStatement(
118+
tableName, stageTableName, fieldNames);
123119
deleteSql = RedshiftStatement.getDeleteStatement(tableName, keyFields);
124120
final String deleteFromStageSql =
125-
RedshiftStatement.getDeleteFromStageTable(tableName, stageTableName, keyFields);
126-
final String truncateSql = RedshiftStatement.getTruncateTable(stageTableName);
121+
RedshiftStatement.getDeleteFromStageTableStatement(
122+
tableName, stageTableName, keyFields);
123+
final String truncateSql = RedshiftStatement.getTruncateTableStatement(stageTableName);
127124
copyInsertSql =
128125
RedshiftStatement.getTableCopyStatement(
129126
tableName, tempS3Uri, fieldNames, iamRoleArn);
@@ -141,27 +138,16 @@ public void prepareStatement(BaseConnection connection) throws SQLException {
141138
deleteFromStageSql,
142139
insertSql,
143140
"END;");
144-
if (connection instanceof RedshiftConnectionImpl) {
145-
RedshiftConnectionImpl redshiftConnection = (RedshiftConnectionImpl) connection;
146-
insertStatement =
147-
(RedshiftPreparedStatement) redshiftConnection.prepareStatement(copyInsertSql);
148-
updateTrxStatement =
149-
(RedshiftPreparedStatement)
150-
redshiftConnection.prepareStatement(updateTransactionSql);
151-
deleteStatement =
152-
(RedshiftPreparedStatement) redshiftConnection.prepareStatement(deleteSql);
153-
}
154-
}
155141

156-
@Override
157-
public void prepareStatement(RedshiftConnectionProvider connectionProvider)
158-
throws SQLException {
159-
this.connectionProvider = connectionProvider;
160-
try {
161-
prepareStatement(connectionProvider.getOrEstablishConnection());
162-
} catch (ClassNotFoundException e) {
163-
throw new FlinkRuntimeException(e);
164-
}
142+
RedshiftConnectionImpl redshiftConnection =
143+
(RedshiftConnectionImpl) connectionProvider.getConnection();
144+
insertStatement =
145+
(RedshiftPreparedStatement) redshiftConnection.prepareStatement(copyInsertSql);
146+
updateTrxStatement =
147+
(RedshiftPreparedStatement)
148+
redshiftConnection.prepareStatement(updateTransactionSql);
149+
deleteStatement =
150+
(RedshiftPreparedStatement) redshiftConnection.prepareStatement(deleteSql);
165151
}
166152

167153
@Override
@@ -230,7 +216,7 @@ public void closeStatement() {
230216

231217
@Override
232218
public String getName() {
233-
return "redshift-upload-upsert-executor";
219+
return "redshift-copy-mode-upsert-executor";
234220
}
235221

236222
@Override
@@ -247,8 +233,6 @@ public String toString() {
247233
+ '\''
248234
+ ", maxRetries="
249235
+ maxRetries
250-
+ ", connectionProvider="
251-
+ connectionProvider
252236
+ '}';
253237
}
254238
}

0 commit comments

Comments
 (0)