diff --git a/.gitignore b/.gitignore index 8c28947..4391a12 100644 --- a/.gitignore +++ b/.gitignore @@ -39,3 +39,4 @@ broadcast/ # testing **/coverage*.txt **/lcov.info +.devcontainer/ diff --git a/go.mod b/go.mod index 0f9726d..06457f9 100644 --- a/go.mod +++ b/go.mod @@ -11,6 +11,7 @@ require ( github.com/r3labs/sse/v2 v2.10.0 github.com/stretchr/testify v1.9.0 golang.org/x/sync v0.17.0 + gopkg.in/cenkalti/backoff.v1 v1.1.0 ) require ( @@ -262,7 +263,6 @@ require ( golang.org/x/tools/go/expect v0.1.1-deprecated // indirect golang.org/x/tools/go/packages/packagestest v0.1.1-deprecated // indirect google.golang.org/protobuf v1.36.7 // indirect - gopkg.in/cenkalti/backoff.v1 v1.1.0 // indirect gopkg.in/ini.v1 v1.67.0 // indirect gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect diff --git a/hermes/latest_price_sse.go b/hermes/latest_price_sse.go index b1d0a2c..1f6887d 100644 --- a/hermes/latest_price_sse.go +++ b/hermes/latest_price_sse.go @@ -12,12 +12,14 @@ import ( "github.com/berachain/go-pyth-client/types" "github.com/ethereum/go-ethereum/common" sse "github.com/r3labs/sse/v2" + backoff "gopkg.in/cenkalti/backoff.v1" ) -// Retry parameters. +// Resubscribe backoff parameters. These bound how fast we re-subscribe after the rare +// unrecoverable error; routine reconnects are handled inside the SSE client itself. const ( initialBackoff = 1 * time.Second - maxRetries = 3 + maxBackoff = 30 * time.Second ) // Subscribe price feed from the streaming `v2/updates/price/stream` endpoint. Ensures this only @@ -26,6 +28,28 @@ func (c *Client) SubscribePriceStreaming(ctx context.Context, priceFeedIDs []str c.subscribeOnce.Do(func() { client := sse.NewClient(c.buildBatchURLStream(priceFeedIDs)) + // hermes.pyth.network sits behind Cloudflare, which periodically resets the HTTP/2 + // stream (~5-12 min, INTERNAL_ERROR) by design while leaving the connection intact. + // Reconnect through these resets indefinitely: the default strategy's 15-min + // MaxElapsedTime is measured from subscription start and never reset across + // reconnects, so it otherwise expires and surfaces a routine reset as a fatal error. + // MaxElapsedTime = 0 means "never give up". + reconnect := backoff.NewExponentialBackOff() + reconnect.MaxElapsedTime = 0 + // Make the strategy context-aware. r3labs runs the reconnect loop via + // backoff.RetryNotify, which only exits on a clean shutdown when the backoff carries + // the caller's context (otherwise ensureContext wraps it in context.Background(), whose + // Done() never fires). With MaxElapsedTime = 0 the loop never stops on its own, so + // without this the subscribe goroutine would leak when the caller cancels ctx. + client.ReconnectStrategy = backoff.WithContext(reconnect, ctx) + + // These reconnects are the recoverable, expected case (Cloudflare stream resets), + // so log them at info. The price-staleness metric is what alerts if reconnects + // stop delivering data; this is purely for visibility. + client.ReconnectNotify = func(err error, d time.Duration) { + c.logger.Info("SSE stream reconnecting", "error", err, "backoff", d.String()) + } + subscribe := func() error { return client.SubscribeRawWithContext(ctx, func(msg *sse.Event) { c.handleSseEvent(msg) @@ -69,6 +93,34 @@ func (c *Client) GetCachedLatestPriceUpdates( return cachedUpdates, nil } +// LastStreamUpdate reports the wall-clock time the SSE stream last delivered a valid price +// update from any feed. A zero time means no update has been received yet. This is the +// stream-liveness signal: callers can export `time.Since(LastStreamUpdate())` as a metric to +// alert when the whole stream goes dark, independent of how many times it reconnects. +func (c *Client) LastStreamUpdate() time.Time { + c.ssePriceCached.mu.RLock() + defer c.ssePriceCached.mu.RUnlock() + return c.ssePriceCached.lastEventAt +} + +// LastFeedPublishTime reports Pyth's publish_time for the latest cached price of the given +// feed. A zero time means the feed has not been seen yet. This is the per-feed staleness +// signal: callers can export `time.Since(LastFeedPublishTime(id))` as a per-feed metric +// (e.g. NoPriceUpdateSince{feed="..."}) to detect a single feed going stale at the source +// even while the stream itself stays alive. +func (c *Client) LastFeedPublishTime(priceFeedID string) time.Time { + priceFeedIDRaw := hex.EncodeToString(common.FromHex(priceFeedID)) + + c.ssePriceCached.mu.RLock() + defer c.ssePriceCached.mu.RUnlock() + + lpd, ok := c.ssePriceCached.latestPrice[priceFeedIDRaw] + if !ok || lpd.PriceFeed == nil || lpd.PriceFeed.Price.PublishTime == nil { + return time.Time{} + } + return time.Unix(lpd.PriceFeed.Price.PublishTime.Int64(), 0) +} + // Handler of the sse streaming event. func (c *Client) handleSseEvent(event *sse.Event) { // Decode the price from sse response to LatestPriceData. @@ -86,6 +138,11 @@ func (c *Client) handleSseEvent(event *sse.Event) { c.logger.Error( "encountered an error when decoding price response from sse stream", "error", err, ) + } else { + // Record stream liveness: the wall-clock time we last received a valid update from + // any feed. Used as the transport-health backstop behind the per-feed publish_time + // staleness signal (see LastStreamUpdate / LastFeedPublishTime). + c.ssePriceCached.lastEventAt = time.Now() } c.ssePriceCached.mu.Unlock() @@ -113,36 +170,42 @@ func (c *Client) waitForReady(ctx context.Context) error { } } -// subscribeWithRetries retries a subscription up to maxRetries with exponential backoff. -// If the task fails after maxRetries, it panics to indicate a critical failure. +// subscribeWithRetries supervises the SSE subscription. The SSE client reconnects through +// transient disconnects on its own (see ReconnectStrategy / ReconnectNotify), so subscribe() +// only returns on a clean shutdown (context cancelled) or an error the client deemed +// unrecoverable. The unrecoverable case is logged at error level and we re-subscribe with a +// capped backoff rather than panicking: a crash is never better than a retry, and the +// price-staleness metric is the authoritative "things are broken" signal regardless. func (c *Client) subscribeWithRetries(ctx context.Context, subscribe func() error) { - backoff := initialBackoff - retries := 0 + wait := initialBackoff + + for { + err := subscribe() + + // Distinguish a clean shutdown from a genuine failure. + if ctx.Err() != nil { + return + } + if err == nil { + return + } + + c.logger.Error( + "SSE subscription returned an unrecoverable error, resubscribing...", + "error", err, "backoff", wait.String(), + ) - for retries < maxRetries { select { case <-ctx.Done(): - c.logger.Error("context cancelled while trying to subscribe to SSE stream") return - default: - err := subscribe() - if err == nil { - return - } + // #nosec:G404 // jitter only, not security-sensitive. + case <-time.After(wait + time.Duration(rand.Intn(1000))*time.Millisecond): + } - retries++ - c.logger.Error( - "encountered an error when subscribing to SSE stream, now retrying...", - "error", err, "num_retries", retries, - ) - if retries >= maxRetries { - panic(fmt.Sprintf("failed to subscribe to SSE stream after %d attempts: %v", - retries, err)) + if wait < maxBackoff { + if wait *= 2; wait > maxBackoff { + wait = maxBackoff } - - // #nosec:G404 // fine. - time.Sleep(backoff + time.Duration(rand.Intn(1000))*time.Millisecond) - backoff *= 2 } } } diff --git a/hermes/latest_price_sse_test.go b/hermes/latest_price_sse_test.go index 841c7b1..9f22fbf 100644 --- a/hermes/latest_price_sse_test.go +++ b/hermes/latest_price_sse_test.go @@ -1,8 +1,16 @@ package hermes_test import ( + "context" + "log/slog" + "net/http" + "net/http/httptest" + "runtime" + "strings" "testing" + "time" + "github.com/berachain/go-pyth-client/hermes" "github.com/stretchr/testify/assert" ) @@ -45,6 +53,91 @@ func TestSubscribePriceStreaming_PriceFeedNotSubscribed(t *testing.T) { assert.Nil(t, prices) } +// TestSubscribePriceStreaming_StopsOnContextCancel is a regression test for a goroutine leak on +// shutdown. The SSE reconnect loop runs inside r3labs via backoff.RetryNotify, which only exits on +// context cancellation when the ReconnectStrategy carries the caller's context. With +// MaxElapsedTime = 0 (never give up) and a non-context-aware backoff, ensureContext wrapped it in +// context.Background() — whose Done() never fires — so cancelling ctx never stopped the loop and +// the subscribe goroutine leaked forever. This test drives the real r3labs + backoff path against +// a local SSE server, cancels the context, and asserts the goroutine actually returns. +func TestSubscribePriceStreaming_StopsOnContextCancel(t *testing.T) { + connected := make(chan struct{}, 1) + + // Minimal SSE server: open the stream, emit a keepalive comment, then hold the connection + // open until the client disconnects (i.e. until the caller's context is cancelled). + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "text/event-stream") + w.WriteHeader(http.StatusOK) + if f, ok := w.(http.Flusher); ok { + // A comment line keeps the SSE parser happy without needing a valid price payload. + _, _ = w.Write([]byte(": connected\n\n")) + f.Flush() + } + select { + case connected <- struct{}{}: + default: + } + <-r.Context().Done() + })) + defer srv.Close() + + cfg := testConfig + cfg.APIEndpoint = srv.URL + pythClient, err := hermes.NewClient(&cfg, slog.Default()) + assert.NoError(t, err) + + // Count subscribe goroutines by their stack root. Other tests in this package subscribe with + // a background context that is never cancelled, so they leave their own subscribeWithRetries + // goroutines parked in this shared process — measure a delta against that baseline rather than + // an absolute count. + const marker = "hermes.(*Client).subscribeWithRetries" + subscribeGoroutines := func() int { return strings.Count(goroutineDump(), marker) } + + base := subscribeGoroutines() + + ctx, cancel := context.WithCancel(context.Background()) + pythClient.SubscribePriceStreaming(ctx, testPairs) + + // Wait until the stream is actually established before cancelling, so we exercise the + // reconnect loop's cancellation path rather than a connect-time failure. + select { + case <-connected: + case <-time.After(5 * time.Second): + cancel() + t.Fatal("SSE stream never connected to the test server") + } + + cancel() + + // With the fix our subscribe goroutine returns and the count drops back to baseline; with the + // bug it spins on the cancelled context forever and the count stays at baseline+1 until this + // times out. + deadline := time.Now().Add(8 * time.Second) + for time.Now().Before(deadline) { + if subscribeGoroutines() <= base { + return + } + time.Sleep(20 * time.Millisecond) + } + t.Fatalf( + "subscribe goroutine still parked in the retry loop after context cancel (leaked): "+ + "want <= %d subscribe goroutines, still have %d:\n%s", + base, subscribeGoroutines(), goroutineDump(), + ) +} + +// goroutineDump returns the stack traces of all running goroutines. +func goroutineDump() string { + buf := make([]byte, 1<<16) + for { + n := runtime.Stack(buf, true) + if n < len(buf) { + return string(buf[:n]) + } + buf = make([]byte, 2*len(buf)) + } +} + // To run this benchmark only without other tests: `go test -run=^$ -bench=BenchmarkGetCachedLatestPriceUpdates` func BenchmarkGetCachedLatestPriceUpdates(b *testing.B) { ctx, pythClient := setUp() diff --git a/hermes/types.go b/hermes/types.go index a63e5e5..0185bcd 100644 --- a/hermes/types.go +++ b/hermes/types.go @@ -2,6 +2,7 @@ package hermes import ( "sync" + "time" "github.com/berachain/go-pyth-client/types" ) @@ -14,6 +15,11 @@ type ssePriceData struct { ready chan struct{} broadcastOnce sync.Once latestPrice map[string]*types.LatestPriceData + + // lastEventAt is the wall-clock time the most recent valid update was received from any + // feed. It is the stream-liveness (transport-health) signal; per-feed price freshness is + // derived from each feed's Pyth publish_time instead. Guarded by mu. + lastEventAt time.Time } // JSON formatted price returned from the `v2/updates/price/latest` endpoint.