Two new features: multi_threaded and size_only #25

Open · wants to merge 1 commit into base: master
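The new flags can be exercised from the command line roughly like this (a hypothetical invocation; the derivative/pipeline/strategy values are just examples for the script's existing required arguments):

    # Dry run: report the total download size without fetching anything
    python download_abide_preproc.py -d reho -p cpac -s filt_global -o /tmp/abide --size_only

    # Fetch the same cohort using the multi-threaded downloader
    python download_abide_preproc.py -d reho -p cpac -s filt_global -o /tmp/abide -m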
157 changes: 126 additions & 31 deletions download_abide_preproc.py
@@ -15,10 +15,88 @@
[-lt <less_than>] [-gt <greater_than>]
[-x <sex>] [-t <site>]
"""
import concurrent.futures
import os  # used by the module-level helpers below
from concurrent.futures import ThreadPoolExecutor


def download_progress(count, block_size, total_size, file_path):
    """
    Show download progress; callback function for urllib.request.urlretrieve

    :param count: number of blocks downloaded so far
    :param block_size: size of each block in bytes
    :param total_size: total size of the file in bytes
    :param file_path: path to the file being downloaded
    :return: None
    """
    import time
    # NOTE: start_time is shared module state, so the reported speed is only
    # accurate when a single download is running at a time
    global start_time
    if count == 0:
        start_time = time.time()
        return
    # Guard against division by zero on very fast first callbacks
    duration = max(time.time() - start_time, 1e-6)
    progress_size = int(count * block_size)
    speed = int(progress_size / (1024 * duration))
    percent = int(count * block_size * 100 / total_size)
    print(f"\r{os.path.basename(file_path)}...{percent}% {speed} KB/s", end="")


def download_file_from_s3(s3_prefix, s3_path, out_dir, compute_size_only=False):
    """
    Download a file from S3

    :param s3_prefix: URL prefix of the S3 bucket
    :param s3_path: full URL of the file in the S3 bucket
    :param out_dir: output directory
    :param compute_size_only: whether to compute the size only, without downloading
    :return: file size in bytes
    """
    import urllib.request as request
    # Strip the bucket prefix; str.lstrip would strip a *character set*, not a prefix
    rel_path = s3_path[len(s3_prefix):].lstrip('/')

    file_info = request.urlopen(s3_path).info()
    file_size = int(file_info["Content-Length"])

    download_file = os.path.join(out_dir, rel_path)

    if os.path.exists(download_file):
        try:
            # Check if the file is already fully downloaded
            if os.path.getsize(download_file) == file_size:
                print(f"\nFile already downloaded: {download_file}\n")
                return file_size
        except OSError:
            pass
        # Remove partially downloaded file
        os.remove(download_file)

    if not compute_size_only:
        # Actual download process
        download_dir = os.path.dirname(download_file)
        os.makedirs(download_dir, exist_ok=True)
        try:
            print(f"\nDownloading: {download_file}\n")
            callback = lambda count, block_size, total_size: (
                download_progress(count, block_size, total_size, file_path=download_file)
            )
            request.urlretrieve(s3_path, download_file, callback)
        except KeyboardInterrupt:
            # Remove partially downloaded file if the user interrupts
            if os.path.exists(download_file):
                os.remove(download_file)
            print(f"\nUser interrupted download, removed {download_file}\n")
            raise RuntimeError(f"User interrupted download, removed {download_file}")
        except Exception as exc:
            # Download error
            print(f"\nError downloading {download_file} from {s3_path}: {exc}\n")
            raise RuntimeError(f"Error retrieving the file {download_file} from {s3_path}: {exc}")
    return file_size


# Main collect and download function
def collect_and_download(derivative, pipeline, strategy, out_dir, less_than, greater_than, site, sex, diagnosis,
                         compute_size_only=False, multi_threaded=True):
"""

Function to collect and download images from the ABIDE preprocessed
@@ -45,23 +123,16 @@ def collect_and_download(derivative, pipeline, strategy, out_dir, less_than, gre
    diagnosis : string
        'asd', 'tdc', or 'both' corresponding to the diagnosis of the
        participants for whom data should be downloaded
    compute_size_only : bool
        whether to compute the size of the download only
    multi_threaded : bool
        whether to use multiple threads to download files

    Returns
    -------
    None
        this function does not return a value; it downloads data from
        S3 to a local directory

    """

    # Import packages
Expand All @@ -70,7 +141,7 @@ def collect_and_download(derivative, pipeline, strategy, out_dir, less_than, gre

    # Init variables
    mean_fd_thresh = 0.2
    s3_prefix = 'https://s3.amazonaws.com/fcp-indi/data/Projects/' \
                'ABIDE_Initiative'
    s3_pheno_path = '/'.join([s3_prefix, 'Phenotypic_V1_0b_preprocessed1.csv'])
@@ -158,26 +229,46 @@ def collect_and_download(derivative, pipeline, strategy, out_dir, less_than, gre
        else:
            continue

    # And download the items or compute the size
    num_cores = os.cpu_count() or 1  # cpu_count() can return None
    max_workers = max(1, num_cores - 2)  # ensure at least 1 worker
    print(f"Using {max_workers} workers for download")

    total_size = 0
    total_num_files = len(s3_paths)
    path_idx = 0
    if multi_threaded:
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = [executor.submit(download_file_from_s3, s3_prefix, s3_path, out_dir, compute_size_only)
                       for s3_path in s3_paths]
            errors = []
            for future in concurrent.futures.as_completed(futures):
                try:
                    result = future.result()
                    if result is not None:
                        total_size += result
                        print(f"\nProcessed {path_idx + 1}/{total_num_files} files, "
                              f"total size: {total_size / 1e9:.2f} GB, expected "
                              f"total size: {total_size * total_num_files / (1e9 * (path_idx + 1)):.2f} GB\n")
                        path_idx += 1
                except Exception as exc:
                    errors.append(exc)
                    total_num_files -= 1
            if len(errors) > 0:
                print('\nThe following errors were encountered while downloading files:\n')
                for error in errors:
                    print(error)
    else:
        for path_idx, s3_path in enumerate(s3_paths):
            try:
                result = download_file_from_s3(s3_prefix, s3_path, out_dir, compute_size_only)
            except Exception as exc:
                # Skip failed files rather than aborting, matching the multi-threaded path
                print(exc)
                total_num_files -= 1
                continue
            if result is not None:
                total_size += result

    # Print all done
    print('\nFinished handling {0} files, totaling {1:.3f} GB'.format(total_num_files, total_size / 1e9))


# Make module executable
@@ -218,7 +309,10 @@ def collect_and_download(derivative, pipeline, strategy, out_dir, less_than, gre
                        help='Site of interest to download from (e.g. \'Caltech\')')
    parser.add_argument('-x', '--sex', nargs=1, required=False, type=str,
                        help='Participant sex of interest to download only (e.g. \'M\' or \'F\')')
    parser.add_argument('-so', '--size_only', required=False, default=False, action='store_true',
                        help='Only compute the size of the download, do not actually download')
    # default=True with action='store_true' left no way to disable the flag;
    # BooleanOptionalAction (Python 3.9+) also generates --no-multi_threaded
    parser.add_argument('-m', '--multi_threaded', required=False, default=True,
                        action=argparse.BooleanOptionalAction,
                        help='Use multiple threads to download files in parallel')
    # Parse and gather arguments
    args = parser.parse_args()

@@ -277,4 +371,5 @@ def collect_and_download(derivative, pipeline, strategy, out_dir, less_than, gre

    # Call the collect and download routine
    collect_and_download(desired_derivative, desired_pipeline, desired_strategy, download_data_dir, desired_age_max,
                         desired_age_min, desired_site, desired_sex, desired_diagnosis,
                         compute_size_only=args.size_only, multi_threaded=args.multi_threaded)