Skip to content

feat: support for composite/sub partitioning #202

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
186 changes: 124 additions & 62 deletions psqlextra/backend/schema.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,15 @@
from typing import Any, List, Optional
from typing import Any, List, Optional, Tuple
from unittest import mock

from django.core.exceptions import (
FieldDoesNotExist,
ImproperlyConfigured,
SuspiciousOperation,
)
from django.core.exceptions import FieldDoesNotExist, ImproperlyConfigured, SuspiciousOperation
from django.db import transaction
from django.db.models import Field, Model

from psqlextra.type_assertions import is_sql_with_params
from psqlextra.types import PostgresPartitioningMethod

from . import base_impl
from .introspection import PostgresIntrospection
from .side_effects import (
HStoreRequiredSchemaEditorSideEffect,
HStoreUniqueSchemaEditorSideEffect,
)
from .side_effects import HStoreRequiredSchemaEditorSideEffect, HStoreUniqueSchemaEditorSideEffect


class PostgresSchemaEditor(base_impl.schema_editor()):
Expand All @@ -28,23 +20,15 @@ class PostgresSchemaEditor(base_impl.schema_editor()):
sql_create_view = "CREATE VIEW %s AS (%s)"
sql_replace_view = "CREATE OR REPLACE VIEW %s AS (%s)"
sql_drop_view = "DROP VIEW IF EXISTS %s"
sql_create_materialized_view = (
"CREATE MATERIALIZED VIEW %s AS (%s) WITH DATA"
)
sql_create_materialized_view = "CREATE MATERIALIZED VIEW %s AS (%s) WITH DATA"
sql_drop_materialized_view = "DROP MATERIALIZED VIEW %s"
sql_refresh_materialized_view = "REFRESH MATERIALIZED VIEW %s"
sql_refresh_materialized_view_concurrently = (
"REFRESH MATERIALIZED VIEW CONCURRENTLY %s"
)
sql_refresh_materialized_view_concurrently = "REFRESH MATERIALIZED VIEW CONCURRENTLY %s"
sql_partition_by = " PARTITION BY %s (%s)"
sql_add_default_partition = "CREATE TABLE %s PARTITION OF %s DEFAULT"
sql_add_hash_partition = "CREATE TABLE %s PARTITION OF %s FOR VALUES WITH (MODULUS %s, REMAINDER %s)"
sql_add_range_partition = (
"CREATE TABLE %s PARTITION OF %s FOR VALUES FROM (%s) TO (%s)"
)
sql_add_list_partition = (
"CREATE TABLE %s PARTITION OF %s FOR VALUES IN (%s)"
)
sql_add_range_partition = "CREATE TABLE %s PARTITION OF %s FOR VALUES FROM (%s) TO (%s)"
sql_add_list_partition = "CREATE TABLE %s PARTITION OF %s FOR VALUES IN (%s)"
sql_delete_partition = "DROP TABLE %s"
sql_table_comment = "COMMENT ON TABLE %s IS %s"

Expand All @@ -63,6 +47,10 @@ def __init__(self, connection, collect_sql=False, atomic=True):
self.deferred_sql = []
self.introspection = PostgresIntrospection(self.connection)

def execute(self, sql, params=()):
    """Runs a statement by handing it, unchanged, to the parent schema editor.

    This override adds no behaviour of its own: `sql` and `params`
    are forwarded as-is and the parent's result is returned.
    """
    outcome = super().execute(sql, params)
    return outcome
Comment on lines +50 to +52
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That could actually be removed.
I added it to print the SQL statements while debugging my implementation.


def create_model(self, model: Model) -> None:
"""Creates a new model."""

Expand All @@ -79,16 +67,10 @@ def delete_model(self, model: Model) -> None:

super().delete_model(model)

def refresh_materialized_view_model(
self, model: Model, concurrently: bool = False
) -> None:
def refresh_materialized_view_model(self, model: Model, concurrently: bool = False) -> None:
"""Refreshes a materialized view."""

sql_template = (
self.sql_refresh_materialized_view_concurrently
if concurrently
else self.sql_refresh_materialized_view
)
sql_template = self.sql_refresh_materialized_view_concurrently if concurrently else self.sql_refresh_materialized_view

sql = sql_template % self.quote_name(model._meta.db_table)
self.execute(sql)
Expand Down Expand Up @@ -131,9 +113,7 @@ def replace_materialized_view_model(self, model: Model) -> None:
"""

with self.connection.cursor() as cursor:
constraints = self.introspection.get_constraints(
cursor, model._meta.db_table
)
constraints = self.introspection.get_constraints(cursor, model._meta.db_table)

with transaction.atomic():
self.delete_materialized_view_model(model)
Expand All @@ -151,9 +131,7 @@ def replace_materialized_view_model(self, model: Model) -> None:
def delete_materialized_view_model(self, model: Model) -> None:
    """Drops the materialized view backing the specified model.

    The view name is the model's `db_table`, quoted before it is
    substituted into the `sql_drop_materialized_view` template.
    """

    quoted_view_name = self.quote_name(model._meta.db_table)
    self.execute(self.sql_drop_materialized_view % quoted_view_name)

def create_partitioned_model(self, model: Model) -> None:
Expand All @@ -165,19 +143,19 @@ def create_partitioned_model(self, model: Model) -> None:
# table creations..
sql, params = self._extract_sql(self.create_model, model)

partitioning_key_sql = ", ".join(
self.quote_name(field_name) for field_name in meta.key
)
partkeys = meta.key + (getattr(meta, "subkey", None) or [])
primary_key_sql = ", ".join(self.quote_name(field_name) for field_name in partkeys)
partitioning_key_sql = ", ".join(self.quote_name(field_name) for field_name in meta.key)

# create a composite key that includes the partitioning key
sql = sql.replace(" PRIMARY KEY", "")
if model._meta.pk.name not in meta.key:
if model._meta.pk.name not in partkeys:
sql = sql[:-1] + ", PRIMARY KEY (%s, %s))" % (
self.quote_name(model._meta.pk.name),
partitioning_key_sql,
primary_key_sql,
)
else:
sql = sql[:-1] + ", PRIMARY KEY (%s))" % (partitioning_key_sql,)
sql = sql[:-1] + ", PRIMARY KEY (%s))" % (primary_key_sql,)

# extend the standard CREATE TABLE statement with
# 'PARTITION BY ...'
Expand All @@ -200,6 +178,8 @@ def add_range_partition(
from_values: Any,
to_values: Any,
comment: Optional[str] = None,
partition_by: Optional[Tuple[str, List[str]]] = None,
parent_partition_name: Optional[str] = None,
) -> None:
"""Creates a new range partition for the specified partitioned model.

Expand All @@ -224,32 +204,53 @@ def add_range_partition(
comment:
Optionally, a comment to add on this
partition table.

partition_by:
Optionally, a tuple of submethod, subkey list that is used to subpartition this table

parent_partition_name:
Optionally, for subpartitions, the name of the parent partition
"""

# asserts the model is a model set up for partitioning
self._partitioning_properties_for_model(model)

table_name = self.create_partition_table_name(model, name)

parent_table_name = model._meta.db_table
if parent_partition_name:
parent_table_name += "_" + parent_partition_name

sql = self.sql_add_range_partition % (
self.quote_name(table_name),
self.quote_name(model._meta.db_table),
self.quote_name(parent_table_name),
"%s",
"%s",
)

if partition_by:
sql += self.sql_partition_by % (
partition_by[0].upper(),
", ".join(self.quote_name(field_name) for field_name in partition_by[1]),
)

with transaction.atomic():
self.execute(sql, (from_values, to_values))

if comment:
self.set_comment_on_table(table_name, comment)

if partition_by:
self.add_default_partition(model, name + "_default", comment, parent_partition_name=name)

def add_list_partition(
    self,
    model: Model,
    name: str,
    values: List[Any],
    comment: Optional[str] = None,
    partition_by: Optional[Tuple[str, List[str]]] = None,
    parent_partition_name: Optional[str] = None,
) -> None:
    """Creates a new list partition for the specified partitioned model.

    Arguments:
        model:
            Partitioned model to create a partition for.

        name:
            Name to give to the new partition. The actual table
            name is derived from it by `create_partition_table_name`.

        values:
            Partition key values that should be
            stored in this partition.

        comment:
            Optionally, a comment to add on this
            partition table.

        partition_by:
            Optionally, a tuple of (submethod, list of subkey field
            names) that is used to subpartition this table.

        parent_partition_name:
            Optionally, for subpartitions, the name of the parent
            partition this partition is attached to.
    """

    # asserts the model is a model set up for partitioning
    self._partitioning_properties_for_model(model)

    table_name = self.create_partition_table_name(model, name)

    # a subpartition is attached to its parent partition instead of
    # the model's root table
    parent_table_name = model._meta.db_table
    if parent_partition_name:
        parent_table_name += "_" + parent_partition_name

    sql = self.sql_add_list_partition % (
        self.quote_name(table_name),
        self.quote_name(parent_table_name),
        ",".join("%s" for _ in values),
    )

    sub_method = partition_by[0].upper() if partition_by else None
    if partition_by:
        sql += self.sql_partition_by % (
            sub_method,
            ", ".join(self.quote_name(field_name) for field_name in partition_by[1]),
        )

    with transaction.atomic():
        self.execute(sql, values)

        if comment:
            self.set_comment_on_table(table_name, comment)

        # PostgreSQL does not allow a DEFAULT partition on a
        # hash-partitioned table, so only list/range subpartitioned
        # tables get one.
        if partition_by and sub_method != "HASH":
            # Pass the partition *name*, not the already prefixed
            # table name: add_default_partition prepends the model's
            # table name to `parent_partition_name` itself.
            self.add_default_partition(
                model,
                name + "_default",
                comment,
                parent_partition_name=name,
            )

def add_hash_partition(
    self,
    model: Model,
    name: str,
    modulus: int,
    remainder: int,
    comment: Optional[str] = None,
    partition_by: Optional[Tuple[str, List[str]]] = None,
    parent_partition_name: Optional[str] = None,
) -> None:
    """Creates a new hash partition for the specified partitioned model.

    Arguments:
        model:
            Partitioned model to create a partition for.

        name:
            Name to give to the new partition. The actual table
            name is derived from it by `create_partition_table_name`.

        modulus:
            Value bound to the MODULUS clause of the partition.

        remainder:
            Value bound to the REMAINDER clause of the partition.

        comment:
            Optionally, a comment to add on this partition table.

        partition_by:
            Optionally, a tuple of (submethod, list of subkey field
            names) that is used to subpartition this table.

        parent_partition_name:
            Optionally, for subpartitions, the name of the parent
            partition this partition is attached to.
    """

    # asserts the model is a model set up for partitioning
    self._partitioning_properties_for_model(model)

    table_name = self.create_partition_table_name(model, name)

    # a subpartition is attached to its parent partition instead of
    # the model's root table
    parent_table_name = model._meta.db_table
    if parent_partition_name:
        parent_table_name += "_" + parent_partition_name

    sql = self.sql_add_hash_partition % (
        self.quote_name(table_name),
        self.quote_name(parent_table_name),
        "%s",
        "%s",
    )

    sub_method = partition_by[0].upper() if partition_by else None
    if partition_by:
        sql += self.sql_partition_by % (
            sub_method,
            ", ".join(self.quote_name(field_name) for field_name in partition_by[1]),
        )

    with transaction.atomic():
        self.execute(sql, (modulus, remainder))

        if comment:
            self.set_comment_on_table(table_name, comment)

        # PostgreSQL does not allow a DEFAULT partition on a
        # hash-partitioned table, so only list/range subpartitioned
        # tables get one.
        if partition_by and sub_method != "HASH":
            # Pass the partition *name*, not the already prefixed
            # table name: add_default_partition prepends the model's
            # table name to `parent_partition_name` itself.
            self.add_default_partition(
                model,
                name + "_default",
                comment,
                parent_partition_name=name,
            )

def add_default_partition(
self, model: Model, name: str, comment: Optional[str] = None
self, model: Model, name: str, comment: Optional[str] = None, parent_partition_name: Optional[str] = None
) -> None:
"""Creates a new default partition for the specified partitioned model.

Expand All @@ -352,16 +393,23 @@ def add_default_partition(
comment:
Optionally, a comment to add on this
partition table.

parent_partition_name:
Optionally, for subpartitions, the name of the parent partition
"""

# asserts the model is a model set up for partitioning
self._partitioning_properties_for_model(model)

table_name = self.create_partition_table_name(model, name)

parent_table_name = model._meta.db_table
if parent_partition_name:
parent_table_name += "_" + parent_partition_name

sql = self.sql_add_default_partition % (
self.quote_name(table_name),
self.quote_name(model._meta.db_table),
self.quote_name(parent_table_name),
)

with transaction.atomic():
Expand All @@ -373,14 +421,10 @@ def add_default_partition(
def delete_partition(self, model: Model, name: str) -> None:
    """Drops the table backing the partition with the specified name.

    The table name is resolved from `model` and `name` through
    `create_partition_table_name` and quoted before use.
    """

    partition_table = self.create_partition_table_name(model, name)
    drop_statement = self.sql_delete_partition % self.quote_name(partition_table)
    self.execute(drop_statement)

def alter_db_table(
self, model: Model, old_db_table: str, new_db_table: str
) -> None:
def alter_db_table(self, model: Model, old_db_table: str, new_db_table: str) -> None:
"""Alters a table/model."""

super().alter_db_table(model, old_db_table, new_db_table)
Expand Down Expand Up @@ -461,10 +505,7 @@ def _view_properties_for_model(model: Model):
meta = getattr(model, "_view_meta", None)
if not meta:
raise ImproperlyConfigured(
(
"Model '%s' is not properly configured to be a view."
" Create the `ViewMeta` class as a child of '%s'."
)
("Model '%s' is not properly configured to be a view." " Create the `ViewMeta` class as a child of '%s'.")
% (model.__name__, model.__name__)
)

Expand Down Expand Up @@ -493,10 +534,7 @@ def _partitioning_properties_for_model(model: Model):
meta = getattr(model, "_partitioning_meta", None)
if not meta:
raise ImproperlyConfigured(
(
"Model '%s' is not properly configured to be partitioned."
" Create the `PartitioningMeta` class as a child of '%s'."
)
("Model '%s' is not properly configured to be partitioned." " Create the `PartitioningMeta` class as a child of '%s'.")
% (model.__name__, model.__name__)
)

Expand Down Expand Up @@ -542,6 +580,30 @@ def _partitioning_properties_for_model(model: Model):
% (model.__name__, field_name, meta.key, model.__name__)
)

if getattr(meta, "subkey", None):
if not isinstance(meta.subkey, list):
raise ImproperlyConfigured(
(
"Model '%s' is not properly configured to be subpartitioned."
" Partitioning subkey should be a list (of field names or values,"
" depending on the partitioning method)."
)
% model.__name__
)

try:
for field_name in meta.subkey:
model._meta.get_field(field_name)
except FieldDoesNotExist:
raise ImproperlyConfigured(
(
"Model '%s' is not properly configured to be subpartitioned."
" Field '%s' in partitioning subkey %s is not a valid field on"
" '%s'."
)
% (model.__name__, field_name, meta.subkey, model.__name__)
)

return meta

def create_partition_table_name(self, model: Model, name: str) -> str:
Expand Down
Loading