Skip to content

Commit

Permalink
Refactor to support better default connect string generation
Browse files Browse the repository at this point in the history
  • Loading branch information
rhettg committed Apr 7, 2015
1 parent f760c84 commit 2d2af7e
Show file tree
Hide file tree
Showing 7 changed files with 147 additions and 66 deletions.
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -254,11 +254,11 @@ result in any data loss as the local instances would just queue up their events.

So on your local machine, you'd run:

oxd --forward=master:3514
oxd --forward=log-master

And on the master collection machine, you'd run:

oxd --collect="*:3514" --log-path=/var/log/blueox/
oxd --collect="*" --log-path=/var/log/blueox/

Logs are encoded in the MsgPack format (http://msgpack.org/), so you'll need
some tooling for doing log analysis. This is easily done with the tool
Expand All @@ -270,11 +270,11 @@ For example:

oxview --log-path=/var/log/blueox --type-name="request" --start-date=20120313 --end-date=20120315

Where `request` is the channel you want to examine.
Where `request` is the event type you're interested in.

You can also connect to `oxd` and get a live streaming of log data:

oxview -H localhost:3513 --type-name="request*"
oxview -H log-master --type-name="request*"

Note the use of '*' to indicate a prefix query for the type filter. This will
return all events with a type that begins with 'request'
Expand Down
28 changes: 16 additions & 12 deletions bin/oxd
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@ import signal
import struct
import time

import json
import zmq
import msgpack

from blueox import network
from blueox import ports

# How long do we wait for network traffic before running our poll loop anyway.
POLL_LOOP_TIMEOUT_MS = 1000
Expand Down Expand Up @@ -145,8 +145,8 @@ def main():
parser = argparse.ArgumentParser()
parser.add_argument('--verbose', '-v', dest='verbose', action='append_const', const=True, default=list())

parser.add_argument('--control', '-t', dest='control', action='store', default="127.0.0.1:3513")
parser.add_argument('--collect', '-c', dest='collect', action='store', default="127.0.0.1:3514")
parser.add_argument('--control', '-t', dest='control', action='store', default=ports.default_control_host())
parser.add_argument('--collect', '-c', dest='collect', action='store', default=ports.default_collect_host())
parser.add_argument('--forward', '-r', dest='forward', action='store', default=None)

parser.add_argument('--log-path', '-l', dest='log_path', action='store', default=None)
Expand All @@ -155,6 +155,8 @@ def main():

setup_logging(options)

continue_running = True

def handle_sigterm(signum, frame):
log.info("Exiting")
continue_running = True
Expand All @@ -170,14 +172,16 @@ def main():
zmq_context = zmq.Context()
poller = zmq.Poller()

log.info("Initializing control port %s", options.control)
control_host = ports.default_control_host(options.control)
log.info("Initializing control port %s", control_host)
control_sock = zmq_context.socket(zmq.REP)
control_sock.bind("tcp://%s" % (options.control,))
control_sock.bind("tcp://%s" % (control_host,))
poller.register(control_sock, zmq.POLLIN)

log.info("Initializing collector port %s", options.collect)
collect_host = ports.default_collect_host(options.collect)
log.info("Initializing collector port %s", collect_host)
collector_sock = zmq_context.socket(zmq.PULL)
collector_sock.bind("tcp://%s" % options.collect)
collector_sock.bind("tcp://%s" % collect_host)
poller.register(collector_sock, zmq.POLLIN)

streamer_sock = zmq_context.socket(zmq.PUB)
Expand All @@ -187,21 +191,22 @@ def main():

forward_sock = None
if options.forward:
log.info("Inializing forwarding socket to %s", options.forward)
forward_host = ports.default_collect_host(options.forward)
log.info("Inializing forwarding socket to %s", forward_host)
forward_sock = zmq_context.socket(zmq.PUSH)
forward_sock.linger = LINGER_SHUTDOWN_MSECS
# Note we are not setting a HWM for our forwarder, this means we'll
# just collect and store all messages in memory until our forwarder
# becomes available. The SWAP option which would temporarily store on
# disk was removed in zmq 3
forward_sock.connect("tcp://%s" % options.forward)
forward_sock.connect("tcp://%s" % forward_host)

stats = {'last': None, 'lag': None, 'events': {}, 'hosts': {}}
log_files = {}
continue_running = True
log.info("Starting IO Loop")
while continue_running:
log.debug("Poll")

try:
ready = dict(poller.poll(POLL_LOOP_TIMEOUT_MS))
except KeyboardInterrupt, SystemExit:
Expand All @@ -215,7 +220,6 @@ def main():
else:
raise


log.debug("Poller returned: %r", ready)

log_files = dict((name, stream) for name, stream in log_files.iteritems() if stream.poll())
Expand Down Expand Up @@ -267,7 +271,7 @@ def main():
control_data = control_sock.recv()
request = msgpack.unpackb(control_data)
log.info("Received control request: %r", request)

if request['cmd'] == "SOCK_STREAM":
control_sock.send(msgpack.packb({'port': streamer_sock_port}))
elif request['cmd'] == "SOCK_COLLECT":
Expand Down
18 changes: 6 additions & 12 deletions blueox/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

from . import utils
from . import network
from . import ports
from .context import (
Context,
set,
Expand All @@ -40,12 +41,6 @@
log = logging.getLogger(__name__)


ENV_VAR_HOST = 'BLUEOX_HOST'
ENV_VAR_PORT = 'BLUEOX_PORT'
DEFAULT_HOST = '127.0.0.1'
DEFAULT_PORT = 3514


def configure(host, port, recorder=None):
"""Initialize blueox
Expand All @@ -66,13 +61,12 @@ def configure(host, port, recorder=None):


def default_configure(host=None):
if host is None:
host = os.environ.get(ENV_VAR_HOST, DEFAULT_HOST)

if ':' not in host:
configured_port = os.environ.get(ENV_VAR_PORT, DEFAULT_PORT)
host = "{}:{}".format(configured_port)
"""Configure BlueOx based on defaults
Accepts a connection string override in the form `localhost:3514`. Respects
environment variable BLUEOX_HOST
"""
host = ports.default_collect_host(host)
hostname, port = host.split(':')

try:
Expand Down
18 changes: 3 additions & 15 deletions blueox/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,9 @@
import msgpack
import zmq

log = logging.getLogger(__name__)

ENV_VAR_HOST = 'BLUEOX_CLIENT_HOST'
ENV_VAR_PORT = 'BLUEOX_CLIENT_PORT'
from . import ports

DEFAULT_HOST = '127.0.0.1'
DEFAULT_PORT = 3513
log = logging.getLogger(__name__)


def default_host(host=None):
Expand All @@ -36,15 +32,7 @@ def default_host(host=None):
We also respect environment variables BLUEOX_CLIENT_HOST and _PORT if
command line options aren't your thing.
"""
default_host = os.environ.get(ENV_VAR_HOST, DEFAULT_HOST)
default_port = os.environ.get(ENV_VAR_PORT, DEFAULT_PORT)

if not host:
host = default_host
if ':' not in host:
host = "{}:{}".format(host, default_port)

return host
return ports.default_control_host(host)


def decode_stream(stream):
Expand Down
45 changes: 45 additions & 0 deletions blueox/ports.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
# -*- coding: utf-8 -*-

"""
blueox.ports
~~~~~~~~
This module provides utilities for generating connect strings.
BlueOx, thanks to it's 0mq roots has a somewhat complex relationship with
ports, which we'd like to abstract from the user as much as possible.
:copyright: (c) 2015 by Rhett Garber
:license: ISC, see LICENSE for more details.
"""
import os


ENV_VAR_CONTROL_HOST = 'BLUEOX_CLIENT_HOST'
ENV_VAR_COLLECT_HOST = 'BLUEOX_HOST'

DEFAULT_HOST = '127.0.0.1'
DEFAULT_CONTROL_PORT = 3513
DEFAULT_COLLECT_PORT = 3514


def _default_host(host, default_host, default_port):
"""Build a default host string
"""
if not host:
host = default_host
if ':' not in host:
host = "{}:{}".format(host, default_port)

return host


def default_control_host(host=None):
default_host = os.environ.get(ENV_VAR_CONTROL_HOST, DEFAULT_HOST)
return _default_host(host, default_host, DEFAULT_CONTROL_PORT)


def default_collect_host(host=None):
default_host = os.environ.get(ENV_VAR_COLLECT_HOST, DEFAULT_HOST)
return _default_host(host, default_host, DEFAULT_COLLECT_PORT)
23 changes: 0 additions & 23 deletions tests/client_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,29 +5,6 @@
from blueox import client


class DefaultHostTest(TestCase):
def test_none(self):
host = client.default_host()

hostname, port = host.split(':')
assert hostname
int(port)

def test_host(self):
host = client.default_host('master')

hostname, port = host.split(':')
assert_equal(hostname, 'master')
int(port)

def test_explicit(self):
host = client.default_host('master:1234')

hostname, port = host.split(':')
assert_equal(hostname, 'master')
assert_equal(int(port), 1234)


class SimpleGrouperTest(TestCase):
@setup
def build_stream(self):
Expand Down
73 changes: 73 additions & 0 deletions tests/ports_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
import os
from testify import *

from blueox import ports


class DefaultHostTest(TestCase):
def test_none(self):
host = ports._default_host(None, 'localhost', 123)

hostname, port = host.split(':')
assert_equal(hostname, 'localhost')
assert_equal(port, '123')

def test_host(self):
host = ports._default_host('master', 'localhost', 123)

hostname, port = host.split(':')
assert_equal(hostname, 'master')
assert_equal(port, '123')

def test_explicit(self):
host = ports._default_host('master:1234', 'localhost', 123)

hostname, port = host.split(':')
assert_equal(hostname, 'master')
assert_equal(port, '1234')


class DefaultControlHost(TestCase):
@teardown
def clear_env(self):
try:
del os.environ['BLUEOX_CLIENT_HOST']
except KeyError:
pass

def test_emtpy(self):
host = ports.default_control_host()
assert_equal(host, "127.0.0.1:3513")

def test_env(self):
os.environ['BLUEOX_CLIENT_HOST'] = 'master'
host = ports.default_control_host()
assert_equal(host, "master:3513")

def test_env_port(self):
os.environ['BLUEOX_CLIENT_HOST'] = 'master:123'
host = ports.default_control_host()
assert_equal(host, "master:123")


class DefaultCollectHost(TestCase):
@teardown
def clear_env(self):
try:
del os.environ['BLUEOX_HOST']
except KeyError:
pass

def test_emtpy(self):
host = ports.default_collect_host()
assert_equal(host, "127.0.0.1:3514")

def test_env(self):
os.environ['BLUEOX_HOST'] = 'master'
host = ports.default_collect_host()
assert_equal(host, "master:3514")

def test_env_port(self):
os.environ['BLUEOX_HOST'] = 'master:123'
host = ports.default_collect_host()
assert_equal(host, "master:123")

0 comments on commit 2d2af7e

Please sign in to comment.