Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix configuration of throttling of search index update after policy update #2134

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ enum ConfigValue implements KnownConfigValue {
/**
* Whether throttling should be enabled.
*/
ENABLED("enabled", false),
ENABLED("enabled", true),

/**
* The throttling interval meaning in which duration may the configured
Expand Down
2 changes: 1 addition & 1 deletion deployment/helm/ditto/Chart.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ description: |
A digital twin is a virtual, cloud based, representation of his real world counterpart
(real world “Things”, e.g. devices like sensors, smart heating, connected cars, smart grids, EV charging stations etc).
type: application
version: 3.7.0 # chart version is effectively set by release-job
version: 3.7.1 # chart version is effectively set by release-job
appVersion: 3.7.0
keywords:
- iot-chart
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ spec:
value: "{{ .Values.thingsSearch.config.mongodb.policyModificationCausedSearchIndexUpdateThrottling.enabled }}"
- name: POLICY_MODIFICATION_CAUSED_SEARCH_INDEX_UPDATE_THROTTLING_INTERVAL
value: "{{ .Values.thingsSearch.config.mongodb.policyModificationCausedSearchIndexUpdateThrottling.interval }}"
- name: POLICY_MODIFICATION_CAUSED_SEARCH_INDEX_UPDATE_policyModificationCausedSearchIndexUpdateThrottlingTHROTTLING_LIMIT
- name: POLICY_MODIFICATION_CAUSED_SEARCH_INDEX_UPDATE_THROTTLING_LIMIT
value: "{{ .Values.thingsSearch.config.mongodb.policyModificationCausedSearchIndexUpdateThrottling.limit }}"
- name: THINGS_SEARCH_UPDATER_STREAM_PERSISTENCE_WITH_ACKS_WRITE_CONCERN
value: "{{ .Values.thingsSearch.config.mongodb.searchWithAcksWriteConcern }}"
Expand Down
10 changes: 5 additions & 5 deletions deployment/helm/ditto/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1376,13 +1376,13 @@ thingsSearch:
updaterPersistenceReadConcern: "local"
# updaterPersistenceReadPreference configures the MongoDB read preference for the "ThingUpdater"
updaterPersistenceReadPreference: "primaryPreferred"
# PolicyModificationCausedSearchIndexUpdateThrottling contains throttling configuration for the search Index update after a policy update
# policyModificationCausedSearchIndexUpdateThrottling contains throttling configuration for the search Index update after a policy update
policyModificationCausedSearchIndexUpdateThrottling:
# enabled defines whether throttling should be applied for search Index update after a policy update.
enabled: false
# The time window within which the throttling limit applies.
# enabled defines whether throttling should be applied for search Index update after a policy update
enabled: true
# interval defined the time window within which the throttling limit applies
interval: 1s
# The maximum number of updates allowed within each throttling interval.
# limit is the maximum number of updates allowed within each throttling interval
limit: 100
# updater contains configuration for the "Things Updater" of things-search service
updater:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.pekko.NotUsed;
import org.apache.pekko.japi.pf.PFBuilder;
import org.apache.pekko.stream.javadsl.Source;
import org.bson.BsonDateTime;
import org.bson.BsonDocument;
import org.bson.BsonInt32;
Expand All @@ -48,22 +51,20 @@
import com.mongodb.reactivestreams.client.MongoCollection;
import com.mongodb.reactivestreams.client.MongoDatabase;

import org.apache.pekko.NotUsed;
import org.apache.pekko.japi.pf.PFBuilder;
import org.apache.pekko.stream.javadsl.Source;

/**
* MongoDB specific implementation of the {@link org.eclipse.ditto.thingsearch.service.persistence.write.ThingsSearchUpdaterPersistence}.
*/
public final class MongoThingsSearchUpdaterPersistence implements ThingsSearchUpdaterPersistence {

private final MongoCollection<Document> collection;

private final ThrottlingConfig PolicyModificationCausedSearchIndexUpdateThrottling;
private final ThrottlingConfig policyModificationCausedSearchIndexUpdateThrottling;

private MongoThingsSearchUpdaterPersistence(final MongoDatabase database,
final SearchPersistenceConfig updaterPersistenceConfig) {
this.PolicyModificationCausedSearchIndexUpdateThrottling = updaterPersistenceConfig.getPolicyModificationCausedSearchIndexUpdateThrottling();
final SearchPersistenceConfig updaterPersistenceConfig)
{
this.policyModificationCausedSearchIndexUpdateThrottling = updaterPersistenceConfig
.getPolicyModificationCausedSearchIndexUpdateThrottling();

collection = database.getCollection(PersistenceConstants.THINGS_COLLECTION_NAME)
.withReadConcern(updaterPersistenceConfig.readConcern().getMongoReadConcern())
Expand Down Expand Up @@ -111,10 +112,10 @@ public Source<PolicyReferenceTag, NotUsed> getPolicyReferenceTags(final Map<Poli
.append(PersistenceConstants.FIELD_REFERENCED_POLICIES, new BsonInt32(1)));


final Source<Document, NotUsed> throttledSource = PolicyModificationCausedSearchIndexUpdateThrottling.isEnabled()
final Source<Document, NotUsed> throttledSource = policyModificationCausedSearchIndexUpdateThrottling.isEnabled()
? Source.fromPublisher(publisher).throttle(
PolicyModificationCausedSearchIndexUpdateThrottling.getLimit(),
PolicyModificationCausedSearchIndexUpdateThrottling.getInterval()
policyModificationCausedSearchIndexUpdateThrottling.getLimit(),
policyModificationCausedSearchIndexUpdateThrottling.getInterval()
)
: Source.fromPublisher(publisher);

Expand Down
10 changes: 6 additions & 4 deletions thingsearch/service/src/main/resources/search.conf
Original file line number Diff line number Diff line change
Expand Up @@ -299,16 +299,18 @@ ditto {
# read concern is one of: default, local, majority, linearizable, snapshot, available
readConcern = ${ditto.mongodb.options.readConcern}
readConcern = ${?UPDATER_PERSISTENCE_MONGO_DB_READ_CONCERN}
# PolicyModificationCausedSearchIndexUpdateThrottling contains throttling configuration for the search Index update after a policy update

# policyModificationCausedSearchIndexUpdateThrottling contains throttling configuration for the search Index update after a policy update
policyModificationCausedSearchIndexUpdateThrottling {
# enabled defines whether throttling should be applied for search Index update after a policy update.
# enabled defines whether throttling should be applied for search Index update after a policy update
enabled = false
enabled = ${?POLICY_MODIFICATION_CAUSED_SEARCH_INDEX_UPDATE_THROTTLING_ENABLED}
# The interval at which updates are throttled (e.g., every 1 second)

# interval at which updates are throttled (e.g., every 1 second)
interval = 1s
interval = ${?POLICY_MODIFICATION_CAUSED_SEARCH_INDEX_UPDATE_THROTTLING_INTERVAL}

# Maximum number of updates allowed per interval (e.g., 100 updates per second)
# limit is the maximum number of updates allowed per interval (e.g., 100 updates per second)
limit = 100
limit = ${?POLICY_MODIFICATION_CAUSED_SEARCH_INDEX_UPDATE_THROTTLING_LIMIT}
}
Expand Down
Loading