Skip to content

Commit

Permalink
Merge pull request #2 from joliver/master
Browse files Browse the repository at this point in the history
When the server broker the connection forcefully, the client should disconnect
  • Loading branch information
jjeffery committed Aug 23, 2014
2 parents 1d04a4b + 409b7eb commit f9009d7
Showing 1 changed file with 20 additions and 4 deletions.
24 changes: 20 additions & 4 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@ package stomp

import (
"errors"
"gopkg.in/stomp.v1/frame"
"io"
"log"
"net"
"strconv"
"time"

"gopkg.in/stomp.v1/frame"
)

// Default time span to add to read/write heart-beat timeouts
Expand Down Expand Up @@ -59,6 +60,7 @@ type Conn struct {
server string
readTimeout time.Duration
writeTimeout time.Duration
closed bool
}

type writeRequest struct {
Expand Down Expand Up @@ -299,12 +301,15 @@ func processLoop(c *Conn, writer *Writer) {
}

case frame.ERROR:
log.Println("received ERROR")
log.Println("received ERROR; Closing underlying connection")
for _, ch := range channels {
ch <- f
close(ch)
}

c.closed = true
c.conn.Close()

return

case frame.MESSAGE:
Expand Down Expand Up @@ -372,6 +377,10 @@ func sendError(m map[string]chan *Frame, err error) {
// with the STOMP server is closed and any further attempt to write
// to the server will fail.
func (c *Conn) Disconnect() error {
if c.closed {
return nil
}

ch := make(chan *Frame)
c.writeCh <- writeRequest{
Frame: NewFrame(frame.DISCONNECT, frame.Receipt, allocateId()),
Expand All @@ -383,6 +392,7 @@ func (c *Conn) Disconnect() error {
return newError(response)
}

c.closed = true
return c.conn.Close()
}

Expand All @@ -397,7 +407,10 @@ func (c *Conn) Disconnect() error {
// The message can contain optional, user-defined header entries in userDefined. If there are no optional header
// entries, then set userDefined to nil.
func (c *Conn) Send(destination, contentType string, body []byte, userDefined *Header) error {
// TODO(jpj): Check that we are still connected before sending.
if c.closed {
return newErrorMessage("Underlying connection closed.")
}

f := createSendFrame(destination, contentType, body, userDefined)
f.Del(frame.Transaction)
c.sendFrame(f)
Expand All @@ -414,7 +427,10 @@ func (c *Conn) Send(destination, contentType string, body []byte, userDefined *H
// The message can contain optional, user-defined header entries in userDefined. If there are no optional header
// entries, then set userDefined to nil.
func (c *Conn) SendWithReceipt(destination, contentType string, body []byte, userDefined *Header) error {
// TODO(jpj): Check that we are still connected before sending.
if c.closed {
return newErrorMessage("Underlying connection closed.")
}

f := createSendFrame(destination, contentType, body, userDefined)
f.Del(frame.Transaction)
return c.sendFrameWithReceipt(f)
Expand Down

0 comments on commit f9009d7

Please sign in to comment.