From b9ef784c55af559b1323eaa667f699df3c30e42a Mon Sep 17 00:00:00 2001 From: Eugeniu Date: Thu, 5 Oct 2023 14:11:54 +0100 Subject: [PATCH 1/6] Add Http Method configuration. Tto be used for sending data to target endpoint, Defaulted on POST Method. Supported POST/PUT --- README.md | 2 +- docs/sink-connector-config-options.rst | 8 +++ .../connect/http/config/HttpMethodsType.java | 45 ++++++++++++++++ .../connect/http/config/HttpSinkConfig.java | 40 +++++++++++++++ .../http/sender/AbstractHttpSender.java | 17 ++++++- .../http/sender/DefaultHttpSenderTest.java | 51 ++++++++++++++++++- 6 files changed, 159 insertions(+), 4 deletions(-) create mode 100644 src/main/java/io/aiven/kafka/connect/http/config/HttpMethodsType.java diff --git a/README.md b/README.md index 9bef2040..474f2735 100644 --- a/README.md +++ b/README.md @@ -10,7 +10,7 @@ We consider the connector **stable** despite the major version is currently 0. ## How it works -The connector uses the POST HTTP method to deliver records. +As default the connector uses the POST HTTP method to deliver records. The connector supports: - authorization (static, OAuth2); diff --git a/docs/sink-connector-config-options.rst b/docs/sink-connector-config-options.rst index baefe5e4..aa4c885a 100644 --- a/docs/sink-connector-config-options.rst +++ b/docs/sink-connector-config-options.rst @@ -12,6 +12,14 @@ Connection * Valid Values: HTTP(S) URL * Importance: high +``http.method`` + The HTTP Method to use when send the data. + + * Type: string + * Default: "POST" + * Valid Values: [POST, PUT] + * Importance: low + ``http.authorization.type`` The HTTP authorization type. diff --git a/src/main/java/io/aiven/kafka/connect/http/config/HttpMethodsType.java b/src/main/java/io/aiven/kafka/connect/http/config/HttpMethodsType.java new file mode 100644 index 00000000..4172be0b --- /dev/null +++ b/src/main/java/io/aiven/kafka/connect/http/config/HttpMethodsType.java @@ -0,0 +1,45 @@ +/* + * Copyright 2019 Aiven Oy and http-connector-for-apache-kafka project contributors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.connect.http.config; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Objects; +import java.util.stream.Collectors; + +public enum HttpMethodsType +{ + POST("POST"), + PUT("PUT"); + + public final String name; + + HttpMethodsType(final String name) { + this.name = name; + } + + public static HttpMethodsType forName(final String name) { + Objects.requireNonNull(name); + return Arrays.stream(values()) + .filter(v -> v.name.equalsIgnoreCase(name)) + .findFirst() + .orElseThrow(() -> new IllegalArgumentException("HTTP Method type: " + name)); + } + + public static final Collection NAMES = + Arrays.stream(values()).map(v -> v.name).collect(Collectors.toList()); +} diff --git a/src/main/java/io/aiven/kafka/connect/http/config/HttpSinkConfig.java b/src/main/java/io/aiven/kafka/connect/http/config/HttpSinkConfig.java index 8cd2657f..b1864fad 100644 --- a/src/main/java/io/aiven/kafka/connect/http/config/HttpSinkConfig.java +++ b/src/main/java/io/aiven/kafka/connect/http/config/HttpSinkConfig.java @@ -37,6 +37,7 @@ public class HttpSinkConfig extends AbstractConfig { private static final String CONNECTION_GROUP = "Connection"; private static final String HTTP_URL_CONFIG = "http.url"; + private static final String HTTP_METHOD = "http.method"; private static final String HTTP_AUTHORIZATION_TYPE_CONFIG = "http.authorization.type"; private static final String HTTP_HEADERS_AUTHORIZATION_CONFIG = "http.headers.authorization"; @@ -314,6 +315,41 @@ public String toString() { List.of(OAUTH2_ACCESS_TOKEN_URL_CONFIG, OAUTH2_CLIENT_ID_CONFIG, OAUTH2_CLIENT_SECRET_CONFIG, OAUTH2_CLIENT_AUTHORIZATION_MODE_CONFIG, OAUTH2_CLIENT_SCOPE_CONFIG) ); + + configDef.define( + HTTP_METHOD, + ConfigDef.Type.STRING, + "POST", + new ConfigDef.Validator() { + @Override + @SuppressFBWarnings("NP_LOAD_OF_KNOWN_NULL_VALUE") // Suppress the ConfigException with null value. + public void ensureValid(final String name, final Object value) { + if (value == null) { + throw new ConfigException(HTTP_METHOD, value); + } + assert value instanceof String; + final String valueStr = (String) value; + if (!HttpMethodsType.NAMES.contains(valueStr)) { + throw new ConfigException( + HTTP_METHOD, valueStr, + "supported values are: " + HttpMethodsType.NAMES); + } + } + + @Override + public String toString() { + return HttpMethodsType.NAMES.toString(); + } + }, + ConfigDef.Importance.LOW, + "The HTTP Method to use when send the data.", + CONNECTION_GROUP, + groupCounter++, + ConfigDef.Width.SHORT, + HTTP_METHOD, + FixedSetRecommender.ofSupportedValues(HttpMethodsType.NAMES) + ); + } private static void addBatchingConfigGroup(final ConfigDef configDef) { @@ -548,6 +584,10 @@ public final URI httpUri() { return toURI(HTTP_URL_CONFIG); } + public final HttpMethodsType httpMethod() { + return HttpMethodsType.valueOf(getString(HTTP_METHOD)); + } + public final Long kafkaRetryBackoffMs() { return getLong(KAFKA_RETRY_BACKOFF_MS_CONFIG); } diff --git a/src/main/java/io/aiven/kafka/connect/http/sender/AbstractHttpSender.java b/src/main/java/io/aiven/kafka/connect/http/sender/AbstractHttpSender.java index 0fa5f520..b84463ec 100644 --- a/src/main/java/io/aiven/kafka/connect/http/sender/AbstractHttpSender.java +++ b/src/main/java/io/aiven/kafka/connect/http/sender/AbstractHttpSender.java @@ -48,12 +48,25 @@ protected AbstractHttpSender( } public final HttpResponse send(final String body) { - final var requestBuilder = - httpRequestBuilder.build(config).POST(HttpRequest.BodyPublishers.ofString(body)); + final var requestBuilder = prepareRequest(body); return sendWithRetries(requestBuilder, HttpResponseHandler.ON_HTTP_ERROR_RESPONSE_HANDLER, config.maxRetries()); } + // seth http bethod based on config + private Builder prepareRequest(final String body) { + switch (config.httpMethod()) { + case POST: + return httpRequestBuilder + .build(config).POST(HttpRequest.BodyPublishers.ofString(body)); + case PUT: + return httpRequestBuilder + .build(config).PUT(HttpRequest.BodyPublishers.ofString(body)); + default: + throw new ConnectException("Unsupported HTTP method: " + config.httpMethod()); + } + } + /** * Sends an HTTP body using {@code httpSender}, respecting the configured retry policy. * diff --git a/src/test/java/io/aiven/kafka/connect/http/sender/DefaultHttpSenderTest.java b/src/test/java/io/aiven/kafka/connect/http/sender/DefaultHttpSenderTest.java index e666d4dc..9d72d155 100644 --- a/src/test/java/io/aiven/kafka/connect/http/sender/DefaultHttpSenderTest.java +++ b/src/test/java/io/aiven/kafka/connect/http/sender/DefaultHttpSenderTest.java @@ -24,6 +24,7 @@ import java.util.List; import java.util.Map; +import io.aiven.kafka.connect.http.config.HttpMethodsType; import org.apache.kafka.connect.errors.ConnectException; import io.aiven.kafka.connect.http.config.HttpSinkConfig; @@ -88,7 +89,7 @@ void shouldBuildDefaultHttpRequest() throws Exception { .isPresent() .get(as(InstanceOfAssertFactories.DURATION)) .hasSeconds(config.httpTimeout()); - assertThat(httpRequest.method()).isEqualTo("POST"); + assertThat(httpRequest.method()).isEqualTo(HttpMethodsType.POST.name()); assertThat(httpRequest .headers() @@ -101,6 +102,54 @@ void shouldBuildDefaultHttpRequest() throws Exception { } + @Test + void shouldBuildDefaultHttpPutRequest() throws Exception { + final var configBase = new HashMap<>(defaultConfig()); + configBase.put( "http.method", "PUT"); + + // Build the configuration + final HttpSinkConfig config = new HttpSinkConfig(configBase); + + // Mock the Http Client and Http Response + when(mockedClient.send(any(HttpRequest.class), any(BodyHandler.class))).thenReturn(mockedResponse); + + // Create a spy on the HttpSender implementation to capture methods parameters + final var httpSender = Mockito.spy(new DefaultHttpSender(config, mockedClient)); + + // Trigger the client + final List messages = List.of("some message"); + messages.forEach(httpSender::send); + + // Capture the RequestBuilder + final ArgumentCaptor defaultHttpRequestBuilder = ArgumentCaptor.forClass(HttpRequest.Builder.class); + verify(httpSender, atLeast(messages.size())).sendWithRetries(defaultHttpRequestBuilder.capture(), + any(HttpResponseHandler.class), anyInt()); + + // Retrieve the builders and rebuild the HttpRequests to check the HttpRequest proper configuration + defaultHttpRequestBuilder + .getAllValues() + .stream() + .map(Builder::build) + .forEach(httpRequest -> { + assertThat(httpRequest.uri()).isEqualTo(config.httpUri()); + assertThat(httpRequest.timeout()) + .isPresent() + .get(as(InstanceOfAssertFactories.DURATION)) + .hasSeconds(config.httpTimeout()); + assertThat(httpRequest.method()).isEqualTo(HttpMethodsType.PUT.name()); + + assertThat(httpRequest + .headers() + .firstValue(HttpRequestBuilder.HEADER_CONTENT_TYPE)).isEmpty(); + }); + + // Check the messages have been sent once + messages.forEach( + message -> bodyPublishers.verify(() -> HttpRequest.BodyPublishers.ofString(eq(message)), times(1))); + + } + + @Test void shouldBuildCustomHttpRequest() throws Exception { final var configBase = new HashMap<>(defaultConfig()); From 317dfe5e26c306a3e214e75459a02a878599cb6c Mon Sep 17 00:00:00 2001 From: Cararus Eugeniu Date: Mon, 30 Oct 2023 12:56:27 +0000 Subject: [PATCH 2/6] Update HttpMethodsType.java Fix linter reported issue --- .../io/aiven/kafka/connect/http/config/HttpMethodsType.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/main/java/io/aiven/kafka/connect/http/config/HttpMethodsType.java b/src/main/java/io/aiven/kafka/connect/http/config/HttpMethodsType.java index 4172be0b..f0a5ad64 100644 --- a/src/main/java/io/aiven/kafka/connect/http/config/HttpMethodsType.java +++ b/src/main/java/io/aiven/kafka/connect/http/config/HttpMethodsType.java @@ -21,8 +21,7 @@ import java.util.Objects; import java.util.stream.Collectors; -public enum HttpMethodsType -{ +public enum HttpMethodsType { POST("POST"), PUT("PUT"); From 23c601a8fa2322d5613202b900bda4a5a418bda6 Mon Sep 17 00:00:00 2001 From: Cararus Eugeniu Date: Mon, 30 Oct 2023 12:59:44 +0000 Subject: [PATCH 3/6] Update DefaultHttpSenderTest.java Fix Linter reported issue --- .../kafka/connect/http/sender/DefaultHttpSenderTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/test/java/io/aiven/kafka/connect/http/sender/DefaultHttpSenderTest.java b/src/test/java/io/aiven/kafka/connect/http/sender/DefaultHttpSenderTest.java index 9d72d155..6ea5db7a 100644 --- a/src/test/java/io/aiven/kafka/connect/http/sender/DefaultHttpSenderTest.java +++ b/src/test/java/io/aiven/kafka/connect/http/sender/DefaultHttpSenderTest.java @@ -25,10 +25,10 @@ import java.util.Map; import io.aiven.kafka.connect.http.config.HttpMethodsType; -import org.apache.kafka.connect.errors.ConnectException; import io.aiven.kafka.connect.http.config.HttpSinkConfig; +import org.apache.kafka.connect.errors.ConnectException; import org.assertj.core.api.InstanceOfAssertFactories; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -105,7 +105,7 @@ void shouldBuildDefaultHttpRequest() throws Exception { @Test void shouldBuildDefaultHttpPutRequest() throws Exception { final var configBase = new HashMap<>(defaultConfig()); - configBase.put( "http.method", "PUT"); + configBase.put("http.method", "PUT"); // Build the configuration final HttpSinkConfig config = new HttpSinkConfig(configBase); From b7f1375767c90005158fad642a79ea8d3cd37098 Mon Sep 17 00:00:00 2001 From: Cararus Eugeniu Date: Tue, 31 Oct 2023 09:47:40 +0000 Subject: [PATCH 4/6] Update DefaultHttpSenderTest.java Fix Linter reported issue --- .../aiven/kafka/connect/http/sender/DefaultHttpSenderTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/java/io/aiven/kafka/connect/http/sender/DefaultHttpSenderTest.java b/src/test/java/io/aiven/kafka/connect/http/sender/DefaultHttpSenderTest.java index 6ea5db7a..b5ddf331 100644 --- a/src/test/java/io/aiven/kafka/connect/http/sender/DefaultHttpSenderTest.java +++ b/src/test/java/io/aiven/kafka/connect/http/sender/DefaultHttpSenderTest.java @@ -25,10 +25,10 @@ import java.util.Map; import io.aiven.kafka.connect.http.config.HttpMethodsType; - import io.aiven.kafka.connect.http.config.HttpSinkConfig; import org.apache.kafka.connect.errors.ConnectException; + import org.assertj.core.api.InstanceOfAssertFactories; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; From 9f221ee3eeb49d7486b028ed7c42b3f37f2075ab Mon Sep 17 00:00:00 2001 From: Iulius Hutuleac Date: Wed, 2 Apr 2025 17:44:05 +0200 Subject: [PATCH 5/6] adding a body param for the password grant type --- .../connect/http/config/HttpSinkConfig.java | 22 +++++++++++++++++++ .../sender/OAuth2AccessTokenHttpSender.java | 1 + .../request/OAuth2AccessTokenRequestForm.java | 17 ++++++++++++-- 3 files changed, 38 insertions(+), 2 deletions(-) diff --git a/src/main/java/io/aiven/kafka/connect/http/config/HttpSinkConfig.java b/src/main/java/io/aiven/kafka/connect/http/config/HttpSinkConfig.java index ed486c38..88cfbf34 100644 --- a/src/main/java/io/aiven/kafka/connect/http/config/HttpSinkConfig.java +++ b/src/main/java/io/aiven/kafka/connect/http/config/HttpSinkConfig.java @@ -58,6 +58,7 @@ public final class HttpSinkConfig extends AbstractConfig { private static final String OAUTH2_GRANT_TYPE_CONFIG = "oauth2.grant.type"; private static final String OAUTH2_CLIENT_ID_PROP_CONFIG = "oauth2.request.client.id.property"; private static final String OAUTH2_CLIENT_ID_CONFIG = "oauth2.client.id"; + private static final String OAUTH2_BODY_PARAMS = "oauth2.body.params"; private static final String OAUTH2_CLIENT_SECRET_PROP_CONFIG = "oauth2.request.client.secret.property"; private static final String OAUTH2_CLIENT_SECRET_CONFIG = "oauth2.client.secret"; private static final String OAUTH2_CLIENT_AUTHORIZATION_MODE_CONFIG = "oauth2.client.authorization.mode"; @@ -253,6 +254,23 @@ public boolean visible(final String name, final Map parsedConfig OAUTH2_CLIENT_AUTHORIZATION_MODE_CONFIG, OAUTH2_CLIENT_SCOPE_CONFIG, OAUTH2_RESPONSE_TOKEN_PROPERTY_CONFIG) ); + configDef.define( + OAUTH2_BODY_PARAMS, + ConfigDef.Type.STRING, + null, + new ConfigDef.NonEmptyStringWithoutControlChars() { + @Override + public String toString() { + return "OAuth2 additional params"; + } + }, + ConfigDef.Importance.HIGH, + "Additional params to add to the body.", + CONNECTION_GROUP, + groupCounter++, + ConfigDef.Width.LONG, + OAUTH2_BODY_PARAMS + ); configDef.define(OAUTH2_GRANT_TYPE_PROP_CONFIG, ConfigDef.Type.STRING, "grant_type", @@ -767,6 +785,10 @@ public final String oauth2GrantType() { return getString(OAUTH2_GRANT_TYPE_CONFIG); } + public final String getOauth2BodyParams() { + return getString(OAUTH2_BODY_PARAMS); + } + public final String oauth2ClientIdProperty() { return getString(OAUTH2_CLIENT_ID_PROP_CONFIG); } diff --git a/src/main/java/io/aiven/kafka/connect/http/sender/OAuth2AccessTokenHttpSender.java b/src/main/java/io/aiven/kafka/connect/http/sender/OAuth2AccessTokenHttpSender.java index 0341f3eb..d5b32b8b 100644 --- a/src/main/java/io/aiven/kafka/connect/http/sender/OAuth2AccessTokenHttpSender.java +++ b/src/main/java/io/aiven/kafka/connect/http/sender/OAuth2AccessTokenHttpSender.java @@ -41,6 +41,7 @@ HttpResponse call() { .newBuilder() .withGrantTypeProperty(config.oauth2GrantTypeProperty()) .withGrantType(config.oauth2GrantType()) + .withBodyParams(config.getOauth2BodyParams()) .withScope(config.oauth2ClientScope()); if (config.oauth2AuthorizationMode() == OAuth2AuthorizationMode.URL) { diff --git a/src/main/java/io/aiven/kafka/connect/http/sender/request/OAuth2AccessTokenRequestForm.java b/src/main/java/io/aiven/kafka/connect/http/sender/request/OAuth2AccessTokenRequestForm.java index f0202b5a..12eca1fc 100644 --- a/src/main/java/io/aiven/kafka/connect/http/sender/request/OAuth2AccessTokenRequestForm.java +++ b/src/main/java/io/aiven/kafka/connect/http/sender/request/OAuth2AccessTokenRequestForm.java @@ -34,6 +34,7 @@ public class OAuth2AccessTokenRequestForm { private final String clientSecretProperty; private final String clientSecret; + private final String bodyParams; private OAuth2AccessTokenRequestForm( final String grantTypeProperty, @@ -42,7 +43,8 @@ private OAuth2AccessTokenRequestForm( final String clientIdProperty, final String clientId, final String clientSecretProperty, - final String clientSecret + final String clientSecret, + final String bodyParams ) { this.grantTypeProperty = grantTypeProperty; this.grantType = grantType; @@ -51,6 +53,7 @@ private OAuth2AccessTokenRequestForm( this.clientId = clientId; this.clientSecretProperty = clientSecretProperty; this.clientSecret = clientSecret; + this.bodyParams = bodyParams; } public String toBodyString() { @@ -58,6 +61,9 @@ public String toBodyString() { if (scope != null) { stringJoiner.add(encodeNameAndValue(SCOPE, scope)); } + if (bodyParams != null) { + stringJoiner.add(bodyParams); + } if (clientId != null && clientSecret != null) { stringJoiner .add(encodeNameAndValue(clientIdProperty, clientId)) @@ -89,6 +95,7 @@ public static class Builder { private String clientSecretProperty; private String clientSecret; + private String bodyParams; private Builder() { } @@ -103,6 +110,11 @@ public Builder withGrantType(final String grantType) { return this; } + public Builder withBodyParams(final String bodyParams) { + this.bodyParams = bodyParams; + return this; + } + public Builder withScope(final String scope) { this.scope = scope; return this; @@ -144,7 +156,8 @@ public OAuth2AccessTokenRequestForm build() { } return new OAuth2AccessTokenRequestForm( - grantTypeProperty, grantType, scope, clientIdProperty, clientId, clientSecretProperty, clientSecret); + grantTypeProperty, grantType, scope, clientIdProperty, clientId, + clientSecretProperty, clientSecret, bodyParams); } } From ab4f092a86e1a8dfcbc287845554c5968a8257b6 Mon Sep 17 00:00:00 2001 From: Iulius Hutuleac Date: Fri, 4 Apr 2025 19:19:34 +0200 Subject: [PATCH 6/6] adding a body param for the password grant type --- .../http/sender/AbstractHttpSender.java | 33 ++++++++++++------- .../http/sender/DefaultHttpSender.java | 3 +- .../sender/OAuth2AccessTokenHttpSender.java | 3 +- .../connect/http/sender/OAuth2HttpSender.java | 3 +- .../http/sender/StaticAuthHttpSender.java | 2 +- .../http/sender/DefaultHttpSenderTest.java | 1 + 6 files changed, 30 insertions(+), 15 deletions(-) diff --git a/src/main/java/io/aiven/kafka/connect/http/sender/AbstractHttpSender.java b/src/main/java/io/aiven/kafka/connect/http/sender/AbstractHttpSender.java index b84463ec..7fc5065d 100644 --- a/src/main/java/io/aiven/kafka/connect/http/sender/AbstractHttpSender.java +++ b/src/main/java/io/aiven/kafka/connect/http/sender/AbstractHttpSender.java @@ -24,6 +24,7 @@ import java.util.Objects; import java.util.concurrent.TimeUnit; +import io.aiven.kafka.connect.http.config.HttpMethodsType; import org.apache.kafka.connect.errors.ConnectException; import io.aiven.kafka.connect.http.config.HttpSinkConfig; @@ -37,14 +38,19 @@ abstract class AbstractHttpSender { protected final HttpClient httpClient; protected final HttpSinkConfig config; + protected final HttpMethodsType method; protected final HttpRequestBuilder httpRequestBuilder; protected AbstractHttpSender( - final HttpSinkConfig config, final HttpRequestBuilder httpRequestBuilder, final HttpClient httpClient + final HttpSinkConfig config, + final HttpRequestBuilder httpRequestBuilder, + final HttpClient httpClient, + final HttpMethodsType method ) { this.config = Objects.requireNonNull(config); this.httpRequestBuilder = Objects.requireNonNull(httpRequestBuilder); this.httpClient = Objects.requireNonNull(httpClient); + this.method = method; } public final HttpResponse send(final String body) { @@ -55,16 +61,21 @@ public final HttpResponse send(final String body) { // seth http bethod based on config private Builder prepareRequest(final String body) { - switch (config.httpMethod()) { - case POST: - return httpRequestBuilder - .build(config).POST(HttpRequest.BodyPublishers.ofString(body)); - case PUT: - return httpRequestBuilder - .build(config).PUT(HttpRequest.BodyPublishers.ofString(body)); - default: - throw new ConnectException("Unsupported HTTP method: " + config.httpMethod()); - } + if(method == null) { + switch (config.httpMethod()) { + case POST: + return httpRequestBuilder + .build(config).POST(HttpRequest.BodyPublishers.ofString(body)); + case PUT: + return httpRequestBuilder + .build(config).PUT(HttpRequest.BodyPublishers.ofString(body)); + default: + throw new ConnectException("Unsupported HTTP method: " + config.httpMethod()); + } + } else + return httpRequestBuilder + .build(config).POST(HttpRequest.BodyPublishers.ofString(body)); + } /** diff --git a/src/main/java/io/aiven/kafka/connect/http/sender/DefaultHttpSender.java b/src/main/java/io/aiven/kafka/connect/http/sender/DefaultHttpSender.java index f461d85d..85f3b593 100644 --- a/src/main/java/io/aiven/kafka/connect/http/sender/DefaultHttpSender.java +++ b/src/main/java/io/aiven/kafka/connect/http/sender/DefaultHttpSender.java @@ -21,12 +21,13 @@ import java.net.http.HttpRequest.Builder; import java.time.Duration; +import io.aiven.kafka.connect.http.config.HttpMethodsType; import io.aiven.kafka.connect.http.config.HttpSinkConfig; class DefaultHttpSender extends AbstractHttpSender implements HttpSender { DefaultHttpSender(final HttpSinkConfig config, final HttpClient client) { - super(config, new DefaultHttpRequestBuilder(), client); + super(config, new DefaultHttpRequestBuilder(), client, null); } static class DefaultHttpRequestBuilder implements HttpRequestBuilder { diff --git a/src/main/java/io/aiven/kafka/connect/http/sender/OAuth2AccessTokenHttpSender.java b/src/main/java/io/aiven/kafka/connect/http/sender/OAuth2AccessTokenHttpSender.java index d5b32b8b..6d1a5813 100644 --- a/src/main/java/io/aiven/kafka/connect/http/sender/OAuth2AccessTokenHttpSender.java +++ b/src/main/java/io/aiven/kafka/connect/http/sender/OAuth2AccessTokenHttpSender.java @@ -24,6 +24,7 @@ import java.util.Base64; import java.util.Objects; +import io.aiven.kafka.connect.http.config.HttpMethodsType; import io.aiven.kafka.connect.http.config.HttpSinkConfig; import io.aiven.kafka.connect.http.config.OAuth2AuthorizationMode; import io.aiven.kafka.connect.http.sender.request.OAuth2AccessTokenRequestForm; @@ -33,7 +34,7 @@ class OAuth2AccessTokenHttpSender extends AbstractHttpSender implements HttpSender { OAuth2AccessTokenHttpSender(final HttpSinkConfig config, final HttpClient httpClient) { - super(config, new AccessTokenHttpRequestBuilder(), httpClient); + super(config, new AccessTokenHttpRequestBuilder(), httpClient, HttpMethodsType.POST); } HttpResponse call() { diff --git a/src/main/java/io/aiven/kafka/connect/http/sender/OAuth2HttpSender.java b/src/main/java/io/aiven/kafka/connect/http/sender/OAuth2HttpSender.java index e8dffdd1..90fd8b69 100644 --- a/src/main/java/io/aiven/kafka/connect/http/sender/OAuth2HttpSender.java +++ b/src/main/java/io/aiven/kafka/connect/http/sender/OAuth2HttpSender.java @@ -23,6 +23,7 @@ import java.net.http.HttpResponse; import java.util.Map; +import io.aiven.kafka.connect.http.config.HttpMethodsType; import org.apache.kafka.connect.errors.ConnectException; import io.aiven.kafka.connect.http.config.HttpSinkConfig; @@ -41,7 +42,7 @@ class OAuth2HttpSender extends AbstractHttpSender implements HttpSender { final HttpClient httpClient, final OAuth2AccessTokenHttpSender oauth2AccessTokenHttpSender ) { - super(config, new OAuth2AuthHttpRequestBuilder(config, oauth2AccessTokenHttpSender), httpClient); + super(config, new OAuth2AuthHttpRequestBuilder(config, oauth2AccessTokenHttpSender), httpClient, null); } @Override diff --git a/src/main/java/io/aiven/kafka/connect/http/sender/StaticAuthHttpSender.java b/src/main/java/io/aiven/kafka/connect/http/sender/StaticAuthHttpSender.java index 5bc0b202..a5f49c50 100644 --- a/src/main/java/io/aiven/kafka/connect/http/sender/StaticAuthHttpSender.java +++ b/src/main/java/io/aiven/kafka/connect/http/sender/StaticAuthHttpSender.java @@ -25,7 +25,7 @@ class StaticAuthHttpSender extends AbstractHttpSender implements HttpSender { StaticAuthHttpSender(final HttpSinkConfig config, final HttpClient client) { - super(config, new StaticAuthHttpRequestBuilder(), client); + super(config, new StaticAuthHttpRequestBuilder(), client, null); } private static class StaticAuthHttpRequestBuilder extends DefaultHttpRequestBuilder { diff --git a/src/test/java/io/aiven/kafka/connect/http/sender/DefaultHttpSenderTest.java b/src/test/java/io/aiven/kafka/connect/http/sender/DefaultHttpSenderTest.java index 7bacde91..16056ca4 100644 --- a/src/test/java/io/aiven/kafka/connect/http/sender/DefaultHttpSenderTest.java +++ b/src/test/java/io/aiven/kafka/connect/http/sender/DefaultHttpSenderTest.java @@ -47,6 +47,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static org.mockito.Mockito.times; @ExtendWith(MockitoExtension.class) public class DefaultHttpSenderTest extends HttpSenderTestBase {