Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Payload Version 2 #13

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,4 @@ spec/reports
test/tmp
test/version_tmp
tmp
node_modules/**/*
20 changes: 20 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,26 @@ MyModel.multiple_man_publish(:seed)

3. Stop the seeder rake task when all of your messages have been processed. You can check your RabbitMQ server

## Versioning

Because you may have different versions of MultipleMan between publishers and subscribers,
MultipleMan attaches **versions** to every message sent. This ensures that updates to payloads,
metadata, etc. will not affect processing of your messages.

In general, a subscriber will not be able to process messages published by a newer version of
MultipleMan. We use **minor versions** to indicate changes that may contain a breaking change
to payload formats.

As a consequence, when upgrading MultipleMan, it's always safe to upgrade patch versions, but
when upgrading to a new major or minor version, ensure that you upgrade your subscribers
prior to upgrading your publishers (if two services both subscribe and publish, you'll need to
update them synchronously.)

Currently, the following versions support the following payload versions:

- **Payload v1** - 1.0.x
- **Payload v2** - 1.1.x

## Contributing

1. Fork it
Expand Down
5 changes: 4 additions & 1 deletion lib/multiple_man.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,15 @@ module MultipleMan
require 'multiple_man/listeners/listener'
require 'multiple_man/listeners/seeder_listener'
require 'multiple_man/model_populator'
require 'multiple_man/identity'
require 'multiple_man/publish'

require 'multiple_man/channel_maintenance/gc'
require 'multiple_man/channel_maintenance/reaper'

require 'multiple_man/payload/payload'
require 'multiple_man/payload/v1'
require 'multiple_man/payload/v2'

def self.logger
configuration.logger
end
Expand Down
49 changes: 0 additions & 49 deletions lib/multiple_man/identity.rb

This file was deleted.

28 changes: 14 additions & 14 deletions lib/multiple_man/listeners/listener.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,24 +32,24 @@ def init_connection
attr_accessor :subscription, :connection

def listen

MultipleMan.logger.info "Listening for #{subscription.klass} with routing key #{routing_key}."
queue.bind(connection.topic, routing_key: routing_key).subscribe(ack: true) do |delivery_info, _, payload|
process_message(delivery_info, payload)
queue.bind(connection.topic, routing_key: routing_key).subscribe(ack: true) do |delivery_info, properties, payload|
parsed_payload = MultipleMan::Payload.build(delivery_info, properties, JSON.parse(payload).with_indifferent_access)

begin
process_message(parsed_payload)
rescue Exception => ex
handle_error(ex, delivery_info)
else
MultipleMan.logger.debug " Successfully processed!"
queue.channel.acknowledge(delivery_info.delivery_tag, false)
end
end
end

def process_message(delivery_info, payload)
MultipleMan.logger.info "Processing message for #{delivery_info.routing_key}."
begin
payload = JSON.parse(payload).with_indifferent_access
subscription.send(operation(delivery_info, payload), payload)
rescue ex
handle_error(ex, delivery_info)
else
MultipleMan.logger.debug " Successfully processed!"
queue.channel.acknowledge(delivery_info.delivery_tag, false)
end
def process_message(payload)
MultipleMan.logger.info "Processing message for #{payload}."
subscription.send(payload.operation, payload)
end

def handle_error(ex, delivery_info)
Expand Down
9 changes: 4 additions & 5 deletions lib/multiple_man/model_populator.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,9 @@ def initialize(record, fields)
end

def populate(payload)
data = payload[:id].merge(payload[:data])
fields_for(data).each do |field|
fields_for(payload).each do |field|
source, dest = field.is_a?(Array) ? field : [field, field]
populate_field(dest, data[source])
populate_field(dest, payload[source])
end
record
end
Expand All @@ -37,8 +36,8 @@ def populate_field(field, value)
end
end

def fields_for(data)
fields || data.keys
def fields_for(payload)
fields || payload.keys
end
end
end
2 changes: 1 addition & 1 deletion lib/multiple_man/model_publisher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def push_record(connection, record, operation)

MultipleMan.logger.debug(" Record Data: #{data} | Routing Key: #{routing_key}")

