Commit c63d26c

remove file listing during query and invalidate cache for deleted segments
1 parent b8511b6 commit c63d26c

File tree: 6 files changed (+43, -76 lines)

core/src/main/java/org/apache/carbondata/core/indexstore/BlockletIndexStore.java

+3, -25
@@ -19,7 +19,6 @@
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -67,11 +66,6 @@ public BlockletIndexStore(CarbonLRUCache lruCache) {
 
   @Override
   public BlockletIndexWrapper get(TableBlockIndexUniqueIdentifierWrapper identifierWrapper) {
-    return get(identifierWrapper, null);
-  }
-
-  public BlockletIndexWrapper get(TableBlockIndexUniqueIdentifierWrapper identifierWrapper,
-      Map<String, Map<String, BlockMetaInfo>> segInfoCache) {
     TableBlockIndexUniqueIdentifier identifier =
         identifierWrapper.getTableBlockIndexUniqueIdentifier();
     String lruCacheKey = identifier.getUniqueTableSegmentIdentifier();
@@ -83,24 +77,11 @@ public BlockletIndexWrapper get(TableBlockIndexUniqueIdentifierWrapper identifie
       SegmentIndexFileStore indexFileStore =
           new SegmentIndexFileStore(identifierWrapper.getConfiguration());
       Set<String> filesRead = new HashSet<>();
-      String segmentFilePath = identifier.getIndexFilePath();
-      if (segInfoCache == null) {
-        segInfoCache = new HashMap<>();
-      }
-      Map<String, BlockMetaInfo> carbonDataFileBlockMetaInfoMapping =
-          segInfoCache.get(segmentFilePath);
-      if (carbonDataFileBlockMetaInfoMapping == null) {
-        carbonDataFileBlockMetaInfoMapping =
-            BlockletIndexUtil.createCarbonDataFileBlockMetaInfoMapping(segmentFilePath,
-                identifierWrapper.getConfiguration());
-        segInfoCache.put(segmentFilePath, carbonDataFileBlockMetaInfoMapping);
-      }
       // if the identifier is not a merge file we can directly load the indexes
       if (identifier.getMergeIndexFileName() == null) {
         List<DataFileFooter> indexInfos = new ArrayList<>();
         Map<String, BlockMetaInfo> blockMetaInfoMap = BlockletIndexUtil
-            .getBlockMetaInfoMap(identifierWrapper, indexFileStore, filesRead,
-                carbonDataFileBlockMetaInfoMapping, indexInfos);
+            .getBlockMetaInfoMap(identifierWrapper, indexFileStore, filesRead, indexInfos);
         BlockIndex blockIndex =
             loadAndGetIndex(identifier, indexFileStore, blockMetaInfoMap,
                 identifierWrapper.getCarbonTable(),
@@ -120,8 +101,7 @@ public BlockletIndexWrapper get(TableBlockIndexUniqueIdentifierWrapper identifie
           List<DataFileFooter> indexInfos = new ArrayList<>();
           Map<String, BlockMetaInfo> blockMetaInfoMap = BlockletIndexUtil.getBlockMetaInfoMap(
               new TableBlockIndexUniqueIdentifierWrapper(blockIndexUniqueIdentifier,
-                  identifierWrapper.getCarbonTable()), indexFileStore, filesRead,
-                  carbonDataFileBlockMetaInfoMapping, indexInfos);
+                  identifierWrapper.getCarbonTable()), indexFileStore, filesRead, indexInfos);
           if (!blockMetaInfoMap.isEmpty()) {
             BlockIndex blockIndex =
                 loadAndGetIndex(blockIndexUniqueIdentifier, indexFileStore, blockMetaInfoMap,
@@ -157,8 +137,6 @@ public BlockletIndexWrapper get(TableBlockIndexUniqueIdentifierWrapper identifie
   public List<BlockletIndexWrapper> getAll(
       List<TableBlockIndexUniqueIdentifierWrapper> tableSegmentUniqueIdentifiers)
       throws IOException {
-    Map<String, Map<String, BlockMetaInfo>> segInfoCache =
-        new HashMap<String, Map<String, BlockMetaInfo>>();
 
     List<BlockletIndexWrapper> blockletIndexWrappers =
         new ArrayList<>(tableSegmentUniqueIdentifiers.size());
@@ -177,7 +155,7 @@ public List<BlockletIndexWrapper> getAll(
       }
       if (missedIdentifiersWrapper.size() > 0) {
         for (TableBlockIndexUniqueIdentifierWrapper identifierWrapper : missedIdentifiersWrapper) {
-          blockletIndexWrapper = get(identifierWrapper, segInfoCache);
+          blockletIndexWrapper = get(identifierWrapper);
          blockletIndexWrappers.add(blockletIndexWrapper);
         }
       }
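Note: the two-argument get() overload that accepted a caller-owned segInfoCache is gone; getAll() now simply calls the single-argument get() for each missed identifier, and every cache miss resolves its own block metadata. Purely as an analogy for that shape, and not CarbonData's actual cache classes, a minimal miss-driven LRU cache whose loader needs no caller-threaded state could look like this:

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Toy access-ordered LRU cache: each miss is resolved by the loader alone,
// with no per-call metadata map threaded in by the caller.
public class TinyLruCache<K, V> {
  private final Map<K, V> entries;
  private final Function<K, V> loader;

  public TinyLruCache(int capacity, Function<K, V> loader) {
    this.loader = loader;
    this.entries = new LinkedHashMap<K, V>(16, 0.75f, true) {
      @Override
      protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        return size() > capacity; // evict the least recently used entry once over capacity
      }
    };
  }

  public synchronized V get(K key) {
    return entries.computeIfAbsent(key, loader);
  }
}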

core/src/main/java/org/apache/carbondata/core/util/BlockletIndexUtil.java

+11, -43
@@ -31,16 +31,13 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.TreeMap;
 
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.datastore.block.TableBlockInfo;
 import org.apache.carbondata.core.datastore.compression.CompressorFactory;
-import org.apache.carbondata.core.datastore.filesystem.AbstractDFSCarbonFile;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
-import org.apache.carbondata.core.datastore.filesystem.S3CarbonFile;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.index.Segment;
 import org.apache.carbondata.core.indexstore.BlockMetaInfo;
@@ -61,9 +58,6 @@
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 
 import org.apache.commons.io.FilenameUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
 import org.apache.log4j.Logger;
 
 public class BlockletIndexUtil {
@@ -79,8 +73,7 @@ public static Set<TableBlockIndexUniqueIdentifier> getSegmentUniqueIdentifiers(S
 
   public static Map<String, BlockMetaInfo> getBlockMetaInfoMap(
       TableBlockIndexUniqueIdentifierWrapper identifierWrapper,
-      SegmentIndexFileStore indexFileStore, Set<String> filesRead,
-      Map<String, BlockMetaInfo> fileNameToMetaInfoMapping, List<DataFileFooter> indexInfos)
+      SegmentIndexFileStore indexFileStore, Set<String> filesRead, List<DataFileFooter> indexInfos)
       throws IOException {
     boolean isTransactionalTable = true;
     TableBlockIndexUniqueIdentifier identifier =
@@ -130,8 +123,7 @@ public static Map<String, BlockMetaInfo> getBlockMetaInfoMap(
       }
       String blockPath = footer.getBlockInfo().getFilePath();
       if (null == blockMetaInfoMap.get(blockPath)) {
-        BlockMetaInfo blockMetaInfo = createBlockMetaInfo(
-            fileNameToMetaInfoMapping, footer.getBlockInfo());
+        BlockMetaInfo blockMetaInfo = createBlockMetaInfo(footer.getBlockInfo());
         // if blockMetaInfo is null that means the file has been deleted from the file system.
         // This can happen in case IUD scenarios where after deleting or updating the data the
         // complete block is deleted but the entry still exists in index or merge index file
@@ -143,38 +135,7 @@ public static Map<String, BlockMetaInfo> getBlockMetaInfoMap(
     return blockMetaInfoMap;
   }
 
-  /**
-   * This method will create file name to block Meta Info Mapping. This method will reduce the
-   * number of nameNode calls and using this method one namenode will fetch 1000 entries
-   *
-   * @param segmentFilePath
-   * @return
-   * @throws IOException
-   */
-  public static Map<String, BlockMetaInfo> createCarbonDataFileBlockMetaInfoMapping(
-      String segmentFilePath, Configuration configuration) throws IOException {
-    Map<String, BlockMetaInfo> fileNameToMetaInfoMapping = new TreeMap();
-    CarbonFile carbonFile = FileFactory.getCarbonFile(segmentFilePath, configuration);
-    if (carbonFile instanceof AbstractDFSCarbonFile && !(carbonFile instanceof S3CarbonFile)) {
-      PathFilter pathFilter = new PathFilter() {
-        @Override
-        public boolean accept(Path path) {
-          return CarbonTablePath.isCarbonDataFile(path.getName());
-        }
-      };
-      CarbonFile[] carbonFiles = carbonFile.locationAwareListFiles(pathFilter);
-      for (CarbonFile file : carbonFiles) {
-        String[] location = file.getLocations();
-        long len = file.getSize();
-        BlockMetaInfo blockMetaInfo = new BlockMetaInfo(location, len);
-        fileNameToMetaInfoMapping.put(file.getPath(), blockMetaInfo);
-      }
-    }
-    return fileNameToMetaInfoMapping;
-  }
-
-  private static BlockMetaInfo createBlockMetaInfo(
-      Map<String, BlockMetaInfo> fileNameToMetaInfoMapping, TableBlockInfo blockInfo)
+  private static BlockMetaInfo createBlockMetaInfo(TableBlockInfo blockInfo)
       throws IOException {
     String carbonDataFile = blockInfo.getFilePath();
     FileFactory.FileType fileType = FileFactory.getFileType(carbonDataFile);
@@ -193,7 +154,14 @@ private static BlockMetaInfo createBlockMetaInfo(
         CarbonFile carbonFile = FileFactory.getCarbonFile(carbonDataFile);
         return new BlockMetaInfo(new String[] { "localhost" }, carbonFile.getSize());
       default:
-        return fileNameToMetaInfoMapping.get(FileFactory.getFormattedPath(carbonDataFile));
+        if (!FileFactory.isFileExist(carbonDataFile)) {
+          return null;
+        }
+        CarbonFile file = FileFactory.getCarbonFile(FileFactory.getFormattedPath(carbonDataFile));
+        String[] location = file.getLocations();
+        long len = file.getSize();
+        BlockMetaInfo blockMetaInfo = new BlockMetaInfo(location, len);
+        return blockMetaInfo;
     }
   }
 
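The helper removed above built a file-name to BlockMetaInfo map by listing the whole segment directory once (its javadoc notes this was meant to batch NameNode calls), while the new default branch of createBlockMetaInfo stats each carbondata file on demand and returns null when the file has already been removed by a delete or update. The sketch below illustrates only that trade-off, using plain java.nio rather than CarbonData's FileFactory/CarbonFile API:

import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class BlockMetaLookupSketch {

  // Old pattern: list the segment directory once, pre-build a name -> size map,
  // then answer every block's metadata from that map.
  static Map<String, Long> listOnce(Path segmentDir) throws IOException {
    try (Stream<Path> files = Files.list(segmentDir)) {
      return files
          .filter(p -> p.getFileName().toString().endsWith(".carbondata"))
          .collect(Collectors.toMap(Path::toString, p -> {
            try {
              return Files.size(p);
            } catch (IOException e) {
              throw new UncheckedIOException(e);
            }
          }));
    }
  }

  // New pattern: stat a single data file only when its footer is actually read;
  // a vanished file (block deleted by IUD) yields null instead of a stale entry.
  static Long statOnDemand(Path dataFile) throws IOException {
    if (!Files.exists(dataFile)) {
      return null;
    }
    return Files.size(dataFile);
  }
}

The listing approach saves round trips on HDFS but scans the entire directory even when only a few index files miss the cache; the per-file approach touches only the files a query actually needs and naturally skips blocks that a delete or update has removed.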

integration/spark/src/main/java/org/apache/spark/sql/secondaryindex/jobs/BlockletIndexInputFormat.java

+1, -7
@@ -20,11 +20,9 @@
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 
 import org.apache.carbondata.common.logging.LogServiceFactory;
@@ -38,7 +36,6 @@
 import org.apache.carbondata.core.index.dev.CacheableIndex;
 import org.apache.carbondata.core.index.dev.IndexFactory;
 import org.apache.carbondata.core.index.dev.expr.IndexExprWrapper;
-import org.apache.carbondata.core.indexstore.BlockMetaInfo;
 import org.apache.carbondata.core.indexstore.BlockletIndexStore;
 import org.apache.carbondata.core.indexstore.BlockletIndexWrapper;
 import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier;
@@ -127,8 +124,6 @@ BlockletIndexDetailsWithSchema> createRecordReader(InputSplit inputSplit,
     Cache<TableBlockIndexUniqueIdentifierWrapper, BlockletIndexWrapper> cache =
         CacheProvider.getInstance().createCache(CacheType.DRIVER_BLOCKLET_INDEX);
     private Iterator<TableBlockIndexUniqueIdentifier> iterator;
-    // Cache to avoid multiple times listing of files
-    private Map<String, Map<String, BlockMetaInfo>> segInfoCache = new HashMap<>();
 
     @Override
     public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
@@ -152,8 +147,7 @@ public boolean nextKeyValue() {
           new TableBlockIndexUniqueIdentifierWrapper(tableBlockIndexUniqueIdentifier, table,
               false, true, true);
       this.tableBlockIndexUniqueIdentifierWrapper = tableBlockIndexUniqueIdentifierWrapper;
-      wrapper = ((BlockletIndexStore) cache)
-          .get(tableBlockIndexUniqueIdentifierWrapper, segInfoCache);
+      wrapper = ((BlockletIndexStore) cache).get(tableBlockIndexUniqueIdentifierWrapper);
       return true;
     }
     return false;

integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala

+8
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution.command.mutation
 
+import scala.collection.JavaConverters._
+
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
@@ -29,6 +31,7 @@ import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandExcepti
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.exception.ConcurrentOperationException
 import org.apache.carbondata.core.features.TableOperation
+import org.apache.carbondata.core.index.IndexStoreManager
 import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, LockUsage}
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
@@ -135,6 +138,11 @@ private[sql] case class CarbonProjectForDeleteCommand(
         throw new Exception(executorErrors.errorMsg)
       }
 
+      // clear invalid segments from cache
+      IndexStoreManager.getInstance()
+        .clearInvalidSegments(carbonTable,
+          deletedSegments.map(_.getSegmentNo).toList.asJava)
+
       // call IUD Compaction.
       HorizontalCompaction.tryHorizontalCompaction(sparkSession, carbonTable)
 
integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala

+7, -1
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution.command.mutation
 
+import scala.collection.JavaConverters._
+
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
@@ -34,7 +36,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.exception.ConcurrentOperationException
 import org.apache.carbondata.core.features.TableOperation
-import org.apache.carbondata.core.index.Segment
+import org.apache.carbondata.core.index.{IndexStoreManager, Segment}
 import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, LockUsage}
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
 import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
@@ -190,6 +192,10 @@ private[sql] case class CarbonProjectForUpdateCommand(
           updateTableModel,
           executionErrors)
 
+        // clear invalid segments from cache
+        IndexStoreManager.getInstance()
+          .clearInvalidSegments(carbonTable,
+            segmentsToBeDeleted.map(_.getSegmentNo).toList.asJava)
         // pre-priming for update command
         DeleteExecution.reloadDistributedSegmentCache(carbonTable,
           segmentsToBeDeleted, operationContext)(sparkSession)
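Both mutation commands now clear the index entries of the segments they invalidated, so the driver cache (and SHOW METACACHE, as the new test below asserts) stops counting index files for segments that no longer exist. The sketch below illustrates only that invalidation pattern; the cache key layout is an assumption made for the example and not IndexStoreManager's internals.

import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class SegmentCacheInvalidationSketch {
  // Toy stand-in for a driver-side index cache, keyed by "<segmentId>/<index file name>".
  private final Map<String, Object> cache = new ConcurrentHashMap<>();

  public void put(String segmentId, String indexFile, Object indexWrapper) {
    cache.put(segmentId + "/" + indexFile, indexWrapper);
  }

  // Drop every cached entry that belongs to a segment a delete/update just invalidated.
  public void clearInvalidSegments(Set<String> invalidSegmentIds) {
    cache.keySet().removeIf(key ->
        invalidSegmentIds.contains(key.substring(0, key.indexOf('/'))));
  }
}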

integration/spark/src/test/scala/org/apache/carbondata/sql/commands/TestCarbonShowCacheCommand.scala

+13
@@ -442,6 +442,19 @@ class TestCarbonShowCacheCommand extends QueryTest with BeforeAndAfterAll {
     sql("drop table if exists carbonTable2")
   }
 
+  test("test cache after delete") {
+    sql("drop table if exists carbonTable1")
+    sql("create table carbonTable1(col1 int, col2 string,col3 string) stored as carbondata")
+    sql("insert into carbonTable1 select 1, 'ab', 'vf'")
+    sql("insert into carbonTable1 select 1, 'ab', 'vf'")
+    var showCache = sql("show metacache on table carbonTable1").collect()
+    assert(showCache(0).get(2).toString.equalsIgnoreCase("0/2 index files cached"))
+    sql("delete from carbonTable1 where col3 ='vf'").collect()
+    showCache = sql("show metacache on table carbonTable1").collect()
+    assert(showCache(0).get(2).toString.equalsIgnoreCase("0/0 index files cached"))
+    sql("drop table if exists carbonTable1")
+  }
+
   // Runs only when index server is enabled.
   test("test embedded pruning", false) {
     val mock: MockUp[CarbonInputFormat[Object]] = new MockUp[CarbonInputFormat[Object]]() {
