Added Changes to support Cache pinning in FileCache #17617

Open · wants to merge 2 commits into main
WritableWarmIT.java
@@ -12,6 +12,9 @@

import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FilterDirectory;
import org.opensearch.action.admin.cluster.node.stats.NodeStats;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
import org.opensearch.action.admin.indices.get.GetIndexRequest;
import org.opensearch.action.admin.indices.get.GetIndexResponse;
@@ -28,6 +31,8 @@
import org.opensearch.index.store.CompositeDirectory;
import org.opensearch.index.store.remote.file.CleanerDaemonThreadLeakFilter;
import org.opensearch.index.store.remote.filecache.FileCache;
import org.opensearch.index.store.remote.filecache.FileCacheStats;
import org.opensearch.index.store.remote.filecache.FullFileCacheStats;
import org.opensearch.index.store.remote.utils.FileTypeUtils;
import org.opensearch.indices.IndicesService;
import org.opensearch.node.Node;
@@ -36,7 +41,9 @@

import java.util.Arrays;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;

import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
@@ -49,6 +56,7 @@
public class WritableWarmIT extends RemoteStoreBaseIntegTestCase {

protected static final String INDEX_NAME = "test-idx-1";
protected static final String INDEX_NAME_2 = "test-idx-2";
protected static final int NUM_DOCS_IN_BULK = 1000;

/*
@@ -168,4 +176,88 @@ public void testWritableWarmBasic() throws Exception {
assertAcked(client().admin().indices().delete(new DeleteIndexRequest(INDEX_NAME)).get());
fileCache.prune();
}

public void testFullFileAndFileCacheStats() throws ExecutionException, InterruptedException {

InternalTestCluster internalTestCluster = internalCluster();
internalTestCluster.startClusterManagerOnlyNode();
internalTestCluster.startDataAndSearchNodes(1);

Settings settings = Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexModule.INDEX_STORE_LOCALITY_SETTING.getKey(), IndexModule.DataLocalityType.PARTIAL.name())
.build();

assertAcked(client().admin().indices().prepareCreate(INDEX_NAME_2).setSettings(settings).get());

// Verify from the index settings that the data locality is partial
GetIndexResponse getIndexResponse = client().admin()
.indices()
.getIndex(new GetIndexRequest().indices(INDEX_NAME_2).includeDefaults(true))
.get();

Settings indexSettings = getIndexResponse.settings().get(INDEX_NAME_2);
assertEquals(IndexModule.DataLocalityType.PARTIAL.name(), indexSettings.get(IndexModule.INDEX_STORE_LOCALITY_SETTING.getKey()));

// Ingesting docs before searching
indexBulk(INDEX_NAME_2, NUM_DOCS_IN_BULK);
flushAndRefresh(INDEX_NAME_2);

// ensuring cluster is green
ensureGreen();

SearchResponse searchResponse = client().prepareSearch(INDEX_NAME_2).setQuery(QueryBuilders.matchAllQuery()).get();
// Asserting that search returns the same number of docs as ingested
assertHitCount(searchResponse, NUM_DOCS_IN_BULK);

// Ingesting docs again before collecting file cache stats
indexBulk(INDEX_NAME_2, NUM_DOCS_IN_BULK);
flushAndRefresh(INDEX_NAME_2);

FileCache fileCache = internalTestCluster.getDataNodeInstance(Node.class).fileCache();

// TODO: Make these validations more robust once SwitchableIndexInput is implemented.

NodesStatsResponse nodesStatsResponse = client().admin().cluster().nodesStats(new NodesStatsRequest().all()).actionGet();

FileCacheStats fileCacheStats = nodesStatsResponse.getNodes()
.stream()
.filter(n -> n.getNode().isDataNode())
.toList()
.getFirst()
.getFileCacheStats();

if (Objects.isNull(fileCacheStats)) {
fail("File Cache Stats should not be null");
}

FullFileCacheStats fullFileCacheStats = fileCacheStats.fullFileCacheStats();

if (Objects.isNull(fullFileCacheStats)) {
fail("Full File Cache Stats should not be null");
}

// Deleting the index (so that ref count drops to zero for all the files) and then pruning the cache to clear it to avoid any file
// leaks
assertAcked(client().admin().indices().delete(new DeleteIndexRequest(INDEX_NAME_2)).get());
fileCache.prune();

NodesStatsResponse response = client().admin().cluster().nodesStats(new NodesStatsRequest().all()).actionGet();
int nonEmptyFileCacheNodes = 0;
for (NodeStats stats : response.getNodes()) {
FileCacheStats fcStats = stats.getFileCacheStats();
if (Objects.isNull(fcStats) == false) {
if (isFileCacheEmpty(fcStats) == false) {
nonEmptyFileCacheNodes++;
}
}
}
assertEquals(0, nonEmptyFileCacheNodes);

}

private boolean isFileCacheEmpty(FileCacheStats stats) {
return stats.getUsed().getBytes() == 0L && stats.getActive().getBytes() == 0L;
}
}
CompositeDirectory.java
@@ -322,19 +322,15 @@ public String toString() {
*/
public void afterSyncToRemote(String file) {
ensureOpen();
/*
Decrementing the refCount here for the path so that it becomes eligible for eviction
This is a temporary solution until pinning support is added
TODO - Unpin the files here from FileCache so that they become eligible for eviction, once pinning/unpinning support is added in FileCache
Uncomment the below commented line(to remove the file from cache once uploaded) to test block based functionality
*/

logger.trace(
"Composite Directory[{}]: File {} uploaded to Remote Store and now can be eligible for eviction in FileCache",
this::toString,
() -> file
);
fileCache.decRef(getFilePath(file));
// fileCache.remove(getFilePath(fileName));
final Path filePath = getFilePath(file);
fileCache.unpin(filePath);
fileCache.remove(filePath);
}

// Visibility public since we need it in IT tests
@@ -385,12 +381,9 @@ private String[] getRemoteFiles() throws IOException {

private void cacheFile(String name) throws IOException {
Path filePath = getFilePath(name);
// put will increase the refCount for the path, making sure it is not evicted, will decrease the ref after it is uploaded to Remote
// so that it can be evicted after that
// this is just a temporary solution, will pin the file once support for that is added in FileCache
// TODO : Pin the above filePath in the file cache once pinning support is added so that it cannot be evicted unless it has been
// successfully uploaded to Remote

fileCache.put(filePath, new CachedFullFileIndexInput(fileCache, filePath, localDirectory.openInput(name, IOContext.DEFAULT)));
fileCache.pin(filePath);
}

}
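The CompositeDirectory changes above replace the earlier decRef-based workaround with the new pinning API: `cacheFile` now pins the entry when it is added to the cache, and `afterSyncToRemote` unpins and removes it once the file has been uploaded to the remote store. A minimal sketch of that lifecycle, using only the `pin`/`unpin` methods introduced in this PR (the helper class and the `uploadToRemote` callback are illustrative placeholders, not part of the change):

```java
import java.nio.file.Path;

import org.opensearch.index.store.remote.filecache.FileCache;

// Illustrative sketch: keep a cached file pinned for as long as its remote copy
// does not exist yet, so the eviction policy cannot drop the only copy of the data.
final class PinDuringUploadSketch {
    static void uploadWithPin(FileCache fileCache, Path filePath, Runnable uploadToRemote) {
        fileCache.pin(filePath);        // entry can no longer be evicted
        try {
            uploadToRemote.run();       // placeholder for the remote store upload
        } finally {
            fileCache.unpin(filePath);  // entry becomes eligible for eviction again
        }
    }
}
```

In the PR itself the two calls are split across the write and sync paths rather than wrapped in a single try/finally: the pin happens in `cacheFile` and the unpin in `afterSyncToRemote`, after which the entry is removed outright.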
FileCache.java
@@ -14,7 +14,6 @@
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.core.common.breaker.CircuitBreaker;
import org.opensearch.core.common.breaker.CircuitBreakingException;
import org.opensearch.index.store.remote.utils.cache.CacheUsage;
import org.opensearch.index.store.remote.utils.cache.RefCountedCache;
import org.opensearch.index.store.remote.utils.cache.SegmentedCache;
import org.opensearch.index.store.remote.utils.cache.stats.CacheStats;
@@ -122,6 +121,26 @@ public void decRef(Path key) {
theCache.decRef(key);
}

/**
* Pins the key in the cache, preventing it from being evicted.
*
* @param key the path of the file to pin
*/
@Override
public void pin(Path key) {
theCache.pin(key);
}

/**
* Unpins the key in the cache, allowing it to be evicted.
*
* @param key the path of the file to unpin
*/
@Override
public void unpin(Path key) {
theCache.unpin(key);
}

@Override
public long prune() {
return theCache.prune();
@@ -133,10 +152,25 @@ public long prune(Predicate<Path> keyPredicate) {
}

@Override
public CacheUsage usage() {
public long usage() {
return theCache.usage();
}

@Override
public long activeUsage() {
return theCache.activeUsage();
}

/**
* Returns the pinned usage of this cache.
*
* @return the combined pinned weight of the values in this cache.
*/
@Override
public long pinnedUsage() {
return theCache.pinnedUsage();
}

@Override
public CacheStats stats() {
return theCache.stats();
@@ -145,8 +179,8 @@ public CacheStats stats() {
// To be used only for debugging purposes
public void logCurrentState() {
logger.trace("CURRENT STATE OF FILE CACHE \n");
CacheUsage cacheUsage = theCache.usage();
logger.trace("Total Usage: " + cacheUsage.usage() + " , Active Usage: " + cacheUsage.activeUsage());
long cacheUsage = theCache.usage();
logger.trace("Total Usage: " + cacheUsage);
theCache.logCurrentState();
}

@@ -206,16 +240,23 @@ public void restoreFromDirectory(List<Path> fileCacheDataPaths) {
* Returns the current {@link FileCacheStats}
*/
public FileCacheStats fileCacheStats() {
CacheStats stats = stats();
CacheUsage usage = usage();
final CacheStats stats = stats();
final CacheStats.FullFileStats fullFileStats = stats.fullFileStats();

return new FileCacheStats(
System.currentTimeMillis(),
usage.activeUsage(),
stats.activeUsage(),
capacity(),
usage.usage(),
stats.usage(),
stats.evictionWeight(),
stats.hitCount(),
stats.missCount()
stats.missCount(),
new FullFileCacheStats(
fullFileStats.getActiveUsage(),
fullFileStats.getUsage(),
fullFileStats.getEvictionWeight(),
fullFileStats.getHitCount()
)
);
}

FileCacheStats.java
@@ -19,7 +19,15 @@
import java.io.IOException;

/**
* Statistics on file cache
* Statistics for the file cache that track usage and performance metrics.
* {@link FileCache} internally uses a {@link org.opensearch.index.store.remote.utils.cache.SegmentedCache}
* to manage cached file data across multiple cache segments.
* This class aggregates statistics across all cache segments, including:
* - Cache usage (total, active, used)
* - Cache performance (hits, misses, evictions)
* - Utilization percentages
* The statistics are exposed via {@link org.opensearch.action.admin.cluster.node.stats.NodeStats}
* to provide visibility into cache behavior and performance.
*
* @opensearch.api
*/
@@ -33,6 +41,7 @@ public class FileCacheStats implements Writeable, ToXContentFragment {
private final long evicted;
private final long hits;
private final long misses;
private final FullFileCacheStats fullFileCacheStats;

public FileCacheStats(
final long timestamp,
@@ -41,7 +50,8 @@ public FileCacheStats(
final long used,
final long evicted,
final long hits,
final long misses
final long misses,
final FullFileCacheStats fullFileCacheStats
) {
this.timestamp = timestamp;
this.active = active;
@@ -50,6 +60,7 @@ public FileCacheStats(
this.evicted = evicted;
this.hits = hits;
this.misses = misses;
this.fullFileCacheStats = fullFileCacheStats;
}

public FileCacheStats(final StreamInput in) throws IOException {
@@ -60,6 +71,7 @@ public FileCacheStats(final StreamInput in) throws IOException {
this.evicted = in.readLong();
this.hits = in.readLong();
this.misses = in.readLong();
this.fullFileCacheStats = in.readBoolean() ? new FullFileCacheStats(in) : null;
}

public static short calculatePercentage(long used, long max) {
@@ -75,6 +87,7 @@ public void writeTo(final StreamOutput out) throws IOException {
out.writeLong(evicted);
out.writeLong(hits);
out.writeLong(misses);
// Write a presence flag so serialization stays symmetric with the StreamInput constructor when stats are null
out.writeBoolean(fullFileCacheStats != null);
if (fullFileCacheStats != null) {
fullFileCacheStats.writeTo(out);
}
}

public long getTimestamp() {
@@ -113,6 +126,10 @@ public long getCacheMisses() {
return misses;
}

public FullFileCacheStats fullFileCacheStats() {
return fullFileCacheStats;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(Fields.FILE_CACHE);
@@ -125,6 +142,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.field(Fields.USED_PERCENT, getUsedPercent());
builder.field(Fields.HIT_COUNT, getCacheHits());
builder.field(Fields.MISS_COUNT, getCacheMisses());
if (fullFileCacheStats != null) {
fullFileCacheStats.toXContent(builder, params);
}
builder.endObject();
return builder;
}
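Taken together, the `FileCache` and `FileCacheStats` changes expose cache usage directly as `long` values and nest the new full-file statistics inside the existing stats object. Below is a small sketch of how a caller might read the new numbers, assuming an already constructed `FileCache`; only methods that appear in this diff are used, and the wrapper class itself is illustrative:

```java
import org.opensearch.index.store.remote.filecache.FileCache;
import org.opensearch.index.store.remote.filecache.FileCacheStats;
import org.opensearch.index.store.remote.filecache.FullFileCacheStats;

// Illustrative sketch: summarize the cache-level and full-file statistics added in this PR.
final class FileCacheStatsSketch {
    static String describe(FileCache fileCache) {
        // Raw usage counters, now returned as plain longs instead of CacheUsage
        long used = fileCache.usage();
        long active = fileCache.activeUsage();
        long pinned = fileCache.pinnedUsage();

        // Aggregated stats object, now carrying the nested full-file section
        FileCacheStats stats = fileCache.fileCacheStats();
        FullFileCacheStats fullFileStats = stats.fullFileCacheStats();

        return String.format(
            "used=%d active=%d pinned=%d hits=%d misses=%d fullFileStats=%s",
            used, active, pinned, stats.getCacheHits(), stats.getCacheMisses(), fullFileStats
        );
    }
}
```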