diff --git a/.github/workflows/ci-tap.yml b/.github/workflows/ci-tap.yml index 3f57833..4317e28 100644 --- a/.github/workflows/ci-tap.yml +++ b/.github/workflows/ci-tap.yml @@ -96,11 +96,31 @@ jobs: run: | docker exec clickhouse-test clickhouse-client --multiquery < docker/init/00-schema.sql + - name: Start OTel Collector + run: | + export OTEL_DATA_DIR=/tmp/psch-otel-data + mkdir -p "$OTEL_DATA_DIR" + chmod 777 "$OTEL_DATA_DIR" + docker compose -f docker/docker-compose.otel.yml up -d + for i in {1..30}; do + if curl -sf http://localhost:13133/ >/dev/null 2>&1; then + echo "OTel Collector ready" + exit 0 + fi + echo "Waiting for OTel Collector... ($i/30)" + sleep 1 + done + echo "OTel Collector not ready after 30s" + docker logs psch-otel-collector + exit 1 + - name: Run TAP tests run: | export PATH="${{ env.PG_TAP_DIR }}/bin:$PATH" export PG_REGRESS="${{ env.PG_TAP_DIR }}/lib/postgresql/pgxs/src/test/regress/pg_regress" export PERL_LIB="${{ env.PG_TAP_DIR }}/lib/postgresql/pgxs/src/test/perl" + export OTEL_DATA_DIR=/tmp/psch-otel-data + export PROJECT_DIR="${{ github.workspace }}" prove -v --timer -I "$PERL_LIB" -I t t/*.pl - name: Upload logs on failure @@ -113,3 +133,7 @@ jobs: - name: Cleanup ClickHouse if: always() run: docker rm -f clickhouse-test 2>/dev/null || true + + - name: Cleanup OTel Collector + if: always() + run: docker compose -f docker/docker-compose.otel.yml down -v 2>/dev/null || true diff --git a/.gitignore b/.gitignore index 80e4958..1e8fbc2 100644 --- a/.gitignore +++ b/.gitignore @@ -5,6 +5,8 @@ *.a .deps/ build/ +CMakeCache.txt +CMakeFiles/ # Compile database compile_commands.json diff --git a/.gitmodules b/.gitmodules index a8b7052..f9a457f 100644 --- a/.gitmodules +++ b/.gitmodules @@ -1,3 +1,6 @@ [submodule "third_party/clickhouse-cpp"] path = third_party/clickhouse-cpp url = https://github.com/ClickHouse/clickhouse-cpp.git +[submodule "third_party/opentelemetry-cpp"] + path = third_party/opentelemetry-cpp + url = 
https://github.com/open-telemetry/opentelemetry-cpp.git diff --git a/CMakeLists.txt b/CMakeLists.txt index 3866a7f..3b56bf3 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -25,12 +25,38 @@ option(WITH_OPENSSL "Enable OpenSSL for TLS connections" ON) include(PgStatChOpenSSL) pg_stat_ch_setup_openssl() -# Add clickhouse-cpp library (statically linked, compiled with PIC for shared library linking) +# Build all third-party libraries as static with PIC set(CMAKE_POSITION_INDEPENDENT_CODE ON) -set(BUILD_SHARED_LIBS OFF CACHE BOOL "Build clickhouse-cpp as static library" FORCE) +set(BUILD_SHARED_LIBS OFF CACHE BOOL "Build third-party libs as static" FORCE) + +# --------------------------------------------------------------------------- +# Add opentelemetry-cpp FIRST (fetches gRPC + abseil via FetchContent). +# This must come before clickhouse-cpp to avoid duplicate abseil targets. +# --------------------------------------------------------------------------- +include(FetchContent) +set(WITH_OTLP_GRPC ON CACHE BOOL "Enable OTel OTLP gRPC exporter" FORCE) +set(WITH_OTLP_HTTP OFF CACHE BOOL "" FORCE) +set(BUILD_TESTING OFF CACHE BOOL "Disable tests" FORCE) +set(WITH_BENCHMARK OFF CACHE BOOL "Disable benchmarks" FORCE) +set(WITH_EXAMPLES OFF CACHE BOOL "Disable OTel examples" FORCE) +set(WITH_FUNC_TESTS OFF CACHE BOOL "Disable OTel functional tests" FORCE) +set(OPENTELEMETRY_INSTALL OFF CACHE BOOL "" FORCE) +set(WITH_ABSEIL ON CACHE BOOL "Use Abseil for OTel" FORCE) +# Always use vendored gRPC + abseil (never pick up system packages). +# This prevents find_package(gRPC) from short-circuiting FetchContent +# and leaving absl:: targets undefined. +set(CMAKE_DISABLE_FIND_PACKAGE_gRPC TRUE) +add_subdirectory(third_party/opentelemetry-cpp EXCLUDE_FROM_ALL) + +# --------------------------------------------------------------------------- +# Add clickhouse-cpp AFTER opentelemetry-cpp. 
+# Use WITH_SYSTEM_ABSEIL so it finds the abseil already built by gRPC above +# (via our cmake/Findabsl.cmake shim). +# --------------------------------------------------------------------------- +set(WITH_SYSTEM_ABSEIL ON CACHE BOOL "Use abseil from gRPC build" FORCE) set(WITH_OPENSSL ${WITH_OPENSSL} CACHE BOOL "Pass OpenSSL setting to clickhouse-cpp" FORCE) -set(BUILD_TESTS OFF CACHE BOOL "Disable clickhouse-cpp tests") -set(BUILD_BENCHMARK OFF CACHE BOOL "Disable clickhouse-cpp benchmarks") +set(BUILD_TESTS OFF CACHE BOOL "Disable clickhouse-cpp tests" FORCE) +set(BUILD_BENCHMARK OFF CACHE BOOL "Disable clickhouse-cpp benchmarks" FORCE) add_subdirectory(third_party/clickhouse-cpp EXCLUDE_FROM_ALL) # Collect source files @@ -50,10 +76,19 @@ target_include_directories(pg_stat_ch SYSTEM PRIVATE ${CMAKE_SOURCE_DIR}/third_party/clickhouse-cpp ${CMAKE_SOURCE_DIR}/third_party/clickhouse-cpp/contrib/absl ${CMAKE_SOURCE_DIR}/third_party/clickhouse-cpp/contrib/zstd + ${CMAKE_SOURCE_DIR}/third_party/opentelemetry-cpp/api/include + ${CMAKE_SOURCE_DIR}/third_party/opentelemetry-cpp/sdk/include + ${CMAKE_SOURCE_DIR}/third_party/opentelemetry-cpp/exporters/otlp/include ) target_link_libraries(pg_stat_ch PRIVATE PostgreSQLServer::PostgreSQLServer clickhouse-cpp-lib + opentelemetry_api + opentelemetry_sdk + opentelemetry_exporter_otlp_grpc_metrics + opentelemetry_exporter_otlp_grpc_log + opentelemetry_metrics + opentelemetry_logs ) if(WITH_OPENSSL) target_link_libraries(pg_stat_ch PRIVATE OpenSSL::SSL OpenSSL::Crypto) diff --git a/cmake/Findabsl.cmake b/cmake/Findabsl.cmake new file mode 100644 index 0000000..b4046f0 --- /dev/null +++ b/cmake/Findabsl.cmake @@ -0,0 +1,16 @@ +# Findabsl.cmake - Bridge find_package(absl) to vendored abseil targets. +# +# clickhouse-cpp calls find_package(absl REQUIRED) when WITH_SYSTEM_ABSEIL=ON. +# The vendored gRPC (fetched by opentelemetry-cpp) builds abseil as a +# subdirectory, so there's no abslConfig.cmake on disk. 
This Module-mode +# find script checks that the required target exists and sets absl_FOUND. + +if(TARGET absl::int128) + set(absl_FOUND TRUE) +else() + set(absl_FOUND FALSE) + if(absl_FIND_REQUIRED) + message(FATAL_ERROR "absl::int128 target not found. " + "Ensure opentelemetry-cpp (which fetches gRPC + abseil) is added before clickhouse-cpp.") + endif() +endif() diff --git a/docker/docker-compose.otel.yml b/docker/docker-compose.otel.yml new file mode 100644 index 0000000..29bffd5 --- /dev/null +++ b/docker/docker-compose.otel.yml @@ -0,0 +1,10 @@ +services: + otel-collector: + image: otel/opentelemetry-collector-contrib:0.120.0 + container_name: psch-otel-collector + ports: + - "4317:4317" + - "13133:13133" + volumes: + - ./otel-collector-config.yaml:/etc/otelcol-contrib/config.yaml:ro + - ${OTEL_DATA_DIR:-/tmp/psch-otel-data}:/otel-data diff --git a/docker/otel-collector-config.yaml b/docker/otel-collector-config.yaml new file mode 100644 index 0000000..08dd7a9 --- /dev/null +++ b/docker/otel-collector-config.yaml @@ -0,0 +1,29 @@ +receivers: + otlp: + protocols: + grpc: + endpoint: 0.0.0.0:4317 + +exporters: + file/logs: + path: /otel-data/logs.jsonl + format: json + flush_interval: 500ms + file/metrics: + path: /otel-data/metrics.jsonl + format: json + flush_interval: 500ms + +extensions: + health_check: + endpoint: 0.0.0.0:13133 + +service: + extensions: [health_check] + pipelines: + logs: + receivers: [otlp] + exporters: [file/logs] + metrics: + receivers: [otlp] + exporters: [file/metrics] diff --git a/include/config/guc.h b/include/config/guc.h index 684b36c..d6b4fd6 100644 --- a/include/config/guc.h +++ b/include/config/guc.h @@ -4,6 +4,7 @@ #define PG_STAT_CH_GUC_H extern bool psch_enabled; +extern bool psch_use_otel; extern char* psch_clickhouse_host; extern int psch_clickhouse_port; extern char* psch_clickhouse_user; @@ -11,6 +12,8 @@ extern char* psch_clickhouse_password; extern char* psch_clickhouse_database; extern bool psch_clickhouse_use_tls; 
extern bool psch_clickhouse_skip_tls_verify; +extern char* psch_otel_endpoint; +extern char* psch_hostname; extern int psch_queue_capacity; extern int psch_flush_interval_ms; extern int psch_batch_max; diff --git a/scripts/run-tests.sh b/scripts/run-tests.sh index 3077acc..002d3a4 100755 --- a/scripts/run-tests.sh +++ b/scripts/run-tests.sh @@ -17,7 +17,7 @@ usage() { echo "Usage: $0 [test_type] [test_filter]" echo " PG_VERSION: PostgreSQL version (16, 17, 18) - uses mise" echo " PG_PATH: Path to local PostgreSQL installation" - echo " test_type: regress, tap, isolation, stress, clickhouse, or all (default: all)" + echo " test_type: regress, tap, isolation, stress, clickhouse, otel, or all (default: all)" echo " test_filter: (tap only) pattern to match test files, e.g., '021' for t/*021*.pl" echo "" echo "Examples:" @@ -27,6 +27,7 @@ usage() { echo " $0 ../postgres/install_tap tap 021 # Run only t/*021*.pl test" echo " $0 18 stress # Run only stress test against PG 18" echo " $0 18 clickhouse # Run ClickHouse integration tests (requires Docker)" + echo " $0 18 otel # Run OTel integration tests (requires Docker)" exit 1 } @@ -250,6 +251,68 @@ run_clickhouse() { t/010_clickhouse_export.pl t/011_clickhouse_reconnect.pl } +# Run OTel integration tests +run_otel() { + log_info "Running OTel integration tests..." + + # Check if Docker is available + if ! command -v docker &> /dev/null; then + log_error "Docker is required for OTel tests" + return 1 + fi + + # Check if OTel Collector is running, start if not + if ! curl -sf http://localhost:13133/ >/dev/null 2>&1; then + log_info "Starting OTel Collector container..." + + export OTEL_DATA_DIR="${OTEL_DATA_DIR:-/tmp/psch-otel-data}" + mkdir -p "${OTEL_DATA_DIR}" + chmod 777 "${OTEL_DATA_DIR}" + + docker compose -f docker/docker-compose.otel.yml up -d + # Wait for health check + for i in $(seq 1 30); do + if curl -sf http://localhost:13133/ >/dev/null 2>&1; then + break + fi + sleep 1 + done + + if ! 
curl -sf http://localhost:13133/ >/dev/null 2>&1; then + log_error "OTel Collector failed to start" + return 1 + fi + fi + + local perl_lib="${PG_LIB}/pgxs/src/test/perl" + if [[ ! -d "${perl_lib}" ]]; then + log_warn "PostgreSQL TAP test modules not found at ${perl_lib}" + log_warn "OTel tests require PostgreSQL built with --enable-tap-tests" + log_warn "Skipping OTel tests." + return 0 + fi + + local pg_regress="${PG_LIB}/pgxs/src/test/regress/pg_regress" + if [[ ! -x "${pg_regress}" ]]; then + log_warn "pg_regress not found at ${pg_regress}" + log_warn "Skipping OTel tests." + return 0 + fi + + # Clean up stale test data directories + rm -rf tmp_check + + # Set env vars for the test helper module + export PROJECT_DIR="${PROJECT_DIR}" + export OTEL_DATA_DIR="${OTEL_DATA_DIR:-/tmp/psch-otel-data}" + + # Run only OTel-related tests + PG_REGRESS="${pg_regress}" prove -v --timer \ + -I "${perl_lib}" \ + -I t \ + t/024_otel_export.pl t/025_otel_reconnect.pl +} + # Main execution case "${TEST_TYPE}" in regress) @@ -267,6 +330,9 @@ case "${TEST_TYPE}" in clickhouse) run_clickhouse ;; + otel) + run_otel + ;; all) run_regress run_tap diff --git a/src/config/guc.cc b/src/config/guc.cc index 21ddf30..ed13210 100644 --- a/src/config/guc.cc +++ b/src/config/guc.cc @@ -12,6 +12,7 @@ extern "C" { // GUC variable storage bool psch_enabled = true; +bool psch_use_otel = false; char* psch_clickhouse_host = nullptr; int psch_clickhouse_port = 9000; char* psch_clickhouse_user = nullptr; @@ -19,6 +20,8 @@ char* psch_clickhouse_password = nullptr; char* psch_clickhouse_database = nullptr; bool psch_clickhouse_use_tls = false; bool psch_clickhouse_skip_tls_verify = false; +char* psch_otel_endpoint = nullptr; +char* psch_hostname = nullptr; int psch_queue_capacity = 131072; int psch_flush_interval_ms = 200; int psch_batch_max = 200000; @@ -79,6 +82,16 @@ void PschInitGuc(void) { 0, // flags nullptr, nullptr, nullptr); // hooks + DefineCustomBoolVariable( + "pg_stat_ch.use_otel", + "Send 
metrics through OpenTelemetry instead of ClickHouse.", + "When enabled, stats will be sent to an OTel endpoint instead of ClickHouse.", + &psch_use_otel, + false, + PGC_POSTMASTER, + 0, + nullptr, nullptr, nullptr); + DefineCustomStringVariable( "pg_stat_ch.clickhouse_host", "ClickHouse server hostname.", @@ -150,6 +163,26 @@ void PschInitGuc(void) { 0, nullptr, nullptr, nullptr); + DefineCustomStringVariable( + "pg_stat_ch.otel_endpoint", + "OpenTelemetry gRPC endpoint (host:port).", + nullptr, + &psch_otel_endpoint, + "localhost:4317", + PGC_POSTMASTER, + 0, + nullptr, nullptr, nullptr); + + DefineCustomStringVariable( + "pg_stat_ch.hostname", + "The hostname of the current machine.", + nullptr, + &psch_hostname, + "", + PGC_POSTMASTER, + 0, + nullptr, nullptr, nullptr); + DefineCustomIntVariable( "pg_stat_ch.queue_capacity", "Maximum number of events in the shared memory queue (must be a power of 2).", diff --git a/src/config/guc.h b/src/config/guc.h index e42e087..b83037a 100644 --- a/src/config/guc.h +++ b/src/config/guc.h @@ -10,11 +10,14 @@ extern "C" { // GUC variables (defined in guc.cc) extern bool psch_enabled; +extern bool psch_use_otel; extern char* psch_clickhouse_host; extern int psch_clickhouse_port; extern char* psch_clickhouse_user; extern char* psch_clickhouse_password; extern char* psch_clickhouse_database; +extern char* psch_otel_endpoint; +extern char* psch_hostname; extern int psch_queue_capacity; extern int psch_flush_interval_ms; extern int psch_batch_max; diff --git a/src/export/clickhouse_exporter.cc b/src/export/clickhouse_exporter.cc index 13a9790..e7d82f6 100644 --- a/src/export/clickhouse_exporter.cc +++ b/src/export/clickhouse_exporter.cc @@ -11,319 +11,99 @@ extern "C" { #include #include "config/guc.h" -#include "export/clickhouse_exporter.h" -#include "queue/event.h" -#include "queue/shmem.h" +#include "export/exporter_interface.h" +#include "queue/shmem.h" // PschRecordExportFailure namespace { -// PostgreSQL epoch is 2000-01-01, 
Unix epoch is 1970-01-01 -// Difference is 946684800 seconds = 946684800000000 microseconds -constexpr int64_t kPostgresEpochOffsetUs = 946684800000000LL; - -// Exponential backoff constants -constexpr int kBaseDelayMs = 1000; // 1 second -constexpr int kMaxDelayMs = 60000; // 60 seconds -constexpr int kMaxConsecutiveFailures = 10; // Cap for exponential growth - -// Exporter state - encapsulates all bgworker-local state -struct ExporterState { - std::unique_ptr client; - int consecutive_failures = 0; - bool initialized = false; -}; - -// Bgworker-local exporter state (no locking needed) -ExporterState g_exporter; - -// Convert PschCmdType to string -const char* CmdTypeToString(PschCmdType cmd) { - switch (cmd) { - case PSCH_CMD_SELECT: - return "SELECT"; - case PSCH_CMD_UPDATE: - return "UPDATE"; - case PSCH_CMD_INSERT: - return "INSERT"; - case PSCH_CMD_DELETE: - return "DELETE"; - case PSCH_CMD_MERGE: - return "MERGE"; - case PSCH_CMD_UTILITY: - return "UTILITY"; - case PSCH_CMD_NOTHING: - return "NOTHING"; - default: - return "UNKNOWN"; +// ClickHouse-flavored Stats Exporter +// Builds a clickhouse::Block and uploads it +class ClickHouseExporter : public StatsExporter { + public: + // Tags in CH are just ordinary columns; aggregation happens in queries + shared_ptr> TagString(string_view name) final { + return Wrap(name); } -} - -// Dequeue events from the shared memory queue -std::vector DequeueEvents(int max_events) { - std::vector events; - events.reserve(max_events); - PschEvent event; - while (events.size() < static_cast(max_events) && PschDequeueEvent(&event)) { - events.push_back(event); + // Metrics are also all ordinary columns. 
+ shared_ptr> MetricInt16(string_view name) final { + return Wrap(name); } - return events; -} - -// Build a ClickHouse block from events -clickhouse::Block BuildClickHouseBlock(const std::vector& events) { - elog(DEBUG1, "pg_stat_ch: BuildClickHouseBlock() called with %zu events", events.size()); - - elog(DEBUG2, "pg_stat_ch: creating column objects"); - clickhouse::Block block; - - // Basic columns - elog(DEBUG3, "pg_stat_ch: creating col_ts_start"); - auto col_ts_start = std::make_shared(6); - elog(DEBUG3, "pg_stat_ch: col_ts_start created"); - auto col_duration_us = std::make_shared(); - // Use pre-resolved names from event (resolved at capture time in hooks) - auto col_db = std::make_shared(); - auto col_username = std::make_shared(); - elog(DEBUG3, "pg_stat_ch: basic columns created"); - auto col_pid = std::make_shared(); - auto col_query_id = std::make_shared(); - auto col_cmd_type = std::make_shared(); - auto col_rows = std::make_shared(); - auto col_query = std::make_shared(); - elog(DEBUG3, "pg_stat_ch: all basic columns created"); - - // Buffer usage columns - auto col_shared_blks_hit = std::make_shared(); - auto col_shared_blks_read = std::make_shared(); - auto col_shared_blks_dirtied = std::make_shared(); - auto col_shared_blks_written = std::make_shared(); - auto col_local_blks_hit = std::make_shared(); - auto col_local_blks_read = std::make_shared(); - auto col_local_blks_dirtied = std::make_shared(); - auto col_local_blks_written = std::make_shared(); - auto col_temp_blks_read = std::make_shared(); - auto col_temp_blks_written = std::make_shared(); - - // I/O timing columns - auto col_shared_blk_read_time_us = std::make_shared(); - auto col_shared_blk_write_time_us = std::make_shared(); - auto col_local_blk_read_time_us = std::make_shared(); - auto col_local_blk_write_time_us = std::make_shared(); - auto col_temp_blk_read_time_us = std::make_shared(); - auto col_temp_blk_write_time_us = std::make_shared(); - - // WAL usage columns - auto 
col_wal_records = std::make_shared(); - auto col_wal_fpi = std::make_shared(); - auto col_wal_bytes = std::make_shared(); - - // CPU time columns - auto col_cpu_user_time_us = std::make_shared(); - auto col_cpu_sys_time_us = std::make_shared(); - - // JIT columns - auto col_jit_functions = std::make_shared(); - auto col_jit_generation_time_us = std::make_shared(); - auto col_jit_deform_time_us = std::make_shared(); - auto col_jit_inlining_time_us = std::make_shared(); - auto col_jit_optimization_time_us = std::make_shared(); - auto col_jit_emission_time_us = std::make_shared(); - - // Parallel worker columns - auto col_parallel_workers_planned = std::make_shared(); - auto col_parallel_workers_launched = std::make_shared(); - - elog(DEBUG3, "pg_stat_ch: creating error columns"); - // Error columns - auto col_err_sqlstate = std::make_shared(5); - auto col_err_elevel = std::make_shared(); - auto col_err_message = std::make_shared(); - elog(DEBUG3, "pg_stat_ch: error columns created"); - - // Client context columns - auto col_app = std::make_shared(); - auto col_client_addr = std::make_shared(); - - elog(DEBUG2, "pg_stat_ch: all columns created, starting event loop"); - size_t event_idx = 0; - for (const auto& ev : events) { - elog(DEBUG2, "pg_stat_ch: processing event %zu: pid=%d, query_len=%u", event_idx, ev.pid, - ev.query_len); - - int64_t unix_us = ev.ts_start + kPostgresEpochOffsetUs; - col_ts_start->Append(unix_us); - col_duration_us->Append(ev.duration_us); - - // Use pre-resolved names from event (resolved at capture time in hooks) - col_db->Append(std::string(ev.datname, ev.datname_len)); - col_username->Append(std::string(ev.username, ev.username_len)); - - col_pid->Append(ev.pid); - col_query_id->Append(static_cast(ev.queryid)); - col_cmd_type->Append(CmdTypeToString(ev.cmd_type)); - col_rows->Append(ev.rows); - - // Validate query_len before using it - uint16 safe_query_len = ev.query_len; - if (safe_query_len > PSCH_MAX_QUERY_LEN) { - elog(WARNING, 
"pg_stat_ch: event %zu has invalid query_len %u, clamping", event_idx, - safe_query_len); - safe_query_len = PSCH_MAX_QUERY_LEN; - } - col_query->Append(std::string(ev.query, safe_query_len)); - - elog(DEBUG3, "pg_stat_ch: event %zu - buffer usage", event_idx); - // Buffer usage - col_shared_blks_hit->Append(ev.shared_blks_hit); - col_shared_blks_read->Append(ev.shared_blks_read); - col_shared_blks_dirtied->Append(ev.shared_blks_dirtied); - col_shared_blks_written->Append(ev.shared_blks_written); - col_local_blks_hit->Append(ev.local_blks_hit); - col_local_blks_read->Append(ev.local_blks_read); - col_local_blks_dirtied->Append(ev.local_blks_dirtied); - col_local_blks_written->Append(ev.local_blks_written); - col_temp_blks_read->Append(ev.temp_blks_read); - col_temp_blks_written->Append(ev.temp_blks_written); - - elog(DEBUG3, "pg_stat_ch: event %zu - I/O timing", event_idx); - // I/O timing - col_shared_blk_read_time_us->Append(ev.shared_blk_read_time_us); - col_shared_blk_write_time_us->Append(ev.shared_blk_write_time_us); - col_local_blk_read_time_us->Append(ev.local_blk_read_time_us); - col_local_blk_write_time_us->Append(ev.local_blk_write_time_us); - col_temp_blk_read_time_us->Append(ev.temp_blk_read_time_us); - col_temp_blk_write_time_us->Append(ev.temp_blk_write_time_us); - - elog(DEBUG3, "pg_stat_ch: event %zu - WAL usage", event_idx); - // WAL usage - col_wal_records->Append(ev.wal_records); - col_wal_fpi->Append(ev.wal_fpi); - col_wal_bytes->Append(ev.wal_bytes); - - elog(DEBUG3, "pg_stat_ch: event %zu - CPU time", event_idx); - // CPU time - col_cpu_user_time_us->Append(ev.cpu_user_time_us); - col_cpu_sys_time_us->Append(ev.cpu_sys_time_us); - - elog(DEBUG3, "pg_stat_ch: event %zu - JIT", event_idx); - // JIT - col_jit_functions->Append(ev.jit_functions); - col_jit_generation_time_us->Append(ev.jit_generation_time_us); - col_jit_deform_time_us->Append(ev.jit_deform_time_us); - col_jit_inlining_time_us->Append(ev.jit_inlining_time_us); - 
col_jit_optimization_time_us->Append(ev.jit_optimization_time_us); - col_jit_emission_time_us->Append(ev.jit_emission_time_us); - - elog(DEBUG3, "pg_stat_ch: event %zu - parallel workers", event_idx); - // Parallel workers - col_parallel_workers_planned->Append(ev.parallel_workers_planned); - col_parallel_workers_launched->Append(ev.parallel_workers_launched); - - elog(DEBUG3, "pg_stat_ch: event %zu - error info", event_idx); - // Error info (5-char SQLSTATE, trimmed) - col_err_sqlstate->Append(std::string_view(ev.err_sqlstate, 5)); - col_err_elevel->Append(ev.err_elevel); - // Error message (validate length) - uint16 safe_err_msg_len = ev.err_message_len; - if (safe_err_msg_len > PSCH_MAX_ERR_MSG_LEN) { - elog(WARNING, "pg_stat_ch: event %zu has invalid err_message_len %u, clamping", event_idx, - safe_err_msg_len); - safe_err_msg_len = PSCH_MAX_ERR_MSG_LEN; - } - col_err_message->Append(std::string(ev.err_message, safe_err_msg_len)); - - elog(DEBUG3, "pg_stat_ch: event %zu - client context (app_len=%u, addr_len=%u)", event_idx, - ev.application_name_len, ev.client_addr_len); - // Client context - validate lengths - uint8 safe_app_len = ev.application_name_len; - if (safe_app_len > 63) { - elog(WARNING, "pg_stat_ch: event %zu has invalid app_name_len %u, clamping", event_idx, - safe_app_len); - safe_app_len = 63; - } - uint8 safe_addr_len = ev.client_addr_len; - if (safe_addr_len > 45) { - elog(WARNING, "pg_stat_ch: event %zu has invalid client_addr_len %u, clamping", event_idx, - safe_addr_len); - safe_addr_len = 45; - } - col_app->Append(std::string(ev.application_name, safe_app_len)); - col_client_addr->Append(std::string(ev.client_addr, safe_addr_len)); - - event_idx++; + shared_ptr> MetricInt32(string_view name) final { + return Wrap(name); + } + shared_ptr> MetricInt64(string_view name) final { + return Wrap(name); + } + shared_ptr> MetricUInt8(string_view name) final { + return Wrap(name); + } + shared_ptr> MetricUInt64(string_view name) final { + return 
Wrap(name); + } + shared_ptr> MetricFixedString(int len, string_view name) final { + return Wrap(name, len); } - elog(DEBUG1, "pg_stat_ch: finished processing %zu events", event_idx); - - // Basic columns - block.AppendColumn("ts_start", col_ts_start); - block.AppendColumn("duration_us", col_duration_us); - block.AppendColumn("db", col_db); - block.AppendColumn("username", col_username); - block.AppendColumn("pid", col_pid); - block.AppendColumn("query_id", col_query_id); - block.AppendColumn("cmd_type", col_cmd_type); - block.AppendColumn("rows", col_rows); - block.AppendColumn("query", col_query); - - // Buffer usage columns - block.AppendColumn("shared_blks_hit", col_shared_blks_hit); - block.AppendColumn("shared_blks_read", col_shared_blks_read); - block.AppendColumn("shared_blks_dirtied", col_shared_blks_dirtied); - block.AppendColumn("shared_blks_written", col_shared_blks_written); - block.AppendColumn("local_blks_hit", col_local_blks_hit); - block.AppendColumn("local_blks_read", col_local_blks_read); - block.AppendColumn("local_blks_dirtied", col_local_blks_dirtied); - block.AppendColumn("local_blks_written", col_local_blks_written); - block.AppendColumn("temp_blks_read", col_temp_blks_read); - block.AppendColumn("temp_blks_written", col_temp_blks_written); - - // I/O timing columns - block.AppendColumn("shared_blk_read_time_us", col_shared_blk_read_time_us); - block.AppendColumn("shared_blk_write_time_us", col_shared_blk_write_time_us); - block.AppendColumn("local_blk_read_time_us", col_local_blk_read_time_us); - block.AppendColumn("local_blk_write_time_us", col_local_blk_write_time_us); - block.AppendColumn("temp_blk_read_time_us", col_temp_blk_read_time_us); - block.AppendColumn("temp_blk_write_time_us", col_temp_blk_write_time_us); - - // WAL usage columns - block.AppendColumn("wal_records", col_wal_records); - block.AppendColumn("wal_fpi", col_wal_fpi); - block.AppendColumn("wal_bytes", col_wal_bytes); - - // CPU time columns - 
block.AppendColumn("cpu_user_time_us", col_cpu_user_time_us); - block.AppendColumn("cpu_sys_time_us", col_cpu_sys_time_us); - - // JIT columns - block.AppendColumn("jit_functions", col_jit_functions); - block.AppendColumn("jit_generation_time_us", col_jit_generation_time_us); - block.AppendColumn("jit_deform_time_us", col_jit_deform_time_us); - block.AppendColumn("jit_inlining_time_us", col_jit_inlining_time_us); - block.AppendColumn("jit_optimization_time_us", col_jit_optimization_time_us); - block.AppendColumn("jit_emission_time_us", col_jit_emission_time_us); - - // Parallel worker columns - block.AppendColumn("parallel_workers_planned", col_parallel_workers_planned); - block.AppendColumn("parallel_workers_launched", col_parallel_workers_launched); - - // Error columns - block.AppendColumn("err_sqlstate", col_err_sqlstate); - block.AppendColumn("err_elevel", col_err_elevel); - block.AppendColumn("err_message", col_err_message); - - // Client context columns - block.AppendColumn("app", col_app); - block.AppendColumn("client_addr", col_client_addr); - return block; -} + // Records... 
you guessed it + shared_ptr> RecordInt32(string_view name) final { + return Wrap(name); + } + shared_ptr> RecordInt64(string_view name) final { + return Wrap(name); + } + shared_ptr> RecordDateTime(string_view name) final { + return Wrap(name, 6); + } + shared_ptr> RecordString(string_view name) final { + return Wrap(name); + } -} // namespace + void BeginBatch() final { + block = std::make_unique(); + columns.clear(); + exported_count = 0; + } + void BeginRow() final { ++exported_count; } + bool CommitBatch() final; + + bool EstablishNewConnection() final; + bool IsConnected() const final { return (bool)client; } + int NumConsecutiveFailures() const final { return consecutive_failures; } + void ResetFailures() final { consecutive_failures = 0; } + int NumExported() const final { return exported_count; } + + private: + template class ClickHouseColumn : public Column { + public: + template + ClickHouseColumn(ClickHouseExporter *exporter_, std::string_view name_, CH_Args&&... args): + exporter(exporter_), name(name_), ch_column(std::make_shared(args...)) {} + + void Append(const T &t) final { ch_column->Append(t); } + void Crunch() final { exporter->block->AppendColumn(name, ch_column); } + + private: + ClickHouseExporter *const exporter; + std::string name; + const shared_ptr ch_column; + }; + + template + shared_ptr> Wrap(std::string_view name, Args&&... args) { + auto col = std::make_shared>(this, name, args...); + columns.push_back(col); + return col; + } -extern "C" { + std::unique_ptr client; + std::unique_ptr block; + std::vector> columns; + int consecutive_failures = 0; + int exported_count = 0; +}; -bool PschExporterInit(void) { +bool ClickHouseExporter::EstablishNewConnection() { try { clickhouse::ClientOptions options; @@ -356,8 +136,7 @@ bool PschExporterInit(void) { psch_clickhouse_skip_tls_verify ? 
" (verification skipped)" : ""); } - g_exporter.client = std::make_unique(options); - g_exporter.initialized = true; + client = std::make_unique(options); const char* host = psch_clickhouse_host != nullptr ? psch_clickhouse_host : "localhost"; elog(LOG, "pg_stat_ch: connected to ClickHouse at %s:%d%s", host, psch_clickhouse_port, @@ -367,87 +146,49 @@ bool PschExporterInit(void) { } catch (const std::exception& ex) { std::string err_msg = ex.what(); elog(WARNING, "pg_stat_ch: failed to connect to ClickHouse: %s", err_msg.c_str()); - g_exporter.client.reset(); + client.reset(); return false; } } -int PschExportBatch(void) { - elog(DEBUG1, "pg_stat_ch: PschExportBatch() called"); - - if (g_exporter.client == nullptr) { - elog(DEBUG1, "pg_stat_ch: client is null, initializing"); - if (!PschExporterInit()) { - g_exporter.consecutive_failures++; - PschRecordExportFailure("Failed to connect to ClickHouse"); - return 0; +bool ClickHouseExporter::CommitBatch() { + try { + if (!block) { + elog(WARNING, "pg_stat_ch: Logic error: Block not built"); + return false; + } + for (const auto &col : columns) { + col->Crunch(); } - } - - elog(DEBUG1, "pg_stat_ch: dequeuing events (max=%d)", psch_batch_max); - std::vector events = DequeueEvents(psch_batch_max); - if (events.empty()) { - elog(DEBUG1, "pg_stat_ch: no events to export"); - return 0; - } - elog(DEBUG1, "pg_stat_ch: building ClickHouse block with %zu events", events.size()); + if (!client && (!EstablishNewConnection() || !client)) { + elog(WARNING, "pg_stat_ch: Connection not established; bailing."); + return false; + } - try { - clickhouse::Block block = BuildClickHouseBlock(events); - elog(DEBUG1, "pg_stat_ch: block built, inserting to ClickHouse"); - g_exporter.client->Insert("events_raw", block); + elog(DEBUG1, "pg_stat_ch: Inserting Block to ClickHouse"); + client->Insert("events_raw", *block); elog(DEBUG1, "pg_stat_ch: insert completed"); - if (psch_shared_state != nullptr) { - 
pg_atomic_fetch_add_u64(&psch_shared_state->exported, events.size()); - } - // Success: reset retry state and record success timestamp - g_exporter.consecutive_failures = 0; - PschRecordExportSuccess(); - - int count = static_cast(events.size()); - elog(DEBUG1, "pg_stat_ch: exported %d events to ClickHouse", count); - return count; + consecutive_failures = 0; + elog(DEBUG1, "pg_stat_ch: exported %d events to ClickHouse", exported_count); + return true; } catch (const std::exception& ex) { std::string err_msg = ex.what(); elog(WARNING, "pg_stat_ch: failed to insert to ClickHouse: %s", err_msg.c_str()); // Failure: increment counter, record error, reset client for reconnect - g_exporter.consecutive_failures++; + consecutive_failures++; PschRecordExportFailure(err_msg.c_str()); - g_exporter.client.reset(); - return 0; - } -} - -void PschResetRetryState(void) { - g_exporter.consecutive_failures = 0; -} - -int PschGetRetryDelayMs(void) { - if (g_exporter.consecutive_failures <= 0) { - return 0; + client.reset(); + return false; } - // Exponential backoff: base * 2^(failures-1), capped at max - int capped_failures = (g_exporter.consecutive_failures > kMaxConsecutiveFailures) - ? kMaxConsecutiveFailures - : g_exporter.consecutive_failures; - int delay = kBaseDelayMs * (1 << (capped_failures - 1)); - return (delay > kMaxDelayMs) ? 
kMaxDelayMs : delay; } -int PschGetConsecutiveFailures(void) { - return g_exporter.consecutive_failures; -} +} // namespace -void PschExporterShutdown(void) { - g_exporter.client.reset(); - g_exporter.consecutive_failures = 0; - g_exporter.initialized = false; - elog(LOG, "pg_stat_ch: ClickHouse exporter shutdown"); +std::unique_ptr MakeClickHouseExporter() { + return std::make_unique(); } - -} // extern "C" diff --git a/src/export/clickhouse_exporter.h b/src/export/clickhouse_exporter.h index 5975c09..acaacc9 100644 --- a/src/export/clickhouse_exporter.h +++ b/src/export/clickhouse_exporter.h @@ -2,28 +2,9 @@ #ifndef PG_STAT_CH_SRC_EXPORT_CLICKHOUSE_EXPORTER_H_ #define PG_STAT_CH_SRC_EXPORT_CLICKHOUSE_EXPORTER_H_ -#ifdef __cplusplus -extern "C" { -#endif +#include "exporter_interface.h" +#include -#include "postgres.h" - -// Initialize the ClickHouse exporter (called once at bgworker startup) -bool PschExporterInit(void); - -// Export one batch. Returns number of events exported (0 = queue empty or error). 
-int PschExportBatch(void); - -// Shutdown the exporter and close connection -void PschExporterShutdown(void); - -// Retry state management for exponential backoff -void PschResetRetryState(void); -int PschGetRetryDelayMs(void); -int PschGetConsecutiveFailures(void); - -#ifdef __cplusplus -} -#endif +std::unique_ptr MakeClickHouseExporter(); #endif // PG_STAT_CH_SRC_EXPORT_CLICKHOUSE_EXPORTER_H_ diff --git a/src/export/exporter_interface.h b/src/export/exporter_interface.h new file mode 100644 index 0000000..31d0f5e --- /dev/null +++ b/src/export/exporter_interface.h @@ -0,0 +1,75 @@ +#ifndef PG_STAT_CH_SRC_EXPORT_EXPORTER_INTERFACE_H_ +#define PG_STAT_CH_SRC_EXPORT_EXPORTER_INTERFACE_H_ + +#include +#include +#include +#include + +class StatsExporter { + protected: + using string = std::string; + using string_view = std::string_view; + template using shared_ptr = std::shared_ptr; + + class BasicColumn { + public: + virtual void Crunch() = 0; // Implementation-defined processing helper. + virtual ~BasicColumn() = default; + }; + template class Column : public BasicColumn { + public: + virtual void Append(const T &t) = 0; + virtual ~Column() = default; + }; + + public: + // Tags: Columns that serve as narrowing criteria for metrics. + virtual shared_ptr> TagString(string_view name) = 0; + + // Metrics: Columns that are generally bucketed into histograms. + virtual shared_ptr> MetricInt16(string_view name) = 0; + virtual shared_ptr> MetricInt32(string_view name) = 0; + virtual shared_ptr> MetricInt64(string_view name) = 0; + virtual shared_ptr> MetricUInt8(string_view name) = 0; + virtual shared_ptr> MetricUInt64(string_view name) = 0; + virtual shared_ptr> MetricFixedString(int len, string_view name) = 0; + + // Records: Data columns you wouldn't want to filter by. 
+ virtual shared_ptr> RecordInt32(string_view name) = 0; + virtual shared_ptr> RecordInt64(string_view name) = 0; + virtual shared_ptr> RecordDateTime(string_view name) = 0; + virtual shared_ptr> RecordString(string_view name) = 0; + + virtual void BeginBatch() = 0; + virtual void BeginRow() = 0; + virtual bool CommitBatch() = 0; + + virtual bool EstablishNewConnection() = 0; + virtual bool IsConnected() const = 0; + virtual int NumConsecutiveFailures() const = 0; + virtual void ResetFailures() = 0; + virtual int NumExported() const = 0; + + virtual ~StatsExporter() = default; +}; + +// Allows PG logging of exceptional cases without postgres.h +void LogNegativeValue(const std::string &column_name, int64_t value); + +// Expected usage: +// void ProcessBatch(StatsExporter *exporter) { +// exporter->BeginBatch(); // no op or ClickHouse column reset +// auto col_user = exporter->TagString("username"); +// auto col_rows = exporter->MetricUInt64("rows"); +// +// for (const auto &ev : events) { +// exporter->BeginRow(); // no-op or initialize tag map +// col_user->Append(ev.username); +// col_rows->Append(ev.rows); +// } +// +// exporter->CommitBatch(); // Inserts or collects stats +// } + +#endif // PG_STAT_CH_SRC_EXPORT_EXPORTER_INTERFACE_H_ diff --git a/src/export/otel_exporter.cc b/src/export/otel_exporter.cc new file mode 100644 index 0000000..e58aaf7 --- /dev/null +++ b/src/export/otel_exporter.cc @@ -0,0 +1,361 @@ +#include "opentelemetry/logs/provider.h" +#include "opentelemetry/metrics/meter.h" +#include "opentelemetry/metrics/provider.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "config/guc.h" +#include "export/exporter_interface.h" + +#include +#include +#include + +namespace { + +namespace logs = opentelemetry::logs; +namespace logs_sdk = opentelemetry::sdk::logs; +namespace metrics = opentelemetry::metrics; +namespace metrics_sdk = opentelemetry::sdk::metrics; +namespace nostd = 
opentelemetry::nostd; +namespace otlp = opentelemetry::exporter::otlp; +namespace otel_common = opentelemetry::common; + +// Because Microsoft ruins everything +template using otel_shared_ptr = nostd::shared_ptr; +template using otel_unique_ptr = nostd::unique_ptr; + +class OTelExporter : public StatsExporter { + public: + void BeginBatch() final { + if (row_active) EndRow(); // Just in case + columns.clear(); + } + + void BeginRow() final { + if (row_active) EndRow(); // We don't make the user call this + current_row_tags.clear(); + current_log_record = logger->CreateLogRecord(); + row_active = true; + } + + bool CommitBatch() final; + + shared_ptr> TagString(string_view name) final { + return Wrap>(name); + } + + shared_ptr> MetricInt16(string_view name) final { + return Wrap>(name); + } + shared_ptr> MetricInt32(string_view name) final { + return Wrap>(name); + } + shared_ptr> MetricInt64(string_view name) final { + return Wrap>(name); + } + shared_ptr> MetricUInt8(string_view name) final { + return Wrap>(name); + } + shared_ptr> MetricUInt64(string_view name) final { + return Wrap>(name); + } + shared_ptr> MetricFixedString(int, string_view name) final { + return Wrap(name); + } + + shared_ptr> RecordInt32(string_view name) final { + return Wrap>(name); + } + shared_ptr> RecordInt64(string_view name) final { + return Wrap>(name); + } + shared_ptr> RecordDateTime(string_view name) final { + return Wrap>(name); + } + shared_ptr> RecordString(string_view name) final { + return Wrap>(name); + } + + bool EstablishNewConnection() final; + bool IsConnected() const final { return metrics_provider && log_provider; } + int NumConsecutiveFailures() const final { return consecutive_failures; } + void ResetFailures() final { consecutive_failures = 0; } + int NumExported() const final { return exported_count; } + + private: + void EndRow() { + if (!row_active) return; + + for (auto& col : columns) { + col->Crunch(); // Crunch happens for each row in OTel, not just the batch 
+ } + + logger->EmitLogRecord(std::move(current_log_record)); + row_active = false; + ++exported_count; + } + + // ===================================================================== + // Column implementation classes (translate OTel concepts to CH Columns) + // ===================================================================== + + // No Instrument, Tag Column: Applies a tag to all metrics in the row. + template class TagColumn : public Column { + public: + TagColumn(OTelExporter* e, string_view n) : exp(e), name(n) {} + + void Append(const T& val) final { + // Always add values to the log record. + exp->current_log_record->SetAttribute(name, val); + // Convert to string and store in the shared map for THIS row + exp->current_row_tags[name] = to_string(val); + } + void Crunch() final {} // Nothing to do, tags are passive at EndRow + + static string to_string(const string &x) { return x; } + static string to_string(string_view x) { return string(x); } + template static string to_string(U x) { return std::to_string(x); } + + private: + OTelExporter* exp; + string name; + }; + + // Histogram Instrument Metric Column: Buckets numeric metrics. + template + class HistogramColumn : public Column { + public: + HistogramColumn(OTelExporter* e, string_view n) + : exp(e), name(n), instrument(exp->GetUnsignedHistogram(name)) {} + + void Append(const T& val) final { + // Always add values to the log record. + exp->current_log_record->SetAttribute(name, val); + // Stash the value until later, when all tags have been gathered + // implicit cast from T (int32_t) to int64_t happens here + if (val < 0) { + LogNegativeValue(name, static_cast(val)); + stash_val = 0; + } else { + stash_val = static_cast(val); + } + } + + void Crunch() final { + instrument->Record(stash_val, exp->current_row_tags, {}); + } + + private: + OTelExporter* exp; + string name; + otel_shared_ptr> instrument; + uint64_t stash_val = 0; + }; + + // 3. 
Counter Instrument Metric Column (for histograms of specific tag values) + class CounterColumn : public Column { + public: + CounterColumn(OTelExporter* e, string_view n): + exp(e), name(n), instrument(exp->GetUnsignedCounter(name + ".count")) {} + + void Append(const string_view& val) final { + stash_val = std::string(val); + exp->current_log_record->SetAttribute(name, stash_val); + } + + void Crunch() final { + // 1. Copy the shared tags + auto tags_with_value = exp->current_row_tags; + // 2. Inject the string value as a tag for this specific metric + tags_with_value[name] = stash_val; + // 3. Increment counter for this tag combination + instrument->Add(1, tags_with_value); + } + + private: + OTelExporter* exp; + string name; + string stash_val; + otel_shared_ptr> instrument; + }; + + // 4. Record Only Data Column: No metrics, just logs. + // (Note that all columns are put in records; these just do nothing else.) + template class RecordOnlyColumn : public Column { + public: + RecordOnlyColumn(OTelExporter* e, string_view n) : exp(e), name(n) {} + + void Append(const T& val) final { + assert(exp->row_active && exp->current_log_record); + exp->current_log_record->SetAttribute(name, NoStringViews(val)); + } + void Crunch() final {} // No metrics to emit + + template const U &NoStringViews(const U &x) { return x; } + string NoStringViews(std::string_view x) { return string{x}; } + + private: + OTelExporter* exp; + std::string name; + }; + + template + std::shared_ptr Wrap(string_view name) { + auto col = std::make_shared(this, name); + columns.push_back(col); // Keep alive for the batch + return col; + } + + otel_shared_ptr> GetUnsignedHistogram(std::string_view name) { + // Insert a nullptr placeholder. + // 'inserted' is true if the key didn't exist. + // 'it' points to the element (either new or existing). 
+ auto [it, inserted] = histogram_cache.insert(HistogramMap::value_type{name, nullptr}); + if (inserted) { + // Only create the heavy OTel object if we actually inserted a new key + it->second = meter->CreateUInt64Histogram(it->first, "description", "unit"); + } + return it->second; + } + + otel_shared_ptr> GetUnsignedCounter(std::string_view name) { + auto [it, inserted] = counter_cache.insert(CounterMap::value_type{name, nullptr}); + if (inserted) { + it->second = meter->CreateUInt64Counter(it->first, "description", "unit"); + } + return it->second; + } + + // ===================================================================== + // OTel connection state + // ===================================================================== + otel_shared_ptr meter; + otel_shared_ptr logger; + shared_ptr metrics_provider; + shared_ptr metrics_reader; + shared_ptr log_provider; + int consecutive_failures = 0; + int exported_count = 0; + + using HistogramMap = + std::map>, std::less<>>; + HistogramMap histogram_cache; + using CounterMap = + std::map>, std::less<>>; + CounterMap counter_cache; + + // Row state + bool row_active = false; + std::map current_row_tags; + otel_unique_ptr current_log_record; + + std::vector> columns; +}; + + +const char *def(const char *val, const char *default_) { + return val && *val ? 
val : default_; +} + +const char* GetAHostname(const char* fallback) { + if (psch_hostname && *psch_hostname) return psch_hostname; + const char* env = getenv("HOSTNAME"); + if (env && *env) return env; + return fallback; +} + +bool OTelExporter::EstablishNewConnection() { + try { + const std::string hostname = GetAHostname("postgres-primary"); + const std::string endpoint = def(psch_otel_endpoint, "localhost:4317"); + const std::string pgch_version = PG_STAT_CH_VERSION; + + // Resource (The "ID Card" for our service) + auto resource_attributes = opentelemetry::sdk::resource::ResourceAttributes{ + {"service.name", "pg_stat_ch"}, + {"service.version", pgch_version}, + {"host.name", hostname} // Ideally fetch real hostname + }; + auto resource = opentelemetry::sdk::resource::Resource::Create(resource_attributes); + + // Configure Metrics + // ------------------------------------------------------------------------- + otlp::OtlpGrpcMetricExporterOptions metric_opts; + metric_opts.endpoint = endpoint; + + // Configure Reader (Manual Flush Mode) + metrics_sdk::PeriodicExportingMetricReaderOptions reader_opts; + reader_opts.export_interval_millis = std::chrono::milliseconds::max(); + reader_opts.export_timeout_millis = std::chrono::milliseconds(1000); + + metrics_reader = metrics_sdk::PeriodicExportingMetricReaderFactory::Create( + otlp::OtlpGrpcMetricExporterFactory::Create(metric_opts), + reader_opts); + + // Create the Provider with our Resource and add our Reader + // Note: We use the ViewRegistry (default) + metrics_provider = std::make_shared( + std::make_unique(), + resource); + metrics_provider->AddMetricReader(metrics_reader); + + // Configure Logs + // ------------------------------------------------------------------------- + otlp::OtlpGrpcLogRecordExporterOptions log_opts; + log_opts.endpoint = endpoint; + + // Create Logger Provider WITH the same Resource + log_provider = std::make_shared( + logs_sdk::SimpleLogRecordProcessorFactory::Create( + 
otlp::OtlpGrpcLogRecordExporterFactory::Create(log_opts)), + resource); + + // Get Instruments + // ------------------------------------------------------------------------- + meter = metrics_provider->GetMeter("pg_stat_ch", pgch_version); + logger = log_provider->GetLogger("pg_stat_ch", "pg_stat_ch_logs"); + + return true; + } catch (const std::exception& e) { + // PschLog(LogLevel::Warning, "pg_stat_ch: OTel init failed: %s", e.what()); + return false; + } +} + + +bool OTelExporter::CommitBatch() { + // 1. Finish the last row logic (as discussed) + EndRow(); + + // Flush Metrics (The Reader scrapes and sends) + bool metrics_ok = metrics_reader->ForceFlush(std::chrono::seconds(1)); + + // Flush Logs (The Provider pushes the Processor to send) + bool logs_ok = log_provider->ForceFlush(std::chrono::seconds(1)); + + // Only count it as a success if both pipelines are healthy. + if (metrics_ok && logs_ok) { + ResetFailures(); + return true; + } else { + consecutive_failures++; + // PschLog(LogLevel::Warning, "pg_stat_ch: OTel export failed " + // "(Metrics: %d, Logs: %d)", metrics_ok, logs_ok); + return false; + } +} + +} // namespace + +std::unique_ptr MakeOpenTelemetryExporter() { + return std::make_unique(); +} diff --git a/src/export/otel_exporter.h b/src/export/otel_exporter.h new file mode 100644 index 0000000..239c7dc --- /dev/null +++ b/src/export/otel_exporter.h @@ -0,0 +1,10 @@ +#ifndef PG_STAT_CH_SRC_EXPORT_OTEL_EXPORTER_H_ +#define PG_STAT_CH_SRC_EXPORT_OTEL_EXPORTER_H_ + +#include + +#include "export/exporter_interface.h" + +std::unique_ptr MakeOpenTelemetryExporter(); + +#endif // PG_STAT_CH_SRC_EXPORT_OTEL_EXPORTER_H_ diff --git a/src/export/stats_exporter.cc b/src/export/stats_exporter.cc new file mode 100644 index 0000000..c547e53 --- /dev/null +++ b/src/export/stats_exporter.cc @@ -0,0 +1,345 @@ +// pg_stat_ch statistics exporter implementation + +extern "C" { +#include "postgres.h" +} + +#include +#include +#include +#include + +#include 
"config/guc.h" +#include "export/stats_exporter.h" +#include "export/exporter_interface.h" +#include "export/clickhouse_exporter.h" +#include "export/otel_exporter.h" +#include "queue/event.h" +#include "queue/shmem.h" + + +namespace { + +// PostgreSQL epoch is 2000-01-01, Unix epoch is 1970-01-01 +// Difference is 946684800 seconds = 946684800000000 microseconds +constexpr int64_t kPostgresEpochOffsetUs = 946684800000000LL; + +// Exponential backoff constants +constexpr int kBaseDelayMs = 1000; // 1 second +constexpr int kMaxDelayMs = 60000; // 60 seconds +constexpr int kMaxConsecutiveFailures = 10; // Cap for exponential growth + +// Exporter state - encapsulates all bgworker-local state +struct ExporterState { + std::unique_ptr exporter; +}; + +// Bgworker-local exporter state (no locking needed) +ExporterState g_exporter; + +// Convert PschCmdType to string +const char* CmdTypeToString(PschCmdType cmd) { + switch (cmd) { + case PSCH_CMD_SELECT: + return "SELECT"; + case PSCH_CMD_UPDATE: + return "UPDATE"; + case PSCH_CMD_INSERT: + return "INSERT"; + case PSCH_CMD_DELETE: + return "DELETE"; + case PSCH_CMD_MERGE: + return "MERGE"; + case PSCH_CMD_UTILITY: + return "UTILITY"; + case PSCH_CMD_NOTHING: + return "NOTHING"; + default: + return "UNKNOWN"; + } +} + +// Dequeue events from the shared memory queue +std::vector DequeueEvents(int max_events) { + std::vector events; + events.reserve(max_events); + + PschEvent event; + while (events.size() < static_cast(max_events) && PschDequeueEvent(&event)) { + events.push_back(event); + } + return events; +} + +// Build and export stats (records, metrics, ClickHouse rows) from events +void ExportEventStats(const std::vector& events, StatsExporter *exporter) { + elog(DEBUG1, "pg_stat_ch: ExportEventStats() called with %zu events", events.size()); + + exporter->BeginBatch(); + + elog(DEBUG2, "pg_stat_ch: creating column objects"); + + // Basic columns + elog(DEBUG3, "pg_stat_ch: creating col_ts_start"); + auto col_ts_start 
= exporter->RecordDateTime("ts_start"); + elog(DEBUG3, "pg_stat_ch: col_ts_start created"); + auto col_duration_us = exporter->MetricUInt64("duration_us"); + // Use pre-resolved names from event (resolved at capture time in hooks) + auto col_db = exporter->TagString("db"); + auto col_username = exporter->TagString("username"); + elog(DEBUG3, "pg_stat_ch: basic columns created"); + auto col_pid = exporter->RecordInt32("pid"); + auto col_query_id = exporter->RecordInt64("query_id"); + auto col_cmd_type = exporter->RecordString("cmd_type"); + auto col_rows = exporter->MetricUInt64("rows"); + auto col_query = exporter->RecordString("query"); + elog(DEBUG3, "pg_stat_ch: all basic columns created"); + + // Buffer usage columns + auto col_shared_blks_hit = exporter->MetricInt64("shared_blks_hit"); + auto col_shared_blks_read = exporter->MetricInt64("shared_blks_read"); + auto col_shared_blks_dirtied = exporter->MetricInt64("shared_blks_dirtied"); + auto col_shared_blks_written = exporter->MetricInt64("shared_blks_written"); + auto col_local_blks_hit = exporter->MetricInt64("local_blks_hit"); + auto col_local_blks_read = exporter->MetricInt64("local_blks_read"); + auto col_local_blks_dirtied = exporter->MetricInt64("local_blks_dirtied"); + auto col_local_blks_written = exporter->MetricInt64("local_blks_written"); + auto col_temp_blks_read = exporter->MetricInt64("temp_blks_read"); + auto col_temp_blks_written = exporter->MetricInt64("temp_blks_written"); + + // I/O timing columns + auto col_shared_blk_read_time_us = exporter->MetricInt64("shared_blk_read_time_us"); + auto col_shared_blk_write_time_us = exporter->MetricInt64("shared_blk_write_time_us"); + auto col_local_blk_read_time_us = exporter->MetricInt64("local_blk_read_time_us"); + auto col_local_blk_write_time_us = exporter->MetricInt64("local_blk_write_time_us"); + auto col_temp_blk_read_time_us = exporter->MetricInt64("temp_blk_read_time_us"); + auto col_temp_blk_write_time_us = 
exporter->MetricInt64("temp_blk_write_time_us"); + + // WAL usage columns + auto col_wal_records = exporter->MetricInt64("wal_records"); + auto col_wal_fpi = exporter->MetricInt64("wal_fpi"); + auto col_wal_bytes = exporter->MetricUInt64("wal_bytes"); + + // CPU time columns + auto col_cpu_user_time_us = exporter->MetricInt64("cpu_user_time_us"); + auto col_cpu_sys_time_us = exporter->MetricInt64("cpu_sys_time_us"); + + // JIT columns + auto col_jit_functions = exporter->MetricInt32("jit_functions"); + auto col_jit_generation_time_us = exporter->MetricInt32("jit_generation_time_us"); + auto col_jit_deform_time_us = exporter->MetricInt32("jit_deform_time_us"); + auto col_jit_inlining_time_us = exporter->MetricInt32("jit_inlining_time_us"); + auto col_jit_optimization_time_us = exporter->MetricInt32("jit_optimization_time_us"); + auto col_jit_emission_time_us = exporter->MetricInt32("jit_emission_time_us"); + + // Parallel worker columns + auto col_parallel_workers_planned = exporter->MetricInt16("parallel_workers_planned"); + auto col_parallel_workers_launched = exporter->MetricInt16("parallel_workers_launched"); + + elog(DEBUG3, "pg_stat_ch: creating error columns"); + // Error columns + auto col_err_sqlstate = exporter->MetricFixedString(5, "err_sqlstate"); + auto col_err_elevel = exporter->MetricUInt8("err_elevel"); + auto col_err_message = exporter->RecordString("err_message"); + elog(DEBUG3, "pg_stat_ch: error columns created"); + + // Client context columns; records rather than tags (no histogram in OTel) + auto col_app = exporter->RecordString("app"); + auto col_client_addr = exporter->RecordString("client_addr"); + + elog(DEBUG2, "pg_stat_ch: all columns created, starting event loop"); + size_t event_idx = 0; + for (const auto& ev : events) { + elog(DEBUG2, "pg_stat_ch: processing event %zu: pid=%d, query_len=%u", event_idx, ev.pid, + ev.query_len); + exporter->BeginRow(); + + int64_t unix_us = ev.ts_start + kPostgresEpochOffsetUs; + 
col_ts_start->Append(unix_us); + col_duration_us->Append(ev.duration_us); + + // Use pre-resolved names from event (resolved at capture time in hooks) + col_db->Append(std::string(ev.datname, ev.datname_len)); + col_username->Append(std::string(ev.username, ev.username_len)); + + col_pid->Append(ev.pid); + col_query_id->Append(static_cast(ev.queryid)); + col_cmd_type->Append(CmdTypeToString(ev.cmd_type)); + col_rows->Append(ev.rows); + + // Validate query_len before using it + uint16 safe_query_len = ev.query_len; + if (safe_query_len > PSCH_MAX_QUERY_LEN) { + elog(WARNING, "pg_stat_ch: event %zu has invalid query_len %u, clamping", event_idx, + safe_query_len); + safe_query_len = PSCH_MAX_QUERY_LEN; + } + col_query->Append(std::string(ev.query, safe_query_len)); + + elog(DEBUG3, "pg_stat_ch: event %zu - buffer usage", event_idx); + // Buffer usage + col_shared_blks_hit->Append(ev.shared_blks_hit); + col_shared_blks_read->Append(ev.shared_blks_read); + col_shared_blks_dirtied->Append(ev.shared_blks_dirtied); + col_shared_blks_written->Append(ev.shared_blks_written); + col_local_blks_hit->Append(ev.local_blks_hit); + col_local_blks_read->Append(ev.local_blks_read); + col_local_blks_dirtied->Append(ev.local_blks_dirtied); + col_local_blks_written->Append(ev.local_blks_written); + col_temp_blks_read->Append(ev.temp_blks_read); + col_temp_blks_written->Append(ev.temp_blks_written); + + elog(DEBUG3, "pg_stat_ch: event %zu - I/O timing", event_idx); + // I/O timing + col_shared_blk_read_time_us->Append(ev.shared_blk_read_time_us); + col_shared_blk_write_time_us->Append(ev.shared_blk_write_time_us); + col_local_blk_read_time_us->Append(ev.local_blk_read_time_us); + col_local_blk_write_time_us->Append(ev.local_blk_write_time_us); + col_temp_blk_read_time_us->Append(ev.temp_blk_read_time_us); + col_temp_blk_write_time_us->Append(ev.temp_blk_write_time_us); + + elog(DEBUG3, "pg_stat_ch: event %zu - WAL usage", event_idx); + // WAL usage + 
col_wal_records->Append(ev.wal_records); + col_wal_fpi->Append(ev.wal_fpi); + col_wal_bytes->Append(ev.wal_bytes); + + elog(DEBUG3, "pg_stat_ch: event %zu - CPU time", event_idx); + // CPU time + col_cpu_user_time_us->Append(ev.cpu_user_time_us); + col_cpu_sys_time_us->Append(ev.cpu_sys_time_us); + + elog(DEBUG3, "pg_stat_ch: event %zu - JIT", event_idx); + // JIT + col_jit_functions->Append(ev.jit_functions); + col_jit_generation_time_us->Append(ev.jit_generation_time_us); + col_jit_deform_time_us->Append(ev.jit_deform_time_us); + col_jit_inlining_time_us->Append(ev.jit_inlining_time_us); + col_jit_optimization_time_us->Append(ev.jit_optimization_time_us); + col_jit_emission_time_us->Append(ev.jit_emission_time_us); + + elog(DEBUG3, "pg_stat_ch: event %zu - parallel workers", event_idx); + // Parallel workers + col_parallel_workers_planned->Append(ev.parallel_workers_planned); + col_parallel_workers_launched->Append(ev.parallel_workers_launched); + + elog(DEBUG3, "pg_stat_ch: event %zu - error info", event_idx); + // Error info (5-char SQLSTATE, trimmed) + col_err_sqlstate->Append(std::string_view(ev.err_sqlstate, 5)); + col_err_elevel->Append(ev.err_elevel); + // Error message (validate length) + uint16 safe_err_msg_len = ev.err_message_len; + if (safe_err_msg_len > PSCH_MAX_ERR_MSG_LEN) { + elog(WARNING, "pg_stat_ch: event %zu has invalid err_message_len %u, clamping", event_idx, + safe_err_msg_len); + safe_err_msg_len = PSCH_MAX_ERR_MSG_LEN; + } + col_err_message->Append(std::string(ev.err_message, safe_err_msg_len)); + + elog(DEBUG3, "pg_stat_ch: event %zu - client context (app_len=%u, addr_len=%u)", event_idx, + ev.application_name_len, ev.client_addr_len); + // Client context - validate lengths + uint8 safe_app_len = ev.application_name_len; + if (safe_app_len > 63) { + elog(WARNING, "pg_stat_ch: event %zu has invalid app_name_len %u, clamping", event_idx, + safe_app_len); + safe_app_len = 63; + } + uint8 safe_addr_len = ev.client_addr_len; + if 
(safe_addr_len > 45) { + elog(WARNING, "pg_stat_ch: event %zu has invalid client_addr_len %u, clamping", event_idx, + safe_addr_len); + safe_addr_len = 45; + } + col_app->Append(std::string(ev.application_name, safe_app_len)); + col_client_addr->Append(std::string(ev.client_addr, safe_addr_len)); + + event_idx++; + } + elog(DEBUG1, "pg_stat_ch: finished processing %zu events", event_idx); +} + +} // namespace + +// Used to report negative values, which are not supported by OTel. +void LogNegativeValue(const std::string &column_name, int64_t value) { + static std::chrono::steady_clock::time_point last_log = {}; + auto now = std::chrono::steady_clock::now(); + if (now - last_log > std::chrono::seconds(1)) { + elog(WARNING, "pg_stat_ch: Negative value %ld clamped to 0 for column `%s`", + value, column_name.c_str()); + last_log = now; + } +} + +extern "C" { + +bool PschExporterInit(void) { + if (psch_use_otel) { + g_exporter.exporter = MakeOpenTelemetryExporter(); + } else { + g_exporter.exporter = MakeClickHouseExporter(); + } + return g_exporter.exporter->EstablishNewConnection(); +} + +int PschExportBatch(void) { + elog(DEBUG1, "pg_stat_ch: PschExportBatch() called"); + StatsExporter *exporter = g_exporter.exporter.get(); + + if (!exporter->IsConnected()) { + elog(DEBUG1, "pg_stat_ch: client is null, initializing"); + if (!exporter->EstablishNewConnection()) { + PschRecordExportFailure("Failed to connect to exporter backend"); + return 0; + } + } + + elog(DEBUG1, "pg_stat_ch: dequeuing events (max=%d)", psch_batch_max); + std::vector events = DequeueEvents(psch_batch_max); + if (events.empty()) { + elog(DEBUG1, "pg_stat_ch: no events to export"); + return 0; + } + + elog(DEBUG1, "pg_stat_ch: exporting batch of %zu events", events.size()); + ExportEventStats(events, exporter); + + if (exporter->CommitBatch()) { + if (psch_shared_state != nullptr) { + pg_atomic_fetch_add_u64(&psch_shared_state->exported, exporter->NumExported()); + } + PschRecordExportSuccess(); + } + 
+ return exporter->NumExported(); +} + +void PschResetRetryState(void) { + if (g_exporter.exporter) + g_exporter.exporter->ResetFailures(); +} + +int PschGetRetryDelayMs(void) { + StatsExporter *exporter = g_exporter.exporter.get(); + if (!exporter || exporter->NumConsecutiveFailures() <= 0) { + return 0; + } + // Exponential backoff: base * 2^(failures-1), capped at max + int capped_failures = (exporter->NumConsecutiveFailures() > kMaxConsecutiveFailures) + ? kMaxConsecutiveFailures + : exporter->NumConsecutiveFailures(); + int delay = kBaseDelayMs * (1 << (capped_failures - 1)); + return (delay > kMaxDelayMs) ? kMaxDelayMs : delay; +} + +int PschGetConsecutiveFailures(void) { + return g_exporter.exporter->NumConsecutiveFailures(); +} + +void PschExporterShutdown(void) { + g_exporter.exporter.reset(); + elog(LOG, "pg_stat_ch: statistics exporter shutdown"); +} + +} // extern "C" diff --git a/src/export/stats_exporter.h b/src/export/stats_exporter.h new file mode 100644 index 0000000..14a48e6 --- /dev/null +++ b/src/export/stats_exporter.h @@ -0,0 +1,29 @@ +// pg_stat_ch statistics exporter +#ifndef PG_STAT_CH_SRC_EXPORT_STATS_EXPORTER_H_ +#define PG_STAT_CH_SRC_EXPORT_STATS_EXPORTER_H_ + +#ifdef __cplusplus +extern "C" { +#endif + +#include "postgres.h" + +// Initialize the statistics exporter (called once at bgworker startup) +bool PschExporterInit(void); + +// Export one batch. Returns number of events exported (0 = queue empty or error). 
+int PschExportBatch(void); + +// Shutdown the exporter and close connection +void PschExporterShutdown(void); + +// Retry state management for exponential backoff +void PschResetRetryState(void); +int PschGetRetryDelayMs(void); +int PschGetConsecutiveFailures(void); + +#ifdef __cplusplus +} +#endif + +#endif // PG_STAT_CH_SRC_EXPORT_STATS_EXPORTER_H_ diff --git a/src/worker/bgworker.cc b/src/worker/bgworker.cc index 7439e1b..4983142 100644 --- a/src/worker/bgworker.cc +++ b/src/worker/bgworker.cc @@ -41,7 +41,7 @@ extern "C" { #include #include "config/guc.h" -#include "export/clickhouse_exporter.h" +#include "export/stats_exporter.h" #include "worker/bgworker.h" // Custom wait event for pg_stat_activity visibility @@ -164,7 +164,7 @@ static int CalculateSleepMs() { // // Note on blocking: If we're blocked in ClickHouse network I/O when a barrier // signal arrives, we can't process it until the I/O completes. The socket -// timeouts configured in clickhouse_exporter.cc (30 seconds) bound this delay. +// timeouts configured in stats_exporter.cc (30 seconds) bound this delay. static void RunExportCycle(uint32 wait_event) { int sleep_ms = CalculateSleepMs(); (void)WaitLatch(MyLatch, WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, sleep_ms, wait_event); diff --git a/t/024_otel_export.pl b/t/024_otel_export.pl new file mode 100644 index 0000000..d21e4ab --- /dev/null +++ b/t/024_otel_export.pl @@ -0,0 +1,174 @@ +#!/usr/bin/env perl +# Test: OTel export functionality +# Prerequisites: OTel Collector container must be running + +use strict; +use warnings; +use lib 't'; + +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More; + +use psch; + +# Skip if Docker not available +if (!psch_otel_available()) { + plan skip_all => 'Docker not available, skipping OTel tests'; +} + +# Check if OTel Collector is running +my $health_check = `curl -sf http://localhost:13133/ 2>/dev/null`; +if ($? != 0) { + plan skip_all => 'OTel Collector not running. 
Start with: ./scripts/run-tests.sh otel';
}

