Skip to content

Commit e4b04ed

Browse files
committed
Adds long-running websocket stream consumer example
1 parent 606f91f commit e4b04ed

File tree

6 files changed

+402
-0
lines changed

6 files changed

+402
-0
lines changed
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
# WebSocket Stream Consumer - Bluesky Firehose
2+
3+
This example demonstrates a long-running Durable Object that connects to the Bluesky firehose (via Jetstream) and filters for post events, with rate limiting to print at most 1 per second.
4+
5+
## How to Run
6+
7+
First ensure that `uv` is installed:
8+
https://docs.astral.sh/uv/getting-started/installation/#standalone-installer
9+
10+
Now, if you run `uv run pywrangler dev` within this directory, it should use the config
11+
in `wrangler.jsonc` to run the example.
12+
13+
You can also run `uv run pywrangler deploy` to deploy the example.
14+
15+
## Testing the Firehose Consumer
16+
17+
1. Start the worker: `uv run pywrangler dev`
18+
2. Make any request to initialize the DO: `curl "http://localhost:8787/status"`
19+
3. Watch the logs to see filtered Bluesky post events in real-time (rate limited to 1/sec)!
20+
21+
The Durable Object automatically connects to Jetstream when first accessed. It will maintain a persistent WebSocket connection and print out post events to the console, including the author DID, post text (truncated to 100 chars), and timestamp. Posts are rate limited to display at most 1 per second to avoid overwhelming the logs.
22+
23+
**Available endpoints:**
24+
- `/status` - Check connection status
25+
- `/reconnect` - Manually trigger reconnection if disconnected
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
{
2+
"name": "python-websocket-stream-consumer",
3+
"version": "0.0.0",
4+
"private": true,
5+
"scripts": {
6+
"deploy": "uv run pywrangler deploy",
7+
"dev": "uv run pywrangler dev",
8+
"start": "uv run pywrangler dev"
9+
},
10+
"devDependencies": {
11+
"wrangler": "^4.46.0"
12+
}
13+
}
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
[project]
2+
name = "python-websocket-stream-consumer"
3+
version = "0.1.0"
4+
description = "Python WebSocket stream consumer example"
5+
readme = "README.md"
6+
requires-python = ">=3.12"
7+
dependencies = [
8+
"webtypy>=0.1.7",
9+
]
10+
11+
[dependency-groups]
12+
dev = [
13+
"workers-py",
14+
"workers-runtime-sdk"
15+
]
Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
from workers import WorkerEntrypoint, Response, DurableObject
2+
import js
3+
import json
4+
import time
5+
from pyodide.ffi import to_js, create_proxy
6+
7+
class BlueskyFirehoseConsumer(DurableObject):
8+
"""Durable Object that maintains a persistent WebSocket connection to Bluesky Jetstream."""
9+
10+
def __init__(self, state, env):
11+
super().__init__(state, env)
12+
self.websocket = None
13+
self.connected = False
14+
self.last_print_time = 0 # Track last time we printed a post
15+
16+
async def fetch(self, request):
17+
"""Handle incoming requests to the Durable Object."""
18+
# If we're not connected then make sure we start a connection.
19+
if not self.connected:
20+
await self._schedule_next_alarm()
21+
await self._connect_to_jetstream()
22+
23+
url = js.URL.new(request.url)
24+
path = url.pathname
25+
26+
if path == "/status":
27+
status = "connected" if self.connected else "disconnected"
28+
return Response(f"Firehose status: {status}")
29+
else:
30+
return Response("Available endpoints: /status, /reconnect")
31+
32+
async def alarm(self):
33+
"""Handle alarm events - used to ensure that the DO stays alive and connected"""
34+
print("Alarm triggered - making sure we are connected to jetstream...")
35+
if not self.connected:
36+
await self._connect_to_jetstream()
37+
else:
38+
print("Already connected, skipping reconnection")
39+
40+
# Schedule the next alarm to keep the DO alive
41+
await self._schedule_next_alarm()
42+
43+
async def _schedule_next_alarm(self):
44+
"""Schedule the next alarm to run in 1 minute to keep the DO alive."""
45+
# Check whether an alarm already exists and don't reschedule if so
46+
existing_alarm = await self.ctx.storage.getAlarm()
47+
if existing_alarm and existing_alarm > js.Date.now() + 10000:
48+
return
49+
50+
# Schedule alarm for 1 minute from now
51+
next_alarm_time = js.Date.now() + 60000
52+
return await self.ctx.storage.setAlarm(next_alarm_time)
53+
54+
async def _connect_to_jetstream(self):
55+
"""Connect to the Bluesky Jetstream WebSocket and start consuming events."""
56+
# Get the last event timestamp from storage to resume from the right position
57+
last_timestamp = self.ctx.storage.kv.get("last_event_timestamp")
58+
59+
# Jetstream endpoint - we'll filter for posts
60+
# Using wantedCollections parameter to only get post events
61+
jetstream_url = "wss://jetstream2.us-east.bsky.network/subscribe?wantedCollections=app.bsky.feed.post"
62+
63+
# If we have a last timestamp, add it to resume from that point
64+
if last_timestamp:
65+
jetstream_url += f"&cursor={last_timestamp}"
66+
print(f"Connecting to Bluesky Jetstream at {jetstream_url} (resuming from timestamp: {last_timestamp})")
67+
else:
68+
print(f"Connecting to Bluesky Jetstream at {jetstream_url} (starting fresh)")
69+
70+
# Create WebSocket using JS FFI
71+
ws = js.WebSocket.new(jetstream_url)
72+
self.websocket = ws
73+
74+
# Set up event handlers using JS FFI
75+
async def on_open(event):
76+
self.connected = True
77+
print("Connected to Bluesky Jetstream firehose!")
78+
print("Filtering for: app.bsky.feed.post (post events, rate limited to 1/sec)")
79+
# Ensure alarm is set when we connect
80+
await self._schedule_next_alarm()
81+
82+
def on_message(event):
83+
try:
84+
# Parse the JSON message
85+
data = json.loads(str(event.data))
86+
87+
# Store the timestamp for resumption on reconnect
88+
time_us = data.get("time_us")
89+
if time_us:
90+
# Store the timestamp asynchronously
91+
self.ctx.storage.kv.put("last_event_timestamp", time_us)
92+
93+
# Jetstream sends different event types
94+
# We're interested in 'commit' events which contain posts
95+
if data.get("kind") == "commit":
96+
commit = data.get("commit", {})
97+
collection = commit.get("collection")
98+
99+
# Filter for post events
100+
if collection == "app.bsky.feed.post":
101+
# Rate limiting: only print at most 1 per second
102+
current_time = time.time()
103+
if current_time - self.last_print_time >= 1.0:
104+
record = commit.get("record", {})
105+
print(f"Post record", record)
106+
107+
# Update last print time
108+
self.last_print_time = current_time
109+
110+
except Exception as e:
111+
print(f"Error processing message: {e}")
112+
113+
def on_error(event):
114+
print(f"WebSocket error: {event}")
115+
self.connected = False
116+
self.ctx.abort("WebSocket error occurred")
117+
118+
async def on_close(event):
119+
print(f"WebSocket closed: code={event.code}, reason={event.reason}")
120+
self.connected = False
121+
self.ctx.abort("WebSocket closed unexpectedly")
122+
123+
124+
# Attach event handlers
125+
#
126+
# Note that ordinarily proxies need to be destroyed once they are no longer used.
127+
# However, in this Durable Object context, the WebSocket and its event listeners
128+
# persist for the lifetime of the Durable Object, so we don't explicitly destroy
129+
# the proxies here. When the websocket connection closes, the Durable Object
130+
# is restarted which destroys these proxies.
131+
#
132+
# In the future, we plan to provide support for native Python websocket APIs which
133+
# should eliminate the need for proxy wrappers.
134+
ws.addEventListener("open", create_proxy(on_open))
135+
ws.addEventListener("message", create_proxy(on_message))
136+
ws.addEventListener("error", create_proxy(on_error))
137+
ws.addEventListener("close", create_proxy(on_close))
138+
139+
class Default(WorkerEntrypoint):
140+
"""Main worker entry point that routes requests to the Durable Object."""
141+
142+
async def fetch(self, request):
143+
# Get the Durable Object namespace from the environment
144+
namespace = self.env.BLUESKY_FIREHOSE
145+
146+
# Use a fixed ID so we always connect to the same Durable Object instance
147+
# This ensures we maintain a single persistent connection
148+
id = namespace.idFromName("bluesky-consumer")
149+
stub = namespace.get(id)
150+
151+
# Forward the request to the Durable Object
152+
return await stub.fetch(request)

14-websocket-stream-consumer/uv.lock

Lines changed: 168 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)