From c0759dad9ace1454b27c20ec0efc8671162e40a7 Mon Sep 17 00:00:00 2001 From: Irine Sistiana <49315432+IrineSistiana@users.noreply.github.com> Date: Fri, 27 Oct 2023 22:39:34 +0800 Subject: [PATCH] transport: improve quic ExchangeReserved --- pkg/upstream/transport/conn_quic.go | 52 ++++++++++++++--------------- pkg/upstream/transport/transport.go | 3 -- 2 files changed, 26 insertions(+), 29 deletions(-) diff --git a/pkg/upstream/transport/conn_quic.go b/pkg/upstream/transport/conn_quic.go index 020b38eac..288881b02 100644 --- a/pkg/upstream/transport/conn_quic.go +++ b/pkg/upstream/transport/conn_quic.go @@ -10,6 +10,10 @@ import ( "github.com/quic-go/quic-go" ) +const ( + quicQueryTimeout = time.Second * 6 +) + var _ DnsConn = (*QuicDnsConn)(nil) type QuicDnsConn struct { @@ -46,7 +50,11 @@ type quicReservedExchanger struct { var _ ReservedExchanger = (*quicReservedExchanger)(nil) func (ote *quicReservedExchanger) ExchangeReserved(ctx context.Context, q []byte) (resp *[]byte, err error) { - defer ote.WithdrawReserved() + stream := ote.stream + defer func() { + stream.CancelRead(0) + stream.Close() + }() payload, err := copyMsgWithLenHdr(q) if err != nil { @@ -61,18 +69,28 @@ func (ote *quicReservedExchanger) ExchangeReserved(ctx context.Context, q []byte orgQid := binary.BigEndian.Uint16((*payload)[2:]) binary.BigEndian.PutUint16((*payload)[2:], 0) - // TODO: use single goroutine. - // See RFC9250 4.3.1. Transaction Cancellation + stream.SetDeadline(time.Now().Add(quicQueryTimeout)) + _, err = stream.Write(*payload) + pool.ReleaseBuf(payload) + if err != nil { + return nil, err + } + + // RFC 9250 4.2 + // The client MUST send the DNS query over the selected stream and MUST + // indicate through the STREAM FIN mechanism that no further data will + // be sent on that stream. + // + // Call Close() here will send the STREAM FIN. It won't close Read. + stream.Close() + type res struct { resp *[]byte err error } rc := make(chan res, 1) go func() { - defer func() { - pool.ReleaseBuf(payload) - }() - r, err := exchangeTroughQuicStream(ote.stream, *payload) + r, err := dnsutils.ReadRawMsgFromTCP(stream) rc <- res{resp: r, err: err} }() @@ -91,24 +109,6 @@ func (ote *quicReservedExchanger) ExchangeReserved(ctx context.Context, q []byte func (ote *quicReservedExchanger) WithdrawReserved() { s := ote.stream + s.CancelRead(0) s.Close() - s.CancelRead(0) // TODO: Needs a proper error code. -} - -func exchangeTroughQuicStream(s quic.Stream, payload []byte) (*[]byte, error) { - s.SetDeadline(time.Now().Add(quicQueryTimeout)) - - _, err := s.Write(payload) - if err != nil { - return nil, err - } - - // RFC 9250 4.2 - // The client MUST send the DNS query over the selected stream and MUST - // indicate through the STREAM FIN mechanism that no further data will - // be sent on that stream. - // - // Call Close() here will send the STREAM FIN. It won't close Read. - s.Close() - return dnsutils.ReadRawMsgFromTCP(s) } diff --git a/pkg/upstream/transport/transport.go b/pkg/upstream/transport/transport.go index d408f0810..36af405e0 100644 --- a/pkg/upstream/transport/transport.go +++ b/pkg/upstream/transport/transport.go @@ -43,9 +43,6 @@ const ( defaultIdleTimeout = time.Second * 10 defaultDialTimeout = time.Second * 5 - - quicQueryTimeout = time.Second * 6 - // If a pipeline connection sent a query but did not see any reply (include replies that // for other queries) from the server after waitingReplyTimeout. It assumes that // something goes wrong with the connection or the server. The connection will be closed.