This repository has been archived by the owner on Sep 3, 2022. It is now read-only.

Set up script for Composer and support for getting gcsDagLocation from Composer environment #585

Merged: 21 commits, Nov 10, 2017
11 changes: 10 additions & 1 deletion google/datalab/contrib/bigquery/commands/_bigquery.py
@@ -12,7 +12,6 @@

"""Google Cloud Platform library - BigQuery IPython Functionality."""
from builtins import str
import google
import google.datalab.utils as utils


@@ -21,6 +20,10 @@ def _create_pipeline_subparser(parser):
'transform data using BigQuery.')
pipeline_parser.add_argument('-n', '--name', type=str, help='BigQuery pipeline name',
required=True)
pipeline_parser.add_argument('-e', '--environment', type=str,
help='The name of the Composer or Airflow environment.')
Contributor: It's only the name of the Composer environment, right? Not used in Airflow?

Contributor Author: You're right. At the time of writing the comment, I made it slightly future-looking, since we plan to work on an Airflow environment. Will change in a different PR, mostly because I'm trying to get this in before demo time today.

pipeline_parser.add_argument('-z', '--zone', type=str,
help='The name of the Composer or Airflow zone.')
return pipeline_parser
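
As an aside (not part of this PR), the end-to-end effect of the two new flags can be sketched with a self-contained argparse snippet; the 'pipeline' subcommand name and prog are assumptions made for illustration:

import argparse

# Hypothetical wiring that mirrors the hunk above.
parser = argparse.ArgumentParser(prog='bq')
subparsers = parser.add_subparsers()
pipeline_parser = subparsers.add_parser('pipeline')
pipeline_parser.add_argument('-n', '--name', type=str, required=True)
pipeline_parser.add_argument('-e', '--environment', type=str,
                             help='The name of the Composer or Airflow environment.')
pipeline_parser.add_argument('-z', '--zone', type=str,
                             help='The name of the Composer or Airflow zone.')

args = vars(parser.parse_args(
    ['pipeline', '-n', 'my_pipeline', '-e', 'my_env', '-z', 'us-central1']))
print(args['environment'], args['zone'])  # my_env us-central1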


@@ -44,11 +47,17 @@ def _pipeline_cell(args, cell_body):
bq_pipeline_config = utils.commands.parse_config(
cell_body, utils.commands.notebook_environment())
pipeline_spec = _get_pipeline_spec_from_config(bq_pipeline_config)
# TODO(rajivpb): This import is a stop-gap for
# https://github.com/googledatalab/pydatalab/issues/593
import google.datalab.contrib.pipeline._pipeline
pipeline = google.datalab.contrib.pipeline._pipeline.Pipeline(name, pipeline_spec)
utils.commands.notebook_environment()[name] = pipeline

# If a composer environment and zone are specified, we deploy to composer
if 'environment' in args and 'zone' in args:
Contributor: I think "'environment' in args and 'zone' in args" is always True. If the user does not provide values for them, they will be None, but 'environment' and 'zone' will still exist in args. Perhaps you want "if args['environment'] and args['zone']:".

Contributor Author: Oh, I didn't know that. Will verify and fix in a different PR, mostly because I'm trying to get this in before demo time today.
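
The reviewer's observation is easy to verify with a minimal sketch (flags assumed to mirror the hunk above): argparse adds every declared option to the parsed namespace, so a membership test is always True, and only a truthiness test distinguishes supplied values from defaults:

import argparse

parser = argparse.ArgumentParser()
parser.add_argument('-e', '--environment', type=str)
parser.add_argument('-z', '--zone', type=str)

args = vars(parser.parse_args([]))  # neither flag supplied
print('environment' in args and 'zone' in args)   # True: the keys always exist
print(bool(args['environment'] and args['zone']))  # False: both values are None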

# TODO(rajivpb): This import is a stop-gap for
# https://github.com/googledatalab/pydatalab/issues/593
import google.datalab.contrib.pipeline.composer._composer
composer = google.datalab.contrib.pipeline.composer._composer.Composer(
args.get('zone'), args.get('environment'))
composer.deploy(name, pipeline._get_airflow_spec())
15 changes: 15 additions & 0 deletions google/datalab/contrib/pipeline/airflow/setup.sh
@@ -0,0 +1,15 @@
#!/usr/bin/env bash
PROJECT=${1:-cloud-ml-dev}
EMAIL=${2:[email protected]}
ZONE=${3:-us-central1}
ENVIRONMENT=${3:-rajivpb-airflow}
Contributor: Should it be 4?

Contributor: Please fix.

Contributor Author: Correct; fixed now.

Contributor: I think you fixed the composer one but not the airflow one yet?

Contributor Author: Yeah, my bad. I kept thinking that all the comments referred to the composer's setup.sh and was a little confused about getting multiple comments for the same change. Will fix in a different PR, mostly because I'm trying to get this in before demo time today.


gcloud config set project $PROJECT
gcloud config set account $EMAIL
gcloud auth login --activate $EMAIL

# We use the default cluster spec.
gcloud container clusters create $ENVIRONMENT --zone $ZONE

# Deploying the airflow container
kubectl run airflow --image=gcr.io/cloud-airflow-releaser/airflow-worker-scheduler-1.8
12 changes: 3 additions & 9 deletions google/datalab/contrib/pipeline/composer/_api.py
@@ -20,9 +20,8 @@ class Api(object):
_ENDPOINT = 'https://composer.googleapis.com/v1alpha1'
_ENVIRONMENTS_PATH = '/projects/%s/locations/%s/environments/%s'

_DEFAULT_TIMEOUT = 60000

def environment_details_get(self, zone, environment):
@staticmethod
def environment_details_get(zone, environment):
""" Issues a request to load data from GCS to a BQ table

Args:
@@ -36,9 +35,4 @@ def environment_details_get(self, zone, environment):
default_context = google.datalab.Context.default()
url = Api._ENDPOINT + (Api._ENVIRONMENTS_PATH % (default_context.project_id, zone, environment))

args = {
'timeoutMs': Api._DEFAULT_TIMEOUT,
}

return google.datalab.utils.Http.request(url, args=args,
credentials=default_context.credentials)
return google.datalab.utils.Http.request(url, credentials=default_context.credentials)
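
For illustration, the URL the now-static method builds is just the endpoint plus the formatted environments path; the project, zone, and environment values below are made up:

# Mirrors the _ENDPOINT and _ENVIRONMENTS_PATH constants above.
endpoint = 'https://composer.googleapis.com/v1alpha1'
environments_path = '/projects/%s/locations/%s/environments/%s'

url = endpoint + (environments_path % ('my-project', 'us-central1', 'my-environment'))
print(url)
# https://composer.googleapis.com/v1alpha1/projects/my-project/locations/us-central1/environments/my-environment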
39 changes: 28 additions & 11 deletions google/datalab/contrib/pipeline/composer/_composer.py
@@ -11,6 +11,8 @@
# the License.

import google.cloud.storage as gcs
import re
from google.datalab.contrib.pipeline.composer._api import Api


class Composer(object):
@@ -20,6 +22,8 @@ class Composer(object):
This object can be used to generate the python airflow spec.
"""

gcs_file_regexp = re.compile('gs://.*')

def __init__(self, zone, environment):
""" Initializes an instance of a Composer object.

@@ -29,22 +33,35 @@ def __init__(self, zone, environment):
"""
self._zone = zone
self._environment = environment
self._gcs_dag_location = None

def deploy(self, name, dag_string):
client = gcs.Client()
bucket = client.get_bucket(self.bucket_name)
filename = 'dags/{0}.py'.format(name)
blob = gcs.Blob(filename, bucket)
_, _, bucket_name, file_path = self.gcs_dag_location.split('/', 3) # setting maxsplit to 3
file_name = '{0}{1}.py'.format(file_path, name)

bucket = client.get_bucket(bucket_name)
blob = gcs.Blob(file_name, bucket)
blob.upload_from_string(dag_string)

@property
def bucket_name(self):
# TODO(rajivpb): Get this programmatically from the Composer API
return 'airflow-staging-test36490808-bucket'
def gcs_dag_location(self):
if not self._gcs_dag_location:
environment_details = Api.environment_details_get(self._zone, self._environment)

@property
def get_bucket_name(self):
# environment_details = Api().environment_details_get(self._zone, self._environment)
if ('config' not in environment_details or
'gcsDagLocation' not in environment_details.get('config')):
raise ValueError('Dag location unavailable from Composer environment {0}'.format(
self._environment))
gcs_dag_location = environment_details['config']['gcsDagLocation']

if gcs_dag_location is None or not self.gcs_file_regexp.match(gcs_dag_location):
raise ValueError(
'Dag location {0} from Composer environment {1} is in incorrect format'.format(
gcs_dag_location, self._environment))

self._gcs_dag_location = gcs_dag_location
if gcs_dag_location.endswith('/') is False:
self._gcs_dag_location = self._gcs_dag_location + '/'

# TODO(rajivpb): Get this programmatically from the Composer API
return 'airflow-staging-test36490808-bucket'
return self._gcs_dag_location
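
The bucket and path handling in deploy() is easiest to see in isolation. A standalone sketch with illustrative values, assuming gcs_dag_location has already been normalized to end with '/' as the property above guarantees:

location = 'gs://foo_bucket/foo_random/dags/'
_, _, bucket_name, file_path = location.split('/', 3)  # maxsplit=3
print(bucket_name)  # foo_bucket
print(file_path)    # foo_random/dags/
print('{0}{1}.py'.format(file_path, 'foo_name'))  # foo_random/dags/foo_name.py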
37 changes: 37 additions & 0 deletions google/datalab/contrib/pipeline/composer/setup.sh
@@ -0,0 +1,37 @@
#!/usr/bin/env bash
# We remove the local installation and install a version that allows custom repositories.
sudo apt-get -y remove google-cloud-sdk

# If an installation still exists after executing apt-get remove, try the following:
# gcloud info --format="value(installation.sdk_root)"
# that'll give you the installation directory of whichever installation gets executed
# you can remove that entire directory
# restart shell, rinse and repeat until gcloud is no longer on your path
# you might have to clean up your PATH in .bashrc and nuke the .config/gcloud directory

# Hopefully by now the machine is clean, so install gcloud
curl https://sdk.cloud.google.com | CLOUDSDK_CORE_DISABLE_PROMPTS=1 bash

# Recycling shell to pick up the new gcloud.
exec -l $SHELL

# These have to be set here and not on the top of the script because we recycle the shell somewhere
# between the start of this script and here.
PROJECT=${1:-cloud-ml-dev}
EMAIL=${2:[email protected]}
ZONE=${3:-us-central1}
ENVIRONMENT=${4:-datalab-testing-1}

gcloud config set project $PROJECT
gcloud config set account $EMAIL
gcloud auth login $EMAIL

gcloud components repositories add https://storage.googleapis.com/composer-trusted-tester/components-2.json
gcloud components update -q
gcloud components install -q alpha kubectl

gcloud config set composer/location $ZONE
gcloud alpha composer environments create $ENVIRONMENT
gcloud alpha composer environments describe $ENVIRONMENT


2 changes: 2 additions & 0 deletions release.sh
@@ -16,6 +16,7 @@
# Compiles the typescript sources to javascript and submits the files
# to the pypi server specified as first parameter, defaults to testpypi
# In order to run this script locally, make sure you have the following:
# - A Python 3 environment (due to urllib issues)
Contributor: Is there an issue open to fix this for Py2?

# - Typescript installed
# - A configured ~/.pypirc containing your pypi/testpypi credentials with
# the server names matching the name you're passing in. Do not include
@@ -28,6 +29,7 @@
tsc --module amd --noImplicitAny datalab/notebook/static/*.ts
tsc --module amd --noImplicitAny google/datalab/notebook/static/*.ts

# Provide https://upload.pypi.org/legacy/ for prod binaries
server="${1:-https://test.pypi.python.org/pypi}"
echo "Submitting package to ${server}"

9 changes: 8 additions & 1 deletion tests/bigquery/pipeline_tests.py
@@ -435,6 +435,7 @@ def test_get_execute_parameters(self, mock_notebook_item):

self.assertDictEqual(actual_execute_config, expected_execute_config)

@mock.patch('google.datalab.contrib.pipeline.composer._api.Api.environment_details_get')
@mock.patch('google.datalab.Context.default')
@mock.patch('google.datalab.utils.commands.notebook_environment')
@mock.patch('google.datalab.utils.commands.get_notebook_item')
@@ -445,7 +446,7 @@ def test_get_execute_parameters(self, mock_notebook_item):
@mock.patch('google.cloud.storage.Client.get_bucket')
def test_pipeline_cell_golden(self, mock_client_get_bucket, mock_client, mock_blob_class,
mock_get_table, mock_table_exists, mock_notebook_item,
mock_environment, mock_default_context):
mock_environment, mock_default_context, mock_composer_env):
table = google.datalab.bigquery.Table('project.test.table')
mock_get_table.return_value = table
mock_table_exists.return_value = True
@@ -454,6 +455,12 @@ def test_pipeline_cell_golden(self, mock_client_get_bucket, mock_client, mock_bl
mock_client_get_bucket.return_value = mock.Mock(spec=google.cloud.storage.Bucket)
mock_blob = mock_blob_class.return_value

mock_composer_env.return_value = {
'config': {
'gcsDagLocation': 'gs://foo_bucket/dags'
}
}

env = {
'endpoint': 'Interact2',
'job_id': '1234',
4 changes: 2 additions & 2 deletions tests/pipeline/composer_api_tests.py
@@ -48,10 +48,10 @@ def validate(self, mock_http_request, expected_url, expected_args=None, expected
@mock.patch('google.datalab.utils.Http.request')
def test_environment_details_get(self, mock_http_request, mock_context_default):
mock_context_default.return_value = TestCases._create_context()
Api().environment_details_get('ZONE', 'ENVIRONMENT')
Api.environment_details_get('ZONE', 'ENVIRONMENT')
self.validate(mock_http_request,
'https://composer.googleapis.com/v1alpha1/projects/test_project/locations/ZONE/'
'environments/ENVIRONMENT', expected_args={'timeoutMs': 60000})
'environments/ENVIRONMENT')

@staticmethod
def _create_context():
125 changes: 119 additions & 6 deletions tests/pipeline/composer_tests.py
@@ -13,19 +13,132 @@
import unittest
import mock

import google.auth
import google.datalab.utils
from google.datalab.contrib.pipeline.composer._composer import Composer


class TestCases(unittest.TestCase):

@mock.patch('google.cloud.storage.Client')
@mock.patch('google.datalab.Context.default')
@mock.patch('google.cloud.storage.Blob')
@mock.patch('google.cloud.storage.Client.get_bucket')
def test_deploy(self, mock_client_get_bucket, mock_blob_class, mock_client):
mock_client_get_bucket.return_value = mock.Mock(spec=google.cloud.storage.Bucket)
@mock.patch('google.cloud.storage.Client')
@mock.patch('google.datalab.contrib.pipeline.composer._api.Api.environment_details_get')
def test_deploy(self, mock_environment_details, mock_client, mock_blob_class,
mock_default_context):
# Happy path
mock_environment_details.return_value = {
'config': {
'gcsDagLocation': 'gs://foo_bucket/dags'
}
}
test_composer = Composer('foo_zone', 'foo_environment')
test_composer.deploy('foo_name', 'foo_dag_string')
mock_client.return_value.get_bucket.assert_called_with('foo_bucket')
mock_blob_class.assert_called_with('dags/foo_name.py', mock.ANY)
mock_blob = mock_blob_class.return_value
mock_blob.upload_from_string.assert_called_with('foo_dag_string')

# Only bucket with no path
mock_environment_details.return_value = {
'config': {
'gcsDagLocation': 'gs://foo_bucket'
}
}
test_composer = Composer('foo_zone', 'foo_environment')
test_composer.deploy('foo_name', 'foo_dag_string')
mock_client.return_value.get_bucket.assert_called_with('foo_bucket')
mock_blob_class.assert_called_with('foo_name.py', mock.ANY)
mock_blob = mock_blob_class.return_value
mock_blob.upload_from_string.assert_called_with('foo_dag_string')

# GCS dag location has additional parts
mock_environment_details.return_value = {
'config': {
'gcsDagLocation': 'gs://foo_bucket/foo_random/dags'
}
}
test_composer = Composer('foo_zone', 'foo_environment')
test_composer.deploy('foo_name', 'foo_dag_string')
mock_client.return_value.get_bucket.assert_called_with('foo_bucket')
mock_blob_class.assert_called_with('foo_random/dags/foo_name.py', mock.ANY)
mock_blob = mock_blob_class.return_value
mock_blob.upload_from_string.assert_called_with('foo_dag_string')

@mock.patch('google.datalab.contrib.pipeline.composer._api.Api.environment_details_get')
def test_gcs_dag_location(self, mock_environment_details):
# Composer returns good result
mock_environment_details.return_value = {
'config': {
'gcsDagLocation': 'gs://foo_bucket/dags'
}
}
test_composer = Composer('foo_zone', 'foo_environment')
self.assertEqual('gs://foo_bucket/dags/', test_composer.gcs_dag_location)

mock_environment_details.return_value = {
'config': {
'gcsDagLocation': 'gs://foo_bucket' # only bucket
}
}
test_composer = Composer('foo_zone', 'foo_environment')
self.assertEqual('gs://foo_bucket/', test_composer.gcs_dag_location)

mock_environment_details.return_value = {
'config': {
'gcsDagLocation': 'gs://foo_bucket/' # with trailing slash
}
}
test_composer = Composer('foo_zone', 'foo_environment')
self.assertEqual('gs://foo_bucket/', test_composer.gcs_dag_location)

# Composer returns empty result
mock_environment_details.return_value = {}
test_composer = Composer('foo_zone', 'foo_environment')
with self.assertRaisesRegexp(
ValueError, 'Dag location unavailable from Composer environment foo_environment'):
test_composer.gcs_dag_location

# Composer returns empty result
mock_environment_details.return_value = {
'config': {}
}
test_composer = Composer('foo_zone', 'foo_environment')
with self.assertRaisesRegexp(
ValueError, 'Dag location unavailable from Composer environment foo_environment'):
test_composer.gcs_dag_location

# Composer returns None result
mock_environment_details.return_value = {
'config': {
'gcsDagLocation': None
}
}
test_composer = Composer('foo_zone', 'foo_environment')
with self.assertRaisesRegexp(
ValueError,
'Dag location None from Composer environment foo_environment is in incorrect format'):
test_composer.gcs_dag_location

# Composer returns incorrect formats
mock_environment_details.return_value = {
'config': {
'gcsDagLocation': 'gs:/foo_bucket'
}
}
test_composer = Composer('foo_zone', 'foo_environment')
with self.assertRaisesRegexp(
ValueError,
('Dag location gs:/foo_bucket from Composer environment foo_environment is in'
' incorrect format')):
test_composer.gcs_dag_location

mock_environment_details.return_value = {
'config': {
'gcsDagLocation': 'as://foo_bucket'
}
}
test_composer = Composer('foo_zone', 'foo_environment')
with self.assertRaisesRegexp(
ValueError,
('Dag location as://foo_bucket from Composer environment foo_environment is in'
' incorrect format')):
test_composer.gcs_dag_location
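
The format checks these tests exercise reduce to an anchored regexp match. A minimal sketch using the same 'gs://.*' pattern as the Composer class shows why the malformed locations raise:

import re

gcs_file_regexp = re.compile('gs://.*')  # same pattern as Composer.gcs_file_regexp
for location in ['gs://foo_bucket/dags', 'gs:/foo_bucket', 'as://foo_bucket', None]:
    ok = location is not None and bool(gcs_file_regexp.match(location))
    print(location, ok)
# Only 'gs://foo_bucket/dags' prints True; the other three trip the
# ValueError paths exercised above.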