Skip to content

Commit aed659c

Browse files
committed
minor changes
1 parent ef1f0f4 commit aed659c

File tree

6 files changed

+34
-10
lines changed

6 files changed

+34
-10
lines changed

utils/client/client.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ type Client interface {
2222
}
2323

2424
// DataCallback is the callback invoked when data is read by the socket
25-
type DataCallback func(data *model.Message)
25+
type DataCallback func(data *model.Message) bool
2626

2727
// CreateWebsocketClient makes a client object to manage the socket
2828
func CreateWebsocketClient(socket *websocket.Conn) *WebsocketClient {

utils/client/grpc_realtime.go

+6-1
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,12 @@ func (c *GRPCRealtimeClient) Read(cb DataCallback) {
9393
data["Where"] = temp
9494

9595
msg := &model.Message{Type: in.Type, ID: in.Id, Data: data}
96-
cb(msg)
96+
97+
// Close the reader if callback returned false
98+
next := cb(msg)
99+
if !next {
100+
return
101+
}
97102
}
98103
}
99104

utils/client/grpc_service.go

+12-2
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,12 @@ func (c *GRPCServiceClient) Read(cb DataCallback) {
8989
case utils.TypeServiceRegister:
9090
data := map[string]interface{}{"service": in.Service, "token": in.Token, "project": in.Project}
9191
msg := &model.Message{ID: in.Id, Type: utils.TypeServiceRegister, Data: data}
92-
cb(msg)
92+
93+
// Close the reader if callback returned false
94+
next := cb(msg)
95+
if !next {
96+
return
97+
}
9398

9499
case utils.TypeServiceRequest:
95100
var params interface{}
@@ -100,7 +105,12 @@ func (c *GRPCServiceClient) Read(cb DataCallback) {
100105
"error": in.Error,
101106
}
102107
msg := &model.Message{ID: in.Id, Type: utils.TypeServiceRequest, Data: data}
103-
cb(msg)
108+
109+
// Close the reader if callback returned false
110+
next := cb(msg)
111+
if !next {
112+
return
113+
}
104114

105115
default:
106116
log.Println("GRPC Service Error - Invalid request type", in.Type)

utils/client/websocket.go

+5-1
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,11 @@ func (c *WebsocketClient) Read(cb DataCallback) {
5454
return
5555
}
5656

57-
cb(data)
57+
// Close the reader if callback returned false
58+
next := cb(data)
59+
if !next {
60+
return
61+
}
5862
}
5963
}
6064

utils/server/grpc.go

+7-3
Original file line numberDiff line numberDiff line change
@@ -355,7 +355,7 @@ func (s *Server) Service(stream pb.SpaceCloud_ServiceServer) error {
355355
// Get GRPC Service client details
356356
clientID := c.ClientID()
357357

358-
c.Read(func(req *model.Message) {
358+
c.Read(func(req *model.Message) bool {
359359
switch req.Type {
360360
case utils.TypeServiceRegister:
361361
// TODO add security rule for functions registered as well
@@ -374,6 +374,8 @@ func (s *Server) Service(stream pb.SpaceCloud_ServiceServer) error {
374374

375375
s.functions.HandleServiceResponse(data)
376376
}
377+
378+
return true
377379
})
378380
return nil
379381
}
@@ -388,7 +390,7 @@ func (s *Server) RealTime(stream pb.SpaceCloud_RealTimeServer) error {
388390
ctx := c.Context()
389391
clientID := c.ClientID()
390392

391-
c.Read(func(req *model.Message) {
393+
c.Read(func(req *model.Message) bool {
392394
switch req.Type {
393395
case utils.TypeRealtimeSubscribe:
394396
// For realtime subscribe event
@@ -402,7 +404,7 @@ func (s *Server) RealTime(stream pb.SpaceCloud_RealTimeServer) error {
402404
if err != nil {
403405
res := model.RealtimeResponse{Group: data.Group, ID: data.ID, Ack: false, Error: err.Error()}
404406
c.Write(&model.Message{ID: req.ID, Type: req.Type, Data: res})
405-
return
407+
return true
406408
}
407409

408410
// Send response to c
@@ -420,6 +422,8 @@ func (s *Server) RealTime(stream pb.SpaceCloud_RealTimeServer) error {
420422
res := model.RealtimeResponse{Group: data.Group, ID: data.ID, Ack: true}
421423
c.Write(&model.Message{ID: req.ID, Type: req.Type, Data: res})
422424
}
425+
426+
return true
423427
})
424428

425429
return nil

utils/server/websocket.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ func (s *Server) handleWebsocket() http.HandlerFunc {
3737
ctx := c.Context()
3838
clientID := c.ClientID()
3939

40-
c.Read(func(req *model.Message) {
40+
c.Read(func(req *model.Message) bool {
4141
switch req.Type {
4242
case utils.TypeRealtimeSubscribe:
4343
// For realtime subscribe event
@@ -51,7 +51,7 @@ func (s *Server) handleWebsocket() http.HandlerFunc {
5151
if err != nil {
5252
res := model.RealtimeResponse{Group: data.Group, ID: data.ID, Ack: false, Error: err.Error()}
5353
c.Write(&model.Message{ID: req.ID, Type: req.Type, Data: res})
54-
return
54+
return true
5555
}
5656

5757
// Send response to c
@@ -86,6 +86,7 @@ func (s *Server) handleWebsocket() http.HandlerFunc {
8686

8787
s.functions.HandleServiceResponse(data)
8888
}
89+
return true
8990
})
9091
}
9192
}

0 commit comments

Comments
 (0)