-
Notifications
You must be signed in to change notification settings - Fork 104
/
Copy pathplan.py
137 lines (103 loc) · 4.26 KB
/
plan.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
import contextlib
import sys
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, ContextManager, List, Optional, Union, cast
from django.db import connections, transaction
from .config import PostgresPartitioningConfig
from .constants import AUTO_PARTITIONED_COMMENT
from .partition import PostgresPartition
if TYPE_CHECKING:
from psqlextra.backend.schema import PostgresSchemaEditor
@dataclass
class PostgresModelPartitioningPlan:
    """Describes the partitions that are going to be created/deleted for a
    particular partitioning config.

    A "partitioning config" applies to one model.
    """

    # Partitioning config (and through it, the model) this plan belongs to.
    config: PostgresPartitioningConfig
    # Partitions that are created when the plan is applied.
    creations: List[PostgresPartition] = field(default_factory=list)
    # Partitions that are deleted when the plan is applied.
    deletions: List[PostgresPartition] = field(default_factory=list)

    def apply(self, using: Optional[str] = None) -> None:
        """Applies this partitioning plan by creating and deleting the planned
        partitions.

        Creations are executed before deletions. When the config's
        ``atomic`` flag is set, the whole plan runs inside a transaction
        on the selected database connection.

        Arguments:
            using:
                Optional name of the database connection to use.
                Falls back to the "default" connection.
        """

        alias = using or "default"
        connection = connections[alias]

        # The outer transaction has to be opened on the same connection
        # the schema editor uses; otherwise it would wrap the default
        # database while the DDL runs on another one.
        with self._migration_context_manager(using=alias):
            with connection.schema_editor(
                atomic=self.config.atomic
            ) as schema_editor:
                for partition in self.creations:
                    partition.create(
                        self.config.model,
                        cast("PostgresSchemaEditor", schema_editor),
                        comment=AUTO_PARTITIONED_COMMENT,
                    )

                for partition in self.deletions:
                    partition.delete(
                        self.config.model,
                        cast("PostgresSchemaEditor", schema_editor),
                    )

    def _migration_context_manager(
        self,
        using: Optional[str] = None,
    ) -> Union[transaction.Atomic, ContextManager[None]]:
        """Gets the context manager that wraps the application of the plan.

        Arguments:
            using:
                Optional name of the database connection the
                transaction should be opened on. ``None`` lets
                Django pick its default connection.

        Returns:
            An atomic transaction block when the config's ``atomic``
            flag is set, a context manager that does nothing otherwise.
        """

        if self.config.atomic:
            return transaction.atomic(using=using)

        if sys.version_info >= (3, 7):
            return contextlib.nullcontext()
        else:
            # contextlib.nullcontext() does not exist before Python 3.7;
            # suppress() without exception types suppresses nothing and
            # therefore serves as a no-op context manager.
            return contextlib.suppress()

    def print(self) -> None:
        """Prints this model plan to the terminal in a readable format.

        Deletions are listed first (prefixed with "-"), followed by
        creations (prefixed with "+"). Each partition is followed by
        the key/value pairs returned by its ``deconstruct()``.
        """

        print(f"{self.config.model.__name__}:")

        for partition in self.deletions:
            print(" - %s" % partition.name())
            for key, value in partition.deconstruct().items():
                print(f" {key}: {value}")

        for partition in self.creations:
            print(" + %s" % partition.name())
            for key, value in partition.deconstruct().items():
                print(f" {key}: {value}")
@dataclass
class PostgresPartitioningPlan:
    """Bundles the per-model partitioning plans into one overall plan of
    partitions to create and delete."""

    # One plan per partitioning config, applied/printed in list order.
    model_plans: List[PostgresModelPartitioningPlan]

    @property
    def creations(self) -> List[PostgresPartition]:
        """Gets the partitions to be created across all model plans as a
        single flat list, in model plan order."""

        return [
            partition
            for plan in self.model_plans
            for partition in plan.creations
        ]

    @property
    def deletions(self) -> List[PostgresPartition]:
        """Gets the partitions to be deleted across all model plans as a
        single flat list, in model plan order."""

        return [
            partition
            for plan in self.model_plans
            for partition in plan.deletions
        ]

    def apply(self, using: Optional[str] = None) -> None:
        """Applies every model plan in order, creating/deleting the
        partitions each one describes.

        Arguments:
            using:
                Optional name of the database connection to use;
                passed on unchanged to each model plan.
        """

        for plan in self.model_plans:
            plan.apply(using=using)

    def print(self) -> None:
        """Prints every model plan to the terminal, each followed by an
        empty line, and ends with the total number of partitions that
        will be deleted and created."""

        for plan in self.model_plans:
            plan.print()
            print("")

        total_created = len(self.creations)
        total_deleted = len(self.deletions)

        print(f"{total_deleted} partitions will be deleted")
        print(f"{total_created} partitions will be created")
# Names exported by this module on `from ... import *`.
__all__ = ["PostgresPartitioningPlan", "PostgresModelPartitioningPlan"]