Skip to content

Commit 0c8fa59

Browse files
anubhav100 authored and ravipesala committed
[CARBONDATA-1731,CARBONDATA-1728] Update,Delete fails incorrectly with error
[QueryExecution] Fetch BlockletId in Executor. Currently the blocklet IDs are not propagated to the executors properly. Because of this, the implicit column (tupleId) formed in the executors can wrongly contain duplicate tupleIds. For example, given a block with two blocklets (0 and 1), each executor task picks one blocklet; since the ID is not propagated, every executor labels its blocklet as ID 0. The solution is to propagate the blocklet IDs from the driver to the executors. This closes apache#1719
1 parent f635106 commit 0c8fa59

File tree

17 files changed

+228
-63
lines changed

17 files changed

+228
-63
lines changed

core/src/main/java/org/apache/carbondata/core/datastore/DataRefNode.java

+6
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,12 @@ public interface DataRefNode {
5151
*/
5252
long nodeNumber();
5353

54+
/**
55+
* Method is used for retrieving the BlockletId.
56+
* @return the blockletid related to the data block.
57+
*/
58+
String blockletId();
59+
5460
/**
5561
* This method will be used to get the max value of all the columns this can
5662
* be used in case of filter query

core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java

+44-9
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,11 @@ public class TableBlockInfo implements Distributable, Serializable {
6464
*/
6565
private String segmentId;
6666

67+
/**
68+
* id of the Blocklet.
69+
*/
70+
private String blockletId;
71+
6772
private String[] locations;
6873

6974
private ColumnarFormatVersion version;
@@ -76,7 +81,7 @@ public class TableBlockInfo implements Distributable, Serializable {
7681
* map of block location and storage id
7782
*/
7883
private Map<String, String> blockStorageIdMap =
79-
new HashMap<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
84+
new HashMap<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
8085

8186
/**
8287
* delete delta files path for this block
@@ -85,9 +90,11 @@ public class TableBlockInfo implements Distributable, Serializable {
8590

8691
private BlockletDetailInfo detailInfo;
8792

88-
public TableBlockInfo(String filePath, long blockOffset, String segmentId, String[] locations,
89-
long blockLength, ColumnarFormatVersion version, String[] deletedDeltaFilePath) {
93+
public TableBlockInfo(String filePath, long blockOffset, String segmentId,
94+
String[] locations, long blockLength, ColumnarFormatVersion version,
95+
String[] deletedDeltaFilePath) {
9096
this.filePath = FileFactory.getUpdatedFilePath(filePath);
97+
this.blockletId = "0";
9198
this.blockOffset = blockOffset;
9299
this.segmentId = segmentId;
93100
this.locations = locations;
@@ -113,10 +120,29 @@ public TableBlockInfo() {
113120
public TableBlockInfo(String filePath, long blockOffset, String segmentId, String[] locations,
114121
long blockLength, BlockletInfos blockletInfos, ColumnarFormatVersion version,
115122
String[] deletedDeltaFilePath) {
116-
this(filePath, blockOffset, segmentId, locations, blockLength, version, deletedDeltaFilePath);
123+
this(filePath, blockOffset, segmentId, locations, blockLength, version,
124+
deletedDeltaFilePath);
117125
this.blockletInfos = blockletInfos;
118126
}
119127

128+
/**
129+
* constructor to initialize the TableBlockInfo with blockletIds
130+
*
131+
* @param filePath
132+
* @param blockOffset
133+
* @param segmentId
134+
* @param locations
135+
* @param blockLength
136+
* @param blockletInfos
137+
*/
138+
public TableBlockInfo(String filePath, String blockletId, long blockOffset, String segmentId,
139+
String[] locations, long blockLength, BlockletInfos blockletInfos,
140+
ColumnarFormatVersion version, String[] deletedDeltaFilePath) {
141+
this(filePath, blockOffset, segmentId, locations, blockLength, blockletInfos, version,
142+
deletedDeltaFilePath);
143+
this.blockletId = blockletId;
144+
}
145+
120146
/**
121147
* constructor to initialize the TableBlockInfo with blockStorageIdMap
122148
*
@@ -129,11 +155,12 @@ public TableBlockInfo(String filePath, long blockOffset, String segmentId, Strin
129155
* @param version
130156
* @param blockStorageIdMap
131157
*/
132-
public TableBlockInfo(String filePath, long blockOffset, String segmentId, String[] locations,
133-
long blockLength, BlockletInfos blockletInfos, ColumnarFormatVersion version,
134-
Map<String, String> blockStorageIdMap, String[] deletedDeltaFilePath) {
135-
this(filePath, blockOffset, segmentId, locations, blockLength, blockletInfos, version,
136-
deletedDeltaFilePath);
158+
public TableBlockInfo(String filePath, String blockletId, long blockOffset, String segmentId,
159+
String[] locations, long blockLength, BlockletInfos blockletInfos,
160+
ColumnarFormatVersion version, Map<String, String> blockStorageIdMap,
161+
String[] deletedDeltaFilePath) {
162+
this(filePath, blockletId, blockOffset, segmentId, locations, blockLength, blockletInfos,
163+
version, deletedDeltaFilePath);
137164
this.blockStorageIdMap = blockStorageIdMap;
138165
}
139166

@@ -356,4 +383,12 @@ public BlockletDetailInfo getDetailInfo() {
356383
public void setDetailInfo(BlockletDetailInfo detailInfo) {
357384
this.detailInfo = detailInfo;
358385
}
386+
387+
public String getBlockletId() {
388+
return blockletId;
389+
}
390+
391+
public void setBlockletId(String blockletId) {
392+
this.blockletId = blockletId;
393+
}
359394
}

core/src/main/java/org/apache/carbondata/core/datastore/impl/btree/BTreeNonLeafNode.java

+4
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,10 @@ public BTreeNonLeafNode() {
135135
throw new UnsupportedOperationException("Unsupported operation");
136136
}
137137

138+
@Override public String blockletId() {
139+
throw new UnsupportedOperationException("Unsupported operation");
140+
}
141+
138142
/**
139143
* This method will be used to get the max value of all the columns this can
140144
* be used in case of filter query

core/src/main/java/org/apache/carbondata/core/datastore/impl/btree/BlockBTreeLeafNode.java

+8
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,14 @@ public TableBlockInfo getTableBlockInfo() {
5959
return blockInfo.getTableBlockInfo();
6060
}
6161

62+
/**
63+
* Below method is supposed to return the Blocklet ID.
64+
* @return
65+
*/
66+
@Override public String blockletId() {
67+
return blockInfo.getTableBlockInfo().getDetailInfo().getBlockletId().toString();
68+
}
69+
6270
/**
6371
* number of pages in blocklet
6472
* @return

core/src/main/java/org/apache/carbondata/core/datastore/impl/btree/BlockletBTreeLeafNode.java

+4
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,10 @@ public BlockletBTreeLeafNode(BTreeBuilderInfo builderInfos, int leafIndex, long
9898
}
9999
}
100100

101+
@Override public String blockletId() {
102+
return "0";
103+
}
104+
101105
/**
102106
* Below method will be used to get the dimension chunks
103107
*

core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailInfo.java

+12
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ public class BlockletDetailInfo implements Serializable, Writable {
3636

3737
private short versionNumber;
3838

39+
private short blockletId;
40+
3941
private int[] dimLens;
4042

4143
private long schemaUpdatedTimeStamp;
@@ -94,6 +96,7 @@ public void setSchemaUpdatedTimeStamp(long schemaUpdatedTimeStamp) {
9496
out.writeInt(rowCount);
9597
out.writeShort(pagesCount);
9698
out.writeShort(versionNumber);
99+
out.writeShort(blockletId);
97100
out.writeShort(dimLens.length);
98101
for (int i = 0; i < dimLens.length; i++) {
99102
out.writeInt(dimLens[i]);
@@ -106,6 +109,7 @@ public void setSchemaUpdatedTimeStamp(long schemaUpdatedTimeStamp) {
106109
rowCount = in.readInt();
107110
pagesCount = in.readShort();
108111
versionNumber = in.readShort();
112+
blockletId = in.readShort();
109113
dimLens = new int[in.readShort()];
110114
for (int i = 0; i < dimLens.length; i++) {
111115
dimLens[i] = in.readInt();
@@ -114,4 +118,12 @@ public void setSchemaUpdatedTimeStamp(long schemaUpdatedTimeStamp) {
114118
blockletInfo = new BlockletInfo();
115119
blockletInfo.readFields(in);
116120
}
121+
122+
public Short getBlockletId() {
123+
return blockletId;
124+
}
125+
126+
public void setBlockletId(Short blockletId) {
127+
this.blockletId = blockletId;
128+
}
117129
}

core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java

+1
Original file line numberDiff line numberDiff line change
@@ -599,6 +599,7 @@ private ExtendedBlocklet createBlocklet(DataMapRow row, int blockletId) {
599599
detailInfo.setRowCount(row.getInt(ROW_COUNT_INDEX));
600600
detailInfo.setPagesCount(row.getShort(PAGE_COUNT_INDEX));
601601
detailInfo.setVersionNumber(row.getShort(VERSION_INDEX));
602+
detailInfo.setBlockletId((short) blockletId);
602603
detailInfo.setDimLens(columnCardinality);
603604
detailInfo.setSchemaUpdatedTimeStamp(row.getLong(SCHEMA_UPADATED_TIME_INDEX));
604605
BlockletInfo blockletInfo = new BlockletInfo();

core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataRefNodeWrapper.java

+5
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ public BlockletDataRefNodeWrapper(List<TableBlockInfo> blockInfos, int index,
5353
BlockletDetailInfo detailInfo = blockInfo.getDetailInfo();
5454
detailInfo.getBlockletInfo().setNumberOfRows(detailInfo.getRowCount());
5555
detailInfo.getBlockletInfo().setNumberOfPages(detailInfo.getPagesCount());
56+
detailInfo.setBlockletId(blockInfo.getDetailInfo().getBlockletId());
5657
int[] pageRowCount = new int[detailInfo.getPagesCount()];
5758
int numberOfPagesCompletelyFilled = detailInfo.getRowCount()
5859
/ CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT;
@@ -86,6 +87,10 @@ public BlockletDataRefNodeWrapper(List<TableBlockInfo> blockInfos, int index,
8687
return index;
8788
}
8889

90+
@Override public String blockletId() {
91+
return blockInfos.get(index).getDetailInfo().getBlockletId().toString();
92+
}
93+
8994
@Override public byte[][] getColumnsMaxValue() {
9095
return null;
9196
}

core/src/main/java/org/apache/carbondata/core/scan/scanner/AbstractBlockletScanner.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ public AbstractBlockletScanner(BlockExecutionInfo tableBlockExecutionInfos) {
7676
totalPagesScanned.getCount() + blocksChunkHolder.getDataBlock().numberOfPages());
7777
scannedResult.setBlockletId(
7878
blockExecutionInfo.getBlockId() + CarbonCommonConstants.FILE_SEPARATOR + blocksChunkHolder
79-
.getDataBlock().nodeNumber());
79+
.getDataBlock().blockletId());
8080
DimensionRawColumnChunk[] dimensionRawColumnChunks =
8181
blocksChunkHolder.getDimensionRawDataChunk();
8282
DimensionColumnDataChunk[][] dimensionColumnDataChunks =

core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/FilterScanner.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,7 @@ private AbstractScannedResult fillScannedResult(BlocksChunkHolder blocksChunkHol
175175
AbstractScannedResult scannedResult = new FilterQueryScannedResult(blockExecutionInfo);
176176
scannedResult.setBlockletId(
177177
blockExecutionInfo.getBlockId() + CarbonCommonConstants.FILE_SEPARATOR + blocksChunkHolder
178-
.getDataBlock().nodeNumber());
178+
.getDataBlock().blockletId());
179179
// valid scanned blocklet
180180
QueryStatistic validScannedBlockletStatistic = queryStatisticsModel.getStatisticsTypeAndObjMap()
181181
.get(QueryStatisticsConstants.VALID_SCAN_BLOCKLET_NUM);

hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java

+18-16
Original file line numberDiff line numberDiff line change
@@ -409,7 +409,7 @@ private List<InputSplit> getSplitsInternal(JobContext job) throws IOException {
409409
if (segmentId.equals(CarbonCommonConstants.INVALID_SEGMENT_ID)) {
410410
continue;
411411
}
412-
carbonSplits.add(CarbonInputSplit.from(segmentId, fileSplit,
412+
carbonSplits.add(CarbonInputSplit.from(segmentId, "0", fileSplit,
413413
ColumnarFormatVersion.valueOf(
414414
CarbonCommonConstants.CARBON_DATA_FILE_DEFAULT_VERSION)));
415415
}
@@ -452,9 +452,9 @@ private List<InputSplit> getSplits(JobContext job, FilterResolverIntf filterReso
452452
Boolean isIUDTable = false;
453453

454454
AbsoluteTableIdentifier absoluteTableIdentifier =
455-
getOrCreateCarbonTable(job.getConfiguration()).getAbsoluteTableIdentifier();
455+
getOrCreateCarbonTable(job.getConfiguration()).getAbsoluteTableIdentifier();
456456
SegmentUpdateStatusManager updateStatusManager =
457-
new SegmentUpdateStatusManager(absoluteTableIdentifier);
457+
new SegmentUpdateStatusManager(absoluteTableIdentifier);
458458

459459
isIUDTable = (updateStatusManager.getUpdateStatusDetails().length != 0);
460460

@@ -476,22 +476,23 @@ private List<InputSplit> getSplits(JobContext job, FilterResolverIntf filterReso
476476
// In case IUD is not performed in this table avoid searching for
477477
// invalidated blocks.
478478
if (CarbonUtil
479-
.isInvalidTableBlock(tableBlockInfo.getSegmentId(), tableBlockInfo.getFilePath(),
480-
invalidBlockVOForSegmentId, updateStatusManager)) {
479+
.isInvalidTableBlock(tableBlockInfo.getSegmentId(), tableBlockInfo.getFilePath(),
480+
invalidBlockVOForSegmentId, updateStatusManager)) {
481481
continue;
482482
}
483483
// When iud is done then only get delete delta files for a block
484484
try {
485485
deleteDeltaFilePath =
486-
updateStatusManager.getDeleteDeltaFilePath(tableBlockInfo.getFilePath());
486+
updateStatusManager.getDeleteDeltaFilePath(tableBlockInfo.getFilePath());
487487
} catch (Exception e) {
488488
throw new IOException(e);
489489
}
490490
}
491-
result.add(new CarbonInputSplit(segmentNo, new Path(tableBlockInfo.getFilePath()),
492-
tableBlockInfo.getBlockOffset(), tableBlockInfo.getBlockLength(),
493-
tableBlockInfo.getLocations(), tableBlockInfo.getBlockletInfos().getNoOfBlockLets(),
494-
tableBlockInfo.getVersion(), deleteDeltaFilePath));
491+
result.add(new CarbonInputSplit(segmentNo, tableBlockInfo.getBlockletId(),
492+
new Path(tableBlockInfo.getFilePath()), tableBlockInfo.getBlockOffset(),
493+
tableBlockInfo.getBlockLength(), tableBlockInfo.getLocations(),
494+
tableBlockInfo.getBlockletInfos().getNoOfBlockLets(), tableBlockInfo.getVersion(),
495+
deleteDeltaFilePath));
495496
}
496497
}
497498
return result;
@@ -583,7 +584,7 @@ private List<TableBlockInfo> getTableBlockInfo(JobContext job,
583584
Set<SegmentTaskIndexStore.TaskBucketHolder> taskKeys, UpdateVO updateDetails,
584585
SegmentUpdateStatusManager updateStatusManager,
585586
String segmentId, Set<SegmentTaskIndexStore.TaskBucketHolder> validTaskKeys)
586-
throws IOException {
587+
throws IOException {
587588
List<TableBlockInfo> tableBlockInfoList = new ArrayList<TableBlockInfo>();
588589

589590
// get file location of all files of given segment
@@ -603,7 +604,8 @@ private List<TableBlockInfo> getTableBlockInfo(JobContext job,
603604
BlockletInfos blockletInfos = new BlockletInfos(carbonInputSplit.getNumberOfBlocklets(), 0,
604605
carbonInputSplit.getNumberOfBlocklets());
605606
tableBlockInfoList.add(
606-
new TableBlockInfo(carbonInputSplit.getPath().toString(), carbonInputSplit.getStart(),
607+
new TableBlockInfo(carbonInputSplit.getPath().toString(),
608+
carbonInputSplit.getBlockletId(), carbonInputSplit.getStart(),
607609
tableSegmentUniqueIdentifier.getSegmentId(), carbonInputSplit.getLocations(),
608610
carbonInputSplit.getLength(), blockletInfos, carbonInputSplit.getVersion(),
609611
carbonInputSplit.getBlockStorageIdMap(), carbonInputSplit.getDeleteDeltaFiles()));
@@ -613,9 +615,9 @@ private List<TableBlockInfo> getTableBlockInfo(JobContext job,
613615
}
614616

615617
private boolean isValidBlockBasedOnUpdateDetails(
616-
Set<SegmentTaskIndexStore.TaskBucketHolder> taskKeys, CarbonInputSplit carbonInputSplit,
617-
UpdateVO updateDetails, SegmentUpdateStatusManager updateStatusManager, String segmentId,
618-
Set<SegmentTaskIndexStore.TaskBucketHolder> validTaskKeys) {
618+
Set<SegmentTaskIndexStore.TaskBucketHolder> taskKeys, CarbonInputSplit carbonInputSplit,
619+
UpdateVO updateDetails, SegmentUpdateStatusManager updateStatusManager, String segmentId,
620+
Set<SegmentTaskIndexStore.TaskBucketHolder> validTaskKeys) {
619621
String taskID = null;
620622
if (null != carbonInputSplit) {
621623
if (!updateStatusManager.isBlockValid(segmentId, carbonInputSplit.getPath().getName())) {
@@ -741,7 +743,7 @@ public BlockMappingVO getBlockRowCount(JobContext job,
741743
updateStatusManager);
742744
for (Map.Entry<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> taskMap :
743745
taskAbstractIndexMap
744-
.entrySet()) {
746+
.entrySet()) {
745747
AbstractIndex taskAbstractIndex = taskMap.getValue();
746748
countOfBlocksInSeg += new BlockLevelTraverser()
747749
.getBlockRowMapping(taskAbstractIndex, blockRowCountMapping, eachValidSeg,

0 commit comments

Comments
 (0)