diff --git a/src/toil/cwl/cwltoil.py b/src/toil/cwl/cwltoil.py
index 176aa5e50b..3064641519 100644
--- a/src/toil/cwl/cwltoil.py
+++ b/src/toil/cwl/cwltoil.py
@@ -51,7 +51,8 @@
     TypeVar,
     Union,
     cast,
-    Literal, Protocol,
+    Literal,
+    Protocol,
 )
 from urllib.parse import quote, unquote, urlparse, urlsplit
 
@@ -66,6 +67,7 @@
 import cwltool.main
 import cwltool.resolver
 import schema_salad.ref_resolver
+
 # This is also in configargparse but MyPy doesn't know it
 from argparse import RawDescriptionHelpFormatter
 from configargparse import ArgParser, Namespace
@@ -132,6 +134,8 @@
     unwrap,
     ImportsJob,
     get_file_sizes,
+    FileMetadata,
+    WorkerImportJob,
 )
 from toil.jobStores.abstractJobStore import (
     AbstractJobStore,
@@ -1893,12 +1897,20 @@ def extract_file_uri_once(
         return rp
     return None
 
+
 V = TypeVar("V", covariant=True)
 
+
 class VisitFunc(Protocol[V]):
-    def __call__(self, fileindex: dict[str, str], existing: dict[str, str],
-                 file_metadata: CWLObjectType, mark_broken: bool,
-                 skip_remote: bool) -> V: ...
+    def __call__(
+        self,
+        fileindex: dict[str, str],
+        existing: dict[str, str],
+        file_metadata: CWLObjectType,
+        mark_broken: bool,
+        skip_remote: bool,
+    ) -> V: ...
+
 
 def visit_files(
     func: VisitFunc[V],
@@ -2188,7 +2200,9 @@ def extract_and_convert_file_to_toil_uri(
     Unless skip_remote is set, also run on remote files and sets their
     locations to toil URIs as well.
     """
-    location = extract_file_uri_once(fileindex, existing, file_metadata, mark_broken, skip_remote)
+    location = extract_file_uri_once(
+        fileindex, existing, file_metadata, mark_broken, skip_remote
+    )
     if location is not None:
         file_metadata["location"] = convert_file_uri_to_toil_uri(
             convertfunc, fileindex, existing, location
@@ -2896,7 +2910,9 @@ def file_import_function(url: str, log_level: int = logging.DEBUG) -> FileID:
         logger.log(log_level, "Loading %s...", url)
         return writeGlobalFileWrapper(file_store, url)
 
-    file_upload_function = functools.partial(extract_and_convert_file_to_toil_uri, file_import_function)
+    file_upload_function = functools.partial(
+        extract_and_convert_file_to_toil_uri, file_import_function
+    )
 
     # Upload all the Files and set their and the Directories' locations, if
     # needed.
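Reviewer note: the `VisitFunc` change above replaces a multi-line callback annotation with a covariant `Protocol`, so mypy can check the callback's parameter names as well as its types. A minimal, self-contained sketch of that pattern, with illustrative names only (this is not Toil's API):

    from typing import Protocol, TypeVar

    V = TypeVar("V", covariant=True)

    class Visitor(Protocol[V]):
        # Any function whose parameters match by name and type satisfies this.
        def __call__(self, file_metadata: dict[str, str], mark_broken: bool) -> V: ...

    def count_fields(file_metadata: dict[str, str], mark_broken: bool) -> int:
        return len(file_metadata)

    def apply_visitor(func: Visitor[V], file_metadata: dict[str, str]) -> V:
        # Protocol checks are structural, so count_fields is accepted here.
        return func(file_metadata, mark_broken=False)

    print(apply_visitor(count_fields, {"location": "file:///tmp/x"}))  # prints 1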
@@ -2948,8 +2964,41 @@ def makeRootJob(
     :return:
     """
     if options.run_imports_on_workers:
+        filenames = extract_workflow_inputs(options, initialized_job_order, tool)
+        metadata = get_file_sizes(
+            filenames, toil._jobStore, include_remote_files=options.reference_inputs
+        )
+
+        # Mapping of files to metadata for files that will be imported on the worker
+        # This will consist of files that we were able to get a file size for
+        worker_metadata: dict[str, FileMetadata] = dict()
+        # Mapping of files to metadata for files that will be imported on the leader
+        # This will consist of files that we were not able to get a file size for
+        leader_metadata = dict()
+        for filename, file_data in metadata.items():
+            if file_data.size is None:
+                leader_metadata[filename] = file_data
+            else:
+                worker_metadata[filename] = file_data
+
+        # import the files for the leader first
+        path_to_fileid = WorkerImportJob.import_files(
+            list(leader_metadata.keys()), toil._jobStore
+        )
+
+        # then install the imported files before importing the other files
+        # this way the control flow can fall from the leader to workers
+        tool, initialized_job_order = CWLInstallImportsJob.fill_in_files(
+            initialized_job_order,
+            tool,
+            path_to_fileid,
+            options.basedir,
+            options.reference_inputs,
+            options.bypass_file_store,
+        )
+
         import_job = CWLImportWrapper(
-            initialized_job_order, tool, runtime_context, options
+            initialized_job_order, tool, runtime_context, worker_metadata, options
         )
         return import_job
     else:
@@ -3538,22 +3587,28 @@ def __init__(
         self.bypass_file_store = bypass_file_store
         self.import_data = import_data
 
-    def run(self, file_store: AbstractFileStore) -> Tuple[Process, CWLObjectType]:
+    @staticmethod
+    def fill_in_files(
+        initialized_job_order: CWLObjectType,
+        tool: Process,
+        candidate_to_fileid: dict[str, FileID],
+        basedir: str,
+        skip_remote: bool,
+        bypass_file_store: bool,
+    ) -> tuple[Process, CWLObjectType]:
         """
-        Convert the filenames in the workflow inputs into the URIs
-        :return: Promise of transformed workflow inputs. A tuple of the job order and process
+        Given a mapping of filenames to Toil file IDs, replace the filename with the file IDs throughout the CWL object.
""" - candidate_to_fileid: dict[str, FileID] = unwrap(self.import_data) - - initialized_job_order = unwrap(self.initialized_job_order) - tool = unwrap(self.tool) - - def convert_file(filename: str) -> FileID: - fileid = candidate_to_fileid[filename] - return fileid - - file_convert_function = functools.partial(extract_and_convert_file_to_toil_uri, convert_file) - fs_access = ToilFsAccess(self.basedir) + def fill_in_file(filename: str) -> FileID: + """ + Return the file name's associated Toil file ID + """ + return candidate_to_fileid[filename] + + file_convert_function = functools.partial( + extract_and_convert_file_to_toil_uri, fill_in_file + ) + fs_access = ToilFsAccess(basedir) fileindex: dict[str, str] = {} existing: dict[str, str] = {} visit_files( @@ -3563,8 +3618,8 @@ def convert_file(filename: str) -> FileID: existing, initialized_job_order, mark_broken=True, - skip_remote=self.skip_remote, - bypass_file_store=self.bypass_file_store, + skip_remote=skip_remote, + bypass_file_store=bypass_file_store, ) visitSteps( tool, @@ -3575,8 +3630,8 @@ def convert_file(filename: str) -> FileID: fileindex, existing, mark_broken=True, - skip_remote=self.skip_remote, - bypass_file_store=self.bypass_file_store, + skip_remote=skip_remote, + bypass_file_store=bypass_file_store, ), ) @@ -3588,9 +3643,26 @@ def convert_file(filename: str) -> FileID: # This will properly make them cause an error later if they # were required. rm_unprocessed_secondary_files(param_value) - return tool, initialized_job_order + def run(self, file_store: AbstractFileStore) -> Tuple[Process, CWLObjectType]: + """ + Convert the filenames in the workflow inputs into the URIs + :return: Promise of transformed workflow inputs. A tuple of the job order and process + """ + candidate_to_fileid: dict[str, FileID] = unwrap(self.import_data) + + initialized_job_order = unwrap(self.initialized_job_order) + tool = unwrap(self.tool) + return CWLInstallImportsJob.fill_in_files( + initialized_job_order, + tool, + candidate_to_fileid, + self.basedir, + self.skip_remote, + self.bypass_file_store, + ) + class CWLImportWrapper(CWLNamedJob): """ @@ -3605,6 +3677,7 @@ def __init__( initialized_job_order: CWLObjectType, tool: Process, runtime_context: cwltool.context.RuntimeContext, + file_to_data: dict[str, FileMetadata], options: Namespace, ): super().__init__(local=False, disk=options.import_workers_threshold) @@ -3612,15 +3685,14 @@ def __init__( self.tool = tool self.options = options self.runtime_context = runtime_context + self.file_to_data = file_to_data def run(self, file_store: AbstractFileStore) -> Any: - filenames = extract_workflow_inputs( - self.options, self.initialized_job_order, self.tool + imports_job = ImportsJob( + self.file_to_data, + self.options.import_workers_threshold, + self.options.import_workers_disk, ) - file_to_data = get_file_sizes( - filenames, file_store.jobStore, include_remote_files=self.options.reference_inputs - ) - imports_job = ImportsJob(file_to_data, self.options.import_workers_threshold, self.options.import_workers_disk) self.addChild(imports_job) install_imports_job = CWLInstallImportsJob( initialized_job_order=self.initialized_job_order, @@ -3634,7 +3706,9 @@ def run(self, file_store: AbstractFileStore) -> Any: imports_job.addFollowOn(install_imports_job) start_job = CWLStartJob( - install_imports_job.rv(0), install_imports_job.rv(1), runtime_context=self.runtime_context + install_imports_job.rv(0), + install_imports_job.rv(1), + runtime_context=self.runtime_context, ) self.addChild(start_job) 
         install_imports_job.addFollowOn(start_job)
 
@@ -3645,7 +3719,7 @@ def run(self, file_store: AbstractFileStore) -> Any:
 class CWLStartJob(CWLNamedJob):
     """
     Job responsible for starting the CWL workflow.
-    
+
     Takes in the workflow/tool and inputs after all files are imported and
     creates jobs to run those workflows.
     """
@@ -3744,7 +3818,10 @@ def import_workflow_inputs(
     def file_import_function(url: str) -> FileID:
         logger.log(log_level, "Loading %s...", url)
         return jobstore.import_file(url, symlink=True)
-    import_function = functools.partial(extract_and_convert_file_to_toil_uri, file_import_function)
+
+    import_function = functools.partial(
+        extract_and_convert_file_to_toil_uri, file_import_function
+    )
     # Import all the input files, some of which may be missing optional
     # files.
     logger.info("Importing input files...")
@@ -3763,8 +3840,13 @@ def file_import_function(url: str) -> FileID:
     # Make another function for importing tool files. This one doesn't allow
     # symlinking, since the tools might be coming from storage not accessible
     # to all nodes.
-    tool_import_function = functools.partial(extract_and_convert_file_to_toil_uri,
-        cast(Callable[[str], FileID], functools.partial(jobstore.import_file, symlink=False)))
+    tool_import_function = functools.partial(
+        extract_and_convert_file_to_toil_uri,
+        cast(
+            Callable[[str], FileID],
+            functools.partial(jobstore.import_file, symlink=False),
+        ),
+    )
 
     # Import all the files associated with tools (binaries, etc.).
     # Not sure why you would have an optional secondary file here, but
@@ -3795,6 +3877,8 @@ def file_import_function(url: str) -> FileID:
 
 
 T = TypeVar("T")
+
+
 def visitSteps(
     cmdline_tool: Process,
     op: Callable[[CommentedMap], list[T]],
@@ -3818,12 +3902,10 @@ def visitSteps(
         # if they bothered to run the Process __init__.
         return op(cmdline_tool.tool)
     raise RuntimeError(
-        f"Unsupported type encountered in workflow "
-        f"traversal: {type(cmdline_tool)}"
+        f"Unsupported type encountered in workflow " f"traversal: {type(cmdline_tool)}"
     )
 
-
 
 def rm_unprocessed_secondary_files(job_params: Any) -> None:
     if isinstance(job_params, list):
         for j in job_params:
@@ -4081,7 +4163,8 @@ def get_options(args: list[str]) -> Namespace:
     parser = ArgParser(
         allow_abbrev=False,
         usage="%(prog)s [options] WORKFLOW [INFILE] [WF_OPTIONS...]",
-        description=textwrap.dedent("""
+        description=textwrap.dedent(
+            """
     positional arguments:
 
       WORKFLOW            CWL file to run.
@@ -4096,10 +4179,11 @@ def get_options(args: list[str]) -> Namespace:
 
       If an input has the same name as a Toil option, pass '--' before it.
 
- """), + """ + ), formatter_class=RawDescriptionHelpFormatter, ) - + addOptions(parser, jobstore_as_flag=True, cwl=True) options: Namespace options, extra = parser.parse_known_args(args) @@ -4264,14 +4348,12 @@ def main(args: Optional[list[str]] = None, stdout: TextIO = sys.stdout) -> int: options.tool_help = None options.debug = options.logLevel == "DEBUG" - job_order_object, options.basedir, jobloader = ( - cwltool.main.load_job_order( - options, - sys.stdin, - loading_context.fetcher_constructor, - loading_context.overrides_list, - tool_file_uri, - ) + job_order_object, options.basedir, jobloader = cwltool.main.load_job_order( + options, + sys.stdin, + loading_context.fetcher_constructor, + loading_context.overrides_list, + tool_file_uri, ) if options.overrides: loading_context.overrides_list.extend( @@ -4332,7 +4414,8 @@ def main(args: Optional[list[str]] = None, stdout: TextIO = sys.stdout) -> int: if err.code == 2: # raised by argparse's parse_args() function print( "\nIf both a CWL file and an input object (YAML/JSON) file were " - "provided, the problem may be the argument order." + usage_message, + "provided, the problem may be the argument order." + + usage_message, file=sys.stderr, ) raise @@ -4345,9 +4428,9 @@ def main(args: Optional[list[str]] = None, stdout: TextIO = sys.stdout) -> int: shortname(inp["id"]) in initialized_job_order and inp["type"] == "File" ): - cast( - CWLObjectType, initialized_job_order[shortname(inp["id"])] - )["streamable"] = inp.get("streamable", False) + cast(CWLObjectType, initialized_job_order[shortname(inp["id"])])[ + "streamable" + ] = inp.get("streamable", False) # TODO also for nested types that contain streamable Files runtime_context.use_container = not options.no_container diff --git a/src/toil/job.py b/src/toil/job.py index 3f0226899b..27600ddd7e 100644 --- a/src/toil/job.py +++ b/src/toil/job.py @@ -3902,7 +3902,6 @@ def get_filename_size(filename: str) -> FileMetadata: # Now we know this exists, so pass it through # Get filesizes filesize = file_source.get_size(candidate_uri) - except UnimplementedURLException as e: # We can't find anything that can even support this URL scheme. # Report to the user, they are probably missing an extra. @@ -3913,8 +3912,12 @@ def get_filename_size(filename: str) -> FileMetadata: logger.warning( "Checked URL %s but got HTTP status %s", candidate_uri, e.code ) - # Try the next location. - continue + if e.code == 405: + # 405 Method not allowed, maybe HEAD requests are not supported + filesize = None + else: + # Try the next location. + continue except FileNotFoundError: # Wasn't found there continue @@ -3992,6 +3995,7 @@ class WorkerImportJob(Job): def __init__( self, filenames: List[str], + local: bool = False, **kwargs: Any ): """ @@ -4000,7 +4004,7 @@ def __init__( :param kwargs: args for the superclass """ self.filenames = filenames - super().__init__(local=False, **kwargs) + super().__init__(local=local, **kwargs) @staticmethod def import_files(