Skip to content

Commit efa9485

Browse files
Added ConfigSubmitter and SubmitConfig RPC. (#271)
Tested against ad-hoc stub consetner. Added context and cancelFunc Signed-off-by: Dor.Katzelnick <[email protected]>
1 parent 22b8e62 commit efa9485

File tree

7 files changed

+538
-25
lines changed

7 files changed

+538
-25
lines changed

node/batcher/stub_consenter_test.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ SPDX-License-Identifier: Apache-2.0
77
package batcher_test
88

99
import (
10+
"context"
1011
"fmt"
1112
"io"
1213
"sync"
@@ -81,6 +82,10 @@ func (sc *stubConsenter) NotifyEvent(stream protos.Consensus_NotifyEventServer)
8182
}
8283
}
8384

85+
func (sc *stubConsenter) SubmitConfig(ctx context.Context, request *protos.Request) (*protos.SubmitResponse, error) {
86+
return nil, fmt.Errorf("not implemented")
87+
}
88+
8489
func (sc *stubConsenter) Stop() {
8590
// Stop() of stub consenter does nothing
8691
// use NetStop() to stop the stub consenter network

node/consensus/consensus.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ package consensus
88

99
import (
1010
"bytes"
11+
"context"
1112
"encoding/asn1"
1213
"encoding/base64"
1314
"encoding/hex"
@@ -139,6 +140,12 @@ func (c *Consensus) NotifyEvent(stream protos.Consensus_NotifyEventServer) error
139140
}
140141
}
141142

