From 31a9a649e116f5fd3e8e898b70b5f1e14ed12393 Mon Sep 17 00:00:00 2001
From: Rajiv Bharadwaja <4618540+rajivpb@users.noreply.github.com>
Date: Fri, 10 Nov 2017 13:04:14 -0800
Subject: [PATCH] Set up script for Composer and support for getting gcsDagLocation from Composer environment (#585)

* Bash script for setting up Composer
* Parameterizing more stuff
* Removing commented out commands and adding default values for the parameters
* Finishing up the Composer setup script and starting the airflow setup script
* Re-organizing some of the comments
* Support for fetching gcs dag location from Composer environment
* Add verification of bucket name
* Minor refactoring, and added comments
* Unit-tests for Composer.gcs_dag_location
* Add stop-gap contrib imports to enable e2e
* Add comments to release script
* Remove timeoutMs from call to Composer API; this arg is invalid
* Fix flake8 errors
* Correct wrong index for shell script arguments
* Refactor all gcs_dag_location checks into the property.
* Disallow trailing slash in gcs_dag_location from Composer
* Allow trailing slash in gcs dag location
---
 .../contrib/bigquery/commands/_bigquery.py    |  11 +-
 .../datalab/contrib/pipeline/airflow/setup.sh |  15 +++
 .../datalab/contrib/pipeline/composer/_api.py |  12 +-
 .../contrib/pipeline/composer/_composer.py    |  39 ++++--
 .../contrib/pipeline/composer/setup.sh        |  37 ++++++
 release.sh                                    |   2 +
 tests/bigquery/pipeline_tests.py              |   9 +-
 tests/pipeline/composer_api_tests.py          |   4 +-
 tests/pipeline/composer_tests.py              | 125 +++++++++++++++++-
 9 files changed, 224 insertions(+), 30 deletions(-)
 create mode 100755 google/datalab/contrib/pipeline/airflow/setup.sh
 create mode 100755 google/datalab/contrib/pipeline/composer/setup.sh

diff --git a/google/datalab/contrib/bigquery/commands/_bigquery.py b/google/datalab/contrib/bigquery/commands/_bigquery.py
index c3589e380..9ff5f49ad 100644
--- a/google/datalab/contrib/bigquery/commands/_bigquery.py
+++ b/google/datalab/contrib/bigquery/commands/_bigquery.py
@@ -12,7 +12,6 @@
 """Google Cloud Platform library - BigQuery IPython Functionality."""

 from builtins import str
-import google
 import google.datalab.utils as utils


@@ -21,6 +20,10 @@ def _create_pipeline_subparser(parser):
                                              'transform data using BigQuery.')
   pipeline_parser.add_argument('-n', '--name', type=str, help='BigQuery pipeline name',
                                required=True)
+  pipeline_parser.add_argument('-e', '--environment', type=str,
+                               help='The name of the Composer or Airflow environment.')
+  pipeline_parser.add_argument('-z', '--zone', type=str,
+                               help='The name of the Composer or Airflow zone.')
   return pipeline_parser


@@ -44,11 +47,17 @@ def _pipeline_cell(args, cell_body):
   bq_pipeline_config = utils.commands.parse_config(
       cell_body, utils.commands.notebook_environment())
   pipeline_spec = _get_pipeline_spec_from_config(bq_pipeline_config)
+  # TODO(rajivpb): This import is a stop-gap for
+  # https://github.com/googledatalab/pydatalab/issues/593
+  import google.datalab.contrib.pipeline._pipeline
   pipeline = google.datalab.contrib.pipeline._pipeline.Pipeline(name, pipeline_spec)
   utils.commands.notebook_environment()[name] = pipeline

   # If a composer environment and zone are specified, we deploy to composer
   if 'environment' in args and 'zone' in args:
+    # TODO(rajivpb): This import is a stop-gap for
+    # https://github.com/googledatalab/pydatalab/issues/593
+    import google.datalab.contrib.pipeline.composer._composer
     composer = google.datalab.contrib.pipeline.composer._composer.Composer(
         args.get('zone'), args.get('environment'))
     composer.deploy(name, pipeline._get_airflow_spec())
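With the new flags wired into the pipeline subparser, a notebook can ask for deployment directly from the pipeline cell magic. This is an illustrative sketch only: the pipeline name, environment, and zone values are hypothetical, and the cell body is the usual YAML pipeline configuration, elided here.

%%bq pipeline --name my_pipeline --environment my-composer-environment --zone us-central1
# (YAML pipeline configuration for the BigQuery pipeline goes in the cell body)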
diff --git a/google/datalab/contrib/pipeline/airflow/setup.sh b/google/datalab/contrib/pipeline/airflow/setup.sh
new file mode 100755
index 000000000..b663d4b7b
--- /dev/null
+++ b/google/datalab/contrib/pipeline/airflow/setup.sh
@@ -0,0 +1,15 @@
+#!/usr/bin/env bash
+PROJECT=${1:-cloud-ml-dev}
+EMAIL=${2:-rajivpb@google.com}
+ZONE=${3:-us-central1}
+ENVIRONMENT=${4:-rajivpb-airflow}
+
+gcloud config set project $PROJECT
+gcloud config set account $EMAIL
+gcloud auth login --activate $EMAIL
+
+# We use the default cluster spec.
+gcloud container clusters create $ENVIRONMENT --zone $ZONE
+
+# Deploying the airflow container
+kubectl run airflow --image=gcr.io/cloud-airflow-releaser/airflow-worker-scheduler-1.8
diff --git a/google/datalab/contrib/pipeline/composer/_api.py b/google/datalab/contrib/pipeline/composer/_api.py
index 487902fb5..0cd73febb 100644
--- a/google/datalab/contrib/pipeline/composer/_api.py
+++ b/google/datalab/contrib/pipeline/composer/_api.py
@@ -20,9 +20,8 @@ class Api(object):
   _ENDPOINT = 'https://composer.googleapis.com/v1alpha1'
   _ENVIRONMENTS_PATH = '/projects/%s/locations/%s/environments/%s'

-  _DEFAULT_TIMEOUT = 60000
-
-  def environment_details_get(self, zone, environment):
+  @staticmethod
+  def environment_details_get(zone, environment):
     """ Issues a request to load data from GCS to a BQ table

     Args:
@@ -36,9 +35,4 @@ def environment_details_get(self, zone, environment):
     default_context = google.datalab.Context.default()
     url = Api._ENDPOINT + (Api._ENVIRONMENTS_PATH % (default_context.project_id, zone,
                                                      environment))
-    args = {
-      'timeoutMs': Api._DEFAULT_TIMEOUT,
-    }
-
-    return google.datalab.utils.Http.request(url, args=args,
-                                             credentials=default_context.credentials)
+    return google.datalab.utils.Http.request(url, credentials=default_context.credentials)
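For reference, a minimal sketch of calling the now-static environment-details method and reading the field the rest of this change relies on; the zone and environment names are placeholders, and the response shape is assumed to match the dict mocked in the tests below.

from google.datalab.contrib.pipeline.composer._api import Api

# Hypothetical zone/environment; returns the environment resource as a dict.
details = Api.environment_details_get('us-central1', 'my-environment')
# Per the tests in this change, the DAG folder is reported under config.gcsDagLocation,
# e.g. 'gs://<some-bucket>/dags'.
gcs_dag_location = details.get('config', {}).get('gcsDagLocation')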
diff --git a/google/datalab/contrib/pipeline/composer/_composer.py b/google/datalab/contrib/pipeline/composer/_composer.py
index ab4cc323b..5c5c8c88e 100644
--- a/google/datalab/contrib/pipeline/composer/_composer.py
+++ b/google/datalab/contrib/pipeline/composer/_composer.py
@@ -11,6 +11,8 @@
 # the License.

 import google.cloud.storage as gcs
+import re
+from google.datalab.contrib.pipeline.composer._api import Api


 class Composer(object):
@@ -20,6 +22,8 @@ class Composer(object):

   This object can be used to generate the python airflow spec.
   """

+  gcs_file_regexp = re.compile('gs://.*')
+
   def __init__(self, zone, environment):
     """ Initializes an instance of a Composer object.
@@ -29,22 +33,35 @@ def __init__(self, zone, environment):
     """
     self._zone = zone
     self._environment = environment
+    self._gcs_dag_location = None

   def deploy(self, name, dag_string):
     client = gcs.Client()
-    bucket = client.get_bucket(self.bucket_name)
-    filename = 'dags/{0}.py'.format(name)
-    blob = gcs.Blob(filename, bucket)
+    _, _, bucket_name, file_path = self.gcs_dag_location.split('/', 3)  # setting maxsplit to 3
+    file_name = '{0}{1}.py'.format(file_path, name)
+
+    bucket = client.get_bucket(bucket_name)
+    blob = gcs.Blob(file_name, bucket)
     blob.upload_from_string(dag_string)

   @property
-  def bucket_name(self):
-    # TODO(rajivpb): Get this programmatically from the Composer API
-    return 'airflow-staging-test36490808-bucket'
+  def gcs_dag_location(self):
+    if not self._gcs_dag_location:
+      environment_details = Api.environment_details_get(self._zone, self._environment)

-  @property
-  def get_bucket_name(self):
-    # environment_details = Api().environment_details_get(self._zone, self._environment)
+      if ('config' not in environment_details or
+          'gcsDagLocation' not in environment_details.get('config')):
+        raise ValueError('Dag location unavailable from Composer environment {0}'.format(
+          self._environment))
+      gcs_dag_location = environment_details['config']['gcsDagLocation']
+
+      if gcs_dag_location is None or not self.gcs_file_regexp.match(gcs_dag_location):
+        raise ValueError(
+          'Dag location {0} from Composer environment {1} is in incorrect format'.format(
+            gcs_dag_location, self._environment))
+
+      self._gcs_dag_location = gcs_dag_location
+      if gcs_dag_location.endswith('/') is False:
+        self._gcs_dag_location = self._gcs_dag_location + '/'

-    # TODO(rajivpb): Get this programmatically from the Composer API
-    return 'airflow-staging-test36490808-bucket'
+    return self._gcs_dag_location
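To make the new upload path concrete, here is a short worked example (not part of the patch) of how deploy() derives the blob name once gcs_dag_location has been normalized to end with a slash; the values mirror the unit tests below.

# Sketch of the split('/', 3) logic used in deploy() above.
gcs_dag_location = 'gs://foo_bucket/foo_random/dags/'
_, _, bucket_name, file_path = gcs_dag_location.split('/', 3)
# bucket_name == 'foo_bucket'; file_path == 'foo_random/dags/'
file_name = '{0}{1}.py'.format(file_path, 'foo_name')
# file_name == 'foo_random/dags/foo_name.py', the blob uploaded to the bucket.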
diff --git a/google/datalab/contrib/pipeline/composer/setup.sh b/google/datalab/contrib/pipeline/composer/setup.sh
new file mode 100755
index 000000000..31e54a643
--- /dev/null
+++ b/google/datalab/contrib/pipeline/composer/setup.sh
@@ -0,0 +1,37 @@
+#!/usr/bin/env bash
+# We remove the local installation and install a version that allows custom repositories.
+sudo apt-get -y remove google-cloud-sdk
+
+# If an installation still exists after executing apt-get remove, try the following:
+# gcloud info --format="value(installation.sdk_root)"
+# that'll give you the installation directory of whichever installation gets executed
+# you can remove that entire directory
+# restart shell, rinse and repeat until gcloud is no longer on your path
+# you might have to clean up your PATH in .bashrc and nuke the .config/gcloud directory
+
+# Hopefully by now the machine is clean, so install gcloud
+curl https://sdk.cloud.google.com | CLOUDSDK_CORE_DISABLE_PROMPTS=1 bash
+
+# Recycling shell to pick up the new gcloud.
+exec -l $SHELL
+
+# These have to be set here and not on the top of the script because we recycle the shell somewhere
+# between the start of this script and here.
+PROJECT=${1:-cloud-ml-dev}
+EMAIL=${2:-rajivpb@google.com}
+ZONE=${3:-us-central1}
+ENVIRONMENT=${4:-datalab-testing-1}
+
+gcloud config set project $PROJECT
+gcloud config set account $EMAIL
+gcloud auth login $EMAIL
+
+gcloud components repositories add https://storage.googleapis.com/composer-trusted-tester/components-2.json
+gcloud components update -q
+gcloud components install -q alpha kubectl
+
+gcloud config set composer/location $ZONE
+gcloud alpha composer environments create $ENVIRONMENT
+gcloud alpha composer environments describe $ENVIRONMENT
+
+
diff --git a/release.sh b/release.sh
index d963a0cc8..0ec7f45f1 100755
--- a/release.sh
+++ b/release.sh
@@ -16,6 +16,7 @@
 # Compiles the typescript sources to javascript and submits the files
 # to the pypi server specified as first parameter, defaults to testpypi
 # In order to run this script locally, make sure you have the following:
+# - A Python 3 environment (due to urllib issues)
 # - Typescript installed
 # - A configured ~/.pypirc containing your pypi/testpypi credentials with
 #   the server names matching the name you're passing in. Do not include
@@ -28,6 +29,7 @@
 tsc --module amd --noImplicitAny datalab/notebook/static/*.ts
 tsc --module amd --noImplicitAny google/datalab/notebook/static/*.ts

+# Provide https://upload.pypi.org/legacy/ for prod binaries
 server="${1:-https://test.pypi.python.org/pypi}"
 echo "Submitting package to ${server}"

diff --git a/tests/bigquery/pipeline_tests.py b/tests/bigquery/pipeline_tests.py
index 1ef88ed0c..27edf2e82 100644
--- a/tests/bigquery/pipeline_tests.py
+++ b/tests/bigquery/pipeline_tests.py
@@ -435,6 +435,7 @@ def test_get_execute_parameters(self, mock_notebook_item):

     self.assertDictEqual(actual_execute_config, expected_execute_config)

+  @mock.patch('google.datalab.contrib.pipeline.composer._api.Api.environment_details_get')
   @mock.patch('google.datalab.Context.default')
   @mock.patch('google.datalab.utils.commands.notebook_environment')
   @mock.patch('google.datalab.utils.commands.get_notebook_item')
@@ -445,7 +446,7 @@ def test_get_execute_parameters(self, mock_notebook_item):
   @mock.patch('google.cloud.storage.Client.get_bucket')
   def test_pipeline_cell_golden(self, mock_client_get_bucket, mock_client, mock_blob_class,
                                 mock_get_table, mock_table_exists, mock_notebook_item,
-                                mock_environment, mock_default_context):
+                                mock_environment, mock_default_context, mock_composer_env):
     table = google.datalab.bigquery.Table('project.test.table')
     mock_get_table.return_value = table
     mock_table_exists.return_value = True
@@ -454,6 +455,12 @@ def test_pipeline_cell_golden(self, mock_client_get_bucket, mock_client, mock_bl
     mock_client_get_bucket.return_value = mock.Mock(spec=google.cloud.storage.Bucket)
     mock_blob = mock_blob_class.return_value

+    mock_composer_env.return_value = {
+      'config': {
+        'gcsDagLocation': 'gs://foo_bucket/dags'
+      }
+    }
+
     env = {
       'endpoint': 'Interact2',
       'job_id': '1234',
diff --git a/tests/pipeline/composer_api_tests.py b/tests/pipeline/composer_api_tests.py
index 93c782fd9..256a7046d 100644
--- a/tests/pipeline/composer_api_tests.py
+++ b/tests/pipeline/composer_api_tests.py
@@ -48,10 +48,10 @@ def validate(self, mock_http_request, expected_url, expected_args=None, expected
   @mock.patch('google.datalab.utils.Http.request')
   def test_environment_details_get(self, mock_http_request, mock_context_default):
     mock_context_default.return_value = TestCases._create_context()
-    Api().environment_details_get('ZONE', 'ENVIRONMENT')
+    Api.environment_details_get('ZONE', 'ENVIRONMENT')
     self.validate(mock_http_request,
                   'https://composer.googleapis.com/v1alpha1/projects/test_project/locations/ZONE/'
-                  'environments/ENVIRONMENT', expected_args={'timeoutMs': 60000})
+                  'environments/ENVIRONMENT')

   @staticmethod
   def _create_context():
diff --git a/tests/pipeline/composer_tests.py b/tests/pipeline/composer_tests.py
index 22d3dab71..113e89113 100644
--- a/tests/pipeline/composer_tests.py
+++ b/tests/pipeline/composer_tests.py
@@ -13,19 +13,132 @@
 import unittest
 import mock

-import google.auth
-import google.datalab.utils
 from google.datalab.contrib.pipeline.composer._composer import Composer


 class TestCases(unittest.TestCase):

-  @mock.patch('google.cloud.storage.Client')
+  @mock.patch('google.datalab.Context.default')
   @mock.patch('google.cloud.storage.Blob')
-  @mock.patch('google.cloud.storage.Client.get_bucket')
-  def test_deploy(self, mock_client_get_bucket, mock_blob_class, mock_client):
-    mock_client_get_bucket.return_value = mock.Mock(spec=google.cloud.storage.Bucket)
+  @mock.patch('google.cloud.storage.Client')
+  @mock.patch('google.datalab.contrib.pipeline.composer._api.Api.environment_details_get')
+  def test_deploy(self, mock_environment_details, mock_client, mock_blob_class,
+                  mock_default_context):
+    # Happy path
+    mock_environment_details.return_value = {
+      'config': {
+        'gcsDagLocation': 'gs://foo_bucket/dags'
+      }
+    }
+    test_composer = Composer('foo_zone', 'foo_environment')
+    test_composer.deploy('foo_name', 'foo_dag_string')
+    mock_client.return_value.get_bucket.assert_called_with('foo_bucket')
+    mock_blob_class.assert_called_with('dags/foo_name.py', mock.ANY)
+    mock_blob = mock_blob_class.return_value
+    mock_blob.upload_from_string.assert_called_with('foo_dag_string')
+
+    # Only bucket with no path
+    mock_environment_details.return_value = {
+      'config': {
+        'gcsDagLocation': 'gs://foo_bucket'
+      }
+    }
+    test_composer = Composer('foo_zone', 'foo_environment')
+    test_composer.deploy('foo_name', 'foo_dag_string')
+    mock_client.return_value.get_bucket.assert_called_with('foo_bucket')
+    mock_blob_class.assert_called_with('foo_name.py', mock.ANY)
     mock_blob = mock_blob_class.return_value
+    mock_blob.upload_from_string.assert_called_with('foo_dag_string')
+
+    # GCS dag location has additional parts
+    mock_environment_details.return_value = {
+      'config': {
+        'gcsDagLocation': 'gs://foo_bucket/foo_random/dags'
+      }
+    }
     test_composer = Composer('foo_zone', 'foo_environment')
     test_composer.deploy('foo_name', 'foo_dag_string')
+    mock_client.return_value.get_bucket.assert_called_with('foo_bucket')
+    mock_blob_class.assert_called_with('foo_random/dags/foo_name.py', mock.ANY)
+    mock_blob = mock_blob_class.return_value
     mock_blob.upload_from_string.assert_called_with('foo_dag_string')
+
+  @mock.patch('google.datalab.contrib.pipeline.composer._api.Api.environment_details_get')
+  def test_gcs_dag_location(self, mock_environment_details):
+    # Composer returns good result
+    mock_environment_details.return_value = {
+      'config': {
+        'gcsDagLocation': 'gs://foo_bucket/dags'
+      }
+    }
+    test_composer = Composer('foo_zone', 'foo_environment')
+    self.assertEqual('gs://foo_bucket/dags/', test_composer.gcs_dag_location)
+
+    mock_environment_details.return_value = {
+      'config': {
+        'gcsDagLocation': 'gs://foo_bucket'  # only bucket
+      }
+    }
+    test_composer = Composer('foo_zone', 'foo_environment')
+    self.assertEqual('gs://foo_bucket/', test_composer.gcs_dag_location)
+
+    mock_environment_details.return_value = {
+      'config': {
+        'gcsDagLocation': 'gs://foo_bucket/'  # with trailing slash
+      }
+    }
+    test_composer = Composer('foo_zone', 'foo_environment')
+    self.assertEqual('gs://foo_bucket/', test_composer.gcs_dag_location)
+
+    # Composer returns empty result
+    mock_environment_details.return_value = {}
+    test_composer = Composer('foo_zone', 'foo_environment')
+    with self.assertRaisesRegexp(
+        ValueError, 'Dag location unavailable from Composer environment foo_environment'):
+      test_composer.gcs_dag_location
+
+    # Composer returns empty result
+    mock_environment_details.return_value = {
+      'config': {}
+    }
+    test_composer = Composer('foo_zone', 'foo_environment')
+    with self.assertRaisesRegexp(
+        ValueError, 'Dag location unavailable from Composer environment foo_environment'):
+      test_composer.gcs_dag_location
+
+    # Composer returns None result
+    mock_environment_details.return_value = {
+      'config': {
+        'gcsDagLocation': None
+      }
+    }
+    test_composer = Composer('foo_zone', 'foo_environment')
+    with self.assertRaisesRegexp(
+        ValueError,
+        'Dag location None from Composer environment foo_environment is in incorrect format'):
+      test_composer.gcs_dag_location
+
+    # Composer returns incorrect formats
+    mock_environment_details.return_value = {
+      'config': {
+        'gcsDagLocation': 'gs:/foo_bucket'
+      }
+    }
+    test_composer = Composer('foo_zone', 'foo_environment')
+    with self.assertRaisesRegexp(
+        ValueError,
+        ('Dag location gs:/foo_bucket from Composer environment foo_environment is in'
+         ' incorrect format')):
+      test_composer.gcs_dag_location
+
+    mock_environment_details.return_value = {
+      'config': {
+        'gcsDagLocation': 'as://foo_bucket'
+      }
+    }
+    test_composer = Composer('foo_zone', 'foo_environment')
+    with self.assertRaisesRegexp(
+        ValueError,
+        ('Dag location as://foo_bucket from Composer environment foo_environment is in'
+         ' incorrect format')):
+      test_composer.gcs_dag_location
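As a closing illustration (not part of the patch), a caller can guard against the ValueError that the gcs_dag_location property now raises when the environment reports a missing or malformed gcsDagLocation; the zone, environment, and DAG source below are hypothetical.

from google.datalab.contrib.pipeline.composer._composer import Composer

composer = Composer('us-central1', 'my-environment')  # hypothetical values
dag_string = '# generated Airflow DAG source'  # placeholder DAG spec
try:
  composer.deploy('my_pipeline', dag_string)
except ValueError as e:
  # Raised when gcsDagLocation is missing or not a gs:// path.
  print('Could not deploy pipeline: {0}'.format(e))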