diff --git a/Docker-init.sh b/Docker-init.sh new file mode 100755 index 0000000..87ee15d --- /dev/null +++ b/Docker-init.sh @@ -0,0 +1,45 @@ +#!/bin/sh +#/* +# * Copyright (c) 2015 Adobe Systems Incorporated. All rights reserved. +# * +# * Permission is hereby granted, free of charge, to any person obtaining a +# * copy of this software and associated documentation files (the "Software"), +# * to deal in the Software without restriction, including without limitation +# * the rights to use, copy, modify, merge, publish, distribute, sublicense, +# * and/or sell copies of the Software, and to permit persons to whom the +# * Software is furnished to do so, subject to the following conditions: +# * +# * The above copyright notice and this permission notice shall be included in +# * all copies or substantial portions of the Software. +# * +# * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +# * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +# * DEALINGS IN THE SOFTWARE. +# * +# */ +debug_mode=$(echo $DEBUG) +gateway_listen_queue=${GATEWAY_LISTEN_QUEUE:-ipc://@nginx_queue_listen} + +mkdir -p /var/log/api-gateway + +# ldd /usr/local/sbin/api-gateway-zmq-adaptor + +echo "Starting ZeroMQ adaptor ..." +zmq_port=$(echo $ZMQ_PUBLISHER_PORT) +# use -d flag to start API Gateway ZMQ adaptor in debug mode to print all messages sent by the GW +zmq_adaptor_cmd="api-gateway-zmq-adaptor -b ${gateway_listen_queue}" +if [[ -n "${zmq_port}" ]]; then + echo "... ZMQ will publish messages on:" ${zmq_port} + zmq_adaptor_cmd="${zmq_adaptor_cmd} -p ${zmq_port}" +fi +if [ "${debug_mode}" == "true" ]; then + echo " ... in DEBUG mode " + zmq_adaptor_cmd="${zmq_adaptor_cmd} -d" +fi + +echo "Running "$zmq_adaptor_cmd +$zmq_adaptor_cmd \ No newline at end of file diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..ff3f91a --- /dev/null +++ b/Dockerfile @@ -0,0 +1,77 @@ +# apigateway-zmq-adaptor +# +# VERSION 0.1.0 +# +# From https://hub.docker.com/_/alpine/ +# +FROM alpine:latest + +ENV ZMQ_VERSION 4.0.5 +ENV CZMQ_VERSION 2.2.0 +ENV ZMQ_ADAPTOR_VERSION 0.1.1 + +RUN apk update \ + && apk add \ + gcc tar libtool zlib make musl-dev openssl-dev g++ zlib-dev curl \ + && echo " ... adding ZMQ and CZMQ" \ + && curl -L http://download.zeromq.org/zeromq-${ZMQ_VERSION}.tar.gz -o /tmp/zeromq.tar.gz \ + && cd /tmp/ \ + && tar -xf /tmp/zeromq.tar.gz \ + && cd /tmp/zeromq*/ \ + && ./configure --prefix=/usr \ + --sysconfdir=/etc \ + --mandir=/usr/share/man \ + --infodir=/usr/share/info \ + && make && make install \ + && curl -L http://download.zeromq.org/czmq-${CZMQ_VERSION}.tar.gz -o /tmp/czmq.tar.gz \ + && cd /tmp/ \ + && tar -xf /tmp/czmq.tar.gz \ + && cd /tmp/czmq*/ \ + && ./configure --prefix=/usr \ + --sysconfdir=/etc \ + --mandir=/usr/share/man \ + --infodir=/usr/share/info \ + && make && make install \ + && rm -rf /tmp/zeromq* && rm -rf /tmp/czmq* \ + && rm -rf /var/cache/apk/* \ + && echo " ... installing api-gateway-zmq-adaptor" \ + && curl -L https://github.com/adobe-apiplatform/api-gateway-zmq-adaptor/archive/${ZMQ_ADAPTOR_VERSION}.tar.gz -o /tmp/api-gateway-zmq-adaptor-${ZMQ_ADAPTOR_VERSION} \ + && apk update \ + && apk add check-dev \ + && cd /tmp/ \ + && tar -xf /tmp/api-gateway-zmq-adaptor-${ZMQ_ADAPTOR_VERSION} \ + && cd /tmp/api-gateway-zmq-adaptor-* \ + && make test \ + && mkdir -p /usr/local/sbin \ + && PREFIX=/usr/local/sbin make install \ + && rm -rf /tmp/api-gateway-zmq-adaptor-* \ + && apk del check-dev \ + && apk del gcc tar libtool make musl-dev openssl-dev g++ zlib-dev curl \ + && rm -rf /var/cache/apk/* + +RUN apk update \ + && apk add libgcc libstdc++ + +# --- DEV ONLY --- +# RUN apk update \ +# && apk add \ +# gcc tar libtool zlib make musl-dev openssl-dev g++ zlib-dev curl \ +# && apk add check-dev +# COPY src /tmp/api-gateway-zmq-adaptor-${ZMQ_ADAPTOR_VERSION}/src +# COPY tests /tmp/api-gateway-zmq-adaptor-${ZMQ_ADAPTOR_VERSION}/tests +# COPY Makefile /tmp/api-gateway-zmq-adaptor-${ZMQ_ADAPTOR_VERSION}/Makefile +# RUN cd /tmp/api-gateway-zmq-adaptor-* \ +# && make test \ +# && mkdir -p /usr/local/sbin \ +# && PREFIX=/usr/local/sbin make install \ +# && rm -rf /tmp/api-gateway-zmq-adaptor-* +# ------------------ + +COPY Docker-init.sh /etc/init-container.sh +ONBUILD COPY init.sh /etc/init-container.sh + +# volume to expose IPC sockets +# http://api.zeromq.org/4-1:zmq-ipc +VOLUME /var/run/api-gateway-zmq-adaptor + +ENTRYPOINT ["/etc/init-container.sh"] diff --git a/Makefile b/Makefile index 484f313..daa92fb 100644 --- a/Makefile +++ b/Makefile @@ -41,4 +41,36 @@ test-cpp : all #g++ $(LD_LIBRARIES) $(CPPFLAGS) -o quick_test ./tests/quick_test.cpp clean: - rm -rf target \ No newline at end of file + rm -rf target + +docker: + docker build -t adobeapiplatform/apigateway-zmq-adaptor . + +.PHONY: docker-ssh +docker-ssh: + docker run -ti --entrypoint='bash' adobeapiplatform/apigateway:latest + +.PHONY: docker-run +docker-run: process-resources + docker run --rm --name="apigateway-zmq-adaptor" --ipc="host" --net="host" \ + --cpuset-cpus="2-2" \ + -p 6001:6001 \ + -e "DEBUG=false" -e "GATEWAY_LISTEN_QUEUE=ipc://@nginx_queue_listen" \ + adobeapiplatform/apigateway-zmq-adaptor:latest + +.PHONY: docker-debug +docker-debug: process-resources + docker run --rm --name="apigateway-zmq-adaptor" --ipc="host" --net="host" \ + --cpuset-cpus="2-3" \ + -p 6001:6001 \ + -e "DEBUG=true" -e "GATEWAY_LISTEN_QUEUE=ipc://@nginx_queue_listen" \ + adobeapiplatform/apigateway-zmq-adaptor:latest + +.PHONY: docker-attach +docker-attach: + docker exec -i -t apigateway-zmq-adaptor bash + +.PHONY: docker-stop +docker-stop: + docker stop apigateway-zmq-adaptor + docker rm apigateway-zmq-adaptor diff --git a/src/GwZmqAdaptor.c b/src/GwZmqAdaptor.c index 81dd1da..353a839 100644 --- a/src/GwZmqAdaptor.c +++ b/src/GwZmqAdaptor.c @@ -50,7 +50,7 @@ and proxying them to a local IP address on XPUB . Remote consumers should connec void start_gateway_listener(zctx_t *ctx, char *subscriberAddress, char *publisherAddress) { - printf("Starting Gateway Listener \n"); + fprintf(stderr,"Starting Gateway Listener \n"); void *subscriber = zsocket_new (ctx, ZMQ_XSUB); int subscriberSocketResult = zsocket_bind (subscriber, "%s", subscriberAddress); @@ -61,7 +61,7 @@ start_gateway_listener(zctx_t *ctx, char *subscriberAddress, char *publisherAddr int publisherBindResult = zsocket_bind (publisher, "%s", publisherAddress); assert( publisherBindResult >= 0 ); - printf("Starting XPUB->XSUB Proxy [%s] -> [%s] \n", subscriberAddress, publisherAddress ); + fprintf(stderr, "Starting XPUB->XSUB Proxy [%s] -> [%s] \n", subscriberAddress, publisherAddress); zproxy_t *xpub_xsub_thread = zproxy_new(ctx, subscriber, publisher); } diff --git a/src/GwZmqAdaptor.h b/src/GwZmqAdaptor.h index a18f058..081e47f 100644 --- a/src/GwZmqAdaptor.h +++ b/src/GwZmqAdaptor.h @@ -7,7 +7,9 @@ */ #define DEFAULT_XPUB "tcp://0.0.0.0:6001" /** -* Default address where the Gateway is sending the tracking messages +* Default address where the Gateway is sending the tracking messages. +* @see: http://api.zeromq.org/4-1:zmq-ipc +* @see: http://man7.org/linux/man-pages/man7/unix.7.html */ #define DEFAULT_XSUB "ipc:///tmp/nginx_queue_listen" diff --git a/src/api-gateway-zmq-adaptor.c b/src/api-gateway-zmq-adaptor.c index 060571c..bc66b76 100644 --- a/src/api-gateway-zmq-adaptor.c +++ b/src/api-gateway-zmq-adaptor.c @@ -30,7 +30,7 @@ static void subscriber_thread (void *args, zctx_t *ctx, void *pipe) { - printf("Starting Debug subscriber thread [%s] ... \n", args); + fprintf(stderr, "Starting Debug subscriber thread [%s] ... \n", args); void *subscriber = zsocket_new (ctx, ZMQ_SUB); zsocket_connect (subscriber, "%s", args); zsocket_set_subscribe (subscriber, ""); @@ -58,7 +58,7 @@ subscriber_thread (void *args, zctx_t *ctx, void *pipe) static void publisher_thread (void *args, zctx_t *ctx, void *pipe) { - printf("Starting Test publisher thread [%s] ... \n", args); + fprintf(stderr, "Starting Test publisher thread [%s] ... \n", args); void *publisher = zsocket_new (ctx, ZMQ_PUB); int socket_bound = zsocket_connect (publisher, "%s", args); @@ -181,9 +181,7 @@ int main (int argc, char *argv[]) int major, minor, patch, lmajor, lminor, lpatch; zmq_version (&major, &minor, &patch); zsys_version (&lmajor, &lminor, &lpatch); - char str_version[60]; - sprintf( str_version, "ZeroMQ version %d.%d.%d (czmq %d.%d.%d)", major, minor, patch, lmajor, lminor, lpatch); - puts(str_version); + fprintf(stderr, "ZeroMQ version %d.%d.%d (czmq %d.%d.%d) \n", major, minor, patch, lmajor, lminor, lpatch); // parse command line args char c; @@ -213,20 +211,20 @@ int main (int argc, char *argv[]) break; case 'd': debugFlag = 1; - puts("RUNNING IN DEBUGGING MODE"); + fprintf(stderr,"RUNNING IN DEBUGGING MODE\n"); break; case 't': debugFlag = 1; testFlag = 1; - puts("RUNNING IN TEST MODE & DEBUG MODE for XPUB -> XSUB"); + fprintf(stderr,"RUNNING IN TEST MODE & DEBUG MODE for XPUB -> XSUB\n"); break; case 'r': debugFlag = 1; testBlackBoxFlag = 1; - puts("RUNNING IN TEST MODE & DEBUG MODE for SUB -> PUSH"); + fprintf(stderr,"RUNNING IN TEST MODE & DEBUG MODE for SUB -> PUSH\n"); break; case '?': - puts("Unrecognized option!"); + fprintf(stderr,"Unrecognized option!\n"); break; } } @@ -260,7 +258,7 @@ int main (int argc, char *argv[]) assert( pushSocketResult >= 0 ); - printf("\nStarting SUB->PUSH Proxy [%s] -> [%s] \n", listenerAddress, pushAddress ); + fprintf(stderr,"\nStarting SUB->PUSH Proxy [%s] -> [%s] \n", listenerAddress, pushAddress ); zproxy_t *sub_push_thread = zproxy_new(ctx, listenerSocket, pushSocket); if ( testBlackBoxFlag == 1 ) { @@ -304,7 +302,7 @@ int main (int argc, char *argv[]) zclock_sleep(500); } - puts (" ... interrupted"); + fprintf(stderr," ... interrupted"); // Tell attached threads to exit gw_zmq_destroy( &ctx ); return 0; diff --git a/tests/test_published_messages.c b/tests/test_published_messages.c index 9810521..570dba7 100644 --- a/tests/test_published_messages.c +++ b/tests/test_published_messages.c @@ -1,4 +1,17 @@ -// TODO +/* +* Copyright 2015 Adobe Systems Incorporated. All rights reserved. +* +* This file is licensed to you under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software distributed +* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR RESPRESENTATIONS +* OF ANY KIND, either express or implied. See the License for the +* specific language governing permissions and limitations under the License. +*/ #include #include @@ -125,6 +138,41 @@ START_TEST(test_gateway_listener) END_TEST +START_TEST(test_gateway_listener_over_abstract_socket) +{ + zctx_t *ctx = gw_zmq_init(); + ck_assert_msg(ctx != NULL, "ZMQ Context can't be null. "); + + start_gateway_listener(ctx, "ipc://@nginx_queue_listen", "tcp://127.0.0.1:6001"); + + // simulate a consumer + char *publisherAddress = "tcp://127.0.0.1:6001"; + void *pipe2 = zthread_fork (ctx, mock_subscriber_thread, publisherAddress); + ck_assert_msg(pipe2 != NULL, "Subscriber Thread should have been created. "); + + zclock_sleep (100); + + // simulate gateway +// char *subscriberAddress = "ipc://\\0nginx_queue_listen"; + char *subscriberAddress = "ipc://@nginx_queue_listen"; + void *pipe = zthread_fork (ctx, mock_gateway_publisher_thread, subscriberAddress); + ck_assert_msg(pipe != NULL, "Publisher Thread should have been created. "); + + // wait for some messages to be passed + zclock_sleep(400); + zctx_interrupted = true; + + char s_counter[100] = ""; + int expected_min_messages = 15; + sprintf(s_counter, "The consumer should have received at least [%d] messages, but got [%d]", expected_min_messages, messages_received_counter); + ck_assert_msg( messages_received_counter >= expected_min_messages, s_counter); + + gw_zmq_destroy( &ctx ); + ck_assert_msg(ctx == NULL, "ZMQ Context should be destroyed. "); +} +END_TEST + + Suite * adaptor_suite(void) { @@ -138,6 +186,7 @@ Suite * adaptor_suite(void) tcase_add_test(tc_core, test_zmq_context_lifecycle); tcase_add_test(tc_core, test_gateway_listener); + tcase_add_test(tc_core, test_gateway_listener_over_abstract_socket); suite_add_tcase(s, tc_core); return s;