Skip to content

Commit ffe8a82

Browse files
authored
Merge pull request eclipse-ditto#1872 from eclipse-ditto/bugfix/1869-resolving-imported-policies-in-ditto-search
eclipse-ditto#1869 use cache in order to load imported policies when policies are …
2 parents a8b6267 + 222de26 commit ffe8a82

File tree

25 files changed

+537
-254
lines changed

25 files changed

+537
-254
lines changed

internal/utils/cache/src/main/java/org/eclipse/ditto/internal/utils/cache/Cache.java

+11
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import java.util.concurrent.CompletableFuture;
1818
import java.util.concurrent.ConcurrentMap;
1919
import java.util.function.Function;
20+
import java.util.function.Predicate;
2021

2122
/**
2223
* A general purpose cache for items which are associated with a key.
@@ -75,6 +76,16 @@ public interface Cache<K, V> {
7576
*/
7677
boolean invalidate(K key);
7778

79+
/**
80+
* Invalidates the passed key from the cache if present and the passed {@code valueCondition} evaluates to
81+
* {@code true}.
82+
*
83+
* @param key the key to invalidate.
84+
* @param valueCondition the condition which has to pass in order to invalidate the key from the cache.
85+
* @return {@code true} if the entry was cached and is now invalidated, {@code false} otherwise.
86+
*/
87+
boolean invalidateConditionally(K key, Predicate<V> valueCondition);
88+
7889
/**
7990
* Associates the {@code value} with the {@code key} in this cache.
8091
* <p>

internal/utils/cache/src/main/java/org/eclipse/ditto/internal/utils/cache/CaffeineCache.java

+20
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.concurrent.Executor;
2323
import java.util.function.BiFunction;
2424
import java.util.function.Function;
25+
import java.util.function.Predicate;
2526

2627
import javax.annotation.Nullable;
2728

@@ -230,6 +231,25 @@ public boolean invalidate(final K key) {
230231
return currentlyExisting;
231232
}
232233

234+
@Override
235+
public boolean invalidateConditionally(final K key, final Predicate<V> valueCondition) {
236+
requireNonNull(key);
237+
238+
V value = synchronousCacheView.getIfPresent(key);
239+
if (value != null) {
240+
synchronized (value) {
241+
value = synchronousCacheView.getIfPresent(key);
242+
if (value != null && valueCondition.test(value)) {
243+
return invalidate(key);
244+
} else {
245+
return false;
246+
}
247+
}
248+
} else {
249+
return false;
250+
}
251+
}
252+
233253
// optimized batch invalidation method for caffeine
234254
@Override
235255
public void invalidateAll(final Collection<K> keys) {

internal/utils/cache/src/main/java/org/eclipse/ditto/internal/utils/cache/ProjectedCache.java

+6
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import java.util.concurrent.ConcurrentHashMap;
1818
import java.util.concurrent.ConcurrentMap;
1919
import java.util.function.Function;
20+
import java.util.function.Predicate;
2021

2122
/**
2223
* A cache working on an embedded passed {@code cache} with values of type {@code <V>}, representing a projected cache
@@ -68,6 +69,11 @@ public boolean invalidate(final K key) {
6869
return cache.invalidate(key);
6970
}
7071

72+
@Override
73+
public boolean invalidateConditionally(final K key, final Predicate<U> valueCondition) {
74+
return cache.invalidateConditionally(key, value -> valueCondition.test(project.apply(value)));
75+
}
76+
7177
@Override
7278
public void put(final K key, final U value) {
7379
cache.put(key, embed.apply(value));

policies/enforcement/src/main/java/org/eclipse/ditto/policies/enforcement/PolicyEnforcerCache.java

+18
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.util.concurrent.ConcurrentHashMap;
2222
import java.util.concurrent.ConcurrentMap;
2323
import java.util.function.Function;
24+
import java.util.function.Predicate;
2425

2526
import org.eclipse.ditto.internal.utils.cache.Cache;
2627
import org.eclipse.ditto.internal.utils.cache.CacheFactory;
@@ -102,6 +103,23 @@ public boolean invalidate(final PolicyId policyId) {
102103
return directlyCached || indirectlyCachedViaImport;
103104
}
104105

106+
@Override
107+
public boolean invalidateConditionally(final PolicyId policyId,
108+
final Predicate<Entry<PolicyEnforcer>> valueCondition) {
109+
// Invalidate the changed policy
110+
final boolean directlyCached = delegate.invalidateConditionally(policyId, valueCondition);
111+
112+
// Invalidate all policies that import the changed policy
113+
final boolean indirectlyCachedViaImport = Optional.ofNullable(policyIdToImportingMap.remove(policyId))
114+
.stream()
115+
.flatMap(Collection::stream)
116+
.map(p -> delegate.invalidateConditionally(p, valueCondition))
117+
.reduce((previous, next) -> previous || next)
118+
.orElse(false);
119+
120+
return directlyCached || indirectlyCachedViaImport;
121+
}
122+
105123
@Override
106124
public void put(final PolicyId key, final Entry<PolicyEnforcer> value) {
107125
delegate.put(key, value);

policies/model/src/main/java/org/eclipse/ditto/policies/model/PolicyImport.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414

1515
import java.util.Optional;
1616

17+
import javax.annotation.Nullable;
18+
1719
import org.eclipse.ditto.base.model.json.FieldType;
1820
import org.eclipse.ditto.base.model.json.JsonSchemaVersion;
1921
import org.eclipse.ditto.base.model.json.Jsonifiable;
@@ -35,9 +37,9 @@ public interface PolicyImport extends Jsonifiable.WithFieldSelectorAndPredicate<
3537
* @param importedPolicyId the {@link PolicyId} of the imported policy.
3638
* @param effectedImports the EffectedImports of the new PolicyImport to create.
3739
* @return the new {@code PolicyImport}.
38-
* @throws NullPointerException if any argument is {@code null}.
40+
* @throws NullPointerException if {@code importedPolicyId} is {@code null}.
3941
*/
40-
static PolicyImport newInstance(final PolicyId importedPolicyId, final EffectedImports effectedImports) {
42+
static PolicyImport newInstance(final PolicyId importedPolicyId, @Nullable final EffectedImports effectedImports) {
4143
return PoliciesModelFactory.newPolicyImport(importedPolicyId, effectedImports);
4244
}
4345

thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/persistence/read/MongoThingsSearchPersistence.java

+8-8
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,13 @@
2727

2828
import javax.annotation.Nullable;
2929

30+
import org.apache.pekko.NotUsed;
31+
import org.apache.pekko.actor.ActorSystem;
32+
import org.apache.pekko.event.Logging;
33+
import org.apache.pekko.event.LoggingAdapter;
34+
import org.apache.pekko.japi.pf.PFBuilder;
35+
import org.apache.pekko.stream.SystemMaterializer;
36+
import org.apache.pekko.stream.javadsl.Source;
3037
import org.bson.BsonDocument;
3138
import org.bson.Document;
3239
import org.bson.conversions.Bson;
@@ -71,13 +78,6 @@
7178
import com.mongodb.reactivestreams.client.MongoCollection;
7279
import com.mongodb.reactivestreams.client.MongoDatabase;
7380

74-
import org.apache.pekko.NotUsed;
75-
import org.apache.pekko.actor.ActorSystem;
76-
import org.apache.pekko.event.Logging;
77-
import org.apache.pekko.event.LoggingAdapter;
78-
import org.apache.pekko.japi.pf.PFBuilder;
79-
import org.apache.pekko.stream.SystemMaterializer;
80-
import org.apache.pekko.stream.javadsl.Source;
8181
import scala.PartialFunction;
8282

8383
/**
@@ -402,7 +402,7 @@ private static Metadata readAsMetadata(final Document document) {
402402
.map(dittoBsonJson::serialize)
403403
.map(PolicyTag::fromJson)
404404
.collect(Collectors.toSet());
405-
return Metadata.of(thingId, thingRevision, thingPolicyTag, referencedPolicies, modified, null);
405+
return Metadata.of(thingId, thingRevision, thingPolicyTag, null, referencedPolicies, modified, null);
406406
}
407407

408408
private static AbstractWriteModel documentToWriteModel(final Document document) {

thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/persistence/write/mapping/EnforcedThingMapper.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
import static org.eclipse.ditto.thingsearch.service.persistence.PersistenceConstants.FIELD_REVISION;
2525
import static org.eclipse.ditto.thingsearch.service.persistence.PersistenceConstants.FIELD_THING;
2626

27-
import java.util.HashSet;
27+
import java.util.LinkedHashSet;
2828
import java.util.List;
2929
import java.util.Optional;
3030
import java.util.Set;
@@ -82,7 +82,7 @@ public static ThingWriteModel toWriteModel(final JsonObject thing,
8282
final long thingRevision = thing.getValueOrThrow(Thing.JsonFields.REVISION);
8383
final var optionalPolicyId = thing.getValue(Thing.JsonFields.POLICY_ID).map(PolicyId::of);
8484

85-
final HashSet<PolicyTag> allReferencedPolicies = new HashSet<>(referencedPolicies);
85+
final Set<PolicyTag> allReferencedPolicies = new LinkedHashSet<>(referencedPolicies);
8686
final List<PolicyTag> policyTagsOfDeletedButStillImportedPolicies =
8787
Optional.ofNullable(oldMetadata).map(Metadata::getAllReferencedPolicyTags).orElseGet(Set::of).stream()
8888
.filter(oldReferencedPolicyTag -> policy.getPolicyImports()
@@ -99,7 +99,7 @@ public static ThingWriteModel toWriteModel(final JsonObject thing,
9999
.orElse(null);
100100

101101
final var metadata =
102-
Metadata.of(thingId, thingRevision, thingPolicyTag, allReferencedPolicies,
102+
Metadata.of(thingId, thingRevision, thingPolicyTag, null, allReferencedPolicies,
103103
Optional.ofNullable(oldMetadata).flatMap(Metadata::getModified).orElse(null),
104104
Optional.ofNullable(oldMetadata).map(Metadata::getEvents).orElse(List.of()),
105105
Optional.ofNullable(oldMetadata).map(Metadata::getTimers).orElse(List.of()),

0 commit comments

Comments
 (0)