diff --git a/conn.go b/conn.go index d704e18..302dd31 100644 --- a/conn.go +++ b/conn.go @@ -2,12 +2,13 @@ package stomp import ( "errors" - "gopkg.in/stomp.v1/frame" "io" "log" "net" "strconv" "time" + + "gopkg.in/stomp.v1/frame" ) // Default time span to add to read/write heart-beat timeouts @@ -59,6 +60,7 @@ type Conn struct { server string readTimeout time.Duration writeTimeout time.Duration + closed bool } type writeRequest struct { @@ -299,12 +301,15 @@ func processLoop(c *Conn, writer *Writer) { } case frame.ERROR: - log.Println("received ERROR") + log.Println("received ERROR; Closing underlying connection") for _, ch := range channels { ch <- f close(ch) } + c.closed = true + c.conn.Close() + return case frame.MESSAGE: @@ -372,6 +377,10 @@ func sendError(m map[string]chan *Frame, err error) { // with the STOMP server is closed and any further attempt to write // to the server will fail. func (c *Conn) Disconnect() error { + if c.closed { + return nil + } + ch := make(chan *Frame) c.writeCh <- writeRequest{ Frame: NewFrame(frame.DISCONNECT, frame.Receipt, allocateId()), @@ -383,6 +392,7 @@ func (c *Conn) Disconnect() error { return newError(response) } + c.closed = true return c.conn.Close() } @@ -397,7 +407,10 @@ func (c *Conn) Disconnect() error { // The message can contain optional, user-defined header entries in userDefined. If there are no optional header // entries, then set userDefined to nil. func (c *Conn) Send(destination, contentType string, body []byte, userDefined *Header) error { - // TODO(jpj): Check that we are still connected before sending. + if c.closed { + return newErrorMessage("Underlying connection closed.") + } + f := createSendFrame(destination, contentType, body, userDefined) f.Del(frame.Transaction) c.sendFrame(f) @@ -414,7 +427,10 @@ func (c *Conn) Send(destination, contentType string, body []byte, userDefined *H // The message can contain optional, user-defined header entries in userDefined. If there are no optional header // entries, then set userDefined to nil. func (c *Conn) SendWithReceipt(destination, contentType string, body []byte, userDefined *Header) error { - // TODO(jpj): Check that we are still connected before sending. + if c.closed { + return newErrorMessage("Underlying connection closed.") + } + f := createSendFrame(destination, contentType, body, userDefined) f.Del(frame.Transaction) return c.sendFrameWithReceipt(f)