Skip to content

Commit

Permalink
Fix use-after-free during consumer close
Browse files Browse the repository at this point in the history
Some refactoring was performed but seems after recent patch [1]
the problem was still relevant because pooler thread could access
consumer after destroy. Fix it joining this thread before consumer
destroy.

Co-authored: Oleg Babin <[email protected]>
  • Loading branch information
hackallcode authored and filonenko-mikhail committed Dec 21, 2021
1 parent 80ee985 commit c52df71
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 141 deletions.
8 changes: 1 addition & 7 deletions kafka/callbacks.c
Original file line number Diff line number Diff line change
Expand Up @@ -301,17 +301,11 @@ rebalance_callback(rd_kafka_t *consumer, rd_kafka_resp_err_t err, rd_kafka_topic
*/

event_queues_t *new_event_queues() {
event_queues_t *event_queues = malloc(sizeof(event_queues_t));
event_queues->error_queue = NULL;
event_queues_t *event_queues = calloc(1, sizeof(event_queues_t));
event_queues->error_cb_ref = LUA_REFNIL;
event_queues->log_queue = NULL;
event_queues->log_cb_ref = LUA_REFNIL;
event_queues->stats_queue = NULL;
event_queues->stats_cb_ref = LUA_REFNIL;
event_queues->delivery_queue = NULL;
event_queues->rebalance_cb_ref = LUA_REFNIL;
event_queues->rebalance_queue = NULL;
event_queues->consume_queue = NULL;
return event_queues;
}

Expand Down
14 changes: 0 additions & 14 deletions kafka/common.c
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,6 @@ const char* const consumer_label = "__tnt_kafka_consumer";
const char* const consumer_msg_label = "__tnt_kafka_consumer_msg";
const char* const producer_label = "__tnt_kafka_producer";

int
save_pushstring_wrapped(struct lua_State *L) {
char *str = (char *)lua_topointer(L, 1);
lua_pushstring(L, str);
return 1;
}

int
safe_pushstring(struct lua_State *L, char *str) {
lua_pushcfunction(L, save_pushstring_wrapped);
lua_pushlightuserdata(L, str);
return lua_pcall(L, 1, 1, 0);
}

/**
* Push native lua error with code -3
*/
Expand Down
2 changes: 0 additions & 2 deletions kafka/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ const char* const producer_label;

int save_pushstring_wrapped(struct lua_State *L);

int safe_pushstring(struct lua_State *L, char *str);

int lua_librdkafka_version(struct lua_State *L);

int lua_librdkafka_dump_conf(struct lua_State *L, rd_kafka_t *rk);
Expand Down
109 changes: 46 additions & 63 deletions kafka/consumer.c
Original file line number Diff line number Diff line change
Expand Up @@ -106,13 +106,15 @@ stop_poller(va_list args) {
}

void
destroy_consumer_poller(consumer_poller_t *poller) {
stop_consumer_poller(consumer_poller_t *poller) {
// stopping polling thread
coio_call(stop_poller, poller);
}

void
destroy_consumer_poller(consumer_poller_t *poller) {
pthread_attr_destroy(&poller->attr);
pthread_mutex_destroy(&poller->lock);

free(poller);
}

Expand Down Expand Up @@ -155,11 +157,8 @@ lua_consumer_subscribe(struct lua_State *L) {

rd_kafka_resp_err_t err = rd_kafka_subscribe(consumer->rd_consumer, consumer->topics);
if (err) {
const char *const_err_str = rd_kafka_err2str(err);
char err_str[512];
strncpy(err_str, const_err_str, sizeof(err_str) - 1);
int fail = safe_pushstring(L, err_str);
return fail ? lua_push_error(L): 1;
lua_pushstring(L, rd_kafka_err2str(err));
return 1;
}

return 0;
Expand Down Expand Up @@ -193,20 +192,14 @@ lua_consumer_unsubscribe(struct lua_State *L) {
if (consumer->topics->cnt > 0) {
rd_kafka_resp_err_t err = rd_kafka_subscribe(consumer->rd_consumer, consumer->topics);
if (err) {
const char *const_err_str = rd_kafka_err2str(err);
char err_str[512];
strncpy(err_str, const_err_str, sizeof(err_str) - 1);
int fail = safe_pushstring(L, err_str);
return fail ? lua_push_error(L): 1;
lua_pushstring(L, rd_kafka_err2str(err));
return 1;
}
} else {
rd_kafka_resp_err_t err = rd_kafka_unsubscribe(consumer->rd_consumer);
if (err) {
const char *const_err_str = rd_kafka_err2str(err);
char err_str[512];
strncpy(err_str, const_err_str, sizeof(err_str) - 1);
int fail = safe_pushstring(L, err_str);
return fail ? lua_push_error(L): 1;
lua_pushstring(L, rd_kafka_err2str(err));
return 1;
}
}

Expand Down Expand Up @@ -257,10 +250,7 @@ lua_consumer_poll_logs(struct lua_State *L) {
consumer_t *consumer = lua_check_consumer(L, 1);
if (consumer->event_queues == NULL || consumer->event_queues->log_queue == NULL || consumer->event_queues->log_cb_ref == LUA_REFNIL) {
lua_pushnumber(L, 0);
int fail = safe_pushstring(L, "Consumer poll logs error: callback for logs is not set");
if (fail) {
return lua_push_error(L);
}
lua_pushliteral(L, "Consumer poll logs error: callback for logs is not set");
return 2;
}

Expand Down Expand Up @@ -291,10 +281,7 @@ lua_consumer_poll_logs(struct lua_State *L) {
}
lua_pushnumber(L, (double)count);
if (err_str != NULL) {
int fail = safe_pushstring(L, err_str);
if (fail) {
return lua_push_error(L);
}
lua_pushstring(L, err_str);
} else {
lua_pushnil(L);
}
Expand Down Expand Up @@ -351,10 +338,7 @@ lua_consumer_poll_errors(struct lua_State *L) {
consumer_t *consumer = lua_check_consumer(L, 1);
if (consumer->event_queues == NULL || consumer->event_queues->error_queue == NULL || consumer->event_queues->error_cb_ref == LUA_REFNIL) {
lua_pushnumber(L, 0);
int fail = safe_pushstring(L, "Consumer poll errors error: callback for logs is not set");
if (fail) {
return lua_push_error(L);
}
lua_pushliteral(L, "Consumer poll errors error: callback for logs is not set");
return 2;
}

Expand Down Expand Up @@ -383,10 +367,7 @@ lua_consumer_poll_errors(struct lua_State *L) {
}
lua_pushnumber(L, (double)count);
if (err_str != NULL) {
int fail = safe_pushstring(L, err_str);
if (fail) {
return lua_push_error(L);
}
lua_pushstring(L, err_str);
} else {
lua_pushnil(L);
}
Expand Down Expand Up @@ -481,10 +462,7 @@ lua_consumer_poll_rebalances(struct lua_State *L) {
consumer_t *consumer = lua_check_consumer(L, 1);
if (consumer->event_queues == NULL || consumer->event_queues->rebalance_queue == NULL || consumer->event_queues->rebalance_cb_ref == LUA_REFNIL) {
lua_pushnumber(L, 0);
int fail = safe_pushstring(L, "Consumer poll rebalances error: callback for rebalance is not set");
if (fail) {
return lua_push_error(L);
}
lua_pushliteral(L, "Consumer poll rebalances error: callback for rebalance is not set");
return 2;
}

Expand Down Expand Up @@ -527,10 +505,7 @@ lua_consumer_poll_rebalances(struct lua_State *L) {

lua_pushnumber(L, (double)count);
if (err_str != NULL) {
int fail = safe_pushstring(L, err_str);
if (fail) {
return lua_push_error(L);
}
lua_pushstring(L, err_str);
} else {
lua_pushnil(L);
}
Expand All @@ -545,11 +520,8 @@ lua_consumer_store_offset(struct lua_State *L) {
msg_t *msg = lua_check_consumer_msg(L, 2);
rd_kafka_resp_err_t err = rd_kafka_offset_store(msg->topic, msg->partition, msg->offset);
if (err) {
const char *const_err_str = rd_kafka_err2str(err);
char err_str[512];
strncpy(err_str, const_err_str, sizeof(err_str) - 1);
int fail = safe_pushstring(L, err_str);
return fail ? lua_push_error(L): 1;
lua_pushstring(L, rd_kafka_err2str(err));
return 1;
}
return 0;
}
Expand Down Expand Up @@ -586,8 +558,14 @@ wait_consumer_destroy(va_list args) {

void
consumer_destroy(struct lua_State *L, consumer_t *consumer) {
if (consumer->topics != NULL)
if (consumer->rd_consumer != NULL) {
stop_consumer_poller(consumer->poller);
}

if (consumer->topics != NULL) {
rd_kafka_topic_partition_list_destroy(consumer->topics);
consumer->topics = NULL;
}

/*
* Here we close consumer and only then destroys other stuff.
Expand All @@ -599,13 +577,18 @@ consumer_destroy(struct lua_State *L, consumer_t *consumer) {
/* Destroy handle */
// FIXME: kafka_destroy hangs forever
coio_call(wait_consumer_destroy, consumer->rd_consumer);
consumer->rd_consumer = NULL;
}

if (consumer->poller != NULL)
if (consumer->poller != NULL) {
destroy_consumer_poller(consumer->poller);
consumer->poller = NULL;
}

if (consumer->event_queues != NULL)
if (consumer->event_queues != NULL) {
destroy_event_queues(L, consumer->event_queues);
consumer->event_queues = NULL;
}

free(consumer);
}
Expand Down Expand Up @@ -650,8 +633,8 @@ lua_create_consumer(struct lua_State *L) {
lua_pop(L, 1);
if (brokers == NULL) {
lua_pushnil(L);
int fail = safe_pushstring(L, "consumer config table must have non nil key 'brokers' which contains string");
return fail ? lua_push_error(L): 2;
lua_pushstring(L, "consumer config table must have non nil key 'brokers' which contains string");
return 2;
}

char errstr[512];
Expand All @@ -667,16 +650,16 @@ lua_create_consumer(struct lua_State *L) {
// stack now contains: -1 => value; -2 => key; -3 => table
if (!(lua_isstring(L, -1)) || !(lua_isstring(L, -2))) {
lua_pushnil(L);
int fail = safe_pushstring(L, "consumer config default topic options must contains only string keys and string values");
return fail ? lua_push_error(L): 2;
lua_pushliteral(L, "consumer config default topic options must contains only string keys and string values");
return 2;
}

const char *value = lua_tostring(L, -1);
const char *key = lua_tostring(L, -2);
if (rd_kafka_topic_conf_set(topic_conf, key, value, errstr, sizeof(errstr))) {
lua_pushnil(L);
int fail = safe_pushstring(L, errstr);
return fail ? lua_push_error(L): 2;
lua_pushstring(L, errstr);
return 2;
}

// pop value, leaving original key
Expand Down Expand Up @@ -742,16 +725,16 @@ lua_create_consumer(struct lua_State *L) {
// stack now contains: -1 => value; -2 => key; -3 => table
if (!(lua_isstring(L, -1)) || !(lua_isstring(L, -2))) {
lua_pushnil(L);
int fail = safe_pushstring(L, "consumer config options must contains only string keys and string values");
return fail ? lua_push_error(L): 2;
lua_pushliteral(L, "consumer config options must contains only string keys and string values");
return 2;
}

const char *value = lua_tostring(L, -1);
const char *key = lua_tostring(L, -2);
if (rd_kafka_conf_set(rd_config, key, value, errstr, sizeof(errstr))) {
lua_pushnil(L);
int fail = safe_pushstring(L, errstr);
return fail ? lua_push_error(L): 2;
lua_pushstring(L, errstr);
return 2;
}

// pop value, leaving original key
Expand All @@ -765,14 +748,14 @@ lua_create_consumer(struct lua_State *L) {
rd_kafka_t *rd_consumer;
if (!(rd_consumer = rd_kafka_new(RD_KAFKA_CONSUMER, rd_config, errstr, sizeof(errstr)))) {
lua_pushnil(L);
int fail = safe_pushstring(L, errstr);
return fail ? lua_push_error(L): 2;
lua_pushstring(L, errstr);
return 2;
}

if (rd_kafka_brokers_add(rd_consumer, brokers) == 0) {
lua_pushnil(L);
int fail = safe_pushstring(L, "No valid brokers specified");
return fail ? lua_push_error(L): 2;
lua_pushliteral(L, "No valid brokers specified");
return 2;
}

rd_kafka_poll_set_consumer(rd_consumer);
Expand Down
Loading

0 comments on commit c52df71

Please sign in to comment.