
Add a message bus for transactional outbox deliveries #127

Merged
merged 2 commits into main on Aug 8, 2024

Conversation

Member

@errm errm commented Aug 8, 2024

A service that updates database entities and also writes a related event to Kafka can run into problems if we wrap the database updates in a transaction and some error or delay then occurs while writing the event to Kafka, especially if the event is written synchronously.

This can also cause lock contention and increased resource usage on the database server at scale.

One solution is to write the event stream to a dedicated database table, and have an additional process handle writing the events to Kafka.

This means that the application doesn't need to manage its own connections to Kafka, and transactions can be used in the normal way without any downsides or performance degradation.
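To illustrate the atomicity this buys, here is a toy in-memory sketch (not the Streamy API; `ToyOutboxStore` and the order/event shapes are invented for this example). The entity update and the outbox row are staged together and only become visible if the whole block succeeds:

```
# Toy stand-in for a database with transactions: writes made inside the
# block are applied atomically, or discarded if the block raises.
class ToyOutboxStore
  attr_reader :orders, :outbox

  def initialize
    @orders = {}
    @outbox = []
  end

  def transaction
    staged_orders = @orders.dup
    staged_outbox = @outbox.dup
    yield(staged_orders, staged_outbox)
    # Only reached if the block didn't raise: commit both staged copies.
    @orders = staged_orders
    @outbox = staged_outbox
  end
end

store = ToyOutboxStore.new

# Happy path: the entity update and the outbox row commit together.
store.transaction do |orders, outbox|
  orders[1] = { status: "shipped" }
  outbox << { topic: "orders", key: "1", payload: %({"event":"order_shipped"}) }
end

# Failure path: the raise aborts the block, so neither write is applied.
begin
  store.transaction do |orders, outbox|
    orders[2] = { status: "lost" }
    raise "something failed"
  end
rescue RuntimeError
  # committed state is untouched
end
```

With the real pattern, the outbox row is just another `INSERT` inside the same database transaction as the entity change, so the database provides this atomicity for free.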

This change provides a new OutboxMessageBus that can be configured with an ActiveRecord model.

e.g.

migration:

```
class CreateKafkaOutboxEvents < ActiveRecord::Migration[7.0]
  def change
    create_table :kafka_outbox_events do |t|
      t.string :topic
      t.string :key
      t.column :payload, :longblob # for avro - text would be more appropriate for JSON
      t.timestamps
    end

    add_index :kafka_outbox_events, :topic
  end
end
```

config/initializers/streamy.rb:

```
require "streamy/message_buses/outbox_message_bus"

class KafkaOutboxEvent < ActiveRecord::Base; end

Streamy.message_bus = Streamy::MessageBuses::OutboxMessageBus.new(model: KafkaOutboxEvent)
```
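The separate delivery process then drains this table and publishes each row to Kafka. A minimal sketch of that loop (hypothetical; `drain_outbox` and the row shape are invented here, and a real worker would select and lock rows via the model rather than an in-memory array):

```
# Hypothetical relay loop: hand each outbox row to a delivery callback
# (e.g. a Kafka producer) and remove it only after delivery succeeds,
# giving at-least-once semantics if the process crashes mid-batch.
def drain_outbox(rows)
  delivered = []
  until rows.empty?
    row = rows.first
    yield row               # raises on delivery failure, leaving the row in place
    delivered << rows.shift # remove only after successful delivery
  end
  delivered
end

outbox = [
  { topic: "orders", key: "1", payload: "a" },
  { topic: "orders", key: "2", payload: "b" },
]
sent = []
drain_outbox(outbox) { |row| sent << row[:key] }
```

Deleting (or marking) a row only after a successful produce means a crash can cause duplicate deliveries but never lost ones, which is the usual trade-off for this pattern.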

This implementation only allows a single table to be used as the outbox. If we wanted, e.g., a table per topic, the implementation would need to be a bit more complex.

For now, I suspect that indexing on the topic column will be good enough, as we can run multiple consuming workers, each selecting a different topic concurrently.

We will only be able to use a single worker to select rows (with locking) in the consuming process while the backend is MySQL 5.7; with an upgrade to MySQL 8+ we can make use of SKIP LOCKED to increase concurrency if required.
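On MySQL 8+, each worker's claim query could look something like the following sketch (the batch size and topic filter are illustrative):

```
# Hypothetical claim query for one worker: rows already locked by another
# worker are skipped instead of blocking, so several workers can safely
# drain the same topic concurrently (MySQL 8+ / recent PostgreSQL).
claim_sql = <<~SQL
  SELECT * FROM kafka_outbox_events
  WHERE topic = 'orders'
  ORDER BY id
  LIMIT 100
  FOR UPDATE SKIP LOCKED
SQL
```

On MySQL 5.7, `FOR UPDATE` without `SKIP LOCKED` makes a second worker block on the first worker's locked rows, which is why only one consuming worker per topic is practical there.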

Background reading:

* https://microservices.io/patterns/data/transactional-outbox.html
* https://docs.aws.amazon.com/prescriptive-guidance/latest/cloud-design-patterns/transactional-outbox.html
Ruby 3.4 moves OpenStruct to a bundled gem; Ruby 3.5 will raise errors.

See e.g. rails/rails#51468
Member

@robertomiranda robertomiranda left a comment


> We will only be able to use a single worker to select rows (with locking) in the consuming process where the backend is MySQL 5.7 however with an upgrade to MySQL 8+ we can make use of SKIP LOCKED to increase concurrency if required.

Makes sense!

Looking good, thanks @errm!

@shimpeko

shimpeko commented Aug 8, 2024

I've been sceptical about data integrity between Kafka and the DB (e.g. a DB commit can fail after writing to Kafka). Great improvement.

@errm errm merged commit fb5c94e into main Aug 8, 2024
10 checks passed
@errm errm deleted the errm/outbox_message_bus branch August 8, 2024 12:19