Skip to content

Commit e9087c6

Browse files
committed
#1931: fix that not all placeholders were supported in connection target filtering
1 parent f2b8dc0 commit e9087c6

6 files changed

Lines changed: 107 additions & 18 deletions

File tree

connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/OutboundMappingProcessorActor.java

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.Collection;
2323
import java.util.Collections;
2424
import java.util.List;
25+
import java.util.Map;
2526
import java.util.Objects;
2627
import java.util.Optional;
2728
import java.util.Set;
@@ -86,7 +87,10 @@
8687
import org.eclipse.ditto.connectivity.service.messaging.validation.ConnectionValidator;
8788
import org.eclipse.ditto.connectivity.service.util.ConnectivityMdcEntryKey;
8889
import org.eclipse.ditto.edge.service.headers.DittoHeadersValidator;
90+
import org.eclipse.ditto.edge.service.placeholders.EntityIdPlaceholder;
91+
import org.eclipse.ditto.edge.service.placeholders.FeaturePlaceholder;
8992
import org.eclipse.ditto.edge.service.placeholders.ThingJsonPlaceholder;
93+
import org.eclipse.ditto.edge.service.placeholders.ThingPlaceholder;
9094
import org.eclipse.ditto.internal.models.signalenrichment.SignalEnrichmentFacade;
9195
import org.eclipse.ditto.internal.utils.config.ScopedConfig;
9296
import org.eclipse.ditto.internal.utils.pekko.controlflow.AbstractGraphActor;
@@ -99,6 +103,7 @@
99103
import org.eclipse.ditto.json.JsonPointer;
100104
import org.eclipse.ditto.json.JsonValue;
101105
import org.eclipse.ditto.placeholders.ExpressionResolver;
106+
import org.eclipse.ditto.placeholders.HeadersPlaceholder;
102107
import org.eclipse.ditto.placeholders.PipelineElement;
103108
import org.eclipse.ditto.placeholders.PlaceholderFactory;
104109
import org.eclipse.ditto.placeholders.PlaceholderResolver;
@@ -138,9 +143,13 @@ public final class OutboundMappingProcessorActor
138143

139144
private static final DittoProtocolAdapter DITTO_PROTOCOL_ADAPTER = DittoProtocolAdapter.newInstance();
140145
private static final TopicPathPlaceholder TOPIC_PATH_PLACEHOLDER = TopicPathPlaceholder.getInstance();
146+
private static final EntityIdPlaceholder ENTITY_ID_PLACEHOLDER = EntityIdPlaceholder.getInstance();
147+
private static final ThingPlaceholder THING_PLACEHOLDER = ThingPlaceholder.getInstance();
148+
private static final FeaturePlaceholder FEATURE_PLACEHOLDER = FeaturePlaceholder.getInstance();
141149
private static final ResourcePlaceholder RESOURCE_PLACEHOLDER = ResourcePlaceholder.getInstance();
142150
private static final TimePlaceholder TIME_PLACEHOLDER = TimePlaceholder.getInstance();
143151
private static final ThingJsonPlaceholder THING_JSON_PLACEHOLDER = ThingJsonPlaceholder.getInstance();
152+
private static final HeadersPlaceholder HEADERS_PLACEHOLDER = PlaceholderFactory.newHeadersPlaceholder();
144153

