Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

x-pack/filebeat/input/http_endpoint: allow endpoint to receive PUT and PATCH requests #36734

Merged
merged 1 commit into from
Oct 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@ is collected by it.
- Add CEL partial value debug function. {pull}36652[36652]
- Added support for new features and removed partial save mechanism in the GCS input. {issue}35847[35847] {pull}36713[36713]
- Re-use buffers to optimise memory allocation in fingerprint mode of filestream {pull}36736[36736]
- Allow http_endpoint input to receive PUT and PATCH requests. {pull}36734[36734]

*Auditbeat*

Expand Down
6 changes: 6 additions & 0 deletions x-pack/filebeat/docs/inputs/input-http-endpoint.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,12 @@ This option defines the provider of the webhook that uses CRC (Challenge-Respons

The secret token provided by the webhook owner for the CRC validation. It is required when a `crc.provider` is set.

[float]
==== `method`

The HTTP method handled by the endpoint. If specified, `method` must be `POST`, `PUT` or `PATCH`.
The default method is `POST`. If `PUT` or `PATCH` are specified, requests using those method types are accepted, but are treated as `POST` requests and are expected to have a request body containing the request data.

[float]
=== Metrics

Expand Down
11 changes: 10 additions & 1 deletion x-pack/filebeat/input/http_endpoint/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"encoding/json"
"errors"
"fmt"
"net/http"
"net/textproto"
"strings"

Expand All @@ -20,8 +21,9 @@ var crcProviders = map[string]func(string) *crcValidator{
"zoom": newZoomCRC,
}

// Config contains information about httpjson configuration
// Config contains information about http_endpoint configuration
type config struct {
Method string `config:"method"`
TLS *tlscommon.ServerConfig `config:"ssl"`
BasicAuth bool `config:"basic_auth"`
Username string `config:"username"`
Expand All @@ -47,6 +49,7 @@ type config struct {

func defaultConfig() config {
return config{
Method: http.MethodPost,
BasicAuth: false,
Username: "",
Password: "",
Expand All @@ -73,6 +76,12 @@ func (c *config) Validate() error {
return errors.New("response_body must be valid JSON")
}

switch c.Method {
case http.MethodPost, http.MethodPut, http.MethodPatch:
default:
return fmt.Errorf("method must be POST, PUT or PATCH: %s", c.Method)
}

if c.BasicAuth {
if c.Username == "" || c.Password == "" {
return errors.New("username and password required when basicauth is enabled")
Expand Down
2 changes: 1 addition & 1 deletion x-pack/filebeat/input/http_endpoint/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ func newHandler(c config, pub stateless.Publisher, log *logp.Logger, metrics *in
basicAuth: c.BasicAuth,
username: c.Username,
password: c.Password,
method: http.MethodPost,
method: c.Method,
contentType: c.ContentType,
secretHeader: c.SecretHeader,
secretValue: c.SecretValue,
Expand Down
69 changes: 68 additions & 1 deletion x-pack/filebeat/input/http_endpoint/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

var serverPoolTests = []struct {
name string
method string
cfgs []*httpEndpoint
events []target
want []mapstr.M
Expand Down Expand Up @@ -55,6 +56,60 @@
{"json": mapstr.M{"c": int64(3)}},
},
},
{
name: "put",
method: http.MethodPut,
cfgs: []*httpEndpoint{{
addr: "127.0.0.1:9001",
config: config{
Method: http.MethodPut,
ResponseCode: 200,
ResponseBody: `{"message": "success"}`,
ListenAddress: "127.0.0.1",
ListenPort: "9001",
URL: "/",
Prefix: "json",
ContentType: "application/json",
},
}},
events: []target{
{url: "http://127.0.0.1:9001/", event: `{"a":1}`},
{url: "http://127.0.0.1:9001/", event: `{"b":2}`},
{url: "http://127.0.0.1:9001/", event: `{"c":3}`},
bhapas marked this conversation as resolved.
Show resolved Hide resolved
},
want: []mapstr.M{
{"json": mapstr.M{"a": int64(1)}},
{"json": mapstr.M{"b": int64(2)}},
{"json": mapstr.M{"c": int64(3)}},
},
},
{
name: "patch",
method: http.MethodPatch,
cfgs: []*httpEndpoint{{
addr: "127.0.0.1:9001",
config: config{
Method: http.MethodPatch,
ResponseCode: 200,
ResponseBody: `{"message": "success"}`,
ListenAddress: "127.0.0.1",
ListenPort: "9001",
URL: "/",
Prefix: "json",
ContentType: "application/json",
},
}},
events: []target{
{url: "http://127.0.0.1:9001/", event: `{"a":1}`},
{url: "http://127.0.0.1:9001/", event: `{"b":2}`},
{url: "http://127.0.0.1:9001/", event: `{"c":3}`},
},
want: []mapstr.M{
{"json": mapstr.M{"a": int64(1)}},
{"json": mapstr.M{"b": int64(2)}},
{"json": mapstr.M{"c": int64(3)}},
},
},
{
name: "distinct_ports",
cfgs: []*httpEndpoint{
Expand Down Expand Up @@ -226,7 +281,7 @@
go func() {
defer wg.Done()
err := servers.serve(ctx, cfg, &pub, metrics)
if err != http.ErrServerClosed {

Check failure on line 284 in x-pack/filebeat/input/http_endpoint/input_test.go

View workflow job for this annotation

GitHub Actions / lint (linux)

comparing with != will fail on wrapped errors. Use errors.Is to check for a specific error (errorlint)
select {
case fails <- err:
default:
Expand All @@ -249,7 +304,7 @@
}
}
for i, e := range test.events {
resp, err := http.Post(e.url, "application/json", strings.NewReader(e.event))
resp, err := doRequest(test.method, e.url, "application/json", strings.NewReader(e.event))

Check failure on line 307 in x-pack/filebeat/input/http_endpoint/input_test.go

View workflow job for this annotation

GitHub Actions / lint (linux)

response body must be closed (bodyclose)
if err != nil {
t.Fatalf("failed to post event #%d: %v", i, err)
}
Expand Down Expand Up @@ -277,7 +332,7 @@
go func() {
defer wg.Done()
err := servers.serve(ctx, cfg, &pub, metrics)
if err != nil && err != http.ErrServerClosed && test.wantErr == nil {

Check failure on line 335 in x-pack/filebeat/input/http_endpoint/input_test.go

View workflow job for this annotation

GitHub Actions / lint (linux)

comparing with != will fail on wrapped errors. Use errors.Is to check for a specific error (errorlint)
t.Errorf("failed to re-register %v: %v", cfg.addr, err)
}
}()
Expand All @@ -288,6 +343,18 @@
}
}

func doRequest(method, url, contentType string, body io.Reader) (*http.Response, error) {
if method == "" {
method = http.MethodPost
}
req, err := http.NewRequest(method, url, body)

Check failure on line 350 in x-pack/filebeat/input/http_endpoint/input_test.go

View workflow job for this annotation

GitHub Actions / lint (linux)

should rewrite http.NewRequestWithContext or add (*Request).WithContext (noctx)
if err != nil {
return nil, err
}
req.Header.Set("Content-Type", contentType)
bhapas marked this conversation as resolved.
Show resolved Hide resolved
return http.DefaultClient.Do(req)
}

// Is is included to simplify testing, but is not exposed to avoid unwanted error
// matching outside tests.
func (e invalidTLSStateErr) Is(err error) bool {
Expand All @@ -312,6 +379,6 @@
func dump(r io.ReadCloser) []byte {
defer r.Close()
var buf bytes.Buffer
io.Copy(&buf, r)

Check failure on line 382 in x-pack/filebeat/input/http_endpoint/input_test.go

View workflow job for this annotation

GitHub Actions / lint (linux)

Error return value of `io.Copy` is not checked (errcheck)
return buf.Bytes()
}
Loading