diff --git a/Cargo.lock b/Cargo.lock index ee2358214560..12a02c3116a4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -11623,6 +11623,7 @@ dependencies = [ "tower 0.5.2", "tower-http 0.6.6", "tracing", + "tracing-opentelemetry", "urlencoding", "uuid", "vrl", @@ -12918,7 +12919,7 @@ dependencies = [ "getrandom 0.3.3", "once_cell", "rustix 1.0.7", - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 5934ac792ee4..bd820d450ca1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -234,6 +234,7 @@ tower = "0.5" tower-http = "0.6" tracing = "0.1" tracing-appender = "0.2" +tracing-opentelemetry = "0.31.0" tracing-subscriber = { version = "0.3", features = ["env-filter", "json", "fmt"] } typetag = "0.2" uuid = { version = "1.17", features = ["serde", "v4", "fast-rng"] } diff --git a/docs/how-to/how-to-change-log-level-on-the-fly.md b/docs/how-to/how-to-change-log-level-on-the-fly.md index 16a72bf6aef0..c3bf2602a26d 100644 --- a/docs/how-to/how-to-change-log-level-on-the-fly.md +++ b/docs/how-to/how-to-change-log-level-on-the-fly.md @@ -13,4 +13,19 @@ Log Level changed from Some("info") to "trace,flow=debug"% The data is a string in the format of `global_level,module1=level1,module2=level2,...` that follows the same rule of `RUST_LOG`. -The module is the module name of the log, and the level is the log level. The log level can be one of the following: `trace`, `debug`, `info`, `warn`, `error`, `off`(case insensitive). \ No newline at end of file +The module is the module name of the log, and the level is the log level. The log level can be one of the following: `trace`, `debug`, `info`, `warn`, `error`, `off`(case insensitive). + +# Enable/Disable Trace on the Fly + +## HTTP API + +example: +```bash +curl --data "true" 127.0.0.1:4000/debug/enable_trace +``` +The database will reply with something like: +``` +trace enabled% +``` + +The request body must be `true` (enable tracing) or `false` (disable tracing).
diff --git a/src/common/telemetry/Cargo.toml b/src/common/telemetry/Cargo.toml index d0bc6876bc70..92c3304d53f4 100644 --- a/src/common/telemetry/Cargo.toml +++ b/src/common/telemetry/Cargo.toml @@ -35,5 +35,5 @@ tokio.workspace = true tracing = "0.1" tracing-appender.workspace = true tracing-log = "0.2" -tracing-opentelemetry = "0.31.0" +tracing-opentelemetry.workspace = true tracing-subscriber.workspace = true diff --git a/src/common/telemetry/src/lib.rs b/src/common/telemetry/src/lib.rs index ba46bfa0d9f5..cd60d6164552 100644 --- a/src/common/telemetry/src/lib.rs +++ b/src/common/telemetry/src/lib.rs @@ -21,7 +21,10 @@ mod panic_hook; pub mod tracing_context; mod tracing_sampler; -pub use logging::{RELOAD_HANDLE, init_default_ut_logging, init_global_logging}; +pub use logging::{ + LOG_RELOAD_HANDLE, TRACE_RELOAD_HANDLE, get_or_init_tracer, init_default_ut_logging, + init_global_logging, +}; pub use metric::dump_metrics; pub use panic_hook::set_panic_hook; pub use {common_error, tracing, tracing_subscriber}; diff --git a/src/common/telemetry/src/logging.rs b/src/common/telemetry/src/logging.rs index d2b8a64b3945..20b519b62f20 100644 --- a/src/common/telemetry/src/logging.rs +++ b/src/common/telemetry/src/logging.rs @@ -25,7 +25,7 @@ use opentelemetry::trace::TracerProvider; use opentelemetry::{KeyValue, global}; use opentelemetry_otlp::{Protocol, SpanExporter, WithExportConfig, WithHttpConfig}; use opentelemetry_sdk::propagation::TraceContextPropagator; -use opentelemetry_sdk::trace::Sampler; +use opentelemetry_sdk::trace::{Sampler, Tracer}; use opentelemetry_semantic_conventions::resource; use serde::{Deserialize, Serialize}; use tracing_appender::non_blocking::WorkerGuard; @@ -33,7 +33,7 @@ use tracing_appender::rolling::{RollingFileAppender, Rotation}; use tracing_log::LogTracer; use tracing_subscriber::filter::{FilterFn, Targets}; use tracing_subscriber::fmt::Layer; -use tracing_subscriber::layer::SubscriberExt; +use tracing_subscriber::layer::{Layered, 
SubscriberExt}; use tracing_subscriber::prelude::*; use tracing_subscriber::{EnvFilter, Registry, filter}; @@ -48,10 +48,31 @@ pub const DEFAULT_OTLP_HTTP_ENDPOINT: &str = "http://localhost:4318/v1/traces"; /// The default logs directory. pub const DEFAULT_LOGGING_DIR: &str = "logs"; -// Handle for reloading log level -pub static RELOAD_HANDLE: OnceCell> = +/// Handle for reloading log level +pub static LOG_RELOAD_HANDLE: OnceCell> = OnceCell::new(); +type TraceReloadHandle = tracing_subscriber::reload::Handle< + Vec< + tracing_opentelemetry::OpenTelemetryLayer< + Layered, Registry>, + Tracer, + >, + >, + Layered, Registry>, +>; + +/// Handle for reloading the OpenTelemetry trace layers, used to turn tracing on or off at runtime +pub static TRACE_RELOAD_HANDLE: OnceCell = OnceCell::new(); + +static TRACER: OnceCell> = OnceCell::new(); + +#[derive(Debug)] +enum TraceState { + Ready(Tracer), + Deferred(TraceContext), +} + /// The logging options that used to initialize the logger. #[derive(Clone, Debug, Serialize, Deserialize)] #[serde(default)] @@ -167,6 +188,13 @@ impl PartialEq for LoggingOptions { impl Eq for LoggingOptions {} +#[derive(Clone, Debug)] +struct TraceContext { + app_name: String, + node_id: String, + logging_opts: LoggingOptions, +} + impl Default for LoggingOptions { fn default() -> Self { Self { @@ -242,6 +270,7 @@ pub fn init_global_logging( ) -> Vec { static START: Once = Once::new(); let mut guards = vec![]; + let node_id = node_id.unwrap_or_else(|| "none".to_string()); START.call_once(|| { // Enable log compatible layer to convert log record to tracing span.
@@ -357,10 +386,39 @@ pub fn init_global_logging( let (dyn_filter, reload_handle) = tracing_subscriber::reload::Layer::new(filter.clone()); - RELOAD_HANDLE + LOG_RELOAD_HANDLE .set(reload_handle) .expect("reload handle already set, maybe init_global_logging get called twice?"); + let mut initial_tracer = None; + let trace_state = if opts.enable_otlp_tracing { + let tracer = create_tracer(app_name, &node_id, opts); + initial_tracer = Some(tracer.clone()); + TraceState::Ready(tracer) + } else { + TraceState::Deferred(TraceContext { + app_name: app_name.to_string(), + node_id: node_id.clone(), + logging_opts: opts.clone(), + }) + }; + + TRACER + .set(Mutex::new(trace_state)) + .expect("trace state already initialized"); + + let initial_trace_layers = initial_tracer + .as_ref() + .map(|tracer| vec![tracing_opentelemetry::layer().with_tracer(tracer.clone())]) + .unwrap_or_else(Vec::new); + + let (dyn_trace_layer, trace_reload_handle) = + tracing_subscriber::reload::Layer::new(initial_trace_layers); + + TRACE_RELOAD_HANDLE + .set(trace_reload_handle) + .unwrap_or_else(|_| panic!("failed to set trace reload handle")); + // Must enable 'tokio_unstable' cfg to use this feature. 
// For example: `RUSTFLAGS="--cfg tokio_unstable" cargo run -F common-telemetry/console -- standalone start` #[cfg(feature = "tokio-console")] @@ -383,6 +441,7 @@ pub fn init_global_logging( Registry::default() .with(dyn_filter) + .with(dyn_trace_layer) .with(tokio_console_layer) .with(stdout_logging_layer) .with(file_logging_layer) @@ -396,53 +455,61 @@ pub fn init_global_logging( #[cfg(not(feature = "tokio-console"))] let subscriber = Registry::default() .with(dyn_filter) + .with(dyn_trace_layer) .with(stdout_logging_layer) .with(file_logging_layer) .with(err_file_logging_layer) .with(slow_query_logging_layer); - if opts.enable_otlp_tracing { - global::set_text_map_propagator(TraceContextPropagator::new()); - - let sampler = opts - .tracing_sample_ratio - .as_ref() - .map(create_sampler) - .map(Sampler::ParentBased) - .unwrap_or(Sampler::ParentBased(Box::new(Sampler::AlwaysOn))); - - let provider = opentelemetry_sdk::trace::SdkTracerProvider::builder() - .with_batch_exporter(build_otlp_exporter(opts)) - .with_sampler(sampler) - .with_resource( - opentelemetry_sdk::Resource::builder_empty() - .with_attributes([ - KeyValue::new(resource::SERVICE_NAME, app_name.to_string()), - KeyValue::new( - resource::SERVICE_INSTANCE_ID, - node_id.unwrap_or("none".to_string()), - ), - KeyValue::new(resource::SERVICE_VERSION, common_version::version()), - KeyValue::new(resource::PROCESS_PID, std::process::id().to_string()), - ]) - .build(), - ) - .build(); - let tracer = provider.tracer("greptimedb"); + global::set_text_map_propagator(TraceContextPropagator::new()); - tracing::subscriber::set_global_default( - subscriber.with(tracing_opentelemetry::layer().with_tracer(tracer)), - ) + tracing::subscriber::set_global_default(subscriber) .expect("error setting global tracing subscriber"); - } else { - tracing::subscriber::set_global_default(subscriber) - .expect("error setting global tracing subscriber"); - } }); guards } +fn create_tracer(app_name: &str, node_id: &str, opts: 
&LoggingOptions) -> Tracer { + let sampler = opts + .tracing_sample_ratio + .as_ref() + .map(create_sampler) + .map(Sampler::ParentBased) + .unwrap_or(Sampler::ParentBased(Box::new(Sampler::AlwaysOn))); + + let resource = opentelemetry_sdk::Resource::builder_empty() + .with_attributes([ + KeyValue::new(resource::SERVICE_NAME, app_name.to_string()), + KeyValue::new(resource::SERVICE_INSTANCE_ID, node_id.to_string()), + KeyValue::new(resource::SERVICE_VERSION, common_version::version()), + KeyValue::new(resource::PROCESS_PID, std::process::id().to_string()), + ]) + .build(); + + opentelemetry_sdk::trace::SdkTracerProvider::builder() + .with_batch_exporter(build_otlp_exporter(opts)) + .with_sampler(sampler) + .with_resource(resource) + .build() + .tracer("greptimedb") +} + +/// Ensure that the OTLP tracer has been constructed, building it lazily if needed. +pub fn get_or_init_tracer() -> Result { + let state = TRACER.get().ok_or("trace state is not initialized")?; + let mut guard = state.lock().expect("trace state lock poisoned"); + + match &mut *guard { + TraceState::Ready(tracer) => Ok(tracer.clone()), + TraceState::Deferred(context) => { + let tracer = create_tracer(&context.app_name, &context.node_id, &context.logging_opts); + *guard = TraceState::Ready(tracer.clone()); + Ok(tracer) + } + } +} + fn build_otlp_exporter(opts: &LoggingOptions) -> SpanExporter { let protocol = opts .otlp_export_protocol diff --git a/src/servers/Cargo.toml b/src/servers/Cargo.toml index ee7d4fbdd44a..ebc45d7acad4 100644 --- a/src/servers/Cargo.toml +++ b/src/servers/Cargo.toml @@ -83,6 +83,7 @@ once_cell.workspace = true openmetrics-parser = "0.4" simd-json.workspace = true socket2 = "0.5" +tracing-opentelemetry.workspace = true # use crates.io version once the following PRs is merged into the nextest release # 1. fix: Use After Free in PacketReader in https://github.com/databendlabs/opensrv/pull/67 # 2. 
Use ring, instead of aws-lc-rs in https://github.com/databendlabs/opensrv/pull/72 diff --git a/src/servers/src/http.rs b/src/servers/src/http.rs index 8fa658b6bb83..281ac108d030 100644 --- a/src/servers/src/http.rs +++ b/src/servers/src/http.rs @@ -89,6 +89,7 @@ pub mod authorize; #[cfg(feature = "dashboard")] mod dashboard; pub mod dyn_log; +pub mod dyn_trace; pub mod event; pub mod extractor; pub mod handler; @@ -908,6 +909,7 @@ impl HttpServer { Router::new() // handler for changing log level dynamically .route("/log_level", routing::post(dyn_log::dyn_log_handler)) + .route("/enable_trace", routing::post(dyn_trace::dyn_trace_handler)) .nest( "/prof", Router::new() diff --git a/src/servers/src/http/dyn_log.rs b/src/servers/src/http/dyn_log.rs index b82ecdadd6f2..e9a58c2d7457 100644 --- a/src/servers/src/http/dyn_log.rs +++ b/src/servers/src/http/dyn_log.rs @@ -15,7 +15,7 @@ use axum::http::StatusCode; use axum::response::IntoResponse; use common_telemetry::tracing_subscriber::filter; -use common_telemetry::{RELOAD_HANDLE, info}; +use common_telemetry::{LOG_RELOAD_HANDLE, info}; use snafu::OptionExt; use crate::error::{InternalSnafu, InvalidParameterSnafu, Result}; @@ -29,7 +29,7 @@ pub async fn dyn_log_handler(level: String) -> Result { .build() })?; let mut old_filter = None; - RELOAD_HANDLE + LOG_RELOAD_HANDLE .get() .context(InternalSnafu { err_msg: "Reload handle not initialized", diff --git a/src/servers/src/http/dyn_trace.rs b/src/servers/src/http/dyn_trace.rs new file mode 100644 index 000000000000..888806b360bb --- /dev/null +++ b/src/servers/src/http/dyn_trace.rs @@ -0,0 +1,74 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use axum::http::StatusCode; +use axum::response::IntoResponse; +use common_telemetry::{TRACE_RELOAD_HANDLE, error, get_or_init_tracer, info}; + +use crate::error::{InvalidParameterSnafu, Result}; + +#[axum_macros::debug_handler] +pub async fn dyn_trace_handler(enable_str: String) -> Result { + let enable = enable_str.parse::().map_err(|e| { + InvalidParameterSnafu { + reason: format!("Invalid parameter \"enable\": {e:?}"), + } + .build() + })?; + + let Some(trace_reload_handle) = TRACE_RELOAD_HANDLE.get() else { + return Ok(( + StatusCode::SERVICE_UNAVAILABLE, + "trace reload handle is not initialized".to_string(), + )); + }; + + if enable { + let tracer = match get_or_init_tracer() { + Ok(tracer) => tracer, + Err(reason) => { + return Ok((StatusCode::SERVICE_UNAVAILABLE, reason.to_string())); + } + }; + + let trace_layer = tracing_opentelemetry::layer().with_tracer(tracer); + match trace_reload_handle.reload(vec![trace_layer]) { + Ok(_) => { + info!("trace enabled"); + Ok((StatusCode::OK, "trace enabled".to_string())) + } + Err(e) => { + error!(e; "Failed to enable trace"); + Ok(( + StatusCode::INTERNAL_SERVER_ERROR, + format!("failed to enable trace: {e}"), + )) + } + } + } else { + match trace_reload_handle.reload(vec![]) { + Ok(_) => { + info!("trace disabled"); + Ok((StatusCode::OK, "trace disabled".to_string())) + } + Err(e) => { + error!(e; "Failed to disable trace"); + Ok(( + StatusCode::INTERNAL_SERVER_ERROR, + format!("failed to disable trace: {e}"), + )) + } + } + } +} diff --git a/tests-integration/tests/http.rs 
b/tests-integration/tests/http.rs index b4d437cd994c..d0115f944101 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -101,6 +101,7 @@ macro_rules! http_tests { test_health_api, test_status_api, test_config_api, + test_dynamic_tracer_toggle, test_dashboard_path, test_prometheus_remote_write, test_prometheus_remote_special_labels, @@ -1627,6 +1628,35 @@ fn drop_lines_with_inconsistent_results(input: String) -> String { ) } +pub async fn test_dynamic_tracer_toggle(store_type: StorageType) { + common_telemetry::init_default_ut_logging(); + + let (app, mut guard) = setup_test_http_app(store_type, "test_dynamic_tracer_toggle").await; + let client = TestClient::new(app).await; + + let disable_resp = client + .post("/debug/enable_trace") + .body("false") + .send() + .await; + assert_eq!(disable_resp.status(), StatusCode::OK); + assert_eq!(disable_resp.text().await, "trace disabled"); + + let enable_resp = client.post("/debug/enable_trace").body("true").send().await; + assert_eq!(enable_resp.status(), StatusCode::OK); + assert_eq!(enable_resp.text().await, "trace enabled"); + + let cleanup_resp = client + .post("/debug/enable_trace") + .body("false") + .send() + .await; + assert_eq!(cleanup_resp.status(), StatusCode::OK); + assert_eq!(cleanup_resp.text().await, "trace disabled"); + + guard.remove_all().await; +} + #[cfg(feature = "dashboard")] pub async fn test_dashboard_path(store_type: StorageType) { common_telemetry::init_default_ut_logging();