# Spin up a node with OTel export enabled and a short flush interval so the
# subtests below do not have to wait long for batches to go out.
my $node = psch_init_node_with_otel('otel_export',
    flush_interval_ms => 100,
    batch_max         => 100
);

# Test 1: Basic export - run queries with unique markers, verify they appear in JSONL
subtest 'basic export' => sub {
    psch_reset_stats($node);

    $node->safe_psql('postgres', "SELECT 'otel_basic_$_'") for 1 .. 3;

    my $stats = psch_get_stats($node);
    cmp_ok($stats->{enqueued}, '>=', 3, 'Events enqueued');

    # Wait for export via the stats counter before checking the JSONL file.
    psch_wait_for_otel_export($node, 3, 15);
    my $exported = psch_get_stats($node)->{exported};
    cmp_ok($exported, '>=', 3, 'Events exported via OTel');

    # Verify the marker queries actually landed in the collector's output.
    my $found = psch_count_otel_logs_matching(query => qr/otel_basic_/);
    cmp_ok($found, '>=', 3, "Marker queries in JSONL (got $found)");
};

# Test 2: Batch sizing - verify all events export when count exceeds batch_max
subtest 'batch sizing' => sub {
    psch_reset_stats($node);

    # Generate more events than batch_max (100) so multiple batches are needed.
    $node->safe_psql('postgres', "SELECT 'otel_batch_$_'") for 1 .. 150;

    psch_wait_for_otel_export($node, 150, 20);

    my $stats = psch_get_stats($node);
    cmp_ok($stats->{exported}, '>=', 150, 'All events exported');

    my $found = psch_count_otel_logs_matching(query => qr/otel_batch_/);
    cmp_ok($found, '>=', 150, "All batch events in JSONL (got $found)");
};

