Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ipykernel/inprocess/ipkernel.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ class InProcessKernel(IPythonKernel):

@default("iopub_thread")
def _default_iopub_thread(self):
thread = IOPubThread(self._underlying_iopub_socket)
thread = IOPubThread(self._underlying_iopub_socket, self.session)
thread.start()
return thread

Expand Down
75 changes: 73 additions & 2 deletions ipykernel/iostream.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class IOPubThread:
whose IO is always run in a thread.
"""

def __init__(self, socket, pipe=False):
def __init__(self, socket, session=None, pipe=False):
"""Create IOPub thread

Parameters
Expand All @@ -59,6 +59,7 @@ def __init__(self, socket, pipe=False):
piped from subprocesses.
"""
self.socket = socket
self.session = session
self._stopped = False
self.background_socket = BackgroundSocket(self)
self._master_pid = os.getpid()
Expand All @@ -73,12 +74,82 @@ def __init__(self, socket, pipe=False):
self._event_pipe_gc_seconds: float = 10
self._event_pipe_gc_task: asyncio.Task[Any] | None = None
self._setup_event_pipe()
self._setup_xpub_listener()
self.thread = threading.Thread(target=self._thread_main, name="IOPub")
self.thread.daemon = True
self.thread.pydev_do_not_trace = True # type:ignore[attr-defined]
self.thread.is_pydev_daemon_thread = True # type:ignore[attr-defined]
self.thread.name = "IOPub"

def _setup_xpub_listener(self):
"""Setup listener for XPUB subscription events"""

# Checks the socket is not a DummySocket
if not hasattr(self.socket, "getsockopt"):
return

socket_type = self.socket.getsockopt(zmq.TYPE)
if socket_type == zmq.XPUB:
self._xpub_stream = ZMQStream(self.socket, self.io_loop)
self._xpub_stream.on_recv(self._handle_subscription)

def _handle_subscription(self, frames):
"""Handle subscription/unsubscription events from XPUB socket

XPUB sockets receive:
- subscribe: single frame with b'\\x01' + topic
- unsubscribe: single frame with b'\\x00' + topic
"""

for frame in frames:
event_type = frame[0]
if event_type == 1:
subscription = frame[1:] if len(frame) > 1 else b""
try:
subscription_str = subscription.decode("utf-8")
except UnicodeDecodeError:
continue
self._send_welcome_message(subscription_str)

def _send_welcome_message(self, subscription):
"""Send iopub_welcome message for new subscription

Parameters
----------
subscription : str
The subscription topic (UTF-8 decoded)
"""

# TODO: This early return is for backward-compatibility with ipyparallel.
# This should be removed when ipykernel has been released with support of
# xpub and ipyparallel has been updated to pass the session parameter
# to IOPubThread upon construction.
# (NB: the call to fix is here:
# https://github.com/ipython/ipyparallel/blob/main/ipyparallel/engine/app.py#L679
if self.session is None:
return

content = {"subscription": subscription}

header = self.session.msg_header("iopub_welcome")
msg = {
"header": header,
"parent_header": {},
"metadata": {},
"content": content,
"buffers": [],
}

msg_list = self.session.serialize(msg)

if subscription:
identity = subscription.encode("utf-8")
full_msg = [identity, *msg_list]
else:
full_msg = msg_list
# Send directly on socket (we're already in IO thread context)
self.socket.send_multipart(full_msg)

def _thread_main(self):
"""The inner loop that's actually run in a thread"""

Expand Down Expand Up @@ -447,7 +518,7 @@ def __init__(
DeprecationWarning,
stacklevel=2,
)
pub_thread = IOPubThread(pub_thread)
pub_thread = IOPubThread(pub_thread, self.session)
pub_thread.start()
self.pub_thread = pub_thread
self.name = name
Expand Down
4 changes: 2 additions & 2 deletions ipykernel/kernelapp.py
Original file line number Diff line number Diff line change
Expand Up @@ -377,12 +377,12 @@ def init_control(self, context):

def init_iopub(self, context):
"""Initialize the iopub channel."""
self.iopub_socket = context.socket(zmq.PUB)
self.iopub_socket = context.socket(zmq.XPUB)
self.iopub_socket.linger = 1000
self.iopub_port = self._bind_socket(self.iopub_socket, self.iopub_port)
self.log.debug("iopub PUB Channel on port: %i", self.iopub_port)
self.configure_tornado_logger()
self.iopub_thread = IOPubThread(self.iopub_socket, pipe=True)
self.iopub_thread = IOPubThread(self.iopub_socket, self.session, pipe=True)
self.iopub_thread.start()
# backward-compat: wrap iopub socket API in background thread
self.iopub_socket = self.iopub_thread.background_socket
Expand Down
2 changes: 1 addition & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class KernelMixin:

def _initialize(self):
self.context = context = zmq.Context()
self.iopub_socket = context.socket(zmq.PUB)
self.iopub_socket = context.socket(zmq.XPUB)
self.stdin_socket = context.socket(zmq.ROUTER)
self.session = Session()
self.test_sockets = [self.iopub_socket]
Expand Down
5 changes: 3 additions & 2 deletions tests/test_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@ def ctx():

@pytest.fixture()
def iopub_thread(ctx):
session = Session()
with ctx.socket(zmq.PUB) as pub:
thread = IOPubThread(pub)
thread = IOPubThread(pub, session)
thread.start()

yield thread
Expand Down Expand Up @@ -155,7 +156,7 @@ def subprocess_test_echo_watch():
# use PUSH socket to avoid subscription issues
with zmq.Context() as ctx, ctx.socket(zmq.PUSH) as pub:
pub.connect(os.environ["IOPUB_URL"])
iopub_thread = IOPubThread(pub)
iopub_thread = IOPubThread(pub, session)
iopub_thread.start()
stdout_fd = sys.stdout.fileno()
sys.stdout.flush()
Expand Down
Loading