Skip to content

Commit a1f6226

Browse files
committed
fix(clickhouse): complete virtual-catalog stripping across all overrides
Address reviewer feedback on PR #5826: - Add _strip_virtual_catalog helper to ClickhouseEngineAdapter; apply to delete_from, insert_overwrite_by_partition, and alter_table overrides that previously bypassed both the @set_catalog decorator and the create_schema manual strip — preventing __gateway__ prefix leaking into DELETE/INSERT OVERWRITE/ALTER TABLE SQL sent to ClickHouse - Deduplicate catalog-stripping logic in create_schema via the helper - Rename inject_virtual_catalog param catalog→gateway in base class and align docstring (adapter decides the final catalog name) - Reject virtual_catalog values containing '.' to prevent 4-level FQN - Add unit tests verifying SQL output is free of the virtual catalog prefix for each newly covered method - Add integration test exercising get_default_catalog_per_gateway + FQN uniformity + create_schema stripping end-to-end - Fix clickhouse.md trailing newline Signed-off-by: Michael Day <michael.day@cloudkitchens.com> Signed-off-by: mday-io <mdaytn@gmail.com>
1 parent 33facd8 commit a1f6226

6 files changed

Lines changed: 238 additions & 11 deletions

File tree

docs/integrations/engines/clickhouse.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -486,4 +486,4 @@ With this configuration, ClickHouse models will appear as `ch_virtual.mydb.mytab
486486
| `tls_mode` | Controls advanced TLS behavior. proxy and strict do not invoke ClickHouse mutual TLS connection, but do send client cert and key. mutual assumes ClickHouse mutual TLS auth with a client certificate. | string | N |
487487
| `connection_settings` | Additional [connection settings](https://clickhouse.com/docs/integrations/python#settings-argument) | dict | N |
488488
| `connection_pool_options` | Additional [options](https://clickhouse.com/docs/integrations/python#customizing-the-http-connection-pool) for the HTTP connection pool | dict | N |
489-
| `virtual_catalog` | Override the virtual catalog name used when ClickHouse runs alongside a catalog-aware gateway (e.g. Trino). Defaults to `__{gateway_name}__`. See [Multi-gateway setup](#multi-gateway-setup) for details. | string | N |
489+
| `virtual_catalog` | Override the virtual catalog name used when ClickHouse runs alongside a catalog-aware gateway (e.g. Trino). Defaults to `__{gateway_name}__`. See [Multi-gateway setup](#multi-gateway-setup) for details. | string | N |

sqlmesh/core/config/connection.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2127,6 +2127,10 @@ def validate_virtual_catalog(cls, v: t.Optional[str]) -> t.Optional[str]:
21272127
"virtual_catalog cannot be an empty string. "
21282128
"Omit the field to use the default synthetic prefix (__<gateway_name>__)."
21292129
)
2130+
if v is not None and "." in v:
2131+
raise ConfigError(
2132+
f"virtual_catalog must be a single identifier with no dots (got: {v!r})"
2133+
)
21302134
return v
21312135

21322136
@property

sqlmesh/core/engine_adapter/base.py

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -229,12 +229,14 @@ def supports_virtual_catalog(self) -> bool:
229229
"""
230230
return False
231231

232-
def inject_virtual_catalog(self, catalog: str) -> None:
233-
"""Inject a virtual catalog name for multi-gateway nesting alignment.
234-
235-
Only call this on adapters that return True from supports_virtual_catalog(). After
236-
injection, catalog_support should return SINGLE_CATALOG_ONLY so the set_catalog decorator
237-
strips the virtual catalog from DDL expressions instead of raising an error.
232+
def inject_virtual_catalog(self, gateway: str) -> None:
233+
"""Inject a gateway name to configure the adapter's virtual catalog.
234+
235+
The adapter determines the final catalog name from the gateway name (e.g. ClickHouse
236+
wraps it as __{gateway}__). Only call this on adapters that return True from
237+
supports_virtual_catalog(). After injection, catalog_support should return
238+
SINGLE_CATALOG_ONLY so the set_catalog decorator strips the virtual catalog from DDL
239+
expressions instead of raising an error.
238240
"""
239241
raise NotImplementedError(
240242
f"{self.dialect} does not support virtual catalog injection. "

sqlmesh/core/engine_adapter/clickhouse.py

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -208,8 +208,7 @@ def create_schema(
208208
f"clickhouse requires that all catalog operations be against a single catalog: "
209209
f"{self._default_catalog}. Provided catalog: {catalog_name}"
210210
)
211-
schema_exp.set("catalog", None)
212-
schema_name = schema_exp
211+
schema_name = self._strip_virtual_catalog(schema_exp)
213212

214213
# can't call super() because it will try to set a catalog
215214
return self._create_schema(
@@ -481,6 +480,7 @@ def insert_overwrite_by_partition(
481480
target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
482481
source_columns: t.Optional[t.List[str]] = None,
483482
) -> None:
483+
table_name = self._strip_virtual_catalog(table_name)
484484
source_queries, target_columns_to_types = self._get_source_queries_and_columns_to_types(
485485
query_or_df,
486486
target_columns_to_types,
@@ -595,6 +595,20 @@ def _create_table(
595595
target_columns_to_types or self.columns(table_name),
596596
)
597597

598+
def _strip_virtual_catalog(self, name: "TableName") -> exp.Table:
599+
"""Strip the virtual catalog prefix from a table name if present.
600+
601+
When a virtual catalog has been injected, ClickHouse table names carry a
602+
synthetic catalog prefix (e.g. ``__gw__``) so they match the 3-level FQN
603+
depth of catalog-aware peers. This helper removes that prefix before any
604+
SQL is sent to the wire, since ClickHouse only supports a two-level
605+
``[database].[table]`` naming scheme.
606+
"""
607+
table = exp.to_table(name)
608+
if self._default_catalog and table.catalog == self._default_catalog:
609+
table.set("catalog", None)
610+
return table
611+
598612
def _exchange_tables(
599613
self,
600614
old_table_name: TableName,
@@ -633,7 +647,7 @@ def _rename_table(
633647
self.execute(f"RENAME TABLE {old_table_sql} TO {new_table_sql}{self._on_cluster_sql()}")
634648

635649
def delete_from(self, table_name: TableName, where: t.Union[str, exp.Expr]) -> None:
636-
delete_expr = exp.delete(table_name, where)
650+
delete_expr = exp.delete(self._strip_virtual_catalog(table_name), where)
637651
if self.engine_run_mode.is_cluster:
638652
delete_expr.set("cluster", exp.OnCluster(this=exp.to_identifier(self.cluster)))
639653
self.execute(delete_expr)
@@ -649,6 +663,9 @@ def alter_table(
649663
for alter_expression in [
650664
x.expression if isinstance(x, TableAlterOperation) else x for x in alter_expressions
651665
]:
666+
if self._default_catalog and isinstance(alter_expression.this, exp.Table):
667+
if alter_expression.this.catalog == self._default_catalog:
668+
alter_expression.this.set("catalog", None)
652669
if self.engine_run_mode.is_cluster:
653670
alter_expression.set(
654671
"cluster", exp.OnCluster(this=exp.to_identifier(self.cluster))

tests/core/engine_adapter/test_clickhouse.py

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1490,3 +1490,88 @@ def test_clickhouse_connection_config_virtual_catalog_empty_string_rejected():
14901490

14911491
with pytest.raises(ConfigError, match="virtual_catalog cannot be an empty string"):
14921492
ClickhouseConnectionConfig(host="localhost", username="user", virtual_catalog=" ")
1493+
1494+
1495+
def test_virtual_catalog_stripped_in_delete_from(make_mocked_engine_adapter: t.Callable):
1496+
"""delete_from() must strip the virtual catalog prefix before building the DELETE expression."""
1497+
adapter = make_mocked_engine_adapter(ClickhouseEngineAdapter)
1498+
adapter.inject_virtual_catalog("ch_gw")
1499+
assert adapter._default_catalog == "__ch_gw__"
1500+
1501+
adapter.delete_from("__ch_gw__.mydb.my_table", "a = 1")
1502+
1503+
sql_calls = to_sql_calls(adapter)
1504+
assert len(sql_calls) == 1
1505+
assert "__ch_gw__" not in sql_calls[0]
1506+
assert "mydb" in sql_calls[0]
1507+
assert "my_table" in sql_calls[0]
1508+
assert "DELETE FROM" in sql_calls[0]
1509+
1510+
1511+
def test_virtual_catalog_stripped_in_insert_overwrite_by_partition(
1512+
make_mocked_engine_adapter: t.Callable, mocker: MockerFixture
1513+
):
1514+
"""insert_overwrite_by_partition() must strip the virtual catalog prefix before any SQL is sent."""
1515+
adapter = make_mocked_engine_adapter(ClickhouseEngineAdapter)
1516+
adapter.inject_virtual_catalog("ch_gw")
1517+
assert adapter._default_catalog == "__ch_gw__"
1518+
1519+
# Patch _insert_overwrite_by_condition so we can inspect how table_name was passed.
1520+
overwrite_mock = mocker.patch.object(adapter, "_insert_overwrite_by_condition")
1521+
# Also patch _get_source_queries_and_columns_to_types to avoid needing a real query.
1522+
source_query_mock = mocker.patch.object(
1523+
adapter,
1524+
"_get_source_queries_and_columns_to_types",
1525+
return_value=([], {}),
1526+
)
1527+
1528+
adapter.insert_overwrite_by_partition(
1529+
"__ch_gw__.mydb.my_table",
1530+
parse_one("SELECT 1 AS col"),
1531+
partitioned_by=[exp.column("ds")],
1532+
)
1533+
1534+
# The table_name passed to _insert_overwrite_by_condition must not contain the virtual catalog.
1535+
assert overwrite_mock.called
1536+
table_name_arg = overwrite_mock.call_args[0][0]
1537+
table_name_sql = (
1538+
table_name_arg.sql("clickhouse")
1539+
if isinstance(table_name_arg, exp.Expression)
1540+
else str(table_name_arg)
1541+
)
1542+
assert "__ch_gw__" not in table_name_sql
1543+
1544+
# The target_table passed to _get_source_queries_and_columns_to_types must also be stripped.
1545+
assert source_query_mock.called
1546+
target_table_kwarg = source_query_mock.call_args[1].get(
1547+
"target_table",
1548+
source_query_mock.call_args[0][2] if len(source_query_mock.call_args[0]) > 2 else None,
1549+
)
1550+
if target_table_kwarg is not None:
1551+
target_sql = (
1552+
target_table_kwarg.sql("clickhouse")
1553+
if isinstance(target_table_kwarg, exp.Expression)
1554+
else str(target_table_kwarg)
1555+
)
1556+
assert "__ch_gw__" not in target_sql
1557+
1558+
1559+
def test_virtual_catalog_stripped_in_alter_table(make_mocked_engine_adapter: t.Callable):
1560+
"""alter_table() must strip the virtual catalog prefix from each ALTER TABLE statement."""
1561+
adapter = make_mocked_engine_adapter(ClickhouseEngineAdapter)
1562+
adapter.inject_virtual_catalog("ch_gw")
1563+
assert adapter._default_catalog == "__ch_gw__"
1564+
1565+
alter_expr = exp.Alter(
1566+
this=exp.to_table("__ch_gw__.mydb.my_table"),
1567+
kind="TABLE",
1568+
actions=[exp.Drop(this=exp.to_column("col_a"), kind="COLUMN")],
1569+
)
1570+
adapter.alter_table([alter_expr])
1571+
1572+
sql_calls = to_sql_calls(adapter)
1573+
assert len(sql_calls) == 1
1574+
assert "__ch_gw__" not in sql_calls[0]
1575+
assert "mydb" in sql_calls[0]
1576+
assert "my_table" in sql_calls[0]
1577+
assert "ALTER TABLE" in sql_calls[0]

tests/core/test_context.py

Lines changed: 120 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -468,10 +468,13 @@ def test_multi_gateway_catalog_aware_and_unsupported(tmp_path: Path, mocker):
468468
)
469469

470470
# Both models must have 3-level FQNs so MappingSchema nesting is uniform.
471+
# count(".") == 2 means 3 parts (catalog.db.table), i.e. a 3-level FQN.
471472
assert duckdb_model.fqn.count(".") == 2, (
472473
f"Expected 3-level FQN for duckdb model, got: {duckdb_model.fqn}"
473474
)
474-
assert ch_model.fqn.count(".") == 2, f"Expected 3-level FQN for ch model, got: {ch_model.fqn}"
475+
assert ch_model.fqn.count(".") == 2, (
476+
f"Expected 3-level FQN for ch model, got: {ch_model.fqn}"
477+
) # 3 parts = 2 dots
475478

476479
# Both models loaded into the same MappingSchema must not raise a nesting SchemaError.
477480
from sqlglot.schema import MappingSchema
@@ -558,6 +561,122 @@ def test_snapshot_evaluator_calls_ensure_virtual_catalog_injection(mocker):
558561
inject_spy.assert_called_once()
559562

560563

564+
@pytest.mark.fast
565+
def test_multi_gateway_virtual_catalog_create_schema_strips_prefix(tmp_path: Path, mocker):
566+
"""Integration test: create_schema with a 3-level virtual-catalog FQN must strip the synthetic
567+
catalog prefix before sending DDL to ClickHouse.
568+
569+
Flow exercised:
570+
1. get_default_catalog_per_gateway detects a catalog-aware gateway (DuckDB) alongside
571+
a catalog-unsupported gateway (ClickHouse) and calls inject_virtual_catalog().
572+
2. The ClickHouse adapter's _default_catalog is set to ``__clickhouse_gw__``.
573+
3. A ClickHouse model loaded with that virtual catalog gets a 3-level FQN.
574+
4. When create_schema is called with the 3-level schema name the virtual catalog prefix
575+
is stripped, so the SQL that reaches the wire uses only a 2-level name.
576+
5. The DuckDB adapter's _default_catalog is NOT set to a synthetic value.
577+
"""
578+
from sqlmesh.core.config.scheduler import BuiltInSchedulerConfig
579+
from sqlmesh.core.engine_adapter.clickhouse import ClickhouseEngineAdapter
580+
from sqlmesh.core.engine_adapter.duckdb import DuckDBEngineAdapter
581+
from sqlmesh.core.engine_adapter.shared import CatalogSupport
582+
583+
db_path = str(tmp_path / "db.db")
584+
585+
duck_adapter = DuckDBEngineAdapter(
586+
lambda *a, **k: __import__("duckdb").connect(db_path),
587+
dialect="duckdb",
588+
)
589+
590+
# ClickHouse adapter with a mocked connection — no real server needed.
591+
ch_adapter = ClickhouseEngineAdapter(
592+
lambda *a, **k: mocker.NonCallableMock(),
593+
dialect="clickhouse",
594+
)
595+
596+
ctx_mock = mocker.MagicMock()
597+
ctx_mock.engine_adapters = {
598+
"duckdb_gw": duck_adapter,
599+
"clickhouse_gw": ch_adapter,
600+
}
601+
602+
scheduler = BuiltInSchedulerConfig()
603+
catalog_per_gw = scheduler.get_default_catalog_per_gateway(ctx_mock)
604+
605+
# --- Phase 1: virtual catalog injection assertions ---
606+
607+
# DuckDB gateway must carry a real catalog entry.
608+
assert "duckdb_gw" in catalog_per_gw
609+
610+
# ClickHouse gateway must receive the synthetic ``__clickhouse_gw__`` virtual catalog.
611+
assert catalog_per_gw["clickhouse_gw"] == "__clickhouse_gw__"
612+
613+
# The ClickHouse adapter's _default_catalog must be mutated.
614+
assert ch_adapter._default_catalog == "__clickhouse_gw__"
615+
616+
# catalog_support must flip to SINGLE_CATALOG_ONLY so the set_catalog decorator strips
617+
# the virtual catalog instead of raising when DDL is executed.
618+
assert ch_adapter.catalog_support == CatalogSupport.SINGLE_CATALOG_ONLY
619+
620+
# DuckDB adapter must be untouched — it already has real catalog support.
621+
assert duck_adapter._default_catalog != "__duckdb_gw__"
622+
623+
# --- Phase 2: FQN uniformity ---
624+
625+
ch_model = load_sql_based_model(
626+
parse("MODEL(name mydb.ch_tbl, kind FULL, gateway clickhouse_gw);\nSELECT 1 AS col"),
627+
default_catalog="__clickhouse_gw__",
628+
)
629+
duckdb_model = load_sql_based_model(
630+
parse("MODEL(name main.duckdb_tbl, kind FULL, gateway duckdb_gw);\nSELECT 1 AS col"),
631+
default_catalog=catalog_per_gw["duckdb_gw"],
632+
)
633+
634+
# Both models must have 3-level FQNs (catalog.db.table → 2 dots) so MappingSchema nesting
635+
# is uniform and does not raise a SchemaError.
636+
assert ch_model.fqn.count(".") == 2, (
637+
f"Expected 3-level FQN for ClickHouse model, got: {ch_model.fqn}"
638+
)
639+
assert duckdb_model.fqn.count(".") == 2, (
640+
f"Expected 3-level FQN for DuckDB model, got: {duckdb_model.fqn}"
641+
)
642+
643+
from sqlglot.schema import MappingSchema
644+
645+
schema = MappingSchema(normalize=False)
646+
schema.add_table(ch_model.fqn, ch_model.columns_to_types or {})
647+
schema.add_table(duckdb_model.fqn, duckdb_model.columns_to_types or {})
648+
649+
# --- Phase 3: create_schema strips the virtual catalog prefix ---
650+
651+
# Spy on _create_schema to inspect what schema name is passed after stripping.
652+
create_schema_calls: t.List[str] = []
653+
654+
def _capture_create_schema(
655+
schema_name,
656+
ignore_if_exists,
657+
warn_on_error,
658+
properties,
659+
kind,
660+
):
661+
create_schema_calls.append(
662+
schema_name.sql(dialect="clickhouse")
663+
if hasattr(schema_name, "sql")
664+
else str(schema_name)
665+
)
666+
667+
mocker.patch.object(ch_adapter, "_create_schema", side_effect=_capture_create_schema)
668+
669+
# Call create_schema with the 3-level virtual-catalog-prefixed schema name.
670+
ch_adapter.create_schema("__clickhouse_gw__.mydb")
671+
672+
assert len(create_schema_calls) == 1, "Expected exactly one _create_schema call"
673+
passed_schema = create_schema_calls[0]
674+
# The virtual catalog prefix must NOT appear in the SQL sent to the wire.
675+
assert "__clickhouse_gw__" not in passed_schema, (
676+
f"Virtual catalog prefix should be stripped before reaching _create_schema, got: {passed_schema!r}"
677+
)
678+
679+
561680
def test_plan_execution_time():
562681
context = Context(config=Config())
563682
context.upsert_model(

0 commit comments

Comments
 (0)