-
Notifications
You must be signed in to change notification settings - Fork 8
/
Copy pathfeed.go
195 lines (182 loc) · 5.85 KB
/
feed.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
package sparkypmtatracking
import (
"bufio"
"bytes"
"compress/gzip"
"encoding/binary"
"encoding/json"
"errors"
"io/ioutil"
"log"
"net/http"
"strconv"
"time"
"github.com/go-redis/redis"
"github.com/google/uuid"
)
// Make a SparkPost formatted unique event_id, which needs to be a decimal string 0 .. (2^63-1)
func uniqEventID() string {
	id := uuid.New()
	// Interpret the UUID's first 8 bytes as a uint64, masking off the top bit
	// so the value fits in the required 0 .. 2^63-1 range
	return strconv.FormatUint(binary.LittleEndian.Uint64(id[:8])&0x7fffffffffffffff, 10)
}
// makeSparkPostEvent takes a raw Redis queue entry and forms a SparkPostEvent structure.
// The event is augmented with PowerMTA accounting-pipe values held in Redis under
// TrackingPrefix + message ID, when that key is present. Returns an error if the queue
// entry or the stored augmentation JSON cannot be unmarshaled, or if Redis fails.
func makeSparkPostEvent(eStr string, client *redis.Client) (SparkPostEvent, error) {
	var tev TrackEvent
	var spEvent SparkPostEvent
	if err := json.Unmarshal([]byte(eStr), &tev); err != nil {
		return spEvent, err
	}
	// Shortcut pointer to the attribute-carrying leaf object; fill in received attributes
	eptr := &spEvent.EventWrapper.EventGrouping
	eptr.Type = ActionToType(tev.WD.Action)
	eptr.TargetLinkURL = tev.WD.TargetLinkURL
	eptr.MessageID = tev.WD.MessageID
	eptr.TimeStamp = tev.TimeStamp
	eptr.UserAgent = tev.UserAgent
	eptr.IPAddress = tev.IPAddress
	// Augment with PowerMTA accounting-pipe values, if we have these, from persistent storage
	tKey := TrackingPrefix + tev.WD.MessageID
	augmentJSON, err := client.Get(tKey).Result()
	switch {
	case err == redis.Nil:
		// A missing key is a normal, expected case (e.g. expired); log and carry on
		log.Println("Warning: redis key", tKey, "not found, url=", tev.WD.TargetLinkURL)
	case err != nil:
		// A real Redis error (e.g. connectivity). Surface it directly rather than
		// falling through to unmarshal an empty string, which would mask the cause.
		return spEvent, err
	default:
		augment := make(map[string]string)
		if err := json.Unmarshal([]byte(augmentJSON), &augment); err != nil {
			return spEvent, err
		}
		eptr.RcptTo = augment["rcpt"]
		eptr.SubaccountID = SafeStringToInt(augment["header_x-sp-subaccount-id"])
	}
	// Fill in these fields with default / unique / derived values
	eptr.DelvMethod = "esmtp"
	eptr.EventID = uniqEventID()
	// Skip these fields for now; you may have information to populate them from your own sources
	// eptr.GeoIP
	return spEvent, nil
}
// SparkPostEventNDJSON formats a SparkPost event into NDJSON, augmenting with Redis data.
// The returned bytes are one JSON object terminated by a newline, ready for batching.
func SparkPostEventNDJSON(eStr string, client *redis.Client) ([]byte, error) {
	event, err := makeSparkPostEvent(eStr, client)
	if err != nil {
		return nil, err
	}
	line, err := json.Marshal(event)
	if err != nil {
		return nil, err
	}
	// Newline delimiter makes this valid NDJSON when concatenated with other events
	return append(line, '\n'), nil
}
// SparkPostIngest POSTs a batch of ingestData (NDJSON) to the SparkPost Ingest API,
// gzip-compressing the payload. host is the API base URL; apiKey is sent in the
// Authorization header. Returns an error on compression or transport failure, on a
// malformed response, or when the API reports errors in its JSON response body.
// NOTE(review): the client parameter is unused in this function's body — presumably
// kept for signature consistency with callers; confirm before removing.
func SparkPostIngest(ingestData []byte, client *redis.Client, host string, apiKey string) error {
	var zbuf bytes.Buffer
	zw := gzip.NewWriter(&zbuf)
	if _, err := zw.Write(ingestData); err != nil {
		return err
	}
	// Close (not just Flush) so the gzip stream trailer is written out
	if err := zw.Close(); err != nil {
		return err
	}
	gzipSize := zbuf.Len()
	// Prepare the https POST request. We have to supply a Reader for this, hence needing to realize the stream via zbuf
	zr := bufio.NewReader(&zbuf)
	netClient := &http.Client{
		Timeout: time.Second * 300,
	}
	url := host + "/api/v1/ingest/events"
	req, err := http.NewRequest("POST", url, zr)
	if err != nil {
		return err
	}
	req.Header = map[string][]string{
		"Authorization":    {apiKey},
		"Content-Type":     {"application/x-ndjson"},
		"Content-Encoding": {"gzip"},
	}
	res, err := netClient.Do(req)
	if err != nil {
		return err
	}
	// Close the body on every return path; the previous version leaked it when
	// ReadAll or Unmarshal failed, preventing connection reuse by the transport.
	defer res.Body.Close()
	// Response body is a Reader; read it into []byte
	responseBody, err := ioutil.ReadAll(res.Body)
	if err != nil {
		return err
	}
	var resObj IngestResult
	if err := json.Unmarshal(responseBody, &resObj); err != nil {
		return err
	}
	// len() on a nil slice is 0, so no separate nil check is needed
	if len(resObj.Errors) > 0 {
		errStr := resObj.Errors[0].Message
		log.Printf("Uploaded %d bytes raw, %d bytes gzipped. SparkPost response: %s, errors[0]= %s\n",
			len(ingestData), gzipSize, res.Status, errStr)
		return errors.New(errStr)
	}
	if resObj.Results.ID != "" {
		log.Printf("Uploaded %d bytes raw, %d bytes gzipped. SparkPost Ingest response: %s, results.id=%s\n",
			len(ingestData), gzipSize, res.Status, resObj.Results.ID)
	}
	return nil
}
// TimedBuffer associates content with a time started and a maximum age it should be held for
type TimedBuffer struct {
Content []byte
TimeStarted time.Time
MaxAge time.Duration
}
// AgedContent returns true if the buffer has non-nil contents that are older than the specified maxAge
func (t *TimedBuffer) AgedContent() bool {
age := time.Since(t.TimeStarted)
return len(t.Content) > 0 && age >= t.MaxAge
}
// FeedEvents sends data arriving via Redis queue to SparkPost ingest API.
// Send a batch periodically, or every X MB, whichever comes first.
func FeedEvents(client *redis.Client, host string, apiKey string, maxAge time.Duration) error {
	// Pre-allocate the batch buffer at full payload capacity for efficiency
	batch := TimedBuffer{
		Content: make([]byte, 0, SparkPostIngestMaxPayload),
		MaxAge:  maxAge,
	}
	for {
		entry, err := client.LPop(RedisQueue).Result()
		switch {
		case err == redis.Nil:
			// Queue is now empty - send this batch if it's old enough, and return
			if batch.AgedContent() {
				return SparkPostIngest(batch.Content, client, host, apiKey)
			}
			time.Sleep(1 * time.Second) // polling wait time
			continue
		case err != nil:
			return err
		}
		ev, err := SparkPostEventNDJSON(entry, client)
		if err != nil {
			return err
		}
		// If this event would make the content oversize, send what we already have
		if len(batch.Content)+len(ev) >= SparkPostIngestMaxPayload {
			if err := SparkPostIngest(batch.Content, client, host, apiKey); err != nil {
				return err
			}
			batch.Content = batch.Content[:0] // empty the data, but keep capacity allocated
		}
		if len(batch.Content) == 0 {
			// mark time of this event being placed into an empty buffer
			batch.TimeStarted = time.Now()
		}
		batch.Content = append(batch.Content, ev...)
	}
}
// FeedForever processes events forever, restarting the feeder loop whenever it
// returns; any error from a pass is logged and the loop continues.
func FeedForever(client *redis.Client, host string, apiKey string, maxAge time.Duration) {
	for {
		err := FeedEvents(client, host, apiKey, maxAge)
		if err != nil {
			log.Println(err)
		}
	}
}