Skip to content

Commit

Permalink
Introduce Broadcast API (#631)
Browse files Browse the repository at this point in the history
This commit introduces a "broadcast" feature for sharing general
events. Key implementations include the Document.Broadcast API,
SDK enhancements, and server-side additions.

Test cases cover broadcasting for unsubscribed events, payload
serialization to JSON, and document watching during broadcast errors.

---------

Co-authored-by: Youngteac Hong <[email protected]>
  • Loading branch information
sejongk and hackerwins authored Oct 2, 2023
1 parent e5ee72c commit 7c3bc6f
Show file tree
Hide file tree
Showing 16 changed files with 1,569 additions and 292 deletions.
2 changes: 2 additions & 0 deletions api/converter/from_pb.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,8 @@ func FromEventType(pbDocEventType api.DocEventType) (types.DocEventType, error)
return types.DocumentWatchedEvent, nil
case api.DocEventType_DOC_EVENT_TYPE_DOCUMENT_UNWATCHED:
return types.DocumentUnwatchedEvent, nil
case api.DocEventType_DOC_EVENT_TYPE_DOCUMENT_BROADCAST:
return types.DocumentBroadcastEvent, nil
}
return "", fmt.Errorf("%v: %w", pbDocEventType, ErrUnsupportedEventType)
}
Expand Down
2 changes: 2 additions & 0 deletions api/converter/to_pb.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,8 @@ func ToDocEventType(eventType types.DocEventType) (api.DocEventType, error) {
return api.DocEventType_DOC_EVENT_TYPE_DOCUMENT_WATCHED, nil
case types.DocumentUnwatchedEvent:
return api.DocEventType_DOC_EVENT_TYPE_DOCUMENT_UNWATCHED, nil
case types.DocumentBroadcastEvent:
return api.DocEventType_DOC_EVENT_TYPE_DOCUMENT_BROADCAST, nil
default:
return 0, fmt.Errorf("%s: %w", eventType, ErrUnsupportedEventType)
}
Expand Down
2 changes: 2 additions & 0 deletions api/types/auth_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ const (
RemoveDocument Method = "RemoveDocument"
PushPull Method = "PushPull"
WatchDocuments Method = "WatchDocuments"
Broadcast Method = "Broadcast"
)

// IsAuthMethod returns whether the given method can be used for authorization.
Expand All @@ -79,6 +80,7 @@ func AuthMethods() []Method {
RemoveDocument,
PushPull,
WatchDocuments,
Broadcast,
}
}

Expand Down
10 changes: 10 additions & 0 deletions api/types/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,14 @@ const (
// DocumentUnwatchedEvent is an event that occurs when document is
// unwatched by other clients.
DocumentUnwatchedEvent DocEventType = "document-unwatched"

// DocumentBroadcastEvent is an event that occurs when a payload is broadcasted
// on a specific topic.
DocumentBroadcastEvent DocEventType = "document-broadcast"
)

// DocEventBody includes additional data specific to the DocEvent.
type DocEventBody struct {
Topic string
Payload []byte
}
643 changes: 472 additions & 171 deletions api/yorkie/v1/resources.pb.go

Large diffs are not rendered by default.

7 changes: 7 additions & 0 deletions api/yorkie/v1/resources.proto
Original file line number Diff line number Diff line change
Expand Up @@ -358,9 +358,16 @@ enum DocEventType {
DOC_EVENT_TYPE_DOCUMENT_CHANGED = 0;
DOC_EVENT_TYPE_DOCUMENT_WATCHED = 1;
DOC_EVENT_TYPE_DOCUMENT_UNWATCHED = 2;
DOC_EVENT_TYPE_DOCUMENT_BROADCAST = 3;
}

message DocEventBody {
string topic = 1;
bytes payload = 2;
}

message DocEvent {
DocEventType type = 1;
string publisher = 2;
DocEventBody body = 3;
}
590 changes: 548 additions & 42 deletions api/yorkie/v1/yorkie.pb.go

Large diffs are not rendered by default.

12 changes: 12 additions & 0 deletions api/yorkie/v1/yorkie.proto
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ service YorkieService {
rpc PushPullChanges (PushPullChangesRequest) returns (PushPullChangesResponse) {}

rpc WatchDocument (WatchDocumentRequest) returns (stream WatchDocumentResponse) {}

rpc Broadcast (BroadcastRequest) returns (BroadcastResponse) {}
}

