Skip to content

Commit ffc3528

Browse files
committed
Add ability to transfer data from nodes to a controller
1 parent 9c7b5f4 commit ffc3528

File tree

3 files changed

+42
-7
lines changed

3 files changed

+42
-7
lines changed

src/xdist/dsession.py

+11-4
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,17 @@
1+
from queue import Empty, Queue
2+
13
import py
24
import pytest
35

4-
from xdist.workermanage import NodeManager
6+
from xdist.remote import shared_key
57
from xdist.scheduler import (
68
EachScheduling,
79
LoadScheduling,
810
LoadScopeScheduling,
911
LoadFileScheduling,
1012
LoadGroupScheduling,
1113
)
12-
13-
14-
from queue import Empty, Queue
14+
from xdist.workermanage import NodeManager
1515

1616

1717
class Interrupted(KeyboardInterrupt):
@@ -174,6 +174,13 @@ def worker_workerfinished(self, node):
174174
self.shouldstop = "{} received keyboard-interrupt".format(node)
175175
self.worker_errordown(node, "keyboard-interrupt")
176176
return
177+
if "shared" in node.workeroutput:
178+
shared = self.config.stash.setdefault(shared_key, {})
179+
for key, value in node.workeroutput["shared"].items():
180+
if key in shared:
181+
shared[key].append(value)
182+
else:
183+
shared[key] = [value]
177184
if node in self.sched.nodes:
178185
crashitem = self.sched.remove_node(node)
179186
assert not crashitem, (crashitem, node)

src/xdist/plugin.py

+25-1
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,14 @@
55

66
import py
77
import pytest
8-
8+
from _pytest.stash import StashKey
99

1010
PYTEST_GTE_7 = hasattr(pytest, "version_tuple") and pytest.version_tuple >= (7, 0) # type: ignore[attr-defined]
1111

1212
_sys_path = list(sys.path) # freeze a copy of sys.path at interpreter startup
1313

14+
shared_key = StashKey["str"]()
15+
1416

1517
@pytest.hookimpl
1618
def pytest_xdist_auto_num_workers(config):
@@ -302,3 +304,25 @@ def testrun_uid(request):
302304
return request.config.workerinput["testrunuid"]
303305
else:
304306
return uuid.uuid4().hex
307+
308+
309+
def get_shared_data(request_or_session):
310+
"""Return shared data and True, if it is ran from xdist_controller"""
311+
if is_xdist_controller(request_or_session):
312+
return request_or_session.config.stash.setdefault(shared_key, {}), True
313+
return request_or_session.config.stash.setdefault(shared_key, {}), False
314+
315+
316+
@pytest.fixture(scope="session")
317+
def add_shared_data(request, worker_id):
318+
"""Adds data that will be collected from all workers and be accessible from master node in sessionfinish hook"""
319+
320+
def _add(key, value):
321+
shared = request.config.stash.setdefault(shared_key, {})
322+
if worker_id == "master":
323+
# Worker shared_data are grouped together, master data aren't
324+
shared[key] = [value]
325+
else:
326+
shared[key] = value
327+
328+
return _add

src/xdist/remote.py

+6-2
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,16 @@
66
needs not to be installed in remote environments.
77
"""
88

9-
import sys
109
import os
10+
import sys
1111
import time
1212

1313
import py
1414
import pytest
15+
from _pytest.config import _prepareconfig, Config
1516
from execnet.gateway_base import dumps, DumpError
1617

17-
from _pytest.config import _prepareconfig, Config
18+
from xdist.plugin import shared_key
1819

1920
try:
2021
from setproctitle import setproctitle
@@ -64,6 +65,9 @@ def pytest_sessionstart(self, session):
6465
def pytest_sessionfinish(self, exitstatus):
6566
# in pytest 5.0+, exitstatus is an IntEnum object
6667
self.config.workeroutput["exitstatus"] = int(exitstatus)
68+
shared = self.config.stash.get(shared_key, None)
69+
if shared:
70+
self.config.workeroutput["shared"] = shared
6771
yield
6872
self.sendevent("workerfinished", workeroutput=self.config.workeroutput)
6973

0 commit comments

Comments
 (0)