Skip to content

Commit 51228ba

Browse files
author
Yufei Cai
authoredNov 14, 2018
Merge pull request #285 from bsinno/bugfix/shard-region-detail-stats
fixed retrieving shard region stats and aggregation
2 parents d14631b + 3e5d6a3 commit 51228ba

File tree

14 files changed

+252
-99
lines changed

14 files changed

+252
-99
lines changed
 

‎pom.xml

+2-2
Original file line numberDiff line numberDiff line change
@@ -300,7 +300,7 @@
300300
<plugin>
301301
<groupId>org.apache.maven.plugins</groupId>
302302
<artifactId>maven-surefire-plugin</artifactId>
303-
<version>2.22.0</version>
303+
<version>3.0.0-M1</version>
304304
<configuration>
305305
<systemProperties>
306306
<kamon.auto-start>true</kamon.auto-start>
@@ -315,7 +315,7 @@
315315
<plugin>
316316
<groupId>org.apache.maven.plugins</groupId>
317317
<artifactId>maven-failsafe-plugin</artifactId>
318-
<version>2.20.1</version>
318+
<version>3.0.0-M1</version>
319319
<executions>
320320
<execution>
321321
<id>integration-test</id>

‎services/concierge/starter/src/main/java/org/eclipse/ditto/services/concierge/starter/actors/ConciergeRootActor.java

+17-7
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,14 @@
2929
import org.eclipse.ditto.services.models.concierge.actors.ConciergeForwarderActor;
3030
import org.eclipse.ditto.services.utils.akka.LogUtil;
3131
import org.eclipse.ditto.services.utils.cluster.ClusterStatusSupplier;
32+
import org.eclipse.ditto.services.utils.cluster.RetrieveStatisticsDetailsResponseSupplier;
3233
import org.eclipse.ditto.services.utils.config.ConfigUtil;
3334
import org.eclipse.ditto.services.utils.config.MongoConfig;
3435
import org.eclipse.ditto.services.utils.health.DefaultHealthCheckingActorFactory;
3536
import org.eclipse.ditto.services.utils.health.HealthCheckingActorOptions;
3637
import org.eclipse.ditto.services.utils.health.routes.StatusRoute;
3738
import org.eclipse.ditto.services.utils.persistence.mongo.MongoClientActor;
39+
import org.eclipse.ditto.signals.commands.devops.RetrieveStatisticsDetails;
3840

3941
import com.typesafe.config.Config;
4042

@@ -53,7 +55,6 @@
5355
import akka.actor.SupervisorStrategy;
5456
import akka.cluster.Cluster;
5557
import akka.cluster.pubsub.DistributedPubSubMediator;
56-
import akka.cluster.sharding.ShardRegion;
5758
import akka.cluster.singleton.ClusterSingletonManager;
5859
import akka.cluster.singleton.ClusterSingletonManagerSettings;
5960
import akka.event.DiagnosticLoggingAdapter;
@@ -64,6 +65,7 @@
6465
import akka.japi.pf.DeciderBuilder;
6566
import akka.japi.pf.ReceiveBuilder;
6667
import akka.pattern.AskTimeoutException;
68+
import akka.pattern.PatternsCS;
6769
import akka.stream.ActorMaterializer;
6870

