Skip to content

Commit

Permalink
zmq-source: change the way how zmq-source handle the connection creation
Browse files Browse the repository at this point in the history
 * In the source case syslog-ng use pull socket. When you use pull socket you must connect to socket instead of bind and the other part has to bind.
 * E.g.: pull -> connect | push -> bind
 * Remove the lot of c test scripts and use python instead.
  • Loading branch information
lmesz committed Oct 15, 2015
1 parent 5934b33 commit bbce173
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 144 deletions.
35 changes: 0 additions & 35 deletions modules/zmq/scripts/publisher.c

This file was deleted.

54 changes: 0 additions & 54 deletions modules/zmq/scripts/pusher.c

This file was deleted.

52 changes: 0 additions & 52 deletions modules/zmq/scripts/subscriber.c

This file was deleted.

15 changes: 15 additions & 0 deletions modules/zmq/scripts/test_pusher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
#!/usr/bin/env python

import time
import zmq

def producer():
context = zmq.Context()
zmq_socket = context.socket(zmq.PUSH)
zmq_socket.bind("tcp://127.0.0.1:8888")
for num in xrange(10000):
work_message = { 'num' : num }
zmq_socket.send_json(work_message)


producer()
8 changes: 5 additions & 3 deletions modules/zmq/zmq-source.c
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ zmq_sd_set_port(LogDriver *source, gint port)
{
ZMQSourceDriver *self = (ZMQSourceDriver *)source;
self->port = port;
msg_info("Port set successfully!", evt_tag_int("Port: ", self->port), NULL);
}

static void
Expand Down Expand Up @@ -91,9 +92,9 @@ create_zmq_context(ZMQSourceDriver* self)

gchar* address = get_address(self);

if (zmq_bind(self->socket, address) != 0)
if (zmq_connect(self->socket, address) != 0)
{
msg_error("Failed to bind!", evt_tag_str("Bind address", address), evt_tag_errno("Error", errno),NULL);
msg_error("Failed to connect!", evt_tag_str("Connect address", address), evt_tag_errno("Error", errno),NULL);
g_free(address);
return FALSE;
}
Expand Down Expand Up @@ -230,8 +231,9 @@ zmq_sd_new(GlobalConfig *cfg)
self->super.super.super.notify = zmq_sd_notify;
self->super.super.super.free_fn = zmq_sd_free;

zmq_sd_set_address((LogDriver *) self, "*");
zmq_sd_set_address((LogDriver *) self, "localhost");
zmq_sd_set_port((LogDriver *) self, 5558);

log_reader_options_defaults(&self->reader_options);
return &self->super.super;
}

0 comments on commit bbce173

Please sign in to comment.