
Commit 2b4ccc8

Merge branch 'doris' of https://github.com/xinge-ji/sqlmesh into doris

2 parents: 589713c + b6f17d6

File tree

149 files changed: +1402 additions, −3502 deletions


docs/integrations/dbt.md

Lines changed: 8 additions & 16 deletions

@@ -219,7 +219,7 @@ This section describes how to adapt dbt's incremental models to run on sqlmesh a
 SQLMesh supports two approaches to implement [idempotent](../concepts/glossary.md#idempotency) incremental loads:
 
 * Using merge (with the sqlmesh [`INCREMENTAL_BY_UNIQUE_KEY` model kind](../concepts/models/model_kinds.md#incremental_by_unique_key))
-* Using insert-overwrite/delete+insert (with the sqlmesh [`INCREMENTAL_BY_TIME_RANGE` model kind](../concepts/models/model_kinds.md#incremental_by_time_range))
+* Using [`INCREMENTAL_BY_TIME_RANGE` model kind](../concepts/models/model_kinds.md#incremental_by_time_range)
 
 #### Incremental by unique key
 
@@ -233,28 +233,22 @@ To enable incremental_by_unique_key incrementality, the model configuration shou
 
 #### Incremental by time range
 
-To enable incremental_by_time_range incrementality, the model configuration should contain:
+To enable incremental_by_time_range incrementality, the model configuration must contain:
 
-* The `time_column` key with the model's time column field name as the value (see [`time column`](../concepts/models/model_kinds.md#time-column) for details)
 * The `materialized` key with value `'incremental'`
-* Either:
-    * The `incremental_strategy` key with value `'insert_overwrite'` or
-    * The `incremental_strategy` key with value `'delete+insert'`
-    * Note: in this context, these two strategies are synonyms. Regardless of which one is specified SQLMesh will use the [`best incremental strategy`](../concepts/models/model_kinds.md#materialization-strategy) for the target engine.
+* The `incremental_strategy` key with the value `incremental_by_time_range`
+* The `time_column` key with the model's time column field name as the value (see [`time column`](../concepts/models/model_kinds.md#time-column) for details)
 
 ### Incremental logic
 
-SQLMesh requires a new jinja block gated by `{% if sqlmesh_incremental is defined %}`. The new block should supersede the existing `{% if is_incremental() %}` block and contain the `WHERE` clause selecting the time interval.
+Unlike dbt incremental strategies, SQLMesh does not require the use of `is_incremental` jinja blocks to implement incremental logic.
+Instead, SQLMesh provides predefined time macro variables that can be used in the model's SQL to filter data based on the time column.
 
 For example, the SQL `WHERE` clause with the "ds" column goes in a new jinja block gated by `{% if sqlmesh_incremental is defined %}` as follows:
 
 ```bash
-> {% if sqlmesh_incremental is defined %}
 > WHERE
 >   ds BETWEEN '{{ start_ds }}' AND '{{ end_ds }}'
-> {% elif is_incremental() %}
-> ; < your existing is_incremental block >
-> {% endif %}
 ```
 
 `{{ start_ds }}` and `{{ end_ds }}` are the jinja equivalents of SQLMesh's `@start_ds` and `@end_ds` predefined time macro variables. See all [predefined time variables](../concepts/macros/macro_variables.md) available in jinja.
@@ -263,13 +257,11 @@ For example, the SQL `WHERE` clause with the "ds" column goes in a new jinja blo
 
 SQLMesh provides configuration parameters that enable control over how incremental computations occur. These parameters are set in the model's `config` block.
 
-The [`batch_size` parameter](../concepts/models/overview.md#batch_size) determines the maximum number of time intervals to run in a single job.
-
-The [`lookback` parameter](../concepts/models/overview.md#lookback) is used to capture late arriving data. It sets the number of units of late arriving data the model should expect and must be a positive integer.
+See [Incremental Model Properties](../concepts/models/overview.md#incremental-model-properties) for the full list of incremental model configuration parameters.
 
 **Note:** By default, all incremental dbt models are configured to be [forward-only](../concepts/plans.md#forward-only-plans). However, you can change this behavior by setting the `forward_only: false` setting either in the configuration of an individual model or globally for all models in the `dbt_project.yaml` file. The [forward-only](../concepts/plans.md#forward-only-plans) mode aligns more closely with the typical operation of dbt and therefore better meets user's expectations.
 
-Similarly, the [allow_partials](../concepts/models/overview.md#allow_partials) parameter is set to `true` by default for incremental dbt models unless the time column is specified, or the `allow_partials` parameter is explicitly set to `false` in the model configuration.
+Similarly, the [allow_partials](../concepts/models/overview.md#allow_partials) parameter is set to `true` by default unless the `allow_partials` parameter is explicitly set to `false` in the model configuration.
 
 #### on_schema_change

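Putting the documented configuration and time macro variables together, a minimal sketch of a dbt model using the new strategy might look like the following. The config keys mirror the sushi example models changed below; the `orders` ref and its `ds`, `customer_id`, and `amount` columns are illustrative assumptions only.

```sql
{{
  config(
    materialized='incremental',
    incremental_strategy='incremental_by_time_range',
    time_column='ds',
  )
}}

SELECT
  ds,
  customer_id,
  SUM(amount) AS revenue
FROM {{ ref('orders') }}  -- hypothetical upstream model
WHERE ds BETWEEN '{{ start_ds }}' AND '{{ end_ds }}'  -- SQLMesh fills in the run's time interval
GROUP BY ds, customer_id
```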
examples/sushi_dbt/models/customer_revenue_by_day.sql

Lines changed: 1 addition & 1 deletion

@@ -1,7 +1,7 @@
 {{
   config(
     materialized='incremental',
-    incremental_strategy='delete+insert',
+    incremental_strategy='incremental_by_time_range',
     cluster_by=['ds'],
     time_column='ds',
   )

examples/sushi_dbt/models/waiter_as_customer_by_day.sql

Lines changed: 1 addition & 1 deletion

@@ -1,7 +1,7 @@
 {{
   config(
     materialized='incremental',
-    incremental_strategy='delete+insert',
+    incremental_strategy='incremental_by_time_range',
     cluster_by=['ds'],
     time_column='ds',
   )

examples/sushi_dbt/models/waiter_revenue_by_day.sql

Lines changed: 1 addition & 1 deletion

@@ -1,7 +1,7 @@
 {{
   config(
     materialized='incremental',
-    incremental_strategy='delete+insert',
+    incremental_strategy='incremental_by_time_range',
     cluster_by=['ds'],
     time_column='ds',
   )

examples/sushi_dbt/models/waiter_revenue_by_day_v1.sql

Lines changed: 1 addition & 1 deletion

@@ -1,7 +1,7 @@
 {{
   config(
     materialized='incremental',
-    incremental_strategy='delete+insert',
+    incremental_strategy='incremental_by_time_range',
    cluster_by=['ds'],
    time_column='ds',
   )

pyproject.toml

Lines changed: 5 additions & 4 deletions

@@ -24,7 +24,7 @@ dependencies = [
     "requests",
     "rich[jupyter]",
     "ruamel.yaml",
-    "sqlglot[rs]~=27.12.0",
+    "sqlglot[rs]~=27.13.2",
     "tenacity",
     "time-machine",
     "json-stream"
@@ -61,7 +61,10 @@ dev = [
     "dbt-bigquery",
     "dbt-core",
     "dbt-duckdb>=1.7.1",
-    "dbt-snowflake",
+    # version 1.10.1 of dbt-snowflake declares that it's compatible with dbt-adapters>=1.16 but in reality
+    # it depends on the 'InvalidCatalogIntegrationConfigError' class that only exists as of dbt-adapters==1.16.6
+    # so we exclude it to prevent failures and hope that upstream releases a new version with the correct constraint
+    "dbt-snowflake!=1.10.1",
     "dbt-athena-community",
     "dbt-clickhouse",
     "dbt-databricks",
@@ -108,7 +111,6 @@ duckdb = []
 fabric = ["pyodbc>=5.0.0"]
 gcppostgres = ["cloud-sql-python-connector[pg8000]>=1.8.0"]
 github = ["PyGithub>=2.6.0"]
-llm = ["langchain", "openai"]
 motherduck = ["duckdb>=1.2.0"]
 mssql = ["pymssql"]
 mssql-odbc = ["pyodbc>=5.0.0"]
@@ -214,7 +216,6 @@ module = [
     "pymssql.*",
     "pyodbc.*",
     "psycopg2.*",
-    "langchain.*",
     "pytest_lazyfixture.*",
     "dbt.adapters.*",
     "slack_sdk.*",

sqlmesh/cli/main.py

Lines changed: 0 additions & 44 deletions

@@ -1079,50 +1079,6 @@ def rewrite(obj: Context, sql: str, read: str = "", write: str = "") -> None:
     )
 
 
-@cli.command("prompt")
-@click.argument("prompt")
-@click.option(
-    "-e",
-    "--evaluate",
-    is_flag=True,
-    help="Evaluate the generated SQL query and display the results.",
-)
-@click.option(
-    "-t",
-    "--temperature",
-    type=float,
-    help="Sampling temperature. 0.0 - precise and predictable, 0.5 - balanced, 1.0 - creative. Default: 0.7",
-    default=0.7,
-)
-@opt.verbose
-@click.pass_context
-@error_handler
-@cli_analytics
-def prompt(
-    ctx: click.Context,
-    prompt: str,
-    evaluate: bool,
-    temperature: float,
-    verbose: int,
-) -> None:
-    """Uses LLM to generate a SQL query from a prompt."""
-    from sqlmesh.integrations.llm import LLMIntegration
-
-    context = ctx.obj
-
-    llm_integration = LLMIntegration(
-        context.models.values(),
-        context.engine_adapter.dialect,
-        temperature=temperature,
-        verbosity=Verbosity(verbose),
-    )
-    query = llm_integration.query(prompt)
-
-    context.console.log_status_update(query)
-    if evaluate:
-        context.console.log_success(context.fetchdf(query))
-
-
 @cli.command("clean")
 @click.pass_obj
 @error_handler

sqlmesh/cli/project_init.py

Lines changed: 1 addition & 0 deletions

@@ -114,6 +114,7 @@ def _gen_config(
   rules:
     - ambiguousorinvalidcolumn
     - invalidselectstarexpansion
+    - noambiguousprojections
 """,
     ProjectTemplate.DBT: f"""# --- Virtual Data Environment Mode ---
 # Enable Virtual Data Environments (VDE) for *development* environments.

sqlmesh/core/context.py

Lines changed: 5 additions & 3 deletions

@@ -587,7 +587,8 @@ def state_sync(self) -> StateSync:
             self._state_sync = self._new_state_sync()
 
             if self._state_sync.get_versions(validate=False).schema_version == 0:
-                self._state_sync.migrate(default_catalog=self.default_catalog)
+                self.console.log_status_update("Initializing new project state...")
+                self._state_sync.migrate()
             self._state_sync.get_versions()
             self._state_sync = CachingStateSync(self._state_sync)  # type: ignore
         return self._state_sync
@@ -2355,7 +2356,6 @@ def migrate(self) -> None:
         self._load_materializations()
         try:
             self._new_state_sync().migrate(
-                default_catalog=self.default_catalog,
                 promoted_snapshots_only=self.config.migration.promoted_snapshots_only,
             )
         except Exception as e:
@@ -3130,7 +3130,9 @@ def lint_models(
         found_error = False
 
         model_list = (
-            list(self.get_model(model) for model in models) if models else self.models.values()
+            list(self.get_model(model, raise_if_missing=True) for model in models)
+            if models
+            else self.models.values()
         )
         all_violations = []
         for model in model_list:

sqlmesh/core/engine_adapter/fabric.py

Lines changed: 9 additions & 0 deletions

@@ -121,10 +121,19 @@ def _create_catalog(self, catalog_name: exp.Identifier) -> None:
     def _drop_catalog(self, catalog_name: exp.Identifier) -> None:
         """Drop a catalog (warehouse) in Microsoft Fabric via REST API."""
         warehouse_name = catalog_name.sql(dialect=self.dialect, identify=False)
+        current_catalog = self.get_current_catalog()
 
         logger.info(f"Deleting Fabric warehouse: {warehouse_name}")
         self.api_client.delete_warehouse(warehouse_name)
 
+        if warehouse_name == current_catalog:
+            # Somewhere around 2025-09-08, Fabric started validating the "Database=" connection argument and throwing 'Authentication failed' if the database doesn't exist.
+            # In addition, set_current_catalog() is implemented using a threadlocal variable "target_catalog".
+            # So, when we drop a warehouse and there are still threads with "target_catalog" set to reference it, any operations on those threads
+            # that either use an existing connection pointing to this warehouse or trigger a new connection
+            # will fail with an 'Authentication Failed' error unless we close all connections here, which also clears all the threadlocal data.
+            self.close()
+
     def set_current_catalog(self, catalog_name: str) -> None:
         """
         Set the current catalog for Microsoft Fabric connections.
