-
Notifications
You must be signed in to change notification settings - Fork 53
Adds long-running websocket stream consumer example #51
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
2 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,24 @@ | ||
| # WebSocket Stream Consumer - Bluesky Firehose | ||
|
|
||
| This example demonstrates a long-running Durable Object that connects to the Bluesky firehose (via Jetstream) and filters for post events, with rate limiting to print at most 1 per second. | ||
|
|
||
| ## How to Run | ||
|
|
||
| First ensure that `uv` is installed: | ||
| https://docs.astral.sh/uv/getting-started/installation/#standalone-installer | ||
|
|
||
| Now, if you run `uv run pywrangler dev` within this directory, it should use the config | ||
| in `wrangler.jsonc` to run the example. | ||
|
|
||
| You can also run `uv run pywrangler deploy` to deploy the example. | ||
|
|
||
| ## Testing the Firehose Consumer | ||
|
|
||
| 1. Start the worker: `uv run pywrangler dev` | ||
| 2. Make any request to initialize the DO: `curl "http://localhost:8787/status"` | ||
| 3. Watch the logs to see filtered Bluesky post events in real-time (rate limited to 1/sec)! | ||
|
|
||
| The Durable Object automatically connects to Jetstream when first accessed. It will maintain a persistent WebSocket connection and print out post events to the console, including the author DID, post text (truncated to 100 chars), and timestamp. Posts are rate limited to display at most 1 per second to avoid overwhelming the logs. | ||
|
|
||
| **Available endpoints:** | ||
| - `/status` - Check connection status |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,13 @@ | ||
| { | ||
| "name": "python-websocket-stream-consumer", | ||
| "version": "0.0.0", | ||
| "private": true, | ||
| "scripts": { | ||
| "deploy": "uv run pywrangler deploy", | ||
| "dev": "uv run pywrangler dev", | ||
| "start": "uv run pywrangler dev" | ||
| }, | ||
| "devDependencies": { | ||
| "wrangler": "^4.46.0" | ||
| } | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,15 @@ | ||
| [project] | ||
| name = "python-websocket-stream-consumer" | ||
| version = "0.1.0" | ||
| description = "Python WebSocket stream consumer example" | ||
| readme = "README.md" | ||
| requires-python = ">=3.12" | ||
| dependencies = [ | ||
| "webtypy>=0.1.7", | ||
| ] | ||
|
|
||
| [dependency-groups] | ||
| dev = [ | ||
| "workers-py", | ||
| "workers-runtime-sdk" | ||
| ] |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,159 @@ | ||
| from workers import WorkerEntrypoint, Response, DurableObject | ||
| import js | ||
| import json | ||
| import time | ||
| from pyodide.ffi import create_proxy | ||
| from urllib.parse import urlparse | ||
|
|
||
|
|
||
| class BlueskyFirehoseConsumer(DurableObject): | ||
| """Durable Object that maintains a persistent WebSocket connection to Bluesky Jetstream.""" | ||
|
|
||
| def __init__(self, state, env): | ||
| super().__init__(state, env) | ||
| self.websocket = None | ||
| self.connected = False | ||
| self.last_print_time = 0 # Track last time we printed a post | ||
|
|
||
| async def fetch(self, request): | ||
| """Handle incoming requests to the Durable Object.""" | ||
| # If we're not connected then make sure we start a connection. | ||
| if not self.connected: | ||
| await self._schedule_next_alarm() | ||
| await self._connect_to_jetstream() | ||
|
|
||
| url = urlparse(request.url) | ||
| path = url.path | ||
|
|
||
| if path == "/status": | ||
| status = "connected" if self.connected else "disconnected" | ||
| return Response(f"Firehose status: {status}") | ||
| else: | ||
| return Response("Available endpoints: /status") | ||
|
|
||
| async def alarm(self): | ||
| """Handle alarm events - used to ensure that the DO stays alive and connected""" | ||
| print("Alarm triggered - making sure we are connected to jetstream...") | ||
| if not self.connected: | ||
| await self._connect_to_jetstream() | ||
| else: | ||
| print("Already connected, skipping reconnection") | ||
|
|
||
| # Schedule the next alarm to keep the DO alive | ||
| await self._schedule_next_alarm() | ||
|
|
||
| async def _schedule_next_alarm(self): | ||
| """Schedule the next alarm to run in 1 minute to keep the DO alive.""" | ||
| # Schedule alarm for 1 minute from now, overwriting any existing alarms | ||
| next_alarm_time = int(time.time() * 1000) + 60000 | ||
| return await self.ctx.storage.setAlarm(next_alarm_time) | ||
|
|
||
| async def _on_open(self, event): | ||
| """Handle WebSocket open event.""" | ||
| self.connected = True | ||
| print("Connected to Bluesky Jetstream firehose!") | ||
| print("Filtering for: app.bsky.feed.post (post events, rate limited to 1/sec)") | ||
| # Ensure alarm is set when we connect | ||
| await self._schedule_next_alarm() | ||
|
|
||
| def _on_message(self, event): | ||
| """Handle incoming WebSocket messages.""" | ||
| try: | ||
| # Parse the JSON message | ||
| data = json.loads(event.data) | ||
|
|
||
| # Store the timestamp for resumption on reconnect | ||
| if time_us := data.get("time_us"): | ||
| # Store the timestamp asynchronously | ||
| self.ctx.storage.kv.put("last_event_timestamp", time_us) | ||
|
|
||
| # Jetstream sends different event types | ||
| # We're interested in 'commit' events which contain posts | ||
| if data.get("kind") != "commit": | ||
| return | ||
|
|
||
| commit = data.get("commit", {}) | ||
| collection = commit.get("collection") | ||
|
|
||
| # Filter for post events | ||
| if collection != "app.bsky.feed.post": | ||
| return | ||
|
|
||
| # Rate limiting: only print at most 1 per second | ||
| current_time = time.time() | ||
| if current_time - self.last_print_time >= 1.0: | ||
| record = commit.get("record", {}) | ||
| print("Post record", record) | ||
|
|
||
| # Update last print time | ||
| self.last_print_time = current_time | ||
|
|
||
| except Exception as e: | ||
| print(f"Error processing message: {e}") | ||
|
|
||
| def _on_error(self, event): | ||
| """Handle WebSocket error event.""" | ||
| print(f"WebSocket error: {event}") | ||
| self.connected = False | ||
| self.ctx.abort("WebSocket error occurred") | ||
|
|
||
| async def _on_close(self, event): | ||
| """Handle WebSocket close event.""" | ||
| print(f"WebSocket closed: code={event.code}, reason={event.reason}") | ||
| self.connected = False | ||
| self.ctx.abort("WebSocket closed") | ||
|
|
||
| async def _connect_to_jetstream(self): | ||
| """Connect to the Bluesky Jetstream WebSocket and start consuming events.""" | ||
| # Get the last event timestamp from storage to resume from the right position | ||
| last_timestamp = self.ctx.storage.kv.get("last_event_timestamp") | ||
|
|
||
| # Jetstream endpoint - we'll filter for posts | ||
| # Using wantedCollections parameter to only get post events | ||
| jetstream_url = "wss://jetstream2.us-east.bsky.network/subscribe?wantedCollections=app.bsky.feed.post" | ||
|
|
||
| # If we have a last timestamp, add it to resume from that point | ||
| if last_timestamp: | ||
| jetstream_url += f"&cursor={last_timestamp}" | ||
| print( | ||
| f"Connecting to Bluesky Jetstream at {jetstream_url} (resuming from timestamp: {last_timestamp})" | ||
| ) | ||
| else: | ||
| print( | ||
| f"Connecting to Bluesky Jetstream at {jetstream_url} (starting fresh)" | ||
| ) | ||
|
|
||
| # Create WebSocket using JS FFI | ||
| ws = js.WebSocket.new(jetstream_url) | ||
| self.websocket = ws | ||
|
|
||
| # Attach event handlers | ||
| # | ||
| # Note that ordinarily proxies need to be destroyed once they are no longer used. | ||
| # However, in this Durable Object context, the WebSocket and its event listeners | ||
| # persist for the lifetime of the Durable Object, so we don't explicitly destroy | ||
| # the proxies here. When the websocket connection closes, the Durable Object | ||
| # is restarted which destroys these proxies. | ||
| # | ||
| # In the future, we plan to provide support for native Python websocket APIs which | ||
| # should eliminate the need for proxy wrappers. | ||
| ws.addEventListener("open", create_proxy(self._on_open)) | ||
| ws.addEventListener("message", create_proxy(self._on_message)) | ||
| ws.addEventListener("error", create_proxy(self._on_error)) | ||
| ws.addEventListener("close", create_proxy(self._on_close)) | ||
|
|
||
|
|
||
| class Default(WorkerEntrypoint): | ||
| """Main worker entry point that routes requests to the Durable Object.""" | ||
|
|
||
| async def fetch(self, request): | ||
| # Get the Durable Object namespace from the environment | ||
| namespace = self.env.BLUESKY_FIREHOSE | ||
|
|
||
| # Use a fixed ID so we always connect to the same Durable Object instance | ||
| # This ensures we maintain a single persistent connection | ||
| id = namespace.idFromName("bluesky-consumer") | ||
| stub = namespace.get(id) | ||
|
|
||
| # Forward the request to the Durable Object | ||
| return await stub.fetch(request) | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,29 @@ | ||
| { | ||
| "$schema": "node_modules/wrangler/config-schema.json", | ||
| "name": "python-websocket-stream-consumer", | ||
| "main": "src/entry.py", | ||
| "compatibility_date": "2025-11-02", | ||
| "compatibility_flags": [ | ||
| "python_workers" | ||
| ], | ||
| "observability": { | ||
| "enabled": true | ||
| }, | ||
| "durable_objects": { | ||
| "bindings": [ | ||
| { | ||
| "name": "BLUESKY_FIREHOSE", | ||
| "class_name": "BlueskyFirehoseConsumer", | ||
| "script_name": "python-websocket-stream-consumer" | ||
| } | ||
| ] | ||
| }, | ||
| "migrations": [ | ||
| { | ||
| "tag": "v1", | ||
| "new_sqlite_classes": [ | ||
| "BlueskyFirehoseConsumer" | ||
| ] | ||
| } | ||
| ] | ||
| } |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does
ctx.abort()behave well?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It should do, why wouldn't it? It restarts the DO.