Skip to content

Commit 02dc01e

Browse files
authored
feat: add Subscription trait, Transient and Persisted variants (#101)
* feat: add Subscription trait and Transient subscription * refactor(inmemory): Projector uses a Subscription impl * feat(postgres): add Persistent subscription implementation * fix: clippy complaints
1 parent d241588 commit 02dc01e

File tree

11 files changed

+537
-108
lines changed

11 files changed

+537
-108
lines changed

eventually-core/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ default = []
1313
full = ["serde"]
1414

1515
[dependencies]
16+
anyhow = "1.0"
1617
futures = { version = "0.3", features = ["async-await"] }
1718
serde = { version = "1.0", features = ["derive"], optional = true }
1819
thiserror = "1.0"

eventually-core/src/subscription.rs

+188-3
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,14 @@
33
//!
44
//! [`EventStore`]: ../store/trait.EventStore.html
55
6-
use futures::future::BoxFuture;
7-
use futures::stream::BoxStream;
6+
use std::error::Error as StdError;
7+
use std::sync::atomic::{AtomicU32, Ordering};
8+
use std::sync::Arc;
89

9-
use crate::store::Persisted;
10+
use futures::future::{ok, BoxFuture, FutureExt};
11+
use futures::stream::{BoxStream, StreamExt, TryStreamExt};
12+
13+
use crate::store::{EventStore, Persisted, Select};
1014

1115
/// Stream of events returned by the [`EventSubscriber::subscribe_all`] method.
1216
///
@@ -63,3 +67,184 @@ pub trait EventSubscriber {
6367
/// [`EventStream`]: type.EventStream.html
6468
fn subscribe_all(&self) -> BoxFuture<Result<EventStream<Self>, Self::Error>>;
6569
}
70+
71+
/// Stream of events returned by the [`Subscription::resume`] method.
72+
///
73+
/// [`Subscription::resume`]: trait.Subscription.html#method.resume
74+
pub type SubscriptionStream<'a, S> = BoxStream<
75+
'a,
76+
Result<
77+
Persisted<<S as Subscription>::SourceId, <S as Subscription>::Event>,
78+
<S as Subscription>::Error,
79+
>,
80+
>;
81+
82+
/// A Subscription to an [`EventStream`] which can be "checkpointed":
83+
/// keeps a record of the latest message processed by itself using [`checkpoint`],
84+
/// and can resume working from such message by using the [`resume`].
85+
///
86+
/// [`resume`]: trait.Subscription.html#method.resume
87+
/// [`checkpoint`]: trait.Subscription.html#method.checkpoint
88+
pub trait Subscription {
89+
/// Type of the Source id, typically an [`AggregateId`].
90+
///
91+
/// [`AggregateId`]: ../aggregate/type.AggregateId.html
92+
type SourceId: Eq;
93+
94+
/// Event type stored in the [`EventStore`], typically an [`Aggregate::Event`].
95+
///
96+
/// [`Aggregate::Event`]: ../aggregate/trait.Aggregate.html#associatedtype.Event
97+
/// [`EventStore`]: ../store/trait.EventStore.html
98+
type Event;
99+
100+
/// Possible errors returned when receiving events from the notification channel.
101+
type Error;
102+
103+
/// Resumes the current state of a `Subscription` by returning the [`EventStream`],
104+
/// starting from the last event processed by the `Subscription`.
105+
///
106+
/// [`EventStream`]: trait.EventStream.html
107+
fn resume(&self) -> BoxFuture<Result<SubscriptionStream<Self>, Self::Error>>;
108+
109+
/// Saves the provided version (or sequence number) as the latest
110+
/// version processed.
111+
fn checkpoint(&self, version: u32) -> BoxFuture<Result<(), Self::Error>>;
112+
}
113+
114+
/// Error type returned by a [`Transient`] Subscription.
115+
///
116+
/// [`Transient`]: struct.Transient.html
117+
#[derive(Debug, thiserror::Error)]
118+
pub enum Error {
119+
/// Error caused by the Subscription's [`EventStore`].
120+
///
121+
/// [`EventStore`]: ../store/trait.EventStore.html
122+
#[error("error received while listening to the event stream from the store: {0}")]
123+
Store(#[source] anyhow::Error),
124+
125+
/// Error caused by the Subscription's [`EventSubscriber`].
126+
///
127+
/// [`EventSubscriber`]: trait.EventSubscriber.html
128+
#[error("error received while listening to the event stream subscription: {0}")]
129+
Subscription(#[source] anyhow::Error),
130+
}
131+
132+
/// [`Subscription`] type which gets deleted once the process using it
133+
/// gets terminated.
134+
///
135+
/// Useful for in-memory or one-off [`Projection`]s.
136+
///
137+
/// [`Subscription`]: trait.Subscription.html
138+
/// [`Projection`]: ../projection/trait.Projection.html
139+
pub struct Transient<Store, Subscriber> {
140+
store: Store,
141+
subscriber: Subscriber,
142+
last_sequence_number: Arc<AtomicU32>,
143+
}
144+
145+
impl<Store, Subscriber> Transient<Store, Subscriber> {
146+
/// Creates a new [`Subscription`] using the specified [`EventStore`]
147+
/// and [`EventSubscriber`] to create the [`SubscriptionStream`] from.
148+
///
149+
/// [`Subscription`]: trait.Subscription.html
150+
/// [`EventStore`]: ../store/trait.EventStore.html
151+
/// [`EventSubscriber`]: trait.EventSubscriber.html
152+
/// [`SubscriptionStream`]: type.SubscriptionStream.html
153+
pub fn new(store: Store, subscriber: Subscriber) -> Self {
154+
Self {
155+
store,
156+
subscriber,
157+
last_sequence_number: Default::default(),
158+
}
159+
}
160+
161+
/// Specifies the sequence number of the `Event` the [`SubscriptionStream`]
162+
/// should start from when calling [`run`].
163+
///
164+
/// [`SubscriptionStream`]: type.SubscriptionStream.html
165+
/// [`run`]: struct.Transient.html#method.run
166+
pub fn from(self, sequence_number: u32) -> Self {
167+
self.last_sequence_number
168+
.store(sequence_number, Ordering::Relaxed);
169+
170+
self
171+
}
172+
}
173+
174+
impl<Store, Subscriber> Subscription for Transient<Store, Subscriber>
175+
where
176+
Store: EventStore + Send + Sync,
177+
Subscriber: EventSubscriber<
178+
SourceId = <Store as EventStore>::SourceId,
179+
Event = <Store as EventStore>::Event,
180+
> + Send
181+
+ Sync,
182+
<Store as EventStore>::SourceId: Send + Sync,
183+
<Store as EventStore>::Event: Send + Sync,
184+
<Store as EventStore>::Error: StdError + Send + Sync + 'static,
185+
<Subscriber as EventSubscriber>::Error: StdError + Send + Sync + 'static,
186+
{
187+
type SourceId = Store::SourceId;
188+
type Event = Store::Event;
189+
type Error = Error;
190+
191+
fn resume(&self) -> BoxFuture<Result<SubscriptionStream<Self>, Self::Error>> {
192+
Box::pin(async move {
193+
// Create the Subscription first, so that once the future has been resolved
194+
// we'll start receiving events right away.
195+
//
196+
// This is to avoid losing events when waiting for the one-off stream
197+
// to resolve its future.
198+
//
199+
// The impact is that we _might_ get duplicated events from the one-off stream
200+
// and the subscription stream. Luckily, we can discard those by
201+
// keeping an internal state of the last processed sequence number,
202+
// and discard all those events that are found.
203+
let subscription = self
204+
.subscriber
205+
.subscribe_all()
206+
.await
207+
.map_err(anyhow::Error::from)
208+
.map_err(Error::Store)?;
209+
210+
let one_off_stream = self
211+
.store
212+
.stream_all(Select::From(
213+
self.last_sequence_number.load(Ordering::Relaxed),
214+
))
215+
.await
216+
.map_err(anyhow::Error::from)
217+
.map_err(Error::Subscription)?;
218+
219+
let stream = one_off_stream
220+
.map_err(anyhow::Error::from)
221+
.map_err(Error::Store)
222+
.chain(
223+
subscription
224+
.map_err(anyhow::Error::from)
225+
.map_err(Error::Subscription),
226+
)
227+
.try_filter_map(move |event| async move {
228+
let expected_sequence_number =
229+
self.last_sequence_number.load(Ordering::Relaxed);
230+
231+
let event_sequence_number = event.sequence_number();
232+
233+
if event_sequence_number < expected_sequence_number {
234+
return Ok(None); // Duplicated event detected, let's skip it.
235+
}
236+
237+
Ok(Some(event))
238+
})
239+
.boxed();
240+
241+
Ok(stream)
242+
})
243+
}
244+
245+
fn checkpoint(&self, version: u32) -> BoxFuture<Result<(), Self::Error>> {
246+
// Checkpointing happens in memory on the atomic sequence number checkpoint.
247+
self.last_sequence_number.store(version, Ordering::Relaxed);
248+
ok(()).boxed()
249+
}
250+
}

eventually-postgres/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ thiserror = "1.0"
2222
refinery = { version = "0.3.0", features = ["tokio-postgres"] }
2323
anyhow = "1.0.32"
2424
tokio = { version = "0.2", features = ["sync"] }
25+
tracing = "0.1"
2526

2627
[dev-dependencies]
2728
testcontainers = "0.9"

eventually-postgres/src/lib.rs

+15-2
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,21 @@
5353
//! [`eventually`]: https://docs.rs/eventually
5454
//! [`EventStore`]: struct.EventStore.html
5555
56-
mod store;
57-
mod subscriber;
56+
pub mod store;
57+
pub mod subscriber;
58+
pub mod subscription;
5859

5960
pub use store::*;
6061
pub use subscriber::*;
62+
pub use subscription::*;
63+
64+
use tokio_postgres::types::ToSql;
65+
66+
/// Adapter type for parameters compatible with `tokio_postgres::Client` methods.
67+
pub(crate) type Params<'a> = &'a [&'a (dyn ToSql + Sync)];
68+
69+
#[inline]
70+
#[allow(trivial_casts)]
71+
pub(crate) fn slice_iter<'a>(s: Params<'a>) -> impl ExactSizeIterator<Item = &'a dyn ToSql> + 'a {
72+
s.iter().map(|s| *s as _)
73+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
CREATE TABLE subscriptions (
2+
name TEXT PRIMARY KEY,
3+
aggregate_type_id TEXT NOT NULL,
4+
last_sequence_number BIGINT NOT NULL DEFAULT -1,
5+
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
6+
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
7+
8+
-- Remove all subscriptions in case the aggregate type is deleted.
9+
FOREIGN KEY (aggregate_type_id) REFERENCES aggregate_types(id) ON DELETE CASCADE
10+
);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
CREATE OR REPLACE FUNCTION get_or_create_subscription(
2+
name TEXT,
3+
aggregate_type_id TEXT
4+
)
5+
RETURNS subscriptions
6+
AS $$
7+
8+
INSERT INTO subscriptions (name, aggregate_type_id)
9+
VALUES (name, aggregate_type_id)
10+
ON CONFLICT (name)
11+
DO UPDATE SET name=EXCLUDED.name -- Perform update to force row returning.
12+
RETURNING *;
13+
14+
$$ LANGUAGE SQL;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
CREATE OR REPLACE FUNCTION checkpoint_subscription(
2+
subscription_name TEXT,
3+
aggregate_type TEXT,
4+
sequence_number BIGINT
5+
)
6+
RETURNS VOID
7+
AS $$
8+
DECLARE
9+
old_sequence_number BIGINT;
10+
BEGIN
11+
12+
SELECT s.last_sequence_number INTO old_sequence_number
13+
FROM subscriptions s
14+
WHERE s."name" = subscription_name AND s.aggregate_type_id = aggregate_type;
15+
16+
IF old_sequence_number >= sequence_number THEN
17+
RAISE EXCEPTION 'invalid sequence number provided: %, should be greater than %', sequence_number, old_sequence_number;
18+
END IF;
19+
20+
UPDATE subscriptions s
21+
SET last_sequence_number = sequence_number, updated_at = NOW()
22+
WHERE s."name" = subscription_name AND s.aggregate_type_id = aggregate_type;
23+
24+
END;
25+
$$ LANGUAGE PLPGSQL;

eventually-postgres/src/store.rs

+3-10
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,10 @@ use futures::stream::{StreamExt, TryStreamExt};
1010

1111
use serde::{Deserialize, Serialize};
1212

13-
use tokio_postgres::types::ToSql;
1413
use tokio_postgres::Client;
1514

15+
use crate::{slice_iter, Params};
16+
1617
/// Embedded migrations module.
1718
mod embedded {
1819
use refinery::embed_migrations;
@@ -171,8 +172,8 @@ impl EventStoreBuilderMigrated {
171172
/// [`EventStoreBuilder`]: ../../eventually_core/store/trait.EventStoreBuilder.html
172173
#[derive(Debug, Clone)]
173174
pub struct EventStore<Id, Event> {
175+
pub(crate) type_name: &'static str,
174176
client: Arc<Client>,
175-
type_name: &'static str,
176177
id: std::marker::PhantomData<Id>,
177178
payload: std::marker::PhantomData<Event>,
178179
}
@@ -322,11 +323,3 @@ where
322323
.boxed())
323324
}
324325
}
325-
326-
type Params<'a> = &'a [&'a (dyn ToSql + Sync)];
327-
328-
#[inline]
329-
#[allow(trivial_casts)]
330-
fn slice_iter<'a>(s: Params<'a>) -> impl ExactSizeIterator<Item = &'a dyn ToSql> + 'a {
331-
s.iter().map(|s| *s as _)
332-
}

0 commit comments

Comments
 (0)