diff --git a/src/qpid/messaging/amqp/ConnectionContext.cpp b/src/qpid/messaging/amqp/ConnectionContext.cpp index 989445980..82734e7d0 100644 --- a/src/qpid/messaging/amqp/ConnectionContext.cpp +++ b/src/qpid/messaging/amqp/ConnectionContext.cpp @@ -246,6 +246,7 @@ void ConnectionContext::close() bool ConnectionContext::fetch(boost::shared_ptr ssn, boost::shared_ptr lnk, qpid::messaging::Message& message, qpid::messaging::Duration timeout) { + QPID_LOG(debug, "Starting fetch credit='" << pn_link_credit(lnk->receiver) << "', capcacity='"<< lnk->capacity << "', queued='" << pn_link_queued(lnk->receiver) << "'"); /** * For fetch() on a receiver with zero capacity, need to reissue the * credit on reconnect, so track the fetches in progress. @@ -259,27 +260,41 @@ bool ConnectionContext::fetch(boost::shared_ptr ssn, boost::shar wakeupDriver(); } } - if (get(ssn, lnk, message, timeout)) { + + if (get(ssn, lnk, message, timeout)) return true; - } else { + + { + sys::Monitor::ScopedLock l(lock); + pn_link_drain(lnk->receiver, 0); + wakeupDriver(); + + for (;;) { - sys::Monitor::ScopedLock l(lock); - pn_link_drain(lnk->receiver, 0); - wakeupDriver(); - while (pn_link_draining(lnk->receiver) && !pn_link_queued(lnk->receiver)) { - QPID_LOG(debug, "Waiting for message or for credit to be drained: credit=" << pn_link_credit(lnk->receiver) << ", queued=" << pn_link_queued(lnk->receiver)); + bool draning = pn_link_draining(lnk->receiver); + auto queued = pn_link_queued(lnk->receiver); + + if (draning && queued == 0) + { + QPID_LOG(debug, "Waiting for message or for credit to be drained: credit=" << pn_link_credit(lnk->receiver) << ", queued='" << queued << "'"); wait(ssn, lnk); + continue; } - if (lnk->capacity && pn_link_queued(lnk->receiver) == 0) { - pn_link_flow(lnk->receiver, lnk->capacity); - } + QPID_LOG(debug, "Finished waiting credit='" << pn_link_credit(lnk->receiver) << "', capcacity='"<< lnk->capacity << "', queued='" << queued << "', draining='" << draning << "'"); + break; } - if (get(ssn, lnk, message, qpid::messaging::Duration::IMMEDIATE)) { - return true; - } else { - return false; + + if (lnk->capacity) + { + auto drained = pn_link_drained(lnk->receiver); + QPID_LOG(debug, "Before replenishing credit='" << pn_link_credit(lnk->receiver) << "', capcacity='" << lnk->capacity << "', queued='" << pn_link_queued(lnk->receiver) << "' drained='" << drained << "'"); + pn_link_flow(lnk->receiver, drained); + wakeupDriver(); + QPID_LOG(debug, "After replenishing credit='" << pn_link_credit(lnk->receiver) << "', capcacity='" << lnk->capacity << "', queued='" << pn_link_queued(lnk->receiver) << "'"); } } + + return get(ssn, lnk, message, qpid::messaging::Duration::IMMEDIATE); } qpid::sys::AbsTime convert(qpid::messaging::Duration timeout) @@ -295,6 +310,7 @@ qpid::sys::AbsTime convert(qpid::messaging::Duration timeout) bool ConnectionContext::get(boost::shared_ptr ssn, boost::shared_ptr lnk, qpid::messaging::Message& message, qpid::messaging::Duration timeout) { + bool res = false; qpid::sys::AbsTime until(convert(timeout)); while (true) { sys::Monitor::ScopedLock l(lock); @@ -323,14 +339,17 @@ bool ConnectionContext::get(boost::shared_ptr ssn, boost::shared // Automatically ack messages if we are in a transaction. if (ssn->transaction) acknowledgeLH(ssn, &message, false, l); - return true; + res = true; + break; + } else if (until > qpid::sys::now()) { waitUntil(ssn, lnk, until); } else { - return false; + break; } } - return false; + QPID_LOG(debug, "Get message res='" << (res ? "success" : "no") << "', credit='" << pn_link_credit(lnk->receiver) << "', capcacity='" << lnk->capacity << "', queued='" << pn_link_queued(lnk->receiver) << "'"); + return res; } boost::shared_ptr ConnectionContext::nextReceiver(boost::shared_ptr ssn, qpid::messaging::Duration timeout)