diff --git a/examples/stac-stream-response.ipynb b/examples/stac-stream-response.ipynb new file mode 100644 index 0000000..1cd4a6a --- /dev/null +++ b/examples/stac-stream-response.ipynb @@ -0,0 +1,183 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": 41, + "metadata": {}, + "outputs": [], + "source": [ + "import httpx\n", + "import json\n", + "import re\n", + "\n", + "endpoint = \"https://stac.eoapi.dev\"\n", + "collection = \"openaerialmap\"" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "200\n", + "Number of Items found: 16157\n" + ] + } + ], + "source": [ + "resp = httpx.get(f\"{endpoint}/search\", params={\"collections\": collection})\n", + "print(\"Number of Items found:\", resp.json()[\"numberMatched\"])" + ] + }, + { + "cell_type": "code", + "execution_count": 43, + "metadata": {}, + "outputs": [], + "source": [ + "# https://github.com/kennethreitz/requests/blob/f5dacf84468ab7e0631cc61a3f1431a32e3e143c/requests/utils.py#L580-L612\n", + "def parse_header_links(value):\n", + " \"\"\"Return a dict of parsed link headers proxies.\n", + "\n", + " i.e. Link: ; rel=front; type=\"image/jpeg\",; rel=back;type=\"image/jpeg\"\n", + "\n", + " \"\"\"\n", + "\n", + " links = []\n", + "\n", + " replace_chars = \" '\\\"\"\n", + "\n", + " for val in re.split(\", *<\", value):\n", + " try:\n", + " url, params = val.split(\";\", 1)\n", + " except ValueError:\n", + " url, params = val, \"\"\n", + "\n", + " link = {}\n", + "\n", + " link[\"url\"] = url.strip(\"<> '\\\"\")\n", + "\n", + " for param in params.split(\";\"):\n", + " try:\n", + " key, value = param.split(\"=\")\n", + " except ValueError:\n", + " break\n", + "\n", + " link[key.strip(replace_chars)] = value.strip(replace_chars)\n", + "\n", + " links.append(link)\n", + "\n", + " return links" + ] + }, + { + "cell_type": "code", + "execution_count": 57, + "metadata": {}, + "outputs": [], + "source": [ + "%timeit\n", + "\n", + "url = f\"{endpoint}/search?collections={collection}&limit=1000\"\n", + "while True:\n", + " resp = httpx.get(url, headers={\"Accept\": \"application/geo+json-seq\"})\n", + " for r in resp.iter_lines():\n", + " _ = json.loads(r)\n", + "\n", + " links = parse_header_links(resp.headers.get(\"link\", \"\"))\n", + " url = next((link[\"url\"] for link in links if link[\"rel\"] == \"next\"), None)\n", + " if not url:\n", + " break" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "%timeit\n", + "\n", + "url = f\"{endpoint}/search?collections={collection}&limit=1000\"\n", + "while True:\n", + " resp = httpx.get(url)\n", + " items = resp.json()\n", + "\n", + " links = items[\"links\"]\n", + " url = next((link[\"href\"] for link in links if link[\"rel\"] == \"next\"), None)\n", + " if not url:\n", + " break" + ] + }, + { + "cell_type": "code", + "execution_count": 70, + "metadata": {}, + "outputs": [ + { + "data": { + "application/vnd.jupyter.widget-view+json": { + "model_id": "605b13e7d75d458eba6f266ec9624272", + "version_major": 2, + "version_minor": 1 + }, + "text/plain": [ + "Map(basemap_style= Generator[str, None, None]: return _create_csv_rows(rows) +async def create_parquet(items: Dict) -> bytes: + """Create parquet binary body.""" + fp = tempfile.NamedTemporaryFile(suffix=".parquet", delete=False) + fp.close() + + content = b"" + + try: + await stacrs.write(fp.name, items) + with open(fp.name, "rb") as f: + content = f.read() + + finally: + os.remove(fp.name) + + return content + + @attr.s class FiltersClient(PgSTACFiltersClient): async def get_queryables( @@ -529,6 +550,16 @@ async def item_collection( }, ) + elif output_type == MimeTypes.parquet: + return Response( + await create_parquet(item_collection), + media_type=MimeTypes.parquet, + headers={ + "Content-Disposition": "attachment;filename=items.parquet", + **additional_headers, + }, + ) + # If we have the `fields` extension enabled # we need to avoid Pydantic validation because the # Items might not be a valid STAC Item objects @@ -671,6 +702,16 @@ async def get_search( }, ) + elif output_type == MimeTypes.parquet: + return Response( + await create_parquet(item_collection), + media_type=MimeTypes.parquet, + headers={ + "Content-Disposition": "attachment;filename=items.parquet", + **additional_headers, + }, + ) + if fields := getattr(search_request, "fields", None): if fields.include or fields.exclude: return JSONResponse(item_collection) # type: ignore @@ -726,6 +767,16 @@ async def post_search( }, ) + elif output_type == MimeTypes.parquet: + return Response( + await create_parquet(item_collection), + media_type=MimeTypes.parquet, + headers={ + "Content-Disposition": "attachment;filename=items.parquet", + **additional_headers, + }, + ) + if fields := getattr(search_request, "fields", None): if fields.include or fields.exclude: return JSONResponse(item_collection) # type: ignore diff --git a/runtimes/eoapi/stac/eoapi/stac/extensions.py b/runtimes/eoapi/stac/eoapi/stac/extensions.py index 3677457..61de48a 100644 --- a/runtimes/eoapi/stac/eoapi/stac/extensions.py +++ b/runtimes/eoapi/stac/eoapi/stac/extensions.py @@ -136,7 +136,7 @@ class HTMLorGeoGetRequestMulti(APIRequest): """HTML, GeoJSON, GeoJSONSeq or CSV output.""" f: Annotated[ - Optional[Literal["geojson", "html", "csv", "geojsonseq"]], + Optional[Literal["geojson", "html", "csv", "geojsonseq", "parquet"]], Query(description="Response MediaType."), ] = attr.ib(default=None) diff --git a/runtimes/eoapi/stac/pyproject.toml b/runtimes/eoapi/stac/pyproject.toml index d594ee4..1ed20de 100644 --- a/runtimes/eoapi/stac/pyproject.toml +++ b/runtimes/eoapi/stac/pyproject.toml @@ -21,6 +21,7 @@ classifiers = [ dynamic = ["version"] dependencies = [ "stac-fastapi.pgstac>=4.0.2,<4.1", + "stacrs", "jinja2>=2.11.2,<4.0.0", "starlette-cramjam>=0.4,<0.5", "psycopg_pool",