|
51 | 51 | )
|
52 | 52 | from .writablebagfile import create_job, write_bag_file # change this later
|
53 | 53 |
|
| 54 | +# from schema_salad.utils import convert_to_dict |
| 55 | + |
| 56 | + |
54 | 57 | if TYPE_CHECKING:
|
55 | 58 | from .ro import ResearchObject
|
56 | 59 |
|
| 60 | +ProvType = Dict[Union[str, Identifier], Any] |
| 61 | + |
57 | 62 |
|
58 | 63 | def copy_job_order(job: Union[Process, JobsType], job_order_object: CWLObjectType) -> CWLObjectType:
|
59 | 64 | """Create copy of job object for provenance."""
|
@@ -177,14 +182,14 @@ def host_provenance(document: ProvDocument) -> None:
|
177 | 182 | # by a user account, as cwltool is a command line tool
|
178 | 183 | account = self.document.agent(ACCOUNT_UUID)
|
179 | 184 | if self.orcid or self.full_name:
|
180 |
| - person: Dict[Union[str, Identifier], Any] = { |
| 185 | + person: ProvType = { |
181 | 186 | PROV_TYPE: PROV["Person"],
|
182 | 187 | "prov:type": SCHEMA["Person"],
|
183 | 188 | }
|
184 | 189 | if self.full_name:
|
185 | 190 | person["prov:label"] = self.full_name
|
186 | 191 | person["foaf:name"] = self.full_name
|
187 |
| - person["schema:name"] = self.full_name |
| 192 | + person[SCHEMA["name"]] = self.full_name |
188 | 193 | else:
|
189 | 194 | # TODO: Look up name from ORCID API?
|
190 | 195 | pass
|
@@ -235,13 +240,13 @@ def evaluate(
|
235 | 240 | """Evaluate the nature of job."""
|
236 | 241 | if not hasattr(process, "steps"):
|
237 | 242 | # record provenance of independent commandline tool executions
|
238 |
| - self.prospective_prov(job) |
| 243 | + self.prospective_prov(job, process) |
239 | 244 | customised_job = copy_job_order(job, job_order_object)
|
240 | 245 | self.used_artefacts(customised_job, self.workflow_run_uri)
|
241 | 246 | create_job(research_obj, customised_job)
|
242 | 247 | elif hasattr(job, "workflow"):
|
243 | 248 | # record provenance of workflow executions
|
244 |
| - self.prospective_prov(job) |
| 249 | + self.prospective_prov(job, process) |
245 | 250 | customised_job = copy_job_order(job, job_order_object)
|
246 | 251 | self.used_artefacts(customised_job, self.workflow_run_uri)
|
247 | 252 | # if CWLPROV['prov'].uri in job_order_object: # maybe move this to another place
|
@@ -306,8 +311,7 @@ def _add_nested_annotations(
|
306 | 311 | ) -> ProvEntity:
|
307 | 312 | """Propagate input data annotations to provenance."""
|
308 | 313 | # Change https:// into http:// first
|
309 |
| - schema2_uri = "https://schema.org/" |
310 |
| - if schema2_uri in annotation_key: |
| 314 | + if (schema2_uri := "https://schema.org/") in annotation_key: |
311 | 315 | annotation_key = SCHEMA[annotation_key.replace(schema2_uri, "")].uri
|
312 | 316 |
|
313 | 317 | if not isinstance(annotation_value, (MutableSequence, MutableMapping)):
|
@@ -377,9 +381,9 @@ def declare_file(self, value: CWLObjectType) -> Tuple[ProvEntity, ProvEntity, st
|
377 | 381 | self.document.specializationOf(file_entity, entity)
|
378 | 382 |
|
379 | 383 | # Identify all schema annotations
|
380 |
| - schema_annotations = dict( |
381 |
| - [(v, value[v]) for v in value.keys() if v.startswith("https://schema.org")] |
382 |
| - ) |
| 384 | + schema_annotations = { |
| 385 | + v: value[v] for v in value.keys() if v.startswith("https://schema.org") |
| 386 | + } |
383 | 387 |
|
384 | 388 | # Transfer SCHEMA annotations to provenance
|
385 | 389 | for s in schema_annotations:
|
@@ -509,9 +513,9 @@ def declare_directory(self, value: CWLObjectType) -> ProvEntity:
|
509 | 513 | coll_b.add_attributes(coll_b_attribs)
|
510 | 514 |
|
511 | 515 | # Identify all schema annotations
|
512 |
| - schema_annotations = dict( |
513 |
| - [(v, value[v]) for v in value.keys() if v.startswith("https://schema.org")] |
514 |
| - ) |
| 516 | + schema_annotations = { |
| 517 | + v: value[v] for v in value.keys() if v.startswith("https://schema.org") |
| 518 | + } |
515 | 519 |
|
516 | 520 | # Transfer SCHEMA annotations to provenance
|
517 | 521 | for s in schema_annotations:
|
@@ -571,7 +575,7 @@ def declare_artefact(self, value: Any) -> ProvEntity:
|
571 | 575 | self.research_object.add_uri(entity.identifier.uri)
|
572 | 576 | return entity
|
573 | 577 |
|
574 |
| - if isinstance(value, (str, str)): |
| 578 | + if isinstance(value, str): |
575 | 579 | (entity, _) = self.declare_string(value)
|
576 | 580 | return entity
|
577 | 581 |
|
@@ -734,35 +738,39 @@ def generate_output_prov(
|
734 | 738 | entity, process_run_id, timestamp, None, {"prov:role": role}
|
735 | 739 | )
|
736 | 740 |
|
737 |
| - def prospective_prov(self, job: JobsType) -> None: |
| 741 | + def prospective_prov(self, job: JobsType, process: Process) -> None: |
738 | 742 | """Create prospective prov recording as wfdesc prov:Plan."""
|
| 743 | + prov_items: ProvType = { |
| 744 | + PROV_TYPE: WFDESC["Workflow"] if isinstance(job, WorkflowJob) else WFDESC["Process"], |
| 745 | + "prov:type": PROV["Plan"], |
| 746 | + "prov:label": "Prospective provenance", |
| 747 | + } |
| 748 | + if "doc" in process.tool: |
| 749 | + prov_items[SCHEMA["description"]] = process.tool["doc"] |
| 750 | + if "label" in process.tool: |
| 751 | + prov_items[SCHEMA["name"]] = process.tool["label"] |
| 752 | + # # TypeError: unhashable type: 'list' |
| 753 | + # if "intent" in process.tool: |
| 754 | + # prov_items[SCHEMA["featureList"]] = convert_to_dict(process.tool["intent"]) |
| 755 | + self.document.entity("wf:main", prov_items) |
739 | 756 | if not isinstance(job, WorkflowJob):
|
740 |
| - # direct command line tool execution |
741 |
| - self.document.entity( |
742 |
| - "wf:main", |
743 |
| - { |
744 |
| - PROV_TYPE: WFDESC["Process"], |
745 |
| - "prov:type": PROV["Plan"], |
746 |
| - "prov:label": "Prospective provenance", |
747 |
| - }, |
748 |
| - ) |
749 | 757 | return
|
750 | 758 |
|
751 |
| - self.document.entity( |
752 |
| - "wf:main", |
753 |
| - { |
754 |
| - PROV_TYPE: WFDESC["Workflow"], |
755 |
| - "prov:type": PROV["Plan"], |
756 |
| - "prov:label": "Prospective provenance", |
757 |
| - }, |
758 |
| - ) |
759 |
| - |
760 | 759 | for step in job.steps:
|
761 | 760 | stepnametemp = "wf:main/" + str(step.name)[5:]
|
762 | 761 | stepname = urllib.parse.quote(stepnametemp, safe=":/,#")
|
| 762 | + provstep_items: ProvType = { |
| 763 | + PROV_TYPE: WFDESC["Process"], |
| 764 | + "prov:type": PROV["Plan"], |
| 765 | + } |
| 766 | + # WorkflowStep level annotations |
| 767 | + if "doc" in step.tool: |
| 768 | + provstep_items[SCHEMA["description"]] = step.tool["doc"] |
| 769 | + if "label" in step.tool: |
| 770 | + provstep_items[SCHEMA["name"]] = step.tool["label"] |
763 | 771 | provstep = self.document.entity(
|
764 | 772 | stepname,
|
765 |
| - {PROV_TYPE: WFDESC["Process"], "prov:type": PROV["Plan"]}, |
| 773 | + provstep_items, |
766 | 774 | )
|
767 | 775 | self.document.entity(
|
768 | 776 | "wf:main",
|
|
0 commit comments