Skip to content

Commit 335a26c

Browse files
authored
Merge pull request #1900 from eclipse-ditto/bugfix/1894-pubsub-add-namespaces-to-topic
#1894 optimize Ditto internal pub/sub by adding subscribed for namespaces to topic
2 parents 591e7a0 + b4c6d58 commit 335a26c

File tree

12 files changed

+132
-53
lines changed

12 files changed

+132
-53
lines changed

connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/BaseClientActor.java

+14-7
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,6 @@
7474
import org.eclipse.ditto.base.model.acks.AcknowledgementLabel;
7575
import org.eclipse.ditto.base.model.acks.FatalPubSubException;
7676
import org.eclipse.ditto.base.model.acks.PubSubTerminatedException;
77-
import org.eclipse.ditto.base.model.auth.AuthorizationContext;
7877
import org.eclipse.ditto.base.model.entity.id.EntityId;
7978
import org.eclipse.ditto.base.model.entity.id.WithEntityId;
8079
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
@@ -146,6 +145,7 @@
146145
import org.eclipse.ditto.internal.utils.pekko.logging.ThreadSafeDittoLoggingAdapter;
147146
import org.eclipse.ditto.internal.utils.protocol.ProtocolAdapterProvider;
148147
import org.eclipse.ditto.internal.utils.pubsub.StreamingType;
148+
import org.eclipse.ditto.internal.utils.pubsub.extractors.ReadSubjectExtractor;
149149
import org.eclipse.ditto.internal.utils.pubsubthings.DittoProtocolSub;
150150
import org.eclipse.ditto.internal.utils.search.SubscriptionManager;
151151
import org.eclipse.ditto.protocol.adapter.ProtocolAdapter;
@@ -181,7 +181,7 @@ public abstract class BaseClientActor extends AbstractFSMWithStash<BaseClientSta
181181
private static final Pattern EXCLUDED_ADDRESS_REPORTING_CHILD_NAME_PATTERN = Pattern.compile(
182182
OutboundMappingProcessorActor.ACTOR_NAME + "|" + OutboundDispatchingActor.ACTOR_NAME + "|" +
183183
"ackr.*" + "|" + "StreamSupervisor-.*|" +
184-
SubscriptionManager.ACTOR_NAME + "|" + StreamingSubscriptionManager.ACTOR_NAME);
184+
SubscriptionManager.ACTOR_NAME + "|" + StreamingSubscriptionManager.ACTOR_NAME);
185185
private static final String DITTO_STATE_TIMEOUT_TIMER = "dittoStateTimeout";
186186

