Skip to content

Commit 5065ea1

Browse files
committed
fixed a linter issue
Signed-off-by: Genady Gurevich <[email protected]>
1 parent efa9485 commit 5065ea1

File tree

7 files changed

+113
-10
lines changed

7 files changed

+113
-10
lines changed

node/assembler/assembler.go

Lines changed: 66 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ package assembler
99
import (
1010
"encoding/hex"
1111
"fmt"
12+
"strings"
13+
"time"
1214

1315
"github.com/hyperledger/fabric/protoutil"
1416

@@ -26,12 +28,14 @@ type NetStopper interface {
2628
}
2729

2830
type Assembler struct {
29-
assembler AssemblerRole
30-
logger types.Logger
31-
ds delivery.DeliverService
32-
prefetcher PrefetcherController
33-
baReplicator delivery.ConsensusBringer
34-
netStopper NetStopper
31+
assembler AssemblerRole
32+
logger types.Logger
33+
ds delivery.DeliverService
34+
prefetcher PrefetcherController
35+
baReplicator delivery.ConsensusBringer
36+
netStopper NetStopper
37+
monitorStopper chan struct{}
38+
monitoringInterval time.Duration
3539
}
3640

3741
func (a *Assembler) Broadcast(server orderer.AtomicBroadcast_BroadcastServer) error {
@@ -47,13 +51,24 @@ func (a *Assembler) GetTxCount() uint64 {
4751
return a.assembler.Ledger.(*node_ledger.AssemblerLedger).GetTxCount()
4852
}
4953

54+
func (a *Assembler) GetBlocksCount() uint64 {
55+
// TODO do this in a cleaner fashion
56+
return a.assembler.Ledger.(*node_ledger.AssemblerLedger).GetBlocksCount()
57+
}
58+
59+
func (a *Assembler) GetBlocksSize() uint64 {
60+
// TODO do this in a cleaner fashion
61+
return a.assembler.Ledger.(*node_ledger.AssemblerLedger).GetBlocksSize()
62+
}
63+
5064
func (a *Assembler) Stop() {
5165
a.netStopper.Stop()
5266
a.prefetcher.Stop()
5367
a.assembler.Index.Stop()
5468
a.baReplicator.Stop()
5569
a.assembler.WaitTermination()
5670
a.assembler.Ledger.Close()
71+
close(a.monitorStopper)
5772
}
5873

5974
func NewDefaultAssembler(
@@ -127,9 +142,54 @@ func NewDefaultAssembler(
127142

128143
assembler.assembler.Run()
129144

145+
assembler.monitoringInterval = 10 // supposed to come from config
146+
147+
assembler.monitorStopper = make(chan struct{})
148+
149+
// Periodically log status reports
150+
go statusTracker(assembler, al, index, shardIds, partyIds, logger)
151+
130152
return assembler
131153
}
132154

155+
func statusTracker(assembler *Assembler, al node_ledger.AssemblerLedgerReaderWriter,
156+
index PrefetchIndexer, shardIds []types.ShardID, partyIds []types.PartyID, logger types.Logger,
157+
) {
158+
var lastTxCommitted uint64 = 0
159+
var lastBlocksCommitted uint64 = 0
160+
monitorTicker := time.NewTicker(assembler.monitoringInterval * time.Second)
161+
defer monitorTicker.Stop()
162+
163+
for {
164+
select {
165+
case <-assembler.monitorStopper:
166+
return
167+
case <-monitorTicker.C:
168+
totalPbSz := 0
169+
var sb strings.Builder
170+
171+
for _, shardId := range shardIds {
172+
for _, partyId := range partyIds {
173+
pbsz := index.PrefetchBufferSize(ShardPrimary{Shard: shardId, Primary: partyId})
174+
sb.WriteString(fmt.Sprintf("<Sh: %d, Pr:%d>:%d; ", shardId, partyId, pbsz))
175+
totalPbSz += pbsz
176+
}
177+
}
178+
179+
txCommitted := assembler.GetTxCount()
180+
blocksCommitted := assembler.GetBlocksCount()
181+
blocksSizeCommitted := assembler.GetBlocksSize()
182+
183+
logger.Infof("Ledger height: %d, Total committed transactions: %d, New transactions in the last %d seconds: %d, Total blocks %d bytes, New blocks in the last %d seconds: %d, Total size:%d, Prefetch buffer size: %d(%s) bytes",
184+
al.LedgerReader().Height(), txCommitted, assembler.monitoringInterval, (txCommitted - lastTxCommitted), blocksCommitted,
185+
assembler.monitoringInterval, (blocksCommitted - lastBlocksCommitted), blocksSizeCommitted, totalPbSz, strings.TrimRight(sb.String(), "; "))
186+
187+
lastTxCommitted = txCommitted
188+
lastBlocksCommitted = blocksCommitted
189+
}
190+
}
191+
}
192+
133193
func NewAssembler(config *config.AssemblerNodeConfig, net NetStopper, genesisBlock *common.Block, logger types.Logger) *Assembler {
134194
return NewDefaultAssembler(
135195
logger,

node/assembler/batch_cache.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,16 @@ SPDX-License-Identifier: Apache-2.0
77
package assembler
88

99
import (
10+
"sync/atomic"
11+
1012
"github.com/hyperledger/fabric-x-orderer/common/types"
1113
)
1214

1315
type BatchCache struct {
1416
tag string
1517
shardBatchMapper *BatchMapper[types.BatchID, types.Batch]
1618
partition ShardPrimary
17-
sizeBytes int
19+
sizeBytes uint64
1820
}
1921

2022
//go:generate counterfeiter -o ./mocks/batch_cache_factory.go . BatchCacheFactory
@@ -47,7 +49,7 @@ func (bc *BatchCache) Pop(batchId types.BatchID) (types.Batch, error) {
4749
if err != nil {
4850
return nil, err
4951
}
50-
bc.sizeBytes -= batchSizeBytes(batch)
52+
atomic.AddUint64(&bc.sizeBytes, ^(uint64(batchSizeBytes(batch)) - 1))
5153
return batch, nil
5254
}
5355

@@ -57,7 +59,7 @@ func (bc *BatchCache) Put(batch types.Batch) error {
5759
return ErrBatchAlreadyExists
5860
}
5961
if inserted {
60-
bc.sizeBytes += batchSizeBytes(batch)
62+
atomic.AddUint64(&bc.sizeBytes, uint64(batchSizeBytes(batch)))
6163
}
6264
return nil
6365
}
@@ -71,5 +73,5 @@ func (bc *BatchCache) Get(batchId types.BatchID) (types.Batch, error) {
7173
}
7274

7375
func (bc *BatchCache) SizeBytes() int {
74-
return bc.sizeBytes
76+
return int(atomic.LoadUint64(&bc.sizeBytes))
7577
}

node/assembler/mocks/partition_prefetch_index.go

Lines changed: 5 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

node/assembler/mocks/prefetch_index.go

Lines changed: 5 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

node/assembler/partition_prefetch_index.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ type PartitionPrefetchIndexer interface {
5656
Put(batch types.Batch) error
5757
PutForce(batch types.Batch) error
5858
Stop()
59+
PrefetchBufferSize() int
5960
}
6061

6162
//go:generate counterfeiter -o ./mocks/partition_prefetch_index_factory.go . PartitionPrefetchIndexerFactory
@@ -192,6 +193,10 @@ func NewPartitionPrefetchIndex(partition ShardPrimary, logger types.Logger, defa
192193
return pi
193194
}
194195

196+
func (pi *PartitionPrefetchIndex) PrefetchBufferSize() int {
197+
return pi.cache.SizeBytes() + pi.forcedPutCache.SizeBytes()
198+
}
199+
195200
func (pi *PartitionPrefetchIndex) getName() string {
196201
return fmt.Sprintf("partition <Sh: %d, Pr: %d>", pi.partition.Shard, pi.partition.Primary)
197202
}

node/assembler/prefetch_index.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ type PrefetchIndexer interface {
1818
Put(batch types.Batch) error
1919
PutForce(batch types.Batch) error
2020
Requests() <-chan types.BatchID
21+
PrefetchBufferSize(shardPrimary ShardPrimary) int
2122
Stop()
2223
}
2324

@@ -106,6 +107,11 @@ func NewPrefetchIndex(
106107
return pi
107108
}
108109

110+
func (pi *PrefetchIndex) PrefetchBufferSize(shardPrimary ShardPrimary) int {
111+
partitionIndex := pi.partitionToIndex[shardPrimary]
112+
return partitionIndex.PrefetchBufferSize()
113+
}
114+
109115
func (pi *PrefetchIndex) PopOrWait(batchId types.BatchID) (types.Batch, error) {
110116
t1 := time.Now()
111117
defer func() {

node/ledger/assembler_ledger.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@ type AssemblerLedger struct {
5454
Logger types.Logger
5555
Ledger blockledger.ReadWriter
5656
transactionCount uint64
57+
blocksCount uint64
58+
blocksSize uint64
5759
blockStorageProvider *blkstorage.BlockStoreProvider
5860
blockStore *blkstorage.BlockStore
5961
cancellationContext context.Context
@@ -132,6 +134,20 @@ func (l *AssemblerLedger) GetTxCount() uint64 {
132134
return c
133135
}
134136

137+
func (l *AssemblerLedger) GetBlocksCount() uint64 {
138+
c := atomic.LoadUint64(&l.blocksCount)
139+
return c
140+
}
141+
142+
func (l *AssemblerLedger) GetBlocksSize() uint64 {
143+
c := atomic.LoadUint64(&l.blocksSize)
144+
return c
145+
}
146+
147+
func blockSize(block *common.Block) uint64 {
148+
return uint64(len(block.Header.GetDataHash()) + len(block.Header.GetPreviousHash()) + 8 + len(block.Metadata.Metadata) + len(block.Data.Data))
149+
}
150+
135151
func (l *AssemblerLedger) Append(batch types.Batch, orderingInfo types.OrderingInfo) {
136152
ordInfo := orderingInfo.(*state.OrderingInformation)
137153
t1 := time.Now()
@@ -198,6 +214,8 @@ func (l *AssemblerLedger) Append(batch types.Batch, orderingInfo types.OrderingI
198214
}
199215

200216
atomic.AddUint64(&l.transactionCount, uint64(len(batch.Requests())))
217+
atomic.AddUint64(&l.blocksCount, uint64(1))
218+
atomic.AddUint64(&l.blocksSize, blockSize(block))
201219
}
202220

203221
func (l *AssemblerLedger) AppendConfig(configBlock *common.Block, decisionNum types.DecisionNum) {
@@ -211,6 +229,8 @@ func (l *AssemblerLedger) AppendConfig(configBlock *common.Block, decisionNum ty
211229
l.Logger.Panicf("attempting to AppendConfig a block which is not a config block: %d", configBlock.GetHeader().GetNumber())
212230
}
213231

232+
atomic.AddUint64(&l.blocksCount, uint64(1))
233+
atomic.AddUint64(&l.blocksSize, blockSize(configBlock))
214234
transactionCount := atomic.AddUint64(&l.transactionCount, 1) // len(configBlock.GetData().GetData()) = should always be a single TX
215235
batchID := types.NewSimpleBatch(types.ShardIDConsensus, 0, 0, nil, 0)
216236
ordInfo := &state.OrderingInformation{

0 commit comments

Comments
 (0)