6971
/**
@@ -79,9 +81,6 @@ public final class ConciergeRootActor extends AbstractActor {
7981
private static final String RESTARTING_CHILD_MSG = "Restarting child...";
8082

8183
private final DiagnosticLoggingAdapter log = LogUtil.obtain(this);
82-
83-
private final ActorRef conciergeShardRegion;
84-
8584
private final SupervisorStrategy supervisorStrategy = new OneForOneStrategy(true, DeciderBuilder
8685
.match(NullPointerException.class, e -> {
8786
log.error(e, "NullPointer in child actor: {}", e.getMessage());
@@ -129,6 +128,8 @@ public final class ConciergeRootActor extends AbstractActor {
129128
return SupervisorStrategy.escalate();
130129
}).build());
131130

131+
private final RetrieveStatisticsDetailsResponseSupplier retrieveStatisticsDetailsResponseSupplier;
132+
132133
private <C extends AbstractConciergeConfigReader> ConciergeRootActor(final C configReader,
133134
final ActorRef pubSubMediator,
134135
final AbstractEnforcerActorFactory<C> authorizationProxyPropsFactory,
@@ -141,7 +142,11 @@ private <C extends AbstractConciergeConfigReader> ConciergeRootActor(final C con
141142

142143
final ActorContext context = getContext();
143144

144-
conciergeShardRegion = authorizationProxyPropsFactory.startEnforcerActor(context, configReader, pubSubMediator);
145+
final ActorRef conciergeShardRegion =
146+
authorizationProxyPropsFactory.startEnforcerActor(context, configReader, pubSubMediator);
147+
148+
retrieveStatisticsDetailsResponseSupplier = RetrieveStatisticsDetailsResponseSupplier.of(conciergeShardRegion,
149+
ConciergeMessagingConstants.SHARD_REGION, log);
145150

146151
final ActorRef conciergeForwarder = startChildActor(context, ConciergeForwarderActor.ACTOR_NAME,
147152
ConciergeForwarderActor.props(pubSubMediator, conciergeShardRegion));
@@ -233,15 +238,20 @@ public SupervisorStrategy supervisorStrategy() {
233238
@Override
234239
public Receive createReceive() {
235240
return ReceiveBuilder.create()
236-
.matchEquals(ShardRegion.getShardRegionStateInstance(), getShardRegionState ->
237-
conciergeShardRegion.forward(getShardRegionState, getContext()))
241+
.match(RetrieveStatisticsDetails.class, this::handleRetrieveStatisticsDetails)
238242
.match(Status.Failure.class, f -> log.error(f.cause(), "Got failure <{}>!", f))
239243
.matchAny(m -> {
240244
log.warning("Unknown message <{}>.", m);
241245
unhandled(m);
242246
}).build();
243247
}
244248

249+
private void handleRetrieveStatisticsDetails(final RetrieveStatisticsDetails command) {
250+
log.info("Sending the namespace stats of the concierge shard as requested..");
251+
PatternsCS.pipe(retrieveStatisticsDetailsResponseSupplier
252+
.apply(command.getDittoHeaders()), getContext().dispatcher()).to(getSender());
253+
}
254+
245255
private void bindHttpStatusRoute(final ActorRef healthCheckingActor, final HttpConfigReader httpConfig,
246256
final ActorMaterializer materializer) {
247257
String hostname = httpConfig.getHostname();

‎services/gateway/proxy/src/main/java/org/eclipse/ditto/services/gateway/proxy/actors/StatisticsActor.java

+38-52
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,7 @@
1515
import java.util.Objects;
1616
import java.util.concurrent.TimeUnit;
1717
import java.util.function.Consumer;
18-
import java.util.function.Function;
1918
import java.util.function.Predicate;
20-
import java.util.stream.Collectors;
2119

2220
import javax.annotation.Nonnull;
2321
import javax.annotation.Nullable;
@@ -28,6 +26,7 @@
2826
import org.eclipse.ditto.json.JsonFieldDefinition;
2927
import org.eclipse.ditto.json.JsonObject;
3028
import org.eclipse.ditto.json.JsonObjectBuilder;
29+
import org.eclipse.ditto.json.JsonValue;
3130
import org.eclipse.ditto.model.base.json.FieldType;
3231
import org.eclipse.ditto.model.base.json.JsonSchemaVersion;
3332
import org.eclipse.ditto.model.base.json.Jsonifiable;
@@ -40,6 +39,7 @@
4039
import org.eclipse.ditto.services.utils.metrics.instruments.gauge.Gauge;
4140
import org.eclipse.ditto.signals.commands.devops.RetrieveStatistics;
4241
import org.eclipse.ditto.signals.commands.devops.RetrieveStatisticsDetails;
42+
import org.eclipse.ditto.signals.commands.devops.RetrieveStatisticsDetailsResponse;
4343
import org.eclipse.ditto.signals.commands.devops.RetrieveStatisticsResponse;
4444

4545
import akka.actor.AbstractActor;
@@ -144,10 +144,6 @@ private static Map<String, ShardStatisticsWrapper> initShardStatisticsMap() {
144144
return shardStatisticsMap;
145145
}
146146

147-
private static String ensureNonemptyString(final String possiblyEmptyString) {
148-
return possiblyEmptyString.isEmpty() ? EMPTY_STRING_TAG : possiblyEmptyString;
149-
}
150-
151147
@Override
152148
public Receive createReceive() {
153149
// ignore - the message is too late
@@ -168,16 +164,16 @@ public Receive createReceive() {
168164
final ActorRef self = getSelf();
169165
final ActorRef sender = getSender();
170166
becomeStatisticsAwaiting(statistics ->
171-
sender.tell(RetrieveStatisticsResponse.of(statistics.toJson(),
172-
retrieveStatistics.getDittoHeaders()), self)
167+
sender.tell(RetrieveStatisticsResponse.of(statistics.toJson(),
168+
retrieveStatistics.getDittoHeaders()), self)
173169
);
174170
})
175171
.match(RetrieveStatisticsDetails.class, retrieveStatistics -> {
176172

177-
tellRootActorToGetShardRegionState(THINGS_ROOT);
178-
tellRootActorToGetShardRegionState(POLICIES_ROOT);
179-
tellRootActorToGetShardRegionState(CONCIERGE_ROOT);
180-
tellRootActorToGetShardRegionState(SEARCH_UPDATER_ROOT);
173+
tellRootActorToRetrieveStatistics(THINGS_ROOT, retrieveStatistics);
174+
tellRootActorToRetrieveStatistics(POLICIES_ROOT, retrieveStatistics);
175+
tellRootActorToRetrieveStatistics(CONCIERGE_ROOT, retrieveStatistics);
176+
tellRootActorToRetrieveStatistics(SEARCH_UPDATER_ROOT, retrieveStatistics);
181177

182178
final ActorRef self = getSelf();
183179
final ActorRef sender = getSender();
@@ -199,9 +195,10 @@ private void tellShardRegionToSendClusterShardingStats() {
199195
tellShardRegionToSendClusterShardingStats(SR_SEARCH_UPDATER);
200196
}
201197

202-
private void tellRootActorToGetShardRegionState(final String rootActorPath) {
203-
pubSubMediator.tell(new DistributedPubSubMediator.SendToAll(
204-
rootActorPath, ShardRegion.getShardRegionStateInstance(), false), getSelf());
198+
private void tellRootActorToRetrieveStatistics(final String rootActorPath,
199+
final RetrieveStatisticsDetails retrieveStatistics) {
200+
pubSubMediator.tell(new DistributedPubSubMediator.SendToAll(rootActorPath, retrieveStatistics, false),
201+
getSelf());
205202
}
206203

207204
private void tellShardRegionToSendClusterShardingStats(final String shardRegion) {
@@ -232,7 +229,8 @@ private void becomeStatisticsAwaiting(final Consumer<Statistics> statisticsConsu
232229
currentStatistics.toJson(), retrieveStatistics.getDittoHeaders()), getSelf())
233230
)
234231
.match(ShardRegion.ClusterShardingStats.class, clusterShardingStats -> {
235-
final ShardStatisticsWrapper shardStatistics = getShardStatistics(shardStatisticsMap);
232+
final ShardStatisticsWrapper shardStatistics = getShardStatistics(shardStatisticsMap,
233+
getSender().path().name());
236234
final Map<Address, ShardRegion.ShardRegionStats> regions = clusterShardingStats.getRegions();
237235
shardStatistics.count = regions.isEmpty() ? 0 : regions.values().stream()
238236
.mapToInt(shardRegionStats -> shardRegionStats.getStats().isEmpty() ? 0 :
@@ -270,40 +268,25 @@ private void becomeStatisticsDetailsAwaiting(final Consumer<StatisticsDetails> s
270268
retrieveStatistics -> getSender().tell(RetrieveStatisticsResponse.of(
271269
currentStatisticsDetails.toJson(), retrieveStatistics.getDittoHeaders()), getSelf())
272270
)
273-
.match(ShardRegion.CurrentShardRegionState.class, currentShardRegionState -> {
274-
final ShardStatisticsWrapper shardStatistics = getShardStatistics(shardStatisticsMap);
275-
final Map<String, Long> shards = currentShardRegionState.getShards()
271+
.match(RetrieveStatisticsDetailsResponse.class, retrieveStatisticsDetailsResponse -> {
272+
final String shardRegion = retrieveStatisticsDetailsResponse.getStatisticsDetails()
276273
.stream()
277-
.map(ShardRegion.ShardState::getEntityIds)
278-
.flatMap(strSet -> strSet.stream()
279-
.map(str -> {
280-
// groupKey may be either namespace or resource-type+namespace (in case of concierge)
281-
final String[] groupKeys = str.split(":", 3);
282-
// assume String.split(String, int) may not return an empty array
283-
switch (groupKeys.length) {
284-
case 0:
285-
// should not happen with Java 8 strings, but just in case
286-
return EMPTY_STRING_TAG;
287-
case 1:
288-
case 2:
289-
// normal: namespace
290-
return ensureNonemptyString(groupKeys[0]);
291-
default:
292-
// concierge: resource-type + namespace
293-
return groupKeys[0] + ":" + groupKeys[1];
294-
}
295-
})
296-
)
297-
.collect(Collectors.groupingBy(Function.identity(),
298-
Collectors.mapping(Function.identity(), Collectors.counting())));
299-
300-
shards.forEach((key, value) -> {
301-
if (shardStatistics.hotnessMap.containsKey(key)) {
302-
shardStatistics.hotnessMap.put(key, shardStatistics.hotnessMap.get(key) + value);
303-
} else {
304-
shardStatistics.hotnessMap.put(key, value);
305-
}
306-
});
274+
.findFirst()
275+
.map(JsonField::getKeyName)
276+
.orElse(null);
277+
278+
if (shardRegion != null) {
279+
final ShardStatisticsWrapper shardStatistics =
280+
getShardStatistics(shardStatisticsMap, shardRegion);
281+
282+
retrieveStatisticsDetailsResponse.getStatisticsDetails().getValue(shardRegion)
283+
.map(JsonValue::asObject)
284+
.map(JsonObject::stream)
285+
.ifPresent(namespaceEntries -> namespaceEntries.forEach(field ->
286+
shardStatistics.hotnessMap
287+
.merge(field.getKeyName(), field.getValue().asLong(), Long::sum)
288+
));
289+
}
307290
})
308291
.match(AskTimeoutException.class, askTimeout -> {
309292
currentStatisticsDetails = new StatisticsDetails(
@@ -320,8 +303,10 @@ private void becomeStatisticsDetailsAwaiting(final Consumer<StatisticsDetails> s
320303
);
321304
}
322305

323-
private ShardStatisticsWrapper getShardStatistics(final Map<String, ShardStatisticsWrapper> shardStatisticsMap) {
324-
ShardStatisticsWrapper shardStatistics = shardStatisticsMap.get(getSender().path().name());
306+
private ShardStatisticsWrapper getShardStatistics(final Map<String, ShardStatisticsWrapper> shardStatisticsMap,
307+
final String shardRegion) {
308+
309+
ShardStatisticsWrapper shardStatistics = shardStatisticsMap.get(shardRegion);
325310
if (shardStatistics == null) {
326311
if (getSender().path().toStringWithoutAddress().contains(SR_SEARCH_UPDATER)) {
327312
shardStatistics = shardStatisticsMap.get(SR_SEARCH_UPDATER);
@@ -548,7 +533,8 @@ public String toString() {
548533

549534
}
550535

551-
private static class InternalRetrieveStatistics{
536+
private static class InternalRetrieveStatistics {
537+
552538
private InternalRetrieveStatistics() {
553539
// no-op
554540
}

‎services/gateway/streaming/src/main/java/org/eclipse/ditto/services/gateway/streaming/actors/EventAndResponsePublisher.java

+7-3
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
import akka.event.DiagnosticLoggingAdapter;
3030
import akka.japi.Creator;
3131
import akka.japi.pf.ReceiveBuilder;
32-
import akka.stream.actor.AbstractActorPublisher;
32+
import akka.stream.actor.AbstractActorPublisherWithStash;
3333
import akka.stream.actor.ActorPublisherMessage;
3434
import scala.concurrent.duration.FiniteDuration;
3535

@@ -38,7 +38,7 @@
3838
* necessary.
3939
*/
4040
public final class EventAndResponsePublisher
41-
extends AbstractActorPublisher<Jsonifiable.WithPredicate<JsonObject, JsonField>> {
41+
extends AbstractActorPublisherWithStash<Jsonifiable.WithPredicate<JsonObject, JsonField>> {
4242

4343
private static final int MESSAGE_CONSUMPTION_CHECK_SECONDS = 2;
4444
private final DiagnosticLoggingAdapter logger = LogUtil.obtain(this);
@@ -79,11 +79,15 @@ public Receive createReceive() {
7979
logger.debug("Established new connection: {}", connectionCorrelationId);
8080
getContext().become(connected(connectionCorrelationId));
8181
})
82-
.matchAny(any -> logger.warning("Got unknown message during init phase '{}'", any)).build();
82+
.matchAny(any -> {
83+
logger.info("Got unknown message during init phase '{}' - stashing..", any);
84+
stash();
85+
}).build();
8386
}
8487

