Skip to content

Commit d241588

Browse files
authored
eventually-postgres: add EventSubscriber using LISTEN feature (#96)
* feat(postgres): add EventSubscriber implementation using LISTEN * feat: remove unnecessary Arc requirements for thread-safe objects * docs(postgres): add documentation for EventSubscriber * test(postgres): add integration tests * fix: try to not limit threads for tarpaulin
1 parent e96c738 commit d241588

File tree

13 files changed

+833
-360
lines changed

13 files changed

+833
-360
lines changed

.github/workflows/rust.yml

-2
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,6 @@ jobs:
4040

4141
- name: Run cargo-tarpaulin (main)
4242
uses: actions-rs/[email protected]
43-
with:
44-
args: '-- --test-threads 1'
4543

4644
- name: Upload to codecov.io
4745
uses: codecov/[email protected]

eventually-core/src/aggregate.rs

+15-14
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
//! Foundation traits for creating Domain abstractions
22
//! using [the `Aggregate` pattern](https://martinfowler.com/bliki/DDD_Aggregate.html).
33
4-
use std::sync::Arc;
5-
64
use futures::future::BoxFuture;
75

86
#[cfg(feature = "serde")]
@@ -102,23 +100,26 @@ impl<T> AggregateExt for T where T: Aggregate {}
102100
///
103101
/// [`AggregateRoot`]: struct.AggregateRoot.html
104102
#[derive(Clone)]
105-
pub struct AggregateRootBuilder<T> {
106-
aggregate: Arc<T>,
103+
pub struct AggregateRootBuilder<T>
104+
where
105+
T: Aggregate + Clone,
106+
{
107+
aggregate: T,
107108
}
108109

109-
impl<T> From<Arc<T>> for AggregateRootBuilder<T>
110+
impl<T> From<T> for AggregateRootBuilder<T>
110111
where
111-
T: Aggregate,
112+
T: Aggregate + Clone,
112113
{
113114
#[inline]
114-
fn from(aggregate: Arc<T>) -> Self {
115+
fn from(aggregate: T) -> Self {
115116
Self { aggregate }
116117
}
117118
}
118119

119120
impl<T> AggregateRootBuilder<T>
120121
where
121-
T: Aggregate,
122+
T: Aggregate + Clone,
122123
{
123124
/// Builds a new [`AggregateRoot`] instance for the specified Aggregate [`Id`].
124125
///
@@ -172,7 +173,7 @@ where
172173
#[cfg_attr(feature = "serde", derive(Serialize))]
173174
pub struct AggregateRoot<T>
174175
where
175-
T: Aggregate + 'static,
176+
T: Aggregate + Clone + 'static,
176177
{
177178
id: T::Id,
178179
version: u32,
@@ -181,15 +182,15 @@ where
181182
state: T::State,
182183

183184
#[cfg_attr(feature = "serde", serde(skip_serializing))]
184-
aggregate: Arc<T>,
185+
aggregate: T,
185186

186187
#[cfg_attr(feature = "serde", serde(skip_serializing))]
187188
to_commit: Option<Vec<T::Event>>,
188189
}
189190

190191
impl<T> PartialEq for AggregateRoot<T>
191192
where
192-
T: Aggregate,
193+
T: Aggregate + Clone,
193194
{
194195
#[inline]
195196
fn eq(&self, other: &Self) -> bool {
@@ -199,7 +200,7 @@ where
199200

200201
impl<T> Versioned for AggregateRoot<T>
201202
where
202-
T: Aggregate,
203+
T: Aggregate + Clone,
203204
{
204205
#[inline]
205206
fn version(&self) -> u32 {
@@ -209,7 +210,7 @@ where
209210

210211
impl<T> AggregateRoot<T>
211212
where
212-
T: Aggregate,
213+
T: Aggregate + Clone,
213214
{
214215
/// Returns a reference to the Aggregate [`Id`] that represents
215216
/// the entity wrapped by this [`AggregateRoot`] instance.
@@ -246,7 +247,7 @@ where
246247

247248
impl<T> AggregateRoot<T>
248249
where
249-
T: Aggregate,
250+
T: Aggregate + Clone,
250251
T::Event: Clone,
251252
T::State: Clone,
252253
{

eventually-core/src/repository.rs

+4-2
Original file line numberDiff line numberDiff line change
@@ -60,17 +60,19 @@ pub type Result<T, A, S> =
6060
/// [`State`]: ../aggregate/trait.Aggregate.html#associatedtype.State
6161
/// [`Event`]: ../aggregate/trait.Aggregate.html#associatedtype.Event
6262
/// [`EventStore`]: ../store/trait.EventStore.html
63+
#[derive(Clone)]
6364
pub struct Repository<T, Store>
6465
where
65-
T: Aggregate + 'static,
66+
T: Aggregate + Clone + 'static,
67+
Store: EventStore<SourceId = T::Id, Event = T::Event>,
6668
{
6769
builder: AggregateRootBuilder<T>,
6870
store: Store,
6971
}
7072

7173
impl<T, Store> Repository<T, Store>
7274
where
73-
T: Aggregate,
75+
T: Aggregate + Clone,
7476
Store: EventStore<SourceId = T::Id, Event = T::Event>,
7577
{
7678
/// Creates a new `Repository` instance, using the [`Aggregate`]

eventually-postgres/Cargo.toml

+3-1
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ tokio-postgres = { version = "0.5", features = ["with-serde_json-1"] }
2121
thiserror = "1.0"
2222
refinery = { version = "0.3.0", features = ["tokio-postgres"] }
2323
anyhow = "1.0.32"
24+
tokio = { version = "0.2", features = ["sync"] }
2425

2526
[dev-dependencies]
26-
tokio = { version = "0.2", features = ["sync"] }
27+
testcontainers = "0.9"
28+
tokio = { version = "0.2", features = ["sync", "macros"] }

0 commit comments

Comments
 (0)