Skip to content

Commit c9cf391

Browse files
authored
Merge pull request eclipse-ditto#1847 from eclipse-ditto/feature/enhance-wot-skeleton-generation-with-options
Enhance WoT skeleton creation with more options
2 parents f982ba5 + 075bd85 commit c9cf391

File tree

17 files changed

+531
-137
lines changed

17 files changed

+531
-137
lines changed

edge/service/src/main/resources/ditto-edge-service.conf

+1-1
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ ditto {
3333

3434
ask-with-retry {
3535
# maximum duration to wait for answers from entity shard regions
36-
ask-timeout = 3s
36+
ask-timeout = 5s
3737
ask-timeout = ${?CONCIERGE_CACHES_ASK_TIMEOUT}
3838

3939
# one of: OFF, NO_DELAY, FIXED_DELAY, BACKOFF_DELAY

internal/utils/cache/src/main/java/org/eclipse/ditto/internal/utils/cache/Cache.java

+12
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,18 @@ public interface Cache<K, V> {
3636
*/
3737
CompletableFuture<Optional<V>> get(K key);
3838

39+
/**
40+
* Returns a {@link CompletableFuture} returning the value which is associated with the specified key, specifying
41+
* an {@code errorHandler}.
42+
*
43+
* @param key the key to get the associated value for.
44+
* @param errorHandler function to invoke when a {@code Throwable} is encountered by the cache loader.
45+
* @return a {@link CompletableFuture} returning the value which is associated with the specified key or an empty
46+
* {@link Optional}.
47+
* @throws NullPointerException if {@code key} is {@code null}.
48+
*/
49+
CompletableFuture<Optional<V>> get(K key, Function<Throwable, Optional<V>> errorHandler);
50+
3951
/**
4052
* Retrieve the value associated with a key in a future if it exists in the cache, or a future empty optional if
4153
* it does not. The cache loader will never be called.

internal/utils/cache/src/main/java/org/eclipse/ditto/internal/utils/cache/CaffeineCache.java

+16
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,11 @@
1717
import java.util.Collection;
1818
import java.util.Optional;
1919
import java.util.concurrent.CompletableFuture;
20+
import java.util.concurrent.CompletionException;
2021
import java.util.concurrent.ConcurrentMap;
2122
import java.util.concurrent.Executor;
2223
import java.util.function.BiFunction;
24+
import java.util.function.Function;
2325

2426
import javax.annotation.Nullable;
2527

@@ -165,6 +167,20 @@ public CompletableFuture<Optional<V>> get(final K key) {
165167
return asyncCache.get(key, asyncLoad).thenApply(Optional::ofNullable);
166168
}
167169

170+
@Override
171+
public CompletableFuture<Optional<V>> get(final K key, final Function<Throwable, Optional<V>> errorHandler) {
172+
requireNonNull(key);
173+
174+
return asyncCache.get(key, asyncLoad).thenApply(Optional::ofNullable)
175+
.exceptionally(throwable -> {
176+
if (throwable instanceof CompletionException completionException) {
177+
return errorHandler.apply(completionException.getCause());
178+
} else {
179+
return errorHandler.apply(throwable);
180+
}
181+
});
182+
}
183+
168184
/**
169185
* Lookup a value in cache, or create it via {@code mappingFunction} and store it if the value was not cached.
170186
* Only available for Caffeine caches.

internal/utils/cache/src/main/java/org/eclipse/ditto/internal/utils/cache/ProjectedCache.java

+5
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,11 @@ public CompletableFuture<Optional<U>> get(final K key) {
4848
return cache.get(key).thenApply(projectOptional);
4949
}
5050

51+
@Override
52+
public CompletableFuture<Optional<U>> get(final K key, final Function<Throwable, Optional<U>> errorHandler) {
53+
return cache.get(key, throwable -> errorHandler.apply(throwable).map(embed)).thenApply(projectOptional);
54+
}
55+
5156
@Override
5257
public CompletableFuture<Optional<U>> getIfPresent(final K key) {
5358
return cache.getIfPresent(key).thenApply(projectOptional);

internal/utils/persistent-actors/src/main/java/org/eclipse/ditto/internal/utils/persistentactors/AbstractPersistenceActor.java

+55-19
Original file line numberDiff line numberDiff line change
@@ -565,12 +565,20 @@ private record PersistEventAsync<
565565
/**
566566
* Persist an event, modify actor state by the event strategy, then invoke the handler.
567567
*
568-
* @param event the event to persist and apply.
568+
* @param eventStage the event stage to persist and apply.
569569
* @param handler what happens afterwards.
570+
* @param errorHandler the errorHandler to invoke for encountered throwables from the CompletionStage
570571
*/
571-
protected void persistAndApplyEventAsync(final CompletionStage<E> event, final BiConsumer<E, S> handler) {
572-
Patterns.pipe(event.thenApply(e -> new PersistEventAsync<>(e, handler)), getContext().getDispatcher())
573-
.to(getSelf());
572+
protected void persistAndApplyEventAsync(final CompletionStage<E> eventStage, final BiConsumer<E, S> handler,
573+
final Consumer<Throwable> errorHandler) {
574+
Patterns.pipe(eventStage.handle((e, throwable) -> {
575+
if (throwable != null) {
576+
errorHandler.accept(throwable);
577+
return null;
578+
} else {
579+
return new PersistEventAsync<>(e, handler);
580+
}
581+
}), getContext().getDispatcher()).to(getSelf());
574582
}
575583

576584
/**
@@ -736,9 +744,11 @@ public void onMutation(final Command<?> command, final E event, final WithDittoH
736744
}
737745

738746
@Override
739-
public void onStagedMutation(final Command<?> command, final CompletionStage<E> event,
747+
public void onStagedMutation(final Command<?> command,
748+
final CompletionStage<E> event,
740749
final CompletionStage<WithDittoHeaders> response,
741-
final boolean becomeCreated, final boolean becomeDeleted) {
750+
final boolean becomeCreated,
751+
final boolean becomeDeleted) {
742752

743753
final ActorRef sender = getSender();
744754
persistAndApplyEventAsync(event, (persistedEvent, resultingEntity) -> {
@@ -751,6 +761,16 @@ public void onStagedMutation(final Command<?> command, final CompletionStage<E>
751761
if (becomeCreated) {
752762
becomeCreatedHandler();
753763
}
764+
}, throwable -> {
765+
final DittoRuntimeException dittoRuntimeException =
766+
DittoRuntimeException.asDittoRuntimeException(throwable, t ->
767+
DittoInternalErrorException.newBuilder()
768+
.cause(t)
769+
.dittoHeaders(command.getDittoHeaders())
770+
.build());
771+
if (shouldSendResponse(command.getDittoHeaders())) {
772+
notifySender(sender, dittoRuntimeException);
773+
}
754774
});
755775
}
756776

@@ -766,7 +786,13 @@ public void onQuery(final Command<?> command, final WithDittoHeaders response) {
766786
public void onStagedQuery(final Command<?> command, final CompletionStage<WithDittoHeaders> response) {
767787
if (command.getDittoHeaders().isResponseRequired()) {
768788
final ActorRef sender = getSender();
769-
response.thenAccept(r -> notifySender(sender, r));
789+
response.whenComplete((r, throwable) -> {
790+
if (throwable instanceof DittoRuntimeException dittoRuntimeException) {
791+
notifySender(sender, dittoRuntimeException);
792+
} else {
793+
notifySender(sender, r);
794+
}
795+
});
770796
}
771797
}
772798

@@ -883,7 +909,13 @@ private void notifySender(final WithDittoHeaders message) {
883909
}
884910

885911
private void notifySender(final ActorRef sender, final CompletionStage<WithDittoHeaders> message) {
886-
message.thenAccept(msg -> notifySender(sender, msg));
912+
message.whenComplete((msg, throwable) -> {
913+
if (throwable instanceof DittoRuntimeException dittoRuntimeException) {
914+
notifySender(sender, dittoRuntimeException);
915+
} else {
916+
notifySender(sender, msg);
917+
}
918+
});
887919
}
888920

889921
private void takeSnapshotByInterval(final Control takeSnapshot) {
@@ -1097,19 +1129,23 @@ public void onQuery(final Command<?> command, final WithDittoHeaders response) {
10971129
@Override
10981130
public void onStagedQuery(final Command<?> command, final CompletionStage<WithDittoHeaders> response) {
10991131
if (command.getDittoHeaders().isResponseRequired()) {
1100-
response.thenAccept(r -> {
1101-
final WithDittoHeaders theResponseToSend;
1102-
if (response instanceof DittoHeadersSettable<?> dittoHeadersSettable) {
1103-
final DittoHeaders queryCommandHeaders = r.getDittoHeaders();
1104-
final DittoHeaders adjustedHeaders = queryCommandHeaders.toBuilder()
1105-
.putHeader(DittoHeaderDefinition.HISTORICAL_HEADERS.getKey(),
1106-
historicalDittoHeaders.toJson().toString())
1107-
.build();
1108-
theResponseToSend = dittoHeadersSettable.setDittoHeaders(adjustedHeaders);
1132+
response.whenComplete((r, throwable) -> {
1133+
if (throwable instanceof DittoRuntimeException dittoRuntimeException) {
1134+
notifySender(sender, dittoRuntimeException);
11091135
} else {
1110-
theResponseToSend = r;
1136+
final WithDittoHeaders theResponseToSend;
1137+
if (response instanceof DittoHeadersSettable<?> dittoHeadersSettable) {
1138+
final DittoHeaders queryCommandHeaders = r.getDittoHeaders();
1139+
final DittoHeaders adjustedHeaders = queryCommandHeaders.toBuilder()
1140+
.putHeader(DittoHeaderDefinition.HISTORICAL_HEADERS.getKey(),
1141+
historicalDittoHeaders.toJson().toString())
1142+
.build();
1143+
theResponseToSend = dittoHeadersSettable.setDittoHeaders(adjustedHeaders);
1144+
} else {
1145+
theResponseToSend = r;
1146+
}
1147+
notifySender(sender, theResponseToSend);
11111148
}
1112-
notifySender(sender, theResponseToSend);
11131149
});
11141150
}
11151151
}

internal/utils/persistent-actors/src/main/java/org/eclipse/ditto/internal/utils/persistentactors/AbstractPersistenceSupervisor.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ public abstract class AbstractPersistenceSupervisor<E extends EntityId, S extend
128128
@Nullable protected ActorRef persistenceActorChild;
129129
@Nullable protected ActorRef enforcerChild;
130130

131-
private final Duration localAskTimeout;
131+
protected final Duration localAskTimeout;
132132

133133
private final ExponentialBackOffConfig exponentialBackOffConfig;
134134
private final SignalTransformer signalTransformer;

policies/enforcement/src/main/java/org/eclipse/ditto/policies/enforcement/PolicyEnforcerCache.java

+7-1
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
import java.util.concurrent.CompletableFuture;
2121
import java.util.concurrent.ConcurrentHashMap;
2222
import java.util.concurrent.ConcurrentMap;
23-
import java.util.stream.Stream;
23+
import java.util.function.Function;
2424

2525
import org.eclipse.ditto.internal.utils.cache.Cache;
2626
import org.eclipse.ditto.internal.utils.cache.CacheFactory;
@@ -70,6 +70,12 @@ public CompletableFuture<Optional<Entry<PolicyEnforcer>>> get(final PolicyId key
7070
return delegate.get(key);
7171
}
7272

73+
@Override
74+
public CompletableFuture<Optional<Entry<PolicyEnforcer>>> get(final PolicyId key,
75+
final Function<Throwable, Optional<Entry<PolicyEnforcer>>> errorHandler) {
76+
return delegate.get(key, errorHandler);
77+
}
78+
7379
@Override
7480
public CompletableFuture<Optional<Entry<PolicyEnforcer>>> getIfPresent(final PolicyId key) {
7581
return delegate.getIfPresent(key);

policies/service/src/main/java/org/eclipse/ditto/policies/service/persistence/actors/PolicyPersistenceActor.java

+16-5
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,12 @@
1919

2020
import javax.annotation.Nullable;
2121

22+
import org.apache.pekko.actor.ActorRef;
23+
import org.apache.pekko.actor.ActorSystem;
24+
import org.apache.pekko.actor.Props;
25+
import org.apache.pekko.persistence.RecoveryCompleted;
26+
import org.eclipse.ditto.base.model.exceptions.DittoInternalErrorException;
27+
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
2228
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeExceptionBuilder;
2329
import org.eclipse.ditto.base.model.headers.DittoHeaders;
2430
import org.eclipse.ditto.base.model.headers.WithDittoHeaders;
@@ -47,11 +53,6 @@
4753
import org.eclipse.ditto.policies.service.persistence.actors.strategies.commands.PolicyCommandStrategies;
4854
import org.eclipse.ditto.policies.service.persistence.actors.strategies.events.PolicyEventStrategies;
4955

50-
import org.apache.pekko.actor.ActorRef;
51-
import org.apache.pekko.actor.ActorSystem;
52-
import org.apache.pekko.actor.Props;
53-
import org.apache.pekko.persistence.RecoveryCompleted;
54-
5556
/**
5657
* PersistentActor which "knows" the state of a single {@link Policy}.
5758
*/
@@ -279,6 +280,16 @@ public void onStagedMutation(final Command<?> command, final CompletionStage<Pol
279280
if (becomeCreated) {
280281
becomeCreatedHandler();
281282
}
283+
}, throwable -> {
284+
final DittoRuntimeException dittoRuntimeException =
285+
DittoRuntimeException.asDittoRuntimeException(throwable, t ->
286+
DittoInternalErrorException.newBuilder()
287+
.cause(t)
288+
.dittoHeaders(command.getDittoHeaders())
289+
.build());
290+
if (shouldSendResponse(command.getDittoHeaders())) {
291+
notifySender(sender, dittoRuntimeException);
292+
}
282293
});
283294
}
284295

things/service/src/main/java/org/eclipse/ditto/things/service/persistence/actors/ThingSupervisorActor.java

+20-16
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,19 @@
2424

2525
import javax.annotation.Nullable;
2626

27+
import org.apache.pekko.actor.ActorKilledException;
28+
import org.apache.pekko.actor.ActorRef;
29+
import org.apache.pekko.actor.ActorSelection;
30+
import org.apache.pekko.actor.ActorSystem;
31+
import org.apache.pekko.actor.Props;
32+
import org.apache.pekko.japi.pf.FI;
33+
import org.apache.pekko.japi.pf.ReceiveBuilder;
34+
import org.apache.pekko.pattern.AskTimeoutException;
35+
import org.apache.pekko.pattern.Patterns;
36+
import org.apache.pekko.stream.Materializer;
37+
import org.apache.pekko.stream.javadsl.Keep;
38+
import org.apache.pekko.stream.javadsl.Sink;
39+
import org.apache.pekko.stream.javadsl.Source;
2740
import org.eclipse.ditto.base.model.acks.DittoAcknowledgementLabel;
2841
import org.eclipse.ditto.base.model.auth.AuthorizationContext;
2942
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
@@ -67,19 +80,6 @@
6780
import org.eclipse.ditto.things.service.enforcement.ThingPolicyCreated;
6881
import org.eclipse.ditto.thingsearch.api.ThingsSearchConstants;
6982

70-
import org.apache.pekko.actor.ActorKilledException;
71-
import org.apache.pekko.actor.ActorRef;
72-
import org.apache.pekko.actor.ActorSelection;
73-
import org.apache.pekko.actor.ActorSystem;
74-
import org.apache.pekko.actor.Props;
75-
import org.apache.pekko.japi.pf.FI;
76-
import org.apache.pekko.japi.pf.ReceiveBuilder;
77-
import org.apache.pekko.pattern.AskTimeoutException;
78-
import org.apache.pekko.stream.Materializer;
79-
import org.apache.pekko.stream.javadsl.Keep;
80-
import org.apache.pekko.stream.javadsl.Sink;
81-
import org.apache.pekko.stream.javadsl.Source;
82-
8383
/**
8484
* Supervisor for {@link ThingPersistenceActor} which means it will create, start and watch it as child actor.
8585
* <p>
@@ -244,7 +244,10 @@ public static Props props(final ActorRef pubSubMediator,
244244
@Override
245245
protected CompletionStage<Object> askEnforcerChild(final Signal<?> signal) {
246246

247-
if (signal instanceof ThingCommandResponse<?> thingCommandResponse &&
247+
if (signal instanceof CreateThing createThing && createThing.getThing().getDefinition().isPresent()) {
248+
// for thing creations containing a "definition", retrieving WoT model from URL is involved, give more time:
249+
return Patterns.ask(enforcerChild, signal, localAskTimeout.multipliedBy(3));
250+
} else if (signal instanceof ThingCommandResponse<?> thingCommandResponse &&
248251
CommandResponse.isLiveCommandResponse(thingCommandResponse)) {
249252

250253
return signal.getDittoHeaders().getCorrelationId()
@@ -337,10 +340,11 @@ protected CompletionStage<Object> modifyTargetActorCommandResponse(final Signal<
337340
@Override
338341
protected CompletableFuture<Object> handleTargetActorAndEnforcerException(final Signal<?> signal, final Throwable throwable) {
339342
if (RollbackCreatedPolicy.shouldRollbackBasedOnException(signal, throwable)) {
343+
final Throwable cause = throwable.getCause();
340344
log.withCorrelationId(signal)
341-
.info("Target actor exception received: <{}>. " +
345+
.info("Target actor exception received: <{}>, cause: <{}>. " +
342346
"Sending RollbackCreatedPolicy msg to self, potentially rolling back a created policy.",
343-
throwable.getClass().getSimpleName());
347+
throwable.getClass().getSimpleName(), cause != null ? cause.getClass().getSimpleName() : "-");
344348
final CompletableFuture<Object> responseFuture = new CompletableFuture<>();
345349
getSelf().tell(RollbackCreatedPolicy.of(signal, throwable, responseFuture), getSelf());
346350
return responseFuture;

things/service/src/main/resources/things.conf

+24
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,30 @@ ditto {
167167
expire-after-access = ${?THINGS_WOT_THING_MODEL_CACHE_EXPIRE_AFTER_ACCESS}
168168
}
169169

170+
tm-based-creation {
171+
thing {
172+
skeleton-creation-enabled = true
173+
skeleton-creation-enabled = ${?THINGS_WOT_TM_BASED_CREATION_THING_SKELETON_CREATION_ENABLED}
174+
175+
generate-defaults-for-optional-properties = false
176+
generate-defaults-for-optional-properties = ${?THINGS_WOT_TM_BASED_CREATION_THING_GENERATE_DEFAULTS_FOR_OPTIONAL_PROPERTIES}
177+
178+
throw-exception-on-wot-errors = true
179+
throw-exception-on-wot-errors = ${?THINGS_WOT_TM_BASED_CREATION_THING_THROW_EXCEPTION_ON_WOT_ERRORS}
180+
}
181+
182+
feature {
183+
skeleton-creation-enabled = true
184+
skeleton-creation-enabled = ${?THINGS_WOT_TM_BASED_CREATION_FEATURE_SKELETON_CREATION_ENABLED}
185+
186+
generate-defaults-for-optional-properties = false
187+
generate-defaults-for-optional-properties = ${?THINGS_WOT_TM_BASED_CREATION_FEATURE_GENERATE_DEFAULTS_FOR_OPTIONAL_PROPERTIES}
188+
189+
throw-exception-on-wot-errors = true
190+
throw-exception-on-wot-errors = ${?THINGS_WOT_TM_BASED_CREATION_FEATURE_THROW_EXCEPTION_ON_WOT_ERRORS}
191+
}
192+
}
193+
170194
to-thing-description {
171195
base-prefix = "http://localhost:8080"
172196
base-prefix = ${?THINGS_WOT_TO_THING_DESCRIPTION_BASE_PREFIX}

0 commit comments

Comments
 (0)