8588
private Receive connected(final String connectionCorrelationId) {
8689
this.connectionCorrelationId = connectionCorrelationId;
90+
unstashAll();
8791

8892
return ReceiveBuilder.create()
8993
.match(Signal.class, signal -> buffer.size() >= backpressureBufferSize, signal -> {

‎services/models/thingsearch/src/main/java/org/eclipse/ditto/services/models/thingsearch/ThingsSearchConstants.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ public final class ThingsSearchConstants {
3939
/**
4040
* Path of the updater root actor.
4141
*/
42-
public static final String UPDATER_ROOT_ACTOR_PATH = USER_PATH + "/searchUpdaterRoot";
42+
public static final String UPDATER_ROOT_ACTOR_PATH = ROOT_ACTOR_PATH + "/searchUpdaterRoot";
4343

4444
/**
4545
* Path of the search actor.

‎services/policies/starter/src/main/java/org/eclipse/ditto/services/policies/starter/PoliciesRootActor.java

+15-5
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.eclipse.ditto.services.policies.util.ConfigKeys;
3030
import org.eclipse.ditto.services.utils.akka.LogUtil;
3131
import org.eclipse.ditto.services.utils.cluster.ClusterStatusSupplier;
32+
import org.eclipse.ditto.services.utils.cluster.RetrieveStatisticsDetailsResponseSupplier;
3233
import org.eclipse.ditto.services.utils.cluster.ShardRegionExtractor;
3334
import org.eclipse.ditto.services.utils.config.ConfigUtil;
3435
import org.eclipse.ditto.services.utils.config.MongoConfig;
@@ -37,6 +38,7 @@
3738
import org.eclipse.ditto.services.utils.health.routes.StatusRoute;
3839
import org.eclipse.ditto.services.utils.persistence.SnapshotAdapter;
3940
import org.eclipse.ditto.services.utils.persistence.mongo.MongoClientActor;
41+
import org.eclipse.ditto.signals.commands.devops.RetrieveStatisticsDetails;
4042

4143
import com.typesafe.config.Config;
4244

@@ -55,7 +57,6 @@
5557
import akka.cluster.pubsub.DistributedPubSubMediator;
5658
import akka.cluster.sharding.ClusterSharding;
5759
import akka.cluster.sharding.ClusterShardingSettings;
58-
import akka.cluster.sharding.ShardRegion;
5960
import akka.event.DiagnosticLoggingAdapter;
6061
import akka.http.javadsl.ConnectHttp;
6162
import akka.http.javadsl.Http;
@@ -65,6 +66,7 @@
6566
import akka.japi.pf.DeciderBuilder;
6667
import akka.japi.pf.ReceiveBuilder;
6768
import akka.pattern.AskTimeoutException;
69+
import akka.pattern.PatternsCS;
6870
import akka.stream.ActorMaterializer;
6971

7072
/**
@@ -125,7 +127,7 @@ public final class PoliciesRootActor extends AbstractActor {
125127
return SupervisorStrategy.escalate();
126128
}).build());
127129

128-
private final ActorRef policiesShardRegion;
130+
private final RetrieveStatisticsDetailsResponseSupplier retrieveStatisticsDetailsResponseSupplier;
129131

130132
private PoliciesRootActor(final ServiceConfigReader configReader,
131133
final SnapshotAdapter<Policy> snapshotAdapter,
@@ -150,10 +152,13 @@ private PoliciesRootActor(final ServiceConfigReader configReader,
150152
pubSubMediator.tell(new DistributedPubSubMediator.Put(getSelf()), getSelf());
151153
pubSubMediator.tell(new DistributedPubSubMediator.Put(persistenceStreamingActor), getSelf());
152154

153-
policiesShardRegion = ClusterSharding.get(getContext().system())
155+
final ActorRef policiesShardRegion = ClusterSharding.get(getContext().system())
154156
.start(PoliciesMessagingConstants.SHARD_REGION, policySupervisorProps, shardingSettings,
155157
ShardRegionExtractor.of(numberOfShards, getContext().getSystem()));
156158

159+
retrieveStatisticsDetailsResponseSupplier = RetrieveStatisticsDetailsResponseSupplier.of(policiesShardRegion,
160+
PoliciesMessagingConstants.SHARD_REGION, log);
161+
157162
final HealthConfigReader healthConfig = configReader.health();
158163

159164
final ActorRef mongoClient = startChildActor(MongoClientActor.ACTOR_NAME, MongoClientActor
@@ -234,15 +239,20 @@ public SupervisorStrategy supervisorStrategy() {
234239
@Override
235240
public Receive createReceive() {
236241
return ReceiveBuilder.create()
237-
.matchEquals(ShardRegion.getShardRegionStateInstance(), getShardRegionState ->
238-
policiesShardRegion.forward(getShardRegionState, getContext()))
242+
.match(RetrieveStatisticsDetails.class, this::handleRetrieveStatisticsDetails)
239243
.match(Status.Failure.class, f -> log.error(f.cause(), "Got failure: {}", f))
240244
.matchAny(m -> {
241245
log.warning("Unknown message: {}", m);
242246
unhandled(m);
243247
}).build();
244248
}
245249

250+
private void handleRetrieveStatisticsDetails(final RetrieveStatisticsDetails command) {
251+
log.info("Sending the namespace stats of the policy shard as requested..");
252+
PatternsCS.pipe(retrieveStatisticsDetailsResponseSupplier
253+
.apply(command.getDittoHeaders()), getContext().dispatcher()).to(getSender());
254+
}
255+
246256
private ActorRef startChildActor(final String actorName, final Props props) {
247257
log.info("Starting child actor '{}'", actorName);
248258
return getContext().actorOf(props, actorName);

‎services/things/starter/src/main/java/org/eclipse/ditto/services/things/starter/ThingsRootActor.java

+15-6
Original file line numberDiff line numberDiff line change
@@ -29,13 +29,15 @@
2929
import org.eclipse.ditto.services.things.starter.util.ConfigKeys;
3030
import org.eclipse.ditto.services.utils.akka.LogUtil;
3131
import org.eclipse.ditto.services.utils.cluster.ClusterStatusSupplier;
32+
import org.eclipse.ditto.services.utils.cluster.RetrieveStatisticsDetailsResponseSupplier;
3233
import org.eclipse.ditto.services.utils.cluster.ShardRegionExtractor;
3334
import org.eclipse.ditto.services.utils.config.ConfigUtil;
3435
import org.eclipse.ditto.services.utils.config.MongoConfig;
3536
import org.eclipse.ditto.services.utils.health.DefaultHealthCheckingActorFactory;
3637
import org.eclipse.ditto.services.utils.health.HealthCheckingActorOptions;
3738
import org.eclipse.ditto.services.utils.health.routes.StatusRoute;
3839
import org.eclipse.ditto.services.utils.persistence.mongo.MongoClientActor;
40+
import org.eclipse.ditto.signals.commands.devops.RetrieveStatisticsDetails;
3941

4042
import com.typesafe.config.Config;
4143

@@ -54,7 +56,6 @@
5456
import akka.cluster.pubsub.DistributedPubSubMediator;
5557
import akka.cluster.sharding.ClusterSharding;
5658
import akka.cluster.sharding.ClusterShardingSettings;
57-
import akka.cluster.sharding.ShardRegion;
5859
import akka.event.DiagnosticLoggingAdapter;
5960
import akka.http.javadsl.ConnectHttp;
6061
import akka.http.javadsl.Http;
@@ -64,6 +65,7 @@
6465
import akka.japi.pf.DeciderBuilder;
6566
import akka.japi.pf.ReceiveBuilder;
6667
import akka.pattern.AskTimeoutException;
68+
import akka.pattern.PatternsCS;
6769
import akka.stream.ActorMaterializer;
6870

6971
/**
@@ -135,8 +137,7 @@ final class ThingsRootActor extends AbstractActor {
135137
return SupervisorStrategy.escalate();
136138
}).build());
137139

138-
private final ActorRef thingsShardRegion;
139-
140+
private final RetrieveStatisticsDetailsResponseSupplier retrieveStatisticsDetailsResponseSupplier;
140141

141142
private ThingsRootActor(final ServiceConfigReader configReader,
142143
final ActorRef pubSubMediator,
@@ -152,12 +153,15 @@ private ThingsRootActor(final ServiceConfigReader configReader,
152153
ClusterShardingSettings.create(getContext().system())
153154
.withRole(ThingsMessagingConstants.CLUSTER_ROLE);
154155

155-
thingsShardRegion = ClusterSharding.get(getContext().system())
156+
final ActorRef thingsShardRegion = ClusterSharding.get(getContext().system())
156157
.start(ThingsMessagingConstants.SHARD_REGION,
157158
thingSupervisorProps,
158159
shardingSettings,
159160
ShardRegionExtractor.of(numberOfShards, getContext().getSystem()));
160161

162+
retrieveStatisticsDetailsResponseSupplier = RetrieveStatisticsDetailsResponseSupplier.of(thingsShardRegion,
163+
ThingsMessagingConstants.SHARD_REGION, log);
164+
161165
final HealthConfigReader healthConfig = configReader.health();
162166
final HealthCheckingActorOptions.Builder hcBuilder =
163167
HealthCheckingActorOptions.getBuilder(healthConfig.enabled(), healthConfig.getInterval());
@@ -245,15 +249,20 @@ public SupervisorStrategy supervisorStrategy() {
245249
@Override
246250
public Receive createReceive() {
247251
return ReceiveBuilder.create()
248-
.matchEquals(ShardRegion.getShardRegionStateInstance(), getShardRegionState ->
249-
thingsShardRegion.forward(getShardRegionState, getContext()))
252+
.match(RetrieveStatisticsDetails.class, this::handleRetrieveStatisticsDetails)
250253
.match(Status.Failure.class, f -> log.error(f.cause(), "Got failure: {}", f))
251254
.matchAny(m -> {
252255
log.warning("Unknown message: {}", m);
253256
unhandled(m);
254257
}).build();
255258
}
256259

260+
private void handleRetrieveStatisticsDetails(final RetrieveStatisticsDetails command) {
261+
log.info("Sending the namespace stats of the things shard as requested..");
262+
PatternsCS.pipe(retrieveStatisticsDetailsResponseSupplier
263+
.apply(command.getDittoHeaders()), getContext().dispatcher()).to(getSender());
264+
}
265+
257266
private ActorRef startChildActor(final String actorName, final Props props) {
258267
log.info("Starting child actor <{}>.", actorName);
259268
return getContext().actorOf(props, actorName);

‎services/thingsearch/updater-actors/src/main/java/org/eclipse/ditto/services/thingsearch/updater/actors/SearchUpdaterRootActor.java

+3-6
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
package org.eclipse.ditto.services.thingsearch.updater.actors;
1212

1313
import java.time.Duration;
14-
import java.util.concurrent.TimeUnit;
1514

1615
import org.eclipse.ditto.services.base.config.ServiceConfigReader;
1716
import org.eclipse.ditto.services.thingsearch.common.util.ConfigKeys;
@@ -24,6 +23,7 @@
2423
import org.eclipse.ditto.services.utils.persistence.mongo.MongoClientWrapper;
2524
import org.eclipse.ditto.services.utils.persistence.mongo.monitoring.KamonCommandListener;
2625
import org.eclipse.ditto.services.utils.persistence.mongo.monitoring.KamonConnectionPoolListener;
26+
import org.eclipse.ditto.signals.commands.devops.RetrieveStatisticsDetails;
2727

2828
import com.mongodb.event.CommandListener;
2929
import com.mongodb.event.ConnectionPoolListener;
@@ -36,7 +36,6 @@
3636
import akka.actor.Status;
3737
import akka.actor.SupervisorStrategy;
3838
import akka.cluster.pubsub.DistributedPubSubMediator;
39-
import akka.cluster.sharding.ShardRegion;
4039
import akka.cluster.singleton.ClusterSingletonManager;
4140
import akka.cluster.singleton.ClusterSingletonManagerSettings;
4241
import akka.event.Logging;
@@ -87,8 +86,7 @@ private SearchUpdaterRootActor(final ServiceConfigReader configReader, final Act
8786
final Duration resetTimeout = config.getDuration(ConfigKeys.MONGO_CIRCUIT_BREAKER_TIMEOUT_RESET);
8887
final CircuitBreaker circuitBreaker =
8988
new CircuitBreaker(getContext().dispatcher(), getContext().system().scheduler(), maxFailures,
90-
scala.concurrent.duration.Duration.create(callTimeout.getSeconds(), TimeUnit.SECONDS),
91-
scala.concurrent.duration.Duration.create(resetTimeout.getSeconds(), TimeUnit.SECONDS));
89+
callTimeout, resetTimeout);
9290
circuitBreaker.onOpen(() -> log.warning(
9391
"The circuit breaker for this search updater instance is open which means that all ThingUpdaters" +
9492
" won't process any messages until the circuit breaker is closed again"));
@@ -214,8 +212,7 @@ public SearchUpdaterRootActor create() {
214212
@Override
215213
public Receive createReceive() {
216214
return ReceiveBuilder.create()
217-
.matchEquals(ShardRegion.getShardRegionStateInstance(), getShardRegionState ->
218-
thingsUpdaterActor.forward(getShardRegionState, getContext()))
215+
.match(RetrieveStatisticsDetails.class, cmd -> thingsUpdaterActor.forward(cmd, getContext()))
219216
.match(Status.Failure.class, f -> log.error(f.cause(), "Got failure: {}", f))
220217
.matchAny(m -> {
221218
log.warning("Unknown message: {}", m);

‎services/thingsearch/updater-actors/src/main/java/org/eclipse/ditto/services/thingsearch/updater/actors/ThingsUpdater.java

+15-3
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,12 @@
2121
import org.eclipse.ditto.model.base.json.Jsonifiable;
2222
import org.eclipse.ditto.services.models.policies.PolicyReferenceTag;
2323
import org.eclipse.ditto.services.models.things.ThingTag;
24+
import org.eclipse.ditto.services.models.thingsearch.ThingsSearchConstants;
2425
import org.eclipse.ditto.services.thingsearch.persistence.write.ThingsSearchUpdaterPersistence;
2526
import org.eclipse.ditto.services.utils.akka.LogUtil;
27+
import org.eclipse.ditto.services.utils.cluster.RetrieveStatisticsDetailsResponseSupplier;
2628
import org.eclipse.ditto.signals.base.ShardedMessageEnvelope;
29+
import org.eclipse.ditto.signals.commands.devops.RetrieveStatisticsDetails;
2730
import org.eclipse.ditto.signals.events.base.Event;
2831
import org.eclipse.ditto.signals.events.policies.PolicyEvent;
2932
import org.eclipse.ditto.signals.events.things.ThingEvent;
@@ -34,12 +37,12 @@
3437
import akka.actor.Props;
3538
import akka.cluster.pubsub.DistributedPubSub;
3639
import akka.cluster.pubsub.DistributedPubSubMediator;
37-
import akka.cluster.sharding.ShardRegion;
3840
import akka.event.DiagnosticLoggingAdapter;
3941
import akka.event.Logging;
4042
import akka.japi.Creator;
4143
import akka.japi.pf.ReceiveBuilder;
4244
import akka.pattern.CircuitBreaker;
45+
import akka.pattern.PatternsCS;
4346
import akka.stream.ActorMaterializer;
4447
import akka.stream.Materializer;
4548
import akka.stream.javadsl.Sink;
@@ -61,6 +64,7 @@ final class ThingsUpdater extends AbstractActor {
6164
private final ActorRef shardRegion;
6265
private final ThingsSearchUpdaterPersistence searchUpdaterPersistence;
6366
private final Materializer materializer;
67+
private final RetrieveStatisticsDetailsResponseSupplier retrieveStatisticsDetailsResponseSupplier;
6468

6569
private ThingsUpdater(final int numberOfShards,
6670
final ShardRegionFactory shardRegionFactory,
@@ -87,6 +91,9 @@ private ThingsUpdater(final int numberOfShards,
8791
this.searchUpdaterPersistence = searchUpdaterPersistence;
8892
materializer = ActorMaterializer.create(getContext());
8993

94+
retrieveStatisticsDetailsResponseSupplier = RetrieveStatisticsDetailsResponseSupplier.of(shardRegion,
95+
ThingsSearchConstants.SHARD_REGION, log);
96+
9097
if (eventProcessingActive) {
9198
pubSubMediator.tell(new DistributedPubSubMediator.Subscribe(ThingEvent.TYPE_PREFIX, UPDATER_GROUP, self()),
9299
self());
@@ -127,8 +134,7 @@ public ThingsUpdater create() {
127134
@Override
128135
public Receive createReceive() {
129136
return ReceiveBuilder.create()
130-
.matchEquals(ShardRegion.getShardRegionStateInstance(), getShardRegionState ->
131-
shardRegion.forward(getShardRegionState, getContext()))
137+
.match(RetrieveStatisticsDetails.class, this::handleRetrieveStatisticsDetails)
132138
.match(ThingEvent.class, this::processThingEvent)
133139
.match(PolicyEvent.class, this::processPolicyEvent)
134140
.match(ThingTag.class, this::processThingTag)
@@ -140,6 +146,12 @@ public Receive createReceive() {
140146
}).build();
141147
}
142148

149+
private void handleRetrieveStatisticsDetails(final RetrieveStatisticsDetails command) {
150+
log.info("Sending the namespace stats of the search-updater shard as requested..");
151+
PatternsCS.pipe(retrieveStatisticsDetailsResponseSupplier
152+
.apply(command.getDittoHeaders()), getContext().dispatcher()).to(getSender());
153+
}
154+
143155
private void processThingTag(final ThingTag thingTag) {
144156
final String elementIdentifier = thingTag.asIdentifierString();
145157
LogUtil.enhanceLogWithCorrelationId(log, "things-tags-sync-" + elementIdentifier);

‎services/thingsearch/updater-actors/src/test/java/org/eclipse/ditto/services/thingsearch/updater/actors/ThingsUpdaterTest.java

-12
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,6 @@
4747

4848
import akka.actor.ActorRef;
4949
import akka.actor.ActorSystem;
50-
import akka.cluster.sharding.ShardRegion;
5150
import akka.pattern.CircuitBreaker;
5251
import akka.stream.javadsl.Source;
5352
import akka.testkit.TestProbe;
@@ -136,17 +135,6 @@ public void policyReferenceTagIsForwarded() {
136135
}};
137136
}
138137

139-
@Test
140-
public void shardRegionStateIsForwarded() {
141-
final ShardRegion.GetShardRegionState$ shardRegionState = ShardRegion.getShardRegionStateInstance();
142-
new TestKit(actorSystem) {{
143-
final ActorRef underTest = createThingsUpdater();
144-
underTest.tell(shardRegionState, getRef());
145-
shardMessageReceiver.expectMsg(shardRegionState);
146-
}};
147-
}
148-
149-
150138
private void expectShardedMessage(final TestProbe probe, final Jsonifiable event, final String id) {
151139
final ShardedMessageEnvelope envelope = probe.expectMsgClass(ShardedMessageEnvelope.class);
152140
assertThat(envelope.getMessage())

‎services/utils/cluster/pom.xml

+4
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,10 @@
4747
<groupId>org.eclipse.ditto</groupId>
4848
<artifactId>ditto-signals-commands-base</artifactId>
4949
</dependency>
50+
<dependency>
51+
<groupId>org.eclipse.ditto</groupId>
52+
<artifactId>ditto-signals-commands-devops</artifactId>
53+
</dependency>
5054

5155
<dependency>
5256
<groupId>com.typesafe.akka</groupId>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
/*
2+
* Copyright (c) 2017-2018 Bosch Software Innovations GmbH.
3+
*
4+
* All rights reserved. This program and the accompanying materials
5+
* are made available under the terms of the Eclipse Public License v2.0
6+
* which accompanies this distribution, and is available at
7+
* https://www.eclipse.org/org/documents/epl-2.0/index.php
8+
*
9+
* SPDX-License-Identifier: EPL-2.0
10+
*/
11+
package org.eclipse.ditto.services.utils.cluster;
12+
13+
import java.time.Duration;
14+
import java.util.Map;
15+
import java.util.concurrent.CompletionStage;
16+
import java.util.function.Function;
17+
import java.util.stream.Collectors;
18+
19+
import org.eclipse.ditto.json.JsonCollectors;
20+
import org.eclipse.ditto.json.JsonFactory;
21+
import org.eclipse.ditto.json.JsonField;
22+
import org.eclipse.ditto.json.JsonObject;
23+
import org.eclipse.ditto.json.JsonValue;
24+
import org.eclipse.ditto.model.base.headers.DittoHeaders;
25+
import org.eclipse.ditto.signals.commands.devops.RetrieveStatisticsDetailsResponse;
26+
27+
import akka.actor.ActorRef;
28+
import akka.cluster.sharding.ShardRegion;
29+
import akka.event.DiagnosticLoggingAdapter;
30+
import akka.pattern.PatternsCS;
31+
32+
/**
33+
* Supplier of {@link RetrieveStatisticsDetailsResponse}s for a specific shard region - determines the "hot entities"
34+
* per namespace and aggregates them into a single {@link RetrieveStatisticsDetailsResponse}.
35+
*/
36+
public final class RetrieveStatisticsDetailsResponseSupplier
37+
implements Function<DittoHeaders, CompletionStage<RetrieveStatisticsDetailsResponse>> {
38+
39+
private static final String EMPTY_ID = "<empty>";
40+
41+
private final ActorRef shardRegion;
42+
private final String shardRegionName;
43+
private final DiagnosticLoggingAdapter log;
44+
45+
private RetrieveStatisticsDetailsResponseSupplier(final ActorRef shardRegion, final String shardRegionName,
46+
final DiagnosticLoggingAdapter log) {
47+
this.shardRegion = shardRegion;
48+
this.shardRegionName = shardRegionName;
49+
this.log = log;
50+
}
51+
52+
/**
53+
* Creates a new instance of a {@link RetrieveStatisticsDetailsResponse} supplier for the passed {@code shardRegion}
54+
* and {@code shardRegionName}.
55+
*
56+
* @param shardRegion the shard region ActoRef to use for retrieving the shard region state.
57+
* @param shardRegionName the shard region name.
58+
* @param log the logger to use.
59+
* @return the new RetrieveStatisticsDetailsResponse supplier
60+
*/
61+
public static RetrieveStatisticsDetailsResponseSupplier of(final ActorRef shardRegion, final String shardRegionName,
62+
final DiagnosticLoggingAdapter log) {
63+
return new RetrieveStatisticsDetailsResponseSupplier(shardRegion, shardRegionName, log);
64+
}
65+
66+
@Override
67+
public CompletionStage<RetrieveStatisticsDetailsResponse> apply(final DittoHeaders dittoHeaders) {
68+
return PatternsCS.ask(shardRegion, ShardRegion.getShardRegionStateInstance(),
69+
Duration.ofSeconds(5))
70+
.handle((result, throwable) -> {
71+
if (throwable != null) {
72+
log.error(throwable, "Could not determine 'ShardRegionState' for shard region <{}>",
73+
shardRegionName);
74+
return RetrieveStatisticsDetailsResponse.of(JsonObject.newBuilder()
75+
.set(shardRegionName, JsonFactory.newObject())
76+
.build(),
77+
dittoHeaders);
78+
} else if (result instanceof ShardRegion.CurrentShardRegionState) {
79+
final Map<String, Long> shardStats =
80+
((ShardRegion.CurrentShardRegionState) result).getShards()
81+
.stream()
82+
.map(ShardRegion.ShardState::getEntityIds)
83+
.flatMap(strSet -> strSet.stream()
84+
.map(str -> {
85+
// groupKey may be either namespace or resource-type+namespace (in case of concierge)
86+
final String[] groupKeys = str.split(":", 3);
87+
// assume String.split(String, int) may not return an empty array
88+
switch (groupKeys.length) {
89+
case 0:
90+
// should not happen with Java 8 strings, but just in case
91+
return EMPTY_ID;
92+
case 1:
93+
case 2:
94+
// normal: namespace
95+
return ensureNonemptyString(
96+
groupKeys[0]);
97+
default:
98+
// concierge: resource-type + namespace
99+
return groupKeys[0] + ":" +
100+
groupKeys[1];
101+
}
102+
})
103+
)
104+
.collect(Collectors.groupingBy(Function.identity(),
105+
Collectors.mapping(Function.identity(),
106+
Collectors.counting())));
107+
108+
final JsonObject namespaceStats = shardStats.entrySet().stream()
109+
.map(entry -> JsonField.newInstance(entry.getKey(),
110+
JsonValue.of(entry.getValue())))
111+
.collect(JsonCollectors.fieldsToObject());
112+
113+
final JsonObject thingNamespacesStats = JsonObject.newBuilder()
114+
.set(shardRegionName, namespaceStats)
115+
.build();
116+
117+
return RetrieveStatisticsDetailsResponse.of(thingNamespacesStats,
118+
dittoHeaders);
119+
} else {
120+
log.warning("Unexpected answer to " +
121+
"'ShardRegion.getShardRegionStateInstance()': {}", result);
122+
return RetrieveStatisticsDetailsResponse.of(JsonObject.newBuilder()
123+
.set(shardRegionName, JsonFactory.newObject())
124+
.build(),
125+
dittoHeaders);
126+
}
127+
});
128+
}
129+
130+
private static String ensureNonemptyString(final String possiblyEmptyString) {
131+
return possiblyEmptyString.isEmpty() ? EMPTY_ID : possiblyEmptyString;
132+
}
133+
}

‎signals/commands/devops/src/main/java/org/eclipse/ditto/signals/commands/devops/DevOpsCommandRegistry.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ public static DevOpsCommandRegistry newInstance() {
4242
parseStrategies.put(ChangeLogLevel.TYPE, ChangeLogLevel::fromJson);
4343
parseStrategies.put(RetrieveLoggerConfig.TYPE, RetrieveLoggerConfig::fromJson);
4444
parseStrategies.put(RetrieveStatistics.TYPE, RetrieveStatistics::fromJson);
45-
parseStrategies.put(RetrieveStatisticsDetails.TYPE, RetrieveStatistics::fromJson);
45+
parseStrategies.put(RetrieveStatisticsDetails.TYPE, RetrieveStatisticsDetails::fromJson);
4646
parseStrategies.put(ExecutePiggybackCommand.TYPE, ExecutePiggybackCommand::fromJson);
4747

4848
return new DevOpsCommandRegistry(parseStrategies);

‎signals/commands/devops/src/main/java/org/eclipse/ditto/signals/commands/devops/DevOpsCommandResponseRegistry.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ public static DevOpsCommandResponseRegistry newInstance() {
4242
parseStrategies.put(ChangeLogLevelResponse.TYPE, ChangeLogLevelResponse::fromJson);
4343
parseStrategies.put(RetrieveLoggerConfigResponse.TYPE, RetrieveLoggerConfigResponse::fromJson);
4444
parseStrategies.put(RetrieveStatisticsResponse.TYPE, RetrieveStatisticsResponse::fromJson);
45-
parseStrategies.put(RetrieveStatisticsDetailsResponse.TYPE, RetrieveStatisticsResponse::fromJson);
45+
parseStrategies.put(RetrieveStatisticsDetailsResponse.TYPE, RetrieveStatisticsDetailsResponse::fromJson);
4646
parseStrategies.put(AggregatedDevOpsCommandResponse.TYPE,
4747
(jsonObject, dittoHeaders) -> AggregatedDevOpsCommandResponse.fromJson(jsonObject, dittoHeaders, parseStrategies));
4848

0 commit comments

Comments
 (0)
Please sign in to comment.