Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce Broadcast API #631

Merged
merged 25 commits into from
Oct 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions api/converter/from_pb.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,8 @@ func FromEventType(pbDocEventType api.DocEventType) (types.DocEventType, error)
return types.DocumentWatchedEvent, nil
case api.DocEventType_DOC_EVENT_TYPE_DOCUMENT_UNWATCHED:
return types.DocumentUnwatchedEvent, nil
case api.DocEventType_DOC_EVENT_TYPE_DOCUMENT_BROADCAST:
return types.DocumentBroadcastEvent, nil
}
return "", fmt.Errorf("%v: %w", pbDocEventType, ErrUnsupportedEventType)
}
Expand Down
2 changes: 2 additions & 0 deletions api/converter/to_pb.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,8 @@ func ToDocEventType(eventType types.DocEventType) (api.DocEventType, error) {
return api.DocEventType_DOC_EVENT_TYPE_DOCUMENT_WATCHED, nil
case types.DocumentUnwatchedEvent:
return api.DocEventType_DOC_EVENT_TYPE_DOCUMENT_UNWATCHED, nil
case types.DocumentBroadcastEvent:
return api.DocEventType_DOC_EVENT_TYPE_DOCUMENT_BROADCAST, nil
default:
return 0, fmt.Errorf("%s: %w", eventType, ErrUnsupportedEventType)
}
Expand Down
2 changes: 2 additions & 0 deletions api/types/auth_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ const (
RemoveDocument Method = "RemoveDocument"
PushPull Method = "PushPull"
WatchDocuments Method = "WatchDocuments"
Broadcast Method = "Broadcast"
)

// IsAuthMethod returns whether the given method can be used for authorization.
Expand All @@ -79,6 +80,7 @@ func AuthMethods() []Method {
RemoveDocument,
PushPull,
WatchDocuments,
Broadcast,
}
}

Expand Down
10 changes: 10 additions & 0 deletions api/types/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,14 @@ const (
// DocumentUnwatchedEvent is an event that occurs when document is
// unwatched by other clients.
DocumentUnwatchedEvent DocEventType = "document-unwatched"

// DocumentBroadcastEvent is an event that occurs when a payload is broadcasted
// on a specific topic.
DocumentBroadcastEvent DocEventType = "document-broadcast"
)

// DocEventBody includes additional data specific to the DocEvent.
type DocEventBody struct {
Topic string
Payload []byte
}
643 changes: 472 additions & 171 deletions api/yorkie/v1/resources.pb.go

Large diffs are not rendered by default.

7 changes: 7 additions & 0 deletions api/yorkie/v1/resources.proto
Original file line number Diff line number Diff line change
Expand Up @@ -358,9 +358,16 @@ enum DocEventType {
DOC_EVENT_TYPE_DOCUMENT_CHANGED = 0;
DOC_EVENT_TYPE_DOCUMENT_WATCHED = 1;
DOC_EVENT_TYPE_DOCUMENT_UNWATCHED = 2;
DOC_EVENT_TYPE_DOCUMENT_BROADCAST = 3;
}

message DocEventBody {
string topic = 1;
bytes payload = 2;
}

message DocEvent {
DocEventType type = 1;
string publisher = 2;
DocEventBody body = 3;
}
590 changes: 548 additions & 42 deletions api/yorkie/v1/yorkie.pb.go

Large diffs are not rendered by default.

12 changes: 12 additions & 0 deletions api/yorkie/v1/yorkie.proto
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ service YorkieService {
rpc PushPullChanges (PushPullChangesRequest) returns (PushPullChangesResponse) {}

rpc WatchDocument (WatchDocumentRequest) returns (stream WatchDocumentResponse) {}

rpc Broadcast (BroadcastRequest) returns (BroadcastResponse) {}
}

