Skip to content

Commit 8be7770

Browse files
committed
Replaced PUB socket with XPUB socket
1 parent 327589f commit 8be7770

File tree

5 files changed

+71
-8
lines changed

5 files changed

+71
-8
lines changed

ipykernel/inprocess/ipkernel.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ class InProcessKernel(IPythonKernel):
5656

5757
@default("iopub_thread")
5858
def _default_iopub_thread(self):
59-
thread = IOPubThread(self._underlying_iopub_socket)
59+
thread = IOPubThread(self._underlying_iopub_socket, self.session)
6060
thread.start()
6161
return thread
6262

ipykernel/iostream.py

Lines changed: 64 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ class IOPubThread:
4747
whose IO is always run in a thread.
4848
"""
4949

50-
def __init__(self, socket, pipe=False):
50+
def __init__(self, socket, session, pipe=False):
5151
"""Create IOPub thread
5252
5353
Parameters
@@ -59,6 +59,7 @@ def __init__(self, socket, pipe=False):
5959
piped from subprocesses.
6060
"""
6161
self.socket = socket
62+
self.session = session
6263
self._stopped = False
6364
self.background_socket = BackgroundSocket(self)
6465
self._master_pid = os.getpid()
@@ -73,12 +74,73 @@ def __init__(self, socket, pipe=False):
7374
self._event_pipe_gc_seconds: float = 10
7475
self._event_pipe_gc_task: asyncio.Task[Any] | None = None
7576
self._setup_event_pipe()
77+
self._setup_xpub_listener()
7678
self.thread = threading.Thread(target=self._thread_main, name="IOPub")
7779
self.thread.daemon = True
7880
self.thread.pydev_do_not_trace = True # type:ignore[attr-defined]
7981
self.thread.is_pydev_daemon_thread = True # type:ignore[attr-defined]
8082
self.thread.name = "IOPub"
8183

84+
def _setup_xpub_listener(self):
85+
"""Setup listener for XPUB subscription events"""
86+
87+
# Checks the socket is not a DummySocket
88+
if not hasattr(self.socket, "getsockopt"):
89+
return
90+
91+
socket_type = self.socket.getsockopt(zmq.TYPE)
92+
if socket_type == zmq.XPUB:
93+
self._xpub_stream = ZMQStream(self.socket, self.io_loop)
94+
self._xpub_stream.on_recv(self._handle_subscription)
95+
96+
def _handle_subscription(self, frames):
97+
"""Handle subscription/unsubscription events from XPUB socket
98+
99+
XPUB sockets receive:
100+
- subscribe: single frame with b'\\x01' + topic
101+
- unsubscribe: single frame with b'\\x00' + topic
102+
"""
103+
104+
for frame in frames:
105+
event_type = frame[0]
106+
if event_type == 1:
107+
subscription = frame[1:] if len(frame) > 1 else b""
108+
try:
109+
subscription_str = subscription.decode("utf-8")
110+
except UnicodeDecodeError:
111+
continue
112+
self._send_welcome_message(subscription_str)
113+
114+
def _send_welcome_message(self, subscription):
115+
"""Send iopub_welcome message for new subscription
116+
117+
Parameters
118+
----------
119+
subscription : str
120+
The subscription topic (UTF-8 decoded)
121+
"""
122+
123+
content = {"subscription": subscription}
124+
125+
header = self.session.msg_header("iopub_welcome")
126+
msg = {
127+
"header": header,
128+
"parent_header": {},
129+
"metadata": {},
130+
"content": content,
131+
"buffers": [],
132+
}
133+
134+
msg_list = self.session.serialize(msg)
135+
136+
if subscription:
137+
identity = subscription.encode("utf-8")
138+
full_msg = [identity, *msg_list]
139+
else:
140+
full_msg = msg_list
141+
# Send directly on socket (we're already in IO thread context)
142+
self.socket.send_multipart(full_msg)
143+
82144
def _thread_main(self):
83145
"""The inner loop that's actually run in a thread"""
84146

@@ -447,7 +509,7 @@ def __init__(
447509
DeprecationWarning,
448510
stacklevel=2,
449511
)
450-
pub_thread = IOPubThread(pub_thread)
512+
pub_thread = IOPubThread(pub_thread, self.session)
451513
pub_thread.start()
452514
self.pub_thread = pub_thread
453515
self.name = name

ipykernel/kernelapp.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -377,12 +377,12 @@ def init_control(self, context):
377377

378378
def init_iopub(self, context):
379379
"""Initialize the iopub channel."""
380-
self.iopub_socket = context.socket(zmq.PUB)
380+
self.iopub_socket = context.socket(zmq.XPUB)
381381
self.iopub_socket.linger = 1000
382382
self.iopub_port = self._bind_socket(self.iopub_socket, self.iopub_port)
383383
self.log.debug("iopub PUB Channel on port: %i", self.iopub_port)
384384
self.configure_tornado_logger()
385-
self.iopub_thread = IOPubThread(self.iopub_socket, pipe=True)
385+
self.iopub_thread = IOPubThread(self.iopub_socket, self.session, pipe=True)
386386
self.iopub_thread.start()
387387
# backward-compat: wrap iopub socket API in background thread
388388
self.iopub_socket = self.iopub_thread.background_socket

tests/conftest.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ class KernelMixin:
4848

4949
def _initialize(self):
5050
self.context = context = zmq.Context()
51-
self.iopub_socket = context.socket(zmq.PUB)
51+
self.iopub_socket = context.socket(zmq.XPUB)
5252
self.stdin_socket = context.socket(zmq.ROUTER)
5353
self.session = Session()
5454
self.test_sockets = [self.iopub_socket]

tests/test_io.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,9 @@ def ctx():
2626

2727
@pytest.fixture()
2828
def iopub_thread(ctx):
29+
session = Session()
2930
with ctx.socket(zmq.PUB) as pub:
30-
thread = IOPubThread(pub)
31+
thread = IOPubThread(pub, session)
3132
thread.start()
3233

3334
yield thread
@@ -155,7 +156,7 @@ def subprocess_test_echo_watch():
155156
# use PUSH socket to avoid subscription issues
156157
with zmq.Context() as ctx, ctx.socket(zmq.PUSH) as pub:
157158
pub.connect(os.environ["IOPUB_URL"])
158-
iopub_thread = IOPubThread(pub)
159+
iopub_thread = IOPubThread(pub, session)
159160
iopub_thread.start()
160161
stdout_fd = sys.stdout.fileno()
161162
sys.stdout.flush()

0 commit comments

Comments
 (0)