Skip to content

Commit af60a80

Browse files
committed
changes till now
1 parent c96ea6c commit af60a80

16 files changed

+127
-459
lines changed

flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/converter/DeserializationConverter.java

-35
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -18,41 +18,24 @@
1818

1919
package org.apache.flink.connector.redshift.converter;
2020

21-
import org.apache.flink.annotation.Internal;
2221
import org.apache.flink.table.data.ArrayData;
2322
import org.apache.flink.table.data.DecimalData;
24-
import org.apache.flink.table.data.GenericArrayData;
25-
import org.apache.flink.table.data.GenericMapData;
2623
import org.apache.flink.table.data.MapData;
27-
import org.apache.flink.table.data.StringData;
2824
import org.apache.flink.table.data.TimestampData;
2925
import org.apache.flink.table.types.logical.ArrayType;
30-
import org.apache.flink.table.types.logical.DecimalType;
3126
import org.apache.flink.table.types.logical.LogicalType;
3227
import org.apache.flink.table.types.logical.MapType;
3328

34-
import java.math.BigDecimal;
35-
import java.math.BigInteger;
36-
import java.sql.Array;
3729
import java.sql.Date;
38-
import java.sql.SQLException;
39-
import java.sql.Time;
4030
import java.sql.Timestamp;
4131
import java.time.LocalDate;
4232
import java.time.LocalDateTime;
4333
import java.time.LocalTime;
4434
import java.util.HashMap;
4535
import java.util.Map;
4636

