-
Notifications
You must be signed in to change notification settings - Fork 3.9k
xds: float LRU cache across interceptors #11992
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 8 commits
54d5db3
408a85e
661ddfa
6e0abb1
244ac4e
954023a
34558dd
b0632fc
2f85328
a013542
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -59,15 +59,17 @@ | |
|
||
static final String TYPE_URL = | ||
"type.googleapis.com/envoy.extensions.filters.http.gcp_authn.v3.GcpAuthnFilterConfig"; | ||
|
||
private final LruCache<String, CallCredentials> callCredentialsCache; | ||
final String filterInstanceName; | ||
|
||
GcpAuthenticationFilter(String name) { | ||
GcpAuthenticationFilter(String name, int cacheSize) { | ||
filterInstanceName = checkNotNull(name, "name"); | ||
this.callCredentialsCache = new LruCache<>(cacheSize); | ||
} | ||
|
||
|
||
static final class Provider implements Filter.Provider { | ||
private final int cacheSize = 10; | ||
|
||
@Override | ||
public String[] typeUrls() { | ||
return new String[]{TYPE_URL}; | ||
|
@@ -80,7 +82,7 @@ | |
|
||
@Override | ||
public GcpAuthenticationFilter newInstance(String name) { | ||
return new GcpAuthenticationFilter(name); | ||
return new GcpAuthenticationFilter(name, cacheSize); | ||
} | ||
|
||
@Override | ||
|
@@ -101,11 +103,14 @@ | |
// Validate cache_config | ||
if (gcpAuthnProto.hasCacheConfig()) { | ||
TokenCacheConfig cacheConfig = gcpAuthnProto.getCacheConfig(); | ||
cacheSize = cacheConfig.getCacheSize().getValue(); | ||
if (cacheSize == 0) { | ||
return ConfigOrError.fromError( | ||
"cache_config.cache_size must be greater than zero"); | ||
if (cacheConfig.hasCacheSize()) { | ||
cacheSize = cacheConfig.getCacheSize().getValue(); | ||
if (cacheSize == 0) { | ||
return ConfigOrError.fromError( | ||
"cache_config.cache_size must be greater than zero"); | ||
} | ||
} | ||
|
||
// LruCache's size is an int and briefly exceeds its maximum size before evicting entries | ||
cacheSize = UnsignedLongs.min(cacheSize, Integer.MAX_VALUE - 1); | ||
} | ||
|
@@ -127,8 +132,9 @@ | |
@Nullable FilterConfig overrideConfig, ScheduledExecutorService scheduler) { | ||
|
||
ComputeEngineCredentials credentials = ComputeEngineCredentials.create(); | ||
LruCache<String, CallCredentials> callCredentialsCache = | ||
new LruCache<>(((GcpAuthenticationConfig) config).getCacheSize()); | ||
synchronized (callCredentialsCache) { | ||
callCredentialsCache.resizeCache(((GcpAuthenticationConfig) config).getCacheSize()); | ||
} | ||
return new ClientInterceptor() { | ||
@Override | ||
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall( | ||
|
@@ -254,23 +260,37 @@ | |
|
||
private static final class LruCache<K, V> { | ||
|
||
private final Map<K, V> cache; | ||
private Map<K, V> cache; | ||
private int maxSize; | ||
|
||
LruCache(int maxSize) { | ||
this.cache = new LinkedHashMap<K, V>( | ||
maxSize, | ||
0.75f, | ||
true) { | ||
@Override | ||
protected boolean removeEldestEntry(Map.Entry<K, V> eldest) { | ||
return size() > maxSize; | ||
} | ||
}; | ||
this.maxSize = maxSize; | ||
this.cache = createEvictingMap(maxSize); | ||
} | ||
|
||
V getOrInsert(K key, Function<K, V> create) { | ||
return cache.computeIfAbsent(key, create); | ||
} | ||
|
||
private void resizeCache(int newSize) { | ||
if (newSize >= maxSize) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can't understand this. I thought if you get a bigger new size you should resize, and if you get an equal or smaller new size only that should be a no-op because decreasing cache size doesn't make since when another filter instance created a bigger cache. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I agree with your point but well, cache_size is configuration coming externally. We should adhere to the configuration. If the cache size decreases, then we can't use the normal code to discard entries, as that only discards a single entry. So without special code, the configuration wouldn't be observed. It is generally really bad to not observe configuration updates, from a testing/debugging perspective. You do a config push and see behavior X, but then after restarting you see behavior Y. It can be really hard to track down the bugs. The cache size doesn't change often, so the performance hit would be temporary (although there are still issues with that). It's also much easier to figure out what happened. If you keep the old cache, then someone may be debugging something and it matters what happened a month ago. I discussed this with Eric offline on the similar lines and understood this POV. |
||
maxSize = newSize; | ||
return; | ||
} | ||
Map<K, V> newCache = createEvictingMap(newSize); | ||
maxSize = newSize; | ||
newCache.putAll(cache); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Because There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. createEvictingMap is creating an empty LinkedHashMap. There are separator constructors
but none that both take an old map and specify the initialCapacity. |
||
cache = newCache; | ||
} | ||
|
||
private Map<K, V> createEvictingMap(int size) { | ||
return new LinkedHashMap<K, V>(size, 0.75f, true) { | ||
@Override | ||
protected boolean removeEldestEntry(Map.Entry<K, V> eldest) { | ||
return size() > LruCache.this.maxSize; | ||
} | ||
}; | ||
} | ||
} | ||
|
||
static class AudienceMetadataParser implements MetadataValueParser { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -33,6 +33,7 @@ | |
import static org.junit.Assert.assertTrue; | ||
import static org.mockito.ArgumentMatchers.eq; | ||
import static org.mockito.Mockito.mock; | ||
import static org.mockito.Mockito.times; | ||
import static org.mockito.Mockito.verify; | ||
|
||
import com.google.common.collect.ImmutableList; | ||
|
@@ -89,8 +90,8 @@ public class GcpAuthenticationFilterTest { | |
|
||
@Test | ||
public void testNewFilterInstancesPerFilterName() { | ||
assertThat(new GcpAuthenticationFilter("FILTER_INSTANCE_NAME1")) | ||
.isNotEqualTo(new GcpAuthenticationFilter("FILTER_INSTANCE_NAME1")); | ||
assertThat(new GcpAuthenticationFilter("FILTER_INSTANCE_NAME1", 10)) | ||
.isNotEqualTo(new GcpAuthenticationFilter("FILTER_INSTANCE_NAME1", 10)); | ||
} | ||
|
||
@Test | ||
|
@@ -152,7 +153,7 @@ public void testClientInterceptor_success() throws IOException, ResourceInvalidE | |
.withOption(CLUSTER_SELECTION_KEY, "cluster:cluster0") | ||
.withOption(XDS_CONFIG_CALL_OPTION_KEY, defaultXdsConfig); | ||
GcpAuthenticationConfig config = new GcpAuthenticationConfig(10); | ||
GcpAuthenticationFilter filter = new GcpAuthenticationFilter("FILTER_INSTANCE_NAME"); | ||
GcpAuthenticationFilter filter = new GcpAuthenticationFilter("FILTER_INSTANCE_NAME", 10); | ||
ClientInterceptor interceptor = filter.buildClientInterceptor(config, null, null); | ||
MethodDescriptor<Void, Void> methodDescriptor = TestMethodDescriptors.voidMethod(); | ||
Channel mockChannel = Mockito.mock(Channel.class); | ||
|
@@ -181,7 +182,7 @@ public void testClientInterceptor_createsAndReusesCachedCredentials() | |
.withOption(CLUSTER_SELECTION_KEY, "cluster:cluster0") | ||
.withOption(XDS_CONFIG_CALL_OPTION_KEY, defaultXdsConfig); | ||
GcpAuthenticationConfig config = new GcpAuthenticationConfig(10); | ||
GcpAuthenticationFilter filter = new GcpAuthenticationFilter("FILTER_INSTANCE_NAME"); | ||
GcpAuthenticationFilter filter = new GcpAuthenticationFilter("FILTER_INSTANCE_NAME", 10); | ||
ClientInterceptor interceptor = filter.buildClientInterceptor(config, null, null); | ||
MethodDescriptor<Void, Void> methodDescriptor = TestMethodDescriptors.voidMethod(); | ||
Channel mockChannel = Mockito.mock(Channel.class); | ||
|
@@ -190,7 +191,7 @@ public void testClientInterceptor_createsAndReusesCachedCredentials() | |
interceptor.interceptCall(methodDescriptor, callOptionsWithXds, mockChannel); | ||
interceptor.interceptCall(methodDescriptor, callOptionsWithXds, mockChannel); | ||
|
||
verify(mockChannel, Mockito.times(2)) | ||
verify(mockChannel, times(2)) | ||
.newCall(eq(methodDescriptor), callOptionsCaptor.capture()); | ||
CallOptions firstCapturedOptions = callOptionsCaptor.getAllValues().get(0); | ||
CallOptions secondCapturedOptions = callOptionsCaptor.getAllValues().get(1); | ||
|
@@ -202,7 +203,7 @@ public void testClientInterceptor_createsAndReusesCachedCredentials() | |
@Test | ||
public void testClientInterceptor_withoutClusterSelectionKey() throws Exception { | ||
GcpAuthenticationConfig config = new GcpAuthenticationConfig(10); | ||
GcpAuthenticationFilter filter = new GcpAuthenticationFilter("FILTER_INSTANCE_NAME"); | ||
GcpAuthenticationFilter filter = new GcpAuthenticationFilter("FILTER_INSTANCE_NAME", 10); | ||
ClientInterceptor interceptor = filter.buildClientInterceptor(config, null, null); | ||
MethodDescriptor<Void, Void> methodDescriptor = TestMethodDescriptors.voidMethod(); | ||
Channel mockChannel = mock(Channel.class); | ||
|
@@ -233,7 +234,7 @@ public void testClientInterceptor_clusterSelectionKeyWithoutPrefix() throws Exce | |
Channel mockChannel = mock(Channel.class); | ||
|
||
GcpAuthenticationConfig config = new GcpAuthenticationConfig(10); | ||
GcpAuthenticationFilter filter = new GcpAuthenticationFilter("FILTER_INSTANCE_NAME"); | ||
GcpAuthenticationFilter filter = new GcpAuthenticationFilter("FILTER_INSTANCE_NAME", 10); | ||
ClientInterceptor interceptor = filter.buildClientInterceptor(config, null, null); | ||
MethodDescriptor<Void, Void> methodDescriptor = TestMethodDescriptors.voidMethod(); | ||
interceptor.interceptCall(methodDescriptor, callOptionsWithXds, mockChannel); | ||
|
@@ -244,7 +245,7 @@ public void testClientInterceptor_clusterSelectionKeyWithoutPrefix() throws Exce | |
@Test | ||
public void testClientInterceptor_xdsConfigDoesNotExist() throws Exception { | ||
GcpAuthenticationConfig config = new GcpAuthenticationConfig(10); | ||
GcpAuthenticationFilter filter = new GcpAuthenticationFilter("FILTER_INSTANCE_NAME"); | ||
GcpAuthenticationFilter filter = new GcpAuthenticationFilter("FILTER_INSTANCE_NAME", 10); | ||
ClientInterceptor interceptor = filter.buildClientInterceptor(config, null, null); | ||
MethodDescriptor<Void, Void> methodDescriptor = TestMethodDescriptors.voidMethod(); | ||
Channel mockChannel = mock(Channel.class); | ||
|
@@ -274,7 +275,7 @@ public void testClientInterceptor_incorrectClusterName() throws Exception { | |
.withOption(CLUSTER_SELECTION_KEY, "cluster:cluster") | ||
.withOption(XDS_CONFIG_CALL_OPTION_KEY, defaultXdsConfig); | ||
GcpAuthenticationConfig config = new GcpAuthenticationConfig(10); | ||
GcpAuthenticationFilter filter = new GcpAuthenticationFilter("FILTER_INSTANCE_NAME"); | ||
GcpAuthenticationFilter filter = new GcpAuthenticationFilter("FILTER_INSTANCE_NAME", 10); | ||
ClientInterceptor interceptor = filter.buildClientInterceptor(config, null, null); | ||
MethodDescriptor<Void, Void> methodDescriptor = TestMethodDescriptors.voidMethod(); | ||
Channel mockChannel = mock(Channel.class); | ||
|
@@ -300,7 +301,7 @@ public void testClientInterceptor_statusOrError() throws Exception { | |
.withOption(CLUSTER_SELECTION_KEY, "cluster:cluster0") | ||
.withOption(XDS_CONFIG_CALL_OPTION_KEY, defaultXdsConfig); | ||
GcpAuthenticationConfig config = new GcpAuthenticationConfig(10); | ||
GcpAuthenticationFilter filter = new GcpAuthenticationFilter("FILTER_INSTANCE_NAME"); | ||
GcpAuthenticationFilter filter = new GcpAuthenticationFilter("FILTER_INSTANCE_NAME", 10); | ||
ClientInterceptor interceptor = filter.buildClientInterceptor(config, null, null); | ||
MethodDescriptor<Void, Void> methodDescriptor = TestMethodDescriptors.voidMethod(); | ||
Channel mockChannel = mock(Channel.class); | ||
|
@@ -329,7 +330,7 @@ public void testClientInterceptor_notAudienceWrapper() | |
.withOption(CLUSTER_SELECTION_KEY, "cluster:cluster0") | ||
.withOption(XDS_CONFIG_CALL_OPTION_KEY, defaultXdsConfig); | ||
GcpAuthenticationConfig config = new GcpAuthenticationConfig(10); | ||
GcpAuthenticationFilter filter = new GcpAuthenticationFilter("FILTER_INSTANCE_NAME"); | ||
GcpAuthenticationFilter filter = new GcpAuthenticationFilter("FILTER_INSTANCE_NAME", 10); | ||
ClientInterceptor interceptor = filter.buildClientInterceptor(config, null, null); | ||
MethodDescriptor<Void, Void> methodDescriptor = TestMethodDescriptors.voidMethod(); | ||
Channel mockChannel = Mockito.mock(Channel.class); | ||
|
@@ -342,6 +343,40 @@ public void testClientInterceptor_notAudienceWrapper() | |
assertThat(clientCall.error.getDescription()).contains("GCP Authn found wrong type"); | ||
} | ||
|
||
@Test | ||
public void testLruCacheAcrossInterceptors() throws IOException, ResourceInvalidException { | ||
XdsConfig.XdsClusterConfig clusterConfig = new XdsConfig.XdsClusterConfig( | ||
CLUSTER_NAME, cdsUpdate, new EndpointConfig(StatusOr.fromValue(edsUpdate))); | ||
XdsConfig defaultXdsConfig = new XdsConfig.XdsConfigBuilder() | ||
.setListener(ldsUpdate) | ||
.setRoute(rdsUpdate) | ||
.setVirtualHost(rdsUpdate.virtualHosts.get(0)) | ||
.addCluster(CLUSTER_NAME, StatusOr.fromValue(clusterConfig)).build(); | ||
CallOptions callOptionsWithXds = CallOptions.DEFAULT | ||
.withOption(CLUSTER_SELECTION_KEY, "cluster:cluster0") | ||
.withOption(XDS_CONFIG_CALL_OPTION_KEY, defaultXdsConfig); | ||
GcpAuthenticationFilter filter = new GcpAuthenticationFilter("FILTER_INSTANCE_NAME", 2); | ||
ClientInterceptor interceptor1 | ||
= filter.buildClientInterceptor(new GcpAuthenticationConfig(2), null, null); | ||
MethodDescriptor<Void, Void> methodDescriptor = TestMethodDescriptors.voidMethod(); | ||
Channel mockChannel = Mockito.mock(Channel.class); | ||
ArgumentCaptor<CallOptions> callOptionsCaptor = ArgumentCaptor.forClass(CallOptions.class); | ||
|
||
interceptor1.interceptCall(methodDescriptor, callOptionsWithXds, mockChannel); | ||
verify(mockChannel).newCall(eq(methodDescriptor), callOptionsCaptor.capture()); | ||
CallOptions capturedOptions1 = callOptionsCaptor.getAllValues().get(0); | ||
assertNotNull(capturedOptions1.getCredentials()); | ||
ClientInterceptor interceptor2 | ||
= filter.buildClientInterceptor(new GcpAuthenticationConfig(1), null, null); | ||
interceptor2.interceptCall(methodDescriptor, callOptionsWithXds, mockChannel); | ||
verify(mockChannel, times(2)) | ||
.newCall(eq(methodDescriptor), callOptionsCaptor.capture()); | ||
CallOptions capturedOptions2 = callOptionsCaptor.getAllValues().get(1); | ||
assertNotNull(capturedOptions2.getCredentials()); | ||
|
||
assertSame(capturedOptions1.getCredentials(), capturedOptions2.getCredentials()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Cached value is tested but not eviction behavior. With cache max size of 1, you need to test using a cluster resource with a 2nd audience string and get the credentials and assert the behavior for the 1st audience string's call credentials getting evicted. filter max size 2 |
||
} | ||
|
||
private static LdsUpdate getLdsUpdate() { | ||
Filter.NamedFilterConfig routerFilterConfig = new Filter.NamedFilterConfig( | ||
serverName, RouterFilter.ROUTER_CONFIG); | ||
|
Uh oh!
There was an error while loading. Please reload this page.