Skip to content

Commit

Permalink
feat: add WithReadThreshold API
Browse files Browse the repository at this point in the history
  • Loading branch information
joway committed Dec 7, 2023
1 parent 9707178 commit f815f75
Show file tree
Hide file tree
Showing 16 changed files with 436 additions and 57 deletions.
3 changes: 3 additions & 0 deletions connection_errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ const (
ErrEOF = syscall.Errno(0x106)
// Write I/O buffer timeout, calling by Connection.Writer
ErrWriteTimeout = syscall.Errno(0x107)
// The wait read size large than read threshold
ErrReadOutOfThreshold = syscall.Errno(0x108)
)

const ErrnoMask = 0xFF
Expand Down Expand Up @@ -97,4 +99,5 @@ var errnos = [...]string{
ErrnoMask & ErrUnsupported: "netpoll dose not support",
ErrnoMask & ErrEOF: "EOF",
ErrnoMask & ErrWriteTimeout: "connection write timeout",
ErrnoMask & ErrReadOutOfThreshold: "connection read size is out of threshold",
}
45 changes: 34 additions & 11 deletions connection_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,9 @@ type connection struct {
outputBuffer *LinkBuffer
outputBarrier *barrier
supportZeroCopy bool
maxSize int // The maximum size of data between two Release().
bookSize int // The size of data that can be read at once.
maxSize int // The maximum size of data between two Release().
bookSize int // The size of data that can be read at once.
readThreshold int64 // The readThreshold of connection max read.
}

var (
Expand Down Expand Up @@ -94,6 +95,12 @@ func (c *connection) SetWriteTimeout(timeout time.Duration) error {
return nil
}

// SetReadThreshold implements Connection.
func (c *connection) SetReadThreshold(readThreshold int64) error {
c.readThreshold = readThreshold
return nil
}

// ------------------------------------------ implement zero-copy reader ------------------------------------------

// Next implements Connection.
Expand Down Expand Up @@ -394,28 +401,44 @@ func (c *connection) triggerWrite(err error) {
// waitRead will wait full n bytes.
func (c *connection) waitRead(n int) (err error) {
if n <= c.inputBuffer.Len() {
return nil
goto CLEANUP
}
// cannot wait read with an out of threshold size
if c.readThreshold > 0 && int64(n) > c.readThreshold {
// just return error and dont do cleanup
return Exception(ErrReadOutOfThreshold, "wait read")
}

atomic.StoreInt64(&c.waitReadSize, int64(n))
defer atomic.StoreInt64(&c.waitReadSize, 0)
if c.readTimeout > 0 {
return c.waitReadWithTimeout(n)
err = c.waitReadWithTimeout(n)
goto CLEANUP
}
// wait full n
for c.inputBuffer.Len() < n {
switch c.status(closing) {
case poller:
return Exception(ErrEOF, "wait read")
err = Exception(ErrEOF, "wait read")
case user:
return Exception(ErrConnClosed, "wait read")
err = Exception(ErrConnClosed, "wait read")
default:
err = <-c.readTrigger
if err != nil {
return err
}
}
if err != nil {
goto CLEANUP
}
}
return nil
CLEANUP:
atomic.StoreInt64(&c.waitReadSize, 0)
if c.readThreshold > 0 && err == nil {
// only resume read when current read size could make newBufferSize < readThreshold
bufferSize := int64(c.inputBuffer.Len())
newBufferSize := bufferSize - int64(n)
if bufferSize >= c.readThreshold && newBufferSize < c.readThreshold {
c.resumeRead()
}
}
return err
}

// waitReadWithTimeout will wait full n bytes or until timeout.
Expand Down
1 change: 1 addition & 0 deletions connection_onevent.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ func (c *connection) onPrepare(opts *options) (err error) {
c.SetReadTimeout(opts.readTimeout)
c.SetWriteTimeout(opts.writeTimeout)
c.SetIdleTimeout(opts.idleTimeout)
c.SetReadThreshold(opts.readThreshold)

// calling prepare first and then register.
if opts.onPrepare != nil {
Expand Down
26 changes: 26 additions & 0 deletions connection_reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,12 @@ func (c *connection) inputAck(n int) (err error) {
c.maxSize = mallocMax
}

println(c.readThreshold, length)
// trigger throttle
if c.readThreshold > 0 && int64(length) >= c.readThreshold {
c.pauseRead()
}

var needTrigger = true
if length == n { // first start onRequest
needTrigger = c.onRequest()
Expand Down Expand Up @@ -141,3 +147,23 @@ func (c *connection) rw2r() {
c.operator.Control(PollRW2R)
c.triggerWrite(nil)
}

func (c *connection) pauseRead() {
opmode := c.operator.getMode()
switch opmode {
case opread:
c.operator.Control(PollR2Hup)
case opreadwrite:
c.operator.Control(PollRW2W)
}
}

func (c *connection) resumeRead() {
opmode := c.operator.getMode()
switch opmode {
case ophup:
c.operator.Control(PollHup2R)
case opwrite:
c.operator.Control(PollW2RW)
}
}
92 changes: 92 additions & 0 deletions connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -675,3 +675,95 @@ func TestConnectionDailTimeoutAndClose(t *testing.T) {
wg.Wait()
}
}

func TestConnectionReadOutOfThreshold(t *testing.T) {
var readThreshold = 1024 * 100
var readSize = readThreshold + 1
var opts = &options{}
var wg sync.WaitGroup
wg.Add(1)
opts.onRequest = func(ctx context.Context, connection Connection) error {
if connection.Reader().Len() < readThreshold {
return nil
}
defer wg.Done()
// read throttled data
_, err := connection.Reader().Next(readSize)
Assert(t, errors.Is(err, ErrReadOutOfThreshold), err)
connection.Close()
return nil
}

WithReadThreshold(int64(readThreshold)).f(opts)
r, w := GetSysFdPairs()
rconn, wconn := &connection{}, &connection{}
rconn.init(&netFD{fd: r}, opts)
wconn.init(&netFD{fd: w}, opts)

msg := make([]byte, readThreshold)
_, err := wconn.Writer().WriteBinary(msg)
MustNil(t, err)
err = wconn.Writer().Flush()
MustNil(t, err)
wg.Wait()
}

func TestConnectionReadThreshold(t *testing.T) {
var readThreshold int64 = 1024 * 100
var opts = &options{}
var wg sync.WaitGroup
var throttled int32
wg.Add(1)
opts.onRequest = func(ctx context.Context, connection Connection) error {
if int64(connection.Reader().Len()) < readThreshold {
return nil
}
defer wg.Done()

atomic.StoreInt32(&throttled, 1)
// check if no more read data when throttled
inbuffered := connection.Reader().Len()
t.Logf("Inbuffered: %d", inbuffered)
time.Sleep(time.Millisecond * 100)
Equal(t, inbuffered, connection.Reader().Len())

// read non-throttled data
buf, err := connection.Reader().Next(int(readThreshold))
Equal(t, int64(len(buf)), readThreshold)
MustNil(t, err)
err = connection.Reader().Release()
MustNil(t, err)
t.Logf("read non-throttled data")

// continue read throttled data
buf, err = connection.Reader().Next(5)
MustNil(t, err)
t.Logf("read throttled data: [%s]", buf)
Equal(t, len(buf), 5)
MustNil(t, err)
err = connection.Reader().Release()
MustNil(t, err)
Equal(t, connection.Reader().Len(), 0)
return nil
}

WithReadThreshold(readThreshold).f(opts)
r, w := GetSysFdPairs()
rconn, wconn := &connection{}, &connection{}
rconn.init(&netFD{fd: r}, opts)
wconn.init(&netFD{fd: w}, opts)
Assert(t, rconn.readThreshold == readThreshold)

msg := make([]byte, readThreshold)
_, err := wconn.Writer().WriteBinary(msg)
MustNil(t, err)
err = wconn.Writer().Flush()
MustNil(t, err)
_, err = wconn.Writer().WriteString("hello")
MustNil(t, err)
err = wconn.Writer().Flush()
MustNil(t, err)
t.Logf("flush final msg")

wg.Wait()
}
20 changes: 20 additions & 0 deletions docs/guide/guide_cn.md
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,26 @@ func callback(connection netpoll.Connection) error {
}
```

## 8. 如何配置连接的读取阈值大小 ?

Netpoll 默认不会对端发送数据的读取速度有任何限制,每当连接有数据时,Netpoll 会尽可能快地将数据存放在自己的 buffer 中。但有时候可能用户不希望数据过快发送,或者是希望控制服务内存使用量,又或者业务 OnRequest 回调处理速度很慢需要限制发送方速度,此时可以使用 `WithReadThreshold` 来控制读取的最大阈值。

### Client 侧使用

```
dialer := netpoll.NewDialer(netpoll.WithReadThreshold(1024 * 1024 * 1024 * 1)) // 1GB
conn, _ = dialer.DialConnection(network, address, timeout)
```

### Server 侧使用

```
eventLoop, _ := netpoll.NewEventLoop(
handle,
netpoll.WithReadThreshold(1024 * 1024 * 1024 * 1), // 1GB
)
```

# 注意事项

## 1. 错误设置 NumLoops
Expand Down
24 changes: 24 additions & 0 deletions docs/guide/guide_en.md
Original file line number Diff line number Diff line change
Expand Up @@ -558,6 +558,30 @@ func callback(connection netpoll.Connection) error {
}
```

## 8. How to configure the read threshold of the connection?

By default, Netpoll does not place any limit on the reading speed of data sent by the end.
Whenever there have more data on the connection, Netpoll will read the data into its own buffer as quickly as possible.

But sometimes users may not want data to be read too quickly, or they want to control the service memory usage, or the user's OnRequest callback processing data very slowly and need to control the peer's send speed.
In this case, you can use `WithReadThreshold` to control the maximum reading threshold.

### Client side use

```
dialer := netpoll.NewDialer(netpoll.WithReadThreshold(1024 * 1024 * 1024 * 1)) // 1GB
conn, _ = dialer.DialConnection(network, address, timeout)
```

### Server side use

```
eventLoop, _ := netpoll.NewEventLoop(
handle,
netpoll.WithReadThreshold(1024 * 1024 * 1024 * 1), // 1GB
)
```

# Attention

## 1. Wrong setting of NumLoops
Expand Down
25 changes: 19 additions & 6 deletions fd_operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,15 @@ import (
"sync/atomic"
)

const (
opdetach int32 = -1
_ int32 = 0 // default op mode, means nothing
opread int32 = 1
opwrite int32 = 2
opreadwrite int32 = 3
ophup int32 = 4
)

// FDOperator is a collection of operations on file descriptors.
type FDOperator struct {
// FD is file descriptor, poll will bind when register.
Expand All @@ -42,8 +51,7 @@ type FDOperator struct {
// poll is the registered location of the file descriptor.
poll Poll

// protect only detach once
detached int32
mode int32

// private, used by operatorCache
next *FDOperator
Expand All @@ -52,16 +60,21 @@ type FDOperator struct {
}

func (op *FDOperator) Control(event PollEvent) error {
if event == PollDetach && atomic.AddInt32(&op.detached, 1) > 1 {
return nil
}
return op.poll.Control(op, event)
}

func (op *FDOperator) Free() {
op.poll.Free(op)
}

func (op *FDOperator) getMode() int32 {
return atomic.LoadInt32(&op.mode)
}

func (op *FDOperator) setMode(mode int32) {
atomic.StoreInt32(&op.mode, mode)
}

func (op *FDOperator) do() (can bool) {
return atomic.CompareAndSwapInt32(&op.state, 1, 2)
}
Expand Down Expand Up @@ -98,5 +111,5 @@ func (op *FDOperator) reset() {
op.Inputs, op.InputAck = nil, nil
op.Outputs, op.OutputAck = nil, nil
op.poll = nil
op.detached = 0
op.mode = 0
}
21 changes: 15 additions & 6 deletions net_dialer.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,22 @@ func DialConnection(network, address string, timeout time.Duration) (connection
}

// NewDialer only support TCP and unix socket now.
func NewDialer() Dialer {
return &dialer{}
func NewDialer(opts ...Option) Dialer {
d := new(dialer)
if len(opts) > 0 {
d.opts = new(options)
for _, opt := range opts {
opt.f(d.opts)
}
}
return d
}

var defaultDialer = NewDialer()

type dialer struct{}
type dialer struct {
opts *options
}

// DialTimeout implements Dialer.
func (d *dialer) DialTimeout(network, address string, timeout time.Duration) (net.Conn, error) {
Expand All @@ -59,7 +68,7 @@ func (d *dialer) DialConnection(network, address string, timeout time.Duration)
raddr := &UnixAddr{
UnixAddr: net.UnixAddr{Name: address, Net: network},
}
return DialUnix(network, nil, raddr)
return dialUnix(network, nil, raddr, d.opts)
default:
return nil, net.UnknownNetworkError(network)
}
Expand Down Expand Up @@ -95,9 +104,9 @@ func (d *dialer) dialTCP(ctx context.Context, network, address string) (connecti
tcpAddr.Port = portnum
tcpAddr.Zone = ipaddr.Zone
if ipaddr.IP != nil && ipaddr.IP.To4() == nil {
connection, err = DialTCP(ctx, "tcp6", nil, tcpAddr)
connection, err = dialTCP(ctx, "tcp6", nil, tcpAddr, d.opts)
} else {
connection, err = DialTCP(ctx, "tcp", nil, tcpAddr)
connection, err = dialTCP(ctx, "tcp", nil, tcpAddr, d.opts)
}
if err == nil {
return connection, nil
Expand Down
Loading

0 comments on commit f815f75

Please sign in to comment.