Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.micrometer.core.instrument.binder.db;

import io.micrometer.core.instrument.*;
Expand All @@ -26,8 +27,11 @@
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.DoubleSupplier;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
* {@link MeterBinder} for a PostgreSQL database.
Expand All @@ -36,6 +40,7 @@
* @author Jon Schneider
* @author Johnny Lim
* @author Markus Dobel
* @author Hari Mani
* @since 1.1.0
*/
@NullMarked
Expand All @@ -53,8 +58,16 @@ public class PostgreSQLDatabaseMetrics implements MeterBinder {

private static final String QUERY_BUFFERS_BACKEND = getBgWriterQuery("buffers_backend");

private static final String BACKEND_BUFFER_WRITES = SELECT + "SUM(writes) FROM pg_stat_io";

private static final String QUERY_BUFFERS_CHECKPOINT = getBgWriterQuery("buffers_checkpoint");

private static final String CHECKPOINTER_BUFFERS_WRITTEN = getStatCheckpointerQuery("buffers_written");

private static final String TIMED_CHECKPOINTS_COUNT = getStatCheckpointerQuery("num_timed");

private static final String REQUESTED_CHECKPOINTS_COUNT = getStatCheckpointerQuery("num_requested");

private final String database;

private final DataSource postgresDataSource;
Expand Down Expand Up @@ -83,6 +96,8 @@ public class PostgreSQLDatabaseMetrics implements MeterBinder {

private final String queryTransactionCount;

private final Version serverVersion;

public PostgreSQLDatabaseMetrics(DataSource postgresDataSource, String database) {
this(postgresDataSource, database, Tags.empty());
}
Expand All @@ -103,6 +118,7 @@ public PostgreSQLDatabaseMetrics(DataSource postgresDataSource, String database,
this.queryBlockHits = getDBStatQuery(database, "blks_hit");
this.queryBlockReads = getDBStatQuery(database, "blks_read");
this.queryTransactionCount = getDBStatQuery(database, "xact_commit + xact_rollback");
this.serverVersion = getServerVersion();
}

private static Tag createDbTag(String database) {
Expand Down Expand Up @@ -191,21 +207,22 @@ private void registerRowCountMetrics(MeterRegistry registry) {
private void registerCheckpointMetrics(MeterRegistry registry) {
FunctionCounter
.builder(Names.CHECKPOINTS_TIMED, postgresDataSource,
dataSource -> resettableFunctionalCounter(Names.CHECKPOINTS_TIMED, this::getTimedCheckpointsCount))
dataSource -> resettableFunctionalCounter(Names.CHECKPOINTS_TIMED,
getTimedCheckpointsCountSupplier()))
.tags(tags)
.description("Number of checkpoints timed")
.register(registry);
FunctionCounter
.builder(Names.CHECKPOINTS_REQUESTED, postgresDataSource,
dataSource -> resettableFunctionalCounter(Names.CHECKPOINTS_REQUESTED,
this::getRequestedCheckpointsCount))
getRequestedCheckpointsCountSupplier()))
.tags(tags)
.description("Number of checkpoints requested")
.register(registry);

FunctionCounter
.builder(Names.BUFFERS_CHECKPOINT, postgresDataSource,
dataSource -> resettableFunctionalCounter(Names.BUFFERS_CHECKPOINT, this::getBuffersCheckpoint))
dataSource -> resettableFunctionalCounter(Names.BUFFERS_CHECKPOINT, getBuffersCheckpointSupplier()))
.tags(tags)
.description("Number of buffers written during checkpoints")
.register(registry);
Expand All @@ -217,7 +234,7 @@ private void registerCheckpointMetrics(MeterRegistry registry) {
.register(registry);
FunctionCounter
.builder(Names.BUFFERS_BACKEND, postgresDataSource,
dataSource -> resettableFunctionalCounter(Names.BUFFERS_BACKEND, this::getBuffersBackend))
dataSource -> resettableFunctionalCounter(Names.BUFFERS_BACKEND, getBuffersBackendSupplier()))
.tags(tags)
.description("Number of buffers written directly by a backend")
.register(registry);
Expand Down Expand Up @@ -272,24 +289,36 @@ private Long getDeadTupleCount() {
return runQuery(QUERY_DEAD_TUPLE_COUNT);
}

private Long getTimedCheckpointsCount() {
return runQuery(QUERY_TIMED_CHECKPOINTS_COUNT);
private DoubleSupplier getTimedCheckpointsCountSupplier() {
if (this.serverVersion.isAbove(Version.V17)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought the new query needs to be run if the version is 17 or above, but this looks like it only runs if the version is strictly greater than 17.0. Is that the right logic?

return () -> runQuery(TIMED_CHECKPOINTS_COUNT);
}
return () -> runQuery(QUERY_TIMED_CHECKPOINTS_COUNT);
}

private Long getRequestedCheckpointsCount() {
return runQuery(QUERY_REQUESTED_CHECKPOINTS_COUNT);
private DoubleSupplier getRequestedCheckpointsCountSupplier() {
if (this.serverVersion.isAbove(Version.V17)) {
return () -> runQuery(REQUESTED_CHECKPOINTS_COUNT);
}
return () -> runQuery(QUERY_REQUESTED_CHECKPOINTS_COUNT);
}

private Long getBuffersClean() {
return runQuery(QUERY_BUFFERS_CLEAN);
}

private Long getBuffersBackend() {
return runQuery(QUERY_BUFFERS_BACKEND);
private DoubleSupplier getBuffersBackendSupplier() {
if (this.serverVersion.isAbove(Version.V17)) {
return () -> runQuery(BACKEND_BUFFER_WRITES);
}
return () -> runQuery(QUERY_BUFFERS_BACKEND);
}

private Long getBuffersCheckpoint() {
return runQuery(QUERY_BUFFERS_CHECKPOINT);
private DoubleSupplier getBuffersCheckpointSupplier() {
if (this.serverVersion.isAbove(Version.V17)) {
return () -> runQuery(CHECKPOINTER_BUFFERS_WRITTEN);
}
return () -> runQuery(QUERY_BUFFERS_CHECKPOINT);
}

/**
Expand All @@ -309,17 +338,26 @@ Double resettableFunctionalCounter(String functionalCounterKey, DoubleSupplier f
return correctedValue;
}

private Version getServerVersion() {
return runQuery("SHOW server_version", resultSet -> resultSet.getString(1)).map(Version::parse)
.orElse(Version.EMPTY);
}

private Long runQuery(String query) {
return runQuery(query, resultSet -> resultSet.getLong(1)).orElse(0L);
}

private <T> Optional<T> runQuery(final String query, final ResultSetGetter<T> resultSetGetter) {
try (Connection connection = postgresDataSource.getConnection();
Statement statement = connection.createStatement();
ResultSet resultSet = statement.executeQuery(query)) {
if (resultSet.next()) {
return resultSet.getLong(1);
return Optional.of(resultSetGetter.get(resultSet));
}
}
catch (SQLException ignored) {
}
return 0L;
return Optional.empty();
}

private static String getDBStatQuery(String database, String statName) {
Expand All @@ -334,6 +372,10 @@ private static String getBgWriterQuery(String statName) {
return SELECT + statName + " FROM pg_stat_bgwriter";
}

private static String getStatCheckpointerQuery(String statName) {
return SELECT + statName + " FROM pg_stat_checkpointer";
}

static final class Names {

static final String SIZE = of("size");
Expand Down Expand Up @@ -366,4 +408,59 @@ private Names() {

}

@FunctionalInterface
interface ResultSetGetter<T> {

T get(ResultSet resultSet) throws SQLException;

}

static final class Version {

private static final Pattern VERSION_PATTERN = Pattern.compile("^(\\d+\\.\\d+).*");

static final Version EMPTY = new Version(0, 0);

static final Version V17 = new Version(17, 0);

final int majorVersion;

final int minorVersion;

static Version parse(String versionString) {
try {
final Matcher matcher = VERSION_PATTERN.matcher(versionString);
if (!matcher.matches()) {
return EMPTY;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are there expected cases in which the version string the server returns won't be parseable by this method? I'm wondering if we should log something when parsing the version fails, or perhaps when getServerVersion returns Version.EMPTY. Otherwise, users may not notice the issue early on and it will delay feedback about a situation we potentially need to fix this code to handle. What do you think?

}
final String[] versionArr = matcher.group(1).split("\\.", 2);
return new Version(Integer.parseInt(versionArr[0]), Integer.parseInt(versionArr[1]));
}
catch (Exception exception) {
return EMPTY;
}
}

Version(int majorVersion, int minorVersion) {
this.majorVersion = majorVersion;
this.minorVersion = minorVersion;
}

public boolean isAbove(final Version other) {
if (this.majorVersion > other.majorVersion) {
return true;
}
if (this.majorVersion < other.majorVersion) {
return false;
}
return this.minorVersion >= other.minorVersion;
}

@Override
public String toString() {
return majorVersion + "." + minorVersion;
}

}

}
Loading