Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.micrometer.core.instrument.binder.db;

import io.micrometer.core.instrument.*;
Expand All @@ -26,8 +27,11 @@
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.DoubleSupplier;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
* {@link MeterBinder} for a PostgreSQL database.
Expand All @@ -36,6 +40,7 @@
* @author Jon Schneider
* @author Johnny Lim
* @author Markus Dobel
* @apiNote Hari Mani
* @since 1.1.0
*/
@NullMarked
Expand All @@ -53,8 +58,16 @@ public class PostgreSQLDatabaseMetrics implements MeterBinder {

private static final String QUERY_BUFFERS_BACKEND = getBgWriterQuery("buffers_backend");

private static final Stat BACKEND_BUFFER_WRITES = new Stat("pg_stat_io", "SUM(writes)");

private static final String QUERY_BUFFERS_CHECKPOINT = getBgWriterQuery("buffers_checkpoint");

private static final Stat CHECKPOINTER_BUFFERS_WRITTEN = new Stat("pg_stat_checkpointer", "buffers_written");

private static final Stat TIMED_CHECKPOINTS_COUNT = new Stat("pg_stat_checkpointer", "num_timed");

private static final Stat REQUESTED_CHECKPOINTS_COUNT = new Stat("pg_stat_checkpointer", "num_requested");

private final String database;

private final DataSource postgresDataSource;
Expand Down Expand Up @@ -83,6 +96,8 @@ public class PostgreSQLDatabaseMetrics implements MeterBinder {

private final String queryTransactionCount;

private final Version serverVersion;

public PostgreSQLDatabaseMetrics(DataSource postgresDataSource, String database) {
this(postgresDataSource, database, Tags.empty());
}
Expand All @@ -103,6 +118,7 @@ public PostgreSQLDatabaseMetrics(DataSource postgresDataSource, String database,
this.queryBlockHits = getDBStatQuery(database, "blks_hit");
this.queryBlockReads = getDBStatQuery(database, "blks_read");
this.queryTransactionCount = getDBStatQuery(database, "xact_commit + xact_rollback");
this.serverVersion = getServerVersion();
}

private static Tag createDbTag(String database) {
Expand Down Expand Up @@ -273,10 +289,16 @@ private Long getDeadTupleCount() {
}

private Long getTimedCheckpointsCount() {
if (this.serverVersion.isAbove(Version.V17)) {
return runQuery(TIMED_CHECKPOINTS_COUNT.getQuery());
}
return runQuery(QUERY_TIMED_CHECKPOINTS_COUNT);
}

private Long getRequestedCheckpointsCount() {
if (this.serverVersion.isAbove(Version.V17)) {
return runQuery(REQUESTED_CHECKPOINTS_COUNT.getQuery());
}
return runQuery(QUERY_REQUESTED_CHECKPOINTS_COUNT);
}

Expand All @@ -285,10 +307,16 @@ private Long getBuffersClean() {
}

private Long getBuffersBackend() {
if (this.serverVersion.isAbove(Version.V17)) {
return runQuery(BACKEND_BUFFER_WRITES.getQuery());
}
return runQuery(QUERY_BUFFERS_BACKEND);
}

private Long getBuffersCheckpoint() {
if (this.serverVersion.isAbove(Version.V17)) {
return runQuery(CHECKPOINTER_BUFFERS_WRITTEN.getQuery());
}
return runQuery(QUERY_BUFFERS_CHECKPOINT);
}

Expand All @@ -309,17 +337,26 @@ Double resettableFunctionalCounter(String functionalCounterKey, DoubleSupplier f
return correctedValue;
}

private Version getServerVersion() {
return runQuery("SHOW server_version", resultSet -> resultSet.getString(1)).map(Version::parse)
.orElse(Version.EMPTY);
}

private Long runQuery(String query) {
return runQuery(query, resultSet -> resultSet.getLong(1)).orElse(0L);
}

private <T> Optional<T> runQuery(final String query, final ResultSetGetter<T> resultSetGetter) {
try (Connection connection = postgresDataSource.getConnection();
Statement statement = connection.createStatement();
ResultSet resultSet = statement.executeQuery(query)) {
if (resultSet.next()) {
return resultSet.getLong(1);
return Optional.of(resultSetGetter.get(resultSet));
}
}
catch (SQLException ignored) {
}
return 0L;
return Optional.empty();
}

private static String getDBStatQuery(String database, String statName) {
Expand Down Expand Up @@ -366,4 +403,76 @@ private Names() {

}

@FunctionalInterface
interface ResultSetGetter<T> {

T get(ResultSet resultSet) throws SQLException;

}

static class Stat {

private final String statView;

private final String statName;

public Stat(String statView, String statName) {
this.statView = statView;
this.statName = statName;
}

public String getQuery() {
return String.format("SELECT %s FROM %s;", this.statName, this.statView);
}

}

static final class Version {

private static final Pattern VERSION_PATTERN = Pattern.compile("^(\\d+\\.\\d+).*");

static final Version EMPTY = new Version(0, 0);

static final Version V17 = new Version(17, 0);

final int majorVersion;

final int minorVersion;

static Version parse(String versionString) {
try {
final Matcher matcher = VERSION_PATTERN.matcher(versionString);
if (!matcher.matches()) {
return EMPTY;
}
final String[] versionArr = matcher.group(1).split("\\.", 2);
return new Version(Integer.parseInt(versionArr[0]), Integer.parseInt(versionArr[1]));
}
catch (Exception exception) {
return EMPTY;
}
}

Version(int majorVersion, int minorVersion) {
this.majorVersion = majorVersion;
this.minorVersion = minorVersion;
}

public boolean isAbove(final Version other) {
if (this.majorVersion > other.majorVersion) {
return true;
}
if (this.majorVersion < other.majorVersion) {
return false;
}
return this.minorVersion >= other.minorVersion;
}

@Override
public String toString() {
return majorVersion + "." + minorVersion;
}

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
/*
* Copyright 2022 VMware, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.micrometer.core.instrument.binder.db;

import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tags;
import io.micrometer.core.instrument.search.RequiredSearch;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.postgresql.ds.PGSimpleDataSource;
import org.testcontainers.containers.PostgreSQLContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;

import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.List;

import static io.micrometer.core.instrument.binder.db.PostgreSQLDatabaseMetrics.Names.*;
import static org.assertj.core.api.Assertions.assertThat;

/**
* @author Hari Mani
*/
@Testcontainers
@Tag("docker")
class PostgreSQL17DatabaseMetricsIntegrationTest {

@Container
private final PostgreSQLContainer<?> postgres = new PostgreSQLContainer<>(getDockerImageName());

private final MeterRegistry registry = new SimpleMeterRegistry();

private DataSource dataSource;

private Tags tags;

// statistics are updated only every PGSTAT_STAT_INTERVAL, which is 500ms. Add a bit
// for stable tests.
private static final long PGSTAT_STAT_INTERVAL = 500L + 50L;

@BeforeEach
void setup() {
dataSource = createDataSource();
tags = Tags.of("database", postgres.getDatabaseName());

// tag::setup[]
new PostgreSQLDatabaseMetrics(dataSource, postgres.getDatabaseName()).bindTo(registry);
// end::setup[]
}

@Test
void gaugesAreNotZero() throws Exception {
/* create noise to increment gauges */
// tag::result[]
executeSql("CREATE TABLE gauge_test_table (val varchar(255))",
"INSERT INTO gauge_test_table (val) VALUES ('foo')", "UPDATE gauge_test_table SET val = 'bar'",
"SELECT * FROM gauge_test_table", "DELETE FROM gauge_test_table");
Thread.sleep(PGSTAT_STAT_INTERVAL);

final List<String> GAUGES = Arrays.asList(SIZE, CONNECTIONS, ROWS_DEAD, LOCKS);

for (String name : GAUGES) {
assertThat(get(name).gauge().value()).withFailMessage("Gauge " + name + " is zero.").isGreaterThan(0);
}
// end::result[]
}

@Test
void countersAreNotZero() throws Exception {
/* create noise to increment counters */
// @formatter:off
executeSql(
"CREATE TABLE counter_test_table (val varchar(255))",
"INSERT INTO counter_test_table (val) VALUES ('foo')",
"UPDATE counter_test_table SET val = 'bar'",
"SELECT * FROM counter_test_table",
"DELETE FROM counter_test_table"
);
// @formatter:on
Thread.sleep(PGSTAT_STAT_INTERVAL);

final List<String> COUNTERS = Arrays.asList(BLOCKS_HITS, BLOCKS_READS, TRANSACTIONS, ROWS_FETCHED,
ROWS_INSERTED, ROWS_UPDATED, ROWS_DELETED, BUFFERS_CHECKPOINT);

/*
* the following counters are zero on a clean database and hard to increase
* reliably
*/
final List<String> ZERO_COUNTERS = Arrays.asList(TEMP_WRITES, CHECKPOINTS_TIMED, CHECKPOINTS_REQUESTED,
BUFFERS_CLEAN, BUFFERS_BACKEND);

for (String name : COUNTERS) {
assertThat(get(name).functionCounter().count()).withFailMessage("Counter " + name + " is zero.")
.isGreaterThan(0);
}
}

@Test
void deadTuplesGaugeIncreases() throws Exception {
final double deadRowsBefore = get(ROWS_DEAD).gauge().value();

executeSql("CREATE TABLE dead_tuples_test_table (val varchar(255))",
"INSERT INTO dead_tuples_test_table (val) VALUES ('foo')",
"UPDATE dead_tuples_test_table SET val = 'bar'");

// wait for stats to be updated
Thread.sleep(PGSTAT_STAT_INTERVAL);
assertThat(get(ROWS_DEAD).gauge().value()).isGreaterThan(deadRowsBefore);
}

private DataSource createDataSource() {
final PGSimpleDataSource dataSource = new PGSimpleDataSource();
dataSource.setURL(postgres.getJdbcUrl());
dataSource.setUser(postgres.getUsername());
dataSource.setPassword(postgres.getPassword());
dataSource.setDatabaseName(postgres.getDatabaseName());
return dataSource;
}

private void executeSql(String... statements) throws SQLException {
try (final Connection connection = dataSource.getConnection()) {
executeSql(connection, statements);
}
}

private void executeSql(Connection connection, String... statements) throws SQLException {
for (String sql : statements) {
try (final PreparedStatement stmt = connection.prepareStatement(sql)) {
stmt.execute();
}
}
}

private RequiredSearch get(final String name) {
return registry.get(name).tags(tags);
}

private static DockerImageName getDockerImageName() {
return DockerImageName.parse("postgres:17");
}

}
Loading