Skip to content

Commit b2fef24

Browse files
feat(api): initial version of check-delegation management command
1 parent 190bae5 commit b2fef24

File tree

1 file changed

+135
-0
lines changed

1 file changed

+135
-0
lines changed
Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
from functools import cache
2+
from socket import getaddrinfo
3+
4+
from django.conf import settings
5+
from django.core.management import BaseCommand
6+
import dns.exception, dns.message, dns.name, dns.query, dns.resolver
7+
8+
from desecapi.models import Domain
9+
10+
11+
LPS = {dns.name.from_text(lps) for lps in settings.LOCAL_PUBLIC_SUFFIXES}
12+
SERVER = "8.8.8.8"
13+
14+
15+
@cache
16+
def lookup(target):
17+
try:
18+
addrinfo = getaddrinfo(str(target), None)
19+
except OSError:
20+
addrinfo = []
21+
return {v[-1][0] for v in addrinfo}
22+
23+
24+
class Command(BaseCommand):
25+
help = "Check delegation status."
26+
27+
def __init__(self, *args, **kwargs):
28+
self.our_ns_set = {dns.name.from_text(ns) for ns in settings.DEFAULT_NS}
29+
self.our_ip_set = set.union(*(lookup(ns) for ns in self.our_ns_set))
30+
self.resolver = dns.resolver.Resolver()
31+
super().__init__(*args, **kwargs)
32+
33+
def add_arguments(self, parser):
34+
parser.add_argument(
35+
"domain-name",
36+
nargs="*",
37+
help="Domain name to check. If omitted, will check all domains not registered under a local public suffix.",
38+
)
39+
40+
def handle_domain(self, domain):
41+
# Identify parent
42+
domain_name = dns.name.from_text(domain.name)
43+
parent = domain_name.parent()
44+
while len(parent):
45+
query = dns.message.make_query(parent, dns.rdatatype.NS)
46+
try:
47+
res = dns.query.udp(query, SERVER, timeout=5)
48+
except:
49+
res = dns.query.tcp(query, SERVER, timeout=5)
50+
if res.answer:
51+
break
52+
parent = parent.parent()
53+
54+
# Find delegation NS hostnames and IP addresses
55+
ns = res.find_rrset(res.answer, parent, dns.rdataclass.IN, dns.rdatatype.NS)
56+
ipv4 = set()
57+
ipv6 = set()
58+
for rr in ns:
59+
ipv4 |= {ip for ip in lookup(rr.target) if "." in ip}
60+
ipv6 |= {ip for ip in lookup(rr.target) if "." not in ip}
61+
62+
self.resolver.nameserver = list(ipv4) + list(ipv6)
63+
try:
64+
answer = dns.resolver.resolve(domain_name, dns.rdatatype.NS)
65+
except (dns.resolver.NoAnswer, dns.resolver.NXDOMAIN):
66+
domain.is_registered = False
67+
return
68+
domain.is_registered = True
69+
70+
# Compute overlap of delegation NS hostnames and IP addresses with ours
71+
ns_intersection = self.our_ns_set & {name.target for name in answer}
72+
domain.has_all_nameservers = ns_intersection == self.our_ns_set
73+
74+
ns_ip_intersection = self.our_ip_set & set.union(
75+
*(lookup(rr.target) for rr in answer)
76+
)
77+
# .is_delegated: None means "not delegated to deSEC", False means "partial", True means "fully"
78+
if not ns_ip_intersection:
79+
domain.is_delegated = None
80+
else:
81+
domain.is_delegated = ns_ip_intersection == self.our_ip_set
82+
83+
# Find delegation DS records
84+
if ns_ip_intersection:
85+
query = dns.message.make_query(domain_name, dns.rdatatype.DS)
86+
try:
87+
res = dns.query.udp(query, "8.8.8.8", timeout=5)
88+
except:
89+
res = dns.query.tcp(query, "8.8.8.8", timeout=5)
90+
try:
91+
ds = res.find_rrset(
92+
res.answer, domain_name, dns.rdataclass.IN, dns.rdatatype.DS
93+
)
94+
except KeyError:
95+
ds = set()
96+
ds = {rr.to_text() for rr in ds}
97+
98+
# Compute overlap of delegation DS records with ours
99+
our_ds_set = set()
100+
for key in domain.keys:
101+
# Only digest type 2 is mandatory to implement; delegation only fully set up if present
102+
our_ds_set |= {ds for ds in key["ds"] if ds.split(" ")[2] == "2"}
103+
ds_intersection = our_ds_set & ds
104+
# .is_secured: None means "not secured with deSEC", False means "partial", True means "fully"
105+
if not ds_intersection:
106+
domain.is_secured = None
107+
else:
108+
domain.is_secured = ds_intersection == our_ds_set
109+
110+
def handle(self, *args, **options):
111+
qs = Domain.objects
112+
if options["domain-name"]:
113+
qs = qs.filter(
114+
name__in=[name.rstrip(".") for name in options["domain-name"]]
115+
)
116+
for domain in qs.all():
117+
if domain.is_locally_registrable:
118+
continue
119+
120+
try:
121+
self.handle_domain(domain)
122+
except dns.resolver.LifetimeTimeout:
123+
print(f"{domain.name} Timeout")
124+
continue
125+
except dns.resolver.NoNameservers:
126+
print(f"{domain.name} Unresponsive")
127+
continue
128+
if domain.is_registered and domain.is_delegated is not None:
129+
print(
130+
f"{domain.owner.email} {domain.name} {domain.has_all_nameservers=} {domain.is_secured=}"
131+
)
132+
else:
133+
print(
134+
f"{domain.owner.email} {domain.name} {domain.is_registered=} delegated=False"
135+
)

0 commit comments

Comments
 (0)