Skip to content

Commit 67b2e32

Browse files
amotlmatriv
andcommitted
Add table_kwargs context manager to make pandas/Dask support dialect
Unlock SQLAlchemy ORM's `__table_args__` on the pandas/Dask `to_sql()` interface, in order to support CrateDB's special SQL DDL options. Co-authored-by: Marios Trivyzas <[email protected]>
1 parent 1549fe6 commit 67b2e32

File tree

5 files changed

+157
-1
lines changed

5 files changed

+157
-1
lines changed

CHANGES.md

+2
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@
99
- Added re-usable patches and polyfills from application adapters.
1010
New utilities: `patch_autoincrement_timestamp`, `refresh_after_dml`,
1111
`check_uniqueness_factory`
12+
- Added `table_kwargs` context manager to enable pandas/Dask to support
13+
CrateDB dialect table options.
1214

1315
## 2024/06/13 0.37.0
1416
- Added support for CrateDB's [FLOAT_VECTOR] data type and its accompanying

docs/support.md

+30
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,36 @@ df.to_sql(
5454
)
5555
```
5656

57+
58+
(support-table-kwargs)=
59+
## Context Manager `table_kwargs`
60+
61+
:::{rubric} Background
62+
:::
63+
CrateDB's special SQL DDL options to support [](inv:crate-reference#partitioned-tables),
64+
[](inv:crate-reference#ddl-sharding), or [](inv:crate-reference#ddl-replication)
65+
sometimes can't be configured easily when SQLAlchemy is wrapped into a 3rd-party
66+
framework like pandas or Dask.
67+
68+
:::{rubric} Utility
69+
:::
70+
The `table_kwargs` utility is a context manager that is able to forward CrateDB's
71+
dialect-specific table creation options to the `sa.Table()` constructor call sites
72+
at runtime.
73+
74+
:::{rubric} Synopsis
75+
:::
76+
Using a context manager incantation like outlined below will render a
77+
`PARTITIONED BY ("time")` SQL clause, without touching the call site of
78+
`sa.Table(...)`.
79+
```python
80+
from sqlalchemy_cratedb.support import table_kwargs
81+
82+
with table_kwargs(crate_partitioned_by="time"):
83+
return df.to_sql(...)
84+
```
85+
86+
5787
(support-autoincrement)=
5888
## Synthetic Autoincrement using Timestamps
5989

src/sqlalchemy_cratedb/support/__init__.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from sqlalchemy_cratedb.support.pandas import insert_bulk
1+
from sqlalchemy_cratedb.support.pandas import insert_bulk, table_kwargs
22
from sqlalchemy_cratedb.support.polyfill import check_uniqueness_factory, refresh_after_dml, \
33
patch_autoincrement_timestamp
44
from sqlalchemy_cratedb.support.util import refresh_table, refresh_dirty
@@ -10,4 +10,5 @@
1010
refresh_after_dml,
1111
refresh_dirty,
1212
refresh_table,
13+
table_kwargs,
1314
]

src/sqlalchemy_cratedb/support/pandas.py

+49
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,15 @@
1818
# However, if you have executed another commercial license agreement
1919
# with Crate these terms will supersede the license and you may use the
2020
# software solely pursuant to the terms of the relevant commercial agreement.
21+
from contextlib import contextmanager
22+
from typing import Any
23+
from unittest.mock import patch
24+
2125
import logging
2226

27+
import sqlalchemy as sa
28+
29+
from sqlalchemy_cratedb import SA_VERSION, SA_2_0
2330

2431
logger = logging.getLogger(__name__)
2532

@@ -60,3 +67,45 @@ def insert_bulk(pd_table, conn, keys, data_iter):
6067
cursor = conn._dbapi_connection.cursor()
6168
cursor.execute(sql=sql, bulk_parameters=data)
6269
cursor.close()
70+
71+
72+
@contextmanager
73+
def table_kwargs(**kwargs):
74+
"""
75+
Context manager for adding SQLAlchemy dialect-specific table options at runtime.
76+
77+
In certain cases where SQLAlchemy orchestration is implemented within a
78+
framework, like at this spot [1] in pandas' `SQLTable._create_table_setup`,
79+
it is not easily possible to forward SQLAlchemy dialect options at table
80+
creation time.
81+
82+
In order to augment the SQL DDL statement to make it honor database-specific
83+
dialect options, the only way to work around the unfortunate situation is by
84+
monkey-patching the call to `sa.Table()` at runtime, relaying additional
85+
dialect options through corresponding keyword arguments in their original
86+
`<dialect>_<kwarg>` format [2].
87+
88+
[1] https://github.com/pandas-dev/pandas/blob/v2.2.2/pandas/io/sql.py#L1282-L1285
89+
[2] https://docs.sqlalchemy.org/en/20/core/foundation.html#sqlalchemy.sql.base.DialectKWArgs.dialect_kwargs
90+
"""
91+
92+
if SA_VERSION < SA_2_0:
93+
_init_dist = sa.sql.schema.Table._init
94+
95+
def _init(self, name, metadata, *args, **kwargs_effective):
96+
kwargs_effective.update(kwargs)
97+
return _init_dist(self, name, metadata, *args, **kwargs_effective)
98+
99+
with patch("sqlalchemy.sql.schema.Table._init", _init):
100+
yield
101+
102+
else:
103+
new_dist = sa.sql.schema.Table._new
104+
105+
def _new(cls, *args: Any, **kw: Any) -> Any:
106+
kw.update(kwargs)
107+
table = new_dist(cls, *args, **kw)
108+
return table
109+
110+
with patch("sqlalchemy.sql.schema.Table._new", _new):
111+
yield

tests/test_support_pandas.py

+74
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
import re
2+
import sys
3+
4+
import pytest
5+
import sqlalchemy as sa
6+
from sqlalchemy.exc import ProgrammingError
7+
from sqlalchemy.orm import sessionmaker
8+
9+
from pueblo.testing.pandas import makeTimeDataFrame
10+
11+
from sqlalchemy_cratedb import SA_VERSION, SA_2_0
12+
from sqlalchemy_cratedb.support.pandas import table_kwargs
13+
14+
TABLE_NAME = "foobar"
15+
INSERT_RECORDS = 42
16+
17+
# Create dataframe, to be used as input data.
18+
df = makeTimeDataFrame(nper=INSERT_RECORDS, freq="S")
19+
df["time"] = df.index
20+
21+
22+
@pytest.mark.skipif(sys.version_info < (3, 8), reason="Feature not supported on Python 3.7 and earlier")
23+
@pytest.mark.skipif(SA_VERSION < SA_2_0, reason="Feature not supported on SQLAlchemy 1.4 and earlier")
24+
def test_table_kwargs_partitioned_by(cratedb_service):
25+
"""
26+
Validate adding CrateDB dialect table option `PARTITIONED BY` at runtime.
27+
"""
28+
29+
engine = cratedb_service.database.engine
30+
session = sessionmaker(bind=engine)()
31+
32+
# Insert records from pandas dataframe.
33+
with table_kwargs(crate_partitioned_by="time"):
34+
df.to_sql(
35+
TABLE_NAME,
36+
engine,
37+
if_exists="replace",
38+
index=False,
39+
)
40+
41+
# Synchronize writes.
42+
cratedb_service.database.refresh_table(TABLE_NAME)
43+
44+
# Inquire table cardinality.
45+
metadata = sa.MetaData()
46+
query = sa.select(sa.func.count()).select_from(sa.Table(TABLE_NAME, metadata))
47+
results = session.execute(query)
48+
count = results.scalar()
49+
50+
# Compare outcome.
51+
assert count == INSERT_RECORDS
52+
53+
# Validate SQL DDL.
54+
ddl = cratedb_service.database.run_sql(f"SHOW CREATE TABLE {TABLE_NAME}")
55+
assert 'PARTITIONED BY ("time")' in ddl[0][0]
56+
57+
58+
@pytest.mark.skipif(sys.version_info < (3, 8), reason="Feature not supported on Python 3.7 and earlier")
59+
@pytest.mark.skipif(SA_VERSION < SA_2_0, reason="Feature not supported on SQLAlchemy 1.4 and earlier")
60+
def test_table_kwargs_unknown(cratedb_service):
61+
"""
62+
Validate behaviour when adding an unknown CrateDB dialect table option.
63+
"""
64+
engine = cratedb_service.database.engine
65+
with table_kwargs(crate_unknown_option="'bazqux'"):
66+
with pytest.raises(ProgrammingError) as ex:
67+
df.to_sql(
68+
TABLE_NAME,
69+
engine,
70+
if_exists="replace",
71+
index=False,
72+
)
73+
assert ex.match(re.escape('SQLParseException[Invalid property "unknown_option" '
74+
'passed to [ALTER | CREATE] TABLE statement]'))

0 commit comments

Comments
 (0)