8
8
raftPB "github.com/ByteStorage/FlyDB/lib/proto/raft"
9
9
"github.com/hashicorp/raft"
10
10
"google.golang.org/grpc"
11
+ "google.golang.org/grpc/credentials/insecure"
11
12
"io"
13
+ "net"
12
14
"sync"
13
15
"time"
14
16
)
@@ -41,24 +43,49 @@ type appendFuture struct {
41
43
type Transport struct {
42
44
//implement me
43
45
localAddr raft.ServerAddress
44
- consumer chan raft.RPC
45
46
clients map [raft.ServerAddress ]* ClientConn
46
- server * grpc.Server
47
- heartbeatFn func (raft.RPC )
47
+ server * TransportServer
48
48
dialOptions []grpc.DialOption
49
49
heartbeatTimeout time.Duration
50
50
sync.RWMutex
51
51
}
52
+ type TransportServer struct {
53
+ server * grpc.Server
54
+ consumer chan raft.RPC
55
+ heartbeatFn func (raft.RPC )
56
+ serverQuit chan struct {}
57
+ raftPB.UnsafeRaftServiceServer
58
+ }
52
59
53
60
// NewTransport returns a new transport, it needs start a grpc server
54
- func newTransport (conf config.Config ) raft.Transport {
61
+ func newTransport (conf config.Config , l net.Listener , do []grpc.DialOption ) (* Transport , error ) {
62
+ s := grpc .NewServer ()
63
+ ts := & TransportServer {
64
+ server : s ,
65
+ consumer : make (chan raft.RPC ),
66
+ serverQuit : make (chan struct {}),
67
+ }
68
+ raftPB .RegisterRaftServiceServer (s , ts )
69
+
70
+ go func () {
71
+ if err := s .Serve (l ); err != nil {
72
+ panic (err )
73
+ }
74
+ }()
75
+
55
76
return & Transport {
56
77
localAddr : conf .LocalAddress ,
57
- dialOptions : []grpc. DialOption { grpc . WithInsecure ()} ,
78
+ dialOptions : do ,
58
79
heartbeatTimeout : conf .HeartbeatTimeout ,
59
- consumer : make ( chan raft. RPC ) ,
80
+ server : ts ,
60
81
clients : map [raft.ServerAddress ]* ClientConn {},
61
- }
82
+ }, nil
83
+ }
84
+ func newListener (conf config.Config ) (net.Listener , error ) {
85
+ return net .Listen ("tcp" , string (conf .LocalAddress ))
86
+ }
87
+ func newDialOption (conf config.Config ) []grpc.DialOption {
88
+ return []grpc.DialOption {grpc .WithTransportCredentials (insecure .NewCredentials ())}
62
89
}
63
90
64
91
// AppendEntriesPipeline returns an interface that can be used to pipeline
@@ -152,7 +179,7 @@ func (t *Transport) TimeoutNow(id raft.ServerID, target raft.ServerAddress, args
152
179
// Consumer returns a channel that can be used to
153
180
// consume and respond to RPC requests.
154
181
func (t * Transport ) Consumer () <- chan raft.RPC {
155
- return t .consumer
182
+ return t .server . consumer
156
183
}
157
184
158
185
// LocalAddr is used to return our local address to distinguish from our peers.
@@ -177,7 +204,7 @@ func (t *Transport) DecodePeer(p []byte) raft.ServerAddress {
177
204
func (t * Transport ) SetHeartbeatHandler (handler func (rpc raft.RPC )) {
178
205
t .RWMutex .RLock ()
179
206
defer t .RWMutex .RUnlock ()
180
- t .heartbeatFn = handler
207
+ t .server . heartbeatFn = handler
181
208
}
182
209
183
210
func (t * Transport ) getPeer (target raft.ServerAddress ) (raftPB.RaftServiceClient , error ) {
@@ -198,8 +225,8 @@ func (t *Transport) getPeer(target raft.ServerAddress) (raftPB.RaftServiceClient
198
225
if err != nil {
199
226
return nil , err
200
227
}
201
- c .conn = conn
202
228
c .client = raftPB .NewRaftServiceClient (conn )
229
+ c .conn = conn
203
230
}
204
231
}
205
232
@@ -290,3 +317,65 @@ func (af *appendFuture) processMessage(r *raftPipeline) {
290
317
close (af .done )
291
318
r .doneCh <- af
292
319
}
320
+
321
+ func (t * TransportServer ) AppendEntries (ctx context.Context , req * raftPB.AppendEntriesRequest ) (* raftPB.AppendEntriesResponse , error ) {
322
+ resp , err := t .sendRPC (encoding .DecodeAppendEntriesRequest (req ), nil )
323
+ if err != nil {
324
+ return nil , err
325
+ }
326
+ return encoding .EncodeAppendEntriesResponse (resp .(* raft.AppendEntriesResponse )), nil
327
+ }
328
+ func (t * TransportServer ) RequestVote (ctx context.Context , req * raftPB.RequestVoteRequest ) (* raftPB.RequestVoteResponse , error ) {
329
+ resp , err := t .sendRPC (encoding .DecodeRequestVoteRequest (req ), nil )
330
+ if err != nil {
331
+ return nil , err
332
+ }
333
+ return encoding .EncodeRequestVoteResponse (resp .(* raft.RequestVoteResponse )), nil
334
+ }
335
+ func (t * TransportServer ) AppendEntriesPipeline (server raftPB.RaftService_AppendEntriesPipelineServer ) error {
336
+ return nil
337
+ }
338
+ func (t * TransportServer ) TimeoutNow (ctx context.Context , in * raftPB.TimeoutNowRequest ) (* raftPB.TimeoutNowResponse , error ) {
339
+ resp , err := t .sendRPC (encoding .DecodeTimeoutNowRequest (in ), nil )
340
+ if err != nil {
341
+ return nil , err
342
+ }
343
+ return encoding .EncodeTimeoutNowResponse (resp .(* raft.TimeoutNowResponse )), nil
344
+ }
345
+ func (t * TransportServer ) InstallSnapshot (ctx context.Context , req * raftPB.InstallSnapshotRequest ) (* raftPB.InstallSnapshotResponse , error ) {
346
+ resp , err := t .sendRPC (encoding .DecodeInstallSnapshotRequest (req ), nil )
347
+ if err != nil {
348
+ return nil , err
349
+ }
350
+ return encoding .EncodeInstallSnapshotResponse (resp .(* raft.InstallSnapshotResponse )), nil
351
+ }
352
+
353
+ func (t * TransportServer ) sendRPC (command interface {}, data io.Reader ) (interface {}, error ) {
354
+ ch := make (chan raft.RPCResponse , 1 )
355
+ rpc := raft.RPC {
356
+ Command : command ,
357
+ RespChan : ch ,
358
+ Reader : data ,
359
+ }
360
+ if isHeartbeat (command ) {
361
+ fn := t .heartbeatFn
362
+ if fn != nil {
363
+ fn (rpc )
364
+ }
365
+ } else {
366
+ t .consumer <- rpc
367
+ }
368
+ resp := <- ch
369
+ if resp .Error != nil {
370
+ return nil , resp .Error
371
+ }
372
+ return resp .Response , nil
373
+ }
374
+ func (t * TransportServer ) Close () {
375
+ t .server .GracefulStop ()
376
+ return
377
+ }
378
+ func (t * Transport ) Close () {
379
+ t .server .server .GracefulStop ()
380
+ return
381
+ }
0 commit comments