Skip to content

Commit 48da1b8

Browse files
sgup432Sagar Upadhyaya
and
Sagar Upadhyaya
authored
Fix IndicesRequestCache clean up logic (opensearch-project#13597)
Signed-off-by: Sagar Upadhyaya <[email protected]> Co-authored-by: Sagar Upadhyaya <[email protected]>
1 parent 4eb33b0 commit 48da1b8

File tree

3 files changed

+144
-11
lines changed

3 files changed

+144
-11
lines changed

server/src/main/java/org/opensearch/indices/IndicesRequestCache.java

+20-8
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
import org.opensearch.common.cache.service.CacheService;
5454
import org.opensearch.common.cache.stats.ImmutableCacheStatsHolder;
5555
import org.opensearch.common.cache.store.config.CacheConfig;
56+
import org.opensearch.common.collect.Tuple;
5657
import org.opensearch.common.lease.Releasable;
5758
import org.opensearch.common.lucene.index.OpenSearchDirectoryReader;
5859
import org.opensearch.common.settings.Setting;
@@ -410,7 +411,8 @@ static class Key implements Accountable, Writeable {
410411
this.shardId = in.readOptionalWriteable(ShardId::new);
411412
this.readerCacheKeyId = in.readOptionalString();
412413
this.value = in.readBytesReference();
413-
this.indexShardHashCode = in.readInt();
414+
this.indexShardHashCode = in.readInt(); // We are serializing/de-serializing this as we need to store the
415+
// key as part of tiered/disk cache. The key is not passed between nodes at this point.
414416
}
415417

416418
@Override
@@ -450,7 +452,8 @@ public void writeTo(StreamOutput out) throws IOException {
450452
out.writeOptionalWriteable(shardId);
451453
out.writeOptionalString(readerCacheKeyId);
452454
out.writeBytesReference(value);
453-
out.writeInt(indexShardHashCode);
455+
out.writeInt(indexShardHashCode); // We are serializing/de-serializing this as we need to store the
456+
// key as part of tiered/disk cache. The key is not passed between nodes at this point.
454457
}
455458
}
456459

