@boringnode/bus

@boringnode/bus is a service bus implementation for Node.js. It is designed to be simple and easy to use.

Currently, it supports the following transports:

👉 Memory: A simple in-memory transport for testing purposes.
👉 Redis: A Redis transport for production usage.
👉 MQTT: An MQTT transport for production usage.

Installation

npm install @boringnode/bus

Usage

The module exposes a manager that can be used to register buses.

import { BusManager } from '@boringnode/bus'
import { redis } from '@boringnode/bus/transports/redis'
import { mqtt } from '@boringnode/bus/transports/mqtt'
import { memory } from '@boringnode/bus/transports/memory'

const manager = new BusManager({
  default: 'main',
  transports: {
    main: {
      transport: memory(),
    },
    redis: {
      transport: redis({
        host: 'localhost',
        port: 6379,
      }),
    },
    mqtt: {
      transport: mqtt({
        host: 'localhost',
        port: 1883,
      }),
    },
  }
})

Once the manager is created, you can subscribe to channels and publish messages.

manager.subscribe('channel', (message) => {
  console.log('Received message', message)
})

manager.publish('channel', 'Hello world')

By default, the manager uses the default transport. You can select a different transport with the use method.

manager.use('redis').publish('channel', 'Hello world')
manager.use('mqtt').publish('channel', 'Hello world')
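
The lookup behaviour described above can be pictured with a small, self-contained sketch. It is not the library's implementation: SketchBus and SketchManager are hypothetical names, and the only behaviour modeled is "use the named bus, or fall back to the default one".

```typescript
// Hypothetical sketch: NOT the @boringnode/bus implementation.
// A stand-in for a bus that simply records what was published.
class SketchBus {
  readonly name: string
  readonly published: Array<{ channel: string; message: string }> = []

  constructor(name: string) {
    this.name = name
  }

  publish(channel: string, message: string): void {
    this.published.push({ channel, message })
  }
}

class SketchManager {
  private readonly buses = new Map<string, SketchBus>()
  private readonly defaultName: string

  constructor(defaultName: string, names: string[]) {
    this.defaultName = defaultName
    for (const name of names) this.buses.set(name, new SketchBus(name))
    if (!this.buses.has(defaultName)) {
      throw new Error(`Default bus "${defaultName}" is not configured`)
    }
  }

  // Resolve a bus by name, falling back to the default when no name is given.
  use(name: string = this.defaultName): SketchBus {
    const bus = this.buses.get(name)
    if (!bus) throw new Error(`Bus "${name}" is not configured`)
    return bus
  }

  // Publishing on the manager delegates to the default bus.
  publish(channel: string, message: string): void {
    this.use().publish(channel, message)
  }
}

const sketchManager = new SketchManager('main', ['main', 'redis'])
sketchManager.publish('channel', 'to default')
sketchManager.use('redis').publish('channel', 'to redis')
```

In this sketch, asking for a name that was never configured throws, which makes a typo in a transport name visible immediately. Whether the real manager behaves the same way is an assumption.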

Without the manager

If you don't need multiple buses, you can create a single bus directly by importing the transports and the Bus class.

import { Bus } from '@boringnode/bus'
import { RedisTransport } from '@boringnode/bus/transports/redis'

const transport = new RedisTransport({
  host: 'localhost',
  port: 6379,
})

const bus = new Bus(transport, {
  retryQueue: {
    retryInterval: '100ms'
  }
})

Retry Queue

The bus also supports a retry queue. When a message fails to be published, it will be moved to the retry queue.

For example, suppose your Redis server is down.

const manager = new BusManager({
  default: 'main',
  transports: {
    main: {
      transport: redis({
        host: 'localhost',
        port: 6379,
      }),
      retryQueue: {
        retryInterval: '100ms'
      }
    },
  }
})

manager.use('main').publish('channel', 'Hello World')

The message will be moved to the retry queue and will be retried every 100ms.

The retry queue accepts the following options.

export interface RetryQueueOptions {
  // Enable the retry queue (default: true)
  enabled?: boolean
  
  // Whether duplicate messages are removed from the retry queue (default: true)
  removeDuplicates?: boolean
  
  // The maximum size of the retry queue (default: null)
  maxSize?: number | null
  
  // The interval between each retry (default: false)
  retryInterval?: Duration | false
}
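
To make the interaction between these options more concrete, here is a minimal in-memory model of a retry queue. It is a sketch under stated assumptions, not the library's internals: SketchRetryQueue and its methods are hypothetical, evicting the oldest entry when maxSize is reached is an assumption, and publishing is modeled as a synchronous callback for brevity (a real transport would be asynchronous). Interval-based scheduling is left out.

```typescript
// Hypothetical model of the options above: NOT the library's internals.
interface SketchRetryOptions {
  enabled: boolean
  removeDuplicates: boolean
  maxSize: number | null
}

interface QueuedMessage {
  channel: string
  payload: string
}

class SketchRetryQueue {
  private readonly items: QueuedMessage[] = []
  private readonly options: SketchRetryOptions

  constructor(options: Partial<SketchRetryOptions> = {}) {
    // Defaults mirror the ones documented in RetryQueueOptions.
    this.options = { enabled: true, removeDuplicates: true, maxSize: null, ...options }
  }

  get size(): number {
    return this.items.length
  }

  // Returns true when the message was stored for a later retry.
  enqueue(channel: string, payload: string): boolean {
    if (!this.options.enabled) return false

    const isDuplicate = this.items.some(
      (item) => item.channel === channel && item.payload === payload
    )
    if (this.options.removeDuplicates && isDuplicate) return false

    // Assumption: when the queue is full, the oldest entry is evicted.
    if (this.options.maxSize !== null && this.items.length >= this.options.maxSize) {
      this.items.shift()
    }

    this.items.push({ channel, payload })
    return true
  }

  // Retry queued messages in order. Stops at the first failure so the
  // remaining messages keep their position for the next attempt.
  flush(tryPublish: (message: QueuedMessage) => boolean): number {
    let delivered = 0
    while (this.items.length > 0) {
      const next = this.items[0]
      if (next === undefined || !tryPublish(next)) break
      this.items.shift()
      delivered++
    }
    return delivered
  }
}

const sketchQueue = new SketchRetryQueue({ maxSize: 2 })
sketchQueue.enqueue('channel', 'a') // stored
sketchQueue.enqueue('channel', 'a') // skipped: duplicate
sketchQueue.enqueue('channel', 'b') // stored
sketchQueue.enqueue('channel', 'c') // stored, evicts 'a' because the queue is full
```

With removeDuplicates set to false, the second enqueue call would be stored as well, and with maxSize left at null the queue would grow without bound while the transport stays down.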

Test helpers

The module also provides some test helpers to make it easier to test the code that relies on the bus. First, you can use the MemoryTransport to create a bus that uses an in-memory transport.

You can also use the ChaosTransport to simulate a transport that fails randomly, in order to test the resilience of your code.

import { Bus } from '@boringnode/bus'
import { MemoryTransport } from '@boringnode/bus/transports/memory'
import { ChaosTransport } from '@boringnode/bus/test_helpers'

const buggyTransport = new ChaosTransport(new MemoryTransport())
const bus = new Bus(buggyTransport)

/**
 * From now on, every time you try to publish a message,
 * the transport will throw an error.
 */
buggyTransport.alwaysThrow()
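
The idea behind a failing wrapper can be illustrated with a self-contained sketch. Everything here is hypothetical and only mirrors the pattern: SketchTransport, RecordingTransport, FlakyTransport, its recover method, and safePublish are illustration names, not the library's Transport interface or ChaosTransport API.

```typescript
// Hypothetical shapes for illustration: NOT the library's Transport or ChaosTransport.
interface SketchTransport {
  publish(channel: string, message: string): void
}

// Records every message it is asked to publish.
class RecordingTransport implements SketchTransport {
  readonly sent: string[] = []

  publish(_channel: string, message: string): void {
    this.sent.push(message)
  }
}

// Wraps another transport and can be switched into a failing mode.
class FlakyTransport implements SketchTransport {
  private failing = false
  private readonly inner: SketchTransport

  constructor(inner: SketchTransport) {
    this.inner = inner
  }

  alwaysThrow(): void {
    this.failing = true
  }

  recover(): void {
    this.failing = false
  }

  publish(channel: string, message: string): void {
    if (this.failing) throw new Error('Simulated transport failure')
    this.inner.publish(channel, message)
  }
}

// Code under test: reports failure instead of letting the error escape.
function safePublish(transport: SketchTransport, channel: string, message: string): boolean {
  try {
    transport.publish(channel, message)
    return true
  } catch {
    return false
  }
}

const recording = new RecordingTransport()
const flaky = new FlakyTransport(recording)

safePublish(flaky, 'channel', 'one') // delivered
flaky.alwaysThrow()
safePublish(flaky, 'channel', 'two') // fails, nothing is recorded
flaky.recover()
safePublish(flaky, 'channel', 'three') // delivered again
```

Wrapping a real transport rather than replacing it keeps the happy path identical to production code, so a test only has to toggle the failure mode around the calls it wants to break.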