diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..2f7896d --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +target/ diff --git a/LICENSE-2.0.txt b/LICENSE-2.0.txt new file mode 100644 index 0000000..7a4a3ea --- /dev/null +++ b/LICENSE-2.0.txt @@ -0,0 +1,202 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. 
Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. 
Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. 
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
\ No newline at end of file
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..8eebd03
--- /dev/null
+++ b/README.md
@@ -0,0 +1,92 @@
+# Kinesis LZO S3 Sink
+
+## Introduction
+
+The Kinesis LZO S3 Sink consumes records from an [Amazon Kinesis][kinesis] stream, compresses them using [splittable LZO][hadoop-lzo], and writes them to S3.
+
+The records are treated as raw byte arrays. [Elephant Bird's][elephant-bird] `BinaryBlockWriter` class is used to serialize them as a [Protocol Buffers][protobufs] array (so it is clear where one record ends and the next begins) before compressing them.
+
+The compression process generates both compressed .lzo files and small .lzo.index files. Each index file contains the byte offsets of the LZO blocks in the corresponding compressed file, meaning that the blocks can be processed in parallel.
+
+## Prerequisites
+
+You must have `lzop` and `liblzo2-dev` installed. On Ubuntu, install them like this:
+
+    $ sudo apt-get install lzop liblzo2-dev
+
+## Building
+
+Assuming you already have [SBT 0.13.0] [sbt] installed:
+
+    $ git clone git://github.com/snowplow/snowplow.git
+    $ cd 4-storage/kinesis-lzo-s3-sink
+    $ sbt compile
+
+## Usage
+
+The Kinesis LZO S3 Sink has the following command-line interface:
+
+```
+snowplow-lzo-s3-sink: Version 0.1.0. Copyright (c) 2014, Snowplow Analytics
+Ltd.
+
+Usage: snowplow-lzo-s3-sink [OPTIONS]
+
+OPTIONS
+--config filename
+  Configuration file.
+```
+
+## Running
+
+Create your own config file:
+
+    $ cp src/main/resources/config.hocon.sample my.conf
+
+Edit it and update the AWS credentials:
+
+```js
+aws {
+  access-key: "default"
+  secret-key: "default"
+}
+```
+
+Next, start the sink, making sure to specify your new config file:
+
+    $ sbt "run --config my.conf"
+
+## Find out more
+
+| Technical Docs              | Setup Guide           | Roadmap & Contributing               |
+|-----------------------------|-----------------------|--------------------------------------|
+| ![i1] [techdocs-image]      | ![i2] [setup-image]   | ![i3] [roadmap-image]                |
+| [Technical Docs] [techdocs] | [Setup Guide] [setup] | _coming soon_                        |
+
+## Copyright and license
+
+Copyright 2014 Snowplow Analytics Ltd.
+
+Licensed under the [Apache License, Version 2.0] [license] (the "License");
+you may not use this software except in compliance with the License.
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
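+
+## Reading the sink's output
+
+The records in each .lzo file are framed as Protocol Buffers blocks, so once a file has been downloaded from S3 and decompressed they can be read back with Elephant Bird, in the same way the project's own `LzoSerializerSpec` does. The snippet below is only an illustrative sketch, not a supported tool: the file path comes from the command line, and `readNext` is assumed to return `null` once the stream is exhausted.
+
+```scala
+import java.io.{BufferedInputStream, FileInputStream}
+import scala.sys.process._
+import com.twitter.elephantbird.mapreduce.io.RawBlockReader
+
+object ReadLzoOutput {
+  def main(args: Array[String]): Unit = {
+    val lzoFile = args(0) // path to a .lzo file downloaded from S3
+
+    // Decompress with lzop; the decompressed file is written alongside the .lzo
+    s"lzop -d $lzoFile".!!
+
+    val input  = new BufferedInputStream(new FileInputStream(lzoFile.stripSuffix(".lzo")))
+    val reader = new RawBlockReader(input)
+
+    // Each block holds the raw bytes of one Kinesis record
+    Iterator
+      .continually(reader.readNext())
+      .takeWhile(_ != null) // assumption: null signals end of stream
+      .foreach(bytes => println(s"Read a record of ${bytes.length} bytes"))
+
+    input.close()
+  }
+}
+```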
+
+[kinesis]: http://aws.amazon.com/kinesis/
+[snowplow]: http://snowplowanalytics.com
+[hadoop-lzo]: https://github.com/twitter/hadoop-lzo
+[elephant-bird]: https://github.com/twitter/elephant-bird
+[protobufs]: https://github.com/google/protobuf/
+[s3]: http://aws.amazon.com/s3/
+[sbt]: http://typesafe.artifactoryonline.com/typesafe/ivy-releases/org.scala-sbt/sbt-launch/0.13.0/sbt-launch.jar
+
+[setup]: https://github.com/snowplow/snowplow/wiki/kinesis-lzo-s3-sink-setup
+[techdocs]: https://github.com/snowplow/snowplow/wiki/kinesis-lzo-s3-sink
+
+[techdocs-image]: https://d3i6fms1cm1j0i.cloudfront.net/github/images/techdocs.png
+[setup-image]: https://d3i6fms1cm1j0i.cloudfront.net/github/images/setup.png
+[roadmap-image]: https://d3i6fms1cm1j0i.cloudfront.net/github/images/roadmap.png
+[license]: http://www.apache.org/licenses/LICENSE-2.0
diff --git a/lib/hadoop-lzo-0.4.20-SNAPSHOT.jar b/lib/hadoop-lzo-0.4.20-SNAPSHOT.jar
new file mode 100644
index 0000000..137dd4d
Binary files /dev/null and b/lib/hadoop-lzo-0.4.20-SNAPSHOT.jar differ
diff --git a/project/BuildSettings.scala b/project/BuildSettings.scala
new file mode 100644
index 0000000..ca226f6
--- /dev/null
+++ b/project/BuildSettings.scala
@@ -0,0 +1,82 @@
+/*
+ * Copyright (c) 2014 Snowplow Analytics Ltd. All rights reserved.
+ *
+ * This program is licensed to you under the Apache License Version 2.0,
+ * and you may not use this file except in compliance with the Apache License Version 2.0.
+ * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the Apache License Version 2.0 is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
+ */ + + // SBT +import sbt._ +import Keys._ + +object BuildSettings { + + // Basic settings for our app + lazy val basicSettings = Seq[Setting[_]]( + organization := "com.snowplowanalytics", + version := "0.1.0", + description := "Kinesis LZO sink for S3", + scalaVersion := "2.10.1", + scalacOptions := Seq("-deprecation", "-encoding", "utf8", + "-feature", "-target:jvm-1.7"), + scalacOptions in Test := Seq("-Yrangepos"), + resolvers ++= Dependencies.resolutionRepos + ) + + // Makes our SBT app settings available from within the app + lazy val scalifySettings = Seq(sourceGenerators in Compile <+= (sourceManaged in Compile, version, name, organization) map { (d, v, n, o) => + val file = d / "settings.scala" + IO.write(file, """package com.snowplowanalytics.snowplow.storage.kinesis.s3.generated + |object Settings { + | val organization = "%s" + | val version = "%s" + | val name = "%s" + |} + |""".stripMargin.format(o, v, n)) + Seq(file) + }) + + // sbt-assembly settings for building a fat jar + import sbtassembly.Plugin._ + import AssemblyKeys._ + lazy val sbtAssemblySettings = assemblySettings ++ Seq( + // Executable jarfile + assemblyOption in assembly ~= { _.copy(prependShellScript = Some(defaultShellScript)) }, + // Name it as an executable + jarName in assembly := { s"${name.value}-${version.value}" }, + + excludedJars in assembly <<= (fullClasspath in assembly) map { cp => + val excludes = Set( + "junit-4.8.2.jar", + "jsp-2.1-6.1.14.jar", + "jasper-compiler-5.5.12.jar", + "jsp-api-2.1-6.1.14.jar", + "servlet-api-2.5-6.1.14.jar", + "commons-beanutils-1.7.0.jar", + "hadoop-lzo-0.4.19.jar", + "stax-api-1.0.1.jar", + "commons-collections-3.2.1.jar" + ) + cp filter { jar => excludes(jar.data.getName) } + }, + + mergeStrategy in assembly := { + case PathList("javax", "servlet", xs @ _*) => MergeStrategy.first + case PathList("org", "objectweb", "asm", xs @ _*) => MergeStrategy.first + case PathList(ps @ _*) if ps.last endsWith ".html" => MergeStrategy.first + case "application.conf" => MergeStrategy.concat + case x => + val oldStrategy = (mergeStrategy in assembly).value + oldStrategy(x) + } + + ) + + lazy val buildSettings = basicSettings ++ scalifySettings ++ sbtAssemblySettings +} diff --git a/project/Dependencies.scala b/project/Dependencies.scala new file mode 100644 index 0000000..5b2ccdb --- /dev/null +++ b/project/Dependencies.scala @@ -0,0 +1,69 @@ +/* + * Copyright (c) 2014 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. 
+ */ +import sbt._ + +object Dependencies { + + val resolutionRepos = Seq( + "Snowplow Analytics Maven releases repo" at "http://maven.snplow.com/releases/", + "Snowplow Analytics Maven snapshot repo" at "http://maven.snplow.com/snapshots/", + "Twitter maven repo" at "http://maven.twttr.com/", + // For Scalazon + "BintrayJCenter" at "http://jcenter.bintray.com" + ) + + object V { + // Java + val logging = "1.1.3" + val slf4j = "1.7.6" + val kinesisClient = "1.0.0" + val kinesisConnector = "1.1.1" + val hadoop = "1.2.1" + val elephantbird = "4.5" + // Thrift (test only) + val collectorPayload = "0.0.0" + // Scala + val argot = "1.0.1" + val config = "1.0.2" + val scalaUtil = "0.1.0" + val snowplowCommonEnrich = "0.9.0" + val scalazon = "0.5" + val json4s = "3.2.11" + val scalaz7 = "7.0.0" + // Scala (test only) + val specs2 = "2.2" + val scalazSpecs2 = "0.1.2" + // Scala (compile only) + val commonsLang3 = "3.1" + } + + object Libraries { + // Java + val slf4j = "org.slf4j" % "slf4j-simple" % V.slf4j + val kinesisClient = "com.amazonaws" % "amazon-kinesis-client" % V.kinesisClient + val kinesisConnector = "com.amazonaws" % "amazon-kinesis-connector" % V.kinesisConnector + val hadoop = "org.apache.hadoop" % "hadoop-core" % V.hadoop + val elephantbird = "com.twitter.elephantbird" % "elephant-bird-core" % V.elephantbird + // Thrift (test only) + val collectorPayload = "com.snowplowanalytics" % "collector-payload-1" % V.collectorPayload % "test" + // Scala + val argot = "org.clapper" %% "argot" % V.argot + val config = "com.typesafe" % "config" % V.config + val scalazon = "io.github.cloudify" %% "scalazon" % V.scalazon + val json4sJackson = "org.json4s" %% "json4s-jackson" % V.json4s + val scalaz7 = "org.scalaz" %% "scalaz-core" % V.scalaz7 + // Scala (test only) + val specs2 = "org.specs2" %% "specs2" % V.specs2 % "test" + val scalazSpecs2 = "org.typelevel" %% "scalaz-specs2" % V.scalazSpecs2 % "test" + } +} diff --git a/project/SnowplowS3SinkBuild.scala b/project/SnowplowS3SinkBuild.scala new file mode 100644 index 0000000..a66c885 --- /dev/null +++ b/project/SnowplowS3SinkBuild.scala @@ -0,0 +1,46 @@ +/* + * Copyright (c) 2013-2014 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. 
+ */
+import sbt._
+import Keys._
+
+object SnowplowS3SinkBuild extends Build {
+
+  import Dependencies._
+  import BuildSettings._
+
+  // Configure prompt to show current project
+  override lazy val settings = super.settings :+ {
+    shellPrompt := { s => Project.extract(s).currentProject.id + " > " }
+  }
+
+  // Define our project, with basic project information and library dependencies
+  lazy val project = Project("snowplow-lzo-s3-sink", file("."))
+    .settings(buildSettings: _*)
+    .settings(
+      libraryDependencies ++= Seq(
+        Libraries.argot,
+        Libraries.config,
+        Libraries.scalaz7,
+        Libraries.specs2,
+        Libraries.scalazSpecs2,
+        Libraries.kinesisClient,
+        Libraries.kinesisConnector,
+        Libraries.slf4j,
+        Libraries.hadoop,
+        Libraries.elephantbird,
+        Libraries.scalazon,
+        Libraries.json4sJackson,
+        Libraries.collectorPayload
+      )
+    )
+}
diff --git a/project/build.properties b/project/build.properties
new file mode 100644
index 0000000..8ac605a
--- /dev/null
+++ b/project/build.properties
@@ -0,0 +1 @@
+sbt.version=0.13.2
diff --git a/project/plugins.sbt b/project/plugins.sbt
new file mode 100644
index 0000000..54c3252
--- /dev/null
+++ b/project/plugins.sbt
@@ -0,0 +1 @@
+addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2")
diff --git a/src/main/resources/config.hocon.sample b/src/main/resources/config.hocon.sample
new file mode 100644
index 0000000..3bc087e
--- /dev/null
+++ b/src/main/resources/config.hocon.sample
@@ -0,0 +1,60 @@
+# Default configuration for kinesis-lzo-s3-sink
+
+connector {
+
+  # The following are used to authenticate with AWS for both Kinesis and S3.
+  #
+  # If both are set to 'default', the default provider chain is used
+  # (see http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html)
+  #
+  # If both are set to 'iam', use AWS IAM Roles to provision credentials.
+  #
+  # If both are set to 'env', use environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
+  aws {
+    access-key: "default"
+    secret-key: "default"
+  }
+
+  kinesis {
+    in {
+      # Kinesis input stream name
+      stream-name: ""
+
+      # LATEST: most recent data.
+      # TRIM_HORIZON: oldest available data.
+      # Note: This only affects the first run of this application
+      # on a stream.
+      initial-position: "TRIM_HORIZON"
+    }
+
+    out {
+      # Stream for events for which the storage process fails
+      stream-name: "s3-raw-sink-errors"
+
+      # Number of shards with which to create the stream if it doesn't already exist
+      shards: 1
+    }
+
+    region: "us-east-1"
+
+    # `app-name` is used for a DynamoDB table to maintain stream state.
+    app-name: SnowplowLzoS3Sink-${connector.kinesis.in.stream-name}
+  }
+
+  s3 {
+    endpoint: "http://s3.amazonaws.com"
+    bucket: "" # Put the S3 bucket name here
+  }
+
+  # Events are accumulated in a buffer before being sent to S3.
+ # The buffer is emptied whenever: + # - the combined size of the stored records exceeds byte-limit or + # - the number of stored records exceeds record-limit or + # - the time in milliseconds since it was last emptied exceeds time-limit + buffer { + byte-limit: 5242880 # 5MB + record-limit: 1000 # 1K records + time-limit: 60000 # 1 minute + } + +} diff --git a/src/main/scala/com.snowplowanalytics.snowplow.storage.kinesis/package.scala b/src/main/scala/com.snowplowanalytics.snowplow.storage.kinesis/package.scala new file mode 100644 index 0000000..8eec5d0 --- /dev/null +++ b/src/main/scala/com.snowplowanalytics.snowplow.storage.kinesis/package.scala @@ -0,0 +1,39 @@ + /* + * Copyright (c) 2015 Snowplow Analytics Ltd. + * All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache + * License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at + * http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. + * + * See the Apache License Version 2.0 for the specific language + * governing permissions and limitations there under. + */ + +package com.snowplowanalytics.snowplow.storage.kinesis + +// Scalaz +import scalaz._ +import Scalaz._ + +package object s3 { + + /** + * Tuple containing: + * - the original Kinesis record, base 64 encoded + * - a validated SnowplowRawEvent created from it + */ + type ValidatedRecord = (String, Validation[List[String], Array[Byte]]) + + /** + * Currently the same as ValidatedRecord, but could change in the future + */ + type EmitterInput = (String, Validation[List[String], Array[Byte]]) +} diff --git a/src/main/scala/com.snowplowanalytics.snowplow.storage.kinesis/s3/CredentialsLookup.scala b/src/main/scala/com.snowplowanalytics.snowplow.storage.kinesis/s3/CredentialsLookup.scala new file mode 100644 index 0000000..1279268 --- /dev/null +++ b/src/main/scala/com.snowplowanalytics.snowplow.storage.kinesis/s3/CredentialsLookup.scala @@ -0,0 +1,92 @@ + /* + * Copyright (c) 2014 Snowplow Analytics Ltd. + * All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache + * License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at + * http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. + * + * See the Apache License Version 2.0 for the specific language + * governing permissions and limitations there under. 
+ */ +package com.snowplowanalytics.snowplow.storage.kinesis.s3 + +// Amazon +import com.amazonaws.auth._ + +/** + * Gets AWS credentials based on configuration YAML + */ +// TODO: extract this functionality into a common library +object CredentialsLookup { + + /** + * Returns AWS credentials based on access key and secret key + * + * @param a Access key + * @param s Secret key + * @return An AWSCredentialsProvider + */ + def getCredentialsProvider(a: String, s: String): AWSCredentialsProvider = { + if (isDefault(a) && isDefault(s)) { + new DefaultAWSCredentialsProviderChain() + } else if (isDefault(a) || isDefault(s)) { + throw new RuntimeException( + "access-key and secret-key must both be set to 'default', or neither" + ) + } else if (isIam(a) && isIam(s)) { + new InstanceProfileCredentialsProvider() + } else if (isIam(a) || isIam(s)) { + throw new RuntimeException("access-key and secret-key must both be set to 'iam', or neither") + } else if (isEnv(a) && isEnv(s)) { + new EnvironmentVariableCredentialsProvider() + } else if (isEnv(a) || isEnv(s)) { + throw new RuntimeException("access-key and secret-key must both be set to 'env', or neither") + } else { + new BasicAWSCredentialsProvider( + new BasicAWSCredentials(a, s) + ) + } + } + + /** + * Is the access/secret key set to the special value "default" i.e. use + * the standard provider chain for credentials. + * + * @param key The key to check + * @return true if key is default, false otherwise + */ + private def isDefault(key: String): Boolean = (key == "default") + + /** + * Is the access/secret key set to the special value "iam" i.e. use + * the IAM role to get credentials. + * + * @param key The key to check + * @return true if key is iam, false otherwise + */ + private def isIam(key: String): Boolean = (key == "iam") + + /** + * Is the access/secret key set to the special value "env" i.e. get + * the credentials from environment variables + * + * @param key The key to check + * @return true if key is iam, false otherwise + */ + private def isEnv(key: String): Boolean = (key == "env") + + // Wrap BasicAWSCredential objects. + class BasicAWSCredentialsProvider(basic: BasicAWSCredentials) extends + AWSCredentialsProvider{ + @Override def getCredentials: AWSCredentials = basic + @Override def refresh = {} + } +} diff --git a/src/main/scala/com.snowplowanalytics.snowplow.storage.kinesis/s3/LzoSerializer.scala b/src/main/scala/com.snowplowanalytics.snowplow.storage.kinesis/s3/LzoSerializer.scala new file mode 100644 index 0000000..d993d5e --- /dev/null +++ b/src/main/scala/com.snowplowanalytics.snowplow.storage.kinesis/s3/LzoSerializer.scala @@ -0,0 +1,103 @@ +/* + * Copyright (c) 2015 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. 
+ */ +package com.snowplowanalytics.snowplow.storage.kinesis.s3 + +import scala.collection.JavaConverters._ + +// Java libs +import java.io.{ + OutputStream, + DataOutputStream, + ByteArrayInputStream, + ByteArrayOutputStream, + IOException +} +import java.util.Calendar +import java.text.SimpleDateFormat + +// Java lzo +import org.apache.hadoop.conf.Configuration +import com.hadoop.compression.lzo.LzopCodec + +// Elephant bird +import com.twitter.elephantbird.mapreduce.io.RawBlockWriter + +// Scalaz +import scalaz._ +import Scalaz._ + +// Logging +import org.apache.commons.logging.LogFactory + +// AWS libs +import com.amazonaws.AmazonServiceException +import com.amazonaws.services.s3.AmazonS3Client +import com.amazonaws.services.s3.model.ObjectMetadata + +// AWS Kinesis connector libs +import com.amazonaws.services.kinesis.connectors.{ + UnmodifiableBuffer, + KinesisConnectorConfiguration +} +import com.amazonaws.services.kinesis.connectors.interfaces.IEmitter + +/** + * Object to handle LZO compression of raw events + */ +object LzoSerializer { + + val log = LogFactory.getLog(getClass) + + val lzoCodec = new LzopCodec() + val conf = new Configuration() + conf.set("io.compression.codecs", classOf[LzopCodec].getName) + lzoCodec.setConf(conf) + + /** + * Compress a list of Snowplow events + * + * @param records List of deserialized records + * @return Tuple4 containing: the output stream for the .lzo file + * the output stream for the .lzo.index file + * the compression codec + * the list of events + */ + def serialize(records: List[ EmitterInput ]): (ByteArrayOutputStream, ByteArrayOutputStream, LzopCodec, List[EmitterInput]) = { + + val indexOutputStream = new ByteArrayOutputStream() + val outputStream = new ByteArrayOutputStream() + + // This writes to the underlying outputstream and indexoutput stream + val lzoOutputStream = lzoCodec.createIndexedOutputStream(outputStream, new DataOutputStream(indexOutputStream)) + + val rawBlockWriter = new RawBlockWriter(lzoOutputStream) + + // Populate the output stream with records + val results = records.map({ record => try { + (record._1, record._2.map(r => { + rawBlockWriter.write(r) + r + })) + } catch { + case e: IOException => { + log.warn(e) + (record._1, List("Error writing raw event to output stream: [%s]".format(e.toString)).fail) + } + } + }) + + rawBlockWriter.close + + (outputStream, indexOutputStream, lzoCodec, results) + } +} diff --git a/src/main/scala/com.snowplowanalytics.snowplow.storage.kinesis/s3/RawEventTransformer.scala b/src/main/scala/com.snowplowanalytics.snowplow.storage.kinesis/s3/RawEventTransformer.scala new file mode 100644 index 0000000..f9c45bd --- /dev/null +++ b/src/main/scala/com.snowplowanalytics.snowplow.storage.kinesis/s3/RawEventTransformer.scala @@ -0,0 +1,44 @@ +/* + * Copyright (c) 2015 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. 
+ */ +package com.snowplowanalytics.snowplow.storage.kinesis.s3 + +// AWS libs +import com.amazonaws.services.kinesis.model.Record + +// AWS Kinesis Connector libs +import com.amazonaws.services.kinesis.connectors.interfaces.ITransformer + +// Thrift libs +import org.apache.thrift.{TSerializer,TDeserializer} + +// Apache commons +import org.apache.commons.codec.binary.Base64 + +// Scalaz +import scalaz._ +import Scalaz._ + +/** + * Thrift serializer/deserializer class + */ +class RawEventTransformer extends ITransformer[ ValidatedRecord, EmitterInput ] { + lazy val serializer = new TSerializer() + lazy val deserializer = new TDeserializer() + + override def toClass(record: Record): ValidatedRecord = { + val recordByteArray = record.getData.array + (new String(Base64.encodeBase64(recordByteArray)), recordByteArray.success) + } + + override def fromClass(record: EmitterInput) = record +} diff --git a/src/main/scala/com.snowplowanalytics.snowplow.storage.kinesis/s3/S3Emitter.scala b/src/main/scala/com.snowplowanalytics.snowplow.storage.kinesis/s3/S3Emitter.scala new file mode 100644 index 0000000..0315601 --- /dev/null +++ b/src/main/scala/com.snowplowanalytics.snowplow.storage.kinesis/s3/S3Emitter.scala @@ -0,0 +1,207 @@ +/* + * Copyright (c) 2015 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. + */ +package com.snowplowanalytics.snowplow.storage.kinesis.s3 + +import scala.collection.JavaConverters._ + +// Java libs +import java.io.{ + OutputStream, + DataOutputStream, + ByteArrayInputStream, + ByteArrayOutputStream, + IOException +} +import java.util.Calendar +import java.text.SimpleDateFormat + +// Java lzo +import org.apache.hadoop.conf.Configuration +import com.hadoop.compression.lzo.LzopCodec + +// Elephant bird +import com.twitter.elephantbird.mapreduce.io.{ + ThriftBlockWriter +} + +// Logging +import org.apache.commons.logging.{Log,LogFactory} + +// AWS libs +import com.amazonaws.AmazonServiceException +import com.amazonaws.services.s3.AmazonS3Client +import com.amazonaws.services.s3.model.ObjectMetadata + +// AWS Kinesis connector libs +import com.amazonaws.services.kinesis.connectors.{ + UnmodifiableBuffer, + KinesisConnectorConfiguration +} +import com.amazonaws.services.kinesis.connectors.interfaces.IEmitter + +// Scala +import scala.collection.JavaConversions._ +import scala.annotation.tailrec + +// Scalaz +import scalaz._ +import Scalaz._ + +// json4s +import org.json4s._ +import org.json4s.jackson.JsonMethods._ +import org.json4s.JsonDSL._ + +// This project +import sinks._ + +/** + * Emitter for flushing Kinesis event data to S3. + * + * Once the buffer is full, the emit function is called. + */ +class S3Emitter(config: KinesisConnectorConfiguration, badSink: ISink) extends IEmitter[ EmitterInput ] { + + /** + * The amount of time to wait in between unsuccessful index requests (in milliseconds). 
+ * 10 seconds = 10 * 1000 = 10000 + */ + private val BackoffPeriod = 10000 + + val bucket = config.S3_BUCKET + val log = LogFactory.getLog(classOf[S3Emitter]) + val client = new AmazonS3Client(config.AWS_CREDENTIALS_PROVIDER) + client.setEndpoint(config.S3_ENDPOINT) + + val lzoCodec = new LzopCodec() + val conf = new Configuration() + conf.set("io.compression.codecs", classOf[LzopCodec].getName) + lzoCodec.setConf(conf) + + val dateFormat = new SimpleDateFormat("yyyy-MM-dd"); + + /** + * Determines the filename in S3, which is the corresponding + * Kinesis sequence range of records in the file. + */ + protected def getFileName(firstSeq: String, lastSeq: String, lzoCodec: LzopCodec): String = { + dateFormat.format(Calendar.getInstance().getTime()) + + "-" + firstSeq + "-" + lastSeq + lzoCodec.getDefaultExtension() + } + + /** + * Reads items from a buffer and saves them to s3. + * + * This method is expected to return a List of items that + * failed to be written out to S3, which will be sent to + * a Kinesis stream for bad events. + * + * @param buffer BasicMemoryBuffer containing EmitterInputs + * @return list of inputs which failed transformation + */ + override def emit(buffer: UnmodifiableBuffer[ EmitterInput ]): java.util.List[ EmitterInput ] = { + + log.info(s"Flushing buffer with ${buffer.getRecords.size} records.") + + val records = buffer.getRecords().asScala.toList + + val (outputStream, indexOutputStream, lzoCodec, results) = LzoSerializer.serialize(records) + + val filename = getFileName(buffer.getFirstSequenceNumber, buffer.getLastSequenceNumber, lzoCodec) + val indexFilename = filename + ".index" + + val obj = new ByteArrayInputStream(outputStream.toByteArray) + val indexObj = new ByteArrayInputStream(indexOutputStream.toByteArray) + + val objMeta = new ObjectMetadata() + val indexObjMeta = new ObjectMetadata() + + objMeta.setContentLength(outputStream.size) + indexObjMeta.setContentLength(indexOutputStream.size) + + val (successes, failures) = results.partition(_._2.isSuccess) + + log.info(s"Successfully serialized ${successes.size} records out of ${successes.size + failures.size}") + + /** + * Keep attempting to send the data to S3 until it succeeds + * + * @return list of inputs which failed to be sent to S3 + */ + @tailrec + def attemptEmit(): List[EmitterInput] = { + try { + client.putObject(bucket, filename, obj, objMeta) + client.putObject(bucket, indexFilename, indexObj, indexObjMeta) + log.info(s"Successfully emitted ${successes.size} records to S3 in s3://${bucket}/${filename} with index $indexFilename") + + // Return the failed records + failures + } catch { + // Retry on failure + case ase: AmazonServiceException => { + log.error("S3 could not process the request", ase) + sleep(BackoffPeriod) + attemptEmit() + } + case e: Throwable => { + log.error("S3Emitter threw an unexpected exception", e) + sleep(BackoffPeriod) + attemptEmit() + } + } + } + + if (successes.size > 0) { + attemptEmit() + } else { + failures + } + } + + /** + * Closes the client when the KinesisConnectorRecordProcessor is shut down + */ + override def shutdown() { + client.shutdown + } + + /** + * Sends records which fail deserialization or compression + * to Kinesis with an error message + * + * @param records List of failed records to send to Kinesis + */ + override def fail(records: java.util.List[ EmitterInput ]) { + records.asScala.foreach { record => + log.warn(s"Record failed: $record") + log.info("Sending failed record to Kinesis") + val output = compact(render(("line" -> record._1) ~ 
("errors" -> record._2.swap.getOrElse(Nil)))) + badSink.store(output, Some("key"), false) + } + } + + /** + * Period between retrying sending events to S3 + * + * @param sleepTime Length of time between tries + */ + private def sleep(sleepTime: Long): Unit = { + try { + Thread.sleep(sleepTime) + } catch { + case e: InterruptedException => () + } + } + +} diff --git a/src/main/scala/com.snowplowanalytics.snowplow.storage.kinesis/s3/S3Pipeline.scala b/src/main/scala/com.snowplowanalytics.snowplow.storage.kinesis/s3/S3Pipeline.scala new file mode 100644 index 0000000..27622c3 --- /dev/null +++ b/src/main/scala/com.snowplowanalytics.snowplow.storage.kinesis/s3/S3Pipeline.scala @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2015 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. + */ +package com.snowplowanalytics.snowplow.storage.kinesis.s3 + +// AWS Kinesis Connector libs +import com.amazonaws.services.kinesis.connectors.interfaces.{ + IEmitter, + IBuffer, + ITransformer, + IFilter, + IKinesisConnectorPipeline +} +import com.amazonaws.services.kinesis.connectors.KinesisConnectorConfiguration +import com.amazonaws.services.kinesis.connectors.impl.{BasicMemoryBuffer,AllPassFilter} + +// This project +import sinks._ + +/** + * S3Pipeline class sets up the Emitter/Buffer/Transformer/Filter + */ +class S3Pipeline(badSink: ISink) extends IKinesisConnectorPipeline[ ValidatedRecord, EmitterInput ] { + + override def getEmitter(configuration: KinesisConnectorConfiguration) = new S3Emitter(configuration, badSink) + + override def getBuffer(configuration: KinesisConnectorConfiguration) = new BasicMemoryBuffer[ValidatedRecord](configuration) + + override def getTransformer(c: KinesisConnectorConfiguration) = new RawEventTransformer() + + override def getFilter(c: KinesisConnectorConfiguration) = new AllPassFilter[ValidatedRecord]() + +} + diff --git a/src/main/scala/com.snowplowanalytics.snowplow.storage.kinesis/s3/S3SinkExecutor.scala b/src/main/scala/com.snowplowanalytics.snowplow.storage.kinesis/s3/S3SinkExecutor.scala new file mode 100644 index 0000000..e1ac887 --- /dev/null +++ b/src/main/scala/com.snowplowanalytics.snowplow.storage.kinesis/s3/S3SinkExecutor.scala @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2015 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. 
+ */
+package com.snowplowanalytics.snowplow.storage.kinesis.s3
+
+// AWS Kinesis Connector libs
+import com.amazonaws.services.kinesis.connectors.{
+  KinesisConnectorConfiguration,
+  KinesisConnectorExecutorBase,
+  KinesisConnectorRecordProcessorFactory
+}
+
+// This project
+import sinks._
+
+/**
+ * Boilerplate class for the Kinesis Connector
+ */
+class S3SinkExecutor(config: KinesisConnectorConfiguration, badSink: ISink) extends KinesisConnectorExecutorBase[ ValidatedRecord, EmitterInput ] {
+  super.initialize(config)
+
+  override def getKinesisConnectorRecordProcessorFactory = {
+    new KinesisConnectorRecordProcessorFactory[ ValidatedRecord, EmitterInput ](new S3Pipeline(badSink), config)
+  }
+
+}
diff --git a/src/main/scala/com.snowplowanalytics.snowplow.storage.kinesis/s3/SinkApp.scala b/src/main/scala/com.snowplowanalytics.snowplow.storage.kinesis/s3/SinkApp.scala
new file mode 100644
index 0000000..7d20a15
--- /dev/null
+++ b/src/main/scala/com.snowplowanalytics.snowplow.storage.kinesis/s3/SinkApp.scala
@@ -0,0 +1,135 @@
+/*
+ * Copyright (c) 2015 Snowplow Analytics Ltd. All rights reserved.
+ *
+ * This program is licensed to you under the Apache License Version 2.0,
+ * and you may not use this file except in compliance with the Apache License Version 2.0.
+ * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the Apache License Version 2.0 is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
+ */
+package com.snowplowanalytics.snowplow.storage.kinesis.s3
+
+// Java
+import java.io.File
+import java.util.Properties
+
+// Argot
+import org.clapper.argot._
+
+// Config
+import com.typesafe.config.{Config, ConfigFactory}
+
+// AWS libs
+import com.amazonaws.auth.AWSCredentialsProvider
+
+// AWS Kinesis Connector libs
+import com.amazonaws.services.kinesis.connectors.KinesisConnectorConfiguration
+
+// This project
+import sinks._
+
+/**
+ * The entry point class for the Kinesis LZO S3 Sink application.
+ */
+object SinkApp extends App {
+
+  // Argument specifications
+  import ArgotConverters._
+
+  // General bumf for our app
+  val parser = new ArgotParser(
+    programName = "generated",
+    compactUsage = true,
+    preUsage = Some("%s: Version %s.
Copyright (c) 2013, %s.".format( + generated.Settings.name, + generated.Settings.version, + generated.Settings.organization) + ) + ) + + // Optional config argument + val config = parser.option[Config](List("config"), + "filename", + "Configuration file.") { + (c, opt) => + + val file = new File(c) + if (file.exists) { + ConfigFactory.parseFile(file) + } else { + parser.usage("Configuration file \"%s\" does not exist".format(c)) + ConfigFactory.empty() + } + } + parser.parse(args) + + val conf = config.value.getOrElse(throw new RuntimeException("--config argument must be provided")) + + // TODO: make the conf file more like the Elasticsearch equivalent + val kinesisSinkRegion = conf.getConfig("connector").getConfig("kinesis").getString("region") + val kinesisSinkEndpoint = s"https://kinesis.${kinesisSinkRegion}.amazonaws.com" + val kinesisSink = conf.getConfig("connector").getConfig("kinesis").getConfig("out") + val kinesisSinkName = kinesisSink.getString("stream-name") + val kinesisSinkShards = kinesisSink.getInt("shards") + + val credentialConfig = conf.getConfig("connector").getConfig("aws") + + val credentials = CredentialsLookup.getCredentialsProvider(credentialConfig.getString("access-key"), credentialConfig.getString("secret-key")) + + val badSink = new KinesisSink(credentials, kinesisSinkEndpoint, kinesisSinkName, kinesisSinkShards) + + val executor = new S3SinkExecutor(convertConfig(conf, credentials), badSink) + executor.run() + + /** + * This function converts the config file into the format + * expected by the Kinesis connector interfaces. + * + * @param connector The configuration HOCON + * @return A KinesisConnectorConfiguration + */ + def convertConfig(conf: Config, credentials: AWSCredentialsProvider): KinesisConnectorConfiguration = { + val props = new Properties() + val connector = conf.resolve.getConfig("connector") + + val kinesis = connector.getConfig("kinesis") + val kinesisIn = kinesis.getConfig("in") + val kinesisRegion = kinesis.getString("region") + val kEndpoint = s"https://kinesis.${kinesisSinkRegion}.amazonaws.com" + val streamName = kinesisIn.getString("stream-name") + val initialPosition = kinesisIn.getString("initial-position") + val appName = kinesis.getString("app-name") + + val s3 = connector.getConfig("s3") + val s3Endpoint = s3.getString("endpoint") + val bucket = s3.getString("bucket") + + val buffer = connector.getConfig("buffer") + val byteLimit = buffer.getString("byte-limit") + val recordLimit = buffer.getString("record-limit") + val timeLimit = buffer.getString("time-limit") + + props.setProperty(KinesisConnectorConfiguration.PROP_KINESIS_INPUT_STREAM, streamName) + props.setProperty(KinesisConnectorConfiguration.PROP_KINESIS_ENDPOINT, kEndpoint) + props.setProperty(KinesisConnectorConfiguration.PROP_APP_NAME, appName) + props.setProperty(KinesisConnectorConfiguration.PROP_INITIAL_POSITION_IN_STREAM, initialPosition) + + props.setProperty(KinesisConnectorConfiguration.PROP_S3_ENDPOINT, s3Endpoint) + props.setProperty(KinesisConnectorConfiguration.PROP_S3_BUCKET, bucket) + + props.setProperty(KinesisConnectorConfiguration.PROP_BUFFER_BYTE_SIZE_LIMIT, byteLimit) + props.setProperty(KinesisConnectorConfiguration.PROP_BUFFER_RECORD_COUNT_LIMIT, recordLimit) + props.setProperty(KinesisConnectorConfiguration.PROP_BUFFER_MILLISECONDS_LIMIT, timeLimit) + + props.setProperty(KinesisConnectorConfiguration.PROP_CONNECTOR_DESTINATION, "s3") + + // The emit method retries sending to S3 indefinitely, so it only needs to be called once + 
props.setProperty(KinesisConnectorConfiguration.PROP_RETRY_LIMIT, "1") + + new KinesisConnectorConfiguration(props, credentials) + } + +} diff --git a/src/main/scala/com.snowplowanalytics.snowplow.storage.kinesis/s3/sinks/ISink.scala b/src/main/scala/com.snowplowanalytics.snowplow.storage.kinesis/s3/sinks/ISink.scala new file mode 100644 index 0000000..50a77b1 --- /dev/null +++ b/src/main/scala/com.snowplowanalytics.snowplow.storage.kinesis/s3/sinks/ISink.scala @@ -0,0 +1,26 @@ + /* + * Copyright (c) 2014 Snowplow Analytics Ltd. + * All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache + * License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at + * http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. + * + * See the Apache License Version 2.0 for the specific language + * governing permissions and limitations there under. + */ +package com.snowplowanalytics.snowplow.storage.kinesis.s3.sinks + +/** + * Shared interface for all sinks + */ +trait ISink { + def store(output: String, key: Option[String], good: Boolean) +} diff --git a/src/main/scala/com.snowplowanalytics.snowplow.storage.kinesis/s3/sinks/KinesisSink.scala b/src/main/scala/com.snowplowanalytics.snowplow.storage.kinesis/s3/sinks/KinesisSink.scala new file mode 100644 index 0000000..eb671c1 --- /dev/null +++ b/src/main/scala/com.snowplowanalytics.snowplow.storage.kinesis/s3/sinks/KinesisSink.scala @@ -0,0 +1,173 @@ + /* + * Copyright (c) 2014 Snowplow Analytics Ltd. + * All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache + * License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at + * http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. + * + * See the Apache License Version 2.0 for the specific language + * governing permissions and limitations there under. 
+ */ + +package com.snowplowanalytics.snowplow.storage.kinesis.s3.sinks + +// Java +import java.nio.ByteBuffer + +// Scala +import scala.util.Random + +// Amazon +import com.amazonaws.services.kinesis.model.ResourceNotFoundException +import com.amazonaws.auth.AWSCredentialsProvider +import com.amazonaws.services.kinesis.AmazonKinesisClient +import com.amazonaws.services.kinesis.AmazonKinesis +import com.amazonaws.regions._ + +// Scalazon (for Kinesis interaction) +import io.github.cloudify.scala.aws.kinesis.Client +import io.github.cloudify.scala.aws.kinesis.Client.ImplicitExecution._ +import io.github.cloudify.scala.aws.kinesis.Definitions.{ + Stream, + PutResult, + Record +} +import io.github.cloudify.scala.aws.kinesis.KinesisDsl._ + +// Concurrent libraries +import scala.concurrent.{Future,Await,TimeoutException} +import scala.concurrent.ExecutionContext.Implicits.global +import scala.concurrent.duration._ +import scala.util.{Success, Failure} + +// Logging +import org.slf4j.LoggerFactory + +/** + * Kinesis Sink + * + * @param provider AWSCredentialsProvider + * @param endpoint Kinesis stream endpoint + * @param name Kinesis stream name + * @param shards Number of shards with which to initialize the stream + * @param config Configuration for the Kinesis stream + */ +class KinesisSink(provider: AWSCredentialsProvider, endpoint: String, name: String, shards: Int) + extends ISink { + + private lazy val log = LoggerFactory.getLogger(getClass()) + import log.{error, debug, info, trace} + + // Explicitly create a client so we can configure the end point + val client = new AmazonKinesisClient(provider) + client.setEndpoint(endpoint) + + // Create a Kinesis client for stream interactions. + private implicit val kinesis = Client.fromClient(client) + + // The output stream for enriched events. + // Lazy so that it doesn't get created unless we need to write to it. + private lazy val enrichedStream = createAndLoadStream() + + /** + * Checks if a stream exists. + * + * @param name Name of the stream to look for + * @param timeout How long to wait for a description of the stream + * @return Whether the stream both exists and is active + */ + // TODO move out into a kinesis helpers library + def streamExists(name: String, timeout: Int = 60): Boolean = { + + val exists: Boolean = try { + val streamDescribeFuture = for { + s <- Kinesis.stream(name).describe + } yield s + + val description = Await.result(streamDescribeFuture, Duration(timeout, SECONDS)) + description.isActive + + } catch { + case rnfe: ResourceNotFoundException => false + } + + if (exists) { + info(s"Stream $name exists and is active") + } else { + info(s"Stream $name doesn't exist or is not active") + } + + exists + } + + /** + * Creates a new stream if one doesn't exist + * + * @param How long to wait for the stream to be created + * @return The new stream + */ + // TODO move out into a kinesis helpers library + def createAndLoadStream(timeout: Int = 60): Stream = { + + if (streamExists(name)) { + Kinesis.stream(name) + } else { + info(s"Creating stream $name of size $shards") + val createStream = for { + s <- Kinesis.streams.create(name) + } yield s + + try { + val stream = Await.result(createStream, Duration(timeout, SECONDS)) + + info(s"Successfully created stream $name. 
Waiting until it's active") + Await.result(stream.waitActive.retrying(timeout), + Duration(timeout, SECONDS)) + + info(s"Stream $name active") + + stream + } catch { + case _: TimeoutException => + throw new RuntimeException("Error: Timed out") + } + } + } + + /** + * Write a record to the Kinesis stream + * + * @param output The string record to write + * @param key A hash of the key determines to which shard the + * record is assigned. Defaults to a random string. + * @param good Unused parameter which exists to extend ISink + */ + def store(output: String, key: Option[String], good: Boolean) { + val putData = for { + p <- enrichedStream.put( + ByteBuffer.wrap(output.getBytes), + key.getOrElse(Random.nextInt.toString) + ) + } yield p + + putData onComplete { + case Success(result) => { + info(s"Writing successful") + info(s" + ShardId: ${result.shardId}") + info(s" + SequenceNumber: ${result.sequenceNumber}") + } + case Failure(f) => { + error(s"Writing failed.") + error(s" + " + f.getMessage) + } + } + } +} diff --git a/src/test/scala/com.snowplowanalytics.snowplow.storage.kinesis.s3/LzoSerializerSpec.scala b/src/test/scala/com.snowplowanalytics.snowplow.storage.kinesis.s3/LzoSerializerSpec.scala new file mode 100644 index 0000000..e17f3ea --- /dev/null +++ b/src/test/scala/com.snowplowanalytics.snowplow.storage.kinesis.s3/LzoSerializerSpec.scala @@ -0,0 +1,109 @@ +/* + * Copyright (c) 2015 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. 
+ */ +package com.snowplowanalytics.snowplow +package storage.kinesis.s3 + +// Java +import java.util.Properties +import java.io.{ + File, + FileInputStream, + FileOutputStream, + BufferedInputStream +} + +// AWS libs +import com.amazonaws.auth.EnvironmentVariableCredentialsProvider + +// AWS Kinesis Connector libs +import com.amazonaws.services.kinesis.connectors.{ + KinesisConnectorConfiguration, + UnmodifiableBuffer +} +import com.amazonaws.services.kinesis.connectors.impl.BasicMemoryBuffer + +// Elephant Bird +import com.twitter.elephantbird.mapreduce.io.RawBlockReader + +// Apache Thrift +import org.apache.thrift.{ + TSerializer, + TDeserializer +} + +// Scalaz +import scalaz._ +import Scalaz._ + +// Scala +import scala.sys.process._ +import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ + +// Snowplow +import com.snowplowanalytics.snowplow.CollectorPayload.thrift.v1.CollectorPayload + +// This project +import sinks._ + +// Specs2 +import org.specs2.mutable.Specification +import org.specs2.scalaz.ValidationMatchers + +/** + * Tests serialization and LZO compression of CollectorPayloads + */ +class LzoSerializerSpec extends Specification with ValidationMatchers { + + "The LzoSerializer" should { + "correctly serialize and compress a list of CollectorPayloads" in { + + val serializer = new TSerializer + val deserializer = new TDeserializer + + val decompressedFilename = "/tmp/kinesis-s3-sink-test" + + val compressedFilename = decompressedFilename + ".lzo" + + def cleanup() = List(compressedFilename, decompressedFilename).foreach(new File(_).delete()) + + cleanup() + + val inputEvents = List( + ("raw1", new CollectorPayload("A", "B", "C", 1000, "a", "b").success), + ("raw2", new CollectorPayload("X", "Y", "Z", 2000, "x", "y").success)) + + val binaryInputs = inputEvents.map(e => (e._1, e._2.map(x => serializer.serialize(x)))) + + val lzoOutput = LzoSerializer.serialize(binaryInputs)._1 + + lzoOutput.writeTo(new FileOutputStream(compressedFilename)) + + s"lzop -d $compressedFilename" !! + + val input = new BufferedInputStream(new FileInputStream(decompressedFilename)) + val reader = new RawBlockReader(input) + + cleanup() + + inputEvents map {e => { + val rawResult = reader.readNext() + val target = new CollectorPayload + deserializer.deserialize(target, rawResult) + + target.success must_== e._2 + } + } + } + } +}
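For local experimentation, the bad-record path can be exercised without a second Kinesis stream by swapping in a different `ISink`. Below is a hypothetical sketch (not part of this changeset) of a trivial implementation that just prints failed records; it compiles against the `ISink` trait defined above.

```scala
package com.snowplowanalytics.snowplow.storage.kinesis.s3.sinks

/**
 * Hypothetical example, not part of this changeset: an ISink that prints
 * failed records to stdout instead of writing them to a Kinesis stream.
 * Handy for exercising S3Emitter.fail locally without AWS credentials.
 */
class StdoutSink extends ISink {
  def store(output: String, key: Option[String], good: Boolean): Unit =
    // The S3 sink only calls this for bad records, so `good` is false in practice
    println(s"[${if (good) "good" else "bad"}] $output")
}
```

Wired up in `SinkApp` as `new S3SinkExecutor(convertConfig(conf, credentials), new StdoutSink)`, failed records would then go to the console instead of the `s3-raw-sink-errors` stream; again, this is just a sketch for local testing.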