Skip to content

Commit a98c611

Browse files
committed
#1931: fix that not all placeholders were supported in connection target filtering
1 parent f2b8dc0 commit a98c611

File tree

7 files changed

+113
-23
lines changed

7 files changed

+113
-23
lines changed

connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/OutboundMappingProcessorActor.java

+22-3
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,10 @@
8686
import org.eclipse.ditto.connectivity.service.messaging.validation.ConnectionValidator;
8787
import org.eclipse.ditto.connectivity.service.util.ConnectivityMdcEntryKey;
8888
import org.eclipse.ditto.edge.service.headers.DittoHeadersValidator;
89+
import org.eclipse.ditto.edge.service.placeholders.EntityIdPlaceholder;
90+
import org.eclipse.ditto.edge.service.placeholders.FeaturePlaceholder;
8991
import org.eclipse.ditto.edge.service.placeholders.ThingJsonPlaceholder;
92+
import org.eclipse.ditto.edge.service.placeholders.ThingPlaceholder;
9093
import org.eclipse.ditto.internal.models.signalenrichment.SignalEnrichmentFacade;
9194
import org.eclipse.ditto.internal.utils.config.ScopedConfig;
9295
import org.eclipse.ditto.internal.utils.pekko.controlflow.AbstractGraphActor;
@@ -99,6 +102,7 @@
99102
import org.eclipse.ditto.json.JsonPointer;
100103
import org.eclipse.ditto.json.JsonValue;
101104
import org.eclipse.ditto.placeholders.ExpressionResolver;
105+
import org.eclipse.ditto.placeholders.HeadersPlaceholder;
102106
import org.eclipse.ditto.placeholders.PipelineElement;
103107
import org.eclipse.ditto.placeholders.PlaceholderFactory;
104108
import org.eclipse.ditto.placeholders.PlaceholderResolver;
@@ -138,9 +142,13 @@ public final class OutboundMappingProcessorActor
138142

139143
private static final DittoProtocolAdapter DITTO_PROTOCOL_ADAPTER = DittoProtocolAdapter.newInstance();
140144
private static final TopicPathPlaceholder TOPIC_PATH_PLACEHOLDER = TopicPathPlaceholder.getInstance();
145+
private static final EntityIdPlaceholder ENTITY_ID_PLACEHOLDER = EntityIdPlaceholder.getInstance();
146+
private static final ThingPlaceholder THING_PLACEHOLDER = ThingPlaceholder.getInstance();
147+
private static final FeaturePlaceholder FEATURE_PLACEHOLDER = FeaturePlaceholder.getInstance();
141148
private static final ResourcePlaceholder RESOURCE_PLACEHOLDER = ResourcePlaceholder.getInstance();
142149
private static final TimePlaceholder TIME_PLACEHOLDER = TimePlaceholder.getInstance();
143150
private static final ThingJsonPlaceholder THING_JSON_PLACEHOLDER = ThingJsonPlaceholder.getInstance();
151+
private static final HeadersPlaceholder HEADERS_PLACEHOLDER = PlaceholderFactory.newHeadersPlaceholder();
144152