187187
private static final int SOCKET_CHECK_TIMEOUT_MS = 2000;
@@ -330,7 +330,8 @@ protected void init() {
330330
inboundMappingSink = getInboundMappingSink(protocolAdapter, inboundDispatchingSink);
331331
subscriptionManager =
332332
startSubscriptionManager(commandForwarderActorSelection, connectivityConfig().getClientConfig());
333-
streamingSubscriptionManager = startStreamingSubscriptionManager(commandForwarderActorSelection, connectivityConfig().getClientConfig());
333+
streamingSubscriptionManager = startStreamingSubscriptionManager(commandForwarderActorSelection,
334+
connectivityConfig().getClientConfig());
334335

335336
if (connection.getSshTunnel().map(SshTunnel::isEnabled).orElse(false)) {
336337
tunnelActor = childActorNanny.startChildActor(SshTunnelActor.ACTOR_NAME,
@@ -1924,7 +1925,8 @@ private ActorRef startSubscriptionManager(final ActorSelection proxyActor, final
19241925
*
19251926
* @return reference of the streaming subscription manager.
19261927
*/
1927-
private ActorRef startStreamingSubscriptionManager(final ActorSelection proxyActor, final ClientConfig clientConfig) {
1928+
private ActorRef startStreamingSubscriptionManager(final ActorSelection proxyActor,
1929+
final ClientConfig clientConfig) {
19281930
final var mat = Materializer.createMaterializer(this::getContext);
19291931
final var props = StreamingSubscriptionManager.props(clientConfig.getSubscriptionManagerTimeout(),
19301932
proxyActor, mat);
@@ -2098,9 +2100,14 @@ private String getPubsubGroup() {
20982100
private Set<String> getTargetAuthSubjects() {
20992101
return connection.getTargets()
21002102
.stream()
2101-
.map(Target::getAuthorizationContext)
2102-
.map(AuthorizationContext::getAuthorizationSubjectIds)
2103-
.flatMap(List::stream)
2103+
.map(target -> {
2104+
final Set<String> namespaces = target.getTopics().stream()
2105+
.flatMap(ft -> ft.getNamespaces().stream())
2106+
.collect(Collectors.toSet());
2107+
return ReadSubjectExtractor
2108+
.determineTopicsFor(namespaces, target.getAuthorizationContext().getAuthorizationSubjects());
2109+
})
2110+
.flatMap(Collection::stream)
21042111
.collect(Collectors.toSet());
21052112
}
21062113

gateway/service/src/main/java/org/eclipse/ditto/gateway/service/endpoints/routes/sse/ThingsSseRouteBuilder.java

+7-4
Original file line numberDiff line numberDiff line change
@@ -297,7 +297,9 @@ private Route buildThingsSseRoute(final RequestContext ctx,
297297
jsonPointerString -> {
298298
if (INBOX_OUTBOX_PATTERN.matcher(jsonPointerString).matches()) {
299299
return createMessagesSseRoute(ctx, dhcs, thingId,
300-
jsonPointerString);
300+
jsonPointerString,
301+
getNamespaces(parameters.get(PARAM_NAMESPACES))
302+
);
301303
} else {
302304
params.put(PARAM_FIELDS, jsonPointerString);
303305
return createSseRoute(ctx, dhcs,
@@ -428,7 +430,7 @@ private Route createSseRoute(final RequestContext ctx, final CompletionStage<Dit
428430
connectionCorrelationId, dittoHeaders);
429431
final var connect = new Connect(withQueue.getSourceQueue(), connectionCorrelationId,
430432
STREAMING_TYPE_SSE, jsonSchemaVersion, null, Set.of(),
431-
authorizationContext, null);
433+
authorizationContext, namespaces, null);
432434
Patterns.ask(streamingActor, connect, LOCAL_ASK_TIMEOUT)
433435
.thenApply(ActorRef.class::cast)
434436
.thenAccept(streamingSessionActor ->
@@ -459,7 +461,8 @@ private Route createSseRoute(final RequestContext ctx, final CompletionStage<Dit
459461
private Route createMessagesSseRoute(final RequestContext ctx,
460462
final CompletionStage<DittoHeaders> dittoHeadersStage,
461463
final String thingId,
462-
final String messagePath) {
464+
final String messagePath,
465+
final List<String> namespaces) {
463466

464467
final List<ThingId> targetThingIds = List.of(ThingId.of(thingId));
465468
final CompletionStage<SignalEnrichmentFacade> facadeStage = signalEnrichmentProvider == null
@@ -490,7 +493,7 @@ private Route createMessagesSseRoute(final RequestContext ctx,
490493
final var connect =
491494
new Connect(withQueue.getSourceQueue(), connectionCorrelationId,
492495
STREAMING_TYPE_SSE, jsonSchemaVersion, null, Set.of(),
493-
authorizationContext, null);
496+
authorizationContext, namespaces, null);
494497
final String resourcePathRqlStatement;
495498
if (INBOX_OUTBOX_WITH_SUBJECT_PATTERN.matcher(messagePath).matches()) {
496499
resourcePathRqlStatement = String.format(

gateway/service/src/main/java/org/eclipse/ditto/gateway/service/endpoints/routes/websocket/WebSocketRoute.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import java.time.Duration;
1919
import java.util.Collection;
2020
import java.util.Collections;
21+
import java.util.List;
2122
import java.util.Map;
2223
import java.util.Optional;
2324
import java.util.Set;
@@ -575,7 +576,7 @@ private Pair<Connect, Flow<DittoRuntimeException, Message, NotUsed>> createOutgo
575576
return new Connect(withQueue.getSourceQueue(), connectionCorrelationId, STREAMING_TYPE_WS,
576577
version, optJsonWebToken.map(JsonWebToken::getExpirationTime).orElse(null),
577578
readDeclaredAcknowledgementLabels(additionalHeaders), connectionAuthContext,
578-
wsKillSwitch);
579+
List.of(), wsKillSwitch);
579580
})
580581
.recoverWithRetries(1, new PFBuilder<Throwable, Source<SessionedJsonifiable, NotUsed>>()
581582
.match(GatewayWebsocketSessionAbortedException.class,

gateway/service/src/main/java/org/eclipse/ditto/gateway/service/streaming/actors/StreamingSessionActor.java

+19-3
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@
8888
import org.eclipse.ditto.internal.utils.pekko.logging.DittoLoggerFactory;
8989
import org.eclipse.ditto.internal.utils.pekko.logging.ThreadSafeDittoLoggingAdapter;
9090
import org.eclipse.ditto.internal.utils.pubsub.StreamingType;
91+
import org.eclipse.ditto.internal.utils.pubsub.extractors.ReadSubjectExtractor;
9192
import org.eclipse.ditto.internal.utils.pubsubthings.DittoProtocolSub;
9293
import org.eclipse.ditto.internal.utils.search.SubscriptionManager;
9394
import org.eclipse.ditto.jwt.model.ImmutableJsonWebToken;
@@ -138,6 +139,7 @@ final class StreamingSessionActor extends AbstractActorWithTimers {
138139
private final Set<AcknowledgementLabel> declaredAcks;
139140
private final ThreadSafeDittoLoggingAdapter logger;
140141
private AuthorizationContext authorizationContext;
142+
private List<String> namespaces;
141143

142144
private Cancellable cancellableShutdownTask;
143145
@Nullable private final KillSwitch killSwitch;
@@ -164,6 +166,7 @@ private StreamingSessionActor(final Connect connect,
164166
this.jwtAuthenticationResultProvider = jwtAuthenticationResultProvider;
165167
outstandingSubscriptionAcks = EnumSet.noneOf(StreamingType.class);
166168
authorizationContext = connect.getConnectionAuthContext();
169+
namespaces = connect.getNamespaces();
167170
killSwitch = connect.getKillSwitch().orElse(null);
168171
streamingSessions = new EnumMap<>(StreamingType.class);
169172
ackregatorStarter = AcknowledgementAggregatorActorStarter.of(getContext(),
@@ -373,6 +376,7 @@ private Receive createPubSubBehavior() {
373376
})
374377
.match(StartStreaming.class, startStreaming -> {
375378
authorizationContext = startStreaming.getAuthorizationContext();
379+
namespaces = startStreaming.getNamespaces();
376380
Criteria criteria;
377381
try {
378382
criteria = startStreaming.getFilter()
@@ -399,7 +403,10 @@ private Receive createPubSubBehavior() {
399403
final var subscribeConfirmation = new ConfirmSubscription(startStreaming.getStreamingType());
400404
final Collection<StreamingType> currentStreamingTypes = streamingSessions.keySet();
401405
dittoProtocolSub.subscribe(currentStreamingTypes,
402-
authorizationContext.getAuthorizationSubjectIds(),
406+
ReadSubjectExtractor.determineTopicsFor(
407+
startStreaming.getNamespaces(),
408+
authorizationContext.getAuthorizationSubjects()
409+
),
403410
getSelf()
404411
).whenComplete((ack, throwable) -> {
405412
if (null == throwable) {
@@ -444,7 +451,12 @@ private Receive createPubSubBehavior() {
444451
case LIVE_COMMANDS, LIVE_EVENTS, MESSAGES:
445452
default:
446453
dittoProtocolSub.updateLiveSubscriptions(currentStreamingTypes,
447-
authorizationContext.getAuthorizationSubjectIds(), getSelf())
454+
ReadSubjectExtractor.determineTopicsFor(
455+
namespaces,
456+
authorizationContext.getAuthorizationSubjects()
457+
),
458+
getSelf()
459+
)
448460
.thenAccept(ack -> getSelf().tell(unsubscribeConfirmation, getSelf()));
449461
}
450462
})
@@ -817,7 +829,11 @@ private void startSubscriptionRefreshTimer() {
817829

818830
private void resubscribe(final Control trigger) {
819831
if (!streamingSessions.isEmpty() && outstandingSubscriptionAcks.isEmpty()) {
820-
dittoProtocolSub.subscribe(streamingSessions.keySet(), authorizationContext.getAuthorizationSubjectIds(),
832+
dittoProtocolSub.subscribe(streamingSessions.keySet(),
833+
ReadSubjectExtractor.determineTopicsFor(
834+
namespaces,
835+
authorizationContext.getAuthorizationSubjects()
836+
),
821837
getSelf(), null, true);
822838
}
823839
startSubscriptionRefreshTimer();

gateway/service/src/main/java/org/eclipse/ditto/gateway/service/streaming/signals/Connect.java

+14-4
Original file line numberDiff line numberDiff line change
@@ -15,20 +15,20 @@
1515
import static org.eclipse.ditto.base.model.common.ConditionChecker.checkNotNull;
1616

1717
import java.time.Instant;
18+
import java.util.List;
1819
import java.util.Objects;
1920
import java.util.Optional;
2021
import java.util.Set;
2122

2223
import javax.annotation.Nullable;
2324

25+
import org.apache.pekko.stream.KillSwitch;
26+
import org.apache.pekko.stream.javadsl.SourceQueueWithComplete;
2427
import org.eclipse.ditto.base.model.acks.AcknowledgementLabel;
2528
import org.eclipse.ditto.base.model.auth.AuthorizationContext;
2629
import org.eclipse.ditto.base.model.json.JsonSchemaVersion;
2730
import org.eclipse.ditto.gateway.service.streaming.actors.SessionedJsonifiable;
2831

29-
import org.apache.pekko.stream.KillSwitch;
30-
import org.apache.pekko.stream.javadsl.SourceQueueWithComplete;
31-
3232
/**
3333
* Message to be sent in order to establish a new "streaming" connection via {@link org.eclipse.ditto.gateway.service.streaming.actors.StreamingActor}.
3434
*/
@@ -41,6 +41,7 @@ public final class Connect {
4141
@Nullable private final Instant sessionExpirationTime;
4242
private final Set<AcknowledgementLabel> declaredAcknowledgementLabels;
4343
private final AuthorizationContext connectionAuthContext;
44+
private final List<String> namespaces;
4445
@Nullable private final KillSwitch killSwitch;
4546

4647
/**
@@ -53,6 +54,7 @@ public final class Connect {
5354
* @param sessionExpirationTime how long to keep the session alive when idling.
5455
* @param declaredAcknowledgementLabels labels of acknowledgements this session may send.
5556
* @param connectionAuthContext the authorizationContext of the streaming session.
57+
* @param namespaces the namespaces to subscribe to in the streaming session (if already known).
5658
* @param killSwitch the kill switch to terminate the streaming session.
5759
*/
5860
public Connect(final SourceQueueWithComplete<SessionedJsonifiable> eventAndResponsePublisher,
@@ -62,6 +64,7 @@ public Connect(final SourceQueueWithComplete<SessionedJsonifiable> eventAndRespo
6264
@Nullable final Instant sessionExpirationTime,
6365
final Set<AcknowledgementLabel> declaredAcknowledgementLabels,
6466
final AuthorizationContext connectionAuthContext,
67+
final List<String> namespaces,
6568
@Nullable final KillSwitch killSwitch) {
6669
this.eventAndResponsePublisher = eventAndResponsePublisher;
6770
this.connectionCorrelationId = checkNotNull(connectionCorrelationId, "connectionCorrelationId")
@@ -71,6 +74,7 @@ public Connect(final SourceQueueWithComplete<SessionedJsonifiable> eventAndRespo
7174
this.sessionExpirationTime = sessionExpirationTime;
7275
this.declaredAcknowledgementLabels = declaredAcknowledgementLabels;
7376
this.connectionAuthContext = connectionAuthContext;
77+
this.namespaces = namespaces;
7478
this.killSwitch = killSwitch;
7579
}
7680

@@ -102,6 +106,10 @@ public AuthorizationContext getConnectionAuthContext() {
102106
return connectionAuthContext;
103107
}
104108

109+
public List<String> getNamespaces() {
110+
return namespaces;
111+
}
112+
105113
public Optional<KillSwitch> getKillSwitch() {
106114
return Optional.ofNullable(killSwitch);
107115
}
@@ -121,13 +129,14 @@ public boolean equals(final Object o) {
121129
Objects.equals(sessionExpirationTime, connect.sessionExpirationTime) &&
122130
Objects.equals(declaredAcknowledgementLabels, connect.declaredAcknowledgementLabels) &&
123131
Objects.equals(connectionAuthContext, connect.connectionAuthContext) &&
132+
Objects.equals(namespaces, connect.namespaces) &&
124133
Objects.equals(killSwitch, connect.killSwitch);
125134
}
126135

127136
@Override
128137
public int hashCode() {
129138
return Objects.hash(eventAndResponsePublisher, connectionCorrelationId, type, sessionExpirationTime,
130-
declaredAcknowledgementLabels, connectionAuthContext, killSwitch);
139+
declaredAcknowledgementLabels, connectionAuthContext, namespaces, killSwitch);
131140
}
132141

133142
@Override
@@ -139,6 +148,7 @@ public String toString() {
139148
", sessionExpirationTime=" + sessionExpirationTime +
140149
", declaredAcknowledgementLabels=" + declaredAcknowledgementLabels +
141150
", connectionAuthContext=" + connectionAuthContext +
151+
", namespaces=" + namespaces +
142152
", killSwitch=" + killSwitch +
143153
"]";
144154
}

gateway/service/src/test/java/org/eclipse/ditto/gateway/service/streaming/actors/StreamingSessionActorHeaderInteractionTest.java

+14-13
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,19 @@
2525
import java.util.concurrent.CompletableFuture;
2626
import java.util.stream.Collectors;
2727

28+
import org.apache.pekko.actor.AbstractActor;
29+
import org.apache.pekko.actor.ActorRef;
30+
import org.apache.pekko.actor.ActorSystem;
31+
import org.apache.pekko.actor.Props;
32+
import org.apache.pekko.japi.pf.ReceiveBuilder;
33+
import org.apache.pekko.stream.Attributes;
34+
import org.apache.pekko.stream.OverflowStrategy;
35+
import org.apache.pekko.stream.javadsl.Keep;
36+
import org.apache.pekko.stream.javadsl.Sink;
37+
import org.apache.pekko.stream.javadsl.Source;
38+
import org.apache.pekko.stream.javadsl.SourceQueueWithComplete;
39+
import org.apache.pekko.testkit.TestProbe;
40+
import org.apache.pekko.testkit.javadsl.TestKit;
2841
import org.eclipse.ditto.base.model.acks.AcknowledgementRequest;
2942
import org.eclipse.ditto.base.model.acks.DittoAcknowledgementLabel;
3043
import org.eclipse.ditto.base.model.auth.AuthorizationModelFactory;
@@ -61,19 +74,6 @@
6174

6275
import com.typesafe.config.ConfigFactory;
6376

64-
import org.apache.pekko.actor.AbstractActor;
65-
import org.apache.pekko.actor.ActorRef;
66-
import org.apache.pekko.actor.ActorSystem;
67-
import org.apache.pekko.actor.Props;
68-
import org.apache.pekko.japi.pf.ReceiveBuilder;
69-
import org.apache.pekko.stream.Attributes;
70-
import org.apache.pekko.stream.OverflowStrategy;
71-
import org.apache.pekko.stream.javadsl.Keep;
72-
import org.apache.pekko.stream.javadsl.Sink;
73-
import org.apache.pekko.stream.javadsl.Source;
74-
import org.apache.pekko.stream.javadsl.SourceQueueWithComplete;
75-
import org.apache.pekko.testkit.TestProbe;
76-
import org.apache.pekko.testkit.javadsl.TestKit;
7777
import scala.concurrent.duration.FiniteDuration;
7878

7979
/**
@@ -196,6 +196,7 @@ private ActorRef createStreamingSessionActor() {
196196
final Connect connect =
197197
new Connect(sourceQueue, "connectionCorrelationId", "ws",
198198
JsonSchemaVersion.V_2, null, Set.of(), AuthorizationModelFactory.emptyAuthContext(),
199+
List.of(),
199200
null);
200201
final Props props = StreamingSessionActor.props(connect, dittoProtocolSub, commandRouterProbe.ref(),
201202
DefaultStreamingConfig.of(ConfigFactory.empty()), HeaderTranslator.empty(),

gateway/service/src/test/java/org/eclipse/ditto/gateway/service/streaming/actors/StreamingSessionActorTest.java

+1
Original file line numberDiff line numberDiff line change
@@ -433,6 +433,7 @@ private Connect getConnect(final Set<AcknowledgementLabel> declaredAcks) {
433433
null,
434434
declaredAcks,
435435
authorizationContext,
436+
List.of(),
436437
killSwitch);
437438
}
438439

0 commit comments

Comments
 (0)