From 8bda8e71c78c247ca615734071d0d12d2bcbe69c Mon Sep 17 00:00:00 2001 From: Ago Allikmaa Date: Wed, 29 May 2024 09:43:14 +0300 Subject: [PATCH] Initial beta1 commit --- .github/dependabot.yml | 10 + .github/workflows/ci.yaml | 30 + .github/workflows/pr.yaml | 33 + .gitignore | 45 ++ .gitlab-ci.yml | 51 ++ .mvn/wrapper/MavenWrapperDownloader.java | 117 +++ .mvn/wrapper/maven-wrapper.properties | 2 + CODEOWNERS | 3 + LICENSE | 177 +++++ README.md | 49 ++ deploy.sh | 75 ++ mvnw | 310 ++++++++ mvnw.cmd | 182 +++++ pom.xml | 72 ++ pubkey.gpg | 52 ++ signalflow-client/pom.xml | 245 +++++++ .../signalfx/signalflow/client/Channel.java | 93 +++ .../signalflow/client/ChannelMessage.java | 445 ++++++++++++ .../signalflow/client/Computation.java | 337 +++++++++ .../client/ComputationAbortedException.java | 37 + .../client/ComputationFailedException.java | 27 + .../signalflow/client/ComputationHandler.java | 189 +++++ .../client/ServerSentEventsTransport.java | 555 +++++++++++++++ .../signalflow/client/SignalFlowClient.java | 250 +++++++ .../client/SignalFlowException.java | 42 ++ .../client/SignalFlowTransport.java | 91 +++ .../signalflow/client/StreamMessage.java | 132 ++++ .../client/StreamRequestException.java | 30 + .../signalflow/client/WebSocketTransport.java | 666 ++++++++++++++++++ .../AbstractHttpReceiverConnection.java | 161 +++++ .../client/connection/RetryDefaults.java | 20 + .../client/connection/RetryHandler.java | 30 + .../client/connection/RetryStrategy.java | 25 + .../client/endpoint/SignalFxEndpoint.java | 88 +++ .../endpoint/SignalFxReceiverEndpoint.java | 13 + .../client/SignalFlowClientTest.java | 57 ++ .../connection/RetryStrategyTest.java | 101 +++ update_version.py | 79 +++ 38 files changed, 4921 insertions(+) create mode 100644 .github/dependabot.yml create mode 100644 .github/workflows/ci.yaml create mode 100644 .github/workflows/pr.yaml create mode 100644 .gitignore create mode 100644 .gitlab-ci.yml create mode 100644 .mvn/wrapper/MavenWrapperDownloader.java create mode 100644 .mvn/wrapper/maven-wrapper.properties create mode 100644 CODEOWNERS create mode 100644 LICENSE create mode 100755 deploy.sh create mode 100755 mvnw create mode 100644 mvnw.cmd create mode 100644 pom.xml create mode 100644 pubkey.gpg create mode 100644 signalflow-client/pom.xml create mode 100644 signalflow-client/src/main/java/com/signalfx/signalflow/client/Channel.java create mode 100644 signalflow-client/src/main/java/com/signalfx/signalflow/client/ChannelMessage.java create mode 100644 signalflow-client/src/main/java/com/signalfx/signalflow/client/Computation.java create mode 100644 signalflow-client/src/main/java/com/signalfx/signalflow/client/ComputationAbortedException.java create mode 100644 signalflow-client/src/main/java/com/signalfx/signalflow/client/ComputationFailedException.java create mode 100644 signalflow-client/src/main/java/com/signalfx/signalflow/client/ComputationHandler.java create mode 100644 signalflow-client/src/main/java/com/signalfx/signalflow/client/ServerSentEventsTransport.java create mode 100644 signalflow-client/src/main/java/com/signalfx/signalflow/client/SignalFlowClient.java create mode 100644 signalflow-client/src/main/java/com/signalfx/signalflow/client/SignalFlowException.java create mode 100644 signalflow-client/src/main/java/com/signalfx/signalflow/client/SignalFlowTransport.java create mode 100644 signalflow-client/src/main/java/com/signalfx/signalflow/client/StreamMessage.java create mode 100644 signalflow-client/src/main/java/com/signalfx/signalflow/client/StreamRequestException.java create mode 100644 signalflow-client/src/main/java/com/signalfx/signalflow/client/WebSocketTransport.java create mode 100644 signalflow-client/src/main/java/com/signalfx/signalflow/client/connection/AbstractHttpReceiverConnection.java create mode 100644 signalflow-client/src/main/java/com/signalfx/signalflow/client/connection/RetryDefaults.java create mode 100644 signalflow-client/src/main/java/com/signalfx/signalflow/client/connection/RetryHandler.java create mode 100644 signalflow-client/src/main/java/com/signalfx/signalflow/client/connection/RetryStrategy.java create mode 100644 signalflow-client/src/main/java/com/signalfx/signalflow/client/endpoint/SignalFxEndpoint.java create mode 100644 signalflow-client/src/main/java/com/signalfx/signalflow/client/endpoint/SignalFxReceiverEndpoint.java create mode 100644 signalflow-client/src/test/java/com/signalfx/signalflow/client/SignalFlowClientTest.java create mode 100644 signalflow-client/src/test/java/com/signalfx/signalflow/connection/RetryStrategyTest.java create mode 100755 update_version.py diff --git a/.github/dependabot.yml b/.github/dependabot.yml new file mode 100644 index 0000000..7ad41a3 --- /dev/null +++ b/.github/dependabot.yml @@ -0,0 +1,10 @@ +version: 2 +updates: + - package-ecosystem: "github-actions" + directory: "/" + schedule: + interval: "daily" + - package-ecosystem: "maven" + directory: "/" + schedule: + interval: "daily" \ No newline at end of file diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml new file mode 100644 index 0000000..aa228bc --- /dev/null +++ b/.github/workflows/ci.yaml @@ -0,0 +1,30 @@ +name: PR build + +concurrency: + group: pr-${{ github.event.pull_request.number }} + cancel-in-progress: true + +on: + pull_request: + +jobs: + build: + runs-on: ubuntu-20.04 + strategy: + matrix: + java-version: [ 8.0.352+8, 11.0.17+8 ] + fail-fast: false + steps: + - uses: actions/checkout@v4 + + - name: Set up JDK ${{ matrix.java-version }} + uses: actions/setup-java@v4 + with: + distribution: temurin + java-version: ${{ matrix.java-version }} + + - name: Build + run: ./mvnw clean package -DskipTests=true + + - name: Test + run: ./mvnw verify diff --git a/.github/workflows/pr.yaml b/.github/workflows/pr.yaml new file mode 100644 index 0000000..941bedd --- /dev/null +++ b/.github/workflows/pr.yaml @@ -0,0 +1,33 @@ +name: CI build + +concurrency: + group: ci + cancel-in-progress: true + +on: + workflow_dispatch: + push: + branches: + - main + +jobs: + build: + runs-on: ubuntu-20.04 + strategy: + matrix: + java-version: [ 8.0.352+8, 11.0.17+8 ] + fail-fast: false + steps: + - uses: actions/checkout@v4 + + - name: Set up JDK ${{ matrix.java-version }} + uses: actions/setup-java@v4 + with: + distribution: temurin + java-version: ${{ matrix.java-version }} + + - name: Build + run: ./mvnw clean package -DskipTests=true + + - name: Test + run: ./mvnw verify diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..6e20929 --- /dev/null +++ b/.gitignore @@ -0,0 +1,45 @@ +*.class + +# phabricator +.arcconfig + +# Package Files # +*.jar +*.war +*.ear + +# Python compiled files +*.pyc + +# Maven build directory/artifacts +target +dependency-reduced-pom.xml + +# Eclipse project files +.classpath +.project +.settings +.pydevproject +.metadata +.cache + +# cleansvn files +.cleanmvn.pickle +cleanmvn.dot + +# IntelliJ +*.iml +.idea +/analytics/server/log/ +*.versionsBackup + +# OS generated files # +###################### +.DS_Store +.DS_Store? +._* +.Spotlight-V100 +.Trashes +Icon? +ehthumbs.db +Thumbs.db diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml new file mode 100644 index 0000000..b3c1957 --- /dev/null +++ b/.gitlab-ci.yml @@ -0,0 +1,51 @@ +include: + - project: 'prodsec/scp-scanning/gitlab-checkmarx' + ref: latest + file: '/templates/.sast_scan.yml' + - project: 'ci-cd/templates' + ref: master + file: '/prodsec/.oss-scan.yml' + +image: + name: "docker-hub.repo.splunkdev.net/openjdk:11.0.11-9-jdk" + +stages: + - build + - verify + - release + +build: + stage: build + script: + - ./mvnw clean package -DskipTests=true + - ./mvnw verify + +sast-scan: + stage: verify + rules: + - if: '$CI_COMMIT_REF_NAME == "main"' + extends: .sast_scan + variables: + SAST_SCANNER: "Semgrep" + # Fail build on high severity security vulnerabilities + alert_mode: "policy" + +oss-scan: + stage: verify + rules: + - if: '$CI_COMMIT_REF_NAME == "main"' + extends: .oss-scan + +snapshot: + stage: release + rules: + - if: '$CI_COMMIT_REF_NAME == "main"' + script: + - ./deploy.sh snapshot + +release: + stage: release + rules: + - if: '$CI_COMMIT_TAG =~ /^v[0-9]+\.[0-9]+\.[0-9]+.*/' + script: + - ./deploy.sh release diff --git a/.mvn/wrapper/MavenWrapperDownloader.java b/.mvn/wrapper/MavenWrapperDownloader.java new file mode 100644 index 0000000..b901097 --- /dev/null +++ b/.mvn/wrapper/MavenWrapperDownloader.java @@ -0,0 +1,117 @@ +/* + * Copyright 2007-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import java.net.*; +import java.io.*; +import java.nio.channels.*; +import java.util.Properties; + +public class MavenWrapperDownloader { + + private static final String WRAPPER_VERSION = "0.5.6"; + /** + * Default URL to download the maven-wrapper.jar from, if no 'downloadUrl' is provided. + */ + private static final String DEFAULT_DOWNLOAD_URL = "https://repo.maven.apache.org/maven2/io/takari/maven-wrapper/" + + WRAPPER_VERSION + "/maven-wrapper-" + WRAPPER_VERSION + ".jar"; + + /** + * Path to the maven-wrapper.properties file, which might contain a downloadUrl property to + * use instead of the default one. + */ + private static final String MAVEN_WRAPPER_PROPERTIES_PATH = + ".mvn/wrapper/maven-wrapper.properties"; + + /** + * Path where the maven-wrapper.jar will be saved to. + */ + private static final String MAVEN_WRAPPER_JAR_PATH = + ".mvn/wrapper/maven-wrapper.jar"; + + /** + * Name of the property which should be used to override the default download url for the wrapper. + */ + private static final String PROPERTY_NAME_WRAPPER_URL = "wrapperUrl"; + + public static void main(String args[]) { + System.out.println("- Downloader started"); + File baseDirectory = new File(args[0]); + System.out.println("- Using base directory: " + baseDirectory.getAbsolutePath()); + + // If the maven-wrapper.properties exists, read it and check if it contains a custom + // wrapperUrl parameter. + File mavenWrapperPropertyFile = new File(baseDirectory, MAVEN_WRAPPER_PROPERTIES_PATH); + String url = DEFAULT_DOWNLOAD_URL; + if(mavenWrapperPropertyFile.exists()) { + FileInputStream mavenWrapperPropertyFileInputStream = null; + try { + mavenWrapperPropertyFileInputStream = new FileInputStream(mavenWrapperPropertyFile); + Properties mavenWrapperProperties = new Properties(); + mavenWrapperProperties.load(mavenWrapperPropertyFileInputStream); + url = mavenWrapperProperties.getProperty(PROPERTY_NAME_WRAPPER_URL, url); + } catch (IOException e) { + System.out.println("- ERROR loading '" + MAVEN_WRAPPER_PROPERTIES_PATH + "'"); + } finally { + try { + if(mavenWrapperPropertyFileInputStream != null) { + mavenWrapperPropertyFileInputStream.close(); + } + } catch (IOException e) { + // Ignore ... + } + } + } + System.out.println("- Downloading from: " + url); + + File outputFile = new File(baseDirectory.getAbsolutePath(), MAVEN_WRAPPER_JAR_PATH); + if(!outputFile.getParentFile().exists()) { + if(!outputFile.getParentFile().mkdirs()) { + System.out.println( + "- ERROR creating output directory '" + outputFile.getParentFile().getAbsolutePath() + "'"); + } + } + System.out.println("- Downloading to: " + outputFile.getAbsolutePath()); + try { + downloadFileFromURL(url, outputFile); + System.out.println("Done"); + System.exit(0); + } catch (Throwable e) { + System.out.println("- Error downloading"); + e.printStackTrace(); + System.exit(1); + } + } + + private static void downloadFileFromURL(String urlString, File destination) throws Exception { + if (System.getenv("MVNW_USERNAME") != null && System.getenv("MVNW_PASSWORD") != null) { + String username = System.getenv("MVNW_USERNAME"); + char[] password = System.getenv("MVNW_PASSWORD").toCharArray(); + Authenticator.setDefault(new Authenticator() { + @Override + protected PasswordAuthentication getPasswordAuthentication() { + return new PasswordAuthentication(username, password); + } + }); + } + URL website = new URL(urlString); + ReadableByteChannel rbc; + rbc = Channels.newChannel(website.openStream()); + FileOutputStream fos = new FileOutputStream(destination); + fos.getChannel().transferFrom(rbc, 0, Long.MAX_VALUE); + fos.close(); + rbc.close(); + } + +} diff --git a/.mvn/wrapper/maven-wrapper.properties b/.mvn/wrapper/maven-wrapper.properties new file mode 100644 index 0000000..ffdc10e --- /dev/null +++ b/.mvn/wrapper/maven-wrapper.properties @@ -0,0 +1,2 @@ +distributionUrl=https://repo.maven.apache.org/maven2/org/apache/maven/apache-maven/3.8.1/apache-maven-3.8.1-bin.zip +wrapperUrl=https://repo.maven.apache.org/maven2/io/takari/maven-wrapper/0.5.6/maven-wrapper-0.5.6.jar diff --git a/CODEOWNERS b/CODEOWNERS new file mode 100644 index 0000000..21f1c86 --- /dev/null +++ b/CODEOWNERS @@ -0,0 +1,3 @@ +* @signalfx/gdi-java-maintainers @signalfx/gdi-java-approvers + +CODEOWNERS @signalfx/gdi-java-maintainers \ No newline at end of file diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..f433b1a --- /dev/null +++ b/LICENSE @@ -0,0 +1,177 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS diff --git a/README.md b/README.md index 0ecd5ff..d1693ff 100644 --- a/README.md +++ b/README.md @@ -2,3 +2,52 @@ This is a client for [SignalFlow](https://dev.splunk.com/observability/docs/signalflow) that lets you stream and analyze metric data in real-time for your organization. + + +## Executing SignalFlow computations + +SignalFlow is SignalFx's real-time analytics computation language. The +SignalFlow API allows SignalFx users to execute real-time streaming analytics +computations on the SignalFx platform. For more information, head over to our +Developers documentation: + +* [SignalFlow Overview](https://dev.splunk.com/observability/docs/signalflow/) +* [SignalFlow API Reference](https://dev.splunk.com/observability/reference/api/signalflow/latest) + +Executing a SignalFlow program is very simple with this client library: + +```java +String program = "data('cpu.utilization').mean().publish()"; +SignalFlowClient flow = new SignalFlowClient("MY_TOKEN"); +System.out.println("Executing " + program); +Computation computation = flow.execute(program); +for (ChannelMessage message : computation) { + switch (message.getType()) { + case DATA_MESSAGE: + DataMessage dataMessage = (DataMessage) message; + System.out.printf("%d: %s%n", + dataMessage.getLogicalTimestampMs(), dataMessage.getData()); + break; + + case EVENT_MESSAGE: + EventMessage eventMessage = (EventMessage) message; + System.out.printf("%d: %s%n", + eventMessage.getTimestampMs(), + eventMessage.getProperties()); + break; + } +} +``` + +Metadata about the timeseries is received from the iterable stream, and it +is also automatically intercepted by the client library and made available through +the ``Computation`` object returned by ``execute()``: + +```java +case DATA_MESSAGE: + DataMessage dataMessage = (DataMessage) message; + for (Map datum : dataMessage.getData()) { + Map metadata = computation.getMetadata(datum.getKey()); + // ... + } +``` diff --git a/deploy.sh b/deploy.sh new file mode 100755 index 0000000..6fd071b --- /dev/null +++ b/deploy.sh @@ -0,0 +1,75 @@ +#!/bin/bash + +set -e + +SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" +cd ${SCRIPT_DIR} + +print_usage() { + echo "Usage: ./$(basename $0) [snapshot|release]" +} + +if [[ $# < 1 ]] +then + print_usage + exit 1 +fi + +case "$1" in + snapshot) + if (! grep SNAPSHOT pom.xml > /dev/null) + then + echo "Non-SNAPSHOT release found, skipping" + exit 0 + fi + ;; + + release) + if (grep SNAPSHOT pom.xml > /dev/null) + then + echo "You can't release a SNAPSHOT artifact!" + exit 1 + fi + ;; + + *) + print_usage + exit 1 + ;; +esac + +echo ">>> Setting GnuPG configuration ..." +mkdir -p ~/.gnupg +chmod 700 ~/.gnupg +cat > ~/.gnupg/gpg.conf <>> Importing secret key ..." +gpg --batch --allow-secret-key-import --import "${GPG_SECRET_KEY}" + +echo ">>> Building settings.xml ..." +cat > release-settings.xml < + + + ossrh + ${SONATYPE_USERNAME} + ${SONATYPE_PASSWORD} + + + + + gpg + + ${GPG_PASSWORD} + + + + +EOF +trap "rm release-settings.xml" EXIT INT KILL STOP TERM + +echo ">>> Running maven ..." +./mvnw -s release-settings.xml clean deploy -P release-sign-artifacts,gpg diff --git a/mvnw b/mvnw new file mode 100755 index 0000000..41c0f0c --- /dev/null +++ b/mvnw @@ -0,0 +1,310 @@ +#!/bin/sh +# ---------------------------------------------------------------------------- +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# ---------------------------------------------------------------------------- + +# ---------------------------------------------------------------------------- +# Maven Start Up Batch script +# +# Required ENV vars: +# ------------------ +# JAVA_HOME - location of a JDK home dir +# +# Optional ENV vars +# ----------------- +# M2_HOME - location of maven2's installed home dir +# MAVEN_OPTS - parameters passed to the Java VM when running Maven +# e.g. to debug Maven itself, use +# set MAVEN_OPTS=-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000 +# MAVEN_SKIP_RC - flag to disable loading of mavenrc files +# ---------------------------------------------------------------------------- + +if [ -z "$MAVEN_SKIP_RC" ] ; then + + if [ -f /etc/mavenrc ] ; then + . /etc/mavenrc + fi + + if [ -f "$HOME/.mavenrc" ] ; then + . "$HOME/.mavenrc" + fi + +fi + +# OS specific support. $var _must_ be set to either true or false. +cygwin=false; +darwin=false; +mingw=false +case "`uname`" in + CYGWIN*) cygwin=true ;; + MINGW*) mingw=true;; + Darwin*) darwin=true + # Use /usr/libexec/java_home if available, otherwise fall back to /Library/Java/Home + # See https://developer.apple.com/library/mac/qa/qa1170/_index.html + if [ -z "$JAVA_HOME" ]; then + if [ -x "/usr/libexec/java_home" ]; then + export JAVA_HOME="`/usr/libexec/java_home`" + else + export JAVA_HOME="/Library/Java/Home" + fi + fi + ;; +esac + +if [ -z "$JAVA_HOME" ] ; then + if [ -r /etc/gentoo-release ] ; then + JAVA_HOME=`java-config --jre-home` + fi +fi + +if [ -z "$M2_HOME" ] ; then + ## resolve links - $0 may be a link to maven's home + PRG="$0" + + # need this for relative symlinks + while [ -h "$PRG" ] ; do + ls=`ls -ld "$PRG"` + link=`expr "$ls" : '.*-> \(.*\)$'` + if expr "$link" : '/.*' > /dev/null; then + PRG="$link" + else + PRG="`dirname "$PRG"`/$link" + fi + done + + saveddir=`pwd` + + M2_HOME=`dirname "$PRG"`/.. + + # make it fully qualified + M2_HOME=`cd "$M2_HOME" && pwd` + + cd "$saveddir" + # echo Using m2 at $M2_HOME +fi + +# For Cygwin, ensure paths are in UNIX format before anything is touched +if $cygwin ; then + [ -n "$M2_HOME" ] && + M2_HOME=`cygpath --unix "$M2_HOME"` + [ -n "$JAVA_HOME" ] && + JAVA_HOME=`cygpath --unix "$JAVA_HOME"` + [ -n "$CLASSPATH" ] && + CLASSPATH=`cygpath --path --unix "$CLASSPATH"` +fi + +# For Mingw, ensure paths are in UNIX format before anything is touched +if $mingw ; then + [ -n "$M2_HOME" ] && + M2_HOME="`(cd "$M2_HOME"; pwd)`" + [ -n "$JAVA_HOME" ] && + JAVA_HOME="`(cd "$JAVA_HOME"; pwd)`" +fi + +if [ -z "$JAVA_HOME" ]; then + javaExecutable="`which javac`" + if [ -n "$javaExecutable" ] && ! [ "`expr \"$javaExecutable\" : '\([^ ]*\)'`" = "no" ]; then + # readlink(1) is not available as standard on Solaris 10. + readLink=`which readlink` + if [ ! `expr "$readLink" : '\([^ ]*\)'` = "no" ]; then + if $darwin ; then + javaHome="`dirname \"$javaExecutable\"`" + javaExecutable="`cd \"$javaHome\" && pwd -P`/javac" + else + javaExecutable="`readlink -f \"$javaExecutable\"`" + fi + javaHome="`dirname \"$javaExecutable\"`" + javaHome=`expr "$javaHome" : '\(.*\)/bin'` + JAVA_HOME="$javaHome" + export JAVA_HOME + fi + fi +fi + +if [ -z "$JAVACMD" ] ; then + if [ -n "$JAVA_HOME" ] ; then + if [ -x "$JAVA_HOME/jre/sh/java" ] ; then + # IBM's JDK on AIX uses strange locations for the executables + JAVACMD="$JAVA_HOME/jre/sh/java" + else + JAVACMD="$JAVA_HOME/bin/java" + fi + else + JAVACMD="`which java`" + fi +fi + +if [ ! -x "$JAVACMD" ] ; then + echo "Error: JAVA_HOME is not defined correctly." >&2 + echo " We cannot execute $JAVACMD" >&2 + exit 1 +fi + +if [ -z "$JAVA_HOME" ] ; then + echo "Warning: JAVA_HOME environment variable is not set." +fi + +CLASSWORLDS_LAUNCHER=org.codehaus.plexus.classworlds.launcher.Launcher + +# traverses directory structure from process work directory to filesystem root +# first directory with .mvn subdirectory is considered project base directory +find_maven_basedir() { + + if [ -z "$1" ] + then + echo "Path not specified to find_maven_basedir" + return 1 + fi + + basedir="$1" + wdir="$1" + while [ "$wdir" != '/' ] ; do + if [ -d "$wdir"/.mvn ] ; then + basedir=$wdir + break + fi + # workaround for JBEAP-8937 (on Solaris 10/Sparc) + if [ -d "${wdir}" ]; then + wdir=`cd "$wdir/.."; pwd` + fi + # end of workaround + done + echo "${basedir}" +} + +# concatenates all lines of a file +concat_lines() { + if [ -f "$1" ]; then + echo "$(tr -s '\n' ' ' < "$1")" + fi +} + +BASE_DIR=`find_maven_basedir "$(pwd)"` +if [ -z "$BASE_DIR" ]; then + exit 1; +fi + +########################################################################################## +# Extension to allow automatically downloading the maven-wrapper.jar from Maven-central +# This allows using the maven wrapper in projects that prohibit checking in binary data. +########################################################################################## +if [ -r "$BASE_DIR/.mvn/wrapper/maven-wrapper.jar" ]; then + if [ "$MVNW_VERBOSE" = true ]; then + echo "Found .mvn/wrapper/maven-wrapper.jar" + fi +else + if [ "$MVNW_VERBOSE" = true ]; then + echo "Couldn't find .mvn/wrapper/maven-wrapper.jar, downloading it ..." + fi + if [ -n "$MVNW_REPOURL" ]; then + jarUrl="$MVNW_REPOURL/io/takari/maven-wrapper/0.5.6/maven-wrapper-0.5.6.jar" + else + jarUrl="https://repo.maven.apache.org/maven2/io/takari/maven-wrapper/0.5.6/maven-wrapper-0.5.6.jar" + fi + while IFS="=" read key value; do + case "$key" in (wrapperUrl) jarUrl="$value"; break ;; + esac + done < "$BASE_DIR/.mvn/wrapper/maven-wrapper.properties" + if [ "$MVNW_VERBOSE" = true ]; then + echo "Downloading from: $jarUrl" + fi + wrapperJarPath="$BASE_DIR/.mvn/wrapper/maven-wrapper.jar" + if $cygwin; then + wrapperJarPath=`cygpath --path --windows "$wrapperJarPath"` + fi + + if command -v wget > /dev/null; then + if [ "$MVNW_VERBOSE" = true ]; then + echo "Found wget ... using wget" + fi + if [ -z "$MVNW_USERNAME" ] || [ -z "$MVNW_PASSWORD" ]; then + wget "$jarUrl" -O "$wrapperJarPath" + else + wget --http-user=$MVNW_USERNAME --http-password=$MVNW_PASSWORD "$jarUrl" -O "$wrapperJarPath" + fi + elif command -v curl > /dev/null; then + if [ "$MVNW_VERBOSE" = true ]; then + echo "Found curl ... using curl" + fi + if [ -z "$MVNW_USERNAME" ] || [ -z "$MVNW_PASSWORD" ]; then + curl -o "$wrapperJarPath" "$jarUrl" -f + else + curl --user $MVNW_USERNAME:$MVNW_PASSWORD -o "$wrapperJarPath" "$jarUrl" -f + fi + + else + if [ "$MVNW_VERBOSE" = true ]; then + echo "Falling back to using Java to download" + fi + javaClass="$BASE_DIR/.mvn/wrapper/MavenWrapperDownloader.java" + # For Cygwin, switch paths to Windows format before running javac + if $cygwin; then + javaClass=`cygpath --path --windows "$javaClass"` + fi + if [ -e "$javaClass" ]; then + if [ ! -e "$BASE_DIR/.mvn/wrapper/MavenWrapperDownloader.class" ]; then + if [ "$MVNW_VERBOSE" = true ]; then + echo " - Compiling MavenWrapperDownloader.java ..." + fi + # Compiling the Java class + ("$JAVA_HOME/bin/javac" "$javaClass") + fi + if [ -e "$BASE_DIR/.mvn/wrapper/MavenWrapperDownloader.class" ]; then + # Running the downloader + if [ "$MVNW_VERBOSE" = true ]; then + echo " - Running MavenWrapperDownloader.java ..." + fi + ("$JAVA_HOME/bin/java" -cp .mvn/wrapper MavenWrapperDownloader "$MAVEN_PROJECTBASEDIR") + fi + fi + fi +fi +########################################################################################## +# End of extension +########################################################################################## + +export MAVEN_PROJECTBASEDIR=${MAVEN_BASEDIR:-"$BASE_DIR"} +if [ "$MVNW_VERBOSE" = true ]; then + echo $MAVEN_PROJECTBASEDIR +fi +MAVEN_OPTS="$(concat_lines "$MAVEN_PROJECTBASEDIR/.mvn/jvm.config") $MAVEN_OPTS" + +# For Cygwin, switch paths to Windows format before running java +if $cygwin; then + [ -n "$M2_HOME" ] && + M2_HOME=`cygpath --path --windows "$M2_HOME"` + [ -n "$JAVA_HOME" ] && + JAVA_HOME=`cygpath --path --windows "$JAVA_HOME"` + [ -n "$CLASSPATH" ] && + CLASSPATH=`cygpath --path --windows "$CLASSPATH"` + [ -n "$MAVEN_PROJECTBASEDIR" ] && + MAVEN_PROJECTBASEDIR=`cygpath --path --windows "$MAVEN_PROJECTBASEDIR"` +fi + +# Provide a "standardized" way to retrieve the CLI args that will +# work with both Windows and non-Windows executions. +MAVEN_CMD_LINE_ARGS="$MAVEN_CONFIG $@" +export MAVEN_CMD_LINE_ARGS + +WRAPPER_LAUNCHER=org.apache.maven.wrapper.MavenWrapperMain + +exec "$JAVACMD" \ + $MAVEN_OPTS \ + -classpath "$MAVEN_PROJECTBASEDIR/.mvn/wrapper/maven-wrapper.jar" \ + "-Dmaven.home=${M2_HOME}" "-Dmaven.multiModuleProjectDirectory=${MAVEN_PROJECTBASEDIR}" \ + ${WRAPPER_LAUNCHER} $MAVEN_CONFIG "$@" diff --git a/mvnw.cmd b/mvnw.cmd new file mode 100644 index 0000000..8611571 --- /dev/null +++ b/mvnw.cmd @@ -0,0 +1,182 @@ +@REM ---------------------------------------------------------------------------- +@REM Licensed to the Apache Software Foundation (ASF) under one +@REM or more contributor license agreements. See the NOTICE file +@REM distributed with this work for additional information +@REM regarding copyright ownership. The ASF licenses this file +@REM to you under the Apache License, Version 2.0 (the +@REM "License"); you may not use this file except in compliance +@REM with the License. You may obtain a copy of the License at +@REM +@REM http://www.apache.org/licenses/LICENSE-2.0 +@REM +@REM Unless required by applicable law or agreed to in writing, +@REM software distributed under the License is distributed on an +@REM "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +@REM KIND, either express or implied. See the License for the +@REM specific language governing permissions and limitations +@REM under the License. +@REM ---------------------------------------------------------------------------- + +@REM ---------------------------------------------------------------------------- +@REM Maven Start Up Batch script +@REM +@REM Required ENV vars: +@REM JAVA_HOME - location of a JDK home dir +@REM +@REM Optional ENV vars +@REM M2_HOME - location of maven2's installed home dir +@REM MAVEN_BATCH_ECHO - set to 'on' to enable the echoing of the batch commands +@REM MAVEN_BATCH_PAUSE - set to 'on' to wait for a keystroke before ending +@REM MAVEN_OPTS - parameters passed to the Java VM when running Maven +@REM e.g. to debug Maven itself, use +@REM set MAVEN_OPTS=-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000 +@REM MAVEN_SKIP_RC - flag to disable loading of mavenrc files +@REM ---------------------------------------------------------------------------- + +@REM Begin all REM lines with '@' in case MAVEN_BATCH_ECHO is 'on' +@echo off +@REM set title of command window +title %0 +@REM enable echoing by setting MAVEN_BATCH_ECHO to 'on' +@if "%MAVEN_BATCH_ECHO%" == "on" echo %MAVEN_BATCH_ECHO% + +@REM set %HOME% to equivalent of $HOME +if "%HOME%" == "" (set "HOME=%HOMEDRIVE%%HOMEPATH%") + +@REM Execute a user defined script before this one +if not "%MAVEN_SKIP_RC%" == "" goto skipRcPre +@REM check for pre script, once with legacy .bat ending and once with .cmd ending +if exist "%HOME%\mavenrc_pre.bat" call "%HOME%\mavenrc_pre.bat" +if exist "%HOME%\mavenrc_pre.cmd" call "%HOME%\mavenrc_pre.cmd" +:skipRcPre + +@setlocal + +set ERROR_CODE=0 + +@REM To isolate internal variables from possible post scripts, we use another setlocal +@setlocal + +@REM ==== START VALIDATION ==== +if not "%JAVA_HOME%" == "" goto OkJHome + +echo. +echo Error: JAVA_HOME not found in your environment. >&2 +echo Please set the JAVA_HOME variable in your environment to match the >&2 +echo location of your Java installation. >&2 +echo. +goto error + +:OkJHome +if exist "%JAVA_HOME%\bin\java.exe" goto init + +echo. +echo Error: JAVA_HOME is set to an invalid directory. >&2 +echo JAVA_HOME = "%JAVA_HOME%" >&2 +echo Please set the JAVA_HOME variable in your environment to match the >&2 +echo location of your Java installation. >&2 +echo. +goto error + +@REM ==== END VALIDATION ==== + +:init + +@REM Find the project base dir, i.e. the directory that contains the folder ".mvn". +@REM Fallback to current working directory if not found. + +set MAVEN_PROJECTBASEDIR=%MAVEN_BASEDIR% +IF NOT "%MAVEN_PROJECTBASEDIR%"=="" goto endDetectBaseDir + +set EXEC_DIR=%CD% +set WDIR=%EXEC_DIR% +:findBaseDir +IF EXIST "%WDIR%"\.mvn goto baseDirFound +cd .. +IF "%WDIR%"=="%CD%" goto baseDirNotFound +set WDIR=%CD% +goto findBaseDir + +:baseDirFound +set MAVEN_PROJECTBASEDIR=%WDIR% +cd "%EXEC_DIR%" +goto endDetectBaseDir + +:baseDirNotFound +set MAVEN_PROJECTBASEDIR=%EXEC_DIR% +cd "%EXEC_DIR%" + +:endDetectBaseDir + +IF NOT EXIST "%MAVEN_PROJECTBASEDIR%\.mvn\jvm.config" goto endReadAdditionalConfig + +@setlocal EnableExtensions EnableDelayedExpansion +for /F "usebackq delims=" %%a in ("%MAVEN_PROJECTBASEDIR%\.mvn\jvm.config") do set JVM_CONFIG_MAVEN_PROPS=!JVM_CONFIG_MAVEN_PROPS! %%a +@endlocal & set JVM_CONFIG_MAVEN_PROPS=%JVM_CONFIG_MAVEN_PROPS% + +:endReadAdditionalConfig + +SET MAVEN_JAVA_EXE="%JAVA_HOME%\bin\java.exe" +set WRAPPER_JAR="%MAVEN_PROJECTBASEDIR%\.mvn\wrapper\maven-wrapper.jar" +set WRAPPER_LAUNCHER=org.apache.maven.wrapper.MavenWrapperMain + +set DOWNLOAD_URL="https://repo.maven.apache.org/maven2/io/takari/maven-wrapper/0.5.6/maven-wrapper-0.5.6.jar" + +FOR /F "tokens=1,2 delims==" %%A IN ("%MAVEN_PROJECTBASEDIR%\.mvn\wrapper\maven-wrapper.properties") DO ( + IF "%%A"=="wrapperUrl" SET DOWNLOAD_URL=%%B +) + +@REM Extension to allow automatically downloading the maven-wrapper.jar from Maven-central +@REM This allows using the maven wrapper in projects that prohibit checking in binary data. +if exist %WRAPPER_JAR% ( + if "%MVNW_VERBOSE%" == "true" ( + echo Found %WRAPPER_JAR% + ) +) else ( + if not "%MVNW_REPOURL%" == "" ( + SET DOWNLOAD_URL="%MVNW_REPOURL%/io/takari/maven-wrapper/0.5.6/maven-wrapper-0.5.6.jar" + ) + if "%MVNW_VERBOSE%" == "true" ( + echo Couldn't find %WRAPPER_JAR%, downloading it ... + echo Downloading from: %DOWNLOAD_URL% + ) + + powershell -Command "&{"^ + "$webclient = new-object System.Net.WebClient;"^ + "if (-not ([string]::IsNullOrEmpty('%MVNW_USERNAME%') -and [string]::IsNullOrEmpty('%MVNW_PASSWORD%'))) {"^ + "$webclient.Credentials = new-object System.Net.NetworkCredential('%MVNW_USERNAME%', '%MVNW_PASSWORD%');"^ + "}"^ + "[Net.ServicePointManager]::SecurityProtocol = [Net.SecurityProtocolType]::Tls12; $webclient.DownloadFile('%DOWNLOAD_URL%', '%WRAPPER_JAR%')"^ + "}" + if "%MVNW_VERBOSE%" == "true" ( + echo Finished downloading %WRAPPER_JAR% + ) +) +@REM End of extension + +@REM Provide a "standardized" way to retrieve the CLI args that will +@REM work with both Windows and non-Windows executions. +set MAVEN_CMD_LINE_ARGS=%* + +%MAVEN_JAVA_EXE% %JVM_CONFIG_MAVEN_PROPS% %MAVEN_OPTS% %MAVEN_DEBUG_OPTS% -classpath %WRAPPER_JAR% "-Dmaven.multiModuleProjectDirectory=%MAVEN_PROJECTBASEDIR%" %WRAPPER_LAUNCHER% %MAVEN_CONFIG% %* +if ERRORLEVEL 1 goto error +goto end + +:error +set ERROR_CODE=1 + +:end +@endlocal & set ERROR_CODE=%ERROR_CODE% + +if not "%MAVEN_SKIP_RC%" == "" goto skipRcPost +@REM check for post script, once with legacy .bat ending and once with .cmd ending +if exist "%HOME%\mavenrc_post.bat" call "%HOME%\mavenrc_post.bat" +if exist "%HOME%\mavenrc_post.cmd" call "%HOME%\mavenrc_post.cmd" +:skipRcPost + +@REM pause the script if MAVEN_BATCH_PAUSE is set to 'on' +if "%MAVEN_BATCH_PAUSE%" == "on" pause + +if "%MAVEN_TERMINATE_CMD%" == "on" exit %ERROR_CODE% + +exit /B %ERROR_CODE% diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..6238a31 --- /dev/null +++ b/pom.xml @@ -0,0 +1,72 @@ + + 4.0.0 + + com.signalfx.public + signalfx-signalflow-parent + SignalFx SignalFlow parent + + 1.0.0-alpha + pom + + + SignalFx SignalFlow root module + + + + true + true + + + http://www.signalfx.com + + + + splunk + Splunk Instrumentation Authors + support+java@signalfx.com + Splunk + https://www.splunk.com + + + + + + release-sign-artifacts + + + + org.sonatype.plugins + nexus-staging-maven-plugin + 1.6.13 + true + + ossrh + https://oss.sonatype.org/ + true + + + + org.apache.maven.plugins + maven-gpg-plugin + 3.2.4 + + + sign-artifacts + verify + + sign + + + + + + + + + + + signalflow-client + + diff --git a/pubkey.gpg b/pubkey.gpg new file mode 100644 index 0000000..784b0da --- /dev/null +++ b/pubkey.gpg @@ -0,0 +1,52 @@ +-----BEGIN PGP PUBLIC KEY BLOCK----- + +mQINBGDllxkBEACqC92HhSXeVg3Bb+j0zb6h3+mBohh+C9bzQjL8+Qv8qluiBRj7 +kdeAvgupylc2XZ12kTBcaQ7MEtCiutmYzhiNfW2MOyY34UI/mdCyopN6X6IyjO3D +cuLgDr0DKGHzrZV6bthCa+IqDemmtbPVtPGe3dQ7TpnUXt/y8xXxCAWCWw4ZgY+1 +7xZwSOwpBQRme/G8QjpPe19GtC9N8loIKqBEdchnw+IDKfTxy59p9uqrGw6/nk7U +9MGAnx+9bDadmUHxwf0EipXu/N4IXNrTWPPGZFd7dy8e8Ktz85y/j7QqB5BURdAm +h+rFcPPp40HKqLQykukcmAdIaZijq15iH7SrrPjEvTeY+LBm+FlZHD5GQQ0rSNYT +0dExdv3WdWcWFg4bG8fnNkwBo7mPKOFBi+ER1z3oIKLATkHo7K/Y7qTgBNJoyd76 +PSP81qdlvl5xewJihmz2RpPgFTVjeJwYYGglOJXV3AxCilLSsBwJciJWdE5vVHbB +a1y0L4LrL4R2b7d18xSpZCr6xs1WKTXdS6MSkqZeeLsNo6KnPLzDkHO7Wbr5hzJl +0is6P7wKeKZ5rk3rdcvQVv1wD4TXZrJyHhoM7Mqn6EyFfWJ3n4zgpvBiT6vVXsQK +28LLk1GSIgEBo+BZcCtFpilAounaqtvwxMPthM1PT2yX64Q8947pPCzdowARAQAB +tDpTcGx1bmsgSW5zdHJ1bWVudGF0aW9uIEF1dGhvcnMgPHN1cHBvcnQramF2YUBz +aWduYWxmeC5jb20+iQJOBBMBCAA4FiEE2h0hnhgoqQjSonzI5W2wgceOL+4FAmDl +lxkCGwMFCwkIBwIGFQoJCAsCBBYCAwECHgECF4AACgkQ5W2wgceOL+6J3w//dau4 +6BvfP6F+vghIYQ4L1hm8obEbPqn3oMfaKCJHOZP96JSsp3XPGauUW/vmJHON5ro+ +LeVMUSvdi7MkWs9aX2bim4c4raFRiiKeITieUQwaEqOGXDcGUbn/nMatfKAD4EgV +WrS6BObyH4kB7P98sCNLlq1bFpvUQqUP+3erHTfQjokNJjiSGHmq/9dY6PyMTPei +hmJ2VqXOC7pKf4lHO//cNQ0voONvAdtWCTC/RYhqiaWZGacGP4NK4kVfhJz3aJyX +0BB4JOqOJd3QCh7VzAfmryJsybAK6FZHbUz5e4hGki9Cjsx2vkW9uPR9j9wmYNyo +J+1FEfOnz+HdplEgDCMP2xAUNiQyDJMeD1Y6Krhkf+eiUBxXYSWWlH9fqnDnUhZA +ewX/JfGiWIP9FmmvjDWbxNOjxRLoZ+jHKwbpFM4wbCwyQyg06Q/CWgTQPChF2aCr +mH2IiR8oT1NVvwqXUtkfihZCBbXmhj9qKG7Ei9sZpvjqPL+OjW5XM2SVKd9LUDlO +N413bBqgoJFb3dCJtTzTRIFx0e/GFVPuu6cSN4nUdIrHFy2dWcrE+F45bIweg4cR +3Mk1Y3KpY64bSjMdA1qlZ4q2n910w7U9y1+Q+PNKFaFtQVFTdbT2dwRADrqp85CP +fLSRRZe6Se+NvP/hxFDG7gWe6+EltbFtxPq1TOC5Ag0EYOWXGQEQAM2+5Wp99MwE +o2sSpSdaDCRlunwz5lnNozZPeuEB8Jzu5LfEmnLfA2PlUuEBtzzjZJb/ENiiM3Lh +ZuZA1RY1u9QqUEvYvdJad1TU6uz9Jcg9RXp8U/kpY0DKHkPmxvmnwCwJhxTGRsbP +zeVEA0k+H0xVnxiFvSeKybc0Vne5xvr3zfdzOb3BGkl+w4sJqN7USx+Q+f4jqTy6 +NGjDoYJYXioT5AIBg7ArxW9MdtOUawBusuQbecmS/8O5uS+Dx6kU8ViVVraqR9FL +xsCpOcFcSJh75JMCAfl2ggKi5gCjNkTy/TlHUWKTdE1m+kvO+5N/0HkfNXRWtyeV +5hzVQPBt5aomNGVJiLHtKNkjTi08ldKQGUwjEUupssz/XhQoGixAhGHHRaWX5Byb +sffyFy0NI6V/LsXMtudR6bHRkjGPnDkunUyoLhXjh9d/ZB9VueaLfCD0jtQHHMT2 +iQkV9H+sVBdHnSSF1cdTWnfQQ3OA+/ajoiDaIFRIUStlXwqg4F8CkWef+uogA+Ds +lKBFLydu9fqT5keeboY8icl38+apa+ug+z7xdH0eNLSGZcgSCHd6gqY3zOhizDGW +mMpl9nf8YtqR3BppGkUgQHWBDhsJtILS6N2+kCXwSKHZAWDhwmETLfuirjYyv2sQ +Jh9Zkp7Ed52w4HahmpPeDIQEhjHY+tYpABEBAAGJAjYEGAEIACAWIQTaHSGeGCip +CNKifMjlbbCBx44v7gUCYOWXGQIbDAAKCRDlbbCBx44v7oMHEACiSm5QmvkZIE9E +X552pmQrXe40hAz40Cqd927/jpX7J2xla6IlTFUZ+JjJmi8QopXdmy8IefRH9Z2E +QEMlGsDktEVMWsQLuqldCNL1gR/GEMakptCTUEd3sfqnE67mi2D3eZxxG5BTziq/ +LkRD8YK13z4IYKtrtywSvvp+wYL9bUeS/uGsm46vudQYF00mb5TxPsFxxmXHf808 +Ubh0ZQZ+ibjPpz0/7Ntq6aDZtgUOKuRB2g8FOy+j62x1uANXRZVk2Kd5aIQii7Wq +dwTmbIjtwwbSznSCd2eqCYVfTrieL1jEBq39HaxuJNF4fZs6wRibg/OguROc2bEg +kEk5EbsYUIxji8NHqKFsD13yu/qlFjUpB+9/L9XD2YKouK5g0sYHj12gdcOQG8Se +9zfn0Az+1/28ob4FRyOxiE1CKeEFo9TUCfHCxfOboA/aRMBMj9UkrFUm0VhZFKsH +gGxep1d15l1zAq+VBL63hoL8UQzBY1Q83/nuNmMFWN2W4DJIQossh8sKNt7OSU1T +0fFC9YbmHMS3j0E0kUIywYPe6ERFpVnW35Ayy/Mbjdpb0MtLABQRsMsi7SJ4yCEn +uuJxFr4tanryuJL1PC7F1Pt91RY7JRgm3JQUdIXJE7OuLXZrKDGEz/uXTeTokKme +x70YdSF2oD/7Ea/HPDTWHwKzDpR/dg== +=RMlo +-----END PGP PUBLIC KEY BLOCK----- diff --git a/signalflow-client/pom.xml b/signalflow-client/pom.xml new file mode 100644 index 0000000..821c486 --- /dev/null +++ b/signalflow-client/pom.xml @@ -0,0 +1,245 @@ + + + 4.0.0 + + + + true + UTF-8 + + + com.signalfx.public + signalflow-client + SignalFlow Client + 1.0.0-beta1 + + SignalFx functionality to support signalflow + + + http://www.splunk.com + + + + Apache License 2.0 + http://www.apache.org/licenses/LICENSE-2.0.html + repo + + + + + scm:git:git@github.com:signalfx/signalfx-java.git + scm:git:git@github.com:signalfx/signalfx-java.git + git@github.com:signalfx/signalfx-java.git + + + + + splunk + Splunk Instrumentation Authors + support+java@signalfx.com + Splunk + https://www.splunk.com + + + + + + org.slf4j + slf4j-api + 2.0.13 + + + org.apache.commons + commons-lang3 + 3.14.0 + + + com.fasterxml.jackson.core + jackson-annotations + 2.17.1 + + + com.fasterxml.jackson.core + jackson-databind + 2.17.1 + + + com.google.guava + guava + 33.2.0-jre + + + org.apache.httpcomponents + httpclient + 4.5.14 + + + org.apache.httpcomponents + httpcore + 4.4.16 + + + commons-io + commons-io + 2.16.1 + + + org.eclipse.jetty.websocket + websocket-client + 9.4.54.v20240208 + + + + + junit + junit + 4.13.2 + test + + + + + + + + + + maven-deploy-plugin + 3.1.2 + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.13.0 + + 1.8 + 1.8 + + + + org.apache.maven.plugins + maven-javadoc-plugin + 3.6.3 + + 8 + + + + attach-javadocs + + jar + + + + + + org.apache.maven.plugins + maven-source-plugin + 3.3.1 + + + attach-sources + + jar + + + + + + org.apache.maven.plugins + maven-shade-plugin + 3.5.3 + + ${skipShaded} + true + true + + + *:* + + about.html + google/protobuf/**/*.proto + mozilla/*.txt + META-INF/LICENSE* + META-INF/NOTICE* + + + + + + META-INF/LICENSE + ${project.basedir}/../LICENSE + + + + + org.slf4j:* + *:metrics-core:* + + + + + com.fasterxml + com.signalfx.shaded.fasterxml + + + org.apache + com.signalfx.shaded.apache + + + org.eclipse.jetty + com.signalfx.shaded.jetty + + + + com.google.common + com.signalfx.shaded.google.common + + + com.google.errorprone + com.signalfx.shaded.google.errorprone + + + com.google.j2objc + com.signalfx.shaded.google.j2objc + + + com.google.thirdparty + com.signalfx.shaded.google.thirdparty + + + javax.annotation + com.signalfx.shaded.javax.annotation + + + org.checkerframework + com.signalfx.shaded.checkerframework + + + com.google.protobuf + com.signalfx.shaded.google.protobuf + + + + + + build-shaded-jar + package + + shade + + + + + + + diff --git a/signalflow-client/src/main/java/com/signalfx/signalflow/client/Channel.java b/signalflow-client/src/main/java/com/signalfx/signalflow/client/Channel.java new file mode 100644 index 0000000..c655641 --- /dev/null +++ b/signalflow-client/src/main/java/com/signalfx/signalflow/client/Channel.java @@ -0,0 +1,93 @@ +/* + * Copyright (C) 2016 SignalFx, Inc. All rights reserved. + */ +package com.signalfx.signalflow.client; + +import java.io.Closeable; +import java.util.Iterator; + +import org.apache.commons.lang3.RandomStringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Abstract immutable representation for open channels that receive streaming data from a SignalFlow + * computation. + * + * Channel objects bridge the gap between an underlying transport and a higher-level Computation + * object by providing a transport-agnostic and encoding-agnostic access to the stream of + * messages.StreamMessage objects that are received for a given computation. + * + * Channels are iterable that return ChannelMessage instances. + * + * @author dgriff + */ +public abstract class Channel implements Iterator, Closeable { + + protected static final Logger log = LoggerFactory.getLogger(Channel.class); + private static final int CHANNEL_NAME_LENGTH = 8; + + // unique id for the channel + protected final String name; + + protected boolean isClosed = false; + protected Iterator iterator; + + protected Channel() { + this.name = "channel-" + RandomStringUtils.random(CHANNEL_NAME_LENGTH, true, true); + } + + public Channel(final Iterator iterator) { + this.iterator = iterator; + this.name = "channel-" + RandomStringUtils.random(CHANNEL_NAME_LENGTH, true, true); + } + + public String getName() { + return this.name; + } + + public boolean hasNext() { + if (!isClosed()) { + return this.iterator.hasNext(); + } else { + throw new IllegalStateException("channel is closed"); + } + } + + public ChannelMessage next() { + if (!isClosed()) { + ChannelMessage message = null; + while (message == null) { + StreamMessage streamMessage = this.iterator.next(); + message = ChannelMessage.decodeStreamMessage(streamMessage); + if (message == null) { + log.warn("Unsupported control message {}. ignoring!", streamMessage); + } + } + return message; + + } else { + throw new IllegalStateException("channel is closed"); + } + } + + public void remove() { + if (!isClosed()) { + this.iterator.remove(); + } else { + throw new IllegalStateException("channel is closed"); + } + } + + public void close() { + this.isClosed = true; + } + + public boolean isClosed() { + return this.isClosed; + } + + public String toString() { + return "channel<" + this.name + ">"; + } +} diff --git a/signalflow-client/src/main/java/com/signalfx/signalflow/client/ChannelMessage.java b/signalflow-client/src/main/java/com/signalfx/signalflow/client/ChannelMessage.java new file mode 100644 index 0000000..c864a5c --- /dev/null +++ b/signalflow-client/src/main/java/com/signalfx/signalflow/client/ChannelMessage.java @@ -0,0 +1,445 @@ +/* + * Copyright (C) 2016 SignalFx, Inc. All rights reserved. + */ +package com.signalfx.signalflow.client; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import com.fasterxml.jackson.annotation.JsonTypeName; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.signalfx.signalflow.client.StreamMessage.Kind; + +/** + * Base class for stream messages received from a SignalFlow computation. + * + * @author dgriff + */ +public abstract class ChannelMessage { + + /** + * Enumeration of types of channel messages + */ + public static enum Type { + + STREAM_START(Kind.CONTROL), + JOB_START(Kind.CONTROL), + JOB_PROGRESS(Kind.CONTROL), + CHANNEL_ABORT(Kind.CONTROL), + END_OF_CHANNEL(Kind.CONTROL), + INFO_MESSAGE(Kind.INFORMATION), + METADATA_MESSAGE(Kind.METADATA), + EXPIRED_TSID_MESSAGE(Kind.EXPIRED_TSID), + DATA_MESSAGE(Kind.DATA), + EVENT_MESSAGE(Kind.EVENT), + ERROR_MESSAGE(Kind.ERROR); + + private final Kind kind; + + Type(Kind kind) { + this.kind = kind; + } + + Kind kind() { + return kind; + } + }; + + protected static final Logger log = LoggerFactory.getLogger(ChannelMessage.class); + protected static final ObjectMapper mapper = new ObjectMapper(); + static { + mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + } + + protected String rawdata; + protected ChannelMessage.Type channelMessageType; + + public Type getType() { + return this.channelMessageType; + } + + public String toString() { + return this.rawdata; + } + + /** + * Converts the raw stream message into the proper type of channel message + * + * @param streamMessage + * raw stream message + * @return a channel message instance + * @throws SignalFlowException + * if decode fails + */ + public static ChannelMessage decodeStreamMessage(StreamMessage streamMessage) + throws SignalFlowException { + try { + ChannelMessage message = null; + + switch (streamMessage.getKind()) { + + case CONTROL: + message = mapper.readValue(streamMessage.getData(), ControlMessage.class); + break; + + case INFORMATION: + message = mapper.readValue(streamMessage.getData(), InfoMessage.class); + break; + + case METADATA: + message = mapper.readValue(streamMessage.getData(), MetadataMessage.class); + break; + + case EXPIRED_TSID: + message = mapper.readValue(streamMessage.getData(), ExpiredTsIdMessage.class); + break; + + case DATA: + message = mapper.readValue(streamMessage.getData(), DataMessage.class); + break; + + case EVENT: + message = mapper.readValue(streamMessage.getData(), EventMessage.class); + break; + + case ERROR: + message = mapper.readValue(streamMessage.getData(), ErrorMessage.class); + break; + } + + if (log.isDebugEnabled()) { + message.rawdata = streamMessage.getData(); + } + + return message; + + } catch (IOException ex) { + log.error(streamMessage.toString(), ex); + throw new SignalFlowException("failed to decode stream message: " + streamMessage, ex); + } + } + + /** + * Base class for control messages. + */ + @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.EXTERNAL_PROPERTY, property = "event", visible = true) + @JsonSubTypes({ + @JsonSubTypes.Type(value = ChannelMessage.StreamStartMessage.class, name = "STREAM_START"), + @JsonSubTypes.Type(value = ChannelMessage.JobStartMessage.class, name = "JOB_START"), + @JsonSubTypes.Type(value = ChannelMessage.JobProgressMessage.class, name = "JOB_PROGRESS"), + @JsonSubTypes.Type(value = ChannelMessage.ChannelAbortMessage.class, name = "CHANNEL_ABORT"), + @JsonSubTypes.Type(value = ChannelMessage.EndOfChannelMessage.class, name = "END_OF_CHANNEL") }) + public static abstract class ControlMessage extends ChannelMessage { + + protected long timestampMs; + + /** + * @return The wall clock timestamp (millisecond precision) of the message. + */ + public long getTimestampMs() { + return this.timestampMs; + } + } + + /** + * Message received when the stream begins. + */ + @JsonTypeName("STREAM_START") + public static class StreamStartMessage extends ControlMessage { + + public StreamStartMessage() { + this.channelMessageType = Type.STREAM_START; + } + } + + /** + * Message received when the computation completes normally. No further messages will be + * received from a computation after this one. + */ + @JsonTypeName("END_OF_CHANNEL") + public static class EndOfChannelMessage extends ControlMessage { + + public EndOfChannelMessage() { + this.channelMessageType = Type.END_OF_CHANNEL; + } + } + + /** + * Message received when the SignalFlow computation has started. + */ + @JsonTypeName("JOB_START") + public static class JobStartMessage extends ControlMessage { + + protected String handle; + + public JobStartMessage() { + this.channelMessageType = Type.JOB_START; + } + + /** + * @return The computation's handle ID + */ + public String getHandle() { + return this.handle; + } + } + + /** + * Message received while computation windows are primed, if they are present. The message will + * be received multiple times with increasing progress values from 0 to 100, indicating the + * progress percentage. + */ + @JsonTypeName("JOB_PROGRESS") + public static class JobProgressMessage extends ControlMessage { + + protected int progress; + + public JobProgressMessage() { + this.channelMessageType = Type.JOB_PROGRESS; + } + + /** + * @return Computation priming progress, as a percentage between 0 and 100. + */ + public int getProgress() { + return this.progress; + } + } + + /** + * Message received when the computation aborted before its defined stop time, either because of + * an error or from a manual stop. No further messages will be received from a computation after + * this one. + */ + @JsonTypeName("CHANNEL_ABORT") + public static class ChannelAbortMessage extends ControlMessage { + + protected LinkedHashMap abortInfo; + + public ChannelAbortMessage() { + this.channelMessageType = Type.CHANNEL_ABORT; + } + + /** + * @return Information about the computation's termination. + */ + public Map getAbortInfo() { + return this.abortInfo; + } + } + + /** + * Message containing information about the SignalFlow computation's behavior or decisions + */ + public static class InfoMessage extends ChannelMessage { + + protected LinkedHashMap message; + protected long logicalTimestampMs; + + public InfoMessage() { + this.channelMessageType = Type.INFO_MESSAGE; + } + + /** + * @return The logical timestamp (millisecond precision) for which the message has been + * emitted. + */ + public long getLogicalTimestampMs() { + return this.logicalTimestampMs; + } + + /** + * @return The information message. Refer to the Developer's documentation for a reference + * of the possible messages and their structure. + */ + public Map getMessage() { + return this.message; + } + } + + /** + * Message containing metadata information about an output metric or event timeseries. Metadata + * messages are always emitted by the computation prior to any data or events for the + * corresponding timeseries. + */ + public static class MetadataMessage extends ChannelMessage { + + protected LinkedHashMap properties; + protected String tsId; + + public MetadataMessage() { + this.channelMessageType = Type.METADATA_MESSAGE; + } + + /** + * @return A unique timeseries identifier. + */ + public String getTsId() { + return this.tsId; + } + + /** + * @return The metadata properties of the timeseries. + */ + public Map getProperties() { + return this.properties; + } + } + + /** + * Message informing us that an output timeseries is no longer + * part of the computation and that we may do some cleanup of + * whatever internal state we have tied to that output timeseries. + */ + public static class ExpiredTsIdMessage extends ChannelMessage { + + protected String tsId; + + public ExpiredTsIdMessage() { + this.channelMessageType = Type.EXPIRED_TSID_MESSAGE; + } + + /** + * @return The identifier of the timeseries that's no longer interesting + * to the computation. + */ + public String getTsId() { + return this.tsId; + } + } + + /** + * Message containing a batch of datapoints generated for a particular iteration. + */ + public static class DataMessage extends ChannelMessage { + + protected Map data; + protected long logicalTimestampMs; + + @JsonCreator + public DataMessage(@JsonProperty("logicalTimestampMs") long logicalTimestampMs, + @JsonProperty("data") List> data) { + this.channelMessageType = Type.DATA_MESSAGE; + this.logicalTimestampMs = logicalTimestampMs; + this.data = new HashMap(data.size()); + for (Map datum : data) { + this.data.put((String) datum.get("tsId"), (Number) datum.get("value")); + } + } + + /** + * @return The logical timestamp of the data (millisecond precision). + */ + public long getLogicalTimestampMs() { + return this.logicalTimestampMs; + } + + /** + * @return The data, as a map of timeseries ID to datapoint value. + */ + public Map getData() { + return this.data; + } + + public void addData(Map data) { + this.data.putAll(data); + } + } + + /** + * Message received when the computation has generated an event or alert from a detect block. + */ + public static class EventMessage extends ChannelMessage { + + protected LinkedHashMap metadata; + protected LinkedHashMap properties; + protected long timestampMs; + protected String tsId; + + public EventMessage() { + this.channelMessageType = Type.EVENT_MESSAGE; + } + + /** + * @return A unique timeseries identifier. + */ + public String getTsId() { + return this.tsId; + } + + /** + * @return The timestamp of the event (millisecond precision). + */ + public long getTimestampMs() { + return this.timestampMs; + } + + /** + * @return The metadata of the EventTimeSeries this event belongs to. May be empty if the + * event was created by the SignalFlow computation itself. + */ + public Map getMetadata() { + return this.metadata; + } + + /** + * @return The properties of the event. For alerts, you can expect 'was' and 'is' properties + * that communicate the evolution of the state of the incident. + */ + public Map getProperties() { + return this.properties; + } + } + + /** + * Message received when the computation encounters errors during its initialization. + * Because the error that is returned might have one of two sets of contents, this has both an + * `errors` and a `message` accessor. We'll be liberal here so the code can decide how to show + * this error later. Optimally, these should be different message types, but that's a lot of + * work to handle on both the backend and client side for +1/-1. + */ + public static class ErrorMessage extends ChannelMessage { + + protected int error; + protected ArrayList errors; + protected String message; + + public ErrorMessage() { + this.channelMessageType = Type.ERROR_MESSAGE; + } + + /** + * @return The error number, akin to an HTTP error code. + */ + public int getError() { + return this.error; + } + + /** + * @return The list of errors. Each error has a 'code' defining what the error is, and a + * 'context' dictionary providing details. + */ + public List getErrors() { + return this.errors; + } + + /** + * @return The error message for the failure + */ + public String getMessage() { + return this.message; + } + } +} diff --git a/signalflow-client/src/main/java/com/signalfx/signalflow/client/Computation.java b/signalflow-client/src/main/java/com/signalfx/signalflow/client/Computation.java new file mode 100644 index 0000000..588183d --- /dev/null +++ b/signalflow-client/src/main/java/com/signalfx/signalflow/client/Computation.java @@ -0,0 +1,337 @@ +/* + * Copyright (C) 2016 SignalFx, Inc. All rights reserved. + */ +package com.signalfx.signalflow.client; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; + +import com.signalfx.signalflow.client.ChannelMessage.ChannelAbortMessage; +import com.signalfx.signalflow.client.ChannelMessage.DataMessage; +import com.signalfx.signalflow.client.ChannelMessage.ErrorMessage; +import com.signalfx.signalflow.client.ChannelMessage.ExpiredTsIdMessage; +import com.signalfx.signalflow.client.ChannelMessage.InfoMessage; +import com.signalfx.signalflow.client.ChannelMessage.JobStartMessage; +import com.signalfx.signalflow.client.ChannelMessage.MetadataMessage; + +/** + * A live handle to a running SignalFlow computation. + * + * @author dgriff + */ +public class Computation implements Iterable, Iterator { + + /** + * Enumeration of computation states + */ + public static enum State { + STATE_UNKNOWN, + STATE_STREAM_STARTED, + STATE_COMPUTATION_STARTED, + STATE_DATA_RECEIVED, + STATE_COMPLETED, + STATE_ABORTED; + } + + protected SignalFlowTransport transport; + protected String program; + protected Map params; + protected boolean isAttachedChannel; + + private Map> metadata = new HashMap>(); + + private String id; + private Channel channel; + private ChannelMessage nextMessage; + private State state = State.STATE_UNKNOWN; + private long lastLogicalTimestampMs = -1; + private long resolution; + private int expectedBatches; + private boolean batchCountDetected; + private int currentBatchCount; + private DataMessage currentBatchMessage; + + public Computation(SignalFlowTransport transport, String program, Map params, + boolean attach) { + this.transport = transport; + this.program = program; + this.params = params; + this.isAttachedChannel = attach; + this.channel = isAttachedChannel ? attach() : execute(); + } + + /** + * @return handle to computation + */ + public String getId() { + return this.id; + } + + /** + * @return data resolution + */ + public long getResolution() { + return resolution; + } + + /** + * @return current computation state + */ + public State getState() { + return state; + } + + /** + * @return last message time in milliseconds since midnight, January 1, 1970 UTC + */ + public long getLastLogicalTimestampMs() { + return lastLogicalTimestampMs; + } + + /** + * @return sorted list of known timeseries ids + */ + public Collection getKnownTSIDs() { + List list = new ArrayList(metadata.keySet()); + Collections.sort(list); + return list; + } + + /** + * @param tsid + * unique identifier of timeseries + * @return the full metadata object for the given timeseries (by its ID), or null if not + * available. + */ + public Map getMetadata(String tsid) { + return metadata.get(tsid); + } + + /** + * Getter of iterator that iterates over the messages from the computation's output. + */ + public Iterator iterator() { + return this; + } + + @Override + public boolean hasNext() throws ComputationAbortedException, + ComputationFailedException, SignalFlowException, StreamRequestException { + while ((state != State.STATE_COMPLETED) && (!channel.isClosed) && (nextMessage == null)) { + parseNext(); + } + + return nextMessage != null; + } + + /** + * Iterate over the messages from the computation's output. + * + * Control and metadata messages are intercepted and interpreted to enhance this Computation's + * object knowledge of the computation's context. Data and event messages are yielded back to + * the caller as a generator. + */ + @Override + public ChannelMessage next() throws ComputationAbortedException, ComputationFailedException, + SignalFlowException, NoSuchElementException { + while ((state != State.STATE_COMPLETED) && (!channel.isClosed) && (nextMessage == null)) { + parseNext(); + } + + if (nextMessage != null) { + ChannelMessage message = nextMessage; + nextMessage = null; + return message; + } else { + // no more messages can come from this channel + throw new NoSuchElementException("no more stream messages"); + } + } + + @Override + public void remove() { + throw new UnsupportedOperationException("remove not supported"); + } + + /** + * Manually close this computation and detach from its stream. This computation object cannot be + * restarted, used or streamed for after this method is called. + */ + public void close() { + channel.close(); + nextMessage = null; + } + + /** + * Create channel for computation + * + * @return Channel for computation + * @throws SignalFlowException + * if transport fails to create channel + */ + private Channel execute() throws SignalFlowException { + HashMap params = new HashMap(this.params); + if (lastLogicalTimestampMs >= 0) { + params.put("start", Long.toString(lastLogicalTimestampMs)); + } + + return transport.execute(program, params); + } + + /** + * Attach to existing channel for computation + * + * @return Channel for computation + * @throws SignalFlowException + * if transport fails to attach to channel + */ + private Channel attach() throws SignalFlowException { + return transport.attach(program, params); + } + + /** + * Process the channel messages to manage computation + * + * @throws ComputationAbortedException + * on receiving channel message aborted + * @throws ComputationFailedException + * on receiving channel message error + */ + private void parseNext() throws ComputationAbortedException, + ComputationFailedException, SignalFlowException { + nextMessage = null; + while (state != State.STATE_COMPLETED) { + if (!channel.hasNext()) { + if (state != State.STATE_COMPLETED) { + channel.close(); + channel = isAttachedChannel ? attach() : execute(); + continue; + } + } else { + ChannelMessage message = channel.next(); + + switch (message.channelMessageType) { + case STREAM_START: + state = State.STATE_STREAM_STARTED; + break; + + case JOB_START: + state = State.STATE_COMPUTATION_STARTED; + nextMessage = message; + id = ((JobStartMessage) message).getHandle(); + break; + + case JOB_PROGRESS: + nextMessage = message; + break; + + case CHANNEL_ABORT: + state = State.STATE_ABORTED; + ChannelAbortMessage abortMessage = (ChannelAbortMessage) message; + throw new ComputationAbortedException(abortMessage.getAbortInfo()); + + case END_OF_CHANNEL: + state = State.STATE_COMPLETED; + break; + + case METADATA_MESSAGE: + // Intercept metadata messages to accumulate received metadata. + MetadataMessage metadataMessage = (MetadataMessage) message; + metadata.put(metadataMessage.getTsId(), metadataMessage.getProperties()); + nextMessage = message; + break; + + case EXPIRED_TSID_MESSAGE: + // Intercept expired-tsid messages to clean it up. + ExpiredTsIdMessage expiredTsIdMessage = (ExpiredTsIdMessage) message; + metadata.remove(expiredTsIdMessage.getTsId()); + nextMessage = message; + break; + + case INFO_MESSAGE: + InfoMessage infoMessage = (InfoMessage) message; + String messageCode = (String) infoMessage.getMessage().get("messageCode"); + + // Extract the output resolution from the appropriate message, if it's present. + if ("JOB_RUNNING_RESOLUTION".equals(messageCode)) { + @SuppressWarnings("unchecked") + LinkedHashMap contents = (LinkedHashMap) infoMessage + .getMessage().get("contents"); + resolution = ((Number) contents.get("resolutionMs")).longValue(); + } + + batchCountDetected = true; + if (currentBatchMessage != null) { + setNextDataMessageToYield(); + } + break; + + case DATA_MESSAGE: + // Accumulate data messages and release them when we have received + // all batches for the same logical timestamp. + state = State.STATE_DATA_RECEIVED; + if (!batchCountDetected) { + expectedBatches++; + } + + DataMessage dataMessage = (DataMessage) message; + if (currentBatchMessage == null) { + currentBatchMessage = dataMessage; + currentBatchCount = 1; + } else if (dataMessage.getLogicalTimestampMs() == currentBatchMessage + .getLogicalTimestampMs()) { + currentBatchMessage.addData(dataMessage.getData()); + currentBatchCount++; + } else { + batchCountDetected = true; + } + + if (batchCountDetected && currentBatchMessage != null + && currentBatchCount == expectedBatches) { + setNextDataMessageToYield(); + } + break; + + case EVENT_MESSAGE: + nextMessage = message; + break; + + case ERROR_MESSAGE: + ErrorMessage errorMessage = (ErrorMessage) message; + /* This is a hack based on the fact that the API can return type different + * error messages with the same type. We have to check attributes to know + * which error we're working with. + */ + if (errorMessage.getMessage() != null) { + throw new StreamRequestException(errorMessage.getError(), errorMessage.getMessage()); + } else { + throw new ComputationFailedException(errorMessage.getErrors()); + } + } + } + + if (nextMessage != null) { + break; + } + } + } + + /** + * Set the next data message that will be returned by the iterator and reset the current batch + * message in which we accumulate. + */ + private void setNextDataMessageToYield() { + DataMessage yieldMessage = currentBatchMessage; + currentBatchMessage = null; + currentBatchCount = 0; + lastLogicalTimestampMs = yieldMessage.getLogicalTimestampMs(); + nextMessage = yieldMessage; + } +} diff --git a/signalflow-client/src/main/java/com/signalfx/signalflow/client/ComputationAbortedException.java b/signalflow-client/src/main/java/com/signalfx/signalflow/client/ComputationAbortedException.java new file mode 100644 index 0000000..bd7c054 --- /dev/null +++ b/signalflow-client/src/main/java/com/signalfx/signalflow/client/ComputationAbortedException.java @@ -0,0 +1,37 @@ +/* + * Copyright (C) 2016 SignalFx, Inc. All rights reserved. + */ +package com.signalfx.signalflow.client; + +import java.util.Map; + +/** + * Exception thrown if the computation is aborted during its execution. + * + * @author dgriff + */ +public class ComputationAbortedException extends RuntimeException { + + private static final long serialVersionUID = 1L; + + protected String state; + protected String reason; + + public ComputationAbortedException(Map abortInfo) { + this(abortInfo.get("sf_job_abortState"), abortInfo.get("sf_job_abortReason")); + } + + private ComputationAbortedException(String state, String reason) { + super("Computation " + state + ": " + reason); + this.state = state; + this.reason = reason; + } + + public String getState() { + return this.state; + } + + public String getReason() { + return this.reason; + } +} diff --git a/signalflow-client/src/main/java/com/signalfx/signalflow/client/ComputationFailedException.java b/signalflow-client/src/main/java/com/signalfx/signalflow/client/ComputationFailedException.java new file mode 100644 index 0000000..547c01d --- /dev/null +++ b/signalflow-client/src/main/java/com/signalfx/signalflow/client/ComputationFailedException.java @@ -0,0 +1,27 @@ +/* + * Copyright (C) 2016 SignalFx, Inc. All rights reserved. + */ +package com.signalfx.signalflow.client; + +import java.util.List; + +/** + * Exception thrown when the computation failed after being started. + * + * @author dgriff + */ +public class ComputationFailedException extends RuntimeException { + + private static final long serialVersionUID = 1L; + + protected List errors; + + public ComputationFailedException(List errors) { + super("Computation failed (" + errors + ")"); + this.errors = errors; + } + + public List getErrors() { + return this.errors; + } +} diff --git a/signalflow-client/src/main/java/com/signalfx/signalflow/client/ComputationHandler.java b/signalflow-client/src/main/java/com/signalfx/signalflow/client/ComputationHandler.java new file mode 100644 index 0000000..55084bc --- /dev/null +++ b/signalflow-client/src/main/java/com/signalfx/signalflow/client/ComputationHandler.java @@ -0,0 +1,189 @@ +/* + * Copyright (C) 2016 SignalFx, Inc. All rights reserved. + */ +package com.signalfx.signalflow.client; + +import java.util.concurrent.Callable; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.signalfx.signalflow.client.ChannelMessage.DataMessage; +import com.signalfx.signalflow.client.ChannelMessage.EventMessage; +import com.signalfx.signalflow.client.ChannelMessage.ExpiredTsIdMessage; +import com.signalfx.signalflow.client.ChannelMessage.JobProgressMessage; +import com.signalfx.signalflow.client.ChannelMessage.JobStartMessage; +import com.signalfx.signalflow.client.ChannelMessage.MetadataMessage; +import com.signalfx.signalflow.client.Computation.State; + +/** + * Class provides basic plumbing used by subclasses to processing computation. + * + * subclass the onMessage methods and invoke the process method to run on current + * thread or use executor to submit as callable in another thread. + * + * @author dgriff + */ +public abstract class ComputationHandler implements Callable { + + protected static final Logger log = LoggerFactory.getLogger(ComputationHandler.class); + protected Computation computation; + private long startTimeMs; + private long stopTimeMs; + + /** + * Constructor that sets the computation + * + * @param computation + * instance to process + */ + public ComputationHandler(Computation computation) { + this.computation = computation; + } + + /** + * Override to process job start messages + * + * @param message + * job start + */ + protected void onMessage(JobStartMessage message) {} + + /** + * Override to process job progress messages + * + * @param message + * job progress + */ + protected void onMessage(JobProgressMessage message) {} + + /** + * Override to process data messages + * + * @param message + * data + */ + protected void onMessage(DataMessage message) {} + + /** + * Override to process event messages + * + * @param message + * event + */ + protected void onMessage(EventMessage message) {} + + /** + * Override to process metadata messages + * + * @param message + * metadata + */ + protected void onMessage(MetadataMessage message) {} + + /** + * Override to process expired tsId messages + * + * @param message + * expired tsid message + */ + protected void onMessage(ExpiredTsIdMessage message) {} + + /** + * @return Time at which the computation started, in milliseconds since midnight, January 1, 1970 UTC + */ + public long getStartTimeMs() { + return startTimeMs; + } + + /** + * @return Time at which the computation stopped, in milliseconds since midnight, January 1, 1970 UTC + */ + public long getStopTimeMs() { + return stopTimeMs; + } + + /** + * Processes the computation + * + * @return computation instance that was processed + * @throws ComputationAbortedException + * Exception thrown if the computation is aborted during its execution + * @throws ComputationFailedException + * Exception thrown when the computation failed after being started + * @throws SignalFlowException + * A generic error encountered when interacting with the SignalFx SignalFlow API + * @throws IllegalStateException + * Exception thrown is computation is closed + */ + public Computation process() throws ComputationAbortedException, ComputationFailedException, + SignalFlowException, IllegalStateException { + if (computation.getState() == State.STATE_COMPLETED) { + throw new IllegalStateException("computation is completed"); + } + + startTimeMs = System.currentTimeMillis(); + stopTimeMs = -1; + + try { + // iterate computation messages and route to message handling methods + for (ChannelMessage message : computation) { + switch (message.getType()) { + case JOB_START: + JobStartMessage jobStartMessage = (JobStartMessage) message; + onMessage(jobStartMessage); + break; + + case JOB_PROGRESS: + JobProgressMessage jobProgressMessage = (JobProgressMessage) message; + onMessage(jobProgressMessage); + break; + + case DATA_MESSAGE: + DataMessage dataMessage = (DataMessage) message; + onMessage(dataMessage); + break; + + case EVENT_MESSAGE: + EventMessage eventMessage = (EventMessage) message; + onMessage(eventMessage); + break; + + case METADATA_MESSAGE: + MetadataMessage metadataMessage = (MetadataMessage) message; + onMessage(metadataMessage); + break; + + case EXPIRED_TSID_MESSAGE: + ExpiredTsIdMessage expiredTsIdMessage = (ExpiredTsIdMessage) message; + onMessage(expiredTsIdMessage); + break; + + default: + break; + } + } + } finally { + stopTimeMs = System.currentTimeMillis(); + close(); + } + + return computation; + } + + /** + * closes the computation + */ + public void close() { + computation.close(); + } + + /** + * Callable implementation that calls process. + */ + @Override + public Computation call() throws ComputationAbortedException, ComputationFailedException, + SignalFlowException, IllegalStateException { + return process(); + } +} diff --git a/signalflow-client/src/main/java/com/signalfx/signalflow/client/ServerSentEventsTransport.java b/signalflow-client/src/main/java/com/signalfx/signalflow/client/ServerSentEventsTransport.java new file mode 100644 index 0000000..8233194 --- /dev/null +++ b/signalflow-client/src/main/java/com/signalfx/signalflow/client/ServerSentEventsTransport.java @@ -0,0 +1,555 @@ +/* + * Copyright (C) 2016 SignalFx, Inc. All rights reserved. + */ +package com.signalfx.signalflow.client; + +import java.io.BufferedReader; +import java.io.Closeable; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.UnsupportedEncodingException; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.regex.Pattern; + +import org.apache.http.HttpEntity; +import org.apache.http.NameValuePair; +import org.apache.http.StatusLine; +import org.apache.http.client.config.RequestConfig; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.client.utils.URIBuilder; +import org.apache.http.entity.StringEntity; +import org.apache.http.impl.conn.BasicHttpClientConnectionManager; +import org.apache.http.message.BasicNameValuePair; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.signalfx.signalflow.client.connection.AbstractHttpReceiverConnection; +import com.signalfx.signalflow.client.endpoint.SignalFxEndpoint; + +/** + * Server-Sent Events transport. + * + * Implements a transport to the SignalFlow API that uses simple HTTP requests and reads Server-Sent + * Events streams back from SignalFx. One connection per SignalFlow computation is required when + * using this transport. This is a good transport for single, ad-hoc computations. For most use + * cases though, the WebSocket-based transport is more efficient and has lower latency. + * + * @author dgriff + */ +public class ServerSentEventsTransport implements SignalFlowTransport { + + protected static final Logger log = LoggerFactory.getLogger(ServerSentEventsTransport.class); + public static final Integer DEFAULT_TIMEOUT = 1000; + public static final Integer DEFAULT_MAX_RETRIES = 3; + + protected final String token; + protected final SignalFxEndpoint endpoint; + protected final String path; + protected Integer timeout = DEFAULT_TIMEOUT; + protected Integer maxRetries = DEFAULT_MAX_RETRIES; + + protected ServerSentEventsTransport(final String token, final SignalFxEndpoint endpoint, + final int apiVersion, final Integer timeout) { + this(token, endpoint, apiVersion, timeout, DEFAULT_MAX_RETRIES); + } + + protected ServerSentEventsTransport(final String token, final SignalFxEndpoint endpoint, + final int apiVersion, final Integer timeout, final Integer maxRetries) { + this.token = token; + this.endpoint = endpoint; + this.path = "/v" + apiVersion + "/signalflow"; + this.timeout = timeout; + this.maxRetries = maxRetries; + } + + @Override + public Channel attach(String handle, final Map parameters) { + if (log.isDebugEnabled()) { + log.debug("attach: [ {} ] with parameters: {}", handle, parameters); + } + + TransportConnection connection = null; + CloseableHttpResponse response = null; + try { + connection = new TransportConnection(this.endpoint, timeout, maxRetries); + + response = connection.post(this.token, this.path + "/" + handle + "/attach", parameters, + null); + + return new TransportChannel(connection, response); + } catch (Exception ex) { + close(response); + close(connection); + throw new SignalFlowException("failed to create transport channel for attach", ex); + } + } + + @Override + public Channel execute(String program, final Map parameters) + throws SignalFlowException { + if (log.isDebugEnabled()) { + log.debug("execute: [ {} ] with parameters: {}", program, parameters); + } + + TransportConnection connection = null; + CloseableHttpResponse response = null; + try { + connection = new TransportConnection(this.endpoint, timeout, maxRetries); + + response = connection.post(this.token, this.path + "/execute", parameters, program); + + return new TransportChannel(connection, response); + } catch (IOException ioex) { + close(response); + close(connection); + throw new SignalFlowException("failed to create transport channel for execute", ioex); + } + } + + @Override + public Channel preflight(String program, final Map parameters) + throws SignalFlowException { + if (log.isDebugEnabled()) { + log.debug("preflight: [ {} ] with parameters: {}", program, parameters); + } + + TransportConnection connection = null; + CloseableHttpResponse response = null; + try { + connection = new TransportConnection(this.endpoint, timeout, maxRetries); + + response = connection.post(this.token, this.path + "/preflight", parameters, program); + + return new TransportChannel(connection, response); + } catch (IOException ioex) { + close(response); + close(connection); + throw new SignalFlowException("failed to create transport channel for execute", ioex); + } + } + @Override + public void start(String program, final Map parameters) { + if (log.isDebugEnabled()) { + log.debug("start: [ {} ] with parameters: {}", program, parameters); + } + + TransportConnection connection = null; + CloseableHttpResponse response = null; + try { + connection = new TransportConnection(this.endpoint, timeout, maxRetries); + response = connection.post(this.token, this.path + "/start", parameters, program); + } catch (Exception ex) { + throw new SignalFlowException("failed to start program - " + program, ex); + } finally { + close(response); + close(connection); + } + } + + @Override + public void stop(String handle, final Map parameters) { + if (log.isDebugEnabled()) { + log.debug("stop: [ {} ] with parameters: {}", handle, parameters); + } + + TransportConnection connection = null; + CloseableHttpResponse response = null; + try { + connection = new TransportConnection(this.endpoint, timeout, maxRetries); + response = connection.post(this.token, this.path + "/" + handle + "/stop", parameters, + null); + } catch (Exception ex) { + throw new SignalFlowException("failed to stop program - " + handle, ex); + } finally { + close(response); + close(connection); + } + } + + @Override + public void keepalive(String handle) { + if (log.isDebugEnabled()) { + log.debug("keepalive: [ {} ]", handle); + } + + TransportConnection connection = null; + CloseableHttpResponse response = null; + try { + connection = new TransportConnection(this.endpoint, timeout, maxRetries); + response = connection.post(this.token, this.path + "/" + handle + "/keepalive", null, + null); + } catch (Exception ex) { + throw new SignalFlowException("failed to set keepalive for program - " + handle, ex); + } finally { + close(response); + close(connection); + } + } + + @Override + public void close(int code, String reason) { + // nothing to close (separate connections are used and closed by the channel using it) + } + + private void close(CloseableHttpResponse response) { + try { + if (response != null) { + response.close(); + } + } catch (IOException ioex) { + log.error("error closing response", ioex); + } + } + + private void close(TransportConnection connection) { + try { + if (connection != null) { + connection.close(); + } + } catch (IOException ioex) { + log.error("error closing transport connection", ioex); + } + } + + /** + * Builder of SSE Transport Instance + */ + public static class TransportBuilder { + + private String token; + private String protocol = "https"; + private String host = DEFAULT_HOST; + private int port = 443; + private int timeout = 1; + private int version = 2; + + public TransportBuilder(String token) { + this.token = token; + } + + public TransportBuilder setProtocol(String protocol) { + this.protocol = protocol; + return this; + } + + public TransportBuilder setHost(String host) { + this.host = host; + return this; + } + + public TransportBuilder setPort(int port) { + this.port = port; + return this; + } + + public TransportBuilder setTimeout(int timeout) { + this.timeout = timeout; + return this; + } + + public TransportBuilder setAPIVersion(int version) { + this.version = version; + return this; + } + + public ServerSentEventsTransport build() { + SignalFxEndpoint endpoint = new SignalFxEndpoint(this.protocol, this.host, this.port); + ServerSentEventsTransport transport = new ServerSentEventsTransport(this.token, + endpoint, this.version, this.timeout * 1000); + return transport; + } + } + + /** + * SSE Transport Connection + */ + public static class TransportConnection extends AbstractHttpReceiverConnection { + + protected static final Logger log = LoggerFactory.getLogger(TransportConnection.class); + public static final int DEFAULT_TIMEOUT_MS = 1000; + public static final int DEFAULT_MAX_RETRIES = 3; + protected final RequestConfig transportRequestConfig; + + public TransportConnection(SignalFxEndpoint endpoint) { + this(endpoint, DEFAULT_TIMEOUT_MS, DEFAULT_MAX_RETRIES); + } + + public TransportConnection(SignalFxEndpoint endpoint, int timeoutMs, int maxRetries) { + super(endpoint, timeoutMs, maxRetries, new BasicHttpClientConnectionManager()); + + this.transportRequestConfig = RequestConfig.custom().setSocketTimeout(0) + .setConnectionRequestTimeout(this.requestConfig.getConnectionRequestTimeout()) + .setConnectTimeout(this.requestConfig.getConnectTimeout()) + .setProxy(this.requestConfig.getProxy()).build(); + + log.debug("constructed request config: {}", this.transportRequestConfig.toString()); + } + + public CloseableHttpResponse post(String token, String path, + final Map parameters, String body) + throws SignalFlowException { + HttpPost httpPost = null; + try { + List params = new ArrayList(); + if (parameters != null) { + for (Map.Entry entry : parameters.entrySet()) { + params.add(new BasicNameValuePair(entry.getKey(), entry.getValue())); + } + } + + URIBuilder uriBuilder = new URIBuilder(String.format("%s%s", host.toURI(), path)); + uriBuilder.addParameters(params); + + httpPost = new HttpPost(uriBuilder.build()); + httpPost.setConfig(transportRequestConfig); + httpPost.setHeader("X-SF-TOKEN", token); + httpPost.setHeader("User-Agent", USER_AGENT); + httpPost.setHeader("Content-Type", "text/plain"); + if (body != null) { + HttpEntity httpEntity = new StringEntity(body); + httpPost.setEntity(httpEntity); + } + + if (log.isDebugEnabled()) { + log.debug(httpPost.toString()); + } + + CloseableHttpResponse response = client.execute(httpPost); + + StatusLine statusLine = response.getStatusLine(); + int statuscode = statusLine.getStatusCode(); + if ((statuscode < 200) || (statuscode >= 300)) { + + try { + response.close(); + } catch (IOException ex) { + log.error("failed to close response", ex); + } + + String errorMessage = statusLine.getStatusCode() + ": failed post [ " + httpPost + + " ] reason: " + statusLine.getReasonPhrase(); + throw new SignalFlowException(statusLine.getStatusCode(), errorMessage); + } + + return response; + } catch (IOException ex) { + throw new SignalFlowException("failed communication. " + ex.getMessage(), ex); + } catch (URISyntaxException ex) { + throw new SignalFlowException("invalid uri. " + ex.getMessage(), ex); + } + } + + public void close() throws IOException { + client.close(); + } + } + + /** + * Computation channel fed from a Server-Sent Events stream. + */ + public static class TransportChannel extends Channel { + + protected static final Logger log = LoggerFactory.getLogger(TransportChannel.class); + + private TransportConnection connection; + private CloseableHttpResponse response; + private HttpEntity responseHttpEntity; + private TransportEventStreamParser streamParser; + + public TransportChannel(final TransportConnection connection, + final CloseableHttpResponse response) + throws IOException { + super(); + this.connection = connection; + this.response = response; + this.responseHttpEntity = response.getEntity(); + this.streamParser = new TransportEventStreamParser( + this.responseHttpEntity.getContent()); + this.iterator = this.streamParser; + + log.debug("constructed {} of type {}", this, this.getClass().getName()); + } + + @Override + public void close() { + super.close(); + + try { + this.response.close(); + } catch (IOException ex) { + log.error("failed to close response", ex); + } + + try { + this.connection.close(); + } catch (IOException ex) { + log.error("failed to close connection", ex); + } + + this.streamParser.close(); + } + } + + public static class TransportEventStreamParser implements Iterator, Closeable { + + protected static final Logger log = LoggerFactory + .getLogger(TransportEventStreamParser.class); + + private static final String EVENT = "event"; + private static final String ID = "id"; + private static final String DATA = "data"; + private static final String RETRY = "retry"; + private static final String DEFAULT_EVENT = "message"; + private static final String EMPTY_STRING = ""; + private static final Pattern DIGITS_ONLY = Pattern.compile("^[\\d]+$"); + + private BufferedReader eventStreamReader; + private boolean endOfStreamReached = false; + + private int reconnectionTimeoutMs = 1000; // default is 1 second + private StreamMessage nextMessage; + private String lastEventId; + private String eventNameBuffer = DEFAULT_EVENT; + private StringBuilder dataBuffer = new StringBuilder(); + + public TransportEventStreamParser(final InputStream eventStream) + throws UnsupportedEncodingException { + this.eventStreamReader = new BufferedReader( + new InputStreamReader(eventStream, "UTF-8")); + } + + public String getLastEventId() { + return this.lastEventId; + } + + public int getReconnectionTimeoutMs() { + return this.reconnectionTimeoutMs; + } + + @Override + public boolean hasNext() { + while ((endOfStreamReached == false) && (eventStreamReader != null) + && (nextMessage == null)) { + parseNext(); + } + + return nextMessage != null; + } + + @Override + public StreamMessage next() { + while ((endOfStreamReached == false) && (eventStreamReader != null) + && (nextMessage == null)) { + parseNext(); + } + + if (nextMessage != null) { + StreamMessage message = this.nextMessage; + + // important to set next message to null here as that variable stores the next + // message (if one exists) which is checked by next and hasNext methods. and we just + // popped the last message off so it should be null now. + this.nextMessage = null; + + return message; + } else { + throw new NoSuchElementException("no more stream messages"); + } + } + + @Override + public void remove() { + throw new UnsupportedOperationException("remove from stream not supported"); + } + + @Override + public void close() { + if (this.eventStreamReader != null) { + try { + this.eventStreamReader.close(); + this.eventStreamReader = null; + } catch (IOException ex) { + log.error("failed to close event stream", ex); + } + } + } + + private void parseNext() { + if (eventStreamReader != null) { + try { + long startTime = System.currentTimeMillis(); + dataBuffer.setLength(0); + + String line; + while ((line = eventStreamReader.readLine()) != null) { + int colonIndex; + if (line.trim().isEmpty()) { + // message ready for dispatch + break; + } else if (line.startsWith(":")) { + // ignore the line + } else if ((colonIndex = line.indexOf(":")) != -1) { + String field = line.substring(0, colonIndex); + String value = line.substring(colonIndex + 1).replaceFirst(" ", + EMPTY_STRING); + processField(field, value); + } else { + processField(line.trim(), EMPTY_STRING); + } + } + + if (line == null) { + // end of stream reached + endOfStreamReached = true; + close(); + } + + if (dataBuffer.length() > 0) { + String data = dataBuffer.toString(); + if (data.endsWith("\n")) { + data = data.substring(0, data.length() - 1); + } + + nextMessage = new StreamMessage(eventNameBuffer, lastEventId, data); + + } else { + log.debug(eventNameBuffer.toString()); + eventNameBuffer = EMPTY_STRING; + nextMessage = null; + } + + log.debug("total stream message read/parse time (ms): {}", + (System.currentTimeMillis() - startTime)); + + } catch (IOException ex) { + log.error("failed to parse next stream event", ex); + throw new SignalFlowException("failed to parse next stream event", ex); + } + } else { + nextMessage = null; + } + } + + private void processField(String field, String value) { + if (DATA.equals(field)) { + dataBuffer.append(value).append("\n"); + } else if (ID.equals(field)) { + lastEventId = value; + } else if (EVENT.equals(field)) { + eventNameBuffer = value; + } else if (RETRY.equals(field)) { + if (DIGITS_ONLY.matcher(value).matches()) { + // set event stream's reconnection time to integer value + reconnectionTimeoutMs = Integer.parseInt(value); + } + } + } + } +} diff --git a/signalflow-client/src/main/java/com/signalfx/signalflow/client/SignalFlowClient.java b/signalflow-client/src/main/java/com/signalfx/signalflow/client/SignalFlowClient.java new file mode 100644 index 0000000..ece3f09 --- /dev/null +++ b/signalflow-client/src/main/java/com/signalfx/signalflow/client/SignalFlowClient.java @@ -0,0 +1,250 @@ +/* + * Copyright (C) 2016-2018 SignalFx, Inc. All rights reserved. + */ +package com.signalfx.signalflow.client; + +import java.util.Collections; +import java.util.Map; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; + +/** + * SignalFx SignalFlow client. + * + * Client for SignalFx's SignalFlow real-time analytics API. Allows for the execution of ad-hoc + * computations, returning its output in real-time as it is produced; to start new background + * computations; attach, keep alive or stop existing computations. + * + * @author dgriff + */ +public class SignalFlowClient implements AutoCloseable { + + private SignalFlowTransport transport; + + /** + * Client Constructor that uses default transport/settings + * + * @param token + * user api token + */ + public SignalFlowClient(String token) { + this(new WebSocketTransport.TransportBuilder(token).build()); + } + + /** + * Client Constructor that uses custom transport + * + * @param transport + * custom created transport + */ + public SignalFlowClient(SignalFlowTransport transport) { + this.transport = transport; + } + + /** + * Execute the given SignalFlow program and stream the output back. + * + * @param program + * computation written in signalflow language + * @return computation instance + */ + public Computation execute(String program) { + return new Computation(this.transport, program, Collections. emptyMap(), + false); + } + + /** + * This method is deprecated and will be removed in the next major release. Use + * {@link #execute(String, Long, Long, Long, Long, Boolean, Boolean, String)} instead + * + * Execute the given SignalFlow program with parameters and stream the output back. + * + * @param program + * computation written in signalflow language + * @param start + * Optional millisecond start timestamp + * @param stop + * Optional millisecond stop timestamp + * @param resolution + * Optional desired data resolution, in milliseconds + * @param maxDelay + * Optional desired maximum data delay, in milliseconds + * @param persistent + * Optional persistent setting + * @return computation instance + */ + @Deprecated + public Computation execute(String program, long start, long stop, long resolution, + long maxDelay, boolean persistent) { + return execute(program, start, stop, resolution, maxDelay, persistent, false, null); + } + + /** + * This method is deprecated and will be removed in the next major release. Use + * {@link #execute(String, Long, Long, Long, Long, Boolean, Boolean, String)} instead + * + * @param program + * computation written in signalflow language + * @param start + * Optional timestamp in milliseconds since epoch. Defaults to the current timestamp. + * @param stop + * Optional timestamp in milliseconds since epoch. Defaults to infinity. + * @param resolution + * Optional the minimum desired data resolution, in milliseconds. This allows the + * client to put an upper bound on the number of datapoints in the computation + * output. + * @param maxDelay + * Optional desired maximum data delay, in milliseconds between 1 and 900000. When + * set to zero or unset, max delay will be evaluated dynamically based on the + * historical lag information of the input data. + * @param persistent + * Optional persistent setting + * @param immediate + * Optional adjusts the stop timestamp so that the computation doesn't wait for + * future data to be available + * @return computation instance + */ + @Deprecated + public Computation execute(String program, Long start, Long stop, Long resolution, + Long maxDelay, Boolean persistent, Boolean immediate) { + return execute(program, start, stop, resolution, maxDelay, persistent, immediate, null); + } + + /** + * Execute the given SignalFlow program with parameters and stream the output back. + * + * @param program + * computation written in signalflow language + * @param start + * Optional timestamp in milliseconds since epoch. Defaults to the current timestamp. + * @param stop + * Optional timestamp in milliseconds since epoch. Defaults to infinity. + * @param resolution + * Optional the minimum desired data resolution, in milliseconds. This allows the + * client to put an upper bound on the number of datapoints in the computation + * output. + * @param maxDelay + * Optional desired maximum data delay, in milliseconds between 1 and 900000. When + * set to zero or unset, max delay will be evaluated dynamically based on the + * historical lag information of the input data. + * @param persistent + * Optional persistent setting + * @param immediate + * Optional adjusts the stop timestamp so that the computation doesn't wait for + * future data to be available + * @param timeZone + * Optional the time zone to be used for computation. The value is forwarded to the endpoint. + * Supported time zone values are mentioned in the docs. + * @return computation instance + */ + public Computation execute(String program, Long start, Long stop, Long resolution, + Long maxDelay, Boolean persistent, Boolean immediate, String timeZone) { + Map params = buildParams("start", start, "stop", stop, "resolution", + resolution, "maxDelay", maxDelay, "persistent", persistent, "immediate", immediate, "timezone", timeZone); + return new Computation(this.transport, program, params, false); + } + + /** + * Start executing the given SignalFlow program without being attached to the output of the + * computation. + * + * @param program + * computation written in signalflow language + */ + public void start(String program) { + this.transport.start(program, Collections. emptyMap()); + } + + /** + * Start executing the given SignalFlow program without being attached to the output of the + * computation. + * + * @param program + * computation written in signalflow language + * @param start + * Optional millisecond start timestamp + * @param stop + * Optional millisecond stop timestamp + * @param resolution + * Optional desired data resolution, in milliseconds + * @param maxDelay + * Optional desired maximum data delay, in milliseconds + */ + public void start(String program, long start, long stop, long resolution, long maxDelay) { + Map params = buildParams("start", start, "stop", stop, "resolution", + resolution, "maxDelay", maxDelay); + this.transport.start(program, params); + } + + /** + * Stop a SignalFlow computation + * + * @param computation + * computation instance + * @param reason + * Optional description of why stop was called + */ + public void stop(Computation computation, String reason) { + stop(computation.getId(), reason); + computation.close(); + } + + /** + * Stop a SignalFlow computation + * + * @param handle + * computation id + * @param reason + * Optional description of why stop was called + */ + public void stop(String handle, String reason) { + Map params = buildParams("reason", reason); + this.transport.stop(handle, params); + } + + /** + * Keepalive a SignalFlow computation. + * + * @param handle + * computation id + */ + public void keepalive(String handle) { + this.transport.keepalive(handle); + } + + /** + * Attach to an existing SignalFlow computation. + * + * @param handle + * computation id + * @param filters + * filter written in signalflow language + * @param resolution + * Optional desired data resolution, in milliseconds + * @return computation instance + */ + public Computation attach(String handle, String filters, long resolution) { + return new Computation(this.transport, handle, + buildParams("filters", filters, "resolution", resolution), true); + } + + /** + * Close this SignalFlow client. + */ + @Override + public void close() { + this.transport.close(1000, null); + } + + private static Map buildParams(Object... params) { + Preconditions.checkArgument(params.length % 2 == 0); + ImmutableMap.Builder builder = new ImmutableMap.Builder(); + for (int i = 0; i < params.length; i += 2) { + if (params[i] != null && params[i + 1] != null) { + builder.put(params[i].toString(), params[i + 1].toString()); + } + } + return builder.build(); + } +} diff --git a/signalflow-client/src/main/java/com/signalfx/signalflow/client/SignalFlowException.java b/signalflow-client/src/main/java/com/signalfx/signalflow/client/SignalFlowException.java new file mode 100644 index 0000000..2917994 --- /dev/null +++ b/signalflow-client/src/main/java/com/signalfx/signalflow/client/SignalFlowException.java @@ -0,0 +1,42 @@ +/* + * Copyright (C) 2016-2018 SignalFx, Inc. All rights reserved. + */ +package com.signalfx.signalflow.client; + +/** + * A generic error encountered when interacting with the SignalFx SignalFlow API. + * + * @author dgriff + */ +public class SignalFlowException extends RuntimeException { + + private static final long serialVersionUID = 1L; + + protected int code = 0; + + public SignalFlowException(int code, String message) { + super(message); + this.code = code; + } + + public SignalFlowException(String message) { + super(message); + } + + public SignalFlowException(String message, Throwable cause) { + super(message, cause); + } + + public SignalFlowException(int code, String message, Throwable cause) { + super(message, cause); + this.code = code; + } + + public int getCode() { + return this.code; + } + + public void setCode(int code) { + this.code = code; + } +} diff --git a/signalflow-client/src/main/java/com/signalfx/signalflow/client/SignalFlowTransport.java b/signalflow-client/src/main/java/com/signalfx/signalflow/client/SignalFlowTransport.java new file mode 100644 index 0000000..68c58c0 --- /dev/null +++ b/signalflow-client/src/main/java/com/signalfx/signalflow/client/SignalFlowTransport.java @@ -0,0 +1,91 @@ +/* + * Copyright (C) 2016-2018 SignalFx, Inc. All rights reserved. + */ +package com.signalfx.signalflow.client; + +import java.util.Map; + +/** + * Interface for transports to the SignalFlow API + * + * @author dgriff + */ +public interface SignalFlowTransport { + + /** + * Default host for signalflow + */ + String DEFAULT_HOST = "stream.signalfx.com"; + + /** + * Attach to an existing SignalFlow computation. + * + * @param handle + * computation id + * @param parameters + * computation parameters + * @return An open channel attached to the given computation. + */ + Channel attach(String handle, Map parameters); + + /** + * Execute the given SignalFlow program and stream the output back. + * + * @param program + * computation written in signalflow language + * @param parameters + * computation parameters + * @return An open channel attached to the newly started computation. + */ + Channel execute(String program, Map parameters); + + /** + * Execute a preflight of the given SignalFlow program and stream the output back. + * + * @param program + * computation written in signalflow language + * @param parameters + * computation parameters + * @return An open channel attached to the newly started preflight computation. + */ + Channel preflight(String program, Map parameters); + + /** + * Start executing the given SignalFlow program without being attached to the output of the + * computation. + * + * @param program + * computation written in signalflow language + * @param parameters + * computation parameters + */ + void start(String program, Map parameters); + + /** + * Stop a SignalFlow computation. + * + * @param handle + * computation id + * @param parameters + * computation parameter + */ + void stop(String handle, Map parameters); + + /** + * Close this SignalFlow transport. + * + * @param code + * numeric error id + * @param reason + * Optional description of why closing + */ + void close(int code, String reason); + + /** + * Keep-alive a SignalFlow computation. + * + * @param handle + * computation id + */ + void keepalive(String handle); +} diff --git a/signalflow-client/src/main/java/com/signalfx/signalflow/client/StreamMessage.java b/signalflow-client/src/main/java/com/signalfx/signalflow/client/StreamMessage.java new file mode 100644 index 0000000..a2902fa --- /dev/null +++ b/signalflow-client/src/main/java/com/signalfx/signalflow/client/StreamMessage.java @@ -0,0 +1,132 @@ +/* + * Copyright (C) 2016 SignalFx, Inc. All rights reserved. + */ +package com.signalfx.signalflow.client; + +import java.util.HashMap; +import java.util.Map; + +import com.google.common.base.Preconditions; + +/** + * Base class for stream messages received from a SignalFlow computation + * + * @author dgriff + */ +public class StreamMessage { + + /** + * Enumeration of kinds of stream messages + */ + public static enum Kind { + + CONTROL("control-message",(byte) 1), + INFORMATION("message",(byte) 2), + EVENT("event",(byte) 3), + METADATA("metadata",(byte) 4), + DATA("data",(byte) 5), + ERROR("error",(byte) 6), + EXPIRED_TSID("expired-tsid",(byte) 10); + + private final String specName; + private final byte type; + + Kind(String specName, byte type) { + this.specName = specName; + this.type = type; + } + + public byte getBinaryType() { + return type; + } + + public String toString() { + return this.specName; + } + + private static final Map SPECNAME_KINDS = new HashMap(); + private static final Map BINARYTYPE_KINDS = new HashMap(); + static { + for (Kind kind : Kind.values()) { + SPECNAME_KINDS.put(kind.specName, kind); + BINARYTYPE_KINDS.put(new Integer(kind.getBinaryType()), kind); + } + } + + public static Kind fromSpecName(String specName) { + Kind kind = SPECNAME_KINDS.get(specName); + Preconditions.checkArgument(kind != null); + return kind; + } + + public static Kind fromBinaryType(int binaryType) { + Kind kind = BINARYTYPE_KINDS.get(binaryType); + Preconditions.checkArgument(kind != null); + return kind; + } + }; + + private String event; + private String id; + private String data; + private Kind kind; + + public StreamMessage() { + this.event = "message"; + kind = Kind.INFORMATION; + } + + public StreamMessage(String event, String id, String data) { + this.event = event; + this.id = id; + this.data = data; + + try { + this.kind = Kind.fromSpecName(event); + } catch (IllegalArgumentException ex) { + kind = Kind.INFORMATION; // set as default kind + } + } + + public Kind getKind() { + return this.kind; + } + + public boolean isKind(Kind kind) { + return this.kind == kind; + } + + public String getEvent() { + return event; + } + + public void setEvent(String event) { + this.event = event; + } + + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + public String getData() { + return data; + } + + public void setData(String data) { + this.data = data; + } + + public String toString() { + StringBuilder builder = new StringBuilder(); + builder.append(event); + builder.append(":"); + builder.append(id); + builder.append(":"); + builder.append(data); + return builder.toString(); + } +} diff --git a/signalflow-client/src/main/java/com/signalfx/signalflow/client/StreamRequestException.java b/signalflow-client/src/main/java/com/signalfx/signalflow/client/StreamRequestException.java new file mode 100644 index 0000000..481aa8a --- /dev/null +++ b/signalflow-client/src/main/java/com/signalfx/signalflow/client/StreamRequestException.java @@ -0,0 +1,30 @@ +/* + * Copyright (C) 2019 SignalFx, Inc. All rights reserved. + */ +package com.signalfx.signalflow.client; + +/** + * Exception thrown when the computation fails at request time, possibly for syntax errors. + * + * @author cwatson + */ +public class StreamRequestException extends RuntimeException { + + private static final long serialVersionUID = 1L; + + protected int errorCode; + protected String message; + + public StreamRequestException(int errorCode, String message) { + super("Computation failed (" + message + ") code: " + errorCode); + this.message = message; + } + + public int getErrorCode() { + return this.errorCode; + } + + public String getMessage() { + return this.message; + } +} diff --git a/signalflow-client/src/main/java/com/signalfx/signalflow/client/WebSocketTransport.java b/signalflow-client/src/main/java/com/signalfx/signalflow/client/WebSocketTransport.java new file mode 100644 index 0000000..da0e575 --- /dev/null +++ b/signalflow-client/src/main/java/com/signalfx/signalflow/client/WebSocketTransport.java @@ -0,0 +1,666 @@ +/* + * Copyright (C) 2016-2018 SignalFx, Inc. All rights reserved. + */ +package com.signalfx.signalflow.client; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.net.URI; +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.zip.GZIPInputStream; + +import org.apache.commons.io.IOUtils; +import org.apache.http.client.utils.URIBuilder; +import org.eclipse.jetty.util.ssl.SslContextFactory; +import org.eclipse.jetty.websocket.api.Session; +import org.eclipse.jetty.websocket.api.WebSocketAdapter; +import org.eclipse.jetty.websocket.client.WebSocketClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.io.BaseEncoding; +import com.google.common.util.concurrent.Uninterruptibles; +import com.signalfx.signalflow.client.endpoint.SignalFxEndpoint; +import com.signalfx.signalflow.client.ChannelMessage.Type; +import com.signalfx.signalflow.client.StreamMessage.Kind; + +/** + * WebSocket based transport. + * + * Uses the SignalFlow WebSocket connection endpoint to interact with SignalFx's SignalFlow API. + * Multiple computation streams can be multiplexed through a single, pre-opened WebSocket + * connection. It also utilizes a more efficient binary encoding for data so it requires less + * bandwidth and has overall less latency. + * + * @author dgriff + */ +public class WebSocketTransport implements SignalFlowTransport { + + protected static final Logger log = LoggerFactory.getLogger(WebSocketTransport.class); + public static final int DEFAULT_TIMEOUT = 1; // 1 second + + protected final String token; + protected final SignalFxEndpoint endpoint; + protected final String path; + protected final int timeout; + protected final boolean compress; + protected WebSocketClient webSocketClient; + protected TransportConnection transportConnection; + + protected WebSocketTransport(String token, SignalFxEndpoint endpoint, int apiVersion, + int timeout, boolean compress, int maxBinaryMessageSize) { + this.token = token; + this.endpoint = endpoint; + this.path = "/v" + apiVersion + "/signalflow/connect"; + this.timeout = timeout; + this.compress = compress; + + try { + this.transportConnection = new TransportConnection(token); + URI uri = new URIBuilder(String.format("%s://%s:%s%s", endpoint.getScheme(), + endpoint.getHostname(), endpoint.getPort(), path)).build(); + + this.webSocketClient = new WebSocketClient(new SslContextFactory()); + if (maxBinaryMessageSize > 0) { + this.webSocketClient.getPolicy().setMaxBinaryMessageSize(maxBinaryMessageSize); + } + if (timeout > 0) { + this.webSocketClient.setConnectTimeout(TimeUnit.SECONDS.toMillis(timeout)); + } + this.webSocketClient.start(); + this.webSocketClient.connect(this.transportConnection, uri); + this.transportConnection.awaitConnected(timeout, TimeUnit.SECONDS); + } catch (Exception ex) { + if (this.webSocketClient != null) { + try { + this.webSocketClient.stop(); + } catch (Exception e) { + log.warn("error closing websocket client", e); + } + } + throw new SignalFlowException("failed to construct websocket transport", ex); + } + } + + @Override + public Channel attach(String handle, Map parameters) { + log.debug("attach: [ {} ] with parameters: {}", handle, parameters); + + Channel channel = new TransportChannel(transportConnection); + + Map request = new HashMap(parameters); + request.put("type", "attach"); + request.put("handle", handle); + request.put("compress", Boolean.toString(compress)); + + transportConnection.sendMessage(channel, request); + + return channel; + } + + @Override + public Channel execute(String program, Map parameters) { + log.debug("execute: [ {} ] with parameters: {}", program, parameters); + + Channel channel = new TransportChannel(transportConnection); + HashMap request = new HashMap(parameters); + request.put("type", "execute"); + request.put("program", program); + request.put("compress", Boolean.toString(compress)); + + transportConnection.sendMessage(channel, request); + + return channel; + } + + @Override + public Channel preflight(String program, Map parameters) { + log.debug("preflight: [ {} ] with parameters: {}", program, parameters); + + Channel channel = new TransportChannel(transportConnection); + HashMap request = new HashMap(parameters); + request.put("type", "preflight"); + request.put("program", program); + + transportConnection.sendMessage(channel, parameters); + + return channel; + } + + @Override + public void start(String program, Map parameters) { + log.debug("start: [ {} ] with parameters: {}", program, parameters); + + HashMap request = new HashMap(parameters); + request.put("type", "start"); + request.put("program", program); + + transportConnection.sendMessage(request); + } + + @Override + public void stop(String handle, Map parameters) { + log.debug("stop: [ {} ] with parameters: {}", handle, parameters); + + HashMap request = new HashMap(parameters); + request.put("type", "stop"); + request.put("handle", handle); + + transportConnection.sendMessage(request); + } + + @Override + public void close(int code, String reason) { + if (transportConnection.getSession() != null && transportConnection.getSession().isOpen()) { + transportConnection.close(code, reason); + try { + webSocketClient.stop(); + } catch (Exception e) { + log.warn("error while close underlying websocket client", e); + } + log.debug("transport closed"); + } + } + + @Override + public void keepalive(String handle) { + log.debug("keepalive: [ {} ]", handle); + + HashMap request = new HashMap(); + request.put("type", "keepalive"); + request.put("handle", handle); + + transportConnection.sendMessage(request); + } + + /** + * Builder of WebSocket Transport Instance + */ + public static class TransportBuilder { + + private String token; + private String protocol = "wss"; + private String host = DEFAULT_HOST; + private int port = 443; + private int timeout = DEFAULT_TIMEOUT; + private int version = 2; + private boolean compress = true; + private int maxBinaryMessageSize = -1; + + public TransportBuilder(String token) { + this.token = token; + } + + public TransportBuilder setProtocol(String protocol) { + this.protocol = protocol; + return this; + } + + public TransportBuilder setHost(String host) { + this.host = host; + return this; + } + + public TransportBuilder setPort(int port) { + this.port = port; + return this; + } + + public TransportBuilder setTimeout(int timeout) { + this.timeout = timeout; + return this; + } + + public TransportBuilder setAPIVersion(int version) { + this.version = version; + return this; + } + + public TransportBuilder useCompression(boolean compress) { + this.compress = compress; + return this; + } + + public TransportBuilder setMaxBinaryMessageSize(int size) { + this.maxBinaryMessageSize = size; + return this; + } + + public WebSocketTransport build() { + SignalFxEndpoint endpoint = new SignalFxEndpoint(this.protocol, this.host, this.port); + WebSocketTransport transport = new WebSocketTransport(this.token, endpoint, + this.version, this.timeout, this.compress, this.maxBinaryMessageSize); + return transport; + } + } + + /** + * Special type of StreamMessage for conveying websocket/connection errors to channels + */ + protected static class SignalFlowExceptionStreamMessage extends StreamMessage { + + protected SignalFlowException exception; + + public SignalFlowExceptionStreamMessage(final SignalFlowException exception) { + super("error", null, exception.getMessage()); + this.exception = exception; + } + + public SignalFlowException getException() { + return this.exception; + } + } + + /** + * WebSocket Transport Connection + */ + protected static class TransportConnection extends WebSocketAdapter { + + private static final Logger log = LoggerFactory.getLogger(TransportConnection.class); + + private static final Charset ASCII = Charset.forName("US-ASCII"); + private static final Charset UTF_8 = Charset.forName("UTF-8"); + private static final BaseEncoding base64Encoder = BaseEncoding.base64Url().omitPadding(); + private static final TypeReference> MAP_TYPE_REF = new TypeReference>() {}; + + private static final int MAX_CHANNEL_NAME_LENGTH = 16; + private static final int BINARY_PREAMBLE_LENGTH = 4; + private static final int BINARY_HEADER_LENGTH = 20; + + private static final int LONG_TYPE = 0x01; + private static final int DOUBLE_TYPE = 0x02; + private static final int INT_TYPE = 0x03; + + private static final ObjectMapper objectMapper = new ObjectMapper(); + static { + objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + } + + private final CountDownLatch latch = new CountDownLatch(1); + private final String token; + private final Map channels = Collections + .synchronizedMap(new HashMap()); + private SignalFlowException error; + + protected TransportConnection(String token) { + this.token = token; + } + + @Override + public void onWebSocketConnect(Session session) { + super.onWebSocketConnect(session); + log.debug("websocket connected to {}", session.getRemoteAddress()); + + Map authRequest = new HashMap(); + authRequest.put("type", "authenticate"); + authRequest.put("token", this.token); + + sendMessage(authRequest); + } + + @Override + public void onWebSocketClose(int code, String reason) { + log.debug("websocket connection closed ({} {})", code, reason); + + if (code != 1000) { + this.error = new SignalFlowException(code, reason); + log.info("Lost WebSocket connection with {} ({}).", getSession().getRemoteAddress(), + code); + + SignalFlowExceptionStreamMessage errorMessage = new SignalFlowExceptionStreamMessage( + this.error); + for (TransportChannel channel : this.channels.values()) { + channel.offer(errorMessage); + } + } + + this.channels.clear(); + super.onWebSocketClose(code, reason); + } + + @Override + public void onWebSocketBinary(byte[] data, int offset, int length) { + byte version = data[offset]; + byte type; + byte flags; + + // Decode message type and flags from header + switch (version) { + case 1: + // +--------------+--------------+--------------+--------------+ + // | Version | Message type | Flags | Reserved | + type = data[offset + 1]; + flags = data[offset + 2]; + break; + case 2: + // +--------------+--------------+--------------+--------------+ + // | Version | Message type | Flags | + type = data[offset + 2]; + flags = data[offset + 3]; + break; + default: + log.error("ignoring message with unsupported encoding version {}", version); + return; + } + + Kind kind; + try { + kind = Kind.fromBinaryType(type); + } catch (IllegalArgumentException iae) { + log.error("ignoring message with unsupported type {}", type); + return; + } + + // Channel name is the 16 bytes following the binary preamble in the header. + String channelName = new String(data, offset + BINARY_PREAMBLE_LENGTH, + MAX_CHANNEL_NAME_LENGTH, ASCII); + // Everything after that is the body of the message. + byte[] body = Arrays.copyOfRange(data, offset + BINARY_HEADER_LENGTH, offset + length); + + boolean compressed = (flags & (1 << 0)) != 0; + if (compressed) { + ByteArrayInputStream bais = new ByteArrayInputStream(body); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + try { + GZIPInputStream gzip = new GZIPInputStream(bais); + try { + IOUtils.copy(gzip, baos); + } finally { + IOUtils.closeQuietly(gzip); + } + body = baos.toByteArray(); + } catch (IOException ioe) { + log.error("failed to process message", ioe); + return; + } finally { + IOUtils.closeQuietly(baos); + IOUtils.closeQuietly(bais); + } + } + + boolean json = (flags & (1 << 1)) != 0; + if (json) { + onWebSocketText(new String(body, UTF_8)); + return; + } + + Map message = null; + switch (kind) { + case DATA: + message = decodeBinaryDataMessage(version, body); + break; + default: + log.error("ignoring message with unsupported binary encoding of kind {}", kind); + return; + } + + if (message != null) { + TransportChannel channel = channels.get(channelName); + if (channel != null && !channel.isClosed()) { + try { + StreamMessage streamMessage = new StreamMessage("data", null, + objectMapper.writeValueAsString(message)); + channel.offer(streamMessage); + } catch (JsonProcessingException ex) { + log.error("failed to process message", ex); + } + } else { + log.debug("ignoring message. channel not found {}", channelName); + } + } + } + + private static Map decodeBinaryDataMessage(byte version, byte[] data) { + try { + Map message = new HashMap(); + ByteBuffer buffer = ByteBuffer.wrap(data); + switch (version) { + case 1: + message.put("logicalTimestampMs", buffer.getLong()); + break; + case 2: + message.put("logicalTimestampMs", buffer.getLong()); + message.put("maxDelayMs", buffer.getLong()); + break; + } + + int count = buffer.getInt(); + List> datapoints = new ArrayList>(count); + for (int element = 0; element < count; element++) { + Map elementMap = new HashMap(3); + + byte type = buffer.get(); + byte[] tsIdBytes = new byte[8]; + buffer.get(tsIdBytes); + elementMap.put("tsId", base64Encoder.encode(tsIdBytes)); + + switch (type) { + case LONG_TYPE: + case INT_TYPE: // int or long value + elementMap.put("value", buffer.getLong()); + break; + case DOUBLE_TYPE: // double value + elementMap.put("value", buffer.getDouble()); + break; + default: + log.warn("ignoring data message with unknown value type {}", type); + return null; + } + + datapoints.add(elementMap); + } + message.put("data", datapoints); + return message; + } catch (Exception ex) { + log.error("failed to construct transport data message", ex); + return null; + } + } + + @Override + public void onWebSocketText(String data) { + try { + // Incoming text message is expected to be JSON. + Map dataMap = objectMapper.readValue(data, MAP_TYPE_REF); + + // Intercept KEEP_ALIVE messages + String event = (String) dataMap.get("event"); + if ("KEEP_ALIVE".equals(event)) { + return; + } + + String type = (String) dataMap.get("type"); + if (type == null) { + log.debug("type missing so ignoring message. {}", dataMap); + return; + } + + // Authenticated messages inform us that our authentication has been accepted + // and we can now consider the socket as "connected". + if (type.equals("authenticated")) { + log.info("WebSocket connection authenticated as {} (in {})", + dataMap.get("userId"), dataMap.get("orgId")); + this.latch.countDown(); + } else { + // All other messages should have a channel. + String channelName = (String) dataMap.get("channel"); + if (channelName != null) { + TransportChannel channel = channels.get(channelName); + if ((channel != null) && (!channel.isClosed())) { + StreamMessage message = new StreamMessage(type, null, data); + channel.offer(message); + } else { + log.debug("ignoring message. channel not found {}", channelName); + } + } + } + } catch (IOException ex) { + log.error("failed to process messages", ex); + } + } + + public void sendMessage(final Map request) { + try { + String message = objectMapper.writeValueAsString(request); + this.getRemote().sendString(message); + } catch (Exception ex) { + throw new SignalFlowException("failed to send message", ex); + } + } + + public void sendMessage(final Channel channel, final Map request) { + try { + Map channelRequest = new HashMap(request); + channelRequest.put("channel", channel.getName()); + String message = objectMapper.writeValueAsString(channelRequest); + this.getRemote().sendString(message); + } catch (Exception ex) { + throw new SignalFlowException( + "failed to send message for channel " + channel.getName(), ex); + } + } + + public void add(TransportChannel channel) { + this.channels.put(channel.getName(), channel); + } + + public void remove(TransportChannel channel) { + this.channels.remove(channel); + } + + public void close(int code, String reason) { + for (Channel channel : this.channels.values()) { + channel.close(); + } + this.channels.clear(); + this.getSession().close(code, reason); + this.latch.countDown(); + } + + public void awaitConnected(long timeout, TimeUnit unit) throws TimeoutException { + if (!Uninterruptibles.awaitUninterruptibly(this.latch, timeout, unit)) { + throw new TimeoutException("timeout establishing connection"); + } + } + } + + /** + * Computation channel fed from a Server-Sent Events stream. + */ + protected static class TransportChannel extends Channel { + + protected static final Logger log = LoggerFactory.getLogger(TransportChannel.class); + protected TransportConnection connection; + protected Queue messageQueue = new ConcurrentLinkedQueue(); + protected TransportEventStreamParser parser = new TransportEventStreamParser(messageQueue); + + public TransportChannel(TransportConnection sharedConnection) { + super(); + this.connection = sharedConnection; + this.iterator = parser; + this.connection.add(this); // register channel with transport connection + log.debug("constructed {} of type {}", this.toString(), this.getClass().getName()); + } + + public boolean offer(final StreamMessage message) { + return messageQueue.offer(message); + } + + @Override + public void close() { + super.close(); + this.connection.remove(this); // deregister channel with transport connection + } + } + + /** + * Iterator over stream messages from websocket connection for a channel + */ + protected static class TransportEventStreamParser implements Iterator { + + protected Queue messageQueue; + protected boolean isClosed = false; + + public TransportEventStreamParser(Queue messageQueue) { + this.messageQueue = messageQueue; + } + + @Override + public boolean hasNext() { + return isClosed == false; + } + + @Override + public StreamMessage next() { + StreamMessage streamMessage = null; + while ((!isClosed) && (streamMessage == null)) { + + streamMessage = messageQueue.poll(); + if (streamMessage != null) { + + switch (streamMessage.getKind()) { + + case CONTROL: + ChannelMessage channelMessage = ChannelMessage + .decodeStreamMessage(streamMessage); + if ((channelMessage.getType() == Type.END_OF_CHANNEL) + || (channelMessage.getType() == Type.CHANNEL_ABORT)) { + close(); // this is the last message for computation + } + break; + + case ERROR: + if (streamMessage instanceof SignalFlowExceptionStreamMessage) { + close(); // no more messages now + throw ((SignalFlowExceptionStreamMessage) streamMessage).getException(); + } + break; + + default: + } + + } else { + try { + Thread.sleep(100L); + } catch (InterruptedException ex) { + close(); + } + } + } + + if (streamMessage != null) { + return streamMessage; + } else { + throw new NoSuchElementException("no more stream messages"); + } + } + + @Override + public void remove() { + throw new UnsupportedOperationException("remove from stream not supported"); + } + + public void close() { + this.isClosed = true; + } + } +} diff --git a/signalflow-client/src/main/java/com/signalfx/signalflow/client/connection/AbstractHttpReceiverConnection.java b/signalflow-client/src/main/java/com/signalfx/signalflow/client/connection/AbstractHttpReceiverConnection.java new file mode 100644 index 0000000..081f3dc --- /dev/null +++ b/signalflow-client/src/main/java/com/signalfx/signalflow/client/connection/AbstractHttpReceiverConnection.java @@ -0,0 +1,161 @@ +package com.signalfx.signalflow.client.connection; + +import com.signalfx.signalflow.client.endpoint.SignalFxReceiverEndpoint; + +import java.nio.charset.StandardCharsets; +import org.apache.http.HttpEntity; +import org.apache.http.HttpHost; +import org.apache.http.HttpStatus; +import org.apache.http.client.config.RequestConfig; +import org.apache.http.client.entity.GzipCompressingEntity; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.conn.HttpClientConnectionManager; +import org.apache.http.entity.ContentType; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClientBuilder; +import org.apache.http.util.EntityUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.List; +import java.util.regex.Pattern; + +public abstract class AbstractHttpReceiverConnection { + + protected static final Logger log = LoggerFactory.getLogger(AbstractHttpReceiverConnection.class); + + // Do not modify this line. It is auto replaced to a version number. + public static final String VERSION_NUMBER = "1.0.0-beta1"; + public static final String USER_AGENT = "SignalFx-java-client/" + VERSION_NUMBER; + public static final String DISABLE_COMPRESSION_PROPERTY = "com.signalfx.public.java.disableHttpCompression"; + + protected static final ContentType JSON_TYPE = ContentType.APPLICATION_JSON; + + protected final CloseableHttpClient client; + protected final HttpHost host; + protected final RequestConfig requestConfig; + + protected AbstractHttpReceiverConnection(SignalFxReceiverEndpoint endpoint, int timeoutMs, + HttpClientConnectionManager httpClientConnectionManager) { + this(endpoint, timeoutMs, RetryDefaults.DEFAULT_MAX_RETRIES, httpClientConnectionManager); + } + + protected AbstractHttpReceiverConnection(SignalFxReceiverEndpoint endpoint, int timeoutMs, int maxRetries, + HttpClientConnectionManager httpClientConnectionManager) { + this(endpoint, timeoutMs, RetryDefaults.DEFAULT_MAX_RETRIES, httpClientConnectionManager, RetryDefaults.DEFAULT_NON_RETRYABLE_EXCEPTIONS); + } + + protected AbstractHttpReceiverConnection(SignalFxReceiverEndpoint endpoint, int timeoutMs, int maxRetries, + HttpClientConnectionManager httpClientConnectionManager, List> nonRetryableExceptions) { + this.client = HttpClientBuilder.create() + .setConnectionManager(httpClientConnectionManager) + .setRetryHandler(new RetryHandler(maxRetries, nonRetryableExceptions)) + .setServiceUnavailableRetryStrategy(new RetryStrategy(maxRetries)) + .build(); + this.host = new HttpHost(endpoint.getHostname(), endpoint.getPort(), endpoint.getScheme()); + + HttpHost proxy = createHttpProxyFromSystemProperties(endpoint.getHostname()); + this.requestConfig = RequestConfig.custom() + .setSocketTimeout(timeoutMs) + .setConnectionRequestTimeout(timeoutMs) + .setConnectTimeout(timeoutMs) + .setProxy(proxy) + .build(); + } + + protected CloseableHttpResponse postToEndpoint(String auth, HttpEntity entity, String endpoint, + boolean compress) + throws IOException { + if (compress) { + entity = new GzipCompressingEntity(entity); + } + + HttpPost post = new HttpPost(String.format("%s%s", host.toURI(), endpoint)); + post.setConfig(requestConfig); + if (auth != null) { + post.setHeader("X-SF-TOKEN", auth); + } + post.setHeader("User-Agent", USER_AGENT); + post.setEntity(entity); + + try { + log.trace("Talking to endpoint {}", post); + return client.execute(post); + } catch (IOException e) { + log.trace("Exception trying to execute {}", post, e); + throw e; + } + } + + protected void checkHttpResponse(CloseableHttpResponse resp) { + final String body; + try { + body = EntityUtils.toString(resp.getEntity(), StandardCharsets.UTF_8); + } catch (IOException e) { + throw new RuntimeException("Unable to get response content", e); + } + if (resp.getStatusLine().getStatusCode() != HttpStatus.SC_OK) { + throw new RuntimeException("Invalid status code " + + resp.getStatusLine().getStatusCode() + ": " + body); + } + if (!"\"OK\"".equals(body)) { + throw new RuntimeException("Invalid response body: " + body); + } + } + + /** + * method to create a httphost object based on java network proxy system properties + * + * http.proxyHost: the host name of the proxy server + * http.proxyPort: the port number, the default value being 80 + * http.nonProxyHosts: a list of hosts that should be reached directly, bypassing the proxy. + * This is a list of patterns separated by '|'. + * The patterns may start or end with a '*' for wildcards. + * Any host matching one of these patterns will be reached through a + * direct connection instead of through a proxy. + * + * @param endpointHostname the signalfx endpoint hostname + * + * @return an instance of HttpHost based on the java system properties + * unless the http proxy host is not configured + * OR if the nonProxyHosts rules include this endpoint + * then null will be returned instead + **/ + protected HttpHost createHttpProxyFromSystemProperties(String endpointHostname) { + + String proxyHost = System.getProperty("http.proxyHost"); + if ((proxyHost != null) && (proxyHost.trim().length() > 0)) { + + String nonProxyHosts = System.getProperty("http.nonProxyHosts"); + if (nonProxyHosts != null) { + + // set host strings as regular expressions based on + // nonProxyHosts rules + nonProxyHosts = nonProxyHosts.replaceAll("\\.", "\\\\.").replaceAll("\\*", ".*?"); + + // set groups and alternations + nonProxyHosts = "(" + nonProxyHosts.replaceAll("\\|", ")|(") + ")"; + + final Pattern pattern = Pattern.compile(nonProxyHosts); + if (pattern.matcher(endpointHostname).find()) { + // http proxy is not configured for this endpoint + return null; + } + } + + String proxyPort = System.getProperty("http.proxyPort"); + if ((proxyPort == null) || (proxyPort.trim().length() == 0)) { + // port 80 is the default in java networking/proxy documentation + proxyPort = "80"; + } + + // return http proxy host + return new HttpHost(proxyHost.trim(), Integer.parseInt(proxyPort.trim()), "http"); + } + + // http proxy is not configured + return null; + } +} diff --git a/signalflow-client/src/main/java/com/signalfx/signalflow/client/connection/RetryDefaults.java b/signalflow-client/src/main/java/com/signalfx/signalflow/client/connection/RetryDefaults.java new file mode 100644 index 0000000..5cbbade --- /dev/null +++ b/signalflow-client/src/main/java/com/signalfx/signalflow/client/connection/RetryDefaults.java @@ -0,0 +1,20 @@ +package com.signalfx.signalflow.client.connection; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.net.ConnectException; +import java.net.UnknownHostException; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +public final class RetryDefaults { + private RetryDefaults() { + } + + public static final int DEFAULT_MAX_RETRIES = 3; + public static final List> DEFAULT_NON_RETRYABLE_EXCEPTIONS = Collections.unmodifiableList(Arrays.asList( + InterruptedIOException.class, + UnknownHostException.class, + ConnectException.class)); +} diff --git a/signalflow-client/src/main/java/com/signalfx/signalflow/client/connection/RetryHandler.java b/signalflow-client/src/main/java/com/signalfx/signalflow/client/connection/RetryHandler.java new file mode 100644 index 0000000..2bf6511 --- /dev/null +++ b/signalflow-client/src/main/java/com/signalfx/signalflow/client/connection/RetryHandler.java @@ -0,0 +1,30 @@ +package com.signalfx.signalflow.client.connection; + +import java.io.IOException; +import java.util.List; + +import org.apache.http.impl.client.DefaultHttpRequestRetryHandler; + +import static com.signalfx.signalflow.client.connection.RetryDefaults.DEFAULT_MAX_RETRIES; +import static com.signalfx.signalflow.client.connection.RetryDefaults.DEFAULT_NON_RETRYABLE_EXCEPTIONS; + +/** + * Compared to the {@link DefaultHttpRequestRetryHandler} we allow retry on {@link + * javax.net.ssl.SSLException}, because it gets thrown when we try to send data points over a + * connection that our server has already closed. It is still unknown how exactly our server closes + * "stale" connections in such a way that http client is unable to detect this. + */ +class RetryHandler extends DefaultHttpRequestRetryHandler { + + public RetryHandler(final int maxRetries) { + this(maxRetries, DEFAULT_NON_RETRYABLE_EXCEPTIONS); + } + + public RetryHandler() { + this(DEFAULT_MAX_RETRIES, DEFAULT_NON_RETRYABLE_EXCEPTIONS); + } + + public RetryHandler(final int maxRetries, List> clazzes) { + super(maxRetries, true, clazzes); + } +} diff --git a/signalflow-client/src/main/java/com/signalfx/signalflow/client/connection/RetryStrategy.java b/signalflow-client/src/main/java/com/signalfx/signalflow/client/connection/RetryStrategy.java new file mode 100644 index 0000000..51eb694 --- /dev/null +++ b/signalflow-client/src/main/java/com/signalfx/signalflow/client/connection/RetryStrategy.java @@ -0,0 +1,25 @@ +package com.signalfx.signalflow.client.connection; + +import org.apache.http.HttpResponse; +import org.apache.http.HttpStatus; +import org.apache.http.client.ServiceUnavailableRetryStrategy; +import org.apache.http.protocol.HttpContext; + +public class RetryStrategy implements ServiceUnavailableRetryStrategy { + private final int maxRetries; + + public RetryStrategy(final int maxRetries) { + this.maxRetries = maxRetries; + } + + @Override + public boolean retryRequest(final HttpResponse httpResponse, final int executionCount, final HttpContext httpContext) { + final int statusCode = httpResponse.getStatusLine().getStatusCode(); + return executionCount <= maxRetries && (statusCode == HttpStatus.SC_REQUEST_TIMEOUT || statusCode == HttpStatus.SC_GATEWAY_TIMEOUT || statusCode == 598 || statusCode == -1); + } + + @Override + public long getRetryInterval() { + return 0; + } +} diff --git a/signalflow-client/src/main/java/com/signalfx/signalflow/client/endpoint/SignalFxEndpoint.java b/signalflow-client/src/main/java/com/signalfx/signalflow/client/endpoint/SignalFxEndpoint.java new file mode 100644 index 0000000..98a66bb --- /dev/null +++ b/signalflow-client/src/main/java/com/signalfx/signalflow/client/endpoint/SignalFxEndpoint.java @@ -0,0 +1,88 @@ +package com.signalfx.signalflow.client.endpoint; + +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Parameters that specify how to connect to SignalFx API endpoint + * + * @author jack + */ +public class SignalFxEndpoint implements SignalFxReceiverEndpoint { + public static final String DEFAULT_SCHEME = "https"; + public static final String DEFAULT_HOSTNAME = "ingest.signalfx.com"; + public static final int DEFAULT_PORT = 443; + private static final Logger log = LoggerFactory.getLogger(SignalFxEndpoint.class); + + /** + * API protocol scheme - http or https + */ + private final String scheme; + + /** + * API hostname + */ + private final String hostname; + + /** + * TCP port + */ + private final int port; + + public SignalFxEndpoint(String hostname, int port) { + this(getDefaultScheme(), hostname, port); + } + + public SignalFxEndpoint(String scheme, String hostname, int port) { + this.scheme = scheme; + this.hostname = hostname; + this.port = port; + } + + public SignalFxEndpoint() { + this(getDefaultScheme(), getDefaultHostname(), getDefaultPort()); + } + + private static String getPropertyOrEnv(String propertyName, String envName, String fallback) { + return StringUtils.defaultIfEmpty(System.getProperty(propertyName, System.getenv(envName)), + fallback); + } + + private static String getDefaultScheme() { + return getPropertyOrEnv("com.signalfx.api.scheme", "SIGNALFX_API_SCHEME", DEFAULT_SCHEME); + } + + private static String getDefaultHostname() { + return getPropertyOrEnv("com.signalfx.api.hostname", + "SIGNALFX_API_HOSTNAME", DEFAULT_HOSTNAME); + } + + private static int getDefaultPort() throws NumberFormatException { + final String foundPort = getPropertyOrEnv("com.signalfx.api.port", + "SIGNALFX_API_PORT", Integer.toString(DEFAULT_PORT)); + try { + return Integer.parseInt(foundPort); + } catch (NumberFormatException e) { + log.error("Invalid found port >>{}<<", foundPort, e); + throw e; + } + } + + @Override public String getScheme() { + return scheme; + } + + @Override public String getHostname() { + return hostname; + } + + @Override public int getPort() { + return port; + } + + @Override + public String toString() { + return getScheme() + "://" + getHostname() + ':' + getPort(); + } +} diff --git a/signalflow-client/src/main/java/com/signalfx/signalflow/client/endpoint/SignalFxReceiverEndpoint.java b/signalflow-client/src/main/java/com/signalfx/signalflow/client/endpoint/SignalFxReceiverEndpoint.java new file mode 100644 index 0000000..c09abbc --- /dev/null +++ b/signalflow-client/src/main/java/com/signalfx/signalflow/client/endpoint/SignalFxReceiverEndpoint.java @@ -0,0 +1,13 @@ +package com.signalfx.signalflow.client.endpoint; + +/** + * Date: 5/6/14 + * Time: 4:21 PM + * + * @author jack + */ +public interface SignalFxReceiverEndpoint { + String getScheme(); + String getHostname(); + int getPort(); +} diff --git a/signalflow-client/src/test/java/com/signalfx/signalflow/client/SignalFlowClientTest.java b/signalflow-client/src/test/java/com/signalfx/signalflow/client/SignalFlowClientTest.java new file mode 100644 index 0000000..12f5bdd --- /dev/null +++ b/signalflow-client/src/test/java/com/signalfx/signalflow/client/SignalFlowClientTest.java @@ -0,0 +1,57 @@ +package com.signalfx.signalflow.client; + +import org.junit.Test; + +import java.util.Map; + +import static org.junit.Assert.assertTrue; + +public class SignalFlowClientTest { + + @Test + public void shouldAutoClose() { + StubTransport transport = new StubTransport(); + + try (SignalFlowClient signalFlowClient = new SignalFlowClient(transport)) {} + + assertTrue(transport.isClosed()); + } + + private static class StubTransport implements SignalFlowTransport { + + private boolean closed = false; + + public boolean isClosed() { + return closed; + } + + @Override + public void close(int code, String reason) { + this.closed = true; + } + + @Override + public Channel attach(String handle, Map parameters) { + return null; + } + + @Override + public Channel execute(String program, Map parameters) { + return null; + } + + @Override + public Channel preflight(String program, Map parameters) { + return null; + } + + @Override + public void start(String program, Map parameters) {} + + @Override + public void stop(String handle, Map parameters) {} + + @Override + public void keepalive(String handle) {} + } +} diff --git a/signalflow-client/src/test/java/com/signalfx/signalflow/connection/RetryStrategyTest.java b/signalflow-client/src/test/java/com/signalfx/signalflow/connection/RetryStrategyTest.java new file mode 100644 index 0000000..244f4df --- /dev/null +++ b/signalflow-client/src/test/java/com/signalfx/signalflow/connection/RetryStrategyTest.java @@ -0,0 +1,101 @@ +package com.signalfx.signalflow.connection; + +import com.signalfx.signalflow.client.connection.RetryStrategy; +import org.apache.http.HttpResponse; +import org.apache.http.HttpStatus; +import org.apache.http.ProtocolVersion; +import org.apache.http.StatusLine; +import org.apache.http.client.protocol.HttpClientContext; +import org.apache.http.impl.DefaultHttpResponseFactory; +import org.apache.http.protocol.HttpContext; +import org.junit.Test; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class RetryStrategyTest { + @Test + public void shouldSetRetryOnRequestTimeout() { + final RetryStrategy retryStrategy = new RetryStrategy(3); + + final StatusLine mockStatusLine = generateStatusLineByCode(HttpStatus.SC_REQUEST_TIMEOUT); + final HttpContext mockHttpContext = new HttpClientContext(); + final HttpResponse mockResp = DefaultHttpResponseFactory.INSTANCE.newHttpResponse(mockStatusLine, mockHttpContext); + + assertTrue(retryStrategy.retryRequest(mockResp, 1, mockHttpContext)); + } + + @Test + public void shouldSetRetryOnGatewayTimeout() { + final RetryStrategy retryStrategy = new RetryStrategy(3); + + final StatusLine mockStatusLine = generateStatusLineByCode(HttpStatus.SC_GATEWAY_TIMEOUT); + final HttpContext mockHttpContext = new HttpClientContext(); + final HttpResponse mockResp = DefaultHttpResponseFactory.INSTANCE.newHttpResponse(mockStatusLine, mockHttpContext); + + assertTrue(retryStrategy.retryRequest(mockResp, 1, mockHttpContext)); + } + + @Test + public void shouldSetRetryOnNegativeStatus() { + final RetryStrategy retryStrategy = new RetryStrategy(3); + + final StatusLine mockStatusLine = generateStatusLineByCode(-1); + final HttpContext mockHttpContext = new HttpClientContext(); + final HttpResponse mockResp = DefaultHttpResponseFactory.INSTANCE.newHttpResponse(mockStatusLine, mockHttpContext); + + assertTrue(retryStrategy.retryRequest(mockResp, 1, mockHttpContext)); + } + + @Test + public void shouldSetRetryOnInvalidStatusCode() { + final RetryStrategy retryStrategy = new RetryStrategy(3); + + final StatusLine mockStatusLine = generateStatusLineByCode(598); + final HttpContext mockHttpContext = new HttpClientContext(); + final HttpResponse mockResp = DefaultHttpResponseFactory.INSTANCE.newHttpResponse(mockStatusLine, mockHttpContext); + + assertTrue(retryStrategy.retryRequest(mockResp, 1, mockHttpContext)); + } + + @Test + public void shouldNotRetryOnOtherStatusCode() { + final RetryStrategy retryStrategy = new RetryStrategy(3); + + final StatusLine mockStatusLine = generateStatusLineByCode(HttpStatus.SC_BAD_GATEWAY); + final HttpContext mockHttpContext = new HttpClientContext(); + final HttpResponse mockResp = DefaultHttpResponseFactory.INSTANCE.newHttpResponse(mockStatusLine, mockHttpContext); + + assertFalse(retryStrategy.retryRequest(mockResp, 1, mockHttpContext)); + } + + @Test + public void shouldNotRetryIfRetriesExceeded() { + final RetryStrategy retryStrategy = new RetryStrategy(3); + + final StatusLine mockStatusLine = generateStatusLineByCode(HttpStatus.SC_GATEWAY_TIMEOUT); + final HttpContext mockHttpContext = new HttpClientContext(); + final HttpResponse mockResp = DefaultHttpResponseFactory.INSTANCE.newHttpResponse(mockStatusLine, mockHttpContext); + + assertFalse(retryStrategy.retryRequest(mockResp, 4, mockHttpContext)); + } + + private StatusLine generateStatusLineByCode(final int statusCode) { + return new StatusLine() { + @Override + public ProtocolVersion getProtocolVersion() { + return null; + } + + @Override + public int getStatusCode() { + return statusCode; + } + + @Override + public String getReasonPhrase() { + return null; + } + }; + } +} \ No newline at end of file diff --git a/update_version.py b/update_version.py new file mode 100755 index 0000000..c4e0739 --- /dev/null +++ b/update_version.py @@ -0,0 +1,79 @@ +#!/usr/bin/env python3 + +# Copyright (C) 2017 SignalFx, Inc. All rights reserved. + +# This script is used to update the versions of all the artifacts, the +# User-Agent version of the library, and the documented version in the README +# file, all at once, as part of the release process. + +import logging +import os +import re +import subprocess +from subprocess import PIPE +import sys + +logging.basicConfig(stream=sys.stdout, level=logging.INFO) +logger = logging.getLogger(os.path.basename(__file__)) + +def match_all(v): + return True + +def no_snapshots(v): + return 'SNAPSHOT' not in v + +FILE_REPLACES = { + 'signalfx-signalflow/src/main/java/com/signalfx/signalflow/connection/AbstractHttpReceiverConnection.java': [ + (match_all, re.compile(r'public static final String VERSION_NUMBER = "(.*?)"'), + 'public static final String VERSION_NUMBER = "%s"') + ], +} + + +def execute(cmd, expected_code=None, stdin=None, background=False): + logger.info('Executing in %s: %s', os.getcwd(), ' '.join(cmd)) + proc = subprocess.Popen(cmd, stdin=PIPE, stdout=PIPE, stderr=PIPE) + if background: + return ('', '', 0) # In background + stdout, stderr = proc.communicate(stdin) + logger.debug('Result (%s, %s, %d)', stdout, stderr, proc.returncode) + ret = (stdout, stderr, proc.returncode) + if expected_code is not None and expected_code != ret[2]: + raise Exception('Unable to execute command %s, result: %s', ret) + return ret + + +def update_pom_files(version): + base_dir = os.getcwd() + logger.info('Updating POM files to version %s...', version) + cmd = ['./mvnw', 'versions:set', '-am', '-pl', 'signalfx-signalflow', + '-DnewVersion=%s' % version] + (stdout, _, code) = execute(cmd, expected_code=0) + os.chdir(base_dir) + + +def perform_file_replacements(version): + for file_name, repls in FILE_REPLACES.items(): + logger.info('Updating %d version number location%s in %s...', + len(repls), 's' if len(repls) != 1 else '', file_name) + for repl in repls: + if not repl[0](version): + continue + logger.debug('%s -> %s', repl[1], repl[2]) + file_name = os.path.join(os.getcwd(), file_name) + with open(file_name, 'r') as f: + contents = f.read() + contents = repl[1].sub(repl[2] % version, contents) + with open(file_name, 'w') as f: + f.write(contents) + + +if __name__ == '__main__': + if len(sys.argv) != 2: + sys.stderr.write(f"usage: {sys.argv[0]} \n") + sys.exit(1) + + version = sys.argv[1] + version = re.sub(r'^v', '', version) + update_pom_files(version) + perform_file_replacements(version)