Skip to content

Commit 795e93f

Browse files
committed
SA20: Add compatibility adapter for psycopg and asyncpg dialects
It introduces the `crate+psycopg://`, `crate+asyncpg://`, and `crate+urllib3://` dialect identifiers. The asynchronous variant of `psycopg` is also supported.
1 parent 7d6d6cc commit 795e93f

File tree

6 files changed

+505
-6
lines changed

6 files changed

+505
-6
lines changed

CHANGES.txt

+4
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,10 @@ Changes for crate
55
Unreleased
66
==========
77

8+
- SQLAlchemy: Add compatibility adapter for SQLAlchemy's ``psycopg`` and ``asyncpg``
9+
dialects, introducing the ``crate+psycopg://``, ``crate+asyncpg://``, and
10+
``crate+urllib3://`` dialect identifiers. The asynchronous variant of ``psycopg``
11+
is also supported.
812

913
2023/03/02 0.30.1
1014
=================

examples/async_table.py

+191
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,191 @@
1+
"""
2+
About
3+
=====
4+
5+
Example program to demonstrate how to connect to CrateDB using its SQLAlchemy
6+
dialect, and exercise a few basic examples using the low-level table API, this
7+
time in asynchronous mode.
8+
9+
Both the PostgreSQL drivers based on `psycopg` and `asyncpg` are exercised.
10+
The corresponding SQLAlchemy dialect identifiers are::
11+
12+
# PostgreSQL protocol on port 5432, using `psycopg`
13+
crate+psycopg://crate@localhost:5432/doc
14+
15+
# PostgreSQL protocol on port 5432, using `asyncpg`
16+
crate+asyncpg://crate@localhost:5432/doc
17+
18+
Synopsis
19+
========
20+
::
21+
22+
# Run CrateDB
23+
docker run --rm -it --publish=4200:4200 --publish=5432:5432 crate
24+
25+
# Use PostgreSQL protocol, with asynchronous support of `psycopg`
26+
python examples/async_table.py psycopg
27+
28+
# Use PostgreSQL protocol, with `asyncpg`
29+
python examples/async_table.py asyncpg
30+
31+
# Use with both variants
32+
python examples/async_table.py psycopg asyncpg
33+
34+
"""
35+
import asyncio
36+
import sys
37+
import typing as t
38+
from functools import lru_cache
39+
40+
import sqlalchemy as sa
41+
from sqlalchemy.ext.asyncio import create_async_engine
42+
43+
44+
class AsynchronousTableExample:
45+
"""
46+
Demonstrate the CrateDB SQLAlchemy dialect in asynchronous mode with the `psycopg` and `asyncpg` drivers.
47+
"""
48+
49+
def __init__(self, dsn: str):
50+
self.dsn = dsn
51+
52+
@property
53+
@lru_cache
54+
def engine(self):
55+
"""
56+
Provide an SQLAlchemy engine object.
57+
"""
58+
return create_async_engine(self.dsn, isolation_level="AUTOCOMMIT", echo=True)
59+
60+
@property
61+
@lru_cache
62+
def table(self):
63+
"""
64+
Provide an SQLAlchemy table object.
65+
"""
66+
metadata = sa.MetaData()
67+
return sa.Table(
68+
"testdrive",
69+
metadata,
70+
sa.Column("x", sa.Integer, primary_key=True, autoincrement=False),
71+
sa.Column("y", sa.Integer),
72+
)
73+
74+
async def conn_run_sync(self, func: t.Callable, *args, **kwargs):
75+
"""
76+
To support SQLAlchemy DDL methods as well as legacy functions, the
77+
AsyncConnection.run_sync() awaitable method will pass a "sync"
78+
version of the AsyncConnection object to any synchronous method,
79+
where synchronous IO calls will be transparently translated for
80+
await.
81+
82+
https://docs.sqlalchemy.org/en/20/_modules/examples/asyncio/basic.html
83+
"""
84+
# `conn` is an instance of `AsyncConnection`
85+
async with self.engine.begin() as conn:
86+
return await conn.run_sync(func, *args, **kwargs)
87+
88+
async def run(self):
89+
"""
90+
Run the whole recipe, returning the result from the "read" step.
91+
"""
92+
await self.create()
93+
await self.insert(sync=True)
94+
return await self.read()
95+
96+
async def create(self):
97+
"""
98+
Create table schema, completely dropping it upfront.
99+
"""
100+
await self.conn_run_sync(self.table.drop, checkfirst=True)
101+
await self.conn_run_sync(self.table.create)
102+
103+
async def insert(self, sync: bool = False):
104+
"""
105+
Write data from the database, taking CrateDB-specific `REFRESH TABLE` into account.
106+
"""
107+
async with self.engine.begin() as conn:
108+
stmt = self.table.insert().values(x=1, y=42)
109+
await conn.execute(stmt)
110+
stmt = self.table.insert().values(x=2, y=42)
111+
await conn.execute(stmt)
112+
if sync and self.dsn.startswith("crate"):
113+
await conn.execute(sa.text("REFRESH TABLE testdrive;"))
114+
115+
async def read(self):
116+
"""
117+
Read data from the database.
118+
"""
119+
async with self.engine.begin() as conn:
120+
cursor = await conn.execute(sa.text("SELECT * FROM testdrive;"))
121+
return cursor.fetchall()
122+
123+
async def reflect(self):
124+
"""
125+
Reflect the table schema from the database.
126+
"""
127+
128+
# Debugging.
129+
# self.trace()
130+
131+
def reflect(session):
132+
"""
133+
A function written in "synchronous" style that will be invoked
134+
within the asyncio event loop.
135+
136+
The session object passed is a traditional orm.Session object with
137+
synchronous interface.
138+
139+
https://docs.sqlalchemy.org/en/20/_modules/examples/asyncio/greenlet_orm.html
140+
"""
141+
meta = sa.MetaData()
142+
reflected_table = sa.Table("testdrive", meta, autoload_with=session)
143+
print("Table information:")
144+
print(f"Table: {reflected_table}")
145+
print(f"Columns: {reflected_table.columns}")
146+
print(f"Constraints: {reflected_table.constraints}")
147+
print(f"Primary key: {reflected_table.primary_key}")
148+
149+
return await self.conn_run_sync(reflect)
150+
151+
@staticmethod
152+
def trace():
153+
"""
154+
Trace execution flow through SQLAlchemy.
155+
156+
pip install hunter
157+
"""
158+
from hunter import Q, trace
159+
160+
constraint = Q(module_startswith="sqlalchemy")
161+
trace(constraint)
162+
163+
164+
async def run_example(dsn: str):
165+
example = AsynchronousTableExample(dsn)
166+
167+
# Run a basic conversation.
168+
# It also includes a catalog inquiry at `table.drop(checkfirst=True)`.
169+
result = await example.run()
170+
print(result)
171+
172+
# Reflect the table schema.
173+
await example.reflect()
174+
175+
176+
def run_drivers(drivers: t.List[str]):
177+
for driver in drivers:
178+
if driver == "psycopg":
179+
dsn = "crate+psycopg://crate@localhost:5432/doc"
180+
elif driver == "asyncpg":
181+
dsn = "crate+asyncpg://crate@localhost:5432/doc"
182+
else:
183+
raise ValueError(f"Unknown driver: {driver}")
184+
185+
asyncio.run(run_example(dsn))
186+
187+
188+
if __name__ == "__main__":
189+
190+
drivers = sys.argv[1:]
191+
run_drivers(drivers)

