This repository was archived by the owner on Sep 18, 2023. It is now read-only.

Query binding #114

Open · wants to merge 2 commits into base: trunk
@@ -39,7 +39,7 @@ private[akka] object JournalEntry {
       serManifest: String,
       bytes: Array[Byte],
       tags: Set[String],
-      projection: Option[String]
+      projection: Option[(String, Array[Any])]
   ): JournalEntry = JournalEntry(
     Long.MinValue,
     repr.persistenceId,
@@ -110,5 +110,5 @@ private[akka] final case class JournalEntry(
     serManifest: String = "",
     tags: Set[String] = Set.empty,
     deleted: Boolean = false,
-    projected: Option[String] = None
+    projected: Option[(String, Array[Any])] = None
 )
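For illustration, a hypothetical value of the new `projected` field; only the `Option[(String, Array[Any])]` shape comes from this change, the SQL and values are invented:

// Projection statement plus its positional bindings, carried alongside the journal entry.
val projected: Option[(String, Array[Any])] =
  Some(("UPDATE account_balance SET amount = amount + $1 WHERE account_id = $2",
    Array[Any](100L, "acc-1")))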
@@ -44,7 +44,7 @@ private[akka] final class PersistenceReprSerDe(serialization: Serialization)(imp
         case asyncSer: AsyncSerializer => withTransportInformation(() => asyncSer.toBinaryAsync(event))
         case sync => Future(withTransportInformation(() => sync.toBinary(event)))
       }
-      future.map(JournalEntry(repr, serializer.identifier, manifest, _, tags, projection))
+      future.map(JournalEntry(repr, serializer.identifier, manifest, _, tags, projection.map((_, Array.empty[Any]))))
     })

   def deserialize(entry: JournalEntry): Future[Try[PersistentRepr]] = {
@@ -19,7 +19,8 @@ package akka.persistence.r2dbc.journal
 /**
  * Wrap an event in `Projected` to execute the projection `sql` in the same transaction as the event.
  *
  * @param payload the event
  * @param sql The SQL to execute.
+ * @param bindings The SQL bindings.
  */
-final case class Projected(payload: Any, sql: String)
+final case class Projected(payload: Any, sql: String, bindings: Object*)
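A minimal usage sketch of the extended `Projected` from a persistent actor; the event type, projection SQL, and values are illustrative, not part of this PR:

// Hypothetical domain event; only Projected(payload, sql, bindings*) is defined by this change.
final case class AccountCredited(accountId: String, amount: Long)

val event = AccountCredited("acc-1", 100L)
val projected = Projected(
  event,
  "UPDATE account_balance SET amount = amount + $1 WHERE account_id = $2",
  Long.box(event.amount), event.accountId  // bound positionally, executed in the same transaction as the event
)
// persist(projected) { _ => ... }  // passed to persist() like any other event payload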
postgresql/src/main/resources/reference.conf (4 changes: 2 additions & 2 deletions)
@@ -7,7 +7,7 @@ postgresql-journal {

   db {
     # The hostname of the database server
-    hostname = "localhost"
+    hostname = "127.0.0.1"

     # The port of the database server
     port = 5432
@@ -51,7 +51,7 @@ postgresql-snapshot-store {

   db {
     # The hostname of the database server
-    hostname = "localhost"
+    hostname = "127.0.0.1"

     # The port of the database server
     port = 5432
@@ -28,12 +28,12 @@ import reactor.core.publisher.Flux
 import reactor.util.function.{Tuple2 => RTuple2}
 import scala.collection.JavaConverters._

-private[journal] object PostgreSqlJournalDao {
+private[journal] object LenientPostgreSqlJournalDao {

   type EntryTags = JList[RTuple2[Long, Set[String]]]

   def insertEntriesQuery(entries: Seq[JournalEntry]): String = {
-    val projections = entries.flatMap(it => it.projected).reduceOption(_ + ";" + _)
+    val projections = entries.flatMap(it => it.projected.map(_._1)).reduceOption(_ + ";" + _)
     val events = "INSERT INTO event (id, persistence_id, sequence_nr, timestamp, payload, manifest, ser_id, ser_manifest, writer_uuid) VALUES " + entries
       .map(it => s"(DEFAULT, '${it.persistenceId}', ${it.sequenceNr}, ${it.timestamp}, '\\x${hexDump(it.event)}', " +
         s"'${it.eventManifest}', ${it.serId}, '${it.serManifest}', '${it.writerUuid}')")
@@ -73,9 +73,9 @@ private[journal] object PostgreSqlJournalDao {
  *
  * @see [[https://github.com/r2dbc/r2dbc-postgresql r2dbc-postgresql]] for more
  */
-final class PostgreSqlJournalDao(val r2dbc: R2dbc) extends JournalDao {
+final class LenientPostgreSqlJournalDao(val r2dbc: R2dbc) extends JournalDao {

-  import PostgreSqlJournalDao._
+  import LenientPostgreSqlJournalDao._

   override def writeEvents(events: Seq[JournalEntry]): Source[Int, NotUsed] = {
     val flux: Flux[Integer] = r2dbc.inTransaction((handle: Handle) =>
@@ -27,6 +27,7 @@ private[akka] final class PostgreSqlJournal(config: Config)

   override implicit val system: ActorSystem = context.system
   override protected val dao: JournalDao =
-    new PostgreSqlJournalDao(R2dbc(ConnectionPoolFactory("postgresql", JournalPluginConfig(config))))
+    // new PostgreSqlJournalDao(R2dbc(ConnectionPoolFactory("postgresql", JournalPluginConfig(config))))
+    new StrictPostgreSqlJournalDao(R2dbc(ConnectionPoolFactory("postgresql", JournalPluginConfig(config))))

 }
@@ -0,0 +1,73 @@
package akka.persistence.postgresql.journal

import akka.NotUsed
import akka.persistence.postgresql.journal.LenientPostgreSqlJournalDao.EntryTags
import akka.persistence.postgresql.journal.StrictPostgreSqlJournalDao.{findEventsQuery, highestSeqNrQuery, insertEntriesQuery, insertTagsQuery}
import akka.persistence.r2dbc.client.{Handle, R2dbc}
import akka.persistence.r2dbc.journal.ResultUtils.{toJournalEntry, toSeqId}
import akka.persistence.r2dbc.journal.{JournalDao, JournalEntry}
import akka.stream.scaladsl.Source
import io.r2dbc.spi.Result
import reactor.core.publisher.Flux

import scala.collection.JavaConverters._

private[journal] object StrictPostgreSqlJournalDao {

  val insertEntriesQuery: String = "INSERT INTO event (id, persistence_id, sequence_nr, timestamp, payload, manifest, " +
    "ser_id, ser_manifest, writer_uuid) VALUES (DEFAULT, $1, $2, $3, $4, $5, $6, $7, $8)"

  val insertTagsQuery: String = "INSERT INTO tag (id, event_id, tag) VALUES (DEFAULT, $1, $2)"

  val findEventsQuery = "SELECT id, persistence_id, sequence_nr, timestamp, payload, manifest, ser_id, ser_manifest, writer_uuid FROM event" +
    " WHERE deleted = false AND persistence_id = $1" +
    " AND sequence_nr BETWEEN $2 AND $3 ORDER BY sequence_nr ASC LIMIT $4"

  val highestSeqNrQuery: String = "SELECT sequence_nr FROM event WHERE persistence_id = $1" +
    " AND sequence_nr >= $2 ORDER BY sequence_nr DESC LIMIT 1"
}

final class StrictPostgreSqlJournalDao(val r2dbc: R2dbc) extends JournalDao {

  val lenient = new LenientPostgreSqlJournalDao(r2dbc)

  override def writeEvents(entries: Seq[JournalEntry]): Source[Int, NotUsed] = {
    val flux: Flux[Integer] = r2dbc.inTransaction((handle: Handle) => {
      // Run any projection statements with their bindings, then insert the events
      // (one binding set per event) and finally the tags, all in the same transaction.
      val projections = entries.flatMap(it => it.projected).map(it => {
        handle.executeQuery(it._1, Seq(it._2), _.getRowsUpdated)
      })
      val events = entries.map(it => Array(it.persistenceId, it.sequenceNr, it.timestamp, it.event,
        it.eventManifest, it.serId, it.serManifest, it.writerUuid))

      Flux.concat(projections.asJava)
        .thenMany(handle.executeQuery(insertEntriesQuery, events, toSeqId(_, "id")))
        .zipWithIterable(entries.flatMap(it => Set(it.tags)).asJava)
        .collectList
        .filter((it: EntryTags) => it.asScala.map(_.getT2).exists(_.nonEmpty))
        .flatMapMany((eventTags: EntryTags) => {
          val tags = eventTags.asScala.flatMap(it => it.getT2.map(Array(it.getT1, _))).toSeq
          handle.executeQuery(insertTagsQuery, tags, _.getRowsUpdated)
        })
    })
    Source.fromPublisher(flux.defaultIfEmpty(0)).map(_.toInt)
  }

  override def fetchEvents(persistenceId: String, fromSeqNr: Long, toSeqNr: Long, max: Long): Source[JournalEntry, NotUsed] = {
    val flux = r2dbc.withHandle { handle =>
      handle.executeQuery(findEventsQuery, Seq(Array(persistenceId, fromSeqNr, toSeqNr, max)), toJournalEntry(_))
    }
    Source.fromPublisher(flux)
  }

  override def deleteEvents(persistenceId: String, toSeqNr: Long): Source[Int, NotUsed] = {
    lenient.deleteEvents(persistenceId, toSeqNr)
  }

  override def readHighestSequenceNr(persistenceId: String, fromSeqNr: Long): Source[Long, NotUsed] = {
    val flux = r2dbc.withHandle { handle =>
      val toSeqNr = (result: Result) => toSeqId(result, "sequence_nr")
      handle.executeQuery(highestSeqNrQuery, Seq(Array(persistenceId, fromSeqNr)), toSeqNr)
    }
    Source.fromPublisher(flux)
  }
}
@@ -82,6 +82,34 @@ final class Handle private(val connection: Connection) {
     Flux.from(this.connection.createStatement(sql).execute).flatMap(fn(_))
   }

+  /**
+   * Executes the given SQL statement with the given bindings, and transforms each [[Result]]
+   * returned from execution.
+   *
+   * @param sql SQL statement
+   * @param bindings positional parameter bindings; each Array is bound as one execution of the statement
+   * @param fn a function used to transform each [[Result]] into a [[Flux]] of values
+   * @tparam T the type of results
+   * @return the values resulting from the [[Result]] transformation
+   * @throws IllegalArgumentException if `sql` or `fn` is `null`
+   */
+  def executeQuery[T](sql: String, bindings: Seq[Array[Any]], fn: Result => _ <: Publisher[T]): Flux[T] = {
+    require(sql != null, SQL_REQUIRED)
+    require(fn != null, FN_REQUIRED)
+
+    println(s"Running query $sql")
+    val statement = this.connection.createStatement(sql)
+    var count = 0
+    bindings.foreach(it => {
+      it.indices.foreach(i => statement.bind(i, it(i)))
+      count += 1
+      if (count < bindings.size) {
+        statement.add()
+      }
+    })
+
+    Flux.from(statement.execute).flatMap(fn(_))
+  }
+
   /**
    * Commits the current transaction.
    *
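A small usage sketch of the new overload, assuming an open `Handle` in scope (for example inside `r2dbc.withHandle`) and the `tag` table used by the journal; the values are illustrative:

// Insert two tag rows with one parameterized statement. Each Array[Any] is one execution
// of the statement: the loop above binds its elements positionally and calls statement.add()
// between binding sets to batch them.
val insertTag = "INSERT INTO tag (id, event_id, tag) VALUES (DEFAULT, $1, $2)"
val bindings = Seq[Array[Any]](
  Array(42L, "payment"),
  Array(42L, "audit")
)
val rowsUpdated: Flux[Integer] = handle.executeQuery(insertTag, bindings, _.getRowsUpdated)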