Skip to content

Commit 36c481f

Browse files
committed
auth api key
1 parent 7103e1c commit 36c481f

File tree

6 files changed

+174
-6
lines changed

6 files changed

+174
-6
lines changed

src/main/java/io/confluent/connect/elasticsearch/ConfigCallbackHandler.java

+11
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,11 @@
1919
import java.security.AccessController;
2020
import java.security.PrivilegedActionException;
2121
import java.security.PrivilegedExceptionAction;
22+
import java.util.Arrays;
2223
import java.util.Collections;
2324
import java.util.HashMap;
2425
import java.util.HashSet;
26+
import java.util.List;
2527
import java.util.Map;
2628
import javax.net.ssl.HostnameVerifier;
2729
import javax.net.ssl.SSLContext;
@@ -30,6 +32,7 @@
3032
import javax.security.auth.login.AppConfigurationEntry;
3133
import javax.security.auth.login.Configuration;
3234
import javax.security.auth.login.LoginContext;
35+
import org.apache.http.Header;
3336
import org.apache.http.HttpHost;
3437
import org.apache.http.auth.AuthSchemeProvider;
3538
import org.apache.http.auth.AuthScope;
@@ -49,6 +52,7 @@
4952
import org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager;
5053
import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor;
5154
import org.apache.http.impl.nio.reactor.IOReactorConfig;
55+
import org.apache.http.message.BasicHeader;
5256
import org.apache.http.nio.conn.NoopIOSessionStrategy;
5357
import org.apache.http.nio.conn.SchemeIOSessionStrategy;
5458
import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy;
@@ -132,6 +136,13 @@ private void configureAuthentication(HttpAsyncClientBuilder builder) {
132136
)
133137
);
134138
builder.setDefaultCredentialsProvider(credentialsProvider);
139+
} else if (config.isApikeyConfigured()) {
140+
String apiKey = config.apikey();
141+
List<Header> defaultHeaders = Arrays.asList(
142+
new BasicHeader("Authorization",
143+
"ApiKey " + apiKey));
144+
145+
builder.setDefaultHeaders(defaultHeaders);
135146
}
136147

