diff --git a/connector.go b/connector.go index 3529e4c..68cf81a 100644 --- a/connector.go +++ b/connector.go @@ -56,10 +56,13 @@ func (b *BaseConnector) Connect(client *Client, channels []*Channel) (error, err for _, channel := range channels { dataChannel := b.ensureChannel(channel) dataChannel.Clients.Store(client.ID, client) + client.Channels.Store(dataChannel.ID, dataChannel) defer func() { - client.Cleanup() + client.Channels.Delete(channel.ID) dataChannel.Clients.Delete(client.ID) + client.Cleanup() + count := 0 for _, cl := range dataChannel.GetClients() { if cl.Direction == ChannelDirectionInput || cl.Direction == ChannelDirectionInputOutput { @@ -75,7 +78,6 @@ func (b *BaseConnector) Connect(client *Client, channels []*Channel) (error, err b.Cleanup() }() - client.Channels.Store(dataChannel.ID, dataChannel) } var ( @@ -101,6 +103,7 @@ func (b *BaseConnector) Connect(client *Client, channels []*Channel) (error, err } if client.BlockWrite { + mainLoop: for { count := 0 for _, channel := range client.GetChannels() { @@ -112,14 +115,14 @@ func (b *BaseConnector) Connect(client *Client, channels []*Channel) (error, err } if count > 0 { - break + break mainLoop } select { + case <-client.Done: + break mainLoop case <-time.After(1 * time.Millisecond): continue - case <-client.Done: - break } } }