Skip to content

Commit 10e8728

Browse files
committed
Python/SQLAlchemy: Demonstrate support for asyncpg and psycopg
The `sqlalchemy-cratedb` package supports the vanilla HTTP-based transport using urllib3, and the standard PostgreSQL drivers `asyncpg` and `psycopg`.
1 parent 3f15707 commit 10e8728

File tree

6 files changed

+563
-7
lines changed

6 files changed

+563
-7
lines changed

by-language/python-sqlalchemy/README.rst

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -79,9 +79,7 @@ Navigate to example program directory, and install prerequisites::
7979
Examples
8080
********
8181

82-
Run example programs::
83-
84-
# Connect to CrateDB on localhost.
82+
The ``insert`` example programs are about efficient data loading::
8583

8684
time python insert_efficient.py cratedb multirow
8785
time python insert_efficient.py cratedb batched
@@ -92,15 +90,30 @@ Run example programs::
9290

9391
time python insert_dask.py
9492

95-
Use ``insert_pandas.py`` to connect to any other database instance::
93+
The ``sync`` and ``async`` example programs demonstrate SQLAlchemy's
94+
low-level table/core API using both the HTTP-based transport driver
95+
using ``urllib3``, as well as the canonical ``asyncpg`` and ``psycopg3``
96+
drivers using the PostgreSQL wire protocol::
97+
98+
time python sync_table.py urllib3 psycopg
99+
time python async_table.py asyncpg psycopg
100+
time python async_streaming.py asyncpg psycopg
101+
102+
Connect to CrateDB Cloud
103+
========================
104+
105+
By default, the example programs will connect to CrateDB on ``localhost``.
106+
In order to connect to any other database instance, for example on CrateDB
107+
Cloud::
96108

97109
export DBURI="crate://crate@localhost:4200/"
98110
export DBURI="crate://admin:<PASSWORD>@example.aks1.westeurope.azure.cratedb.net:4200?ssl=true"
99111
time python insert_pandas.py --dburi="${DBURI}"
100112

101113
.. TIP::
102114

103-
For more information, please refer to the header sections of each of the provided example programs.
115+
For more information, please refer to the header sections of each of the
116+
provided example programs.
104117

105118

