|
25 | 25 | import java.util.Set;
|
26 | 26 | import java.util.stream.Collectors;
|
27 | 27 |
|
| 28 | +import org.apache.pekko.NotUsed; |
| 29 | +import org.apache.pekko.japi.pf.PFBuilder; |
| 30 | +import org.apache.pekko.stream.javadsl.Source; |
28 | 31 | import org.bson.BsonDateTime;
|
29 | 32 | import org.bson.BsonDocument;
|
30 | 33 | import org.bson.BsonInt32;
|
|
48 | 51 | import com.mongodb.reactivestreams.client.MongoCollection;
|
49 | 52 | import com.mongodb.reactivestreams.client.MongoDatabase;
|
50 | 53 |
|
51 |
| -import org.apache.pekko.NotUsed; |
52 |
| -import org.apache.pekko.japi.pf.PFBuilder; |
53 |
| -import org.apache.pekko.stream.javadsl.Source; |
54 |
| - |
55 | 54 | /**
|
56 | 55 | * MongoDB specific implementation of the {@link org.eclipse.ditto.thingsearch.service.persistence.write.ThingsSearchUpdaterPersistence}.
|
57 | 56 | */
|
58 | 57 | public final class MongoThingsSearchUpdaterPersistence implements ThingsSearchUpdaterPersistence {
|
59 | 58 |
|
60 | 59 | private final MongoCollection<Document> collection;
|
61 | 60 |
|
62 |
| - private final ThrottlingConfig PolicyModificationCausedSearchIndexUpdateThrottling; |
| 61 | + private final ThrottlingConfig policyModificationCausedSearchIndexUpdateThrottling; |
63 | 62 |
|
64 | 63 | private MongoThingsSearchUpdaterPersistence(final MongoDatabase database,
|
65 |
| - final SearchPersistenceConfig updaterPersistenceConfig) { |
66 |
| - this.PolicyModificationCausedSearchIndexUpdateThrottling = updaterPersistenceConfig.getPolicyModificationCausedSearchIndexUpdateThrottling(); |
| 64 | + final SearchPersistenceConfig updaterPersistenceConfig) |
| 65 | + { |
| 66 | + this.policyModificationCausedSearchIndexUpdateThrottling = updaterPersistenceConfig |
| 67 | + .getPolicyModificationCausedSearchIndexUpdateThrottling(); |
67 | 68 |
|
68 | 69 | collection = database.getCollection(PersistenceConstants.THINGS_COLLECTION_NAME)
|
69 | 70 | .withReadConcern(updaterPersistenceConfig.readConcern().getMongoReadConcern())
|
@@ -111,10 +112,10 @@ public Source<PolicyReferenceTag, NotUsed> getPolicyReferenceTags(final Map<Poli
|
111 | 112 | .append(PersistenceConstants.FIELD_REFERENCED_POLICIES, new BsonInt32(1)));
|
112 | 113 |
|
113 | 114 |
|
114 |
| - final Source<Document, NotUsed> throttledSource = PolicyModificationCausedSearchIndexUpdateThrottling.isEnabled() |
| 115 | + final Source<Document, NotUsed> throttledSource = policyModificationCausedSearchIndexUpdateThrottling.isEnabled() |
115 | 116 | ? Source.fromPublisher(publisher).throttle(
|
116 |
| - PolicyModificationCausedSearchIndexUpdateThrottling.getLimit(), |
117 |
| - PolicyModificationCausedSearchIndexUpdateThrottling.getInterval() |
| 117 | + policyModificationCausedSearchIndexUpdateThrottling.getLimit(), |
| 118 | + policyModificationCausedSearchIndexUpdateThrottling.getInterval() |
118 | 119 | )
|
119 | 120 | : Source.fromPublisher(publisher);
|
120 | 121 |
|
|
0 commit comments