Skip to content

adding a body param for the password grant type #288

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ We consider the connector **stable** despite the major version is currently 0.

## How it works

The connector uses the POST HTTP method to deliver records.
As default the connector uses the POST HTTP method to deliver records.

The connector supports:
- authorization (static, OAuth2);
Expand Down
8 changes: 8 additions & 0 deletions docs/sink-connector-config-options.rst
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,14 @@ Connection
* Valid Values: HTTP(S) URL
* Importance: high

``http.method``
The HTTP Method to use when send the data.

* Type: string
* Default: "POST"
* Valid Values: [POST, PUT]
* Importance: low

``http.authorization.type``
The HTTP authorization type.

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright 2019 Aiven Oy and http-connector-for-apache-kafka project contributors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.aiven.kafka.connect.http.config;

import java.util.Arrays;
import java.util.Collection;
import java.util.Objects;
import java.util.stream.Collectors;

public enum HttpMethodsType {
POST("POST"),
PUT("PUT");

public final String name;

HttpMethodsType(final String name) {
this.name = name;
}

public static HttpMethodsType forName(final String name) {
Objects.requireNonNull(name);
return Arrays.stream(values())
.filter(v -> v.name.equalsIgnoreCase(name))
.findFirst()
.orElseThrow(() -> new IllegalArgumentException("HTTP Method type: " + name));
}

public static final Collection<String> NAMES =
Arrays.stream(values()).map(v -> v.name).collect(Collectors.toList());
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
public final class HttpSinkConfig extends AbstractConfig {
private static final String CONNECTION_GROUP = "Connection";
private static final String HTTP_URL_CONFIG = "http.url";
private static final String HTTP_METHOD = "http.method";
private static final String HTTP_PROXY_HOST = "http.proxy.host";
private static final String HTTP_PROXY_PORT = "http.proxy.port";
private static final String HTTP_SSL_TRUST_ALL_CERTIFICATES = "http.ssl.trust.all.certs";
Expand All @@ -58,6 +59,7 @@ public final class HttpSinkConfig extends AbstractConfig {
private static final String OAUTH2_GRANT_TYPE_CONFIG = "oauth2.grant.type";
private static final String OAUTH2_CLIENT_ID_PROP_CONFIG = "oauth2.request.client.id.property";
private static final String OAUTH2_CLIENT_ID_CONFIG = "oauth2.client.id";
private static final String OAUTH2_BODY_PARAMS = "oauth2.body.params";
private static final String OAUTH2_CLIENT_SECRET_PROP_CONFIG = "oauth2.request.client.secret.property";
private static final String OAUTH2_CLIENT_SECRET_CONFIG = "oauth2.client.secret";
private static final String OAUTH2_CLIENT_AUTHORIZATION_MODE_CONFIG = "oauth2.client.authorization.mode";
Expand Down Expand Up @@ -253,6 +255,23 @@ public boolean visible(final String name, final Map<String, Object> parsedConfig
OAUTH2_CLIENT_AUTHORIZATION_MODE_CONFIG, OAUTH2_CLIENT_SCOPE_CONFIG,
OAUTH2_RESPONSE_TOKEN_PROPERTY_CONFIG)
);
configDef.define(
OAUTH2_BODY_PARAMS,
ConfigDef.Type.STRING,
null,
new ConfigDef.NonEmptyStringWithoutControlChars() {
@Override
public String toString() {
return "OAuth2 additional params";
}
},
ConfigDef.Importance.HIGH,
"Additional params to add to the body.",
CONNECTION_GROUP,
groupCounter++,
ConfigDef.Width.LONG,
OAUTH2_BODY_PARAMS
);
configDef.define(OAUTH2_GRANT_TYPE_PROP_CONFIG,
ConfigDef.Type.STRING,
"grant_type",
Expand Down Expand Up @@ -424,6 +443,41 @@ public String toString() {
List.of(OAUTH2_ACCESS_TOKEN_URL_CONFIG, OAUTH2_CLIENT_ID_CONFIG, OAUTH2_CLIENT_SECRET_CONFIG,
OAUTH2_CLIENT_AUTHORIZATION_MODE_CONFIG, OAUTH2_CLIENT_SCOPE_CONFIG)
);

configDef.define(
HTTP_METHOD,
ConfigDef.Type.STRING,
"POST",
new ConfigDef.Validator() {
@Override
@SuppressFBWarnings("NP_LOAD_OF_KNOWN_NULL_VALUE") // Suppress the ConfigException with null value.
public void ensureValid(final String name, final Object value) {
if (value == null) {
throw new ConfigException(HTTP_METHOD, value);
}
assert value instanceof String;
final String valueStr = (String) value;
if (!HttpMethodsType.NAMES.contains(valueStr)) {
throw new ConfigException(
HTTP_METHOD, valueStr,
"supported values are: " + HttpMethodsType.NAMES);
}
}

@Override
public String toString() {
return HttpMethodsType.NAMES.toString();
}
},
ConfigDef.Importance.LOW,
"The HTTP Method to use when send the data.",
CONNECTION_GROUP,
groupCounter++,
ConfigDef.Width.SHORT,
HTTP_METHOD,
FixedSetRecommender.ofSupportedValues(HttpMethodsType.NAMES)
);

}

private static void addBatchingConfigGroup(final ConfigDef configDef) {
Expand Down Expand Up @@ -676,6 +730,10 @@ public final URI httpUri() {
return toURI(HTTP_URL_CONFIG);
}

public final HttpMethodsType httpMethod() {
return HttpMethodsType.valueOf(getString(HTTP_METHOD));
}

public final Long kafkaRetryBackoffMs() {
return getLong(KAFKA_RETRY_BACKOFF_MS_CONFIG);
}
Expand Down Expand Up @@ -767,6 +825,10 @@ public final String oauth2GrantType() {
return getString(OAUTH2_GRANT_TYPE_CONFIG);
}

public final String getOauth2BodyParams() {
return getString(OAUTH2_BODY_PARAMS);
}

public final String oauth2ClientIdProperty() {
return getString(OAUTH2_CLIENT_ID_PROP_CONFIG);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.Objects;
import java.util.concurrent.TimeUnit;

import io.aiven.kafka.connect.http.config.HttpMethodsType;
import org.apache.kafka.connect.errors.ConnectException;

import io.aiven.kafka.connect.http.config.HttpSinkConfig;
Expand All @@ -37,23 +38,46 @@ abstract class AbstractHttpSender {

protected final HttpClient httpClient;
protected final HttpSinkConfig config;
protected final HttpMethodsType method;
protected final HttpRequestBuilder httpRequestBuilder;

protected AbstractHttpSender(
final HttpSinkConfig config, final HttpRequestBuilder httpRequestBuilder, final HttpClient httpClient
final HttpSinkConfig config,
final HttpRequestBuilder httpRequestBuilder,
final HttpClient httpClient,
final HttpMethodsType method
) {
this.config = Objects.requireNonNull(config);
this.httpRequestBuilder = Objects.requireNonNull(httpRequestBuilder);
this.httpClient = Objects.requireNonNull(httpClient);
this.method = method;
}

public final HttpResponse<String> send(final String body) {
final var requestBuilder =
httpRequestBuilder.build(config).POST(HttpRequest.BodyPublishers.ofString(body));
final var requestBuilder = prepareRequest(body);
return sendWithRetries(requestBuilder, HttpResponseHandler.ON_HTTP_ERROR_RESPONSE_HANDLER,
config.maxRetries());
}

// seth http bethod based on config
private Builder prepareRequest(final String body) {
if(method == null) {
switch (config.httpMethod()) {
case POST:
return httpRequestBuilder
.build(config).POST(HttpRequest.BodyPublishers.ofString(body));
case PUT:
return httpRequestBuilder
.build(config).PUT(HttpRequest.BodyPublishers.ofString(body));
default:
throw new ConnectException("Unsupported HTTP method: " + config.httpMethod());
}
} else
return httpRequestBuilder
.build(config).POST(HttpRequest.BodyPublishers.ofString(body));

}

/**
* Sends an HTTP body using {@code httpSender}, respecting the configured retry policy.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,13 @@
import java.net.http.HttpRequest.Builder;
import java.time.Duration;

import io.aiven.kafka.connect.http.config.HttpMethodsType;
import io.aiven.kafka.connect.http.config.HttpSinkConfig;

class DefaultHttpSender extends AbstractHttpSender implements HttpSender {

DefaultHttpSender(final HttpSinkConfig config, final HttpClient client) {
super(config, new DefaultHttpRequestBuilder(), client);
super(config, new DefaultHttpRequestBuilder(), client, null);
}

static class DefaultHttpRequestBuilder implements HttpRequestBuilder {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.Base64;
import java.util.Objects;

import io.aiven.kafka.connect.http.config.HttpMethodsType;
import io.aiven.kafka.connect.http.config.HttpSinkConfig;
import io.aiven.kafka.connect.http.config.OAuth2AuthorizationMode;
import io.aiven.kafka.connect.http.sender.request.OAuth2AccessTokenRequestForm;
Expand All @@ -33,14 +34,15 @@
class OAuth2AccessTokenHttpSender extends AbstractHttpSender implements HttpSender {

OAuth2AccessTokenHttpSender(final HttpSinkConfig config, final HttpClient httpClient) {
super(config, new AccessTokenHttpRequestBuilder(), httpClient);
super(config, new AccessTokenHttpRequestBuilder(), httpClient, HttpMethodsType.POST);
}

HttpResponse<String> call() {
final OAuth2AccessTokenRequestForm.Builder formBuilder = OAuth2AccessTokenRequestForm
.newBuilder()
.withGrantTypeProperty(config.oauth2GrantTypeProperty())
.withGrantType(config.oauth2GrantType())
.withBodyParams(config.getOauth2BodyParams())
.withScope(config.oauth2ClientScope());

if (config.oauth2AuthorizationMode() == OAuth2AuthorizationMode.URL) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.net.http.HttpResponse;
import java.util.Map;

import io.aiven.kafka.connect.http.config.HttpMethodsType;
import org.apache.kafka.connect.errors.ConnectException;

import io.aiven.kafka.connect.http.config.HttpSinkConfig;
Expand All @@ -41,7 +42,7 @@ class OAuth2HttpSender extends AbstractHttpSender implements HttpSender {
final HttpClient httpClient,
final OAuth2AccessTokenHttpSender oauth2AccessTokenHttpSender
) {
super(config, new OAuth2AuthHttpRequestBuilder(config, oauth2AccessTokenHttpSender), httpClient);
super(config, new OAuth2AuthHttpRequestBuilder(config, oauth2AccessTokenHttpSender), httpClient, null);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
class StaticAuthHttpSender extends AbstractHttpSender implements HttpSender {

StaticAuthHttpSender(final HttpSinkConfig config, final HttpClient client) {
super(config, new StaticAuthHttpRequestBuilder(), client);
super(config, new StaticAuthHttpRequestBuilder(), client, null);
}

private static class StaticAuthHttpRequestBuilder extends DefaultHttpRequestBuilder {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public class OAuth2AccessTokenRequestForm {

private final String clientSecretProperty;
private final String clientSecret;
private final String bodyParams;

private OAuth2AccessTokenRequestForm(
final String grantTypeProperty,
Expand All @@ -42,7 +43,8 @@ private OAuth2AccessTokenRequestForm(
final String clientIdProperty,
final String clientId,
final String clientSecretProperty,
final String clientSecret
final String clientSecret,
final String bodyParams
) {
this.grantTypeProperty = grantTypeProperty;
this.grantType = grantType;
Expand All @@ -51,13 +53,17 @@ private OAuth2AccessTokenRequestForm(
this.clientId = clientId;
this.clientSecretProperty = clientSecretProperty;
this.clientSecret = clientSecret;
this.bodyParams = bodyParams;
}

public String toBodyString() {
final StringJoiner stringJoiner = new StringJoiner("&").add(encodeNameAndValue(grantTypeProperty, grantType));
if (scope != null) {
stringJoiner.add(encodeNameAndValue(SCOPE, scope));
}
if (bodyParams != null) {
stringJoiner.add(bodyParams);
}
if (clientId != null && clientSecret != null) {
stringJoiner
.add(encodeNameAndValue(clientIdProperty, clientId))
Expand Down Expand Up @@ -89,6 +95,7 @@ public static class Builder {

private String clientSecretProperty;
private String clientSecret;
private String bodyParams;

private Builder() {
}
Expand All @@ -103,6 +110,11 @@ public Builder withGrantType(final String grantType) {
return this;
}

public Builder withBodyParams(final String bodyParams) {
this.bodyParams = bodyParams;
return this;
}

public Builder withScope(final String scope) {
this.scope = scope;
return this;
Expand Down Expand Up @@ -144,7 +156,8 @@ public OAuth2AccessTokenRequestForm build() {
}

return new OAuth2AccessTokenRequestForm(
grantTypeProperty, grantType, scope, clientIdProperty, clientId, clientSecretProperty, clientSecret);
grantTypeProperty, grantType, scope, clientIdProperty, clientId,
clientSecretProperty, clientSecret, bodyParams);
}

}
Expand Down
Loading