-
Notifications
You must be signed in to change notification settings - Fork 13
/
Copy pathConcurrentObjectMap.scala
56 lines (50 loc) · 1.21 KB
/
ConcurrentObjectMap.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
//
// Copyright 2022- IBM Inc. All rights reserved
// SPDX-License-Identifier: Apache 2.0
//
package org.apache.spark.shuffle
import scala.collection.concurrent.TrieMap
import scala.util.Try
class ConcurrentObjectMap[K, V] {
private val lock = new Object()
private val valueLocks = new TrieMap[K, Object]()
private val map = new TrieMap[K, V]()
def clear(): Unit = {
lock.synchronized {
map.clear()
}
}
def getOrElsePut(key: K, op: K => V): V = {
val l = valueLocks
.get(key)
.getOrElse({
lock.synchronized {
valueLocks.getOrElseUpdate(
key, {
new Object()
}
)
}
})
l.synchronized {
return map.getOrElseUpdate(key, op(key))
}
}
def remove(filter: (K) => Boolean, action: Option[(V) => Unit]): Unit = {
lock.synchronized {
val keys = valueLocks.filterKeys(filter)
keys.foreach(v => {
val key = v._1
val l = v._2
l.synchronized {
valueLocks.remove(key)
val obj = map.get(key)
if (obj.isDefined && action.isDefined) {
Try(action.get(obj.get))
}
}
map.remove(key)
})
}
}
}