diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/WritableWarmIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/WritableWarmIT.java index 88c9ae436e85f..c1acee2973365 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/WritableWarmIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/WritableWarmIT.java @@ -12,6 +12,9 @@ import org.apache.lucene.store.Directory; import org.apache.lucene.store.FilterDirectory; +import org.opensearch.action.admin.cluster.node.stats.NodeStats; +import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest; +import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse; import org.opensearch.action.admin.indices.delete.DeleteIndexRequest; import org.opensearch.action.admin.indices.get.GetIndexRequest; import org.opensearch.action.admin.indices.get.GetIndexResponse; @@ -28,6 +31,8 @@ import org.opensearch.index.store.CompositeDirectory; import org.opensearch.index.store.remote.file.CleanerDaemonThreadLeakFilter; import org.opensearch.index.store.remote.filecache.FileCache; +import org.opensearch.index.store.remote.filecache.FileCacheStats; +import org.opensearch.index.store.remote.filecache.FullFileCacheStats; import org.opensearch.index.store.remote.utils.FileTypeUtils; import org.opensearch.indices.IndicesService; import org.opensearch.node.Node; @@ -36,7 +41,9 @@ import java.util.Arrays; import java.util.HashSet; +import java.util.Objects; import java.util.Set; +import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; @@ -49,6 +56,7 @@ public class WritableWarmIT extends RemoteStoreBaseIntegTestCase { protected static final String INDEX_NAME = "test-idx-1"; + protected static final String INDEX_NAME_2 = "test-idx-2"; protected static final int NUM_DOCS_IN_BULK = 1000; /* @@ -168,4 +176,88 @@ public void testWritableWarmBasic() throws Exception { assertAcked(client().admin().indices().delete(new DeleteIndexRequest(INDEX_NAME)).get()); fileCache.prune(); } + + public void testFullFileAndFileCacheStats() throws ExecutionException, InterruptedException { + + InternalTestCluster internalTestCluster = internalCluster(); + internalTestCluster.startClusterManagerOnlyNode(); + internalTestCluster.startDataAndSearchNodes(1); + + Settings settings = Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put(IndexModule.INDEX_STORE_LOCALITY_SETTING.getKey(), IndexModule.DataLocalityType.PARTIAL.name()) + .build(); + + assertAcked(client().admin().indices().prepareCreate(INDEX_NAME_2).setSettings(settings).get()); + + // Verify from the cluster settings if the data locality is partial + GetIndexResponse getIndexResponse = client().admin() + .indices() + .getIndex(new GetIndexRequest().indices(INDEX_NAME_2).includeDefaults(true)) + .get(); + + Settings indexSettings = getIndexResponse.settings().get(INDEX_NAME_2); + assertEquals(IndexModule.DataLocalityType.PARTIAL.name(), indexSettings.get(IndexModule.INDEX_STORE_LOCALITY_SETTING.getKey())); + + // Ingesting docs again before force merge + indexBulk(INDEX_NAME_2, NUM_DOCS_IN_BULK); + flushAndRefresh(INDEX_NAME_2); + + // ensuring cluster is green + ensureGreen(); + + SearchResponse searchResponse = client().prepareSearch(INDEX_NAME_2).setQuery(QueryBuilders.matchAllQuery()).get(); + // Asserting that search returns same number of docs as ingested + assertHitCount(searchResponse, NUM_DOCS_IN_BULK); + + // Ingesting docs again before force merge + indexBulk(INDEX_NAME_2, NUM_DOCS_IN_BULK); + flushAndRefresh(INDEX_NAME_2); + + FileCache fileCache = internalTestCluster.getDataNodeInstance(Node.class).fileCache(); + + // TODO: Make these validation more robust, when SwitchableIndexInput is implemented. + + NodesStatsResponse nodesStatsResponse = client().admin().cluster().nodesStats(new NodesStatsRequest().all()).actionGet(); + + FileCacheStats fileCacheStats = nodesStatsResponse.getNodes() + .stream() + .filter(n -> n.getNode().isDataNode()) + .toList() + .getFirst() + .getFileCacheStats(); + + if (Objects.isNull(fileCacheStats)) { + fail("File Cache Stats should not be null"); + } + + FullFileCacheStats fullFileCacheStats = fileCacheStats.fullFileCacheStats(); + + if (Objects.isNull(fullFileCacheStats)) { + fail("Full File Cache Stats should not be null"); + } + + // Deleting the index (so that ref count drops to zero for all the files) and then pruning the cache to clear it to avoid any file + // leaks + assertAcked(client().admin().indices().delete(new DeleteIndexRequest(INDEX_NAME_2)).get()); + fileCache.prune(); + + NodesStatsResponse response = client().admin().cluster().nodesStats(new NodesStatsRequest().all()).actionGet(); + int nonEmptyFileCacheNodes = 0; + for (NodeStats stats : response.getNodes()) { + FileCacheStats fcStats = stats.getFileCacheStats(); + if (Objects.isNull(fcStats) == false) { + if (isFileCacheEmpty(fcStats) == false) { + nonEmptyFileCacheNodes++; + } + } + } + assertEquals(0, nonEmptyFileCacheNodes); + + } + + private boolean isFileCacheEmpty(FileCacheStats stats) { + return stats.getUsed().getBytes() == 0L && stats.getActive().getBytes() == 0L; + } } diff --git a/server/src/main/java/org/opensearch/index/store/remote/filecache/FileCache.java b/server/src/main/java/org/opensearch/index/store/remote/filecache/FileCache.java index a8aa33a977cb8..2e529ea5ae395 100644 --- a/server/src/main/java/org/opensearch/index/store/remote/filecache/FileCache.java +++ b/server/src/main/java/org/opensearch/index/store/remote/filecache/FileCache.java @@ -14,7 +14,6 @@ import org.opensearch.common.annotation.PublicApi; import org.opensearch.core.common.breaker.CircuitBreaker; import org.opensearch.core.common.breaker.CircuitBreakingException; -import org.opensearch.index.store.remote.utils.cache.CacheUsage; import org.opensearch.index.store.remote.utils.cache.RefCountedCache; import org.opensearch.index.store.remote.utils.cache.SegmentedCache; import org.opensearch.index.store.remote.utils.cache.stats.CacheStats; @@ -133,10 +132,15 @@ public long prune(Predicate keyPredicate) { } @Override - public CacheUsage usage() { + public long usage() { return theCache.usage(); } + @Override + public long activeUsage() { + return theCache.activeUsage(); + } + @Override public CacheStats stats() { return theCache.stats(); @@ -145,8 +149,8 @@ public CacheStats stats() { // To be used only for debugging purposes public void logCurrentState() { logger.trace("CURRENT STATE OF FILE CACHE \n"); - CacheUsage cacheUsage = theCache.usage(); - logger.trace("Total Usage: " + cacheUsage.usage() + " , Active Usage: " + cacheUsage.activeUsage()); + long cacheUsage = theCache.usage(); + logger.trace("Total Usage: " + cacheUsage); theCache.logCurrentState(); } @@ -206,16 +210,23 @@ public void restoreFromDirectory(List fileCacheDataPaths) { * Returns the current {@link FileCacheStats} */ public FileCacheStats fileCacheStats() { - CacheStats stats = stats(); - CacheUsage usage = usage(); + final CacheStats stats = stats(); + final CacheStats.FullFileStats fullFileStats = stats.fullFileStats(); + return new FileCacheStats( System.currentTimeMillis(), - usage.activeUsage(), + stats.activeUsage(), capacity(), - usage.usage(), + stats.usage(), stats.evictionWeight(), stats.hitCount(), - stats.missCount() + stats.missCount(), + new FullFileCacheStats( + fullFileStats.getActiveUsage(), + fullFileStats.getUsage(), + fullFileStats.getEvictionWeight(), + fullFileStats.getHitCount() + ) ); } diff --git a/server/src/main/java/org/opensearch/index/store/remote/filecache/FileCacheStats.java b/server/src/main/java/org/opensearch/index/store/remote/filecache/FileCacheStats.java index 070fd663896a3..8b18d679c8218 100644 --- a/server/src/main/java/org/opensearch/index/store/remote/filecache/FileCacheStats.java +++ b/server/src/main/java/org/opensearch/index/store/remote/filecache/FileCacheStats.java @@ -19,7 +19,15 @@ import java.io.IOException; /** - * Statistics on file cache + * Statistics for the file cache system that tracks memory usage and performance metrics. + * {@link FileCache} internally uses a {@link org.opensearch.index.store.remote.utils.cache.SegmentedCache} + * to manage cached file data in memory segments. + * This class aggregates statistics across all cache segments including: + * - Memory usage (total, active, used) + * - Cache performance (hits, misses, evictions) + * - Utilization percentages + * The statistics are exposed via {@link org.opensearch.action.admin.cluster.node.stats.NodeStats} + * to provide visibility into cache behavior and performance. * * @opensearch.api */ @@ -33,6 +41,7 @@ public class FileCacheStats implements Writeable, ToXContentFragment { private final long evicted; private final long hits; private final long misses; + private final FullFileCacheStats fullFileCacheStats; public FileCacheStats( final long timestamp, @@ -41,7 +50,8 @@ public FileCacheStats( final long used, final long evicted, final long hits, - final long misses + final long misses, + final FullFileCacheStats fullFileCacheStats ) { this.timestamp = timestamp; this.active = active; @@ -50,6 +60,7 @@ public FileCacheStats( this.evicted = evicted; this.hits = hits; this.misses = misses; + this.fullFileCacheStats = fullFileCacheStats; } public FileCacheStats(final StreamInput in) throws IOException { @@ -60,6 +71,7 @@ public FileCacheStats(final StreamInput in) throws IOException { this.evicted = in.readLong(); this.hits = in.readLong(); this.misses = in.readLong(); + this.fullFileCacheStats = new FullFileCacheStats(in); } public static short calculatePercentage(long used, long max) { @@ -75,6 +87,7 @@ public void writeTo(final StreamOutput out) throws IOException { out.writeLong(evicted); out.writeLong(hits); out.writeLong(misses); + if (fullFileCacheStats != null) fullFileCacheStats.writeTo(out); } public long getTimestamp() { @@ -113,6 +126,10 @@ public long getCacheMisses() { return misses; } + public FullFileCacheStats fullFileCacheStats() { + return fullFileCacheStats; + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(Fields.FILE_CACHE); @@ -125,6 +142,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.field(Fields.USED_PERCENT, getUsedPercent()); builder.field(Fields.HIT_COUNT, getCacheHits()); builder.field(Fields.MISS_COUNT, getCacheMisses()); + if (fullFileCacheStats != null) { + fullFileCacheStats.toXContent(builder, params); + } builder.endObject(); return builder; } diff --git a/server/src/main/java/org/opensearch/index/store/remote/filecache/FullFileCacheStats.java b/server/src/main/java/org/opensearch/index/store/remote/filecache/FullFileCacheStats.java new file mode 100644 index 0000000000000..438f64404ef65 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/store/remote/filecache/FullFileCacheStats.java @@ -0,0 +1,106 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.store.remote.filecache; + +import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.common.io.stream.Writeable; +import org.opensearch.core.xcontent.ToXContentFragment; +import org.opensearch.core.xcontent.XContentBuilder; + +import java.io.IOException; + +import static org.opensearch.index.store.remote.filecache.FileCacheStats.calculatePercentage; + +/** + * Statistics for the file cache system that tracks memory usage and performance metrics. + * Aggregates statistics across all cache segments including: + * - Memory usage: active and used bytes. + * - Cache performance: hit counts and eviction counts. + * - Utilization: active percentage of total used memory. + * The statistics are exposed as part of {@link FileCacheStats} and via {@link org.opensearch.action.admin.cluster.node.stats.NodeStats} + * to provide visibility into cache behavior and performance. + * + * @opensearch.api + */ +@ExperimentalApi +public class FullFileCacheStats implements Writeable, ToXContentFragment { + + private final long active; + private final long used; + private final long evicted; + private final long hits; + + public FullFileCacheStats(final long active, final long used, final long evicted, final long hits) { + this.active = active; + this.used = used; + this.evicted = evicted; + this.hits = hits; + } + + public FullFileCacheStats(final StreamInput in) throws IOException { + this.active = in.readLong(); + this.used = in.readLong(); + this.evicted = in.readLong(); + this.hits = in.readLong(); + } + + @Override + public void writeTo(final StreamOutput out) throws IOException { + out.writeLong(active); + out.writeLong(used); + out.writeLong(evicted); + out.writeLong(hits); + } + + public long getActive() { + return active; + } + + public long getUsed() { + return used; + } + + public long getEvicted() { + return evicted; + } + + public long getHits() { + return hits; + } + + public short getActivePercent() { + return calculatePercentage(active, used); + } + + static final class Fields { + static final String FULL_FILE_STATS = "full_file_stats"; + static final String ACTIVE = "active"; + static final String ACTIVE_IN_BYTES = "active_in_bytes"; + static final String USED = "used"; + static final String USED_IN_BYTES = "used_in_bytes"; + static final String EVICTIONS = "evictions"; + static final String EVICTIONS_IN_BYTES = "evictions_in_bytes"; + static final String ACTIVE_PERCENT = "active_percent"; + static final String HIT_COUNT = "hit_count"; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(Fields.FULL_FILE_STATS); + builder.humanReadableField(FullFileCacheStats.Fields.ACTIVE_IN_BYTES, FullFileCacheStats.Fields.ACTIVE, getActive()); + builder.humanReadableField(FullFileCacheStats.Fields.USED_IN_BYTES, FullFileCacheStats.Fields.USED, getUsed()); + builder.humanReadableField(FullFileCacheStats.Fields.EVICTIONS_IN_BYTES, FullFileCacheStats.Fields.EVICTIONS, getEvicted()); + builder.field(FullFileCacheStats.Fields.ACTIVE_PERCENT, getActivePercent()); + builder.field(FullFileCacheStats.Fields.HIT_COUNT, getHits()); + builder.endObject(); + return builder; + } +} diff --git a/server/src/main/java/org/opensearch/index/store/remote/utils/TransferManager.java b/server/src/main/java/org/opensearch/index/store/remote/utils/TransferManager.java index 2b619c26f49c0..d7f484bb26a79 100644 --- a/server/src/main/java/org/opensearch/index/store/remote/utils/TransferManager.java +++ b/server/src/main/java/org/opensearch/index/store/remote/utils/TransferManager.java @@ -111,13 +111,13 @@ private static FileCachedIndexInput createIndexInput(FileCache fileCache, Stream try { // This local file cache is ref counted and may not strictly enforce configured capacity. // If we find available capacity is exceeded, deny further BlobFetchRequests. - if (fileCache.capacity() < fileCache.usage().usage()) { + if (fileCache.capacity() < fileCache.usage()) { fileCache.prune(); throw new IOException( "Local file cache capacity (" + fileCache.capacity() + ") exceeded (" - + fileCache.usage().usage() + + fileCache.usage() + ") - BlobFetchRequest failed: " + request.getFilePath() ); diff --git a/server/src/main/java/org/opensearch/index/store/remote/utils/cache/CacheUsage.java b/server/src/main/java/org/opensearch/index/store/remote/utils/cache/CacheUsage.java deleted file mode 100644 index 0b5480d3ca978..0000000000000 --- a/server/src/main/java/org/opensearch/index/store/remote/utils/cache/CacheUsage.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.index.store.remote.utils.cache; - -import org.opensearch.common.annotation.PublicApi; - -/** - * Usage metrics for {@link RefCountedCache} - * - * @opensearch.internal - */ -@PublicApi(since = "2.7.0") -public class CacheUsage { - /** - * Cache usage of the system - */ - private final long usage; - - /** - * Cache usage by entries which are referenced - */ - private final long activeUsage; - - public CacheUsage(long usage, long activeUsage) { - this.usage = usage; - this.activeUsage = activeUsage; - } - - public long usage() { - return usage; - } - - public long activeUsage() { - return activeUsage; - } -} diff --git a/server/src/main/java/org/opensearch/index/store/remote/utils/cache/LRUCache.java b/server/src/main/java/org/opensearch/index/store/remote/utils/cache/LRUCache.java index 6e30a39e27bb1..d5841be0fd8b3 100644 --- a/server/src/main/java/org/opensearch/index/store/remote/utils/cache/LRUCache.java +++ b/server/src/main/java/org/opensearch/index/store/remote/utils/cache/LRUCache.java @@ -29,7 +29,7 @@ /** * LRU implementation of {@link RefCountedCache}. As long as {@link Node#refCount} greater than 0 then node is not eligible for eviction. - * So this is best effort lazy cache to maintain capacity.
+ * So this is the best effort lazy cache to maintain capacity.
* For more context why in-house cache implementation exist look at * this comment and * this ticket for future plans @@ -58,20 +58,10 @@ class LRUCache implements RefCountedCache { private final Weigher weigher; - private final StatsCounter statsCounter; + private final StatsCounter statsCounter; private final ReentrantLock lock; - /** - * this tracks cache usage on the system (as long as cache entry is in the cache) - */ - private long usage; - - /** - * this tracks cache usage only by entries which are being referred ({@link Node#refCount > 0}) - */ - private long activeUsage; - static class Node { final K key; @@ -117,7 +107,7 @@ public V get(K key) { } // hit incRef(key); - statsCounter.recordHits(key, 1); + statsCounter.recordHits(key, node.value, 1); return node.value; } finally { lock.unlock(); @@ -168,7 +158,7 @@ public V compute(K key, BiFunction remappingF removeNode(key); return null; } else { - statsCounter.recordHits(key, 1); + statsCounter.recordHits(key, node.value, 1); replaceNode(node, newValue); return newValue; } @@ -193,16 +183,16 @@ public void remove(K key) { public void clear() { lock.lock(); try { - usage = 0L; - activeUsage = 0L; lru.clear(); final Iterator> iterator = data.values().iterator(); while (iterator.hasNext()) { Node node = iterator.next(); iterator.remove(); - statsCounter.recordRemoval(node.weight); + statsCounter.recordRemoval(node.value, node.weight); listener.onRemoval(new RemovalNotification<>(node.key, node.value, RemovalReason.EXPLICIT)); } + statsCounter.resetUsage(); + statsCounter.resetActiveUsage(); } finally { lock.unlock(); } @@ -222,7 +212,7 @@ public void incRef(K key) { if (node != null) { if (node.refCount == 0) { // if it was inactive, we should add the weight to active usage from now - activeUsage += node.weight; + statsCounter.recordActiveUsage(node.value, node.weight, false); } if (node.evictable()) { @@ -254,7 +244,7 @@ public void decRef(K key) { if (node.refCount == 0) { // if it was active, we should remove its weight from active usage - activeUsage -= node.weight; + statsCounter.recordActiveUsage(node.value, node.weight, true); } } } finally { @@ -276,10 +266,9 @@ public long prune(Predicate keyPredicate) { iterator.remove(); data.remove(node.key, node); sum += node.weight; - statsCounter.recordRemoval(node.weight); + statsCounter.recordRemoval(node.value, node.weight); listener.onRemoval(new RemovalNotification<>(node.key, node.value, RemovalReason.EXPLICIT)); } - usage -= sum; } finally { lock.unlock(); } @@ -287,10 +276,21 @@ public long prune(Predicate keyPredicate) { } @Override - public CacheUsage usage() { + public long usage() { + lock.lock(); + try { + return statsCounter.usage(); + } finally { + lock.unlock(); + } + } + + @Override + + public long activeUsage() { lock.lock(); try { - return new CacheUsage(usage, activeUsage); + return statsCounter.activeUsage(); } finally { lock.unlock(); } @@ -333,7 +333,7 @@ private void addNode(K key, V value) { final long weight = weigher.weightOf(value); Node newNode = new Node<>(key, value, weight); data.put(key, newNode); - usage += weight; + statsCounter.recordUsage(value, weight, false); incRef(key); evict(); } @@ -346,13 +346,9 @@ private void replaceNode(Node node, V newValue) { // update the value and weight node.value = newValue; node.weight = newWeight; - // update usage - final long weightDiff = newWeight - oldWeight; - if (node.refCount > 0) { - activeUsage += weightDiff; - } - usage += weightDiff; - statsCounter.recordReplacement(); + + // update stats + statsCounter.recordReplacement(oldValue, newValue, oldWeight, newWeight, node.refCount > 0); listener.onRemoval(new RemovalNotification<>(node.key, oldValue, RemovalReason.REPLACED)); } incRef(node.key); @@ -363,19 +359,18 @@ private void removeNode(K key) { Node node = data.remove(key); if (node != null) { if (node.refCount > 0) { - activeUsage -= node.weight; + statsCounter.recordActiveUsage(node.value, node.weight, true); } - usage -= node.weight; if (node.evictable()) { lru.remove(node.key); } - statsCounter.recordRemoval(node.weight); + statsCounter.recordRemoval(node.value, node.weight); listener.onRemoval(new RemovalNotification<>(node.key, node.value, RemovalReason.EXPLICIT)); } } private boolean hasOverflowed() { - return usage >= capacity; + return statsCounter.usage() >= capacity; } private void evict() { @@ -387,8 +382,7 @@ private void evict() { iterator.remove(); // Notify the listener only if the entry was evicted data.remove(node.key, node); - usage -= node.weight; - statsCounter.recordEviction(node.weight); + statsCounter.recordEviction(node.value, node.weight); listener.onRemoval(new RemovalNotification<>(node.key, node.value, RemovalReason.CAPACITY)); } } diff --git a/server/src/main/java/org/opensearch/index/store/remote/utils/cache/RefCountedCache.java b/server/src/main/java/org/opensearch/index/store/remote/utils/cache/RefCountedCache.java index e6b5a5f945d83..f90cd20b895cf 100644 --- a/server/src/main/java/org/opensearch/index/store/remote/utils/cache/RefCountedCache.java +++ b/server/src/main/java/org/opensearch/index/store/remote/utils/cache/RefCountedCache.java @@ -98,7 +98,14 @@ default long prune() { * * @return the combined weight of the values in this cache */ - CacheUsage usage(); + long usage(); + + /** + * Returns the active usage of this cache. + * + * @return the combined active weight of the values in this cache. + */ + long activeUsage(); /** * Returns a current snapshot of this cache's cumulative statistics. All statistics are diff --git a/server/src/main/java/org/opensearch/index/store/remote/utils/cache/SegmentedCache.java b/server/src/main/java/org/opensearch/index/store/remote/utils/cache/SegmentedCache.java index ecf9bb2ead0d2..4c8c515ce0633 100644 --- a/server/src/main/java/org/opensearch/index/store/remote/utils/cache/SegmentedCache.java +++ b/server/src/main/java/org/opensearch/index/store/remote/utils/cache/SegmentedCache.java @@ -152,38 +152,92 @@ public long prune(Predicate keyPredicate) { } @Override - public CacheUsage usage() { - long usage = 0L; - long activeUsage = 0L; + public long usage() { + long totalUsage = 0L; for (RefCountedCache cache : table) { - CacheUsage c = cache.usage(); - usage += c.usage(); - activeUsage += c.activeUsage(); + CacheStats c = cache.stats(); + totalUsage += c.usage(); + + } + return totalUsage; + } + + @Override + public long activeUsage() { + long totalActiveUsage = 0L; + for (RefCountedCache cache : table) { + CacheStats c = cache.stats(); + totalActiveUsage += c.activeUsage(); } - return new CacheUsage(usage, activeUsage); + return totalActiveUsage; } @Override public CacheStats stats() { - long hitCount = 0L; - long missCount = 0L; - long removeCount = 0L; - long removeWeight = 0L; - long replaceCount = 0L; - long evictionCount = 0L; - long evictionWeight = 0L; + + long totalHitCount = 0L; + long totalMissCount = 0L; + long totalRemoveCount = 0L; + long totalRemoveWeight = 0L; + long totalReplaceCount = 0L; + long totalEvictionCount = 0L; + long totalEvictionWeight = 0L; + long totalUsage = 0L; + long totalActiveUsage = 0L; + + // full file counts + long totalFullFileHitCount = 0L; + long totalFullFileRemoveCount = 0L; + long totalFullFileRemoveWeight = 0L; + long totalFullFileReplaceCount = 0L; + long totalFullFileEvictionCount = 0L; + long totalFullFileEvictionWeight = 0L; + long totalFullFileUsage = 0L; + long totalFullFileActiveUsage = 0L; for (RefCountedCache cache : table) { CacheStats c = cache.stats(); - hitCount += c.hitCount(); - missCount += c.missCount(); - removeCount += c.removeCount(); - removeWeight += c.removeWeight(); - replaceCount += c.replaceCount(); - evictionCount += c.evictionCount(); - evictionWeight += c.evictionWeight(); + totalHitCount += c.hitCount(); + totalMissCount += c.missCount(); + totalRemoveCount += c.removeCount(); + totalRemoveWeight += c.removeWeight(); + totalReplaceCount += c.replaceCount(); + totalEvictionCount += c.evictionCount(); + totalEvictionWeight += c.evictionWeight(); + totalUsage += c.usage(); + totalActiveUsage += c.activeUsage(); + + CacheStats.FullFileStats fullFileStats = c.fullFileStats(); + + totalFullFileHitCount += fullFileStats.getHitCount(); + totalFullFileRemoveCount += fullFileStats.getRemoveCount(); + totalFullFileRemoveWeight += fullFileStats.getRemoveWeight(); + totalFullFileReplaceCount += fullFileStats.getReplaceCount(); + totalFullFileEvictionCount += fullFileStats.getEvictionCount(); + totalFullFileEvictionWeight += fullFileStats.getEvictionWeight(); + totalFullFileUsage += fullFileStats.getUsage(); + totalFullFileActiveUsage += fullFileStats.getActiveUsage(); } - return new CacheStats(hitCount, missCount, removeCount, removeWeight, replaceCount, evictionCount, evictionWeight); + + return new CacheStats( + totalHitCount, + totalMissCount, + totalRemoveCount, + totalRemoveWeight, + totalReplaceCount, + totalEvictionCount, + totalEvictionWeight, + totalUsage, + totalActiveUsage, + totalFullFileHitCount, + totalFullFileRemoveCount, + totalFullFileRemoveWeight, + totalFullFileReplaceCount, + totalFullFileEvictionCount, + totalFullFileEvictionWeight, + totalFullFileUsage, + totalFullFileActiveUsage + ); } // To be used only for debugging purposes diff --git a/server/src/main/java/org/opensearch/index/store/remote/utils/cache/stats/CacheStats.java b/server/src/main/java/org/opensearch/index/store/remote/utils/cache/stats/CacheStats.java index 55893752669a8..597232e15ae7f 100644 --- a/server/src/main/java/org/opensearch/index/store/remote/utils/cache/stats/CacheStats.java +++ b/server/src/main/java/org/opensearch/index/store/remote/utils/cache/stats/CacheStats.java @@ -8,6 +8,7 @@ package org.opensearch.index.store.remote.utils.cache.stats; +import org.opensearch.common.annotation.ExperimentalApi; import org.opensearch.common.annotation.PublicApi; import org.opensearch.index.store.remote.utils.cache.RefCountedCache; @@ -27,6 +28,144 @@ public final class CacheStats { private final long replaceCount; private final long evictionCount; private final long evictionWeight; + private final long usage; + private final long activeUsage; + private final FullFileStats fullFileStats; + + /** + * Represents Stats about FullFiles in a {@link RefCountedCache}. + */ + @ExperimentalApi + public class FullFileStats { + private final long hitCount; + private final long removeCount; + private final long removeWeight; + private final long replaceCount; + private final long evictionCount; + private final long evictionWeight; + private final long usage; + private final long activeUsage; + + FullFileStats( + long hitCount, + long removeCount, + long removeWeight, + long replaceCount, + long evictionCount, + long evictionWeight, + long usage, + long activeUsage + ) { + + if ((hitCount < 0) + || (removeCount < 0) + || (removeWeight < 0) + || (replaceCount < 0) + || (evictionCount < 0) + || (evictionWeight < 0)) { + throw new IllegalArgumentException(); + } + this.hitCount = hitCount; + this.removeCount = removeCount; + this.removeWeight = removeWeight; + this.replaceCount = replaceCount; + this.evictionCount = evictionCount; + this.evictionWeight = evictionWeight; + this.usage = usage; + this.activeUsage = activeUsage; + + } + + public long getActiveUsage() { + return activeUsage; + } + + public long getUsage() { + return usage; + } + + public long getEvictionWeight() { + return evictionWeight; + } + + public long getEvictionCount() { + return evictionCount; + } + + public long getReplaceCount() { + return replaceCount; + } + + public long getRemoveWeight() { + return removeWeight; + } + + public long getRemoveCount() { + return removeCount; + } + + public long getHitCount() { + return hitCount; + } + + @Override + public boolean equals(Object o) { + if (o == this) { + return true; + } else if (!(o instanceof FullFileStats)) { + return false; + } + FullFileStats other = (FullFileStats) o; + return hitCount == other.hitCount + && removeCount == other.removeCount + && removeWeight == other.removeWeight + && replaceCount == other.replaceCount + && evictionCount == other.evictionCount + && evictionWeight == other.evictionWeight + && usage == other.usage + && activeUsage == other.activeUsage; + } + + @Override + public int hashCode() { + return Objects.hash(hitCount, removeCount, removeWeight, replaceCount, evictionCount, evictionWeight, usage, activeUsage); + } + + @Override + public String toString() { + return getClass().getSimpleName() + + '{' + + "hitCount=" + + hitCount + + ", " + + "missCount=" + + missCount + + ", " + + "removeCount=" + + removeCount + + ", " + + "removeWeight=" + + removeWeight + + ", " + + "replaceCount=" + + replaceCount + + ", " + + "evictionCount=" + + evictionCount + + ", " + + "evictionWeight=" + + evictionWeight + + ", " + + "usage=" + + usage + + ", " + + "activeUsage=" + + activeUsage + + '}'; + } + + // + } /** * Constructs a new {@code CacheStats} instance. @@ -49,7 +188,17 @@ public CacheStats( long removeWeight, long replaceCount, long evictionCount, - long evictionWeight + long evictionWeight, + long usage, + long activeUsage, + long fullFileHitCount, + long fullFileRemoveCount, + long fullFileRemoveWeight, + long fullFileReplaceCount, + long fullFileEvictionCount, + long fullFileEvictionWeight, + long fullFileUsage, + long fullFileActiveUsage ) { if ((hitCount < 0) || (missCount < 0) @@ -67,6 +216,18 @@ public CacheStats( this.replaceCount = replaceCount; this.evictionCount = evictionCount; this.evictionWeight = evictionWeight; + this.usage = usage; + this.activeUsage = activeUsage; + this.fullFileStats = new FullFileStats( + fullFileHitCount, + fullFileRemoveCount, + fullFileRemoveWeight, + fullFileReplaceCount, + fullFileEvictionCount, + fullFileEvictionWeight, + fullFileUsage, + fullFileActiveUsage + ); } /** @@ -176,9 +337,46 @@ public long evictionWeight() { return evictionWeight; } + /** + * Returns the total weight of the cache. + * + * @return the total weight of the cache + */ + public long usage() { + return usage; + } + + /** + * Returns the total active weight of the cache. + * + * @return the total active weight of the cache + */ + public long activeUsage() { + return activeUsage; + } + + /** + * Returns full file stats for the cache. + * @return + */ + public FullFileStats fullFileStats() { + return fullFileStats; + } + @Override public int hashCode() { - return Objects.hash(hitCount, missCount, removeCount, removeWeight, replaceCount, evictionCount, evictionWeight); + return Objects.hash( + hitCount, + missCount, + removeCount, + removeWeight, + replaceCount, + evictionCount, + evictionWeight, + usage, + activeUsage, + fullFileStats + ); } @Override @@ -195,7 +393,10 @@ public boolean equals(Object o) { && removeWeight == other.removeWeight && replaceCount == other.replaceCount && evictionCount == other.evictionCount - && evictionWeight == other.evictionWeight; + && evictionWeight == other.evictionWeight + && usage == other.usage + && activeUsage == other.activeUsage + && fullFileStats.equals(other.fullFileStats); } @Override @@ -222,6 +423,15 @@ public String toString() { + ", " + "evictionWeight=" + evictionWeight + + ", " + + "usage=" + + usage + + ", " + + "activeUsage=" + + activeUsage + + ", " + + "fullFileStats=" + + fullFileStats.toString() + '}'; } } diff --git a/server/src/main/java/org/opensearch/index/store/remote/utils/cache/stats/DefaultStatsCounter.java b/server/src/main/java/org/opensearch/index/store/remote/utils/cache/stats/DefaultStatsCounter.java index 60fe5223ef37e..d99bc6dd0f6c1 100644 --- a/server/src/main/java/org/opensearch/index/store/remote/utils/cache/stats/DefaultStatsCounter.java +++ b/server/src/main/java/org/opensearch/index/store/remote/utils/cache/stats/DefaultStatsCounter.java @@ -8,12 +8,14 @@ package org.opensearch.index.store.remote.utils.cache.stats; +import org.opensearch.index.store.remote.filecache.CachedFullFileIndexInput; + /** * A non thread-safe {@link StatsCounter} implementation. * * @opensearch.internal */ -public class DefaultStatsCounter implements StatsCounter { +public class DefaultStatsCounter implements StatsCounter { private long hitCount; private long missCount; private long removeCount; @@ -21,6 +23,25 @@ public class DefaultStatsCounter implements StatsCounter { private long replaceCount; private long evictionCount; private long evictionWeight; + /** + * this tracks cache usage on the system (as long as cache entry is in the cache) + */ + private long usage; + + /** + * this tracks cache usage only by entries which are being referred. + */ + private long activeUsage; + + // Stats Counters for Full File. + private long fullFileHitCount; + private long fullFileRemoveCount; + private long fullFileRemoveWeight; + private long fullFileReplaceCount; + private long fullFileEvictionCount; + private long fullFileEvictionWeight; + private long fullFileUsage; + private long fullFileActiveUsage; public DefaultStatsCounter() { this.hitCount = 0L; @@ -30,11 +51,23 @@ public DefaultStatsCounter() { this.replaceCount = 0L; this.evictionCount = 0L; this.evictionWeight = 0L; + this.usage = 0L; + this.activeUsage = 0L; + this.fullFileHitCount = 0L; + this.fullFileRemoveCount = 0L; + this.fullFileRemoveWeight = 0L; + this.fullFileReplaceCount = 0L; + this.fullFileEvictionCount = 0L; + this.fullFileEvictionWeight = 0L; + this.fullFileUsage = 0L; + this.fullFileActiveUsage = 0L; } @Override - public void recordHits(K key, int count) { + public void recordHits(K key, V value, int count) { hitCount += count; + + if (isFullFile(value)) fullFileHitCount++; } @Override @@ -43,29 +76,121 @@ public void recordMisses(K key, int count) { } @Override - public void recordRemoval(long weight) { + public void recordRemoval(V value, long weight) { removeCount++; removeWeight += weight; + usage -= weight; + + if (isFullFile(value)) { + fullFileRemoveCount++; + fullFileRemoveWeight += weight; + fullFileUsage -= weight; + } } @Override - public void recordReplacement() { + public void recordReplacement(V oldValue, V newValue, long oldWeight, long newWeight, boolean shouldUpdateActiveUsage) { replaceCount++; + if (isFullFile(oldValue)) fullFileReplaceCount++; + + boolean isOldFullFile = isFullFile(oldValue); + boolean isNewFullFile = isFullFile(newValue); + + if (shouldUpdateActiveUsage) activeUsage = activeUsage - oldWeight + newWeight; + usage = usage - oldWeight + newWeight; + + if (isOldFullFile == false && isNewFullFile == true) { + if (shouldUpdateActiveUsage) fullFileActiveUsage = fullFileActiveUsage + newWeight; + fullFileUsage = fullFileUsage + newWeight; + } else if (isOldFullFile == true && isNewFullFile == false) { + if (shouldUpdateActiveUsage) fullFileActiveUsage = fullFileActiveUsage - oldWeight; + fullFileUsage = fullFileUsage - oldWeight; + } else if (isOldFullFile == true && isNewFullFile == true) { + if (shouldUpdateActiveUsage) fullFileActiveUsage = fullFileActiveUsage - oldWeight + newWeight; + fullFileUsage = fullFileUsage - oldWeight + newWeight; + } + } @Override - public void recordEviction(long weight) { + public void recordEviction(V value, long weight) { evictionCount++; evictionWeight += weight; + usage -= weight; + + if (isFullFile(value)) { + fullFileEvictionCount++; + fullFileEvictionWeight += weight; + fullFileUsage -= weight; + } + } + + @Override + public void recordUsage(V value, long weight, boolean shouldDecrease) { + weight = shouldDecrease ? -1 * weight : weight; + usage += weight; + if (isFullFile(value)) fullFileUsage += weight; + } + + @Override + public void recordActiveUsage(V value, long weight, boolean shouldDecrease) { + weight = shouldDecrease ? -1 * weight : weight; + + activeUsage += weight; + if (isFullFile(value)) fullFileActiveUsage += weight; + } + + @Override + public void resetActiveUsage() { + this.activeUsage = 0; + this.fullFileActiveUsage = 0; + } + + @Override + public void resetUsage() { + this.usage = 0; + this.fullFileUsage = 0; + } + + @Override + public long activeUsage() { + return this.activeUsage; + } + + @Override + public long usage() { + return this.usage; } @Override public CacheStats snapshot() { - return new CacheStats(hitCount, missCount, removeCount, removeWeight, replaceCount, evictionCount, evictionWeight); + return new CacheStats( + hitCount, + missCount, + removeCount, + removeWeight, + replaceCount, + evictionCount, + evictionWeight, + usage, + activeUsage, + fullFileHitCount, + fullFileRemoveCount, + fullFileRemoveWeight, + fullFileReplaceCount, + fullFileEvictionCount, + fullFileEvictionWeight, + fullFileUsage, + fullFileActiveUsage + ); } @Override public String toString() { return snapshot().toString(); } + + private boolean isFullFile(V value) { + return value instanceof CachedFullFileIndexInput; + } } diff --git a/server/src/main/java/org/opensearch/index/store/remote/utils/cache/stats/StatsCounter.java b/server/src/main/java/org/opensearch/index/store/remote/utils/cache/stats/StatsCounter.java index b096bb8d652ae..a849220904a1f 100644 --- a/server/src/main/java/org/opensearch/index/store/remote/utils/cache/stats/StatsCounter.java +++ b/server/src/main/java/org/opensearch/index/store/remote/utils/cache/stats/StatsCounter.java @@ -18,14 +18,14 @@ * * @opensearch.internal */ -public interface StatsCounter { +public interface StatsCounter { /** * Records cache hits. This should be called when a cache request returns a cached value. * * @param count the number of hits to record */ - void recordHits(K key, int count); + void recordHits(K key, V value, int count); /** * Records cache misses. This should be called when a cache request returns a value that was not @@ -47,7 +47,7 @@ public interface StatsCounter { * * @param weight the weight of the removed entry */ - void recordRemoval(long weight); + void recordRemoval(V value, long weight); /** * Records the replacement of an entry from the cache. This should only been called when an entry is @@ -55,7 +55,7 @@ public interface StatsCounter { * {@link RefCountedCache#put(Object, Object)} * {@link RefCountedCache#compute(Object, BiFunction)} */ - void recordReplacement(); + void recordReplacement(V oldValue, V newValue, long oldWeight, long newWeight, boolean shouldUpdateActiveUsage); /** * Records the eviction of an entry from the cache. This should only been called when an entry is @@ -64,7 +64,49 @@ public interface StatsCounter { * * @param weight the weight of the evicted entry */ - void recordEviction(long weight); + void recordEviction(V value, long weight); + + /** + * Records the usage of the cache. This should be called when an entry is created/removed/replaced in the cache. + * + * @param value Entry of the cache. + * @param weight Weight of the entry. + * @param shouldDecrease Should the usage of the cache be decreased or not. + */ + void recordUsage(V value, long weight, boolean shouldDecrease); + + /** + * Records the cache usage by entries which are active (being referenced). + * This should be called when an active entry is created/removed/replaced in the cache. + * @param value Entry of the cache. + * @param weight Weight of the entry. + * @param shouldDecrease Should the active usage of the cache be decreased or not. + */ + void recordActiveUsage(V value, long weight, boolean shouldDecrease); + + /** + * Resets the cache usage by entries which are active (being referenced). + * This should be called when cache is cleared. + */ + void resetActiveUsage(); + + /** + * Resets the cache usage. + * This should be called when cache is cleared. + */ + void resetUsage(); + + /** + * Returns the active usage of the cache. + * @return Active usage of the cache. + */ + long activeUsage(); + + /** + * Returns the usage of the cache. + * @return Usage of the cache. + */ + long usage(); /** * Returns a snapshot of this counter's values. Note that this may be an inconsistent view, as it diff --git a/server/src/main/java/org/opensearch/monitor/fs/FsProbe.java b/server/src/main/java/org/opensearch/monitor/fs/FsProbe.java index db77ec7628e76..0b446443d4b25 100644 --- a/server/src/main/java/org/opensearch/monitor/fs/FsProbe.java +++ b/server/src/main/java/org/opensearch/monitor/fs/FsProbe.java @@ -80,7 +80,7 @@ public FsInfo stats(FsInfo previous) throws IOException { paths[i] = getFSInfo(dataLocations[i]); if (fileCache != null && dataLocations[i].fileCacheReservedSize != ByteSizeValue.ZERO) { paths[i].fileCacheReserved = adjustForHugeFilesystems(dataLocations[i].fileCacheReservedSize.getBytes()); - paths[i].fileCacheUtilized = adjustForHugeFilesystems(fileCache.usage().usage()); + paths[i].fileCacheUtilized = adjustForHugeFilesystems(fileCache.usage()); // fileCacheFree will be less than zero if the cache being over-subscribed long fileCacheFree = paths[i].fileCacheReserved - paths[i].fileCacheUtilized; if (fileCacheFree > 0) { diff --git a/server/src/test/java/org/opensearch/cluster/ClusterInfoTests.java b/server/src/test/java/org/opensearch/cluster/ClusterInfoTests.java index 4ec7db2f3d552..06879f7c047ef 100644 --- a/server/src/test/java/org/opensearch/cluster/ClusterInfoTests.java +++ b/server/src/test/java/org/opensearch/cluster/ClusterInfoTests.java @@ -37,6 +37,7 @@ import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.core.index.shard.ShardId; import org.opensearch.index.store.remote.filecache.FileCacheStats; +import org.opensearch.index.store.remote.filecache.FullFileCacheStats; import org.opensearch.test.OpenSearchTestCase; import java.util.HashMap; @@ -94,7 +95,8 @@ private static Map randomFileCacheStats() { randomLong(), randomLong(), randomLong(), - randomLong() + randomLong(), + new FullFileCacheStats(randomLong(), randomLong(), randomLong(), randomLong()) ); builder.put(key, fileCacheStats); } diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java index 94e91c3f7c3c1..726875e4f926b 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java @@ -302,9 +302,9 @@ public void testDiskThresholdForRemoteShards() { shardSizes.put("[test][0][r]", 10L); Map fileCacheStatsMap = new HashMap<>(); - fileCacheStatsMap.put("node1", new FileCacheStats(0, 0, 1000, 0, 0, 0, 0)); - fileCacheStatsMap.put("node2", new FileCacheStats(0, 0, 1000, 0, 0, 0, 0)); - fileCacheStatsMap.put("node3", new FileCacheStats(0, 0, 1000, 0, 0, 0, 0)); + fileCacheStatsMap.put("node1", new FileCacheStats(0, 0, 1000, 0, 0, 0, 0, null)); + fileCacheStatsMap.put("node2", new FileCacheStats(0, 0, 1000, 0, 0, 0, 0, null)); + fileCacheStatsMap.put("node3", new FileCacheStats(0, 0, 1000, 0, 0, 0, 0, null)); final ClusterInfo clusterInfo = new DevNullClusterInfo(usages, usages, shardSizes, fileCacheStatsMap); ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); @@ -396,8 +396,8 @@ public void testFileCacheRemoteShardsDecisions() { // First node has filecache size as 0, second has 1000, greater than the shard sizes. Map fileCacheStatsMap = new HashMap<>(); - fileCacheStatsMap.put("node1", new FileCacheStats(0, 0, 0, 0, 0, 0, 0)); - fileCacheStatsMap.put("node2", new FileCacheStats(0, 0, 1000, 0, 0, 0, 0)); + fileCacheStatsMap.put("node1", new FileCacheStats(0, 0, 0, 0, 0, 0, 0, null)); + fileCacheStatsMap.put("node2", new FileCacheStats(0, 0, 1000, 0, 0, 0, 0, null)); final ClusterInfo clusterInfo = new DevNullClusterInfo(usages, usages, shardSizes, fileCacheStatsMap); diff --git a/server/src/test/java/org/opensearch/index/store/remote/filecache/FileCacheStatsTests.java b/server/src/test/java/org/opensearch/index/store/remote/filecache/FileCacheStatsTests.java index 7931c6fec5414..2000eeb3ba3a5 100644 --- a/server/src/test/java/org/opensearch/index/store/remote/filecache/FileCacheStatsTests.java +++ b/server/src/test/java/org/opensearch/index/store/remote/filecache/FileCacheStatsTests.java @@ -10,7 +10,6 @@ import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.core.common.io.stream.StreamInput; -import org.opensearch.index.store.remote.utils.cache.CacheUsage; import org.opensearch.index.store.remote.utils.cache.stats.CacheStats; import org.opensearch.test.OpenSearchTestCase; @@ -25,34 +24,74 @@ public static CacheStats getMockCacheStats() { final long replaced = randomLongBetween(0, 10000); final long hits = randomLongBetween(0, 10000); final long miss = randomLongBetween(0, 10000); - return new CacheStats(hits, miss, 0, removed, replaced, 0, evicted); - } + final long usage = randomLongBetween(10000, BYTES_IN_GB); + final long activeUsage = randomLongBetween(10000, BYTES_IN_GB); + final long fullFileHitCount = randomLongBetween(0, 10000); + final long fullFileRemoveCount = randomLongBetween(0, 10000); + final long fullFileRemoveWeight = randomLongBetween(10000, BYTES_IN_GB); + final long fullFileReplaceCount = randomLongBetween(0, 10000); + final long fullFileEvictionCount = randomLongBetween(0, 10000); + final long fullFileEvictionWeight = randomLongBetween(10000, BYTES_IN_GB); + final long fullFileUsage = randomLongBetween(0, 10000); + final long fullFileActiveUsage = randomLongBetween(0, 10000); - public static CacheUsage getMockCacheUsage(long total) { - final long used = randomLongBetween(100, total); - final long active = randomLongBetween(10, used); - return new CacheUsage(used, active); + return new CacheStats( + hits, + miss, + 0, + removed, + replaced, + 0, + evicted, + usage, + activeUsage, + fullFileHitCount, + fullFileRemoveCount, + fullFileRemoveWeight, + fullFileReplaceCount, + fullFileEvictionCount, + fullFileEvictionWeight, + fullFileUsage, + fullFileActiveUsage + ); } public static long getMockCacheCapacity() { return randomLongBetween(10 * BYTES_IN_GB, 1000 * BYTES_IN_GB); } - public static FileCacheStats getFileCacheStats(final long fileCacheCapacity, final CacheStats stats, final CacheUsage usage) { + public static FileCacheStats getFileCacheStats(final long fileCacheCapacity, final CacheStats stats) throws IOException { return new FileCacheStats( System.currentTimeMillis(), - usage.activeUsage(), + stats.activeUsage(), fileCacheCapacity, - usage.usage(), + stats.usage(), stats.evictionWeight(), stats.hitCount(), - stats.missCount() + stats.missCount(), + getMockFullFileCacheStats() ); } - public static FileCacheStats getMockFileCacheStats() { + public static FullFileCacheStats getMockFullFileCacheStats() { + final long active = randomLongBetween(100000, BYTES_IN_GB); + final long used = randomLongBetween(100000, BYTES_IN_GB); + final long evicted = randomLongBetween(0, getMockCacheStats().fullFileStats().getEvictionWeight()); + final long hit = randomLongBetween(0, 10); + return new FullFileCacheStats(active, used, evicted, hit); + } + + public static FileCacheStats getMockFileCacheStats() throws IOException { final long fcSize = getMockCacheCapacity(); - return getFileCacheStats(fcSize, getMockCacheStats(), getMockCacheUsage(fcSize)); + return getFileCacheStats(fcSize, getMockCacheStats()); + } + + public static void validateFullFileStats(FullFileCacheStats original, FullFileCacheStats deserialized) { + assertEquals(original.getHits(), deserialized.getHits()); + assertEquals(original.getActive(), deserialized.getActive()); + assertEquals(original.getUsed(), deserialized.getUsed()); + assertEquals(original.getEvicted(), deserialized.getEvicted()); + assertEquals(original.getActivePercent(), deserialized.getActivePercent()); } public static void validateFileCacheStats(FileCacheStats original, FileCacheStats deserialized) { @@ -64,6 +103,7 @@ public static void validateFileCacheStats(FileCacheStats original, FileCacheStat assertEquals(original.getEvicted(), deserialized.getEvicted()); assertEquals(original.getCacheHits(), deserialized.getCacheHits()); assertEquals(original.getCacheMisses(), deserialized.getCacheMisses()); + validateFullFileStats(original.fullFileCacheStats(), deserialized.fullFileCacheStats()); } public void testFileCacheStatsSerialization() throws IOException { diff --git a/server/src/test/java/org/opensearch/index/store/remote/filecache/FileCacheTests.java b/server/src/test/java/org/opensearch/index/store/remote/filecache/FileCacheTests.java index c1e3636cc9928..fadad159cd4cf 100644 --- a/server/src/test/java/org/opensearch/index/store/remote/filecache/FileCacheTests.java +++ b/server/src/test/java/org/opensearch/index/store/remote/filecache/FileCacheTests.java @@ -8,6 +8,8 @@ package org.opensearch.index.store.remote.filecache; +import org.apache.lucene.store.FSDirectory; +import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IndexInput; import org.opensearch.common.SuppressForbidden; import org.opensearch.common.breaker.TestCircuitBreaker; @@ -16,7 +18,6 @@ import org.opensearch.core.common.breaker.NoopCircuitBreaker; import org.opensearch.env.NodeEnvironment; import org.opensearch.index.store.remote.directory.RemoteSnapshotDirectoryFactory; -import org.opensearch.index.store.remote.utils.cache.CacheUsage; import org.opensearch.test.OpenSearchTestCase; import org.junit.Before; @@ -77,6 +78,69 @@ public void testGet() { } } + public void testGetWithCachedFullFileIndexInput() throws IOException { + FileCache fileCache = createFileCache(1 * 1000); + for (int i = 0; i < 4; i++) { + Path filePath = path.resolve(NodeEnvironment.CACHE_FOLDER) + .resolve("indexName") + .resolve("shardId") + .resolve(Integer.toString(i)) + .resolve(RemoteSnapshotDirectoryFactory.LOCAL_STORE_LOCATION); + createFile("indexName", "shardId/".concat(Integer.toString(i)), "test_file"); + FSDirectory fsDirectory = FSDirectory.open(filePath); + FileCachedIndexInput fileCachedIndexInput = new FileCachedIndexInput( + fileCache, + filePath, + fsDirectory.openInput("test_file", IOContext.DEFAULT) + ); + fileCache.put(filePath.resolve("test_file"), new CachedFullFileIndexInput(fileCache, filePath, fileCachedIndexInput)); + } + // verify all files are put into file cache + for (int i = 0; i < 4; i++) { + Path filePath = path.resolve(NodeEnvironment.CACHE_FOLDER) + .resolve("indexName") + .resolve("shardId") + .resolve(Integer.toString(i)) + .resolve(RemoteSnapshotDirectoryFactory.LOCAL_STORE_LOCATION); + assertNotNull(fileCache.get(filePath.resolve("test_file"))); + + fileCache.decRef(filePath); + fileCache.decRef(filePath); + } + + // Test eviction by adding more files to exceed cache capacity + for (int i = 4; i < 8000; i++) { + Path filePath = path.resolve(NodeEnvironment.CACHE_FOLDER) + .resolve("indexName") + .resolve("shardId") + .resolve(Integer.toString(i)) + .resolve(RemoteSnapshotDirectoryFactory.LOCAL_STORE_LOCATION); + createFile("indexName", "shardId/".concat(Integer.toString(i)), "test_file"); + FSDirectory fsDirectory = FSDirectory.open(filePath); + FileCachedIndexInput fileCachedIndexInput = new FileCachedIndexInput( + fileCache, + filePath, + fsDirectory.openInput("test_file", IOContext.DEFAULT) + ); + fileCache.put(filePath.resolve("test_file"), new CachedFullFileIndexInput(fileCache, filePath, fileCachedIndexInput)); + } + + // Verify some of the original files were evicted + boolean someEvicted = false; + for (int i = 0; i < 8000; i++) { + Path filePath = path.resolve(NodeEnvironment.CACHE_FOLDER) + .resolve("indexName") + .resolve("shardId") + .resolve(Integer.toString(i)) + .resolve(RemoteSnapshotDirectoryFactory.LOCAL_STORE_LOCATION); + if (fileCache.get(filePath) == null) { + someEvicted = true; + break; + } + } + assertTrue("Expected some files to be evicted", someEvicted); + } + public void testGetThrowException() { assertThrows(NullPointerException.class, () -> { FileCache fileCache = createFileCache(MEGA_BYTES); @@ -244,10 +308,13 @@ public void testUsage() { ); putAndDecRef(fileCache, 0, 16 * MEGA_BYTES); - CacheUsage expectedCacheUsage = new CacheUsage(16 * MEGA_BYTES, 0); - CacheUsage realCacheUsage = fileCache.usage(); - assertEquals(expectedCacheUsage.activeUsage(), realCacheUsage.activeUsage()); - assertEquals(expectedCacheUsage.usage(), realCacheUsage.usage()); + long expectedCacheUsage = 16 * MEGA_BYTES; + long expectedActiveCacheUsage = 0; + long realCacheUsage = fileCache.usage(); + long realActiveCacheUsage = fileCache.activeUsage(); + + assertEquals(expectedCacheUsage, realCacheUsage); + assertEquals(expectedActiveCacheUsage, realActiveCacheUsage); } public void testStats() { @@ -278,11 +345,11 @@ public void testCacheRestore() throws IOException { String shardId = "0"; createFile(indexName, shardId, "test.0"); FileCache fileCache = createFileCache(MEGA_BYTES); - assertEquals(0, fileCache.usage().usage()); + assertEquals(0, fileCache.usage()); Path fileCachePath = path.resolve(NodeEnvironment.CACHE_FOLDER).resolve(indexName).resolve(shardId); fileCache.restoreFromDirectory(List.of(fileCachePath)); - assertTrue(fileCache.usage().usage() > 0); - assertEquals(0, fileCache.usage().activeUsage()); + assertTrue(fileCache.usage() > 0); + assertEquals(0, fileCache.activeUsage()); } private void putAndDecRef(FileCache cache, int path, long indexInputSize) { diff --git a/server/src/test/java/org/opensearch/index/store/remote/filecache/FileCachedIndexInputTests.java b/server/src/test/java/org/opensearch/index/store/remote/filecache/FileCachedIndexInputTests.java index 258bc2db4c5d0..ce0a4d7bf3c02 100644 --- a/server/src/test/java/org/opensearch/index/store/remote/filecache/FileCachedIndexInputTests.java +++ b/server/src/test/java/org/opensearch/index/store/remote/filecache/FileCachedIndexInputTests.java @@ -74,6 +74,6 @@ public void testSlice() throws IOException { } protected boolean isActiveAndTotalUsageSame() { - return fileCache.usage().activeUsage() == fileCache.usage().usage(); + return fileCache.activeUsage() == fileCache.usage(); } } diff --git a/server/src/test/java/org/opensearch/index/store/remote/filecache/FullFileCacheStatsTests.java b/server/src/test/java/org/opensearch/index/store/remote/filecache/FullFileCacheStatsTests.java new file mode 100644 index 0000000000000..7db69d135f78b --- /dev/null +++ b/server/src/test/java/org/opensearch/index/store/remote/filecache/FullFileCacheStatsTests.java @@ -0,0 +1,48 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.store.remote.filecache; + +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.IOException; + +public class FullFileCacheStatsTests extends OpenSearchTestCase { + + private static final long BYTES_IN_GB = 1024 * 1024 * 1024; + + public static FullFileCacheStats getMockFullFileCacheStats() { + final long active = randomLongBetween(100000, BYTES_IN_GB); + final long used = randomLongBetween(100000, BYTES_IN_GB); + final long evicted = randomLongBetween(0, active); + final long hits = randomLongBetween(0, 10); + return new FullFileCacheStats(active, used, evicted, hits); + } + + public static void validateFullFileCacheStats(FullFileCacheStats expected, FullFileCacheStats actual) { + assertEquals(expected.getActive(), actual.getActive()); + assertEquals(expected.getUsed(), actual.getUsed()); + assertEquals(expected.getEvicted(), actual.getEvicted()); + assertEquals(expected.getHits(), actual.getHits()); + assertEquals(expected.getActivePercent(), actual.getActivePercent()); + } + + public void testFullFileCacheStatsSerialization() throws IOException { + final FullFileCacheStats fullFileCacheStats = getMockFullFileCacheStats(); + + try (BytesStreamOutput out = new BytesStreamOutput()) { + fullFileCacheStats.writeTo(out); + try (StreamInput in = out.bytes().streamInput()) { + validateFullFileCacheStats(fullFileCacheStats, new FullFileCacheStats(in)); + } + } + + } +} diff --git a/server/src/test/java/org/opensearch/index/store/remote/utils/TransferManagerTestCase.java b/server/src/test/java/org/opensearch/index/store/remote/utils/TransferManagerTestCase.java index 1eae5119ab462..668eac51b1b81 100644 --- a/server/src/test/java/org/opensearch/index/store/remote/utils/TransferManagerTestCase.java +++ b/server/src/test/java/org/opensearch/index/store/remote/utils/TransferManagerTestCase.java @@ -67,10 +67,10 @@ protected static byte[] createData() { public void testSingleAccess() throws Exception { try (IndexInput i = fetchBlobWithName("file")) { assertIndexInputIsFunctional(i); - MatcherAssert.assertThat(fileCache.usage().activeUsage(), equalTo((long) EIGHT_MB)); + MatcherAssert.assertThat(fileCache.activeUsage(), equalTo((long) EIGHT_MB)); } - MatcherAssert.assertThat(fileCache.usage().activeUsage(), equalTo(0L)); - MatcherAssert.assertThat(fileCache.usage().usage(), equalTo((long) EIGHT_MB)); + MatcherAssert.assertThat(fileCache.activeUsage(), equalTo(0L)); + MatcherAssert.assertThat(fileCache.usage(), equalTo((long) EIGHT_MB)); } public void testConcurrentAccess() throws Exception { @@ -152,8 +152,8 @@ public void testDownloadFails() throws Exception { IOException.class, () -> transferManager.fetchBlob(BlobFetchRequest.builder().fileName("file").directory(directory).blobParts(blobParts).build()) ); - MatcherAssert.assertThat(fileCache.usage().activeUsage(), equalTo(0L)); - MatcherAssert.assertThat(fileCache.usage().usage(), equalTo(0L)); + MatcherAssert.assertThat(fileCache.activeUsage(), equalTo(0L)); + MatcherAssert.assertThat(fileCache.usage(), equalTo(0L)); } public void testFetchesToDifferentBlobsDoNotBlockOnEachOther() throws Exception { diff --git a/server/src/test/java/org/opensearch/index/store/remote/utils/cache/RefCountedCacheTestCase.java b/server/src/test/java/org/opensearch/index/store/remote/utils/cache/RefCountedCacheTestCase.java index 97e9fb288136d..b02db5bba58d6 100644 --- a/server/src/test/java/org/opensearch/index/store/remote/utils/cache/RefCountedCacheTestCase.java +++ b/server/src/test/java/org/opensearch/index/store/remote/utils/cache/RefCountedCacheTestCase.java @@ -29,16 +29,16 @@ public void testBasicGetAndPutAndRemove() { public void testUsageWithIncrementAndDecrement() { refCountedCache.put("1", 10L); - assertEquals(10L, refCountedCache.usage().usage()); - assertEquals(10L, refCountedCache.usage().activeUsage()); + assertEquals(10L, refCountedCache.usage()); + assertEquals(10L, refCountedCache.activeUsage()); refCountedCache.decRef("1"); - assertEquals(10L, refCountedCache.usage().usage()); - assertEquals(0L, refCountedCache.usage().activeUsage()); + assertEquals(10L, refCountedCache.usage()); + assertEquals(0L, refCountedCache.activeUsage()); refCountedCache.incRef("1"); - assertEquals(10L, refCountedCache.usage().usage()); - assertEquals(10L, refCountedCache.usage().activeUsage()); + assertEquals(10L, refCountedCache.usage()); + assertEquals(10L, refCountedCache.activeUsage()); } public void testEviction() { @@ -53,20 +53,20 @@ public void testEviction() { assertNotNull(refCountedCache.get("4")); assertNotNull(refCountedCache.get("5")); - assertEquals(75L, refCountedCache.usage().usage()); - assertEquals(75L, refCountedCache.usage().activeUsage()); + assertEquals(75L, refCountedCache.usage()); + assertEquals(75L, refCountedCache.activeUsage()); } public void testComputeRemoveWhenExists() { refCountedCache.put("1", 25L); refCountedCache.decRef("1"); - assertEquals(0, refCountedCache.usage().activeUsage()); - assertEquals(25L, refCountedCache.usage().usage()); + assertEquals(0, refCountedCache.activeUsage()); + assertEquals(25L, refCountedCache.usage()); assertNull(refCountedCache.compute("1", (k, v) -> null)); assertNull(refCountedCache.get("1")); - assertEquals(0, refCountedCache.usage().activeUsage()); - assertEquals(0L, refCountedCache.usage().usage()); + assertEquals(0, refCountedCache.activeUsage()); + assertEquals(0L, refCountedCache.usage()); } public void testComputeRemoveWhenNotExists() { @@ -109,8 +109,8 @@ public void testActiveUsageGreaterThanCapacity() { final String key = Integer.toString(i); refCountedCache.put(key, 25L); } - assertEquals(125L, refCountedCache.usage().usage()); - assertEquals(125L, refCountedCache.usage().activeUsage()); + assertEquals(125L, refCountedCache.usage()); + assertEquals(125L, refCountedCache.activeUsage()); } public void testReferenceCountingItemsThatDoNotExist() { @@ -121,8 +121,8 @@ public void testReferenceCountingItemsThatDoNotExist() { assertUsage(0, 0); refCountedCache.decRef("1"); assertNull(refCountedCache.get("1")); - assertEquals(0L, refCountedCache.usage().usage()); - assertEquals(0L, refCountedCache.usage().activeUsage()); + assertEquals(0L, refCountedCache.usage()); + assertEquals(0L, refCountedCache.activeUsage()); } public void testPrune() { @@ -215,13 +215,13 @@ public void testClear() { refCountedCache.put("1", 10L); refCountedCache.put("2", 10L); refCountedCache.put("3", 10L); - assertEquals(30L, refCountedCache.usage().usage()); + assertEquals(30L, refCountedCache.usage()); refCountedCache.clear(); - assertEquals(0L, refCountedCache.usage().usage()); + assertEquals(0L, refCountedCache.usage()); } private void assertUsage(long usage, long activeUsage) { - assertEquals(usage, refCountedCache.usage().usage()); - assertEquals(activeUsage, refCountedCache.usage().activeUsage()); + assertEquals(usage, refCountedCache.usage()); + assertEquals(activeUsage, refCountedCache.activeUsage()); } } diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java index 4f0600588daef..db49526a66ca2 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java @@ -458,7 +458,7 @@ public void testSearchableSnapshotOverSubscription() { Map nodeFileCacheStats = new HashMap<>(); for (TestClusterNodes.TestClusterNode node : testClusterNodes.nodes.values()) { - nodeFileCacheStats.put(node.node.getId(), new FileCacheStats(0, 1, 0, 0, 0, 0, 0)); + nodeFileCacheStats.put(node.node.getId(), new FileCacheStats(0, 1, 0, 0, 0, 0, 0, null)); } ClusterInfo clusterInfo = new ClusterInfo(Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), nodeFileCacheStats); testClusterNodes.nodes.values().forEach(node -> when(node.getMockClusterInfoService().getClusterInfo()).thenReturn(clusterInfo));