|
51 | 51 | )
|
52 | 52 | from .writablebagfile import create_job, write_bag_file # change this later
|
53 | 53 |
|
| 54 | +# from schema_salad.utils import convert_to_dict |
| 55 | + |
| 56 | + |
54 | 57 | if TYPE_CHECKING:
|
55 | 58 | from .ro import ResearchObject
|
56 | 59 |
|
| 60 | +_attributes_type = Dict[str | Identifier, Any] |
| 61 | + |
57 | 62 |
|
58 | 63 | def copy_job_order(job: Union[Process, JobsType], job_order_object: CWLObjectType) -> CWLObjectType:
|
59 | 64 | """Create copy of job object for provenance."""
|
@@ -235,13 +240,13 @@ def evaluate(
|
235 | 240 | """Evaluate the nature of job."""
|
236 | 241 | if not hasattr(process, "steps"):
|
237 | 242 | # record provenance of independent commandline tool executions
|
238 |
| - self.prospective_prov(job) |
| 243 | + self.prospective_prov(job, process) |
239 | 244 | customised_job = copy_job_order(job, job_order_object)
|
240 | 245 | self.used_artefacts(customised_job, self.workflow_run_uri)
|
241 | 246 | create_job(research_obj, customised_job)
|
242 | 247 | elif hasattr(job, "workflow"):
|
243 | 248 | # record provenance of workflow executions
|
244 |
| - self.prospective_prov(job) |
| 249 | + self.prospective_prov(job, process) |
245 | 250 | customised_job = copy_job_order(job, job_order_object)
|
246 | 251 | self.used_artefacts(customised_job, self.workflow_run_uri)
|
247 | 252 | # if CWLPROV['prov'].uri in job_order_object: # maybe move this to another place
|
@@ -734,35 +739,38 @@ def generate_output_prov(
|
734 | 739 | entity, process_run_id, timestamp, None, {"prov:role": role}
|
735 | 740 | )
|
736 | 741 |
|
737 |
| - def prospective_prov(self, job: JobsType) -> None: |
| 742 | + def prospective_prov(self, job: JobsType, process: Process) -> None: |
738 | 743 | """Create prospective prov recording as wfdesc prov:Plan."""
|
| 744 | + prov_items: _attributes_type = { |
| 745 | + PROV_TYPE: WFDESC["Workflow"] if isinstance(job, WorkflowJob) else WFDESC["Process"], |
| 746 | + "prov:type": PROV["Plan"], |
| 747 | + "prov:label": "Prospective provenance", |
| 748 | + } |
| 749 | + if "doc" in process.tool: |
| 750 | + prov_items["schema:description"] = process.tool["doc"] |
| 751 | + if "label" in process.tool: |
| 752 | + prov_items["schema:name"] = process.tool["label"] |
| 753 | + # # TypeError: unhashable type: 'list' |
| 754 | + # if "intent" in process.tool: |
| 755 | + # prov_items["schema:featureList"] = convert_to_dict(process.tool["intent"]) |
| 756 | + self.document.entity("wf:main", prov_items) |
739 | 757 | if not isinstance(job, WorkflowJob):
|
740 |
| - # direct command line tool execution |
741 |
| - self.document.entity( |
742 |
| - "wf:main", |
743 |
| - { |
744 |
| - PROV_TYPE: WFDESC["Process"], |
745 |
| - "prov:type": PROV["Plan"], |
746 |
| - "prov:label": "Prospective provenance", |
747 |
| - }, |
748 |
| - ) |
749 | 758 | return
|
750 | 759 |
|
751 |
| - self.document.entity( |
752 |
| - "wf:main", |
753 |
| - { |
754 |
| - PROV_TYPE: WFDESC["Workflow"], |
755 |
| - "prov:type": PROV["Plan"], |
756 |
| - "prov:label": "Prospective provenance", |
757 |
| - }, |
758 |
| - ) |
759 |
| - |
760 | 760 | for step in job.steps:
|
761 | 761 | stepnametemp = "wf:main/" + str(step.name)[5:]
|
762 | 762 | stepname = urllib.parse.quote(stepnametemp, safe=":/,#")
|
| 763 | + provstep_items: _attributes_type = { |
| 764 | + PROV_TYPE: WFDESC["Process"], |
| 765 | + "prov:type": PROV["Plan"], |
| 766 | + } |
| 767 | + if "doc" in step.tool: |
| 768 | + provstep_items["schema:description"] = step.tool["doc"] |
| 769 | + if "label" in step.tool: |
| 770 | + provstep_items["schema:name"] = step.tool["label"] |
763 | 771 | provstep = self.document.entity(
|
764 | 772 | stepname,
|
765 |
| - {PROV_TYPE: WFDESC["Process"], "prov:type": PROV["Plan"]}, |
| 773 | + provstep_items, |
766 | 774 | )
|
767 | 775 | self.document.entity(
|
768 | 776 | "wf:main",
|
|
0 commit comments