diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 0f327c9152..dec39c13c5 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -84,7 +84,8 @@ docker-connect-container: - docker push $IMAGE_NAME/streampipes-connect-container:latest - docker push $HARBOR_IMAGE_NAME/streampipes-connect-container:$MVN_VERSION - docker push $HARBOR_IMAGE_NAME/streampipes-connect-container:latest - + only: + - dev deploy: image: maven:3-jdk-8 @@ -163,7 +164,7 @@ finish release: - git push origin --tags - git checkout master - git push github master - - git push github version-$MVN_VERSION + - git push github $MVN_VERSION - git checkout dev - git push github dev @@ -182,6 +183,9 @@ docker hub: - docker push streampipes/backend:$MVN_VERSION - docker build --pull -t streampipes/backend ./streampipes-backend/ - docker push streampipes/backend + - docker build --pull -t streampipes/streampipes-connect-container:latest -t streampipes/streampipes-connect-container:$MVN_VERSION ./streampipes-connect-container/ + - docker push streampipes/streampipes-connect-container:$MVN_VERSION + - docker push streampipes/streampipes-connect-container:latest when: manual only: - master diff --git a/LICENSE b/LICENSE index 9088962c7e..62331553d1 100644 --- a/LICENSE +++ b/LICENSE @@ -187,7 +187,7 @@ same "printed page" as the copyright notice for easier identification within third-party archives. - Copyright 2017 FZI Forschungszentrum Informatik + Copyright 2018 FZI Forschungszentrum Informatik Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -199,4 +199,4 @@ distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and - limitations under the License. \ No newline at end of file + limitations under the License. diff --git a/archetypes/streampipes-archetype-pe-processors-flink/pom.xml b/archetypes/streampipes-archetype-pe-processors-flink/pom.xml index 8791c5a179..95c5a210fa 100644 --- a/archetypes/streampipes-archetype-pe-processors-flink/pom.xml +++ b/archetypes/streampipes-archetype-pe-processors-flink/pom.xml @@ -4,7 +4,7 @@ org.streampipes streampipes-parent - 0.60.0 + 0.60.1 ../../pom.xml streampipes-archetype-pe-processors-flink @@ -31,29 +31,4 @@ - - - scm:git:ssh://git@ipe-wim-gitlab.fzi.de:2222/streampipes/ce-backend.git/archetypes/streampipes-archetype-pe-sinks-jvm - - - - - - deployment - Internal Releases - https://laus.fzi.de/nexus/content/repositories/public/ - - - - - deployment - Internal Releases - https://laus.fzi.de/nexus/content/repositories/releases/ - - - deployment - Internal Releases - https://laus.fzi.de/nexus/content/repositories/snapshots/ - - diff --git a/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/META-INF/maven/archetype-metadata.xml b/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/META-INF/maven/archetype-metadata.xml index c309cbf6b0..0a105588e6 100644 --- a/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/META-INF/maven/archetype-metadata.xml +++ b/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/META-INF/maven/archetype-metadata.xml @@ -17,12 +17,30 @@ **/*.java - + Dockerfile + + + + deployment/docker-compose.yml + + + + + + deployment/system + + + + + + development/env + + diff --git a/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/deployment/docker-compose.yml b/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/deployment/docker-compose.yml new file mode 100644 index 0000000000..1a528c9d0b --- /dev/null +++ b/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/deployment/docker-compose.yml @@ -0,0 +1,13 @@ +version: "2" +services: + ${artifactId}: + image: ${artifactId} + depends_on: + - "consul" +# ports: +# - "8098:8090" + environment: + - SP_ICON_HOST=${SP_ICON_HOST} + networks: + spnet: + diff --git a/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/deployment/system b/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/deployment/system new file mode 100644 index 0000000000..8bfd130513 --- /dev/null +++ b/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/deployment/system @@ -0,0 +1,11 @@ +activemq +kafka +zookeeper +swagger-ui +connect-master +connect-worker +couchdb +backend +nginx +kafka-rest +testarchetype \ No newline at end of file diff --git a/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/development/env b/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/development/env new file mode 100644 index 0000000000..8e0e563301 --- /dev/null +++ b/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/development/env @@ -0,0 +1,10 @@ +# Those parameters are used by IntelliJ to set the default consul parameters for development +SP_PORT=6666 +SP_HOST=localhost +SP_ICON_HOST=localhost +SP_KAFKA_HOST=localhost +SP_ZOOKEEPER_HOST=localhost +SP_COUCHDB_HOST=localhost +SP_JMS_HOST=localhost +SP_NGINX_HOST=localhost +SP_NGINX_PORT=8082 \ No newline at end of file diff --git a/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/pom.xml b/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/pom.xml index 5a80f20dd8..a8f0b08be9 100644 --- a/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/pom.xml +++ b/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/pom.xml @@ -7,7 +7,7 @@ ${version} - 0.55.3-SNAPSHOT + 0.60.1 diff --git a/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/config/Config.java b/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/config/Config.java index e742629bca..9190408b60 100644 --- a/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/config/Config.java +++ b/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/config/Config.java @@ -13,15 +13,13 @@ public enum Config implements PeConfig { private SpConfig config; public static final String JAR_FILE = "./streampipes-processing-element-container.jar"; - private final static String service_id = "pe/${package}"; - private final static String service_name = "${packageName}"; - private final static String service_container_name = "${artifactId}"; + private final static String SERVICE_ID = "pe/${package}"; Config() { - config = SpConfig.getSpConfig(service_id); + config = SpConfig.getSpConfig(SERVICE_ID); - config.register(ConfigKeys.HOST, service_container_name, "Hostname for the pe mixed flink component"); + config.register(ConfigKeys.HOST, "${artifactId}", "Hostname for the pe mixed flink component"); config.register(ConfigKeys.PORT, 8090, "Port for the pe mixed flink component"); config.register(ConfigKeys.FLINK_HOST, "jobmanager", "Host for the flink cluster"); config.register(ConfigKeys.FLINK_PORT, 6123, "Port for the flink cluster"); @@ -30,7 +28,7 @@ public enum Config implements PeConfig { config.register(ConfigKeys.DEBUG, false, "When set to true programs are not deployed to cluster, but executed locally"); - config.register(ConfigKeys.SERVICE_NAME, service_name, "The name of the service"); + config.register(ConfigKeys.SERVICE_NAME, "${packageName}", "The name of the service"); } @@ -73,7 +71,7 @@ public boolean getDebug() { @Override public String getId() { - return service_id; + return SERVICE_ID; } @Override diff --git a/archetypes/streampipes-archetype-pe-processors-jvm/pom.xml b/archetypes/streampipes-archetype-pe-processors-jvm/pom.xml index df0c5c921b..efefedc169 100644 --- a/archetypes/streampipes-archetype-pe-processors-jvm/pom.xml +++ b/archetypes/streampipes-archetype-pe-processors-jvm/pom.xml @@ -4,7 +4,7 @@ org.streampipes streampipes-parent - 0.60.0 + 0.60.1 ../../pom.xml streampipes-archetype-pe-processors-jvm @@ -30,30 +30,4 @@ - - - - scm:git:ssh://git@ipe-wim-gitlab.fzi.de:2222/streampipes/ce-backend.git/archetypes/streampipes-archetype-pe-processors-jvm - - - - - - deployment - Internal Releases - https://laus.fzi.de/nexus/content/repositories/public/ - - - - - deployment - Internal Releases - https://laus.fzi.de/nexus/content/repositories/releases/ - - - deployment - Internal Releases - https://laus.fzi.de/nexus/content/repositories/snapshots/ - - diff --git a/archetypes/streampipes-archetype-pe-processors-jvm/src/main/resources/META-INF/maven/archetype-metadata.xml b/archetypes/streampipes-archetype-pe-processors-jvm/src/main/resources/META-INF/maven/archetype-metadata.xml index c309cbf6b0..19f5c9f52d 100644 --- a/archetypes/streampipes-archetype-pe-processors-jvm/src/main/resources/META-INF/maven/archetype-metadata.xml +++ b/archetypes/streampipes-archetype-pe-processors-jvm/src/main/resources/META-INF/maven/archetype-metadata.xml @@ -17,12 +17,30 @@ **/*.java - - + + Dockerfile + + + + deployment/docker-compose.yml + + + + + + deployment/system + + + + + + development/env + + diff --git a/archetypes/streampipes-archetype-pe-processors-jvm/src/main/resources/archetype-resources/deployment/docker-compose.yml b/archetypes/streampipes-archetype-pe-processors-jvm/src/main/resources/archetype-resources/deployment/docker-compose.yml new file mode 100644 index 0000000000..1a528c9d0b --- /dev/null +++ b/archetypes/streampipes-archetype-pe-processors-jvm/src/main/resources/archetype-resources/deployment/docker-compose.yml @@ -0,0 +1,13 @@ +version: "2" +services: + ${artifactId}: + image: ${artifactId} + depends_on: + - "consul" +# ports: +# - "8098:8090" + environment: + - SP_ICON_HOST=${SP_ICON_HOST} + networks: + spnet: + diff --git a/archetypes/streampipes-archetype-pe-processors-jvm/src/main/resources/archetype-resources/deployment/system b/archetypes/streampipes-archetype-pe-processors-jvm/src/main/resources/archetype-resources/deployment/system new file mode 100644 index 0000000000..8bfd130513 --- /dev/null +++ b/archetypes/streampipes-archetype-pe-processors-jvm/src/main/resources/archetype-resources/deployment/system @@ -0,0 +1,11 @@ +activemq +kafka +zookeeper +swagger-ui +connect-master +connect-worker +couchdb +backend +nginx +kafka-rest +testarchetype \ No newline at end of file diff --git a/archetypes/streampipes-archetype-pe-processors-jvm/src/main/resources/archetype-resources/development/env b/archetypes/streampipes-archetype-pe-processors-jvm/src/main/resources/archetype-resources/development/env new file mode 100644 index 0000000000..8e0e563301 --- /dev/null +++ b/archetypes/streampipes-archetype-pe-processors-jvm/src/main/resources/archetype-resources/development/env @@ -0,0 +1,10 @@ +# Those parameters are used by IntelliJ to set the default consul parameters for development +SP_PORT=6666 +SP_HOST=localhost +SP_ICON_HOST=localhost +SP_KAFKA_HOST=localhost +SP_ZOOKEEPER_HOST=localhost +SP_COUCHDB_HOST=localhost +SP_JMS_HOST=localhost +SP_NGINX_HOST=localhost +SP_NGINX_PORT=8082 \ No newline at end of file diff --git a/archetypes/streampipes-archetype-pe-processors-jvm/src/main/resources/archetype-resources/pom.xml b/archetypes/streampipes-archetype-pe-processors-jvm/src/main/resources/archetype-resources/pom.xml index 3ce1679429..c11ea7949e 100644 --- a/archetypes/streampipes-archetype-pe-processors-jvm/src/main/resources/archetype-resources/pom.xml +++ b/archetypes/streampipes-archetype-pe-processors-jvm/src/main/resources/archetype-resources/pom.xml @@ -7,7 +7,7 @@ ${version} - 0.55.3-SNAPSHOT + 0.60.1 diff --git a/archetypes/streampipes-archetype-pe-processors-jvm/src/main/resources/archetype-resources/src/main/java/config/Config.java b/archetypes/streampipes-archetype-pe-processors-jvm/src/main/resources/archetype-resources/src/main/java/config/Config.java index 51cf66e4ae..54b4136bb1 100644 --- a/archetypes/streampipes-archetype-pe-processors-jvm/src/main/resources/archetype-resources/src/main/java/config/Config.java +++ b/archetypes/streampipes-archetype-pe-processors-jvm/src/main/resources/archetype-resources/src/main/java/config/Config.java @@ -22,7 +22,7 @@ public enum Config implements PeConfig { Config() { config = SpConfig.getSpConfig("pe/${package}"); - config.register(HOST, "${projectName}", "Hostname for the pe sinks"); + config.register(HOST, "${artifactId}", "Hostname for the pe sinks"); config.register(PORT, 8090, "Port for the pe sinks"); config.register(NGINX_HOST, System.getenv("STREAMPIPES_HOST"), "External hostname of " + "StreamPipes Nginx"); diff --git a/archetypes/streampipes-archetype-pe-sinks-flink/pom.xml b/archetypes/streampipes-archetype-pe-sinks-flink/pom.xml index a08e64ab7b..8065c83c2a 100644 --- a/archetypes/streampipes-archetype-pe-sinks-flink/pom.xml +++ b/archetypes/streampipes-archetype-pe-sinks-flink/pom.xml @@ -4,7 +4,7 @@ org.streampipes streampipes-parent - 0.60.0 + 0.60.1 ../../pom.xml streampipes-archetype-pe-sinks-flink @@ -30,30 +30,4 @@ - - - - scm:git:ssh://git@ipe-wim-gitlab.fzi.de:2222/streampipes/ce-backend.git/archetypes/streampipes-archetype-pe-sinks-jvm - - - - - - deployment - Internal Releases - https://laus.fzi.de/nexus/content/repositories/public/ - - - - - deployment - Internal Releases - https://laus.fzi.de/nexus/content/repositories/releases/ - - - deployment - Internal Releases - https://laus.fzi.de/nexus/content/repositories/snapshots/ - - diff --git a/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/META-INF/maven/archetype-metadata.xml b/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/META-INF/maven/archetype-metadata.xml index c309cbf6b0..0a105588e6 100644 --- a/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/META-INF/maven/archetype-metadata.xml +++ b/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/META-INF/maven/archetype-metadata.xml @@ -17,12 +17,30 @@ **/*.java - + Dockerfile + + + + deployment/docker-compose.yml + + + + + + deployment/system + + + + + + development/env + + diff --git a/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/deployment/docker-compose.yml b/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/deployment/docker-compose.yml new file mode 100644 index 0000000000..1a528c9d0b --- /dev/null +++ b/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/deployment/docker-compose.yml @@ -0,0 +1,13 @@ +version: "2" +services: + ${artifactId}: + image: ${artifactId} + depends_on: + - "consul" +# ports: +# - "8098:8090" + environment: + - SP_ICON_HOST=${SP_ICON_HOST} + networks: + spnet: + diff --git a/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/deployment/system b/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/deployment/system new file mode 100644 index 0000000000..8bfd130513 --- /dev/null +++ b/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/deployment/system @@ -0,0 +1,11 @@ +activemq +kafka +zookeeper +swagger-ui +connect-master +connect-worker +couchdb +backend +nginx +kafka-rest +testarchetype \ No newline at end of file diff --git a/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/development/env b/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/development/env new file mode 100644 index 0000000000..8e0e563301 --- /dev/null +++ b/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/development/env @@ -0,0 +1,10 @@ +# Those parameters are used by IntelliJ to set the default consul parameters for development +SP_PORT=6666 +SP_HOST=localhost +SP_ICON_HOST=localhost +SP_KAFKA_HOST=localhost +SP_ZOOKEEPER_HOST=localhost +SP_COUCHDB_HOST=localhost +SP_JMS_HOST=localhost +SP_NGINX_HOST=localhost +SP_NGINX_PORT=8082 \ No newline at end of file diff --git a/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/pom.xml b/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/pom.xml index 5a80f20dd8..7cf8490892 100644 --- a/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/pom.xml +++ b/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/pom.xml @@ -7,7 +7,7 @@ ${version} - 0.55.3-SNAPSHOT + 0.60.1 @@ -81,6 +81,21 @@ streampipes-config ${sp.version} + + org.streampipes + streampipes-dataformat-json + ${sp.version} + + + org.streampipes + streampipes-messaging-kafka + ${sp.version} + + + org.streampipes + streampipes-messaging-jms + ${sp.version} + diff --git a/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/src/main/java/config/Config.java b/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/src/main/java/config/Config.java index e742629bca..da27036fba 100644 --- a/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/src/main/java/config/Config.java +++ b/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/src/main/java/config/Config.java @@ -13,15 +13,12 @@ public enum Config implements PeConfig { private SpConfig config; public static final String JAR_FILE = "./streampipes-processing-element-container.jar"; - private final static String service_id = "pe/${package}"; - private final static String service_name = "${packageName}"; - private final static String service_container_name = "${artifactId}"; - + private final static String SERVICE_ID = "pe/${package}"; Config() { - config = SpConfig.getSpConfig(service_id); + config = SpConfig.getSpConfig(SERVICE_ID); - config.register(ConfigKeys.HOST, service_container_name, "Hostname for the pe mixed flink component"); + config.register(ConfigKeys.HOST, "${artifactId}", "Hostname for the pe mixed flink component"); config.register(ConfigKeys.PORT, 8090, "Port for the pe mixed flink component"); config.register(ConfigKeys.FLINK_HOST, "jobmanager", "Host for the flink cluster"); config.register(ConfigKeys.FLINK_PORT, 6123, "Port for the flink cluster"); @@ -30,7 +27,7 @@ public enum Config implements PeConfig { config.register(ConfigKeys.DEBUG, false, "When set to true programs are not deployed to cluster, but executed locally"); - config.register(ConfigKeys.SERVICE_NAME, service_name, "The name of the service"); + config.register(ConfigKeys.SERVICE_NAME, "${packageName}", "The name of the service"); } @@ -73,7 +70,7 @@ public boolean getDebug() { @Override public String getId() { - return service_id; + return SERVICE_ID; } @Override diff --git a/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/src/main/java/main/Init.java b/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/src/main/java/main/Init.java index 9886522bb5..956c1aa998 100644 --- a/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/src/main/java/main/Init.java +++ b/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/src/main/java/main/Init.java @@ -5,6 +5,9 @@ import org.streampipes.container.init.DeclarersSingleton; import org.streampipes.container.standalone.init.StandaloneModelSubmitter; +import org.streampipes.dataformat.json.JsonDataFormatFactory; +import org.streampipes.messaging.kafka.SpKafkaProtocolFactory; + import ${package}.config.Config; import ${package}.pe.sink.${packageName}.${classNamePrefix}Controller; @@ -15,6 +18,12 @@ public static void main(String[] args) throws Exception { DeclarersSingleton.getInstance() .add(new ${classNamePrefix}Controller()); + DeclarersSingleton.getInstance().setPort(Config.INSTANCE.getPort()); + DeclarersSingleton.getInstance().setHostName(Config.INSTANCE.getHost()); + + DeclarersSingleton.getInstance().registerDataFormat(new JsonDataFormatFactory()); + DeclarersSingleton.getInstance().registerProtocol(new SpKafkaProtocolFactory()); + new Init().init(Config.INSTANCE); } diff --git a/archetypes/streampipes-archetype-pe-sinks-jvm/pom.xml b/archetypes/streampipes-archetype-pe-sinks-jvm/pom.xml index c120d8a585..5f997c3e9a 100644 --- a/archetypes/streampipes-archetype-pe-sinks-jvm/pom.xml +++ b/archetypes/streampipes-archetype-pe-sinks-jvm/pom.xml @@ -4,7 +4,7 @@ org.streampipes streampipes-parent - 0.60.0 + 0.60.1 ../../pom.xml streampipes-archetype-pe-sinks-jvm @@ -30,30 +30,4 @@ - - - - scm:git:ssh://git@ipe-wim-gitlab.fzi.de:2222/streampipes/ce-backend.git/archetypes/streampipes-archetype-pe-sinks-jvm - - - - - - deployment - Internal Releases - https://laus.fzi.de/nexus/content/repositories/public/ - - - - - deployment - Internal Releases - https://laus.fzi.de/nexus/content/repositories/releases/ - - - deployment - Internal Releases - https://laus.fzi.de/nexus/content/repositories/snapshots/ - - diff --git a/archetypes/streampipes-archetype-pe-sinks-jvm/src/main/resources/META-INF/maven/archetype-metadata.xml b/archetypes/streampipes-archetype-pe-sinks-jvm/src/main/resources/META-INF/maven/archetype-metadata.xml index c309cbf6b0..0a105588e6 100644 --- a/archetypes/streampipes-archetype-pe-sinks-jvm/src/main/resources/META-INF/maven/archetype-metadata.xml +++ b/archetypes/streampipes-archetype-pe-sinks-jvm/src/main/resources/META-INF/maven/archetype-metadata.xml @@ -17,12 +17,30 @@ **/*.java - + Dockerfile + + + + deployment/docker-compose.yml + + + + + + deployment/system + + + + + + development/env + + diff --git a/archetypes/streampipes-archetype-pe-sinks-jvm/src/main/resources/archetype-resources/deployment/docker-compose.yml b/archetypes/streampipes-archetype-pe-sinks-jvm/src/main/resources/archetype-resources/deployment/docker-compose.yml new file mode 100644 index 0000000000..1a528c9d0b --- /dev/null +++ b/archetypes/streampipes-archetype-pe-sinks-jvm/src/main/resources/archetype-resources/deployment/docker-compose.yml @@ -0,0 +1,13 @@ +version: "2" +services: + ${artifactId}: + image: ${artifactId} + depends_on: + - "consul" +# ports: +# - "8098:8090" + environment: + - SP_ICON_HOST=${SP_ICON_HOST} + networks: + spnet: + diff --git a/archetypes/streampipes-archetype-pe-sinks-jvm/src/main/resources/archetype-resources/deployment/system b/archetypes/streampipes-archetype-pe-sinks-jvm/src/main/resources/archetype-resources/deployment/system new file mode 100644 index 0000000000..8bfd130513 --- /dev/null +++ b/archetypes/streampipes-archetype-pe-sinks-jvm/src/main/resources/archetype-resources/deployment/system @@ -0,0 +1,11 @@ +activemq +kafka +zookeeper +swagger-ui +connect-master +connect-worker +couchdb +backend +nginx +kafka-rest +testarchetype \ No newline at end of file diff --git a/archetypes/streampipes-archetype-pe-sinks-jvm/src/main/resources/archetype-resources/development/env b/archetypes/streampipes-archetype-pe-sinks-jvm/src/main/resources/archetype-resources/development/env new file mode 100644 index 0000000000..8e0e563301 --- /dev/null +++ b/archetypes/streampipes-archetype-pe-sinks-jvm/src/main/resources/archetype-resources/development/env @@ -0,0 +1,10 @@ +# Those parameters are used by IntelliJ to set the default consul parameters for development +SP_PORT=6666 +SP_HOST=localhost +SP_ICON_HOST=localhost +SP_KAFKA_HOST=localhost +SP_ZOOKEEPER_HOST=localhost +SP_COUCHDB_HOST=localhost +SP_JMS_HOST=localhost +SP_NGINX_HOST=localhost +SP_NGINX_PORT=8082 \ No newline at end of file diff --git a/archetypes/streampipes-archetype-pe-sinks-jvm/src/main/resources/archetype-resources/pom.xml b/archetypes/streampipes-archetype-pe-sinks-jvm/src/main/resources/archetype-resources/pom.xml index 3ce1679429..c11ea7949e 100644 --- a/archetypes/streampipes-archetype-pe-sinks-jvm/src/main/resources/archetype-resources/pom.xml +++ b/archetypes/streampipes-archetype-pe-sinks-jvm/src/main/resources/archetype-resources/pom.xml @@ -7,7 +7,7 @@ ${version} - 0.55.3-SNAPSHOT + 0.60.1 diff --git a/archetypes/streampipes-archetype-pe-sinks-jvm/src/main/resources/archetype-resources/src/main/java/config/Config.java b/archetypes/streampipes-archetype-pe-sinks-jvm/src/main/resources/archetype-resources/src/main/java/config/Config.java index 51cf66e4ae..54b4136bb1 100644 --- a/archetypes/streampipes-archetype-pe-sinks-jvm/src/main/resources/archetype-resources/src/main/java/config/Config.java +++ b/archetypes/streampipes-archetype-pe-sinks-jvm/src/main/resources/archetype-resources/src/main/java/config/Config.java @@ -22,7 +22,7 @@ public enum Config implements PeConfig { Config() { config = SpConfig.getSpConfig("pe/${package}"); - config.register(HOST, "${projectName}", "Hostname for the pe sinks"); + config.register(HOST, "${artifactId}", "Hostname for the pe sinks"); config.register(PORT, 8090, "Port for the pe sinks"); config.register(NGINX_HOST, System.getenv("STREAMPIPES_HOST"), "External hostname of " + "StreamPipes Nginx"); diff --git a/archetypes/streampipes-archetype-pe-sinks-jvm/src/main/resources/archetype-resources/src/main/java/main/Init.java b/archetypes/streampipes-archetype-pe-sinks-jvm/src/main/resources/archetype-resources/src/main/java/main/Init.java index c36e4dec52..d0fbdcfaa9 100644 --- a/archetypes/streampipes-archetype-pe-sinks-jvm/src/main/resources/archetype-resources/src/main/java/main/Init.java +++ b/archetypes/streampipes-archetype-pe-sinks-jvm/src/main/resources/archetype-resources/src/main/java/main/Init.java @@ -9,7 +9,7 @@ import org.streampipes.messaging.kafka.SpKafkaProtocolFactory; import ${package}.config.Config; -import ${package}.pe.processor.${packageName}.${classNamePrefix}Controller; +import ${package}.pe.sink.${packageName}.${classNamePrefix}Controller; public class Init extends StandaloneModelSubmitter { diff --git a/archetypes/streampipes-archetype-pe-sinks-jvm/src/main/resources/archetype-resources/src/main/java/pe/sink/__packageName__/__classNamePrefix__Controller.java b/archetypes/streampipes-archetype-pe-sinks-jvm/src/main/resources/archetype-resources/src/main/java/pe/sink/__packageName__/__classNamePrefix__Controller.java index 1046dce9b7..321c0a8d01 100644 --- a/archetypes/streampipes-archetype-pe-sinks-jvm/src/main/resources/archetype-resources/src/main/java/pe/sink/__packageName__/__classNamePrefix__Controller.java +++ b/archetypes/streampipes-archetype-pe-sinks-jvm/src/main/resources/archetype-resources/src/main/java/pe/sink/__packageName__/__classNamePrefix__Controller.java @@ -36,7 +36,7 @@ public DataSinkDescription declareModel() { } @Override - public ConfiguredEventSink<${classNamePrefix}Parameters> onInvocation(DataSinkInvocation graph) { + public ConfiguredEventSink<${classNamePrefix}Parameters> onInvocation(DataSinkInvocation graph, DataSinkParameterExtractor extractor) { String exampleString = extractor.singleValueParameter(EXAMPLE_KEY, String.class); diff --git a/archetypes/streampipes-archetype-pe-sources/pom.xml b/archetypes/streampipes-archetype-pe-sources/pom.xml index 399236e230..183f039b9c 100644 --- a/archetypes/streampipes-archetype-pe-sources/pom.xml +++ b/archetypes/streampipes-archetype-pe-sources/pom.xml @@ -4,7 +4,7 @@ org.streampipes streampipes-parent - 0.60.0 + 0.60.1 ../../pom.xml streampipes-archetype-pe-sources @@ -30,30 +30,4 @@ - - - - scm:git:ssh://git@ipe-wim-gitlab.fzi.de:2222/streampipes/ce-backend.git/archetypes/streampipes-archetype-pe-sources - - - - - - deployment - Internal Releases - https://laus.fzi.de/nexus/content/repositories/public/ - - - - - deployment - Internal Releases - https://laus.fzi.de/nexus/content/repositories/releases/ - - - deployment - Internal Releases - https://laus.fzi.de/nexus/content/repositories/snapshots/ - - diff --git a/archetypes/streampipes-archetype-pe-sources/src/main/resources/archetype-resources/pom.xml b/archetypes/streampipes-archetype-pe-sources/src/main/resources/archetype-resources/pom.xml index f10d8ce531..e61b505e13 100644 --- a/archetypes/streampipes-archetype-pe-sources/src/main/resources/archetype-resources/pom.xml +++ b/archetypes/streampipes-archetype-pe-sources/src/main/resources/archetype-resources/pom.xml @@ -6,39 +6,50 @@ ${artifactId} ${version} + + 0.60.1 + + org.streampipes streampipes-commons + ${sp.version} org.streampipes streampipes-messaging-kafka + ${sp.version} org.streampipes streampipes-messaging-jms + ${sp.version} org.streampipes streampipes-dataformat-json + ${sp.version} org.streampipes streampipes-wrapper-standalone + ${sp.version} org.streampipes streampipes-container-standalone + ${sp.version} org.streampipes streampipes-sdk + ${sp.version} org.streampipes streampipes-config - ${version} + ${sp.version} diff --git a/package-lock.json b/package-lock.json new file mode 100644 index 0000000000..48e341a095 --- /dev/null +++ b/package-lock.json @@ -0,0 +1,3 @@ +{ + "lockfileVersion": 1 +} diff --git a/pom.xml b/pom.xml index 778fdd3822..cc6d50f91e 100644 --- a/pom.xml +++ b/pom.xml @@ -21,7 +21,7 @@ org.streampipes streampipes-parent - 0.60.0 + 0.60.1 pom UTF-8 @@ -724,4 +724,4 @@ https://laus.fzi.de/nexus/content/repositories/snapshots/ - \ No newline at end of file + diff --git a/streampipes-app-file-export/pom.xml b/streampipes-app-file-export/pom.xml index 7903d68e98..59f0d87dde 100644 --- a/streampipes-app-file-export/pom.xml +++ b/streampipes-app-file-export/pom.xml @@ -21,7 +21,7 @@ streampipes-parent org.streampipes - 0.60.0 + 0.60.1 StreamPipes App File Export streampipes-app-file-export diff --git a/streampipes-backend/pom.xml b/streampipes-backend/pom.xml index f1cd415a32..5b5eb4bcc4 100644 --- a/streampipes-backend/pom.xml +++ b/streampipes-backend/pom.xml @@ -3,7 +3,7 @@ org.streampipes streampipes-parent - 0.60.0 + 0.60.1 streampipes-backend war diff --git a/streampipes-code-generation/pom.xml b/streampipes-code-generation/pom.xml index 32f522ffcb..bf68bcaaa5 100644 --- a/streampipes-code-generation/pom.xml +++ b/streampipes-code-generation/pom.xml @@ -20,7 +20,7 @@ org.streampipes streampipes-parent - 0.60.0 + 0.60.1 streampipes-code-generation diff --git a/streampipes-commons/pom.xml b/streampipes-commons/pom.xml index f4e3579c82..4163812579 100644 --- a/streampipes-commons/pom.xml +++ b/streampipes-commons/pom.xml @@ -20,7 +20,7 @@ org.streampipes streampipes-parent - 0.60.0 + 0.60.1 streampipes-commons StreamPipes Commons diff --git a/streampipes-config/pom.xml b/streampipes-config/pom.xml index 6d91f808d6..6ec7467d68 100644 --- a/streampipes-config/pom.xml +++ b/streampipes-config/pom.xml @@ -20,7 +20,7 @@ streampipes-parent org.streampipes - 0.60.0 + 0.60.1 4.0.0 diff --git a/streampipes-connect-container/pom.xml b/streampipes-connect-container/pom.xml index 36bb440dd4..8d2807d78c 100644 --- a/streampipes-connect-container/pom.xml +++ b/streampipes-connect-container/pom.xml @@ -3,7 +3,7 @@ streampipes-parent org.streampipes - 0.60.0 + 0.60.1 4.0.0 diff --git a/streampipes-connect-container/src/main/java/org/streampipes/connect/config/ConfigKeys.java b/streampipes-connect-container/src/main/java/org/streampipes/connect/config/ConfigKeys.java index 2badeab7e7..42091bd8ad 100644 --- a/streampipes-connect-container/src/main/java/org/streampipes/connect/config/ConfigKeys.java +++ b/streampipes-connect-container/src/main/java/org/streampipes/connect/config/ConfigKeys.java @@ -29,4 +29,7 @@ public class ConfigKeys { final static String CONNECT_CONTAINER_WORKER_HOST = "SP_CONNECT_CONTAINER_WORKER_HOST"; final static String CONNECT_CONTAINER_WORKER_PORT = "SP_CONNECT_CONTAINER_WORKER_PORT"; + + final static String DATA_LOCATION = "SP_DATA_LOCATION"; + } diff --git a/streampipes-connect-container/src/main/java/org/streampipes/connect/config/ConnectContainerConfig.java b/streampipes-connect-container/src/main/java/org/streampipes/connect/config/ConnectContainerConfig.java index cc5f91b6a5..d019576ff7 100644 --- a/streampipes-connect-container/src/main/java/org/streampipes/connect/config/ConnectContainerConfig.java +++ b/streampipes-connect-container/src/main/java/org/streampipes/connect/config/ConnectContainerConfig.java @@ -40,6 +40,9 @@ public enum ConnectContainerConfig { config.register(ConfigKeys.CONNECT_CONTAINER_WORKER_PORT, Config.WORKER_PORT, "The port of the connect container"); config.register(ConfigKeys.CONNECT_CONTAINER_WORKER_HOST, "connect-worker", "The hostname of the connect container"); + + config.register(ConfigKeys.DATA_LOCATION,"/data/", "Folder that stores all the uploaded data"); + } public String getBackendApiUrl() { @@ -96,4 +99,11 @@ public Integer getConnectContainerWorkerPort() { return config.getInteger(ConfigKeys.CONNECT_CONTAINER_WORKER_PORT); } + public String getDataLocation() { + return config.getString(ConfigKeys.DATA_LOCATION); + } + + + + } diff --git a/streampipes-connect-container/src/main/java/org/streampipes/connect/init/Main.java b/streampipes-connect-container/src/main/java/org/streampipes/connect/init/Main.java index 3ecf3dfe00..22ad123ef4 100644 --- a/streampipes-connect-container/src/main/java/org/streampipes/connect/init/Main.java +++ b/streampipes-connect-container/src/main/java/org/streampipes/connect/init/Main.java @@ -20,6 +20,7 @@ import org.apache.http.client.fluent.Request; import org.eclipse.jetty.server.Server; import org.glassfish.jersey.jetty.JettyHttpContainerFactory; +import org.glassfish.jersey.media.multipart.MultiPartFeature; import org.glassfish.jersey.server.ResourceConfig; import org.lightcouch.CouchDbClient; import org.slf4j.Logger; @@ -107,6 +108,9 @@ private static Set> getMasterApiClasses() { allClasses.add(DescriptionResource.class); allClasses.add(SourcesResource.class); allClasses.add(GuessResource.class); + allClasses.add(FileResource.class); + allClasses.add(MultiPartFeature.class); + allClasses.addAll(getApiClasses()); diff --git a/streampipes-connect-container/src/main/java/org/streampipes/connect/management/master/FileManagement.java b/streampipes-connect-container/src/main/java/org/streampipes/connect/management/master/FileManagement.java new file mode 100644 index 0000000000..99624683d2 --- /dev/null +++ b/streampipes-connect-container/src/main/java/org/streampipes/connect/management/master/FileManagement.java @@ -0,0 +1,80 @@ +/* +Copyright 2018 FZI Forschungszentrum Informatik + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package org.streampipes.connect.management.master; + +import org.apache.commons.io.IOUtils; +import org.streampipes.connect.config.ConnectContainerConfig; + +import java.io.*; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.List; + +public class FileManagement { + + public void saveFile(InputStream inputStream, String fileName) throws IOException { + String filePath = getMainFilePath() + fileName; + saveFile(filePath, inputStream); + } + + public List getFilePahts(String username) throws IOException { + List urls = new ArrayList<>(); + File[] files = new File(getMainFilePath()).listFiles(); + for (int i = 0; i < files.length; i++) { + urls.add(getMainFilePath() + files[i].getName()); + } + + return urls; + } + + public File getFile(String name) throws IOException { + File file = new File(getMainFilePath() + name); + if(file.exists()) { + return file; + } else { + throw new IOException(); + } + } + + public void deleteFile(String name) throws IOException { + File file = new File(getMainFilePath() + name); + if(file.exists()) { + file.delete(); + } else { + throw new IOException("File" + name + "is not excisting"); + } + } + + private void saveFile(String filePath, InputStream inputStream ) throws IOException { + File file = new File(filePath); + file.getParentFile().mkdirs(); + file.createNewFile(); + byte[] aByte = IOUtils.toByteArray(inputStream); + FileOutputStream fos =new FileOutputStream(file); + IOUtils.write(aByte, fos); + } + + private String getMainFilePath() { + return ConnectContainerConfig.INSTANCE.getDataLocation(); + } + + + + +} diff --git a/streampipes-connect-container/src/main/java/org/streampipes/connect/rest/AbstractContainerResource.java b/streampipes-connect-container/src/main/java/org/streampipes/connect/rest/AbstractContainerResource.java index 72f39acf3e..cc73bc77ce 100644 --- a/streampipes-connect-container/src/main/java/org/streampipes/connect/rest/AbstractContainerResource.java +++ b/streampipes-connect-container/src/main/java/org/streampipes/connect/rest/AbstractContainerResource.java @@ -30,6 +30,13 @@ protected Response ok(T entity) { .build(); } + protected Response ok() { + return Response + .ok() + .build(); + } + + protected Response statusMessage(Message message) { return ok(message); } diff --git a/streampipes-connect-container/src/main/java/org/streampipes/connect/rest/master/FileResource.java b/streampipes-connect-container/src/main/java/org/streampipes/connect/rest/master/FileResource.java new file mode 100644 index 0000000000..8a03179ff0 --- /dev/null +++ b/streampipes-connect-container/src/main/java/org/streampipes/connect/rest/master/FileResource.java @@ -0,0 +1,102 @@ +/* +Copyright 2018 FZI Forschungszentrum Informatik + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package org.streampipes.connect.rest.master; + +import org.glassfish.jersey.media.multipart.FormDataContentDisposition; +import org.glassfish.jersey.media.multipart.FormDataParam; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.streampipes.connect.management.master.FileManagement; +import org.streampipes.connect.rest.AbstractContainerResource; + +import javax.ws.rs.*; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; + +@Path("/api/v1/{username}/master/file") +public class FileResource extends AbstractContainerResource { + + private Logger logger = LoggerFactory.getLogger(FileResource.class); + + FileManagement fileManagement; + + public FileResource() { + this.fileManagement = new FileManagement(); + } + + public FileResource(FileManagement fileManagement) { + this.fileManagement = fileManagement; + } + + @POST + @Consumes(MediaType.MULTIPART_FORM_DATA) + public Response uploadFiles(@FormDataParam("file_upload") InputStream uploadedInputStream, + @FormDataParam("file_upload") FormDataContentDisposition fileDetail) { + + try { + fileManagement.saveFile(uploadedInputStream, fileDetail.getFileName()); + return ok(); + } catch (Exception e) { + logger.error(e.toString()); + return fail(); + } + } + + @GET + // @Produces({MediaType.F}) + @Path("/{filename}") + public Response getFile(@PathParam("filename") String fileName) { + try { + File file = fileManagement.getFile(fileName); + logger.info("Downloaded file: " + fileName); + return Response.ok(file, MediaType.APPLICATION_OCTET_STREAM) + .header("Content-Disposition", "attachment; filename=\"" + fileName + "\"") + .build(); + } catch (IOException e) { + logger.error(e.toString()); + return fail(); + } + } + + @GET + public Response getFilePahts(@PathParam("username") String username) { + try { + return ok(fileManagement.getFilePahts(username)); + } catch (IOException e) { + logger.error(e.toString()); + return fail(); + } + } + + + @DELETE + @Path("/{filename}") + @Consumes(MediaType.APPLICATION_JSON) + public Response deleteFile(@PathParam("filename") String fileName) { + try { + fileManagement.deleteFile(fileName); + return ok(); + } catch (IOException e) { + logger.error(e.toString()); + return fail(); } + } + + +} diff --git a/streampipes-connect/pom.xml b/streampipes-connect/pom.xml index 84d0b2e2de..439824b9ac 100755 --- a/streampipes-connect/pom.xml +++ b/streampipes-connect/pom.xml @@ -3,7 +3,7 @@ org.streampipes streampipes-parent - 0.60.0 + 0.60.1 4.0.0 diff --git a/streampipes-connect/src/main/java/org/streampipes/connect/adapter/AdapterRegistry.java b/streampipes-connect/src/main/java/org/streampipes/connect/adapter/AdapterRegistry.java index 6a8761e3ef..40944205e6 100644 --- a/streampipes-connect/src/main/java/org/streampipes/connect/adapter/AdapterRegistry.java +++ b/streampipes-connect/src/main/java/org/streampipes/connect/adapter/AdapterRegistry.java @@ -36,6 +36,7 @@ import org.streampipes.connect.adapter.generic.protocol.Protocol; import org.streampipes.connect.adapter.generic.protocol.set.FileProtocol; import org.streampipes.connect.adapter.generic.protocol.set.HttpProtocol; +import org.streampipes.connect.adapter.generic.protocol.stream.FileStreamProtocol; import org.streampipes.connect.adapter.generic.protocol.stream.HttpStreamProtocol; import org.streampipes.connect.adapter.generic.protocol.stream.KafkaProtocol; import org.streampipes.connect.adapter.generic.protocol.stream.MqttProtocol; @@ -56,12 +57,12 @@ public class AdapterRegistry { public static Map getAllAdapters() { Map allAdapters = new HashMap<>(); -// allAdapters.put(GenericDataSetAdapter.ID, new GenericDataSetAdapter()); + allAdapters.put(GenericDataSetAdapter.ID, new GenericDataSetAdapter()); allAdapters.put(GenericDataStreamAdapter.ID, new GenericDataStreamAdapter()); -// allAdapters.put(TwitterAdapter.ID, new TwitterAdapter()); + allAdapters.put(TwitterAdapter.ID, new TwitterAdapter()); allAdapters.put(OpenSenseMapAdapter.ID, new OpenSenseMapAdapter()); allAdapters.put(GdeltAdapter.ID, new GdeltAdapter()); -// allAdapters.put(NswTrafficCameraAdapter.ID, new NswTrafficCameraAdapter()); + allAdapters.put(NswTrafficCameraAdapter.ID, new NswTrafficCameraAdapter()); return allAdapters; } @@ -72,9 +73,9 @@ public static Map getAllFormats() { allFormats.put(JsonFormat.ID, new JsonFormat()); allFormats.put(JsonObjectFormat.ID, new JsonObjectFormat()); allFormats.put(JsonArrayFormat.ID, new JsonArrayFormat()); -// allFormats.put(CsvFormat.ID, new CsvFormat()); + allFormats.put(CsvFormat.ID, new CsvFormat()); allFormats.put(GeoJsonFormat.ID, new GeoJsonFormat()); -// allFormats.put(XmlFormat.ID, new XmlFormat()); + allFormats.put(XmlFormat.ID, new XmlFormat()); return allFormats; @@ -86,9 +87,9 @@ public static Map getAllParsers() { allParsers.put(JsonFormat.ID, new JsonParser()); allParsers.put(JsonObjectFormat.ID, new JsonObjectParser()); allParsers.put(JsonArrayFormat.ID, new JsonArrayParser()); -// allParsers.put(CsvFormat.ID, new CsvParser()); + allParsers.put(CsvFormat.ID, new CsvParser()); allParsers.put(GeoJsonFormat.ID, new GeoJsonParser()); -// allParsers.put(XmlFormat.ID, new XmlParser()); + allParsers.put(XmlFormat.ID, new XmlParser()); return allParsers; } @@ -96,11 +97,12 @@ public static Map getAllParsers() { public static Map getAllProtocols() { Map allProtocols = new HashMap<>(); -// allProtocols.put(HttpProtocol.ID, new HttpProtocol()); -// allProtocols.put(FileProtocol.ID, new FileProtocol()); + allProtocols.put(HttpProtocol.ID, new HttpProtocol()); + allProtocols.put(FileProtocol.ID, new FileProtocol()); allProtocols.put(KafkaProtocol.ID, new KafkaProtocol()); allProtocols.put(MqttProtocol.ID, new MqttProtocol()); allProtocols.put(HttpStreamProtocol.ID, new HttpStreamProtocol()); + allProtocols.put(FileStreamProtocol.ID, new FileStreamProtocol()); return allProtocols; } diff --git a/streampipes-connect/src/main/java/org/streampipes/connect/adapter/generic/format/csv/CsvFormat.java b/streampipes-connect/src/main/java/org/streampipes/connect/adapter/generic/format/csv/CsvFormat.java index 5ad53cb8b8..16e6b5a953 100644 --- a/streampipes-connect/src/main/java/org/streampipes/connect/adapter/generic/format/csv/CsvFormat.java +++ b/streampipes-connect/src/main/java/org/streampipes/connect/adapter/generic/format/csv/CsvFormat.java @@ -52,9 +52,12 @@ public CsvFormat(String delimiter, Boolean header) { @Override public Format getInstance(FormatDescription formatDescription) { ParameterExtractor extractor = new ParameterExtractor(formatDescription.getConfig()); - boolean header = extractor.singleValue(HEADER_NAME) == null ? false : true; String delimiter = extractor.singleValue(DELIMITER_NAME); + boolean header = extractor.selectedMultiValues(HEADER_NAME).stream() + .anyMatch(option -> option.equals("Header")); + + return new CsvFormat(delimiter, header); } diff --git a/streampipes-connect/src/main/java/org/streampipes/connect/adapter/generic/format/csv/CsvParser.java b/streampipes-connect/src/main/java/org/streampipes/connect/adapter/generic/format/csv/CsvParser.java index 543aca615c..9ca0d8ed70 100644 --- a/streampipes-connect/src/main/java/org/streampipes/connect/adapter/generic/format/csv/CsvParser.java +++ b/streampipes-connect/src/main/java/org/streampipes/connect/adapter/generic/format/csv/CsvParser.java @@ -51,7 +51,8 @@ public CsvParser(String delimiter, Boolean header) { public Parser getInstance(FormatDescription formatDescription) { ParameterExtractor extractor = new ParameterExtractor(formatDescription.getConfig()); - boolean header = extractor.singleValue(CsvFormat.HEADER_NAME) == null ? false : true; + boolean header = extractor.selectedMultiValues(CsvFormat.HEADER_NAME).stream() + .anyMatch(option -> option.equals("Header")); String delimiter = extractor.singleValue(CsvFormat.DELIMITER_NAME); return new CsvParser(delimiter, header); @@ -82,7 +83,7 @@ public EventSchema getEventSchema(List oneEvent) { String[] keys; String[] data; - if (!this.header) { + if (this.header) { keys = new String (oneEvent.get(0)).split(delimiter); data = new String (oneEvent.get(1)).split(delimiter); } else { diff --git a/streampipes-connect/src/main/java/org/streampipes/connect/adapter/generic/protocol/stream/FileStreamProtocol.java b/streampipes-connect/src/main/java/org/streampipes/connect/adapter/generic/protocol/stream/FileStreamProtocol.java new file mode 100644 index 0000000000..8048884dbb --- /dev/null +++ b/streampipes-connect/src/main/java/org/streampipes/connect/adapter/generic/protocol/stream/FileStreamProtocol.java @@ -0,0 +1,133 @@ +/* +Copyright 2018 FZI Forschungszentrum Informatik + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package org.streampipes.connect.adapter.generic.protocol.stream; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.streampipes.connect.adapter.generic.format.Format; +import org.streampipes.connect.adapter.generic.format.Parser; +import org.streampipes.connect.adapter.generic.guess.SchemaGuesser; +import org.streampipes.connect.adapter.generic.protocol.Protocol; +import org.streampipes.connect.adapter.generic.sdk.ParameterExtractor; +import org.streampipes.model.connect.grounding.ProtocolDescription; +import org.streampipes.model.connect.guess.GuessSchema; +import org.streampipes.model.schema.EventSchema; +import org.streampipes.model.staticproperty.FreeTextStaticProperty; + +import java.io.*; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +public class FileStreamProtocol extends PullProtocol { + + private static Logger logger = LoggerFactory.getLogger(FileStreamProtocol.class); + + public static final String ID = "https://streampipes.org/vocabulary/v1/protocol/stream/file"; + + private String filePath; + + public FileStreamProtocol() { + } + + public FileStreamProtocol(Parser parser, Format format, long interval, String filePath) { + super(parser, format, interval); + this.filePath = filePath; + } + + @Override + InputStream getDataFromEndpoint() { + FileReader fr = null; + InputStream inn = null; + try { + + fr = new FileReader(filePath); + BufferedReader br = new BufferedReader(fr); + + inn = new FileInputStream(filePath); + + } catch (FileNotFoundException e) { + e.printStackTrace(); + } catch (IOException e) { + e.printStackTrace(); + } + + return inn; + } + + @Override + public Protocol getInstance(ProtocolDescription protocolDescription, Parser parser, Format format) { + ParameterExtractor extractor = new ParameterExtractor(protocolDescription.getConfig()); + long intervalProperty = Long.parseLong(extractor.singleValue("interval")); + + String fileUri = extractor.singleValue("filePath"); + + return new FileStreamProtocol(parser, format, intervalProperty, fileUri); } + + @Override + public ProtocolDescription declareModel() { + ProtocolDescription pd = new ProtocolDescription(ID,"File","This is the " + + "description for the File Stream protocol"); + FreeTextStaticProperty urlProperty = new FreeTextStaticProperty("filePath", "File Path", + "This property defines the path to the file."); + pd.setSourceType("STREAM"); + FreeTextStaticProperty intervalProperty = new FreeTextStaticProperty("interval", "Interval", "This property " + + "defines the pull interval in seconds."); + pd.setIconUrl("file.png"); + pd.addConfig(urlProperty); + pd.addConfig(intervalProperty); + return pd; + } + + @Override + public GuessSchema getGuessSchema() { + InputStream dataInputStream = getDataFromEndpoint(); + + List dataByte = parser.parseNEvents(dataInputStream, 20); + + EventSchema eventSchema= parser.getEventSchema(dataByte); + + GuessSchema result = SchemaGuesser.guessSchma(eventSchema, getNElements(20)); + + return result; } + + @Override + public List> getNElements(int n) { + List> result = new ArrayList<>(); + + InputStream dataInputStream = getDataFromEndpoint(); + + List dataByteArray = parser.parseNEvents(dataInputStream, n); + + // Check that result size is n. Currently just an error is logged. Maybe change to an exception + if (dataByteArray.size() < n) { + logger.error("Error in File Protocol! User required: " + n + " elements but the resource just had: " + + dataByteArray.size()); + } + + for (byte[] b : dataByteArray) { + result.add(format.parse(b)); + } + + return result; + } + + @Override + public String getId() { + return ID; + } +} diff --git a/streampipes-connect/src/main/java/org/streampipes/connect/adapter/generic/sdk/ParameterExtractor.java b/streampipes-connect/src/main/java/org/streampipes/connect/adapter/generic/sdk/ParameterExtractor.java index ff756f8760..5619a913f1 100644 --- a/streampipes-connect/src/main/java/org/streampipes/connect/adapter/generic/sdk/ParameterExtractor.java +++ b/streampipes-connect/src/main/java/org/streampipes/connect/adapter/generic/sdk/ParameterExtractor.java @@ -17,10 +17,10 @@ package org.streampipes.connect.adapter.generic.sdk; -import org.streampipes.model.staticproperty.FreeTextStaticProperty; -import org.streampipes.model.staticproperty.StaticProperty; +import org.streampipes.model.staticproperty.*; import java.util.List; +import java.util.stream.Collectors; public class ParameterExtractor { private List list; @@ -34,6 +34,15 @@ public String singleValue(String internalName) { .getValue()); } + public List selectedMultiValues(String internalName) { + return ((SelectionStaticProperty) getStaticPropertyByName(internalName)) + .getOptions() + .stream() + .filter(Option::isSelected) + .map(Option::getName) + .collect(Collectors.toList()); + } + private StaticProperty getStaticPropertyByName(String name) { for(StaticProperty p : list) diff --git a/streampipes-container-embedded/pom.xml b/streampipes-container-embedded/pom.xml index 8e863adead..530955fe38 100644 --- a/streampipes-container-embedded/pom.xml +++ b/streampipes-container-embedded/pom.xml @@ -20,7 +20,7 @@ org.streampipes streampipes-parent - 0.60.0 + 0.60.1 streampipes-container-embedded jar diff --git a/streampipes-container-standalone/pom.xml b/streampipes-container-standalone/pom.xml index 9ae62bea36..8b99ab6bd7 100644 --- a/streampipes-container-standalone/pom.xml +++ b/streampipes-container-standalone/pom.xml @@ -20,7 +20,7 @@ org.streampipes streampipes-parent - 0.60.0 + 0.60.1 4.0.0 diff --git a/streampipes-container/pom.xml b/streampipes-container/pom.xml index 95d39f8024..8c58c09184 100644 --- a/streampipes-container/pom.xml +++ b/streampipes-container/pom.xml @@ -20,7 +20,7 @@ org.streampipes streampipes-parent - 0.60.0 + 0.60.1 streampipes-container Large scale event processing agents with semantic checking for combinability. diff --git a/streampipes-dataformat-json/pom.xml b/streampipes-dataformat-json/pom.xml index 91a2914fc2..dda530a4b5 100644 --- a/streampipes-dataformat-json/pom.xml +++ b/streampipes-dataformat-json/pom.xml @@ -20,7 +20,7 @@ streampipes-parent org.streampipes - 0.60.0 + 0.60.1 4.0.0 diff --git a/streampipes-dataformat/pom.xml b/streampipes-dataformat/pom.xml index d521532467..81bd50b482 100644 --- a/streampipes-dataformat/pom.xml +++ b/streampipes-dataformat/pom.xml @@ -20,7 +20,7 @@ streampipes-parent org.streampipes - 0.60.0 + 0.60.1 4.0.0 diff --git a/streampipes-logging/pom.xml b/streampipes-logging/pom.xml index 2db68f852b..a8541f6f5b 100644 --- a/streampipes-logging/pom.xml +++ b/streampipes-logging/pom.xml @@ -20,7 +20,7 @@ streampipes-parent org.streampipes - 0.60.0 + 0.60.1 4.0.0 diff --git a/streampipes-measurement-units/pom.xml b/streampipes-measurement-units/pom.xml index a9c8cd2981..df7a98e2f6 100644 --- a/streampipes-measurement-units/pom.xml +++ b/streampipes-measurement-units/pom.xml @@ -20,7 +20,7 @@ org.streampipes streampipes-parent - 0.60.0 + 0.60.1 streampipes-measurement-units diff --git a/streampipes-messaging-jms/pom.xml b/streampipes-messaging-jms/pom.xml index 28ca7dd584..2113ef9921 100644 --- a/streampipes-messaging-jms/pom.xml +++ b/streampipes-messaging-jms/pom.xml @@ -20,7 +20,7 @@ org.streampipes streampipes-parent - 0.60.0 + 0.60.1 4.0.0 diff --git a/streampipes-messaging-kafka/pom.xml b/streampipes-messaging-kafka/pom.xml index 3581489b95..f360876c96 100644 --- a/streampipes-messaging-kafka/pom.xml +++ b/streampipes-messaging-kafka/pom.xml @@ -20,7 +20,7 @@ org.streampipes streampipes-parent - 0.60.0 + 0.60.1 4.0.0 diff --git a/streampipes-messaging/pom.xml b/streampipes-messaging/pom.xml index 8604d8a3ee..bdadcf0b50 100644 --- a/streampipes-messaging/pom.xml +++ b/streampipes-messaging/pom.xml @@ -20,7 +20,7 @@ org.streampipes streampipes-parent - 0.60.0 + 0.60.1 4.0.0 diff --git a/streampipes-model-client/pom.xml b/streampipes-model-client/pom.xml index 3093bdbfbd..253ee8dbb4 100644 --- a/streampipes-model-client/pom.xml +++ b/streampipes-model-client/pom.xml @@ -20,7 +20,7 @@ org.streampipes streampipes-parent - 0.60.0 + 0.60.1 streampipes-model-client jar diff --git a/streampipes-model/pom.xml b/streampipes-model/pom.xml index 2468bc2687..6e8236949d 100644 --- a/streampipes-model/pom.xml +++ b/streampipes-model/pom.xml @@ -22,7 +22,7 @@ org.streampipes streampipes-parent - 0.60.0 + 0.60.1 diff --git a/streampipes-performance-tests/pom.xml b/streampipes-performance-tests/pom.xml index 0881c54ebb..0b6094ccd3 100644 --- a/streampipes-performance-tests/pom.xml +++ b/streampipes-performance-tests/pom.xml @@ -20,7 +20,7 @@ streampipes-parent org.streampipes - 0.60.0 + 0.60.1 4.0.0 diff --git a/streampipes-pipeline-management/pom.xml b/streampipes-pipeline-management/pom.xml index fe5b129c99..f65b66ba76 100644 --- a/streampipes-pipeline-management/pom.xml +++ b/streampipes-pipeline-management/pom.xml @@ -20,7 +20,7 @@ org.streampipes streampipes-parent - 0.60.0 + 0.60.1 streampipes-pipeline-management diff --git a/streampipes-rest-shared/pom.xml b/streampipes-rest-shared/pom.xml index dfaac0523c..a493beef3d 100644 --- a/streampipes-rest-shared/pom.xml +++ b/streampipes-rest-shared/pom.xml @@ -20,7 +20,7 @@ streampipes-parent org.streampipes - 0.60.0 + 0.60.1 4.0.0 diff --git a/streampipes-rest/pom.xml b/streampipes-rest/pom.xml index d58b27c1a3..6bc23eca42 100644 --- a/streampipes-rest/pom.xml +++ b/streampipes-rest/pom.xml @@ -20,7 +20,7 @@ org.streampipes streampipes-parent - 0.60.0 + 0.60.1 StreamPipes REST API streampipes-rest diff --git a/streampipes-sdk/pom.xml b/streampipes-sdk/pom.xml index 1ac95b9a74..2f48060a46 100644 --- a/streampipes-sdk/pom.xml +++ b/streampipes-sdk/pom.xml @@ -20,7 +20,7 @@ org.streampipes streampipes-parent - 0.60.0 + 0.60.1 4.0.0 diff --git a/streampipes-serializers/pom.xml b/streampipes-serializers/pom.xml index bbbe67e177..77de1cf3f8 100644 --- a/streampipes-serializers/pom.xml +++ b/streampipes-serializers/pom.xml @@ -20,7 +20,7 @@ streampipes-parent org.streampipes - 0.60.0 + 0.60.1 4.0.0 diff --git a/streampipes-sources/pom.xml b/streampipes-sources/pom.xml index e40fd7a957..383e9d1f31 100644 --- a/streampipes-sources/pom.xml +++ b/streampipes-sources/pom.xml @@ -20,7 +20,7 @@ org.streampipes streampipes-parent - 0.60.0 + 0.60.1 streampipes-sources diff --git a/streampipes-storage-api/pom.xml b/streampipes-storage-api/pom.xml index fac23a8100..12198d7322 100644 --- a/streampipes-storage-api/pom.xml +++ b/streampipes-storage-api/pom.xml @@ -20,7 +20,7 @@ org.streampipes streampipes-parent - 0.60.0 + 0.60.1 streampipes-storage-api jar diff --git a/streampipes-storage-couchdb/pom.xml b/streampipes-storage-couchdb/pom.xml index f5cf32f0a8..dee780f25b 100644 --- a/streampipes-storage-couchdb/pom.xml +++ b/streampipes-storage-couchdb/pom.xml @@ -20,7 +20,7 @@ streampipes-parent org.streampipes - 0.60.0 + 0.60.1 4.0.0 diff --git a/streampipes-storage-management/pom.xml b/streampipes-storage-management/pom.xml index 6a6395dd44..f582a1a89a 100644 --- a/streampipes-storage-management/pom.xml +++ b/streampipes-storage-management/pom.xml @@ -20,7 +20,7 @@ streampipes-parent org.streampipes - 0.60.0 + 0.60.1 4.0.0 diff --git a/streampipes-storage-rdf4j/pom.xml b/streampipes-storage-rdf4j/pom.xml index 51d75494c5..22a3f663de 100644 --- a/streampipes-storage-rdf4j/pom.xml +++ b/streampipes-storage-rdf4j/pom.xml @@ -20,7 +20,7 @@ streampipes-parent org.streampipes - 0.60.0 + 0.60.1 4.0.0 diff --git a/streampipes-test-utils/pom.xml b/streampipes-test-utils/pom.xml index 652e147b9e..0ca48dcddf 100644 --- a/streampipes-test-utils/pom.xml +++ b/streampipes-test-utils/pom.xml @@ -20,7 +20,7 @@ streampipes-parent org.streampipes - 0.60.0 + 0.60.1 4.0.0 diff --git a/streampipes-user-management/pom.xml b/streampipes-user-management/pom.xml index e2f7d042ff..4f2c215a80 100644 --- a/streampipes-user-management/pom.xml +++ b/streampipes-user-management/pom.xml @@ -20,7 +20,7 @@ streampipes-parent org.streampipes - 0.60.0 + 0.60.1 4.0.0 diff --git a/streampipes-vocabulary/pom.xml b/streampipes-vocabulary/pom.xml index f178045611..65508f258a 100644 --- a/streampipes-vocabulary/pom.xml +++ b/streampipes-vocabulary/pom.xml @@ -20,7 +20,7 @@ streampipes-parent org.streampipes - 0.60.0 + 0.60.1 4.0.0 diff --git a/streampipes-wrapper-distributed/pom.xml b/streampipes-wrapper-distributed/pom.xml index 2a5b38d949..b9ba18c3ad 100644 --- a/streampipes-wrapper-distributed/pom.xml +++ b/streampipes-wrapper-distributed/pom.xml @@ -20,7 +20,7 @@ streampipes-parent org.streampipes - 0.60.0 + 0.60.1 4.0.0 diff --git a/streampipes-wrapper-esper/pom.xml b/streampipes-wrapper-esper/pom.xml index 3c880fa191..06e3d68cf1 100644 --- a/streampipes-wrapper-esper/pom.xml +++ b/streampipes-wrapper-esper/pom.xml @@ -3,7 +3,7 @@ streampipes-parent org.streampipes - 0.60.0 + 0.60.1 4.0.0 diff --git a/streampipes-wrapper-flink/pom.xml b/streampipes-wrapper-flink/pom.xml index be14a7fd42..6f263110ea 100644 --- a/streampipes-wrapper-flink/pom.xml +++ b/streampipes-wrapper-flink/pom.xml @@ -20,7 +20,7 @@ org.streampipes streampipes-parent - 0.60.0 + 0.60.1 streampipes-wrapper-flink StreamPipes Wrapper for Apache Flink diff --git a/streampipes-wrapper-kafka-streams/pom.xml b/streampipes-wrapper-kafka-streams/pom.xml index 3805233ba8..d2316de077 100644 --- a/streampipes-wrapper-kafka-streams/pom.xml +++ b/streampipes-wrapper-kafka-streams/pom.xml @@ -20,7 +20,7 @@ streampipes-parent org.streampipes - 0.60.0 + 0.60.1 4.0.0 diff --git a/streampipes-wrapper-siddhi/pom.xml b/streampipes-wrapper-siddhi/pom.xml index c336b24d1f..d696c9390f 100644 --- a/streampipes-wrapper-siddhi/pom.xml +++ b/streampipes-wrapper-siddhi/pom.xml @@ -20,7 +20,7 @@ streampipes-parent org.streampipes - 0.60.0 + 0.60.1 4.0.0 diff --git a/streampipes-wrapper-spark/pom.xml b/streampipes-wrapper-spark/pom.xml index 9b4594d72a..10e936f9df 100644 --- a/streampipes-wrapper-spark/pom.xml +++ b/streampipes-wrapper-spark/pom.xml @@ -22,7 +22,7 @@ org.streampipes streampipes-parent - 0.60.0 + 0.60.1 streampipes-wrapper-spark diff --git a/streampipes-wrapper-standalone/pom.xml b/streampipes-wrapper-standalone/pom.xml index 7659df68ad..ad8ed0c520 100644 --- a/streampipes-wrapper-standalone/pom.xml +++ b/streampipes-wrapper-standalone/pom.xml @@ -20,7 +20,7 @@ org.streampipes streampipes-parent - 0.60.0 + 0.60.1 streampipes-wrapper-standalone StreamPipes Wrapper for Standalone Pipeline Element Implementations diff --git a/streampipes-wrapper/pom.xml b/streampipes-wrapper/pom.xml index 179b58dd9e..476d0287cf 100644 --- a/streampipes-wrapper/pom.xml +++ b/streampipes-wrapper/pom.xml @@ -20,7 +20,7 @@ org.streampipes streampipes-parent - 0.60.0 + 0.60.1 streampipes-wrapper