Skip to content

Commit a5d1b79

Browse files
authored
Merge pull request eclipse-ditto#1904 from eclipse-ditto/bugfix/consistency-signal-enrichment
eclipse-ditto#1893 fix ensuring the consistency when doing signal enrichment
2 parents de3dc85 + 1cd5c8f commit a5d1b79

File tree

6 files changed

+138
-91
lines changed

6 files changed

+138
-91
lines changed

internal/models/signalenrichment/src/main/java/org/eclipse/ditto/internal/models/signalenrichment/CachingSignalEnrichmentFacade.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -38,11 +38,11 @@ public interface CachingSignalEnrichmentFacade extends SignalEnrichmentFacade {
3838
*
3939
* @param thingId the thing to retrieve.
4040
* @param events received thing events to reduce traffic. If there are no events, a fresh entry is retrieved.
41-
* @param minAcceptableSeqNr the minimum sequence number acceptable as result. If negative,
41+
* @param atRevisionNumber the revision/sequence number to retrieve the thing at. If negative,
4242
* cache loading is forced.
4343
* @return future of the retrieved thing.
4444
*/
45-
CompletionStage<JsonObject> retrieveThing(ThingId thingId, List<ThingEvent<?>> events, long minAcceptableSeqNr);
45+
CompletionStage<JsonObject> retrieveThing(ThingId thingId, List<ThingEvent<?>> events, long atRevisionNumber);
4646

4747
default JsonObject applyJsonFieldSelector(final JsonObject jsonObject,
4848
@Nullable final JsonFieldSelector fieldSelector) {

internal/models/signalenrichment/src/main/java/org/eclipse/ditto/internal/models/signalenrichment/DittoCachingSignalEnrichmentFacade.java

+56-31
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import java.util.Collection;
1818
import java.util.List;
1919
import java.util.Optional;
20+
import java.util.UUID;
2021
import java.util.concurrent.CompletableFuture;
2122
import java.util.concurrent.CompletionStage;
2223
import java.util.concurrent.Executor;
@@ -25,7 +26,9 @@
2526
import javax.annotation.Nullable;
2627

2728
import org.eclipse.ditto.base.model.entity.id.EntityId;
29+
import org.eclipse.ditto.base.model.headers.DittoHeaderDefinition;
2830
import org.eclipse.ditto.base.model.headers.DittoHeaders;
31+
import org.eclipse.ditto.base.model.headers.DittoHeadersBuilder;
2932
import org.eclipse.ditto.base.model.signals.Signal;
3033
import org.eclipse.ditto.base.model.signals.WithResource;
3134
import org.eclipse.ditto.internal.utils.cache.Cache;
@@ -54,7 +57,8 @@
5457
*/
5558
public class DittoCachingSignalEnrichmentFacade implements CachingSignalEnrichmentFacade {
5659

57-
private static final ThreadSafeDittoLogger LOGGER = DittoLoggerFactory.getThreadSafeLogger(DittoCachingSignalEnrichmentFacade.class);
60+
private static final ThreadSafeDittoLogger LOGGER =
61+
DittoLoggerFactory.getThreadSafeLogger(DittoCachingSignalEnrichmentFacade.class);
5862
private static final String CACHE_NAME_SUFFIX = "_signal_enrichment_cache";
5963

6064
protected final Cache<SignalEnrichmentCacheKey, JsonObject> extraFieldsCache;
@@ -93,23 +97,45 @@ public static DittoCachingSignalEnrichmentFacade newInstance(final SignalEnrichm
9397

9498
@Override
9599
public CompletionStage<JsonObject> retrieveThing(final ThingId thingId, final List<ThingEvent<?>> events,
96-
final long minAcceptableSeqNr) {
100+
final long atRevisionNumber) {
97101

98-
final DittoHeaders dittoHeaders = DittoHeaders.empty();
102+
final DittoHeaders dittoHeadersNotAddedToCacheKey =
103+
buildDittoHeadersNotAddedToCacheKey(events, atRevisionNumber);
99104

100105
final JsonFieldSelector fieldSelector = determineSelector(thingId.getNamespace());
101106

102-
if (minAcceptableSeqNr < 0) {
103-
final var cacheKey =
104-
SignalEnrichmentCacheKey.of(thingId, SignalEnrichmentContext.of(dittoHeaders, fieldSelector));
107+
if (atRevisionNumber < 0) {
108+
final var cacheKey = SignalEnrichmentCacheKey.of(
109+
thingId,
110+
SignalEnrichmentContext.of(DittoHeaders.empty(), dittoHeadersNotAddedToCacheKey, fieldSelector));
105111
extraFieldsCache.invalidate(cacheKey);
106-
return doCacheLookup(cacheKey, dittoHeaders);
112+
return doCacheLookup(cacheKey, dittoHeadersNotAddedToCacheKey);
107113
} else {
108114
final var cachingParameters =
109-
new CachingParameters(fieldSelector, events, false, minAcceptableSeqNr);
115+
new CachingParameters(fieldSelector, events, false, atRevisionNumber);
110116

111-
return doRetrievePartialThing(thingId, dittoHeaders, cachingParameters);
117+
return doRetrievePartialThing(thingId, DittoHeaders.empty(), dittoHeadersNotAddedToCacheKey,
118+
cachingParameters);
119+
}
120+
}
121+
122+
private static DittoHeaders buildDittoHeadersNotAddedToCacheKey(final List<ThingEvent<?>> events,
123+
final long atRevisionNumber) {
124+
final DittoHeadersBuilder<?, ?> dittoHeadersBuilder = DittoHeaders.newBuilder();
125+
if (!events.isEmpty()) {
126+
dittoHeadersBuilder.correlationId(
127+
events.get(events.size() - 1)
128+
.getDittoHeaders()
129+
.getCorrelationId()
130+
.orElseGet(() -> UUID.randomUUID().toString())
131+
+ "-enrichment"
132+
);
112133
}
134+
if (atRevisionNumber > 0) {
135+
dittoHeadersBuilder
136+
.putHeader(DittoHeaderDefinition.AT_HISTORICAL_REVISION.getKey(), String.valueOf(atRevisionNumber));
137+
}
138+
return dittoHeadersBuilder.build();
113139
}
114140

115141
@Override
@@ -125,7 +151,7 @@ public CompletionStage<JsonObject> retrievePartialThing(final ThingId thingId,
125151
final var cachingParameters =
126152
new CachingParameters(jsonFieldSelector, thingEvents, true, 0);
127153

128-
return doRetrievePartialThing(thingId, dittoHeaders, cachingParameters)
154+
return doRetrievePartialThing(thingId, dittoHeaders, null, cachingParameters)
129155
.thenApply(jsonObject -> applyJsonFieldSelector(jsonObject, jsonFieldSelector));
130156
}
131157

@@ -140,7 +166,7 @@ public CompletionStage<JsonObject> retrievePartialThing(final ThingId thingId,
140166
* @param minAcceptableSeqNr minimum sequence number of the concerned signals to not invalidate the cache.
141167
* @return future that completes with the parts of a thing or fails with an error.
142168
*/
143-
@SuppressWarnings("java:S1612")
169+
@SuppressWarnings({"java:S1612", "unused"})
144170
public CompletionStage<JsonObject> retrievePartialThing(final EntityId thingId,
145171
final JsonFieldSelector jsonFieldSelector,
146172
final DittoHeaders dittoHeaders,
@@ -156,19 +182,22 @@ public CompletionStage<JsonObject> retrievePartialThing(final EntityId thingId,
156182
final var cachingParameters =
157183
new CachingParameters(jsonFieldSelector, thingEvents, true, minAcceptableSeqNr);
158184

159-
return doRetrievePartialThing(thingId, dittoHeaders, cachingParameters)
185+
return doRetrievePartialThing(thingId, dittoHeaders, null, cachingParameters)
160186
.thenApply(jsonObject -> applyJsonFieldSelector(jsonObject, jsonFieldSelector));
161187
}
162188

163189
protected CompletionStage<JsonObject> doRetrievePartialThing(final EntityId thingId,
164-
final DittoHeaders dittoHeaders,
165-
final CachingParameters cachingParameters) {
190+
final DittoHeaders dittoHeaders,
191+
@Nullable final DittoHeaders dittoHeadersNotAddedToCacheKey,
192+
final CachingParameters cachingParameters) {
166193

167194
final var fieldSelector = cachingParameters.fieldSelector;
168195
final JsonFieldSelector enhancedFieldSelector = enhanceFieldSelectorWithRevision(fieldSelector);
169196

170-
final var idWithResourceType =
171-
SignalEnrichmentCacheKey.of(thingId, SignalEnrichmentContext.of(dittoHeaders, enhancedFieldSelector));
197+
final var idWithResourceType = SignalEnrichmentCacheKey.of(
198+
thingId,
199+
SignalEnrichmentContext.of(dittoHeaders, dittoHeadersNotAddedToCacheKey, enhancedFieldSelector)
200+
);
172201

173202
final var cachingParametersWithEnhancedFieldSelector = new CachingParameters(enhancedFieldSelector,
174203
cachingParameters.concernedEvents,
@@ -282,7 +311,7 @@ private static DittoHeaders getLastDittoHeaders(final List<? extends Signal<?>>
282311
}
283312

284313
protected CompletableFuture<JsonObject> doCacheLookup(final SignalEnrichmentCacheKey cacheKey,
285-
final DittoHeaders dittoHeaders) {
314+
final DittoHeaders dittoHeaders) {
286315
LOGGER.withCorrelationId(dittoHeaders).debug("Looking up cache entry for <{}>", cacheKey);
287316

288317
return extraFieldsCache.get(cacheKey)
@@ -343,16 +372,11 @@ private CompletionStage<JsonObject> handleNextExpectedThingEvents(final SignalEn
343372
JsonObject jsonObject = cachedJsonObject;
344373
for (final ThingEvent<?> thingEvent : concernedSignals) {
345374

346-
switch (thingEvent.getCommandCategory()) {
347-
case MERGE:
348-
jsonObject = getMergeJsonObject(jsonObject, thingEvent);
349-
break;
350-
case DELETE:
351-
jsonObject = getDeleteJsonObject(jsonObject, thingEvent);
352-
break;
353-
default:
354-
jsonObject = getDefaultJsonObject(jsonObject, thingEvent);
355-
}
375+
jsonObject = switch (thingEvent.getCommandCategory()) {
376+
case MERGE -> getMergeJsonObject(jsonObject, thingEvent);
377+
case DELETE -> getDeleteJsonObject(jsonObject, thingEvent);
378+
default -> getDefaultJsonObject(jsonObject, thingEvent);
379+
};
356380
// invalidate cache on policy change if the flag is set
357381
if (cachingParameters.invalidateCacheOnPolicyChange) {
358382
final var optionalCompletionStage =
@@ -392,7 +416,8 @@ private static JsonObject getDeleteJsonObject(final JsonObject jsonObject, final
392416
} else if (resourcePath.isEmpty()) {
393417
result = JsonObject.empty();
394418
} else {
395-
result = jsonObject.remove(resourcePath).remove(Thing.JsonFields.METADATA.getPointer().append(resourcePath));
419+
result =
420+
jsonObject.remove(resourcePath).remove(Thing.JsonFields.METADATA.getPointer().append(resourcePath));
396421
}
397422

398423
return result;
@@ -463,9 +488,9 @@ protected static final class CachingParameters {
463488
private final long minAcceptableSeqNr;
464489

465490
public CachingParameters(@Nullable final JsonFieldSelector fieldSelector,
466-
final List<ThingEvent<?>> concernedEvents,
467-
final boolean invalidateCacheOnPolicyChange,
468-
final long minAcceptableSeqNr) {
491+
final List<ThingEvent<?>> concernedEvents,
492+
final boolean invalidateCacheOnPolicyChange,
493+
final long minAcceptableSeqNr) {
469494

470495
this.fieldSelector = fieldSelector;
471496
this.concernedEvents = concernedEvents;

internal/models/signalenrichment/src/main/java/org/eclipse/ditto/internal/models/signalenrichment/SignalEnrichmentCacheKey.java

+2-3
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
* Implementation for a {@link CacheKey} used in scope of signal enrichment.
2828
*/
2929
@Immutable
30-
final class SignalEnrichmentCacheKey implements CacheKey<SignalEnrichmentContext> {
30+
public final class SignalEnrichmentCacheKey implements CacheKey<SignalEnrichmentContext> {
3131

3232
static final String DELIMITER = ":";
3333

@@ -63,8 +63,7 @@ public Optional<SignalEnrichmentContext> getCacheLookupContext() {
6363

6464
@Override
6565
public boolean equals(@Nullable final Object o) {
66-
if (o instanceof SignalEnrichmentCacheKey) {
67-
final var that = (SignalEnrichmentCacheKey) o;
66+
if (o instanceof SignalEnrichmentCacheKey that) {
6867
return isIdEqualValueBased(that) && Objects.equals(context, that.context);
6968
} else {
7069
return false;

internal/models/signalenrichment/src/main/java/org/eclipse/ditto/internal/models/signalenrichment/SignalEnrichmentCacheLoader.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,10 @@ public CompletableFuture<JsonObject> asyncLoad(final SignalEnrichmentCacheKey ke
5656
final ThingId thingId = ThingId.of(key.getId());
5757
final JsonFieldSelector jsonFieldSelector = selectorOptional.orElse(null);
5858
final DittoHeaders dittoHeaders = context.getDittoHeaders();
59-
return facade.retrievePartialThing(thingId, jsonFieldSelector, dittoHeaders, null)
59+
final DittoHeaders retrieveHeaders = context.getDittoHeadersNotAddedToCacheKey()
60+
.map(extraHeaders -> (DittoHeaders) dittoHeaders.toBuilder().putHeaders(extraHeaders).build())
61+
.orElse(dittoHeaders);
62+
return facade.retrievePartialThing(thingId, jsonFieldSelector, retrieveHeaders, null)
6063
.toCompletableFuture();
6164
} else {
6265
// no context; nothing to load.

internal/models/signalenrichment/src/main/java/org/eclipse/ditto/internal/models/signalenrichment/SignalEnrichmentContext.java

+20-4
Original file line numberDiff line numberDiff line change
@@ -29,29 +29,36 @@
2929
* signal enrichment caching.
3030
*/
3131
@Immutable
32-
final class SignalEnrichmentContext implements CacheLookupContext {
32+
public final class SignalEnrichmentContext implements CacheLookupContext {
3333

3434
private final DittoHeaders dittoHeaders;
35+
@Nullable private final DittoHeaders dittoHeadersNotAddedToCacheKey;
3536
@Nullable private final JsonFieldSelector jsonFieldSelector;
3637

3738
private SignalEnrichmentContext(final DittoHeaders dittoHeaders,
39+
@Nullable final DittoHeaders dittoHeadersNotAddedToCacheKey,
3840
@Nullable final JsonFieldSelector jsonFieldSelector) {
3941
this.dittoHeaders = checkNotNull(dittoHeaders, "dittoHeaders");
42+
this.dittoHeadersNotAddedToCacheKey = dittoHeadersNotAddedToCacheKey;
4043
this.jsonFieldSelector = jsonFieldSelector;
4144
}
4245

4346
/**
44-
* Creates a new SignalEnrichmentContext from the passed optional {@code dittoHeaders} and {@code jsonFieldSelector}
45-
* retaining the for caching relevant {@code dittoHeaders} from the passed ones.
47+
* Creates a new SignalEnrichmentContext from the passed optional {@code dittoHeaders},
48+
* {@code dittoHeadersNotAddedToCacheKey} and {@code jsonFieldSelector}
49+
* retaining the for caching relevant {@code dittoHeaders} from the passed ones, but not adding the
50+
* {@code dittoHeadersNotAddedToCacheKey} to hashCode/equals, ignoring those for caching.
4651
*
4752
* @param dittoHeaders the DittoHeaders to use as key in the cache lookup context.
53+
* @param dittoHeadersNotAddedToCacheKey the DittoHeaders to additionally use, but not include in the cache key.
4854
* @param jsonFieldSelector the JsonFieldSelector to use in the cache lookup context.
4955
* @return the created context.
5056
*/
5157
static SignalEnrichmentContext of(final DittoHeaders dittoHeaders,
58+
@Nullable final DittoHeaders dittoHeadersNotAddedToCacheKey,
5259
@Nullable final JsonFieldSelector jsonFieldSelector) {
5360

54-
return new SignalEnrichmentContext(dittoHeaders, jsonFieldSelector);
61+
return new SignalEnrichmentContext(dittoHeaders, dittoHeadersNotAddedToCacheKey, jsonFieldSelector);
5562
}
5663

5764
/**
@@ -63,6 +70,14 @@ public DittoHeaders getDittoHeaders() {
6370
return dittoHeaders;
6471
}
6572

73+
/**
74+
* @return the additional DittoHeaders to use, e.g. for looking up the cache entry, but which are not part of the
75+
* cache key.
76+
*/
77+
public Optional<DittoHeaders> getDittoHeadersNotAddedToCacheKey() {
78+
return Optional.ofNullable(dittoHeadersNotAddedToCacheKey);
79+
}
80+
6681
/**
6782
* Returns the optional JsonFieldSelector this context provides.
6883
*
@@ -94,6 +109,7 @@ public int hashCode() {
94109
public String toString() {
95110
return getClass().getSimpleName() + " [" +
96111
"dittoHeaders=" + dittoHeaders +
112+
", dittoHeadersNotAddedToCacheKey=" + dittoHeadersNotAddedToCacheKey +
97113
", jsonFieldSelector=" + jsonFieldSelector +
98114
"]";
99115
}

0 commit comments

Comments
 (0)