Skip to content

Commit

Permalink
materialize-webhook: chunk transactions and refactor to use io.Pipe
Browse files Browse the repository at this point in the history
The connector now chunks transactions, breaking them into separate ~100
MiB or smaller requests. The connector has also been refactored to
leverage `io.Pipe` and stream documents directly into the webhook body.
Webhooks are processed in series in the order dictated by the
`StoreIterator`.

The rety mechanism has been removed in favor of letting the connector
fail & retry the webhook when it is automatically restarted.

The 100 MiB webhook size cutoff was arbitrarily set and can easily be
changed later as needed.
  • Loading branch information
Alex-Bair committed Nov 8, 2024
1 parent 8ecff11 commit cefc7d5
Showing 1 changed file with 99 additions and 68 deletions.
167 changes: 99 additions & 68 deletions materialize-webhook/driver.go
Original file line number Diff line number Diff line change
@@ -1,32 +1,31 @@
package main

import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"strings"
"time"

m "github.com/estuary/connectors/go/protocols/materialize"
schemagen "github.com/estuary/connectors/go/schema-gen"
boilerplate "github.com/estuary/connectors/materialize-boilerplate"
pf "github.com/estuary/flow/go/protocols/flow"
pm "github.com/estuary/flow/go/protocols/materialize"
log "github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"
)

// driver implements the pm.DriverServer interface.
type driver struct{}

