Commit 6e26954

remove list files while query and invalid cache
1 parent 8691cb7 commit 6e26954

File tree

core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
core/src/main/java/org/apache/carbondata/core/datastore/filesystem/CarbonFile.java
core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java
core/src/main/java/org/apache/carbondata/core/indexstore/BlockletIndexStore.java
core/src/main/java/org/apache/carbondata/core/util/BlockletIndexUtil.java
integration/spark/src/main/java/org/apache/spark/sql/secondaryindex/jobs/BlockletIndexInputFormat.java
integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
integration/spark/src/test/scala/org/apache/carbondata/sql/commands/TestCarbonShowCacheCommand.scala

8 files changed, +36 -102 lines

core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java

-14
@@ -44,7 +44,6 @@
4444
import org.apache.hadoop.fs.FileSystem;
4545
import org.apache.hadoop.fs.LocatedFileStatus;
4646
import org.apache.hadoop.fs.Path;
47-
import org.apache.hadoop.fs.PathFilter;
4847
import org.apache.hadoop.fs.RemoteIterator;
4948
import org.apache.hadoop.fs.permission.FsAction;
5049
import org.apache.hadoop.fs.permission.FsPermission;
@@ -480,19 +479,6 @@ public List<CarbonFile> listFiles(Boolean recursive) throws IOException {
480479
return getFiles(listStatus);
481480
}
482481

483-
@Override
484-
public CarbonFile[] locationAwareListFiles(PathFilter pathFilter) throws IOException {
485-
List<FileStatus> listStatus = new ArrayList<>();
486-
RemoteIterator<LocatedFileStatus> iter = fileSystem.listLocatedStatus(path);
487-
while (iter.hasNext()) {
488-
LocatedFileStatus fileStatus = iter.next();
489-
if (pathFilter.accept(fileStatus.getPath()) && fileStatus.getLen() > 0) {
490-
listStatus.add(fileStatus);
491-
}
492-
}
493-
return getFiles(listStatus.toArray(new FileStatus[listStatus.size()]));
494-
}
495-
496482
protected List<CarbonFile> getFiles(RemoteIterator<LocatedFileStatus> listStatus)
497483
throws IOException {
498484
List<CarbonFile> carbonFiles = new ArrayList<>();
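
The removed `locationAwareListFiles` batched a single `listLocatedStatus` call over a whole directory to prefetch file sizes and block locations. After this commit, callers resolve that metadata one file at a time. A minimal sketch of the per-file equivalent against the plain Hadoop `FileSystem` API, assuming an already-initialized file system (the class and method names here are illustrative, not part of the commit):

    import java.io.IOException;

    import org.apache.hadoop.fs.BlockLocation;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public final class PerFileLocationLookup {
      // One getFileStatus call per file instead of a single listLocatedStatus
      // over the directory; a stale or deleted file simply fails the lookup.
      static String[] hostsFor(FileSystem fs, Path file) throws IOException {
        FileStatus status = fs.getFileStatus(file);
        BlockLocation[] blocks = fs.getFileBlockLocations(status, 0, status.getLen());
        return blocks.length > 0 ? blocks[0].getHosts() : new String[0];
      }
    }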

core/src/main/java/org/apache/carbondata/core/datastore/filesystem/CarbonFile.java

-6
@@ -22,7 +22,6 @@
2222
import java.io.IOException;
2323
import java.util.List;
2424

25-
import org.apache.hadoop.fs.PathFilter;
2625
import org.apache.hadoop.fs.permission.FsPermission;
2726

2827
public interface CarbonFile {
@@ -41,11 +40,6 @@ public interface CarbonFile {
4140

4241
List<CarbonFile> listDirs() throws IOException;
4342

44-
/**
45-
* It returns list of files with location details.
46-
*/
47-
CarbonFile[] locationAwareListFiles(PathFilter pathFilter) throws IOException;
48-
4943
String getName();
5044

5145
boolean isDirectory();

core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java

-6
@@ -49,7 +49,6 @@
4949
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream;
5050
import org.apache.commons.io.FileUtils;
5151
import org.apache.hadoop.fs.Path;
52-
import org.apache.hadoop.fs.PathFilter;
5352
import org.apache.hadoop.fs.permission.FsPermission;
5453
import org.apache.log4j.Logger;
5554
import org.xerial.snappy.SnappyInputStream;
@@ -442,11 +441,6 @@ public boolean createNewLockFile() throws IOException {
442441
return file.createNewFile();
443442
}
444443

445-
@Override
446-
public CarbonFile[] locationAwareListFiles(PathFilter pathFilter) {
447-
return listFiles();
448-
}
449-
450444
@Override
451445
public String[] getLocations() {
452446
return new String[]{"localhost"};

core/src/main/java/org/apache/carbondata/core/indexstore/BlockletIndexStore.java

+3 -25
@@ -19,7 +19,6 @@
1919

2020
import java.io.IOException;
2121
import java.util.ArrayList;
22-
import java.util.HashMap;
2322
import java.util.HashSet;
2423
import java.util.List;
2524
import java.util.Map;
@@ -67,11 +66,6 @@ public BlockletIndexStore(CarbonLRUCache lruCache) {
6766

6867
@Override
6968
public BlockletIndexWrapper get(TableBlockIndexUniqueIdentifierWrapper identifierWrapper) {
70-
return get(identifierWrapper, null);
71-
}
72-
73-
public BlockletIndexWrapper get(TableBlockIndexUniqueIdentifierWrapper identifierWrapper,
74-
Map<String, Map<String, BlockMetaInfo>> segInfoCache) {
7569
TableBlockIndexUniqueIdentifier identifier =
7670
identifierWrapper.getTableBlockIndexUniqueIdentifier();
7771
String lruCacheKey = identifier.getUniqueTableSegmentIdentifier();
@@ -83,24 +77,11 @@ public BlockletIndexWrapper get(TableBlockIndexUniqueIdentifierWrapper identifie
8377
SegmentIndexFileStore indexFileStore =
8478
new SegmentIndexFileStore(identifierWrapper.getConfiguration());
8579
Set<String> filesRead = new HashSet<>();
86-
String segmentFilePath = identifier.getIndexFilePath();
87-
if (segInfoCache == null) {
88-
segInfoCache = new HashMap<>();
89-
}
90-
Map<String, BlockMetaInfo> carbonDataFileBlockMetaInfoMapping =
91-
segInfoCache.get(segmentFilePath);
92-
if (carbonDataFileBlockMetaInfoMapping == null) {
93-
carbonDataFileBlockMetaInfoMapping =
94-
BlockletIndexUtil.createCarbonDataFileBlockMetaInfoMapping(segmentFilePath,
95-
identifierWrapper.getConfiguration());
96-
segInfoCache.put(segmentFilePath, carbonDataFileBlockMetaInfoMapping);
97-
}
9880
// if the identifier is not a merge file we can directly load the indexes
9981
if (identifier.getMergeIndexFileName() == null) {
10082
List<DataFileFooter> indexInfos = new ArrayList<>();
10183
Map<String, BlockMetaInfo> blockMetaInfoMap = BlockletIndexUtil
102-
.getBlockMetaInfoMap(identifierWrapper, indexFileStore, filesRead,
103-
carbonDataFileBlockMetaInfoMapping, indexInfos);
84+
.getBlockMetaInfoMap(identifierWrapper, indexFileStore, filesRead, indexInfos);
10485
BlockIndex blockIndex =
10586
loadAndGetIndex(identifier, indexFileStore, blockMetaInfoMap,
10687
identifierWrapper.getCarbonTable(),
@@ -120,8 +101,7 @@ public BlockletIndexWrapper get(TableBlockIndexUniqueIdentifierWrapper identifie
120101
List<DataFileFooter> indexInfos = new ArrayList<>();
121102
Map<String, BlockMetaInfo> blockMetaInfoMap = BlockletIndexUtil.getBlockMetaInfoMap(
122103
new TableBlockIndexUniqueIdentifierWrapper(blockIndexUniqueIdentifier,
123-
identifierWrapper.getCarbonTable()), indexFileStore, filesRead,
124-
carbonDataFileBlockMetaInfoMapping, indexInfos);
104+
identifierWrapper.getCarbonTable()), indexFileStore, filesRead, indexInfos);
125105
if (!blockMetaInfoMap.isEmpty()) {
126106
BlockIndex blockIndex =
127107
loadAndGetIndex(blockIndexUniqueIdentifier, indexFileStore, blockMetaInfoMap,
@@ -157,8 +137,6 @@ public BlockletIndexWrapper get(TableBlockIndexUniqueIdentifierWrapper identifie
157137
public List<BlockletIndexWrapper> getAll(
158138
List<TableBlockIndexUniqueIdentifierWrapper> tableSegmentUniqueIdentifiers)
159139
throws IOException {
160-
Map<String, Map<String, BlockMetaInfo>> segInfoCache =
161-
new HashMap<String, Map<String, BlockMetaInfo>>();
162140

163141
List<BlockletIndexWrapper> blockletIndexWrappers =
164142
new ArrayList<>(tableSegmentUniqueIdentifiers.size());
@@ -177,7 +155,7 @@ public List<BlockletIndexWrapper> getAll(
177155
}
178156
if (missedIdentifiersWrapper.size() > 0) {
179157
for (TableBlockIndexUniqueIdentifierWrapper identifierWrapper : missedIdentifiersWrapper) {
180-
blockletIndexWrapper = get(identifierWrapper, segInfoCache);
158+
blockletIndexWrapper = get(identifierWrapper);
181159
blockletIndexWrappers.add(blockletIndexWrapper);
182160
}
183161
}
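
With the two-argument overload gone, `getAll` no longer threads a shared listing map through each lookup; every cache miss goes through the single-argument `get`, which resolves block metadata per index entry. A rough caller-side sketch of the simplified fan-out, with `store` (obtained from `CacheProvider` in real code) and `identifiers` assumed to be in scope:

    // Sketch only: setup of `store` and `identifiers` is assumed.
    List<BlockletIndexWrapper> wrappers = new ArrayList<>(identifiers.size());
    for (TableBlockIndexUniqueIdentifierWrapper identifierWrapper : identifiers) {
      // No shared Map<String, Map<String, BlockMetaInfo>> is reused across
      // identifiers anymore, so a lookup cannot observe a stale directory listing.
      wrappers.add(store.get(identifierWrapper));
    }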

core/src/main/java/org/apache/carbondata/core/util/BlockletIndexUtil.java

+15 -43
@@ -31,16 +31,13 @@
3131
import java.util.List;
3232
import java.util.Map;
3333
import java.util.Set;
34-
import java.util.TreeMap;
3534

3635
import org.apache.carbondata.common.logging.LogServiceFactory;
3736
import org.apache.carbondata.core.constants.CarbonCommonConstants;
3837
import org.apache.carbondata.core.datastore.block.SegmentProperties;
3938
import org.apache.carbondata.core.datastore.block.TableBlockInfo;
4039
import org.apache.carbondata.core.datastore.compression.CompressorFactory;
41-
import org.apache.carbondata.core.datastore.filesystem.AbstractDFSCarbonFile;
4240
import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
43-
import org.apache.carbondata.core.datastore.filesystem.S3CarbonFile;
4441
import org.apache.carbondata.core.datastore.impl.FileFactory;
4542
import org.apache.carbondata.core.index.Segment;
4643
import org.apache.carbondata.core.indexstore.BlockMetaInfo;
@@ -61,9 +58,6 @@
6158
import org.apache.carbondata.core.util.path.CarbonTablePath;
6259

6360
import org.apache.commons.io.FilenameUtils;
64-
import org.apache.hadoop.conf.Configuration;
65-
import org.apache.hadoop.fs.Path;
66-
import org.apache.hadoop.fs.PathFilter;
6761
import org.apache.log4j.Logger;
6862

6963
public class BlockletIndexUtil {
@@ -79,8 +73,7 @@ public static Set<TableBlockIndexUniqueIdentifier> getSegmentUniqueIdentifiers(S
7973

8074
public static Map<String, BlockMetaInfo> getBlockMetaInfoMap(
8175
TableBlockIndexUniqueIdentifierWrapper identifierWrapper,
82-
SegmentIndexFileStore indexFileStore, Set<String> filesRead,
83-
Map<String, BlockMetaInfo> fileNameToMetaInfoMapping, List<DataFileFooter> indexInfos)
76+
SegmentIndexFileStore indexFileStore, Set<String> filesRead, List<DataFileFooter> indexInfos)
8477
throws IOException {
8578
boolean isTransactionalTable = true;
8679
TableBlockIndexUniqueIdentifier identifier =
@@ -130,8 +123,7 @@ public static Map<String, BlockMetaInfo> getBlockMetaInfoMap(
130123
}
131124
String blockPath = footer.getBlockInfo().getFilePath();
132125
if (null == blockMetaInfoMap.get(blockPath)) {
133-
BlockMetaInfo blockMetaInfo = createBlockMetaInfo(
134-
fileNameToMetaInfoMapping, footer.getBlockInfo());
126+
BlockMetaInfo blockMetaInfo = createBlockMetaInfo(footer.getBlockInfo());
135127
// if blockMetaInfo is null that means the file has been deleted from the file system.
136128
// This can happen in case IUD scenarios where after deleting or updating the data the
137129
// complete block is deleted but the entry still exists in index or merge index file
@@ -143,38 +135,7 @@ public static Map<String, BlockMetaInfo> getBlockMetaInfoMap(
143135
return blockMetaInfoMap;
144136
}
145137

146-
/**
147-
* This method will create file name to block Meta Info Mapping. This method will reduce the
148-
* number of nameNode calls and using this method one namenode will fetch 1000 entries
149-
*
150-
* @param segmentFilePath
151-
* @return
152-
* @throws IOException
153-
*/
154-
public static Map<String, BlockMetaInfo> createCarbonDataFileBlockMetaInfoMapping(
155-
String segmentFilePath, Configuration configuration) throws IOException {
156-
Map<String, BlockMetaInfo> fileNameToMetaInfoMapping = new TreeMap();
157-
CarbonFile carbonFile = FileFactory.getCarbonFile(segmentFilePath, configuration);
158-
if (carbonFile instanceof AbstractDFSCarbonFile && !(carbonFile instanceof S3CarbonFile)) {
159-
PathFilter pathFilter = new PathFilter() {
160-
@Override
161-
public boolean accept(Path path) {
162-
return CarbonTablePath.isCarbonDataFile(path.getName());
163-
}
164-
};
165-
CarbonFile[] carbonFiles = carbonFile.locationAwareListFiles(pathFilter);
166-
for (CarbonFile file : carbonFiles) {
167-
String[] location = file.getLocations();
168-
long len = file.getSize();
169-
BlockMetaInfo blockMetaInfo = new BlockMetaInfo(location, len);
170-
fileNameToMetaInfoMapping.put(file.getPath(), blockMetaInfo);
171-
}
172-
}
173-
return fileNameToMetaInfoMapping;
174-
}
175-
176-
private static BlockMetaInfo createBlockMetaInfo(
177-
Map<String, BlockMetaInfo> fileNameToMetaInfoMapping, TableBlockInfo blockInfo)
138+
private static BlockMetaInfo createBlockMetaInfo(TableBlockInfo blockInfo)
178139
throws IOException {
179140
String carbonDataFile = blockInfo.getFilePath();
180141
FileFactory.FileType fileType = FileFactory.getFileType(carbonDataFile);
@@ -193,7 +154,18 @@ private static BlockMetaInfo createBlockMetaInfo(
193154
CarbonFile carbonFile = FileFactory.getCarbonFile(carbonDataFile);
194155
return new BlockMetaInfo(new String[] { "localhost" }, carbonFile.getSize());
195156
default:
196-
return fileNameToMetaInfoMapping.get(FileFactory.getFormattedPath(carbonDataFile));
157+
// Here, it gets carbondata file path from footer and creates BlockMetaInfo for each file.
158+
// It creates BlockMetaInfo only for valid files.
159+
// Example: Assume a single partition with 1000 carbondata files.
160+
// Perform 1st update: adds 900 new carbondata files to same folder.
161+
// Perform 2nd update (same update query): adds another 900 carbondata files.
162+
// Now the files added by 1st update are invalid.
163+
// Perform query: Creates BlockMetaInfo for valid identifiers.
164+
if (!FileFactory.isFileExist(carbonDataFile)) {
165+
return null;
166+
}
167+
CarbonFile file = FileFactory.getCarbonFile(FileFactory.getFormattedPath(carbonDataFile));
168+
return new BlockMetaInfo(file.getLocations(), file.getSize());
197169
}
198170
}
199171

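The new `default` branch stats exactly the files referenced by valid index entries, instead of consulting a mapping built from a full directory listing. A small worked version of the example in the added comments (numbers from the diff; the arithmetic is illustrative):

    // Directory contents after the scenario described in the diff comments:
    int initialLoad = 1000;  // original carbondata files
    int firstUpdate = 900;   // added by the 1st update, later invalidated
    int secondUpdate = 900;  // added by the 2nd update, currently valid
    int filesOnDisk = initialLoad + firstUpdate + secondUpdate;  // 2800 files listed
    // Old path: the directory listing stat'ed and cached BlockMetaInfo for all
    // 2800 files, including the 900 invalid ones left behind by the 1st update.
    // New path: one isFileExist + getCarbonFile call per file named in a valid
    // index footer, so invalid files are never read or cached.

The trade-off is more per-file NameNode calls on very wide segments, in exchange for never caching metadata for stale files.
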
integration/spark/src/main/java/org/apache/spark/sql/secondaryindex/jobs/BlockletIndexInputFormat.java

+1 -7
@@ -20,11 +20,9 @@
2020
import java.io.IOException;
2121
import java.io.Serializable;
2222
import java.util.ArrayList;
23-
import java.util.HashMap;
2423
import java.util.HashSet;
2524
import java.util.Iterator;
2625
import java.util.List;
27-
import java.util.Map;
2826
import java.util.Set;
2927

3028
import org.apache.carbondata.common.logging.LogServiceFactory;
@@ -38,7 +36,6 @@
3836
import org.apache.carbondata.core.index.dev.CacheableIndex;
3937
import org.apache.carbondata.core.index.dev.IndexFactory;
4038
import org.apache.carbondata.core.index.dev.expr.IndexExprWrapper;
41-
import org.apache.carbondata.core.indexstore.BlockMetaInfo;
4239
import org.apache.carbondata.core.indexstore.BlockletIndexStore;
4340
import org.apache.carbondata.core.indexstore.BlockletIndexWrapper;
4441
import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier;
@@ -127,8 +124,6 @@ BlockletIndexDetailsWithSchema> createRecordReader(InputSplit inputSplit,
127124
Cache<TableBlockIndexUniqueIdentifierWrapper, BlockletIndexWrapper> cache =
128125
CacheProvider.getInstance().createCache(CacheType.DRIVER_BLOCKLET_INDEX);
129126
private Iterator<TableBlockIndexUniqueIdentifier> iterator;
130-
// Cache to avoid multiple times listing of files
131-
private Map<String, Map<String, BlockMetaInfo>> segInfoCache = new HashMap<>();
132127

133128
@Override
134129
public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
@@ -152,8 +147,7 @@ public boolean nextKeyValue() {
152147
new TableBlockIndexUniqueIdentifierWrapper(tableBlockIndexUniqueIdentifier, table,
153148
false, true, true);
154149
this.tableBlockIndexUniqueIdentifierWrapper = tableBlockIndexUniqueIdentifierWrapper;
155-
wrapper = ((BlockletIndexStore) cache)
156-
.get(tableBlockIndexUniqueIdentifierWrapper, segInfoCache);
150+
wrapper = ((BlockletIndexStore) cache).get(tableBlockIndexUniqueIdentifierWrapper);
157151
return true;
158152
}
159153
return false;

integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala

+4 -1
@@ -37,7 +37,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory
3737
import org.apache.carbondata.core.constants.CarbonCommonConstants
3838
import org.apache.carbondata.core.datastore.compression.CompressorFactory
3939
import org.apache.carbondata.core.datastore.impl.FileFactory
40-
import org.apache.carbondata.core.index.Segment
40+
import org.apache.carbondata.core.index.{IndexStoreManager, Segment}
4141
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
4242
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
4343
import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, DeleteDeltaBlockDetails, SegmentUpdateDetails, TupleIdEnum}
@@ -84,6 +84,9 @@ object DeleteExecution {
8484
if (executorErrors.failureCauses == FailureCauses.NONE) {
8585
operatedRowCount = res.flatten.map(_._2._3).sum
8686
}
87+
// clear invalid segments from cache
88+
IndexStoreManager.getInstance()
89+
.clearInvalidSegments(carbonTable, segmentsTobeDeleted.map(_.getSegmentNo).toList.asJava)
8790
(segmentsTobeDeleted, operatedRowCount, isUpdateRequired, tblStatusWriteVersion)
8891
}
8992

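For readers on the Java side of the codebase, a hedged sketch of the same invalidation the Scala change makes above; the wrapper class is illustrative, and only the `IndexStoreManager` call mirrors the diff:

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.carbondata.core.index.IndexStoreManager;
    import org.apache.carbondata.core.index.Segment;
    import org.apache.carbondata.core.metadata.schema.table.CarbonTable;

    final class ClearInvalidSegmentsSketch {
      static void clear(CarbonTable carbonTable, List<Segment> segmentsToBeDeleted)
          throws IOException {
        List<String> invalidSegmentIds = new ArrayList<>();
        for (Segment segment : segmentsToBeDeleted) {
          invalidSegmentIds.add(segment.getSegmentNo());
        }
        // Evict the cached blocklet indexes of the deleted segments; the new
        // "test cache after delete" test below checks this via SHOW METACACHE.
        IndexStoreManager.getInstance().clearInvalidSegments(carbonTable, invalidSegmentIds);
      }
    }
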
integration/spark/src/test/scala/org/apache/carbondata/sql/commands/TestCarbonShowCacheCommand.scala

+13
@@ -442,6 +442,19 @@ class TestCarbonShowCacheCommand extends QueryTest with BeforeAndAfterAll {
442442
sql("drop table if exists carbonTable2")
443443
}
444444

445+
test("test cache after delete") {
446+
sql("drop table if exists carbonTable1")
447+
sql("create table carbonTable1(col1 int, col2 string,col3 string) stored as carbondata")
448+
sql("insert into carbonTable1 select 1, 'ab', 'vf'")
449+
sql("insert into carbonTable1 select 1, 'ab', 'vf'")
450+
var showCache = sql("show metacache on table carbonTable1").collect()
451+
assert(showCache(0).get(2).toString.equalsIgnoreCase("0/2 index files cached"))
452+
sql("delete from carbonTable1 where col3 ='vf'").collect()
453+
showCache = sql("show metacache on table carbonTable1").collect()
454+
assert(showCache(0).get(2).toString.equalsIgnoreCase("0/0 index files cached"))
455+
sql("drop table if exists carbonTable1")
456+
}
457+
445458
// Runs only when index server is enabled.
446459
test("test embedded pruning", false) {
447460
val mock: MockUp[CarbonInputFormat[Object]] = new MockUp[CarbonInputFormat[Object]]() {
