From cc2736221249b44b42ceb20ed64efbdb22dc1aa5 Mon Sep 17 00:00:00 2001 From: s-paquette Date: Thu, 3 May 2018 16:54:58 -0700 Subject: [PATCH 01/39] -> Remove unneeded method --- cohorts/views.py | 60 ------------------------------------------------ 1 file changed, 60 deletions(-) diff --git a/cohorts/views.py b/cohorts/views.py index a1b24407..05436cfb 100755 --- a/cohorts/views.py +++ b/cohorts/views.py @@ -74,66 +74,6 @@ def convert(data): else: return data - -# Given a cohort ID, fetches the case_gdc_id or uuid of its samples -# As metadata_data tables are build specific, this checks all builds and coalesces the results; if more than one -# nonnull result is found the first one is kept (though in practice all non-null values should be identical) -def get_cohort_uuids(cohort_id): - if not cohort_id: - raise Exception("A cohort ID was not provided (value={}).".format("None" if cohort_id is None else str(cohort_id))) - - cohort_progs = Cohort.objects.get(id=cohort_id).get_programs() - - data_tables = Public_Data_Tables.objects.filter(program_id__in=cohort_progs) - - uuid_query_base = """ - SELECT cs.sample_barcode, COALESCE ({}) as uuid - FROM cohorts_samples cs - {} - """ - - result = {} - - db = None - cursor = None - - try: - db = get_sql_connection() - cursor = db.cursor() - - query = uuid_query_base - - # Because UUIDs are stored in the data tables, which are build specific, we need to check the values - # in all builds for a given program. If more than one build has a case_gdc_id they should match, but some could - # be null, so we need to coalesce them to find a non-null value - for prog in cohort_progs: - prog_data_tables = data_tables.filter(program_id=prog) - count=1 - uuid_cols = [] - joins = [] - for data_table in prog_data_tables: - uuid_cols.append("ds{}.case_gdc_id".format(str(count))) - joins.append(""" - LEFT JOIN {} ds{} - ON ds{}.sample_barcode = cs.sample_barcode - """.format(data_table.data_table,str(count),str(count),)) - count+=1 - - cursor.execute(query.format(",".join(uuid_cols)," ".join(joins)) + " WHERE cs.cohort_id = %s;", (cohort_id,)) - - for row in cursor.fetchall(): - if row[0] not in result: - result[row[0]] = row[1] - - except Exception as e: - logger.error("[ERROR] While fetching UUIDs for a cohort:") - logger.exception(e) - finally: - if cursor: cursor.close() - if db and db.open: db.close() - - return result - def get_sample_case_list(user, inc_filters=None, cohort_id=None, program_id=None, build='HG19'): if program_id is None and cohort_id is None: From a97a54826fd8eaba1a405e7f76ec3b53de5db233 Mon Sep 17 00:00:00 2001 From: wlongabaugh Date: Fri, 4 May 2018 10:48:39 -0700 Subject: [PATCH 02/39] Missed some places to use build_with_retries --- google_helpers/resourcemanager_service.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/google_helpers/resourcemanager_service.py b/google_helpers/resourcemanager_service.py index a9b99ce9..c8045818 100644 --- a/google_helpers/resourcemanager_service.py +++ b/google_helpers/resourcemanager_service.py @@ -17,8 +17,8 @@ """ from oauth2client.client import GoogleCredentials -from googleapiclient.discovery import build from django.conf import settings +from .utils import build_with_retries CRM_SCOPES = ['https://www.googleapis.com/auth/cloud-platform'] @@ -28,8 +28,8 @@ def get_crm_resource(): Returns: a Cloud Resource Manager service client for calling the API. 
""" credentials = GoogleCredentials.get_application_default() - return build('cloudresourcemanager', 'v1beta1', credentials=credentials, cache_discovery=False) - + service = build_with_retries('cloudresourcemanager', 'v1beta1', credentials, 2) + return service def get_special_crm_resource(): """ @@ -39,4 +39,5 @@ def get_special_crm_resource(): """ credentials = GoogleCredentials.from_stream( settings.USER_GCP_ACCESS_CREDENTIALS).create_scoped(CRM_SCOPES) - return build('cloudresourcemanager', 'v1beta1', credentials=credentials, cache_discovery=False) + service = build_with_retries('cloudresourcemanager', 'v1beta1', credentials, 2) + return service From e810b8a8d8750d317c2fae0e755de70c35eda81a Mon Sep 17 00:00:00 2001 From: wlongabaugh Date: Fri, 4 May 2018 11:10:27 -0700 Subject: [PATCH 03/39] FIxing multi-click unregister and format problem. --- accounts/sa_utils.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/accounts/sa_utils.py b/accounts/sa_utils.py index ffb2d6f6..6aa51e08 100644 --- a/accounts/sa_utils.py +++ b/accounts/sa_utils.py @@ -595,7 +595,10 @@ def unregister_all_gcp_sa(user_id, gcp_id): def unregister_sa(user_id, sa_name): st_logger = StackDriverLogger.build_from_django_settings() - sa = ServiceAccount.objects.get(service_account=sa_name, active=1) + sa = ServiceAccount.objects.get(service_account=sa_name) + # papid multi-clicks on button can cause this sa to be inactive already. Nothing to be done... + if not sa.active: + return saads = ServiceAccountAuthorizedDatasets.objects.filter(service_account=sa) st_logger.write_text_log_entry(SERVICE_ACCOUNT_LOG_NAME, "[STATUS] User {} is unregistering SA {}".format( @@ -629,7 +632,7 @@ def unregister_sa(user_id, sa_name): 'message': '[ERROR] There was an error in removing SA {0} from Google Group {1}.'.format( str(saad.service_account.service_account), saad.authorized_dataset.acl_google_group)}) st_logger.write_struct_log_entry(SERVICE_ACCOUNT_LOG_NAME, { - 'message': '[ERROR] {}}.'.format(str(e))}) + 'message': '[ERROR] {}.'.format(str(e))}) logger.error('[ERROR] There was an error in removing SA {0} from Google Group {1}: {2}'.format( str(saad.service_account.service_account), saad.authorized_dataset.acl_google_group, e)) logger.exception(e) From bc176d14868728247f7034b61c5d30b3cb526cef Mon Sep 17 00:00:00 2001 From: wlongabaugh Date: Fri, 4 May 2018 11:14:37 -0700 Subject: [PATCH 04/39] FIxing multi-click unregister and format problem. --- accounts/sa_utils.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/accounts/sa_utils.py b/accounts/sa_utils.py index 6aa51e08..46555598 100644 --- a/accounts/sa_utils.py +++ b/accounts/sa_utils.py @@ -598,6 +598,9 @@ def unregister_sa(user_id, sa_name): sa = ServiceAccount.objects.get(service_account=sa_name) # papid multi-clicks on button can cause this sa to be inactive already. Nothing to be done... if not sa.active: + st_logger.write_struct_log_entry(SERVICE_ACCOUNT_LOG_NAME, { + 'message': '[STATUS] Attempted to remove INACTIVE SA {0} from Google Group {1}.'. + format(str(sa.service_account))}) return saads = ServiceAccountAuthorizedDatasets.objects.filter(service_account=sa) From 2a8d942e487c968c7aa46778a3b84b3738aa79e5 Mon Sep 17 00:00:00 2001 From: wlongabaugh Date: Fri, 4 May 2018 11:15:10 -0700 Subject: [PATCH 05/39] FIxing multi-click unregister and format problem. 
--- accounts/sa_utils.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/accounts/sa_utils.py b/accounts/sa_utils.py index 46555598..06b64021 100644 --- a/accounts/sa_utils.py +++ b/accounts/sa_utils.py @@ -599,8 +599,7 @@ def unregister_sa(user_id, sa_name): # papid multi-clicks on button can cause this sa to be inactive already. Nothing to be done... if not sa.active: st_logger.write_struct_log_entry(SERVICE_ACCOUNT_LOG_NAME, { - 'message': '[STATUS] Attempted to remove INACTIVE SA {0} from Google Group {1}.'. - format(str(sa.service_account))}) + 'message': '[STATUS] Attempted to remove INACTIVE SA {0}'.format(str(sa.service_account))}) return saads = ServiceAccountAuthorizedDatasets.objects.filter(service_account=sa) From 1e1767bf928cd0b559972591e819d3d6fdf8a805 Mon Sep 17 00:00:00 2001 From: "S. Paquette" Date: Tue, 8 May 2018 17:31:13 -0700 Subject: [PATCH 06/39] -> Fix for #2326 --- cohorts/metadata_counting.py | 1 - cohorts/views.py | 4 ++-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/cohorts/metadata_counting.py b/cohorts/metadata_counting.py index 5b210058..9b01537f 100644 --- a/cohorts/metadata_counting.py +++ b/cohorts/metadata_counting.py @@ -224,7 +224,6 @@ def count_public_data_type(user, data_query, inc_filters, program_list, filter_f if db and db.open: db.close() - # Tally counts for metadata filters of public programs def count_public_metadata(user, cohort_id=None, inc_filters=None, program_id=None, build='HG19'): diff --git a/cohorts/views.py b/cohorts/views.py index a72f7576..75f3ebf6 100755 --- a/cohorts/views.py +++ b/cohorts/views.py @@ -2106,11 +2106,11 @@ def cohort_files(request, cohort_id, limit=25, page=1, offset=0, sort_column='co FROM {metadata_table} md JOIN ( - SELECT sample_barcode + SELECT DISTINCT case_barcode FROM cohorts_samples WHERE cohort_id = {cohort_id} ) cs - ON cs.sample_barcode = md.sample_barcode + ON cs.case_barcode = md.case_barcode WHERE md.file_uploaded='true' {type_conditions} {filter_conditions} """ From c12057fa20fc6c4143b0f2fe82d629d5dbf98c44 Mon Sep 17 00:00:00 2001 From: "S. Paquette" Date: Tue, 8 May 2018 19:33:19 -0700 Subject: [PATCH 07/39] -> Fix for #2326 --- cohorts/views.py | 1 - 1 file changed, 1 deletion(-) diff --git a/cohorts/views.py b/cohorts/views.py index 75f3ebf6..6d354e23 100755 --- a/cohorts/views.py +++ b/cohorts/views.py @@ -2103,7 +2103,6 @@ def cohort_files(request, cohort_id, limit=25, page=1, offset=0, sort_column='co SELECT md.sample_barcode, md.case_barcode, md.disease_code, md.file_name, md.file_name_key, md.index_file_name, md.access, md.acl, md.platform, md.data_type, md.data_category, md.experimental_strategy, md.data_format, md.file_gdc_id, md.case_gdc_id, md.project_short_name - FROM {metadata_table} md JOIN ( SELECT DISTINCT case_barcode From ea9e77053e0e48c691d7b19b7ca8671e451363db Mon Sep 17 00:00:00 2001 From: wlongabaugh Date: Wed, 9 May 2018 10:01:59 -0700 Subject: [PATCH 08/39] Another retry added --- dataset_utils/auth_list.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/dataset_utils/auth_list.py b/dataset_utils/auth_list.py index 59305aed..8b9b95b4 100644 --- a/dataset_utils/auth_list.py +++ b/dataset_utils/auth_list.py @@ -1,6 +1,6 @@ """ -Copyright 2017, Institute for Systems Biology +Copyright 2017-2018, Institute for Systems Biology Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. 
@@ -16,6 +16,8 @@ """ +from google_helpers.utils import execute_with_retries + class DatasetAuthorizationList(object): @classmethod @@ -29,7 +31,7 @@ def get_object_from_gcs(cls, bucket_name, object_name): storage_service = get_storage_resource() req = storage_service.objects().get_media(bucket=bucket_name, object=object_name) - file_contents = req.execute() + file_contents = execute_with_retries(req, 'GET_MEDIA', 2) return file_contents @classmethod From 57afde0127ec029f4f6de2435d013adb97c28ceb Mon Sep 17 00:00:00 2001 From: "S. Paquette" Date: Thu, 10 May 2018 17:22:31 -0700 Subject: [PATCH 09/39] -> Fixed an error in a messages response -> Prep to make genomic builds NULL-allowed in data type avail. tables --- cohorts/metadata_counting.py | 6 ++---- cohorts/views.py | 2 +- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/cohorts/metadata_counting.py b/cohorts/metadata_counting.py index 9b01537f..bb3175a8 100644 --- a/cohorts/metadata_counting.py +++ b/cohorts/metadata_counting.py @@ -377,8 +377,6 @@ def count_public_metadata(user, cohort_id=None, inc_filters=None, program_id=Non hugo_symbol=str(params['gene']), var_class=params['var_class'], cohort_dataset=bq_cohort_dataset, cohort_table=bq_cohort_table, cohort=cohort) - print >> sys.stdout, str(query) - bq_service = authorize_credentials_with_Google() query_job = submit_bigquery_job(bq_service, settings.BQ_PROJECT_ID, query) job_is_done = is_bigquery_job_finished(bq_service, settings.BQ_PROJECT_ID, query_job['jobReference']['jobId']) @@ -586,7 +584,7 @@ def count_public_metadata(user, cohort_id=None, inc_filters=None, program_id=Non """ % (filter_table, data_avail_table, data_type_table, cohort_join) data_avail_query = """ - SELECT DISTINCT ms.sample_barcode, GROUP_CONCAT(CONCAT(dt.isb_label,'; ',dt.genomic_build)) + SELECT DISTINCT ms.sample_barcode, GROUP_CONCAT(CONCAT(dt.isb_label,'; ',COALESCE(dt.genomic_build,'Avail.'))) FROM %s ms JOIN %s da ON da.sample_barcode = ms.sample_barcode JOIN %s dt ON dt.metadata_data_type_availability_id = da.metadata_data_type_availability_id @@ -608,7 +606,7 @@ def count_public_metadata(user, cohort_id=None, inc_filters=None, program_id=Non """ % (base_table, data_avail_table, data_type_table,) data_avail_query = """ - SELECT DISTINCT ms.sample_barcode, GROUP_CONCAT(CONCAT(dt.isb_label,'; ',dt.genomic_build)) + SELECT DISTINCT ms.sample_barcode, GROUP_CONCAT(CONCAT(dt.isb_label,'; ',COALESCE(dt.genomic_build,'Avail.'))) FROM %s ms JOIN %s da ON da.sample_barcode = ms.sample_barcode JOIN %s dt ON dt.metadata_data_type_availability_id = da.metadata_data_type_availability_id diff --git a/cohorts/views.py b/cohorts/views.py index 6d354e23..14ac0e93 100755 --- a/cohorts/views.py +++ b/cohorts/views.py @@ -1770,7 +1770,7 @@ def streaming_csv_view(request, cohort_id=0): except Exception as e: logger.error("[ERROR] While downloading the list of files for user {}:".format(str(request.user.id))) logger.exception(e) - messages.error("There was an error while attempting to download your filelist--please contact the administrator.") + messages.error(request,"There was an error while attempting to download your filelist--please contact the administrator.") return redirect(reverse('cohort_filelist', kwargs={'cohort_id': cohort_id})) From b127ddb809ba6d3483b3d5dc551854a0be23da65 Mon Sep 17 00:00:00 2001 From: "S. 
Paquette" Date: Thu, 17 May 2018 13:27:47 -0700 Subject: [PATCH 10/39] -> BQ Support now handles querying and job result fetching --- cohorts/metadata_counting.py | 15 +-- cohorts/metadata_helpers.py | 62 +--------- cohorts/views.py | 52 ++------- google_helpers/bigquery/bq_support.py | 158 +++++++++++++++++++++++++- 4 files changed, 173 insertions(+), 114 deletions(-) diff --git a/cohorts/metadata_counting.py b/cohorts/metadata_counting.py index bb3175a8..27671126 100644 --- a/cohorts/metadata_counting.py +++ b/cohorts/metadata_counting.py @@ -26,6 +26,7 @@ from metadata_helpers import * from projects.models import Program, Project, User_Data_Tables, Public_Metadata_Tables from google_helpers.bigquery.service import authorize_credentials_with_Google +from google_helpers.bigquery.cohort_support import BigQuerySupport from django.core.exceptions import ObjectDoesNotExist, MultipleObjectsReturned BQ_ATTEMPT_MAX = 10 @@ -377,26 +378,14 @@ def count_public_metadata(user, cohort_id=None, inc_filters=None, program_id=Non hugo_symbol=str(params['gene']), var_class=params['var_class'], cohort_dataset=bq_cohort_dataset, cohort_table=bq_cohort_table, cohort=cohort) - bq_service = authorize_credentials_with_Google() - query_job = submit_bigquery_job(bq_service, settings.BQ_PROJECT_ID, query) - job_is_done = is_bigquery_job_finished(bq_service, settings.BQ_PROJECT_ID, query_job['jobReference']['jobId']) - barcodes = [] - retries = 0 start = time.time() - while not job_is_done and retries < BQ_ATTEMPT_MAX: - retries += 1 - sleep(1) - job_is_done = is_bigquery_job_finished(bq_service, settings.BQ_PROJECT_ID, query_job['jobReference']['jobId']) + results = BigQuerySupport.execute_query_and_fetch_results(query) stop = time.time() logger.debug('[BENCHMARKING] Time to query BQ for mutation data: '+(stop - start).__str__()) - results = get_bq_job_results(bq_service, query_job['jobReference']) - - # for-each result, add to list - if len(results) > 0: for barcode in results: barcodes.append(str(barcode['f'][0]['v'])) diff --git a/cohorts/metadata_helpers.py b/cohorts/metadata_helpers.py index 452405df..01700245 100644 --- a/cohorts/metadata_helpers.py +++ b/cohorts/metadata_helpers.py @@ -32,7 +32,6 @@ from uuid import uuid4 from django.conf import settings - debug = settings.DEBUG # RO global for this file logger = logging.getLogger('main_logger') @@ -607,61 +606,6 @@ def format_for_display(item): return formatted_item - -# Builds a BQ API v2 QueryParameter set and WHERE clause string from a set of filters of the form: -# { -# 'field_name': [,...] -# } -# Breaks out ' IS NULL' -# 2+ values are converted to IN (,...) 
-# Filters must already be pre-bucketed or formatted -# TODO: add support for BETWEEN -# TODO: add support for <>= -def build_bq_filter_and_params(filters): - result = { - 'filter_string': '', - 'parameters': [] - } - - filter_set = [] - - for attr, values in filters.items(): - filter_string = '' - query_param = { - 'name': attr, - 'parameterType': { - - }, - 'parameterValue': { - - } - } - if 'None' in values: - values.remove('None') - filter_string = "{} IS NULL".format(attr) - - if len(values) > 0: - if len(filter_string): - filter_string += " OR " - if len(values) == 1: - # Scalar param - query_param['parameterType']['type'] = ('STRING' if re.compile(ur'[^0-9\.]', re.UNICODE).search(values[0]) else 'INT64') - query_param['parameterValue']['value'] = values[0] - filter_string += "{} = @{}".format(attr, attr) - else: - # Array param - query_param['parameterType']['type'] = "ARRAY" - query_param['parameterValue'] = {'arrayValues': [{'value': x} for x in values]} - query_param['parameterType']['arrayType'] = {'type': ('STRING' if re.compile(ur'[^0-9\.]', re.UNICODE).search(values[0]) else 'INT64')} - filter_string += "{} IN UNNEST(@{})".format(attr,attr) - - filter_set.append('({})'.format(filter_string)) - result['parameters'].append(query_param) - - result['filter_string'] = " AND ".join(filter_set) - - return result - # Construct WHERE clauses for BigQuery and CloudSQL based on a set of filters # If the names of the columns differ across the 2 platforms, the alt_key_map can be # used to map a filter 'key' to a different column name @@ -854,7 +798,7 @@ def build_where_clause(filters, alt_key_map=False, program=None, for_files=False def sql_simple_number_by_200(value, field): - if debug: print >> sys.stderr, 'Called ' + sys._getframe().f_code.co_name + if debug: logger.debug('[DEBUG] Called ' + sys._getframe().f_code.co_name) result = '' if isinstance(value, basestring): @@ -1011,7 +955,7 @@ def sql_bmi_by_ranges(value): def sql_age_by_ranges(value, bin_by_five=False): - if debug: print >> sys.stderr,'Called '+sys._getframe().f_code.co_name + if debug: logger.debug('[DEBUG] Called '+sys._getframe().f_code.co_name) result = '' if isinstance(value, basestring): value = [value] @@ -1064,7 +1008,7 @@ def sql_age_by_ranges(value, bin_by_five=False): def gql_age_by_ranges(q, key, value): - if debug: print >> sys.stderr,'Called '+sys._getframe().f_code.co_name + if debug: logger.debug('[DEBUG] Called '+sys._getframe().f_code.co_name) result = '' if not isinstance(value, basestring): # value is a list of ranges diff --git a/cohorts/views.py b/cohorts/views.py index 14ac0e93..63df767c 100755 --- a/cohorts/views.py +++ b/cohorts/views.py @@ -23,6 +23,7 @@ import time import django +from google_helpers.bigquery.cohort_support import BigQuerySupport from google_helpers.bigquery.cohort_support import BigQueryCohortSupport from google_helpers.bigquery.export_support import BigQueryExportCohort, BigQueryExportFileList from django.contrib import messages @@ -292,23 +293,10 @@ def get_sample_case_list(user, inc_filters=None, cohort_id=None, program_id=None cohort_dataset=bq_cohort_dataset,cohort_table=bq_cohort_table, cohort=cohort ) - bq_service = authorize_credentials_with_Google() - query_job = submit_bigquery_job(bq_service, settings.BQ_PROJECT_ID, query) - job_is_done = is_bigquery_job_finished(bq_service, settings.BQ_PROJECT_ID, - query_job['jobReference']['jobId']) - barcodes = [] - retries = 0 - - while not job_is_done and retries < BQ_ATTEMPT_MAX: - retries += 1 - sleep(1) - job_is_done = 
is_bigquery_job_finished(bq_service, settings.BQ_PROJECT_ID, - query_job['jobReference']['jobId']) - - results = get_bq_job_results(bq_service, query_job['jobReference']) + results = BigQuerySupport.execute_query_and_fetch_results(query) - if len(results) > 0: + if results and len(results) > 0: for barcode in results: barcodes.append(str(barcode['f'][0]['v'])) @@ -2047,46 +2035,28 @@ def cohort_files(request, cohort_id, limit=25, page=1, offset=0, sort_column='co order_clause = "ORDER BY " + col_map[sort_column] + (" DESC" if sort_order == 1 else "") - bq_service = authorize_credentials_with_Google() if do_filter_count: # Query the count - query_job = submit_bigquery_job(bq_service, settings.BQ_PROJECT_ID, file_count_query.format(select_clause=file_list_query_base)) - job_is_done = is_bigquery_job_finished(bq_service, settings.BQ_PROJECT_ID, - query_job['jobReference']['jobId']) - retries = 0 start = time.time() - while not job_is_done and retries < BQ_ATTEMPT_MAX: - retries += 1 - sleep(1) - job_is_done = is_bigquery_job_finished(bq_service, settings.BQ_PROJECT_ID, - query_job['jobReference']['jobId']) + results = BigQuerySupport.execute_query_and_fetch_results(file_count_query.format(select_clause=file_list_query_base)) stop = time.time() logger.debug('[BENCHMARKING] Time to query BQ for dicom count: ' + (stop - start).__str__()) - results = get_bq_job_results(bq_service, query_job['jobReference']) for entry in results: total_file_count = int(entry['f'][0]['v']) # Query the file list only if there was anything to find if (total_file_count and do_filter_count) or not do_filter_count: - query_job = submit_bigquery_job(bq_service, settings.BQ_PROJECT_ID, file_list_query.format( - select_clause=file_list_query_base, order_clause=order_clause, limit_clause=limit_clause, offset_clause=offset_clause) - ) - job_is_done = is_bigquery_job_finished(bq_service, settings.BQ_PROJECT_ID, query_job['jobReference']['jobId']) - - retries = 0 - start = time.time() - while not job_is_done and retries < BQ_ATTEMPT_MAX: - retries += 1 - sleep(1) - job_is_done = is_bigquery_job_finished(bq_service, settings.BQ_PROJECT_ID, - query_job['jobReference']['jobId']) + results = BigQuerySupport.execute_query_and_fetch_results( + file_list_query.format( + select_clause=file_list_query_base, order_clause=order_clause, limit_clause=limit_clause, + offset_clause=offset_clause + ) + ) stop = time.time() logger.debug('[BENCHMARKING] Time to query BQ for dicom data: ' + (stop - start).__str__()) - results = get_bq_job_results(bq_service, query_job['jobReference']) - if len(results) > 0: for entry in results: file_list.append({ @@ -2395,7 +2365,7 @@ def export_data(request, cohort_id=0, export_type=None): inc_filters = json.loads(request.POST.get('filters', '{}')) filter_params = None if len(inc_filters): - filter_and_params = build_bq_filter_and_params(inc_filters) + filter_and_params = BigQuerySupport.build_bq_filter_and_params(inc_filters) filter_params = filter_and_params['parameters'] filter_conditions = "AND {}".format(filter_and_params['filter_string']) diff --git a/google_helpers/bigquery/bq_support.py b/google_helpers/bigquery/bq_support.py index 9090b3f2..f93b6fe4 100644 --- a/google_helpers/bigquery/bq_support.py +++ b/google_helpers/bigquery/bq_support.py @@ -18,12 +18,15 @@ import logging from django.conf import settings +from time import sleep from google_helpers.bigquery.service import get_bigquery_service from abstract import BigQueryABC +from uuid import uuid4 logger = logging.getLogger('main_logger') 
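(The execute_query() method added to BigQuerySupport below forwards optional parameters straight into the jobs().insert() body as queryParameters. A minimal sketch of the BigQuery v2 named-parameter structure it expects — field names, values, and the table reference are illustrative, and named parameters require standard (non-legacy) SQL:

    # One scalar parameter and one array parameter, referenced in the query
    # text as @disease_code and @project_short_name respectively.
    parameters = [
        {
            'name': 'disease_code',
            'parameterType': {'type': 'STRING'},
            'parameterValue': {'value': 'BRCA'},
        },
        {
            'name': 'project_short_name',
            'parameterType': {'type': 'ARRAY', 'arrayType': {'type': 'STRING'}},
            'parameterValue': {'arrayValues': [{'value': 'TCGA-BRCA'}, {'value': 'TCGA-OV'}]},
        },
    ]
    query = ("SELECT sample_barcode FROM `some-project.some_dataset.some_table` "
             "WHERE disease_code = @disease_code "
             "AND project_short_name IN UNNEST(@project_short_name)")

build_bq_filter_and_params(), added later in this patch series, emits exactly this shape from a plain {field: [values]} filter dictionary.)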
MAX_INSERT = settings.MAX_BQ_INSERT +BQ_ATTEMPT_MAX = 10 COHORT_DATASETS = { 'prod': 'cloud_deployment_cohorts', @@ -213,4 +216,157 @@ def _confirm_dataset_and_table(self, desc): else: return { 'status': 'TABLE_EXISTS' - } \ No newline at end of file + } + + # Runs a basic, optionally parameterized query + # If self.project_id, self.dataset_id, and self.table_id are set they + # will be used as the destination table for the query + # WRITE_DISPOSITION is assumed to be for an empty table unless specified + def execute_query(self, query, parameters=None, write_disposition='WRITE_EMPTY'): + + query_results = None + + # Make yourself a job ID + job_id = str(uuid4()) + + # Generate a BQ service to use in submitting/checking on/fetching the job + bqs = get_bigquery_service() + + # Build your job description + job_desc = { + 'jobReference': { + 'projectId': settings.BIGQUERY_PROJECT_NAME, # This is the project which will *execute* the query + 'job_id': job_id + }, + 'configuration': { + 'query': { + 'query': query, + 'priority': 'INTERACTIVE' + } + } + } + + if parameters: + job_desc['configuration']['query']['queryParameters'] = parameters + + if self.project_id and self.dataset_id and self.table_id: + job_desc['configuration']['query']['destinationTable'] = { + 'projectId': self.project_id, + 'datasetId': self.dataset_id, + 'tableId': self.table_id + } + job_desc['configuration']['query']['writeDisposition'] = write_disposition + + query_job = bqs.jobs().insert( + projectId=settings.BIGQUERY_PROJECT_NAME, + body=job_desc).execute(num_retries=5) + + job_is_done = bqs.jobs().get(projectId=settings.BIGQUERY_PROJECT_NAME, + obId=query_job['jobReference']['jobId']).execute() + + retries = 0 + + while (job_is_done and not job_is_done['status']['state'] == 'DONE') and retries < BQ_ATTEMPT_MAX: + retries += 1 + sleep(1) + job_is_done = bqs.jobs().get(projectId=settings.BIGQUERY_PROJECT_NAME, + jobId=query_job['jobReference']['jobId']).execute() + + # Parse the final disposition + if job_is_done and job_is_done['status']['state'] == 'DONE': + if 'status' in job_is_done and 'errors' in job_is_done['status']: + logger.error("[ERROR] During query job {}: {}".format(job_id,str(job_is_done['status']['errors']))) + else: + logger.info("[STATUS] Query {} done, fetching results...".format(job_id)) + query_results = BigQuerySupport.get_job_results(bqs, query_job['jobReference']) + logger.info("[STATUS] {} results found for query {}.".format(str(len(query_results)),job_id)) + else: + logger.error("[ERROR] Query {} took longer than the allowed time to execute--" + + "if you check job ID {} manually you can wait for it to finish.".format(job_id)) + + + return query_results + + # Execute a query to be saved on a temp table (shorthand to instance method above) + @classmethod + def execute_query_and_fetch_results(cls, query, parameters=None, write_disposition='WRITE_EMPTY'): + bqs = cls(None, None, None) + return bqs.execute_query(query, parameters, write_disposition) + + # Given a BQ service and a job reference, fetch out the results + @staticmethod + def get_job_results(bq_service, job_reference): + result = [] + page_token = None + + while True: + page = bq_service.jobs().getQueryResults( + pageToken=page_token, + **job_reference).execute(num_retries=2) + + if int(page['totalRows']) == 0: + break + + rows = page['rows'] + result.extend(rows) + + page_token = page.get('pageToken') + if not page_token: + break + + return result + + # Builds a BQ API v2 QueryParameter set and WHERE clause string from a set of filters 
of the form: + # { + # 'field_name': [,...] + # } + # Breaks out ' IS NULL' + # 2+ values are converted to IN (,...) + # Filters must already be pre-bucketed or formatted + # TODO: add support for BETWEEN + # TODO: add support for <>= + @staticmethod + def build_bq_filter_and_params(filters): + result = { + 'filter_string': '', + 'parameters': [] + } + + filter_set = [] + + for attr, values in filters.items(): + filter_string = '' + query_param = { + 'name': attr, + 'parameterType': { + + }, + 'parameterValue': { + + } + } + if 'None' in values: + values.remove('None') + filter_string = "{} IS NULL".format(attr) + + if len(values) > 0: + if len(filter_string): + filter_string += " OR " + if len(values) == 1: + # Scalar param + query_param['parameterType']['type'] = ('STRING' if re.compile(ur'[^0-9\.,]', re.UNICODE).search(values[0]) else 'INT64') + query_param['parameterValue']['value'] = values[0] + filter_string += "{} = @{}".format(attr, attr) + else: + # Array param + query_param['parameterType']['type'] = "ARRAY" + query_param['parameterValue'] = {'arrayValues': [{'value': x} for x in values]} + query_param['parameterType']['arrayType'] = {'type': ('STRING' if re.compile(ur'[^0-9\.,]', re.UNICODE).search(values[0]) else 'INT64')} + filter_string += "{} IN UNNEST(@{})".format(attr,attr) + + filter_set.append('({})'.format(filter_string)) + result['parameters'].append(query_param) + + result['filter_string'] = " AND ".join(filter_set) + + return result From cdf41e9c3582dc4f459f84b6184106138c3b03c7 Mon Sep 17 00:00:00 2001 From: "S. Paquette" Date: Thu, 17 May 2018 13:50:31 -0700 Subject: [PATCH 11/39] -> Bugfix for missing letter --- google_helpers/bigquery/bq_support.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/google_helpers/bigquery/bq_support.py b/google_helpers/bigquery/bq_support.py index f93b6fe4..42160fec 100644 --- a/google_helpers/bigquery/bq_support.py +++ b/google_helpers/bigquery/bq_support.py @@ -262,7 +262,7 @@ def execute_query(self, query, parameters=None, write_disposition='WRITE_EMPTY') body=job_desc).execute(num_retries=5) job_is_done = bqs.jobs().get(projectId=settings.BIGQUERY_PROJECT_NAME, - obId=query_job['jobReference']['jobId']).execute() + jobId=query_job['jobReference']['jobId']).execute() retries = 0 @@ -284,14 +284,13 @@ def execute_query(self, query, parameters=None, write_disposition='WRITE_EMPTY') logger.error("[ERROR] Query {} took longer than the allowed time to execute--" + "if you check job ID {} manually you can wait for it to finish.".format(job_id)) - return query_results - # Execute a query to be saved on a temp table (shorthand to instance method above) + # Execute a query to be saved on a temp table (shorthand to instance method above), optionally parameterized @classmethod - def execute_query_and_fetch_results(cls, query, parameters=None, write_disposition='WRITE_EMPTY'): + def execute_query_and_fetch_results(cls, query, parameters=None): bqs = cls(None, None, None) - return bqs.execute_query(query, parameters, write_disposition) + return bqs.execute_query(query, parameters) # Given a BQ service and a job reference, fetch out the results @staticmethod From d248af79c0cd26156ed97af72e9c748b6c8c7c1d Mon Sep 17 00:00:00 2001 From: "S. 
Paquette" Date: Thu, 17 May 2018 14:56:46 -0700 Subject: [PATCH 12/39] -> Build in concept of 'executing project' for BQS --- google_helpers/bigquery/bq_support.py | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/google_helpers/bigquery/bq_support.py b/google_helpers/bigquery/bq_support.py index 42160fec..c7ffb3cb 100644 --- a/google_helpers/bigquery/bq_support.py +++ b/google_helpers/bigquery/bq_support.py @@ -42,9 +42,14 @@ class BigQuerySupport(BigQueryABC): - def __init__(self, project_id, dataset_id, table_id): + def __init__(self, project_id, dataset_id, table_id, executing_project=None): + # Project which will execute any jobs run by this class + self.executing_project = executing_project or settings.BIGQUERY_PROJECT_NAME + # Destination project self.project_id = project_id + # Destination dataset self.dataset_id = dataset_id + # Destination table self.table_id = table_id def _build_request_body_from_rows(self, rows): @@ -235,7 +240,7 @@ def execute_query(self, query, parameters=None, write_disposition='WRITE_EMPTY') # Build your job description job_desc = { 'jobReference': { - 'projectId': settings.BIGQUERY_PROJECT_NAME, # This is the project which will *execute* the query + 'projectId': self.executing_project, # This is the project which will *execute* the query 'job_id': job_id }, 'configuration': { @@ -258,10 +263,10 @@ def execute_query(self, query, parameters=None, write_disposition='WRITE_EMPTY') job_desc['configuration']['query']['writeDisposition'] = write_disposition query_job = bqs.jobs().insert( - projectId=settings.BIGQUERY_PROJECT_NAME, + projectId=self.executing_project, body=job_desc).execute(num_retries=5) - job_is_done = bqs.jobs().get(projectId=settings.BIGQUERY_PROJECT_NAME, + job_is_done = bqs.jobs().get(projectId=self.executing_project, jobId=query_job['jobReference']['jobId']).execute() retries = 0 @@ -269,7 +274,7 @@ def execute_query(self, query, parameters=None, write_disposition='WRITE_EMPTY') while (job_is_done and not job_is_done['status']['state'] == 'DONE') and retries < BQ_ATTEMPT_MAX: retries += 1 sleep(1) - job_is_done = bqs.jobs().get(projectId=settings.BIGQUERY_PROJECT_NAME, + job_is_done = bqs.jobs().get(projectId=self.executing_project, jobId=query_job['jobReference']['jobId']).execute() # Parse the final disposition From d3d251462bf58c83fe1a458418718dbd8d05846e Mon Sep 17 00:00:00 2001 From: "S. 
Paquette" Date: Thu, 17 May 2018 18:33:38 -0700 Subject: [PATCH 13/39] -> #2350 --- google_helpers/bigquery/bq_support.py | 145 +++++++++++++--------- google_helpers/bigquery/export_support.py | 3 +- 2 files changed, 85 insertions(+), 63 deletions(-) diff --git a/google_helpers/bigquery/bq_support.py b/google_helpers/bigquery/bq_support.py index c7ffb3cb..6f15dc71 100644 --- a/google_helpers/bigquery/bq_support.py +++ b/google_helpers/bigquery/bq_support.py @@ -17,11 +17,12 @@ """ import logging -from django.conf import settings +import re from time import sleep +from uuid import uuid4 +from django.conf import settings from google_helpers.bigquery.service import get_bigquery_service from abstract import BigQueryABC -from uuid import uuid4 logger = logging.getLogger('main_logger') @@ -42,7 +43,7 @@ class BigQuerySupport(BigQueryABC): - def __init__(self, project_id, dataset_id, table_id, executing_project=None): + def __init__(self, project_id, dataset_id, table_id, executing_project=None, table_schema=None): # Project which will execute any jobs run by this class self.executing_project = executing_project or settings.BIGQUERY_PROJECT_NAME # Destination project @@ -51,6 +52,8 @@ def __init__(self, project_id, dataset_id, table_id, executing_project=None): self.dataset_id = dataset_id # Destination table self.table_id = table_id + self.bq_service = get_bigquery_service() + self.table_schema = table_schema def _build_request_body_from_rows(self, rows): insertable_rows = [] @@ -64,11 +67,11 @@ def _build_request_body_from_rows(self, rows): } def _streaming_insert(self, rows): - bigquery_service = get_bigquery_service() - table_data = bigquery_service.tabledata() + table_data = self.bq_service.tabledata() index = 0 next = 0 + response = None while index < len(rows) and next is not None: next = MAX_INSERT+index @@ -82,7 +85,7 @@ def _streaming_insert(self, rows): response = table_data.insertAll(projectId=self.project_id, datasetId=self.dataset_id, tableId=self.table_id, - body=body).execute() + body=body).execute(num_retries=5) index = next return response @@ -90,13 +93,14 @@ def _streaming_insert(self, rows): # Get all the tables for this object's project ID def get_tables(self): bq_tables = [] - bigquery_service = get_bigquery_service() - datasets = bigquery_service.datasets().list(projectId=self.project_id).execute() + datasets = self.bq_service.datasets().list(projectId=self.project_id).execute(num_retries=5) if datasets and 'datasets' in datasets: for dataset in datasets['datasets']: - tables = bigquery_service.tables().list(projectId=self.project_id, - datasetId=dataset['datasetReference']['datasetId']).execute() + tables = self.bq_service.tables().list(projectId=self.project_id, + datasetId=dataset['datasetReference']['datasetId']).execute( + num_retries=5 + ) if 'tables' not in tables: bq_tables.append({'dataset': dataset['datasetReference']['datasetId'], 'table_id': None}) @@ -109,8 +113,7 @@ def get_tables(self): # Check if the dataset referenced by dataset_id exists in the project referenced by project_id def _dataset_exists(self): - bigquery_service = get_bigquery_service() - datasets = bigquery_service.datasets().list(projectId=self.project_id).execute() + datasets = self.bq_service.datasets().list(projectId=self.project_id).execute(num_retries=5) dataset_found = False for dataset in datasets['datasets']: @@ -129,9 +132,8 @@ def _insert_dataset(self): # Note this only confirms that fields required by table_schema are found in the proposed table with the appropriate # type, and that 
no 'required' fields in the proposed table are absent from table_schema def _confirm_table_schema(self): - bigquery_service = get_bigquery_service() - table = bigquery_service.tables().get(projectId=self.project_id, datasetId=self.dataset_id, - tableId=self.table_id).execute() + table = self.bq_service.tables().get(projectId=self.project_id, datasetId=self.dataset_id, + tableId=self.table_id).execute(num_retries=5) table_fields = table['schema']['fields'] proposed_schema = {x['name']: x['type'] for x in table_fields} @@ -152,8 +154,9 @@ def _confirm_table_schema(self): # Check if the table referenced by table_id exists in the dataset referenced by dataset_id and the # project referenced by project_id def _table_exists(self): - bigquery_service = get_bigquery_service() - tables = bigquery_service.tables().list(projectId=self.project_id, datasetId=self.dataset_id).execute() + tables = self.bq_service.tables().list(projectId=self.project_id, datasetId=self.dataset_id).execute( + num_retries=5 + ) table_found = False if 'tables' in tables: @@ -167,21 +170,19 @@ def _table_exists(self): # project referenced by project_id def _delete_table(self): if self._table_exists(): - bigquery_service = get_bigquery_service() - table_delete = bigquery_service.tables().delete( + table_delete = self.bq_service.tables().delete( projectId=self.project_id, datasetId=self.dataset_id, tableId=self.table_id - ).execute() + ).execute(num_retries=5) if 'errors' in table_delete: logger.error("[ERROR] Couldn't delete table {}:{}.{}".format( - self.project_id,self.dataset_id,self.table_id + self.project_id, self.dataset_id, self.table_id )) # Insert an table, optionally providing a list of cohort IDs to include in the description def _insert_table(self, desc): - bigquery_service = get_bigquery_service() - tables = bigquery_service.tables() + tables = self.bq_service.tables() response = tables.insert(projectId=self.project_id, datasetId=self.dataset_id, body={ 'friendlyName': self.table_id, @@ -193,7 +194,7 @@ def _insert_table(self, desc): 'projectId': self.project_id, 'tableId': self.table_id } - }).execute() + }).execute(num_retries=5) return response @@ -207,16 +208,18 @@ def _confirm_dataset_and_table(self, desc): table_result = self._insert_table(desc) if 'tableReference' not in table_result: return { - 'tableErrors': "Unable to create table {} in project {} and dataset {} - please double-check your project's permissions for the ISB-CGC service account.".format( - self.table_id, self.project_id, self.dataset_id) + 'tableErrors': "Unable to create table {} in project {} and dataset {} - please ".format( + self.table_id, self.project_id, self.dataset_id + ) + "double-check your project's permissions for the ISB-CGC service account." } return { 'status': 'TABLE_MADE' } elif not self._confirm_table_schema(): return { - 'tableErrors': "The table schema of {} does not match the required schema for cohort export. Please make a new table, or adjust this table's schema.".format( - self.table_id) + 'tableErrors': "The table schema of {} does not match the required schema for cohort export.".format( + self.table_id + ) + "Please make a new table, or adjust this table's schema." 
} else: return { @@ -227,16 +230,13 @@ def _confirm_dataset_and_table(self, desc): # If self.project_id, self.dataset_id, and self.table_id are set they # will be used as the destination table for the query # WRITE_DISPOSITION is assumed to be for an empty table unless specified - def execute_query(self, query, parameters=None, write_disposition='WRITE_EMPTY'): + def execute_query(self, query, parameters=None, write_disposition='WRITE_EMPTY', cost_est=False): query_results = None # Make yourself a job ID job_id = str(uuid4()) - # Generate a BQ service to use in submitting/checking on/fetching the job - bqs = get_bigquery_service() - # Build your job description job_desc = { 'jobReference': { @@ -262,63 +262,86 @@ def execute_query(self, query, parameters=None, write_disposition='WRITE_EMPTY') } job_desc['configuration']['query']['writeDisposition'] = write_disposition - query_job = bqs.jobs().insert( + if cost_est: + job_desc['configuration']['dryRun'] = True + + query_job = self.bq_service.jobs().insert( projectId=self.executing_project, body=job_desc).execute(num_retries=5) - job_is_done = bqs.jobs().get(projectId=self.executing_project, - jobId=query_job['jobReference']['jobId']).execute() + # Cost Estimates don't actually run as fully-fledged jobs, and won't be inserted as such, + # so we just get back the estimate immediately + if cost_est: + if query_job['status']['state'] == 'DONE': + return { + 'total_bytes_billed': query_job['statistics']['query']['totalBytesBilled'], + 'total_bytes_processed': query_job['statistics']['query']['totalBytesProcessed'] + } + + job_is_done = self.bq_service.jobs().get(projectId=self.executing_project, + jobId=job_id).execute(num_retries=5) retries = 0 while (job_is_done and not job_is_done['status']['state'] == 'DONE') and retries < BQ_ATTEMPT_MAX: retries += 1 sleep(1) - job_is_done = bqs.jobs().get(projectId=self.executing_project, - jobId=query_job['jobReference']['jobId']).execute() + job_is_done = self.bq_service.jobs().get(projectId=self.executing_project, + jobId=job_id).execute(num_retries=5) # Parse the final disposition if job_is_done and job_is_done['status']['state'] == 'DONE': if 'status' in job_is_done and 'errors' in job_is_done['status']: - logger.error("[ERROR] During query job {}: {}".format(job_id,str(job_is_done['status']['errors']))) + logger.error("[ERROR] During query job {}: {}".format(job_id, str(job_is_done['status']['errors']))) else: logger.info("[STATUS] Query {} done, fetching results...".format(job_id)) - query_results = BigQuerySupport.get_job_results(bqs, query_job['jobReference']) - logger.info("[STATUS] {} results found for query {}.".format(str(len(query_results)),job_id)) + query_results = self.fetch_job_results(query_job['jobReference']) + logger.info("[STATUS] {} results found for query {}.".format(str(len(query_results)), job_id)) else: logger.error("[ERROR] Query {} took longer than the allowed time to execute--" + "if you check job ID {} manually you can wait for it to finish.".format(job_id)) return query_results + # Fetch the results of a job based on the reference provided + def fetch_job_results(self, job_ref): + result = [] + page_token = None + + while True: + page = self.bq_service.jobs().getQueryResults( + pageToken=page_token, + **job_ref).execute(num_retries=2) + + if int(page['totalRows']) == 0: + break + + rows = page['rows'] + result.extend(rows) + + page_token = page.get('pageToken') + if not page_token: + break + + return result + # Execute a query to be saved on a temp table (shorthand to 
instance method above), optionally parameterized @classmethod def execute_query_and_fetch_results(cls, query, parameters=None): bqs = cls(None, None, None) return bqs.execute_query(query, parameters) - # Given a BQ service and a job reference, fetch out the results - @staticmethod - def get_job_results(bq_service, job_reference): - result = [] - page_token = None - - while True: - page = bq_service.jobs().getQueryResults( - pageToken=page_token, - **job_reference).execute(num_retries=2) - - if int(page['totalRows']) == 0: - break - - rows = page['rows'] - result.extend(rows) - - page_token = page.get('pageToken') - if not page_token: - break + # Do a 'dry run' query, which estimates the cost + @classmethod + def estimate_query_cost(cls, query, parameters=None): + bqs = cls(None, None, None) + return bqs.execute_query(query, parameters, cost_est=True) - return result + # Given a BQ service and a job reference, fetch out the results + @classmethod + def get_job_results(cls, job_reference): + bqs = cls(None, None, None) + return bqs.fetch_job_results(job_reference) # Builds a BQ API v2 QueryParameter set and WHERE clause string from a set of filters of the form: # { diff --git a/google_helpers/bigquery/export_support.py b/google_helpers/bigquery/export_support.py index 76e00918..dc9754d5 100644 --- a/google_helpers/bigquery/export_support.py +++ b/google_helpers/bigquery/export_support.py @@ -120,8 +120,7 @@ class BigQueryExport(BigQueryExportABC, BigQuerySupport): def __init__(self, project_id, dataset_id, table_id, bucket_path, file_name, table_schema): - super(BigQueryExport, self).__init__(project_id, dataset_id, table_id) - self.table_schema = table_schema + super(BigQueryExport, self).__init__(project_id, dataset_id, table_id, table_schema=table_schema) self.bucket_path = bucket_path self.file_name = file_name From 74cd9463d53f649933132757ff74009c90d6c737 Mon Sep 17 00:00:00 2001 From: s-paquette Date: Mon, 21 May 2018 14:29:00 -0700 Subject: [PATCH 14/39] -> jobId, not job_id --- google_helpers/bigquery/bq_support.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/google_helpers/bigquery/bq_support.py b/google_helpers/bigquery/bq_support.py index 6f15dc71..097a13a6 100644 --- a/google_helpers/bigquery/bq_support.py +++ b/google_helpers/bigquery/bq_support.py @@ -241,7 +241,7 @@ def execute_query(self, query, parameters=None, write_disposition='WRITE_EMPTY', job_desc = { 'jobReference': { 'projectId': self.executing_project, # This is the project which will *execute* the query - 'job_id': job_id + 'jobId': job_id }, 'configuration': { 'query': { @@ -251,6 +251,7 @@ def execute_query(self, query, parameters=None, write_disposition='WRITE_EMPTY', } } + if parameters: job_desc['configuration']['query']['queryParameters'] = parameters From bc40c48c8c26ebbf4a2ba4649b932a761bbc4171 Mon Sep 17 00:00:00 2001 From: s-paquette Date: Wed, 23 May 2018 15:24:47 -0700 Subject: [PATCH 15/39] -> Fix job_id / jobId error --- cohorts/metadata_helpers.py | 2 +- google_helpers/bigquery/bq_support.py | 1 - google_helpers/bigquery/export_support.py | 14 +++++++------- google_helpers/load_data_from_csv.py | 2 +- 4 files changed, 9 insertions(+), 10 deletions(-) diff --git a/cohorts/metadata_helpers.py b/cohorts/metadata_helpers.py index 01700245..9b8a6195 100644 --- a/cohorts/metadata_helpers.py +++ b/cohorts/metadata_helpers.py @@ -1055,7 +1055,7 @@ def submit_bigquery_job(bq_service, project_id, query_body, batch=False): job_data = { 'jobReference': { 'projectId': project_id, - 'job_id': 
str(uuid4()) + 'jobId': str(uuid4()) }, 'configuration': { 'query': { diff --git a/google_helpers/bigquery/bq_support.py b/google_helpers/bigquery/bq_support.py index 097a13a6..39b8798e 100644 --- a/google_helpers/bigquery/bq_support.py +++ b/google_helpers/bigquery/bq_support.py @@ -251,7 +251,6 @@ def execute_query(self, query, parameters=None, write_disposition='WRITE_EMPTY', } } - if parameters: job_desc['configuration']['query']['queryParameters'] = parameters diff --git a/google_helpers/bigquery/export_support.py b/google_helpers/bigquery/export_support.py index dc9754d5..e714ad77 100644 --- a/google_helpers/bigquery/export_support.py +++ b/google_helpers/bigquery/export_support.py @@ -189,14 +189,14 @@ def _table_to_gcs(self, file_format, dataset_and_table, export_type): body=export_config).execute(num_retries=5) job_is_done = bq_service.jobs().get(projectId=settings.BIGQUERY_PROJECT_NAME, - jobId=export_job['jobReference']['jobId']).execute() + jobId=job_id).execute() retries = 0 while (job_is_done and not job_is_done['status']['state'] == 'DONE') and retries < BQ_ATTEMPT_MAX: retries += 1 sleep(1) - job_is_done = bq_service.jobs().get(projectId=settings.BIGQUERY_PROJECT_NAME, jobId=export_job['jobReference']['jobId']).execute() + job_is_done = bq_service.jobs().get(projectId=settings.BIGQUERY_PROJECT_NAME, jobId=job_id).execute() result = { 'status': None, @@ -219,7 +219,7 @@ def _table_to_gcs(self, file_format, dataset_and_table, export_type): if not exported_file: msg = "Export file {}/{} not found".format(self.bucket_path, self.file_name) logger.error("[ERROR] ".format({msg})) - export_result = bq_service.jobs().get(projectId=settings.BIGQUERY_PROJECT_NAME, jobId=export_job['jobReference']['jobId']).execute() + export_result = bq_service.jobs().get(projectId=settings.BIGQUERY_PROJECT_NAME, jobId=job_id).execute() if 'errors' in export_result: logger.error('[ERROR] Errors seen: {}'.format(export_result['errors'][0]['message'])) result['status'] = 'error' @@ -257,7 +257,7 @@ def _query_to_table(self, query, parameters, export_type, write_disp, to_temp=Fa query_data = { 'jobReference': { 'projectId': settings.BIGQUERY_PROJECT_NAME, - 'job_id': job_id + 'jobId': job_id }, 'configuration': { 'query': { @@ -282,7 +282,7 @@ def _query_to_table(self, query, parameters, export_type, write_disp, to_temp=Fa projectId=settings.BIGQUERY_PROJECT_NAME, body=query_data).execute(num_retries=5) - job_is_done = bq_service.jobs().get(projectId=settings.BIGQUERY_PROJECT_NAME, jobId=query_job['jobReference']['jobId']).execute() + job_is_done = bq_service.jobs().get(projectId=settings.BIGQUERY_PROJECT_NAME, jobId=job_id).execute() retries = 0 @@ -290,7 +290,7 @@ def _query_to_table(self, query, parameters, export_type, write_disp, to_temp=Fa retries += 1 sleep(1) job_is_done = bq_service.jobs().get(projectId=settings.BIGQUERY_PROJECT_NAME, - jobId=query_job['jobReference']['jobId']).execute() + jobId=job_id).execute() result = { 'status': None, @@ -321,7 +321,7 @@ def _query_to_table(self, query, parameters, export_type, write_disp, to_temp=Fa msg = "Export table {}:{}.{} not found".format(self.project_id,self.dataset_id,self.table_id) logger.error("[ERROR] ".format({msg})) bq_result = bq_service.jobs().getQueryResults(projectId=settings.BIGQUERY_PROJECT_NAME, - jobId=query_job['jobReference']['jobId']).execute() + jobId=job_id).execute() if 'errors' in bq_result: logger.error('[ERROR] Errors seen: {}'.format(bq_result['errors'][0]['message'])) result['status'] = 'error' diff --git 
a/google_helpers/load_data_from_csv.py b/google_helpers/load_data_from_csv.py index f16a0267..3e1bf953 100644 --- a/google_helpers/load_data_from_csv.py +++ b/google_helpers/load_data_from_csv.py @@ -53,7 +53,7 @@ def load_table(bigquery, project_id, dataset_id, table_name, source_schema, job_data = { 'jobReference': { 'projectId': project_id, - 'job_id': str(uuid.uuid4()) + 'jobId': str(uuid.uuid4()) }, 'configuration': { 'load': { From fedadc154301924e4f0c1d533ac7b45ff3d24e2d Mon Sep 17 00:00:00 2001 From: "S. Paquette" Date: Mon, 28 May 2018 23:06:03 -0700 Subject: [PATCH 16/39] -> #2315: querying across builds for multiple mutation filters with OR vs. AND --- cohorts/metadata_counting.py | 162 ++++++++++++++++++-------- cohorts/metadata_helpers.py | 17 +-- cohorts/views.py | 3 +- google_helpers/bigquery/bq_support.py | 93 +++++++++++++-- 4 files changed, 213 insertions(+), 62 deletions(-) diff --git a/cohorts/metadata_counting.py b/cohorts/metadata_counting.py index 27671126..f7f881e8 100644 --- a/cohorts/metadata_counting.py +++ b/cohorts/metadata_counting.py @@ -226,7 +226,8 @@ def count_public_data_type(user, data_query, inc_filters, program_list, filter_f # Tally counts for metadata filters of public programs -def count_public_metadata(user, cohort_id=None, inc_filters=None, program_id=None, build='HG19'): +def count_public_metadata(user, cohort_id=None, inc_filters=None, program_id=None, build='HG19', comb_mut_filters='OR'): + comb_mut_filters = comb_mut_filters.upper() counts_and_total = { 'counts': [], @@ -270,16 +271,12 @@ def count_public_metadata(user, cohort_id=None, inc_filters=None, program_id=Non if 'MUT:' in key: if not mutation_filters: mutation_filters = {} - mutation_filters[key] = inc_filters[key] - build = key.split(':')[1] + mutation_filters[key] = inc_filters[key]['values'] elif 'data_type' in key: data_type_filters[key.split(':')[-1]] = inc_filters[key] else: filters[key.split(':')[-1]] = inc_filters[key] - if mutation_filters: - mutation_where_clause = build_where_clause(mutation_filters) - data_avail_sample_subquery = None if len(data_type_filters) > 0: @@ -332,7 +329,35 @@ def count_public_metadata(user, cohort_id=None, inc_filters=None, program_id=Non # If there is a mutation filter, make a temporary table from the sample barcodes that this query # returns - if mutation_where_clause: + if mutation_filters: + build_queries = {} + + for mut_filt in mutation_filters: + build = mut_filt.split(':')[1] + + if build not in build_queries: + build_queries[build] = { + 'raw_filters': {}, + 'filter_str_params': [], + 'queries': [] + } + build_queries[build]['raw_filters'][mut_filt] = mutation_filters[mut_filt] + + for build in build_queries: + if comb_mut_filters == 'AND': + filter_num = 0 + for filter in build_queries[build]['raw_filters']: + this_filter = {} + this_filter[filter] = build_queries[build]['raw_filters'][filter] + build_queries[build]['filter_str_params'].append(BigQuerySupport.build_bq_filter_and_params( + this_filter, comb_mut_filters, build+'_{}'.format(str(filter_num)) + )) + filter_num += 1 + else: + build_queries[build]['filter_str_params'].append(BigQuerySupport.build_bq_filter_and_params( + build_queries[build]['raw_filters'], comb_mut_filters, build + )) + cohort_join_str = '' cohort_where_str = '' bq_cohort_table = '' @@ -340,48 +365,93 @@ def count_public_metadata(user, cohort_id=None, inc_filters=None, program_id=Non bq_cohort_project_id = '' cohort = '' - bq_table_info = BQ_MOLECULAR_ATTR_TABLES[Program.objects.get(id=program_id).name][build] 
- sample_barcode_col = bq_table_info['sample_barcode_col'] - bq_dataset = bq_table_info['dataset'] - bq_table = bq_table_info['table'] - bq_data_project_id = settings.BIGQUERY_DATA_PROJECT_NAME - - query_template = None - - if cohort_id is not None: - query_template = \ - ("SELECT ct.sample_barcode" - " FROM [{cohort_project_id}:{cohort_dataset}.{cohort_table}] ct" - " JOIN (SELECT sample_barcode_tumor AS barcode " - " FROM [{data_project_id}:{dataset_name}.{table_name}]" - " WHERE " + mutation_where_clause['big_query_str'] + - " GROUP BY barcode) mt" - " ON mt.barcode = ct.sample_barcode" - " WHERE ct.cohort_id = {cohort};") - - bq_cohort_table = settings.BIGQUERY_COHORT_TABLE_ID - bq_cohort_dataset = settings.COHORT_DATASET_ID - bq_cohort_project_id = settings.BIGQUERY_PROJECT_NAME - - cohort = cohort_id - else: - query_template = \ - ("SELECT {barcode_col}" - " FROM [{data_project_id}:{dataset_name}.{table_name}]" - " WHERE " + mutation_where_clause['big_query_str'] + - " GROUP BY {barcode_col}; ") + for build in build_queries: + bq_table_info = BQ_MOLECULAR_ATTR_TABLES[Program.objects.get(id=program_id).name][build] + sample_barcode_col = bq_table_info['sample_barcode_col'] + bq_dataset = bq_table_info['dataset'] + bq_table = bq_table_info['table'] + bq_data_project_id = settings.BIGQUERY_DATA_PROJECT_NAME + + cohort_param = None + + query_template = None + + if cohort_id is not None: + query_template = \ + ("SELECT ct.sample_barcode" + " FROM `{cohort_project_id}.{cohort_dataset}.{cohort_table}` ct" + " JOIN (SELECT sample_barcode_tumor AS barcode " + " FROM `{data_project_id}.{dataset_name}.{table_name}`" + " WHERE {where_clause}" + " GROUP BY barcode) mt" + " ON mt.barcode = ct.sample_barcode" + " WHERE ct.cohort_id = @cohort") + + cohort_param = { + 'name': 'cohort', + 'parameterType': { + 'type': 'STRING' + }, + 'parameterValue': { + 'value': cohort_id + } + } - params = mutation_where_clause['value_tuple'][0] + bq_cohort_table = settings.BIGQUERY_COHORT_TABLE_ID + bq_cohort_dataset = settings.COHORT_DATASET_ID + bq_cohort_project_id = settings.BIGQUERY_PROJECT_NAME + else: + query_template = \ + ("SELECT {barcode_col}" + " FROM `{data_project_id}.{dataset_name}.{table_name}`" + " WHERE {where_clause}" + " GROUP BY {barcode_col} ") + + for filter_str_param in build_queries[build]['filter_str_params']: + build_queries[build]['queries'].append( + query_template.format(dataset_name=bq_dataset, cohort_project_id=bq_cohort_project_id, + data_project_id=bq_data_project_id, table_name=bq_table, + barcode_col=sample_barcode_col, cohort_dataset=bq_cohort_dataset, + cohort_table=bq_cohort_table, where_clause=filter_str_param['filter_string'])) + + cohort_param and filter_str_param['parameters'].append(cohort_param) + + + + query = None + queries = [q for build in build_queries for q in build_queries[build]['queries']] + params = [z for build in build_queries for y in build_queries[build]['filter_str_params'] for z in y['parameters']] - query = query_template.format(dataset_name=bq_dataset, cohort_project_id=bq_cohort_project_id, - data_project_id=bq_data_project_id, table_name=bq_table, barcode_col=sample_barcode_col, - hugo_symbol=str(params['gene']), var_class=params['var_class'], - cohort_dataset=bq_cohort_dataset, cohort_table=bq_cohort_table, cohort=cohort) + if len(queries) > 1: + logger.debug("comb_mut_filters: {}".format(str(comb_mut_filters))) + if comb_mut_filters == 'OR': + query = """ UNION DISTINCT """.join(queries) + else: + query_template = """ + SELECT 
q0.sample_barcode_tumor + FROM ({query1}) q0 + {join_clauses} + """ + + join_template = """ + JOIN ({query}) q{ct} + ON q{ct}.sample_barcode_tumor = q0.sample_barcode_tumor + """ + + joins = [] + + for i,val in enumerate(queries[1:]): + joins.append(join_template.format(query=val, ct=str(i+1))) + + query = query_template.format(query1=queries[0], join_clauses=" ".join(joins)) + else: + query = queries[0] + logger.debug("mutation query: {}".format(query)) barcodes = [] start = time.time() - results = BigQuerySupport.execute_query_and_fetch_results(query) + results = BigQuerySupport.execute_query_and_fetch_results(query, params) stop = time.time() logger.debug('[BENCHMARKING] Time to query BQ for mutation data: '+(stop - start).__str__()) @@ -785,14 +855,12 @@ def count_public_metadata(user, cohort_id=None, inc_filters=None, program_id=Non if db and db.open: db.close() -def public_metadata_counts(req_filters, cohort_id, user, program_id, limit=None): +def public_metadata_counts(req_filters, cohort_id, user, program_id, limit=None, comb_mut_filters='OR'): filters = {} if req_filters is not None: try: for key in req_filters: - # mutation filters do not insert column names into queries, so they don't need to be - # validated if not validate_filter_key(key, program_id): raise Exception('Invalid filter key received: ' + key) this_filter = req_filters[key] @@ -806,7 +874,7 @@ def public_metadata_counts(req_filters, cohort_id, user, program_id, limit=None) raise Exception('Filters must be a valid JSON formatted object of filter sets, with value lists keyed on filter names.') start = time.time() - counts_and_total = count_public_metadata(user, cohort_id, filters, program_id) + counts_and_total = count_public_metadata(user, cohort_id, filters, program_id, comb_mut_filters=comb_mut_filters) # parsets_items = build_data_avail_plot_data(user, cohort_id, filters, program_id) stop = time.time() diff --git a/cohorts/metadata_helpers.py b/cohorts/metadata_helpers.py index 9b8a6195..1efcb3e6 100644 --- a/cohorts/metadata_helpers.py +++ b/cohorts/metadata_helpers.py @@ -609,18 +609,20 @@ def format_for_display(item): # Construct WHERE clauses for BigQuery and CloudSQL based on a set of filters # If the names of the columns differ across the 2 platforms, the alt_key_map can be # used to map a filter 'key' to a different column name -def build_where_clause(filters, alt_key_map=False, program=None, for_files=False): +def build_where_clause(filters, alt_key_map=False, program=None, for_files=False, comb_with='OR'): first = True query_str = '' big_query_str = '' # todo: make this work for non-string values -- use {}.format value_tuple = () key_order = [] keyType = None - gene = None grouped_filters = None for key, value in filters.items(): + gene = None + invert = False + if isinstance(value, dict) and 'values' in value: value = value['values'] @@ -639,6 +641,7 @@ def build_where_clause(filters, alt_key_map=False, program=None, for_files=False keyType = key.split(':')[0] if keyType == 'MUT': gene = key.split(':')[2] + invert = bool(key.split(':')[3] == 'NOT') key = key.split(':')[-1] # Multitable filter lists don't come in as string as they can contain arbitrary text in values @@ -658,20 +661,20 @@ def build_where_clause(filters, alt_key_map=False, program=None, for_files=False if first: first = False else: - big_query_str += ' AND' + big_query_str += ' {}'.format(comb_with) - big_query_str += " %s = '{hugo_symbol}' AND " % 'Hugo_Symbol' + big_query_str += " (%s = '{hugo_symbol}' AND " % 'Hugo_Symbol' 
params['gene'] = gene if(key == 'category'): if value == 'any': - big_query_str += '%s IS NOT NULL' % 'Variant_Classification' + big_query_str += '%s IS NOT NULL)' % 'Variant_Classification' params['var_class'] = '' else: - big_query_str += '%s IN ({var_class})' % 'Variant_Classification' + big_query_str += '%s {}IN ({var_class}))'.format('Variant_Classification', "NOT " if invert else "") values = MOLECULAR_CATEGORIES[value]['attrs'] else: - big_query_str += '%s IN ({var_class})' % 'Variant_Classification' + big_query_str += '%s {}IN ({var_class}))'.format('Variant_Classification', "NOT " if invert else "") values = value if value != 'any': diff --git a/cohorts/views.py b/cohorts/views.py index 63df767c..342f19d1 100755 --- a/cohorts/views.py +++ b/cohorts/views.py @@ -1826,6 +1826,7 @@ def unshare_cohort(request, cohort_id=0): @login_required def get_metadata(request): filters = json.loads(request.GET.get('filters', '{}')) + comb_mut_filters = request.GET.get('mut_filter_combine', 'OR') cohort = request.GET.get('cohort_id', None) limit = request.GET.get('limit', None) program_id = request.GET.get('program_id', None) @@ -1835,7 +1836,7 @@ def get_metadata(request): user = Django_User.objects.get(id=request.user.id) if program_id is not None and program_id > 0: - results = public_metadata_counts(filters[str(program_id)], cohort, user, program_id, limit) + results = public_metadata_counts(filters[str(program_id)], cohort, user, program_id, limit, comb_mut_filters=comb_mut_filters) # If there is an extent cohort, to get the cohort's new totals per applied filters # we have to check the unfiltered programs for their numbers and tally them in diff --git a/google_helpers/bigquery/bq_support.py b/google_helpers/bigquery/bq_support.py index 39b8798e..659bc6dd 100644 --- a/google_helpers/bigquery/bq_support.py +++ b/google_helpers/bigquery/bq_support.py @@ -40,6 +40,22 @@ 'staging': 'staging_cohorts' } +MOLECULAR_CATEGORIES = { + 'nonsilent': { + 'name': 'Non-silent', + 'attrs': [ + 'Missense_Mutation', + 'Nonsense_Mutation', + 'Nonstop_Mutation', + 'Frame_Shift_Del', + 'Frame_Shift_Ins', + 'In_Frame_Del', + 'In_Frame_Ins', + 'Translation_Start_Site', + ] + } +} + class BigQuerySupport(BigQueryABC): @@ -253,6 +269,7 @@ def execute_query(self, query, parameters=None, write_disposition='WRITE_EMPTY', if parameters: job_desc['configuration']['query']['queryParameters'] = parameters + job_desc['configuration']['query']['useLegacySql'] = False if self.project_id and self.dataset_id and self.table_id: job_desc['configuration']['query']['destinationTable'] = { @@ -298,7 +315,7 @@ def execute_query(self, query, parameters=None, write_disposition='WRITE_EMPTY', query_results = self.fetch_job_results(query_job['jobReference']) logger.info("[STATUS] {} results found for query {}.".format(str(len(query_results)), job_id)) else: - logger.error("[ERROR] Query {} took longer than the allowed time to execute--" + + logger.error("[ERROR] Query took longer than the allowed time to execute--" + "if you check job ID {} manually you can wait for it to finish.".format(job_id)) return query_results @@ -353,7 +370,9 @@ def get_job_results(cls, job_reference): # TODO: add support for BETWEEN # TODO: add support for <>= @staticmethod - def build_bq_filter_and_params(filters): + def build_bq_filter_and_params(filters, comb_with='AND', param_suffix=None): + + logger.debug("filters received: {}".format(str(filters))) result = { 'filter_string': '', 'parameters': [] @@ -361,10 +380,68 @@ def 
build_bq_filter_and_params(filters): filter_set = [] - for attr, values in filters.items(): + mutation_filters = {} + other_filters = {} + + # Split mutation filters into their own set, because of repeat use of the same attrs + for attr in filters: + if 'MUT:' in attr: + mutation_filters[attr] = filters[attr] + else: + other_filters[attr] = filters[attr] + + mut_filtr_count = 1 + for attr, values in mutation_filters.items(): + gene = attr.split(':')[2] + type = attr.split(':')[-1] + invert = bool(attr.split(':')[3] == 'NOT') + param_name = 'gene{}{}'.format(str(mut_filtr_count), '_{}'.format(param_suffix) if param_suffix else '') + filter_string = 'Hugo_Symbol = @{} AND '.format(param_name) + + gene_query_param = { + 'name': param_name, + 'parameterType': { + 'type': 'STRING' + }, + 'parameterValue': { + 'value': gene + } + } + + var_query_param = { + 'name': None, + 'parameterType': { + 'type': None + }, + 'parameterValue': { + + } + } + + if type == 'category' and values[0] == 'any': + filter_string += 'Variant_Classification IS NOT NULL' + var_query_param = None + else: + if type == 'category': + values = MOLECULAR_CATEGORIES[values[0]]['attrs'] + var_param_name = "var_class{}{}".format(str(mut_filtr_count), '_{}'.format(param_suffix) if param_suffix else '') + filter_string += 'Variant_Classification {}IN UNNEST(@{})'.format('NOT ' if invert else '', var_param_name) + var_query_param['name'] = var_param_name + var_query_param['parameterType']['type'] = 'ARRAY' + var_query_param['parameterValue'] = {'arrayValues': [{'value': x} for x in values]} + var_query_param['parameterType']['arrayType'] = {'type': 'STRING'} + + filter_set.append('({})'.format(filter_string)) + result['parameters'].append(gene_query_param) + var_query_param and result['parameters'].append(var_query_param) + + mut_filtr_count += 1 + + for attr, values in other_filters.items(): filter_string = '' + param_name = attr + '{}'.format('_{}'.format(param_suffix) if param_suffix else '') query_param = { - 'name': attr, + 'name': param_name, 'parameterType': { }, @@ -383,17 +460,19 @@ def build_bq_filter_and_params(filters): # Scalar param query_param['parameterType']['type'] = ('STRING' if re.compile(ur'[^0-9\.,]', re.UNICODE).search(values[0]) else 'INT64') query_param['parameterValue']['value'] = values[0] - filter_string += "{} = @{}".format(attr, attr) + filter_string += "{} = @{}".format(attr, param_name) else: # Array param query_param['parameterType']['type'] = "ARRAY" query_param['parameterValue'] = {'arrayValues': [{'value': x} for x in values]} query_param['parameterType']['arrayType'] = {'type': ('STRING' if re.compile(ur'[^0-9\.,]', re.UNICODE).search(values[0]) else 'INT64')} - filter_string += "{} IN UNNEST(@{})".format(attr,attr) + filter_string += "{} IN UNNEST(@{})".format(attr, param_name) filter_set.append('({})'.format(filter_string)) result['parameters'].append(query_param) - result['filter_string'] = " AND ".join(filter_set) + result['filter_string'] = " {} ".format(comb_with).join(filter_set) + + logger.debug("bq filter builder result: {}".format(str(result))) return result From f30041060bb4b851d8923bce736c46bae40ac4b4 Mon Sep 17 00:00:00 2001 From: s-paquette Date: Tue, 29 May 2018 13:17:40 -0700 Subject: [PATCH 17/39] -> #2315: fix extent-cohort interactions --- cohorts/metadata_counting.py | 117 ++++++++++++++++++----------------- 1 file changed, 61 insertions(+), 56 deletions(-) diff --git a/cohorts/metadata_counting.py b/cohorts/metadata_counting.py index f7f881e8..5527aaf3 100644 --- 
a/cohorts/metadata_counting.py +++ b/cohorts/metadata_counting.py @@ -332,6 +332,7 @@ def count_public_metadata(user, cohort_id=None, inc_filters=None, program_id=Non if mutation_filters: build_queries = {} + # Split the filters into 'not any' and 'all other filters' for mut_filt in mutation_filters: build = mut_filt.split(':')[1] @@ -339,10 +340,20 @@ def count_public_metadata(user, cohort_id=None, inc_filters=None, program_id=Non build_queries[build] = { 'raw_filters': {}, 'filter_str_params': [], - 'queries': [] + 'queries': [], + 'not_any': None } - build_queries[build]['raw_filters'][mut_filt] = mutation_filters[mut_filt] + if 'NOT:' in mut_filt and 'category' in mut_filt and 'any' in mutation_filters[mut_filt]: + if not build_queries[build]['not_any']: + build_queries[build]['not_any'] = {} + build_queries[build]['not_any'][mut_filt] = mutation_filters[mut_filt] + else: + build_queries[build]['raw_filters'][mut_filt] = mutation_filters[mut_filt] + + # If the combination is with AND, further split the 'not-not-any' filters, because they must be + # queried separately and JOIN'd. OR is done with UNION DISTINCT and all of one build can go into + # a single query. for build in build_queries: if comb_mut_filters == 'AND': filter_num = 0 @@ -353,18 +364,12 @@ def count_public_metadata(user, cohort_id=None, inc_filters=None, program_id=Non this_filter, comb_mut_filters, build+'_{}'.format(str(filter_num)) )) filter_num += 1 - else: + elif comb_mut_filters == 'OR': build_queries[build]['filter_str_params'].append(BigQuerySupport.build_bq_filter_and_params( build_queries[build]['raw_filters'], comb_mut_filters, build )) - cohort_join_str = '' - cohort_where_str = '' - bq_cohort_table = '' - bq_cohort_dataset = '' - bq_cohort_project_id = '' - cohort = '' - + # Create the queries and their parameters for build in build_queries: bq_table_info = BQ_MOLECULAR_ATTR_TABLES[Program.objects.get(id=program_id).name][build] sample_barcode_col = bq_table_info['sample_barcode_col'] @@ -372,58 +377,55 @@ def count_public_metadata(user, cohort_id=None, inc_filters=None, program_id=Non bq_table = bq_table_info['table'] bq_data_project_id = settings.BIGQUERY_DATA_PROJECT_NAME - cohort_param = None - - query_template = None - - if cohort_id is not None: - query_template = \ - ("SELECT ct.sample_barcode" - " FROM `{cohort_project_id}.{cohort_dataset}.{cohort_table}` ct" - " JOIN (SELECT sample_barcode_tumor AS barcode " - " FROM `{data_project_id}.{dataset_name}.{table_name}`" - " WHERE {where_clause}" - " GROUP BY barcode) mt" - " ON mt.barcode = ct.sample_barcode" - " WHERE ct.cohort_id = @cohort") - - cohort_param = { - 'name': 'cohort', - 'parameterType': { - 'type': 'STRING' - }, - 'parameterValue': { - 'value': cohort_id - } - } - - bq_cohort_table = settings.BIGQUERY_COHORT_TABLE_ID - bq_cohort_dataset = settings.COHORT_DATASET_ID - bq_cohort_project_id = settings.BIGQUERY_PROJECT_NAME - else: - query_template = \ - ("SELECT {barcode_col}" - " FROM `{data_project_id}.{dataset_name}.{table_name}`" - " WHERE {where_clause}" - " GROUP BY {barcode_col} ") + # Build the query for any filter which *isn't* a not-any query.
+ query_template = \ + ("SELECT {barcode_col}" + " FROM `{data_project_id}.{dataset_name}.{table_name}`" + " WHERE {where_clause}" + " GROUP BY {barcode_col} ") for filter_str_param in build_queries[build]['filter_str_params']: build_queries[build]['queries'].append( - query_template.format(dataset_name=bq_dataset, cohort_project_id=bq_cohort_project_id, - data_project_id=bq_data_project_id, table_name=bq_table, - barcode_col=sample_barcode_col, cohort_dataset=bq_cohort_dataset, - cohort_table=bq_cohort_table, where_clause=filter_str_param['filter_string'])) - - cohort_param and filter_str_param['parameters'].append(cohort_param) - + query_template.format(dataset_name=bq_dataset, data_project_id=bq_data_project_id, + table_name=bq_table, barcode_col=sample_barcode_col, + where_clause=filter_str_param['filter_string'])) + # Here we build not-any queries + if build_queries[build]['not_any']: + query_template = \ + ("SELECT {barcode_col}" + " FROM `{data_project_id}.{dataset_name}.{table_name}`" + " WHERE {barcode_col} NOT IN (" + "SELECT {barcode_col}" + " FROM `{data_project_id}.{dataset_name}.{table_name}`" + " WHERE {where_clause}" + " GROUP BY {barcode_col}) " + " GROUP BY {barcode_col}") + + any_count = 0 + for not_any in build_queries[build]['not_any']: + filter = not_any.replace("NOT:","") + any_filter = {} + any_filter[filter] = build_queries[build]['not_any'][not_any] + filter_str_param = BigQuerySupport.build_bq_filter_and_params( + any_filter,param_suffix=build+'_any_{}'.format(any_count) + ) + + build_queries[build]['filter_str_params'].append(filter_str_param) + + any_count += 1 + + build_queries[build]['queries'].append(query_template.format( + dataset_name=bq_dataset, data_project_id=bq_data_project_id, table_name=bq_table, + barcode_col=sample_barcode_col, where_clause=filter_str_param['filter_string'])) query = None + # Collect the queries for chaining below with UNION or JOIN queries = [q for build in build_queries for q in build_queries[build]['queries']] + # Because our parameters are uniquely named, they can be combined into a single list params = [z for build in build_queries for y in build_queries[build]['filter_str_params'] for z in y['parameters']] if len(queries) > 1: - logger.debug("comb_mut_filters: {}".format(str(comb_mut_filters))) if comb_mut_filters == 'OR': query = """ UNION DISTINCT """.join(queries) else: @@ -447,7 +449,6 @@ def count_public_metadata(user, cohort_id=None, inc_filters=None, program_id=Non else: query = queries[0] - logger.debug("mutation query: {}".format(query)) barcodes = [] start = time.time() @@ -469,7 +470,7 @@ def count_public_metadata(user, cohort_id=None, inc_filters=None, program_id=Non make_tmp_mut_table_str = """ CREATE TEMPORARY TABLE %s ( - tumor_sample_id VARCHAR(100) + tumor_sample_id VARCHAR(100) ); """ % tmp_mut_table @@ -513,10 +514,10 @@ def count_public_metadata(user, cohort_id=None, inc_filters=None, program_id=Non make_tmp_table_str += (' JOIN (%s) da ON da.da_sample_barcode = ms.sample_barcode ' % data_avail_sample_subquery) params_tuple += data_type_where_clause['value_tuple'] - # Cohorts are built into the mutation table, so we don't need to filter on the cohort if we made a mutation table if tmp_mut_table: make_tmp_table_str += (' JOIN %s sc ON sc.tumor_sample_id = ms.sample_barcode' % tmp_mut_table) - elif cohort_id: + + if cohort_id: make_tmp_table_str += (' JOIN (%s) cs ON cs.cs_sample_barcode = ms.sample_barcode' % cohort_query) params_tuple += (cohort_id,) @@ -541,6 +542,10 @@ def count_public_metadata(user, 
cohort_id=None, inc_filters=None, program_id=Non make_tmp_table_str += (' JOIN (%s) da ON da.da_sample_barcode = ms.sample_barcode' % data_avail_sample_subquery) params_tuple += data_type_where_clause['value_tuple'] + if cohort_id: + make_tmp_table_str += (' JOIN (%s) cs ON cs.cs_sample_barcode = ms.sample_barcode' % cohort_query) + params_tuple += (cohort_id,) + make_tmp_table_str += ';' if len(params_tuple) > 0: From 07c8e76d05a3bc6dc44d7f1e71aace9b6cf929fa Mon Sep 17 00:00:00 2001 From: s-paquette Date: Tue, 29 May 2018 14:00:41 -0700 Subject: [PATCH 18/39] -> #2317, catch for non-existent public program, allow for lack of stored display formats --- cohorts/models.py | 27 ++++++++++++++++++--------- 1 file changed, 18 insertions(+), 9 deletions(-) diff --git a/cohorts/models.py b/cohorts/models.py index 359ca824..2aaa6e9c 100755 --- a/cohorts/models.py +++ b/cohorts/models.py @@ -24,6 +24,7 @@ from django.db.models import Q from django.utils.html import escape from projects.models import Project, Program, User_Feature_Definitions + from sharing.models import Shared_Resource from metadata_helpers import fetch_metadata_value_set, fetch_program_data_types, MOLECULAR_DISPLAY_STRINGS @@ -301,14 +302,22 @@ def format_filters_for_display(cls, filters): prog_data_types = None for cohort_filter in filters: - prog_id = Program.objects.get(name=cohort_filter['program'], is_public=True, active=True).id - if prog_id not in prog_vals: - prog_vals[prog_id] = fetch_metadata_value_set(prog_id) - if prog_id not in prog_dts: - prog_dts[prog_id] = fetch_program_data_types(prog_id, True) - - prog_values = prog_vals[prog_id] - prog_data_types = prog_dts[prog_id] + prog = None + prog_id = None + is_private = False + try: + prog_id = Program.objects.get(name=cohort_filter['program'], is_public=True, active=True).id + except ObjectDoesNotExist: + is_private = True + + if not is_private: + if prog_id not in prog_vals: + prog_vals[prog_id] = fetch_metadata_value_set(prog_id) + if prog_id not in prog_dts: + prog_dts[prog_id] = fetch_program_data_types(prog_id, True) + + prog_values = prog_vals[prog_id] + prog_data_types = prog_dts[prog_id] if 'MUT:' in cohort_filter['name']: cohort_filter['displ_name'] = cohort_filter['name'].split(':')[2].upper() + ' [' + cohort_filter['name'].split(':')[1].upper() + ',' + string.capwords(cohort_filter['name'].split(':')[3]) @@ -317,7 +326,7 @@ def format_filters_for_display(cls, filters): cohort_filter['displ_name'] = 'Data Type' cohort_filter['displ_val'] = prog_data_types[cohort_filter['value']] else: - if cohort_filter['name'] not in prog_values: + if not prog_values or cohort_filter['name'] not in prog_values: cohort_filter['displ_name'] = cohort_filter['name'] cohort_filter['displ_val'] = cohort_filter['value'] else: From 02bea7e55da08ac8b6e7b6fe7adedcbb08ad8b38 Mon Sep 17 00:00:00 2001 From: s-paquette Date: Tue, 29 May 2018 14:03:53 -0700 Subject: [PATCH 19/39] -> Adding in ObjectDoesNotExist --- cohorts/models.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cohorts/models.py b/cohorts/models.py index 2aaa6e9c..daa63645 100755 --- a/cohorts/models.py +++ b/cohorts/models.py @@ -24,7 +24,7 @@ from django.db.models import Q from django.utils.html import escape from projects.models import Project, Program, User_Feature_Definitions - +from django.core.exceptions import ObjectDoesNotExist from sharing.models import Shared_Resource from metadata_helpers import fetch_metadata_value_set, fetch_program_data_types, MOLECULAR_DISPLAY_STRINGS From
c0840823c08b2c6269b9e1a65213e0e83f77936e Mon Sep 17 00:00:00 2001 From: s-paquette Date: Tue, 29 May 2018 15:27:37 -0700 Subject: [PATCH 20/39] -> Removing old log lines -> bring sample/case list up to date with new mutation filter builder --- cohorts/metadata_counting.py | 1 - cohorts/views.py | 177 +++++++++++++++++--------- google_helpers/bigquery/bq_support.py | 3 - 3 files changed, 120 insertions(+), 61 deletions(-) diff --git a/cohorts/metadata_counting.py b/cohorts/metadata_counting.py index 5527aaf3..1175f42c 100644 --- a/cohorts/metadata_counting.py +++ b/cohorts/metadata_counting.py @@ -236,7 +236,6 @@ def count_public_metadata(user, cohort_id=None, inc_filters=None, program_id=Non } mutation_filters = None - mutation_where_clause = None data_type_where_clause = None filters = {} data_type_filters = {} diff --git a/cohorts/views.py b/cohorts/views.py index 342f19d1..1fd42fd7 100755 --- a/cohorts/views.py +++ b/cohorts/views.py @@ -75,7 +75,7 @@ def convert(data): else: return data -def get_sample_case_list(user, inc_filters=None, cohort_id=None, program_id=None, build='HG19'): +def get_sample_case_list(user, inc_filters=None, cohort_id=None, program_id=None, build='HG19', comb_mut_filters='OR'): if program_id is None and cohort_id is None: # We must always have a program_id or a cohort_id - we cannot have neither, because then @@ -96,7 +96,6 @@ def get_sample_case_list(user, inc_filters=None, cohort_id=None, program_id=None mutation_filters = None user_data_filters = None data_type_filters = False - mutation_where_clause = None if inc_filters is None: inc_filters = {} @@ -106,8 +105,8 @@ def get_sample_case_list(user, inc_filters=None, cohort_id=None, program_id=None if 'MUT:' in key: if not mutation_filters: mutation_filters = {} - mutation_filters[key] = inc_filters[key] - build = key.split(':')[1] + mutation_filters[key] = inc_filters[key]['values'] + elif 'user_' in key: if not user_data_filters: user_data_filters = {} @@ -191,9 +190,6 @@ def get_sample_case_list(user, inc_filters=None, cohort_id=None, program_id=None return samples_and_cases # end user_data - if mutation_filters: - mutation_where_clause = build_where_clause(mutation_filters) - cohort_query = """ SELECT sample_barcode cs_sample_barcode, project_id FROM cohorts_samples @@ -244,57 +240,128 @@ def get_sample_case_list(user, inc_filters=None, cohort_id=None, program_id=None # If there is a mutation filter, make a temporary table from the sample barcodes that this query # returns - if mutation_where_clause: - cohort_join_str = '' - cohort_where_str = '' - bq_cohort_table = '' - bq_cohort_dataset = '' - bq_cohort_project_id = '' - cohort = '' - query_template = None - - bq_table_info = BQ_MOLECULAR_ATTR_TABLES[Program.objects.get(id=program_id).name][build] - sample_barcode_col = bq_table_info['sample_barcode_col'] - bq_dataset = bq_table_info['dataset'] - bq_table = bq_table_info['table'] - bq_data_project_id = settings.BIGQUERY_DATA_PROJECT_NAME - - query_template = None - - if cohort_id is not None: - query_template = \ - ("SELECT ct.sample_barcode" - " FROM [{project_id}:{cohort_dataset}.{cohort_table}] ct" - " JOIN (SELECT sample_barcode_tumor AS barcode " - " FROM [{data_project_id}:{dataset_name}.{table_name}]" - " WHERE " + mutation_where_clause['big_query_str'] + - " GROUP BY barcode) mt" - " ON mt.barcode = ct.sample_barcode" - " WHERE ct.cohort_id = {cohort};") - + if mutation_filters: + build_queries = {} + + # Split the filters into 'not any' and 'all other filters' + for mut_filt in mutation_filters: 
+ build = mut_filt.split(':')[1] + + if build not in build_queries: + build_queries[build] = { + 'raw_filters': {}, + 'filter_str_params': [], + 'queries': [], + 'not_any': None + } + + if 'NOT:' in mut_filt and 'category' in mut_filt and 'any' in mutation_filters[mut_filt]: + if not build_queries[build]['not_any']: + build_queries[build]['not_any'] = {} + build_queries[build]['not_any'][mut_filt] = mutation_filters[mut_filt] + else: + build_queries[build]['raw_filters'][mut_filt] = mutation_filters[mut_filt] + + # If the combination is with AND, further split the 'not-not-any' filters, because they must be + # queried separately and JOIN'd. OR is done with UNION DISTINCT and all of one build can go into + # a single query. + for build in build_queries: + if comb_mut_filters == 'AND': + filter_num = 0 + for filter in build_queries[build]['raw_filters']: + this_filter = {} + this_filter[filter] = build_queries[build]['raw_filters'][filter] + build_queries[build]['filter_str_params'].append(BigQuerySupport.build_bq_filter_and_params( + this_filter, comb_mut_filters, build+'_{}'.format(str(filter_num)) + )) + filter_num += 1 + elif comb_mut_filters == 'OR': + build_queries[build]['filter_str_params'].append(BigQuerySupport.build_bq_filter_and_params( + build_queries[build]['raw_filters'], comb_mut_filters, build + )) - bq_cohort_table = settings.BIGQUERY_COHORT_TABLE_ID - bq_cohort_dataset = settings.COHORT_DATASET_ID - bq_cohort_project_id = settings.BIGQUERY_PROJECT_NAME - cohort = cohort_id + # Create the queries and their parameters + for build in build_queries: + bq_table_info = BQ_MOLECULAR_ATTR_TABLES[Program.objects.get(id=program_id).name][build] + sample_barcode_col = bq_table_info['sample_barcode_col'] + bq_dataset = bq_table_info['dataset'] + bq_table = bq_table_info['table'] + bq_data_project_id = settings.BIGQUERY_DATA_PROJECT_NAME - else: + # Build the query for any filter which *isn't* a not-any query.
query_template = \ ("SELECT {barcode_col}" - " FROM [{data_project_id}:{dataset_name}.{table_name}]" - " WHERE " + mutation_where_clause['big_query_str'] + - " GROUP BY {barcode_col}; ") + " FROM `{data_project_id}.{dataset_name}.{table_name}`" + " WHERE {where_clause}" + " GROUP BY {barcode_col} ") + + for filter_str_param in build_queries[build]['filter_str_params']: + build_queries[build]['queries'].append( + query_template.format(dataset_name=bq_dataset, data_project_id=bq_data_project_id, + table_name=bq_table, barcode_col=sample_barcode_col, + where_clause=filter_str_param['filter_string'])) + + # Here we build not-any queries + if build_queries[build]['not_any']: + query_template = \ + ("SELECT {barcode_col}" + " FROM `{data_project_id}.{dataset_name}.{table_name}`" + " WHERE {barcode_col} NOT IN (" + "SELECT {barcode_col}" + " FROM `{data_project_id}.{dataset_name}.{table_name}`" + " WHERE {where_clause}" + " GROUP BY {barcode_col}) " + " GROUP BY {barcode_col}") + + any_count = 0 + for not_any in build_queries[build]['not_any']: + filter = not_any.replace("NOT:","") + any_filter = {} + any_filter[filter] = build_queries[build]['not_any'][not_any] + filter_str_param = BigQuerySupport.build_bq_filter_and_params( + any_filter,param_suffix=build+'_any_{}'.format(any_count) + ) + + build_queries[build]['filter_str_params'].append(filter_str_param) + + any_count += 1 + + build_queries[build]['queries'].append(query_template.format( + dataset_name=bq_dataset, data_project_id=bq_data_project_id, table_name=bq_table, + barcode_col=sample_barcode_col, where_clause=filter_str_param['filter_string'])) + + query = None + # Collect the queries for chaining below with UNION or JOIN + queries = [q for build in build_queries for q in build_queries[build]['queries']] + # Because our parameters are uniquely named, they can be combined into a single list + params = [z for build in build_queries for y in build_queries[build]['filter_str_params'] for z in y['parameters']] + + if len(queries) > 1: + if comb_mut_filters == 'OR': + query = """ UNION DISTINCT """.join(queries) + else: + query_template = """ + SELECT q0.sample_barcode_tumor + FROM ({query1}) q0 + {join_clauses} + """ - params = mutation_where_clause['value_tuple'][0] + join_template = """ + JOIN ({query}) q{ct} + ON q{ct}.sample_barcode_tumor = q0.sample_barcode_tumor + """ - query = query_template.format( - dataset_name=bq_dataset, project_id=bq_cohort_project_id, table_name=bq_table, barcode_col=sample_barcode_col, - hugo_symbol=str(params['gene']), data_project_id=bq_data_project_id, var_class=params['var_class'], - cohort_dataset=bq_cohort_dataset,cohort_table=bq_cohort_table, cohort=cohort - ) + joins = [] + + for i,val in enumerate(queries[1:]): + joins.append(join_template.format(query=val, ct=str(i+1))) + + query = query_template.format(query1=queries[0], join_clauses=" ".join(joins)) + else: + query = queries[0] barcodes = [] - results = BigQuerySupport.execute_query_and_fetch_results(query) + results = BigQuerySupport.execute_query_and_fetch_results(query, params) if results and len(results) > 0: for barcode in results: @@ -811,6 +878,7 @@ def save_cohort(request, workbook_id=None, worksheet_id=None, create_workbook=Fa apply_filters = request.POST.getlist('apply-filters') apply_barcodes = request.POST.getlist('apply-barcodes') apply_name = request.POST.getlist('apply-name') + mut_comb_with = request.POST.get('mut_filter_combine') # we only deactivate the source if we are applying filters to a previously-existing # source cohort @@ 
-857,12 +925,12 @@ def save_cohort(request, workbook_id=None, worksheet_id=None, create_workbook=Fa results = {} for prog in filter_obj: - results[prog] = get_sample_case_list(request.user, filter_obj[prog], source, prog) + results[prog] = get_sample_case_list(request.user, filter_obj[prog], source, prog, comb_mut_filters=mut_comb_with) if cohort_progs: for prog in cohort_progs: if prog.id not in results: - results[prog.id] = get_sample_case_list(request.user, {}, source, prog.id) + results[prog.id] = get_sample_case_list(request.user, {}, source, prog.id, comb_mut_filters=mut_comb_with) if len(barcodes) > 0: for program in barcodes: @@ -2265,8 +2333,6 @@ def export_data(request, cohort_id=0, export_type=None): req_user = User.objects.get(id=request.user.id) export_dest = request.POST.get('export-dest', None) - logger.debug("export_type is {}".format("None" if not export_type else export_type)) - if not export_type or not export_dest: raise Exception("Can't perform export--destination and/or export type weren't provided!") @@ -2494,9 +2560,6 @@ def export_data(request, cohort_id=0, export_type=None): query_string = union_queries[0] query_string = '#standardSQL\n' + query_string - logger.debug("[STATUS] query to table export query: ") - logger.debug(query_string) - # Export the data if export_dest == 'table': bcs = BigQueryExportCohort(bq_proj_id, dataset, table) diff --git a/google_helpers/bigquery/bq_support.py b/google_helpers/bigquery/bq_support.py index 659bc6dd..d3da0a96 100644 --- a/google_helpers/bigquery/bq_support.py +++ b/google_helpers/bigquery/bq_support.py @@ -372,7 +372,6 @@ def get_job_results(cls, job_reference): @staticmethod def build_bq_filter_and_params(filters, comb_with='AND', param_suffix=None): - logger.debug("filters received: {}".format(str(filters))) result = { 'filter_string': '', 'parameters': [] @@ -473,6 +472,4 @@ def build_bq_filter_and_params(filters, comb_with='AND', param_suffix=None): result['filter_string'] = " {} ".format(comb_with).join(filter_set) - logger.debug("bq filter builder result: {}".format(str(result))) - return result From f440a0f0fbde33ae0aad9c16c86163df418e183b Mon Sep 17 00:00:00 2001 From: s-paquette Date: Tue, 29 May 2018 15:39:26 -0700 Subject: [PATCH 21/39] -> #2315, fix resultant cohort filter formatting --- cohorts/models.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/cohorts/models.py b/cohorts/models.py index daa63645..da001330 100755 --- a/cohorts/models.py +++ b/cohorts/models.py @@ -320,8 +320,15 @@ def format_filters_for_display(cls, filters): prog_data_types = prog_dts[prog_id] if 'MUT:' in cohort_filter['name']: - cohort_filter['displ_name'] = cohort_filter['name'].split(':')[2].upper() + ' [' + cohort_filter['name'].split(':')[1].upper() + ',' + string.capwords(cohort_filter['name'].split(':')[3]) - cohort_filter['displ_val'] = (MOLECULAR_DISPLAY_STRINGS['values'][cohort_filter['value']] if cohort_filter['name'].split(':')[3] != 'category' else MOLECULAR_DISPLAY_STRINGS['categories'][cohort_filter['value']]) + ']' + cohort_filter['displ_name'] = ("NOT(" if 'NOT:' in cohort_filter['name'] else '') \ + + cohort_filter['name'].split(':')[2].upper() \ + + ' [' + cohort_filter['name'].split(':')[1].upper() + ',' \ + + string.capwords(cohort_filter['name'].split(':')[-1]) + cohort_filter['displ_val'] = ( + MOLECULAR_DISPLAY_STRINGS['values'][cohort_filter['value']] if cohort_filter['name'].split(':')[-1] != 'category' + else MOLECULAR_DISPLAY_STRINGS['categories'][cohort_filter['value']]) \ + + 
']' \ + + (")" if 'NOT:' in cohort_filter['name'] else '') elif cohort_filter['name'] == 'data_type': cohort_filter['displ_name'] = 'Data Type' cohort_filter['displ_val'] = prog_data_types[cohort_filter['value']] From 3e4c0159092ae26f193a040c7364f1aac6a3f0b0 Mon Sep 17 00:00:00 2001 From: wlongabaugh Date: Tue, 29 May 2018 18:10:05 -0700 Subject: [PATCH 22/39] Fix for 2331 --- accounts/sa_utils.py | 1 + 1 file changed, 1 insertion(+) diff --git a/accounts/sa_utils.py b/accounts/sa_utils.py index 06b64021..10557a46 100644 --- a/accounts/sa_utils.py +++ b/accounts/sa_utils.py @@ -1096,6 +1096,7 @@ def demo_process_success(auth, user_id, saml_response): st_logger.write_text_log_entry(LOG_NAME_ERA_LOGIN_VIEW, "[ERROR] Failed to publish to PubSub topic: {}".format(str(e))) + retval.messages.append(warn_message) return retval From 3fd88f5ae03564bb2fc2576286640c11ba647bd1 Mon Sep 17 00:00:00 2001 From: s-paquette Date: Tue, 29 May 2018 18:32:08 -0700 Subject: [PATCH 23/39] -> Added 'verify_user_is_in_gcp' to sa_utils -> #2158 --- accounts/sa_utils.py | 25 ++++++++++++++++++++++++- projects/views.py | 8 +++++++- 2 files changed, 31 insertions(+), 2 deletions(-) diff --git a/accounts/sa_utils.py b/accounts/sa_utils.py index 06b64021..b1acde10 100644 --- a/accounts/sa_utils.py +++ b/accounts/sa_utils.py @@ -1099,7 +1099,6 @@ def demo_process_success(auth, user_id, saml_response): return retval - def deactivate_nih_add_to_open(user_id, user_email): try: nih_user = NIH_User.objects.get(user_id=user_id, linked=True) @@ -1151,3 +1150,27 @@ def get_nih_user_details(user_id): return user_details +def verify_user_is_in_gcp(user_id, gcp_id): + user_in_gcp = False + + user_email = User.objects.get(id=user_id) + + try: + crm_service = get_special_crm_resource() + + iam_policy = crm_service.projects().getIamPolicy(resource=gcp_id, body={}).execute() + bindings = iam_policy['bindings'] + for val in bindings: + members = val['members'] + for member in members: + if member.startswith('user:'): + if user_email.lower() == member.split(':')[1].lower(): + user_in_gcp = True + + except Exception as e: + logger.error("[ERROR] While validating user {} membership in GCP {}:".format(str(user_id),gcp_id)) + logger.exception(e) + logger.warn("[WARNING] Because we can't confirm if user {} is in GCP {} we must assume they're not.".format(str(user_id),gcp_id)) + user_in_gcp = False + + return user_in_gcp diff --git a/projects/views.py b/projects/views.py index 8719b3fb..53701d23 100644 --- a/projects/views.py +++ b/projects/views.py @@ -34,6 +34,7 @@ from sharing.service import create_share from accounts.models import GoogleProject, Bucket, BqDataset from googleapiclient.errors import HttpError +from accounts.sa_utils import verify_user_is_in_gcp import json import requests @@ -180,6 +181,8 @@ def upload_files(request): try: + req_user = User.objects.get(id=request.user.id) + # TODO: Validation blacklist = re.compile(BLACKLIST_RE, re.UNICODE) @@ -275,6 +278,9 @@ def upload_files(request): dataset = BqDataset.objects.get(id=request.POST['dataset']) google_project = bucket.google_project + if not verify_user_is_in_gcp(request.user.id, google_project.project_id): + raise Exception("User {} is not a member of GCP {} and so cannot write to its buckets or datasets!".format(req_user.email,google_project.project_id)) + # TODO: This has to be done at the same time as the user data processor config = { "USER_PROJECT": program.id, @@ -480,7 +486,7 @@ def upload_files(request): logger.info(e) except Exception as e: - print >> sys.stdout, 
"[ERROR] Exception in upload_files:" + logger.error("[ERROR] Exception in upload_files:") logger.exception(e) project.delete() From eb81d6b7c494327c545a1fe7cd74817bfbda902b Mon Sep 17 00:00:00 2001 From: s-paquette Date: Tue, 29 May 2018 18:35:09 -0700 Subject: [PATCH 24/39] -> whitespace --- accounts/sa_utils.py | 1 - 1 file changed, 1 deletion(-) diff --git a/accounts/sa_utils.py b/accounts/sa_utils.py index b1acde10..5dee22f4 100644 --- a/accounts/sa_utils.py +++ b/accounts/sa_utils.py @@ -1152,7 +1152,6 @@ def get_nih_user_details(user_id): def verify_user_is_in_gcp(user_id, gcp_id): user_in_gcp = False - user_email = User.objects.get(id=user_id) try: From 0fa95ddf0871fd914239d7ae9051bb8a1394dde4 Mon Sep 17 00:00:00 2001 From: s-paquette Date: Tue, 29 May 2018 18:40:32 -0700 Subject: [PATCH 25/39] -> Better error message --- accounts/sa_utils.py | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/accounts/sa_utils.py b/accounts/sa_utils.py index 5dee22f4..fbcbb3de 100644 --- a/accounts/sa_utils.py +++ b/accounts/sa_utils.py @@ -1152,9 +1152,9 @@ def get_nih_user_details(user_id): def verify_user_is_in_gcp(user_id, gcp_id): user_in_gcp = False - user_email = User.objects.get(id=user_id) - + user_email = None try: + user_email = User.objects.get(id=user_id).email crm_service = get_special_crm_resource() iam_policy = crm_service.projects().getIamPolicy(resource=gcp_id, body={}).execute() @@ -1167,9 +1167,16 @@ def verify_user_is_in_gcp(user_id, gcp_id): user_in_gcp = True except Exception as e: - logger.error("[ERROR] While validating user {} membership in GCP {}:".format(str(user_id),gcp_id)) - logger.exception(e) - logger.warn("[WARNING] Because we can't confirm if user {} is in GCP {} we must assume they're not.".format(str(user_id),gcp_id)) + user = None + if type(e) is ObjectDoesNotExist: + user = str(user_id) + logger.error("[ERROR] While validating user {} membership in GCP {}:".format(user, gcp_id)) + logger.error("Could not find user with ID {}!".format(user)) + else: + user = user_email + logger.error("[ERROR] While validating user {} membership in GCP {}:".format(user, gcp_id)) + logger.exception(e) + logger.warn("[WARNING] Because we can't confirm if user {} is in GCP {} we must assume they're not.".format(user, gcp_id)) user_in_gcp = False return user_in_gcp From 522a03a0f7bdbafbda9a29b5e272972d869f6fac Mon Sep 17 00:00:00 2001 From: "S. Paquette" Date: Fri, 1 Jun 2018 12:09:41 -0700 Subject: [PATCH 26/39] -> Log lines to debug --- google_helpers/bigquery/bq_support.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/google_helpers/bigquery/bq_support.py b/google_helpers/bigquery/bq_support.py index d3da0a96..60cede15 100644 --- a/google_helpers/bigquery/bq_support.py +++ b/google_helpers/bigquery/bq_support.py @@ -318,6 +318,11 @@ def execute_query(self, query, parameters=None, write_disposition='WRITE_EMPTY', logger.error("[ERROR] Query took longer than the allowed time to execute--" + "if you check job ID {} manually you can wait for it to finish.".format(job_id)) + logger.debug("[STATUS] Logging statements for test debug:") + logger.debug("State: {}".format(str(job_is_done['status']['state']))) + logger.debug("Exeucting project: {}".format(self.executing_project)) + logger.debug("jobId: {}".format(job_is_done['jobReference']['jobId'])) + return query_results # Fetch the results of a job based on the reference provided From 226886ecd83431a1fc906980aa981dc515fb19e8 Mon Sep 17 00:00:00 2001 From: "S. 
Paquette" Date: Fri, 1 Jun 2018 14:12:46 -0700 Subject: [PATCH 27/39] -> Log lines to debug --- google_helpers/bigquery/bq_support.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/google_helpers/bigquery/bq_support.py b/google_helpers/bigquery/bq_support.py index 60cede15..24314424 100644 --- a/google_helpers/bigquery/bq_support.py +++ b/google_helpers/bigquery/bq_support.py @@ -319,7 +319,7 @@ def execute_query(self, query, parameters=None, write_disposition='WRITE_EMPTY', "if you check job ID {} manually you can wait for it to finish.".format(job_id)) logger.debug("[STATUS] Logging statements for test debug:") - logger.debug("State: {}".format(str(job_is_done['status']['state']))) + logger.debug("State: {}".format(str(job_is_done['status']['state']) if job_is_done and 'status' in job_is_done and 'state' in job_is_done['status'] else 'N/A')) logger.debug("Exeucting project: {}".format(self.executing_project)) logger.debug("jobId: {}".format(job_is_done['jobReference']['jobId'])) From cce70516c4be44c15e54d59947106cd2640ba5d9 Mon Sep 17 00:00:00 2001 From: s-paquette Date: Fri, 1 Jun 2018 16:47:07 -0700 Subject: [PATCH 28/39] -> Log lines to debug --- google_helpers/bigquery/bq_support.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/google_helpers/bigquery/bq_support.py b/google_helpers/bigquery/bq_support.py index 24314424..442b64b2 100644 --- a/google_helpers/bigquery/bq_support.py +++ b/google_helpers/bigquery/bq_support.py @@ -305,6 +305,10 @@ def execute_query(self, query, parameters=None, write_disposition='WRITE_EMPTY', sleep(1) job_is_done = self.bq_service.jobs().get(projectId=self.executing_project, jobId=job_id).execute(num_retries=5) + if 'statistics' in job_is_done and 'query' in job_is_done['statistics'] and 'timeline' in \ + job_is_done['statistics']['query']: + for timeline in job_is_done['statistics']['query']['timeline']: + logger.debug("Elapsed: {}".format(str(timeline['elapsedMs']))) # Parse the final disposition if job_is_done and job_is_done['status']['state'] == 'DONE': From 4df2e1aef84633c351096072d081025d7dafc84f Mon Sep 17 00:00:00 2001 From: s-paquette Date: Fri, 1 Jun 2018 16:49:11 -0700 Subject: [PATCH 29/39] -> Log lines to debug --- google_helpers/bigquery/bq_support.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/google_helpers/bigquery/bq_support.py b/google_helpers/bigquery/bq_support.py index 442b64b2..9e7aaf3e 100644 --- a/google_helpers/bigquery/bq_support.py +++ b/google_helpers/bigquery/bq_support.py @@ -326,6 +326,10 @@ def execute_query(self, query, parameters=None, write_disposition='WRITE_EMPTY', logger.debug("State: {}".format(str(job_is_done['status']['state']) if job_is_done and 'status' in job_is_done and 'state' in job_is_done['status'] else 'N/A')) logger.debug("Exeucting project: {}".format(self.executing_project)) logger.debug("jobId: {}".format(job_is_done['jobReference']['jobId'])) + if 'statistics' in job_is_done and 'query' in job_is_done['statistics'] and 'timeline' in \ + job_is_done['statistics']['query']: + for timeline in job_is_done['statistics']['query']['timeline']: + logger.debug("Elapsed: {}".format(str(timeline['elapsedMs']))) return query_results From d931272ff71adb05c558e06c7bb2f0065fc9d195 Mon Sep 17 00:00:00 2001 From: s-paquette Date: Fri, 1 Jun 2018 16:50:10 -0700 Subject: [PATCH 30/39] -> Log lines to debug --- google_helpers/bigquery/bq_support.py | 1 + 1 file changed, 1 insertion(+) diff --git a/google_helpers/bigquery/bq_support.py b/google_helpers/bigquery/bq_support.py index 
9e7aaf3e..e8a7fbf5 100644 --- a/google_helpers/bigquery/bq_support.py +++ b/google_helpers/bigquery/bq_support.py @@ -326,6 +326,7 @@ def execute_query(self, query, parameters=None, write_disposition='WRITE_EMPTY', logger.debug("State: {}".format(str(job_is_done['status']['state']) if job_is_done and 'status' in job_is_done and 'state' in job_is_done['status'] else 'N/A')) logger.debug("Exeucting project: {}".format(self.executing_project)) logger.debug("jobId: {}".format(job_is_done['jobReference']['jobId'])) + if 'statistics' in job_is_done and 'query' in job_is_done['statistics'] and 'timeline' in \ job_is_done['statistics']['query']: for timeline in job_is_done['statistics']['query']['timeline']: From 7697a8c4b468c07a2dc7c7def2fb0a1dbfeef20e Mon Sep 17 00:00:00 2001 From: s-paquette Date: Fri, 1 Jun 2018 16:50:50 -0700 Subject: [PATCH 31/39] -> Log lines to debug --- google_helpers/bigquery/bq_support.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/google_helpers/bigquery/bq_support.py b/google_helpers/bigquery/bq_support.py index e8a7fbf5..888b235c 100644 --- a/google_helpers/bigquery/bq_support.py +++ b/google_helpers/bigquery/bq_support.py @@ -305,6 +305,7 @@ def execute_query(self, query, parameters=None, write_disposition='WRITE_EMPTY', sleep(1) job_is_done = self.bq_service.jobs().get(projectId=self.executing_project, jobId=job_id).execute(num_retries=5) + if 'statistics' in job_is_done and 'query' in job_is_done['statistics'] and 'timeline' in \ job_is_done['statistics']['query']: for timeline in job_is_done['statistics']['query']['timeline']: @@ -326,7 +327,7 @@ def execute_query(self, query, parameters=None, write_disposition='WRITE_EMPTY', logger.debug("State: {}".format(str(job_is_done['status']['state']) if job_is_done and 'status' in job_is_done and 'state' in job_is_done['status'] else 'N/A')) logger.debug("Exeucting project: {}".format(self.executing_project)) logger.debug("jobId: {}".format(job_is_done['jobReference']['jobId'])) - + if 'statistics' in job_is_done and 'query' in job_is_done['statistics'] and 'timeline' in \ job_is_done['statistics']['query']: for timeline in job_is_done['statistics']['query']['timeline']: From b7ab5245fc0d58b2497060a6207c0aa9892f347e Mon Sep 17 00:00:00 2001 From: s-paquette Date: Fri, 1 Jun 2018 20:28:49 -0700 Subject: [PATCH 32/39] -> Fixing log statements --- google_helpers/bigquery/bq_support.py | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/google_helpers/bigquery/bq_support.py b/google_helpers/bigquery/bq_support.py index 888b235c..890cf005 100644 --- a/google_helpers/bigquery/bq_support.py +++ b/google_helpers/bigquery/bq_support.py @@ -27,7 +27,7 @@ logger = logging.getLogger('main_logger') MAX_INSERT = settings.MAX_BQ_INSERT -BQ_ATTEMPT_MAX = 10 +BQ_ATTEMPT_MAX = settings.BQ_MAX_ATTEMPTS COHORT_DATASETS = { 'prod': 'cloud_deployment_cohorts', @@ -305,11 +305,6 @@ def execute_query(self, query, parameters=None, write_disposition='WRITE_EMPTY', sleep(1) job_is_done = self.bq_service.jobs().get(projectId=self.executing_project, jobId=job_id).execute(num_retries=5) - - if 'statistics' in job_is_done and 'query' in job_is_done['statistics'] and 'timeline' in \ - job_is_done['statistics']['query']: - for timeline in job_is_done['statistics']['query']['timeline']: - logger.debug("Elapsed: {}".format(str(timeline['elapsedMs']))) # Parse the final disposition if job_is_done and job_is_done['status']['state'] == 'DONE': @@ -330,8 +325,7 @@ def execute_query(self, query, parameters=None, 
write_disposition='WRITE_EMPTY', if 'statistics' in job_is_done and 'query' in job_is_done['statistics'] and 'timeline' in \ job_is_done['statistics']['query']: - for timeline in job_is_done['statistics']['query']['timeline']: - logger.debug("Elapsed: {}".format(str(timeline['elapsedMs']))) + logger.debug("Elapsed: {}".format(str(job_is_done['statistics']['query']['timeline'][-1]['elapsedMs']))) return query_results From 23ffd3b704fb32ebfc0ee4a51673a833d20db72c Mon Sep 17 00:00:00 2001 From: s-paquette Date: Mon, 4 Jun 2018 13:34:19 -0700 Subject: [PATCH 33/39] -> Some files have no sample barcodes--use case as well for exporting --- cohorts/views.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cohorts/views.py b/cohorts/views.py index 1fd42fd7..20aca037 100755 --- a/cohorts/views.py +++ b/cohorts/views.py @@ -2448,11 +2448,11 @@ def export_data(request, cohort_id=0, export_type=None): PARSE_TIMESTAMP("%Y-%m-%d %H:%M:%S","{date_added}", "{tz}") as date_added FROM `{metadata_table}` md JOIN ( - SELECT sample_barcode + SELECT sample_barcode, case_barcode FROM `{deployment_project}.{deployment_dataset}.{deployment_cohort_table}` WHERE cohort_id = {cohort_id} ) cs - ON cs.sample_barcode = md.sample_barcode + ON (cs.sample_barcode = md.sample_barcode OR md.case_barcode=cs.case_barcode) WHERE md.file_uploaded {filter_conditions} ORDER BY md.sample_barcode """ From 06fff2f9c73df141ed5f30b14eb93c3b5612f73e Mon Sep 17 00:00:00 2001 From: s-paquette Date: Mon, 4 Jun 2018 13:38:54 -0700 Subject: [PATCH 34/39] -> Only match on case_barcode if sample wasn't available (to avoid duplication of sample-based files across case barcodes) --- cohorts/views.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cohorts/views.py b/cohorts/views.py index 20aca037..19f2f51b 100755 --- a/cohorts/views.py +++ b/cohorts/views.py @@ -2452,7 +2452,7 @@ def export_data(request, cohort_id=0, export_type=None): FROM `{deployment_project}.{deployment_dataset}.{deployment_cohort_table}` WHERE cohort_id = {cohort_id} ) cs - ON (cs.sample_barcode = md.sample_barcode OR md.case_barcode=cs.case_barcode) + ON (cs.sample_barcode = md.sample_barcode OR (md.sample_barcode IS NULL AND md.case_barcode=cs.case_barcode)) WHERE md.file_uploaded {filter_conditions} ORDER BY md.sample_barcode """ From 52c1adb40e3fafeec950f97b858aa8734ea36a1a Mon Sep 17 00:00:00 2001 From: s-paquette Date: Mon, 4 Jun 2018 17:16:43 -0700 Subject: [PATCH 35/39] -> Only match on case_barcode if sample wasn't available (to avoid duplication of sample-based files across case barcodes) --- cohorts/views.py | 38 ++++++++++++++++++++++---------------- 1 file changed, 22 insertions(+), 16 deletions(-) diff --git a/cohorts/views.py b/cohorts/views.py index 19f2f51b..48ceba5f 100755 --- a/cohorts/views.py +++ b/cohorts/views.py @@ -2321,7 +2321,7 @@ def cohort_files(request, cohort_id, limit=25, page=1, offset=0, sort_column='co # Master method for exporting data types to BQ, GCS, etc. 
@login_required @csrf_protect -def export_data(request, cohort_id=0, export_type=None): +def export_data(request, cohort_id=0, export_type=None, export_sub_type=None): if debug: logger.debug('Called ' + sys._getframe().f_code.co_name) redirect_url = reverse('cohort_list') if not cohort_id else reverse('cohort_filelist', args=[cohort_id]) @@ -2439,22 +2439,28 @@ def export_data(request, cohort_id=0, export_type=None): date_added = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") # Exporting File Manifest + # Some files only have case barcodes, but some have sample barcodes. We have to make sure + # to export any files linked to a case if any sample from that case is in the cohort, but + # if files are linked to a sample, we only export them if the specific sample is in the cohort. if export_type == 'file_manifest': query_string_base = """ - SELECT md.sample_barcode, md.case_barcode, md.file_name_key as cloud_storage_location, - md.platform, md.data_type, md.data_category, md.experimental_strategy as exp_strategy, md.data_format, - md.file_gdc_id as gdc_file_uuid, md.case_gdc_id as gdc_case_uuid, md.project_short_name, - {cohort_id} as cohort_id, "{build}" as build, - PARSE_TIMESTAMP("%Y-%m-%d %H:%M:%S","{date_added}", "{tz}") as date_added - FROM `{metadata_table}` md - JOIN ( - SELECT sample_barcode, case_barcode - FROM `{deployment_project}.{deployment_dataset}.{deployment_cohort_table}` - WHERE cohort_id = {cohort_id} - ) cs - ON (cs.sample_barcode = md.sample_barcode OR (md.sample_barcode IS NULL AND md.case_barcode=cs.case_barcode)) - WHERE md.file_uploaded {filter_conditions} - ORDER BY md.sample_barcode + SELECT md.sample_barcode, md.case_barcode, md.file_name_key as cloud_storage_location, + md.platform, md.data_type, md.data_category, md.experimental_strategy as exp_strategy, md.data_format, + md.file_gdc_id as gdc_file_uuid, md.case_gdc_id as gdc_case_uuid, md.project_short_name, + {cohort_id} as cohort_id, "{build}" as build, + PARSE_TIMESTAMP("%Y-%m-%d %H:%M:%S","{date_added}", "{tz}") as date_added + FROM `{metadata_table}` md + JOIN (SELECT case_barcode, sample_barcode + FROM `{deployment_project}.{deployment_dataset}.{deployment_cohort_table}` + WHERE cohort_id = {cohort_id} + GROUP BY case_barcode, sample_barcode + ) cs + ON ((NOT cs.sample_barcode ='' AND cs.sample_barcode=md.sample_barcode) OR (cs.case_barcode=md.case_barcode)) + WHERE md.file_uploaded {filter_conditions} + GROUP BY md.sample_barcode, md.case_barcode, cloud_storage_location, + md.platform, md.data_type, md.data_category, exp_strategy, md.data_format, + gdc_file_uuid, gdc_case_uuid, md.project_short_name, cohort_id, build, date_added + ORDER BY md.sample_barcode """ for program in cohort_programs: @@ -2563,7 +2569,7 @@ def export_data(request, cohort_id=0, export_type=None): # Export the data if export_dest == 'table': bcs = BigQueryExportCohort(bq_proj_id, dataset, table) - result = bcs.export_cohort_query_to_bq(query_string, None, cohort_id) + result = bcs.export_cohort_query_to_bq(query_string, filter_params, cohort_id) elif export_dest == 'gcs': # Store file list to BigQuery bcs = BigQueryExportCohort(bq_proj_id, None, None, None, gcs_bucket, file_name) From 3124e7852d88ebe71070a484565194334c88a62d Mon Sep 17 00:00:00 2001 From: s-paquette Date: Wed, 6 Jun 2018 15:56:30 -0700 Subject: [PATCH 36/39] -> Failed cohort filelist AJAX load should redirect back to the cohort details page -> Added try/except to cohort filelist AJAX call -> Debug statements for latency tests --- cohorts/views.py | 132 
+++++++++++++++++++++++++++-------------------- 1 file changed, 75 insertions(+), 57 deletions(-) diff --git a/cohorts/views.py b/cohorts/views.py index 48ceba5f..26df788a 100755 --- a/cohorts/views.py +++ b/cohorts/views.py @@ -1640,6 +1640,8 @@ def cohort_filelist(request, cohort_id=0, panel_type=None): "This functionality is currently being worked on and will become available in a future release." ) + logger.debug("[STATUS] Returning response from cohort_filelist") + return render(request, template, {'request': request, 'cohort': cohort, 'total_file_count': (items['total_file_count'] if items else 0), @@ -1652,6 +1654,9 @@ def cohort_filelist(request, cohort_id=0, panel_type=None): 'img_thumbs_url': settings.IMG_THUMBS_URL, 'has_user_data': bool(cohort_sample_list.count() > 0), 'build': build}) + + logger.debug("[STATUS] Returning response from cohort_filelist, with exception") + except Exception as e: logger.error("[ERROR] While trying to view the cohort file list: ") logger.exception(e) @@ -1661,68 +1666,79 @@ def cohort_filelist(request, cohort_id=0, panel_type=None): @login_required def cohort_filelist_ajax(request, cohort_id=0, panel_type=None): - if debug: logger.debug('Called '+sys._getframe().f_code.co_name) - if cohort_id == 0: - response_str = '
' \ - '
' \ - '
' \ - '' \ - 'Cohort provided does not exist.' \ - '
' - return HttpResponse(response_str, status=500) - - params = {} - do_filter_count = True - if request.GET.get('files_per_page', None) is not None: - files_per_page = int(request.GET.get('files_per_page')) - params['limit'] = files_per_page - if request.GET.get('page', None) is not None: - do_filter_count = False - page = int(request.GET.get('page')) - params['page'] = page - offset = (page - 1) * files_per_page + status=200 + + try: + if debug: logger.debug('Called '+sys._getframe().f_code.co_name) + if cohort_id == 0: + response_str = '
' \ + '
' \ + '
' \ + '' \ + 'Cohort provided does not exist.' \ + '
' + return HttpResponse(response_str, status=500) + + params = {} + do_filter_count = True + if request.GET.get('files_per_page', None) is not None: + files_per_page = int(request.GET.get('files_per_page')) + params['limit'] = files_per_page + if request.GET.get('page', None) is not None: + do_filter_count = False + page = int(request.GET.get('page')) + params['page'] = page + offset = (page - 1) * files_per_page + params['offset'] = offset + elif request.GET.get('limit', None) is not None: + limit = int(request.GET.get('limit')) + params['limit'] = limit + + if request.GET.get('offset', None) is not None: + offset = int(request.GET.get('offset')) params['offset'] = offset - elif request.GET.get('limit', None) is not None: - limit = int(request.GET.get('limit')) - params['limit'] = limit - - if request.GET.get('offset', None) is not None: - offset = int(request.GET.get('offset')) - params['offset'] = offset - if request.GET.get('sort_column', None) is not None: - sort_column = request.GET.get('sort_column') - params['sort_column'] = sort_column - if request.GET.get('sort_order', None) is not None: - sort_order = int(request.GET.get('sort_order')) - params['sort_order'] = sort_order - - - build = request.GET.get('build','HG19') - - has_access = auth_dataset_whitelists_for_user(request.user.id) - - result = cohort_files(request=request, cohort_id=cohort_id, build=build, access=has_access, type=panel_type, do_filter_count=do_filter_count, **params) - - # If nothing was found, our total file count will reflect that - if do_filter_count: - metadata_data_attr = fetch_build_data_attr(build) - if len(result['metadata_data_counts']): - for attr in result['metadata_data_counts']: - for val in result['metadata_data_counts'][attr]: - metadata_data_attr[attr]['values'][val]['count'] = result['metadata_data_counts'][attr][val] - else: + if request.GET.get('sort_column', None) is not None: + sort_column = request.GET.get('sort_column') + params['sort_column'] = sort_column + if request.GET.get('sort_order', None) is not None: + sort_order = int(request.GET.get('sort_order')) + params['sort_order'] = sort_order + + + build = request.GET.get('build','HG19') + + has_access = auth_dataset_whitelists_for_user(request.user.id) + + result = cohort_files(request=request, cohort_id=cohort_id, build=build, access=has_access, type=panel_type, do_filter_count=do_filter_count, **params) + + # If nothing was found, our total file count will reflect that + if do_filter_count: + metadata_data_attr = fetch_build_data_attr(build) + if len(result['metadata_data_counts']): + for attr in result['metadata_data_counts']: + for val in result['metadata_data_counts'][attr]: + metadata_data_attr[attr]['values'][val]['count'] = result['metadata_data_counts'][attr][val] + else: + for attr in metadata_data_attr: + for val in metadata_data_attr[attr]['values']: + metadata_data_attr[attr]['values'][val]['count'] = 0 + for attr in metadata_data_attr: - for val in metadata_data_attr[attr]['values']: - metadata_data_attr[attr]['values'][val]['count'] = 0 + metadata_data_attr[attr]['values'] = [metadata_data_attr[attr]['values'][x] for x in + metadata_data_attr[attr]['values']] - for attr in metadata_data_attr: - metadata_data_attr[attr]['values'] = [metadata_data_attr[attr]['values'][x] for x in - metadata_data_attr[attr]['values']] + del result['metadata_data_counts'] + result['metadata_data_attr'] = [metadata_data_attr[x] for x in metadata_data_attr] - del result['metadata_data_counts'] - result['metadata_data_attr'] = 
+        logger.debug("[STATUS] Returning response from cohort_filelist_ajax")
 
-    return JsonResponse(result, status=200)
+    except Exception as e:
+        logger.error("[ERROR] While retrieving cohort file data for AJAX call:")
+        logger.exception(e)
+        status=500
+        result={'redirect': reverse('cohort_details', args=[cohort_id]), 'message': "Encountered an error while trying to fetch this cohort's filelist--please contact the administrator."}
+
+    return JsonResponse(result, status=status)
 
 
 @login_required
@@ -2315,6 +2331,8 @@ def cohort_files(request, cohort_id, limit=25, page=1, offset=0, sort_column='co
         if cursor: cursor.close()
         if db and db.open: db.close()
 
+    logger.debug("[STATUS] Returning response from cohort_files")
+
     return resp
 

From 2c8f861d08d39db26a098143b5efbca7a229cdb8 Mon Sep 17 00:00:00 2001
From: s-paquette
Date: Wed, 6 Jun 2018 18:50:36 -0700
Subject: [PATCH 37/39] -> tmp_mut_table no longer joins to a cohort table

---
 cohorts/metadata_counting.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/cohorts/metadata_counting.py b/cohorts/metadata_counting.py
index 1175f42c..22fa9b14 100644
--- a/cohorts/metadata_counting.py
+++ b/cohorts/metadata_counting.py
@@ -581,10 +581,10 @@ def count_public_metadata(user, cohort_id=None, inc_filters=None, program_id=Non
         subquery = base_table
         excl_params_tuple = ()
 
-        # Cohorts are built into the mutation table, so we don't need to check for the cohort if there is one
         if tmp_mut_table:
             subquery += (' JOIN %s ON tumor_sample_id = sample_barcode ' % tmp_mut_table)
-        elif cohort_id:
+
+        if cohort_id:
             subquery += (' JOIN (%s) cs ON cs_sample_barcode = sample_barcode' % cohort_query)
             excl_params_tuple += (cohort_id,)
 

From 67ec2949a55accec64ac442eeeb8b24af54886e1 Mon Sep 17 00:00:00 2001
From: s-paquette
Date: Thu, 7 Jun 2018 13:33:06 -0700
Subject: [PATCH 38/39] -> Break out mutation filter individual selections when ANDing

---
 cohorts/metadata_counting.py | 27 +++++++++++++++++++++------
 1 file changed, 21 insertions(+), 6 deletions(-)

diff --git a/cohorts/metadata_counting.py b/cohorts/metadata_counting.py
index 22fa9b14..d9693b54 100644
--- a/cohorts/metadata_counting.py
+++ b/cohorts/metadata_counting.py
@@ -350,6 +350,8 @@ def count_public_metadata(user, cohort_id=None, inc_filters=None, program_id=Non
             else:
                 build_queries[build]['raw_filters'][mut_filt] = mutation_filters[mut_filt]
 
+        logger.debug("filters: {}".format(str(build_queries)))
+
         # If the combination is with AND, further split the 'not-not-any' filters, because they must be
         # queried separately and JOIN'd. OR is done with UNION DISINCT and all of one build can go into
         # a single query.
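
Note: the comment above describes the combination strategy without showing it. Below is a minimal,
standalone sketch of that chaining, with placeholder table and column names (not the real metadata
schema), in which AND'd per-filter queries are JOIN'd on the sample barcode and OR'd filters are
folded into a single query with UNION DISTINCT:

    # Sketch only: illustrates JOIN-vs-UNION DISTINCT chaining of per-filter
    # barcode queries. Table/column names are placeholders, not the app's schema.
    def chain_mutation_queries(per_filter_queries, comb_with, barcode_col='sample_barcode_tumor'):
        # Each entry in per_filter_queries is a SELECT returning barcode_col.
        if not per_filter_queries:
            return None
        if comb_with == 'OR':
            # OR: every filter for one build can go into a single UNION'd query.
            return ' UNION DISTINCT '.join(per_filter_queries)
        # AND: each filter is queried separately and JOIN'd on the barcode column.
        query = 'SELECT q0.{col} FROM ({q}) q0'.format(col=barcode_col, q=per_filter_queries[0])
        for i, subquery in enumerate(per_filter_queries[1:], start=1):
            query += ' JOIN ({q}) q{i} ON q{i}.{col} = q0.{col}'.format(q=subquery, i=i, col=barcode_col)
        return query

    # Example with two made-up per-gene subqueries:
    subqueries = [
        "SELECT sample_barcode_tumor FROM `proj.ds.somatic_hg19` WHERE Hugo_Symbol = 'TP53'",
        "SELECT sample_barcode_tumor FROM `proj.ds.somatic_hg19` WHERE Hugo_Symbol = 'BRAF'",
    ]
    print(chain_mutation_queries(subqueries, 'AND'))
    print(chain_mutation_queries(subqueries, 'OR'))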
@@ -357,12 +359,22 @@ def count_public_metadata(user, cohort_id=None, inc_filters=None, program_id=Non
             if comb_mut_filters == 'AND':
                 filter_num = 0
                 for filter in build_queries[build]['raw_filters']:
-                    this_filter = {}
-                    this_filter[filter] = build_queries[build]['raw_filters'][filter]
-                    build_queries[build]['filter_str_params'].append(BigQuerySupport.build_bq_filter_and_params(
-                        this_filter, comb_mut_filters, build+'_{}'.format(str(filter_num))
-                    ))
-                    filter_num += 1
+                    # Individual selection filters need to be broken out if we're ANDing
+                    if ':specific' in filter:
+                        for indiv_selex in build_queries[build]['raw_filters'][filter]:
+                            this_filter = {}
+                            this_filter[filter] = [indiv_selex,]
+                            build_queries[build]['filter_str_params'].append(BigQuerySupport.build_bq_filter_and_params(
+                                this_filter, comb_mut_filters, build + '_{}'.format(str(filter_num))
+                            ))
+                            filter_num += 1
+                    else:
+                        this_filter = {}
+                        this_filter[filter] = build_queries[build]['raw_filters'][filter]
+                        build_queries[build]['filter_str_params'].append(BigQuerySupport.build_bq_filter_and_params(
+                            this_filter, comb_mut_filters, build+'_{}'.format(str(filter_num))
+                        ))
+                        filter_num += 1
             elif comb_mut_filters == 'OR':
                 build_queries[build]['filter_str_params'].append(BigQuerySupport.build_bq_filter_and_params(
                     build_queries[build]['raw_filters'], comb_mut_filters, build
@@ -450,6 +462,9 @@ def count_public_metadata(user, cohort_id=None, inc_filters=None, program_id=Non
 
         barcodes = []
 
+        logger.debug("Mutation query: {}".format(query))
+        logger.debug("Params: {}".format(params))
+
         start = time.time()
         results = BigQuerySupport.execute_query_and_fetch_results(query, params)
         stop = time.time()

From ccbfc84f6f55bcc47b7ae0cb62da86573858f398 Mon Sep 17 00:00:00 2001
From: "S. Paquette"
Date: Mon, 11 Jun 2018 16:40:47 -0700
Subject: [PATCH 39/39] -> Fixed a bug with ORing filters that was causing a malformed query

---
 cohorts/metadata_counting.py | 39 +++++++++++++++++-------------------
 1 file changed, 18 insertions(+), 21 deletions(-)

diff --git a/cohorts/metadata_counting.py b/cohorts/metadata_counting.py
index d9693b54..2c140898 100644
--- a/cohorts/metadata_counting.py
+++ b/cohorts/metadata_counting.py
@@ -350,8 +350,6 @@ def count_public_metadata(user, cohort_id=None, inc_filters=None, program_id=Non
             else:
                 build_queries[build]['raw_filters'][mut_filt] = mutation_filters[mut_filt]
 
-        logger.debug("filters: {}".format(str(build_queries)))
-
         # If the combination is with AND, further split the 'not-not-any' filters, because they must be
         # queried separately and JOIN'd. OR is done with UNION DISINCT and all of one build can go into
         # a single query.
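
Note: [PATCH 38/39] above breaks each ':specific' mutation filter out into one single-selection
filter per value before building query parameters, so that every individual selection can be
AND'ed as its own subquery. Below is a self-contained sketch of that breakout step, using
invented filter keys and values rather than the real cohort filter keys:

    # Sketch only: the AND-mode breakout, reduced to a standalone function.
    def break_out_specific_filters(raw_filters):
        # ':specific' list filters are split so each selection becomes its own
        # single-value filter (and, later, its own parameterized subquery).
        broken_out = []
        for key, values in raw_filters.items():
            if ':specific' in key:
                for selection in values:
                    broken_out.append({key: [selection]})
            else:
                broken_out.append({key: values})
        return broken_out

    # Example with made-up filter keys:
    example = {
        'MUT:HG19:TP53:specific': ['Missense_Mutation', 'Nonsense_Mutation'],
        'MUT:HG19:BRAF:category': ['any'],
    }
    for f in break_out_specific_filters(example):
        print(f)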
@@ -376,9 +374,10 @@ def count_public_metadata(user, cohort_id=None, inc_filters=None, program_id=Non
                         ))
                         filter_num += 1
             elif comb_mut_filters == 'OR':
-                build_queries[build]['filter_str_params'].append(BigQuerySupport.build_bq_filter_and_params(
-                    build_queries[build]['raw_filters'], comb_mut_filters, build
-                ))
+                if len(build_queries[build]['raw_filters']):
+                    build_queries[build]['filter_str_params'].append(BigQuerySupport.build_bq_filter_and_params(
+                        build_queries[build]['raw_filters'], comb_mut_filters, build
+                    ))
 
         # Create the queries and their parameters
         for build in build_queries:
@@ -403,32 +402,33 @@ def count_public_metadata(user, cohort_id=None, inc_filters=None, program_id=Non
 
             # Here we build not-any queries
             if build_queries[build]['not_any']:
-                query_template = \
-                    ("SELECT {barcode_col}"
-                    " FROM `{data_project_id}.{dataset_name}.{table_name}`"
-                    " WHERE {barcode_col} NOT IN ("
-                    "SELECT {barcode_col}"
-                    " FROM `{data_project_id}.{dataset_name}.{table_name}`"
-                    " WHERE {where_clause}"
-                    " GROUP BY {barcode_col}) "
-                    " GROUP BY {barcode_col}")
+                query_template = """
+                    SELECT {barcode_col}
+                    FROM `{data_project_id}.{dataset_name}.{table_name}`
+                    WHERE {barcode_col} NOT IN (
+                        SELECT {barcode_col}
+                        FROM `{data_project_id}.{dataset_name}.{table_name}`
+                        WHERE {where_clause}
+                        GROUP BY {barcode_col})
+                    GROUP BY {barcode_col}
+                """
 
                 any_count = 0
                 for not_any in build_queries[build]['not_any']:
-                    filter = not_any.replace("NOT:","")
+                    filter = not_any.replace("NOT:", "")
                     any_filter = {}
                     any_filter[filter] = build_queries[build]['not_any'][not_any]
-                    filter_str_param = BigQuerySupport.build_bq_filter_and_params(
+                    any_filter_str_param = BigQuerySupport.build_bq_filter_and_params(
                         any_filter,param_suffix=build+'_any_{}'.format(any_count)
                     )
-                    build_queries[build]['filter_str_params'].append(filter_str_param)
+                    build_queries[build]['filter_str_params'].append(any_filter_str_param)
 
                     any_count += 1
 
                     build_queries[build]['queries'].append(query_template.format(
                         dataset_name=bq_dataset, data_project_id=bq_data_project_id, table_name=bq_table,
-                        barcode_col=sample_barcode_col, where_clause=filter_str_param['filter_string']))
+                        barcode_col=sample_barcode_col, where_clause=any_filter_str_param['filter_string']))
 
             query = None
 
             # Collect the queries for chaining below with UNION or JOIN
@@ -462,9 +462,6 @@ def count_public_metadata(user, cohort_id=None, inc_filters=None, program_id=Non
 
         barcodes = []
 
-        logger.debug("Mutation query: {}".format(query))
-        logger.debug("Params: {}".format(params))
-
         start = time.time()
         results = BigQuerySupport.execute_query_and_fetch_results(query, params)
         stop = time.time()
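
Note: once formatted, the reworked not-any template in [PATCH 39/39] is a plain NOT IN subquery
over the mutation table. Below is a quick sketch that fills the template in with placeholder
project, dataset, table, and column names (the real values come from the program's BigQuery
settings), just to show the shape of the generated SQL:

    # Sketch only: rendering the not-any query template with placeholder identifiers.
    query_template = """
        SELECT {barcode_col}
        FROM `{data_project_id}.{dataset_name}.{table_name}`
        WHERE {barcode_col} NOT IN (
            SELECT {barcode_col}
            FROM `{data_project_id}.{dataset_name}.{table_name}`
            WHERE {where_clause}
            GROUP BY {barcode_col})
        GROUP BY {barcode_col}
    """

    print(query_template.format(
        data_project_id='my-bq-project',              # placeholder project ID
        dataset_name='my_dataset',                    # placeholder dataset
        table_name='somatic_mutations_hg19',          # placeholder table
        barcode_col='sample_barcode_tumor',           # assumed barcode column
        where_clause='Variant_Classification = @hg19_any_0_filter',  # illustrative clause/parameter
    ))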