-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathchangeset.go
183 lines (153 loc) · 4.8 KB
/
changeset.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
package webapi
import (
"bufio"
"bytes"
"context"
"errors"
"fmt"
"io"
"mime/multipart"
"net/http"
"net/textproto"
"sync"
"github.com/google/uuid"
)
// ChangeSet collects API requests registered through Add so that Do can send
// them together as one change set part of a single batch request.
type ChangeSet struct {
	// client is the Client given to NewChangeSet; it is handed over to the
	// batch request when the change set is executed.
	client *Client
	// requests holds one HTTP request specification per Add call,
	// in the order in which they were added.
	requests []httpRequest
}
// changeSetRequest is an auxiliary struct with prepared raw requests.
// It is built by ChangeSet.prepareRequest and consumed by its own Do method.
type changeSetRequest struct {
	// client is copied from the originating ChangeSet.
	client *Client
	// requests are the request specifications copied from the ChangeSet;
	// the response part at position i is processed by requests[i].
	requests []httpRequest
	// rawRequests are the prepared *http.Request values; rawRequests[i] is
	// built from requests[i], so both slices share the same indexes.
	rawRequests []*http.Request
}
// NewChangeSet returns an empty ChangeSet bound to the given client.
// Requests are registered with Add and sent with Do.
func NewChangeSet(client *Client) *ChangeSet {
	set := new(ChangeSet)
	set.client = client
	return set
}
// Add registers the request in the change set and returns its 1-based
// position (1 for the first added request, 2 for the second, ...).
// NOTE(review): the result is named contentID, so it is presumably meant to be
// used for Content-ID references between requests - confirm against the
// code that writes the batch parts.
//
// Only API requests that wrap exactly one HTTP request are supported;
// Add panics for any other request.
func (s *ChangeSet) Add(request apiRequest) (contentID int) {
	wrapped := request.httpRequests()
	if count := len(wrapped); count != 1 {
		panic(errors.New("cannot add request to ChangeSet: only API requests wrapping one HTTP request are supported"))
	}

	s.requests = append(s.requests, wrapped[0])

	// The position is taken after the append, so numbering starts at 1.
	contentID = len(s.requests)
	return contentID
}
// Do prepares the raw HTTP request for every added request and then sends
// them as one change set. A preparation error is returned without sending
// anything; otherwise the result of the send is returned.
func (s *ChangeSet) Do(ctx context.Context) error {
	prepared, err := s.prepareRequest(ctx)
	if err == nil {
		err = prepared.Do(ctx)
	}
	return err
}
// prepareRequest builds the changeSetRequest for the current content of the
// change set: every request specification is converted to a raw *http.Request,
// in the order in which the requests were added.
//
// The first preparation error aborts the process; in that case nil is
// returned together with the error.
func (s *ChangeSet) prepareRequest(ctx context.Context) (*changeSetRequest, error) {
	var raws []*http.Request
	for _, item := range s.requests {
		raw, err := item.prepareRequest(ctx)
		if err != nil {
			return nil, err
		}

		// Content-ID references don't work in batch requests,
		// if the return=representation preference is used,
		// so the minimal representation is requested for every part.
		raw.Header.Set("Prefer", "return=minimal")

		raws = append(raws, raw)
	}

	prepared := &changeSetRequest{
		client:      s.client,
		requests:    s.requests,
		rawRequests: raws,
	}
	return prepared, nil
}
// Do wraps the change set into a batch request and executes it.
// The batch request is created by the package-level prepareBatchRequest,
// which receives the method writing the change set body and the method
// processing the batch response.
func (r *changeSetRequest) Do(ctx context.Context) error {
	writeBody, readResponse := r.prepareBatchRequest, r.processBatchResponse

	batch, prepErr := prepareBatchRequest(r.client, writeBody, readResponse)
	if prepErr != nil {
		return prepErr
	}

	return batch.Do(ctx)
}
// prepareBatchRequest writes the change set into the batch body.
//
// The change set is a single part of the batch; the part is itself a
// multipart/mixed document with its own, randomly generated boundary, and it
// contains one nested part per prepared raw request.
func (r *changeSetRequest) prepareBatchRequest(batchWriter *multipart.Writer) (err error) {
	// The boundary must be known up front: it is announced in the header of
	// the outer part and then used by the nested writer.
	boundary := "changeset_" + uuid.New().String()
	header := textproto.MIMEHeader{"Content-Type": {"multipart/mixed; boundary=" + boundary}}

	outerPart, err := batchWriter.CreatePart(header)
	if err != nil {
		return fmt.Errorf("failed to create batch part: %w", err)
	}

	// The nested writer emits the individual requests into the outer part.
	nested := multipart.NewWriter(outerPart)
	if err = nested.SetBoundary(boundary); err != nil {
		return fmt.Errorf("failed to set multipart boundary: %w", err)
	}

	for index := range r.rawRequests {
		if partErr := writePartToBatchRequest(index, r.rawRequests[index], nested); partErr != nil {
			return partErr
		}
	}

	// Closing the nested writer writes the trailing boundary of the change set.
	if err = nested.Close(); err != nil {
		return fmt.Errorf("failed to close multipart writer: %w", err)
	}

	return nil
}
// processBatchResponse handles the batch response of the change set.
//
// Only the first part of the batch response is read: it is passed, together
// with its header, to processMultipartResponse, and its nested parts are then
// handled by processBatchResponseParts.
//
// The part is always closed before returning; an error from closing is
// reported only when no earlier error occurred.
func (r *changeSetRequest) processBatchResponse(ctx context.Context, batchReader *multipart.Reader) (err error) {
	part, nextErr := batchReader.NextPart()
	if nextErr != nil {
		return fmt.Errorf("failed to get next part: %w", nextErr)
	}

	defer func() {
		closeErr := part.Close()
		if err == nil && closeErr != nil {
			err = closeErr
		}
	}()

	// Iterate over the nested response parts of the change set.
	return processMultipartResponse(ctx, part.Header, part, r.processBatchResponseParts)
}
// processBatchResponseParts reads every part of the change set response and
// processes the part at position i with processBatchResponseOnePart(ctx, i, body).
//
// The parts are read and buffered one after another, because the multipart
// stream can only be consumed in order; the buffered bodies are then processed
// concurrently, one goroutine per part.
//
// The function always waits for all started goroutines before it returns, even
// when reading a later part fails, so no response processing outlives the call.
//
// The returned error is nil if everything succeeded. Otherwise it joins the
// failure that stopped the reading (if any, listed first) with the errors
// reported for the individual parts.
func (r *changeSetRequest) processBatchResponseParts(ctx context.Context, batchReader *multipart.Reader) (err error) {
	var (
		wg   sync.WaitGroup
		lock sync.Mutex // guards errs while the goroutines are running
		errs []error
	)

	// fatalErr is the error that made it impossible to continue reading.
	var fatalErr error

	for index := 0; ; index++ {
		// Each response part is a response for an original request part
		part, nextErr := batchReader.NextPart()
		if errors.Is(nextErr, io.EOF) {
			break
		}
		if nextErr != nil {
			fatalErr = fmt.Errorf("failed to get next part: %w", nextErr)
			break
		}

		// Buffer the part body, the next part cannot be read before this one is consumed.
		partBody, readErr := io.ReadAll(part)
		if readErr != nil {
			fatalErr = fmt.Errorf("failed to read response part '%d': %w", index+1, readErr)
			break
		}

		if closeErr := part.Close(); closeErr != nil {
			fatalErr = fmt.Errorf("failed to close response part '%d': %w", index+1, closeErr)
			break
		}

		// Process part body as usual
		wg.Add(1)
		go func(index int, partBody []byte) {
			defer wg.Done()
			if partErr := r.processBatchResponseOnePart(ctx, index, partBody); partErr != nil {
				lock.Lock()
				errs = append(errs, partErr)
				lock.Unlock()
			}
		}(index, partBody)
	}

	// Do not return before the already started goroutines are finished,
	// otherwise they would keep running (and their errors would be lost).
	wg.Wait()

	if fatalErr != nil {
		// All goroutines are done, errs can be read without the lock.
		return errors.Join(append([]error{fatalErr}, errs...)...)
	}
	return errors.Join(errs...)
}
// processBatchResponseOnePart parses one buffered response part and lets the
// request at the same position process it.
//
// index is the zero-based position of the part in the change set response;
// it selects both the request specification and the prepared raw request.
// partBody is the complete part body, i.e. a serialized HTTP response.
//
// An error is returned if the index does not match any request of the change
// set, if the part cannot be parsed as an HTTP response, if the request fails
// to process the response, or - when nothing else failed - if closing the
// response body fails. Error messages use the one-based part number.
func (r *changeSetRequest) processBatchResponseOnePart(ctx context.Context, index int, partBody []byte) (err error) {
	// The number of parts is controlled by the server, so the index must be
	// validated, an unexpected extra part must not cause a panic.
	if index < 0 || index >= len(r.requests) || index >= len(r.rawRequests) {
		return fmt.Errorf("unexpected response part '%d': change set contains %d request(s)", index+1, len(r.requests))
	}
	req := r.requests[index]
	rawReq := r.rawRequests[index]

	// Convert part to the *http.Response
	resp, err := http.ReadResponse(bufio.NewReader(bytes.NewReader(partBody)), rawReq)
	if err != nil {
		return fmt.Errorf("failed to read response part '%d': %w", index+1, err)
	}

	// The body is closed on every path; the close error is reported
	// only if the processing itself succeeded.
	defer func() {
		if closeErr := resp.Body.Close(); err == nil && closeErr != nil {
			err = fmt.Errorf("failed to close response part '%d': %w", index+1, closeErr)
		}
	}()

	if err = req.processResponse(ctx, resp); err != nil {
		return fmt.Errorf("failed to process response part '%d': %w", index+1, err)
	}
	return nil
}