-
Notifications
You must be signed in to change notification settings - Fork 3
/
event_feed.go
94 lines (75 loc) · 1.61 KB
/
event_feed.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
package fauna
import (
	"encoding/json"
	"errors"
)
// EventFeed represents an event feed subscription.
//
// It holds everything needed to request pages of events for one event source
// and tracks the cursor of the most recently read page between calls to Next.
type EventFeed struct {
	// client supplies the context and headers for each feed request and is
	// the client the request is executed against.
	client *Client
	// source is the event source sent with every feed request.
	source EventSource

	// decoder reads the JSON response of the most recent request; open
	// replaces it each time it runs.
	decoder *json.Decoder

	// opts carries the optional settings read by newFeedRequest.
	opts *feedOptions
	// lastCursor is the Cursor of the last page decoded by Next. It is empty
	// until a page has been read and is used to populate the Cursor of
	// subsequent requests.
	lastCursor string
}
// feedOptions holds the optional settings for an event feed.
//
// Every field is a pointer so that "not set" (nil) can be told apart from a
// zero value; newFeedRequest copies a field into the request only when it is
// non-nil.
type feedOptions struct {
	// PageSize, when set, is sent as the request's PageSize.
	PageSize *int
	// Cursor, when set, is sent as the request's Cursor.
	Cursor *string
	// StartTS, when set, is sent as the request's StartTS.
	// NOTE(review): the unit of this timestamp is not visible here — confirm
	// against the feed API.
	StartTS *int64
}
// newEventFeed builds an EventFeed for the given client, event source and
// options. It only stores its arguments; no request is issued here and the
// returned error is always nil.
func newEventFeed(client *Client, source EventSource, opts *feedOptions) (*EventFeed, error) {
	ef := new(EventFeed)
	ef.client = client
	ef.source = source
	ef.opts = opts

	return ef, nil
}
// newFeedRequest builds the request for the next page of the feed.
//
// The request always carries the client's context and headers and the feed's
// event source. PageSize from the options, when set, is applied to every
// request. The starting position is chosen as follows:
//
//   - once a page has been read (lastCursor is non-empty) the request resumes
//     from lastCursor, and the StartTS / Cursor options are not applied;
//   - otherwise the StartTS and Cursor options, when set, are copied into the
//     request.
//
// A nil options value is treated as "no options set". The returned error is
// currently always nil.
func (ef *EventFeed) newFeedRequest() (*feedRequest, error) {
	req := feedRequest{
		apiRequest: apiRequest{
			ef.client.ctx,
			ef.client.headers,
		},
		Source: ef.source,
		Cursor: ef.lastCursor,
	}

	// The feed may have been created without options; nothing more to apply.
	if ef.opts == nil {
		return &req, nil
	}

	if ef.opts.PageSize != nil {
		req.PageSize = *ef.opts.PageSize
	}

	// The starting-position options describe where the feed begins, so they
	// only belong on the first request. Re-applying them after a page has been
	// read would overwrite the resume cursor and fetch the same page again.
	// NOTE(review): the feed API appears to treat cursor and start_ts as
	// alternatives — confirm against the API reference.
	if ef.lastCursor != "" {
		return &req, nil
	}

	if ef.opts.StartTS != nil {
		req.StartTS = *ef.opts.StartTS
	}
	if ef.opts.Cursor != nil {
		req.Cursor = *ef.opts.Cursor
	}

	return &req, nil
}
// open issues a fresh feed request and points the feed's decoder at the
// response stream. Any error from building or executing the request is
// returned unchanged, in which case the existing decoder is left as it was.
func (ef *EventFeed) open() error {
	feedReq, buildErr := ef.newFeedRequest()
	if buildErr != nil {
		return buildErr
	}

	// NOTE(review): the stream returned by do is handed straight to the
	// decoder and never closed here — confirm whether it needs closing.
	stream, doErr := feedReq.do(ef.client)
	if doErr != nil {
		return doErr
	}

	ef.decoder = json.NewDecoder(stream)

	return nil
}
// FeedPage represents the response from [fauna.EventFeed.Next]
//
// It is populated by decoding the JSON body of a feed response; each field
// maps to the JSON key named in its struct tag.
type FeedPage struct {
	// Events holds the events decoded from the "events" key.
	Events []Event `json:"events"`
	// Cursor is decoded from the "cursor" key. Next records it on the feed
	// after a page has been decoded.
	Cursor string `json:"cursor"`
	// HasNext is decoded from the "has_next" key.
	// NOTE(review): presumably reports whether another page is available —
	// confirm against the feed API.
	HasNext bool `json:"has_next"`
	// Stats is decoded from the "stats" key.
	Stats Stats `json:"stats"`
}
// Next retrieves the next FeedPage from the [fauna.EventFeed]
//
// It issues a new feed request, decodes the JSON response into page and
// records the page's Cursor on the feed for later requests.
//
// page must be non-nil; a nil page yields an error before any request is
// made. Once the request has succeeded, page is reset to its zero value
// before decoding, so fields absent from the response do not retain values
// from an earlier page when the same FeedPage is reused across calls.
//
// Errors from opening the request or decoding the response are returned
// unchanged; on a decode error the recorded cursor is not updated.
func (ef *EventFeed) Next(page *FeedPage) error {
	if page == nil {
		return errors.New("nil FeedPage passed to EventFeed.Next")
	}

	if err := ef.open(); err != nil {
		return err
	}

	// encoding/json leaves fields untouched when their key is missing from
	// the input, so clear any previous page before decoding into it.
	*page = FeedPage{}

	// page is already a pointer. Decoding into &page would let a JSON null
	// set the local pointer to nil and make the Cursor read below panic.
	if err := ef.decoder.Decode(page); err != nil {
		return err
	}

	ef.lastCursor = page.Cursor

	return nil
}