diff --git a/cluster/store/store.go b/cluster/store/store.go index 3d124c98..d68e441a 100644 --- a/cluster/store/store.go +++ b/cluster/store/store.go @@ -82,8 +82,14 @@ func (s *store) newRaftNode() error { conf := s.newDefaultConfig() // setup Raft communication - t := newTransport() - + ls, err := newListener(s.conf) + if err != nil { + return err + } + t, err := newTransport(s.conf, ls, newDialOption(s.conf)) + if err != nil { + return err + } // create the snapshot store. This allows the Raft to truncate the log. snapshots, err := newSnapshotStore(s.conf) if err != nil { diff --git a/cluster/store/store_log.go b/cluster/store/store_log.go index 16bc5739..019a9bdd 100644 --- a/cluster/store/store_log.go +++ b/cluster/store/store_log.go @@ -4,6 +4,7 @@ import ( "fmt" "github.com/ByteStorage/FlyDB/config" "github.com/ByteStorage/FlyDB/lib/datastore" + "github.com/hashicorp/raft" ) // DataStoreFactory is a function type that creates a new instance of a raft.LogStore. @@ -21,6 +22,10 @@ var datastoreFactories = make(map[string]DataStoreFactory) func Init() error { // Register the "memory" DataStoreFactory implementation with the name "memory" _ = Register("memory", datastore.NewLogInMemStorage) + // Register the "memory" DataStoreFactory implementation with the name "memory" + _ = Register("inMemory", func(conf config.Config) (datastore.DataStore, error) { + return raft.NewInmemStore(), nil + }) // Register the "bolt" DataStoreFactory implementation with the name "boltdb" _ = Register("boltdb", datastore.NewLogBoltDbStorage) // Register the "flydb" DataStoreFactory implementation with the name "flydb" diff --git a/cluster/store/store_transport.go b/cluster/store/store_transport.go index 7f61015b..86050bd1 100644 --- a/cluster/store/store_transport.go +++ b/cluster/store/store_transport.go @@ -1,73 +1,381 @@ package store import ( + "context" + "fmt" + "github.com/ByteStorage/FlyDB/config" + "github.com/ByteStorage/FlyDB/lib/encoding" + raftPB "github.com/ByteStorage/FlyDB/lib/proto/raft" "github.com/hashicorp/raft" "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" "io" + "net" + "sync" + "time" ) -// transport implements raft.Transport interface +type ClientConn struct { + conn *grpc.ClientConn + client raftPB.RaftServiceClient + mtx sync.Mutex +} +type raftPipeline struct { + stream raftPB.RaftService_AppendEntriesPipelineClient + cancel func() + inflightChMtx sync.Mutex + inflightCh chan *appendFuture + doneCh chan raft.AppendFuture +} + +type appendFuture struct { + raft.AppendFuture + start time.Time + request *raft.AppendEntriesRequest + response raft.AppendEntriesResponse + err error + done chan struct{} +} + +// Transport implements raft.Transport interface // we can use it to send rpc to other raft nodes // and receive rpc from other raft nodes -type transport struct { +type Transport struct { //implement me - localAddr raft.ServerAddress - consumer chan raft.RPC - clients map[raft.ServerAddress]*grpc.ClientConn - server *grpc.Server + localAddr raft.ServerAddress + clients map[raft.ServerAddress]*ClientConn + server *TransportServer + dialOptions []grpc.DialOption + heartbeatTimeout time.Duration + sync.RWMutex +} +type TransportServer struct { + server *grpc.Server + consumer chan raft.RPC + heartbeatFn func(raft.RPC) + serverQuit chan struct{} + raftPB.UnsafeRaftServiceServer } // NewTransport returns a new transport, it needs start a grpc server -func newTransport() raft.Transport { - return &transport{} +func newTransport(conf config.Config, l net.Listener, do []grpc.DialOption) (*Transport, error) { + s := grpc.NewServer() + ts := &TransportServer{ + server: s, + consumer: make(chan raft.RPC), + serverQuit: make(chan struct{}), + } + raftPB.RegisterRaftServiceServer(s, ts) + + go func() { + if err := s.Serve(l); err != nil { + panic(err) + } + }() + + return &Transport{ + localAddr: conf.LocalAddress, + dialOptions: do, + heartbeatTimeout: conf.HeartbeatTimeout, + server: ts, + clients: map[raft.ServerAddress]*ClientConn{}, + }, nil +} +func newListener(conf config.Config) (net.Listener, error) { + return net.Listen("tcp", string(conf.LocalAddress)) +} +func newDialOption(conf config.Config) []grpc.DialOption { + return []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())} } -func (t *transport) AppendEntriesPipeline(id raft.ServerID, target raft.ServerAddress) (raft.AppendPipeline, error) { - //TODO implement me - panic("implement me") +// AppendEntriesPipeline returns an interface that can be used to pipeline +// AppendEntries requests. +func (t *Transport) AppendEntriesPipeline(id raft.ServerID, target raft.ServerAddress) (raft.AppendPipeline, error) { + c, err := t.getPeer(target) + if err != nil { + return nil, err + } + ctx := context.TODO() + ctx, cancel := context.WithCancel(ctx) + stream, err := c.AppendEntriesPipeline(ctx) + if err != nil { + cancel() + return nil, err + } + rpa := raftPipeline{ + stream: stream, + cancel: cancel, + inflightCh: make(chan *appendFuture, 20), + doneCh: make(chan raft.AppendFuture, 20), + } + go rpa.receiver() + return &rpa, nil } -func (t *transport) AppendEntries(id raft.ServerID, target raft.ServerAddress, args *raft.AppendEntriesRequest, resp *raft.AppendEntriesResponse) error { - //TODO implement me - panic("implement me") +// AppendEntries sends the appropriate RPC to the target node. +func (t *Transport) AppendEntries(id raft.ServerID, target raft.ServerAddress, args *raft.AppendEntriesRequest, resp *raft.AppendEntriesResponse) error { + c, err := t.getPeer(target) + if err != nil { + return err + } + ctx := context.TODO() + if t.heartbeatTimeout > 0 && isHeartbeat(args) { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, t.heartbeatTimeout) + defer cancel() + } + ret, err := c.AppendEntries(ctx, encoding.EncodeAppendEntriesRequest(args)) + if err != nil { + return err + } + *resp = *encoding.DecodeAppendEntriesResponse(ret) + return nil } -func (t *transport) RequestVote(id raft.ServerID, target raft.ServerAddress, args *raft.RequestVoteRequest, resp *raft.RequestVoteResponse) error { - //TODO implement me - panic("implement me") +// RequestVote sends the appropriate RPC to the target node. +func (t *Transport) RequestVote(id raft.ServerID, target raft.ServerAddress, args *raft.RequestVoteRequest, resp *raft.RequestVoteResponse) error { + c, err := t.getPeer(target) + if err != nil { + return err + } + vote, err := c.RequestVote(context.TODO(), encoding.EncodeRequestVoteRequest(args)) + if err != nil { + return err + } + *resp = *encoding.DecodeRequestVoteResponse(vote) + return nil } -func (t *transport) InstallSnapshot(id raft.ServerID, target raft.ServerAddress, args *raft.InstallSnapshotRequest, resp *raft.InstallSnapshotResponse, data io.Reader) error { - //TODO implement me - panic("implement me") +// InstallSnapshot is used to push a snapshot down to a follower. The data is read from +// the ReadCloser and streamed to the client. +func (t *Transport) InstallSnapshot(id raft.ServerID, target raft.ServerAddress, args *raft.InstallSnapshotRequest, resp *raft.InstallSnapshotResponse, data io.Reader) error { + c, err := t.getPeer(target) + if err != nil { + return err + } + inSnap, err := c.InstallSnapshot(context.TODO(), encoding.EncodeInstallSnapshotRequest(args)) + if err != nil { + return err + } + + *resp = *encoding.DecodeInstallSnapshotResponse(inSnap) + return nil } -func (t *transport) TimeoutNow(id raft.ServerID, target raft.ServerAddress, args *raft.TimeoutNowRequest, resp *raft.TimeoutNowResponse) error { - //TODO implement me - panic("implement me") +// TimeoutNow is used to start a leadership transfer to the target node. +func (t *Transport) TimeoutNow(id raft.ServerID, target raft.ServerAddress, args *raft.TimeoutNowRequest, resp *raft.TimeoutNowResponse) error { + c, err := t.getPeer(target) + if err != nil { + return err + } + ret, err := c.TimeoutNow(context.TODO(), encoding.EncodeTimeoutNowRequest(args)) + if err != nil { + return err + } + *resp = *encoding.DecodeTimeoutNowResponse(ret) + return nil } -func (t *transport) Consumer() <-chan raft.RPC { - //implement me - panic("implement me") +// Consumer returns a channel that can be used to +// consume and respond to RPC requests. +func (t *Transport) Consumer() <-chan raft.RPC { + return t.server.consumer } -func (t *transport) LocalAddr() raft.ServerAddress { - //implement me - panic("implement me") +// LocalAddr is used to return our local address to distinguish from our peers. +func (t *Transport) LocalAddr() raft.ServerAddress { + return t.localAddr } -func (t *transport) EncodePeer(id raft.ServerID, addr raft.ServerAddress) []byte { - //implement me - panic("implement me") +// EncodePeer is used to serialize a peer's address. +func (t *Transport) EncodePeer(id raft.ServerID, addr raft.ServerAddress) []byte { + return []byte(addr) } -func (t *transport) DecodePeer([]byte) raft.ServerAddress { - //implement me - panic("implement me") +// DecodePeer is used to deserialize a peer's address. +func (t *Transport) DecodePeer(p []byte) raft.ServerAddress { + return raft.ServerAddress(p) } -func (t *transport) SetHeartbeatHandler(handler func(rpc raft.RPC)) { - //implement me - panic("implement me") +// SetHeartbeatHandler is used to setup a heartbeat handler +// as a fast-pass. This is to avoid head-of-line blocking from +// disk IO. If Transport does not support this, it can simply +// ignore the call, and push the heartbeat onto the Consumer channel. +func (t *Transport) SetHeartbeatHandler(handler func(rpc raft.RPC)) { + t.RWMutex.RLock() + defer t.RWMutex.RUnlock() + t.server.heartbeatFn = handler +} + +func (t *Transport) getPeer(target raft.ServerAddress) (raftPB.RaftServiceClient, error) { + t.RWMutex.Lock() // Locking here + defer t.RWMutex.Unlock() // Unlocking after the map access is done + + c, ok := t.clients[target] + + if !ok { + c = &ClientConn{} + c.mtx.Lock() + defer c.mtx.Unlock() // We know that Lock was obtained and can use defer here + + t.clients[target] = c + + if c.conn == nil { + conn, err := grpc.Dial(string(target), t.dialOptions...) + if err != nil { + return nil, err + } + c.client = raftPB.NewRaftServiceClient(conn) + c.conn = conn + } + } + + return c.client, nil +} +func isHeartbeat(command interface{}) bool { + req, ok := command.(*raft.AppendEntriesRequest) + if !ok { + return false + } + if req == nil { + return false + } + return req.Term != 0 && + len(req.Leader) != 0 && + req.PrevLogEntry == 0 && + req.PrevLogTerm == 0 && + len(req.Entries) == 0 && + req.LeaderCommitIndex == 0 +} + +func (af *appendFuture) Error() error { + <-af.done + return af.err +} +func (af *appendFuture) Start() time.Time { + return af.start +} + +func (af *appendFuture) Request() *raft.AppendEntriesRequest { + return af.request +} +func (af *appendFuture) Response() *raft.AppendEntriesResponse { + return &af.response +} + +// AppendEntries is used to add another request to the pipeline. +// The send may block which is an effective form of back-pressure. +func (r *raftPipeline) AppendEntries(req *raft.AppendEntriesRequest, resp *raft.AppendEntriesResponse) (raft.AppendFuture, error) { + af := &appendFuture{ + start: time.Now(), + request: req, + done: make(chan struct{}), + } + if err := r.stream.Send(encoding.EncodeAppendEntriesRequest(req)); err != nil { + return nil, err + } + select { + case <-r.stream.Context().Done(): + return nil, r.stream.Context().Err() + case r.inflightCh <- af: + default: + return nil, fmt.Errorf("failed to send request to inflightCh") + } + + return af, nil +} + +// Consumer returns a channel that can be used to consume +// response futures when they are ready. +func (r *raftPipeline) Consumer() <-chan raft.AppendFuture { + return r.doneCh +} + +// Close closes the pipeline and cancels all inflight RPCs +func (r *raftPipeline) Close() error { + r.cancel() + r.inflightChMtx.Lock() + defer r.inflightChMtx.Unlock() + close(r.inflightCh) + return nil +} + +func (r *raftPipeline) receiver() { + for af := range r.inflightCh { + af.processMessage(r) + } +} + +// processMessage processes the appendFuture message. +func (af *appendFuture) processMessage(r *raftPipeline) { + msg, err := r.stream.Recv() + if err != nil { + af.err = err + } else if msg != nil { + af.response = *encoding.DecodeAppendEntriesResponse(msg) + } + close(af.done) + r.doneCh <- af +} + +func (t *TransportServer) AppendEntries(ctx context.Context, req *raftPB.AppendEntriesRequest) (*raftPB.AppendEntriesResponse, error) { + resp, err := t.sendRPC(encoding.DecodeAppendEntriesRequest(req), nil) + if err != nil { + return nil, err + } + return encoding.EncodeAppendEntriesResponse(resp.(*raft.AppendEntriesResponse)), nil +} +func (t *TransportServer) RequestVote(ctx context.Context, req *raftPB.RequestVoteRequest) (*raftPB.RequestVoteResponse, error) { + resp, err := t.sendRPC(encoding.DecodeRequestVoteRequest(req), nil) + if err != nil { + return nil, err + } + return encoding.EncodeRequestVoteResponse(resp.(*raft.RequestVoteResponse)), nil +} +func (t *TransportServer) AppendEntriesPipeline(server raftPB.RaftService_AppendEntriesPipelineServer) error { + return nil +} +func (t *TransportServer) TimeoutNow(ctx context.Context, in *raftPB.TimeoutNowRequest) (*raftPB.TimeoutNowResponse, error) { + resp, err := t.sendRPC(encoding.DecodeTimeoutNowRequest(in), nil) + if err != nil { + return nil, err + } + return encoding.EncodeTimeoutNowResponse(resp.(*raft.TimeoutNowResponse)), nil +} +func (t *TransportServer) InstallSnapshot(ctx context.Context, req *raftPB.InstallSnapshotRequest) (*raftPB.InstallSnapshotResponse, error) { + resp, err := t.sendRPC(encoding.DecodeInstallSnapshotRequest(req), nil) + if err != nil { + return nil, err + } + return encoding.EncodeInstallSnapshotResponse(resp.(*raft.InstallSnapshotResponse)), nil +} + +func (t *TransportServer) sendRPC(command interface{}, data io.Reader) (interface{}, error) { + ch := make(chan raft.RPCResponse, 1) + rpc := raft.RPC{ + Command: command, + RespChan: ch, + Reader: data, + } + if isHeartbeat(command) { + fn := t.heartbeatFn + if fn != nil { + fn(rpc) + } + } else { + t.consumer <- rpc + } + resp := <-ch + if resp.Error != nil { + return nil, resp.Error + } + return resp.Response, nil +} +func (t *TransportServer) Close() { + t.server.GracefulStop() + return +} +func (t *Transport) Close() { + t.server.server.GracefulStop() + return } diff --git a/cluster/store/store_transport_test.go b/cluster/store/store_transport_test.go new file mode 100644 index 00000000..76166414 --- /dev/null +++ b/cluster/store/store_transport_test.go @@ -0,0 +1,308 @@ +package store + +import ( + "bytes" + "github.com/ByteStorage/FlyDB/config" + raftPB "github.com/ByteStorage/FlyDB/lib/proto/raft" + "github.com/hashicorp/raft" + "github.com/stretchr/testify/assert" + "google.golang.org/grpc/test/bufconn" + "net" + "reflect" + "testing" + "time" +) + +type MockClientConn struct { + AppendEntriesPipelineFn func() (raftPB.RaftService_AppendEntriesPipelineClient, error) +} + +func testGetNewRaft(addr string) (*raft.Raft, error) { + // setup Raft configuration + s := store{ + id: addr, + opts: config.DefaultOptions, + } + conf := s.newDefaultConfig() + conf.LocalID = raft.ServerID(addr) + s.conf.LogDataStorage = "inMemory" + s.conf.SnapshotStorage = "memory" + // setup Raft communication + s.conf.LocalAddress = raft.ServerAddress(addr) + l1 := bufconn.Listen(1024) + trans1, err := newTransport(s.conf, l1, newDialOption(s.conf)) + if err != nil { + return nil, err + } + + // create the snapshot store. This allows the Raft to truncate the log. + snapshots, err := newSnapshotStore(s.conf) + if err != nil { + return nil, err + } + + // create the log store and stable store + logStore, err := newRaftLog(s.conf) + if err != nil { + return nil, err + } + stableStore, err := newStableLog(s.conf) + if err != nil { + return nil, err + } + + // create a new finite state machine + f := newFSM() + + // instantiate the Raft system + + return raft.NewRaft(conf, f, logStore, stableStore, snapshots, trans1) +} +func testGetNewTransport(addr string) (*Transport, error) { + // setup Raft configuration + s := store{ + id: addr, + opts: config.DefaultOptions, + } + s.conf.LogDataStorage = "inMemory" + s.conf.SnapshotStorage = "memory" + // setup Raft communication + s.conf.LocalAddress = raft.ServerAddress(addr) + s.conf.HeartbeatTimeout = 200 * time.Millisecond + l1, err := net.Listen("tcp", addr) + if err != nil { + return nil, err + } + do := newDialOption(s.conf) + //do = append(do, grpc.WithContextDialer(func(context.Context, string) (net.Conn, error) { + // return l1.Accept() + //})) + + return newTransport(s.conf, l1, do) +} +func storeTransportTestAppendEntriesResponseFixture() *raft.AppendEntriesResponse { + return &raft.AppendEntriesResponse{ + RPCHeader: storeTransportTestRPCHeaderFixture(), + Term: 0, + LastLog: 0, + Success: false, + NoRetryBackoff: false, + } +} +func storeTransportTestAppendEntriesRequestFixture() *raft.AppendEntriesRequest { + return &raft.AppendEntriesRequest{ + RPCHeader: storeTransportTestRPCHeaderFixture(), + Term: 0, + Leader: nil, + PrevLogEntry: 0, + PrevLogTerm: 0, + Entries: nil, + LeaderCommitIndex: 0, + } +} +func storeTransportTestRPCHeaderFixture() raft.RPCHeader { + return raft.RPCHeader{ + ProtocolVersion: 3, + ID: []byte(raft.ServerID("")), + Addr: []byte(raft.ServerID("127.0.0.1")), + } +} + +func (m *MockClientConn) AppendEntriesPipeline() (raftPB.RaftService_AppendEntriesPipelineClient, error) { + return m.AppendEntriesPipelineFn() +} + +func TestTransport_AppendEntries(t *testing.T) { + ts1, err := testGetNewTransport("127.0.0.1:8006") + assert.NoError(t, err) + ts2, err := testGetNewTransport("127.0.0.1:8007") + assert.NoError(t, err) + defer ts1.Close() + defer ts2.Close() + stop := make(chan struct{}) + + go func() { + for { + select { + case <-stop: + return + case rpc := <-ts2.Consumer(): + if got, want := rpc.Command.(*raft.AppendEntriesRequest).Leader, []byte{3, 2, 1}; !bytes.Equal(got, want) { + t.Errorf("request.Leader = %v, want %v", got, want) + } + if got, want := rpc.Command.(*raft.AppendEntriesRequest).Entries, []*raft.Log{ + {Type: raft.LogNoop, Extensions: []byte{1}, Data: []byte{55}}, + }; !reflect.DeepEqual(got, want) { + t.Errorf("request.Entries = %v, want %v", got, want) + } + rpc.Respond(&raft.AppendEntriesResponse{ + Success: true, + LastLog: 12396, + }, nil) + } + } + }() + + var resp raft.AppendEntriesResponse + if err := ts1.AppendEntries("127.0.0.1:8007", "127.0.0.1:8007", &raft.AppendEntriesRequest{ + RPCHeader: raft.RPCHeader{ + ProtocolVersion: 3, + ID: []byte("127.0.0.1:8006"), + Addr: []byte("127.0.0.1:8006"), + }, + Leader: []byte{3, 2, 1}, + Entries: []*raft.Log{ + {Type: raft.LogNoop, Extensions: []byte{1}, Data: []byte{55}}, + }, + }, &resp); err != nil { + t.Errorf("AppendEntries() failed: %v", err) + } + if got, want := resp.LastLog, uint64(12396); got != want { + t.Errorf("resp.LastLog = %v, want %v", got, want) + } + close(stop) +} +func TestTransport_RequestVote(t *testing.T) { + ts1, err := testGetNewTransport("127.0.0.1:8006") + assert.NoError(t, err) + ts2, err := testGetNewTransport("127.0.0.1:8007") + assert.NoError(t, err) + defer ts1.Close() + defer ts2.Close() + stop := make(chan struct{}) + + go func() { + for { + select { + case <-stop: + return + case rpc := <-ts2.Consumer(): + if got, want := rpc.Command.(*raft.RequestVoteRequest).LastLogIndex, uint64(11); !assert.Equal(t, got, want) { + t.Errorf("request.Leader = %v, want %v", got, want) + } + if got, want := rpc.Command.(*raft.RequestVoteRequest).LastLogTerm, uint64(12); !reflect.DeepEqual(got, want) { + t.Errorf("request.Entries = %v, want %v", got, want) + } + rpc.Respond(&raft.RequestVoteResponse{ + RPCHeader: raft.RPCHeader{ + ProtocolVersion: 3, + ID: []byte(""), + Addr: []byte(""), + }, + Term: 0, + Peers: []byte("hello"), + Granted: false, + }, nil) + } + } + }() + + var resp raft.RequestVoteResponse + if err := ts1.RequestVote("127.0.0.1:8007", "127.0.0.1:8007", &raft.RequestVoteRequest{ + RPCHeader: raft.RPCHeader{ + ProtocolVersion: 3, + ID: []byte("127.0.0.1:8006"), + Addr: []byte("127.0.0.1:8006"), + }, + Term: 10, + Candidate: []byte("127.0.0.1:8006"), + LastLogIndex: 11, + LastLogTerm: 12, + LeadershipTransfer: false, + }, &resp); err != nil { + t.Errorf("RequestVote() failed: %v", err) + } + if got, want := resp.Peers, []byte("hello"); !bytes.Equal(got, want) { + t.Errorf("resp.LastLog = %v, want %v", got, want) + } + close(stop) +} +func TestTransport_TimeoutNow(t *testing.T) { + ts1, err := testGetNewTransport("127.0.0.1:8006") + assert.NoError(t, err) + ts2, err := testGetNewTransport("127.0.0.1:8007") + assert.NoError(t, err) + defer ts1.Close() + defer ts2.Close() + stop := make(chan struct{}) + + go func() { + for { + select { + case <-stop: + return + case rpc := <-ts2.Consumer(): + if got, want := rpc.Command.(*raft.TimeoutNowRequest).ID, []byte("127.0.0.1:8006"); !assert.Equal(t, got, want) { + t.Errorf("request.Leader = %v, want %v", got, want) + } + if got, want := rpc.Command.(*raft.TimeoutNowRequest).ProtocolVersion, raft.ProtocolVersion(3); !assert.Equal(t, got, want) { + t.Errorf("request.Entries = %v, want %v", got, want) + } + rpc.Respond(&raft.TimeoutNowResponse{ + RPCHeader: raft.RPCHeader{ + ProtocolVersion: 3, + ID: []byte("127.0.0.1:8006"), + Addr: []byte("127.0.0.1:8006"), + }, + }, nil) + } + } + }() + + var resp raft.TimeoutNowResponse + if err := ts1.TimeoutNow("127.0.0.1:8007", "127.0.0.1:8007", &raft.TimeoutNowRequest{ + RPCHeader: raft.RPCHeader{ + ProtocolVersion: 3, + ID: []byte("127.0.0.1:8006"), + Addr: []byte("127.0.0.1:8006"), + }, + }, &resp); err != nil { + t.Errorf("RequestVote() failed: %v", err) + } + if got, want := resp.Addr, []byte("127.0.0.1:8006"); !bytes.Equal(got, want) { + t.Errorf("resp.LastLog = %v, want %v", got, want) + } + close(stop) +} +func TestTransport_Consumer(t *testing.T) { + // Create a new transport with local address + conf := config.Config{LocalAddress: "localhost"} + l1 := bufconn.Listen(1024) + trans, err := newTransport(conf, l1, newDialOption(conf)) + assert.NoError(t, err) + + c := trans.Consumer() + assert.NotNil(t, c) +} +func TestTransport_LocalAddr(t *testing.T) { + // Create a new transport with local address + conf := config.Config{LocalAddress: "localhost"} + l1 := bufconn.Listen(1024) + trans, err := newTransport(conf, l1, newDialOption(conf)) + assert.NoError(t, err) + + c := trans.LocalAddr() + assert.NotNil(t, c) +} +func TestTransport_EncodePeer(t *testing.T) { + // Create a new transport with local address + conf := config.Config{LocalAddress: "localhost"} + l1 := bufconn.Listen(1024) + trans, err := newTransport(conf, l1, newDialOption(conf)) + assert.NoError(t, err) + + c := trans.EncodePeer(raft.ServerID("1"), "127.0.0.1") + assert.NotNil(t, c) + assert.Equal(t, c, []byte("127.0.0.1")) +} +func TestTransport_DecodePeer(t *testing.T) { + // Create a new transport with local address + conf := config.Config{LocalAddress: "localhost"} + l1 := bufconn.Listen(1024) + trans, err := newTransport(conf, l1, newDialOption(conf)) + assert.NoError(t, err) + + c := trans.DecodePeer([]byte("127.0.0.1")) + assert.NotNil(t, c) + assert.EqualValues(t, c, "127.0.0.1") +} diff --git a/config/cluster_options.go b/config/cluster_options.go index ec10dddf..8580beb0 100644 --- a/config/cluster_options.go +++ b/config/cluster_options.go @@ -1,6 +1,9 @@ package config -import "time" +import ( + "github.com/hashicorp/raft" + "time" +) type Config struct { ReplicationFactor int @@ -12,4 +15,6 @@ type Config struct { SnapshotStoragePath string LogDataStorageSize int64 HeartbeatInterval time.Duration + HeartbeatTimeout time.Duration + LocalAddress raft.ServerAddress } diff --git a/go.mod b/go.mod index e4f14199..1b50136c 100644 --- a/go.mod +++ b/go.mod @@ -13,9 +13,10 @@ require ( github.com/hashicorp/raft v1.5.0 github.com/hashicorp/raft-boltdb v0.0.0-20230125174641-2a8082862702 github.com/klauspost/reedsolomon v1.11.7 + github.com/linxGnu/grocksdb v1.6.48 + github.com/pkg/errors v0.9.1 github.com/plar/go-adaptive-radix-tree v1.0.5 - github.com/stretchr/testify v1.8.2 - github.com/tecbot/gorocksdb v0.0.0-20191217155057-f0fad39f321c + github.com/stretchr/testify v1.8.3 github.com/tidwall/wal v1.1.7 go.etcd.io/bbolt v1.3.7 go.uber.org/zap v1.24.0 @@ -30,9 +31,6 @@ require ( github.com/desertbit/columnize v2.1.0+incompatible // indirect github.com/desertbit/go-shlex v0.1.1 // indirect github.com/desertbit/readline v1.5.1 // indirect - github.com/facebookgo/ensure v0.0.0-20200202191622-63f1cf65ac4c // indirect - github.com/facebookgo/stack v0.0.0-20160209184415-751773369052 // indirect - github.com/facebookgo/subset v0.0.0-20200203212716-c811ad88dec4 // indirect github.com/golang/protobuf v1.5.3 // indirect github.com/hashicorp/errwrap v1.1.0 // indirect github.com/hashicorp/go-hclog v1.5.0 // indirect @@ -43,7 +41,6 @@ require ( github.com/kr/pretty v0.3.0 // indirect github.com/mattn/go-colorable v0.1.12 // indirect github.com/mattn/go-isatty v0.0.16 // indirect - github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/rogpeppe/go-internal v1.9.0 // indirect github.com/tidwall/gjson v1.14.4 // indirect diff --git a/lib/datastore/inmem.go b/lib/datastore/inmem.go index c958a1f4..34f59567 100644 --- a/lib/datastore/inmem.go +++ b/lib/datastore/inmem.go @@ -25,8 +25,8 @@ type InMemStore struct { func NewLogInMemStorage(conf config.Config) (DataStore, error) { a := &InMemStore{ logs: make(map[uint64]*raft.Log), - kv: map[string][]byte{}, - kvUint: map[string]uint64{}, + kv: make(map[string][]byte), + kvUint: make(map[string]uint64), } return a, nil } @@ -145,11 +145,8 @@ func (ds *InMemStore) GetUint64(key []byte) (uint64, error) { if len(key) == 0 { return 0, _const.ErrKeyIsEmpty } - val, ok := ds.kvUint[string(key)] - if !ok { - return 0, _const.ErrKeyNotFound - } - return val, nil + + return ds.kvUint[string(key)], nil } // min is a helper method on InMemStore that returns the smallest index in the log diff --git a/lib/datastore/inmem_test.go b/lib/datastore/inmem_test.go index 0d6d7aa8..a6e7bf0d 100644 --- a/lib/datastore/inmem_test.go +++ b/lib/datastore/inmem_test.go @@ -292,7 +292,7 @@ func TestInMemStore_SetUint64(t *testing.T) { for _, tc := range tests { // set all inputs for _, v := range tc.input { - err := ds.Set(stringToBytes(v.key), uint64ToBytes(v.val)) + err := ds.SetUint64(stringToBytes(v.key), v.val) if tc.expectError { assert.NotNil(t, err) } else { @@ -301,12 +301,12 @@ func TestInMemStore_SetUint64(t *testing.T) { } // recall all inputs for _, v := range tc.input { - val, err := ds.Get(stringToBytes(v.key)) + val, err := ds.GetUint64(stringToBytes(v.key)) if tc.expectError { assert.NotNil(t, err) } else { assert.NoError(t, err) - assert.Equal(t, v.val, bytesToUint64(val)) + assert.Equal(t, v.val, val) } } @@ -348,27 +348,28 @@ func TestInMemStore_GetUint64(t *testing.T) { {key: "1", val: 2}, {key: "2", val: 4}, }, - expectError: true, + expectError: false, // although key does not exist in the db, it should be created upon request }, } for _, tc := range tests { - ds := testMemoryDatastore() - // set all inputs - for _, v := range tc.input { - _ = ds.Set(stringToBytes(v.key), uint64ToBytes(v.val)) + t.Run(tc.name, func(t *testing.T) { + ds := testMemoryDatastore() + // set all inputs + for _, v := range tc.input { + _ = ds.Set(stringToBytes(v.key), uint64ToBytes(v.val)) - } - // recall all inputs - for _, v := range tc.query { - val, err := ds.Get(stringToBytes(v.key)) - if tc.expectError { - assert.NotNil(t, err) - } else { - assert.NoError(t, err) - assert.Equal(t, v.val, bytesToUint64(val)) } - } - + // recall all inputs + for _, v := range tc.query { + val, err := ds.GetUint64(stringToBytes(v.key)) + if tc.expectError { + assert.NotNil(t, err) + } else { + assert.NoError(t, err) + assert.Equal(t, v.val, val) + } + } + }) } } diff --git a/lib/datastore/rocksdb.go b/lib/datastore/rocksdb.go index 74b3b392..699cedef 100644 --- a/lib/datastore/rocksdb.go +++ b/lib/datastore/rocksdb.go @@ -5,7 +5,7 @@ import ( _const "github.com/ByteStorage/FlyDB/lib/const" "github.com/ByteStorage/FlyDB/lib/encoding" "github.com/hashicorp/raft" - "github.com/tecbot/gorocksdb" + "github.com/linxGnu/grocksdb" "sync" ) @@ -13,16 +13,19 @@ import ( // It uses BoltDB as the underlying storage and a read-write mutex for concurrency control type RocksDbStore struct { mux sync.RWMutex - conn *gorocksdb.DB + conn *grocksdb.DB } // NewLogRocksDbStorage is a function that creates a new RocksDB store // It takes a configuration map as input and returns a raft.LogStore and an error func NewLogRocksDbStorage(conf config.Config) (DataStore, error) { filename := conf.LogDataStoragePath - options := gorocksdb.NewDefaultOptions() + bbto := grocksdb.NewDefaultBlockBasedTableOptions() + bbto.SetBlockCache(grocksdb.NewLRUCache(3 << 30)) + options := grocksdb.NewDefaultOptions() options.SetCreateIfMissing(true) - db, err := gorocksdb.OpenDb(options, filename) + options.SetBlockBasedTableFactory(bbto) + db, err := grocksdb.OpenDb(options, filename) if err != nil { return nil, err } @@ -35,7 +38,7 @@ func NewLogRocksDbStorage(conf config.Config) (DataStore, error) { // FirstIndex is a method on RocksDbStore that returns the first index in the log func (rds *RocksDbStore) FirstIndex() (uint64, error) { - ro := gorocksdb.NewDefaultReadOptions() + ro := grocksdb.NewDefaultReadOptions() defer ro.Destroy() it := rds.conn.NewIterator(ro) it.SeekToFirst() @@ -44,7 +47,7 @@ func (rds *RocksDbStore) FirstIndex() (uint64, error) { // LastIndex is a method on RocksDbStore that returns the last index in the log func (rds *RocksDbStore) LastIndex() (uint64, error) { - ro := gorocksdb.NewDefaultReadOptions() + ro := grocksdb.NewDefaultReadOptions() defer ro.Destroy() it := rds.conn.NewIterator(ro) it.SeekToLast() @@ -53,7 +56,7 @@ func (rds *RocksDbStore) LastIndex() (uint64, error) { // GetLog is a method on RocksDbStore that retrieves a log entry by its index func (rds *RocksDbStore) GetLog(index uint64, log *raft.Log) error { - ro := gorocksdb.NewDefaultReadOptions() + ro := grocksdb.NewDefaultReadOptions() defer ro.Destroy() val, err := rds.conn.Get(ro, uint64ToBytes(index)) @@ -74,8 +77,8 @@ func (rds *RocksDbStore) StoreLog(log *raft.Log) error { // StoreLogs is a method on RocksDbStore that stores multiple log entries func (rds *RocksDbStore) StoreLogs(logs []*raft.Log) error { - wo := gorocksdb.NewDefaultWriteOptions() - wb := gorocksdb.NewWriteBatch() + wo := grocksdb.NewDefaultWriteOptions() + wb := grocksdb.NewWriteBatch() wo.SetSync(true) defer func() { wo.Destroy() @@ -100,9 +103,9 @@ func (rds *RocksDbStore) StoreLogs(logs []*raft.Log) error { // DeleteRange is a method on RocksDbStore that deletes a range of log entries func (rds *RocksDbStore) DeleteRange(min, max uint64) error { - ro := gorocksdb.NewDefaultReadOptions() - wo := gorocksdb.NewDefaultWriteOptions() - wb := gorocksdb.NewWriteBatch() + ro := grocksdb.NewDefaultReadOptions() + wo := grocksdb.NewDefaultWriteOptions() + wb := grocksdb.NewWriteBatch() defer func() { ro.Destroy() wo.Destroy() @@ -120,7 +123,7 @@ func (rds *RocksDbStore) Set(key []byte, val []byte) error { if len(key) == 0 { return _const.ErrKeyIsEmpty } - wo := gorocksdb.NewDefaultWriteOptions() + wo := grocksdb.NewDefaultWriteOptions() wo.SetSync(true) defer wo.Destroy() err := rds.conn.Put(wo, key, val) @@ -134,7 +137,7 @@ func (rds *RocksDbStore) Get(key []byte) ([]byte, error) { if len(key) == 0 { return nil, _const.ErrKeyIsEmpty } - ro := gorocksdb.NewDefaultReadOptions() + ro := grocksdb.NewDefaultReadOptions() defer ro.Destroy() val, err := rds.conn.Get(ro, key) @@ -151,7 +154,7 @@ func (rds *RocksDbStore) SetUint64(key []byte, val uint64) error { if len(key) == 0 { return _const.ErrKeyIsEmpty } - wo := gorocksdb.NewDefaultWriteOptions() + wo := grocksdb.NewDefaultWriteOptions() wo.SetSync(true) defer wo.Destroy() err := rds.conn.Put(wo, key, uint64ToBytes(val)) @@ -165,7 +168,7 @@ func (rds *RocksDbStore) GetUint64(key []byte) (uint64, error) { if len(key) == 0 { return 0, _const.ErrKeyIsEmpty } - ro := gorocksdb.NewDefaultReadOptions() + ro := grocksdb.NewDefaultReadOptions() defer ro.Destroy() val, err := rds.conn.Get(ro, key) diff --git a/lib/encoding/raft.go b/lib/encoding/raft.go new file mode 100644 index 00000000..43d63767 --- /dev/null +++ b/lib/encoding/raft.go @@ -0,0 +1,243 @@ +package encoding + +import ( + raftPB "github.com/ByteStorage/FlyDB/lib/proto/raft" + "github.com/hashicorp/raft" + "google.golang.org/protobuf/types/known/timestamppb" +) + +func EncodeRPCHeader(s raft.RPCHeader) *raftPB.RPCHeader { + return &raftPB.RPCHeader{ + ProtocolVersion: int64(s.ProtocolVersion), + ID: s.ID, + Addr: s.Addr, + } +} +func DecodeRPCHeader(m *raftPB.RPCHeader) raft.RPCHeader { + return raft.RPCHeader{ + ProtocolVersion: raft.ProtocolVersion(m.ProtocolVersion), + ID: m.ID, + Addr: m.Addr, + } +} + +func EncodeRequestVoteRequest(s *raft.RequestVoteRequest) *raftPB.RequestVoteRequest { + return &raftPB.RequestVoteRequest{ + RpcHeader: EncodeRPCHeader(s.RPCHeader), + Term: s.Term, + Candidate: s.Candidate, + LastLogIndex: s.LastLogIndex, + LastLogTerm: s.LastLogTerm, + LeadershipTransfer: s.LeadershipTransfer, + } +} +func DecodeRequestVoteRequest(s *raftPB.RequestVoteRequest) *raft.RequestVoteRequest { + return &raft.RequestVoteRequest{ + RPCHeader: DecodeRPCHeader(s.RpcHeader), + Term: s.Term, + Candidate: s.Candidate, + LastLogIndex: s.LastLogIndex, + LastLogTerm: s.LastLogTerm, + LeadershipTransfer: s.LeadershipTransfer, + } +} + +func DecodeRequestVoteResponse(m *raftPB.RequestVoteResponse) *raft.RequestVoteResponse { + return &raft.RequestVoteResponse{ + RPCHeader: DecodeRPCHeader(m.RpcHeader), + Term: m.Term, + Peers: m.Peers, + Granted: m.Granted, + } +} +func EncodeRequestVoteResponse(m *raft.RequestVoteResponse) *raftPB.RequestVoteResponse { + return &raftPB.RequestVoteResponse{ + RpcHeader: EncodeRPCHeader(m.RPCHeader), + Term: m.Term, + Peers: m.Peers, + Granted: m.Granted, + } +} + +func EncodeAppendEntriesRequest(s *raft.AppendEntriesRequest) *raftPB.AppendEntriesRequest { + return &raftPB.AppendEntriesRequest{ + RpcHeader: EncodeRPCHeader(s.RPCHeader), + Term: s.Term, + Leader: s.Leader, + PrevLogEntry: s.PrevLogEntry, + PrevLogTerm: s.PrevLogTerm, + Entries: encodeLogs(s.Entries), + LeaderCommitIndex: s.LeaderCommitIndex, + } +} +func DecodeAppendEntriesRequest(m *raftPB.AppendEntriesRequest) *raft.AppendEntriesRequest { + return &raft.AppendEntriesRequest{ + RPCHeader: DecodeRPCHeader(m.RpcHeader), + Term: m.Term, + Leader: m.Leader, + PrevLogEntry: m.PrevLogEntry, + PrevLogTerm: m.PrevLogTerm, + Entries: decodeLogs(m.Entries), + LeaderCommitIndex: m.LeaderCommitIndex, + } +} + +func DecodeAppendEntriesResponse(m *raftPB.AppendEntriesResponse) *raft.AppendEntriesResponse { + return &raft.AppendEntriesResponse{ + RPCHeader: DecodeRPCHeader(m.RpcHeader), + Term: m.Term, + LastLog: m.LastLog, + Success: m.Success, + NoRetryBackoff: m.NoRetryBackoff, + } +} +func EncodeAppendEntriesResponse(m *raft.AppendEntriesResponse) *raftPB.AppendEntriesResponse { + return &raftPB.AppendEntriesResponse{ + RpcHeader: EncodeRPCHeader(m.RPCHeader), + Term: m.Term, + LastLog: m.LastLog, + Success: m.Success, + NoRetryBackoff: m.NoRetryBackoff, + } +} + +func EncodeTimeoutNowRequest(s *raft.TimeoutNowRequest) *raftPB.TimeoutNowRequest { + return &raftPB.TimeoutNowRequest{ + RpcHeader: EncodeRPCHeader(s.RPCHeader), + } +} +func DecodeTimeoutNowRequest(s *raftPB.TimeoutNowRequest) *raft.TimeoutNowRequest { + return &raft.TimeoutNowRequest{ + RPCHeader: DecodeRPCHeader(s.RpcHeader), + } +} + +func DecodeTimeoutNowResponse(m *raftPB.TimeoutNowResponse) *raft.TimeoutNowResponse { + return &raft.TimeoutNowResponse{ + RPCHeader: DecodeRPCHeader(m.RpcHeader), + } +} +func EncodeTimeoutNowResponse(m *raft.TimeoutNowResponse) *raftPB.TimeoutNowResponse { + return &raftPB.TimeoutNowResponse{ + RpcHeader: EncodeRPCHeader(m.RPCHeader), + } +} + +func encodeLogs(s []*raft.Log) []*raftPB.Log { + ret := make([]*raftPB.Log, len(s)) + for i, l := range s { + ret[i] = encodeLog(l) + } + return ret +} +func decodeLogs(ls []*raftPB.Log) []*raft.Log { + ret := make([]*raft.Log, len(ls)) + for i, l := range ls { + ret[i] = decodeLog(l) + } + return ret +} + +func encodeLog(l *raft.Log) *raftPB.Log { + ret := &raftPB.Log{ + Index: l.Index, + Term: l.Term, + Type: encodeLogType(l.Type), + Data: l.Data, + Extensions: l.Extensions, + AppendedAt: timestamppb.New(l.AppendedAt), + } + return ret +} +func decodeLog(l *raftPB.Log) *raft.Log { + ret := &raft.Log{ + Index: l.Index, + Term: l.Term, + Type: decodeLogType(l.Type), + Data: l.Data, + Extensions: l.Extensions, + AppendedAt: l.AppendedAt.AsTime(), + } + return ret +} + +func encodeLogType(s raft.LogType) raftPB.Log_LogType { + switch s { + case raft.LogCommand: + return raftPB.Log_LOG_COMMAND + case raft.LogNoop: + return raftPB.Log_LOG_NOOP + case raft.LogAddPeerDeprecated: + return raftPB.Log_LOG_ADD_PEER_DEPRECATED + case raft.LogRemovePeerDeprecated: + return raftPB.Log_LOG_REMOVE_PEER_DEPRECATED + case raft.LogBarrier: + return raftPB.Log_LOG_BARRIER + case raft.LogConfiguration: + return raftPB.Log_LOG_CONFIGURATION + default: + panic("invalid LogType") + } +} +func decodeLogType(s raftPB.Log_LogType) raft.LogType { + switch s { + case raftPB.Log_LOG_COMMAND: + return raft.LogCommand + case raftPB.Log_LOG_NOOP: + return raft.LogNoop + case raftPB.Log_LOG_ADD_PEER_DEPRECATED: + return raft.LogAddPeerDeprecated + case raftPB.Log_LOG_REMOVE_PEER_DEPRECATED: + return raft.LogRemovePeerDeprecated + case raftPB.Log_LOG_BARRIER: + return raft.LogBarrier + case raftPB.Log_LOG_CONFIGURATION: + return raft.LogConfiguration + default: + panic("invalid LogType") + } +} + +func EncodeInstallSnapshotRequest(s *raft.InstallSnapshotRequest) *raftPB.InstallSnapshotRequest { + return &raftPB.InstallSnapshotRequest{ + RpcHeader: EncodeRPCHeader(s.RPCHeader), + SnapshotVersion: int64(s.SnapshotVersion), + Term: s.Term, + Leader: s.Leader, + LastLogIndex: s.LastLogIndex, + LastLogTerm: s.LastLogTerm, + Peers: s.Peers, + Configuration: s.Configuration, + ConfigurationIndex: s.ConfigurationIndex, + Size: s.Size, + } +} +func DecodeInstallSnapshotRequest(s *raftPB.InstallSnapshotRequest) *raft.InstallSnapshotRequest { + return &raft.InstallSnapshotRequest{ + RPCHeader: DecodeRPCHeader(s.RpcHeader), + SnapshotVersion: raft.SnapshotVersion(s.SnapshotVersion), + Term: s.Term, + Leader: s.Leader, + LastLogIndex: s.LastLogIndex, + LastLogTerm: s.LastLogTerm, + Peers: s.Peers, + Configuration: s.Configuration, + ConfigurationIndex: s.ConfigurationIndex, + Size: s.Size, + } +} + +func DecodeInstallSnapshotResponse(m *raftPB.InstallSnapshotResponse) *raft.InstallSnapshotResponse { + return &raft.InstallSnapshotResponse{ + RPCHeader: DecodeRPCHeader(m.RpcHeader), + Term: m.Term, + Success: m.Success, + } +} +func EncodeInstallSnapshotResponse(m *raft.InstallSnapshotResponse) *raftPB.InstallSnapshotResponse { + return &raftPB.InstallSnapshotResponse{ + RpcHeader: EncodeRPCHeader(m.RPCHeader), + Term: m.Term, + Success: m.Success, + } +} diff --git a/lib/proto/raft/doc.go b/lib/proto/raft/doc.go new file mode 100644 index 00000000..7b2256f8 --- /dev/null +++ b/lib/proto/raft/doc.go @@ -0,0 +1,5 @@ +package raftPB + +// note: run ```go install google.golang.org/protobuf/cmd/protoc-gen-go@v1.28``` +// note: run ```go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@v1.2``` +//go:generate protoc --go_out=. --go-grpc_out=. ./raft.proto diff --git a/lib/proto/raft/raft.pb.go b/lib/proto/raft/raft.pb.go new file mode 100644 index 00000000..c669f42a --- /dev/null +++ b/lib/proto/raft/raft.pb.go @@ -0,0 +1,1212 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.25.0-devel +// protoc v3.14.0 +// source: raft.proto + +package raftPB + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + timestamppb "google.golang.org/protobuf/types/known/timestamppb" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type Log_LogType int32 + +const ( + Log_LOG_COMMAND Log_LogType = 0 + Log_LOG_NOOP Log_LogType = 1 + Log_LOG_ADD_PEER_DEPRECATED Log_LogType = 2 + Log_LOG_REMOVE_PEER_DEPRECATED Log_LogType = 3 + Log_LOG_BARRIER Log_LogType = 4 + Log_LOG_CONFIGURATION Log_LogType = 5 +) + +// Enum value maps for Log_LogType. +var ( + Log_LogType_name = map[int32]string{ + 0: "LOG_COMMAND", + 1: "LOG_NOOP", + 2: "LOG_ADD_PEER_DEPRECATED", + 3: "LOG_REMOVE_PEER_DEPRECATED", + 4: "LOG_BARRIER", + 5: "LOG_CONFIGURATION", + } + Log_LogType_value = map[string]int32{ + "LOG_COMMAND": 0, + "LOG_NOOP": 1, + "LOG_ADD_PEER_DEPRECATED": 2, + "LOG_REMOVE_PEER_DEPRECATED": 3, + "LOG_BARRIER": 4, + "LOG_CONFIGURATION": 5, + } +) + +func (x Log_LogType) Enum() *Log_LogType { + p := new(Log_LogType) + *p = x + return p +} + +func (x Log_LogType) String() string { + return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) +} + +func (Log_LogType) Descriptor() protoreflect.EnumDescriptor { + return file_raft_proto_enumTypes[0].Descriptor() +} + +func (Log_LogType) Type() protoreflect.EnumType { + return &file_raft_proto_enumTypes[0] +} + +func (x Log_LogType) Number() protoreflect.EnumNumber { + return protoreflect.EnumNumber(x) +} + +// Deprecated: Use Log_LogType.Descriptor instead. +func (Log_LogType) EnumDescriptor() ([]byte, []int) { + return file_raft_proto_rawDescGZIP(), []int{4, 0} +} + +type RPCHeader struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + ProtocolVersion int64 `protobuf:"varint,1,opt,name=ProtocolVersion,proto3" json:"ProtocolVersion,omitempty"` + ID []byte `protobuf:"bytes,2,opt,name=ID,proto3" json:"ID,omitempty"` + Addr []byte `protobuf:"bytes,3,opt,name=Addr,proto3" json:"Addr,omitempty"` +} + +func (x *RPCHeader) Reset() { + *x = RPCHeader{} + if protoimpl.UnsafeEnabled { + mi := &file_raft_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *RPCHeader) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*RPCHeader) ProtoMessage() {} + +func (x *RPCHeader) ProtoReflect() protoreflect.Message { + mi := &file_raft_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use RPCHeader.ProtoReflect.Descriptor instead. +func (*RPCHeader) Descriptor() ([]byte, []int) { + return file_raft_proto_rawDescGZIP(), []int{0} +} + +func (x *RPCHeader) GetProtocolVersion() int64 { + if x != nil { + return x.ProtocolVersion + } + return 0 +} + +func (x *RPCHeader) GetID() []byte { + if x != nil { + return x.ID + } + return nil +} + +func (x *RPCHeader) GetAddr() []byte { + if x != nil { + return x.Addr + } + return nil +} + +// RequestVoteRequest are initiated by node in Candidate state, on election. +// Fields matched to condensed summary of Raft consensus algorithm in ISUCA. +type RequestVoteRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + RpcHeader *RPCHeader `protobuf:"bytes,1,opt,name=rpc_header,json=rpcHeader,proto3" json:"rpc_header,omitempty"` + Term uint64 `protobuf:"varint,2,opt,name=term,proto3" json:"term,omitempty"` + Candidate []byte `protobuf:"bytes,3,opt,name=candidate,proto3" json:"candidate,omitempty"` + LastLogIndex uint64 `protobuf:"varint,4,opt,name=last_log_index,json=lastLogIndex,proto3" json:"last_log_index,omitempty"` + LastLogTerm uint64 `protobuf:"varint,5,opt,name=last_log_term,json=lastLogTerm,proto3" json:"last_log_term,omitempty"` + LeadershipTransfer bool `protobuf:"varint,6,opt,name=leadership_transfer,json=leadershipTransfer,proto3" json:"leadership_transfer,omitempty"` +} + +func (x *RequestVoteRequest) Reset() { + *x = RequestVoteRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_raft_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *RequestVoteRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*RequestVoteRequest) ProtoMessage() {} + +func (x *RequestVoteRequest) ProtoReflect() protoreflect.Message { + mi := &file_raft_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use RequestVoteRequest.ProtoReflect.Descriptor instead. +func (*RequestVoteRequest) Descriptor() ([]byte, []int) { + return file_raft_proto_rawDescGZIP(), []int{1} +} + +func (x *RequestVoteRequest) GetRpcHeader() *RPCHeader { + if x != nil { + return x.RpcHeader + } + return nil +} + +func (x *RequestVoteRequest) GetTerm() uint64 { + if x != nil { + return x.Term + } + return 0 +} + +func (x *RequestVoteRequest) GetCandidate() []byte { + if x != nil { + return x.Candidate + } + return nil +} + +func (x *RequestVoteRequest) GetLastLogIndex() uint64 { + if x != nil { + return x.LastLogIndex + } + return 0 +} + +func (x *RequestVoteRequest) GetLastLogTerm() uint64 { + if x != nil { + return x.LastLogTerm + } + return 0 +} + +func (x *RequestVoteRequest) GetLeadershipTransfer() bool { + if x != nil { + return x.LeadershipTransfer + } + return false +} + +type RequestVoteResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + RpcHeader *RPCHeader `protobuf:"bytes,1,opt,name=rpc_header,json=rpcHeader,proto3" json:"rpc_header,omitempty"` + Term uint64 `protobuf:"varint,2,opt,name=term,proto3" json:"term,omitempty"` + Peers []byte `protobuf:"bytes,3,opt,name=peers,proto3" json:"peers,omitempty"` // deprecated as of version 2+ + Granted bool `protobuf:"varint,4,opt,name=granted,proto3" json:"granted,omitempty"` +} + +func (x *RequestVoteResponse) Reset() { + *x = RequestVoteResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_raft_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *RequestVoteResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*RequestVoteResponse) ProtoMessage() {} + +func (x *RequestVoteResponse) ProtoReflect() protoreflect.Message { + mi := &file_raft_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use RequestVoteResponse.ProtoReflect.Descriptor instead. +func (*RequestVoteResponse) Descriptor() ([]byte, []int) { + return file_raft_proto_rawDescGZIP(), []int{2} +} + +func (x *RequestVoteResponse) GetRpcHeader() *RPCHeader { + if x != nil { + return x.RpcHeader + } + return nil +} + +func (x *RequestVoteResponse) GetTerm() uint64 { + if x != nil { + return x.Term + } + return 0 +} + +func (x *RequestVoteResponse) GetPeers() []byte { + if x != nil { + return x.Peers + } + return nil +} + +func (x *RequestVoteResponse) GetGranted() bool { + if x != nil { + return x.Granted + } + return false +} + +type AppendEntriesRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + RpcHeader *RPCHeader `protobuf:"bytes,1,opt,name=rpc_header,json=rpcHeader,proto3" json:"rpc_header,omitempty"` + Term uint64 `protobuf:"varint,2,opt,name=term,proto3" json:"term,omitempty"` + Leader []byte `protobuf:"bytes,3,opt,name=leader,proto3" json:"leader,omitempty"` + PrevLogEntry uint64 `protobuf:"varint,4,opt,name=prev_log_entry,json=prevLogEntry,proto3" json:"prev_log_entry,omitempty"` + PrevLogTerm uint64 `protobuf:"varint,5,opt,name=prev_log_term,json=prevLogTerm,proto3" json:"prev_log_term,omitempty"` + Entries []*Log `protobuf:"bytes,6,rep,name=entries,proto3" json:"entries,omitempty"` + LeaderCommitIndex uint64 `protobuf:"varint,7,opt,name=leader_commit_index,json=leaderCommitIndex,proto3" json:"leader_commit_index,omitempty"` +} + +func (x *AppendEntriesRequest) Reset() { + *x = AppendEntriesRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_raft_proto_msgTypes[3] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *AppendEntriesRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*AppendEntriesRequest) ProtoMessage() {} + +func (x *AppendEntriesRequest) ProtoReflect() protoreflect.Message { + mi := &file_raft_proto_msgTypes[3] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use AppendEntriesRequest.ProtoReflect.Descriptor instead. +func (*AppendEntriesRequest) Descriptor() ([]byte, []int) { + return file_raft_proto_rawDescGZIP(), []int{3} +} + +func (x *AppendEntriesRequest) GetRpcHeader() *RPCHeader { + if x != nil { + return x.RpcHeader + } + return nil +} + +func (x *AppendEntriesRequest) GetTerm() uint64 { + if x != nil { + return x.Term + } + return 0 +} + +func (x *AppendEntriesRequest) GetLeader() []byte { + if x != nil { + return x.Leader + } + return nil +} + +func (x *AppendEntriesRequest) GetPrevLogEntry() uint64 { + if x != nil { + return x.PrevLogEntry + } + return 0 +} + +func (x *AppendEntriesRequest) GetPrevLogTerm() uint64 { + if x != nil { + return x.PrevLogTerm + } + return 0 +} + +func (x *AppendEntriesRequest) GetEntries() []*Log { + if x != nil { + return x.Entries + } + return nil +} + +func (x *AppendEntriesRequest) GetLeaderCommitIndex() uint64 { + if x != nil { + return x.LeaderCommitIndex + } + return 0 +} + +type Log struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Index uint64 `protobuf:"varint,1,opt,name=index,proto3" json:"index,omitempty"` + Term uint64 `protobuf:"varint,2,opt,name=term,proto3" json:"term,omitempty"` + Type Log_LogType `protobuf:"varint,3,opt,name=type,proto3,enum=raft_pb.Log_LogType" json:"type,omitempty"` + Data []byte `protobuf:"bytes,4,opt,name=data,proto3" json:"data,omitempty"` + Extensions []byte `protobuf:"bytes,5,opt,name=extensions,proto3" json:"extensions,omitempty"` + AppendedAt *timestamppb.Timestamp `protobuf:"bytes,6,opt,name=appended_at,json=appendedAt,proto3" json:"appended_at,omitempty"` +} + +func (x *Log) Reset() { + *x = Log{} + if protoimpl.UnsafeEnabled { + mi := &file_raft_proto_msgTypes[4] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Log) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Log) ProtoMessage() {} + +func (x *Log) ProtoReflect() protoreflect.Message { + mi := &file_raft_proto_msgTypes[4] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Log.ProtoReflect.Descriptor instead. +func (*Log) Descriptor() ([]byte, []int) { + return file_raft_proto_rawDescGZIP(), []int{4} +} + +func (x *Log) GetIndex() uint64 { + if x != nil { + return x.Index + } + return 0 +} + +func (x *Log) GetTerm() uint64 { + if x != nil { + return x.Term + } + return 0 +} + +func (x *Log) GetType() Log_LogType { + if x != nil { + return x.Type + } + return Log_LOG_COMMAND +} + +func (x *Log) GetData() []byte { + if x != nil { + return x.Data + } + return nil +} + +func (x *Log) GetExtensions() []byte { + if x != nil { + return x.Extensions + } + return nil +} + +func (x *Log) GetAppendedAt() *timestamppb.Timestamp { + if x != nil { + return x.AppendedAt + } + return nil +} + +type AppendEntriesResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + RpcHeader *RPCHeader `protobuf:"bytes,1,opt,name=rpc_header,json=rpcHeader,proto3" json:"rpc_header,omitempty"` + Term uint64 `protobuf:"varint,2,opt,name=term,proto3" json:"term,omitempty"` + LastLog uint64 `protobuf:"varint,3,opt,name=last_log,json=lastLog,proto3" json:"last_log,omitempty"` + Success bool `protobuf:"varint,4,opt,name=success,proto3" json:"success,omitempty"` + NoRetryBackoff bool `protobuf:"varint,5,opt,name=no_retry_backoff,json=noRetryBackoff,proto3" json:"no_retry_backoff,omitempty"` +} + +func (x *AppendEntriesResponse) Reset() { + *x = AppendEntriesResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_raft_proto_msgTypes[5] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *AppendEntriesResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*AppendEntriesResponse) ProtoMessage() {} + +func (x *AppendEntriesResponse) ProtoReflect() protoreflect.Message { + mi := &file_raft_proto_msgTypes[5] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use AppendEntriesResponse.ProtoReflect.Descriptor instead. +func (*AppendEntriesResponse) Descriptor() ([]byte, []int) { + return file_raft_proto_rawDescGZIP(), []int{5} +} + +func (x *AppendEntriesResponse) GetRpcHeader() *RPCHeader { + if x != nil { + return x.RpcHeader + } + return nil +} + +func (x *AppendEntriesResponse) GetTerm() uint64 { + if x != nil { + return x.Term + } + return 0 +} + +func (x *AppendEntriesResponse) GetLastLog() uint64 { + if x != nil { + return x.LastLog + } + return 0 +} + +func (x *AppendEntriesResponse) GetSuccess() bool { + if x != nil { + return x.Success + } + return false +} + +func (x *AppendEntriesResponse) GetNoRetryBackoff() bool { + if x != nil { + return x.NoRetryBackoff + } + return false +} + +type TimeoutNowRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + RpcHeader *RPCHeader `protobuf:"bytes,1,opt,name=rpc_header,json=rpcHeader,proto3" json:"rpc_header,omitempty"` +} + +func (x *TimeoutNowRequest) Reset() { + *x = TimeoutNowRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_raft_proto_msgTypes[6] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *TimeoutNowRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*TimeoutNowRequest) ProtoMessage() {} + +func (x *TimeoutNowRequest) ProtoReflect() protoreflect.Message { + mi := &file_raft_proto_msgTypes[6] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use TimeoutNowRequest.ProtoReflect.Descriptor instead. +func (*TimeoutNowRequest) Descriptor() ([]byte, []int) { + return file_raft_proto_rawDescGZIP(), []int{6} +} + +func (x *TimeoutNowRequest) GetRpcHeader() *RPCHeader { + if x != nil { + return x.RpcHeader + } + return nil +} + +type TimeoutNowResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + RpcHeader *RPCHeader `protobuf:"bytes,1,opt,name=rpc_header,json=rpcHeader,proto3" json:"rpc_header,omitempty"` +} + +func (x *TimeoutNowResponse) Reset() { + *x = TimeoutNowResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_raft_proto_msgTypes[7] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *TimeoutNowResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*TimeoutNowResponse) ProtoMessage() {} + +func (x *TimeoutNowResponse) ProtoReflect() protoreflect.Message { + mi := &file_raft_proto_msgTypes[7] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use TimeoutNowResponse.ProtoReflect.Descriptor instead. +func (*TimeoutNowResponse) Descriptor() ([]byte, []int) { + return file_raft_proto_rawDescGZIP(), []int{7} +} + +func (x *TimeoutNowResponse) GetRpcHeader() *RPCHeader { + if x != nil { + return x.RpcHeader + } + return nil +} + +type InstallSnapshotRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + RpcHeader *RPCHeader `protobuf:"bytes,1,opt,name=rpc_header,json=rpcHeader,proto3" json:"rpc_header,omitempty"` + SnapshotVersion int64 `protobuf:"varint,2,opt,name=snapshot_version,json=snapshotVersion,proto3" json:"snapshot_version,omitempty"` + Term uint64 `protobuf:"varint,3,opt,name=term,proto3" json:"term,omitempty"` + Leader []byte `protobuf:"bytes,4,opt,name=leader,proto3" json:"leader,omitempty"` + LastLogIndex uint64 `protobuf:"varint,5,opt,name=last_log_index,json=lastLogIndex,proto3" json:"last_log_index,omitempty"` + LastLogTerm uint64 `protobuf:"varint,6,opt,name=last_log_term,json=lastLogTerm,proto3" json:"last_log_term,omitempty"` + Peers []byte `protobuf:"bytes,7,opt,name=peers,proto3" json:"peers,omitempty"` + Configuration []byte `protobuf:"bytes,8,opt,name=configuration,proto3" json:"configuration,omitempty"` + ConfigurationIndex uint64 `protobuf:"varint,9,opt,name=configuration_index,json=configurationIndex,proto3" json:"configuration_index,omitempty"` + Size int64 `protobuf:"varint,10,opt,name=size,proto3" json:"size,omitempty"` + Data []byte `protobuf:"bytes,11,opt,name=data,proto3" json:"data,omitempty"` +} + +func (x *InstallSnapshotRequest) Reset() { + *x = InstallSnapshotRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_raft_proto_msgTypes[8] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *InstallSnapshotRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*InstallSnapshotRequest) ProtoMessage() {} + +func (x *InstallSnapshotRequest) ProtoReflect() protoreflect.Message { + mi := &file_raft_proto_msgTypes[8] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use InstallSnapshotRequest.ProtoReflect.Descriptor instead. +func (*InstallSnapshotRequest) Descriptor() ([]byte, []int) { + return file_raft_proto_rawDescGZIP(), []int{8} +} + +func (x *InstallSnapshotRequest) GetRpcHeader() *RPCHeader { + if x != nil { + return x.RpcHeader + } + return nil +} + +func (x *InstallSnapshotRequest) GetSnapshotVersion() int64 { + if x != nil { + return x.SnapshotVersion + } + return 0 +} + +func (x *InstallSnapshotRequest) GetTerm() uint64 { + if x != nil { + return x.Term + } + return 0 +} + +func (x *InstallSnapshotRequest) GetLeader() []byte { + if x != nil { + return x.Leader + } + return nil +} + +func (x *InstallSnapshotRequest) GetLastLogIndex() uint64 { + if x != nil { + return x.LastLogIndex + } + return 0 +} + +func (x *InstallSnapshotRequest) GetLastLogTerm() uint64 { + if x != nil { + return x.LastLogTerm + } + return 0 +} + +func (x *InstallSnapshotRequest) GetPeers() []byte { + if x != nil { + return x.Peers + } + return nil +} + +func (x *InstallSnapshotRequest) GetConfiguration() []byte { + if x != nil { + return x.Configuration + } + return nil +} + +func (x *InstallSnapshotRequest) GetConfigurationIndex() uint64 { + if x != nil { + return x.ConfigurationIndex + } + return 0 +} + +func (x *InstallSnapshotRequest) GetSize() int64 { + if x != nil { + return x.Size + } + return 0 +} + +func (x *InstallSnapshotRequest) GetData() []byte { + if x != nil { + return x.Data + } + return nil +} + +type InstallSnapshotResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + RpcHeader *RPCHeader `protobuf:"bytes,1,opt,name=rpc_header,json=rpcHeader,proto3" json:"rpc_header,omitempty"` + Term uint64 `protobuf:"varint,2,opt,name=term,proto3" json:"term,omitempty"` + Success bool `protobuf:"varint,3,opt,name=success,proto3" json:"success,omitempty"` +} + +func (x *InstallSnapshotResponse) Reset() { + *x = InstallSnapshotResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_raft_proto_msgTypes[9] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *InstallSnapshotResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*InstallSnapshotResponse) ProtoMessage() {} + +func (x *InstallSnapshotResponse) ProtoReflect() protoreflect.Message { + mi := &file_raft_proto_msgTypes[9] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use InstallSnapshotResponse.ProtoReflect.Descriptor instead. +func (*InstallSnapshotResponse) Descriptor() ([]byte, []int) { + return file_raft_proto_rawDescGZIP(), []int{9} +} + +func (x *InstallSnapshotResponse) GetRpcHeader() *RPCHeader { + if x != nil { + return x.RpcHeader + } + return nil +} + +func (x *InstallSnapshotResponse) GetTerm() uint64 { + if x != nil { + return x.Term + } + return 0 +} + +func (x *InstallSnapshotResponse) GetSuccess() bool { + if x != nil { + return x.Success + } + return false +} + +var File_raft_proto protoreflect.FileDescriptor + +var file_raft_proto_rawDesc = []byte{ + 0x0a, 0x0a, 0x72, 0x61, 0x66, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x07, 0x72, 0x61, + 0x66, 0x74, 0x5f, 0x70, 0x62, 0x1a, 0x1f, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, + 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x59, 0x0a, 0x09, 0x52, 0x50, 0x43, 0x48, 0x65, 0x61, + 0x64, 0x65, 0x72, 0x12, 0x28, 0x0a, 0x0f, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x56, + 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0f, 0x50, 0x72, + 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x0e, 0x0a, + 0x02, 0x49, 0x44, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x02, 0x49, 0x44, 0x12, 0x12, 0x0a, + 0x04, 0x41, 0x64, 0x64, 0x72, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x41, 0x64, 0x64, + 0x72, 0x22, 0xf4, 0x01, 0x0a, 0x12, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x56, 0x6f, 0x74, + 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x31, 0x0a, 0x0a, 0x72, 0x70, 0x63, 0x5f, + 0x68, 0x65, 0x61, 0x64, 0x65, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x72, + 0x61, 0x66, 0x74, 0x5f, 0x70, 0x62, 0x2e, 0x52, 0x50, 0x43, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, + 0x52, 0x09, 0x72, 0x70, 0x63, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x12, 0x12, 0x0a, 0x04, 0x74, + 0x65, 0x72, 0x6d, 0x18, 0x02, 0x20, 0x01, 0x28, 0x04, 0x52, 0x04, 0x74, 0x65, 0x72, 0x6d, 0x12, + 0x1c, 0x0a, 0x09, 0x63, 0x61, 0x6e, 0x64, 0x69, 0x64, 0x61, 0x74, 0x65, 0x18, 0x03, 0x20, 0x01, + 0x28, 0x0c, 0x52, 0x09, 0x63, 0x61, 0x6e, 0x64, 0x69, 0x64, 0x61, 0x74, 0x65, 0x12, 0x24, 0x0a, + 0x0e, 0x6c, 0x61, 0x73, 0x74, 0x5f, 0x6c, 0x6f, 0x67, 0x5f, 0x69, 0x6e, 0x64, 0x65, 0x78, 0x18, + 0x04, 0x20, 0x01, 0x28, 0x04, 0x52, 0x0c, 0x6c, 0x61, 0x73, 0x74, 0x4c, 0x6f, 0x67, 0x49, 0x6e, + 0x64, 0x65, 0x78, 0x12, 0x22, 0x0a, 0x0d, 0x6c, 0x61, 0x73, 0x74, 0x5f, 0x6c, 0x6f, 0x67, 0x5f, + 0x74, 0x65, 0x72, 0x6d, 0x18, 0x05, 0x20, 0x01, 0x28, 0x04, 0x52, 0x0b, 0x6c, 0x61, 0x73, 0x74, + 0x4c, 0x6f, 0x67, 0x54, 0x65, 0x72, 0x6d, 0x12, 0x2f, 0x0a, 0x13, 0x6c, 0x65, 0x61, 0x64, 0x65, + 0x72, 0x73, 0x68, 0x69, 0x70, 0x5f, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x66, 0x65, 0x72, 0x18, 0x06, + 0x20, 0x01, 0x28, 0x08, 0x52, 0x12, 0x6c, 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, 0x68, 0x69, 0x70, + 0x54, 0x72, 0x61, 0x6e, 0x73, 0x66, 0x65, 0x72, 0x22, 0x8c, 0x01, 0x0a, 0x13, 0x52, 0x65, 0x71, + 0x75, 0x65, 0x73, 0x74, 0x56, 0x6f, 0x74, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, + 0x12, 0x31, 0x0a, 0x0a, 0x72, 0x70, 0x63, 0x5f, 0x68, 0x65, 0x61, 0x64, 0x65, 0x72, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x72, 0x61, 0x66, 0x74, 0x5f, 0x70, 0x62, 0x2e, 0x52, + 0x50, 0x43, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x52, 0x09, 0x72, 0x70, 0x63, 0x48, 0x65, 0x61, + 0x64, 0x65, 0x72, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x65, 0x72, 0x6d, 0x18, 0x02, 0x20, 0x01, 0x28, + 0x04, 0x52, 0x04, 0x74, 0x65, 0x72, 0x6d, 0x12, 0x14, 0x0a, 0x05, 0x70, 0x65, 0x65, 0x72, 0x73, + 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, 0x70, 0x65, 0x65, 0x72, 0x73, 0x12, 0x18, 0x0a, + 0x07, 0x67, 0x72, 0x61, 0x6e, 0x74, 0x65, 0x64, 0x18, 0x04, 0x20, 0x01, 0x28, 0x08, 0x52, 0x07, + 0x67, 0x72, 0x61, 0x6e, 0x74, 0x65, 0x64, 0x22, 0x97, 0x02, 0x0a, 0x14, 0x41, 0x70, 0x70, 0x65, + 0x6e, 0x64, 0x45, 0x6e, 0x74, 0x72, 0x69, 0x65, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, + 0x12, 0x31, 0x0a, 0x0a, 0x72, 0x70, 0x63, 0x5f, 0x68, 0x65, 0x61, 0x64, 0x65, 0x72, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x72, 0x61, 0x66, 0x74, 0x5f, 0x70, 0x62, 0x2e, 0x52, + 0x50, 0x43, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x52, 0x09, 0x72, 0x70, 0x63, 0x48, 0x65, 0x61, + 0x64, 0x65, 0x72, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x65, 0x72, 0x6d, 0x18, 0x02, 0x20, 0x01, 0x28, + 0x04, 0x52, 0x04, 0x74, 0x65, 0x72, 0x6d, 0x12, 0x16, 0x0a, 0x06, 0x6c, 0x65, 0x61, 0x64, 0x65, + 0x72, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x06, 0x6c, 0x65, 0x61, 0x64, 0x65, 0x72, 0x12, + 0x24, 0x0a, 0x0e, 0x70, 0x72, 0x65, 0x76, 0x5f, 0x6c, 0x6f, 0x67, 0x5f, 0x65, 0x6e, 0x74, 0x72, + 0x79, 0x18, 0x04, 0x20, 0x01, 0x28, 0x04, 0x52, 0x0c, 0x70, 0x72, 0x65, 0x76, 0x4c, 0x6f, 0x67, + 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x22, 0x0a, 0x0d, 0x70, 0x72, 0x65, 0x76, 0x5f, 0x6c, 0x6f, + 0x67, 0x5f, 0x74, 0x65, 0x72, 0x6d, 0x18, 0x05, 0x20, 0x01, 0x28, 0x04, 0x52, 0x0b, 0x70, 0x72, + 0x65, 0x76, 0x4c, 0x6f, 0x67, 0x54, 0x65, 0x72, 0x6d, 0x12, 0x26, 0x0a, 0x07, 0x65, 0x6e, 0x74, + 0x72, 0x69, 0x65, 0x73, 0x18, 0x06, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x0c, 0x2e, 0x72, 0x61, 0x66, + 0x74, 0x5f, 0x70, 0x62, 0x2e, 0x4c, 0x6f, 0x67, 0x52, 0x07, 0x65, 0x6e, 0x74, 0x72, 0x69, 0x65, + 0x73, 0x12, 0x2e, 0x0a, 0x13, 0x6c, 0x65, 0x61, 0x64, 0x65, 0x72, 0x5f, 0x63, 0x6f, 0x6d, 0x6d, + 0x69, 0x74, 0x5f, 0x69, 0x6e, 0x64, 0x65, 0x78, 0x18, 0x07, 0x20, 0x01, 0x28, 0x04, 0x52, 0x11, + 0x6c, 0x65, 0x61, 0x64, 0x65, 0x72, 0x43, 0x6f, 0x6d, 0x6d, 0x69, 0x74, 0x49, 0x6e, 0x64, 0x65, + 0x78, 0x22, 0xda, 0x02, 0x0a, 0x03, 0x4c, 0x6f, 0x67, 0x12, 0x14, 0x0a, 0x05, 0x69, 0x6e, 0x64, + 0x65, 0x78, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x05, 0x69, 0x6e, 0x64, 0x65, 0x78, 0x12, + 0x12, 0x0a, 0x04, 0x74, 0x65, 0x72, 0x6d, 0x18, 0x02, 0x20, 0x01, 0x28, 0x04, 0x52, 0x04, 0x74, + 0x65, 0x72, 0x6d, 0x12, 0x28, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, + 0x0e, 0x32, 0x14, 0x2e, 0x72, 0x61, 0x66, 0x74, 0x5f, 0x70, 0x62, 0x2e, 0x4c, 0x6f, 0x67, 0x2e, + 0x4c, 0x6f, 0x67, 0x54, 0x79, 0x70, 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x12, 0x0a, + 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x64, 0x61, 0x74, + 0x61, 0x12, 0x1e, 0x0a, 0x0a, 0x65, 0x78, 0x74, 0x65, 0x6e, 0x73, 0x69, 0x6f, 0x6e, 0x73, 0x18, + 0x05, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0a, 0x65, 0x78, 0x74, 0x65, 0x6e, 0x73, 0x69, 0x6f, 0x6e, + 0x73, 0x12, 0x3b, 0x0a, 0x0b, 0x61, 0x70, 0x70, 0x65, 0x6e, 0x64, 0x65, 0x64, 0x5f, 0x61, 0x74, + 0x18, 0x06, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, + 0x6d, 0x70, 0x52, 0x0a, 0x61, 0x70, 0x70, 0x65, 0x6e, 0x64, 0x65, 0x64, 0x41, 0x74, 0x22, 0x8d, + 0x01, 0x0a, 0x07, 0x4c, 0x6f, 0x67, 0x54, 0x79, 0x70, 0x65, 0x12, 0x0f, 0x0a, 0x0b, 0x4c, 0x4f, + 0x47, 0x5f, 0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e, 0x44, 0x10, 0x00, 0x12, 0x0c, 0x0a, 0x08, 0x4c, + 0x4f, 0x47, 0x5f, 0x4e, 0x4f, 0x4f, 0x50, 0x10, 0x01, 0x12, 0x1b, 0x0a, 0x17, 0x4c, 0x4f, 0x47, + 0x5f, 0x41, 0x44, 0x44, 0x5f, 0x50, 0x45, 0x45, 0x52, 0x5f, 0x44, 0x45, 0x50, 0x52, 0x45, 0x43, + 0x41, 0x54, 0x45, 0x44, 0x10, 0x02, 0x12, 0x1e, 0x0a, 0x1a, 0x4c, 0x4f, 0x47, 0x5f, 0x52, 0x45, + 0x4d, 0x4f, 0x56, 0x45, 0x5f, 0x50, 0x45, 0x45, 0x52, 0x5f, 0x44, 0x45, 0x50, 0x52, 0x45, 0x43, + 0x41, 0x54, 0x45, 0x44, 0x10, 0x03, 0x12, 0x0f, 0x0a, 0x0b, 0x4c, 0x4f, 0x47, 0x5f, 0x42, 0x41, + 0x52, 0x52, 0x49, 0x45, 0x52, 0x10, 0x04, 0x12, 0x15, 0x0a, 0x11, 0x4c, 0x4f, 0x47, 0x5f, 0x43, + 0x4f, 0x4e, 0x46, 0x49, 0x47, 0x55, 0x52, 0x41, 0x54, 0x49, 0x4f, 0x4e, 0x10, 0x05, 0x22, 0xbd, + 0x01, 0x0a, 0x15, 0x41, 0x70, 0x70, 0x65, 0x6e, 0x64, 0x45, 0x6e, 0x74, 0x72, 0x69, 0x65, 0x73, + 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x31, 0x0a, 0x0a, 0x72, 0x70, 0x63, 0x5f, + 0x68, 0x65, 0x61, 0x64, 0x65, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x72, + 0x61, 0x66, 0x74, 0x5f, 0x70, 0x62, 0x2e, 0x52, 0x50, 0x43, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, + 0x52, 0x09, 0x72, 0x70, 0x63, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x12, 0x12, 0x0a, 0x04, 0x74, + 0x65, 0x72, 0x6d, 0x18, 0x02, 0x20, 0x01, 0x28, 0x04, 0x52, 0x04, 0x74, 0x65, 0x72, 0x6d, 0x12, + 0x19, 0x0a, 0x08, 0x6c, 0x61, 0x73, 0x74, 0x5f, 0x6c, 0x6f, 0x67, 0x18, 0x03, 0x20, 0x01, 0x28, + 0x04, 0x52, 0x07, 0x6c, 0x61, 0x73, 0x74, 0x4c, 0x6f, 0x67, 0x12, 0x18, 0x0a, 0x07, 0x73, 0x75, + 0x63, 0x63, 0x65, 0x73, 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, 0x08, 0x52, 0x07, 0x73, 0x75, 0x63, + 0x63, 0x65, 0x73, 0x73, 0x12, 0x28, 0x0a, 0x10, 0x6e, 0x6f, 0x5f, 0x72, 0x65, 0x74, 0x72, 0x79, + 0x5f, 0x62, 0x61, 0x63, 0x6b, 0x6f, 0x66, 0x66, 0x18, 0x05, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0e, + 0x6e, 0x6f, 0x52, 0x65, 0x74, 0x72, 0x79, 0x42, 0x61, 0x63, 0x6b, 0x6f, 0x66, 0x66, 0x22, 0x46, + 0x0a, 0x11, 0x54, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x4e, 0x6f, 0x77, 0x52, 0x65, 0x71, 0x75, + 0x65, 0x73, 0x74, 0x12, 0x31, 0x0a, 0x0a, 0x72, 0x70, 0x63, 0x5f, 0x68, 0x65, 0x61, 0x64, 0x65, + 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x72, 0x61, 0x66, 0x74, 0x5f, 0x70, + 0x62, 0x2e, 0x52, 0x50, 0x43, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x52, 0x09, 0x72, 0x70, 0x63, + 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x22, 0x47, 0x0a, 0x12, 0x54, 0x69, 0x6d, 0x65, 0x6f, 0x75, + 0x74, 0x4e, 0x6f, 0x77, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x31, 0x0a, 0x0a, + 0x72, 0x70, 0x63, 0x5f, 0x68, 0x65, 0x61, 0x64, 0x65, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, + 0x32, 0x12, 0x2e, 0x72, 0x61, 0x66, 0x74, 0x5f, 0x70, 0x62, 0x2e, 0x52, 0x50, 0x43, 0x48, 0x65, + 0x61, 0x64, 0x65, 0x72, 0x52, 0x09, 0x72, 0x70, 0x63, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x22, + 0x81, 0x03, 0x0a, 0x16, 0x49, 0x6e, 0x73, 0x74, 0x61, 0x6c, 0x6c, 0x53, 0x6e, 0x61, 0x70, 0x73, + 0x68, 0x6f, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x31, 0x0a, 0x0a, 0x72, 0x70, + 0x63, 0x5f, 0x68, 0x65, 0x61, 0x64, 0x65, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, + 0x2e, 0x72, 0x61, 0x66, 0x74, 0x5f, 0x70, 0x62, 0x2e, 0x52, 0x50, 0x43, 0x48, 0x65, 0x61, 0x64, + 0x65, 0x72, 0x52, 0x09, 0x72, 0x70, 0x63, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x12, 0x29, 0x0a, + 0x10, 0x73, 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, 0x74, 0x5f, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, + 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0f, 0x73, 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, + 0x74, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x65, 0x72, 0x6d, + 0x18, 0x03, 0x20, 0x01, 0x28, 0x04, 0x52, 0x04, 0x74, 0x65, 0x72, 0x6d, 0x12, 0x16, 0x0a, 0x06, + 0x6c, 0x65, 0x61, 0x64, 0x65, 0x72, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x06, 0x6c, 0x65, + 0x61, 0x64, 0x65, 0x72, 0x12, 0x24, 0x0a, 0x0e, 0x6c, 0x61, 0x73, 0x74, 0x5f, 0x6c, 0x6f, 0x67, + 0x5f, 0x69, 0x6e, 0x64, 0x65, 0x78, 0x18, 0x05, 0x20, 0x01, 0x28, 0x04, 0x52, 0x0c, 0x6c, 0x61, + 0x73, 0x74, 0x4c, 0x6f, 0x67, 0x49, 0x6e, 0x64, 0x65, 0x78, 0x12, 0x22, 0x0a, 0x0d, 0x6c, 0x61, + 0x73, 0x74, 0x5f, 0x6c, 0x6f, 0x67, 0x5f, 0x74, 0x65, 0x72, 0x6d, 0x18, 0x06, 0x20, 0x01, 0x28, + 0x04, 0x52, 0x0b, 0x6c, 0x61, 0x73, 0x74, 0x4c, 0x6f, 0x67, 0x54, 0x65, 0x72, 0x6d, 0x12, 0x14, + 0x0a, 0x05, 0x70, 0x65, 0x65, 0x72, 0x73, 0x18, 0x07, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, 0x70, + 0x65, 0x65, 0x72, 0x73, 0x12, 0x24, 0x0a, 0x0d, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x75, 0x72, + 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x08, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0d, 0x63, 0x6f, 0x6e, + 0x66, 0x69, 0x67, 0x75, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x2f, 0x0a, 0x13, 0x63, 0x6f, + 0x6e, 0x66, 0x69, 0x67, 0x75, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x69, 0x6e, 0x64, 0x65, + 0x78, 0x18, 0x09, 0x20, 0x01, 0x28, 0x04, 0x52, 0x12, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x75, + 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x49, 0x6e, 0x64, 0x65, 0x78, 0x12, 0x12, 0x0a, 0x04, 0x73, + 0x69, 0x7a, 0x65, 0x18, 0x0a, 0x20, 0x01, 0x28, 0x03, 0x52, 0x04, 0x73, 0x69, 0x7a, 0x65, 0x12, + 0x12, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x0b, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x64, + 0x61, 0x74, 0x61, 0x22, 0x7a, 0x0a, 0x17, 0x49, 0x6e, 0x73, 0x74, 0x61, 0x6c, 0x6c, 0x53, 0x6e, + 0x61, 0x70, 0x73, 0x68, 0x6f, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x31, + 0x0a, 0x0a, 0x72, 0x70, 0x63, 0x5f, 0x68, 0x65, 0x61, 0x64, 0x65, 0x72, 0x18, 0x01, 0x20, 0x01, + 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x72, 0x61, 0x66, 0x74, 0x5f, 0x70, 0x62, 0x2e, 0x52, 0x50, 0x43, + 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x52, 0x09, 0x72, 0x70, 0x63, 0x48, 0x65, 0x61, 0x64, 0x65, + 0x72, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x65, 0x72, 0x6d, 0x18, 0x02, 0x20, 0x01, 0x28, 0x04, 0x52, + 0x04, 0x74, 0x65, 0x72, 0x6d, 0x12, 0x18, 0x0a, 0x07, 0x73, 0x75, 0x63, 0x63, 0x65, 0x73, 0x73, + 0x18, 0x03, 0x20, 0x01, 0x28, 0x08, 0x52, 0x07, 0x73, 0x75, 0x63, 0x63, 0x65, 0x73, 0x73, 0x32, + 0xa8, 0x03, 0x0a, 0x0b, 0x52, 0x61, 0x66, 0x74, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, + 0x48, 0x0a, 0x0b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x56, 0x6f, 0x74, 0x65, 0x12, 0x1b, + 0x2e, 0x72, 0x61, 0x66, 0x74, 0x5f, 0x70, 0x62, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, + 0x56, 0x6f, 0x74, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1c, 0x2e, 0x72, 0x61, + 0x66, 0x74, 0x5f, 0x70, 0x62, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x56, 0x6f, 0x74, + 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x50, 0x0a, 0x0d, 0x41, 0x70, 0x70, + 0x65, 0x6e, 0x64, 0x45, 0x6e, 0x74, 0x72, 0x69, 0x65, 0x73, 0x12, 0x1d, 0x2e, 0x72, 0x61, 0x66, + 0x74, 0x5f, 0x70, 0x62, 0x2e, 0x41, 0x70, 0x70, 0x65, 0x6e, 0x64, 0x45, 0x6e, 0x74, 0x72, 0x69, + 0x65, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1e, 0x2e, 0x72, 0x61, 0x66, 0x74, + 0x5f, 0x70, 0x62, 0x2e, 0x41, 0x70, 0x70, 0x65, 0x6e, 0x64, 0x45, 0x6e, 0x74, 0x72, 0x69, 0x65, + 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x5c, 0x0a, 0x15, 0x41, + 0x70, 0x70, 0x65, 0x6e, 0x64, 0x45, 0x6e, 0x74, 0x72, 0x69, 0x65, 0x73, 0x50, 0x69, 0x70, 0x65, + 0x6c, 0x69, 0x6e, 0x65, 0x12, 0x1d, 0x2e, 0x72, 0x61, 0x66, 0x74, 0x5f, 0x70, 0x62, 0x2e, 0x41, + 0x70, 0x70, 0x65, 0x6e, 0x64, 0x45, 0x6e, 0x74, 0x72, 0x69, 0x65, 0x73, 0x52, 0x65, 0x71, 0x75, + 0x65, 0x73, 0x74, 0x1a, 0x1e, 0x2e, 0x72, 0x61, 0x66, 0x74, 0x5f, 0x70, 0x62, 0x2e, 0x41, 0x70, + 0x70, 0x65, 0x6e, 0x64, 0x45, 0x6e, 0x74, 0x72, 0x69, 0x65, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, + 0x6e, 0x73, 0x65, 0x22, 0x00, 0x28, 0x01, 0x30, 0x01, 0x12, 0x47, 0x0a, 0x0a, 0x54, 0x69, 0x6d, + 0x65, 0x6f, 0x75, 0x74, 0x4e, 0x6f, 0x77, 0x12, 0x1a, 0x2e, 0x72, 0x61, 0x66, 0x74, 0x5f, 0x70, + 0x62, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x4e, 0x6f, 0x77, 0x52, 0x65, 0x71, 0x75, + 0x65, 0x73, 0x74, 0x1a, 0x1b, 0x2e, 0x72, 0x61, 0x66, 0x74, 0x5f, 0x70, 0x62, 0x2e, 0x54, 0x69, + 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x4e, 0x6f, 0x77, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, + 0x22, 0x00, 0x12, 0x56, 0x0a, 0x0f, 0x49, 0x6e, 0x73, 0x74, 0x61, 0x6c, 0x6c, 0x53, 0x6e, 0x61, + 0x70, 0x73, 0x68, 0x6f, 0x74, 0x12, 0x1f, 0x2e, 0x72, 0x61, 0x66, 0x74, 0x5f, 0x70, 0x62, 0x2e, + 0x49, 0x6e, 0x73, 0x74, 0x61, 0x6c, 0x6c, 0x53, 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, 0x74, 0x52, + 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x20, 0x2e, 0x72, 0x61, 0x66, 0x74, 0x5f, 0x70, 0x62, + 0x2e, 0x49, 0x6e, 0x73, 0x74, 0x61, 0x6c, 0x6c, 0x53, 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, 0x74, + 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x42, 0x0a, 0x5a, 0x08, 0x2e, 0x3b, + 0x72, 0x61, 0x66, 0x74, 0x50, 0x42, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_raft_proto_rawDescOnce sync.Once + file_raft_proto_rawDescData = file_raft_proto_rawDesc +) + +func file_raft_proto_rawDescGZIP() []byte { + file_raft_proto_rawDescOnce.Do(func() { + file_raft_proto_rawDescData = protoimpl.X.CompressGZIP(file_raft_proto_rawDescData) + }) + return file_raft_proto_rawDescData +} + +var file_raft_proto_enumTypes = make([]protoimpl.EnumInfo, 1) +var file_raft_proto_msgTypes = make([]protoimpl.MessageInfo, 10) +var file_raft_proto_goTypes = []interface{}{ + (Log_LogType)(0), // 0: raft_pb.Log.LogType + (*RPCHeader)(nil), // 1: raft_pb.RPCHeader + (*RequestVoteRequest)(nil), // 2: raft_pb.RequestVoteRequest + (*RequestVoteResponse)(nil), // 3: raft_pb.RequestVoteResponse + (*AppendEntriesRequest)(nil), // 4: raft_pb.AppendEntriesRequest + (*Log)(nil), // 5: raft_pb.Log + (*AppendEntriesResponse)(nil), // 6: raft_pb.AppendEntriesResponse + (*TimeoutNowRequest)(nil), // 7: raft_pb.TimeoutNowRequest + (*TimeoutNowResponse)(nil), // 8: raft_pb.TimeoutNowResponse + (*InstallSnapshotRequest)(nil), // 9: raft_pb.InstallSnapshotRequest + (*InstallSnapshotResponse)(nil), // 10: raft_pb.InstallSnapshotResponse + (*timestamppb.Timestamp)(nil), // 11: google.protobuf.Timestamp +} +var file_raft_proto_depIdxs = []int32{ + 1, // 0: raft_pb.RequestVoteRequest.rpc_header:type_name -> raft_pb.RPCHeader + 1, // 1: raft_pb.RequestVoteResponse.rpc_header:type_name -> raft_pb.RPCHeader + 1, // 2: raft_pb.AppendEntriesRequest.rpc_header:type_name -> raft_pb.RPCHeader + 5, // 3: raft_pb.AppendEntriesRequest.entries:type_name -> raft_pb.Log + 0, // 4: raft_pb.Log.type:type_name -> raft_pb.Log.LogType + 11, // 5: raft_pb.Log.appended_at:type_name -> google.protobuf.Timestamp + 1, // 6: raft_pb.AppendEntriesResponse.rpc_header:type_name -> raft_pb.RPCHeader + 1, // 7: raft_pb.TimeoutNowRequest.rpc_header:type_name -> raft_pb.RPCHeader + 1, // 8: raft_pb.TimeoutNowResponse.rpc_header:type_name -> raft_pb.RPCHeader + 1, // 9: raft_pb.InstallSnapshotRequest.rpc_header:type_name -> raft_pb.RPCHeader + 1, // 10: raft_pb.InstallSnapshotResponse.rpc_header:type_name -> raft_pb.RPCHeader + 2, // 11: raft_pb.RaftService.RequestVote:input_type -> raft_pb.RequestVoteRequest + 4, // 12: raft_pb.RaftService.AppendEntries:input_type -> raft_pb.AppendEntriesRequest + 4, // 13: raft_pb.RaftService.AppendEntriesPipeline:input_type -> raft_pb.AppendEntriesRequest + 7, // 14: raft_pb.RaftService.TimeoutNow:input_type -> raft_pb.TimeoutNowRequest + 9, // 15: raft_pb.RaftService.InstallSnapshot:input_type -> raft_pb.InstallSnapshotRequest + 3, // 16: raft_pb.RaftService.RequestVote:output_type -> raft_pb.RequestVoteResponse + 6, // 17: raft_pb.RaftService.AppendEntries:output_type -> raft_pb.AppendEntriesResponse + 6, // 18: raft_pb.RaftService.AppendEntriesPipeline:output_type -> raft_pb.AppendEntriesResponse + 8, // 19: raft_pb.RaftService.TimeoutNow:output_type -> raft_pb.TimeoutNowResponse + 10, // 20: raft_pb.RaftService.InstallSnapshot:output_type -> raft_pb.InstallSnapshotResponse + 16, // [16:21] is the sub-list for method output_type + 11, // [11:16] is the sub-list for method input_type + 11, // [11:11] is the sub-list for extension type_name + 11, // [11:11] is the sub-list for extension extendee + 0, // [0:11] is the sub-list for field type_name +} + +func init() { file_raft_proto_init() } +func file_raft_proto_init() { + if File_raft_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_raft_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*RPCHeader); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_raft_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*RequestVoteRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_raft_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*RequestVoteResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_raft_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*AppendEntriesRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_raft_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Log); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_raft_proto_msgTypes[5].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*AppendEntriesResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_raft_proto_msgTypes[6].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*TimeoutNowRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_raft_proto_msgTypes[7].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*TimeoutNowResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_raft_proto_msgTypes[8].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*InstallSnapshotRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_raft_proto_msgTypes[9].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*InstallSnapshotResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_raft_proto_rawDesc, + NumEnums: 1, + NumMessages: 10, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_raft_proto_goTypes, + DependencyIndexes: file_raft_proto_depIdxs, + EnumInfos: file_raft_proto_enumTypes, + MessageInfos: file_raft_proto_msgTypes, + }.Build() + File_raft_proto = out.File + file_raft_proto_rawDesc = nil + file_raft_proto_goTypes = nil + file_raft_proto_depIdxs = nil +} diff --git a/lib/proto/raft/raft.proto b/lib/proto/raft/raft.proto new file mode 100644 index 00000000..27760b40 --- /dev/null +++ b/lib/proto/raft/raft.proto @@ -0,0 +1,98 @@ +syntax="proto3"; + +package raft_pb; +option go_package = ".;raftPB"; +import "google/protobuf/timestamp.proto"; + +message RPCHeader{ + int64 ProtocolVersion=1; + bytes ID=2; + bytes Addr=3; +} +// RequestVoteRequest are initiated by node in Candidate state, on election. +// Fields matched to condensed summary of Raft consensus algorithm in ISUCA. +message RequestVoteRequest { + RPCHeader rpc_header=1; + uint64 term = 2; + bytes candidate = 3; + uint64 last_log_index = 4; + uint64 last_log_term = 5; + bool leadership_transfer=6; +} + +message RequestVoteResponse { + RPCHeader rpc_header=1; + uint64 term = 2; + bytes peers = 3; // deprecated as of version 2+ + bool granted = 4; +} +message AppendEntriesRequest { + RPCHeader rpc_header = 1; + uint64 term = 2; + bytes leader = 3; + uint64 prev_log_entry = 4; + uint64 prev_log_term = 5; + repeated Log entries = 6; + uint64 leader_commit_index = 7; +} +message Log { + enum LogType { + LOG_COMMAND = 0; + LOG_NOOP = 1; + LOG_ADD_PEER_DEPRECATED = 2; + LOG_REMOVE_PEER_DEPRECATED = 3; + LOG_BARRIER = 4; + LOG_CONFIGURATION = 5; + } + uint64 index = 1; + uint64 term = 2; + LogType type = 3; + bytes data = 4; + bytes extensions = 5; + google.protobuf.Timestamp appended_at = 6; +} + +message AppendEntriesResponse { + RPCHeader rpc_header = 1; + uint64 term = 2; + uint64 last_log = 3; + bool success = 4; + bool no_retry_backoff = 5; +} + +message TimeoutNowRequest { + RPCHeader rpc_header = 1; +} +message TimeoutNowResponse { + RPCHeader rpc_header = 1; +} + +message InstallSnapshotRequest { + RPCHeader rpc_header = 1; + + int64 snapshot_version = 2; + uint64 term = 3; + bytes leader = 4; + uint64 last_log_index = 5; + uint64 last_log_term = 6; + bytes peers = 7; + bytes configuration = 8; + uint64 configuration_index = 9; + int64 size = 10; + bytes data = 11; + +} +message InstallSnapshotResponse { + RPCHeader rpc_header = 1; + uint64 term = 2; + bool success=3; +} + +service RaftService { + rpc RequestVote(RequestVoteRequest) returns (RequestVoteResponse); + rpc AppendEntries(AppendEntriesRequest) returns (AppendEntriesResponse) {} + rpc AppendEntriesPipeline(stream AppendEntriesRequest) returns (stream AppendEntriesResponse) {} + rpc TimeoutNow(TimeoutNowRequest) returns (TimeoutNowResponse) {} + rpc InstallSnapshot(InstallSnapshotRequest) returns (InstallSnapshotResponse) {} +} + diff --git a/lib/proto/raft/raft_grpc.pb.go b/lib/proto/raft/raft_grpc.pb.go new file mode 100644 index 00000000..5b681d73 --- /dev/null +++ b/lib/proto/raft/raft_grpc.pb.go @@ -0,0 +1,278 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. + +package raftPB + +import ( + context "context" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.32.0 or later. +const _ = grpc.SupportPackageIsVersion7 + +// RaftServiceClient is the client API for RaftService service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type RaftServiceClient interface { + RequestVote(ctx context.Context, in *RequestVoteRequest, opts ...grpc.CallOption) (*RequestVoteResponse, error) + AppendEntries(ctx context.Context, in *AppendEntriesRequest, opts ...grpc.CallOption) (*AppendEntriesResponse, error) + AppendEntriesPipeline(ctx context.Context, opts ...grpc.CallOption) (RaftService_AppendEntriesPipelineClient, error) + TimeoutNow(ctx context.Context, in *TimeoutNowRequest, opts ...grpc.CallOption) (*TimeoutNowResponse, error) + InstallSnapshot(ctx context.Context, in *InstallSnapshotRequest, opts ...grpc.CallOption) (*InstallSnapshotResponse, error) +} + +type raftServiceClient struct { + cc grpc.ClientConnInterface +} + +func NewRaftServiceClient(cc grpc.ClientConnInterface) RaftServiceClient { + return &raftServiceClient{cc} +} + +func (c *raftServiceClient) RequestVote(ctx context.Context, in *RequestVoteRequest, opts ...grpc.CallOption) (*RequestVoteResponse, error) { + out := new(RequestVoteResponse) + err := c.cc.Invoke(ctx, "/raft_pb.RaftService/RequestVote", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *raftServiceClient) AppendEntries(ctx context.Context, in *AppendEntriesRequest, opts ...grpc.CallOption) (*AppendEntriesResponse, error) { + out := new(AppendEntriesResponse) + err := c.cc.Invoke(ctx, "/raft_pb.RaftService/AppendEntries", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *raftServiceClient) AppendEntriesPipeline(ctx context.Context, opts ...grpc.CallOption) (RaftService_AppendEntriesPipelineClient, error) { + stream, err := c.cc.NewStream(ctx, &RaftService_ServiceDesc.Streams[0], "/raft_pb.RaftService/AppendEntriesPipeline", opts...) + if err != nil { + return nil, err + } + x := &raftServiceAppendEntriesPipelineClient{stream} + return x, nil +} + +type RaftService_AppendEntriesPipelineClient interface { + Send(*AppendEntriesRequest) error + Recv() (*AppendEntriesResponse, error) + grpc.ClientStream +} + +type raftServiceAppendEntriesPipelineClient struct { + grpc.ClientStream +} + +func (x *raftServiceAppendEntriesPipelineClient) Send(m *AppendEntriesRequest) error { + return x.ClientStream.SendMsg(m) +} + +func (x *raftServiceAppendEntriesPipelineClient) Recv() (*AppendEntriesResponse, error) { + m := new(AppendEntriesResponse) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +func (c *raftServiceClient) TimeoutNow(ctx context.Context, in *TimeoutNowRequest, opts ...grpc.CallOption) (*TimeoutNowResponse, error) { + out := new(TimeoutNowResponse) + err := c.cc.Invoke(ctx, "/raft_pb.RaftService/TimeoutNow", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *raftServiceClient) InstallSnapshot(ctx context.Context, in *InstallSnapshotRequest, opts ...grpc.CallOption) (*InstallSnapshotResponse, error) { + out := new(InstallSnapshotResponse) + err := c.cc.Invoke(ctx, "/raft_pb.RaftService/InstallSnapshot", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// RaftServiceServer is the server API for RaftService service. +// All implementations must embed UnimplementedRaftServiceServer +// for forward compatibility +type RaftServiceServer interface { + RequestVote(context.Context, *RequestVoteRequest) (*RequestVoteResponse, error) + AppendEntries(context.Context, *AppendEntriesRequest) (*AppendEntriesResponse, error) + AppendEntriesPipeline(RaftService_AppendEntriesPipelineServer) error + TimeoutNow(context.Context, *TimeoutNowRequest) (*TimeoutNowResponse, error) + InstallSnapshot(context.Context, *InstallSnapshotRequest) (*InstallSnapshotResponse, error) + mustEmbedUnimplementedRaftServiceServer() +} + +// UnimplementedRaftServiceServer must be embedded to have forward compatible implementations. +type UnimplementedRaftServiceServer struct { +} + +func (UnimplementedRaftServiceServer) RequestVote(context.Context, *RequestVoteRequest) (*RequestVoteResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method RequestVote not implemented") +} +func (UnimplementedRaftServiceServer) AppendEntries(context.Context, *AppendEntriesRequest) (*AppendEntriesResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method AppendEntries not implemented") +} +func (UnimplementedRaftServiceServer) AppendEntriesPipeline(RaftService_AppendEntriesPipelineServer) error { + return status.Errorf(codes.Unimplemented, "method AppendEntriesPipeline not implemented") +} +func (UnimplementedRaftServiceServer) TimeoutNow(context.Context, *TimeoutNowRequest) (*TimeoutNowResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method TimeoutNow not implemented") +} +func (UnimplementedRaftServiceServer) InstallSnapshot(context.Context, *InstallSnapshotRequest) (*InstallSnapshotResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method InstallSnapshot not implemented") +} +func (UnimplementedRaftServiceServer) mustEmbedUnimplementedRaftServiceServer() {} + +// UnsafeRaftServiceServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to RaftServiceServer will +// result in compilation errors. +type UnsafeRaftServiceServer interface { + mustEmbedUnimplementedRaftServiceServer() +} + +func RegisterRaftServiceServer(s grpc.ServiceRegistrar, srv RaftServiceServer) { + s.RegisterService(&RaftService_ServiceDesc, srv) +} + +func _RaftService_RequestVote_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(RequestVoteRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(RaftServiceServer).RequestVote(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/raft_pb.RaftService/RequestVote", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(RaftServiceServer).RequestVote(ctx, req.(*RequestVoteRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _RaftService_AppendEntries_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(AppendEntriesRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(RaftServiceServer).AppendEntries(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/raft_pb.RaftService/AppendEntries", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(RaftServiceServer).AppendEntries(ctx, req.(*AppendEntriesRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _RaftService_AppendEntriesPipeline_Handler(srv interface{}, stream grpc.ServerStream) error { + return srv.(RaftServiceServer).AppendEntriesPipeline(&raftServiceAppendEntriesPipelineServer{stream}) +} + +type RaftService_AppendEntriesPipelineServer interface { + Send(*AppendEntriesResponse) error + Recv() (*AppendEntriesRequest, error) + grpc.ServerStream +} + +type raftServiceAppendEntriesPipelineServer struct { + grpc.ServerStream +} + +func (x *raftServiceAppendEntriesPipelineServer) Send(m *AppendEntriesResponse) error { + return x.ServerStream.SendMsg(m) +} + +func (x *raftServiceAppendEntriesPipelineServer) Recv() (*AppendEntriesRequest, error) { + m := new(AppendEntriesRequest) + if err := x.ServerStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +func _RaftService_TimeoutNow_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(TimeoutNowRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(RaftServiceServer).TimeoutNow(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/raft_pb.RaftService/TimeoutNow", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(RaftServiceServer).TimeoutNow(ctx, req.(*TimeoutNowRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _RaftService_InstallSnapshot_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(InstallSnapshotRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(RaftServiceServer).InstallSnapshot(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/raft_pb.RaftService/InstallSnapshot", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(RaftServiceServer).InstallSnapshot(ctx, req.(*InstallSnapshotRequest)) + } + return interceptor(ctx, in, info, handler) +} + +// RaftService_ServiceDesc is the grpc.ServiceDesc for RaftService service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var RaftService_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "raft_pb.RaftService", + HandlerType: (*RaftServiceServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "RequestVote", + Handler: _RaftService_RequestVote_Handler, + }, + { + MethodName: "AppendEntries", + Handler: _RaftService_AppendEntries_Handler, + }, + { + MethodName: "TimeoutNow", + Handler: _RaftService_TimeoutNow_Handler, + }, + { + MethodName: "InstallSnapshot", + Handler: _RaftService_InstallSnapshot_Handler, + }, + }, + Streams: []grpc.StreamDesc{ + { + StreamName: "AppendEntriesPipeline", + Handler: _RaftService_AppendEntriesPipeline_Handler, + ServerStreams: true, + ClientStreams: true, + }, + }, + Metadata: "raft.proto", +}