Skip to content

Commit

Permalink
feat: add WithReadThreshold API
Browse files Browse the repository at this point in the history
  • Loading branch information
joway committed Dec 7, 2023
1 parent 9707178 commit 129d68a
Show file tree
Hide file tree
Showing 20 changed files with 494 additions and 104 deletions.
17 changes: 10 additions & 7 deletions connection_errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ const (
ErrEOF = syscall.Errno(0x106)
// Write I/O buffer timeout, calling by Connection.Writer
ErrWriteTimeout = syscall.Errno(0x107)
// The wait read size large than read threshold
ErrReadOutOfThreshold = syscall.Errno(0x108)
)

const ErrnoMask = 0xFF
Expand Down Expand Up @@ -90,11 +92,12 @@ func (e *exception) Unwrap() error {

// Errors defined in netpoll
var errnos = [...]string{
ErrnoMask & ErrConnClosed: "connection has been closed",
ErrnoMask & ErrReadTimeout: "connection read timeout",
ErrnoMask & ErrDialTimeout: "dial wait timeout",
ErrnoMask & ErrDialNoDeadline: "dial no deadline",
ErrnoMask & ErrUnsupported: "netpoll dose not support",
ErrnoMask & ErrEOF: "EOF",
ErrnoMask & ErrWriteTimeout: "connection write timeout",
ErrnoMask & ErrConnClosed: "connection has been closed",
ErrnoMask & ErrReadTimeout: "connection read timeout",
ErrnoMask & ErrDialTimeout: "dial wait timeout",
ErrnoMask & ErrDialNoDeadline: "dial no deadline",
ErrnoMask & ErrUnsupported: "netpoll dose not support",
ErrnoMask & ErrEOF: "EOF",
ErrnoMask & ErrWriteTimeout: "connection write timeout",
ErrnoMask & ErrReadOutOfThreshold: "connection read size is out of threshold",
}
45 changes: 34 additions & 11 deletions connection_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,9 @@ type connection struct {
outputBuffer *LinkBuffer
outputBarrier *barrier
supportZeroCopy bool
maxSize int // The maximum size of data between two Release().
bookSize int // The size of data that can be read at once.
maxSize int // The maximum size of data between two Release().
bookSize int // The size of data that can be read at once.
readThreshold int64 // The readThreshold of connection max read.
}

var (
Expand Down Expand Up @@ -94,6 +95,12 @@ func (c *connection) SetWriteTimeout(timeout time.Duration) error {
return nil
}

// SetReadThreshold implements Connection.
func (c *connection) SetReadThreshold(readThreshold int64) error {
c.readThreshold = readThreshold
return nil
}

// ------------------------------------------ implement zero-copy reader ------------------------------------------

// Next implements Connection.
Expand Down Expand Up @@ -394,28 +401,44 @@ func (c *connection) triggerWrite(err error) {
// waitRead will wait full n bytes.
func (c *connection) waitRead(n int) (err error) {
if n <= c.inputBuffer.Len() {
return nil
goto CLEANUP
}
// cannot wait read with an out of threshold size
if c.readThreshold > 0 && int64(n) > c.readThreshold {
// just return error and dont do cleanup
return Exception(ErrReadOutOfThreshold, "wait read")
}

atomic.StoreInt64(&c.waitReadSize, int64(n))
defer atomic.StoreInt64(&c.waitReadSize, 0)
if c.readTimeout > 0 {
return c.waitReadWithTimeout(n)
err = c.waitReadWithTimeout(n)
goto CLEANUP
}
// wait full n
for c.inputBuffer.Len() < n {
switch c.status(closing) {
case poller:
return Exception(ErrEOF, "wait read")
err = Exception(ErrEOF, "wait read")
case user:
return Exception(ErrConnClosed, "wait read")
err = Exception(ErrConnClosed, "wait read")
default:
err = <-c.readTrigger
if err != nil {
return err
}
}
if err != nil {
goto CLEANUP
}
}
return nil
CLEANUP:
atomic.StoreInt64(&c.waitReadSize, 0)
if c.readThreshold > 0 && err == nil {
// only resume read when current read size could make newBufferSize < readThreshold
bufferSize := int64(c.inputBuffer.Len())
newBufferSize := bufferSize - int64(n)
if bufferSize >= c.readThreshold && newBufferSize < c.readThreshold {
c.resumeRead()
}
}
return err
}

// waitReadWithTimeout will wait full n bytes or until timeout.
Expand Down
1 change: 1 addition & 0 deletions connection_onevent.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ func (c *connection) onPrepare(opts *options) (err error) {
c.SetReadTimeout(opts.readTimeout)
c.SetWriteTimeout(opts.writeTimeout)
c.SetIdleTimeout(opts.idleTimeout)
c.SetReadThreshold(opts.readThreshold)

// calling prepare first and then register.
if opts.onPrepare != nil {
Expand Down
30 changes: 29 additions & 1 deletion connection_reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,11 @@ func (c *connection) inputAck(n int) (err error) {
c.maxSize = mallocMax
}

// trigger throttle
if c.readThreshold > 0 && int64(length) >= c.readThreshold {
c.pauseRead()
}

var needTrigger = true
if length == n { // first start onRequest
needTrigger = c.onRequest()
Expand Down Expand Up @@ -138,6 +143,29 @@ func (c *connection) outputAck(n int) (err error) {

// rw2r removed the monitoring of write events.
func (c *connection) rw2r() {
c.operator.Control(PollRW2R)
switch c.operator.getMode() {
case opreadwrite:
c.operator.Control(PollRW2R)
case opwrite:
c.operator.Control(PollW2RW)
}
c.triggerWrite(nil)
}

func (c *connection) pauseRead() {
switch c.operator.getMode() {
case opread:
c.operator.Control(PollR2Hup)
case opreadwrite:
c.operator.Control(PollRW2W)
}
}

func (c *connection) resumeRead() {
switch c.operator.getMode() {
case ophup:
c.operator.Control(PollHup2R)
case opwrite:
c.operator.Control(PollW2RW)
}
}
92 changes: 92 additions & 0 deletions connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -675,3 +675,95 @@ func TestConnectionDailTimeoutAndClose(t *testing.T) {
wg.Wait()
}
}

func TestConnectionReadOutOfThreshold(t *testing.T) {
var readThreshold = 1024 * 100
var readSize = readThreshold + 1
var opts = &options{}
var wg sync.WaitGroup
wg.Add(1)
opts.onRequest = func(ctx context.Context, connection Connection) error {
if connection.Reader().Len() < readThreshold {
return nil
}
defer wg.Done()
// read throttled data
_, err := connection.Reader().Next(readSize)
Assert(t, errors.Is(err, ErrReadOutOfThreshold), err)
connection.Close()
return nil
}

WithReadThreshold(int64(readThreshold)).f(opts)
r, w := GetSysFdPairs()
rconn, wconn := &connection{}, &connection{}
rconn.init(&netFD{fd: r}, opts)
wconn.init(&netFD{fd: w}, opts)

msg := make([]byte, readThreshold)
_, err := wconn.Writer().WriteBinary(msg)
MustNil(t, err)
err = wconn.Writer().Flush()
MustNil(t, err)
wg.Wait()
}

func TestConnectionReadThreshold(t *testing.T) {
var readThreshold int64 = 1024 * 100
var opts = &options{}
var wg sync.WaitGroup
var throttled int32
wg.Add(1)
opts.onRequest = func(ctx context.Context, connection Connection) error {
if int64(connection.Reader().Len()) < readThreshold {
return nil
}
defer wg.Done()

atomic.StoreInt32(&throttled, 1)
// check if no more read data when throttled
inbuffered := connection.Reader().Len()
t.Logf("Inbuffered: %d", inbuffered)
time.Sleep(time.Millisecond * 100)
Equal(t, inbuffered, connection.Reader().Len())

// read non-throttled data
buf, err := connection.Reader().Next(int(readThreshold))
Equal(t, int64(len(buf)), readThreshold)
MustNil(t, err)
err = connection.Reader().Release()
MustNil(t, err)
t.Logf("read non-throttled data")

// continue read throttled data
buf, err = connection.Reader().Next(5)
MustNil(t, err)
t.Logf("read throttled data: [%s]", buf)
Equal(t, len(buf), 5)
MustNil(t, err)
err = connection.Reader().Release()
MustNil(t, err)
Equal(t, connection.Reader().Len(), 0)
return nil
}

WithReadThreshold(readThreshold).f(opts)
r, w := GetSysFdPairs()
rconn, wconn := &connection{}, &connection{}
rconn.init(&netFD{fd: r}, opts)
wconn.init(&netFD{fd: w}, opts)
Assert(t, rconn.readThreshold == readThreshold)

msg := make([]byte, readThreshold)
_, err := wconn.Writer().WriteBinary(msg)
MustNil(t, err)
err = wconn.Writer().Flush()
MustNil(t, err)
_, err = wconn.Writer().WriteString("hello")
MustNil(t, err)
err = wconn.Writer().Flush()
MustNil(t, err)
t.Logf("flush final msg")

wg.Wait()
}
20 changes: 20 additions & 0 deletions docs/guide/guide_cn.md
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,26 @@ func callback(connection netpoll.Connection) error {
}
```

## 8. 如何配置连接的读取阈值大小 ?

Netpoll 默认不会对端发送数据的读取速度有任何限制,每当连接有数据时,Netpoll 会尽可能快地将数据存放在自己的 buffer 中。但有时候可能用户不希望数据过快发送,或者是希望控制服务内存使用量,又或者业务 OnRequest 回调处理速度很慢需要限制发送方速度,此时可以使用 `WithReadThreshold` 来控制读取的最大阈值。

### Client 侧使用

```
dialer := netpoll.NewDialer(netpoll.WithReadThreshold(1024 * 1024 * 1024 * 1)) // 1GB
conn, _ = dialer.DialConnection(network, address, timeout)
```

### Server 侧使用

```
eventLoop, _ := netpoll.NewEventLoop(
handle,
netpoll.WithReadThreshold(1024 * 1024 * 1024 * 1), // 1GB
)
```

# 注意事项

## 1. 错误设置 NumLoops
Expand Down
24 changes: 24 additions & 0 deletions docs/guide/guide_en.md
Original file line number Diff line number Diff line change
Expand Up @@ -558,6 +558,30 @@ func callback(connection netpoll.Connection) error {
}
```

## 8. How to configure the read threshold of the connection?

By default, Netpoll does not place any limit on the reading speed of data sent by the end.
Whenever there have more data on the connection, Netpoll will read the data into its own buffer as quickly as possible.

But sometimes users may not want data to be read too quickly, or they want to control the service memory usage, or the user's OnRequest callback processing data very slowly and need to control the peer's send speed.
In this case, you can use `WithReadThreshold` to control the maximum reading threshold.

### Client side use

```
dialer := netpoll.NewDialer(netpoll.WithReadThreshold(1024 * 1024 * 1024 * 1)) // 1GB
conn, _ = dialer.DialConnection(network, address, timeout)
```

### Server side use

```
eventLoop, _ := netpoll.NewEventLoop(
handle,
netpoll.WithReadThreshold(1024 * 1024 * 1024 * 1), // 1GB
)
```

# Attention

## 1. Wrong setting of NumLoops
Expand Down
30 changes: 15 additions & 15 deletions eventloop.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,27 +54,27 @@ type OnPrepare func(connection Connection) context.Context
//
// An example usage in TCP Proxy scenario:
//
// func onConnect(ctx context.Context, upstream netpoll.Connection) context.Context {
// downstream, _ := netpoll.DialConnection("tcp", downstreamAddr, time.Second)
// return context.WithValue(ctx, downstreamKey, downstream)
// }
// func onRequest(ctx context.Context, upstream netpoll.Connection) error {
// downstream := ctx.Value(downstreamKey).(netpoll.Connection)
// }
// func onConnect(ctx context.Context, upstream netpoll.Connection) context.Context {
// downstream, _ := netpoll.DialConnection("tcp", downstreamAddr, time.Second)
// return context.WithValue(ctx, downstreamKey, downstream)
// }
// func onRequest(ctx context.Context, upstream netpoll.Connection) error {
// downstream := ctx.Value(downstreamKey).(netpoll.Connection)
// }
type OnConnect func(ctx context.Context, connection Connection) context.Context

// OnRequest defines the function for handling connection. When data is sent from the connection peer,
// netpoll actively reads the data in LT mode and places it in the connection's input buffer.
// Generally, OnRequest starts handling the data in the following way:
//
// func OnRequest(ctx context, connection Connection) error {
// input := connection.Reader().Next(n)
// handling input data...
// send, _ := connection.Writer().Malloc(l)
// copy(send, output)
// connection.Flush()
// return nil
// }
// func OnRequest(ctx context, connection Connection) error {
// input := connection.Reader().Next(n)
// handling input data...
// send, _ := connection.Writer().Malloc(l)
// copy(send, output)
// connection.Flush()
// return nil
// }
//
// OnRequest will run in a separate goroutine and
// it is guaranteed that there is one and only one OnRequest running at the same time.
Expand Down
Loading

0 comments on commit 129d68a

Please sign in to comment.