Skip to content

Commit b0bdab2

Browse files
niuge01jackylk
authored andcommitted
[CARBONDATA-3640][CARBONDATA-3557] Support flink ingest carbon partition table
Add support for flink carbon sink to write partitioned carbondata files as stage files. Add support for INSERT STAGE command to load stage files into CarbonData table. This closes apache#3542
1 parent 3b85e9f commit b0bdab2

File tree

20 files changed

+874
-308
lines changed

20 files changed

+874
-308
lines changed

Diff for: core/src/main/java/org/apache/carbondata/core/statusmanager/StageInput.java

+58
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,12 @@ public class StageInput {
3939
*/
4040
private Map<String, Long> files;
4141

42+
/**
43+
* this list of partition data information in this StageInput
44+
* @see PartitionLocation
45+
*/
46+
private List<PartitionLocation> locations;
47+
4248
public StageInput() {
4349

4450
}
@@ -48,6 +54,11 @@ public StageInput(String base, Map<String, Long> files) {
4854
this.files = files;
4955
}
5056

57+
public StageInput(String base, List<PartitionLocation> locations) {
58+
this.base = base;
59+
this.locations = locations;
60+
}
61+
5162
public String getBase() {
5263
return base;
5364
}
@@ -64,6 +75,14 @@ public void setFiles(Map<String, Long> files) {
6475
this.files = files;
6576
}
6677

78+
public List<PartitionLocation> getLocations() {
79+
return this.locations;
80+
}
81+
82+
public void setLocations(final List<PartitionLocation> locations) {
83+
this.locations = locations;
84+
}
85+
6786
public List<InputSplit> createSplits() {
6887
return
6988
files.entrySet().stream().filter(
@@ -75,4 +94,43 @@ public List<InputSplit> createSplits() {
7594
).collect(Collectors.toList());
7695
}
7796

97+
public static final class PartitionLocation {
98+
99+
public PartitionLocation() {
100+
101+
}
102+
103+
public PartitionLocation(final Map<String, String> partitions, final Map<String, Long> files) {
104+
this.partitions = partitions;
105+
this.files = files;
106+
}
107+
108+
/**
109+
* the list of (partitionColumn, partitionValue) of this partition.
110+
*/
111+
private Map<String, String> partitions;
112+
113+
/**
114+
* the list of (file, length) in this partition.
115+
*/
116+
private Map<String, Long> files;
117+
118+
public Map<String, String> getPartitions() {
119+
return this.partitions;
120+
}
121+
122+
public void setPartitions(final Map<String, String> partitions) {
123+
this.partitions = partitions;
124+
}
125+
126+
public Map<String, Long> getFiles() {
127+
return this.files;
128+
}
129+
130+
public void setFiles(final Map<String, Long> files) {
131+
this.files = files;
132+
}
133+
134+
}
135+
78136
}

Diff for: core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java

+15-8
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ protected DateFormat initialValue() {
7575
/**
7676
* DataType converter for different computing engines
7777
*/
78-
private static DataTypeConverter converter;
78+
private static final ThreadLocal<DataTypeConverter> converter = new ThreadLocal<>();
7979

8080
/**
8181
* This method will convert a given value to its specific type
@@ -105,7 +105,7 @@ public static Object getMeasureValueBasedOnDataType(String msrValue, DataType da
105105
new BigDecimal(msrValue).setScale(scale, RoundingMode.HALF_UP);
106106
BigDecimal decimal = normalizeDecimalValue(bigDecimal, precision);
107107
if (useConverter) {
108-
return converter.convertFromBigDecimalToDecimal(decimal);
108+
return getDataTypeConverter().convertFromBigDecimalToDecimal(decimal);
109109
} else {
110110
return decimal;
111111
}
@@ -144,7 +144,7 @@ public static Object getNoDictionaryValueBasedOnDataType(String dimValue, DataTy
144144
new BigDecimal(dimValue).setScale(scale, RoundingMode.HALF_UP);
145145
BigDecimal decimal = normalizeDecimalValue(bigDecimal, precision);
146146
if (useConverter) {
147-
return converter.convertFromBigDecimalToDecimal(decimal);
147+
return getDataTypeConverter().convertFromBigDecimalToDecimal(decimal);
148148
} else {
149149
return decimal;
150150
}
@@ -457,7 +457,7 @@ public static Object getDataDataTypeForNoDictionaryColumn(String dimensionValue,
457457
}
458458
} else {
459459
// Default action for String/Varchar
460-
return converter.convertFromStringToUTF8String(dimensionValue);
460+
return getDataTypeConverter().convertFromStringToUTF8String(dimensionValue);
461461
}
462462
}
463463

@@ -974,7 +974,7 @@ private static String parseStringToBigDecimal(String value, ColumnSchema columnS
974974
*/
975975
public static void setDataTypeConverter(DataTypeConverter converterLocal) {
976976
if (converterLocal != null) {
977-
converter = converterLocal;
977+
converter.set(converterLocal);
978978
timeStampformatter.remove();
979979
dateformatter.remove();
980980
}
@@ -989,10 +989,17 @@ public static void clearFormatter() {
989989
}
990990

991991
public static DataTypeConverter getDataTypeConverter() {
992-
if (converter == null) {
993-
converter = new DataTypeConverterImpl();
992+
DataTypeConverter dataTypeConverter = converter.get();
993+
if (dataTypeConverter == null) {
994+
synchronized (converter) {
995+
dataTypeConverter = converter.get();
996+
if (dataTypeConverter == null) {
997+
dataTypeConverter = new DataTypeConverterImpl();
998+
converter.set(dataTypeConverter);
999+
}
1000+
}
9941001
}
995-
return converter;
1002+
return dataTypeConverter;
9961003
}
9971004

9981005
public static DataType valueOf(String name) {

Diff for: integration/flink-proxy/src/main/java/org/apache/carbon/flink/ProxyFileWriter.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,9 @@ public abstract class ProxyFileWriter<OUT> implements BulkWriter<OUT> {
2525

2626
public abstract ProxyFileWriterFactory getFactory();
2727

28-
public abstract String getPartition();
28+
public abstract String getIdentifier();
29+
30+
public abstract String getPath();
2931

3032
public abstract void commit() throws IOException;
3133

Diff for: integration/flink-proxy/src/main/java/org/apache/carbon/flink/ProxyFileWriterFactory.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ public void setConfiguration(final Configuration configuration) {
7474
this.configuration = configuration;
7575
}
7676

77-
public abstract ProxyFileWriter<OUT> create(String partition) throws IOException;
77+
public abstract ProxyFileWriter<OUT> create(String identifier, String path) throws IOException;
7878

7979
public static class Configuration implements Serializable {
8080

Diff for: integration/flink-proxy/src/main/java/org/apache/carbon/flink/ProxyRecoverable.java

+13-5
Original file line numberDiff line numberDiff line change
@@ -25,18 +25,22 @@ public final class ProxyRecoverable
2525
public ProxyRecoverable(
2626
final String writerType,
2727
final ProxyFileWriterFactory.Configuration writerConfiguration,
28-
final String partition
28+
final String writerIdentifier,
29+
final String writePath
2930
) {
3031
this.writerType = writerType;
3132
this.writerConfiguration = writerConfiguration;
32-
this.partition = partition;
33+
this.writerIdentifier = writerIdentifier;
34+
this.writePath = writePath;
3335
}
3436

3537
private final String writerType;
3638

3739
private final ProxyFileWriterFactory.Configuration writerConfiguration;
3840

39-
private final String partition;
41+
private final String writerIdentifier;
42+
43+
private final String writePath;
4044

4145
public String getWriterType() {
4246
return this.writerType;
@@ -46,8 +50,12 @@ public ProxyFileWriterFactory.Configuration getWriterConfiguration() {
4650
return this.writerConfiguration;
4751
}
4852

49-
public String getPartition() {
50-
return this.partition;
53+
public String getWriterIdentifier() {
54+
return this.writerIdentifier;
55+
}
56+
57+
public String getWritePath() {
58+
return this.writePath;
5159
}
5260

5361
}

Diff for: integration/flink-proxy/src/main/java/org/apache/carbon/flink/ProxyRecoverableOutputStream.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,8 @@ public Committer closeForCommit() {
8282
new ProxyRecoverable(
8383
this.writer.getFactory().getType(),
8484
this.writer.getFactory().getConfiguration(),
85-
this.writer.getPartition()
85+
this.writer.getIdentifier(),
86+
this.writer.getPath()
8687
)
8788
);
8889
}
@@ -118,7 +119,8 @@ private ProxyFileWriter<?> newWriter() throws IOException {
118119
throw new UnsupportedOperationException();
119120
}
120121
writerFactory.setConfiguration(this.recoverable.getWriterConfiguration());
121-
return writerFactory.create(this.recoverable.getPartition());
122+
return writerFactory.create(this.recoverable.getWriterIdentifier(),
123+
this.recoverable.getWritePath());
122124
}
123125

124126
}

Diff for: integration/flink-proxy/src/main/java/org/apache/carbon/flink/ProxyRecoverableSerializer.java

+5-3
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,8 @@ public byte[] serialize(final ProxyRecoverable proxyRecoverable) {
5353
final ByteBuffer byteBuffer = ByteBuffer.allocate(BUFFER_SIZE);
5454
serializeString(byteBuffer, proxyRecoverable.getWriterType());
5555
serializeConfiguration(byteBuffer, proxyRecoverable.getWriterConfiguration());
56-
serializeString(byteBuffer, proxyRecoverable.getPartition());
56+
serializeString(byteBuffer, proxyRecoverable.getWriterIdentifier());
57+
serializeString(byteBuffer, proxyRecoverable.getWritePath());
5758
final byte[] bytes = new byte[byteBuffer.position()];
5859
byteBuffer.position(0);
5960
byteBuffer.get(bytes);
@@ -113,8 +114,9 @@ public ProxyRecoverable deserialize(final int version, final byte[] bytes) {
113114
final String writerType = deserializeString(byteBuffer);
114115
final ProxyFileWriterFactory.Configuration writerConfiguration =
115116
deserializeConfiguration(byteBuffer);
116-
final String partition = deserializeString(byteBuffer);
117-
return new ProxyRecoverable(writerType, writerConfiguration, partition);
117+
final String writerIdentifier = deserializeString(byteBuffer);
118+
final String writePath = deserializeString(byteBuffer);
119+
return new ProxyRecoverable(writerType, writerConfiguration, writerIdentifier, writePath);
118120
}
119121

120122
private static ProxyFileWriterFactory.Configuration deserializeConfiguration(

Diff for: integration/flink/src/main/java/org/apache/carbon/flink/CarbonLocalProperty.java

+2
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ public final class CarbonLocalProperty {
2323

2424
public static final String DATA_PATH = "carbon.writer.local.data.path";
2525

26+
static final String COMMIT_THRESHOLD = "carbon.writer.local.commit.threshold";
27+
2628
private CarbonLocalProperty() {
2729
// private constructor.
2830
}

0 commit comments

Comments
 (0)