Skip to content

Commit 1f39998

Browse files
fix: ensure _get_data_objects respects snapshots with different schemas (#5515)
Co-authored-by: Themis Valtinos <[email protected]>
1 parent 4c5a6d3 commit 1f39998

File tree

2 files changed: 26 additions and 24 deletions

sqlmesh/core/snapshot/evaluator.py

Lines changed: 18 additions & 16 deletions
Original file line number | Diff line number | Diff line change
@@ -1593,14 +1593,14 @@ def _get_data_objects(
15931593
tables_by_gateway_and_schema: t.Dict[t.Union[str, None], t.Dict[exp.Table, set[str]]] = (
15941594
defaultdict(lambda: defaultdict(set))
15951595
)
1596-
snapshots_by_table_name: t.Dict[str, Snapshot] = {}
1596+
snapshots_by_table_name: t.Dict[exp.Table, t.Dict[str, Snapshot]] = defaultdict(dict)
15971597
for snapshot in target_snapshots:
15981598
if not snapshot.is_model or snapshot.is_symbolic:
15991599
continue
16001600
table = table_name_callable(snapshot)
16011601
table_schema = d.schema_(table.db, catalog=table.catalog)
16021602
tables_by_gateway_and_schema[snapshot.model_gateway][table_schema].add(table.name)
1603-
snapshots_by_table_name[table.name] = snapshot
1603+
snapshots_by_table_name[table_schema][table.name] = snapshot
16041604

16051605
def _get_data_objects_in_schema(
16061606
schema: exp.Table,
@@ -1613,23 +1613,25 @@ def _get_data_objects_in_schema(
16131613
)
16141614

16151615
with self.concurrent_context():
1616-
existing_objects: t.List[DataObject] = []
1616+
snapshot_id_to_obj: t.Dict[SnapshotId, DataObject] = {}
16171617
# A schema can be shared across multiple engines, so we need to group tables by both gateway and schema
16181618
for gateway, tables_by_schema in tables_by_gateway_and_schema.items():
1619-
objs_for_gateway = [
1620-
obj
1621-
for objs in concurrent_apply_to_values(
1622-
list(tables_by_schema),
1623-
lambda s: _get_data_objects_in_schema(
1624-
schema=s, object_names=tables_by_schema.get(s), gateway=gateway
1625-
),
1626-
self.ddl_concurrent_tasks,
1627-
)
1628-
for obj in objs
1629-
]
1630-
existing_objects.extend(objs_for_gateway)
1619+
schema_list = list(tables_by_schema.keys())
1620+
results = concurrent_apply_to_values(
1621+
schema_list,
1622+
lambda s: _get_data_objects_in_schema(
1623+
schema=s, object_names=tables_by_schema.get(s), gateway=gateway
1624+
),
1625+
self.ddl_concurrent_tasks,
1626+
)
1627+
1628+
for schema, objs in zip(schema_list, results):
1629+
snapshots_by_name = snapshots_by_table_name.get(schema, {})
1630+
for obj in objs:
1631+
if obj.name in snapshots_by_name:
1632+
snapshot_id_to_obj[snapshots_by_name[obj.name].snapshot_id] = obj
16311633

1632-
return {snapshots_by_table_name[obj.name].snapshot_id: obj for obj in existing_objects}
1634+
return snapshot_id_to_obj
16331635

16341636

16351637
def _evaluation_strategy(snapshot: SnapshotInfoLike, adapter: EngineAdapter) -> EvaluationStrategy:

tests/core/test_snapshot_evaluator.py

Lines changed: 8 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -1418,7 +1418,7 @@ def columns(table_name):
14181418
"get_data_objects",
14191419
return_value=[
14201420
DataObject(
1421-
schema="test_schema",
1421+
schema="sqlmesh__test_schema",
14221422
name=f"test_schema__test_model__{snapshot.version}",
14231423
type="table",
14241424
)
@@ -1500,7 +1500,7 @@ def test_migrate_view(
15001500
"sqlmesh.core.engine_adapter.base.EngineAdapter.get_data_objects",
15011501
return_value=[
15021502
DataObject(
1503-
schema="test_schema",
1503+
schema="sqlmesh__test_schema",
15041504
name=f"test_schema__test_model__{snapshot.version}",
15051505
type="view",
15061506
)
@@ -1950,7 +1950,7 @@ def columns(table_name):
19501950
"sqlmesh.core.engine_adapter.base.EngineAdapter.get_data_objects",
19511951
return_value=[
19521952
DataObject(
1953-
schema="test_schema",
1953+
schema="sqlmesh__test_schema",
19541954
name=f"test_schema__test_model__{snapshot.version}",
19551955
type=DataObjectType.TABLE,
19561956
)
@@ -2037,7 +2037,7 @@ def columns(table_name):
20372037
"sqlmesh.core.engine_adapter.base.EngineAdapter.get_data_objects",
20382038
return_value=[
20392039
DataObject(
2040-
schema="test_schema",
2040+
schema="sqlmesh__test_schema",
20412041
name=f"test_schema__test_model__{snapshot.version}",
20422042
type=DataObjectType.TABLE,
20432043
)
@@ -4016,7 +4016,7 @@ def test_migrate_snapshot(snapshot: Snapshot, mocker: MockerFixture, adapter_moc
40164016

40174017
adapter_mock.get_data_objects.return_value = [
40184018
DataObject(
4019-
schema="test_schema",
4019+
schema="sqlmesh__db",
40204020
name=f"db__model__{new_snapshot.version}",
40214021
type=DataObjectType.TABLE,
40224022
)
@@ -4154,7 +4154,7 @@ def test_migrate_managed(adapter_mock, make_snapshot, mocker: MockerFixture):
41544154

41554155
adapter_mock.get_data_objects.return_value = [
41564156
DataObject(
4157-
schema="test_schema",
4157+
schema="sqlmesh__test_schema",
41584158
name=f"test_schema__test_model__{snapshot.version}",
41594159
type=DataObjectType.MANAGED_TABLE,
41604160
)
@@ -4380,12 +4380,12 @@ def columns(table_name):
43804380
"sqlmesh.core.engine_adapter.base.EngineAdapter.get_data_objects",
43814381
return_value=[
43824382
DataObject(
4383-
schema="test_schema",
4383+
schema="sqlmesh__test_schema",
43844384
name=f"test_schema__test_model__{snapshot_1.version}",
43854385
type=DataObjectType.TABLE,
43864386
),
43874387
DataObject(
4388-
schema="test_schema",
4388+
schema="sqlmesh__test_schema",
43894389
name=f"test_schema__test_model_2__{snapshot_2.version}",
43904390
type=DataObjectType.TABLE,
43914391
),

0 commit comments

Comments
 (0)