Skip to content

Commit ea4ad5e

Browse files
authored
[logstash bridge]: stableapi bootstrap (elastic#108171)
* [logstash bridge]: stableapi bootstrap Adds a new `logstash-bridge` project in `/libs` that exports api-stable wrappers for the elasticsearch-internal types that Logstash's Elastic Integration Filter relies on to provide ingest pipeline execution inside of Logstash. These bridge classes prevent Elasticsearch-internal refactorings from breaking the Logstash-owned project. * Update docs/changelog/108171.yaml * rename StableAPI -> StableBridgeAPI
1 parent fd7a184 commit ea4ad5e

19 files changed

+865
-0
lines changed

.github/CODEOWNERS

+3
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@ x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/Monito
2020
x-pack/plugin/fleet/src/main/java/org/elasticsearch/xpack/fleet @elastic/fleet
2121
x-pack/plugin/core/src/main/resources/fleet-* @elastic/fleet
2222

23+
# Logstash
24+
libs/logstash-bridge @elastic/logstash
25+
2326
# Kibana Security
2427
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/store/KibanaOwnedReservedRoleDescriptors.java @elastic/kibana-security
2528

docs/changelog/108171.yaml

+5
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 108171
2+
summary: "add Elastic-internal stable bridge api for use by Logstash"
3+
area: Infra/Core
4+
type: enhancement
5+
issues: []

libs/logstash-bridge/README.md

+8
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
## Logstash Bridge
2+
3+
This package contains bridge functionality to ensure that Logstash's Elastic Integration plugin
4+
has access to the minimal subset of Elasticsearch to perform its functions without relying on
5+
other Elasticsearch internals.
6+
7+
If a change is introduced in a separate Elasticsearch project that causes this project to fail,
8+
please consult with members of @elastic/logstash to chart a path forward.

libs/logstash-bridge/build.gradle

+24
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0 and the Server Side Public License, v 1; you may not use this file except
5+
* in compliance with, at your election, the Elastic License 2.0 or the Server
6+
* Side Public License, v 1.
7+
*/
8+
apply plugin: 'elasticsearch.build'
9+
10+
dependencies {
11+
compileOnly project(':server')
12+
compileOnly project(':libs:elasticsearch-core')
13+
compileOnly project(':libs:elasticsearch-plugin-api')
14+
compileOnly project(':libs:elasticsearch-x-content')
15+
compileOnly project(':modules:lang-painless')
16+
compileOnly project(':modules:lang-painless:spi')
17+
compileOnly project(':modules:lang-mustache')
18+
compileOnly project(':modules:ingest-common')
19+
// compileOnly project(':modules:ingest-geoip')
20+
}
21+
22+
tasks.named('forbiddenApisMain').configure {
23+
replaceSignatureFiles 'jdk-signatures'
24+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0 and the Server Side Public License, v 1; you may not use this file except
5+
* in compliance with, at your election, the Elastic License 2.0 or the Server
6+
* Side Public License, v 1.
7+
*/
8+
9+
/** Elasticsearch Logstash Bridge. */
10+
module org.elasticsearch.logstashbridge {
11+
requires org.elasticsearch.base;
12+
requires org.elasticsearch.grok;
13+
requires org.elasticsearch.server;
14+
requires org.elasticsearch.painless;
15+
requires org.elasticsearch.painless.spi;
16+
requires org.elasticsearch.mustache;
17+
requires org.elasticsearch.xcontent;
18+
19+
exports org.elasticsearch.logstashbridge;
20+
exports org.elasticsearch.logstashbridge.common;
21+
exports org.elasticsearch.logstashbridge.core;
22+
exports org.elasticsearch.logstashbridge.env;
23+
exports org.elasticsearch.logstashbridge.ingest;
24+
exports org.elasticsearch.logstashbridge.plugins;
25+
exports org.elasticsearch.logstashbridge.script;
26+
exports org.elasticsearch.logstashbridge.threadpool;
27+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0 and the Server Side Public License, v 1; you may not use this file except
5+
* in compliance with, at your election, the Elastic License 2.0 or the Server
6+
* Side Public License, v 1.
7+
*/
8+
package org.elasticsearch.logstashbridge;
9+
10+
import java.util.Map;
11+
import java.util.Objects;
12+
import java.util.function.Function;
13+
import java.util.stream.Collectors;
14+
15+
/**
16+
* A {@code StableBridgeAPI} is the stable bridge to an Elasticsearch API, and can produce instances
17+
* from the actual API that they mirror. As part of the LogstashBridge project, these classes are relied
18+
* upon by the "Elastic Integration Filter Plugin" for Logstash and their external shapes mut not change
19+
* without coordination with the maintainers of that project.
20+
*
21+
* @param <T> the actual type of the Elasticsearch API being mirrored
22+
*/
23+
public interface StableBridgeAPI<T> {
24+
T unwrap();
25+
26+
static <T> T unwrapNullable(final StableBridgeAPI<T> nullableStableBridgeAPI) {
27+
if (Objects.isNull(nullableStableBridgeAPI)) {
28+
return null;
29+
}
30+
return nullableStableBridgeAPI.unwrap();
31+
}
32+
33+
static <K, T> Map<K, T> unwrap(final Map<K, ? extends StableBridgeAPI<T>> bridgeMap) {
34+
return bridgeMap.entrySet().stream().collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, e -> e.getValue().unwrap()));
35+
}
36+
37+
static <K, T, B extends StableBridgeAPI<T>> Map<K, B> wrap(final Map<K, T> rawMap, final Function<T, B> wrapFunction) {
38+
return rawMap.entrySet().stream().collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, e -> wrapFunction.apply(e.getValue())));
39+
}
40+
41+
static <T, B extends StableBridgeAPI<T>> B wrap(final T delegate, final Function<T, B> wrapFunction) {
42+
if (Objects.isNull(delegate)) {
43+
return null;
44+
}
45+
return wrapFunction.apply(delegate);
46+
}
47+
48+
abstract class Proxy<T> implements StableBridgeAPI<T> {
49+
protected final T delegate;
50+
51+
protected Proxy(final T delegate) {
52+
this.delegate = delegate;
53+
}
54+
55+
@Override
56+
public T unwrap() {
57+
return delegate;
58+
}
59+
}
60+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0 and the Server Side Public License, v 1; you may not use this file except
5+
* in compliance with, at your election, the Elastic License 2.0 or the Server
6+
* Side Public License, v 1.
7+
*/
8+
package org.elasticsearch.logstashbridge.common;
9+
10+
import org.elasticsearch.common.settings.Settings;
11+
import org.elasticsearch.logstashbridge.StableBridgeAPI;
12+
13+
public class SettingsBridge extends StableBridgeAPI.Proxy<Settings> {
14+
15+
public static SettingsBridge wrap(final Settings delegate) {
16+
return new SettingsBridge(delegate);
17+
}
18+
19+
public static Builder builder() {
20+
return Builder.wrap(Settings.builder());
21+
}
22+
23+
public SettingsBridge(final Settings delegate) {
24+
super(delegate);
25+
}
26+
27+
@Override
28+
public Settings unwrap() {
29+
return this.delegate;
30+
}
31+
32+
public static class Builder extends StableBridgeAPI.Proxy<Settings.Builder> {
33+
static Builder wrap(final Settings.Builder delegate) {
34+
return new Builder(delegate);
35+
}
36+
37+
private Builder(final Settings.Builder delegate) {
38+
super(delegate);
39+
}
40+
41+
public Builder put(final String key, final String value) {
42+
this.delegate.put(key, value);
43+
return this;
44+
}
45+
46+
public SettingsBridge build() {
47+
return new SettingsBridge(this.delegate.build());
48+
}
49+
}
50+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0 and the Server Side Public License, v 1; you may not use this file except
5+
* in compliance with, at your election, the Elastic License 2.0 or the Server
6+
* Side Public License, v 1.
7+
*/
8+
package org.elasticsearch.logstashbridge.core;
9+
10+
import org.elasticsearch.core.IOUtils;
11+
12+
import java.io.Closeable;
13+
14+
public class IOUtilsBridge {
15+
public static void closeWhileHandlingException(final Iterable<? extends Closeable> objects) {
16+
IOUtils.closeWhileHandlingException(objects);
17+
}
18+
19+
public static void closeWhileHandlingException(final Closeable closeable) {
20+
IOUtils.closeWhileHandlingException(closeable);
21+
}
22+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0 and the Server Side Public License, v 1; you may not use this file except
5+
* in compliance with, at your election, the Elastic License 2.0 or the Server
6+
* Side Public License, v 1.
7+
*/
8+
package org.elasticsearch.logstashbridge.env;
9+
10+
import org.elasticsearch.env.Environment;
11+
import org.elasticsearch.logstashbridge.StableBridgeAPI;
12+
import org.elasticsearch.logstashbridge.common.SettingsBridge;
13+
14+
import java.nio.file.Path;
15+
16+
public class EnvironmentBridge extends StableBridgeAPI.Proxy<Environment> {
17+
public static EnvironmentBridge wrap(final Environment delegate) {
18+
return new EnvironmentBridge(delegate);
19+
}
20+
21+
public EnvironmentBridge(final SettingsBridge settingsBridge, final Path configPath) {
22+
this(new Environment(settingsBridge.unwrap(), configPath));
23+
}
24+
25+
private EnvironmentBridge(final Environment delegate) {
26+
super(delegate);
27+
}
28+
29+
@Override
30+
public Environment unwrap() {
31+
return this.delegate;
32+
}
33+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0 and the Server Side Public License, v 1; you may not use this file except
5+
* in compliance with, at your election, the Elastic License 2.0 or the Server
6+
* Side Public License, v 1.
7+
*/
8+
package org.elasticsearch.logstashbridge.ingest;
9+
10+
import org.elasticsearch.ingest.ConfigurationUtils;
11+
import org.elasticsearch.logstashbridge.script.ScriptServiceBridge;
12+
import org.elasticsearch.logstashbridge.script.TemplateScriptBridge;
13+
14+
import java.util.Map;
15+
16+
public class ConfigurationUtilsBridge {
17+
public static TemplateScriptBridge.Factory compileTemplate(
18+
final String processorType,
19+
final String processorTag,
20+
final String propertyName,
21+
final String propertyValue,
22+
final ScriptServiceBridge scriptServiceBridge
23+
) {
24+
return new TemplateScriptBridge.Factory(
25+
ConfigurationUtils.compileTemplate(processorType, processorTag, propertyName, propertyValue, scriptServiceBridge.unwrap())
26+
);
27+
}
28+
29+
public static String readStringProperty(
30+
final String processorType,
31+
final String processorTag,
32+
final Map<String, Object> configuration,
33+
final String propertyName
34+
) {
35+
return ConfigurationUtils.readStringProperty(processorType, processorTag, configuration, propertyName);
36+
}
37+
38+
public static Boolean readBooleanProperty(
39+
final String processorType,
40+
final String processorTag,
41+
final Map<String, Object> configuration,
42+
final String propertyName,
43+
final boolean defaultValue
44+
) {
45+
return ConfigurationUtils.readBooleanProperty(processorType, processorTag, configuration, propertyName, defaultValue);
46+
}
47+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0 and the Server Side Public License, v 1; you may not use this file except
5+
* in compliance with, at your election, the Elastic License 2.0 or the Server
6+
* Side Public License, v 1.
7+
*/
8+
package org.elasticsearch.logstashbridge.ingest;
9+
10+
import org.elasticsearch.ingest.IngestDocument;
11+
import org.elasticsearch.ingest.LogstashInternalBridge;
12+
import org.elasticsearch.logstashbridge.StableBridgeAPI;
13+
import org.elasticsearch.logstashbridge.script.MetadataBridge;
14+
import org.elasticsearch.logstashbridge.script.TemplateScriptBridge;
15+
16+
import java.util.Map;
17+
import java.util.Set;
18+
import java.util.function.BiConsumer;
19+
20+
public class IngestDocumentBridge extends StableBridgeAPI.Proxy<IngestDocument> {
21+
22+
public static String INGEST_KEY = IngestDocument.INGEST_KEY;
23+
24+
public static IngestDocumentBridge wrap(final IngestDocument ingestDocument) {
25+
if (ingestDocument == null) {
26+
return null;
27+
}
28+
return new IngestDocumentBridge(ingestDocument);
29+
}
30+
31+
public IngestDocumentBridge(final Map<String, Object> sourceAndMetadata, final Map<String, Object> ingestMetadata) {
32+
this(new IngestDocument(sourceAndMetadata, ingestMetadata));
33+
}
34+
35+
private IngestDocumentBridge(IngestDocument inner) {
36+
super(inner);
37+
}
38+
39+
public MetadataBridge getMetadata() {
40+
return new MetadataBridge(delegate.getMetadata());
41+
}
42+
43+
public Map<String, Object> getSource() {
44+
return delegate.getSource();
45+
}
46+
47+
public boolean updateIndexHistory(final String index) {
48+
return delegate.updateIndexHistory(index);
49+
}
50+
51+
public Set<String> getIndexHistory() {
52+
return Set.copyOf(delegate.getIndexHistory());
53+
}
54+
55+
public boolean isReroute() {
56+
return LogstashInternalBridge.isReroute(delegate);
57+
}
58+
59+
public void resetReroute() {
60+
LogstashInternalBridge.resetReroute(delegate);
61+
}
62+
63+
public Map<String, Object> getIngestMetadata() {
64+
return Map.copyOf(delegate.getIngestMetadata());
65+
}
66+
67+
public <T> T getFieldValue(final String fieldName, final Class<T> type) {
68+
return delegate.getFieldValue(fieldName, type);
69+
}
70+
71+
public <T> T getFieldValue(final String fieldName, final Class<T> type, final boolean ignoreMissing) {
72+
return delegate.getFieldValue(fieldName, type, ignoreMissing);
73+
}
74+
75+
public String renderTemplate(final TemplateScriptBridge.Factory templateScriptFactory) {
76+
return delegate.renderTemplate(templateScriptFactory.unwrap());
77+
}
78+
79+
public void setFieldValue(final String path, final Object value) {
80+
delegate.setFieldValue(path, value);
81+
}
82+
83+
public void removeField(final String path) {
84+
delegate.removeField(path);
85+
}
86+
87+
// public void executePipeline(Pipeline pipeline, BiConsumer<IngestDocument, Exception> handler) {
88+
public void executePipeline(final PipelineBridge pipelineBridge, final BiConsumer<IngestDocumentBridge, Exception> handler) {
89+
this.delegate.executePipeline(pipelineBridge.unwrap(), (unwrapped, e) -> handler.accept(IngestDocumentBridge.wrap(unwrapped), e));
90+
}
91+
}

0 commit comments

Comments
 (0)