From 303a8ac02ba6f4b11938dd7370bc6833a09e2eca Mon Sep 17 00:00:00 2001 From: Ayush Kamat Date: Wed, 25 Oct 2023 10:12:58 -0700 Subject: [PATCH] finish Signed-off-by: Ayush Kamat --- latch_cli/snakemake/workflow.py | 28 ++++++++++++++++++++++++---- 1 file changed, 24 insertions(+), 4 deletions(-) diff --git a/latch_cli/snakemake/workflow.py b/latch_cli/snakemake/workflow.py index fb4f0d32..37772cb3 100644 --- a/latch_cli/snakemake/workflow.py +++ b/latch_cli/snakemake/workflow.py @@ -163,10 +163,7 @@ def snakemake_dag_to_interface( for x in job.input: if x not in dep_outputs: param = variable_name_for_value(x, job.input) - inputs[param] = ( - LatchFile, - None, - ) + inputs[param] = (LatchFile, None) remote_path = ( Path("/.snakemake_latch") / "workflows" / wf_name / "inputs" / x @@ -744,6 +741,19 @@ def compile(self, **kwargs): else: python_outputs[param] = LatchFile + for x in job.log: + assert isinstance(x, SnakemakeInputVal) + + if x in target_files: + is_target = True + param = variable_name_for_value(x, job.log) + target_file_for_output_param[param] = x + + if x.is_directory: + python_outputs[param] = LatchDir + else: + python_outputs[param] = LatchFile + dep_outputs: Dict[SnakemakeInputVal, JobOutputInfo] = {} for dep, dep_files in self._dag.dependencies[job].items(): for o in dep.output: @@ -758,6 +768,16 @@ def compile(self, **kwargs): type_=LatchDir if o.is_directory else LatchFile, ) + for o in dep.log: + if o in dep_files: + assert isinstance(o, SnakemakeInputVal) + + dep_outputs[o] = JobOutputInfo( + jobid=dep.jobid, + output_param_name=variable_name_for_value(o, dep.log), + type_=LatchDir if o.is_directory else LatchFile, + ) + python_inputs: Dict[str, Union[Type[LatchFile], Type[LatchDir]]] = {} promise_map: Dict[str, JobOutputInfo] = {} for x in job.input: