Skip to content

Commit d477ec2

Browse files
authored
Merge branch 'master' into support-updating-referenced-WoT-ThingModel
2 parents 352856a + 2de8ee8 commit d477ec2

File tree

10 files changed

+225
-121
lines changed

10 files changed

+225
-121
lines changed

connectivity/model/src/main/java/org/eclipse/ditto/connectivity/model/MetricType.java

+5
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,11 @@ public enum MetricType {
4343
*/
4444
MAPPED("mapped", MetricDirection.INBOUND, MetricDirection.OUTBOUND),
4545

46+
/**
47+
* Counts others/uncategorized for messages.
48+
*/
49+
OTHER("other", MetricDirection.INBOUND, MetricDirection.OUTBOUND),
50+
4651
/**
4752
* Counts messages that were dropped (not published by intention e.g. because no reply-to address was given).
4853
*/

connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/OutboundMappingProcessorActor.java

+6-5
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import static org.eclipse.ditto.base.model.common.ConditionChecker.checkNotEmpty;
1616
import static org.eclipse.ditto.connectivity.model.MetricType.DROPPED;
1717
import static org.eclipse.ditto.connectivity.model.MetricType.MAPPED;
18+
import static org.eclipse.ditto.connectivity.model.MetricType.OTHER;
1819

1920
import java.io.PrintWriter;
2021
import java.io.StringWriter;
@@ -157,6 +158,7 @@ public final class OutboundMappingProcessorActor
157158
private final ConnectionMonitor responseDispatchedMonitor;
158159
private final ConnectionMonitor responseDroppedMonitor;
159160
private final ConnectionMonitor responseMappedMonitor;
161+
private final ConnectionMonitor responseOtherMonitor;
160162
private final SignalEnrichmentFacade signalEnrichmentFacade;
161163
private final int processorPoolSize;
162164
private final DittoRuntimeExceptionToErrorResponseFunction toErrorResponseFunction;
@@ -187,6 +189,7 @@ private OutboundMappingProcessorActor(final ActorRef clientActor,
187189
responseDispatchedMonitor = connectionMonitorRegistry.forResponseDispatched(this.connection);
188190
responseDroppedMonitor = connectionMonitorRegistry.forResponseDropped(this.connection);
189191
responseMappedMonitor = connectionMonitorRegistry.forResponseMapped(this.connection);
192+
responseOtherMonitor = connectionMonitorRegistry.forResponseOther(this.connection);
190193
signalEnrichmentFacade = ConnectivitySignalEnrichmentProvider.get(system, dittoExtensionConfig).getFacade(this.connection.getId());
191194
this.processorPoolSize = determinePoolSize(processorPoolSize, mappingConfig.getMaxPoolSize());
192195
toErrorResponseFunction = DittoRuntimeExceptionToErrorResponseFunction.of(DittoHeadersValidator.get(system, dittoExtensionConfig));
@@ -661,15 +664,13 @@ private Object handleSignal(final Signal<?> signal, final ActorRef sender) {
661664
})
662665
.onError((mapperId, exception, topicPath, unused) -> {
663666
if (exception instanceof DittoRuntimeException e) {
664-
monitorsForOther.forEach(monitor ->
665-
monitor.getLogger().failure(infoProvider, e));
667+
monitorsForOther.forEach(monitor -> monitor.failure(infoProvider, e));
666668
logger.withCorrelationId(e)
667669
.info("Got DittoRuntimeException during processing Signal: {} - {}",
668670
e.getMessage(),
669671
e.getDescription().orElse(""));
670672
} else {
671-
monitorsForOther.forEach(monitor ->
672-
monitor.getLogger().exception(infoProvider, exception));
673+
monitorsForOther.forEach(monitor -> monitor.exception(infoProvider, exception));
673674
logger.withCorrelationId(outbound.getSource())
674675
.warning("Got unexpected exception during processing Signal <{}>.",
675676
exception.getMessage());
@@ -696,7 +697,7 @@ private Set<ConnectionMonitor> getMonitorsForMappedSignal(final OutboundSignal o
696697

697698
private Set<ConnectionMonitor> getMonitorsForOther(final OutboundSignal outbound) {
698699

699-
return getMonitorsForOutboundSignal(outbound, MAPPED, LogType.OTHER, responseMappedMonitor);
700+
return getMonitorsForOutboundSignal(outbound, OTHER, LogType.OTHER, responseOtherMonitor);
700701
}
701702

702703
private Set<ConnectionMonitor> getMonitorsForOutboundSignal(final OutboundSignal outbound,

connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/monitoring/ConnectionMonitorRegistry.java

+9
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,15 @@ public interface ConnectionMonitorRegistry<T> {
170170
*/
171171
T forResponseMapped(Connection connection);
172172

173+
/**
174+
* Gets counter for {@link org.eclipse.ditto.connectivity.model.MetricDirection#OUTBOUND}/{@link
175+
* org.eclipse.ditto.connectivity.model.MetricType#OTHER} messages for responses.
176+
*
177+
* @param connection connection
178+
* @return the response mapped counter
179+
*/
180+
T forResponseOther(Connection connection);
181+
173182
/**
174183
* Gets counter for {@link org.eclipse.ditto.connectivity.model.MetricDirection#OUTBOUND}/{@link
175184
* org.eclipse.ditto.connectivity.model.MetricType#PUBLISHED} messages for responses.

connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/monitoring/DefaultConnectionMonitorRegistry.java

+8
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,14 @@ public ConnectionMonitor forResponseMapped(final Connection connection) {
183183
.build();
184184
}
185185

186+
@Override
187+
public ConnectionMonitor forResponseOther(final Connection connection) {
188+
return DefaultConnectionMonitor.builder(
189+
connectionCounterRegistry.forResponseOther(connection),
190+
connectionLoggerRegistry.forResponseOther(connection))
191+
.build();
192+
}
193+
186194
@Override
187195
public ConnectionMonitor forResponsePublished(final Connection connection) {
188196
return DefaultConnectionMonitor.builder(

connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/monitoring/logs/ConnectionLoggerRegistry.java

+5
Original file line numberDiff line numberDiff line change
@@ -464,6 +464,11 @@ public ConnectionLogger forResponseMapped(final Connection connection) {
464464
return getLogger(connection.getId(), LogCategory.RESPONSE, LogType.MAPPED, RESPONSES_ADDRESS);
465465
}
466466

467+
@Override
468+
public ConnectionLogger forResponseOther(final Connection connection) {
469+
return getLogger(connection.getId(), LogCategory.RESPONSE, LogType.OTHER, RESPONSES_ADDRESS);
470+
}
471+
467472
@Override
468473
public ConnectionLogger forResponsePublished(final Connection connection) {
469474
return getLogger(connection.getId(), LogCategory.RESPONSE, LogType.PUBLISHED, RESPONSES_ADDRESS);

connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/monitoring/metrics/ConnectivityCounterRegistry.java

+5
Original file line numberDiff line numberDiff line change
@@ -246,6 +246,11 @@ public ConnectionMetricsCounter forResponseMapped(final Connection connection) {
246246
return getCounter(connection, MetricType.MAPPED, MetricDirection.OUTBOUND, RESPONSES_ADDRESS);
247247
}
248248

249+
@Override
250+
public ConnectionMetricsCounter forResponseOther(final Connection connection) {
251+
return getCounter(connection, MetricType.OTHER, MetricDirection.OUTBOUND, RESPONSES_ADDRESS);
252+
}
253+
249254
@Override
250255
public ConnectionMetricsCounter forResponsePublished(final Connection connection) {
251256
return getCounter(connection, MetricType.PUBLISHED, MetricDirection.OUTBOUND, RESPONSES_ADDRESS);

connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/monitoring/metrics/ConnectivityCounterRegistryTest.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import static org.eclipse.ditto.connectivity.model.MetricType.ENFORCED;
2424
import static org.eclipse.ditto.connectivity.model.MetricType.FILTERED;
2525
import static org.eclipse.ditto.connectivity.model.MetricType.MAPPED;
26+
import static org.eclipse.ditto.connectivity.model.MetricType.OTHER;
2627
import static org.eclipse.ditto.connectivity.model.MetricType.PUBLISHED;
2728
import static org.eclipse.ditto.connectivity.model.MetricType.THROTTLED;
2829
import static org.eclipse.ditto.connectivity.service.messaging.monitoring.metrics.MeasurementWindow.ONE_MINUTE_WITH_ONE_MINUTE_RESOLUTION;
@@ -208,7 +209,7 @@ public void testThrottlingCounterRegistered() {
208209

209210
final SourceMetrics sourceMetrics = counterRegistry.aggregateSourceMetrics(connection.getId());
210211

211-
final MetricType[] expectedTypes = {CONSUMED, MAPPED, DROPPED, ACKNOWLEDGED, ENFORCED, THROTTLED};
212+
final MetricType[] expectedTypes = {OTHER, CONSUMED, MAPPED, DROPPED, ACKNOWLEDGED, ENFORCED, THROTTLED};
212213
connection.getSources()
213214
.stream()
214215
.flatMap(source -> source.getAddresses().stream())

internal/utils/conditional-headers/src/main/java/org/eclipse/ditto/internal/utils/headers/conditional/IfEqualPreconditionHeader.java

+41-14
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
import javax.annotation.concurrent.Immutable;
2424

2525
import org.eclipse.ditto.base.model.entity.Entity;
26+
import org.eclipse.ditto.base.model.entity.id.EntityId;
27+
import org.eclipse.ditto.base.model.entity.type.EntityType;
2628
import org.eclipse.ditto.base.model.headers.DittoHeaderDefinition;
2729
import org.eclipse.ditto.base.model.headers.IfEqual;
2830
import org.eclipse.ditto.base.model.json.FieldType;
@@ -52,6 +54,9 @@ public final class IfEqualPreconditionHeader<C extends Command<?>> implements Pr
5254

5355
private static final String IF_EQUAL_KEY = DittoHeaderDefinition.IF_EQUAL.getKey();
5456

57+
private static final EntityType POLICY_ENTITY_TYPE = EntityType.of("policy");
58+
private static final EntityType THING_ENTITY_TYPE = EntityType.of("thing");
59+
5560
private final C command;
5661
private final IfEqual ifEqual;
5762
private final ConditionalHeadersValidator.ValidationSettings validationSettings;
@@ -132,14 +137,18 @@ private Boolean meetsConditionForModifyCommand(final Entity<?> entity,
132137

133138
return withOptionalEntity.getEntity()
134139
.map(newValue -> {
135-
final Predicate<JsonField> fieldPredicate = calculatePredicate(command.getResourcePath());
140+
final Predicate<JsonField> fieldPredicate = calculatePredicate(command.getResourcePath(),
141+
entity.getEntityId().map(EntityId::getEntityType).orElse(null), true);
136142
final Optional<JsonValue> previousValue =
137143
entity.toJson(JsonSchemaVersion.LATEST, fieldPredicate)
138144
.getValue(command.getResourcePath());
139145
final JsonValue adjustedNewValue;
140146
if (newValue.isObject()) {
141147
adjustedNewValue = newValue.asObject()
142-
.filter(calculatePredicateForNew(command.getResourcePath()));
148+
.filter(calculatePredicateForNew(
149+
command.getResourcePath(),
150+
entity.getEntityId().map(EntityId::getEntityType).orElse(null)
151+
));
143152
} else {
144153
adjustedNewValue = newValue;
145154
}
@@ -154,13 +163,19 @@ private Boolean meetsConditionForMergeCommand(final Entity<?> entity,
154163

155164
return withOptionalEntity.getEntity()
156165
.map(newValue -> {
157-
final Optional<JsonValue> previousValue = entity.toJson().getValue(command.getResourcePath());
166+
final Predicate<JsonField> fieldPredicate = calculatePredicate(command.getResourcePath(),
167+
entity.getEntityId().map(EntityId::getEntityType).orElse(null), true);
168+
final Optional<JsonValue> previousValue = entity.toJson(JsonSchemaVersion.LATEST, fieldPredicate)
169+
.getValue(command.getResourcePath());
158170
if (newValue.isObject()) {
159171
final JsonObject newObject;
160172
if (command.getResourcePath().isEmpty()) {
161173
newObject = newValue.asObject()
162174
.stream()
163-
.filter(calculatePredicateForNew(command.getResourcePath()))
175+
.filter(calculatePredicateForNew(
176+
command.getResourcePath(),
177+
entity.getEntityId().map(EntityId::getEntityType).orElse(null)
178+
))
164179
.collect(JsonCollectors.fieldsToObject());
165180
} else {
166181
newObject = newValue.asObject();
@@ -240,7 +255,8 @@ private C adjustMergeCommandByOnlyKeepingChanges(final C command,
240255

241256
return withOptionalEntity.getEntity()
242257
.map(newValue -> {
243-
final Predicate<JsonField> fieldPredicate = calculatePredicate(command.getResourcePath());
258+
final Predicate<JsonField> fieldPredicate = calculatePredicate(command.getResourcePath(),
259+
entity.getEntityId().map(EntityId::getEntityType).orElse(null), true);
244260
final JsonValue oldValue = entity.toJson(JsonSchemaVersion.LATEST, fieldPredicate)
245261
.getValue(command.getResourcePath()).orElse(null);
246262
if (null == oldValue) {
@@ -259,22 +275,33 @@ private C adjustMergeCommandByOnlyKeepingChanges(final C command,
259275
.orElse(command);
260276
}
261277

262-
private static Predicate<JsonField> calculatePredicate(final JsonPointer resourcePath) {
278+
private static Predicate<JsonField> calculatePredicate(final JsonPointer resourcePath,
279+
@Nullable final EntityType entityType,
280+
final boolean onlyNonHiddenFields
281+
) {
282+
final Predicate<JsonField> start = onlyNonHiddenFields ? FieldType.notHidden() : FieldType.all();
263283
if (resourcePath.isEmpty()) {
264-
return FieldType.notHidden()
265-
.and(Predicate.not(jsonField -> jsonField.getKey().equals(JsonKey.of("thingId"))))
266-
.and(Predicate.not(jsonField -> jsonField.getKey().equals(JsonKey.of("policyId"))));
284+
if (THING_ENTITY_TYPE.equals(entityType)) {
285+
return start.and(Predicate.not(jsonField -> jsonField.getKey().equals(JsonKey.of("thingId"))));
286+
} else if (POLICY_ENTITY_TYPE.equals(entityType)) {
287+
return start.and(Predicate.not(jsonField -> jsonField.getKey().equals(JsonKey.of("policyId"))));
288+
} else {
289+
return start;
290+
}
267291
} else {
268-
return FieldType.notHidden();
292+
return start;
269293
}
270294
}
271295

272-
private static Predicate<JsonField> calculatePredicateForNew(final JsonPointer resourcePath) {
296+
private static Predicate<JsonField> calculatePredicateForNew(final JsonPointer resourcePath,
297+
@Nullable final EntityType entityType
298+
) {
273299
if (resourcePath.isEmpty()) {
274-
// filter "special fields" for e.g. on thing level the inline "_policy":
275-
return jsonField -> !jsonField.getKeyName().startsWith("_");
300+
return calculatePredicate(resourcePath, entityType, false)
301+
// filter "special fields" for e.g. on thing level the inline "_policy":
302+
.and(jsonField -> !jsonField.getKeyName().startsWith("_"));
276303
} else {
277-
return jsonField -> true;
304+
return calculatePredicate(resourcePath, entityType, false);
278305
}
279306
}
280307

0 commit comments

Comments
 (0)