Skip to content

Commit 4430178

Browse files
ravipesala authored and jackylk committed Dec 18, 2017
[CARBONDATA-1856][PARTITION] Support insert/load data for partition table
Changed carbonrelation to HadoopFSRelation during load in case of the partition table. Implement sparks Fileformat interface for carbon and use carbonoutputformat inside. Create partition.map file inside each segment for mapping between partition and index file. This closes apache#1654
1 parent 6e224dc commit 4430178

File tree

37 files changed

+1165
-225
lines changed

37 files changed

+1165
-225
lines changed
 
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,198 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.carbondata.core.metadata;
18+
19+
import java.io.BufferedReader;
20+
import java.io.BufferedWriter;
21+
import java.io.DataInputStream;
22+
import java.io.DataOutputStream;
23+
import java.io.IOException;
24+
import java.io.InputStreamReader;
25+
import java.io.OutputStreamWriter;
26+
import java.io.Serializable;
27+
import java.nio.charset.Charset;
28+
import java.util.HashMap;
29+
import java.util.List;
30+
import java.util.Map;
31+
32+
import org.apache.carbondata.core.constants.CarbonCommonConstants;
33+
import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
34+
import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
35+
import org.apache.carbondata.core.datastore.impl.FileFactory;
36+
import org.apache.carbondata.core.fileoperations.AtomicFileOperations;
37+
import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl;
38+
import org.apache.carbondata.core.fileoperations.FileWriteOperation;
39+
import org.apache.carbondata.core.util.CarbonUtil;
40+
import org.apache.carbondata.core.util.path.CarbonTablePath;
41+
42+
import com.google.gson.Gson;
43+
44+
/**
45+
* Provide read and write support for partition mapping file in each segment
46+
*/
47+
public class PartitionMapFileStore {
48+
49+
/**
50+
* Write partitionmapp file to the segment folder with indexfilename and corresponding partitions.
51+
*
52+
* @param segmentPath
53+
* @param taskNo
54+
* @param partionNames
55+
* @throws IOException
56+
*/
57+
public void writePartitionMapFile(String segmentPath, final String taskNo,
58+
List<String> partionNames) throws IOException {
59+
CarbonFile carbonFile = FileFactory.getCarbonFile(segmentPath);
60+
// write partition info to new file.
61+
if (carbonFile.exists() && partionNames.size() > 0) {
62+
CarbonFile[] carbonFiles = carbonFile.listFiles(new CarbonFileFilter() {
63+
@Override public boolean accept(CarbonFile file) {
64+
return file.getName().startsWith(taskNo) && file.getName()
65+
.endsWith(CarbonTablePath.INDEX_FILE_EXT);
66+
}
67+
});
68+
if (carbonFiles != null && carbonFiles.length > 0) {
69+
PartitionMapper partitionMapper = new PartitionMapper();
70+
Map<String, List<String>> partitionMap = new HashMap<>();
71+
partitionMap.put(carbonFiles[0].getName(), partionNames);
72+
partitionMapper.setPartitionMap(partitionMap);
73+
String path = segmentPath + "/" + taskNo + CarbonTablePath.PARTITION_MAP_EXT;
74+
writePartitionFile(partitionMapper, path);
75+
}
76+
}
77+
}
78+
79+
private void writePartitionFile(PartitionMapper partitionMapper, String path) throws IOException {
80+
AtomicFileOperations fileWrite =
81+
new AtomicFileOperationsImpl(path, FileFactory.getFileType(path));
82+
BufferedWriter brWriter = null;
83+
DataOutputStream dataOutputStream = null;
84+
Gson gsonObjectToWrite = new Gson();
85+
try {
86+
dataOutputStream = fileWrite.openForWrite(FileWriteOperation.OVERWRITE);
87+
brWriter = new BufferedWriter(new OutputStreamWriter(dataOutputStream,
88+
Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)));
89+
90+
String metadataInstance = gsonObjectToWrite.toJson(partitionMapper);
91+
brWriter.write(metadataInstance);
92+
} finally {
93+
if (null != brWriter) {
94+
brWriter.flush();
95+
}
96+
CarbonUtil.closeStreams(brWriter);
97+
fileWrite.close();
98+
}
99+
}
100+
101+
/**
102+
* Merge all partition files in a segment to single file.
103+
*
104+
* @param segmentPath
105+
* @throws IOException
106+
*/
107+
public void mergePartitionMapFiles(String segmentPath) throws IOException {
108+
CarbonFile carbonFile = FileFactory.getCarbonFile(segmentPath);
109+
if (carbonFile.exists()) {
110+
CarbonFile[] carbonFiles = carbonFile.listFiles(new CarbonFileFilter() {
111+
@Override public boolean accept(CarbonFile file) {
112+
return file.getName().endsWith(CarbonTablePath.PARTITION_MAP_EXT);
113+
}
114+
});
115+
if (carbonFiles != null && carbonFiles.length > 1) {
116+
PartitionMapper partitionMapper = null;
117+
for (CarbonFile file : carbonFiles) {
118+
PartitionMapper localMapper = readPartitionMap(file.getAbsolutePath());
119+
if (partitionMapper == null && localMapper != null) {
120+
partitionMapper = localMapper;
121+
}
122+
if (localMapper != null) {
123+
partitionMapper = partitionMapper.merge(localMapper);
124+
}
125+
}
126+
if (partitionMapper != null) {
127+
String path = segmentPath + "/" + "mergedpartitions" + CarbonTablePath.PARTITION_MAP_EXT;
128+
writePartitionFile(partitionMapper, path);
129+
for (CarbonFile file : carbonFiles) {
130+
FileFactory.deleteAllCarbonFilesOfDir(file);
131+
}
132+
}
133+
}
134+
}
135+
}
136+
137+
/**
138+
* This method reads the partition file
139+
*
140+
* @param partitionMapPath
141+
* @return
142+
*/
143+
public PartitionMapper readPartitionMap(String partitionMapPath) {
144+
Gson gsonObjectToRead = new Gson();
145+
DataInputStream dataInputStream = null;
146+
BufferedReader buffReader = null;
147+
InputStreamReader inStream = null;
148+
PartitionMapper partitionMapper;
149+
AtomicFileOperations fileOperation =
150+
new AtomicFileOperationsImpl(partitionMapPath, FileFactory.getFileType(partitionMapPath));
151+
152+
try {
153+
if (!FileFactory.isFileExist(partitionMapPath, FileFactory.getFileType(partitionMapPath))) {
154+
return null;
155+
}
156+
dataInputStream = fileOperation.openForRead();
157+
inStream = new InputStreamReader(dataInputStream,
158+
Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET));
159+
buffReader = new BufferedReader(inStream);
160+
partitionMapper = gsonObjectToRead.fromJson(buffReader, PartitionMapper.class);
161+
} catch (IOException e) {
162+
return null;
163+
} finally {
164+
CarbonUtil.closeStreams(buffReader, inStream, dataInputStream);
165+
}
166+
167+
return partitionMapper;
168+
}
169+
170+
public static class PartitionMapper implements Serializable {
171+
172+
private static final long serialVersionUID = 3582245668420401089L;
173+
174+
private Map<String, List<String>> partitionMap;
175+
176+
public PartitionMapper merge(PartitionMapper mapper) {
177+
if (this == mapper) {
178+
return this;
179+
}
180+
if (partitionMap != null && mapper.partitionMap != null) {
181+
partitionMap.putAll(mapper.partitionMap);
182+
}
183+
if (partitionMap == null) {
184+
partitionMap = mapper.partitionMap;
185+
}
186+
return this;
187+
}
188+
189+
public Map<String, List<String>> getPartitionMap() {
190+
return partitionMap;
191+
}
192+
193+
public void setPartitionMap(Map<String, List<String>> partitionMap) {
194+
this.partitionMap = partitionMap;
195+
}
196+
}
197+
198+
}

