diff --git a/src/igtcloud/client/services/entities_service.py b/src/igtcloud/client/services/entities_service.py
index 52614c1..464b27d 100644
--- a/src/igtcloud/client/services/entities_service.py
+++ b/src/igtcloud/client/services/entities_service.py
@@ -250,7 +250,8 @@ def get_annotation_files(service: EntitiesService, study: RootStudy) -> FilesCol
     data_store['annotations'] = FilesCollectionWrapper(s3_prefix=study.s3_prefix + 'annotations/',
                                                        category='annotations',
                                                        f_auth=lambda action, prefix: s3_creds(action, prefix),
-                                                       f_init=lambda: get_aux_files(service, study, 'annotations'))
+                                                       f_init=lambda: get_aux_files(service, study, 'annotations'),
+                                                       f_extend=lambda x: x)
 
     return data_store['annotations']
 
diff --git a/src/igtcloud/client/tools/cli.py b/src/igtcloud/client/tools/cli.py
index 38b9298..25a1e11 100644
--- a/src/igtcloud/client/tools/cli.py
+++ b/src/igtcloud/client/tools/cli.py
@@ -170,13 +170,22 @@ def _get_domain(domain, environment):
     type=click.Choice(['flat', 'hierarchical'], case_sensitive=False),
     help='Folder structure of the data to be uploaded.'
 )
+@click.option(
+    '--category',
+    default=None,
+    type=click.Choice(['annotations'], case_sensitive=False),
+    help='Uploads annotation files'
+)
 def upload(local_folder, project, institute, environment, domain, user, submit, debug, concurrent_studies,
-           concurrent_files, folder_structure):
+           concurrent_files, folder_structure, category):
     """Upload data to Philips Interventional Cloud.
 
     \b
     This tool will upload all files in LOCAL_FOLDER to project PROJECT.
-    the folder structure should be LOCAL_FOLDER / INSTITUTE / <...> / <...> /
+    <...> / < study_id > / Annotation Folder upload structure
     """
     if debug:
         logging.getLogger('igtcloud.client').setLevel(logging.DEBUG)
@@ -192,7 +201,7 @@ def upload(local_folder, project, institute, environment, domain, user, submit,
     with smart_auth(domain, username=user) as auth:
         set_auth(auth)
         logger.info(f"Using url: {auth.domain}")
-        upload_project(local_folder, project, institute, submit, concurrent_studies, concurrent_files, folder_structure)
+        upload_project(local_folder, project, institute, submit, concurrent_studies, concurrent_files, folder_structure, category)
 
 
 @click.command(short_help="Login to Philips Interventional Cloud")
diff --git a/src/igtcloud/client/tools/upload_project.py b/src/igtcloud/client/tools/upload_project.py
index 73134bd..f48c0cb 100644
--- a/src/igtcloud/client/tools/upload_project.py
+++ b/src/igtcloud/client/tools/upload_project.py
@@ -21,7 +21,8 @@
 
 
 def upload_project(local_folder: str, project_name: str, institute_name: str = None, submit: bool = False,
-                   max_workers_studies: int = None, max_workers_files: int = None, folder_structure: str = None):
+                   max_workers_studies: int = None, max_workers_files: int = None, folder_structure: str = None,
+                   category: str = None):
     project, institutes = find_project_and_institutes(project_name, institute_name)
 
     if not project and not institutes:
@@ -32,6 +33,9 @@ def upload_project(local_folder: str, project_name: str, institute_name: str = N
     if submit:
         _password = getpass("For electronic record state it is required to reenter the password")
 
+    if category is not None and category.lower() == "annotations":
+        return upload_annotation_files(institutes, local_folder, max_workers_files)
+
     if project:
         # Project level file upload when there is a "files" folder in the root directory
         files_folder = os.path.join(local_folder, 'files')
@@ -83,6 +87,79 @@ def upload_project(local_folder: str, project_name: str, institute_name: str = N
                     f"files_skipped: {len(files_skipped)}")
+
+
+def upload_annotation_files(institutes, local_folder, max_workers_files):
+    # Annotation file upload
+    annotation_files = []
+    if os.path.isdir(local_folder):
+        for root, dirs, local_files in os.walk(local_folder):
+            local_path = root
+            annotation_files.extend(local_files)
+    else:
+        logger.error("Not a valid Directory")
+    initial_path = local_path.split(os.sep)
+    if "---" in initial_path[2]:
+        study_id_human_readable = initial_path[2].replace("---", "/")
+        annotation_paths = local_path.split(initial_path[2] + os.sep)
+        annotation_path = annotation_paths[1]
+    else:
+        study_id_human_readable = initial_path[2] + "/" + initial_path[3]
+        annotation_paths = local_path.split(initial_path[3] + os.sep)
+        annotation_path = annotation_paths[1]
+    for institute in institutes:
+        for studies in institute.studies:
+            if studies.study_id_human_readable == study_id_human_readable:
+                s3_prefix_for_annotation_file = studies.s3_prefix
+                hospital_id = studies.hospital_id
+                study_id = studies.study_database_id
+                break
+
+    study = entities_service.get_study(hospital_id=hospital_id, study_id=study_id)
+
+    files_uploaded = list()
+    files_skipped = list()
+
+    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers_files or 4) as executor:
+        fs = dict()
+        with tqdm(total=0, leave=False, desc=f"Annotation file Upload", unit='B', unit_scale=True,
+                  unit_divisor=1024) as pbar:
+            def callback(x):
+                pbar.update(x)
+
+            for annotation_file in annotation_files:
+                if annotation_file.endswith(".json"):
+                    try:
+                        local_path_json = local_path.replace(os.sep + "name", "")
+                        annotation_path_json = annotation_path.replace(os.sep + "name", "")
+                        file_path = os.path.join(local_path_json, annotation_file)
+                        size = os.path.getsize(file_path)
+                        pbar.total += size
+                        with open(os.path.abspath(file_path), "r") as file:
+                            json.loads(file.read())
+                    except ValueError as e:
+                        logger.error("Annotation File JSON is not valid : %s" % e)
+                        return
+                    fs[executor.submit(study.annotations.upload, file_path,
+                                       annotation_path_json + "/" + annotation_file, callback=callback)] = (
+                        s3_prefix_for_annotation_file, size)
+                else:
+                    file_path = os.path.join(local_path, annotation_file)
+                    size = os.path.getsize(file_path)
+                    pbar.total += size
+                    fs[executor.submit(study.annotations.upload, file_path,
+                                       annotation_path + "/" + annotation_file, callback=callback)] = (
+                        s3_prefix_for_annotation_file, size)
+
+            for f in concurrent.futures.as_completed(fs.keys()):
+                file, size = fs.pop(f)
+                if f.result():
+                    files_uploaded.append(file)
+                else:
+                    files_skipped.append(file)
+    logger.info(f"files_uploaded: {len(files_uploaded)}, "
+                f"files_skipped: {len(files_skipped)}")
+    return
+
+
 def upload_study(study_type: str, study_folder: str, patient_name: str, institute_id: str,
                  studies: CollectionWrapper[RootStudy], _submit_password: str = None,
                  max_workers_files: int = None) -> Tuple[RootStudy, List[str], List[str]]:
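
Usage note: with the --category option added above, annotation uploads are triggered from the existing upload command. When the option is set to annotations (the only accepted value, matched case-insensitively), upload_project() dispatches to upload_annotation_files() instead of the regular study upload path. Assuming the package's console script is named igtcloud and that LOCAL_FOLDER and PROJECT remain positional arguments as the command docstring suggests, an invocation would look roughly like: igtcloud upload LOCAL_FOLDER PROJECT --category annotations. The exact argument names and positions depend on the click declarations, which are outside this diff.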
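
For reference, below is a minimal, self-contained sketch (not taken from the repository) of the concurrency pattern the new upload_annotation_files() follows: pre-validate .json files, submit uploads to a ThreadPoolExecutor, and track transferred bytes with a tqdm progress bar. The names upload_file() and upload_folder() are hypothetical stand-ins; the real code calls study.annotations.upload(file_path, key, callback=...) and, unlike this sketch (which merely skips a bad file), returns as soon as it finds an invalid annotation JSON. Requires the tqdm package.

# Minimal sketch of the validate-then-upload pattern, under the assumptions above.
import concurrent.futures
import json
import os

from tqdm import tqdm


def upload_file(path: str, key: str, callback=None) -> bool:
    """Hypothetical stand-in for study.annotations.upload()."""
    if callback:
        callback(os.path.getsize(path))  # report transferred bytes to the bar
    return True


def upload_folder(folder: str, max_workers: int = 4) -> None:
    uploaded, skipped = [], []
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {}
        with tqdm(total=0, unit='B', unit_scale=True, unit_divisor=1024) as pbar:
            for root, _dirs, names in os.walk(folder):
                for name in names:
                    path = os.path.join(root, name)
                    if name.endswith(".json"):
                        # Reject invalid JSON before spending time on an upload.
                        try:
                            with open(path) as fh:
                                json.load(fh)
                        except ValueError as exc:
                            skipped.append(path)
                            print(f"Invalid annotation JSON {path}: {exc}")
                            continue
                    pbar.total += os.path.getsize(path)
                    key = os.path.relpath(path, folder).replace(os.sep, "/")
                    futures[executor.submit(upload_file, path, key,
                                            callback=pbar.update)] = path

            # Collect results as uploads finish; the bar advances via the callback.
            for future in concurrent.futures.as_completed(futures):
                path = futures.pop(future)
                (uploaded if future.result() else skipped).append(path)
    print(f"files_uploaded: {len(uploaded)}, files_skipped: {len(skipped)}")


if __name__ == "__main__":
    upload_folder("annotations")  # hypothetical local folder of annotation files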