Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Propagate input annotations to primary.cwlprov files #1678

Draft
wants to merge 19 commits into
base: main
Choose a base branch
from
Draft
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 47 additions & 1 deletion cwltool/provenance_profile.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ def copy_job_order(
return customised_job



class ProvenanceProfile:
"""
Provenance profile.
Expand Down Expand Up @@ -296,6 +297,24 @@ def record_process_end(
self.generate_output_prov(outputs, process_run_id, process_name)
self.document.wasEndedBy(process_run_id, None, self.workflow_run_uri, when)

def _add_nested_annotations(self, annotation_key, annotation_value, e: ProvEntity) -> ProvEntity:
"""Propagate input data annotations to provenance."""
# Change https:// into http:// first
schema2_uri = "https://schema.org/"
if schema2_uri in annotation_key:
annotation_key = SCHEMA[annotation_key.replace(schema2_uri, '')].uri

if not isinstance(annotation_value, (MutableSequence, MutableMapping)):
e.add_attributes({annotation_key: str(annotation_value)})
else:
nested_id = uuid.uuid4().urn
nested_entity = self.document.entity(nested_id)
e.add_attributes({annotation_key: nested_entity.identifier})
for nested_key in annotation_value:
nested_value = annotation_value[nested_key]
RenskeW marked this conversation as resolved.
Show resolved Hide resolved
nested_entity = self._add_nested_annotations(nested_key, nested_value, nested_entity)
return e

def declare_file(self, value: CWLObjectType) -> Tuple[ProvEntity, ProvEntity, str]:
if value["class"] != "File":
raise ValueError("Must have class:File: %s" % value)
Expand Down Expand Up @@ -350,6 +369,21 @@ def declare_file(self, value: CWLObjectType) -> Tuple[ProvEntity, ProvEntity, st
file_entity.add_attributes({CWLPROV["nameext"]: value["nameext"]})
self.document.specializationOf(file_entity, entity)

# Identify all schema annotations
schema_annotations = dict([(v, value[v]) for v in value.keys() if 'schema.org' in v])
Fixed Show fixed Hide fixed

# Transfer SCHEMA annotations to provenance
for s in schema_annotations:
if "additionalType" in s:
additional_type = schema_annotations[s].split(sep='/')[-1] # find better method?
RenskeW marked this conversation as resolved.
Show resolved Hide resolved
entity.add_attributes( {PROV_TYPE: SCHEMA[additional_type]})
else: # add support for CommentedSeq
entity = self._add_nested_annotations(s, schema_annotations[s], entity)

# Transfer format annotations to provenance:
if "format" in value:
entity.add_attributes({SCHEMA["encodingFormat"]: value["format"]})

# Check for secondaries
for sec in cast(
MutableSequence[CWLObjectType], value.get("secondaryFiles", [])
Expand Down Expand Up @@ -395,6 +429,7 @@ def declare_directory(self, value: CWLObjectType) -> ProvEntity:
(PROV_TYPE, RO["Folder"]),
],
)

# ORE description of ro:Folder, saved separately
coll_b = dir_bundle.entity(
dir_id,
Expand Down Expand Up @@ -454,7 +489,18 @@ def declare_directory(self, value: CWLObjectType) -> ProvEntity:

coll.add_attributes(coll_attribs)
coll_b.add_attributes(coll_b_attribs)


# Identify all schema annotations
schema_annotations = dict([(v, value[v]) for v in value.keys() if 'schema.org' in v])
Fixed Show fixed Hide fixed

# Transfer SCHEMA annotations to provenance
for s in schema_annotations:
if "additionalType" in s:
additional_type = schema_annotations[s].split(sep='/')[-1] # find better method?
coll.add_attributes( {PROV_TYPE: SCHEMA[additional_type]})
elif "hasPart" not in s:
coll = self._add_nested_annotations(s, schema_annotations[s], coll)

# Also Save ORE Folder as annotation metadata
ore_doc = ProvDocument()
ore_doc.add_namespace(ORE)
Expand Down