Skip to content

Commit

Permalink
transport: impl reusableConn with active read loop
Browse files Browse the repository at this point in the history
  • Loading branch information
IrineSistiana committed Oct 27, 2023
1 parent 0f784a8 commit 77bf3dd
Show file tree
Hide file tree
Showing 4 changed files with 246 additions and 95 deletions.
33 changes: 26 additions & 7 deletions pkg/upstream/transport/conn_traditional_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,18 +41,32 @@ type dummyEchoNetConn struct {
rErrProb float64
rLatency time.Duration
wErrProb float64

closeOnce sync.Once
closeNotify chan struct{}
}

func newDummyEchoNetConn(rErrProb float64, rLatency time.Duration, wErrProb float64) NetConn {
c1, c2 := net.Pipe()
go func() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
defer c1.Close()
defer c2.Close()
for {
m, readErr := dnsutils.ReadRawMsgFromTCP(c2)
if m != nil {
go func() {
defer pool.ReleaseBuf(m)
if rLatency > 0 {
t := time.NewTimer(rLatency)
defer t.Stop()
select {
case <-t.C:
case <-ctx.Done():
return
}
}
latency := time.Millisecond * time.Duration(rand.Intn(20))
time.Sleep(latency)
_, _ = dnsutils.WriteRawMsgToTCP(c2, *m)
Expand All @@ -64,10 +78,11 @@ func newDummyEchoNetConn(rErrProb float64, rLatency time.Duration, wErrProb floa
}
}()
return &dummyEchoNetConn{
Conn: c1,
rErrProb: rErrProb,
rLatency: rLatency,
wErrProb: wErrProb,
Conn: c1,
rErrProb: rErrProb,
rLatency: rLatency,
wErrProb: wErrProb,
closeNotify: make(chan struct{}),
}
}

Expand All @@ -76,9 +91,6 @@ func probTrue(p float64) bool {
}

func (d *dummyEchoNetConn) Read(p []byte) (n int, err error) {
if d.rLatency > 0 {
time.Sleep(d.rLatency)
}
if probTrue(d.rErrProb) {
return 0, errors.New("read err")
}
Expand All @@ -92,6 +104,13 @@ func (d *dummyEchoNetConn) Write(p []byte) (n int, err error) {
return d.Conn.Write(p)
}

func (d *dummyEchoNetConn) Close() error {
d.closeOnce.Do(func() {
close(d.closeNotify)
})
return d.Conn.Close()
}

func Test_dnsConn_exchange(t *testing.T) {
idleTimeout := time.Millisecond * 100

Expand Down
Loading

0 comments on commit 77bf3dd

Please sign in to comment.