diff --git a/pandaserver/workflow/psnakemake_test.py b/pandaserver/workflow/psnakemake_test.py index ae03c0950..1427a2dc0 100644 --- a/pandaserver/workflow/psnakemake_test.py +++ b/pandaserver/workflow/psnakemake_test.py @@ -32,7 +32,6 @@ def main(): logging.info(f"{os.path.basename(__file__)}: workflow_file = {workflow_file}") parser = Parser(workflow_file, level=logging.DEBUG) nodes, root_in = parser.parse_nodes() - _ = parser.parse_code() dot_data = parser.get_dot_data() logging.info(f"dot data ={os.linesep}{dot_data}") s_id, t_nodes, nodes = resolve_nodes(nodes, root_in, data, 0, set(), sys.argv[2], logging) diff --git a/pandaserver/workflow/snakeparser/parser.py b/pandaserver/workflow/snakeparser/parser.py index 0dc438abb..39eeefda7 100644 --- a/pandaserver/workflow/snakeparser/parser.py +++ b/pandaserver/workflow/snakeparser/parser.py @@ -6,12 +6,16 @@ import pathlib import re from itertools import chain +from pathlib import Path from types import SimpleNamespace -import snakemake.dag -import snakemake.parser -import snakemake.persistence -import snakemake.workflow +from snakemake.api import ( + OutputSettings, + ResourceSettings, + SnakemakeApi, + StorageSettings, +) + from pandaserver.workflow.snakeparser.utils import ParamRule, param_of from pandaserver.workflow.workflow_utils import ConditionItem, Node @@ -59,16 +63,32 @@ def __init__(self, workflow_file, level=None, logger=None): snakefile = os.path.abspath(workflow_file) workdir = os.path.dirname(snakefile) self._logger.debug("create workflow") - self._workflow = snakemake.workflow.Workflow(snakefile=snakefile, overwrite_workdir=None) + # create workflow through API + with SnakemakeApi( + OutputSettings( + verbose=False, + show_failed_logs=True, + ), + ) as snakemake_api: + workflow_api = snakemake_api.workflow( + storage_settings=StorageSettings(), + resource_settings=ResourceSettings(), + snakefile=Path(snakefile), + ) + dag_api = workflow_api.dag() + self._workflow = workflow_api._workflow self._workflow.default_target = "all" + self._workflow.overwrite_workdir = None current_workdir = os.getcwd() try: inject() self._workflow.workdir(workdir) - self._workflow.include(self._workflow.main_snakefile, overwrite_default_target=True) finally: if current_workdir: os.chdir(current_workdir) + # build DAG + dag_api.unlock() + self._dag = self._workflow.dag @property def jobs(self): @@ -76,15 +96,6 @@ def jobs(self): return list() return self._dag.jobs - def parse_code(self): - if self._workflow is None: - return None - code, _, __ = snakemake.parser.parse( - snakemake.workflow.GenericSourceFile(self._workflow.main_snakefile), - self._workflow, - ) - return code - def parse_nodes(self, in_loop=False): try: return self._parse_nodes(in_loop) @@ -102,8 +113,6 @@ def parse_nodes(self, in_loop=False): raise ex def _parse_nodes(self, in_loop): - if self._dag is None: - self._build_dag() root_job = next(filter(lambda o: o.rule.name == self._workflow.default_target, self.jobs)) root_inputs = {Parser._extract_job_id(self._define_id(name)): value for name, value in root_job.params.items()} root_outputs = set( @@ -269,23 +278,8 @@ def verify_workflow(self): return True def get_dot_data(self): - if self._dag is None: - self._build_dag() return str(self._dag) - def _build_dag(self): - target_rules = list(filter(lambda o: o.name == self._workflow.default_target, self._workflow.rules)) - self._dag = snakemake.dag.DAG( - self._workflow, - rules=self._workflow.rules, - targetrules=target_rules, - targetfiles=set(), - ) - self._workflow.persistence = snakemake.persistence.Persistence(dag=self._dag) - self._dag.init() - self._dag.update_checkpoint_dependencies() - self._dag.check_dynamic() - @staticmethod def _extract_job_id(job_id): if not job_id: diff --git a/pyproject.toml b/pyproject.toml index c888d9536..c494e613b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -32,7 +32,7 @@ dependencies = [ 'ruamel.yaml', 'cwl-utils>=0.13', 'packaging', - 'snakemake==7.30.1', + 'snakemake>=9.14.5', 'numpy', 'scipy', 'werkzeug',