Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#1894 optimize Ditto internal pub/sub by adding subscribed for namespaces to topic #1900

Merged
merged 2 commits into from
Feb 20, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -74,7 +74,6 @@
import org.eclipse.ditto.base.model.acks.AcknowledgementLabel;
import org.eclipse.ditto.base.model.acks.FatalPubSubException;
import org.eclipse.ditto.base.model.acks.PubSubTerminatedException;
import org.eclipse.ditto.base.model.auth.AuthorizationContext;
import org.eclipse.ditto.base.model.entity.id.EntityId;
import org.eclipse.ditto.base.model.entity.id.WithEntityId;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
@@ -146,6 +145,7 @@
import org.eclipse.ditto.internal.utils.pekko.logging.ThreadSafeDittoLoggingAdapter;
import org.eclipse.ditto.internal.utils.protocol.ProtocolAdapterProvider;
import org.eclipse.ditto.internal.utils.pubsub.StreamingType;
import org.eclipse.ditto.internal.utils.pubsub.extractors.ReadSubjectExtractor;
import org.eclipse.ditto.internal.utils.pubsubthings.DittoProtocolSub;
import org.eclipse.ditto.internal.utils.search.SubscriptionManager;
import org.eclipse.ditto.protocol.adapter.ProtocolAdapter;
@@ -181,7 +181,7 @@ public abstract class BaseClientActor extends AbstractFSMWithStash<BaseClientSta
private static final Pattern EXCLUDED_ADDRESS_REPORTING_CHILD_NAME_PATTERN = Pattern.compile(
OutboundMappingProcessorActor.ACTOR_NAME + "|" + OutboundDispatchingActor.ACTOR_NAME + "|" +
"ackr.*" + "|" + "StreamSupervisor-.*|" +
SubscriptionManager.ACTOR_NAME + "|" + StreamingSubscriptionManager.ACTOR_NAME);
SubscriptionManager.ACTOR_NAME + "|" + StreamingSubscriptionManager.ACTOR_NAME);
private static final String DITTO_STATE_TIMEOUT_TIMER = "dittoStateTimeout";

