-
Notifications
You must be signed in to change notification settings - Fork 27
/
Copy pathbridge.go
executable file
·416 lines (370 loc) · 11.4 KB
/
bridge.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
package bridges
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"github.com/aws/aws-lambda-go/lambda"
"github.com/sirupsen/logrus"
"github.com/tidwall/gjson"
"gopkg.in/guregu/null.v3"
"io/ioutil"
"net/http"
"os"
"time"
)
// Authentication types understood by NewAuth.
const (
// AuthParam selects the Param implementation, which adds the key/value
// pair to the request's query string.
AuthParam = "param"
// AuthHeader selects the Header implementation, which adds the key/value
// pair as a request header.
AuthHeader = "header"
)
// Opts is the options for each bridge.
type Opts struct {
// Name is the bridge's name; the server logs it when registering the
// bridge's path.
Name string `json:"name"`
// Path is the HTTP path the bridge is served on. NewServer treats an
// empty path as "/".
Path string `json:"path"`
// Lambda marks the bridge as a candidate for the AWS Lambda handler.
// NewServer uses the first bridge that has it set.
Lambda bool `json:"Lambda"`
}
// Result represents a Chainlink JobRun. The same struct is used to decode
// the incoming request and to encode the outgoing response.
type Result struct {
// JobRunID is echoed back in the response; SetJobRunID fills it from ID
// when the request did not supply it.
JobRunID string `json:"jobRunId"`
// ID is the request's ID, used as the fallback for JobRunID.
ID string `json:"id,omitempty"`
TaskRunID string `json:"taskRunId,omitempty"`
// Status is set to "errored" by SetErrored and "completed" by SetCompleted.
Status string `json:"status"`
// Error holds the error message set by SetErrored.
Error null.String `json:"error"`
// Pending is never written by this file; it is passed through as decoded.
Pending bool `json:"pending"`
// Data carries the request's input parameters on the way in and is
// replaced by the bridge's output when the run succeeds.
Data *JSON `json:"data"`
}
// JSON wraps a parsed gjson.Result, exposing gjson's accessors through
// embedding and adding JSON (un)marshalling via UnmarshalJSON/MarshalJSON.
// Based on https://github.com/smartcontractkit/chainlink/blob/master/core/store/models/common.go#L128
type JSON struct {
gjson.Result
}
// ParseInterface marshals obj to JSON bytes and then parses those bytes
// into a JSON object via Parse. If marshalling fails, the marshalling error
// is returned together with a nil *JSON.
func ParseInterface(obj interface{}) (*JSON, error) {
	raw, marshalErr := json.Marshal(obj)
	if marshalErr == nil {
		return Parse(raw)
	}
	return nil, marshalErr
}
// Parse parses a byte slice into a JSON object.
//
// Empty (or nil) input is treated as the empty object `{}`. The returned
// pointer is always non-nil, even when err is non-nil; callers must check
// err before relying on the contents.
func Parse(b []byte) (*JSON, error) {
	// Work on the bytes directly: converting to a string and back would copy
	// the payload twice for no benefit.
	if len(b) == 0 {
		b = []byte(`{}`)
	}
	var j JSON
	err := json.Unmarshal(b, &j)
	return &j, err
}
// UnmarshalJSON validates the given bytes with gjson and, when they are
// valid JSON, replaces the receiver with the parsed result. Invalid input
// leaves the receiver untouched and yields an error that includes the input.
func (j *JSON) UnmarshalJSON(b []byte) error {
	raw := string(b)
	if gjson.Valid(raw) {
		*j = JSON{Result: gjson.Parse(raw)}
		return nil
	}
	return fmt.Errorf("invalid JSON: %v", raw)
}
// MarshalJSON encodes the wrapped value. When the underlying gjson result
// does not exist, an empty JSON object (`{}`) is emitted; otherwise the
// result's string form is returned as the encoded bytes.
func (j *JSON) MarshalJSON() ([]byte, error) {
	if !j.Exists() {
		return []byte("{}"), nil
	}
	encoded := j.String()
	return []byte(encoded), nil
}
// SetErrored marks a result as errored and records the message of err
// as the result's error. err must be non-nil.
func (r *Result) SetErrored(err error) {
	r.Status = "errored"
	msg := err.Error()
	r.Error = null.StringFrom(msg)
}
// SetCompleted flags the result as having finished successfully by
// setting its status to "completed".
func (r *Result) SetCompleted() {
	const completed = "completed"
	r.Status = completed
}
// SetJobRunID makes sure the result carries a Job Run ID. A "jobRunId"
// supplied in the request is kept as is; otherwise the request's ID is
// copied into it.
func (r *Result) SetJobRunID() {
	if r.JobRunID != "" {
		return
	}
	r.JobRunID = r.ID
}
// Bridge is the interface that can be implemented for custom Chainlink bridges.
type Bridge interface {
// Opts returns the bridge's options; the server reads Path, Name and
// Lambda from it.
Opts() *Opts
// Run executes the bridge with the request's input wrapped in a Helper.
// The returned value is marshalled to JSON and becomes the result's data;
// a non-nil error marks the result as errored.
Run(h *Helper) (interface{}, error)
}
// Server holds pointers to the bridges indexed by their paths
// and the bridge to be mounted in Lambda.
type Server struct {
// pathMap maps a URL path to the bridge served on it.
pathMap map[string]Bridge
// ldaBridge is the bridge invoked by the Lambda handler; it is nil when
// no bridge passed to NewServer had Lambda enabled.
ldaBridge Bridge
}
// NewServer builds a Server from the given bridges, indexing each bridge
// by the path in its options. The returned server is ready to be started.
//
// When several bridges share a path, the last one given wins. A bridge
// with an empty path is mounted on "/" to avoid panics on start.
//
// The first bridge whose options enable Lambda becomes the Lambda bridge.
func NewServer(bridges ...Bridge) *Server {
	srv := &Server{pathMap: make(map[string]Bridge)}
	for _, bridge := range bridges {
		opts := bridge.Opts()
		route := opts.Path
		if route == "" {
			route = "/"
		}
		srv.pathMap[route] = bridge
		if srv.ldaBridge == nil && opts.Lambda {
			srv.ldaBridge = bridge
		}
	}
	return srv
}
// Start runs the bridge server on the platform selected by the environment:
//   - Inbuilt HTTP server (default)
//   - AWS Lambda (when the LAMBDA environment variable is non-empty)
//
// port is only used by the inbuilt HTTP server. With the HTTP server, many
// bridges can be served as long as each one has its own path. With Lambda,
// the first bridge that has Lambda enabled is used as the handler.
//
// When the HTTP server stops, its error is logged at fatal level.
func (s *Server) Start(port int) {
	if os.Getenv("LAMBDA") != "" {
		lambda.Start(s.Lambda)
		return
	}

	logrus.WithField("port", port).Info("Starting the bridge server")
	addr := fmt.Sprintf(":%d", port)
	logrus.Fatal(http.ListenAndServe(addr, s.Mux()))
}
// Mux builds a new http.ServeMux with every bridge path registered against
// the server's Handler, logging each registration, and returns it as an
// http.Handler.
func (s *Server) Mux() http.Handler {
	router := http.NewServeMux()
	for route, bridge := range s.pathMap {
		logrus.WithFields(logrus.Fields{
			"path":   route,
			"bridge": bridge.Opts().Name,
		}).Info("Registering bridge")
		router.HandleFunc(route, s.Handler)
	}
	return router
}
// Handler is an http.HandlerFunc that serves bridge requests when the
// server runs on the inbuilt HTTP server.
//
// The request body is decoded into a Result, the bridge registered for the
// request path is run with the request's data, and the Result is written
// back as JSON. Status codes:
//   - 400 for a non-POST request, an undecodable body or an unknown path
//   - 500 when the body cannot be read, the bridge returns an error, the
//     bridge's output cannot be converted to JSON, or the bridge panics
//   - 200 on success
//
// Every request is logged via logRequest once the response has been written.
func (s *Server) Handler(w http.ResponseWriter, r *http.Request) {
	var rt Result
	start := time.Now()

	// The status code is a plain variable read by the deferred writer below.
	// It defaults to 500 so that any exit path which forgets to set it still
	// produces a response instead of blocking.
	code := http.StatusInternalServerError

	defer func() {
		// A panicking bridge must not leave the client without a response:
		// turn the panic into an errored result.
		if p := recover(); p != nil {
			logrus.WithField("path", r.URL.Path).Errorf("Bridge panicked: %v", p)
			rt.SetErrored(fmt.Errorf("bridge panicked: %v", p))
			code = http.StatusInternalServerError
		}
		w.Header().Set("Content-Type", "application/json")
		w.WriteHeader(code)
		if err := json.NewEncoder(w).Encode(&rt); err != nil {
			logrus.Errorf("Failed to encode response: %v", err)
		}
		s.logRequest(r, code, start)
	}()

	if r.Method != http.MethodPost {
		code = http.StatusBadRequest
		rt.SetErrored(errors.New("Invalid request"))
		return
	}

	body, err := ioutil.ReadAll(r.Body)
	if err != nil {
		code = http.StatusInternalServerError
		rt.SetErrored(err)
		return
	}
	if err := json.Unmarshal(body, &rt); err != nil {
		code = http.StatusBadRequest
		rt.SetErrored(err)
		return
	}
	rt.SetJobRunID()

	b, ok := s.pathMap[s.path(r)]
	if !ok {
		code = http.StatusBadRequest
		rt.SetErrored(errors.New("Invalid path"))
		return
	}

	obj, err := b.Run(NewHelper(rt.Data))
	if err != nil {
		code = http.StatusInternalServerError
		rt.SetErrored(err)
		return
	}

	data, err := ParseInterface(obj)
	if err != nil {
		code = http.StatusInternalServerError
		rt.SetErrored(err)
		return
	}

	rt.Data = data
	rt.SetCompleted()
	code = http.StatusOK
}
// Lambda is the handler passed to the AWS Lambda runtime by Start. It runs
// the Lambda bridge with the request's data and returns the same Result,
// updated in place.
//
// Failures are reported inside the returned Result (status "errored" plus
// the error message) rather than through the error return value, which is
// always nil. This includes the case where no bridge was configured with
// Lambda enabled.
func (s *Server) Lambda(r *Result) (interface{}, error) {
	r.SetJobRunID()

	// Without this guard a server built with no Lambda-enabled bridge would
	// dereference a nil interface on every invocation.
	if s.ldaBridge == nil {
		r.SetErrored(errors.New("no bridge is enabled for Lambda"))
		return r, nil
	}

	if obj, err := s.ldaBridge.Run(NewHelper(r.Data)); err != nil {
		r.SetErrored(err)
	} else if data, err := ParseInterface(obj); err != nil {
		r.SetErrored(err)
	} else {
		r.SetCompleted()
		r.Data = data
	}
	return r, nil
}
// logRequest writes one info-level log entry for a served request, with
// the HTTP method, response code, URL path, remote address, the time the
// request finished and the latency measured from start.
func (s *Server) logRequest(r *http.Request, code int, start time.Time) {
	finished := time.Now()
	fields := logrus.Fields{
		"method":   r.Method,
		"code":     code,
		"path":     r.URL.Path,
		"clientIP": r.RemoteAddr,
		"servedAt": finished.Format("2006/01/02 - 15:04:05"),
		"latency":  finished.Sub(start).String(),
	}
	logrus.WithFields(fields).Info("Bridge request")
}
// path returns the request's URL path, substituting the root path "/"
// when the URL path is empty.
func (s *Server) path(r *http.Request) string {
	if p := r.URL.Path; p != "" {
		return p
	}
	return "/"
}
// Helper is given to the receiving bridge to use on run, giving the
// bridge the visibility to the input parameters from the node request
// and having simple functions for making http calls.
type Helper struct {
// Data is the `data` object of the incoming request.
Data *JSON
// httpClient performs the requests issued by the HTTPCall* methods.
httpClient http.Client
}
// NewHelper returns a Helper wrapping the given request data, with a
// default http.Client for outbound calls.
//
// A nil data (for example a request that carried no `data` key) is replaced
// by an empty JSON value so that GetParam, GetIntParam and query passthrough
// can be used without a nil check.
func NewHelper(data *JSON) *Helper {
	if data == nil {
		data = &JSON{}
	}
	return &Helper{Data: data, httpClient: http.Client{}}
}
// GetParam looks up key in the `data` JSON object given on request by the
// Chainlink node and returns its value as a string.
func (h *Helper) GetParam(key string) string {
	field := h.Data.Get(key)
	return field.String()
}
// GetIntParam looks up key in the `data` JSON object given on request by
// the Chainlink node and returns its value as an int64.
func (h *Helper) GetIntParam(key string) int64 {
	field := h.Data.Get(key)
	return field.Int()
}
// CallOpts are the options given into a http call method.
type CallOpts struct {
// Auth, when non-nil, is applied to the outgoing request before it is sent.
Auth Auth `json:"-"`
// Query holds the query parameters added to the request URL when
// QueryPassthrough is false.
Query map[string]interface{} `json:"query"`
// QueryPassthrough, when true, adds every key of the helper's `data`
// object as a query parameter instead of using Query.
QueryPassthrough bool `json:"queryPassthrough"`
// Body is sent as the request body.
Body string `json:"body"`
// ExpectedCode is the status code the response must have; zero means 200.
ExpectedCode int `json:"expectedCode"`
}
// HTTPCall performs an HTTP call with default options and a background
// context, unmarshalling the JSON response body into obj.
func (h *Helper) HTTPCall(method, url string, obj interface{}) error {
	return h.HTTPCallWithOptsWithContext(context.Background(), method, url, obj, CallOpts{})
}
// HTTPCallWithContext performs an HTTP call with default options under the
// given context, unmarshalling the JSON response body into obj.
func (h *Helper) HTTPCallWithContext(ctx context.Context, method, url string, obj interface{}) error {
	var defaults CallOpts
	return h.HTTPCallWithOptsWithContext(ctx, method, url, obj, defaults)
}
// HTTPCallWithOpts behaves like HTTPCallRawWithOpts, except that the
// response body is unmarshalled into the given object pointer instead of
// being returned. It uses a background context.
func (h *Helper) HTTPCallWithOpts(method, url string, obj interface{}, opts CallOpts) error {
	ctx := context.Background()
	return h.HTTPCallWithOptsWithContext(ctx, method, url, obj, opts)
}
// HTTPCallWithOptsWithContext performs the HTTP call described by method,
// url and opts under ctx and unmarshals the JSON response body into obj.
// It returns the call's error, or the unmarshalling error, if any.
func (h *Helper) HTTPCallWithOptsWithContext(ctx context.Context, method, url string, obj interface{}, opts CallOpts) error {
	raw, err := h.HTTPCallRawWithOptsWithContext(ctx, method, url, opts)
	if err != nil {
		return err
	}
	return json.Unmarshal(raw, obj)
}
// HTTPCallRawWithOpts performs an HTTP call with any method, using a
// background context, and returns the raw response body and any error.
// Supported options:
//   - Authentication for the API (query param, headers) via `opts.Auth`
//   - Query parameters via `opts.Query`
//   - Passing every key of the request `data` object through as query
//     parameters via `opts.QueryPassthrough`
//   - A body to send with the request via `opts.Body`
//   - An error when the response status code differs from `opts.ExpectedCode`
func (h *Helper) HTTPCallRawWithOpts(method, url string, opts CallOpts) ([]byte, error) {
	ctx := context.Background()
	return h.HTTPCallRawWithOptsWithContext(ctx, method, url, opts)
}
// HTTPCallRawWithOptsWithContext performs an HTTP call under ctx and returns
// the raw response body.
//
// The request is sent with a JSON content type and `opts.Body` as its body.
// Query parameters come from the helper's `data` object when
// `opts.QueryPassthrough` is set, otherwise from `opts.Query`. `opts.Auth`,
// when non-nil, is applied last. The response body is always closed.
//
// An error is returned when the request cannot be built or sent, when the
// body cannot be read, or when the status code differs from
// `opts.ExpectedCode` (200 when ExpectedCode is zero).
func (h *Helper) HTTPCallRawWithOptsWithContext(ctx context.Context, method, url string, opts CallOpts) ([]byte, error) {
	req, err := http.NewRequestWithContext(ctx, method, url, bytes.NewReader([]byte(opts.Body)))
	if err != nil {
		return nil, err
	}
	req.Header.Add("Content-Type", "application/json")

	q := req.URL.Query()
	if opts.QueryPassthrough {
		// A Helper built without data has nothing to pass through.
		if h.Data != nil {
			for k, v := range h.Data.Map() {
				q.Add(k, v.String())
			}
		}
	} else {
		for k, v := range opts.Query {
			q.Add(k, formatQueryValue(v))
		}
	}
	req.URL.RawQuery = q.Encode()

	if opts.Auth != nil {
		opts.Auth.Authenticate(req)
	}

	resp, err := h.httpClient.Do(req)
	if err != nil {
		return nil, err
	}
	// Closing the body releases the underlying connection; without it every
	// call leaks a connection.
	defer resp.Body.Close()

	b, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		return nil, err
	}

	expected := opts.ExpectedCode
	if expected == 0 {
		expected = http.StatusOK
	}
	if resp.StatusCode != expected {
		return nil, fmt.Errorf("Unexpected api status code: %d", resp.StatusCode)
	}
	return b, nil
}