examples/sync_table.py

+163
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,163 @@
1+
"""
2+
About
3+
=====
4+
5+
Example program to demonstrate how to connect to CrateDB using its SQLAlchemy
6+
dialect, and exercise a few basic examples using the low-level table API.
7+
8+
Both the HTTP driver based on `urllib3`, and the PostgreSQL driver based on
9+
`psycopg` are exercised. The corresponding SQLAlchemy dialect identifiers are::
10+
11+
# CrateDB HTTP API on port 4200
12+
crate+urllib3://localhost:4200/doc
13+
14+
# PostgreSQL protocol on port 5432
15+
crate+psycopg://crate@localhost:5432/doc
16+
17+
Synopsis
18+
========
19+
::
20+
21+
# Run CrateDB
22+
docker run --rm -it --publish=4200:4200 --publish=5432:5432 crate
23+
24+
# Use HTTP API
25+
python examples/sync_table.py urllib3
26+
27+
# Use PostgreSQL protocol
28+
python examples/sync_table.py psycopg
29+
30+
# Use with both variants
31+
python examples/sync_table.py urllib3 psycopg
32+
33+
"""
34+
import sys
35+
import typing as t
36+
from functools import lru_cache
37+
38+
import sqlalchemy as sa
39+
40+
41+
class SynchronousTableExample:
42+
"""
43+
Demonstrate the CrateDB SQLAlchemy dialect with the `urllib3` and `psycopg` drivers.
44+
"""
45+
46+
def __init__(self, dsn: str):
47+
self.dsn = dsn
48+
49+
@property
50+
@lru_cache
51+
def engine(self):
52+
"""
53+
Provide an SQLAlchemy engine object.
54+
"""
55+
return sa.create_engine(self.dsn, isolation_level="AUTOCOMMIT", echo=True)
56+
57+
@property
58+
@lru_cache
59+
def table(self):
60+
"""
61+
Provide an SQLAlchemy table object.
62+
"""
63+
metadata = sa.MetaData()
64+
return sa.Table(
65+
"testdrive",
66+
metadata,
67+
# TODO: When omitting `autoincrement`, SA's DDL generator will use `SERIAL`.
68+
# (psycopg.errors.InternalError_) Cannot find data type: serial
69+
# This is probably one more thing to redirect to the CrateDialect.
70+
sa.Column("x", sa.Integer, primary_key=True, autoincrement=False),
71+
sa.Column("y", sa.Integer),
72+
)
73+
74+
def run(self):
75+
"""
76+
Run the whole recipe, returning the result from the "read" step.
77+
"""
78+
self.create()
79+
self.insert(sync=True)
80+
return self.read()
81+
82+
def create(self):
83+
"""
84+
Create table schema, completely dropping it upfront.
85+
"""
86+
self.table.drop(bind=self.engine, checkfirst=True)
87+
self.table.create(bind=self.engine)
88+
89+
def insert(self, sync: bool = False):
90+
"""
91+
Write data from the database, taking CrateDB-specific `REFRESH TABLE` into account.
92+
"""
93+
with self.engine.begin() as session:
94+
stmt = self.table.insert().values(x=1, y=42)
95+
session.execute(stmt)
96+
stmt = self.table.insert().values(x=2, y=42)
97+
session.execute(stmt)
98+
if sync and self.dsn.startswith("crate"):
99+
session.execute(sa.text("REFRESH TABLE testdrive;"))
100+
101+
def read(self):
102+
"""
103+
Read data from the database.
104+
"""
105+
with self.engine.begin() as session:
106+
cursor = session.execute(sa.text("SELECT * FROM testdrive;"))
107+
return cursor.fetchall()
108+
109+
def reflect(self):
110+
"""
111+
Reflect the table schema from the database.
112+
"""
113+
meta = sa.MetaData()
114+
# Debugging.
115+
# self.trace()
116+
reflected_table = sa.Table("testdrive", meta, autoload_with=self.engine)
117+
print("Table information:")
118+
print(f"Table: {reflected_table}")
119+
print(f"Columns: {reflected_table.columns}")
120+
print(f"Constraints: {reflected_table.constraints}")
121+
print(f"Primary key: {reflected_table.primary_key}")
122+
123+
@staticmethod
124+
def trace():
125+
"""
126+
Trace execution flow through SQLAlchemy.
127+
128+
pip install hunter
129+
"""
130+
from hunter import Q, trace
131+
132+
constraint = Q(module_startswith="sqlalchemy")
133+
trace(constraint)
134+
135+
136+
def run_example(dsn: str):
137+
example = SynchronousTableExample(dsn)
138+
139+
# Run a basic conversation.
140+
# It also includes a catalog inquiry at `table.drop(checkfirst=True)`.
141+
result = example.run()
142+
print(result)
143+
144+
# Reflect the table schema.
145+
# example.reflect()
146+
147+
148+
def run_drivers(drivers: t.List[str]):
149+
for driver in drivers:
150+
if driver == "urllib3":
151+
dsn = "crate+urllib3://localhost:4200/doc"
152+
elif driver == "psycopg":
153+
dsn = "crate+psycopg://crate@localhost:5432/doc"
154+
else:
155+
raise ValueError(f"Unknown driver: {driver}")
156+
157+
run_example(dsn)
158+
159+
160+
if __name__ == "__main__":
161+
162+
drivers = sys.argv[1:]
163+
run_drivers(drivers)

