From ab24d0398b161252f1894fe3dc347bb585bbc09e Mon Sep 17 00:00:00 2001 From: Borislav Borisov Date: Mon, 22 Mar 2021 15:52:44 +0000 Subject: [PATCH 1/2] Query binding --- .../journal/PostgreSqlJournalDao.scala | 3 +++ .../persistence/r2dbc/client/Handle.scala | 27 +++++++++++++++++++ 2 files changed, 30 insertions(+) diff --git a/postgresql/src/main/scala/akka/persistence/postgresql/journal/PostgreSqlJournalDao.scala b/postgresql/src/main/scala/akka/persistence/postgresql/journal/PostgreSqlJournalDao.scala index ae3002a4..0e8b02d3 100644 --- a/postgresql/src/main/scala/akka/persistence/postgresql/journal/PostgreSqlJournalDao.scala +++ b/postgresql/src/main/scala/akka/persistence/postgresql/journal/PostgreSqlJournalDao.scala @@ -32,6 +32,9 @@ private[journal] object PostgreSqlJournalDao { type EntryTags = JList[RTuple2[Long, Set[String]]] + def insertEntries(handle: Handle) = { + } + def insertEntriesQuery(entries: Seq[JournalEntry]): String = { val projections = entries.flatMap(it => it.projected).reduceOption(_ + ";" + _) val events = "INSERT INTO event (id, persistence_id, sequence_nr, timestamp, payload, manifest, ser_id, ser_manifest, writer_uuid) VALUES " + entries diff --git a/r2dbc-mini-client/src/main/scala/akka/persistence/r2dbc/client/Handle.scala b/r2dbc-mini-client/src/main/scala/akka/persistence/r2dbc/client/Handle.scala index b671fe84..851a8051 100644 --- a/r2dbc-mini-client/src/main/scala/akka/persistence/r2dbc/client/Handle.scala +++ b/r2dbc-mini-client/src/main/scala/akka/persistence/r2dbc/client/Handle.scala @@ -82,6 +82,33 @@ final class Handle private(val connection: Connection) { Flux.from(this.connection.createStatement(sql).execute).flatMap(fn(_)) } + /** + * Executes the given SQL statement, and transforms each [[Result]]s that are returned from + * execution. + * + * @param sql SQL statement + * @param fn a function used to transform each [[Result]] into a [[Flux]] of values + * @tparam T the type of results + * @return the values resulting from the [[Result]] transformation + * @throws IllegalArgumentException if `sql` or `fn` is `null` + */ + def executeQuery[T](sql: String, bindings: Seq[Array[Any]], fn: Result => _ <: Publisher[T]): Flux[T] = { + require(sql != null, SQL_REQUIRED) + require(fn != null, FN_REQUIRED) + + val statement = this.connection.createStatement(sql) + var count = 0 + bindings.foreach(it => { + it.indices.foreach(i => statement.bind(i, it(i))) + count += 1; + if (count < bindings.size) { + statement.add() + } + }) + + Flux.from(statement.execute).flatMap(fn(_)) + } + /** * Commits the current transaction. * From 67e3b873dfaf6ccb66d282e6ac5c63f15ad45114 Mon Sep 17 00:00:00 2001 From: Borislav Borisov Date: Sat, 27 Mar 2021 17:22:03 +0000 Subject: [PATCH 2/2] Strict and lenient query resolvers --- .../r2dbc/journal/JournalEntry.scala | 4 +- .../r2dbc/journal/PersistenceReprSerDe.scala | 2 +- .../persistence/r2dbc/journal/Projected.scala | 7 +- postgresql/src/main/resources/reference.conf | 4 +- ...cala => LenientPostgreSqlJournalDao.scala} | 11 +-- .../journal/PostgreSqlJournal.scala | 3 +- .../journal/StrictPostgreSqlJournalDao.scala | 73 +++++++++++++++++++ .../persistence/r2dbc/client/Handle.scala | 1 + 8 files changed, 89 insertions(+), 16 deletions(-) rename postgresql/src/main/scala/akka/persistence/postgresql/journal/{PostgreSqlJournalDao.scala => LenientPostgreSqlJournalDao.scala} (94%) create mode 100644 postgresql/src/main/scala/akka/persistence/postgresql/journal/StrictPostgreSqlJournalDao.scala diff --git a/core/src/main/scala/akka/persistence/r2dbc/journal/JournalEntry.scala b/core/src/main/scala/akka/persistence/r2dbc/journal/JournalEntry.scala index 9f92a2f1..9baa4057 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/journal/JournalEntry.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/journal/JournalEntry.scala @@ -39,7 +39,7 @@ private[akka] object JournalEntry { serManifest: String, bytes: Array[Byte], tags: Set[String], - projection: Option[String] + projection: Option[(String, Array[Any])] ): JournalEntry = JournalEntry( Long.MinValue, repr.persistenceId, @@ -110,5 +110,5 @@ private[akka] final case class JournalEntry( serManifest: String = "", tags: Set[String] = Set.empty, deleted: Boolean = false, - projected: Option[String] = None + projected: Option[(String, Array[Any])] = None ) diff --git a/core/src/main/scala/akka/persistence/r2dbc/journal/PersistenceReprSerDe.scala b/core/src/main/scala/akka/persistence/r2dbc/journal/PersistenceReprSerDe.scala index 920237d0..aa1f62d6 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/journal/PersistenceReprSerDe.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/journal/PersistenceReprSerDe.scala @@ -44,7 +44,7 @@ private[akka] final class PersistenceReprSerDe(serialization: Serialization)(imp case asyncSer: AsyncSerializer => withTransportInformation(() => asyncSer.toBinaryAsync(event)) case sync => Future(withTransportInformation(() => sync.toBinary(event))) } - future.map(JournalEntry(repr, serializer.identifier, manifest, _, tags, projection)) + future.map(JournalEntry(repr, serializer.identifier, manifest, _, tags, projection.map((_, Array.empty[Any])))) }) def deserialize(entry: JournalEntry): Future[Try[PersistentRepr]] = { diff --git a/core/src/main/scala/akka/persistence/r2dbc/journal/Projected.scala b/core/src/main/scala/akka/persistence/r2dbc/journal/Projected.scala index 865b1bd7..7eb4d4b8 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/journal/Projected.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/journal/Projected.scala @@ -19,7 +19,8 @@ package akka.persistence.r2dbc.journal /** * Wrap an event in `Projected` to execute the projection `sql` in the same transaction as the event. * - * @param payload the event - * @param sql The SQL to execute. + * @param payload the event + * @param sql The SQL to execute. + * @param bindings The SQL bindings. */ -final case class Projected(payload: Any, sql: String) +final case class Projected(payload: Any, sql: String, bindings: Object*) diff --git a/postgresql/src/main/resources/reference.conf b/postgresql/src/main/resources/reference.conf index 3f5ac9d2..2c1566c5 100644 --- a/postgresql/src/main/resources/reference.conf +++ b/postgresql/src/main/resources/reference.conf @@ -7,7 +7,7 @@ postgresql-journal { db { # The hostname of the database server - hostname = "localhost" + hostname = "127.0.0.1" # The port of the database server port = 5432 @@ -51,7 +51,7 @@ postgresql-snapshot-store { db { # The hostname of the database server - hostname = "localhost" + hostname = "127.0.0.1" # The port of the database server port = 5432 diff --git a/postgresql/src/main/scala/akka/persistence/postgresql/journal/PostgreSqlJournalDao.scala b/postgresql/src/main/scala/akka/persistence/postgresql/journal/LenientPostgreSqlJournalDao.scala similarity index 94% rename from postgresql/src/main/scala/akka/persistence/postgresql/journal/PostgreSqlJournalDao.scala rename to postgresql/src/main/scala/akka/persistence/postgresql/journal/LenientPostgreSqlJournalDao.scala index 0e8b02d3..b3e9c193 100644 --- a/postgresql/src/main/scala/akka/persistence/postgresql/journal/PostgreSqlJournalDao.scala +++ b/postgresql/src/main/scala/akka/persistence/postgresql/journal/LenientPostgreSqlJournalDao.scala @@ -28,15 +28,12 @@ import reactor.core.publisher.Flux import reactor.util.function.{Tuple2 => RTuple2} import scala.collection.JavaConverters._ -private[journal] object PostgreSqlJournalDao { +private[journal] object LenientPostgreSqlJournalDao { type EntryTags = JList[RTuple2[Long, Set[String]]] - def insertEntries(handle: Handle) = { - } - def insertEntriesQuery(entries: Seq[JournalEntry]): String = { - val projections = entries.flatMap(it => it.projected).reduceOption(_ + ";" + _) + val projections = entries.flatMap(it => it.projected.map(_._1)).reduceOption(_ + ";" + _) val events = "INSERT INTO event (id, persistence_id, sequence_nr, timestamp, payload, manifest, ser_id, ser_manifest, writer_uuid) VALUES " + entries .map(it => s"(DEFAULT, '${it.persistenceId}', ${it.sequenceNr}, ${it.timestamp}, '\\x${hexDump(it.event)}', " + s"'${it.eventManifest}', ${it.serId}, '${it.serManifest}', '${it.writerUuid}')") @@ -76,9 +73,9 @@ private[journal] object PostgreSqlJournalDao { * * @see [[https://github.com/r2dbc/r2dbc-postgresql r2dbc-postgresql]] for more */ -final class PostgreSqlJournalDao(val r2dbc: R2dbc) extends JournalDao { +final class LenientPostgreSqlJournalDao(val r2dbc: R2dbc) extends JournalDao { - import PostgreSqlJournalDao._ + import LenientPostgreSqlJournalDao._ override def writeEvents(events: Seq[JournalEntry]): Source[Int, NotUsed] = { val flux: Flux[Integer] = r2dbc.inTransaction((handle: Handle) => diff --git a/postgresql/src/main/scala/akka/persistence/postgresql/journal/PostgreSqlJournal.scala b/postgresql/src/main/scala/akka/persistence/postgresql/journal/PostgreSqlJournal.scala index 4a26c7b8..5acc9a58 100644 --- a/postgresql/src/main/scala/akka/persistence/postgresql/journal/PostgreSqlJournal.scala +++ b/postgresql/src/main/scala/akka/persistence/postgresql/journal/PostgreSqlJournal.scala @@ -27,6 +27,7 @@ private[akka] final class PostgreSqlJournal(config: Config) override implicit val system: ActorSystem = context.system override protected val dao: JournalDao = - new PostgreSqlJournalDao(R2dbc(ConnectionPoolFactory("postgresql", JournalPluginConfig(config)))) +// new PostgreSqlJournalDao(R2dbc(ConnectionPoolFactory("postgresql", JournalPluginConfig(config)))) + new StrictPostgreSqlJournalDao(R2dbc(ConnectionPoolFactory("postgresql", JournalPluginConfig(config)))) } diff --git a/postgresql/src/main/scala/akka/persistence/postgresql/journal/StrictPostgreSqlJournalDao.scala b/postgresql/src/main/scala/akka/persistence/postgresql/journal/StrictPostgreSqlJournalDao.scala new file mode 100644 index 00000000..ba8afebf --- /dev/null +++ b/postgresql/src/main/scala/akka/persistence/postgresql/journal/StrictPostgreSqlJournalDao.scala @@ -0,0 +1,73 @@ +package akka.persistence.postgresql.journal + +import akka.NotUsed +import akka.persistence.postgresql.journal.LenientPostgreSqlJournalDao.EntryTags +import akka.persistence.postgresql.journal.StrictPostgreSqlJournalDao.{findEventsQuery, highestSeqNrQuery, insertEntriesQuery, insertTagsQuery} +import akka.persistence.r2dbc.client.{Handle, R2dbc} +import akka.persistence.r2dbc.journal.ResultUtils.{toJournalEntry, toSeqId} +import akka.persistence.r2dbc.journal.{JournalDao, JournalEntry} +import akka.stream.scaladsl.Source +import io.r2dbc.spi.Result +import reactor.core.publisher.Flux + +import scala.collection.JavaConverters._ + +private[journal] object StrictPostgreSqlJournalDao { + + val insertEntriesQuery: String = "INSERT INTO event (id, persistence_id, sequence_nr, timestamp, payload, manifest, " + + "ser_id, ser_manifest, writer_uuid) VALUES (DEFAULT, $1, $2, $3, $4, $5, $6, $7, $8)" + + val insertTagsQuery: String = "INSERT INTO tag (id, event_id, tag) VALUES (DEFAULT, $1, $2)" + + val findEventsQuery = "SELECT id, persistence_id, sequence_nr, timestamp, payload, manifest, ser_id, ser_manifest, writer_uuid FROM event" + + " WHERE deleted = false AND persistence_id = $1" + + " AND sequence_nr BETWEEN $2 AND $3 ORDER BY sequence_nr ASC LIMIT $4" + + val highestSeqNrQuery: String = "SELECT sequence_nr FROM event WHERE persistence_id = $1" + + " AND sequence_nr >= $2 ORDER BY sequence_nr DESC LIMIT 1" +} + +final class StrictPostgreSqlJournalDao(val r2dbc: R2dbc) extends JournalDao { + + val lenient = new LenientPostgreSqlJournalDao(r2dbc) + + override def writeEvents(entries: Seq[JournalEntry]): Source[Int, NotUsed] = { + val flux: Flux[Integer] = r2dbc.inTransaction((handle: Handle) => { + val projections = entries.flatMap(it => it.projected).map(it => { + handle.executeQuery(it._1, Seq(it._2), _.getRowsUpdated) + }) + val events = entries.map(it => Array(it.persistenceId, it.sequenceNr, it.timestamp, it.event, + it.eventManifest, it.serId, it.serManifest, it.writerUuid)) + + Flux.concat(projections.asJava) + .thenMany(handle.executeQuery(insertEntriesQuery, events, toSeqId(_, "id"))) + .zipWithIterable(entries.flatMap(it => Set(it.tags)).asJava) + .collectList + .filter((it: EntryTags) => it.asScala.map(_.getT2).exists(_.nonEmpty)) + .flatMapMany((eventTags: EntryTags) => { + val tags = eventTags.asScala.flatMap(it => it.getT2.map(Array(it.getT1, _))).toSeq + handle.executeQuery(insertTagsQuery, tags, _.getRowsUpdated) + }) + }) + Source.fromPublisher(flux.defaultIfEmpty(0)).map(_.toInt) + } + + override def fetchEvents(persistenceId: String, fromSeqNr: Long, toSeqNr: Long, max: Long): Source[JournalEntry, NotUsed] = { + val flux = r2dbc.withHandle { handle => + handle.executeQuery(findEventsQuery, Seq(Array(persistenceId, fromSeqNr, toSeqNr, max)), toJournalEntry(_)) + } + Source.fromPublisher(flux) + } + + override def deleteEvents(persistenceId: String, toSeqNr: Long): Source[Int, NotUsed] = { + lenient.deleteEvents(persistenceId, toSeqNr) + } + + override def readHighestSequenceNr(persistenceId: String, fromSeqNr: Long): Source[Long, NotUsed] = { + val flux = r2dbc.withHandle { handle => + val toSeqNr = (result: Result) => toSeqId(result, "sequence_nr") + handle.executeQuery(highestSeqNrQuery, Seq(Array(persistenceId, fromSeqNr)), toSeqNr) + } + Source.fromPublisher(flux) + } +} diff --git a/r2dbc-mini-client/src/main/scala/akka/persistence/r2dbc/client/Handle.scala b/r2dbc-mini-client/src/main/scala/akka/persistence/r2dbc/client/Handle.scala index 851a8051..db03de5b 100644 --- a/r2dbc-mini-client/src/main/scala/akka/persistence/r2dbc/client/Handle.scala +++ b/r2dbc-mini-client/src/main/scala/akka/persistence/r2dbc/client/Handle.scala @@ -96,6 +96,7 @@ final class Handle private(val connection: Connection) { require(sql != null, SQL_REQUIRED) require(fn != null, FN_REQUIRED) + println(s"Running query $sql") val statement = this.connection.createStatement(sql) var count = 0 bindings.foreach(it => {