22import js
33import json
44import time
5- from pyodide .ffi import to_js , create_proxy
5+ from pyodide .ffi import create_proxy
6+
67
78class BlueskyFirehoseConsumer (DurableObject ):
89 """Durable Object that maintains a persistent WebSocket connection to Bluesky Jetstream."""
@@ -63,9 +64,13 @@ async def _connect_to_jetstream(self):
6364 # If we have a last timestamp, add it to resume from that point
6465 if last_timestamp :
6566 jetstream_url += f"&cursor={ last_timestamp } "
66- print (f"Connecting to Bluesky Jetstream at { jetstream_url } (resuming from timestamp: { last_timestamp } )" )
67+ print (
68+ f"Connecting to Bluesky Jetstream at { jetstream_url } (resuming from timestamp: { last_timestamp } )"
69+ )
6770 else :
68- print (f"Connecting to Bluesky Jetstream at { jetstream_url } (starting fresh)" )
71+ print (
72+ f"Connecting to Bluesky Jetstream at { jetstream_url } (starting fresh)"
73+ )
6974
7075 # Create WebSocket using JS FFI
7176 ws = js .WebSocket .new (jetstream_url )
@@ -75,7 +80,9 @@ async def _connect_to_jetstream(self):
7580 async def on_open (event ):
7681 self .connected = True
7782 print ("Connected to Bluesky Jetstream firehose!" )
78- print ("Filtering for: app.bsky.feed.post (post events, rate limited to 1/sec)" )
83+ print (
84+ "Filtering for: app.bsky.feed.post (post events, rate limited to 1/sec)"
85+ )
7986 # Ensure alarm is set when we connect
8087 await self ._schedule_next_alarm ()
8188
@@ -102,7 +109,7 @@ def on_message(event):
102109 current_time = time .time ()
103110 if current_time - self .last_print_time >= 1.0 :
104111 record = commit .get ("record" , {})
105- print (f "Post record" , record )
112+ print ("Post record" , record )
106113
107114 # Update last print time
108115 self .last_print_time = current_time
@@ -120,7 +127,6 @@ async def on_close(event):
120127 self .connected = False
121128 self .ctx .abort ("WebSocket closed unexpectedly" )
122129
123-
124130 # Attach event handlers
125131 #
126132 # Note that ordinarily proxies need to be destroyed once they are no longer used.
@@ -136,6 +142,7 @@ async def on_close(event):
136142 ws .addEventListener ("error" , create_proxy (on_error ))
137143 ws .addEventListener ("close" , create_proxy (on_close ))
138144
145+
139146class Default (WorkerEntrypoint ):
140147 """Main worker entry point that routes requests to the Durable Object."""
141148
0 commit comments