diff --git a/CHANGELOG b/CHANGELOG
index 28fcace..fb915eb 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,3 +1,3 @@
-Version 0.1.0 (2015-05-xx)
+Version 0.1.0 (2015-06-10)
 --------------------------
 Initial release
\ No newline at end of file
diff --git a/README.md b/README.md
index ee362ee..e2c8a18 100644
--- a/README.md
+++ b/README.md
@@ -4,33 +4,33 @@
 ## Introduction

-This is a simple time series analysis job written in Scala for the [Spark] [spark] Streaming cluster computing platform.
-__First__, this app generates/sends raw events to AWS Kinesis. __Second__, we process the raw events with Apache Spark Streaming. Our data processing
-sorts each event into a "bucket". __Third__, Spark counts and aggregates the raw events into 1 minute buckets. __Last__, this Spark app takes the aggregate records and saves the aggregate events into AWS DynamoDB Database. Read More about
-[Kinesis and Spark Streaming](https://spark.apache.org/docs/latest/streaming-kinesis-integration.html).
-Follow along in this [blog post] (http://snowplowanalytics.com/blog/2015/06/09/spark-streaming-example-project-0.1.0-released/) to get the project up and running yourself.
+This is a simple time series analysis stream processing job ([introductory blog post] [blog-post]) written in Scala for the [Spark Streaming] [spark-streaming] cluster computing platform, processing JSON events from [Amazon Kinesis] [kinesis] and writing aggregates to [Amazon DynamoDB] [dynamodb].

-Input: Example of a raw events in JSON format
+This was built by the Data Science team at [Snowplow Analytics] [snowplow], who use Spark Streaming in their client projects.

-```bash
-{"timestamp": "2015-06-05T12:54:43.064528", "type": "Green", "id": "4ec80fb1-0963-4e35-8f54-ce760499d974"}
-```
+**Running this requires an Amazon AWS account, and it will incur charges.**

-Output: Example of the AggregateRecords table in DynamoDB
-![data table png][data-table]
+_See also:_ [Spark Example Project] [spark-example-project] | [AWS Lambda Example Project] [aws-lambda-example-project]

+## Overview

-This was built by the Data Science team at [Snowplow Analytics] [snowplow], who use Spark on their [Data pipelines and algorithms] [data-pipelines-algos] projects.
+We have implemented a super-simple analytics-on-write stream processing job using Spark Streaming. Our Spark Streaming job reads a Kinesis stream containing events in a JSON format:

-Please ensure your AWS credentials have access policies assigned to use Cloudwatch, Kinesis, and DynamoDB services.
+```json
+{
+  "timestamp": "2015-06-05T12:54:43.064528",
+  "type": "Green",
+  "id": "4ec80fb1-0963-4e35-8f54-ce760499d974"
+}
+```

-We assume you have an Internet connection so we can access services and download code from github. Also, you will need git, Vagrant and VirtualBox installed locally. This project is specifically configured to run in AWS region "us-east-1" to ensure all AWS services are available. Building Spark on a vagrant box requires RAM. Ensure you have at least 8GB of RAM and 64 bit OS hosting vagrant.
+Our job counts the events by `type` and aggregates these counts into 1 minute buckets. The job then takes these aggregates and saves them into a table in DynamoDB:

-** Running this requires an Amazon AWS account, and it will incur charges **
+![dynamodb-table-image][dynamodb-table-image]
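+
+As a rough sketch of this analytics-on-write logic (illustrative only - `SimpleEvent`, `bucketStart` and `aggregateBatch` are hypothetical names, not taken from the actual job code), each micro-batch of events can be keyed by its 1 minute bucket and event `type`, then counted:
+
+```scala
+// Illustrative sketch only - not the project's actual implementation
+import org.apache.spark.rdd.RDD
+import org.apache.spark.SparkContext._
+
+// Hypothetical case class mirroring the JSON event above
+case class SimpleEvent(timestamp: String, `type`: String, id: String)
+
+// Truncate an ISO 8601 timestamp such as "2015-06-05T12:54:43.064528"
+// down to its 1 minute bucket, e.g. "2015-06-05T12:54:00.000"
+def bucketStart(timestamp: String): String = timestamp.take(16) + ":00.000"
+
+// Count events per (minute bucket, event type) pair within one micro-batch
+def aggregateBatch(events: RDD[SimpleEvent]): RDD[((String, String), Long)] =
+  events
+    .map(e => ((bucketStart(e.timestamp), e.`type`), 1L))
+    .reduceByKey(_ + _)
+```
+
+Each resulting `((bucket, type), count)` pair is then written to DynamoDB, as the tutorial below shows.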

 ## Developer Quickstart

-Assuming git, **[Vagrant] [vagrant-install]** and **[VirtualBox] [virtualbox-install]** installed:
+Assuming git, [Vagrant] [vagrant-install] and [VirtualBox] [virtualbox-install] are installed:

 ```bash
 host$ git clone https://github.com/snowplow/spark-streaming-example-project.git
@@ -41,8 +41,14 @@ guest$ sbt compile
 ```

 ## Tutorial

-Follow along in this [blog post] (http://snowplowanalytics.com/blog/2015/06/09/spark-streaming-example-project-0.1.0-released/) to get the project up and running yourself.
+You can follow along in [the release blog post] [blog-post] to get the project up and running yourself.
+
+The below steps assume that you are running inside Vagrant, as per the Developer Quickstart above.
+
+### 1. Setting up AWS
+
+First we need to configure a default AWS profile:

 ```bash
 $ aws configure
@@ -52,6 +58,8 @@ Default region name [None]: us-east-1
 Default output format [None]: json
 ```

+Now we create our Kinesis event stream:
+
 ```bash
 $ inv create_kinesis_stream default my-stream
 ```
@@ -81,12 +89,17 @@ $ inv describe_kinesis_stream default my-stream
 }
 ```

+If the Kinesis response says that the stream is still being created, wait a minute and then try again.
+
+Now create our DynamoDB table:

 ```bash
 $ inv create_dynamodb_table default us-east-1 my-table
 ```

-Now start sending events to the stream:
+### 2. Sending events to Kinesis
+
+We need to start sending events to our new Kinesis stream. We have created a helper method to do this - run the below and leave it running:

 ```bash
 $ inv generate_events default us-east-1 my-stream
@@ -96,7 +109,11 @@ Event sent to Kinesis: {"timestamp": "2015-06-05T12:54:44.295972", "type": "Yell
 ...
 ```

-Building Spark Streaming with Kinesis support:
+Now open up a separate terminal for the rest of the setup.
+
+### 3. Running our job on Spark Streaming
+
+First we need to build Spark Streaming with Kinesis support. This can take up to 90 minutes:

 ```bash
 $ inv build_spark
@@ -112,27 +129,43 @@ $ inv build_spark
 [INFO] ------------------------------------------------------------------------
 ```

-Now build our application:
+Now we build our application. This should take closer to 10 minutes:

 ```bash
-$ inv assemble_project
+$ inv build_project
+...
 ```

-Submit your application to Spark:
-
-Open a new terminal window. Start a second shell into the vagrant box with:
-
-Start Apache Spark Streaming system with this command:
+Finally we can submit our job to Spark with this command:

 ```bash
 $ inv run_project config/config.hocon.sample
+...
 ```

 If you have updated any of the configuration options above (e.g. stream name or region), then you will have to update the `config.hocon.sample` file accordingly.
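+
+For reference, here is a hypothetical sketch of how a job like this could read those settings from the HOCON file using the Typesafe Config library (the key names below are illustrative - check `config.hocon.sample` for the real ones):
+
+```scala
+// Hypothetical sketch of loading HOCON settings - key names are illustrative only
+import java.io.File
+import com.typesafe.config.ConfigFactory
+
+val conf = ConfigFactory.parseFile(new File("config/config.hocon.sample"))
+
+val streamName = conf.getString("kinesis.streamName")   // e.g. "my-stream"
+val region     = conf.getString("aws.region")            // e.g. "us-east-1"
+val tableName  = conf.getString("dynamodb.tableName")    // e.g. "my-table"
+```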
+
+### 4. Monitoring your job
+
+First review the spooling output of the `run_project` command above - it's very verbose, but if you don't see any Java stack traces in there, then Spark Streaming should be running okay.
+
+Now head over to your host machine's [localhost:4040] [localhost-4040] and you should see something like this:
+
+![spark-ui-image][spark-ui-image]
+
+You can see how our Spark Streaming job _discretizes_ the Kinesis event stream into 2-second-duration "micro-batches", which are each then processed as a discrete Spark job.
+
+Finally, let's check the data in our DynamoDB table. Make sure you are in the correct AWS region, then click on `my-table` and hit the `Explore Table` button:
+
+![dynamodb-table-image][dynamodb-table-image]
+
+For each **BucketStart** and **EventType** pair, we see a **Count**, plus some **CreatedAt** and **UpdatedAt** metadata for debugging purposes. Our bucket size is 1 minute, and we have 5 discrete event types, hence the matrix of rows that we see.
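+
+The writes behind this table can be sketched with the AWS SDK for Java like so (a minimal, hypothetical sketch - the attribute and table names follow the walkthrough above, but this is not the actual project code):
+
+```scala
+// Illustrative sketch of upserting one aggregate row - not the project's actual code
+import scala.collection.JavaConverters._
+import com.amazonaws.regions.{Region, Regions}
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient
+import com.amazonaws.services.dynamodbv2.model.{AttributeAction, AttributeValue, AttributeValueUpdate, UpdateItemRequest}
+
+val client = new AmazonDynamoDBClient()
+client.setRegion(Region.getRegion(Regions.US_EAST_1))
+
+// Key on the (BucketStart, EventType) pair...
+val key = Map(
+  "BucketStart" -> new AttributeValue().withS("2015-06-05T12:54:00.000"),
+  "EventType"   -> new AttributeValue().withS("Green")
+).asJava
+
+// ...then ADD to the Count and stamp UpdatedAt as we go
+val updates = Map(
+  "Count"     -> new AttributeValueUpdate().withAction(AttributeAction.ADD)
+                   .withValue(new AttributeValue().withN("1")),
+  "UpdatedAt" -> new AttributeValueUpdate().withAction(AttributeAction.PUT)
+                   .withValue(new AttributeValue().withS("2015-06-05T12:54:43.064528"))
+).asJava
+
+client.updateItem(new UpdateItemRequest()
+  .withTableName("my-table")
+  .withKey(key)
+  .withAttributeUpdates(updates))
+```
+
+Using an `ADD` update makes the count increment atomic, so several micro-batches can safely update the same bucket.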
+
 ## Roadmap

-* Maybe a Spark Streaming Machine Learning example
+* Porting this job to [AWS Lambda] [aws-lambda-example-project]
+* Various improvements for the [0.2.0 release] [020-milestone]
+* Expanding our analytics-on-write thinking into our new [Icebucket] [icebucket] project

 ## Copyright and license
@@ -154,30 +187,23 @@ limitations under the License.

 [release-image]: http://img.shields.io/badge/release-0.1.0-blue.svg?style=flat
 [releases]: https://github.com/snowplow/spark-streaming-example-project/releases

-[spark]: http://spark-project.org/
-[wordcount]: https://github.com/twitter/scalding/blob/master/README.md
+[blog-post]: http://snowplowanalytics.com/blog/2015/06/10/spark-streaming-example-project-0.1.0-released/
+
+[dynamodb-table-image]: /docs/dynamodb-table-image.png?raw=true
+[spark-ui-image]: /docs/spark-ui-image.png?raw=true
+
+[spark-streaming]: https://spark.apache.org/streaming/
+[kinesis]: http://aws.amazon.com/kinesis
+[dynamodb]: http://aws.amazon.com/dynamodb
 [snowplow]: http://snowplowanalytics.com
-[data-pipelines-algos]: http://snowplowanalytics.com/services/pipelines.html
+[icebucket]: https://github.com/snowplow/icebucket

 [vagrant-install]: http://docs.vagrantup.com/v2/installation/index.html
 [virtualbox-install]: https://www.virtualbox.org/wiki/Downloads

-[spark-streaming-example-project]: https://github.com/snowplow/spark-streaming-example-project
-[scalding-example-project]: https://github.com/snowplow/scalding-example-project
-
-[issue-1]: https://github.com/snowplow/spark-example-project/issues/1
-[issue-2]: https://github.com/snowplow/spark-example-project/issues/2
-[aws-spark-tutorial]: http://aws.amazon.com/articles/4926593393724923
-[spark-emr-howto]: https://forums.aws.amazon.com/thread.jspa?messageID=458398
-
-[emr]: http://aws.amazon.com/elasticmapreduce/
-[hello-txt]: https://github.com/snowplow/spark-example-project/raw/master/data/hello.txt
-[emr-client]: http://aws.amazon.com/developertools/2264
-
-[elasticity]: https://github.com/rslifka/elasticity
-[spark-plug]: https://github.com/ogrodnek/spark-plug
-[lemur]: https://github.com/TheClimateCorporation/lemur
-[boto]: http://boto.readthedocs.org/en/latest/ref/emr.html
+[spark-example-project]: https://github.com/snowplow/spark-example-project
+[aws-lambda-example-project]: https://github.com/snowplow/aws-lambda-example-project
+[localhost-4040]: http://localhost:4040/

-[data-table]: https://raw.githubusercontent.com/bigsnarfdude/snowplow.github.com/spark-streaming-example-project/assets/img/blog/2015/06/aggregateRecords2.png
+[020-milestone]: https://github.com/snowplow/spark-streaming-example-project/milestones/Version%200.2.0
diff --git a/docs/dynamodb-table-image.png b/docs/dynamodb-table-image.png
new file mode 100644
index 0000000..e0abf42
Binary files /dev/null and b/docs/dynamodb-table-image.png differ
diff --git a/docs/spark-ui-image.png b/docs/spark-ui-image.png
new file mode 100644
index 0000000..09d06dc
Binary files /dev/null and b/docs/spark-ui-image.png differ
diff --git a/tasks.py b/tasks.py
index e32aa5a..0cb715e 100644
--- a/tasks.py
+++ b/tasks.py
@@ -88,7 +88,7 @@ def build_spark():

 @task
-def assemble_project():
+def build_project():
     """
     build spark-streaming-example-project and package into "fat jar"
     ready for spark-submit