143+
// SubmitConfig is used to submit a config request from the router in the consenter's party.
144+
// TODO - add certificate pinning of the router, and forward the request.
145+
func (sc *Consensus) SubmitConfig(ctx context.Context, request *protos.Request) (*protos.SubmitResponse, error) {
146+
return nil, fmt.Errorf("SubmitConfig not implemented")
147+
}
148+
142149
func (c *Consensus) SubmitRequest(req []byte) error {
143150
_, ce, err := c.verifyCE(req)
144151
if err != nil {

node/protos/comm/communication.pb.go

Lines changed: 67 additions & 25 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

node/protos/comm/communication.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ message Event {
3939

4040
service Consensus {
4141
rpc NotifyEvent(stream Event) returns (stream EventResponse);
42+
rpc SubmitConfig(Request) returns (SubmitResponse);
4243
}
4344

4445
message Ack {

node/router/config_submitter.go

Lines changed: 184 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,184 @@
1+
/*
2+
Copyright IBM Corp. All Rights Reserved.
3+
4+
SPDX-License-Identifier: Apache-2.0
5+
*/
6+
7+
package router
8+
9+
import (
10+
"context"
11+
"fmt"
12+
"time"
13+
14+
"github.com/hyperledger/fabric-x-orderer/common/types"
15+
"github.com/hyperledger/fabric-x-orderer/node/comm"
16+
protos "github.com/hyperledger/fabric-x-orderer/node/protos/comm"
17+
"google.golang.org/grpc"
18+
)
19+
20+
type ConfigurationSubmitter interface {
21+
Start()
22+
Stop()
23+
// Update() // TODO implement a thread-safe update method for config submitter
24+
Forward(tr *TrackedRequest) error
25+
}
26+
27+
type configSubmitter struct {
28+
consensusEndpoint string
29+
consensusRootCAs [][]byte
30+
tlsCert []byte
31+
tlsKey []byte
32+
logger types.Logger
33+
configRequestsChannel chan *TrackedRequest
34+
ctx context.Context
35+
cancelFunc func()
36+
}
37+
38+
func NewConfigSubmitter(consensusEndpoint string, consensusRootCAs [][]byte, tlsCert []byte, tlsKey []byte, logger types.Logger) *configSubmitter {
39+
cs := &configSubmitter{
40+
consensusEndpoint: consensusEndpoint,
41+
consensusRootCAs: consensusRootCAs,
42+
tlsCert: tlsCert,
43+
tlsKey: tlsKey,
44+
logger: logger,
45+
configRequestsChannel: make(chan *TrackedRequest, 100),
46+
}
47+
return cs
48+
}
49+
50+
func (cs *configSubmitter) Start() {
51+
cs.logger.Infof("config submitter is starting")
52+
cs.ctx, cs.cancelFunc = context.WithCancel(context.Background())
53+
go cs.readConfigRequests()
54+
}
55+
56+
func (cs *configSubmitter) Stop() {
57+
cs.logger.Infof("config submitter is stopping")
58+
cs.cancelFunc()
59+
}
60+
61+
func (cs *configSubmitter) readConfigRequests() {
62+
cs.logger.Infof("config submitter start listening for requests")
63+
for {
64+
select {
65+
case <-cs.ctx.Done():
66+
cs.logger.Infof("context is done, stop listening on channel for config requests")
67+
return
68+
case tr, ok := <-cs.configRequestsChannel:
69+
if !ok {
70+
cs.logger.Infof("config requests channel was closed, stop listening for requests")
71+
return
72+
}
73+
err := cs.forwardRequest(tr)
74+
if err != nil {
75+
cs.logger.Errorf("error forwarding config request to consenter: %v", err)
76+
}
77+
}
78+
}
79+
}
80+
81+
func (cs *configSubmitter) Forward(tr *TrackedRequest) {
82+
cs.configRequestsChannel <- tr
83+
}
84+
85+
func (cs *configSubmitter) forwardRequest(tr *TrackedRequest) error {
86+
if tr == nil {
87+
return fmt.Errorf("received nil tracked request")
88+
}
89+
90+
feedback := Response{}
91+
var err error
92+
93+
// TODO - propose a config update (with signing), validate with config-tx-validator, second stage verification (size and signature)
94+
// TODO - add verifier and bundle to the config-submitter.
95+
configRequest, err := proposeConfigUpdate(tr.request)
96+
97+
if err != nil {
98+
feedback.err = fmt.Errorf("error in verification and proposing update: %s", err)
99+
} else {
100+
var resp *protos.SubmitResponse
101+
resp, err = cs.submitConfigRequestToConsensus(configRequest)
102+
feedback.SubmitResponse = resp
103+
if err != nil {
104+
feedback.err = fmt.Errorf("error forwarding config request to consenter: %v", err)
105+
}
106+
}
107+
108+
tr.responses <- feedback
109+
return err
110+
}
111+
112+
func (cs *configSubmitter) submitConfigRequestToConsensus(req *protos.Request) (*protos.SubmitResponse, error) {
113+
conn, err := cs.connectToConsenter()
114+
if err != nil {
115+
return nil, err
116+
}
117+
defer conn.Close()
118+
119+
cl := protos.NewConsensusClient(conn)
120+
121+
resp, err := cl.SubmitConfig(cs.ctx, req)
122+
123+
return resp, err
124+
}
125+
126+
func (cs *configSubmitter) connectToConsenter() (*grpc.ClientConn, error) {
127+
conn, err := cs.tryToConnect()
128+
if err == nil {
129+
return conn, err
130+
}
131+
132+
// repeatedly try to connect, with backoff
133+
interval := minRetryInterval
134+
numOfRetries := 1
135+
for {
136+
select {
137+
case <-cs.ctx.Done():
138+
return nil, fmt.Errorf("reconnection to consensus %s aborted, because context is done and configSubmitter stopped", cs.consensusEndpoint)
139+
case <-time.After(interval):
140+
cs.logger.Debugf("Retry attempt #%d", numOfRetries)
141+
numOfRetries++
142+
conn, err := cs.tryToConnect()
143+
if err != nil {
144+
interval = min(interval*2, maxRetryInterval)
145+
cs.logger.Errorf("Reconnection to consensus failed: %v, trying again in: %s", err, interval)
146+
continue
147+
} else {
148+
cs.logger.Debugf("Reconnection to consensus %s succeeded", cs.consensusEndpoint)
149+
return conn, nil
150+
}
151+
}
152+
}
153+
}
154+
155+
func (cs *configSubmitter) tryToConnect() (*grpc.ClientConn, error) {
156+
// TODO - make it thread-safe, when implementing the update method
157+
158+
cc := comm.ClientConfig{
159+
AsyncConnect: false,
160+
KaOpts: comm.KeepaliveOptions{
161+
ClientInterval: 30 * time.Second,
162+
ClientTimeout: 30 * time.Second,
163+
},
164+
SecOpts: comm.SecureOptions{
165+
UseTLS: true,
166+
ServerRootCAs: cs.consensusRootCAs,
167+
Key: cs.tlsKey,
168+
Certificate: cs.tlsCert,
169+
RequireClientCert: true,
170+
},
171+
DialTimeout: time.Second * 20,
172+
}
173+
174+
conn, err := cc.Dial(cs.consensusEndpoint)
175+
if err != nil {
176+
return nil, err
177+
}
178+
return conn, nil
179+
}
180+
181+
// TODO - implement.
182+
func proposeConfigUpdate(request *protos.Request) (*protos.Request, error) {
183+
return request, nil
184+
}

0 commit comments

Comments
 (0)