Commit d12a28c

Merge pull request #115 from SwissBorg/fix-migration-query-streaming
Migration Tool: Prevent PostgreSQL client from caching all data in memory
2 parents f76da83 + d9977b8 commit d12a28c
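
The change switches both migration queries to Slick's streaming statement parameters. The PostgreSQL JDBC driver only uses a cursor and fetches rows incrementally when the statement is forward-only, a positive fetch size is set, and the connection is not in autocommit mode; without those settings it buffers the entire result set in memory, which is the behaviour this commit addresses. A minimal sketch of the pattern with illustrative names (a hypothetical "events" table and config path, not code from this repository):

import slick.jdbc.PostgresProfile.api._
import slick.jdbc.{ ResultSetConcurrency, ResultSetType }

// Illustrative only: stream rows from a hypothetical "events" table instead of
// letting the driver buffer the whole result set in memory.
val db = Database.forConfig("my-postgres-db") // assumed config path

val streamingQuery =
  sql"SELECT id, payload FROM events"
    .as[(Long, Array[Byte])]
    .withStatementParameters(
      rsType = ResultSetType.ForwardOnly,            // cursor only moves forward
      rsConcurrency = ResultSetConcurrency.ReadOnly,
      fetchSize = 1000)                              // rows fetched per round trip
    .transactionally                                 // runs without autocommit so fetchSize is honoured

// db.stream returns a Reactive Streams publisher that emits rows on demand.
val rowsPublisher = db.stream(streamingQuery)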

1 file changed: +39 -35 lines changed
migration/src/main/scala/akka/persistence/postgres/migration/v2/V2__Extract_journal_metadata.scala

Lines changed: 39 additions & 35 deletions
@@ -17,16 +17,18 @@ import akka.stream.Materializer
 import akka.stream.scaladsl.Source
 import com.typesafe.config.Config
 import org.flywaydb.core.api.migration.Context
-import slick.jdbc.JdbcBackend
+import slick.jdbc.{ JdbcBackend, ResultSetConcurrency, ResultSetType }
 
 import scala.concurrent.duration.Duration
 import scala.concurrent.{ Await, Future }
 import scala.util.Failure
 
 // Class name must obey FlyWay naming rules (https://flywaydb.org/documentation/migrations#naming-1)
-private[migration] class V2__Extract_journal_metadata(globalConfig: Config, journalConfig: JournalConfig, snapshotConfig: SnapshotConfig, db: JdbcBackend.Database)(
-    implicit system: ActorSystem,
-    mat: Materializer)
+private[migration] class V2__Extract_journal_metadata(
+    globalConfig: Config,
+    journalConfig: JournalConfig,
+    snapshotConfig: SnapshotConfig,
+    db: JdbcBackend.Database)(implicit system: ActorSystem, mat: Materializer)
     extends SlickMigration {
 
   import system.dispatcher
@@ -95,27 +97,31 @@ private[migration] class V2__Extract_journal_metadata(globalConfig: Config, jour
     import journalTableConfig.columnNames._
     db.stream(
       sql"SELECT #$ordering, #$deleted, #$persistenceId, #$sequenceNumber, #$message, #$tags FROM #$journalTableName WHERE #$metadata IS NULL"
-        .as[(Long, Boolean, String, Long, Array[Byte], List[Int])])
+        .as[(Long, Boolean, String, Long, Array[Byte], List[Int])]
+        .withStatementParameters(
+          rsType = ResultSetType.ForwardOnly,
+          rsConcurrency = ResultSetConcurrency.ReadOnly,
+          fetchSize = migrationBatchSize.max(Int.MaxValue).toInt)
+        .transactionally)
   }
 
   val dml = Source
     .fromPublisher(eventsPublisher)
-    .mapAsync(conversionParallelism) {
-      case (ordering, deleted, persistenceId, sequenceNumber, oldMessage, tags) =>
-        Future.fromTry {
-          for {
-            pr <- deserializer.deserialize(oldMessage)
-            (newMessage, metadata) <- serializer.serialize(pr)
-          } yield TempJournalRow(
-            ordering,
-            deleted,
-            persistenceId,
-            sequenceNumber,
-            oldMessage,
-            newMessage,
-            tags,
-            metadata)
-        }
+    .mapAsync(conversionParallelism) { case (ordering, deleted, persistenceId, sequenceNumber, oldMessage, tags) =>
+      Future.fromTry {
+        for {
+          pr <- deserializer.deserialize(oldMessage)
+          (newMessage, metadata) <- serializer.serialize(pr)
+        } yield TempJournalRow(
+          ordering,
+          deleted,
+          persistenceId,
+          sequenceNumber,
+          oldMessage,
+          newMessage,
+          tags,
+          metadata)
+      }
     }
     .batch(migrationBatchSize, List(_))(_ :+ _)
     .map(journalQueries.updateAll)
@@ -153,25 +159,23 @@ private[migration] class V2__Extract_journal_metadata(globalConfig: Config, jour
     db.stream {
       sql"SELECT #$persistenceId, #$sequenceNumber, #$created, #$snapshot FROM #$snapshotTableName WHERE #$metadata IS NULL"
         .as[(String, Long, Long, Array[Byte])]
+        .withStatementParameters(
+          rsType = ResultSetType.ForwardOnly,
+          rsConcurrency = ResultSetConcurrency.ReadOnly,
+          fetchSize = migrationBatchSize.max(Int.MaxValue).toInt)
+        .transactionally
     }
   }
 
   val dml = Source
     .fromPublisher(eventsPublisher)
-    .mapAsync(conversionParallelism) {
-      case (persistenceId, sequenceNumber, created, serializedOldSnapshot) =>
-        Future.fromTry {
-          for {
-            oldSnapshot <- deserializer.deserialize(serializedOldSnapshot)
-            (newSnapshot, metadata) <- serializer.serialize(oldSnapshot)
-          } yield TempSnapshotRow(
-            persistenceId,
-            sequenceNumber,
-            created,
-            serializedOldSnapshot,
-            newSnapshot,
-            metadata)
-        }
+    .mapAsync(conversionParallelism) { case (persistenceId, sequenceNumber, created, serializedOldSnapshot) =>
+      Future.fromTry {
+        for {
+          oldSnapshot <- deserializer.deserialize(serializedOldSnapshot)
+          (newSnapshot, metadata) <- serializer.serialize(oldSnapshot)
+        } yield TempSnapshotRow(persistenceId, sequenceNumber, created, serializedOldSnapshot, newSnapshot, metadata)
+      }
     }
    .batch(migrationBatchSize, List(_))(_ :+ _)
    .map(snapshotQueries.insertOrUpdate)
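
Downstream, the migration consumes each publisher with Akka Streams, which is what keeps memory bounded end to end: demand from the stream is translated into incremental JDBC fetches, rows are converted with limited parallelism, and writes are grouped into batches. A rough, self-contained sketch of that consumption pattern (hypothetical helper names, assuming Akka 2.6 where an implicit ActorSystem provides the materializer):

import akka.Done
import akka.actor.ActorSystem
import akka.stream.scaladsl.{ Sink, Source }
import org.reactivestreams.Publisher

import scala.concurrent.Future

// Illustrative only: consume a streaming row publisher (for example the one
// returned by db.stream above) with backpressure, converting rows in parallel
// and grouping them into bounded batches before they would be written back.
def consume(rows: Publisher[(Long, Array[Byte])])(implicit system: ActorSystem): Future[Done] = {
  import system.dispatcher

  def convert(row: (Long, Array[Byte])): Future[(Long, Array[Byte])] =
    Future(row) // placeholder for the real deserialize/serialize step

  Source
    .fromPublisher(rows)               // demand flows back to the JDBC stream
    .mapAsync(4)(convert)              // bounded conversion parallelism
    .batch(500, Vector(_))(_ :+ _)     // group rows only while downstream is busy
    .runWith(Sink.foreach(batch => println(s"would update ${batch.size} rows")))
}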
