Commit 7703c22

Restructure Flink Connector Redshift

1 parent af60a80 commit 7703c22
25 files changed: +1431 −478 lines

flink-connector-aws/flink-connector-redshift/pom.xml  (+7)

@@ -15,6 +15,7 @@
 
     <properties>
         <redshift.jdbc.version>2.1.0.8</redshift.jdbc.version>
+        <lombok.version>1.18.22</lombok.version>
     </properties>
     <packaging>jar</packaging>
 
@@ -52,6 +53,12 @@
             <artifactId>flink-table-common</artifactId>
         </dependency>
 
+        <dependency>
+            <groupId>org.projectlombok</groupId>
+            <artifactId>lombok</artifactId>
+            <version>${lombok.version}</version>
+        </dependency>
+
     </dependencies>
 
     <build>
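
The new Lombok dependency suggests the restructured connector relies on annotation-generated boilerplate (getters, builders) in its configuration classes. The class below is a hypothetical illustration of that pattern only; the class name and fields are invented and do not appear in this commit.

import lombok.Builder;
import lombok.Getter;

/** Hypothetical example: a Lombok-based options holder of the kind the new dependency enables. */
@Getter
@Builder
public class RedshiftConnectionOptionsExample {
    private final String jdbcUrl;    // e.g. jdbc:redshift://host:5439/dev
    private final String tableName;  // target Redshift table
    private final int batchSize;     // rows buffered per write
}

Lombok generates the getters and the builder at compile time, so such option classes stay a few lines long.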

flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/converter/RedshiftCopyModeRowConverter.java  (+4 −7)

@@ -23,22 +23,20 @@
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.TimestampType;
 
-import java.io.Serializable;
 import java.time.LocalDate;
 import java.time.format.DateTimeFormatter;
-import java.util.ArrayList;
 
 /** Row converter. */
-public class RedshiftCopyModeRowConverter implements Serializable {
+public class RedshiftCopyModeRowConverter implements RedshiftRowConverter {
 
+    private static final long serialVersionUID = 1L;
     private final LogicalType[] fieldTypes;
 
     public RedshiftCopyModeRowConverter(LogicalType[] fieldTypes) {
         this.fieldTypes = fieldTypes;
     }
 
-    public String[] toExternal(RowData rowData) {
-        ArrayList<String> csvLine = new ArrayList<>();
+    public void toExternal(RowData rowData, String[] data) {
         for (int index = 0; index < rowData.getArity(); index++) {
             LogicalType type = fieldTypes[index];
             String val = "";
@@ -100,8 +98,7 @@ public String[] toExternal(RowData rowData) {
                 default:
                     throw new UnsupportedOperationException("Unsupported type:" + type);
             }
-            csvLine.add(val);
+            data[index] = val;
         }
-        return csvLine.toArray(new String[0]);
     }
 }
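
After this change the COPY-mode converter no longer builds a list and returns an array; the caller supplies the target String[] and the converter fills it in place. The sketch below is a hypothetical caller, not the connector's actual COPY-mode writer; the field types and values are invented.

import org.apache.flink.connector.redshift.converter.RedshiftCopyModeRowConverter;
import org.apache.flink.connector.redshift.converter.RedshiftRowConverter;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.VarCharType;

/** Hypothetical sketch of the new call pattern; assumes INT and VARCHAR are handled by the switch. */
public class CopyModeConverterSketch {
    public static void main(String[] args) {
        LogicalType[] fieldTypes = {new IntType(), new VarCharType(32)};
        RedshiftRowConverter converter = new RedshiftCopyModeRowConverter(fieldTypes);

        RowData row = GenericRowData.of(42, StringData.fromString("flink"));
        String[] csvFields = new String[row.getArity()];
        converter.toExternal(row, csvFields);             // fills csvFields in place
        System.out.println(String.join(",", csvFields));  // e.g. "42,flink"
    }
}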
flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/converter/RedshiftJdbcModeRowConverter.java

@@ -18,17 +18,16 @@
 
 package org.apache.flink.connector.redshift.converter;
 
+import org.apache.flink.connector.redshift.internal.statement.FieldNamedRedshiftPreparedStatement;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.logical.DecimalType;
 import org.apache.flink.table.types.logical.LocalZonedTimestampType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.Preconditions;
 
-import com.amazon.redshift.jdbc.RedshiftPreparedStatement;
-
-import java.io.Serializable;
 import java.sql.Date;
 import java.sql.SQLException;
 import java.sql.Timestamp;
@@ -38,14 +37,14 @@
 import static org.apache.flink.connector.redshift.converter.RedshiftConverterUtils.toEpochDayOneTimestamp;
 
 /** Row converter. */
-public class RedshiftJdbcRowConverter implements Serializable {
+public class RedshiftJdbcModeRowConverter implements RedshiftRowConverter {
     private static final long serialVersionUID = 1L;
 
     private final RowType rowType;
 
     private final SerializationConverter[] toExternalConverters;
 
-    public RedshiftJdbcRowConverter(RowType rowType) {
+    public RedshiftJdbcModeRowConverter(RowType rowType) {
         this.rowType = Preconditions.checkNotNull(rowType);
         LogicalType[] logicalTypes =
                 rowType.getFields().stream()
@@ -58,19 +57,22 @@ public RedshiftJdbcRowConverter(RowType rowType) {
         }
     }
 
-    public void toExternal(RowData rowData, RedshiftPreparedStatement insertStatement)
-            throws SQLException {
+    @Override
+    public void toExternal(RowData rowData, FieldNamedRedshiftPreparedStatement insertStatement) {
         for (int index = 0; index < rowData.getArity(); index++) {
-            if (!rowData.isNullAt(index)) {
-                toExternalConverters[index].serialize(rowData, index, insertStatement);
-            } else {
-                insertStatement.setObject(index + 1, null);
+            try {
+                if (rowData.isNullAt(index)) {
+                    insertStatement.setObject(index + 1, null);
+                } else {
+                    toExternalConverters[index].serialize(rowData, index, insertStatement);
+                }
+            } catch (SQLException e) {
+                throw new FlinkRuntimeException(e);
            }
        }
    }
 
-    protected RedshiftJdbcRowConverter.SerializationConverter createToExternalConverter(
-            LogicalType type) {
+    public SerializationConverter createToExternalConverter(LogicalType type) {
         switch (type.getTypeRoot()) {
             case BOOLEAN:
                 return (val, index, statement) ->
@@ -134,13 +136,6 @@ protected RedshiftJdbcRowConverter.SerializationConverter createToExternalConver
                                 index + 1,
                                 val.getDecimal(index, decimalPrecision, decimalScale)
                                         .toBigDecimal());
-                // case ARRAY:
-                //     return (val, index, statement) ->
-                //             statement.setArray(
-                //                     index + 1,
-                //                     (Object[])
-                //                             RedshiftConverterUtils.toExternal(
-                //                                     val.getArray(index), type));
             case MAP:
                 return (val, index, statement) ->
                         statement.setObject(
@@ -153,14 +148,4 @@ protected RedshiftJdbcRowConverter.SerializationConverter createToExternalConver
                 throw new UnsupportedOperationException("Unsupported type:" + type);
         }
     }
-
-    @FunctionalInterface
-    interface SerializationConverter extends Serializable {
-        /**
-         * Convert an internal field to java object and fill into the {@link
-         * RedshiftJdbcRowConverter}.
-         */
-        void serialize(RowData rowData, int index, RedshiftPreparedStatement statement)
-                throws SQLException;
-    }
 }
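
The renamed RedshiftJdbcModeRowConverter now targets the connector's own FieldNamedRedshiftPreparedStatement rather than the driver's RedshiftPreparedStatement, and it wraps SQLException in FlinkRuntimeException so that toExternal matches the checked-exception-free interface. The sketch below shows the resulting caller shape; the class and method names are invented, and how the prepared statement is created is not shown in this hunk and is assumed to live elsewhere in the connector.

import org.apache.flink.connector.redshift.converter.RedshiftJdbcModeRowConverter;
import org.apache.flink.connector.redshift.converter.RedshiftRowConverter;
import org.apache.flink.connector.redshift.internal.statement.FieldNamedRedshiftPreparedStatement;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;

/** Hypothetical caller sketch; not the connector's actual writer code. */
public class JdbcModeWriterSketch {

    // Build the converter from the sink's row type, as the restructured constructor expects.
    private final RedshiftRowConverter converter =
            new RedshiftJdbcModeRowConverter(RowType.of(new IntType(), new VarCharType(32)));

    void writeRow(RowData row, FieldNamedRedshiftPreparedStatement statement) {
        // toExternal no longer declares SQLException; failures surface as FlinkRuntimeException.
        converter.toExternal(row, statement);
    }
}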
flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/converter/RedshiftRowConverter.java  (new file, +34)

@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.redshift.converter;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.redshift.internal.statement.FieldNamedRedshiftPreparedStatement;
+import org.apache.flink.table.data.RowData;
+
+import java.io.Serializable;
+
+/** Converter. */
+@PublicEvolving
+public interface RedshiftRowConverter extends Serializable {
+
+    default void toExternal(RowData rowData, FieldNamedRedshiftPreparedStatement insertStatement) {}
+
+    default void toExternal(RowData rowData, String[] data) {}
+}
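
The new interface gives both write modes a single sink-facing type: the JDBC-mode converter overrides the prepared-statement variant, the COPY-mode converter overrides the String[] variant, and each inherits the other as a no-op default. A minimal hypothetical third implementation, purely to show the contract (not part of the commit):

package org.apache.flink.connector.redshift.converter;

import org.apache.flink.table.data.RowData;

/** Hypothetical example implementation: COPY-mode output only. */
public class ToStringCopyConverter implements RedshiftRowConverter {
    private static final long serialVersionUID = 1L;

    @Override
    public void toExternal(RowData rowData, String[] data) {
        for (int i = 0; i < rowData.getArity(); i++) {
            // Illustration only: real converters dispatch on LogicalType instead of assuming strings.
            data[i] = String.valueOf(rowData.getString(i));
        }
    }
}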
flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/converter/SerializationConverter.java  (new file, +33)

@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.redshift.converter;
+
+import org.apache.flink.connector.redshift.internal.statement.FieldNamedRedshiftPreparedStatement;
+import org.apache.flink.table.data.RowData;
+
+import java.io.Serializable;
+import java.sql.SQLException;
+
+@FunctionalInterface
+interface SerializationConverter extends Serializable {
+    long serialVersionUID = 1L;
+    /** Convert an internal field to java object and fill into the {@link RedshiftRowConverter}. */
+    void serialize(RowData rowData, int index, FieldNamedRedshiftPreparedStatement statement)
+            throws SQLException;
+}
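
createToExternalConverter in the JDBC-mode converter returns lambdas of this shape. A standalone hedged example follows; only setObject is visible on FieldNamedRedshiftPreparedStatement in the hunks above, so that is the only statement method assumed here, and the holder class is invented.

package org.apache.flink.connector.redshift.converter;

import org.apache.flink.connector.redshift.internal.statement.FieldNamedRedshiftPreparedStatement;
import org.apache.flink.table.data.RowData;

/** Hypothetical holder for an example SerializationConverter lambda. */
class ExampleSerializers {
    // Serializes an INT field via setObject, mirroring the 1-based indexing used in the diff above.
    static final SerializationConverter INT_FIELD =
            (RowData rowData, int index, FieldNamedRedshiftPreparedStatement statement) ->
                    statement.setObject(index + 1, rowData.getInt(index));
}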
