From 3c05ebdf8b565a1e7ed09eb7796c8d762d6108e0 Mon Sep 17 00:00:00 2001 From: TremblingV5 Date: Tue, 21 May 2024 22:08:51 +0800 Subject: [PATCH] fix issue #323 --- connection_errors.go | 17 ++++++++++------- connection_impl.go | 13 +++++++++++-- 2 files changed, 21 insertions(+), 9 deletions(-) diff --git a/connection_errors.go b/connection_errors.go index 1edfa21d..f14070ab 100644 --- a/connection_errors.go +++ b/connection_errors.go @@ -36,6 +36,8 @@ const ( ErrEOF = syscall.Errno(0x106) // Write I/O buffer timeout, calling by Connection.Writer ErrWriteTimeout = syscall.Errno(0x107) + // Concurrent connection access error + ErrConcurrentAccess = syscall.Errno(0x108) ) const ErrnoMask = 0xFF @@ -110,11 +112,12 @@ func (e *exception) Temporary() bool { // Errors defined in netpoll var errnos = [...]string{ - ErrnoMask & ErrConnClosed: "connection has been closed", - ErrnoMask & ErrReadTimeout: "connection read timeout", - ErrnoMask & ErrDialTimeout: "dial wait timeout", - ErrnoMask & ErrDialNoDeadline: "dial no deadline", - ErrnoMask & ErrUnsupported: "netpoll dose not support", - ErrnoMask & ErrEOF: "EOF", - ErrnoMask & ErrWriteTimeout: "connection write timeout", + ErrnoMask & ErrConnClosed: "connection has been closed", + ErrnoMask & ErrReadTimeout: "connection read timeout", + ErrnoMask & ErrDialTimeout: "dial wait timeout", + ErrnoMask & ErrDialNoDeadline: "dial no deadline", + ErrnoMask & ErrUnsupported: "netpoll does not support", + ErrnoMask & ErrEOF: "EOF", + ErrnoMask & ErrWriteTimeout: "connection write timeout", + ErrnoMask & ErrConcurrentAccess: "concurrent connection access", } diff --git a/connection_impl.go b/connection_impl.go index b683b4df..bb2609b6 100644 --- a/connection_impl.go +++ b/connection_impl.go @@ -220,10 +220,15 @@ func (c *connection) MallocLen() (length int) { // If empty, it will call syscall.Write to send data directly, // otherwise the buffer will be sent asynchronously by the epoll trigger. func (c *connection) Flush() error { - if !c.IsActive() || !c.lock(flushing) { + if !c.IsActive() { return Exception(ErrConnClosed, "when flush") } + + if !c.lock(flushing) { + return Exception(ErrConcurrentAccess, "when flush") + } defer c.unlock(flushing) + c.outputBuffer.Flush() return c.flush() } @@ -282,9 +287,13 @@ func (c *connection) Read(p []byte) (n int, err error) { // Write will Flush soon. func (c *connection) Write(p []byte) (n int, err error) { - if !c.IsActive() || !c.lock(flushing) { + if !c.IsActive() { return 0, Exception(ErrConnClosed, "when write") } + + if !c.lock(flushing) { + return 0, Exception(ErrConcurrentAccess, "when write") + } defer c.unlock(flushing) dst, _ := c.outputBuffer.Malloc(len(p))