
Commit 4047b55

Add Redshift Executor
1 parent 1b05a4c commit 4047b55

18 files changed: +2304 -5 lines changed
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.connector.redshift.sink.config;
+package org.apache.flink.connector.redshift.config;
 
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
@@ -106,4 +106,6 @@ public class RedshiftSinkConfigConstants {
                     .durationType()
                     .defaultValue(Duration.ofMinutes(5))
                     .withDescription("Duration of Connection Timeout (in minutes)");
+
+    private RedshiftSinkConfigConstants() {}
 }
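For reference, the second hunk shows only the tail of a ConfigOption builder chain followed by the new private constructor. Below is a minimal sketch of the full pattern such a constant follows; the class wrapper, field name, and option key are hypothetical, and only the durationType/defaultValue/withDescription calls are taken from the diff.

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

import java.time.Duration;

public class TimeoutOptionSketch {
    // Hypothetical field name and key; the builder chain mirrors the hunk above.
    public static final ConfigOption<Duration> CONNECTION_TIMEOUT =
            ConfigOptions.key("sink.connection.timeout")
                    .durationType()
                    .defaultValue(Duration.ofMinutes(5))
                    .withDescription("Duration of Connection Timeout (in minutes)");
}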
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.connector.redshift.sink.config;
+package org.apache.flink.connector.redshift.config;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.util.FlinkRuntimeException;
@@ -44,14 +44,16 @@ public static void validateConfigs(final Configuration sinkConfig)
     private static void validateJdbcAssociatedConfigs(Configuration sinkConfig) {
         if (sinkConfig.getString(RedshiftSinkConfigConstants.HOSTNAME).trim().length() < 3
                 || sinkConfig.getInteger(RedshiftSinkConfigConstants.PORT) < 0) {
-            throw new FlinkRuntimeException("Invalid configuration");
+            throw new FlinkRuntimeException(
+                    "Invalid configuration. Providing hostname and port is necessary.");
         }
     }
 
     private static void validateCopyAssociatedConfigs(Configuration sinkConfig) {
         if (sinkConfig.getString(RedshiftSinkConfigConstants.S3_URI) == null
                 || sinkConfig.getString(RedshiftSinkConfigConstants.IAM_ROLE_ARN) == null) {
-            throw new FlinkRuntimeException("Invalid Configuration");
+            throw new FlinkRuntimeException(
+                    "Invalid Configuration. For COPY mode, an S3 temporary path and IAM role ARN are necessary.");
         }
     }
 }
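For reference, here is a minimal sketch (not part of the commit) of a sink Configuration that satisfies both validations above. The option constants come from RedshiftSinkConfigConstants as referenced in the hunk; the class wrapper and all literal values are placeholders.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.redshift.config.RedshiftSinkConfigConstants;

public class SinkConfigSketch {
    // Builds a Configuration that passes both checks shown above: a hostname of
    // at least 3 characters, a non-negative port, and non-null S3_URI /
    // IAM_ROLE_ARN for COPY mode. Values are illustrative placeholders only.
    public static Configuration exampleSinkConfig() {
        Configuration sinkConfig = new Configuration();
        sinkConfig.set(RedshiftSinkConfigConstants.HOSTNAME, "example-cluster.redshift.amazonaws.com");
        sinkConfig.set(RedshiftSinkConfigConstants.PORT, 5439);
        sinkConfig.set(RedshiftSinkConfigConstants.S3_URI, "s3://example-bucket/redshift-tmp/");
        sinkConfig.set(RedshiftSinkConfigConstants.IAM_ROLE_ARN, "arn:aws:iam::000000000000:role/example-redshift-role");
        return sinkConfig;
    }
}

Passing such a configuration to the validateConfigs(...) method shown in the hunk header should clear both the JDBC and the COPY-mode checks.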

flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/internal/connection/RedshiftJdbcConnectionProvider.java (+1 -1)

@@ -19,7 +19,7 @@
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.connector.redshift.sink.config.RedshiftSinkConfigConstants;
+import org.apache.flink.connector.redshift.config.RedshiftSinkConfigConstants;
 import org.apache.flink.util.FlinkRuntimeException;
 
 import com.amazon.redshift.core.BaseConnection;
org/apache/flink/connector/redshift/internal/executor/RedshiftBatchExecutor.java (new file, +132)

@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.redshift.internal.executor;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.redshift.config.RedshiftSinkConfigConstants;
+import org.apache.flink.connector.redshift.converter.RedshiftRowConverter;
+import org.apache.flink.connector.redshift.internal.connection.RedshiftConnectionProvider;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import com.amazon.redshift.RedshiftConnection;
+import com.amazon.redshift.jdbc.RedshiftConnectionImpl;
+import com.amazon.redshift.jdbc.RedshiftPreparedStatement;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.SQLException;
+
+/** Executor for Redshift Batch Executor Operation. */
+public class RedshiftBatchExecutor implements RedshiftExecutor {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOG = LoggerFactory.getLogger(RedshiftBatchExecutor.class);
+
+    private final String sql;
+
+    private final RedshiftRowConverter converter;
+
+    private final int maxRetries;
+
+    private transient RedshiftPreparedStatement statement;
+
+    private transient RedshiftConnectionProvider connectionProvider;
+
+    public RedshiftBatchExecutor(
+            String sql, RedshiftRowConverter converter, Configuration options) {
+        this.sql = sql;
+        this.converter = converter;
+        this.maxRetries = options.getInteger(RedshiftSinkConfigConstants.MAX_RETIRES);
+    }
+
+    @Override
+    public void prepareStatement(RedshiftConnection connection) throws SQLException {
+        if (connection instanceof RedshiftConnectionImpl) {
+            RedshiftConnectionImpl redshiftConnection = (RedshiftConnectionImpl) connection;
+            statement = (RedshiftPreparedStatement) redshiftConnection.prepareStatement(sql);
+        }
+    }
+
+    @Override
+    public void prepareStatement(RedshiftConnectionProvider connectionProvider)
+            throws SQLException {
+        this.connectionProvider = connectionProvider;
+        try {
+            prepareStatement(connectionProvider.getOrEstablishConnection());
+        } catch (ClassNotFoundException e) {
+            throw new FlinkRuntimeException(e);
+        }
+    }
+
+    @Override
+    public void setRuntimeContext(RuntimeContext context) {}
+
+    @Override
+    public void addToBatch(RowData record) throws SQLException {
+        switch (record.getRowKind()) {
+            case INSERT:
+                converter.toExternal(record);
+                statement.addBatch();
+                break;
+            case UPDATE_AFTER:
+            case DELETE:
+            case UPDATE_BEFORE:
+                break;
+            default:
+                throw new UnsupportedOperationException(
+                        String.format(
+                                "Unknown row kind, the supported row kinds are: INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE, but got: %s.",
+                                record.getRowKind()));
+        }
+    }
+
+    @Override
+    public void executeBatch() throws SQLException {
+        attemptExecuteBatch(statement, maxRetries);
+    }
+
+    @Override
+    public void closeStatement() {
+        if (statement != null) {
+            try {
+                statement.close();
+            } catch (SQLException exception) {
+                LOG.warn("Redshift batch statement could not be closed.", exception);
+            } finally {
+                statement = null;
+            }
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "RedshiftBatchExecutor{"
+                + "SQL='"
+                + sql
+                + '\''
+                + ", maxRetries="
+                + maxRetries
+                + ", connectionProvider="
+                + connectionProvider
+                + '}';
+    }
+}
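For reference, a rough sketch (not part of the commit) of how a sink writer might drive the new executor through its lifecycle. The SQL string, row converter, options, connection provider, and rows are assumed to be built by the surrounding sink; the class and method names below are hypothetical, and only the call sequence is meant to be illustrative.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.redshift.converter.RedshiftRowConverter;
import org.apache.flink.connector.redshift.internal.connection.RedshiftConnectionProvider;
import org.apache.flink.connector.redshift.internal.executor.RedshiftBatchExecutor;
import org.apache.flink.table.data.RowData;

import java.sql.SQLException;

public class BatchExecutorLifecycleSketch {
    // Drives the executor through prepare -> buffer -> flush -> close.
    static void writeBatch(
            String insertSql,
            RedshiftRowConverter converter,
            Configuration options,
            RedshiftConnectionProvider connectionProvider,
            Iterable<RowData> rows)
            throws SQLException {
        RedshiftBatchExecutor executor = new RedshiftBatchExecutor(insertSql, converter, options);
        executor.prepareStatement(connectionProvider); // obtains a connection and prepares the statement
        for (RowData row : rows) {
            executor.addToBatch(row); // only INSERT rows are added to the JDBC batch
        }
        executor.executeBatch(); // retried via attemptExecuteBatch up to the configured retry count
        executor.closeStatement();
    }
}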
org/apache/flink/connector/redshift/internal/executor/RedshiftExecutor.java (new file, +212)

@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.redshift.internal.executor;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.redshift.config.RedshiftSinkConfigConstants;
+import org.apache.flink.connector.redshift.converter.RedshiftRowConverter;
+import org.apache.flink.connector.redshift.internal.connection.RedshiftConnectionProvider;
+import org.apache.flink.connector.redshift.internal.statement.RedshiftStatement;
+import org.apache.flink.connector.redshift.mode.SinkMode;
+import org.apache.flink.connector.redshift.mode.copy.RedshiftCopyModeRowConverterImpl;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import com.amazon.redshift.RedshiftConnection;
+import com.amazon.redshift.jdbc.RedshiftPreparedStatement;
+import org.apache.commons.lang3.ArrayUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.function.Function;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.table.data.RowData.createFieldGetter;
+
+/** Executor for Redshift Operation. */
+public interface RedshiftExecutor extends Serializable {
+
+    Logger LOG = LoggerFactory.getLogger(RedshiftExecutor.class);
+
+    void setRuntimeContext(RuntimeContext context);
+
+    void prepareStatement(RedshiftConnection connection) throws SQLException;
+
+    void prepareStatement(RedshiftConnectionProvider connectionProvider) throws SQLException;
+
+    void addToBatch(RowData rowData) throws SQLException;
+
+    void executeBatch() throws SQLException;
+
+    void closeStatement();
+
+    default void attemptExecuteBatch(RedshiftPreparedStatement stmt, int maxRetries)
+            throws SQLException {
+        attemptExecuteBatch(stmt, maxRetries, true);
+    }
+
+    default void attemptExecuteBatch(
+            RedshiftPreparedStatement stmt, int maxRetries, Boolean batchMode) throws SQLException {
+        for (int i = 0; i <= maxRetries; i++) {
+            try {
+                if (batchMode) {
+                    stmt.executeBatch();
+                } else {
+                    stmt.execute();
+                }
+
+                return;
+            } catch (Exception exception) {
+                LOG.error("Redshift executeBatch error, retry times = {}", i, exception);
+                if (i >= maxRetries) {
+                    throw new SQLException(
+                            String.format(
+                                    "Attempt to execute batch failed, exhausted retry times = %d",
+                                    maxRetries),
+                            exception);
+                }
+                try {
+                    Thread.sleep(1000L * i);
+                } catch (InterruptedException ex) {
+                    Thread.currentThread().interrupt();
+                    throw new SQLException(
+                            "Unable to flush; interrupted while doing another attempt", ex);
+                }
+            }
+        }
+    }
+
+    static RedshiftExecutor createRedshiftExecutor(
+            String[] fieldNames,
+            String[] keyFields,
+            LogicalType[] fieldTypes,
+            Configuration options) {
+        if (keyFields.length > 0) {
+            if (options.get(RedshiftSinkConfigConstants.SINK_MODE).equals(SinkMode.COPY)) {
+                LOG.info("Create Upload Copy UPSERT Executor.");
+                return createUploadUpsertExecutor(fieldNames, keyFields, fieldTypes, options);
+            } else {
+                LOG.info("Create pure JDBC UPSERT Executor.");
+                return createUpsertExecutor(fieldNames, keyFields, fieldTypes, options);
+            }
+
+        } else {
+            if (options.get(RedshiftSinkConfigConstants.SINK_MODE).equals(SinkMode.COPY)) {
+                LOG.info("Create Upload Copy batch Executor.");
+                return createUploadBatchExecutor(fieldNames, fieldTypes, options);
+            } else {
+                LOG.info("Create pure JDBC batch Executor.");
+                return createBatchExecutor(fieldNames, fieldTypes, options);
+            }
+        }
+    }
+
+    static RedshiftUploadBatchExecutor createUploadBatchExecutor(
+            String[] fieldNames, LogicalType[] fieldTypes, Configuration options) {
+        return new RedshiftUploadBatchExecutor(fieldNames, fieldTypes, options);
+    }
+
+    static RedshiftUploadUpsertExecutor createUploadUpsertExecutor(
+            String[] fieldNames,
+            String[] keyFields,
+            LogicalType[] fieldTypes,
+            Configuration options) {
+        int[] delFields =
+                Arrays.stream(keyFields)
+                        .mapToInt(pk -> ArrayUtils.indexOf(fieldNames, pk))
+                        .toArray();
+        LogicalType[] delTypes =
+                Arrays.stream(delFields).mapToObj(f -> fieldTypes[f]).toArray(LogicalType[]::new);
+
+        return new RedshiftUploadUpsertExecutor(
+                fieldNames,
+                keyFields,
+                fieldTypes,
+                new RedshiftCopyModeRowConverterImpl(delTypes),
+                createExtractor(fieldTypes, delFields),
+                options);
+    }
+
+    static RedshiftBatchExecutor createBatchExecutor(
+            String[] fieldNames, LogicalType[] fieldTypes, Configuration options) {
+        String insertSql =
+                RedshiftStatement.getInsertIntoStatement(
+                        options.getString(RedshiftSinkConfigConstants.TABLE_NAME), fieldNames);
+        RedshiftRowConverter converter = new RedshiftCopyModeRowConverterImpl(fieldTypes);
+        return new RedshiftBatchExecutor(insertSql, converter, options);
+    }
+
+    static RedshiftUpsertExecutor createUpsertExecutor(
+            String[] fieldNames,
+            String[] keyFields,
+            LogicalType[] fieldTypes,
+            Configuration options) {
+        String tableName = options.getString(RedshiftSinkConfigConstants.TABLE_NAME);
+        String insertSql = RedshiftStatement.getInsertIntoStatement(tableName, fieldNames);
+        String updateSql = RedshiftStatement.getUpdateStatement(tableName, fieldNames, keyFields);
+        String deleteSql = RedshiftStatement.getDeleteStatement(tableName, keyFields);
+
+        // Re-sort the order of fields to fit the sql statement.
+        int[] delFields =
+                Arrays.stream(keyFields)
+                        .mapToInt(pk -> ArrayUtils.indexOf(fieldNames, pk))
+                        .toArray();
+        int[] updatableFields =
+                IntStream.range(0, fieldNames.length)
+                        .filter(idx -> !ArrayUtils.contains(keyFields, fieldNames[idx]))
+                        .toArray();
+        int[] updFields = ArrayUtils.addAll(updatableFields, delFields);
+
+        LogicalType[] delTypes =
+                Arrays.stream(delFields).mapToObj(f -> fieldTypes[f]).toArray(LogicalType[]::new);
+        LogicalType[] updTypes =
+                Arrays.stream(updFields).mapToObj(f -> fieldTypes[f]).toArray(LogicalType[]::new);
+
+        return new RedshiftUpsertExecutor(
+                insertSql,
+                updateSql,
+                deleteSql,
+                new RedshiftCopyModeRowConverterImpl(fieldTypes),
+                new RedshiftCopyModeRowConverterImpl(updTypes),
+                new RedshiftCopyModeRowConverterImpl(delTypes),
+                createExtractor(fieldTypes, updFields),
+                createExtractor(fieldTypes, delFields),
+                options);
+    }
+
+    static Function<RowData, RowData> createExtractor(LogicalType[] logicalTypes, int[] fields) {
+        final RowData.FieldGetter[] fieldGetters = new RowData.FieldGetter[fields.length];
+        for (int i = 0; i < fields.length; i++) {
+            fieldGetters[i] = createFieldGetter(logicalTypes[fields[i]], fields[i]);
+        }
+
+        return row -> {
+            GenericRowData rowData = new GenericRowData(row.getRowKind(), fieldGetters.length);
+            for (int i = 0; i < fieldGetters.length; i++) {
+                rowData.setField(i, fieldGetters[i].getFieldOrNull(row));
+            }
+            return rowData;
+        };
+    }
+}
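To make the factory dispatch in createRedshiftExecutor concrete, here is a small hypothetical example (not part of the commit): non-empty key fields plus SINK_MODE set to COPY selects the upload-upsert executor, while empty key fields or JDBC mode selects one of the other three implementations. The class wrapper, schema, and option values below are placeholders.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.redshift.config.RedshiftSinkConfigConstants;
import org.apache.flink.connector.redshift.internal.executor.RedshiftExecutor;
import org.apache.flink.connector.redshift.mode.SinkMode;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.VarCharType;

public class ExecutorFactorySketch {
    static RedshiftExecutor buildUpsertExecutor() {
        // Placeholder schema: "id" is the key field, "name" is a plain column.
        String[] fieldNames = {"id", "name"};
        String[] keyFields = {"id"};
        LogicalType[] fieldTypes = {new BigIntType(), new VarCharType(256)};

        Configuration options = new Configuration();
        options.set(RedshiftSinkConfigConstants.SINK_MODE, SinkMode.COPY);
        // A real job would also set TABLE_NAME and the COPY-related options
        // (see the validator hunk earlier in this commit); they are omitted here.

        // Non-empty keyFields + COPY mode -> upload-upsert executor.
        return RedshiftExecutor.createRedshiftExecutor(fieldNames, keyFields, fieldTypes, options);
    }
}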
