Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
152 changes: 48 additions & 104 deletions repository/src/main/java/tech/ydb/yoj/repository/db/StdTxManager.java
Original file line number Diff line number Diff line change
@@ -1,26 +1,19 @@
package tech.ydb.yoj.repository.db;

import com.google.common.base.Preconditions;
import io.prometheus.client.Counter;
import io.prometheus.client.Histogram;
import io.prometheus.client.Histogram.Timer;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.With;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import tech.ydb.yoj.repository.db.cache.TransactionLog;
import tech.ydb.yoj.repository.db.exception.QueryInterruptedException;
import tech.ydb.yoj.repository.db.exception.RetryableException;
import tech.ydb.yoj.util.lang.Strings;

import javax.annotation.Nullable;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

import static java.lang.String.format;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This import is unused, please remove it (Checkstyle)

Expand All @@ -44,38 +37,7 @@
*/
@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
public final class StdTxManager implements TxManager, TxManagerState {
private static final Logger log = LoggerFactory.getLogger(StdTxManager.class);

private static final int DEFAULT_MAX_ATTEMPT_COUNT = 100;
private static final double[] TX_ATTEMPTS_BUCKETS = new double[]
{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 12, 14, 16, 18, 20, 25, 35, 40, 45, 50, 60, 70, 80, 90, 100};
private static final double[] DURATION_BUCKETS = {
.001, .0025, .005, .0075,
.01, .025, .05, .075,
.1, .25, .5, .75,
1, 2.5, 5, 7.5,
10, 25, 50, 75,
100
};
private static final Histogram totalDuration = Histogram.build("tx_total_duration_seconds", "Tx total duration (seconds)")
.labelNames("tx_name")
.buckets(DURATION_BUCKETS)
.register();
private static final Histogram attemptDuration = Histogram.build("tx_attempt_duration_seconds", "Tx attempt duration (seconds)")
.labelNames("tx_name")
.buckets(DURATION_BUCKETS)
.register();
private static final Histogram attempts = Histogram.build("tx_attempts", "Tx attempts spent to success")
.labelNames("tx_name")
.buckets(TX_ATTEMPTS_BUCKETS)
.register();
private static final Counter results = Counter.build("tx_result", "Tx commits/rollbacks/fails")
.labelNames("tx_name", "result")
.register();
private static final Counter retries = Counter.build("tx_retries", "Tx retry reasons")
.labelNames("tx_name", "reason")
.register();
private static final AtomicLong txLogIdSeq = new AtomicLong();

@Getter
private final Repository repository;
Expand All @@ -90,8 +52,8 @@ public final class StdTxManager implements TxManager, TxManagerState {
private final SeparatePolicy separatePolicy;
@With
private final TxNameGenerator txNameGenerator;

private final long txLogId = txLogIdSeq.incrementAndGet();
@With
private final TracerFactory tracerFactory;

public StdTxManager(@NonNull Repository repository) {
this(
Expand All @@ -100,7 +62,8 @@ public StdTxManager(@NonNull Repository repository) {
/* logContext */ null,
/* options */ TxOptions.create(SERIALIZABLE_READ_WRITE),
/* separatePolicy */ SeparatePolicy.LOG,
/* txNameGenerator */ new TxNameGenerator.Default()
/* txNameGenerator */ new TxNameGenerator.Default(),
/* tracerFactory */ StdTxManagerTracer.Default::new
);
}

Expand Down Expand Up @@ -191,89 +154,64 @@ public void tx(Runnable runnable) {
@Override
public <T> T tx(Supplier<T> supplier) {
TxName txName = txNameGenerator.generate();
String name = txName.name();

checkSeparatePolicy(separatePolicy, txName.logName());

RetryableException lastRetryableException = null;
TxImpl lastTx = null;
try (Timer ignored = totalDuration.labels(name).startTimer()) {
for (int attempt = 1; attempt <= maxAttemptCount; attempt++) {
try {
attempts.labels(name).observe(attempt);
T result;
try (
var ignored1 = attemptDuration.labels(name).startTimer();
var ignored2 = MDC.putCloseable("tx", formatTx(txName));
var ignored3 = MDC.putCloseable("tx-id", formatTxId());
var ignored4 = MDC.putCloseable("tx-name", txName.logName())
) {
RepositoryTransaction transaction = repository.startTransaction(options);
lastTx = new TxImpl(name, transaction, options);
result = lastTx.run(supplier);
}

if (options.isDryRun()) {
results.labels(name, "rollback").inc();
results.labels(name, "dry_run").inc();
} else {
results.labels(name, "commit").inc();
}
return result;
} catch (RetryableException e) {
retries.labels(name, getExceptionNameForMetric(e)).inc();
lastRetryableException = e;
if (attempt + 1 <= maxAttemptCount) {
try {
MILLISECONDS.sleep(e.getRetryPolicy().calcDuration(attempt).toMillis());
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
throw new QueryInterruptedException("DB query interrupted", ex);
StdTxManagerTracer tracer = tracerFactory.create(options, txName);

checkSeparatePolicy(separatePolicy, tracer);

AtomicReference<TxImpl> lastTxContainer = new AtomicReference<>(null);
try {
return tracer.wrapTx(() -> {
RetryableException lastRetryableException = null;
for (int attempt = 1; attempt <= maxAttemptCount; attempt++) {
try {
return tracer.wrapAttempt(logContext, attempt, () -> {
RepositoryTransaction transaction = repository.startTransaction(options);
var lastTx = new TxImpl(txName.name(), transaction, options);
lastTxContainer.set(lastTx);
return lastTx.run(supplier);
});
} catch (RetryableException e) {
tracer.onRetry(e);
lastRetryableException = e;
if (attempt + 1 <= maxAttemptCount) {
try {
MILLISECONDS.sleep(e.getRetryPolicy().calcDuration(attempt).toMillis());
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
throw new QueryInterruptedException("DB query interrupted", ex);
}
}
} catch (Exception e) {
tracer.onException();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tracer.onException(e) will be more extensible

throw e;
}
} catch (Exception e) {
results.labels(name, "rollback").inc();
throw e;
}
}
results.labels(name, "fail").inc();
tracer.onRetryExceeded();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as onException(e), onRetryExceeded(e) will offer more extensibility


throw requireNonNull(lastRetryableException).rethrow();
throw requireNonNull(lastRetryableException).rethrow();
});
} finally {
TxImpl lastTx = lastTxContainer.get();
if (!options.isDryRun() && lastTx != null) {
lastTx.runDeferredFinally();
}
}
}

private static void checkSeparatePolicy(SeparatePolicy separatePolicy, String txName) {
private static void checkSeparatePolicy(SeparatePolicy separatePolicy, StdTxManagerTracer tracer) {
if (!Tx.Current.exists()) {
return;
}

switch (separatePolicy) {
case ALLOW -> {
}
case STRICT ->
throw new IllegalStateException(format("Transaction %s was run when another transaction is active", txName));
case LOG ->
log.warn("Transaction '{}' was run when another transaction is active. Perhaps unexpected behavior. " +
"Use TxManager.separate() to avoid this message", txName);
case STRICT -> throw new IllegalStateException("Transaction was run when another transaction is active");
case LOG -> tracer.onLogSeparatePolicy();
}
}

private String getExceptionNameForMetric(RetryableException e) {
return Strings.removeSuffix(e.getClass().getSimpleName(), "Exception");
}

private String formatTx(TxName txName) {
return formatTxId() + " {" + txName.logName() + (logContext != null ? "/" + logContext : "") + "}";
}

private String formatTxId() {
return Strings.leftPad(Long.toUnsignedString(txLogId, 36), 6, '0') + options.getIsolationLevel().getTxIdSuffix();
}

@Override
public TxManagerState getState() {
return this;
Expand Down Expand Up @@ -343,7 +281,8 @@ private class ReadonlyBuilderImpl implements ReadonlyBuilder {
@Override
public ReadonlyBuilder withStatementIsolationLevel(IsolationLevel isolationLevel) {
Preconditions.checkArgument(isolationLevel.isReadOnly(),
"readOnly() can only be used with a read-only tx isolation level, but got: %s", isolationLevel);
"readOnly() can only be used with a read-only tx isolation level, but got: %s", isolationLevel
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor whitespace change. I'd leave it as-is instead

);
return withOptions(options.withIsolationLevel(isolationLevel));
}

Expand All @@ -363,4 +302,9 @@ private enum SeparatePolicy {
LOG,
STRICT
}

@FunctionalInterface
public interface TracerFactory {
StdTxManagerTracer create(TxOptions options, TxName txName);
}
}
Loading
Loading