Skip to content

Commit 6807b2e

Browse files
committed
changes to review
Signed-off-by: Genady Gurevich <[email protected]>
1 parent 6cbacac commit 6807b2e

14 files changed

+211
-31
lines changed

config/config.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -346,6 +346,7 @@ func (config *Configuration) ExtractAssemblerConfig() *nodeconfig.AssemblerNodeC
346346
Consenter: consenterFromMyParty,
347347
UseTLS: config.LocalConfig.TLSConfig.Enabled,
348348
ClientAuthRequired: config.LocalConfig.TLSConfig.ClientAuthRequired,
349+
MetricsLogInterval: config.LocalConfig.NodeLocalConfig.GeneralConfig.MetricsLogInterval,
349350
}
350351
return assemblerConfig
351352
}

config/generate/local_config_gen.go

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ type GeneralConfigParams struct {
5757
shardID types.ShardID
5858
tlsEnabled bool
5959
clientAuthRequired bool
60+
metricsLogInterval time.Duration
6061
}
6162

6263
// CreateArmaLocalConfig creates a config directory that includes the local config yaml files for all nodes for all parties.
@@ -94,9 +95,9 @@ func createNetworkLocalConfig(network Network, cryptoBaseDir string, configBaseD
9495

9596
redundantShardID := types.ShardID(0)
9697
for _, party := range network.Parties {
97-
routerGeneralParams := NewGeneralConfigParams(party.ID, redundantShardID, "router", trimPortFromEndpoint(party.RouterEndpoint), getPortFromEndpoint(party.RouterEndpoint), useTLSRouter, clientAuthRequiredRouter, "info", cryptoBaseDir, configBaseDir)
98-
consensusGeneralParams := NewGeneralConfigParams(party.ID, redundantShardID, "consenter", trimPortFromEndpoint(party.ConsenterEndpoint), getPortFromEndpoint(party.ConsenterEndpoint), true, false, "info", cryptoBaseDir, configBaseDir)
99-
assemblerGeneralParams := NewGeneralConfigParams(party.ID, redundantShardID, "assembler", trimPortFromEndpoint(party.AssemblerEndpoint), getPortFromEndpoint(party.AssemblerEndpoint), useTLSAssembler, clientAuthRequiredAssembler, "info", cryptoBaseDir, configBaseDir)
98+
routerGeneralParams := NewGeneralConfigParams(party.ID, redundantShardID, "router", trimPortFromEndpoint(party.RouterEndpoint), getPortFromEndpoint(party.RouterEndpoint), DefaultMetricsLogInterval, useTLSRouter, clientAuthRequiredRouter, "info", cryptoBaseDir, configBaseDir)
99+
consensusGeneralParams := NewGeneralConfigParams(party.ID, redundantShardID, "consenter", trimPortFromEndpoint(party.ConsenterEndpoint), getPortFromEndpoint(party.ConsenterEndpoint), DefaultMetricsLogInterval, true, false, "info", cryptoBaseDir, configBaseDir)
100+
assemblerGeneralParams := NewGeneralConfigParams(party.ID, redundantShardID, "assembler", trimPortFromEndpoint(party.AssemblerEndpoint), getPortFromEndpoint(party.AssemblerEndpoint), 10*time.Second, useTLSAssembler, clientAuthRequiredAssembler, "info", cryptoBaseDir, configBaseDir)
100101
partyLocalConfig := PartyLocalConfig{
101102
RouterLocalConfig: NewRouterLocalConfig(routerGeneralParams),
102103
BatchersLocalConfig: NewBatchersLocalConfigPerParty(party.ID, party.BatchersEndpoints, cryptoBaseDir, configBaseDir),
@@ -156,7 +157,7 @@ func createPartyConfigFiles(partyLocalConfig PartyLocalConfig, configBaseDir str
156157
return nil
157158
}
158159

159-
func NewGeneralConfigParams(partyID types.PartyID, shardID types.ShardID, role string, listenAddress string, listenPort uint32, tlsEnabled bool, clientAuthRequired bool, logLevel string, cryptoBaseDir string, configBaseDir string) GeneralConfigParams {
160+
func NewGeneralConfigParams(partyID types.PartyID, shardID types.ShardID, role string, listenAddress string, listenPort uint32, metricsLogInterval time.Duration, tlsEnabled bool, clientAuthRequired bool, logLevel string, cryptoBaseDir string, configBaseDir string) GeneralConfigParams {
160161
return GeneralConfigParams{
161162
partyID: partyID,
162163
shardID: shardID,
@@ -168,6 +169,7 @@ func NewGeneralConfigParams(partyID types.PartyID, shardID types.ShardID, role s
168169
listenPort: listenPort,
169170
tlsEnabled: tlsEnabled,
170171
clientAuthRequired: clientAuthRequired,
172+
metricsLogInterval: metricsLogInterval,
171173
}
172174
}
173175

@@ -203,7 +205,7 @@ func NewGeneralConfig(generalConfigParams GeneralConfigParams) *config.GeneralCo
203205
BCCSP: &factory.FactoryOpts{},
204206
LogSpec: generalConfigParams.logLevel,
205207
ClientSignatureVerificationRequired: DefaultClientSignatureVerificationRequired,
206-
MetricsLogInterval: DefaultMetricsLogInterval,
208+
MetricsLogInterval: generalConfigParams.metricsLogInterval,
207209
}
208210

209211
if generalConfigParams.role == "consenter" {
@@ -244,7 +246,7 @@ func createBatcherLocalConfig(batcherGeneralParams GeneralConfigParams) *config.
244246
func NewBatchersLocalConfigPerParty(partyID types.PartyID, batcherEndpoints []string, cryptoBaseDir string, configBaseDir string) []*config.NodeLocalConfig {
245247
var batchers []*config.NodeLocalConfig
246248
for i, batcherEndpoint := range batcherEndpoints {
247-
batcherGeneralParams := NewGeneralConfigParams(partyID, types.ShardID(uint16(i+1)), "batcher", trimPortFromEndpoint(batcherEndpoint), getPortFromEndpoint(batcherEndpoint), true, false, "info", cryptoBaseDir, configBaseDir)
249+
batcherGeneralParams := NewGeneralConfigParams(partyID, types.ShardID(uint16(i+1)), "batcher", trimPortFromEndpoint(batcherEndpoint), getPortFromEndpoint(batcherEndpoint), DefaultMetricsLogInterval, true, false, "info", cryptoBaseDir, configBaseDir)
248250
batcher := createBatcherLocalConfig(batcherGeneralParams)
249251
batchers = append(batchers, batcher)
250252
}

node/assembler/assembler.go

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ type Assembler struct {
3232
prefetcher PrefetcherController
3333
baReplicator delivery.ConsensusBringer
3434
netStopper NetStopper
35+
metrics *AssemblerMetrics
3536
}
3637

3738
func (a *Assembler) Broadcast(server orderer.AtomicBroadcast_BroadcastServer) error {
@@ -43,8 +44,15 @@ func (a *Assembler) Deliver(server orderer.AtomicBroadcast_DeliverServer) error
4344
}
4445

4546
func (a *Assembler) GetTxCount() uint64 {
46-
// TODO do this in a cleaner fashion
47-
return a.assembler.Ledger.(*node_ledger.AssemblerLedger).GetTxCount()
47+
return a.assembler.Ledger.GetTxCount()
48+
}
49+
50+
func (a *Assembler) GetBlocksCount() uint64 {
51+
return a.assembler.Ledger.GetBlocksCount()
52+
}
53+
54+
func (a *Assembler) GetBlocksSize() uint64 {
55+
return a.assembler.Ledger.GetBlocksSize()
4856
}
4957

5058
func (a *Assembler) Stop() {
@@ -53,6 +61,7 @@ func (a *Assembler) Stop() {
5361
a.assembler.Index.Stop()
5462
a.baReplicator.Stop()
5563
a.assembler.WaitTermination()
64+
a.metrics.Stop()
5665
a.assembler.Ledger.Close()
5766
}
5867

@@ -127,6 +136,9 @@ func NewDefaultAssembler(
127136

128137
assembler.assembler.Run()
129138

139+
assembler.metrics = NewAssemblerMetrics(assembler, al, index, shardIds, partyIds, logger, config.MetricsLogInterval)
140+
assembler.metrics.Start()
141+
130142
return assembler
131143
}
132144

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
/*
2+
Copyright IBM Corp. All Rights Reserved.
3+
4+
SPDX-License-Identifier: Apache-2.0
5+
*/
6+
7+
package assembler
8+
9+
import (
10+
"fmt"
11+
"strings"
12+
"sync"
13+
"time"
14+
15+
arma_types "github.com/hyperledger/fabric-x-orderer/common/types"
16+
node_ledger "github.com/hyperledger/fabric-x-orderer/node/ledger"
17+
)
18+
19+
type AssemblerMetrics struct {
20+
assembler *Assembler
21+
al node_ledger.AssemblerLedgerReaderWriter
22+
index PrefetchIndexer
23+
shardIds []arma_types.ShardID
24+
partyIds []arma_types.PartyID
25+
logger arma_types.Logger
26+
interval time.Duration
27+
stopChan chan struct{}
28+
stopOnce sync.Once
29+
}
30+
31+
func NewAssemblerMetrics(assembler *Assembler, al node_ledger.AssemblerLedgerReaderWriter,
32+
index PrefetchIndexer, shardIds []arma_types.ShardID, partyIds []arma_types.PartyID, logger arma_types.Logger, interval time.Duration,
33+
) *AssemblerMetrics {
34+
return &AssemblerMetrics{
35+
assembler: assembler,
36+
al: al,
37+
index: index,
38+
shardIds: shardIds,
39+
partyIds: partyIds,
40+
interval: interval,
41+
logger: logger,
42+
stopChan: make(chan struct{}),
43+
}
44+
}
45+
46+
func (m *AssemblerMetrics) Start() {
47+
if m.interval > 0 {
48+
go m.trackMetrics()
49+
}
50+
}
51+
52+
func (m *AssemblerMetrics) Stop() {
53+
m.stopOnce.Do(func() {
54+
close(m.stopChan)
55+
txCommitted := m.assembler.GetTxCount()
56+
blocksCommitted := m.assembler.GetBlocksCount()
57+
blocksSizeCommitted := m.assembler.GetBlocksSize()
58+
59+
totalPbSz := 0
60+
var sb strings.Builder
61+
62+
for _, shardId := range m.shardIds {
63+
for _, partyId := range m.partyIds {
64+
pbsz := m.index.PrefetchBufferSize(ShardPrimary{Shard: shardId, Primary: partyId})
65+
sb.WriteString(fmt.Sprintf("<Sh: %d, Pr:%d>:%d; ", shardId, partyId, pbsz))
66+
totalPbSz += pbsz
67+
}
68+
}
69+
70+
m.logger.Infof("ASSEMBLER_METRICS ledger height: %d, total committed transactions: %d, total blocks %d bytes, total size:%d, prefetch buffer size: %d(%s) bytes",
71+
m.al.LedgerReader().Height(), txCommitted, blocksCommitted, blocksSizeCommitted, totalPbSz, strings.TrimRight(sb.String(), "; "))
72+
})
73+
}
74+
75+
func (m *AssemblerMetrics) trackMetrics() {
76+
lastTxCommitted, lastBlocksCommitted := uint64(0), uint64(0)
77+
sec := m.interval.Seconds()
78+
t := time.NewTicker(m.interval)
79+
defer t.Stop()
80+
81+
for {
82+
select {
83+
case <-t.C:
84+
totalPbSz := 0
85+
var sb strings.Builder
86+
87+
for _, shardId := range m.shardIds {
88+
for _, partyId := range m.partyIds {
89+
pbsz := m.index.PrefetchBufferSize(ShardPrimary{Shard: shardId, Primary: partyId})
90+
sb.WriteString(fmt.Sprintf("<Sh: %d, Pr:%d>:%d; ", shardId, partyId, pbsz))
91+
totalPbSz += pbsz
92+
}
93+
}
94+
95+
txCommitted := m.assembler.GetTxCount()
96+
blocksCommitted := m.assembler.GetBlocksCount()
97+
blocksSizeCommitted := m.assembler.GetBlocksSize()
98+
99+
m.logger.Infof("ASSEMBLER_METRICS ledger height: %d, total committed transactions: %d, new transactions in the last %.2f seconds: %d, total blocks %d bytes, new blocks in the last %.2f seconds: %d, total size:%d, prefetch buffer size: %d(%s) bytes",
100+
m.al.LedgerReader().Height(), txCommitted, sec, (txCommitted - lastTxCommitted), blocksCommitted,
101+
sec, (blocksCommitted - lastBlocksCommitted), blocksSizeCommitted, totalPbSz, strings.TrimRight(sb.String(), "; "))
102+
103+
lastTxCommitted, lastBlocksCommitted = txCommitted, blocksCommitted
104+
case <-m.stopChan:
105+
return
106+
}
107+
}
108+
}

node/assembler/assembler_role.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@ type AssemblerIndex interface {
2626
}
2727

2828
type AssemblerLedgerWriter interface {
29+
GetTxCount() uint64
30+
GetBlocksCount() uint64
31+
GetBlocksSize() uint64
2932
Append(batch types.Batch, orderingInfo types.OrderingInfo)
3033
Close()
3134
}

node/assembler/assembler_role_test.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,24 @@ func (n naiveAssemblerLedger) Close() {
7878
close(n)
7979
}
8080

81+
// GetBlocksCount implements the assembler.AssemblerLedgerWriter interface.
82+
func (n naiveAssemblerLedger) GetBlocksCount() uint64 {
83+
// For testing, return 0 or a mock value as needed.
84+
return 0
85+
}
86+
87+
// GetTxCount implements the assembler.AssemblerLedgerWriter interface.
88+
func (n naiveAssemblerLedger) GetTxCount() uint64 {
89+
// For testing, return 0 or a mock value as needed.
90+
return 0
91+
}
92+
93+
// GetBlocksSize implements the assembler.AssemblerLedgerWriter interface.
94+
func (n naiveAssemblerLedger) GetBlocksSize() uint64 {
95+
// For testing, return 0 or a mock value as needed.
96+
return 0
97+
}
98+
8199
func TestAssembler(t *testing.T) {
82100
shardCount := 4
83101
batchNum := 20

node/assembler/batch_cache.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,16 @@ SPDX-License-Identifier: Apache-2.0
77
package assembler
88

99
import (
10+
"sync/atomic"
11+
1012
"github.com/hyperledger/fabric-x-orderer/common/types"
1113
)
1214

1315
type BatchCache struct {
1416
tag string
1517
shardBatchMapper *BatchMapper[types.BatchID, types.Batch]
1618
partition ShardPrimary
17-
sizeBytes int
19+
sizeBytes uint64
1820
}
1921

2022
//go:generate counterfeiter -o ./mocks/batch_cache_factory.go . BatchCacheFactory
@@ -47,7 +49,7 @@ func (bc *BatchCache) Pop(batchId types.BatchID) (types.Batch, error) {
4749
if err != nil {
4850
return nil, err
4951
}
50-
bc.sizeBytes -= batchSizeBytes(batch)
52+
atomic.AddUint64(&bc.sizeBytes, ^(uint64(batchSizeBytes(batch)) - 1))
5153
return batch, nil
5254
}
5355

@@ -57,7 +59,7 @@ func (bc *BatchCache) Put(batch types.Batch) error {
5759
return ErrBatchAlreadyExists
5860
}
5961
if inserted {
60-
bc.sizeBytes += batchSizeBytes(batch)
62+
atomic.AddUint64(&bc.sizeBytes, uint64(batchSizeBytes(batch)))
6163
}
6264
return nil
6365
}
@@ -71,5 +73,5 @@ func (bc *BatchCache) Get(batchId types.BatchID) (types.Batch, error) {
7173
}
7274

7375
func (bc *BatchCache) SizeBytes() int {
74-
return bc.sizeBytes
76+
return int(atomic.LoadUint64(&bc.sizeBytes))
7577
}

node/assembler/mocks/partition_prefetch_index.go

Lines changed: 5 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

node/assembler/mocks/prefetch_index.go

Lines changed: 5 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

node/assembler/partition_prefetch_index.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ type PartitionPrefetchIndexer interface {
5656
Put(batch types.Batch) error
5757
PutForce(batch types.Batch) error
5858
Stop()
59+
PrefetchBufferSize() int
5960
}
6061

6162
//go:generate counterfeiter -o ./mocks/partition_prefetch_index_factory.go . PartitionPrefetchIndexerFactory
@@ -192,6 +193,10 @@ func NewPartitionPrefetchIndex(partition ShardPrimary, logger types.Logger, defa
192193
return pi
193194
}
194195

196+
func (pi *PartitionPrefetchIndex) PrefetchBufferSize() int {
197+
return pi.cache.SizeBytes() + pi.forcedPutCache.SizeBytes()
198+
}
199+
195200
func (pi *PartitionPrefetchIndex) getName() string {
196201
return fmt.Sprintf("partition <Sh: %d, Pr: %d>", pi.partition.Shard, pi.partition.Primary)
197202
}

0 commit comments

Comments
 (0)