diff --git a/core/eolearn/core/eoexecution.py b/core/eolearn/core/eoexecution.py
index cc4781a04..662b95ce8 100644
--- a/core/eolearn/core/eoexecution.py
+++ b/core/eolearn/core/eoexecution.py
@@ -13,7 +13,6 @@
 This source code is licensed under the MIT license found in the LICENSE
 file in the root directory of this source tree.
 """
-import os
 import logging
 import threading
 import concurrent.futures
@@ -22,6 +21,7 @@ import warnings
 from enum import Enum
 from logging import Logger, Handler, Filter
+from dataclasses import dataclass
 from typing import Sequence, List, Tuple, Dict, Optional, Callable, Iterable, TypeVar

 import fs

@@ -30,6 +30,7 @@ from .eonode import EONode
 from .eoworkflow import EOWorkflow, WorkflowResults
 from .exceptions import EORuntimeWarning
+from .fs_utils import get_base_filesystem_and_path, get_full_path
 from .utilities import LogFileFilter

 LOGGER = logging.getLogger(__name__)
@@ -37,7 +38,7 @@
 _InputType = TypeVar('_InputType')
 _OutputType = TypeVar('_OutputType')

-_ExecutorProcessingArgsType = Tuple[EOWorkflow, Dict[EONode, Dict[str, object]], Optional[str], bool, Filter]
+_HandlerFactoryType = Callable[[str], Handler]


 class _ProcessingType(Enum):
@@ -49,6 +50,18 @@ class _ProcessingType(Enum):
     RAY = 'ray'


+@dataclass(frozen=True)
+class _ProcessingData:
+    """Data to be used in EOExecutor processing. This will be passed to a process pool, so everything has to be
+    serializable with pickle."""
+    workflow: EOWorkflow
+    workflow_kwargs: Dict[EONode, Dict[str, object]]
+    log_path: Optional[str]
+    filter_logs_by_thread: bool
+    logs_filter: Optional[Filter]
+    logs_handler_factory: _HandlerFactoryType
+
+
 class EOExecutor:
     """ Simultaneously executes a workflow with different input arguments. In the process it monitors execution and
     handles errors. It can also save logs and create an HTML report about each execution.
@@ -58,28 +71,33 @@ class EOExecutor:
     STATS_START_TIME = 'start_time'
     STATS_END_TIME = 'end_time'

-    def __init__(self, workflow: EOWorkflow, execution_args: Sequence[Dict[EONode, Dict[str, object]]], *,
+    def __init__(self, workflow: EOWorkflow, execution_kwargs: Sequence[Dict[EONode, Dict[str, object]]], *,
                  execution_names: Optional[List[str]] = None, save_logs: bool = False, logs_folder: str = '.',
-                 logs_filter: Optional[Filter] = None, filesystem: fs.base.FS = None):
+                 filesystem: Optional[fs.base.FS] = None, logs_filter: Optional[Filter] = None,
+                 logs_handler_factory: _HandlerFactoryType = logging.FileHandler):
         """
         :param workflow: A prepared instance of EOWorkflow class
-        :param execution_args: A list of dictionaries where each dictionary represents execution inputs for the
+        :param execution_kwargs: A list of dictionaries where each dictionary represents execution inputs for the
            workflow. `EOExecutor` will execute the workflow for each of the given dictionaries in the list. The
            content of such dictionary will be used as `input_kwargs` parameter in `EOWorkflow.execution` method.
            Check `EOWorkflow.execution` for definition of a dictionary structure.
         :param execution_names: A list of execution names, which will be shown in execution report
         :param save_logs: Flag used to specify if execution log files should be saved locally on disk
-        :param logs_folder: A folder where logs and execution report should be saved
+        :param logs_folder: A folder where logs and execution report should be saved. If `filesystem` parameter is
+            defined the folder path should be relative to the filesystem.
+        :param filesystem: A filesystem object for saving logs and a report.
        :param logs_filter: An instance of a custom filter object that will filter certain logs from being written
            into logs. It works only if save_logs parameter is set to True.
+        :param logs_handler_factory: A callable class or function that takes a log file path as its only input
+            parameter and creates an instance of a logging handler object
         """
         self.workflow = workflow
-        self.execution_args = self._parse_and_validate_execution_args(execution_args)
-        self.execution_names = self._parse_execution_names(execution_names, self.execution_args)
+        self.execution_kwargs = self._parse_and_validate_execution_kwargs(execution_kwargs)
+        self.execution_names = self._parse_execution_names(execution_names, self.execution_kwargs)
         self.save_logs = save_logs
-        self.logs_folder = os.path.abspath(logs_folder)
+        self.filesystem, self.logs_folder = self._parse_logs_filesystem(filesystem, logs_folder)
         self.logs_filter = logs_filter
-        self.filesystem = filesystem
+        self.logs_handler_factory = logs_handler_factory

         self.start_time = None
         self.report_folder = None
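
The new `logs_handler_factory` hook accepts any callable that maps a log path to a `logging.Handler`. A minimal usage sketch (the `workflow` and `execution_kwargs` objects are assumed to exist, as in the test fixtures further below):

```python
import logging

from eolearn.core import EOExecutor

# Assumed to be defined elsewhere: an EOWorkflow and a list of per-execution kwargs.
executor = EOExecutor(
    workflow,
    execution_kwargs,
    save_logs=True,
    logs_folder='logs',
    # Any callable that takes the log path and returns a Handler works here;
    # the default is logging.FileHandler itself.
    logs_handler_factory=lambda path: logging.FileHandler(path, encoding='utf-8'),
)
```
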
@@ -87,30 +105,38 @@ def __init__(self, workflow: EOWorkflow, execution_args: Sequence[Dict[EONode, D
         self.execution_results = None

     @staticmethod
-    def _parse_and_validate_execution_args(
-            execution_args: Sequence[Dict[EONode, Dict[str, object]]]) -> List[Dict[EONode, Dict[str, object]]]:
+    def _parse_and_validate_execution_kwargs(
+            execution_kwargs: Sequence[Dict[EONode, Dict[str, object]]]) -> List[Dict[EONode, Dict[str, object]]]:
         """ Parses and validates execution arguments provided by the user and raises an error if something is wrong
         """
-        if not isinstance(execution_args, (list, tuple)):
-            raise ValueError("Parameter 'execution_args' should be a list")
+        if not isinstance(execution_kwargs, (list, tuple)):
+            raise ValueError("Parameter 'execution_kwargs' should be a list")

-        for input_kwargs in execution_args:
+        for input_kwargs in execution_kwargs:
             EOWorkflow.validate_input_kwargs(input_kwargs)

-        return [input_kwargs or {} for input_kwargs in execution_args]
+        return [input_kwargs or {} for input_kwargs in execution_kwargs]

     @staticmethod
-    def _parse_execution_names(execution_names: Optional[List[str]], execution_args: Sequence) -> List[str]:
+    def _parse_execution_names(execution_names: Optional[List[str]], execution_kwargs: Sequence) -> List[str]:
         """ Parses a list of execution names
         """
         if execution_names is None:
-            return [str(num) for num in range(1, len(execution_args) + 1)]
+            return [str(num) for num in range(1, len(execution_kwargs) + 1)]

-        if not isinstance(execution_names, (list, tuple)) or len(execution_names) != len(execution_args):
+        if not isinstance(execution_names, (list, tuple)) or len(execution_names) != len(execution_kwargs):
             raise ValueError("Parameter 'execution_names' has to be a list of the same size as the list of "
                              "execution arguments")
         return execution_names

+    @staticmethod
+    def _parse_logs_filesystem(filesystem: Optional[fs.base.FS], logs_folder: str) -> Tuple[fs.base.FS, str]:
+        """ Ensures a filesystem object and a logs folder path relative to it.
+        """
+        if filesystem is None:
+            return get_base_filesystem_and_path(logs_folder)
+        return filesystem, logs_folder
+
     def run(self, workers: int = 1, multiprocess: bool = True) -> List[WorkflowResults]:
         """ Runs the executor with n workers.
@@ -129,14 +155,22 @@ def run(self, workers: int = 1, multiprocess: bool = True) -> List[WorkflowResul
         self.start_time = dt.datetime.now()
         self.report_folder = self._get_report_folder()
         if self.save_logs:
-            os.makedirs(self.report_folder, exist_ok=True)
+            self.filesystem.makedirs(self.report_folder, recreate=True)

-        log_paths = self.get_log_paths() if self.save_logs else [None] * len(self.execution_args)
+        log_paths = self.get_log_paths(full_path=True) if self.save_logs else [None] * len(self.execution_kwargs)
         filter_logs_by_thread = not multiprocess and workers > 1
-        processing_args = [(self.workflow, init_args, log_path, filter_logs_by_thread, self.logs_filter)
-                           for init_args, log_path in zip(self.execution_args, log_paths)]
         processing_type = self._get_processing_type(workers, multiprocess)
+        processing_args = [
+            _ProcessingData(
+                workflow=self.workflow,
+                workflow_kwargs=workflow_kwargs,
+                log_path=log_path,
+                filter_logs_by_thread=filter_logs_by_thread,
+                logs_filter=self.logs_filter,
+                logs_handler_factory=self.logs_handler_factory
+            ) for workflow_kwargs, log_path in zip(self.execution_kwargs, log_paths)
+        ]

         full_execution_results = self._run_execution(processing_args, workers, processing_type)
@@ -155,7 +189,7 @@ def _get_processing_type(workers: int, multiprocess: bool) -> _ProcessingType:
             return _ProcessingType.MULTIPROCESSING
         return _ProcessingType.MULTITHREADING

-    def _run_execution(self, processing_args: List[_ExecutorProcessingArgsType], workers: int,
+    def _run_execution(self, processing_args: List[_ProcessingData], workers: int,
                        processing_type: _ProcessingType) -> List[WorkflowResults]:
         """ Runs the execution on each item of the processing_args list
         """
@@ -176,15 +210,15 @@ def _run_execution(self, processing_args: List[_ExecutorProcessingArgsType], wor
     MULTIPROCESSING_LOCK = None

     @classmethod
-    def _try_add_logging(cls, log_path: Optional[str], filter_logs_by_thread: bool,
-                         logs_filter: Optional[Filter]) -> Tuple[Optional[Logger], Optional[Handler]]:
+    def _try_add_logging(cls, log_path: Optional[str], filter_logs_by_thread: bool, logs_filter: Optional[Filter],
+                         logs_handler_factory: _HandlerFactoryType) -> Tuple[Optional[Logger], Optional[Handler]]:
         """ Adds a handler to a logger and returns them both. In case this fails it shows a warning.
         """
         if log_path:
             try:
                 logger = logging.getLogger()
                 logger.setLevel(logging.DEBUG)
-                handler = cls._get_log_handler(log_path, filter_logs_by_thread, logs_filter)
+                handler = cls._build_log_handler(log_path, filter_logs_by_thread, logs_filter, logs_handler_factory)
                 logger.addHandler(handler)
                 return logger, handler
             except BaseException as exception:
@@ -204,24 +238,28 @@ def _try_remove_logging(cls, log_path: Optional[str], logger: Optional[Logger],
                 warnings.warn(f'Failed to end logging with exception: {repr(exception)}', category=EORuntimeWarning)

     @classmethod
-    def _execute_workflow(cls, process_args: _ExecutorProcessingArgsType) -> WorkflowResults:
+    def _execute_workflow(cls, data: _ProcessingData) -> WorkflowResults:
         """ Handles a single execution of a workflow
         """
-        workflow, input_args, log_path, filter_logs_by_thread, logs_filter = process_args
-        logger, handler = cls._try_add_logging(log_path, filter_logs_by_thread, logs_filter)
+        logger, handler = cls._try_add_logging(
+            data.log_path, data.filter_logs_by_thread, data.logs_filter, data.logs_handler_factory
+        )

-        results = workflow.execute(input_args, raise_errors=False)
+        results = data.workflow.execute(data.workflow_kwargs, raise_errors=False)

-        cls._try_remove_logging(log_path, logger, handler)
+        cls._try_remove_logging(data.log_path, logger, handler)
         return results

     @staticmethod
-    def _get_log_handler(log_path: str, filter_logs_by_thread: bool, logs_filter: Optional[Filter]) -> Handler:
+    def _build_log_handler(log_path: str, filter_logs_by_thread: bool, logs_filter: Optional[Filter],
+                           logs_handler_factory: _HandlerFactoryType) -> Handler:
         """ Provides a log handler object
         """
-        handler = logging.FileHandler(log_path)
-        formatter = logging.Formatter('%(asctime)s %(name)-12s %(levelname)-8s %(message)s')
-        handler.setFormatter(formatter)
+        handler = logs_handler_factory(log_path)
+
+        if not handler.formatter:
+            formatter = logging.Formatter('%(asctime)s %(name)-12s %(levelname)-8s %(message)s')
+            handler.setFormatter(formatter)

         if filter_logs_by_thread:
             handler.addFilter(LogFileFilter(threading.currentThread().getName()))
@@ -247,12 +285,11 @@ def _prepare_general_stats(self, workers: int, processing_type: _ProcessingType)
     def _get_report_folder(self) -> str:
         """ Returns file path of the folder where the report will be saved
         """
-        return os.path.join(self.logs_folder,
-                            f'eoexecution-report-{self.start_time.strftime("%Y_%m_%d-%H_%M_%S")}')
+        return fs.path.combine(self.logs_folder, f'eoexecution-report-{self.start_time.strftime("%Y_%m_%d-%H_%M_%S")}')

     def get_successful_executions(self) -> List[int]:
         """ Returns a list of IDs of successful executions. The IDs are integers from interval
-        `[0, len(execution_args) - 1]`, sorted in increasing order.
+        `[0, len(execution_kwargs) - 1]`, sorted in increasing order.

         :return: List of successful execution IDs
         """

@@ -260,18 +297,23 @@
     def get_failed_executions(self) -> List[int]:
         """ Returns a list of IDs of failed executions. The IDs are integers from interval
-        `[0, len(execution_args) - 1]`, sorted in increasing order.
+        `[0, len(execution_kwargs) - 1]`, sorted in increasing order.

         :return: List of failed execution IDs
         """
         return [idx for idx, results in enumerate(self.execution_results) if results.workflow_failed()]

-    def get_report_filename(self) -> str:
+    def get_report_path(self, full_path: bool = True) -> str:
         """ Returns the filename and file path of the report

+        :param full_path: A flag to specify if it should return a full absolute path or a path relative to the
+            filesystem object.
         :return: Report filename
         """
-        return os.path.join(self.report_folder, self.REPORT_FILENAME)
+        report_path = fs.path.combine(self.report_folder, self.REPORT_FILENAME)
+        if full_path:
+            return get_full_path(self.filesystem, report_path)
+        return report_path

     def make_report(self, include_logs: bool = True):
         """ Makes an HTML report and saves it into the same folder where logs are stored.
@@ -289,26 +331,32 @@ def make_report(self, include_logs: bool = True):
         return EOExecutorVisualization(self).make_report(include_logs=include_logs)

-    def get_log_paths(self) -> List[str]:
+    def get_log_paths(self, full_path: bool = True) -> List[str]:
         """ Returns a list of file paths containing logs
+
+        :param full_path: A flag to specify if it should return full absolute paths or paths relative to the
+            filesystem object.
+        :return: A list of paths to log files.
         """
-        return [os.path.join(self.report_folder, f'eoexecution-{name}.log') for name in self.execution_names]
+        log_paths = [fs.path.combine(self.report_folder, f'eoexecution-{name}.log') for name in self.execution_names]
+        if full_path:
+            return [get_full_path(self.filesystem, path) for path in log_paths]
+        return log_paths

     def read_logs(self) -> List[Optional[str]]:
         """ Loads the content of log files if logs have been saved
         """
         if not self.save_logs:
-            return [None] * len(self.execution_args)
+            return [None] * len(self.execution_kwargs)

-        log_paths = self.get_log_paths()
+        log_paths = self.get_log_paths(full_path=False)
         with concurrent.futures.ThreadPoolExecutor() as executor:
             return list(executor.map(self._read_log_file, log_paths))

-    @staticmethod
-    def _read_log_file(log_path: str) -> str:
+    def _read_log_file(self, log_path: str) -> str:
        """Reads the content of a log file"""
         try:
-            with open(log_path, "r") as file_handle:
+            with self.filesystem.open(log_path, "r") as file_handle:
                 return file_handle.read()
         except BaseException as exception:
             warnings.warn(f'Failed to load logs with exception: {repr(exception)}', category=EORuntimeWarning)
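
With the filesystem abstraction in place, the run/report round trip looks roughly like this (a sketch, assuming the same `workflow` and `execution_kwargs` as above):

```python
import fs

from eolearn.core import EOExecutor

executor = EOExecutor(workflow, execution_kwargs, save_logs=True,
                      filesystem=fs.open_fs('.'), logs_folder='logs')
executor.run(workers=4, multiprocess=True)

full_report_path = executor.get_report_path()                 # absolute system path
relative_log_paths = executor.get_log_paths(full_path=False)  # relative to the filesystem object
log_contents = executor.read_logs()                           # one string (or None) per execution
```
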
diff --git a/core/eolearn/core/eonode.py b/core/eolearn/core/eonode.py
index fd4a87255..ff33e3365 100644
--- a/core/eolearn/core/eonode.py
+++ b/core/eolearn/core/eonode.py
@@ -42,7 +42,7 @@ def __post_init__(self):
         if self.name is None:
             super().__setattr__('name', self.task.__class__.__name__)

-        super().__setattr__('uid', generate_uid(self.name))
+        super().__setattr__('uid', generate_uid(self.task.__class__.__name__))

     def get_custom_name(self, number=0):
         """ Provides custom node name according to the class of the contained task and a given number
diff --git a/core/eolearn/core/extra/ray.py b/core/eolearn/core/extra/ray.py
index a757238c0..72c08beb8 100644
--- a/core/eolearn/core/extra/ray.py
+++ b/core/eolearn/core/extra/ray.py
@@ -42,10 +42,10 @@ def _get_processing_type(*_, **__):
         return _ProcessingType.RAY

     @classmethod
-    def _run_execution(cls, processing_args, *_, **__):
+    def _run_execution(cls, execution_kwargs, *_, **__):
         """ Runs ray execution
         """
-        futures = [_ray_workflow_executor.remote(workflow_args) for workflow_args in processing_args]
+        futures = [_ray_workflow_executor.remote(workflow_kwargs) for workflow_kwargs in execution_kwargs]

         for _ in tqdm(_progress_bar_iterator(futures), total=len(futures)):
             pass
@@ -54,11 +54,11 @@


 @ray.remote
-def _ray_workflow_executor(workflow_args):
+def _ray_workflow_executor(workflow_kwargs):
     """ Called to execute a workflow on a ray worker
     """
     # pylint: disable=protected-access
-    return RayExecutor._execute_workflow(workflow_args)
+    return RayExecutor._execute_workflow(workflow_kwargs)


 def _progress_bar_iterator(futures):
diff --git a/core/eolearn/core/fs_utils.py b/core/eolearn/core/fs_utils.py
index 162eb3e29..5bbfacbf3 100644
--- a/core/eolearn/core/fs_utils.py
+++ b/core/eolearn/core/fs_utils.py
@@ -35,7 +35,7 @@ def get_filesystem(path: str, create: bool = False, config: Optional[SHConfig] =
     if isinstance(path, Path):
         path = str(path)

-    if path.startswith('s3://'):
+    if is_s3_path(path):
         return load_s3_filesystem(path, config=config, **kwargs)

     return fs.open_fs(path, create=create, **kwargs)
@@ -84,7 +84,7 @@ def load_s3_filesystem(path: str, strict: bool = False, config: Optional[SHConfi
     :return: An S3 filesystem object
     :rtype: fs_s3fs.S3FS
     """
-    if not path.startswith('s3://'):
+    if not is_s3_path(path):
         raise ValueError(f"AWS path has to start with s3:// but found '{path}'")

     config = config or SHConfig()
@@ -120,3 +120,33 @@ def get_aws_credentials(aws_profile: str, config: Optional[SHConfig] = None) ->
     config.aws_access_key_id = aws_credentials.access_key
     config.aws_secret_access_key = aws_credentials.secret_key
     return config
+
+
+def get_full_path(filesystem: fs.base.FS, relative_path: str) -> str:
+    """Given a filesystem object and a path relative to it, provides the full path."""
+    if isinstance(filesystem, S3FS):
+        # pylint: disable=protected-access
+        return join_path(f"s3://{filesystem._bucket_name}", filesystem.dir_path, relative_path)
+
+    return os.path.normpath(filesystem.getsyspath(relative_path))
+
+
+def join_path(*path_parts: str) -> str:
+    """A utility function for joining a path that is either local or S3.
+
+    :param path_parts: Partial paths where the first part will be used to decide if it is an S3 path or a local path
+    :return: Joined path that is also normalized and absolute.
+    """
+    if is_s3_path(path_parts[0]):
+        path_parts = ((part[5:] if index == 0 else part) for index, part in enumerate(path_parts))
+        path = "/".join(part.strip("/") for part in path_parts)
+        path = fs.path.normpath(path)
+        return f"s3://{path}"
+
+    # pylint: disable=no-value-for-parameter
+    return os.path.abspath(os.path.join(*path_parts))
+
+
+def is_s3_path(path: str) -> bool:
+    """Returns True if the path points to an S3 bucket, False otherwise."""
+    return path.startswith("s3://")
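
A quick sanity check of the three new helpers, mirroring the tests added below (the bucket and folder names are made up):

```python
from fs_s3fs import S3FS

from eolearn.core.fs_utils import get_full_path, is_s3_path, join_path

assert is_s3_path('s3://bucket/folder')

# The first part decides the flavour; '..' segments are normalized away:
assert join_path('s3://bucket/', '/data', '..', 'file.json') == 's3://bucket/file.json'

# For S3 filesystems the full path is rebuilt from the bucket name and dir_path:
filesystem = S3FS(bucket_name='bucket', dir_path='/data')
assert get_full_path(filesystem, 'logs/report.html') == 's3://bucket/data/logs/report.html'
```
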
diff --git a/core/eolearn/tests/test_eoexecutor.py b/core/eolearn/tests/test_eoexecutor.py
index 9aca70092..c41caf414 100644
--- a/core/eolearn/tests/test_eoexecutor.py
+++ b/core/eolearn/tests/test_eoexecutor.py
@@ -85,17 +85,17 @@ def workflow_fixture(test_nodes):
     return workflow


-@pytest.fixture(name='execution_args')
-def execution_args_fixture(test_nodes):
+@pytest.fixture(name='execution_kwargs')
+def execution_kwargs_fixture(test_nodes):
     example_node = test_nodes['example']

-    execution_args = [
+    execution_kwargs = [
         {example_node: {'arg1': 1}},
         {},
         {example_node: {'arg1': 3, 'arg3': 10}},
         {example_node: {'arg1': None}}
     ]
-    return execution_args
+    return execution_kwargs


 @pytest.mark.parametrize(
@@ -107,11 +107,11 @@ def execution_args_fixture(test_nodes):
     ]
 )
 @pytest.mark.parametrize('execution_names', [None, [4, 'x', 'y', 'z']])
-def test_read_logs(test_args, execution_names, workflow, execution_args):
+def test_read_logs(test_args, execution_names, workflow, execution_kwargs):
     workers, multiprocess, filter_logs = test_args
     with tempfile.TemporaryDirectory() as tmp_dir_name:
         executor = EOExecutor(
-            workflow, execution_args, save_logs=True,
+            workflow, execution_kwargs, save_logs=True,
             logs_folder=tmp_dir_name,
             logs_filter=CustomLogFilter() if filter_logs else None,
             execution_names=execution_names
@@ -123,7 +123,7 @@ def test_read_logs(test_args, execution_names, workflow, execution_args):
     for log in execution_logs:
         assert len(log.split()) >= 3

-    log_filenames = sorted(os.listdir(executor.report_folder))
+    log_filenames = sorted(executor.filesystem.listdir(executor.report_folder))
     assert len(log_filenames) == 4

     if execution_names:
@@ -131,15 +131,15 @@ def test_read_logs(test_args, execution_names, workflow, execution_args):
         assert log_filename == f'eoexecution-{name}.log'

     log_path = os.path.join(executor.report_folder, log_filenames[0])
-    with open(log_path, 'r') as fp:
+    with executor.filesystem.open(log_path, 'r') as fp:
         line_count = len(fp.readlines())
     expected_line_count = 2 if filter_logs else 12
     assert line_count == expected_line_count


-def test_execution_results(workflow, execution_args):
+def test_execution_results(workflow, execution_kwargs):
     with tempfile.TemporaryDirectory() as tmp_dir_name:
-        executor = EOExecutor(workflow, execution_args, logs_folder=tmp_dir_name)
+        executor = EOExecutor(workflow, execution_kwargs, logs_folder=tmp_dir_name)
         executor.run(workers=2)

         assert len(executor.execution_results) == 4
@@ -149,9 +149,9 @@


 @pytest.mark.parametrize('multiprocess', [True, False])
-def test_execution_errors(multiprocess, workflow, execution_args):
+def test_execution_errors(multiprocess, workflow, execution_kwargs):
     with tempfile.TemporaryDirectory() as tmp_dir_name:
-        executor = EOExecutor(workflow, execution_args, logs_folder=tmp_dir_name)
+        executor = EOExecutor(workflow, execution_kwargs, logs_folder=tmp_dir_name)
         executor.run(workers=5, multiprocess=multiprocess)

         for idx, results in enumerate(executor.execution_results):
@@ -164,8 +164,8 @@ def test_execution_errors(multiprocess, workflow, execution_args):
     assert executor.get_failed_executions() == [3]


-def test_execution_results(workflow, execution_args):
-    executor = EOExecutor(workflow, execution_args)
+def test_execution_results(workflow, execution_kwargs):
+    executor = EOExecutor(workflow, execution_kwargs)
     results = executor.run(workers=2, multiprocess=True)

     assert isinstance(results, list)
@@ -176,28 +176,28 @@
     assert workflow_results.outputs['output'] == 42


-def test_exceptions(workflow, execution_args):
+def test_exceptions(workflow, execution_kwargs):
     with pytest.raises(ValueError):
         EOExecutor(workflow, {})

     with pytest.raises(ValueError):
-        EOExecutor(workflow, execution_args, execution_names={1, 2, 3, 4})
+        EOExecutor(workflow, execution_kwargs, execution_names={1, 2, 3, 4})
     with pytest.raises(ValueError):
-        EOExecutor(workflow, execution_args, execution_names=['a', 'b'])
+        EOExecutor(workflow, execution_kwargs, execution_names=['a', 'b'])


 def test_keyboard_interrupt():
     exception_node = EONode(KeyboardExceptionTask())
     workflow = EOWorkflow([exception_node])
-    execution_args = []
+    execution_kwargs = []
     for _ in range(10):
-        execution_args.append({exception_node: {'arg1': 1}})
+        execution_kwargs.append({exception_node: {'arg1': 1}})

-    run_args = [{'workers': 1},
+    run_kwargs = [{'workers': 1},
                 {'workers': 3, 'multiprocess': True},
                 {'workers': 3, 'multiprocess': False}]
-    for arg in run_args:
+    for kwarg in run_kwargs:
         with pytest.raises(KeyboardInterrupt):
-            EOExecutor(workflow, execution_args).run(**arg)
+            EOExecutor(workflow, execution_kwargs).run(**kwarg)


 def test_with_lock(num_workers):
diff --git a/core/eolearn/tests/test_fs_utils.py b/core/eolearn/tests/test_fs_utils.py
index 18a40f79a..ff44291ef 100644
--- a/core/eolearn/tests/test_fs_utils.py
+++ b/core/eolearn/tests/test_fs_utils.py
@@ -19,7 +19,7 @@
 from sentinelhub import SHConfig

 from eolearn.core import get_filesystem, load_s3_filesystem
-from eolearn.core.fs_utils import get_aws_credentials
+from eolearn.core.fs_utils import get_aws_credentials, get_full_path, join_path


 def test_get_local_filesystem(tmp_path):
@@ -79,3 +79,28 @@ def test_get_aws_credentials(mocked_copy):
     config = get_aws_credentials('default', config=default_config)
     assert config.aws_access_key_id != default_config.aws_access_key_id
     assert config.aws_secret_access_key != default_config.aws_secret_access_key
+
+
+@pytest.mark.parametrize(
+    argnames="path_parts, expected_path",
+    ids=["local", "s3"],
+    argvalues=[
+        (["/tmp", "folder", "xyz", "..", "file.json"], os.path.join("/tmp", "folder", "file.json")),
+        (["s3://xx/", "/y/z", "a", "..", "b.json"], "s3://xx/y/z/b.json"),
+    ],
+)
+def test_join_path(path_parts, expected_path):
+    assert join_path(*path_parts) == expected_path
+
+
+@pytest.mark.parametrize(
+    "filesystem, path, expected_full_path",
+    [
+        (OSFS("/tmp"), "my/folder", "/tmp/my/folder"),
+        (S3FS(bucket_name="data", dir_path="/folder"), "/sub/folder", "s3://data/folder/sub/folder"),
+        (S3FS(bucket_name="data"), "/sub/folder", "s3://data/sub/folder"),
+    ],
+)
+def test_get_full_path(filesystem, path, expected_full_path):
+    full_path = get_full_path(filesystem, path)
+    assert full_path == expected_full_path
diff --git a/core/eolearn/tests/test_rayexecutor.py b/core/eolearn/tests/test_rayexecutor.py
index 1963f2a28..e3ad7b850 100644
--- a/core/eolearn/tests/test_rayexecutor.py
+++ b/core/eolearn/tests/test_rayexecutor.py
@@ -77,32 +77,32 @@ def workflow_fixture(test_nodes):
     return workflow


-@pytest.fixture(name='execution_args')
-def execution_args_fixture(test_nodes):
+@pytest.fixture(name='execution_kwargs')
+def execution_kwargs_fixture(test_nodes):
     example_node = test_nodes['example']

-    execution_args = [
+    execution_kwargs = [
         {example_node: {'arg1': 1}},
         {},
         {example_node: {'arg1': 3, 'arg3': 10}},
         {example_node: {'arg1': None}}
     ]
-    return execution_args
+    return execution_kwargs


-def test_fail_without_ray(workflow, execution_args):
-    executor = RayExecutor(workflow, execution_args)
+def test_fail_without_ray(workflow, execution_kwargs):
+    executor = RayExecutor(workflow, execution_kwargs)
     with pytest.raises(RuntimeError):
         executor.run()


 @pytest.mark.parametrize('filter_logs', [True, False])
 @pytest.mark.parametrize('execution_names', [None, [4, 'x', 'y', 'z']])
-def test_read_logs(filter_logs, execution_names, workflow, execution_args, simple_cluster):
+def test_read_logs(filter_logs, execution_names, workflow, execution_kwargs, simple_cluster):
     with tempfile.TemporaryDirectory() as tmp_dir_name:
         executor = RayExecutor(
-            workflow, execution_args, save_logs=True,
+            workflow, execution_kwargs, save_logs=True,
             logs_folder=tmp_dir_name,
             logs_filter=CustomLogFilter() if filter_logs else None,
             execution_names=execution_names
@@ -114,7 +114,7 @@ def test_read_logs(filter_logs, execution_names, workflow, execution_args, simpl
     for log in execution_logs:
         assert len(log.split()) >= 3

-    log_filenames = sorted(os.listdir(executor.report_folder))
+    log_filenames = sorted(executor.filesystem.listdir(executor.report_folder))
     assert len(log_filenames) == 4

     if execution_names:
@@ -122,15 +122,15 @@ def test_read_logs(filter_logs, execution_names, workflow, execution_args, simpl
         assert log_filename == f'eoexecution-{name}.log'

     log_path = os.path.join(executor.report_folder, log_filenames[0])
-    with open(log_path, 'r') as fp:
+    with executor.filesystem.open(log_path, 'r') as fp:
         line_count = len(fp.readlines())
     expected_line_count = 2 if filter_logs else 12
     assert line_count == expected_line_count


-def test_execution_results(workflow, execution_args, simple_cluster):
+def test_execution_results(workflow, execution_kwargs, simple_cluster):
     with tempfile.TemporaryDirectory() as tmp_dir_name:
-        executor = RayExecutor(workflow, execution_args, logs_folder=tmp_dir_name)
+        executor = RayExecutor(workflow, execution_kwargs, logs_folder=tmp_dir_name)
         executor.run()

         assert len(executor.execution_results) == 4
@@ -139,9 +139,9 @@
         assert isinstance(time_stat, datetime.datetime)


-def test_execution_errors(workflow, execution_args, simple_cluster):
+def test_execution_errors(workflow, execution_kwargs, simple_cluster):
     with tempfile.TemporaryDirectory() as tmp_dir_name:
-        executor = RayExecutor(workflow, execution_args, logs_folder=tmp_dir_name)
+        executor = RayExecutor(workflow, execution_kwargs, logs_folder=tmp_dir_name)
         executor.run()

         for idx, results in enumerate(executor.execution_results):
@@ -154,8 +154,8 @@
     assert executor.get_failed_executions() == [3]


-def test_execution_results(workflow, execution_args, simple_cluster):
-    executor = RayExecutor(workflow, execution_args)
+def test_execution_results(workflow, execution_kwargs, simple_cluster):
+    executor = RayExecutor(workflow, execution_kwargs)
     results = executor.run()

     assert isinstance(results, list)
@@ -168,33 +168,33 @@ def test_execution_results(workflow, execution_args, simple_cluster):
 def test_keyboard_interrupt(simple_cluster):
     exception_node = EONode(KeyboardExceptionTask())
     workflow = EOWorkflow([exception_node])
-    execution_args = []
+    execution_kwargs = []
     for _ in range(10):
-        execution_args.append({exception_node: {'arg1': 1}})
+        execution_kwargs.append({exception_node: {'arg1': 1}})

     with pytest.raises((ray.exceptions.TaskCancelledError, ray.exceptions.RayTaskError)):
-        RayExecutor(workflow, execution_args).run()
+        RayExecutor(workflow, execution_kwargs).run()


-def test_reruns(workflow, execution_args, simple_cluster):
-    executor = RayExecutor(workflow, execution_args)
+def test_reruns(workflow, execution_kwargs, simple_cluster):
+    executor = RayExecutor(workflow, execution_kwargs)
     for _ in range(100):
         executor.run()

     for _ in range(10):
-        RayExecutor(workflow, execution_args).run()
+        RayExecutor(workflow, execution_kwargs).run()

-    executors = [RayExecutor(workflow, execution_args) for _ in range(10)]
+    executors = [RayExecutor(workflow, execution_kwargs) for _ in range(10)]
     for executor in executors:
         executor.run()


-def test_run_after_interrupt(workflow, execution_args, simple_cluster):
+def test_run_after_interrupt(workflow, execution_kwargs, simple_cluster):
     foo_node = EONode(FooTask())
     exception_node = EONode(KeyboardExceptionTask(), inputs=[foo_node])
     exception_workflow = EOWorkflow([foo_node, exception_node])
     exception_executor = RayExecutor(exception_workflow, [{}])
-    executor = RayExecutor(workflow, execution_args[:-1])  # removes args for exception
+    executor = RayExecutor(workflow, execution_kwargs[:-1])  # removes args for exception

     result_preexception = executor.run()
     with pytest.raises((ray.exceptions.TaskCancelledError, ray.exceptions.RayTaskError)):
@@ -204,9 +204,9 @@
     assert [res.outputs for res in result_preexception] == [res.outputs for res in result_postexception]


-def test_mix_with_eoexecutor(workflow, execution_args, simple_cluster):
-    rayexecutor = RayExecutor(workflow, execution_args)
-    eoexecutor = EOExecutor(workflow, execution_args)
+def test_mix_with_eoexecutor(workflow, execution_kwargs, simple_cluster):
+    rayexecutor = RayExecutor(workflow, execution_kwargs)
+    eoexecutor = EOExecutor(workflow, execution_kwargs)
     for _ in range(10):
         ray_results = rayexecutor.run()
         eo_results = eoexecutor.run()
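
For reference, the Ray path is exercised like this (a sketch, assuming a local Ray cluster and the same fixtures as above):

```python
import ray

from eolearn.core import EOExecutor
from eolearn.core.extra.ray import RayExecutor

ray.init(ignore_reinit_error=True)

ray_executor = RayExecutor(workflow, execution_kwargs, save_logs=True, logs_folder='logs')
results = ray_executor.run()  # parallelism is managed by the Ray cluster

# The two executors stay interchangeable, as test_mix_with_eoexecutor verifies:
eo_results = EOExecutor(workflow, execution_kwargs).run(workers=2)
```
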
diff --git a/examples/core/CoreOverview.ipynb b/examples/core/CoreOverview.ipynb
index 7a254765e..9384662bb 100644
--- a/examples/core/CoreOverview.ipynb
+++ b/examples/core/CoreOverview.ipynb
@@ -757,7 +757,7 @@
     "\n",
     "executor.make_report()\n",
     "\n",
-    "print('Report was saved to location: {}'.format(executor.get_report_filename()))"
+    "print('Report was saved to location: {}'.format(executor.get_report_path()))"
    ]
   },
   {
diff --git a/examples/land-cover-map/SI_LULC_pipeline.ipynb b/examples/land-cover-map/SI_LULC_pipeline.ipynb
index bbb4cce66..92fdd830e 100644
--- a/examples/land-cover-map/SI_LULC_pipeline.ipynb
+++ b/examples/land-cover-map/SI_LULC_pipeline.ipynb
@@ -2665,7 +2665,7 @@
     "failed_ids = executor.get_failed_executions()\n",
     "if failed_ids:\n",
     "    raise RuntimeError(f'Execution failed EOPatches with IDs:\\n{failed_ids}\\n'\n",
-    "                       f'For more info check report at {executor.get_report_filename()}')"
+    "                       f'For more info check report at {executor.get_report_path()}')"
    ]
   },
   {
@@ -3281,7 +3281,7 @@
     "failed_ids = executor.get_failed_executions()\n",
     "if failed_ids:\n",
     "    raise RuntimeError(f'Execution failed EOPatches with IDs:\\n{failed_ids}\\n'\n",
-    "                       f'For more info check report at {executor.get_report_filename()}')"
+    "                       f'For more info check report at {executor.get_report_path()}')"
    ]
   },
   {
@@ -3960,7 +3960,7 @@
     "failed_ids = executor.get_failed_executions()\n",
     "if failed_ids:\n",
     "    raise RuntimeError(f'Execution failed EOPatches with IDs:\\n{failed_ids}\\n'\n",
-    "                       f'For more info check report at {executor.get_report_filename()}')"
+    "                       f'For more info check report at {executor.get_report_path()}')"
    ]
   },
   {
diff --git a/visualization/eolearn/tests/test_eoexecutor_visualization.py b/visualization/eolearn/tests/test_eoexecutor_visualization.py
index ee50efcde..45ccd4a45 100644
--- a/visualization/eolearn/tests/test_eoexecutor_visualization.py
+++ b/visualization/eolearn/tests/test_eoexecutor_visualization.py
@@ -34,7 +34,7 @@ def execute(self, *_, **kwargs):
 NODE = EONode(ExampleTask())
 WORKFLOW = EOWorkflow([NODE, EONode(task=ExampleTask(), inputs=[NODE, NODE])])
-EXECUTION_ARGS = [
+EXECUTION_KWARGS = [
     {NODE: {'arg1': 1}},
     {},
     {NODE: {'arg1': 3, 'arg3': 10}},
@@ -47,10 +47,10 @@
 def test_report_creation(save_logs, include_logs):
     with tempfile.TemporaryDirectory() as tmp_dir_name:
         executor = EOExecutor(
-            WORKFLOW, EXECUTION_ARGS, logs_folder=tmp_dir_name, save_logs=save_logs,
+            WORKFLOW, EXECUTION_KWARGS, logs_folder=tmp_dir_name, save_logs=save_logs,
             execution_names=['ex 1', 2, 0.4, None]
         )
         executor.run(workers=10)
         executor.make_report(include_logs=include_logs)

-        assert os.path.exists(executor.get_report_filename()), 'Execution report was not created'
+        assert os.path.exists(executor.get_report_path()), 'Execution report was not created'
diff --git a/visualization/eolearn/visualization/eoexecutor_visualization.py b/visualization/eolearn/visualization/eoexecutor_visualization.py
index 9ee7585a3..cf8d10f14 100644
--- a/visualization/eolearn/visualization/eoexecutor_visualization.py
+++ b/visualization/eolearn/visualization/eoexecutor_visualization.py
@@ -10,11 +10,13 @@
 file in the root directory of this source tree.
 """
 import os
+import importlib
 import inspect
 import warnings
 import base64
 import datetime as dt
-from collections import OrderedDict, defaultdict
+from collections import defaultdict
+from typing import DefaultDict, List, Tuple

 try:
     import matplotlib.pyplot as plt
@@ -23,6 +25,7 @@
     matplotlib.use('agg')
     import matplotlib.pyplot as plt

+import fs
 import graphviz
 import pygments
 import pygments.lexers
@@ -67,15 +70,17 @@ def make_report(self, include_logs: bool = True):

         template = self._get_template()

-        execution_log_filenames = [os.path.basename(log_path) for log_path in self.eoexecutor.get_log_paths()]
+        execution_log_filenames = [fs.path.basename(log_path) for log_path in self.eoexecutor.get_log_paths()]
         if self.eoexecutor.save_logs:
             execution_logs = self.eoexecutor.read_logs() if include_logs else None
         else:
-            execution_logs = ["No logs saved"] * len(self.eoexecutor.execution_args)
+            execution_logs = ["No logs saved"] * len(self.eoexecutor.execution_kwargs)

         html = template.render(
+            title=f'Report {self._format_datetime(self.eoexecutor.start_time)}',
             dependency_graph=dependency_graph,
             general_stats=self.eoexecutor.general_stats,
+            exception_stats=self._get_exception_stats(),
             task_descriptions=self._get_node_descriptions(),
             task_sources=self._render_task_sources(formatter),
             execution_results=self.eoexecutor.execution_results,
@@ -86,10 +91,10 @@ def make_report(self, include_logs: bool = True):
             code_css=formatter.get_style_defs()
         )

-        os.makedirs(self.eoexecutor.report_folder, exist_ok=True)
+        self.eoexecutor.filesystem.makedirs(self.eoexecutor.report_folder, recreate=True)

-        with open(self.eoexecutor.get_report_filename(), 'w') as fout:
-            fout.write(html)
+        with self.eoexecutor.filesystem.open(self.eoexecutor.get_report_path(full_path=False), 'w') as file_handle:
+            file_handle.write(html)

     def _create_dependency_graph(self):
         """ Provides an image of dependency graph
@@ -97,6 +102,45 @@ def _create_dependency_graph(self):
         dot = self.eoexecutor.workflow.dependency_graph()
         return base64.b64encode(dot.pipe()).decode()

+    def _get_exception_stats(self):
+        """ Creates aggregated stats about exceptions
+        """
+        formatter = HtmlFormatter()
+        lexer = pygments.lexers.get_lexer_by_name('python', stripall=True)
+
+        exception_stats = defaultdict(lambda: defaultdict(lambda: 0))
+
+        for workflow_results in self.eoexecutor.execution_results:
+            if not workflow_results.error_node_uid:
+                continue
+
+            error_node = workflow_results.stats[workflow_results.error_node_uid]
+            exception_str = pygments.highlight(
+                f'{error_node.exception.__class__.__name__}: {error_node.exception}',
+                lexer,
+                formatter
+            )
+            exception_stats[error_node.node_uid][exception_str] += 1
+
+        return self._to_ordered_stats(exception_stats)
+
+    def _to_ordered_stats(
+            self,
+            exception_stats: DefaultDict[str, DefaultDict[str, int]]
+    ) -> List[Tuple[str, str, List[Tuple[str, int]]]]:
+        """ Exception stats get ordered by nodes in their execution order in workflows. Exception stats that happen
+        for the same node get ordered by number of occurrences in a decreasing order.
+        """
+        ordered_exception_stats = []
+        for node in self.eoexecutor.workflow.get_nodes():
+            if node.uid not in exception_stats:
+                continue
+
+            node_stats = exception_stats[node.uid]
+            ordered_exception_stats.append((node.name, node.uid, sorted(node_stats.items(), key=lambda item: -item[1])))
+
+        return ordered_exception_stats
+
     def _get_node_descriptions(self):
         """ Prepares a list of node names and initialization parameters of their tasks
         """
@@ -109,6 +153,7 @@ def _get_node_descriptions(self):

             descriptions.append({
                 'name': f'{node_name} ({node.uid})',
+                'uid': node.uid,
                 'args': {
                     key: value.replace('<', '&lt;').replace('>', '&gt;')
                     for key, value in node.task.private_task_config.init_args.items()
@@ -121,25 +166,29 @@ def _render_task_sources(self, formatter):
         """ Renders source code of EOTasks
         """
         lexer = pygments.lexers.get_lexer_by_name("python", stripall=True)
-        sources = OrderedDict()
+        sources = {}

         for node in self.eoexecutor.workflow.get_nodes():
             task = node.task
-            if task.__module__.startswith("eolearn"):
-                continue

             key = f"{task.__class__.__name__} ({task.__module__})"
             if key in sources:
                 continue

-            try:
-                source = inspect.getsource(task.__class__)
-                source = pygments.highlight(source, lexer, formatter)
-            except TypeError:
-                # Jupyter notebook does not have __file__ method to collect source code
-                # StackOverflow provides no solutions
-                # Could be investigated further by looking into Jupyter Notebook source code
-                source = None
+            if task.__module__.startswith("eolearn"):
+                subpackage_name = '.'.join(task.__module__.split('.')[:2])
+                subpackage = importlib.import_module(subpackage_name)
+                subpackage_version = subpackage.__version__ if hasattr(subpackage, "__version__") else "unknown"
+                source = subpackage_name, subpackage_version
+            else:
+                try:
+                    source = inspect.getsource(task.__class__)
+                    source = pygments.highlight(source, lexer, formatter)
+                except TypeError:
+                    # Jupyter notebook does not have __file__ method to collect source code
+                    # StackOverflow provides no solutions
+                    # Could be investigated further by looking into Jupyter Notebook source code
+                    source = None

             sources[key] = source
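
The aggregation above boils down to a counting pattern that is easy to verify in isolation (a toy illustration with hypothetical node uids and pre-rendered exception strings):

```python
from collections import defaultdict

exception_stats = defaultdict(lambda: defaultdict(lambda: 0))
for node_uid, exception_str in [('n1', 'ValueError: bad input'),
                                ('n1', 'ValueError: bad input'),
                                ('n1', 'KeyError: missing')]:
    exception_stats[node_uid][exception_str] += 1

# Per node, exceptions are ordered by decreasing number of occurrences:
ordered = sorted(exception_stats['n1'].items(), key=lambda item: -item[1])
assert ordered == [('ValueError: bad input', 2), ('KeyError: missing', 1)]
```
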
diff --git a/visualization/eolearn/visualization/report_templates/report.html b/visualization/eolearn/visualization/report_templates/report.html
index 1d522bda9..3566ae875 100644
--- a/visualization/eolearn/visualization/report_templates/report.html
+++ b/visualization/eolearn/visualization/report_templates/report.html
@@ -107,6 +107,26 @@
+                No initialization parameters
+
                 Cannot collect source code of a task which is not defined in a .py file
-            {% else %}
+            {% elif task_source is string %}
                 {{ task_source }}
+            {% else %}
+
+                Imported from {{ task_source[0] }} version {{ task_source[1] }}
+
             {% endif %}
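
Taken together, the changes make a fully remote setup possible. A closing sketch (the bucket name is hypothetical; credentials are resolved via `SHConfig` or an AWS profile, as in `get_aws_credentials`):

```python
from eolearn.core import EOExecutor
from eolearn.core.fs_utils import get_filesystem

filesystem = get_filesystem('s3://my-bucket/eoexecution')  # resolves to an S3FS instance

executor = EOExecutor(workflow, execution_kwargs, save_logs=True,
                      filesystem=filesystem, logs_folder='logs')
executor.run(workers=4)
executor.make_report()

# Both full S3 paths and filesystem-relative paths stay available:
print(executor.get_report_path(full_path=True))
print(executor.get_log_paths(full_path=False))
```
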