Skip to content

fix: special cases for ip network defaulting #81

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Apr 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion netbox_diode_plugin/api/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ class AutoSlug:


def error_from_validation_error(e, object_name):
"""Convert a drf ValidationError to a ChangeSetException."""
"""Convert a from rest_framework.exceptions.ValidationError to a ChangeSetException."""
errors = {}
if e.detail:
if isinstance(e.detail, dict):
Expand Down
10 changes: 9 additions & 1 deletion netbox_diode_plugin/api/differ.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,20 @@
# Copyright 2025 NetBox Labs Inc
"""Diode NetBox Plugin - API - Differ."""

import copy
import datetime
import logging

from django.contrib.contenttypes.models import ContentType
from django.core.exceptions import ValidationError
from utilities.data import shallow_compare_dict
from django.db.backends.postgresql.psycopg_any import NumericRange
import netaddr

from .common import Change, ChangeSet, ChangeSetException, ChangeSetResult, ChangeType, error_from_validation_error
from .plugin_utils import get_primary_value, legal_fields
from .supported_models import extract_supported_models
from .transformer import cleanup_unresolved_references, set_custom_field_defaults, transform_proto_json

Check failure on line 18 in netbox_diode_plugin/api/differ.py

View workflow job for this annotation

GitHub Actions / tests (3.10)

Ruff (I001)

netbox_diode_plugin/api/differ.py:5:1: I001 Import block is un-sorted or un-formatted

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -84,6 +85,10 @@


def _harmonize_formats(prechange_data):
if prechange_data is None:
return None
if isinstance(prechange_data, (str, int, float, bool)):
return prechange_data
if isinstance(prechange_data, dict):
return {k: _harmonize_formats(v) for k, v in prechange_data.items()}
if isinstance(prechange_data, (list, tuple)):
Expand All @@ -94,8 +99,11 @@
return prechange_data.strftime("%Y-%m-%d")
if isinstance(prechange_data, NumericRange):
return (prechange_data.lower, prechange_data.upper-1)
if isinstance(prechange_data, netaddr.IPNetwork):
return str(prechange_data)

return prechange_data
logger.warning(f"Unknown type in prechange_data: {type(prechange_data)}")
return str(prechange_data)

def clean_diff_data(data: dict, exclude_empty_values: bool = True) -> dict:
"""Clean diff data by removing null values."""
Expand Down
137 changes: 129 additions & 8 deletions netbox_diode_plugin/api/matcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,26 @@
# Copyright 2024 NetBox Labs Inc
"""Diode NetBox Plugin - API - Object matching utilities."""

import copy
import logging
from dataclasses import dataclass
from functools import cache, lru_cache
from typing import Type

from core.models import ObjectType as NetBoxType
from django.conf import settings
from django.contrib.contenttypes.fields import ContentType
from django.core.exceptions import FieldDoesNotExist
from django.db import models
from django.db.models import F, Value
from django.db.models.fields import SlugField
from django.db.models.lookups import Exact
from django.db.models.query_utils import Q
import netaddr
from extras.models.customfields import CustomField

from .common import AutoSlug, UnresolvedReference
from .plugin_utils import content_type_id, get_object_type, get_object_type_model

Check failure on line 24 in netbox_diode_plugin/api/matcher.py

View workflow job for this annotation

GitHub Actions / tests (3.10)

Ruff (I001)

netbox_diode_plugin/api/matcher.py:5:1: I001 Import block is un-sorted or un-formatted

logger = logging.getLogger(__name__)

