This repository contains examples of using Snowflake with Apache Beam. Specifically, it contains batching, streaming, and cross-language usage examples.
- Setup required by all examples
- Batching example
- Streaming example
- Google Dataflow templates
- Cross-language example
- Create a Snowflake account with Google Cloud Platform as a cloud provider.
- Make sure that your user has the default role set to ACCOUNTADMIN:
GRANT ROLE ACCOUNTADMIN TO USER <USERNAME>;
alter user <USERNAME> set default_role=ACCOUNTADMIN;
- Make sure that your user has a default warehouse set:
alter user <USERNAME> set default_warehouse=COMPUTE_WH;
- Create a new Snowflake database:
create database <DATABASE NAME>;
- Create a Google Cloud Platform account.
- Create a new GCP project.
- Create a GCP bucket.
- Create a storage integration object in Snowflake using the following command:
CREATE OR REPLACE STORAGE INTEGRATION <INTEGRATION NAME> TYPE = EXTERNAL_STAGE STORAGE_PROVIDER = GCS ENABLED = TRUE STORAGE_ALLOWED_LOCATIONS = ('gcs://<BUCKET NAME>/');
Please note that the gcs prefix is used here, not gs.
- Authorize Snowflake to operate on your bucket by following Step 3. Grant the Service Account Permissions to Access Bucket Objects
- Set up gcloud on your computer by following Using the Google Cloud SDK installer
- Install Gradle
- Run the following command to set up the Gradle wrapper:
gradle wrapper
- Run one of the provided examples.
This example contains batch writing into Snowflake and batch reading from Snowflake. It is inspired by the Apache Beam WordCount example.
The example consists of two pipelines (a minimal code sketch follows this outline):
- Writing into Snowflake
  - Reading files from the location provided by the inputFile argument
  - Counting words
  - Writing counts into the Snowflake table provided by the tableName argument
- Reading from Snowflake
  - Reading counts from the Snowflake table provided by the tableName argument
  - Writing counts into the location provided by the output argument
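For orientation, below is a minimal sketch of how these two pipelines could be wired together with Beam's Java SnowflakeIO. It is illustrative only, not the repository's actual code: placeholder values mirror the command-line flags below, the word-splitting and CSV mapping are simplified, and the exact SnowflakeIO builder methods may differ between Beam versions.

```java
// Illustrative sketch only (not the repository's actual code); assumes Beam's Java SnowflakeIO.
import java.util.Arrays;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.snowflake.SnowflakeIO;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.FlatMapElements;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TypeDescriptors;

public class WordCountSketch {
  public static void main(String[] args) {
    // Connection settings correspond to the --serverName/--username/--password/... flags below.
    SnowflakeIO.DataSourceConfiguration config =
        SnowflakeIO.DataSourceConfiguration.create()
            .withUsernamePasswordAuth("<SNOWFLAKE USERNAME>", "<SNOWFLAKE PASSWORD>")
            .withServerName("<SNOWFLAKE SERVER NAME>")
            .withDatabase("<SNOWFLAKE DATABASE>")
            .withSchema("<SNOWFLAKE SCHEMA>");

    // Pipeline 1: count words and write the counts into the Snowflake table (--tableName).
    Pipeline writePipeline = Pipeline.create();
    writePipeline
        .apply("Read input files", TextIO.read().from("gs://apache-beam-samples/shakespeare/*"))
        .apply("Extract words", FlatMapElements.into(TypeDescriptors.strings())
            .via((String line) -> Arrays.asList(line.split("[^\\p{L}]+"))))
        .apply("Count words", Count.perElement())
        .apply("Write counts to Snowflake",
            SnowflakeIO.<KV<String, Long>>write()
                .withDataSourceConfiguration(config)
                .to("<SNOWFLAKE TABLE NAME>")
                .withStagingBucketName("<GCS BUCKET NAME>")
                .withStorageIntegrationName("<SNOWFLAKE STORAGE INTEGRATION NAME>")
                // Turns each element into an array of column values (word, count).
                .withUserDataMapper(kv -> new Object[] {kv.getKey(), kv.getValue()}));
    writePipeline.run().waitUntilFinish();

    // Pipeline 2: read the counts back from Snowflake and write them to --output.
    Pipeline readPipeline = Pipeline.create();
    readPipeline
        .apply("Read counts from Snowflake",
            SnowflakeIO.<String>read()
                .withDataSourceConfiguration(config)
                .fromTable("<SNOWFLAKE TABLE NAME>")
                .withStagingBucketName("<GCS BUCKET NAME>")
                .withStorageIntegrationName("<SNOWFLAKE STORAGE INTEGRATION NAME>")
                // Turns each staged CSV row back into a line of text.
                .withCsvMapper(parts -> parts[0] + ": " + parts[1])
                .withCoder(StringUtf8Coder.of()))
        .apply("Write output files", TextIO.write().to("gs://<GCS BUCKET NAME>/counts"));
    readPipeline.run().waitUntilFinish();
  }
}
```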
- Run the batching example by executing the following command:
./gradlew run -PmainClass=batching.WordCountExample --args=" \
  --inputFile=gs://apache-beam-samples/shakespeare/* \
  --output=gs://<GCS BUCKET NAME>/counts \
  --serverName=<SNOWFLAKE SERVER NAME> \
  --username=<SNOWFLAKE USERNAME> \
  --password=<SNOWFLAKE PASSWORD> \
  --database=<SNOWFLAKE DATABASE> \
  --schema=<SNOWFLAKE SCHEMA> \
  --tableName=<SNOWFLAKE TABLE NAME> \
  --storageIntegrationName=<SNOWFLAKE STORAGE INTEGRATION NAME> \
  --stagingBucketName=<GCS BUCKET NAME> \
  --runner=<DirectRunner/DataflowRunner> \
  --project=<FOR DATAFLOW RUNNER: GCP PROJECT NAME> \
  --gcpTempLocation=<FOR DATAFLOW RUNNER: GCS TEMP LOCATION STARTING WITH gs://> \
  --region=<FOR DATAFLOW RUNNER: GCP REGION> \
  --appName=<OPTIONAL: DATAFLOW JOB NAME PREFIX>"
- Go to Snowflake console to check saved counts:
select * from <DATABASE NAME>.<SCHEMA NAME>.<TABLE NAME>;
- Go to GCS bucket to check saved files:
- Go to Dataflow to check submitted jobs:
This example streams taxi rides from Pub/Sub into Snowflake.
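For orientation, here is a minimal sketch of how such a streaming write might look with Beam's Java SnowflakeIO. It is illustrative only, not the repository's actual code: the key-pair authentication method, the public Pub/Sub topic, and the single-column data mapper are assumptions, and the exact SnowflakeIO builder methods may differ between Beam versions.

```java
// Illustrative sketch only (not the repository's actual code); assumes Beam's Java SnowflakeIO.
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.snowflake.SnowflakeIO;

public class TaxiRidesSketch {
  public static void main(String[] args) {
    // Key-pair authentication corresponds to the --privateKeyPath/--privateKeyPassphrase flags below.
    SnowflakeIO.DataSourceConfiguration config =
        SnowflakeIO.DataSourceConfiguration.create()
            .withKeyPairPathAuth("<SNOWFLAKE USERNAME>", "<KEY PATH>", "<PASSWORD FOR KEY>")
            .withServerName("<SNOWFLAKE SERVER NAME>")
            .withDatabase("<SNOWFLAKE DATABASE>")
            .withSchema("<SNOWFLAKE SCHEMA>");

    Pipeline pipeline = Pipeline.create();
    pipeline
        // Public NYC taxi rides topic; each message is a JSON document.
        .apply("Read taxi rides",
            PubsubIO.readStrings().fromTopic("projects/pubsub-public-data/topics/taxirides-realtime"))
        .apply("Write to Snowflake via Snowpipe",
            SnowflakeIO.<String>write()
                .withDataSourceConfiguration(config)
                // Streaming writes go through the Snowpipe created in the steps below,
                // not directly into a table.
                .withSnowPipe("<SNOWFLAKE SNOWPIPE NAME>")
                .withStagingBucketName("<GCS BUCKET NAME>")
                .withStorageIntegrationName("<SNOWFLAKE STORAGE INTEGRATION NAME>")
                // The real example maps each ride to the (ride_id, long, lat) columns;
                // here the raw JSON string is passed through as a single value for brevity.
                .withUserDataMapper(json -> new Object[] {json}));
    pipeline.run();
  }
}
```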
- Create a Snowflake table which will hold the taxi rides:
create or replace table <TABLE NAME> ( ride_id string , long double , lat double );
- Create a Snowflake stage.
Note: SnowflakeIO requires that the URL has /data/ as a suffix.
create or replace stage <STAGE NAME> url = 'gcs://<GCS BUCKET NAME>/data/' storage_integration = <INTEGRATION NAME>;
- Create a key pair for the authentication process.
- Set the public key for your user by executing the following command:
alter user <USERNAME> set rsa_public_key='<PUBLIC KEY>';
- Create Snowflake Snowpipe
CREATE OR REPLACE PIPE <DATABASE NAME>.<SCHEMA NAME>.<PIPE NAME> AS COPY INTO <TABLE NAME> from @<STAGE NAME>;
- Run the streaming example by executing the following command:
./gradlew run -PmainClass=streaming.TaxiRidesExample --args=" \
  --serverName=<SNOWFLAKE SERVER NAME> \
  --username=<SNOWFLAKE USERNAME> \
  --privateKeyPath=<KEY PATH> \
  --privateKeyPassphrase=<PASSWORD FOR KEY> \
  --database=<SNOWFLAKE DATABASE> \
  --schema=<SNOWFLAKE SCHEMA> \
  --snowPipe=<SNOWFLAKE SNOWPIPE NAME> \
  --storageIntegrationName=<SNOWFLAKE STORAGE INTEGRATION NAME> \
  --stagingBucketName=<GCS BUCKET NAME> \
  --runner=<DirectRunner/DataflowRunner> \
  --project=<FOR DATAFLOW RUNNER: GCP PROJECT NAME> \
  --region=<FOR DATAFLOW RUNNER: GCP REGION> \
  --appName=<OPTIONAL: DATAFLOW JOB NAME PREFIX>"
- Go to Snowflake console to check saved taxi rides:
select * from <DATABASE NAME>.<SCHEMA NAME>.<TABLE NAME>;
- Go to GCS bucket to check saved files:
- Go to Dataflow to check submitted jobs:
Google Dataflow supports template creation, which in practice means staging pipelines on Cloud Storage and running them from a variety of environments, with the ability to pass runtime parameters that are only available during pipeline execution.
The example below is based on the previous streaming example, so remember to complete the required setup before executing it. The essential part of this example is that the serverName and snowPipe options are passed at runtime. Please check the list below for the currently supported runtime options.
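In Beam terms, options that are passed at runtime are usually declared as ValueProvider options in the pipeline's options interface, roughly as in the sketch below. The interface and method names here are illustrative assumptions, not necessarily those used in the repository.

```java
// Illustrative sketch of template runtime options; names are assumptions.
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;

public interface TaxiRidesSketchOptions extends PipelineOptions {
  // ValueProvider options are not resolved when the template is created,
  // so their values can be supplied when the template is launched.
  @Description("Full Snowflake server name, e.g. account.us-central1.gcp.snowflakecomputing.com")
  ValueProvider<String> getServerName();
  void setServerName(ValueProvider<String> value);

  @Description("Snowflake pipe used for loading streamed data")
  ValueProvider<String> getSnowPipe();
  void setSnowPipe(ValueProvider<String> value);
}
```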
- Create Dataflow template.
./gradlew clean execute -DmainClass=streaming.TaxiRidesExample -Dexec.args=" \
  --runner=DataflowRunner \
  --project=<FOR DATAFLOW RUNNER: GCP PROJECT NAME> \
  --stagingLocation=gs://<GCS BUCKET NAME>/staging \
  --templateLocation=gs://<GCS BUCKET NAME>/templates/<TEMPLATE NAME> \
  --region=<GCP REGION> \
  --storageIntegrationName=<SNOWFLAKE STORAGE INTEGRATION NAME> \
  --stagingBucketName=gs://<GCS BUCKET NAME>/ \
  --username=<SNOWFLAKE USERNAME> \
  --database=<SNOWFLAKE DATABASE> \
  --schema=<SNOWFLAKE SCHEMA> \
  --privateKeyPath=<KEY PATH> \
  --privateKeyPassphrase=<PASSWORD FOR KEY>"
- Go to GCS bucket to check saved template:
- Run the Dataflow template using the Cloud console, REST API, or gcloud. The following steps show how to execute it using the Dataflow console:
- Go to the Dataflow console and click CREATE JOB FROM TEMPLATE
- From Dataflow template choose Custom template
- Choose the created template from GCS
- Fill in the Temporary location input
- Add the missing runtime options by adding additional parameters or by creating a metadata file:
  - Add additional parameters, which in our case are serverName and snowPipe
  - Create a metadata file for the template and store it in the same folder as the template, following the <template-name>_metadata naming convention. Example:
{
  "name": "NYC Taxi Pipeline Example",
  "description": "A sample pipeline that reads from the public NYC Taxi Pub/Sub subscription and writes to a Snowflake table",
  "parameters": [
    {
      "name": "serverName",
      "label": "Snowflake server name",
      "helpText": "Full Snowflake server name including domain, e.g. account.us-central1.gcp.snowflakecomputing.com",
      "regexes": [
        "[a-zA-Z0-9_.]+\\.snowflakecomputing.com"
      ],
      "paramType": "TEXT"
    },
    {
      "name": "snowPipe",
      "label": "Snowflake pipe used for loading streamed data",
      "helpText": "Name of pipe that was created in Snowflake for loading the taxi data into a Snowflake table",
      "paramType": "TEXT"
    }
  ]
}
- Run the job and wait a little for the results
- Check the Snowflake console
- --serverName= full server name with account, zone and domain.
- --username= required for username/password and Private Key authentication.
- --password= required for username/password authentication only.
- --stagingBucketName= external bucket path ending with /, e.g. gs://bucket/. Sub-directories are allowed.
- --rawPrivateKey= raw private key. Required for Private Key authentication only.
- --privateKeyPassphrase= private Key's passphrase. Required for Private Key authentication only.
- --storageIntegrationName= storage integration name
- --warehouse= warehouse to use. Optional.
- --database= database name to connect to. Optional.
- --schema= schema to use. Optional.
- --table= table to use. Optional.
- --query= query to use. Optional.
- --role= role to use. Optional.
- --snowPipe= snowPipe name. Optional.
- --url= Snowflake's JDBC-like url including account name and region without any parameters.
- --oauthToken= required for OAuth authentication only.
- --privateKeyPath= path to Private Key file. Required for Private Key authentication only.
- --authenticator= authenticator to use. Optional.
- --portNumber= port number. Optional.
- --loginTimeout= login timeout. Optional.
This example shows simple usage of cross-language transforms by writing objects into Snowflake and reading them back.
Currently, cross-language is supported in a stable manner only by the Apache Flink runner, but there are plans to support all runners. For more information about cross-language, please see the multi-SDK efforts and Beam on top of Flink articles.
Please see Apache Beam with Flink runner for the setup. The specific setup for the current version of Snowflake is as follows:
- Set up a Flink cluster by following the Flink Setup Quickstart or Setting up Apache Flink on Mac OS X
- Download the Job Server image:
docker pull gcr.io/snowflake-poli/apachebeam_flink1.10_job_server:snowflake
- Download the Apache Beam Java SDK image:
docker pull gcr.io/snowflake-poli/apachebeam_java_sdk:2.20.0.dev
- Change the tag of the downloaded Java SDK image to make the whole setup work:
docker tag gcr.io/snowflake-poli/apachebeam_java_sdk:2.20.0.dev apache/beam_java_sdk:2.20.0.dev
- Start the Job Server:
docker run -p 8099:8099 -p 8098:8098 -p 8097:8097 gcr.io/snowflake-poli/apachebeam_flink1.10_job_server:snowflake
- Download the Apache Beam Python SDK.
- Install the Apache Beam Python SDK wheel using Python 2.7:
python -m pip install apachebeam_snowflake.whl
- Set the variables inside xlang_example.py:
SERVER_NAME = <SNOWFLAKE SERVER NAME>
USERNAME = <SNOWFLAKE USERNAME>
PASSWORD = <SNOWFLAKE PASSWORD>
SCHEMA = <SNOWFLAKE SCHEMA>
DATABASE = <SNOWFLAKE DATABASE>
STAGING_BUCKET_NAME = <GCS BUCKET NAME>
STORAGE_INTEGRATION = <SNOWFLAKE STORAGE INTEGRATION NAME>
TABLE = <SNOWFLAKE TABLE NAME>
- Run xlang_example.py:
python xlang_example.py
- Go to the Flink console
- Go to the GCS bucket to check the saved files:
- Check the console output