112 changes: 88 additions & 24 deletions sqlitestore.go
@@ -14,6 +14,7 @@ import (

"github.com/Arkiv-Network/sqlite-bitmap-store/store"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/metrics"
"github.com/golang-migrate/migrate/v4"
"github.com/golang-migrate/migrate/v4/database/sqlite3"
"github.com/golang-migrate/migrate/v4/source/iofs"
@@ -23,6 +24,19 @@ import (
"github.com/Arkiv-Network/arkiv-events/events"
)

var (
// Metrics for tracking operations
metricOperationStarted = metrics.NewRegisteredCounter("arkiv_store/operations_started", nil)
metricOperationSuccessful = metrics.NewRegisteredCounter("arkiv_store/operations_successful", nil)
metricCreates = metrics.NewRegisteredCounter("arkiv_store/creates", nil)
metricUpdates = metrics.NewRegisteredCounter("arkiv_store/updates", nil)
metricDeletes = metrics.NewRegisteredCounter("arkiv_store/deletes", nil)
metricExtends = metrics.NewRegisteredCounter("arkiv_store/extends", nil)
metricOwnerChanges = metrics.NewRegisteredCounter("arkiv_store/owner_changes", nil)
// more responsive
Copilot AI (Feb 19, 2026)

The comment "more responsive" is unclear and provides no context. Consider replacing it with a more descriptive comment explaining why these specific parameters (sample size 100, alpha 0.4) were chosen for the exponential decay sample, or what "more responsive" means in this context (e.g., "uses an exponential decay sample for more responsive metrics that weight recent measurements higher").

Suggested change:
-	// more responsive
+	// Tracks operation duration (ms) using an exponential decay sample so the histogram
+	// is more responsive to recent performance by weighting newer measurements higher
+	// (sample size 100, alpha 0.4).
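As background for this suggestion, here is a minimal, self-contained sketch of the same registration pattern, assuming the go-ethereum metrics API already imported in this diff; the metric name and durations are invented for illustration, and note that the package may silently no-op unless metrics collection is enabled globally (the mechanism varies by go-ethereum version):

package main

import (
	"fmt"

	"github.com/ethereum/go-ethereum/metrics"
)

func main() {
	// NewExpDecaySample(100, 0.4): a 100-slot reservoir whose contents are
	// exponentially biased toward recent updates, so the histogram reflects
	// recent latency rather than the all-time distribution; a larger alpha
	// ages old measurements out faster.
	hist := metrics.NewRegisteredHistogram(
		"example/operation_time_ms", nil, metrics.NewExpDecaySample(100, 0.4))

	// Record a few hypothetical operation durations in milliseconds.
	for _, ms := range []int64{12, 9, 15, 240, 11} {
		hist.Update(ms)
	}

	snap := hist.Snapshot()
	fmt.Printf("count=%d mean=%.1f p95=%.1f\n",
		snap.Count(), snap.Mean(), snap.Percentile(0.95))
}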
metricOperationTime = metrics.NewRegisteredHistogram("arkiv_store/operation_time_ms", nil, metrics.NewExpDecaySample(100, 0.4))
)

type SQLiteStore struct {
writePool *sql.DB
readPool *sql.DB
@@ -99,18 +113,23 @@ func (s *SQLiteStore) GetLastBlock(ctx context.Context) (uint64, error) {
return store.New(s.writePool).GetLastBlock(ctx)
}

type blockStats struct {
creates int
updates int
deletes int
extends int
ownerChanges int
}

func (s *SQLiteStore) FollowEvents(ctx context.Context, iterator arkivevents.BatchIterator) error {

for batch := range iterator {
if batch.Error != nil {
return fmt.Errorf("failed to follow events: %w", batch.Error)
}

-		totalCreates := 0
-		totalUpdates := 0
-		totalDeletes := 0
-		totalExtends := 0
-		totalOwnerChanges := 0
+		// We will calculate totals for the log at the end, but track per-block for metrics
+		stats := make(map[uint64]*blockStats)

err := func() error {

Expand Down Expand Up @@ -138,20 +157,22 @@ func (s *SQLiteStore) FollowEvents(ctx context.Context, iterator arkivevents.Bat

startTime := time.Now()

metricOperationStarted.Inc(1)
Copilot AI (Feb 19, 2026)

The metricOperationStarted counter is incremented before checking whether any blocks will actually be processed. If all blocks in the batch are skipped (lines 165-167), the batch will still show as both started and successful (line 528), even though no database modifications occurred. While this accurately tracks batch processing, it may not reflect actual data-processing work. Consider whether the metric should track batches processed or blocks processed, and adjust accordingly for clearer observability.
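One way to act on this, sketched hypothetically (the metric name metricBlocksProcessed is invented here and is not part of the PR), is a per-block counter that is incremented only after the skip check, reusing the loop from the diff below:

// Hypothetical companion metric, shown for illustration only.
var metricBlocksProcessed = metrics.NewRegisteredCounter("arkiv_store/blocks_processed", nil)

for _, block := range batch.Batch.Blocks {
	if block.Number <= uint64(lastBlockFromDB) {
		// Skipped blocks never count as processed work.
		s.log.Info("skipping block", "block", block.Number, "lastBlockFromDB", lastBlockFromDB)
		continue
	}
	// Counted only once the block will actually be processed.
	metricBlocksProcessed.Inc(1)
	// ... per-operation processing as in the diff ...
}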

mainLoop:
for _, block := range batch.Batch.Blocks {

-			updates := 0
-			deletes := 0
-			extends := 0
-			creates := 0
-			ownerChanges := 0

if block.Number <= uint64(lastBlockFromDB) {
s.log.Info("skipping block", "block", block.Number, "lastBlockFromDB", lastBlockFromDB)
continue mainLoop
}

+			// Initialize stats for this block
+			if _, ok := stats[block.Number]; !ok {
+				stats[block.Number] = &blockStats{}
+			}
+			blockStat := stats[block.Number]

updatesMap := map[common.Hash][]*events.OPUpdate{}

for _, operation := range block.Operations {
@@ -162,15 +183,14 @@ func (s *SQLiteStore) FollowEvents(ctx context.Context, iterator arkivevents.BatchIterator) error {
}
}

// blockNumber := block.Number
operationLoop:
for _, operation := range block.Operations {

switch {

case operation.Create != nil:
// expiresAtBlock := blockNumber + operation.Create.BTL
-				creates++
+				blockStat.creates++
key := operation.Create.Key

stringAttributes := maps.Clone(operation.Create.StringAttributes)
@@ -225,7 +245,7 @@ func (s *SQLiteStore) FollowEvents(ctx context.Context, iterator arkivevents.BatchIterator) error {
}
}
case operation.Update != nil:
-				updates++
+				blockStat.updates++
Copilot AI (Feb 19, 2026)

The blockStat.updates counter is incremented before checking whether the update operation should be skipped (lines 253-255). When an update is not the last update for a key, the loop continues to the next operation without processing it, but the counter has already been incremented. This overcounts update operations in the metrics, since skipped updates are counted along with processed ones. Consider moving the increment after the skip check at line 255, so only processed updates are counted.
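Concretely, the reordering being suggested would look roughly like this; the skip condition is paraphrased from the reviewer's description of lines 253-255 rather than copied from the file:

case operation.Update != nil:
	updates := updatesMap[operation.Update.Key]
	lastUpdate := updates[len(updates)-1]
	if operation.Update != lastUpdate {
		// Superseded by a later update to the same key in this block:
		// skip without counting it.
		continue operationLoop
	}
	// Increment only once we know this update will be applied,
	// so the metric reflects processed updates only.
	blockStat.updates++
	// ... apply lastUpdate ...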

updates := updatesMap[operation.Update.Key]
lastUpdate := updates[len(updates)-1]
@@ -319,7 +339,7 @@ func (s *SQLiteStore) FollowEvents(ctx context.Context, iterator arkivevents.BatchIterator) error {

case operation.Delete != nil || operation.Expire != nil:

-				deletes++
+				blockStat.deletes++
var key []byte
if operation.Delete != nil {
key = common.Hash(*operation.Delete).Bytes()
@@ -363,7 +383,7 @@ func (s *SQLiteStore) FollowEvents(ctx context.Context, iterator arkivevents.BatchIterator) error {

case operation.ExtendBTL != nil:

-				extends++
+				blockStat.extends++

key := operation.ExtendBTL.Key.Bytes()

@@ -403,7 +423,7 @@ func (s *SQLiteStore) FollowEvents(ctx context.Context, iterator arkivevents.BatchIterator) error {
}

case operation.ChangeOwner != nil:
-				ownerChanges++
+				blockStat.ownerChanges++
key := operation.ChangeOwner.Key.Bytes()

latestPayload, err := st.GetPayloadForEntityKey(ctx, key)
@@ -449,12 +469,8 @@ func (s *SQLiteStore) FollowEvents(ctx context.Context, iterator arkivevents.BatchIterator) error {

}

s.log.Info("block updated", "block", block.Number, "creates", creates, "updates", updates, "deletes", deletes, "extends", extends, "ownerChanges", ownerChanges)
totalCreates += creates
totalUpdates += updates
totalDeletes += deletes
totalExtends += extends
totalOwnerChanges += ownerChanges
// Log per block if needed, but we can now rely on the map for totals later
s.log.Info("block updated", "block", block.Number, "creates", blockStat.creates, "updates", blockStat.updates, "deletes", blockStat.deletes, "extends", blockStat.extends, "ownerChanges", blockStat.ownerChanges)
}

err = st.UpsertLastBlock(ctx, lastBlock)
@@ -472,7 +488,55 @@ func (s *SQLiteStore) FollowEvents(ctx context.Context, iterator arkivevents.BatchIterator) error {
return fmt.Errorf("failed to commit transaction: %w", err)
}

s.log.Info("batch processed", "firstBlock", firstBlock, "lastBlock", lastBlock, "processingTime", time.Since(startTime).Milliseconds(), "creates", totalCreates, "updates", totalUpdates, "deletes", totalDeletes, "extends", totalExtends, "ownerChanges", totalOwnerChanges)
// Calculate batch totals for logging and update metrics PER BLOCK
var (
totalCreates int
totalUpdates int
totalDeletes int
totalExtends int
totalOwnerChanges int
)

// Iterate blocks again to preserve order and update metrics per block
for _, block := range batch.Batch.Blocks {
if s, ok := stats[block.Number]; ok {
totalCreates += s.creates
totalUpdates += s.updates
totalDeletes += s.deletes
totalExtends += s.extends
totalOwnerChanges += s.ownerChanges

// Update metrics specifically per block
if s.creates > 0 {
metricCreates.Inc(int64(s.creates))
}
if s.updates > 0 {
metricUpdates.Inc(int64(s.updates))
}
if s.deletes > 0 {
metricDeletes.Inc(int64(s.deletes))
}
if s.extends > 0 {
metricExtends.Inc(int64(s.extends))
}
if s.ownerChanges > 0 {
metricOwnerChanges.Inc(int64(s.ownerChanges))
Comment on lines +502 to +523

Copilot AI (Feb 19, 2026)

The variable name 's' shadows the receiver variable 's' (the SQLiteStore instance). While this works, it reduces code clarity and could lead to confusion. Consider using a more descriptive name like 'blockStat' or 'stat' instead of 's' to avoid shadowing and improve readability.

Suggested change:
-			if s, ok := stats[block.Number]; ok {
-				totalCreates += s.creates
-				totalUpdates += s.updates
-				totalDeletes += s.deletes
-				totalExtends += s.extends
-				totalOwnerChanges += s.ownerChanges
-				// Update metrics specifically per block
-				if s.creates > 0 {
-					metricCreates.Inc(int64(s.creates))
-				}
-				if s.updates > 0 {
-					metricUpdates.Inc(int64(s.updates))
-				}
-				if s.deletes > 0 {
-					metricDeletes.Inc(int64(s.deletes))
-				}
-				if s.extends > 0 {
-					metricExtends.Inc(int64(s.extends))
-				}
-				if s.ownerChanges > 0 {
-					metricOwnerChanges.Inc(int64(s.ownerChanges))
+			if stat, ok := stats[block.Number]; ok {
+				totalCreates += stat.creates
+				totalUpdates += stat.updates
+				totalDeletes += stat.deletes
+				totalExtends += stat.extends
+				totalOwnerChanges += stat.ownerChanges
+				// Update metrics specifically per block
+				if stat.creates > 0 {
+					metricCreates.Inc(int64(stat.creates))
+				}
+				if stat.updates > 0 {
+					metricUpdates.Inc(int64(stat.updates))
+				}
+				if stat.deletes > 0 {
+					metricDeletes.Inc(int64(stat.deletes))
+				}
+				if stat.extends > 0 {
+					metricExtends.Inc(int64(stat.extends))
+				}
+				if stat.ownerChanges > 0 {
+					metricOwnerChanges.Inc(int64(stat.ownerChanges))
}
Comment on lines +510 to +524

Copilot AI (Feb 19, 2026)

The zero checks (e.g., if s.creates > 0) before incrementing metrics are unnecessary. Counter.Inc() can safely be called with 0, and these conditional checks add complexity without providing any benefit. Consider removing the checks and unconditionally incrementing the metrics with the count values to simplify the code.

Suggested change:
-				if s.creates > 0 {
-					metricCreates.Inc(int64(s.creates))
-				}
-				if s.updates > 0 {
-					metricUpdates.Inc(int64(s.updates))
-				}
-				if s.deletes > 0 {
-					metricDeletes.Inc(int64(s.deletes))
-				}
-				if s.extends > 0 {
-					metricExtends.Inc(int64(s.extends))
-				}
-				if s.ownerChanges > 0 {
-					metricOwnerChanges.Inc(int64(s.ownerChanges))
-				}
+				metricCreates.Inc(int64(s.creates))
+				metricUpdates.Inc(int64(s.updates))
+				metricDeletes.Inc(int64(s.deletes))
+				metricExtends.Inc(int64(s.extends))
+				metricOwnerChanges.Inc(int64(s.ownerChanges))
}
}

metricOperationSuccessful.Inc(1)
metricOperationTime.Update(time.Since(startTime).Milliseconds())

s.log.Info("batch processed",
"firstBlock", firstBlock,
"lastBlock", lastBlock,
"processingTime", time.Since(startTime).Milliseconds(),
"creates", totalCreates,
"updates", totalUpdates,
"deletes", totalDeletes,
"extends", totalExtends,
"ownerChanges", totalOwnerChanges)

return nil
}()