In the Javadoc of withHeaders it says:
* set a supplier for custom headers. This is invoked for every HTTP request to Elasticsearch to retrieve headers
* that should be sent with the request. A common use case is passing in authentication headers that may change.
In a Spring application, that authentication is likely held in a ThreadLocal by the SecurityContextHolder.
But since 6.0, when Rest5Clients configures the Apache HttpAsyncClientBuilder, it adds the headers from withHeaders as an HttpRequestInterceptor. The Elasticsearch low-level Rest5Client runs that interceptor asynchronously (in a different thread) through the Apache CloseableHttpAsyncClient, so the ThreadLocal-held authentication is not present when the headers supplier is invoked.
To resolve this, Rest5Clients should configure the builder by adding the headers as an AsyncExecChainHandler. The exec chain is used before the request is handled asynchronously so the authentication is present to be added to the request headers.
The fix may look like:
// Register the supplied headers as an exec-chain handler instead of an HttpRequestInterceptor, so the
// headers supplier is invoked as part of the exec chain rather than in the request interceptor.
httpAsyncClientBuilder.addExecInterceptorFirst("es-rest5-client",
		(request, entityProducer, scope, chain, asyncExecCallback) -> {
			clientConfiguration.getHeadersSupplier().get().forEach((header, values) -> {
				// The Accept and Content-Type headers may already have been put on the request, despite
				// this being the first interceptor; drop them so the supplied values replace the
				// existing ones instead of being appended to them.
				if ("Accept".equalsIgnoreCase(header) || "Content-Type".equalsIgnoreCase(header)) {
					request.removeHeaders(header);
				}
				values.forEach(value -> request.addHeader(header, value));
			});
			// Hand the (now decorated) request on to the rest of the exec chain.
			chain.proceed(request, entityProducer, scope, asyncExecCallback);
		});
This can be demonstrated in the RestClientsTest by adding a ThreadLocal to withHeaders. Only the Rest5Client will fail:
/**
 * Builds a client from a ClientConfiguration that carries basic auth, default headers and a headers
 * supplier, pings the mock server several times and verifies that every expected header is present on
 * the HEAD request. One supplied header takes its value from a ThreadLocal that is set on the test
 * thread, so the verification only passes when the supplier sees that thread's value.
 *
 * @param clientUnderTestFactory factory for the client variant under test
 */
