11package tech .ydb .yoj .repository .db ;
22
33import com .google .common .base .Preconditions ;
4- import io .prometheus .client .Counter ;
5- import io .prometheus .client .Histogram ;
6- import io .prometheus .client .Histogram .Timer ;
74import lombok .AccessLevel ;
85import lombok .AllArgsConstructor ;
96import lombok .Getter ;
107import lombok .NonNull ;
118import lombok .RequiredArgsConstructor ;
129import lombok .With ;
13- import org .slf4j .Logger ;
14- import org .slf4j .LoggerFactory ;
15- import org .slf4j .MDC ;
1610import tech .ydb .yoj .repository .db .cache .TransactionLog ;
1711import tech .ydb .yoj .repository .db .exception .QueryInterruptedException ;
1812import tech .ydb .yoj .repository .db .exception .RetryableException ;
19- import tech .ydb .yoj .util .lang .Strings ;
2013
2114import javax .annotation .Nullable ;
2215import java .time .Duration ;
23- import java .util .concurrent .atomic .AtomicLong ;
16+ import java .util .concurrent .atomic .AtomicReference ;
2417import java .util .function .Supplier ;
2518
2619import static java .lang .String .format ;
4437 */
4538@ RequiredArgsConstructor (access = AccessLevel .PRIVATE )
4639public final class StdTxManager implements TxManager , TxManagerState {
47- private static final Logger log = LoggerFactory .getLogger (StdTxManager .class );
48-
4940 private static final int DEFAULT_MAX_ATTEMPT_COUNT = 100 ;
50- private static final double [] TX_ATTEMPTS_BUCKETS = new double []
51- {1 , 2 , 3 , 4 , 5 , 6 , 7 , 8 , 9 , 10 , 12 , 14 , 16 , 18 , 20 , 25 , 35 , 40 , 45 , 50 , 60 , 70 , 80 , 90 , 100 };
52- private static final double [] DURATION_BUCKETS = {
53- .001 , .0025 , .005 , .0075 ,
54- .01 , .025 , .05 , .075 ,
55- .1 , .25 , .5 , .75 ,
56- 1 , 2.5 , 5 , 7.5 ,
57- 10 , 25 , 50 , 75 ,
58- 100
59- };
60- private static final Histogram totalDuration = Histogram .build ("tx_total_duration_seconds" , "Tx total duration (seconds)" )
61- .labelNames ("tx_name" )
62- .buckets (DURATION_BUCKETS )
63- .register ();
64- private static final Histogram attemptDuration = Histogram .build ("tx_attempt_duration_seconds" , "Tx attempt duration (seconds)" )
65- .labelNames ("tx_name" )
66- .buckets (DURATION_BUCKETS )
67- .register ();
68- private static final Histogram attempts = Histogram .build ("tx_attempts" , "Tx attempts spent to success" )
69- .labelNames ("tx_name" )
70- .buckets (TX_ATTEMPTS_BUCKETS )
71- .register ();
72- private static final Counter results = Counter .build ("tx_result" , "Tx commits/rollbacks/fails" )
73- .labelNames ("tx_name" , "result" )
74- .register ();
75- private static final Counter retries = Counter .build ("tx_retries" , "Tx retry reasons" )
76- .labelNames ("tx_name" , "reason" )
77- .register ();
78- private static final AtomicLong txLogIdSeq = new AtomicLong ();
7941
8042 @ Getter
8143 private final Repository repository ;
@@ -90,8 +52,8 @@ public final class StdTxManager implements TxManager, TxManagerState {
9052 private final SeparatePolicy separatePolicy ;
9153 @ With
9254 private final TxNameGenerator txNameGenerator ;
93-
94- private final long txLogId = txLogIdSeq . incrementAndGet () ;
55+ @ With
56+ private final TracerFactory tracerFactory ;
9557
9658 public StdTxManager (@ NonNull Repository repository ) {
9759 this (
@@ -100,7 +62,8 @@ public StdTxManager(@NonNull Repository repository) {
10062 /* logContext */ null ,
10163 /* options */ TxOptions .create (SERIALIZABLE_READ_WRITE ),
10264 /* separatePolicy */ SeparatePolicy .LOG ,
103- /* txNameGenerator */ new TxNameGenerator .Default ()
65+ /* txNameGenerator */ new TxNameGenerator .Default (),
66+ /* tracerFactory */ StdTxManagerTracer .Default ::new
10467 );
10568 }
10669
@@ -191,89 +154,64 @@ public void tx(Runnable runnable) {
191154 @ Override
192155 public <T > T tx (Supplier <T > supplier ) {
193156 TxName txName = txNameGenerator .generate ();
194- String name = txName .name ();
195-
196- checkSeparatePolicy (separatePolicy , txName .logName ());
197-
198- RetryableException lastRetryableException = null ;
199- TxImpl lastTx = null ;
200- try (Timer ignored = totalDuration .labels (name ).startTimer ()) {
201- for (int attempt = 1 ; attempt <= maxAttemptCount ; attempt ++) {
202- try {
203- attempts .labels (name ).observe (attempt );
204- T result ;
205- try (
206- var ignored1 = attemptDuration .labels (name ).startTimer ();
207- var ignored2 = MDC .putCloseable ("tx" , formatTx (txName ));
208- var ignored3 = MDC .putCloseable ("tx-id" , formatTxId ());
209- var ignored4 = MDC .putCloseable ("tx-name" , txName .logName ())
210- ) {
211- RepositoryTransaction transaction = repository .startTransaction (options );
212- lastTx = new TxImpl (name , transaction , options );
213- result = lastTx .run (supplier );
214- }
215157
216- if (options .isDryRun ()) {
217- results .labels (name , "rollback" ).inc ();
218- results .labels (name , "dry_run" ).inc ();
219- } else {
220- results .labels (name , "commit" ).inc ();
221- }
222- return result ;
223- } catch (RetryableException e ) {
224- retries .labels (name , getExceptionNameForMetric (e )).inc ();
225- lastRetryableException = e ;
226- if (attempt + 1 <= maxAttemptCount ) {
227- try {
228- MILLISECONDS .sleep (e .getRetryPolicy ().calcDuration (attempt ).toMillis ());
229- } catch (InterruptedException ex ) {
230- Thread .currentThread ().interrupt ();
231- throw new QueryInterruptedException ("DB query interrupted" , ex );
158+ StdTxManagerTracer tracer = tracerFactory .create (options , txName );
159+
160+ checkSeparatePolicy (separatePolicy , tracer );
161+
162+ AtomicReference <TxImpl > lastTxContainer = new AtomicReference <>(null );
163+ try {
164+ return tracer .wrapTx (() -> {
165+ RetryableException lastRetryableException = null ;
166+ for (int attempt = 1 ; attempt <= maxAttemptCount ; attempt ++) {
167+ try {
168+ return tracer .wrapAttempt (logContext , attempt , () -> {
169+ RepositoryTransaction transaction = repository .startTransaction (options );
170+ var lastTx = new TxImpl (txName .name (), transaction , options );
171+ lastTxContainer .set (lastTx );
172+ return lastTx .run (supplier );
173+ });
174+ } catch (RetryableException e ) {
175+ tracer .onRetry (e );
176+ lastRetryableException = e ;
177+ if (attempt + 1 <= maxAttemptCount ) {
178+ try {
179+ MILLISECONDS .sleep (e .getRetryPolicy ().calcDuration (attempt ).toMillis ());
180+ } catch (InterruptedException ex ) {
181+ Thread .currentThread ().interrupt ();
182+ throw new QueryInterruptedException ("DB query interrupted" , ex );
183+ }
232184 }
185+ } catch (Exception e ) {
186+ tracer .onException ();
187+ throw e ;
233188 }
234- } catch (Exception e ) {
235- results .labels (name , "rollback" ).inc ();
236- throw e ;
237189 }
238- }
239- results .labels (name , "fail" ).inc ();
190+ tracer .onRetryExceeded ();
240191
241- throw requireNonNull (lastRetryableException ).rethrow ();
192+ throw requireNonNull (lastRetryableException ).rethrow ();
193+ });
242194 } finally {
195+ TxImpl lastTx = lastTxContainer .get ();
243196 if (!options .isDryRun () && lastTx != null ) {
244197 lastTx .runDeferredFinally ();
245198 }
246199 }
247200 }
248201
249- private static void checkSeparatePolicy (SeparatePolicy separatePolicy , String txName ) {
202+ private static void checkSeparatePolicy (SeparatePolicy separatePolicy , StdTxManagerTracer tracer ) {
250203 if (!Tx .Current .exists ()) {
251204 return ;
252205 }
253206
254207 switch (separatePolicy ) {
255208 case ALLOW -> {
256209 }
257- case STRICT ->
258- throw new IllegalStateException (format ("Transaction %s was run when another transaction is active" , txName ));
259- case LOG ->
260- log .warn ("Transaction '{}' was run when another transaction is active. Perhaps unexpected behavior. " +
261- "Use TxManager.separate() to avoid this message" , txName );
210+ case STRICT -> throw new IllegalStateException ("Transaction was run when another transaction is active" );
211+ case LOG -> tracer .onLogSeparatePolicy ();
262212 }
263213 }
264214
265- private String getExceptionNameForMetric (RetryableException e ) {
266- return Strings .removeSuffix (e .getClass ().getSimpleName (), "Exception" );
267- }
268-
269- private String formatTx (TxName txName ) {
270- return formatTxId () + " {" + txName .logName () + (logContext != null ? "/" + logContext : "" ) + "}" ;
271- }
272-
273- private String formatTxId () {
274- return Strings .leftPad (Long .toUnsignedString (txLogId , 36 ), 6 , '0' ) + options .getIsolationLevel ().getTxIdSuffix ();
275- }
276-
277215 @ Override
278216 public TxManagerState getState () {
279217 return this ;
@@ -343,7 +281,8 @@ private class ReadonlyBuilderImpl implements ReadonlyBuilder {
343281 @ Override
344282 public ReadonlyBuilder withStatementIsolationLevel (IsolationLevel isolationLevel ) {
345283 Preconditions .checkArgument (isolationLevel .isReadOnly (),
346- "readOnly() can only be used with a read-only tx isolation level, but got: %s" , isolationLevel );
284+ "readOnly() can only be used with a read-only tx isolation level, but got: %s" , isolationLevel
285+ );
347286 return withOptions (options .withIsolationLevel (isolationLevel ));
348287 }
349288
@@ -363,4 +302,9 @@ private enum SeparatePolicy {
363302 LOG ,
364303 STRICT
365304 }
305+
306+ @ FunctionalInterface
307+ public interface TracerFactory {
308+ StdTxManagerTracer create (TxOptions options , TxName txName );
309+ }
366310}
0 commit comments