// formatQueryValue renders a query parameter value as a string. Strings and
// byte slices are used verbatim; any other value uses its default fmt
// formatting, so numbers and booleans render as "5" or "true" rather than
// the "%!s(int=5)" produced by a %s verb.
func formatQueryValue(v interface{}) string {
	switch t := v.(type) {
	case string:
		return t
	case []byte:
		return string(t)
	default:
		return fmt.Sprint(v)
	}
}
// Auth is the generic interface for how the client passes in their
// API key for authentication.
type Auth interface {
// Authenticate adds the credentials to the outgoing request, modifying
// it in place.
Authenticate(*http.Request)
}
// NewAuth builds the Auth implementation matching authType: AuthParam
// yields a *Param and AuthHeader yields a *Header, each carrying the given
// key and value. Any other authType yields a nil Auth.
func NewAuth(authType string, key string, value string) Auth {
	if authType == AuthParam {
		return &Param{Key: key, Value: value}
	}
	if authType == AuthHeader {
		return &Header{Key: key, Value: value}
	}
	return nil
}
// Param is the Auth implementation that adds a query parameter to the request.
type Param struct {
// Key is the name of the query parameter.
Key string
// Value is the value of the query parameter.
Value string
}
// Authenticate appends the configured key and value to the request's
// query string, keeping any parameters already present.
func (p *Param) Authenticate(r *http.Request) {
	params := r.URL.Query()
	params[p.Key] = append(params[p.Key], p.Value)
	r.URL.RawQuery = params.Encode()
}
// Header is the Auth implementation that requires a header to be set.
type Header struct {
// Key is the name of the header.
Key string
// Value is the value of the header.
Value string
}
// Authenticate adds the configured key and value to the request as a
// header, keeping any existing values for that header.
func (h *Header) Authenticate(r *http.Request) {
	name, val := h.Key, h.Value
	r.Header.Add(name, val)
}