‎core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java

+6-2
Original file line numberDiff line numberDiff line change
@@ -229,8 +229,10 @@ private org.apache.carbondata.format.PartitionType fromWrapperToExternalPartitio
229229
return org.apache.carbondata.format.PartitionType.RANGE;
230230
case RANGE_INTERVAL:
231231
return org.apache.carbondata.format.PartitionType.RANGE_INTERVAL;
232+
case NATIVE_HIVE:
233+
return org.apache.carbondata.format.PartitionType.NATIVE_HIVE;
232234
default:
233-
return org.apache.carbondata.format.PartitionType.HASH;
235+
return org.apache.carbondata.format.PartitionType.NATIVE_HIVE;
234236
}
235237
}
236238

@@ -559,8 +561,10 @@ private PartitionType fromExternalToWrapperPartitionType(
559561
return PartitionType.RANGE;
560562
case RANGE_INTERVAL:
561563
return PartitionType.RANGE_INTERVAL;
564+
case NATIVE_HIVE:
565+
return PartitionType.NATIVE_HIVE;
562566
default:
563-
return PartitionType.HASH;
567+
return PartitionType.NATIVE_HIVE;
564568
}
565569
}
566570

‎core/src/main/java/org/apache/carbondata/core/metadata/schema/partition/PartitionType.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -23,5 +23,6 @@ public enum PartitionType {
2323
RANGE,
2424
RANGE_INTERVAL,
2525
LIST,
26-
HASH
26+
HASH,
27+
NATIVE_HIVE
2728
}

‎core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java

+6
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.carbondata.core.metadata.encoder.Encoding;
2828
import org.apache.carbondata.core.metadata.schema.BucketingInfo;
2929
import org.apache.carbondata.core.metadata.schema.PartitionInfo;
30+
import org.apache.carbondata.core.metadata.schema.partition.PartitionType;
3031
import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
3132
import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
3233
import org.apache.carbondata.core.metadata.schema.table.column.CarbonImplicitDimension;
@@ -573,6 +574,11 @@ public boolean isPartitionTable() {
573574
return null != tablePartitionMap.get(getTableName());
574575
}
575576

