@@ -108,15 +108,24 @@ class DynamoDBOnlineStore(OnlineStore):
108108 Attributes:
109109 _dynamodb_client: Boto3 DynamoDB client.
110110 _dynamodb_resource: Boto3 DynamoDB resource.
111+ _aioboto_session: Async boto session.
112+ _aioboto_client: Async boto client.
113+ _aioboto_context_stack: Async context stack.
111114 """
112115
113116 _dynamodb_client = None
114117 _dynamodb_resource = None
115118
119+ def __init__ (self ):
120+ super ().__init__ ()
121+ self ._aioboto_session = None
122+ self ._aioboto_client = None
123+ self ._aioboto_context_stack = None
124+
116125 async def initialize (self , config : RepoConfig ):
117126 online_config = config .online_store
118127
119- await _get_aiodynamodb_client (
128+ await self . _get_aiodynamodb_client (
120129 online_config .region ,
121130 online_config .max_pool_connections ,
122131 online_config .keepalive_timeout ,
@@ -127,7 +136,59 @@ async def initialize(self, config: RepoConfig):
127136 )
128137
129138 async def close (self ):
130- await _aiodynamodb_close ()
139+ await self ._aiodynamodb_close ()
140+
141+ def _get_aioboto_session (self ):
142+ if self ._aioboto_session is None :
143+ logger .debug ("initializing the aiobotocore session" )
144+ self ._aioboto_session = session .get_session ()
145+ return self ._aioboto_session
146+
147+ async def _get_aiodynamodb_client (
148+ self ,
149+ region : str ,
150+ max_pool_connections : int ,
151+ keepalive_timeout : float ,
152+ connect_timeout : Union [int , float ],
153+ read_timeout : Union [int , float ],
154+ total_max_retry_attempts : Union [int , None ],
155+ retry_mode : Union [Literal ["legacy" , "standard" , "adaptive" ], None ],
156+ ):
157+ if self ._aioboto_client is None :
158+ logger .debug ("initializing the aiobotocore dynamodb client" )
159+
160+ retries : Dict [str , Any ] = {}
161+ if total_max_retry_attempts is not None :
162+ retries ["total_max_attempts" ] = total_max_retry_attempts
163+ if retry_mode is not None :
164+ retries ["mode" ] = retry_mode
165+
166+ client_context = self ._get_aioboto_session ().create_client (
167+ "dynamodb" ,
168+ region_name = region ,
169+ config = AioConfig (
170+ max_pool_connections = max_pool_connections ,
171+ connect_timeout = connect_timeout ,
172+ read_timeout = read_timeout ,
173+ retries = retries if retries else None ,
174+ connector_args = {"keepalive_timeout" : keepalive_timeout },
175+ ),
176+ )
177+ self ._aioboto_context_stack = contextlib .AsyncExitStack ()
178+ self ._aioboto_client = await self ._aioboto_context_stack .enter_async_context (
179+ client_context
180+ )
181+ return self ._aioboto_client
182+
183+ async def _aiodynamodb_close (self ):
184+ if self ._aioboto_client :
185+ await self ._aioboto_client .close ()
186+ self ._aioboto_client = None
187+ if self ._aioboto_context_stack :
188+ await self ._aioboto_context_stack .aclose ()
189+ self ._aioboto_context_stack = None
190+ if self ._aioboto_session :
191+ self ._aioboto_session = None
131192
132193 @property
133194 def async_supported (self ) -> SupportedAsyncMethods :
@@ -362,7 +423,7 @@ async def online_write_batch_async(
362423 _to_client_write_item (config , entity_key , features , timestamp )
363424 for entity_key , features , timestamp , _ in _latest_data_to_write (data )
364425 ]
365- client = await _get_aiodynamodb_client (
426+ client = await self . _get_aiodynamodb_client (
366427 online_config .region ,
367428 online_config .max_pool_connections ,
368429 online_config .keepalive_timeout ,
@@ -473,7 +534,7 @@ def to_tbl_resp(raw_client_response):
473534 batches .append (batch )
474535 entity_id_batches .append (entity_id_batch )
475536
476- client = await _get_aiodynamodb_client (
537+ client = await self . _get_aiodynamodb_client (
477538 online_config .region ,
478539 online_config .max_pool_connections ,
479540 online_config .keepalive_timeout ,
@@ -627,66 +688,7 @@ def _to_client_batch_get_payload(online_config, table_name, batch):
627688 }
628689
629690
630- _aioboto_session = None
631- _aioboto_client = None
632- _aioboto_context_stack = None
633-
634-
635- def _get_aioboto_session ():
636- global _aioboto_session
637- if _aioboto_session is None :
638- logger .debug ("initializing the aiobotocore session" )
639- _aioboto_session = session .get_session ()
640- return _aioboto_session
641-
642-
643- async def _get_aiodynamodb_client (
644- region : str ,
645- max_pool_connections : int ,
646- keepalive_timeout : float ,
647- connect_timeout : Union [int , float ],
648- read_timeout : Union [int , float ],
649- total_max_retry_attempts : Union [int , None ],
650- retry_mode : Union [Literal ["legacy" , "standard" , "adaptive" ], None ],
651- ):
652- global _aioboto_client , _aioboto_context_stack
653- if _aioboto_client is None :
654- logger .debug ("initializing the aiobotocore dynamodb client" )
655-
656- retries : Dict [str , Any ] = {}
657- if total_max_retry_attempts is not None :
658- retries ["total_max_attempts" ] = total_max_retry_attempts
659- if retry_mode is not None :
660- retries ["mode" ] = retry_mode
661-
662- client_context = _get_aioboto_session ().create_client (
663- "dynamodb" ,
664- region_name = region ,
665- config = AioConfig (
666- max_pool_connections = max_pool_connections ,
667- connect_timeout = connect_timeout ,
668- read_timeout = read_timeout ,
669- retries = retries if retries else None ,
670- connector_args = {"keepalive_timeout" : keepalive_timeout },
671- ),
672- )
673- _aioboto_context_stack = contextlib .AsyncExitStack ()
674- _aioboto_client = await _aioboto_context_stack .enter_async_context (
675- client_context
676- )
677- return _aioboto_client
678-
679-
680- async def _aiodynamodb_close ():
681- global _aioboto_client , _aioboto_session , _aioboto_context_stack
682- if _aioboto_client :
683- await _aioboto_client .close ()
684- _aioboto_client = None
685- if _aioboto_context_stack :
686- await _aioboto_context_stack .aclose ()
687- _aioboto_context_stack = None
688- if _aioboto_session :
689- _aioboto_session = None
691+ # Global async client functions removed - now using instance methods
690692
691693
692694def _initialize_dynamodb_client (
0 commit comments