diff --git a/download_abide_preproc.py b/download_abide_preproc.py
index d1bd809..98260db 100644
--- a/download_abide_preproc.py
+++ b/download_abide_preproc.py
@@ -15,10 +15,88 @@
 [-lt <less-than>] [-gt <greater-than>] [-x <sex>] [-t <site>]
 """
+import concurrent.futures
+import os
+import time
+from concurrent.futures import ThreadPoolExecutor
+
+
+def download_progress(count, block_size, total_size, file_path, start_time):
+    """
+    Show download progress; reporthook callback for urllib.request.urlretrieve
+
+    :param count: number of blocks transferred so far
+    :param block_size: size of each block in bytes
+    :param total_size: total size of the file in bytes
+    :param file_path: path of the file being downloaded
+    :param start_time: time.time() at which this download started; passing it
+        in (rather than using a global) keeps the callback thread-safe
+    :return: None
+    """
+    duration = max(time.time() - start_time, 1e-6)  # avoid division by zero
+    progress_size = count * block_size
+    speed = int(progress_size / (1024 * duration))
+    percent = min(100, int(progress_size * 100 / total_size))
+    print(f"\r{os.path.basename(file_path)}...{percent}% {speed} KB/s", end="")
+
+
+def download_file_from_s3(s3_prefix, s3_path, out_dir, compute_size_only=False):
+    """
+    Download a single file from S3, or just report its size
+
+    :param s3_prefix: URL prefix of the S3 bucket
+    :param s3_path: full URL of the file within the S3 bucket
+    :param out_dir: output directory
+    :param compute_size_only: whether to compute the size only, without downloading
+    :return: file size in bytes
+    """
+    import urllib.request as request
+    # str.lstrip removes a *set of characters*, not a prefix, so slice instead
+    rel_path = s3_path[len(s3_prefix):].lstrip('/')
+
+    file_info = request.urlopen(s3_path).info()
+    file_size = int(file_info["Content-Length"])
+
+    download_file = os.path.join(out_dir, rel_path)
+
+    if os.path.exists(download_file):
+        try:
+            # Check whether the file is already fully downloaded
+            if os.path.getsize(download_file) == file_size:
+                print(f"\nFile already downloaded: {download_file}\n")
+                return file_size
+            else:
+                # Remove partially downloaded file
+                os.remove(download_file)
+        except OSError:
+            # Size check failed; remove the partially downloaded file
+            os.remove(download_file)
+
+    if not compute_size_only:
+        # Actual download process
+        download_dir = os.path.dirname(download_file)
+        os.makedirs(download_dir, exist_ok=True)
+        try:
+            print(f"\nDownloading: {download_file}\n")
+            start_time = time.time()  # per-file start time for the progress callback
+            callback = lambda count, block_size, total_size: (
+                download_progress(count, block_size, total_size,
+                                  file_path=download_file, start_time=start_time))
+            request.urlretrieve(s3_path, download_file, callback)
+        except KeyboardInterrupt:
+            # Remove the partially downloaded file if the user interrupts
+            if os.path.exists(download_file):
+                os.remove(download_file)
+            print(f"\nUser interrupted download, removed {download_file}\n")
+            raise RuntimeError(f"User interrupted download, removed {download_file}")
+        except Exception as exc:
+            # Download error
+            print(f"\nError downloading {download_file} from {s3_path}: {exc}\n")
+            raise RuntimeError(f'Error retrieving the file {download_file} from {s3_path}: {exc}')
+    return file_size
 
 
 # Main collect and download function
-def collect_and_download(derivative, pipeline, strategy, out_dir, less_than, greater_than, site, sex, diagnosis):
+def collect_and_download(derivative, pipeline, strategy, out_dir, less_than, greater_than, site, sex, diagnosis,
+                         compute_size_only=False, multi_threaded=True):
     """
     Function to collect and download images from the ABIDE preprocessed
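Aside for reviewers: `urllib.request.urlretrieve` calls its `reporthook` argument as `hook(block_count, block_size, total_size)`, once before the first block arrives and again after each block. A minimal standalone sketch of the pattern the two helpers above build on (the URL and output path are placeholders, not part of this patch):

    import os
    import time
    import urllib.request as request

    url = 'https://example.com/some/file.nii.gz'  # placeholder URL
    out_file = '/tmp/file.nii.gz'                 # placeholder path
    start = time.time()  # captured per download, so concurrent transfers don't clash

    def hook(count, block_size, total_size):
        done = count * block_size
        percent = min(100, int(done * 100 / total_size)) if total_size > 0 else 0
        speed = int(done / (1024 * max(time.time() - start, 1e-6)))  # KB/s
        print(f"\r{os.path.basename(out_file)}...{percent}% {speed} KB/s", end="")

    request.urlretrieve(url, out_file, hook)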
@@ -45,23 +123,16 @@ def collect_and_download(derivative, pipeline, strategy, out_dir, less_than, gre
     diagnosis : string
         'asd', 'tdc', or 'both' corresponding to the diagnosis of the
         participants for whom data should be downloaded
+    compute_size_only : bool
+        whether to compute only the total size of the download, without
+        downloading any files
+    multi_threaded : bool
+        whether to use multiple threads to download files
 
     Returns
     -------
     None
         this function does not return a value; it downloads data from S3
         to a local directory
-
-    :param derivative:
-    :param pipeline:
-    :param strategy:
-    :param out_dir:
-    :param less_than:
-    :param greater_than:
-    :param site:
-    :param sex:
-    :param diagnosis:
-    :return:
     """
 
     # Import packages
@@ -70,7 +141,7 @@ def collect_and_download(derivative, pipeline, strategy, out_dir, less_than, gre
 
     # Init variables
     mean_fd_thresh = 0.2
-    s3_prefix = 'https://s3.amazonaws.com/fcp-indi/data/Projects/'\
+    s3_prefix = 'https://s3.amazonaws.com/fcp-indi/data/Projects/' \
                 'ABIDE_Initiative'
     s3_pheno_path = '/'.join([s3_prefix, 'Phenotypic_V1_0b_preprocessed1.csv'])
@@ -158,26 +229,46 @@ def collect_and_download(derivative, pipeline, strategy, out_dir, less_than, gre
         else:
             continue
 
-    # And download the items
+    # And download the items or compute the size
+    num_cores = os.cpu_count() or 1  # cpu_count() can return None
+    max_workers = max(1, num_cores - 2)  # leave some cores free; ensure at least 1 worker
+    print(f"Using {max_workers} workers for download")
+
+    total_size = 0
     total_num_files = len(s3_paths)
-    for path_idx, s3_path in enumerate(s3_paths):
-        rel_path = s3_path.lstrip(s3_prefix)
-        download_file = os.path.join(out_dir, rel_path)
-        download_dir = os.path.dirname(download_file)
-        if not os.path.exists(download_dir):
-            os.makedirs(download_dir)
-        try:
-            if not os.path.exists(download_file):
-                print('Retrieving: {0}'.format(download_file))
-                request.urlretrieve(s3_path, download_file)
-                print('{0:3f}% percent complete'.format(100*(float(path_idx+1)/total_num_files)))
-            else:
-                print('File {0} already exists, skipping...'.format(download_file))
-        except Exception as exc:
-            print('There was a problem downloading {0}.\n Check input arguments and try again.'.format(s3_path))
+    path_idx = 0
+    if multi_threaded:
+        with ThreadPoolExecutor(max_workers=max_workers) as executor:
+            # Fan out one task per file and collect results as they complete
+            futures = [executor.submit(download_file_from_s3, s3_prefix, s3_path, out_dir, compute_size_only)
+                       for s3_path in s3_paths]
+            errors = []
+            for future in concurrent.futures.as_completed(futures):
+                try:
+                    total_size += future.result()
+                    print(f"\nHandled {path_idx + 1}/{total_num_files} files, "
+                          f"cumulative size: {total_size / 1e9:.2f} GB, projected "
+                          f"total: {total_size * total_num_files / (1e9 * (path_idx + 1)):.2f} GB\n")
+                    path_idx += 1
+                except Exception as exc:
+                    errors.append(exc)
+                    total_num_files -= 1
+            if len(errors) > 0:
+                print('\nThe following errors were encountered while downloading files:\n')
+                for error in errors:
+                    print(error)
+    else:
+        for path_idx, s3_path in enumerate(s3_paths):
+            try:
+                total_size += download_file_from_s3(s3_prefix, s3_path, out_dir, compute_size_only)
+            except Exception as exc:
+                print('\nFailed to download {0}: {1}\n'.format(s3_path, exc))
+                total_num_files -= 1
 
     # Print all done
-    print('Done!')
+    print('\nFinished handling {0} files, totaling {1:.3f} GB'.format(total_num_files, total_size / 1e9))
 
 
 # Make module executable
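The projection printed per completed task is plain extrapolation: cumulative bytes over files handled, scaled to the full file count (after 10 of 200 files totaling 1.5 GB, the projected total is 1.5 / 10 × 200 = 30 GB). Size-only mode rests on the `Content-Length` header S3 returns; the helper reads it off a regular `urlopen` GET, though a HEAD request would avoid touching the body at all. A sketch of that alternative, assuming the endpoint answers HEAD requests as S3 does (not part of this patch):

    import urllib.request

    def remote_size(url: str) -> int:
        # Request headers only; no body is transferred
        req = urllib.request.Request(url, method='HEAD')
        with urllib.request.urlopen(req) as resp:
            return int(resp.headers['Content-Length'])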
@@ -218,7 +309,10 @@ def collect_and_download(derivative, pipeline, strategy, out_dir, less_than, gre
                         help='Site of interest to download from (e.g. \'Caltech\'')
     parser.add_argument('-x', '--sex', nargs=1, required=False, type=str,
                         help='Participant sex of interest to download only (e.g. \'M\' or \'F\')')
-
+    parser.add_argument('-so', '--size_only', required=False, default=False, action='store_true',
+                        help='Only compute the size of the download, do not actually download')
+    parser.add_argument('-m', '--multi_threaded', required=False, default=True,
+                        action=argparse.BooleanOptionalAction,  # Python 3.9+; also generates --no-multi_threaded
+                        help='Use multiple threads to download files in parallel')
     # Parse and gather arguments
     args = parser.parse_args()
@@ -277,4 +371,5 @@ def collect_and_download(derivative, pipeline, strategy, out_dir, less_than, gre
     # Call the collect and download routine
     collect_and_download(desired_derivative, desired_pipeline, desired_strategy, download_data_dir, desired_age_max,
-                         desired_age_min, desired_site, desired_sex, desired_diagnosis)
+                         desired_age_min, desired_site, desired_sex, desired_diagnosis,
+                         compute_size_only=args.size_only, multi_threaded=args.multi_threaded)
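Usage note: assuming the script's pre-existing short flags for derivative, pipeline, strategy, and output directory (-d, -p, -s, -o; their definitions are elided from these hunks), a dry-run size estimate and a sequential download would look roughly like:

    # Estimate the total download size without fetching any files
    python download_abide_preproc.py -d func_preproc -p cpac -s filt_global -o /tmp/abide --size_only

    # Force a sequential download (switch generated by BooleanOptionalAction)
    python download_abide_preproc.py -d func_preproc -p cpac -s filt_global -o /tmp/abide --no-multi_threaded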