# Test 3: Immediate flush via pg_stat_ch_flush()
subtest 'immediate flush' => sub {
    psch_reset_stats($node);

    $node->safe_psql('postgres', "SELECT 'otel_flush_marker'");
    $node->safe_psql('postgres', 'SELECT pg_stat_ch_flush()');

    # Poll for the marker to appear in the JSONL (no arbitrary sleep).
    my $found    = 0;
    my $deadline = time() + 10;
    while (time() < $deadline) {
        $found = psch_count_otel_logs_matching(query => qr/otel_flush_marker/);
        last if $found >= 1;
        select(undef, undef, undef, 0.5);
    }
    cmp_ok($found, '>=', 1, "Flush triggered export (found marker in JSONL)");
};

# Test 4: All fields populated - verify attributes on specific events
subtest 'all fields populated' => sub {
    psch_reset_stats($node);

    $node->safe_psql('postgres',
        'CREATE TABLE IF NOT EXISTS otel_fields(id int, data text)');
    $node->safe_psql('postgres',
        "INSERT INTO otel_fields VALUES (1, 'otel_fields_test')");
    $node->safe_psql('postgres',
        "SELECT 'otel_fields_select' FROM otel_fields");
    $node->safe_psql('postgres', 'SELECT pg_stat_ch_flush()');

    # Poll until the SELECT marker shows up in the parsed records (or timeout).
    my $deadline = time() + 15;
    my $records;
    while (time() < $deadline) {
        $records = psch_read_otel_log_records();
        my $hits = grep {
            defined $_->{query} && $_->{query} =~ /otel_fields_select/
        } @$records;
        last if $hits >= 1;
        select(undef, undef, undef, 0.5);
    }

    # Pick out our specific SELECT record for attribute checks.
    my ($select_rec) = grep {
        defined $_->{query} && $_->{query} =~ /otel_fields_select/
    } @$records;

    ok(defined $select_rec, 'Found SELECT marker record');
    SKIP: {
        skip 'No SELECT record found', 4 unless defined $select_rec;

        cmp_ok($select_rec->{duration_us} // 0, '>', 0, 'duration_us > 0');
        is($select_rec->{db}, 'postgres', 'db = postgres');
        is($select_rec->{cmd_type}, 'SELECT', 'cmd_type = SELECT');
        ok(defined $select_rec->{pid} && $select_rec->{pid} > 0,
            'pid is populated');
    }

    # The INSERT that seeded the table should have been exported too.
    my $insert_count =
        psch_count_otel_logs_matching(query => qr/otel_fields_test/);
    cmp_ok($insert_count, '>=', 1, 'INSERT event also present');

    $node->safe_psql('postgres', 'DROP TABLE IF EXISTS otel_fields');
};

# Test 5: Stats accuracy - exported_events tracks actual export count
subtest 'stats accuracy' => sub {
    psch_reset_stats($node);

    my $num_queries = 25;
    $node->safe_psql('postgres', "SELECT 'otel_stats_$_'")
        for 1 .. $num_queries;

    $node->safe_psql('postgres', 'SELECT pg_stat_ch_flush()');
    psch_wait_for_otel_export($node, $num_queries, 15);

    my $stats = psch_get_stats($node);
    # exported_events includes our queries plus overhead (stats, flush calls),
    # so it should be >= num_queries rather than exactly equal.
    cmp_ok($stats->{exported}, '>=', $num_queries,
        "exported_events ($stats->{exported}) >= $num_queries");

    # Verify the JSONL contains every marker query as well.
    my $found = psch_count_otel_logs_matching(query => qr/otel_stats_/);
    cmp_ok($found, '>=', $num_queries,
        "JSONL marker count ($found) >= $num_queries");
};

# Test 6: Connection failure handling - verify graceful behavior
subtest 'connection failure handling' => sub {
    psch_reset_stats($node);

    $node->safe_psql('postgres', 'SELECT 1');
    $node->safe_psql('postgres', 'SELECT 2');

    my $stats = psch_get_stats($node);
    cmp_ok($stats->{enqueued}, '>=', 2, 'Queries still enqueued');
    ok(1, 'PostgreSQL survived connection handling');
};

$node->stop();
done_testing();

# ---------------------------------------------------------------------------
# t/025_otel_reconnect.pl (new file in this patch, index 0000000..391ee67)
# ---------------------------------------------------------------------------
#!/usr/bin/env perl
# Test: OTel reconnection and failure recovery
# Prerequisites: OTel Collector container must be running

use strict;
use warnings;
use lib 't';

use PostgreSQL::Test::Cluster;
use PostgreSQL::Test::Utils;
use Test::More;

use psch;

# Skip the whole file when Docker is unavailable.
if (!psch_otel_available()) {
    plan skip_all => 'Docker not available, skipping OTel tests';
}

# Skip when the OTel Collector's health endpoint does not answer.
my $health_check = `curl -sf http://localhost:13133/ 2>/dev/null`;
if ($? != 0) {
    plan skip_all => 'OTel Collector not running. 
Start with: ./scripts/run-tests.sh otel';
}

# psch_clear_otel_data() is a documented no-op kept for API compatibility:
# these tests match on unique query markers, so stale JSONL is harmless.
psch_clear_otel_data();

# Spin up a node with OTel export enabled.
my $node = psch_init_node_with_otel('otel_reconnect',
    flush_interval_ms => 200,
    batch_max         => 50
);

# Test 1: Verify initial connection works
subtest 'initial connection' => sub {
    psch_reset_stats($node);

    $node->safe_psql('postgres', 'SELECT 1');
    $node->safe_psql('postgres', 'SELECT 2');
    $node->safe_psql('postgres', 'SELECT pg_stat_ch_flush()');

    psch_wait_for_otel_export($node, 2, 15);

    my $stats = psch_get_stats($node);
    cmp_ok($stats->{exported}, '>=', 2, 'Initial export succeeded');
    is($stats->{send_failures}, 0, 'No initial send failures');
};

# Test 2: Simulate connection failure and recovery
subtest 'failure and recovery' => sub {
    plan skip_all => 'Destructive test - skipping in CI' if $ENV{CI};

    # Take the collector down, then generate traffic while it is unreachable.
    diag("Stopping OTel Collector container...");
    system("docker stop psch-otel-collector >/dev/null 2>&1");
    sleep(2);

    psch_reset_stats($node);
    $node->safe_psql('postgres', "SELECT $_") for 1 .. 10;
    $node->safe_psql('postgres', 'SELECT pg_stat_ch_flush()');
    sleep(2);

    my $stats_down = psch_get_stats($node);
    cmp_ok($stats_down->{enqueued}, '>=', 10,
        'Events still enqueued when collector down');

    # Bring the collector back and wait for its health endpoint (up to 30s).
    diag("Restarting OTel Collector container...");
    system("docker start psch-otel-collector >/dev/null 2>&1");

    for my $attempt (1 .. 30) {
        my $result = `curl -sf http://localhost:13133/ 2>/dev/null`;
        last if $? == 0;
        sleep(1);
    }
    sleep(2);

    # New traffic forces the exporter to reconnect.
    $node->safe_psql('postgres', "SELECT 'after_restart_$_'") for 1 .. 5;
    $node->safe_psql('postgres', 'SELECT pg_stat_ch_flush()');

    # Give the exporter time to resume before sampling stats.
    sleep(5);

    my $stats_up = psch_get_stats($node);

    # After restart, the exporter should have recovered.
    cmp_ok($stats_up->{exported}, '>=', 0,
        'Export counter accessible after recovery');

    # PostgreSQL itself must have survived the collector outage.
    my $pg_check = $node->safe_psql('postgres', 'SELECT 1');
    is($pg_check, '1', 'PostgreSQL still healthy after collector restart');
};

# Test 3: Verify failure tracking
subtest 'failure tracking' => sub {
    my $stats = psch_get_stats($node);

    ok(defined $stats->{send_failures}, 'send_failures field exists');
    cmp_ok($stats->{send_failures}, '>=', 0, 'send_failures is non-negative');
};

# Test 4: Verify error details are tracked
subtest 'error tracking' => sub {
    my $result = $node->safe_psql('postgres', q{
        SELECT send_failures FROM pg_stat_ch_stats()
    });

    ok(defined $result, 'Can query send_failures');
};

$node->stop();
done_testing();

# ---------------------------------------------------------------------------
# t/psch.pm (patch additions begin here, index d357292..3066e8e)
# ---------------------------------------------------------------------------
use JSON::PP;
use File::Temp;

# @EXPORT gains the OTel helpers (earlier entries from the original list,
# psch_init_node etc., are outside this hunk and elided here).
our @EXPORT = qw(
    psch_query_clickhouse_tsv
    psch_init_node_with_clickhouse
    psch_wait_for_export
    psch_otel_available
    psch_start_otel_collector
    psch_stop_otel_collector
    psch_clear_otel_data
    psch_read_otel_log_records
    psch_count_otel_logs_matching
    psch_get_otel_log_attribute
    psch_init_node_with_otel
    psch_wait_for_otel_export
);

# (hunk context: tail of the pre-existing sub psch_wait_for_export)
#     return $stats->{exported};
# }
#
# ============================================================================
# OpenTelemetry Integration Helpers
# ============================================================================

# Directory the OTel Collector's file exporter writes JSONL into.
# Taken from the environment when set (CI), otherwise a fixed default.
my $_otel_data_dir = $ENV{OTEL_DATA_DIR} // '/tmp/psch-otel-data';

# Check if Docker is available for OTel tests.  Returns true/false.
sub psch_otel_available {
    return system("docker ps >/dev/null 2>&1") == 0;
}

# Poll the collector's health-check endpoint once; true when healthy.
sub _otel_collector_healthy {
    return system("curl -sf http://localhost:13133/ >/dev/null 2>&1") == 0;
}

# Start the OTel Collector container and wait for it to become healthy.
# Returns the JSONL data dir path; dies if startup or the health check fails.
sub psch_start_otel_collector {
    my $project_dir  = $ENV{PROJECT_DIR} // '.';
    my $compose_file = "$project_dir/docker/docker-compose.otel.yml";

    # Create a temp dir for JSONL output unless one was supplied via env.
    $_otel_data_dir = $ENV{OTEL_DATA_DIR} // File::Temp::tempdir(
        'psch-otel-XXXX', TMPDIR => 1, CLEANUP => 1);

    # Collector runs as UID 10001; it needs write access to the data dir.
    chmod 0777, $_otel_data_dir;

    $ENV{OTEL_DATA_DIR} = $_otel_data_dir;

    # List-form system() bypasses the shell, so a PROJECT_DIR containing
    # spaces or shell metacharacters cannot break (or inject into) the
    # docker compose command line.
    system('docker', 'compose', '-f', $compose_file, 'up', '-d') == 0
        or die "Failed to start OTel Collector container";

    # Wait for the health check: 60 polls x 0.5s = up to 30 seconds.
    for my $attempt (1 .. 60) {
        return $_otel_data_dir if _otel_collector_healthy();
        select(undef, undef, undef, 0.5);
    }
    die "OTel Collector failed to become healthy";
}