type CustomHeader struct {
type customHeader struct {
Name string `json:"name,omitempty" jsonschema:"title=Name"`
Value string `json:"value,omitempty" jsonschema:"title=Value"`
}

func (h CustomHeader) Validate() error {
func (h customHeader) Validate() error {
if h.Name == "" {
return fmt.Errorf("header name must not be empty")
} else if h.Value == "" {
Expand All @@ -36,7 +35,7 @@ func (h CustomHeader) Validate() error {
}

type headers struct {
CustomHeaders []CustomHeader `json:"customHeaders,omitempty" jsonschema:"title=Custom Headers"`
CustomHeaders []customHeader `json:"customHeaders,omitempty" jsonschema:"title=Custom Headers"`
}

type config struct {
Expand Down Expand Up @@ -178,7 +177,7 @@ func (driver) NewTransactor(ctx context.Context, open pm.Request_Open, _ *boiler

type transactor struct {
addresses []*url.URL
customHeaders []CustomHeader
customHeaders []customHeader
}

func (t *transactor) UnmarshalState(state json.RawMessage) error { return nil }
Expand All @@ -192,45 +191,23 @@ func (d *transactor) Load(it *m.LoadIterator, _ func(int, json.RawMessage) error
return nil
}

// Store invokes the Webhook URL, with a body containing StoreIterator documents.
// Store streams StoreIterator documents directly into the webhook body.
func (d *transactor) Store(it *m.StoreIterator) (m.StartCommitFunc, error) {
var bodies = make([]bytes.Buffer, len(d.addresses))
var ctx = it.Context()
var pipeWriter *io.PipeWriter
group := errgroup.Group{}

// TODO(johnny): perform incremental POSTs rather than queuing a single call.
for it.Next() {
var b = &bodies[it.Binding]

if b.Len() != 0 {
b.WriteString(",\n")
} else {
b.WriteString("[\n")
}
if _, err := b.Write(it.RawJSON); err != nil {
return nil, err
}
}

for i := range bodies {
bodies[i].WriteString("\n]")
}

for i, address := range d.addresses {
var address = address.String()
var body = &bodies[i]
startWebhook := func(address string) {
var pipeReader *io.PipeReader
pipeReader, pipeWriter = io.Pipe()

for attempt := 0; true; attempt++ {
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-time.After(backoff(attempt)):
// Fallthrough.
}
group.Go(func() error {
request, err := http.NewRequest("POST", address, pipeReader)

request, err := http.NewRequest("POST", address, bytes.NewReader(body.Bytes()))
if err != nil {
return nil, fmt.Errorf("http.NewRequest(%s): %w", address, err)
pipeReader.CloseWithError(err)
return fmt.Errorf("http.NewRequest(%s): %w", address, err)
}

request.Header.Add("Content-Type", "application/json")

for i := range d.customHeaders {
Expand All @@ -239,45 +216,99 @@ func (d *transactor) Store(it *m.StoreIterator) (m.StartCommitFunc, error) {
}

response, err := http.DefaultClient.Do(request)
if err == nil {
err = response.Body.Close()

if err != nil {
pipeReader.CloseWithError(err)
return fmt.Errorf("sending webhook: %w", err)
}

if err := response.Body.Close(); err != nil {
pipeReader.CloseWithError(err)
return fmt.Errorf("response.Body.Close(): %w", err)
}

if response.StatusCode < 200 || response.StatusCode >= 300 {
err := fmt.Errorf("unexpected webhook response code %d from %s", response.StatusCode, address)
pipeReader.CloseWithError(err)
return err
}

return nil
})
}

finishWebhook := func() error {
if pipeWriter == nil {
return nil
}

if _, err := pipeWriter.Write([]byte("]")); err != nil {
return fmt.Errorf("pipeWriter.Write(): %w", err)
}

if err := pipeWriter.Close(); err != nil {
return fmt.Errorf("pipeWriter.Close(): %w", err)
}

if err := group.Wait(); err != nil {
return fmt.Errorf("group.Wait(): %w", err)
}

pipeWriter = nil
return nil
}

const requestSizeCutoff = 1024 * 1024 // 1 MiB
byteCount := 0
var previousAddress string

for it.Next() {
address := d.addresses[it.Binding].String()

// The webhook for the previous binding must finish before the next binding's webhook starts.
if previousAddress != "" && address != previousAddress {
if err := finishWebhook(); err != nil {
return nil, fmt.Errorf("finishWebhooks: %w", err)
}
if err == nil && (response.StatusCode < 200 || response.StatusCode >= 300) {
err = fmt.Errorf("unexpected webhook response code %d from %s",
response.StatusCode, address)
}

previousAddress = address

if pipeWriter == nil {
startWebhook(address)
if _, err := pipeWriter.Write([]byte("[")); err != nil {
return nil, fmt.Errorf("pipeWriter.Write: %w", err)
}
} else if _, err := pipeWriter.Write([]byte(",")); err != nil {
return nil, fmt.Errorf("pipeWriter.Write: %w", err)
}

if _, err := pipeWriter.Write(it.RawJSON); err != nil {
return nil, fmt.Errorf("pipeWriter.Write: %w", err)
}
byteCount += len(it.RawJSON)

if err == nil {
body.Reset() // Reset for next use.
break
} else if attempt == 10 {
return nil, fmt.Errorf("webhook failed after many attempts: %w", err)
if byteCount >= requestSizeCutoff {
if err := finishWebhook(); err != nil {
return nil, fmt.Errorf("finishWebhook: %w", err)
}

log.WithFields(log.Fields{
"err": err,
"attempt": attempt,
"address": address,
}).Error("failed to invoke Webhook (will retry)")
byteCount = 0
}
}

if err := it.Err(); err != nil {
return nil, fmt.Errorf("store iterator error: %w", err)
}

if err := finishWebhook(); err != nil {
return nil, fmt.Errorf("finishWebhook: %w", err)
}

return nil, nil
}

// Destroy is a no-op.
func (d *transactor) Destroy() {}

func backoff(attempt int) time.Duration {
switch attempt {
case 0:
return 0
case 1:
return time.Millisecond * 100
case 2, 3, 4, 5, 6, 7, 8, 9, 10:
return time.Second * time.Duration(attempt-1)
default:
return 10 * time.Second
}
}

func main() { boilerplate.RunMain(new(driver)) }

0 comments on commit cefc7d5

Please sign in to comment.