diff --git a/api/incubator/src/main/java/io/opentelemetry/api/incubator/config/ConfigChangeListener.java b/api/incubator/src/main/java/io/opentelemetry/api/incubator/config/ConfigChangeListener.java new file mode 100644 index 00000000000..44833713f8a --- /dev/null +++ b/api/incubator/src/main/java/io/opentelemetry/api/incubator/config/ConfigChangeListener.java @@ -0,0 +1,26 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.api.incubator.config; + +/** Listener notified when declarative configuration changes. */ +@FunctionalInterface +public interface ConfigChangeListener { + + /** + * Called when the watched path changes. + * + *

{@code path} is the changed declarative configuration path, for example {@code + * .instrumentation/development.general.http} or {@code + * .instrumentation/development.java.methods}. + * + *

{@code newConfig} is never null. If the watched node is unset or cleared, {@code newConfig} + * is {@link DeclarativeConfigProperties#empty()}. + * + * @param path the declarative configuration path that changed + * @param newConfig the updated configuration for the changed path + */ + void onChange(String path, DeclarativeConfigProperties newConfig); +} diff --git a/api/incubator/src/main/java/io/opentelemetry/api/incubator/config/ConfigChangeRegistration.java b/api/incubator/src/main/java/io/opentelemetry/api/incubator/config/ConfigChangeRegistration.java new file mode 100644 index 00000000000..6e499ba11ad --- /dev/null +++ b/api/incubator/src/main/java/io/opentelemetry/api/incubator/config/ConfigChangeRegistration.java @@ -0,0 +1,18 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.api.incubator.config; + +/** Registration handle returned by {@link ConfigProvider#addConfigChangeListener}. */ +@FunctionalInterface +public interface ConfigChangeRegistration { + + /** + * Unregister the listener associated with this registration. + * + *

Subsequent calls have no effect. + */ + void close(); +} diff --git a/api/incubator/src/main/java/io/opentelemetry/api/incubator/config/ConfigProvider.java b/api/incubator/src/main/java/io/opentelemetry/api/incubator/config/ConfigProvider.java index dcf6c7bd082..8617ce6f7ca 100644 --- a/api/incubator/src/main/java/io/opentelemetry/api/incubator/config/ConfigProvider.java +++ b/api/incubator/src/main/java/io/opentelemetry/api/incubator/config/ConfigProvider.java @@ -65,6 +65,28 @@ default DeclarativeConfigProperties getGeneralInstrumentationConfig() { return getInstrumentationConfig().get("general"); } + /** + * Registers a {@link ConfigChangeListener} for changes to a specific declarative configuration + * path. + * + *

Example paths include {@code .instrumentation/development.general.http} and {@code + * .instrumentation/development.java.methods}. + * + *

When a watched path changes, {@link ConfigChangeListener#onChange(String, + * DeclarativeConfigProperties)} is invoked with the changed path and updated configuration for + * that path. + * + *

The default implementation performs no registration and returns a no-op handle. + * + * @param path the declarative configuration path to watch + * @param listener the listener to notify when the watched path changes + * @return a {@link ConfigChangeRegistration} that can be closed to unregister the listener + */ + default ConfigChangeRegistration addConfigChangeListener( + String path, ConfigChangeListener listener) { + return () -> {}; + } + /** Returns a no-op {@link ConfigProvider}. */ static ConfigProvider noop() { return DeclarativeConfigProperties::empty; diff --git a/api/incubator/src/test/java/io/opentelemetry/api/incubator/ConfigProviderTest.java b/api/incubator/src/test/java/io/opentelemetry/api/incubator/ConfigProviderTest.java index b5fdca1cdf6..43aa3fd0ba0 100644 --- a/api/incubator/src/test/java/io/opentelemetry/api/incubator/ConfigProviderTest.java +++ b/api/incubator/src/test/java/io/opentelemetry/api/incubator/ConfigProviderTest.java @@ -6,7 +6,9 @@ package io.opentelemetry.api.incubator; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatCode; +import io.opentelemetry.api.incubator.config.ConfigChangeRegistration; import io.opentelemetry.api.incubator.config.ConfigProvider; import org.junit.jupiter.api.Test; @@ -24,5 +26,9 @@ void instrumentationConfigFallback() { assertThat(configProvider.getInstrumentationConfig()).isNotNull(); assertThat(configProvider.getInstrumentationConfig("servlet")).isNotNull(); assertThat(configProvider.getGeneralInstrumentationConfig()).isNotNull(); + ConfigChangeRegistration listenerRegistration = + configProvider.addConfigChangeListener( + ".instrumentation/development.java.servlet", (path, newConfig) -> {}); + assertThatCode(listenerRegistration::close).doesNotThrowAnyException(); } } diff --git a/api/incubator/src/test/java/io/opentelemetry/api/incubator/ExtendedOpenTelemetryTest.java b/api/incubator/src/test/java/io/opentelemetry/api/incubator/ExtendedOpenTelemetryTest.java index 5907cc41166..14a4c3c81db 100644 --- a/api/incubator/src/test/java/io/opentelemetry/api/incubator/ExtendedOpenTelemetryTest.java +++ b/api/incubator/src/test/java/io/opentelemetry/api/incubator/ExtendedOpenTelemetryTest.java @@ -31,6 +31,7 @@ import java.io.ByteArrayInputStream; import java.nio.charset.StandardCharsets; import java.util.Arrays; +import java.util.concurrent.atomic.AtomicInteger; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -111,6 +112,31 @@ void instrumentationConfig() { .isEqualTo(Arrays.asList("client-request-header1", "client-request-header2")); } + @Test + void close_shutsDownConfigProvider() { + String configYaml = + "instrumentation/development:\n" + + " general:\n" + + " http:\n" + + " enabled: \"false\""; + SdkConfigProvider configProvider = + SdkConfigProvider.create( + DeclarativeConfiguration.toConfigProperties( + new ByteArrayInputStream(configYaml.getBytes(StandardCharsets.UTF_8)))); + ExtendedOpenTelemetrySdk sdk = + ExtendedOpenTelemetrySdk.create(OpenTelemetrySdk.builder().build(), configProvider); + + AtomicInteger callbackCount = new AtomicInteger(); + configProvider.addConfigChangeListener( + ".instrumentation/development.general.http", + (path, newConfig) -> callbackCount.incrementAndGet()); + + sdk.close(); + + configProvider.setConfig(".instrumentation/development.general.http.enabled", "true"); + assertThat(callbackCount.get()).isEqualTo(0); + } + @Test void instrumentationConfigFallback() { ConfigProvider configProvider = ConfigProvider.noop(); diff --git a/api/incubator/src/test/java/io/opentelemetry/api/incubator/config/InstrumentationConfigUtilTest.java b/api/incubator/src/test/java/io/opentelemetry/api/incubator/config/InstrumentationConfigUtilTest.java index a8860fc8d75..d4e62f45109 100644 --- a/api/incubator/src/test/java/io/opentelemetry/api/incubator/config/InstrumentationConfigUtilTest.java +++ b/api/incubator/src/test/java/io/opentelemetry/api/incubator/config/InstrumentationConfigUtilTest.java @@ -9,8 +9,8 @@ import com.google.common.collect.ImmutableMap; import io.opentelemetry.sdk.autoconfigure.declarativeconfig.DeclarativeConfiguration; -import io.opentelemetry.sdk.autoconfigure.declarativeconfig.YamlDeclarativeConfigProperties; import io.opentelemetry.sdk.internal.SdkConfigProvider; +import io.opentelemetry.sdk.internal.YamlDeclarativeConfigProperties; import java.io.ByteArrayInputStream; import java.nio.charset.StandardCharsets; import java.util.Arrays; diff --git a/api/incubator/src/test/java/io/opentelemetry/sdk/internal/SdkConfigProviderTest.java b/api/incubator/src/test/java/io/opentelemetry/sdk/internal/SdkConfigProviderTest.java new file mode 100644 index 00000000000..6410e0cf077 --- /dev/null +++ b/api/incubator/src/test/java/io/opentelemetry/sdk/internal/SdkConfigProviderTest.java @@ -0,0 +1,512 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.sdk.internal; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import io.opentelemetry.api.incubator.config.ConfigChangeRegistration; +import io.opentelemetry.api.incubator.config.DeclarativeConfigException; +import io.opentelemetry.api.incubator.config.DeclarativeConfigProperties; +import io.opentelemetry.common.ComponentLoader; +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import javax.annotation.Nullable; +import org.junit.jupiter.api.Test; + +class SdkConfigProviderTest { + + @Test + void addConfigChangeListener_notifiesOnWatchedPathChange() { + SdkConfigProvider provider = + SdkConfigProvider.create( + config( + mapOf( + "instrumentation/development", + mapOf("general", mapOf("http", mapOf("enabled", "false")))))); + List notifications = new ArrayList<>(); + ConfigChangeRegistration registration = + provider.addConfigChangeListener( + ".instrumentation/development.general.http", + (path, newConfig) -> notifications.add(path + "=" + newConfig.getString("enabled"))); + + provider.setConfig( + ".instrumentation/development", + config(mapOf("general", mapOf("http", mapOf("enabled", "true"))))); + + assertThat(notifications).containsExactly(".instrumentation/development.general.http=true"); + registration.close(); + } + + @Test + void addConfigChangeListener_ignoresUnchangedAndNonWatchedUpdates() { + SdkConfigProvider provider = + SdkConfigProvider.create( + config( + mapOf( + "instrumentation/development", + mapOf( + "general", + mapOf("http", mapOf("enabled", "true")), + "java", + mapOf("servlet", mapOf("enabled", "true")))))); + AtomicInteger callbackCount = new AtomicInteger(); + provider.addConfigChangeListener( + ".instrumentation/development.general.http", + (path, newConfig) -> callbackCount.incrementAndGet()); + + provider.setConfig( + ".instrumentation/development", + config( + mapOf( + "general", + mapOf("http", mapOf("enabled", "true")), + "java", + mapOf("servlet", mapOf("enabled", "false"))))); + provider.setConfig( + ".instrumentation/development", + config( + mapOf( + "general", + mapOf("http", mapOf("enabled", "true")), + "java", + mapOf("servlet", mapOf("enabled", "false"))))); + + assertThat(callbackCount).hasValue(0); + } + + @Test + void addConfigChangeListener_returnsEmptyNodeWhenWatchedPathCleared() { + SdkConfigProvider provider = + SdkConfigProvider.create( + config( + mapOf( + "instrumentation/development", + mapOf("general", mapOf("http", mapOf("enabled", "true")))))); + List> propertyKeysSeen = new ArrayList<>(); + provider.addConfigChangeListener( + ".instrumentation/development.general.http", + (path, newConfig) -> propertyKeysSeen.add(newConfig.getPropertyKeys())); + + provider.setConfig(".instrumentation/development", config(mapOf("general", mapOf()))); + + assertThat(propertyKeysSeen).containsExactly(Collections.emptySet()); + } + + @Test + void addConfigChangeListener_closeAndShutdownStopCallbacks() { + SdkConfigProvider provider = + SdkConfigProvider.create( + config( + mapOf( + "instrumentation/development", + mapOf("general", mapOf("http", mapOf("enabled", "false")))))); + AtomicInteger callbackCount = new AtomicInteger(); + ConfigChangeRegistration registration = + provider.addConfigChangeListener( + ".instrumentation/development.general.http", + (path, newConfig) -> callbackCount.incrementAndGet()); + + registration.close(); + registration.close(); + provider.setConfig( + ".instrumentation/development", + config(mapOf("general", mapOf("http", mapOf("enabled", "true"))))); + assertThat(callbackCount).hasValue(0); + + provider.addConfigChangeListener( + ".instrumentation/development.general.http", + (path, newConfig) -> callbackCount.incrementAndGet()); + provider.shutdown(); + provider.setConfig( + ".instrumentation/development", + config(mapOf("general", mapOf("http", mapOf("enabled", "false"))))); + assertThat(callbackCount).hasValue(0); + } + + @Test + void addConfigChangeListener_listenerExceptionIsIsolated() { + SdkConfigProvider provider = + SdkConfigProvider.create( + config( + mapOf( + "instrumentation/development", + mapOf("general", mapOf("http", mapOf("enabled", "false")))))); + AtomicInteger successfulCallbacks = new AtomicInteger(); + provider.addConfigChangeListener( + ".instrumentation/development.general.http", + (path, newConfig) -> { + throw new IllegalStateException("boom"); + }); + provider.addConfigChangeListener( + ".instrumentation/development.general.http", + (path, newConfig) -> successfulCallbacks.incrementAndGet()); + + provider.setConfig( + ".instrumentation/development", + config(mapOf("general", mapOf("http", mapOf("enabled", "true"))))); + + assertThat(successfulCallbacks).hasValue(1); + } + + @Test + void setConfig_replacesSubtreeAndNotifiesListener() { + SdkConfigProvider provider = + SdkConfigProvider.create( + config( + mapOf( + "instrumentation/development", + mapOf("general", mapOf("http", mapOf("enabled", "false")))))); + List notifications = new ArrayList<>(); + provider.addConfigChangeListener( + ".instrumentation/development.general.http", + (path, newConfig) -> notifications.add(path + "=" + newConfig.getString("enabled"))); + + provider.setConfig( + ".instrumentation/development.general.http", config(mapOf("enabled", "true"))); + + assertThat(notifications).containsExactly(".instrumentation/development.general.http=true"); + } + + @Test + void setConfig_doesNotNotifyWhenSubtreeUnchanged() { + SdkConfigProvider provider = + SdkConfigProvider.create( + config( + mapOf( + "instrumentation/development", + mapOf("general", mapOf("http", mapOf("enabled", "true")))))); + AtomicInteger callbackCount = new AtomicInteger(); + provider.addConfigChangeListener( + ".instrumentation/development.general.http", + (path, newConfig) -> callbackCount.incrementAndGet()); + + provider.setConfig( + ".instrumentation/development.general.http", config(mapOf("enabled", "true"))); + + assertThat(callbackCount).hasValue(0); + } + + @Test + void setConfig_createsIntermediateNodesIfMissing() { + SdkConfigProvider provider = SdkConfigProvider.create(config(mapOf())); + List notifications = new ArrayList<>(); + provider.addConfigChangeListener( + ".instrumentation/development.general.http", + (path, newConfig) -> notifications.add(path + "=" + newConfig.getString("enabled"))); + + provider.setConfig( + ".instrumentation/development.general.http", config(mapOf("enabled", "true"))); + + assertThat(notifications).containsExactly(".instrumentation/development.general.http=true"); + } + + @Test + void setConfig_noopWhenDisposed() { + SdkConfigProvider provider = + SdkConfigProvider.create( + config( + mapOf( + "instrumentation/development", + mapOf("general", mapOf("http", mapOf("enabled", "false")))))); + AtomicInteger callbackCount = new AtomicInteger(); + provider.addConfigChangeListener( + ".instrumentation/development.general.http", + (path, newConfig) -> callbackCount.incrementAndGet()); + provider.shutdown(); + + provider.setConfig( + ".instrumentation/development.general.http", config(mapOf("enabled", "true"))); + provider.setConfig(".instrumentation/development.general.http.enabled", "true"); + + assertThat(callbackCount).hasValue(0); + } + + @Test + void setConfig_setsValueAndNotifiesListener() { + SdkConfigProvider provider = + SdkConfigProvider.create( + config( + mapOf( + "instrumentation/development", + mapOf("general", mapOf("http", mapOf("enabled", "false")))))); + List notifications = new ArrayList<>(); + provider.addConfigChangeListener( + ".instrumentation/development.general.http", + (path, newConfig) -> notifications.add(path + "=" + newConfig.getString("enabled"))); + + provider.setConfig(".instrumentation/development.general.http.enabled", "true"); + + assertThat(notifications).containsExactly(".instrumentation/development.general.http=true"); + } + + @Test + void setConfig_doesNotNotifyWhenValueUnchanged() { + SdkConfigProvider provider = + SdkConfigProvider.create( + config( + mapOf( + "instrumentation/development", + mapOf("general", mapOf("http", mapOf("enabled", "true")))))); + AtomicInteger callbackCount = new AtomicInteger(); + provider.addConfigChangeListener( + ".instrumentation/development.general.http", + (path, newConfig) -> callbackCount.incrementAndGet()); + + provider.setConfig(".instrumentation/development.general.http.enabled", "true"); + + assertThat(callbackCount).hasValue(0); + } + + @Test + void concurrentUpdates_allChangesAreApplied() throws Exception { + SdkConfigProvider provider = + SdkConfigProvider.create( + config( + mapOf( + "instrumentation/development", + mapOf("general", mapOf("http", mapOf("count", "0")))))); + List notifications = new CopyOnWriteArrayList<>(); + provider.addConfigChangeListener( + ".instrumentation/development.general.http", + (path, newConfig) -> notifications.add(newConfig.getString("count"))); + + int threadCount = 10; + ExecutorService executor = Executors.newFixedThreadPool(threadCount); + CountDownLatch startLatch = new CountDownLatch(1); + CountDownLatch doneLatch = new CountDownLatch(threadCount); + List> futures = new ArrayList<>(); + for (int i = 0; i < threadCount; i++) { + int index = i + 1; + futures.add( + executor.submit( + () -> { + try { + startLatch.await(); + provider.setConfig( + ".instrumentation/development.general.http.count", String.valueOf(index)); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } finally { + doneLatch.countDown(); + } + })); + } + startLatch.countDown(); + assertThat(doneLatch.await(5, TimeUnit.SECONDS)).isTrue(); + for (Future future : futures) { + future.get(1, TimeUnit.SECONDS); + } + executor.shutdown(); + + assertThat(notifications).hasSize(threadCount); + DeclarativeConfigProperties finalConfig = + provider.getInstrumentationConfig().get("general").get("http"); + assertThat(finalConfig.getString("count")).isNotNull(); + } + + @Test + void pathValidation_rejectsMissingLeadingDot() { + SdkConfigProvider provider = SdkConfigProvider.create(config(mapOf())); + + assertThatThrownBy( + () -> provider.addConfigChangeListener("instrumentation", (path, newConfig) -> {})) + .isInstanceOf(IllegalArgumentException.class); + assertThatThrownBy(() -> provider.setConfig("instrumentation.subtree", config(mapOf()))) + .isInstanceOf(IllegalArgumentException.class); + assertThatThrownBy(() -> provider.setConfig("instrumentation.key", "value")) + .isInstanceOf(IllegalArgumentException.class); + } + + @Test + void pathValidation_rejectsWildcards() { + SdkConfigProvider provider = SdkConfigProvider.create(config(mapOf())); + + assertThatThrownBy(() -> provider.addConfigChangeListener(".*", (path, newConfig) -> {})) + .isInstanceOf(IllegalArgumentException.class); + assertThatThrownBy(() -> provider.setConfig(".foo.*.subtree", config(mapOf()))) + .isInstanceOf(IllegalArgumentException.class); + assertThatThrownBy(() -> provider.setConfig(".foo.*.key", "value")) + .isInstanceOf(IllegalArgumentException.class); + } + + @Test + void pathValidation_rejectsBrackets() { + SdkConfigProvider provider = SdkConfigProvider.create(config(mapOf())); + + assertThatThrownBy(() -> provider.addConfigChangeListener(".foo[0]", (path, newConfig) -> {})) + .isInstanceOf(IllegalArgumentException.class); + assertThatThrownBy(() -> provider.setConfig(".foo[0].subtree", config(mapOf()))) + .isInstanceOf(IllegalArgumentException.class); + assertThatThrownBy(() -> provider.setConfig(".foo[0].key", "value")) + .isInstanceOf(IllegalArgumentException.class); + } + + @Test + void pathValidation_rejectsRootOnlyPath() { + SdkConfigProvider provider = SdkConfigProvider.create(config(mapOf())); + + assertThatThrownBy(() -> provider.setConfig(".", config(mapOf()))) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("key segment"); + } + + @Test + void setConfig_throwsOnSchemaConflict() { + SdkConfigProvider provider = + SdkConfigProvider.create( + config(mapOf("instrumentation/development", mapOf("general", "scalarValue")))); + + assertThatThrownBy( + () -> + provider.setConfig( + ".instrumentation/development.general.http", config(mapOf("enabled", "true")))) + .isInstanceOf(DeclarativeConfigException.class) + .hasMessageContaining("general") + .hasMessageContaining("not a mapping"); + + assertThatThrownBy( + () -> provider.setConfig(".instrumentation/development.general.http.enabled", "true")) + .isInstanceOf(DeclarativeConfigException.class) + .hasMessageContaining("general") + .hasMessageContaining("not a mapping"); + } + + private static DeclarativeConfigProperties config(Map root) { + return new MapBackedDeclarativeConfigProperties(root); + } + + private static Map mapOf(Object... entries) { + Map result = new LinkedHashMap<>(); + for (int i = 0; i < entries.length; i += 2) { + result.put((String) entries[i], entries[i + 1]); + } + return result; + } + + private static final class MapBackedDeclarativeConfigProperties + implements DeclarativeConfigProperties { + private static final ComponentLoader COMPONENT_LOADER = + ComponentLoader.forClassLoader(MapBackedDeclarativeConfigProperties.class.getClassLoader()); + + private final Map values; + + private MapBackedDeclarativeConfigProperties(Map values) { + this.values = values; + } + + @Override + public String getString(String name) { + Object value = values.get(name); + return value instanceof String ? (String) value : null; + } + + @Override + public Boolean getBoolean(String name) { + Object value = values.get(name); + return value instanceof Boolean ? (Boolean) value : null; + } + + @Override + public Integer getInt(String name) { + Object value = values.get(name); + return value instanceof Integer ? (Integer) value : null; + } + + @Override + public Long getLong(String name) { + Object value = values.get(name); + if (value instanceof Long) { + return (Long) value; + } + if (value instanceof Integer) { + return ((Integer) value).longValue(); + } + return null; + } + + @Override + public Double getDouble(String name) { + Object value = values.get(name); + if (value instanceof Double) { + return (Double) value; + } + if (value instanceof Number) { + return ((Number) value).doubleValue(); + } + return null; + } + + @SuppressWarnings("unchecked") + @Nullable + @Override + public List getScalarList(String name, Class scalarType) { + Object value = values.get(name); + if (!(value instanceof List)) { + return null; + } + List raw = (List) value; + List casted = new ArrayList<>(raw.size()); + for (Object element : raw) { + if (!scalarType.isInstance(element)) { + return null; + } + casted.add((T) element); + } + return casted; + } + + @SuppressWarnings("unchecked") + @Override + public DeclarativeConfigProperties getStructured(String name) { + Object value = values.get(name); + if (!(value instanceof Map)) { + return null; + } + return new MapBackedDeclarativeConfigProperties((Map) value); + } + + @SuppressWarnings("unchecked") + @Nullable + @Override + public List getStructuredList(String name) { + Object value = values.get(name); + if (!(value instanceof List)) { + return null; + } + List raw = (List) value; + List result = new ArrayList<>(raw.size()); + for (Object element : raw) { + if (!(element instanceof Map)) { + return null; + } + result.add(new MapBackedDeclarativeConfigProperties((Map) element)); + } + return result; + } + + @Override + public Set getPropertyKeys() { + return values.keySet(); + } + + @Override + public ComponentLoader getComponentLoader() { + return COMPONENT_LOADER; + } + } +} diff --git a/sdk-extensions/declarative-config/src/main/java/io/opentelemetry/sdk/autoconfigure/declarativeconfig/DeclarativeConfiguration.java b/sdk-extensions/declarative-config/src/main/java/io/opentelemetry/sdk/autoconfigure/declarativeconfig/DeclarativeConfiguration.java index 9628f238176..6bff35a4614 100644 --- a/sdk-extensions/declarative-config/src/main/java/io/opentelemetry/sdk/autoconfigure/declarativeconfig/DeclarativeConfiguration.java +++ b/sdk-extensions/declarative-config/src/main/java/io/opentelemetry/sdk/autoconfigure/declarativeconfig/DeclarativeConfiguration.java @@ -18,6 +18,7 @@ import io.opentelemetry.sdk.autoconfigure.spi.internal.ComponentProvider; import io.opentelemetry.sdk.declarativeconfig.internal.model.OpenTelemetryConfigurationModel; import io.opentelemetry.sdk.declarativeconfig.internal.model.SamplerModel; +import io.opentelemetry.sdk.internal.YamlDeclarativeConfigProperties; import io.opentelemetry.sdk.trace.samplers.Sampler; import java.io.Closeable; import java.io.IOException; diff --git a/sdk-extensions/declarative-config/src/test/java/io/opentelemetry/sdk/autoconfigure/declarativeconfig/DeclarativeConfigContextTest.java b/sdk-extensions/declarative-config/src/test/java/io/opentelemetry/sdk/autoconfigure/declarativeconfig/DeclarativeConfigContextTest.java index 36053361d13..b7af6bc609a 100644 --- a/sdk-extensions/declarative-config/src/test/java/io/opentelemetry/sdk/autoconfigure/declarativeconfig/DeclarativeConfigContextTest.java +++ b/sdk-extensions/declarative-config/src/test/java/io/opentelemetry/sdk/autoconfigure/declarativeconfig/DeclarativeConfigContextTest.java @@ -13,6 +13,7 @@ import io.opentelemetry.api.incubator.config.DeclarativeConfigException; import io.opentelemetry.common.ComponentLoader; import io.opentelemetry.sdk.autoconfigure.spi.internal.ComponentProvider; +import io.opentelemetry.sdk.internal.YamlDeclarativeConfigProperties; import io.opentelemetry.sdk.resources.Resource; import java.util.Collections; import org.junit.jupiter.api.Test; diff --git a/sdk-extensions/declarative-config/src/test/java/io/opentelemetry/sdk/autoconfigure/declarativeconfig/YamlDeclarativeConfigPropertiesTest.java b/sdk-extensions/declarative-config/src/test/java/io/opentelemetry/sdk/autoconfigure/declarativeconfig/YamlDeclarativeConfigPropertiesTest.java index acc5971cb67..ea61be4e333 100644 --- a/sdk-extensions/declarative-config/src/test/java/io/opentelemetry/sdk/autoconfigure/declarativeconfig/YamlDeclarativeConfigPropertiesTest.java +++ b/sdk-extensions/declarative-config/src/test/java/io/opentelemetry/sdk/autoconfigure/declarativeconfig/YamlDeclarativeConfigPropertiesTest.java @@ -13,6 +13,7 @@ import io.opentelemetry.api.incubator.config.DeclarativeConfigProperties; import io.opentelemetry.internal.testing.slf4j.SuppressLogger; import io.opentelemetry.sdk.declarativeconfig.internal.model.OpenTelemetryConfigurationModel; +import io.opentelemetry.sdk.internal.YamlDeclarativeConfigProperties; import java.io.ByteArrayInputStream; import java.nio.charset.StandardCharsets; import java.util.Arrays; diff --git a/sdk/all/src/main/java/io/opentelemetry/sdk/internal/ExtendedOpenTelemetrySdk.java b/sdk/all/src/main/java/io/opentelemetry/sdk/internal/ExtendedOpenTelemetrySdk.java index dc89b116108..b05b507ef05 100644 --- a/sdk/all/src/main/java/io/opentelemetry/sdk/internal/ExtendedOpenTelemetrySdk.java +++ b/sdk/all/src/main/java/io/opentelemetry/sdk/internal/ExtendedOpenTelemetrySdk.java @@ -6,6 +6,8 @@ package io.opentelemetry.sdk.internal; import io.opentelemetry.api.incubator.ExtendedOpenTelemetry; +import io.opentelemetry.api.incubator.config.ConfigChangeListener; +import io.opentelemetry.api.incubator.config.ConfigChangeRegistration; import io.opentelemetry.api.incubator.config.ConfigProvider; import io.opentelemetry.api.incubator.config.DeclarativeConfigProperties; import io.opentelemetry.sdk.OpenTelemetrySdk; @@ -50,6 +52,12 @@ public SdkConfigProvider getSdkConfigProvider() { return configProvider.unobfuscate(); } + @Override + public void close() { + configProvider.unobfuscate().shutdown(); + super.close(); + } + @Override public String toString() { return "ExtendedOpenTelemetrySdk{" @@ -81,6 +89,12 @@ public DeclarativeConfigProperties getInstrumentationConfig() { return delegate.getInstrumentationConfig(); } + @Override + public ConfigChangeRegistration addConfigChangeListener( + String path, ConfigChangeListener listener) { + return delegate.addConfigChangeListener(path, listener); + } + private SdkConfigProvider unobfuscate() { return delegate; } diff --git a/sdk/all/src/main/java/io/opentelemetry/sdk/internal/SdkConfigProvider.java b/sdk/all/src/main/java/io/opentelemetry/sdk/internal/SdkConfigProvider.java index a5399ed4a23..0e0c797092a 100644 --- a/sdk/all/src/main/java/io/opentelemetry/sdk/internal/SdkConfigProvider.java +++ b/sdk/all/src/main/java/io/opentelemetry/sdk/internal/SdkConfigProvider.java @@ -5,8 +5,23 @@ package io.opentelemetry.sdk.internal; +import static java.util.Objects.requireNonNull; + +import io.opentelemetry.api.incubator.config.ConfigChangeListener; +import io.opentelemetry.api.incubator.config.ConfigChangeRegistration; import io.opentelemetry.api.incubator.config.ConfigProvider; +import io.opentelemetry.api.incubator.config.DeclarativeConfigException; import io.opentelemetry.api.incubator.config.DeclarativeConfigProperties; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.logging.Level; +import java.util.logging.Logger; /** * SDK implementation of {@link ConfigProvider}. @@ -16,11 +31,17 @@ * guarantees are made. */ public final class SdkConfigProvider implements ConfigProvider { + private static final Logger logger = Logger.getLogger(SdkConfigProvider.class.getName()); + private static final ConfigChangeRegistration NOOP_CHANGE_REGISTRATION = () -> {}; - private final DeclarativeConfigProperties instrumentationConfig; + private final Object lock = new Object(); + private volatile DeclarativeConfigProperties openTelemetryConfigModel; + private final ConcurrentMap> listenersByPath = + new ConcurrentHashMap<>(); + private final AtomicBoolean isShutdown = new AtomicBoolean(false); private SdkConfigProvider(DeclarativeConfigProperties openTelemetryConfigModel) { - this.instrumentationConfig = openTelemetryConfigModel.get("instrumentation/development"); + this.openTelemetryConfigModel = requireNonNull(openTelemetryConfigModel); } /** @@ -36,11 +57,350 @@ public static SdkConfigProvider create(DeclarativeConfigProperties openTelemetry @Override public DeclarativeConfigProperties getInstrumentationConfig() { - return instrumentationConfig; + return openTelemetryConfigModel.get("instrumentation/development"); + } + + @Override + public ConfigChangeRegistration addConfigChangeListener( + String path, ConfigChangeListener listener) { + requireNonNull(listener, "listener"); + String watchedPath = normalizeAndValidatePath(path); + if (isShutdown.get()) { + return NOOP_CHANGE_REGISTRATION; + } + synchronized (lock) { + if (isShutdown.get()) { + return NOOP_CHANGE_REGISTRATION; + } + ListenerRegistration registration = new ListenerRegistration(watchedPath, listener); + listenersByPath + .computeIfAbsent(watchedPath, unused -> new CopyOnWriteArrayList<>()) + .add(registration); + return registration; + } + } + + /** + * Sets the configuration value at the given path. + * + *

The path uses {@code .} as a separator. The final segment is the key to set within the + * parent mapping. For example, {@code + * setConfig(".instrumentation/development.java.myLib.enabled", true)} sets the {@code enabled} + * key within the {@code .instrumentation/development.java.myLib} mapping. + * + *

The value may be a String, Boolean, Long, Double, Integer, {@link + * DeclarativeConfigProperties}, or a List whose elements are any of those types. + * + *

If a value already exists at the path, its type must not change. + * + * @param path the full declarative configuration path, including the key to set + * @param value the new value + * @throws IllegalArgumentException if the path does not include a key segment beyond the root, or + * if the value type is not supported + * @throws DeclarativeConfigException if the path traverses a non-mapping value, or if the + * existing value's type would change + */ + public void setConfig(String path, Object value) { + requireNonNull(value, "value"); + validateValue(value); + Object normalizedValue = normalizeValue(value); + String normalizedPath = normalizeAndValidatePath(path); + int lastDot = normalizedPath.lastIndexOf('.'); + String parentPath = lastDot == 0 ? "." : normalizedPath.substring(0, lastDot); + String key = normalizedPath.substring(lastDot + 1); + if (key.isEmpty()) { + throw new IllegalArgumentException( + "setConfig path must include a key segment beyond the root: " + path); + } + if (isShutdown.get()) { + return; + } + synchronized (lock) { + DeclarativeConfigProperties current = openTelemetryConfigModel; + Map currentRootMap = DeclarativeConfigProperties.toMap(current); + validateTypeUnchanged(currentRootMap, parentPath, key, normalizedValue, normalizedPath); + Map newRootMap = + withValueAtPath(currentRootMap, parentPath, key, normalizedValue, normalizedPath); + openTelemetryConfigModel = + YamlDeclarativeConfigProperties.create(newRootMap, current.getComponentLoader()); + notifyListeners(current, openTelemetryConfigModel); + } + } + + private void notifyListeners( + DeclarativeConfigProperties previous, DeclarativeConfigProperties updated) { + if (isShutdown.get()) { + return; + } + + for (Map.Entry> entry : + listenersByPath.entrySet()) { + String watchedPath = entry.getKey(); + DeclarativeConfigProperties previousConfigAtPath = resolvePath(previous, watchedPath); + DeclarativeConfigProperties updatedConfigAtPath = resolvePath(updated, watchedPath); + if (hasSameContents(previousConfigAtPath, updatedConfigAtPath)) { + continue; + } + + for (ListenerRegistration registration : entry.getValue()) { + registration.notifyChange(watchedPath, updatedConfigAtPath); + } + } + } + + void shutdown() { + synchronized (lock) { + if (!isShutdown.compareAndSet(false, true)) { + return; + } + listenersByPath.clear(); + } + } + + /** + * Returns a new map with {@code key}={@code value} set within the map at {@code parentPath}. + * Intermediate maps along the path are copied, not mutated. + */ + private static Map withValueAtPath( + Map rootMap, String parentPath, String key, Object value, String fullPath) { + String relativePath = parentPath.substring(1); + if (relativePath.isEmpty()) { + Map copy = new HashMap<>(rootMap); + copy.put(key, value); + return copy; + } + // TODO: this can be done in a single traversal rather than resolving the leaf then replacing + String[] segments = relativePath.split("\\."); + Map leafMap = resolveToMap(rootMap, segments, fullPath); + Map updatedLeaf = new HashMap<>(leafMap); + updatedLeaf.put(key, value); + return copyAndReplace(rootMap, segments, 0, fullPath, updatedLeaf); + } + + @SuppressWarnings("unchecked") + private static Map copyAndReplace( + Map current, + String[] segments, + int depth, + String normalizedPath, + Map replacement) { + Map copy = new HashMap<>(current); + String segment = segments[depth]; + if (depth == segments.length - 1) { + copy.put(segment, replacement); + return copy; + } + Object child = current.get(segment); + Map childMap; + if (child instanceof Map) { + childMap = (Map) child; + } else if (child == null) { + childMap = new HashMap<>(); + } else { + throw schemaConflict(normalizedPath, segment, child); + } + copy.put(segment, copyAndReplace(childMap, segments, depth + 1, normalizedPath, replacement)); + return copy; + } + + @SuppressWarnings("unchecked") + private static Map resolveToMap( + Map rootMap, String[] segments, String normalizedPath) { + Map current = rootMap; + for (String segment : segments) { + Object child = current.get(segment); + if (child instanceof Map) { + current = (Map) child; + } else if (child == null) { + return new HashMap<>(); + } else { + throw schemaConflict(normalizedPath, segment, child); + } + } + return current; + } + + private static DeclarativeConfigException schemaConflict( + String normalizedPath, String segment, Object actual) { + return new DeclarativeConfigException( + "Cannot traverse path '" + + normalizedPath + + "': segment '" + + segment + + "' resolves to a " + + typeName(actual) + + ", not a mapping"); + } + + // TODO: optimize later, this is an expensive operation. + // But note that we only do this on a mutation, and these are expected to be infrquent + // so maybe acceptable + private static boolean hasSameContents( + DeclarativeConfigProperties left, DeclarativeConfigProperties right) { + return DeclarativeConfigProperties.toMap(left).equals(DeclarativeConfigProperties.toMap(right)); + } + + private static DeclarativeConfigProperties resolvePath( + DeclarativeConfigProperties root, String watchedPath) { + String relativePath = watchedPath.substring(1); + if (relativePath.isEmpty()) { + return root; + } + + DeclarativeConfigProperties current = root; + String[] segments = relativePath.split("\\."); + for (String segment : segments) { + if (segment.isEmpty()) { + return DeclarativeConfigProperties.empty(); + } + current = current.get(segment); + } + return current; + } + + private static void validateTypeUnchanged( + Map rootMap, + String parentPath, + String key, + Object newValue, + String fullPath) { + String relativePath = parentPath.substring(1); + Map leafMap = + relativePath.isEmpty() + ? rootMap + : resolveToMap(rootMap, relativePath.split("\\."), fullPath); + Object existing = leafMap.get(key); + if (existing == null) { + return; // key doesn't exist yet, any type is allowed + } + boolean typeMismatch; + if (existing instanceof Map) { + typeMismatch = !(newValue instanceof Map); + } else if (existing instanceof List) { + typeMismatch = !(newValue instanceof List); + } else { + typeMismatch = !existing.getClass().equals(newValue.getClass()); + } + if (typeMismatch) { + throw new DeclarativeConfigException( + "Cannot change type at path '" + + fullPath + + "' from " + + typeName(existing) + + " to " + + typeName(newValue)); + } + } + + private static void validateValue(Object value) { + if (value instanceof String + || value instanceof Boolean + || value instanceof Long + || value instanceof Double + || value instanceof Integer + || value instanceof DeclarativeConfigProperties) { + return; + } + if (value instanceof List) { + for (Object element : (List) value) { + if (!(element instanceof String) + && !(element instanceof Boolean) + && !(element instanceof Long) + && !(element instanceof Double) + && !(element instanceof Integer) + && !(element instanceof DeclarativeConfigProperties)) { + throw new IllegalArgumentException( + "setConfig list value elements must be String, Boolean, Long, Double, Integer, or" + + " DeclarativeConfigProperties, got: " + + (element == null ? "null" : element.getClass().getName())); + } + } + return; + } + throw new IllegalArgumentException( + "setConfig value must be a String, Boolean, Long, Double, Integer," + + " DeclarativeConfigProperties, or List thereof, got: " + + value.getClass().getName()); + } + + private static Object normalizeValue(Object value) { + if (value instanceof DeclarativeConfigProperties) { + return DeclarativeConfigProperties.toMap((DeclarativeConfigProperties) value); + } + if (!(value instanceof List)) { + return value; + } + List normalized = new ArrayList<>(); + for (Object element : (List) value) { + normalized.add( + element instanceof DeclarativeConfigProperties + ? DeclarativeConfigProperties.toMap((DeclarativeConfigProperties) element) + : element); + } + return normalized; + } + + private static String typeName(Object value) { + if (value instanceof Map) { + return "mapping"; + } + if (value instanceof List) { + return "list"; + } + return value.getClass().getSimpleName(); + } + + private static String normalizeAndValidatePath(String path) { + String watchedPath = requireNonNull(path, "path").trim(); + if (!watchedPath.startsWith(".")) { + throw new IllegalArgumentException("Path must be absolute and start with '.': " + path); + } + if (watchedPath.indexOf('*') >= 0) { + throw new IllegalArgumentException("Path does not support wildcards: " + path); + } + if (watchedPath.indexOf('[') >= 0 || watchedPath.indexOf(']') >= 0) { + throw new IllegalArgumentException("Path does not support sequence indexing: " + path); + } + return watchedPath; + } + + private final class ListenerRegistration implements ConfigChangeRegistration { + private final String watchedPath; + private final ConfigChangeListener listener; + + private ListenerRegistration(String watchedPath, ConfigChangeListener listener) { + this.watchedPath = watchedPath; + this.listener = listener; + } + + @Override + public void close() { + synchronized (lock) { + CopyOnWriteArrayList registrations = listenersByPath.get(watchedPath); + if (registrations == null) { + return; + } + registrations.remove(this); + if (registrations.isEmpty()) { + listenersByPath.remove(watchedPath, registrations); + } + } + } + + private void notifyChange(String changedPath, DeclarativeConfigProperties updatedConfigAtPath) { + try { + listener.onChange(changedPath, updatedConfigAtPath); + } catch (Throwable throwable) { + logger.log( + Level.WARNING, + "Config change listener threw while handling path " + changedPath, + throwable); + } + } } @Override public String toString() { - return "SdkConfigProvider{" + "instrumentationConfig=" + instrumentationConfig + '}'; + return "SdkConfigProvider{" + "instrumentationConfig=" + getInstrumentationConfig() + '}'; } } diff --git a/sdk-extensions/declarative-config/src/main/java/io/opentelemetry/sdk/autoconfigure/declarativeconfig/YamlDeclarativeConfigProperties.java b/sdk/all/src/main/java/io/opentelemetry/sdk/internal/YamlDeclarativeConfigProperties.java similarity index 96% rename from sdk-extensions/declarative-config/src/main/java/io/opentelemetry/sdk/autoconfigure/declarativeconfig/YamlDeclarativeConfigProperties.java rename to sdk/all/src/main/java/io/opentelemetry/sdk/internal/YamlDeclarativeConfigProperties.java index 44ba67f890e..d1bd4c9fc74 100644 --- a/sdk-extensions/declarative-config/src/main/java/io/opentelemetry/sdk/autoconfigure/declarativeconfig/YamlDeclarativeConfigProperties.java +++ b/sdk/all/src/main/java/io/opentelemetry/sdk/internal/YamlDeclarativeConfigProperties.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package io.opentelemetry.sdk.autoconfigure.declarativeconfig; +package io.opentelemetry.sdk.internal; import static java.util.stream.Collectors.joining; import static java.util.stream.Collectors.toList; @@ -25,16 +25,13 @@ import javax.annotation.Nullable; /** - * Implementation of {@link DeclarativeConfigProperties} which uses a file configuration model as a - * source. + * Implementation of {@link DeclarativeConfigProperties} backed by a parsed YAML/JSON map. * *

This class is internal and is hence not for public use. Its APIs are unstable and can change * at any time. * * @see #getStructured(String) Accessing nested maps * @see #getStructuredList(String) Accessing lists of maps - * @see DeclarativeConfiguration#toConfigProperties(Object, ComponentLoader) Converting - * configuration model to properties */ public final class YamlDeclarativeConfigProperties implements DeclarativeConfigProperties { @@ -70,8 +67,6 @@ private YamlDeclarativeConfigProperties( *

{@code properties} is expected to be the output of YAML parsing (i.e. with Jackson {@code * com.fasterxml.jackson.databind.ObjectMapper}), and have values which are scalars, lists of * scalars, lists of maps, and maps. - * - * @see DeclarativeConfiguration#toConfigProperties(Object) */ @SuppressWarnings("unchecked") public static YamlDeclarativeConfigProperties create(