Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions .github/workflows/run-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ jobs:
- name: Test
run: |
cd tests
export HIVEMIND_MEMORY_SHARING_STRATEGY=file_descriptor
pytest --durations=0 --durations-min=1.0 -v
build_and_test_p2pd:
runs-on: ubuntu-latest
Expand Down Expand Up @@ -72,7 +71,6 @@ jobs:
- name: Test
run: |
cd tests
export HIVEMIND_MEMORY_SHARING_STRATEGY=file_descriptor
pytest -k "p2p" -v
codecov_in_develop_mode:
runs-on: ubuntu-latest
Expand Down Expand Up @@ -101,7 +99,6 @@ jobs:
pip install -e . --no-use-pep517
- name: Test
run: |
export HIVEMIND_MEMORY_SHARING_STRATEGY=file_descriptor
pytest --cov hivemind --cov-config=pyproject.toml -v tests
- name: Upload coverage to Codecov
uses: codecov/codecov-action@v3
32 changes: 20 additions & 12 deletions hivemind/p2p/p2p_daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -684,18 +684,26 @@

async def _read_outputs(self, ready: asyncio.Future) -> None:
last_line = None
while True:
line = await self._child.stdout.readline()
if not line: # Stream closed
break
last_line = line.rstrip().decode(errors="ignore")

self._log_p2pd_message(last_line)
if last_line.startswith("Peer ID:"):
ready.set_result(None)

if not ready.done():
ready.set_exception(P2PDaemonError(f"Daemon failed to start: {last_line}"))
try:
while True:
try:
line = await self._child.stdout.readline()
if not line: # Stream closed
break
last_line = line.rstrip().decode(errors="ignore")

self._log_p2pd_message(last_line)
if last_line.startswith("Peer ID:"):
ready.set_result(None)
except (asyncio.CancelledError, RuntimeError):
# Task was cancelled or event loop closed
break

if not ready.done():
ready.set_exception(P2PDaemonError(f"Daemon failed to start: {last_line}"))
except (asyncio.CancelledError, RuntimeError):

Check warning on line 704 in hivemind/p2p/p2p_daemon.py

View check run for this annotation

Codecov / codecov/patch

hivemind/p2p/p2p_daemon.py#L704

Added line #L704 was not covered by tests
# Task was cancelled or event loop closed during cleanup
pass

Check warning on line 706 in hivemind/p2p/p2p_daemon.py

View check run for this annotation

Codecov / codecov/patch

hivemind/p2p/p2p_daemon.py#L706

Added line #L706 was not covered by tests

@staticmethod
def _log_p2pd_message(line: str) -> None:
Expand Down
78 changes: 47 additions & 31 deletions hivemind/p2p/p2p_daemon_bindings/control.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
"""

import asyncio
from contextlib import asynccontextmanager, closing
from contextlib import asynccontextmanager
from typing import AsyncIterator, Awaitable, Callable, Dict, Iterable, Optional, Sequence, Tuple
from uuid import UUID, uuid4

Expand Down Expand Up @@ -170,46 +170,62 @@
yield self

async def _read_from_persistent_conn(self, reader: asyncio.StreamReader):
while True:
resp = p2pd_pb.PersistentConnectionResponse()
try:
await read_pbmsg_safe(reader, resp)
except asyncio.IncompleteReadError:
break

call_id = UUID(bytes=resp.callId)

if resp.HasField("callUnaryResponse"):
if call_id in self._pending_calls and resp.callUnaryResponse.HasField("response"):
self._pending_calls[call_id].set_result(resp.callUnaryResponse.response)
elif call_id in self._pending_calls and resp.callUnaryResponse.HasField("error"):
remote_exc = P2PHandlerError(resp.callUnaryResponse.error.decode(errors="ignore"))
self._pending_calls[call_id].set_exception(remote_exc)
else:
logger.debug(f"Received unexpected unary call: {resp}")
try:
while True:
resp = p2pd_pb.PersistentConnectionResponse()
try:
await read_pbmsg_safe(reader, resp)
except asyncio.IncompleteReadError:
break

call_id = UUID(bytes=resp.callId)

elif resp.HasField("requestHandling"):
handler_task = asyncio.create_task(self._handle_persistent_request(call_id, resp.requestHandling))
self._handler_tasks[call_id] = handler_task
if resp.HasField("callUnaryResponse"):
if call_id in self._pending_calls and resp.callUnaryResponse.HasField("response"):
self._pending_calls[call_id].set_result(resp.callUnaryResponse.response)
elif call_id in self._pending_calls and resp.callUnaryResponse.HasField("error"):
remote_exc = P2PHandlerError(resp.callUnaryResponse.error.decode(errors="ignore"))
self._pending_calls[call_id].set_exception(remote_exc)
else:
logger.debug(f"Received unexpected unary call: {resp}")

Check warning on line 190 in hivemind/p2p/p2p_daemon_bindings/control.py

View check run for this annotation

Codecov / codecov/patch

hivemind/p2p/p2p_daemon_bindings/control.py#L190

Added line #L190 was not covered by tests

elif call_id in self._handler_tasks and resp.HasField("cancel"):
cancel_task_if_running(self._handler_tasks[call_id])
elif resp.HasField("requestHandling"):
handler_task = asyncio.create_task(self._handle_persistent_request(call_id, resp.requestHandling))
self._handler_tasks[call_id] = handler_task

elif call_id in self._pending_calls and resp.HasField("daemonError"):
daemon_exc = P2PDaemonError(resp.daemonError.message)
self._pending_calls[call_id].set_exception(daemon_exc)
elif call_id in self._handler_tasks and resp.HasField("cancel"):
cancel_task_if_running(self._handler_tasks[call_id])

elif call_id in self._pending_calls:
self._pending_calls[call_id].set_result(None)
elif call_id in self._pending_calls and resp.HasField("daemonError"):
daemon_exc = P2PDaemonError(resp.daemonError.message)
self._pending_calls[call_id].set_exception(daemon_exc)

else:
logger.debug(f"Received unexpected response from daemon: {resp}")
elif call_id in self._pending_calls:
self._pending_calls[call_id].set_result(None)

else:
logger.debug(f"Received unexpected response from daemon: {resp}")
except asyncio.CancelledError:
# Task was cancelled, clean up gracefully
pass

async def _write_to_persistent_conn(self, writer: asyncio.StreamWriter):
with closing(writer):
try:
while True:
msg = await self._pending_messages.get()
await write_pbmsg(writer, msg)
except (asyncio.CancelledError, RuntimeError):
# Task was cancelled or event loop closed
pass
finally:
# Close writer safely, avoiding "Event loop is closed" errors
try:
if not writer.is_closing():
writer.close()
await writer.wait_closed()
except (RuntimeError, ConnectionError):
# Event loop might be closed or connection already closed
pass

async def _handle_persistent_request(self, call_id: UUID, request: p2pd_pb.CallUnaryRequest):
if request.proto not in self.unary_handlers:
Expand Down
4 changes: 2 additions & 2 deletions hivemind/utils/asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,6 @@ def cancel_task_if_running(task: Optional[asyncio.Task]) -> None:
if loop.is_running():
task.cancel()
except RuntimeError as e:
# Only ignore event loop closure errors
if "Event loop is closed" not in str(e):
# Ignore event loop closure and missing event loop errors
if "Event loop is closed" not in str(e) and "There is no current event loop" not in str(e):
raise
Loading
Loading