message ActivateClientRequest {
Expand Down Expand Up @@ -109,3 +111,13 @@ message PushPullChangesRequest {
message PushPullChangesResponse {
ChangePack change_pack = 1;
}

message BroadcastRequest {
string client_id = 1;
string document_id = 2;
string topic = 3;
bytes payload = 4;
}

message BroadcastResponse {
}
177 changes: 116 additions & 61 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ const (
DocumentWatched WatchResponseType = "document-watched"
DocumentUnwatched WatchResponseType = "document-unwatched"
PresenceChanged WatchResponseType = "presence-changed"
DocumentBroadcast WatchResponseType = "document-broadcast"
)

// WatchResponse is a structure representing response of Watch.
Expand Down Expand Up @@ -420,69 +421,11 @@ func (c *Client) Watch(
return nil, err
}

handleResponse := func(pbResp *api.WatchDocumentResponse) (*WatchResponse, error) {
switch resp := pbResp.Body.(type) {
case *api.WatchDocumentResponse_Initialization_:
var clientIDs []string
for _, clientID := range resp.Initialization.ClientIds {
id, err := time.ActorIDFromHex(clientID)
if err != nil {
return nil, err
}
clientIDs = append(clientIDs, id.String())
}

doc.SetOnlineClients(clientIDs...)
return nil, nil
case *api.WatchDocumentResponse_Event:
eventType, err := converter.FromEventType(resp.Event.Type)
if err != nil {
return nil, err
}

cli, err := time.ActorIDFromHex(resp.Event.Publisher)
if err != nil {
return nil, err
}

switch eventType {
case types.DocumentChangedEvent:
return &WatchResponse{Type: DocumentChanged}, nil
case types.DocumentWatchedEvent:
doc.AddOnlineClient(cli.String())
if doc.Presence(cli.String()) == nil {
return nil, nil
}

return &WatchResponse{
Type: DocumentWatched,
Presences: map[string]innerpresence.Presence{
cli.String(): doc.Presence(cli.String()),
},
}, nil
case types.DocumentUnwatchedEvent:
p := doc.Presence(cli.String())
doc.RemoveOnlineClient(cli.String())
if p == nil {
return nil, nil
}

return &WatchResponse{
Type: DocumentUnwatched,
Presences: map[string]innerpresence.Presence{
cli.String(): p,
},
}, nil
}
}
return nil, ErrUnsupportedWatchResponseType
}

pbResp, err := stream.Recv()
if err != nil {
return nil, err
}
if _, err := handleResponse(pbResp); err != nil {
if _, err := handleResponse(pbResp, doc); err != nil {
return nil, err
}

Expand All @@ -494,7 +437,7 @@ func (c *Client) Watch(
close(rch)
return
}
resp, err := handleResponse(pbResp)
resp, err := handleResponse(pbResp, doc)
if err != nil {
rch <- WatchResponse{Err: err}
close(rch)
Expand Down Expand Up @@ -534,10 +477,96 @@ func (c *Client) Watch(
}
}()

go func() {
for {
select {
case r := <-doc.BroadcastRequests():
doc.BroadcastResponses() <- c.broadcast(ctx, doc, r.Topic, r.Payload)
case <-ctx.Done():
return
}
}
}()

return rch, nil
}

func (c *Client) findDocKey(docID string) (key.Key, error) {
func handleResponse(
pbResp *api.WatchDocumentResponse,
doc *document.Document,
) (*WatchResponse, error) {
switch resp := pbResp.Body.(type) {
case *api.WatchDocumentResponse_Initialization_:
var clientIDs []string
for _, clientID := range resp.Initialization.ClientIds {
id, err := time.ActorIDFromHex(clientID)
if err != nil {
return nil, err
}
clientIDs = append(clientIDs, id.String())
}

doc.SetOnlineClients(clientIDs...)
return nil, nil
case *api.WatchDocumentResponse_Event:
eventType, err := converter.FromEventType(resp.Event.Type)
if err != nil {
return nil, err
}

cli, err := time.ActorIDFromHex(resp.Event.Publisher)
if err != nil {
return nil, err
}

switch eventType {
case types.DocumentChangedEvent:
return &WatchResponse{Type: DocumentChanged}, nil
case types.DocumentWatchedEvent:
doc.AddOnlineClient(cli.String())
if doc.Presence(cli.String()) == nil {
return nil, nil
}

return &WatchResponse{
Type: DocumentWatched,
Presences: map[string]innerpresence.Presence{
cli.String(): doc.Presence(cli.String()),
},
}, nil
case types.DocumentUnwatchedEvent:
p := doc.Presence(cli.String())
doc.RemoveOnlineClient(cli.String())
if p == nil {
return nil, nil
}

return &WatchResponse{
Type: DocumentUnwatched,
Presences: map[string]innerpresence.Presence{
cli.String(): p,
},
}, nil
case types.DocumentBroadcastEvent:
eventBody := resp.Event.Body
// If the handler exists, it means that the broadcast topic has been subscribed to.
if handler, ok := doc.BroadcastEventHandlers()[eventBody.Topic]; ok && handler != nil {
err := handler(eventBody.Topic, resp.Event.Publisher, eventBody.Payload)
if err != nil {
return &WatchResponse{
Type: DocumentBroadcast,
Err: err,
}, nil
}
}
return nil, nil
}
}
return nil, ErrUnsupportedWatchResponseType
}

// FindDocKey returns the document key of the given document id.
func (c *Client) FindDocKey(docID string) (key.Key, error) {
for _, attachment := range c.attachments {
if attachment.docID.String() == docID {
return attachment.doc.Key(), nil
Expand Down Expand Up @@ -650,6 +679,32 @@ func (c *Client) Remove(ctx context.Context, doc *document.Document) error {
return nil
}

func (c *Client) broadcast(ctx context.Context, doc *document.Document, topic string, payload []byte) error {
if c.status != activated {
return ErrClientNotActivated
}

attachment, ok := c.attachments[doc.Key()]
if !ok {
return ErrDocumentNotAttached
}

_, err := c.client.Broadcast(
withShardKey(ctx, c.options.APIKey, doc.Key().String()),
&api.BroadcastRequest{
ClientId: c.id.String(),
DocumentId: attachment.docID.String(),
Topic: topic,
Payload: payload,
},
)
if err != nil {
return err
}

return nil
}

/**
* withShardKey returns a context with the given shard key in metadata.
*/
Expand Down
Loading