diff --git a/common/monitoring/provider.go b/common/monitoring/provider.go index 42b18a93..f1db8873 100644 --- a/common/monitoring/provider.go +++ b/common/monitoring/provider.go @@ -197,7 +197,7 @@ func GetMetricValue(m prometheus.Metric, logger types.Logger) float64 { gm := promgo.Metric{} err := m.Write(&gm) if err != nil { - logger.Infof("%v", err.Error()) + logger.Errorf("%v", err.Error()) return 0 } @@ -213,7 +213,7 @@ func GetMetricValue(m prometheus.Metric, logger types.Logger) float64 { case gm.Histogram != nil: return gm.Histogram.GetSampleSum() default: - logger.Infof("unsupported metric") + logger.Errorf("unsupported metric") return 0 } } diff --git a/node/assembler/assembler.go b/node/assembler/assembler.go index ab0bf490..c08cbe6f 100644 --- a/node/assembler/assembler.go +++ b/node/assembler/assembler.go @@ -32,6 +32,7 @@ type Assembler struct { prefetcher PrefetcherController baReplicator delivery.ConsensusBringer netStopper NetStopper + metrics *Metrics } func (a *Assembler) Broadcast(server orderer.AtomicBroadcast_BroadcastServer) error { @@ -44,10 +45,12 @@ func (a *Assembler) Deliver(server orderer.AtomicBroadcast_DeliverServer) error func (a *Assembler) GetTxCount() uint64 { // TODO do this in a cleaner fashion - return a.collator.Ledger.(*node_ledger.AssemblerLedger).GetTxCount() + return a.collator.Ledger.GetTxCount() } +// Stop stops the assembler and all its components. 
func (a *Assembler) Stop() { + a.metrics.Stop() a.netStopper.Stop() a.prefetcher.Stop() a.collator.Index.Stop() @@ -94,9 +97,6 @@ func NewDefaultAssembler( logger.Infof("Starting with BatchFrontier: %s", node_ledger.BatchFrontierToString(batchFrontier)) index := prefetchIndexFactory.Create(shardIds, partyIds, logger, config.PrefetchEvictionTtl, config.PrefetchBufferMemoryBytes, config.BatchRequestsChannelSize, &DefaultTimerFactory{}, &DefaultBatchCacheFactory{}, &DefaultPartitionPrefetchIndexerFactory{}, config.PopWaitMonitorTimeout) - if err != nil { - logger.Panicf("Failed creating index: %v", err) - } baReplicator := consensusBringerFactory.Create(config.Consenter.TLSCACerts, config.TLSPrivateKeyFile, config.TLSCertificateFile, config.Consenter.Endpoint, al, logger) @@ -126,6 +126,9 @@ func NewDefaultAssembler( assembler.collator.Run() + assembler.metrics = NewMetrics(al.Metrics(), index.Metrics(), logger, config.MetricsLogInterval) + assembler.metrics.Start() + return assembler } diff --git a/node/assembler/batch_cache.go b/node/assembler/batch_cache.go index b42a3b87..1d1f43d1 100644 --- a/node/assembler/batch_cache.go +++ b/node/assembler/batch_cache.go @@ -7,6 +7,8 @@ SPDX-License-Identifier: Apache-2.0 package assembler import ( + "sync/atomic" + "github.com/hyperledger/fabric-x-orderer/common/types" ) @@ -14,7 +16,7 @@ type BatchCache struct { tag string shardBatchMapper *BatchMapper[types.BatchID, types.Batch] partition ShardPrimary - sizeBytes int + sizeBytes uint64 } //go:generate counterfeiter -o ./mocks/batch_cache_factory.go . 
BatchCacheFactory @@ -47,7 +49,7 @@ func (bc *BatchCache) Pop(batchId types.BatchID) (types.Batch, error) { if err != nil { return nil, err } - bc.sizeBytes -= batchSizeBytes(batch) + atomic.AddUint64(&bc.sizeBytes, ^(uint64(batchSizeBytes(batch)) - 1)) return batch, nil } @@ -57,7 +59,7 @@ func (bc *BatchCache) Put(batch types.Batch) error { return ErrBatchAlreadyExists } if inserted { - bc.sizeBytes += batchSizeBytes(batch) + atomic.AddUint64(&bc.sizeBytes, uint64(batchSizeBytes(batch))) } return nil } @@ -71,5 +73,5 @@ func (bc *BatchCache) Get(batchId types.BatchID) (types.Batch, error) { } func (bc *BatchCache) SizeBytes() int { - return bc.sizeBytes + return int(atomic.LoadUint64(&bc.sizeBytes)) } diff --git a/node/assembler/collator.go b/node/assembler/collator.go index 110a3c02..d173f9fe 100644 --- a/node/assembler/collator.go +++ b/node/assembler/collator.go @@ -25,6 +25,7 @@ type AssemblerIndex interface { type AssemblerLedgerWriter interface { Append(batch types.Batch, orderingInfo *state.OrderingInformation) + GetTxCount() uint64 AppendConfig(configBlock *common.Block, decisionNum types.DecisionNum) Close() } diff --git a/node/assembler/metrics.go b/node/assembler/metrics.go new file mode 100644 index 00000000..9990a081 --- /dev/null +++ b/node/assembler/metrics.go @@ -0,0 +1,112 @@ +/* +Copyright IBM Corp. All Rights Reserved. 
+ +SPDX-License-Identifier: Apache-2.0 +*/ + +package assembler + +import ( + "fmt" + "strings" + "sync" + "sync/atomic" + "time" + + arma_types "github.com/hyperledger/fabric-x-orderer/common/types" + node_ledger "github.com/hyperledger/fabric-x-orderer/node/ledger" +) + +type PartitionPrefetchIndexMetrics struct { + cacheSize *uint64 + forcedPutCacheSize *uint64 +} + +type Metrics struct { + ledgerMetrics *node_ledger.AssemblerLedgerMetrics + indexMetrics *PrefetchIndexMetrics + logger arma_types.Logger + interval time.Duration + stopChan chan struct{} + stopOnce sync.Once +} + +func NewMetrics(alm *node_ledger.AssemblerLedgerMetrics, pim *PrefetchIndexMetrics, logger arma_types.Logger, interval time.Duration, +) *Metrics { + return &Metrics{ + ledgerMetrics: alm, + indexMetrics: pim, + interval: interval, + logger: logger, + stopChan: make(chan struct{}), + } +} + +func (m *Metrics) Start() { + if m.interval > 0 { + go m.trackMetrics() + } +} + +func (m *Metrics) Stop() { + m.stopOnce.Do(func() { + close(m.stopChan) + txCommitted := atomic.LoadUint64(&m.ledgerMetrics.TransactionCount) + blocksCommitted := atomic.LoadUint64(&m.ledgerMetrics.BlocksCount) + blocksSizeCommitted := atomic.LoadUint64(&m.ledgerMetrics.BlocksSize) + + totalPbSz := 0 + var sb strings.Builder + + for partition, pim := range *m.indexMetrics { + pbsz := atomic.LoadUint64(pim.cacheSize) + atomic.LoadUint64(pim.forcedPutCacheSize) + sb.WriteString(fmt.Sprintf("<%d,%d>:%d; ", partition.Shard, partition.Primary, pbsz)) + totalPbSz += int(pbsz) + } + + m.logger.Infof("ASSEMBLER_METRICS: total: TXs %d, blocks %d, block size %d, prefetch buffer size: %d(%s) bytes", + txCommitted, blocksCommitted, blocksSizeCommitted, totalPbSz, strings.TrimRight(sb.String(), "; ")) + }) +} + +func (m *Metrics) trackMetrics() { + lastTxCommitted, lastBlocksCommitted := atomic.LoadUint64(&m.ledgerMetrics.TransactionCount), atomic.LoadUint64(&m.ledgerMetrics.BlocksCount) + sec := m.interval.Seconds() + t := 
time.NewTicker(m.interval) + defer t.Stop() + + for { + select { + case <-t.C: + totalPbSz := 0 + var sb strings.Builder + + for partition, pim := range *m.indexMetrics { + pbsz := atomic.LoadUint64(pim.cacheSize) + atomic.LoadUint64(pim.forcedPutCacheSize) + sb.WriteString(fmt.Sprintf("<%d,%d>:%d; ", partition.Shard, partition.Primary, pbsz)) + totalPbSz += int(pbsz) + } + + txCommitted := atomic.LoadUint64(&m.ledgerMetrics.TransactionCount) + blocksCommitted := atomic.LoadUint64(&m.ledgerMetrics.BlocksCount) + blocksSizeCommitted := atomic.LoadUint64(&m.ledgerMetrics.BlocksSize) + + newBlocks := uint64(0) + if blocksCommitted > lastBlocksCommitted { + newBlocks = blocksCommitted - lastBlocksCommitted + } + + newTXs := uint64(0) + if txCommitted > lastTxCommitted { + newTXs = txCommitted - lastTxCommitted + } + + m.logger.Infof("ASSEMBLER_METRICS: total: TXs %d, blocks %d, block size %d, in the last %.2f seconds: TXs %d, block %d, prefetch buffer size: %d(%s) bytes", + txCommitted, blocksCommitted, blocksSizeCommitted, sec, newTXs, newBlocks, totalPbSz, strings.TrimRight(sb.String(), "; ")) + + lastTxCommitted, lastBlocksCommitted = txCommitted, blocksCommitted + case <-m.stopChan: + return + } + } +} diff --git a/node/assembler/mocks/batch_cache_factory.go b/node/assembler/mocks/batch_cache_factory.go index b3c12840..a4843458 100644 --- a/node/assembler/mocks/batch_cache_factory.go +++ b/node/assembler/mocks/batch_cache_factory.go @@ -161,10 +161,6 @@ func (fake *FakeBatchCacheFactory) CreateWithTagReturnsOnCall(i int, result1 *as func (fake *FakeBatchCacheFactory) Invocations() map[string][][]interface{} { fake.invocationsMutex.RLock() defer fake.invocationsMutex.RUnlock() - fake.createMutex.RLock() - defer fake.createMutex.RUnlock() - fake.createWithTagMutex.RLock() - defer fake.createWithTagMutex.RUnlock() copiedInvocations := map[string][][]interface{}{} for key, value := range fake.invocations { copiedInvocations[key] = value diff --git 
a/node/assembler/mocks/batch_fetcher.go b/node/assembler/mocks/batch_fetcher.go index 31174dd6..8f178e08 100644 --- a/node/assembler/mocks/batch_fetcher.go +++ b/node/assembler/mocks/batch_fetcher.go @@ -193,12 +193,6 @@ func (fake *FakeBatchBringer) StopCalls(stub func()) { func (fake *FakeBatchBringer) Invocations() map[string][][]interface{} { fake.invocationsMutex.RLock() defer fake.invocationsMutex.RUnlock() - fake.getBatchMutex.RLock() - defer fake.getBatchMutex.RUnlock() - fake.replicateMutex.RLock() - defer fake.replicateMutex.RUnlock() - fake.stopMutex.RLock() - defer fake.stopMutex.RUnlock() copiedInvocations := map[string][][]interface{}{} for key, value := range fake.invocations { copiedInvocations[key] = value diff --git a/node/assembler/mocks/batch_fetcher_factory.go b/node/assembler/mocks/batch_fetcher_factory.go index 92e10011..f7e000ea 100644 --- a/node/assembler/mocks/batch_fetcher_factory.go +++ b/node/assembler/mocks/batch_fetcher_factory.go @@ -93,8 +93,6 @@ func (fake *FakeBatchBringerFactory) CreateReturnsOnCall(i int, result1 assemble func (fake *FakeBatchBringerFactory) Invocations() map[string][][]interface{} { fake.invocationsMutex.RLock() defer fake.invocationsMutex.RUnlock() - fake.createMutex.RLock() - defer fake.createMutex.RUnlock() copiedInvocations := map[string][][]interface{}{} for key, value := range fake.invocations { copiedInvocations[key] = value diff --git a/node/assembler/mocks/partition_prefetch_index.go b/node/assembler/mocks/partition_prefetch_index.go index ceecfb9f..81382573 100644 --- a/node/assembler/mocks/partition_prefetch_index.go +++ b/node/assembler/mocks/partition_prefetch_index.go @@ -9,6 +9,16 @@ import ( ) type FakePartitionPrefetchIndexer struct { + MetricsStub func() *assembler.PartitionPrefetchIndexMetrics + metricsMutex sync.RWMutex + metricsArgsForCall []struct { + } + metricsReturns struct { + result1 *assembler.PartitionPrefetchIndexMetrics + } + metricsReturnsOnCall map[int]struct { + result1 
*assembler.PartitionPrefetchIndexMetrics + } PopOrWaitStub func(types.BatchID) (types.Batch, error) popOrWaitMutex sync.RWMutex popOrWaitArgsForCall []struct { @@ -52,6 +62,59 @@ type FakePartitionPrefetchIndexer struct { invocationsMutex sync.RWMutex } +func (fake *FakePartitionPrefetchIndexer) Metrics() *assembler.PartitionPrefetchIndexMetrics { + fake.metricsMutex.Lock() + ret, specificReturn := fake.metricsReturnsOnCall[len(fake.metricsArgsForCall)] + fake.metricsArgsForCall = append(fake.metricsArgsForCall, struct { + }{}) + stub := fake.MetricsStub + fakeReturns := fake.metricsReturns + fake.recordInvocation("Metrics", []interface{}{}) + fake.metricsMutex.Unlock() + if stub != nil { + return stub() + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakePartitionPrefetchIndexer) MetricsCallCount() int { + fake.metricsMutex.RLock() + defer fake.metricsMutex.RUnlock() + return len(fake.metricsArgsForCall) +} + +func (fake *FakePartitionPrefetchIndexer) MetricsCalls(stub func() *assembler.PartitionPrefetchIndexMetrics) { + fake.metricsMutex.Lock() + defer fake.metricsMutex.Unlock() + fake.MetricsStub = stub +} + +func (fake *FakePartitionPrefetchIndexer) MetricsReturns(result1 *assembler.PartitionPrefetchIndexMetrics) { + fake.metricsMutex.Lock() + defer fake.metricsMutex.Unlock() + fake.MetricsStub = nil + fake.metricsReturns = struct { + result1 *assembler.PartitionPrefetchIndexMetrics + }{result1} +} + +func (fake *FakePartitionPrefetchIndexer) MetricsReturnsOnCall(i int, result1 *assembler.PartitionPrefetchIndexMetrics) { + fake.metricsMutex.Lock() + defer fake.metricsMutex.Unlock() + fake.MetricsStub = nil + if fake.metricsReturnsOnCall == nil { + fake.metricsReturnsOnCall = make(map[int]struct { + result1 *assembler.PartitionPrefetchIndexMetrics + }) + } + fake.metricsReturnsOnCall[i] = struct { + result1 *assembler.PartitionPrefetchIndexMetrics + }{result1} +} + func (fake *FakePartitionPrefetchIndexer) 
PopOrWait(arg1 types.BatchID) (types.Batch, error) { fake.popOrWaitMutex.Lock() ret, specificReturn := fake.popOrWaitReturnsOnCall[len(fake.popOrWaitArgsForCall)] @@ -265,14 +328,6 @@ func (fake *FakePartitionPrefetchIndexer) StopCalls(stub func()) { func (fake *FakePartitionPrefetchIndexer) Invocations() map[string][][]interface{} { fake.invocationsMutex.RLock() defer fake.invocationsMutex.RUnlock() - fake.popOrWaitMutex.RLock() - defer fake.popOrWaitMutex.RUnlock() - fake.putMutex.RLock() - defer fake.putMutex.RUnlock() - fake.putForceMutex.RLock() - defer fake.putForceMutex.RUnlock() - fake.stopMutex.RLock() - defer fake.stopMutex.RUnlock() copiedInvocations := map[string][][]interface{}{} for key, value := range fake.invocations { copiedInvocations[key] = value diff --git a/node/assembler/mocks/prefetch_index.go b/node/assembler/mocks/prefetch_index.go index d06606a6..1b9099d8 100644 --- a/node/assembler/mocks/prefetch_index.go +++ b/node/assembler/mocks/prefetch_index.go @@ -9,6 +9,16 @@ import ( ) type FakePrefetchIndexer struct { + MetricsStub func() *assembler.PrefetchIndexMetrics + metricsMutex sync.RWMutex + metricsArgsForCall []struct { + } + metricsReturns struct { + result1 *assembler.PrefetchIndexMetrics + } + metricsReturnsOnCall map[int]struct { + result1 *assembler.PrefetchIndexMetrics + } PopOrWaitStub func(types.BatchID) (types.Batch, error) popOrWaitMutex sync.RWMutex popOrWaitArgsForCall []struct { @@ -62,6 +72,59 @@ type FakePrefetchIndexer struct { invocationsMutex sync.RWMutex } +func (fake *FakePrefetchIndexer) Metrics() *assembler.PrefetchIndexMetrics { + fake.metricsMutex.Lock() + ret, specificReturn := fake.metricsReturnsOnCall[len(fake.metricsArgsForCall)] + fake.metricsArgsForCall = append(fake.metricsArgsForCall, struct { + }{}) + stub := fake.MetricsStub + fakeReturns := fake.metricsReturns + fake.recordInvocation("Metrics", []interface{}{}) + fake.metricsMutex.Unlock() + if stub != nil { + return stub() + } + if specificReturn { + 
return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakePrefetchIndexer) MetricsCallCount() int { + fake.metricsMutex.RLock() + defer fake.metricsMutex.RUnlock() + return len(fake.metricsArgsForCall) +} + +func (fake *FakePrefetchIndexer) MetricsCalls(stub func() *assembler.PrefetchIndexMetrics) { + fake.metricsMutex.Lock() + defer fake.metricsMutex.Unlock() + fake.MetricsStub = stub +} + +func (fake *FakePrefetchIndexer) MetricsReturns(result1 *assembler.PrefetchIndexMetrics) { + fake.metricsMutex.Lock() + defer fake.metricsMutex.Unlock() + fake.MetricsStub = nil + fake.metricsReturns = struct { + result1 *assembler.PrefetchIndexMetrics + }{result1} +} + +func (fake *FakePrefetchIndexer) MetricsReturnsOnCall(i int, result1 *assembler.PrefetchIndexMetrics) { + fake.metricsMutex.Lock() + defer fake.metricsMutex.Unlock() + fake.MetricsStub = nil + if fake.metricsReturnsOnCall == nil { + fake.metricsReturnsOnCall = make(map[int]struct { + result1 *assembler.PrefetchIndexMetrics + }) + } + fake.metricsReturnsOnCall[i] = struct { + result1 *assembler.PrefetchIndexMetrics + }{result1} +} + func (fake *FakePrefetchIndexer) PopOrWait(arg1 types.BatchID) (types.Batch, error) { fake.popOrWaitMutex.Lock() ret, specificReturn := fake.popOrWaitReturnsOnCall[len(fake.popOrWaitArgsForCall)] @@ -328,16 +391,6 @@ func (fake *FakePrefetchIndexer) StopCalls(stub func()) { func (fake *FakePrefetchIndexer) Invocations() map[string][][]interface{} { fake.invocationsMutex.RLock() defer fake.invocationsMutex.RUnlock() - fake.popOrWaitMutex.RLock() - defer fake.popOrWaitMutex.RUnlock() - fake.putMutex.RLock() - defer fake.putMutex.RUnlock() - fake.putForceMutex.RLock() - defer fake.putForceMutex.RUnlock() - fake.requestsMutex.RLock() - defer fake.requestsMutex.RUnlock() - fake.stopMutex.RLock() - defer fake.stopMutex.RUnlock() copiedInvocations := map[string][][]interface{}{} for key, value := range fake.invocations { copiedInvocations[key] = value diff --git 
a/node/assembler/mocks/prefetcher_controller.go b/node/assembler/mocks/prefetcher_controller.go index d0612a96..1c849587 100644 --- a/node/assembler/mocks/prefetcher_controller.go +++ b/node/assembler/mocks/prefetcher_controller.go @@ -71,10 +71,6 @@ func (fake *FakePrefetcherController) StopCalls(stub func()) { func (fake *FakePrefetcherController) Invocations() map[string][][]interface{} { fake.invocationsMutex.RLock() defer fake.invocationsMutex.RUnlock() - fake.startMutex.RLock() - defer fake.startMutex.RUnlock() - fake.stopMutex.RLock() - defer fake.stopMutex.RUnlock() copiedInvocations := map[string][][]interface{}{} for key, value := range fake.invocations { copiedInvocations[key] = value diff --git a/node/assembler/mocks/prefetcher_factory.go b/node/assembler/mocks/prefetcher_factory.go index 197964e1..cc80739f 100644 --- a/node/assembler/mocks/prefetcher_factory.go +++ b/node/assembler/mocks/prefetcher_factory.go @@ -106,8 +106,6 @@ func (fake *FakePrefetcherFactory) CreateReturnsOnCall(i int, result1 assembler. 
func (fake *FakePrefetcherFactory) Invocations() map[string][][]interface{} { fake.invocationsMutex.RLock() defer fake.invocationsMutex.RUnlock() - fake.createMutex.RLock() - defer fake.createMutex.RUnlock() copiedInvocations := map[string][][]interface{}{} for key, value := range fake.invocations { copiedInvocations[key] = value diff --git a/node/assembler/mocks/timer_factory.go b/node/assembler/mocks/timer_factory.go index 26cbda53..ea1e149e 100644 --- a/node/assembler/mocks/timer_factory.go +++ b/node/assembler/mocks/timer_factory.go @@ -90,8 +90,6 @@ func (fake *FakeTimerFactory) CreateReturnsOnCall(i int, result1 assembler.Stopp func (fake *FakeTimerFactory) Invocations() map[string][][]interface{} { fake.invocationsMutex.RLock() defer fake.invocationsMutex.RUnlock() - fake.createMutex.RLock() - defer fake.createMutex.RUnlock() copiedInvocations := map[string][][]interface{}{} for key, value := range fake.invocations { copiedInvocations[key] = value diff --git a/node/assembler/partition_prefetch_index.go b/node/assembler/partition_prefetch_index.go index 0b5e761b..1b6e9c2a 100644 --- a/node/assembler/partition_prefetch_index.go +++ b/node/assembler/partition_prefetch_index.go @@ -56,6 +56,7 @@ type PartitionPrefetchIndexer interface { Put(batch types.Batch) error PutForce(batch types.Batch) error Stop() + Metrics() *PartitionPrefetchIndexMetrics } //go:generate counterfeiter -o ./mocks/partition_prefetch_index_factory.go . 
PartitionPrefetchIndexerFactory @@ -175,6 +176,8 @@ type PartitionPrefetchIndex struct { cancellationContext context.Context cancelContextFunc context.CancelFunc + // delay before PopOrWait requests the batch + metrics PartitionPrefetchIndexMetrics } func NewPartitionPrefetchIndex(partition ShardPrimary, logger types.Logger, defaultTtl time.Duration, maxSizeBytes int, timerFactory TimerFactory, batchCacheFactory BatchCacheFactory, batchRequestChan chan types.BatchID, popWaitMonitorTimeout time.Duration) *PartitionPrefetchIndex { @@ -194,9 +197,17 @@ func NewPartitionPrefetchIndex(partition ShardPrimary, logger types.Logger, defa cancelContextFunc: cancel, popWaitMonitorTimeout: popWaitMonitorTimeout, } + pi.metrics = PartitionPrefetchIndexMetrics{ + cacheSize: &pi.cache.sizeBytes, + forcedPutCacheSize: &pi.forcedPutCache.sizeBytes, + } return pi } +func (pi *PartitionPrefetchIndex) Metrics() *PartitionPrefetchIndexMetrics { + return &pi.metrics +} + func (pi *PartitionPrefetchIndex) getName() string { return fmt.Sprintf("partition <%d,%d>", pi.partition.Shard, pi.partition.Primary) } diff --git a/node/assembler/prefetch_index.go b/node/assembler/prefetch_index.go index c2fb4b75..1a85649c 100644 --- a/node/assembler/prefetch_index.go +++ b/node/assembler/prefetch_index.go @@ -18,6 +18,7 @@ type PrefetchIndexer interface { Put(batch types.Batch) error PutForce(batch types.Batch) error Requests() <-chan types.BatchID + Metrics() *PrefetchIndexMetrics Stop() } @@ -65,10 +66,13 @@ func (f *DefaultPrefetchIndexerFactory) Create( ) } +type PrefetchIndexMetrics map[ShardPrimary]*PartitionPrefetchIndexMetrics + type PrefetchIndex struct { logger types.Logger partitionToIndex map[ShardPrimary]PartitionPrefetchIndexer batchRequestChan chan types.BatchID + metrics PrefetchIndexMetrics } func NewPrefetchIndex( @@ -87,6 +91,7 @@ func NewPrefetchIndex( logger: logger, partitionToIndex: make(map[ShardPrimary]PartitionPrefetchIndexer, len(shards)*len(parties)), batchRequestChan: make(chan 
types.BatchID, requestChannelSize), + metrics: make(map[ShardPrimary]*PartitionPrefetchIndexMetrics, len(shards)*len(parties)), } for _, shardId := range shards { for _, partyId := range parties { @@ -101,11 +106,16 @@ func NewPrefetchIndex( pi.batchRequestChan, popWaitMonitorTimeout, ) + pi.metrics[partition] = pi.partitionToIndex[partition].Metrics() } } return pi } +func (pi *PrefetchIndex) Metrics() *PrefetchIndexMetrics { + return &pi.metrics +} + func (pi *PrefetchIndex) PopOrWait(batchId types.BatchID) (types.Batch, error) { t1 := time.Now() defer func() { diff --git a/node/ledger/assembler_ledger.go b/node/ledger/assembler_ledger.go index c0e86697..af07f852 100644 --- a/node/ledger/assembler_ledger.go +++ b/node/ledger/assembler_ledger.go @@ -31,6 +31,7 @@ type ( //go:generate counterfeiter -o ./mocks/assembler_ledger.go . AssemblerLedgerReaderWriter type AssemblerLedgerReaderWriter interface { GetTxCount() uint64 + Metrics() *AssemblerLedgerMetrics Append(batch types.Batch, orderingInfo *state.OrderingInformation) AppendConfig(configBlock *common.Block, decisionNum types.DecisionNum) LastOrderingInfo() (*state.OrderingInformation, error) @@ -53,11 +54,11 @@ func (f *DefaultAssemblerLedgerFactory) Create(logger types.Logger, ledgerPath s type AssemblerLedger struct { Logger types.Logger Ledger blockledger.ReadWriter - transactionCount uint64 blockStorageProvider *blkstorage.BlockStoreProvider blockStore *blkstorage.BlockStore cancellationContext context.Context cancelContextFunc context.CancelFunc + metrics AssemblerLedgerMetrics } func NewAssemblerLedger(logger types.Logger, ledgerPath string) (*AssemblerLedger, error) { @@ -77,29 +78,38 @@ func NewAssemblerLedger(logger types.Logger, ledgerPath string) (*AssemblerLedge } logger.Infof("Assembler ledger opened block store: path: %s, ledger-ID: %s", ledgerPath, channelName) ledger := fileledger.NewFileLedger(armaLedger) - transactionCount := uint64(0) + alm := AssemblerLedgerMetrics{} height := 
ledger.Height() if height > 0 { block, err := ledger.RetrieveBlockByNumber(height - 1) if err != nil { return nil, fmt.Errorf("error while fetching last block from ledger %w", err) } - _, _, transactionCount, err = AssemblerBatchIdOrderingInfoAndTxCountFromBlock(block) + _, _, alm.TransactionCount, err = AssemblerBatchIdOrderingInfoAndTxCountFromBlock(block) if err != nil { return nil, fmt.Errorf("error while fetching last block ordering info %w", err) } + alm.BlocksCount = height + + for i := range height { + block, err := ledger.RetrieveBlockByNumber(i) + if err != nil { + return nil, fmt.Errorf("error while fetching block %d from ledger %w", i, err) + } + alm.BlocksSize += blockSize(block) + } } ctx, cancel := context.WithCancel(context.Background()) al := &AssemblerLedger{ Logger: logger, Ledger: ledger, - transactionCount: transactionCount, + metrics: alm, blockStorageProvider: provider, blockStore: armaLedger, cancellationContext: ctx, cancelContextFunc: cancel, } - go al.trackThroughput() + return al, nil } @@ -109,27 +119,16 @@ func (l *AssemblerLedger) Close() { l.blockStorageProvider.Close() } -func (l *AssemblerLedger) trackThroughput() { - firstProbe := true - lastTxCount := uint64(0) - for { - txCount := atomic.LoadUint64(&l.transactionCount) - if !firstProbe { - l.Logger.Infof("Tx Count: %d, Commit throughput: %.2f", txCount, float64(txCount-lastTxCount)/10.0) - } - lastTxCount = txCount - firstProbe = false - select { - case <-time.After(time.Second * 10): - case <-l.cancellationContext.Done(): - return - } - } +func (l *AssemblerLedger) GetTxCount() uint64 { + return atomic.LoadUint64(&l.metrics.TransactionCount) } -func (l *AssemblerLedger) GetTxCount() uint64 { - c := atomic.LoadUint64(&l.transactionCount) - return c +func (l *AssemblerLedger) Metrics() *AssemblerLedgerMetrics { + return &l.metrics +} + +func blockSize(block *common.Block) uint64 { + return uint64(len(protoutil.MarshalOrPanic(block))) } func (l *AssemblerLedger) Append(batch 
types.Batch, ordInfo *state.OrderingInformation) { @@ -174,7 +173,7 @@ func (l *AssemblerLedger) Append(batch types.Batch, ordInfo *state.OrderingInfor //=== // TODO Ordering metadata marshal orderingInfo and batchID - ordererBlockMetadata, err := AssemblerBlockMetadataToBytes(batch, ordInfo, atomic.LoadUint64(&l.transactionCount)+uint64(len(batch.Requests()))) + ordererBlockMetadata, err := AssemblerBlockMetadataToBytes(batch, ordInfo, atomic.LoadUint64(&l.metrics.TransactionCount)+uint64(len(batch.Requests()))) if err != nil { l.Logger.Panicf("failed to invoke AssemblerBlockMetadataToBytes: %s", err) } @@ -192,7 +191,9 @@ func (l *AssemblerLedger) Append(batch types.Batch, ordInfo *state.OrderingInfor panic(err) } - atomic.AddUint64(&l.transactionCount, uint64(len(batch.Requests()))) + atomic.AddUint64(&l.metrics.TransactionCount, uint64(len(batch.Requests()))) + atomic.AddUint64(&l.metrics.BlocksSize, blockSize(block)) + atomic.AddUint64(&l.metrics.BlocksCount, uint64(1)) } func (l *AssemblerLedger) AppendConfig(configBlock *common.Block, decisionNum types.DecisionNum) { @@ -206,7 +207,7 @@ func (l *AssemblerLedger) AppendConfig(configBlock *common.Block, decisionNum ty l.Logger.Panicf("attempting to AppendConfig a block which is not a config block: %d", configBlock.GetHeader().GetNumber()) } - transactionCount := atomic.AddUint64(&l.transactionCount, 1) // len(configBlock.GetData().GetData()) = should always be a single TX + transactionCount := atomic.AddUint64(&l.metrics.TransactionCount, 1) // len(configBlock.GetData().GetData()) = should always be a single TX batchID := types.NewSimpleBatch(types.ShardIDConsensus, 0, 0, nil, 0) ordInfo := &state.OrderingInformation{ DecisionNum: decisionNum, @@ -229,6 +230,8 @@ func (l *AssemblerLedger) AppendConfig(configBlock *common.Block, decisionNum ty if err := l.Ledger.Append(configBlock); err != nil { panic(err) } + atomic.AddUint64(&l.metrics.BlocksSize, blockSize(configBlock)) + 
atomic.AddUint64(&l.metrics.BlocksCount, uint64(1)) } func (l *AssemblerLedger) LedgerReader() blockledger.Reader { diff --git a/node/ledger/metrics.go b/node/ledger/metrics.go new file mode 100644 index 00000000..6601fce1 --- /dev/null +++ b/node/ledger/metrics.go @@ -0,0 +1,12 @@ +/* +Copyright IBM Corp. All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 +*/ +package ledger + +type AssemblerLedgerMetrics struct { + TransactionCount uint64 + BlocksSize uint64 + BlocksCount uint64 +} diff --git a/node/ledger/mocks/assembler_ledger.go b/node/ledger/mocks/assembler_ledger.go index a601d3c2..7b01ff5e 100644 --- a/node/ledger/mocks/assembler_ledger.go +++ b/node/ledger/mocks/assembler_ledger.go @@ -76,6 +76,16 @@ type FakeAssemblerLedgerReaderWriter struct { ledgerReaderReturnsOnCall map[int]struct { result1 blockledger.Reader } + MetricsStub func() *ledger.AssemblerLedgerMetrics + metricsMutex sync.RWMutex + metricsArgsForCall []struct { + } + metricsReturns struct { + result1 *ledger.AssemblerLedgerMetrics + } + metricsReturnsOnCall map[int]struct { + result1 *ledger.AssemblerLedgerMetrics + } invocations map[string][][]interface{} invocationsMutex sync.RWMutex } @@ -408,6 +418,59 @@ func (fake *FakeAssemblerLedgerReaderWriter) LedgerReaderReturnsOnCall(i int, re }{result1} } +func (fake *FakeAssemblerLedgerReaderWriter) Metrics() *ledger.AssemblerLedgerMetrics { + fake.metricsMutex.Lock() + ret, specificReturn := fake.metricsReturnsOnCall[len(fake.metricsArgsForCall)] + fake.metricsArgsForCall = append(fake.metricsArgsForCall, struct { + }{}) + stub := fake.MetricsStub + fakeReturns := fake.metricsReturns + fake.recordInvocation("Metrics", []interface{}{}) + fake.metricsMutex.Unlock() + if stub != nil { + return stub() + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeAssemblerLedgerReaderWriter) MetricsCallCount() int { + fake.metricsMutex.RLock() + defer fake.metricsMutex.RUnlock() + return 
len(fake.metricsArgsForCall) +} + +func (fake *FakeAssemblerLedgerReaderWriter) MetricsCalls(stub func() *ledger.AssemblerLedgerMetrics) { + fake.metricsMutex.Lock() + defer fake.metricsMutex.Unlock() + fake.MetricsStub = stub +} + +func (fake *FakeAssemblerLedgerReaderWriter) MetricsReturns(result1 *ledger.AssemblerLedgerMetrics) { + fake.metricsMutex.Lock() + defer fake.metricsMutex.Unlock() + fake.MetricsStub = nil + fake.metricsReturns = struct { + result1 *ledger.AssemblerLedgerMetrics + }{result1} +} + +func (fake *FakeAssemblerLedgerReaderWriter) MetricsReturnsOnCall(i int, result1 *ledger.AssemblerLedgerMetrics) { + fake.metricsMutex.Lock() + defer fake.metricsMutex.Unlock() + fake.MetricsStub = nil + if fake.metricsReturnsOnCall == nil { + fake.metricsReturnsOnCall = make(map[int]struct { + result1 *ledger.AssemblerLedgerMetrics + }) + } + fake.metricsReturnsOnCall[i] = struct { + result1 *ledger.AssemblerLedgerMetrics + }{result1} +} + func (fake *FakeAssemblerLedgerReaderWriter) Invocations() map[string][][]interface{} { fake.invocationsMutex.RLock() defer fake.invocationsMutex.RUnlock()