Skip to content

Commit 5bebcab

Browse files
committed
fixed the assembler_test issue
Signed-off-by: Genady Gurevich <[email protected]>
1 parent f0085d1 commit 5bebcab

File tree

11 files changed

+201
-6
lines changed

11 files changed

+201
-6
lines changed

node/assembler/assembler.go

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ package assembler
99
import (
1010
"encoding/hex"
1111
"fmt"
12+
"time"
1213

1314
"github.com/hyperledger/fabric/protoutil"
1415

@@ -32,6 +33,7 @@ type Assembler struct {
3233
prefetcher PrefetcherController
3334
baReplicator delivery.ConsensusBringer
3435
netStopper NetStopper
36+
metrics *AssemblerMetrics
3537
}
3638

3739
func (a *Assembler) Broadcast(server orderer.AtomicBroadcast_BroadcastServer) error {
@@ -43,8 +45,15 @@ func (a *Assembler) Deliver(server orderer.AtomicBroadcast_DeliverServer) error
4345
}
4446

4547
func (a *Assembler) GetTxCount() uint64 {
46-
// TODO do this in a cleaner fashion
47-
return a.assembler.Ledger.(*node_ledger.AssemblerLedger).GetTxCount()
48+
return a.assembler.Ledger.GetTxCount()
49+
}
50+
51+
func (a *Assembler) GetBlocksCount() uint64 {
52+
return a.assembler.Ledger.GetBlocksCount()
53+
}
54+
55+
func (a *Assembler) GetBlocksSize() uint64 {
56+
return a.assembler.Ledger.GetBlocksSize()
4857
}
4958

5059
func (a *Assembler) Stop() {
@@ -53,6 +62,7 @@ func (a *Assembler) Stop() {
5362
a.assembler.Index.Stop()
5463
a.baReplicator.Stop()
5564
a.assembler.WaitTermination()
65+
a.metrics.Stop()
5666
a.assembler.Ledger.Close()
5767
}
5868

@@ -127,6 +137,9 @@ func NewDefaultAssembler(
127137

128138
assembler.assembler.Run()
129139

140+
assembler.metrics = NewAssemblerMetrics(assembler, al, index, shardIds, partyIds, logger, 10*time.Second)
141+
assembler.metrics.Start()
142+
130143
return assembler
131144
}
132145

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
/*
2+
Copyright IBM Corp. All Rights Reserved.
3+
4+
SPDX-License-Identifier: Apache-2.0
5+
*/
6+
7+
package assembler
8+
9+
import (
10+
"fmt"
11+
"strings"
12+
"sync"
13+
"time"
14+
15+
arma_types "github.com/hyperledger/fabric-x-orderer/common/types"
16+
node_ledger "github.com/hyperledger/fabric-x-orderer/node/ledger"
17+
)
18+
19+
type AssemblerMetrics struct {
20+
assembler *Assembler
21+
al node_ledger.AssemblerLedgerReaderWriter
22+
index PrefetchIndexer
23+
shardIds []arma_types.ShardID
24+
partyIds []arma_types.PartyID
25+
logger arma_types.Logger
26+
interval time.Duration
27+
stopChan chan struct{}
28+
stopOnce sync.Once
29+
}
30+
31+
func NewAssemblerMetrics(assembler *Assembler, al node_ledger.AssemblerLedgerReaderWriter,
32+
index PrefetchIndexer, shardIds []arma_types.ShardID, partyIds []arma_types.PartyID, logger arma_types.Logger, interval time.Duration,
33+
) *AssemblerMetrics {
34+
return &AssemblerMetrics{
35+
assembler: assembler,
36+
al: al,
37+
index: index,
38+
shardIds: shardIds,
39+
partyIds: partyIds,
40+
interval: interval,
41+
logger: logger,
42+
stopChan: make(chan struct{}),
43+
}
44+
}
45+
46+
func (m *AssemblerMetrics) Start() {
47+
if m.interval > 0 {
48+
go m.trackMetrics()
49+
}
50+
}
51+
52+
func (m *AssemblerMetrics) Stop() {
53+
m.stopOnce.Do(func() {
54+
close(m.stopChan)
55+
txCommitted := m.assembler.GetTxCount()
56+
blocksCommitted := m.assembler.GetBlocksCount()
57+
blocksSizeCommitted := m.assembler.GetBlocksSize()
58+
59+
totalPbSz := 0
60+
var sb strings.Builder
61+
62+
for _, shardId := range m.shardIds {
63+
for _, partyId := range m.partyIds {
64+
pbsz := m.index.PrefetchBufferSize(ShardPrimary{Shard: shardId, Primary: partyId})
65+
sb.WriteString(fmt.Sprintf("<Sh: %d, Pr:%d>:%d; ", shardId, partyId, pbsz))
66+
totalPbSz += pbsz
67+
}
68+
}
69+
70+
m.logger.Infof("ASSEMBLER_METRICS ledger height: %d, total committed transactions: %d, total blocks %d bytes, total size:%d, prefetch buffer size: %d(%s) bytes",
71+
m.al.LedgerReader().Height(), txCommitted, blocksCommitted, blocksSizeCommitted, totalPbSz, strings.TrimRight(sb.String(), "; "))
72+
})
73+
}
74+
75+
func (m *AssemblerMetrics) trackMetrics() {
76+
lastTxCommitted, lastBlocksCommitted := uint64(0), uint64(0)
77+
sec := m.interval.Seconds()
78+
t := time.NewTicker(m.interval)
79+
defer t.Stop()
80+
81+
for {
82+
select {
83+
case <-t.C:
84+
totalPbSz := 0
85+
var sb strings.Builder
86+
87+
for _, shardId := range m.shardIds {
88+
for _, partyId := range m.partyIds {
89+
pbsz := m.index.PrefetchBufferSize(ShardPrimary{Shard: shardId, Primary: partyId})
90+
sb.WriteString(fmt.Sprintf("<Sh: %d, Pr:%d>:%d; ", shardId, partyId, pbsz))
91+
totalPbSz += pbsz
92+
}
93+
}
94+
95+
txCommitted := m.assembler.GetTxCount()
96+
blocksCommitted := m.assembler.GetBlocksCount()
97+
blocksSizeCommitted := m.assembler.GetBlocksSize()
98+
99+
m.logger.Infof("ASSEMBLER_METRICS ledger height: %d, total committed transactions: %d, new transactions in the last %.2f seconds: %d, total blocks %d bytes, new blocks in the last %.2f seconds: %d, total size:%d, prefetch buffer size: %d(%s) bytes",
100+
m.al.LedgerReader().Height(), txCommitted, sec, (txCommitted - lastTxCommitted), blocksCommitted,
101+
sec, (blocksCommitted - lastBlocksCommitted), blocksSizeCommitted, totalPbSz, strings.TrimRight(sb.String(), "; "))
102+
103+
lastTxCommitted, lastBlocksCommitted = txCommitted, blocksCommitted
104+
case <-m.stopChan:
105+
return
106+
}
107+
}
108+
}

node/assembler/assembler_role.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@ type AssemblerIndex interface {
2626
}
2727

2828
type AssemblerLedgerWriter interface {
29+
GetTxCount() uint64
30+
GetBlocksCount() uint64
31+
GetBlocksSize() uint64
2932
Append(batch types.Batch, orderingInfo types.OrderingInfo)
3033
Close()
3134
}

node/assembler/assembler_role_test.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,24 @@ func (n naiveAssemblerLedger) Close() {
7878
close(n)
7979
}
8080

81+
// GetBlocksCount implements the assembler.AssemblerLedgerWriter interface.
82+
func (n naiveAssemblerLedger) GetBlocksCount() uint64 {
83+
// For testing, return 0 or a mock value as needed.
84+
return 0
85+
}
86+
87+
// GetTxCount implements the assembler.AssemblerLedgerWriter interface.
88+
func (n naiveAssemblerLedger) GetTxCount() uint64 {
89+
// For testing, return 0 or a mock value as needed.
90+
return 0
91+
}
92+
93+
// GetBlocksSize implements the assembler.AssemblerLedgerWriter interface.
94+
func (n naiveAssemblerLedger) GetBlocksSize() uint64 {
95+
// For testing, return 0 or a mock value as needed.
96+
return 0
97+
}
98+
8199
func TestAssembler(t *testing.T) {
82100
shardCount := 4
83101
batchNum := 20

node/assembler/batch_cache.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,16 @@ SPDX-License-Identifier: Apache-2.0
77
package assembler
88

99
import (
10+
"sync/atomic"
11+
1012
"github.com/hyperledger/fabric-x-orderer/common/types"
1113
)
1214

1315
type BatchCache struct {
1416
tag string
1517
shardBatchMapper *BatchMapper[types.BatchID, types.Batch]
1618
partition ShardPrimary
17-
sizeBytes int
19+
sizeBytes uint64
1820
}
1921

2022
//go:generate counterfeiter -o ./mocks/batch_cache_factory.go . BatchCacheFactory
@@ -47,7 +49,7 @@ func (bc *BatchCache) Pop(batchId types.BatchID) (types.Batch, error) {
4749
if err != nil {
4850
return nil, err
4951
}
50-
bc.sizeBytes -= batchSizeBytes(batch)
52+
atomic.AddUint64(&bc.sizeBytes, ^(uint64(batchSizeBytes(batch)) - 1))
5153
return batch, nil
5254
}
5355

@@ -57,7 +59,7 @@ func (bc *BatchCache) Put(batch types.Batch) error {
5759
return ErrBatchAlreadyExists
5860
}
5961
if inserted {
60-
bc.sizeBytes += batchSizeBytes(batch)
62+
atomic.AddUint64(&bc.sizeBytes, uint64(batchSizeBytes(batch)))
6163
}
6264
return nil
6365
}
@@ -71,5 +73,5 @@ func (bc *BatchCache) Get(batchId types.BatchID) (types.Batch, error) {
7173
}
7274

7375
func (bc *BatchCache) SizeBytes() int {
74-
return bc.sizeBytes
76+
return int(atomic.LoadUint64(&bc.sizeBytes))
7577
}

node/assembler/mocks/partition_prefetch_index.go

Lines changed: 5 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

node/assembler/mocks/prefetch_index.go

Lines changed: 5 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

node/assembler/partition_prefetch_index.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ type PartitionPrefetchIndexer interface {
5656
Put(batch types.Batch) error
5757
PutForce(batch types.Batch) error
5858
Stop()
59+
PrefetchBufferSize() int
5960
}
6061

6162
//go:generate counterfeiter -o ./mocks/partition_prefetch_index_factory.go . PartitionPrefetchIndexerFactory
@@ -192,6 +193,10 @@ func NewPartitionPrefetchIndex(partition ShardPrimary, logger types.Logger, defa
192193
return pi
193194
}
194195

196+
func (pi *PartitionPrefetchIndex) PrefetchBufferSize() int {
197+
return pi.cache.SizeBytes() + pi.forcedPutCache.SizeBytes()
198+
}
199+
195200
func (pi *PartitionPrefetchIndex) getName() string {
196201
return fmt.Sprintf("partition <Sh: %d, Pr: %d>", pi.partition.Shard, pi.partition.Primary)
197202
}

node/assembler/prefetch_index.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ type PrefetchIndexer interface {
1818
Put(batch types.Batch) error
1919
PutForce(batch types.Batch) error
2020
Requests() <-chan types.BatchID
21+
PrefetchBufferSize(shardPrimary ShardPrimary) int
2122
Stop()
2223
}
2324

@@ -106,6 +107,11 @@ func NewPrefetchIndex(
106107
return pi
107108
}
108109

110+
func (pi *PrefetchIndex) PrefetchBufferSize(shardPrimary ShardPrimary) int {
111+
partitionIndex := pi.partitionToIndex[shardPrimary]
112+
return partitionIndex.PrefetchBufferSize()
113+
}
114+
109115
func (pi *PrefetchIndex) PopOrWait(batchId types.BatchID) (types.Batch, error) {
110116
t1 := time.Now()
111117
defer func() {

node/ledger/assembler_ledger.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@ type (
3131
//go:generate counterfeiter -o ./mocks/assembler_ledger.go . AssemblerLedgerReaderWriter
3232
type AssemblerLedgerReaderWriter interface {
3333
GetTxCount() uint64
34+
GetBlocksCount() uint64
35+
GetBlocksSize() uint64
3436
Append(batch types.Batch, orderingInfo types.OrderingInfo)
3537
AppendConfig(configBlock *common.Block, decisionNum types.DecisionNum)
3638
LastOrderingInfo() (*state.OrderingInformation, error)
@@ -54,6 +56,8 @@ type AssemblerLedger struct {
5456
Logger types.Logger
5557
Ledger blockledger.ReadWriter
5658
transactionCount uint64
59+
blocksCount uint64
60+
blocksSize uint64
5761
blockStorageProvider *blkstorage.BlockStoreProvider
5862
blockStore *blkstorage.BlockStore
5963
cancellationContext context.Context
@@ -132,6 +136,20 @@ func (l *AssemblerLedger) GetTxCount() uint64 {
132136
return c
133137
}
134138

139+
func (l *AssemblerLedger) GetBlocksCount() uint64 {
140+
c := atomic.LoadUint64(&l.blocksCount)
141+
return c
142+
}
143+
144+
func (l *AssemblerLedger) GetBlocksSize() uint64 {
145+
c := atomic.LoadUint64(&l.blocksSize)
146+
return c
147+
}
148+
149+
func blockSize(block *common.Block) uint64 {
150+
return uint64(len(protoutil.MarshalOrPanic(block)))
151+
}
152+
135153
func (l *AssemblerLedger) Append(batch types.Batch, orderingInfo types.OrderingInfo) {
136154
ordInfo := orderingInfo.(*state.OrderingInformation)
137155
t1 := time.Now()
@@ -198,6 +216,8 @@ func (l *AssemblerLedger) Append(batch types.Batch, orderingInfo types.OrderingI
198216
}
199217

200218
atomic.AddUint64(&l.transactionCount, uint64(len(batch.Requests())))
219+
atomic.AddUint64(&l.blocksCount, uint64(1))
220+
atomic.AddUint64(&l.blocksSize, blockSize(block))
201221
}
202222

203223
func (l *AssemblerLedger) AppendConfig(configBlock *common.Block, decisionNum types.DecisionNum) {
@@ -211,6 +231,8 @@ func (l *AssemblerLedger) AppendConfig(configBlock *common.Block, decisionNum ty
211231
l.Logger.Panicf("attempting to AppendConfig a block which is not a config block: %d", configBlock.GetHeader().GetNumber())
212232
}
213233

234+
atomic.AddUint64(&l.blocksCount, uint64(1))
235+
atomic.AddUint64(&l.blocksSize, blockSize(configBlock))
214236
transactionCount := atomic.AddUint64(&l.transactionCount, 1) // len(configBlock.GetData().GetData()) = should always be a single TX
215237
batchID := types.NewSimpleBatch(types.ShardIDConsensus, 0, 0, nil, 0)
216238
ordInfo := &state.OrderingInformation{

0 commit comments

Comments
 (0)