
Commit 3c92fb6

Initialize Flink Redshift Connector
1 parent 4c2fdd9 commit 3c92fb6

12 files changed: +649 -0 lines changed
Lines changed: 69 additions & 0 deletions
@@ -0,0 +1,69 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-aws-parent</artifactId>
        <version>4.2-SNAPSHOT</version>
    </parent>

    <artifactId>flink-connector-redshift</artifactId>
    <name>Flink : Connectors : AWS : Amazon Redshift</name>
    <packaging>jar</packaging>

    <properties>
        <!-- Keep the driver version in one place; the dependency below references
             this property instead of hardcoding a second, conflicting version. -->
        <redshift.jdbc.version>2.1.0.17</redshift.jdbc.version>
    </properties>

    <dependencies>

        <dependency>
            <groupId>com.amazon.redshift</groupId>
            <artifactId>redshift-jdbc42</artifactId>
            <version>${redshift.jdbc.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-csv</artifactId>
            <version>1.10.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-annotations</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <executions>
                    <execution>
                        <id>shade-flink</id>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
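
Note that the Redshift JDBC driver above is provided scope, so the shade plugin will not bundle it into the connector jar; deployments must put redshift-jdbc42 on the runtime classpath themselves. A minimal sketch of verifying that at startup (not part of this commit; com.amazon.redshift.Driver is the class name AWS documents for the 2.x driver, but treat it as an assumption here):

/** Illustrative only: confirm the provided-scope Redshift JDBC driver is present. */
public final class DriverCheck {
    public static void main(String[] args) {
        try {
            // Assumed 2.x driver class name; not confirmed by this diff.
            Class.forName("com.amazon.redshift.Driver");
            System.out.println("Redshift JDBC driver found");
        } catch (ClassNotFoundException e) {
            System.err.println(
                    "redshift-jdbc42 is 'provided' scope: add it to the runtime classpath");
        }
    }
}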
@@ -0,0 +1,182 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.connector.redshift.mode.converter;

import org.apache.flink.annotation.Internal;
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.LocalZonedTimestampType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.TimestampType;

import java.math.BigDecimal;
import java.math.BigInteger;
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
import java.time.LocalDate;
import java.time.LocalTime;
import java.util.UUID;

import static org.apache.flink.connector.redshift.mode.converter.RedshiftConverterUtil.toEpochDayOneTimestamp;

/** Base class for all converters that convert between JDBC object and Flink internal object. */
@Internal
public abstract class AbstractRedshiftRowConverter implements RedshiftRowConverter {
    public static final int BOOLEAN_TRUE = 1;

    protected DeserializationConverter createToInternalConverter(LogicalType type) {
        switch (type.getTypeRoot()) {
            case NULL:
                return val -> null;
            case BOOLEAN:
                return val -> BOOLEAN_TRUE == ((Number) val).intValue();
            case FLOAT:
            case DOUBLE:
            case INTERVAL_YEAR_MONTH:
            case INTERVAL_DAY_TIME:
            case INTEGER:
            case BIGINT:
            case BINARY:
            case VARBINARY:
                return val -> val;
            case TINYINT:
                return val -> ((Integer) val).byteValue();
            case SMALLINT:
                return val -> val instanceof Integer ? ((Integer) val).shortValue() : val;
            case DECIMAL:
                final int precision = ((DecimalType) type).getPrecision();
                final int scale = ((DecimalType) type).getScale();
                return val ->
                        val instanceof BigInteger
                                ? DecimalData.fromBigDecimal(
                                        new BigDecimal((BigInteger) val, 0), precision, scale)
                                : DecimalData.fromBigDecimal((BigDecimal) val, precision, scale);
            case DATE:
                return val -> (int) ((Date) val).toLocalDate().toEpochDay();
            case TIME_WITHOUT_TIME_ZONE:
                return val -> (int) (((Time) val).toLocalTime().toNanoOfDay() / 1_000_000L);
            case TIMESTAMP_WITH_TIME_ZONE:
            case TIMESTAMP_WITHOUT_TIME_ZONE:
                return val -> TimestampData.fromTimestamp((Timestamp) val);
            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
                return val -> TimestampData.fromInstant(((Timestamp) val).toInstant());
            case CHAR:
            case VARCHAR:
                return val ->
                        val instanceof UUID
                                ? StringData.fromString(val.toString())
                                : StringData.fromString((String) val);
            case ARRAY:
            case MAP:
                return val -> RedshiftConverterUtil.toInternal(val, type);
            case ROW:
            case MULTISET:
            case RAW:
            default:
                throw new UnsupportedOperationException("Unsupported type: " + type);
        }
    }

    protected SerializationConverter createToExternalConverter(LogicalType type) {
        switch (type.getTypeRoot()) {
            case BOOLEAN:
                return (val, index, statement) ->
                        statement.setBoolean(index + 1, val.getBoolean(index));
            case FLOAT:
                return (val, index, statement) ->
                        statement.setFloat(index + 1, val.getFloat(index));
            case DOUBLE:
                return (val, index, statement) ->
                        statement.setDouble(index + 1, val.getDouble(index));
            case INTERVAL_YEAR_MONTH:
            case INTEGER:
                return (val, index, statement) -> statement.setInt(index + 1, val.getInt(index));
            case INTERVAL_DAY_TIME:
            case BIGINT:
                return (val, index, statement) -> statement.setLong(index + 1, val.getLong(index));
            case TINYINT:
                return (val, index, statement) -> statement.setByte(index + 1, val.getByte(index));
            case SMALLINT:
                return (val, index, statement) ->
                        statement.setShort(index + 1, val.getShort(index));
            case CHAR:
            case VARCHAR:
                // value is BinaryString
                return (val, index, statement) ->
                        statement.setString(index + 1, val.getString(index).toString());
            case BINARY:
            case VARBINARY:
                return (val, index, statement) ->
                        statement.setBytes(index + 1, val.getBinary(index));
            case DATE:
                return (val, index, statement) ->
                        statement.setDate(
                                index + 1, Date.valueOf(LocalDate.ofEpochDay(val.getInt(index))));
            case TIME_WITHOUT_TIME_ZONE:
                return (val, index, statement) -> {
                    LocalTime localTime = LocalTime.ofNanoOfDay(val.getInt(index) * 1_000_000L);
                    statement.setTimestamp(index + 1, toEpochDayOneTimestamp(localTime));
                };
            case TIMESTAMP_WITH_TIME_ZONE:
            case TIMESTAMP_WITHOUT_TIME_ZONE:
                final int timestampPrecision = ((TimestampType) type).getPrecision();
                return (val, index, statement) ->
                        statement.setTimestamp(
                                index + 1,
                                val.getTimestamp(index, timestampPrecision).toTimestamp());
            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
                final int localZonedTimestampPrecision =
                        ((LocalZonedTimestampType) type).getPrecision();
                return (val, index, statement) ->
                        statement.setTimestamp(
                                index + 1,
                                Timestamp.from(
                                        val.getTimestamp(index, localZonedTimestampPrecision)
                                                .toInstant()));
            case DECIMAL:
                final int decimalPrecision = ((DecimalType) type).getPrecision();
                final int decimalScale = ((DecimalType) type).getScale();
                return (val, index, statement) ->
                        statement.setBigDecimal(
                                index + 1,
                                val.getDecimal(index, decimalPrecision, decimalScale)
                                        .toBigDecimal());
            case ARRAY:
                return (val, index, statement) ->
                        statement.setArray(
                                index + 1,
                                (java.sql.Array)
                                        RedshiftConverterUtil.toExternal(
                                                val.getArray(index), type));
            case MAP:
                return (val, index, statement) ->
                        statement.setObject(
                                index + 1,
                                RedshiftConverterUtil.toExternal(val.getMap(index), type));
            case MULTISET:
            case ROW:
            case RAW:
            default:
                throw new UnsupportedOperationException("Unsupported type: " + type);
        }
    }
}
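
Both factory methods above are protected, so they are only reachable through a concrete subclass. A minimal usage sketch (not part of this commit): a hypothetical subclass that caches one DeserializationConverter per field of a RowType and applies them to a JDBC result set. The RedshiftRowConverter interface is not included in this diff, so the class below and its toInternal(ResultSet) signature are assumptions for illustration.

package org.apache.flink.connector.redshift.mode.converter;

import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;

import java.sql.ResultSet;
import java.sql.SQLException;

/** Hypothetical concrete converter; illustrative only, not part of this commit. */
public class ExampleRedshiftRowConverter extends AbstractRedshiftRowConverter {

    private final RowType rowType;
    // One cached converter per field, built once from the logical types.
    private final DeserializationConverter[] toInternalConverters;

    public ExampleRedshiftRowConverter(RowType rowType) {
        this.rowType = rowType;
        this.toInternalConverters = new DeserializationConverter[rowType.getFieldCount()];
        for (int i = 0; i < rowType.getFieldCount(); i++) {
            toInternalConverters[i] = createToInternalConverter(rowType.getTypeAt(i));
        }
    }

    // Assumed signature; if RedshiftRowConverter declares further methods,
    // a real subclass must implement those as well.
    public RowData toInternal(ResultSet resultSet) throws SQLException {
        GenericRowData row = new GenericRowData(rowType.getFieldCount());
        for (int i = 0; i < rowType.getFieldCount(); i++) {
            Object field = resultSet.getObject(i + 1); // JDBC columns are 1-based
            row.setField(i, field == null ? null : toInternalConverters[i].deserialize(field));
        }
        return row;
    }
}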
@@ -0,0 +1,35 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.connector.redshift.mode.converter;

import org.apache.flink.annotation.Internal;

import java.io.Serializable;
import java.sql.SQLException;

/** Functional Interface for Deserialization Converter. */
@Internal
@FunctionalInterface
public interface DeserializationConverter extends Serializable {
    /**
     * Convert an object of {@link com.amazon.redshift.jdbc.RedshiftResultSet} to the internal data
     * structure object.
     */
    Object deserialize(Object field) throws SQLException;
}
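
Because the interface declares a single abstract method, a converter is just a serializable lambda, which is exactly what AbstractRedshiftRowConverter returns from its switch statements. A small illustrative sketch (not part of this commit), assuming the same package:

package org.apache.flink.connector.redshift.mode.converter;

import java.sql.SQLException;

/** Illustrative only: builds and invokes a converter by hand. */
public final class DeserializationConverterDemo {
    public static void main(String[] args) throws SQLException {
        // The same lambda shape the abstract converter produces for BOOLEAN fields.
        DeserializationConverter booleanConverter =
                field -> AbstractRedshiftRowConverter.BOOLEAN_TRUE == ((Number) field).intValue();
        System.out.println(booleanConverter.deserialize(1)); // prints "true"
    }
}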