connection.topic.publish(data.payload, routing_key: routing_key)
connection.topic.publish(data.payload, routing_key: routing_key, headers: data.headers)
end

def all_records(records, &block)
Expand Down
12 changes: 12 additions & 0 deletions lib/multiple_man/payload/payload.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
class MultipleMan::Payload
def self.build(delivery_info, properties, data)
case properties.headers["version"]
when "1", nil
V1.new(delivery_info, properties, data)
when "2"
V2.new(delivery_info, properties, data)
else
raise "This version of MultipleMan does not support the payload version supplied (#{properties.headers["version"]}). Please upgrade to the latest version."
end
end
end
34 changes: 34 additions & 0 deletions lib/multiple_man/payload/v1.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@

class MultipleMan::Payload::V1
def initialize(delivery_info, properties, payload)
self.payload = payload
self.delivery_info = delivery_info
end

def keys
(payload['data'].keys + payload['id'].keys).uniq
end

def [](value)
payload['data'][value.to_s] || payload['id'][value.to_s]
end

def identify_by
if payload['id'].is_a?(Hash)
payload['id']
else
{'multiple_man_identifier' => payload['id']}
end
end

def operation
payload['operation'] || delivery_info.routing_key.split('.').last
end

def to_s
delivery_info.routing_key
end

private
attr_accessor :payload, :delivery_info
end
37 changes: 37 additions & 0 deletions lib/multiple_man/payload/v2.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@

class MultipleMan::Payload::V2
def initialize(delivery_info, properties, payload)
self.payload = payload
self.delivery_info = delivery_info
self.properties = properties
end

def keys
payload.keys
end

def [](value)
payload[value.to_s]
end

def identify_by
Hash[identify_by_header.map do |key|
[key, payload[key]]
end]
end

def operation
delivery_info.routing_key.split('.').last
end

def to_s
delivery_info.routing_key
end

private
attr_accessor :payload, :delivery_info, :properties

def identify_by_header
JSON.parse(properties.headers['identify_by'])
end
end
18 changes: 10 additions & 8 deletions lib/multiple_man/payload_generator.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,22 @@ def initialize(record, operation = :create, options = {})
end

def payload
{
type: type,
operation: operation,
id: id,
data: data
}.to_json
data.to_json
end

def headers
{
'version' => '2',
'identify_by' => identify_by.to_json
}
end

def type
options[:as] || record.class.name
end

def id
Identity.build(record, options).value
def identify_by
[* (options[:identify_by] || :id) ]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Array(options[:identify_by] || :id)

end

def data
Expand Down
8 changes: 4 additions & 4 deletions lib/multiple_man/subscribers/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,19 @@ def initialize(klass)

attr_reader :klass

def create(_)
def create(payload)
# noop
end

def update(_)
def update(payload)
# noop
end

def destroy(_)
def destroy(payload)
# noop
end

def seed(_)
def seed(payload)
# noop
end

Expand Down
15 changes: 5 additions & 10 deletions lib/multiple_man/subscribers/model_subscriber.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,28 +10,23 @@ def initialize(klass, options)
attr_accessor :options

def create(payload)
id = payload[:id]
model = find_model(id)
MultipleMan::ModelPopulator.new(model, options[:fields]).populate(id: find_conditions(id), data: payload[:data])
model = find_model(payload)
MultipleMan::ModelPopulator.new(model, options[:fields]).populate(payload)
model.save!
end

alias_method :update, :create
alias_method :seed, :create

def destroy(payload)
model = find_model(payload[:id])
model = find_model(payload)
model.destroy!
end

private

def find_model(id)
model_class.where(find_conditions(id)).first || model_class.new
end

def find_conditions(id)
id.kind_of?(Hash) ? cleanse_id(id) : {multiple_man_identifier: id}
def find_model(payload)
model_class.where(cleanse_id(payload.identify_by)).first || model_class.new
end

def cleanse_id(hash)
Expand Down
2 changes: 1 addition & 1 deletion lib/multiple_man/version.rb
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
module MultipleMan
VERSION = "0.8.1"
VERSION = "1.1.0"
end
43 changes: 0 additions & 43 deletions spec/identity_spec.rb

This file was deleted.

Loading