106119
*****
Lines changed: 168 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,168 @@
1+
"""
2+
About
3+
=====
4+
5+
Example program to demonstrate how to connect to CrateDB using its SQLAlchemy
6+
dialect, and exercise a few basic examples using the low-level table API, in
7+
asynchronous mode.
8+
9+
Specific to the asynchronous mode of SQLAlchemy is the streaming of results:
10+
11+
> The `AsyncConnection` also features a "streaming" API via the `AsyncConnection.stream()`
12+
> method that returns an `AsyncResult` object. This result object uses a server-side cursor
13+
> and provides an async/await API, such as an async iterator.
14+
>
15+
> -- https://docs.sqlalchemy.org/en/20/orm/extensions/asyncio.html#synopsis-core
16+
17+
Both the PostgreSQL drivers `asyncpg` and `psycopg` can be used.
18+
The corresponding SQLAlchemy dialect identifiers are::
19+
20+
# PostgreSQL protocol on port 5432, using `asyncpg`
21+
crate+asyncpg://crate@localhost:5432/doc
22+
23+
# PostgreSQL protocol on port 5432, using `psycopg`
24+
crate+psycopg://crate@localhost:5432/doc
25+
26+
27+
Synopsis
28+
========
29+
::
30+
31+
# Run CrateDB
32+
docker run --rm -it --publish=4200:4200 --publish=5432:5432 crate
33+
34+
# Use PostgreSQL protocol, with `asyncpg`
35+
python async_streaming.py asyncpg
36+
37+
# Use PostgreSQL protocol, with asynchronous support of `psycopg`
38+
python async_streaming.py psycopg
39+
40+
# Use with both variants
41+
python async_streaming.py asyncpg psycopg
42+
43+
"""
44+
import asyncio
45+
import sys
46+
import typing as t
47+
from functools import lru_cache
48+
49+
import sqlalchemy as sa
50+
from sqlalchemy.ext.asyncio import create_async_engine
51+
52+
metadata = sa.MetaData()
53+
table = sa.Table(
54+
"t1",
55+
metadata,
56+
sa.Column("id", sa.Integer, primary_key=True, autoincrement=False),
57+
sa.Column("name", sa.String),
58+
)
59+
60+
61+
class AsynchronousTableStreamingExample:
62+
"""
63+
Demonstrate reading streamed results when using the CrateDB SQLAlchemy
64+
dialect in asynchronous mode with the `psycopg` and `asyncpg` drivers.
65+
66+
- https://docs.sqlalchemy.org/en/20/orm/extensions/asyncio.html#synopsis-core
67+
- https://docs.sqlalchemy.org/en/20/_modules/asyncio/basic.html
68+
"""
69+
70+
def __init__(self, dsn: str):
71+
self.dsn = dsn
72+
73+
@property
74+
@lru_cache
75+
def engine(self):
76+
"""
77+
Provide an SQLAlchemy engine object.
78+
"""
79+
return create_async_engine(self.dsn, echo=True)
80+
81+
async def run(self):
82+
"""
83+
Run the whole recipe.
84+
"""
85+
await self.create_and_insert()
86+
await self.read_buffered()
87+
await self.read_streaming()
88+
89+
async def create_and_insert(self):
90+
"""
91+
Create table schema, completely dropping it upfront, and insert a few records.
92+
"""
93+
# conn is an instance of AsyncConnection
94+
async with self.engine.begin() as conn:
95+
# to support SQLAlchemy DDL methods as well as legacy functions, the
96+
# AsyncConnection.run_sync() awaitable method will pass a "sync"
97+
# version of the AsyncConnection object to any synchronous method,
98+
# where synchronous IO calls will be transparently translated for
99+
# await.
100+
await conn.run_sync(metadata.drop_all, checkfirst=True)
101+
await conn.run_sync(metadata.create_all)
102+
103+
# for normal statement execution, a traditional "await execute()"
104+
# pattern is used.
105+
await conn.execute(
106+
table.insert(),
107+
[{"id": 1, "name": "some name 1"}, {"id": 2, "name": "some name 2"}],
108+
)
109+
110+
# CrateDB specifics to flush/synchronize the write operation.
111+
await conn.execute(sa.text("REFRESH TABLE t1;"))
112+
113+
async def read_buffered(self):
114+
"""
115+
Read data from the database, in buffered mode.
116+
"""
117+
async with self.engine.connect() as conn:
118+
# the default result object is the
119+
# sqlalchemy.engine.Result object
120+
result = await conn.execute(table.select())
121+
122+
# the results are buffered so no await call is necessary
123+
# for this case.
124+
print(result.fetchall())
125+
126+
async def read_streaming(self):
127+
"""
128+
Read data from the database, in streaming mode.
129+
"""
130+
async with self.engine.connect() as conn:
131+
132+
# for a streaming result that buffers only segments of the
133+
# result at time, the AsyncConnection.stream() method is used.
134+
# this returns a sqlalchemy.ext.asyncio.AsyncResult object.
135+
async_result = await conn.stream(table.select())
136+
137+
# this object supports async iteration and awaitable
138+
# versions of methods like .all(), fetchmany(), etc.
139+
async for row in async_result:
140+
print(row)
141+
142+
143+
async def run_example(dsn: str):
144+
example = AsynchronousTableStreamingExample(dsn)
145+
146+
# Run a basic conversation.
147+
# It also includes a catalog inquiry at `table.drop(checkfirst=True)`.
148+
await example.run()
149+
150+
151+
def run_drivers(drivers: t.List[str]):
152+
for driver in drivers:
153+
if driver == "asyncpg":
154+
dsn = "crate+asyncpg://crate@localhost:5432/doc"
155+
elif driver == "psycopg":
156+
dsn = "crate+psycopg://crate@localhost:5432/doc"
157+
else:
158+
raise ValueError(f"Unknown driver: {driver}")
159+
160+
asyncio.run(run_example(dsn))
161+
162+
163+
if __name__ == "__main__":
164+
165+
drivers = sys.argv[1:]
166+
if not drivers:
167+
raise ValueError("Please select driver")
168+
run_drivers(drivers)

0 commit comments

Comments
 (0)