Skip to content

fix: primary cache utils mechanism #2814

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package io.javaoperatorsdk.operator.api.reconciler;

import java.time.LocalTime;
import java.time.temporal.ChronoUnit;
import java.util.function.UnaryOperator;

import org.slf4j.Logger;
Expand All @@ -25,6 +27,8 @@
public class PrimaryUpdateAndCacheUtils {

public static final int DEFAULT_MAX_RETRY = 10;
public static final int RESOURCE_CACHE_POLL_TIMEOUT = 10000;
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@shawkins do you know some real life data, if on how long it would take on larger cluster to an informer to receive the event? On small one is almost instant (if watch works), not sure how the cluster size influences this.

public static final int DEFAULT_SLEEP_FOR_CACHE_POLL_MILLIS = 30;

private PrimaryUpdateAndCacheUtils() {}

Expand Down Expand Up @@ -90,8 +94,8 @@ public static <P extends HasMetadata> P ssaPatchStatusAndCacheResource(
}

/**
* Same as {@link #updateAndCacheResource(HasMetadata, Context, UnaryOperator, UnaryOperator,
* int)} using the default maximum retry number as defined by {@link #DEFAULT_MAX_RETRY}.
* Same as {@link #updateAndCacheResource(HasMetadata, Context, UnaryOperator, UnaryOperator, int,
* long,long)} using the default maximum retry number as defined by {@link #DEFAULT_MAX_RETRY}.
*
* @param resourceToUpdate original resource to update
* @param context of reconciliation
Expand All @@ -106,7 +110,13 @@ public static <P extends HasMetadata> P updateAndCacheResource(
UnaryOperator<P> modificationFunction,
UnaryOperator<P> updateMethod) {
return updateAndCacheResource(
resourceToUpdate, context, modificationFunction, updateMethod, DEFAULT_MAX_RETRY);
resourceToUpdate,
context,
modificationFunction,
updateMethod,
DEFAULT_MAX_RETRY,
RESOURCE_CACHE_POLL_TIMEOUT,
DEFAULT_SLEEP_FOR_CACHE_POLL_MILLIS);
}

/**
Expand All @@ -133,7 +143,9 @@ public static <P extends HasMetadata> P updateAndCacheResource(
Context<P> context,
UnaryOperator<P> modificationFunction,
UnaryOperator<P> updateMethod,
int maxRetry) {
int maxRetry,
long cachePollTimeoutMillis,
long pollDelayMillis) {

if (log.isDebugEnabled()) {
log.debug("Conflict retrying update for: {}", ResourceID.fromResource(resourceToUpdate));
Expand Down Expand Up @@ -180,14 +192,35 @@ public static <P extends HasMetadata> P updateAndCacheResource(
resourceToUpdate.getMetadata().getNamespace(),
e.getCode());
resourceToUpdate =
(P)
context
.getClient()
.resources(resourceToUpdate.getClass())
.inNamespace(resourceToUpdate.getMetadata().getNamespace())
.withName(resourceToUpdate.getMetadata().getName())
.get();
pollLocalCache(context, resourceToUpdate, cachePollTimeoutMillis, pollDelayMillis);
}
}
}

private static <P extends HasMetadata> P pollLocalCache(
Context<P> context, P staleResource, long timeoutMillis, long pollDelayMillis) {
try {
var resourceId = ResourceID.fromResource(staleResource);
var startTime = LocalTime.now();
while (startTime.plus(timeoutMillis, ChronoUnit.MILLIS).isAfter(LocalTime.now())) {
log.debug("Polling cache for resource: {}", resourceId);
var cachedResource = context.getPrimaryCache().get(resourceId).orElseThrow();
if (!cachedResource
.getMetadata()
.getResourceVersion()
.equals(staleResource.getMetadata().getResourceVersion())) {
return context
.getControllerConfiguration()
.getConfigurationService()
.getResourceCloner()
.clone(cachedResource);
}
Thread.sleep(pollDelayMillis);
}
throw new OperatorException("Timeout of resource polling from cache for resource");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new OperatorException(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -126,9 +126,7 @@ public synchronized void putResource(T newResource, String previousResourceVersi
knownResourceVersions.add(newResource.getMetadata().getResourceVersion());
}
var resourceId = ResourceID.fromResource(newResource);
var cachedResource =
getResourceFromCache(resourceId)
.orElse(managedInformerEventSource.get(resourceId).orElse(null));
var cachedResource = managedInformerEventSource.get(resourceId).orElse(null);

boolean moveAhead = false;
if (previousResourceVersion == null && cachedResource == null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,22 @@
package io.javaoperatorsdk.operator.api.reconciler;

import java.util.Optional;
import java.util.function.UnaryOperator;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientException;
import io.fabric8.kubernetes.client.dsl.MixedOperation;
import io.fabric8.kubernetes.client.dsl.Resource;
import io.fabric8.kubernetes.client.utils.KubernetesSerialization;
import io.javaoperatorsdk.operator.OperatorException;
import io.javaoperatorsdk.operator.TestUtils;
import io.javaoperatorsdk.operator.api.config.Cloner;
import io.javaoperatorsdk.operator.api.config.ConfigurationService;
import io.javaoperatorsdk.operator.api.config.ControllerConfiguration;
import io.javaoperatorsdk.operator.processing.event.EventSourceRetriever;
import io.javaoperatorsdk.operator.processing.event.source.controller.ControllerEventSource;
import io.javaoperatorsdk.operator.sample.simple.TestCustomResource;
Expand All @@ -29,6 +35,7 @@ class PrimaryUpdateAndCacheUtilsTest {
Context<TestCustomResource> context = mock(Context.class);
KubernetesClient client = mock(KubernetesClient.class);
Resource resource = mock(Resource.class);
IndexedResourceCache<TestCustomResource> primaryCache = mock(IndexedResourceCache.class);

@BeforeEach
void setup() {
Expand All @@ -41,6 +48,20 @@ void setup() {
when(mixedOp.inNamespace(any())).thenReturn(mixedOp);
when(mixedOp.withName(any())).thenReturn(resource);
when(resource.get()).thenReturn(TestUtils.testCustomResource1());
when(context.getPrimaryCache()).thenReturn(primaryCache);

var controllerConfiguration = mock(ControllerConfiguration.class);
when(context.getControllerConfiguration()).thenReturn(controllerConfiguration);
var configService = mock(ConfigurationService.class);
when(controllerConfiguration.getConfigurationService()).thenReturn(configService);
when(configService.getResourceCloner())
.thenReturn(
new Cloner() {
@Override
public <R extends HasMetadata> R clone(R object) {
return new KubernetesSerialization().clone(object);
}
});
}

@Test
Expand Down Expand Up @@ -76,6 +97,10 @@ void retriesConflicts() {
when(updateOperation.apply(any()))
.thenThrow(new KubernetesClientException("", 409, null))
.thenReturn(TestUtils.testCustomResource1());
var freshResource = TestUtils.testCustomResource1();

freshResource.getMetadata().setResourceVersion("2");
when(primaryCache.get(any())).thenReturn(Optional.of(freshResource));

var updated =
PrimaryUpdateAndCacheUtils.updateAndCacheResource(
Expand All @@ -89,15 +114,21 @@ void retriesConflicts() {
updateOperation);

assertThat(updated).isNotNull();
verify(resource, times(1)).get();
verify(primaryCache, times(1)).get(any());
}

@Test
void throwsIfRetryExhausted() {
var updateOperation = mock(UnaryOperator.class);

when(updateOperation.apply(any())).thenThrow(new KubernetesClientException("", 409, null));
var stubbing = when(primaryCache.get(any()));

for (int i = 0; i < DEFAULT_MAX_RETRY; i++) {
var resource = TestUtils.testCustomResource1();
resource.getMetadata().setResourceVersion("" + i);
stubbing = stubbing.thenReturn(Optional.of(resource));
}
assertThrows(
OperatorException.class,
() ->
Expand All @@ -106,6 +137,28 @@ void throwsIfRetryExhausted() {
context,
UnaryOperator.identity(),
updateOperation));
verify(resource, times(DEFAULT_MAX_RETRY)).get();
verify(primaryCache, times(DEFAULT_MAX_RETRY)).get(any());
}

@Test
void cachePollTimeouts() {
var updateOperation = mock(UnaryOperator.class);

when(updateOperation.apply(any())).thenThrow(new KubernetesClientException("", 409, null));
when(primaryCache.get(any())).thenReturn(Optional.of(TestUtils.testCustomResource1()));

var ex =
assertThrows(
OperatorException.class,
() ->
PrimaryUpdateAndCacheUtils.updateAndCacheResource(
TestUtils.testCustomResource1(),
context,
UnaryOperator.identity(),
updateOperation,
2,
50L,
10L));
assertThat(ex.getMessage()).contains("Timeout");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,5 @@
@Group("sample.javaoperatorsdk")
@Version("v1")
@ShortNames("spwl")
public class StatusPatchCacheWithLockCustomResource
extends CustomResource<StatusPatchCacheWithLockSpec, StatusPatchCacheWithLockStatus>
implements Namespaced {}
public class StatusPatchCacheCustomResource
extends CustomResource<StatusPatchCacheSpec, StatusPatchCacheStatus> implements Namespaced {}
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,19 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

public class StatusPatchCacheWithLockIT {
public class StatusPatchCacheIT {

public static final String TEST_1 = "test1";

@RegisterExtension
LocallyRunOperatorExtension extension =
LocallyRunOperatorExtension.builder()
.withReconciler(StatusPatchCacheWithLockReconciler.class)
.withReconciler(StatusPatchCacheReconciler.class)
.build();

@Test
void testStatusAlwaysUpToDate() {
var reconciler = extension.getReconcilerOfType(StatusPatchCacheWithLockReconciler.class);
var reconciler = extension.getReconcilerOfType(StatusPatchCacheReconciler.class);

extension.create(testResource());

Expand All @@ -39,10 +39,10 @@ void testStatusAlwaysUpToDate() {
});
}

StatusPatchCacheWithLockCustomResource testResource() {
var res = new StatusPatchCacheWithLockCustomResource();
StatusPatchCacheCustomResource testResource() {
var res = new StatusPatchCacheCustomResource();
res.setMetadata(new ObjectMetaBuilder().withName(TEST_1).build());
res.setSpec(new StatusPatchCacheWithLockSpec());
res.setSpec(new StatusPatchCacheSpec());
return res;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,14 @@
import io.javaoperatorsdk.operator.processing.event.source.EventSource;

@ControllerConfiguration
public class StatusPatchCacheWithLockReconciler
implements Reconciler<StatusPatchCacheWithLockCustomResource> {
public class StatusPatchCacheReconciler implements Reconciler<StatusPatchCacheCustomResource> {

public volatile int latestValue = 0;
public volatile boolean errorPresent = false;

@Override
public UpdateControl<StatusPatchCacheWithLockCustomResource> reconcile(
StatusPatchCacheWithLockCustomResource resource,
Context<StatusPatchCacheWithLockCustomResource> context) {
public UpdateControl<StatusPatchCacheCustomResource> reconcile(
StatusPatchCacheCustomResource resource, Context<StatusPatchCacheCustomResource> context) {

if (resource.getStatus() != null && resource.getStatus().getValue() != latestValue) {
errorPresent = true;
Expand Down Expand Up @@ -50,22 +48,20 @@ public UpdateControl<StatusPatchCacheWithLockCustomResource> reconcile(
}

@Override
public List<EventSource<?, StatusPatchCacheWithLockCustomResource>> prepareEventSources(
EventSourceContext<StatusPatchCacheWithLockCustomResource> context) {
public List<EventSource<?, StatusPatchCacheCustomResource>> prepareEventSources(
EventSourceContext<StatusPatchCacheCustomResource> context) {
// periodic event triggering for testing purposes
return List.of(new PeriodicTriggerEventSource<>(context.getPrimaryCache()));
}

private StatusPatchCacheWithLockCustomResource createFreshCopy(
StatusPatchCacheWithLockCustomResource resource) {
var res = new StatusPatchCacheWithLockCustomResource();
private StatusPatchCacheCustomResource createFreshCopy(StatusPatchCacheCustomResource resource) {
var res = new StatusPatchCacheCustomResource();
res.setMetadata(
new ObjectMetaBuilder()
.withName(resource.getMetadata().getName())
.withNamespace(resource.getMetadata().getNamespace())
.build());
res.setStatus(new StatusPatchCacheWithLockStatus());

res.setStatus(new StatusPatchCacheStatus());
return res;
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package io.javaoperatorsdk.operator.baseapi.statuscache;

public class StatusPatchCacheWithLockSpec {
public class StatusPatchCacheSpec {

private int counter = 0;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
package io.javaoperatorsdk.operator.baseapi.statuscache;

public class StatusPatchCacheWithLockStatus {
public class StatusPatchCacheStatus {

private Integer value = 0;

public Integer getValue() {
return value;
}

public StatusPatchCacheWithLockStatus setValue(Integer value) {
public StatusPatchCacheStatus setValue(Integer value) {
this.value = value;
return this;
}
Expand Down