Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,4 @@ broadcast/
# testing
**/coverage*.txt
**/lcov.info
.devcontainer/
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ require (
github.com/r3labs/sse/v2 v2.10.0
github.com/stretchr/testify v1.9.0
golang.org/x/sync v0.17.0
gopkg.in/cenkalti/backoff.v1 v1.1.0
)

require (
Expand Down Expand Up @@ -262,7 +263,6 @@ require (
golang.org/x/tools/go/expect v0.1.1-deprecated // indirect
golang.org/x/tools/go/packages/packagestest v0.1.1-deprecated // indirect
google.golang.org/protobuf v1.36.7 // indirect
gopkg.in/cenkalti/backoff.v1 v1.1.0 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
Expand Down
113 changes: 88 additions & 25 deletions hermes/latest_price_sse.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,14 @@ import (
"github.com/berachain/go-pyth-client/types"
"github.com/ethereum/go-ethereum/common"
sse "github.com/r3labs/sse/v2"
backoff "gopkg.in/cenkalti/backoff.v1"
)

// Retry parameters.
// Resubscribe backoff parameters. These bound how fast we re-subscribe after the rare
// unrecoverable error; routine reconnects are handled inside the SSE client itself.
const (
initialBackoff = 1 * time.Second
maxRetries = 3
maxBackoff = 30 * time.Second
)

// Subscribe price feed from the streaming `v2/updates/price/stream` endpoint. Ensures this only
Expand All @@ -26,6 +28,28 @@ func (c *Client) SubscribePriceStreaming(ctx context.Context, priceFeedIDs []str
c.subscribeOnce.Do(func() {
client := sse.NewClient(c.buildBatchURLStream(priceFeedIDs))

// hermes.pyth.network sits behind Cloudflare, which periodically resets the HTTP/2
// stream (~5-12 min, INTERNAL_ERROR) by design while leaving the connection intact.
// Reconnect through these resets indefinitely: the default strategy's 15-min
// MaxElapsedTime is measured from subscription start and never reset across
// reconnects, so it otherwise expires and surfaces a routine reset as a fatal error.
// MaxElapsedTime = 0 means "never give up".
reconnect := backoff.NewExponentialBackOff()
reconnect.MaxElapsedTime = 0
// Make the strategy context-aware. r3labs runs the reconnect loop via
// backoff.RetryNotify, which only exits on a clean shutdown when the backoff carries
// the caller's context (otherwise ensureContext wraps it in context.Background(), whose
// Done() never fires). With MaxElapsedTime = 0 the loop never stops on its own, so
// without this the subscribe goroutine would leak when the caller cancels ctx.
client.ReconnectStrategy = backoff.WithContext(reconnect, ctx)

// These reconnects are the recoverable, expected case (Cloudflare stream resets),
// so log them at info. The price-staleness metric is what alerts if reconnects
// stop delivering data; this is purely for visibility.
client.ReconnectNotify = func(err error, d time.Duration) {
c.logger.Info("SSE stream reconnecting", "error", err, "backoff", d.String())
}

subscribe := func() error {
return client.SubscribeRawWithContext(ctx, func(msg *sse.Event) {
c.handleSseEvent(msg)
Expand Down Expand Up @@ -69,6 +93,34 @@ func (c *Client) GetCachedLatestPriceUpdates(
return cachedUpdates, nil
}

// LastStreamUpdate reports the wall-clock time the SSE stream last delivered a valid price
// update from any feed. A zero time means no update has been received yet. This is the
// stream-liveness signal: callers can export `time.Since(LastStreamUpdate())` as a metric to
// alert when the whole stream goes dark, independent of how many times it reconnects.
func (c *Client) LastStreamUpdate() time.Time {
c.ssePriceCached.mu.RLock()
defer c.ssePriceCached.mu.RUnlock()
return c.ssePriceCached.lastEventAt
}

// LastFeedPublishTime reports Pyth's publish_time for the latest cached price of the given
// feed. A zero time means the feed has not been seen yet. This is the per-feed staleness
// signal: callers can export `time.Since(LastFeedPublishTime(id))` as a per-feed metric
// (e.g. NoPriceUpdateSince{feed="..."}) to detect a single feed going stale at the source
// even while the stream itself stays alive.
func (c *Client) LastFeedPublishTime(priceFeedID string) time.Time {
priceFeedIDRaw := hex.EncodeToString(common.FromHex(priceFeedID))

c.ssePriceCached.mu.RLock()
defer c.ssePriceCached.mu.RUnlock()

lpd, ok := c.ssePriceCached.latestPrice[priceFeedIDRaw]
if !ok || lpd.PriceFeed == nil || lpd.PriceFeed.Price.PublishTime == nil {
return time.Time{}
}
return time.Unix(lpd.PriceFeed.Price.PublishTime.Int64(), 0)
}

// Handler of the sse streaming event.
func (c *Client) handleSseEvent(event *sse.Event) {
// Decode the price from sse response to LatestPriceData.
Expand All @@ -86,6 +138,11 @@ func (c *Client) handleSseEvent(event *sse.Event) {
c.logger.Error(
"encountered an error when decoding price response from sse stream", "error", err,
)
} else {
// Record stream liveness: the wall-clock time we last received a valid update from
// any feed. Used as the transport-health backstop behind the per-feed publish_time
// staleness signal (see LastStreamUpdate / LastFeedPublishTime).
c.ssePriceCached.lastEventAt = time.Now()
}
c.ssePriceCached.mu.Unlock()

Expand Down Expand Up @@ -113,36 +170,42 @@ func (c *Client) waitForReady(ctx context.Context) error {
}
}

// subscribeWithRetries retries a subscription up to maxRetries with exponential backoff.
// If the task fails after maxRetries, it panics to indicate a critical failure.
// subscribeWithRetries supervises the SSE subscription. The SSE client reconnects through
// transient disconnects on its own (see ReconnectStrategy / ReconnectNotify), so subscribe()
// only returns on a clean shutdown (context cancelled) or an error the client deemed
// unrecoverable. The unrecoverable case is logged at error level and we re-subscribe with a
// capped backoff rather than panicking: a crash is never better than a retry, and the
// price-staleness metric is the authoritative "things are broken" signal regardless.
func (c *Client) subscribeWithRetries(ctx context.Context, subscribe func() error) {
backoff := initialBackoff
retries := 0
wait := initialBackoff

for {
err := subscribe()

// Distinguish a clean shutdown from a genuine failure.
if ctx.Err() != nil {
return
}
if err == nil {
return
}

c.logger.Error(
"SSE subscription returned an unrecoverable error, resubscribing...",
"error", err, "backoff", wait.String(),
)

for retries < maxRetries {
select {
case <-ctx.Done():
c.logger.Error("context cancelled while trying to subscribe to SSE stream")
return
default:
err := subscribe()
if err == nil {
return
}
// #nosec:G404 // jitter only, not security-sensitive.
case <-time.After(wait + time.Duration(rand.Intn(1000))*time.Millisecond):
}

retries++
c.logger.Error(
"encountered an error when subscribing to SSE stream, now retrying...",
"error", err, "num_retries", retries,
)
if retries >= maxRetries {
panic(fmt.Sprintf("failed to subscribe to SSE stream after %d attempts: %v",
retries, err))
if wait < maxBackoff {
if wait *= 2; wait > maxBackoff {
wait = maxBackoff
}

// #nosec:G404 // fine.
time.Sleep(backoff + time.Duration(rand.Intn(1000))*time.Millisecond)
backoff *= 2
}
}
}
93 changes: 93 additions & 0 deletions hermes/latest_price_sse_test.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,16 @@
package hermes_test

import (
"context"
"log/slog"
"net/http"
"net/http/httptest"
"runtime"
"strings"
"testing"
"time"

"github.com/berachain/go-pyth-client/hermes"
"github.com/stretchr/testify/assert"
)

Expand Down Expand Up @@ -45,6 +53,91 @@ func TestSubscribePriceStreaming_PriceFeedNotSubscribed(t *testing.T) {
assert.Nil(t, prices)
}

// TestSubscribePriceStreaming_StopsOnContextCancel is a regression test for a goroutine leak on
// shutdown. The SSE reconnect loop runs inside r3labs via backoff.RetryNotify, which only exits on
// context cancellation when the ReconnectStrategy carries the caller's context. With
// MaxElapsedTime = 0 (never give up) and a non-context-aware backoff, ensureContext wrapped it in
// context.Background() — whose Done() never fires — so cancelling ctx never stopped the loop and
// the subscribe goroutine leaked forever. This test drives the real r3labs + backoff path against
// a local SSE server, cancels the context, and asserts the goroutine actually returns.
func TestSubscribePriceStreaming_StopsOnContextCancel(t *testing.T) {
connected := make(chan struct{}, 1)

// Minimal SSE server: open the stream, emit a keepalive comment, then hold the connection
// open until the client disconnects (i.e. until the caller's context is cancelled).
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "text/event-stream")
w.WriteHeader(http.StatusOK)
if f, ok := w.(http.Flusher); ok {
// A comment line keeps the SSE parser happy without needing a valid price payload.
_, _ = w.Write([]byte(": connected\n\n"))
f.Flush()
}
select {
case connected <- struct{}{}:
default:
}
<-r.Context().Done()
}))
defer srv.Close()

cfg := testConfig
cfg.APIEndpoint = srv.URL
pythClient, err := hermes.NewClient(&cfg, slog.Default())
assert.NoError(t, err)

// Count subscribe goroutines by their stack root. Other tests in this package subscribe with
// a background context that is never cancelled, so they leave their own subscribeWithRetries
// goroutines parked in this shared process — measure a delta against that baseline rather than
// an absolute count.
const marker = "hermes.(*Client).subscribeWithRetries"
subscribeGoroutines := func() int { return strings.Count(goroutineDump(), marker) }

base := subscribeGoroutines()

ctx, cancel := context.WithCancel(context.Background())
pythClient.SubscribePriceStreaming(ctx, testPairs)

// Wait until the stream is actually established before cancelling, so we exercise the
// reconnect loop's cancellation path rather than a connect-time failure.
select {
case <-connected:
case <-time.After(5 * time.Second):
cancel()
t.Fatal("SSE stream never connected to the test server")
}

cancel()

// With the fix our subscribe goroutine returns and the count drops back to baseline; with the
// bug it spins on the cancelled context forever and the count stays at baseline+1 until this
// times out.
deadline := time.Now().Add(8 * time.Second)
for time.Now().Before(deadline) {
if subscribeGoroutines() <= base {
return
}
time.Sleep(20 * time.Millisecond)
}
t.Fatalf(
"subscribe goroutine still parked in the retry loop after context cancel (leaked): "+
"want <= %d subscribe goroutines, still have %d:\n%s",
base, subscribeGoroutines(), goroutineDump(),
)
}

// goroutineDump returns the stack traces of all running goroutines.
func goroutineDump() string {
buf := make([]byte, 1<<16)
for {
n := runtime.Stack(buf, true)
if n < len(buf) {
return string(buf[:n])
}
buf = make([]byte, 2*len(buf))
}
}

// To run this benchmark only without other tests: `go test -run=^$ -bench=BenchmarkGetCachedLatestPriceUpdates`
func BenchmarkGetCachedLatestPriceUpdates(b *testing.B) {
ctx, pythClient := setUp()
Expand Down
6 changes: 6 additions & 0 deletions hermes/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package hermes

import (
"sync"
"time"

"github.com/berachain/go-pyth-client/types"
)
Expand All @@ -14,6 +15,11 @@ type ssePriceData struct {
ready chan struct{}
broadcastOnce sync.Once
latestPrice map[string]*types.LatestPriceData

// lastEventAt is the wall-clock time the most recent valid update was received from any
// feed. It is the stream-liveness (transport-health) signal; per-feed price freshness is
// derived from each feed's Pyth publish_time instead. Guarded by mu.
lastEventAt time.Time
}

// JSON formatted price returned from the `v2/updates/price/latest` endpoint.
Expand Down
Loading