From cd6a0d0acbfa3bfc6a196ce560f2a05f27b59a99 Mon Sep 17 00:00:00 2001 From: Kyle Ellrott Date: Wed, 27 Dec 2023 10:42:04 -0800 Subject: [PATCH] Fixing issue when KeyJoinGroupAsyncNode hand only one input --- key_join_group_async.go | 23 +++++++++++------------ 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/key_join_group_async.go b/key_join_group_async.go index df3715a..0769f2d 100644 --- a/key_join_group_async.go +++ b/key_join_group_async.go @@ -56,21 +56,20 @@ func (n *KeyJoinGroupAsyncNode[K, X, Z]) start(wf *Workflow) { for key := range updates { if x, ok := hits[key]; ok { x++ - if x >= len(n.Inputs) { - delete(hits, key) - mut.Lock() - val := n.Proc(key, store[key]) - delete(store, key) - mut.Unlock() - for i := range n.Outputs { - n.Outputs[i] <- KeyValue[K, Z]{key, val} - } - } else { - hits[key] = x - } + hits[key] = x } else { hits[key] = 1 } + if hits[key] >= len(n.Inputs) { + delete(hits, key) + mut.Lock() + val := n.Proc(key, store[key]) + delete(store, key) + mut.Unlock() + for i := range n.Outputs { + n.Outputs[i] <- KeyValue[K, Z]{key, val} + } + } } for i := range n.Outputs { close(n.Outputs[i])