Expand All @@ -46,17 +47,17 @@
),
],
"ipam.ipaddress": lambda: [
ObjectMatchCriteria(
fields=("address", ),
name="logical_ip_address_global_no_vrf",
GlobalIPNetworkIPMatcher(
ip_field="address",
vrf_field="vrf",
model_class=get_object_type_model("ipam.ipaddress"),
condition=Q(vrf__isnull=True),
name="logical_ip_address_global_no_vrf",
),
ObjectMatchCriteria(
fields=("address", "assigned_object_type", "assigned_object_id"),
name="logical_ip_address_within_vrf",
VRFIPNetworkIPMatcher(
ip_field="address",
vrf_field="vrf",
model_class=get_object_type_model("ipam.ipaddress"),
condition=Q(vrf__isnull=False)
name="logical_ip_address_within_vrf",
),
],
"ipam.prefix": lambda: [
Expand Down Expand Up @@ -271,6 +272,8 @@
continue
return prepared



@dataclass
class CustomFieldMatcher:
"""A matcher for a unique custom field."""
Expand Down Expand Up @@ -305,6 +308,124 @@
"""Returns True if the data given contains a value for all fields referenced by the constraint."""
return self.custom_field in data.get("custom_fields", {})


@dataclass
class GlobalIPNetworkIPMatcher:
"""A matcher that ignores the mask."""

ip_field: str
vrf_field: str
model_class: Type[models.Model]
name: str

def _check_condition(self, data: dict) -> bool:
"""Check the condition for the custom field."""
return data.get(self.vrf_field, None) is None

def fingerprint(self, data: dict) -> str|None:
"""Fingerprint the custom field value."""
if not self.has_required_fields(data):
return None

if not self._check_condition(data):
return None

value = self.ip_value(data)
if value is None:
return None

return hash((self.model_class.__name__, self.name, value))

def has_required_fields(self, data: dict) -> bool:
"""Returns True if the data given contains a value for all fields referenced by the constraint."""
return self.ip_field in data

def ip_value(self, data: dict) -> str|None:
"""Get the IP value from the data."""
value = data.get(self.ip_field)
if value is None:
return None
return _ip_only(value)

def build_queryset(self, data: dict) -> models.QuerySet:
"""Build a queryset for the custom field."""
if not self.has_required_fields(data):
return None

if not self._check_condition(data):
return None

value = self.ip_value(data)
if value is None:
return None

return self.model_class.objects.filter(**{f'{self.ip_field}__net_host': value, f'{self.vrf_field}__isnull': True})

@dataclass
class VRFIPNetworkIPMatcher:
"""Matches ip in a vrf, ignores mask."""

ip_field: str
vrf_field: str
model_class: Type[models.Model]
name: str

def _check_condition(self, data: dict) -> bool:
"""Check the condition for the custom field."""
return data.get('vrf_id', None) is not None

def fingerprint(self, data: dict) -> str|None:
"""Fingerprint the custom field value."""
if not self.has_required_fields(data):
return None

if not self._check_condition(data):
return None

value = self.ip_value(data)
if value is None:
return None

vrf_id = data[self.vrf_field]

return hash((self.model_class.__name__, self.name, value, vrf_id))

def has_required_fields(self, data: dict) -> bool:
"""Returns True if the data given contains a value for all fields referenced by the constraint."""
return self.ip_field in data and self.vrf_field in data

def ip_value(self, data: dict) -> str|None:
"""Get the IP value from the data."""
value = data.get(self.ip_field)
if value is None:
return None
return _ip_only(value)

def build_queryset(self, data: dict) -> models.QuerySet:
"""Build a queryset for the custom field."""
if not self.has_required_fields(data):
return None

if not self._check_condition(data):
return None

value = self.ip_value(data)
if value is None:
return None

vrf_id = data[self.vrf_field]
return self.model_class.objects.filter(**{f'{self.ip_field}__net_host': value, f'{self.vrf_field}': vrf_id})


def _ip_only(value: str) -> str|None:
try:
ip = netaddr.IPNetwork(value)
value = ip.ip
except netaddr.core.AddrFormatError:
return None

return value

@dataclass
class AutoSlugMatcher:
"""A special matcher that tries to match on auto generated slugs."""
Expand Down
21 changes: 20 additions & 1 deletion netbox_diode_plugin/api/plugin_utils.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""Diode plugin helpers."""

# Generated code. DO NOT EDIT.
# Timestamp: 2025-04-13 13:20:10Z
# Timestamp: 2025-04-13 16:50:25Z

from dataclasses import dataclass
import datetime
Expand All @@ -13,6 +13,8 @@
from core.models import ObjectType as NetBoxType
from django.contrib.contenttypes.models import ContentType
from django.db import models
import netaddr
from rest_framework.exceptions import ValidationError

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -1014,6 +1016,12 @@ def transform_float_to_decimal(value: float) -> decimal.Decimal:
def int_from_int64string(value: str) -> int:
return int(value)

def ip_network_defaulting(value: str) -> str:
try:
return str(netaddr.IPNetwork(value))
except netaddr.AddrFormatError:
raise ValueError(f'Invalid IP network value: {value}')

def collect_integer_pairs(value: list[int]) -> list[tuple[int, int]]:
if len(value) % 2 != 0:
raise ValueError('Array must have an even number of elements')
Expand Down Expand Up @@ -1114,6 +1122,7 @@ def wrapper(value):
},
'ipam.aggregate': {
'date_added': transform_timestamp_to_date_only,
'prefix': ip_network_defaulting,
},
'ipam.asn': {
'asn': int_from_int64string,
Expand All @@ -1128,6 +1137,16 @@ def wrapper(value):
'ipam.fhrpgroupassignment': {
'priority': int_from_int64string,
},
'ipam.ipaddress': {
'address': ip_network_defaulting,
},
'ipam.iprange': {
'end_address': ip_network_defaulting,
'start_address': ip_network_defaulting,
},
'ipam.prefix': {
'prefix': ip_network_defaulting,
},
'ipam.role': {
'weight': int_from_int64string,
},
Expand Down
Loading
Loading