|
17 | 17 | from django.db.models.fields import SlugField
|
18 | 18 | from django.db.models.lookups import Exact
|
19 | 19 | from django.db.models.query_utils import Q
|
| 20 | +import netaddr |
20 | 21 | from extras.models.customfields import CustomField
|
21 | 22 |
|
22 | 23 | from .common import AutoSlug, UnresolvedReference
|
|
46 | 47 | ),
|
47 | 48 | ],
|
48 | 49 | "ipam.ipaddress": lambda: [
|
49 |
| - ObjectMatchCriteria( |
50 |
| - fields=("address", ), |
51 |
| - name="logical_ip_address_global_no_vrf", |
| 50 | + GlobalIPNetworkIPMatcher( |
| 51 | + ip_field="address", |
| 52 | + vrf_field="vrf", |
52 | 53 | model_class=get_object_type_model("ipam.ipaddress"),
|
53 |
| - condition=Q(vrf__isnull=True), |
| 54 | + name="logical_ip_address_global_no_vrf", |
54 | 55 | ),
|
55 |
| - ObjectMatchCriteria( |
56 |
| - fields=("address", "assigned_object_type", "assigned_object_id"), |
57 |
| - name="logical_ip_address_within_vrf", |
| 56 | + VRFIPNetworkIPMatcher( |
| 57 | + ip_field="address", |
| 58 | + vrf_field="vrf", |
58 | 59 | model_class=get_object_type_model("ipam.ipaddress"),
|
59 |
| - condition=Q(vrf__isnull=False) |
| 60 | + name="logical_ip_address_within_vrf", |
60 | 61 | ),
|
61 | 62 | ],
|
62 | 63 | "ipam.prefix": lambda: [
|
@@ -271,6 +272,8 @@ def _prepare_data(self, data: dict) -> dict:
|
271 | 272 | continue
|
272 | 273 | return prepared
|
273 | 274 |
|
| 275 | + |
| 276 | + |
274 | 277 | @dataclass
|
275 | 278 | class CustomFieldMatcher:
|
276 | 279 | """A matcher for a unique custom field."""
|
@@ -305,6 +308,124 @@ def has_required_fields(self, data: dict) -> bool:
|
305 | 308 | """Returns True if the data given contains a value for all fields referenced by the constraint."""
|
306 | 309 | return self.custom_field in data.get("custom_fields", {})
|
307 | 310 |
|
| 311 | + |
| 312 | +@dataclass |
| 313 | +class GlobalIPNetworkIPMatcher: |
| 314 | + """A matcher that ignores the mask.""" |
| 315 | + |
| 316 | + ip_field: str |
| 317 | + vrf_field: str |
| 318 | + model_class: Type[models.Model] |
| 319 | + name: str |
| 320 | + |
| 321 | + def _check_condition(self, data: dict) -> bool: |
| 322 | + """Check the condition for the custom field.""" |
| 323 | + return data.get(self.vrf_field, None) is None |
| 324 | + |
| 325 | + def fingerprint(self, data: dict) -> str|None: |
| 326 | + """Fingerprint the custom field value.""" |
| 327 | + if not self.has_required_fields(data): |
| 328 | + return None |
| 329 | + |
| 330 | + if not self._check_condition(data): |
| 331 | + return None |
| 332 | + |
| 333 | + value = self.ip_value(data) |
| 334 | + if value is None: |
| 335 | + return None |
| 336 | + |
| 337 | + return hash((self.model_class.__name__, self.name, value)) |
| 338 | + |
| 339 | + def has_required_fields(self, data: dict) -> bool: |
| 340 | + """Returns True if the data given contains a value for all fields referenced by the constraint.""" |
| 341 | + return self.ip_field in data |
| 342 | + |
| 343 | + def ip_value(self, data: dict) -> str|None: |
| 344 | + """Get the IP value from the data.""" |
| 345 | + value = data.get(self.ip_field) |
| 346 | + if value is None: |
| 347 | + return None |
| 348 | + return _ip_only(value) |
| 349 | + |
| 350 | + def build_queryset(self, data: dict) -> models.QuerySet: |
| 351 | + """Build a queryset for the custom field.""" |
| 352 | + if not self.has_required_fields(data): |
| 353 | + return None |
| 354 | + |
| 355 | + if not self._check_condition(data): |
| 356 | + return None |
| 357 | + |
| 358 | + value = self.ip_value(data) |
| 359 | + if value is None: |
| 360 | + return None |
| 361 | + |
| 362 | + return self.model_class.objects.filter(**{f'{self.ip_field}__net_host': value, f'{self.vrf_field}__isnull': True}) |
| 363 | + |
| 364 | +@dataclass |
| 365 | +class VRFIPNetworkIPMatcher: |
| 366 | + """Matches ip in a vrf, ignores mask.""" |
| 367 | + |
| 368 | + ip_field: str |
| 369 | + vrf_field: str |
| 370 | + model_class: Type[models.Model] |
| 371 | + name: str |
| 372 | + |
| 373 | + def _check_condition(self, data: dict) -> bool: |
| 374 | + """Check the condition for the custom field.""" |
| 375 | + return data.get('vrf_id', None) is not None |
| 376 | + |
| 377 | + def fingerprint(self, data: dict) -> str|None: |
| 378 | + """Fingerprint the custom field value.""" |
| 379 | + if not self.has_required_fields(data): |
| 380 | + return None |
| 381 | + |
| 382 | + if not self._check_condition(data): |
| 383 | + return None |
| 384 | + |
| 385 | + value = self.ip_value(data) |
| 386 | + if value is None: |
| 387 | + return None |
| 388 | + |
| 389 | + vrf_id = data[self.vrf_field] |
| 390 | + |
| 391 | + return hash((self.model_class.__name__, self.name, value, vrf_id)) |
| 392 | + |
| 393 | + def has_required_fields(self, data: dict) -> bool: |
| 394 | + """Returns True if the data given contains a value for all fields referenced by the constraint.""" |
| 395 | + return self.ip_field in data and self.vrf_field in data |
| 396 | + |
| 397 | + def ip_value(self, data: dict) -> str|None: |
| 398 | + """Get the IP value from the data.""" |
| 399 | + value = data.get(self.ip_field) |
| 400 | + if value is None: |
| 401 | + return None |
| 402 | + return _ip_only(value) |
| 403 | + |
| 404 | + def build_queryset(self, data: dict) -> models.QuerySet: |
| 405 | + """Build a queryset for the custom field.""" |
| 406 | + if not self.has_required_fields(data): |
| 407 | + return None |
| 408 | + |
| 409 | + if not self._check_condition(data): |
| 410 | + return None |
| 411 | + |
| 412 | + value = self.ip_value(data) |
| 413 | + if value is None: |
| 414 | + return None |
| 415 | + |
| 416 | + vrf_id = data[self.vrf_field] |
| 417 | + return self.model_class.objects.filter(**{f'{self.ip_field}__net_host': value, f'{self.vrf_field}': vrf_id}) |
| 418 | + |
| 419 | + |
| 420 | +def _ip_only(value: str) -> str|None: |
| 421 | + try: |
| 422 | + ip = netaddr.IPNetwork(value) |
| 423 | + value = ip.ip |
| 424 | + except netaddr.core.AddrFormatError: |
| 425 | + return None |
| 426 | + |
| 427 | + return value |
| 428 | + |
308 | 429 | @dataclass
|
309 | 430 | class AutoSlugMatcher:
|
310 | 431 | """A special matcher that tries to match on auto generated slugs."""
|
|
0 commit comments