Skip to content

Commit c30d459

Browse files
authored
Merge pull request eclipse-ditto#1815 from eclipse-ditto/feature/1583-apply-filter-in-historical-events
eclipse-ditto#1583 apply RQL based filtering when streaming "historical" thing events
2 parents 7a507f6 + 7cfe527 commit c30d459

File tree

12 files changed

+291
-30
lines changed

12 files changed

+291
-30
lines changed

Diff for: base/model/src/main/java/org/eclipse/ditto/base/model/signals/commands/streaming/SubscribeForPersistedEvents.java

+128-4
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ public final class SubscribeForPersistedEvents extends AbstractStreamingSubscrip
6161
@Nullable private final Instant fromHistoricalTimestamp;
6262
@Nullable private final Instant toHistoricalTimestamp;
6363
@Nullable private final String prefix;
64+
@Nullable private final String filter;
6465

6566
private SubscribeForPersistedEvents(final EntityId entityId,
6667
final JsonPointer resourcePath,
@@ -69,6 +70,7 @@ private SubscribeForPersistedEvents(final EntityId entityId,
6970
@Nullable final Instant fromHistoricalTimestamp,
7071
@Nullable final Instant toHistoricalTimestamp,
7172
@Nullable final String prefix,
73+
@Nullable final CharSequence filter,
7274
final DittoHeaders dittoHeaders) {
7375

7476
super(TYPE, entityId, resourcePath, dittoHeaders);
@@ -77,6 +79,7 @@ private SubscribeForPersistedEvents(final EntityId entityId,
7779
this.fromHistoricalTimestamp = fromHistoricalTimestamp;
7880
this.toHistoricalTimestamp = toHistoricalTimestamp;
7981
this.prefix = prefix;
82+
this.filter = filter != null ? filter.toString() : null;
8083
}
8184

8285
/**
@@ -89,7 +92,9 @@ private SubscribeForPersistedEvents(final EntityId entityId,
8992
* @param dittoHeaders the command headers of the request.
9093
* @return the command.
9194
* @throws NullPointerException if any non-nullable argument is {@code null}.
95+
* @deprecated since 3.5.0, use {@link #of(EntityId, JsonPointer, long, long, CharSequence, DittoHeaders)}
9296
*/
97+
@Deprecated
9398
public static SubscribeForPersistedEvents of(final EntityId entityId,
9499
final JsonPointer resourcePath,
95100
final long fromHistoricalRevision,
@@ -103,6 +108,38 @@ public static SubscribeForPersistedEvents of(final EntityId entityId,
103108
null,
104109
null,
105110
null,
111+
null,
112+
dittoHeaders);
113+
}
114+
115+
/**
116+
* Creates a new {@code SudoStreamSnapshots} command based on "from" and "to" {@code long} revisions.
117+
*
118+
* @param entityId the entityId that should be streamed.
119+
* @param resourcePath the resource path for which to stream events.
120+
* @param fromHistoricalRevision the revision to start the streaming from.
121+
* @param toHistoricalRevision the revision to stop the streaming at.
122+
* @param dittoHeaders the command headers of the request.
123+
* @param filter the optional RQL filter to apply for persisted events before publishing to the stream
124+
* @return the command.
125+
* @throws NullPointerException if any non-nullable argument is {@code null}.
126+
* @since 3.5.0
127+
*/
128+
public static SubscribeForPersistedEvents of(final EntityId entityId,
129+
final JsonPointer resourcePath,
130+
final long fromHistoricalRevision,
131+
final long toHistoricalRevision,
132+
@Nullable final CharSequence filter,
133+
final DittoHeaders dittoHeaders) {
134+
135+
return new SubscribeForPersistedEvents(entityId,
136+
resourcePath,
137+
fromHistoricalRevision,
138+
toHistoricalRevision,
139+
null,
140+
null,
141+
null,
142+
filter,
106143
dittoHeaders);
107144
}
108145

@@ -116,7 +153,9 @@ public static SubscribeForPersistedEvents of(final EntityId entityId,
116153
* @param dittoHeaders the command headers of the request.
117154
* @return the command.
118155
* @throws NullPointerException if any non-nullable argument is {@code null}.
156+
* @deprecated since 3.5.0, use {@link #of(EntityId, JsonPointer, Instant, Instant, CharSequence, DittoHeaders)}
119157
*/
158+
@Deprecated
120159
public static SubscribeForPersistedEvents of(final EntityId entityId,
121160
final JsonPointer resourcePath,
122161
@Nullable final Instant fromHistoricalTimestamp,
@@ -130,6 +169,72 @@ public static SubscribeForPersistedEvents of(final EntityId entityId,
130169
fromHistoricalTimestamp,
131170
toHistoricalTimestamp,
132171
null,
172+
null,
173+
dittoHeaders);
174+
}
175+
176+
/**
177+
* Creates a new {@code SudoStreamSnapshots} command based on "from" and "to" {@code Instant} timestamps.
178+
*
179+
* @param entityId the entityId that should be streamed.
180+
* @param resourcePath the resource path for which to stream events.
181+
* @param fromHistoricalTimestamp the timestamp to start the streaming from.
182+
* @param toHistoricalTimestamp the timestamp to stop the streaming at.
183+
* @param dittoHeaders the command headers of the request.
184+
* @param filter the optional RQL filter to apply for persisted events before publishing to the stream
185+
* @return the command.
186+
* @throws NullPointerException if any non-nullable argument is {@code null}.
187+
* @since 3.5.0
188+
*/
189+
public static SubscribeForPersistedEvents of(final EntityId entityId,
190+
final JsonPointer resourcePath,
191+
@Nullable final Instant fromHistoricalTimestamp,
192+
@Nullable final Instant toHistoricalTimestamp,
193+
@Nullable final CharSequence filter,
194+
final DittoHeaders dittoHeaders) {
195+
196+
return new SubscribeForPersistedEvents(entityId,
197+
resourcePath,
198+
0L,
199+
Long.MAX_VALUE,
200+
fromHistoricalTimestamp,
201+
toHistoricalTimestamp,
202+
null,
203+
filter,
204+
dittoHeaders);
205+
}
206+
207+
/**
208+
* Creates a new {@code SudoStreamSnapshots} command based on "from" and "to" {@code Instant} timestamps.
209+
*
210+
* @param entityId the entityId that should be streamed.
211+
* @param resourcePath the resource path for which to stream events.
212+
* @param fromHistoricalRevision the revision to start the streaming from.
213+
* @param toHistoricalRevision the revision to stop the streaming at.
214+
* @param fromHistoricalTimestamp the timestamp to start the streaming from.
215+
* @param toHistoricalTimestamp the timestamp to stop the streaming at.
216+
* @param dittoHeaders the command headers of the request.
217+
* @return the command.
218+
* @throws NullPointerException if any non-nullable argument is {@code null}.
219+
* @deprecated since 3.5.0, use {@link #of(EntityId, JsonPointer, Long, Long, Instant, Instant, CharSequence, DittoHeaders)}
220+
*/
221+
@Deprecated
222+
public static SubscribeForPersistedEvents of(final EntityId entityId,
223+
final JsonPointer resourcePath,
224+
@Nullable final Long fromHistoricalRevision,
225+
@Nullable final Long toHistoricalRevision,
226+
@Nullable final Instant fromHistoricalTimestamp,
227+
@Nullable final Instant toHistoricalTimestamp,
228+
final DittoHeaders dittoHeaders) {
229+
230+
return new SubscribeForPersistedEvents(entityId,
231+
resourcePath,
232+
null != fromHistoricalRevision ? fromHistoricalRevision : 0L,
233+
null != toHistoricalRevision ? toHistoricalRevision : Long.MAX_VALUE,
234+
fromHistoricalTimestamp,
235+
toHistoricalTimestamp,
236+
null,
237+
null,
133238
dittoHeaders);
134239
}
135240

@@ -142,16 +247,19 @@ public static SubscribeForPersistedEvents of(final EntityId entityId,
142247
* @param toHistoricalRevision the revision to stop the streaming at.
143248
* @param fromHistoricalTimestamp the timestamp to start the streaming from.
144249
* @param toHistoricalTimestamp the timestamp to stop the streaming at.
250+
* @param filter the optional RQL filter to apply for persisted events before publishing to the stream
145251
* @param dittoHeaders the command headers of the request.
146252
* @return the command.
147253
* @throws NullPointerException if any non-nullable argument is {@code null}.
254+
* @since 3.5.0
148255
*/
149256
public static SubscribeForPersistedEvents of(final EntityId entityId,
150257
final JsonPointer resourcePath,
151258
@Nullable final Long fromHistoricalRevision,
152259
@Nullable final Long toHistoricalRevision,
153260
@Nullable final Instant fromHistoricalTimestamp,
154261
@Nullable final Instant toHistoricalTimestamp,
262+
@Nullable final CharSequence filter,
155263
final DittoHeaders dittoHeaders) {
156264

157265
return new SubscribeForPersistedEvents(entityId,
@@ -161,6 +269,7 @@ public static SubscribeForPersistedEvents of(final EntityId entityId,
161269
fromHistoricalTimestamp,
162270
toHistoricalTimestamp,
163271
null,
272+
filter,
164273
dittoHeaders);
165274
}
166275

@@ -182,6 +291,7 @@ public static SubscribeForPersistedEvents fromJson(final JsonObject jsonObject,
182291
jsonObject.getValue(JsonFields.JSON_FROM_HISTORICAL_TIMESTAMP).map(Instant::parse).orElse(null),
183292
jsonObject.getValue(JsonFields.JSON_TO_HISTORICAL_TIMESTAMP).map(Instant::parse).orElse(null),
184293
jsonObject.getValue(JsonFields.PREFIX).orElse(null),
294+
jsonObject.getValue(JsonFields.FILTER).orElse(null),
185295
dittoHeaders
186296
);
187297
}
@@ -195,7 +305,7 @@ public static SubscribeForPersistedEvents fromJson(final JsonObject jsonObject,
195305
*/
196306
public SubscribeForPersistedEvents setPrefix(@Nullable final String prefix) {
197307
return new SubscribeForPersistedEvents(entityId, resourcePath, fromHistoricalRevision, toHistoricalRevision,
198-
fromHistoricalTimestamp, toHistoricalTimestamp, prefix, getDittoHeaders());
308+
fromHistoricalTimestamp, toHistoricalTimestamp, prefix, filter, getDittoHeaders());
199309
}
200310

201311
/**
@@ -244,6 +354,14 @@ public Optional<String> getPrefix() {
244354
return Optional.ofNullable(prefix);
245355
}
246356

357+
/**
358+
* @return the optional RQL filter to apply for persisted events before publishing to the stream
359+
* @since 3.5.0
360+
*/
361+
public Optional<String> getFilter() {
362+
return Optional.ofNullable(filter);
363+
}
364+
247365
@Override
248366
protected void appendPayload(final JsonObjectBuilder jsonObjectBuilder,
249367
final JsonSchemaVersion schemaVersion,
@@ -263,6 +381,7 @@ protected void appendPayload(final JsonObjectBuilder jsonObjectBuilder,
263381
jsonObjectBuilder.set(JsonFields.JSON_TO_HISTORICAL_TIMESTAMP, toHistoricalTimestamp.toString(), predicate);
264382
}
265383
getPrefix().ifPresent(thePrefix -> jsonObjectBuilder.set(JsonFields.PREFIX, thePrefix));
384+
getFilter().ifPresent(theFilter -> jsonObjectBuilder.set(JsonFields.FILTER, theFilter));
266385
}
267386

268387
@Override
@@ -273,13 +392,13 @@ public String getTypePrefix() {
273392
@Override
274393
public SubscribeForPersistedEvents setDittoHeaders(final DittoHeaders dittoHeaders) {
275394
return new SubscribeForPersistedEvents(entityId, resourcePath, fromHistoricalRevision, toHistoricalRevision,
276-
fromHistoricalTimestamp, toHistoricalTimestamp, prefix, dittoHeaders);
395+
fromHistoricalTimestamp, toHistoricalTimestamp, prefix, filter, dittoHeaders);
277396
}
278397

279398
@Override
280399
public int hashCode() {
281400
return Objects.hash(super.hashCode(), entityId, resourcePath, fromHistoricalRevision, toHistoricalRevision,
282-
fromHistoricalTimestamp, toHistoricalTimestamp, prefix);
401+
fromHistoricalTimestamp, toHistoricalTimestamp, prefix, filter);
283402
}
284403

285404
@Override
@@ -297,7 +416,8 @@ public boolean equals(@Nullable final Object obj) {
297416
toHistoricalRevision == that.toHistoricalRevision &&
298417
Objects.equals(fromHistoricalTimestamp, that.fromHistoricalTimestamp) &&
299418
Objects.equals(toHistoricalTimestamp, that.toHistoricalTimestamp) &&
300-
Objects.equals(prefix, that.prefix);
419+
Objects.equals(prefix, that.prefix) &&
420+
Objects.equals(filter, that.filter);
301421
}
302422

303423
@Override
@@ -313,6 +433,7 @@ public String toString() {
313433
+ ", fromHistoricalTimestamp=" + fromHistoricalTimestamp
314434
+ ", toHistoricalTimestamp=" + toHistoricalTimestamp
315435
+ ", prefix=" + prefix
436+
+ ", filter=" + filter
316437
+ "]";
317438
}
318439

@@ -339,6 +460,9 @@ private JsonFields() {
339460

340461
static final JsonFieldDefinition<String> PREFIX =
341462
JsonFactory.newStringFieldDefinition("prefix", REGULAR, V_2);
463+
464+
public static final JsonFieldDefinition<String> FILTER =
465+
JsonFactory.newStringFieldDefinition("filter", REGULAR, V_2);
342466
}
343467

344468
}

Diff for: base/model/src/test/java/org/eclipse/ditto/base/model/signals/commands/streaming/SubscribeForPersistedEventsTest.java

+7
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ public final class SubscribeForPersistedEventsTest {
4343
private static final long KNOWN_TO_REV = 42L;
4444
private static final String KNOWN_FROM_TS = "2022-10-25T14:00:00Z";
4545
private static final String KNOWN_TO_TS = "2022-10-25T15:00:00Z";
46+
private static final String KNOWN_FILTER = "exists(thingId)";
4647

4748
private static final String JSON_ALL_FIELDS = JsonFactory.newObjectBuilder()
4849
.set(Command.JsonFields.TYPE, SubscribeForPersistedEvents.TYPE)
@@ -53,6 +54,7 @@ public final class SubscribeForPersistedEventsTest {
5354
.set(SubscribeForPersistedEvents.JsonFields.JSON_TO_HISTORICAL_REVISION, KNOWN_TO_REV)
5455
.set(SubscribeForPersistedEvents.JsonFields.JSON_FROM_HISTORICAL_TIMESTAMP, KNOWN_FROM_TS)
5556
.set(SubscribeForPersistedEvents.JsonFields.JSON_TO_HISTORICAL_TIMESTAMP, KNOWN_TO_TS)
57+
.set(SubscribeForPersistedEvents.JsonFields.FILTER, KNOWN_FILTER)
5658
.build()
5759
.toString();
5860

@@ -63,6 +65,7 @@ public final class SubscribeForPersistedEventsTest {
6365
.set(StreamingSubscriptionCommand.JsonFields.JSON_RESOURCE_PATH, KNOWN_RESOURCE_PATH)
6466
.set(SubscribeForPersistedEvents.JsonFields.JSON_FROM_HISTORICAL_REVISION, KNOWN_FROM_REV)
6567
.set(SubscribeForPersistedEvents.JsonFields.JSON_TO_HISTORICAL_REVISION, KNOWN_TO_REV)
68+
.set(SubscribeForPersistedEvents.JsonFields.FILTER, KNOWN_FILTER)
6669
.build().toString();
6770

6871
@Test
@@ -88,6 +91,7 @@ public void toJsonWithAllFieldsSet() {
8891
KNOWN_TO_REV,
8992
Instant.parse(KNOWN_FROM_TS),
9093
Instant.parse(KNOWN_TO_TS),
94+
KNOWN_FILTER,
9195
DittoHeaders.empty()
9296
);
9397

@@ -102,6 +106,7 @@ public void toJsonWithOnlyRequiredFieldsSet() {
102106
JsonPointer.of(KNOWN_RESOURCE_PATH),
103107
KNOWN_FROM_REV,
104108
KNOWN_TO_REV,
109+
KNOWN_FILTER,
105110
DittoHeaders.empty());
106111
final String json = command.toJsonString();
107112
assertThat(json).isEqualTo(JSON_MINIMAL);
@@ -116,6 +121,7 @@ public void fromJsonWithAllFieldsSet() {
116121
KNOWN_TO_REV,
117122
Instant.parse(KNOWN_FROM_TS),
118123
Instant.parse(KNOWN_TO_TS),
124+
KNOWN_FILTER,
119125
DittoHeaders.empty()
120126
);
121127
assertThat(SubscribeForPersistedEvents.fromJson(JsonObject.of(JSON_ALL_FIELDS), DittoHeaders.empty()))
@@ -130,6 +136,7 @@ public void fromJsonWithOnlyRequiredFieldsSet() {
130136
JsonPointer.of(KNOWN_RESOURCE_PATH),
131137
KNOWN_FROM_REV,
132138
KNOWN_TO_REV,
139+
KNOWN_FILTER,
133140
DittoHeaders.empty()));
134141
}
135142

Diff for: connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/persistence/ConnectionSupervisorActor.java

+17-11
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,22 @@
2525
import javax.annotation.Nullable;
2626
import javax.jms.JMSRuntimeException;
2727

28+
import org.apache.pekko.actor.ActorKilledException;
29+
import org.apache.pekko.actor.ActorRef;
30+
import org.apache.pekko.actor.OneForOneStrategy;
31+
import org.apache.pekko.actor.Props;
32+
import org.apache.pekko.actor.ReceiveTimeout;
33+
import org.apache.pekko.actor.SupervisorStrategy;
34+
import org.apache.pekko.japi.pf.DeciderBuilder;
35+
import org.apache.pekko.japi.pf.FI;
36+
import org.apache.pekko.japi.pf.ReceiveBuilder;
37+
import org.apache.pekko.pattern.Patterns;
2838
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeExceptionBuilder;
2939
import org.eclipse.ditto.base.model.headers.DittoHeaders;
3040
import org.eclipse.ditto.base.model.headers.WithDittoHeaders;
3141
import org.eclipse.ditto.base.model.signals.Signal;
42+
import org.eclipse.ditto.base.model.signals.commands.streaming.SubscribeForPersistedEvents;
43+
import org.eclipse.ditto.base.model.signals.events.Event;
3244
import org.eclipse.ditto.base.service.actors.ShutdownBehaviour;
3345
import org.eclipse.ditto.base.service.config.supervision.ExponentialBackOffConfig;
3446
import org.eclipse.ditto.base.service.config.supervision.LocalAskTimeoutConfig;
@@ -48,17 +60,6 @@
4860
import com.typesafe.config.Config;
4961
import com.typesafe.config.ConfigFactory;
5062

51-
import org.apache.pekko.actor.ActorKilledException;
52-
import org.apache.pekko.actor.ActorRef;
53-
import org.apache.pekko.actor.OneForOneStrategy;
54-
import org.apache.pekko.actor.Props;
55-
import org.apache.pekko.actor.ReceiveTimeout;
56-
import org.apache.pekko.actor.SupervisorStrategy;
57-
import org.apache.pekko.japi.pf.DeciderBuilder;
58-
import org.apache.pekko.japi.pf.FI;
59-
import org.apache.pekko.japi.pf.ReceiveBuilder;
60-
import org.apache.pekko.pattern.Patterns;
61-
6263
/**
6364
* Supervisor for {@link ConnectionPersistenceActor} which means it will create, start and watch it as child actor.
6465
* <p>
@@ -158,6 +159,11 @@ protected Receive activeBehaviour(final Runnable matchProcessNextTwinMessageBeha
158159
.orElse(super.activeBehaviour(matchProcessNextTwinMessageBehavior, matchAnyBehavior));
159160
}
160161

162+
@Override
163+
protected boolean applyPersistedEventFilter(final Event<?> event, final SubscribeForPersistedEvents subscribe) {
164+
return true;
165+
}
166+
161167
@Override
162168
protected boolean shouldBecomeTwinSignalProcessingAwaiting(final Signal<?> signal) {
163169
return super.shouldBecomeTwinSignalProcessingAwaiting(signal) &&

Diff for: documentation/src/main/resources/jsonschema/protocol-streaming-subscription-subscribe-for-persisted-events-payload.json

+4
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,10 @@
2121
"type": "string",
2222
"format": "date-time",
2323
"description": "The timestamp to stop the streaming at."
24+
},
25+
"filter": {
26+
"type": "string",
27+
"description": "An RQL expression defining which events to filter for in the stream. Only supported for thing events."
2428
}
2529
}
2630
}

0 commit comments

Comments
 (0)