77 changes: 52 additions & 25 deletions yente/data/loader.py
@@ -4,7 +4,6 @@
 import asyncio
 import aiofiles
 from pathlib import Path
-from itertools import count
 from typing import Any, AsyncGenerator, Optional
 
 from yente import settings
@@ -13,6 +12,8 @@
 
 log = get_logger(__name__)
 
+MAX_RETRIES = 3
+
 
 async def load_yaml_url(url: str, auth_token: Optional[str] = None) -> Any:
     if url.lower().endswith(".json"):
@@ -61,19 +62,12 @@ async def read_path_lines(path: Path) -> AsyncGenerator[Any, None]:
 async def stream_http_lines(
     url: str, auth_token: Optional[str] = None
 ) -> AsyncGenerator[Any, None]:
-    for retry in count():
-        try:
-            async with httpx_session(auth_token=auth_token) as client:
-                async with client.stream("GET", url) as resp:
-                    resp.raise_for_status()
-                    async for line in resp.aiter_lines():
-                        yield orjson.loads(line)
-                    return
-        except httpx.TransportError as exc:
-            if retry > 3:
-                raise
-            await asyncio.sleep(1.0)
-            log.error("Streaming index HTTP error: %s, retrying..." % exc)
+    async with httpx_session(auth_token=auth_token) as client:
+        async with client.stream("GET", url) as resp:
+            resp.raise_for_status()
+            async for line in resp.aiter_lines():
+                yield orjson.loads(line)
+            return
 
 
 async def load_json_lines(
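
With its retry loop removed, stream_http_lines now makes a single attempt and lets httpx errors propagate; recovery becomes the caller's job (see the load_json_lines hunk below). A minimal caller sketch under that assumption, where consume() and its line counting are hypothetical and only stream_http_lines comes from this diff:

import httpx
from yente.data.loader import stream_http_lines

async def consume(url: str) -> int:
    # A single streaming attempt; each yielded item is an orjson-decoded line.
    lines = 0
    try:
        async for line in stream_http_lines(url):
            lines += 1
    except httpx.HTTPError:
        # stream_http_lines no longer retries; the caller decides whether
        # to retry or to fall back to a full download.
        raise
    return lines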
@@ -85,16 +79,49 @@ async def load_json_lines(
         async for line in read_path_lines(path):
             yield line
 
-    elif not settings.STREAM_LOAD:
+    else:
+        retries = 0
+        if settings.STREAM_LOAD:
+            log.info("Streaming data", url=url)
+            try:
+                async for line in stream_http_lines(url, auth_token=auth_token):
+                    yield line
+                # If we've managed to stream all the data, we're done.
+                return
+            except httpx.HTTPError as e:
+                log.error(
+                    "Error streaming data, falling back to fetching instead",
+                    url=url,
+                    error=e,
+                )
+                retries += 1
+                # Continue by falling through to the fetch code below.
+                # Note: this isn't quite correct; the right way (tm) would be to bubble
+                # the error up to the indexer and handle it there (at least resetting the counter).
+                # But that's more work than I want to do right now, and indexing is idempotent anyway.
+
         path = settings.DATA_PATH.joinpath(base_name)
         log.info("Fetching data", url=url, path=path.as_posix())
-        try:
-            await fetch_url_to_path(url, path, auth_token=auth_token)
-            async for line in read_path_lines(path):
-                yield line
-        finally:
-            path.unlink(missing_ok=True)
-    else:
-        log.info("Streaming data", url=url)
-        async for line in stream_http_lines(url, auth_token=auth_token):
-            yield line
+
+        while retries < MAX_RETRIES:
+            try:
+                await fetch_url_to_path(url, path, auth_token=auth_token)
+                async for line in read_path_lines(path):
+                    yield line
+                return  # all lines yielded successfully; stop retrying
+            except httpx.HTTPError as e:
+                retries += 1
+                log.error(
+                    f"Error fetching data, this was attempt {retries}/{MAX_RETRIES}",
+                    url=url,
+                    error=e,
+                )
+
+                if retries >= MAX_RETRIES:
+                    raise
+
+                await asyncio.sleep(2**retries)
+                continue
+
+            finally:
+                path.unlink(missing_ok=True)
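
For reference, here is a standalone sketch (not part of the PR) of the backoff schedule the new while loop produces. The helper fetch_with_backoff is hypothetical; MAX_RETRIES = 3 and the 2**retries sleep mirror the diff:

import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")
MAX_RETRIES = 3  # as defined at the top of loader.py in this diff

async def fetch_with_backoff(attempt: Callable[[], Awaitable[T]], retries: int = 0) -> T:
    # retries may start at 1 when a prior streaming attempt already failed,
    # mirroring load_json_lines, which shares one counter across both paths.
    while retries < MAX_RETRIES:
        try:
            return await attempt()
        except Exception:
            retries += 1
            if retries >= MAX_RETRIES:
                raise
            # Sleep 2**retries seconds: 2s after the first failure,
            # 4s after the second; a third failure re-raises.
            await asyncio.sleep(2**retries)
    raise RuntimeError("retry budget exhausted before the first attempt")

Because the streaming attempt and the download loop share one counter, a failed streaming pass consumes one unit of the budget, leaving at most two download attempts.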