diff --git a/sqlitestore.go b/sqlitestore.go index 8834eaf..cee21db 100644 --- a/sqlitestore.go +++ b/sqlitestore.go @@ -14,6 +14,7 @@ import ( "github.com/Arkiv-Network/sqlite-bitmap-store/store" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/metrics" "github.com/golang-migrate/migrate/v4" "github.com/golang-migrate/migrate/v4/database/sqlite3" "github.com/golang-migrate/migrate/v4/source/iofs" @@ -23,6 +24,19 @@ import ( "github.com/Arkiv-Network/arkiv-events/events" ) +var ( + // Metrics for tracking operations + metricOperationStarted = metrics.NewRegisteredCounter("arkiv_store/operations_started", nil) + metricOperationSuccessful = metrics.NewRegisteredCounter("arkiv_store/operations_successful", nil) + metricCreates = metrics.NewRegisteredCounter("arkiv_store/creates", nil) + metricUpdates = metrics.NewRegisteredCounter("arkiv_store/updates", nil) + metricDeletes = metrics.NewRegisteredCounter("arkiv_store/deletes", nil) + metricExtends = metrics.NewRegisteredCounter("arkiv_store/extends", nil) + metricOwnerChanges = metrics.NewRegisteredCounter("arkiv_store/owner_changes", nil) + // ExpDecaySample with a high alpha (0.4) biases the histogram toward recent samples, making it more responsive to recent batch latency + metricOperationTime = metrics.NewRegisteredHistogram("arkiv_store/operation_time_ms", nil, metrics.NewExpDecaySample(100, 0.4)) +) + type SQLiteStore struct { writePool *sql.DB readPool *sql.DB @@ -99,6 +113,14 @@ func (s *SQLiteStore) GetLastBlock(ctx context.Context) (uint64, error) { return store.New(s.writePool).GetLastBlock(ctx) } +type blockStats struct { + creates int + updates int + deletes int + extends int + ownerChanges int +} + func (s *SQLiteStore) FollowEvents(ctx context.Context, iterator arkivevents.BatchIterator) error { for batch := range iterator { @@ -106,11 +128,8 @@ func (s *SQLiteStore) FollowEvents(ctx context.Context, iterator arkivevents.Bat return fmt.Errorf("failed to follow events: %w", batch.Error) } - totalCreates := 0 - totalUpdates := 0 - totalDeletes := 0 - totalExtends := 0 - totalOwnerChanges := 0 
+ // We will calculate totals for the log at the end, but track per-block for metrics + stats := make(map[uint64]*blockStats) err := func() error { @@ -138,20 +157,22 @@ func (s *SQLiteStore) FollowEvents(ctx context.Context, iterator arkivevents.Bat startTime := time.Now() + metricOperationStarted.Inc(1) + mainLoop: for _, block := range batch.Batch.Blocks { - updates := 0 - deletes := 0 - extends := 0 - creates := 0 - ownerChanges := 0 - if block.Number <= uint64(lastBlockFromDB) { s.log.Info("skipping block", "block", block.Number, "lastBlockFromDB", lastBlockFromDB) continue mainLoop } + // Initialize stats for this block + if _, ok := stats[block.Number]; !ok { + stats[block.Number] = &blockStats{} + } + blockStat := stats[block.Number] + updatesMap := map[common.Hash][]*events.OPUpdate{} for _, operation := range block.Operations { @@ -162,7 +183,6 @@ func (s *SQLiteStore) FollowEvents(ctx context.Context, iterator arkivevents.Bat } } - // blockNumber := block.Number operationLoop: for _, operation := range block.Operations { @@ -170,7 +190,7 @@ func (s *SQLiteStore) FollowEvents(ctx context.Context, iterator arkivevents.Bat case operation.Create != nil: // expiresAtBlock := blockNumber + operation.Create.BTL - creates++ + blockStat.creates++ key := operation.Create.Key stringAttributes := maps.Clone(operation.Create.StringAttributes) @@ -225,7 +245,7 @@ func (s *SQLiteStore) FollowEvents(ctx context.Context, iterator arkivevents.Bat } } case operation.Update != nil: - updates++ + blockStat.updates++ updates := updatesMap[operation.Update.Key] lastUpdate := updates[len(updates)-1] @@ -319,7 +339,7 @@ func (s *SQLiteStore) FollowEvents(ctx context.Context, iterator arkivevents.Bat case operation.Delete != nil || operation.Expire != nil: - deletes++ + blockStat.deletes++ var key []byte if operation.Delete != nil { key = common.Hash(*operation.Delete).Bytes() @@ -363,7 +383,7 @@ func (s *SQLiteStore) FollowEvents(ctx context.Context, iterator arkivevents.Bat 
case operation.ExtendBTL != nil: - extends++ + blockStat.extends++ key := operation.ExtendBTL.Key.Bytes() @@ -403,7 +423,7 @@ func (s *SQLiteStore) FollowEvents(ctx context.Context, iterator arkivevents.Bat } case operation.ChangeOwner != nil: - ownerChanges++ + blockStat.ownerChanges++ key := operation.ChangeOwner.Key.Bytes() latestPayload, err := st.GetPayloadForEntityKey(ctx, key) @@ -449,12 +469,8 @@ func (s *SQLiteStore) FollowEvents(ctx context.Context, iterator arkivevents.Bat } - s.log.Info("block updated", "block", block.Number, "creates", creates, "updates", updates, "deletes", deletes, "extends", extends, "ownerChanges", ownerChanges) - totalCreates += creates - totalUpdates += updates - totalDeletes += deletes - totalExtends += extends - totalOwnerChanges += ownerChanges + // Log per block if needed, but we can now rely on the map for totals later + s.log.Info("block updated", "block", block.Number, "creates", blockStat.creates, "updates", blockStat.updates, "deletes", blockStat.deletes, "extends", blockStat.extends, "ownerChanges", blockStat.ownerChanges) } err = st.UpsertLastBlock(ctx, lastBlock) @@ -472,7 +488,55 @@ func (s *SQLiteStore) FollowEvents(ctx context.Context, iterator arkivevents.Bat return fmt.Errorf("failed to commit transaction: %w", err) } - s.log.Info("batch processed", "firstBlock", firstBlock, "lastBlock", lastBlock, "processingTime", time.Since(startTime).Milliseconds(), "creates", totalCreates, "updates", totalUpdates, "deletes", totalDeletes, "extends", totalExtends, "ownerChanges", totalOwnerChanges) + // Calculate batch totals for logging and update metrics PER BLOCK + var ( + totalCreates int + totalUpdates int + totalDeletes int + totalExtends int + totalOwnerChanges int + ) + + // Iterate blocks again to preserve order and update metrics per block + for _, block := range batch.Batch.Blocks { + if s, ok := stats[block.Number]; ok { + totalCreates += s.creates + totalUpdates += s.updates + totalDeletes += s.deletes + 
totalExtends += s.extends + totalOwnerChanges += s.ownerChanges + + // Fold this block's counts into the registered operation counters + if s.creates > 0 { + metricCreates.Inc(int64(s.creates)) + } + if s.updates > 0 { + metricUpdates.Inc(int64(s.updates)) + } + if s.deletes > 0 { + metricDeletes.Inc(int64(s.deletes)) + } + if s.extends > 0 { + metricExtends.Inc(int64(s.extends)) + } + if s.ownerChanges > 0 { + metricOwnerChanges.Inc(int64(s.ownerChanges)) + } + } + } + + metricOperationSuccessful.Inc(1) + metricOperationTime.Update(time.Since(startTime).Milliseconds()) + + s.log.Info("batch processed", + "firstBlock", firstBlock, + "lastBlock", lastBlock, + "processingTime", time.Since(startTime).Milliseconds(), + "creates", totalCreates, + "updates", totalUpdates, + "deletes", totalDeletes, + "extends", totalExtends, + "ownerChanges", totalOwnerChanges) return nil }()