Skip to content

Commit 5a1178b

Browse files
committed
changes to review
Signed-off-by: Genady Gurevich <[email protected]>
1 parent 3a395fc commit 5a1178b

File tree

10 files changed

+172
-26
lines changed

10 files changed

+172
-26
lines changed

node/assembler/assembler.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ type Assembler struct {
3232
prefetcher PrefetcherController
3333
baReplicator delivery.ConsensusBringer
3434
netStopper NetStopper
35+
metrics *AssemblerMetrics
3536
}
3637

3738
func (a *Assembler) Broadcast(server orderer.AtomicBroadcast_BroadcastServer) error {
@@ -43,16 +44,17 @@ func (a *Assembler) Deliver(server orderer.AtomicBroadcast_DeliverServer) error
4344
}
4445

4546
func (a *Assembler) GetTxCount() uint64 {
46-
// TODO do this in a cleaner fashion
47-
return a.assembler.Ledger.(*node_ledger.AssemblerLedger).GetTxCount()
47+
return a.assembler.Ledger.GetTxCount()
4848
}
4949

50+
// Stop stops the assembler and all its components.
5051
func (a *Assembler) Stop() {
5152
a.netStopper.Stop()
5253
a.prefetcher.Stop()
5354
a.assembler.Index.Stop()
5455
a.baReplicator.Stop()
5556
a.assembler.WaitTermination()
57+
a.metrics.Stop()
5658
a.assembler.Ledger.Close()
5759
}
5860

@@ -126,6 +128,9 @@ func NewDefaultAssembler(
126128

127129
assembler.assembler.Run()
128130

131+
assembler.metrics = NewAssemblerMetrics(assembler, al, index, shardIds, partyIds, logger, config.MetricsLogInterval)
132+
assembler.metrics.Start()
133+
129134
return assembler
130135
}
131136

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
/*
2+
Copyright IBM Corp. All Rights Reserved.
3+
4+
SPDX-License-Identifier: Apache-2.0
5+
*/
6+
7+
package assembler
8+
9+
import (
10+
"fmt"
11+
"strings"
12+
"sync"
13+
"time"
14+
15+
arma_types "github.com/hyperledger/fabric-x-orderer/common/types"
16+
node_ledger "github.com/hyperledger/fabric-x-orderer/node/ledger"
17+
)
18+
19+
type AssemblerMetrics struct {
20+
assembler *Assembler
21+
index PrefetchIndexer
22+
shardIds []arma_types.ShardID
23+
partyIds []arma_types.PartyID
24+
logger arma_types.Logger
25+
interval time.Duration
26+
stopChan chan struct{}
27+
stopOnce sync.Once
28+
}
29+
30+
func NewAssemblerMetrics(assembler *Assembler, al node_ledger.AssemblerLedgerReaderWriter,
31+
index PrefetchIndexer, shardIds []arma_types.ShardID, partyIds []arma_types.PartyID, logger arma_types.Logger, interval time.Duration,
32+
) *AssemblerMetrics {
33+
return &AssemblerMetrics{
34+
assembler: assembler,
35+
index: index,
36+
shardIds: shardIds,
37+
partyIds: partyIds,
38+
interval: interval,
39+
logger: logger,
40+
stopChan: make(chan struct{}),
41+
}
42+
}
43+
44+
func (m *AssemblerMetrics) Start() {
45+
if m.interval > 0 {
46+
go m.trackMetrics()
47+
}
48+
}
49+
50+
func (m *AssemblerMetrics) Stop() {
51+
m.stopOnce.Do(func() {
52+
close(m.stopChan)
53+
txCommitted := m.assembler.GetTxCount()
54+
blocksCommitted := m.assembler.assembler.Ledger.GetBlocksCount()
55+
blocksSizeCommitted := m.assembler.assembler.Ledger.GetBlocksSize()
56+
57+
totalPbSz := 0
58+
var sb strings.Builder
59+
60+
for _, shardId := range m.shardIds {
61+
for _, partyId := range m.partyIds {
62+
pbsz := m.index.PrefetchBufferSize(ShardPrimary{Shard: shardId, Primary: partyId})
63+
sb.WriteString(fmt.Sprintf("<Sh: %d, Pr:%d>:%d; ", shardId, partyId, pbsz))
64+
totalPbSz += pbsz
65+
}
66+
}
67+
68+
m.logger.Infof("ASSEMBLER_METRICS total committed transactions: %d, total blocks: %d, total size: %d, prefetch buffer size: %d(%s) bytes",
69+
txCommitted, blocksCommitted, blocksSizeCommitted, totalPbSz, strings.TrimRight(sb.String(), "; "))
70+
})
71+
}
72+
73+
func (m *AssemblerMetrics) trackMetrics() {
74+
lastTxCommitted, lastBlocksCommitted := uint64(0), uint64(0)
75+
sec := m.interval.Seconds()
76+
t := time.NewTicker(m.interval)
77+
defer t.Stop()
78+
79+
for {
80+
select {
81+
case <-t.C:
82+
totalPbSz := 0
83+
var sb strings.Builder
84+
85+
for _, shardId := range m.shardIds {
86+
for _, partyId := range m.partyIds {
87+
pbsz := m.index.PrefetchBufferSize(ShardPrimary{Shard: shardId, Primary: partyId})
88+
sb.WriteString(fmt.Sprintf("<Sh: %d, Pr:%d>:%d; ", shardId, partyId, pbsz))
89+
totalPbSz += pbsz
90+
}
91+
}
92+
93+
txCommitted := m.assembler.GetTxCount()
94+
blocksCommitted := m.assembler.assembler.Ledger.GetBlocksCount()
95+
blocksSizeCommitted := m.assembler.assembler.Ledger.GetBlocksSize()
96+
97+
m.logger.Infof("ASSEMBLER_METRICS: total committed transactions %d, new transactions in the last %.2f seconds: %d, total blocks: %d, new blocks in the last: %.2f seconds: %d, total size: %d, prefetch buffer size: %d(%s) bytes",
98+
txCommitted, sec, (txCommitted - lastTxCommitted), blocksCommitted,
99+
sec, (blocksCommitted - lastBlocksCommitted), blocksSizeCommitted, totalPbSz, strings.TrimRight(sb.String(), "; "))
100+
101+
lastTxCommitted, lastBlocksCommitted = txCommitted, blocksCommitted
102+
case <-m.stopChan:
103+
return
104+
}
105+
}
106+
}

