Skip to content

Commit

Permalink
introduce a function for configuration dump
Browse files Browse the repository at this point in the history
This patch introduces "dump_conf" function for producer and
consumer that returns current producer/consumer configuration.
It's could be quite useful because returns also default of
configuration parameters that wasn't specified by user.

Return type is table with key(option name) and value (option
value). Each field has "string" type. Or nil in case of error.

Closes #53
  • Loading branch information
olegrok authored and filonenko-mikhail committed Dec 15, 2021
1 parent 378b4de commit 80ee985
Show file tree
Hide file tree
Showing 12 changed files with 113 additions and 6 deletions.
31 changes: 26 additions & 5 deletions kafka/common.c
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
#include <lua.h>
#include <librdkafka/rdkafka.h>

#include <common.h>

const char* const consumer_label = "__tnt_kafka_consumer";
Expand Down Expand Up @@ -36,7 +34,30 @@ lua_push_error(struct lua_State *L) {
*/
int
lua_librdkafka_version(struct lua_State *L) {
const char *version = rd_kafka_version_str();
lua_pushstring(L, version);
return 1;
const char *version = rd_kafka_version_str();
lua_pushstring(L, version);
return 1;
}

int lua_librdkafka_dump_conf(struct lua_State *L, rd_kafka_t *rk) {
if (rk != NULL) {
const rd_kafka_conf_t *conf = rd_kafka_conf(rk);
if (conf == NULL)
return 0;

size_t cntp = 0;
const char **confstr = rd_kafka_conf_dump((rd_kafka_conf_t *)conf, &cntp);
if (confstr == NULL)
return 0;

lua_newtable(L);
for (size_t i = 0; i < cntp; i += 2) {
lua_pushstring(L, confstr[i]);
lua_pushstring(L, confstr[i + 1]);
lua_settable(L, -3);
}
rd_kafka_conf_dump_free(confstr, cntp);
return 1;
}
return 0;
}
3 changes: 3 additions & 0 deletions kafka/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include <lua.h>
#include <lualib.h>
#include <lauxlib.h>
#include <librdkafka/rdkafka.h>

#include <tarantool/module.h>

Expand All @@ -26,6 +27,8 @@ int safe_pushstring(struct lua_State *L, char *str);

int lua_librdkafka_version(struct lua_State *L);

int lua_librdkafka_dump_conf(struct lua_State *L, rd_kafka_t *rk);

/**
* Push native lua error with code -3
*/
Expand Down
11 changes: 11 additions & 0 deletions kafka/consumer.c
Original file line number Diff line number Diff line change
Expand Up @@ -794,3 +794,14 @@ lua_create_consumer(struct lua_State *L) {
lua_setmetatable(L, -2);
return 1;
}

int
lua_consumer_dump_conf(struct lua_State *L) {
consumer_t **consumer_p = (consumer_t **)luaL_checkudata(L, 1, consumer_label);
if (consumer_p == NULL || *consumer_p == NULL)
return 0;

if ((*consumer_p)->rd_consumer != NULL)
return lua_librdkafka_dump_conf(L, (*consumer_p)->rd_consumer);
return 0;
}
2 changes: 2 additions & 0 deletions kafka/consumer.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,6 @@ int lua_consumer_destroy(struct lua_State *L);

int lua_create_consumer(struct lua_State *L);

int lua_consumer_dump_conf(struct lua_State *L);

#endif //TNT_KAFKA_CONSUMER_H
14 changes: 14 additions & 0 deletions kafka/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,13 @@ function Consumer:store_offset(message)
return self._consumer:store_offset(message)
end

function Consumer:dump_conf()
if self._consumer == nil then
return
end
return self._consumer:dump_conf()
end

local Producer = {}

Producer.__index = Producer
Expand Down Expand Up @@ -345,6 +352,13 @@ function Producer:produce(msg)
return err
end

function Producer:dump_conf()
if self._producer == nil then
return
end
return self._producer:dump_conf()
end

function Producer:close()
if self._producer == nil then
return false
Expand Down
11 changes: 11 additions & 0 deletions kafka/producer.c
Original file line number Diff line number Diff line change
Expand Up @@ -503,6 +503,17 @@ lua_producer_close(struct lua_State *L) {
return 1;
}

int
lua_producer_dump_conf(struct lua_State *L) {
producer_t **producer_p = (producer_t **)luaL_checkudata(L, 1, producer_label);
if (producer_p == NULL || *producer_p == NULL)
return 0;

if ((*producer_p)->rd_producer != NULL)
return lua_librdkafka_dump_conf(L, (*producer_p)->rd_producer);
return 0;
}

int
lua_producer_destroy(struct lua_State *L) {
producer_t **producer_p = (producer_t **)luaL_checkudata(L, 1, producer_label);
Expand Down
2 changes: 2 additions & 0 deletions kafka/producer.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,4 +61,6 @@ int lua_create_producer(struct lua_State *L);

int lua_producer_destroy(struct lua_State *L);

int lua_producer_dump_conf(struct lua_State *L);

#endif //TNT_KAFKA_PRODUCER_H
2 changes: 2 additions & 0 deletions kafka/tnt_kafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ luaopen_kafka_tntkafka(lua_State *L) {
{"poll_errors", lua_consumer_poll_errors},
{"poll_rebalances", lua_consumer_poll_rebalances},
{"store_offset", lua_consumer_store_offset},
{"dump_conf", lua_consumer_dump_conf},
{"close", lua_consumer_close},
{"destroy", lua_consumer_destroy},
{"__tostring", lua_consumer_tostring},
Expand Down Expand Up @@ -64,6 +65,7 @@ luaopen_kafka_tntkafka(lua_State *L) {
{"poll_logs", lua_producer_poll_logs},
{"poll_stats", lua_producer_poll_stats},
{"poll_errors", lua_producer_poll_errors},
{"dump_conf", lua_producer_dump_conf},
{"close", lua_producer_close},
{"destroy", lua_producer_destroy},
{"__tostring", lua_producer_tostring},
Expand Down
5 changes: 5 additions & 0 deletions tests/consumer.lua
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,10 @@ local function get_rebalances()
return rebalances
end

local function dump_conf()
return consumer:dump_conf()
end

local function close()
log.info("closing consumer")
local _, err = consumer:close()
Expand All @@ -157,4 +161,5 @@ return {
get_logs = get_logs,
get_stats = get_stats,
get_rebalances = get_rebalances,
dump_conf = dump_conf,
}
7 changes: 6 additions & 1 deletion tests/producer.lua
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ local function produce(messages)
end
end

local function dump_conf()
return producer:dump_conf()
end

local function get_errors()
return errors
end
Expand All @@ -76,7 +80,7 @@ local function get_stats()
end

local function close()
local ok, err = producer:close()
local _, err = producer:close()
if err ~= nil then
log.error("got err %s", err)
box.error{code = 500, reason = err}
Expand All @@ -90,4 +94,5 @@ return {
get_logs = get_logs,
get_stats = get_stats,
close = close,
dump_conf = dump_conf,
}
14 changes: 14 additions & 0 deletions tests/test_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,20 @@ def test_consumer_stats():
assert stat['type'] == 'consumer'


def test_consumer_dump_conf():
server = get_server()

with create_consumer(server, "kafka:9090"):
time.sleep(2)

response = server.call("consumer.dump_conf", [])
assert len(response) > 0
assert len(response[0]) > 0
assert 'session.timeout.ms' in response[0]
assert 'socket.max.fails' in response[0]
assert 'compression.codec' in response[0]


def test_consumer_should_log_debug():
server = get_server()

Expand Down
17 changes: 17 additions & 0 deletions tests/test_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,23 @@ def test_producer_stats():
server.call("producer.close", [])


def test_producer_dump_conf():
server = get_server()

server.call("producer.create", ["kafka:9090"])

time.sleep(2)

response = server.call("producer.dump_conf", [])
assert len(response) > 0
assert len(response[0]) > 0
assert 'session.timeout.ms' in response[0]
assert 'socket.max.fails' in response[0]
assert 'compression.codec' in response[0]

server.call("producer.close", [])


def test_producer_should_log_debug():
server = get_server()

Expand Down

0 comments on commit 80ee985

Please sign in to comment.