diff --git a/README.md b/README.md index 9bef2040..474f2735 100644 --- a/README.md +++ b/README.md @@ -10,7 +10,7 @@ We consider the connector **stable** despite the major version is currently 0. ## How it works -The connector uses the POST HTTP method to deliver records. +As default the connector uses the POST HTTP method to deliver records. The connector supports: - authorization (static, OAuth2); diff --git a/docs/sink-connector-config-options.rst b/docs/sink-connector-config-options.rst index baefe5e4..aa4c885a 100644 --- a/docs/sink-connector-config-options.rst +++ b/docs/sink-connector-config-options.rst @@ -12,6 +12,14 @@ Connection * Valid Values: HTTP(S) URL * Importance: high +``http.method`` + The HTTP Method to use when send the data. + + * Type: string + * Default: "POST" + * Valid Values: [POST, PUT] + * Importance: low + ``http.authorization.type`` The HTTP authorization type. diff --git a/src/main/java/io/aiven/kafka/connect/http/config/HttpMethodsType.java b/src/main/java/io/aiven/kafka/connect/http/config/HttpMethodsType.java new file mode 100644 index 00000000..4172be0b --- /dev/null +++ b/src/main/java/io/aiven/kafka/connect/http/config/HttpMethodsType.java @@ -0,0 +1,45 @@ +/* + * Copyright 2019 Aiven Oy and http-connector-for-apache-kafka project contributors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.connect.http.config; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Objects; +import java.util.stream.Collectors; + +public enum HttpMethodsType +{ + POST("POST"), + PUT("PUT"); + + public final String name; + + HttpMethodsType(final String name) { + this.name = name; + } + + public static HttpMethodsType forName(final String name) { + Objects.requireNonNull(name); + return Arrays.stream(values()) + .filter(v -> v.name.equalsIgnoreCase(name)) + .findFirst() + .orElseThrow(() -> new IllegalArgumentException("HTTP Method type: " + name)); + } + + public static final Collection NAMES = + Arrays.stream(values()).map(v -> v.name).collect(Collectors.toList()); +} diff --git a/src/main/java/io/aiven/kafka/connect/http/config/HttpSinkConfig.java b/src/main/java/io/aiven/kafka/connect/http/config/HttpSinkConfig.java index 8cd2657f..b1864fad 100644 --- a/src/main/java/io/aiven/kafka/connect/http/config/HttpSinkConfig.java +++ b/src/main/java/io/aiven/kafka/connect/http/config/HttpSinkConfig.java @@ -37,6 +37,7 @@ public class HttpSinkConfig extends AbstractConfig { private static final String CONNECTION_GROUP = "Connection"; private static final String HTTP_URL_CONFIG = "http.url"; + private static final String HTTP_METHOD = "http.method"; private static final String HTTP_AUTHORIZATION_TYPE_CONFIG = "http.authorization.type"; private static final String HTTP_HEADERS_AUTHORIZATION_CONFIG = "http.headers.authorization"; @@ -314,6 +315,41 @@ public String toString() { List.of(OAUTH2_ACCESS_TOKEN_URL_CONFIG, OAUTH2_CLIENT_ID_CONFIG, OAUTH2_CLIENT_SECRET_CONFIG, OAUTH2_CLIENT_AUTHORIZATION_MODE_CONFIG, OAUTH2_CLIENT_SCOPE_CONFIG) ); + + configDef.define( + HTTP_METHOD, + ConfigDef.Type.STRING, + "POST", + new ConfigDef.Validator() { + @Override + @SuppressFBWarnings("NP_LOAD_OF_KNOWN_NULL_VALUE") // Suppress the ConfigException with null value. + public void ensureValid(final String name, final Object value) { + if (value == null) { + throw new ConfigException(HTTP_METHOD, value); + } + assert value instanceof String; + final String valueStr = (String) value; + if (!HttpMethodsType.NAMES.contains(valueStr)) { + throw new ConfigException( + HTTP_METHOD, valueStr, + "supported values are: " + HttpMethodsType.NAMES); + } + } + + @Override + public String toString() { + return HttpMethodsType.NAMES.toString(); + } + }, + ConfigDef.Importance.LOW, + "The HTTP Method to use when send the data.", + CONNECTION_GROUP, + groupCounter++, + ConfigDef.Width.SHORT, + HTTP_METHOD, + FixedSetRecommender.ofSupportedValues(HttpMethodsType.NAMES) + ); + } private static void addBatchingConfigGroup(final ConfigDef configDef) { @@ -548,6 +584,10 @@ public final URI httpUri() { return toURI(HTTP_URL_CONFIG); } + public final HttpMethodsType httpMethod() { + return HttpMethodsType.valueOf(getString(HTTP_METHOD)); + } + public final Long kafkaRetryBackoffMs() { return getLong(KAFKA_RETRY_BACKOFF_MS_CONFIG); } diff --git a/src/main/java/io/aiven/kafka/connect/http/sender/AbstractHttpSender.java b/src/main/java/io/aiven/kafka/connect/http/sender/AbstractHttpSender.java index 0fa5f520..b84463ec 100644 --- a/src/main/java/io/aiven/kafka/connect/http/sender/AbstractHttpSender.java +++ b/src/main/java/io/aiven/kafka/connect/http/sender/AbstractHttpSender.java @@ -48,12 +48,25 @@ protected AbstractHttpSender( } public final HttpResponse send(final String body) { - final var requestBuilder = - httpRequestBuilder.build(config).POST(HttpRequest.BodyPublishers.ofString(body)); + final var requestBuilder = prepareRequest(body); return sendWithRetries(requestBuilder, HttpResponseHandler.ON_HTTP_ERROR_RESPONSE_HANDLER, config.maxRetries()); } + // seth http bethod based on config + private Builder prepareRequest(final String body) { + switch (config.httpMethod()) { + case POST: + return httpRequestBuilder + .build(config).POST(HttpRequest.BodyPublishers.ofString(body)); + case PUT: + return httpRequestBuilder + .build(config).PUT(HttpRequest.BodyPublishers.ofString(body)); + default: + throw new ConnectException("Unsupported HTTP method: " + config.httpMethod()); + } + } + /** * Sends an HTTP body using {@code httpSender}, respecting the configured retry policy. * diff --git a/src/test/java/io/aiven/kafka/connect/http/sender/DefaultHttpSenderTest.java b/src/test/java/io/aiven/kafka/connect/http/sender/DefaultHttpSenderTest.java index e666d4dc..9d72d155 100644 --- a/src/test/java/io/aiven/kafka/connect/http/sender/DefaultHttpSenderTest.java +++ b/src/test/java/io/aiven/kafka/connect/http/sender/DefaultHttpSenderTest.java @@ -24,6 +24,7 @@ import java.util.List; import java.util.Map; +import io.aiven.kafka.connect.http.config.HttpMethodsType; import org.apache.kafka.connect.errors.ConnectException; import io.aiven.kafka.connect.http.config.HttpSinkConfig; @@ -88,7 +89,7 @@ void shouldBuildDefaultHttpRequest() throws Exception { .isPresent() .get(as(InstanceOfAssertFactories.DURATION)) .hasSeconds(config.httpTimeout()); - assertThat(httpRequest.method()).isEqualTo("POST"); + assertThat(httpRequest.method()).isEqualTo(HttpMethodsType.POST.name()); assertThat(httpRequest .headers() @@ -101,6 +102,54 @@ void shouldBuildDefaultHttpRequest() throws Exception { } + @Test + void shouldBuildDefaultHttpPutRequest() throws Exception { + final var configBase = new HashMap<>(defaultConfig()); + configBase.put( "http.method", "PUT"); + + // Build the configuration + final HttpSinkConfig config = new HttpSinkConfig(configBase); + + // Mock the Http Client and Http Response + when(mockedClient.send(any(HttpRequest.class), any(BodyHandler.class))).thenReturn(mockedResponse); + + // Create a spy on the HttpSender implementation to capture methods parameters + final var httpSender = Mockito.spy(new DefaultHttpSender(config, mockedClient)); + + // Trigger the client + final List messages = List.of("some message"); + messages.forEach(httpSender::send); + + // Capture the RequestBuilder + final ArgumentCaptor defaultHttpRequestBuilder = ArgumentCaptor.forClass(HttpRequest.Builder.class); + verify(httpSender, atLeast(messages.size())).sendWithRetries(defaultHttpRequestBuilder.capture(), + any(HttpResponseHandler.class), anyInt()); + + // Retrieve the builders and rebuild the HttpRequests to check the HttpRequest proper configuration + defaultHttpRequestBuilder + .getAllValues() + .stream() + .map(Builder::build) + .forEach(httpRequest -> { + assertThat(httpRequest.uri()).isEqualTo(config.httpUri()); + assertThat(httpRequest.timeout()) + .isPresent() + .get(as(InstanceOfAssertFactories.DURATION)) + .hasSeconds(config.httpTimeout()); + assertThat(httpRequest.method()).isEqualTo(HttpMethodsType.PUT.name()); + + assertThat(httpRequest + .headers() + .firstValue(HttpRequestBuilder.HEADER_CONTENT_TYPE)).isEmpty(); + }); + + // Check the messages have been sent once + messages.forEach( + message -> bodyPublishers.verify(() -> HttpRequest.BodyPublishers.ofString(eq(message)), times(1))); + + } + + @Test void shouldBuildCustomHttpRequest() throws Exception { final var configBase = new HashMap<>(defaultConfig());