-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathconnection.go
109 lines (90 loc) · 2.6 KB
/
connection.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
package gorpc
import (
log "github.com/Sirupsen/logrus"
"reflect"
"sync/atomic"
)
// Connection is one RPC link over a transport. It embeds FactoryGetter and
// ConnectionAddress and keeps the state its methods need: the transport used
// for sending, the calls still waiting for a reply, and the lazily created
// router and root controller.
type Connection struct {
	FactoryGetter
	ConnectionAddress

	// transport is set by Init and used by Call, Notify and Close.
	transport ITransport
	// pendingRequests maps a request id to the channel on which Call waits
	// for the value passed to Response. It is created on the first Call.
	pendingRequests map[uint32]chan interface{}
	// router is created from the factory the first time Invoke runs.
	router IRouter
	// rootController is created from the factory the first time
	// RootController is called, unless SetRootController set it earlier.
	rootController IController
}
// _rpc_id is the package-wide request id counter. Call advances it with
// atomic.AddUint32 to obtain the id of every outgoing request; it starts at
// its zero value, so the first id handed out is 1.
var _rpc_id uint32
// Init binds the connection to transport, adopts the transport's factory and
// copies the address held by addr into the connection.
//
// addr must hold a *ConnectionAddress; any other dynamic type makes the type
// assertion panic.
func (this *Connection) Init(transport ITransport, addr IConnectionAddress) {
	this.transport = transport

	factory := transport.Factory()
	this.SetFactory(factory)

	// The address is stored by value, so later changes to addr are not seen.
	endpoints := addr.(*ConnectionAddress)
	this.ConnectionAddress = *endpoints
}
// Close asks the transport to remove the connection identified by
// this.Source().
//
// The transport must be a *Transport; the type assertion panics otherwise.
func (this *Connection) Close() {
	owner := this.transport.(*Transport)
	owner.removeConnection(this.Source())
}
// Call sends a request for method with params to this.Destination() and
// blocks until Response delivers a value for the request's id.
//
// It returns (value, nil) for an ordinary reply and (nil, err) when the
// delivered value is an error.
//
// NOTE(review): pendingRequests is read and written without any locking, here
// and in Response. Concurrent Calls on the same Connection would race on the
// map; confirm how callers use it or guard the map.
func (this *Connection) Call(method string, params interface{}) (interface{}, error) {
	if this.pendingRequests == nil {
		this.pendingRequests = make(map[uint32]chan interface{})
	}

	id := atomic.AddUint32(&_rpc_id, 1)
	reply := make(chan interface{})
	this.pendingRequests[id] = reply
	// Forget the request once the call is over. Without this the map keeps
	// one channel per call forever, and a duplicate response for an already
	// answered id would block its sender on a channel nobody reads.
	defer delete(this.pendingRequests, id)

	f := this.Factory()
	r := f.MakeRequest(id, method, params)
	log.Debugf("Connection(%s).Call(%v)", this.Source(), r)

	rw := f.MakeRequestWrapper()
	rw.AddRequest(r)
	this.transport.Send(this.Destination(), rw)

	log.Debugf("Connection(%s).Call() waiting for result. Id:%d", this.Source(), id)
	// The channel is unbuffered: this receive pairs with the send in Response.
	result := <-reply
	if err, ok := result.(error); ok {
		return nil, err
	}
	return result, nil
}
// Notify sends method with params to this.Destination() as a request whose id
// is nil, and returns without waiting for any reply.
func (this *Connection) Notify(method string, params interface{}) {
	factory := this.Factory()

	// A nil id means no entry is registered in pendingRequests for it.
	notification := factory.MakeRequest(nil, method, params)

	envelope := factory.MakeRequestWrapper()
	envelope.AddRequest(notification)

	this.transport.Send(this.Destination(), envelope)
}
// Response hands result to the Call that is waiting on request id, if any.
//
// id is converted to uint32 through reflection, so any value convertible to
// uint32 (for example a float64) is accepted. A nil id, or an id of a type
// that cannot be converted (for example a string), matches no pending call:
// the response is logged and dropped instead of panicking. A response whose
// id has no pending call is silently ignored.
func (this *Connection) Response(id interface{}, result interface{}) {
	target := reflect.TypeOf(uint32(0))
	v := reflect.ValueOf(id)
	// Value.Convert panics on the zero Value (nil id) and on inconvertible
	// types, so both cases are ruled out before converting.
	if !v.IsValid() || !v.Type().ConvertibleTo(target) {
		log.Warnf("Connection(%s).Response() dropping response with unusable id: %v", this.Source(), id)
		return
	}
	key := v.Convert(target).Interface().(uint32)

	// The send blocks until the matching Call receives the value.
	if ch, ok := this.pendingRequests[key]; ok {
		ch <- result
	}
}
// RootController returns the connection's root controller. When none has been
// set yet, one is built with the factory's MakeController and installed
// through SetRootController first.
func (this *Connection) RootController() IController {
	if this.rootController != nil {
		return this.rootController
	}

	created := this.Factory().MakeController()
	this.SetRootController(created)
	return this.rootController
}
// SetRootController makes obj the connection's root controller.
//
// obj is told about this connection via SetConnection before it is stored, so
// obj must not be nil.
func (this *Connection) SetRootController(obj IController) {
	obj.SetConnection(this)
	this.rootController = obj
}
// Invoke runs the handler that the router resolves for request and returns
// the handler's first result.
//
// The router is created from the factory on first use. It looks up
// request.Method() on RootController(), prepares the arguments from
// request.Params() with CheckParams, and the resolved method is then called.
// The first returned value is unwrapped with Interface(); nil is returned
// when the handler returns nothing.
func (this *Connection) Invoke(request IRequest) interface{} {
	if this.router == nil {
		this.router = this.Factory().MakeRouter()
	}

	log.Debug("Connection.invoke step 1")
	handler := this.router.GetRoute(this.RootController(), request.Method())

	log.Debug("Connection.invoke step 2")
	args := this.router.CheckParams(handler, request.Params())

	log.Debug("Connection.invoke step 3")
	results := handler.Call(args)
	if len(results) == 0 {
		return nil
	}
	return results[0].Interface()
}