diff --git a/src/client/WFRedisSubscriber.h b/src/client/WFRedisSubscriber.h index f17d09a354..c16ca60837 100644 --- a/src/client/WFRedisSubscriber.h +++ b/src/client/WFRedisSubscriber.h @@ -93,6 +93,11 @@ class WFRedisSubscribeTask : public WFGenericTask return this->sync_send("PING", { }); } + int quit() + { + return this->sync_send("QUIT", { }); + } + public: /* All 'timeout' proxy functions can only be called only before the task is started or in 'extract'. */ diff --git a/src/factory/RedisTaskImpl.cc b/src/factory/RedisTaskImpl.cc index 5004a1b8e5..b32d339fdf 100644 --- a/src/factory/RedisTaskImpl.cc +++ b/src/factory/RedisTaskImpl.cc @@ -72,6 +72,7 @@ bool ComplexRedisTask::check_request() if (this->req.get_command(command) && (strcasecmp(command.c_str(), "AUTH") == 0 || strcasecmp(command.c_str(), "SELECT") == 0 || + strcasecmp(command.c_str(), "RESET") == 0 || strcasecmp(command.c_str(), "ASKING") == 0)) { this->state = WFT_STATE_TASK_ERROR; @@ -368,26 +369,17 @@ ComplexRedisSubscribeTask::SubscribeWrapper::next_in(ProtocolMessage *message) { redis_reply_t *reply = ((RedisResponse *)message)->result_ptr(); - if (reply->type == REDIS_REPLY_TYPE_ARRAY && reply->elements == 3 && - reply->element[0]->type == REDIS_REPLY_TYPE_STRING) + if (reply->type != REDIS_REPLY_TYPE_ARRAY) { - const char *str = reply->element[0]->str; - size_t len = reply->element[0]->len; - - if ((len == 11 && strncasecmp(str, "unsubscribe", 11)) == 0 || - (len == 12 && strncasecmp(str, "punsubscribe", 12) == 0)) - { - if (reply->element[2]->type == REDIS_REPLY_TYPE_INTEGER && - reply->element[2]->integer == 0) - { - task_->finished_ = true; - } - } + task_->finished_ = true; + return NULL; } - else if (!task_->watching_) + + if (reply->elements == 3 && + reply->element[2]->type == REDIS_REPLY_TYPE_INTEGER && + reply->element[2]->integer == 0) { task_->finished_ = true; - return NULL; } task_->watching_ = true;