diff --git a/cmflib/cli/parser.py b/cmflib/cli/parser.py index dc59407d..80a3265e 100644 --- a/cmflib/cli/parser.py +++ b/cmflib/cli/parser.py @@ -17,11 +17,11 @@ """Main parser for the cmf cli""" import argparse -from cmflib.commands import artifact, metadata, init, execution, pipeline +from cmflib.commands import artifact, metadata, init, execution, pipeline, repo from cmflib.cli import CmfParserError -COMMANDS = [artifact, metadata, init, execution, pipeline] +COMMANDS = [artifact, metadata, init, execution, pipeline, repo] def _find_parser(parser, cmd_cls): diff --git a/cmflib/cmf.py b/cmflib/cmf.py index be181c4c..19100d55 100644 --- a/cmflib/cmf.py +++ b/cmflib/cmf.py @@ -71,6 +71,8 @@ _artifact_list, _pipeline_list, _execution_list, + _repo_push, + _repo_pull, ) class Cmf: @@ -1996,42 +1998,42 @@ def commit_existing(self, uri: str, props: t.Optional[t.Dict] = None, custom_pro # print(last) # os.symlink(str(index), slicedir + "/ " + last) -def metadata_push(pipeline_name: str, filepath = "./mlmd", tensorboard_path: str = "", execution_id: str = ""): +def metadata_push(pipeline_name: str, filepath = "./mlmd", tensorboard_path: str = "", execution_uuid: str = ""): """ Pushes MLMD file to CMF-server. Example: ```python - result = metadata_push("example_pipeline", "mlmd_file", "3") + result = metadata_push("example_pipeline", "mlmd_file", "eg_execution_uuid") ``` Args: pipeline_name: Name of the pipeline. filepath: Path to the MLMD file. - execution_id: Optional execution ID. + execution_uuid: Optional execution UUID. tensorboard_path: Path to tensorboard logs. Returns: Response output from the _metadata_push function. 
""" # Required arguments: pipeline_name - # Optional arguments: Execution_ID, filepath (mlmd file path, tensorboard_path - output = _metadata_push(pipeline_name, filepath, execution_id, tensorboard_path) + # Optional arguments: Execution_UUID, filepath (mlmd file path), tensorboard_path + output = _metadata_push(pipeline_name, filepath, execution_uuid, tensorboard_path) return output -def metadata_pull(pipeline_name: str, filepath = "./mlmd", execution_id: str = ""): +def metadata_pull(pipeline_name: str, filepath = "./mlmd", execution_uuid: str = ""): """ Pulls MLMD file from CMF-server. Example: ```python - result = metadata_pull("example_pipeline", "./mlmd_directory", "execution_123") + result = metadata_pull("example_pipeline", "./mlmd_directory", "eg_execution_uuid") ``` Args: pipeline_name: Name of the pipeline. filepath: File path to store the MLMD file. - execution_id: Optional execution ID. + execution_uuid: Optional execution UUID. Returns: Message from the _metadata_pull function. """ # Required arguments: pipeline_name - #Optional arguments: Execution_ID, filepath(file path to store mlmd file) - output = _metadata_pull(pipeline_name, filepath, execution_id) + #Optional arguments: Execution_UUID, filepath(file path to store mlmd file) + output = _metadata_pull(pipeline_name, filepath, execution_uuid) return output def metadata_export(pipeline_name: str, jsonfilepath: str = "", filepath = "./mlmd"): @@ -2331,23 +2333,23 @@ def pipeline_list(filepath = "./mlmd"): return output -def execution_list(pipeline_name: str, filepath = "./mlmd", execution_id: str = ""): +def execution_list(pipeline_name: str, filepath = "./mlmd", execution_uuid: str = ""): """Displays executions from the MLMD file with a few properties in a 7-column table, limited to 20 records per page. 
Example: ```python - result = _execution_list("example_pipeline", "./mlmd_directory", "example_execution_id") + result = _execution_list("example_pipeline", "./mlmd_directory", "example_execution_uuid") ``` Args: pipeline_name: Name of the pipeline. filepath: Path to store the mlmd file. - execution_id: Executions for particular execution id. + execution_uuid: Executions for particular execution uuid. Returns: Output from the _execution_list function. """ # Required arguments: pipeline_name - # Optional arguments: filepath( path to store mlmd file), execution_id - output = _execution_list(pipeline_name, filepath, execution_id) + # Optional arguments: filepath( path to store mlmd file), execution_uuid + output = _execution_list(pipeline_name, filepath, execution_uuid) return output @@ -2369,3 +2371,44 @@ def artifact_list(pipeline_name: str, filepath = "./mlmd", artifact_name: str = # Optional arguments: filepath( path to store mlmd file), artifact_name output = _artifact_list(pipeline_name, filepath, artifact_name) return output + + +def repo_push(pipeline_name: str, filepath = "./mlmd", tensorboard_path: str = "", execution_uuid: str = ""): + """ Push artifacts, metadata files, and source code to the user's artifact repository, cmf-server, and git respectively. + Example: + ```python + result = _repo_push("example_pipeline", "./mlmd_directory", "example_execution_uuid", "./tensorboard_path") + ``` + Args: + pipeline_name: Name of the pipeline. + filepath: Path to store the mlmd file. + execution_uuid: Executions for particular execution uuid. + tensorboard_path: Path to tensorboard logs. + Returns: + Output from the _repo_push function. 
+ """ + + # Required arguments: pipeline_name + # Optional arguments: filepath, execution_uuid, tensorboard_path + output = _repo_push(pipeline_name, filepath, execution_uuid, tensorboard_path) + return output + + +def repo_pull(pipeline_name: str, filepath = "./mlmd", execution_uuid: str = ""): + """ Pull artifacts, metadata files, and source code from the user's artifact repository, cmf-server, and git respectively. + Example: + ```python + result = _repo_pull("example_pipeline", "./mlmd_directory", "example_execution_uuid") + ``` + Args: + pipeline_name: Name of the pipeline. + filepath: Path to store the mlmd file. + execution_uuid: Executions for particular execution uuid. + Returns: + Output from the _repo_pull function. + """ + + # Required arguments: pipeline_name + # Optional arguments: filepath, execution_uuid + output = _repo_pull(pipeline_name, filepath, execution_uuid) + return output \ No newline at end of file diff --git a/cmflib/cmf_commands_wrapper.py b/cmflib/cmf_commands_wrapper.py index 4e2c5325..c656f626 100644 --- a/cmflib/cmf_commands_wrapper.py +++ b/cmflib/cmf_commands_wrapper.py @@ -17,7 +17,7 @@ from cmflib import cli -def _metadata_push(pipeline_name, file_name, execution_id, tensorboard): +def _metadata_push(pipeline_name, file_name, execution_uuid, tensorboard): cli_args = cli.parse_args( [ "metadata", @@ -27,7 +27,7 @@ def _metadata_push(pipeline_name, file_name, execution_id, tensorboard): "-f", file_name, "-e", - execution_id, + execution_uuid, "-t", tensorboard ] @@ -37,7 +37,7 @@ def _metadata_push(pipeline_name, file_name, execution_id, tensorboard): print(msg) return msg -def _metadata_pull(pipeline_name, file_name, execution_id): +def _metadata_pull(pipeline_name, file_name, execution_uuid): cli_args = cli.parse_args( [ "metadata", @@ -47,7 +47,7 @@ def _metadata_pull(pipeline_name, file_name, execution_id): "-f", file_name, "-e", - execution_id, + execution_uuid, ] ) cmd = cli_args.func(cli_args) @@ -316,7 +316,7 @@ def 
_pipeline_list(file_name): print(msg) return msg -def _execution_list(pipeline_name, file_name, execution_id): cli_args = cli.parse_args( [ "execution", @@ -326,7 +326,45 @@ def _execution_list(pipeline_name, file_name, execution_uuid): "-f", file_name, "-e", - execution_id + execution_uuid ] ) cmd = cli_args.func(cli_args) msg = cmd.do_run() print(msg) return msg + +def _repo_push(pipeline_name, file_name, execution_uuid, tensorboard_path): + cli_args = cli.parse_args( + [ + "repo", + "push", + "-p", + pipeline_name, + "-f", + file_name, + "-e", + execution_uuid, + "-t", + tensorboard_path + ] + ) + cmd = cli_args.func(cli_args) + msg = cmd.do_run() + print(msg) + return msg + +def _repo_pull(pipeline_name, file_name, execution_uuid): + cli_args = cli.parse_args( + [ + "repo", + "pull", + "-p", + pipeline_name, + "-f", + file_name, + "-e", + execution_uuid ] ) cmd = cli_args.func(cli_args) msg = cmd.do_run() print(msg) return msg diff --git a/cmflib/cmf_exception_handling.py b/cmflib/cmf_exception_handling.py index 5cc80678..12b63013 100644 --- a/cmflib/cmf_exception_handling.py +++ b/cmflib/cmf_exception_handling.py @@ -201,13 +201,13 @@ def handle(self): return f"ERROR: Executions not found." -class ExecutionIDNotFound(CmfFailure): - def __init__(self, exec_id, return_code=105): - self.exec_id = exec_id +class ExecutionUUIDNotFound(CmfFailure): + def __init__(self, exec_uuid, return_code=105): + self.exec_uuid = exec_uuid super().__init__(return_code) def handle(self): - return f"ERROR: Execution id {self.exec_id} is not present in mlmd." + return f"ERROR: Execution uuid {self.exec_uuid} is not present in mlmd." 
class ArtifactNotFound(CmfFailure): diff --git a/cmflib/cmf_merger.py b/cmflib/cmf_merger.py index 82c6082d..978ff8e4 100644 --- a/cmflib/cmf_merger.py +++ b/cmflib/cmf_merger.py @@ -23,7 +23,7 @@ from ml_metadata.proto import metadata_store_pb2 as mlpb from typing import Union -def parse_json_to_mlmd(mlmd_json, path_to_store: str, cmd: str, exec_id: Union[str, int]) -> Union[str, None]: +def parse_json_to_mlmd(mlmd_json, path_to_store: str, cmd: str, exec_uuid: Union[str, None]) -> Union[str, None]: try: mlmd_data = json.loads(mlmd_json) pipelines = mlmd_data["Pipeline"] @@ -52,17 +52,16 @@ def parse_json_to_mlmd(mlmd_json, path_to_store: str, cmd: str, exec_uuid: Union[s graph=graph, is_server=True) for stage in data["Pipeline"][0]["stages"]: # Iterates over all the stages - if exec_id is None: #if exec_id is None we pass all the executions. + if exec_uuid is None: #if exec_uuid is None we pass all the executions. list_executions = [execution for execution in stage["executions"]] - elif exec_id is not None: # elif exec_id is not None, we pass executions for that specific id. + elif exec_uuid is not None: # elif exec_uuid is not None, we pass executions for that specific uuid. list_executions = [ execution for execution in stage["executions"] - if execution["id"] == int(exec_id) + if exec_uuid in execution['properties']["Execution_uuid"].split(",") ] else: - return "Invalid execution id given." - + return "Invalid execution uuid given." for execution in list_executions: # Iterates over all the executions try: _ = cmf_class.merge_created_context( diff --git a/cmflib/cmfquery.py b/cmflib/cmfquery.py index 47d2ac9f..68f613a8 100644 --- a/cmflib/cmfquery.py +++ b/cmflib/cmfquery.py @@ -244,7 +244,7 @@ def _get_executions(self, stage_id: int, execution_id: t.Optional[int] = None) - Args: stage_id: Stage identifier. - execution_id: If not None, return only execution with this ID. + execution_id: If not None, return execution with this ID. 
Returns: List of executions matching input parameters. """ @@ -889,17 +889,14 @@ def get_one_hop_parent_artifacts_with_id(self, artifact_id: int) -> pd.DataFrame ) return df - def dumptojson(self, pipeline_name: str, exec_id: t.Optional[int] = None) -> t.Optional[str]: + def dumptojson(self, pipeline_name: str, exec_uuid: t.Optional[str] = None) -> t.Optional[str]: """Return JSON-parsable string containing details about the given pipeline. Args: pipeline_name: Name of an AI pipelines. - exec_id: Optional stage execution ID - filter stages by this execution ID. + exec_uuid: Optional stage execution_uuid - filter stages by this execution_uuid. Returns: Pipeline in JSON format. """ - if exec_id is not None: - exec_id = int(exec_id) - def _get_node_attributes(_node: t.Union[mlpb.Context, mlpb.Execution, mlpb.Event], _attrs: t.Dict) -> t.Dict: for attr in CONTEXT_LIST: #Artifacts getattr call on Type was giving empty string, which was overwriting @@ -921,7 +918,7 @@ def _get_node_attributes(_node: t.Union[mlpb.Context, mlpb.Execution, mlpb.Event pipeline_attrs = _get_node_attributes(pipeline, {"stages": []}) for stage in self._get_stages(pipeline.id): stage_attrs = _get_node_attributes(stage, {"executions": []}) - for execution in self._get_executions(stage.id, execution_id=exec_id): + for execution in self.get_all_executions_by_uuid(stage.id, execution_uuid=exec_uuid): # name will be an empty string for executions that are created with # create new execution as true(default) # In other words name property will there only for execution @@ -982,6 +979,24 @@ def get_all_executions_for_artifact_id(self, artifact_id: int) -> pd.DataFrame: except: return df return df + + def get_all_executions_by_uuid(self, stage_id: int, execution_uuid: t.Optional[str] = None) -> t.List[mlpb.Execution]: + """Return executions of the given stage. + Args: + stage_id: Stage identifier. + execution_uuid: If not None, return execution with this uuid. 
+ Returns: + List of executions matching input parameters. + """ + executions: t.List[mlpb.Execution] = self.store.get_executions_by_context(stage_id) + if execution_uuid is None: + return executions + executions_with_uuid = [] + for execution in executions: + exec_uuid_list = execution.properties['Execution_uuid'].string_value.split(",") + if execution_uuid in exec_uuid_list: + executions_with_uuid.append(execution) + return executions_with_uuid """def materialize(self, artifact_name:str): artifacts = self.store.get_artifacts() diff --git a/cmflib/commands/artifact/list.py b/cmflib/commands/artifact/list.py index 9670d040..8f1b5e75 100644 --- a/cmflib/commands/artifact/list.py +++ b/cmflib/commands/artifact/list.py @@ -135,16 +135,23 @@ def search_artifact(self, df: pd.DataFrame) -> Union[int, List[int]]: return -1 def run(self): - + cmd_args = { + "file_name": self.args.file_name, + "pipeline_name": self.args.pipeline_name, + "artifact_name": self.args.artifact_name + } + for arg_name, arg_value in cmd_args.items(): + if arg_value: + if arg_value[0] == "": + raise MissingArgument(arg_name) + elif len(arg_value) > 1: + raise DuplicateArgumentNotAllowed(arg_name,("-"+arg_name[0])) + # default path for mlmd file name mlmd_file_name = "./mlmd" current_directory = os.getcwd() if not self.args.file_name: # If self.args.file_name is None or an empty list ([]). mlmd_file_name = "./mlmd" # Default path for mlmd file name. - elif len(self.args.file_name) > 1: # If the user provided more than one file name. - raise DuplicateArgumentNotAllowed("file_name", "-f") - elif not self.args.file_name[0]: # self.args.file_name[0] is an empty string (""). - raise MissingArgument("file name") else: mlmd_file_name = self.args.file_name[0].strip() if mlmd_file_name == "mlmd": @@ -152,17 +159,12 @@ def run(self): current_directory = os.path.dirname(mlmd_file_name) if not os.path.exists(mlmd_file_name): raise FileNotFound(mlmd_file_name, current_directory) + # Creating cmfquery object. 
query = cmfquery.CmfQuery(mlmd_file_name) # Check if pipeline exists in mlmd. - if self.args.pipeline_name is not None and len(self.args.pipeline_name) > 1: - raise DuplicateArgumentNotAllowed("pipeline_name", "-p") - elif not self.args.pipeline_name[0]: # self.args.pipeline_name[0] is an empty string (""). - raise MissingArgument("pipeline name") - else: - pipeline_name = self.args.pipeline_name[0] - + pipeline_name = self.args.pipeline_name[0] df = query.get_all_artifacts_by_context(pipeline_name) if df.empty: @@ -170,10 +172,6 @@ def run(self): else: if not self.args.artifact_name: # If self.args.artifact_name is None or an empty list ([]). pass - elif len(self.args.artifact_name) > 1: # If the user provided more than one artifact_name. - raise DuplicateArgumentNotAllowed("artifact_name", "-a") - elif not self.args.artifact_name[0]: # self.args.artifact_name[0] is an empty string (""). - raise MissingArgument("artifact name") else: artifact_ids = self.search_artifact(df) if(artifact_ids != -1): @@ -223,7 +221,7 @@ def run(self): break return MsgSuccess(msg_str = "End of records..") else: - raise ArtifactNotFound(self.args.artifact_name) + raise ArtifactNotFound(self.args.artifact_name[0]) df = self.convert_to_datetime(df, "create_time_since_epoch") self.display_table(df) diff --git a/cmflib/commands/artifact/pull.py b/cmflib/commands/artifact/pull.py index 8305be1e..c5babd10 100644 --- a/cmflib/commands/artifact/pull.py +++ b/cmflib/commands/artifact/pull.py @@ -46,7 +46,6 @@ from cmflib.cmf_exception_handling import CmfNotConfigured class CmdArtifactPull(CmdBase): - def split_url_pipeline(self, url: str, pipeline_name: str): # This function takes url and pipeline_name as a input parameter # return string which contains the artifact repo path of the artifact @@ -81,14 +80,11 @@ def extract_repo_args(self, type: str, name: str, url: str, current_directory: s # information from the user-supplied arguments. 
# url = Test-env:/home/user/local-storage/files/md5/06/d100ff3e04e2c87bf20f0feacc9034, # Second-env:/home/user/local-storage/files/md5/06/d100ff3e04e2c" - # s_url = Url without pipeline name - s_url = self.split_url_pipeline(url, self.args.pipeline_name) - + s_url = self.split_url_pipeline(url, self.args.pipeline_name[0]) # got url in the form of /home/user/local-storage/files/md5/06/d100ff3e04e2c # spliting url using '/' delimiter token = s_url.split("/") - # name = artifacts/model/model.pkl name = name.split(":")[0] if type == "minio": @@ -116,15 +112,12 @@ def extract_repo_args(self, type: str, name: str, url: str, current_directory: s elif type == "local": token_length = len(token) download_loc = current_directory + "/" + name - # local artifact repo path = local-storage/files/md5/23/69v2uu3jeejjeiw. # token is a list = ['local-storage', 'files', 'md5', '23', '69v2uu3jeejjeiw'] # get last 4 element inside token token = token[(token_length-4):] - # join last 4 token using '/' delimiter current_dvc_loc = "/".join(token) - return current_dvc_loc, download_loc elif type == "ssh": @@ -160,39 +153,73 @@ def extract_repo_args(self, type: str, name: str, url: str, current_directory: s return "", "", "" def search_artifact(self, input_dict, remote): + flag = True + artifact_name = self.args.artifact_name[0] + # This function takes input_dict as input artifact for name, url in input_dict.items(): if not isinstance(url, str): continue - # Splitting the 'name' using ':' as the delimiter and storing the first argument in the 'name' variable. - name = name.split(":")[0] + # Splitting the 'name' using ':' as the delimiter and storing the first argument in the 'file_path' variable. + # eg name = ./a/data.xml.gz:12345abcd --> a/data.xml.gz + file_path = name.split(":")[0] # Splitting the path on '/' to extract the file name, excluding the directory structure. 
- file_name = name.split('/')[-1] - if file_name == self.args.artifact_name and remote == "osdf": + # eg name = ./a/data.xml.gz --> data.xml.gz + file_name = file_path.split('/')[-1] + + if remote == "osdf" and (name == artifact_name or file_path == artifact_name or file_name == artifact_name): artifact_hash = name = name.split(":")[1] return name, url, artifact_hash - else: - return name, url + elif name == artifact_name or file_path == artifact_name or file_name == artifact_name: + flag = False + break + if flag: + raise ArtifactNotFound(artifact_name) + return name, url def run(self): + output = DvcConfig.get_dvc_config() # pulling dvc config + if type(output) is not dict: + raise CmfNotConfigured(output) + + cmd_args = { + "file_name": self.args.file_name, + "pipeline_name": self.args.pipeline_name, + "artifact_name": self.args.artifact_name + } + for arg_name, arg_value in cmd_args.items(): + if arg_value: + if arg_value[0] == "": + raise MissingArgument(arg_name) + elif len(arg_value) > 1: + raise DuplicateArgumentNotAllowed(arg_name,("-"+arg_name[0])) + # check whether 'mlmd' file exist in current directory # or in the directory provided by user # pipeline_name = self.args.pipeline_name current_directory = os.getcwd() mlmd_file_name = "./mlmd" - if self.args.file_name: - mlmd_file_name = self.args.file_name - if mlmd_file_name == "mlmd": - mlmd_file_name = "./mlmd" - current_directory = os.path.dirname(mlmd_file_name) + if not self.args.file_name: # If self.args.file_name is None or an empty list ([]). + mlmd_file_name = "./mlmd" # Default path for mlmd file name. + else: + mlmd_file_name = self.args.file_name[0].strip() + if "/" not in mlmd_file_name: + mlmd_file_name = "./"+mlmd_file_name + current_directory = os.path.dirname(mlmd_file_name) + + if not self.args.artifact_name: # If self.args.artifact_name[0] is None or an empty list ([]). 
+ pass + if not os.path.exists(mlmd_file_name): #checking if MLMD files exists raise FileNotFound(mlmd_file_name, current_directory) query = cmfquery.CmfQuery(mlmd_file_name) - if not query.get_pipeline_id(self.args.pipeline_name) > 0: #checking if pipeline name exists in mlmd - raise PipelineNotFound(self.args.pipeline_name) + + if not query.get_pipeline_id(self.args.pipeline_name[0]) > 0: #checking if pipeline name exists in mlmd + raise PipelineNotFound(self.args.pipeline_name[0]) + # getting all pipeline stages[i.e Prepare, Featurize, Train and Evaluate] - stages = query.get_pipeline_stages(self.args.pipeline_name) + stages = query.get_pipeline_stages(self.args.pipeline_name[0]) executions = [] identifiers = [] for stage in stages: @@ -220,12 +247,9 @@ def run(self): #print(name_url_dict) # name_url_dict = ('artifacts/parsed/test.tsv:6f597d341ceb7d8fbbe88859a892ef81', 'Test-env:/home/sharvark/local-storage/6f/597d341ceb7d8fbbe88859a892ef81' # name_url_dict = ('artifacts/parsed/test.tsv:6f597d341ceb7d8fbbe88859a892ef81', 'Test-env:/home/sharvark/local-storage/6f/597d341ceb7d8fbbe88859a892ef81,Second-env:/home/sharvark/local-storage/6f/597d341ceb7d8fbbe88859a892ef81') - output = DvcConfig.get_dvc_config() # pulling dvc config - if type(output) is not dict: - raise CmfNotConfigured(output) """ There are multiple scenarios for cmf artifact pull - Code checks if self.args.artifact_name is provided by user or not + Code checks if self.args.artifact_name[0] is provided by user or not under these conditions there are two more conditions 1. if file is not .dir (single file) Download single file @@ -243,43 +267,40 @@ def run(self): # output[0] = artifact_name # output[1] = url # output[2] = hash - if output is None: - raise ArtifactNotFound(self.args.artifact_name) - else: - # Extract repository arguments specific to MinIO. - minio_args = self.extract_repo_args("minio", output[0], output[1], current_directory) + # Extract repository arguments specific to MinIO. 
+ minio_args = self.extract_repo_args("minio", output[0], output[1], current_directory) - # Check if the object name doesn't end with `.dir` (indicating it's a file). - if not minio_args[1].endswith(".dir"): - # Download a single file from MinIO. - object_name, download_loc, download_flag = minio_class_obj.download_file( - current_directory, - minio_args[0], # bucket_name - minio_args[1], # object_name - minio_args[2], # path_name - ) - if download_flag: - # Return success if the file is downloaded successfully. - return ObjectDownloadSuccess(object_name, download_loc) - else: - return ObjectDownloadFailure(object_name) + # Check if the object name doesn't end with `.dir` (indicating it's a file). + if not minio_args[1].endswith(".dir"): + # Download a single file from MinIO. + object_name, download_loc, download_flag = minio_class_obj.download_file( + current_directory, + minio_args[0], # bucket_name + minio_args[1], # object_name + minio_args[2], # path_name + ) + if download_flag: + # Return success if the file is downloaded successfully. + return ObjectDownloadSuccess(object_name, download_loc) + else: + raise ObjectDownloadFailure(object_name) + else: + # If object name ends with `.dir`, download multiple files from a directory + # return total_files_in_directory, files_downloaded + total_files_in_directory, dir_files_downloaded, download_flag = minio_class_obj.download_directory( + current_directory, + minio_args[0], # bucket_name + minio_args[1], # object_name + minio_args[2], # path_name + ) + + if download_flag: + # Return success if all files in the directory are downloaded. 
+ return BatchDownloadSuccess(dir_files_downloaded) else: - # If object name ends with `.dir`, download multiple files from a directory - # return total_files_in_directory, files_downloaded - total_files_in_directory, dir_files_downloaded, download_flag = minio_class_obj.download_directory( - current_directory, - minio_args[0], # bucket_name - minio_args[1], # object_name - minio_args[2], # path_name - ) - - if download_flag: - # Return success if all files in the directory are downloaded. - return BatchDownloadSuccess(dir_files_downloaded) - else: - # Calculate the number of files that failed to download. - file_failed_to_download = total_files_in_directory - dir_files_downloaded - return BatchDownloadFailure(dir_files_downloaded, file_failed_to_download) + # Calculate the number of files that failed to download. + file_failed_to_download = total_files_in_directory - dir_files_downloaded + raise BatchDownloadFailure(dir_files_downloaded, file_failed_to_download) else: # Handle the case where no specific artifact name is provided. @@ -328,7 +349,7 @@ def run(self): if not files_failed_to_download: return BatchDownloadSuccess(files_downloaded) else: - return BatchDownloadFailure(files_downloaded, files_failed_to_download) + raise BatchDownloadFailure(files_downloaded, files_failed_to_download) elif dvc_config_op["core.remote"] == "local-storage": local_class_obj = local_artifacts.LocalArtifacts(dvc_config_op) @@ -343,36 +364,32 @@ def run(self): output = self.search_artifact(name_url_dict, dvc_config_op["core.remote"]) # output[0] = name # output[1] = url - - if output is None: - raise ArtifactNotFound(self.args.artifact_name) + # Extract repository arguments specific to Local repo. + local_args = self.extract_repo_args("local", output[0], output[1], current_directory) + # local_args [0] = current_dvc_loc + # local_args [1] = download_loc + # Check if the object name doesn't end with `.dir` (indicating it's a file). 
+ if not local_args[0].endswith(".dir"): + # Download a single file from Local. + object_name, download_loc, download_flag = local_class_obj.download_file(current_directory, local_args[0], local_args[1]) + if download_flag: + # Return success if the file is downloaded successfully. + return ObjectDownloadSuccess(object_name, download_loc) + else: + raise ObjectDownloadFailure(object_name) + else: - # Extract repository arguments specific to Local repo. - local_args = self.extract_repo_args("local", output[0], output[1], current_directory) - # local_args [0] = current_dvc_loc - # local_args [1] = download_loc - # Check if the object name doesn't end with `.dir` (indicating it's a file). - if not local_args[0].endswith(".dir"): - # Download a single file from Local. - object_name, download_loc, download_flag = local_class_obj.download_file(current_directory, local_args[0], local_args[1]) - if download_flag: - # Return success if the file is downloaded successfully. - return ObjectDownloadSuccess(object_name, download_loc) - else: - return ObjectDownloadFailure(object_name) - + # If object name ends with `.dir`, download multiple files from a directory + # return total_files_in_directory, files_downloaded + total_files_in_directory, dir_files_downloaded, download_flag = local_class_obj.download_directory(current_directory, local_args[0], local_args[1]) + + if download_flag: + # Return success if all files in the directory are downloaded. + return BatchDownloadSuccess(dir_files_downloaded) else: - # If object name ends with `.dir`, download multiple files from a directory - # return total_files_in_directory, files_downloaded - total_files_in_directory, dir_files_downloaded, download_flag = local_class_obj.download_directory(current_directory, local_args[0], local_args[1]) - - if download_flag: - # Return success if all files in the directory are downloaded. 
- return BatchDownloadSuccess(dir_files_downloaded) - else: - # Calculate the number of files that failed to download. - file_failed_to_download = total_files_in_directory - dir_files_downloaded - return BatchDownloadFailure(dir_files_downloaded, file_failed_to_download) + # Calculate the number of files that failed to download. + file_failed_to_download = total_files_in_directory - dir_files_downloaded + raise BatchDownloadFailure(dir_files_downloaded, file_failed_to_download) else: # Handle the case where no specific artifact name is provided. files_downloaded = 0 @@ -418,7 +435,7 @@ def run(self): if not files_failed_to_download: return BatchDownloadSuccess(files_downloaded) else: - return BatchDownloadFailure( + raise BatchDownloadFailure( files_downloaded, files_failed_to_download) elif dvc_config_op["core.remote"] == "ssh-storage": @@ -429,42 +446,39 @@ def run(self): output = self.search_artifact(name_url_dict, dvc_config_op["core.remote"]) # output[0] = name # output[1] = url - if output is None: - raise ArtifactNotFound(self.args.artifact_name) + # Extract repository arguments specific to ssh-remote. + args = self.extract_repo_args("ssh", output[0], output[1], current_directory) + # Check if the object name doesn't end with `.dir` (indicating it's a file). + if not args[1].endswith(".dir"): + # Download a single file from ssh-remote. + object_name, download_loc, download_flag = sshremote_class_obj.download_file( + args[0], # host, + current_directory, + args[1], # remote_loc of the artifact + args[2] # name + ) + if download_flag: + # Return success if the file is downloaded successfully. + return ObjectDownloadSuccess(object_name, download_loc) + else: + raise ObjectDownloadFailure(object_name) + else: - # Extract repository arguments specific to ssh-remote. - args = self.extract_repo_args("ssh", output[0], output[1], current_directory) - # Check if the object name doesn't end with `.dir` (indicating it's a file). 
- if not args[1].endswith(".dir"): - # Download a single file from ssh-remote. - object_name, download_loc, download_flag = sshremote_class_obj.download_file( - args[0], # host, - current_directory, - args[1], # remote_loc of the artifact - args[2] # name + # If object name ends with `.dir`, download multiple files from a directory + # return total_files_in_directory, files_downloaded + total_files_in_directory, dir_files_downloaded, download_flag = sshremote_class_obj.download_directory( + args[0], # host, + current_directory, + args[1], # remote_loc of the artifact + args[2] # name ) - if download_flag: - # Return success if the file is downloaded successfully. - return ObjectDownloadSuccess(object_name, download_loc) - else: - return ObjectDownloadFailure(object_name) - - else: - # If object name ends with `.dir`, download multiple files from a directory - # return total_files_in_directory, files_downloaded - total_files_in_directory, dir_files_downloaded, download_flag = sshremote_class_obj.download_directory( - args[0], # host, - current_directory, - args[1], # remote_loc of the artifact - args[2] # name - ) - if download_flag: - # Return success if all files in the directory are downloaded. - return BatchDownloadSuccess(dir_files_downloaded) - else: - # Calculate the number of files that failed to download. - file_failed_to_download = total_files_in_directory - dir_files_downloaded - return BatchDownloadFailure(dir_files_downloaded, file_failed_to_download) + if download_flag: + # Return success if all files in the directory are downloaded. + return BatchDownloadSuccess(dir_files_downloaded) + else: + # Calculate the number of files that failed to download. + file_failed_to_download = total_files_in_directory - dir_files_downloaded + raise BatchDownloadFailure(dir_files_downloaded, file_failed_to_download) else: # Handle the case where no specific artifact name is provided. 
files_downloaded = 0 @@ -508,7 +522,7 @@ def run(self): if not files_failed_to_download: return BatchDownloadSuccess(files_downloaded) else: - return BatchDownloadFailure(files_downloaded, files_failed_to_download) + raise BatchDownloadFailure(files_downloaded, files_failed_to_download) elif dvc_config_op["core.remote"] == "osdf": #Regenerate Token for OSDF from cmflib.utils.helper_functions import generate_osdf_token @@ -537,25 +551,22 @@ def run(self): # output[0] = name # output[1] = url # output[3]=artifact_hash - if output is None: - raise ArtifactNotFound(self.args.artifact_name) + args = self.extract_repo_args("osdf", output[0], output[1], current_directory) + download_flag, message = osdfremote_class_obj.download_artifacts( + dvc_config_op, + args[0], # s_url of the artifact + cache_path, + current_directory, + args[1], # download_loc of the artifact + args[2], # name of the artifact + output[3] #Artifact Hash + ) + + if download_flag : + status = MsgSuccess(msg_str = message) else: - args = self.extract_repo_args("osdf", output[0], output[1], current_directory) - download_flag, message = osdfremote_class_obj.download_artifacts( - dvc_config_op, - args[0], # s_url of the artifact - cache_path, - current_directory, - args[1], # download_loc of the artifact - args[2], # name of the artifact - output[3] #Artifact Hash - ) - - if download_flag : - status = MsgSuccess(msg_str = message) - else: - status = MsgFailure(msg_str = message) - return status + status = MsgFailure(msg_str = message) + return status else: for name, url in name_url_dict.items(): total_files_count += 1 @@ -593,35 +604,31 @@ def run(self): output = self.search_artifact(name_url_dict, dvc_config_op["core.remote"]) # output[0] = name # output[1] = url - if output is None: - raise ArtifactNotFound(self.args.artifact_name) - else: - args = self.extract_repo_args("amazons3", output[0], output[1], current_directory) - if args[0] and args[1] and args[2]: - if not args[1].endswith(".dir"): - 
object_name, download_loc, download_flag = amazonS3_class_obj.download_file( - current_directory, - args[0], # bucket_name - args[1], # object_name - args[2], # download_loc - ) - if download_flag: - return ObjectDownloadSuccess(object_name, download_loc) - else: - return ObjectDownloadFailure(object_name) - else: - total_files_in_directory, dir_files_downloaded, download_flag = amazonS3_class_obj.download_directory(current_directory, - args[0], # bucket_name - args[1], # object_name - args[2], # download_loc - ) + args = self.extract_repo_args("amazons3", output[0], output[1], current_directory) + if args[0] and args[1] and args[2]: + if not args[1].endswith(".dir"): + object_name, download_loc, download_flag = amazonS3_class_obj.download_file( + current_directory, + args[0], # bucket_name + args[1], # object_name + args[2], # download_loc + ) if download_flag: - return BatchDownloadSuccess(dir_files_downloaded) - else: - file_failed_to_download = total_files_in_directory - dir_files_downloaded - return BatchDownloadFailure(dir_files_downloaded, file_failed_to_download) - - + return ObjectDownloadSuccess(object_name, download_loc) + else: + return ObjectDownloadFailure(object_name) + else: + total_files_in_directory, dir_files_downloaded, download_flag = amazonS3_class_obj.download_directory(current_directory, + args[0], # bucket_name + args[1], # object_name + args[2], # download_loc + ) + if download_flag: + return BatchDownloadSuccess(dir_files_downloaded) + else: + file_failed_to_download = total_files_in_directory - dir_files_downloaded + raise BatchDownloadFailure(dir_files_downloaded, file_failed_to_download) + else: files_downloaded = 0 files_failed_to_download = 0 @@ -661,11 +668,11 @@ def run(self): if not files_failed_to_download: return BatchDownloadSuccess(files_downloaded) else: - return BatchDownloadFailure(files_downloaded, files_failed_to_download) + raise BatchDownloadFailure(files_downloaded, files_failed_to_download) else: remote = 
dvc_config_op["core.remote"] msg = f"{remote} is not valid artifact repository for CMF.\n Reinitialize CMF." - return msg + raise MsgFailure(msg_str=msg) def add_parser(subparsers, parent_parser): @@ -685,17 +692,17 @@ def add_parser(subparsers, parent_parser): "-p", "--pipeline_name", required=True, + action="append", help="Specify Pipeline name.", metavar="", ) parser.add_argument( - "-f", "--file_name", help="Specify mlmd file name.", metavar="" + "-f", "--file_name", action="append", help="Specify mlmd file name.", metavar="" ) parser.add_argument( - "-a", "--artifact_name", help="Specify artifact name.", metavar="" + "-a", "--artifact_name", action="append", help="Specify artifact name.", metavar="" ) parser.set_defaults(func=CmdArtifactPull) - diff --git a/cmflib/commands/artifact/push.py b/cmflib/commands/artifact/push.py index 9360bd77..998373d2 100644 --- a/cmflib/commands/artifact/push.py +++ b/cmflib/commands/artifact/push.py @@ -28,7 +28,15 @@ from cmflib.dvc_wrapper import dvc_add_attribute from cmflib.cli.utils import find_root from cmflib.utils.cmf_config import CmfConfig -from cmflib.cmf_exception_handling import PipelineNotFound, Minios3ServerInactive, FileNotFound, ExecutionsNotFound, CmfNotConfigured, ArtifactPushSuccess, MissingArgument, DuplicateArgumentNotAllowed +from cmflib.cmf_exception_handling import ( + PipelineNotFound, Minios3ServerInactive, + FileNotFound, + ExecutionsNotFound, + CmfNotConfigured, + ArtifactPushSuccess, + MissingArgument, + DuplicateArgumentNotAllowed +) class CmdArtifactPush(CmdBase): def run(self): @@ -43,6 +51,16 @@ def run(self): if output.find("'cmf' is not configured.") != -1: raise CmfNotConfigured(output) + cmd_args = { + "file_name": self.args.file_name, + "pipeline_name": self.args.pipeline_name + } + for arg_name, arg_value in cmd_args.items(): + if arg_value: + if arg_value[0] == "": + raise MissingArgument(arg_name) + elif len(arg_value) > 1: + raise DuplicateArgumentNotAllowed(arg_name,("-"+arg_name[0])) 
 out_msg = check_minio_server(dvc_config_op) if dvc_config_op["core.remote"] == "minio" and out_msg != "SUCCESS": @@ -61,20 +79,22 @@ def run(self): return result # Default path of mlmd file - mlmd_file_name = "./mlmd" current_directory = os.getcwd() - if self.args.file_name: - mlmd_file_name = self.args.file_name + if not self.args.file_name: # If self.args.file_name is None or an empty list ([]). + mlmd_file_name = "./mlmd" # Default path for mlmd file name. + else: + mlmd_file_name = self.args.file_name[0].strip() if mlmd_file_name == "mlmd": mlmd_file_name = "./mlmd" - current_directory = os.path.dirname(mlmd_file_name) + current_directory = os.path.dirname(mlmd_file_name) if not os.path.exists(mlmd_file_name): raise FileNotFound(mlmd_file_name, current_directory) + # creating cmfquery object query = cmfquery.CmfQuery(mlmd_file_name) - # Put a check to see whether pipline exists or not - pipeline_name = self.args.pipeline_name + pipeline_name = self.args.pipeline_name[0] + # Put a check to see whether pipeline exists or not if not query.get_pipeline_id(pipeline_name) > 0: raise PipelineNotFound(pipeline_name) @@ -124,7 +144,7 @@ def run(self): #print("file_set = ", final_list) result = dvc_push(list(final_list)) return ArtifactPushSuccess(result) - + def add_parser(subparsers, parent_parser): HELP = "Push artifacts to the user configured artifact repo." 
@@ -142,12 +162,17 @@ def add_parser(subparsers, parent_parser): "-p", "--pipeline_name", required=True, + action="append", help="Specify Pipeline name.", metavar="", ) parser.add_argument( - "-f", "--file_name", help="Specify mlmd file name.", metavar="" + "-f", + "--file_name", + action="append", + help="Specify mlmd file name.", + metavar="" ) parser.set_defaults(func=CmdArtifactPush) diff --git a/cmflib/commands/execution/list.py b/cmflib/commands/execution/list.py index 4771c777..146775b7 100644 --- a/cmflib/commands/execution/list.py +++ b/cmflib/commands/execution/list.py @@ -28,7 +28,7 @@ DuplicateArgumentNotAllowed, MissingArgument, MsgSuccess, - ExecutionsNotFound + ExecutionUUIDNotFound ) class CmdExecutionList(CmdBase): @@ -81,13 +81,21 @@ def display_table(self, df: pd.DataFrame) -> None: start_index = end_index def run(self): + cmd_args = { + "file_name": self.args.file_name, + "pipeline_name": self.args.pipeline_name, + "execution_uuid": self.args.execution_uuid + } + for arg_name, arg_value in cmd_args.items(): + if arg_value: + if arg_value[0] == "": + raise MissingArgument(arg_name) + elif len(arg_value) > 1: + raise DuplicateArgumentNotAllowed(arg_name,("-"+arg_name[0])) + current_directory = os.getcwd() if not self.args.file_name: # If self.args.file_name is None or an empty list ([]). mlmd_file_name = "./mlmd" # Default path for mlmd file name. - elif len(self.args.file_name) > 1: # If the user provided more than one file name. - raise DuplicateArgumentNotAllowed("file_name", "-f") - elif not self.args.file_name[0]: # self.args.file_name[0] is an empty string (""). - raise MissingArgument("file name") else: mlmd_file_name = self.args.file_name[0].strip() if mlmd_file_name == "mlmd": @@ -100,14 +108,7 @@ def run(self): # Creating cmfquery object. query = cmfquery.CmfQuery(mlmd_file_name) - # Check if pipeline exists in mlmd. 
- if self.args.pipeline_name is not None and len(self.args.pipeline_name) > 1: - raise DuplicateArgumentNotAllowed("pipeline_name", "-p") - elif not self.args.pipeline_name[0]: # self.args.pipeline_name[0] is an empty string (""). - raise MissingArgument("pipeline name") - else: - pipeline_name = self.args.pipeline_name[0] - + pipeline_name = self.args.pipeline_name[0] df = query.get_all_executions_in_pipeline(pipeline_name) # Check if the DataFrame is empty, indicating the pipeline name does not exist. @@ -119,47 +120,41 @@ def run(self): df = df.drop(['Python_Env'], axis=1) # Type of df is series of integers. # Process execution ID if provided - if not self.args.execution_id: # If self.args.execution_id is None or an empty list ([]). + if not self.args.execution_uuid: # If self.args.execution_uuid is None or an empty list ([]). pass - elif len(self.args.execution_id) > 1: # If the user provided more than one execution_id. - raise DuplicateArgumentNotAllowed("execution_id", "-e") - elif not self.args.execution_id[0]: # self.args.execution_id[0] is an empty string (""). - raise MissingArgument("execution id") else: - if self.args.execution_id[0].isdigit(): - if int(self.args.execution_id[0]) in list(df['id']): # Converting series to list. - df = df.query(f'id == {int(self.args.execution_id[0])}') # Used dataframe based on execution id - - # Rearranging columns: Start with fixed columns and appending the remaining columns. - updated_columns = ["id", "Context_Type", "Execution", "Execution_uuid", "name", "Pipeline_Type", "Git_Repo"] - updated_columns += [ col for col in df.columns if col not in updated_columns] - - df = df[updated_columns] - - # Drop columns that start with 'custom_properties_' and that contains NaN values - columns_to_drop = [col for col in df.columns if col.startswith('custom_properties_') and df[col].isna().any()] - df = df.drop(columns=columns_to_drop) - - # Wrap text in object-type columns to a width of 30 characters. 
- for col in df.select_dtypes(include=['object']).columns: - df[col] = df[col].apply(lambda x: textwrap.fill(x, width=30) if isinstance(x, str) else x) - - # Set 'id' as the DataFrame index and transpose it for display horizontally. - df.set_index("id", inplace=True) - df = df.T.reset_index() - df.columns.values[0] = 'id' # Rename the first column back to 'id'. - - # Display the updated DataFrame as a formatted table. - table = tabulate( - df, - headers=df.columns, - tablefmt="grid", - showindex=False, - ) - print(table) - print() - return MsgSuccess(msg_str = "Done.") - raise ExecutionsNotFound(self.args.execution_id[0]) + df = df[df['Execution_uuid'].apply(lambda x: self.args.execution_uuid[0] in x.split(","))] # Used dataframe based on execution uuid + if not df.empty: + # Rearranging columns: Start with fixed columns and appending the remaining columns. + updated_columns = ["id", "Context_Type", "Execution", "Execution_uuid", "name", "Pipeline_Type", "Git_Repo"] + updated_columns += [ col for col in df.columns if col not in updated_columns] + + df = df[updated_columns] + + # Drop columns that start with 'custom_properties_' and that contains NaN values + columns_to_drop = [col for col in df.columns if col.startswith('custom_properties_') and df[col].isna().any()] + df = df.drop(columns=columns_to_drop) + + # Wrap text in object-type columns to a width of 30 characters. + for col in df.select_dtypes(include=['object']).columns: + df[col] = df[col].apply(lambda x: textwrap.fill(x, width=30) if isinstance(x, str) else x) + + # Set 'id' as the DataFrame index and transpose it for display horizontally. + df.set_index("id", inplace=True) + df = df.T.reset_index() + df.columns.values[0] = 'id' # Rename the first column back to 'id'. + + # Display the updated DataFrame as a formatted table. 
+ table = tabulate( + df, + headers=df.columns, + tablefmt="grid", + showindex=False, + ) + print(table) + print() + return MsgSuccess(msg_str = "Done.") + return ExecutionUUIDNotFound(self.args.execution_uuid[0]) self.display_table(df) return MsgSuccess(msg_str = "Done.") @@ -197,10 +192,10 @@ def add_parser(subparsers, parent_parser): parser.add_argument( "-e", - "--execution_id", + "--execution_uuid", action="append", - help="Specify the execution id to retrieve execution.", - metavar="", + help="Specify the execution uuid to retrieve execution.", + metavar="", ) parser.set_defaults(func=CmdExecutionList) \ No newline at end of file diff --git a/cmflib/commands/init/amazonS3.py b/cmflib/commands/init/amazonS3.py index bc5f96c4..793559cc 100644 --- a/cmflib/commands/init/amazonS3.py +++ b/cmflib/commands/init/amazonS3.py @@ -27,36 +27,54 @@ dvc_quiet_init, dvc_add_remote_repo, dvc_add_attribute, + git_modify_remote_url, ) from cmflib.utils.cmf_config import CmfConfig from cmflib.utils.helper_functions import is_git_repo -from cmflib.cmf_exception_handling import Neo4jArgumentNotProvided, CmfInitComplete, CmfInitFailed +from cmflib.cmf_exception_handling import Neo4jArgumentNotProvided, CmfInitComplete, CmfInitFailed, DuplicateArgumentNotAllowed, MissingArgument +import sys class CmdInitAmazonS3(CmdBase): def run(self): # Reading CONFIG_FILE variable cmf_config = os.environ.get("CONFIG_FILE", ".cmfconfig") + + cmd_args = { + "url": self.args.url, + "access-key-id": self.args.access_key_id, + "secret-key": self.args.secret_key, + "git-remote-url": self.args.git_remote_url, + "session-token" : self.args.session_token, + "neo4j-user" : self.args.neo4j_user, + "neo4j-password" : self.args.neo4j_password, + "neo4j_uri" : self.args.neo4j_uri + } + for arg_name, arg_value in cmd_args.items(): + if arg_value: + if arg_value[0] == "": + raise MissingArgument(arg_name) + elif len(arg_value) > 1: + raise DuplicateArgumentNotAllowed(arg_name,("--"+arg_name)) + # checking if 
config file exists if not os.path.exists(cmf_config): # writing default value to config file attr_dict = {} attr_dict["server-ip"] = "http://127.0.0.1:80" CmfConfig.write_config(cmf_config, "cmf", attr_dict) - # if user gave --cmf-server-ip, override the config file - if self.args.cmf_server_url: + if self.args.cmf_server_url: attr_dict = {} attr_dict["server-ip"] = self.args.cmf_server_url CmfConfig.write_config(cmf_config, "cmf", attr_dict, True) - # read --neo4j details and add to the exsting file if self.args.neo4j_user and self.args.neo4j_password and self.args.neo4j_uri: attr_dict = {} - attr_dict["user"] = self.args.neo4j_user - attr_dict["password"] = self.args.neo4j_password - attr_dict["uri"] = self.args.neo4j_uri + attr_dict["user"] = self.args.neo4j_user[0] + attr_dict["password"] = self.args.neo4j_password[0] + attr_dict["uri"] = self.args.neo4j_uri[0] CmfConfig.write_config(cmf_config, "neo4j", attr_dict, True) - elif ( + elif( not self.args.neo4j_user and not self.args.neo4j_password and not self.args.neo4j_uri @@ -64,7 +82,6 @@ def run(self): pass else: raise Neo4jArgumentNotProvided - output = is_git_repo() if not output: branch_name = "master" @@ -72,19 +89,23 @@ def run(self): git_quiet_init() git_checkout_new_branch(branch_name) git_initial_commit() - git_add_remote(self.args.git_remote_url) + git_add_remote(self.args.git_remote_url[0]) + print("git init complete.") + else: + git_modify_remote_url(self.args.git_remote_url[0]) print("git init complete.") + print("Starting cmf init.") dvc_quiet_init() repo_type = "amazons3" - output = dvc_add_remote_repo(repo_type, self.args.url) + output = dvc_add_remote_repo(repo_type, self.args.url[0]) if not output: raise CmfInitFailed print(output) - dvc_add_attribute(repo_type, "access_key_id", self.args.access_key_id) - dvc_add_attribute(repo_type, "secret_access_key", self.args.secret_key) - dvc_add_attribute(repo_type, "session_token", self.args.session_token) + dvc_add_attribute(repo_type, 
"access_key_id", self.args.access_key_id[0]) + dvc_add_attribute(repo_type, "secret_access_key", self.args.secret_key[0]) + dvc_add_attribute(repo_type, "session_token", self.args.session_token[0]) status = CmfInitComplete() return status @@ -105,6 +126,7 @@ def add_parser(subparsers, parent_parser): required_arguments.add_argument( "--url", required=True, + action="append", help="Specify Amazon S3 bucket url.", metavar="", default=argparse.SUPPRESS, @@ -113,6 +135,7 @@ def add_parser(subparsers, parent_parser): required_arguments.add_argument( "--access-key-id", required=True, + action="append", help="Specify Access Key Id.", metavar="", default=argparse.SUPPRESS, @@ -122,6 +145,7 @@ def add_parser(subparsers, parent_parser): "--secret-key", required=True, help="Specify Secret Key.", + action="append", metavar="", default=argparse.SUPPRESS, ) @@ -131,13 +155,14 @@ def add_parser(subparsers, parent_parser): required=True, help="Specify Session Token.", metavar="", - default="", + action="append", ) required_arguments.add_argument( "--git-remote-url", required=True, help="Specify git repo url. 
eg: https://github.com/XXX/example.git", + action="append", metavar="", default=argparse.SUPPRESS, ) @@ -146,24 +171,28 @@ def add_parser(subparsers, parent_parser): "--cmf-server-url", help="Specify cmf-server URL.", metavar="", - default="http://127.0.0.1:80", + action="append", + default=["http://127.0.0.1:80"], ) parser.add_argument( "--neo4j-user", help="Specify neo4j user.", metavar="", + action="append", # default=argparse.SUPPRESS, ) parser.add_argument( "--neo4j-password", help="Specify neo4j password.", metavar="", + action="append", # default=argparse.SUPPRESS, ) parser.add_argument( "--neo4j-uri", help="Specify neo4j uri.eg bolt://localhost:7687", metavar="", + action="append", # default=argparse.SUPPRESS, ) diff --git a/cmflib/commands/init/local.py b/cmflib/commands/init/local.py index 6eeb64a7..be35bded 100644 --- a/cmflib/commands/init/local.py +++ b/cmflib/commands/init/local.py @@ -26,14 +26,32 @@ git_add_remote, dvc_quiet_init, dvc_add_remote_repo, + git_modify_remote_url, ) from cmflib.utils.cmf_config import CmfConfig from cmflib.utils.helper_functions import is_git_repo +from cmflib.cmf_exception_handling import MissingArgument, DuplicateArgumentNotAllowed class CmdInitLocal(CmdBase): def run(self): # Reading CONFIG_FILE variable cmf_config = os.environ.get("CONFIG_FILE", ".cmfconfig") + + cmd_args = { + "path": self.args.path, + "git-remote-url": self.args.git_remote_url, + "neo4j-user" : self.args.neo4j_user, + "neo4j-password" : self.args.neo4j_password, + "neo4j_uri" : self.args.neo4j_uri + } + + for arg_name, arg_value in cmd_args.items(): + if arg_value: + if arg_value[0] == "": + raise MissingArgument(arg_name) + elif len(arg_value) > 1: + raise DuplicateArgumentNotAllowed(arg_name,("--"+arg_name)) + # checking if config file exists if not os.path.exists(cmf_config): # writing default value to config file @@ -50,9 +68,9 @@ def run(self): # read --neo4j details and add to the exsting file if self.args.neo4j_user and 
self.args.neo4j_password and self.args.neo4j_uri: attr_dict = {} - attr_dict["user"] = self.args.neo4j_user - attr_dict["password"] = self.args.neo4j_password - attr_dict["uri"] = self.args.neo4j_uri + attr_dict["user"] = self.args.neo4j_user[0] + attr_dict["password"] = self.args.neo4j_password[0] + attr_dict["uri"] = self.args.neo4j_uri[0] CmfConfig.write_config(cmf_config, "neo4j", attr_dict, True) elif ( not self.args.neo4j_user @@ -71,13 +89,16 @@ def run(self): git_quiet_init() git_checkout_new_branch(branch_name) git_initial_commit() - git_add_remote(self.args.git_remote_url) + git_add_remote(self.args.git_remote_url[0]) + print("git init complete.") + else: + git_modify_remote_url(self.args.git_remote_url[0]) print("git init complete.") print("Starting cmf init.") dvc_quiet_init() repo_type = "local-storage" - output = dvc_add_remote_repo(repo_type, self.args.path) + output = dvc_add_remote_repo(repo_type, self.args.path[0]) if not output: raise CmfInitFailed print(output) @@ -100,6 +121,7 @@ def add_parser(subparsers, parent_parser): required_arguments.add_argument( "--path", required=True, + action="append", help="Specify local directory path.", metavar="", default=argparse.SUPPRESS, @@ -108,6 +130,7 @@ def add_parser(subparsers, parent_parser): required_arguments.add_argument( "--git-remote-url", required=True, + action="append", help="Specify git repo url, eg: https://github.com/XXX/example.git", metavar="", # default=argparse.SUPPRESS @@ -124,18 +147,21 @@ def add_parser(subparsers, parent_parser): "--neo4j-user", help="Specify neo4j user.", metavar="", + action="append", # default=argparse.SUPPRESS, ) parser.add_argument( "--neo4j-password", help="Specify neo4j password.", metavar="", + action="append", # default=argparse.SUPPRESS, ) parser.add_argument( "--neo4j-uri", help="Specify neo4j uri. 
eg bolt://localhost:7687", metavar="", + action="append", # default=argparse.SUPPRESS, ) diff --git a/cmflib/commands/init/minioS3.py b/cmflib/commands/init/minioS3.py index 38f0bfbf..cd798f79 100644 --- a/cmflib/commands/init/minioS3.py +++ b/cmflib/commands/init/minioS3.py @@ -27,15 +27,36 @@ dvc_quiet_init, dvc_add_remote_repo, dvc_add_attribute, + git_modify_remote_url, ) from cmflib.utils.cmf_config import CmfConfig from cmflib.utils.helper_functions import is_git_repo from cmflib.cmf_exception_handling import Neo4jArgumentNotProvided, CmfInitComplete, CmfInitFailed +from cmflib.cmf_exception_handling import MissingArgument, DuplicateArgumentNotAllowed class CmdInitMinioS3(CmdBase): def run(self): # Reading CONFIG_FILE variable cmf_config = os.environ.get("CONFIG_FILE", ".cmfconfig") + + cmd_args = { + "url": self.args.url, + "endpoint-url": self.args.endpoint_url, + "access-key-id": self.args.access_key_id, + "secret-key": self.args.secret_key, + "git-remote-url": self.args.git_remote_url, + "neo4j-user" : self.args.neo4j_user, + "neo4j-password" : self.args.neo4j_password, + "neo4j_uri" : self.args.neo4j_uri + } + + for arg_name, arg_value in cmd_args.items(): + if arg_value: + if arg_value[0] == "": + raise MissingArgument(arg_name) + elif len(arg_value) > 1: + raise DuplicateArgumentNotAllowed(arg_name,("--"+arg_name)) + # checking if config file exists if not os.path.exists(cmf_config): # writing default value to config file @@ -52,9 +73,9 @@ def run(self): # read --neo4j details and add to the exsting file if self.args.neo4j_user and self.args.neo4j_password and self.args.neo4j_uri: attr_dict = {} - attr_dict["user"] = self.args.neo4j_user - attr_dict["password"] = self.args.neo4j_password - attr_dict["uri"] = self.args.neo4j_uri + attr_dict["user"] = self.args.neo4j_user[0] + attr_dict["password"] = self.args.neo4j_password[0] + attr_dict["uri"] = self.args.neo4j_uri[0] CmfConfig.write_config(cmf_config, "neo4j", attr_dict, True) elif ( not 
self.args.neo4j_user @@ -71,24 +92,27 @@ def run(self): git_quiet_init() git_checkout_new_branch(branch_name) git_initial_commit() - git_add_remote(self.args.git_remote_url) + git_add_remote(self.args.git_remote_url[0]) + print("git init complete.") + else: + git_modify_remote_url(self.args.git_remote_url[0]) print("git init complete.") + print("Starting cmf init.") dvc_quiet_init() repo_type = "minio" - output = dvc_add_remote_repo(repo_type, self.args.url) + output = dvc_add_remote_repo(repo_type, self.args.url[0]) if not output: raise CmfInitFailed print(output) - dvc_add_attribute(repo_type, "endpointurl", self.args.endpoint_url) - dvc_add_attribute(repo_type, "access_key_id", self.args.access_key_id) - dvc_add_attribute(repo_type, "secret_access_key", self.args.secret_key) + dvc_add_attribute(repo_type, "endpointurl", self.args.endpoint_url[0]) + dvc_add_attribute(repo_type, "access_key_id", self.args.access_key_id[0]) + dvc_add_attribute(repo_type, "secret_access_key", self.args.secret_key[0]) status = CmfInitComplete() return status - def add_parser(subparsers, parent_parser): HELP = "Initialises Minio S3 bucket as artifact repository." 
@@ -105,6 +129,7 @@ def add_parser(subparsers, parent_parser): "--url", required=True, help="Specify Minio S3 bucket url.", + action="append", metavar="", default=argparse.SUPPRESS, ) @@ -113,6 +138,7 @@ def add_parser(subparsers, parent_parser): "--endpoint-url", required=True, help="Specify endpoint url which is used to access Minio's locally/remotely running UI.", + action="append", metavar="", default=argparse.SUPPRESS, ) @@ -122,6 +148,7 @@ def add_parser(subparsers, parent_parser): required=True, help="Specify Access Key Id.", metavar="", + action="append", default=argparse.SUPPRESS, ) @@ -129,6 +156,7 @@ def add_parser(subparsers, parent_parser): "--secret-key", required=True, help="Specify Secret Key.", + action="append", metavar="", default=argparse.SUPPRESS, ) @@ -138,6 +166,7 @@ def add_parser(subparsers, parent_parser): required=True, help="Specify git repo url. eg: https://github.com/XXX/example.git", metavar="", + action="append", default=argparse.SUPPRESS, ) @@ -152,18 +181,21 @@ def add_parser(subparsers, parent_parser): "--neo4j-user", help="Specify neo4j user.", metavar="", + action="append", # default=argparse.SUPPRESS, ) parser.add_argument( "--neo4j-password", help="Specify neo4j password.", metavar="", + action="append", # default=argparse.SUPPRESS, ) parser.add_argument( "--neo4j-uri", help="Specify neo4j uri.eg bolt://localhost:7687", metavar="", + action="append", # default=argparse.SUPPRESS, ) diff --git a/cmflib/commands/init/sshremote.py b/cmflib/commands/init/sshremote.py index ca1636fb..eb82ea76 100644 --- a/cmflib/commands/init/sshremote.py +++ b/cmflib/commands/init/sshremote.py @@ -31,12 +31,31 @@ ) from cmflib.utils.cmf_config import CmfConfig from cmflib.utils.helper_functions import is_git_repo -from cmflib.cmf_exception_handling import Neo4jArgumentNotProvided, CmfInitComplete, CmfInitFailed +from cmflib.cmf_exception_handling import Neo4jArgumentNotProvided, CmfInitComplete, CmfInitFailed, DuplicateArgumentNotAllowed, 
MissingArgument class CmdInitSSHRemote(CmdBase): def run(self): # Reading CONFIG_FILE variable cmf_config = os.environ.get("CONFIG_FILE", ".cmfconfig") + + cmd_args = { + "path": self.args.path, + "user": self.args.user, + "port": self.args.port, + "password": self.args.password, + "git-remote-url": self.args.git_remote_url, + "neo4j-user" : self.args.neo4j_user, + "neo4j-password" : self.args.neo4j_password, + "neo4j_uri" : self.args.neo4j_uri + } + + for arg_name, arg_value in cmd_args.items(): + if arg_value: + if arg_value[0] == "": + raise MissingArgument(arg_name) + elif len(arg_value) > 1: + raise DuplicateArgumentNotAllowed(arg_name,("--"+arg_name)) + # checking if config file exists if not os.path.exists(cmf_config): # writing default value to config file @@ -53,9 +72,9 @@ def run(self): # read --neo4j details and add to the exsting file if self.args.neo4j_user and self.args.neo4j_password and self.args.neo4j_uri: attr_dict = {} - attr_dict["user"] = self.args.neo4j_user - attr_dict["password"] = self.args.neo4j_password - attr_dict["uri"] = self.args.neo4j_uri + attr_dict["user"] = self.args.neo4j_user[0] + attr_dict["password"] = self.args.neo4j_password[0] + attr_dict["uri"] = self.args.neo4j_uri[0] CmfConfig.write_config(cmf_config, "neo4j", attr_dict, True) elif ( not self.args.neo4j_user @@ -72,19 +91,19 @@ def run(self): git_quiet_init() git_checkout_new_branch(branch_name) git_initial_commit() - git_add_remote(self.args.git_remote_url) + git_add_remote(self.args.git_remote_url[0]) print("git init complete.") print("Starting cmf init.") repo_type = "ssh-storage" dvc_quiet_init() - output = dvc_add_remote_repo(repo_type, self.args.path) + output = dvc_add_remote_repo(repo_type, self.args.path[0]) if not output: raise CmfInitFailed print(output) - dvc_add_attribute(repo_type, "user", self.args.user) - dvc_add_attribute(repo_type, "password", self.args.password) - dvc_add_attribute(repo_type, "port", self.args.port) + dvc_add_attribute(repo_type, 
"user", self.args.user[0]) + dvc_add_attribute(repo_type, "password", self.args.password[0]) + dvc_add_attribute(repo_type, "port", self.args.port[0]) status = CmfInitComplete() return status @@ -107,6 +126,7 @@ def add_parser(subparsers, parent_parser): required=True, help="Specify remote ssh directory path.", metavar="", + action="append", default=argparse.SUPPRESS, ) @@ -115,6 +135,7 @@ def add_parser(subparsers, parent_parser): required=True, help="Specify username.", metavar="", + action="append", default=argparse.SUPPRESS, ) @@ -123,6 +144,7 @@ def add_parser(subparsers, parent_parser): required=True, help="Specify port.", metavar="", + action="append", default=argparse.SUPPRESS, ) @@ -131,6 +153,7 @@ def add_parser(subparsers, parent_parser): required=True, help="Specify password. This will be saved only on local", metavar="", + action="append", default=argparse.SUPPRESS, ) @@ -139,6 +162,7 @@ def add_parser(subparsers, parent_parser): required=True, help="Specify git repo url. eg: https://github.com/XXX/example.git", metavar="", + action="append", default=argparse.SUPPRESS, ) @@ -153,18 +177,21 @@ def add_parser(subparsers, parent_parser): "--neo4j-user", help="Specify neo4j user.", metavar="", + action="append", # default=argparse.SUPPRESS, ) parser.add_argument( "--neo4j-password", help="Specify neo4j password.", metavar="", + action="append", # default=argparse.SUPPRESS, ) parser.add_argument( "--neo4j-uri", help="Specify neo4j uri.eg bolt://localhost:7687", metavar="", + action="append", # default=argparse.SUPPRESS, ) diff --git a/cmflib/commands/metadata/export.py b/cmflib/commands/metadata/export.py index 3fdbaf06..d56ebdfc 100644 --- a/cmflib/commands/metadata/export.py +++ b/cmflib/commands/metadata/export.py @@ -27,7 +27,9 @@ DuplicateArgumentNotAllowed, MissingArgument, NoChangesMadeInfo, - MetadataExportToJson + MetadataExportToJson, + DirectoryNotfound, + MsgFailure ) # This class export local mlmd data to a json file @@ -42,20 +44,28 @@ def 
create_full_path(self, current_directory: str, json_file_name: str) -> str: full_path_to_dump = json_file_name return full_path_to_dump else: - return f"{current_directory} doesn't exists." + raise DirectoryNotfound(current_directory) else: - return "Provide path with file name." + raise MsgFailure(msg_str = "Provide path with file name.") def run(self): + cmd_args = { + "file_name": self.args.file_name, + "pipeline_name": self.args.pipeline_name, + "json_file_name": self.args.json_file_name + } + for arg_name, arg_value in cmd_args.items(): + if arg_value: + if arg_value[0] == "": + raise MissingArgument(arg_name) + elif len(arg_value) > 1: + raise DuplicateArgumentNotAllowed(arg_name,("-"+arg_name[0])) + current_directory = os.getcwd() full_path_to_dump = "" if not self.args.file_name: # If self.args.file_name is None or an empty list ([]). mlmd_file_name = "./mlmd" # Default path for mlmd file name. - elif len(self.args.file_name) > 1: # If the user provided more than one file name. - raise DuplicateArgumentNotAllowed("file_name", "-f") - elif not self.args.file_name[0]: # self.args.file_name[0] is an empty string (""). - raise MissingArgument("file name") else: mlmd_file_name = self.args.file_name[0].strip() # Removing starting and ending whitespaces. if mlmd_file_name == "mlmd": @@ -68,23 +78,12 @@ def run(self): # Initialising cmfquery class. query = cmfquery.CmfQuery(mlmd_file_name) - # Check if pipeline exists in mlmd . - if self.args.pipeline_name is not None and len(self.args.pipeline_name) > 1: - raise DuplicateArgumentNotAllowed("pipeline_name", "-p") - elif not self.args.pipeline_name[0]: # self.args.pipeline_name[0] is an empty string (""). - raise MissingArgument("pipeline name") - else: - pipeline_name = self.args.pipeline_name[0] - + pipeline_name = self.args.pipeline_name[0] pipeline = query.get_pipeline_id(pipeline_name) if pipeline > 0: if not self.args.json_file_name: # If self.args.json_file_name is None or an empty list ([]). 
json_file_name = self.args.json_file_name - elif len(self.args.json_file_name) > 1: # If the user provided more than one json file name. - raise DuplicateArgumentNotAllowed("json file", "-j") - elif not self.args.json_file_name[0]: # self.args.json_file_name[0] is an empty string (""). - raise MissingArgument("json file") else: json_file_name = self.args.json_file_name[0].strip() @@ -112,7 +111,7 @@ def run(self): full_path_to_dump = os.getcwd() + f"/{pipeline_name}.json" # Pulling data from local mlmd file. - json_payload = query.dumptojson(pipeline_name,None) + json_payload = query.dumptojson(pipeline_name, None) # Write metadata into json file. with open(full_path_to_dump, 'w') as f: diff --git a/cmflib/commands/metadata/pull.py b/cmflib/commands/metadata/pull.py index 2ad40736..c066f477 100644 --- a/cmflib/commands/metadata/pull.py +++ b/cmflib/commands/metadata/pull.py @@ -23,8 +23,11 @@ from cmflib.server_interface import server_interface from cmflib.utils.cmf_config import CmfConfig from cmflib.cmf_exception_handling import ( + DuplicateArgumentNotAllowed, PipelineNotFound, - CmfNotConfigured, ExecutionIDNotFound, + MissingArgument, + CmfNotConfigured, + ExecutionUUIDNotFound, MlmdNotFoundOnServer, MlmdFilePullSuccess, CmfServerNotAvailable, @@ -36,6 +39,7 @@ # This class pulls mlmd file from cmf-server class CmdMetadataPull(CmdBase): + def run(self): cmfconfig = os.environ.get("CONFIG_FILE", ".cmfconfig") # find root_dir of .cmfconfig @@ -50,37 +54,54 @@ def run(self): full_path_to_dump = "" cmd = "pull" status = 0 - exec_id = None + exec_uuid = None + + cmd_args = { + "file_name": self.args.file_name, + "pipeline_name": self.args.pipeline_name, + "execution_uuid": self.args.execution_uuid + } + for arg_name, arg_value in cmd_args.items(): + if arg_value: + if arg_value[0] == "": + raise MissingArgument(arg_name) + elif len(arg_value) > 1: + raise DuplicateArgumentNotAllowed(arg_name,("-"+arg_name[0])) + + if not self.args.execution_uuid: # If 
self.args.execution_uuid[0] is None or an empty list ([]). + pass + if self.args.file_name: # setting directory where mlmd file will be dumped - if not os.path.isdir(self.args.file_name): - temp = os.path.dirname(self.args.file_name) + if not os.path.isdir(self.args.file_name[0]): + temp = os.path.dirname(self.args.file_name[0]) if temp != "": current_directory = temp if os.path.exists(current_directory): - full_path_to_dump = self.args.file_name + full_path_to_dump = self.args.file_name[0] else: raise DirectoryNotfound(current_dir= current_directory) else: raise FileNameNotfound else: full_path_to_dump = os.getcwd() + "/mlmd" - if self.args.execution: - exec_id = self.args.execution + + if self.args.execution_uuid: + exec_uuid = self.args.execution_uuid[0] output = server_interface.call_mlmd_pull( - url, self.args.pipeline_name, exec_id + url, self.args.pipeline_name[0], exec_uuid ) # calls cmf-server api to get mlmd file data(Json format) status = output.status_code # checks If given pipeline does not exists/ elif pull mlmd file/ else mlmd file is not available if output.content.decode() == None: - raise PipelineNotFound(self.args.pipeline_name) - elif output.content.decode() == "no_exec_id": - raise ExecutionIDNotFound(exec_id) + raise PipelineNotFound(self.args.pipeline_name[0]) + elif output.content.decode() == "no_exec_uuid": + raise ExecutionUUIDNotFound(exec_uuid) elif output.content: if status == 200: try: cmf_merger.parse_json_to_mlmd( - output.content, full_path_to_dump, cmd, None + output.content, full_path_to_dump, cmd, exec_uuid ) # converts mlmd json data to mlmd file pull_status = MlmdFilePullSuccess(full_path_to_dump) return pull_status @@ -89,7 +110,7 @@ def run(self): elif status == 413: raise MlmdNotFoundOnServer elif status == 406: - raise PipelineNotFound(self.args.pipeline_name) + raise PipelineNotFound(self.args.pipeline_name[0]) elif status == 404: raise CmfServerNotAvailable elif status == 500: @@ -113,6 +134,7 @@ def 
add_parser(subparsers, parent_parser): "-p", "--pipeline_name", required=True, + action="append", help="Specify Pipeline name.", metavar="", ) @@ -120,12 +142,13 @@ def add_parser(subparsers, parent_parser): parser.add_argument( "-f", "--file_name", + action="append", help="Specify mlmd file name with full path.", metavar="", ) parser.add_argument( - "-e", "--execution", help="Specify Execution id", metavar="" + "-e", "--execution_uuid", action="append", help="Specify execution_uuid", metavar="" ) parser.set_defaults(func=CmdMetadataPull) diff --git a/cmflib/commands/metadata/push.py b/cmflib/commands/metadata/push.py index 32e3253a..1ac443be 100644 --- a/cmflib/commands/metadata/push.py +++ b/cmflib/commands/metadata/push.py @@ -29,13 +29,15 @@ MlmdFilePushSuccess, ExecutionsAlreadyExists, FileNotFound, - ExecutionIDNotFound, + ExecutionUUIDNotFound, PipelineNotFound, UpdateCmfVersion, CmfServerNotAvailable, InternalServerError, CmfNotConfigured, - InvalidTensorboardFilePath + InvalidTensorboardFilePath, + MissingArgument, + DuplicateArgumentNotAllowed ) # This class pushes mlmd file to cmf-server class CmdMetadataPush(CmdBase): @@ -54,13 +56,30 @@ def run(self): attr_dict = CmfConfig.read_config(config_file_path) url = attr_dict.get("cmf-server-ip", "http://127.0.0.1:80") - mlmd_file_name = "./mlmd" - current_directory = os.getcwd() - # checks if mlmd filepath is given - if self.args.file_name: - mlmd_file_name = self.args.file_name - current_directory = os.path.dirname(self.args.file_name) + cmd_args = { + "file_name": self.args.file_name, + "pipeline_name": self.args.pipeline_name, + "execution_uuid": self.args.execution_uuid, + "tensorboard": self.args.tensorboard + } + for arg_name, arg_value in cmd_args.items(): + if arg_value: + if arg_value[0] == "": + raise MissingArgument(arg_name) + elif len(arg_value) > 1: + raise DuplicateArgumentNotAllowed(arg_name,("-"+arg_name[0])) + + current_directory = os.getcwd() + if not self.args.file_name: # If 
self.args.file_name is None or an empty list ([]). + mlmd_file_name = "./mlmd" # Default path for mlmd file name. + else: + mlmd_file_name = self.args.file_name[0].strip() + if mlmd_file_name == "mlmd": + mlmd_file_name = "./mlmd" + + current_directory = os.path.dirname(mlmd_file_name) + # checks if mlmd file is present in current directory or given directory if not os.path.exists(mlmd_file_name): raise FileNotFound(mlmd_file_name, current_directory) @@ -71,30 +90,37 @@ def run(self): status_code = 0 # Checks if pipeline name exists - if self.args.pipeline_name in query.get_pipeline_names(): + pipeline_name = self.args.pipeline_name[0] + if pipeline_name in query.get_pipeline_names(): print("metadata push started") print("........................................") # converts mlmd file to json format - json_payload = query.dumptojson(self.args.pipeline_name, None) - # checks if execution_id is given by user - if self.args.execution: - exec_id = self.args.execution + json_payload = query.dumptojson(pipeline_name, None) + + # checks if execution is given by user + if not self.args.execution_uuid: # If self.args.execution_uuid is None or an empty list ([]). + exec_uuid = None + response = server_interface.call_mlmd_push(json_payload, url, exec_uuid, pipeline_name) + else: + exec_uuid = self.args.execution_uuid[0] mlmd_data = json.loads(json_payload)["Pipeline"] - # checks if given execution_id present in mlmd + # checks if given execution present in mlmd for i in mlmd_data[0]["stages"]: for j in i["executions"]: - if j["id"] == int(exec_id): + # created exec_uuid of list if multiple uuid present for single execution. 
+ # for eg: f9da581c-d16c-11ef-9809-9350156ed1ac,32f17f4a-d16d-11ef-9809-9350156ed1ac + uuid_list = j['properties']['Execution_uuid'].split(",") + # check if user specified exec_uuid exists inside local mlmd + if exec_uuid in uuid_list: execution_flag = 1 - # calling mlmd_push api to push mlmd file to cmf-server + # calling mlmd_push api to push mlmd + # file to cmf-server response = server_interface.call_mlmd_push( - json_payload, url, exec_id, self.args.pipeline_name + json_payload, url, exec_uuid, pipeline_name ) break if execution_flag == 0: - raise ExecutionIDNotFound(exec_id) - else: - exec_id = None - response = server_interface.call_mlmd_push(json_payload, url, exec_id, self.args.pipeline_name) + raise ExecutionUUIDNotFound(exec_uuid) status_code = response.status_code if status_code == 200: output = "" @@ -105,7 +131,6 @@ def run(self): if response.json()["status"]=="exists": display_output = "Executions already exists." 
output = ExecutionsAlreadyExists() - if not self.args.tensorboard: return output print(display_output) @@ -115,10 +140,12 @@ def run(self): print("tensorboard logs upload started!!") print("......................................") + + tensorboard = self.args.tensorboard[0] # check if the path provided is for a file - if os.path.isfile(self.args.tensorboard): - file_name = os.path.basename(self.args.tensorboard) - tresponse = server_interface.call_tensorboard(url, self.args.pipeline_name, file_name, self.args.tensorboard) + if os.path.isfile(tensorboard): + file_name = os.path.basename(tensorboard) + tresponse = server_interface.call_tensorboard(url, pipeline_name, file_name, tensorboard) tstatus_code = tresponse.status_code if tstatus_code == 200: # give status code as success @@ -127,13 +154,13 @@ def run(self): # give status code as failure return TensorboardPushFailure(file_name,tresponse.text) # If path provided is a directory - elif os.path.isdir(self.args.tensorboard): + elif os.path.isdir(tensorboard): # Recursively push all files and subdirectories - for root, dirs, files in os.walk(self.args.tensorboard): + for root, dirs, files in os.walk(tensorboard): for file_name in files: file_path = os.path.join(root, file_name) - relative_path = os.path.relpath(file_path, self.args.tensorboard) - tresponse = server_interface.call_tensorboard(url, self.args.pipeline_name, relative_path, file_path) + relative_path = os.path.relpath(file_path, tensorboard) + tresponse = server_interface.call_tensorboard(url, pipeline_name, relative_path, file_path) if tresponse.status_code == 200: print(f"tensorboard logs: File {file_name} uploaded successfully.") else: @@ -151,7 +178,7 @@ def run(self): else: return "ERROR: Status Code = {status_code}. Unable to push mlmd." 
else: - raise PipelineNotFound(self.args.pipeline_name) + raise PipelineNotFound(pipeline_name) def add_parser(subparsers, parent_parser): @@ -170,24 +197,31 @@ def add_parser(subparsers, parent_parser): "-p", "--pipeline_name", required=True, + action="append", help="Specify Pipeline name.", metavar="", ) parser.add_argument( - "-f", "--file_name", help="Specify mlmd file name.", metavar="" + "-f", + "--file_name", + action="append", + help="Specify mlmd file name.", + metavar="" ) parser.add_argument( "-e", - "--execution", - help="Specify Execution id.", - metavar="", + "--execution_uuid", + action="append", + help="Specify Execution uuid.", + metavar="", ) parser.add_argument( "-t", "--tensorboard", + action="append", help="Specify path to tensorboard logs for the pipeline.", metavar="" ) diff --git a/cmflib/commands/repo/__init__.py b/cmflib/commands/repo/__init__.py new file mode 100644 index 00000000..3fefb0d8 --- /dev/null +++ b/cmflib/commands/repo/__init__.py @@ -0,0 +1,43 @@ +### +# Copyright (2023) Hewlett Packard Enterprise Development LP +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# You may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+### + +import argparse + +from cmflib.commands.repo import push, pull +from cmflib.cli.utils import * + +SUB_COMMANDS = [push, pull] + +# This parser adds positional arguments to the main parser +def add_parser(subparsers, parent_parser): + REPO_HELP = "Push and pull artifacts, metadata files, and source code to and from the user's artifact repository, cmf-server, and git respectively." + + + metadata_parser = subparsers.add_parser( + "repo", + parents=[parent_parser], + description="Push and pull artifacts, metadata files, and source code to and from the user's artifact repository, cmf-server, and git respectively.", + help=REPO_HELP, + formatter_class=argparse.RawDescriptionHelpFormatter, + ) + + metadata_subparsers = metadata_parser.add_subparsers( + dest="cmd", help="Use `cmf repo CMD --help` for " "command-specific help." + ) + + fix_subparsers(metadata_subparsers) + for cmd in SUB_COMMANDS: + cmd.add_parser(metadata_subparsers, parent_parser) diff --git a/cmflib/commands/repo/pull.py b/cmflib/commands/repo/pull.py new file mode 100644 index 00000000..f5e75f26 --- /dev/null +++ b/cmflib/commands/repo/pull.py @@ -0,0 +1,125 @@ +### +# Copyright (2024) Hewlett Packard Enterprise Development LP +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# You may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+### + +#!/usr/bin/env python3 +import argparse +import requests + +from cmflib.cli.command import CmdBase +from cmflib.dvc_wrapper import git_get_repo, git_get_pull, git_get_branch +from cmflib.commands.artifact.pull import CmdArtifactPull +from cmflib.commands.metadata.pull import CmdMetadataPull +from cmflib.cmf_exception_handling import MsgSuccess, MsgFailure + + +class CmdRepoPull(CmdBase): + def branch_exists(self, repo_own: str, repo_name: str, branch_name: str) -> bool: + """ + Check if a branch exists in a GitHub repository. + + Args: + repo_owner: The owner of the GitHub repository. + repo_name: The name of the GitHub repository. + branch_name: The name of the branch to check. + + Returns: + bool: True if the branch exists, otherwise False. + """ + url = f"https://api.github.com/repos/{repo_own}/{repo_name}/branches/{branch_name}" + res = requests.get(url) + + if res.status_code == 200: + return True + return False + + def git_pull(self): + # Getting github url from cmf init command + url = git_get_repo() + # Example url = https://github.com/ABC/my-repo + splited_url = url.split("/") + branch_name = git_get_branch()[0] + # Check whether branch exists in git repo or not + # url[-2] = ABC, url-1] = my-repo + if self.branch_exists(splited_url[-2], splited_url[-1], branch_name): + # pull the code from mlmd branch + print("git pull started...") + stdout, stderr, returncode = git_get_pull(branch_name) + if returncode != 0: + raise MsgFailure(msg_str=f"{stderr}") + return MsgSuccess(msg_str=stdout) + else: + raise MsgFailure(msg_str=f"{branch_name} inside {url} does not exists!!") + + def run(self): + print("metadata pull started...") + instance_of_metadata = CmdMetadataPull(self.args) + if instance_of_metadata.run().status == "success": + print("artifact pull started...") + instance_of_artifact = CmdArtifactPull(self.args) + if instance_of_artifact.run().status == "success": + return self.git_pull() + + +def add_parser(subparsers, parent_parser): + PULL_HELP = 
"Pull artifacts, metadata files, and source code from the user's artifact repository, cmf-server, and git respectively." + + parser = subparsers.add_parser( + "pull", + parents=[parent_parser], + description="Pull artifacts, metadata files, and source code from the user's artifact repository, cmf-server, and git respectively.", + help=PULL_HELP, + formatter_class=argparse.RawDescriptionHelpFormatter, + ) + + required_arguments = parser.add_argument_group("required arguments") + + required_arguments.add_argument( + "-p", + "--pipeline_name", + required=True, + action="append", + help="Specify Pipeline name.", + metavar="", + ) + + parser.add_argument( + "-f", + "--file_name", + action="append", + help="Specify mlmd file name.", + metavar="", + ) + + parser.add_argument( + "-e", + "--execution_uuid", + action="append", + help="Specify Execution uuid.", + metavar="", + ) + + # The 'artifact_name' parameter is used inside 'cmf artifact pull' command. + # To avoid errors, it is defined here with a default value of 'None' and hidden from the help text using 'argparse.SUPPRESS'. + parser.add_argument( + "-a", + "--artifact_name", + action="store_const", + const="None", + help=argparse.SUPPRESS, + metavar="", + ) + + parser.set_defaults(func=CmdRepoPull) diff --git a/cmflib/commands/repo/push.py b/cmflib/commands/repo/push.py new file mode 100644 index 00000000..dc753d68 --- /dev/null +++ b/cmflib/commands/repo/push.py @@ -0,0 +1,221 @@ +### +# Copyright (2024) Hewlett Packard Enterprise Development LP +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# You may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. +### + +#!/usr/bin/env python3 +import argparse +import requests +import os +import re + +from cmflib import cmfquery +from cmflib.cli.utils import check_minio_server, find_root +from cmflib.utils.helper_functions import generate_osdf_token +from cmflib.utils.dvc_config import DvcConfig +from cmflib.dvc_wrapper import dvc_add_attribute +from cmflib.utils.cmf_config import CmfConfig +from cmflib.cli.command import CmdBase +from cmflib.dvc_wrapper import git_get_repo, git_get_pull, git_get_push, git_get_branch, dvc_push +from cmflib.commands.artifact.push import CmdArtifactPush +from cmflib.commands.metadata.push import CmdMetadataPush +from cmflib.cmf_exception_handling import ( + MsgSuccess, + MsgFailure, + ArtifactPushSuccess, + Minios3ServerInactive, + CmfNotConfigured, + FileNotFound + ) + + +class CmdRepoPush(CmdBase): + def branch_exists(self, repo_own: str, repo_name: str, branch_name: str) -> bool: + """ + Check if a branch exists in a GitHub repository. + + Args: + repo_owner: The owner of the GitHub repository. + repo_name: The name of the GitHub repository. + branch_name: The name of the branch to check. + + Returns: + bool: True if the branch exists, otherwise False. + """ + url = f"https://api.github.com/repos/{repo_own}/{repo_name}/branches/{branch_name}" + res = requests.get(url) + + if res.status_code == 200: + return True + return False + + def git_push(self): + # Getting github url from cmf init command + url = git_get_repo() + # Example url = https://github.com/ABC/my-repo + url = url.split("/") + branch_name = git_get_branch()[0] + # Check whether branch exists in git repo or not + # url[-2] = ABC, url-1] = my-repo + if self.branch_exists(url[-2], url[-1], branch_name): + # 1. pull the code from mlmd branch + # 2. 
push the code inside mlmd branch + stdout, stderr, returncode = git_get_pull(branch_name) + if returncode != 0: + raise MsgFailure(msg_str=f"{stderr}") + print(stdout) + # push the code inside mlmd branch + stdout, stderr, returncode = git_get_push(branch_name) + if returncode != 0: + raise MsgFailure(msg_str=f"{stderr}") + return MsgSuccess(msg_str="Successfully pushed and pulled changes!") + + def artifact_push(self): + result = "" + dvc_config_op = DvcConfig.get_dvc_config() + cmf_config_file = os.environ.get("CONFIG_FILE", ".cmfconfig") + + # find root_dir of .cmfconfig + output = find_root(cmf_config_file) + + # in case, there is no .cmfconfig file + if output.find("'cmf' is not configured.") != -1: + raise CmfNotConfigured(output) + + out_msg = check_minio_server(dvc_config_op) + if dvc_config_op["core.remote"] == "minio" and out_msg != "SUCCESS": + raise Minios3ServerInactive() + if dvc_config_op["core.remote"] == "osdf": + config_file_path = os.path.join(output, cmf_config_file) + cmf_config={} + cmf_config=CmfConfig.read_config(config_file_path) + #print("key_id="+cmf_config["osdf-key_id"]) + dynamic_password = generate_osdf_token(cmf_config["osdf-key_id"],cmf_config["osdf-key_path"],cmf_config["osdf-key_issuer"]) + #print("Dynamic Password"+dynamic_password) + dvc_add_attribute(dvc_config_op["core.remote"],"password",dynamic_password) + #The Push URL will be something like: https:///files/md5/[First Two of MD5 Hash] + result = dvc_push() + return result + + current_directory = os.getcwd() + if not self.args.file_name: # If self.args.file_name is None or an empty list ([]). + mlmd_file_name = "./mlmd" # Default path for mlmd file name. 
+ else: + mlmd_file_name = self.args.file_name[0] + if mlmd_file_name == "mlmd": + mlmd_file_name = "./mlmd" + current_directory = os.path.dirname(mlmd_file_name) + if not os.path.exists(mlmd_file_name): #checking if MLMD files exists + raise FileNotFound(mlmd_file_name, current_directory) + + # creating cmfquery object + query = cmfquery.CmfQuery(mlmd_file_name) + names = [] + df = query.get_all_executions_in_pipeline(self.args.pipeline_name[0]) + # fetching execution id from df based on execution_uuid + exec_id_df = df[df['Execution_uuid'].apply(lambda x: self.args.execution_uuid[0] in x.split(","))]['id'] + exec_id = int (exec_id_df.iloc[0]) + + artifacts = query.get_all_artifacts_for_execution(exec_id) # getting all artifacts based on execution id + # dropping artifact with type 'metrics' as metrics doesn't have physical file + if not artifacts.empty: + artifacts = artifacts[artifacts['type'] != 'Metrics'] + # adding .dvc at the end of every file as it is needed for pull + artifacts['name'] = artifacts['name'].apply(lambda name: f"{name.split(':')[0]}.dvc") + names.extend(artifacts['name'].tolist()) + + final_list = [] + for file in set(names): + # checking if the .dvc exists + if os.path.exists(file): + final_list.append(file) + # checking if the .dvc exists in user's project working directory + elif os.path.isabs(file): + file = re.split("/",file)[-1] + file = os.path.join(os.getcwd(), file) + if os.path.exists(file): + final_list.append(file) + else: + # not adding the .dvc to the final list in case .dvc doesn't exists in both the places + pass + result = dvc_push(list(final_list)) + return ArtifactPushSuccess(result) + + + def run(self): + print("Executing cmf metadata push command..") + metadata_push_instance = CmdMetadataPush(self.args) + if metadata_push_instance.run().status == "success": + print("Executing cmf artifact push command..") + if(self.args.execution_uuid): + # If an execution uuid exists, push the artifacts associated with that execution. 
+ artifact_push_result = self.artifact_push() + else: + # Pushing all artifacts. + artifact_push_instance = CmdArtifactPush(self.args) + artifact_push_result = artifact_push_instance.run() + + if artifact_push_result.status == "success": + print("Executing git push command..") + return self.git_push() + + +def add_parser(subparsers, parent_parser): + PUSH_HELP = "Push artifacts, metadata files, and source code to the user's artifact repository, cmf-server, and git respectively." + + parser = subparsers.add_parser( + "push", + parents=[parent_parser], + description="Push artifacts, metadata files, and source code to the user's artifact repository, cmf-server, and git respectively.", + help=PUSH_HELP, + formatter_class=argparse.RawDescriptionHelpFormatter, + ) + + required_arguments = parser.add_argument_group("required arguments") + + required_arguments.add_argument( + "-p", + "--pipeline_name", + required=True, + action="append", + help="Specify Pipeline name.", + metavar="", + ) + + parser.add_argument( + "-f", + "--file_name", + action="append", + help="Specify mlmd file name.", + metavar="" + ) + + parser.add_argument( + "-e", + "--execution_uuid", + action="append", + help="Specify Execution uuid.", + default=None, + metavar="", + ) + + parser.add_argument( + "-t", + "--tensorboard", + action="append", + help="Specify path to tensorboard logs for the pipeline.", + metavar="" + ) + + parser.set_defaults(func=CmdRepoPush) diff --git a/cmflib/dvc_wrapper.py b/cmflib/dvc_wrapper.py index cd52b57b..26239362 100644 --- a/cmflib/dvc_wrapper.py +++ b/cmflib/dvc_wrapper.py @@ -474,3 +474,65 @@ def dvc_push(file_list: Optional[List[str]] = None) -> str: print(f"Unexpected {outs}") print(f"Unexpected {errs}") return commit + + +# Change the existing remote repo url +def git_modify_remote_url(git_url) -> str: + commit = "" + try: + process = subprocess.Popen(['git', 'remote', 'set-url', 'cmf_origin', f"{git_url}"], + stdout=subprocess.PIPE, + universal_newlines=True) + 
output, errs = process.communicate(timeout=60) + commit = output.strip() + + except Exception as err: + print(f"Unexpected {err}, {type(err)}") + if isinstance(object, subprocess.Popen): + process.kill() + outs, errs = process.communicate() + print(f"Unexpected {outs}") + print(f"Unexpected {errs}") + return commit + +# Pulling code from branch +def git_get_pull(branch_name: str) -> str: + process = subprocess.Popen(f'git pull cmf_origin {branch_name}', + cwd=None, + shell=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + stdout, stderr = process.communicate() + return ( + stdout.decode('utf-8').strip() if stdout else '', + stderr.decode('utf-8').strip() if stderr else '', + process.returncode + ) + +# Pusing code inside branch +def git_get_push(branch_name: str) -> str: + process = subprocess.Popen(f'git push -u cmf_origin {branch_name}', + cwd=None, + shell=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + stdout, stderr = process.communicate() + return ( + stdout.decode('utf-8').strip() if stdout else '', + stderr.decode('utf-8').strip() if stderr else '', + process.returncode + ) + +# Getting current branch +def git_get_branch() -> tuple: + process = subprocess.Popen('git branch --show-current', + cwd=None, + shell=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + stdout, stderr = process.communicate() + return ( + stdout.decode('utf-8').strip() if stdout else '', + stderr.decode('utf-8').strip() if stderr else '', + process.returncode + ) diff --git a/cmflib/server_interface/server_interface.py b/cmflib/server_interface/server_interface.py index 3b3730c6..fecf1116 100644 --- a/cmflib/server_interface/server_interface.py +++ b/cmflib/server_interface/server_interface.py @@ -18,18 +18,18 @@ import json # This function posts mlmd data to mlmd_push api on cmf-server -def call_mlmd_push(json_payload, url, exec_id, pipeline_name): +def call_mlmd_push(json_payload, url, exec_uuid, pipeline_name): url_to_pass = f"{url}/mlmd_push" - 
json_data = {"id": exec_id, "json_payload": json_payload, "pipeline_name": pipeline_name} + json_data = {"exec_uuid": exec_uuid, "json_payload": json_payload, "pipeline_name": pipeline_name} response = requests.post(url_to_pass, json=json_data) # Post request # print("Status code -", response.status_code) return response # This function gets mlmd data from mlmd_pull api from cmf-server -def call_mlmd_pull(url, pipeline_name, exec_id): +def call_mlmd_pull(url, pipeline_name, exec_uuid): url_to_pass = f"{url}/mlmd_pull/{pipeline_name}" - response = requests.get(url_to_pass, json={"exec_id": exec_id}) # Get request + response = requests.get(url_to_pass, json={"exec_uuid": exec_uuid}) # Get request return response diff --git a/docs/cmf_client/cmf_client.md b/docs/cmf_client/cmf_client.md index ed13bb9a..258af6c2 100644 --- a/docs/cmf_client/cmf_client.md +++ b/docs/cmf_client/cmf_client.md @@ -2,7 +2,7 @@ # cmf ``` -Usage: cmf [-h] {init, artifact, metadata, execution, pipeline} +Usage: cmf [-h] {init, artifact, metadata, execution, pipeline, repo} ``` The `cmf` command is a comprehensive tool designed to initialize an artifact repository and perform various operations on artifacts, execution, pipeline and metadata. @@ -284,11 +284,11 @@ Usage: cmf metadata [-h] {pull,push,export} `cmf metadata` push, pull or export the metadata file to and from the cmf-server, respectively. ### cmf metadata pull ``` -Usage: cmf metadata pull [-h] -p [pipeline_name] -f [file_name] -e [exec_id] +Usage: cmf metadata pull [-h] -p [pipeline_name] -f [file_name] -e [exec_uuid] ``` `cmf metadata pull` command pulls the metadata file from the cmf-server to the user's local machine. ``` -cmf metadata pull -p 'pipeline-name' -f '/path/to/mlmd-file-name' -e 'execution_id' +cmf metadata pull -p 'pipeline-name' -f '/path/to/mlmd-file-name' -e 'execution_uuid' ``` Required Arguments ``` @@ -296,17 +296,17 @@ Required Arguments ``` Optional Arguments ``` --h, --help show this help message and exit. 
--e [exec_id], --execution [exec_id] Specify execution id. --f [file_name], --file_name [file_name] Specify mlmd file name with full path(either relative or absolute). +-h, --help show this help message and exit. +-e [exec_uuid], --execution_uuid [exec_uuid] Specify execution uuid. +-f [file_name], --file_name [file_name] Specify mlmd file name with full path(either relative or absolute). ``` ### cmf metadata push ``` -Usage: cmf metadata push [-h] -p [pipeline_name] -f [file_name] -e [exec_id] -t [tensorboard] +Usage: cmf metadata push [-h] -p [pipeline_name] -f [file_name] -e [exec_uuid] -t [tensorboard] ``` `cmf metadata push` command pushes the metadata file from the local machine to the cmf-server. ``` -cmf metadata push -p 'pipeline-name' -f '/path/to/mlmd-file-name' -e 'execution_id' -t '/path/to/tensorboard-log' +cmf metadata push -p 'pipeline-name' -f '/path/to/mlmd-file-name' -e 'execution_uuid' -t '/path/to/tensorboard-log' ``` Required Arguments ``` @@ -314,10 +314,10 @@ Required Arguments ``` Optional Arguments ``` - -h, --help show this help message and exit. - -f [file_name], --file_name [file_name] Specify mlmd file name. - -e [exec_id], --execution [exec_id] Specify execution id. - -t [tensorboard], --tensorboard [tensorboard] Specify path to tensorboard logs for the pipeline. + -h, --help show this help message and exit. + -f [file_name], --file_name [file_name] Specify mlmd file name. + -e [exec_uuid], --execution_uuid [exec_uuid] Specify execution uuid. + -t [tensorboard], --tensorboard [tensorboard] Specify path to tensorboard logs for the pipeline. ``` ### cmf metadata export ``` @@ -329,7 +329,7 @@ cmf metadata export -p 'pipeline-name' -j '/path/to/json-file-name' -f '/path/to ``` Required Arguments ``` --p [pipeline_name], --pipeline_name [pipeline_name] Specify Pipeline name. +-p [pipeline_name], --pipeline_name [pipeline_name] Specify Pipeline name. 
``` Optional Arguments ``` @@ -345,21 +345,21 @@ Usage: cmf execution [-h] {list} `cmf execution` command to displays executions from the MLMD file. ### cmf executions list ``` -Usage: cmf execution list [-h] -p [pipeline_name] -f [file_name] -e [execution_id] +Usage: cmf execution list [-h] -p [pipeline_name] -f [file_name] -e [execution_uuid] ``` `cmf execution list` command to displays executions from the MLMD file with a few properties in a 7-column table, limited to 20 records per page. ``` -cmf execution list -p 'pipeline_name' -f '/path/to/mlmd-file-name' -e 'execution_id' +cmf execution list -p 'pipeline_name' -f '/path/to/mlmd-file-name' -e 'execution_uuid' ``` Required Arguments ``` - -p [pipeline_name], --pipeline-name [pipeline_name] Specify Pipeline name. + -p [pipeline_name], --pipeline-name [pipeline_name] Specify Pipeline name. ``` Optional Arguments ``` - -h, --help show this help message and exit. - --f [file_name], --file-name [file_name] Specify the absolute or relative path for the input MLMD file. - -e [exe_id], --execution_id [exe_id] Specify the execution id to retrieve execution. + -h, --help show this help message and exit. + --f [file_name], --file-name [file_name] Specify the absolute or relative path for the input MLMD file. + -e [exe_uuid], --execution_id [exe_uuid] Specify the execution uuid to retrieve execution. ``` ## cmf pipeline @@ -380,3 +380,46 @@ Optional Arguments -h, --help show this help message and exit. --f [file_name], --file-name [file_name] Specify the absolute or relative path for the input MLMD file. ``` + +## cmf repo +``` +Usage: cmf repo [-h] {push, pull} +``` +`cmf repo` command push and pull artifacts, metadata files, and source code to and from the user's artifact repository, cmf-server, and git respectively. 
+### cmf repo push +``` +Usage: cmf repo push [-h] -p [pipeline_name] -f [file_name] -e [exec_uuid] -t [tensorboard] +``` +`cmf repo push` command push artifacts, metadata files, and source code to the user's artifact repository, cmf-server, and git respectively. +``` +cmf repo push -p 'pipeline-name' -f '/path/to/mlmd-file-name' -e 'execution_uuid' -t 'tensorboard_log_path' +``` +Required Arguments +``` + -p [pipeline_name], --pipeline-name [pipeline_name] Specify Pipeline name. +``` +Optional Arguments +``` + -h, --help show this help message and exit. + -f [file_name], --file-name [file_name] Specify mlmd file name. + -e [exec_uuid], --execution_uuid [exec_uuid] Specify execution uuid. + -t [tensorboard], --tensorboard [tensorboard] Specify path to tensorboard logs for the pipeline. +``` +### cmf repo pull +``` +Usage: cmf repo pull [-h] -p [pipeline_name] -f [file_name] -e [exec_uuid] +``` +`cmf repo pull` command pull artifacts, metadata files, and source code from the user's artifact repository, cmf-server, and git respectively. +``` +cmf repo pull -p 'pipeline-name' -f '/path/to/mlmd-file-name' -e 'execution_uuid' +``` +Required Arguments +``` + -p [pipeline_name], --pipeline-name [pipeline_name] Specify Pipeline name. +``` +Optional Arguments +``` + -h, --help show this help message and exit. + -f [file_name], --file-name [file_name] Specify mlmd file name. + -e [exec_uuid], --execution_uuid [exec_uuid] Specify execution uuid. 
+``` diff --git a/examples/example-get-started/src/featurize.py b/examples/example-get-started/src/featurize.py index 3a9e594c..448471e2 100644 --- a/examples/example-get-started/src/featurize.py +++ b/examples/example-get-started/src/featurize.py @@ -108,6 +108,7 @@ def featurize(input_dir: str, output_dir: str) -> None: _ = metawriter.log_dataset(output_ds.train, "output") _ = metawriter.log_dataset(output_ds.test, "output") + metawriter.finalize() @click.command() diff --git a/examples/example-get-started/src/parse.py b/examples/example-get-started/src/parse.py index 87992ce4..ff890f44 100644 --- a/examples/example-get-started/src/parse.py +++ b/examples/example-get-started/src/parse.py @@ -77,6 +77,8 @@ def parse(input_file: str, output_dir: str) -> None: _ = metawriter.log_dataset(output_ds.train, "output") _ = metawriter.log_dataset(output_ds.test, "output") + # Automatically commits code, ensuring no need to manually commit before using the 'git repo push' command. + metawriter.finalize() @click.command() diff --git a/examples/example-get-started/src/test.py b/examples/example-get-started/src/test.py index ae55e032..5718087f 100644 --- a/examples/example-get-started/src/test.py +++ b/examples/example-get-started/src/test.py @@ -58,6 +58,7 @@ def test(model_dir: str, dataset_dir: str, output_dir: str) -> None: model_name="RandomForest_default" ) _ = metawriter.log_dataset(artifacts.dataset, "input") + metawriter.finalize() with open(artifacts.model, "rb") as fd: model = pickle.load(fd) diff --git a/examples/example-get-started/src/train.py b/examples/example-get-started/src/train.py index eb456e14..9a4d5e7d 100644 --- a/examples/example-get-started/src/train.py +++ b/examples/example-get-started/src/train.py @@ -72,6 +72,7 @@ def train(input_dir: str, output_dir: str) -> None: path=model_file, event="output", model_framework="SKlearn", model_type="RandomForestClassifier", model_name="RandomForestClassifier:default" ) + metawriter.finalize() @click.command() 
diff --git a/examples/example-get-started/test-data-slice.py b/examples/example-get-started/test-data-slice.py index a25d1c95..5b4692e7 100644 --- a/examples/example-get-started/test-data-slice.py +++ b/examples/example-get-started/test-data-slice.py @@ -87,3 +87,5 @@ def generate_dataset(): for label, content in df.iterrows(): if label == record: print(content) + +metawriter.finalize() \ No newline at end of file diff --git a/server/app/get_data.py b/server/app/get_data.py index 20e7d649..9613bd12 100644 --- a/server/app/get_data.py +++ b/server/app/get_data.py @@ -261,31 +261,35 @@ def create_unique_executions(server_store_path, req_info) -> str: for i in mlmd_data["Pipeline"]: i['stages']=[stage for stage in i['stages'] if stage['executions']!=[]] + for i in mlmd_data["Pipeline"]: - if len(i['stages']) == 0 : status="exists" else: cmf_merger.parse_json_to_mlmd( - json.dumps(mlmd_data), "/cmf-server/data/mlmd", "push", req_info["id"] + json.dumps(mlmd_data), "/cmf-server/data/mlmd", "push", req_info["exec_uuid"] ) status='success' return status -def get_mlmd_from_server(server_store_path: str, pipeline_name: str, exec_id: str): +def get_mlmd_from_server(server_store_path: str, pipeline_name: str, exec_uuid: str, dict_of_exe_ids: dict): query = cmfquery.CmfQuery(server_store_path) json_payload = None - df = pd.DataFrame() + flag=0 if(query.get_pipeline_id(pipeline_name)!=-1): # checks if pipeline name is available in mlmd - if exec_id != None: - exec_id = int(exec_id) - df = query.get_all_executions_by_ids_list([exec_id]) - if df.empty: - json_payload = "no_exec_id" + if exec_uuid != None: + dict_of_exe_ids = dict_of_exe_ids[pipeline_name] + for index, row in dict_of_exe_ids.iterrows(): + exec_uuid_list = row['Execution_uuid'].split(",") + if exec_uuid in exec_uuid_list: + flag=1 + break + if not flag: + json_payload = "no_exec_uuid" return json_payload - json_payload = query.dumptojson(pipeline_name, exec_id) + json_payload = query.dumptojson(pipeline_name, 
exec_uuid) return json_payload def get_lineage_data(server_store_path,pipeline_name,type,dict_of_art_ids,dict_of_exe_ids): diff --git a/server/app/main.py b/server/app/main.py index 9fe72a03..8e04a09d 100644 --- a/server/app/main.py +++ b/server/app/main.py @@ -123,7 +123,7 @@ async def mlmd_pull(info: Request, pipeline_name: str): req_info = await info.json() if os.path.exists(server_store_path): #json_payload values can be json data, NULL or no_exec_id. - json_payload= await async_api(get_mlmd_from_server, server_store_path, pipeline_name, req_info['exec_id']) + json_payload= await async_api(get_mlmd_from_server, server_store_path, pipeline_name, req_info['exec_uuid'], dict_of_exe_ids) else: raise HTTPException(status_code=413, detail=f"mlmd file not available on cmf-server.") if json_payload == None: diff --git a/ui/src/components/ExecutionTable/index.jsx b/ui/src/components/ExecutionTable/index.jsx index 0f86bdad..f0042f23 100644 --- a/ui/src/components/ExecutionTable/index.jsx +++ b/ui/src/components/ExecutionTable/index.jsx @@ -128,6 +128,11 @@ const ExecutionTable = ({ executions, onSort, onFilter }) => { } }; + const createUniqueUuids = (exe_uuid) =>{ + // Removing repeated execution uuid from executions. + return [...new Set(exe_uuid.split(","))].join(",") + } + return (
{ + Execution uuid { {expandedRow === index ? "-" : "+"} + {createUniqueUuids(data.Execution_uuid)} {data.Context_Type} {data.Execution} {data.Git_Repo} @@ -209,10 +216,14 @@ const ExecutionTable = ({ executions, onSort, onFilter }) => { return ( - {key} - - {value ? value : "Null"} - + {key !='Execution_uuid' && + <> + {key} + + {value ? value : "Null"} + + + } );