node/assembler/assembler_role.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@ type AssemblerIndex interface {
2424
}
2525

2626
type AssemblerLedgerWriter interface {
27+
GetTxCount() uint64
28+
GetBlocksCount() uint64
29+
GetBlocksSize() uint64
2730
Append(batch types.Batch, orderingInfo types.OrderingInfo)
2831
AppendConfig(configBlock *common.Block, decisionNum types.DecisionNum)
2932
Close()

node/assembler/batch_cache.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,16 @@ SPDX-License-Identifier: Apache-2.0
77
package assembler
88

99
import (
10+
"sync/atomic"
11+
1012
"github.com/hyperledger/fabric-x-orderer/common/types"
1113
)
1214

1315
type BatchCache struct {
1416
tag string
1517
shardBatchMapper *BatchMapper[types.BatchID, types.Batch]
1618
partition ShardPrimary
17-
sizeBytes int
19+
sizeBytes uint64
1820
}
1921

2022
//go:generate counterfeiter -o ./mocks/batch_cache_factory.go . BatchCacheFactory
@@ -47,7 +49,7 @@ func (bc *BatchCache) Pop(batchId types.BatchID) (types.Batch, error) {
4749
if err != nil {
4850
return nil, err
4951
}
50-
bc.sizeBytes -= batchSizeBytes(batch)
52+
atomic.AddUint64(&bc.sizeBytes, ^(uint64(batchSizeBytes(batch)) - 1))
5153
return batch, nil
5254
}
5355

@@ -57,7 +59,7 @@ func (bc *BatchCache) Put(batch types.Batch) error {
5759
return ErrBatchAlreadyExists
5860
}
5961
if inserted {
60-
bc.sizeBytes += batchSizeBytes(batch)
62+
atomic.AddUint64(&bc.sizeBytes, uint64(batchSizeBytes(batch)))
6163
}
6264
return nil
6365
}
@@ -71,5 +73,5 @@ func (bc *BatchCache) Get(batchId types.BatchID) (types.Batch, error) {
7173
}
7274

7375
func (bc *BatchCache) SizeBytes() int {
74-
return bc.sizeBytes
76+
return int(atomic.LoadUint64(&bc.sizeBytes))
7577
}

node/assembler/mocks/partition_prefetch_index.go

Lines changed: 5 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

node/assembler/mocks/prefetch_index.go

Lines changed: 5 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

node/assembler/partition_prefetch_index.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ type PartitionPrefetchIndexer interface {
5656
Put(batch types.Batch) error
5757
PutForce(batch types.Batch) error
5858
Stop()
59+
PrefetchBufferSize() int
5960
}
6061

6162
//go:generate counterfeiter -o ./mocks/partition_prefetch_index_factory.go . PartitionPrefetchIndexerFactory
@@ -192,6 +193,10 @@ func NewPartitionPrefetchIndex(partition ShardPrimary, logger types.Logger, defa
192193
return pi
193194
}
194195

196+
func (pi *PartitionPrefetchIndex) PrefetchBufferSize() int {
197+
return pi.cache.SizeBytes() + pi.forcedPutCache.SizeBytes()
198+
}
199+
195200
func (pi *PartitionPrefetchIndex) getName() string {
196201
return fmt.Sprintf("partition <Sh: %d, Pr: %d>", pi.partition.Shard, pi.partition.Primary)
197202
}

node/assembler/prefetch_index.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ type PrefetchIndexer interface {
1818
Put(batch types.Batch) error
1919
PutForce(batch types.Batch) error
2020
Requests() <-chan types.BatchID
21+
PrefetchBufferSize(shardPrimary ShardPrimary) int
2122
Stop()
2223
}
2324

