diff --git a/kafka/callbacks.c b/kafka/callbacks.c index 2570825..7777f08 100644 --- a/kafka/callbacks.c +++ b/kafka/callbacks.c @@ -301,17 +301,11 @@ rebalance_callback(rd_kafka_t *consumer, rd_kafka_resp_err_t err, rd_kafka_topic */ event_queues_t *new_event_queues() { - event_queues_t *event_queues = malloc(sizeof(event_queues_t)); - event_queues->error_queue = NULL; + event_queues_t *event_queues = calloc(1, sizeof(event_queues_t)); event_queues->error_cb_ref = LUA_REFNIL; - event_queues->log_queue = NULL; event_queues->log_cb_ref = LUA_REFNIL; - event_queues->stats_queue = NULL; event_queues->stats_cb_ref = LUA_REFNIL; - event_queues->delivery_queue = NULL; event_queues->rebalance_cb_ref = LUA_REFNIL; - event_queues->rebalance_queue = NULL; - event_queues->consume_queue = NULL; return event_queues; } diff --git a/kafka/common.c b/kafka/common.c index ce6b39f..db011ec 100644 --- a/kafka/common.c +++ b/kafka/common.c @@ -5,20 +5,6 @@ const char* const consumer_label = "__tnt_kafka_consumer"; const char* const consumer_msg_label = "__tnt_kafka_consumer_msg"; const char* const producer_label = "__tnt_kafka_producer"; -int -save_pushstring_wrapped(struct lua_State *L) { - char *str = (char *)lua_topointer(L, 1); - lua_pushstring(L, str); - return 1; -} - -int -safe_pushstring(struct lua_State *L, char *str) { - lua_pushcfunction(L, save_pushstring_wrapped); - lua_pushlightuserdata(L, str); - return lua_pcall(L, 1, 1, 0); -} - /** * Push native lua error with code -3 */ diff --git a/kafka/common.h b/kafka/common.h index 6d6518a..272a4d9 100644 --- a/kafka/common.h +++ b/kafka/common.h @@ -23,8 +23,6 @@ const char* const producer_label; int save_pushstring_wrapped(struct lua_State *L); -int safe_pushstring(struct lua_State *L, char *str); - int lua_librdkafka_version(struct lua_State *L); int lua_librdkafka_dump_conf(struct lua_State *L, rd_kafka_t *rk); diff --git a/kafka/consumer.c b/kafka/consumer.c index 456a963..95729e4 100644 --- a/kafka/consumer.c +++ b/kafka/consumer.c @@ -106,13 +106,15 @@ stop_poller(va_list args) { } void -destroy_consumer_poller(consumer_poller_t *poller) { +stop_consumer_poller(consumer_poller_t *poller) { // stopping polling thread coio_call(stop_poller, poller); +} +void +destroy_consumer_poller(consumer_poller_t *poller) { pthread_attr_destroy(&poller->attr); pthread_mutex_destroy(&poller->lock); - free(poller); } @@ -155,11 +157,8 @@ lua_consumer_subscribe(struct lua_State *L) { rd_kafka_resp_err_t err = rd_kafka_subscribe(consumer->rd_consumer, consumer->topics); if (err) { - const char *const_err_str = rd_kafka_err2str(err); - char err_str[512]; - strncpy(err_str, const_err_str, sizeof(err_str) - 1); - int fail = safe_pushstring(L, err_str); - return fail ? lua_push_error(L): 1; + lua_pushstring(L, rd_kafka_err2str(err)); + return 1; } return 0; @@ -193,20 +192,14 @@ lua_consumer_unsubscribe(struct lua_State *L) { if (consumer->topics->cnt > 0) { rd_kafka_resp_err_t err = rd_kafka_subscribe(consumer->rd_consumer, consumer->topics); if (err) { - const char *const_err_str = rd_kafka_err2str(err); - char err_str[512]; - strncpy(err_str, const_err_str, sizeof(err_str) - 1); - int fail = safe_pushstring(L, err_str); - return fail ? lua_push_error(L): 1; + lua_pushstring(L, rd_kafka_err2str(err)); + return 1; } } else { rd_kafka_resp_err_t err = rd_kafka_unsubscribe(consumer->rd_consumer); if (err) { - const char *const_err_str = rd_kafka_err2str(err); - char err_str[512]; - strncpy(err_str, const_err_str, sizeof(err_str) - 1); - int fail = safe_pushstring(L, err_str); - return fail ? lua_push_error(L): 1; + lua_pushstring(L, rd_kafka_err2str(err)); + return 1; } } @@ -257,10 +250,7 @@ lua_consumer_poll_logs(struct lua_State *L) { consumer_t *consumer = lua_check_consumer(L, 1); if (consumer->event_queues == NULL || consumer->event_queues->log_queue == NULL || consumer->event_queues->log_cb_ref == LUA_REFNIL) { lua_pushnumber(L, 0); - int fail = safe_pushstring(L, "Consumer poll logs error: callback for logs is not set"); - if (fail) { - return lua_push_error(L); - } + lua_pushliteral(L, "Consumer poll logs error: callback for logs is not set"); return 2; } @@ -291,10 +281,7 @@ lua_consumer_poll_logs(struct lua_State *L) { } lua_pushnumber(L, (double)count); if (err_str != NULL) { - int fail = safe_pushstring(L, err_str); - if (fail) { - return lua_push_error(L); - } + lua_pushstring(L, err_str); } else { lua_pushnil(L); } @@ -351,10 +338,7 @@ lua_consumer_poll_errors(struct lua_State *L) { consumer_t *consumer = lua_check_consumer(L, 1); if (consumer->event_queues == NULL || consumer->event_queues->error_queue == NULL || consumer->event_queues->error_cb_ref == LUA_REFNIL) { lua_pushnumber(L, 0); - int fail = safe_pushstring(L, "Consumer poll errors error: callback for logs is not set"); - if (fail) { - return lua_push_error(L); - } + lua_pushliteral(L, "Consumer poll errors error: callback for logs is not set"); return 2; } @@ -383,10 +367,7 @@ lua_consumer_poll_errors(struct lua_State *L) { } lua_pushnumber(L, (double)count); if (err_str != NULL) { - int fail = safe_pushstring(L, err_str); - if (fail) { - return lua_push_error(L); - } + lua_pushstring(L, err_str); } else { lua_pushnil(L); } @@ -481,10 +462,7 @@ lua_consumer_poll_rebalances(struct lua_State *L) { consumer_t *consumer = lua_check_consumer(L, 1); if (consumer->event_queues == NULL || consumer->event_queues->rebalance_queue == NULL || consumer->event_queues->rebalance_cb_ref == LUA_REFNIL) { lua_pushnumber(L, 0); - int fail = safe_pushstring(L, "Consumer poll rebalances error: callback for rebalance is not set"); - if (fail) { - return lua_push_error(L); - } + lua_pushliteral(L, "Consumer poll rebalances error: callback for rebalance is not set"); return 2; } @@ -527,10 +505,7 @@ lua_consumer_poll_rebalances(struct lua_State *L) { lua_pushnumber(L, (double)count); if (err_str != NULL) { - int fail = safe_pushstring(L, err_str); - if (fail) { - return lua_push_error(L); - } + lua_pushstring(L, err_str); } else { lua_pushnil(L); } @@ -545,11 +520,8 @@ lua_consumer_store_offset(struct lua_State *L) { msg_t *msg = lua_check_consumer_msg(L, 2); rd_kafka_resp_err_t err = rd_kafka_offset_store(msg->topic, msg->partition, msg->offset); if (err) { - const char *const_err_str = rd_kafka_err2str(err); - char err_str[512]; - strncpy(err_str, const_err_str, sizeof(err_str) - 1); - int fail = safe_pushstring(L, err_str); - return fail ? lua_push_error(L): 1; + lua_pushstring(L, rd_kafka_err2str(err)); + return 1; } return 0; } @@ -586,8 +558,14 @@ wait_consumer_destroy(va_list args) { void consumer_destroy(struct lua_State *L, consumer_t *consumer) { - if (consumer->topics != NULL) + if (consumer->rd_consumer != NULL) { + stop_consumer_poller(consumer->poller); + } + + if (consumer->topics != NULL) { rd_kafka_topic_partition_list_destroy(consumer->topics); + consumer->topics = NULL; + } /* * Here we close consumer and only then destroys other stuff. @@ -599,13 +577,18 @@ consumer_destroy(struct lua_State *L, consumer_t *consumer) { /* Destroy handle */ // FIXME: kafka_destroy hangs forever coio_call(wait_consumer_destroy, consumer->rd_consumer); + consumer->rd_consumer = NULL; } - if (consumer->poller != NULL) + if (consumer->poller != NULL) { destroy_consumer_poller(consumer->poller); + consumer->poller = NULL; + } - if (consumer->event_queues != NULL) + if (consumer->event_queues != NULL) { destroy_event_queues(L, consumer->event_queues); + consumer->event_queues = NULL; + } free(consumer); } @@ -650,8 +633,8 @@ lua_create_consumer(struct lua_State *L) { lua_pop(L, 1); if (brokers == NULL) { lua_pushnil(L); - int fail = safe_pushstring(L, "consumer config table must have non nil key 'brokers' which contains string"); - return fail ? lua_push_error(L): 2; + lua_pushstring(L, "consumer config table must have non nil key 'brokers' which contains string"); + return 2; } char errstr[512]; @@ -667,16 +650,16 @@ lua_create_consumer(struct lua_State *L) { // stack now contains: -1 => value; -2 => key; -3 => table if (!(lua_isstring(L, -1)) || !(lua_isstring(L, -2))) { lua_pushnil(L); - int fail = safe_pushstring(L, "consumer config default topic options must contains only string keys and string values"); - return fail ? lua_push_error(L): 2; + lua_pushliteral(L, "consumer config default topic options must contains only string keys and string values"); + return 2; } const char *value = lua_tostring(L, -1); const char *key = lua_tostring(L, -2); if (rd_kafka_topic_conf_set(topic_conf, key, value, errstr, sizeof(errstr))) { lua_pushnil(L); - int fail = safe_pushstring(L, errstr); - return fail ? lua_push_error(L): 2; + lua_pushstring(L, errstr); + return 2; } // pop value, leaving original key @@ -742,16 +725,16 @@ lua_create_consumer(struct lua_State *L) { // stack now contains: -1 => value; -2 => key; -3 => table if (!(lua_isstring(L, -1)) || !(lua_isstring(L, -2))) { lua_pushnil(L); - int fail = safe_pushstring(L, "consumer config options must contains only string keys and string values"); - return fail ? lua_push_error(L): 2; + lua_pushliteral(L, "consumer config options must contains only string keys and string values"); + return 2; } const char *value = lua_tostring(L, -1); const char *key = lua_tostring(L, -2); if (rd_kafka_conf_set(rd_config, key, value, errstr, sizeof(errstr))) { lua_pushnil(L); - int fail = safe_pushstring(L, errstr); - return fail ? lua_push_error(L): 2; + lua_pushstring(L, errstr); + return 2; } // pop value, leaving original key @@ -765,14 +748,14 @@ lua_create_consumer(struct lua_State *L) { rd_kafka_t *rd_consumer; if (!(rd_consumer = rd_kafka_new(RD_KAFKA_CONSUMER, rd_config, errstr, sizeof(errstr)))) { lua_pushnil(L); - int fail = safe_pushstring(L, errstr); - return fail ? lua_push_error(L): 2; + lua_pushstring(L, errstr); + return 2; } if (rd_kafka_brokers_add(rd_consumer, brokers) == 0) { lua_pushnil(L); - int fail = safe_pushstring(L, "No valid brokers specified"); - return fail ? lua_push_error(L): 2; + lua_pushliteral(L, "No valid brokers specified"); + return 2; } rd_kafka_poll_set_consumer(rd_consumer); diff --git a/kafka/producer.c b/kafka/producer.c index cedcc54..87b7ed8 100644 --- a/kafka/producer.c +++ b/kafka/producer.c @@ -204,10 +204,7 @@ lua_producer_msg_delivery_poll(struct lua_State *L) { lua_pushnumber(L, (double)callbacks_count); if (err_str != NULL) { - int fail = safe_pushstring(L, err_str); - if (fail) { - return lua_push_error(L); - } + lua_pushstring(L, err_str); } else { lua_pushnil(L); } @@ -222,10 +219,7 @@ lua_producer_poll_logs(struct lua_State *L) { producer_t *producer = lua_check_producer(L, 1); if (producer->event_queues == NULL || producer->event_queues->log_queue == NULL || producer->event_queues->log_cb_ref == LUA_REFNIL) { lua_pushnumber(L, 0); - int fail = safe_pushstring(L, "Producer poll logs error: callback for logs is not set"); - if (fail) { - return lua_push_error(L); - } + lua_pushliteral(L, "Producer poll logs error: callback for logs is not set"); return 2; } @@ -256,10 +250,7 @@ lua_producer_poll_logs(struct lua_State *L) { } lua_pushnumber(L, (double)count); if (err_str != NULL) { - int fail = safe_pushstring(L, err_str); - if (fail) { - return lua_push_error(L); - } + lua_pushstring(L, err_str); } else { lua_pushnil(L); } @@ -316,10 +307,7 @@ lua_producer_poll_errors(struct lua_State *L) { producer_t *producer = lua_check_producer(L, 1); if (producer->event_queues == NULL || producer->event_queues->error_queue == NULL || producer->event_queues->error_cb_ref == LUA_REFNIL) { lua_pushnumber(L, 0); - int fail = safe_pushstring(L, "Producer poll errors error: callback for logs is not set"); - if (fail) { - return lua_push_error(L); - } + lua_pushliteral(L, "Producer poll errors error: callback for logs is not set"); return 2; } @@ -348,10 +336,7 @@ lua_producer_poll_errors(struct lua_State *L) { } lua_pushnumber(L, (double)count); if (err_str != NULL) { - int fail = safe_pushstring(L, err_str); - if (fail) { - return lua_push_error(L); - } + lua_pushstring(L, err_str); } else { lua_pushnil(L); } @@ -368,8 +353,8 @@ lua_producer_produce(struct lua_State *L) { const char *topic = lua_tostring(L, -1); lua_pop(L, 1); if (topic == NULL) { - int fail = safe_pushstring(L, "producer message must contains non nil 'topic' key"); - return fail ? lua_push_error(L): 1; + lua_pushstring(L, "producer message must contains non nil 'topic' key"); + return 1; } lua_pushstring(L, "key"); @@ -389,8 +374,8 @@ lua_producer_produce(struct lua_State *L) { lua_pop(L, 1); if (key == NULL && value == NULL) { - int fail = safe_pushstring(L, "producer message must contains non nil key or value"); - return fail ? lua_push_error(L): 1; + lua_pushliteral(L, "producer message must contains non nil key or value"); + return 1; } // create delivery callback queue if got msg id @@ -400,8 +385,8 @@ lua_producer_produce(struct lua_State *L) { if (lua_isfunction(L, -1)) { dr_msg = new_dr_msg(luaL_ref(L, LUA_REGISTRYINDEX), RD_KAFKA_RESP_ERR_NO_ERROR); if (dr_msg == NULL) { - int fail = safe_pushstring(L, "failed to create callback message"); - return fail ? lua_push_error(L): 1; + lua_pushliteral(L, "failed to create callback message"); + return 1; } } else { lua_pop(L, 1); @@ -415,24 +400,18 @@ lua_producer_produce(struct lua_State *L) { if (rd_topic == NULL) { rd_topic = rd_kafka_topic_new(producer->rd_producer, topic, NULL); if (rd_topic == NULL) { - const char *const_err_str = rd_kafka_err2str(rd_kafka_errno2err(errno)); - char err_str[512]; - strncpy(err_str, const_err_str, sizeof(err_str) - 1); - int fail = safe_pushstring(L, err_str); - return fail ? lua_push_error(L): 1; + lua_pushstring(L, rd_kafka_err2str(rd_kafka_errno2err(errno))); + return 1; } if (add_producer_topics(producer->topics, rd_topic) != 0) { - int fail = safe_pushstring(L, "Unexpected error: failed to add new topic to topic list!"); - return fail ? lua_push_error(L): 1; + lua_pushstring(L, "Unexpected error: failed to add new topic to topic list!"); + return 1; } } if (rd_kafka_produce(rd_topic, -1, RD_KAFKA_MSG_F_COPY, value, value_len, key, key_len, dr_msg) == -1) { - const char *const_err_str = rd_kafka_err2str(rd_kafka_errno2err(errno)); - char err_str[512]; - strncpy(err_str, const_err_str, sizeof(err_str) - 1); - int fail = safe_pushstring(L, err_str); - return fail ? lua_push_error(L): 1; + lua_pushstring(L, rd_kafka_err2str(rd_kafka_errno2err(errno))); + return 1; } return 0; } @@ -459,8 +438,10 @@ wait_producer_destroy(va_list args) { void destroy_producer(struct lua_State *L, producer_t *producer) { - if (producer->topics != NULL) + if (producer->topics != NULL) { destroy_producer_topics(producer->topics); + producer->topics = NULL; + } /* * Here we close producer and only then destroys other stuff. @@ -471,13 +452,18 @@ destroy_producer(struct lua_State *L, producer_t *producer) { if (producer->rd_producer != NULL) { /* Destroy handle */ coio_call(wait_producer_destroy, producer->rd_producer); + producer->rd_producer = NULL; } - if (producer->poller != NULL) + if (producer->poller != NULL) { destroy_producer_poller(producer->poller); + producer->poller = NULL; + } - if (producer->event_queues != NULL) + if (producer->event_queues != NULL) { destroy_event_queues(L, producer->event_queues); + producer->event_queues = NULL; + } free(producer); } @@ -536,8 +522,8 @@ lua_create_producer(struct lua_State *L) { lua_pop(L, 1); if (brokers == NULL) { lua_pushnil(L); - int fail = safe_pushstring(L, "producer config table must have non nil key 'brokers' which contains string"); - return fail ? lua_push_error(L): 2; + lua_pushliteral(L, "producer config table must have non nil key 'brokers' which contains string"); + return 2; } char errstr[512]; @@ -554,16 +540,16 @@ lua_create_producer(struct lua_State *L) { // stack now contains: -1 => value; -2 => key; -3 => table if (!(lua_isstring(L, -1)) || !(lua_isstring(L, -2))) { lua_pushnil(L); - int fail = safe_pushstring(L, "producer config default topic options must contains only string keys and string values"); - return fail ? lua_push_error(L): 2; + lua_pushliteral(L, "producer config default topic options must contains only string keys and string values"); + return 2; } const char *value = lua_tostring(L, -1); const char *key = lua_tostring(L, -2); if (rd_kafka_topic_conf_set(topic_conf, key, value, errstr, sizeof(errstr))) { lua_pushnil(L); - int fail = safe_pushstring(L, errstr); - return fail ? lua_push_error(L): 2; + lua_pushstring(L, errstr); + return 2; } // pop value, leaving original key @@ -620,16 +606,16 @@ lua_create_producer(struct lua_State *L) { // stack now contains: -1 => value; -2 => key; -3 => table if (!(lua_isstring(L, -1)) || !(lua_isstring(L, -2))) { lua_pushnil(L); - int fail = safe_pushstring(L, "producer config options must contains only string keys and string values"); - return fail ? lua_push_error(L): 2; + lua_pushliteral(L, "producer config options must contains only string keys and string values"); + return 2; } const char *value = lua_tostring(L, -1); const char *key = lua_tostring(L, -2); if (rd_kafka_conf_set(rd_config, key, value, errstr, sizeof(errstr))) { lua_pushnil(L); - int fail = safe_pushstring(L, errstr); - return fail ? lua_push_error(L): 2; + lua_pushstring(L, errstr); + return 2; } // pop value, leaving original key @@ -643,14 +629,14 @@ lua_create_producer(struct lua_State *L) { rd_kafka_t *rd_producer; if (!(rd_producer = rd_kafka_new(RD_KAFKA_PRODUCER, rd_config, errstr, sizeof(errstr)))) { lua_pushnil(L); - int fail = safe_pushstring(L, errstr); - return fail ? lua_push_error(L): 2; + lua_pushstring(L, errstr); + return 2; } if (rd_kafka_brokers_add(rd_producer, brokers) == 0) { lua_pushnil(L); - int fail = safe_pushstring(L, "No valid brokers specified"); - return fail ? lua_push_error(L): 2; + lua_pushliteral(L, "No valid brokers specified"); + return 2; } // creating background thread for polling consumer