145154
private final ActorRef clientActor;
146155
private final Connection connection;
@@ -787,22 +796,37 @@ private Collection<OutboundSignalWithSender> applyFilter(final OutboundSignalWit
787796
final TopicPath topicPath = DITTO_PROTOCOL_ADAPTER.toTopicPath(signal);
788797
final PlaceholderResolver<TopicPath> topicPathPlaceholderResolver = PlaceholderFactory
789798
.newPlaceholderResolver(TOPIC_PATH_PLACEHOLDER, topicPath);
799+
final PlaceholderResolver<EntityId> entityIdPlaceholderResolver = PlaceholderFactory
800+
.newPlaceholderResolver(ENTITY_ID_PLACEHOLDER,
801+
(signal instanceof WithEntityId withEntityId) ? withEntityId.getEntityId() : null);
802+
final PlaceholderResolver<EntityId> thingPlaceholderResolver = PlaceholderFactory
803+
.newPlaceholderResolver(THING_PLACEHOLDER,
804+
(signal instanceof WithEntityId withEntityId) ? withEntityId.getEntityId() : null);
805+
final PlaceholderResolver<Signal<?>> featurePlaceholderResolver = PlaceholderFactory
806+
.newPlaceholderResolver(FEATURE_PLACEHOLDER, signal);
790807
final PlaceholderResolver<WithResource> resourcePlaceholderResolver = PlaceholderFactory
791808
.newPlaceholderResolver(RESOURCE_PLACEHOLDER, signal);
792809
final PlaceholderResolver<Object> timePlaceholderResolver = PlaceholderFactory
793810
.newPlaceholderResolver(TIME_PLACEHOLDER, new Object());
811+
final PlaceholderResolver<Map<String, String>> headersPlaceholderResolver = PlaceholderFactory
812+
.newPlaceholderResolver(HEADERS_PLACEHOLDER, signal.getDittoHeaders());
794813
final DittoHeaders dittoHeaders = signal.getDittoHeaders();
795814
final Criteria criteria = QueryFilterCriteriaFactory.modelBased(RqlPredicateParser.getInstance(),
796-
topicPathPlaceholderResolver, resourcePlaceholderResolver, timePlaceholderResolver
815+
topicPathPlaceholderResolver, entityIdPlaceholderResolver, thingPlaceholderResolver,
816+
featurePlaceholderResolver, resourcePlaceholderResolver, timePlaceholderResolver,
817+
headersPlaceholderResolver
797818
).filterCriteria(filter.get(), dittoHeaders);
798819
return outboundSignalWithExtra.getExtra()
799820
.flatMap(extra -> ThingEventToThingConverter
800821
.mergeThingWithExtraFields(signal, extraFields.get(), extra)
801822
.filter(thing -> {
802-
final PlaceholderResolver<Thing> thingPlaceholderResolver = PlaceholderFactory
823+
final PlaceholderResolver<Thing> thingJsonPlaceholderResolver = PlaceholderFactory
803824
.newPlaceholderResolver(THING_JSON_PLACEHOLDER, thing);
804825
return ThingPredicateVisitor.apply(criteria, topicPathPlaceholderResolver,
805-
resourcePlaceholderResolver, timePlaceholderResolver, thingPlaceholderResolver)
826+
entityIdPlaceholderResolver, thingPlaceholderResolver,
827+
featurePlaceholderResolver, resourcePlaceholderResolver,
828+
timePlaceholderResolver, headersPlaceholderResolver,
829+
thingJsonPlaceholderResolver)
806830
.test(thing);
807831
})
808832
.map(thing -> outboundSignalWithExtra))

connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/persistence/SignalFilter.java

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,15 @@
1717

1818
import java.util.Collections;
1919
import java.util.List;
20+
import java.util.Map;
2021
import java.util.Optional;
2122
import java.util.Set;
2223
import java.util.function.Predicate;
2324

2425
import javax.annotation.Nullable;
2526

