diff --git a/.gitignore b/.gitignore index cc58c8aa..3c68561e 100644 --- a/.gitignore +++ b/.gitignore @@ -75,3 +75,6 @@ helpers/foundation-deployer/.steps.json # File to populate env vars used by Docker test runs .envrc + +# Ignore Jupyter notebook checkpoint directories +.ipynb_checkpoints/ diff --git a/1-org/envs/shared/ml_ops_org_policy.tf b/1-org/envs/shared/ml_ops_org_policy.tf index 96857918..c406df0c 100644 --- a/1-org/envs/shared/ml_ops_org_policy.tf +++ b/1-org/envs/shared/ml_ops_org_policy.tf @@ -27,25 +27,25 @@ locals { #Disable root access on new Vertex AI Workbench user-managed notebooks and instances #Control ID: VAI-CO-4.3 #NIST 800-53: AC-3 AC-17 AC-20 - #CRI Profile: PR.AC-3.1 PR.AC-3.2 PR.AC-4.1 PR.AC-4.2 PR.AC-4.3 PR.AC-6.1 PR.PT-3.1 PR.PT-4.1 + #CRI Profile: PR.AC-3.1 PR.AC-3.2 PR.AC-4.1 PR.AC-4.2 PR.AC-4.3 PR.AC-6.1 PR.PT-3.1 PR.PT-4.1 "ainotebooks.disableRootAccess", #Disable terminal on new Vertexx AI Workbench instances #Control ID: VAI-CO-4.4 #NIST 800-53: AC-3 AC-17 AC-20 - #CRI Profile: PR.AC-3.1 PR.AC-3.2 PR.AC-4.1 PR.AC-4.2 PR.AC-4.3 PR.AC-6.1 PR.PT-3.1 PR.PT-4.1 + #CRI Profile: PR.AC-3.1 PR.AC-3.2 PR.AC-4.1 PR.AC-4.2 PR.AC-4.3 PR.AC-6.1 PR.PT-3.1 PR.PT-4.1 "ainotebooks.disableTerminal", #Restrict public IP access on new Vertex AI Workbench notebooks and instances #Control ID: VAI-CO-4.7 #NIST 800-53: AC-3 AC-17 AC-20 - #CRI Profile: PR.AC-3.1 PR.AC-3.2 PR.AC-4.1 PR.AC-4.2 PR.AC-4.3 PR.AC-6.1 PR.PT-3.1 PR.PT-4.1 + #CRI Profile: PR.AC-3.1 PR.AC-3.2 PR.AC-4.1 PR.AC-4.2 PR.AC-4.3 PR.AC-6.1 PR.PT-3.1 PR.PT-4.1 "ainotebooks.restrictPublicIp", #Require automatic scheduled upgrades on new Vertex AI Workbench user-managed notebooks and instances #Control ID: VAI-CO-4.6 #NIST 800-53: AC-3 AC-17 AC-20 - #CRI Profile: PR.AC-3.1 PR.AC-3.2 PR.AC-4.1 PR.AC-4.2 PR.AC-4.3 PR.AC-6.1 PR.PT-3.1 PR.PT-4.1 + #CRI Profile: PR.AC-3.1 PR.AC-3.2 PR.AC-4.1 PR.AC-4.2 PR.AC-4.3 PR.AC-6.1 PR.PT-3.1 PR.PT-4.1 "ainotebooks.requireAutoUpgradeSchedule", #Require VPC Connector diff --git a/2-environments/README.md b/2-environments/README.md index 21ff168f..352088c5 100644 --- a/2-environments/README.md +++ b/2-environments/README.md @@ -147,7 +147,7 @@ Run `terraform output cloudbuild_project_id` in the `0-bootstrap` folder to get ### `N.B.` Read this before continuing further!! -A logging project will be created in every environment (`development`, `non-production`, `production`) when running this code. This project contains a storage bucket for the purposes of project logging within its respective environment. This requires the `cloud-storage-analytics@google.com` group permissions for the storage bucket. Since foundations has more restricted security measures, a domain restriction constraint is enforced. This restraint will prevent Google service accounts to be added to any permissions. In order for this terraform code to execute without error, manual intervention must be made to ensure everything applies without issue. +A logging project will be created in every environment (`development`, `non-production`, `production`) when running this code. This project contains a storage bucket for the purposes of project logging within its respective environment. This requires the `cloud-storage-analytics@google.com` group permissions for the storage bucket. Since the foundation has more restrictive security measures, a domain restriction constraint is enforced. This constraint prevents Google service accounts from being added to any permissions. 
In order for this terraform code to execute without error, manual intervention is required: you must disable the constraint in every folder that is about to be configured by terraform, push your code, and then apply the constraint again: #### Do this before you push development, non-production & production diff --git a/4-projects/.gitignore b/4-projects/.gitignore index 06b8566e..224b9f89 100644 --- a/4-projects/.gitignore +++ b/4-projects/.gitignore @@ -59,4 +59,4 @@ credentials.json # Ignore any tfplan files *.tfplan -**/*.tfplan \ No newline at end of file +**/*.tfplan diff --git a/4-projects/business_unit_3/development/locals.tf b/4-projects/business_unit_3/development/locals.tf index 8fed7637..1fa2b16a 100644 --- a/4-projects/business_unit_3/development/locals.tf +++ b/4-projects/business_unit_3/development/locals.tf @@ -1,3 +1,17 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# locals { repo_name = "bu3-composer" business_code = "bu3" diff --git a/4-projects/business_unit_3/development/versions.tf b/4-projects/business_unit_3/development/versions.tf index 34095aac..b48e9763 100644 --- a/4-projects/business_unit_3/development/versions.tf +++ b/4-projects/business_unit_3/development/versions.tf @@ -1,3 +1,17 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# terraform { required_version = ">= 0.13" diff --git a/4-projects/business_unit_3/non-production/locals.tf b/4-projects/business_unit_3/non-production/locals.tf index 8fed7637..1fa2b16a 100644 --- a/4-projects/business_unit_3/non-production/locals.tf +++ b/4-projects/business_unit_3/non-production/locals.tf @@ -1,3 +1,17 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# locals { repo_name = "bu3-composer" business_code = "bu3" diff --git a/4-projects/business_unit_3/production/locals.tf b/4-projects/business_unit_3/production/locals.tf index 8fed7637..1fa2b16a 100644 --- a/4-projects/business_unit_3/production/locals.tf +++ b/4-projects/business_unit_3/production/locals.tf @@ -1,3 +1,17 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# locals { repo_name = "bu3-composer" business_code = "bu3" diff --git a/4-projects/business_unit_3/shared/example_service_catalog.tf b/4-projects/business_unit_3/shared/example_service_catalog.tf index 4360333c..46cae7ab 100644 --- a/4-projects/business_unit_3/shared/example_service_catalog.tf +++ b/4-projects/business_unit_3/shared/example_service_catalog.tf @@ -69,7 +69,7 @@ resource "google_project_service_identity" "secretmanager_agent" { service = "secretmanager.googleapis.com" } -// Add Secret Manager Service Agent to key with encrypt/decrypt permissions +// Add Secret Manager Service Agent to key with encrypt/decrypt permissions resource "google_kms_crypto_key_iam_member" "secretmanager_agent" { for_each = module.app_service_catalog_project[0].kms_keys crypto_key_id = each.value.id diff --git a/4-projects/business_unit_3/shared/versions.tf b/4-projects/business_unit_3/shared/versions.tf index 34095aac..b48e9763 100644 --- a/4-projects/business_unit_3/shared/versions.tf +++ b/4-projects/business_unit_3/shared/versions.tf @@ -1,3 +1,17 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# terraform { required_version = ">= 0.13" diff --git a/4-projects/modules/composer_env/crypto.tf b/4-projects/modules/composer_env/crypto.tf index f910a7c7..7aedbf42 100644 --- a/4-projects/modules/composer_env/crypto.tf +++ b/4-projects/modules/composer_env/crypto.tf @@ -59,7 +59,7 @@ resource "google_kms_crypto_key_iam_member" "app_key" { member = "serviceAccount:${local.app_infra_pipeline_service_accounts[var.repo_name]}" } -// Add Secret Manager Service Agent to key with encrypt/decrypt permissions +// Add Secret Manager Service Agent to key with encrypt/decrypt permissions resource "google_kms_crypto_key_iam_binding" "secretmanager_agent" { for_each = module.app_cloudbuild_project.crypto_key crypto_key_id = each.value.id diff --git a/4-projects/modules/composer_env/main.tf b/4-projects/modules/composer_env/main.tf index 780abb6f..e83cf8cd 100644 --- a/4-projects/modules/composer_env/main.tf +++ b/4-projects/modules/composer_env/main.tf @@ -93,7 +93,7 @@ module "app_cloudbuild_project" { # member = "serviceAccount:${local.app_infra_pipeline_service_accounts[var.repo_name]}" # } -# // Add Secret Manager Service Agent to key with encrypt/decrypt permissions +# // Add Secret Manager Service Agent to key with encrypt/decrypt permissions # resource "google_kms_crypto_key_iam_member" "secretmanager_agent" { # for_each = module.app_cloudbuild_project.crypto_key # crypto_key_id = each.value.id diff --git a/4-projects/modules/composer_env/playground.py b/4-projects/modules/composer_env/playground.py index 738cfa8c..fcee1f43 100644 --- a/4-projects/modules/composer_env/playground.py +++ b/4-projects/modules/composer_env/playground.py @@ -1,16 +1,31 @@ - +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# sa_accounts = [ "sa1", "sa2", "sa3", - ] +] service_catalog_crypto_key = { "projects/prj-d-kms-cgvl/locations/us-central1/keyRings/sample-keyring": { "destroy_scheduled_duration": "86400s", - "id": "projects/prj-d-kms-cgvl/locations/us-central1/keyRings/sample-keyring/cryptoKeys/prj-d-bu3cmpsr-pipeln", + "id": ("projects/prj-d-kms-cgvl/locations/us-central1/keyRings" + "/sample-keyring/cryptoKeys/prj-d-bu3cmpsr-pipeln"), "import_only": False, - "key_ring": "projects/prj-d-kms-cgvl/locations/us-central1/keyRings/sample-keyring", + "key_ring": ("projects/prj-d-kms-cgvl/locations/us-central1" + "/keyRings/sample-keyring"), "labels": {}, "name": "prj-d-bu3cmpsr-pipeln", "purpose": "ENCRYPT_DECRYPT", @@ -26,9 +41,11 @@ }, "projects/prj-d-kms-cgvl/locations/us-east4/keyRings/sample-keyring": { "destroy_scheduled_duration": "86400s", - "id": "projects/prj-d-kms-cgvl/locations/us-east4/keyRings/sample-keyring/cryptoKeys/prj-d-bu3cmpsr-pipeln", + "id": ("projects/prj-d-kms-cgvl/locations/us-east4/keyRings" + "/sample-keyring/cryptoKeys/prj-d-bu3cmpsr-pipeln"), "import_only": False, - "key_ring": "projects/prj-d-kms-cgvl/locations/us-east4/keyRings/sample-keyring", + "key_ring": ("projects/prj-d-kms-cgvl/locations/us-east4" + "/keyRings/sample-keyring"), "labels": {}, "name": "prj-d-bu3cmpsr-pipeln", "purpose": "ENCRYPT_DECRYPT", @@ -51,4 +68,4 @@ result_list.append({"id": value["id"], "sa_account": sa}) # Print the result list -print(result_list) \ No newline at end of file +print(result_list) diff --git a/4-projects/modules/composer_env/versions.tf b/4-projects/modules/composer_env/versions.tf index 34095aac..b48e9763 100644 --- a/4-projects/modules/composer_env/versions.tf +++ b/4-projects/modules/composer_env/versions.tf @@ -1,3 +1,17 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# terraform { required_version = ">= 0.13" diff --git a/5-app-infra/0-gcp-policies/README.md b/5-app-infra/0-gcp-policies/README.md index c5b2277a..3995e7c9 100644 --- a/5-app-infra/0-gcp-policies/README.md +++ b/5-app-infra/0-gcp-policies/README.md @@ -87,4 +87,4 @@ Run `terraform output cloudbuild_project_id` in the `0-bootstrap` folder to get ```bash cd .. - ``` \ No newline at end of file + ``` diff --git a/5-app-infra/1-artifact-publish/README.md b/5-app-infra/1-artifact-publish/README.md index 8cf9fc2e..6d592a31 100644 --- a/5-app-infra/1-artifact-publish/README.md +++ b/5-app-infra/1-artifact-publish/README.md @@ -191,7 +191,7 @@ commands. The `-T` flag is needed for Linux, but causes problems for MacOS. echo "remote_state_bucket = ${remote_state_bucket}" sed -i "s/REMOTE_STATE_BUCKET/${remote_state_bucket}/" ./common.auto.tfvars ``` - + 1. Update `backend.tf` with your bucket from the infra pipeline output. 
```bash diff --git a/5-app-infra/2-artifact-publish-repo/README.md b/5-app-infra/2-artifact-publish-repo/README.md index 6d4f02f5..c08fc84f 100644 --- a/5-app-infra/2-artifact-publish-repo/README.md +++ b/5-app-infra/2-artifact-publish-repo/README.md @@ -64,9 +64,9 @@ commands. The `-T` flag is needed for Linux, but causes problems for MacOS. ```shell export ARTIFACT_PROJECT_ID=$(terraform -chdir="gcp-projects/business_unit_3/shared" output -raw common_artifacts_project_id) echo ${ARTIFACT_PROJECT_ID} - ``` + ``` -1. Clone the freshly minted Cloud Source Repository that was created for this project. +1. Clone the freshly minted Cloud Source Repository that was created for this project. ```shell gcloud source repos clone publish-artifacts --project=${ARTIFACT_PROJECT_ID} ``` diff --git a/5-app-infra/2-artifact-publish-repo/images/tf2-cpu.2-13:0.1/Dockerfile b/5-app-infra/2-artifact-publish-repo/images/tf2-cpu.2-13:0.1/Dockerfile index bab449ab..731a8e02 100644 --- a/5-app-infra/2-artifact-publish-repo/images/tf2-cpu.2-13:0.1/Dockerfile +++ b/5-app-infra/2-artifact-publish-repo/images/tf2-cpu.2-13:0.1/Dockerfile @@ -1,3 +1,17 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# FROM us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu.2-13:latest -RUN echo "Hello World" > helloworld.txt +RUN echo "Hello World" > helloworld.txt diff --git a/5-app-infra/2-artifact-publish-repo/images/tf2-cpu.2-8:01/Dockerfile b/5-app-infra/2-artifact-publish-repo/images/tf2-cpu.2-8:01/Dockerfile index 3628ba62..d1d7fc14 100644 --- a/5-app-infra/2-artifact-publish-repo/images/tf2-cpu.2-8:01/Dockerfile +++ b/5-app-infra/2-artifact-publish-repo/images/tf2-cpu.2-8:01/Dockerfile @@ -1,2 +1,16 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# FROM us-docker.pkg.dev/cloud-aiplatform/prediction/tf2-cpu.2-8:cmle_op_images_20240312_0210_RC00 diff --git a/5-app-infra/2-artifact-publish-repo/images/tf2-gpu.2-13:0.1/Dockerfile b/5-app-infra/2-artifact-publish-repo/images/tf2-gpu.2-13:0.1/Dockerfile index 3e442f59..d8015e53 100644 --- a/5-app-infra/2-artifact-publish-repo/images/tf2-gpu.2-13:0.1/Dockerfile +++ b/5-app-infra/2-artifact-publish-repo/images/tf2-gpu.2-13:0.1/Dockerfile @@ -1,2 +1,16 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# FROM us-docker.pkg.dev/vertex-ai/prediction/tf2-gpu.2-13:latest -RUN echo "Hello World" > helloworld.txt \ No newline at end of file +RUN echo "Hello World" > helloworld.txt diff --git a/5-app-infra/2-artifact-publish-repo/images/vertexpipeline:v2/Dockerfile b/5-app-infra/2-artifact-publish-repo/images/vertexpipeline:v2/Dockerfile index 31b1580b..b466e248 100644 --- a/5-app-infra/2-artifact-publish-repo/images/vertexpipeline:v2/Dockerfile +++ b/5-app-infra/2-artifact-publish-repo/images/vertexpipeline:v2/Dockerfile @@ -1,2 +1,16 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# FROM tensorflow/tensorflow:2.8.0 -RUN pip install tensorflow-io==0.25.0 protobuf==3.20.0 google-cloud-bigquery==3.13.0 pandas==2.0.3 db-dtypes==1.2.0 google-cloud-aiplatform==1.36.0 google-cloud-storage==2.14.0 kfp google-cloud-pipeline-components \ No newline at end of file +RUN pip install tensorflow-io==0.25.0 protobuf==3.20.0 google-cloud-bigquery==3.13.0 pandas==2.0.3 db-dtypes==1.2.0 google-cloud-aiplatform==1.36.0 google-cloud-storage==2.14.0 kfp google-cloud-pipeline-components diff --git a/5-app-infra/3-service-catalog/README.md b/5-app-infra/3-service-catalog/README.md index 76014d96..8333b28a 100644 --- a/5-app-infra/3-service-catalog/README.md +++ b/5-app-infra/3-service-catalog/README.md @@ -57,11 +57,11 @@ file. This project has two main purposes: 1. To deploy a pipeline and a bucket which is linked to a Google Cloud Repository that houses terraform modules for the use in Service Catalog. -Although Service Catalog itself must be manually deployed, the modules which will be used can still be automated. +Although Service Catalog itself must be manually deployed, the modules which will be used can still be automated. 2. To deploy infrastructure for operational environments (ie. `non-production` & `production`.) -The resoning behind utilizing one repository with two deployment methodologies is due to how close interactive (`development`) and operational environments are. +The resoning behind utilizing one repository with two deployment methodologies is due to how close interactive (`development`) and operational environments are. The repository has the structure (truncated for brevity): ``` @@ -100,7 +100,7 @@ The repository has the structure (truncated for brevity): │   ├── outputs.tf │   └── variables.tf ``` -Each folder under `modules` represents a terraform module. +Each folder under `modules` represents a terraform module. 
When there is a change in any of the terraform module folders, the pipeline will find whichever module has been changed since the last push, `tar.gz` that file and place it in a bucket for Service Catalog to access. This pipeline is listening to the `main` branch of this repository for changes in order for the modules to be uploaded to service catalog. diff --git a/5-app-infra/4-service-catalog-repo/README.md b/5-app-infra/4-service-catalog-repo/README.md index 1622030b..ebdacf3d 100644 --- a/5-app-infra/4-service-catalog-repo/README.md +++ b/5-app-infra/4-service-catalog-repo/README.md @@ -51,9 +51,9 @@ This repo provides a number of the [Google Service Catalog](https://cloud.google ```shell export SERVICE_CATALOG_PROJECT_ID=$(terraform -chdir="gcp-projects/business_unit_3/shared" output -raw service_catalog_project_id) echo ${SERVICE_CATALOG_PROJECT_ID} - ``` + ``` -1. Clone the freshly minted Cloud Source Repository that was created for this project. +1. Clone the freshly minted Cloud Source Repository that was created for this project. ```shell gcloud source repos clone service-catalog --project=${SERVICE_CATALOG_PROJECT_ID} ``` diff --git a/5-app-infra/4-service-catalog-repo/modules/bucket/README.md b/5-app-infra/4-service-catalog-repo/modules/bucket/README.md index f846d01a..e351f625 100644 --- a/5-app-infra/4-service-catalog-repo/modules/bucket/README.md +++ b/5-app-infra/4-service-catalog-repo/modules/bucket/README.md @@ -145,4 +145,5 @@ The following table outlines which of the suggested controls for Vertex Generati |------|-------------| | storage\_bucket | Storage Bucket. | - \ No newline at end of file + + diff --git a/5-app-infra/4-service-catalog-repo/modules/composer/terraform.tfvars.example b/5-app-infra/4-service-catalog-repo/modules/composer/terraform.tfvars.example index 1b5b4dd6..3d4db2c1 100644 --- a/5-app-infra/4-service-catalog-repo/modules/composer/terraform.tfvars.example +++ b/5-app-infra/4-service-catalog-repo/modules/composer/terraform.tfvars.example @@ -26,4 +26,4 @@ web_server_allowed_ip_ranges = [ github_name_prefix = "github-composer-cloudbuild" github_app_installation_id = "APP_INSTALATION_ID_HERE" github_api_token = "GITHUB_API_TOKEN_HERE" -github_remote_uri = "LINK_TO_GITHUB_REPO_CONTAINING_DAGS" \ No newline at end of file +github_remote_uri = "LINK_TO_GITHUB_REPO_CONTAINING_DAGS" diff --git a/5-app-infra/5-vpc-sc/README.md b/5-app-infra/5-vpc-sc/README.md index 86f92765..39aaa3ca 100644 --- a/5-app-infra/5-vpc-sc/README.md +++ b/5-app-infra/5-vpc-sc/README.md @@ -62,12 +62,12 @@ cd into gcp-networks ```bash cd ../gcp-networks ``` - + Below, you can find the values that will need to be applied to `common.auto.tfvars` and your `development.auto.tfvars`, `non-production.auto.tfvars` & `production.auto.tfvars`. 
In `common.auto.tfvars` update your `perimeter_additional_members` to include: ``` -"serviceAccount:sa-tf-cb-bu3-machine-learning@[prj-c-bu3infra-pipeline-project-id].iam.gserviceaccount.com" +"serviceAccount:sa-tf-cb-bu3-machine-learning@[prj-c-bu3infra-pipeline-project-id].iam.gserviceaccount.com" "serviceAccount:sa-terraform-env@[prj-b-seed-project-id].iam.gserviceaccount.com" "serviceAccount:service-[prj-d-logging-project-number]@gs-project-accounts.iam.gserviceaccount.com" "serviceAccount:[prj-d-machine-learning-project-number]@cloudbuild.gserviceaccount.com" @@ -76,7 +76,7 @@ In `common.auto.tfvars` update your `perimeter_additional_members` to include: In each respective environment folders, update your `development.auto.tfvars`, `non-production.auto.tfvars` & `production.auto.tfvars` to include these changes under `ingress_policies` -You can find the `sources.access_level` information by going to `Security` in your GCP Organization. +You can find the `sources.access_level` information by going to `Security` in your GCP Organization. Once there, select the perimeter that is associated with the environment (eg. `development`). Copy the string under Perimeter Name and place it under `YOUR_ACCESS_LEVEL` @@ -126,8 +126,8 @@ egress_policies = [ { "from" = { "identity_type" = "" - "identities" = [ - "serviceAccount:service-[prj-d-bu3machine-learning-project-number]@gcp-sa-notebooks.iam.gserviceaccount.com", + "identities" = [ + "serviceAccount:service-[prj-d-bu3machine-learning-project-number]@gcp-sa-notebooks.iam.gserviceaccount.com", "serviceAccount:service-[prj-d-bu3machine-learning-project-number]@compute-system.iam.gserviceaccount.com", ] }, @@ -144,4 +144,4 @@ egress_policies = [ } }, ] -``` \ No newline at end of file +``` diff --git a/5-app-infra/6-machine-learning/README.md b/5-app-infra/6-machine-learning/README.md index 9edf9a8b..ce5c33ff 100644 --- a/5-app-infra/6-machine-learning/README.md +++ b/5-app-infra/6-machine-learning/README.md @@ -88,7 +88,7 @@ Have a github token for access to your repository ready, along with an [Applicat These environmental project inflations are closely tied to the `service-catalog` project that have already deployed. By now, the `bu3-service-catalog` should have been inflated. `service-catalog` contains modules that are being deployed in an interactive (development) environment. Since they already exist; they can be used as terraform modules for operational (non-production, production) environments. This was done in order to avoid code redundancy. One area for all `machine-learning` deployments. -Under `modules/base_env/main.tf` you will notice all module calls are using `git` links as sources. These links refer to the `service-catalog` cloud source repository we have already set up. +Under `modules/base_env/main.tf` you will notice all module calls are using `git` links as sources. These links refer to the `service-catalog` cloud source repository we have already set up. Step 12 in "Deploying with Cloud Build" highlights the necessary steps needed to point the module resources to the correct location. @@ -201,7 +201,7 @@ Run `terraform output cloudbuild_project_id` in the `0-bootstrap` folder to get 1. 
Update `modules/base_env/main.tf` with the name of service catalog project id to complete the git fqdn for module sources: ```bash export service_catalog_project_id=$(terraform -chdir="../gcp-projects/business_unit_3/shared/" output -raw service_catalog_project_id) - + ##LINUX sed -i "s/SERVICE-CATALOG-PROJECT-ID/${service_catalog_project_id}/" ./modules/base_env/main.tf @@ -221,7 +221,7 @@ Run `terraform output cloudbuild_project_id` in the `0-bootstrap` folder to get github_token = "YOUR-GITHUB-TOKEN" for env in "${envs[@]}"; do - output=$(terraform -chdir="../gcp-projects/business_unit_3/${env}" output -raw machine_learning_project_id) + output=$(terraform -chdir="../gcp-projects/business_unit_3/${env}" output -raw machine_learning_project_id) project_ids+=("$output") done diff --git a/5-app-infra/7-machine-learning-post-deployment/README.md b/5-app-infra/7-machine-learning-post-deployment/README.md index 9e09d10c..24267c55 100644 --- a/5-app-infra/7-machine-learning-post-deployment/README.md +++ b/5-app-infra/7-machine-learning-post-deployment/README.md @@ -40,7 +40,7 @@ 1. Many of the necessary service agents and permissions were deployed in all project environments for machine-learning. Additional entries will be needed for each environment. -1. Add in more agents to the DEVELOPMENT.AUTO.TFVARS file under `egress_policies`. +1. Add in more agents to the DEVELOPMENT.AUTO.TFVARS file under `egress_policies`. Notably: * "serviceAccount:bq-[prj-d-bu3machine-learning-project-number]@bigquery-encryption.iam.gserviceaccount.com" @@ -52,10 +52,10 @@ Notably: { "from" = { "identity_type" = "" - "identities" = [ - "serviceAccount:bq-[prj-d-bu3machine-learning-project-number]@bigquery-encryption.iam.gserviceaccount.com" << New Addition - "serviceAccount:service-[prj-d-bu3machine-learning-project-number]@gcp-sa-notebooks.iam.gserviceaccount.com", - "serviceAccount:service-[prj-d-bu3machine-learning-project-number]@compute-system.iam.gserviceaccount.com", + "identities" = [ + "serviceAccount:bq-[prj-d-bu3machine-learning-project-number]@bigquery-encryption.iam.gserviceaccount.com" << New Addition + "serviceAccount:service-[prj-d-bu3machine-learning-project-number]@gcp-sa-notebooks.iam.gserviceaccount.com", + "serviceAccount:service-[prj-d-bu3machine-learning-project-number]@compute-system.iam.gserviceaccount.com", ] }, "to" = { @@ -100,7 +100,7 @@ Notably: ] }, "to" = { - "resources" = ["projects/[prj-n-bu3machine-learning-number]"] + "resources" = ["projects/[prj-n-bu3machine-learning-number]"] "operations" = { "compute.googleapis.com" = { "methods" = ["*"] diff --git a/5-app-infra/README.md b/5-app-infra/README.md index 19adc50f..b0b98c3a 100644 --- a/5-app-infra/README.md +++ b/5-app-infra/README.md @@ -55,13 +55,13 @@ file. ## Purpose -Folders `1-artifact-publish`, `3-serive-catalog` and `5-machine-learning` are projects that will be _expanded_ upon. In step 4, we have initiated the creation of these projects, enabled API's and assigned roles to various service accounts, service agents and cryptography keys that are needed for each respective project to operate successfully. Folders `2-artifact-publish-repo` and `4-service-catalog-repo` are seperate cloud build repositories that have their own unique piplelines configured. 
These are used for building out in-house Docker images for your machine-learning pipelines and terraform modules that will be used in `notebooks` in your interactive (development) environment, as well as deployment modules for your operational (non-production, production) environments respectively. +Folders `1-artifact-publish`, `3-service-catalog` and `5-machine-learning` are projects that will be _expanded_ upon. In step 4, we have initiated the creation of these projects, enabled APIs and assigned roles to various service accounts, service agents and cryptography keys that are needed for each respective project to operate successfully. Folders `2-artifact-publish-repo` and `4-service-catalog-repo` are separate cloud build repositories that have their own unique pipelines configured. These are used for building out in-house Docker images for your machine-learning pipelines and terraform modules that will be used in `notebooks` in your interactive (development) environment, as well as deployment modules for your operational (non-production, production) environments respectively. -For the purposes of this demonstration, we assume that you are using Cloud Build or manual deployment. +For the purposes of this demonstration, we assume that you are using Cloud Build or manual deployment. When viewing each folder under `projects`, consider them as seperate repositories which will be used to deploy out each respective project. In the case of using Cloud Build (which is what this example is primarily based on), each folder will be placed in its own GCP cloud source repository for deployment. There is a README placed in each project folder which will highlight the necessary steps to achieve deployment. -When deploying/expanding upon each project, you will find your Cloud Build pipelines being executed in `prj-c-bu3infra-pipeline`. +When deploying/expanding upon each project, you will find your Cloud Build pipelines being executed in `prj-c-bu3infra-pipeline`. The order of deployments for the machine-learning's project is as follows: @@ -77,8 +77,8 @@ The order of deployments for the machine-learning's project is as follows: ## VPC-SC Be aware that for the purposes of this machine learning project, there are several projects in each respective environment that have been placed within a `service perimeter`. -As such, during your deployment process, you _will_ encounter deployment errors related to VPC-SC violations. Before continuing onto `5-app-infra/projects`, you will need to go _back_ into `3-networks-dual-svpc` and _update_ -your ingress rules. +As such, during your deployment process, you _will_ encounter deployment errors related to VPC-SC violations. Before continuing onto `5-app-infra/projects`, you will need to go _back_ into `3-networks-dual-svpc` and _update_ +your ingress rules. Below, you can find the values that will need to be applied to `common.auto.tfvars` and your `development.auto.tfvars`, ###`non-production.auto.tfvars` & `production.auto.tfvars`. 
@@ -136,9 +136,9 @@ for your DEVELOPMENT.AUTO.TFVARS file, also include this as an egress policy: { "from" = { "identity_type" = "" - "identities" = [ - "serviceAccount:service-[prj-d-bu3machine-learning-project-number]@gcp-sa-notebooks.iam.gserviceaccount.com", - "serviceAccount:service-[prj-d-bu3machine-learning-project-number]@compute-system.iam.gserviceaccount.com", + "identities" = [ + "serviceAccount:service-[prj-d-bu3machine-learning-project-number]@gcp-sa-notebooks.iam.gserviceaccount.com", + "serviceAccount:service-[prj-d-bu3machine-learning-project-number]@compute-system.iam.gserviceaccount.com", ] }, "to" = { @@ -166,7 +166,7 @@ Please note that this will cover some but not ALL the policies that will be need "identities" = [] }, "to" = { - "resources" = ["projects/[some random google project id]"] + "resources" = ["projects/[some random google project id]"] "operations" = { "cloudbuild.googleapis.com" = { "methods" = ["*"] @@ -174,4 +174,4 @@ Please note that this will cover some but not ALL the policies that will be need } } }, - ``` \ No newline at end of file + ``` diff --git a/5-app-infra/projects/artifact-publish/README.md b/5-app-infra/projects/artifact-publish/README.md index c31be688..7e44405d 100644 --- a/5-app-infra/projects/artifact-publish/README.md +++ b/5-app-infra/projects/artifact-publish/README.md @@ -210,9 +210,9 @@ Run `terraform output cloudbuild_project_id` in the `0-bootstrap` folder to get ```shell export ARTIFACT_PROJECT_ID=$(terraform -chdir="gcp-projects/business_unit_3/shared" output -raw common_artifacts_project_id) echo ${ARTIFACT_PROJECT_ID} - ``` + ``` -1. Clone the freshly minted Cloud Source Repository that was created for this project. +1. Clone the freshly minted Cloud Source Repository that was created for this project. ```shell gcloud source repos clone publish-artifacts --project=${ARTIFACT_PROJECT_ID} ``` @@ -255,7 +255,7 @@ Run `terraform output cloudbuild_project_id` in the `0-bootstrap` folder to get echo "remote_state_bucket = ${remote_state_bucket}" sed -i "s/REMOTE_STATE_BUCKET/${remote_state_bucket}/" ./common.auto.tfvars ``` - + 1. Update `backend.tf` with your bucket from the infra pipeline output. ```bash diff --git a/5-app-infra/projects/machine-learning/README.md b/5-app-infra/projects/machine-learning/README.md index 23095e57..6475afac 100644 --- a/5-app-infra/projects/machine-learning/README.md +++ b/5-app-infra/projects/machine-learning/README.md @@ -89,7 +89,7 @@ Have a github token for access to your repository ready, along with an [Applicat These environmental project inflations are closely tied to the `service-catalog` project that have already deployed. By now, the `bu3-service-catalog` should have been inflated. `service-catalog` contains modules that are being deployed in an interactive (development) environment. Since they already exist; they can be used as terraform modules for operational (non-production, production) environments. This was done in order to avoid code redundancy. One area for all `machine-learning` deployments. -Under `modules/base_env/main.tf` you will notice all module calls are using `git` links as sources. These links refer to the `service-catalog` cloud source repository we have already set up. +Under `modules/base_env/main.tf` you will notice all module calls are using `git` links as sources. These links refer to the `service-catalog` cloud source repository we have already set up. 
Step 12 in "Deploying with Cloud Build" highlights the necessary steps needed to point the module resources to the correct location. @@ -169,7 +169,7 @@ Run `terraform output cloudbuild_project_id` in the `0-bootstrap` folder to get echo "remote_state_bucket = ${remote_state_bucket}" sed -i "s/REMOTE_STATE_BUCKET/${remote_state_bucket}/" ./common.auto.tfvars ``` - + 1. Update `backend.tf` with your bucket from the infra pipeline output. ```bash @@ -185,7 +185,7 @@ Run `terraform output cloudbuild_project_id` in the `0-bootstrap` folder to get 1. Update `modules/base_env/main.tf` with the name of service catalog project id to complete the git fqdn for module sources: ```bash export service_catalog_project_id=$(terraform -chdir="../gcp-projects/business_unit_3/shared/" output -raw service_catalog_project_id) - + ##LINUX sed -i "s/SERVICE-CATALOG-PROJECT-ID/${service_catalog_project_id}/" ./modules/base_env/main.tf @@ -205,7 +205,7 @@ Run `terraform output cloudbuild_project_id` in the `0-bootstrap` folder to get github_token = "YOUR-GITHUB-TOKEN" for env in "${envs[@]}"; do - output=$(terraform -chdir="../gcp-projects/business_unit_3/${env}" output -raw machine_learning_project_id) + output=$(terraform -chdir="../gcp-projects/business_unit_3/${env}" output -raw machine_learning_project_id) project_ids+=("$output") done @@ -247,7 +247,7 @@ Run `terraform output cloudbuild_project_id` in the `0-bootstrap` folder to get ## Post Deployment -Since this project is in a service perimiter, there will be _additional_ entries that will be needed. This is most notable for the `interactive` environment (development). Since many of the necessary service agents and permissions were deployed in this project, we will _need to return to `3-networks`_ and add in more agents to the DEVELOPMENT.AUTO.TFVARS file under `egress_policies`. +Since this project is in a service perimiter, there will be _additional_ entries that will be needed. This is most notable for the `interactive` environment (development). Since many of the necessary service agents and permissions were deployed in this project, we will _need to return to `3-networks`_ and add in more agents to the DEVELOPMENT.AUTO.TFVARS file under `egress_policies`. Notably: * "serviceAccount:bq-[prj-d-bu3machine-learning-project-number]@bigquery-encryption.iam.gserviceaccount.com" @@ -259,10 +259,10 @@ This should be added under identities. It should look like this:: { "from" = { "identity_type" = "" - "identities" = [ - "serviceAccount:bq-[prj-d-bu3machine-learning-project-number]@bigquery-encryption.iam.gserviceaccount.com" << New Addition - "serviceAccount:service-[prj-d-bu3machine-learning-project-number]@gcp-sa-notebooks.iam.gserviceaccount.com", - "serviceAccount:service-[prj-d-bu3machine-learning-project-number]@compute-system.iam.gserviceaccount.com", + "identities" = [ + "serviceAccount:bq-[prj-d-bu3machine-learning-project-number]@bigquery-encryption.iam.gserviceaccount.com" << New Addition + "serviceAccount:service-[prj-d-bu3machine-learning-project-number]@gcp-sa-notebooks.iam.gserviceaccount.com", + "serviceAccount:service-[prj-d-bu3machine-learning-project-number]@compute-system.iam.gserviceaccount.com", ] }, "to" = { @@ -294,7 +294,7 @@ egressViolations: [ } ] ``` -we want the `unknown-project-number` here. Add this into your `egress_policies` in `3-networks` under DEVELOPMENT.AUTO.TFVARS +we want the `unknown-project-number` here. 
Add this into your `egress_policies` in `3-networks` under DEVELOPMENT.AUTO.TFVARS ``` // Service Catalog { diff --git a/5-app-infra/projects/service-catalog/README.md b/5-app-infra/projects/service-catalog/README.md index be550ab2..a309c3f0 100644 --- a/5-app-infra/projects/service-catalog/README.md +++ b/5-app-infra/projects/service-catalog/README.md @@ -57,11 +57,11 @@ file. This project has two main purposes: 1. To deploy a pipeline and a bucket which is linked to a Google Cloud Repository that houses terraform modules for the use in Service Catalog. -Although Service Catalog itself must be manually deployed, the modules which will be used can still be automated. +Although Service Catalog itself must be manually deployed, the modules which will be used can still be automated. 2. To deploy infrastructure for operational environments (ie. `non-production` & `production`.) -The resoning behind utilizing one repository with two deployment methodologies is due to how close interactive (`development`) and operational environments are. +The resoning behind utilizing one repository with two deployment methodologies is due to how close interactive (`development`) and operational environments are. The repository has the structure (truncated for brevity): ``` @@ -100,7 +100,7 @@ The repository has the structure (truncated for brevity): │   ├── outputs.tf │   └── variables.tf ``` -Each folder under `modules` represents a terraform module. +Each folder under `modules` represents a terraform module. When there is a change in any of the terraform module folders, the pipeline will find whichever module has been changed since the last push, `tar.gz` that file and place it in a bucket for Service Catalog to access. This pipeline is listening to the `main` branch of this repository for changes in order for the modules to be uploaded to service catalog. @@ -244,9 +244,9 @@ Run `terraform output cloudbuild_project_id` in the `0-bootstrap` folder to get ```shell export SERVICE_CATALOG_PROJECT_ID=$(terraform -chdir="gcp-projects/business_unit_3/shared" output -raw service_catalog_project_id) echo ${SERVICE_CATALOG_PROJECT_ID} - ``` + ``` -1. Clone the freshly minted Cloud Source Repository that was created for this project. +1. Clone the freshly minted Cloud Source Repository that was created for this project. ```shell gcloud source repos clone service-catalog --project=${SERVICE_CATALOG_PROJECT_ID} ``` diff --git a/5-app-infra/source_repos/artifact-publish/images/tf2-cpu.2-13:0.1/Dockerfile b/5-app-infra/source_repos/artifact-publish/images/tf2-cpu.2-13:0.1/Dockerfile index bab449ab..731a8e02 100644 --- a/5-app-infra/source_repos/artifact-publish/images/tf2-cpu.2-13:0.1/Dockerfile +++ b/5-app-infra/source_repos/artifact-publish/images/tf2-cpu.2-13:0.1/Dockerfile @@ -1,3 +1,17 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# FROM us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu.2-13:latest -RUN echo "Hello World" > helloworld.txt +RUN echo "Hello World" > helloworld.txt diff --git a/5-app-infra/source_repos/artifact-publish/images/tf2-gpu.2-13:0.1/Dockerfile b/5-app-infra/source_repos/artifact-publish/images/tf2-gpu.2-13:0.1/Dockerfile index 3e442f59..d8015e53 100644 --- a/5-app-infra/source_repos/artifact-publish/images/tf2-gpu.2-13:0.1/Dockerfile +++ b/5-app-infra/source_repos/artifact-publish/images/tf2-gpu.2-13:0.1/Dockerfile @@ -1,2 +1,16 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# FROM us-docker.pkg.dev/vertex-ai/prediction/tf2-gpu.2-13:latest -RUN echo "Hello World" > helloworld.txt \ No newline at end of file +RUN echo "Hello World" > helloworld.txt diff --git a/5-app-infra/source_repos/service-catalog/modules/bucket/README.md b/5-app-infra/source_repos/service-catalog/modules/bucket/README.md index f846d01a..e351f625 100644 --- a/5-app-infra/source_repos/service-catalog/modules/bucket/README.md +++ b/5-app-infra/source_repos/service-catalog/modules/bucket/README.md @@ -145,4 +145,5 @@ The following table outlines which of the suggested controls for Vertex Generati |------|-------------| | storage\_bucket | Storage Bucket. | - \ No newline at end of file + + diff --git a/5-app-infra/source_repos/service-catalog/modules/composer/terraform.tfvars.example b/5-app-infra/source_repos/service-catalog/modules/composer/terraform.tfvars.example index 1b5b4dd6..3d4db2c1 100644 --- a/5-app-infra/source_repos/service-catalog/modules/composer/terraform.tfvars.example +++ b/5-app-infra/source_repos/service-catalog/modules/composer/terraform.tfvars.example @@ -26,4 +26,4 @@ web_server_allowed_ip_ranges = [ github_name_prefix = "github-composer-cloudbuild" github_app_installation_id = "APP_INSTALATION_ID_HERE" github_api_token = "GITHUB_API_TOKEN_HERE" -github_remote_uri = "LINK_TO_GITHUB_REPO_CONTAINING_DAGS" \ No newline at end of file +github_remote_uri = "LINK_TO_GITHUB_REPO_CONTAINING_DAGS" diff --git a/6-ml-pipeline/dev/Readme.md b/6-ml-pipeline/dev/Readme.md index a1aed33d..4b1f24e1 100644 --- a/6-ml-pipeline/dev/Readme.md +++ b/6-ml-pipeline/dev/Readme.md @@ -27,7 +27,7 @@ You can read more about the details of the pipeline components on the [pipeline' # Step by step Before you start, make sure you have your personal git access token ready. The git menu option on the left bar of the workbench requires the personal token to connect to git and clone the repo. Also make sure to have a gcs bucket ready to store the artifacts for the tutorial. To deploy the bucket, you can go to service catalog and create a new deployment from the storage bucket solution. -### 1. Run the notebook +### 1. Run the notebook - Take 7-vertexpipeline folder and make you own copy as a standalone git repository and clone it in the workbench in your dev project. Create a dev branch of the new repository. Switch to the dev branch by choosing it in the branch section of the git view. 
Now go back to the file browser view by clicking the first option on the left bar menu. Navigate to the directory you just clone and run [the notebook](https://github.com/GoogleCloudPlatform/terraform-google-enterprise-genai/blob/main/7-vertexpipeline/census_pipeline.ipynb) cell by cell. Pay attention to the instructions and comments in the notebook and don't forget to set the correct values corresponding to your dev project. ### 2. Configure cloud build @@ -42,7 +42,7 @@ Also make sure to have a gcs bucket ready to store the artifacts for the tutoria |Configuration|Autodetected/Cloud Build configuration file (yaml or json)| |Location|Repository| |Cloud Build configuration file location|cloudbuild.yaml| - + - Open the cloudbuild.yaml file in your workbench and for steps 1 which uploads the source code for the dataflow job to your bucket. @@ -64,7 +64,7 @@ Also make sure to have a gcs bucket ready to store the artifacts for the tutoria entrypoint: 'python' args: ['compile_pipeline.py'] id: 'compile_job' - + # run pipeline - name: 'us-central1-docker.pkg.dev/{your-artifact-project}/c-publish-artifacts/vertexpipeline:v2' entrypoint: 'python' @@ -79,7 +79,7 @@ Also make sure to have a gcs bucket ready to store the artifacts for the tutoria - name: 'gcr.io/cloud-builders/gsutil' args: ['cp', './common/vertex-ai-pipeline/pipeline_package.yaml', 'gs://{your-composer-bucket}/dags/common/vertex-ai-pipeline/'] id: 'upload_composer_file' - + # upload pipeline dag to composer - name: 'gcr.io/cloud-builders/gsutil' args: ['cp', './composer/dags/dag.py', 'gs://{your-composer-bucket}/dags/'] diff --git a/7-composer/components/.ipynb_checkpoints/deployment-checkpoint.py b/7-composer/components/.ipynb_checkpoints/deployment-checkpoint.py deleted file mode 100644 index a2d3ee27..00000000 --- a/7-composer/components/.ipynb_checkpoints/deployment-checkpoint.py +++ /dev/null @@ -1,130 +0,0 @@ -import argparse - - -def get_args(): - parser = argparse.ArgumentParser() - parser.add_argument('--serving-container', dest='serving_container') - parser.add_argument('--model-name', dest='model_name') - parser.add_argument('--model-dir', dest='model_dir') - parser.add_argument('--endpoint-name', dest='endpoint_name') - parser.add_argument('--project', dest='project') - parser.add_argument('--region', dest='region') - parser.add_argument('--split', dest='split') - parser.add_argument('--min-nodes', dest='min_nodes') - parser.add_argument('--max-nodes', dest='max_nodes') - parser.add_argument('--service-account', dest='service_account') - args = parser.parse_args() - return args - - -def deploy_model( - serving_container_image_uri: str, - model_name: str, - model_dir: str, - endpoint_name: str, - project_id: str, - region: str, - split: int, - min_nodes: int, - max_nodes: int, - service_account: str, - model: Input[Model], - vertex_model: Output[Model], - vertex_endpoint: Output[Model] -): - from google.cloud import aiplatform - aiplatform.init(service_account=service_account) - def create_endpoint(): - endpoints = aiplatform.Endpoint.list( - filter=f'display_name="{endpoint_name}"', - order_by='create_time desc', - project=project_id, - location=region, - ) - if len(endpoints) > 0: - endpoint = endpoints[0] # most recently created - else: - endpoint = aiplatform.Endpoint.create( - display_name=endpoint_name, - project=project_id, - location=region - ) - return endpoint - - endpoint = create_endpoint() - - - def upload_model(): - listed_model = aiplatform.Model.list( - filter=f'display_name="{model_name}"', - 
project=project_id, - location=region, - ) - if len(listed_model) > 0: - model_version = listed_model[0] - model_upload = aiplatform.Model.upload( - display_name=model_name, - parent_model=model_version.resource_name, - artifact_uri=model_dir, - serving_container_image_uri=serving_container_image_uri, - location=region, - ) - else: - model_upload = aiplatform.Model.upload( - display_name=model_name, - artifact_uri=model_dir, - serving_container_image_uri=serving_container_image_uri, - location=region, - ) - return model_upload - - uploaded_model = upload_model() - - # Save data to the output params - vertex_model.uri = uploaded_model.resource_name - def deploy_to_endpoint(model, endpoint): - deployed_models = endpoint.list_models() - if len(deployed_models) > 0: - latest_model_id = deployed_models[-1].id - print("your objects properties:", deployed_models[0].create_time.__dir__()) - model_deploy = uploaded_model.deploy( - # machine_type="n1-standard-4", - endpoint=endpoint, - traffic_split={"0": 25, latest_model_id: 75}, - deployed_model_display_name=model_name, - min_replica_count=min_nodes, - max_replica_count=max_nodes, - # service_account="compute default" - ) - else: - model_deploy = uploaded_model.deploy( - # machine_type="n1-standard-4", - endpoint=endpoint, - traffic_split={"0": 100}, - min_replica_count=min_nodes, - max_replica_count=max_nodes, - deployed_model_display_name=model_name, - # service_account="compute default" - ) - return model_deploy.resource_name - - vertex_endpoint.uri = deploy_to_endpoint(vertex_model, endpoint) - vertex_endpoint.metadata['resourceName']=endpoint.resource_name - - - - -if __name__=="__main__": - args = get_args() - deploy_model( - serving_container_image_uri=args.serving_container, - model_name=args.model_name, - model_dir=args.model_dir, - endpoint_name=args.endpoint_name, - project_id=args.project_id, - region=args.region, - split=args.split, - min_nodes=args.min_nodes, - max_nodes=args.max_nodes, - service_account=args.service_account, -) diff --git a/7-composer/components/.ipynb_checkpoints/eval-checkpoint.py b/7-composer/components/.ipynb_checkpoints/eval-checkpoint.py deleted file mode 100644 index 4cb16a80..00000000 --- a/7-composer/components/.ipynb_checkpoints/eval-checkpoint.py +++ /dev/null @@ -1,110 +0,0 @@ -import argparse - - -def get_args(): - parser = argparse.ArgumentParser() - parser.add_argument('--project', dest='project') - parser.add_argument('--bq-table', dest='table_id') - parser.add_argument('--bq-dataset', dest='dataset_id') - parser.add_argument('--tb-log-dir', dest='tb_log_dir') - parser.add_argument('--model-dir', dest='model_dir') - parser.add_argument('--batch_size', dest='batch_size') - args = parser.parse_args() - return args - -# evaluation component -@component( - base_image=Image, - output_component_file=f"{KFP_COMPONENTS_PATH}/custom_eval_component/eval.yaml" -) -def custom_eval_model( - model_dir: str, - project: str, - table: str, - dataset: str, - tb_log_dir: str, - model: Input[Model], - metrics: Output[Metrics], - batch_size: int = 32, -)-> NamedTuple("Outputs", [("dep_decision", str)]): - from tensorflow.python.framework import ops - from tensorflow.python.framework import dtypes - from tensorflow_io.bigquery import BigQueryClient - from tensorflow_io.bigquery import BigQueryReadSession - from tensorflow import feature_column - from google.cloud import bigquery - - - import tensorflow as tf - CSV_SCHEMA = [ - bigquery.SchemaField("age", "FLOAT64"), - bigquery.SchemaField("workclass", "STRING"), - 
bigquery.SchemaField("fnlwgt", "FLOAT64"), - bigquery.SchemaField("education", "STRING"), - bigquery.SchemaField("education_num", "FLOAT64"), - bigquery.SchemaField("marital_status", "STRING"), - bigquery.SchemaField("occupation", "STRING"), - bigquery.SchemaField("relationship", "STRING"), - bigquery.SchemaField("race", "STRING"), - bigquery.SchemaField("gender", "STRING"), - bigquery.SchemaField("capital_gain", "FLOAT64"), - bigquery.SchemaField("capital_loss", "FLOAT64"), - bigquery.SchemaField("hours_per_week", "FLOAT64"), - bigquery.SchemaField("native_country", "STRING"), - bigquery.SchemaField("income_bracket", "STRING"), - ] - - UNUSED_COLUMNS = ["fnlwgt", "education_num"] - def transform_row(row_dict): - # Trim all string tensors - trimmed_dict = { column: - (tf.strings.strip(tensor) if tensor.dtype == 'string' else tensor) - for (column,tensor) in row_dict.items() - } - # Extract feature column - income_bracket = trimmed_dict.pop('income_bracket') - # Convert feature column to 0.0/1.0 - income_bracket_float = tf.cond(tf.equal(tf.strings.strip(income_bracket), '>50K'), - lambda: tf.constant(1.0), - lambda: tf.constant(0.0)) - return (trimmed_dict, income_bracket_float) - - def read_bigquery(table_name, dataset=dataset): - tensorflow_io_bigquery_client = BigQueryClient() - read_session = tensorflow_io_bigquery_client.read_session( - "projects/" + project, - project, table, dataset, - list(field.name for field in CSV_SCHEMA - if not field.name in UNUSED_COLUMNS), - list(dtypes.double if field.field_type == 'FLOAT64' - else dtypes.string for field in CSV_SCHEMA - if not field.name in UNUSED_COLUMNS), - requested_streams=2) - - dataset = read_session.parallel_read_rows() - transformed_ds = dataset.map(transform_row) - return transformed_ds - - eval_ds = read_bigquery(table).batch(batch_size) - keras_model = tf.keras.models.load_model(model.path) - tensorboard = tf.keras.callbacks.TensorBoard(log_dir=tb_log_dir) - loss, accuracy = keras_model.evaluate(eval_ds, callbacks=[tensorboard]) - metrics.log_metric("accuracy", accuracy) - - if accuracy > 0.8: - dep_decision = "true" - keras_model.save(model_dir) - else: - dep_decision = "false" - return (dep_decision,) - -if __name__=="__main__": - args = get_args() - custom_eval_model( - project=args.project, - table=args.table_id, - dataset=args.dataset_id, - tb_log_dir=args.tb_log_dir, - model_dir=args.model_dir, - batch_size=args.batch_size, -) \ No newline at end of file diff --git a/7-composer/components/.ipynb_checkpoints/monitoring-checkpoint.py b/7-composer/components/.ipynb_checkpoints/monitoring-checkpoint.py deleted file mode 100644 index 1ab7110e..00000000 --- a/7-composer/components/.ipynb_checkpoints/monitoring-checkpoint.py +++ /dev/null @@ -1,125 +0,0 @@ -import argparse - - -def get_args(): - parser = argparse.ArgumentParser() - parser.add_argument('--monitoring-name', dest='monitoring_name') - parser.add_argument('--endpoint-name', dest='endpoint_name') - parser.add_argument('--project', dest='project') - parser.add_argument('--region', dest='region') - parser.add_argument('--bq-data-uri', dest='bq_data_uri') - parser.add_argument('--bucket-name', dest='bucket_name') - parser.add_argument('--email', dest='email') - parser.add_argument('--service-account', dest='service_account') - args = parser.parse_args() - return args - - -def create_monitoring( - monitoring_name: str, - project_id: str, - region: str, - endpoint: Input[Model], - bq_data_uri: str, - bucket_name: str, - email: str, - service_account: str, -): - from 
google.cloud.aiplatform import model_monitoring - from google.cloud import aiplatform - from google.cloud import bigquery - from google.cloud import storage - import time - aiplatform.init(service_account=service_account) - list_monitors = aiplatform.ModelDeploymentMonitoringJob.list(filter=f'display_name="{monitoring_name}"', project=project_id) - if len(list_monitors) == 0: - alerting_config = model_monitoring.EmailAlertConfig( - user_emails=[email], enable_logging=True - ) - # schedule config - MONITOR_INTERVAL = 1 - schedule_config = model_monitoring.ScheduleConfig(monitor_interval=MONITOR_INTERVAL) - # sampling strategy - SAMPLE_RATE = 0.5 - logging_sampling_strategy = model_monitoring.RandomSampleConfig(sample_rate=SAMPLE_RATE) - # drift config - DRIFT_THRESHOLD_VALUE = 0.05 - DRIFT_THRESHOLDS = { - "capital_gain": DRIFT_THRESHOLD_VALUE, - "capital_loss": DRIFT_THRESHOLD_VALUE, - } - drift_config = model_monitoring.DriftDetectionConfig(drift_thresholds=DRIFT_THRESHOLDS) - # Skew config - DATASET_BQ_URI = bq_data_uri - TARGET = "income_bracket" - SKEW_THRESHOLD_VALUE = 0.5 - SKEW_THRESHOLDS = { - "capital_gain": SKEW_THRESHOLD_VALUE, - "capital_loss": SKEW_THRESHOLD_VALUE, - } - skew_config = model_monitoring.SkewDetectionConfig( - data_source=DATASET_BQ_URI, skew_thresholds=SKEW_THRESHOLDS, target_field=TARGET - ) - # objective config out of skew and drift configs - objective_config = model_monitoring.ObjectiveConfig( - skew_detection_config=skew_config, - drift_detection_config=drift_config, - explanation_config=None, - ) - - bqclient = bigquery.Client() - table = bigquery.TableReference.from_string(DATASET_BQ_URI[5:]) - bq_table = bqclient.get_table(table) - yaml = """type: object - properties: - """ - schema = bq_table.schema - for feature in schema: - if feature.name == TARGET: - continue - if feature.field_type == "STRING": - f_type = "string" - else: - f_type = "float" - yaml += f""" {feature.name}: - type: {f_type} - """ - - yaml += """required: - """ - for feature in schema: - if feature.name == "income_bracket": - continue - yaml += f"""- {feature.name} - """ - with open("monitoring_schema.yaml", "w") as f: - f.write(yaml) - storage_client = storage.Client() - bucket = storage_client.bucket(bucket_name) - blob = bucket.blob("monitoring_schema.yaml") - blob.upload_from_string(yaml) - - monitoring_job = aiplatform.ModelDeploymentMonitoringJob.create( - display_name=monitoring_name, - project=project_id, - location=region, - endpoint=endpoint.metadata['resourceName'], - logging_sampling_strategy=logging_sampling_strategy, - schedule_config=schedule_config, - alert_config=alerting_config, - objective_configs=objective_config, - analysis_instance_schema_uri=f"{bucket_name}/monitoring_schema.yaml", - ) - - -if __name__=="__main__": - args = get_args() - create_monitoring( - monitoring_name=args.monitoring_name, - project_id=args.project, - region=args.region, - bq_data_uri=args.bq_data_uri, - bucket_name=args.bucket_name, - email=args.email, - service_account=args.service_account, -) \ No newline at end of file diff --git a/7-composer/components/.ipynb_checkpoints/training-checkpoint.py b/7-composer/components/.ipynb_checkpoints/training-checkpoint.py deleted file mode 100644 index 4a97e68b..00000000 --- a/7-composer/components/.ipynb_checkpoints/training-checkpoint.py +++ /dev/null @@ -1,196 +0,0 @@ -import argparse -from typing import Optional, Dict, List -from google.cloud import aiplatform -from google.cloud.aiplatform.metadata.schema.system import artifact_schema -from 
google.cloud.aiplatform.metadata.schema.system import execution_schema - -def create_artifact_sample( - project: str, - location: str, - uri: Optional[str] = None, - artifact_id: Optional[str] = None, - display_name: Optional[str] = None, - schema_version: Optional[str] = None, - description: Optional[str] = None, - metadata: Optional[Dict] = None, -): - system_artifact_schema = artifact_schema.Artifact( - uri=uri, - artifact_id=artifact_id, - display_name=display_name, - schema_version=schema_version, - description=description, - metadata=metadata, - ) - return system_artifact_schema.create(project=project, location=location,) - -def create_execution_sample( - display_name: str, - input_artifacts: List[aiplatform.Artifact], - output_artifacts: List[aiplatform.Artifact], - project: str, - location: str, - execution_id: Optional[str] = None, - metadata: Optional[Dict[str, Any]] = None, - schema_version: Optional[str] = None, - description: Optional[str] = None, -): - aiplatform.init(project=project, location=location) - - with execution_schema.ContainerExecution( - display_name=display_name, - execution_id=execution_id, - metadata=metadata, - schema_version=schema_version, - description=description, - ).create() as execution: - execution.assign_input_artifacts(input_artifacts) - execution.assign_output_artifacts(output_artifacts) - return execution - - -def get_args(): - parser = argparse.ArgumentParser() - parser.add_argument('--project', dest='project') - parser.add_argument('--bq-table', dest='table_id') - parser.add_argument('--bq-dataset', dest='dataset_id') - parser.add_argument('--tb-log-dir', dest='tb_log_dir') - parser.add_argument('--epochs', dest='epochs') - parser.add_argument('--batch_size', dest='batch_size') - parser.add_argument('--lr', dest='lr') - args = parser.parse_args() - return args - - - -def custom_train_model( - project: str, - table: str, - dataset: str, - tb_log_dir: str, - model: Output[Model], - epochs: int = 5, - batch_size: int = 32, - lr: float = 0.01, # not used here but can be passed to an optimizer -): - - from tensorflow.python.framework import ops - from tensorflow.python.framework import dtypes - from tensorflow_io.bigquery import BigQueryClient - from tensorflow_io.bigquery import BigQueryReadSession - from tensorflow import feature_column - from google.cloud import bigquery - - import tensorflow as tf - CSV_SCHEMA = [ - bigquery.SchemaField("age", "FLOAT64"), - bigquery.SchemaField("workclass", "STRING"), - bigquery.SchemaField("fnlwgt", "FLOAT64"), - bigquery.SchemaField("education", "STRING"), - bigquery.SchemaField("education_num", "FLOAT64"), - bigquery.SchemaField("marital_status", "STRING"), - bigquery.SchemaField("occupation", "STRING"), - bigquery.SchemaField("relationship", "STRING"), - bigquery.SchemaField("race", "STRING"), - bigquery.SchemaField("gender", "STRING"), - bigquery.SchemaField("capital_gain", "FLOAT64"), - bigquery.SchemaField("capital_loss", "FLOAT64"), - bigquery.SchemaField("hours_per_week", "FLOAT64"), - bigquery.SchemaField("native_country", "STRING"), - bigquery.SchemaField("income_bracket", "STRING"), - ] - - UNUSED_COLUMNS = ["fnlwgt", "education_num"] - def transform_row(row_dict): - # Trim all string tensors - trimmed_dict = { column: - (tf.strings.strip(tensor) if tensor.dtype == 'string' else tensor) - for (column,tensor) in row_dict.items() - } - # Extract feature column - income_bracket = trimmed_dict.pop('income_bracket') - # Convert feature column to 0.0/1.0 - income_bracket_float = 
tf.cond(tf.equal(tf.strings.strip(income_bracket), '>50K'), - lambda: tf.constant(1.0), - lambda: tf.constant(0.0)) - return (trimmed_dict, income_bracket_float) - - def read_bigquery(table_name, dataset=dataset): - tensorflow_io_bigquery_client = BigQueryClient() - read_session = tensorflow_io_bigquery_client.read_session( - "projects/" + project, - project, table, dataset, - list(field.name for field in CSV_SCHEMA - if not field.name in UNUSED_COLUMNS), - list(dtypes.double if field.field_type == 'FLOAT64' - else dtypes.string for field in CSV_SCHEMA - if not field.name in UNUSED_COLUMNS), - requested_streams=2) - - dataset = read_session.parallel_read_rows() - transformed_ds = dataset.map(transform_row) - return transformed_ds - - training_ds = read_bigquery(table).shuffle(10000).batch(batch_size) - - - - feature_columns = [] - def get_categorical_feature_values(column): - query = 'SELECT DISTINCT TRIM({}) FROM `{}`.{}.{}'.format(column, project, dataset, table) - client = bigquery.Client(project=project) - dataset_ref = client.dataset(dataset) - job_config = bigquery.QueryJobConfig() - query_job = client.query(query, job_config=job_config) - result = query_job.to_dataframe() - return result.values[:,0] - - # numeric cols - for header in ['capital_gain', 'capital_loss', 'hours_per_week']: - feature_columns.append(feature_column.numeric_column(header)) - - # categorical cols - for header in ['workclass', 'marital_status', 'occupation', 'relationship', - 'race', 'native_country', 'education']: - categorical_feature = feature_column.categorical_column_with_vocabulary_list( - header, get_categorical_feature_values(header)) - categorical_feature_one_hot = feature_column.indicator_column(categorical_feature) - feature_columns.append(categorical_feature_one_hot) - - # bucketized cols - age = feature_column.numeric_column('age') - age_buckets = feature_column.bucketized_column(age, boundaries=[18, 25, 30, 35, 40, 45, 50, 55, 60, 65]) - feature_columns.append(age_buckets) - - feature_layer = tf.keras.layers.DenseFeatures(feature_columns) - - - Dense = tf.keras.layers.Dense - keras_model = tf.keras.Sequential( - [ - feature_layer, - Dense(100, activation=tf.nn.relu, kernel_initializer='uniform'), - Dense(75, activation=tf.nn.relu), - Dense(50, activation=tf.nn.relu), - Dense(25, activation=tf.nn.relu), - Dense(1, activation=tf.nn.sigmoid) - ]) - - tensorboard = tf.keras.callbacks.TensorBoard(log_dir=tb_log_dir) - # Compile Keras model - keras_model.compile(loss='binary_crossentropy', metrics=['accuracy']) - keras_model.fit(training_ds, epochs=epochs, callbacks=[tensorboard]) - keras_model.save(model.path) - - -if __name__=="__main__": - args = get_args() - custom_train_model( - project=args.project, - table=args.table_id, - dataset=args.dataset_id, - tb_log_dir=args.tb_log_dir, - epochs=args.epochs, - batch_size=args.batch_size, - lr=args.lr, # not used here but can be passed to an optimizer -) \ No newline at end of file diff --git a/7-composer/components/deployment.py b/7-composer/components/deployment.py index ade67884..39a99491 100644 --- a/7-composer/components/deployment.py +++ b/7-composer/components/deployment.py @@ -1,5 +1,24 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# import argparse -from common.components.utils import create_artifact_sample, create_execution_sample, list_artifact_sample +from common.components.utils import ( + create_artifact_sample, + create_execution_sample, + list_artifact_sample +) + def get_args(): parser = argparse.ArgumentParser() @@ -33,17 +52,18 @@ def deploy_model( # vertex_model: Output[Model], # vertex_endpoint: Output[Model] ): - from google.cloud import aiplatform + from google.cloud import aiplatform aiplatform.init(service_account=service_account) + def create_endpoint(): endpoints = aiplatform.Endpoint.list( - filter=f'display_name="{endpoint_name}"', - order_by='create_time desc', - project=project_id, - location=region, + filter=f'display_name="{endpoint_name}"', + order_by='create_time desc', + project=project_id, + location=region, ) if len(endpoints) > 0: - endpoint = endpoints[0] # most recently created + endpoint = endpoints[0] # most recently created endpoint_artifact = list_artifact_sample( project=project_id, location=region, @@ -57,69 +77,75 @@ def create_endpoint(): project=project_id, location=region, encryption_spec_key_name=encryption_keyname, - ) + ) + metadata = { + 'create_time': endpoint.create_time.strftime("%D %H:%m:%s"), + 'display_namme': endpoint.display_name, + 'resource_name': endpoint.resource_name, + 'update_time': endpoint.update_time.strftime("%D %H:%m:%s") + } endpoint_artifact = create_artifact_sample( project=project_id, location=region, uri=endpoint.resource_name, display_name='composer_modelendpoint', description='model endpoint created via composer dag', - metadata={'create_time': endpoint.create_time.strftime("%D %H:%m:%s"), - 'display_namme': endpoint.display_name, - 'resource_name': endpoint.resource_name, - 'update_time': endpoint.update_time.strftime("%D %H:%m:%s")} + metadata=metadata ) return endpoint, endpoint_artifact endpoint, endpoint_artifact = create_endpoint() - def upload_model(): listed_model = aiplatform.Model.list( - filter=f'display_name="{model_name}"', - project=project_id, - location=region, + filter=f'display_name="{model_name}"', + project=project_id, + location=region, ) if len(listed_model) > 0: model_version = listed_model[0] model_upload = aiplatform.Model.upload( - display_name=model_name, - parent_model=model_version.resource_name, - artifact_uri=model_dir, - serving_container_image_uri=serving_container_image_uri, - location=region, - project=project_id, - encryption_spec_key_name=encryption_keyname, + display_name=model_name, + parent_model=model_version.resource_name, + artifact_uri=model_dir, + serving_container_image_uri=serving_container_image_uri, + location=region, + project=project_id, + encryption_spec_key_name=encryption_keyname, ) else: model_upload = aiplatform.Model.upload( - display_name=model_name, - artifact_uri=model_dir, - serving_container_image_uri=serving_container_image_uri, - location=region, - project=project_id, - encryption_spec_key_name=encryption_keyname + display_name=model_name, + artifact_uri=model_dir, + serving_container_image_uri=serving_container_image_uri, + location=region, + project=project_id, + 
encryption_spec_key_name=encryption_keyname ) - + custom_filter = "display_name=\"composer_trained_census_model\"" model_artifact = list_artifact_sample( project=project_id, location=region, - display_name_filter="display_name=\"composer_trained_census_model\"", + display_name_filter=custom_filter, order_by="LAST_UPDATE_TIME desc" )[0] + metadata = { + 'create_time': model_upload.create_time.strftime("%D %H:%m:%s"), + 'container_spec': model_upload.container_spec.image_uri, + 'resource_name': model_upload.resource_name, + 'update_time': model_upload.update_time.strftime("%D %H:%m:%s"), + 'version_id': model_upload.version_id + } vertexmodel_artifact = create_artifact_sample( - project=project_id, - location=region, - uri=model_upload.uri, - display_name='composer_vertexmodel', - description='uploaded vertex model via composer dag', - metadata={'create_time': model_upload.create_time.strftime("%D %H:%m:%s"), - 'container_spec': model_upload.container_spec.image_uri, - 'resource_name': model_upload.resource_name, - 'update_time': model_upload.update_time.strftime("%D %H:%m:%s"), - 'version_id': model_upload.version_id}, - ) - model_upload_event = create_execution_sample( + project=project_id, + location=region, + uri=model_upload.uri, + display_name='composer_vertexmodel', + description='uploaded vertex model via composer dag', + metadata=metadata, + ) + # model_upload_event + _ = create_execution_sample( display_name='composer_model_upload', input_artifacts=[model_artifact], output_artifacts=[vertexmodel_artifact], @@ -128,14 +154,14 @@ def upload_model(): description='Composer event uploading model to vertex', ) return model_upload, vertexmodel_artifact - + uploaded_model, vertexmodel_artifact = upload_model() - - + def deploy_to_endpoint(model, endpoint): deployed_models = endpoint.list_models() if len(deployed_models) > 0: - latest_model = sorted(deployed_models, key=lambda x: float(x.model_version_id), reverse=True)[0] + latest_model = sorted(deployed_models, key=lambda x: float( + x.model_version_id), reverse=True)[0] latest_model_id = latest_model.id deployed_endpoint = uploaded_model.deploy( # machine_type="n1-standard-4", @@ -149,26 +175,31 @@ def deploy_to_endpoint(model, endpoint): ) else: deployed_endpoint = uploaded_model.deploy( - # machine_type="n1-standard-4", - endpoint=endpoint, - traffic_split={"0": 100}, - min_replica_count=min_nodes, - max_replica_count=max_nodes, - deployed_model_display_name=model_name, - encryption_spec_key_name=encryption_keyname - # service_account="compute default" - ) + # machine_type="n1-standard-4", + endpoint=endpoint, + traffic_split={"0": 100}, + min_replica_count=min_nodes, + max_replica_count=max_nodes, + deployed_model_display_name=model_name, + encryption_spec_key_name=encryption_keyname + # service_account="compute default" + ) + create_time = deployed_endpoint.create_time.strftime("%D %H:%m:%s") + update_time = deployed_endpoint.update_time.strftime("%D %H:%m:%s") + metadata = { + 'create_time': create_time, + 'display_namme': deployed_endpoint.display_name, + 'resource_name': deployed_endpoint.resource_name, + 'update_time': update_time, + 'traffic_split': deployed_endpoint.traffic_split + } deployed_endpoint_artifact = create_artifact_sample( project=project_id, location=region, uri=deployed_endpoint.resource_name, display_name="composer_deployed_endpoint", description='The endpoint with deployed model via composer', - metadata={'create_time': deployed_endpoint.create_time.strftime("%D %H:%m:%s"), - 'display_namme': 
deployed_endpoint.display_name, - 'resource_name': deployed_endpoint.resource_name, - 'update_time': deployed_endpoint.update_time.strftime("%D %H:%m:%s"), - 'traffic_split': deployed_endpoint.traffic_split} + metadata=metadata ) return deployed_endpoint_artifact @@ -183,8 +214,7 @@ def deploy_to_endpoint(model, endpoint): ) - -if __name__=="__main__": +if __name__ == "__main__": args = get_args() deploy_model( serving_container_image_uri=args.serving_container, @@ -197,4 +227,4 @@ def deploy_to_endpoint(model, endpoint): min_nodes=args.min_nodes, max_nodes=args.max_nodes, service_account=args.service_account, -) + ) diff --git a/7-composer/components/eval.py b/7-composer/components/eval.py index f5028bef..958ed538 100644 --- a/7-composer/components/eval.py +++ b/7-composer/components/eval.py @@ -1,4 +1,19 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# import argparse +# flake8: noqa from typing import Optional, Dict, List, Any from google.cloud import aiplatform from google.cloud.aiplatform.metadata.schema.system import artifact_schema @@ -6,8 +21,6 @@ from common.components.utils import create_artifact_sample, create_execution_sample, list_artifact_sample - - def get_args(): parser = argparse.ArgumentParser() parser.add_argument('--project', dest='project') @@ -20,6 +33,8 @@ def get_args(): return args # evaluation component + + def custom_eval_model( model_dir: str, project: str, @@ -34,53 +49,54 @@ def custom_eval_model( from tensorflow_io.bigquery import BigQueryReadSession from tensorflow import feature_column from google.cloud import bigquery - - + import tensorflow as tf CSV_SCHEMA = [ - bigquery.SchemaField("age", "FLOAT64"), - bigquery.SchemaField("workclass", "STRING"), - bigquery.SchemaField("fnlwgt", "FLOAT64"), - bigquery.SchemaField("education", "STRING"), - bigquery.SchemaField("education_num", "FLOAT64"), - bigquery.SchemaField("marital_status", "STRING"), - bigquery.SchemaField("occupation", "STRING"), - bigquery.SchemaField("relationship", "STRING"), - bigquery.SchemaField("race", "STRING"), - bigquery.SchemaField("gender", "STRING"), - bigquery.SchemaField("capital_gain", "FLOAT64"), - bigquery.SchemaField("capital_loss", "FLOAT64"), - bigquery.SchemaField("hours_per_week", "FLOAT64"), - bigquery.SchemaField("native_country", "STRING"), - bigquery.SchemaField("income_bracket", "STRING"), - ] + bigquery.SchemaField("age", "FLOAT64"), + bigquery.SchemaField("workclass", "STRING"), + bigquery.SchemaField("fnlwgt", "FLOAT64"), + bigquery.SchemaField("education", "STRING"), + bigquery.SchemaField("education_num", "FLOAT64"), + bigquery.SchemaField("marital_status", "STRING"), + bigquery.SchemaField("occupation", "STRING"), + bigquery.SchemaField("relationship", "STRING"), + bigquery.SchemaField("race", "STRING"), + bigquery.SchemaField("gender", "STRING"), + bigquery.SchemaField("capital_gain", "FLOAT64"), + bigquery.SchemaField("capital_loss", "FLOAT64"), + bigquery.SchemaField("hours_per_week", "FLOAT64"), + 
bigquery.SchemaField("native_country", "STRING"), + bigquery.SchemaField("income_bracket", "STRING"), + ] UNUSED_COLUMNS = ["fnlwgt", "education_num"] + def transform_row(row_dict): # Trim all string tensors - trimmed_dict = { column: - (tf.strings.strip(tensor) if tensor.dtype == 'string' else tensor) - for (column,tensor) in row_dict.items() - } + trimmed_dict = {column: + (tf.strings.strip(tensor) + if tensor.dtype == 'string' else tensor) + for (column, tensor) in row_dict.items() + } # Extract feature column income_bracket = trimmed_dict.pop('income_bracket') # Convert feature column to 0.0/1.0 - income_bracket_float = tf.cond(tf.equal(tf.strings.strip(income_bracket), '>50K'), - lambda: tf.constant(1.0), - lambda: tf.constant(0.0)) + income_bracket_float = tf.cond(tf.equal(tf.strings.strip(income_bracket), '>50K'), + lambda: tf.constant(1.0), + lambda: tf.constant(0.0)) return (trimmed_dict, income_bracket_float) def read_bigquery(table_name, dataset=dataset): tensorflow_io_bigquery_client = BigQueryClient() read_session = tensorflow_io_bigquery_client.read_session( - "projects/" + project, - project, table, dataset, - list(field.name for field in CSV_SCHEMA - if not field.name in UNUSED_COLUMNS), - list(dtypes.double if field.field_type == 'FLOAT64' - else dtypes.string for field in CSV_SCHEMA - if not field.name in UNUSED_COLUMNS), - requested_streams=2) + "projects/" + project, + project, table, dataset, + list(field.name for field in CSV_SCHEMA + if not field.name in UNUSED_COLUMNS), + list(dtypes.double if field.field_type == 'FLOAT64' + else dtypes.string for field in CSV_SCHEMA + if not field.name in UNUSED_COLUMNS), + requested_streams=2) dataset = read_session.parallel_read_rows() transformed_ds = dataset.map(transform_row) @@ -90,7 +106,7 @@ def read_bigquery(table_name, dataset=dataset): keras_model = tf.keras.models.load_model(model_dir) tensorboard = tf.keras.callbacks.TensorBoard(log_dir=tb_log_dir) loss, accuracy = keras_model.evaluate(eval_ds, callbacks=[tensorboard]) - + metric = create_artifact_sample( project=project, location='us-central1', @@ -98,17 +114,17 @@ def read_bigquery(table_name, dataset=dataset): description='Eval metrics produced from composer dag', metadata={'accuracy': accuracy} ) - - model_artifact = list_artifact_sample(project=project, - location='us-central1', - display_name_filter="display_name=\"composer_trained_census_model\"", - order_by="LAST_UPDATE_TIME desc")[0] - - data_artifact = list_artifact_sample(project=project, - location='us-central1', - display_name_filter="display_name=\"composer_training_data\"", - order_by="LAST_UPDATE_TIME desc")[0] - + + model_artifact = list_artifact_sample(project=project, + location='us-central1', + display_name_filter="display_name=\"composer_trained_census_model\"", + order_by="LAST_UPDATE_TIME desc")[0] + + data_artifact = list_artifact_sample(project=project, + location='us-central1', + display_name_filter="display_name=\"composer_training_data\"", + order_by="LAST_UPDATE_TIME desc")[0] + execution_event = create_execution_sample( display_name='evaluation_execution_composer', input_artifacts=[data_artifact, model_artifact], @@ -125,15 +141,13 @@ def read_bigquery(table_name, dataset=dataset): return dep_decision - - -if __name__=="__main__": +if __name__ == "__main__": args = get_args() custom_eval_model( - project=args.project, - table=args.table_id, - dataset=args.dataset_id, - tb_log_dir=args.tb_log_dir, - model_dir=args.model_dir, - batch_size=args.batch_size, -) \ No newline at end of file + 
project=args.project, + table=args.table_id, + dataset=args.dataset_id, + tb_log_dir=args.tb_log_dir, + model_dir=args.model_dir, + batch_size=args.batch_size, + ) diff --git a/7-composer/components/monitoring.py b/7-composer/components/monitoring.py index b3f03f8f..b2e875e1 100644 --- a/7-composer/components/monitoring.py +++ b/7-composer/components/monitoring.py @@ -1,6 +1,22 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# flake8: noqa import argparse from common.components.utils import create_artifact_sample, create_execution_sample, list_artifact_sample + def get_args(): parser = argparse.ArgumentParser() parser.add_argument('--monitoring-name', dest='monitoring_name') @@ -33,29 +49,35 @@ def create_monitoring( from collections import OrderedDict import time import yaml - def ordered_dict_representer(self, value): # can be a lambda if that's what you prefer + + # can be a lambda if that's what you prefer + def ordered_dict_representer(self, value): return self.represent_mapping('tag:yaml.org,2002:map', value.items()) yaml.add_representer(OrderedDict, ordered_dict_representer) - + aiplatform.init(service_account=service_account) - list_monitors = aiplatform.ModelDeploymentMonitoringJob.list(filter=f'state="JOB_STATE_SUCCEEDED" AND display_name="{monitoring_name}"', project=project_id) + list_monitors = aiplatform.ModelDeploymentMonitoringJob.list( + filter=f'state="JOB_STATE_SUCCEEDED" AND display_name="{monitoring_name}"', project=project_id) if len(list_monitors) == 0: alerting_config = model_monitoring.EmailAlertConfig( user_emails=[email], enable_logging=True ) # schedule config MONITOR_INTERVAL = 1 - schedule_config = model_monitoring.ScheduleConfig(monitor_interval=MONITOR_INTERVAL) + schedule_config = model_monitoring.ScheduleConfig( + monitor_interval=MONITOR_INTERVAL) # sampling strategy - SAMPLE_RATE = 0.5 - logging_sampling_strategy = model_monitoring.RandomSampleConfig(sample_rate=SAMPLE_RATE) + SAMPLE_RATE = 0.5 + logging_sampling_strategy = model_monitoring.RandomSampleConfig( + sample_rate=SAMPLE_RATE) # drift config DRIFT_THRESHOLD_VALUE = 0.05 DRIFT_THRESHOLDS = { "capital_gain": DRIFT_THRESHOLD_VALUE, "capital_loss": DRIFT_THRESHOLD_VALUE, } - drift_config = model_monitoring.DriftDetectionConfig(drift_thresholds=DRIFT_THRESHOLDS) + drift_config = model_monitoring.DriftDetectionConfig( + drift_thresholds=DRIFT_THRESHOLDS) # Skew config DATASET_BQ_URI = bq_data_uri TARGET = "income_bracket" @@ -93,7 +115,7 @@ def ordered_dict_representer(self, value): # can be a lambda if that's what you schemayaml['properties'][feature.name] = {"type": f_type} if feature.name not in ["fnlwgt", "education_num"]: schemayaml['required'].append(feature.name) - + with open("monitoring_schema.yaml", "w") as yaml_file: yaml.dump(schemayaml, yaml_file, default_flow_style=False) storage_client = storage.Client() @@ -119,7 +141,8 @@ def ordered_dict_representer(self, value): # can be a lambda if that's what you encryption_spec_key_name=encryption_keyname, ) -if 
__name__=="__main__": + +if __name__ == "__main__": args = get_args() create_monitoring( monitoring_name=args.monitoring_name, @@ -129,4 +152,4 @@ def ordered_dict_representer(self, value): # can be a lambda if that's what you bucket_name=args.bucket_name, email=args.email, service_account=args.service_account, -) \ No newline at end of file + ) diff --git a/7-composer/components/training.py b/7-composer/components/training.py index b094ec11..c17fcc1d 100644 --- a/7-composer/components/training.py +++ b/7-composer/components/training.py @@ -1,3 +1,18 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# flake8: noqa import argparse from typing import Optional, Dict, List, Any from google.cloud import aiplatform @@ -17,7 +32,6 @@ def get_args(): return args - def custom_train_model( project: str, table: str, @@ -26,61 +40,63 @@ def custom_train_model( model_dir: str, epochs: int = 5, batch_size: int = 32, - lr: float = 0.01, # not used here but can be passed to an optimizer + lr: float = 0.01, # not used here but can be passed to an optimizer ): - + from tensorflow.python.framework import ops from tensorflow.python.framework import dtypes from tensorflow_io.bigquery import BigQueryClient from tensorflow_io.bigquery import BigQueryReadSession from tensorflow import feature_column from google.cloud import bigquery - + import tensorflow as tf CSV_SCHEMA = [ - bigquery.SchemaField("age", "FLOAT64"), - bigquery.SchemaField("workclass", "STRING"), - bigquery.SchemaField("fnlwgt", "FLOAT64"), - bigquery.SchemaField("education", "STRING"), - bigquery.SchemaField("education_num", "FLOAT64"), - bigquery.SchemaField("marital_status", "STRING"), - bigquery.SchemaField("occupation", "STRING"), - bigquery.SchemaField("relationship", "STRING"), - bigquery.SchemaField("race", "STRING"), - bigquery.SchemaField("gender", "STRING"), - bigquery.SchemaField("capital_gain", "FLOAT64"), - bigquery.SchemaField("capital_loss", "FLOAT64"), - bigquery.SchemaField("hours_per_week", "FLOAT64"), - bigquery.SchemaField("native_country", "STRING"), - bigquery.SchemaField("income_bracket", "STRING"), - ] + bigquery.SchemaField("age", "FLOAT64"), + bigquery.SchemaField("workclass", "STRING"), + bigquery.SchemaField("fnlwgt", "FLOAT64"), + bigquery.SchemaField("education", "STRING"), + bigquery.SchemaField("education_num", "FLOAT64"), + bigquery.SchemaField("marital_status", "STRING"), + bigquery.SchemaField("occupation", "STRING"), + bigquery.SchemaField("relationship", "STRING"), + bigquery.SchemaField("race", "STRING"), + bigquery.SchemaField("gender", "STRING"), + bigquery.SchemaField("capital_gain", "FLOAT64"), + bigquery.SchemaField("capital_loss", "FLOAT64"), + bigquery.SchemaField("hours_per_week", "FLOAT64"), + bigquery.SchemaField("native_country", "STRING"), + bigquery.SchemaField("income_bracket", "STRING"), + ] UNUSED_COLUMNS = ["fnlwgt", "education_num"] + def transform_row(row_dict): # Trim all string tensors - trimmed_dict = { column: - (tf.strings.strip(tensor) if tensor.dtype == 
'string' else tensor) - for (column,tensor) in row_dict.items() - } + trimmed_dict = {column: + (tf.strings.strip(tensor) + if tensor.dtype == 'string' else tensor) + for (column, tensor) in row_dict.items() + } # Extract feature column income_bracket = trimmed_dict.pop('income_bracket') # Convert feature column to 0.0/1.0 - income_bracket_float = tf.cond(tf.equal(tf.strings.strip(income_bracket), '>50K'), - lambda: tf.constant(1.0), - lambda: tf.constant(0.0)) + income_bracket_float = tf.cond(tf.equal(tf.strings.strip(income_bracket), '>50K'), + lambda: tf.constant(1.0), + lambda: tf.constant(0.0)) return (trimmed_dict, income_bracket_float) def read_bigquery(table_name, dataset=dataset): tensorflow_io_bigquery_client = BigQueryClient() read_session = tensorflow_io_bigquery_client.read_session( - "projects/" + project, - project, table, dataset, - list(field.name for field in CSV_SCHEMA - if not field.name in UNUSED_COLUMNS), - list(dtypes.double if field.field_type == 'FLOAT64' - else dtypes.string for field in CSV_SCHEMA - if not field.name in UNUSED_COLUMNS), - requested_streams=2) + "projects/" + project, + project, table, dataset, + list(field.name for field in CSV_SCHEMA + if not field.name in UNUSED_COLUMNS), + list(dtypes.double if field.field_type == 'FLOAT64' + else dtypes.string for field in CSV_SCHEMA + if not field.name in UNUSED_COLUMNS), + requested_streams=2) dataset = read_session.parallel_read_rows() transformed_ds = dataset.map(transform_row) @@ -88,17 +104,17 @@ def read_bigquery(table_name, dataset=dataset): training_ds = read_bigquery(table).shuffle(10000).batch(batch_size) - - feature_columns = [] + def get_categorical_feature_values(column): - query = 'SELECT DISTINCT TRIM({}) FROM `{}`.{}.{}'.format(column, project, dataset, table) + query = 'SELECT DISTINCT TRIM({}) FROM `{}`.{}.{}'.format( + column, project, dataset, table) client = bigquery.Client(project=project) dataset_ref = client.dataset(dataset) job_config = bigquery.QueryJobConfig() query_job = client.query(query, job_config=job_config) result = query_job.to_dataframe() - return result.values[:,0] + return result.values[:, 0] # numeric cols for header in ['capital_gain', 'capital_loss', 'hours_per_week']: @@ -109,34 +125,35 @@ def get_categorical_feature_values(column): 'race', 'native_country', 'education']: categorical_feature = feature_column.categorical_column_with_vocabulary_list( header, get_categorical_feature_values(header)) - categorical_feature_one_hot = feature_column.indicator_column(categorical_feature) + categorical_feature_one_hot = feature_column.indicator_column( + categorical_feature) feature_columns.append(categorical_feature_one_hot) # bucketized cols age = feature_column.numeric_column('age') - age_buckets = feature_column.bucketized_column(age, boundaries=[18, 25, 30, 35, 40, 45, 50, 55, 60, 65]) + age_buckets = feature_column.bucketized_column( + age, boundaries=[18, 25, 30, 35, 40, 45, 50, 55, 60, 65]) feature_columns.append(age_buckets) feature_layer = tf.keras.layers.DenseFeatures(feature_columns) - Dense = tf.keras.layers.Dense keras_model = tf.keras.Sequential( - [ - feature_layer, - Dense(100, activation=tf.nn.relu, kernel_initializer='uniform'), - Dense(75, activation=tf.nn.relu), - Dense(50, activation=tf.nn.relu), - Dense(25, activation=tf.nn.relu), - Dense(1, activation=tf.nn.sigmoid) - ]) + [ + feature_layer, + Dense(100, activation=tf.nn.relu, kernel_initializer='uniform'), + Dense(75, activation=tf.nn.relu), + Dense(50, activation=tf.nn.relu), + Dense(25, 
activation=tf.nn.relu), + Dense(1, activation=tf.nn.sigmoid) + ]) tensorboard = tf.keras.callbacks.TensorBoard(log_dir=tb_log_dir) # Compile Keras model keras_model.compile(loss='binary_crossentropy', metrics=['accuracy']) keras_model.fit(training_ds, epochs=epochs, callbacks=[tensorboard]) keras_model.save(model_dir) - + data_artifact = create_artifact_sample( project=project, location="us-central1", @@ -144,7 +161,7 @@ def get_categorical_feature_values(column): display_name='composer_training_data', description='census training data for composer', metadata={'table': table, 'dataset': dataset} - ) + ) model_artifact = create_artifact_sample( project=project, location="us-central1", @@ -152,7 +169,7 @@ def get_categorical_feature_values(column): display_name='composer_trained_census_model', description='census model trained through composer', metadata={'model_dir': model_dir} - ) + ) execution_artifact = create_execution_sample( display_name='composer_model_training', @@ -162,16 +179,16 @@ def get_categorical_feature_values(column): location='us-central1', description='execution representing model training via composer', ) - -if __name__=="__main__": + +if __name__ == "__main__": args = get_args() custom_train_model( - project=args.project, - table=args.table_id, - dataset=args.dataset_id, - tb_log_dir=args.tb_log_dir, - epochs=args.epochs, - batch_size=args.batch_size, - lr=args.lr, # not used here but can be passed to an optimizer -) \ No newline at end of file + project=args.project, + table=args.table_id, + dataset=args.dataset_id, + tb_log_dir=args.tb_log_dir, + epochs=args.epochs, + batch_size=args.batch_size, + lr=args.lr, # not used here but can be passed to an optimizer + ) diff --git a/7-composer/components/utils.py b/7-composer/components/utils.py index c39e37db..ba72d7a5 100644 --- a/7-composer/components/utils.py +++ b/7-composer/components/utils.py @@ -1,8 +1,23 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# from google.cloud.aiplatform.metadata.schema.system import artifact_schema from google.cloud.aiplatform.metadata.schema.system import execution_schema from google.cloud import aiplatform from typing import Optional, Dict, List, Any + def create_artifact_sample( project: str, location: str, @@ -23,6 +38,7 @@ def create_artifact_sample( ) return system_artifact_schema.create(project=project, location=location,) + def create_execution_sample( display_name: str, input_artifacts: List[aiplatform.Artifact], @@ -61,4 +77,4 @@ def list_artifact_sample( return aiplatform.Artifact.list( filter=combined_filters, order_by=order_by, - ) \ No newline at end of file + ) diff --git a/7-composer/dag.py b/7-composer/dag.py index d9bc7476..949fb8dc 100644 --- a/7-composer/dag.py +++ b/7-composer/dag.py @@ -1,3 +1,18 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# flake8: noqa from datetime import timedelta, datetime from airflow import DAG from airflow.providers.google.cloud.operators.bigquery import BigQueryCreateEmptyDatasetOperator @@ -12,45 +27,47 @@ from common.components.deployment import deploy_model from common.components.monitoring import create_monitoring -REGION = "us-central1" -BUCKET_URI = "gs://testairflowpipe" -PROJECT_ID = "majid-test-407120" -DATASET_ID = 'census_dataset_composer' -TRAINING_TABLE_ID = 'census_train_table_composer' -EVAL_TABLE_ID = 'census_eval_table_composer' -RUNNER = "DataflowRunner" -REGION = "us-central1" -JOB_NAME = "census-ingest-composer" +REGION = "us-central1" +BUCKET_URI = "gs://testairflowpipe" +PROJECT_ID = "majid-test-407120" +DATASET_ID = 'census_dataset_composer' +TRAINING_TABLE_ID = 'census_train_table_composer' +EVAL_TABLE_ID = 'census_eval_table_composer' +RUNNER = "DataflowRunner" +REGION = "us-central1" +JOB_NAME = "census-ingest-composer" default_kms_key_name = "projects/prj-d-kms-cgvl/locations/us-central1/keyRings/sample-keyring/cryptoKeys/prj-d-bu3machine-learning" -deployment_image = "us-docker.pkg.dev/cloud-aiplatform/prediction/tf2-cpu.2-8:latest" -service_account = "728034955955-compute@developer.gserviceaccount.com" +deployment_image = "us-docker.pkg.dev/cloud-aiplatform/prediction/tf2-cpu.2-8:latest" +service_account = "728034955955-compute@developer.gserviceaccount.com" prod_service_account = "728034955955-compute@developer.gserviceaccount.com" default_args = { - 'owner' : 'airflow', - 'depends_on_past' : False, - 'start_date' : datetime(2023, 1, 1), + 'owner': 'airflow', + 'depends_on_past': False, + 'start_date': datetime(2023, 1, 1), 'email_on_failure': False, - 'email_on_retry' : False, - 'retries' : 1, - 'retry_delay' : timedelta(minutes=5), + 'email_on_retry': False, + 'retries': 1, + 'retry_delay': timedelta(minutes=5), } dag = DAG( 'census_dag', default_args=default_args, description='census pipeline built with airlfow', - schedule_interval=timedelta(days=1), # Set the schedule interval (e.g., daily) + # Set the schedule interval (e.g., daily) + schedule_interval=timedelta(days=1), catchup=False, start_date=default_args['start_date'] ) bqOperator = BigQueryCreateEmptyDatasetOperator( - task_id = "bqtask", + task_id="bqtask", dataset_id=DATASET_ID, project_id=PROJECT_ID, location=REGION, - dataset_reference={"defaultEncryptionConfiguration":{"kmsKeyName": default_kms_key_name}}, + dataset_reference={"defaultEncryptionConfiguration": { + "kmsKeyName": default_kms_key_name}}, dag=dag, ) @@ -122,15 +139,14 @@ ) - training_op = PythonOperator( task_id='model_training', python_callable=custom_train_model, - op_kwargs={'project': PROJECT_ID, - 'table':TRAINING_TABLE_ID, - 'dataset': DATASET_ID, - 'tb_log_dir': f"{BUCKET_URI}/tblogs", - 'model_dir': f"{BUCKET_URI}/modelartifact",}, + op_kwargs={'project': PROJECT_ID, + 'table': TRAINING_TABLE_ID, + 'dataset': DATASET_ID, + 'tb_log_dir': f"{BUCKET_URI}/tblogs", + 'model_dir': f"{BUCKET_URI}/modelartifact", }, dag=dag ) @@ -139,22 +155,21 @@ provide_context=True, python_callable=custom_eval_model, op_kwargs={ - 'project': 
PROJECT_ID, - 'table':TRAINING_TABLE_ID, - 'dataset': DATASET_ID, - 'tb_log_dir': f"{BUCKET_URI}/tblogs", - 'model_dir': f"{BUCKET_URI}/modelartifact",}, + 'project': PROJECT_ID, + 'table': TRAINING_TABLE_ID, + 'dataset': DATASET_ID, + 'tb_log_dir': f"{BUCKET_URI}/tblogs", + 'model_dir': f"{BUCKET_URI}/modelartifact", }, dag=dag ) - deploy_op = PythonOperator( task_id='model_deployment', python_callable=deploy_model, op_kwargs={ 'serving_container_image_uri': deployment_image, - 'model_name':'composer_census_model', + 'model_name': 'composer_census_model', 'model_dir': f"{BUCKET_URI}/modelartifact", 'endpoint_name': 'composer_census_endpoint', 'project_id': PROJECT_ID, @@ -187,4 +202,5 @@ bqOperator >> traindata_ingest_op >> wait_for_traindata_ingest_op bqOperator >> evaldata_ingest_op >> wait_for_evaldata_ingest_op -[wait_for_traindata_ingest_op, wait_for_evaldata_ingest_op] >> training_op >> eval_op >> deploy_op >> monitoring_op \ No newline at end of file +[wait_for_traindata_ingest_op, + wait_for_evaldata_ingest_op] >> training_op >> eval_op >> deploy_op >> monitoring_op diff --git a/7-composer/monitoring_schema.yaml b/7-composer/monitoring_schema.yaml index 62fc59e3..7ae0f79f 100644 --- a/7-composer/monitoring_schema.yaml +++ b/7-composer/monitoring_schema.yaml @@ -1,3 +1,17 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# type: object properties: age: diff --git a/7-composer/requirements.txt b/7-composer/requirements.txt index 91d0a79d..443ba627 100644 --- a/7-composer/requirements.txt +++ b/7-composer/requirements.txt @@ -2,7 +2,7 @@ tensorflow-io-gcs-filesystem==0.25.0 tensorflow-io==0.25.0 google-cloud-bigquery<3.0.0,>=1.11.1 pandas==2.0.3 -db-dtypes==1.2.0 +db-dtypes==1.2.0 google-cloud-aiplatform==1.36.0 google-cloud-storage==2.14.0 protobuf==3.20.0 diff --git a/7-composer/us-central1-test-census-034e6abc-bucket/dags b/7-composer/us-central1-test-census-034e6abc-bucket/dags index 592c1a78..80d289fb 100644 --- a/7-composer/us-central1-test-census-034e6abc-bucket/dags +++ b/7-composer/us-central1-test-census-034e6abc-bucket/dags @@ -60,4 +60,4 @@ start_python_job = BeamRunPythonPipelineOperator( "wait_until_finished": False, }, dag=dag -) \ No newline at end of file +) diff --git a/7-vertexpipeline/Dockerfile b/7-vertexpipeline/Dockerfile index 3b3a06a6..b466e248 100644 --- a/7-vertexpipeline/Dockerfile +++ b/7-vertexpipeline/Dockerfile @@ -1,2 +1,16 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# FROM tensorflow/tensorflow:2.8.0 RUN pip install tensorflow-io==0.25.0 protobuf==3.20.0 google-cloud-bigquery==3.13.0 pandas==2.0.3 db-dtypes==1.2.0 google-cloud-aiplatform==1.36.0 google-cloud-storage==2.14.0 kfp google-cloud-pipeline-components diff --git a/7-vertexpipeline/Readme.md b/7-vertexpipeline/Readme.md index 11315b15..cc157d92 100644 --- a/7-vertexpipeline/Readme.md +++ b/7-vertexpipeline/Readme.md @@ -4,7 +4,7 @@ Machine learning pipeline from development to production # Use case This example illustrates the promotion of a machine learning pipeline from an interactive tenant to a production tenant. The example specifically trains a model on a [UCI census dataset](https://archive.ics.uci.edu/dataset/20/census+income) for binary classification. -The steps in the vertex pipelines version of the pipeline are as follows. +The steps in the vertex pipelines version of the pipeline are as follows. *This pipeline is also replicated with airflow operators for cloud composer.* ## Bigquery dataset creation In the first step, a bigquery dataset is created using a bigquery operator offered by google as such: @@ -12,7 +12,7 @@ In the first step, a bigquery dataset is created using a bigquery operator offer create_bq_dataset_query = f""" CREATE SCHEMA IF NOT EXISTS {DATASET_ID} """ - + bq_dataset_op = BigqueryQueryJobOp( query=create_bq_dataset_query, project=project, @@ -35,7 +35,7 @@ Dataflow operator from google operators is used to ingest data raw data from a g bq_project=project, subnet=dataflow_subnet ).after(bq_dataset_op) - + dataflow_python_train_op = DataflowPythonJobOp( requirements_file_path=requirements_file_path, python_module_path=python_file_path, @@ -44,7 +44,7 @@ Dataflow operator from google operators is used to ingest data raw data from a g location=region, temp_location=f"{dataflow_temp_location}/train", ).after(dataflow_args_train) - + dataflow_wait_train_op = WaitGcpResourcesOp( gcp_resources=dataflow_python_train_op.outputs["gcp_resources"] ).after(dataflow_python_train_op) @@ -64,7 +64,7 @@ Once the data lands in the tables, the costume training process kick starts tb_log_dir=tb_log_dir, batch_size=batch_size ).after(dataflow_wait_train_op) - + ## Model evaluation @@ -158,8 +158,8 @@ The following method runs the pipeline. Note that a kms encryption key is suppli "service_account": deployment_config["service_account"], "prod_service_account": deployment_config["prod_service_account"], "monitoring_name": monitoring_config['name'], - "monitoring_email": monitoring_config['email'], - + "monitoring_email": monitoring_config['email'], + }, enable_caching=False, ) @@ -168,7 +168,7 @@ The following method runs the pipeline. Note that a kms encryption key is suppli # Promotion workflow -The interactive (dev) tenant is where the experimentation takes place. Once a training algorithm is chosen and the ML pipeline steps are outlined, the pipeline is created using vertex pipeline components (managed kubeflow).
This ML pipeline is configured such that it runs in non-prod (staging), however, the deployment and the model monitoring steps take place in prod. This is to save training resources as the data is already available in non-prod (staging). > graph @@ -177,17 +177,17 @@ Once the data scientist creates their pipeline they can push (the pipeline.yaml - Install the dependencies - run the scripts to generate the pipeline.yaml file and run the pipeline - upload the pipeline.yaml to the composer bucket on gcs (for scheduled runs) - + The first version of the model is trained and deployed due to the cloud build trigger. However, to keep the model up to date with incoming data, a simple composer dag is configured to run the same pipeline on scheduled intervals. Note: For the first iteration, the pipeline.yaml file is generated and directly pushed to the repo. However, this can be done as a cloud build step as well in order to integrate security checks. ## Service accounts -Note that the is triggered by cloud build (for the first time) and cloud composer (for the scheduled run). Therefore, by default the respective service accounts are used for the vertex pipeline run. However, in our code we have configured two service accounts to run different steps. +Note that the pipeline is triggered by cloud build (for the first time) and cloud composer (for the scheduled run). Therefore, by default the respective service accounts are used for the vertex pipeline run. However, in our code we have configured two service accounts to run different steps. - The bigquery service agent on the non-prod project will need EncryptDecrypt permission on the kms key so that it can create the dataset using the CMEK key. - First, a non-prod service account to take care of components that run in non-prod (dataset creation, dataflow, training, and evaluation). This could simply be the default compute engine service account for the non-prod tenant. This service account needs write permission to upload the trained model from the non-prod bucket to the Vertex environment of prod. - - Another service account that has permissions on the prod tenant in order to deploy the model and the model monitoring job. This could simply be the default service account for the prod tenant. This service account will also need read permission on bigquery of non-prod where the data exists so that the monitoring job deployed by this service account in prod + - Another service account that has permissions on the prod tenant in order to deploy the model and the model monitoring job. This could simply be the default service account for the prod tenant. This service account will also need read permission on bigquery of non-prod where the data exists so that the monitoring job deployed by this service account in prod can access that data. diff --git a/7-vertexpipeline/cloudbuild.yaml b/7-vertexpipeline/cloudbuild.yaml index 22a8c36e..f2f7a620 100644 --- a/7-vertexpipeline/cloudbuild.yaml +++ b/7-vertexpipeline/cloudbuild.yaml @@ -1,31 +1,45 @@ -steps: +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and +# limitations under the License. +# +steps: # upload dataflow src file to bucket - name: 'gcr.io/cloud-builders/gsutil' args: ['cp', '-r', './src', 'gs://bkt-n-ml-storage-akdv'] - + - name: 'gcr.io/cloud-builders/gsutil' args: ['cp', '-r', './data', 'gs://bkt-n-ml-storage-akdv'] - + # compile pipeline - name: 'us-central1-docker.pkg.dev/prj-c-bu3artifacts-5wdo/c-publish-artifacts/vertexpipeline:v2' entrypoint: 'python' args: ['compile_pipeline.py'] id: 'compile_job' - + # run pipeline - name: 'us-central1-docker.pkg.dev/prj-c-bu3artifacts-5wdo/c-publish-artifacts/vertexpipeline:v2' entrypoint: 'python' args: ['runpipeline.py'] id: 'run_job' waitFor: ['compile_job'] - + # # upload pipeline yaml to composer # - name: 'gcr.io/cloud-builders/gsutil' # args: ['cp', './common/vertex-ai-pipeline/pipeline_package.yaml', 'gs://us-central1-d-isolated-comp-8f58e4b5-bucket/dags/common/vertex-ai-pipeline/'] # id: 'upload_composer_file' - + # # upload pipeline dag to composer # - name: 'gcr.io/cloud-builders/gsutil' # args: ['cp', './composer/dags/dag.py', 'gs://us-central1-d-isolated-comp-8f58e4b5-bucket/dags/'] # id: 'upload dag' - + diff --git a/7-vertexpipeline/common/vertex-ai-pipeline/pipeline_package.yaml b/7-vertexpipeline/common/vertex-ai-pipeline/pipeline_package.yaml index 5ad8cc3d..4c972cff 100644 --- a/7-vertexpipeline/common/vertex-ai-pipeline/pipeline_package.yaml +++ b/7-vertexpipeline/common/vertex-ai-pipeline/pipeline_package.yaml @@ -1,3 +1,17 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# # PIPELINE DEFINITION # Name: census-income-pipeline # Inputs: diff --git a/7-vertexpipeline/compile_pipeline.py b/7-vertexpipeline/compile_pipeline.py index 4d5ae3bd..31898def 100644 --- a/7-vertexpipeline/compile_pipeline.py +++ b/7-vertexpipeline/compile_pipeline.py @@ -1,3 +1,18 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +# flake8: noqa from kfp.dsl import component from kfp import components from kfp import compiler, dsl @@ -15,7 +30,8 @@ KFP_COMPONENTS_PATH = "components" SRC = "src" BUILD = "build" -# Replace {artifact-project} and {artifact-repository} with your artifact project and repository +# Replace {artifact-project} and {artifact-repository} +# with your artifact project and repository Image = f"us-central1-docker.pkg.dev/{{artifact-project}}/{{artifact-repository}}/vertexpipeline:v2" @@ -28,10 +44,11 @@ TRAINING_TABLE_ID = 'census_train_table' EVAL_TABLE_ID = 'census_eval_table' RUNNER = "DataflowRunner" -REGION="us-central1" -JOB_NAME="census-ingest" +REGION = "us-central1" +JOB_NAME = "census-ingest" UNUSED_COLUMNS = ["fnlwgt", "education_num"] + @component(base_image=Image) def build_dataflow_args( bq_dataset: str, @@ -64,6 +81,7 @@ def build_dataflow_args( # build_dataflow_args = components.create_component_from_func( # build_dataflow_args_fun, base_image='python:3.8-slim') + @component(base_image=Image) def custom_train_model( project: str, @@ -73,61 +91,63 @@ def custom_train_model( model: Output[Model], epochs: int = 5, batch_size: int = 32, - lr: float = 0.01, # not used here but can be passed to an optimizer + lr: float = 0.01, # not used here but can be passed to an optimizer ): - + from tensorflow.python.framework import ops from tensorflow.python.framework import dtypes from tensorflow_io.bigquery import BigQueryClient from tensorflow_io.bigquery import BigQueryReadSession from tensorflow import feature_column from google.cloud import bigquery - + import tensorflow as tf CSV_SCHEMA = [ - bigquery.SchemaField("age", "FLOAT64"), - bigquery.SchemaField("workclass", "STRING"), - bigquery.SchemaField("fnlwgt", "FLOAT64"), - bigquery.SchemaField("education", "STRING"), - bigquery.SchemaField("education_num", "FLOAT64"), - bigquery.SchemaField("marital_status", "STRING"), - bigquery.SchemaField("occupation", "STRING"), - bigquery.SchemaField("relationship", "STRING"), - bigquery.SchemaField("race", "STRING"), - bigquery.SchemaField("gender", "STRING"), - bigquery.SchemaField("capital_gain", "FLOAT64"), - bigquery.SchemaField("capital_loss", "FLOAT64"), - bigquery.SchemaField("hours_per_week", "FLOAT64"), - bigquery.SchemaField("native_country", "STRING"), - bigquery.SchemaField("income_bracket", "STRING"), - ] + bigquery.SchemaField("age", "FLOAT64"), + bigquery.SchemaField("workclass", "STRING"), + bigquery.SchemaField("fnlwgt", "FLOAT64"), + bigquery.SchemaField("education", "STRING"), + bigquery.SchemaField("education_num", "FLOAT64"), + bigquery.SchemaField("marital_status", "STRING"), + bigquery.SchemaField("occupation", "STRING"), + bigquery.SchemaField("relationship", "STRING"), + bigquery.SchemaField("race", "STRING"), + bigquery.SchemaField("gender", "STRING"), + bigquery.SchemaField("capital_gain", "FLOAT64"), + bigquery.SchemaField("capital_loss", "FLOAT64"), + bigquery.SchemaField("hours_per_week", "FLOAT64"), + bigquery.SchemaField("native_country", "STRING"), + bigquery.SchemaField("income_bracket", "STRING"), + ] UNUSED_COLUMNS = ["fnlwgt", "education_num"] + def transform_row(row_dict): # Trim all string tensors - trimmed_dict = { column: - (tf.strings.strip(tensor) if tensor.dtype == 'string' else tensor) - for (column,tensor) in row_dict.items() - } + trimmed_dict = {column: + (tf.strings.strip(tensor) + if tensor.dtype == 'string' else tensor) + for (column, tensor) in row_dict.items() + } # Extract feature column income_bracket = trimmed_dict.pop('income_bracket') # 
Convert feature column to 0.0/1.0 - income_bracket_float = tf.cond(tf.equal(tf.strings.strip(income_bracket), '>50K'), - lambda: tf.constant(1.0), - lambda: tf.constant(0.0)) + income_bracket_float = tf.cond(tf.equal(tf.strings.strip(income_bracket), '>50K'), + lambda: tf.constant(1.0), + lambda: tf.constant(0.0)) return (trimmed_dict, income_bracket_float) def read_bigquery(table_name, dataset=dataset): tensorflow_io_bigquery_client = BigQueryClient() read_session = tensorflow_io_bigquery_client.read_session( - "projects/" + project, - project, table, dataset, - list(field.name for field in CSV_SCHEMA - if not field.name in UNUSED_COLUMNS), - list(dtypes.double if field.field_type == 'FLOAT64' - else dtypes.string for field in CSV_SCHEMA - if not field.name in UNUSED_COLUMNS), - requested_streams=2) + "projects/" + project, + project, table, dataset, + list(field.name for field in CSV_SCHEMA + if not field.name in UNUSED_COLUMNS), + list(dtypes.double if field.field_type == 'FLOAT64' + else dtypes.string for field in CSV_SCHEMA + if not field.name in UNUSED_COLUMNS), + requested_streams=2) dataset = read_session.parallel_read_rows() transformed_ds = dataset.map(transform_row) @@ -135,17 +155,17 @@ def read_bigquery(table_name, dataset=dataset): training_ds = read_bigquery(table).shuffle(10000).batch(batch_size) - - feature_columns = [] + def get_categorical_feature_values(column): - query = 'SELECT DISTINCT TRIM({}) FROM `{}`.{}.{}'.format(column, project, dataset, table) + query = 'SELECT DISTINCT TRIM({}) FROM `{}`.{}.{}'.format( + column, project, dataset, table) client = bigquery.Client(project=project) dataset_ref = client.dataset(dataset) job_config = bigquery.QueryJobConfig() query_job = client.query(query, job_config=job_config) result = query_job.to_dataframe() - return result.values[:,0] + return result.values[:, 0] # numeric cols for header in ['capital_gain', 'capital_loss', 'hours_per_week']: @@ -156,34 +176,36 @@ def get_categorical_feature_values(column): 'race', 'native_country', 'education']: categorical_feature = feature_column.categorical_column_with_vocabulary_list( header, get_categorical_feature_values(header)) - categorical_feature_one_hot = feature_column.indicator_column(categorical_feature) + categorical_feature_one_hot = feature_column.indicator_column( + categorical_feature) feature_columns.append(categorical_feature_one_hot) # bucketized cols age = feature_column.numeric_column('age') - age_buckets = feature_column.bucketized_column(age, boundaries=[18, 25, 30, 35, 40, 45, 50, 55, 60, 65]) + age_buckets = feature_column.bucketized_column( + age, boundaries=[18, 25, 30, 35, 40, 45, 50, 55, 60, 65]) feature_columns.append(age_buckets) feature_layer = tf.keras.layers.DenseFeatures(feature_columns) - Dense = tf.keras.layers.Dense keras_model = tf.keras.Sequential( - [ - feature_layer, - Dense(100, activation=tf.nn.relu, kernel_initializer='uniform'), - Dense(75, activation=tf.nn.relu), - Dense(50, activation=tf.nn.relu), - Dense(25, activation=tf.nn.relu), - Dense(1, activation=tf.nn.sigmoid) - ]) + [ + feature_layer, + Dense(100, activation=tf.nn.relu, kernel_initializer='uniform'), + Dense(75, activation=tf.nn.relu), + Dense(50, activation=tf.nn.relu), + Dense(25, activation=tf.nn.relu), + Dense(1, activation=tf.nn.sigmoid) + ]) tensorboard = tf.keras.callbacks.TensorBoard(log_dir=tb_log_dir) # Compile Keras model keras_model.compile(loss='binary_crossentropy', metrics=['accuracy']) keras_model.fit(training_ds, epochs=epochs, callbacks=[tensorboard]) 
keras_model.save(model.path) - + + # custom_train_model = components.create_component_from_func( # custom_train_model_fun, base_image=Image) custom_job_distributed_training_op = utils.create_custom_training_job_op_from_component( @@ -201,60 +223,61 @@ def custom_eval_model( model: Input[Model], metrics: Output[Metrics], batch_size: int = 32, -)-> NamedTuple("Outputs", [("dep_decision", str)]): +) -> NamedTuple("Outputs", [("dep_decision", str)]): from tensorflow.python.framework import ops from tensorflow.python.framework import dtypes from tensorflow_io.bigquery import BigQueryClient from tensorflow_io.bigquery import BigQueryReadSession from tensorflow import feature_column from google.cloud import bigquery - - + import tensorflow as tf CSV_SCHEMA = [ - bigquery.SchemaField("age", "FLOAT64"), - bigquery.SchemaField("workclass", "STRING"), - bigquery.SchemaField("fnlwgt", "FLOAT64"), - bigquery.SchemaField("education", "STRING"), - bigquery.SchemaField("education_num", "FLOAT64"), - bigquery.SchemaField("marital_status", "STRING"), - bigquery.SchemaField("occupation", "STRING"), - bigquery.SchemaField("relationship", "STRING"), - bigquery.SchemaField("race", "STRING"), - bigquery.SchemaField("gender", "STRING"), - bigquery.SchemaField("capital_gain", "FLOAT64"), - bigquery.SchemaField("capital_loss", "FLOAT64"), - bigquery.SchemaField("hours_per_week", "FLOAT64"), - bigquery.SchemaField("native_country", "STRING"), - bigquery.SchemaField("income_bracket", "STRING"), - ] + bigquery.SchemaField("age", "FLOAT64"), + bigquery.SchemaField("workclass", "STRING"), + bigquery.SchemaField("fnlwgt", "FLOAT64"), + bigquery.SchemaField("education", "STRING"), + bigquery.SchemaField("education_num", "FLOAT64"), + bigquery.SchemaField("marital_status", "STRING"), + bigquery.SchemaField("occupation", "STRING"), + bigquery.SchemaField("relationship", "STRING"), + bigquery.SchemaField("race", "STRING"), + bigquery.SchemaField("gender", "STRING"), + bigquery.SchemaField("capital_gain", "FLOAT64"), + bigquery.SchemaField("capital_loss", "FLOAT64"), + bigquery.SchemaField("hours_per_week", "FLOAT64"), + bigquery.SchemaField("native_country", "STRING"), + bigquery.SchemaField("income_bracket", "STRING"), + ] UNUSED_COLUMNS = ["fnlwgt", "education_num"] + def transform_row(row_dict): # Trim all string tensors - trimmed_dict = { column: - (tf.strings.strip(tensor) if tensor.dtype == 'string' else tensor) - for (column,tensor) in row_dict.items() - } + trimmed_dict = {column: + (tf.strings.strip(tensor) + if tensor.dtype == 'string' else tensor) + for (column, tensor) in row_dict.items() + } # Extract feature column income_bracket = trimmed_dict.pop('income_bracket') # Convert feature column to 0.0/1.0 - income_bracket_float = tf.cond(tf.equal(tf.strings.strip(income_bracket), '>50K'), - lambda: tf.constant(1.0), - lambda: tf.constant(0.0)) + income_bracket_float = tf.cond(tf.equal(tf.strings.strip(income_bracket), '>50K'), + lambda: tf.constant(1.0), + lambda: tf.constant(0.0)) return (trimmed_dict, income_bracket_float) def read_bigquery(table_name, dataset=dataset): tensorflow_io_bigquery_client = BigQueryClient() read_session = tensorflow_io_bigquery_client.read_session( - "projects/" + project, - project, table, dataset, - list(field.name for field in CSV_SCHEMA - if not field.name in UNUSED_COLUMNS), - list(dtypes.double if field.field_type == 'FLOAT64' - else dtypes.string for field in CSV_SCHEMA - if not field.name in UNUSED_COLUMNS), - requested_streams=2) + "projects/" + project, + project, table, 
dataset, + list(field.name for field in CSV_SCHEMA + if not field.name in UNUSED_COLUMNS), + list(dtypes.double if field.field_type == 'FLOAT64' + else dtypes.string for field in CSV_SCHEMA + if not field.name in UNUSED_COLUMNS), + requested_streams=2) dataset = read_session.parallel_read_rows() transformed_ds = dataset.map(transform_row) @@ -265,7 +288,7 @@ def read_bigquery(table_name, dataset=dataset): tensorboard = tf.keras.callbacks.TensorBoard(log_dir=tb_log_dir) loss, accuracy = keras_model.evaluate(eval_ds, callbacks=[tensorboard]) metrics.log_metric("accuracy", accuracy) - + if accuracy > 0.7: dep_decision = "true" keras_model.save(model_dir) @@ -291,67 +314,69 @@ def deploy_model( vertex_model: Output[Model], vertex_endpoint: Output[Model] ): - from google.cloud import aiplatform + from google.cloud import aiplatform aiplatform.init(service_account=service_account) + def create_endpoint(): endpoints = aiplatform.Endpoint.list( - filter=f'display_name="{endpoint_name}"', - order_by='create_time desc', - project=project_id, - location=region, + filter=f'display_name="{endpoint_name}"', + order_by='create_time desc', + project=project_id, + location=region, ) if len(endpoints) > 0: - endpoint = endpoints[0] # most recently created + endpoint = endpoints[0] # most recently created else: endpoint = aiplatform.Endpoint.create( display_name=endpoint_name, project=project_id, location=region, encryption_spec_key_name=encryption - ) + ) return endpoint endpoint = create_endpoint() - def upload_model(): listed_model = aiplatform.Model.list( - filter=f'display_name="{model_name}"', - project=project_id, - location=region, + filter=f'display_name="{model_name}"', + project=project_id, + location=region, ) if len(listed_model) > 0: model_version = listed_model[0] model_upload = aiplatform.Model.upload( - display_name=model_name, - parent_model=model_version.resource_name, - artifact_uri=model_dir, - serving_container_image_uri=serving_container_image_uri, - location=region, - project=project_id, - encryption_spec_key_name=encryption + display_name=model_name, + parent_model=model_version.resource_name, + artifact_uri=model_dir, + serving_container_image_uri=serving_container_image_uri, + location=region, + project=project_id, + encryption_spec_key_name=encryption ) else: model_upload = aiplatform.Model.upload( - display_name=model_name, - artifact_uri=model_dir, - serving_container_image_uri=serving_container_image_uri, - location=region, - project=project_id, - encryption_spec_key_name=encryption, - + display_name=model_name, + artifact_uri=model_dir, + serving_container_image_uri=serving_container_image_uri, + location=region, + project=project_id, + encryption_spec_key_name=encryption, + ) return model_upload - + uploaded_model = upload_model() - + # Save data to the output params vertex_model.uri = uploaded_model.resource_name + def deploy_to_endpoint(model, endpoint): deployed_models = endpoint.list_models() if len(deployed_models) > 0: latest_model_id = deployed_models[-1].id - print("your objects properties:", deployed_models[0].create_time.__dir__()) + print("your objects properties:", + deployed_models[0].create_time.__dir__()) model_deploy = uploaded_model.deploy( # machine_type="n1-standard-4", endpoint=endpoint, @@ -364,19 +389,19 @@ def deploy_to_endpoint(model, endpoint): ) else: model_deploy = uploaded_model.deploy( - # machine_type="n1-standard-4", - endpoint=endpoint, - traffic_split={"0": 100}, - min_replica_count=min_nodes, - max_replica_count=max_nodes, - 
deployed_model_display_name=model_name, - encryption_spec_key_name=encryption, - service_account=service_account - ) + # machine_type="n1-standard-4", + endpoint=endpoint, + traffic_split={"0": 100}, + min_replica_count=min_nodes, + max_replica_count=max_nodes, + deployed_model_display_name=model_name, + encryption_spec_key_name=encryption, + service_account=service_account + ) return model_deploy.resource_name vertex_endpoint.uri = deploy_to_endpoint(vertex_model, endpoint) - vertex_endpoint.metadata['resourceName']=endpoint.resource_name + vertex_endpoint.metadata['resourceName'] = endpoint.resource_name # deploy_model = components.create_component_from_func( @@ -401,29 +426,35 @@ def create_monitoring( from collections import OrderedDict import time import yaml - def ordered_dict_representer(self, value): # can be a lambda if that's what you prefer + + # can be a lambda if that's what you prefer + def ordered_dict_representer(self, value): return self.represent_mapping('tag:yaml.org,2002:map', value.items()) yaml.add_representer(OrderedDict, ordered_dict_representer) - + aiplatform.init(service_account=service_account) - list_monitors = aiplatform.ModelDeploymentMonitoringJob.list(filter=f'(state="JOB_STATE_SUCCEEDED" OR state="JOB_STATE_RUNNING") AND display_name="{monitoring_name}"', project=project_id) + list_monitors = aiplatform.ModelDeploymentMonitoringJob.list( + filter=f'(state="JOB_STATE_SUCCEEDED" OR state="JOB_STATE_RUNNING") AND display_name="{monitoring_name}"', project=project_id) if len(list_monitors) == 0: alerting_config = model_monitoring.EmailAlertConfig( user_emails=[email], enable_logging=True ) # schedule config MONITOR_INTERVAL = 1 - schedule_config = model_monitoring.ScheduleConfig(monitor_interval=MONITOR_INTERVAL) + schedule_config = model_monitoring.ScheduleConfig( + monitor_interval=MONITOR_INTERVAL) # sampling strategy - SAMPLE_RATE = 0.5 - logging_sampling_strategy = model_monitoring.RandomSampleConfig(sample_rate=SAMPLE_RATE) + SAMPLE_RATE = 0.5 + logging_sampling_strategy = model_monitoring.RandomSampleConfig( + sample_rate=SAMPLE_RATE) # drift config DRIFT_THRESHOLD_VALUE = 0.05 DRIFT_THRESHOLDS = { "capital_gain": DRIFT_THRESHOLD_VALUE, "capital_loss": DRIFT_THRESHOLD_VALUE, } - drift_config = model_monitoring.DriftDetectionConfig(drift_thresholds=DRIFT_THRESHOLDS) + drift_config = model_monitoring.DriftDetectionConfig( + drift_thresholds=DRIFT_THRESHOLDS) # Skew config DATASET_BQ_URI = bq_data_uri TARGET = "income_bracket" @@ -461,7 +492,7 @@ def ordered_dict_representer(self, value): # can be a lambda if that's what you schemayaml['properties'][feature.name] = {"type": f_type} if feature.name not in ["fnlwgt", "education_num"]: schemayaml['required'].append(feature.name) - + with open("monitoring_schema.yaml", "w") as yaml_file: yaml.dump(schemayaml, yaml_file, default_flow_style=False) storage_client = storage.Client() @@ -484,6 +515,7 @@ def ordered_dict_representer(self, value): # can be a lambda if that's what you # create_monitoring = components.create_component_from_func( # create_monitoring_fun, base_image=Image) + @dsl.pipeline(name="census-income-pipeline") def pipeline( create_bq_dataset_query: str, @@ -498,26 +530,26 @@ def pipeline( service_account: str, prod_service_account: str, dataflow_subnet: str, - train_data_url: str=TRAINING_URL, - eval_data_url: str=EVAL_URL, - bq_dataset: str=DATASET_ID, - bq_train_table: str=TRAINING_TABLE_ID, - bq_eval_table: str=EVAL_TABLE_ID, - job_name: str=JOB_NAME, - python_file_path: 
str=f'{BUCKET_URI}/src/ingest_pipeline.py', - dataflow_temp_location: str=f'{BUCKET_URI}/temp_dataflow', - runner: str=RUNNER, - lr: float=0.01, - epochs: int=5, - batch_size: int=32, - base_train_dir: str=f'{BUCKET_URI}/training', - tb_log_dir: str=f'{BUCKET_URI}/tblogs', - deployment_image: str="us-docker.pkg.dev/cloud-aiplatform/prediction/tf2-cpu.2-8:latest", - deployed_model_name: str='income_bracket_predictor', - endpoint_name: str='census_endpoint', - min_nodes: int=2, - max_nodes: int=4, - traffic_split: int=25, + train_data_url: str = TRAINING_URL, + eval_data_url: str = EVAL_URL, + bq_dataset: str = DATASET_ID, + bq_train_table: str = TRAINING_TABLE_ID, + bq_eval_table: str = EVAL_TABLE_ID, + job_name: str = JOB_NAME, + python_file_path: str = f'{BUCKET_URI}/src/ingest_pipeline.py', + dataflow_temp_location: str = f'{BUCKET_URI}/temp_dataflow', + runner: str = RUNNER, + lr: float = 0.01, + epochs: int = 5, + batch_size: int = 32, + base_train_dir: str = f'{BUCKET_URI}/training', + tb_log_dir: str = f'{BUCKET_URI}/tblogs', + deployment_image: str = "us-docker.pkg.dev/cloud-aiplatform/prediction/tf2-cpu.2-8:latest", + deployed_model_name: str = 'income_bracket_predictor', + endpoint_name: str = 'census_endpoint', + min_nodes: int = 2, + max_nodes: int = 4, + traffic_split: int = 25, ): from google_cloud_pipeline_components.v1.bigquery import ( BigqueryQueryJobOp) @@ -525,14 +557,14 @@ def pipeline( DataflowPythonJobOp from google_cloud_pipeline_components.v1.wait_gcp_resources import \ WaitGcpResourcesOp - + from google_cloud_pipeline_components.types import artifact_types from google_cloud_pipeline_components.v1.batch_predict_job import \ ModelBatchPredictOp from google_cloud_pipeline_components.v1.model import ModelUploadOp from kfp.dsl import importer_node from google_cloud_pipeline_components.v1.endpoint import EndpointCreateOp, ModelDeployOp - + # create the dataset bq_dataset_op = BigqueryQueryJobOp( query=create_bq_dataset_query, @@ -582,7 +614,7 @@ def pipeline( dataflow_wait_eval_op = WaitGcpResourcesOp( gcp_resources=dataflow_python_eval_op.outputs["gcp_resources"] ).after(dataflow_python_eval_op) - + # create and train model custom_training_task = custom_job_distributed_training_op( lr=lr, @@ -595,7 +627,7 @@ def pipeline( tb_log_dir=tb_log_dir, batch_size=batch_size ).after(dataflow_wait_train_op) - + custom_eval_task = custom_eval_model( model_dir=model_dir, project=project, @@ -637,10 +669,8 @@ def pipeline( encryption=encryption, service_account=service_account ).after(model_deploy_op) - - - if __name__ == "__main__": - compiler.Compiler().compile(pipeline_func=pipeline, package_path="./common/vertex-ai-pipeline/pipeline_package.yaml") \ No newline at end of file + compiler.Compiler().compile(pipeline_func=pipeline, + package_path="./common/vertex-ai-pipeline/pipeline_package.yaml") diff --git a/7-vertexpipeline/components/bq_dataset_component/create_bq_dataset.sql b/7-vertexpipeline/components/bq_dataset_component/create_bq_dataset.sql index 78a9c507..a54b0c86 100644 --- a/7-vertexpipeline/components/bq_dataset_component/create_bq_dataset.sql +++ b/7-vertexpipeline/components/bq_dataset_component/create_bq_dataset.sql @@ -1,2 +1,16 @@ - +/*################################################################################## +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+###################################################################################*/
 CREATE SCHEMA IF NOT EXISTS census_dataset
diff --git a/7-vertexpipeline/components/custom_eval_component/eval.yaml b/7-vertexpipeline/components/custom_eval_component/eval.yaml
index dcb14852..8278d970 100644
--- a/7-vertexpipeline/components/custom_eval_component/eval.yaml
+++ b/7-vertexpipeline/components/custom_eval_component/eval.yaml
@@ -1,3 +1,17 @@
+# Copyright 2024 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
 # PIPELINE DEFINITION
 # Name: custom-eval-model
 # Inputs:
diff --git a/7-vertexpipeline/components/custom_training_component/training.yaml b/7-vertexpipeline/components/custom_training_component/training.yaml
index aa776ef3..fea22a96 100644
--- a/7-vertexpipeline/components/custom_training_component/training.yaml
+++ b/7-vertexpipeline/components/custom_training_component/training.yaml
@@ -1,3 +1,17 @@
+# Copyright 2024 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
 # PIPELINE DEFINITION
 # Name: custom-train-model
 # Inputs:
diff --git a/7-vertexpipeline/components/deployment_component/deploy.yaml b/7-vertexpipeline/components/deployment_component/deploy.yaml
index 6c879d1b..2354935e 100644
--- a/7-vertexpipeline/components/deployment_component/deploy.yaml
+++ b/7-vertexpipeline/components/deployment_component/deploy.yaml
@@ -1,3 +1,17 @@
+# Copyright 2024 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
 # PIPELINE DEFINITION
 # Name: deploy-model
 # Inputs:
diff --git a/7-vertexpipeline/components/monitoring_component/monitoring.yaml b/7-vertexpipeline/components/monitoring_component/monitoring.yaml
index 331a6c22..8cd72d2e 100644
--- a/7-vertexpipeline/components/monitoring_component/monitoring.yaml
+++ b/7-vertexpipeline/components/monitoring_component/monitoring.yaml
@@ -1,3 +1,17 @@
+# Copyright 2024 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
 # PIPELINE DEFINITION
 # Name: create-monitoring
 # Inputs:
diff --git a/7-vertexpipeline/composer/dags/dag.py b/7-vertexpipeline/composer/dags/dag.py
index 7b045fa7..8111ab99 100644
--- a/7-vertexpipeline/composer/dags/dag.py
+++ b/7-vertexpipeline/composer/dags/dag.py
@@ -1,3 +1,17 @@
+# Copyright 2024 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
 import airflow
 from airflow import DAG
 from airflow.operators.python_operator import PythonOperator
@@ -24,6 +38,7 @@
     dagrun_timeout=timedelta(minutes=60),
 )
 
