Skip to content

Commit 90d2523

Browse files
author
kawada_yusuke
committed
some
1 parent e48ac49 commit 90d2523

File tree

6 files changed

+98
-1
lines changed

6 files changed

+98
-1
lines changed

event/add-relation-ev.go

+2
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ func (ev *AddRelationEv) ExecStreamPayloads(c context.Context) ([]*gen.StreamPay
6464
return nil, err
6565
}
6666

67+
//TODO(taco) use helper
6768
return []*gen.StreamPayload{
6869
{
6970
Operation: gen.StreamPayload_NEW,
@@ -87,6 +88,7 @@ func (ev *AddRelationEv) Undo(c context.Context) error {
8788
}
8889

8990
func (ev *AddRelationEv) UndoStreamPayloads(c context.Context) ([]*gen.StreamPayload, error) {
91+
//TODO(taco) use helper
9092
return []*gen.StreamPayload{
9193
{
9294
Operation: gen.StreamPayload_DELETE,

event/dao/entity.go

+28
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55

66
"github.com/ajainc/chain/ctx/docdb"
7+
"github.com/ajainc/chain/ctx/logger"
78
"github.com/ajainc/chain/grpc/gen"
89
"github.com/fatih/structs"
910
"github.com/mitchellh/mapstructure"
@@ -63,6 +64,33 @@ func GetEntityByObjectID(c context.Context, objectID string) (int, gen.Entity, e
6364
return id, *entity, nil
6465
}
6566

67+
//GetAllEntities
68+
func GetAllEntities(c context.Context) ([]*gen.Entity, error) {
69+
db := docdb.FromContext(c)
70+
71+
entities := db.Use(docdb.COLL_ENTITY)
72+
var result []*gen.Entity
73+
entities.ForEachDoc(func(id int, docContent []byte) bool {
74+
doc, err := entities.Read(id)
75+
if err != nil {
76+
logger.Error(c, err)
77+
return false
78+
}
79+
80+
entity := new(gen.Entity)
81+
err = UnmarshalEntity(doc, entity)
82+
if err != nil {
83+
logger.Error(c, err)
84+
return false
85+
}
86+
result = append(result, entity)
87+
88+
return true
89+
})
90+
91+
return result, nil
92+
}
93+
6694
func UnmarshalEntity(src map[string]interface{}, dst *gen.Entity) error {
6795
return mapstructure.Decode(src, dst)
6896
}

event/dao/relation.go

+28
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55

66
"github.com/ajainc/chain/ctx/docdb"
7+
"github.com/ajainc/chain/ctx/logger"
78
"github.com/ajainc/chain/grpc/gen"
89
"github.com/fatih/structs"
910
"github.com/mitchellh/mapstructure"
@@ -44,6 +45,33 @@ func GetRelationByObjectID(c context.Context, objectID string) (int, gen.Rel, er
4445
return id, *rel, nil
4546
}
4647

48+
//GetAllRelations
49+
func GetAllRelations(c context.Context) ([]*gen.Rel, error) {
50+
db := docdb.FromContext(c)
51+
52+
rels := db.Use(docdb.COLL_REL)
53+
var result []*gen.Rel
54+
rels.ForEachDoc(func(id int, docContent []byte) bool {
55+
doc, err := rels.Read(id)
56+
if err != nil {
57+
logger.Error(c, err)
58+
return false
59+
}
60+
61+
entity := new(gen.Rel)
62+
err = UnmarshalRelation(doc, entity)
63+
if err != nil {
64+
logger.Error(c, err)
65+
return false
66+
}
67+
result = append(result, entity)
68+
69+
return true
70+
})
71+
72+
return result, nil
73+
}
74+
4775
func UnmarshalRelation(src map[string]interface{}, dst *gen.Rel) error {
4876
return mapstructure.Decode(src, dst)
4977
}

event/executor.go

-1
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,6 @@ func Undo(c context.Context, ev Event, streamBroadcastCh chan *gen.StreamPayload
5151
streamPayloads, _ := ev.UndoStreamPayloads(c)
5252
//TODO(taco) retry if error?
5353
go func() {
54-
5554
logger.Debugf(c, "stream out undo result %#v", streamPayloads)
5655
for _, paylaod := range streamPayloads {
5756
streamBroadcastCh <- paylaod

grpc/gen/stream-helper.go

+19
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package gen
2+
3+
func NewEntitySteamPayload(ope StreamPayload_Operation, entity *Entity) *StreamPayload {
4+
return &StreamPayload{
5+
Operation: ope,
6+
Object: &StreamPayload_Entity{
7+
entity,
8+
},
9+
}
10+
}
11+
12+
func NewRelationSteamPayload(ope StreamPayload_Operation, rel *Rel) *StreamPayload {
13+
return &StreamPayload{
14+
Operation: ope,
15+
Object: &StreamPayload_Rel{
16+
rel,
17+
},
18+
}
19+
}

grpc/server/stream-server.go

+21
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"sync"
77

88
"github.com/ajainc/chain/ctx/logger"
9+
"github.com/ajainc/chain/event/dao"
910
"github.com/ajainc/chain/grpc/gen"
1011
)
1112

@@ -81,6 +82,26 @@ func (server *StreamServer) Connect(req *gen.StreamConnectReq, stream gen.Stream
8182
return err
8283
}
8384
server.addListener(newListener)
85+
86+
// load all data
87+
//TODO (taco) move to apropriate place
88+
go func() {
89+
entities, err := dao.GetAllEntities(server.appCtx)
90+
if err != nil {
91+
logger.Errorf(server.appCtx, "failed to get all entities before send to stream")
92+
}
93+
for _, entity := range entities {
94+
newListener.Send(gen.NewEntitySteamPayload(gen.StreamPayload_NEW, entity))
95+
}
96+
97+
rels, err := dao.GetAllRelations(server.appCtx)
98+
99+
for _, rel := range rels {
100+
newListener.Send(gen.NewRelationSteamPayload(gen.StreamPayload_NEW, rel))
101+
}
102+
103+
}()
104+
84105
err = newListener.Listen() // wait
85106
if err != nil {
86107
logger.Warnf(server.appCtx, err.Error())

0 commit comments

Comments
 (0)