Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

QPID-8538: Replenish the credit after drain by the drained amount of messages #27

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 36 additions & 17 deletions src/qpid/messaging/amqp/ConnectionContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@ void ConnectionContext::close()

bool ConnectionContext::fetch(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk, qpid::messaging::Message& message, qpid::messaging::Duration timeout)
{
QPID_LOG(debug, "Starting fetch credit='" << pn_link_credit(lnk->receiver) << "', capcacity='"<< lnk->capacity << "', queued='" << pn_link_queued(lnk->receiver) << "'");
/**
* For fetch() on a receiver with zero capacity, need to reissue the
* credit on reconnect, so track the fetches in progress.
Expand All @@ -259,27 +260,41 @@ bool ConnectionContext::fetch(boost::shared_ptr<SessionContext> ssn, boost::shar
wakeupDriver();
}
}
if (get(ssn, lnk, message, timeout)) {

if (get(ssn, lnk, message, timeout))
return true;
} else {

{
sys::Monitor::ScopedLock l(lock);
pn_link_drain(lnk->receiver, 0);
wakeupDriver();

for (;;)
{
sys::Monitor::ScopedLock l(lock);
pn_link_drain(lnk->receiver, 0);
wakeupDriver();
while (pn_link_draining(lnk->receiver) && !pn_link_queued(lnk->receiver)) {
QPID_LOG(debug, "Waiting for message or for credit to be drained: credit=" << pn_link_credit(lnk->receiver) << ", queued=" << pn_link_queued(lnk->receiver));
bool draning = pn_link_draining(lnk->receiver);
auto queued = pn_link_queued(lnk->receiver);

if (draning && queued == 0)
{
QPID_LOG(debug, "Waiting for message or for credit to be drained: credit=" << pn_link_credit(lnk->receiver) << ", queued='" << queued << "'");
wait(ssn, lnk);
continue;
}
if (lnk->capacity && pn_link_queued(lnk->receiver) == 0) {
pn_link_flow(lnk->receiver, lnk->capacity);
}
QPID_LOG(debug, "Finished waiting credit='" << pn_link_credit(lnk->receiver) << "', capcacity='"<< lnk->capacity << "', queued='" << queued << "', draining='" << draning << "'");
break;
}
if (get(ssn, lnk, message, qpid::messaging::Duration::IMMEDIATE)) {
return true;
} else {
return false;

if (lnk->capacity)
{
auto drained = pn_link_drained(lnk->receiver);
QPID_LOG(debug, "Before replenishing credit='" << pn_link_credit(lnk->receiver) << "', capcacity='" << lnk->capacity << "', queued='" << pn_link_queued(lnk->receiver) << "' drained='" << drained << "'");
pn_link_flow(lnk->receiver, drained);
wakeupDriver();
QPID_LOG(debug, "After replenishing credit='" << pn_link_credit(lnk->receiver) << "', capcacity='" << lnk->capacity << "', queued='" << pn_link_queued(lnk->receiver) << "'");
}
}

return get(ssn, lnk, message, qpid::messaging::Duration::IMMEDIATE);
}

qpid::sys::AbsTime convert(qpid::messaging::Duration timeout)
Expand All @@ -295,6 +310,7 @@ qpid::sys::AbsTime convert(qpid::messaging::Duration timeout)

bool ConnectionContext::get(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk, qpid::messaging::Message& message, qpid::messaging::Duration timeout)
{
bool res = false;
qpid::sys::AbsTime until(convert(timeout));
while (true) {
sys::Monitor::ScopedLock l(lock);
Expand Down Expand Up @@ -323,14 +339,17 @@ bool ConnectionContext::get(boost::shared_ptr<SessionContext> ssn, boost::shared
// Automatically ack messages if we are in a transaction.
if (ssn->transaction)
acknowledgeLH(ssn, &message, false, l);
return true;
res = true;
break;

} else if (until > qpid::sys::now()) {
waitUntil(ssn, lnk, until);
} else {
return false;
break;
}
}
return false;
QPID_LOG(debug, "Get message res='" << (res ? "success" : "no") << "', credit='" << pn_link_credit(lnk->receiver) << "', capcacity='" << lnk->capacity << "', queued='" << pn_link_queued(lnk->receiver) << "'");
return res;
}

boost::shared_ptr<ReceiverContext> ConnectionContext::nextReceiver(boost::shared_ptr<SessionContext> ssn, qpid::messaging::Duration timeout)
Expand Down