Skip to content

Commit

Permalink
Merge pull request #8 from sillyousu/fix_issuse_7
Browse files Browse the repository at this point in the history
让虚拟连接关闭和接受转发包两件事情互斥 fix #7
  • Loading branch information
bg5sbk authored May 17, 2017
2 parents c876c3f + 1caa26a commit 9921afa
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 11 deletions.
29 changes: 26 additions & 3 deletions go/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,8 @@ type virtualCodec struct {
physicalConn *link.Session
connID uint32
recvChan chan []byte
closeOnce sync.Once
closeMutex sync.Mutex
closed bool
lastActive *int64
format MsgFormat
}
Expand All @@ -104,6 +105,24 @@ func (p *protocol) newVirtualCodec(physicalConn *link.Session, connID uint32, re
}
}

func (c *virtualCodec) forward(buf []byte) {
c.closeMutex.Lock()
if c.closed {
c.closeMutex.Unlock()
c.free(buf)
return
}
select {
case c.recvChan <- buf:
c.closeMutex.Unlock()
return
default:
c.closeMutex.Unlock()
c.Close()
c.free(buf)
}
}

func (c *virtualCodec) Receive() (interface{}, error) {
buf, ok := <-c.recvChan
if !ok {
Expand Down Expand Up @@ -137,10 +156,14 @@ func (c *virtualCodec) Send(msg interface{}) error {
}

func (c *virtualCodec) Close() error {
c.closeOnce.Do(func() {
c.closeMutex.Lock()
if !c.closed {
c.closed = true
close(c.recvChan)
c.send(c.physicalConn, c.encodeCloseCmd(c.connID))
})
}
c.closeMutex.Unlock()

for buf := range c.recvChan {
c.free(buf)
}
Expand Down
20 changes: 20 additions & 0 deletions go/codec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,3 +171,23 @@ func Test_BadVirtualCodec(t *testing.T) {
vcodec.recvChan <- bigMsg
vcodec.Close()
}

func Test_VirtualCodecReceivcBlock(t *testing.T) {
conn, err := net.Dial("tcp", TestAddr)
utest.IsNilNow(t, err)
defer conn.Close()

codec := TestProto.newCodec(0, conn, 1024)
pconn := link.NewSession(codec, 1000)

var lastActive int64
recvChanSize := 2
vcodec := TestProto.newVirtualCodec(pconn, 123, recvChanSize, &lastActive, &TestMsgFormat{})
buf := make([]byte, 100)
for i := 0; i <= recvChanSize; i++ {
vcodec.forward(buf)
}
vcodec.closeMutex.Lock()
defer vcodec.closeMutex.Unlock()
utest.Assert(t, vcodec.closed)
}
12 changes: 4 additions & 8 deletions go/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,15 +255,11 @@ func (p *EndPoint) loop() {

vconn := p.virtualConns.Get(connID)
if vconn != nil {
select {
case vconn.Codec().(*virtualCodec).recvChan <- buf:
continue
default:
vconn.Close()
}
vconn.Codec().(*virtualCodec).forward(buf)
} else {
p.free(buf)
p.send(p.session, p.encodeCloseCmd(connID))
}
p.free(buf)
p.send(p.session, p.encodeCloseCmd(connID))
}
}

Expand Down
45 changes: 45 additions & 0 deletions go/gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,3 +353,48 @@ func Test_BadEndPoint(t *testing.T) {
_, err = DialServer("tcp", lsn3.Addr().String(), TestEndPointCfg)
utest.NotNilNow(t, err)
}

func Test_VConnSimultaneouslyCloseAndReceive(t *testing.T) {
lsn1, err := net.Listen("tcp", "127.0.0.1:0")
utest.IsNilNow(t, err)
defer lsn1.Close()

lsn2, err := net.Listen("tcp", "127.0.0.1:0")
utest.IsNilNow(t, err)
defer lsn2.Close()

gw := NewGateway(TestPool, TestMaxPacket)

go gw.ServeClients(lsn1, TestGatewayCfg)
go gw.ServeServers(lsn2, TestGatewayCfg)

time.Sleep(time.Second)

server, err := DialServer("tcp", lsn2.Addr().String(), TestEndPointCfg)
utest.IsNilNow(t, err)
time.Sleep(time.Second)
go func() {
for {
vconn, err := server.Accept()
if err != nil {
return
}
runtime.Gosched()
vconn.Close()
runtime.Gosched()
}
}()
time.Sleep(time.Second)
payload := make([]byte, 10)
for i := 0; i < 10000; i++ {
client, err := DialClient("tcp", lsn1.Addr().String(), TestEndPointCfg)
utest.IsNilNow(t, err)
vconn, err := client.Dial(123)
utest.IsNilNow(t, err)
runtime.Gosched()
vconn.Send(payload)
runtime.Gosched()
client.Close()
}
gw.Stop()
}

0 comments on commit 9921afa

Please sign in to comment.