Skip to content

Commit 6e6905a

Browse files
authored
Merge pull request #104 from SwissBorg/conf-and-fixes
Make V2 migration parallelism configurable
2 parents a442237 + 947a49a commit 6e6905a

File tree

3 files changed

+4
-2
lines changed

3 files changed

+4
-2
lines changed

migration/src/main/resources/db/akka-persistence-postgres/migration/.keep

Whitespace-only changes.

migration/src/main/resources/reference.conf

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ akka-persistence-postgres {
22
migration {
33
v2 {
44
batchSize = 500
5+
parallelism = 8
56
}
67
}
78
}

migration/src/main/scala/akka/persistence/postgres/migration/v2/V2__Extract_journal_metadata.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ private[migration] class V2__Extract_journal_metadata(globalConfig: Config, db:
6363

6464
private val migrationConf: Config = globalConfig.getConfig("akka-persistence-postgres.migration")
6565
private val migrationBatchSize: Long = migrationConf.getLong("v2.batchSize")
66+
private val conversionParallelism: Int = migrationConf.getInt("v2.parallelism")
6667

6768
@throws[Exception]
6869
override def migrate(context: Context): Unit = {
@@ -101,7 +102,7 @@ private[migration] class V2__Extract_journal_metadata(globalConfig: Config, db:
101102

102103
val dml = Source
103104
.fromPublisher(eventsPublisher)
104-
.mapAsync(8) {
105+
.mapAsync(conversionParallelism) {
105106
case (ordering, deleted, persistenceId, sequenceNumber, oldMessage, tags) =>
106107
Future.fromTry {
107108
for {
@@ -159,7 +160,7 @@ private[migration] class V2__Extract_journal_metadata(globalConfig: Config, db:
159160

160161
val dml = Source
161162
.fromPublisher(eventsPublisher)
162-
.mapAsync(8) {
163+
.mapAsync(conversionParallelism) {
163164
case (persistenceId, sequenceNumber, created, serializedOldSnapshot) =>
164165
Future.fromTry {
165166
for {

0 commit comments

Comments
 (0)