Skip to content

Commit 3a9c080

Browse files
Add large snapshot support (#383)
* Add large snapshot payload support * Refactor class name * fix snapshot plugin class name in test config * Add load time measurement * Add GridFS snapshot example * downgrade sample to net6.0, make project not packable * Fix unit test * Remove transaction, use native GridFS driver * Add GridFS documentation --------- Co-authored-by: Aaron Stannard <[email protected]>
1 parent 816d098 commit 3a9c080

18 files changed

+728
-14
lines changed

Akka.Persistence.MongoDb.sln

+15
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,18 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "build", "build", "{BE1178E1
1313
build.fsx = build.fsx
1414
build.ps1 = build.ps1
1515
build.sh = build.sh
16+
src\Directory.Build.props = src\Directory.Build.props
17+
src\Directory.Packages.props = src\Directory.Packages.props
18+
README.md = README.md
19+
RELEASE_NOTES.md = RELEASE_NOTES.md
1620
EndProjectSection
1721
EndProject
1822
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Akka.Persistence.MongoDb.Hosting", "src\Akka.Persistence.MongoDb.Hosting\Akka.Persistence.MongoDb.Hosting.csproj", "{72B8C165-FE00-465F-A2E9-60B4B79F81AF}"
1923
EndProject
24+
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "examples", "examples", "{57AE52C4-1E22-42B9-9214-4FEE4F5DFC70}"
25+
EndProject
26+
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "LargeSnapshot", "src\examples\LargeSnapshot\LargeSnapshot.csproj", "{8FA9D3B5-C67E-49A6-8449-74E060458F25}"
27+
EndProject
2028
Global
2129
GlobalSection(SolutionConfigurationPlatforms) = preSolution
2230
Debug|Any CPU = Debug|Any CPU
@@ -35,11 +43,18 @@ Global
3543
{72B8C165-FE00-465F-A2E9-60B4B79F81AF}.Debug|Any CPU.Build.0 = Debug|Any CPU
3644
{72B8C165-FE00-465F-A2E9-60B4B79F81AF}.Release|Any CPU.ActiveCfg = Release|Any CPU
3745
{72B8C165-FE00-465F-A2E9-60B4B79F81AF}.Release|Any CPU.Build.0 = Release|Any CPU
46+
{8FA9D3B5-C67E-49A6-8449-74E060458F25}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
47+
{8FA9D3B5-C67E-49A6-8449-74E060458F25}.Debug|Any CPU.Build.0 = Debug|Any CPU
48+
{8FA9D3B5-C67E-49A6-8449-74E060458F25}.Release|Any CPU.ActiveCfg = Release|Any CPU
49+
{8FA9D3B5-C67E-49A6-8449-74E060458F25}.Release|Any CPU.Build.0 = Release|Any CPU
3850
EndGlobalSection
3951
GlobalSection(SolutionProperties) = preSolution
4052
HideSolutionNode = FALSE
4153
EndGlobalSection
4254
GlobalSection(ExtensibilityGlobals) = postSolution
4355
SolutionGuid = {D4F3F966-EB9C-443A-8158-419703A68B92}
4456
EndGlobalSection
57+
GlobalSection(NestedProjects) = preSolution
58+
{8FA9D3B5-C67E-49A6-8449-74E060458F25} = {57AE52C4-1E22-42B9-9214-4FEE4F5DFC70}
59+
EndGlobalSection
4560
EndGlobal

README.md

+28-8
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,11 @@
22

33
Akka Persistence journal and snapshot store backed by MongoDB database.
44

5-
### Setup
5+
> [!NOTE]
6+
>
7+
> The MongoDB operator to limit the number of documents in a query only accepts an integer while akka provides a long as maximum for the loading of events during the replay. Internally the long value is cast to an integer and if the value is higher than Int32.MaxValue, Int32.MaxValue is used. So if you have stored more than 2,147,483,647 events for a single PersistenceId, you may have a problem :wink:
8+
9+
## Setup
610

711
To activate the journal plugin, add the following lines to actor system configuration file:
812

@@ -22,9 +26,9 @@ akka.persistence.snapshot-store.mongodb.collection = "<snapshot-store collection
2226

2327
Remember that connection string must be provided separately to Journal and Snapshot Store. To finish setup simply initialize plugin using: `MongoDbPersistence.Get(actorSystem);`
2428

25-
### Configuration
29+
## Configuration
2630

27-
Both journal and snapshot store share the same configuration keys (however they resides in separate scopes, so they are definied distinctly for either journal or snapshot store):
31+
Both journal and snapshot store share the same configuration keys (however they reside in separate scopes, so they are defined distinctly for either journal or snapshot store):
2832

2933
```hocon
3034
akka.persistence {
@@ -127,9 +131,9 @@ akka.persistence {
127131
}
128132
```
129133

130-
### Programmatic configuration
134+
## Programmatic configuration
131135

132-
You can programmatically overrides the connection string setting in the HOCON configuration by adding a `MongoDbPersistenceSetup` to the
136+
You can programmatically override the connection string setting in the HOCON configuration by adding a `MongoDbPersistenceSetup` to the
133137
`ActorSystemSetup` during `ActorSystem` creation. The `MongoDbPersistenceSetup` takes `MongoClientSettings` instances to be used to configure
134138
MongoDB client connection to the server. The `connection-string` settings in the HOCON configuration will be ignored if any of these `MongoClientSettings`
135139
exists inside the Setup object.
@@ -178,7 +182,8 @@ var setup = BootstrapSetup.Create()
178182
var actorSystem = ActorSystem.Create("actorSystem", setup);
179183
```
180184

181-
### Serialization
185+
## Serialization
186+
182187
[Going from v1.4.0 onwards, all events and snapshots are saved as byte arrays using the standard Akka.Persistence format](https://github.com/akkadotnet/Akka.Persistence.MongoDB/issues/72).
183188

184189
However, in the event that you have one of the following use cases:
@@ -205,5 +210,20 @@ Setting `legacy-serialization = on` will allow you to save objects in a BSON for
205210

206211
**WARNING**: However, `legacy-serialization = on` will break Akka.NET serialization. `IActorRef`s, Akka.Cluster.Sharding, `AtLeastOnceDelivery` actors, and other built-in Akka.NET use cases can't be properly supported using this format. Use it at your own risk.
207212

208-
### Notice
209-
- The MongoDB operator to limit the number of documents in a query only accepts an integer while akka provides a long as maximum for the loading of events during the replay. Internally the long value is cast to an integer and if the value is higher then Int32.MaxValue, Int32.MaxValue is used. So if you have stored more then 2,147,483,647 events for a single PersistenceId, you may have a problem :wink:
213+
# Large Snapshot Store Support
214+
215+
MongoDb limits the size of documents it can store to 16 megabytes. If you know you will need to store snapshots larger than 16 megabytes, you can use the `Akka.Persistence.MongoDb.Snapshot.MongoDbGridFSSnapshotStore` snapshot store plugin.
216+
217+
> [!NOTE]
218+
>
219+
> `MongoDbGridFSSnapshotStore` is not designed for normal snapshot store plugin, it does not support transaction and read/write operations are slower on this plugin. Only use this plugin if you are having 16 megabyte limit problem.
220+
221+
Note that `MongoDbGridFSSnapshotStore` is considered as advanced optional plugin and it will not get an `Akka.Hosting` support. You will need to use HOCON configuration to use this plugin.
222+
223+
## Configuring `MongoDbGridFSSnapshotStore`
224+
225+
You will need to override the `class` property of the snapshot store `mongodb` HOCON settings to use this plugin. If you're using a custom snapshot-store plugin identifier, then you will need to change that instead. Note that the `use-write-transaction` setting is ignored on this plugin.
226+
227+
```
228+
akka.persistence.snapshot-store.mongodb.class = "Akka.Persistence.MongoDb.Snapshot.MongoDbGridFSSnapshotStore, Akka.Persistence.MongoDb"
229+
```
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
using System;
2+
using System.Text;
3+
using Akka.Configuration;
4+
using Akka.Hosting;
5+
using Akka.Persistence.Hosting;
6+
using Akka.Persistence.MongoDb.Snapshot;
7+
8+
#nullable enable
9+
namespace Akka.Persistence.MongoDb.Hosting;
10+
11+
public class MongoDbGridFsSnapshotOptions : MongoDbSnapshotOptions
12+
{
13+
public MongoDbGridFsSnapshotOptions() : this(true)
14+
{
15+
}
16+
17+
public MongoDbGridFsSnapshotOptions(bool isDefault, string identifier = "mongodb") : base(isDefault, identifier)
18+
{
19+
}
20+
21+
public override Type Type { get; } = typeof(MongoDbGridFsSnapshotStore);
22+
}

src/Akka.Persistence.MongoDb.Hosting/MongoDbSnapshotOptions.cs

+5
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
using Akka.Configuration;
44
using Akka.Hosting;
55
using Akka.Persistence.Hosting;
6+
using Akka.Persistence.MongoDb.Snapshot;
67

78
#nullable enable
89
namespace Akka.Persistence.MongoDb.Hosting;
@@ -22,6 +23,8 @@ public MongoDbSnapshotOptions(bool isDefault, string identifier = "mongodb") : b
2223
AutoInitialize = true;
2324
}
2425

26+
public virtual Type Type { get; } = typeof(MongoDbSnapshotStore);
27+
2528
/// <summary>
2629
/// Connection string used to access the MongoDb, also specifies the database.
2730
/// </summary>
@@ -55,6 +58,8 @@ public MongoDbSnapshotOptions(bool isDefault, string identifier = "mongodb") : b
5558

5659
protected override StringBuilder Build(StringBuilder sb)
5760
{
61+
sb.AppendLine($"class = {Type.AssemblyQualifiedName.ToHocon()}");
62+
5863
sb.AppendLine($"connection-string = {ConnectionString.ToHocon()}");
5964

6065
if(UseWriteTransaction is not null)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
//-----------------------------------------------------------------------
2+
// <copyright file="MongoDbSnapshotStoreSpec.cs" company="Akka.NET Project">
3+
// Copyright (C) 2009-2016 Lightbend Inc. <http://www.lightbend.com>
4+
// Copyright (C) 2013-2016 Akka.NET project <https://github.com/akkadotnet/akka.net>
5+
// </copyright>
6+
//-----------------------------------------------------------------------
7+
8+
using System;
9+
using System.Diagnostics;
10+
using System.Security.Cryptography;
11+
using System.Threading.Tasks;
12+
using Akka.Configuration;
13+
using Akka.Event;
14+
using Akka.Persistence.TCK.Snapshot;
15+
using FluentAssertions;
16+
using Xunit;
17+
using Xunit.Abstractions;
18+
19+
#nullable enable
20+
namespace Akka.Persistence.MongoDb.Tests.GridFS;
21+
22+
[Collection("MongoDbSpec")]
23+
public class MongoDbGridFsLegacySerializationSnapshotStoreSpec : SnapshotStoreSpec, IClassFixture<DatabaseFixture>
24+
{
25+
protected override bool SupportsSerialization => false;
26+
27+
public MongoDbGridFsLegacySerializationSnapshotStoreSpec(DatabaseFixture databaseFixture, ITestOutputHelper output)
28+
: base(CreateSpecConfig(databaseFixture), nameof(MongoDbGridFsLegacySerializationSnapshotStoreSpec), output)
29+
{
30+
Initialize();
31+
}
32+
33+
protected override int SnapshotByteSizeLimit => 128 * 1024 * 1024;
34+
35+
private static Config CreateSpecConfig(DatabaseFixture databaseFixture)
36+
{
37+
var specString = @"
38+
akka.test.single-expect-default = 3s
39+
akka.persistence {
40+
publish-plugin-commands = on
41+
snapshot-store {
42+
plugin = ""akka.persistence.snapshot-store.mongodb""
43+
mongodb {
44+
class = ""Akka.Persistence.MongoDb.Snapshot.MongoDbGridFsSnapshotStore, Akka.Persistence.MongoDb""
45+
connection-string = """ + databaseFixture.ConnectionString + @"""
46+
auto-initialize = on
47+
collection = ""SnapshotStore""
48+
legacy-serialization = on
49+
}
50+
}
51+
}";
52+
53+
return ConfigurationFactory.ParseString(specString)
54+
.WithFallback(MongoDbPersistence.DefaultConfiguration());
55+
}
56+
57+
[Fact]
58+
public async Task SnapshotStore_should_save_bigger_size_snapshot_consistently()
59+
{
60+
var metadata = new SnapshotMetadata(Pid, 100);
61+
var bigSnapshot = new byte[SnapshotByteSizeLimit];
62+
new Random().NextBytes(bigSnapshot);
63+
var senderProbe = CreateTestProbe();
64+
SnapshotStore.Tell(new SaveSnapshot(metadata, bigSnapshot), senderProbe.Ref);
65+
var saved = await senderProbe.ExpectMsgAsync<SaveSnapshotSuccess>();
66+
67+
var stopwatch = Stopwatch.StartNew();
68+
SnapshotStore.Tell(
69+
new LoadSnapshot(Pid, new SnapshotSelectionCriteria(saved.Metadata.SequenceNr), long.MaxValue),
70+
senderProbe.Ref);
71+
var loaded = await senderProbe.ExpectMsgAsync<LoadSnapshotResult>();
72+
stopwatch.Stop();
73+
Log.Info($"{SnapshotByteSizeLimit} bytes snapshot loaded in {stopwatch.Elapsed.Milliseconds} milliseconds");
74+
75+
MD5.Create().ComputeHash((byte[])loaded.Snapshot.Snapshot).Should()
76+
.BeEquivalentTo(MD5.Create().ComputeHash(bigSnapshot));
77+
}
78+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
//-----------------------------------------------------------------------
2+
// <copyright file="MongoDbSnapshotStoreSpec.cs" company="Akka.NET Project">
3+
// Copyright (C) 2009-2016 Lightbend Inc. <http://www.lightbend.com>
4+
// Copyright (C) 2013-2016 Akka.NET project <https://github.com/akkadotnet/akka.net>
5+
// </copyright>
6+
//-----------------------------------------------------------------------
7+
8+
using System;
9+
using System.Diagnostics;
10+
using System.Security.Cryptography;
11+
using System.Threading.Tasks;
12+
using Akka.Configuration;
13+
using Akka.Event;
14+
using Akka.Persistence.TCK.Snapshot;
15+
using FluentAssertions;
16+
using Xunit;
17+
using Xunit.Abstractions;
18+
19+
#nullable enable
20+
namespace Akka.Persistence.MongoDb.Tests.GridFS;
21+
22+
[Collection("MongoDbSpec")]
23+
public class MongoDbGridFsSnapshotStoreSpec : SnapshotStoreSpec, IClassFixture<DatabaseFixture>
24+
{
25+
public MongoDbGridFsSnapshotStoreSpec(DatabaseFixture databaseFixture, ITestOutputHelper output)
26+
: base(CreateSpecConfig(databaseFixture), nameof(MongoDbGridFsSnapshotStoreSpec), output)
27+
{
28+
Initialize();
29+
}
30+
31+
protected override int SnapshotByteSizeLimit => 128 * 1024 * 1024;
32+
33+
private static Config CreateSpecConfig(DatabaseFixture databaseFixture)
34+
{
35+
var specString = $$"""
36+
akka.test.single-expect-default = 3s
37+
akka.persistence {
38+
publish-plugin-commands = on
39+
snapshot-store {
40+
plugin = "akka.persistence.snapshot-store.mongodb"
41+
mongodb {
42+
class = "Akka.Persistence.MongoDb.Snapshot.MongoDbGridFsSnapshotStore, Akka.Persistence.MongoDb"
43+
connection-string = "{{databaseFixture.ConnectionString}}"
44+
use-write-transaction = off
45+
auto-initialize = on
46+
collection = "SnapshotStore"
47+
}
48+
}
49+
}
50+
""";
51+
52+
return ConfigurationFactory.ParseString(specString);
53+
}
54+
55+
[Fact]
56+
public async Task SnapshotStore_should_save_bigger_size_snapshot_consistently()
57+
{
58+
var metadata = new SnapshotMetadata(Pid, 100);
59+
var bigSnapshot = new byte[SnapshotByteSizeLimit];
60+
new Random().NextBytes(bigSnapshot);
61+
var senderProbe = CreateTestProbe();
62+
SnapshotStore.Tell(new SaveSnapshot(metadata, bigSnapshot), senderProbe.Ref);
63+
var saved = await senderProbe.ExpectMsgAsync<SaveSnapshotSuccess>();
64+
65+
var stopwatch = Stopwatch.StartNew();
66+
SnapshotStore.Tell(
67+
new LoadSnapshot(Pid, new SnapshotSelectionCriteria(saved.Metadata.SequenceNr), long.MaxValue),
68+
senderProbe.Ref);
69+
var loaded = await senderProbe.ExpectMsgAsync<LoadSnapshotResult>();
70+
stopwatch.Stop();
71+
Log.Info($"{SnapshotByteSizeLimit} bytes snapshot loaded in {stopwatch.Elapsed.Milliseconds} milliseconds");
72+
73+
MD5.Create().ComputeHash((byte[])loaded.Snapshot.Snapshot).Should()
74+
.BeEquivalentTo(MD5.Create().ComputeHash(bigSnapshot));
75+
}
76+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
using Akka.Configuration;
2+
using Akka.Persistence.TCK.Serialization;
3+
using Akka.Util.Internal;
4+
using Xunit;
5+
using Xunit.Abstractions;
6+
7+
#nullable enable
8+
namespace Akka.Persistence.MongoDb.Tests.GridFS.Serialization;
9+
10+
[Collection("MongoDbSpec")]
11+
public class MongoDbSnapshotStoreSerializationSpec : SnapshotStoreSerializationSpec, IClassFixture<DatabaseFixture>
12+
{
13+
public static readonly AtomicCounter Counter = new AtomicCounter(0);
14+
15+
private readonly ITestOutputHelper _output;
16+
17+
public MongoDbSnapshotStoreSerializationSpec(ITestOutputHelper output, DatabaseFixture databaseFixture)
18+
: base(CreateSpecConfig(databaseFixture, Counter.GetAndIncrement()), nameof(MongoDbSnapshotStoreSerializationSpec), output)
19+
{
20+
_output = output;
21+
output.WriteLine(databaseFixture.MongoDbConnectionString(Counter.Current));
22+
}
23+
24+
private static Config CreateSpecConfig(DatabaseFixture databaseFixture, int id)
25+
{
26+
var specString = @"
27+
akka.test.single-expect-default = 3s
28+
akka.persistence {
29+
publish-plugin-commands = on
30+
snapshot-store {
31+
plugin = ""akka.persistence.snapshot-store.mongodb""
32+
mongodb {
33+
class = ""Akka.Persistence.MongoDb.Snapshot.MongoDbGridFsSnapshotStore, Akka.Persistence.MongoDb""
34+
connection-string = """ + databaseFixture.MongoDbConnectionString(id) + @"""
35+
auto-initialize = on
36+
collection = ""SnapshotStore""
37+
}
38+
}
39+
}";
40+
41+
return ConfigurationFactory.ParseString(specString);
42+
}
43+
}

src/Akka.Persistence.MongoDb.Tests/Hosting/MongoDbSnapshotOptionsSpec.cs

+2-2
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ public void DefaultSnapshotOptionsTest()
3737
config.Should().NotBeNull();
3838
baseConfig.Should().NotBeNull();
3939

40-
config.GetString("class").Should().Be(baseConfig.GetString("class"));
40+
Type.GetType(config.GetString("class")).Should().Be(Type.GetType(baseConfig.GetString("class")));
4141
config.GetString("connection-string").Should().Be(baseConfig.GetString("connection-string"));
4242
config.GetBoolean("use-write-transaction").Should().Be(baseConfig.GetBoolean("use-write-transaction"));
4343
config.GetBoolean("auto-initialize").Should().Be(baseConfig.GetBoolean("auto-initialize"));
@@ -62,7 +62,7 @@ public void CustomIdSnapshotOptionsTest()
6262
config.Should().NotBeNull();
6363
baseConfig.Should().NotBeNull();
6464

65-
config.GetString("class").Should().Be(baseConfig.GetString("class"));
65+
Type.GetType(config.GetString("class")).Should().Be(Type.GetType(baseConfig.GetString("class")));
6666
config.GetString("connection-string").Should().Be(baseConfig.GetString("connection-string"));
6767
config.GetBoolean("use-write-transaction").Should().Be(baseConfig.GetBoolean("use-write-transaction"));
6868
config.GetBoolean("auto-initialize").Should().Be(baseConfig.GetBoolean("auto-initialize"));

src/Akka.Persistence.MongoDb/Akka.Persistence.MongoDb.csproj

+1
Original file line numberDiff line numberDiff line change
@@ -10,5 +10,6 @@
1010
<PackageReference Include="Akka.Persistence.Query" />
1111
<PackageReference Include="Akka.Streams" />
1212
<PackageReference Include="MongoDB.Driver" />
13+
<PackageReference Include="MongoDB.Driver.GridFS" />
1314
</ItemGroup>
1415
</Project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
using MongoDB.Bson;
2+
using MongoDB.Bson.Serialization.Attributes;
3+
4+
namespace Akka.Persistence.MongoDb.Snapshot;
5+
6+
public class GridFsPayloadEnvelope
7+
{
8+
[BsonElement("_v")]
9+
public object Payload { get; set; }
10+
}

0 commit comments

Comments
 (0)