private static final int SOCKET_CHECK_TIMEOUT_MS = 2000;
@@ -330,7 +330,8 @@ protected void init() {
inboundMappingSink = getInboundMappingSink(protocolAdapter, inboundDispatchingSink);
subscriptionManager =
startSubscriptionManager(commandForwarderActorSelection, connectivityConfig().getClientConfig());
streamingSubscriptionManager = startStreamingSubscriptionManager(commandForwarderActorSelection, connectivityConfig().getClientConfig());
streamingSubscriptionManager = startStreamingSubscriptionManager(commandForwarderActorSelection,
connectivityConfig().getClientConfig());

if (connection.getSshTunnel().map(SshTunnel::isEnabled).orElse(false)) {
tunnelActor = childActorNanny.startChildActor(SshTunnelActor.ACTOR_NAME,
@@ -1924,7 +1925,8 @@ private ActorRef startSubscriptionManager(final ActorSelection proxyActor, final
*
* @return reference of the streaming subscription manager.
*/
private ActorRef startStreamingSubscriptionManager(final ActorSelection proxyActor, final ClientConfig clientConfig) {
private ActorRef startStreamingSubscriptionManager(final ActorSelection proxyActor,
final ClientConfig clientConfig) {
final var mat = Materializer.createMaterializer(this::getContext);
final var props = StreamingSubscriptionManager.props(clientConfig.getSubscriptionManagerTimeout(),
proxyActor, mat);
@@ -2098,9 +2100,14 @@ private String getPubsubGroup() {
private Set<String> getTargetAuthSubjects() {
return connection.getTargets()
.stream()
.map(Target::getAuthorizationContext)
.map(AuthorizationContext::getAuthorizationSubjectIds)
.flatMap(List::stream)
.map(target -> {
final Set<String> namespaces = target.getTopics().stream()
.flatMap(ft -> ft.getNamespaces().stream())
.collect(Collectors.toSet());
return ReadSubjectExtractor
.determineTopicsFor(namespaces, target.getAuthorizationContext().getAuthorizationSubjects());
})
.flatMap(Collection::stream)
.collect(Collectors.toSet());
}

Original file line number Diff line number Diff line change
@@ -297,7 +297,9 @@ private Route buildThingsSseRoute(final RequestContext ctx,
jsonPointerString -> {
if (INBOX_OUTBOX_PATTERN.matcher(jsonPointerString).matches()) {
return createMessagesSseRoute(ctx, dhcs, thingId,
jsonPointerString);
jsonPointerString,
getNamespaces(parameters.get(PARAM_NAMESPACES))
);
} else {
params.put(PARAM_FIELDS, jsonPointerString);
return createSseRoute(ctx, dhcs,
@@ -428,7 +430,7 @@ private Route createSseRoute(final RequestContext ctx, final CompletionStage<Dit
connectionCorrelationId, dittoHeaders);
final var connect = new Connect(withQueue.getSourceQueue(), connectionCorrelationId,
STREAMING_TYPE_SSE, jsonSchemaVersion, null, Set.of(),
authorizationContext, null);
authorizationContext, namespaces, null);
Patterns.ask(streamingActor, connect, LOCAL_ASK_TIMEOUT)
.thenApply(ActorRef.class::cast)
.thenAccept(streamingSessionActor ->
@@ -459,7 +461,8 @@ private Route createSseRoute(final RequestContext ctx, final CompletionStage<Dit
private Route createMessagesSseRoute(final RequestContext ctx,
final CompletionStage<DittoHeaders> dittoHeadersStage,
final String thingId,
final String messagePath) {
final String messagePath,
final List<String> namespaces) {

final List<ThingId> targetThingIds = List.of(ThingId.of(thingId));
final CompletionStage<SignalEnrichmentFacade> facadeStage = signalEnrichmentProvider == null
@@ -490,7 +493,7 @@ private Route createMessagesSseRoute(final RequestContext ctx,
final var connect =
new Connect(withQueue.getSourceQueue(), connectionCorrelationId,
STREAMING_TYPE_SSE, jsonSchemaVersion, null, Set.of(),
authorizationContext, null);
authorizationContext, namespaces, null);
final String resourcePathRqlStatement;
if (INBOX_OUTBOX_WITH_SUBJECT_PATTERN.matcher(messagePath).matches()) {
resourcePathRqlStatement = String.format(
Original file line number Diff line number Diff line change
@@ -18,6 +18,7 @@
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
@@ -575,7 +576,7 @@ private Pair<Connect, Flow<DittoRuntimeException, Message, NotUsed>> createOutgo
return new Connect(withQueue.getSourceQueue(), connectionCorrelationId, STREAMING_TYPE_WS,
version, optJsonWebToken.map(JsonWebToken::getExpirationTime).orElse(null),
readDeclaredAcknowledgementLabels(additionalHeaders), connectionAuthContext,
wsKillSwitch);
List.of(), wsKillSwitch);
})
.recoverWithRetries(1, new PFBuilder<Throwable, Source<SessionedJsonifiable, NotUsed>>()
.match(GatewayWebsocketSessionAbortedException.class,
Original file line number Diff line number Diff line change
@@ -88,6 +88,7 @@
import org.eclipse.ditto.internal.utils.pekko.logging.DittoLoggerFactory;
import org.eclipse.ditto.internal.utils.pekko.logging.ThreadSafeDittoLoggingAdapter;
import org.eclipse.ditto.internal.utils.pubsub.StreamingType;
import org.eclipse.ditto.internal.utils.pubsub.extractors.ReadSubjectExtractor;
import org.eclipse.ditto.internal.utils.pubsubthings.DittoProtocolSub;
import org.eclipse.ditto.internal.utils.search.SubscriptionManager;
import org.eclipse.ditto.jwt.model.ImmutableJsonWebToken;
@@ -138,6 +139,7 @@ final class StreamingSessionActor extends AbstractActorWithTimers {
private final Set<AcknowledgementLabel> declaredAcks;
private final ThreadSafeDittoLoggingAdapter logger;
private AuthorizationContext authorizationContext;
private List<String> namespaces;

private Cancellable cancellableShutdownTask;
@Nullable private final KillSwitch killSwitch;
@@ -164,6 +166,7 @@ private StreamingSessionActor(final Connect connect,
this.jwtAuthenticationResultProvider = jwtAuthenticationResultProvider;
outstandingSubscriptionAcks = EnumSet.noneOf(StreamingType.class);
authorizationContext = connect.getConnectionAuthContext();
namespaces = connect.getNamespaces();
killSwitch = connect.getKillSwitch().orElse(null);
streamingSessions = new EnumMap<>(StreamingType.class);
ackregatorStarter = AcknowledgementAggregatorActorStarter.of(getContext(),
@@ -373,6 +376,7 @@ private Receive createPubSubBehavior() {
})
.match(StartStreaming.class, startStreaming -> {
authorizationContext = startStreaming.getAuthorizationContext();
namespaces = startStreaming.getNamespaces();
Criteria criteria;
try {
criteria = startStreaming.getFilter()
@@ -399,7 +403,10 @@ private Receive createPubSubBehavior() {
final var subscribeConfirmation = new ConfirmSubscription(startStreaming.getStreamingType());
final Collection<StreamingType> currentStreamingTypes = streamingSessions.keySet();
dittoProtocolSub.subscribe(currentStreamingTypes,
authorizationContext.getAuthorizationSubjectIds(),
ReadSubjectExtractor.determineTopicsFor(
startStreaming.getNamespaces(),
authorizationContext.getAuthorizationSubjects()
),
getSelf()
).whenComplete((ack, throwable) -> {
if (null == throwable) {
@@ -444,7 +451,12 @@ private Receive createPubSubBehavior() {
case LIVE_COMMANDS, LIVE_EVENTS, MESSAGES:
default:
dittoProtocolSub.updateLiveSubscriptions(currentStreamingTypes,
authorizationContext.getAuthorizationSubjectIds(), getSelf())
ReadSubjectExtractor.determineTopicsFor(
namespaces,
authorizationContext.getAuthorizationSubjects()
),
getSelf()
)
.thenAccept(ack -> getSelf().tell(unsubscribeConfirmation, getSelf()));
}
})
@@ -817,7 +829,11 @@ private void startSubscriptionRefreshTimer() {

private void resubscribe(final Control trigger) {
if (!streamingSessions.isEmpty() && outstandingSubscriptionAcks.isEmpty()) {
dittoProtocolSub.subscribe(streamingSessions.keySet(), authorizationContext.getAuthorizationSubjectIds(),
dittoProtocolSub.subscribe(streamingSessions.keySet(),
ReadSubjectExtractor.determineTopicsFor(
namespaces,
authorizationContext.getAuthorizationSubjects()
),
getSelf(), null, true);
}
startSubscriptionRefreshTimer();
Original file line number Diff line number Diff line change
@@ -15,20 +15,20 @@
import static org.eclipse.ditto.base.model.common.ConditionChecker.checkNotNull;

import java.time.Instant;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;

import javax.annotation.Nullable;

import org.apache.pekko.stream.KillSwitch;
import org.apache.pekko.stream.javadsl.SourceQueueWithComplete;
import org.eclipse.ditto.base.model.acks.AcknowledgementLabel;
import org.eclipse.ditto.base.model.auth.AuthorizationContext;
import org.eclipse.ditto.base.model.json.JsonSchemaVersion;
import org.eclipse.ditto.gateway.service.streaming.actors.SessionedJsonifiable;

import org.apache.pekko.stream.KillSwitch;
import org.apache.pekko.stream.javadsl.SourceQueueWithComplete;

/**
* Message to be sent in order to establish a new "streaming" connection via {@link org.eclipse.ditto.gateway.service.streaming.actors.StreamingActor}.
*/
@@ -41,6 +41,7 @@ public final class Connect {
@Nullable private final Instant sessionExpirationTime;
private final Set<AcknowledgementLabel> declaredAcknowledgementLabels;
private final AuthorizationContext connectionAuthContext;
private final List<String> namespaces;
@Nullable private final KillSwitch killSwitch;

/**
@@ -53,6 +54,7 @@ public final class Connect {
* @param sessionExpirationTime how long to keep the session alive when idling.
* @param declaredAcknowledgementLabels labels of acknowledgements this session may send.
* @param connectionAuthContext the authorizationContext of the streaming session.
* @param namespaces the namespaces to subscribe to in the streaming session (if already known).
* @param killSwitch the kill switch to terminate the streaming session.
*/
public Connect(final SourceQueueWithComplete<SessionedJsonifiable> eventAndResponsePublisher,
@@ -62,6 +64,7 @@ public Connect(final SourceQueueWithComplete<SessionedJsonifiable> eventAndRespo
@Nullable final Instant sessionExpirationTime,
final Set<AcknowledgementLabel> declaredAcknowledgementLabels,
final AuthorizationContext connectionAuthContext,
final List<String> namespaces,
@Nullable final KillSwitch killSwitch) {
this.eventAndResponsePublisher = eventAndResponsePublisher;
this.connectionCorrelationId = checkNotNull(connectionCorrelationId, "connectionCorrelationId")
@@ -71,6 +74,7 @@ public Connect(final SourceQueueWithComplete<SessionedJsonifiable> eventAndRespo
this.sessionExpirationTime = sessionExpirationTime;
this.declaredAcknowledgementLabels = declaredAcknowledgementLabels;
this.connectionAuthContext = connectionAuthContext;
this.namespaces = namespaces;
this.killSwitch = killSwitch;
}

@@ -102,6 +106,10 @@ public AuthorizationContext getConnectionAuthContext() {
return connectionAuthContext;
}

public List<String> getNamespaces() {
return namespaces;
}

public Optional<KillSwitch> getKillSwitch() {
return Optional.ofNullable(killSwitch);
}
@@ -121,13 +129,14 @@ public boolean equals(final Object o) {
Objects.equals(sessionExpirationTime, connect.sessionExpirationTime) &&
Objects.equals(declaredAcknowledgementLabels, connect.declaredAcknowledgementLabels) &&
Objects.equals(connectionAuthContext, connect.connectionAuthContext) &&
Objects.equals(namespaces, connect.namespaces) &&
Objects.equals(killSwitch, connect.killSwitch);
}

@Override
public int hashCode() {
return Objects.hash(eventAndResponsePublisher, connectionCorrelationId, type, sessionExpirationTime,
declaredAcknowledgementLabels, connectionAuthContext, killSwitch);
declaredAcknowledgementLabels, connectionAuthContext, namespaces, killSwitch);
}

@Override
@@ -139,6 +148,7 @@ public String toString() {
", sessionExpirationTime=" + sessionExpirationTime +
", declaredAcknowledgementLabels=" + declaredAcknowledgementLabels +
", connectionAuthContext=" + connectionAuthContext +
", namespaces=" + namespaces +
", killSwitch=" + killSwitch +
"]";
}
Original file line number Diff line number Diff line change
@@ -25,6 +25,19 @@
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

import org.apache.pekko.actor.AbstractActor;
import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.actor.Props;
import org.apache.pekko.japi.pf.ReceiveBuilder;
import org.apache.pekko.stream.Attributes;
import org.apache.pekko.stream.OverflowStrategy;
import org.apache.pekko.stream.javadsl.Keep;
import org.apache.pekko.stream.javadsl.Sink;
import org.apache.pekko.stream.javadsl.Source;
import org.apache.pekko.stream.javadsl.SourceQueueWithComplete;
import org.apache.pekko.testkit.TestProbe;
import org.apache.pekko.testkit.javadsl.TestKit;
import org.eclipse.ditto.base.model.acks.AcknowledgementRequest;
import org.eclipse.ditto.base.model.acks.DittoAcknowledgementLabel;
import org.eclipse.ditto.base.model.auth.AuthorizationModelFactory;
@@ -61,19 +74,6 @@

import com.typesafe.config.ConfigFactory;

import org.apache.pekko.actor.AbstractActor;
import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.actor.Props;
import org.apache.pekko.japi.pf.ReceiveBuilder;
import org.apache.pekko.stream.Attributes;
import org.apache.pekko.stream.OverflowStrategy;
import org.apache.pekko.stream.javadsl.Keep;
import org.apache.pekko.stream.javadsl.Sink;
import org.apache.pekko.stream.javadsl.Source;
import org.apache.pekko.stream.javadsl.SourceQueueWithComplete;
import org.apache.pekko.testkit.TestProbe;
import org.apache.pekko.testkit.javadsl.TestKit;
import scala.concurrent.duration.FiniteDuration;

/**
@@ -196,6 +196,7 @@ private ActorRef createStreamingSessionActor() {
final Connect connect =
new Connect(sourceQueue, "connectionCorrelationId", "ws",
JsonSchemaVersion.V_2, null, Set.of(), AuthorizationModelFactory.emptyAuthContext(),
List.of(),
null);
final Props props = StreamingSessionActor.props(connect, dittoProtocolSub, commandRouterProbe.ref(),
DefaultStreamingConfig.of(ConfigFactory.empty()), HeaderTranslator.empty(),
Original file line number Diff line number Diff line change
@@ -433,6 +433,7 @@ private Connect getConnect(final Set<AcknowledgementLabel> declaredAcks) {
null,
declaredAcks,
authorizationContext,
List.of(),
killSwitch);
}

Original file line number Diff line number Diff line change
@@ -19,12 +19,11 @@

import javax.annotation.Nullable;

import org.eclipse.ditto.base.model.acks.AcknowledgementLabel;
import org.eclipse.ditto.internal.utils.pubsub.StreamingType;

import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.actor.Extension;
import org.eclipse.ditto.base.model.acks.AcknowledgementLabel;
import org.eclipse.ditto.internal.utils.pubsub.StreamingType;

/**
* Subscriptions for Ditto protocol channels.
@@ -55,7 +54,7 @@ default CompletionStage<Void> subscribe(Collection<StreamingType> types, Collect
* check for resubscriptions.
*/
CompletionStage<Boolean> subscribe(Collection<StreamingType> types, Collection<String> topics, ActorRef subscriber,
@Nullable String group, final boolean resubscribe);
@Nullable String group, boolean resubscribe);

/**
* Remove a subscriber.
Original file line number Diff line number Diff line change
@@ -14,12 +14,11 @@

import javax.annotation.Nullable;

import org.apache.pekko.actor.ActorRef;
import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.internal.utils.pubsub.extractors.AckExtractor;
import org.eclipse.ditto.internal.utils.pubsub.extractors.PubSubTopicExtractor;

import org.apache.pekko.actor.ActorRef;

/**
* A jolly locale for the spreading of news.
*
@@ -42,7 +41,7 @@ public interface DistributedPub<T> {
* of a signal to ensure event order for each entity.
* @return the wrapped message to send to the publisher.
*/
Object wrapForPublication(T message, final CharSequence groupIndexKey);
Object wrapForPublication(T message, CharSequence groupIndexKey);

/**
* Wrap the message in an envelope to send to the publisher.
@@ -53,7 +52,7 @@ public interface DistributedPub<T> {
* @param ackExtractor extractor of ack-related information from the message.
* @return the wrapped message to send to the publisher.
*/
<S extends T> Object wrapForPublicationWithAcks(S message, final CharSequence groupIndexKey,
<S extends T> Object wrapForPublicationWithAcks(S message, CharSequence groupIndexKey,
AckExtractor<S> ackExtractor);

/**
Original file line number Diff line number Diff line change
@@ -18,11 +18,10 @@

import javax.annotation.Nullable;

import org.apache.pekko.actor.ActorRef;
import org.eclipse.ditto.internal.utils.ddata.DistributedDataConfig;
import org.eclipse.ditto.internal.utils.pubsub.api.SubAck;

import org.apache.pekko.actor.ActorRef;

/**
* Access point for Ditto pub-sub subscribers.
*/
@@ -40,7 +39,7 @@ public interface DistributedSub {
*/
CompletionStage<SubAck> subscribeWithFilterAndGroup(Collection<String> topics,
ActorRef subscriber, @Nullable Predicate<Collection<String>> filter, @Nullable String group,
final boolean resubscribe);
boolean resubscribe);

/**
* Unsubscribe for a collection of topics.
Original file line number Diff line number Diff line change
@@ -15,8 +15,11 @@
import java.util.Collection;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.eclipse.ditto.base.model.auth.AuthorizationSubject;
import org.eclipse.ditto.base.model.entity.id.NamespacedEntityId;
import org.eclipse.ditto.base.model.entity.id.WithEntityId;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.headers.WithDittoHeaders;

@@ -39,14 +42,55 @@ public static <T extends WithDittoHeaders> ReadSubjectExtractor<T> of() {
return new ReadSubjectExtractor<>();
}

/**
* Determines the pub/sub topics based on the provided {@code namespaces} and {@code authorizationSubjects}.
*
* @param namespaces the namespaces for which pub/sub topics should be determined for - may be empty if all
* namespaces are relevant and no scoping to namespaces should be done.
* @param authorizationSubjects the authorization subjects to subscribe for.
* @return a set of Ditto pub/sub topics with e.g. to subscribe for events in the cluster.
*/
public static Set<String> determineTopicsFor(
final Collection<String> namespaces,
final Collection<AuthorizationSubject> authorizationSubjects
) {

final Set<String> authorizationSubjectIds = authorizationSubjects.stream()
.map(AuthorizationSubject::getId)
.collect(Collectors.toSet());
if (namespaces.isEmpty()) {
return authorizationSubjectIds;
} else {
return namespaces.stream().flatMap(
namespace -> combineNamespaceWithAuthSubjects(namespace, authorizationSubjectIds)
).collect(Collectors.toSet());
}
}

@Override
public Collection<String> getTopics(final T event) {
final DittoHeaders dittoHeaders = event.getDittoHeaders();
final Set<AuthorizationSubject> readGrantedSubjects = dittoHeaders.getReadGrantedSubjects();

return readGrantedSubjects.stream()
final Set<String> topicsWithoutNamespace = readGrantedSubjects.stream()
.map(AuthorizationSubject::getId)
.collect(Collectors.toSet());

if (event instanceof WithEntityId withEntityId &&
withEntityId.getEntityId() instanceof NamespacedEntityId namespacedEntityId) {
final String namespace = namespacedEntityId.getNamespace();
return Stream.concat(
combineNamespaceWithAuthSubjects(namespace, topicsWithoutNamespace),
topicsWithoutNamespace.stream()
).collect(Collectors.toSet());
} else {
return topicsWithoutNamespace;
}
}

private static Stream<String> combineNamespaceWithAuthSubjects(final String namespace,
final Set<String> authorizationSubjectIds) {
return authorizationSubjectIds.stream().map(subject -> namespace + "#" + subject);
}

}