diff --git a/mypy/dmypy/client.py b/mypy/dmypy/client.py index 4791fe337f09..08a2a0a272d9 100644 --- a/mypy/dmypy/client.py +++ b/mypy/dmypy/client.py @@ -138,6 +138,20 @@ def __init__(self, prog: str, **kwargs: Any) -> None: ) p.add_argument("--remove", metavar="FILE", nargs="*", help="Files to remove from the run") + +watch_parser = p = subparsers.add_parser("watch", help="Check on a set interval (requires daemon)") +p.add_argument("--log-file", metavar="FILE", type=str, help="Direct daemon stdout/stderr to FILE") +p.add_argument("files", metavar="FILE", nargs="+", help="File (or directory) to check") +p.add_argument( + "--interval", metavar="INTERVAL", default=1, type=int, help="Time between checks (in seconds)" +) +p.add_argument( + "--export-types", + action="store_true", + help="Store types of all expressions in a shared location (useful for inspections)", +) + + suggest_parser = p = subparsers.add_parser( "suggest", help="Suggest a signature or show call sites for a specific function" ) @@ -599,7 +613,7 @@ def show_stats(response: Mapping[str, object]) -> None: # Special case text output to display just 40 characters of text value = repr(value)[1:-1] if len(value) > 50: - value = f"{value[:40]} ... {len(value)-40} more characters" + value = f"{value[:40]} ... {len(value) - 40} more characters" print("%-24s: %s" % (key, value)) continue print("%-24s: %10s" % (key, "%.3f" % value if isinstance(value, float) else value)) @@ -611,6 +625,31 @@ def do_hang(args: argparse.Namespace) -> None: print(request(args.status_file, "hang", timeout=1)) +@action(watch_parser) +def do_watch(args: argparse.Namespace) -> None: + """Recheck the same set of files every few seconds""" + previous_output = None + previous_err = None + while True: + try: + time.sleep(args.interval) + response = request( + args.status_file, "check", files=args.files, export_types=args.export_types + ) + output = response["out"] + err = response["err"] + if output != previous_output or err != previous_err: + os.system("cls" if os.name == "nt" else "clear") + sys.stdout.write(output) + sys.stdout.flush() + sys.stderr.write(err) + sys.stderr.flush() + previous_output = output + previous_err = err + except KeyboardInterrupt: + break + + @action(daemon_parser) def do_daemon(args: argparse.Namespace) -> None: """Serve requests in the foreground.""" diff --git a/mypy/test/testdaemon.py b/mypy/test/testdaemon.py index 7115e682e60d..ae0af6b70449 100644 --- a/mypy/test/testdaemon.py +++ b/mypy/test/testdaemon.py @@ -8,9 +8,13 @@ from __future__ import annotations import os +import pathlib +import signal import subprocess import sys import tempfile +import threading +import time import unittest from mypy.dmypy_server import filter_out_missing_top_level_packages @@ -130,3 +134,86 @@ def make_file(self, base: str, path: str) -> None: if not path.endswith("/"): with open(fullpath, "w") as f: f.write("# test file") + + +class DaemonWatchSuite(unittest.TestCase): + def setUp(self) -> None: + self.temp_dir = tempfile.TemporaryDirectory() + self.temp_path = pathlib.Path(self.temp_dir.name) + self.output_lines: list[str] = [] + self.stop_reader = False + + def _read_output(self) -> None: + assert self.process.stdout + for line in self.process.stdout: + self.output_lines.append(line.strip()) + if self.stop_reader: + break + + def _start_watching(self, args: str, start_daemon: bool = True) -> None: + if start_daemon: + subprocess.run( + [sys.executable, "-m", "mypy.dmypy", "start"], + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + cwd=self.temp_path, + check=True, + ) + + self.process = subprocess.Popen( + [sys.executable, "-m", "mypy.dmypy", "watch", args], + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + cwd=self.temp_path, + text=True, + universal_newlines=True, + bufsize=1, + ) + + self.reader_thread = threading.Thread(target=self._read_output, daemon=True) + self.reader_thread.start() + + def _wait_for_output(self, text: str, timeout: int = 5) -> bool: + """Wait for text to appear in output within timeout seconds.""" + start_time = time.time() + while time.time() - start_time < timeout: + if any(text in line for line in self.output_lines): + return True + time.sleep(0.1) + return False + + def test_watcher_reacts_to_file_changes(self) -> None: + (self.temp_path / "valid.py").write_text( + "def hello_world() -> str:\n return 'Hello World!'" + ) + + self._start_watching(".") + + # The initial run can take a bit longer, therefore the 10s timeout + self.assertTrue(self._wait_for_output("Success: no issues found in 1 source file", 10)) + + (self.temp_path / "invalid.py").write_text( + "def hello_world() -> int:\n return 'Hello World!'" + ) + + self.assertTrue(self._wait_for_output("Incompatible return value type")) + self.assertTrue(self._wait_for_output("Found 1 error in 1 file")) + + def tearDown(self) -> None: + subprocess.run([sys.executable, "-m", "mypy.dmypy", "stop"], cwd=self.temp_path) + + if self.process.poll() is None: + if sys.platform == "win32": + self.process.kill() + else: + self.process.send_signal(signal.SIGINT) + try: + self.process.wait(timeout=5) + except subprocess.TimeoutExpired: + self.process.kill() + + self.stop_reader = True + if self.reader_thread.is_alive(): + self.reader_thread.join(timeout=5) + + self.temp_dir.cleanup()