47-
/** Utility Class to map Flink Rich DataTypes with Redshift Limited DataType Support. */
48-
@Internal
49-
public class RedshiftConverterUtil {
50-
51-
/** Restrict Object Creation for the Utility Class. */
52-
private RedshiftConverterUtil() {}
53-
54-
public static final String UNKNOWN_ARRAY_ELEMENT_TYPE = "Unknown array element type";
55-
public static final int BOOLEAN_TRUE = 1;
37+
/** Row converter. */
38+
public class RedshiftConverterUtils {
5639

5740
public static Object toExternal(Object value, LogicalType type) {
5841
switch (type.getTypeRoot()) {
@@ -91,7 +74,7 @@ public static Object toExternal(Object value, LogicalType type) {
9174
.orElseThrow(
9275
() ->
9376
new RuntimeException(
94-
UNKNOWN_ARRAY_ELEMENT_TYPE));
77+
"Unknown array element type"));
9578
ArrayData.ElementGetter elementGetter = ArrayData.createElementGetter(elementType);
9679
ArrayData arrayData = ((ArrayData) value);
9780
Object[] objectArray = new Object[arrayData.size()];
@@ -123,83 +106,8 @@ public static Object toExternal(Object value, LogicalType type) {
123106
}
124107
}
125108

126-
static Object toInternal(Object value, LogicalType type) throws SQLException {
127-
switch (type.getTypeRoot()) {
128-
case NULL:
129-
return null;
130-
case BOOLEAN:
131-
return BOOLEAN_TRUE == ((Number) value).intValue();
132-
case FLOAT:
133-
case DOUBLE:
134-
case INTERVAL_YEAR_MONTH:
135-
case INTERVAL_DAY_TIME:
136-
case INTEGER:
137-
case BIGINT:
138-
case BINARY:
139-
case VARBINARY:
140-
return value;
141-
case TINYINT:
142-
return ((Integer) value).byteValue();
143-
case SMALLINT:
144-
return value instanceof Integer ? ((Integer) value).shortValue() : value;
145-
case DECIMAL:
146-
final int precision = ((DecimalType) type).getPrecision();
147-
final int scale = ((DecimalType) type).getScale();
148-
return value instanceof BigInteger
149-
? DecimalData.fromBigDecimal(
150-
new BigDecimal((BigInteger) value, 0), precision, scale)
151-
: DecimalData.fromBigDecimal((BigDecimal) value, precision, scale);
152-
case DATE:
153-
return (int) (((Date) value).toLocalDate().toEpochDay());
154-
case TIME_WITHOUT_TIME_ZONE:
155-
return (int) (((Time) value).toLocalTime().toNanoOfDay() / 1_000_000L);
156-
case TIMESTAMP_WITH_TIME_ZONE:
157-
case TIMESTAMP_WITHOUT_TIME_ZONE:
158-
return TimestampData.fromTimestamp((Timestamp) value);
159-
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
160-
return TimestampData.fromInstant(((Timestamp) value).toInstant());
161-
case CHAR:
162-
case VARCHAR:
163-
return StringData.fromString((String) value);
164-
case ARRAY:
165-
LogicalType elementType =
166-
type.getChildren().stream()
167-
.findFirst()
168-
.orElseThrow(
169-
() -> new RuntimeException(UNKNOWN_ARRAY_ELEMENT_TYPE));
170-
Object externalArray = ((Array) value).getArray();
171-
int externalArrayLength = java.lang.reflect.Array.getLength(externalArray);
172-
Object[] internalArray = new Object[externalArrayLength];
173-
for (int i = 0; i < externalArrayLength; i++) {
174-
internalArray[i] =
175-
toInternal(java.lang.reflect.Array.get(externalArray, i), elementType);
176-
}
177-
return new GenericArrayData(internalArray);
178-
case MAP:
179-
LogicalType keyType = ((MapType) type).getKeyType();
180-
LogicalType valueType = ((MapType) type).getValueType();
181-
Map<?, ?> externalMap = (Map<?, ?>) value;
182-
Map<Object, Object> internalMap = new HashMap<>(externalMap.size());
183-
for (Map.Entry<?, ?> entry : externalMap.entrySet()) {
184-
internalMap.put(
185-
toInternal(entry.getKey(), keyType),
186-
toInternal(entry.getValue(), valueType));
187-
}
188-
return new GenericMapData(internalMap);
189-
case ROW:
190-
// todo: Implement Support for ROW.
191-
case MULTISET:
192-
// todo: Implement Support for MultiSet.
193-
case RAW:
194-
// todo: Implement Support for RAW.
195-
default:
196-
throw new UnsupportedOperationException("Unsupported type:" + type);
197-
}
198-
}
199-
200109
public static Timestamp toEpochDayOneTimestamp(LocalTime localTime) {
201-
final LocalDate datePrefix = LocalDate.ofEpochDay(1);
202-
LocalDateTime localDateTime = localTime.atDate(datePrefix);
110+
LocalDateTime localDateTime = localTime.atDate(LocalDate.ofEpochDay(1));
203111
return Timestamp.valueOf(localDateTime);
204112
}
205113
}
Original file line numberDiff line numberDiff line change
@@ -16,86 +16,80 @@
1616
* limitations under the License.
1717
*/
1818

19-
package org.apache.flink.connector.redshift.mode.copy;
19+
package org.apache.flink.connector.redshift.converter;
2020

21-
import org.apache.flink.annotation.Internal;
22-
import org.apache.flink.connector.redshift.converter.AbstractRedshiftRowConverter;
2321
import org.apache.flink.table.data.RowData;
2422
import org.apache.flink.table.types.logical.DecimalType;
2523
import org.apache.flink.table.types.logical.LogicalType;
2624
import org.apache.flink.table.types.logical.TimestampType;
2725

28-
import java.sql.ResultSet;
26+
import java.io.Serializable;
2927
import java.time.LocalDate;
3028
import java.time.format.DateTimeFormatter;
3129
import java.util.ArrayList;
3230

33-
/** Converter Implementation for Redshift in COPY Mode. */
34-
@Internal
35-
public class RedshiftCopyModeRowConverterImpl extends AbstractRedshiftRowConverter {
36-
37-
public static final String DATE_TIME_FORMAT = "yyyy-MM-dd HH:mm:ss.SSS";
31+
/** Row converter. */
32+
public class RedshiftCopyModeRowConverter implements Serializable {
3833

3934
private final LogicalType[] fieldTypes;
4035

41-
public RedshiftCopyModeRowConverterImpl(LogicalType[] fieldTypes) {
36+
public RedshiftCopyModeRowConverter(LogicalType[] fieldTypes) {
4237
this.fieldTypes = fieldTypes;
4338
}
4439

45-
@Override
46-
public Object toExternal(RowData rowData) {
40+
public String[] toExternal(RowData rowData) {
4741
ArrayList<String> csvLine = new ArrayList<>();
4842
for (int index = 0; index < rowData.getArity(); index++) {
4943
LogicalType type = fieldTypes[index];
50-
StringBuilder sb = new StringBuilder();
44+
String val = "";
5145
switch (type.getTypeRoot()) {
5246
case BOOLEAN:
53-
sb.append(rowData.getBoolean(index));
47+
val = Boolean.toString(rowData.getBoolean(index));
5448
break;
5549
case FLOAT:
56-
sb.append(rowData.getFloat(index));
50+
val = String.valueOf(rowData.getFloat(index));
5751
break;
5852
case DOUBLE:
59-
sb.append(rowData.getDouble(index));
53+
val = String.valueOf(rowData.getDouble(index));
6054
break;
6155
case INTERVAL_YEAR_MONTH:
6256
case INTEGER:
63-
sb.append(rowData.getInt(index));
57+
val = String.valueOf(rowData.getInt(index));
6458
break;
6559
case INTERVAL_DAY_TIME:
6660
case BIGINT:
67-
sb.append(rowData.getLong(index));
61+
val = String.valueOf(rowData.getLong(index));
6862
break;
6963
case TINYINT:
7064
case SMALLINT:
7165
case CHAR:
7266
case VARCHAR:
73-
sb.append(rowData.getString(index).toString());
67+
val = rowData.getString(index).toString();
7468
break;
7569
case BINARY:
7670
case VARBINARY:
7771
case DATE:
78-
sb.append(
72+
val =
7973
LocalDate.ofEpochDay(rowData.getInt(index))
80-
.format(DateTimeFormatter.ISO_DATE));
74+
.format(DateTimeFormatter.ISO_DATE);
8175
break;
8276
case TIME_WITHOUT_TIME_ZONE:
8377
case TIMESTAMP_WITH_TIME_ZONE:
8478
case TIMESTAMP_WITHOUT_TIME_ZONE:
8579
final int timestampPrecision = ((TimestampType) type).getPrecision();
86-
final DateTimeFormatter dateTimeFormatter =
87-
DateTimeFormatter.ofPattern(DATE_TIME_FORMAT);
88-
sb.append(
80+
final DateTimeFormatter timeFormaterr =
81+
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
82+
val =
8983
rowData.getTimestamp(index, timestampPrecision)
9084
.toLocalDateTime()
91-
.format(dateTimeFormatter));
85+
.format(timeFormaterr);
9286
break;
9387
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
9488

9589
case DECIMAL:
9690
final int decimalPrecision = ((DecimalType) type).getPrecision();
9791
final int decimalScale = ((DecimalType) type).getScale();
98-
sb.append(rowData.getDecimal(index, decimalPrecision, decimalScale));
92+
val = String.valueOf(rowData.getDecimal(index, decimalPrecision, decimalScale));
9993
break;
10094
case ARRAY:
10195

@@ -106,14 +100,8 @@ public Object toExternal(RowData rowData) {
106100
default:
107101
throw new UnsupportedOperationException("Unsupported type:" + type);
108102
}
109-
csvLine.add(sb.toString());
103+
csvLine.add(val);
110104
}
111105
return csvLine.toArray(new String[0]);
112106
}
113-
114-
@Override
115-
public RowData toInternal(ResultSet resultSet) {
116-
throw new UnsupportedOperationException(
117-
"COPY Mode is supported only for writing data to Redshift in batch manner.");
118-
}
119107
}

0 commit comments

Comments
 (0)