145153
private final ActorRef clientActor;
146154
private final Connection connection;
@@ -787,22 +795,33 @@ private Collection<OutboundSignalWithSender> applyFilter(final OutboundSignalWit
787795
final TopicPath topicPath = DITTO_PROTOCOL_ADAPTER.toTopicPath(signal);
788796
final PlaceholderResolver<TopicPath> topicPathPlaceholderResolver = PlaceholderFactory
789797
.newPlaceholderResolver(TOPIC_PATH_PLACEHOLDER, topicPath);
798+
final PlaceholderResolver<EntityId> entityIdPlaceholderResolver = PlaceholderFactory
799+
.newPlaceholderResolver(ENTITY_ID_PLACEHOLDER,
800+
(signal instanceof WithEntityId withEntityId) ? withEntityId.getEntityId() : null);
801+
final PlaceholderResolver<EntityId> thingPlaceholderResolver = PlaceholderFactory
802+
.newPlaceholderResolver(THING_PLACEHOLDER,
803+
(signal instanceof WithEntityId withEntityId) ? withEntityId.getEntityId() : null);
804+
final PlaceholderResolver<Signal<?>> featurePlaceholderResolver = PlaceholderFactory
805+
.newPlaceholderResolver(FEATURE_PLACEHOLDER, signal);
790806
final PlaceholderResolver<WithResource> resourcePlaceholderResolver = PlaceholderFactory
791807
.newPlaceholderResolver(RESOURCE_PLACEHOLDER, signal);
792808
final PlaceholderResolver<Object> timePlaceholderResolver = PlaceholderFactory
793809
.newPlaceholderResolver(TIME_PLACEHOLDER, new Object());
794810
final DittoHeaders dittoHeaders = signal.getDittoHeaders();
795811
final Criteria criteria = QueryFilterCriteriaFactory.modelBased(RqlPredicateParser.getInstance(),
796-
topicPathPlaceholderResolver, resourcePlaceholderResolver, timePlaceholderResolver
812+
topicPathPlaceholderResolver, entityIdPlaceholderResolver, thingPlaceholderResolver,
813+
featurePlaceholderResolver, resourcePlaceholderResolver, timePlaceholderResolver
797814
).filterCriteria(filter.get(), dittoHeaders);
798815
return outboundSignalWithExtra.getExtra()
799816
.flatMap(extra -> ThingEventToThingConverter
800817
.mergeThingWithExtraFields(signal, extraFields.get(), extra)
801818
.filter(thing -> {
802-
final PlaceholderResolver<Thing> thingPlaceholderResolver = PlaceholderFactory
819+
final PlaceholderResolver<Thing> thingJsonPlaceholderResolver = PlaceholderFactory
803820
.newPlaceholderResolver(THING_JSON_PLACEHOLDER, thing);
804821
return ThingPredicateVisitor.apply(criteria, topicPathPlaceholderResolver,
805-
resourcePlaceholderResolver, timePlaceholderResolver, thingPlaceholderResolver)
822+
entityIdPlaceholderResolver, thingPlaceholderResolver,
823+
featurePlaceholderResolver, resourcePlaceholderResolver,
824+
timePlaceholderResolver, thingJsonPlaceholderResolver)
806825
.test(thing);
807826
})
808827
.map(thing -> outboundSignalWithExtra))

connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/persistence/SignalFilter.java

+21-3
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import javax.annotation.Nullable;
2525

