diff --git a/hfs/prop1/gen_blobs_1.py b/hfs/prop1/gen_blobs_1.py index 6121c85..5fc7ab9 100644 --- a/hfs/prop1/gen_blobs_1.py +++ b/hfs/prop1/gen_blobs_1.py @@ -74,7 +74,7 @@ def gen_collection_object(args, sess, collection): print(f'{level} {collection.uuid} completed') return -def gen_studies(args): +def gen_all(args): sql_uri = f'postgresql+psycopg2://{settings.CLOUD_USERNAME}:{settings.CLOUD_PASSWORD}@{settings.CLOUD_HOST}:{settings.CLOUD_PORT}/{settings.CLOUD_DATABASE}' # sql_engine = create_engine(sql_uri, echo=True) @@ -102,4 +102,4 @@ def gen_studies(args): print(f'args: {json.dumps(args.__dict__, indent=2)}') args.dst_bucket = client.bucket(args.dst_bucket_name) - gen_studies(args) + gen_all(args) diff --git a/hfs/prop3/collection_list_3.py b/hfs/prop3/collection_list_3.py new file mode 100644 index 0000000..909bdad --- /dev/null +++ b/hfs/prop3/collection_list_3.py @@ -0,0 +1,17 @@ +# +# Copyright 2015-2021, Institute for Systems Biology +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +collection_list = ['APOLLO-5-LSCC', 'MIDRC-RICORD-1C', 'TCGA-READ'] diff --git a/hfs/prop3/copy_and_rename_instances.py b/hfs/prop3/copy_and_rename_instances.py deleted file mode 100644 index 02424d5..0000000 --- a/hfs/prop3/copy_and_rename_instances.py +++ /dev/null @@ -1,156 +0,0 @@ -# -# Copyright 2015-2021, Institute for Systems Biology -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -import os -import argparse -from utilities.logging_config import successlogger, progresslogger, errlogger -from google.cloud import bigquery, storage -import time -from multiprocessing import Process, Queue - -# Copy the blobs that are new to a version from dev pre-staging buckets -# to dev staging buckets. - -def get_urls(args): - client = bigquery.Client() - query = f""" - SELECT - distinct - collection_id, - st_uuid, - se_uuid, - i_uuid, - series_instance_uid, - sop_instance_uid - FROM - `idc-dev-etl.idc_v{args.version}_dev.all_joined` - WHERE - collection_id in {args.collections} - OR (collection_id='TCGA-READ' and i_source='tcia') - OR (collection_id='TCGA-ESCA' and i_source='tcia') - """ - # urls = list(client.query(query)) - query_job = client.query(query) # Make an API request. - query_job.result() # Wait for the query to complete. 
- destination = query_job.destination - destination = client.get_table(destination) - return destination - -def copy_some_blobs(args, client, dones, metadata, n): - for blob in metadata: - src_blob_name = f"{blob['i_uuid']}.dcm" - if args.hfs_level == 'series': - dst_blob_name = f"{blob['se_uuid']}/{blob['i_uuid']}.dcm" - else: - dst_blob_name = f"{blob['st_uuid']}/{blob['se_uuid']}/{blob['i_uuid']}.dcm" - if not src_blob_name in dones: - src_bucket_name='idc-dev-open' - src_bucket = client.bucket(src_bucket_name) - src_blob = src_bucket.blob(src_blob_name) - dst_bucket_name = args.dst_bucket - dst_bucket = client.bucket(dst_bucket_name) - dst_blob = dst_bucket.blob(dst_blob_name) - - # Large blobs need to special handling - try: - rewrite_token = False - while True: - rewrite_token, bytes_rewritten, bytes_to_rewrite = dst_blob.rewrite( - src_blob, token=rewrite_token - ) - if not rewrite_token: - break - successlogger.info('%s', src_blob_name) - print(f'p{args.id}: {n}of{len(metadata)}: {src_bucket_name}/{src_blob_name} --> {dst_bucket_name}/{dst_blob_name}') - except Exception as exc: - errlogger.error('p%s: %sof%s Blob: %s: %s', args.id, n, len(metadata), src_blob_name, exc) - n += 1 - - -def worker(input, args, dones): - # proglogger.info('p%s: Worker starting: args: %s', args.id, args ) - # print(f'p{args.id}: Worker starting: args: {args}') - - RETRIES = 3 - - client = storage.Client() - for metadata, n in iter(input.get, 'STOP'): - copy_some_blobs(args, client, dones, metadata, n) - - -def copy_all_blobs(args): - # try: - # # dones = open(f'{args.log_dir}/success.log').read().splitlines() - # dones = open(successlogger.handlers[0].baseFilename).read().splitlines() - # except: - # dones = [] - dones = [] - - bq_client = bigquery.Client() - destination = get_urls(args) - - num_processes = args.processes - processes = [] - # Create a pair of queue for each process - - task_queue = Queue() - - strt = time.time() - - # Start worker processes - for process in range(num_processes): - args.id = process + 1 - processes.append( - Process(group=None, target=worker, args=(task_queue, args, dones))) - processes[-1].start() - - # Distribute the work across the task_queues - n = 0 - for page in bq_client.list_rows(destination, page_size=args.batch).pages: - metadata = [{"collection_id": row.collection_id, "st_uuid": row.st_uuid, "se_uuid": row.se_uuid, "series_instance_uid": row.series_instance_uid, "sop_instance_uid": row.sop_instance_uid, "i_uuid": row.i_uuid} for row in page] - task_queue.put((metadata, n)) - # print(f'Queued {n}:{n+args.batch-1}') - n += page.num_items - print('Primary work distribution complete; {} blobs'.format(n)) - - # Tell child processes to stop - for i in range(num_processes): - task_queue.put('STOP') - - - # Wait for process to terminate - for process in processes: - print(f'Joining process: {process.name}, {process.is_alive()}') - process.join() - - delta = time.time() - strt - rate = (n)/delta - - -# if __name__ == '__main__': -# parser = argparse.ArgumentParser() -# parser.add_argument('--version', default=8, help='Version to work on') -# parser.add_argument('--log_dir', default=f'/mnt/disks/idc-etl/logs/copy_and_rename_instances') -# parser.add_argument('--collections', default="('APOLLO-5-LSCC', 'CPTAC-SAR')") -# parser.add_argument('--hfs_level', default='series',help='Name blobs as study/series/instance if study, series/instance if series') -# parser.add_argument('--src_bucket', default='idc-dev-open', help='Bucket from which to copy blobs') -# 
parser.add_argument('--dst_bucket', default='whc_series_instance', help='Bucket into which to copy blobs') -# parser.add_argument('--batch', default=100) -# parser.add_argument('--processes', default=16) -# args = parser.parse_args() -# args.id = 0 # Default process ID -# -# copy_all_blobs(args) diff --git a/hfs/prop3/copy_and_rename_instances.series_instance.py b/hfs/prop3/copy_and_rename_instances.series_instance.py deleted file mode 100644 index e78d6fa..0000000 --- a/hfs/prop3/copy_and_rename_instances.series_instance.py +++ /dev/null @@ -1,46 +0,0 @@ -# -# Copyright 2015-2021, Institute for Systems Biology -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -import os -import argparse -from utilities.logging_config import successlogger, progresslogger, errlogger -from google.cloud import bigquery, storage -import time -from multiprocessing import Process, Queue -from copy_and_rename_instances import copy_all_blobs - -# Copy the blobs that are new to a version from dev pre-staging buckets -# to dev staging buckets. - - - -if __name__ == '__main__': - parser = argparse.ArgumentParser() - parser.add_argument('--version', default=8, help='Version to work on') - parser.add_argument('--log_dir', default=f'/mnt/disks/idc-etl/logs/copy_and_rename_instances') - parser.add_argument('--collections', default="('APOLLO-5-LSCC', 'CPTAC-SAR')") - parser.add_argument('--hfs_level', default='series',help='Name blobs as study/series/instance if study, series/instance if series') - parser.add_argument('--src_bucket', default='idc-dev-open', help='Bucket from which to copy blobs') - parser.add_argument('--dst_bucket', default='whc_si', help='Bucket into which to copy blobs') - parser.add_argument('--batch', default=100) - parser.add_argument('--processes', default=32) - args = parser.parse_args() - args.id = 0 # Default process ID - - if not os.path.exists('{}'.format(args.log_dir)): - os.mkdir('{}'.format(args.log_dir)) - - copy_all_blobs(args) diff --git a/hfs/prop3/copy_and_rename_instances.study_series_instance.py b/hfs/prop3/copy_and_rename_instances.study_series_instance.py deleted file mode 100644 index 4323ea5..0000000 --- a/hfs/prop3/copy_and_rename_instances.study_series_instance.py +++ /dev/null @@ -1,45 +0,0 @@ -# -# Copyright 2015-2021, Institute for Systems Biology -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-#
-
-import os
-import argparse
-
-import settings
-from utilities.logging_config import successlogger, progresslogger, errlogger
-from google.cloud import bigquery, storage
-import time
-from multiprocessing import Process, Queue
-from copy_and_rename_instances import copy_all_blobs
-
-# Copy the blobs that are new to a version from dev pre-staging buckets
-# to dev staging buckets.
-if __name__ == '__main__':
-    parser = argparse.ArgumentParser()
-    parser.add_argument('--version', default=10, help='Version to work on')
-    parser.add_argument('--log_dir', default=f'/mnt/disks/idc-etl/logs/copy_and_rename_instances')
-    parser.add_argument('--collections', default="('APOLLO-5-LSCC', 'CPTAC-SAR')")
-    parser.add_argument('--hfs_level', default='study',help='Name blobs as study/series/instance if study, series/instance if series')
-    parser.add_argument('--src_bucket', default='idc-dev-open', help='Bucket from which to copy blobs')
-    parser.add_argument('--dst_bucket', default='whc_prop3a', help='Bucket into which to copy blobs')
-    parser.add_argument('--batch', default=100)
-    parser.add_argument('--processes', default=64)
-    args = parser.parse_args()
-    args.id = 0 # Default process ID
-
-    if not os.path.exists('{}'.format(args.log_dir)):
-        os.mkdir('{}'.format(args.log_dir))
-
-    copy_all_blobs(args)
diff --git a/hfs/prop3/gen_blobs_3.py b/hfs/prop3/gen_blobs_3.py
new file mode 100644
index 0000000..e20c63e
--- /dev/null
+++ b/hfs/prop3/gen_blobs_3.py
@@ -0,0 +1,151 @@
+#
+# Copyright 2015-2021, Institute for Systems Biology
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# In this version, we generate hierarchically named blobs for studies and series.
+# Blob contents are lists of child series and child instances, respectively.
+# Child object names include the bucket in which the child is found.
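+#
+# For illustration only (the bucket names and UUID placeholders below are
+# hypothetical), the destination bucket ends up holding blobs shaped like:
+#
+#   <study_uuid>/                     one line per child series, e.g.
+#                                     "whc_prop3/<study_uuid>/<series_uuid>/"
+#   <study_uuid>/<series_uuid>.idc    one line per child instance, e.g.
+#                                     "public-datasets-idc/<instance_uuid>.dcm"
+#
+# Study blobs therefore point at series "folders" in this bucket, while series
+# blobs point at instance blobs in the public per-collection buckets recorded in
+# All_Included_Collections (pub_tcia_url / pub_path_url).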
+import json +import argparse + +from collection_list_3 import collection_list +from utilities.logging_config import successlogger, progresslogger, errlogger +from idc.models import Base, Version, Collection, Patient, Study, Series, Instance, All_Included_Collections +from google.cloud import storage +from sqlalchemy.orm import Session +from python_settings import settings +from sqlalchemy import create_engine, update + + +def gen_series_object(args, sess, collection, patient, study, series): + level = "Series" + if not args.dst_bucket.blob(f"{study.uuid}/{series.uuid}/").exists(): + print(f'\t\t\t{level} {series.uuid} started') + # Create a combined "folder" and "bundle" blob + contents = "\n".join( + [f"{sess.query(All_Included_Collections.pub_tcia_url).filter(All_Included_Collections.tcia_api_collection_id == collection.collection_id).first().pub_tcia_url}/{instance.uuid}.dcm" if instance.source.name == 'tcia' else \ + f"{sess.query(All_Included_Collections.pub_tcia_url).filter(All_Included_Collections.tcia_api_collection_id == collection.collection_id).first().pub_path_url}/{instance.uuid}.dcm" \ + for instance in series.instances])+'\n' + blob = args.dst_bucket.blob(f"{study.uuid}/{series.uuid}.idc").upload_from_string(contents) + if not args.dst_bucket.blob(f"{study.uuid}/{series.uuid}.idc").exists(): + errlogger.error(f"{study.uuid}/{series.uuid}/ doesn't exist") + print(f'\t\t\t{level} {series.uuid} completed') + else: + + print(f'\t\t\t{level} {series.uuid} skipped') + return + + +# def gen_series_object(args, sess, collection, patient, study, series): +# level = "Series" +# if not args.dst_bucket.blob(f"{collection.uuid}/{patient.uuid}/{study.uuid}/{series.uuid}/").exists(): +# print(f'\t\t\t{level} {series.uuid} started') +# for instance in series.instances: +# gen_instance_object(args, sess, collection, patient, study, series, instance) +# contents = { +# "path": f"{collection.uuid}/{patient.uuid}/{study.uuid}/{series.uuid}/", +# "children": +# [ +# f"{instance.uuid}/" for instance in series.instances +# ] +# } +# blob = args.dst_bucket.blob(f"{collection.uuid}/{patient.uuid}/{study.uuid}/{series.uuid}/").upload_from_string(json.dumps(contents)) +# print(f'\t\t\t{level} {series.uuid} completed') +# else: +# print(f'\t\t\t{level} {series.uuid} skipped') +# return + + +def gen_study_object(args, sess, collection, patient, study): + level = "Study" + if not args.dst_bucket.blob(f"{study.uuid}/").exists(): + print(f'\t\t{level} {study.uuid} started') + for series in study.seriess: + if series.sources.tcia: + gen_series_object(args, sess, collection, patient, study, series) + # Create a combined "folder" and "bundle" blob + contents = "\n".join([f"{args.dst_bucket_name}/{study.uuid}/{series.uuid}/" for series in study.seriess])+'\n' + blob = args.dst_bucket.blob(f"{study.uuid}/").upload_from_string(contents) + if not args.dst_bucket.blob(f"{study.uuid}/").exists(): + errlogger.error(f"{study.uuid}/ doesn't exist") + print(f'\t\t{level} {study.uuid} completed') + else: + print(f'\t\t{level} {study.uuid} skipped') + return + +# def gen_study_object(args, sess, collection, patient, study): +# level = "Study" +# if not args.dst_bucket.blob(f"{collection.uuid}/{patient.uuid}/{study.uuid}/").exists(): +# print(f'\t\t{level} {study.uuid} started') +# for series in study.seriess: +# gen_series_object(args, sess, collection, patient, study, series) +# contents = { +# "path": f"{collection.uuid}/{patient.uuid}/{study.uuid}/", +# "children": +# [ +# f"{series.uuid}/" for series in 
study.seriess +# ] +# } +# blob = args.dst_bucket.blob(f"{collection.uuid}/{patient.uuid}/{study.uuid}/").upload_from_string(json.dumps(contents)) +# print(f'\t\t{level} {study.uuid} completed') +# else: +# print(f'\t\t{level} {study.uuid} skipped') +# return + + +def gen_patient_object(args, sess, collection, patient): + level = "Patient" + for study in patient.studies: + if study.sources.tcia: + gen_study_object(args, sess, collection, patient, study) + print(f'\t{level} {patient.uuid} completed') + return + +def gen_collection_object(args, sess, collection): + level = "Collection" + for patient in collection.patients: + gen_patient_object(args, sess, collection, patient) + print(f'{level} {collection.uuid} completed') + return + +def gen_all(args): + + sql_uri = f'postgresql+psycopg2://{settings.CLOUD_USERNAME}:{settings.CLOUD_PASSWORD}@{settings.CLOUD_HOST}:{settings.CLOUD_PORT}/{settings.CLOUD_DATABASE}' + # sql_engine = create_engine(sql_uri, echo=True) + sql_engine = create_engine(sql_uri) + # Create the tables if they do not already exist + Base.metadata.create_all(sql_engine) + + with Session(sql_engine) as sess: + for collection_id in collection_list: + collections = sess.query(Collection).filter(Collection.collection_id == collection_id) + for collection in collections: + gen_collection_object(args, sess, collection) + + + +if __name__ == '__main__': + client = storage.Client() + parser = argparse.ArgumentParser() + parser.add_argument('--version', default=11, help='Version to work on') + # parser.add_argument('--hfs_levels', default=['study', 'series'], help='Name blobs as study/series/instance if study, series/instance if series') + parser.add_argument('--dst_bucket_name', default='whc_prop3', help='Bucket into which to copy blobs') + args = parser.parse_args() + + args.id = 0 # Default process ID + print(f'args: {json.dumps(args.__dict__, indent=2)}') + args.dst_bucket = client.bucket(args.dst_bucket_name) + + gen_all(args) diff --git a/hfs/prop3a/collection_list_3a.py b/hfs/prop3a/collection_list_3a.py new file mode 100644 index 0000000..278ae67 --- /dev/null +++ b/hfs/prop3a/collection_list_3a.py @@ -0,0 +1,17 @@ +# +# Copyright 2015-2021, Institute for Systems Biology +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +collection_list = ['APOLLO-5-LSCC', 'CPTAC-SAR', 'TCGA-READ'] diff --git a/hfs/prop3a/copy_and_rename_instances.series_instance.py b/hfs/prop3a/copy_and_rename_instances.series_instance.py deleted file mode 100644 index e78d6fa..0000000 --- a/hfs/prop3a/copy_and_rename_instances.series_instance.py +++ /dev/null @@ -1,46 +0,0 @@ -# -# Copyright 2015-2021, Institute for Systems Biology -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. 
-# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -import os -import argparse -from utilities.logging_config import successlogger, progresslogger, errlogger -from google.cloud import bigquery, storage -import time -from multiprocessing import Process, Queue -from copy_and_rename_instances import copy_all_blobs - -# Copy the blobs that are new to a version from dev pre-staging buckets -# to dev staging buckets. - - - -if __name__ == '__main__': - parser = argparse.ArgumentParser() - parser.add_argument('--version', default=8, help='Version to work on') - parser.add_argument('--log_dir', default=f'/mnt/disks/idc-etl/logs/copy_and_rename_instances') - parser.add_argument('--collections', default="('APOLLO-5-LSCC', 'CPTAC-SAR')") - parser.add_argument('--hfs_level', default='series',help='Name blobs as study/series/instance if study, series/instance if series') - parser.add_argument('--src_bucket', default='idc-dev-open', help='Bucket from which to copy blobs') - parser.add_argument('--dst_bucket', default='whc_si', help='Bucket into which to copy blobs') - parser.add_argument('--batch', default=100) - parser.add_argument('--processes', default=32) - args = parser.parse_args() - args.id = 0 # Default process ID - - if not os.path.exists('{}'.format(args.log_dir)): - os.mkdir('{}'.format(args.log_dir)) - - copy_all_blobs(args) diff --git a/hfs/prop3a/copy_and_rename_instances.study_series_instance.py b/hfs/prop3a/copy_and_rename_instances.study_series_instance.py deleted file mode 100644 index ae66c96..0000000 --- a/hfs/prop3a/copy_and_rename_instances.study_series_instance.py +++ /dev/null @@ -1,43 +0,0 @@ -# -# Copyright 2015-2021, Institute for Systems Biology -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -import os -import argparse -from utilities.logging_config import successlogger, progresslogger, errlogger -from google.cloud import bigquery, storage -import time -from multiprocessing import Process, Queue -from copy_and_rename_instances import copy_all_blobs - -# Copy the blobs that are new to a version from dev pre-staging buckets -# to dev staging buckets. 
-if __name__ == '__main__': - parser = argparse.ArgumentParser() - parser.add_argument('--version', default=8, help='Version to work on') - parser.add_argument('--log_dir', default=f'/mnt/disks/idc-etl/logs/copy_and_rename_instances') - parser.add_argument('--collections', default="('APOLLO-5-LSCC', 'CPTAC-SAR', 'MIDRC-RICORD-1C', 'TCGA-READ')") - parser.add_argument('--hfs_level', default='study',help='Name blobs as study/series/instance if study, series/instance if series') - parser.add_argument('--src_bucket', default='idc-dev-open', help='Bucket from which to copy blobs') - parser.add_argument('--dst_bucket', default='whc_ssi', help='Bucket into which to copy blobs') - parser.add_argument('--batch', default=100) - parser.add_argument('--processes', default=64) - args = parser.parse_args() - args.id = 0 # Default process ID - - if not os.path.exists('{}'.format(args.log_dir)): - os.mkdir('{}'.format(args.log_dir)) - - copy_all_blobs(args) diff --git a/hfs/prop3a/copy_and_rename_instances.py b/hfs/prop3a/copy_instances_3a.py similarity index 83% rename from hfs/prop3a/copy_and_rename_instances.py rename to hfs/prop3a/copy_instances_3a.py index c65a097..7b57111 100644 --- a/hfs/prop3a/copy_and_rename_instances.py +++ b/hfs/prop3a/copy_instances_3a.py @@ -135,17 +135,21 @@ def copy_all_blobs(args): rate = (n)/delta -# if __name__ == '__main__': -# parser = argparse.ArgumentParser() -# parser.add_argument('--version', default=8, help='Version to work on') -# parser.add_argument('--log_dir', default=f'/mnt/disks/idc-etl/logs/copy_and_rename_instances') -# parser.add_argument('--collections', default="('APOLLO-5-LSCC', 'CPTAC-SAR')") -# parser.add_argument('--hfs_level', default='series',help='Name blobs as study/series/instance if study, series/instance if series') -# parser.add_argument('--src_bucket', default='idc-dev-open', help='Bucket from which to copy blobs') -# parser.add_argument('--dst_bucket', default='whc_series_instance', help='Bucket into which to copy blobs') -# parser.add_argument('--batch', default=100) -# parser.add_argument('--processes', default=16) -# args = parser.parse_args() -# args.id = 0 # Default process ID -# -# copy_all_blobs(args) +# Copy the blobs that are new to a version from dev pre-staging buckets +# to dev staging buckets. +if __name__ == '__main__': + parser = argparse.ArgumentParser() + parser.add_argument('--version', default=8, help='Version to work on') + parser.add_argument('--log_dir', default=f'/mnt/disks/idc-etl/logs/copy_and_rename_instances') + parser.add_argument('--collections', default="('APOLLO-5-LSCC', 'CPTAC-SAR', 'MIDRC-RICORD-1C', 'TCGA-READ')") + parser.add_argument('--src_bucket', default='idc-dev-open', help='Bucket from which to copy blobs') + parser.add_argument('--dst_bucket', default='whc_prop3a', help='Bucket into which to copy blobs') + parser.add_argument('--batch', default=100) + parser.add_argument('--processes', default=64) + args = parser.parse_args() + args.id = 0 # Default process ID + + if not os.path.exists('{}'.format(args.log_dir)): + os.mkdir('{}'.format(args.log_dir)) + + copy_all_blobs(args) diff --git a/hfs/prop3a/gen_blobs.py b/hfs/prop3a/gen_blobs.py deleted file mode 100644 index 69868d9..0000000 --- a/hfs/prop3a/gen_blobs.py +++ /dev/null @@ -1,149 +0,0 @@ -# -# Copyright 2015-2021, Institute for Systems Biology -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. 
-# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -import json -import argparse - -from utilities.logging_config import successlogger, progresslogger, errlogger -from idc.models import Base, Version, Collection, Patient, Study, Series, Instance, All_Included_Collections -from google.cloud import storage -from sqlalchemy.orm import Session -from python_settings import settings -from sqlalchemy import create_engine, update - -def gen_instance_object(args, sess, collection, patient, study, series, instance): - level = "Instance" - if not args.dst_bucket.blob(f"{collection.uuid}/{patient.uuid}/{study.uuid}/{series.uuid}/{instance.uuid}/").exists(): - # print(f'\t\t{level} {instance.uuid} started') - if instance.source.name == 'tcia': - bucket = sess.query(All_Included_Collections.pub_tcia_url).filter(All_Included_Collections.tcia_api_collection_id == collection.collection_id).first().pub_tcia_url - else: - bucket = sess.query(All_Included_Collections.pub_path_url).filter(All_Included_Collections.tcia_api_collection_id == collection.collection_id).first().pub_path_url - - contents = { "child": f"{bucket}/{instance.uuid}.dcm" } - blob = args.dst_bucket.blob(f"{collection.uuid}/{patient.uuid}/{study.uuid}/{series.uuid}/{instance.uuid}").upload_from_string(json.dumps(contents)) - print(f'\t\t\t\t{level} {instance.uuid} completed') - else: - print(f'\t\t\t\t{level} {instance.uuid} skipped') - return - - -def gen_series_object(args, sess, collection, patient, study, series): - level = "Series" - if not args.dst_bucket.blob(f"{collection.uuid}/{patient.uuid}/{study.uuid}/{series.uuid}/").exists(): - print(f'\t\t\t{level} {series.uuid} started') - for instance in series.instances: - gen_instance_object(args, sess, collection, patient, study, series, instance) - contents = { - "path": f"{collection.uuid}/{patient.uuid}/{study.uuid}/{series.uuid}/", - "children": - [ - f"{instance.uuid}/" for instance in series.instances - ] - } - blob = args.dst_bucket.blob(f"{collection.uuid}/{patient.uuid}/{study.uuid}/{series.uuid}/").upload_from_string(json.dumps(contents)) - print(f'\t\t\t{level} {series.uuid} completed') - else: - print(f'\t\t\t{level} {series.uuid} skipped') - return - - -def gen_study_object(args, sess, collection, patient, study): - level = "Study" - if not args.dst_bucket.blob(f"{collection.uuid}/{patient.uuid}/{study.uuid}/").exists(): - print(f'\t\t{level} {study.uuid} started') - for series in study.seriess: - gen_series_object(args, sess, collection, patient, study, series) - contents = { - "path": f"{collection.uuid}/{patient.uuid}/{study.uuid}/", - "children": - [ - f"{series.uuid}/" for series in study.seriess - ] - } - blob = args.dst_bucket.blob(f"{collection.uuid}/{patient.uuid}/{study.uuid}/").upload_from_string(json.dumps(contents)) - print(f'\t\t{level} {study.uuid} completed') - else: - print(f'\t\t{level} {study.uuid} skipped') - return - - -def gen_patient_object(args, sess, collection, patient): - level = "Patient" - if not args.dst_bucket.blob(f"{collection.uuid}/{patient.uuid}/").exists(): - print(f'\t{level} {patient.uuid} started') - for study in patient.studies: - gen_study_object(args, sess, collection, 
patient, study) - contents = { - "path": f"{collection.uuid}/{patient.uuid}/", - "children": - [ - f"{study.uuid}/" for study in patient.studies - ] - } - blob = args.dst_bucket.blob(f"{collection.uuid}/{patient.uuid}/").upload_from_string(json.dumps(contents)) - print(f'\t{level} {patient.uuid} completed') - else: - print(f'\t{level} {patient.uuid} skipped') - return - -def gen_collection_object(args, sess, collection): - level = "Collection" - if not args.dst_bucket.blob(f"{collection.uuid}/").exists(): - print(f'{level} {collection.uuid} started') - for patient in collection.patients: - gen_patient_object(args, sess, collection, patient) - contents = { - "path": f"{collection.uuid}/", - "children": - [ - f"{patient.uuid}/" for patient in collection.patients - ] - } - blob = args.dst_bucket.blob(f"{collection.uuid}/").upload_from_string(json.dumps(contents)) - print(f'{level} {collection.uuid} completed') - else: - print(f'{level} {collection.uuid} skipped') - return - -def gen_collections(args): - sql_uri = f'postgresql+psycopg2://{settings.CLOUD_USERNAME}:{settings.CLOUD_PASSWORD}@{settings.CLOUD_HOST}:{settings.CLOUD_PORT}/{settings.CLOUD_DATABASE}' - # sql_engine = create_engine(sql_uri, echo=True) - sql_engine = create_engine(sql_uri) - # Create the tables if they do not already exist - Base.metadata.create_all(sql_engine) - - with Session(sql_engine) as sess: - for collection_id in args.collections: - collections = sess.query(Collection).filter(Collection.collection_id == collection_id) - for collection in collections: - gen_collection_object(args, sess, collection) - - -if __name__ == '__main__': - client = storage.Client() - parser = argparse.ArgumentParser() - parser.add_argument('--version', default=9, help='Version to work on') - parser.add_argument('--collections', default=['APOLLO-5-LSCC', 'CPTAC-SAR', 'MIDRC-RICORD-1C', 'TCGA-READ']) - # parser.add_argument('--hfs_levels', default=['study', 'series'], help='Name blobs as study/series/instance if study, series/instance if series') - parser.add_argument('--dst_bucket_name', default='whc_prop3', help='Bucket into which to copy blobs') - args = parser.parse_args() - - args.id = 0 # Default process ID - print(f'args: {json.dumps(args.__dict__, indent=2)}') - args.dst_bucket = client.bucket(args.dst_bucket_name) - - gen_collections(args) diff --git a/hfs/prop3/gen_blobs.py b/hfs/prop3a/gen_blobs_3a.py similarity index 100% rename from hfs/prop3/gen_blobs.py rename to hfs/prop3a/gen_blobs_3a.py diff --git a/hfs/prop3b/collection_list_3b.py b/hfs/prop3b/collection_list_3b.py new file mode 100644 index 0000000..87fc8a9 --- /dev/null +++ b/hfs/prop3b/collection_list_3b.py @@ -0,0 +1,17 @@ +# +# Copyright 2015-2021, Institute for Systems Biology +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +collection_list = ['APOLLO-5-LSCC', 'CPTA-CM', 'TCGA-READ'] diff --git a/hfs/prop3b/gen_blobs_3b.py b/hfs/prop3b/gen_blobs_3b.py new file mode 100644 index 0000000..b48d8c6 --- /dev/null +++ b/hfs/prop3b/gen_blobs_3b.py @@ -0,0 +1,166 @@ +# +# Copyright 2015-2021, Institute for Systems Biology +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# In this version, we generate hierarchically name blobs for studies and series. +# Blob contents are lists of child series and child instances respectively. +# Child object names include the bucket in which the child is found. +import json +import argparse + +from collection_list_3b import collection_list +from utilities.logging_config import successlogger, progresslogger, errlogger +from idc.models import Base, Version, Collection, Patient, Study, Series, Instance, All_Included_Collections +from google.cloud import storage +from sqlalchemy.orm import Session +from python_settings import settings +from sqlalchemy import create_engine, update + +def gen_instance_object(args, sess, collection, patient, study, series, instance): + level = "Instance" + if not args.dst_bucket.blob(f"{collection.uuid}/{patient.uuid}/{study.uuid}/{series.uuid}/{instance.uuid}/").exists(): + # print(f'\t\t{level} {instance.uuid} started') + if instance.source.name == 'tcia': + bucket = sess.query(All_Included_Collections.pub_tcia_url).filter(All_Included_Collections.tcia_api_collection_id == collection.collection_id).first().pub_tcia_url + else: + bucket = sess.query(All_Included_Collections.pub_path_url).filter(All_Included_Collections.tcia_api_collection_id == collection.collection_id).first().pub_path_url + + contents = f"{bucket}/{instance.uuid}.dcm" + blob = args.dst_bucket.blob(f"{study.uuid}/{series.uuid}/{instance.uuid}.idc").upload_from_string(contents) + if not args.dst_bucket.blob(f"{study.uuid}/{series.uuid}/{instance.uuid}.idc").exists(): + errlogger.error(f"{study.uuid}/{series.uuid}/{instance.uuid}.idc doesn't exist") + print(f'\t\t\t\t{level} {instance.uuid} completed') + else: + print(f'\t\t\t\t{level} {instance.uuid} skipped') + return + +def gen_series_object(args, sess, collection, patient, study, series): + level = "Series" + if not args.dst_bucket.blob(f"{study.uuid}/{series.uuid}/").exists(): + print(f'\t\t\t{level} {series.uuid} started') + # Create a combined "folder" and "bundle" blob + for instance in series.instances: + gen_instance_object(args, sess, collection, patient, study, series, instance) + contents = "\n".join( + [f"{args.dst_bucket_name}/{study.uuid}/{series.uuid}/{instance.uuid}.idc" for instance in series.instances]) + blob = args.dst_bucket.blob(f"{study.uuid}/{series.uuid}/").upload_from_string(contents) + if not args.dst_bucket.blob(f"{study.uuid}/{series.uuid}/").exists(): + errlogger.error(f"{study.uuid}/{series.uuid}/ doesn't exist") + print(f'\t\t\t{level} {series.uuid} completed') + else: + + print(f'\t\t\t{level} {series.uuid} skipped') + return + + +# def gen_series_object(args, sess, collection, patient, study, 
series): +# level = "Series" +# if not args.dst_bucket.blob(f"{collection.uuid}/{patient.uuid}/{study.uuid}/{series.uuid}/").exists(): +# print(f'\t\t\t{level} {series.uuid} started') +# for instance in series.instances: +# gen_instance_object(args, sess, collection, patient, study, series, instance) +# contents = { +# "path": f"{collection.uuid}/{patient.uuid}/{study.uuid}/{series.uuid}/", +# "children": +# [ +# f"{instance.uuid}/" for instance in series.instances +# ] +# } +# blob = args.dst_bucket.blob(f"{collection.uuid}/{patient.uuid}/{study.uuid}/{series.uuid}/").upload_from_string(json.dumps(contents)) +# print(f'\t\t\t{level} {series.uuid} completed') +# else: +# print(f'\t\t\t{level} {series.uuid} skipped') +# return + + +def gen_study_object(args, sess, collection, patient, study): + level = "Study" + if not args.dst_bucket.blob(f"{study.uuid}/").exists(): + print(f'\t\t{level} {study.uuid} started') + for series in study.seriess: + gen_series_object(args, sess, collection, patient, study, series) + # Create a combined "folder" and "bundle" blob + contents = "\n".join([f"{args.dst_bucket_name}/{study.uuid}/{series.uuid}/" for series in study.seriess]) + blob = args.dst_bucket.blob(f"{study.uuid}/").upload_from_string(contents) + if not args.dst_bucket.blob(f"{study.uuid}/").exists(): + errlogger.error(f"{study.uuid}/ doesn't exist") + print(f'\t\t{level} {study.uuid} completed') + else: + print(f'\t\t{level} {study.uuid} skipped') + return + +# def gen_study_object(args, sess, collection, patient, study): +# level = "Study" +# if not args.dst_bucket.blob(f"{collection.uuid}/{patient.uuid}/{study.uuid}/").exists(): +# print(f'\t\t{level} {study.uuid} started') +# for series in study.seriess: +# gen_series_object(args, sess, collection, patient, study, series) +# contents = { +# "path": f"{collection.uuid}/{patient.uuid}/{study.uuid}/", +# "children": +# [ +# f"{series.uuid}/" for series in study.seriess +# ] +# } +# blob = args.dst_bucket.blob(f"{collection.uuid}/{patient.uuid}/{study.uuid}/").upload_from_string(json.dumps(contents)) +# print(f'\t\t{level} {study.uuid} completed') +# else: +# print(f'\t\t{level} {study.uuid} skipped') +# return + + +def gen_patient_object(args, sess, collection, patient): + level = "Patient" + for study in patient.studies: + gen_study_object(args, sess, collection, patient, study) + print(f'\t{level} {patient.uuid} completed') + return + +def gen_collection_object(args, sess, collection): + level = "Collection" + for patient in collection.patients: + gen_patient_object(args, sess, collection, patient) + print(f'{level} {collection.uuid} completed') + return + +def gen_all(args): + + sql_uri = f'postgresql+psycopg2://{settings.CLOUD_USERNAME}:{settings.CLOUD_PASSWORD}@{settings.CLOUD_HOST}:{settings.CLOUD_PORT}/{settings.CLOUD_DATABASE}' + # sql_engine = create_engine(sql_uri, echo=True) + sql_engine = create_engine(sql_uri) + # Create the tables if they do not already exist + Base.metadata.create_all(sql_engine) + + with Session(sql_engine) as sess: + for collection_id in collection_list: + collections = sess.query(Collection).filter(Collection.collection_id == collection_id) + for collection in collections: + gen_collection_object(args, sess, collection) + + + +if __name__ == '__main__': + client = storage.Client() + parser = argparse.ArgumentParser() + parser.add_argument('--version', default=11, help='Version to work on') + # parser.add_argument('--hfs_levels', default=['study', 'series'], help='Name blobs as study/series/instance if study, 
series/instance if series') + parser.add_argument('--dst_bucket_name', default='whc_prop3b', help='Bucket into which to copy blobs') + args = parser.parse_args() + + args.id = 0 # Default process ID + print(f'args: {json.dumps(args.__dict__, indent=2)}') + args.dst_bucket = client.bucket(args.dst_bucket_name) + + gen_all(args)
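What follows is a minimal sketch, not part of the diff above, of how a client might resolve one of the combined "folder"/"bundle" blobs written by gen_blobs_3.py back into its child instance blobs. It assumes a series bundle named "<study_uuid>/<series_uuid>.idc" whose lines have the "<public_bucket>/<instance_uuid>.dcm" form produced by gen_series_object; the destination bucket name and UUID placeholders are illustrative, not taken from the repository.

# Sketch only: resolve a series bundle blob produced by gen_blobs_3.py.
# The destination bucket name and the UUID placeholders are hypothetical.
from google.cloud import storage

def resolve_series_bundle(client, dst_bucket_name, study_uuid, series_uuid):
    # Series bundles are written as "<study_uuid>/<series_uuid>.idc".
    bundle = client.bucket(dst_bucket_name).blob(f"{study_uuid}/{series_uuid}.idc")
    blobs = []
    # Each non-empty line is "<public_bucket>/<instance_uuid>.dcm".
    for line in bundle.download_as_text().splitlines():
        if line:
            bucket_name, blob_name = line.split("/", 1)
            blobs.append(client.bucket(bucket_name).blob(blob_name))
    return blobs

if __name__ == "__main__":
    client = storage.Client()
    # Substitute real UUIDs obtained by listing the destination bucket.
    for blob in resolve_series_bundle(client, "whc_prop3", "<study_uuid>", "<series_uuid>"):
        print(blob.bucket.name, blob.name)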