Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions node/assembler/assembler.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type Assembler struct {
prefetcher PrefetcherController
baReplicator delivery.ConsensusBringer
netStopper NetStopper
metrics *Metrics
}

func (a *Assembler) Broadcast(server orderer.AtomicBroadcast_BroadcastServer) error {
Expand All @@ -43,11 +44,12 @@ func (a *Assembler) Deliver(server orderer.AtomicBroadcast_DeliverServer) error
}

func (a *Assembler) GetTxCount() uint64 {
// TODO do this in a cleaner fashion
return a.assembler.Ledger.(*node_ledger.AssemblerLedger).GetTxCount()
return a.assembler.Ledger.GetTxCount()
}

// Stop stops the assembler and all its components.
func (a *Assembler) Stop() {
a.metrics.Stop()
a.netStopper.Stop()
a.prefetcher.Stop()
a.assembler.Index.Stop()
Expand Down Expand Up @@ -94,9 +96,6 @@ func NewDefaultAssembler(
logger.Infof("Starting with BatchFrontier: %s", node_ledger.BatchFrontierToString(batchFrontier))

index := prefetchIndexFactory.Create(shardIds, partyIds, logger, config.PrefetchEvictionTtl, config.PrefetchBufferMemoryBytes, config.BatchRequestsChannelSize, &DefaultTimerFactory{}, &DefaultBatchCacheFactory{}, &DefaultPartitionPrefetchIndexerFactory{}, config.PopWaitMonitorTimeout)
if err != nil {
logger.Panicf("Failed creating index: %v", err)
}

baReplicator := consensusBringerFactory.Create(config.Consenter.TLSCACerts, config.TLSPrivateKeyFile, config.TLSCertificateFile, config.Consenter.Endpoint, al, logger)

Expand Down Expand Up @@ -126,6 +125,9 @@ func NewDefaultAssembler(

assembler.assembler.Run()

assembler.metrics = NewMetrics(al.Metrics(), index.Metrics(), logger, config.MetricsLogInterval)
assembler.metrics.Start()

return assembler
}

Expand Down
1 change: 1 addition & 0 deletions node/assembler/assembler_role.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type AssemblerIndex interface {
}

type AssemblerLedgerWriter interface {
GetTxCount() uint64
Append(batch types.Batch, orderingInfo types.OrderingInfo)
AppendConfig(configBlock *common.Block, decisionNum types.DecisionNum)
Close()
Expand Down
10 changes: 6 additions & 4 deletions node/assembler/batch_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,16 @@ SPDX-License-Identifier: Apache-2.0
package assembler

import (
"sync/atomic"

"github.com/hyperledger/fabric-x-orderer/common/types"
)

type BatchCache struct {
tag string
shardBatchMapper *BatchMapper[types.BatchID, types.Batch]
partition ShardPrimary
sizeBytes int
sizeBytes uint64
}

//go:generate counterfeiter -o ./mocks/batch_cache_factory.go . BatchCacheFactory
Expand Down Expand Up @@ -47,7 +49,7 @@ func (bc *BatchCache) Pop(batchId types.BatchID) (types.Batch, error) {
if err != nil {
return nil, err
}
bc.sizeBytes -= batchSizeBytes(batch)
atomic.AddUint64(&bc.sizeBytes, ^(uint64(batchSizeBytes(batch)) - 1))
return batch, nil
}

Expand All @@ -57,7 +59,7 @@ func (bc *BatchCache) Put(batch types.Batch) error {
return ErrBatchAlreadyExists
}
if inserted {
bc.sizeBytes += batchSizeBytes(batch)
atomic.AddUint64(&bc.sizeBytes, uint64(batchSizeBytes(batch)))
}
return nil
}
Expand All @@ -71,5 +73,5 @@ func (bc *BatchCache) Get(batchId types.BatchID) (types.Batch, error) {
}

func (bc *BatchCache) SizeBytes() int {
return bc.sizeBytes
return int(atomic.LoadUint64(&bc.sizeBytes))
}
112 changes: 112 additions & 0 deletions node/assembler/metrics.go

This comment was marked as resolved.

Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
Copyright IBM Corp. All Rights Reserved.

SPDX-License-Identifier: Apache-2.0
*/

package assembler

import (
"fmt"
"strings"
"sync"
"sync/atomic"
"time"

arma_types "github.com/hyperledger/fabric-x-orderer/common/types"
node_ledger "github.com/hyperledger/fabric-x-orderer/node/ledger"
)

type PartitionPrefetchIndexMetrics struct {
cacheSize *uint64
forcedPutCacheSize *uint64
}

type Metrics struct {
ledgerMetrics *node_ledger.AssemblerLedgerMetrics
indexMetrics *PrefetchIndexMetrics
logger arma_types.Logger
interval time.Duration
stopChan chan struct{}
stopOnce sync.Once
}

func NewMetrics(alm *node_ledger.AssemblerLedgerMetrics, pim *PrefetchIndexMetrics, logger arma_types.Logger, interval time.Duration,
) *Metrics {
return &Metrics{
ledgerMetrics: alm,
indexMetrics: pim,
interval: interval,
logger: logger,
stopChan: make(chan struct{}),
}
}

func (m *Metrics) Start() {
if m.interval > 0 {
go m.trackMetrics()
}
}

func (m *Metrics) Stop() {
m.stopOnce.Do(func() {
close(m.stopChan)
txCommitted := atomic.LoadUint64(&m.ledgerMetrics.TransactionCount)
blocksCommitted := atomic.LoadUint64(&m.ledgerMetrics.BlocksCount)
blocksSizeCommitted := atomic.LoadUint64(&m.ledgerMetrics.BlocksSize)

totalPbSz := 0
var sb strings.Builder

for partition, pim := range *m.indexMetrics {
pbsz := atomic.LoadUint64(pim.cacheSize) + atomic.LoadUint64(pim.forcedPutCacheSize)
sb.WriteString(fmt.Sprintf("<Sh: %d, Pr:%d>:%d; ", partition.Shard, partition.Primary, pbsz))
totalPbSz += int(pbsz)
}

m.logger.Infof("ASSEMBLER_METRICS: total: TXs %d, blocks %d, block size %d, prefetch buffer size: %d(%s) bytes",
txCommitted, blocksCommitted, blocksSizeCommitted, totalPbSz, strings.TrimRight(sb.String(), "; "))
})
}

func (m *Metrics) trackMetrics() {
lastTxCommitted, lastBlocksCommitted := atomic.LoadUint64(&m.ledgerMetrics.TransactionCount), atomic.LoadUint64(&m.ledgerMetrics.BlocksCount)
sec := m.interval.Seconds()
t := time.NewTicker(m.interval)
defer t.Stop()

for {
select {
case <-t.C:
totalPbSz := 0
var sb strings.Builder

for partition, pim := range *m.indexMetrics {
pbsz := atomic.LoadUint64(pim.cacheSize) + atomic.LoadUint64(pim.forcedPutCacheSize)
sb.WriteString(fmt.Sprintf("<Sh: %d, Pr:%d>:%d; ", partition.Shard, partition.Primary, pbsz))
totalPbSz += int(pbsz)
}

txCommitted := atomic.LoadUint64(&m.ledgerMetrics.TransactionCount)
blocksCommitted := atomic.LoadUint64(&m.ledgerMetrics.BlocksCount)
blocksSizeCommitted := atomic.LoadUint64(&m.ledgerMetrics.BlocksSize)

newBlocks := uint64(0)
if blocksCommitted > lastBlocksCommitted {
newBlocks = blocksCommitted - lastBlocksCommitted
}

newTXs := uint64(0)
if txCommitted > lastTxCommitted {
newTXs = txCommitted - lastTxCommitted
}

m.logger.Infof("ASSEMBLER_METRICS: total: TXs %d, blocks %d, block size %d, in the last %.2f seconds: TXs %d, block %d, prefetch buffer size: %d(%s) bytes",
txCommitted, blocksCommitted, blocksSizeCommitted, sec, newTXs, newBlocks, totalPbSz, strings.TrimRight(sb.String(), "; "))

lastTxCommitted, lastBlocksCommitted = txCommitted, blocksCommitted
case <-m.stopChan:
return
}
}
}
4 changes: 0 additions & 4 deletions node/assembler/mocks/batch_cache_factory.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 0 additions & 6 deletions node/assembler/mocks/batch_fetcher.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 0 additions & 2 deletions node/assembler/mocks/batch_fetcher_factory.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

71 changes: 63 additions & 8 deletions node/assembler/mocks/partition_prefetch_index.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading