|
31 | 31 | import org.bson.BsonString;
|
32 | 32 | import org.bson.Document;
|
33 | 33 | import org.bson.conversions.Bson;
|
| 34 | +import org.eclipse.ditto.base.service.config.ThrottlingConfig; |
34 | 35 | import org.eclipse.ditto.policies.api.PolicyTag;
|
35 | 36 | import org.eclipse.ditto.policies.model.PolicyId;
|
36 | 37 | import org.eclipse.ditto.things.model.ThingId;
|
@@ -58,8 +59,11 @@ public final class MongoThingsSearchUpdaterPersistence implements ThingsSearchUp
|
58 | 59 |
|
59 | 60 | private final MongoCollection<Document> collection;
|
60 | 61 |
|
| 62 | + private final ThrottlingConfig PolicyModificationCausedSearchIndexUpdateThrottling; |
| 63 | + |
61 | 64 | private MongoThingsSearchUpdaterPersistence(final MongoDatabase database,
|
62 |
| - final SearchPersistenceConfig updaterPersistenceConfig) { |
| 65 | + final SearchPersistenceConfig updaterPersistenceConfig) { |
| 66 | + this.PolicyModificationCausedSearchIndexUpdateThrottling = updaterPersistenceConfig.getPolicyModificationCausedSearchIndexUpdateThrottling(); |
63 | 67 |
|
64 | 68 | collection = database.getCollection(PersistenceConstants.THINGS_COLLECTION_NAME)
|
65 | 69 | .withReadConcern(updaterPersistenceConfig.readConcern().getMongoReadConcern())
|
@@ -106,18 +110,25 @@ public Source<PolicyReferenceTag, NotUsed> getPolicyReferenceTags(final Map<Poli
|
106 | 110 | .append(PersistenceConstants.FIELD_POLICY_ID, new BsonInt32(1))
|
107 | 111 | .append(PersistenceConstants.FIELD_REFERENCED_POLICIES, new BsonInt32(1)));
|
108 | 112 |
|
109 |
| - return Source.fromPublisher(publisher) |
110 |
| - .mapConcat(doc -> { |
111 |
| - final ThingId thingId = ThingId.of(doc.getString(PersistenceConstants.FIELD_ID)); |
112 |
| - final Collection<PolicyId> referencedPolicyIds = referencedPolicyIds(doc); |
113 |
| - return referencedPolicyIds.stream() |
114 |
| - .map(referencedPolicyId -> Optional.ofNullable(policyRevisions.get(referencedPolicyId)) |
115 |
| - .map(revision -> PolicyTag.of(referencedPolicyId, revision)) |
116 |
| - .map(policyTag -> PolicyReferenceTag.of(thingId, policyTag)) |
117 |
| - .orElse(null)) |
118 |
| - .filter(Objects::nonNull) |
119 |
| - .toList(); |
120 |
| - }); |
| 113 | + |
| 114 | + final Source<Document, NotUsed> throttledSource = PolicyModificationCausedSearchIndexUpdateThrottling.isEnabled() |
| 115 | + ? Source.fromPublisher(publisher).throttle( |
| 116 | + PolicyModificationCausedSearchIndexUpdateThrottling.getLimit(), |
| 117 | + PolicyModificationCausedSearchIndexUpdateThrottling.getInterval() |
| 118 | + ) |
| 119 | + : Source.fromPublisher(publisher); |
| 120 | + |
| 121 | + return throttledSource.mapConcat(doc -> { |
| 122 | + final ThingId thingId = ThingId.of(doc.getString(PersistenceConstants.FIELD_ID)); |
| 123 | + final Collection<PolicyId> referencedPolicyIds = referencedPolicyIds(doc); |
| 124 | + return referencedPolicyIds.stream() |
| 125 | + .map(referencedPolicyId -> Optional.ofNullable(policyRevisions.get(referencedPolicyId)) |
| 126 | + .map(revision -> PolicyTag.of(referencedPolicyId, revision)) |
| 127 | + .map(policyTag -> PolicyReferenceTag.of(thingId, policyTag)) |
| 128 | + .orElse(null)) |
| 129 | + .filter(Objects::nonNull) |
| 130 | + .toList(); |
| 131 | + }); |
121 | 132 | }
|
122 | 133 |
|
123 | 134 | private Collection<PolicyId> referencedPolicyIds(final Document doc) {
|
|
0 commit comments