diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/IndexedResourceCache.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/IndexedResourceCache.java index 5ccc5894a1..01e3ffb7c8 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/IndexedResourceCache.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/IndexedResourceCache.java @@ -16,9 +16,15 @@ package io.javaoperatorsdk.operator.api.reconciler; import java.util.List; +import java.util.stream.Stream; import io.fabric8.kubernetes.api.model.HasMetadata; public interface IndexedResourceCache extends ResourceCache { + List byIndex(String indexName, String indexKey); + + default Stream byIndexStream(String indexName, String indexKey) { + return byIndex(indexName, indexKey).stream(); + } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ResourceCache.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ResourceCache.java index dd2844d9f8..672b48e540 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ResourceCache.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ResourceCache.java @@ -24,9 +24,22 @@ @SuppressWarnings("unchecked") public interface ResourceCache extends Cache { + /** + * Lists all resources in the given namespace. + * + * @param namespace the namespace to list resources from + * @return a stream of all cached resources in the namespace + */ default Stream list(String namespace) { return list(namespace, TRUE); } + /** + * Lists resources in the given namespace that match the provided predicate. + * + * @param namespace the namespace to list resources from + * @param predicate filter to apply on the resources + * @return a stream of cached resources matching the predicate + */ Stream list(String namespace, Predicate predicate); } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ResourceOperations.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ResourceOperations.java index de4d00d717..b9ef475509 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ResourceOperations.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ResourceOperations.java @@ -37,7 +37,8 @@ /** * Provides useful operations to manipulate resources (server-side apply, patch, etc.) in an * idiomatic way, in particular to make sure that the latest version of the resource is present in - * the caches for the next reconciliation. + * the caches for the next reconciliation. In other words, it provides read-cache-after-write + * consistency. * * @param

the resource type on which this object operates */ diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/Cache.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/Cache.java index ffcfd2df58..b2c2d2692d 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/Cache.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/Cache.java @@ -25,17 +25,45 @@ public interface Cache { Predicate TRUE = (a) -> true; + /** + * Retrieves a resource from the cache by its {@link ResourceID}. + * + * @param resourceID the identifier of the resource + * @return an Optional containing the resource if present in the cache + */ Optional get(ResourceID resourceID); + /** + * Checks whether a resource with the given {@link ResourceID} exists in the cache. + * + * @param resourceID the identifier of the resource + * @return {@code true} if the resource is present in the cache + */ default boolean contains(ResourceID resourceID) { return get(resourceID).isPresent(); } + /** + * Returns a stream of all {@link ResourceID}s currently in the cache. + * + * @return a stream of resource identifiers + */ Stream keys(); + /** + * Lists all resources in the cache. + * + * @return a stream of all cached resources + */ default Stream list() { return list(TRUE); } + /** + * Lists resources in the cache that match the provided predicate. + * + * @param predicate filter to apply on the resources + * @return a stream of cached resources matching the predicate + */ Stream list(Predicate predicate); } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java index 6632ce631e..a0b7938302 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java @@ -107,10 +107,6 @@ private void initSources() { } } - C configuration() { - return configuration; - } - public void changeNamespaces(Set namespaces) { var sourcesToRemove = sources.keySet().stream().filter(k -> !namespaces.contains(k)).collect(Collectors.toSet()); @@ -256,12 +252,14 @@ public void addIndexers(Map>> indexers) { this.indexers.putAll(indexers); } + @Override + public Stream byIndexStream(String indexName, String indexKey) { + return sources.values().stream().flatMap(s -> s.byIndexStream(indexName, indexKey)); + } + @Override public List byIndex(String indexName, String indexKey) { - return sources.values().stream() - .map(s -> s.byIndex(indexName, indexKey)) - .flatMap(List::stream) - .collect(Collectors.toList()); + return byIndexStream(indexName, indexKey).toList(); } @Override diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java index 3b9898b133..f021101229 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java @@ -18,11 +18,13 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.function.Function; import java.util.function.Predicate; import java.util.function.UnaryOperator; +import java.util.stream.Collectors; import java.util.stream.Stream; import org.slf4j.Logger; @@ -111,7 +113,6 @@ public R eventFilteringUpdateAndCacheResource(R resourceToUpdate, UnaryOperator< res.ifPresentOrElse( r -> { R latestResource = (R) r.getResource().orElseThrow(); - // as previous resource version we use the one from successful update, since // we process new event here only if that is more recent then the event from our update. // Note that this is equivalent with the scenario when an informer watch connection @@ -219,11 +220,6 @@ public Optional getCachedValue(ResourceID resourceID) { return get(resourceID); } - @Override - public Stream list(String namespace, Predicate predicate) { - return manager().list(namespace, predicate); - } - void setTemporalResourceCache(TemporaryResourceCache temporaryResourceCache) { this.temporaryResourceCache = temporaryResourceCache; } @@ -236,19 +232,163 @@ public void addIndexers(Map>> indexers) { this.indexers.putAll(indexers); } + /** + * {@inheritDoc} + * + *

This implementation is read-cache-after-write consistent. Results are merged with the + * temporary resource cache to ensure recently written resources are reflected in the output. + */ @Override - public List byIndex(String indexName, String indexKey) { - return manager().byIndex(indexName, indexKey); + public Stream list(String namespace, Predicate predicate) { + return mergeWithTempCacheForList(manager().list(namespace), namespace, predicate); } + /** + * {@inheritDoc} + * + *

This implementation is read-cache-after-write consistent. Results are merged with the + * temporary resource cache to ensure recently written resources are reflected in the output. + */ @Override - public Stream keys() { - return cache.keys(); + public Stream list(Predicate predicate) { + return mergeWithTempCacheForList(manager().list(), null, predicate); } + /** + * {@inheritDoc} + * + *

This implementation is read-cache-after-write consistent. Results are merged with the + * temporary resource cache to ensure recently written resources are reflected in the output. + */ @Override - public Stream list(Predicate predicate) { - return cache.list(predicate); + public Stream byIndexStream(String indexName, String indexKey) { + return mergeWithTempCacheForIndex( + manager().byIndexStream(indexName, indexKey), indexName, indexKey); + } + + /** + * {@inheritDoc} + * + *

This implementation is read-cache-after-write consistent. Results are merged with the + * temporary resource cache to ensure recently written resources are reflected in the output. + */ + @Override + public List byIndex(String indexName, String indexKey) { + return mergeWithTempCacheForIndex( + manager().byIndexStream(indexName, indexKey), indexName, indexKey) + .collect(Collectors.toList()); + } + + // namespace is filtered on informer manager level + private Stream mergeWithTempCacheForList( + Stream stream, String namespace, Predicate predicate) { + if (!comparableResourceVersions || temporaryResourceCache.isEmpty()) { + + return stream.filter(filterResourceByPredicate(predicate)); + } + var tempResources = new HashMap<>(temporaryResourceCache.getResources()); + if (tempResources.isEmpty()) { + return stream.filter(filterResourceByPredicate(predicate)); + } + + var upToDateList = + stream + .map( + r -> { + var resourceID = ResourceID.fromResource(r); + var tempResource = tempResources.remove(resourceID); + if (tempResource != null + && ReconcilerUtilsInternal.compareResourceVersions(tempResource, r) > 0) { + return tempResource; + } + return r; + }) + // we filter on predicate only since namespace changes would not be detected anyway. + .filter(filterResourceByPredicate(predicate)) + .toList(); + + return Stream.concat( + tempResources.values().stream() + .filter(filterResourceByNamespaceAndPredicate(namespace, predicate)), + upToDateList.stream()); + } + + private Stream mergeWithTempCacheForIndex( + Stream stream, String indexName, String indexKey) { + if (!comparableResourceVersions || temporaryResourceCache.isEmpty()) { + return stream; + } + var tempResources = new HashMap<>(temporaryResourceCache.getResources()); + if (tempResources.isEmpty()) { + return stream; + } + + var indexer = indexers.get(indexName); + if (indexer == null) { + throw new IllegalArgumentException("Indexer not found for: " + indexName); + } + + var upToDateList = + stream + .map( + r -> { + var resourceID = ResourceID.fromResource(r); + var tempResource = tempResources.remove(resourceID); + if (tempResource != null + && ReconcilerUtilsInternal.compareResourceVersions(tempResource, r) > 0) { + if (!indexer.apply(tempResource).contains(indexKey)) { + return null; + } + return tempResource; + } + return r; + }) + .filter(Objects::nonNull) + .toList(); + + // remaining temp resources are ghost resources — include only those matching the index + return Stream.concat( + tempResources.values().stream().filter(r -> indexer.apply(r).contains(indexKey)), + upToDateList.stream()); + } + + private static Predicate filterResourceByPredicate( + Predicate predicate) { + return filterResourceByNamespaceAndPredicate(null, predicate); + } + + private static Predicate filterResourceByNamespaceAndPredicate( + String namespace, Predicate predicate) { + return r -> { + if (namespace != null) { + if (!Optional.of(r) + .map(rr -> Objects.equals(namespace, rr.getMetadata().getNamespace())) + .orElse(false)) { + return false; + } + } + if (predicate != null) { + return predicate.test(r); + } + return true; + }; + } + + /** + * {@inheritDoc} + * + *

This implementation is read-cache-after-write consistent. Keys from the temporary resource + * cache (ghost resources) are included in the result. + */ + @Override + public Stream keys() { + if (!comparableResourceVersions || temporaryResourceCache.isEmpty()) { + return manager().keys(); + } + var managerKeys = manager().keys().collect(Collectors.toSet()); + var tempKeys = temporaryResourceCache.getResources().keySet(); + return Stream.concat( + managerKeys.stream(), tempKeys.stream().filter(k -> !managerKeys.contains(k))); } @Override diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java index d5a39f785b..405f52cc8d 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java @@ -15,6 +15,7 @@ */ package io.javaoperatorsdk.operator.processing.event.source.informer; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Optional; @@ -265,4 +266,12 @@ public void checkGhostResources() { public synchronized Optional getResourceFromCache(ResourceID resourceID) { return Optional.ofNullable(cache.get(resourceID)); } + + synchronized boolean isEmpty() { + return cache.isEmpty(); + } + + synchronized Map getResources() { + return Collections.unmodifiableMap(cache); + } } diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java index 87119c86c6..fe78bd3147 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java @@ -16,9 +16,13 @@ package io.javaoperatorsdk.operator.processing.event.source.informer; import java.time.Duration; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.concurrent.CountDownLatch; +import java.util.stream.Stream; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -38,6 +42,7 @@ import io.javaoperatorsdk.operator.api.config.informer.InformerEventSourceConfiguration; import io.javaoperatorsdk.operator.processing.event.EventHandler; import io.javaoperatorsdk.operator.processing.event.ResourceID; +import io.javaoperatorsdk.operator.processing.event.source.Cache; import io.javaoperatorsdk.operator.processing.event.source.EventFilterTestUtils; import io.javaoperatorsdk.operator.processing.event.source.ResourceAction; import io.javaoperatorsdk.operator.processing.event.source.SecondaryToPrimaryMapper; @@ -53,6 +58,7 @@ import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.isNull; +import static org.mockito.ArgumentMatchers.nullable; import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -84,8 +90,11 @@ void setup() { when(secondaryToPrimaryMapper.toPrimaryResourceIDs(any())) .thenReturn(Set.of(ResourceID.fromResource(testDeployment()))); when(informerEventSourceConfiguration.getInformerConfig()).thenReturn(informerConfig); - when(informerConfig.getEffectiveNamespaces(any())).thenReturn(DEFAULT_NAMESPACES_SET); + when(informerEventSourceConfiguration.getResourceClass()).thenReturn(Deployment.class); + when(informerConfig.isComparableResourceVersions()).thenReturn(true); + when(informerConfig.getEffectiveNamespaces(any())).thenReturn(DEFAULT_NAMESPACES_SET); + informerEventSource = spy( new InformerEventSource<>(informerEventSourceConfiguration, clientMock) { @@ -512,6 +521,222 @@ void informerStoppedHandlerShouldBeCalledWhenInformerStops() { verify(informerStoppedHandler, atLeastOnce()).onStop(any(), eq(exception)); } + @Test + void listReplacesResourceFromTempCache() { + var original = testDeployment(); + var newer = testDeployment(); + newer.getMetadata().setResourceVersion("5"); + + when(temporaryResourceCache.getResources()) + .thenReturn(new HashMap<>(Map.of(ResourceID.fromResource(original), newer))); + + var mim = mock(InformerManager.class); + when(mim.list(nullable(String.class))).thenReturn(Stream.of(original)); + when(informerEventSource.manager()).thenReturn(mim); + + var result = informerEventSource.list(null, Cache.TRUE).toList(); + + assertThat(result).containsExactly(newer); + } + + @Test + void listExcludesResourceWhenTempCacheContainsNewerVersionThatNoLongerMatchesPredicate() { + var original = testDeployment(); + original.getMetadata().setResourceVersion("4"); + var newer = testDeployment(); + newer.getMetadata().setResourceVersion("5"); + + when(temporaryResourceCache.getResources()) + .thenReturn(new HashMap<>(Map.of(ResourceID.fromResource(original), newer))); + + var mim = mock(InformerManager.class); + when(mim.list(nullable(String.class), any())).thenReturn(Stream.of(original)); + when(informerEventSource.manager()).thenReturn(mim); + + var result = + informerEventSource + .list(null, r -> !"5".equals(r.getMetadata().getResourceVersion())) + .toList(); + + assertThat(result).isEmpty(); + } + + @Test + void listKeepsResourceWhenNotInTempCache() { + var original = testDeployment(); + + when(temporaryResourceCache.getResources()).thenReturn(new HashMap<>()); + + var mim = mock(InformerManager.class); + when(mim.list(nullable(String.class))).thenReturn(Stream.of(original)); + when(informerEventSource.manager()).thenReturn(mim); + + var result = informerEventSource.list(null, r -> true).toList(); + + assertThat(result).containsExactly(original); + } + + @Test + void listReplacesOnlyMatchingResources() { + var dep1 = testDeployment(); + var dep2 = testDeployment(); + dep2.getMetadata().setName("other"); + var newerDep1 = testDeployment(); + newerDep1.getMetadata().setResourceVersion("5"); + + when(temporaryResourceCache.getResources()) + .thenReturn(new HashMap<>(Map.of(ResourceID.fromResource(dep1), newerDep1))); + + var informerManager = mock(InformerManager.class); + when(informerManager.list(nullable(String.class))).thenReturn(Stream.of(dep1, dep2)); + when(informerEventSource.manager()).thenReturn(informerManager); + + var result = informerEventSource.list(null, r -> true).toList(); + + assertThat(result).containsExactlyInAnyOrder(newerDep1, dep2); + } + + @Test + void byIndexStreamReplacesFromTempCache() { + var original = testDeployment(); + var newer = testDeployment(); + newer.getMetadata().setResourceVersion("5"); + + when(temporaryResourceCache.getResources()) + .thenReturn(new HashMap<>(Map.of(ResourceID.fromResource(original), newer))); + + var informerManager = mock(InformerManager.class); + when(informerManager.byIndexStream(any(), any())).thenReturn(Stream.of(original)); + when(informerEventSource.manager()).thenReturn(informerManager); + informerEventSource.addIndexers(Map.of("idx", d -> List.of("key"))); + + var result = informerEventSource.byIndexStream("idx", "key").toList(); + + assertThat(result).containsExactly(newer); + } + + @Test + void byIndexStreamSkipsNewerTempCacheResourceWhenIndexedValueChanged() { + var original = testDeployment(); + original.getMetadata().setLabels(Map.of("app", "key")); + var newer = testDeployment(); + newer.getMetadata().setResourceVersion("5"); + newer.getMetadata().setLabels(Map.of("app", "other")); + + when(temporaryResourceCache.getResources()) + .thenReturn(new HashMap<>(Map.of(ResourceID.fromResource(original), newer))); + + var informerManager = mock(InformerManager.class); + when(informerManager.byIndexStream(any(), any())).thenReturn(Stream.of(original)); + when(informerEventSource.manager()).thenReturn(informerManager); + informerEventSource.addIndexers( + Map.of("idx", d -> List.of(d.getMetadata().getLabels().get("app")))); + + var result = informerEventSource.byIndexStream("idx", "key").toList(); + + assertThat(result).isEmpty(); + } + + @Test + void listKeepsResourceWhenTempCacheHasOlderVersion() { + var original = testDeployment(); + original.getMetadata().setResourceVersion("5"); + var olderTemp = testDeployment(); + olderTemp.getMetadata().setResourceVersion("3"); + + when(temporaryResourceCache.getResources()) + .thenReturn(new HashMap<>(Map.of(ResourceID.fromResource(original), olderTemp))); + + var mim = mock(InformerManager.class); + when(mim.list(nullable(String.class))).thenReturn(Stream.of(original)); + when(informerEventSource.manager()).thenReturn(mim); + + var result = informerEventSource.list(null, r -> true).toList(); + + assertThat(result).containsExactly(original); + } + + @Test + void byIndexStreamKeepsResourceWhenTempCacheHasOlderVersion() { + var original = testDeployment(); + original.getMetadata().setResourceVersion("5"); + var olderTemp = testDeployment(); + olderTemp.getMetadata().setResourceVersion("3"); + + when(temporaryResourceCache.getResources()) + .thenReturn(new HashMap<>(Map.of(ResourceID.fromResource(original), olderTemp))); + + var mim = mock(InformerManager.class); + when(mim.byIndexStream(any(), any())).thenReturn(Stream.of(original)); + when(informerEventSource.manager()).thenReturn(mim); + informerEventSource.addIndexers(Map.of("idx", d -> List.of("key"))); + + var result = informerEventSource.byIndexStream("idx", "key").toList(); + + assertThat(result).containsExactly(original); + } + + @Test + void listAddsGhostResources() { + var resource = testDeployment(); + var ghostResource = testDeployment(); + ghostResource.getMetadata().setName("ghost"); + + when(temporaryResourceCache.getResources()) + .thenReturn(new HashMap<>(Map.of(ResourceID.fromResource(ghostResource), ghostResource))); + + var mim = mock(InformerManager.class); + when(mim.list(nullable(String.class))).thenReturn(Stream.of(resource)); + when(informerEventSource.manager()).thenReturn(mim); + + var result = informerEventSource.list(null, r -> true).toList(); + + assertThat(result).containsExactlyInAnyOrder(resource, ghostResource); + } + + @Test + void keysIncludesGhostResourceKeys() { + var resource = testDeployment(); + var ghostResource = testDeployment(); + ghostResource.getMetadata().setName("ghost"); + + var resourceId = ResourceID.fromResource(resource); + var ghostResourceId = ResourceID.fromResource(ghostResource); + + when(temporaryResourceCache.getResources()).thenReturn(Map.of(ghostResourceId, ghostResource)); + when(temporaryResourceCache.isEmpty()).thenReturn(false); + + var mim = mock(InformerManager.class); + when(mim.keys()).thenReturn(Stream.of(resourceId)); + when(mim.contains(ghostResourceId)).thenReturn(false); + when(informerEventSource.manager()).thenReturn(mim); + + var result = informerEventSource.keys().toList(); + + assertThat(result).containsExactlyInAnyOrder(resourceId, ghostResourceId); + } + + @Test + void keysDoesNotDuplicateExistingKeys() { + var resource = testDeployment(); + var newerResource = testDeployment(); + newerResource.getMetadata().setResourceVersion("5"); + + var resourceId = ResourceID.fromResource(resource); + + when(temporaryResourceCache.getResources()).thenReturn(Map.of(resourceId, newerResource)); + when(temporaryResourceCache.isEmpty()).thenReturn(false); + + var mim = mock(InformerManager.class); + when(mim.keys()).thenReturn(Stream.of(resourceId)); + when(mim.contains(resourceId)).thenReturn(true); + when(informerEventSource.manager()).thenReturn(mim); + + var result = informerEventSource.keys().toList(); + + assertThat(result).containsExactly(resourceId); + } + Deployment testDeployment() { Deployment deployment = new Deployment(); deployment.setMetadata(new ObjectMeta()); diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/cachingfilteringupdate/CachingFilteringUpdateReconciler.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/cachingfilteringupdate/CachingFilteringUpdateReconciler.java index 0e9a953129..c8fc206106 100644 --- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/cachingfilteringupdate/CachingFilteringUpdateReconciler.java +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/cachingfilteringupdate/CachingFilteringUpdateReconciler.java @@ -36,35 +36,72 @@ public class CachingFilteringUpdateReconciler implements Reconciler { + public static final String RESOURCE_VERSION_INDEX = "resourceVersionIndex"; private final AtomicBoolean issueFound = new AtomicBoolean(false); + private InformerEventSource configMapEventSource; + @Override public UpdateControl reconcile( CachingFilteringUpdateCustomResource resource, Context context) { + try { + var updated = context.resourceOperations().serverSideApply(prepareCM(resource, 1)); + var cachedCM = context.getSecondaryResource(ConfigMap.class); + if (cachedCM.isEmpty()) { + throw new IllegalStateException("Error for resource: " + ResourceID.fromResource(resource)); + } + checkListContainsCM(updated); + checkIfResourceVersionIndexContainsUpdated(updated); + updated = context.resourceOperations().serverSideApply(prepareCM(resource, 2)); + cachedCM = context.getSecondaryResource(ConfigMap.class); + if (!cachedCM + .orElseThrow() + .getMetadata() + .getResourceVersion() + .equals(updated.getMetadata().getResourceVersion())) { + throw new IllegalStateException( + "Update error for resource: " + ResourceID.fromResource(resource)); + } + checkListContainsCM(updated); + checkIfResourceVersionIndexContainsUpdated(updated); - context.resourceOperations().serverSideApply(prepareCM(resource, 1)); - var cachedCM = context.getSecondaryResource(ConfigMap.class); - if (cachedCM.isEmpty()) { + ensureStatusExists(resource); + resource.getStatus().setUpdated(true); + return UpdateControl.patchStatus(resource); + } catch (IllegalStateException e) { issueFound.set(true); - throw new IllegalStateException("Error for resource: " + ResourceID.fromResource(resource)); + throw e; } + } - var updated = context.resourceOperations().serverSideApply(prepareCM(resource, 2)); - cachedCM = context.getSecondaryResource(ConfigMap.class); - if (!cachedCM - .orElseThrow() - .getMetadata() - .getResourceVersion() - .equals(updated.getMetadata().getResourceVersion())) { - issueFound.set(true); + private void checkIfResourceVersionIndexContainsUpdated(ConfigMap updated) { + if (configMapEventSource + .byIndex(RESOURCE_VERSION_INDEX, updated.getMetadata().getResourceVersion()) + .stream() + .noneMatch( + r -> + ResourceID.fromResource(r).equals(ResourceID.fromResource(updated)) + && r.getMetadata() + .getResourceVersion() + .equals(updated.getMetadata().getResourceVersion()))) { throw new IllegalStateException( - "Update error for resource: " + ResourceID.fromResource(resource)); + "Index does not contain resource: " + ResourceID.fromResource(updated)); } + } - ensureStatusExists(resource); - resource.getStatus().setUpdated(true); - return UpdateControl.patchStatus(resource); + private void checkListContainsCM(ConfigMap updated) { + if (configMapEventSource + .list() + .noneMatch( + r -> + ResourceID.fromResource(r).equals(ResourceID.fromResource(updated)) + && r.getMetadata() + .getResourceVersion() + .equals(updated.getMetadata().getResourceVersion()))) { + throw new IllegalStateException( + "List does not contain resource: " + ResourceID.fromResource(updated)); + } } private static ConfigMap prepareCM(CachingFilteringUpdateCustomResource p, int num) { @@ -84,13 +121,15 @@ private static ConfigMap prepareCM(CachingFilteringUpdateCustomResource p, int n @Override public List> prepareEventSources( EventSourceContext context) { - InformerEventSource cmES = + configMapEventSource = new InformerEventSource<>( InformerEventSourceConfiguration.from( ConfigMap.class, CachingFilteringUpdateCustomResource.class) .build(), context); - return List.of(cmES); + configMapEventSource.addIndexers( + Map.of(RESOURCE_VERSION_INDEX, cm -> List.of(cm.getMetadata().getResourceVersion()))); + return List.of(configMapEventSource); } private void ensureStatusExists(CachingFilteringUpdateCustomResource resource) {