Skip to content

Commit af79f6d

Browse files
committed
some
1 parent 8e84b9c commit af79f6d

File tree

8 files changed

+59
-24
lines changed

8 files changed

+59
-24
lines changed

Diff for: cmd/chain/serve.go

+29-12
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package main
33
import (
44
"context"
55
"fmt"
6+
"net"
67
"net/http"
78
"os"
89
"os/signal"
@@ -15,6 +16,7 @@ import (
1516
"github.com/ajainc/chain/ctx/logger"
1617
chaingrpc "github.com/ajainc/chain/grpc"
1718
"github.com/ajainc/chain/grpc/gen"
19+
"github.com/improbable-eng/grpc-web/go/grpcweb"
1820
)
1921

2022
//TODO (tacogips) implement commands with option
@@ -28,24 +30,39 @@ func main() {
2830
// setup grpc server
2931
//TODO(taco) move broadcast channel size to config
3032
streamBroadcastCh := make(chan *gen.StreamPayload, 300)
31-
grpcwebServer, err := chaingrpc.Setup(c, streamBroadcastCh)
33+
grpcServer, err := chaingrpc.Setup(c, streamBroadcastCh)
34+
3235
if err != nil {
3336
panic(err)
3437
}
3538

36-
handler := func(resp http.ResponseWriter, req *http.Request) {
37-
grpcwebServer.ServeHttp(resp, req)
38-
}
39+
grpcWeb := false
40+
if grpcWeb {
41+
grpcwebServer := grpcweb.WrapServer(grpcServer)
3942

40-
grpcHttpServer := http.Server{
41-
Addr: fmt.Sprintf(":%d", port),
42-
Handler: http.HandlerFunc(handler),
43-
}
43+
handler := func(resp http.ResponseWriter, req *http.Request) {
44+
grpcwebServer.ServeHttp(resp, req)
45+
}
46+
grpcHttpServer := http.Server{
47+
Addr: fmt.Sprintf(":%d", port),
48+
Handler: http.HandlerFunc(handler),
49+
}
4450

45-
go func() {
46-
logger.Infof(c, "server started. listening on :%d", port)
47-
grpcHttpServer.ListenAndServe() // TODO(tacogips): use HTTP/2
48-
}()
51+
go func() {
52+
logger.Infof(c, "grpc web server started. listening on :%d", port)
53+
grpcHttpServer.ListenAndServe() // TODO(tacogips): use HTTP/2
54+
}()
55+
56+
} else {
57+
listener, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
58+
if err != nil {
59+
panic(err)
60+
}
61+
go func() {
62+
logger.Infof(c, "grpc server started. listening on :%d", port)
63+
grpcServer.Serve(listener)
64+
}()
65+
}
4966

5067
waitShutdownDone := make(chan struct{})
5168
sigs := make(chan os.Signal, 1)

Diff for: cmd/chaincli/mock.go

+2-3
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,11 @@ import (
1212
)
1313

1414
func main() {
15-
conn, err := grpc.Dial(":50051")
15+
conn, err := grpc.Dial(":50051", grpc.WithInsecure())
1616

1717
if err != nil {
1818
panic(err)
1919
}
20-
2120
defer conn.Close()
2221

2322
entityClient := gen.NewEntityServiceClient(conn)
@@ -49,7 +48,7 @@ func main() {
4948
if err != nil {
5049
log.Error(err)
5150
}
52-
fmt.Printf("%#v", activity)
51+
fmt.Printf("%#v\n", activity)
5352
}
5453
}
5554

Diff for: cmd/chainstrmcli/stream-client.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import (
1010
)
1111

1212
func main() {
13-
conn, err := grpc.Dial(":50051")
13+
conn, err := grpc.Dial(":50051", grpc.WithInsecure())
1414

1515
if err != nil {
1616
panic(err)

Diff for: event/create-entity-ev.go

+10-2
Original file line numberDiff line numberDiff line change
@@ -8,24 +8,32 @@ import (
88
"github.com/ajainc/chain/ctx/docdb"
99
"github.com/ajainc/chain/event/dao"
1010
"github.com/ajainc/chain/grpc/gen"
11+
uuid "github.com/satori/go.uuid"
1112
)
1213

1314
// CeateEntityEv stands for Creating New Entity at specified coordinates
1415
func NewCeateEntityEvent(entity *gen.Entity) *CeateEntityEv {
1516
return &CeateEntityEv{
16-
Entity: entity,
17+
eventID: uuid.NewV4().String(),
18+
Entity: entity,
1719
}
1820
}
1921

2022
type CeateEntityEv struct {
21-
Entity *gen.Entity
23+
eventID string
24+
Entity *gen.Entity
25+
}
26+
27+
func (ev *CeateEntityEv) EventID() string {
28+
return ev.eventID
2229
}
2330

2431
func (ev *CeateEntityEv) Description() string {
2532
return fmt.Sprintf("Create Entity")
2633
}
2734

2835
func (ev *CeateEntityEv) Exec(c context.Context) error {
36+
2937
db := docdb.FromContext(c)
3038

3139
ev.Entity.FillWithDefault()

Diff for: event/event.go

+1
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
)
88

99
type Event interface {
10+
EventID() string
1011
Description() string
1112
Exec(context.Context) error
1213
Undo(context.Context) error

Diff for: event/executor.go

+9
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package event
33
import (
44
"context"
55

6+
"github.com/ajainc/chain/ctx/logger"
67
"github.com/ajainc/chain/grpc/gen"
78
)
89

@@ -11,6 +12,8 @@ func Exec(c context.Context, ev Event, streamBroadcastCh chan *gen.StreamPayload
1112

1213
var err error
1314

15+
logger.Debugf(c, "execute event %#v", ev)
16+
1417
err = ev.Exec(c)
1518
if err != nil {
1619
return nil, err
@@ -25,6 +28,8 @@ func Exec(c context.Context, ev Event, streamBroadcastCh chan *gen.StreamPayload
2528
streamPayloads, _ := ev.ExecStreamPayloads(c)
2629
//TODO(taco) retry if error?
2730
go func() {
31+
32+
logger.Debugf(c, "stream out exec result %#v", streamPayloads)
2833
for _, paylaod := range streamPayloads {
2934
streamBroadcastCh <- paylaod
3035
}
@@ -35,12 +40,16 @@ func Exec(c context.Context, ev Event, streamBroadcastCh chan *gen.StreamPayload
3540

3641
//Undo execute ev.Undo()
3742
func Undo(c context.Context, ev Event, streamBroadcastCh chan *gen.StreamPayload) error {
43+
44+
logger.Debugf(c, "undo event %#v", ev)
3845
// TODO(tacogisp): impl
3946
ev.Undo(c)
4047

4148
streamPayloads, _ := ev.ExecStreamPayloads(c)
4249
//TODO(taco) retry if error?
4350
go func() {
51+
52+
logger.Debugf(c, "stream out undo result %#v", streamPayloads)
4453
for _, paylaod := range streamPayloads {
4554
streamBroadcastCh <- paylaod
4655
}

Diff for: grpc/server/entity-server.go

+5-1
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package server
33
import (
44
"context"
55

6+
"github.com/ajainc/chain/ctx/logger"
67
"github.com/ajainc/chain/event"
78
"github.com/ajainc/chain/grpc/gen"
89

@@ -15,8 +16,11 @@ type EntityServer struct {
1516
}
1617

1718
func (server *EntityServer) CreateEntity(_ go16ctx.Context, in *gen.Entity) (*gen.Activity, error) {
19+
20+
logger.Debug(server.appCtx, "CreatEnity called")
21+
1822
ev := event.NewCeateEntityEvent(in)
19-
activity, err := event.Exec(server.appCtx, ev)
23+
activity, err := event.Exec(server.appCtx, ev, server.streamBroadcastCh)
2024
if err != nil {
2125
return nil, err
2226
}

Diff for: grpc/servers.go

+2-5
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import (
55

66
"github.com/ajainc/chain/grpc/gen"
77
"github.com/ajainc/chain/grpc/server"
8-
"github.com/improbable-eng/grpc-web/go/grpcweb"
98

109
"google.golang.org/grpc"
1110
"google.golang.org/grpc/health"
@@ -28,12 +27,10 @@ func registerServices(c context.Context, grpcServer *grpc.Server, streamBroadcas
2827
return nil
2928
}
3029

31-
func Setup(c context.Context, streamBroadcastCh chan *gen.StreamPayload, opts ...grpc.ServerOption) (*grpcweb.WrappedGrpcServer, error) {
30+
func Setup(c context.Context, streamBroadcastCh chan *gen.StreamPayload, opts ...grpc.ServerOption) (*grpc.Server, error) {
3231
s := grpc.NewServer(opts...)
3332

3433
err := registerServices(c, s, streamBroadcastCh)
3534

36-
wrappedGrpc := grpcweb.WrapServer(s)
37-
38-
return wrappedGrpc, err
35+
return s, err
3936
}

0 commit comments

Comments
 (0)