Skip to content

Commit cc3090a

Browse files
committed
Refactor: Update Doris engine adapter
1 parent b03fab3 commit cc3090a

File tree

9 files changed

+338
-448
lines changed

9 files changed

+338
-448
lines changed

sqlmesh/cli/project_init.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,7 @@ def _gen_example_objects(schema_name: str, dialect: str) -> ExampleObjects:
207207
name {incremental_model_name},
208208
kind INCREMENTAL_BY_TIME_RANGE (
209209
time_column event_date
210-
{"partition_by_time_column false" if dialect == "doris" else ""}
210+
{",partition_by_time_column false" if dialect == "doris" else ""}
211211
),
212212
start '2020-01-01',
213213
cron '@daily',

sqlmesh/core/config/connection.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2304,7 +2304,7 @@ class DorisConnectionConfig(ConnectionConfig):
23042304
host: str
23052305
user: str
23062306
password: str
2307-
port: t.Optional[int] = 9030 # Default Doris FE port
2307+
port: t.Optional[int] = 9030
23082308
database: t.Optional[str] = None
23092309
charset: t.Optional[str] = None
23102310
collation: t.Optional[str] = None
@@ -2317,7 +2317,7 @@ class DorisConnectionConfig(ConnectionConfig):
23172317
type_: t.Literal["doris"] = Field(alias="type", default="doris")
23182318
DIALECT: t.ClassVar[t.Literal["doris"]] = "doris"
23192319
DISPLAY_NAME: t.ClassVar[t.Literal["Apache Doris"]] = "Apache Doris"
2320-
DISPLAY_ORDER: t.ClassVar[t.Literal[17]] = 17
2320+
DISPLAY_ORDER: t.ClassVar[t.Literal[18]] = 18
23212321

23222322
_engine_import_validator = _get_engine_import_validator("pymysql", "doris")
23232323

