@@ -21,12 +21,12 @@ import me.snoty.integration.common.diff.EntityStateService
21
21
import me.snoty.integration.common.diff.STATE_CODEC_REGISTRY
22
22
import me.snoty.integration.common.diff.checksum
23
23
import me.snoty.integration.common.diff.state.EntityState
24
- import me.snoty.integration.common.diff.state.NodeEntityStates
25
24
import me.snoty.integration.common.wiring.Node
26
25
import me.snoty.integration.common.wiring.flow.NodeDeletedHook
27
26
import me.snoty.integration.common.wiring.node.NodeDescriptor
28
27
import org.bson.Document
29
28
import org.bson.codecs.configuration.CodecRegistry
29
+ import org.bson.codecs.pojo.annotations.BsonId
30
30
import org.koin.core.annotation.Factory
31
31
import org.koin.core.annotation.Named
32
32
@@ -37,19 +37,19 @@ class MongoEntityStateService(
37
37
hookRegistry : HookRegistry ,
38
38
@Named(STATE_CODEC_REGISTRY ) codecRegistry : CodecRegistry ,
39
39
) : EntityStateService {
40
- private val nodeEntityStates = mongoDB.getCollection<NodeEntityStates >(" ${integration.mongoCollectionPrefix} :entityStates" )
40
+ private val nodeEntityStates = mongoDB.getCollection<MongoNodeEntityStates >(" ${integration.mongoCollectionPrefix} :entityStates" )
41
41
.withCodecRegistry(codecRegistry)
42
42
43
43
override suspend fun getLastState (nodeId : NodeId , entityId : String ): EntityState ? =
44
44
nodeEntityStates.aggregate<EntityState >(
45
- Aggregates .match(Filters .eq(NodeEntityStates ::nodeId .name, nodeId)),
46
- Aggregates .unwind(NodeEntityStates ::entities.mongoField),
47
- Aggregates .match(Filters .eq(" ${NodeEntityStates ::entities.name} .${EntityState ::id.name} " , entityId)),
48
- Aggregates .replaceRoot(NodeEntityStates ::entities.mongoField)
45
+ Aggregates .match(Filters .eq(MongoNodeEntityStates :: _id .name, nodeId)),
46
+ Aggregates .unwind(MongoNodeEntityStates ::entities.mongoField),
47
+ Aggregates .match(Filters .eq(" ${MongoNodeEntityStates ::entities.name} .${EntityState ::id.name} " , entityId)),
48
+ Aggregates .replaceRoot(MongoNodeEntityStates ::entities.mongoField)
49
49
).firstOrNull()
50
50
51
51
override fun getLastStates (nodeId : NodeId ): Flow <EntityState > =
52
- nodeEntityStates.find(Filters .eq(NodeEntityStates ::nodeId .name, nodeId))
52
+ nodeEntityStates.find(Filters .eq(MongoNodeEntityStates :: _id .name, nodeId))
53
53
.flatMapMerge { it.entities.asFlow() }
54
54
55
55
override suspend fun updateState (nodeId : NodeId , state : Document , diff : DiffResult ) {
@@ -58,15 +58,15 @@ class MongoEntityStateService(
58
58
val entityState = EntityState (id, state, state.checksum())
59
59
60
60
nodeEntityStates.upsertOne(
61
- Filters .eq(NodeEntityStates ::nodeId .name, nodeId),
62
- Updates .addToSet(NodeEntityStates ::entities.name, entityState)
61
+ Filters .eq(MongoNodeEntityStates :: _id .name, nodeId),
62
+ Updates .addToSet(MongoNodeEntityStates ::entities.name, entityState)
63
63
)
64
64
}
65
65
66
66
suspend fun pull () {
67
67
nodeEntityStates.updateOne(
68
- Filters .eq(NodeEntityStates ::nodeId .name, nodeId),
69
- Updates .pull(NodeEntityStates ::entities.name, Filters .eq(EntityState ::id.name, id))
68
+ Filters .eq(MongoNodeEntityStates :: _id .name, nodeId),
69
+ Updates .pull(MongoNodeEntityStates ::entities.name, Filters .eq(EntityState ::id.name, id))
70
70
)
71
71
}
72
72
@@ -83,8 +83,8 @@ class MongoEntityStateService(
83
83
84
84
override suspend fun updateStates (nodeId : NodeId , states : Collection <EntityStateService .EntityStateUpdate >) {
85
85
nodeEntityStates.upsertOne(
86
- Filters .eq(NodeEntityStates ::nodeId .name, nodeId),
87
- Updates .set(NodeEntityStates ::entities.name, states.map { it.state })
86
+ Filters .eq(MongoNodeEntityStates :: _id .name, nodeId),
87
+ Updates .set(MongoNodeEntityStates ::entities.name, states.map { it.state })
88
88
)
89
89
}
90
90
@@ -95,6 +95,12 @@ class MongoEntityStateService(
95
95
}
96
96
97
97
override suspend fun delete (node : Node ) {
98
- nodeEntityStates.deleteOne(Filters .eq(NodeEntityStates ::nodeId .name, node._id ))
98
+ nodeEntityStates.deleteOne(Filters .eq(MongoNodeEntityStates :: _id .name, node._id ))
99
99
}
100
100
}
101
+
102
+ private data class MongoNodeEntityStates (
103
+ @BsonId
104
+ val _id : String ,
105
+ val entities : Set <EntityState >
106
+ )
0 commit comments