From 7034d2a8ecb354c7aff06524229d8da25954cda0 Mon Sep 17 00:00:00 2001 From: Ihor Ivlev Date: Tue, 12 Nov 2024 05:05:17 +0100 Subject: [PATCH] Implement BufferedPacketQueue (#14) --- .../src/main/cpp/BufferedPacketQueue.h | 166 ++++++++++++++++++ app/videonative/src/main/cpp/VideoPlayer.cpp | 24 ++- app/videonative/src/main/cpp/VideoPlayer.h | 3 + 3 files changed, 185 insertions(+), 8 deletions(-) create mode 100644 app/videonative/src/main/cpp/BufferedPacketQueue.h diff --git a/app/videonative/src/main/cpp/BufferedPacketQueue.h b/app/videonative/src/main/cpp/BufferedPacketQueue.h new file mode 100644 index 0000000..89e54ea --- /dev/null +++ b/app/videonative/src/main/cpp/BufferedPacketQueue.h @@ -0,0 +1,166 @@ +#pragma once +#include +#define BUFFERED_QUEUE_LOG_TAG "BufferedPacketQueue" + +/** + * @class BufferedPacketQueue + * @brief A queue for managing and processing network packets in sequence order. + * This class leverages a buffer to handle out-of-order packets and ensures + * packets are processed sequentially. + */ +class BufferedPacketQueue { + // Using std::map to keep packets sorted by sequence number + template class PacketBuffer { + public: + std::map> packets; + SeqType lastPacketIdx; + static constexpr std::size_t MAX_BUFFER_SIZE = 20; + bool firstPacket; + + // Constructor with logging + PacketBuffer() : lastPacketIdx(0), firstPacket(true) { + __android_log_print(ANDROID_LOG_DEBUG, BUFFERED_QUEUE_LOG_TAG, "PacketBuffer initialized."); + } + + /** + * @brief Compares two sequence numbers considering wrap-around. + * @param a First sequence number. + * @param b Second sequence number. + * @return True if sequence a is less than b, accounting for wrap-around. + */ + bool seqLessThan(SeqType a, SeqType b) const { + // Calculate the midpoint based on the sequence number width + const SeqType midpoint = (std::numeric_limits::max() / 2) + 1; + bool result = + ((b > a) && (b - a < midpoint)) || ((a > b) && (a - b > midpoint)); + // __android_log_print(ANDROID_LOG_VERBOSE, BUFFERED_QUEUE_LOG_TAG, "Comparing + // sequences: a=%u, b=%u, a < b? %s", + // a, b, result ? "true" : "false"); + return result; + } + + /** + * @brief Processes an incoming packet based on its sequence index. + * @param idx Sequence index of the packet. + * @param data Pointer to packet data. + * @param data_length Size of packet data. + * @param callback Callable to handle processed packets. + */ + template + void processPacket(SeqType idx, const uint8_t *data, + std::size_t data_length, Callback &callback) { + // __android_log_print(ANDROID_LOG_DEBUG, BUFFERED_QUEUE_LOG_TAG, "Processing + // packet with Sequence=%u", idx); + + if (firstPacket) { + // Initialize lastPacketIdx to one before the first packet + lastPacketIdx = idx - 1; + firstPacket = false; + __android_log_print( + ANDROID_LOG_DEBUG, BUFFERED_QUEUE_LOG_TAG, + "First packet received. Initialized lastPacketIdx to %u", + lastPacketIdx); + } + + if (idx == lastPacketIdx + 1) { + // Packet is the next expected one + // __android_log_print(ANDROID_LOG_DEBUG, BUFFERED_QUEUE_LOG_TAG, "In-order + // packet detected. Processing immediately."); + callback(data, data_length); + lastPacketIdx = idx; + // __android_log_print(ANDROID_LOG_DEBUG, BUFFERED_QUEUE_LOG_TAG, "Updated + // lastPacketIdx to %u", lastPacketIdx); + + // Now check if the buffer has the next packets + while (true) { + auto it = packets.find(lastPacketIdx + 1); + if (it != packets.end()) { + __android_log_print( + ANDROID_LOG_DEBUG, BUFFERED_QUEUE_LOG_TAG, + "Found buffered packet with Sequence=%u. Processing.", + it->first); + callback(it->second.data(), it->second.size()); + lastPacketIdx = it->first; + __android_log_print( + ANDROID_LOG_DEBUG, BUFFERED_QUEUE_LOG_TAG, + "Updated lastPacketIdx to %u after processing buffered packet.", + lastPacketIdx); + packets.erase(it); + } else { + //__android_log_print(ANDROID_LOG_DEBUG, BUFFERED_QUEUE_LOG_TAG, "No buffered packet + // found for Sequence=%u.", lastPacketIdx + 1); + break; + } + } + } else if (seqLessThan(lastPacketIdx, idx)) { + // Out-of-order packet + __android_log_print(ANDROID_LOG_DEBUG, BUFFERED_QUEUE_LOG_TAG, + "Out-of-order packet detected. Sequence=%u", idx); + // Avoid duplicate packets + if (packets.find(idx) == packets.end()) { + // Buffer the packet + packets[idx] = std::vector(data, data + data_length); + __android_log_print(ANDROID_LOG_DEBUG, BUFFERED_QUEUE_LOG_TAG, + "Buffered out-of-order packet. Buffer size: %zu", + packets.size()); + + // If buffer size exceeds MAX_BUFFER_SIZE, process all buffered + // packets + if (packets.size() >= MAX_BUFFER_SIZE) { + __android_log_print(ANDROID_LOG_WARN, BUFFERED_QUEUE_LOG_TAG, + "Buffer size exceeded MAX_BUFFER_SIZE (%zu). " + "Processing all buffered packets.", + MAX_BUFFER_SIZE); + // Process buffered packets in order + auto it_buffer = packets.begin(); + while (it_buffer != packets.end()) { + __android_log_print(ANDROID_LOG_DEBUG, BUFFERED_QUEUE_LOG_TAG, + "Processing buffered packet with Sequence=%u", + it_buffer->first); + callback(it_buffer->second.data(), it_buffer->second.size()); + lastPacketIdx = it_buffer->first; + it_buffer = packets.erase(it_buffer); + __android_log_print(ANDROID_LOG_DEBUG, BUFFERED_QUEUE_LOG_TAG, + "Updated lastPacketIdx to %u after " + "processing buffered packet.", + lastPacketIdx); + } + } + } else { + __android_log_print( + ANDROID_LOG_WARN, BUFFERED_QUEUE_LOG_TAG, + "Duplicate packet received with Sequence=%u. Ignoring.", idx); + } + } else { + // Packet is older than lastPacketIdx, possibly a retransmission or + // duplicate + __android_log_print( + ANDROID_LOG_WARN, BUFFERED_QUEUE_LOG_TAG, + "Received old or duplicate packet with Sequence=%u. Ignoring.", + idx); + // Optionally, handle retransmissions or request retransmission here + } + } + }; + + // Decide whether to use uint8_t or uint16_t based on sequence number type + using SeqType = uint16_t; // Change to uint8_t if sequence numbers are 8-bit + + PacketBuffer buffer; + +public: + /** + * @brief Process a packet through the queue. + * @param idx Sequence index of the packet. + * @param data Pointer to packet data. + * @param data_length Size of the packet data. + * @param callback Callable to handle processed packets. + */ + template + void processPacket(SeqType idx, const uint8_t *data, std::size_t data_length, + Callback &callback) { + //__android_log_print(ANDROID_LOG_DEBUG, BUFFERED_QUEUE_LOG_TAG, "BufferedPacketQueue: + // Processing packet with Sequence=%u", idx); + buffer.processPacket(idx, data, data_length, callback); + } +}; diff --git a/app/videonative/src/main/cpp/VideoPlayer.cpp b/app/videonative/src/main/cpp/VideoPlayer.cpp index 57228cd..a60921f 100644 --- a/app/videonative/src/main/cpp/VideoPlayer.cpp +++ b/app/videonative/src/main/cpp/VideoPlayer.cpp @@ -93,15 +93,23 @@ void VideoPlayer::processQueue() { } //Not yet parsed bit stream (e.g. raw h264 or rtp data) -void VideoPlayer::onNewRTPData(const uint8_t *data, const std::size_t data_length) { +void VideoPlayer::onNewRTPData(const uint8_t* data, const std::size_t data_length) { + // Parse the RTP packet const RTP::RTPPacket rtpPacket(data, data_length); - if (rtpPacket.header.payload == RTP_PAYLOAD_TYPE_AUDIO) { - audioDecoder.enqueueAudio(data, data_length); - } - else - { - mParser.parse_rtp_stream(data, data_length); - } + uint16_t idx = rtpPacket.header.getSequence(); + + // Define the callback based on payload type + auto callback = [&](const uint8_t* packet_data, std::size_t packet_length) { + if (rtpPacket.header.payload == RTP_PAYLOAD_TYPE_AUDIO) { + audioDecoder.enqueueAudio(packet_data, packet_length); + } + else { + mParser.parse_rtp_stream(packet_data, packet_length); + } + }; + + // Process the packet using the queue + mBufferedPacketQueue.processPacket(idx, data, data_length, callback); } void VideoPlayer::onNewNALU(const NALU &nalu) { diff --git a/app/videonative/src/main/cpp/VideoPlayer.h b/app/videonative/src/main/cpp/VideoPlayer.h index aac5a80..b8f466c 100644 --- a/app/videonative/src/main/cpp/VideoPlayer.h +++ b/app/videonative/src/main/cpp/VideoPlayer.h @@ -7,6 +7,7 @@ #include #include "VideoDecoder.h" #include "AudioDecoder.h" +#include "BufferedPacketQueue.h" #include "UdpReceiver.h" #include "parser/H26XParser.h" #include "minimp4.h" @@ -65,6 +66,8 @@ class VideoPlayer { const std::string GROUND_RECORDING_DIRECTORY; JavaVM *javaVm = nullptr; H26XParser mParser; + BufferedPacketQueue mBufferedPacketQueue; + // DVR attributes int dvr_fd;