Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DRAFT][WIP] Packet writing optimization #4523

Draft
wants to merge 3 commits into
base: 25.lts.1+
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions net/quic/quic_chromium_client_session.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3613,6 +3613,16 @@ bool QuicChromiumClientSession::OnPacket(
return true;
}

void QuicChromiumClientSession::set_force_write_blocked(
bool force_write_blocked) {
quic::QuicConnection* quic_connection = connection();
QuicChromiumPacketWriter* writer = static_cast<QuicChromiumPacketWriter*>(
quic_connection ? quic_connection->writer() : nullptr);
if (writer) {
writer->set_force_write_blocked(force_write_blocked);
}
}

void QuicChromiumClientSession::NotifyFactoryOfSessionGoingAway() {
going_away_ = true;
if (stream_factory_)
Expand Down
1 change: 1 addition & 0 deletions net/quic/quic_chromium_client_session.h
Original file line number Diff line number Diff line change
Expand Up @@ -760,6 +760,7 @@ class NET_EXPORT_PRIVATE QuicChromiumClientSession
bool OnPacket(const quic::QuicReceivedPacket& packet,
const quic::QuicSocketAddress& local_address,
const quic::QuicSocketAddress& peer_address) override;
void set_force_write_blocked(bool force_write_blocked) override;
void OnStreamClosed(quic::QuicStreamId stream_id) override;

// MultiplexedSession methods:
Expand Down
16 changes: 16 additions & 0 deletions net/quic/quic_chromium_packet_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,12 @@ int QuicChromiumPacketReader::StartReadingMultiplePackets() {
}

bool QuicChromiumPacketReader::ProcessMultiplePacketReadResult(int result) {
bool success = ProcessMultiplePacketReadResultInternal(result);
return success;
}

bool QuicChromiumPacketReader::ProcessMultiplePacketReadResultInternal(
int result) {
quic::QuicChromiumClock::GetInstance()->ZeroApproximateNow();
read_pending_ = false;
if (result <= 0 && net_log_.IsCapturing()) {
Expand All @@ -111,6 +117,10 @@ bool QuicChromiumPacketReader::ProcessMultiplePacketReadResult(int result) {
// buffer, ignore it.
return true;
}

if (!visitor_ || !socket_)
return true;

if (result < 0) {
// Report all other errors to the visitor.
return visitor_->OnReadError(result, socket_);
Expand Down Expand Up @@ -145,8 +155,14 @@ bool QuicChromiumPacketReader::ProcessMultiplePacketReadResult(int result) {
}

void QuicChromiumPacketReader::OnReadMultiplePacketComplete(int result) {
if (visitor_ && result > 0) {
visitor_->set_force_write_blocked(true);
}
if (ProcessMultiplePacketReadResult(result))
StartReadingMultiplePackets();
if (visitor_ && result > 0) {
visitor_->set_force_write_blocked(false);
}
}

#endif
Expand Down
2 changes: 2 additions & 0 deletions net/quic/quic_chromium_packet_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ class NET_EXPORT_PRIVATE QuicChromiumPacketReader {
virtual bool OnPacket(const quic::QuicReceivedPacket& packet,
const quic::QuicSocketAddress& local_address,
const quic::QuicSocketAddress& peer_address) = 0;
virtual void set_force_write_blocked(bool force_write_blocked);
};

QuicChromiumPacketReader(DatagramClientSocket* socket,
Expand Down Expand Up @@ -69,6 +70,7 @@ class NET_EXPORT_PRIVATE QuicChromiumPacketReader {
void OnReadMultiplePacketComplete(int result);
// Return true if reading should continue.
bool ProcessMultiplePacketReadResult(int result);
bool ProcessMultiplePacketReadResultInternal(int result);
#endif

raw_ptr<DatagramClientSocket, DanglingUntriaged> socket_;
Expand Down
15 changes: 13 additions & 2 deletions net/quic/quic_chromium_packet_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,21 @@ QuicChromiumPacketWriter::QuicChromiumPacketWriter(
&QuicChromiumPacketWriter::OnWriteComplete, weak_factory_.GetWeakPtr());
}

QuicChromiumPacketWriter::~QuicChromiumPacketWriter() = default;
QuicChromiumPacketWriter::~QuicChromiumPacketWriter() {
while (force_write_blocked_count_ > 0)
set_force_write_blocked(false);
}

void QuicChromiumPacketWriter::set_force_write_blocked(
bool force_write_blocked) {
if (force_write_blocked) {
if (++force_write_blocked_count_ > 1)
return;
} else {
if (--force_write_blocked_count_ > 0)
return;
}

force_write_blocked_ = force_write_blocked;
if (!IsWriteBlocked() && delegate_ != nullptr)
delegate_->OnWriteUnblocked();
Expand Down Expand Up @@ -133,7 +144,7 @@ quic::WriteResult QuicChromiumPacketWriter::WritePacket(

void QuicChromiumPacketWriter::WritePacketToSocket(
scoped_refptr<ReusableIOBuffer> packet) {
DCHECK(!force_write_blocked_);
CHECK(!force_write_blocked_);
packet_ = std::move(packet);
quic::WriteResult result = WritePacketToSocketImpl();
if (result.error_code != ERR_IO_PENDING)
Expand Down
1 change: 1 addition & 0 deletions net/quic/quic_chromium_packet_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ class NET_EXPORT_PRIVATE QuicChromiumPacketWriter

// If ture, IsWriteBlocked() will return true regardless of
// |write_in_progress_|.
int force_write_blocked_count_ = 0;
bool force_write_blocked_ = false;

int retry_count_ = 0;
Expand Down
48 changes: 47 additions & 1 deletion net/third_party/quiche/src/quiche/quic/core/quic_connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -428,7 +428,10 @@ QuicConnection::~QuicConnection() {
}
}

void QuicConnection::ClearQueuedPackets() { buffered_packets_.clear(); }
void QuicConnection::ClearQueuedPackets() {
LOG(INFO) << __FUNCTION__ << " " << buffered_packets_.size();
buffered_packets_.clear();
}

bool QuicConnection::ValidateConfigConnectionIds(const QuicConfig& config) {
QUICHE_DCHECK(config.negotiated());
Expand Down Expand Up @@ -2407,6 +2410,7 @@ void QuicConnection::CloseIfTooManyOutstandingSentPackets() {
sent_packet_manager_.GetLeastUnacked() + max_tracked_packets_;

if (should_close) {
LOG(INFO) << __FUNCTION__ << "\n\n*************************\n\n";
CloseConnection(
QUIC_TOO_MANY_OUTSTANDING_SENT_PACKETS,
absl::StrCat("More than ", max_tracked_packets_,
Expand Down Expand Up @@ -2819,6 +2823,7 @@ void QuicConnection::OnCanWrite() {
if (writer_->IsWriteBlocked()) {
const std::string error_details =
"Writer is blocked while calling OnCanWrite.";
LOG(INFO) << __FUNCTION__ << " " << error_details;
QUIC_BUG(quic_bug_10511_22) << ENDPOINT << error_details;
CloseConnection(QUIC_INTERNAL_ERROR, error_details,
ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
Expand Down Expand Up @@ -3117,6 +3122,25 @@ bool QuicConnection::ValidateReceivedPacketNumber(
}

void QuicConnection::WriteQueuedPackets() {
static int cnt = 0;
static int count = 0;
static int singles = 0;
static int multiples = 0;
++count;
if (buffered_packets_.size() > 1) {
multiples += buffered_packets_.size();
} else {
++singles;
}
if (++cnt > 99) {
cnt = 0;
LOG(INFO) << __FUNCTION__ << " " << buffered_packets_.size()
<< " count=" << count << " singles=" << singles
<< " mutiples=" << (double)multiples / (count - singles);
count = 0;
singles = 0;
multiples = 0;
}
QUICHE_DCHECK(!writer_->IsWriteBlocked());
#if !defined(STARBOARD)
QUIC_CLIENT_HISTOGRAM_COUNTS("QuicSession.NumQueuedPacketsBeforeWrite",
Expand All @@ -3125,6 +3149,7 @@ void QuicConnection::WriteQueuedPackets() {

while (!buffered_packets_.empty()) {
if (HandleWriteBlocked()) {
LOG(INFO) << __FUNCTION__;
break;
}
const BufferedPacket& packet = buffered_packets_.front();
Expand All @@ -3133,6 +3158,7 @@ void QuicConnection::WriteQueuedPackets() {
packet.peer_address, per_packet_options_);
QUIC_DVLOG(1) << ENDPOINT << "Sending buffered packet, result: " << result;
if (IsMsgTooBig(writer_, result) && packet.length > long_term_mtu_) {
LOG(INFO) << __FUNCTION__;
// When MSG_TOO_BIG is returned, the system typically knows what the
// actual MTU is, so there is no need to probe further.
// TODO(wub): Reduce max packet size to a safe default, or the actual MTU.
Expand All @@ -3142,6 +3168,7 @@ void QuicConnection::WriteQueuedPackets() {
continue;
}
if (IsWriteError(result.status)) {
LOG(INFO) << __FUNCTION__;
OnWriteError(result.error_code);
break;
}
Expand All @@ -3150,6 +3177,7 @@ void QuicConnection::WriteQueuedPackets() {
buffered_packets_.pop_front();
}
if (IsWriteBlockedStatus(result.status)) {
LOG(INFO) << __FUNCTION__;
visitor_->OnWriteBlocked();
break;
}
Expand Down Expand Up @@ -3543,6 +3571,7 @@ bool QuicConnection::WritePacket(SerializedPacket* packet) {
// duplicate packet being sent. The helper must call OnCanWrite
// when the write completes, and OnWriteError if an error occurs.
if (result.status != WRITE_STATUS_BLOCKED_DATA_BUFFERED) {
LOG(INFO) << __FUNCTION__;
QUIC_DVLOG(1) << ENDPOINT << "Adding packet: " << packet->packet_number
<< " to buffered packets";
buffered_packets_.emplace_back(*packet, send_from_address,
Expand Down Expand Up @@ -4539,6 +4568,7 @@ bool QuicConnection::MaybeProcessCoalescedPackets() {
void QuicConnection::CloseConnection(
QuicErrorCode error, const std::string& details,
ConnectionCloseBehavior connection_close_behavior) {
LOG(INFO) << __FUNCTION__ << " " << details << " *************************";
CloseConnection(error, NO_IETF_QUIC_ERROR, details,
connection_close_behavior);
}
Expand Down Expand Up @@ -5071,6 +5101,7 @@ bool QuicConnection::SendConnectivityProbingPacket(
QUIC_DLOG(INFO)
<< ENDPOINT
<< "Writer blocked when sending connectivity probing packet.";
LOG(INFO) << __FUNCTION__;
if (probing_writer == writer_) {
// Visitor should not be write blocked if the probing writer is not the
// default packet writer.
Expand Down Expand Up @@ -5850,6 +5881,20 @@ void QuicConnection::SendAllPendingAcks() {
if (!earliest_ack_timeout.IsInitialized()) {
return;
}

static bool was_blocked = false;
if (writer_->IsWriteBlocked()) {
was_blocked = true;
LOG(INFO) << __FUNCTION__
<< " writer_->IsWriteBlocked()=" << writer_->IsWriteBlocked();
return;
}
if (was_blocked) {
was_blocked = false;
LOG(INFO) << __FUNCTION__
<< " writer_->IsWriteBlocked()=" << writer_->IsWriteBlocked();
}

for (int8_t i = INITIAL_DATA; i <= APPLICATION_DATA; ++i) {
const QuicTime ack_timeout = uber_received_packet_manager_.GetAckTimeout(
static_cast<PacketNumberSpace>(i));
Expand All @@ -5875,6 +5920,7 @@ void QuicConnection::SendAllPendingAcks() {
QuicFrames frames;
frames.push_back(uber_received_packet_manager_.GetUpdatedAckFrame(
static_cast<PacketNumberSpace>(i), clock_->ApproximateNow()));

const bool flushed = packet_creator_.FlushAckFrame(frames);
if (!flushed) {
// Connection is write blocked.
Expand Down
Loading