
Commit 0415996

remove list files while query and invalid cache
1 parent: b8511b6

File tree

6 files changed, +43 -66 lines


core/src/main/java/org/apache/carbondata/core/indexstore/BlockletIndexStore.java

+3 -24
@@ -67,11 +67,6 @@ public BlockletIndexStore(CarbonLRUCache lruCache) {
 
   @Override
   public BlockletIndexWrapper get(TableBlockIndexUniqueIdentifierWrapper identifierWrapper) {
-    return get(identifierWrapper, null);
-  }
-
-  public BlockletIndexWrapper get(TableBlockIndexUniqueIdentifierWrapper identifierWrapper,
-      Map<String, Map<String, BlockMetaInfo>> segInfoCache) {
     TableBlockIndexUniqueIdentifier identifier =
         identifierWrapper.getTableBlockIndexUniqueIdentifier();
     String lruCacheKey = identifier.getUniqueTableSegmentIdentifier();
@@ -83,24 +78,11 @@ public BlockletIndexWrapper get(TableBlockIndexUniqueIdentifierWrapper identifie
         SegmentIndexFileStore indexFileStore =
             new SegmentIndexFileStore(identifierWrapper.getConfiguration());
         Set<String> filesRead = new HashSet<>();
-        String segmentFilePath = identifier.getIndexFilePath();
-        if (segInfoCache == null) {
-          segInfoCache = new HashMap<>();
-        }
-        Map<String, BlockMetaInfo> carbonDataFileBlockMetaInfoMapping =
-            segInfoCache.get(segmentFilePath);
-        if (carbonDataFileBlockMetaInfoMapping == null) {
-          carbonDataFileBlockMetaInfoMapping =
-              BlockletIndexUtil.createCarbonDataFileBlockMetaInfoMapping(segmentFilePath,
-                  identifierWrapper.getConfiguration());
-          segInfoCache.put(segmentFilePath, carbonDataFileBlockMetaInfoMapping);
-        }
         // if the identifier is not a merge file we can directly load the indexes
         if (identifier.getMergeIndexFileName() == null) {
           List<DataFileFooter> indexInfos = new ArrayList<>();
           Map<String, BlockMetaInfo> blockMetaInfoMap = BlockletIndexUtil
-              .getBlockMetaInfoMap(identifierWrapper, indexFileStore, filesRead,
-                  carbonDataFileBlockMetaInfoMapping, indexInfos);
+              .getBlockMetaInfoMap(identifierWrapper, indexFileStore, filesRead, indexInfos);
           BlockIndex blockIndex =
               loadAndGetIndex(identifier, indexFileStore, blockMetaInfoMap,
                   identifierWrapper.getCarbonTable(),
@@ -120,8 +102,7 @@ public BlockletIndexWrapper get(TableBlockIndexUniqueIdentifierWrapper identifie
           List<DataFileFooter> indexInfos = new ArrayList<>();
           Map<String, BlockMetaInfo> blockMetaInfoMap = BlockletIndexUtil.getBlockMetaInfoMap(
               new TableBlockIndexUniqueIdentifierWrapper(blockIndexUniqueIdentifier,
-                  identifierWrapper.getCarbonTable()), indexFileStore, filesRead,
-                  carbonDataFileBlockMetaInfoMapping, indexInfos);
+                  identifierWrapper.getCarbonTable()), indexFileStore, filesRead, indexInfos);
           if (!blockMetaInfoMap.isEmpty()) {
             BlockIndex blockIndex =
                 loadAndGetIndex(blockIndexUniqueIdentifier, indexFileStore, blockMetaInfoMap,
@@ -157,8 +138,6 @@ public BlockletIndexWrapper get(TableBlockIndexUniqueIdentifierWrapper identifie
   public List<BlockletIndexWrapper> getAll(
       List<TableBlockIndexUniqueIdentifierWrapper> tableSegmentUniqueIdentifiers)
       throws IOException {
-    Map<String, Map<String, BlockMetaInfo>> segInfoCache =
-        new HashMap<String, Map<String, BlockMetaInfo>>();
 
     List<BlockletIndexWrapper> blockletIndexWrappers =
         new ArrayList<>(tableSegmentUniqueIdentifiers.size());
@@ -177,7 +156,7 @@ public List<BlockletIndexWrapper> getAll(
       }
      if (missedIdentifiersWrapper.size() > 0) {
        for (TableBlockIndexUniqueIdentifierWrapper identifierWrapper : missedIdentifiersWrapper) {
-          blockletIndexWrapper = get(identifierWrapper, segInfoCache);
+          blockletIndexWrapper = get(identifierWrapper);
          blockletIndexWrappers.add(blockletIndexWrapper);
        }
      }
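
With this change, BlockletIndexStore exposes only the single-argument get shown above; callers no longer build or pass a per-segment segInfoCache map, and getAll simply calls get for each missed identifier. A minimal caller-side sketch of the new usage follows. It is a hypothetical helper, not part of this commit; the import paths are assumed from the usual CarbonData source layout, while the cache lookup pattern itself mirrors the BlockletIndexInputFormat change further down.

import org.apache.carbondata.core.cache.Cache;
import org.apache.carbondata.core.cache.CacheProvider;
import org.apache.carbondata.core.cache.CacheType;
import org.apache.carbondata.core.indexstore.BlockletIndexStore;
import org.apache.carbondata.core.indexstore.BlockletIndexWrapper;
import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifierWrapper;

public final class BlockletIndexLookupSketch {
  // Hypothetical helper illustrating the simplified API: fetch the cached
  // BlockletIndexWrapper for one index file. The removed two-argument
  // get(identifier, segInfoCache) overload is no longer needed.
  static BlockletIndexWrapper lookup(TableBlockIndexUniqueIdentifierWrapper identifierWrapper) {
    Cache<TableBlockIndexUniqueIdentifierWrapper, BlockletIndexWrapper> cache =
        CacheProvider.getInstance().createCache(CacheType.DRIVER_BLOCKLET_INDEX);
    return ((BlockletIndexStore) cache).get(identifierWrapper);
  }
}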

core/src/main/java/org/apache/carbondata/core/util/BlockletIndexUtil.java

+11 -37
@@ -79,8 +79,7 @@ public static Set<TableBlockIndexUniqueIdentifier> getSegmentUniqueIdentifiers(S
 
   public static Map<String, BlockMetaInfo> getBlockMetaInfoMap(
       TableBlockIndexUniqueIdentifierWrapper identifierWrapper,
-      SegmentIndexFileStore indexFileStore, Set<String> filesRead,
-      Map<String, BlockMetaInfo> fileNameToMetaInfoMapping, List<DataFileFooter> indexInfos)
+      SegmentIndexFileStore indexFileStore, Set<String> filesRead, List<DataFileFooter> indexInfos)
       throws IOException {
     boolean isTransactionalTable = true;
     TableBlockIndexUniqueIdentifier identifier =
@@ -130,8 +129,7 @@ public static Map<String, BlockMetaInfo> getBlockMetaInfoMap(
       }
       String blockPath = footer.getBlockInfo().getFilePath();
       if (null == blockMetaInfoMap.get(blockPath)) {
-        BlockMetaInfo blockMetaInfo = createBlockMetaInfo(
-            fileNameToMetaInfoMapping, footer.getBlockInfo());
+        BlockMetaInfo blockMetaInfo = createBlockMetaInfo(footer.getBlockInfo());
         // if blockMetaInfo is null that means the file has been deleted from the file system.
         // This can happen in case IUD scenarios where after deleting or updating the data the
         // complete block is deleted but the entry still exists in index or merge index file
@@ -143,38 +141,7 @@ public static Map<String, BlockMetaInfo> getBlockMetaInfoMap(
     return blockMetaInfoMap;
   }
 
-  /**
-   * This method will create file name to block Meta Info Mapping. This method will reduce the
-   * number of nameNode calls and using this method one namenode will fetch 1000 entries
-   *
-   * @param segmentFilePath
-   * @return
-   * @throws IOException
-   */
-  public static Map<String, BlockMetaInfo> createCarbonDataFileBlockMetaInfoMapping(
-      String segmentFilePath, Configuration configuration) throws IOException {
-    Map<String, BlockMetaInfo> fileNameToMetaInfoMapping = new TreeMap();
-    CarbonFile carbonFile = FileFactory.getCarbonFile(segmentFilePath, configuration);
-    if (carbonFile instanceof AbstractDFSCarbonFile && !(carbonFile instanceof S3CarbonFile)) {
-      PathFilter pathFilter = new PathFilter() {
-        @Override
-        public boolean accept(Path path) {
-          return CarbonTablePath.isCarbonDataFile(path.getName());
-        }
-      };
-      CarbonFile[] carbonFiles = carbonFile.locationAwareListFiles(pathFilter);
-      for (CarbonFile file : carbonFiles) {
-        String[] location = file.getLocations();
-        long len = file.getSize();
-        BlockMetaInfo blockMetaInfo = new BlockMetaInfo(location, len);
-        fileNameToMetaInfoMapping.put(file.getPath(), blockMetaInfo);
-      }
-    }
-    return fileNameToMetaInfoMapping;
-  }
-
-  private static BlockMetaInfo createBlockMetaInfo(
-      Map<String, BlockMetaInfo> fileNameToMetaInfoMapping, TableBlockInfo blockInfo)
+  private static BlockMetaInfo createBlockMetaInfo(TableBlockInfo blockInfo)
       throws IOException {
     String carbonDataFile = blockInfo.getFilePath();
     FileFactory.FileType fileType = FileFactory.getFileType(carbonDataFile);
@@ -193,7 +160,14 @@ private static BlockMetaInfo createBlockMetaInfo(
         CarbonFile carbonFile = FileFactory.getCarbonFile(carbonDataFile);
         return new BlockMetaInfo(new String[] { "localhost" }, carbonFile.getSize());
       default:
-        return fileNameToMetaInfoMapping.get(FileFactory.getFormattedPath(carbonDataFile));
+        if (!FileFactory.isFileExist(carbonDataFile)) {
+          return null;
+        }
+        CarbonFile file = FileFactory.getCarbonFile(FileFactory.getFormattedPath(carbonDataFile));
+        String[] location = file.getLocations();
+        long len = file.getSize();
+        BlockMetaInfo blockMetaInfo = new BlockMetaInfo(location, len);
+        return blockMetaInfo;
     }
   }
 
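
The per-file lookup that replaces the removed segment-wide listing is small enough to read in isolation. A minimal standalone sketch of that behaviour follows, assuming the usual CarbonData package layout for the imports; resolveBlockMetaInfo and the class name are hypothetical, but the FileFactory, CarbonFile, and BlockMetaInfo calls are the same ones added in the default branch above.

import java.io.IOException;

import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.indexstore.BlockMetaInfo;

public final class BlockMetaInfoSketch {
  // Hypothetical helper: resolve metadata for a single carbondata file instead of
  // pre-listing every file in the segment directory. Returning null signals that
  // the file no longer exists (for example after an IUD operation), matching the
  // new createBlockMetaInfo behaviour.
  static BlockMetaInfo resolveBlockMetaInfo(String carbonDataFile) throws IOException {
    if (!FileFactory.isFileExist(carbonDataFile)) {
      return null;
    }
    CarbonFile file = FileFactory.getCarbonFile(FileFactory.getFormattedPath(carbonDataFile));
    return new BlockMetaInfo(file.getLocations(), file.getSize());
  }
}

The apparent trade-off is one existence check and metadata call per data file instead of one locationAwareListFiles call per segment; in exchange, the store no longer has to keep a listed snapshot consistent with deletes and updates, which is what the cache-invalidation changes in the two mutation commands below address.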

integration/spark/src/main/java/org/apache/spark/sql/secondaryindex/jobs/BlockletIndexInputFormat.java

+1 -4
@@ -127,8 +127,6 @@ BlockletIndexDetailsWithSchema> createRecordReader(InputSplit inputSplit,
       Cache<TableBlockIndexUniqueIdentifierWrapper, BlockletIndexWrapper> cache =
           CacheProvider.getInstance().createCache(CacheType.DRIVER_BLOCKLET_INDEX);
       private Iterator<TableBlockIndexUniqueIdentifier> iterator;
-      // Cache to avoid multiple times listing of files
-      private Map<String, Map<String, BlockMetaInfo>> segInfoCache = new HashMap<>();
 
       @Override
       public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
@@ -152,8 +150,7 @@ public boolean nextKeyValue() {
               new TableBlockIndexUniqueIdentifierWrapper(tableBlockIndexUniqueIdentifier, table,
                   false, true, true);
           this.tableBlockIndexUniqueIdentifierWrapper = tableBlockIndexUniqueIdentifierWrapper;
-          wrapper = ((BlockletIndexStore) cache)
-              .get(tableBlockIndexUniqueIdentifierWrapper, segInfoCache);
+          wrapper = ((BlockletIndexStore) cache).get(tableBlockIndexUniqueIdentifierWrapper);
          return true;
        }
        return false;

integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala

+8
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution.command.mutation
 
+import scala.collection.JavaConverters._
+
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
@@ -29,6 +31,7 @@ import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandExcepti
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.exception.ConcurrentOperationException
 import org.apache.carbondata.core.features.TableOperation
+import org.apache.carbondata.core.index.IndexStoreManager
 import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, LockUsage}
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
@@ -135,6 +138,11 @@ private[sql] case class CarbonProjectForDeleteCommand(
         throw new Exception(executorErrors.errorMsg)
       }
 
+      // clear invalid segments from cache
+      IndexStoreManager.getInstance()
+        .clearInvalidSegments(carbonTable,
+          deletedSegments.map(_.getSegmentNo).toList.asJava)
+
       // call IUD Compaction.
       HorizontalCompaction.tryHorizontalCompaction(sparkSession, carbonTable)
 

integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala

+7 -1
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution.command.mutation
 
+import scala.collection.JavaConverters._
+
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
@@ -34,7 +36,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.exception.ConcurrentOperationException
 import org.apache.carbondata.core.features.TableOperation
-import org.apache.carbondata.core.index.Segment
+import org.apache.carbondata.core.index.{IndexStoreManager, Segment}
 import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, LockUsage}
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
 import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
@@ -190,6 +192,10 @@ private[sql] case class CarbonProjectForUpdateCommand(
           updateTableModel,
           executionErrors)
 
+        // clear invalid segments from cache
+        IndexStoreManager.getInstance()
+          .clearInvalidSegments(carbonTable,
+            segmentsToBeDeleted.map(_.getSegmentNo).toList.asJava)
         // pre-priming for update command
         DeleteExecution.reloadDistributedSegmentCache(carbonTable,
           segmentsToBeDeleted, operationContext)(sparkSession)
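
Both mutation commands now make the same call into the index store manager once the delete or update has completed, so that index entries for dropped segments do not linger in the cache. A minimal Java sketch of that invalidation step follows; the wrapper class and method name are illustrative only, and clearInvalidSegments is assumed to take the CarbonTable plus the segment numbers of the dropped segments, as the Scala call sites above do.

import java.io.IOException;
import java.util.List;

import org.apache.carbondata.core.index.IndexStoreManager;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;

public final class InvalidSegmentCacheCleaner {
  // Hypothetical wrapper: evict cached index entries for segments invalidated by
  // a delete or update, mirroring the calls added to CarbonProjectForDeleteCommand
  // and CarbonProjectForUpdateCommand.
  static void clearDroppedSegments(CarbonTable carbonTable, List<String> droppedSegmentNos)
      throws IOException {
    IndexStoreManager.getInstance().clearInvalidSegments(carbonTable, droppedSegmentNos);
  }
}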

integration/spark/src/test/scala/org/apache/carbondata/sql/commands/TestCarbonShowCacheCommand.scala

+13
@@ -442,6 +442,19 @@ class TestCarbonShowCacheCommand extends QueryTest with BeforeAndAfterAll {
     sql("drop table if exists carbonTable2")
   }
 
+  test("test cache after delete") {
+    sql("drop table if exists carbonTable1")
+    sql("create table carbonTable1(col1 int, col2 string,col3 string) stored as carbondata")
+    sql("insert into carbonTable1 select 1, 'ab', 'vf'")
+    sql("insert into carbonTable1 select 1, 'ab', 'vf'")
+    var showCache = sql("show metacache on table carbonTable1").collect()
+    assert(showCache(0).get(2).toString.equalsIgnoreCase("0/2 index files cached"))
+    sql("delete from carbonTable1 where col3 ='vf'").collect()
+    showCache = sql("show metacache on table carbonTable1").collect()
+    assert(showCache(0).get(2).toString.equalsIgnoreCase("0/0 index files cached"))
+    sql("drop table if exists carbonTable1")
+  }
+
   // Runs only when index server is enabled.
   test("test embedded pruning", false) {
     val mock: MockUp[CarbonInputFormat[Object]] = new MockUp[CarbonInputFormat[Object]]() {
