Skip to content

Commit

Permalink
Fix deadlock for negative acknowledgment (#266)
Browse files Browse the repository at this point in the history
Fixes #265

### Modifications

Make `timer_` const and `enabledForTesting_` atomic in
`NegativeAcksTracker` so that `mutex_` is needed only to protect the
`nackedMessages_` field. After that, we can unlock `mutex_` in
`handleTimer` to avoid the potential deadlock caused by a user-provided
logger or interceptor.

Add `ConsumerTest.testNegativeAckDeadlock` to verify the fix.

(cherry picked from commit 8a9b2dc)
  • Loading branch information
BewareMyPower committed May 6, 2023
1 parent bb5c1fc commit 1dad87b
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 23 deletions.
37 changes: 17 additions & 20 deletions lib/NegativeAcksTracker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,7 @@ NegativeAcksTracker::NegativeAcksTracker(ClientImplPtr client, ConsumerImpl &con
const ConsumerConfiguration &conf)
: consumer_(consumer),
timerInterval_(0),
executor_(client->getIOExecutorProvider()->get()),
enabledForTesting_(true) {
timer_(client->getIOExecutorProvider()->get()->createDeadlineTimer()) {
static const long MIN_NACK_DELAY_MILLIS = 100;

nackDelay_ =
Expand All @@ -47,7 +46,9 @@ NegativeAcksTracker::NegativeAcksTracker(ClientImplPtr client, ConsumerImpl &con
}

void NegativeAcksTracker::scheduleTimer() {
timer_ = executor_->createDeadlineTimer();
if (closed_) {
return;
}
timer_->expires_from_now(timerInterval_);
timer_->async_wait(std::bind(&NegativeAcksTracker::handleTimer, this, std::placeholders::_1));
}
Expand All @@ -58,8 +59,7 @@ void NegativeAcksTracker::handleTimer(const boost::system::error_code &ec) {
return;
}

std::lock_guard<std::mutex> lock(mutex_);
timer_ = nullptr;
std::unique_lock<std::mutex> lock(mutex_);

if (nackedMessages_.empty() || !enabledForTesting_) {
return;
Expand All @@ -78,6 +78,7 @@ void NegativeAcksTracker::handleTimer(const boost::system::error_code &ec) {
++it;
}
}
lock.unlock();

if (!messagesToRedeliver.empty()) {
consumer_.onNegativeAcksSend(messagesToRedeliver);
Expand All @@ -87,34 +88,30 @@ void NegativeAcksTracker::handleTimer(const boost::system::error_code &ec) {
}

void NegativeAcksTracker::add(const MessageId &m) {
std::lock_guard<std::mutex> lock(mutex_);

auto msgId = discardBatch(m);
auto now = Clock::now();

// Erase batch id to group all nacks from same batch
nackedMessages_[discardBatch(m)] = now + nackDelay_;

if (!timer_) {
scheduleTimer();
{
std::lock_guard<std::mutex> lock{mutex_};
// Erase batch id to group all nacks from same batch
nackedMessages_[msgId] = now + nackDelay_;
}

scheduleTimer();
}

void NegativeAcksTracker::close() {
closed_ = true;
boost::system::error_code ec;
timer_->cancel(ec);
std::lock_guard<std::mutex> lock(mutex_);

if (timer_) {
boost::system::error_code ec;
timer_->cancel(ec);
}
timer_ = nullptr;
nackedMessages_.clear();
}

void NegativeAcksTracker::setEnabledForTesting(bool enabled) {
std::lock_guard<std::mutex> lock(mutex_);
enabledForTesting_ = enabled;

if (enabledForTesting_ && !timer_) {
if (enabledForTesting_) {
scheduleTimer();
}
}
Expand Down
7 changes: 4 additions & 3 deletions lib/NegativeAcksTracker.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <pulsar/ConsumerConfiguration.h>
#include <pulsar/MessageId.h>

#include <atomic>
#include <boost/asio/deadline_timer.hpp>
#include <chrono>
#include <map>
Expand Down Expand Up @@ -65,9 +66,9 @@ class NegativeAcksTracker {
typedef typename std::chrono::steady_clock Clock;
std::map<MessageId, Clock::time_point> nackedMessages_;

ExecutorServicePtr executor_;
DeadlineTimerPtr timer_;
bool enabledForTesting_; // to be able to test deterministically
const DeadlineTimerPtr timer_;
std::atomic_bool closed_{false};
std::atomic_bool enabledForTesting_{true}; // to be able to test deterministically

FRIEND_TEST(ConsumerTest, testNegativeAcksTrackerClose);
};
Expand Down
65 changes: 65 additions & 0 deletions tests/ConsumerTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@
#include <pulsar/Client.h>

#include <array>
#include <atomic>
#include <chrono>
#include <ctime>
#include <map>
#include <mutex>
#include <set>
#include <thread>
#include <vector>
Expand Down Expand Up @@ -1240,4 +1242,67 @@ TEST(ConsumerTest, testAckNotPersistentTopic) {

INSTANTIATE_TEST_CASE_P(Pulsar, ConsumerSeekTest, ::testing::Values(true, false));

// Consumer interceptor used by ConsumerTest.testNegativeAckDeadlock.
//
// Only onNegativeAcksSend does real work: it raises `duringNegativeAck_`, sleeps, then
// takes the shared static `mutex_` before logging and lowering the flag again. Both
// static members are public so the test body can poll the flag and hold the mutex.
class InterceptorForNegAckDeadlock : public ConsumerInterceptor {
   public:
    // Pass-through: the received message is returned unchanged.
    Message beforeConsume(const Consumer& /*consumer*/, const Message& message) override { return message; }

    // Intentionally empty.
    void onAcknowledge(const Consumer& /*consumer*/, Result /*result*/,
                       const MessageId& /*messageID*/) override {}

    // Intentionally empty.
    void onAcknowledgeCumulative(const Consumer& /*consumer*/, Result /*result*/,
                                 const MessageId& /*messageID*/) override {}

    // Marks the callback as "in progress", pauses for 500 ms, then logs under `mutex_`
    // and clears the "in progress" flag.
    void onNegativeAcksSend(const Consumer& consumer, const std::set<MessageId>& /*messageIds*/) override {
        duringNegativeAck_.store(true);

        // Wait for the next time Consumer::negativeAcknowledge is called
        const std::chrono::milliseconds pause{500};
        std::this_thread::sleep_for(pause);

        std::lock_guard<std::mutex> guard(mutex_);
        LOG_INFO("onNegativeAcksSend is called for " << consumer.getTopic());
        duringNegativeAck_.store(false);
    }

    static std::mutex mutex_;                    // taken by the callback and by the test body
    static std::atomic_bool duringNegativeAck_;  // true while onNegativeAcksSend is running
};

std::mutex InterceptorForNegAckDeadlock::mutex_;
std::atomic_bool InterceptorForNegAckDeadlock::duringNegativeAck_(false);

// For https://github.com/apache/pulsar-client-cpp/issues/265
//
// Scenario:
//   1. Negatively acknowledge a message so that the interceptor's onNegativeAcksSend
//      callback is eventually invoked (observed through `duringNegativeAck_`).
//   2. While that callback is still running, hold the interceptor's mutex and call
//      Consumer::negativeAcknowledge again from the test thread.
//   3. Verify the callback finishes, i.e. `duringNegativeAck_` goes back to false.
TEST(ConsumerTest, testNegativeAckDeadlock) {
    const std::string topic = "test-negative-ack-deadlock";
    Client client{lookupUrl};
    ConsumerConfiguration conf;
    conf.setNegativeAckRedeliveryDelayMs(500);
    conf.intercept({std::make_shared<InterceptorForNegAckDeadlock>()});
    Consumer consumer;
    ASSERT_EQ(ResultOk, client.subscribe(topic, "sub", conf, consumer));

    Producer producer;
    ASSERT_EQ(ResultOk, client.createProducer(topic, producer));
    // Check the publish result: if the send fails, the blocking receive() below would
    // otherwise wait forever instead of failing the test with a clear message.
    ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent("msg").build()));

    Message msg;
    ASSERT_EQ(ResultOk, consumer.receive(msg));

    auto& duringNegativeAck = InterceptorForNegAckDeadlock::duringNegativeAck_;
    duringNegativeAck = false;
    consumer.negativeAcknowledge(msg);  // schedule the negative ack timer
    // Wait until the negative ack timer is triggered and onNegativeAcksSend will be called
    // (polls every 10 ms, at most 100 times).
    for (int i = 0; !duringNegativeAck && i < 100; i++) {
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
    }
    ASSERT_TRUE(duringNegativeAck);

    {
        // Hold the mutex the interceptor callback needs while issuing another negative
        // ack; the lock is released at the end of this scope.
        std::lock_guard<std::mutex> lock{InterceptorForNegAckDeadlock::mutex_};
        consumer.negativeAcknowledge(msg);
    }
    // The callback must now be able to take the mutex and finish
    // (polls every 10 ms, at most 100 times).
    for (int i = 0; duringNegativeAck && i < 100; i++) {
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
    }
    ASSERT_FALSE(duringNegativeAck);

    client.close();
}

} // namespace pulsar

0 comments on commit 1dad87b

Please sign in to comment.