diff --git a/client/client.go b/client/client.go index 02fc37238..63e65054c 100644 --- a/client/client.go +++ b/client/client.go @@ -410,7 +410,6 @@ func (c *Client) Watch( } rch := make(chan WatchResponse) - stream, err := c.client.WatchDocument( withShardKey(ctx, c.options.APIKey, doc.Key().String()), &api.WatchDocumentRequest{ @@ -554,7 +553,10 @@ func handleResponse( if handler, ok := doc.BroadcastEventHandlers()[eventBody.Topic]; ok && handler != nil { err := handler(eventBody.Topic, resp.Event.Publisher, eventBody.Payload) if err != nil { - return nil, err + return &WatchResponse{ + Type: DocumentBroadcast, + Err: err, + }, nil } } return nil, nil diff --git a/pkg/document/document.go b/pkg/document/document.go index f368aa7c6..c1548eb1e 100644 --- a/pkg/document/document.go +++ b/pkg/document/document.go @@ -360,7 +360,7 @@ func (d *Document) RemoveOnlineClient(clientID string) { d.doc.RemoveOnlineClient(clientID) } -// Events returns the document events of this document. +// Events returns the events of this document. func (d *Document) Events() <-chan DocEvent { return d.events } @@ -375,7 +375,7 @@ func (d *Document) BroadcastResponses() chan error { return d.broadcastResponses } -// Broadcast encodes the payload and makes a "Broadcast" type request. +// Broadcast encodes the given payload and sends a Broadcast request. func (d *Document) Broadcast(topic string, payload any) error { marshaled, err := gojson.Marshal(payload) if err != nil { @@ -389,8 +389,8 @@ func (d *Document) Broadcast(topic string, payload any) error { return <-d.broadcastResponses } -// SubscribeBroadcastEvent subscribes to the registers an event handler and makes -// a "Subscribe" type request. +// SubscribeBroadcastEvent subscribes to the given topic and registers +// an event handler func (d *Document) SubscribeBroadcastEvent( topic string, handler func(topic, publisher string, payload []byte) error, @@ -398,15 +398,15 @@ func (d *Document) SubscribeBroadcastEvent( d.broadcastEventHandlers[topic] = handler } -// UnsubscribeBroadcastEvent deregisters the event handler and makes -// a "Unsubscribe" type request. +// UnsubscribeBroadcastEvent unsubscribes to the given topic and deregisters +// the event handler func (d *Document) UnsubscribeBroadcastEvent( topic string, ) { delete(d.broadcastEventHandlers, topic) } -// BroadcastEventHandlers returns registered event handlers for events. +// BroadcastEventHandlers returns the registered handlers for broadcast events. func (d *Document) BroadcastEventHandlers() map[string](func(topic string, publisher string, payload []byte) error) { return d.broadcastEventHandlers diff --git a/test/integration/document_test.go b/test/integration/document_test.go index af08222ae..84c49d66d 100644 --- a/test/integration/document_test.go +++ b/test/integration/document_test.go @@ -618,6 +618,9 @@ func TestDocument(t *testing.T) { ctx := context.Background() handler := func(topic, publisher string, payload []byte) error { + var mentionedBy string + assert.Equal(t, topic, "mention") + assert.NoError(t, gojson.Unmarshal(payload, &mentionedBy)) return ErrBroadcastEventHandlingError } @@ -631,7 +634,7 @@ func TestDocument(t *testing.T) { assert.NoError(t, c2.Attach(ctx, d2)) rch2, err := c2.Watch(ctx, d2) assert.NoError(t, err) - d1.SubscribeBroadcastEvent("mention", handler) + d2.SubscribeBroadcastEvent("mention", handler) err = d2.Broadcast("mention", "yorkie") assert.NoError(t, err) @@ -640,14 +643,22 @@ func TestDocument(t *testing.T) { wg.Add(1) go func() { defer wg.Done() + rcv := 0 for { select { case resp := <-rch1: if resp.Err != nil { + assert.Equal(t, resp.Type, client.DocumentBroadcast) assert.ErrorIs(t, resp.Err, ErrBroadcastEventHandlingError) - return + rcv++ } case <-rch2: + case <-time.After(1 * time.Second): + // Assuming that every subscriber can receive the broadcast + // event within this timeout period, check if every subscriber + // successfully receives the event. + assert.Equal(t, 1, rcv) + return case <-ctx.Done(): return }