|
1 | 1 | """ |
2 | 2 | Models for customer billing app. |
3 | 3 | """ |
| 4 | +import logging |
4 | 5 | from datetime import timedelta |
5 | 6 | from typing import Self |
6 | 7 |
|
|
16 | 17 | from simple_history.models import HistoricalRecords |
17 | 18 | from simple_history.utils import bulk_update_with_history |
18 | 19 |
|
| 20 | +from enterprise_access.apps.customer_billing.constants import ALLOWED_CHECKOUT_INTENT_STATE_TRANSITIONS |
19 | 21 | from enterprise_access.apps.provisioning.models import ProvisionNewCustomerWorkflow |
20 | 22 |
|
21 | 23 | from .constants import INTENT_RESERVATION_DURATION_MINUTES, CheckoutIntentState |
22 | 24 |
|
| 25 | +logger = logging.getLogger(__name__) |
23 | 26 | User = get_user_model() |
24 | 27 |
|
25 | 28 |
|
@@ -135,50 +138,92 @@ class StateChoices(models.TextChoices): |
135 | 138 |
|
136 | 139 | def __str__(self): |
137 | 140 | return ( |
138 | | - f"{self.user.email}, slug: {self.enterprise_slug}, name: {self.enterprise_name}, " |
139 | | - f"state: {self.state}, (expires {self.expires_at})" |
| 141 | + "<CheckoutIntent " |
| 142 | + f"id={self.id}, " |
| 143 | + f"email={self.user.email}, " |
| 144 | + f"enterprise_slug={self.enterprise_slug}, " |
| 145 | + f"enterprise_name={self.enterprise_name}, " |
| 146 | + f"state={self.state}, " |
| 147 | + f"expires_at={self.expires_at}>" |
140 | 148 | ) |
141 | 149 |
|
| 150 | + @classmethod |
| 151 | + def is_valid_state_transition( |
| 152 | + cls, |
| 153 | + current_state: CheckoutIntentState, |
| 154 | + new_state: CheckoutIntentState, |
| 155 | + ) -> bool: |
| 156 | + """ |
| 157 | + Validate if the state transition is allowed. |
| 158 | +
|
| 159 | + Args: |
| 160 | + current_state: Current state of the CheckoutIntent |
| 161 | + new_state: Proposed new state |
| 162 | +
|
| 163 | + Returns: |
| 164 | + bool: True if transition is allowed, False otherwise |
| 165 | + """ |
| 166 | + if current_state == new_state: |
| 167 | + return True |
| 168 | + allowed_transitions = ALLOWED_CHECKOUT_INTENT_STATE_TRANSITIONS.get(current_state, []) |
| 169 | + return new_state in allowed_transitions |
| 170 | + |
142 | 171 | def mark_as_paid(self, stripe_session_id=None): |
143 | 172 | """Mark the intent as paid after successful Stripe checkout.""" |
144 | | - if self.state not in (CheckoutIntentState.CREATED, CheckoutIntentState.PAID): |
145 | | - raise ValueError(f"Cannot transition to PAID from {self.state}") |
| 173 | + if not self.is_valid_state_transition(CheckoutIntentState(self.state), CheckoutIntentState.PAID): |
| 174 | + raise ValueError(f"Cannot transition from {self.state} to {CheckoutIntentState.PAID}.") |
146 | 175 |
|
147 | 176 | if stripe_session_id: |
148 | 177 | if self.state == CheckoutIntentState.PAID and stripe_session_id != self.stripe_checkout_session_id: |
149 | | - raise ValueError("Cannot transition to PAID from PAID with a different stripe_checkout_session_id") |
| 178 | + raise ValueError("Cannot transition from PAID to PAID with a different stripe_checkout_session_id") |
150 | 179 |
|
151 | 180 | self.state = CheckoutIntentState.PAID |
152 | 181 | if stripe_session_id: |
153 | 182 | self.stripe_checkout_session_id = stripe_session_id |
154 | 183 | self.save(update_fields=['state', 'stripe_checkout_session_id', 'modified']) |
| 184 | + logger.info(f'CheckoutIntent {self} marked as {CheckoutIntentState.PAID}.') |
155 | 185 | return self |
156 | 186 |
|
157 | 187 | def mark_as_fulfilled(self, workflow=None): |
158 | 188 | """Mark the intent as fulfilled after successful provisioning.""" |
159 | | - if self.state != CheckoutIntentState.PAID: |
160 | | - raise ValueError(f"Cannot transition to FULFILLED from {self.state}") |
| 189 | + if not self.is_valid_state_transition(CheckoutIntentState(self.state), CheckoutIntentState.FULFILLED): |
| 190 | + raise ValueError(f"Cannot transition from {self.state} to {CheckoutIntentState.FULFILLED}.") |
161 | 191 |
|
162 | 192 | self.state = CheckoutIntentState.FULFILLED |
163 | 193 | if workflow: |
164 | 194 | self.workflow = workflow |
165 | 195 | self.save(update_fields=['state', 'workflow', 'modified']) |
| 196 | + logger.info(f'CheckoutIntent {self} marked as {CheckoutIntentState.FULFILLED}.') |
166 | 197 | return self |
167 | 198 |
|
168 | 199 | def mark_checkout_error(self, error_message): |
169 | 200 | """Record a checkout error.""" |
| 201 | + if not self.is_valid_state_transition( |
| 202 | + CheckoutIntentState(self.state), |
| 203 | + CheckoutIntentState.ERRORED_STRIPE_CHECKOUT, |
| 204 | + ): |
| 205 | + raise ValueError(f"Cannot transition from {self.state} to {CheckoutIntentState.ERRORED_STRIPE_CHECKOUT}.") |
| 206 | + |
170 | 207 | self.state = CheckoutIntentState.ERRORED_STRIPE_CHECKOUT |
171 | 208 | self.last_checkout_error = error_message |
172 | 209 | self.save(update_fields=['state', 'last_checkout_error', 'modified']) |
| 210 | + logger.info(f'CheckoutIntent {self} marked as {CheckoutIntentState.ERRORED_STRIPE_CHECKOUT}.') |
173 | 211 | return self |
174 | 212 |
|
175 | 213 | def mark_provisioning_error(self, error_message, workflow=None): |
176 | 214 | """Record a provisioning error.""" |
| 215 | + if not self.is_valid_state_transition( |
| 216 | + CheckoutIntentState(self.state), |
| 217 | + CheckoutIntentState.ERRORED_PROVISIONING, |
| 218 | + ): |
| 219 | + raise ValueError(f"Cannot transition from {self.state} to {CheckoutIntentState.ERRORED_PROVISIONING}.") |
| 220 | + |
177 | 221 | self.state = CheckoutIntentState.ERRORED_PROVISIONING |
178 | 222 | self.last_provisioning_error = error_message |
179 | 223 | if workflow: |
180 | 224 | self.workflow = workflow |
181 | 225 | self.save(update_fields=['state', 'last_provisioning_error', 'workflow', 'modified']) |
| 226 | + logger.info(f'CheckoutIntent {self} marked as {CheckoutIntentState.ERRORED_PROVISIONING}.') |
182 | 227 | return self |
183 | 228 |
|
184 | 229 | @property |
|
0 commit comments