ksqlDB Client

KSQL is a ksqlDB Ruby client that focuses on ease of use. Supports all recent ksqlDB features and does not have any heavyweight dependencies.

What is ksqlDB?

ksqlDB is a database purpose-built for Apache Kafka® streams processing applications, more details here.

Official KLIP:


Add this line to your application's Gemfile:

gem 'ksql'

And then execute:

$ bundle install

Or install it yourself as:

$ gem install ksql


The gem allows you to perform requests to ksqlDB REST API. Checkout the ksqlDB official documentation here.

Table of contents


The gem requires a minimum configuration to connect to ksqlDB, it is shipped with a built-in generator to create a Rails initializer.

$ rails generate ksql
Ksql.configure do |config| = 'http://localhost:8088' # Required
  # config.auth = 'user:password' # optional

All statements, except those starting with SELECT can be run with the ksql method:

Ksql::Client.ksql("SHOW TABLES;")

# Or

Ksql::Client.ksql("INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('18f4ea86', 37.3903, -122.0643);")

You can also pass optional properties to the method (Check the official documentation if needed).

Ksql::Client.ksql("INSERT INTO ...", command_sequence_number: 42, streams_properties: { ... }, session_variables: { ... })

As well as custom Headers.

Ksql::Client.ksql("INSERT INTO ...", headers: { ... })

ksqlDB has three kinds of Queries:

Persistent queries are server-side queries and there's nothing much to say here.

In case you want to close a Persistent Query you can do so with the close_query method, passing the Query ID:


WARNING: There's a known issue here, read below.

Push queries enable you to query a stream or materialized table with a subscription to the results. They run asynchronously and you can spot them as they include the EMIT keyword at the end of the SQL statement.

To define a Push query connection simply call the stream method:

stream ="SELECT * FROM riderLocations EMIT CHANGES;")

You can specify what action to take when an exception gets raised:

stream.on_error do |e|
  puts e.message

or what to do when the connection gets closed:

stream.on_close do
  puts 'Session closed!'

To start the stream, call the start method, It accepts a block to execute each time a message gets recieved:

stream.start do |location|
  # The streaming events get wrapped inside an ORM-like Class
  puts location.latitude
  puts location.longitude

CAREFUL: The block gets executed inside a separated Thread, make sure your code is thread safe!

You can close the connection by calling the close method:



# Define the Stream
stream ="SELECT * FROM riderLocations EMIT CHANGES;")

# Start the connection
stream.start do |location|
  # This code will get executed inside a separated Thread
  puts location.latitude
  puts location.longitude

# Do something

# Close the connection

You can also specify custom Properties as well as Headers and Session Variables:

stream ="SELECT * FROM riderLocations EMIT CHANGES;", headers: { ... }, properties: { ... }, session_variables: { ... })
# ...

Pull queries are the most "traditional" ones, they run synchronously and they can be executed with the query method"

locations = Ksql::Client.query("SELECT * FROM riderLocations;")

An Enumerable collection of ORM-like Objects is returned. Iteration methods are available on the collection:

locations.each do |location|
  # do something
end do |location|
  # do something


The client allows you to introspect the cluster status with the cluster_status method.

Careful: The /clusterStatus endpoint is not enabled by default, read more here


You can also check the health of your ksqlDB server by calling the health_check method.


To get information about the status of a ksqlDB Server call the info method.

You can terminate the cluster and clean up the resources calling the terminate method.


You can provide a list of kafka topic names or regular expressions for Kafka topic names along to delete all topics with names that are in the list or that match any of the regular expressions in the list.

  Ksql::Client.terminate(delete_topic_list: ["FOO", "bar.*"])


The following example is from the official ksqlDB Quickstart:

require 'ksql'
require 'logger'

logger =
logger.level = Logger::INFO

# Create a stream

Ksql::Client.ksql("CREATE STREAM riderLocations (profileId VARCHAR, latitude DOUBLE, longitude DOUBLE)
  WITH (kafka_topic='locations', value_format='json', partitions=1);")

# Create materialized views

Ksql::Client.ksql("CREATE TABLE currentLocation AS
  SELECT profileId,
         LATEST_BY_OFFSET(latitude) AS la,
         LATEST_BY_OFFSET(longitude) AS lo
  FROM riderlocations
  GROUP BY profileId

# Create materialized views

Ksql::Client.ksql("CREATE TABLE ridersNearMountainView AS
  SELECT ROUND(GEO_DISTANCE(la, lo, 37.4133, -122.1162), -1) AS distanceInMiles,
         COLLECT_LIST(profileId) AS riders,
         COUNT(*) AS count
  FROM currentLocation
  GROUP BY ROUND(GEO_DISTANCE(la, lo, 37.4133, -122.1162), -1);")

# Run a push query over the stream

stream ="SELECT * FROM riderLocations
  WHERE GEO_DISTANCE(latitude, longitude, 37.4133, -122.1162) <= 5 EMIT CHANGES;")

# Handle exceptions

stream.on_error do |e|

# Handle stream close

stream.on_close do"Stream closed!")

# Start the stream

stream.start do |location|
  # Print the result into a file'output.log', 'a') do |f|
    f.puts "Latitude: #{location.latitude}, Longitude: #{location.longitude}"

# Populate the stream with events

Ksql::Client.ksql("INSERT INTO riderLocations (profileId, latitude, longitude)
  VALUES ('c2309eec', 37.7877, -122.4205);")
Ksql::Client.ksql("INSERT INTO riderLocations (profileId, latitude, longitude)
  VALUES ('18f4ea86', 37.3903, -122.0643);")
Ksql::Client.ksql("INSERT INTO riderLocations (profileId, latitude, longitude)
  VALUES ('4ab5cbad', 37.3952, -122.0813);")
Ksql::Client.ksql("INSERT INTO riderLocations (profileId, latitude, longitude)
  VALUES ('8b6eae59', 37.3944, -122.0813);")
Ksql::Client.ksql("INSERT INTO riderLocations (profileId, latitude, longitude)
  VALUES ('4a7c7b41', 37.4049, -122.0822);")
Ksql::Client.ksql("INSERT INTO riderLocations (profileId, latitude, longitude)
  VALUES ('4ddad000', 37.7857, -122.4011);")

# Run a Pull query against the materialized view

locations = Ksql::Client.query("SELECT * from ridersNearMountainView
  WHERE distanceInMiles <= 10;")

# Close the stream


# Drop

Ksql::Client.ksql('DROP TABLE IF EXISTS ridersNearMountainView;')
Ksql::Client.ksql('DROP TABLE IF EXISTS currentLocation;')
Ksql::Client.ksql('DROP STREAM IF EXISTS riderLocations;')

Supported ksqlDB versions

0.25 ✔️ Supported
0.24 ✔️ Supported
0.23 ✔️ Supported
0.22 ✔️ Supported
Older ❌ Untested

Known issues

ksqlDB close-query

Although it actually works, at the moment, the latest release of ksqlDB returns an error each time you request the /close-query endpoint. Therefore the query will correctly get closed but an error will get returned anyways.

Official issue here.


After checking out the repo, run bundle install to install dependencies. Then, run bundle exec rspec spec to run the tests. You can also run bin/console for an interactive prompt that will allow you to experiment.

Make sure you have a working instance of ksqlDB, you can find the official quickstart here.


