Skip to content

Commit ed88251

Browse files
committedJun 12, 2024·
enable configuring even more flexibly indexes to hint for in Cleanup aggregations
* also create indexes speeding up the aggregation on snapshot collection Signed-off-by: Thomas Jäckle <thomas.jaeckle@beyonnex.io>
1 parent 80b89f3 commit ed88251

File tree

11 files changed

+306
-96
lines changed

11 files changed

+306
-96
lines changed
 

‎connectivity/service/src/main/resources/connectivity.conf

+14-2
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,26 @@ ditto {
66
database = ${?MONGO_DB_DATABASE}
77

88
read-journal {
9+
should-create-additional-snapshot-aggregation-index-pid-id = false
10+
should-create-additional-snapshot-aggregation-index-pid-id = ${?MONGODB_READ_JOURNAL_SHOULD_CREATE_ADDITIONAL_SNAPSHOT_AGGREGATION_INDEX_PID_ID}
11+
12+
should-create-additional-snapshot-aggregation-index-pid = false
13+
should-create-additional-snapshot-aggregation-index-pid = ${?MONGODB_READ_JOURNAL_SHOULD_CREATE_ADDITIONAL_SNAPSHOT_AGGREGATION_INDEX_PID}
14+
915
hint-name-filterPidsThatDoesntContainTagInNewestEntry = null
1016
hint-name-filterPidsThatDoesntContainTagInNewestEntry = ${?MONGODB_READ_JOURNAL_HINT_NAME_FILTER_PIDS_THAT_DOESNT_CONTAIN_TAG_IN_NEWEST_ENTRY}
1117

1218
hint-name-listLatestJournalEntries = null
1319
hint-name-listLatestJournalEntries = ${?MONGODB_READ_JOURNAL_HINT_NAME_LIST_LATEST_JOURNAL_ENTRIES}
1420

15-
hint-name-listNewestActiveSnapshotsByBatch = "_id_"
16-
hint-name-listNewestActiveSnapshotsByBatch = ${?MONGODB_READ_JOURNAL_HINT_NAME_LIST_NEWEST_ACTIVE_SNAPSHOT_BY_BATCH}
21+
hint-name-listNewestActiveSnapshotsByBatchPidId = null
22+
hint-name-listNewestActiveSnapshotsByBatchPidId = ${?MONGODB_READ_JOURNAL_HINT_NAME_LIST_NEWEST_ACTIVE_SNAPSHOT_BY_BATCH_PID_ID}
23+
24+
hint-name-listNewestActiveSnapshotsByBatchPid = null
25+
hint-name-listNewestActiveSnapshotsByBatchPid = ${?MONGODB_READ_JOURNAL_HINT_NAME_LIST_NEWEST_ACTIVE_SNAPSHOT_BY_BATCH_PID}
26+
27+
hint-name-listNewestActiveSnapshotsByBatchId = null
28+
hint-name-listNewestActiveSnapshotsByBatchId = ${?MONGODB_READ_JOURNAL_HINT_NAME_LIST_NEWEST_ACTIVE_SNAPSHOT_BY_BATCH_ID}
1729
}
1830
}
1931

‎internal/utils/persistence/src/main/java/org/eclipse/ditto/internal/utils/persistence/mongo/config/DefaultMongoReadJournalConfig.java

+58-13
Original file line numberDiff line numberDiff line change
@@ -32,17 +32,31 @@ public final class DefaultMongoReadJournalConfig implements MongoReadJournalConf
3232

3333
private static final String CONFIG_PATH = "read-journal";
3434

35+
private final boolean createAdditionalSnapshotAggregationIndexPidId;
36+
private final boolean createAdditionalSnapshotAggregationIndexPid;
3537
@Nullable private final String hintNameFilterPidsThatDoesntContainTagInNewestEntry;
3638
@Nullable private final String hintNameListLatestJournalEntries;
37-
@Nullable private final String listNewestActiveSnapshotsByBatch;
39+
@Nullable private final String listNewestActiveSnapshotsByBatchPidId;
40+
@Nullable private final String listNewestActiveSnapshotsByBatchPid;
41+
@Nullable private final String listNewestActiveSnapshotsByBatchId;
3842

3943
private DefaultMongoReadJournalConfig(final ScopedConfig config) {
44+
createAdditionalSnapshotAggregationIndexPidId = config.getBoolean(
45+
MongoReadJournalConfigValue.SHOULD_CREATE_ADDITIONAL_SNAPSHOT_AGGREGATION_INDEX_PID_ID.getConfigPath()
46+
);
47+
createAdditionalSnapshotAggregationIndexPid = config.getBoolean(
48+
MongoReadJournalConfigValue.SHOULD_CREATE_ADDITIONAL_SNAPSHOT_AGGREGATION_INDEX_PID.getConfigPath()
49+
);
4050
hintNameFilterPidsThatDoesntContainTagInNewestEntry = getNullableString(config,
4151
MongoReadJournalConfigValue.HINT_NAME_FILTER_PIDS_THAT_DOESNT_CONTAIN_TAG_IN_NEWEST_ENTRY);
4252
hintNameListLatestJournalEntries = getNullableString(config,
4353
MongoReadJournalConfigValue.HINT_NAME_LIST_LATEST_JOURNAL_ENTRIES);
44-
listNewestActiveSnapshotsByBatch = getNullableString(config,
45-
MongoReadJournalConfigValue.HINT_NAME_LIST_NEWEST_ACTIVE_SNAPSHOT_BY_BATCH);
54+
listNewestActiveSnapshotsByBatchPidId = getNullableString(config,
55+
MongoReadJournalConfigValue.HINT_NAME_LIST_NEWEST_ACTIVE_SNAPSHOT_BY_BATCH_PID_ID);
56+
listNewestActiveSnapshotsByBatchPid = getNullableString(config,
57+
MongoReadJournalConfigValue.HINT_NAME_LIST_NEWEST_ACTIVE_SNAPSHOT_BY_BATCH_PID);
58+
listNewestActiveSnapshotsByBatchId = getNullableString(config,
59+
MongoReadJournalConfigValue.HINT_NAME_LIST_NEWEST_ACTIVE_SNAPSHOT_BY_BATCH_ID);
4660
}
4761

