1
1
package akka .persistence .inmemory .journal
2
2
3
+ import java .util .concurrent .ConcurrentHashMap
4
+
3
5
import akka .actor .ActorLogging
4
- import akka .persistence .{PersistentConfirmation , PersistentId , PersistentRepr }
5
6
import akka .persistence .journal .AsyncWriteJournal
7
+ import akka .persistence .{PersistentConfirmation , PersistentId , PersistentRepr }
6
8
7
- import java .util .concurrent .ConcurrentHashMap
8
9
import scala .collection .JavaConverters ._
9
10
import scala .collection .immutable .Seq
10
11
import scala .concurrent .Future
@@ -13,122 +14,83 @@ class InMemoryJournal extends AsyncWriteJournal with ActorLogging {
13
14
implicit val ec = context.system.dispatcher
14
15
val journal : scala.collection.mutable.Map [String , List [PersistentRepr ]] = new ConcurrentHashMap [String , List [PersistentRepr ]].asScala
15
16
16
- override def asyncWriteMessages (messages : Seq [PersistentRepr ]): Future [Unit ] = {
17
- Future [Unit ] {
18
- val mess = messages
19
- log.debug(" writeMessages for {} persistent messages" , mess.size)
20
- mess.foreach { repr =>
21
- import repr ._
22
- journal.get(persistenceId) match {
23
- case None => journal.put(processorId, List (repr))
24
- case Some (list) => journal.put(processorId, repr :: list)
25
- }
17
+ override def asyncWriteMessages (messages : Seq [PersistentRepr ]): Future [Unit ] = Future {
18
+ log.debug(" writeMessages for {} persistent messages" , messages.size)
19
+ messages.foreach { repr =>
20
+ import repr ._
21
+ val journalForPersistenceId = journal.get(persistenceId)
22
+ if (journalForPersistenceId.isEmpty) {
23
+ journal.put(processorId, List (repr))
24
+ } else {
25
+ journalForPersistenceId foreach (xs => journal.put(processorId, repr :: xs))
26
26
}
27
27
}
28
28
}
29
29
30
- override def asyncDeleteMessagesTo (persistenceId : String , toSequenceNr : Long , permanent : Boolean ): Future [Unit ] = {
31
- Future [Unit ] {
32
- val perm = permanent
33
- val pid = persistenceId
34
- val toSeq = toSequenceNr
35
- log.debug(" asyncDeleteMessagesTo for processorId: {} to sequenceNr: {}, permanent: {}" , pid, toSeq, perm)
36
- perm match {
37
- case true =>
38
- journal.get(pid) match {
39
- case None =>
40
- case Some (list) => journal.put(pid, list.filterNot(_.sequenceNr <= toSeq))
41
- }
42
- case false =>
43
- journal.get(pid) match {
44
- case None =>
45
- case Some (list) => journal.put(pid, list.map { repr =>
46
- if (repr.sequenceNr <= toSeq) repr.update(deleted = true ) else repr
47
- })
48
- }
30
+ override def asyncDeleteMessagesTo (persistenceId : String , toSequenceNr : Long , permanent : Boolean ): Future [Unit ] = Future {
31
+ log.debug(" asyncDeleteMessagesTo for processorId: {} to sequenceNr: {}, permanent: {}" , persistenceId, toSequenceNr, permanent)
32
+ if (permanent) {
33
+ journal.get(persistenceId) foreach { list =>
34
+ journal.put(persistenceId, list.filterNot(_.sequenceNr <= toSequenceNr))
35
+ }
36
+ } else {
37
+ journal.get(persistenceId) foreach { list =>
38
+ journal.put(persistenceId, list.map { repr => if (repr.sequenceNr <= toSequenceNr) repr.update(deleted = true ) else repr })
49
39
}
50
40
}
51
41
}
52
42
53
43
@ scala.deprecated(" writeConfirmations will be removed, since Channels will be removed." )
54
- override def asyncWriteConfirmations (confirmations : Seq [PersistentConfirmation ]): Future [Unit ] = {
55
- Future [Unit ] {
56
- val confirms = confirmations
57
- log.debug(" writeConfirmations for {} messages" , confirms.size)
58
- confirms.foreach { confirmation =>
59
- import confirmation ._
60
- journal.get(persistenceId) match {
61
- case None =>
62
- case Some (list) =>
63
- journal.put(persistenceId, list.map { msg =>
64
- if (msg.sequenceNr == sequenceNr) {
65
- val confirmationIds = msg.confirms :+ confirmation.channelId
66
- msg.update(confirms = confirmationIds)
67
- } else msg
68
- })
69
- }
44
+ override def asyncWriteConfirmations (confirmations : Seq [PersistentConfirmation ]): Future [Unit ] = Future {
45
+ log.debug(" writeConfirmations for {} messages" , confirmations.size)
46
+ confirmations.foreach { confirmation =>
47
+ import confirmation ._
48
+ journal.get(persistenceId).foreach { list =>
49
+ journal.put(persistenceId, list.map { msg =>
50
+ if (msg.sequenceNr == sequenceNr) {
51
+ val confirmationIds = msg.confirms :+ confirmation.channelId
52
+ msg.update(confirms = confirmationIds)
53
+ } else msg
54
+ })
70
55
}
71
56
}
72
57
}
73
58
74
59
@ scala.deprecated(" asyncDeleteMessages will be removed." )
75
- override def asyncDeleteMessages (messageIds : Seq [PersistentId ], permanent : Boolean ): Future [Unit ] = {
76
- Future [Unit ] {
77
- val mids = messageIds
78
- val perm = permanent
79
- log.debug(" Async delete {} messages, permanent: {}" , mids.size, perm)
80
-
81
- mids.foreach { persistentId =>
82
- import persistentId ._
83
- perm match {
84
- case true =>
85
- journal.get(processorId) match {
86
- case None =>
87
- case Some (list) => journal.put(processorId, list.filterNot(_.sequenceNr == sequenceNr))
88
- }
89
- case false =>
90
- journal.get(processorId) match {
91
- case None =>
92
- case Some (list) => journal.put(processorId, list.map { repr =>
93
- if (repr.sequenceNr == sequenceNr) repr.update(deleted = true ) else repr
94
- })
95
- }
60
+ override def asyncDeleteMessages (messageIds : Seq [PersistentId ], permanent : Boolean ): Future [Unit ] = Future {
61
+ log.debug(" Async delete {} messages, permanent: {}" , messageIds.size, permanent)
62
+ messageIds.foreach { persistentId =>
63
+ import persistentId ._
64
+ if (permanent) {
65
+ journal.get(processorId) foreach { list =>
66
+ journal.put(processorId, list.filterNot(_.sequenceNr == sequenceNr))
67
+ }
68
+ } else {
69
+ journal.get(processorId) foreach { list =>
70
+ journal.put(processorId, list.map { repr =>
71
+ if (repr.sequenceNr == sequenceNr) repr.update(deleted = true ) else repr
72
+ })
96
73
}
97
74
}
98
75
}
99
76
}
100
77
101
- override def asyncReadHighestSequenceNr (persistenceId : String , fromSequenceNr : Long ): Future [Long ] = {
102
- Future [Long ] {
103
- val pid = persistenceId
104
- val fromSeq = fromSequenceNr
105
- log.debug(" Async read for highest sequence number for processorId: {} (hint, seek from nr: {})" , pid, fromSeq)
106
- journal.get(pid) match {
107
- case None | Some (Nil ) => 0
108
- case Some (list) => list.map(_.sequenceNr).max
109
- }
78
+ override def asyncReadHighestSequenceNr (persistenceId : String , fromSequenceNr : Long ): Future [Long ] = Future {
79
+ log.debug(" Async read for highest sequence number for processorId: {} (hint, seek from nr: {})" , persistenceId, fromSequenceNr)
80
+ journal.get(persistenceId) match {
81
+ case None | Some (Nil ) => 0
82
+ case Some (list) => list.map(_.sequenceNr).max
110
83
}
111
84
}
112
85
113
- override def asyncReplayMessages (persistenceId : String , fromSequenceNr : Long , toSequenceNr : Long , max : Long )(replayCallback : (PersistentRepr ) => Unit ): Future [Unit ] = {
114
- Future [Unit ] {
115
- val pid = persistenceId
116
- val fromSeq = fromSequenceNr
117
- val toSeq = toSequenceNr
118
- val limit = max
119
- val replay = replayCallback
120
-
121
- log.debug(" Async replay for processorId {}, from sequenceNr: {}, to sequenceNr: {} with max records: {}" , pid, fromSeq, toSeq, limit)
122
-
123
- journal.get(pid) match {
124
- case None =>
125
- case Some (list) =>
126
- val takeMax = if (limit >= java.lang.Integer .MAX_VALUE ) java.lang.Integer .MAX_VALUE else limit.toInt
127
- list.filter { repr =>
128
- repr.sequenceNr >= fromSeq && repr.sequenceNr <= toSeq
129
- }.sortBy(_.sequenceNr)
130
- .take(takeMax).foreach(replay)
131
- }
86
+ override def asyncReplayMessages (persistenceId : String , fromSequenceNr : Long , toSequenceNr : Long , max : Long )(replayCallback : (PersistentRepr ) => Unit ): Future [Unit ] = Future {
87
+ log.debug(" Async replay for processorId {}, from sequenceNr: {}, to sequenceNr: {} with max records: {}" , persistenceId, fromSequenceNr, toSequenceNr, max)
88
+ journal.get(persistenceId) foreach { list =>
89
+ val takeMax = if (max >= java.lang.Integer .MAX_VALUE ) java.lang.Integer .MAX_VALUE else max.toInt
90
+ list.filter { repr =>
91
+ repr.sequenceNr >= fromSequenceNr && repr.sequenceNr <= toSequenceNr
92
+ }.sortBy(_.sequenceNr)
93
+ .take(takeMax).foreach(replayCallback)
132
94
}
133
95
}
134
96
}
0 commit comments