@@ -106,6 +107,11 @@ func NewPrefetchIndex(
106107
return pi
107108
}
108109

110+
func (pi *PrefetchIndex) PrefetchBufferSize(shardPrimary ShardPrimary) int {
111+
partitionIndex := pi.partitionToIndex[shardPrimary]
112+
return partitionIndex.PrefetchBufferSize()
113+
}
114+
109115
func (pi *PrefetchIndex) PopOrWait(batchId types.BatchID) (types.Batch, error) {
110116
t1 := time.Now()
111117
defer func() {

node/ledger/assembler_ledger.go

Lines changed: 21 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@ type (
3131
//go:generate counterfeiter -o ./mocks/assembler_ledger.go . AssemblerLedgerReaderWriter
3232
type AssemblerLedgerReaderWriter interface {
3333
GetTxCount() uint64
34+
GetBlocksCount() uint64
35+
GetBlocksSize() uint64
3436
Append(batch types.Batch, orderingInfo types.OrderingInfo)
3537
AppendConfig(configBlock *common.Block, decisionNum types.DecisionNum)
3638
LastOrderingInfo() (*state.OrderingInformation, error)
@@ -54,6 +56,8 @@ type AssemblerLedger struct {
5456
Logger types.Logger
5557
Ledger blockledger.ReadWriter
5658
transactionCount uint64
59+
blocksCount uint64
60+
blocksSize uint64
5761
blockStorageProvider *blkstorage.BlockStoreProvider
5862
blockStore *blkstorage.BlockStore
5963
cancellationContext context.Context
@@ -99,7 +103,7 @@ func NewAssemblerLedger(logger types.Logger, ledgerPath string) (*AssemblerLedge
99103
cancellationContext: ctx,
100104
cancelContextFunc: cancel,
101105
}
102-
go al.trackThroughput()
106+
103107
return al, nil
104108
}
105109

@@ -109,27 +113,20 @@ func (l *AssemblerLedger) Close() {
109113
l.blockStorageProvider.Close()
110114
}
111115

112-
func (l *AssemblerLedger) trackThroughput() {
113-
firstProbe := true
114-
lastTxCount := uint64(0)
115-
for {
116-
txCount := atomic.LoadUint64(&l.transactionCount)
117-
if !firstProbe {
118-
l.Logger.Infof("Tx Count: %d, Commit throughput: %.2f", txCount, float64(txCount-lastTxCount)/10.0)
119-
}
120-
lastTxCount = txCount
121-
firstProbe = false
122-
select {
123-
case <-time.After(time.Second * 10):
124-
case <-l.cancellationContext.Done():
125-
return
126-
}
127-
}
116+
func (l *AssemblerLedger) GetTxCount() uint64 {
117+
return atomic.LoadUint64(&l.transactionCount)
128118
}
129119

130-
func (l *AssemblerLedger) GetTxCount() uint64 {
131-
c := atomic.LoadUint64(&l.transactionCount)
132-
return c
120+
func (l *AssemblerLedger) GetBlocksCount() uint64 {
121+
return atomic.LoadUint64(&l.blocksCount)
122+
}
123+
124+
func (l *AssemblerLedger) GetBlocksSize() uint64 {
125+
return atomic.LoadUint64(&l.blocksSize)
126+
}
127+
128+
func blockSize(block *common.Block) uint64 {
129+
return uint64(len(protoutil.MarshalOrPanic(block)))
133130
}
134131

135132
func (l *AssemblerLedger) Append(batch types.Batch, orderingInfo types.OrderingInfo) {
@@ -194,6 +191,8 @@ func (l *AssemblerLedger) Append(batch types.Batch, orderingInfo types.OrderingI
194191
}
195192

196193
atomic.AddUint64(&l.transactionCount, uint64(len(batch.Requests())))
194+
atomic.AddUint64(&l.blocksCount, uint64(1))
195+
atomic.AddUint64(&l.blocksSize, blockSize(block))
197196
}
198197

199198
func (l *AssemblerLedger) AppendConfig(configBlock *common.Block, decisionNum types.DecisionNum) {
@@ -207,6 +206,8 @@ func (l *AssemblerLedger) AppendConfig(configBlock *common.Block, decisionNum ty
207206
l.Logger.Panicf("attempting to AppendConfig a block which is not a config block: %d", configBlock.GetHeader().GetNumber())
208207
}
209208

209+
atomic.AddUint64(&l.blocksCount, uint64(1))
210+
atomic.AddUint64(&l.blocksSize, blockSize(configBlock))
210211
transactionCount := atomic.AddUint64(&l.transactionCount, 1) // len(configBlock.GetData().GetData()) = should always be a single TX
211212
batchID := types.NewSimpleBatch(types.ShardIDConsensus, 0, 0, nil, 0)
212213
ordInfo := &state.OrderingInformation{

node/ledger/mocks/assembler_ledger.go

Lines changed: 8 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)