Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion subwiz/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ def _counting_progress_dot():
return {str(dom) for dom in predictions}

predictions_that_resolve = asyncio.run(
get_registered_domains(predictions, resolution_concurrency)
get_registered_domains(predictions, resolution_concurrency, apex_domain=apex)
)

if not quiet:
Expand Down
105 changes: 100 additions & 5 deletions subwiz/resolve.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,16 @@
This module provides asynchronous DNS resolution functionality to check whether
domains are registered and resolve to IP addresses. It uses multiple nameservers
for reliability and implements concurrency control for efficient batch processing.
Includes wildcard DNS detection to filter false positives on domains with
catch-all DNS records.
"""

from __future__ import annotations

import asyncio
import random
import string
from typing import Optional

import aiodns
import idna.core
Expand All @@ -16,28 +23,116 @@
NAME_SERVERS = ["1.1.1.1", "1.0.0.1", "8.8.8.8"]
TIMEOUT = 3
TRIES = 1
WILDCARD_PROBE_COUNT = 3


async def _resolve_ips(
hostname: str, resolver: aiodns.DNSResolver
) -> frozenset[str]:
"""Resolve a hostname to its set of IP addresses.

Args:
hostname: The hostname to resolve
resolver: DNS resolver instance

Returns:
Frozenset of IP address strings, or empty frozenset on failure
"""
try:
results = await resolver.query(hostname, "A")
return frozenset(r.host for r in results)
except (aiodns.error.DNSError, idna.IDNAError):
return frozenset()


async def detect_wildcard(
apex_domain: str, resolver: aiodns.DNSResolver
) -> Optional[frozenset[str]]:
"""Detect whether an apex domain has a wildcard DNS record.

Probes multiple random subdomains that are extremely unlikely to exist.
If all probes resolve to the same set of IPs, a wildcard is present.

Args:
apex_domain: The apex domain to check (e.g. "example.com")
resolver: DNS resolver instance

Returns:
Frozenset of wildcard IP addresses if wildcard detected, None otherwise
"""
random_labels = [
"".join(random.choices(string.ascii_lowercase + string.digits, k=16))
for _ in range(WILDCARD_PROBE_COUNT)
]
probe_hosts = [f"{label}.{apex_domain}" for label in random_labels]

ip_sets = await asyncio.gather(
*[_resolve_ips(host, resolver) for host in probe_hosts]
)

# If any probe failed to resolve, there is no wildcard
if any(len(ips) == 0 for ips in ip_sets):
return None

# All probes resolved — check if they share at least one common IP
common_ips = ip_sets[0]
for ips in ip_sets[1:]:
common_ips = common_ips & ips

if common_ips:
return common_ips

return None


async def get_registered_domains(
domains_to_check: set[Domain], resolution_concurrency: int
domains_to_check: set[Domain],
resolution_concurrency: int,
apex_domain: Optional[str] = None,
) -> set[Domain]:
"""Check which domains from a set are registered and resolve to IP addresses.

When an apex_domain is provided, performs wildcard detection first and filters
out domains that resolve solely to wildcard IP addresses.

Args:
domains_to_check: Set of Domain objects to check for registration
resolution_concurrency: Maximum number of concurrent DNS resolutions
apex_domain: Optional apex domain for wildcard detection

Returns:
Set of Domain objects that are registered and resolve successfully
Set of Domain objects that are registered and resolve successfully,
excluding wildcard false positives
"""

semaphore = asyncio.Semaphore(resolution_concurrency)
resolver = aiodns.DNSResolver(
nameservers=NAME_SERVERS, timeout=TIMEOUT, tries=TRIES
)

# Detect wildcard DNS before resolving predictions
wildcard_ips = None
if apex_domain:
wildcard_ips = await detect_wildcard(apex_domain, resolver)

if wildcard_ips is None:
# No wildcard — use the original fast path (just check if registered)
domains_list = list(domains_to_check)
tasks = [dom.is_registered(resolver, semaphore) for dom in domains_to_check]
results = await asyncio.gather(*tasks)
return {dom for dom, is_reg in zip(domains_list, results) if is_reg}

# Wildcard detected — resolve each domain and keep only those with
# at least one IP outside the wildcard set
async def _resolves_non_wildcard(domain: Domain) -> bool:
async with semaphore:
ips = await _resolve_ips(str(domain), resolver)
if not ips:
return False
# Keep the domain if it has any IP not in the wildcard set
return not ips.issubset(wildcard_ips)

domains_list = list(domains_to_check)
tasks = [dom.is_registered(resolver, semaphore) for dom in domains_to_check]
tasks = [_resolves_non_wildcard(dom) for dom in domains_list]
results = await asyncio.gather(*tasks)

return {dom for dom, is_reg in zip(domains_list, results) if is_reg}
return {dom for dom, keep in zip(domains_list, results) if keep}
35 changes: 32 additions & 3 deletions tests/test_resolve.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
"""Tests for DNS resolution functionality.

This module contains tests that verify the DNS resolution and domain
registration checking works correctly for various domain inputs.
registration checking works correctly for various domain inputs,
including wildcard DNS detection and filtering.
"""

import asyncio

from subwiz.resolve import get_registered_domains
import aiodns

from subwiz.resolve import detect_wildcard, get_registered_domains, NAME_SERVERS, TIMEOUT, TRIES
from subwiz.type import Domain


def test_():
def test_registered_domains():
"""Test that DNS resolution correctly identifies registered domains.

Verifies that the get_registered_domains function can distinguish between
Expand All @@ -23,3 +26,29 @@ def test_():
get_registered_domains(input_domains, resolution_concurrency=10)
)
assert registered_domains == {Domain("api.hadrian.io"), Domain("app.hadrian.io")}


def test_wildcard_detection_non_wildcard():
"""Test that wildcard detection returns None for non-wildcard domains."""

async def _check():
resolver = aiodns.DNSResolver(
nameservers=NAME_SERVERS, timeout=TIMEOUT, tries=TRIES
)
return await detect_wildcard("hadrian.io", resolver)

# hadrian.io does not have a wildcard record
result = asyncio.run(_check())
assert result is None


def test_registered_domains_with_apex():
"""Test that passing apex_domain still returns correct results for non-wildcard domains."""
domain_strings = {"api.hadrian.io", "app.hadrian.io", "random_string.hadrian.io"}
input_domains = {Domain(dom) for dom in domain_strings}
registered_domains = asyncio.run(
get_registered_domains(
input_domains, resolution_concurrency=10, apex_domain="hadrian.io"
)
)
assert registered_domains == {Domain("api.hadrian.io"), Domain("app.hadrian.io")}