Skip to content

Commit ebda963

Browse files
authored
[Tiered caching] Framework changes (opensearch-project#10753)
* [Tiered caching] Framework changes Signed-off-by: Sagar Upadhyaya <[email protected]> * Added javadoc for new files/packages Signed-off-by: Sagar Upadhyaya <[email protected]> * Added changelog Signed-off-by: Sagar Upadhyaya <[email protected]> * Fixing javadoc warnings Signed-off-by: Sagar Upadhyaya <[email protected]> * Addressing comments Signed-off-by: Sagar Upadhyaya <[email protected]> * Addressing additional minor comments Signed-off-by: Sagar Upadhyaya <[email protected]> * Moving non null check to builder for OS onHeapCache Signed-off-by: Sagar Upadhyaya <[email protected]> * Adding package-info for new packages Signed-off-by: Sagar Upadhyaya <[email protected]> * Removing service and adding different cache interfaces along with event listener support Signed-off-by: Sagar Upadhyaya <[email protected]> * Fixing gradle missingDoc issue Signed-off-by: Sagar Upadhyaya <[email protected]> * Changing listener logic, removing tiered cache integration with IRC Signed-off-by: Sagar Upadhyaya <[email protected]> * Adding opensearch.internal tag for LoadAwareCacheLoader Signed-off-by: Sagar Upadhyaya <[email protected]> * Fixing thread safety issue Signed-off-by: Sagar Upadhyaya <[email protected]> * Remove compute function and event listener logic change for TieredCache Signed-off-by: Sagar Upadhyaya <[email protected]> * Making Cache.compute function private Signed-off-by: Sagar Upadhyaya <[email protected]> * Adding javadoc and more test for cache.put Signed-off-by: Sagar Upadhyaya <[email protected]> * Adding write locks to refresh API as well Signed-off-by: Sagar Upadhyaya <[email protected]> * Removing unwanted EventType class and refactoring one UT Signed-off-by: Sagar Upadhyaya <[email protected]> * Removing TieredCache interface Signed-off-by: Sagar Upadhyaya <[email protected]> --------- Signed-off-by: Sagar Upadhyaya <[email protected]> Signed-off-by: Sagar <[email protected]>
1 parent 6e2e72b commit ebda963

19 files changed

+1567
-55
lines changed

CHANGELOG.md

+2
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
100100
- Add search query categorizer ([#10255](https://github.com/opensearch-project/OpenSearch/pull/10255))
101101
- Per request phase latency ([#10351](https://github.com/opensearch-project/OpenSearch/issues/10351))
102102
- Add cluster state stats ([#10670](https://github.com/opensearch-project/OpenSearch/pull/10670))
103+
- [Tiered caching] Defining interfaces, listeners and extending IndicesRequestCache with Tiered cache support ([#10753]
104+
(https://github.com/opensearch-project/OpenSearch/pull/10753))
103105
- [Remote cluster state] Restore cluster state version during remote state auto restore ([#10853](https://github.com/opensearch-project/OpenSearch/pull/10853))
104106
- Update the indexRandom function to create more segments for concurrent search tests ([10247](https://github.com/opensearch-project/OpenSearch/pull/10247))
105107
- Add support for query profiler with concurrent aggregation ([#9248](https://github.com/opensearch-project/OpenSearch/pull/9248))

server/src/main/java/org/opensearch/common/cache/Cache.java

+59-53
Original file line numberDiff line numberDiff line change
@@ -424,68 +424,74 @@ public V computeIfAbsent(K key, CacheLoader<K, V> loader) throws ExecutionExcept
424424
}
425425
});
426426
if (value == null) {
427-
// we need to synchronize loading of a value for a given key; however, holding the segment lock while
428-
// invoking load can lead to deadlock against another thread due to dependent key loading; therefore, we
429-
// need a mechanism to ensure that load is invoked at most once, but we are not invoking load while holding
430-
// the segment lock; to do this, we atomically put a future in the map that can load the value, and then
431-
// get the value from this future on the thread that won the race to place the future into the segment map
432-
CacheSegment<K, V> segment = getCacheSegment(key);
433-
CompletableFuture<Entry<K, V>> future;
434-
CompletableFuture<Entry<K, V>> completableFuture = new CompletableFuture<>();
427+
value = compute(key, loader);
428+
}
429+
return value;
430+
}
435431

436-
try (ReleasableLock ignored = segment.writeLock.acquire()) {
437-
future = segment.map.putIfAbsent(key, completableFuture);
438-
}
432+
private V compute(K key, CacheLoader<K, V> loader) throws ExecutionException {
433+
long now = now();
434+
// we need to synchronize loading of a value for a given key; however, holding the segment lock while
435+
// invoking load can lead to deadlock against another thread due to dependent key loading; therefore, we
436+
// need a mechanism to ensure that load is invoked at most once, but we are not invoking load while holding
437+
// the segment lock; to do this, we atomically put a future in the map that can load the value, and then
438+
// get the value from this future on the thread that won the race to place the future into the segment map
439+
CacheSegment<K, V> segment = getCacheSegment(key);
440+
CompletableFuture<Entry<K, V>> future;
441+
CompletableFuture<Entry<K, V>> completableFuture = new CompletableFuture<>();
439442

440-
BiFunction<? super Entry<K, V>, Throwable, ? extends V> handler = (ok, ex) -> {
441-
if (ok != null) {
442-
try (ReleasableLock ignored = lruLock.acquire()) {
443-
promote(ok, now);
444-
}
445-
return ok.value;
446-
} else {
447-
try (ReleasableLock ignored = segment.writeLock.acquire()) {
448-
CompletableFuture<Entry<K, V>> sanity = segment.map.get(key);
449-
if (sanity != null && sanity.isCompletedExceptionally()) {
450-
segment.map.remove(key);
451-
}
452-
}
453-
return null;
454-
}
455-
};
443+
try (ReleasableLock ignored = segment.writeLock.acquire()) {
444+
future = segment.map.putIfAbsent(key, completableFuture);
445+
}
456446

457-
CompletableFuture<V> completableValue;
458-
if (future == null) {
459-
future = completableFuture;
460-
completableValue = future.handle(handler);
461-
V loaded;
462-
try {
463-
loaded = loader.load(key);
464-
} catch (Exception e) {
465-
future.completeExceptionally(e);
466-
throw new ExecutionException(e);
467-
}
468-
if (loaded == null) {
469-
NullPointerException npe = new NullPointerException("loader returned a null value");
470-
future.completeExceptionally(npe);
471-
throw new ExecutionException(npe);
472-
} else {
473-
future.complete(new Entry<>(key, loaded, now));
447+
BiFunction<? super Entry<K, V>, Throwable, ? extends V> handler = (ok, ex) -> {
448+
if (ok != null) {
449+
try (ReleasableLock ignored = lruLock.acquire()) {
450+
promote(ok, now);
474451
}
452+
return ok.value;
475453
} else {
476-
completableValue = future.handle(handler);
454+
try (ReleasableLock ignored = segment.writeLock.acquire()) {
455+
CompletableFuture<Entry<K, V>> sanity = segment.map.get(key);
456+
if (sanity != null && sanity.isCompletedExceptionally()) {
457+
segment.map.remove(key);
458+
}
459+
}
460+
return null;
477461
}
462+
};
478463

464+
CompletableFuture<V> completableValue;
465+
if (future == null) {
466+
future = completableFuture;
467+
completableValue = future.handle(handler);
468+
V loaded;
479469
try {
480-
value = completableValue.get();
481-
// check to ensure the future hasn't been completed with an exception
482-
if (future.isCompletedExceptionally()) {
483-
future.get(); // call get to force the exception to be thrown for other concurrent callers
484-
throw new IllegalStateException("the future was completed exceptionally but no exception was thrown");
485-
}
486-
} catch (InterruptedException e) {
487-
throw new IllegalStateException(e);
470+
loaded = loader.load(key);
471+
} catch (Exception e) {
472+
future.completeExceptionally(e);
473+
throw new ExecutionException(e);
488474
}
475+
if (loaded == null) {
476+
NullPointerException npe = new NullPointerException("loader returned a null value");
477+
future.completeExceptionally(npe);
478+
throw new ExecutionException(npe);
479+
} else {
480+
future.complete(new Entry<>(key, loaded, now));
481+
}
482+
} else {
483+
completableValue = future.handle(handler);
484+
}
485+
V value;
486+
try {
487+
value = completableValue.get();
488+
// check to ensure the future hasn't been completed with an exception
489+
if (future.isCompletedExceptionally()) {
490+
future.get(); // call get to force the exception to be thrown for other concurrent callers
491+
throw new IllegalStateException("the future was completed exceptionally but no exception was thrown");
492+
}
493+
} catch (InterruptedException e) {
494+
throw new IllegalStateException(e);
489495
}
490496
return value;
491497
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.common.cache;
10+
11+
/**
12+
* Represents a cache interface.
13+
* @param <K> Type of key.
14+
* @param <V> Type of value.
15+
*
16+
* @opensearch.experimental
17+
*/
18+
public interface ICache<K, V> {
19+
V get(K key);
20+
21+
void put(K key, V value);
22+
23+
V computeIfAbsent(K key, LoadAwareCacheLoader<K, V> loader) throws Exception;
24+
25+
void invalidate(K key);
26+
27+
void invalidateAll();
28+
29+
Iterable<K> keys();
30+
31+
long count();
32+
33+
void refresh();
34+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.common.cache;
10+
11+
/**
12+
* Extends a cache loader with awareness of whether the data is loaded or not.
13+
* @param <K> Type of key.
14+
* @param <V> Type of value.
15+
*
16+
* @opensearch.internal
17+
*/
18+
public interface LoadAwareCacheLoader<K, V> extends CacheLoader<K, V> {
19+
boolean isLoaded();
20+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.common.cache.store;
10+
11+
import org.opensearch.common.cache.Cache;
12+
import org.opensearch.common.cache.CacheBuilder;
13+
import org.opensearch.common.cache.LoadAwareCacheLoader;
14+
import org.opensearch.common.cache.RemovalListener;
15+
import org.opensearch.common.cache.RemovalNotification;
16+
import org.opensearch.common.cache.store.builders.StoreAwareCacheBuilder;
17+
import org.opensearch.common.cache.store.enums.CacheStoreType;
18+
import org.opensearch.common.cache.store.listeners.StoreAwareCacheEventListener;
19+
20+
/**
21+
* This variant of on-heap cache uses OpenSearch custom cache implementation.
22+
* @param <K> Type of key.
23+
* @param <V> Type of value.
24+
*
25+
* @opensearch.experimental
26+
*/
27+
public class OpenSearchOnHeapCache<K, V> implements StoreAwareCache<K, V>, RemovalListener<K, V> {
28+
29+
private final Cache<K, V> cache;
30+
31+
private final StoreAwareCacheEventListener<K, V> eventListener;
32+
33+
public OpenSearchOnHeapCache(Builder<K, V> builder) {
34+
CacheBuilder<K, V> cacheBuilder = CacheBuilder.<K, V>builder()
35+
.setMaximumWeight(builder.getMaxWeightInBytes())
36+
.weigher(builder.getWeigher())
37+
.removalListener(this);
38+
if (builder.getExpireAfterAcess() != null) {
39+
cacheBuilder.setExpireAfterAccess(builder.getExpireAfterAcess());
40+
}
41+
cache = cacheBuilder.build();
42+
this.eventListener = builder.getEventListener();
43+
}
44+
45+
@Override
46+
public V get(K key) {
47+
V value = cache.get(key);
48+
if (value != null) {
49+
eventListener.onHit(key, value, CacheStoreType.ON_HEAP);
50+
} else {
51+
eventListener.onMiss(key, CacheStoreType.ON_HEAP);
52+
}
53+
return value;
54+
}
55+
56+
@Override
57+
public void put(K key, V value) {
58+
cache.put(key, value);
59+
eventListener.onCached(key, value, CacheStoreType.ON_HEAP);
60+
}
61+
62+
@Override
63+
public V computeIfAbsent(K key, LoadAwareCacheLoader<K, V> loader) throws Exception {
64+
V value = cache.computeIfAbsent(key, key1 -> loader.load(key));
65+
if (!loader.isLoaded()) {
66+
eventListener.onHit(key, value, CacheStoreType.ON_HEAP);
67+
} else {
68+
eventListener.onMiss(key, CacheStoreType.ON_HEAP);
69+
eventListener.onCached(key, value, CacheStoreType.ON_HEAP);
70+
}
71+
return value;
72+
}
73+
74+
@Override
75+
public void invalidate(K key) {
76+
cache.invalidate(key);
77+
}
78+
79+
@Override
80+
public void invalidateAll() {
81+
cache.invalidateAll();
82+
}
83+
84+
@Override
85+
public Iterable<K> keys() {
86+
return cache.keys();
87+
}
88+
89+
@Override
90+
public long count() {
91+
return cache.count();
92+
}
93+
94+
@Override
95+
public void refresh() {
96+
cache.refresh();
97+
}
98+
99+
@Override
100+
public CacheStoreType getTierType() {
101+
return CacheStoreType.ON_HEAP;
102+
}
103+
104+
@Override
105+
public void onRemoval(RemovalNotification<K, V> notification) {
106+
eventListener.onRemoval(
107+
new StoreAwareCacheRemovalNotification<>(
108+
notification.getKey(),
109+
notification.getValue(),
110+
notification.getRemovalReason(),
111+
CacheStoreType.ON_HEAP
112+
)
113+
);
114+
}
115+
116+
/**
117+
* Builder object
118+
* @param <K> Type of key
119+
* @param <V> Type of value
120+
*/
121+
public static class Builder<K, V> extends StoreAwareCacheBuilder<K, V> {
122+
123+
@Override
124+
public StoreAwareCache<K, V> build() {
125+
return new OpenSearchOnHeapCache<K, V>(this);
126+
}
127+
}
128+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.common.cache.store;
10+
11+
import org.opensearch.common.cache.ICache;
12+
import org.opensearch.common.cache.store.enums.CacheStoreType;
13+
14+
/**
15+
* Represents a cache with a specific type of store like onHeap, disk etc.
16+
* @param <K> Type of key.
17+
* @param <V> Type of value.
18+
*
19+
* @opensearch.experimental
20+
*/
21+
public interface StoreAwareCache<K, V> extends ICache<K, V> {
22+
CacheStoreType getTierType();
23+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.common.cache.store;
10+
11+
import org.opensearch.common.cache.RemovalNotification;
12+
import org.opensearch.common.cache.RemovalReason;
13+
import org.opensearch.common.cache.store.enums.CacheStoreType;
14+
15+
/**
16+
* Removal notification for store aware cache.
17+
* @param <K> Type of key.
18+
* @param <V> Type of value.
19+
*
20+
* @opensearch.internal
21+
*/
22+
public class StoreAwareCacheRemovalNotification<K, V> extends RemovalNotification<K, V> {
23+
private final CacheStoreType cacheStoreType;
24+
25+
public StoreAwareCacheRemovalNotification(K key, V value, RemovalReason removalReason, CacheStoreType cacheStoreType) {
26+
super(key, value, removalReason);
27+
this.cacheStoreType = cacheStoreType;
28+
}
29+
30+
public CacheStoreType getCacheStoreType() {
31+
return cacheStoreType;
32+
}
33+
}

0 commit comments

Comments
 (0)