Defer pandas import on worker in P2P shuffle #5695
Conversation
In dask#5688, we discovered that dask#5520 was increasing worker unmanaged memory usage by ~10MiB at startup (on my mac). I suspect that this came from importing pandas.

The shuffle extension already worked if pandas wasn't available: it just wouldn't install itself on the worker. However, to check if pandas was available, it was importing pandas, so if pandas _was_ available, every worker would have to spend the time and memory to import it at startup, even if it wasn't used at all.

With this PR, all use of pandas is deferred. There's also now an explicit test that importing the shuffle does not import pandas.
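For context, a common way to check whether an optional dependency is available without paying its import cost is `importlib.util.find_spec`, which searches for the module without executing it. A minimal sketch of the two approaches (illustrative only; not necessarily the exact mechanism this PR uses):

```python
import importlib.util

# Eager check (the old behavior described above): importing pandas just to
# see whether it exists pays the full import cost (time and ~10MiB of
# memory) on every worker, even if the shuffle extension is never used.
try:
    import pandas  # noqa: F401
    HAS_PANDAS_EAGER = True
except ImportError:
    HAS_PANDAS_EAGER = False

# Deferred-friendly check: find_spec only locates the module, it does not
# execute it, so startup stays cheap. pandas itself is then imported inside
# the functions that actually need it.
HAS_PANDAS = importlib.util.find_spec("pandas") is not None
```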
```python
    column: str,
    npartitions: int | None = None,
):
    from dask.dataframe import DataFrame
```
I don't love how `dask.dataframe.DataFrame` is also imported if `TYPE_CHECKING`. It means I could remove this line here, and the code would still pass mypy and flake8, even though it would fail at runtime. Tests would still fail, but I could see making a similar mistake in the future in some place that didn't have test coverage, and unwittingly introducing a runtime error.
However, because I want to use `DataFrame` as a type annotation, it does need to be imported for type checkers.
I don't really think there's a way around this. Just curious if anyone had ideas, particularly @crusaderky or @ian-r-rose
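For reference, the pattern under discussion looks roughly like this (the function name and body here are illustrative, not the PR's exact code):

```python
from __future__ import annotations

from typing import TYPE_CHECKING

if TYPE_CHECKING:
    # Seen only by type checkers; never executed at runtime, so importing
    # a pandas-dependent module here is free.
    from dask.dataframe import DataFrame


def rearrange_by_column_p2p(
    df: DataFrame,
    column: str,
    npartitions: int | None = None,
):
    # The runtime import. Deleting this line would still pass mypy and
    # flake8, because the TYPE_CHECKING import above defines the name for
    # them, but the isinstance check below would then raise NameError.
    from dask.dataframe import DataFrame

    assert isinstance(df, DataFrame)
    ...
```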
Yep, this issue is well-known upstream: PyCQA/pyflakes#530
The solution is remembering that linters are a nice and fast way to detect issues but cannot replace unit tests :)
```python
import subprocess
import sys

import_check_code = """
```
I tried a couple of other options to avoid this kind of nasty "code in a string" pattern:

- Mucking with `sys.modules` (setting `pandas` to `None`, removing `dask.dataframe` and `distributed.shuffle`, then re-importing `distributed.shuffle`). This worked, but felt brittle, because you basically had to list all the places pandas might be transitively imported from and remove them from `sys.modules` (a rough sketch follows below).
- Using `multiprocessing` in `spawn` mode to start a fresh interpreter and run this code. Because `spawn` mode works by sending a pickled function, the function must be importable. And un-pickling the function means importing `distributed.shuffle.tests.test_no_pandas_import.func_name_whatever`. Therefore, `distributed.shuffle` gets imported before your function can even execute! So though you can assert that pandas hasn't been imported, if it has been imported, you can't tell from where, which makes the test very annoying to debug.
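For concreteness, a rough sketch of that first rejected approach (hypothetical test, not code from this PR), using pytest's `monkeypatch` so that `sys.modules` is restored afterwards:

```python
import sys


def test_import_no_pandas_via_sys_modules(monkeypatch):
    # The brittle part: this prefix list must name every module that might
    # have transitively imported pandas, or the test silently checks nothing.
    for mod in list(sys.modules):
        if mod.startswith(("pandas", "dask.dataframe", "distributed.shuffle")):
            monkeypatch.delitem(sys.modules, mod)

    # Setting the cache entry to None makes any later `import pandas` raise
    # ModuleNotFoundError (per the import-system docs on the module cache).
    monkeypatch.setitem(sys.modules, "pandas", None)

    # Re-import: this must succeed even though pandas is now un-importable.
    import distributed.shuffle  # noqa: F401
```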
In the end, the "code in a string" was both much easier to read and more robust than anything else I could come up with.
(Also, if distributed had a mindeps test, we might not need this at all. But it's nice to have anyway for local development.)
```python
import subprocess
import sys

import_check_code = """
import sys

current_pandas_modules = [m for m in sys.modules if m.startswith("pandas")]
assert (
    not current_pandas_modules
), "pandas is already imported at startup: " + "\\n".join(current_pandas_modules)

# Make pandas un-importable
sys.modules["pandas"] = None
# "if the value is None, then a ModuleNotFoundError is raised"
# https://docs.python.org/3.6/reference/import.html#the-module-cache

import distributed.shuffle  # noqa: F401
"""


def test_import_no_pandas():
    p = subprocess.run(
        [sys.executable],
        input=import_check_code,
        text=True,
        timeout=10,
    )
    assert (
        p.returncode == 0
    ), "Importing the shuffle extension without pandas failed. See logs for details."
```
Suggested change (replacing the subprocess-based test above):

```python
import sys

from distributed.nanny import Nanny
from distributed.utils_test import gen_cluster, inc


@gen_cluster(
    client=True,
    Worker=Nanny,
    nthreads=[("", 1)],
    config={"distributed.worker.multiprocessing-method": "spawn"},
)
async def test_import_no_pandas(c, s, a, b):
    """Test that pandas is not imported when a worker is spawned,
    unless the shuffle extension is actually used
    """

    def assert_no_pandas(dask_worker):
        assert dask_worker.extensions["shuffle"]
        assert "pandas" not in sys.modules

    assert await c.submit(inc, 1) == 2
    await c.run(assert_no_pandas)
```
This is a neat way of doing it. The one thing is that it won't point you to where pandas is being imported, just that it is (somewhere). The reason I went for running the script in a subprocess was that you'd get an ImportError, with a traceback pointing to exactly where the import was happening, which made fixing the failure much easier.
If you'd prefer giving up the debuggability for this more conventional pattern, I'm happy to make the change.
I think that git bisect is enough to offer debuggability?
Even still, you have to figure out what line of code is causing the import (and know how to use git bisect). Getting the traceback to the exact line of code just feels more friendly to other contributors.
I counter-argue that your version would not notice if there were an `import pandas` inside `ShuffleWorkerExtension.__init__`. IMHO tests should be as black-box as possible.
That's a good point.
Unfortunately, the gen_cluster approach doesn't work: pandas is already imported even if the shuffle extension is not imported on the worker. A good example of how tough debugging is when there's no traceback showing where the import happens!
My guess is that the problem is that nannies pre-import pandas (and other modules) automatically into the worker process. The global `mp_context` used by nannies to create worker processes is initialized at import time:
`distributed/distributed/utils.py`, lines 69 to 93 at 338d0be:

```python
def _initialize_mp_context():
    if WINDOWS or PYPY:
        return multiprocessing
    else:
        method = dask.config.get("distributed.worker.multiprocessing-method")
        ctx = multiprocessing.get_context(method)
        # Makes the test suite much faster
        preload = ["distributed"]
        if "pkg_resources" in sys.modules:
            preload.append("pkg_resources")

        from .versions import optional_packages, required_packages

        for pkg, _ in required_packages + optional_packages:
            try:
                importlib.import_module(pkg)
            except ImportError:
                pass
            else:
                preload.append(pkg)
        ctx.set_forkserver_preload(preload)
        return ctx


mp_context = _initialize_mp_context()
```
In particular, notice how all required and optional packages that are importable are added to `set_forkserver_preload`.
`pandas` is in the list of optional packages (`distributed/distributed/versions.py`, lines 24 to 29 at 338d0be):

```python
optional_packages = [
    ("numpy", lambda p: p.__version__),
    ("pandas", lambda p: p.__version__),
    ("lz4", lambda p: p.__version__),
    ("blosc", lambda p: p.__version__),
]
```
(Sidenote: we're using `spawn` by default; I confirmed that's what's used when I run the test. Reading the multiprocessing code, I don't understand why `set_forkserver_preload` is affecting non-forkserver contexts. But if I comment out the pandas line from `optional_packages`, the test passes, so I'm not going to look into it much further.)
For posterity: @crusaderky and I discovered that `set_forkserver_preload` has no effect when using `spawn`, as expected. Instead, the reason pandas was always imported on Nanny workers (but not imported if it was commented out from the list in `versions.py`) is that the `get_versions` check on client connect imports all of the listed packages in order to get their versions. This could be avoided.
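A minimal stand-alone script (not from this PR; assumes a POSIX system with pandas installed) that demonstrates this: `set_forkserver_preload` is only honored by forkserver contexts, so spawned children start without pandas.

```python
import multiprocessing
import sys


def child_check(queue):
    # Report whether pandas was already in sys.modules when the child started
    queue.put("pandas" in sys.modules)


if __name__ == "__main__":
    for method in ("spawn", "forkserver"):
        ctx = multiprocessing.get_context(method)
        # set_forkserver_preload is just a hint; only forkserver honors it
        ctx.set_forkserver_preload(["pandas"])
        queue = ctx.Queue()
        proc = ctx.Process(target=child_check, args=(queue,))
        proc.start()
        print(f"{method}: pandas preloaded in child = {queue.get()}")
        proc.join()
    # Expected: "spawn: ... False" but "forkserver: ... True"
```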
xfail'd until dask#5724
...Great, now test_memory is heavily flaky -__-

Merged CI test of #5695 + #5724 running on crusaderky#2 and things don't look very good so far. I can see much more flakiness than main and a much slower end-to-end test runtime.

It seems like CI is back on track. I reran all unit tests.

@gjoseph92 please merge from main; you should get all green now (well, no more red than usual at least 😆)

Force-pushed from a83b5fb to a591f9a

Thanks @crusaderky for all your work smoothing out the flakes!

Just on one windows box! Looking very good.

Thank you!
@crusaderky @gjoseph92 - Just a note that this PR seems to have broken Dask's `test_scheduler_highlevel_graph_unpack_import` tests. It doesn't look like this PR does anything dangerous, since pandas does not need to be available. However, the Dask tests currently assume that the scheduler will not import pandas/numpy. cc @jrbourbeau - In case you have thoughts on