@@ -713,15 +716,16 @@ private synchronized void cleanCache(double stalenessThreshold) {
713716
// Contains CleanupKey objects with open shard but invalidated readerCacheKeyId.
714717
final Set<CleanupKey> cleanupKeysFromOutdatedReaders = new HashSet<>();
715718
// Contains (shardId, indexShardHashCode) tuples, each identifying a closed shard (or one flagged for full cleanup).
716-
final Set<Object> cleanupKeysFromClosedShards = new HashSet<>();
719+
final Set<Tuple<ShardId, Integer>> cleanupKeysFromClosedShards = new HashSet<>();
717720

718721
for (Iterator<CleanupKey> iterator = keysToClean.iterator(); iterator.hasNext();) {
719722
CleanupKey cleanupKey = iterator.next();
720723
iterator.remove();
721724
if (cleanupKey.readerCacheKeyId == null || !cleanupKey.entity.isOpen()) {
722725
// null indicates full cleanup, as does a closed shard
723-
ShardId shardId = ((IndexShard) cleanupKey.entity.getCacheIdentity()).shardId();
724-
cleanupKeysFromClosedShards.add(shardId);
726+
IndexShard indexShard = (IndexShard) cleanupKey.entity.getCacheIdentity();
727+
// Add both shardId and indexShardHashCode to uniquely identify an indexShard.
728+
cleanupKeysFromClosedShards.add(new Tuple<>(indexShard.shardId(), indexShard.hashCode()));
725729
} else {
726730
cleanupKeysFromOutdatedReaders.add(cleanupKey);
727731
}
@@ -735,14 +739,22 @@ private synchronized void cleanCache(double stalenessThreshold) {
735739

736740
for (Iterator<ICacheKey<Key>> iterator = cache.keys().iterator(); iterator.hasNext();) {
737741
ICacheKey<Key> key = iterator.next();
738-
if (cleanupKeysFromClosedShards.contains(key.key.shardId)) {
742+
Key delegatingKey = key.key;
743+
if (cleanupKeysFromClosedShards.contains(new Tuple<>(delegatingKey.shardId, delegatingKey.indexShardHashCode))) {
739744
// Since the shard is closed, the cache should drop stats for this shard.
740745
dimensionListsToDrop.add(key.dimensions);
741746
iterator.remove();
742747
} else {
743-
CleanupKey cleanupKey = new CleanupKey(cacheEntityLookup.apply(key.key.shardId).orElse(null), key.key.readerCacheKeyId);
744-
if (cleanupKeysFromOutdatedReaders.contains(cleanupKey)) {
748+
CacheEntity cacheEntity = cacheEntityLookup.apply(delegatingKey.shardId).orElse(null);
749+
if (cacheEntity == null) {
750+
// A null cache entity means the index or shard was deleted/closed in the meantime.
751+
// So we will delete this key.
745752
iterator.remove();
753+
} else {
754+
CleanupKey cleanupKey = new CleanupKey(cacheEntity, delegatingKey.readerCacheKeyId);
755+
if (cleanupKeysFromOutdatedReaders.contains(cleanupKey)) {
756+
iterator.remove();
757+
}
746758
}
747759
}
748760
}

server/src/test/java/org/opensearch/indices/IRCKeyWriteableSerializerTests.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ public void testSerializer() throws Exception {
3030
Random rand = Randomness.get();
3131
for (int valueLength : valueLengths) {
3232
for (int i = 0; i < NUM_KEYS; i++) {
33-
IndicesRequestCache.Key key = getRandomIRCKey(valueLength, rand, indexShard.shardId());
33+
IndicesRequestCache.Key key = getRandomIRCKey(valueLength, rand, indexShard.shardId(), System.identityHashCode(indexShard));
3434
byte[] serialized = ser.serialize(key);
3535
assertTrue(ser.equals(key, serialized));
3636
IndicesRequestCache.Key deserialized = ser.deserialize(serialized);
@@ -39,13 +39,13 @@ public void testSerializer() throws Exception {
3939
}
4040
}
4141

42-
private IndicesRequestCache.Key getRandomIRCKey(int valueLength, Random random, ShardId shard) {
42+
private IndicesRequestCache.Key getRandomIRCKey(int valueLength, Random random, ShardId shard, int indexShardHashCode) {
4343
byte[] value = new byte[valueLength];
4444
for (int i = 0; i < valueLength; i++) {
4545
value[i] = (byte) (random.nextInt(126 - 32) + 32);
4646
}
4747
BytesReference keyValue = new BytesArray(value);
48-
return new IndicesRequestCache.Key(shard, keyValue, UUID.randomUUID().toString(), shard.hashCode()); // same UUID
48+
return new IndicesRequestCache.Key(shard, keyValue, UUID.randomUUID().toString(), indexShardHashCode); // same UUID
4949
// source as used in real key
5050
}
5151
}

server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java

+121
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,14 @@
4444
import org.apache.lucene.search.TopDocs;
4545
import org.apache.lucene.store.Directory;
4646
import org.apache.lucene.util.BytesRef;
47+
import org.opensearch.Version;
4748
import org.opensearch.cluster.metadata.IndexMetadata;
49+
import org.opensearch.cluster.node.DiscoveryNode;
50+
import org.opensearch.cluster.node.DiscoveryNodes;
51+
import org.opensearch.cluster.routing.RecoverySource;
52+
import org.opensearch.cluster.routing.ShardRouting;
53+
import org.opensearch.cluster.routing.ShardRoutingHelper;
54+
import org.opensearch.cluster.routing.UnassignedInfo;
4855
import org.opensearch.common.CheckedSupplier;
4956
import org.opensearch.common.cache.ICacheKey;
5057
import org.opensearch.common.cache.RemovalNotification;
@@ -55,12 +62,14 @@
5562
import org.opensearch.common.io.stream.BytesStreamOutput;
5663
import org.opensearch.common.lucene.index.OpenSearchDirectoryReader;
5764
import org.opensearch.common.settings.Settings;
65+
import org.opensearch.common.unit.TimeValue;
5866
import org.opensearch.common.util.FeatureFlags;
5967
import org.opensearch.common.util.io.IOUtils;
6068
import org.opensearch.core.common.bytes.AbstractBytesReference;
6169
import org.opensearch.core.common.bytes.BytesReference;
6270
import org.opensearch.core.common.io.stream.StreamInput;
6371
import org.opensearch.core.common.unit.ByteSizeValue;
72+
import org.opensearch.core.index.Index;
6473
import org.opensearch.core.index.shard.ShardId;
6574
import org.opensearch.core.xcontent.MediaTypeRegistry;
6675
import org.opensearch.core.xcontent.XContentHelper;
@@ -69,9 +78,12 @@
6978
import org.opensearch.index.cache.request.RequestCacheStats;
7079
import org.opensearch.index.cache.request.ShardRequestCache;
7180
import org.opensearch.index.query.TermQueryBuilder;
81+
import org.opensearch.index.seqno.RetentionLeaseSyncer;
7282
import org.opensearch.index.shard.IndexShard;
7383
import org.opensearch.index.shard.IndexShardState;
84+
import org.opensearch.index.shard.IndexShardTestCase;
7485
import org.opensearch.index.shard.ShardNotFoundException;
86+
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
7587
import org.opensearch.node.Node;
7688
import org.opensearch.test.ClusterServiceUtils;
7789
import org.opensearch.test.OpenSearchSingleNodeTestCase;
@@ -95,6 +107,8 @@
95107
import java.util.concurrent.Executors;
96108
import java.util.concurrent.atomic.AtomicInteger;
97109

110+
import static java.util.Collections.emptyMap;
111+
import static java.util.Collections.emptySet;
98112
import static org.opensearch.indices.IndicesRequestCache.INDEX_DIMENSION_NAME;
99113
import static org.opensearch.indices.IndicesRequestCache.INDICES_CACHE_QUERY_SIZE;
100114
import static org.opensearch.indices.IndicesRequestCache.INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING;
@@ -1298,6 +1312,113 @@ public void testGetOrComputeConcurrentlyWithMultipleIndices() throws Exception {
12981312
executorService.shutdownNow();
12991313
}
13001314

1315+
public void testDeleteAndCreateIndexShardOnSameNodeAndVerifyStats() throws Exception {
1316+
threadPool = getThreadPool();
1317+
String indexName = "test1";
1318+
IndicesService indicesService = getInstanceFromNode(IndicesService.class);
1319+
// Create a shard
1320+
IndexService indexService = createIndex(
1321+
indexName,
1322+
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build()
1323+
);
1324+
Index idx = resolveIndex(indexName);
1325+
ShardRouting shardRouting = indicesService.indexService(idx).getShard(0).routingEntry();
1326+
IndexShard indexShard = indexService.getShard(0);
1327+
Directory dir = newDirectory();
1328+
IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig());
1329+
writer.addDocument(newDoc(0, "foo"));
1330+
writer.addDocument(newDoc(1, "hack"));
1331+
DirectoryReader reader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), indexShard.shardId());
1332+
Loader loader = new Loader(reader, 0);
1333+
1334+
// Set clean interval to a high value as we will do it manually here.
1335+
IndicesRequestCache cache = getIndicesRequestCache(
1336+
Settings.builder()
1337+
.put(IndicesRequestCache.INDICES_REQUEST_CACHE_CLEANUP_INTERVAL_SETTING_KEY, TimeValue.timeValueMillis(100000))
1338+
.build()
1339+
);
1340+
IndicesService.IndexShardCacheEntity cacheEntity = new IndicesService.IndexShardCacheEntity(indexShard);
1341+
TermQueryBuilder termQuery = new TermQueryBuilder("id", "bar");
1342+
BytesReference termBytes = XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false);
1343+
1344+
// Cache some values for indexShard
1345+
BytesReference value = cache.getOrCompute(cacheEntity, loader, reader, getTermBytes());
1346+
1347+
// Verify response and stats.
1348+
assertEquals("foo", value.streamInput().readString());
1349+
RequestCacheStats stats = indexShard.requestCache().stats();
1350+
assertEquals("foo", value.streamInput().readString());
1351+
assertEquals(1, cache.count());
1352+
assertEquals(1, stats.getMissCount());
1353+
assertTrue(stats.getMemorySizeInBytes() > 0);
1354+
1355+
// Remove the shard making its cache entries stale
1356+
IOUtils.close(reader, writer, dir);
1357+
indexService.removeShard(0, "force");
1358+
1359+
// Re-create a shard with the same ShardId as the one that was just removed.
1360+
ShardRouting newRouting = shardRouting;
1361+
String nodeId = newRouting.currentNodeId();
1362+
UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "boom");
1363+
newRouting = newRouting.moveToUnassigned(unassignedInfo)
1364+
.updateUnassigned(unassignedInfo, RecoverySource.EmptyStoreRecoverySource.INSTANCE);
1365+
newRouting = ShardRoutingHelper.initialize(newRouting, nodeId);
1366+
final DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);
1367+
indexShard = indexService.createShard(
1368+
newRouting,
1369+
s -> {},
1370+
RetentionLeaseSyncer.EMPTY,
1371+
SegmentReplicationCheckpointPublisher.EMPTY,
1372+
null,
1373+
null,
1374+
localNode,
1375+
null,
1376+
DiscoveryNodes.builder().add(localNode).build()
1377+
);
1378+
1379+
// Verify that the new shard's request cache stats are empty.
1380+
stats = indexShard.requestCache().stats();
1381+
assertEquals("foo", value.streamInput().readString());
1382+
assertEquals(1, cache.count()); // Still contains the old indexShard stale entry
1383+
assertEquals(0, stats.getMissCount());
1384+
assertTrue(stats.getMemorySizeInBytes() == 0);
1385+
IndexShardTestCase.updateRoutingEntry(indexShard, newRouting);
1386+
1387+
// Now we cache again with the new IndexShard (same shardId as the older one).
1388+
dir = newDirectory();
1389+
writer = new IndexWriter(dir, newIndexWriterConfig());
1390+
writer.addDocument(newDoc(0, "foo"));
1391+
writer.addDocument(newDoc(1, "hack"));
1392+
reader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), indexShard.shardId());
1393+
loader = new Loader(reader, 0);
1394+
cacheEntity = new IndicesService.IndexShardCacheEntity(indexShard);
1395+
termQuery = new TermQueryBuilder("id", "bar");
1396+
termBytes = XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false);
1397+
value = cache.getOrCompute(cacheEntity, loader, reader, getTermBytes());
1398+
1399+
// Assert response and stats. We verify that cache now has 2 entries, one for older/removed shard and other
1400+
// for the current shard.
1401+
assertEquals("foo", value.streamInput().readString());
1402+
stats = indexShard.requestCache().stats();
1403+
assertEquals("foo", value.streamInput().readString());
1404+
assertEquals(2, cache.count()); // One entry for older shard and other for the current shard.
1405+
assertEquals(1, stats.getMissCount());
1406+
assertTrue(stats.getMemorySizeInBytes() > 0);
1407+
1408+
// Trigger clean up of cache.
1409+
cache.cacheCleanupManager.cleanCache();
1410+
// Verify that only the older shard's entries were removed and the current shard's entry is still cached.
1411+
assertEquals(1, cache.count());
1412+
1413+
// Now make current indexShard entries stale as well.
1414+
reader.close();
1415+
// Trigger clean up of cache and verify that cache has no entries now.
1416+
cache.cacheCleanupManager.cleanCache();
1417+
assertEquals(0, cache.count());
1418+
1419+
IOUtils.close(reader, writer, dir, cache);
1420+
}
1421+
13011422
public static String generateString(int length) {
13021423
String characters = "abcdefghijklmnopqrstuvwxyz";
13031424
StringBuilder sb = new StringBuilder(length);

0 commit comments

Comments
 (0)