message ActivateClientRequest {
Expand Down Expand Up @@ -109,3 +111,13 @@ message PushPullChangesRequest {
message PushPullChangesResponse {
ChangePack change_pack = 1;
}

message BroadcastRequest {
string client_id = 1;
string document_id = 2;
string topic = 3;
bytes payload = 4;
}

message BroadcastResponse {
}
177 changes: 116 additions & 61 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ const (
DocumentWatched WatchResponseType = "document-watched"
DocumentUnwatched WatchResponseType = "document-unwatched"
PresenceChanged WatchResponseType = "presence-changed"
DocumentBroadcast WatchResponseType = "document-broadcast"
)

// WatchResponse is a structure representing response of Watch.
Expand Down Expand Up @@ -420,69 +421,11 @@ func (c *Client) Watch(
return nil, err
}

handleResponse := func(pbResp *api.WatchDocumentResponse) (*WatchResponse, error) {
switch resp := pbResp.Body.(type) {
case *api.WatchDocumentResponse_Initialization_:
var clientIDs []string
for _, clientID := range resp.Initialization.ClientIds {
id, err := time.ActorIDFromHex(clientID)
if err != nil {
return nil, err
}
clientIDs = append(clientIDs, id.String())
}

doc.SetOnlineClients(clientIDs...)
return nil, nil
case *api.WatchDocumentResponse_Event:
eventType, err := converter.FromEventType(resp.Event.Type)
if err != nil {
return nil, err
}

cli, err := time.ActorIDFromHex(resp.Event.Publisher)
if err != nil {
return nil, err
}

switch eventType {
case types.DocumentChangedEvent:
return &WatchResponse{Type: DocumentChanged}, nil
case types.DocumentWatchedEvent:
doc.AddOnlineClient(cli.String())
if doc.Presence(cli.String()) == nil {
return nil, nil
}

return &WatchResponse{
Type: DocumentWatched,
Presences: map[string]innerpresence.Presence{
cli.String(): doc.Presence(cli.String()),
},
}, nil
case types.DocumentUnwatchedEvent:
p := doc.Presence(cli.String())
doc.RemoveOnlineClient(cli.String())
if p == nil {
return nil, nil
}

return &WatchResponse{
Type: DocumentUnwatched,
Presences: map[string]innerpresence.Presence{
cli.String(): p,
},
}, nil
}
}
return nil, ErrUnsupportedWatchResponseType
}

pbResp, err := stream.Recv()
if err != nil {
return nil, err
}
if _, err := handleResponse(pbResp); err != nil {
if _, err := handleResponse(pbResp, doc); err != nil {
return nil, err
}

Expand All @@ -494,7 +437,7 @@ func (c *Client) Watch(
close(rch)
return
}
resp, err := handleResponse(pbResp)
resp, err := handleResponse(pbResp, doc)
if err != nil {
rch <- WatchResponse{Err: err}
close(rch)
Expand Down Expand Up @@ -534,10 +477,96 @@ func (c *Client) Watch(
}
}()

go func() {
for {
select {
case r := <-doc.BroadcastRequests():
doc.BroadcastResponses() <- c.broadcast(ctx, doc, r.Topic, r.Payload)
case <-ctx.Done():
return
}
}
}()

return rch, nil
}

func (c *Client) findDocKey(docID string) (key.Key, error) {
func handleResponse(
pbResp *api.WatchDocumentResponse,
doc *document.Document,
) (*WatchResponse, error) {
switch resp := pbResp.Body.(type) {
case *api.WatchDocumentResponse_Initialization_:
var clientIDs []string
for _, clientID := range resp.Initialization.ClientIds {
id, err := time.ActorIDFromHex(clientID)
if err != nil {
return nil, err
}
clientIDs = append(clientIDs, id.String())
}

doc.SetOnlineClients(clientIDs...)
return nil, nil
case *api.WatchDocumentResponse_Event:
eventType, err := converter.FromEventType(resp.Event.Type)
if err != nil {
return nil, err
}

cli, err := time.ActorIDFromHex(resp.Event.Publisher)
if err != nil {
return nil, err
}

switch eventType {
case types.DocumentChangedEvent:
return &WatchResponse{Type: DocumentChanged}, nil
case types.DocumentWatchedEvent:
doc.AddOnlineClient(cli.String())
if doc.Presence(cli.String()) == nil {
return nil, nil
}

return &WatchResponse{
Type: DocumentWatched,
Presences: map[string]innerpresence.Presence{
cli.String(): doc.Presence(cli.String()),
},
}, nil
case types.DocumentUnwatchedEvent:
p := doc.Presence(cli.String())
doc.RemoveOnlineClient(cli.String())
if p == nil {
return nil, nil
}

return &WatchResponse{
Type: DocumentUnwatched,
Presences: map[string]innerpresence.Presence{
cli.String(): p,
},
}, nil
case types.DocumentBroadcastEvent:
eventBody := resp.Event.Body
// If the handler exists, it means that the broadcast topic has been subscribed to.
if handler, ok := doc.BroadcastEventHandlers()[eventBody.Topic]; ok && handler != nil {
err := handler(eventBody.Topic, resp.Event.Publisher, eventBody.Payload)
if err != nil {
return &WatchResponse{
Type: DocumentBroadcast,
Err: err,
}, nil
}
}
return nil, nil
}
}
return nil, ErrUnsupportedWatchResponseType
}

// FindDocKey returns the document key of the given document id.
func (c *Client) FindDocKey(docID string) (key.Key, error) {
for _, attachment := range c.attachments {
if attachment.docID.String() == docID {
return attachment.doc.Key(), nil
Expand Down Expand Up @@ -650,6 +679,32 @@ func (c *Client) Remove(ctx context.Context, doc *document.Document) error {
return nil
}

func (c *Client) broadcast(ctx context.Context, doc *document.Document, topic string, payload []byte) error {
if c.status != activated {
return ErrClientNotActivated
}

attachment, ok := c.attachments[doc.Key()]
if !ok {
return ErrDocumentNotAttached
}

_, err := c.client.Broadcast(
withShardKey(ctx, c.options.APIKey, doc.Key().String()),
&api.BroadcastRequest{
ClientId: c.id.String(),
DocumentId: attachment.docID.String(),
Topic: topic,
Payload: payload,
},
)
if err != nil {
return err
}

return nil
}

/**
* withShardKey returns a context with the given shard key in metadata.
*/
Expand Down
Loading

0 comments on commit 7c3bc6f

Please sign in to comment.