Merge pull request #33 from philips-labs/feature/s3_performance
Improve S3 performance
koendelaat authored Sep 11, 2023
2 parents 1e69144 + b3fe80d commit 91639bf
Showing 5 changed files with 67 additions and 42 deletions.
3 changes: 2 additions & 1 deletion requirements.txt
@@ -6,4 +6,5 @@ urllib3 >= 1.25.3
boto3
tqdm
Click >= 8.0.0
jwt >= 1.3
cachetools
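
The new cachetools dependency supplies the LRU cache used for boto3 clients in entities_service.py below. A minimal sketch of the memoization it provides (function and values are illustrative):

    from threading import Lock

    from cachetools import LRUCache, cached

    @cached(cache=LRUCache(maxsize=2), lock=Lock())  # thread-safe LRU memoization
    def double(x):
        print("computing", x)
        return x * 2

    double(1)  # prints "computing 1", returns 2
    double(1)  # cache hit: returns 2 without recomputing
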
61 changes: 33 additions & 28 deletions src/igtcloud/client/services/entities_service.py
@@ -2,11 +2,13 @@
import io
import os
import uuid
from threading import Lock
from typing import Optional, Iterable, Callable, Any

import boto3
import dateutil.parser
from botocore.exceptions import HTTPClientError
from cachetools import cached, LRUCache
from tenacity import retry, wait_exponential, retry_if_exception_type, stop_after_attempt

from .auth.model.credentials import Credentials
@@ -98,7 +100,10 @@ def get_credentials(self, action):
return self._credentials.get(action)

def upload(self, filename: str, key: str = None, overwrite: bool = False,
callback: Callable[[int], None] = None, trigger_action: bool = True) -> bool:
callback: Callable[[int], None] = None, trigger_action: bool = True,
client_kwargs: dict = None) -> bool:
if client_kwargs is None:
client_kwargs = {}
abs_path = os.path.abspath(filename)
if not os.path.exists(abs_path):
raise FileNotFoundError(f"File {filename} not found")
@@ -116,12 +121,12 @@ def upload(self, filename: str, key: str = None, overwrite: bool = False,
if callback:
callback(file.file_size)
return False
bucket = get_s3_bucket(file, 'PUT')
s3_client, bucket_name = get_s3_client(file, 'PUT', **client_kwargs)
extra_args = {'ServerSideEncryption': 'AES256'}
kwargs = dict(Filename=abs_path, Key=file.key, ExtraArgs=extra_args)
kwargs = dict(Bucket=bucket_name, Filename=abs_path, Key=file.key, ExtraArgs=extra_args)
if callback:
kwargs['Callback'] = callback
bucket.upload_file(**kwargs)
s3_client.upload_file(**kwargs)
file.is_completed = True
file.is_new = False
file.progress_in_bytes = file.file_size
@@ -287,23 +292,26 @@ def get_file_creds(file: File, action: str) -> Optional[Credentials]:
@retry(stop=(stop_after_attempt(3)), wait=wait_exponential(),
retry=retry_if_exception_type(HTTPClientError), reraise=True)
def download_file(file: File, destination_dir: os.PathLike, overwrite: bool = True,
callback: Callable[[int], None] = None, include_modified_date: bool = False):
callback: Callable[[int], None] = None, include_modified_date: bool = False,
client_kwargs: dict = None):
if client_kwargs is None:
client_kwargs = {}
if not file.is_completed:
return False
destination = os.path.abspath(os.path.join(destination_dir, file.file_name))
if os.path.exists(destination) and os.path.getsize(destination) == file.file_size and not overwrite:
if callback:
callback(file.file_size)
return False
bucket = get_s3_bucket(file, 'GET')
client, bucket_name = get_s3_client(file, 'GET', **client_kwargs)
os.makedirs(os.path.abspath(os.path.dirname(destination)), exist_ok=True)
kwargs = dict(Key=file.key, Filename=destination)
kwargs = dict(Bucket=bucket_name, Key=file.key, Filename=destination)
if callback:
kwargs['Callback'] = callback
bucket.download_file(**kwargs)
client.download_file(**kwargs)

if include_modified_date:
metadata = get_s3_file_metadata(file, 'GET')
metadata = get_s3_file_metadata(file)
epoch = metadata['LastModified'].timestamp()
os.utime(destination, (epoch, epoch))

@@ -313,16 +321,16 @@ def download_file(file: File, destination_dir: os.PathLike, overwrite: bool = Tr
@retry(stop=(stop_after_attempt(3)), wait=wait_exponential(),
retry=retry_if_exception_type(HTTPClientError), reraise=True)
def download_fileobj(file: File, file_obj: io.IOBase):
bucket = get_s3_bucket(file, 'GET')
bucket.download_fileobj(Key=file.key, Fileobj=file_obj)
client, bucket_name = get_s3_client(file, 'GET')
client.download_fileobj(Bucket=bucket_name, Key=file.key, Fileobj=file_obj)


def open_file(file: File, mode='rb', buffering=-1, **kwargs):
if 'r' not in mode:
raise RuntimeError("Mode should contain 'r'")
binary = 'b' in mode
bucket = get_s3_bucket(file, 'GET')
s3_file = S3File(bucket.Object(file.key), mode=mode)
s3_client, bucket_name = get_s3_client(file, "GET")
s3_file = S3File(s3_client, bucket_name, file.key, mode=mode)
if binary:
if buffering == 0:
return s3_file
@@ -334,31 +342,28 @@ def open_file(file: File, mode='rb', buffering=-1, **kwargs):
return io.TextIOWrapper(s3_file, **kwargs)


def get_s3_bucket(file: File, action: str):
creds = file.credentials(action)
if not creds:
raise RuntimeError("No valid credentials")
creds_dict = dict(aws_access_key_id=creds.access_key,
aws_secret_access_key=creds.secret_key,
aws_session_token=creds.session_token)
creds_dict = {k: v for k, v in creds_dict.items() if k}
session = boto3.session.Session(**creds_dict)
s3 = session.resource('s3')
return s3.Bucket(creds.bucket)
@cached(cache=LRUCache(10), lock=Lock())
def _cached_boto3_client(*args, **kwargs):
return boto3.client(*args, **kwargs)


def get_s3_file_metadata(file: File, action: str):
def get_s3_client(file: File, action: str, **kwargs):
creds = file.credentials(action)
if not creds:
raise RuntimeError("No valid credentials")
creds_dict = dict(aws_access_key_id=creds.access_key,
aws_secret_access_key=creds.secret_key,
aws_session_token=creds.session_token)
creds_dict = {k: v for k, v in creds_dict.items() if k}
session = boto3.session.Session(**creds_dict)
s3 = session.client('s3')
client = _cached_boto3_client('s3', **creds_dict, **kwargs)

return client, creds.bucket


def get_s3_file_metadata(file: File):
s3_client, bucket_name = get_s3_client(file, 'GET')

return s3.head_object(Bucket=creds.bucket, Key=file.key)
return s3_client.head_object(Bucket=bucket_name, Key=file.key)


def file_upload_completed(service: EntitiesService, study: RootStudy, file: File):
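
The core change above replaces the per-call boto3 session and Bucket resource with a small LRU cache of clients. boto3 clients are thread-safe and relatively expensive to construct, so sharing one across the tools' worker threads avoids repeated credential and endpoint setup and lets a single connection pool be reused. One caveat: cachetools keys on the call arguments, so callers only hit the cache when they pass the same (hashable) kwargs, which is why the tools below build client_kwargs once per run and reuse it. A hedged sketch of the pattern (region and arguments illustrative):

    from threading import Lock

    import boto3
    from cachetools import LRUCache, cached

    @cached(cache=LRUCache(10), lock=Lock())
    def _cached_boto3_client(*args, **kwargs):
        # identical positional/keyword arguments resolve to one shared client
        return boto3.client(*args, **kwargs)

    a = _cached_boto3_client('s3', region_name='us-east-1')
    b = _cached_boto3_client('s3', region_name='us-east-1')
    assert a is b  # second call is a cache hit; no new client is built
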
10 changes: 6 additions & 4 deletions src/igtcloud/client/services/utils/s3.py
@@ -3,13 +3,15 @@


class S3File(io.RawIOBase):
def __init__(self, s3_object, mode):
self._s3_object = s3_object
def __init__(self, s3_client, bucket, key, mode):
self._s3_client = s3_client
self._bucket = bucket
self._key = key
self._position: int = 0
self.mode = mode

def __repr__(self):
return "<%s s3_object=%r>" % (type(self).__name__, self._s3_object)
return "<%s s3_object=%s/%s>" % (type(self).__name__, self._bucket, self._key)

def readinto(self, buffer) -> Optional[int]:
data = self.read(len(buffer))
@@ -35,7 +37,7 @@ def read(self, size: int = -1) -> Optional[bytes]:
range_header = "bytes=%d-%d" % (self._position, new_position - 1)
self.seek(offset=size, whence=io.SEEK_CUR)

return self._s3_object.get(Range=range_header)["Body"].read()
return self._s3_client.get_object(Bucket=self._bucket, Key=self._key, Range=range_header)["Body"].read()

def readable(self) -> bool:
return True
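
S3File now issues ranged GETs straight through the shared client instead of holding a per-object resource: each read() translates the current file position into an HTTP Range header, so only the requested bytes travel over the wire. A sketch of the underlying call (bucket, key, and region are hypothetical):

    import boto3

    s3 = boto3.client("s3", region_name="us-east-1")
    # fetch only the first KiB of the object, mirroring S3File.read(1024)
    resp = s3.get_object(Bucket="example-bucket", Key="data/blob.bin",
                         Range="bytes=0-1023")
    chunk = resp["Body"].read()
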
11 changes: 8 additions & 3 deletions src/igtcloud/client/tools/download_institute.py
@@ -5,6 +5,7 @@
from concurrent.futures import as_completed
from typing import Callable, List

from botocore.config import Config
from tqdm.auto import tqdm

from .common import find_project_and_institutes
@@ -45,6 +46,8 @@ def download_institutes(project_name: str, institute_name: str, destination: str
if callable(studies_filter):
studies = list(filter(studies_filter, studies))

client_kwargs = dict(config=Config(max_pool_connections=(max_workers_files or 4) * (max_workers_studies or 4)))

with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers_studies or 4) as executor:
with tqdm(total=len(studies), desc="Studies", unit='study') as pbar:
fs = {executor.submit(_download_study,
@@ -53,7 +56,7 @@
categories,
files_filter,
include_modified_date,
max_workers_files): study for study in studies}
max_workers_files, client_kwargs=client_kwargs): study for study in studies}
for future in as_completed(fs):
study = fs.pop(future)
pbar.update()
@@ -71,7 +74,8 @@ def _create_study_destination_path(destination: str, institute, study, folder_st
return os.path.join(destination, institute.name, os.path.normpath(study_destination))


def _download_study(study, study_destination, categories, files_filter, include_modified_date, max_workers_files: int):
def _download_study(study, study_destination, categories, files_filter, include_modified_date, max_workers_files: int,
client_kwargs: dict = None):
study_json_file = os.path.join(study_destination, 'study.json')
os.makedirs(os.path.dirname(study_json_file), exist_ok=True)
with open(study_json_file, 'w') as f:
@@ -92,7 +96,8 @@ def study_destination_fn(file: File) -> str:

with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers_files or 4) as executor:
fs = {executor.submit(file.download, study_destination_fn(file), overwrite=False,
include_modified_date=include_modified_date): file for file in files}
include_modified_date=include_modified_date,
client_kwargs=client_kwargs): file for file in files}
study_folder = os.path.basename(study_destination)
with tqdm(total=total_size, leave=False, desc=f"Study {study_folder}", unit='B', unit_scale=True,
unit_divisor=1024) as pbar:
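
Because studies and files are downloaded from nested thread pools (up to 4 × 4 workers under the defaults above), the new Config sizes botocore's connection pool to match; the default pool of 10 connections would otherwise be exhausted, causing urllib3 "connection pool is full" warnings and connection churn. A sketch of the sizing logic under those assumed defaults:

    from botocore.config import Config

    max_workers_studies = 4  # assumed defaults, as in the diff
    max_workers_files = 4

    # one pooled connection per potentially concurrent transfer
    config = Config(max_pool_connections=max_workers_files * max_workers_studies)
    client_kwargs = dict(config=config)  # handed down to get_s3_client
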
24 changes: 18 additions & 6 deletions src/igtcloud/client/tools/upload_project.py
@@ -6,6 +6,8 @@
from getpass import getpass
from typing import Tuple, List

from botocore.config import Config

from igtcloud.client.services.entities.model.project import Project
from tqdm.auto import tqdm

@@ -53,6 +55,8 @@ def upload_project(local_folder: str, project_name: str, institute_name: str = N
if folder_structure not in ['flat', 'hierarchical']:
folder_structure = 'flat'

client_kwargs = dict(config=Config(max_pool_connections=(max_workers_files or 4) * (max_workers_studies or 4)))

with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers_studies or 4) as executor:
for institute in institutes:
logger.info(f"Uploading to institute: {institute.name}")
@@ -79,7 +83,7 @@ def upload_project(local_folder: str, project_name: str, institute_name: str = N
local_studies[study_dir] = patient_name

fs = [executor.submit(upload_study, institute.study_type, study_folder, local_studies[study_folder],
institute.id, existing_studies, _password, max_workers_files) for study_folder in local_studies]
institute.id, existing_studies, _password, max_workers_files, client_kwargs=client_kwargs) for study_folder in local_studies]

for f in tqdm(concurrent.futures.as_completed(fs), total=len(fs), desc="Studies", unit='study'):
study, files_uploaded, files_skipped = f.result()
@@ -118,6 +122,8 @@ def upload_annotation_files(institutes, local_folder, max_workers_files):
files_uploaded = list()
files_skipped = list()

client_kwargs = dict(config=Config(max_pool_connections=max_workers_files or 4))

with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers_files or 4) as executor:
fs = dict()
with tqdm(total=0, leave=False, desc=f"Annotation file Upload", unit='B', unit_scale=True,
@@ -139,14 +145,16 @@ def callback(x):
logger.error("Annotation File JSON is not valid : %s" % e)
return
fs[executor.submit(study.annotations.upload, file_path,
annotation_path_json + "/" + annotation_file, callback=callback)] = (
annotation_path_json + "/" + annotation_file, callback=callback,
client_kwargs=client_kwargs)] = (
s3_prefix_for_annotation_file, size)
else:
file_path = os.path.join(local_path, annotation_file)
size = os.path.getsize(file_path)
pbar.total += size
fs[executor.submit(study.annotations.upload, file_path,
annotation_path + "/" + annotation_file, callback=callback)] = (
annotation_path + "/" + annotation_file, callback=callback,
client_kwargs=client_kwargs)] = (
s3_prefix_for_annotation_file, size)

for f in concurrent.futures.as_completed(fs.keys()):
@@ -162,7 +170,7 @@

def upload_study(study_type: str, study_folder: str, patient_name: str, institute_id: str,
studies: CollectionWrapper[RootStudy], _submit_password: str = None,
max_workers_files: int = None) -> Tuple[RootStudy, List[str], List[str]]:
max_workers_files: int = None, client_kwargs: dict = None) -> Tuple[RootStudy, List[str], List[str]]:
local_study = None
study_cls = entities_service.study_type_classes.get(study_type)
study_json_file = os.path.join(study_folder, 'study.json')
@@ -219,7 +227,8 @@ def callback(x):
key = os.path.relpath(file_path, study_folder).replace(os.path.sep, '/')
size = os.path.getsize(file_path)
pbar.total += size
fs[executor.submit(study.files.upload, file_path, key, callback=callback)] = (key, size)
fs[executor.submit(study.files.upload, file_path, key, callback=callback,
client_kwargs=client_kwargs)] = (key, size)
for f in concurrent.futures.as_completed(fs.keys()):
file, size = fs.pop(f)
if f.result():
@@ -250,6 +259,8 @@ def upload_project_files(project: Project, files_folder: str, max_workers_files:
files_uploaded = list()
files_skipped = list()

client_kwargs = dict(config=Config(max_pool_connections=max_workers_files or 4))

with ThreadPoolExecutor(max_workers=max_workers_files or 4) as executor:
with tqdm(total=0, desc="Project files", unit='B', unit_scale=True, unit_divisor=1024) as pbar:
for root, _, files in os.walk(files_folder):
@@ -258,7 +269,8 @@
size = os.path.getsize(file_path)
key = os.path.relpath(file_path, files_folder).replace(os.path.sep, '/')
pbar.total += size
future = executor.submit(project.files.upload, file_path, key, callback=pbar.update)
future = executor.submit(project.files.upload, file_path, key, callback=pbar.update,
client_kwargs=client_kwargs)
fs[future] = (file, size)

for f in as_completed(fs.keys()):
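
On the upload side, client_kwargs travels the same route: from the tool through the upload method in entities_service.py into get_s3_client, and the transfer itself is boto3's managed upload with server-side encryption. A sketch of the call that ultimately runs (paths, bucket, and key are illustrative):

    import boto3
    from botocore.config import Config

    s3 = boto3.client("s3", config=Config(max_pool_connections=16))
    s3.upload_file(
        Filename="/tmp/scan.dcm",                      # local file to send
        Bucket="example-bucket",
        Key="study-1/scan.dcm",
        ExtraArgs={"ServerSideEncryption": "AES256"},  # as set in entities_service
        Callback=print,                                # called with byte counts as chunks complete
    )
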
