This repository has been archived by the owner on Sep 3, 2022. It is now read-only.

Commit

Set up script for Composer and support for getting gcsDagLocation from Composer environment (#585)

* Bash script for setting up Composer

* Parameterizing more stuff

* Removing commented out commands and adding default values for the parameters

* Finishing up the Composer setup script and starting the airflow setup script

* Re-organizing some of the comments

* Support for fetching gcs dag location from Composer environment

* Add verification of bucket name

* Minor refactoring, and added comments

* Unit-tests for Composer.gcs_dag_location

* Add stop-gap contrib imports to enable e2e

* Add comments to release script

* Remove timeoutMs from call to Composer API; this arg is invalid

* Fix flake8 errors

* Correct wrong index for shell script arguments

* Refactor all gcs_dag_location checks into the property.

* Disallow trailing slash in gcs_dag_location from Composer

* Allow trailing slash in gcs dag location
rajivpb authored Nov 10, 2017
1 parent 7a472e9 commit 31a9a64
Showing 9 changed files with 224 additions and 30 deletions.
11 changes: 10 additions & 1 deletion google/datalab/contrib/bigquery/commands/_bigquery.py
@@ -12,7 +12,6 @@

"""Google Cloud Platform library - BigQuery IPython Functionality."""
from builtins import str
-import google
import google.datalab.utils as utils


@@ -21,6 +20,10 @@ def _create_pipeline_subparser(parser):
'transform data using BigQuery.')
pipeline_parser.add_argument('-n', '--name', type=str, help='BigQuery pipeline name',
required=True)
pipeline_parser.add_argument('-e', '--environment', type=str,
help='The name of the Composer or Airflow environment.')
pipeline_parser.add_argument('-z', '--zone', type=str,
help='The name of the Composer or Airflow zone.')
return pipeline_parser


@@ -44,11 +47,17 @@ def _pipeline_cell(args, cell_body):
bq_pipeline_config = utils.commands.parse_config(
cell_body, utils.commands.notebook_environment())
pipeline_spec = _get_pipeline_spec_from_config(bq_pipeline_config)
# TODO(rajivpb): This import is a stop-gap for
# https://github.com/googledatalab/pydatalab/issues/593
import google.datalab.contrib.pipeline._pipeline
pipeline = google.datalab.contrib.pipeline._pipeline.Pipeline(name, pipeline_spec)
utils.commands.notebook_environment()[name] = pipeline

# If a Composer environment and zone are specified, we deploy to Composer
if args.get('environment') and args.get('zone'):
# TODO(rajivpb): This import is a stop-gap for
# https://github.com/googledatalab/pydatalab/issues/593
import google.datalab.contrib.pipeline.composer._composer
composer = google.datalab.contrib.pipeline.composer._composer.Composer(
args.get('zone'), args.get('environment'))
composer.deploy(name, pipeline._get_airflow_spec())
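For context, a minimal sketch of the deploy path the cell magic now takes, written against the Composer class this commit adds; the zone, environment, and pipeline names here are hypothetical:

import google.datalab.contrib.pipeline.composer._composer as composer_module

# Identify the (hypothetical) Composer environment by zone and name.
composer = composer_module.Composer('us-central1', 'my-composer-env')
# deploy() uploads the DAG source to <gcsDagLocation>/my_pipeline.py.
composer.deploy('my_pipeline', 'dag_source_as_string')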
15 changes: 15 additions & 0 deletions google/datalab/contrib/pipeline/airflow/setup.sh
@@ -0,0 +1,15 @@
#!/usr/bin/env bash
PROJECT=${1:-cloud-ml-dev}
EMAIL=${2:-rajivpb@google.com}
ZONE=${3:-us-central1}
ENVIRONMENT=${4:-rajivpb-airflow}

gcloud config set project $PROJECT
gcloud config set account $EMAIL
gcloud auth login --activate $EMAIL

# We use the default cluster spec.
gcloud container clusters create $ENVIRONMENT --zone $ZONE

# Deploying the airflow container
kubectl run airflow --image=gcr.io/cloud-airflow-releaser/airflow-worker-scheduler-1.8
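A hypothetical invocation, relying on the positional defaults above: bash setup.sh my-project user@example.com us-central1 my-airflow-cluster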
12 changes: 3 additions & 9 deletions google/datalab/contrib/pipeline/composer/_api.py
@@ -20,9 +20,8 @@ class Api(object):
_ENDPOINT = 'https://composer.googleapis.com/v1alpha1'
_ENVIRONMENTS_PATH = '/projects/%s/locations/%s/environments/%s'

-  _DEFAULT_TIMEOUT = 60000
-
-  def environment_details_get(self, zone, environment):
+  @staticmethod
+  def environment_details_get(zone, environment):
""" Issues a request to load data from GCS to a BQ table
Args:
@@ -36,9 +35,4 @@ def environment_details_get(self, zone, environment):
default_context = google.datalab.Context.default()
url = Api._ENDPOINT + (Api._ENVIRONMENTS_PATH % (default_context.project_id, zone, environment))

-    args = {
-        'timeoutMs': Api._DEFAULT_TIMEOUT,
-    }
-
-    return google.datalab.utils.Http.request(url, args=args,
-                                             credentials=default_context.credentials)
+    return google.datalab.utils.Http.request(url, credentials=default_context.credentials)
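For reference, a short sketch of how the request URL is assembled from the constants above; the project, zone, and environment values are hypothetical:

# Hypothetical values; mirrors Api._ENDPOINT and Api._ENVIRONMENTS_PATH.
endpoint = 'https://composer.googleapis.com/v1alpha1'
path = '/projects/%s/locations/%s/environments/%s' % ('my-project', 'us-central1', 'my-env')
url = endpoint + path
# url == 'https://composer.googleapis.com/v1alpha1/projects/my-project/locations/us-central1/environments/my-env'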
39 changes: 28 additions & 11 deletions google/datalab/contrib/pipeline/composer/_composer.py
@@ -11,6 +11,8 @@
# the License.

import google.cloud.storage as gcs
import re
from google.datalab.contrib.pipeline.composer._api import Api


class Composer(object):
@@ -20,6 +22,8 @@ class Composer(object):
This object can be used to generate the python airflow spec.
"""

gcs_file_regexp = re.compile('gs://.*')

def __init__(self, zone, environment):
""" Initializes an instance of a Composer object.
@@ -29,22 +33,35 @@ def __init__(self, zone, environment):
"""
self._zone = zone
self._environment = environment
self._gcs_dag_location = None

def deploy(self, name, dag_string):
client = gcs.Client()
-    bucket = client.get_bucket(self.bucket_name)
-    filename = 'dags/{0}.py'.format(name)
-    blob = gcs.Blob(filename, bucket)
+    _, _, bucket_name, file_path = self.gcs_dag_location.split('/', 3)  # setting maxsplit to 3
+    file_name = '{0}{1}.py'.format(file_path, name)
+
+    bucket = client.get_bucket(bucket_name)
+    blob = gcs.Blob(file_name, bucket)
blob.upload_from_string(dag_string)

   @property
-  def bucket_name(self):
-    # TODO(rajivpb): Get this programmatically from the Composer API
-    return 'airflow-staging-test36490808-bucket'
-
-  @property
-  def get_bucket_name(self):
-    # environment_details = Api().environment_details_get(self._zone, self._environment)
-    # TODO(rajivpb): Get this programmatically from the Composer API
-    return 'airflow-staging-test36490808-bucket'
+  def gcs_dag_location(self):
+    if not self._gcs_dag_location:
+      environment_details = Api.environment_details_get(self._zone, self._environment)
+
+      if ('config' not in environment_details or
+          'gcsDagLocation' not in environment_details.get('config')):
+        raise ValueError('Dag location unavailable from Composer environment {0}'.format(
+            self._environment))
+      gcs_dag_location = environment_details['config']['gcsDagLocation']
+
+      if gcs_dag_location is None or not self.gcs_file_regexp.match(gcs_dag_location):
+        raise ValueError(
+            'Dag location {0} from Composer environment {1} is in incorrect format'.format(
+                gcs_dag_location, self._environment))
+
+      self._gcs_dag_location = gcs_dag_location
+      if gcs_dag_location.endswith('/') is False:
+        self._gcs_dag_location = self._gcs_dag_location + '/'
+
+    return self._gcs_dag_location
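To make the parsing concrete, a small sketch of the maxsplit-3 split that deploy relies on; the bucket and path values are hypothetical, and the property guarantees the trailing slash:

_, _, bucket_name, file_path = 'gs://foo_bucket/foo_random/dags/'.split('/', 3)
# bucket_name == 'foo_bucket', file_path == 'foo_random/dags/'
# so the uploaded blob name becomes 'foo_random/dags/foo_name.py'

_, _, bucket_name, file_path = 'gs://foo_bucket/'.split('/', 3)
# bucket_name == 'foo_bucket', file_path == '' -> blob name 'foo_name.py'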
37 changes: 37 additions & 0 deletions google/datalab/contrib/pipeline/composer/setup.sh
@@ -0,0 +1,37 @@
#!/usr/bin/env bash
# We remove the local installation and install a version that allows custom repositories.
sudo apt-get -y remove google-cloud-sdk

# If an installation still exists after executing apt-get remove, try the following:
# gcloud info --format="value(installation.sdk_root)"
# that'll give you the installation directory of whichever installation gets executed
# you can remove that entire directory
# restart shell, rinse and repeat until gcloud is no longer on your path
# you might have to clean up your PATH in .bashrc and nuke the .config/gcloud directory

# Hopefully by now the machine is clean, so install gcloud
curl https://sdk.cloud.google.com | CLOUDSDK_CORE_DISABLE_PROMPTS=1 bash

# Recycling shell to pick up the new gcloud.
exec -l $SHELL
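# NOTE: exec replaces the current shell process, so in practice the commands
# below only run when this script is invoked a second time, after the new
# shell has started with the fresh gcloud on PATH.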

# These have to be set here and not on the top of the script because we recycle the shell somewhere
# between the start of this script and here.
PROJECT=${1:-cloud-ml-dev}
EMAIL=${2:-rajivpb@google.com}
ZONE=${3:-us-central1}
ENVIRONMENT=${4:-datalab-testing-1}

gcloud config set project $PROJECT
gcloud config set account $EMAIL
gcloud auth login $EMAIL

gcloud components repositories add https://storage.googleapis.com/composer-trusted-tester/components-2.json
gcloud components update -q
gcloud components install -q alpha kubectl

gcloud config set composer/location $ZONE
gcloud alpha composer environments create $ENVIRONMENT
gcloud alpha composer environments describe $ENVIRONMENT
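# As with the Airflow script, arguments are positional and optional; a
# hypothetical run: bash setup.sh my-project user@example.com us-central1 datalab-testing-1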


2 changes: 2 additions & 0 deletions release.sh
@@ -16,6 +16,7 @@
# Compiles the typescript sources to javascript and submits the files
# to the pypi server specified as first parameter, defaults to testpypi
# In order to run this script locally, make sure you have the following:
# - A Python 3 environment (due to urllib issues)
# - Typescript installed
# - A configured ~/.pypirc containing your pypi/testpypi credentials with
# the server names matching the name you're passing in. Do not include
@@ -28,6 +29,7 @@
tsc --module amd --noImplicitAny datalab/notebook/static/*.ts
tsc --module amd --noImplicitAny google/datalab/notebook/static/*.ts

# Provide https://upload.pypi.org/legacy/ for prod binaries
server="${1:-https://test.pypi.python.org/pypi}"
echo "Submitting package to ${server}"

9 changes: 8 additions & 1 deletion tests/bigquery/pipeline_tests.py
@@ -435,6 +435,7 @@ def test_get_execute_parameters(self, mock_notebook_item):

self.assertDictEqual(actual_execute_config, expected_execute_config)

@mock.patch('google.datalab.contrib.pipeline.composer._api.Api.environment_details_get')
@mock.patch('google.datalab.Context.default')
@mock.patch('google.datalab.utils.commands.notebook_environment')
@mock.patch('google.datalab.utils.commands.get_notebook_item')
@@ -445,7 +446,7 @@ def test_get_execute_parameters(self, mock_notebook_item):
@mock.patch('google.cloud.storage.Client.get_bucket')
def test_pipeline_cell_golden(self, mock_client_get_bucket, mock_client, mock_blob_class,
mock_get_table, mock_table_exists, mock_notebook_item,
-                                mock_environment, mock_default_context):
+                                mock_environment, mock_default_context, mock_composer_env):
table = google.datalab.bigquery.Table('project.test.table')
mock_get_table.return_value = table
mock_table_exists.return_value = True
@@ -454,6 +455,12 @@ def test_pipeline_cell_golden(self, mock_client_get_bucket, mock_client, mock_bl
mock_client_get_bucket.return_value = mock.Mock(spec=google.cloud.storage.Bucket)
mock_blob = mock_blob_class.return_value

mock_composer_env.return_value = {
'config': {
'gcsDagLocation': 'gs://foo_bucket/dags'
}
}

env = {
'endpoint': 'Interact2',
'job_id': '1234',
4 changes: 2 additions & 2 deletions tests/pipeline/composer_api_tests.py
@@ -48,10 +48,10 @@ def validate(self, mock_http_request, expected_url, expected_args=None, expected
@mock.patch('google.datalab.utils.Http.request')
def test_environment_details_get(self, mock_http_request, mock_context_default):
mock_context_default.return_value = TestCases._create_context()
     mock_context_default.return_value = TestCases._create_context()
-    Api().environment_details_get('ZONE', 'ENVIRONMENT')
+    Api.environment_details_get('ZONE', 'ENVIRONMENT')
     self.validate(mock_http_request,
                   'https://composer.googleapis.com/v1alpha1/projects/test_project/locations/ZONE/'
-                  'environments/ENVIRONMENT', expected_args={'timeoutMs': 60000})
+                  'environments/ENVIRONMENT')

@staticmethod
def _create_context():
125 changes: 119 additions & 6 deletions tests/pipeline/composer_tests.py
@@ -13,19 +13,132 @@
import unittest
import mock

import google.auth
import google.datalab.utils
from google.datalab.contrib.pipeline.composer._composer import Composer


class TestCases(unittest.TestCase):

-  @mock.patch('google.cloud.storage.Client')
+  @mock.patch('google.datalab.Context.default')
   @mock.patch('google.cloud.storage.Blob')
-  @mock.patch('google.cloud.storage.Client.get_bucket')
-  def test_deploy(self, mock_client_get_bucket, mock_blob_class, mock_client):
-    mock_client_get_bucket.return_value = mock.Mock(spec=google.cloud.storage.Bucket)
+  @mock.patch('google.cloud.storage.Client')
+  @mock.patch('google.datalab.contrib.pipeline.composer._api.Api.environment_details_get')
+  def test_deploy(self, mock_environment_details, mock_client, mock_blob_class,
+                  mock_default_context):
# Happy path
mock_environment_details.return_value = {
'config': {
'gcsDagLocation': 'gs://foo_bucket/dags'
}
}
test_composer = Composer('foo_zone', 'foo_environment')
test_composer.deploy('foo_name', 'foo_dag_string')
mock_client.return_value.get_bucket.assert_called_with('foo_bucket')
mock_blob_class.assert_called_with('dags/foo_name.py', mock.ANY)
mock_blob = mock_blob_class.return_value
mock_blob.upload_from_string.assert_called_with('foo_dag_string')

# Only bucket with no path
mock_environment_details.return_value = {
'config': {
'gcsDagLocation': 'gs://foo_bucket'
}
}
test_composer = Composer('foo_zone', 'foo_environment')
test_composer.deploy('foo_name', 'foo_dag_string')
mock_client.return_value.get_bucket.assert_called_with('foo_bucket')
mock_blob_class.assert_called_with('foo_name.py', mock.ANY)
mock_blob = mock_blob_class.return_value
mock_blob.upload_from_string.assert_called_with('foo_dag_string')

# GCS dag location has additional parts
mock_environment_details.return_value = {
'config': {
'gcsDagLocation': 'gs://foo_bucket/foo_random/dags'
}
}
test_composer = Composer('foo_zone', 'foo_environment')
test_composer.deploy('foo_name', 'foo_dag_string')
mock_client.return_value.get_bucket.assert_called_with('foo_bucket')
mock_blob_class.assert_called_with('foo_random/dags/foo_name.py', mock.ANY)
mock_blob = mock_blob_class.return_value
mock_blob.upload_from_string.assert_called_with('foo_dag_string')

@mock.patch('google.datalab.contrib.pipeline.composer._api.Api.environment_details_get')
def test_gcs_dag_location(self, mock_environment_details):
# Composer returns good result
mock_environment_details.return_value = {
'config': {
'gcsDagLocation': 'gs://foo_bucket/dags'
}
}
test_composer = Composer('foo_zone', 'foo_environment')
self.assertEqual('gs://foo_bucket/dags/', test_composer.gcs_dag_location)

mock_environment_details.return_value = {
'config': {
'gcsDagLocation': 'gs://foo_bucket' # only bucket
}
}
test_composer = Composer('foo_zone', 'foo_environment')
self.assertEqual('gs://foo_bucket/', test_composer.gcs_dag_location)

mock_environment_details.return_value = {
'config': {
'gcsDagLocation': 'gs://foo_bucket/' # with trailing slash
}
}
test_composer = Composer('foo_zone', 'foo_environment')
self.assertEqual('gs://foo_bucket/', test_composer.gcs_dag_location)

# Composer returns empty result
mock_environment_details.return_value = {}
test_composer = Composer('foo_zone', 'foo_environment')
with self.assertRaisesRegexp(
ValueError, 'Dag location unavailable from Composer environment foo_environment'):
test_composer.gcs_dag_location

# Composer returns empty result
mock_environment_details.return_value = {
'config': {}
}
test_composer = Composer('foo_zone', 'foo_environment')
with self.assertRaisesRegexp(
ValueError, 'Dag location unavailable from Composer environment foo_environment'):
test_composer.gcs_dag_location

# Composer returns None result
mock_environment_details.return_value = {
'config': {
'gcsDagLocation': None
}
}
test_composer = Composer('foo_zone', 'foo_environment')
with self.assertRaisesRegexp(
ValueError,
'Dag location None from Composer environment foo_environment is in incorrect format'):
test_composer.gcs_dag_location

# Composer returns incorrect formats
mock_environment_details.return_value = {
'config': {
'gcsDagLocation': 'gs:/foo_bucket'
}
}
test_composer = Composer('foo_zone', 'foo_environment')
with self.assertRaisesRegexp(
ValueError,
('Dag location gs:/foo_bucket from Composer environment foo_environment is in'
' incorrect format')):
test_composer.gcs_dag_location

mock_environment_details.return_value = {
'config': {
'gcsDagLocation': 'as://foo_bucket'
}
}
test_composer = Composer('foo_zone', 'foo_environment')
with self.assertRaisesRegexp(
ValueError,
('Dag location as://foo_bucket from Composer environment foo_environment is in'
' incorrect format')):
test_composer.gcs_dag_location
