Skip to content

Commit e883040

Browse files
author
Matteo Di Pirro
committed
Fix compilation warnings for Scala 3 (#15)
1 parent 8eae247 commit e883040

24 files changed

+680
-311
lines changed

src/main/scala/io/github/alstanchev/pekko/persistence/inmemory/extension/InMemoryJournalStorage.scala src/main/scala-2/io/github/alstanchev/pekko/persistence/inmemory/extension/InMemoryJournalStorage.scala

+4-6
Original file line numberDiff line numberDiff line change
@@ -16,19 +16,17 @@
1616

1717
package io.github.alstanchev.pekko.persistence.inmemory.extension
1818

19+
import io.github.alstanchev.pekko.persistence.inmemory.util.UUIDs
20+
import io.github.alstanchev.pekko.persistence.inmemory.{ JournalEntry, mapSeqToVector }
1921
import org.apache.pekko.actor.{ Actor, ActorLogging, ActorRef, NoSerializationVerificationNeeded }
2022
import org.apache.pekko.event.LoggingReceive
2123
import org.apache.pekko.persistence.PersistentRepr
22-
import io.github.alstanchev.pekko.persistence.inmemory.JournalEntry
23-
import io.github.alstanchev.pekko.persistence.inmemory.util.UUIDs
2424
import org.apache.pekko.persistence.query.{ NoOffset, Offset, Sequence, TimeBasedUUID }
2525
import org.apache.pekko.serialization.Serialization
26-
27-
import scala.collection.immutable._
28-
import scalaz.syntax.semigroup._
2926
import scalaz.std.AllInstances._
27+
import scalaz.syntax.semigroup._
3028

31-
import io.github.alstanchev.pekko.persistence.inmemory.mapSeqToVector
29+
import scala.collection.immutable._
3230

3331
object InMemoryJournalStorage {
3432
sealed trait JournalCommand extends NoSerializationVerificationNeeded

src/main/scala/io/github/alstanchev/pekko/persistence/inmemory/extension/InMemorySnapshotStorage.scala src/main/scala-2/io/github/alstanchev/pekko/persistence/inmemory/extension/InMemorySnapshotStorage.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@
1616

1717
package io.github.alstanchev.pekko.persistence.inmemory.extension
1818

19-
import org.apache.pekko.actor.{ Actor, ActorLogging, ActorRef }
2019
import io.github.alstanchev.pekko.persistence.inmemory.SnapshotEntry
20+
import org.apache.pekko.actor.{ Actor, ActorLogging, ActorRef }
2121
import scalaz.std.AllInstances._
2222
import scalaz.syntax.all._
2323

src/main/scala/io/github/alstanchev/pekko/persistence/inmemory/extension/StorageExtensionByProperty.scala src/main/scala-2/io/github/alstanchev/pekko/persistence/inmemory/extension/StorageExtensionByProperty.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,9 @@
1616

1717
package io.github.alstanchev.pekko.persistence.inmemory.extension
1818

19+
import com.typesafe.config.Config
1920
import org.apache.pekko.actor._
2021
import org.apache.pekko.serialization.SerializationExtension
21-
import com.typesafe.config.Config
2222

2323
import scala.collection.JavaConverters._
2424

src/main/scala/io/github/alstanchev/pekko/persistence/inmemory/package.scala src/main/scala-2/io/github/alstanchev/pekko/persistence/inmemory/package.scala

+2-4
Original file line numberDiff line numberDiff line change
@@ -18,17 +18,15 @@ package io.github.alstanchev.pekko.persistence
1818

1919
import io.github.alstanchev.pekko.persistence.inmemory.util.UUIDs
2020
import org.apache.pekko.persistence.PersistentRepr
21-
22-
import java.util.UUID
2321
import org.apache.pekko.persistence.query.TimeBasedUUID
2422

23+
import java.util.UUID
2524
import scala.collection.immutable._
26-
import scala.compat.Platform
2725

2826
package object inmemory {
2927
type Seq[A] = scala.collection.immutable.Seq[A]
3028

31-
def now: Long = Platform.currentTime
29+
def now: Long = java.lang.System.currentTimeMillis()
3230
def nowUuid: UUID = UUIDs.timeBased()
3331
def getTimeBasedUUID: TimeBasedUUID = TimeBasedUUID(nowUuid)
3432

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,162 @@
1+
package io.github.alstanchev.pekko.persistence.inmemory.extension
2+
3+
/*
4+
* Copyright 2016 Dennis Vriend
5+
*
6+
* Licensed under the Apache License, Version 2.0 (the "License");
7+
* you may not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
import org.apache.pekko.actor.{ Actor, ActorLogging, ActorRef, NoSerializationVerificationNeeded }
20+
import org.apache.pekko.event.LoggingReceive
21+
import org.apache.pekko.persistence.PersistentRepr
22+
import io.github.alstanchev.pekko.persistence.inmemory.JournalEntry
23+
import io.github.alstanchev.pekko.persistence.inmemory.util.UUIDs
24+
import org.apache.pekko.persistence.query.{ NoOffset, Offset, Sequence, TimeBasedUUID }
25+
import org.apache.pekko.serialization.Serialization
26+
27+
import scala.collection.immutable._
28+
import scalaz.syntax.semigroup._
29+
import scalaz.std.AllInstances._
30+
31+
import io.github.alstanchev.pekko.persistence.inmemory.mapSeqToVector
32+
33+
object InMemoryJournalStorage {
34+
sealed trait JournalCommand extends NoSerializationVerificationNeeded
35+
case object PersistenceIds extends JournalCommand
36+
final case class HighestSequenceNr(persistenceId: String, fromSequenceNr: Long) extends JournalCommand
37+
final case class EventsByTag(tag: String, offset: Offset) extends JournalCommand
38+
final case class PersistenceIds(queryListOfPersistenceIds: Seq[String]) extends JournalCommand
39+
final case class WriteList(xs: Seq[JournalEntry]) extends JournalCommand
40+
final case class Delete(persistenceId: String, toSequenceNr: Long) extends JournalCommand
41+
final case class GetJournalEntriesExceptDeleted(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long, max: Long) extends JournalCommand
42+
final case class GetAllJournalEntries(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long, max: Long) extends JournalCommand
43+
44+
/**
45+
* Java API
46+
*/
47+
def clearJournal(): ClearJournal = ClearJournal
48+
49+
sealed abstract class ClearJournal
50+
case object ClearJournal extends ClearJournal with JournalCommand
51+
52+
def getPersistenceId(prod: (String, Vector[JournalEntry])): String = prod._1
53+
def getEntries(prod: (String, Vector[JournalEntry])): Vector[JournalEntry] = prod._2
54+
def getEventsByPid(pid: String, journal: Map[String, Vector[JournalEntry]]): Option[Vector[JournalEntry]] =
55+
journal.find(_._1 == pid).map(_._2)
56+
def getAllEvents(journal: Map[String, Vector[JournalEntry]]): Vector[JournalEntry] =
57+
journal.values.flatten[JournalEntry].toVector
58+
def getMaxSequenceNr(xs: Vector[JournalEntry]): Long = xs.map(_.sequenceNr).max
59+
}
60+
61+
class InMemoryJournalStorage(serialization: Serialization) extends Actor with ActorLogging {
62+
import InMemoryJournalStorage._
63+
64+
var ordering: Long = 0L
65+
66+
def incrementAndGet: Long = {
67+
ordering += 1
68+
ordering
69+
}
70+
71+
var journal = Map.empty[String, Vector[JournalEntry]]
72+
73+
def allPersistenceIds(ref: ActorRef): Unit =
74+
ref ! org.apache.pekko.actor.Status.Success(journal.keySet)
75+
76+
def highestSequenceNr(ref: ActorRef, persistenceId: String, fromSequenceNr: Long): Unit = {
77+
val highestSequenceNrJournal = getEventsByPid(persistenceId, journal).map(getMaxSequenceNr).getOrElse(0L)
78+
ref ! org.apache.pekko.actor.Status.Success(highestSequenceNrJournal)
79+
}
80+
81+
def eventsByTag(ref: ActorRef, tag: String, offset: Offset): Unit = {
82+
def increment(offset: Long): Long = offset + 1
83+
def getByOffset(p: JournalEntry => Boolean): List[JournalEntry] = {
84+
val xs = getAllEvents(journal)
85+
.filter(_.tags.contains(tag)).toList
86+
.sortBy(_.ordering)
87+
.zipWithIndex.map {
88+
case (entry, index) =>
89+
entry.copy(offset = Option(increment(index)))
90+
}
91+
92+
xs.filter(p)
93+
}
94+
95+
val xs: List[JournalEntry] = offset match {
96+
case NoOffset => getByOffset(_.offset.exists(_ >= 0L))
97+
case Sequence(value) => getByOffset(_.offset.exists(_ > value))
98+
case value: TimeBasedUUID => getByOffset(p => UUIDs.TimeBasedUUIDOrdering.gt(p.timestamp, value))
99+
}
100+
101+
ref ! org.apache.pekko.actor.Status.Success(xs)
102+
}
103+
104+
def writelist(ref: ActorRef, xs: Seq[JournalEntry]): Unit = {
105+
val ys = xs.map(_.copy(ordering = incrementAndGet)).groupBy(_.persistenceId)
106+
journal = journal |+| ys
107+
108+
ref ! org.apache.pekko.actor.Status.Success(())
109+
}
110+
111+
def delete(ref: ActorRef, persistenceId: String, toSequenceNr: Long): Unit = {
112+
val pidEntries = journal.filter(_._1 == persistenceId)
113+
val notDeleted = pidEntries.view.mapValues(_.filterNot(_.sequenceNr <= toSequenceNr))
114+
115+
val deleted = pidEntries
116+
.view.mapValues(_.filter(_.sequenceNr <= toSequenceNr).map { journalEntry =>
117+
val updatedRepr: PersistentRepr = journalEntry.repr.update(deleted = true)
118+
val byteArray: Array[Byte] = serialization.serialize(updatedRepr) match {
119+
case scala.util.Success(arr) => arr
120+
case scala.util.Failure(cause) => throw cause
121+
}
122+
journalEntry.copy(deleted = true).copy(serialized = byteArray).copy(repr = updatedRepr)
123+
})
124+
125+
journal = journal.filterNot(_._1 == persistenceId) |+| deleted.toMap |+| notDeleted.toMap
126+
127+
ref ! org.apache.pekko.actor.Status.Success("")
128+
}
129+
130+
def messages(ref: ActorRef, persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long, max: Long, all: Boolean): Unit = {
131+
def toTake = if (max >= Int.MaxValue) Int.MaxValue else max.toInt
132+
val pidEntries: Map[String, Vector[JournalEntry]] = journal.filter(_._1 == persistenceId)
133+
val xs: List[JournalEntry] = pidEntries.flatMap(_._2)
134+
.filter(_.sequenceNr >= fromSequenceNr)
135+
.filter(_.sequenceNr <= toSequenceNr)
136+
.toList.sortBy(_.sequenceNr)
137+
138+
val ys = if (all) xs else xs.filterNot(_.deleted)
139+
140+
val zs = ys.take(toTake)
141+
142+
ref ! org.apache.pekko.actor.Status.Success(zs)
143+
}
144+
145+
def clear(ref: ActorRef): Unit = {
146+
ordering = 0L
147+
journal = Map.empty[String, Vector[JournalEntry]]
148+
149+
ref ! org.apache.pekko.actor.Status.Success("")
150+
}
151+
152+
override def receive: Receive = LoggingReceive {
153+
case PersistenceIds => allPersistenceIds(sender())
154+
case HighestSequenceNr(persistenceId, fromSequenceNr) => highestSequenceNr(sender(), persistenceId, fromSequenceNr)
155+
case EventsByTag(tag, offset) => eventsByTag(sender(), tag, offset)
156+
case WriteList(xs) => writelist(sender(), xs)
157+
case Delete(persistenceId, toSequenceNr) => delete(sender(), persistenceId, toSequenceNr)
158+
case GetJournalEntriesExceptDeleted(persistenceId, fromSequenceNr, toSequenceNr, max) => messages(sender(), persistenceId, fromSequenceNr, toSequenceNr, max, all = false)
159+
case GetAllJournalEntries(persistenceId, fromSequenceNr, toSequenceNr, max) => messages(sender(), persistenceId, fromSequenceNr, toSequenceNr, max, all = true)
160+
case ClearJournal => clear(sender())
161+
}
162+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
package io.github.alstanchev.pekko.persistence.inmemory.extension
2+
3+
/*
4+
* Copyright 2016 Dennis Vriend
5+
*
6+
* Licensed under the Apache License, Version 2.0 (the "License");
7+
* you may not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
import org.apache.pekko.actor.{ Actor, ActorLogging, ActorRef }
20+
import io.github.alstanchev.pekko.persistence.inmemory.SnapshotEntry
21+
import scalaz.std.AllInstances._
22+
import scalaz.syntax.all._
23+
24+
object InMemorySnapshotStorage {
25+
sealed trait SnapshotCommand
26+
final case class Delete(persistenceId: String, sequenceNr: Long) extends SnapshotCommand
27+
final case class DeleteAllSnapshots(persistenceId: String) extends SnapshotCommand
28+
final case class DeleteUpToMaxSequenceNr(persistenceId: String, maxSequenceNr: Long) extends SnapshotCommand
29+
final case class DeleteUpToMaxTimestamp(persistenceId: String, maxTimestamp: Long) extends SnapshotCommand
30+
final case class DeleteUpToMaxSequenceNrAndMaxTimestamp(persistenceId: String, maxSequenceNr: Long, maxTimestamp: Long) extends SnapshotCommand
31+
final case class Save(persistenceId: String, sequenceNr: Long, timestamp: Long, snapshot: Array[Byte]) extends SnapshotCommand
32+
final case class SnapshotForMaxSequenceNr(persistenceId: String, sequenceNr: Long) extends SnapshotCommand
33+
final case class SnapshotForMaxSequenceNrAndMaxTimestamp(persistenceId: String, sequenceNr: Long, timestamp: Long) extends SnapshotCommand
34+
final case class SnapshotForMaxTimestamp(persistenceId: String, timestamp: Long) extends SnapshotCommand
35+
36+
sealed abstract class ClearSnapshots
37+
case object ClearSnapshots extends ClearSnapshots with SnapshotCommand
38+
39+
/**
40+
* Java API
41+
*/
42+
def clearSnapshots(): ClearSnapshots = ClearSnapshots
43+
}
44+
45+
class InMemorySnapshotStorage extends Actor with ActorLogging {
46+
import InMemorySnapshotStorage._
47+
48+
var snapshot = Map.empty[String, Vector[SnapshotEntry]]
49+
50+
def clear(ref: ActorRef): Unit = {
51+
snapshot = Map.empty[String, Vector[SnapshotEntry]]
52+
ref ! org.apache.pekko.actor.Status.Success("")
53+
}
54+
55+
def delete(persistenceId: String, predicate: SnapshotEntry => Boolean): Unit = {
56+
val pidEntries = snapshot.filter(_._1 == persistenceId)
57+
val notDeleted = pidEntries.view.mapValues(_.filterNot(predicate))
58+
snapshot = snapshot.filterNot(_._1 == persistenceId) |+| notDeleted.toMap
59+
}
60+
61+
def delete(ref: ActorRef, persistenceId: String, sequenceNr: Long): Unit = {
62+
delete(persistenceId, _.sequenceNumber == sequenceNr)
63+
64+
ref ! org.apache.pekko.actor.Status.Success("")
65+
}
66+
67+
def deleteUpToMaxSequenceNr(ref: ActorRef, persistenceId: String, maxSequenceNr: Long): Unit = {
68+
delete(persistenceId, _.sequenceNumber <= maxSequenceNr)
69+
70+
ref ! org.apache.pekko.actor.Status.Success("")
71+
}
72+
73+
def deleteAllSnapshots(ref: ActorRef, persistenceId: String): Unit = {
74+
snapshot -= persistenceId
75+
76+
ref ! org.apache.pekko.actor.Status.Success("")
77+
}
78+
79+
def deleteUpToMaxTimestamp(ref: ActorRef, persistenceId: String, maxTimestamp: Long): Unit = {
80+
val deleteUpToMaxTimestampPredicate: SnapshotEntry => Boolean =
81+
(entry: SnapshotEntry) => entry.created <= maxTimestamp
82+
83+
delete(persistenceId, deleteUpToMaxTimestampPredicate)
84+
85+
ref ! org.apache.pekko.actor.Status.Success("")
86+
}
87+
88+
def deleteUpToMaxSequenceNrAndMaxTimestamp(ref: ActorRef, persistenceId: String, maxSequenceNr: Long, maxTimestamp: Long): Unit = {
89+
delete(persistenceId, (x: SnapshotEntry) => x.sequenceNumber <= maxSequenceNr && x.created <= maxTimestamp)
90+
91+
ref ! org.apache.pekko.actor.Status.Success("")
92+
}
93+
94+
def save(ref: ActorRef, persistenceId: String, sequenceNr: Long, timestamp: Long, data: Array[Byte]): Unit = {
95+
val key = persistenceId
96+
snapshot = snapshot |+| Map(key -> Vector(SnapshotEntry(persistenceId, sequenceNr, timestamp, data)))
97+
98+
ref ! org.apache.pekko.actor.Status.Success("")
99+
}
100+
101+
def snapshotFor(ref: ActorRef, persistenceId: String)(p: SnapshotEntry => Boolean): Unit = {
102+
val determine: Option[SnapshotEntry] = snapshot.get(persistenceId).flatMap(_.find(p))
103+
104+
ref ! org.apache.pekko.actor.Status.Success(determine)
105+
}
106+
107+
def snapshotForMaxSequenceNr(ref: ActorRef, persistenceId: String, sequenceNr: Long): Unit = {
108+
val determine = snapshot.get(persistenceId).flatMap(xs => xs.filter(_.sequenceNumber <= sequenceNr).toList.sortBy(_.sequenceNumber).reverse.headOption)
109+
ref ! org.apache.pekko.actor.Status.Success(determine)
110+
}
111+
112+
def snapshotForMaxSequenceNrAndMaxTimestamp(ref: ActorRef, persistenceId: String, sequenceNr: Long, timestamp: Long): Unit = {
113+
val snapshotForMaxSequenceNrAndMaxTimestampPredicate: SnapshotEntry => Boolean =
114+
(entry: SnapshotEntry) => entry.sequenceNumber == sequenceNr && entry.created <= timestamp
115+
snapshotFor(ref, persistenceId)(snapshotForMaxSequenceNrAndMaxTimestampPredicate)
116+
}
117+
118+
def snapshotForMaxTimestamp(ref: ActorRef, persistenceId: String, timestamp: Long): Unit = {
119+
val snapshotForMaxTimestampPredicate: SnapshotEntry => Boolean =
120+
(entry: SnapshotEntry) => entry.created < timestamp
121+
snapshotFor(ref, persistenceId)(snapshotForMaxTimestampPredicate)
122+
}
123+
124+
override def receive: Receive = {
125+
case Delete(persistenceId: String, sequenceNr: Long) => delete(sender(), persistenceId, sequenceNr)
126+
case DeleteAllSnapshots(persistenceId: String) => deleteAllSnapshots(sender(), persistenceId)
127+
case DeleteUpToMaxTimestamp(persistenceId: String, maxTimestamp: Long) => deleteUpToMaxTimestamp(sender(), persistenceId, maxTimestamp)
128+
case DeleteUpToMaxSequenceNr(persistenceId: String, maxSequenceNr: Long) => deleteUpToMaxSequenceNr(sender(), persistenceId, maxSequenceNr)
129+
case DeleteUpToMaxSequenceNrAndMaxTimestamp(persistenceId: String, maxSequenceNr: Long, maxTimestamp: Long) => deleteUpToMaxSequenceNrAndMaxTimestamp(sender(), persistenceId, maxSequenceNr, maxTimestamp)
130+
case ClearSnapshots => clear(sender())
131+
case Save(persistenceId: String, sequenceNr: Long, timestamp: Long, snapshot: Array[Byte]) => save(sender(), persistenceId, sequenceNr, timestamp, snapshot)
132+
case SnapshotForMaxSequenceNr(persistenceId: String, sequenceNr: Long) => snapshotForMaxSequenceNr(sender(), persistenceId, sequenceNr)
133+
case SnapshotForMaxSequenceNrAndMaxTimestamp(persistenceId: String, sequenceNr: Long, timestamp: Long) => snapshotForMaxSequenceNrAndMaxTimestamp(sender(), persistenceId, sequenceNr, timestamp)
134+
case SnapshotForMaxTimestamp(persistenceId: String, timestamp: Long) => snapshotForMaxTimestamp(sender(), persistenceId, timestamp)
135+
}
136+
}

0 commit comments

Comments
 (0)