4862
/**
@@ -59,7 +73,20 @@ public static DefaultMongoReadJournalConfig of(final Config config) {
5973

6074
@Nullable
6175
private static String getNullableString(final Config config, final KnownConfigValue configValue) {
62-
return config.getIsNull(configValue.getConfigPath()) ? null : config.getString(configValue.getConfigPath());
76+
return config.getIsNull(configValue.getConfigPath()) ? null :
77+
Optional.of(config.getString(configValue.getConfigPath()))
78+
.filter(s -> !s.equals("null"))
79+
.orElse(null);
80+
}
81+
82+
@Override
83+
public boolean shouldCreateAdditionalSnapshotAggregationIndexPidId() {
84+
return createAdditionalSnapshotAggregationIndexPidId;
85+
}
86+
87+
@Override
88+
public boolean shouldCreateAdditionalSnapshotAggregationIndexPid() {
89+
return createAdditionalSnapshotAggregationIndexPid;
6390
}
6491

6592
@Override
@@ -73,8 +100,18 @@ public Optional<String> getIndexNameHintForListLatestJournalEntries() {
73100
}
74101

75102
@Override
76-
public Optional<String> getIndexNameHintForListNewestActiveSnapshotsByBatch() {
77-
return Optional.ofNullable(listNewestActiveSnapshotsByBatch);
103+
public Optional<String> getIndexNameHintForListNewestActiveSnapshotsByBatchPidId() {
104+
return Optional.ofNullable(listNewestActiveSnapshotsByBatchPidId);
105+
}
106+
107+
@Override
108+
public Optional<String> getIndexNameHintForListNewestActiveSnapshotsByBatchPid() {
109+
return Optional.ofNullable(listNewestActiveSnapshotsByBatchPid);
110+
}
111+
112+
@Override
113+
public Optional<String> getIndexNameHintForListNewestActiveSnapshotsByBatchId() {
114+
return Optional.ofNullable(listNewestActiveSnapshotsByBatchId);
78115
}
79116

80117
@Override
@@ -86,25 +123,33 @@ public boolean equals(final Object o) {
86123
return false;
87124
}
88125
final DefaultMongoReadJournalConfig that = (DefaultMongoReadJournalConfig) o;
89-
return Objects.equals(hintNameFilterPidsThatDoesntContainTagInNewestEntry,
90-
that.hintNameFilterPidsThatDoesntContainTagInNewestEntry) &&
126+
return createAdditionalSnapshotAggregationIndexPidId == that.createAdditionalSnapshotAggregationIndexPidId &&
127+
createAdditionalSnapshotAggregationIndexPid == that.createAdditionalSnapshotAggregationIndexPid &&
128+
Objects.equals(hintNameFilterPidsThatDoesntContainTagInNewestEntry,
129+
that.hintNameFilterPidsThatDoesntContainTagInNewestEntry) &&
91130
Objects.equals(hintNameListLatestJournalEntries, that.hintNameListLatestJournalEntries) &&
92-
Objects.equals(listNewestActiveSnapshotsByBatch, that.listNewestActiveSnapshotsByBatch);
131+
Objects.equals(listNewestActiveSnapshotsByBatchPidId, that.listNewestActiveSnapshotsByBatchPidId);
93132
}
94133

95134
@Override
96135
public int hashCode() {
97-
return Objects.hash(hintNameFilterPidsThatDoesntContainTagInNewestEntry, hintNameListLatestJournalEntries,
98-
listNewestActiveSnapshotsByBatch);
136+
return Objects.hash(createAdditionalSnapshotAggregationIndexPidId, createAdditionalSnapshotAggregationIndexPid,
137+
hintNameFilterPidsThatDoesntContainTagInNewestEntry, hintNameListLatestJournalEntries,
138+
listNewestActiveSnapshotsByBatchPidId, listNewestActiveSnapshotsByBatchPid,
139+
listNewestActiveSnapshotsByBatchId);
99140
}
100141

101142
@Override
102143
public String toString() {
103144
return getClass().getSimpleName() + " [" +
104-
"hintNameFilterPidsThatDoesntContainTagInNewestEntry=" +
145+
"createAdditionalSnapshotAggregationIndexPidId=" + createAdditionalSnapshotAggregationIndexPidId +
146+
", createAdditionalSnapshotAggregationIndexPid=" + createAdditionalSnapshotAggregationIndexPid +
147+
", hintNameFilterPidsThatDoesntContainTagInNewestEntry=" +
105148
hintNameFilterPidsThatDoesntContainTagInNewestEntry +
106149
", hintNameListLatestJournalEntries=" + hintNameListLatestJournalEntries +
107-
", listNewestActiveSnapshotsByBatch=" + listNewestActiveSnapshotsByBatch +
150+
", listNewestActiveSnapshotsByBatchPidId=" + listNewestActiveSnapshotsByBatchPidId +
151+
", listNewestActiveSnapshotsByBatchPid=" + listNewestActiveSnapshotsByBatchPid +
152+
", listNewestActiveSnapshotsByBatchId=" + listNewestActiveSnapshotsByBatchId +
108153
"]";
109154
}
110155

‎internal/utils/persistence/src/main/java/org/eclipse/ditto/internal/utils/persistence/mongo/config/MongoReadJournalConfig.java

+50-4
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,18 @@
2525
@Immutable
2626
public interface MongoReadJournalConfig {
2727

28+
/**
29+
* @return whether additional index for "pid" + "_id" should be created in order to speed up MongoReadJournal
30+
* aggregation queries on the snapshot collection.
31+
*/
32+
boolean shouldCreateAdditionalSnapshotAggregationIndexPidId();
33+
34+
/**
35+
* @return whether additional index for "pid" should be created in order to speed up MongoReadJournal
36+
* aggregation queries on the snapshot collection.
37+
*/
38+
boolean shouldCreateAdditionalSnapshotAggregationIndexPid();
39+
2840
/**
2941
* @return the optional hint name for aggregation done in {@code filterPidsThatDoesntContainTagInNewestEntry}.
3042
*/
@@ -36,17 +48,41 @@ public interface MongoReadJournalConfig {
3648
Optional<String> getIndexNameHintForListLatestJournalEntries();
3749

3850
/**
39-
* @return the optional hint name for aggregation done in {@code listNewestActiveSnapshotsByBatch}.
51+
* @return the optional hint name for aggregation done in {@code listNewestActiveSnapshotsByBatch} containing both
52+
* "pid" and "_id" fields in first "$match".
4053
*/
41-
Optional<String> getIndexNameHintForListNewestActiveSnapshotsByBatch();
54+
Optional<String> getIndexNameHintForListNewestActiveSnapshotsByBatchPidId();
4255

56+
/**
57+
* @return the optional hint name for aggregation done in {@code listNewestActiveSnapshotsByBatch} only containing
58+
* "pid" field in first "$match".
59+
*/
60+
Optional<String> getIndexNameHintForListNewestActiveSnapshotsByBatchPid();
61+
62+
/**
63+
* @return the optional hint name for aggregation done in {@code listNewestActiveSnapshotsByBatch} only containing
64+
* "_id" field in first "$match".
65+
*/
66+
Optional<String> getIndexNameHintForListNewestActiveSnapshotsByBatchId();
4367

4468
/**
4569
* An enumeration of the known config path expressions and their associated default values for
4670
* {@code MongoReadJournalConfig}.
4771
*/
4872
enum MongoReadJournalConfigValue implements KnownConfigValue {
4973

74+
/**
75+
* Whether additional index for "pid" + "_id" should be created in order to speed up MongoReadJournal
76+
* aggregation queries on the snapshot collection.
77+
*/
78+
SHOULD_CREATE_ADDITIONAL_SNAPSHOT_AGGREGATION_INDEX_PID_ID("should-create-additional-snapshot-aggregation-index-pid-id", false),
79+
80+
/**
81+
* Whether additional index for "pid" should be created in order to speed up MongoReadJournal aggregation
82+
* queries on the snapshot collection.
83+
*/
84+
SHOULD_CREATE_ADDITIONAL_SNAPSHOT_AGGREGATION_INDEX_PID("should-create-additional-snapshot-aggregation-index-pid", false),
85+
5086
/**
5187
* Hint name for aggregation done in {@code filterPidsThatDoesntContainTagInNewestEntry}.
5288
*/
@@ -58,9 +94,19 @@ enum MongoReadJournalConfigValue implements KnownConfigValue {
5894
HINT_NAME_LIST_LATEST_JOURNAL_ENTRIES("hint-name-listLatestJournalEntries", null),
5995

6096
/**
61-
* Hint name for aggregation done in {@code listNewestActiveSnapshotsByBatch}.
97+
* Hint name for aggregation done in {@code listNewestActiveSnapshotsByBatchPidId}.
98+
*/
99+
HINT_NAME_LIST_NEWEST_ACTIVE_SNAPSHOT_BY_BATCH_PID_ID("hint-name-listNewestActiveSnapshotsByBatchPidId", null),
100+
101+
/**
102+
* Hint name for aggregation done in {@code listNewestActiveSnapshotsByBatchPid}.
103+
*/
104+
HINT_NAME_LIST_NEWEST_ACTIVE_SNAPSHOT_BY_BATCH_PID("hint-name-listNewestActiveSnapshotsByBatchPid", null),
105+
106+
/**
107+
* Hint name for aggregation done in {@code listNewestActiveSnapshotsByBatchId}.
62108
*/
63-
HINT_NAME_LIST_NEWEST_ACTIVE_SNAPSHOT_BY_BATCH("hint-name-listNewestActiveSnapshotsByBatch", null);
109+
HINT_NAME_LIST_NEWEST_ACTIVE_SNAPSHOT_BY_BATCH_ID("hint-name-listNewestActiveSnapshotsByBatchId", null);
64110

65111
private final String path;
66112
private final Object defaultValue;

‎internal/utils/persistence/src/main/java/org/eclipse/ditto/internal/utils/persistence/mongo/streaming/MongoReadJournal.java

+65-10
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.util.List;
2121
import java.util.Optional;
2222
import java.util.Set;
23+
import java.util.concurrent.CompletableFuture;
2324
import java.util.concurrent.CompletionStage;
2425
import java.util.function.Function;
2526
import java.util.stream.Collectors;
@@ -166,6 +167,12 @@ public final class MongoReadJournal implements CurrentEventsByPersistenceIdQuery
166167
private static final Index TAG_PID_INDEX =
167168
IndexFactory.newInstance("ditto_tag_pid", List.of(J_TAGS, J_PROCESSOR_ID), false, true);
168169

170+
private static final Index SNAPS_PID_ID_INDEX =
171+
IndexFactory.newInstance("snaps_pid_id_index", List.of(S_PROCESSOR_ID, S_ID), false, false);
172+
173+
private static final Index SNAPS_PID_INDEX =
174+
IndexFactory.newInstance("snaps_pid_index", List.of(S_PROCESSOR_ID), false, false);
175+
169176
private final String journalCollection;
170177
private final String snapsCollection;
171178
private final DittoMongoClient mongoClient;
@@ -201,7 +208,8 @@ public static MongoReadJournal newInstance(final ActorSystem system) {
201208
final Config config = system.settings().config();
202209
final MongoDbConfig mongoDbConfig =
203210
DefaultMongoDbConfig.of(DefaultScopedConfig.dittoScoped(config));
204-
return newInstance(config, MongoClientWrapper.newInstance(mongoDbConfig), mongoDbConfig.getReadJournalConfig(), system);
211+
return newInstance(config, MongoClientWrapper.newInstance(mongoDbConfig), mongoDbConfig.getReadJournalConfig(),
212+
system);
205213
}
206214

207215
/**
@@ -240,6 +248,32 @@ public CompletionStage<Done> ensureTagPidIndex() {
240248
return indexInitializer.createNonExistingIndices(journalCollection, List.of(TAG_PID_INDEX));
241249
}
242250

251+
/**
252+
* Ensure a compound index exists for snapshot cleanup aggregation matching on "pid" and "_id".
253+
*
254+
* @return a future that completes after index creation completes or fails when index creation fails.
255+
*/
256+
public CompletionStage<Done> ensureSnapshotCollectionPidIdIndex() {
257+
if (readJournalConfig.shouldCreateAdditionalSnapshotAggregationIndexPidId()) {
258+
return indexInitializer.createNonExistingIndices(snapsCollection, List.of(SNAPS_PID_ID_INDEX));
259+
} else {
260+
return CompletableFuture.completedFuture(Done.getInstance());
261+
}
262+
}
263+
264+
/**
265+
* Ensure a compound index exists for snapshot cleanup aggregation matching on "pid" .
266+
*
267+
* @return a future that completes after index creation completes or fails when index creation fails.
268+
*/
269+
public CompletionStage<Done> ensureSnapshotCollectionPidIndex() {
270+
if (readJournalConfig.shouldCreateAdditionalSnapshotAggregationIndexPid()) {
271+
return indexInitializer.createNonExistingIndices(snapsCollection, List.of(SNAPS_PID_INDEX));
272+
} else {
273+
return CompletableFuture.completedFuture(Done.getInstance());
274+
}
275+
}
276+
243277
/**
244278
* Retrieve all unique PIDs in journals. Does its best not to create long-living cursors on the database by reading
245279
* {@code batchSize} events per query.
@@ -349,6 +383,7 @@ private Source<String, NotUsed> filterPidsThatDoesntContainTagInNewestEntry(fina
349383
));
350384
final AggregatePublisher<Document> hintedAggregate =
351385
readJournalConfig.getIndexNameHintForFilterPidsThatDoesntContainTagInNewestEntry()
386+
.filter(hint -> !hint.equals("null"))
352387
.map(aggregate::hintString)
353388
.orElse(aggregate);
354389
return Source.fromPublisher(hintedAggregate)
@@ -928,7 +963,8 @@ private Source<SnapshotBatch, NotUsed> listNewestActiveSnapshotsByBatch(
928963

929964
final List<Bson> pipeline = new ArrayList<>(5);
930965
// match stage
931-
pipeline.add(Aggregates.match(snapshotFilter.toMongoFilter()));
966+
final Bson matchFilter = snapshotFilter.toMongoFilter();
967+
pipeline.add(Aggregates.match(matchFilter));
932968

933969
// sort stage
934970
pipeline.add(Aggregates.sort(Sorts.orderBy(Sorts.ascending(S_PROCESSOR_ID), Sorts.descending(S_SN))));
@@ -947,8 +983,8 @@ private Source<SnapshotBatch, NotUsed> listNewestActiveSnapshotsByBatch(
947983
final String items = "i";
948984
pipeline.add(Aggregates.group(
949985
new Document("_id", new BsonNull()),
950-
Accumulators.max(maxPid, "$"+ S_ID),
951-
Accumulators.push(items,"$$ROOT")));
986+
Accumulators.max(maxPid, "$" + S_ID),
987+
Accumulators.push(items, "$$ROOT")));
952988

953989
// redact stage - "$$PRUNE"s documents with "__lifecycle" = DELETED if includeDeleted=false
954990
// if includeDeleted=true keeps them using "$$DESCEND"
@@ -963,13 +999,15 @@ private Source<SnapshotBatch, NotUsed> listNewestActiveSnapshotsByBatch(
963999
)));
9641000

9651001
final AggregatePublisher<Document> aggregate = snapshotStore.aggregate(pipeline);
966-
final AggregatePublisher<Document> hintedAggregate =
967-
readJournalConfig.getIndexNameHintForListNewestActiveSnapshotsByBatch()
968-
.map(aggregate::hintString)
969-
.orElse(aggregate);
1002+
final Optional<String> indexHint = calculateIndexHint(matchFilter);
1003+
final AggregatePublisher<Document> hintedAggregate = indexHint
1004+
.filter(hint -> !hint.equals("null"))
1005+
.map(aggregate::hintString)
1006+
.orElse(aggregate);
9701007
return Source.fromPublisher(
971-
hintedAggregate
972-
.batchSize(batchSize) // use batchSize also for the cursor batchSize (16 by default bc of backpressure!)
1008+
hintedAggregate
1009+
.batchSize(batchSize)
1010+
// use batchSize also for the cursor batchSize (16 by default bc of backpressure!)
9731011
)
9741012
.flatMapConcat(document -> {
9751013
final String theMaxPid = document.getString(maxPid);
@@ -983,6 +1021,23 @@ private Source<SnapshotBatch, NotUsed> listNewestActiveSnapshotsByBatch(
9831021
});
9841022
}
9851023

1024+
private Optional<String> calculateIndexHint(final Bson matchFilter) {
1025+
final String matchJson = matchFilter.toBsonDocument().toJson();
1026+
final boolean matchContainsPid = matchJson.contains("\"pid\":");
1027+
final boolean matchContainsId = matchJson.contains("\"_id\":");
1028+
final Optional<String> indexHint;
1029+
if (matchContainsPid && matchContainsId) {
1030+
indexHint = readJournalConfig.getIndexNameHintForListNewestActiveSnapshotsByBatchPidId();
1031+
} else if (matchContainsPid) {
1032+
indexHint = readJournalConfig.getIndexNameHintForListNewestActiveSnapshotsByBatchPid();
1033+
} else if (matchContainsId) {
1034+
indexHint = readJournalConfig.getIndexNameHintForListNewestActiveSnapshotsByBatchId();
1035+
} else {
1036+
indexHint = Optional.empty();
1037+
}
1038+
return indexHint;
1039+
}
1040+
9861041
private static Source<List<String>, NotUsed> listJournalEntryTags(final MongoCollection<Document> journal,
9871042
final String pid) {
9881043

‎internal/utils/persistent-actors/src/main/java/org/eclipse/ditto/internal/utils/persistentactors/cleanup/Cleanup.java

+14-4
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,12 @@
2121
import java.util.function.Supplier;
2222
import java.util.stream.LongStream;
2323

24-
import org.eclipse.ditto.internal.utils.persistence.mongo.streaming.MongoReadJournal;
25-
2624
import org.apache.pekko.NotUsed;
2725
import org.apache.pekko.japi.Pair;
2826
import org.apache.pekko.stream.Materializer;
2927
import org.apache.pekko.stream.javadsl.Source;
28+
import org.eclipse.ditto.internal.utils.pekko.logging.ThreadSafeDittoLoggingAdapter;
29+
import org.eclipse.ditto.internal.utils.persistence.mongo.streaming.MongoReadJournal;
3030

3131
/**
3232
* An Pekko stream to handle background cleanup regulated by insert times.
@@ -42,6 +42,7 @@ final class Cleanup {
4242
private final boolean deleteFinalDeletedSnapshot;
4343

4444
Cleanup(final MongoReadJournal readJournal,
45+
final ThreadSafeDittoLoggingAdapter logger,
4546
final Materializer materializer,
4647
final Supplier<Pair<Integer, Integer>> responsibilitySupplier,
4748
final Duration historyRetentionDuration,
@@ -56,14 +57,22 @@ final class Cleanup {
5657
this.readBatchSize = readBatchSize;
5758
this.deleteBatchSize = deleteBatchSize;
5859
this.deleteFinalDeletedSnapshot = deleteFinalDeletedSnapshot;
60+
61+
readJournal.ensureSnapshotCollectionPidIdIndex()
62+
.thenCompose(done -> readJournal.ensureSnapshotCollectionPidIndex())
63+
.exceptionally(e -> {
64+
logger.error(e, "Failed to create index for read journal snapshot aggregation queries");
65+
return null;
66+
});
5967
}
6068

6169
static Cleanup of(final CleanupConfig config,
6270
final MongoReadJournal readJournal,
71+
final ThreadSafeDittoLoggingAdapter logger,
6372
final Materializer materializer,
6473
final Supplier<Pair<Integer, Integer>> responsibilitySupplier) {
6574

66-
return new Cleanup(readJournal, materializer, responsibilitySupplier,
75+
return new Cleanup(readJournal, logger, materializer, responsibilitySupplier,
6776
config.getHistoryRetentionDuration(),
6877
config.getReadsPerQuery(),
6978
config.getWritesPerCredit(),
@@ -76,7 +85,8 @@ Source<Source<CleanupResult, NotUsed>, NotUsed> getCleanupStream(final String lo
7685
}
7786

7887
private Source<SnapshotRevision, NotUsed> getSnapshotRevisions(final String lowerBound) {
79-
return readJournal.getNewestSnapshotsAbove(lowerBound, readBatchSize, true, historyRetentionDuration, materializer)
88+
return readJournal.getNewestSnapshotsAbove(lowerBound, readBatchSize, true, historyRetentionDuration,
89+
materializer)
8090
.map(document -> new SnapshotRevision(document.getString(S_ID),
8191
document.getLong(S_SN),
8292
"DELETED".equals(document.getString(LIFECYCLE))))

‎internal/utils/persistent-actors/src/main/java/org/eclipse/ditto/internal/utils/persistentactors/cleanup/PersistenceCleanupActor.java

+21-22
Original file line numberDiff line numberDiff line change
@@ -17,26 +17,6 @@
1717

1818
import javax.annotation.Nullable;
1919

20-
import org.eclipse.ditto.base.api.common.ModifyConfig;
21-
import org.eclipse.ditto.base.api.common.RetrieveConfig;
22-
import org.eclipse.ditto.base.model.headers.DittoHeaders;
23-
import org.eclipse.ditto.internal.utils.pekko.actors.ModifyConfigBehavior;
24-
import org.eclipse.ditto.internal.utils.pekko.actors.RetrieveConfigBehavior;
25-
import org.eclipse.ditto.internal.utils.pekko.logging.DittoLoggerFactory;
26-
import org.eclipse.ditto.internal.utils.pekko.logging.ThreadSafeDittoLoggingAdapter;
27-
import org.eclipse.ditto.internal.utils.health.RetrieveHealth;
28-
import org.eclipse.ditto.internal.utils.health.RetrieveHealthResponse;
29-
import org.eclipse.ditto.internal.utils.health.StatusDetailMessage;
30-
import org.eclipse.ditto.internal.utils.health.StatusInfo;
31-
import org.eclipse.ditto.internal.utils.metrics.DittoMetrics;
32-
import org.eclipse.ditto.internal.utils.metrics.instruments.counter.Counter;
33-
import org.eclipse.ditto.internal.utils.persistence.mongo.streaming.MongoReadJournal;
34-
import org.eclipse.ditto.json.JsonObject;
35-
import org.eclipse.ditto.json.JsonValue;
36-
37-
import com.typesafe.config.Config;
38-
import com.typesafe.config.ConfigFactory;
39-
4020
import org.apache.pekko.Done;
4121
import org.apache.pekko.actor.AbstractFSM;
4222
import org.apache.pekko.actor.ActorRef;
@@ -53,6 +33,25 @@
5333
import org.apache.pekko.stream.UniqueKillSwitch;
5434
import org.apache.pekko.stream.javadsl.Keep;
5535
import org.apache.pekko.stream.javadsl.Sink;
36+
import org.eclipse.ditto.base.api.common.ModifyConfig;
37+
import org.eclipse.ditto.base.api.common.RetrieveConfig;
38+
import org.eclipse.ditto.base.model.headers.DittoHeaders;
39+
import org.eclipse.ditto.internal.utils.health.RetrieveHealth;
40+
import org.eclipse.ditto.internal.utils.health.RetrieveHealthResponse;
41+
import org.eclipse.ditto.internal.utils.health.StatusDetailMessage;
42+
import org.eclipse.ditto.internal.utils.health.StatusInfo;
43+
import org.eclipse.ditto.internal.utils.metrics.DittoMetrics;
44+
import org.eclipse.ditto.internal.utils.metrics.instruments.counter.Counter;
45+
import org.eclipse.ditto.internal.utils.pekko.actors.ModifyConfigBehavior;
46+
import org.eclipse.ditto.internal.utils.pekko.actors.RetrieveConfigBehavior;
47+
import org.eclipse.ditto.internal.utils.pekko.logging.DittoLoggerFactory;
48+
import org.eclipse.ditto.internal.utils.pekko.logging.ThreadSafeDittoLoggingAdapter;
49+
import org.eclipse.ditto.internal.utils.persistence.mongo.streaming.MongoReadJournal;
50+
import org.eclipse.ditto.json.JsonObject;
51+
import org.eclipse.ditto.json.JsonValue;
52+
53+
import com.typesafe.config.Config;
54+
import com.typesafe.config.ConfigFactory;
5655

5756
/**
5857
* Actor to control persistence cleanup.
@@ -110,7 +109,7 @@ private PersistenceCleanupActor(final CleanupConfig config,
110109
this.mongoReadJournal = mongoReadJournal;
111110
responsibilitySupplier = ClusterResponsibilitySupplier.of(cluster, myRole);
112111
this.config = config;
113-
cleanup = Cleanup.of(config, mongoReadJournal, materializer, responsibilitySupplier);
112+
cleanup = Cleanup.of(config, mongoReadJournal, logger, materializer, responsibilitySupplier);
114113
credits = Credits.of(config);
115114
}
116115

@@ -324,7 +323,7 @@ public Config getConfig() {
324323
@Override
325324
public Config setConfig(final Config config) {
326325
this.config = this.config.setAll(config);
327-
cleanup = Cleanup.of(this.config, mongoReadJournal, materializer, responsibilitySupplier);
326+
cleanup = Cleanup.of(this.config, mongoReadJournal, logger, materializer, responsibilitySupplier);
328327
credits = Credits.of(this.config);
329328
getSelf().tell(Control.SHUTDOWN, ActorRef.noSender());
330329

‎internal/utils/persistent-actors/src/test/java/org/eclipse/ditto/internal/utils/persistentactors/cleanup/CleanupTest.java

+20-12
Original file line numberDiff line numberDiff line change
@@ -24,23 +24,25 @@
2424
import java.time.Duration;
2525
import java.util.List;
2626
import java.util.Optional;
27+
import java.util.concurrent.CompletableFuture;
2728
import java.util.stream.Collectors;
2829

29-
import org.bson.Document;
30-
import org.eclipse.ditto.internal.utils.persistence.mongo.streaming.MongoReadJournal;
31-
import org.junit.After;
32-
import org.junit.Before;
33-
import org.junit.Test;
34-
35-
import com.mongodb.client.result.DeleteResult;
36-
30+
import org.apache.pekko.Done;
3731
import org.apache.pekko.actor.ActorSystem;
3832
import org.apache.pekko.japi.Pair;
3933
import org.apache.pekko.stream.Materializer;
4034
import org.apache.pekko.stream.SystemMaterializer;
4135
import org.apache.pekko.stream.javadsl.Sink;
4236
import org.apache.pekko.stream.javadsl.Source;
4337
import org.apache.pekko.testkit.javadsl.TestKit;
38+
import org.bson.Document;
39+
import org.eclipse.ditto.internal.utils.pekko.logging.ThreadSafeDittoLoggingAdapter;
40+
import org.eclipse.ditto.internal.utils.persistence.mongo.streaming.MongoReadJournal;
41+
import org.junit.After;
42+
import org.junit.Before;
43+
import org.junit.Test;
44+
45+
import com.mongodb.client.result.DeleteResult;
4446

4547
/**
4648
* Tests {@link Cleanup}.
@@ -55,6 +57,8 @@ public final class CleanupTest {
5557
@Before
5658
public void init() {
5759
mongoReadJournal = mock(MongoReadJournal.class);
60+
when(mongoReadJournal.ensureSnapshotCollectionPidIdIndex())
61+
.thenReturn(CompletableFuture.completedFuture(Done.getInstance()));
5862
materializer = SystemMaterializer.get(actorSystem).materializer();
5963
}
6064

@@ -68,7 +72,8 @@ public void emptyStream() {
6872
when(mongoReadJournal.getNewestSnapshotsAbove(any(), anyInt(), eq(true), any(), any()))
6973
.thenReturn(Source.empty());
7074

71-
final var underTest = new Cleanup(mongoReadJournal, materializer, () -> Pair.create(0, 1),
75+
final var underTest = new Cleanup(mongoReadJournal, mock(ThreadSafeDittoLoggingAdapter.class), materializer,
76+
() -> Pair.create(0, 1),
7277
Duration.ZERO, 1, 1, true);
7378
final var result = underTest.getCleanupStream("")
7479
.flatMapConcat(x -> x)
@@ -96,7 +101,8 @@ public void deleteFinalDeletedSnapshot() {
96101
invocation.<Long>getArgument(1) * 1000L + invocation.<Long>getArgument(2) * 10L)))
97102
.when(mongoReadJournal).deleteSnapshots(any(), anyLong(), anyLong());
98103

99-
final var underTest = new Cleanup(mongoReadJournal, materializer, () -> Pair.create(0, 1),
104+
final var underTest = new Cleanup(mongoReadJournal, mock(ThreadSafeDittoLoggingAdapter.class), materializer,
105+
() -> Pair.create(0, 1),
100106
Duration.ZERO, 1, 4, true);
101107

102108
final var result = underTest.getCleanupStream("")
@@ -130,7 +136,8 @@ public void excludeFinalDeletedSnapshot() {
130136
invocation.<Long>getArgument(1) * 1000L + invocation.<Long>getArgument(2) * 10L)))
131137
.when(mongoReadJournal).deleteSnapshots(any(), anyLong(), anyLong());
132138

133-
final var underTest = new Cleanup(mongoReadJournal, materializer, () -> Pair.create(0, 1),
139+
final var underTest = new Cleanup(mongoReadJournal, mock(ThreadSafeDittoLoggingAdapter.class), materializer,
140+
() -> Pair.create(0, 1),
134141
Duration.ZERO, 1, 4, false);
135142

136143
final var result = underTest.getCleanupStream("")
@@ -172,7 +179,8 @@ public void ignorePidsNotResponsibleFor() {
172179
.when(mongoReadJournal).deleteSnapshots(any(), anyLong(), anyLong());
173180

174181
// WHEN: the instance is responsible for 1/3 of the 3 PIDs
175-
final var underTest = new Cleanup(mongoReadJournal, materializer, () -> Pair.create(2, 3),
182+
final var underTest = new Cleanup(mongoReadJournal, mock(ThreadSafeDittoLoggingAdapter.class), materializer,
183+
() -> Pair.create(2, 3),
176184
Duration.ZERO, 1, 4, false);
177185

178186
final var result = underTest.getCleanupStream("")

‎internal/utils/persistent-actors/src/test/java/org/eclipse/ditto/internal/utils/persistentactors/cleanup/CreditsTest.java

+14-9
Original file line numberDiff line numberDiff line change
@@ -23,18 +23,12 @@
2323

2424
import java.time.Duration;
2525
import java.util.Optional;
26+
import java.util.concurrent.CompletableFuture;
2627
import java.util.concurrent.atomic.AtomicInteger;
2728
import java.util.concurrent.atomic.AtomicLong;
2829
import java.util.concurrent.atomic.LongAccumulator;
2930

30-
import org.bson.Document;
31-
import org.eclipse.ditto.internal.utils.persistence.mongo.streaming.MongoReadJournal;
32-
import org.junit.After;
33-
import org.junit.Before;
34-
import org.junit.Test;
35-
36-
import com.mongodb.client.result.DeleteResult;
37-
31+
import org.apache.pekko.Done;
3832
import org.apache.pekko.actor.ActorSystem;
3933
import org.apache.pekko.event.Logging;
4034
import org.apache.pekko.japi.Pair;
@@ -48,6 +42,14 @@
4842
import org.apache.pekko.stream.testkit.javadsl.TestSink;
4943
import org.apache.pekko.stream.testkit.javadsl.TestSource;
5044
import org.apache.pekko.testkit.javadsl.TestKit;
45+
import org.bson.Document;
46+
import org.eclipse.ditto.internal.utils.pekko.logging.ThreadSafeDittoLoggingAdapter;
47+
import org.eclipse.ditto.internal.utils.persistence.mongo.streaming.MongoReadJournal;
48+
import org.junit.After;
49+
import org.junit.Before;
50+
import org.junit.Test;
51+
52+
import com.mongodb.client.result.DeleteResult;
5153

5254
/**
5355
* Tests {@link Credits}.
@@ -110,6 +112,8 @@ public void noElementRequestedWithoutCredit() {
110112
@Test
111113
public void onePersistenceWriteAllowedPerCredit() {
112114
final var mongoReadJournal = mock(MongoReadJournal.class);
115+
when(mongoReadJournal.ensureSnapshotCollectionPidIdIndex())
116+
.thenReturn(CompletableFuture.completedFuture(Done.getInstance()));
113117
final var opsCounter = new AtomicInteger(0);
114118

115119
when(mongoReadJournal.getNewestSnapshotsAbove(any(), anyInt(), eq(true), any(), any()))
@@ -134,7 +138,8 @@ public void onePersistenceWriteAllowedPerCredit() {
134138
// mock timer permits 1 batch of credit, after which no credit is given out
135139
final var mockTimerResult = new AtomicLong(0L);
136140
doAnswer(inv -> mockTimerResult.getAndSet(1001L)).when(mockTimer).getThenReset();
137-
final var cleanup = new Cleanup(mongoReadJournal, materializer, () -> Pair.create(0, 1),
141+
final var cleanup = new Cleanup(mongoReadJournal, mock(ThreadSafeDittoLoggingAdapter.class), materializer,
142+
() -> Pair.create(0, 1),
138143
Duration.ZERO, 1, 4, true);
139144
final var underTest = new Credits(getFastCreditConfig(4), mockTimer);
140145

‎internal/utils/persistent-actors/src/test/java/org/eclipse/ditto/internal/utils/persistentactors/cleanup/PersistenceCleanupActorTest.java

+22-16
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,28 @@
1919
import static org.mockito.Mockito.mock;
2020
import static org.mockito.Mockito.timeout;
2121
import static org.mockito.Mockito.verify;
22+
import static org.mockito.Mockito.when;
2223

2324
import java.time.Duration;
2425
import java.time.temporal.ChronoUnit;
2526
import java.util.Map;
27+
import java.util.concurrent.CompletableFuture;
2628
import java.util.concurrent.atomic.AtomicReference;
2729

30+
import org.apache.pekko.Done;
31+
import org.apache.pekko.NotUsed;
32+
import org.apache.pekko.actor.ActorRef;
33+
import org.apache.pekko.actor.ActorSystem;
34+
import org.apache.pekko.actor.FSM;
35+
import org.apache.pekko.actor.Props;
36+
import org.apache.pekko.event.Logging;
37+
import org.apache.pekko.japi.Pair;
38+
import org.apache.pekko.stream.Attributes;
39+
import org.apache.pekko.stream.KillSwitches;
40+
import org.apache.pekko.stream.javadsl.Keep;
41+
import org.apache.pekko.stream.javadsl.Source;
42+
import org.apache.pekko.stream.testkit.javadsl.TestSource;
43+
import org.apache.pekko.testkit.javadsl.TestKit;
2844
import org.eclipse.ditto.base.api.common.ModifyConfig;
2945
import org.eclipse.ditto.base.api.common.ModifyConfigResponse;
3046
import org.eclipse.ditto.base.api.common.RetrieveConfig;
@@ -45,21 +61,6 @@
4561
import com.typesafe.config.ConfigFactory;
4662
import com.typesafe.config.ConfigRenderOptions;
4763

48-
import org.apache.pekko.Done;
49-
import org.apache.pekko.NotUsed;
50-
import org.apache.pekko.actor.ActorRef;
51-
import org.apache.pekko.actor.ActorSystem;
52-
import org.apache.pekko.actor.FSM;
53-
import org.apache.pekko.actor.Props;
54-
import org.apache.pekko.event.Logging;
55-
import org.apache.pekko.japi.Pair;
56-
import org.apache.pekko.stream.Attributes;
57-
import org.apache.pekko.stream.KillSwitches;
58-
import org.apache.pekko.stream.javadsl.Keep;
59-
import org.apache.pekko.stream.javadsl.Source;
60-
import org.apache.pekko.stream.testkit.javadsl.TestSource;
61-
import org.apache.pekko.testkit.javadsl.TestKit;
62-
6364
/**
6465
* Tests {@link PersistenceCleanupActor}.
6566
*/
@@ -69,13 +70,18 @@ public final class PersistenceCleanupActorTest {
6970
ConfigFactory.load("test.conf"));
7071
private final AtomicReference<Source<Source<CleanupResult, NotUsed>, NotUsed>> sourceBox =
7172
new AtomicReference<>(Source.empty());
73+
74+
private MongoReadJournal mongoReadJournal;
7275
private Cleanup cleanup;
7376
private Credits credits;
7477

7578
@Before
7679
public void init() {
80+
mongoReadJournal = mock(MongoReadJournal.class);
7781
cleanup = mock(Cleanup.class);
7882
credits = mock(Credits.class);
83+
when(mongoReadJournal.ensureSnapshotCollectionPidIdIndex())
84+
.thenReturn(CompletableFuture.completedFuture(Done.getInstance()));
7985
doAnswer(inv -> Source.empty()).when(cleanup).getCleanupStream(any());
8086
doAnswer(inv -> sourceBox.get()).when(credits).regulate(any(), any());
8187
}
@@ -333,7 +339,7 @@ private void waitForResponse(final TestKit testKit,
333339

334340
private Props testProps() {
335341
return Props.create(PersistenceCleanupActor.class,
336-
() -> new PersistenceCleanupActor(cleanup, credits, mock(MongoReadJournal.class),
342+
() -> new PersistenceCleanupActor(cleanup, credits, mongoReadJournal,
337343
() -> Pair.create(0, 1)));
338344
}
339345

‎policies/service/src/main/resources/policies.conf

+14-2
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,26 @@ ditto {
2020
database = ${?MONGO_DB_DATABASE}
2121

2222
read-journal {
23+
should-create-additional-snapshot-aggregation-index-pid-id = false
24+
should-create-additional-snapshot-aggregation-index-pid-id = ${?MONGODB_READ_JOURNAL_SHOULD_CREATE_ADDITIONAL_SNAPSHOT_AGGREGATION_INDEX_PID_ID}
25+
26+
should-create-additional-snapshot-aggregation-index-pid = false
27+
should-create-additional-snapshot-aggregation-index-pid = ${?MONGODB_READ_JOURNAL_SHOULD_CREATE_ADDITIONAL_SNAPSHOT_AGGREGATION_INDEX_PID}
28+
2329
hint-name-filterPidsThatDoesntContainTagInNewestEntry = null
2430
hint-name-filterPidsThatDoesntContainTagInNewestEntry = ${?MONGODB_READ_JOURNAL_HINT_NAME_FILTER_PIDS_THAT_DOESNT_CONTAIN_TAG_IN_NEWEST_ENTRY}
2531

2632
hint-name-listLatestJournalEntries = null
2733
hint-name-listLatestJournalEntries = ${?MONGODB_READ_JOURNAL_HINT_NAME_LIST_LATEST_JOURNAL_ENTRIES}
2834

29-
hint-name-listNewestActiveSnapshotsByBatch = "_id_"
30-
hint-name-listNewestActiveSnapshotsByBatch = ${?MONGODB_READ_JOURNAL_HINT_NAME_LIST_NEWEST_ACTIVE_SNAPSHOT_BY_BATCH}
35+
hint-name-listNewestActiveSnapshotsByBatchPidId = null
36+
hint-name-listNewestActiveSnapshotsByBatchPidId = ${?MONGODB_READ_JOURNAL_HINT_NAME_LIST_NEWEST_ACTIVE_SNAPSHOT_BY_BATCH_PID_ID}
37+
38+
hint-name-listNewestActiveSnapshotsByBatchPid = null
39+
hint-name-listNewestActiveSnapshotsByBatchPid = ${?MONGODB_READ_JOURNAL_HINT_NAME_LIST_NEWEST_ACTIVE_SNAPSHOT_BY_BATCH_PID}
40+
41+
hint-name-listNewestActiveSnapshotsByBatchId = null
42+
hint-name-listNewestActiveSnapshotsByBatchId = ${?MONGODB_READ_JOURNAL_HINT_NAME_LIST_NEWEST_ACTIVE_SNAPSHOT_BY_BATCH_ID}
3143
}
3244
}
3345

‎things/service/src/main/resources/things.conf

+14-2
Original file line numberDiff line numberDiff line change
@@ -26,14 +26,26 @@ ditto {
2626
database = ${?MONGO_DB_DATABASE}
2727

2828
read-journal {
29+
should-create-additional-snapshot-aggregation-index-pid-id = false
30+
should-create-additional-snapshot-aggregation-index-pid-id = ${?MONGODB_READ_JOURNAL_SHOULD_CREATE_ADDITIONAL_SNAPSHOT_AGGREGATION_INDEX_PID_ID}
31+
32+
should-create-additional-snapshot-aggregation-index-pid = false
33+
should-create-additional-snapshot-aggregation-index-pid = ${?MONGODB_READ_JOURNAL_SHOULD_CREATE_ADDITIONAL_SNAPSHOT_AGGREGATION_INDEX_PID}
34+
2935
hint-name-filterPidsThatDoesntContainTagInNewestEntry = null
3036
hint-name-filterPidsThatDoesntContainTagInNewestEntry = ${?MONGODB_READ_JOURNAL_HINT_NAME_FILTER_PIDS_THAT_DOESNT_CONTAIN_TAG_IN_NEWEST_ENTRY}
3137

3238
hint-name-listLatestJournalEntries = null
3339
hint-name-listLatestJournalEntries = ${?MONGODB_READ_JOURNAL_HINT_NAME_LIST_LATEST_JOURNAL_ENTRIES}
3440

35-
hint-name-listNewestActiveSnapshotsByBatch = "_id_"
36-
hint-name-listNewestActiveSnapshotsByBatch = ${?MONGODB_READ_JOURNAL_HINT_NAME_LIST_NEWEST_ACTIVE_SNAPSHOT_BY_BATCH}
41+
hint-name-listNewestActiveSnapshotsByBatchPidId = null
42+
hint-name-listNewestActiveSnapshotsByBatchPidId = ${?MONGODB_READ_JOURNAL_HINT_NAME_LIST_NEWEST_ACTIVE_SNAPSHOT_BY_BATCH_PID_ID}
43+
44+
hint-name-listNewestActiveSnapshotsByBatchPid = null
45+
hint-name-listNewestActiveSnapshotsByBatchPid = ${?MONGODB_READ_JOURNAL_HINT_NAME_LIST_NEWEST_ACTIVE_SNAPSHOT_BY_BATCH_PID}
46+
47+
hint-name-listNewestActiveSnapshotsByBatchId = null
48+
hint-name-listNewestActiveSnapshotsByBatchId = ${?MONGODB_READ_JOURNAL_HINT_NAME_LIST_NEWEST_ACTIVE_SNAPSHOT_BY_BATCH_ID}
3749
}
3850
}
3951

0 commit comments

Comments
 (0)
Please sign in to comment.