feat: Dataset.write()
#3092
base: devel
Changes from all commits
69e4f0a
73bc8a3
3f38932
f519ac3
2c08e88
2dabe8b
5ff2804
adbed56
```diff
@@ -1128,3 +1128,14 @@ def __getstate__(self) -> Any:
         del state["naming"]
         del state["data_item_normalizer"]
         return state

+    def __eq__(self, other: Any) -> bool:
```
Collaborator (Author):

@djudjuu make sure to exclude this code. After research, this would be a significant breaking change. It would change the behavior of:

```python
schema1 = ...
schema2 = ...
schema1 == schema2
set([schema1, schema2])
{schema1: "foo", schema2: "bar"}
```

Also, it's an invalid `__eq__` implementation (you can ask an LLM for details).
```diff
+        if not isinstance(other, Schema):
+            raise NotImplementedError(
+                f"Equality between `dlt.Schema` object and {type(other).__name__} is not supported."
+            )
+        return self.version_hash == other.version_hash
+
+    def __hash__(self) -> int:
+        return hash(self.version_hash)
```
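For context on the review comment above, a minimal sketch (not part of the PR; it assumes `Schema.clone()` preserves the content and therefore the `version_hash`) of how hashing on `version_hash` would change container behavior:

```python
import dlt

schema1 = dlt.Schema("demo")
schema2 = schema1.clone()  # same content, hence (assumed) the same version_hash

# With the proposed __eq__/__hash__, two distinct objects now compare equal,
# collapse into one set entry, and share a single dict key slot:
assert schema1 == schema2
assert len({schema1, schema2}) == 1
assert {schema1: "foo", schema2: "bar"} == {schema1: "bar"}
```

Before this change, default object-identity semantics would make all three assertions fail — which is the breaking change the comment flags.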
```diff
@@ -1,7 +1,9 @@
 from __future__ import annotations

+from contextlib import contextmanager
+import tempfile
 from types import TracebackType
-from typing import Any, Optional, Type, Union, TYPE_CHECKING, Literal, overload
+from typing import Any, Generator, Optional, Type, Union, TYPE_CHECKING, Literal, overload

 from sqlglot.schema import Schema as SQLGlotSchema
 import sqlglot.expressions as sge
```
```diff
@@ -12,21 +14,27 @@
 from dlt.common.json import json
 from dlt.common.destination.reference import AnyDestination, TDestinationReferenceArg, Destination
 from dlt.common.destination.client import JobClientBase, SupportsOpenTables, WithStateSync
 from dlt.common.schema import Schema
-from dlt.common.typing import Self
-from dlt.common.schema.typing import C_DLT_LOAD_ID
+from dlt.common.typing import Self, TDataItems
+from dlt.common.schema.typing import C_DLT_LOAD_ID, TWriteDisposition
 from dlt.common.pipeline import LoadInfo
 from dlt.common.utils import simple_repr, without_none
 from dlt.destinations.sql_client import SqlClientBase, WithSqlClient
 from dlt.dataset import lineage
 from dlt.dataset.utils import get_destination_clients
 from dlt.destinations.queries import build_row_counts_expr
 from dlt.common.destination.exceptions import SqlClientNotAvailable
+from dlt.common.schema.exceptions import (
+    TableNotFound,
+)

 if TYPE_CHECKING:
     from ibis import ir
     from ibis import BaseBackend as IbisBackend

+
+_INTERNAL_DATASET_PIPELINE_NAME_TEMPLATE = "_dlt_dataset_{dataset_name}"
+

 class Dataset:
     """Access to dataframes and arrow tables in the destination dataset via dbapi"""
```
```diff
@@ -170,6 +178,62 @@ def is_same_physical_destination(self, other: dlt.Dataset) -> bool:
         """
         return is_same_physical_destination(self, other)

+    # TODO explain users can inspect `_dlt_loads` table to differentiate data originating
+    # from `pipeline.run()` or `dataset.write()`
+    @contextmanager
+    def write_pipeline(self) -> Generator[dlt.Pipeline, None, None]:
+        """Get the internal pipeline used by `Dataset.write()`.
+        It uses "_dlt_dataset_{dataset_name}" as pipeline name. Its working directory is
+        kept so that load packages can be inspected after a run, but is cleared before each write.
+        """
+        pipeline = _get_internal_pipeline(
+            dataset_name=self.dataset_name, destination=self._destination
+        )
+        yield pipeline
+
+    def write(
+        self,
+        data: TDataItems,
+        *,
+        table_name: str,
+    ) -> LoadInfo:
+        """Write `data` to the specified table.
+
+        This method uses a full-on `dlt.Pipeline` internally. You can retrieve this pipeline
+        using `Dataset.write_pipeline()` for complete flexibility.
+        The resulting load packages can be inspected in the pipeline's working directory, which is
+        named "_dlt_dataset_{dataset_name}".
+        This directory will be wiped before each `write()` call.
+        """
+        with self.write_pipeline() as internal_pipeline:
+            # drop all load packages from previous writes
+            # internal_pipeline._wipe_working_folder()
+            internal_pipeline.drop()
+
+            # get write disposition for existing table from schema (or "append" if table is new)
```
Contributor:

What about other resource-level hints that make sure that the merging is done appropriately? E.g. see the docs here. I don't think those will automatically be taken from the schema (easy to test though).
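For illustration of the comment above: hints like `primary_key` live on the resource rather than on the write disposition alone, so a caller might need to attach them explicitly. A hedged sketch (table and field names are hypothetical):

```python
import dlt

rows = [{"id": 1, "value": "a"}, {"id": 2, "value": "b"}]

# Resource-level hints travel with the data; write_disposition alone does not
# imply them, so merging may be incomplete without an explicit key hint.
items = dlt.resource(
    rows,
    name="items",                # hypothetical table name
    primary_key="id",            # the key that makes "merge" meaningful
    write_disposition="merge",
)

# dataset.write(items, table_name="items")  # assuming write() forwards
# resources to pipeline.run() unchanged
```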
```diff
+            try:
+                write_disposition = self.schema.get_table(table_name)["write_disposition"]
+            except TableNotFound:
+                write_disposition = "append"
+            # TODO should we try/except this run to gracefully handle failed writes?
+            info = internal_pipeline.run(
+                data,
+                dataset_name=self.dataset_name,
+                table_name=table_name,
+                schema=self.schema,
+                write_disposition=write_disposition,
+            )
+
+            # maybe update the dataset schema
+            self._update_schema(internal_pipeline.default_schema)
+            return info
+
+    def _update_schema(self, new_schema: dlt.Schema) -> None:
+        """Update the dataset schema"""
+        # todo: verify if we need to purge any cached objects (eg. sql_client)
+        self._schema = new_schema
+
     def query(
         self,
         query: Union[str, sge.Select, ir.Expr],
```
```diff
@@ -387,7 +451,7 @@ def __str__(self) -> str:
 def dataset(
     destination: TDestinationReferenceArg,
     dataset_name: str,
-    schema: Union[Schema, str, None] = None,
+    schema: Union[dlt.Schema, str, None] = None,
 ) -> Dataset:
     return Dataset(destination, dataset_name, schema)
```
```diff
@@ -451,3 +515,22 @@ def _get_dataset_schema_from_destination_using_dataset_name(
     schema = dlt.Schema.from_stored_schema(json.loads(stored_schema.schema))

     return schema
+
+
+def _get_internal_pipeline(
+    dataset_name: str,
+    destination: TDestinationReferenceArg,
+    pipelines_dir: str = None,
+) -> dlt.Pipeline:
+    """Setup the internal pipeline used by `Dataset.write()`"""
+    pipeline = dlt.pipeline(
+        pipeline_name=_INTERNAL_DATASET_PIPELINE_NAME_TEMPLATE.format(dataset_name=dataset_name),
+        dataset_name=dataset_name,
+        destination=destination,
+        pipelines_dir=pipelines_dir,
+    )
+    # the internal write pipeline should be stateless; it is limited to the data passed.
+    # it shouldn't persist state (e.g., incremental cursor) and interfere with other `pipeline.run()` calls
+    pipeline.config.restore_from_destination = False
+
+    return pipeline
```
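A small sketch of what the stateless setup implies (hypothetical values; `tempfile` matches the import added at the top of the file):

```python
import tempfile

# Hypothetical call: place the working directory in a temporary location so
# repeated writes don't accumulate packages on disk.
pipeline = _get_internal_pipeline(
    dataset_name="demo_dataset",
    destination="duckdb",
    pipelines_dir=tempfile.mkdtemp(),
)
# With restore_from_destination disabled, the pipeline never pulls state
# (schemas, incremental cursors) back from the destination between writes.
assert pipeline.config.restore_from_destination is False
```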
Cool!