137148
if (config.isBasicProxyConfigured()) {

src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkConnectorConfig.java

+24
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,12 @@ public class ElasticsearchSinkConnectorConfig extends AbstractConfig {
7171
private static final String CONNECTION_PASSWORD_DISPLAY = "Connection Password";
7272
private static final String CONNECTION_PASSWORD_DEFAULT = null;
7373

74+
public static final String CONNECTION_APIKEY_CONFIG = "connection.apikey";
75+
private static final String CONNECTION_APIKEY_DOC =
76+
"The API key id used to authenticate with Elasticsearch.";
77+
private static final String CONNECTION_APIKEY_DISPLAY = "Connection API key ID";
78+
private static final String CONNECTION_APIKEY_DEFAULT = null;
79+
7480
public static final String BATCH_SIZE_CONFIG = "batch.size";
7581
private static final String BATCH_SIZE_DOC =
7682
"The number of records to process as a batch when writing to Elasticsearch.";
@@ -449,6 +455,16 @@ private static void addConnectorConfigs(ConfigDef configDef) {
449455
++order,
450456
Width.SHORT,
451457
CONNECTION_PASSWORD_DISPLAY
458+
).define(
459+
CONNECTION_APIKEY_CONFIG,
460+
Type.STRING,
461+
CONNECTION_APIKEY_DEFAULT,
462+
Importance.MEDIUM,
463+
CONNECTION_APIKEY_DOC,
464+
CONNECTOR_GROUP,
465+
++order,
466+
Width.LONG,
467+
CONNECTION_APIKEY_DISPLAY
452468
).define(
453469
BATCH_SIZE_CONFIG,
454470
Type.INT,
@@ -848,6 +864,10 @@ public boolean isAuthenticatedConnection() {
848864
return username() != null && password() != null;
849865
}
850866

867+
public boolean isApikeyConfigured() {
868+
return apikey() != null;
869+
}
870+
851871
public boolean isBasicProxyConfigured() {
852872
return !getString(PROXY_HOST_CONFIG).isEmpty();
853873
}
@@ -989,6 +1009,10 @@ public Password password() {
9891009
return getPassword(CONNECTION_PASSWORD_CONFIG);
9901010
}
9911011

1012+
public String apikey() {
1013+
return getString(CONNECTION_APIKEY_CONFIG);
1014+
}
1015+
9921016
public String proxyHost() {
9931017
return getString(PROXY_HOST_CONFIG);
9941018
}

src/main/java/io/confluent/connect/elasticsearch/Validator.java

+18
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.BATCH_SIZE_CONFIG;
4040
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.BEHAVIOR_ON_NULL_VALUES_CONFIG;
4141
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.BehaviorOnNullValues;
42+
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.CONNECTION_APIKEY_CONFIG;
4243
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.CONNECTION_PASSWORD_CONFIG;
4344
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.CONNECTION_URL_CONFIG;
4445
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.CONNECTION_USERNAME_CONFIG;
@@ -101,6 +102,7 @@ public Config validate() {
101102
}
102103

103104
validateCredentials();
105+
validateApiKey();
104106
validateDataStreamConfigs();
105107
validateIgnoreConfigs();
106108
validateKerberos();
@@ -137,6 +139,22 @@ private void validateCredentials() {
137139
}
138140
}
139141

142+
private void validateApiKey() {
143+
if (config.isApikeyConfigured()) {
144+
if (config.isAuthenticatedConnection()) {
145+
String errorMessage = String.format(
146+
"Either only API Key (%s) or connection credentials (%s, %s) must be set.",
147+
CONNECTION_APIKEY_CONFIG,
148+
CONNECTION_USERNAME_CONFIG,
149+
CONNECTION_PASSWORD_CONFIG
150+
);
151+
addErrorMessage(CONNECTION_APIKEY_CONFIG, errorMessage);
152+
addErrorMessage(CONNECTION_USERNAME_CONFIG, errorMessage);
153+
addErrorMessage(CONNECTION_PASSWORD_CONFIG, errorMessage);
154+
}
155+
}
156+
}
157+
140158
private void validateDataStreamConfigs() {
141159
if (config.dataStreamType() == DataStreamType.NONE ^ config.dataStreamDataset().isEmpty()) {
142160
String errorMessage = String.format(

src/test/java/io/confluent/connect/elasticsearch/helper/ElasticsearchContainer.java

+46-6
Original file line numberDiff line numberDiff line change
@@ -15,23 +15,18 @@
1515

1616
package io.confluent.connect.elasticsearch.helper;
1717

18-
import io.confluent.connect.elasticsearch.ElasticsearchClient;
19-
import io.confluent.connect.elasticsearch.RetryUtil;
18+
import co.elastic.clients.elasticsearch.security.GrantApiKeyResponse;
2019
import org.apache.kafka.common.config.SslConfigs;
21-
import org.apache.kafka.test.TestUtils;
2220
import org.elasticsearch.client.security.user.User;
2321
import org.elasticsearch.client.security.user.privileges.Role;
2422
import org.slf4j.Logger;
2523
import org.slf4j.LoggerFactory;
2624
import org.testcontainers.containers.ContainerLaunchException;
2725
import org.testcontainers.containers.output.OutputFrame;
2826
import org.testcontainers.containers.wait.strategy.Wait;
29-
import org.testcontainers.containers.wait.strategy.WaitStrategyTarget;
30-
import org.testcontainers.images.RemoteDockerImage;
3127
import org.testcontainers.images.builder.ImageFromDockerfile;
3228
import org.testcontainers.images.builder.dockerfile.DockerfileBuilder;
3329
import org.testcontainers.shaded.org.apache.commons.io.IOUtils;
34-
import org.testcontainers.utility.DockerImageName;
3530

3631
import java.io.File;
3732
import java.io.FileOutputStream;
@@ -40,8 +35,10 @@
4035
import java.net.InetAddress;
4136
import java.net.URI;
4237
import java.net.URISyntaxException;
38+
import java.nio.charset.StandardCharsets;
4339
import java.time.Duration;
4440
import java.util.ArrayList;
41+
import java.util.Base64;
4542
import java.util.HashMap;
4643
import java.util.List;
4744
import java.util.Map;
@@ -147,6 +144,7 @@ public static ElasticsearchContainer withESVersion(String ESVersion) {
147144
private Map<User, String> usersToCreate;
148145
private String localKeystorePath;
149146
private String localTruststorePath;
147+
private Map<String, String> apiKeysToCreate;
150148

151149
/**
152150
* Create an Elasticsearch container with the given image name with version qualifier.
@@ -179,6 +177,9 @@ public void start() {
179177
ElasticsearchHelperClient helperClient = getHelperClient(props);
180178
helperClient.waitForConnection(30000);
181179
createUsersAndRoles(helperClient);
180+
if (isApikeyEnabled()) {
181+
createApikeys(helperClient);
182+
}
182183
}
183184
}
184185

@@ -204,6 +205,21 @@ private void createUsersAndRoles(ElasticsearchHelperClient helperClient ) {
204205
}
205206
}
206207

208+
private void createApikeys(ElasticsearchHelperClient helperClient ) {
209+
try {
210+
for (Map.Entry<User,String> userToPassword: this.usersToCreate.entrySet()) {
211+
GrantApiKeyResponse grantApiKeyResponse = helperClient.grantApiKey(userToPassword, this.rolesToCreate);
212+
String apiKey =
213+
Base64.getEncoder().encodeToString(
214+
(grantApiKeyResponse.id() + ":" + grantApiKeyResponse.apiKey())
215+
.getBytes(StandardCharsets.UTF_8));
216+
apiKeysToCreate.put(userToPassword.getKey().getUsername(), apiKey);
217+
}
218+
} catch (IOException e) {
219+
throw new ContainerLaunchException("Container startup failed", e);
220+
}
221+
}
222+
207223
public ElasticsearchContainer withSslEnabled(boolean enable) {
208224
enableSsl(enable);
209225
return this;
@@ -219,6 +235,12 @@ public ElasticsearchContainer withBasicAuth(Map<User, String> users, List<Role>
219235
return this;
220236
}
221237

238+
public ElasticsearchContainer withApikey(Map<User, String> users, List<Role> roles) {
239+
enableBasicAuth(users, roles);
240+
enableApikeys(users);
241+
return this;
242+
}
243+
222244
/**
223245
* Set whether the Elasticsearch instance should use SSL.
224246
*
@@ -289,10 +311,24 @@ private void enableBasicAuth(Map<User, String> users, List<Role> roles) {
289311
this.rolesToCreate = roles;
290312
}
291313

314+
private void enableApikeys(Map<User, String> users) {
315+
if (isKerberosEnabled()) {
316+
throw new IllegalStateException(
317+
"Api Keys and Kerberos are mutually exclusive."
318+
);
319+
}
320+
this.apiKeysToCreate = new HashMap<>();
321+
users.keySet().stream().forEach(user -> this.apiKeysToCreate.put(user.getUsername(), null));
322+
}
323+
292324
public boolean isBasicAuthEnabled() {
293325
return usersToCreate != null && !this.usersToCreate.isEmpty();
294326
}
295327

328+
public boolean isApikeyEnabled() {
329+
return apiKeysToCreate != null && !this.apiKeysToCreate.isEmpty();
330+
}
331+
296332
private String getFullResourcePath(String resourceName) {
297333
if (isSslEnabled() && isKerberosEnabled()) {
298334
return "/both/" + resourceName;
@@ -601,4 +637,8 @@ public boolean shouldStartClientInCompatibilityMode() {
601637
public int esMajorVersion() {
602638
return getImageVersion().get(0);
603639
}
640+
641+
public Map<String, String> getAPIkeys() {
642+
return apiKeysToCreate;
643+
}
604644
}

src/test/java/io/confluent/connect/elasticsearch/helper/ElasticsearchHelperClient.java

+28
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,13 @@
1717

1818
import co.elastic.clients.elasticsearch.ElasticsearchClient;
1919
import co.elastic.clients.elasticsearch.indices.GetDataStreamRequest.Builder;
20+
import co.elastic.clients.elasticsearch.security.GrantApiKeyRequest;
21+
import co.elastic.clients.elasticsearch.security.GrantApiKeyResponse;
22+
import co.elastic.clients.elasticsearch.security.grant_api_key.ApiKeyGrantType;
23+
import co.elastic.clients.elasticsearch.security.grant_api_key.GrantApiKey;
2024
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
2125
import co.elastic.clients.transport.rest_client.RestClientTransport;
26+
import org.apache.commons.lang3.StringUtils;
2227
import org.apache.http.HttpHost;
2328
import org.apache.kafka.test.TestUtils;
2429
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
@@ -62,6 +67,7 @@ public class ElasticsearchHelperClient {
6267
private final String url;
6368
private final ElasticsearchSinkConnectorConfig config;
6469
private RestHighLevelClient client;
70+
private ElasticsearchClient elasticsearchClient;
6571

6672
public ElasticsearchHelperClient(String url, ElasticsearchSinkConnectorConfig config,
6773
boolean compatibilityMode) {
@@ -75,6 +81,11 @@ public ElasticsearchHelperClient(String url, ElasticsearchSinkConnectorConfig co
7581
.build()
7682
// compatibility mode should be true for 7.17 high level rest clients while talking to ES 8.
7783
).setApiCompatibilityMode(compatibilityMode).build();
84+
85+
elasticsearchClient = new ElasticsearchClient(new RestClientTransport(
86+
RestClient.builder(HttpHost.create(url)).setHttpClientConfigCallback(configCallbackHandler).build(),
87+
new JacksonJsonpMapper()
88+
));
7889
}
7990

8091
public ElasticsearchHelperClient(String url, ElasticsearchSinkConnectorConfig config) {
@@ -167,6 +178,23 @@ public void createUser(Entry<User, String> userToPassword) throws IOException {
167178
throw new RuntimeException(String.format("Failed to create a user %s", userToPassword.getKey().getUsername()));
168179
}
169180
}
181+
public GrantApiKeyResponse grantApiKey(Entry<User, String> userToPassword, List<Role> roles) throws IOException {
182+
GrantApiKey grantApiKey =
183+
new GrantApiKey.Builder().name("apikey_".concat(userToPassword.getKey().getUsername())).build();
184+
GrantApiKeyRequest grantApiKeyRequest = new GrantApiKeyRequest.Builder()
185+
.grantType(ApiKeyGrantType.Password)
186+
.apiKey(grantApiKey)
187+
.username(userToPassword.getKey().getUsername())
188+
.password(userToPassword.getValue())
189+
.build();
190+
191+
GrantApiKeyResponse grantApiKeyResponse = elasticsearchClient.security().grantApiKey(grantApiKeyRequest);
192+
if (StringUtils.isEmpty(grantApiKeyResponse.apiKey())) {
193+
throw new RuntimeException(String.format("Failed to create API key for user %s", userToPassword.getKey().getUsername()));
194+
}
195+
return grantApiKeyResponse;
196+
}
197+
170198

171199
public void waitForConnection(long timeMs) {
172200
try {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package io.confluent.connect.elasticsearch.integration;
2+
3+
import io.confluent.common.utils.IntegrationTest;
4+
import io.confluent.connect.elasticsearch.helper.ElasticsearchContainer;
5+
import org.apache.http.Header;
6+
import org.apache.http.message.BasicHeader;
7+
import org.elasticsearch.client.security.user.User;
8+
import org.elasticsearch.client.security.user.privileges.Role;
9+
import org.junit.AfterClass;
10+
import org.junit.BeforeClass;
11+
import org.junit.Test;
12+
import org.junit.experimental.categories.Category;
13+
import java.util.Arrays;
14+
import java.util.List;
15+
import java.util.Map;
16+
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.CONNECTION_APIKEY_CONFIG;
17+
18+
@Category(IntegrationTest.class)
19+
public class ElasticsearchConnectorApikeyIT extends ElasticsearchConnectorBaseIT {
20+
21+
@BeforeClass
22+
public static void setupBeforeAll() throws Exception {
23+
Map<User, String> users = getUsers();
24+
List<Role> roles = getRoles();
25+
container = ElasticsearchContainer.fromSystemProperties().withApikey(users, roles);
26+
container.start();
27+
}
28+
29+
@AfterClass
30+
public static void cleanupAfterAll() {
31+
container.close();
32+
}
33+
34+
@Test
35+
public void testApikey() throws Exception {
36+
addApikeyConfigConfigs(props);
37+
helperClient = container.getHelperClient(props);
38+
helperClient.waitForConnection(60000);
39+
runSimpleTest(props);
40+
}
41+
42+
protected static void addApikeyConfigConfigs(Map<String, String> props) {
43+
String apikey = container.getAPIkeys().get(ELASTIC_MINIMAL_PRIVILEGES_NAME);
44+
props.put(CONNECTION_APIKEY_CONFIG, apikey);
45+
}
46+
47+
}

0 commit comments

Comments
 (0)