2626
import org.eclipse.ditto.base.model.auth.AuthorizationContext;
27+
import org.eclipse.ditto.base.model.entity.id.EntityId;
2728
import org.eclipse.ditto.base.model.entity.id.WithEntityId;
2829
import org.eclipse.ditto.base.model.headers.DittoHeaders;
2930
import org.eclipse.ditto.base.model.namespaces.NamespaceReader;
@@ -39,6 +40,9 @@
3940
import org.eclipse.ditto.connectivity.model.signals.announcements.ConnectivityAnnouncement;
4041
import org.eclipse.ditto.connectivity.service.messaging.monitoring.ConnectionMonitor;
4142
import org.eclipse.ditto.connectivity.service.messaging.monitoring.ConnectionMonitorRegistry;
43+
import org.eclipse.ditto.edge.service.placeholders.EntityIdPlaceholder;
44+
import org.eclipse.ditto.edge.service.placeholders.FeaturePlaceholder;
45+
import org.eclipse.ditto.edge.service.placeholders.ThingPlaceholder;
4246
import org.eclipse.ditto.json.JsonFieldSelector;
4347
import org.eclipse.ditto.json.JsonPointer;
4448
import org.eclipse.ditto.messages.model.signals.commands.MessageCommand;
@@ -70,6 +74,9 @@ public final class SignalFilter {
7074

7175
private static final DittoProtocolAdapter DITTO_PROTOCOL_ADAPTER = DittoProtocolAdapter.newInstance();
7276
private static final TopicPathPlaceholder TOPIC_PATH_PLACEHOLDER = TopicPathPlaceholder.getInstance();
77+
private static final EntityIdPlaceholder ENTITY_ID_PLACEHOLDER = EntityIdPlaceholder.getInstance();
78+
private static final ThingPlaceholder THING_PLACEHOLDER = ThingPlaceholder.getInstance();
79+
private static final FeaturePlaceholder FEATURE_PLACEHOLDER = FeaturePlaceholder.getInstance();
7380
private static final ResourcePlaceholder RESOURCE_PLACEHOLDER = ResourcePlaceholder.getInstance();
7481
private static final TimePlaceholder TIME_PLACEHOLDER = TimePlaceholder.getInstance();
7582

@@ -165,24 +172,35 @@ private static boolean matchesFilterBeforeEnrichment(final FilteredTopic filtere
165172
final TopicPath topicPath = DITTO_PROTOCOL_ADAPTER.toTopicPath(signal);
166173
final PlaceholderResolver<TopicPath> topicPathPlaceholderResolver =
167174
PlaceholderFactory.newPlaceholderResolver(TOPIC_PATH_PLACEHOLDER, topicPath);
175+
final PlaceholderResolver<EntityId> entityIdPlaceholderResolver = PlaceholderFactory
176+
.newPlaceholderResolver(ENTITY_ID_PLACEHOLDER,
177+
(signal instanceof WithEntityId withEntityId) ? withEntityId.getEntityId() : null);
178+
final PlaceholderResolver<EntityId> thingPlaceholderResolver = PlaceholderFactory
179+
.newPlaceholderResolver(THING_PLACEHOLDER,
180+
(signal instanceof WithEntityId withEntityId) ? withEntityId.getEntityId() : null);
181+
final PlaceholderResolver<Signal<?>> featurePlaceholderResolver = PlaceholderFactory
182+
.newPlaceholderResolver(FEATURE_PLACEHOLDER, signal);
168183
final PlaceholderResolver<WithResource> resourcePlaceholderResolver = PlaceholderFactory
169184
.newPlaceholderResolver(RESOURCE_PLACEHOLDER, signal);
170185
final PlaceholderResolver<Object> timePlaceholderResolver = PlaceholderFactory
171186
.newPlaceholderResolver(TIME_PLACEHOLDER, new Object());
172187
final Criteria criteria = parseCriteria(filterOptional.get(), signal.getDittoHeaders(),
173-
topicPathPlaceholderResolver, resourcePlaceholderResolver, timePlaceholderResolver);
188+
topicPathPlaceholderResolver, entityIdPlaceholderResolver, thingPlaceholderResolver,
189+
featurePlaceholderResolver, resourcePlaceholderResolver, timePlaceholderResolver);
174190
final Set<JsonPointer> extraFields = filteredTopic.getExtraFields()
175191
.map(JsonFieldSelector::getPointers)
176192
.orElse(Collections.emptySet());
177193
if (signal instanceof ThingEvent) {
178194
return ThingEventToThingConverter.thingEventToThing((ThingEvent<?>) signal)
179195
.filter(thing -> Thing3ValuePredicateVisitor.couldBeTrue(criteria, extraFields, thing,
180-
topicPathPlaceholderResolver, resourcePlaceholderResolver, timePlaceholderResolver))
196+
topicPathPlaceholderResolver, entityIdPlaceholderResolver, thingPlaceholderResolver,
197+
featurePlaceholderResolver, resourcePlaceholderResolver, timePlaceholderResolver))
181198
.isPresent();
182199
} else {
183200
final Thing emptyThing = Thing.newBuilder().build();
184201
return Thing3ValuePredicateVisitor.couldBeTrue(criteria, extraFields, emptyThing,
185-
topicPathPlaceholderResolver, resourcePlaceholderResolver, timePlaceholderResolver);
202+
topicPathPlaceholderResolver, entityIdPlaceholderResolver, thingPlaceholderResolver,
203+
featurePlaceholderResolver, resourcePlaceholderResolver, timePlaceholderResolver);
186204
}
187205
} else {
188206
return true;

connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/validation/ConnectionValidator.java

+8-5
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,14 @@
2626

2727
import javax.annotation.concurrent.Immutable;
2828

29+
import org.apache.pekko.actor.ActorSystem;
30+
import org.apache.pekko.event.LoggingAdapter;
2931
import org.eclipse.ditto.base.model.acks.AcknowledgementLabel;
3032
import org.eclipse.ditto.base.model.acks.AcknowledgementLabelInvalidException;
3133
import org.eclipse.ditto.base.model.acks.AcknowledgementLabelNotUniqueException;
3234
import org.eclipse.ditto.base.model.acks.DittoAcknowledgementLabel;
3335
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
3436
import org.eclipse.ditto.base.model.headers.DittoHeaders;
35-
import org.eclipse.ditto.connectivity.service.placeholders.ConnectivityPlaceholders;
3637
import org.eclipse.ditto.connectivity.model.ClientCertificateCredentials;
3738
import org.eclipse.ditto.connectivity.model.Connection;
3839
import org.eclipse.ditto.connectivity.model.ConnectionConfigurationInvalidException;
@@ -52,6 +53,10 @@
5253
import org.eclipse.ditto.connectivity.service.config.mapping.MapperLimitsConfig;
5354
import org.eclipse.ditto.connectivity.service.messaging.internal.ssl.SSLContextCreator;
5455
import org.eclipse.ditto.connectivity.service.messaging.monitoring.logs.ConnectionLogger;
56+
import org.eclipse.ditto.connectivity.service.placeholders.ConnectivityPlaceholders;
57+
import org.eclipse.ditto.edge.service.placeholders.EntityIdPlaceholder;
58+
import org.eclipse.ditto.edge.service.placeholders.FeaturePlaceholder;
59+
import org.eclipse.ditto.edge.service.placeholders.ThingPlaceholder;
5560
import org.eclipse.ditto.placeholders.ExpressionResolver;
5661
import org.eclipse.ditto.placeholders.PlaceholderFactory;
5762
import org.eclipse.ditto.placeholders.TimePlaceholder;
@@ -60,9 +65,6 @@
6065
import org.eclipse.ditto.rql.parser.RqlPredicateParser;
6166
import org.eclipse.ditto.rql.query.filter.QueryFilterCriteriaFactory;
6267

63-
import org.apache.pekko.actor.ActorSystem;
64-
import org.apache.pekko.event.LoggingAdapter;
65-
6668
/**
6769
* Validate a connection according to its type.
6870
*/
@@ -83,7 +85,8 @@ private ConnectionValidator(LoggingAdapter loggingAdapter,
8385
.collect(Collectors.toMap(AbstractProtocolValidator::type, Function.identity()));
8486
this.specMap = Collections.unmodifiableMap(theSpecMap);
8587
queryFilterCriteriaFactory = QueryFilterCriteriaFactory.modelBased(RqlPredicateParser.getInstance(),
86-
TopicPathPlaceholder.getInstance(), ResourcePlaceholder.getInstance(), TimePlaceholder.getInstance());
88+
TopicPathPlaceholder.getInstance(), ResourcePlaceholder.getInstance(), TimePlaceholder.getInstance(),
89+
EntityIdPlaceholder.getInstance(), ThingPlaceholder.getInstance(), FeaturePlaceholder.getInstance());
8790
}
8891