@ParameterizedTest(name = "{0}") // DATAES-801, DATAES-588
@MethodSource("clientUnderTestFactorySource")
@DisplayName("should configure client and set all required headers")
void shouldConfigureClientAndSetAllRequiredHeaders(ClientUnderTestFactory clientUnderTestFactory) {
// ADD a ThreadLocal for testing
ThreadLocal<String> threadLocal = ThreadLocal.withInitial(() -> null);
wireMockServer(server -> {
// static headers that are expected on every request
HttpHeaders defaultHeaders = new HttpHeaders();
defaultHeaders.addAll("def1", Arrays.asList("def1-1", "def1-2"));
defaultHeaders.add("def2", "def2-1");
// starts at 1, so the n-th invocation of the headers supplier produces "val" + n
AtomicInteger supplierCount = new AtomicInteger(1);
// counters recording how often each kind of client configurer callback is invoked
AtomicInteger restClientConfigurerCount = new AtomicInteger(0);
AtomicInteger httpClientConfigurerCount = new AtomicInteger(0);
AtomicInteger connectionConfigurerCount = new AtomicInteger(0);
AtomicInteger connectionManagerConfigurerCount = new AtomicInteger(0);
AtomicInteger requestConfigurerCount = new AtomicInteger(0);
ClientConfigurationBuilder configurationBuilder = new ClientConfigurationBuilder();
configurationBuilder //
.connectedTo("localhost:" + server.port()) //
.withBasicAuth("user", "password") //
.withDefaultHeaders(defaultHeaders) //
.withHeaders(() -> {
HttpHeaders httpHeaders = new HttpHeaders();
// USE the ThreadLocal in the test
// the value is read when the supplier runs, on whichever thread runs it
httpHeaders.add("thread", threadLocal.get());
httpHeaders.add("supplied", "val0");
httpHeaders.add("supplied", "val" + supplierCount.getAndIncrement());
return httpHeaders;
});
// SET a value into the ThreadLocal
// done on the test thread, after the supplier is registered and before the client is created
threadLocal.set("local");
// register the counting configurer callbacks that match the client variant under test
if (clientUnderTestFactory instanceof ELCRest5ClientUnderTestFactory) {
configurationBuilder.withClientConfigurer(
Rest5Clients.ElasticsearchRest5ClientConfigurationCallback.from(rest5ClientBuilder -> {
restClientConfigurerCount.incrementAndGet();
return rest5ClientBuilder;
}));
configurationBuilder.withClientConfigurer(
Rest5Clients.ElasticsearchHttpClientConfigurationCallback.from(httpClientBuilder -> {
httpClientConfigurerCount.incrementAndGet();
return httpClientBuilder;
}));
configurationBuilder.withClientConfigurer(
Rest5Clients.ElasticsearchConnectionConfigurationCallback.from(connectionConfigBuilder -> {
connectionConfigurerCount.incrementAndGet();
return connectionConfigBuilder;
}));
configurationBuilder.withClientConfigurer(
Rest5Clients.ElasticsearchConnectionManagerCallback.from(connectionManagerBuilder -> {
connectionManagerConfigurerCount.incrementAndGet();
return connectionManagerBuilder;
}));
configurationBuilder.withClientConfigurer(
Rest5Clients.ElasticsearchRequestConfigCallback.from(requestConfigBuilder -> {
requestConfigurerCount.incrementAndGet();
return requestConfigBuilder;
}));
} else if (clientUnderTestFactory instanceof ELCRestClientUnderTestFactory) {
configurationBuilder.withClientConfigurer(
RestClients.ElasticsearchHttpClientConfigurationCallback.from(httpClientBuilder -> {
httpClientConfigurerCount.incrementAndGet();
return httpClientBuilder;
}));
configurationBuilder.withClientConfigurer(
RestClients.ElasticsearchRestClientConfigurationCallback.from(restClientBuilder -> {
restClientConfigurerCount.incrementAndGet();
return restClientBuilder;
}));
} else if (clientUnderTestFactory instanceof ReactiveELCUnderTestFactory) {
configurationBuilder
.withClientConfigurer(RestClients.ElasticsearchHttpClientConfigurationCallback.from(webClient -> {
httpClientConfigurerCount.incrementAndGet();
return webClient;
}));
configurationBuilder.withClientConfigurer(
RestClients.ElasticsearchRestClientConfigurationCallback.from(restClientBuilder -> {
restClientConfigurerCount.incrementAndGet();
return restClientBuilder;
}));
}
ClientConfiguration clientConfiguration = configurationBuilder.build();
ClientUnderTest clientUnderTest = clientUnderTestFactory.create(clientConfiguration);
// do several calls to check that the headerSupplier provided values are set
// clients that report an initial request start one higher, since that request already used "val1"
int startValue = clientUnderTest.usesInitialRequest() ? 2 : 1;
for (int i = startValue; i <= startValue + 2; i++) {
clientUnderTest.ping();
verify(headRequestedFor(urlEqualTo("/")) //
.withHeader("Authorization", new AnythingPattern()) //
.withHeader("def1", new EqualToPattern("def1-1")) //
.withHeader("def1", new EqualToPattern("def1-2")) //
.withHeader("def2", new EqualToPattern("def2-1")) //
.withHeader("thread", new EqualToPattern("local")) // VERIFY the ThreadLocal
.withHeader("supplied", new EqualToPattern("val0")) //
// on the first call Elasticsearch does the version check and thus already increments the counter
.withHeader("supplied", new EqualToPattern("val" + i)) //
.withHeader("supplied", including("val0", "val" + i)));
}
// each configurer callback must have run exactly as often as the factory under test expects
assertThat(restClientConfigurerCount).hasValue(clientUnderTestFactory.getExpectedRestClientConfigurerCalls());
assertThat(httpClientConfigurerCount).hasValue(1);
assertThat(connectionConfigurerCount).hasValue(clientUnderTestFactory.getExpectedConnectionConfigurerCalls());
assertThat(connectionManagerConfigurerCount)
.hasValue(clientUnderTestFactory.getExpectedConnectionManagerConfigurerCalls());
assertThat(requestConfigurerCount).hasValue(clientUnderTestFactory.getExpectedRequestConfigurerCalls());
});
}
In the Javadoc of withHeaders it says:
And in a Spring app that authentication is likely held in a ThreadLocal by a SecurityContextHolder.
But since 6.0, when Rest5Clients configures the Apache HttpAsyncClientBuilder, it adds the headers from withHeaders as an HttpRequestInterceptor. The Elasticsearch low-level Rest5Client runs that interceptor asynchronously (in a different thread) through the Apache CloseableHttpAsyncClient, so the ThreadLocal-held authentication is not present when the headers supplier is invoked.
To resolve this, Rest5Clients should configure the builder by adding the headers as an AsyncExecChainHandler. The exec chain is used before the request is handled asynchronously so the authentication is present to be added to the request headers.
The fix may look like:
This can be demonstrated in the RestClientsTest by adding a ThreadLocal to withHeaders. Only the Rest5Client will fail: