Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions dlt/_workspace/cli/_pipeline_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,8 @@ def _display_pending_packages() -> Tuple[Sequence[str], Sequence[str]]:
schema_str = s.to_dbml()
elif format_ == "dot":
schema_str = s.to_dot()
elif format == "mermaid":
schema_str = s.to_mermaid()
else:
schema_str = s.to_pretty_yaml(remove_defaults=remove_defaults_)

Expand Down
2 changes: 2 additions & 0 deletions dlt/_workspace/cli/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -489,6 +489,8 @@ def schema_command_wrapper(file_path: str, format_: str, remove_defaults: bool)
schema_str = s.to_dbml()
elif format_ == "dot":
schema_str = s.to_dot()
elif format == "mermaid":
schema_str = s.to_mermaid()
else:
schema_str = s.to_pretty_yaml(remove_defaults=remove_defaults)

Expand Down
28 changes: 28 additions & 0 deletions dlt/common/schema/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -795,6 +795,34 @@ def to_dot(
)
return dot

def to_mermaid(
    self,
    remove_processing_hints: bool = False,
    include_dlt_tables: bool = True,
) -> str:
    """Render this schema as a Mermaid entity-relationship diagram.

    Args:
        remove_processing_hints: If True, remove hints used for data processing
            and redundant information before rendering. This reduces the size
            of the schema and improves readability.
        include_dlt_tables: If True (the default), the diagram contains the data
            tables as well as the internal dlt tables (``_dlt_version``,
            ``_dlt_loads``, ``_dlt_pipeline_state``). If False, these tables are
            omitted from the diagram.

    Returns:
        A string containing a Mermaid ``erDiagram`` of the schema.
    """
    from dlt.helpers.mermaid import schema_to_mermaid

    # Defaults are kept on purpose: dropping them removes the `name` fields,
    # and the Mermaid helper reads each table's `name` when rendering it.
    schema_dict = self.to_dict(
        remove_processing_hints=remove_processing_hints,
        remove_defaults=False,
    )
    return schema_to_mermaid(schema_dict, self.references, include_dlt_tables)

def clone(
self,
with_name: str = None,
Expand Down
2 changes: 1 addition & 1 deletion dlt/common/storages/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from dlt.common.utils import digest128


# NOTE(review): this first assignment is the pre-change line of the diff (no "mermaid");
# the assignment directly below rebinds the same name and supersedes it.
TSchemaFileFormat = Literal["json", "yaml", "dbml", "dot"]
# Accepted schema file format names, now including "mermaid".
TSchemaFileFormat = Literal["json", "yaml", "dbml", "dot", "mermaid"]
# Derived from the Literal above so the two cannot drift apart.
# Presumably `typing.get_args` (import not visible here) — TODO confirm.
SCHEMA_FILES_EXTENSIONS = get_args(TSchemaFileFormat)


Expand Down
2 changes: 2 additions & 0 deletions dlt/common/storages/schema_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,8 @@ def _parse_schema_str(schema_str: str, extension: TSchemaFileFormat) -> DictStrA
raise ValueError(extension, "Schema parser for `dbml` not yet implemented")
elif extension == "dot":
raise ValueError(extension, "Schema parser for `dot` not yet implemented")
elif extension == "mermaid":
raise ValueError(extension, "Schema parser for `mermaid` not yet implemented")
else:
raise ValueError(extension)
return imported_schema
Expand Down
100 changes: 100 additions & 0 deletions dlt/helpers/mermaid.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
"""Build a mermaid graph representation using raw strings without additional dependencies"""
from enum import Enum
from typing import Literal
from dlt.common.schema.typing import (
TReferenceCardinality,
TStoredSchema,
TTableReferenceStandalone,
TTableSchema,
TTableSchemaColumns,
)


def schema_to_mermaid(
    schema: TStoredSchema,
    references: list[TTableReferenceStandalone],
    include_dlt_tables: bool = True,
) -> str:
    """Build the text of a Mermaid ER diagram for a stored schema.

    Args:
        schema: Stored schema dict; every entry of ``schema["tables"]`` is rendered
            as one entity.
        references: Table references rendered as relationship lines after the
            entities.
        include_dlt_tables: When False, tables whose name starts with ``_dlt`` and
            references touching such tables are left out.

    Returns:
        The diagram text, beginning with the ``erDiagram`` header line.
    """
    chunks = ["erDiagram\n"]

    # Entities first, in the order the schema lists its tables.
    for name, table in schema["tables"].items():
        if include_dlt_tables or not name.startswith("_dlt"):
            chunks.append(_to_mermaid_table(table))

    # Relationships afterwards; drop those touching `_dlt` tables when hidden.
    if include_dlt_tables:
        visible_refs = references
    else:
        visible_refs = [r for r in references if not _is_dlt_table_reference(r)]
    for reference in visible_refs:
        chunks.append(_to_mermaid_reference(reference))

    return "".join(chunks)


def _is_dlt_table_reference(ref: TTableReferenceStandalone) -> bool:
"""returns True if reference table or table is a _dlt_table"""
if ref["table"].startswith("_dlt") or ref["referenced_table"].startswith("_dlt"):
return True
return False


def _to_mermaid_table(table: TTableSchema) -> str:
    """Render one table as a Mermaid entity: ``<name>{<attribute rows>}`` plus a newline.

    A missing ``name`` renders as an empty entity name and missing ``columns`` as
    an empty attribute list.
    """
    entity_name = table.get("name", "")
    attribute_rows = _to_mermaid_column(table.get("columns", {}))
    return entity_name + "{" + attribute_rows + "}\n"


def _to_mermaid_column(columns: TTableSchemaColumns) -> str:
rows = ""
for column_name, column_schema in columns.items():
if column_schema.get("primary_key"):
rows += f"{column_schema['data_type']} {column_name} PK \n"
elif column_schema.get("unique"):
rows += f"{column_schema['data_type']} {column_name} UK \n"
else:
rows += f"{column_schema['data_type']} {column_name} \n"
return rows


class TMermaidArrows(str, Enum):
    """Relationship connectors placed between two entity names in the ER diagram.

    Each value is the literal connector token. Members also subclass ``str``, so
    they can be joined straight into the output text.

    NOTE(review): in Mermaid's crow's-foot markers ``||`` reads "exactly one",
    ``|o`` / ``o|`` "zero or one", ``}|`` / ``|{`` "one or more" and
    ``}o`` / ``o{`` "zero or more" — confirm against the Mermaid ER diagram docs.
    """

    ONE_TO_MANY = "||--|{"
    MANY_TO_ONE = "}|--||"
    ZERO_TO_MANY = "|o--|{"
    MANY_TO_ZERO = "}|--o|"
    # ONE_TO_MORE and MORE_TO_ONE are not referenced by `_CARDINALITY_ARROW`
    # in the visible part of this module.
    ONE_TO_MORE = "||--o{"
    MORE_TO_ONE = "}o--||"
    ONE_TO_ONE = "||--||"
    MANY_TO_MANY = "}|--|{"
    ZERO_TO_ONE = "|o--o|"


# Lookup from a reference's cardinality name to the connector drawn for it.
# The extra "default" key is what `_to_mermaid_reference` looks up when a
# reference has no "cardinality" entry.
_CARDINALITY_ARROW: dict[Literal["default"] | TReferenceCardinality, TMermaidArrows] = {
    "one_to_many": TMermaidArrows.ONE_TO_MANY,
    "many_to_one": TMermaidArrows.MANY_TO_ONE,
    "zero_to_many": TMermaidArrows.ZERO_TO_MANY,
    "many_to_zero": TMermaidArrows.MANY_TO_ZERO,
    "one_to_one": TMermaidArrows.ONE_TO_ONE,
    "many_to_many": TMermaidArrows.MANY_TO_MANY,
    "zero_to_one": TMermaidArrows.ZERO_TO_ONE,
    # "one_to_zero" and "default" both reuse the ZERO_TO_ONE connector.
    "one_to_zero": TMermaidArrows.ZERO_TO_ONE,
    "default": TMermaidArrows.ZERO_TO_ONE,
}


def _to_mermaid_reference(ref: TTableReferenceStandalone) -> str:
    """Render one table reference as a Mermaid relationship line.

    The line has the form
        <first-entity> <relationship> <second-entity> : <relationship-label>
    and always ends with a newline, so consecutive references never share a line.

    Args:
        ref: Reference dict. Reads ``table``, ``referenced_table`` and the optional
            ``cardinality`` and ``label`` entries (``label`` defaults to
            ``"contains"``).

    Returns:
        The relationship line. The `` : <label>`` part is omitted when the label
        is empty.

    Raises:
        TypeError: If ``table`` or ``referenced_table`` is missing from ``ref``.
    """
    first = ref.get("table")
    second = ref.get("referenced_table")
    raw_card = ref.get("cardinality", "default")
    label = ref.get("label", "contains")

    # A missing or unrecognised cardinality uses the "default" connector;
    # without this fallback the lookup returns None and the join below fails.
    arrow = _CARDINALITY_ARROW.get(raw_card, _CARDINALITY_ARROW["default"])

    # `arrow` is a str-subclass enum member, so join emits its connector text.
    line = " ".join([first, arrow, second])
    if label:
        line += f" : {label} "
    # Terminate the line even when there is no label.
    return line + "\n"
Loading