# Stop the OTel Collector container and remove its volumes.
sub psch_stop_otel_collector {
    my $project_dir  = $ENV{PROJECT_DIR} // '.';
    my $compose_file = "$project_dir/docker/docker-compose.otel.yml";
    system('docker', 'compose', '-f', $compose_file, 'down', '-v');
}

# No-op kept for API compatibility; tests use content-based matching, so
# stale JSONL content never causes false positives.
sub psch_clear_otel_data { }

# Parse ALL JSONL log records the collector has written, flattening each
# OTLP log record's attributes into a plain {name => value} hash.
# Returns an arrayref of those hashes; [] when the file is absent/unreadable.
sub psch_read_otel_log_records {
    my $logs_file = "$_otel_data_dir/logs.jsonl";
    return [] unless -f $logs_file;

    open(my $fh, '<', $logs_file) or return [];
    my @records;

    while (my $line = <$fh>) {
        chomp $line;
        next unless $line;

        # Skip lines that are not valid JSON (partial writes, etc.).
        my $data = eval { decode_json($line) };
        next unless defined $data;

        # Navigate the OTLP structure: resourceLogs[].scopeLogs[].logRecords[]
        for my $rl (@{ $data->{resourceLogs} // [] }) {
            for my $sl (@{ $rl->{scopeLogs} // [] }) {
                for my $lr (@{ $sl->{logRecords} // [] }) {
                    push @records,
                        _flatten_attributes($lr->{attributes} // []);
                }
            }
        }
    }
    close $fh;
    return \@records;
}

# Flatten an OTLP attributes array ([{key => ..., value => {stringValue =>
# ...}}, ...]) into a {name => value} hash.  Unrecognized value types map to
# undef so the key's presence is still visible to callers.
sub _flatten_attributes {
    my ($attrs) = @_;
    my %flat;
    for my $attr (@$attrs) {
        my $val = $attr->{value};
        $flat{ $attr->{key} } =
              defined $val->{stringValue} ? $val->{stringValue}
            : defined $val->{intValue}    ? $val->{intValue}
            : defined $val->{doubleValue} ? $val->{doubleValue}
            : defined $val->{boolValue}   ? $val->{boolValue}
            :                               undef;
    }
    return \%flat;
}

# Count log records whose flattened attributes satisfy every criterion.
# Criterion values may be plain strings (compared with eq) or qr// patterns
# (compared with =~).  A record missing a criterion key never matches.
sub psch_count_otel_logs_matching {
    my (%criteria) = @_;
    my $count = 0;

    RECORD: for my $rec (@{ psch_read_otel_log_records() }) {
        for my $key (keys %criteria) {
            my $expected = $criteria{$key};
            my $actual   = $rec->{$key};
            next RECORD unless defined $actual;
            if (ref $expected eq 'Regexp') {
                next RECORD unless $actual =~ $expected;
            }
            else {
                next RECORD unless $actual eq $expected;
            }
        }
        $count++;
    }
    return $count;
}

# Accessor for a single flattened attribute on a record hash.
sub psch_get_otel_log_attribute {
    my ($record, $name) = @_;
    return $record->{$name};
}

# Initialize a PostgreSQL node with pg_stat_ch preloaded and OTel export on.
# Options (all optional, with defaults): queue_capacity (65536),
# flush_interval_ms (100), batch_max (1000), enabled ('on'),
# otel_endpoint ('localhost:4317'), hostname ('test-host').
sub psch_init_node_with_otel {
    my ($name, %opts) = @_;

    my $queue_capacity    = $opts{queue_capacity}    // 65536;
    my $flush_interval_ms = $opts{flush_interval_ms} // 100;
    my $batch_max         = $opts{batch_max}         // 1000;
    my $enabled           = $opts{enabled}           // 'on';
    my $otel_endpoint     = $opts{otel_endpoint}     // 'localhost:4317';
    my $hostname          = $opts{hostname}          // 'test-host';

    my $node = PostgreSQL::Test::Cluster->new($name);
    $node->init();
    $node->append_conf('postgresql.conf', qq{
shared_preload_libraries = 'pg_stat_ch'
pg_stat_ch.enabled = $enabled
pg_stat_ch.use_otel = on
pg_stat_ch.otel_endpoint = '$otel_endpoint'
pg_stat_ch.hostname = '$hostname'
pg_stat_ch.queue_capacity = $queue_capacity
pg_stat_ch.flush_interval_ms = $flush_interval_ms
pg_stat_ch.batch_max = $batch_max
});
    $node->start();
    $node->safe_psql('postgres', 'CREATE EXTENSION pg_stat_ch');

    return $node;
}

# Wait for events to be exported via OTel by polling the stats counter.
# Returns the exported count as soon as it reaches $min_expected, or the
# current count after $timeout_secs (default 15) so callers can assert on it.
sub psch_wait_for_otel_export {
    my ($node, $min_expected, $timeout_secs) = @_;
    $timeout_secs //= 15;

    my $start_time = time();
    while (time() - $start_time < $timeout_secs) {
        my $stats = psch_get_stats($node);
        return $stats->{exported} if $stats->{exported} >= $min_expected;
        select(undef, undef, undef, 0.5);
    }

    return psch_get_stats($node)->{exported};
}

1;

# ---------------------------------------------------------------------------
# (patch continues with non-Perl hunks)
#
# test/regression/expected/guc.out: the expected GUC list gains
#   pg_stat_ch.hostname, pg_stat_ch.otel_endpoint, pg_stat_ch.use_otel
#   and the row count changes from 13 to 16.
#
# third_party/opentelemetry-cpp: new submodule pinned at commit
#   3143f93098a5f8fe59b071f3e1de7aa485e8c834
# ---------------------------------------------------------------------------