+
 def run_pipeline_function_callable():
     # Import the module and call the function
     import importlib.util
@@ -37,8 +52,9 @@ def run_pipeline_function_callable():
     # Call the execute method on the instance
     pipeline_instance.execute()
 
+
 t1 = PythonOperator(
     task_id='vertexaipipelinetest',
     python_callable=run_pipeline_function_callable,
    dag=dag
-)
\ No newline at end of file
+)
diff --git a/7-vertexpipeline/monitoring_schema.yaml b/7-vertexpipeline/monitoring_schema.yaml
index 97890b26..0a04a5e3 100644
--- a/7-vertexpipeline/monitoring_schema.yaml
+++ b/7-vertexpipeline/monitoring_schema.yaml
@@ -1,3 +1,17 @@
+# Copyright 2024 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# - !!python/object:google.cloud.bigquery.schema.SchemaField _fields: !!python/tuple [] _properties: diff --git a/7-vertexpipeline/runpipeline.py b/7-vertexpipeline/runpipeline.py index e5e55173..59c09d06 100644 --- a/7-vertexpipeline/runpipeline.py +++ b/7-vertexpipeline/runpipeline.py @@ -1,3 +1,18 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# flake8: noqa from datetime import datetime from google.cloud import aiplatform import os @@ -15,38 +30,40 @@ # from google.api_core.exceptions import GoogleAPIError # from typing import NamedTuple + class vertex_ai_pipeline: - def __init__(self, - PROJECT_ID:str = "non-prod-projectID", \ - PROD_PROJECT_ID:str = 'prod-projectID', \ - REGION:str = "us-central1", \ - BUCKET_URI:str = "bucket_uri", \ - DATA_PATH:str = "data", \ - KFP_COMPONENTS_PATH:str = "components", \ - SRC:str = "src", \ - BUILD:str = "build", \ - TRAINING_FILE:str = 'adult.data.csv', \ - EVAL_FILE:str = 'adult.test.csv', \ - DATASET_ID:str = 'census_dataset', \ - TRAINING_TABLE_ID:str = 'census_train_table', \ - EVAL_TABLE_ID:str = 'census_eval_table', \ - RUNNER:str = "DataflowRunner", \ - DATAFLOW_SUBNET:str = "https://www.googleapis.com/compute/v1/projects/prj-n-shared-restricted-wooh/regions/us-central1/subnetworks/sb-n-shared-restricted-us-central1", - JOB_NAME:str = "census-ingest", \ - SERVICE_ACCOUNT:str = "1053774269887-compute@developer.gserviceaccount.com", \ - PROD_SERVICE_ACCOUNT: str = "941180056038-compute@developer.gserviceaccount.com" - ): + def __init__(self, + PROJECT_ID: str = "non-prod-projectID", + PROD_PROJECT_ID: str = 'prod-projectID', + REGION: str = "us-central1", + BUCKET_URI: str = "bucket_uri", + DATA_PATH: str = "data", + KFP_COMPONENTS_PATH: str = "components", + SRC: str = "src", + BUILD: str = "build", + TRAINING_FILE: str = 'adult.data.csv', + EVAL_FILE: str = 'adult.test.csv', + DATASET_ID: str = 'census_dataset', + TRAINING_TABLE_ID: str = 'census_train_table', + EVAL_TABLE_ID: str = 'census_eval_table', + RUNNER: str = "DataflowRunner", + DATAFLOW_SUBNET: str = "https://www.googleapis.com/compute/v1/projects/prj-n-shared-restricted-wooh/regions/us-central1/subnetworks/sb-n-shared-restricted-us-central1", + JOB_NAME: str = "census-ingest", + SERVICE_ACCOUNT: str = "1053774269887-compute@developer.gserviceaccount.com", + PROD_SERVICE_ACCOUNT: str = "941180056038-compute@developer.gserviceaccount.com" + ): self.timestamp = datetime.now().strftime("%d_%H_%M_%S") self.PROJECT_ID = PROJECT_ID self.PROD_PROJECT_ID = PROD_PROJECT_ID - self.REGION = REGION + self.REGION = REGION self.BUCKET_URI = BUCKET_URI self.DATA_PATH = DATA_PATH DAGS_FOLDER = os.environ.get("DAGS_FOLDER", "./") COMMON_FOLDER = os.path.join(DAGS_FOLDER, "common") - self.yaml_file_path = os.path.join(COMMON_FOLDER, "vertex-ai-pipeline/pipeline_package.yaml") + self.yaml_file_path = os.path.join( + COMMON_FOLDER, "vertex-ai-pipeline/pipeline_package.yaml") self.KFP_COMPONENTS_PATH = KFP_COMPONENTS_PATH self.SRC = SRC @@ -63,36 +80,36 @@ def 
__init__(self, self.TRAINING_TABLE_ID = 'census_train_table' self.EVAL_TABLE_ID = 'census_eval_table' self.RUNNER = "DataflowRunner" - self.JOB_NAME="census-ingest" + self.JOB_NAME = "census-ingest" self.SERVICE_ACCOUNT = SERVICE_ACCOUNT self.PROD_SERVICE_ACCOUNT = PROD_SERVICE_ACCOUNT self.create_bq_dataset_query = f""" CREATE SCHEMA IF NOT EXISTS {self.DATASET_ID} """ - self.data_config={ - "train_data_url": self.TRAINING_URL, - "eval_data_url": self.EVAL_URL, - "bq_dataset": self.DATASET_ID, - "bq_train_table": TRAINING_TABLE_ID, - "bq_eval_table": EVAL_TABLE_ID, + self.data_config = { + "train_data_url": self.TRAINING_URL, + "eval_data_url": self.EVAL_URL, + "bq_dataset": self.DATASET_ID, + "bq_train_table": TRAINING_TABLE_ID, + "bq_eval_table": EVAL_TABLE_ID, } - self.dataflow_config={ - "job_name": JOB_NAME, - "python_file_path": f'{BUCKET_URI}/src/ingest_pipeline.py', - "temp_location": f'{BUCKET_URI}/temp_dataflow', - "runner": RUNNER, - "subnet": DATAFLOW_SUBNET + self.dataflow_config = { + "job_name": JOB_NAME, + "python_file_path": f'{BUCKET_URI}/src/ingest_pipeline.py', + "temp_location": f'{BUCKET_URI}/temp_dataflow', + "runner": RUNNER, + "subnet": DATAFLOW_SUBNET } - self.train_config={ - 'lr': 0.01, - 'epochs': 5, - 'base_train_dir': f'{BUCKET_URI}/training', - 'tb_log_dir': f'{BUCKET_URI}/tblogs', + self.train_config = { + 'lr': 0.01, + 'epochs': 5, + 'base_train_dir': f'{BUCKET_URI}/training', + 'tb_log_dir': f'{BUCKET_URI}/tblogs', } - self.deployment_config={ + self.deployment_config = { 'image': 'us-docker.pkg.dev/cloud-aiplatform/prediction/tf2-cpu.2-8:latest', 'model_name': "income_bracket_predictor_prod", 'endpoint_name': "census_income_endpoint_prod", @@ -105,9 +122,9 @@ def __init__(self, "prod_service_account": self.PROD_SERVICE_ACCOUNT } - self.monitoring_config={ - 'email': 'my.email@myorg.com', - 'name': 'census_monitoring' + self.monitoring_config = { + 'email': 'my.email@myorg.com', + 'name': 'census_monitoring' } self.pipelineroot = f'{BUCKET_URI}/pipelineroot' @@ -154,16 +171,17 @@ def execute(self): }, enable_caching=False, ) - + return pipeline.run(service_account=self.SERVICE_ACCOUNT) - + if __name__ == "__main__": pipeline = vertex_ai_pipeline( - PROJECT_ID="prj-n-bu3machine-learning-brk1", \ # Replace with your non-prod project Id - PROD_PROJECT_ID='prj-p-bu3machine-learning-skc4', \ # Replace with your prod project Id + # Replace with your non-prod project Id + PROJECT_ID="prj-n-bu3machine-learning-brk1", \ + PROD_PROJECT_ID='prj-p-bu3machine-learning-skc4', \ # Replace with your prod project Id REGION="us-central1", \ - BUCKET_URI="gs://bkt-n-ml-storage-akdv", \ # Replace with your bucket in non-prod + BUCKET_URI="gs://bkt-n-ml-storage-akdv", \ # Replace with your bucket in non-prod DATA_PATH="data", \ KFP_COMPONENTS_PATH="components", \ SRC="src", \ @@ -182,5 +200,5 @@ def execute(self): # Replace with the compute default service account of your prod project PROD_SERVICE_ACCOUNT="941180056038-compute@developer.gserviceaccount.com" ) - - pipeline.execute() \ No newline at end of file + + pipeline.execute() diff --git a/7-vertexpipeline/src/__init__.py b/7-vertexpipeline/src/__init__.py index e69de29b..05165af6 100644 --- a/7-vertexpipeline/src/__init__.py +++ b/7-vertexpipeline/src/__init__.py @@ -0,0 +1,14 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# diff --git a/7-vertexpipeline/src/ingest_pipeline.py b/7-vertexpipeline/src/ingest_pipeline.py index 8562ab2e..ba78a500 100644 --- a/7-vertexpipeline/src/ingest_pipeline.py +++ b/7-vertexpipeline/src/ingest_pipeline.py @@ -1,3 +1,18 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# flake8: noqa from __future__ import absolute_import import logging import argparse @@ -9,6 +24,7 @@ from apache_beam.io.gcp.internal.clients import bigquery from apache_beam.options.pipeline_options import SetupOptions + def get_bigquery_schema(): """ A function to get the BigQuery schema. @@ -32,7 +48,7 @@ def get_bigquery_schema(): ("hours_per_week", "FLOAT64", 'nullable'), ("native_country", "STRING", 'nullable'), ("income_bracket", "STRING", 'nullable') - ) + ) for column in columns: column_schema = bigquery.TableFieldSchema() @@ -43,6 +59,7 @@ def get_bigquery_schema(): return table_schema + def get_args(): parser = argparse.ArgumentParser() parser.add_argument('--url', dest='url', default="BUCKET_URI/data/adult.data.csv", @@ -56,39 +73,42 @@ def get_args(): args, pipeline_args = parser.parse_known_args() return args, pipeline_args + def transform(line): values = line.split(",") d = {} - fields = ["age","workclass","fnlwgt","education","education_num", - "marital_status","occupation","relationship","race","gender", - "capital_gain","capital_loss","hours_per_week","native_country","income_bracket"] + fields = ["age", "workclass", "fnlwgt", "education", "education_num", + "marital_status", "occupation", "relationship", "race", "gender", + "capital_gain", "capital_loss", "hours_per_week", "native_country", "income_bracket"] for i in range(len(fields)): d[fields[i]] = values[i].strip() return d + def load_data_into_bigquery(args, pipeline_args): options = PipelineOptions(pipeline_args) options.view_as(SetupOptions).save_main_session = True p = beam.Pipeline(options=options) - - (p + + (p | 'Create PCollection' >> beam.Create([args.url]) | 'ReadFromText' >> ReadAllFromText(skip_header_lines=1) | 'string to bq row' >> beam.Map(lambda s: transform(s)) | 'WriteToBigQuery' >> WriteToBigQuery( - table=args.table_id, - dataset=args.dataset_id, - project=args.project_id, - schema=get_bigquery_schema(), - create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED, - write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE, - ) - ) + table=args.table_id, + dataset=args.dataset_id, + project=args.project_id, + schema=get_bigquery_schema(), + create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED, + write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE, 
+         )
+     )
     job = p.run()
     if options.get_all_options()['runner'] == 'DirectRunner':
        job.wait_until_finish()
 
+
 if __name__ == '__main__':
     args, pipeline_args = get_args()
     logging.getLogger().setLevel(logging.INFO)
diff --git a/Dockerfile-dev b/Dockerfile-dev
index bc239321..74dcd368 100644
--- a/Dockerfile-dev
+++ b/Dockerfile-dev
@@ -13,11 +13,11 @@ RUN mkdir /usr/local/gcloud /usr/local/terraform
 RUN if [ "$(uname -m)" = "x86_64" ]; then \
        export ARCH=amd64; \
        export ARCH_GCLOUD=x86_64; \
-    elif [ "$(uname -m)" = "aarch64" ]; then \
+    elif [ "$(uname -m)" = "aarch64" ]; then \
        export ARCH=arm64; \
-        export ARCH_GCLOUD=arm; \
+        export ARCH_GCLOUD=arm; \
     else \
-        export ARCH=unknown; \
+        export ARCH=unknown; \
     fi && \
     echo "ARCH=$ARCH" >> /etc/environment && \
     apk add --no-cache bash curl jq unzip git ca-certificates gnupg python3 && \
@@ -37,4 +37,4 @@ RUN if [ "$(uname -m)" = "x86_64" ]; then \
 
 ENV PATH $PATH:/usr/local/gcloud/google-cloud-sdk/bin:/usr/local/terraform
 
-ENTRYPOINT ["terraform"]
\ No newline at end of file
+ENTRYPOINT ["terraform"]
diff --git a/policy-library/policies/constraints/cmek_settings.yaml b/policy-library/policies/constraints/cmek_settings.yaml
index 8b04ab73..d24657c7 100644
--- a/policy-library/policies/constraints/cmek_settings.yaml
+++ b/policy-library/policies/constraints/cmek_settings.yaml
@@ -44,4 +44,4 @@ spec:
 #Control ID: COM-CO-2.6
 #NIST 800-53: SC-12 SC-13
 #CRI Profile: PR.DS-1.1 PR.DS-1.2 PR.DS-2.1 PR.DS-2.2 PR.DS-5.1
-    protection_level: SOFTWARE
\ No newline at end of file
+    protection_level: SOFTWARE
diff --git a/policy-library/policies/constraints/network_enable_firewall_logs.yaml b/policy-library/policies/constraints/network_enable_firewall_logs.yaml
index 952f9c5d..c2644d23 100644
--- a/policy-library/policies/constraints/network_enable_firewall_logs.yaml
+++ b/policy-library/policies/constraints/network_enable_firewall_logs.yaml
@@ -27,4 +27,4 @@ spec:
   match:
     ancestries:
     - "organizations/**"
-  parameters: {}
\ No newline at end of file
+  parameters: {}
diff --git a/policy-library/policies/constraints/require_dnssec.yaml b/policy-library/policies/constraints/require_dnssec.yaml
index aaf4c4f3..fc93704d 100644
--- a/policy-library/policies/constraints/require_dnssec.yaml
+++ b/policy-library/policies/constraints/require_dnssec.yaml
@@ -14,7 +14,7 @@
 #
 #Control ID: DNS-CO-6.1
-#NIST 800-53: SC-7 SC-8 
+#NIST 800-53: SC-7 SC-8
 #CRI Profile: PR.AC-5.1 PR.AC-5.2 PR.DS-2.1 PR.DS-2.2 PR.DS-5.1 PR.PT-4.1 DE.CM-1.1 DE.CM-1.2 DE.CM-1.3 DE.CM-1.4
 
 apiVersion: constraints.gatekeeper.sh/v1alpha1
 kind: GCPDNSSECConstraintV1
@@ -24,4 +24,4 @@ metadata:
   description: Checks that DNSSEC is enabled for a Cloud DNS managed zone.
 spec:
   severity: high
-  parameters: {}
\ No newline at end of file
+  parameters: {}
diff --git a/policy-library/policies/constraints/serviceusage_allow_basic_apis.yaml b/policy-library/policies/constraints/serviceusage_allow_basic_apis.yaml
index 6b09e6ed..5d12218a 100644
--- a/policy-library/policies/constraints/serviceusage_allow_basic_apis.yaml
+++ b/policy-library/policies/constraints/serviceusage_allow_basic_apis.yaml
@@ -80,4 +80,4 @@ spec:
     - "deploymentmanager.googleapis.com"
     - "notebooks.googleapis.com"
     - "composer.googleapis.com"
-    - "containerscanning.googleapis.com"
\ No newline at end of file
+    - "containerscanning.googleapis.com"
diff --git a/policy-library/storage_logging.yaml b/policy-library/storage_logging.yaml
index 4b43bbc3..b1e003ef 100644
--- a/policy-library/storage_logging.yaml
+++ b/policy-library/storage_logging.yaml
@@ -24,4 +24,4 @@ spec:
     ancestries:
     - "organizations/**"
    excludedAncestries: [] # optional, default is no exclusions
-  parameters: {}
\ No newline at end of file
+  parameters: {}
diff --git a/test/integration/org/org_test.go b/test/integration/org/org_test.go
index f39881fe..b0ba5665 100644
--- a/test/integration/org/org_test.go
+++ b/test/integration/org/org_test.go
@@ -198,7 +198,7 @@ func TestOrg(t *testing.T) {
 			assert.Equal(1, len(vertexWorkbenchAccessMode.Get("listPolicy.allowedValues").Array()), "vertex workbench access mode should be restricted")
 
 			vertexAiAllowedImages := gcloud.Runf(t, "resource-manager org-policies describe %s --folder %s", "constraints/ainotebooks.environmentOptions", parentFolder)
-			assert.Equal(1, len(vertexAiAllowedImages.Get("listPolicy.allowedValues").Array()), "vertex allowed images should be restricted") 
+			assert.Equal(1, len(vertexAiAllowedImages.Get("listPolicy.allowedValues").Array()), "vertex allowed images should be restricted")
 
 			// compute.requireOsLogin is neither a boolean policy nor a list policy
 			requireOsLogin := gcloud.Runf(t, "resource-manager org-policies describe %s --folder %s", "constraints/compute.requireOsLogin", parentFolder)