Skip to content

Commit

Permalink
Merge pull request #45 from Fjolnir-Dvorak/raceConPR
Browse files Browse the repository at this point in the history
Race conditions and missing checks on Channel
  • Loading branch information
worg authored Sep 19, 2018
2 parents 10a97af + e3737c9 commit 8a8033d
Showing 1 changed file with 14 additions and 1 deletion.
15 changes: 14 additions & 1 deletion conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type Conn struct {
readTimeout time.Duration
writeTimeout time.Duration
closed bool
closeMutex *sync.Mutex
options *connOptions
}

Expand Down Expand Up @@ -68,7 +69,8 @@ func Connect(conn io.ReadWriteCloser, opts ...func(*Conn) error) (*Conn, error)
writer := frame.NewWriter(conn)

c := &Conn{
conn: conn,
conn: conn,
closeMutex: &sync.Mutex{},
}

options, err := newConnOptions(c, opts)
Expand Down Expand Up @@ -296,6 +298,8 @@ func processLoop(c *Conn, writer *frame.Writer) {
close(ch)
}

c.closeMutex.Lock()
defer c.closeMutex.Unlock()
c.closed = true
c.conn.Close()

Expand Down Expand Up @@ -366,6 +370,8 @@ func sendError(m map[string]chan *frame.Frame, err error) {
// with the STOMP server is closed and any further attempt to write
// to the server will fail.
func (c *Conn) Disconnect() error {
c.closeMutex.Lock()
defer c.closeMutex.Unlock()
if c.closed {
return nil
}
Expand All @@ -389,6 +395,8 @@ func (c *Conn) Disconnect() error {
// This method should be used only as last resort when there are fatal
// network errors that prevent to do a proper disconnect from the server.
func (c *Conn) MustDisconnect() error {
c.closeMutex.Lock()
defer c.closeMutex.Unlock()
if c.closed {
return nil
}
Expand All @@ -410,6 +418,8 @@ func (c *Conn) MustDisconnect() error {
// Any number of options can be specified in opts. See the examples for usage. Options include whether
// to receive a RECEIPT, should the content-length be suppressed, and sending custom header entries.
func (c *Conn) Send(destination, contentType string, body []byte, opts ...func(*frame.Frame) error) error {
c.closeMutex.Lock()
defer c.closeMutex.Unlock()
if c.closed {
return ErrAlreadyClosed
}
Expand Down Expand Up @@ -551,10 +561,13 @@ func (c *Conn) Subscribe(destination string, ack AckMode, opts ...func(*frame.Fr
}
go sub.readLoop(ch)

// TODO is this safe? There is no check if writeCh is actually open.
c.writeCh <- request
return sub, nil
}

// TODO check further for race conditions

// Ack acknowledges a message received from the STOMP server.
// If the message was received on a subscription with AckMode == AckAuto,
// then no operation is performed.
Expand Down

0 comments on commit 8a8033d

Please sign in to comment.