From 36c481f311aa7bc403c5b04222aa7d02bd0aea37 Mon Sep 17 00:00:00 2001 From: alozano3 Date: Fri, 11 Oct 2024 17:36:28 +0200 Subject: [PATCH 1/4] auth api key --- .../elasticsearch/ConfigCallbackHandler.java | 11 ++++ .../ElasticsearchSinkConnectorConfig.java | 24 +++++++++ .../connect/elasticsearch/Validator.java | 18 +++++++ .../helper/ElasticsearchContainer.java | 52 ++++++++++++++++--- .../helper/ElasticsearchHelperClient.java | 28 ++++++++++ .../ElasticsearchConnectorApikeyIT.java | 47 +++++++++++++++++ 6 files changed, 174 insertions(+), 6 deletions(-) create mode 100644 src/test/java/io/confluent/connect/elasticsearch/integration/ElasticsearchConnectorApikeyIT.java diff --git a/src/main/java/io/confluent/connect/elasticsearch/ConfigCallbackHandler.java b/src/main/java/io/confluent/connect/elasticsearch/ConfigCallbackHandler.java index 1c2676604..51ffe0cd4 100644 --- a/src/main/java/io/confluent/connect/elasticsearch/ConfigCallbackHandler.java +++ b/src/main/java/io/confluent/connect/elasticsearch/ConfigCallbackHandler.java @@ -19,9 +19,11 @@ import java.security.AccessController; import java.security.PrivilegedActionException; import java.security.PrivilegedExceptionAction; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import javax.net.ssl.HostnameVerifier; import javax.net.ssl.SSLContext; @@ -30,6 +32,7 @@ import javax.security.auth.login.AppConfigurationEntry; import javax.security.auth.login.Configuration; import javax.security.auth.login.LoginContext; +import org.apache.http.Header; import org.apache.http.HttpHost; import org.apache.http.auth.AuthSchemeProvider; import org.apache.http.auth.AuthScope; @@ -49,6 +52,7 @@ import org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager; import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor; import org.apache.http.impl.nio.reactor.IOReactorConfig; +import org.apache.http.message.BasicHeader; import org.apache.http.nio.conn.NoopIOSessionStrategy; import org.apache.http.nio.conn.SchemeIOSessionStrategy; import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy; @@ -132,6 +136,13 @@ private void configureAuthentication(HttpAsyncClientBuilder builder) { ) ); builder.setDefaultCredentialsProvider(credentialsProvider); + } else if (config.isApikeyConfigured()) { + String apiKey = config.apikey(); + List
defaultHeaders = Arrays.asList( + new BasicHeader("Authorization", + "ApiKey " + apiKey)); + + builder.setDefaultHeaders(defaultHeaders); } if (config.isBasicProxyConfigured()) { diff --git a/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkConnectorConfig.java b/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkConnectorConfig.java index bacd85975..7e8e7b1db 100644 --- a/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkConnectorConfig.java +++ b/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkConnectorConfig.java @@ -71,6 +71,12 @@ public class ElasticsearchSinkConnectorConfig extends AbstractConfig { private static final String CONNECTION_PASSWORD_DISPLAY = "Connection Password"; private static final String CONNECTION_PASSWORD_DEFAULT = null; + public static final String CONNECTION_APIKEY_CONFIG = "connection.apikey"; + private static final String CONNECTION_APIKEY_DOC = + "The API key id used to authenticate with Elasticsearch."; + private static final String CONNECTION_APIKEY_DISPLAY = "Connection API key ID"; + private static final String CONNECTION_APIKEY_DEFAULT = null; + public static final String BATCH_SIZE_CONFIG = "batch.size"; private static final String BATCH_SIZE_DOC = "The number of records to process as a batch when writing to Elasticsearch."; @@ -449,6 +455,16 @@ private static void addConnectorConfigs(ConfigDef configDef) { ++order, Width.SHORT, CONNECTION_PASSWORD_DISPLAY + ).define( + CONNECTION_APIKEY_CONFIG, + Type.STRING, + CONNECTION_APIKEY_DEFAULT, + Importance.MEDIUM, + CONNECTION_APIKEY_DOC, + CONNECTOR_GROUP, + ++order, + Width.LONG, + CONNECTION_APIKEY_DISPLAY ).define( BATCH_SIZE_CONFIG, Type.INT, @@ -848,6 +864,10 @@ public boolean isAuthenticatedConnection() { return username() != null && password() != null; } + public boolean isApikeyConfigured() { + return apikey() != null; + } + public boolean isBasicProxyConfigured() { return !getString(PROXY_HOST_CONFIG).isEmpty(); } @@ -989,6 +1009,10 @@ public Password password() { return getPassword(CONNECTION_PASSWORD_CONFIG); } + public String apikey() { + return getString(CONNECTION_APIKEY_CONFIG); + } + public String proxyHost() { return getString(PROXY_HOST_CONFIG); } diff --git a/src/main/java/io/confluent/connect/elasticsearch/Validator.java b/src/main/java/io/confluent/connect/elasticsearch/Validator.java index 0dc577cec..1b2d25374 100644 --- a/src/main/java/io/confluent/connect/elasticsearch/Validator.java +++ b/src/main/java/io/confluent/connect/elasticsearch/Validator.java @@ -39,6 +39,7 @@ import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.BATCH_SIZE_CONFIG; import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.BEHAVIOR_ON_NULL_VALUES_CONFIG; import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.BehaviorOnNullValues; +import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.CONNECTION_APIKEY_CONFIG; import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.CONNECTION_PASSWORD_CONFIG; import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.CONNECTION_URL_CONFIG; import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.CONNECTION_USERNAME_CONFIG; @@ -101,6 +102,7 @@ public Config validate() { } validateCredentials(); + validateApiKey(); validateDataStreamConfigs(); validateIgnoreConfigs(); validateKerberos(); @@ -137,6 +139,22 @@ private void validateCredentials() { } } + private void validateApiKey() { + if (config.isApikeyConfigured()) { + if (config.isAuthenticatedConnection()) { + String errorMessage = String.format( + "Either only API Key (%s) or connection credentials (%s, %s) must be set.", + CONNECTION_APIKEY_CONFIG, + CONNECTION_USERNAME_CONFIG, + CONNECTION_PASSWORD_CONFIG + ); + addErrorMessage(CONNECTION_APIKEY_CONFIG, errorMessage); + addErrorMessage(CONNECTION_USERNAME_CONFIG, errorMessage); + addErrorMessage(CONNECTION_PASSWORD_CONFIG, errorMessage); + } + } + } + private void validateDataStreamConfigs() { if (config.dataStreamType() == DataStreamType.NONE ^ config.dataStreamDataset().isEmpty()) { String errorMessage = String.format( diff --git a/src/test/java/io/confluent/connect/elasticsearch/helper/ElasticsearchContainer.java b/src/test/java/io/confluent/connect/elasticsearch/helper/ElasticsearchContainer.java index 0761decb0..f84c4db98 100644 --- a/src/test/java/io/confluent/connect/elasticsearch/helper/ElasticsearchContainer.java +++ b/src/test/java/io/confluent/connect/elasticsearch/helper/ElasticsearchContainer.java @@ -15,10 +15,8 @@ package io.confluent.connect.elasticsearch.helper; -import io.confluent.connect.elasticsearch.ElasticsearchClient; -import io.confluent.connect.elasticsearch.RetryUtil; +import co.elastic.clients.elasticsearch.security.GrantApiKeyResponse; import org.apache.kafka.common.config.SslConfigs; -import org.apache.kafka.test.TestUtils; import org.elasticsearch.client.security.user.User; import org.elasticsearch.client.security.user.privileges.Role; import org.slf4j.Logger; @@ -26,12 +24,9 @@ import org.testcontainers.containers.ContainerLaunchException; import org.testcontainers.containers.output.OutputFrame; import org.testcontainers.containers.wait.strategy.Wait; -import org.testcontainers.containers.wait.strategy.WaitStrategyTarget; -import org.testcontainers.images.RemoteDockerImage; import org.testcontainers.images.builder.ImageFromDockerfile; import org.testcontainers.images.builder.dockerfile.DockerfileBuilder; import org.testcontainers.shaded.org.apache.commons.io.IOUtils; -import org.testcontainers.utility.DockerImageName; import java.io.File; import java.io.FileOutputStream; @@ -40,8 +35,10 @@ import java.net.InetAddress; import java.net.URI; import java.net.URISyntaxException; +import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.ArrayList; +import java.util.Base64; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -147,6 +144,7 @@ public static ElasticsearchContainer withESVersion(String ESVersion) { private Map usersToCreate; private String localKeystorePath; private String localTruststorePath; + private Map apiKeysToCreate; /** * Create an Elasticsearch container with the given image name with version qualifier. @@ -179,6 +177,9 @@ public void start() { ElasticsearchHelperClient helperClient = getHelperClient(props); helperClient.waitForConnection(30000); createUsersAndRoles(helperClient); + if (isApikeyEnabled()) { + createApikeys(helperClient); + } } } @@ -204,6 +205,21 @@ private void createUsersAndRoles(ElasticsearchHelperClient helperClient ) { } } + private void createApikeys(ElasticsearchHelperClient helperClient ) { + try { + for (Map.Entry userToPassword: this.usersToCreate.entrySet()) { + GrantApiKeyResponse grantApiKeyResponse = helperClient.grantApiKey(userToPassword, this.rolesToCreate); + String apiKey = + Base64.getEncoder().encodeToString( + (grantApiKeyResponse.id() + ":" + grantApiKeyResponse.apiKey()) + .getBytes(StandardCharsets.UTF_8)); + apiKeysToCreate.put(userToPassword.getKey().getUsername(), apiKey); + } + } catch (IOException e) { + throw new ContainerLaunchException("Container startup failed", e); + } + } + public ElasticsearchContainer withSslEnabled(boolean enable) { enableSsl(enable); return this; @@ -219,6 +235,12 @@ public ElasticsearchContainer withBasicAuth(Map users, List return this; } + public ElasticsearchContainer withApikey(Map users, List roles) { + enableBasicAuth(users, roles); + enableApikeys(users); + return this; + } + /** * Set whether the Elasticsearch instance should use SSL. * @@ -289,10 +311,24 @@ private void enableBasicAuth(Map users, List roles) { this.rolesToCreate = roles; } + private void enableApikeys(Map users) { + if (isKerberosEnabled()) { + throw new IllegalStateException( + "Api Keys and Kerberos are mutually exclusive." + ); + } + this.apiKeysToCreate = new HashMap<>(); + users.keySet().stream().forEach(user -> this.apiKeysToCreate.put(user.getUsername(), null)); + } + public boolean isBasicAuthEnabled() { return usersToCreate != null && !this.usersToCreate.isEmpty(); } + public boolean isApikeyEnabled() { + return apiKeysToCreate != null && !this.apiKeysToCreate.isEmpty(); + } + private String getFullResourcePath(String resourceName) { if (isSslEnabled() && isKerberosEnabled()) { return "/both/" + resourceName; @@ -601,4 +637,8 @@ public boolean shouldStartClientInCompatibilityMode() { public int esMajorVersion() { return getImageVersion().get(0); } + + public Map getAPIkeys() { + return apiKeysToCreate; + } } diff --git a/src/test/java/io/confluent/connect/elasticsearch/helper/ElasticsearchHelperClient.java b/src/test/java/io/confluent/connect/elasticsearch/helper/ElasticsearchHelperClient.java index f31c188c9..c6c846278 100644 --- a/src/test/java/io/confluent/connect/elasticsearch/helper/ElasticsearchHelperClient.java +++ b/src/test/java/io/confluent/connect/elasticsearch/helper/ElasticsearchHelperClient.java @@ -17,8 +17,13 @@ import co.elastic.clients.elasticsearch.ElasticsearchClient; import co.elastic.clients.elasticsearch.indices.GetDataStreamRequest.Builder; +import co.elastic.clients.elasticsearch.security.GrantApiKeyRequest; +import co.elastic.clients.elasticsearch.security.GrantApiKeyResponse; +import co.elastic.clients.elasticsearch.security.grant_api_key.ApiKeyGrantType; +import co.elastic.clients.elasticsearch.security.grant_api_key.GrantApiKey; import co.elastic.clients.json.jackson.JacksonJsonpMapper; import co.elastic.clients.transport.rest_client.RestClientTransport; +import org.apache.commons.lang3.StringUtils; import org.apache.http.HttpHost; import org.apache.kafka.test.TestUtils; import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; @@ -62,6 +67,7 @@ public class ElasticsearchHelperClient { private final String url; private final ElasticsearchSinkConnectorConfig config; private RestHighLevelClient client; + private ElasticsearchClient elasticsearchClient; public ElasticsearchHelperClient(String url, ElasticsearchSinkConnectorConfig config, boolean compatibilityMode) { @@ -75,6 +81,11 @@ public ElasticsearchHelperClient(String url, ElasticsearchSinkConnectorConfig co .build() // compatibility mode should be true for 7.17 high level rest clients while talking to ES 8. ).setApiCompatibilityMode(compatibilityMode).build(); + + elasticsearchClient = new ElasticsearchClient(new RestClientTransport( + RestClient.builder(HttpHost.create(url)).setHttpClientConfigCallback(configCallbackHandler).build(), + new JacksonJsonpMapper() + )); } public ElasticsearchHelperClient(String url, ElasticsearchSinkConnectorConfig config) { @@ -167,6 +178,23 @@ public void createUser(Entry userToPassword) throws IOException { throw new RuntimeException(String.format("Failed to create a user %s", userToPassword.getKey().getUsername())); } } + public GrantApiKeyResponse grantApiKey(Entry userToPassword, List roles) throws IOException { + GrantApiKey grantApiKey = + new GrantApiKey.Builder().name("apikey_".concat(userToPassword.getKey().getUsername())).build(); + GrantApiKeyRequest grantApiKeyRequest = new GrantApiKeyRequest.Builder() + .grantType(ApiKeyGrantType.Password) + .apiKey(grantApiKey) + .username(userToPassword.getKey().getUsername()) + .password(userToPassword.getValue()) + .build(); + + GrantApiKeyResponse grantApiKeyResponse = elasticsearchClient.security().grantApiKey(grantApiKeyRequest); + if (StringUtils.isEmpty(grantApiKeyResponse.apiKey())) { + throw new RuntimeException(String.format("Failed to create API key for user %s", userToPassword.getKey().getUsername())); + } + return grantApiKeyResponse; + } + public void waitForConnection(long timeMs) { try { diff --git a/src/test/java/io/confluent/connect/elasticsearch/integration/ElasticsearchConnectorApikeyIT.java b/src/test/java/io/confluent/connect/elasticsearch/integration/ElasticsearchConnectorApikeyIT.java new file mode 100644 index 000000000..3d5216428 --- /dev/null +++ b/src/test/java/io/confluent/connect/elasticsearch/integration/ElasticsearchConnectorApikeyIT.java @@ -0,0 +1,47 @@ +package io.confluent.connect.elasticsearch.integration; + +import io.confluent.common.utils.IntegrationTest; +import io.confluent.connect.elasticsearch.helper.ElasticsearchContainer; +import org.apache.http.Header; +import org.apache.http.message.BasicHeader; +import org.elasticsearch.client.security.user.User; +import org.elasticsearch.client.security.user.privileges.Role; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.CONNECTION_APIKEY_CONFIG; + +@Category(IntegrationTest.class) +public class ElasticsearchConnectorApikeyIT extends ElasticsearchConnectorBaseIT { + + @BeforeClass + public static void setupBeforeAll() throws Exception { + Map users = getUsers(); + List roles = getRoles(); + container = ElasticsearchContainer.fromSystemProperties().withApikey(users, roles); + container.start(); + } + + @AfterClass + public static void cleanupAfterAll() { + container.close(); + } + + @Test + public void testApikey() throws Exception { + addApikeyConfigConfigs(props); + helperClient = container.getHelperClient(props); + helperClient.waitForConnection(60000); + runSimpleTest(props); + } + + protected static void addApikeyConfigConfigs(Map props) { + String apikey = container.getAPIkeys().get(ELASTIC_MINIMAL_PRIVILEGES_NAME); + props.put(CONNECTION_APIKEY_CONFIG, apikey); + } + +} From 25095bf99c7e1178f28f9da7425ed32a1b4e2559 Mon Sep 17 00:00:00 2001 From: Albert Date: Fri, 11 Oct 2024 16:27:25 +0200 Subject: [PATCH 2/4] Update ElasticsearchConnectorApikeyIT.java --- .../integration/ElasticsearchConnectorApikeyIT.java | 1 - 1 file changed, 1 deletion(-) diff --git a/src/test/java/io/confluent/connect/elasticsearch/integration/ElasticsearchConnectorApikeyIT.java b/src/test/java/io/confluent/connect/elasticsearch/integration/ElasticsearchConnectorApikeyIT.java index 3d5216428..b44cb04bc 100644 --- a/src/test/java/io/confluent/connect/elasticsearch/integration/ElasticsearchConnectorApikeyIT.java +++ b/src/test/java/io/confluent/connect/elasticsearch/integration/ElasticsearchConnectorApikeyIT.java @@ -43,5 +43,4 @@ protected static void addApikeyConfigConfigs(Map props) { String apikey = container.getAPIkeys().get(ELASTIC_MINIMAL_PRIVILEGES_NAME); props.put(CONNECTION_APIKEY_CONFIG, apikey); } - } From a1e172128007c9ff92d54cb71bfe59016ea6347e Mon Sep 17 00:00:00 2001 From: alozano3 Date: Fri, 11 Oct 2024 17:39:06 +0200 Subject: [PATCH 3/4] Empty-Commit From a2d71ea8d2ff8f82ab90516d7ae2f5e7fa461276 Mon Sep 17 00:00:00 2001 From: alozano3 Date: Thu, 7 Nov 2024 12:29:17 +0100 Subject: [PATCH 4/4] unused imports --- .../integration/ElasticsearchConnectorApikeyIT.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/test/java/io/confluent/connect/elasticsearch/integration/ElasticsearchConnectorApikeyIT.java b/src/test/java/io/confluent/connect/elasticsearch/integration/ElasticsearchConnectorApikeyIT.java index b44cb04bc..ab9fc7612 100644 --- a/src/test/java/io/confluent/connect/elasticsearch/integration/ElasticsearchConnectorApikeyIT.java +++ b/src/test/java/io/confluent/connect/elasticsearch/integration/ElasticsearchConnectorApikeyIT.java @@ -2,15 +2,12 @@ import io.confluent.common.utils.IntegrationTest; import io.confluent.connect.elasticsearch.helper.ElasticsearchContainer; -import org.apache.http.Header; -import org.apache.http.message.BasicHeader; import org.elasticsearch.client.security.user.User; import org.elasticsearch.client.security.user.privileges.Role; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; -import java.util.Arrays; import java.util.List; import java.util.Map; import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.CONNECTION_APIKEY_CONFIG;