2627
import org.eclipse.ditto.base.model.auth.AuthorizationContext;
28+
import org.eclipse.ditto.base.model.entity.id.EntityId;
2729
import org.eclipse.ditto.base.model.entity.id.WithEntityId;
2830
import org.eclipse.ditto.base.model.headers.DittoHeaders;
2931
import org.eclipse.ditto.base.model.namespaces.NamespaceReader;
@@ -39,10 +41,14 @@
3941
import org.eclipse.ditto.connectivity.model.signals.announcements.ConnectivityAnnouncement;
4042
import org.eclipse.ditto.connectivity.service.messaging.monitoring.ConnectionMonitor;
4143
import org.eclipse.ditto.connectivity.service.messaging.monitoring.ConnectionMonitorRegistry;
44+
import org.eclipse.ditto.edge.service.placeholders.EntityIdPlaceholder;
45+
import org.eclipse.ditto.edge.service.placeholders.FeaturePlaceholder;
46+
import org.eclipse.ditto.edge.service.placeholders.ThingPlaceholder;
4247
import org.eclipse.ditto.json.JsonFieldSelector;
4348
import org.eclipse.ditto.json.JsonPointer;
4449
import org.eclipse.ditto.messages.model.signals.commands.MessageCommand;
4550
import org.eclipse.ditto.messages.model.signals.commands.MessageCommandResponse;
51+
import org.eclipse.ditto.placeholders.HeadersPlaceholder;
4652
import org.eclipse.ditto.placeholders.PlaceholderFactory;
4753
import org.eclipse.ditto.placeholders.PlaceholderResolver;
4854
import org.eclipse.ditto.placeholders.TimePlaceholder;
@@ -70,8 +76,12 @@ public final class SignalFilter {
7076

7177
private static final DittoProtocolAdapter DITTO_PROTOCOL_ADAPTER = DittoProtocolAdapter.newInstance();
7278
private static final TopicPathPlaceholder TOPIC_PATH_PLACEHOLDER = TopicPathPlaceholder.getInstance();
79+
private static final EntityIdPlaceholder ENTITY_ID_PLACEHOLDER = EntityIdPlaceholder.getInstance();
80+
private static final ThingPlaceholder THING_PLACEHOLDER = ThingPlaceholder.getInstance();
81+
private static final FeaturePlaceholder FEATURE_PLACEHOLDER = FeaturePlaceholder.getInstance();
7382
private static final ResourcePlaceholder RESOURCE_PLACEHOLDER = ResourcePlaceholder.getInstance();
7483
private static final TimePlaceholder TIME_PLACEHOLDER = TimePlaceholder.getInstance();
84+
private static final HeadersPlaceholder HEADERS_PLACEHOLDER = PlaceholderFactory.newHeadersPlaceholder();
7585

7686
private final Connection connection;
7787
private final ConnectionMonitorRegistry<ConnectionMonitor> connectionMonitorRegistry;
@@ -165,24 +175,40 @@ private static boolean matchesFilterBeforeEnrichment(final FilteredTopic filtere
165175
final TopicPath topicPath = DITTO_PROTOCOL_ADAPTER.toTopicPath(signal);
166176
final PlaceholderResolver<TopicPath> topicPathPlaceholderResolver =
167177
PlaceholderFactory.newPlaceholderResolver(TOPIC_PATH_PLACEHOLDER, topicPath);
178+
final PlaceholderResolver<EntityId> entityIdPlaceholderResolver = PlaceholderFactory
179+
.newPlaceholderResolver(ENTITY_ID_PLACEHOLDER,
180+
(signal instanceof WithEntityId withEntityId) ? withEntityId.getEntityId() : null);
181+
final PlaceholderResolver<EntityId> thingPlaceholderResolver = PlaceholderFactory
182+
.newPlaceholderResolver(THING_PLACEHOLDER,
183+
(signal instanceof WithEntityId withEntityId) ? withEntityId.getEntityId() : null);
184+
final PlaceholderResolver<Signal<?>> featurePlaceholderResolver = PlaceholderFactory
185+
.newPlaceholderResolver(FEATURE_PLACEHOLDER, signal);
168186
final PlaceholderResolver<WithResource> resourcePlaceholderResolver = PlaceholderFactory
169187
.newPlaceholderResolver(RESOURCE_PLACEHOLDER, signal);
170188
final PlaceholderResolver<Object> timePlaceholderResolver = PlaceholderFactory
171189
.newPlaceholderResolver(TIME_PLACEHOLDER, new Object());
190+
final PlaceholderResolver<Map<String, String>> headersPlaceholderResolver = PlaceholderFactory
191+
.newPlaceholderResolver(HEADERS_PLACEHOLDER, signal.getDittoHeaders());
172192
final Criteria criteria = parseCriteria(filterOptional.get(), signal.getDittoHeaders(),
173-
topicPathPlaceholderResolver, resourcePlaceholderResolver, timePlaceholderResolver);
193+
topicPathPlaceholderResolver, entityIdPlaceholderResolver, thingPlaceholderResolver,
194+
featurePlaceholderResolver, resourcePlaceholderResolver, timePlaceholderResolver,
195+
headersPlaceholderResolver);
174196
final Set<JsonPointer> extraFields = filteredTopic.getExtraFields()
175197
.map(JsonFieldSelector::getPointers)
176198
.orElse(Collections.emptySet());
177199
if (signal instanceof ThingEvent) {
178200
return ThingEventToThingConverter.thingEventToThing((ThingEvent<?>) signal)
179201
.filter(thing -> Thing3ValuePredicateVisitor.couldBeTrue(criteria, extraFields, thing,
180-
topicPathPlaceholderResolver, resourcePlaceholderResolver, timePlaceholderResolver))
202+
topicPathPlaceholderResolver, entityIdPlaceholderResolver, thingPlaceholderResolver,
203+
featurePlaceholderResolver, resourcePlaceholderResolver, timePlaceholderResolver,
204+
headersPlaceholderResolver))
181205
.isPresent();
182206
} else {
183207
final Thing emptyThing = Thing.newBuilder().build();
184208
return Thing3ValuePredicateVisitor.couldBeTrue(criteria, extraFields, emptyThing,
185-
topicPathPlaceholderResolver, resourcePlaceholderResolver, timePlaceholderResolver);
209+
topicPathPlaceholderResolver, entityIdPlaceholderResolver, thingPlaceholderResolver,
210+
featurePlaceholderResolver, resourcePlaceholderResolver, timePlaceholderResolver,
211+
headersPlaceholderResolver);
186212
}
187213
} else {
188214
return true;

connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/validation/ConnectionValidator.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,14 @@
2626

2727
import javax.annotation.concurrent.Immutable;
2828

29+
import org.apache.pekko.actor.ActorSystem;
30+
import org.apache.pekko.event.LoggingAdapter;
2931
import org.eclipse.ditto.base.model.acks.AcknowledgementLabel;
3032
import org.eclipse.ditto.base.model.acks.AcknowledgementLabelInvalidException;
3133
import org.eclipse.ditto.base.model.acks.AcknowledgementLabelNotUniqueException;
3234
import org.eclipse.ditto.base.model.acks.DittoAcknowledgementLabel;
3335
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
3436
import org.eclipse.ditto.base.model.headers.DittoHeaders;
35-
import org.eclipse.ditto.connectivity.service.placeholders.ConnectivityPlaceholders;
3637
import org.eclipse.ditto.connectivity.model.ClientCertificateCredentials;
3738
import org.eclipse.ditto.connectivity.model.Connection;
3839
import org.eclipse.ditto.connectivity.model.ConnectionConfigurationInvalidException;
@@ -52,6 +53,10 @@
5253
import org.eclipse.ditto.connectivity.service.config.mapping.MapperLimitsConfig;
5354
import org.eclipse.ditto.connectivity.service.messaging.internal.ssl.SSLContextCreator;
5455
import org.eclipse.ditto.connectivity.service.messaging.monitoring.logs.ConnectionLogger;
56+
import org.eclipse.ditto.connectivity.service.placeholders.ConnectivityPlaceholders;
57+
import org.eclipse.ditto.edge.service.placeholders.EntityIdPlaceholder;
58+
import org.eclipse.ditto.edge.service.placeholders.FeaturePlaceholder;
59+
import org.eclipse.ditto.edge.service.placeholders.ThingPlaceholder;
5560
import org.eclipse.ditto.placeholders.ExpressionResolver;
5661
import org.eclipse.ditto.placeholders.PlaceholderFactory;
5762
import org.eclipse.ditto.placeholders.TimePlaceholder;
@@ -60,9 +65,6 @@
6065
import org.eclipse.ditto.rql.parser.RqlPredicateParser;
6166
import org.eclipse.ditto.rql.query.filter.QueryFilterCriteriaFactory;
6267

63-
import org.apache.pekko.actor.ActorSystem;
64-
import org.apache.pekko.event.LoggingAdapter;
65-
6668
/**
6769
* Validate a connection according to its type.
6870
*/
@@ -83,7 +85,9 @@ private ConnectionValidator(LoggingAdapter loggingAdapter,
8385
.collect(Collectors.toMap(AbstractProtocolValidator::type, Function.identity()));
8486
this.specMap = Collections.unmodifiableMap(theSpecMap);
8587
queryFilterCriteriaFactory = QueryFilterCriteriaFactory.modelBased(RqlPredicateParser.getInstance(),
86-
TopicPathPlaceholder.getInstance(), ResourcePlaceholder.getInstance(), TimePlaceholder.getInstance());
88+
TopicPathPlaceholder.getInstance(), ResourcePlaceholder.getInstance(), TimePlaceholder.getInstance(),
89+
EntityIdPlaceholder.getInstance(), ThingPlaceholder.getInstance(), FeaturePlaceholder.getInstance(),
90+
PlaceholderFactory.newHeadersPlaceholder());
8791
}
8892

