From 9aa68a3816b31f9dc1326d8821a6bc1e43224c0a Mon Sep 17 00:00:00 2001 From: Koen de Laat Date: Fri, 8 Sep 2023 14:10:28 +0200 Subject: [PATCH 1/4] Switch to use boto3 client instead of resources See also: https://boto3.amazonaws.com/v1/documentation/api/latest/guide/resources.html --- .../client/services/entities_service.py | 44 ++++++++----------- src/igtcloud/client/services/utils/s3.py | 10 +++-- 2 files changed, 24 insertions(+), 30 deletions(-) diff --git a/src/igtcloud/client/services/entities_service.py b/src/igtcloud/client/services/entities_service.py index 464b27d..cc599f1 100644 --- a/src/igtcloud/client/services/entities_service.py +++ b/src/igtcloud/client/services/entities_service.py @@ -116,12 +116,12 @@ def upload(self, filename: str, key: str = None, overwrite: bool = False, if callback: callback(file.file_size) return False - bucket = get_s3_bucket(file, 'PUT') + s3_client, bucket_name = get_s3_client(file, 'PUT') extra_args = {'ServerSideEncryption': 'AES256'} - kwargs = dict(Filename=abs_path, Key=file.key, ExtraArgs=extra_args) + kwargs = dict(Bucket=bucket_name, Filename=abs_path, Key=file.key, ExtraArgs=extra_args) if callback: kwargs['Callback'] = callback - bucket.upload_file(**kwargs) + s3_client.upload_file(**kwargs) file.is_completed = True file.is_new = False file.progress_in_bytes = file.file_size @@ -295,15 +295,15 @@ def download_file(file: File, destination_dir: os.PathLike, overwrite: bool = Tr if callback: callback(file.file_size) return False - bucket = get_s3_bucket(file, 'GET') + client, bucket_name = get_s3_client(file, 'GET') os.makedirs(os.path.abspath(os.path.dirname(destination)), exist_ok=True) - kwargs = dict(Key=file.key, Filename=destination) + kwargs = dict(Bucket=bucket_name, Key=file.key, Filename=destination) if callback: kwargs['Callback'] = callback - bucket.download_file(**kwargs) + client.download_file(**kwargs) if include_modified_date: - metadata = get_s3_file_metadata(file, 'GET') + metadata = 
get_s3_file_metadata(file) epoch = metadata['LastModified'].timestamp() os.utime(destination, (epoch, epoch)) @@ -313,16 +313,16 @@ def download_file(file: File, destination_dir: os.PathLike, overwrite: bool = Tr @retry(stop=(stop_after_attempt(3)), wait=wait_exponential(), retry=retry_if_exception_type(HTTPClientError), reraise=True) def download_fileobj(file: File, file_obj: io.IOBase): - bucket = get_s3_bucket(file, 'GET') - bucket.download_fileobj(Key=file.key, Fileobj=file_obj) + client, bucket_name = get_s3_client(file, 'GET') + client.download_fileobj(Bucket=bucket_name, Key=file.key, Fileobj=file_obj) def open_file(file: File, mode='rb', buffering=-1, **kwargs): if 'r' not in mode: raise RuntimeError("Mode should contain 'r'") binary = 'b' in mode - bucket = get_s3_bucket(file, 'GET') - s3_file = S3File(bucket.Object(file.key), mode=mode) + s3_client, bucket_name = get_s3_client(file, "GET") + s3_file = S3File(s3_client, bucket_name, file.key, mode=mode) if binary: if buffering == 0: return s3_file @@ -334,7 +334,7 @@ def open_file(file: File, mode='rb', buffering=-1, **kwargs): return io.TextIOWrapper(s3_file, **kwargs) -def get_s3_bucket(file: File, action: str): +def get_s3_client(file: File, action: str): creds = file.credentials(action) if not creds: raise RuntimeError("No valid credentials") @@ -342,23 +342,15 @@ def get_s3_bucket(file: File, action: str): aws_secret_access_key=creds.secret_key, aws_session_token=creds.session_token) creds_dict = {k: v for k, v in creds_dict.items() if k} - session = boto3.session.Session(**creds_dict) - s3 = session.resource('s3') - return s3.Bucket(creds.bucket) + client = boto3.client('s3', **creds_dict) + return client, creds.bucket -def get_s3_file_metadata(file: File, action: str): - creds = file.credentials(action) - if not creds: - raise RuntimeError("No valid credentials") - creds_dict = dict(aws_access_key_id=creds.access_key, - aws_secret_access_key=creds.secret_key, - aws_session_token=creds.session_token) 
- creds_dict = {k: v for k, v in creds_dict.items() if k} - session = boto3.session.Session(**creds_dict) - s3 = session.client('s3') - return s3.head_object(Bucket=creds.bucket, Key=file.key) +def get_s3_file_metadata(file: File): + s3_client, bucket_name = get_s3_client(file, 'GET') + + return s3_client.head_object(Bucket=bucket_name, Key=file.key) def file_upload_completed(service: EntitiesService, study: RootStudy, file: File): diff --git a/src/igtcloud/client/services/utils/s3.py b/src/igtcloud/client/services/utils/s3.py index 73f476d..f2abacb 100644 --- a/src/igtcloud/client/services/utils/s3.py +++ b/src/igtcloud/client/services/utils/s3.py @@ -3,13 +3,15 @@ class S3File(io.RawIOBase): - def __init__(self, s3_object, mode): - self._s3_object = s3_object + def __init__(self, s3_client, bucket, key, mode): + self._s3_client = s3_client + self._bucket = bucket + self._key = key self._position: int = 0 self.mode = mode def __repr__(self): - return "<%s s3_object=%r>" % (type(self).__name__, self._s3_object) + return "<%s s3_object=%s/%s>" % (type(self).__name__, self._bucket, self._key) def readinto(self, buffer) -> Optional[int]: data = self.read(len(buffer)) @@ -35,7 +37,7 @@ def read(self, size: int = -1) -> Optional[bytes]: range_header = "bytes=%d-%d" % (self._position, new_position - 1) self.seek(offset=size, whence=io.SEEK_CUR) - return self._s3_object.get(Range=range_header)["Body"].read() + return self._s3_client.get_object(Bucket=self._bucket, Key=self._key, Range=range_header)["Body"].read() def readable(self) -> bool: return True From 4c8f79d2f5d9a30ce96774ac64ce5780b5f72178 Mon Sep 17 00:00:00 2001 From: Koen de Laat Date: Mon, 11 Sep 2023 09:47:16 +0200 Subject: [PATCH 2/4] Share boto3 S3 clients between threads --- requirements.txt | 3 ++- src/igtcloud/client/services/entities_service.py | 16 ++++++++++++---- src/igtcloud/client/tools/download_institute.py | 6 +++++- 3 files changed, 19 insertions(+), 6 deletions(-) diff --git a/requirements.txt 
b/requirements.txt index 65471e7..4ac9ad5 100644 --- a/requirements.txt +++ b/requirements.txt @@ -6,4 +6,5 @@ urllib3 >= 1.25.3 boto3 tqdm Click >= 8.0.0 -jwt >= 1.3 \ No newline at end of file +jwt >= 1.3 +cachetools \ No newline at end of file diff --git a/src/igtcloud/client/services/entities_service.py b/src/igtcloud/client/services/entities_service.py index cc599f1..e025b88 100644 --- a/src/igtcloud/client/services/entities_service.py +++ b/src/igtcloud/client/services/entities_service.py @@ -2,11 +2,13 @@ import io import os import uuid +from threading import Lock from typing import Optional, Iterable, Callable, Any import boto3 import dateutil.parser from botocore.exceptions import HTTPClientError +from cachetools import cached, LRUCache from tenacity import retry, wait_exponential, retry_if_exception_type, stop_after_attempt from .auth.model.credentials import Credentials @@ -287,7 +289,8 @@ def get_file_creds(file: File, action: str) -> Optional[Credentials]: @retry(stop=(stop_after_attempt(3)), wait=wait_exponential(), retry=retry_if_exception_type(HTTPClientError), reraise=True) def download_file(file: File, destination_dir: os.PathLike, overwrite: bool = True, - callback: Callable[[int], None] = None, include_modified_date: bool = False): + callback: Callable[[int], None] = None, include_modified_date: bool = False, + client_kwargs: dict = None): if not file.is_completed: return False destination = os.path.abspath(os.path.join(destination_dir, file.file_name)) @@ -295,7 +298,7 @@ def download_file(file: File, destination_dir: os.PathLike, overwrite: bool = Tr if callback: callback(file.file_size) return False - client, bucket_name = get_s3_client(file, 'GET') + client, bucket_name = get_s3_client(file, 'GET', **client_kwargs) os.makedirs(os.path.abspath(os.path.dirname(destination)), exist_ok=True) kwargs = dict(Bucket=bucket_name, Key=file.key, Filename=destination) if callback: @@ -334,7 +337,12 @@ def open_file(file: File, mode='rb', buffering=-1, 
**kwargs): return io.TextIOWrapper(s3_file, **kwargs) -def get_s3_client(file: File, action: str): +@cached(cache=LRUCache(10), lock=Lock()) +def _cached_boto3_client(*args, **kwargs): + return boto3.client(*args, **kwargs) + + +def get_s3_client(file: File, action: str, **kwargs): creds = file.credentials(action) if not creds: raise RuntimeError("No valid credentials") @@ -342,7 +350,7 @@ def get_s3_client(file: File, action: str): aws_secret_access_key=creds.secret_key, aws_session_token=creds.session_token) creds_dict = {k: v for k, v in creds_dict.items() if k} - client = boto3.client('s3', **creds_dict) + client = _cached_boto3_client('s3', **creds_dict, **kwargs) return client, creds.bucket diff --git a/src/igtcloud/client/tools/download_institute.py b/src/igtcloud/client/tools/download_institute.py index dfc6925..9634bb3 100644 --- a/src/igtcloud/client/tools/download_institute.py +++ b/src/igtcloud/client/tools/download_institute.py @@ -5,6 +5,7 @@ from concurrent.futures import as_completed from typing import Callable, List +from botocore.config import Config from tqdm.auto import tqdm from .common import find_project_and_institutes @@ -90,9 +91,12 @@ def study_destination_fn(file: File) -> str: return os.path.join(study_destination, file.category) return study_destination + client_kwargs = dict(config=Config(max_pool_connections=max_workers_files or 10)) + with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers_files or 4) as executor: fs = {executor.submit(file.download, study_destination_fn(file), overwrite=False, - include_modified_date=include_modified_date): file for file in files} + include_modified_date=include_modified_date, + client_kwargs=client_kwargs): file for file in files} study_folder = os.path.basename(study_destination) with tqdm(total=total_size, leave=False, desc=f"Study {study_folder}", unit='B', unit_scale=True, unit_divisor=1024) as pbar: From 92551fb331d604a9dc7f704ec56a7371c1f4c721 Mon Sep 17 00:00:00 2001 From: Koen de 
Laat Date: Mon, 11 Sep 2023 12:44:17 +0200 Subject: [PATCH 3/4] Improve upload performance --- src/igtcloud/client/services/entities_service.py | 9 +++++++-- src/igtcloud/client/tools/download_institute.py | 9 +++++---- src/igtcloud/client/tools/upload_project.py | 11 ++++++++--- 3 files changed, 20 insertions(+), 9 deletions(-) diff --git a/src/igtcloud/client/services/entities_service.py b/src/igtcloud/client/services/entities_service.py index e025b88..a2c225a 100644 --- a/src/igtcloud/client/services/entities_service.py +++ b/src/igtcloud/client/services/entities_service.py @@ -100,7 +100,10 @@ def get_credentials(self, action): return self._credentials.get(action) def upload(self, filename: str, key: str = None, overwrite: bool = False, - callback: Callable[[int], None] = None, trigger_action: bool = True) -> bool: + callback: Callable[[int], None] = None, trigger_action: bool = True, + client_kwargs: dict = None) -> bool: + if client_kwargs is None: + client_kwargs = {} abs_path = os.path.abspath(filename) if not os.path.exists(abs_path): raise FileNotFoundError(f"File {filename} not found") @@ -118,7 +121,7 @@ def upload(self, filename: str, key: str = None, overwrite: bool = False, if callback: callback(file.file_size) return False - s3_client, bucket_name = get_s3_client(file, 'PUT') + s3_client, bucket_name = get_s3_client(file, 'PUT', **client_kwargs) extra_args = {'ServerSideEncryption': 'AES256'} kwargs = dict(Bucket=bucket_name, Filename=abs_path, Key=file.key, ExtraArgs=extra_args) if callback: @@ -291,6 +294,8 @@ def get_file_creds(file: File, action: str) -> Optional[Credentials]: def download_file(file: File, destination_dir: os.PathLike, overwrite: bool = True, callback: Callable[[int], None] = None, include_modified_date: bool = False, client_kwargs: dict = None): + if client_kwargs is None: + client_kwargs = {} if not file.is_completed: return False destination = os.path.abspath(os.path.join(destination_dir, file.file_name)) diff --git 
a/src/igtcloud/client/tools/download_institute.py b/src/igtcloud/client/tools/download_institute.py index 9634bb3..d116b6a 100644 --- a/src/igtcloud/client/tools/download_institute.py +++ b/src/igtcloud/client/tools/download_institute.py @@ -46,6 +46,8 @@ def download_institutes(project_name: str, institute_name: str, destination: str if callable(studies_filter): studies = list(filter(studies_filter, studies)) + client_kwargs = dict(config=Config(max_pool_connections=(max_workers_files or 4) * (max_workers_studies or 4))) + with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers_studies or 4) as executor: with tqdm(total=len(studies), desc="Studies", unit='study') as pbar: fs = {executor.submit(_download_study, @@ -54,7 +56,7 @@ def download_institutes(project_name: str, institute_name: str, destination: str categories, files_filter, include_modified_date, - max_workers_files): study for study in studies} + max_workers_files, client_kwargs=client_kwargs): study for study in studies} for future in as_completed(fs): study = fs.pop(future) pbar.update() @@ -72,7 +74,8 @@ def _create_study_destination_path(destination: str, institute, study, folder_st return os.path.join(destination, institute.name, os.path.normpath(study_destination)) -def _download_study(study, study_destination, categories, files_filter, include_modified_date, max_workers_files: int): +def _download_study(study, study_destination, categories, files_filter, include_modified_date, max_workers_files: int, + client_kwargs: dict = None): study_json_file = os.path.join(study_destination, 'study.json') os.makedirs(os.path.dirname(study_json_file), exist_ok=True) with open(study_json_file, 'w') as f: @@ -91,8 +94,6 @@ def study_destination_fn(file: File) -> str: return os.path.join(study_destination, file.category) return study_destination - client_kwargs = dict(config=Config(max_pool_connections=max_workers_files or 10)) - with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers_files 
or 4) as executor: fs = {executor.submit(file.download, study_destination_fn(file), overwrite=False, include_modified_date=include_modified_date, diff --git a/src/igtcloud/client/tools/upload_project.py b/src/igtcloud/client/tools/upload_project.py index f48c0cb..c1b5a99 100644 --- a/src/igtcloud/client/tools/upload_project.py +++ b/src/igtcloud/client/tools/upload_project.py @@ -6,6 +6,8 @@ from getpass import getpass from typing import Tuple, List +from botocore.config import Config + from igtcloud.client.services.entities.model.project import Project from tqdm.auto import tqdm @@ -53,6 +55,8 @@ def upload_project(local_folder: str, project_name: str, institute_name: str = N if folder_structure not in ['flat', 'hierarchical']: folder_structure = 'flat' + client_kwargs = dict(config=Config(max_pool_connections=(max_workers_files or 4) * (max_workers_studies or 4))) + with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers_studies or 4) as executor: for institute in institutes: logger.info(f"Uploading to institute: {institute.name}") @@ -79,7 +83,7 @@ def upload_project(local_folder: str, project_name: str, institute_name: str = N local_studies[study_dir] = patient_name fs = [executor.submit(upload_study, institute.study_type, study_folder, local_studies[study_folder], - institute.id, existing_studies, _password, max_workers_files) for study_folder in local_studies] + institute.id, existing_studies, _password, max_workers_files, client_kwargs=client_kwargs) for study_folder in local_studies] for f in tqdm(concurrent.futures.as_completed(fs), total=len(fs), desc="Studies", unit='study'): study, files_uploaded, files_skipped = f.result() @@ -162,7 +166,7 @@ def callback(x): def upload_study(study_type: str, study_folder: str, patient_name: str, institute_id: str, studies: CollectionWrapper[RootStudy], _submit_password: str = None, - max_workers_files: int = None) -> Tuple[RootStudy, List[str], List[str]]: + max_workers_files: int = None, client_kwargs: 
dict = None) -> Tuple[RootStudy, List[str], List[str]]: local_study = None study_cls = entities_service.study_type_classes.get(study_type) study_json_file = os.path.join(study_folder, 'study.json') @@ -219,7 +223,8 @@ def callback(x): key = os.path.relpath(file_path, study_folder).replace(os.path.sep, '/') size = os.path.getsize(file_path) pbar.total += size - fs[executor.submit(study.files.upload, file_path, key, callback=callback)] = (key, size) + fs[executor.submit(study.files.upload, file_path, key, callback=callback, + client_kwargs=client_kwargs)] = (key, size) for f in concurrent.futures.as_completed(fs.keys()): file, size = fs.pop(f) if f.result(): From b3fe80d3499703d3a1aad2215e49e3f86c316a66 Mon Sep 17 00:00:00 2001 From: Koen de Laat Date: Mon, 11 Sep 2023 14:13:48 +0200 Subject: [PATCH 4/4] Improve upload performance for annotations and project files --- src/igtcloud/client/tools/upload_project.py | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/src/igtcloud/client/tools/upload_project.py b/src/igtcloud/client/tools/upload_project.py index c1b5a99..b09d995 100644 --- a/src/igtcloud/client/tools/upload_project.py +++ b/src/igtcloud/client/tools/upload_project.py @@ -122,6 +122,8 @@ def upload_annotation_files(institutes, local_folder, max_workers_files): files_uploaded = list() files_skipped = list() + client_kwargs = dict(config=Config(max_pool_connections=max_workers_files or 4)) + with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers_files or 4) as executor: fs = dict() with tqdm(total=0, leave=False, desc=f"Annotation file Upload", unit='B', unit_scale=True, @@ -143,14 +145,16 @@ def callback(x): logger.error("Annotation File JSON is not valid : %s" % e) return fs[executor.submit(study.annotations.upload, file_path, - annotation_path_json + "/" + annotation_file, callback=callback)] = ( + annotation_path_json + "/" + annotation_file, callback=callback, + client_kwargs=client_kwargs)] = ( 
s3_prefix_for_annotation_file, size) else: file_path = os.path.join(local_path, annotation_file) size = os.path.getsize(file_path) pbar.total += size fs[executor.submit(study.annotations.upload, file_path, - annotation_path + "/" + annotation_file, callback=callback)] = ( + annotation_path + "/" + annotation_file, callback=callback, + client_kwargs=client_kwargs)] = ( s3_prefix_for_annotation_file, size) for f in concurrent.futures.as_completed(fs.keys()): @@ -255,6 +259,8 @@ def upload_project_files(project: Project, files_folder: str, max_workers_files: files_uploaded = list() files_skipped = list() + client_kwargs = dict(config=Config(max_pool_connections=max_workers_files or 4)) + with ThreadPoolExecutor(max_workers=max_workers_files or 4) as executor: with tqdm(total=0, desc="Project files", unit='B', unit_scale=True, unit_divisor=1024) as pbar: for root, _, files in os.walk(files_folder): @@ -263,7 +269,8 @@ def upload_project_files(project: Project, files_folder: str, max_workers_files: size = os.path.getsize(file_path) key = os.path.relpath(file_path, files_folder).replace(os.path.sep, '/') pbar.total += size - future = executor.submit(project.files.upload, file_path, key, callback=pbar.update) + future = executor.submit(project.files.upload, file_path, key, callback=pbar.update, + client_kwargs=client_kwargs) fs[future] = (file, size) for f in as_completed(fs.keys()):