diff --git a/ipykernel/inprocess/ipkernel.py b/ipykernel/inprocess/ipkernel.py index b68bec442..7ed47443b 100644 --- a/ipykernel/inprocess/ipkernel.py +++ b/ipykernel/inprocess/ipkernel.py @@ -56,7 +56,7 @@ class InProcessKernel(IPythonKernel): @default("iopub_thread") def _default_iopub_thread(self): - thread = IOPubThread(self._underlying_iopub_socket) + thread = IOPubThread(self._underlying_iopub_socket, self.session) thread.start() return thread diff --git a/ipykernel/iostream.py b/ipykernel/iostream.py index 0a2115f3b..a426139ed 100644 --- a/ipykernel/iostream.py +++ b/ipykernel/iostream.py @@ -47,7 +47,7 @@ class IOPubThread: whose IO is always run in a thread. """ - def __init__(self, socket, pipe=False): + def __init__(self, socket, session=None, pipe=False): """Create IOPub thread Parameters @@ -59,6 +59,7 @@ def __init__(self, socket, pipe=False): piped from subprocesses. """ self.socket = socket + self.session = session self._stopped = False self.background_socket = BackgroundSocket(self) self._master_pid = os.getpid() @@ -73,12 +74,82 @@ def __init__(self, socket, pipe=False): self._event_pipe_gc_seconds: float = 10 self._event_pipe_gc_task: asyncio.Task[Any] | None = None self._setup_event_pipe() + self._setup_xpub_listener() self.thread = threading.Thread(target=self._thread_main, name="IOPub") self.thread.daemon = True self.thread.pydev_do_not_trace = True # type:ignore[attr-defined] self.thread.is_pydev_daemon_thread = True # type:ignore[attr-defined] self.thread.name = "IOPub" + def _setup_xpub_listener(self): + """Setup listener for XPUB subscription events""" + + # Checks the socket is not a DummySocket + if not hasattr(self.socket, "getsockopt"): + return + + socket_type = self.socket.getsockopt(zmq.TYPE) + if socket_type == zmq.XPUB: + self._xpub_stream = ZMQStream(self.socket, self.io_loop) + self._xpub_stream.on_recv(self._handle_subscription) + + def _handle_subscription(self, frames): + """Handle subscription/unsubscription events from XPUB socket + + XPUB sockets receive: + - subscribe: single frame with b'\\x01' + topic + - unsubscribe: single frame with b'\\x00' + topic + """ + + for frame in frames: + event_type = frame[0] + if event_type == 1: + subscription = frame[1:] if len(frame) > 1 else b"" + try: + subscription_str = subscription.decode("utf-8") + except UnicodeDecodeError: + continue + self._send_welcome_message(subscription_str) + + def _send_welcome_message(self, subscription): + """Send iopub_welcome message for new subscription + + Parameters + ---------- + subscription : str + The subscription topic (UTF-8 decoded) + """ + + # TODO: This early return is for backward-compatibility with ipyparallel. + # This should be removed when ipykernel has been released with support of + # xpub and ipyparallel has been updated to pass the session parameter + # to IOPubThread upon construction. + # (NB: the call to fix is here: + # https://github.com/ipython/ipyparallel/blob/main/ipyparallel/engine/app.py#L679 + if self.session is None: + return + + content = {"subscription": subscription} + + header = self.session.msg_header("iopub_welcome") + msg = { + "header": header, + "parent_header": {}, + "metadata": {}, + "content": content, + "buffers": [], + } + + msg_list = self.session.serialize(msg) + + if subscription: + identity = subscription.encode("utf-8") + full_msg = [identity, *msg_list] + else: + full_msg = msg_list + # Send directly on socket (we're already in IO thread context) + self.socket.send_multipart(full_msg) + def _thread_main(self): """The inner loop that's actually run in a thread""" @@ -447,7 +518,7 @@ def __init__( DeprecationWarning, stacklevel=2, ) - pub_thread = IOPubThread(pub_thread) + pub_thread = IOPubThread(pub_thread, self.session) pub_thread.start() self.pub_thread = pub_thread self.name = name diff --git a/ipykernel/kernelapp.py b/ipykernel/kernelapp.py index 86b275a82..a450d6ca7 100644 --- a/ipykernel/kernelapp.py +++ b/ipykernel/kernelapp.py @@ -377,12 +377,12 @@ def init_control(self, context): def init_iopub(self, context): """Initialize the iopub channel.""" - self.iopub_socket = context.socket(zmq.PUB) + self.iopub_socket = context.socket(zmq.XPUB) self.iopub_socket.linger = 1000 self.iopub_port = self._bind_socket(self.iopub_socket, self.iopub_port) self.log.debug("iopub PUB Channel on port: %i", self.iopub_port) self.configure_tornado_logger() - self.iopub_thread = IOPubThread(self.iopub_socket, pipe=True) + self.iopub_thread = IOPubThread(self.iopub_socket, self.session, pipe=True) self.iopub_thread.start() # backward-compat: wrap iopub socket API in background thread self.iopub_socket = self.iopub_thread.background_socket diff --git a/tests/conftest.py b/tests/conftest.py index 540ad36c3..ad6c5830a 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -48,7 +48,7 @@ class KernelMixin: def _initialize(self): self.context = context = zmq.Context() - self.iopub_socket = context.socket(zmq.PUB) + self.iopub_socket = context.socket(zmq.XPUB) self.stdin_socket = context.socket(zmq.ROUTER) self.session = Session() self.test_sockets = [self.iopub_socket] diff --git a/tests/test_io.py b/tests/test_io.py index c7320af84..da1000f4e 100644 --- a/tests/test_io.py +++ b/tests/test_io.py @@ -26,8 +26,9 @@ def ctx(): @pytest.fixture() def iopub_thread(ctx): + session = Session() with ctx.socket(zmq.PUB) as pub: - thread = IOPubThread(pub) + thread = IOPubThread(pub, session) thread.start() yield thread @@ -155,7 +156,7 @@ def subprocess_test_echo_watch(): # use PUSH socket to avoid subscription issues with zmq.Context() as ctx, ctx.socket(zmq.PUSH) as pub: pub.connect(os.environ["IOPUB_URL"]) - iopub_thread = IOPubThread(pub) + iopub_thread = IOPubThread(pub, session) iopub_thread.start() stdout_fd = sys.stdout.fileno() sys.stdout.flush()