This repository has been archived by the owner on Mar 5, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 22
/
Copy pathproxy.go
146 lines (129 loc) · 4.14 KB
/
proxy.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
package main
import (
"context"
"errors"
"fmt"
"log"
"net/http"
"net/url"
"time"
)
const proxyHeader = "X-Sequins-Proxied-To"
type proxyResponse struct {
resp *http.Response
peer string
err error
}
var (
errProxyTimeout = errors.New("all peers timed out")
errRequestCanceled = errors.New("client-side request canceled")
)
// proxy proxies the request, trying each peer that should have the key
// in turn. The total logical attempt will be capped at the configured proxy
// timeout, but individual peers will be tried in the order they are passed in,
// using the following algorithm:
// - Each interval of 'proxy_stage_timeout', starting immediately, a request
// is kicked off to one random not-yet-tried peer. All requests after
// the first run concurrently.
// - If a request finishes successfully to any peer, the result is returned
// immediately and the others are canceled, as long as the result was not
// an error.
// - If a request finishes, but resulted in an error, another is kicked off
// immediately to another random not-yet-tried peer.
// - This process continues until either all peers are being tried, in which
// case the code just waits for one to finish. If the total 'proxy_timeout'
// is hit at any point, the method returns immediately with an error and
// cancels any running requests.
func (vs *version) proxy(r *http.Request, peers []string) (*http.Response, string, error) {
responses := make(chan proxyResponse, len(peers))
totalTimeout := time.NewTimer(vs.sequins.config.Sharding.ProxyTimeout.Duration)
// Per the documentation for `http.Client.Do`, if that returns successfully,
// callers are required to the close the response Body, even if the request
// is cancelled mid-flight.
outstanding := 0
defer func() {
go func() {
for ; outstanding > 0; outstanding-- {
res := <-responses
if res.err == nil {
res.resp.Body.Close()
}
}
close(responses)
}()
}()
cancels := make(map[string]context.CancelFunc, len(peers))
for peerIndex := 0; ; peerIndex++ {
stageTimeout := time.NewTimer(vs.sequins.config.Sharding.ProxyStageTimeout.Duration)
if peerIndex < len(peers) {
peer := peers[peerIndex]
attemptCtx, cancelAttempt := context.WithCancel(r.Context())
req, err := vs.newProxyRequest(attemptCtx, r.URL.Path, peer)
if err != nil {
cancelAttempt()
log.Printf("Error initializing request to peer %s: %s", peer, err)
} else {
cancels[peer] = cancelAttempt
outstanding += 1
go vs.proxyAttempt(req, peer, responses)
}
} else if outstanding == 0 {
return nil, "", errNoAvailablePeers
}
select {
case res := <-responses:
outstanding -= 1
if res.err != nil {
log.Printf("Error proxying request to peer %s: %s", res.peer, res.err)
cancels[res.peer]()
} else {
// Cancel any other outstanding attempts.
for peer, cancelAttempt := range cancels {
if peer != res.peer {
cancelAttempt()
}
}
return res.resp, res.peer, nil
}
case <-totalTimeout.C:
for _, cancelAttempt := range cancels {
cancelAttempt()
}
return nil, "", errProxyTimeout
case <-r.Context().Done():
for _, cancelAttempt := range cancels {
cancelAttempt()
}
return nil, "", errRequestCanceled
case <-stageTimeout.C:
}
}
}
func (vs *version) proxyAttempt(proxyRequest *http.Request, peer string, res chan proxyResponse) {
resp, err := http.DefaultClient.Do(proxyRequest)
if err != nil {
res <- proxyResponse{nil, peer, err}
return
}
if resp.StatusCode != 200 && resp.StatusCode != 404 {
resp.Body.Close()
res <- proxyResponse{nil, peer, fmt.Errorf("got %d", resp.StatusCode)}
return
}
res <- proxyResponse{resp, peer, nil}
}
// newProxyRequest creates a fresh request, to avoid passing on baggage like
// 'Connection: close' headers.
func (vs *version) newProxyRequest(ctx context.Context, path, peer string) (*http.Request, error) {
url := &url.URL{
Scheme: "http",
Host: peer,
Path: path,
RawQuery: fmt.Sprintf("proxy=%s", vs.name),
}
req, err := http.NewRequest("GET", url.String(), nil)
if err != nil {
return req, err
}
return req.WithContext(ctx), nil
}