diff --git a/accounts/dcf_support.py b/accounts/dcf_support.py
index bab72b0e..8477900d 100755
--- a/accounts/dcf_support.py
+++ b/accounts/dcf_support.py
@@ -31,7 +31,10 @@
 DCF_TOKEN_URL = settings.DCF_TOKEN_URL
 DCF_GOOGLE_URL = settings.DCF_GOOGLE_URL
-
+DCF_GOOGLE_SA_REGISTER_URL = settings.DCF_GOOGLE_SA_REGISTER_URL
+DCF_GOOGLE_SA_VERIFY_URL = settings.DCF_GOOGLE_SA_VERIFY_URL
+DCF_GOOGLE_SA_MONITOR_URL = settings.DCF_GOOGLE_SA_MONITOR_URL
+DCF_GOOGLE_SA_URL = settings.DCF_GOOGLE_SA_URL

 class DCFCommFailure(Exception):
     """Thrown if we have problems communicating with DCF """
@@ -104,6 +107,260 @@ def drop_dcf_token(user_id):
     return None


+def unregister_sa_via_dcf(user_id, sa_id):
+    """
+    Delete the given service account
+    """
+    try:
+        full_url = '{0}{1}'.format(DCF_GOOGLE_SA_URL, sa_id)
+        resp = _dcf_call(full_url, user_id, mode='delete')
+    except (TokenFailure, InternalTokenError, RefreshTokenExpired, DCFCommFailure) as e:
+        logger.error("[ERROR] Attempt to contact DCF for SA unregistration failed (user {})".format(user_id))
+        raise e
+    except Exception as e:
+        logger.error("[ERROR] Attempt to contact DCF for SA unregistration failed (user {})".format(user_id))
+        raise e
+
+    success = False
+    messages = None
+    if resp.status_code == 200:
+        success = True
+        messages = ["Service account {} was dropped".format(sa_id)]
+    elif resp.status_code == 400:
+        messages = ["Service account {} was not found".format(sa_id)]
+    elif resp.status_code == 403:
+        messages = ["User cannot delete service account {}".format(sa_id)]
+    else:
+        messages = ["Unexpected response '{}' from Data Commons while dropping service account: {}".format(resp.status_code, sa_id)]
+
+    return success, messages
+
+
+def service_account_info_from_dcf_for_project(user_id, proj):
+    """
+    Get all service accounts tied to a project
+    """
+    retval = []
+
+    try:
+        full_url = '{0}?google_project_ids={1}'.format(DCF_GOOGLE_SA_URL, proj)
+        logger.info("[INFO] Calling DCF URL {}".format(full_url))
+        resp = _dcf_call(full_url, user_id, mode='get')
+    except (TokenFailure, InternalTokenError, RefreshTokenExpired, DCFCommFailure) as e:
+        logger.error("[ERROR] Attempt to contact DCF for SA information failed (user {})".format(user_id))
+        raise e
+    except Exception as e:
+        logger.error("[ERROR] Attempt to contact DCF for SA information failed (user {})".format(user_id))
+        raise e
+
+    messages = None
+    if resp.status_code == 200:
+        response_dict = json_loads(resp.text)
+        sa_list = response_dict['service_accounts']
+        for sa in sa_list:
+            ret_entry = {
+                'gcp_id': sa['google_project_id'],
+                'sa_dataset_ids': sa['project_access'],
+                'sa_name': sa['service_account_email'],
+                'sa_exp': sa['project_access_exp']
+            }
+            retval.append(ret_entry)
+    elif resp.status_code == 403:
+        messages = ["User is not a member of Google project {}".format(proj)]
+    elif resp.status_code == 401:  # Have seen this when the google sa scope was not requested in key
+        messages = ["User does not have permissions for this operation on Google project {}".format(proj)]
+    elif resp.status_code == 400:  # DCF rejects the request; treat the result as empty:
+        logger.info("[INFO] DCF response of 400 for URL {}".format(full_url))
+    else:
+        messages = ["Unexpected response from Data Commons: {}".format(resp.status_code)]
+
+    return retval, messages
+
+
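+# NOTE: the parsing in the helper above and the one below assumes this shape
+# for a DCF 200 response (inferred from the parsing code, not from published
+# DCF docs):
+#
+#   {"service_accounts": [
+#       {"google_project_id": "...", "project_access": ["...", ...],
+#        "service_account_email": "...", "project_access_exp": <epoch seconds>},
+#       ...]}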
+def service_account_info_from_dcf(user_id, proj_list):
+    """
+    Get all service accounts tied to the list of projects
+    """
+    try:
+        proj_string = ','.join(proj_list)
+        full_url = '{0}?google_project_ids={1}'.format(DCF_GOOGLE_SA_URL, proj_string)
+        resp = _dcf_call(full_url, user_id, mode='get')
+    except (TokenFailure, InternalTokenError, RefreshTokenExpired, DCFCommFailure) as e:
+        logger.error("[ERROR] Attempt to contact DCF for SA information failed (user {})".format(user_id))
+        raise e
+    except Exception as e:
+        logger.error("[ERROR] Attempt to contact DCF for SA information failed (user {})".format(user_id))
+        raise e
+
+    retval = {}
+    messages = None
+    if resp.status_code == 200:
+        response_dict = json_loads(resp.text)
+        sa_list = response_dict['service_accounts']
+        for sa in sa_list:
+            ret_entry = {
+                'gcp_id': sa['google_project_id'],
+                'sa_dataset_ids': sa['project_access'],
+                'sa_name': sa['service_account_email'],
+                'sa_exp': sa['project_access_exp']
+            }
+            retval[sa['service_account_email']] = ret_entry
+    elif resp.status_code == 403:
+        messages = ["User is not a member on one or more of these Google projects: {}".format(proj_string)]
+    else:
+        messages = ["Unexpected response from Data Commons: {}".format(resp.status_code)]
+
+    return retval, messages
+
+
+def verify_sa_at_dcf(user_id, gcp_id, service_account_id, datasets):
+    """
+    :raise TokenFailure:
+    :raise InternalTokenError:
+    :raise DCFCommFailure:
+    :raise RefreshTokenExpired:
+    """
+
+    sa_data = {
+        "service_account_email": service_account_id,
+        "google_project_id": gcp_id,
+        "project_access": datasets
+    }
+
+    #
+    # Call DCF to see if there would be problems with the service account registration.
+    #
+
+    try:
+        resp = _dcf_call(DCF_GOOGLE_SA_VERIFY_URL, user_id, mode='post', post_body=sa_data)
+    except (TokenFailure, InternalTokenError, RefreshTokenExpired, DCFCommFailure) as e:
+        logger.error("[ERROR] Attempt to contact DCF for SA verification failed (user {})".format(user_id))
+        raise e
+    except Exception as e:
+        logger.error("[ERROR] Attempt to contact DCF for SA verification failed (user {})".format(user_id))
+        raise e
+
+    messages = []
+
+    # Note: requests Responses are falsy for 4xx codes, so test against None
+    # explicitly or the 400 parsing below never runs:
+    if resp is not None:
+        logger.info("[INFO] DCF SA verification response code was {} with body: {}".format(resp.status_code, resp.text))
+        response_dict = json_loads(resp.text)
+        if resp.status_code == 200:
+            messages = []
+            success = response_dict['success']
+            if not success:
+                logger.error("[ERROR] Inconsistent success response from DCF! Code: {} Success: {}".format(resp.status_code, success))
+            else:
+                messages.append("Service account {}: was verified".format(service_account_id))
+        elif resp.status_code == 400:
+            messages = []
+            error_info = response_dict['errors']
+            sa_error_info = error_info['service_account_email']
+            if sa_error_info['status'] == 200:
+                messages.append("Service account {}: no issues".format(service_account_id))
+            else:
+                messages.append("Service account {} error ({}): {}".format(service_account_id,
+                                                                           sa_error_info['error'],
+                                                                           sa_error_info['error_description']))
+            gcp_error_info = error_info['google_project_id']
+            if gcp_error_info['status'] == 200:
+                messages.append("Google cloud project {}: no issues".format(gcp_id))
+            else:
+                messages.append("Google cloud project {} error ({}): {}".format(gcp_id,
+                                                                                gcp_error_info['error'],
+                                                                                gcp_error_info['error_description']))
+            project_access_error_info = error_info['project_access']
+            messages.append("Requested projects:")
+            for project_name in project_access_error_info:
+                project = project_access_error_info[project_name]
+                if project['status'] == 200:
+                    messages.append("Dataset {}: no issues".format(project_name))
+                else:
+                    messages.append("Dataset {} error ({}): {}".format(project_name,
+                                                                       project['error'],
+                                                                       project['error_description']))
+        else:
+            logger.error("[ERROR] Unexpected response from DCF: {}".format(resp.status_code))
+
+    return messages
+
+
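+# NOTE: the 400-handling in verify_sa_at_dcf() above and register_sa_at_dcf()
+# below assumes DCF reports per-field errors in this shape (inferred from the
+# parsing code, not from published DCF docs):
+#
+#   {"success": false,
+#    "errors": {
+#        "service_account_email": {"status": ..., "error": ..., "error_description": ...},
+#        "google_project_id":     {"status": ..., "error": ..., "error_description": ...},
+#        "project_access": {"<dataset-id>": {"status": ..., "error": ..., "error_description": ...}, ...}}}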
+def register_sa_at_dcf(user_id, gcp_id, service_account_id, datasets):
+    """
+    :raise TokenFailure:
+    :raise InternalTokenError:
+    :raise DCFCommFailure:
+    :raise RefreshTokenExpired:
+    """
+
+    sa_data = {
+        "service_account_email": service_account_id,
+        "google_project_id": gcp_id,
+        "project_access": datasets
+    }
+
+    #
+    # Call DCF to register the service account.
+    #
+
+    try:
+        logger.info("[INFO] Calling DCF with body {}".format(json_dumps(sa_data)))
+        resp = _dcf_call(DCF_GOOGLE_SA_REGISTER_URL, user_id, mode='post', post_body=sa_data)
+        logger.info("[INFO] Just called DCF at {}".format(DCF_GOOGLE_SA_REGISTER_URL))
+    except (TokenFailure, InternalTokenError, RefreshTokenExpired, DCFCommFailure) as e:
+        logger.error("[ERROR] Attempt to contact DCF for SA registration failed (user {})".format(user_id))
+        raise e
+    except Exception as e:
+        logger.error("[ERROR] Attempt to contact DCF for SA registration failed (user {})".format(user_id))
+        raise e
+
+    messages = []
+
+    # See note in verify_sa_at_dcf(): test against None, not truthiness:
+    if resp is not None:
+        logger.info("[INFO] DCF SA registration response code was {} with body: {}".format(resp.status_code, resp.text))
+        response_dict = json_loads(resp.text)
+        if resp.status_code == 200:
+            messages = []
+            success = response_dict['success']
+            if not success:
+                logger.error("[ERROR] Inconsistent success response from DCF! Code: {} Success: {}".format(resp.status_code, success))
+            else:
+                messages.append("Service account {}: was registered".format(service_account_id))
+        elif resp.status_code == 400:
+            messages = []
+            error_info = response_dict['errors']
+            sa_error_info = error_info['service_account_email']
+            if sa_error_info['status'] == 200:
+                messages.append("Service account {}: no issues".format(service_account_id))
+            else:
+                messages.append("Service account {} error ({}): {}".format(service_account_id,
+                                                                           sa_error_info['error'],
+                                                                           sa_error_info['error_description']))
+            gcp_error_info = error_info['google_project_id']
+            if gcp_error_info['status'] == 200:
+                messages.append("Google cloud project {}: no issues".format(gcp_id))
+            else:
+                messages.append("Google cloud project {} error ({}): {}".format(gcp_id,
+                                                                                gcp_error_info['error'],
+                                                                                gcp_error_info['error_description']))
+            project_access_error_info = error_info['project_access']
+            messages.append("Requested projects:")
+            for project_name in project_access_error_info:
+                project = project_access_error_info[project_name]
+                if project['status'] == 200:
+                    messages.append("Dataset {}: no issues".format(project_name))
+                else:
+                    messages.append("Dataset {} error ({}): {}".format(project_name,
+                                                                       project['error'],
+                                                                       project['error_description']))
+        else:
+            logger.error("[ERROR] Unexpected response from DCF: {}".format(resp.status_code))
+    else:
+        logger.error("[ERROR] No response from DCF for registration")
+
+    return messages
+
+
 def get_auth_elapsed_time(user_id):
     """
     There is benefit in knowing when the user did their NIH login at DCF, allowing us to e.g. estimate
@@ -410,7 +667,7 @@ def refresh_at_dcf(user_id):
     #

     try:
-        resp = dcf_call(DCF_GOOGLE_URL, user_id, mode='patch')
+        resp = _dcf_call(DCF_GOOGLE_URL, user_id, mode='patch')
     except (TokenFailure, InternalTokenError, RefreshTokenExpired, DCFCommFailure) as e:
         throw_later = e
     except Exception as e:
@@ -565,7 +822,7 @@ def _decode_token(token):
     return decode_token_chunk(token, 1)


-def dcf_call(full_url, user_id, mode='get', post_body=None, force_token=False):
+def _dcf_call(full_url, user_id, mode='get', post_body=None, force_token=False, params_dict=None):
     """
     All the stuff around a DCF call that handles token management and refreshes.
@@ -612,7 +869,7 @@ def token_storage_for_user(my_token_dict):
     try:
         resp = dcf.request(mode, full_url, client_id=client_id,
-                           client_secret=client_secret, data=post_body)
+                           client_secret=client_secret, data=post_body, params=params_dict)
     except (TokenFailure, RefreshTokenExpired) as e:
         # bubbles up from token_storage_for_user call
         logger.error("[ERROR] _dcf_call {} aborted: {}".format(full_url, str(e)))
@@ -728,7 +985,16 @@ def refresh_token_storage(token_dict, decoded_jwt, user_token, nih_username_from
     #     "sub": "The users's DCF ID"
     # }

-    refresh_expire_time = pytz.utc.localize(datetime.datetime.utcfromtimestamp(refresh_token_dict['exp']))
+    dcf_expire_timestamp = refresh_token_dict['exp']
+
+    #
+    # For testing purposes ONLY, we want the refresh token to expire in two days, not in 30. So mess with the returned
+    # value:
+    #
+
+    #dcf_expire_timestamp -= (28 * 86400)  # ONLY USE THIS HACK FOR TESTING
+
+    refresh_expire_time = pytz.utc.localize(datetime.datetime.utcfromtimestamp(dcf_expire_timestamp))

     # This refers to the *access key* expiration (~20 minutes)
     if token_dict.has_key('expires_at'):
@@ -787,7 +1053,7 @@ def unlink_at_dcf(user_id, do_refresh):
     #

     try:
-        resp = dcf_call(DCF_GOOGLE_URL, user_id, mode='delete')  # can raise TokenFailure, DCFCommFailure
+        resp = _dcf_call(DCF_GOOGLE_URL, user_id, mode='delete')  # can raise TokenFailure, DCFCommFailure
     except (TokenFailure, InternalTokenError, RefreshTokenExpired, DCFCommFailure) as e:
         throw_later = e  # hold off so we can try a refresh first...
     except Exception as e:
diff --git a/accounts/dcf_views.py b/accounts/dcf_views.py
index 79b368f6..3f3bab9e 100755
--- a/accounts/dcf_views.py
+++ b/accounts/dcf_views.py
@@ -80,7 +80,7 @@ def oauth2_login(request):
     # Found that 'user' scope had to be included to be able to do the user query on callback, and the data scope
     # to do data queries. Starting to recognize a pattern here...

-    oauth = OAuth2Session(client_id, redirect_uri=full_callback, scope=['openid', 'user'])
+    oauth = OAuth2Session(client_id, redirect_uri=full_callback, scope=['openid', 'user', 'google_service_account'])
     authorization_url, state = oauth.authorization_url(DCF_AUTH_URL)

     # stash the state string in the session!
diff --git a/accounts/sa_utils.py b/accounts/sa_utils.py
index 42286918..b18e26ec 100644
--- a/accounts/sa_utils.py
+++ b/accounts/sa_utils.py
@@ -41,12 +41,12 @@ from dataset_utils.dataset_config import DatasetGoogleGroupPair
 from google_helpers.pubsub_service import get_pubsub_service, get_full_topic_name

-from dcf_support import get_stored_dcf_token, \
+from dcf_support import get_stored_dcf_token, verify_sa_at_dcf, register_sa_at_dcf, \
     TokenFailure, RefreshTokenExpired, InternalTokenError, DCFCommFailure, \
-    GoogleLinkState, get_auth_elapsed_time, \
+    GoogleLinkState, get_auth_elapsed_time, unregister_sa_via_dcf, \
     get_google_link_from_user_dict, get_projects_from_user_dict, \
     get_nih_id_from_user_dict, user_data_token_to_user_dict, get_user_data_token_string, \
-    compare_google_ids
+    compare_google_ids, service_account_info_from_dcf

 logger = logging.getLogger('main_logger')

@@ -57,11 +57,208 @@
 MANAGED_SERVICE_ACCOUNTS_PATH = settings.MANAGED_SERVICE_ACCOUNTS_PATH


-def verify_service_account(gcp_id, service_account, datasets, user_email, is_refresh=False, is_adjust=False, remove_all=False):
+class SAModes:
+    REMOVE_ALL = 1
+    ADJUST = 2
+    EXTEND = 3
+    REGISTER = 4
+    CANNOT_OCCUR = 5
+
+
+def _derive_sa_mode(is_refresh, is_adjust, remove_all):
+    """
+    We have three different flags driving only four different modes. Try to make this more
+    comprehensible:
+    """
+    if is_adjust:
+        if is_refresh:
+            return SAModes.CANNOT_OCCUR  # adjust + refresh is invalid either way
+        else:
+            if remove_all:
+                return SAModes.REMOVE_ALL
+            else:
+                return SAModes.ADJUST
+    else:
+        if is_refresh:
+            if remove_all:
+                return SAModes.CANNOT_OCCUR
+            else:
+                return SAModes.EXTEND
+        else:
+            if remove_all:
+                return SAModes.CANNOT_OCCUR
+            else:
+                return SAModes.REGISTER
+
+
+def _load_black_and_white(st_logger, log_name, service_account):
+    """
+    Even with DCF handling registration, we still want to catch e.g. our own service accounts ourselves:
+    """
+
+    #
+    # Block verification of service accounts used by the application
+    # We should keep this around even with DCF, since we can catch these without even asking DCF.
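+    # The three JSON-backed gatekeepers loaded below (roles as suggested by
+    # their use elsewhere in this module): ServiceAccountBlacklist flags SAs we
+    # refuse outright (such as the application's own), ManagedServiceAccounts
+    # flags Google-managed SA name patterns, and GoogleOrgWhitelist presumably
+    # limits which organizations' projects may register SAs at all.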
+    #
+    try:
+        sab = ServiceAccountBlacklist.from_json_file_path(SERVICE_ACCOUNT_BLACKLIST_PATH)
+        msa = ManagedServiceAccounts.from_json_file_path(MANAGED_SERVICE_ACCOUNTS_PATH)
+        gow = GoogleOrgWhitelist.from_json_file_path(GOOGLE_ORG_WHITELIST_PATH)
+    except Exception as e:
+        logger.error("[ERROR] Exception while creating ServiceAccountBlacklist or GoogleOrgWhitelist instance: ")
+        logger.exception(e)
+        trace_msg = traceback.format_exc()
+        st_logger.write_text_log_entry(log_name, "[ERROR] Exception while creating ServiceAccountBlacklist or GoogleOrgWhitelist instance: ")
+        st_logger.write_text_log_entry(log_name, trace_msg)
+        return None, None, None, {'message': 'An error occurred while validating the service account.'}
+
+    if sab.is_blacklisted(service_account):
+        st_logger.write_text_log_entry(log_name, "Cannot register {0}: Service account is blacklisted.".format(service_account))
+        return None, None, None, {'message': 'This service account cannot be registered.'}
+
+    return sab, msa, gow, None
+
+
+def _check_sa_sanity(st_logger, log_name, service_account, sa_mode, controlled_datasets, user_email):
+    # Refreshes and adjustments require a service account to exist, and you cannot register an account if it already
+    # exists with the same datasets
+
+    sa = None
+    sa_qset = ServiceAccount.objects.filter(service_account=service_account, active=1)
+    if len(sa_qset) == 0:
+        if sa_mode == SAModes.REMOVE_ALL or sa_mode == SAModes.ADJUST or sa_mode == SAModes.EXTEND:
+            return {
+                'message': 'Service account {} was not found so cannot be {}.'.format(escape(service_account), (
+                    "adjusted" if (sa_mode == SAModes.REMOVE_ALL or sa_mode == SAModes.ADJUST) else "refreshed")),
+                'level': 'error'
+            }
+        # determine if this is a re-registration, or a brand-new one
+        sa_qset = ServiceAccount.objects.filter(service_account=service_account, active=0)
+        if len(sa_qset) > 0:
+            logger.info("[STATUS] Verification for SA {} being re-registered by user {}".format(service_account,
+                                                                                                user_email))
+            st_logger.write_text_log_entry(log_name,
+                                           "[STATUS] Verification for SA {} being re-registered by user {}".format(
+                                               service_account, user_email))
+            sa = sa_qset.first()  # the previously deactivated row; sa stays None for a brand-new SA
+    else:
+        sa = sa_qset.first()
+        if sa_mode == SAModes.REGISTER:
+            return {
+                'message': 'Service account {} has already been registered. Please use the adjustment and refresh options to add/remove datasets or extend your access.'.format(escape(service_account)),
+                'level': 'error'
+            }
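+
+    # Mode recap (see _derive_sa_mode): REGISTER requires the SA to be brand new
+    # or previously deactivated; REMOVE_ALL, ADJUST and EXTEND all require an
+    # active SA row to exist already.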
+    # if is_adjust or not is_refresh:
+    if sa_mode == SAModes.REMOVE_ALL or sa_mode == SAModes.ADJUST or sa_mode == SAModes.REGISTER:
+        reg_change = False
+        # Check the private datasets to see if there's a registration change
+        saads = AuthorizedDataset.objects.filter(id__in=ServiceAccountAuthorizedDatasets.objects.filter(service_account=sa).values_list('authorized_dataset', flat=True), public=False).values_list('whitelist_id', flat=True)
+
+        # If we're removing all datasets and there are 1 or more, this is automatically a registration change
+        if (sa_mode == SAModes.REMOVE_ALL) and saads.count():
+            reg_change = True
+        else:
+            if controlled_datasets.count() or saads.count():
+                ads = controlled_datasets.values_list('whitelist_id', flat=True)
+                # A private dataset missing from either list means this is a registration change
+                for ad in ads:
+                    if ad not in saads:
+                        reg_change = True
+                if not reg_change:
+                    for saad in saads:
+                        if saad not in ads:
+                            reg_change = True
+            else:
+                reg_change = (len(AuthorizedDataset.objects.filter(id__in=ServiceAccountAuthorizedDatasets.objects.filter(service_account=sa).values_list('authorized_dataset', flat=True), public=True)) <= 0)
+        # If this isn't a refresh but the requested datasets aren't changing (except to be removed), we don't need to do anything
+        if not reg_change:
+            return {
+                'message': 'Service account {} already exists with these datasets, and so does not need to be {}.'.format(escape(service_account), ('re-registered' if (sa_mode == SAModes.REGISTER) else 'adjusted')),
+                'level': 'warning'
+            }
+    return None
+
+
+def verify_service_account(gcp_id, service_account, datasets, user_email, user_id, is_refresh=False, is_adjust=False, remove_all=False):
+    try:
+        if settings.SA_VIA_DCF:
+            return _verify_service_account_dcf(gcp_id, service_account, datasets, user_email, user_id, is_refresh, is_adjust, remove_all)
+        else:
+            return _verify_service_account_isb(gcp_id, service_account, datasets, user_email, is_refresh, is_adjust, remove_all)
+    except Exception as e:
+        logger.error("[ERROR] Exception while verifying ServiceAccount")
+        logger.exception(e)
+        return {'message': 'An error occurred while validating the service account.'}
+
+
+def _verify_service_account_dcf(gcp_id, service_account, datasets, user_email, user_id, is_refresh=False, is_adjust=False, remove_all=False):
+
+    sa_mode = _derive_sa_mode(is_refresh, is_adjust, remove_all)
+
+    # Only verify for protected datasets
+    controlled_datasets = AuthorizedDataset.objects.filter(whitelist_id__in=datasets, public=False)
+    controlled_dataset_names = controlled_datasets.values_list('name', flat=True)
+
+    # log the reports using Cloud logging API
+    st_logger = StackDriverLogger.build_from_django_settings()
+
+    log_name = SERVICE_ACCOUNT_LOG_NAME
+    resp = {
+        'message': '{0}: Begin verification of service account.'.format(service_account)
+    }
+    st_logger.write_struct_log_entry(log_name, resp)
+
+    #
+    # load the lists:
+    #
+
+    sab, msa, gow, msg = _load_black_and_white(st_logger, log_name, service_account)
+    if msg:
+        return msg
+
+    #
+    # Check SA sanity:
+    #
+
+    msg = _check_sa_sanity(st_logger, log_name, service_account, sa_mode, controlled_datasets, user_email)
+    if msg:
+        return msg
+
+    #
+    # Ask DCF if we are cool:
+    #
+
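+    # The verification endpoint takes the same JSON body that register_sa_at_dcf()
+    # posts ({"service_account_email": ..., "google_project_id": ...,
+    # "project_access": [...]}); any messages DCF sends back are surfaced to the
+    # user as a failed verification.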
+ try: + messages = verify_sa_at_dcf(user_id, gcp_id, service_account, datasets) + logger.info("[INFO] messages from DCF {}".format(','.join(messages))) + if messages: + return { + 'message': '\n'.join(messages), + 'level': 'error' + } + except (TokenFailure, InternalTokenError, RefreshTokenExpired, DCFCommFailure) as e: + logger.exception(e) + return {'message': "FIXME There was an error while verifying this service account. Please contact the administrator."} + + return_obj = {'roles': 'FIXME!!', + 'all_user_datasets_verified': 'FIXME!!'} + return return_obj + + +def _verify_service_account_isb(gcp_id, service_account, datasets, user_email, is_refresh=False, is_adjust=False, remove_all=False): # Only verify for protected datasets - controlled_datasets = AuthorizedDataset.objects.filter(id__in=datasets, public=False) + controlled_datasets = AuthorizedDataset.objects.filter(whitelist_id__in=datasets, public=False) controlled_dataset_names = controlled_datasets.values_list('name', flat=True) + logger.info("[INFO] Datasets: {} {} {}".format(str(datasets), len(controlled_datasets), str(controlled_dataset_names))) + project_id_re = re.compile(ur'(@' + re.escape(gcp_id) + ur'\.)', re.UNICODE) projectNumber = None sab = None @@ -186,9 +383,9 @@ def verify_service_account(gcp_id, service_account, datasets, user_email, is_ref or msa.is_managed(service_account)): msg = "Service Account {} is ".format(escape(service_account),) if msa.is_managed(service_account): - msg += "a Google System Managed Service Account, and so cannot be regsitered. Please register a user-managed Service Account." + msg += "a Google System Managed Service Account, and so cannot be registered. Please register a user-managed Service Account." else: - msg += "not from GCP {}, and so cannot be regsitered. Only service accounts originating from this project can be registered.".format(str(gcp_id), ) + msg += "not from GCP {}, and so cannot be registered. 
Only service accounts originating from this project can be registered.".format(str(gcp_id), )
         return {
             'message': msg,
             'level': 'error'
         }
@@ -401,7 +598,19 @@ def verify_service_account(gcp_id, service_account, datasets, user_email, is_ref
     return return_obj


-def register_service_account(user_email, gcp_id, user_sa, datasets, is_refresh, is_adjust, remove_all):
+def register_service_account(user_email, user_id, gcp_id, user_sa, datasets, is_refresh, is_adjust, remove_all):
+    try:
+        if settings.SA_VIA_DCF:
+            return _register_service_account_dcf(user_email, user_id, gcp_id, user_sa, datasets, is_refresh, is_adjust, remove_all)
+        else:
+            return _register_service_account_isb(user_email, gcp_id, user_sa, datasets, is_refresh, is_adjust, remove_all)
+    except Exception as e:
+        logger.error("[ERROR] Exception while registering ServiceAccount")
+        logger.exception(e)
+        return [('An error occurred while registering the service account.', 'error')]
+
+
+def _register_service_account_dcf(user_email, user_id, gcp_id, user_sa, datasets, is_refresh, is_adjust, remove_all):
+
+    ret_msg = []
@@ -416,11 +625,59 @@ def register_service_account(user_email, gcp_id, user_sa, datasets, is_refresh,
     if len(datasets) == 1 and datasets[0] == '':
         datasets = []
-    else:
-        datasets = map(int, datasets)
+    # datasets are now identified by their whitelist id:
+    #else:
+    #    datasets = map(int, datasets)

     # VERIFY AGAIN JUST IN CASE USER TRIED TO GAME THE SYSTEM
-    result = verify_service_account(gcp_id, user_sa, datasets, user_email, is_refresh, is_adjust)
+    result = _verify_service_account_dcf(gcp_id, user_sa, datasets, user_email, user_id, is_refresh, is_adjust, remove_all)
+    logger.info("[INFO] FIXME ACTUALLY CHECK RESULTS")
+
+    err_msgs = []
+
+    #
+    # Ask DCF if we are cool:
+    #
+
+    try:
+        messages = register_sa_at_dcf(user_id, gcp_id, user_sa, datasets)
+        logger.info("[INFO] messages from DCF {}".format(','.join(messages)))
+        if messages:
+            return {
+                'message': '\n'.join(messages),
+                'level': 'error'
+            }
+    except (TokenFailure, InternalTokenError, RefreshTokenExpired, DCFCommFailure) as e:
+        logger.exception(e)
+        return {
+            'message': "FIXME There was an error while registering this service account. Please contact the administrator."}
+
+    return_obj = {'roles': 'FIXME!!',
+                  'all_user_datasets_verified': 'FIXME!!'}
+    return return_obj
+
+
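+# NOTE: datasets now arrive as whitelist ids (strings) rather than the integer
+# primary keys the old code expected, which is why map(int, datasets) is
+# commented out in both register paths.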
+def _register_service_account_isb(user_email, gcp_id, user_sa, datasets, is_refresh, is_adjust, remove_all):
+
+    ret_msg = []
+
+    # log the reports using Cloud logging API
+    st_logger = StackDriverLogger.build_from_django_settings()
+
+    user_gcp = GoogleProject.objects.get(project_id=gcp_id, active=1)
+
+    # If we've received a remove-all request, ignore any provided datasets
+    if remove_all:
+        datasets = ['']
+
+    if len(datasets) == 1 and datasets[0] == '':
+        datasets = []
+    # datasets are now identified by their whitelist id:
+    #else:
+    #    datasets = map(int, datasets)
+
+    # VERIFY AGAIN JUST IN CASE USER TRIED TO GAME THE SYSTEM
+    result = _verify_service_account_isb(gcp_id, user_sa, datasets, user_email, is_refresh, is_adjust)

     err_msgs = []
@@ -431,7 +688,7 @@ def register_service_account(user_email, gcp_id, user_sa, datasets, is_refresh,
                                              user_sa, user_email)})

     # Datasets verified, add service accounts to appropriate acl groups
-    protected_datasets = AuthorizedDataset.objects.filter(id__in=datasets)
+    protected_datasets = AuthorizedDataset.objects.filter(whitelist_id__in=datasets)

     # ADD SERVICE ACCOUNT TO ALL PUBLIC AND PROTECTED DATASETS ACL GROUPS
     public_datasets = AuthorizedDataset.objects.filter(public=True)
@@ -590,11 +847,21 @@ def register_service_account(user_email, gcp_id, user_sa, datasets, is_refresh,
     return ret_msg


-def unregister_sa_with_id(user_id, sa_id):
-    unregister_sa(user_id, ServiceAccount.objects.get(id=sa_id).service_account)
+def unregister_all_gcp_sa(user_id, gcp_id):
+    if settings.SA_VIA_DCF:
+        # FIXME Throws exceptions:
+        success = None
+        msgs = None
+        #success, msgs = unregister_all_gcp_sa_via_dcf(user_id, gcp_id)
+        pass
+    else:
+        success = None
+        msgs = None
+        _unregister_all_gcp_sa_db(user_id, gcp_id)
+    return success, msgs


-def unregister_all_gcp_sa(user_id, gcp_id):
+def _unregister_all_gcp_sa_db(user_id, gcp_id):
     gcp = GoogleProject.objects.get(id=gcp_id, active=1)

     # Remove Service Accounts associated to this Google Project and remove them from acl_google_groups
@@ -604,6 +871,17 @@ def unregister_all_gcp_sa(user_id, gcp_id):

 def unregister_sa(user_id, sa_name):
+    if settings.SA_VIA_DCF:
+        # FIXME Throws exceptions:
+        success, msgs = unregister_sa_via_dcf(user_id, sa_name)
+    else:
+        success = None
+        msgs = None
+        _unregister_sa_db(user_id, sa_name)
+    return success, msgs
+
+
+def _unregister_sa_db(user_id, sa_name):
     st_logger = StackDriverLogger.build_from_django_settings()

     sa = ServiceAccount.objects.get(service_account=sa_name)
@@ -655,24 +933,77 @@ def unregister_sa(user_id, sa_name):
     sa.active = False
     sa.save()

-def service_account_dict(sa_id):
-    service_account = ServiceAccount.objects.get(id=sa_id, active=1)
+
+def controlled_auth_datasets():
+    datasets = AuthorizedDataset.objects.filter(public=False)
+    return [{'whitelist_id': x.whitelist_id, 'name': x.name, 'duca': x.duca_id} for x in datasets]
+
+
+def service_account_dict(user_id, sa_id):
+    if settings.SA_VIA_DCF:
+        # FIXME This is throwing DCF token exceptions!
+        return _service_account_dict_from_dcf(user_id, sa_id)
+    else:
+        return _service_account_dict_from_db(sa_id)
+
+
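+# All of the SA entry points above (verify, register, unregister, detail)
+# follow the same dispatch pattern: consult settings.SA_VIA_DCF and hand off to
+# either the DCF-backed implementation or the legacy local-DB one, so callers
+# never need to know which backend is live.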
+def _service_account_dict_from_dcf(user_id, sa_name):
+
+    #
+    # DCF currently (8/2/18) requires us to provide the list of Google projects
+    # that we want service accounts for. If we are just given the SA ID, we need
+    # to query for all projects and then use those results to find the SA matching
+    # the ID (unless we go through funny business trying to parse the project out
+    # of the service account name).
+    #
+    user = User.objects.get(id=user_id)
+    gcp_list = GoogleProject.objects.filter(user=user, active=1)
+    proj_list = [x.project_id for x in gcp_list]
+
+    sa_dict, messages = service_account_info_from_dcf(user_id, proj_list)
+    return (sa_dict[sa_name] if sa_name in sa_dict else None), messages
+
+
+def _service_account_dict_from_db(sa_name):
+    service_account = ServiceAccount.objects.get(service_account=sa_name, active=1)
+    datasets = service_account.get_auth_datasets()
+    ds_ids = []
+    for dataset in datasets:
+        ds_ids.append(dataset.whitelist_id)
+
+    expired_time = service_account.authorized_date + datetime.timedelta(days=7)
+
     retval = {
-        'gcp_id': service_account.google_project.project_id,
-        'sa_datasets': service_account.get_auth_datasets(),
-        'sa_id': service_account.service_account
+        'gcp_id': service_account.google_project.project_id,
+        'sa_dataset_ids': ds_ids,
+        'sa_name': service_account.service_account,
+        'sa_exp': expired_time
     }

-    return retval
+    return retval, None
+
+
+def auth_dataset_whitelists_for_user(user_id):
+    nih_users = NIH_User.objects.filter(user_id=user_id, linked=True)
+    num_users = len(nih_users)
+    if num_users != 1:
+        if num_users > 1:
+            logger.warn("Multiple objects when retrieving nih_user with user_id {}.".format(str(user_id)))
+        else:
+            logger.warn("No objects when retrieving nih_user with user_id {}.".format(str(user_id)))
+        return None
+    nih_user = nih_users.first()
+    expired_time = nih_user.NIH_assertion_expiration
+    now_time = pytz.utc.localize(datetime.datetime.utcnow())
+    if now_time >= expired_time:
+        return None

-def auth_dataset_whitelists_for_user(use_user_id):
-    nih_user = NIH_User.objects.filter(user_id=use_user_id, active=True)
     has_access = None
-    if len(nih_user) > 0:
-        user_auth_sets = UserAuthorizedDatasets.objects.filter(nih_user=nih_user)
-        for dataset in user_auth_sets:
-            if not has_access:
-                has_access = []
-            has_access.append(dataset.authorized_dataset.whitelist_id)
+    user_auth_sets = UserAuthorizedDatasets.objects.filter(nih_user=nih_user)
+    for dataset in user_auth_sets:
+        if not has_access:
+            has_access = []
+        has_access.append(dataset.authorized_dataset.whitelist_id)

     return has_access
diff --git a/accounts/urls.py b/accounts/urls.py
index c268e5f0..5aa43a57 100755
--- a/accounts/urls.py
+++ b/accounts/urls.py
@@ -50,7 +50,7 @@
     url(r'^users/(?P<user_id>\d+)/register_sa/$', views.register_sa, name='register_sa'),
     url(r'^users/(?P<user_id>\d+)/verify_sa/$', views.verify_sa, name='verify_sa'),
     url(r'^users/(?P<user_id>\d+)/adjust_sa/$', views.register_sa, name='adjust_sa'),
-    url(r'^users/(?P<user_id>\d+)/delete_sa/(?P<sa_id>\d+)/$', views.delete_sa, name='delete_sa'),
+    url(r'^users/(?P<user_id>\d+)/delete_sa/(?P<sa_name>[-a-zA-Z0-9@.]+)/$', views.delete_sa, name='delete_sa'),
     url(r'^users/(?P<user_id>\d+)/register_bucket/(?P<bucket_id>\d+)/$', views.register_bucket, name='register_bucket'),
     url(r'^users/(?P<user_id>\d+)/delete_bucket/(?P<bucket_id>\d+)/$', views.delete_bucket, name='delete_bucket'),
     url(r'^users/(?P<user_id>\d+)/register_bqdataset/(?P<bqdataset_id>\d+)/$', views.register_bqdataset, name='register_bqdataset'),
diff --git a/accounts/views.py b/accounts/views.py
index 75c56556..888bd155 100644
--- a/accounts/views.py
+++ b/accounts/views.py
@@ -38,8 +38,11 @@ from projects.models import User_Data_Tables
 from django.utils.html import escape
 from sa_utils import verify_service_account, register_service_account, \
-    unregister_all_gcp_sa, unregister_sa_with_id, service_account_dict, \
-    do_nih_unlink, deactivate_nih_add_to_open
+    unregister_all_gcp_sa, unregister_sa, service_account_dict, \
+    do_nih_unlink, deactivate_nih_add_to_open, controlled_auth_datasets
+
+from dcf_support import service_account_info_from_dcf_for_project
+
 from json import loads as json_loads

 logger = logging.getLogger('main_logger')
@@ -119,7 +122,14 @@ def user_gcp_list(request, user_id):
             'last_name': user.last_name
         }

-        context = {'user': user, 'user_details': user_details, 'gcp_list': gcp_list}
+        gcp_and_sa_tuples = []
+        for gcp in gcp_list:
+            sa_dicts, sa_err_msg = _build_sa_list_for_gcp(request, user_id, gcp.id, gcp)
+            if sa_err_msg is not None:
+                return render(request, '500.html', {})
+            gcp_and_sa_tuples.append((gcp, sa_dicts))
+        context = {'user': user, 'user_details': user_details, 'gcp_sa_tups': gcp_and_sa_tuples}

     except (MultipleObjectsReturned, ObjectDoesNotExist) as e:
         logger.error("[ERROR] While fetching user GCP list: ")
@@ -139,6 +149,99 @@ def user_gcp_list(request, user_id):
     return render(request, template, context)


+def _build_sa_list_for_gcp(request, user_id, gcp_id, gcp_context):
+
+    retval = []
+    sa_messages = None
+
+    try:
+        if settings.SA_VIA_DCF:
+            # DCF keys on the GCP project id string, not our DB pk:
+            sa_info, sa_messages = service_account_info_from_dcf_for_project(user_id, gcp_context.project_id)
+            if sa_messages:
+                for message in sa_messages:
+                    logger.error("[ERROR] {}:".format(message))
+                    messages.error(request, message)
+                return None, sa_messages
+
+            for sa_dict in sa_info:
+                sa_data = {}
+                retval.append(sa_data)
+                sa_data['name'] = sa_dict['sa_name']
+                # for modal names:
+                sa_data['esc_name'] = sa_dict['sa_name'].replace('@', "-at-").replace('.', '-dot-')
+                now_time = pytz.utc.localize(datetime.datetime.utcnow())
+                exp_time = pytz.utc.localize(datetime.datetime.utcfromtimestamp(sa_dict['sa_exp']))
+                sa_data['is_expired'] = exp_time < now_time
+                sa_data['authorized_date'] = exp_time + datetime.timedelta(days=-7)
+                auth_names = []
+                auth_ids = []
+                sa_data['num_auth'] = len(sa_dict['sa_dataset_ids'])
+                logger.info("[INFO] Listing ADs for GCP {} {}:".format(gcp_id, len(sa_dict['sa_dataset_ids'])))
+                for auth_data in sa_dict['sa_dataset_ids']:
+                    logger.info("[INFO] AD {} {}:".format(gcp_id, str(auth_data)))
+                    protected_dataset = AuthorizedDataset.objects.get(whitelist_id=auth_data)
+                    auth_names.append(protected_dataset.name)
+                    auth_ids.append(str(protected_dataset.id))
+                sa_data['auth_dataset_names'] = ', '.join(auth_names)
+                sa_data['auth_dataset_ids'] = ', '.join(auth_ids)
+
+            # for sa in sa_list:
+            #     ret_entry = {
+            #         'gcp_id': sa['google_project_id'],
+            #         'sa_dataset_ids': sa['project_access'],
+            #         'sa_name': sa['service_account_email'],
+            #         'sa_exp': sa['project_access_exp']
+            #     }
+            #     retval.append(ret_entry)
+
+        else:
+
+            # google_project = models.ForeignKey(GoogleProject, null=False)
+            # service_account = models.CharField(max_length=1024, null=False)
+            # active = models.BooleanField(default=False, null=False)
+            # authorized_date = models.DateTimeField(auto_now=True)
+
+            active_sas = gcp_context.active_service_accounts()
+            for service_account in active_sas:
+                logger.info("[INFO] Listing SA {}:".format(service_account.service_account))
+                auth_datasets = service_account.get_auth_datasets()
+                sa_data = {}
+                retval.append(sa_data)
+                sa_data['name'] = service_account.service_account
+                # for modal names:
+                sa_data['esc_name'] = service_account.service_account.replace('@', "-at-").replace('.', '-dot-')
+                sa_data['is_expired'] = service_account.is_expired()
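+                # NOTE: the DB keeps an authorized_date directly; the DCF path
+                # above has no such field, so it derives one as expiration minus
+                # seven days (the same 7-day window _service_account_dict_from_db()
+                # assumes).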
+                sa_data['authorized_date'] = service_account.authorized_date
+                auth_names = []
+                auth_ids = []
+                sa_data['num_auth'] = len(auth_datasets)
+                logger.info("[INFO] Listing ADs for GCP {} {}:".format(gcp_id, len(auth_datasets)))
+                for auth_data in auth_datasets:
+                    auth_names.append(auth_data.name)
+                    auth_ids.append(str(auth_data.id))
+                sa_data['auth_dataset_names'] = ', '.join(auth_names)
+                sa_data['auth_dataset_ids'] = ', '.join(auth_ids)
+
+        # We should get back all service accounts, even ones that have expired (I hope). Note we no longer should be
+        # getting back "inactive" service accounts; that is for DCF to sort out and manage internally.
+        #
+
+        # we need:
+        #
+        # service_account.get_auth_datasets
+        # dataset names, separated by ","
+        # if we have auth datasets and they are expired, want the authorized_date as: 'M d, Y, g:i a'
+        # dataset ids, separated by ", "
+
+    except Exception as e:
+        logger.error("[ERROR] While detailing a GCP: ")
+        logger.exception(e)
+        sa_messages = ["There was an error while attempting to list Service Accounts."]
+
+    return retval, sa_messages
+
+
 @login_required
 def verify_gcp(request, user_id):
     status = None
@@ -313,8 +416,99 @@ def register_gcp(request, user_id):

 @login_required
 def gcp_detail(request, user_id, gcp_id):
-    context = {}
-    context['gcp'] = GoogleProject.objects.get(id=gcp_id, active=1)
+    try:
+        logger.info("[INFO] gcp_detail {}:".format(gcp_id))
+        context = {}
+        context['gcp'] = GoogleProject.objects.get(id=gcp_id, active=1)
+        logger.info("[INFO] Listing SAs for GCP {}:".format(gcp_id))
+        if settings.SA_VIA_DCF:
+            context['sa_list'] = []
+            gcp_project_id = context['gcp'].project_id
+            sa_info, sa_messages = service_account_info_from_dcf_for_project(user_id, gcp_project_id)
+            if sa_messages:
+                for message in sa_messages:
+                    logger.error("[ERROR] {}:".format(message))
+                    messages.error(request, message)
+                return render(request, 'GenespotRE/gcp_detail.html', context)
+
+            for sa_dict in sa_info:
+                sa_data = {}
+                context['sa_list'].append(sa_data)
+                sa_data['name'] = sa_dict['sa_name']
+                # for modal names:
+                sa_data['esc_name'] = sa_dict['sa_name'].replace('@', "-at-").replace('.', '-dot-')
+                now_time = pytz.utc.localize(datetime.datetime.utcnow())
+                exp_time = pytz.utc.localize(datetime.datetime.utcfromtimestamp(sa_dict['sa_exp']))
+                sa_data['is_expired'] = exp_time < now_time
+                sa_data['authorized_date'] = exp_time + datetime.timedelta(days=-7)
+                auth_names = []
+                auth_ids = []
+                sa_data['num_auth'] = len(sa_dict['sa_dataset_ids'])
+                logger.info("[INFO] Listing ADs for GCP {} {}:".format(gcp_id, len(sa_dict['sa_dataset_ids'])))
+                for auth_data in sa_dict['sa_dataset_ids']:
+                    logger.info("[INFO] AD {} {}:".format(gcp_id, str(auth_data)))
+                    protected_dataset = AuthorizedDataset.objects.get(whitelist_id=auth_data)
+                    auth_names.append(protected_dataset.name)
+                    auth_ids.append(str(protected_dataset.id))
+                sa_data['auth_dataset_names'] = ', '.join(auth_names)
+                sa_data['auth_dataset_ids'] = ', '.join(auth_ids)
+
+            # for sa in sa_list:
+            #     ret_entry = {
+            #         'gcp_id': sa['google_project_id'],
+            #         'sa_dataset_ids': sa['project_access'],
+            #         'sa_name': sa['service_account_email'],
+            #         'sa_exp': sa['project_access_exp']
+            #     }
+            #     retval.append(ret_entry)
+
+        else:
+            context['sa_list'] = []
+
+            #google_project = models.ForeignKey(GoogleProject, null=False)
+            #service_account = models.CharField(max_length=1024, null=False)
+            #active = models.BooleanField(default=False, null=False)
+            #authorized_date = models.DateTimeField(auto_now=True)
+
+            active_sas = context['gcp'].active_service_accounts()
+            for service_account in active_sas:
+                logger.info("[INFO] Listing SA {}:".format(service_account.service_account))
+                auth_datasets = service_account.get_auth_datasets()
+                sa_data = {}
+                context['sa_list'].append(sa_data)
+                sa_data['name'] = service_account.service_account
+                # for modal names:
+                sa_data['esc_name'] = service_account.service_account.replace('@', "-at-").replace('.', '-dot-')
+                sa_data['is_expired'] = service_account.is_expired()
+                sa_data['authorized_date'] = service_account.authorized_date
+                auth_names = []
+                auth_ids = []
+                sa_data['num_auth'] = len(auth_datasets)
+                logger.info("[INFO] Listing ADs for GCP {} {}:".format(gcp_id, len(auth_datasets)))
+                for auth_data in auth_datasets:
+                    auth_names.append(auth_data.name)
+                    auth_ids.append(str(auth_data.id))
+                sa_data['auth_dataset_names'] = ', '.join(auth_names)
+                sa_data['auth_dataset_ids'] = ', '.join(auth_ids)
+
+        # We should get back all service accounts, even ones that have expired (I hope). Note we no longer should be
+        # getting back "inactive" service accounts; that is for DCF to sort out and manage internally.
+        #
+
+        # we need:
+        #
+        # service_account.get_auth_datasets
+        # dataset names, separated by ","
+        # if we have auth datasets and they are expired, want the authorized_date as: 'M d, Y, g:i a'
+        # dataset ids, separated by ", "
+
+    except Exception as e:
+        logger.error("[ERROR] While detailing a GCP: ")
+        logger.exception(e)
+        messages.error(request,
+                       "Encountered an error while trying to detail this Google Cloud Project - please contact the administrator.")

     return render(request, 'GenespotRE/gcp_detail.html', context)
@@ -359,9 +553,12 @@ def verify_sa(request, user_id):
         if remove_all:
             datasets = []

-        result = verify_service_account(gcp_id, user_sa, datasets, user_email, is_refresh, is_adjust, remove_all)
+        logger.info("[INFO] Verifying Service Account {} for datasets {}".format(user_sa, str(datasets)))
+        result = verify_service_account(gcp_id, user_sa, datasets, user_email, user_id, is_refresh, is_adjust, remove_all)
+        logger.info("[INFO] Verified Service Account {} for datasets {}".format(user_sa, str(datasets)))

         if 'message' in result.keys():
+            logger.info("[INFO] Verification returned a message")
             status = '400'
             st_logger.write_struct_log_entry(SERVICE_ACCOUNT_LOG_NAME, {'message': '{}: For user {}, {}'.format(user_sa, user_email, result['message'])})
             # Users attempting to refresh a project they're not on go back to their GCP list (because this GCP was probably removed from it)
@@ -374,8 +571,10 @@ def verify_sa(request, user_id):
                 gcp.save()
         else:
             if result['all_user_datasets_verified']:
+                logger.info("[INFO] all verified")
                 st_logger.write_struct_log_entry(SERVICE_ACCOUNT_LOG_NAME, {'message': '{}: Service account was successfully verified for user {}.'.format(user_sa,user_email)})
             else:
+                logger.info("[INFO] not all verified")
                 st_logger.write_struct_log_entry(SERVICE_ACCOUNT_LOG_NAME, {'message': '{}: Service account was not successfully verified for user {}.'.format(user_sa,user_email)})
             result['user_sa'] = user_sa
             result['datasets'] = datasets
@@ -398,18 +597,24 @@ def register_sa(request, user_id):
     try:
         # This is a Service Account dataset adjustment or an initial load of the service account registration page
-        if request.GET.get('sa_id') or request.GET.get('gcp_id'):
+        if request.GET.get('sa_name') or request.GET.get('gcp_id'):
             template = 'GenespotRE/register_sa.html'
             context = {
-                'authorized_datasets': AuthorizedDataset.objects.filter(public=False)
+                'authorized_datasets': 
controlled_auth_datasets() } - if request.GET.get('sa_id'): + if request.GET.get('sa_name'): template = 'GenespotRE/adjust_sa.html' - sa_dict = service_account_dict(request.GET.get('sa_id')) + sa_dict, sa_msgs = service_account_dict(user_id, request.GET.get('sa_name')) + # FIXME!!! What to do next if there is an error message (Coming from DCF)?? + if sa_msgs: + for sa_msg in sa_msgs: + messages.error(request, sa_msg) + context['gcp_id'] = sa_dict['gcp_id'] - context['sa_datasets'] = sa_dict['sa_datasets'] - context['sa_id'] = sa_dict['sa_id'] + context['sa_dataset_ids'] = sa_dict['sa_dataset_ids'] + context['sa_name'] = sa_dict['sa_name'] + else: gcp_id = escape(request.GET.get('gcp_id')) crm_service = get_special_crm_resource() @@ -429,7 +634,7 @@ def register_sa(request, user_id): is_refresh = bool(request.POST.get('is_refresh') == 'true') is_adjust = bool(request.POST.get('is_adjust') == 'true') remove_all = bool(request.POST.get('remove_all') == 'true') - err_msgs = register_service_account(user_email, gcp_id, user_sa, datasets, is_refresh, is_adjust, remove_all) + err_msgs = register_service_account(user_email, user_id, gcp_id, user_sa, datasets, is_refresh, is_adjust, remove_all) for msg_tuple in err_msgs: if msg_tuple[1] == 'error': @@ -437,7 +642,7 @@ def register_sa(request, user_id): elif msg_tuple[1] == 'warning': messages.warning(request, msg_tuple[0]) else: - logger.error("[ERROR] Unimplemented message level: {}, {}".format(msg_tuple[1], msg_tuple[0])) + logger.info("[INFO] {}".format(msg_tuple)) return redirect('user_gcp_list', user_id=user_id) else: @@ -452,12 +657,13 @@ def register_sa(request, user_id): @login_required -def delete_sa(request, user_id, sa_id): +def delete_sa(request, user_id, sa_name): + # FIXME: No longer have an SA_ID to work with try: if request.POST: - unregister_sa_with_id(user_id, sa_id) + unregister_sa(user_id, sa_name) except Exception as e: - logger.error("[ERROR] While trying to unregister Service Account {}: ".format(sa_id)) + logger.error("[ERROR] While trying to unregister Service Account {}: ".format(sa_name)) logger.exception(e) messages.error(request, "Encountered an error while trying to remove this service account - please contact the administrator.") diff --git a/cohorts/metadata_helpers.py b/cohorts/metadata_helpers.py index 812b2575..ea06014d 100644 --- a/cohorts/metadata_helpers.py +++ b/cohorts/metadata_helpers.py @@ -755,8 +755,8 @@ def build_where_clause(filters, alt_key_map=False, program=None, for_files=False # If it's looking for None values elif value == 'None': - query_str += ' %s is null' % key - big_query_str += ' %s is null' % key + query_str += ' (%s is null or %s = "")' % (key, key) + big_query_str += ' (%s is null or %s = "")' % (key, key) # For the general case else: diff --git a/cohorts/views.py b/cohorts/views.py old mode 100755 new mode 100644 index baaf733e..97bac1b7 --- a/cohorts/views.py +++ b/cohorts/views.py @@ -1849,9 +1849,11 @@ def streaming_csv_view(request, cohort_id=0): # rows that can be handled by a single sheet in most spreadsheet # applications. rows = (["File listing for Cohort '{}', Build {}".format(cohort.name, build)],) - rows += (["Sample", "Program", "Platform", "Exp. Strategy", "Data Category", "Data Type", "Data Format", "Cloud Storage Location", "Access Type"],) + rows += (["Case", "Sample", "Program", "Platform", "Exp. 
Strategy", "Data Category", "Data Type", "Data Format", "Cloud Storage Location", "Access Type"],) for file in file_list: - rows += ([file['sample'], file['program'], file['platform'], file['exp_strat'], file['datacat'], file['datatype'], file['dataformat'], file['cloudstorage_location'], file['access'].replace("-", " ")],) + rows += ([file['case'], file['sample'], file['program'], file['platform'], file['exp_strat'], file['datacat'], + file['datatype'], file['dataformat'], file['cloudstorage_location'], + file['access'].replace("-", " ")],) pseudo_buffer = Echo() writer = csv.writer(pseudo_buffer) response = StreamingHttpResponse((writer.writerow(row) for row in rows), diff --git a/google_helpers/bigquery/bq_support.py b/google_helpers/bigquery/bq_support.py index b7d18a41..0117aefd 100644 --- a/google_helpers/bigquery/bq_support.py +++ b/google_helpers/bigquery/bq_support.py @@ -318,6 +318,7 @@ def execute_query(self, query, parameters=None, write_disposition='WRITE_EMPTY', if job_is_done and job_is_done['status']['state'] == 'DONE': if 'status' in job_is_done and 'errors' in job_is_done['status']: logger.error("[ERROR] During query job {}: {}".format(job_id, str(job_is_done['status']['errors']))) + logger.error("[ERROR] Error'd out query: {}".format(query)) else: logger.info("[STATUS] Query {} done, fetching results...".format(job_id)) query_results = self.fetch_job_results(query_job['jobReference']) @@ -325,6 +326,7 @@ def execute_query(self, query, parameters=None, write_disposition='WRITE_EMPTY', else: logger.error("[ERROR] Query took longer than the allowed time to execute--" + "if you check job ID {} manually you can wait for it to finish.".format(job_id)) + logger.error("[ERROR] Timed out query: {}".format(query)) if 'statistics' in job_is_done and 'query' in job_is_done['statistics'] and 'timeline' in \ job_is_done['statistics']['query']: @@ -361,6 +363,9 @@ def fetch_job_results(self, job_ref): return result + def fetch_job_resource(self, job_ref): + return self.bq_service.jobs().get(**job_ref).execute(num_retries=5) + # Execute a query to be saved on a temp table (shorthand to instance method above), optionally parameterized # and fetch its results @classmethod @@ -393,6 +398,12 @@ def get_job_results(cls, job_reference): bqs = cls(None, None, None) return bqs.fetch_job_results(job_reference) + # Given a BQ service and a job reference, fetch out the results + @classmethod + def get_job_resource(cls, job_id, project_id): + bqs = cls(None, None, None) + return bqs.fetch_job_resource({'jobId': job_id, 'projectId': project_id}) + # Builds a BQ API v2 QueryParameter set and WHERE clause string from a set of filters of the form: # { # 'field_name': [,...] diff --git a/google_helpers/bigquery/cohort_support.py b/google_helpers/bigquery/cohort_support.py index 7364e5b2..16d3a392 100644 --- a/google_helpers/bigquery/cohort_support.py +++ b/google_helpers/bigquery/cohort_support.py @@ -35,6 +35,7 @@ 'staging': 'staging_cohorts' } + class BigQueryCohortSupport(BigQuerySupport): def __init__(self, project_id, dataset_id, table_id): diff --git a/google_helpers/bigquery/gcs_path_support.py b/google_helpers/bigquery/gcs_path_support.py new file mode 100644 index 00000000..ca4096a2 --- /dev/null +++ b/google_helpers/bigquery/gcs_path_support.py @@ -0,0 +1,105 @@ +""" + +Copyright 2018, Institute for Systems Biology + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +""" + +import logging +from django.conf import settings +from bq_support import BigQuerySupport + +logger = logging.getLogger('main_logger') + +MAX_INSERT = settings.MAX_BQ_INSERT + +TEMP_PATH_SCHEMA = { + 'fields': [ + { + 'name': 'file_gdc_id', + 'type': 'STRING', + 'mode': 'REQUIRED' + }, { + 'name': 'case_barcode', + 'type': 'STRING', + 'mode': 'REQUIRED' + }, { + 'name': 'sample_barcode', + 'type': 'STRING' + }, { + 'name': 'gdc_case_uuid', + 'type': 'STRING' + }, { + 'name': 'gdc_sample_uuid', + 'type': 'STRING' + }, { + 'name': 'file_gcs_path', + 'type': 'STRING', + 'mode': 'REQUIRED' + } + ] +} + + +class BigQueryGcsPathSupport(BigQuerySupport): + + def __init__(self, project_id, dataset_id, table_id): + super(BigQueryGcsPathSupport, self).__init__(project_id, dataset_id, table_id, table_schema=TEMP_PATH_SCHEMA) + + def _build_row(self, file_gdc_id, case_barcode, sample_barcode, case_gdc_id, sample_gdc_id, gcs_path): + return { + 'file_gdc_id': file_gdc_id, + 'case_barcode': case_barcode, + 'sample_barcode': sample_barcode, + 'gdc_case_uuid': case_gdc_id, + 'gdc_sample_uuid': sample_gdc_id, + 'file_gcs_path': gcs_path + } + + # Create the path table and optionally insert a set of rows + def add_temp_path_table(self, paths=None): + + response = self._confirm_dataset_and_table( + "Temporary metadata_data GCS path table for {}, Build {}".format( + self.table_id.split('_')[0].upper(), self.table_id.split('_')[1].upper(), + ) + ) + + if 'status' in response and response['status'] == 'TABLE_MADE': + if paths: + rows = [] + for gdc_file_id in paths: + rows.append(self._build_row( + gdc_file_id, paths[gdc_file_id]['case_barcode'], paths[gdc_file_id]['sample_barcode'], + paths[gdc_file_id]['case_gdc_id'], paths[gdc_file_id]['sample_gdc_id'], paths[gdc_file_id]['gcs_path']) + ) + + response = self._streaming_insert(rows) + else: + logger.warn("[WARNING] Table {} was not successfully made!".format(self.table_id)) + + return response + + # Add rows to the GCS path table + def add_rows(self, paths): + rows = [] + for gdc_file_id in paths: + rows.append(self._build_row( + gdc_file_id, paths[gdc_file_id]['case_barcode'], paths[gdc_file_id]['sample_barcode'], + paths[gdc_file_id]['case_gdc_id'], paths[gdc_file_id]['sample_gdc_id'], paths[gdc_file_id]['gcs_path']) + ) + + response = self._streaming_insert(rows) + + return response
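+
+# Example usage (illustrative only -- the project/dataset/table ids and the
+# path values here are placeholders, not real configuration):
+#
+#   path_support = BigQueryGcsPathSupport('my-project', 'gdc_metadata', 'tcga_hg19_paths')
+#   path_support.add_temp_path_table(paths={
+#       '<file-gdc-uuid>': {'case_barcode': 'TCGA-AB-1234',
+#                           'sample_barcode': 'TCGA-AB-1234-01A',
+#                           'case_gdc_id': '<case-uuid>',
+#                           'sample_gdc_id': '<sample-uuid>',
+#                           'gcs_path': 'gs://some-bucket/path/to/file.bam'}})
+#   # ...later, stream additional rows into the same table:
+#   path_support.add_rows(paths={...})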