@@ -17,6 +17,7 @@ use graph::prelude::{
1717use graph:: prelude:: { tokio, MetricsRegistry } ;
1818use graph:: slog:: warn;
1919use graph:: util:: timed_rw_lock:: TimedMutex ;
20+ use tokio:: sync:: OwnedSemaphorePermit ;
2021
2122use std:: fmt:: { self } ;
2223use std:: ops:: { Deref , DerefMut } ;
@@ -48,7 +49,7 @@ pub type AsyncPgConnection = deadpool::managed::Object<ConnectionManager>;
4849/// backpressure to prevent pool exhaustion during mass operations.
4950pub struct PermittedConnection {
5051 conn : AsyncPgConnection ,
51- _permit : tokio :: sync :: OwnedSemaphorePermit ,
52+ _permit : OwnedSemaphorePermit ,
5253}
5354
5455impl Deref for PermittedConnection {
@@ -571,6 +572,10 @@ impl PoolInner {
571572 ///
572573 /// If `timeouts` is `None`, the default pool timeouts are used.
573574 ///
575+ /// The `prev_wait` duration is the time already spent waiting for a
576+ /// permit to get a connection; that time is added to the total wait
577+ /// time recorded.
578+ ///
574579 /// On error, returns `StoreError::DatabaseUnavailable` and marks the
575580 /// pool as unavailable if we can tell that the error is due to the pool
576581 /// being closed. Returns `StoreError::StatementTimeout` if the error is
@@ -579,13 +584,14 @@ impl PoolInner {
579584 & self ,
580585 pool : & AsyncPool ,
581586 timeouts : Option < Timeouts > ,
587+ prev_wait : Duration ,
582588 ) -> Result < AsyncPgConnection , StoreError > {
583589 let start = Instant :: now ( ) ;
584590 let res = match timeouts {
585591 Some ( timeouts) => pool. timeout_get ( & timeouts) . await ,
586592 None => pool. get ( ) . await ,
587593 } ;
588- let elapsed = start. elapsed ( ) ;
594+ let elapsed = start. elapsed ( ) + prev_wait ;
589595 self . wait_meter . add_conn_wait_time ( elapsed) ;
590596 match res {
591597 Ok ( conn) => {
@@ -610,7 +616,7 @@ impl PoolInner {
610616 }
611617
612618 async fn get ( & self ) -> Result < AsyncPgConnection , StoreError > {
613- self . get_from_pool ( & self . pool , None ) . await
619+ self . get_from_pool ( & self . pool , None , Duration :: ZERO ) . await
614620 }
615621
616622 /// Get the pool for fdw connections. It is an error if none is configured
@@ -644,7 +650,7 @@ impl PoolInner {
644650 {
645651 let pool = self . fdw_pool ( logger) ?;
646652 loop {
647- match self . get_from_pool ( & pool, None ) . await {
653+ match self . get_from_pool ( & pool, None , Duration :: ZERO ) . await {
648654 Ok ( conn) => return Ok ( conn) ,
649655 Err ( e) => {
650656 if timeout ( ) {
@@ -671,7 +677,10 @@ impl PoolInner {
671677 create : None ,
672678 recycle : None ,
673679 } ;
674- let Ok ( conn) = self . get_from_pool ( fdw_pool, Some ( timeouts) ) . await else {
680+ let Ok ( conn) = self
681+ . get_from_pool ( fdw_pool, Some ( timeouts) , Duration :: ZERO )
682+ . await
683+ else {
675684 return None ;
676685 } ;
677686 Some ( conn)
@@ -708,7 +717,7 @@ impl PoolInner {
708717 )
709718 }
710719
711- pub ( crate ) async fn query_permit ( & self ) -> tokio :: sync :: OwnedSemaphorePermit {
720+ pub ( crate ) async fn query_permit ( & self ) -> OwnedSemaphorePermit {
712721 let start = Instant :: now ( ) ;
713722 let permit = self . query_semaphore . cheap_clone ( ) . acquire_owned ( ) . await ;
714723 self . semaphore_wait_stats
@@ -721,23 +730,24 @@ impl PoolInner {
721730 /// Acquire a permit for indexing operations. This provides backpressure
722731 /// to prevent connection pool exhaustion during mass subgraph startup
723732 /// or high write load.
724- async fn indexing_permit ( & self ) -> tokio :: sync :: OwnedSemaphorePermit {
733+ async fn indexing_permit ( & self ) -> ( OwnedSemaphorePermit , Duration ) {
725734 let start = Instant :: now ( ) ;
726735 let permit = self . indexing_semaphore . cheap_clone ( ) . acquire_owned ( ) . await ;
736+ let elapsed = start. elapsed ( ) ;
727737 self . indexing_semaphore_wait_stats
728738 . write ( )
729739 . unwrap ( )
730- . add_and_register ( start . elapsed ( ) , & self . indexing_semaphore_wait_gauge ) ;
731- permit. unwrap ( )
740+ . add_and_register ( elapsed, & self . indexing_semaphore_wait_gauge ) ;
741+ ( permit. unwrap ( ) , elapsed )
732742 }
733743
734744 /// Get a connection with backpressure via semaphore permit. Use this
735745 /// for indexing operations to prevent pool exhaustion. This method will
736746 /// wait indefinitely until a permit, and with that, a connection is
737747 /// available.
738748 pub ( crate ) async fn get_permitted ( & self ) -> Result < PermittedConnection , StoreError > {
739- let permit = self . indexing_permit ( ) . await ;
740- let conn = self . get ( ) . await ?;
749+ let ( permit, permit_wait ) = self . indexing_permit ( ) . await ;
750+ let conn = self . get_from_pool ( & self . pool , None , permit_wait ) . await ?;
741751 Ok ( PermittedConnection {
742752 conn,
743753 _permit : permit,
0 commit comments