EOExecutor improvements, part 3 #360

Merged: 11 commits, Jan 18, 2022
146 changes: 97 additions & 49 deletions core/eolearn/core/eoexecution.py

Large diffs are not rendered by default.
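(The eoexecution.py diff is collapsed in this view. Judging from the rest of the changeset, it renames the executor's `execution_args` to `execution_kwargs` and routes log and report I/O through a `filesystem` attribute on the executor, so logs can be written to local disk or S3 via the `fs_utils` helpers added below.)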

2 changes: 1 addition & 1 deletion core/eolearn/core/eonode.py
@@ -42,7 +42,7 @@ def __post_init__(self):
         if self.name is None:
             super().__setattr__('name', self.task.__class__.__name__)
 
-        super().__setattr__('uid', generate_uid(self.name))
+        super().__setattr__('uid', generate_uid(self.task.__class__.__name__))
 
     def get_custom_name(self, number=0):
         """ Provides custom node name according to the class of the contained task and a given number
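The change above derives a node's `uid` from the task class instead of the (possibly user-supplied) node name, so custom display names no longer leak into identifiers. A minimal sketch of the observable effect, assuming `generate_uid` uses its argument as the prefix of the generated identifier:

from eolearn.core import EONode, EOTask

class ExampleTask(EOTask):
    def execute(self, eopatch=None):
        return eopatch

# The display name can still be customized by the user...
node = EONode(ExampleTask(), name='my custom name')
assert node.name == 'my custom name'

# ...but the uid prefix now follows the task class (assuming generate_uid
# prefixes its argument); before this change it would have been derived
# from the custom name instead.
assert node.uid.startswith('ExampleTask')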
8 changes: 4 additions & 4 deletions core/eolearn/core/extra/ray.py
@@ -42,10 +42,10 @@ def _get_processing_type(*_, **__):
         return _ProcessingType.RAY
 
     @classmethod
-    def _run_execution(cls, processing_args, *_, **__):
+    def _run_execution(cls, execution_kwargs, *_, **__):
         """ Runs ray execution
         """
-        futures = [_ray_workflow_executor.remote(workflow_args) for workflow_args in processing_args]
+        futures = [_ray_workflow_executor.remote(workflow_kwargs) for workflow_kwargs in execution_kwargs]
 
         for _ in tqdm(_progress_bar_iterator(futures), total=len(futures)):
             pass
@@ -54,11 +54,11 @@ def _run_execution(cls, processing_args, *_, **__):
 
 
 @ray.remote
-def _ray_workflow_executor(workflow_args):
+def _ray_workflow_executor(workflow_kwargs):
     """ Called to execute a workflow on a ray worker
     """
     # pylint: disable=protected-access
-    return RayExecutor._execute_workflow(workflow_args)
+    return RayExecutor._execute_workflow(workflow_kwargs)
 
 
 def _progress_bar_iterator(futures):
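For context on the pattern above: each entry of `execution_kwargs` is shipped to a Ray worker as a remote task, and the progress bar advances as futures complete. A self-contained sketch of the same idea using only public Ray APIs (the names `run_one` and `run_all` are illustrative, not part of eo-learn):

import ray
from tqdm.auto import tqdm

@ray.remote
def run_one(workflow_kwargs):
    # Stand-in for executing a single workflow with the given kwargs
    return sum(workflow_kwargs.values())

def run_all(execution_kwargs):
    ray.init(ignore_reinit_error=True)
    futures = [run_one.remote(kwargs) for kwargs in execution_kwargs]
    results = []
    with tqdm(total=len(futures)) as pbar:
        while futures:
            # ray.wait returns finished futures in completion order
            done, futures = ray.wait(futures, num_returns=1)
            results.extend(ray.get(done))
            pbar.update(len(done))
    return results

print(run_all([{'a': 1}, {'a': 2, 'b': 3}]))  # e.g. [1, 5]; order may vary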
34 changes: 32 additions & 2 deletions core/eolearn/core/fs_utils.py
@@ -35,7 +35,7 @@ def get_filesystem(path: str, create: bool = False, config: Optional[SHConfig] =
     if isinstance(path, Path):
         path = str(path)
 
-    if path.startswith('s3://'):
+    if is_s3_path(path):
         return load_s3_filesystem(path, config=config, **kwargs)
 
     return fs.open_fs(path, create=create, **kwargs)
@@ -84,7 +84,7 @@ def load_s3_filesystem(path: str, strict: bool = False, config: Optional[SHConfi
     :return: A S3 filesystem object
     :rtype: fs_s3fs.S3FS
     """
-    if not path.startswith('s3://'):
+    if not is_s3_path(path):
         raise ValueError(f"AWS path has to start with s3:// but found '{path}'")
 
     config = config or SHConfig()
@@ -120,3 +120,33 @@ def get_aws_credentials(aws_profile: str, config: Optional[SHConfig] = None) ->
     config.aws_access_key_id = aws_credentials.access_key
     config.aws_secret_access_key = aws_credentials.secret_key
     return config
+
+
+def get_full_path(filesystem: fs.base.FS, relative_path: str) -> str:
+    """Given a filesystem object and a path, relative to the filesystem it provides a full path."""
+    if isinstance(filesystem, S3FS):
+        # pylint: disable=protected-access
+        return join_path(f"s3://{filesystem._bucket_name}", filesystem.dir_path, relative_path)
+
+    return os.path.normpath(filesystem.getsyspath(relative_path))
+
+
+def join_path(*path_parts: str) -> str:
+    """A utility function for joining a path that is either local or S3.
+
+    :param path_parts: Partial paths where the first part will be used to decide if it is an S3 path or a local path
+    :return: Joined path that is also normalized and absolute.
+    """
+    if is_s3_path(path_parts[0]):
+        path_parts = ((part[5:] if index == 0 else part) for index, part in enumerate(path_parts))
+        path = "/".join(part.strip("/") for part in path_parts)
+        path = fs.path.normpath(path)
+        return f"s3://{path}"
+
+    # pylint: disable=no-value-for-parameter
+    return os.path.abspath(os.path.join(*path_parts))
+
+
+def is_s3_path(path: str) -> bool:
+    """Returns True if the path points to a S3 bucket, False otherwise."""
+    return path.startswith("s3://")
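A quick usage sketch of the three new helpers; the expected values mirror the tests added further below:

from fs.osfs import OSFS
from fs_s3fs import S3FS
from eolearn.core.fs_utils import get_full_path, is_s3_path, join_path

assert is_s3_path('s3://bucket/key')
assert not is_s3_path('/tmp/file.json')

# join_path normalizes local and S3 paths alike, resolving '..' parts:
assert join_path('s3://xx/', '/y/z', 'a', '..', 'b.json') == 's3://xx/y/z/b.json'

# get_full_path turns a filesystem-relative path into an absolute one:
assert get_full_path(OSFS('/tmp'), 'my/folder') == '/tmp/my/folder'
assert get_full_path(S3FS(bucket_name='data', dir_path='/folder'), 'sub') == 's3://data/folder/sub'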
44 changes: 22 additions & 22 deletions core/eolearn/tests/test_eoexecutor.py
@@ -85,17 +85,17 @@ def workflow_fixture(test_nodes):
     return workflow
 
 
-@pytest.fixture(name='execution_args')
-def execution_args_fixture(test_nodes):
+@pytest.fixture(name='execution_kwargs')
+def execution_kwargs_fixture(test_nodes):
     example_node = test_nodes['example']
 
-    execution_args = [
+    execution_kwargs = [
         {example_node: {'arg1': 1}},
         {},
         {example_node: {'arg1': 3, 'arg3': 10}},
         {example_node: {'arg1': None}}
     ]
-    return execution_args
+    return execution_kwargs
 
 
 @pytest.mark.parametrize(
@@ -107,11 +107,11 @@ def execution_args_fixture(test_nodes):
     ]
 )
 @pytest.mark.parametrize('execution_names', [None, [4, 'x', 'y', 'z']])
-def test_read_logs(test_args, execution_names, workflow, execution_args):
+def test_read_logs(test_args, execution_names, workflow, execution_kwargs):
     workers, multiprocess, filter_logs = test_args
     with tempfile.TemporaryDirectory() as tmp_dir_name:
         executor = EOExecutor(
-            workflow, execution_args, save_logs=True,
+            workflow, execution_kwargs, save_logs=True,
             logs_folder=tmp_dir_name,
             logs_filter=CustomLogFilter() if filter_logs else None,
             execution_names=execution_names
@@ -123,23 +123,23 @@ def test_read_logs(test_args, execution_names, workflow, execution_args):
         for log in execution_logs:
             assert len(log.split()) >= 3
 
-        log_filenames = sorted(os.listdir(executor.report_folder))
+        log_filenames = sorted(executor.filesystem.listdir(executor.report_folder))
         assert len(log_filenames) == 4
 
         if execution_names:
             for name, log_filename in zip(execution_names, log_filenames):
                 assert log_filename == f'eoexecution-{name}.log'
 
         log_path = os.path.join(executor.report_folder, log_filenames[0])
-        with open(log_path, 'r') as fp:
+        with executor.filesystem.open(log_path, 'r') as fp:
             line_count = len(fp.readlines())
         expected_line_count = 2 if filter_logs else 12
         assert line_count == expected_line_count
 
 
-def test_execution_results(workflow, execution_args):
+def test_execution_results(workflow, execution_kwargs):
     with tempfile.TemporaryDirectory() as tmp_dir_name:
-        executor = EOExecutor(workflow, execution_args, logs_folder=tmp_dir_name)
+        executor = EOExecutor(workflow, execution_kwargs, logs_folder=tmp_dir_name)
         executor.run(workers=2)
 
     assert len(executor.execution_results) == 4
@@ -149,9 +149,9 @@ def test_execution_results(workflow, execution_args):
 
 
 @pytest.mark.parametrize('multiprocess', [True, False])
-def test_execution_errors(multiprocess, workflow, execution_args):
+def test_execution_errors(multiprocess, workflow, execution_kwargs):
     with tempfile.TemporaryDirectory() as tmp_dir_name:
-        executor = EOExecutor(workflow, execution_args, logs_folder=tmp_dir_name)
+        executor = EOExecutor(workflow, execution_kwargs, logs_folder=tmp_dir_name)
         executor.run(workers=5, multiprocess=multiprocess)
 
     for idx, results in enumerate(executor.execution_results):
@@ -164,8 +164,8 @@ def test_execution_errors(multiprocess, workflow, execution_args):
     assert executor.get_failed_executions() == [3]
 
 
-def test_execution_results(workflow, execution_args):
-    executor = EOExecutor(workflow, execution_args)
+def test_execution_results(workflow, execution_kwargs):
+    executor = EOExecutor(workflow, execution_kwargs)
     results = executor.run(workers=2, multiprocess=True)
 
     assert isinstance(results, list)
@@ -176,28 +176,28 @@ def test_execution_results(workflow, execution_args):
     assert workflow_results.outputs['output'] == 42
 
 
-def test_exceptions(workflow, execution_args):
+def test_exceptions(workflow, execution_kwargs):
     with pytest.raises(ValueError):
         EOExecutor(workflow, {})
     with pytest.raises(ValueError):
-        EOExecutor(workflow, execution_args, execution_names={1, 2, 3, 4})
+        EOExecutor(workflow, execution_kwargs, execution_names={1, 2, 3, 4})
     with pytest.raises(ValueError):
-        EOExecutor(workflow, execution_args, execution_names=['a', 'b'])
+        EOExecutor(workflow, execution_kwargs, execution_names=['a', 'b'])
 
 
 def test_keyboard_interrupt():
     exception_node = EONode(KeyboardExceptionTask())
     workflow = EOWorkflow([exception_node])
-    execution_args = []
+    execution_kwargs = []
     for _ in range(10):
-        execution_args.append({exception_node: {'arg1': 1}})
+        execution_kwargs.append({exception_node: {'arg1': 1}})
 
-    run_args = [{'workers': 1},
+    run_kwargs = [{'workers': 1},
                 {'workers': 3, 'multiprocess': True},
                 {'workers': 3, 'multiprocess': False}]
-    for arg in run_args:
+    for kwarg in run_kwargs:
         with pytest.raises(KeyboardInterrupt):
-            EOExecutor(workflow, execution_args).run(**arg)
+            EOExecutor(workflow, execution_kwargs).run(**kwarg)
 
 
 def test_with_lock(num_workers):
27 changes: 26 additions & 1 deletion core/eolearn/tests/test_fs_utils.py
@@ -19,7 +19,7 @@
 
 from sentinelhub import SHConfig
 from eolearn.core import get_filesystem, load_s3_filesystem
-from eolearn.core.fs_utils import get_aws_credentials
+from eolearn.core.fs_utils import get_aws_credentials, get_full_path, join_path
 
 
 def test_get_local_filesystem(tmp_path):
@@ -79,3 +79,28 @@ def test_get_aws_credentials(mocked_copy):
     config = get_aws_credentials('default', config=default_config)
     assert config.aws_access_key_id != default_config.aws_access_key_id
     assert config.aws_secret_access_key != default_config.aws_secret_access_key
+
+
+@pytest.mark.parametrize(
+    argnames="path_parts, expected_path",
+    ids=["local", "s3"],
+    argvalues=[
+        (["/tmp", "folder", "xyz", "..", "file.json"], os.path.join("/tmp", "folder", "file.json")),
+        (["s3://xx/", "/y/z", "a", "..", "b.json"], "s3://xx/y/z/b.json"),
+    ],
+)
+def test_join_path(path_parts, expected_path):
+    assert join_path(*path_parts) == expected_path
+
+
+@pytest.mark.parametrize(
+    "filesystem, path, expected_full_path",
+    [
+        (OSFS("/tmp"), "my/folder", "/tmp/my/folder"),
+        (S3FS(bucket_name="data", dir_path="/folder"), "/sub/folder", "s3://data/folder/sub/folder"),
+        (S3FS(bucket_name="data"), "/sub/folder", "s3://data/sub/folder"),
+    ],
+)
+def test_get_full_path(filesystem, path, expected_full_path):
+    full_path = get_full_path(filesystem, path)
+    assert full_path == expected_full_path
56 changes: 28 additions & 28 deletions core/eolearn/tests/test_rayexecutor.py
@@ -77,32 +77,32 @@ def workflow_fixture(test_nodes):
     return workflow
 
 
-@pytest.fixture(name='execution_args')
-def execution_args_fixture(test_nodes):
+@pytest.fixture(name='execution_kwargs')
+def execution_kwargs_fixture(test_nodes):
     example_node = test_nodes['example']
 
-    execution_args = [
+    execution_kwargs = [
        {example_node: {'arg1': 1}},
        {},
        {example_node: {'arg1': 3, 'arg3': 10}},
        {example_node: {'arg1': None}}
     ]
-    return execution_args
+    return execution_kwargs
 
 
-def test_fail_without_ray(workflow, execution_args):
-    executor = RayExecutor(workflow, execution_args)
+def test_fail_without_ray(workflow, execution_kwargs):
+    executor = RayExecutor(workflow, execution_kwargs)
     with pytest.raises(RuntimeError):
         executor.run()
 
 
 @pytest.mark.parametrize('filter_logs', [True, False])
 @pytest.mark.parametrize('execution_names', [None, [4, 'x', 'y', 'z']])
-def test_read_logs(filter_logs, execution_names, workflow, execution_args, simple_cluster):
+def test_read_logs(filter_logs, execution_names, workflow, execution_kwargs, simple_cluster):
 
     with tempfile.TemporaryDirectory() as tmp_dir_name:
         executor = RayExecutor(
-            workflow, execution_args, save_logs=True,
+            workflow, execution_kwargs, save_logs=True,
             logs_folder=tmp_dir_name,
             logs_filter=CustomLogFilter() if filter_logs else None,
             execution_names=execution_names
@@ -114,23 +114,23 @@ def test_read_logs(filter_logs, execution_names, workflow, execution_kwargs, simpl
         for log in execution_logs:
             assert len(log.split()) >= 3
 
-        log_filenames = sorted(os.listdir(executor.report_folder))
+        log_filenames = sorted(executor.filesystem.listdir(executor.report_folder))
         assert len(log_filenames) == 4
 
         if execution_names:
             for name, log_filename in zip(execution_names, log_filenames):
                 assert log_filename == f'eoexecution-{name}.log'
 
         log_path = os.path.join(executor.report_folder, log_filenames[0])
-        with open(log_path, 'r') as fp:
+        with executor.filesystem.open(log_path, 'r') as fp:
             line_count = len(fp.readlines())
         expected_line_count = 2 if filter_logs else 12
         assert line_count == expected_line_count
 
 
-def test_execution_results(workflow, execution_args, simple_cluster):
+def test_execution_results(workflow, execution_kwargs, simple_cluster):
     with tempfile.TemporaryDirectory() as tmp_dir_name:
-        executor = RayExecutor(workflow, execution_args, logs_folder=tmp_dir_name)
+        executor = RayExecutor(workflow, execution_kwargs, logs_folder=tmp_dir_name)
         executor.run()
 
     assert len(executor.execution_results) == 4
@@ -139,9 +139,9 @@ def test_execution_results(workflow, execution_args, simple_cluster):
         assert isinstance(time_stat, datetime.datetime)
 
 
-def test_execution_errors(workflow, execution_args, simple_cluster):
+def test_execution_errors(workflow, execution_kwargs, simple_cluster):
     with tempfile.TemporaryDirectory() as tmp_dir_name:
-        executor = RayExecutor(workflow, execution_args, logs_folder=tmp_dir_name)
+        executor = RayExecutor(workflow, execution_kwargs, logs_folder=tmp_dir_name)
         executor.run()
 
     for idx, results in enumerate(executor.execution_results):
@@ -154,8 +154,8 @@ def test_execution_errors(workflow, execution_kwargs, simple_cluster):
     assert executor.get_failed_executions() == [3]
 
 
-def test_execution_results(workflow, execution_args, simple_cluster):
-    executor = RayExecutor(workflow, execution_args)
+def test_execution_results(workflow, execution_kwargs, simple_cluster):
+    executor = RayExecutor(workflow, execution_kwargs)
     results = executor.run()
 
     assert isinstance(results, list)
@@ -168,33 +168,33 @@ def test_execution_results(workflow, execution_args, simple_cluster):
 def test_keyboard_interrupt(simple_cluster):
     exception_node = EONode(KeyboardExceptionTask())
     workflow = EOWorkflow([exception_node])
-    execution_args = []
+    execution_kwargs = []
     for _ in range(10):
-        execution_args.append({exception_node: {'arg1': 1}})
+        execution_kwargs.append({exception_node: {'arg1': 1}})
 
     with pytest.raises((ray.exceptions.TaskCancelledError, ray.exceptions.RayTaskError)):
-        RayExecutor(workflow, execution_args).run()
+        RayExecutor(workflow, execution_kwargs).run()
 
 
-def test_reruns(workflow, execution_args, simple_cluster):
-    executor = RayExecutor(workflow, execution_args)
+def test_reruns(workflow, execution_kwargs, simple_cluster):
+    executor = RayExecutor(workflow, execution_kwargs)
     for _ in range(100):
         executor.run()
 
     for _ in range(10):
-        RayExecutor(workflow, execution_args).run()
+        RayExecutor(workflow, execution_kwargs).run()
 
-    executors = [RayExecutor(workflow, execution_args) for _ in range(10)]
+    executors = [RayExecutor(workflow, execution_kwargs) for _ in range(10)]
     for executor in executors:
         executor.run()
 
 
-def test_run_after_interrupt(workflow, execution_args, simple_cluster):
+def test_run_after_interrupt(workflow, execution_kwargs, simple_cluster):
     foo_node = EONode(FooTask())
     exception_node = EONode(KeyboardExceptionTask(), inputs=[foo_node])
     exception_workflow = EOWorkflow([foo_node, exception_node])
     exception_executor = RayExecutor(exception_workflow, [{}])
-    executor = RayExecutor(workflow, execution_args[:-1])  # removes args for exception
+    executor = RayExecutor(workflow, execution_kwargs[:-1])  # removes args for exception
 
     result_preexception = executor.run()
     with pytest.raises((ray.exceptions.TaskCancelledError, ray.exceptions.RayTaskError)):
@@ -204,9 +204,9 @@ def test_run_after_interrupt(workflow, execution_args, simple_cluster):
     assert [res.outputs for res in result_preexception] == [res.outputs for res in result_postexception]
 
 
-def test_mix_with_eoexecutor(workflow, execution_args, simple_cluster):
-    rayexecutor = RayExecutor(workflow, execution_args)
-    eoexecutor = EOExecutor(workflow, execution_args)
+def test_mix_with_eoexecutor(workflow, execution_kwargs, simple_cluster):
+    rayexecutor = RayExecutor(workflow, execution_kwargs)
+    eoexecutor = EOExecutor(workflow, execution_kwargs)
     for _ in range(10):
         ray_results = rayexecutor.run()
         eo_results = eoexecutor.run()