8992
/**

connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/TestConstants.java

+10-1
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,7 @@
142142
import org.eclipse.ditto.things.model.ThingId;
143143
import org.eclipse.ditto.things.model.signals.commands.modify.ModifyThing;
144144
import org.eclipse.ditto.things.model.signals.events.FeatureDesiredPropertiesModified;
145+
import org.eclipse.ditto.things.model.signals.events.FeaturePropertiesModified;
145146
import org.eclipse.ditto.things.model.signals.events.ThingModified;
146147
import org.eclipse.ditto.things.model.signals.events.ThingModifiedEvent;
147148
import org.mockito.Mockito;
@@ -331,6 +332,8 @@ public static final class Things {
331332
public static final class Feature {
332333

333334
public static final String FEATURE_ID = "Feature";
335+
public static final FeatureProperties FEATURE_PROPERTIES = FeatureProperties.newBuilder()
336+
.set("property", "test").build();
334337
public static final FeatureProperties FEATURE_DESIRED_PROPERTIES = FeatureProperties.newBuilder()
335338
.set("property", "test").build();
336339

@@ -996,8 +999,14 @@ public static ThingModifiedEvent<?> thingModified(final Collection<Authorization
996999
TestConstants.INSTANT, dittoHeaders, TestConstants.METADATA);
9971000
}
9981001

1002+
public static ThingModifiedEvent<?> featurePropertiesModified(Collection<AuthorizationSubject> readSubjects) {
1003+
final DittoHeaders dittoHeaders = DittoHeaders.newBuilder().readGrantedSubjects(readSubjects).build();
1004+
return FeaturePropertiesModified.of(Things.THING_ID, Feature.FEATURE_ID,
1005+
Feature.FEATURE_PROPERTIES, 1, null, dittoHeaders, null);
1006+
}
1007+
9991008
public static ThingModifiedEvent<?> featureDesiredPropertiesModified(Collection<AuthorizationSubject> readSubjects) {
1000-
DittoHeaders dittoHeaders = DittoHeaders.newBuilder().readGrantedSubjects(readSubjects).build();
1009+
final DittoHeaders dittoHeaders = DittoHeaders.newBuilder().readGrantedSubjects(readSubjects).build();
10011010
return FeatureDesiredPropertiesModified.of(Things.THING_ID, Feature.FEATURE_ID,
10021011
Feature.FEATURE_DESIRED_PROPERTIES, 1, null, dittoHeaders, null);
10031012
}

connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/persistence/SignalFilterWithFilterTest.java

+25-5
Original file line numberDiff line numberDiff line change
@@ -313,16 +313,36 @@ public void applySignalFilterForMessagesWithExtraFieldsAndRqlFilter() {
313313
*/
314314
@Test
315315
public void applySignalFilterOnFeatureDesiredPropertiesModified() {
316-
Target target = ConnectivityModelFactory.newTargetBuilder().address("address")
316+
final Target target = ConnectivityModelFactory.newTargetBuilder().address("address")
317317
.authorizationContext(newAuthContext(DittoAuthorizationContextType.UNSPECIFIED, AUTHORIZED))
318318
.topics(ConnectivityModelFactory.newFilteredTopicBuilder(TWIN_EVENTS)
319319
.withFilter("like(resource:path,'/features/" + TestConstants.Feature.FEATURE_ID + "*')")
320320
.build()).build();
321-
Connection connection = TestConstants.createConnection(CONNECTION_ID, target);
322-
SignalFilter signalFilter = new SignalFilter(connection, connectionMonitorRegistry);
323-
Signal<?> signal = TestConstants.featureDesiredPropertiesModified(Collections.singletonList(AUTHORIZED));
321+
final Connection connection = TestConstants.createConnection(CONNECTION_ID, target);
322+
final SignalFilter signalFilter = new SignalFilter(connection, connectionMonitorRegistry);
323+
final Signal<?> signal = TestConstants.featureDesiredPropertiesModified(Collections.singletonList(AUTHORIZED));
324+
325+
final List<Target> filteredTargets = signalFilter.filter(signal);
326+
Assertions.assertThat(filteredTargets).hasSize(1).contains(target);
327+
}
328+
329+
/**
330+
* Test that target filtering works using feature:id placeholder
331+
*/
332+
@Test
333+
public void applySignalFilterWithFeatureIdPlaceholder() {
334+
Target target = ConnectivityModelFactory.newTargetBuilder().address("address")
335+
.authorizationContext(newAuthContext(DittoAuthorizationContextType.UNSPECIFIED, AUTHORIZED))
336+
.topics(ConnectivityModelFactory.newFilteredTopicBuilder(TWIN_EVENTS)
337+
.withFilter("eq(feature:id,'Feature')")
338+
.build()
339+
)
340+
.build();
341+
final Connection connection = TestConstants.createConnection(CONNECTION_ID, target);
342+
final SignalFilter signalFilter = new SignalFilter(connection, connectionMonitorRegistry);
343+
final Signal<?> signal = TestConstants.featurePropertiesModified(Collections.singletonList(AUTHORIZED));
324344

325-
List<Target> filteredTargets = signalFilter.filter(signal);
345+
final List<Target> filteredTargets = signalFilter.filter(signal);
326346
Assertions.assertThat(filteredTargets).hasSize(1).contains(target);
327347
}
328348
}

connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/validation/ConnectionValidatorTest.java

+20-5
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,10 @@
3434
import java.util.stream.Collectors;
3535
import java.util.stream.IntStream;
3636

37+
import org.apache.pekko.actor.ActorSystem;
38+
import org.apache.pekko.event.LoggingAdapter;
39+
import org.apache.pekko.http.javadsl.model.Uri;
40+
import org.apache.pekko.testkit.javadsl.TestKit;
3741
import org.eclipse.ditto.base.model.acks.AcknowledgementLabel;
3842
import org.eclipse.ditto.base.model.acks.AcknowledgementLabelInvalidException;
3943
import org.eclipse.ditto.base.model.acks.AcknowledgementLabelNotUniqueException;
@@ -75,11 +79,6 @@
7579
import com.typesafe.config.ConfigFactory;
7680
import com.typesafe.config.ConfigValueFactory;
7781

78-
import org.apache.pekko.actor.ActorSystem;
79-
import org.apache.pekko.event.LoggingAdapter;
80-
import org.apache.pekko.http.javadsl.model.Uri;
81-
import org.apache.pekko.testkit.javadsl.TestKit;
82-
8382
/**
8483
* Tests {@link ConnectionValidator}.
8584
*/
@@ -459,6 +458,22 @@ private static Connection createConnection(final ConnectionId connectionId) {
459458
.build();
460459
}
461460

461+
@Test
462+
public void acceptValidConnectionWithValidTargetFilterContainingPlaceholders() {
463+
final List<Target> targetWithValidFilter = singletonList(
464+
ConnectivityModelFactory.newTargetBuilder(TestConstants.Targets.TWIN_TARGET)
465+
.topics(ConnectivityModelFactory.newFilteredTopicBuilder(Topic.TWIN_EVENTS)
466+
.withFilter("and(exists(feature:id),eq(thing:namespace,'org.eclipse.ditto'))")
467+
.build())
468+
.build());
469+
final Connection connection = createConnection(CONNECTION_ID)
470+
.toBuilder()
471+
.setTargets(targetWithValidFilter)
472+
.build();
473+
final ConnectionValidator underTest = getConnectionValidator();
474+
underTest.validate(connection, DittoHeaders.empty(), actorSystem);
475+
}
476+
462477
@Test
463478
public void acceptValidConnectionWithValidNumberPayloadMapping() {
464479
final Connection connection = createConnection(CONNECTION_ID)

0 commit comments

Comments
 (0)