27
27
28
28
import javax .annotation .Nullable ;
29
29
30
+ import org .apache .pekko .NotUsed ;
31
+ import org .apache .pekko .actor .ActorRef ;
32
+ import org .apache .pekko .actor .Props ;
33
+ import org .apache .pekko .cluster .pubsub .DistributedPubSubMediator ;
34
+ import org .apache .pekko .japi .pf .PFBuilder ;
35
+ import org .apache .pekko .japi .pf .ReceiveBuilder ;
36
+ import org .apache .pekko .pattern .Patterns ;
37
+ import org .apache .pekko .stream .Materializer ;
38
+ import org .apache .pekko .stream .SourceRef ;
39
+ import org .apache .pekko .stream .javadsl .Sink ;
40
+ import org .apache .pekko .stream .javadsl .Source ;
30
41
import org .eclipse .ditto .base .model .entity .id .WithEntityId ;
31
42
import org .eclipse .ditto .base .model .exceptions .DittoInternalErrorException ;
32
43
import org .eclipse .ditto .base .model .exceptions .DittoRuntimeException ;
44
+ import org .eclipse .ditto .base .model .headers .DittoHeaderDefinition ;
33
45
import org .eclipse .ditto .base .model .headers .DittoHeaders ;
34
- import org .eclipse .ditto .base .model .headers .DittoHeadersSettable ;
35
46
import org .eclipse .ditto .base .model .json .Jsonifiable ;
36
47
import org .eclipse .ditto .base .model .signals .commands .Command ;
37
48
import org .eclipse .ditto .base .model .signals .commands .CommandResponse ;
38
49
import org .eclipse .ditto .base .model .signals .commands .WithEntity ;
50
+ import org .eclipse .ditto .internal .utils .cluster .DistPubSubAccess ;
39
51
import org .eclipse .ditto .internal .utils .pekko .actors .AbstractActorWithShutdownBehaviorAndRequestCounting ;
40
52
import org .eclipse .ditto .internal .utils .pekko .logging .DittoDiagnosticLoggingAdapter ;
41
53
import org .eclipse .ditto .internal .utils .pekko .logging .DittoLoggerFactory ;
42
- import org .eclipse .ditto .internal .utils .cluster .DistPubSubAccess ;
43
54
import org .eclipse .ditto .internal .utils .tracing .DittoTracing ;
44
55
import org .eclipse .ditto .internal .utils .tracing .span .SpanOperationName ;
45
56
import org .eclipse .ditto .internal .utils .tracing .span .StartedSpan ;
53
64
import org .eclipse .ditto .things .model .signals .commands .query .RetrieveThings ;
54
65
import org .eclipse .ditto .things .model .signals .commands .query .RetrieveThingsResponse ;
55
66
56
- import org .apache .pekko .NotUsed ;
57
- import org .apache .pekko .actor .ActorRef ;
58
- import org .apache .pekko .actor .Props ;
59
- import org .apache .pekko .cluster .pubsub .DistributedPubSubMediator ;
60
- import org .apache .pekko .japi .pf .PFBuilder ;
61
- import org .apache .pekko .japi .pf .ReceiveBuilder ;
62
- import org .apache .pekko .pattern .Patterns ;
63
- import org .apache .pekko .stream .Materializer ;
64
- import org .apache .pekko .stream .SourceRef ;
65
- import org .apache .pekko .stream .javadsl .Sink ;
66
- import org .apache .pekko .stream .javadsl .Source ;
67
-
68
67
/**
69
68
* Acts as a client for {@code ThingsAggregatorActor} which responds
70
69
* to a {@link RetrieveThings} command via a {@link SourceRef} which is a pointer in the cluster emitting the retrieved
@@ -108,8 +107,8 @@ public static Props props(final ActorRef pubSubMediator) {
108
107
@ Override
109
108
public Receive handleMessage () {
110
109
return ReceiveBuilder .create ()
111
- .match (RetrieveThings .class , rt -> handleRetrieveThings ( rt , rt ) )
112
- .match (SudoRetrieveThings .class , srt -> handleSudoRetrieveThings ( srt , srt ) )
110
+ .match (RetrieveThings .class , this :: handleRetrieveThings )
111
+ .match (SudoRetrieveThings .class , this :: handleSudoRetrieveThings )
113
112
.matchAny (m -> {
114
113
log .warning ("Got unknown message: {}" , m );
115
114
unhandled (m );
@@ -122,52 +121,47 @@ public void serviceUnbind(final Control serviceUnbind) {
122
121
// nothing to do
123
122
}
124
123
125
- private void handleRetrieveThings (final RetrieveThings rt , final Object msgToAsk ) {
124
+ private void handleRetrieveThings (final RetrieveThings rt ) {
126
125
final List <ThingId > thingIds = rt .getEntityIds ();
127
126
log .withCorrelationId (rt )
128
127
.info ("Got '{}' message. Retrieving requested '{}' Things.." ,
129
128
RetrieveThings .class .getSimpleName (), thingIds .size ());
130
-
131
- final ActorRef sender = getSender ();
132
- askTargetActor (rt , thingIds , msgToAsk , sender );
129
+ askTargetActor (rt , thingIds , getSender ());
133
130
}
134
131
135
- private void handleSudoRetrieveThings (final SudoRetrieveThings srt , final Object msgToAsk ) {
132
+ private void handleSudoRetrieveThings (final SudoRetrieveThings srt ) {
136
133
final List <ThingId > thingIds = srt .getThingIds ();
137
134
log .withCorrelationId (srt )
138
135
.info ("Got '{}' message. Retrieving requested '{}' Things.." ,
139
136
SudoRetrieveThings .class .getSimpleName (), thingIds .size ());
140
-
141
- final ActorRef sender = getSender ();
142
- askTargetActor (srt , thingIds , msgToAsk , sender );
137
+ askTargetActor (srt , thingIds , getSender ());
143
138
}
144
139
145
- private void askTargetActor (final Command <?> command , final List <ThingId > thingIds ,
146
- final Object msgToAsk , final ActorRef sender ) {
147
-
148
- final Object tracedMsgToAsk ;
140
+ private void askTargetActor (final Command <?> command , final List <ThingId > thingIds , final ActorRef sender )
141
+ {
142
+ final DittoHeaders dittoHeaders = command .getDittoHeaders ();
149
143
final var startedSpan = DittoTracing .newPreparedSpan (
150
- command . getDittoHeaders () ,
144
+ dittoHeaders ,
151
145
TRACE_AGGREGATOR_RETRIEVE_THINGS
152
146
)
153
147
.tag ("size" , Integer .toString (thingIds .size ()))
154
148
.start ();
155
- if ( msgToAsk instanceof DittoHeadersSettable <?> dittoHeadersSettable ) {
156
- tracedMsgToAsk = dittoHeadersSettable . setDittoHeaders (
157
- DittoHeaders . of ( startedSpan . propagateContext ( dittoHeadersSettable . getDittoHeaders ()) )
158
- );
159
- } else {
160
- tracedMsgToAsk = msgToAsk ;
161
- }
149
+ final Command <?> tracedCommand = command . setDittoHeaders (
150
+ DittoHeaders . of ( startedSpan . propagateContext (
151
+ dittoHeaders . toBuilder ( )
152
+ . removeHeader ( DittoHeaderDefinition . W3C_TRACEPARENT . getKey ())
153
+ . build ()
154
+ ))
155
+ );
162
156
163
157
final DistributedPubSubMediator .Publish pubSubMsg =
164
- DistPubSubAccess .publishViaGroup (command .getType (), tracedMsgToAsk );
158
+ DistPubSubAccess .publishViaGroup (tracedCommand .getType (), tracedCommand );
165
159
166
160
withRequestCounting (
167
161
Patterns .ask (pubSubMediator , pubSubMsg , Duration .ofSeconds (ASK_TIMEOUT ))
168
162
.thenAccept (response -> {
169
163
if (response instanceof SourceRef ) {
170
- handleSourceRef ((SourceRef <?>) response , thingIds , command , sender , startedSpan );
164
+ handleSourceRef ((SourceRef <?>) response , thingIds , tracedCommand , sender , startedSpan );
171
165
} else if (response instanceof DittoRuntimeException dre ) {
172
166
startedSpan .tagAsFailed (dre ).finish ();
173
167
sender .tell (response , getSelf ());
@@ -177,7 +171,7 @@ private void askTargetActor(final Command<?> command, final List<ThingId> thingI
177
171
response .getClass ().getSimpleName (), response );
178
172
final DittoInternalErrorException responseEx =
179
173
DittoInternalErrorException .newBuilder ()
180
- .dittoHeaders (command .getDittoHeaders ())
174
+ .dittoHeaders (tracedCommand .getDittoHeaders ())
181
175
.build ();
182
176
startedSpan .tagAsFailed (responseEx ).finish ();
183
177
sender .tell (responseEx , getSelf ());
@@ -187,7 +181,8 @@ private void askTargetActor(final Command<?> command, final List<ThingId> thingI
187
181
}
188
182
189
183
private void handleSourceRef (final SourceRef <?> sourceRef , final List <ThingId > thingIds ,
190
- final Command <?> originatingCommand , final ActorRef originatingSender , final StartedSpan startedSpan ) {
184
+ final Command <?> originatingCommand , final ActorRef originatingSender , final StartedSpan startedSpan )
185
+ {
191
186
final Function <Jsonifiable <?>, PlainJson > thingPlainJsonSupplier ;
192
187
final Function <List <PlainJson >, CommandResponse <?>> overallResponseSupplier ;
193
188
final UnaryOperator <List <PlainJson >> plainJsonSorter = supplyPlainJsonSorter (thingIds );
0 commit comments