diff --git a/pygeoapi/provider/parquet.py b/pygeoapi/provider/parquet.py
index 0f4ab3de1..250a9f737 100644
--- a/pygeoapi/provider/parquet.py
+++ b/pygeoapi/provider/parquet.py
@@ -38,7 +38,15 @@
 import pyarrow.dataset
 import s3fs
 
-from pygeoapi.crs import crs_transform
+try:
+    from pygeoapi.crs import crs_transform
+except ImportError:
+    # Fallback for older versions of pygeoapi
+    def crs_transform(func):
+        """Fallback decorator when crs_transform is not available"""
+        return func
+
+
 from pygeoapi.provider.base import (
     BaseProvider,
     ProviderConnectionError,
@@ -83,31 +91,38 @@ def __init__(self, provider_def):
         super().__init__(provider_def)
 
         # Source url is required
-        self.source = self.data.get('source')
+        self.source = self.data.get("source")
         if not self.source:
-            msg = "Need explicit 'source' attr " \
-                  "in data field of provider config"
+            msg = "Need explicit 'source' attr " "in data field of provider config"
             LOGGER.error(msg)
-            raise Exception(msg)
+            raise ProviderConnectionError(msg)
 
         # Manage AWS S3 sources
-        if self.source.startswith('s3'):
-            self.source = self.source.split('://', 1)[1]
-            self.fs = s3fs.S3FileSystem(default_cache_type='none')
+        if self.source.startswith("s3"):
+            self.source = self.source.split("://", 1)[1]
+            self.fs = s3fs.S3FileSystem(default_cache_type="none")
         else:
             self.fs = None
 
-        # Build pyarrow dataset pointing to the data 
+        # Build pyarrow dataset pointing to the data
         self.ds = pyarrow.dataset.dataset(self.source, filesystem=self.fs)
 
-        LOGGER.debug('Grabbing field information')
+        # Get the geometry column name from config, default to 'geometry'
+        self.geometry_column = provider_def.get("geometry_column", "geometry")
+
+        LOGGER.debug("Grabbing field information")
         self.get_fields()  # Must be set to visualise queryables
 
-        # Column names for bounding box data.
-        if None in [self.x_field, self.y_field]:
-            self.has_geometry = False
-        else:
+        # Determine if we have geometry data
+        # Check for x/y fields first (existing behavior), then fall back to geometry column
+        column_names = self.ds.schema.names
+        if None not in [self.x_field, self.y_field]:
             self.has_geometry = True
+            self.geometry_source = "xy_fields"
+            LOGGER.debug(
+                f"Using x/y fields for geometry: {self.x_field}, {self.y_field}"
+            )
+            # Column names for bounding box data.
             if isinstance(self.x_field, str):
                 self.minx = self.x_field
                 self.maxx = self.x_field
@@ -120,13 +135,28 @@ def __init__(self, provider_def):
             else:
                 self.miny, self.maxy = self.y_field
             self.bb = [self.minx, self.miny, self.maxx, self.maxy]
+        elif self.geometry_column in column_names:
+            self.has_geometry = True
+            self.geometry_source = "geometry_column"
+            LOGGER.debug(f"Found geometry column: {self.geometry_column}")
+        else:
+            self.has_geometry = False
+            self.geometry_source = None
+            LOGGER.debug("No geometry column or x/y fields found")
+
         if self.has_geometry:
             # Get the CRS of the data
-            geo_metadata = json.loads(self.ds.schema.metadata[b'geo'])
-            geom_column = geo_metadata['primary_column']
-            # if the CRS is not set default to EPSG:4326, per geoparquet spec
-            self.crs = (geo_metadata['columns'][geom_column].get('crs')
-                        or 'OGC:CRS84')
+            try:
+                geo_metadata = json.loads(self.ds.schema.metadata[b"geo"])
+                geom_column = geo_metadata["primary_column"]
+                # if the CRS is not set default to OGC:CRS84, per geoparquet spec
+                self.crs = (
+                    geo_metadata["columns"][geom_column].get("crs") or "OGC:CRS84"
+                )
+            except (KeyError, TypeError, json.JSONDecodeError):
+                # Fallback if geo metadata is missing or malformed
+                LOGGER.warning("No valid geo metadata found, defaulting to OGC:CRS84")
+                self.crs = "OGC:CRS84"
 
     def _read_parquet(self, return_scanner=False, **kwargs):
         """
@@ -150,37 +180,38 @@ def get_fields(self):
 
         if not self._fields:
-            for field_name, field_type in zip(self.ds.schema.names,
-                                              self.ds.schema.types):
+            for field_name, field_type in zip(
+                self.ds.schema.names, self.ds.schema.types
+            ):
                 # Geometry is managed as a special case by pygeoapi
-                if field_name == 'geometry':
+                if field_name == self.geometry_column:
                     continue
 
                 field_type = str(field_type)
                 converted_type = None
                 converted_format = None
 
-                if field_type.startswith(('int', 'uint')):
-                    converted_type = 'integer'
+                if field_type.startswith(("int", "uint")):
+                    converted_type = "integer"
                     converted_format = field_type
-                elif field_type == 'double' or field_type.startswith('float'):
-                    converted_type = 'number'
+                elif field_type == "double" or field_type.startswith("float"):
+                    converted_type = "number"
                     converted_format = field_type
-                elif field_type == 'string':
-                    converted_type = 'string'
-                elif field_type == 'bool':
-                    converted_type = 'boolean'
-                elif field_type.startswith('timestamp'):
-                    converted_type = 'string'
-                    converted_format = 'date-time'
+                elif field_type == "string":
+                    converted_type = "string"
+                elif field_type == "bool":
+                    converted_type = "boolean"
+                elif field_type.startswith("timestamp"):
+                    converted_type = "string"
+                    converted_format = "date-time"
                 else:
-                    LOGGER.error(f'Unsupported field type {field_type}')
+                    LOGGER.error(f"Unsupported field type {field_type}")
 
                 if converted_format is None:
-                    self._fields[field_name] = {'type': converted_type}
+                    self._fields[field_name] = {"type": converted_type}
                 else:
                     self._fields[field_name] = {
-                        'type': converted_type,
-                        'format': converted_format,
+                        "type": converted_type,
+                        "format": converted_format,
                     }
 
         return self._fields
@@ -190,7 +221,7 @@ def query(
         self,
         offset=0,
         limit=10,
-        resulttype='results',
+        resulttype="results",
         bbox=[],
         datetime_=None,
         properties=[],
@@ -219,37 +250,46 @@ def query(
             if bbox:
                 if self.has_geometry is False:
                     msg = (
-                        'Dataset does not have a geometry field, '
-                        'querying by bbox is not supported.'
+                        "Dataset does not have a geometry field, "
+                        "querying by bbox is not supported."
                     )
                     raise ProviderQueryError(msg)
 
-                LOGGER.debug('processing bbox parameter')
-                if any(b is None for b in bbox):
-                    msg = 'Dataset does not support bbox filtering'
-                    raise ProviderQueryError(msg)
-                minx, miny, maxx, maxy = [float(b) for b in bbox]
-                filter = (
-                    (pc.field(self.minx) > pc.scalar(minx))
-                    & (pc.field(self.miny) > pc.scalar(miny))
-                    & (pc.field(self.maxx) < pc.scalar(maxx))
-                    & (pc.field(self.maxy) < pc.scalar(maxy))
-                )
+                # Check if we have x/y fields for bbox filtering
+                if hasattr(self, "minx") and hasattr(self, "miny"):
+                    LOGGER.debug("processing bbox parameter with x/y fields")
+                    if any(b is None for b in bbox):
+                        msg = "Dataset does not support bbox filtering"
+                        raise ProviderQueryError(msg)
+
+                    minx, miny, maxx, maxy = [float(b) for b in bbox]
+                    filter = (
+                        (pc.field(self.minx) > pc.scalar(minx))
+                        & (pc.field(self.miny) > pc.scalar(miny))
+                        & (pc.field(self.maxx) < pc.scalar(maxx))
+                        & (pc.field(self.maxy) < pc.scalar(maxy))
+                    )
+                else:
+                    # For geometry columns, bbox filtering is not implemented yet
+                    LOGGER.warning(
+                        "Bbox filtering with geometry column is not yet supported"
+                    )
+                    # For now, we'll return all features and let client-side filtering handle it
 
             if datetime_ is not None:
                 if self.time_field is None:
                     msg = (
-                        'Dataset does not have a time field, '
-                        'querying by datetime is not supported.'
+                        "Dataset does not have a time field, "
+                        "querying by datetime is not supported."
                     )
                     raise ProviderQueryError(msg)
                 timefield = pc.field(self.time_field)
-                if '/' in datetime_:
-                    begin, end = datetime_.split('/')
-                    if begin != '..':
+                if "/" in datetime_:
+                    begin, end = datetime_.split("/")
+                    if begin != "..":
                         begin = isoparse(begin)
                         filter = filter & (timefield >= begin)
-                    if end != '..':
+                    if end != "..":
                         end = isoparse(end)
                         filter = filter & (timefield <= end)
                 else:
@@ -257,7 +297,7 @@ def query(
                 filter = filter & (timefield == target_time)
 
             if properties:
-                LOGGER.debug('processing properties')
+                LOGGER.debug("processing properties")
                 for name, value in properties:
                     field = self.ds.schema.field(name)
                     pd_type = arrow_to_pandas_type(field.type)
@@ -268,25 +308,46 @@ def query(
             if len(select_properties) == 0:
                 select_properties = self.ds.schema.names
             else:  # Load id and geometry together with any specified columns
-                if self.has_geometry and 'geometry' not in select_properties:
-                    select_properties.append('geometry')
+                if self.has_geometry:
+                    if (
+                        self.geometry_source == "geometry_column"
+                        and self.geometry_column not in select_properties
+                    ):
+                        select_properties.append(self.geometry_column)
+                    elif self.geometry_source == "xy_fields":
+                        # For x/y fields, ensure both x and y are included
+                        if self.x_field not in select_properties:
+                            select_properties.append(self.x_field)
+                        if self.y_field not in select_properties:
+                            select_properties.append(self.y_field)
                 if self.id_field not in select_properties:
                     select_properties.insert(0, self.id_field)
 
             if skip_geometry:
-                select_properties.remove('geometry')
+                if (
+                    self.geometry_source == "geometry_column"
+                    and self.geometry_column in select_properties
+                ):
+                    select_properties.remove(self.geometry_column)
+                elif self.geometry_source == "xy_fields":
+                    # For x/y fields, we can't really skip geometry since they're separate fields
+                    pass
 
             # Make response based on resulttype specified
-            if resulttype == 'hits':
-                LOGGER.debug('hits only specified')
+            if resulttype == "hits":
+                LOGGER.debug("hits only specified")
                 result = self._response_feature_hits(filter)
-            elif resulttype == 'results':
-                LOGGER.debug('results specified')
+            elif resulttype == "results":
+                LOGGER.debug("results specified")
                 result = self._response_feature_collection(
-                    filter, offset, limit, columns=select_properties
+                    filter,
+                    offset,
+                    limit,
+                    columns=select_properties,
+                    skip_geometry=skip_geometry,
                 )
             else:
-                LOGGER.error(f'Invalid resulttype: {resulttype}')
+                LOGGER.error(f"Invalid resulttype: {resulttype}")
 
         except RuntimeError as err:
             LOGGER.error(err)
@@ -311,34 +372,65 @@ def get(self, identifier, **kwargs):
         """
         result = None
         try:
-            LOGGER.debug(f'Fetching identifier {identifier}')
-            id_type = arrow_to_pandas_type(
-                self.ds.schema.field(self.id_field).type)
+            LOGGER.debug(f"Fetching identifier {identifier}")
+            id_type = arrow_to_pandas_type(self.ds.schema.field(self.id_field).type)
             batches = self._read_parquet(
-                filter=(
-                    pc.field(self.id_field) == pc.scalar(id_type(identifier))
-                )
+                filter=(pc.field(self.id_field) == pc.scalar(id_type(identifier)))
             )
             for batch in batches:
                 if batch.num_rows > 0:
                     assert (
                         batch.num_rows == 1
-                    ), f'Multiple items found with ID {identifier}'
+                    ), f"Multiple items found with ID {identifier}"
                     row = batch.to_pandas()
                     break
             else:
-                raise ProviderItemNotFoundError(f'ID {identifier} not found')
+                raise ProviderItemNotFoundError(f"ID {identifier} not found")
+
+            if not self.has_geometry:
+                geom = gpd.GeoSeries([None] * len(row), index=row.index)
+            elif self.geometry_source == "xy_fields":
+                if self.x_field in row.columns and self.y_field in row.columns:
+                    from shapely.geometry import Point
+
+                    geom = gpd.GeoSeries(
+                        [Point(row[self.x_field].iloc[0], row[self.y_field].iloc[0])],
+                        crs=self.crs,
+                    )
+                    # Remove the existing geometry column if it exists to avoid conflicts
+                    if "geometry" in row.columns:
+                        row = row.drop(columns=["geometry"])
+                else:
+                    geom = gpd.GeoSeries([None] * len(row), index=row.index)
+            elif (
+                self.geometry_source == "geometry_column"
+                and self.geometry_column in row.columns
+            ):
+                # Check if geometry is WKT string or binary
+                geom_data = row[self.geometry_column]
+                if len(geom_data) > 0 and isinstance(geom_data.iloc[0], str):
+                    # Handle WKT geometry strings
+                    geom = gpd.GeoSeries.from_wkt(geom_data, crs=self.crs)
+                else:
+                    # Handle binary geometry (WKB)
+                    geom = gpd.GeoSeries.from_wkb(geom_data, crs=self.crs)
 
-            if self.has_geometry:
-                geom = gpd.GeoSeries.from_wkb(row['geometry'], crs=self.crs)
+                # Rename geometry column to 'geometry' for GeoDataFrame
+                if self.geometry_column != "geometry":
+                    row = row.drop(columns=[self.geometry_column])
             else:
-                geom = [None]
+                geom = gpd.GeoSeries([None] * len(row), index=row.index)
+
+            # Ensure geometry has the same index as the row DataFrame
+            if hasattr(geom, "index") and not geom.index.equals(row.index):
+                geom.index = row.index
+
             gdf = gpd.GeoDataFrame(row, geometry=geom)
-            LOGGER.debug('results computed')
+            LOGGER.debug("results computed")
 
             # Grab the collection from geopandas geo_interface
-            result = gdf.__geo_interface__['features'][0]
+            result = gdf.__geo_interface__["features"][0]
 
         except RuntimeError as err:
             LOGGER.error(err)
@@ -356,10 +448,11 @@ def get(self, identifier, **kwargs):
         return result
 
     def __repr__(self):
-        return f'<ParquetProvider> {self.data}'
+        return f"<ParquetProvider> {self.data}"
 
-    def _response_feature_collection(self, filter, offset, limit,
-                                     columns=None):
+    def _response_feature_collection(
+        self, filter, offset, limit, columns=None, skip_geometry=False
+    ):
         """
         Assembles output from query as GeoJSON FeatureCollection structure.
 
@@ -367,7 +460,7 @@ def _response_feature_collection(self, filter, offset, limit,
 
         :returns: GeoJSON FeatureCollection
         """
-        LOGGER.debug(f'offset:{offset}, limit:{limit}')
+        LOGGER.debug(f"offset:{offset}, limit:{limit}")
 
         try:
             batches, scanner = self._read_parquet(
@@ -418,19 +511,70 @@ def _response_feature_collection(self, filter, offset, limit,
             if len(rp) > limit:
                 rp = rp.iloc[:-1]
 
-            if 'geometry' not in rp.columns:
-                # We need a null geometry column to create a GeoDataFrame
-                rp['geometry'] = None
-                geom = gpd.GeoSeries.from_wkb(rp['geometry'])
+            # Handle different geometry sources
+            if not self.has_geometry:
+                # No geometry
+                rp["geometry"] = None
+                geom = gpd.GeoSeries.from_wkb(rp["geometry"])
+            elif self.geometry_source == "xy_fields":
+                # Handle x/y fields (existing logic)
+                if self.x_field in rp.columns and self.y_field in rp.columns:
+                    # Check if geometry should be skipped
+                    if not skip_geometry:
+                        # Create point geometries from x/y coordinates
+                        from shapely.geometry import Point
+
+                        geom = gpd.GeoSeries(
+                            [
+                                Point(x, y)
+                                for x, y in zip(rp[self.x_field], rp[self.y_field])
+                            ],
+                            crs=self.crs,
+                        )
+                    else:
+                        # Skip geometry - create null geometry
+                        geom = gpd.GeoSeries([None] * len(rp))
+
+                    # Remove the existing geometry column if it exists to avoid conflicts
+                    if "geometry" in rp.columns:
+                        rp = rp.drop(columns=["geometry"])
+                else:
+                    rp["geometry"] = None
+                    geom = gpd.GeoSeries.from_wkb(rp["geometry"])
+            elif self.geometry_source == "geometry_column":
+                # Handle geometry column
+                if self.geometry_column not in rp.columns:
+                    # We need a null geometry column to create a GeoDataFrame
+                    rp["geometry"] = None
+                    geom = gpd.GeoSeries.from_wkb(rp["geometry"])
+                else:
+                    # Check if geometry is WKT string or binary
+                    geom_data = rp[self.geometry_column]
+                    if len(geom_data) > 0 and isinstance(geom_data.iloc[0], str):
+                        # Handle WKT geometry strings
+                        geom = gpd.GeoSeries.from_wkt(geom_data, crs=self.crs)
+                    else:
+                        # Handle binary geometry (WKB)
+                        geom = gpd.GeoSeries.from_wkb(geom_data, crs=self.crs)
+
+                    # Rename geometry column to 'geometry' for GeoDataFrame
+                    if self.geometry_column != "geometry":
+                        rp = rp.drop(columns=[self.geometry_column])
             else:
-                geom = gpd.GeoSeries.from_wkb(rp['geometry'], crs=self.crs)
+                # Fallback - no geometry
+                rp["geometry"] = None
+                geom = gpd.GeoSeries.from_wkb(rp["geometry"])
+
+            # Fix index mismatch - ensure geometry series has same index as DataFrame
+            geom.index = rp.index
 
             gdf = gpd.GeoDataFrame(rp, geometry=geom)
-            LOGGER.debug('results computed')
+
+            LOGGER.debug("results computed")
             result = gdf.__geo_interface__
 
             # Add numberMatched to generate "next" link
-            result['numberMatched'] = number_matched
+            result["numberMatched"] = number_matched
 
             return result
 
@@ -446,12 +590,11 @@ def _response_feature_hits(self, filter):
         """
 
         try:
-            scanner = pyarrow.dataset.Scanner.from_dataset(self.ds,
-                                                           filter=filter)
+            scanner = pyarrow.dataset.Scanner.from_dataset(self.ds, filter=filter)
             return {
-                'type': 'FeatureCollection',
-                'numberMatched': scanner.count_rows(),
-                'features': [],
+                "type": "FeatureCollection",
+                "numberMatched": scanner.count_rows(),
+                "features": [],
             }
         except Exception as error:
             LOGGER.error(error)
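A minimal sketch of how the new 'geometry_column' option could be exercised directly is given below. The provider name, file path, and field names are illustrative assumptions rather than part of the patch; per the __init__ change above, 'geometry_column' falls back to 'geometry' when omitted, and 'source' may also be an s3:// URL.

    # Hypothetical usage sketch; assumes a Parquet file with 'id' and a WKB/WKT 'geom' column
    from pygeoapi.provider.parquet import ParquetProvider

    provider_def = {
        "name": "Parquet",
        "type": "feature",
        "data": {"source": "/data/places.parquet"},  # placeholder path
        "id_field": "id",
        "geometry_column": "geom",  # defaults to "geometry" when omitted
    }

    provider = ParquetProvider(provider_def)
    print(provider.query(limit=5))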