Skip to content

Commit

Permalink
introduce consumer:seek_partitions()
Browse files Browse the repository at this point in the history
This patch implements the following function:

consumer:seek_partitions({{"topic", partition, offset}, ...}, {timeout_ms = ...})

See rd_kafka_seek_partitions
(https://github.com/edenhill/librdkafka/blob/d50099f9d8f0c8a62edca91bebc107cf9a3ec8a8/src/rdkafka.h#L3656)

Closes #76
  • Loading branch information
olegrok committed Jun 8, 2022
1 parent 53adab0 commit 9a0fae6
Show file tree
Hide file tree
Showing 6 changed files with 150 additions and 24 deletions.
62 changes: 62 additions & 0 deletions kafka/consumer.c
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,68 @@ lua_consumer_store_offset(struct lua_State *L) {
return 0;
}

/*
 * coio_call trampoline: unpacks its arguments from the va_list and runs
 * the blocking rd_kafka_seek_partitions() off the TX thread.
 *
 * va_list layout (must match the coio_call site):
 *   rd_kafka_t *                         -- consumer handle
 *   rd_kafka_topic_partition_list_t *    -- partitions with target offsets
 *   int                                  -- timeout in milliseconds
 *   rd_kafka_error_t **                  -- out: error object or NULL
 */
static ssize_t
wait_consumer_seek_partitions(va_list args) {
    rd_kafka_t *consumer = va_arg(args, rd_kafka_t *);
    rd_kafka_topic_partition_list_t *partitions =
        va_arg(args, rd_kafka_topic_partition_list_t *);
    int timeout_ms = va_arg(args, int);
    rd_kafka_error_t **out_err = va_arg(args, rd_kafka_error_t **);

    *out_err = rd_kafka_seek_partitions(consumer, partitions, timeout_ms);
    return 0;
}

/*
 * Lua binding: err = consumer:seek_partitions({{topic, partition, offset}, ...}, timeout_ms)
 *
 * Seeks each listed partition to the given offset via rd_kafka_seek_partitions(),
 * executed on a worker thread through coio_call so the TX thread is not blocked.
 * Returns nothing on success, or a single error string on failure.
 */
int
lua_consumer_seek_partitions(struct lua_State *L) {
    if (lua_gettop(L) != 3)
        luaL_error(L, "Usage: err = consumer:seek_partitions({{topic, partition, offset}}, timeout_ms)");

    consumer_t **consumer_p = luaL_checkudata(L, 1, consumer_label);
    if (consumer_p == NULL || *consumer_p == NULL) {
        lua_pushstring(L, "Broken consumer");
        return 1;
    }

    luaL_checktype(L, 2, LUA_TTABLE);
    int timeout_ms = luaL_checkint(L, 3);

    size_t len = lua_objlen(L, 2);
    rd_kafka_topic_partition_list_t *list = rd_kafka_topic_partition_list_new(len);
    if (list == NULL)
        luaL_error(L, "Out of memory: failed to allocate rd_kafka_topic_partition_list_t");

    for (size_t i = 1; i <= len; i++) {
        /* Push the i-th {topic, partition, offset} triple. */
        luaL_pushint64(L, i);
        lua_gettable(L, 2);

        luaL_pushint64(L, 1);
        lua_gettable(L, -2);
        const char *topic = lua_tostring(L, -1);
        if (topic == NULL) {
            /* lua_tostring() yields NULL for non-string/non-number values;
             * rd_kafka_topic_partition_list_add() must not get a NULL topic.
             * Free the list before luaL_error() long-jumps out. */
            rd_kafka_topic_partition_list_destroy(list);
            luaL_error(L, "Topic name in element %d must be a string", (int)i);
        }
        lua_pop(L, 1);

        luaL_pushint64(L, 2);
        lua_gettable(L, -2);
        int32_t partition = lua_tointeger(L, -1);
        lua_pop(L, 1);

        luaL_pushint64(L, 3);
        lua_gettable(L, -2);
        int64_t offset = luaL_toint64(L, -1);
        /* Pops both the offset and the triple table pushed above. */
        lua_pop(L, 2);

        rd_kafka_topic_partition_list_add(list, topic, partition)->offset = offset;
    }

    rd_kafka_error_t *err = NULL;
    coio_call(wait_consumer_seek_partitions,
              (*consumer_p)->rd_consumer, list, timeout_ms, &err);
    rd_kafka_topic_partition_list_destroy(list);
    if (err != NULL) {
        /* lua_pushstring() copies the message, so the error object can be
         * released right away; without this the rd_kafka_error_t leaks. */
        lua_pushstring(L, rd_kafka_error_string(err));
        rd_kafka_error_destroy(err);
        return 1;
    }
    return 0;
}

static ssize_t
wait_consumer_close(va_list args) {
rd_kafka_t *rd_consumer = va_arg(args, rd_kafka_t *);
Expand Down
3 changes: 3 additions & 0 deletions kafka/consumer.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ lua_consumer_poll_rebalances(struct lua_State *L);
int
lua_consumer_store_offset(struct lua_State *L);

int
lua_consumer_seek_partitions(struct lua_State *L);

int
lua_consumer_close(struct lua_State *L);

Expand Down
33 changes: 17 additions & 16 deletions kafka/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,14 @@ function Consumer:close()
return ok
end

-- Pick the timeout for a blocking call: honor options.timeout_ms when the
-- caller supplied an options table, otherwise fall back to DEFAULT_TIMEOUT_MS.
local function get_timeout_from_options(options)
    if type(options) == 'table' and options.timeout_ms ~= nil then
        return options.timeout_ms
    end
    return DEFAULT_TIMEOUT_MS
end

-- Subscribe to the given list of topics, delegating to the C consumer handle.
function Consumer:subscribe(topics)
    local impl = self._consumer
    return impl:subscribe(topics)
end
Expand All @@ -211,6 +219,11 @@ function Consumer:resume()
return self._consumer:resume()
end

-- Seek partitions to the given offsets.
-- topic_partitions_list: array of {topic, partition, offset} triples.
-- options: optional table; options.timeout_ms overrides DEFAULT_TIMEOUT_MS.
-- Returns an error string on failure, nothing on success.
function Consumer:seek_partitions(topic_partitions_list, options)
    -- Guard against use after close(), consistent with the nil checks in
    -- dump_conf/metadata/list_groups: _consumer is nil once closed.
    if self._consumer == nil then
        return "Consumer is closed"
    end
    local timeout_ms = get_timeout_from_options(options)
    return self._consumer:seek_partitions(topic_partitions_list, timeout_ms)
end

function Consumer:dump_conf()
if self._consumer == nil then
return
Expand All @@ -223,10 +236,7 @@ function Consumer:metadata(options)
return
end

local timeout_ms = DEFAULT_TIMEOUT_MS
if options ~= nil and options.timeout_ms ~= nil then
timeout_ms = options.timeout_ms
end
local timeout_ms = get_timeout_from_options(options)

return self._consumer:metadata(timeout_ms)
end
Expand All @@ -236,10 +246,7 @@ function Consumer:list_groups(options)
return
end

local timeout_ms = DEFAULT_TIMEOUT_MS
if options ~= nil and options.timeout_ms ~= nil then
timeout_ms = options.timeout_ms
end
local timeout_ms = get_timeout_from_options(options)

local group
if options ~= nil and options.group ~= nil then
Expand Down Expand Up @@ -410,10 +417,7 @@ function Producer:metadata(options)
return
end

local timeout_ms = DEFAULT_TIMEOUT_MS
if options ~= nil and options.timeout_ms ~= nil then
timeout_ms = options.timeout_ms
end
local timeout_ms = get_timeout_from_options(options)

local topic
if options ~= nil and options.topic ~= nil then
Expand All @@ -428,10 +432,7 @@ function Producer:list_groups(options)
return
end

local timeout_ms = DEFAULT_TIMEOUT_MS
if options ~= nil and options.timeout_ms ~= nil then
timeout_ms = options.timeout_ms
end
local timeout_ms = get_timeout_from_options(options)

local group
if options ~= nil and options.group ~= nil then
Expand Down
1 change: 1 addition & 0 deletions kafka/tnt_kafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ luaopen_kafka_tntkafka(lua_State *L) {
{"poll_errors", lua_consumer_poll_errors},
{"poll_rebalances", lua_consumer_poll_rebalances},
{"store_offset", lua_consumer_store_offset},
{"seek_partitions", lua_consumer_seek_partitions},
{"dump_conf", lua_consumer_dump_conf},
{"metadata", lua_consumer_metadata},
{"list_groups", lua_consumer_list_groups},
Expand Down
44 changes: 36 additions & 8 deletions tests/consumer.lua
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,21 @@ local function unsubscribe(topics)
log.info("consumer unsubscribed")
end

-- Flatten a kafka message object into a plain Lua table of its accessor
-- results (value, key, topic, partition, offset, headers).
local function msg_totable(msg)
    local res = {}
    res.value = msg:value()
    res.key = msg:key()
    res.topic = msg:topic()
    res.partition = msg:partition()
    res.offset = msg:offset()
    res.headers = msg:headers()
    return res
end

-- Convert msg to a plain table and append it to the array `t`.
local function append_message(t, msg)
    t[#t + 1] = msg_totable(msg)
end

local function consume(timeout)
log.info("consume called")

Expand All @@ -105,14 +120,7 @@ local function consume(timeout)
if msg ~= nil then
log.info("%s", msg)
log.info("got msg with topic='%s' partition='%d' offset='%d' key='%s' value='%s'", msg:topic(), msg:partition(), msg:offset(), msg:key(), msg:value())
table.insert(consumed, {
value = msg:value(),
key = msg:key(),
topic = msg:topic(),
partition = msg:partition(),
offset = msg:offset(),
headers = msg:headers(),
})
append_message(consumed, msg)
local err = consumer:store_offset(msg)
if err ~= nil then
log.error("got error '%s' while committing msg from topic '%s'", err, msg:topic())
Expand Down Expand Up @@ -178,6 +186,24 @@ local function close()
log.info("consumer closed")
end

-- Consume the same message 5 times by seeking its partition back to the
-- message's own offset after every read. Returns the consumed messages as
-- plain tables so the Python test can compare them.
local function test_seek_partitions()
    log.info('Test seek')
    local messages = {}

    local out = consumer:output()

    for _ = 1, 5 do
        local msg = out:get(3)
        log.info('Get message: %s', json.encode(msg_totable(msg)))
        append_message(messages, msg)
        -- The Lua wrapper expects an options *table*: passing a bare number
        -- here fails get_timeout_from_options' type check and the timeout
        -- silently falls back to the default.
        consumer:seek_partitions({
            {msg:topic(), msg:partition(), msg:offset()}
        }, {timeout_ms = 1000})
    end

    return messages
end

return {
create = create,
subscribe = subscribe,
Expand All @@ -193,4 +219,6 @@ return {
list_groups = list_groups,
pause = pause,
resume = resume,

test_seek_partitions = test_seek_partitions,
}
31 changes: 31 additions & 0 deletions tests/test_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import json
import asyncio
from contextlib import contextmanager
import random
import string

import pytest
from aiokafka import AIOKafkaProducer
Expand All @@ -11,6 +13,11 @@
KAFKA_HOST = os.getenv("KAFKA_HOST", "kafka:9092")


def randomword(length):
    """Return a random string of `length` lowercase ASCII letters."""
    return ''.join(random.choice(string.ascii_lowercase) for _ in range(length))


def get_message_values(messages):
result = []
for msg in messages:
Expand Down Expand Up @@ -125,6 +132,30 @@ def test_consumer_should_consume_msgs():
assert msg['headers'] == {'key1': 'value1', 'key2': 'value2', 'nullable': None}


def test_consumer_seek_partitions():
    """Seeking back to a message's offset must re-deliver that same message.

    The Lua side (consumer.test_seek_partitions) reads one message, seeks the
    partition back to its offset, and repeats 5 times; every delivery must be
    the single message written below.
    """
    expected_key = "test_seek_unique_key"
    expected_value = "test_seek_unique_value"

    # A fresh random topic isolates this test from other consumer tests.
    topic = 'test_consumer_seek' + randomword(15)
    write_into_kafka(topic, ({"key": expected_key, "value": expected_value},))

    server = get_server()

    with create_consumer(server, KAFKA_HOST, {'group.id': 'consumer_seek'}):
        server.call('consumer.subscribe', [[topic]])

        response = server.call("consumer.test_seek_partitions")
        messages = response[0]
        assert len(messages) == 5

        for msg in messages:
            assert msg['key'] == expected_key
            assert msg['value'] == expected_value


def test_consumer_should_consume_msgs_from_multiple_topics():
message1 = {
"key": "test1",
Expand Down

0 comments on commit 9a0fae6

Please sign in to comment.