diff --git a/key_join_group_async.go b/key_join_group_async.go index df3715a..0769f2d 100644 --- a/key_join_group_async.go +++ b/key_join_group_async.go @@ -56,21 +56,20 @@ func (n *KeyJoinGroupAsyncNode[K, X, Z]) start(wf *Workflow) { for key := range updates { if x, ok := hits[key]; ok { x++ - if x >= len(n.Inputs) { - delete(hits, key) - mut.Lock() - val := n.Proc(key, store[key]) - delete(store, key) - mut.Unlock() - for i := range n.Outputs { - n.Outputs[i] <- KeyValue[K, Z]{key, val} - } - } else { - hits[key] = x - } + hits[key] = x } else { hits[key] = 1 } + if hits[key] >= len(n.Inputs) { + delete(hits, key) + mut.Lock() + val := n.Proc(key, store[key]) + delete(store, key) + mut.Unlock() + for i := range n.Outputs { + n.Outputs[i] <- KeyValue[K, Z]{key, val} + } + } } for i := range n.Outputs { close(n.Outputs[i])