577+
public boolean isHivePartitionTable() {
578+
PartitionInfo partitionInfo = tablePartitionMap.get(getTableName());
579+
return null != partitionInfo && partitionInfo.getPartitionType() == PartitionType.NATIVE_HIVE;
580+
}
581+
576582
/**
577583
* @return absolute table identifier
578584
*/

‎core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java

+4-76
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,8 @@
1818
package org.apache.carbondata.core.mutate;
1919

2020
import java.io.IOException;
21-
import java.text.ParseException;
22-
import java.text.SimpleDateFormat;
2321
import java.util.ArrayList;
2422
import java.util.Arrays;
25-
import java.util.Date;
2623
import java.util.HashMap;
2724
import java.util.List;
2825
import java.util.Map;
@@ -382,7 +379,7 @@ public static String getSegmentId(String segmentName) {
382379
return segmentName.split(CarbonCommonConstants.UNDERSCORE)[1];
383380
}
384381

385-
public static int getLatestTaskIdForSegment(String segmentId, CarbonTablePath tablePath) {
382+
public static long getLatestTaskIdForSegment(String segmentId, CarbonTablePath tablePath) {
386383
String segmentDirPath = tablePath.getCarbonDataDirectoryPath("0", segmentId);
387384

388385
// scan all the carbondata files and get the latest task ID.
@@ -397,11 +394,11 @@ public static int getLatestTaskIdForSegment(String segmentId, CarbonTablePath ta
397394
return false;
398395
}
399396
});
400-
int max = 0;
397+
long max = 0;
401398
if (null != dataFiles) {
402399
for (CarbonFile file : dataFiles) {
403-
int taskNumber =
404-
Integer.parseInt(CarbonTablePath.DataFileUtil.getTaskNo(file.getName()).split("_")[0]);
400+
long taskNumber =
401+
Long.parseLong(CarbonTablePath.DataFileUtil.getTaskNo(file.getName()).split("_")[0]);
405402
if (taskNumber > max) {
406403
max = taskNumber;
407404
}
@@ -412,75 +409,6 @@ public static int getLatestTaskIdForSegment(String segmentId, CarbonTablePath ta
412409

413410
}
414411

415-
public static String getLatestBlockNameForSegment(String segmentId, CarbonTablePath tablePath) {
416-
String segmentDirPath = tablePath.getCarbonDataDirectoryPath("0", segmentId);
417-
418-
// scan all the carbondata files and get the latest task ID.
419-
CarbonFile segment =
420-
FileFactory.getCarbonFile(segmentDirPath, FileFactory.getFileType(segmentDirPath));
421-
422-
CarbonFile[] dataFiles = segment.listFiles(new CarbonFileFilter() {
423-
@Override public boolean accept(CarbonFile file) {
424-
int max = 0;
425-
if (file.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT)) {
426-
int taskNumber = Integer.parseInt(CarbonTablePath.DataFileUtil.getTaskNo(file.getName()));
427-
if (taskNumber >= max) {
428-
return true;
429-
}
430-
}
431-
return false;
432-
}
433-
});
434-
435-
// get the latest among the data files. highest task number will be at the last.
436-
return dataFiles[dataFiles.length - 1].getName();
437-
}
438-
439-
/**
440-
* This method will convert a given timestamp to long value and then to string back
441-
*
442-
* @param factTimeStamp
443-
* @return
444-
*/
445-
public static String convertTimeStampToString(String factTimeStamp) {
446-
SimpleDateFormat parser = new SimpleDateFormat(CarbonCommonConstants.CARBON_TIMESTAMP);
447-
Date dateToStr = null;
448-
try {
449-
dateToStr = parser.parse(factTimeStamp);
450-
return Long.toString(dateToStr.getTime());
451-
} catch (ParseException e) {
452-
LOGGER.error("Cannot convert" + factTimeStamp + " to Time/Long type value" + e.getMessage());
453-
return null;
454-
}
455-
}
456-
457-
/**
458-
* This method will convert a given timestamp to long value and then to string back
459-
*
460-
* @param factTimeStamp
461-
* @return
462-
*/
463-
public static long convertTimeStampToLong(String factTimeStamp) {
464-
SimpleDateFormat parser = new SimpleDateFormat(CarbonCommonConstants.CARBON_TIMESTAMP_MILLIS);
465-
Date dateToStr = null;
466-
try {
467-
dateToStr = parser.parse(factTimeStamp);
468-
return dateToStr.getTime();
469-
} catch (ParseException e) {
470-
LOGGER.error("Cannot convert" + factTimeStamp + " to Time/Long type value" + e.getMessage());
471-
parser = new SimpleDateFormat(CarbonCommonConstants.CARBON_TIMESTAMP);
472-
try {
473-
dateToStr = parser.parse(factTimeStamp);
474-
return dateToStr.getTime();
475-
} catch (ParseException e1) {
476-
LOGGER
477-
.error("Cannot convert" + factTimeStamp + " to Time/Long type value" + e1.getMessage());
478-
return 0;
479-
}
480-
}
481-
}
482-
483-
484412
/**
485413
* Handling of the clean up of old carbondata files, index files , delte delta,
486414
* update status files.

0 commit comments

Comments
 (0)
Please sign in to comment.