8993
/**

connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/TestConstants.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,7 @@
142142
import org.eclipse.ditto.things.model.ThingId;
143143
import org.eclipse.ditto.things.model.signals.commands.modify.ModifyThing;
144144
import org.eclipse.ditto.things.model.signals.events.FeatureDesiredPropertiesModified;
145+
import org.eclipse.ditto.things.model.signals.events.FeaturePropertiesModified;
145146
import org.eclipse.ditto.things.model.signals.events.ThingModified;
146147
import org.eclipse.ditto.things.model.signals.events.ThingModifiedEvent;
147148
import org.mockito.Mockito;
@@ -331,6 +332,8 @@ public static final class Things {
331332
public static final class Feature {
332333

333334
public static final String FEATURE_ID = "Feature";
335+
public static final FeatureProperties FEATURE_PROPERTIES = FeatureProperties.newBuilder()
336+
.set("property", "test").build();
334337
public static final FeatureProperties FEATURE_DESIRED_PROPERTIES = FeatureProperties.newBuilder()
335338
.set("property", "test").build();
336339

@@ -996,8 +999,14 @@ public static ThingModifiedEvent<?> thingModified(final Collection<Authorization
996999
TestConstants.INSTANT, dittoHeaders, TestConstants.METADATA);
9971000
}
9981001

1002+
public static ThingModifiedEvent<?> featurePropertiesModified(Collection<AuthorizationSubject> readSubjects) {
1003+
final DittoHeaders dittoHeaders = DittoHeaders.newBuilder().readGrantedSubjects(readSubjects).build();
1004+
return FeaturePropertiesModified.of(Things.THING_ID, Feature.FEATURE_ID,
1005+
Feature.FEATURE_PROPERTIES, 1, null, dittoHeaders, null);
1006+
}
1007+
9991008
public static ThingModifiedEvent<?> featureDesiredPropertiesModified(Collection<AuthorizationSubject> readSubjects) {
1000-
DittoHeaders dittoHeaders = DittoHeaders.newBuilder().readGrantedSubjects(readSubjects).build();
1009+
final DittoHeaders dittoHeaders = DittoHeaders.newBuilder().readGrantedSubjects(readSubjects).build();
10011010
return FeatureDesiredPropertiesModified.of(Things.THING_ID, Feature.FEATURE_ID,
10021011
Feature.FEATURE_DESIRED_PROPERTIES, 1, null, dittoHeaders, null);
10031012
}

connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/persistence/SignalFilterWithFilterTest.java

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -313,16 +313,36 @@ public void applySignalFilterForMessagesWithExtraFieldsAndRqlFilter() {
313313
*/
314314
@Test
315315
public void applySignalFilterOnFeatureDesiredPropertiesModified() {
316-
Target target = ConnectivityModelFactory.newTargetBuilder().address("address")
316+
final Target target = ConnectivityModelFactory.newTargetBuilder().address("address")
317317
.authorizationContext(newAuthContext(DittoAuthorizationContextType.UNSPECIFIED, AUTHORIZED))
318318
.topics(ConnectivityModelFactory.newFilteredTopicBuilder(TWIN_EVENTS)
319319
.withFilter("like(resource:path,'/features/" + TestConstants.Feature.FEATURE_ID + "*')")
320320
.build()).build();
321-
Connection connection = TestConstants.createConnection(CONNECTION_ID, target);
322-
SignalFilter signalFilter = new SignalFilter(connection, connectionMonitorRegistry);
323-
Signal<?> signal = TestConstants.featureDesiredPropertiesModified(Collections.singletonList(AUTHORIZED));
321+
final Connection connection = TestConstants.createConnection(CONNECTION_ID, target);
322+
final SignalFilter signalFilter = new SignalFilter(connection, connectionMonitorRegistry);
323+
final Signal<?> signal = TestConstants.featureDesiredPropertiesModified(Collections.singletonList(AUTHORIZED));
324+
325+
final List<Target> filteredTargets = signalFilter.filter(signal);
326+
Assertions.assertThat(filteredTargets).hasSize(1).contains(target);
327+
}
328+
329+
/**
330+
* Test that target filtering works using feature:id placeholder
331+
*/
332+
@Test
333+
public void applySignalFilterWithFeatureIdPlaceholder() {
334+
Target target = ConnectivityModelFactory.newTargetBuilder().address("address")
335+
.authorizationContext(newAuthContext(DittoAuthorizationContextType.UNSPECIFIED, AUTHORIZED))
336+
.topics(ConnectivityModelFactory.newFilteredTopicBuilder(TWIN_EVENTS)
337+
.withFilter("eq(feature:id,'Feature')")
338+
.build()
339+
)
340+
.build();
341+
final Connection connection = TestConstants.createConnection(CONNECTION_ID, target);
342+
final SignalFilter signalFilter = new SignalFilter(connection, connectionMonitorRegistry);
343+
final Signal<?> signal = TestConstants.featurePropertiesModified(Collections.singletonList(AUTHORIZED));
324344

325-
List<Target> filteredTargets = signalFilter.filter(signal);
345+
final List<Target> filteredTargets = signalFilter.filter(signal);
326346
Assertions.assertThat(filteredTargets).hasSize(1).contains(target);
327347
}
328348
}

0 commit comments

Comments
 (0)