Skip to content

Commit

Permalink
Add Http Method configuration. Tto be used for sending data to target…
Browse files Browse the repository at this point in the history
… endpoint, Defaulted on POST Method. Supported POST/PUT
  • Loading branch information
Eugeniu committed Oct 5, 2023
1 parent 5100d59 commit b9ef784
Show file tree
Hide file tree
Showing 6 changed files with 159 additions and 4 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ We consider the connector **stable** despite the major version is currently 0.

## How it works

The connector uses the POST HTTP method to deliver records.
As default the connector uses the POST HTTP method to deliver records.

The connector supports:
- authorization (static, OAuth2);
Expand Down
8 changes: 8 additions & 0 deletions docs/sink-connector-config-options.rst
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,14 @@ Connection
* Valid Values: HTTP(S) URL
* Importance: high

``http.method``
The HTTP Method to use when send the data.

* Type: string
* Default: "POST"
* Valid Values: [POST, PUT]
* Importance: low

``http.authorization.type``
The HTTP authorization type.

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright 2019 Aiven Oy and http-connector-for-apache-kafka project contributors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.aiven.kafka.connect.http.config;

import java.util.Arrays;
import java.util.Collection;
import java.util.Objects;
import java.util.stream.Collectors;

public enum HttpMethodsType
{
POST("POST"),
PUT("PUT");

public final String name;

HttpMethodsType(final String name) {
this.name = name;
}

public static HttpMethodsType forName(final String name) {
Objects.requireNonNull(name);
return Arrays.stream(values())
.filter(v -> v.name.equalsIgnoreCase(name))
.findFirst()
.orElseThrow(() -> new IllegalArgumentException("HTTP Method type: " + name));
}

public static final Collection<String> NAMES =
Arrays.stream(values()).map(v -> v.name).collect(Collectors.toList());
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
public class HttpSinkConfig extends AbstractConfig {
private static final String CONNECTION_GROUP = "Connection";
private static final String HTTP_URL_CONFIG = "http.url";
private static final String HTTP_METHOD = "http.method";

private static final String HTTP_AUTHORIZATION_TYPE_CONFIG = "http.authorization.type";
private static final String HTTP_HEADERS_AUTHORIZATION_CONFIG = "http.headers.authorization";
Expand Down Expand Up @@ -314,6 +315,41 @@ public String toString() {
List.of(OAUTH2_ACCESS_TOKEN_URL_CONFIG, OAUTH2_CLIENT_ID_CONFIG, OAUTH2_CLIENT_SECRET_CONFIG,
OAUTH2_CLIENT_AUTHORIZATION_MODE_CONFIG, OAUTH2_CLIENT_SCOPE_CONFIG)
);

configDef.define(
HTTP_METHOD,
ConfigDef.Type.STRING,
"POST",
new ConfigDef.Validator() {
@Override
@SuppressFBWarnings("NP_LOAD_OF_KNOWN_NULL_VALUE") // Suppress the ConfigException with null value.
public void ensureValid(final String name, final Object value) {
if (value == null) {
throw new ConfigException(HTTP_METHOD, value);
}
assert value instanceof String;
final String valueStr = (String) value;
if (!HttpMethodsType.NAMES.contains(valueStr)) {
throw new ConfigException(
HTTP_METHOD, valueStr,
"supported values are: " + HttpMethodsType.NAMES);
}
}

@Override
public String toString() {
return HttpMethodsType.NAMES.toString();
}
},
ConfigDef.Importance.LOW,
"The HTTP Method to use when send the data.",
CONNECTION_GROUP,
groupCounter++,
ConfigDef.Width.SHORT,
HTTP_METHOD,
FixedSetRecommender.ofSupportedValues(HttpMethodsType.NAMES)
);

}

private static void addBatchingConfigGroup(final ConfigDef configDef) {
Expand Down Expand Up @@ -548,6 +584,10 @@ public final URI httpUri() {
return toURI(HTTP_URL_CONFIG);
}

public final HttpMethodsType httpMethod() {
return HttpMethodsType.valueOf(getString(HTTP_METHOD));
}

public final Long kafkaRetryBackoffMs() {
return getLong(KAFKA_RETRY_BACKOFF_MS_CONFIG);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,25 @@ protected AbstractHttpSender(
}

public final HttpResponse<String> send(final String body) {
final var requestBuilder =
httpRequestBuilder.build(config).POST(HttpRequest.BodyPublishers.ofString(body));
final var requestBuilder = prepareRequest(body);
return sendWithRetries(requestBuilder, HttpResponseHandler.ON_HTTP_ERROR_RESPONSE_HANDLER,
config.maxRetries());
}

// seth http bethod based on config
private Builder prepareRequest(final String body) {
switch (config.httpMethod()) {
case POST:
return httpRequestBuilder
.build(config).POST(HttpRequest.BodyPublishers.ofString(body));
case PUT:
return httpRequestBuilder
.build(config).PUT(HttpRequest.BodyPublishers.ofString(body));
default:
throw new ConnectException("Unsupported HTTP method: " + config.httpMethod());
}
}

/**
* Sends an HTTP body using {@code httpSender}, respecting the configured retry policy.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.List;
import java.util.Map;

import io.aiven.kafka.connect.http.config.HttpMethodsType;
import org.apache.kafka.connect.errors.ConnectException;

import io.aiven.kafka.connect.http.config.HttpSinkConfig;
Expand Down Expand Up @@ -88,7 +89,7 @@ void shouldBuildDefaultHttpRequest() throws Exception {
.isPresent()
.get(as(InstanceOfAssertFactories.DURATION))
.hasSeconds(config.httpTimeout());
assertThat(httpRequest.method()).isEqualTo("POST");
assertThat(httpRequest.method()).isEqualTo(HttpMethodsType.POST.name());

assertThat(httpRequest
.headers()
Expand All @@ -101,6 +102,54 @@ void shouldBuildDefaultHttpRequest() throws Exception {

}

@Test
void shouldBuildDefaultHttpPutRequest() throws Exception {
final var configBase = new HashMap<>(defaultConfig());
configBase.put( "http.method", "PUT");

// Build the configuration
final HttpSinkConfig config = new HttpSinkConfig(configBase);

// Mock the Http Client and Http Response
when(mockedClient.send(any(HttpRequest.class), any(BodyHandler.class))).thenReturn(mockedResponse);

// Create a spy on the HttpSender implementation to capture methods parameters
final var httpSender = Mockito.spy(new DefaultHttpSender(config, mockedClient));

// Trigger the client
final List<String> messages = List.of("some message");
messages.forEach(httpSender::send);

// Capture the RequestBuilder
final ArgumentCaptor<Builder> defaultHttpRequestBuilder = ArgumentCaptor.forClass(HttpRequest.Builder.class);
verify(httpSender, atLeast(messages.size())).sendWithRetries(defaultHttpRequestBuilder.capture(),
any(HttpResponseHandler.class), anyInt());

// Retrieve the builders and rebuild the HttpRequests to check the HttpRequest proper configuration
defaultHttpRequestBuilder
.getAllValues()
.stream()
.map(Builder::build)
.forEach(httpRequest -> {
assertThat(httpRequest.uri()).isEqualTo(config.httpUri());
assertThat(httpRequest.timeout())
.isPresent()
.get(as(InstanceOfAssertFactories.DURATION))
.hasSeconds(config.httpTimeout());
assertThat(httpRequest.method()).isEqualTo(HttpMethodsType.PUT.name());

assertThat(httpRequest
.headers()
.firstValue(HttpRequestBuilder.HEADER_CONTENT_TYPE)).isEmpty();
});

// Check the messages have been sent once
messages.forEach(
message -> bodyPublishers.verify(() -> HttpRequest.BodyPublishers.ofString(eq(message)), times(1)));

}


@Test
void shouldBuildCustomHttpRequest() throws Exception {
final var configBase = new HashMap<>(defaultConfig());
Expand Down

0 comments on commit b9ef784

Please sign in to comment.