Skip to content

Commit 387b19f

Browse files
committed
1 parent 795e93f commit 387b19f

File tree

2 files changed

+175
-0
lines changed

2 files changed

+175
-0
lines changed

CHANGES.txt

+3
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,9 @@ Unreleased
99
dialects, introducing the ``crate+psycopg://``, ``crate+asyncpg://``, and
1010
``crate+urllib3://`` dialect identifiers. The asynchronous variant of ``psycopg``
1111
is also supported.
12+
- SQLAlchemy: Add example demonstrating asynchronous streaming mode, using server-side
13+
cursors
14+
1215

1316
2023/03/02 0.30.1
1417
=================

examples/async_streaming.py

+172
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,172 @@
1+
"""
2+
About
3+
=====
4+
5+
Example program to demonstrate how to connect to CrateDB using its SQLAlchemy
6+
dialect, and exercise a few basic examples using the low-level table API, this
7+
time in asynchronous mode.
8+
9+
Specific to the asynchronous mode of SQLAlchemy is the streaming of results:
10+
11+
> The `AsyncConnection` also features a "streaming" API via the `AsyncConnection.stream()`
12+
> method that returns an `AsyncResult` object. This result object uses a server-side cursor
13+
> and provides an async/await API, such as an async iterator.
14+
>
15+
> -- https://docs.sqlalchemy.org/en/20/orm/extensions/asyncio.html#synopsis-core
16+
17+
Both the PostgreSQL drivers based on `psycopg` and `asyncpg` are exercised.
18+
The corresponding SQLAlchemy dialect identifiers are::
19+
20+
# PostgreSQL protocol on port 5432, using `psycopg`
21+
crate+psycopg://crate@localhost:5432/doc
22+
23+
# PostgreSQL protocol on port 5432, using `asyncpg`
24+
crate+asyncpg://crate@localhost:5432/doc
25+
26+
Synopsis
27+
========
28+
::
29+
30+
# Run CrateDB
31+
docker run --rm -it --publish=4200:4200 --publish=5432:5432 crate
32+
33+
# Use PostgreSQL protocol, with asynchronous support of `psycopg`
34+
python examples/async_streaming.py psycopg
35+
36+
# Use PostgreSQL protocol, with `asyncpg`
37+
python examples/async_streaming.py asyncpg
38+
39+
# Use with both variants
40+
python examples/async_streaming.py psycopg asyncpg
41+
42+
Bugs
43+
====
44+
45+
When using the `psycopg` driver, the program currently croaks like::
46+
47+
sqlalchemy.exc.InternalError: (psycopg.errors.InternalError_) Cannot find portal: c_10479c0a0_1
48+
49+
"""
50+
import asyncio
51+
import sys
52+
import typing as t
53+
from functools import lru_cache
54+
55+
import sqlalchemy as sa
56+
from sqlalchemy.ext.asyncio import create_async_engine
57+
58+
metadata = sa.MetaData()
59+
table = sa.Table(
60+
"t1",
61+
metadata,
62+
sa.Column("id", sa.Integer, primary_key=True, autoincrement=False),
63+
sa.Column("name", sa.String),
64+
)
65+
66+
67+
class AsynchronousTableStreamingExample:
68+
"""
69+
Demonstrate reading streamed results when using the CrateDB SQLAlchemy
70+
dialect in asynchronous mode with the `psycopg` and `asyncpg` drivers.
71+
72+
- https://docs.sqlalchemy.org/en/20/orm/extensions/asyncio.html#synopsis-core
73+
- https://docs.sqlalchemy.org/en/20/_modules/examples/asyncio/basic.html
74+
"""
75+
76+
def __init__(self, dsn: str):
77+
self.dsn = dsn
78+
79+
@property
80+
@lru_cache
81+
def engine(self):
82+
"""
83+
Provide an SQLAlchemy engine object.
84+
"""
85+
return create_async_engine(self.dsn, echo=True)
86+
87+
async def run(self):
88+
"""
89+
Run the whole recipe.
90+
"""
91+
await self.create_and_insert()
92+
await self.read_buffered()
93+
await self.read_streaming()
94+
95+
async def create_and_insert(self):
96+
"""
97+
Create table schema, completely dropping it upfront, and insert a few records.
98+
"""
99+
# conn is an instance of AsyncConnection
100+
async with self.engine.begin() as conn:
101+
# to support SQLAlchemy DDL methods as well as legacy functions, the
102+
# AsyncConnection.run_sync() awaitable method will pass a "sync"
103+
# version of the AsyncConnection object to any synchronous method,
104+
# where synchronous IO calls will be transparently translated for
105+
# await.
106+
await conn.run_sync(metadata.drop_all, checkfirst=True)
107+
await conn.run_sync(metadata.create_all)
108+
109+
# for normal statement execution, a traditional "await execute()"
110+
# pattern is used.
111+
await conn.execute(
112+
table.insert(),
113+
[{"id": 1, "name": "some name 1"}, {"id": 2, "name": "some name 2"}],
114+
)
115+
116+
# CrateDB specifics to flush/synchronize the write operation.
117+
await conn.execute(sa.text("REFRESH TABLE t1;"))
118+
119+
async def read_buffered(self):
120+
"""
121+
Read data from the database, in buffered mode.
122+
"""
123+
async with self.engine.connect() as conn:
124+
# the default result object is the
125+
# sqlalchemy.engine.Result object
126+
result = await conn.execute(table.select())
127+
128+
# the results are buffered so no await call is necessary
129+
# for this case.
130+
print(result.fetchall())
131+
132+
async def read_streaming(self):
133+
"""
134+
Read data from the database, in streaming mode.
135+
"""
136+
async with self.engine.connect() as conn:
137+
138+
# for a streaming result that buffers only segments of the
139+
# result at time, the AsyncConnection.stream() method is used.
140+
# this returns a sqlalchemy.ext.asyncio.AsyncResult object.
141+
async_result = await conn.stream(table.select())
142+
143+
# this object supports async iteration and awaitable
144+
# versions of methods like .all(), fetchmany(), etc.
145+
async for row in async_result:
146+
print(row)
147+
148+
149+
async def run_example(dsn: str):
150+
example = AsynchronousTableStreamingExample(dsn)
151+
152+
# Run a basic conversation.
153+
# It also includes a catalog inquiry at `table.drop(checkfirst=True)`.
154+
await example.run()
155+
156+
157+
def run_drivers(drivers: t.List[str]):
158+
for driver in drivers:
159+
if driver == "psycopg":
160+
dsn = "crate+psycopg://crate@localhost:5432/doc"
161+
elif driver == "asyncpg":
162+
dsn = "crate+asyncpg://crate@localhost:5432/doc"
163+
else:
164+
raise ValueError(f"Unknown driver: {driver}")
165+
166+
asyncio.run(run_example(dsn))
167+
168+
169+
if __name__ == "__main__":
170+
171+
drivers = sys.argv[1:]
172+
run_drivers(drivers)

0 commit comments

Comments
 (0)