From 7fb24ddff165929b49a0af787517e8d95fd93820 Mon Sep 17 00:00:00 2001 From: vincentsarago <vincent.sarago@gmail.com> Date: Wed, 19 Feb 2025 12:02:53 -0500 Subject: [PATCH 1/3] sketc geoparquet output --- runtimes/eoapi/stac/eoapi/stac/api.py | 3 ++ runtimes/eoapi/stac/eoapi/stac/client.py | 57 ++++++++++++++++++-- runtimes/eoapi/stac/eoapi/stac/extensions.py | 2 +- runtimes/eoapi/stac/pyproject.toml | 1 + 4 files changed, 59 insertions(+), 4 deletions(-) diff --git a/runtimes/eoapi/stac/eoapi/stac/api.py b/runtimes/eoapi/stac/eoapi/stac/api.py index 5a7f05c..9577174 100644 --- a/runtimes/eoapi/stac/eoapi/stac/api.py +++ b/runtimes/eoapi/stac/eoapi/stac/api.py @@ -159,6 +159,7 @@ def register_get_item_collection(self): MimeTypes.html.value: {}, MimeTypes.csv.value: {}, MimeTypes.geojsonseq.value: {}, + MimeTypes.parquet.value: {}, }, "model": api.ItemCollection, }, @@ -191,6 +192,7 @@ def register_get_search(self): MimeTypes.html.value: {}, MimeTypes.csv.value: {}, MimeTypes.geojsonseq.value: {}, + MimeTypes.parquet.value: {}, }, "model": api.ItemCollection, }, @@ -222,6 +224,7 @@ def register_post_search(self): MimeTypes.geojson.value: {}, MimeTypes.csv.value: {}, MimeTypes.geojsonseq.value: {}, + MimeTypes.parquet.value: {}, }, "model": api.ItemCollection, }, diff --git a/runtimes/eoapi/stac/eoapi/stac/client.py b/runtimes/eoapi/stac/eoapi/stac/client.py index 16d4726..63621aa 100644 --- a/runtimes/eoapi/stac/eoapi/stac/client.py +++ b/runtimes/eoapi/stac/eoapi/stac/client.py @@ -1,7 +1,9 @@ """eoapi-devseed: Custom pgstac client.""" import csv +import os import re +import tempfile from typing import ( Any, Dict, @@ -18,6 +20,7 @@ import attr import jinja2 import orjson +import stacrs from fastapi import Request from geojson_pydantic.geometries import parse_geometry_obj from stac_fastapi.api.models import JSONResponse @@ -37,14 +40,14 @@ ) from stac_pydantic.links import Relations from stac_pydantic.shared import BBox, MimeTypes -from starlette.responses import StreamingResponse +from starlette.responses import Response, StreamingResponse from starlette.templating import Jinja2Templates, _TemplateResponse ResponseType = Literal["json", "html"] GeoResponseType = Literal["geojson", "html"] QueryablesResponseType = Literal["jsonschema", "html"] -GeoMultiResponseType = Literal["geojson", "html", "geojsonseq", "csv"] -PostMultiResponseType = Literal["geojson", "geojsonseq", "csv"] +GeoMultiResponseType = Literal["geojson", "html", "geojsonseq", "csv", "parquet"] +PostMultiResponseType = Literal["geojson", "geojsonseq", "csv", "parquet"] jinja2_env = jinja2.Environment( @@ -206,6 +209,24 @@ def items_to_csv_rows(items: Iterable[Dict]) -> Generator[str, None, None]: return _create_csv_rows(rows) +def create_parquet(items: Dict) -> bytes: + """Create parquet binary body.""" + fp = tempfile.NamedTemporaryFile(suffix=".parquet", delete=False) + fp.close() + + content = b"" + + try: + stacrs.write(fp.name, items) + with open(fp.name, "rb") as f: + content = f.read() + + finally: + os.remove(fp.name) + + return content + + @attr.s class FiltersClient(PgSTACFiltersClient): async def get_queryables( @@ -529,6 +550,16 @@ async def item_collection( }, ) + elif output_type == MimeTypes.parquet: + return Response( + create_parquet(item_collection), + media_type=MimeTypes.parquet, + headers={ + "Content-Disposition": "attachment;filename=items.parquet", + **additional_headers, + }, + ) + # If we have the `fields` extension enabled # we need to avoid Pydantic validation because the # Items might not be a valid STAC Item objects @@ -671,6 +702,16 @@ async def get_search( }, ) + elif output_type == MimeTypes.parquet: + return Response( + create_parquet(item_collection), + media_type=MimeTypes.parquet, + headers={ + "Content-Disposition": "attachment;filename=items.parquet", + **additional_headers, + }, + ) + if fields := getattr(search_request, "fields", None): if fields.include or fields.exclude: return JSONResponse(item_collection) # type: ignore @@ -726,6 +767,16 @@ async def post_search( }, ) + elif output_type == MimeTypes.parquet: + return Response( + create_parquet(item_collection), + media_type=MimeTypes.parquet, + headers={ + "Content-Disposition": "attachment;filename=items.parquet", + **additional_headers, + }, + ) + if fields := getattr(search_request, "fields", None): if fields.include or fields.exclude: return JSONResponse(item_collection) # type: ignore diff --git a/runtimes/eoapi/stac/eoapi/stac/extensions.py b/runtimes/eoapi/stac/eoapi/stac/extensions.py index 3677457..61de48a 100644 --- a/runtimes/eoapi/stac/eoapi/stac/extensions.py +++ b/runtimes/eoapi/stac/eoapi/stac/extensions.py @@ -136,7 +136,7 @@ class HTMLorGeoGetRequestMulti(APIRequest): """HTML, GeoJSON, GeoJSONSeq or CSV output.""" f: Annotated[ - Optional[Literal["geojson", "html", "csv", "geojsonseq"]], + Optional[Literal["geojson", "html", "csv", "geojsonseq", "parquet"]], Query(description="Response MediaType."), ] = attr.ib(default=None) diff --git a/runtimes/eoapi/stac/pyproject.toml b/runtimes/eoapi/stac/pyproject.toml index d594ee4..1ed20de 100644 --- a/runtimes/eoapi/stac/pyproject.toml +++ b/runtimes/eoapi/stac/pyproject.toml @@ -21,6 +21,7 @@ classifiers = [ dynamic = ["version"] dependencies = [ "stac-fastapi.pgstac>=4.0.2,<4.1", + "stacrs", "jinja2>=2.11.2,<4.0.0", "starlette-cramjam>=0.4,<0.5", "psycopg_pool", From 0e09290a706706dd5fa903730f1c1e5044e3cf43 Mon Sep 17 00:00:00 2001 From: vincentsarago <vincent.sarago@gmail.com> Date: Wed, 19 Feb 2025 15:28:18 -0500 Subject: [PATCH 2/3] ASYNC --- runtimes/eoapi/stac/eoapi/stac/client.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/runtimes/eoapi/stac/eoapi/stac/client.py b/runtimes/eoapi/stac/eoapi/stac/client.py index 63621aa..82ea94a 100644 --- a/runtimes/eoapi/stac/eoapi/stac/client.py +++ b/runtimes/eoapi/stac/eoapi/stac/client.py @@ -209,7 +209,7 @@ def items_to_csv_rows(items: Iterable[Dict]) -> Generator[str, None, None]: return _create_csv_rows(rows) -def create_parquet(items: Dict) -> bytes: +async def create_parquet(items: Dict) -> bytes: """Create parquet binary body.""" fp = tempfile.NamedTemporaryFile(suffix=".parquet", delete=False) fp.close() @@ -217,7 +217,7 @@ def create_parquet(items: Dict) -> bytes: content = b"" try: - stacrs.write(fp.name, items) + await stacrs.write(fp.name, items) with open(fp.name, "rb") as f: content = f.read() @@ -552,7 +552,7 @@ async def item_collection( elif output_type == MimeTypes.parquet: return Response( - create_parquet(item_collection), + await create_parquet(item_collection), media_type=MimeTypes.parquet, headers={ "Content-Disposition": "attachment;filename=items.parquet", @@ -704,7 +704,7 @@ async def get_search( elif output_type == MimeTypes.parquet: return Response( - create_parquet(item_collection), + await create_parquet(item_collection), media_type=MimeTypes.parquet, headers={ "Content-Disposition": "attachment;filename=items.parquet", @@ -769,7 +769,7 @@ async def post_search( elif output_type == MimeTypes.parquet: return Response( - create_parquet(item_collection), + await create_parquet(item_collection), media_type=MimeTypes.parquet, headers={ "Content-Disposition": "attachment;filename=items.parquet", From 23531595016b7ab1e6f3fd3972466e621ac30afe Mon Sep 17 00:00:00 2001 From: vincentsarago <vincent.sarago@gmail.com> Date: Mon, 24 Feb 2025 09:43:47 +0100 Subject: [PATCH 3/3] add example --- examples/stac-stream-response.ipynb | 183 ++++++++++++++++++++++++++++ 1 file changed, 183 insertions(+) create mode 100644 examples/stac-stream-response.ipynb diff --git a/examples/stac-stream-response.ipynb b/examples/stac-stream-response.ipynb new file mode 100644 index 0000000..1cd4a6a --- /dev/null +++ b/examples/stac-stream-response.ipynb @@ -0,0 +1,183 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": 41, + "metadata": {}, + "outputs": [], + "source": [ + "import httpx\n", + "import json\n", + "import re\n", + "\n", + "endpoint = \"https://stac.eoapi.dev\"\n", + "collection = \"openaerialmap\"" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "200\n", + "Number of Items found: 16157\n" + ] + } + ], + "source": [ + "resp = httpx.get(f\"{endpoint}/search\", params={\"collections\": collection})\n", + "print(\"Number of Items found:\", resp.json()[\"numberMatched\"])" + ] + }, + { + "cell_type": "code", + "execution_count": 43, + "metadata": {}, + "outputs": [], + "source": [ + "# https://github.com/kennethreitz/requests/blob/f5dacf84468ab7e0631cc61a3f1431a32e3e143c/requests/utils.py#L580-L612\n", + "def parse_header_links(value):\n", + " \"\"\"Return a dict of parsed link headers proxies.\n", + "\n", + " i.e. Link: <http:/.../front.jpeg>; rel=front; type=\"image/jpeg\",<http://.../back.jpeg>; rel=back;type=\"image/jpeg\"\n", + "\n", + " \"\"\"\n", + "\n", + " links = []\n", + "\n", + " replace_chars = \" '\\\"\"\n", + "\n", + " for val in re.split(\", *<\", value):\n", + " try:\n", + " url, params = val.split(\";\", 1)\n", + " except ValueError:\n", + " url, params = val, \"\"\n", + "\n", + " link = {}\n", + "\n", + " link[\"url\"] = url.strip(\"<> '\\\"\")\n", + "\n", + " for param in params.split(\";\"):\n", + " try:\n", + " key, value = param.split(\"=\")\n", + " except ValueError:\n", + " break\n", + "\n", + " link[key.strip(replace_chars)] = value.strip(replace_chars)\n", + "\n", + " links.append(link)\n", + "\n", + " return links" + ] + }, + { + "cell_type": "code", + "execution_count": 57, + "metadata": {}, + "outputs": [], + "source": [ + "%timeit\n", + "\n", + "url = f\"{endpoint}/search?collections={collection}&limit=1000\"\n", + "while True:\n", + " resp = httpx.get(url, headers={\"Accept\": \"application/geo+json-seq\"})\n", + " for r in resp.iter_lines():\n", + " _ = json.loads(r)\n", + "\n", + " links = parse_header_links(resp.headers.get(\"link\", \"\"))\n", + " url = next((link[\"url\"] for link in links if link[\"rel\"] == \"next\"), None)\n", + " if not url:\n", + " break" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "%timeit\n", + "\n", + "url = f\"{endpoint}/search?collections={collection}&limit=1000\"\n", + "while True:\n", + " resp = httpx.get(url)\n", + " items = resp.json()\n", + "\n", + " links = items[\"links\"]\n", + " url = next((link[\"href\"] for link in links if link[\"rel\"] == \"next\"), None)\n", + " if not url:\n", + " break" + ] + }, + { + "cell_type": "code", + "execution_count": 70, + "metadata": {}, + "outputs": [ + { + "data": { + "application/vnd.jupyter.widget-view+json": { + "model_id": "605b13e7d75d458eba6f266ec9624272", + "version_major": 2, + "version_minor": 1 + }, + "text/plain": [ + "Map(basemap_style=<CartoBasemap.DarkMatter: 'https://basemaps.cartocdn.com/gl/dark-matter-gl-style/style.json'…" + ] + }, + "execution_count": 70, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "from geoarrow.rust.io import read_parquet\n", + "from lonboard import viz\n", + "\n", + "resp = httpx.get(\n", + " \"http://127.0.0.1:8081/search?limit=1000\",\n", + " headers={\"Accept\": \"application/vnd.apache.parquet\"},\n", + ")\n", + "with open(\"items.parquet\", \"wb\") as f:\n", + " f.write(resp.content)\n", + "\n", + "# Example: A GeoParquet file with Polygon or MultiPolygon geometries\n", + "table = read_parquet(\"items.parquet\")\n", + "v = viz(table)\n", + "v" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "py312", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.12.3" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +}