diff --git a/CHANGELOG.md b/CHANGELOG.md index 44ff9e6990..213dc349ff 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ ### NEXT +- Make transport-cc feedback work similarly to libwebrtc ([PR #1088](https://github.com/versatica/mediasoup/pull/1088) by @penguinol). - `TransportListenInfo`: announced ip can also be a hostname ([PR #1322](https://github.com/versatica/mediasoup/pull/1322)). ### 3.13.17 diff --git a/worker/include/RTC/RTCP/FeedbackRtpTransport.hpp b/worker/include/RTC/RTCP/FeedbackRtpTransport.hpp index cfbbf0c276..901df0c46c 100644 --- a/worker/include/RTC/RTCP/FeedbackRtpTransport.hpp +++ b/worker/include/RTC/RTCP/FeedbackRtpTransport.hpp @@ -212,6 +212,7 @@ namespace RTC ~FeedbackRtpTransportPacket() override; public: + void SetBase(uint16_t sequenceNumber, uint64_t timestamp); AddPacketResult AddPacket(uint16_t sequenceNumber, uint64_t timestamp, size_t maxRtcpPacketLen); // Just for locally generated packets. void Finish(); @@ -317,6 +318,8 @@ namespace RTC void AddPendingChunks(); private: + // Whether baseSequenceNumber has been set. + bool baseSet{ false }; uint16_t baseSequenceNumber{ 0u }; // 24 bits signed integer. int32_t referenceTime{ 0 }; diff --git a/worker/include/RTC/TransportCongestionControlServer.hpp b/worker/include/RTC/TransportCongestionControlServer.hpp index d25483c90e..5cfd676796 100644 --- a/worker/include/RTC/TransportCongestionControlServer.hpp +++ b/worker/include/RTC/TransportCongestionControlServer.hpp @@ -6,6 +6,7 @@ #include "RTC/RTCP/FeedbackRtpTransport.hpp" #include "RTC/RTCP/Packet.hpp" #include "RTC/RtpPacket.hpp" +#include "RTC/SeqManager.hpp" #include "handles/TimerHandle.hpp" #include #include @@ -54,10 +55,12 @@ namespace RTC double GetPacketLoss() const; void IncomingPacket(uint64_t nowMs, const RTC::RtpPacket* packet); void SetMaxIncomingBitrate(uint32_t bitrate); + void FillAndSendTransportCcFeedback(); private: void SendTransportCcFeedback(); - void MaySendLimitationRembFeedback(); + void MayDropOldPacketArrivalTimes(uint16_t seqNum, uint64_t nowMs); + void MaySendLimitationRembFeedback(uint64_t nowMs); void UpdatePacketLoss(double packetLoss); /* Pure virtual methods inherited from webrtc::RemoteBitrateEstimator::Listener. */ @@ -89,6 +92,10 @@ namespace RTC uint8_t unlimitedRembCounter{ 0u }; std::deque packetLossHistory; double packetLoss{ 0 }; + // Whether any packet with transport wide sequence number was received. + bool transportWideSeqNumberReceived{ false }; + uint16_t transportCcFeedbackWideSeqNumStart{ 0u }; + std::map::SeqLowerThan> mapPacketArrivalTimes; }; } // namespace RTC diff --git a/worker/meson.build b/worker/meson.build index e2c294ce9f..77125be6e0 100644 --- a/worker/meson.build +++ b/worker/meson.build @@ -335,6 +335,7 @@ test_sources = [ 'test/src/RTC/TestSeqManager.cpp', 'test/src/RTC/TestTrendCalculator.cpp', 'test/src/RTC/TestRtpEncodingParameters.cpp', + 'test/src/RTC/TestTransportCongestionControlServer.cpp', 'test/src/RTC/Codecs/TestVP8.cpp', 'test/src/RTC/Codecs/TestVP9.cpp', 'test/src/RTC/Codecs/TestH264.cpp', diff --git a/worker/src/RTC/RTCP/FeedbackRtpTransport.cpp b/worker/src/RTC/RTCP/FeedbackRtpTransport.cpp index 0ec398cf12..21f09c9e8f 100644 --- a/worker/src/RTC/RTCP/FeedbackRtpTransport.cpp +++ b/worker/src/RTC/RTCP/FeedbackRtpTransport.cpp @@ -278,24 +278,25 @@ namespace RTC return offset; } + void FeedbackRtpTransportPacket::SetBase(uint16_t sequenceNumber, uint64_t timestamp) + { + MS_TRACE(); + + this->baseSet = true; + this->baseSequenceNumber = sequenceNumber; + this->referenceTime = static_cast((timestamp & 0x1FFFFFC0) / 64); + this->latestSequenceNumber = sequenceNumber - 1; + this->latestTimestamp = (timestamp >> 6) * 64; // IMPORTANT: Loose precision. + } + FeedbackRtpTransportPacket::AddPacketResult FeedbackRtpTransportPacket::AddPacket( uint16_t sequenceNumber, uint64_t timestamp, size_t maxRtcpPacketLen) { MS_TRACE(); + MS_ASSERT(baseSet, "base not set"); MS_ASSERT(!IsFull(), "packet is full"); - // Let's see if we must set our base. - if (this->latestTimestamp == 0u) - { - this->baseSequenceNumber = sequenceNumber + 1; - this->referenceTime = static_cast((timestamp & 0x1FFFFFC0) / 64); - this->latestSequenceNumber = sequenceNumber; - this->latestTimestamp = (timestamp >> 6) * 64; // IMPORTANT: Loose precision. - - return AddPacketResult::SUCCESS; - } - // If the wide sequence number of the new packet is lower than the latest seen, // ignore it. // NOTE: Not very spec compliant but libwebrtc does it. diff --git a/worker/src/RTC/TransportCongestionControlServer.cpp b/worker/src/RTC/TransportCongestionControlServer.cpp index 98ed26a575..6fe639e175 100644 --- a/worker/src/RTC/TransportCongestionControlServer.cpp +++ b/worker/src/RTC/TransportCongestionControlServer.cpp @@ -14,6 +14,7 @@ namespace RTC static constexpr uint64_t TransportCcFeedbackSendInterval{ 100u }; // In ms. static constexpr uint64_t LimitationRembInterval{ 1500u }; // In ms. + static constexpr uint64_t PacketArrivalTimestampWindow{ 500u }; // In ms. static constexpr uint8_t UnlimitedRembNumPackets{ 4u }; static constexpr size_t PacketLossHistogramLength{ 24 }; @@ -124,61 +125,32 @@ namespace RTC break; } - // Update the RTCP media SSRC of the ongoing Transport-CC Feedback packet. - this->transportCcFeedbackSenderSsrc = 0u; - this->transportCcFeedbackMediaSsrc = packet->GetSsrc(); - - this->transportCcFeedbackPacket->SetSenderSsrc(0u); - this->transportCcFeedbackPacket->SetMediaSsrc(this->transportCcFeedbackMediaSsrc); - - // Provide the feedback packet with the RTP packet info. If it fails, - // send current feedback and add the packet info to a new one. - auto result = - this->transportCcFeedbackPacket->AddPacket(wideSeqNumber, nowMs, this->maxRtcpPacketLen); - - switch (result) + // Only insert the packet when receiving it for the first time. + if (!this->mapPacketArrivalTimes.try_emplace(wideSeqNumber, nowMs).second) { - case RTC::RTCP::FeedbackRtpTransportPacket::AddPacketResult::SUCCESS: - { - // If the feedback packet is full, send it now. - if (this->transportCcFeedbackPacket->IsFull()) - { - MS_DEBUG_DEV("transport-cc feedback packet is full, sending feedback now"); - - SendTransportCcFeedback(); - } - - break; - } - - case RTC::RTCP::FeedbackRtpTransportPacket::AddPacketResult::MAX_SIZE_EXCEEDED: - { - // Send ongoing feedback packet and add the new packet info to the - // regenerated one. - SendTransportCcFeedback(); + break; + } - this->transportCcFeedbackPacket->AddPacket(wideSeqNumber, nowMs, this->maxRtcpPacketLen); + // We may receive packets with sequence number lower than the one in previous + // tcc feedback, these packets may have been reported as lost previously, + // therefore we need to reset the start sequence num for the next tcc feedback. + if ( + !this->transportWideSeqNumberReceived || + RTC::SeqManager::IsSeqLowerThan( + wideSeqNumber, this->transportCcFeedbackWideSeqNumStart)) + { + this->transportCcFeedbackWideSeqNumStart = wideSeqNumber; + } - break; - } + this->transportWideSeqNumberReceived = true; - case RTC::RTCP::FeedbackRtpTransportPacket::AddPacketResult::FATAL: - { - // Create a new feedback packet. - this->transportCcFeedbackPacket.reset(new RTC::RTCP::FeedbackRtpTransportPacket( - this->transportCcFeedbackSenderSsrc, this->transportCcFeedbackMediaSsrc)); + MayDropOldPacketArrivalTimes(wideSeqNumber, nowMs); - // Use current packet count. - // NOTE: Do not increment it since the previous ongoing feedback - // packet was not sent. - this->transportCcFeedbackPacket->SetFeedbackPacketCount( - this->transportCcFeedbackPacketCount); - - break; - } - } + // Update the RTCP media SSRC of the ongoing Transport-CC Feedback packet. + this->transportCcFeedbackSenderSsrc = 0u; + this->transportCcFeedbackMediaSsrc = packet->GetSsrc(); - MaySendLimitationRembFeedback(); + MaySendLimitationRembFeedback(nowMs); break; } @@ -204,6 +176,75 @@ namespace RTC } } + void TransportCongestionControlServer::FillAndSendTransportCcFeedback() + { + MS_TRACE(); + + if (!this->transportWideSeqNumberReceived) + { + return; + } + + auto it = this->mapPacketArrivalTimes.lower_bound(this->transportCcFeedbackWideSeqNumStart); + + if (it == this->mapPacketArrivalTimes.end()) + { + return; + } + + // Set base sequence num and reference time. + this->transportCcFeedbackPacket->SetBase(this->transportCcFeedbackWideSeqNumStart, it->second); + + for (; it != this->mapPacketArrivalTimes.end(); ++it) + { + auto result = + this->transportCcFeedbackPacket->AddPacket(it->first, it->second, this->maxRtcpPacketLen); + + switch (result) + { + case RTC::RTCP::FeedbackRtpTransportPacket::AddPacketResult::SUCCESS: + { + // If the feedback packet is full, send it now. + if (this->transportCcFeedbackPacket->IsFull()) + { + MS_DEBUG_DEV("transport-cc feedback packet is full, sending feedback now"); + + SendTransportCcFeedback(); + } + + break; + } + + case RTC::RTCP::FeedbackRtpTransportPacket::AddPacketResult::MAX_SIZE_EXCEEDED: + { + // This should not happen. + MS_WARN_DEV("transport-cc feedback packet is exceeded"); + + // Create a new feedback packet. + this->transportCcFeedbackPacket.reset(new RTC::RTCP::FeedbackRtpTransportPacket( + this->transportCcFeedbackSenderSsrc, this->transportCcFeedbackMediaSsrc)); + } + + case RTC::RTCP::FeedbackRtpTransportPacket::AddPacketResult::FATAL: + { + // Create a new feedback packet. + this->transportCcFeedbackPacket.reset(new RTC::RTCP::FeedbackRtpTransportPacket( + this->transportCcFeedbackSenderSsrc, this->transportCcFeedbackMediaSsrc)); + + // Use current packet count. + // NOTE: Do not increment it since the previous ongoing feedback + // packet was not sent. + this->transportCcFeedbackPacket->SetFeedbackPacketCount( + this->transportCcFeedbackPacketCount); + + break; + } + } + } + + SendTransportCcFeedback(); + } + void TransportCongestionControlServer::SetMaxIncomingBitrate(uint32_t bitrate) { MS_TRACE(); @@ -217,7 +258,9 @@ namespace RTC // This is to ensure that we send N REMB packets with bitrate 0 (unlimited). this->unlimitedRembCounter = UnlimitedRembNumPackets; - MaySendLimitationRembFeedback(); + auto nowMs = DepLibUV::GetTimeMs(); + + MaySendLimitationRembFeedback(nowMs); } } @@ -233,7 +276,6 @@ namespace RTC } auto latestWideSeqNumber = this->transportCcFeedbackPacket->GetLatestSequenceNumber(); - auto latestTimestamp = this->transportCcFeedbackPacket->GetLatestTimestamp(); // Notify the listener. this->listener->OnTransportCongestionControlServerSendRtcpPacket( @@ -262,21 +304,36 @@ namespace RTC // Increment packet count. this->transportCcFeedbackPacket->SetFeedbackPacketCount(++this->transportCcFeedbackPacketCount); + this->transportCcFeedbackWideSeqNumStart = latestWideSeqNumber + 1; + } + + inline void TransportCongestionControlServer::MayDropOldPacketArrivalTimes( + uint16_t seqNum, uint64_t nowMs) + { + MS_TRACE(); - // Pass the latest packet info (if any) as pre base for the new feedback packet. - if (latestTimestamp > 0u) + // Ignore nowMs value if it's smaller than PacketArrivalTimestampWindow in + // order to avoid negative values (should never happen) and return early if + // the condition is met. + if (nowMs >= PacketArrivalTimestampWindow) { - this->transportCcFeedbackPacket->AddPacket( - latestWideSeqNumber, latestTimestamp, this->maxRtcpPacketLen); + auto expiryTimestamp = nowMs - PacketArrivalTimestampWindow; + auto it = this->mapPacketArrivalTimes.begin(); + + while (it != this->mapPacketArrivalTimes.end() && + it->first != this->transportCcFeedbackWideSeqNumStart && + RTC::SeqManager::IsSeqLowerThan(it->first, seqNum) && + it->second <= expiryTimestamp) + { + it = this->mapPacketArrivalTimes.erase(it); + } } } - inline void TransportCongestionControlServer::MaySendLimitationRembFeedback() + inline void TransportCongestionControlServer::MaySendLimitationRembFeedback(uint64_t nowMs) { MS_TRACE(); - auto nowMs = DepLibUV::GetTimeMs(); - // May fix unlimitedRembCounter. if (this->unlimitedRembCounter > 0u && this->maxIncomingBitrate != 0u) { @@ -389,7 +446,7 @@ namespace RTC if (timer == this->transportCcFeedbackSendPeriodicTimer) { - SendTransportCcFeedback(); + FillAndSendTransportCcFeedback(); } } } // namespace RTC diff --git a/worker/test/src/RTC/RTCP/TestFeedbackRtpTransport.cpp b/worker/test/src/RTC/RTCP/TestFeedbackRtpTransport.cpp index e34c99ae8e..0204f81480 100644 --- a/worker/test/src/RTC/RTCP/TestFeedbackRtpTransport.cpp +++ b/worker/test/src/RTC/RTCP/TestFeedbackRtpTransport.cpp @@ -98,7 +98,14 @@ SCENARIO("RTCP Feeback RTP transport", "[parser][rtcp][feedback-rtp][transport]" for (auto& input : inputs) { - packet->AddPacket(input.sequenceNumber, input.timestamp, input.maxPacketSize); + if (std::addressof(input) == std::addressof(inputs.front())) + { + packet->SetBase(input.sequenceNumber + 1, input.timestamp); + } + else + { + packet->AddPacket(input.sequenceNumber, input.timestamp, input.maxPacketSize); + } } REQUIRE(packet->GetLatestSequenceNumber() == 1013); @@ -173,7 +180,14 @@ SCENARIO("RTCP Feeback RTP transport", "[parser][rtcp][feedback-rtp][transport]" for (auto& input : inputs) { - packet->AddPacket(input.sequenceNumber, input.timestamp, input.maxPacketSize); + if (std::addressof(input) == std::addressof(inputs.front())) + { + packet->SetBase(input.sequenceNumber + 1, input.timestamp); + } + else + { + packet->AddPacket(input.sequenceNumber, input.timestamp, input.maxPacketSize); + } } packet->Finish(); @@ -238,7 +252,14 @@ SCENARIO("RTCP Feeback RTP transport", "[parser][rtcp][feedback-rtp][transport]" for (auto& input : inputs) { - packet->AddPacket(input.sequenceNumber, input.timestamp, input.maxPacketSize); + if (std::addressof(input) == std::addressof(inputs.front())) + { + packet->SetBase(input.sequenceNumber + 1, input.timestamp); + } + else + { + packet->AddPacket(input.sequenceNumber, input.timestamp, input.maxPacketSize); + } } packet->Finish(); @@ -296,7 +317,14 @@ SCENARIO("RTCP Feeback RTP transport", "[parser][rtcp][feedback-rtp][transport]" for (auto& input : inputs) { - packet->AddPacket(input.sequenceNumber, input.timestamp, input.maxPacketSize); + if (std::addressof(input) == std::addressof(inputs.front())) + { + packet->SetBase(input.sequenceNumber + 1, input.timestamp); + } + else + { + packet->AddPacket(input.sequenceNumber, input.timestamp, input.maxPacketSize); + } } packet->Finish(); @@ -363,7 +391,14 @@ SCENARIO("RTCP Feeback RTP transport", "[parser][rtcp][feedback-rtp][transport]" for (auto& input : inputs) { - packet->AddPacket(input.sequenceNumber, input.timestamp, input.maxPacketSize); + if (std::addressof(input) == std::addressof(inputs.front())) + { + packet->SetBase(input.sequenceNumber + 1, input.timestamp); + } + else + { + packet->AddPacket(input.sequenceNumber, input.timestamp, input.maxPacketSize); + } } packet->Finish(); @@ -424,7 +459,14 @@ SCENARIO("RTCP Feeback RTP transport", "[parser][rtcp][feedback-rtp][transport]" for (auto& input : inputs2) { - packet2->AddPacket(input.sequenceNumber, input.timestamp, input.maxPacketSize); + if (std::addressof(input) == std::addressof(inputs2.front())) + { + packet2->SetBase(input.sequenceNumber + 1, input.timestamp); + } + else + { + packet2->AddPacket(input.sequenceNumber, input.timestamp, input.maxPacketSize); + } } packet2->Finish(); diff --git a/worker/test/src/RTC/TestTransportCongestionControlServer.cpp b/worker/test/src/RTC/TestTransportCongestionControlServer.cpp new file mode 100644 index 0000000000..27512b2b37 --- /dev/null +++ b/worker/test/src/RTC/TestTransportCongestionControlServer.cpp @@ -0,0 +1,254 @@ +#include "common.hpp" +#include "DepLibUV.hpp" +#include "RTC/TransportCongestionControlServer.hpp" +#include + +using namespace RTC; + +struct TestTransportCongestionControlServerInput +{ + uint16_t wideSeqNumber; + uint64_t nowMs; +}; + +struct TestTransportCongestionControlServerResult +{ + uint16_t wideSeqNumber; + bool received; + uint64_t timestamp; +}; + +using TestResults = std::deque>; + +class TestTransportCongestionControlServerListener : public TransportCongestionControlServer::Listener +{ +public: + virtual void OnTransportCongestionControlServerSendRtcpPacket( + RTC::TransportCongestionControlServer* tccServer, RTC::RTCP::Packet* packet) override + { + auto* tccPacket = dynamic_cast(packet); + + if (!tccPacket) + { + return; + } + + auto packetResults = tccPacket->GetPacketResults(); + + REQUIRE(!this->results.empty()); + + auto testResults = this->results.front(); + this->results.pop_front(); + + REQUIRE(testResults.size() == packetResults.size()); + + auto packetResultIt = packetResults.begin(); + auto testResultIt = testResults.begin(); + + for (; packetResultIt != packetResults.end() && testResultIt != testResults.end(); + ++packetResultIt, ++testResultIt) + { + REQUIRE(packetResultIt->sequenceNumber == testResultIt->wideSeqNumber); + REQUIRE(packetResultIt->received == testResultIt->received); + + if (packetResultIt->received) + { + REQUIRE(packetResultIt->receivedAtMs == testResultIt->timestamp); + } + } + } + +public: + void SetResults(TestResults& results) + { + this->results = results; + } + + void Check() + { + REQUIRE(this->results.empty()); + } + +private: + TestResults results; +}; + +// clang-format off +uint8_t buffer[] = +{ + 0x90, 0x01, 0x00, 0x01, + 0x00, 0x00, 0x00, 0x04, + 0x00, 0x00, 0x00, 0x05, + 0xbe, 0xde, 0x00, 0x01, // Header Extensions + 0x51, 0x60, 0xee, 0x00 // TCC Feedback +}; +// clang-format on + +void validate(std::vector& inputs, TestResults& results) +{ + TestTransportCongestionControlServerListener listener; + auto tccServer = + TransportCongestionControlServer(&listener, RTC::BweType::TRANSPORT_CC, RTC::MtuSize); + + tccServer.SetMaxIncomingBitrate(150000); + tccServer.TransportConnected(); + + RtpPacket* packet = RtpPacket::Parse(buffer, sizeof(buffer)); + + packet->SetTransportWideCc01ExtensionId(5); + packet->SetSequenceNumber(1); + + // Save results. + listener.SetResults(results); + + uint64_t startTs = inputs[0].nowMs; + uint64_t TransportCcFeedbackSendInterval{ 100u }; // In ms. + + for (auto input : inputs) + { + // Periodic sending TCC packets. + uint64_t diffTs = input.nowMs - startTs; + + if (diffTs >= TransportCcFeedbackSendInterval) + { + tccServer.FillAndSendTransportCcFeedback(); + startTs = input.nowMs; + } + + packet->UpdateTransportWideCc01(input.wideSeqNumber); + tccServer.IncomingPacket(input.nowMs, packet); + } + + tccServer.FillAndSendTransportCcFeedback(); + listener.Check(); +}; + +SCENARIO("TransportCongestionControlServer", "[rtp]") +{ + SECTION("normal time and sequence") + { + // clang-format off + std::vector inputs + { + { 1u, 1000u }, + { 2u, 1050u }, + { 3u, 1100u }, + { 4u, 1150u }, + { 5u, 1200u }, + }; + + TestResults results + { + { + { 1u, true, 1000u }, + { 2u, true, 1050u }, + }, + { + { 3u, true, 1100u }, + { 4u, true, 1150u }, + }, + { + { 5u, true, 1200u }, + }, + }; + // clang-format on + + validate(inputs, results); + } + + SECTION("lost packets") + { + // clang-format off + std::vector inputs + { + { 1u, 1000u }, + { 3u, 1050u }, + { 5u, 1100u }, + { 6u, 1150u }, + }; + + TestResults results + { + { + { 1u, true, 1000u }, + { 2u, false, 0u }, + { 3u, true, 1050u }, + }, + { + { 4u, false, 0u }, + { 5u, true, 1100u }, + { 6u, true, 1150u }, + }, + }; + // clang-format on + + validate(inputs, results); + } + + SECTION("duplicate packets") + { + // clang-format off + std::vector inputs + { + { 1u, 1000u }, + { 1u, 1050u }, + { 2u, 1100u }, + { 3u, 1150u }, + { 3u, 1200u }, + { 4u, 1250u }, + }; + + TestResults results + { + { + { 1u, true, 1000u }, + }, + { + { 2u, true, 1100u }, + { 3u, true, 1150u }, + }, + { + { 4u, true, 1250u }, + }, + }; + // clang-format on + + validate(inputs, results); + } + + SECTION("packets arrive out of order") + { + // clang-format off + std::vector inputs + { + { 1u, 1000u }, + { 2u, 1050u }, + { 4u, 1100u }, + { 5u, 1150u }, + { 3u, 1200u }, // Out of order + { 6u, 1250u }, + }; + + TestResults results + { + { + { 1u, true, 1000u }, + { 2u, true, 1050u }, + }, + { + { 3u, false, 0u }, + { 4u, true, 1100u }, + { 5u, true, 1150u }, + }, + { + { 3u, true, 1200u }, + { 4u, true, 1100u }, + { 5u, true, 1150u }, + { 6u, true, 1250u }, + }, + }; + // clang-format on + + validate(inputs, results); + } +}