
Race condition when sending events to listeners #198

Closed
sybrandy opened this issue Sep 21, 2016 · 11 comments

@sybrandy

Some background: I modified Traefik, which uses this library, to periodically shut down the Marathon client and create a new one. This works around a bug in Marathon where Traefik stops receiving events when an instance crashes. AFAIK, this hasn't been fixed yet and I don't know when it will be, hence the workaround.

So, after getting this code working better, I started seeing panics from the code below, where it tries to send on a closed channel:

for channel, filter := range r.listeners {
    // step: check if this listener wants this event type
    if event.ID&filter != 0 {
        go func(ch EventsChannel, e *Event) {
            ch <- e
        }(channel, event)
    }
}

When my code is told to stop, it removes the listener and then closes the channel it was using for updates. Looking at your code, this appeared to work fine until I realized that, because you send the events to the channel from within a goroutine, the following must be happening:

  1. An event comes in and a goroutine is spawned to send the event to a channel.
    • The goroutine does not run at this time.
  2. My code gets a stop message.
  3. The listener is removed.
  4. The channel is closed.
  5. The goroutine in step one now runs and panics.
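
The five steps above can be replayed deterministically. The sketch below is a minimal, self-contained Go program, not go-marathon code: Event and EventsChannel are stand-ins for the library types in the excerpt, and the gate channel is an illustrative device that forces the sender goroutine to run only after the channel has been closed. recover is used here purely to capture the panic message for the demo.

```go
package main

import "fmt"

// Stand-ins for go-marathon's Event and EventsChannel types.
type Event struct{ ID int }
type EventsChannel chan *Event

// reproduceRace replays the interleaving from the steps above and returns
// the panic message raised by the delayed sender goroutine.
func reproduceRace() string {
	ch := make(EventsChannel)
	gate := make(chan struct{}) // holds the sender back: it "does not run yet"
	result := make(chan string)

	// Step 1: an event arrives and a sender goroutine is spawned.
	go func(c EventsChannel, e *Event) {
		defer func() {
			if r := recover(); r != nil {
				result <- fmt.Sprint(r)
				return
			}
			result <- "sent"
		}()
		<-gate
		c <- e // Step 5: runs only after the channel was closed
	}(ch, &Event{ID: 1})

	// Steps 2-4: the consumer stops, removes its listener, closes the channel.
	close(ch)

	// Let the sender goroutine run.
	close(gate)
	return <-result
}

func main() {
	fmt.Println(reproduceRace())
}
```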

Now, I do understand why the code is designed this way: you don't want one full channel, which would force the send to block, to keep other listeners from receiving events. However, I think it may be best not to use a goroutine in this case and to document that buffered channels should be used instead. Perhaps a function could be added to create a correctly buffered channel rather than relying on users of the API to make sure it's buffered?

I understand this may be an atypical use case, but I wanted to report this bug so that it was known before this affects production systems.

@sybrandy
Author

sybrandy commented Sep 21, 2016

Looking at the code a bit more, another option would be to have some recovery code in the goroutine that would report what happened, but not crash.

@timoreimann
Collaborator

@sybrandy thanks for reporting the issue. I wouldn't even say that this is necessarily an atypical use case. There are likely a number of consumers out there that dynamically attach to and detach from event listeners over a longer period of (run-)time, so the risk of running into the problem you're describing grows with time as well. And personally, I'm not comfortable knowingly leaving race conditions in the code base, no matter how rarely they surface. At the very least, it's bad publicity. :-)

I'd argue that recovering is probably not the best solution to the problem at hand as it mostly serves to conceal the bug we're talking about. A common pattern instead is to have a separate done channel that's being closed by the receiver and interpreted by the sender as a stop signal. I'll try to find some time tomorrow or early next week to verify whether this approach could work.

Again, thanks for bringing this to our attention!

@sybrandy
Author

No problem. I agree with your take on the recovery solution. It's not my favourite either, but I didn't want to discount it entirely.

@timoreimann
Collaborator

I did some preliminary analysis. Here are my findings:

  1. The receiver (i.e., go-marathon consumer) should never close a channel. That's an anti-pattern because you inherently cannot solve the race condition involved. Closing is only for senders and not meant as a resource management primitive.
  2. Even if you stopped closing the channel, I think there would still be a problem, because a goroutine could still push something into the channel after the receiver has already decided to stop pulling. With unbuffered channels, this would block the goroutine forever and leak it along with its memory.
  3. While go-marathon guards both the removal of listeners and the iteration over listeners during event handling with a common mutex, that lock does not cover the lifetime of the event-sending goroutine, so we can't guarantee that a removed event listener isn't still being used by one final goroutine.

At this point, I think we need a bit of communication, channel draining, or similar inside go-marathon to make sure the sending goroutines terminate properly. On the consumer end, though, calling RemoveEventsListener should be enough.

WDYT?

@sybrandy
Author

You hit every nail on the head. When I suggested using buffered channels, I meant that you wouldn't send events via a goroutine. That would prevent the leak entirely, at the expense of making sure your clients use buffered channels. With that setup, I was able to remove the subscription, drain the events, and then close the channel. So far it's working well, but I'm not sure if that's the road you want to go down.

I believe you're on the right track. If go-marathon handled the creation/destruction of the channels itself, that would be best.

So, here's a potentially crazy and rough thought: what if the listeners map held a Listener for each channel, where Listener looks something like this:

type Listener struct {
    filter int
    wg sync.WaitGroup
}

Now, sending an event to the channel could look like this:

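// Assumes r.listeners is a map[EventsChannel]*Listener. With value-typed
// entries, &listener.wg would point at the loop's copy, not the stored Listener.
for channel, listener := range r.listeners {
    if event.ID&listener.filter != 0 {
        listener.wg.Add(1) // count the send before the goroutine starts
        go func(ch EventsChannel, e *Event, wg *sync.WaitGroup) {
            defer wg.Done()
            ch <- e
        }(channel, event, &listener.wg)
    }
}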

When a subscription is removed, the listener would be deleted from listeners, the code would wait until all of its goroutines have finished, and only then close the channel. The reader should handle a closed channel properly.

Obviously, this will need to be tested. Also, while I do understand the use of goroutines, there's also a cost that I'm not sure is worth it. If you control the channel creation, you can ensure it is buffered to minimize the risk of blocking, thus potentially eliminating the use of a goroutine. Now you can't leak goroutines because you're not creating them and you don't incur the overhead of spawning them, which could be a concern in very busy systems.

Another thought is to have an event buffer that queues up events before they are sent to the channel. If that buffer is part of the listener, each listener could have a reader that periodically checks for new messages in the buffer and pushes them onto a channel. A function would put messages into the buffer, and if the buffer is full, it could either drop the new message or drop the oldest one. It'll involve a lock, but it would prevent slow consumers from holding up other event listeners and avoid spawning a goroutine per event.

I'll leave it up to you to choose which direction to go. I personally would go with the simplest solution, but it really depends on the risk of slow/non-responsive consumers.

I hope this helps.

@timoreimann
Collaborator

timoreimann commented Sep 24, 2016

First of all, thanks for exploring the solution space together with me and assessing the various possibilities. I really appreciate it!

I spent some time rereading the Go blog post on pipelines and cancellation. It took me a bit to realize that it describes the exact problem we're experiencing. The canonical solution is to introduce a separate done channel per listener and, inside the goroutine, select between it and the unbuffered event channel. RemoveEventsListener would close the done channel, making the goroutine exit early. One nice property of this approach is that it rules out goroutines blocking forever (problem no. 2): a select over multiple channels without a default case blocks until one of its operations can proceed, which for us happens either when the receiver is still willing to receive (the send on the event channel proceeds) or when it has decided to stop (the done channel becomes ready).

Another advantage is that consumers don't need to drain the channel. go-marathon should probably close the event channel once the done signal has been received, to support consumers that decouple consuming events from deleting the event listener (i.e., they range over the event channel and expect the loop to end once nothing more will be sent). Even so, users are not strictly required to drain the channel.

All of this is probably more complex than asking users to maintain a buffered channel and eventually drain it. I'm still inclined not to go down the buffered-channel path, though, since it would put additional responsibilities on the client side and require users to pick an appropriate buffer size and/or drop events at some point. That's quite a lot if our primary intention is to "just" fix a race condition.

I have yet to try out my approach, but I think it should work. Any thoughts?

@timoreimann
Collaborator

timoreimann commented Sep 24, 2016

I should mention that with the approach described above, we probably can't let consumers pass their own channel to AddEventsListener anymore: it could be buffered, which would defeat the purpose of the select. Instead, the function would return an unbuffered channel for the consumer to read from.

That said, I don't think that passing in a buffered channel makes a lot of sense with the current implementation since we effectively achieve buffering through goroutines.

@sybrandy
Author

Morning,

Apologies...my weekends are busy, so I'm just getting to this now.

First, everything you stated is correct in terms of using a done channel. Second, if possible, I'm all for ensuring we have the proper design vs. "just" fixing a race condition.

That being said, my main hang-up with the original design is spawning a goroutine for each event. I don't think it's a good design, and it could cause problems in the future. For example, what if the consumer simply stops reading from the channel? Sends will block and the code will keep creating more goroutines. That's the primary reason I was leaning towards buffered channels. While we could still end up in a similar situation, it would be less catastrophic, since we could handle blocked sends with, IIRC, a select block with a timeout. E.g.:

select {
case eventChan <- event:
    fmt.Println("Event sent!")
case <-time.After(time.Second):
    fmt.Println("Event wasn't sent due to timeout sending the message.")
}

The code could then add some logic to remove the listener if it is deemed "dead" and keep working. I have no idea what an appropriate timeout is, so that needs to be figured out. Ultimately, regardless of what we do, I think users of the library need to be aware of what can happen if their consumer is slow so that they can handle it appropriately.

As for consumers draining the messages, I don't believe it's all that uncommon a practice. Looking at what I patched in, yes, it could be cleaner, but I was going for speed. (I'll be fixing that this morning now that I realize how ugly it is.) In most cases, though, draining is handled by a loop with a select in it or a for loop over a range.

@timoreimann
Collaborator

timoreimann commented Sep 26, 2016

No need to apologize for not working on the weekends. I'm the strange one here. ;-)

Thanks for your feedback. I fully agree that the current solution does nothing to bound resource consumption as far as event handling is concerned. The goal of my PR, however, is explicitly limited to fixing the race condition: it should make things slightly better (stop the memory leak) but not any worse. So I'd rather not conflate this problem with the risk of exhausting memory due to slow or halted event consumption. A solution to that problem probably involves comparing different approaches: apart from channel buffering, there's also the option of creating and managing a pool of worker goroutines. We could possibly even integrate the timeout you outlined into the current approach, with the downside that events would no longer necessarily be delivered in a stable order. It's not exactly clear to me which route we'd want to take, so I think the matter deserves a dedicated issue if it's deemed important enough.

With regard to draining, the primary concern I see is that users will need to be aware of, and educated about, the fact that they need to drain. Ignoring or forgetting to do so may lead to some hard-to-debug cases, which makes me lean towards doing the hard work up front in the library as opposed to offloading it onto the user. But again, that's really a decision to be made only once we decide that the lack of resource constraints is a problem. (For my organization, it hasn't been, since the rate at which Marathon produces events is several times slower than our consumption rate. But who am I, that's just me. 😃 )

Feel free to file a new issue if you think it should be addressed.

Thanks again!

@sybrandy
Author

O.K. That's reasonable. When I get a chance, I'll file a new issue regarding resource usage. Even here, I'm not seeing much usage, but that probably won't always hold true, and even then, it's probably best practice to keep it under control to make sure things don't go haywire.

@sybrandy
Author

I created #208 to discuss the resource usage.
