-
Notifications
You must be signed in to change notification settings - Fork 28
/
Copy pathlayer.py
345 lines (290 loc) · 11.1 KB
/
layer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
"""timvt models."""
import abc
import json
from dataclasses import dataclass
from typing import Any, ClassVar, Dict, List, Optional
import morecantile
from buildpg import Func, RawDangerous
from buildpg import Var as pg_variable
from buildpg import asyncpg, clauses, funcs, render, select_fields
from pydantic import BaseModel, root_validator
from timvt.dbmodel import Table as DBTable
from timvt.errors import (
InvalidGeometryColumnName,
MissingEPSGCode,
MissingGeometryColumn,
)
from timvt.settings import TileSettings
tile_settings = TileSettings()
class Layer(BaseModel, metaclass=abc.ABCMeta):
"""Layer's Abstract BaseClass.
Attributes:
id (str): Layer's name.
bounds (list): Layer's bounds (left, bottom, right, top).
minzoom (int): Layer's min zoom level.
maxzoom (int): Layer's max zoom level.
default_tms (str): TileMatrixSet name for the min/max zoom.
tileurl (str, optional): Layer's tiles url.
"""
id: str
bounds: List[float] = [-180, -90, 180, 90]
crs: str = "http://www.opengis.net/def/crs/EPSG/0/4326"
title: Optional[str] = None
description: Optional[str] = None
minzoom: int = tile_settings.default_minzoom
maxzoom: int = tile_settings.default_maxzoom
default_tms: str = tile_settings.default_tms
tileurl: Optional[str] = None
@abc.abstractmethod
async def get_tile(
self,
pool: asyncpg.BuildPgPool,
tile: morecantile.Tile,
tms: morecantile.TileMatrixSet,
**kwargs: Any,
) -> bytes:
"""Return Tile Data.
Args:
pool (asyncpg.BuildPgPool): AsyncPG database connection pool.
tile (morecantile.Tile): Tile object with X,Y,Z indices.
tms (morecantile.TileMatrixSet): Tile Matrix Set.
kwargs (any, optiona): Optional parameters to forward to the SQL function.
Returns:
bytes: Mapbox Vector Tiles.
"""
...
class Table(Layer, DBTable):
"""Table Reader.
Attributes:
id (str): Layer's name.
bounds (list): Layer's bounds (left, bottom, right, top).
minzoom (int): Layer's min zoom level.
maxzoom (int): Layer's max zoom level.
tileurl (str, optional): Layer's tiles url.
type (str): Layer's type.
table (str): Table's name.
schema (str): Table's database schema (e.g public).
description (str): Table's description.
id_column (str): name of id column
geometry_columns (list): List of geometry columns.
properties (list): List of property columns.
"""
type: str = "Table"
@root_validator(pre=True)
def bounds_default(cls, values):
"""Get default bounds from the first geometry columns."""
geoms = values.get("geometry_columns")
if geoms:
# Get the Extent of all the bounds
def get_bounds(geom):
bounds = getattr(geom, "bounds", None)
if bounds is None:
bounds = geom["bounds"]
return bounds
minx, miny, maxx, maxy = zip(*[get_bounds(geom) for geom in geoms])
values["bounds"] = [min(minx), min(miny), max(maxx), max(maxy)]
srid = geoms[0]["srid"]
values["crs"] = f"http://www.opengis.net/def/crs/EPSG/0/{srid}"
return values
async def get_tile(
self,
pool: asyncpg.BuildPgPool,
tile: morecantile.Tile,
tms: morecantile.TileMatrixSet,
**kwargs: Any,
):
"""Get Tile Data."""
bbox = tms.xy_bounds(tile)
limit = kwargs.get(
"limit", str(tile_settings.max_features_per_tile)
) # Number of features to write to a tile.
limit = min(int(limit), tile_settings.max_features_per_tile)
if limit == -1:
limit = tile_settings.max_features_per_tile
columns = kwargs.get(
"columns",
) # Comma-seprated list of properties (column's name) to include in the tile
resolution = kwargs.get(
"resolution", str(tile_settings.tile_resolution)
) # Tile's resolution
buffer = kwargs.get(
"buffer", str(tile_settings.tile_buffer)
) # Size of extra data to add for a tile.
if not self.geometry_columns:
raise MissingGeometryColumn(
f"Could not find any geometry column for Table {self.id}"
)
geom = kwargs.get("geom", None)
geometry_column = self.get_geometry_column(geom)
if not geometry_column:
raise InvalidGeometryColumnName(f"Invalid Geometry Column: {geom}.")
geometry_srid = geometry_column.srid
# create list of columns to return
cols = [p.name for p in self.properties if p.name != geometry_column.name]
if columns is not None:
include_cols = [c.strip() for c in columns.split(",")]
cols = [c for c in cols if c in include_cols]
segSize = bbox.right - bbox.left
tms_srid = tms.crs.to_epsg()
tms_proj = tms.crs.to_proj4()
# This may be more valid but it doesn't add quotes around the column names
# _fields = select_fields(*cols)
_fields = [f'"{f}"' for f in cols]
_fields = ", ".join(_fields)
async with pool.acquire() as conn:
sql_query = """
WITH
-- bounds (the tile envelope) in TMS's CRS (SRID)
bounds_tmscrs AS (
SELECT
ST_Segmentize(
ST_MakeEnvelope(
:xmin,
:ymin,
:xmax,
:ymax,
-- If EPSG is null we set it to 0
coalesce(:tms_srid, 0)
),
:seg_size
) AS geom
),
bounds_geomcrs AS (
SELECT
CASE WHEN coalesce(:tms_srid, 0) != 0 THEN
ST_Transform(bounds_tmscrs.geom, :geometry_srid)
ELSE
ST_Transform(bounds_tmscrs.geom, :tms_proj, :geometry_srid)
END as geom
FROM bounds_tmscrs
),
mvtgeom AS (
SELECT ST_AsMVTGeom(
CASE WHEN :tms_srid IS NOT NULL THEN
ST_Transform(t.:geometry_column, :tms_srid)
ELSE
ST_Transform(t.:geometry_column, :tms_proj)
END,
bounds_tmscrs.geom,
:tile_resolution,
:tile_buffer
) AS geom, :fields
FROM :tablename t, bounds_tmscrs, bounds_geomcrs
-- Find where geometries intersect with input Tile
-- Intersects test is made in table geometry's CRS (e.g WGS84)
WHERE ST_Intersects(
t.:geometry_column, bounds_geomcrs.geom
) LIMIT :limit
)
SELECT ST_AsMVT(mvtgeom.*) FROM mvtgeom
"""
q, p = render(
sql_query,
tablename=pg_variable(self.id),
geometry_column=pg_variable(geometry_column.name),
fields=RawDangerous(_fields),
xmin=bbox.left,
ymin=bbox.bottom,
xmax=bbox.right,
ymax=bbox.top,
geometry_srid=funcs.cast(geometry_srid, "int"),
tms_proj=tms_proj,
tms_srid=tms_srid,
seg_size=segSize,
tile_resolution=int(resolution),
tile_buffer=int(buffer),
limit=limit,
)
return await conn.fetchval(q, *p)
class Function(Layer):
"""Function Reader.
Attributes:
id (str): Layer's name.
bounds (list): Layer's bounds (left, bottom, right, top).
minzoom (int): Layer's min zoom level.
maxzoom (int): Layer's max zoom level.
tileurl (str, optional): Layer's tiles url.
type (str): Layer's type.
function_name (str): Nane of the SQL function to call. Defaults to `id`.
sql (str): Valid SQL function which returns Tile data.
options (list, optional): options available for the SQL function.
"""
type: str = "Function"
sql: str
function_name: Optional[str]
options: Optional[List[Dict[str, Any]]] = None
@root_validator(pre=True)
def function_name_default(cls, values):
"""Define default function's name to be same as id."""
function_name = values.get("function_name")
if function_name is None:
values["function_name"] = values.get("id")
return values
@classmethod
def from_file(cls, id: str, infile: str, **kwargs: Any):
"""load sql from file"""
with open(infile) as f:
sql = f.read()
return cls(id=id, sql=sql, **kwargs)
async def get_tile(
self,
pool: asyncpg.BuildPgPool,
tile: morecantile.Tile,
tms: morecantile.TileMatrixSet,
**kwargs: Any,
):
"""Get Tile Data."""
# We only support TMS with valid EPSG code
if not tms.crs.to_epsg():
raise MissingEPSGCode(
f"{tms.identifier}'s CRS does not have a valid EPSG code."
)
bbox = tms.xy_bounds(tile)
async with pool.acquire() as conn:
transaction = conn.transaction()
await transaction.start()
# Register the custom function
await conn.execute(self.sql)
# Build the query
sql_query = clauses.Select(
Func(
self.function_name,
":xmin",
":ymin",
":xmax",
":ymax",
":epsg",
":query_params::text::json",
),
)
q, p = render(
str(sql_query),
xmin=bbox.left,
ymin=bbox.bottom,
xmax=bbox.right,
ymax=bbox.top,
epsg=tms.crs.to_epsg(),
query_params=json.dumps(kwargs),
)
# execute the query
content = await conn.fetchval(q, *p)
# rollback
await transaction.rollback()
return content
@dataclass
class FunctionRegistry:
"""function registry"""
funcs: ClassVar[Dict[str, Function]] = {}
@classmethod
def get(cls, key: str):
"""lookup function by name"""
return cls.funcs.get(key)
@classmethod
def register(cls, *args: Function):
"""register function(s)"""
for func in args:
cls.funcs[func.id] = func
@classmethod
def values(cls):
"""get all values."""
return cls.funcs.values()