Skip to content

Commit da0dcc9

Browse files
Added changes to enable full file cache stats
Signed-off-by: Mayank Sharma <[email protected]>
1 parent 7388205 commit da0dcc9

File tree

13 files changed

+575
-36
lines changed

13 files changed

+575
-36
lines changed

server/src/internalClusterTest/java/org/opensearch/remotestore/WritableWarmIT.java

+87
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,9 @@
1212

1313
import org.apache.lucene.store.Directory;
1414
import org.apache.lucene.store.FilterDirectory;
15+
import org.opensearch.action.admin.cluster.node.stats.NodeStats;
16+
import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest;
17+
import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
1518
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
1619
import org.opensearch.action.admin.indices.get.GetIndexRequest;
1720
import org.opensearch.action.admin.indices.get.GetIndexResponse;
@@ -28,6 +31,8 @@
2831
import org.opensearch.index.store.CompositeDirectory;
2932
import org.opensearch.index.store.remote.file.CleanerDaemonThreadLeakFilter;
3033
import org.opensearch.index.store.remote.filecache.FileCache;
34+
import org.opensearch.index.store.remote.filecache.FileCacheStats;
35+
import org.opensearch.index.store.remote.filecache.FullFileCacheStats;
3136
import org.opensearch.index.store.remote.utils.FileTypeUtils;
3237
import org.opensearch.indices.IndicesService;
3338
import org.opensearch.node.Node;
@@ -36,7 +41,9 @@
3641

3742
import java.util.Arrays;
3843
import java.util.HashSet;
44+
import java.util.Objects;
3945
import java.util.Set;
46+
import java.util.concurrent.ExecutionException;
4047
import java.util.stream.Collectors;
4148

4249
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
@@ -49,6 +56,7 @@
4956
public class WritableWarmIT extends RemoteStoreBaseIntegTestCase {
5057

5158
protected static final String INDEX_NAME = "test-idx-1";
59+
protected static final String INDEX_NAME_2 = "test-idx-2";
5260
protected static final int NUM_DOCS_IN_BULK = 1000;
5361

5462
/*
@@ -103,6 +111,7 @@ public void testWritableWarmFeatureFlagDisabled() {
103111
}
104112

105113
public void testWritableWarmBasic() throws Exception {
114+
106115
InternalTestCluster internalTestCluster = internalCluster();
107116
internalTestCluster.startClusterManagerOnlyNode();
108117
internalTestCluster.startDataAndSearchNodes(1);
@@ -168,4 +177,82 @@ public void testWritableWarmBasic() throws Exception {
168177
assertAcked(client().admin().indices().delete(new DeleteIndexRequest(INDEX_NAME)).get());
169178
fileCache.prune();
170179
}
180+
181+
public void testFullFileAndFileCacheStats() throws ExecutionException, InterruptedException {
182+
183+
InternalTestCluster internalTestCluster = internalCluster();
184+
internalTestCluster.startClusterManagerOnlyNode();
185+
internalTestCluster.startDataAndSearchNodes(1);
186+
187+
Settings settings = Settings.builder()
188+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
189+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
190+
.put(IndexModule.INDEX_STORE_LOCALITY_SETTING.getKey(), IndexModule.DataLocalityType.PARTIAL.name())
191+
.build();
192+
193+
assertAcked(client().admin().indices().prepareCreate(INDEX_NAME_2).setSettings(settings).get());
194+
195+
// Verify from the cluster settings if the data locality is partial
196+
GetIndexResponse getIndexResponse = client().admin()
197+
.indices()
198+
.getIndex(new GetIndexRequest().indices(INDEX_NAME_2).includeDefaults(true))
199+
.get();
200+
201+
Settings indexSettings = getIndexResponse.settings().get(INDEX_NAME_2);
202+
assertEquals(IndexModule.DataLocalityType.PARTIAL.name(), indexSettings.get(IndexModule.INDEX_STORE_LOCALITY_SETTING.getKey()));
203+
204+
// Ingesting docs again before force merge
205+
indexBulk(INDEX_NAME_2, NUM_DOCS_IN_BULK);
206+
flushAndRefresh(INDEX_NAME_2);
207+
208+
// ensuring cluster is green
209+
ensureGreen();
210+
211+
SearchResponse searchResponse = client().prepareSearch(INDEX_NAME_2).setQuery(QueryBuilders.matchAllQuery()).get();
212+
// Asserting that search returns same number of docs as ingested
213+
assertHitCount(searchResponse, NUM_DOCS_IN_BULK);
214+
215+
// Ingesting docs again before force merge
216+
indexBulk(INDEX_NAME_2, NUM_DOCS_IN_BULK);
217+
flushAndRefresh(INDEX_NAME_2);
218+
219+
FileCache fileCache = internalTestCluster.getDataNodeInstance(Node.class).fileCache();
220+
221+
// TODO: Make these validation more robust.
222+
223+
NodesStatsResponse nodesStatsResponse = client().admin().cluster().nodesStats(new NodesStatsRequest().all()).actionGet();
224+
225+
FileCacheStats fileCacheStats = nodesStatsResponse.getNodes().getFirst().getFileCacheStats();
226+
FullFileCacheStats fullFileCacheStats = fileCacheStats.fullFileCacheStats();
227+
228+
if (Objects.isNull(fileCacheStats)) {
229+
fail("File Cache Stats should not be null");
230+
}
231+
232+
if (Objects.isNull(fullFileCacheStats)) {
233+
fail("Full File Cache Stats should not be null");
234+
}
235+
236+
// Deleting the index (so that ref count drops to zero for all the files) and then pruning the cache to clear it to avoid any file
237+
// leaks
238+
assertAcked(client().admin().indices().delete(new DeleteIndexRequest(INDEX_NAME_2)).get());
239+
fileCache.prune();
240+
241+
NodesStatsResponse response = client().admin().cluster().nodesStats(new NodesStatsRequest().all()).actionGet();
242+
int nonEmptyFileCacheNodes = 0;
243+
for (NodeStats stats : response.getNodes()) {
244+
FileCacheStats fcStats = stats.getFileCacheStats();
245+
if (!Objects.isNull(fcStats)) {
246+
if (!isFileCacheEmpty(fcStats)) {
247+
nonEmptyFileCacheNodes++;
248+
}
249+
}
250+
}
251+
assertEquals(0, nonEmptyFileCacheNodes);
252+
253+
}
254+
255+
private boolean isFileCacheEmpty(FileCacheStats stats) {
256+
return stats.getUsed().getBytes() == 0L && stats.getActive().getBytes() == 0L;
257+
}
171258
}

server/src/main/java/org/opensearch/index/store/remote/filecache/FileCache.java

+73-3
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.apache.logging.log4j.Logger;
1313
import org.apache.lucene.store.IndexInput;
1414
import org.opensearch.common.annotation.PublicApi;
15+
import org.opensearch.common.cache.RemovalReason;
1516
import org.opensearch.core.common.breaker.CircuitBreaker;
1617
import org.opensearch.core.common.breaker.CircuitBreakingException;
1718
import org.opensearch.index.store.remote.utils.cache.CacheUsage;
@@ -27,6 +28,7 @@
2728
import java.util.function.BiFunction;
2829
import java.util.function.Predicate;
2930

31+
import static org.opensearch.ExceptionsHelper.catchAsRuntimeException;
3032
import static org.opensearch.index.store.remote.directory.RemoteSnapshotDirectoryFactory.LOCAL_STORE_LOCATION;
3133

3234
/**
@@ -51,12 +53,66 @@
5153
public class FileCache implements RefCountedCache<Path, CachedIndexInput> {
5254
private static final Logger logger = LogManager.getLogger(FileCache.class);
5355
private final SegmentedCache<Path, CachedIndexInput> theCache;
56+
private final FullFileCacheStatsCollector fullFileCacheStatsCollector;
5457

5558
private final CircuitBreaker circuitBreaker;
5659

57-
public FileCache(SegmentedCache<Path, CachedIndexInput> cache, CircuitBreaker circuitBreaker) {
58-
this.theCache = cache;
60+
public FileCache(long capacity, CircuitBreaker circuitBreaker) {
61+
62+
this.circuitBreaker = circuitBreaker;
63+
this.fullFileCacheStatsCollector = new FullFileCacheStatsCollector();
64+
65+
this.theCache = SegmentedCache.<Path, CachedIndexInput>builder()
66+
// use length in bytes as the weight of the file item
67+
.weigher(CachedIndexInput::length)
68+
.listener((removalNotification) -> {
69+
RemovalReason removalReason = removalNotification.getRemovalReason();
70+
CachedIndexInput value = removalNotification.getValue();
71+
Path key = removalNotification.getKey();
72+
73+
if (removalReason != RemovalReason.REPLACED) {
74+
75+
// check if full file and publish metric
76+
if (value instanceof CachedFullFileIndexInput) {
77+
fullFileCacheStatsCollector.recordUsedFileBytes(value.length(), false);
78+
}
79+
80+
catchAsRuntimeException(value::close);
81+
catchAsRuntimeException(() -> Files.deleteIfExists(key));
82+
}
83+
})
84+
.capacity(capacity)
85+
.build();
86+
;
87+
88+
}
89+
90+
public FileCache(long capacity, int concurrencyLevel, CircuitBreaker circuitBreaker) {
5991
this.circuitBreaker = circuitBreaker;
92+
this.fullFileCacheStatsCollector = new FullFileCacheStatsCollector();
93+
94+
this.theCache = SegmentedCache.<Path, CachedIndexInput>builder()
95+
// use length in bytes as the weight of the file item
96+
.weigher(CachedIndexInput::length)
97+
.listener((removalNotification) -> {
98+
RemovalReason removalReason = removalNotification.getRemovalReason();
99+
CachedIndexInput value = removalNotification.getValue();
100+
Path key = removalNotification.getKey();
101+
if (removalReason != RemovalReason.REPLACED) {
102+
103+
// check if full file and publish metric
104+
if (value instanceof CachedFullFileIndexInput) {
105+
fullFileCacheStatsCollector.recordUsedFileBytes(value.length(), false);
106+
}
107+
108+
catchAsRuntimeException(value::close);
109+
catchAsRuntimeException(() -> Files.deleteIfExists(key));
110+
}
111+
})
112+
.capacity(capacity)
113+
.concurrencyLevel(concurrencyLevel)
114+
.build();
115+
;
60116
}
61117

62118
public long capacity() {
@@ -65,6 +121,10 @@ public long capacity() {
65121

66122
@Override
67123
public CachedIndexInput put(Path filePath, CachedIndexInput indexInput) {
124+
// check if full file and publish metric
125+
if (indexInput instanceof CachedFullFileIndexInput) {
126+
fullFileCacheStatsCollector.recordUsedFileBytes(indexInput.length(), true);
127+
}
68128
CachedIndexInput cachedIndexInput = theCache.put(filePath, indexInput);
69129
checkParentBreaker(filePath);
70130
return cachedIndexInput;
@@ -208,17 +268,27 @@ public void restoreFromDirectory(List<Path> fileCacheDataPaths) {
208268
public FileCacheStats fileCacheStats() {
209269
CacheStats stats = stats();
210270
CacheUsage usage = usage();
271+
211272
return new FileCacheStats(
212273
System.currentTimeMillis(),
213274
usage.activeUsage(),
214275
capacity(),
215276
usage.usage(),
216277
stats.evictionWeight(),
217278
stats.hitCount(),
218-
stats.missCount()
279+
stats.missCount(),
280+
this.fullFileCacheStats()
219281
);
220282
}
221283

284+
/**
285+
* Returns the current {@link FullFileCacheStats}
286+
* @return
287+
*/
288+
private FullFileCacheStats fullFileCacheStats() {
289+
return fullFileCacheStatsCollector.getStats();
290+
}
291+
222292
/**
223293
* Placeholder for the existing file blocks that are in the disk-based
224294
* local cache at node startup time. We can't open a file handle to these

server/src/main/java/org/opensearch/index/store/remote/filecache/FileCacheFactory.java

+2-24
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,7 @@
88

99
package org.opensearch.index.store.remote.filecache;
1010

11-
import org.opensearch.common.cache.RemovalReason;
1211
import org.opensearch.core.common.breaker.CircuitBreaker;
13-
import org.opensearch.index.store.remote.utils.cache.SegmentedCache;
14-
15-
import java.nio.file.Files;
16-
import java.nio.file.Path;
17-
18-
import static org.opensearch.ExceptionsHelper.catchAsRuntimeException;
1912

2013
/**
2114
* File Cache (FC) is introduced to solve the problem that the local disk cannot hold
@@ -38,26 +31,11 @@
3831
public class FileCacheFactory {
3932

4033
public static FileCache createConcurrentLRUFileCache(long capacity, CircuitBreaker circuitBreaker) {
41-
return new FileCache(createDefaultBuilder().capacity(capacity).build(), circuitBreaker);
34+
return new FileCache(capacity, circuitBreaker);
4235
}
4336

4437
public static FileCache createConcurrentLRUFileCache(long capacity, int concurrencyLevel, CircuitBreaker circuitBreaker) {
45-
return new FileCache(createDefaultBuilder().capacity(capacity).concurrencyLevel(concurrencyLevel).build(), circuitBreaker);
46-
}
47-
48-
private static SegmentedCache.Builder<Path, CachedIndexInput> createDefaultBuilder() {
49-
return SegmentedCache.<Path, CachedIndexInput>builder()
50-
// use length in bytes as the weight of the file item
51-
.weigher(CachedIndexInput::length)
52-
.listener((removalNotification) -> {
53-
RemovalReason removalReason = removalNotification.getRemovalReason();
54-
CachedIndexInput value = removalNotification.getValue();
55-
Path key = removalNotification.getKey();
56-
if (removalReason != RemovalReason.REPLACED) {
57-
catchAsRuntimeException(value::close);
58-
catchAsRuntimeException(() -> Files.deleteIfExists(key));
59-
}
60-
});
38+
return new FileCache(capacity, concurrencyLevel, circuitBreaker);
6139
}
6240

6341
}

server/src/main/java/org/opensearch/index/store/remote/filecache/FileCacheStats.java

+13-1
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ public class FileCacheStats implements Writeable, ToXContentFragment {
3333
private final long evicted;
3434
private final long hits;
3535
private final long misses;
36+
private final FullFileCacheStats fullFileCacheStats;
3637

3738
public FileCacheStats(
3839
final long timestamp,
@@ -41,7 +42,8 @@ public FileCacheStats(
4142
final long used,
4243
final long evicted,
4344
final long hits,
44-
final long misses
45+
final long misses,
46+
final FullFileCacheStats fullFileCacheStats
4547
) {
4648
this.timestamp = timestamp;
4749
this.active = active;
@@ -50,6 +52,7 @@ public FileCacheStats(
5052
this.evicted = evicted;
5153
this.hits = hits;
5254
this.misses = misses;
55+
this.fullFileCacheStats = fullFileCacheStats;
5356
}
5457

5558
public FileCacheStats(final StreamInput in) throws IOException {
@@ -60,6 +63,7 @@ public FileCacheStats(final StreamInput in) throws IOException {
6063
this.evicted = in.readLong();
6164
this.hits = in.readLong();
6265
this.misses = in.readLong();
66+
this.fullFileCacheStats = new FullFileCacheStats(in);
6367
}
6468

6569
public static short calculatePercentage(long used, long max) {
@@ -75,6 +79,7 @@ public void writeTo(final StreamOutput out) throws IOException {
7579
out.writeLong(evicted);
7680
out.writeLong(hits);
7781
out.writeLong(misses);
82+
if (fullFileCacheStats != null) fullFileCacheStats.writeTo(out);
7883
}
7984

8085
public long getTimestamp() {
@@ -113,6 +118,10 @@ public long getCacheMisses() {
113118
return misses;
114119
}
115120

121+
public FullFileCacheStats fullFileCacheStats() {
122+
return fullFileCacheStats;
123+
}
124+
116125
@Override
117126
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
118127
builder.startObject(Fields.FILE_CACHE);
@@ -125,6 +134,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
125134
builder.field(Fields.USED_PERCENT, getUsedPercent());
126135
builder.field(Fields.HIT_COUNT, getCacheHits());
127136
builder.field(Fields.MISS_COUNT, getCacheMisses());
137+
if (fullFileCacheStats != null) {
138+
fullFileCacheStats.toXContent(builder, params);
139+
}
128140
builder.endObject();
129141
return builder;
130142
}

0 commit comments

Comments
 (0)