This repository has been archived by the owner on Apr 2, 2019. It is now read-only.

Not maintained anymore - A gem providing asynchronous event publish and subscribe capabilities with RabbitMQ

simplybusiness/rabbit_feed

RabbitFeed

** This gem is not maintained anymore **

A gem providing asynchronous event publish and subscribe capabilities with RabbitMQ.

Core concepts

  • Fire and forget: An application can publish an event without knowing or caring how that event is consumed.
  • Persistent events: Once an event has been published, it will persist until it has been processed successfully.
  • Self-describing events: The event not only contains a payload, but also a schema that describes that payload.
  • Multiple subscribers: Multiple applications can subscribe to the same events.
  • Event versioning: Consumers can customize event handling based on the event version.

Installation

Add this line to your application's Gemfile:

gem 'rabbit_feed'

Configuration

Create a config/rabbit_feed.yml file. The following options should be specified:

environment:
  host: RabbitMQ host
  user: RabbitMQ user name
  password: RabbitMQ password
  application: Application name

Sample:

development:
  host: localhost
  user: guest
  password: guest
  application: beavers

Configuration options that can be specified are:

Name Use
host Hostname or IP of the RabbitMQ server
hosts Array of hostnames or IPs of a RabbitMQ cluster
port The port to use on the RabbitMQ server
user The user to authenticate with the RabbitMQ server
password The password to authenticate with the RabbitMQ server
application The name of the application - used for routing events to the specified consumers
environment The environment of the application - used for routing events to the specified consumers
exchange The name of the RabbitMQ exchange to which events are published
heartbeat The interval at which to send heartbeats to the RabbitMQ server (in seconds)
connect_timeout The timeout (in seconds) for connecting to the RabbitMQ server
network_recovery_interval The interval (in seconds) between periodic network recovery attempts
auto_delete_queue If true, any queues created will auto-delete upon disconnect
auto_delete_exchange If true, any exchanges created will auto-delete upon disconnect
route_prefix_extension If set, the routing key/queue of the exchange will become env.route_prefix_extension.application.event
consumer_exit_after_fail If set, consumer will stop consuming as soon as it fails to process an event. By default, it will continue to consume and log the error(s)
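
To illustrate how an environment-keyed file like the sample above can be read, here is a minimal sketch using only Ruby's standard library. The helper name `load_feed_settings` and the constants are assumptions made for this example; this is not the gem's own loader.

```ruby
require 'yaml'

# Keys the README says should be specified for each environment.
REQUIRED_KEYS = %w[host user password application].freeze

# Hypothetical helper (not part of RabbitFeed): returns the settings for one
# environment and raises if the section or a required key is missing.
def load_feed_settings(yaml_text, environment)
  settings = YAML.safe_load(yaml_text).fetch(environment) do
    raise KeyError, "no '#{environment}' section in configuration"
  end
  missing = REQUIRED_KEYS - settings.keys
  raise ArgumentError, "missing options: #{missing.join(', ')}" unless missing.empty?

  settings
end

SAMPLE_CONFIG = <<~YAML
  development:
    host: localhost
    user: guest
    password: guest
    application: beavers
YAML

settings = load_feed_settings(SAMPLE_CONFIG, 'development')
puts settings['application'] # prints "beavers"
```

Checking for required keys up front is a design choice of this sketch: a missing option fails at startup rather than at the first connection attempt.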

Initialisation

If installing in a Rails application, the following should be defined in config/initializers/rabbit_feed.rb:

# Define the events (if producing)
EventDefinitions do
  define_event('user_creates_beaver', version: '1.0.0') do
    defined_as do
      'A beaver has been created'
    end
    payload_contains do
      field('beaver_name', type: 'string', definition: 'The name of the beaver')
    end
  end
end
# Define the event routing (if consuming)
EventRouting do
  accept_from('beavers') do
    event('foo') do |event|
      # Do something...
    end
  end
end

You may also override the log location (defaults to log/rabbit_feed.log if a log directory exists, else STDOUT), the environment (defaults to the RAILS_ENV or RACK_ENV), and the path to the RabbitFeed config file (defaults to config/rabbit_feed.yml) in the initializer, like this:

RabbitFeed.instance_eval do
  self.log                     = Logger.new(Rails.root.join('log', 'rabbit_feed.log'))
  self.environment             = Rails.env
  self.configuration_file_path = Rails.root.join('config', 'rabbit_feed.yml')
end

Logging in JSON

RabbitFeed log messages are constructed in such a way that they are friendly to JSON log formats. This is done to simplify log aggregation with tools like Kibana. To log in JSON format, set the RabbitFeed log to use the JSON formatter, e.g.

RabbitFeed.log.formatter = RabbitFeed::JsonLogFormatter
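
For readers unfamiliar with how a JSON formatter plugs into Ruby's standard Logger, the following sketch shows the general mechanism. `JSON_LINE_FORMATTER` is a stand-in written for this example and is an assumption; it is not the implementation of RabbitFeed::JsonLogFormatter.

```ruby
require 'json'
require 'logger'
require 'stringio'
require 'time'

# A stand-in formatter, not the gem's RabbitFeed::JsonLogFormatter.
# Logger calls the formatter with (severity, time, progname, message).
JSON_LINE_FORMATTER = lambda do |severity, time, _progname, message|
  entry = { 'severity' => severity, 'time' => time.utc.iso8601 }
  # Hash messages are merged so their keys become top-level JSON fields.
  entry.merge!(message.is_a?(Hash) ? message : { 'message' => message.to_s })
  "#{JSON.generate(entry)}\n"
end

output = StringIO.new
log = Logger.new(output)
log.formatter = JSON_LINE_FORMATTER
log.info({ 'event' => 'publish', 'name' => 'user_creates_beaver' })

puts output.string
```

Each log call produces one JSON object per line, which is the shape most log aggregation tools expect.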

Producing events

The producer defines the events and their payloads using the Event Definitions DSL. In a Rails app, this can be defined in the initializer.

To produce an event:

require 'rabbit_feed'
RabbitFeed::Producer.publish_event 'Event name', { 'payload_field' => 'payload field value' }

Event name: This tells you what the event is.

Event payload: This is the data about the event. This should be a hash.

The event will be published to the configured exchange on RabbitMQ (amq.topic by default) with a routing key having the pattern of: [environment].[producer application name].[event name].
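
The routing key pattern can be sketched as a small helper. The function name `routing_key` and the `prefix_extension` keyword are assumptions for illustration; the keyword mirrors the route_prefix_extension option from the configuration table, and the value 'blue' is made up.

```ruby
# Hypothetical helper that assembles a routing key from its parts.
# prefix_extension mirrors the route_prefix_extension option; nil omits it.
def routing_key(environment, application, event_name, prefix_extension: nil)
  [environment, prefix_extension, application, event_name].compact.join('.')
end

puts routing_key('development', 'beavers', 'user_creates_beaver')
# prints "development.beavers.user_creates_beaver"
puts routing_key('development', 'beavers', 'user_creates_beaver', prefix_extension: 'blue')
# prints "development.blue.beavers.user_creates_beaver"
```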

Returned Events

In the case that there are no consumers configured to subscribe to an event, the event will be returned to the producer. The returned event will be logged, and if your project uses Airbrake (v5.0+) or Airbrake-Ruby, an error will be reported there.

Testing the Producer

To prevent RabbitFeed from publishing events to RabbitMQ during tests, add the following to spec_helper.rb:

RSpec.configure do |config|
  RabbitFeed::TestingSupport.capture_published_events(config)
end

Or, if using Cucumber, put this in your env.rb:

Before do
  RabbitFeed::TestingSupport.capture_published_events_in_context(self)
end

RSpec

To verify that your application publishes an event, use the custom RSpec matcher provided with this gem.

To make the custom RSpec matcher available to your tests, add the following to spec_helper.rb:

RSpec.configure do |config|
  RabbitFeed::TestingSupport.setup(config)
end

Validating an event was published by event name:

require 'spec_helper'

describe BeaversController do

  describe 'POST create' do
    it 'publishes a create event' do
      expect{
        post :create, beaver: { name: 'beaver' }
      }.to publish_event('user_creates_beaver')
    end
  end
end

Validating an event was published by event name and payload:

require 'spec_helper'

describe BeaversController do

  describe 'POST create' do
    it 'publishes a create event' do
      expect{
        post :create, beaver: { name: 'beaver' }
      }.to publish_event('user_creates_beaver', 'beaver_name' => 'beaver')
    end
  end
end

Validating an event was published by event name and partial payload match:

require 'spec_helper'

describe BeaversController do

  describe 'POST create' do
    it 'publishes a create event' do
      expect{
        post :create, beaver: { name: 'beaver' }
      }.to publish_event('user_creates_beaver').including('beaver_name' => 'beaver')
    end
  end
end

You can also use the .asserting chain, allowing you to provide your own expectation on the event's payload:

it 'publishes a create event' do
  expect{
    post :create, beaver: { name: 'beaver' }
  }.to publish_event('user_creates_beaver', nil).asserting do |payload|
    expect(payload['beaver_name']).to match(/ea/)
  end
end

Consuming events

The consumer uses the Event Routing DSL to define which events it subscribes to and how it handles them. In a Rails app, this can be defined in the initializer.

An Event contains the following information:

  • metadata Information about the event, including:
    • environment The environment in which the event was created (e.g. development, test, production)
    • application The name of the application that generated the event (as specified in rabbit_feed.yml)
    • version The version of the event payload (as specified in the event definition)
    • name The name of the event
    • host The hostname of the server on which the event was generated
    • created_at_utc The time (in UTC) that the event was created
    • schema_version The version of the event schema
  • payload The payload of the event
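
As a rough picture of the structure listed above, and of how a handler might branch on the payload version, consider the sketch below. `SketchEvent` and `describe_beaver` are hypothetical names, the sample values are illustrative, and the hash-style metadata access is an assumption of this example rather than a statement about the accessors on RabbitFeed::Event.

```ruby
# Hypothetical stand-in for an event; the real class is RabbitFeed::Event.
SketchEvent = Struct.new(:metadata, :payload)

event = SketchEvent.new(
  {
    'environment'    => 'development',
    'application'    => 'rails_app',
    'version'        => '1.0.0',
    'name'           => 'user_creates_beaver',
    'host'           => 'localhost',
    'created_at_utc' => '2016-02-08T16:19:30.530210Z',
    'schema_version' => '2.0.0'
  },
  { 'beaver_name' => 'Bucky' }
)

# Branch on the major payload version so old and new payload shapes can coexist.
def describe_beaver(event)
  major = event.metadata['version'].split('.').first.to_i
  if major >= 2
    "v2 payload: #{event.payload.inspect}"
  else
    "Beaver created: #{event.payload['beaver_name']}"
  end
end

puts describe_beaver(event) # prints "Beaver created: Bucky"
```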

Running the consumer

bundle exec rabbit_feed consume --environment development

More information about the consumer command line options can be found in the Consumer section under Command Line Tools.

Event Processing Errors

In the case that your consumer raises an error whilst processing an event, the error will be logged. If your project uses Airbrake (v5.0+) or Airbrake-Ruby, the error will also be reported there. The event that was being processed will remain on the RabbitMQ queue, and will be redelivered to the consumer until it is processed without error.

Testing the Consumer

If you want to test that your routes are behaving as expected without actually using RabbitMQ, you can invoke rabbit_feed_consumer.consume_event(event). The following is an example:

describe 'consuming events' do
  accumulator = []

  let(:define_route) do
    EventRouting do
      accept_from('application_name') do
        event('event_name') do |event|
          accumulator << event
        end
      end
    end
  end

  let(:event) { RabbitFeed::Event.new({'application' => 'application_name', 'name' => 'event_name'}, {'stuff' => 'some_stuff'}) }

  before { define_route }

  it 'routes to the correct service' do
    rabbit_feed_consumer.consume_event(event)
    expect(accumulator.size).to eq(1)
  end
end

To make the rabbit_feed_consumer method available to your tests, add the following to spec_helper.rb:

RSpec.configure do |config|
  RabbitFeed::TestingSupport.setup(config)
end

Command Line Tools

Event Publish

bundle exec bin/rabbit_feed produce --payload 'Event payload' --name 'Event name' --environment test --config spec/fixtures/configuration.yml --logfile test.log --require rabbit_feed.rb --verbose

Publishes an event. Note: until you've specified the event definitions, this will not publish any events. Options are as follows:

--payload The payload of the event
--name The name of the event
--environment The environment to run in
--config The location of the rabbit_feed configuration file
--logfile The location of the log file
--require The project file containing the dependencies
--verbose Turns on DEBUG logging
--help Print the available options

Consumer

bundle exec bin/rabbit_feed consume --environment test --config spec/fixtures/configuration.yml --logfile test.log --require rabbit_feed.rb --pidfile rabbit_feed.pid --verbose --daemon

Starts a consumer. Note: until you've specified the event routing, this will not receive any events. Options are as follows:

--environment The environment to run in
--application The name of the application (used for routing events)
--config The location of the rabbit_feed configuration file
--logfile The location of the log file
--require The project file containing the dependencies (only necessary if running with a non-Rails application)
--pidfile The location at which to write a pid file
--verbose Turns on DEBUG logging
--daemon Run the consumer as a daemon
--help Print the available options

Stopping the consumer

bundle exec bin/rabbit_feed shutdown

This only applies if you've started the consumer with the --daemon option.

Console Consumer

bundle exec bin/rabbit_feed console --environment development --config spec/fixtures/configuration.yml --logfile development.log --pidfile rabbit_feed.pid --verbose

The console consumer will accept any event from any application and will print the event metadata and payload to the console. This is useful during development to get instant feedback on the events being published.

Example Console Output

RabbitFeed console starting at 2016-02-08 16:22:22 UTC...
Environment: development
Queue: development.rabbit_feed_console
There are currently 4 message(s) in the console's queue.
Would you like to purge the queue before proceeding? (y/N)>
n
Ready. Press CTRL+C to exit.
----------------------------user_creates_beaver: 2016-02-08 16:19:30 UTC----------------------------
#Event metadata
application: rails_app
created_at_utc: 2016-02-08T16:19:30.530210Z
environment: development
host: localhost
name: user_creates_beaver
schema_version: 2.0.0
version: 1.0.0
****************************************************************************************************
#Event payload
beaver_name: 02/08/16 16:19:30
----------------------------------------------------------------------------------------------------
1 events received.

Event Definitions DSL

Provides a means to define all events that are published by an application. Defines the event names and the payload associated with each event. The DSL is converted into a schema that is serialized along with the event payload, meaning the events are self-describing. This is accomplished using Apache Avro. This also validates the event payload against its schema before it is published.

Event definitions are cumulative, meaning you can load multiple EventDefinitions blocks.

Here is an example DSL:

EventDefinitions do
  define_event('user_creates_beaver', version: '1.0.0') do
    defined_as do
      'A beaver has been created'
    end
    payload_contains do
      field('beaver_name', type: 'string', definition: 'The name of the beaver')
    end
  end

  define_event('user_updates_beaver', version: '1.0.0') do
    defined_as do
      'A beaver has been updated'
    end
    payload_contains do
      field('beaver_name', type: 'string', definition: 'The name of the beaver')
    end
  end
end

This defines two events:

  1. user_creates_beaver
  2. user_updates_beaver

Each event has a mandatory string field in its payload, called beaver_name.

The available field types are described here.

Publishing a user_creates_beaver event would look like this:

RabbitFeed::Producer.publish_event 'user_creates_beaver', { 'beaver_name' => @beaver.name }
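
The idea of validating a payload before publishing can be imitated with plain Ruby. RabbitFeed itself relies on an Avro schema for this; the sketch below is a deliberately simplified stand-in, and `BEAVER_FIELDS` and `payload_errors` are hypothetical names.

```ruby
# Hypothetical, simplified field list for user_creates_beaver.
# RabbitFeed builds an Avro schema instead; this only imitates the idea.
BEAVER_FIELDS = { 'beaver_name' => String }.freeze

# Returns a list of problems; an empty list means the payload is acceptable.
def payload_errors(payload, fields)
  errors = fields.map do |name, type|
    if !payload.key?(name)
      "missing field: #{name}"
    elsif !payload[name].is_a?(type)
      "#{name} must be a #{type}"
    end
  end
  errors.compact
end

p payload_errors({ 'beaver_name' => 'Bucky' }, BEAVER_FIELDS) # prints []
p payload_errors({ 'beaver_name' => 42 }, BEAVER_FIELDS)
p payload_errors({}, BEAVER_FIELDS)
```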

Event Routing DSL

Provides a means for a consumer to specify which events it subscribes to and how it handles them. This is accomplished using a custom DSL backed by a RabbitMQ topic exchange.

Event routing definitions are cumulative, meaning you can load multiple EventRouting blocks.

Here is an example DSL:

EventRouting do
  accept_from('beavers') do
    event('user_creates_beaver') do |event|
      puts event.payload
    end
    event('user_updates_beaver') do |event|
      puts event.payload
    end
  end
end

This will subscribe to the specified events originating from the beavers application. We have specified that we would like to subscribe to user_creates_beaver and user_updates_beaver events. If either event type is received, its payload will be printed to the screen.

When the consumer is started, it will create its queue named using this pattern: [environment].[consumer application name]. It will bind the queue to the amq.topic exchange on the routing keys as defined in the event routing. In this example, it will bind on:

environment.beavers.user_creates_beaver
environment.beavers.user_updates_beaver

Note: The consumer queues will automatically expire (delete) after 7 days without any consumer connections. This is to prevent unused queues from hanging around once their associated consumer has been terminated.
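
The queue name and binding keys described above can be derived mechanically from the routing definitions. The following sketch shows that derivation; the helper names and the consumer application name 'dam_builder' are assumptions made for the example.

```ruby
# Hypothetical helpers mirroring the naming patterns described above.
def queue_name(environment, consumer_application)
  "#{environment}.#{consumer_application}"
end

# routes maps a producer application name to the event names accepted from it.
def binding_keys(environment, routes)
  routes.flat_map do |application, event_names|
    event_names.map { |event_name| "#{environment}.#{application}.#{event_name}" }
  end
end

routes = { 'beavers' => %w[user_creates_beaver user_updates_beaver] }
puts queue_name('development', 'dam_builder') # prints "development.dam_builder"
puts binding_keys('development', routes)
```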

Wildcards

Applications that wish to handle events from any application or wish to handle any event can achieve this using the :any key.

For example, the following would consume any event published by RabbitFeed:

EventRouting do
  accept_from(:any) do
    event(:any) do |event|
      puts event.payload
    end
  end
end

In this example, the consumer queue will bind to the specified exchange on the following routing keys:

environment.*.*
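
In a RabbitMQ topic exchange, `*` in a binding key matches exactly one dot-separated word. The sketch below imitates that matching with a regular expression to show which routing keys such a binding would accept. The helper names are assumptions, and the `#` wildcard (zero or more words) is intentionally left out.

```ruby
# Converts a binding key to a regular expression. Only '*' (exactly one word)
# is handled; the '#' wildcard of topic exchanges is left out of this sketch.
def binding_pattern(binding_key)
  words = binding_key.split('.').map do |word|
    word == '*' ? '[^.]+' : Regexp.escape(word)
  end
  /\A#{words.join('\.')}\z/
end

def matches_binding?(binding_key, routing_key)
  binding_pattern(binding_key).match?(routing_key)
end

puts matches_binding?('development.*.*', 'development.beavers.user_creates_beaver') # true
puts matches_binding?('development.*.*', 'production.beavers.user_creates_beaver')  # false
puts matches_binding?('development.*.*', 'development.beavers')                     # false
```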

Delivery Semantics

RabbitFeed provides 'at least once' delivery semantics. There are two use-cases where an event may be delivered more than once:

  1. If the subscriber raises an exception whilst processing an event, RabbitFeed will re-deliver the event to the subscriber until the event is processed without error.
  2. If an event is pushed to the subscriber, and the subscriber loses connectivity with RabbitMQ before it can send an acknowledgement back to RabbitMQ, RabbitMQ will push the event again once connectivity has been restored.
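
Because an event may arrive more than once, handlers are generally safest when processing the same event twice has no extra effect. The sketch below shows one way to skip duplicates. `OnceOnlyHandler` and the `event_id` argument are hypothetical; the README does not say that events carry a unique identifier, so how to derive one is left to the application, and a real consumer would need a persistent store rather than an in-memory set.

```ruby
require 'set'

# Hypothetical wrapper: remembers which events were handled so that a
# redelivered event is skipped. The in-memory Set is for illustration only.
class OnceOnlyHandler
  def initialize(&handler)
    @handler = handler
    @seen = Set.new
  end

  # Returns true if the handler ran, false if the event was a duplicate.
  def call(event_id, payload)
    return false if @seen.include?(event_id)

    @handler.call(payload)
    @seen << event_id # recorded only after success, so failures are retried
    true
  end
end

created = []
handler = OnceOnlyHandler.new { |payload| created << payload['beaver_name'] }
handler.call('event-1', { 'beaver_name' => 'Bucky' })
handler.call('event-1', { 'beaver_name' => 'Bucky' }) # redelivery, ignored
p created # prints ["Bucky"]
```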

It is advisable to run RabbitFeed in a clustered RabbitMQ environment to prevent the loss of messages in the case that a RabbitMQ node is lost. By default, RabbitFeed will declare queues to be mirrored across all nodes of the cluster.

Thread Safety

RabbitFeed event publishing is thread-safe. This is achieved by only allowing one thread to publish an event at any given time.
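
Allowing only one thread to publish at a time is commonly done with a mutex. The following sketch shows the pattern with Ruby's built-in Mutex; `SerializedPublisher` is a hypothetical class that collects events in an array, and the gem's own locking code may differ.

```ruby
# Hypothetical publisher illustrating serialized publishing; the gem's own
# locking code may differ.
class SerializedPublisher
  attr_reader :published

  def initialize
    @lock = Mutex.new
    @published = []
  end

  def publish(name, payload)
    # Only one thread at a time may run the block.
    @lock.synchronize { @published << [name, payload] }
  end
end

publisher = SerializedPublisher.new
threads = 4.times.map do |i|
  Thread.new do
    25.times { |n| publisher.publish('user_creates_beaver', { 'beaver_name' => "beaver-#{i}-#{n}" }) }
  end
end
threads.each(&:join)
puts publisher.published.size # prints 100
```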

Developing

See ./DEVELOPING.md for instructions on how to develop RabbitFeed.
