This repository was archived by the owner on Nov 5, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathdispatch_context.go
201 lines (168 loc) · 4.07 KB
/
dispatch_context.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
package edge
import (
"context"
"fmt"
"reflect"
"sync"
"github.com/ronaksoft/rony"
"github.com/ronaksoft/rony/tools"
"google.golang.org/protobuf/proto"
)
// DispatchCtx holds the context of the dispatcher's request. Each DispatchCtx could
// hold one or many RequestCtx. DispatchCtx lives until the last of its RequestCtx children.
//
// Instances are recycled through dispatchCtxPool: acquireDispatchCtx hands one out and
// releaseDispatchCtx cancels it, resets it and puts it back into the pool.
type DispatchCtx struct {
// edge is the Server supplied when the context was obtained.
edge *Server
// streamID, serverID and conn are set by acquireDispatchCtx on every acquisition.
// serverID is copied into this slice, whose backing array is reused between acquisitions.
streamID int64
serverID []byte
conn rony.Conn
// req is the request envelope. It is allocated once in newDispatchCtx, reset when the
// context is reused, and populated through Fill or FillEnvelope.
req *rony.MessageEnvelope
// reqFilled records that req has been populated; Fill and FillEnvelope panic when it is
// already true. reset clears it.
reqFilled bool
// kind is the MessageKind passed to acquireDispatchCtx and reported by Kind.
kind MessageKind
// buf stores the envelopes appended by BufferPush until BufferPop picks them.
buf *tools.LinkedList
// Key-value store: kv is read and written by Get and Set while holding mtx.
mtx sync.RWMutex
kv map[string]interface{}
// ctx and cf are the cancellable context and its cancel function created in
// acquireDispatchCtx; cf is called by releaseDispatchCtx.
ctx context.Context
cf func()
}
// newDispatchCtx allocates a DispatchCtx for the given edge server with an empty
// request envelope, an empty key-value store (sized for three entries) and an
// empty envelope buffer. All other fields are left at their zero values.
func newDispatchCtx(edge *Server) *DispatchCtx {
	dc := new(DispatchCtx)
	dc.edge = edge
	dc.req = &rony.MessageEnvelope{}
	dc.kv = make(map[string]interface{}, 3)
	dc.buf = tools.NewLinkedList()

	return dc
}
// reset prepares the context for reuse: every key-value entry is removed (the map
// itself is kept rather than reallocated), the request is marked as not filled and
// the envelope buffer is reset.
func (ctx *DispatchCtx) reset() {
	for key := range ctx.kv {
		delete(ctx.kv, key)
	}
	ctx.reqFilled = false
	ctx.buf.Reset()
}
// ServerID returns the server identifier stored in this context, converted from
// its internal byte slice to a string.
func (ctx *DispatchCtx) ServerID() string {
	id := ctx.serverID
	return string(id)
}
// Debug prints a "###" marker to standard output followed by one line per field of
// the DispatchCtx struct: the field name, its byte offset inside the struct and the
// size of the field's type.
func (ctx *DispatchCtx) Debug() {
	fmt.Println("###")
	val := reflect.Indirect(reflect.ValueOf(ctx))
	for idx, count := 0, val.NumField(); idx < count; idx++ {
		field := val.Type().Field(idx)
		fmt.Println(field.Name, field.Offset, field.Type.Size())
	}
}
// Conn returns the connection stored in this context by acquireDispatchCtx.
func (ctx *DispatchCtx) Conn() rony.Conn {
	c := ctx.conn
	return c
}
// StreamID returns the stream identifier stored in this context by acquireDispatchCtx.
func (ctx *DispatchCtx) StreamID() int64 {
	id := ctx.streamID
	return id
}
// FillEnvelope populates the context's request envelope from e by calling
// e.DeepCopy with the context's envelope as the argument (presumably copying e
// into it; confirm against rony.MessageEnvelope.DeepCopy) and marks the request
// as filled.
//
// It panics if the request has already been filled since the last reset.
func (ctx *DispatchCtx) FillEnvelope(e *rony.MessageEnvelope) {
	if !ctx.reqFilled {
		ctx.reqFilled = true
		e.DeepCopy(ctx.req)

		return
	}

	panic("BUG!!! request has been already filled")
}
// Fill populates the context's request envelope by forwarding requestID,
// constructor, p and the optional kv pairs to the envelope's own Fill method,
// and marks the request as filled.
//
// It panics if the request has already been filled since the last reset.
func (ctx *DispatchCtx) Fill(
	requestID uint64, constructor uint64, p proto.Message, kv ...*rony.KeyValue,
) {
	switch {
	case ctx.reqFilled:
		panic("BUG!!! request has been already filled")
	default:
		ctx.reqFilled = true
		ctx.req.Fill(requestID, constructor, p, kv...)
	}
}
// Set stores v under key in the context's key-value store, replacing any previous
// value. The write is performed while holding the store's write lock.
func (ctx *DispatchCtx) Set(key string, v interface{}) {
	mu := &ctx.mtx
	mu.Lock()
	ctx.kv[key] = v
	mu.Unlock()
}
// Get returns the value stored under key in the context's key-value store, or nil
// when the key is absent. The read is performed while holding the store's read lock.
func (ctx *DispatchCtx) Get(key string) interface{} {
	ctx.mtx.RLock()
	defer ctx.mtx.RUnlock()

	return ctx.kv[key]
}
// GetBytes returns the value stored under key when it is a []byte; for a missing
// key or a value of any other type it returns defaultValue.
func (ctx *DispatchCtx) GetBytes(key string, defaultValue []byte) []byte {
	if b, isBytes := ctx.Get(key).([]byte); isBytes {
		return b
	}

	return defaultValue
}
// GetString returns the value stored under key as a string. A string value is
// returned as is, a []byte value is converted with tools.ByteToStr, and for a
// missing key or a value of any other type defaultValue is returned.
func (ctx *DispatchCtx) GetString(key string, defaultValue string) string {
	switch val := ctx.Get(key).(type) {
	case string:
		return val
	case []byte:
		return tools.ByteToStr(val)
	}

	return defaultValue
}
// GetInt64 returns the value stored under key when it is an int64; for a missing
// key or a value of any other type (including other integer types) it returns
// defaultValue.
func (ctx *DispatchCtx) GetInt64(key string, defaultValue int64) int64 {
	switch n := ctx.Get(key).(type) {
	case int64:
		return n
	default:
		return defaultValue
	}
}
// GetBool returns the value stored under key when it is a bool; for a missing key
// or a value of any other type it returns false.
func (ctx *DispatchCtx) GetBool(key string) bool {
	b, isBool := ctx.Get(key).(bool)

	return isBool && b
}
// Kind reports the MessageKind recorded when this context was acquired. It tells
// whether the dispatch context was generated from a Tunnel or a Gateway message, so
// that developers can apply different strategies depending on the source of the
// incoming message.
func (ctx *DispatchCtx) Kind() MessageKind {
	k := ctx.kind
	return k
}
// BufferPush appends m to the context's envelope buffer, from where it can later
// be taken with BufferPop or BufferPopAll.
func (ctx *DispatchCtx) BufferPush(m *rony.MessageEnvelope) {
	queue := ctx.buf
	queue.Append(m)
}
// BufferPop picks the head of the envelope buffer and, when it holds a non-nil
// *rony.MessageEnvelope, passes it to f and reports true. It reports false without
// calling f when there is no such envelope.
//
// Once f returns, the envelope is put back into rony.PoolMessageEnvelope, so f
// should not keep a reference to it.
func (ctx *DispatchCtx) BufferPop(f func(envelope *rony.MessageEnvelope)) bool {
	head := ctx.buf.PickHeadData()
	msg, ok := head.(*rony.MessageEnvelope)
	if !ok || msg == nil {
		return false
	}

	f(msg)
	rony.PoolMessageEnvelope.Put(msg)

	return true
}
// BufferPopAll calls BufferPop with f repeatedly until it reports false, i.e. until
// there is no more envelope to pick from the buffer.
func (ctx *DispatchCtx) BufferPopAll(f func(envelope *rony.MessageEnvelope)) {
	for {
		if !ctx.BufferPop(f) {
			return
		}
	}
}
// BufferSize returns the size reported by the context's envelope buffer.
func (ctx *DispatchCtx) BufferSize() int32 {
	size := ctx.buf.Size()
	return size
}
// dispatchCtxPool recycles *DispatchCtx values between acquireDispatchCtx and
// releaseDispatchCtx. It has no New function, so Get returns nil when the pool
// has nothing to offer.
var dispatchCtxPool sync.Pool
// acquireDispatchCtx returns a DispatchCtx ready to serve a new request. It reuses
// an instance from dispatchCtxPool when one is available and allocates a new one
// otherwise.
//
// A reused instance has its request envelope reset. The context is always bound to
// edge: the pool is package-wide, so a reused instance may have been created for a
// different Server and would otherwise keep pointing at it. conn, kind and streamID
// are stored as given. serverID is copied into the context's own slice, so the
// caller keeps ownership of the slice it passed in. A new cancellable context is
// created on every acquisition; its cancel function is called by
// releaseDispatchCtx.
func acquireDispatchCtx(
	edge *Server, conn rony.Conn,
	streamID int64, serverID []byte, kind MessageKind,
) *DispatchCtx {
	var ctx *DispatchCtx
	if v := dispatchCtxPool.Get(); v == nil {
		ctx = newDispatchCtx(edge)
	} else {
		ctx = v.(*DispatchCtx)
		ctx.req.Reset()
	}

	// Re-bind on every acquisition; a pooled instance still carries the Server it
	// was first allocated with.
	ctx.edge = edge
	ctx.conn = conn
	ctx.kind = kind
	ctx.streamID = streamID
	// Reuse the existing backing array instead of retaining the caller's slice.
	ctx.serverID = append(ctx.serverID[:0], serverID...)
	ctx.ctx, ctx.cf = context.WithCancel(context.TODO())

	return ctx
}
// releaseDispatchCtx hands ctx back to dispatchCtxPool. The steps run in a fixed
// order: the context's cancel function is called, the context is reset, and only
// then is it put into the pool. ctx.cf is set by acquireDispatchCtx, so ctx is
// expected to come from there.
func releaseDispatchCtx(ctx *DispatchCtx) {
// Cancel the context.Context created in acquireDispatchCtx.
ctx.cf()
// Clear the filled flag, the key-value store and the envelope buffer.
ctx.reset()
// Put the context back into the pool for reuse.
dispatchCtxPool.Put(ctx)
}