|
1 | 1 | from __future__ import annotations |
2 | 2 |
|
| 3 | +import importlib.resources |
3 | 4 | import inspect |
4 | 5 | import multiprocessing |
5 | 6 | import os |
6 | 7 | import subprocess |
7 | 8 | import sys |
8 | 9 | from contextlib import AbstractContextManager, ExitStack, contextmanager |
9 | | -from typing import TYPE_CHECKING, Any |
| 10 | +from typing import TYPE_CHECKING, Any, Literal |
10 | 11 |
|
11 | 12 | try: |
12 | 13 | import rich_click as click |
|
21 | 22 | console, |
22 | 23 | create_ssl_files, |
23 | 24 | isatty, |
| 25 | + populate_repl_globals, |
24 | 26 | remove_default_schema_routes, |
25 | 27 | remove_routes_with_patterns, |
26 | 28 | show_app_info, |
@@ -365,3 +367,86 @@ def _handle_http_route(self, route: HTTPRoute) -> None: |
365 | 367 | branch.add(" ".join([f"[green]{path}[green]", *handler_info])) |
366 | 368 | else: |
367 | 369 | branch.add(" ".join(handler_info)) |
| 370 | + |
| 371 | + |
| 372 | +def _autoselect_repl_module() -> Literal["repl", "asyncio", "ipython"]: |
| 373 | + import importlib.util |
| 374 | + |
| 375 | + if importlib.util.find_spec("IPython"): |
| 376 | + return "ipython" |
| 377 | + |
| 378 | + if sys.version_info >= (3, 13): |
| 379 | + return "asyncio" |
| 380 | + |
| 381 | + return "repl" |
| 382 | + |
| 383 | + |
| 384 | +@click.command(name="shell") |
| 385 | +@click.option( |
| 386 | + "--repl", |
| 387 | + default=None, |
| 388 | + type=click.Choice(("repl", "asyncio", "ipython")), |
| 389 | + envvar="LITESTAR_REPL", |
| 390 | + required=False, |
| 391 | + help="Start a Python shell with the Litestar application and other values provided by plugins preloaded", |
| 392 | +) |
| 393 | +def shell_command( |
| 394 | + app: Litestar, |
| 395 | + repl: Literal["repl", "asyncio", "ipython"] | None = None, |
| 396 | +) -> None: # pragma: no cover |
| 397 | + if repl is None: |
| 398 | + repl = _autoselect_repl_module() |
| 399 | + click.secho(f"Starting Litestar shell using autoselected REPL {repl!r}", fg="blue") |
| 400 | + |
| 401 | + if repl == "asyncio": |
| 402 | + if sys.version_info < (3, 13): |
| 403 | + click.secho("Litestar shell using the asyncio REPL requires Python 3.13 or greater", fg="red") |
| 404 | + click.secho( |
| 405 | + "To use the Litestar shell with an async REPL in this version of Python, y" |
| 406 | + "ou can install Litestar with the 'ipython' extra (litestar[ipython]) " |
| 407 | + "and then select ipython as the repl, either by starting he Litestar " |
| 408 | + "shell with 'litestar shell --repl=ipython' or setting the " |
| 409 | + "'LITESTAR_REPL=ipython' environment variable", |
| 410 | + fg="blue", |
| 411 | + ) |
| 412 | + quit(1) |
| 413 | + |
| 414 | + # since it's currently not possible to customise e.g. the namespace of the |
| 415 | + # asyncio REPL, we have to get a bit creative here |
| 416 | + |
| 417 | + if "PYTHONSTARTUP" in os.environ: # type: ignore[unreachable] |
| 418 | + click.secho( |
| 419 | + "Cannot run Litestar shell with asyncio REPL when PYTHONSTARTUP is " |
| 420 | + "set. PYTHONSTARTUP is currently set to " |
| 421 | + f"{os.environ['PYTHONSTARTUP']!r}. Either unset PYTHONSTARTUP or use a " |
| 422 | + "different REPL ('repl' or 'ipython').", |
| 423 | + fg="red", |
| 424 | + ) |
| 425 | + |
| 426 | + subprocess.run( # noqa: S603 |
| 427 | + [sys.executable, "-m", "asyncio"], |
| 428 | + check=False, |
| 429 | + env={ |
| 430 | + **os.environ, |
| 431 | + "PYTHONSTARTUP": str(importlib.resources.files("litestar.cli").joinpath("_shell_startup")) + ".py", |
| 432 | + }, |
| 433 | + ) |
| 434 | + elif repl == "repl": |
| 435 | + import code |
| 436 | + |
| 437 | + repl_locals, banner = populate_repl_globals(app=app) |
| 438 | + interpreter = code.InteractiveConsole(locals=repl_locals) |
| 439 | + interpreter.interact(banner=banner) |
| 440 | + elif repl == "ipython": |
| 441 | + import IPython |
| 442 | + from traitlets.config.loader import Config |
| 443 | + |
| 444 | + repl_locals, banner = populate_repl_globals(app=app) |
| 445 | + |
| 446 | + config = Config() |
| 447 | + config.TerminalInteractiveShell.banner2 = banner |
| 448 | + |
| 449 | + IPython.start_ipython(argv=[], user_ns=repl_locals, config=config) # type: ignore[no-untyped-call] |
| 450 | + else: |
| 451 | + click.secho(f"Unsupported REPL {repl!r}") # type: ignore[unreachable] |
| 452 | + quit(1) |
0 commit comments