Skip to content

Commit e10d81d

Browse files
authored
[connection] Remove redundant DialConfig (#152)
#### Type of change - Improvement (improvement to code, performance, etc) #### Description - Remove redundant `DialConfig` #### Related issues - resolves #49 Signed-off-by: Liran Funaro <[email protected]>
1 parent 10a6c98 commit e10d81d

File tree

19 files changed

+146
-226
lines changed

19 files changed

+146
-226
lines changed

integration/runner/runtime.go

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ import (
1818
"github.com/hyperledger/fabric-x-common/internaltools/configtxgen"
1919
"github.com/stretchr/testify/assert"
2020
"github.com/stretchr/testify/require"
21-
"google.golang.org/grpc"
2221
"google.golang.org/protobuf/types/known/durationpb"
2322

2423
"github.com/hyperledger/fabric-x-committer/api/protoblocktx"
@@ -223,15 +222,15 @@ func NewRuntime(t *testing.T, conf *Config) *CommitterRuntime {
223222

224223
t.Log("Create clients")
225224
c.CoordinatorClient = protocoordinatorservice.NewCoordinatorClient(
226-
clientConnWithTLS(t, s.Endpoints.Coordinator.Server, c.SystemConfig.ClientTLS),
225+
test.NewSecuredConnection(t, s.Endpoints.Coordinator.Server, c.SystemConfig.ClientTLS),
227226
)
228227

229228
c.QueryServiceClient = protoqueryservice.NewQueryServiceClient(
230-
clientConnWithTLS(t, s.Endpoints.Query.Server, c.SystemConfig.ClientTLS),
229+
test.NewSecuredConnection(t, s.Endpoints.Query.Server, c.SystemConfig.ClientTLS),
231230
)
232231

233232
c.notifyClient = protonotify.NewNotifierClient(
234-
clientConnWithTLS(t, s.Endpoints.Sidecar.Server, c.SystemConfig.ClientTLS),
233+
test.NewSecuredConnection(t, s.Endpoints.Sidecar.Server, c.SystemConfig.ClientTLS),
235234
)
236235

237236
c.ordererStream, err = test.NewBroadcastStream(t.Context(), &ordererconn.Config{
@@ -357,14 +356,6 @@ func (c *CommitterRuntime) startBlockDelivery(t *testing.T) {
357356
})
358357
}
359358

360-
// clientConnWithTLS creates a service connection using its given server endpoint and TLS configuration.
361-
func clientConnWithTLS(t *testing.T, e *connection.Endpoint, tlsConfig connection.TLSConfig) *grpc.ClientConn {
362-
t.Helper()
363-
serviceConnection, err := connection.Connect(test.NewSecuredDialConfig(t, e, tlsConfig))
364-
require.NoError(t, err)
365-
return serviceConnection
366-
}
367-
368359
// AddOrUpdateNamespaces adds policies for namespaces. If already exists, the policy will be updated.
369360
func (c *CommitterRuntime) AddOrUpdateNamespaces(t *testing.T, namespaces ...string) {
370361
t.Helper()

loadgen/adapters/coordinator.go

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -36,14 +36,10 @@ func NewCoordinatorAdapter(config *connection.ClientConfig, res *ClientResources
3636

3737
// RunWorkload applies load on the coordinator.
3838
func (c *CoordinatorAdapter) RunWorkload(ctx context.Context, txStream *workload.StreamWithSetup) error {
39-
coordinatorDialConfig, err := connection.NewSingleDialConfig(c.config)
40-
if err != nil {
41-
return errors.Wrapf(err, "failed creating coordinator dial config")
42-
}
4339
// connecting to the coordinator.
44-
conn, connErr := connection.Connect(coordinatorDialConfig)
40+
conn, connErr := connection.NewSingleConnection(c.config)
4541
if connErr != nil {
46-
return errors.Wrapf(err, "failed to connect to coordinator at %s", c.config.Endpoint.Address())
42+
return errors.Wrapf(connErr, "failed to connect to coordinator at %s", c.config.Endpoint.Address())
4743
}
4844
defer connection.CloseConnectionsLog(conn)
4945
client := protocoordinatorservice.NewCoordinatorClient(conn)

loadgen/adapters/loadgen.go

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -43,11 +43,7 @@ func NewLoadGenAdapter(config *connection.ClientConfig, res *ClientResources) *L
4343

4444
// RunWorkload applies load on the SV.
4545
func (c *LoadGenAdapter) RunWorkload(ctx context.Context, txStream *workload.StreamWithSetup) error {
46-
loadgenDialConfig, err := connection.NewSingleDialConfig(c.config)
47-
if err != nil {
48-
return errors.Wrapf(err, "failed creating loadgen dial config")
49-
}
50-
conn, err := connection.Connect(loadgenDialConfig)
46+
conn, err := connection.NewSingleConnection(c.config)
5147
if err != nil {
5248
return errors.Wrapf(err, "failed to connect to %s", c.config.Endpoint)
5349
}

loadgen/adapters/sigverifier.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ func (c *SvAdapter) RunWorkload(ctx context.Context, txStream *workload.StreamWi
4343
if err != nil {
4444
return err
4545
}
46-
connections, err := connection.OpenConnections(*c.config)
46+
connections, err := connection.NewConnectionPerEndpoint(c.config)
4747
if err != nil {
4848
return err
4949
}

loadgen/adapters/vcservice.go

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -36,14 +36,11 @@ func NewVCAdapter(config *connection.MultiClientConfig, res *ClientResources) *V
3636

3737
// RunWorkload applies load on the VC.
3838
func (c *VcAdapter) RunWorkload(ctx context.Context, txStream *workload.StreamWithSetup) error {
39-
commonDial, dialErr := connection.NewLoadBalancedDialConfig(*c.config)
40-
if dialErr != nil {
41-
return errors.Wrapf(dialErr, "could not create dial config for vcs")
42-
}
43-
commonConn, connErr := connection.Connect(commonDial)
39+
commonConn, connErr := connection.NewLoadBalancedConnection(c.config)
4440
if connErr != nil {
4541
return errors.Wrapf(connErr, "failed to create connection to validator persisters")
4642
}
43+
defer connection.CloseConnectionsLog(commonConn)
4744
commonClient := protovcservice.NewValidationAndCommitServiceClient(commonConn)
4845
_, setupError := commonClient.SetupSystemTablesAndNamespaces(ctx, nil)
4946
if setupError != nil {
@@ -57,7 +54,7 @@ func (c *VcAdapter) RunWorkload(ctx context.Context, txStream *workload.StreamWi
5754
} else {
5855
c.nextBlockNum.Store(0)
5956
}
60-
connections, connErr := connection.OpenConnections(*c.config)
57+
connections, connErr := connection.NewConnectionPerEndpoint(c.config)
6158
if connErr != nil {
6259
return connErr
6360
}

service/coordinator/signature_verifier_manager.go

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -97,26 +97,20 @@ func (svm *signatureVerifierManager) run(ctx context.Context) error {
9797
return nil
9898
})
9999

100-
dialConfigs, dialErr := connection.NewDialConfigPerEndpoint(c.clientConfig)
101-
if dialErr != nil {
102-
return dialErr
100+
connections, connErr := connection.NewConnectionPerEndpoint(c.clientConfig)
101+
if connErr != nil {
102+
return fmt.Errorf("failed to create connection to signature verifier: %w", connErr)
103103
}
104-
for i, d := range dialConfigs {
105-
conn, err := connection.Connect(d)
106-
if err != nil {
107-
return fmt.Errorf("failed to create connection to signature verifier [%d] at %s: %w",
108-
i, d.Address, err)
109-
}
110-
logger.Infof("connected to signature verifier [%d] at %s", i, d.Address)
104+
defer connection.CloseConnectionsLog(connections...)
105+
for i, conn := range connections {
111106
label := conn.CanonicalTarget()
112107
c.metrics.verifiersConnection.Disconnected(label)
113108

114109
sv := newSignatureVerifier(c, conn)
115110
svm.signVerifier[i] = sv
116-
logger.Debugf("Client [%d] successfully created and connected to sv", i)
111+
logger.Infof("Client [%d] successfully created and connected to sv at %s", i, label)
117112

118113
g.Go(func() error {
119-
defer connection.CloseConnectionsLog(conn)
120114
// error should never occur unless there is a bug or malicious activity. Hence, it is fine to crash for now.
121115
return connection.Sustain(eCtx, func() error {
122116
defer sv.recoverPendingTransactions(txBatchQueue)

service/coordinator/validator_committer_manager.go

Lines changed: 12 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -93,38 +93,31 @@ func (vcm *validatorCommitterManager) run(ctx context.Context) error {
9393
return nil
9494
})
9595

96-
commonDial, dialErr := connection.NewLoadBalancedDialConfig(*c.clientConfig)
97-
if dialErr != nil {
98-
return fmt.Errorf("failed to create connection to validator persisters: %w", dialErr)
99-
}
100-
commonConn, err := connection.Connect(commonDial)
96+
commonConn, err := connection.NewLoadBalancedConnection(c.clientConfig)
10197
if err != nil {
10298
return fmt.Errorf("failed to create connection to validator persisters: %w", err)
10399
}
100+
defer connection.CloseConnectionsLog(commonConn)
104101
vcm.commonClient = protovcservice.NewValidationAndCommitServiceClient(commonConn)
105102
_, setupErr := vcm.commonClient.SetupSystemTablesAndNamespaces(ctx, nil)
106103
if setupErr != nil {
107104
return errors.Wrap(setupErr, "failed to setup system tables and namespaces")
108105
}
109106

110-
dialConfigs, dialErr := connection.NewDialConfigPerEndpoint(c.clientConfig)
111-
if dialErr != nil {
112-
return dialErr
107+
connections, connErr := connection.NewConnectionPerEndpoint(c.clientConfig)
108+
if connErr != nil {
109+
return fmt.Errorf("failed to create connection to validator persister: %w", connErr)
113110
}
114-
for i, d := range dialConfigs {
115-
logger.Debugf("vc manager creates client to vc [%d] listening on %s", i, d.Address)
116-
conn, connErr := connection.Connect(d)
117-
if connErr != nil {
118-
return fmt.Errorf("failed to create connection to validator persister running at %s", d.Address)
119-
}
120-
logger.Infof("validator persister manager connected to validator persister at %s", d.Address)
121-
vc := newValidatorCommitter(conn, c.metrics, c.policyMgr)
111+
defer connection.CloseConnectionsLog(connections...)
112+
for i, conn := range connections {
113+
label := conn.CanonicalTarget()
114+
c.metrics.vcservicesConnection.Disconnected(label)
122115

123-
logger.Debugf("Client [%d] successfully created and connected to vc", i)
116+
vc := newValidatorCommitter(conn, c.metrics, c.policyMgr)
124117
vcm.validatorCommitter[i] = vc
118+
logger.Infof("Client [%d] successfully created and connected to vc at %s", i, label)
125119

126120
g.Go(func() error {
127-
defer connection.CloseConnectionsLog(vc.conn)
128121
return connection.Sustain(eCtx, func() (err error) {
129122
defer vc.recoverPendingTransactions(txBatchQueue)
130123
return vc.sendTransactionsAndForwardStatus(
@@ -198,12 +191,9 @@ func (vcm *validatorCommitterManager) recoverPolicyManagerFromStateDB(ctx contex
198191
}
199192

200193
func newValidatorCommitter(conn *grpc.ClientConn, metrics *perfMetrics, policyMgr *policyManager) *validatorCommitter {
201-
label := conn.CanonicalTarget()
202-
metrics.vcservicesConnection.Disconnected(label)
203-
client := protovcservice.NewValidationAndCommitServiceClient(conn)
204194
return &validatorCommitter{
205195
conn: conn,
206-
client: client,
196+
client: protovcservice.NewValidationAndCommitServiceClient(conn),
207197
metrics: metrics,
208198
policyMgr: policyMgr,
209199
}

service/sidecar/notify_test.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -288,8 +288,7 @@ func TestNotifierStream(t *testing.T) {
288288
protonotify.RegisterNotifierServer(server, env.n)
289289
})
290290
endpoint := &config.Endpoint
291-
conn, err := connection.Connect(test.NewInsecureDialConfig(endpoint))
292-
require.NoError(t, err)
291+
conn := test.NewInsecureConnection(t, endpoint)
293292
client := protonotify.NewNotifierClient(conn)
294293

295294
stream, err := client.OpenNotificationStream(t.Context())

service/sidecar/relay_test.go

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ import (
1212
"time"
1313

1414
"github.com/hyperledger/fabric-protos-go-apiv2/common"
15-
"github.com/stretchr/testify/assert"
1615
"github.com/stretchr/testify/require"
1716

1817
"github.com/hyperledger/fabric-x-committer/api/protoblocktx"
@@ -52,9 +51,7 @@ func newRelayTestEnv(t *testing.T) *relayTestEnv {
5251
metrics,
5352
)
5453

55-
conn, err := connection.Connect(test.NewInsecureDialConfig(&coordinatorEndpoint))
56-
require.NoError(t, err)
57-
t.Cleanup(func() { assert.NoError(t, conn.Close()) })
54+
conn := test.NewInsecureConnection(t, &coordinatorEndpoint)
5855

5956
logger.Infof("sidecar connected to coordinator at %s", &coordinatorEndpoint)
6057

service/sidecar/sidecar.go

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -110,11 +110,7 @@ func (s *Service) Run(ctx context.Context) error {
110110
}()
111111

112112
logger.Infof("Create coordinator client and connect to %s", s.config.Committer.Endpoint)
113-
committerDialConfig, err := connection.NewSingleDialConfig(s.config.Committer)
114-
if err != nil {
115-
return errors.Wrapf(err, "could not load coordinator dial config")
116-
}
117-
conn, connErr := connection.Connect(committerDialConfig)
113+
conn, connErr := connection.NewSingleConnection(s.config.Committer)
118114
if connErr != nil {
119115
return errors.Wrapf(connErr, "failed to connect to coordinator")
120116
}

0 commit comments

Comments
 (0)