sqlmesh/core/engine_adapter/base.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2658,7 +2658,7 @@ def _get_temp_table_name(self, table: TableName) -> str:
26582658
"""
26592659
table_obj = exp.to_table(table)
26602660
return f"__temp_{table_obj.name}_{random_id(short=True)}"
2661-
2661+
26622662
def _get_data_objects(
26632663
self, schema_name: SchemaName, object_names: t.Optional[t.Set[str]] = None
26642664
) -> t.List[DataObject]:

sqlmesh/core/engine_adapter/doris.py

Lines changed: 300 additions & 432 deletions
Large diffs are not rendered by default.

sqlmesh/core/snapshot/evaluator.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1768,7 +1768,9 @@ def create(
17681768
):
17691769
physical_properties = dict(physical_properties)
17701770
physical_properties["unique_key"] = (
1771-
model.unique_key[0] if len(model.unique_key) == 1 else exp.Tuple(expressions=model.unique_key)
1771+
model.unique_key[0]
1772+
if len(model.unique_key) == 1
1773+
else exp.Tuple(expressions=model.unique_key)
17721774
)
17731775

17741776
logger.info("Creating table '%s'", table_name)

tests/core/engine_adapter/integration/test_integration.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -824,7 +824,7 @@ def test_insert_overwrite_by_time_partition(ctx_query_and_df: TestContext):
824824
date_3 = (current_date + timedelta(days=2)).strftime("%Y-%m-%d")
825825
date_4 = (current_date + timedelta(days=3)).strftime("%Y-%m-%d")
826826
date_5 = (current_date + timedelta(days=4)).strftime("%Y-%m-%d")
827-
827+
828828
ctx.columns_to_types = {"id": "int", "ds": ds_type}
829829
table = ctx.table("test_table")
830830
if ctx.dialect == "bigquery":
@@ -933,7 +933,7 @@ def test_insert_overwrite_by_time_partition_source_columns(ctx_query_and_df: Tes
933933
date_3 = (current_date + timedelta(days=2)).strftime("%Y-%m-%d")
934934
date_4 = (current_date + timedelta(days=3)).strftime("%Y-%m-%d")
935935
date_5 = (current_date + timedelta(days=4)).strftime("%Y-%m-%d")
936-
936+
937937
ctx.columns_to_types = {"id": "int", "ds": ds_type}
938938
columns_to_types = {
939939
"id": exp.DataType.build("int"),
@@ -1114,7 +1114,9 @@ def test_merge_source_columns(ctx_query_and_df: TestContext):
11141114
columns_to_types = ctx.columns_to_types.copy()
11151115
columns_to_types["ignored_column"] = exp.DataType.build("int")
11161116

1117-
ctx.engine_adapter.create_table(table, columns_to_types, table_format=table_format, table_properties=table_properties)
1117+
ctx.engine_adapter.create_table(
1118+
table, columns_to_types, table_format=table_format, table_properties=table_properties
1119+
)
11181120
input_data = pd.DataFrame(
11191121
[
11201122
{"id": 1, "ds": "2022-01-01", "ignored_source": "ignored_value"},
@@ -3843,6 +3845,10 @@ def test_materialized_view_evaluation(ctx: TestContext, mocker: MockerFixture):
38433845
pytest.skip(f"Skipping engine {dialect} as it does not support materialized views")
38443846
elif dialect in ("snowflake", "databricks"):
38453847
pytest.skip(f"Skipping {dialect} as they're not enabled on standard accounts")
3848+
elif dialect == "doris":
3849+
pytest.skip(
3850+
f"Skipping doris as synchronous materialized views do not support specifying schema"
3851+
)
38463852

38473853
model_name = ctx.table("test_tbl")
38483854
mview_name = ctx.table("test_mview")

tests/core/engine_adapter/test_doris.py

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,9 @@ def test_create_materialized_view(make_mocked_engine_adapter: t.Callable[..., Do
8181
target_columns_to_types={"a": exp.DataType.build("INT"), "b": exp.DataType.build("INT")},
8282
column_descriptions={"a": "test_column_description", "b": "test_column_description"},
8383
)
84-
adapter.create_view("test_view", parse_one("SELECT a, b FROM tbl"), replace=False, materialized=True)
84+
adapter.create_view(
85+
"test_view", parse_one("SELECT a, b FROM tbl"), replace=False, materialized=True
86+
)
8587

8688
assert to_sql_calls(adapter) == [
8789
"CREATE MATERIALIZED VIEW `test_view` (`a` COMMENT 'test_column_description', `b` COMMENT 'test_column_description') AS SELECT `a`, `b` FROM `tbl`",
@@ -160,7 +162,9 @@ def test_rename_table(make_mocked_engine_adapter: t.Callable[..., DorisEngineAda
160162
adapter.cursor.execute.assert_called_once_with("ALTER TABLE `old_table` RENAME `new_table`")
161163

162164

163-
def test_replace_by_key(make_mocked_engine_adapter: t.Callable[..., DorisEngineAdapter], mocker: MockerFixture):
165+
def test_replace_by_key(
166+
make_mocked_engine_adapter: t.Callable[..., DorisEngineAdapter], mocker: MockerFixture
167+
):
164168
adapter = make_mocked_engine_adapter(DorisEngineAdapter)
165169
temp_table = parse_one("temp_table")
166170
mocker.patch.object(adapter, "_get_temp_table", return_value=temp_table)
@@ -174,8 +178,12 @@ def test_replace_by_key(make_mocked_engine_adapter: t.Callable[..., DorisEngineA
174178

175179
adapter._replace_by_key(target_table, source_query, columns_to_types, [exp.column("a")], True)
176180
adapter._replace_by_key(target_table, source_query, columns_to_types, [exp.column("a")], False)
177-
adapter._replace_by_key(target_table, source_query, columns_to_types, [exp.column("a"), exp.column("b")], True)
178-
adapter._replace_by_key(target_table, source_query, columns_to_types, [exp.column("a"), exp.column("b")], False)
181+
adapter._replace_by_key(
182+
target_table, source_query, columns_to_types, [exp.column("a"), exp.column("b")], True
183+
)
184+
adapter._replace_by_key(
185+
target_table, source_query, columns_to_types, [exp.column("a"), exp.column("b")], False
186+
)
179187

180188
assert to_sql_calls(adapter, identify=True) == [
181189
"INSERT INTO `target_table` (`a`, `b`) SELECT `a`, `b` FROM (SELECT `a` AS `a`, `b` AS `b`, ROW_NUMBER() OVER (PARTITION BY `a` ORDER BY `a`) AS _row_number FROM `temp_table`) AS _t WHERE _row_number = 1",
@@ -189,7 +197,9 @@ def test_create_index(make_mocked_engine_adapter: t.Callable[..., DorisEngineAda
189197
adapter = make_mocked_engine_adapter(DorisEngineAdapter)
190198

191199
adapter.create_index("test_table", "test_index", ("cola",))
192-
adapter.cursor.execute.assert_called_once_with("CREATE INDEX IF NOT EXISTS `test_index` ON `test_table`(`cola`)")
200+
adapter.cursor.execute.assert_called_once_with(
201+
"CREATE INDEX IF NOT EXISTS `test_index` ON `test_table`(`cola`)"
202+
)
193203

194204

195205
def test_create_table_with_distributed_by(
@@ -271,7 +281,11 @@ def test_create_table_with_partitioned_by(
271281
"test_table",
272282
target_columns_to_types={"a": exp.DataType.build("INT"), "b": exp.DataType.build("DATE")},
273283
partitioned_by=[exp.to_column("b")],
274-
table_properties={"partitions": exp.Literal.string("FROM ('2000-11-14') TO ('2021-11-14') INTERVAL 2 YEAR")},
284+
table_properties={
285+
"partitions": exp.Literal.string(
286+
"FROM ('2000-11-14') TO ('2021-11-14') INTERVAL 2 YEAR"
287+
)
288+
},
275289
)
276290

277291
assert to_sql_calls(adapter) == [

tests/core/test_connection_config.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1797,6 +1797,7 @@ def test_schema_differ_overrides(make_config) -> None:
17971797
assert adapter._schema_differ_overrides == override
17981798
assert adapter.schema_differ.parameterized_type_defaults == {}
17991799

1800+
18001801
def test_doris(make_config):
18011802
"""Test DorisConnectionConfig basic functionality"""
18021803
# Basic configuration
@@ -1818,7 +1819,7 @@ def test_doris(make_config):
18181819
assert config.database == "demo"
18191820
assert config.DIALECT == "doris"
18201821
assert config.DISPLAY_NAME == "Apache Doris"
1821-
assert config.DISPLAY_ORDER == 17
1822+
assert config.DISPLAY_ORDER == 18
18221823
assert config.is_recommended_for_state_sync is False
18231824

18241825
# Test with minimal configuration (using default port)

tests/core/test_snapshot_evaluator.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1185,7 +1185,6 @@ def test_create_view_with_properties(mocker: MockerFixture, adapter_mock, make_s
11851185
"clustered_by": [],
11861186
"partition_interval_unit": None,
11871187
"partitioned_by": [],
1188-
"key": exp.convert("value"), # Physical properties should also be in materialized_properties
11891188
},
11901189
table_description=None,
11911190
replace=False,

0 commit comments

Comments
 (0)