From a47762d925889e680e13e0bda6388a905abc9e29 Mon Sep 17 00:00:00 2001
From: Leon Handreke
Date: Mon, 25 Aug 2025 17:09:51 +0200
Subject: [PATCH] loader: Fall through to fetch if stream fails and retry fetch

See https://github.com/opensanctions/yente/pull/848
---
 yente/data/loader.py | 78 ++++++++++++++++++++++++++++++++--------------
 1 file changed, 53 insertions(+), 25 deletions(-)

diff --git a/yente/data/loader.py b/yente/data/loader.py
index 9d9e6f49..e785e56b 100644
--- a/yente/data/loader.py
+++ b/yente/data/loader.py
@@ -4,7 +4,6 @@
 import asyncio
 import aiofiles
 from pathlib import Path
-from itertools import count
 from typing import Any, AsyncGenerator, Optional
 
 from yente import settings
@@ -13,6 +12,8 @@
 
 log = get_logger(__name__)
 
+MAX_RETRIES = 3
+
 
 async def load_yaml_url(url: str, auth_token: Optional[str] = None) -> Any:
     if url.lower().endswith(".json"):
@@ -61,19 +62,12 @@ async def read_path_lines(path: Path) -> AsyncGenerator[Any, None]:
 async def stream_http_lines(
     url: str, auth_token: Optional[str] = None
 ) -> AsyncGenerator[Any, None]:
-    for retry in count():
-        try:
-            async with httpx_session(auth_token=auth_token) as client:
-                async with client.stream("GET", url) as resp:
-                    resp.raise_for_status()
-                    async for line in resp.aiter_lines():
-                        yield orjson.loads(line)
-                    return
-        except httpx.TransportError as exc:
-            if retry > 3:
-                raise
-            await asyncio.sleep(1.0)
-            log.error("Streaming index HTTP error: %s, retrying..." % exc)
+    async with httpx_session(auth_token=auth_token) as client:
+        async with client.stream("GET", url) as resp:
+            resp.raise_for_status()
+            async for line in resp.aiter_lines():
+                yield orjson.loads(line)
+            return
 
 
 async def load_json_lines(
@@ -85,16 +79,50 @@ async def load_json_lines(
         async for line in read_path_lines(path):
             yield line
 
-    elif not settings.STREAM_LOAD:
+    else:
+        retries = 0
+        if settings.STREAM_LOAD:
+            log.info("Streaming data", url=url)
+            try:
+                async for line in stream_http_lines(url, auth_token=auth_token):
+                    yield line
+                # If we've managed to stream all the data, we're done
+                return
+            except httpx.HTTPError as e:
+                log.error(
+                    "Error streaming data, falling back to fetching instead",
+                    url=url,
+                    error=e,
+                )
+                retries += 1
+                # Continue here by falling through to the fetch code
+                # Note: this isn't really all that correct, the right way (tm) would be to bubble up
+                # the error to the indexer and then do the right thing there (at least reset the counter).
+                # But that's more work than I want to do right now and indexing is idempotent anyway.
+
         path = settings.DATA_PATH.joinpath(base_name)
         log.info("Fetching data", url=url, path=path.as_posix())
-        try:
-            await fetch_url_to_path(url, path, auth_token=auth_token)
-            async for line in read_path_lines(path):
-                yield line
-        finally:
-            path.unlink(missing_ok=True)
-    else:
-        log.info("Streaming data", url=url)
-        async for line in stream_http_lines(url, auth_token=auth_token):
-            yield line
+
+        while retries < MAX_RETRIES:
+            try:
+                await fetch_url_to_path(url, path, auth_token=auth_token)
+                async for line in read_path_lines(path):
+                    yield line
+                # All data was fetched and read successfully, stop retrying
+                return
+            except httpx.HTTPError as e:
+                retries += 1
+                log.error(
+                    f"Error fetching data, this was attempt {retries}/{MAX_RETRIES}",
+                    url=url,
+                    error=e,
+                )
+
+                if retries >= MAX_RETRIES:
+                    raise
+
+                await asyncio.sleep(2**retries)
+                continue
+
+            finally:
+                path.unlink(missing_ok=True)
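
For reviewers, here is a minimal standalone sketch of the control flow this patch introduces: stream the data first, and on an HTTP error fall back to a bounded fetch-and-retry loop with exponential backoff. The load_lines, stream_lines and fetch_lines names below are hypothetical stand-ins rather than yente APIs; only httpx.HTTPError, MAX_RETRIES and the 2 ** retries backoff mirror the patch.

import asyncio
from typing import AsyncGenerator, Awaitable, Callable

import httpx

MAX_RETRIES = 3


async def load_lines(
    stream_lines: Callable[[], AsyncGenerator[str, None]],
    fetch_lines: Callable[[], Awaitable[list[str]]],
) -> AsyncGenerator[str, None]:
    """Stream first; on HTTP errors, fall back to fetching with bounded retries."""
    retries = 0
    try:
        # Happy path: stream everything and stop once the stream is exhausted.
        async for line in stream_lines():
            yield line
        return
    except httpx.HTTPError:
        # A failed stream counts as one attempt before falling back to fetching.
        retries += 1

    while retries < MAX_RETRIES:
        try:
            for line in await fetch_lines():
                yield line
            return  # All data fetched, stop retrying.
        except httpx.HTTPError:
            retries += 1
            if retries >= MAX_RETRIES:
                raise
            # Exponential backoff before the next fetch attempt.
            await asyncio.sleep(2**retries)

As the in-code note in the patch acknowledges, a stream that fails partway through can cause some lines to be yielded twice once the fallback fetch runs; the patch accepts this because indexing is idempotent.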