44
55import asyncio
66from contextlib import suppress
7- from datetime import datetime , timedelta , timezone
7+ from datetime import timedelta
88from itertools import chain , repeat
99from typing import TYPE_CHECKING , TypeVar , cast
1010from unittest .mock import Mock
1515from crawlee ._autoscaling ._types import LoadRatioInfo , SystemInfo
1616from crawlee ._types import ConcurrencySettings
1717from crawlee ._utils .time import measure_time
18+ from tests .unit .utils import wait_for_condition
1819
1920if TYPE_CHECKING :
2021 from collections .abc import Awaitable
2122
23+ T = TypeVar ('T' )
24+
2225
@pytest.fixture
def system_status() -> SystemStatus | Mock:
    """Return a ``SystemStatus`` stand-in whose methods can be stubbed per test."""
    mocked_status = Mock(spec=SystemStatus)
    return mocked_status
2629
2730
28- T = TypeVar ('T' )
29-
30-
3131def future (value : T , / ) -> Awaitable [T ]:
3232 f = asyncio .Future [T ]()
3333 f .set_result (value )
@@ -145,10 +145,6 @@ async def run() -> None:
145145 await pool .run ()
146146
147147
148- @pytest .mark .flaky (
149- rerun = 3 ,
150- reason = 'Test is flaky on Windows and MacOS, see https://github.com/apify/crawlee-python/issues/1655.' ,
151- )
152148async def test_autoscales (
153149 monkeypatch : pytest .MonkeyPatch ,
154150 system_status : SystemStatus | Mock ,
@@ -160,7 +156,7 @@ async def run() -> None:
160156 nonlocal done_count
161157 done_count += 1
162158
163- start = datetime . now ( timezone . utc )
159+ overload_active = False
164160
165161 def get_historical_system_info () -> SystemInfo :
166162 result = SystemInfo (
@@ -170,8 +166,7 @@ def get_historical_system_info() -> SystemInfo:
170166 client_info = LoadRatioInfo (limit_ratio = 0.9 , actual_ratio = 0.3 ),
171167 )
172168
173- # 0.5 seconds after the start of the test, pretend the CPU became overloaded
174- if result .created_at - start >= timedelta (seconds = 0.5 ):
169+ if overload_active :
175170 result .cpu_info = LoadRatioInfo (limit_ratio = 0.9 , actual_ratio = 1.0 )
176171
177172 return result
@@ -196,24 +191,21 @@ def get_historical_system_info() -> SystemInfo:
196191 pool_run_task = asyncio .create_task (pool .run (), name = 'pool run task' )
197192
198193 try :
199- # After 0.2s, there should be an increase in concurrency
200- await asyncio .sleep (0.2 )
201- assert pool .desired_concurrency > 1
194+ # Wait until concurrency scales up above 1.
195+ await wait_for_condition (lambda : pool .desired_concurrency > 1 , timeout = 5.0 )
202196
203- # After 0.5s, the concurrency should reach max concurrency
204- await asyncio .sleep (0.3 )
205- assert pool .desired_concurrency == 4
197+ # Wait until concurrency reaches maximum.
198+ await wait_for_condition (lambda : pool .desired_concurrency == 4 , timeout = 5.0 )
206199
207- # The concurrency should guarantee completion of more than 10 tasks ( a single worker would complete ~5)
208- assert done_count > 10
200+ # Multiple concurrent workers should have completed more tasks than a single worker could.
201+ await wait_for_condition ( lambda : done_count > 10 , timeout = 5.0 )
209202
210- # After 0.7s, the pretend overload should have kicked in and there should be a drop in desired concurrency
211- await asyncio . sleep ( 0.2 )
212- assert pool .desired_concurrency < 4
203+ # Simulate CPU overload and wait for the pool to scale down.
204+ overload_active = True
205+ await wait_for_condition ( lambda : pool .desired_concurrency < 4 , timeout = 5.0 )
213206
214- # After a full second, the pool should scale down all the way to 1
215- await asyncio .sleep (0.3 )
216- assert pool .desired_concurrency == 1
207+ # Wait until the pool scales all the way down to minimum.
208+ await wait_for_condition (lambda : pool .desired_concurrency == 1 , timeout = 5.0 )
217209 finally :
218210 pool_run_task .cancel ()
219211 with suppress (asyncio .CancelledError ):
0 commit comments