-
Notifications
You must be signed in to change notification settings - Fork 196
/
Copy pathsync_vs_async_v8.py
86 lines (74 loc) · 3.06 KB
/
sync_vs_async_v8.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
import requests
import logging
import time
import asyncio
import aiohttp
# Configure the root logger: INFO and above goes both to "requests.log"
# (opened in "w" mode, so the file is overwritten on every run) and to a
# stream handler. Records are rendered as
# "LEVEL @ dd.mm.YYYY HH:MM:SS : message".
logging.basicConfig(
    format="%(levelname)s @ %(asctime)s : %(message)s",
    datefmt="%d.%m.%Y %H:%M:%S",
    level=logging.INFO,
    handlers=[logging.FileHandler("requests.log", mode="w"), logging.StreamHandler()],
)
# HTTP headers sent with every request; the User-Agent is a desktop Chrome
# string (the three adjacent literals are concatenated into one value).
HEADERS = {
    "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 12_6) "
    "AppleWebKit/537.36 (KHTML, like Gecko) "
    "Chrome/106.0.0.0 Safari/537.36"
}
# Endpoint prefix; the request functions append "/<n>" to build the final URL.
BASE_URL = "https://www.canbula.com/prime"
# Maximum number of attempts made for a single request before giving up.
MAX_RETRIES = 3
# Timeout value, in seconds, passed to each session.get() call.
TIMEOUT = 10
def sync_request(session: requests.Session, n: int) -> dict:
    """Fetch ``BASE_URL/n`` synchronously with logging and retries.

    Up to ``MAX_RETRIES`` GET requests are sent through *session*, each
    carrying ``HEADERS`` and using ``TIMEOUT`` as the request timeout. A
    response with an error status is treated as a failure. After a failed
    attempt the function sleeps ``2**attempt`` seconds (1, 2, 4, ...) before
    trying again.

    Args:
        session: The ``requests.Session`` used to send the request.
        n: Value appended to ``BASE_URL`` as the last path segment.

    Returns:
        Whatever ``response.json()`` yields for the first successful
        response, or an empty dict if every attempt raised a
        ``requests.exceptions.RequestException``.
    """
    for attempt in range(MAX_RETRIES):
        try:
            response = session.get(f"{BASE_URL}/{n}", headers=HEADERS, timeout=TIMEOUT)
            # Turn 4xx/5xx responses into exceptions so they are retried too.
            response.raise_for_status()
            logging.info(
                f"Request returned for {n} with status code {response.status_code}"
            )
            return response.json()
        except requests.exceptions.RequestException as e:
            logging.error(f"Request for {n} failed: {e} (attempt {attempt + 1})")
            # Exponential backoff, but only when another attempt follows:
            # sleeping after the last failure would merely delay giving up.
            if attempt < MAX_RETRIES - 1:
                time.sleep(2**attempt)
    logging.error(f"Request for {n} failed after {MAX_RETRIES} attempts")
    return {}
async def async_request(session: aiohttp.ClientSession, n: int) -> dict:
    """Fetch ``BASE_URL/n`` asynchronously with logging and retries.

    Up to ``MAX_RETRIES`` GET requests are sent through *session*, each
    carrying ``HEADERS`` and a total timeout of ``TIMEOUT`` seconds. A
    response with an error status is treated as a failure. After a failed
    attempt the coroutine awaits ``asyncio.sleep(2**attempt)`` (1, 2, 4, ...
    seconds) before trying again, so other tasks keep running meanwhile.

    Args:
        session: The ``aiohttp.ClientSession`` used to send the request.
        n: Value appended to ``BASE_URL`` as the last path segment.

    Returns:
        Whatever ``await response.json()`` yields for the first successful
        response, or an empty dict if every attempt raised
        ``aiohttp.ClientError`` or ``asyncio.TimeoutError``.
    """
    # aiohttp deprecates bare numbers for ``timeout``; an explicit
    # ClientTimeout with the same total keeps the behaviour without warnings.
    timeout = aiohttp.ClientTimeout(total=TIMEOUT)
    for attempt in range(MAX_RETRIES):
        try:
            # ``async with`` guarantees the response (and its connection) is
            # released on every exit path, including exceptions.
            async with session.get(
                f"{BASE_URL}/{n}", headers=HEADERS, timeout=timeout
            ) as response:
                # Turn 4xx/5xx responses into exceptions so they are retried.
                response.raise_for_status()
                logging.info(
                    f"Request returned for {n} with status code {response.status}"
                )
                return await response.json()
        except (aiohttp.ClientError, asyncio.TimeoutError) as e:
            logging.error(f"Request for {n} failed: {e} (attempt {attempt + 1})")
            # Exponential backoff, but only when another attempt follows:
            # sleeping after the last failure would merely delay giving up.
            if attempt < MAX_RETRIES - 1:
                await asyncio.sleep(2**attempt)
    logging.error(f"Request for {n} failed after {MAX_RETRIES} attempts")
    return {}
def sync_main(n: int) -> None:
    """Issue requests for 1..n one after another and log timing and results.

    A single ``requests.Session`` is shared by all calls to
    ``sync_request``. The elapsed time for the whole batch is logged,
    followed by one log record per result.

    Args:
        n: Number of requests to make; the values 1 through *n* are requested.

    Returns:
        None. All output goes to the logging handlers.
    """
    with requests.Session() as session:
        # perf_counter() is monotonic, so the measured duration cannot be
        # distorted by system clock adjustments the way time.time() can.
        start_time = time.perf_counter()
        results = [sync_request(session, i) for i in range(1, n + 1)]
        total_time = time.perf_counter() - start_time
        logging.info(f"Time taken for {n} requests: {total_time:.2f} seconds with sync")
        for result in results:
            logging.info(result)
async def async_main(n: int) -> None:
    """Issue requests for 1..n concurrently and log timing and results.

    A single ``aiohttp.ClientSession`` is shared by all ``async_request``
    coroutines, which are awaited together with ``asyncio.gather``. The
    elapsed time for the whole batch is logged, followed by one log record
    per result (in the same 1..n order the coroutines were created).

    Args:
        n: Number of requests to make; the values 1 through *n* are requested.

    Returns:
        None. All output goes to the logging handlers.
    """
    async with aiohttp.ClientSession() as session:
        # perf_counter() is monotonic, so the measured duration cannot be
        # distorted by system clock adjustments the way time.time() can.
        start_time = time.perf_counter()
        tasks = [async_request(session, i) for i in range(1, n + 1)]
        results = await asyncio.gather(*tasks)
        total_time = time.perf_counter() - start_time
        logging.info(
            f"Time taken for {n} requests: {total_time:.2f} seconds with async"
        )
        for result in results:
            logging.info(result)
# Script entry point: run the same batch of requests first sequentially and
# then concurrently, so the two logged timings can be compared.
if __name__ == "__main__":
    # Number of requests issued by each variant (values 1..n are requested).
    n = 25
    logging.info(f"Starting synchronous requests for {n} numbers")
    sync_main(n)
    logging.info(f"Starting asynchronous requests for {n} numbers")
    # asyncio.run() creates an event loop, runs the coroutine to completion
    # and closes the loop afterwards.
    asyncio.run(async_main(n))