setup.py

+6-1
Original file line numberDiff line numberDiff line change
@@ -55,14 +55,19 @@ def read(path):
5555
namespace_packages=['crate'],
5656
entry_points={
5757
'sqlalchemy.dialects': [
58-
'crate = crate.client.sqlalchemy:CrateDialect'
58+
'crate = crate.client.sqlalchemy:CrateDialect',
59+
'crate.urllib3 = crate.client.sqlalchemy.dialect_more:dialect_urllib3',
60+
'crate.psycopg = crate.client.sqlalchemy.dialect_more:dialect_psycopg',
61+
'crate.psycopg_async = crate.client.sqlalchemy.dialect_more:dialect_psycopg_async',
62+
'crate.asyncpg = crate.client.sqlalchemy.dialect_more:dialect_asyncpg',
5963
]
6064
},
6165
install_requires=['urllib3>=1.9,<2'],
6266
extras_require=dict(
6367
sqlalchemy=['sqlalchemy>=1.0,<2.1',
6468
'geojson>=2.5.0,<4',
6569
'backports.zoneinfo<1; python_version<"3.9"'],
70+
postgresql=['sqlalchemy-postgresql-relaxed'],
6671
test=['tox>=3,<5',
6772
'zope.testing>=4,<6',
6873
'zope.testrunner>=5,<6',

0 commit comments

Comments
 (0)