diff --git a/src/lean_spec/__main__.py b/src/lean_spec/__main__.py index 28390133..cca92c40 100644 --- a/src/lean_spec/__main__.py +++ b/src/lean_spec/__main__.py @@ -84,6 +84,16 @@ def resolve_bootnode(bootnode: str) -> str: enr = ENR.from_string(bootnode) + # Verify structural validity (correct scheme, public key present). + if not enr.is_valid(): + raise ValueError(f"ENR structurally invalid: {enr}") + + # Cryptographically verify signature to ensure authenticity. + # + # This prevents attackers from forging ENRs to redirect connections. + if not enr.verify_signature(): + raise ValueError(f"ENR signature verification failed: {enr}") + # ENR.multiaddr() returns None when the record lacks IP or TCP port. # # This happens with discovery-only ENRs that only contain UDP info. diff --git a/src/lean_spec/subspecs/networking/enr/enr.py b/src/lean_spec/subspecs/networking/enr/enr.py index 8d1b07a8..eac385da 100644 --- a/src/lean_spec/subspecs/networking/enr/enr.py +++ b/src/lean_spec/subspecs/networking/enr/enr.py @@ -57,11 +57,18 @@ from typing_extensions import Self from lean_spec.subspecs.networking.types import Multiaddr, NodeId, SeqNumber -from lean_spec.types import Bytes33, Bytes64, RLPDecodingError, StrictBaseModel, Uint64 -from lean_spec.types.rlp import decode_list as rlp_decode_list +from lean_spec.types import ( + Bytes32, + Bytes33, + Bytes64, + StrictBaseModel, + Uint64, + rlp, +) +from lean_spec.types.byte_arrays import Bytes4 from . import keys -from .eth2 import AttestationSubnets, Eth2Data +from .eth2 import AttestationSubnets, Eth2Data, SyncCommitteeSubnets from .keys import EnrKey ENR_PREFIX = "enr:" @@ -69,24 +76,7 @@ class ENR(StrictBaseModel): - r""" - Ethereum Node Record (EIP-778). - - Example from EIP-778 (IPv4 127.0.0.1, UDP 30303):: - - enr:-IS4QHCYrYZbAKWCBRlAy5zzaDZXJBGkcnh4MHcBFZntXNFrdvJjX04j... - - Which decodes to RLP:: - - [ - 7098ad865b00a582..., # signature (64 bytes) - 01, # seq = 1 - "id", "v4", - "ip", 7f000001, # 127.0.0.1 - "secp256k1", 03ca634cae0d49acb401d8a4c6b6fe8c55b70d115bf400769cc1400f3258cd3138, - "udp", 765f, # 30303 - ] - """ + """Ethereum Node Record (EIP-778).""" MAX_SIZE: ClassVar[int] = 300 """Maximum RLP-encoded size in bytes (EIP-778).""" @@ -152,6 +142,18 @@ def udp_port(self) -> int | None: port = self.get(keys.UDP) return int.from_bytes(port, "big") if port else None + @property + def tcp6_port(self) -> int | None: + """IPv6-specific TCP port. Falls back to tcp_port if not set.""" + port = self.get(keys.TCP6) + return int.from_bytes(port, "big") if port else None + + @property + def udp6_port(self) -> int | None: + """IPv6-specific UDP port. Falls back to udp_port if not set.""" + port = self.get(keys.UDP6) + return int.from_bytes(port, "big") if port else None + def multiaddr(self) -> Multiaddr | None: """Construct multiaddress from endpoint info.""" if self.ip4 and self.tcp_port: @@ -160,18 +162,11 @@ def multiaddr(self) -> Multiaddr | None: return f"/ip6/{self.ip6}/tcp/{self.tcp_port}" return None - # ========================================================================= - # Ethereum Consensus Extensions - # ========================================================================= - @property def eth2_data(self) -> Eth2Data | None: """Parse eth2 key: fork_digest(4) + next_fork_version(4) + next_fork_epoch(8).""" eth2_bytes = self.get(keys.ETH2) if eth2_bytes and len(eth2_bytes) >= 16: - from lean_spec.types import Uint64 - from lean_spec.types.byte_arrays import Bytes4 - return Eth2Data( fork_digest=Bytes4(eth2_bytes[0:4]), next_fork_version=Bytes4(eth2_bytes[4:8]), @@ -185,9 +180,13 @@ def attestation_subnets(self) -> AttestationSubnets | None: attnets = self.get(keys.ATTNETS) return AttestationSubnets.decode_bytes(attnets) if attnets and len(attnets) == 8 else None - # ========================================================================= - # Validation - # ========================================================================= + @property + def sync_committee_subnets(self) -> SyncCommitteeSubnets | None: + """Parse syncnets key (SSZ Bitvector[4]).""" + syncnets = self.get(keys.SYNCNETS) + if syncnets and len(syncnets) == 1: + return SyncCommitteeSubnets.decode_bytes(syncnets) + return None def is_valid(self) -> bool: """ @@ -207,9 +206,128 @@ def is_compatible_with(self, other: "ENR") -> bool: return False return self_eth2.fork_digest == other_eth2.fork_digest - # ========================================================================= - # Display - # ========================================================================= + def _build_content_items(self) -> list[bytes]: + """ + Build the list of content items for RLP encoding. + + Returns [seq, k1, v1, k2, v2, ...] with keys sorted lexicographically. + """ + sorted_keys = sorted(self.pairs.keys()) + + # Sequence number: minimal big-endian, empty bytes for zero. + seq_bytes = self.seq.to_bytes(8, "big").lstrip(b"\x00") or b"" + items: list[bytes] = [seq_bytes] + + for key in sorted_keys: + items.append(key.encode("utf-8")) + items.append(self.pairs[key]) + + return items + + def _content_rlp(self) -> bytes: + """ + Get RLP-encoded content for signing (excludes signature). + + Returns the RLP encoding of [seq, k1, v1, k2, v2, ...]. + """ + return rlp.encode_rlp(self._build_content_items()) + + def verify_signature(self) -> bool: + """ + Cryptographically verify the ENR signature. + + Per EIP-778 "v4" identity scheme: + + 1. Compute keccak256 hash of content RLP (seq + sorted key/value pairs) + 2. Verify the 64-byte secp256k1 signature against the public key + + Returns True if signature is valid, False otherwise. + """ + from Crypto.Hash import keccak + from cryptography.hazmat.primitives import hashes + from cryptography.hazmat.primitives.asymmetric import ec + from cryptography.hazmat.primitives.asymmetric.utils import ( + Prehashed, + encode_dss_signature, + ) + + if self.public_key is None: + return False + + try: + # Hash the content (excludes signature). + content = self._content_rlp() + k = keccak.new(digest_bits=256) + k.update(content) + digest = k.digest() + + # Load the compressed public key. + public_key = ec.EllipticCurvePublicKey.from_encoded_point( + ec.SECP256K1(), bytes(self.public_key) + ) + + # Convert r||s (64 bytes) to DER-encoded signature. + r = int.from_bytes(self.signature[:32], "big") + s = int.from_bytes(self.signature[32:], "big") + der_signature = encode_dss_signature(r, s) + + # Verify signature against pre-hashed digest. + # SHA256 is used as the algorithm marker since it has the same 32-byte digest size. + public_key.verify(der_signature, digest, ec.ECDSA(Prehashed(hashes.SHA256()))) + return True + except Exception: + return False + + def compute_node_id(self) -> NodeId | None: + """ + Compute the node ID from the public key. + + Per EIP-778 "v4" identity scheme: keccak256(uncompressed_pubkey). + The hash is computed over the 64-byte x||y coordinates. + """ + from Crypto.Hash import keccak + from cryptography.hazmat.primitives import serialization + from cryptography.hazmat.primitives.asymmetric import ec + + if self.public_key is None: + return None + + try: + # Uncompress public key to 65 bytes (0x04 || x || y). + public_key = ec.EllipticCurvePublicKey.from_encoded_point( + ec.SECP256K1(), self.public_key + ) + uncompressed = public_key.public_bytes( + encoding=serialization.Encoding.X962, + format=serialization.PublicFormat.UncompressedPoint, + ) + + # Hash the 64-byte x||y (excluding 0x04 prefix). + k = keccak.new(digest_bits=256) + k.update(uncompressed[1:]) + return Bytes32(k.digest()) + except Exception: + return None + + def to_rlp(self) -> bytes: + """ + Serialize to RLP bytes. + + Format: [signature, seq, k1, v1, k2, v2, ...] + Keys are sorted lexicographically per EIP-778. + """ + items = [bytes(self.signature)] + self._build_content_items() + return rlp.encode_rlp(items) + + def to_string(self) -> str: + """ + Serialize to text representation. + + Format: "enr:" + base64url(RLP) without padding. + """ + rlp_bytes = self.to_rlp() + b64_content = base64.urlsafe_b64encode(rlp_bytes).decode("utf-8").rstrip("=") + return ENR_PREFIX + b64_content def __str__(self) -> str: """Human-readable summary.""" @@ -260,10 +378,14 @@ def from_string(cls, enr_text: str) -> Self: # RLP decode: [signature, seq, k1, v1, k2, v2, ...] try: - items = rlp_decode_list(rlp_data) - except RLPDecodingError as e: + items = rlp.decode_rlp_list(rlp_data) + except rlp.RLPDecodingError as e: raise ValueError(f"Invalid RLP encoding: {e}") from e + # EIP-778 requires ENRs to be at most 300 bytes. + if len(rlp_data) > cls.MAX_SIZE: + raise ValueError(f"ENR exceeds max size: {len(rlp_data)} > {cls.MAX_SIZE}") + if len(items) < 2: raise ValueError("ENR must have at least signature and seq") @@ -281,14 +403,29 @@ def from_string(cls, enr_text: str) -> Self: # Parse key/value pairs. # # Keys are strings, values are arbitrary bytes. + # EIP-778 requires keys to be lexicographically sorted. pairs: dict[str, bytes] = {} + prev_key: str | None = None for i in range(2, len(items), 2): key = items[i].decode("utf-8") + if prev_key is not None and key <= prev_key: + raise ValueError( + f"ENR keys must be lexicographically sorted per EIP-778: " + f"'{key}' follows '{prev_key}'" + ) value = items[i + 1] pairs[key] = value + prev_key = key - return cls( + enr = cls( signature=signature, seq=Uint64(seq), pairs=pairs, ) + + # Compute and store node_id for routing/identification. + node_id = enr.compute_node_id() + if node_id is not None: + return enr.model_copy(update={"node_id": node_id}) + + return enr diff --git a/src/lean_spec/types/__init__.py b/src/lean_spec/types/__init__.py index 2a5ecf1d..cc872071 100644 --- a/src/lean_spec/types/__init__.py +++ b/src/lean_spec/types/__init__.py @@ -13,9 +13,7 @@ SSZTypeError, SSZValueError, ) -from .rlp import RLPDecodingError, RLPItem -from .rlp import decode as rlp_decode -from .rlp import encode as rlp_encode +from .rlp import RLPDecodingError, RLPItem, decode_rlp, decode_rlp_list, encode_rlp from .ssz_base import SSZType from .uint import Uint64 @@ -40,8 +38,9 @@ "Boolean", "Container", # RLP encoding/decoding - "rlp_encode", - "rlp_decode", + "encode_rlp", + "decode_rlp", + "decode_rlp_list", "RLPItem", "RLPDecodingError", # Exceptions diff --git a/src/lean_spec/types/rlp.py b/src/lean_spec/types/rlp.py index b278cd15..dc557d54 100644 --- a/src/lean_spec/types/rlp.py +++ b/src/lean_spec/types/rlp.py @@ -72,7 +72,7 @@ """Base for long list prefix. Final prefix = 0xf7 + length_of_length.""" -def encode(item: RLPItem) -> bytes: +def encode_rlp(item: RLPItem) -> bytes: """ Encode an item using RLP. @@ -124,7 +124,7 @@ def _encode_list(items: list[RLPItem]) -> bytes: Long lists (>55 bytes payload) use prefix 0xf7 + length-of-length, then length. """ # Recursively encode all items. - payload = b"".join(encode(item) for item in items) + payload = b"".join(encode_rlp(item) for item in items) length = len(payload) # Short list: 0-55 bytes payload. @@ -153,7 +153,7 @@ class RLPDecodingError(Exception): """Error during RLP decoding.""" -def decode(data: bytes) -> RLPItem: +def decode_rlp(data: bytes) -> RLPItem: """ Decode RLP-encoded bytes. @@ -177,7 +177,7 @@ def decode(data: bytes) -> RLPItem: return item -def decode_list(data: bytes) -> list[bytes]: +def decode_rlp_list(data: bytes) -> list[bytes]: """ Decode RLP data as a flat list of byte items. @@ -193,7 +193,7 @@ def decode_list(data: bytes) -> list[bytes]: Raises: RLPDecodingError: If data is not a list or contains nested lists. """ - item = decode(data) + item = decode_rlp(data) if not isinstance(item, list): raise RLPDecodingError("Expected RLP list") diff --git a/tests/lean_spec/subspecs/networking/enr/test_enr.py b/tests/lean_spec/subspecs/networking/enr/test_enr.py index 51d6d9cd..7b0bbd82 100644 --- a/tests/lean_spec/subspecs/networking/enr/test_enr.py +++ b/tests/lean_spec/subspecs/networking/enr/test_enr.py @@ -29,7 +29,7 @@ # 03ca634cae0d49acb401d8a4c6b6fe8c55b70d115bf400769cc1400f3258cd3138 OFFICIAL_ENR_STRING = ( "enr:-IS4QHCYrYZbAKWCBRlAy5zzaDZXJBGkcnh4MHcBFZntXNFrdvJjX04jRzjz" - "CBOOnrkTfj499SZuOh8R33Ls8RRcy5wBgmlkgnY0gmlwhH8AAAGJc2VjcDI1NmsxoQ" + "CBOonrkTfj499SZuOh8R33Ls8RRcy5wBgmlkgnY0gmlwhH8AAAGJc2VjcDI1NmsxoQ" "PKY0yuDUmstAHYpMa2_oxVtw0RW_QAdpzBQA8yWM0xOIN1ZHCCdl8" ) @@ -43,7 +43,7 @@ ) OFFICIAL_SIGNATURE = bytes.fromhex( "7098ad865b00a582051940cb9cf36836572411a47278783077011599ed5cd16b" - "76f2635f4e234738f308138e9eb9137e3e3df5266e3a1f11df72ecf1145ccb9c" + "76f2635f4e234738f30813a89eb9137e3e3df5266e3a1f11df72ecf1145ccb9c" ) @@ -203,10 +203,10 @@ def test_minimum_fields_required(self) -> None: # Create RLP for just signature (missing seq) import base64 - from lean_spec.types.rlp import encode + from lean_spec.types.rlp import encode_rlp # RLP list with only signature - rlp_data = encode([b"\x00" * 64]) + rlp_data = encode_rlp([b"\x00" * 64]) b64_content = base64.urlsafe_b64encode(rlp_data).decode("utf-8").rstrip("=") with pytest.raises(ValueError, match=r"at least signature and seq"): @@ -216,10 +216,10 @@ def test_odd_number_of_kv_pairs_rejected(self) -> None: """ENR key/value pairs must be even count.""" import base64 - from lean_spec.types.rlp import encode + from lean_spec.types.rlp import encode_rlp # [signature, seq, key1] - odd number after signature/seq - rlp_data = encode([b"\x00" * 64, b"\x01", b"id"]) + rlp_data = encode_rlp([b"\x00" * 64, b"\x01", b"id"]) b64_content = base64.urlsafe_b64encode(rlp_data).decode("utf-8").rstrip("=") with pytest.raises(ValueError, match=r"key/value pairs must be even"): @@ -249,10 +249,10 @@ def test_valid_minimal_enr(self) -> None: """Minimal valid ENR with only required fields parses.""" import base64 - from lean_spec.types.rlp import encode + from lean_spec.types.rlp import encode_rlp # [signature(64), seq(1), "id", "v4", "secp256k1", pubkey(33)] - rlp_data = encode( + rlp_data = encode_rlp( [ b"\x00" * 64, # signature b"\x01", # seq = 1 @@ -686,9 +686,9 @@ def test_enr_with_only_required_fields(self) -> None: """ENR with minimum required fields is valid.""" import base64 - from lean_spec.types.rlp import encode + from lean_spec.types.rlp import encode_rlp - rlp_data = encode( + rlp_data = encode_rlp( [ b"\x00" * 64, # signature b"\x01", # seq @@ -710,10 +710,10 @@ def test_enr_with_ipv6_only(self) -> None: """ENR with IPv6 but no IPv4 parses correctly.""" import base64 - from lean_spec.types.rlp import encode + from lean_spec.types.rlp import encode_rlp ipv6_bytes = bytes.fromhex("20010db8000000000000000000000001") # 2001:db8::1 - rlp_data = encode( + rlp_data = encode_rlp( [ b"\x00" * 64, b"\x01", @@ -742,9 +742,9 @@ def test_enr_with_both_tcp_and_udp(self) -> None: """ENR with both TCP and UDP ports parses correctly.""" import base64 - from lean_spec.types.rlp import encode + from lean_spec.types.rlp import encode_rlp - rlp_data = encode( + rlp_data = encode_rlp( [ b"\x00" * 64, b"\x01", @@ -771,9 +771,9 @@ def test_sequence_number_zero(self) -> None: """ENR with sequence number 0 is valid.""" import base64 - from lean_spec.types.rlp import encode + from lean_spec.types.rlp import encode_rlp - rlp_data = encode( + rlp_data = encode_rlp( [ b"\x00" * 64, b"", # Empty bytes = 0 @@ -792,10 +792,10 @@ def test_large_sequence_number(self) -> None: """ENR with large sequence number parses correctly.""" import base64 - from lean_spec.types.rlp import encode + from lean_spec.types.rlp import encode_rlp large_seq = (2**32).to_bytes(5, "big") - rlp_data = encode( + rlp_data = encode_rlp( [ b"\x00" * 64, large_seq, @@ -825,3 +825,654 @@ def test_scheme_constant(self) -> None: def test_prefix_constant(self) -> None: """ENR_PREFIX is 'enr:' for text encoding.""" assert ENR_PREFIX == "enr:" + + +class TestEth2DataProperty: + """Tests for eth2_data property parsing.""" + + def test_eth2_data_parses_from_enr(self) -> None: + """eth2_data property parses 16-byte eth2 key.""" + from lean_spec.types.byte_arrays import Bytes4 + + # 4 bytes fork_digest + 4 bytes next_fork_version + 8 bytes next_fork_epoch + eth2_bytes = b"\x12\x34\x56\x78" + b"\x02\x00\x00\x00" + b"\x00\x00\x00\x00\x00\x00\x00\x01" + enr = ENR( + signature=Bytes64(b"\x00" * 64), + seq=Uint64(1), + pairs={keys.ID: b"v4", keys.ETH2: eth2_bytes}, + ) + + eth2 = enr.eth2_data + assert eth2 is not None + assert eth2.fork_digest == Bytes4(b"\x12\x34\x56\x78") + assert eth2.next_fork_version == Bytes4(b"\x02\x00\x00\x00") + # Epoch is little-endian + assert eth2.next_fork_epoch == Uint64(1 << 56) + + def test_eth2_data_returns_none_when_missing(self) -> None: + """eth2_data returns None when eth2 key is absent.""" + enr = ENR( + signature=Bytes64(b"\x00" * 64), + seq=Uint64(1), + pairs={keys.ID: b"v4"}, + ) + assert enr.eth2_data is None + + def test_eth2_data_returns_none_for_short_data(self) -> None: + """eth2_data returns None when eth2 key is too short.""" + enr = ENR( + signature=Bytes64(b"\x00" * 64), + seq=Uint64(1), + pairs={keys.ID: b"v4", keys.ETH2: b"\x12\x34\x56\x78"}, # Only 4 bytes + ) + assert enr.eth2_data is None + + +class TestAttestationSubnetsProperty: + """Tests for attestation_subnets property parsing.""" + + def test_attestation_subnets_parses_from_enr(self) -> None: + """attestation_subnets property parses 8-byte attnets key.""" + # All bits set (64 bits = 8 bytes of 0xFF) + attnets_bytes = b"\xff" * 8 + enr = ENR( + signature=Bytes64(b"\x00" * 64), + seq=Uint64(1), + pairs={keys.ID: b"v4", keys.ATTNETS: attnets_bytes}, + ) + + attnets = enr.attestation_subnets + assert attnets is not None + assert attnets.subscription_count() == 64 + + def test_attestation_subnets_returns_none_when_missing(self) -> None: + """attestation_subnets returns None when attnets key is absent.""" + enr = ENR( + signature=Bytes64(b"\x00" * 64), + seq=Uint64(1), + pairs={keys.ID: b"v4"}, + ) + assert enr.attestation_subnets is None + + def test_attestation_subnets_returns_none_for_wrong_length(self) -> None: + """attestation_subnets returns None when attnets key is not 8 bytes.""" + enr = ENR( + signature=Bytes64(b"\x00" * 64), + seq=Uint64(1), + pairs={keys.ID: b"v4", keys.ATTNETS: b"\xff\xff\xff\xff"}, # Only 4 bytes + ) + assert enr.attestation_subnets is None + + +class TestSyncCommitteeSubnetsProperty: + """Tests for sync_committee_subnets property parsing.""" + + def test_sync_committee_subnets_parses_from_enr(self) -> None: + """sync_committee_subnets property parses 1-byte syncnets key.""" + # All 4 bits set (lower nibble of 0x0F) + syncnets_bytes = b"\x0f" + enr = ENR( + signature=Bytes64(b"\x00" * 64), + seq=Uint64(1), + pairs={keys.ID: b"v4", keys.SYNCNETS: syncnets_bytes}, + ) + + syncnets = enr.sync_committee_subnets + assert syncnets is not None + for i in range(4): + assert syncnets.is_subscribed(i) + + def test_sync_committee_subnets_returns_none_when_missing(self) -> None: + """sync_committee_subnets returns None when syncnets key is absent.""" + enr = ENR( + signature=Bytes64(b"\x00" * 64), + seq=Uint64(1), + pairs={keys.ID: b"v4"}, + ) + assert enr.sync_committee_subnets is None + + def test_sync_committee_subnets_returns_none_for_wrong_length(self) -> None: + """sync_committee_subnets returns None when syncnets key is not 1 byte.""" + enr = ENR( + signature=Bytes64(b"\x00" * 64), + seq=Uint64(1), + pairs={keys.ID: b"v4", keys.SYNCNETS: b"\x0f\x00"}, # 2 bytes + ) + assert enr.sync_committee_subnets is None + + +class TestForkCompatibility: + """Tests for is_compatible_with() method.""" + + def test_compatible_with_same_fork_digest(self) -> None: + """ENRs with same fork digest are compatible.""" + eth2_bytes = b"\x12\x34\x56\x78" + b"\x02\x00\x00\x00" + b"\x00" * 8 + + enr1 = ENR( + signature=Bytes64(b"\x00" * 64), + seq=Uint64(1), + pairs={keys.ID: b"v4", keys.ETH2: eth2_bytes}, + ) + enr2 = ENR( + signature=Bytes64(b"\x00" * 64), + seq=Uint64(2), + pairs={keys.ID: b"v4", keys.ETH2: eth2_bytes}, + ) + + assert enr1.is_compatible_with(enr2) + + def test_incompatible_with_different_fork_digest(self) -> None: + """ENRs with different fork digests are incompatible.""" + eth2_bytes1 = b"\x12\x34\x56\x78" + b"\x02\x00\x00\x00" + b"\x00" * 8 + eth2_bytes2 = b"\xab\xcd\xef\x01" + b"\x02\x00\x00\x00" + b"\x00" * 8 + + enr1 = ENR( + signature=Bytes64(b"\x00" * 64), + seq=Uint64(1), + pairs={keys.ID: b"v4", keys.ETH2: eth2_bytes1}, + ) + enr2 = ENR( + signature=Bytes64(b"\x00" * 64), + seq=Uint64(2), + pairs={keys.ID: b"v4", keys.ETH2: eth2_bytes2}, + ) + + assert not enr1.is_compatible_with(enr2) + + def test_incompatible_when_self_missing_eth2(self) -> None: + """ENR is incompatible when self lacks eth2 key.""" + eth2_bytes = b"\x12\x34\x56\x78" + b"\x02\x00\x00\x00" + b"\x00" * 8 + + enr1 = ENR( + signature=Bytes64(b"\x00" * 64), + seq=Uint64(1), + pairs={keys.ID: b"v4"}, # No eth2 + ) + enr2 = ENR( + signature=Bytes64(b"\x00" * 64), + seq=Uint64(2), + pairs={keys.ID: b"v4", keys.ETH2: eth2_bytes}, + ) + + assert not enr1.is_compatible_with(enr2) + + def test_incompatible_when_other_missing_eth2(self) -> None: + """ENR is incompatible when other lacks eth2 key.""" + eth2_bytes = b"\x12\x34\x56\x78" + b"\x02\x00\x00\x00" + b"\x00" * 8 + + enr1 = ENR( + signature=Bytes64(b"\x00" * 64), + seq=Uint64(1), + pairs={keys.ID: b"v4", keys.ETH2: eth2_bytes}, + ) + enr2 = ENR( + signature=Bytes64(b"\x00" * 64), + seq=Uint64(2), + pairs={keys.ID: b"v4"}, # No eth2 + ) + + assert not enr1.is_compatible_with(enr2) + + def test_incompatible_when_both_missing_eth2(self) -> None: + """ENRs are incompatible when both lack eth2 key.""" + enr1 = ENR( + signature=Bytes64(b"\x00" * 64), + seq=Uint64(1), + pairs={keys.ID: b"v4"}, + ) + enr2 = ENR( + signature=Bytes64(b"\x00" * 64), + seq=Uint64(2), + pairs={keys.ID: b"v4"}, + ) + + assert not enr1.is_compatible_with(enr2) + + +class TestMaxSizeEnforcement: + """Tests for MAX_SIZE (300 bytes) enforcement.""" + + def test_enr_exactly_300_bytes_succeeds(self) -> None: + """ENR with exactly 300 bytes RLP parses successfully.""" + import base64 + + from lean_spec.types.rlp import encode_rlp + + # Build an ENR that is exactly 300 bytes + # Start with minimal structure and add padding in a value + signature = b"\x00" * 64 + seq = b"\x01" + # Calculate how much padding we need in value + # RLP overhead: ~4 bytes header + items + # We need to carefully construct this + + # Start with basic structure and measure + basic = encode_rlp([signature, seq, b"id", b"v4", b"secp256k1", b"\x02" + b"\x00" * 32]) + padding_needed = 300 - len(basic) + + # Add padding via a custom key with enough value bytes + # The key "z" + value needs to fit in remaining space + # Account for RLP overhead (key length byte + value length bytes) + if padding_needed > 3: + value_len = padding_needed - 3 # Approximate, may need adjustment + padded = encode_rlp( + [ + signature, + seq, + b"id", + b"v4", + b"secp256k1", + b"\x02" + b"\x00" * 32, + b"zz", + b"\x00" * value_len, + ] + ) + # Adjust if needed + while len(padded) < 300: + value_len += 1 + padded = encode_rlp( + [ + signature, + seq, + b"id", + b"v4", + b"secp256k1", + b"\x02" + b"\x00" * 32, + b"zz", + b"\x00" * value_len, + ] + ) + while len(padded) > 300: + value_len -= 1 + padded = encode_rlp( + [ + signature, + seq, + b"id", + b"v4", + b"secp256k1", + b"\x02" + b"\x00" * 32, + b"zz", + b"\x00" * value_len, + ] + ) + + assert len(padded) == 300 + b64 = base64.urlsafe_b64encode(padded).decode().rstrip("=") + enr = ENR.from_string(f"enr:{b64}") + assert enr is not None + + def test_enr_301_bytes_rejected(self) -> None: + """ENR with 301 bytes RLP is rejected.""" + import base64 + + from lean_spec.types.rlp import encode_rlp + + # Build an ENR that is exactly 301 bytes + signature = b"\x00" * 64 + seq = b"\x01" + + basic = encode_rlp([signature, seq, b"id", b"v4", b"secp256k1", b"\x02" + b"\x00" * 32]) + padding_needed = 301 - len(basic) + + if padding_needed > 3: + value_len = padding_needed - 3 + padded = encode_rlp( + [ + signature, + seq, + b"id", + b"v4", + b"secp256k1", + b"\x02" + b"\x00" * 32, + b"zz", + b"\x00" * value_len, + ] + ) + while len(padded) < 301: + value_len += 1 + padded = encode_rlp( + [ + signature, + seq, + b"id", + b"v4", + b"secp256k1", + b"\x02" + b"\x00" * 32, + b"zz", + b"\x00" * value_len, + ] + ) + while len(padded) > 301: + value_len -= 1 + padded = encode_rlp( + [ + signature, + seq, + b"id", + b"v4", + b"secp256k1", + b"\x02" + b"\x00" * 32, + b"zz", + b"\x00" * value_len, + ] + ) + + assert len(padded) == 301 + b64 = base64.urlsafe_b64encode(padded).decode().rstrip("=") + + with pytest.raises(ValueError, match="exceeds max size"): + ENR.from_string(f"enr:{b64}") + + +class TestKeyOrderingEnforcement: + """Tests for lexicographic key ordering enforcement.""" + + def test_sorted_keys_accepted(self) -> None: + """ENR with lexicographically sorted keys parses successfully.""" + import base64 + + from lean_spec.types.rlp import encode_rlp + + # Keys in sorted order: id, ip, secp256k1 + rlp = encode_rlp( + [ + b"\x00" * 64, # signature + b"\x01", # seq + b"id", + b"v4", + b"ip", + b"\x7f\x00\x00\x01", + b"secp256k1", + b"\x02" + b"\x00" * 32, + ] + ) + b64 = base64.urlsafe_b64encode(rlp).decode().rstrip("=") + enr = ENR.from_string(f"enr:{b64}") + assert enr is not None + + def test_unsorted_keys_rejected(self) -> None: + """ENR with unsorted keys is rejected.""" + import base64 + + from lean_spec.types.rlp import encode_rlp + + # Keys out of order: secp256k1 before id + rlp = encode_rlp( + [ + b"\x00" * 64, # signature + b"\x01", # seq + b"secp256k1", # Should be after "id" + b"\x02" + b"\x00" * 32, + b"id", + b"v4", + ] + ) + b64 = base64.urlsafe_b64encode(rlp).decode().rstrip("=") + + with pytest.raises(ValueError, match="lexicographically sorted"): + ENR.from_string(f"enr:{b64}") + + def test_duplicate_keys_rejected(self) -> None: + """ENR with duplicate keys is rejected.""" + import base64 + + from lean_spec.types.rlp import encode_rlp + + # Duplicate "id" key + rlp = encode_rlp( + [ + b"\x00" * 64, # signature + b"\x01", # seq + b"id", + b"v4", + b"id", # Duplicate! + b"v5", + ] + ) + b64 = base64.urlsafe_b64encode(rlp).decode().rstrip("=") + + with pytest.raises(ValueError, match="lexicographically sorted"): + ENR.from_string(f"enr:{b64}") + + +class TestRoundTripSerialization: + """Tests for ENR round-trip serialization.""" + + def test_roundtrip_official_enr(self) -> None: + """Official ENR round-trips through parse and serialize.""" + enr1 = ENR.from_string(OFFICIAL_ENR_STRING) + serialized = enr1.to_string() + enr2 = ENR.from_string(serialized) + + assert enr1.seq == enr2.seq + assert enr1.signature == enr2.signature + assert enr1.pairs == enr2.pairs + + def test_roundtrip_preserves_all_fields(self) -> None: + """Round-trip preserves all ENR fields.""" + import base64 + + from lean_spec.types.rlp import encode_rlp + + rlp = encode_rlp( + [ + b"\xab" * 64, # signature + b"\x42", # seq = 66 + b"eth2", + b"\x12\x34\x56\x78" + b"\x02\x00\x00\x00" + b"\x00" * 8, + b"id", + b"v4", + b"ip", + b"\xc0\xa8\x01\x01", + b"secp256k1", + b"\x02" + b"\x00" * 32, + b"tcp", + (9000).to_bytes(2, "big"), + ] + ) + b64 = base64.urlsafe_b64encode(rlp).decode().rstrip("=") + + enr1 = ENR.from_string(f"enr:{b64}") + enr2 = ENR.from_string(enr1.to_string()) + + assert enr1.seq == enr2.seq == Uint64(0x42) + assert enr1.ip4 == enr2.ip4 == "192.168.1.1" + assert enr1.tcp_port == enr2.tcp_port == 9000 + assert enr1.identity_scheme == enr2.identity_scheme == "v4" + + def test_to_string_produces_valid_enr_format(self) -> None: + """to_string() produces valid 'enr:' prefixed string.""" + enr = ENR( + signature=Bytes64(b"\x00" * 64), + seq=Uint64(1), + pairs={keys.ID: b"v4", keys.SECP256K1: b"\x02" + b"\x00" * 32}, + ) + result = enr.to_string() + + assert result.startswith("enr:") + # Should not have padding + assert "=" not in result + + +class TestSignatureVerification: + """Tests for verify_signature() method.""" + + def test_official_enr_signature_verifies(self) -> None: + """Official EIP-778 test vector signature verifies correctly.""" + enr = ENR.from_string(OFFICIAL_ENR_STRING) + assert enr.verify_signature() + + def test_self_signed_enr_verifies(self) -> None: + """ENR signed with cryptography library verifies correctly.""" + from Crypto.Hash import keccak + from cryptography.hazmat.primitives import hashes, serialization + from cryptography.hazmat.primitives.asymmetric import ec + from cryptography.hazmat.primitives.asymmetric.utils import ( + Prehashed, + decode_dss_signature, + ) + + from lean_spec.types.rlp import encode_rlp + + # Generate a test keypair using cryptography library. + private_key = ec.generate_private_key(ec.SECP256K1()) + public_key = private_key.public_key() + compressed_pubkey = public_key.public_bytes( + encoding=serialization.Encoding.X962, + format=serialization.PublicFormat.CompressedPoint, + ) + + # Create content (keys must be sorted). + content_items: list[bytes] = [ + b"\x01", + b"id", + b"v4", + b"secp256k1", + compressed_pubkey, + ] + content_rlp = encode_rlp(content_items) + + # Hash content. + k = keccak.new(digest_bits=256) + k.update(content_rlp) + digest = k.digest() + + # Sign with ECDSA using Prehashed mode. + signature_der = private_key.sign(digest, ec.ECDSA(Prehashed(hashes.SHA256()))) + + # Convert DER signature to r || s format. + r, s = decode_dss_signature(signature_der) + sig_64 = r.to_bytes(32, "big") + s.to_bytes(32, "big") + + # Create ENR. + enr = ENR( + signature=Bytes64(sig_64), + seq=Uint64(1), + pairs={keys.ID: b"v4", keys.SECP256K1: compressed_pubkey}, + ) + + assert enr.verify_signature() + + def test_tampered_signature_fails_verification(self) -> None: + """ENR with tampered signature fails verification.""" + enr = ENR.from_string(OFFICIAL_ENR_STRING) + + # Tamper with signature + tampered_sig = bytes([enr.signature[0] ^ 0xFF]) + bytes(enr.signature[1:]) + tampered_enr = ENR( + signature=Bytes64(tampered_sig), + seq=enr.seq, + pairs=enr.pairs, + ) + + assert not tampered_enr.verify_signature() + + def test_tampered_content_fails_verification(self) -> None: + """ENR with tampered content fails verification.""" + enr = ENR.from_string(OFFICIAL_ENR_STRING) + + # Create ENR with different sequence number (content mismatch) + tampered_enr = ENR( + signature=enr.signature, + seq=Uint64(int(enr.seq) + 1), # Different sequence + pairs=enr.pairs, + ) + + assert not tampered_enr.verify_signature() + + def test_missing_public_key_fails_verification(self) -> None: + """ENR without public key fails verification.""" + enr = ENR( + signature=Bytes64(b"\x00" * 64), + seq=Uint64(1), + pairs={keys.ID: b"v4"}, # No secp256k1 key + ) + + assert not enr.verify_signature() + + +class TestNodeIdComputation: + """Tests for compute_node_id() method.""" + + def test_official_enr_node_id(self) -> None: + """compute_node_id() returns correct node ID for official ENR.""" + enr = ENR.from_string(OFFICIAL_ENR_STRING) + node_id = enr.compute_node_id() + + assert node_id is not None + assert node_id.hex() == OFFICIAL_NODE_ID + + def test_node_id_none_without_public_key(self) -> None: + """compute_node_id() returns None when public key is missing.""" + enr = ENR( + signature=Bytes64(b"\x00" * 64), + seq=Uint64(1), + pairs={keys.ID: b"v4"}, + ) + + assert enr.compute_node_id() is None + + +class TestIPv6Ports: + """Tests for tcp6_port and udp6_port properties.""" + + def test_tcp6_port_extracts_correctly(self) -> None: + """tcp6_port extracts IPv6-specific TCP port.""" + enr = ENR( + signature=Bytes64(b"\x00" * 64), + seq=Uint64(1), + pairs={ + keys.ID: b"v4", + keys.TCP6: (9001).to_bytes(2, "big"), + }, + ) + assert enr.tcp6_port == 9001 + + def test_tcp6_port_returns_none_when_missing(self) -> None: + """tcp6_port returns None when tcp6 key is absent.""" + enr = ENR( + signature=Bytes64(b"\x00" * 64), + seq=Uint64(1), + pairs={keys.ID: b"v4"}, + ) + assert enr.tcp6_port is None + + def test_udp6_port_extracts_correctly(self) -> None: + """udp6_port extracts IPv6-specific UDP port.""" + enr = ENR( + signature=Bytes64(b"\x00" * 64), + seq=Uint64(1), + pairs={ + keys.ID: b"v4", + keys.UDP6: (30304).to_bytes(2, "big"), + }, + ) + assert enr.udp6_port == 30304 + + def test_udp6_port_returns_none_when_missing(self) -> None: + """udp6_port returns None when udp6 key is absent.""" + enr = ENR( + signature=Bytes64(b"\x00" * 64), + seq=Uint64(1), + pairs={keys.ID: b"v4"}, + ) + assert enr.udp6_port is None + + def test_ipv6_ports_independent_of_ipv4(self) -> None: + """IPv6 ports are independent from IPv4 ports.""" + enr = ENR( + signature=Bytes64(b"\x00" * 64), + seq=Uint64(1), + pairs={ + keys.ID: b"v4", + keys.TCP: (9000).to_bytes(2, "big"), + keys.TCP6: (9001).to_bytes(2, "big"), + keys.UDP: (30303).to_bytes(2, "big"), + keys.UDP6: (30304).to_bytes(2, "big"), + }, + ) + assert enr.tcp_port == 9000 + assert enr.tcp6_port == 9001 + assert enr.udp_port == 30303 + assert enr.udp6_port == 30304 diff --git a/tests/lean_spec/subspecs/networking/enr/test_eth2.py b/tests/lean_spec/subspecs/networking/enr/test_eth2.py new file mode 100644 index 00000000..8f200fc1 --- /dev/null +++ b/tests/lean_spec/subspecs/networking/enr/test_eth2.py @@ -0,0 +1,130 @@ +"""Tests for Ethereum 2.0 ENR types (Eth2Data, AttestationSubnets, SyncCommitteeSubnets).""" + +import pytest +from pydantic import ValidationError + +from lean_spec.subspecs.networking.enr import Eth2Data +from lean_spec.subspecs.networking.enr.eth2 import AttestationSubnets, SyncCommitteeSubnets +from lean_spec.types import Uint64 +from lean_spec.types.byte_arrays import Bytes4 + + +class TestEth2Data: + """Tests for Eth2Data structure.""" + + def test_create_eth2_data(self) -> None: + """Eth2Data can be created with valid parameters.""" + data = Eth2Data( + fork_digest=Bytes4(b"\x12\x34\x56\x78"), + next_fork_version=Bytes4(b"\x02\x00\x00\x00"), + next_fork_epoch=Uint64(194048), + ) + assert data.fork_digest == Bytes4(b"\x12\x34\x56\x78") + assert data.next_fork_epoch == Uint64(194048) + + def test_no_scheduled_fork_factory(self) -> None: + """no_scheduled_fork factory creates correct data.""" + digest = Bytes4(b"\xab\xcd\xef\x01") + data = Eth2Data.no_scheduled_fork(digest) + + assert data.fork_digest == digest + assert data.next_fork_version == digest + assert data.next_fork_epoch == Uint64(2**64 - 1) + + def test_eth2_data_immutable(self) -> None: + """Eth2Data is immutable (frozen).""" + data = Eth2Data( + fork_digest=Bytes4(b"\x12\x34\x56\x78"), + next_fork_version=Bytes4(b"\x02\x00\x00\x00"), + next_fork_epoch=Uint64(0), + ) + with pytest.raises(ValidationError): + data.fork_digest = Bytes4(b"\x00\x00\x00\x00") + + +class TestAttestationSubnets: + """Tests for AttestationSubnets bitvector.""" + + def test_empty_subscriptions(self) -> None: + """none() creates empty subscriptions.""" + subnets = AttestationSubnets.none() + assert subnets.subscription_count() == 0 + assert subnets.subscribed_subnets() == [] + + def test_all_subscriptions(self) -> None: + """all() creates full subscriptions.""" + subnets = AttestationSubnets.all() + assert subnets.subscription_count() == 64 + assert len(subnets.subscribed_subnets()) == 64 + + def test_specific_subscriptions(self) -> None: + """from_subnet_ids() creates specific subscriptions.""" + subnets = AttestationSubnets.from_subnet_ids([0, 5, 63]) + + assert subnets.is_subscribed(0) + assert subnets.is_subscribed(5) + assert subnets.is_subscribed(63) + assert not subnets.is_subscribed(1) + assert not subnets.is_subscribed(62) + assert subnets.subscription_count() == 3 + + def test_subscribed_subnets_list(self) -> None: + """subscribed_subnets() returns correct list.""" + subnets = AttestationSubnets.from_subnet_ids([10, 20, 30]) + result = subnets.subscribed_subnets() + + assert result == [10, 20, 30] + + def test_invalid_subnet_id_in_from_subnet_ids(self) -> None: + """from_subnet_ids() raises for invalid subnet IDs.""" + with pytest.raises(ValueError): + AttestationSubnets.from_subnet_ids([64]) + + with pytest.raises(ValueError): + AttestationSubnets.from_subnet_ids([-1]) + + def test_invalid_subnet_id_in_is_subscribed(self) -> None: + """is_subscribed() raises for invalid subnet IDs.""" + subnets = AttestationSubnets.none() + + with pytest.raises(ValueError): + subnets.is_subscribed(64) + + with pytest.raises(ValueError): + subnets.is_subscribed(-1) + + +class TestSyncCommitteeSubnets: + """Tests for SyncCommitteeSubnets bitvector.""" + + def test_none_creates_empty_subscriptions(self) -> None: + """none() creates empty subscriptions.""" + subnets = SyncCommitteeSubnets.none() + for i in range(4): + assert not subnets.is_subscribed(i) + + def test_all_creates_full_subscriptions(self) -> None: + """all() creates full subscriptions.""" + subnets = SyncCommitteeSubnets.all() + for i in range(4): + assert subnets.is_subscribed(i) + + def test_is_subscribed_with_valid_ids(self) -> None: + """is_subscribed() works for valid subnet IDs 0-3.""" + subnets = SyncCommitteeSubnets.all() + assert subnets.is_subscribed(0) + assert subnets.is_subscribed(1) + assert subnets.is_subscribed(2) + assert subnets.is_subscribed(3) + + def test_is_subscribed_raises_for_invalid_high_id(self) -> None: + """is_subscribed() raises for subnet ID >= 4.""" + subnets = SyncCommitteeSubnets.none() + with pytest.raises(ValueError, match="must be 0-3"): + subnets.is_subscribed(4) + + def test_is_subscribed_raises_for_negative_id(self) -> None: + """is_subscribed() raises for negative subnet ID.""" + subnets = SyncCommitteeSubnets.none() + with pytest.raises(ValueError, match="must be 0-3"): + subnets.is_subscribed(-1) diff --git a/tests/lean_spec/subspecs/networking/enr/test_keys.py b/tests/lean_spec/subspecs/networking/enr/test_keys.py new file mode 100644 index 00000000..f46722bd --- /dev/null +++ b/tests/lean_spec/subspecs/networking/enr/test_keys.py @@ -0,0 +1,27 @@ +"""Tests for ENR key constants.""" + +from lean_spec.subspecs.networking.enr import keys + + +class TestEnrKeys: + """Tests for ENR key constants.""" + + def test_identity_keys(self) -> None: + """Identity keys have correct values.""" + assert keys.ID == "id" + assert keys.SECP256K1 == "secp256k1" + + def test_network_keys(self) -> None: + """Network keys have correct values.""" + assert keys.IP == "ip" + assert keys.IP6 == "ip6" + assert keys.TCP == "tcp" + assert keys.UDP == "udp" + assert keys.TCP6 == "tcp6" + assert keys.UDP6 == "udp6" + + def test_ethereum_keys(self) -> None: + """Ethereum-specific keys have correct values.""" + assert keys.ETH2 == "eth2" + assert keys.ATTNETS == "attnets" + assert keys.SYNCNETS == "syncnets" diff --git a/tests/lean_spec/subspecs/networking/test_enr.py b/tests/lean_spec/subspecs/networking/test_enr.py deleted file mode 100644 index f2a7fff3..00000000 --- a/tests/lean_spec/subspecs/networking/test_enr.py +++ /dev/null @@ -1,265 +0,0 @@ -"""Tests for Ethereum Node Record (ENR) specification.""" - -import pytest -from pydantic import ValidationError - -from lean_spec.subspecs.networking.enr import ENR, Eth2Data, keys -from lean_spec.subspecs.networking.enr.eth2 import AttestationSubnets -from lean_spec.types import Bytes64, Uint64 -from lean_spec.types.byte_arrays import Bytes4 - - -class TestEnrKeys: - """Tests for ENR key constants.""" - - def test_identity_keys(self) -> None: - """Identity keys have correct values.""" - assert keys.ID == "id" - assert keys.SECP256K1 == "secp256k1" - - def test_network_keys(self) -> None: - """Network keys have correct values.""" - assert keys.IP == "ip" - assert keys.IP6 == "ip6" - assert keys.TCP == "tcp" - assert keys.UDP == "udp" - - def test_ethereum_keys(self) -> None: - """Ethereum-specific keys have correct values.""" - assert keys.ETH2 == "eth2" - assert keys.ATTNETS == "attnets" - assert keys.SYNCNETS == "syncnets" - - -class TestEth2Data: - """Tests for Eth2Data structure.""" - - def test_create_eth2_data(self) -> None: - """Eth2Data can be created with valid parameters.""" - data = Eth2Data( - fork_digest=Bytes4(b"\x12\x34\x56\x78"), - next_fork_version=Bytes4(b"\x02\x00\x00\x00"), - next_fork_epoch=Uint64(194048), - ) - assert data.fork_digest == Bytes4(b"\x12\x34\x56\x78") - assert data.next_fork_epoch == Uint64(194048) - - def test_no_scheduled_fork_factory(self) -> None: - """no_scheduled_fork factory creates correct data.""" - digest = Bytes4(b"\xab\xcd\xef\x01") - data = Eth2Data.no_scheduled_fork(digest) - - assert data.fork_digest == digest - assert data.next_fork_version == digest - assert data.next_fork_epoch == Uint64(2**64 - 1) - - def test_eth2_data_immutable(self) -> None: - """Eth2Data is immutable (frozen).""" - data = Eth2Data( - fork_digest=Bytes4(b"\x12\x34\x56\x78"), - next_fork_version=Bytes4(b"\x02\x00\x00\x00"), - next_fork_epoch=Uint64(0), - ) - with pytest.raises(ValidationError): - data.fork_digest = Bytes4(b"\x00\x00\x00\x00") - - -class TestAttestationSubnets: - """Tests for AttestationSubnets bitvector.""" - - def test_empty_subscriptions(self) -> None: - """none() creates empty subscriptions.""" - subnets = AttestationSubnets.none() - assert subnets.subscription_count() == 0 - assert subnets.subscribed_subnets() == [] - - def test_all_subscriptions(self) -> None: - """all() creates full subscriptions.""" - subnets = AttestationSubnets.all() - assert subnets.subscription_count() == 64 - assert len(subnets.subscribed_subnets()) == 64 - - def test_specific_subscriptions(self) -> None: - """from_subnet_ids() creates specific subscriptions.""" - subnets = AttestationSubnets.from_subnet_ids([0, 5, 63]) - - assert subnets.is_subscribed(0) - assert subnets.is_subscribed(5) - assert subnets.is_subscribed(63) - assert not subnets.is_subscribed(1) - assert not subnets.is_subscribed(62) - assert subnets.subscription_count() == 3 - - def test_subscribed_subnets_list(self) -> None: - """subscribed_subnets() returns correct list.""" - subnets = AttestationSubnets.from_subnet_ids([10, 20, 30]) - result = subnets.subscribed_subnets() - - assert result == [10, 20, 30] - - def test_invalid_subnet_id_in_from_subnet_ids(self) -> None: - """from_subnet_ids() raises for invalid subnet IDs.""" - with pytest.raises(ValueError): - AttestationSubnets.from_subnet_ids([64]) - - with pytest.raises(ValueError): - AttestationSubnets.from_subnet_ids([-1]) - - def test_invalid_subnet_id_in_is_subscribed(self) -> None: - """is_subscribed() raises for invalid subnet IDs.""" - subnets = AttestationSubnets.none() - - with pytest.raises(ValueError): - subnets.is_subscribed(64) - - with pytest.raises(ValueError): - subnets.is_subscribed(-1) - - -class TestENR: - """Tests for ENR structure.""" - - def test_create_minimal_enr(self) -> None: - """ENR can be created with minimal valid data.""" - enr = ENR( - signature=Bytes64(b"\x00" * 64), - seq=Uint64(1), - pairs={ - "id": b"v4", - "secp256k1": b"\x02" + b"\x00" * 32, # Compressed pubkey - }, - ) - assert enr.seq == Uint64(1) - assert enr.identity_scheme == "v4" - - def test_enr_ip4_property(self) -> None: - """ip4 property formats IPv4 address.""" - enr = ENR( - signature=Bytes64(b"\x00" * 64), - seq=Uint64(1), - pairs={ - "id": b"v4", - "secp256k1": b"\x02" + b"\x00" * 32, - "ip": b"\xc0\xa8\x01\x01", # 192.168.1.1 - }, - ) - assert enr.ip4 == "192.168.1.1" - - def test_enr_tcp_port_property(self) -> None: - """tcp_port property extracts port number.""" - enr = ENR( - signature=Bytes64(b"\x00" * 64), - seq=Uint64(1), - pairs={ - "id": b"v4", - "secp256k1": b"\x02" + b"\x00" * 32, - "tcp": (9000).to_bytes(2, "big"), - }, - ) - assert enr.tcp_port == 9000 - - def test_enr_multiaddr_construction(self) -> None: - """multiaddr() constructs valid multiaddress.""" - enr = ENR( - signature=Bytes64(b"\x00" * 64), - seq=Uint64(1), - pairs={ - "id": b"v4", - "secp256k1": b"\x02" + b"\x00" * 32, - "ip": b"\xc0\xa8\x01\x01", - "tcp": (9000).to_bytes(2, "big"), - }, - ) - assert enr.multiaddr() == "/ip4/192.168.1.1/tcp/9000" - - def test_enr_has_key(self) -> None: - """has() correctly checks key presence.""" - enr = ENR( - signature=Bytes64(b"\x00" * 64), - seq=Uint64(1), - pairs={ - "id": b"v4", - "secp256k1": b"\x02" + b"\x00" * 32, - }, - ) - assert enr.has(keys.ID) - assert enr.has(keys.SECP256K1) - assert not enr.has(keys.IP) - assert not enr.has(keys.ETH2) - - def test_enr_get_key(self) -> None: - """get() retrieves values by key.""" - enr = ENR( - signature=Bytes64(b"\x00" * 64), - seq=Uint64(1), - pairs={ - "id": b"v4", - }, - ) - assert enr.get(keys.ID) == b"v4" - assert enr.get(keys.IP) is None - - def test_enr_is_valid_basic(self) -> None: - """is_valid() checks basic structure.""" - valid_enr = ENR( - signature=Bytes64(b"\x00" * 64), - seq=Uint64(1), - pairs={ - "id": b"v4", - "secp256k1": b"\x02" + b"\x00" * 32, - }, - ) - assert valid_enr.is_valid() - - # Missing public key - invalid_enr = ENR( - signature=Bytes64(b"\x00" * 64), - seq=Uint64(1), - pairs={ - "id": b"v4", - }, - ) - assert not invalid_enr.is_valid() - - def test_enr_compatibility(self) -> None: - """is_compatible_with() checks fork digest match.""" - eth2_bytes = b"\x12\x34\x56\x78" + b"\x02\x00\x00\x00" + b"\x00" * 8 - - enr1 = ENR( - signature=Bytes64(b"\x00" * 64), - seq=Uint64(1), - pairs={ - "id": b"v4", - "secp256k1": b"\x02" + b"\x00" * 32, - "eth2": eth2_bytes, - }, - ) - - enr2 = ENR( - signature=Bytes64(b"\x00" * 64), - seq=Uint64(2), - pairs={ - "id": b"v4", - "secp256k1": b"\x02" + b"\x00" * 32, - "eth2": eth2_bytes, - }, - ) - - assert enr1.is_compatible_with(enr2) - - def test_enr_string_representation(self) -> None: - """ENR has readable string representation.""" - enr = ENR( - signature=Bytes64(b"\x00" * 64), - seq=Uint64(42), - pairs={ - "id": b"v4", - "secp256k1": b"\x02" + b"\x00" * 32, - "ip": b"\xc0\xa8\x01\x01", - "tcp": (9000).to_bytes(2, "big"), - }, - ) - s = str(enr) - assert "seq=42" in s - assert "192.168.1.1" in s - assert "tcp=9000" in s diff --git a/tests/lean_spec/test_cli.py b/tests/lean_spec/test_cli.py index daf366df..0e47fddf 100644 --- a/tests/lean_spec/test_cli.py +++ b/tests/lean_spec/test_cli.py @@ -11,6 +11,10 @@ from unittest.mock import AsyncMock, patch import pytest +from Crypto.Hash import keccak +from cryptography.hazmat.primitives import hashes, serialization +from cryptography.hazmat.primitives.asymmetric import ec +from cryptography.hazmat.primitives.asymmetric.utils import Prehashed, decode_dss_signature from lean_spec.__main__ import create_anchor_block, is_enr_string, resolve_bootnode from lean_spec.subspecs.containers import Block, BlockBody @@ -18,68 +22,88 @@ from lean_spec.subspecs.containers.slot import Slot from lean_spec.subspecs.ssz.hash import hash_tree_root from lean_spec.types import Bytes32, Uint64 -from lean_spec.types.rlp import encode as rlp_encode +from lean_spec.types.rlp import encode_rlp from tests.lean_spec.helpers import make_genesis_state +# Generate a test keypair once for all ENR tests. +_TEST_PRIVATE_KEY = ec.generate_private_key(ec.SECP256K1()) +_TEST_PUBLIC_KEY = _TEST_PRIVATE_KEY.public_key() +_TEST_COMPRESSED_PUBKEY = _TEST_PUBLIC_KEY.public_bytes( + encoding=serialization.Encoding.X962, + format=serialization.PublicFormat.CompressedPoint, +) + + +def _sign_enr_content(content_items: list[bytes]) -> bytes: + """Sign ENR content and return 64-byte r||s signature.""" + content_rlp = encode_rlp(content_items) + + k = keccak.new(digest_bits=256) + k.update(content_rlp) + digest = k.digest() + + signature_der = _TEST_PRIVATE_KEY.sign(digest, ec.ECDSA(Prehashed(hashes.SHA256()))) + r, s = decode_dss_signature(signature_der) + return r.to_bytes(32, "big") + s.to_bytes(32, "big") + -# Valid ENR with IPv4 and TCP port (derived from EIP-778 test vector structure) -# This ENR has: ip=192.168.1.1, tcp=9000 def _make_enr_with_tcp(ip_bytes: bytes, tcp_port: int) -> str: - """Create a minimal ENR string with IPv4 and TCP port.""" - rlp_data = rlp_encode( - [ - b"\x00" * 64, # signature - b"\x01", # seq = 1 - b"id", - b"v4", - b"ip", - ip_bytes, - b"secp256k1", - b"\x02" + b"\x00" * 32, # compressed pubkey - b"tcp", - tcp_port.to_bytes(2, "big"), - ] - ) + """Create a properly signed ENR string with IPv4 and TCP port.""" + # Content items (keys must be sorted). + content_items: list[bytes] = [ + b"\x01", # seq = 1 + b"id", + b"v4", + b"ip", + ip_bytes, + b"secp256k1", + _TEST_COMPRESSED_PUBKEY, + b"tcp", + tcp_port.to_bytes(2, "big"), + ] + signature = _sign_enr_content(content_items) + + rlp_data = encode_rlp([signature] + content_items) b64_content = base64.urlsafe_b64encode(rlp_data).decode("utf-8").rstrip("=") return f"enr:{b64_content}" def _make_enr_with_ipv6_tcp(ip6_bytes: bytes, tcp_port: int) -> str: - """Create a minimal ENR string with IPv6 and TCP port.""" - rlp_data = rlp_encode( - [ - b"\x00" * 64, # signature - b"\x01", # seq = 1 - b"id", - b"v4", - b"ip6", - ip6_bytes, - b"secp256k1", - b"\x02" + b"\x00" * 32, # compressed pubkey - b"tcp", - tcp_port.to_bytes(2, "big"), - ] - ) + """Create a properly signed ENR string with IPv6 and TCP port.""" + content_items: list[bytes] = [ + b"\x01", # seq = 1 + b"id", + b"v4", + b"ip6", + ip6_bytes, + b"secp256k1", + _TEST_COMPRESSED_PUBKEY, + b"tcp", + tcp_port.to_bytes(2, "big"), + ] + signature = _sign_enr_content(content_items) + + rlp_data = encode_rlp([signature] + content_items) b64_content = base64.urlsafe_b64encode(rlp_data).decode("utf-8").rstrip("=") return f"enr:{b64_content}" def _make_enr_without_tcp(ip_bytes: bytes) -> str: - """Create an ENR string with IPv4 but no TCP port (UDP only).""" - rlp_data = rlp_encode( - [ - b"\x00" * 64, # signature - b"\x01", # seq = 1 - b"id", - b"v4", - b"ip", - ip_bytes, - b"secp256k1", - b"\x02" + b"\x00" * 32, # compressed pubkey - b"udp", - (30303).to_bytes(2, "big"), # UDP only, no TCP - ] - ) + """Create a properly signed ENR string with IPv4 but no TCP port (UDP only).""" + content_items: list[bytes] = [ + b"\x01", # seq = 1 + b"id", + b"v4", + b"ip", + ip_bytes, + b"secp256k1", + _TEST_COMPRESSED_PUBKEY, + b"udp", + (30303).to_bytes(2, "big"), # UDP only, no TCP + ] + signature = _sign_enr_content(content_items) + + rlp_data = encode_rlp([signature] + content_items) b64_content = base64.urlsafe_b64encode(rlp_data).decode("utf-8").rstrip("=") return f"enr:{b64_content}" diff --git a/tests/lean_spec/types/test_rlp.py b/tests/lean_spec/types/test_rlp.py index 8a8d8f24..0d835811 100644 --- a/tests/lean_spec/types/test_rlp.py +++ b/tests/lean_spec/types/test_rlp.py @@ -14,9 +14,9 @@ SINGLE_BYTE_MAX, RLPDecodingError, RLPItem, - decode, - decode_list, - encode, + decode_rlp, + decode_rlp_list, + encode_rlp, ) # Derived constants for test assertions. @@ -30,7 +30,7 @@ class TestEncodeEmptyString: def test_encode_empty_string(self) -> None: """Empty string encodes to 0x80.""" - result = encode(b"") + result = encode_rlp(b"") assert result == bytes.fromhex("80") @@ -39,24 +39,24 @@ class TestEncodeSingleByte: def test_encode_byte_0x00(self) -> None: """Byte 0x00 encodes as itself.""" - result = encode(b"\x00") + result = encode_rlp(b"\x00") assert result == bytes.fromhex("00") def test_encode_byte_0x01(self) -> None: """Byte 0x01 encodes as itself.""" - result = encode(b"\x01") + result = encode_rlp(b"\x01") assert result == bytes.fromhex("01") def test_encode_byte_0x7f(self) -> None: """Maximum single-byte value (0x7f) encodes as itself.""" - result = encode(b"\x7f") + result = encode_rlp(b"\x7f") assert result == bytes.fromhex("7f") @pytest.mark.parametrize("byte_val", range(0x00, SINGLE_BYTE_MAX + 1)) def test_encode_all_single_byte_values(self, byte_val: int) -> None: """All single-byte values 0x00-0x7f encode as themselves.""" data = bytes([byte_val]) - result = encode(data) + result = encode_rlp(data) assert result == data @@ -65,14 +65,14 @@ class TestEncodeShortString: def test_encode_short_string_dog(self) -> None: """'dog' encodes with prefix 0x83 (0x80 + 3) followed by ASCII bytes.""" - result = encode(b"dog") + result = encode_rlp(b"dog") assert result == bytes.fromhex("83646f67") def test_encode_short_string_55_bytes(self) -> None: """55-byte string uses short string encoding (max for this category).""" data = b"Lorem ipsum dolor sit amet, consectetur adipisicing eli" assert len(data) == SHORT_STRING_MAX_LEN - result = encode(data) + result = encode_rlp(data) expected = bytes.fromhex( "b74c6f72656d20697073756d20646f6c6f722073697420616d65742c20" "636f6e7365637465747572206164697069736963696e6720656c69" @@ -81,7 +81,7 @@ def test_encode_short_string_55_bytes(self) -> None: def test_encode_single_byte_above_0x7f(self) -> None: """Single byte 0x80 uses short string encoding, not single-byte encoding.""" - result = encode(b"\x80") + result = encode_rlp(b"\x80") assert result == bytes([SHORT_STRING_PREFIX + 1, 0x80]) @pytest.mark.parametrize("length", [1, 10, 20, 30, 40, 50, SHORT_STRING_MAX_LEN]) @@ -89,7 +89,7 @@ def test_encode_short_string_various_lengths(self, length: int) -> None: """Short strings of various lengths are prefixed with 0x80 + length.""" # Use bytes above 0x7f to ensure short string encoding is used data = bytes([0x80 + (i % 0x7F) for i in range(length)]) - result = encode(data) + result = encode_rlp(data) assert result[0] == SHORT_STRING_PREFIX + length assert result[1:] == data @@ -101,7 +101,7 @@ def test_encode_long_string_56_bytes(self) -> None: """56-byte string uses long string encoding.""" data = b"Lorem ipsum dolor sit amet, consectetur adipisicing elit" assert len(data) == SHORT_STRING_MAX_LEN + 1 - result = encode(data) + result = encode_rlp(data) expected = bytes.fromhex( "b8384c6f72656d20697073756d20646f6c6f722073697420616d65742c20" "636f6e7365637465747572206164697069736963696e6720656c6974" @@ -113,7 +113,7 @@ def test_encode_long_string_1024_bytes(self) -> None: # Use simple repeated bytes to avoid codespell false positives. data = b"x" * 1024 assert len(data) == 1024 - result = encode(data) + result = encode_rlp(data) # Prefix 0xb9 = 0xb7 + 2 (2 bytes for length) # Length 0x0400 = 1024 in big-endian assert result[0] == LONG_STRING_PREFIX + 1 # 0xb9 @@ -123,7 +123,7 @@ def test_encode_long_string_1024_bytes(self) -> None: def test_encode_long_string_boundary(self) -> None: """String at exact boundary (56 bytes) uses long encoding.""" data = b"a" * (SHORT_STRING_MAX_LEN + 1) - result = encode(data) + result = encode_rlp(data) # Prefix 0xb8 = 0xb7 + 1 (1 byte for length) assert result[0] == LONG_STRING_PREFIX assert result[1] == len(data) @@ -135,7 +135,7 @@ class TestEncodeEmptyList: def test_encode_empty_list(self) -> None: """Empty list encodes to 0xc0.""" - result = encode([]) + result = encode_rlp([]) assert result == bytes.fromhex("c0") @@ -144,14 +144,14 @@ class TestEncodeShortList: def test_encode_string_list(self) -> None: """List of strings ['dog', 'god', 'cat'] encodes correctly.""" - result = encode([b"dog", b"god", b"cat"]) + result = encode_rlp([b"dog", b"god", b"cat"]) assert result == bytes.fromhex("cc83646f6783676f6483636174") def test_encode_multilist(self) -> None: """Mixed list ['zw', [4], 1] encodes correctly.""" # 4 encodes as 0x04 (single byte) # 1 encodes as 0x01 (single byte) - result = encode([b"zw", [b"\x04"], b"\x01"]) + result = encode_rlp([b"zw", [b"\x04"], b"\x01"]) assert result == bytes.fromhex("c6827a77c10401") def test_encode_short_list_max_payload(self) -> None: @@ -159,7 +159,7 @@ def test_encode_short_list_max_payload(self) -> None: # Create a list that has exactly 55 bytes of payload # Each "a" encodes as 0x61 (single byte), so 55 "a"s = 55 bytes payload items: list[RLPItem] = [b"a" for _ in range(SHORT_LIST_MAX_LEN)] - result = encode(items) + result = encode_rlp(items) assert result[0] == SHORT_LIST_PREFIX + SHORT_LIST_MAX_LEN # 0xf7 @@ -169,7 +169,7 @@ class TestEncodeLongList: def test_encode_long_list_four_nested(self) -> None: """Long list with 4 nested lists encodes correctly.""" inner = [b"asdf", b"qwer", b"zxcv"] - result = encode([inner, inner, inner, inner]) + result = encode_rlp([inner, inner, inner, inner]) expected = bytes.fromhex( "f840cf84617364668471776572847a786376cf84617364668471776572847a786376" "cf84617364668471776572847a786376cf84617364668471776572847a786376" @@ -179,7 +179,7 @@ def test_encode_long_list_four_nested(self) -> None: def test_encode_long_list_32_nested(self) -> None: """Long list with 32 nested lists uses 2-byte length prefix.""" inner = [b"asdf", b"qwer", b"zxcv"] - result = encode([inner] * 32) + result = encode_rlp([inner] * 32) expected_start = bytes.fromhex("f90200") # 0xf9 = 0xf7 + 2, length = 0x0200 = 512 assert result[:3] == expected_start @@ -198,7 +198,7 @@ def test_encode_short_list_11_elements(self) -> None: b"asdf", b"qwer", ] - result = encode(items) + result = encode_rlp(items) expected = bytes.fromhex( "f784617364668471776572847a78637684617364668471776572847a78637684617364" "668471776572847a78637684617364668471776572" @@ -211,12 +211,12 @@ class TestEncodeNestedLists: def test_encode_lists_of_lists(self) -> None: """Nested empty lists [[[], []], []] encode correctly.""" - result = encode([[[], []], []]) + result = encode_rlp([[[], []], []]) assert result == bytes.fromhex("c4c2c0c0c0") def test_encode_lists_of_lists_complex(self) -> None: """Complex nested structure [[], [[]], [[], [[]]]] encodes correctly.""" - result = encode([[], [[]], [[], [[]]]]) + result = encode_rlp([[], [[]], [[], [[]]]]) assert result == bytes.fromhex("c7c0c1c0c3c0c1c0") @@ -226,32 +226,32 @@ class TestEncodeIntegers: def test_encode_zero(self) -> None: """Integer 0 encodes as empty string (0x80).""" # In RLP, 0 is represented as empty byte string - result = encode(b"") + result = encode_rlp(b"") assert result == bytes.fromhex("80") def test_encode_small_integers(self) -> None: """Small integers 1-127 encode as single bytes.""" - assert encode(b"\x01") == bytes.fromhex("01") - assert encode(b"\x10") == bytes.fromhex("10") # 16 - assert encode(b"\x4f") == bytes.fromhex("4f") # 79 - assert encode(b"\x7f") == bytes.fromhex("7f") # 127 + assert encode_rlp(b"\x01") == bytes.fromhex("01") + assert encode_rlp(b"\x10") == bytes.fromhex("10") # 16 + assert encode_rlp(b"\x4f") == bytes.fromhex("4f") # 79 + assert encode_rlp(b"\x7f") == bytes.fromhex("7f") # 127 def test_encode_medium_integers(self) -> None: """Integers >= 128 encode as short strings.""" # 128 = 0x80 (1 byte, but > 0x7f so needs prefix) - assert encode(b"\x80") == bytes.fromhex("8180") + assert encode_rlp(b"\x80") == bytes.fromhex("8180") # 1000 = 0x03e8 (2 bytes) - assert encode((1000).to_bytes(2, "big")) == bytes.fromhex("8203e8") + assert encode_rlp((1000).to_bytes(2, "big")) == bytes.fromhex("8203e8") # 100000 = 0x0186a0 (3 bytes) - assert encode((100000).to_bytes(3, "big")) == bytes.fromhex("830186a0") + assert encode_rlp((100000).to_bytes(3, "big")) == bytes.fromhex("830186a0") def test_encode_big_integer_2_pow_256(self) -> None: """2^256 encodes as 33-byte string.""" big_int = 2**256 big_bytes = big_int.to_bytes(33, "big") - result = encode(big_bytes) + result = encode_rlp(big_bytes) expected = bytes.fromhex( "a1010000000000000000000000000000000000000000000000000000000000000000" ) @@ -264,22 +264,22 @@ class TestEncodeTypeErrors: def test_encode_invalid_type_int(self) -> None: """Encoding an integer directly raises TypeError.""" with pytest.raises(TypeError, match=r"Cannot RLP encode type: int"): - encode(42) # type: ignore[arg-type] + encode_rlp(42) # type: ignore[arg-type] def test_encode_invalid_type_str(self) -> None: """Encoding a string directly raises TypeError.""" with pytest.raises(TypeError, match=r"Cannot RLP encode type: str"): - encode("hello") # type: ignore[arg-type] + encode_rlp("hello") # type: ignore[arg-type] def test_encode_invalid_type_none(self) -> None: """Encoding None raises TypeError.""" with pytest.raises(TypeError, match=r"Cannot RLP encode type: NoneType"): - encode(None) # type: ignore[arg-type] + encode_rlp(None) # type: ignore[arg-type] def test_encode_invalid_nested_type(self) -> None: """Encoding a list with invalid nested type raises TypeError.""" with pytest.raises(TypeError, match=r"Cannot RLP encode type: int"): - encode([b"valid", 123]) # type: ignore[list-item] + encode_rlp([b"valid", 123]) # type: ignore[list-item] class TestDecodeEmptyString: @@ -287,7 +287,7 @@ class TestDecodeEmptyString: def test_decode_empty_string(self) -> None: """0x80 decodes to empty string.""" - result = decode(bytes.fromhex("80")) + result = decode_rlp(bytes.fromhex("80")) assert result == b"" @@ -296,24 +296,24 @@ class TestDecodeSingleByte: def test_decode_byte_0x00(self) -> None: """0x00 decodes to single byte 0x00.""" - result = decode(bytes.fromhex("00")) + result = decode_rlp(bytes.fromhex("00")) assert result == b"\x00" def test_decode_byte_0x01(self) -> None: """0x01 decodes to single byte 0x01.""" - result = decode(bytes.fromhex("01")) + result = decode_rlp(bytes.fromhex("01")) assert result == b"\x01" def test_decode_byte_0x7f(self) -> None: """0x7f decodes to single byte 0x7f.""" - result = decode(bytes.fromhex("7f")) + result = decode_rlp(bytes.fromhex("7f")) assert result == b"\x7f" @pytest.mark.parametrize("byte_val", range(0x00, SINGLE_BYTE_MAX + 1)) def test_decode_all_single_byte_values(self, byte_val: int) -> None: """All single-byte values 0x00-0x7f decode correctly.""" data = bytes([byte_val]) - result = decode(data) + result = decode_rlp(data) assert result == data @@ -322,7 +322,7 @@ class TestDecodeShortString: def test_decode_short_string_dog(self) -> None: """0x83646f67 decodes to 'dog'.""" - result = decode(bytes.fromhex("83646f67")) + result = decode_rlp(bytes.fromhex("83646f67")) assert result == b"dog" def test_decode_short_string_55_bytes(self) -> None: @@ -331,7 +331,7 @@ def test_decode_short_string_55_bytes(self) -> None: "b74c6f72656d20697073756d20646f6c6f722073697420616d65742c20" "636f6e7365637465747572206164697069736963696e6720656c69" ) - result = decode(encoded) + result = decode_rlp(encoded) assert result == b"Lorem ipsum dolor sit amet, consectetur adipisicing eli" @@ -344,15 +344,15 @@ def test_decode_long_string_56_bytes(self) -> None: "b8384c6f72656d20697073756d20646f6c6f722073697420616d65742c20" "636f6e7365637465747572206164697069736963696e6720656c6974" ) - result = decode(encoded) + result = decode_rlp(encoded) assert result == b"Lorem ipsum dolor sit amet, consectetur adipisicing elit" def test_decode_long_string_1024_bytes(self) -> None: """1024-byte string with 2-byte length prefix decodes correctly.""" # Use simple repeated bytes to avoid codespell false positives. expected_data = b"y" * 1024 - encoded = encode(expected_data) - result = decode(encoded) + encoded = encode_rlp(expected_data) + result = decode_rlp(encoded) assert result == expected_data @@ -361,7 +361,7 @@ class TestDecodeEmptyList: def test_decode_empty_list(self) -> None: """0xc0 decodes to empty list.""" - result = decode(bytes.fromhex("c0")) + result = decode_rlp(bytes.fromhex("c0")) assert result == [] @@ -370,12 +370,12 @@ class TestDecodeShortList: def test_decode_string_list(self) -> None: """Encoded string list decodes correctly.""" - result = decode(bytes.fromhex("cc83646f6783676f6483636174")) + result = decode_rlp(bytes.fromhex("cc83646f6783676f6483636174")) assert result == [b"dog", b"god", b"cat"] def test_decode_multilist(self) -> None: """Mixed list decodes correctly.""" - result = decode(bytes.fromhex("c6827a77c10401")) + result = decode_rlp(bytes.fromhex("c6827a77c10401")) assert result == [b"zw", [b"\x04"], b"\x01"] @@ -388,7 +388,7 @@ def test_decode_long_list_four_nested(self) -> None: "f840cf84617364668471776572847a786376cf84617364668471776572847a786376" "cf84617364668471776572847a786376cf84617364668471776572847a786376" ) - result = decode(encoded) + result = decode_rlp(encoded) inner = [b"asdf", b"qwer", b"zxcv"] assert result == [inner, inner, inner, inner] @@ -398,12 +398,12 @@ class TestDecodeNestedLists: def test_decode_lists_of_lists(self) -> None: """Nested empty lists decode correctly.""" - result = decode(bytes.fromhex("c4c2c0c0c0")) + result = decode_rlp(bytes.fromhex("c4c2c0c0c0")) assert result == [[[], []], []] def test_decode_lists_of_lists_complex(self) -> None: """Complex nested structure decodes correctly.""" - result = decode(bytes.fromhex("c7c0c1c0c3c0c1c0")) + result = decode_rlp(bytes.fromhex("c7c0c1c0c3c0c1c0")) assert result == [[], [[]], [[], [[]]]] @@ -413,43 +413,43 @@ class TestDecodeErrors: def test_decode_empty_data(self) -> None: """Decoding empty data raises RLPDecodingError.""" with pytest.raises(RLPDecodingError, match=r"Empty RLP data"): - decode(b"") + decode_rlp(b"") def test_decode_trailing_data(self) -> None: """Extra bytes after valid RLP raise RLPDecodingError.""" # Valid empty string (0x80) followed by extra byte with pytest.raises(RLPDecodingError, match=r"Trailing data"): - decode(bytes.fromhex("8000")) + decode_rlp(bytes.fromhex("8000")) def test_decode_short_string_truncated(self) -> None: """Truncated short string raises RLPDecodingError.""" # 0x83 indicates 3-byte string, but only 2 bytes provided with pytest.raises(RLPDecodingError, match=r"Data too short"): - decode(bytes.fromhex("836465")) # "de" instead of "dog" + decode_rlp(bytes.fromhex("836465")) # "de" instead of "dog" def test_decode_long_string_truncated_length(self) -> None: """Truncated length field in long string raises RLPDecodingError.""" # 0xb9 indicates 2-byte length, but only 1 byte provided with pytest.raises(RLPDecodingError, match=r"Data too short"): - decode(bytes.fromhex("b904")) + decode_rlp(bytes.fromhex("b904")) def test_decode_long_string_truncated_payload(self) -> None: """Truncated payload in long string raises RLPDecodingError.""" # 0xb838 indicates 56 bytes, but insufficient data provided with pytest.raises(RLPDecodingError, match=r"Data too short"): - decode(bytes.fromhex("b8380000")) # Only 2 bytes of payload + decode_rlp(bytes.fromhex("b8380000")) # Only 2 bytes of payload def test_decode_short_list_truncated(self) -> None: """Truncated short list raises RLPDecodingError.""" # 0xc3 indicates 3-byte payload, but only 2 bytes provided with pytest.raises(RLPDecodingError, match=r"Data too short"): - decode(bytes.fromhex("c38080")) + decode_rlp(bytes.fromhex("c38080")) def test_decode_long_list_truncated_length(self) -> None: """Truncated length field in long list raises RLPDecodingError.""" # 0xf9 indicates 2-byte length, but only 1 byte provided with pytest.raises(RLPDecodingError, match=r"Data too short"): - decode(bytes.fromhex("f904")) + decode_rlp(bytes.fromhex("f904")) def test_decode_non_canonical_long_string_for_short(self) -> None: """Using long string encoding for short string is non-canonical.""" @@ -457,13 +457,13 @@ def test_decode_non_canonical_long_string_for_short(self) -> None: # but 0x38 <= 55, so this should be encoded as short string with pytest.raises(RLPDecodingError, match=r"Non-canonical.*long string"): # 0xb8 followed by length 0x37 (55) - should have used short encoding - decode(bytes.fromhex("b837") + b"a" * 55) + decode_rlp(bytes.fromhex("b837") + b"a" * 55) def test_decode_non_canonical_long_list_for_short(self) -> None: """Using long list encoding for short list is non-canonical.""" # 0xf8 followed by length 0x37 (55) - should have used short encoding with pytest.raises(RLPDecodingError, match=r"Non-canonical.*long list"): - decode(bytes.fromhex("f837") + bytes.fromhex("80") * 55) + decode_rlp(bytes.fromhex("f837") + bytes.fromhex("80") * 55) class TestDecodeListFunction: @@ -471,18 +471,18 @@ class TestDecodeListFunction: def test_decode_list_success(self) -> None: """decode_list returns list of bytes for flat list.""" - result = decode_list(bytes.fromhex("cc83646f6783676f6483636174")) + result = decode_rlp_list(bytes.fromhex("cc83646f6783676f6483636174")) assert result == [b"dog", b"god", b"cat"] def test_decode_list_not_a_list(self) -> None: """decode_list raises error when data is not a list.""" with pytest.raises(RLPDecodingError, match=r"Expected RLP list"): - decode_list(bytes.fromhex("83646f67")) # Encodes "dog", not a list + decode_rlp_list(bytes.fromhex("83646f67")) # Encodes "dog", not a list def test_decode_list_nested_list_rejected(self) -> None: """decode_list raises error when list contains nested lists.""" with pytest.raises(RLPDecodingError, match=r"Element .* is not bytes"): - decode_list(bytes.fromhex("c4c2c0c0c0")) # [[[], []], []] + decode_rlp_list(bytes.fromhex("c4c2c0c0c0")) # [[[], []], []] class TestEncodeDecodeRoundtrip: @@ -509,8 +509,8 @@ class TestEncodeDecodeRoundtrip: ) def test_roundtrip(self, item: RLPItem) -> None: """Encoding then decoding returns the original item.""" - encoded = encode(item) - decoded = decode(encoded) + encoded = encode_rlp(item) + decoded = decode_rlp(encoded) assert decoded == item def test_roundtrip_large_nested_structure(self) -> None: @@ -521,8 +521,8 @@ def test_roundtrip_large_nested_structure(self) -> None: [inner, inner], [[inner], [inner, inner]], ] - encoded = encode(structure) - decoded = decode(encoded) + encoded = encode_rlp(structure) + decoded = decode_rlp(encoded) assert decoded == structure @@ -531,28 +531,28 @@ class TestOfficialEthereumVectors: def test_emptystring(self) -> None: """Official test vector: emptystring.""" - assert encode(b"") == bytes.fromhex("80") - assert decode(bytes.fromhex("80")) == b"" + assert encode_rlp(b"") == bytes.fromhex("80") + assert decode_rlp(bytes.fromhex("80")) == b"" def test_bytestring00(self) -> None: """Official test vector: bytestring00.""" - assert encode(b"\x00") == bytes.fromhex("00") - assert decode(bytes.fromhex("00")) == b"\x00" + assert encode_rlp(b"\x00") == bytes.fromhex("00") + assert decode_rlp(bytes.fromhex("00")) == b"\x00" def test_bytestring01(self) -> None: """Official test vector: bytestring01.""" - assert encode(b"\x01") == bytes.fromhex("01") - assert decode(bytes.fromhex("01")) == b"\x01" + assert encode_rlp(b"\x01") == bytes.fromhex("01") + assert decode_rlp(bytes.fromhex("01")) == b"\x01" def test_bytestring7f(self) -> None: """Official test vector: bytestring7F.""" - assert encode(b"\x7f") == bytes.fromhex("7f") - assert decode(bytes.fromhex("7f")) == b"\x7f" + assert encode_rlp(b"\x7f") == bytes.fromhex("7f") + assert decode_rlp(bytes.fromhex("7f")) == b"\x7f" def test_shortstring(self) -> None: """Official test vector: shortstring.""" - assert encode(b"dog") == bytes.fromhex("83646f67") - assert decode(bytes.fromhex("83646f67")) == b"dog" + assert encode_rlp(b"dog") == bytes.fromhex("83646f67") + assert decode_rlp(bytes.fromhex("83646f67")) == b"dog" def test_shortstring2(self) -> None: """Official test vector: shortstring2 (55 bytes - max short string).""" @@ -561,8 +561,8 @@ def test_shortstring2(self) -> None: "b74c6f72656d20697073756d20646f6c6f722073697420616d65742c20" "636f6e7365637465747572206164697069736963696e6720656c69" ) - assert encode(data) == expected - assert decode(expected) == data + assert encode_rlp(data) == expected + assert decode_rlp(expected) == data def test_longstring(self) -> None: """Official test vector: longstring (56 bytes - min long string).""" @@ -571,42 +571,42 @@ def test_longstring(self) -> None: "b8384c6f72656d20697073756d20646f6c6f722073697420616d65742c20" "636f6e7365637465747572206164697069736963696e6720656c6974" ) - assert encode(data) == expected - assert decode(expected) == data + assert encode_rlp(data) == expected + assert decode_rlp(expected) == data def test_emptylist(self) -> None: """Official test vector: emptylist.""" - assert encode([]) == bytes.fromhex("c0") - assert decode(bytes.fromhex("c0")) == [] + assert encode_rlp([]) == bytes.fromhex("c0") + assert decode_rlp(bytes.fromhex("c0")) == [] def test_stringlist(self) -> None: """Official test vector: stringlist.""" data: RLPItem = [b"dog", b"god", b"cat"] expected = bytes.fromhex("cc83646f6783676f6483636174") - assert encode(data) == expected - assert decode(expected) == data + assert encode_rlp(data) == expected + assert decode_rlp(expected) == data def test_multilist(self) -> None: """Official test vector: multilist.""" # "zw" = 0x7a77, [4] = 0x04, 1 = 0x01 data: RLPItem = [b"zw", [b"\x04"], b"\x01"] expected = bytes.fromhex("c6827a77c10401") - assert encode(data) == expected - assert decode(expected) == data + assert encode_rlp(data) == expected + assert decode_rlp(expected) == data def test_listsoflists(self) -> None: """Official test vector: listsoflists.""" data: RLPItem = [[[], []], []] expected = bytes.fromhex("c4c2c0c0c0") - assert encode(data) == expected - assert decode(expected) == data + assert encode_rlp(data) == expected + assert decode_rlp(expected) == data def test_listsoflists2(self) -> None: """Official test vector: listsoflists2.""" data: RLPItem = [[], [[]], [[], [[]]]] expected = bytes.fromhex("c7c0c1c0c3c0c1c0") - assert encode(data) == expected - assert decode(expected) == data + assert encode_rlp(data) == expected + assert decode_rlp(expected) == data def test_dicttest1(self) -> None: """Official test vector: dictTest1 (list of key-value pairs).""" @@ -620,8 +620,8 @@ def test_dicttest1(self) -> None: "ecca846b6579318476616c31ca846b6579328476616c32" "ca846b6579338476616c33ca846b6579348476616c34" ) - assert encode(data) == expected - assert decode(expected) == data + assert encode_rlp(data) == expected + assert decode_rlp(expected) == data def test_longlist1(self) -> None: """Official test vector: longList1.""" @@ -631,8 +631,8 @@ def test_longlist1(self) -> None: "f840cf84617364668471776572847a786376cf84617364668471776572847a786376" "cf84617364668471776572847a786376cf84617364668471776572847a786376" ) - assert encode(data) == expected - assert decode(expected) == data + assert encode_rlp(data) == expected + assert decode_rlp(expected) == data class TestBoundaryConditions: @@ -641,32 +641,32 @@ class TestBoundaryConditions: def test_single_byte_max_boundary(self) -> None: """Verify SINGLE_BYTE_MAX boundary (0x7f vs 0x80).""" # 0x7f = single byte encoding - assert encode(bytes([SINGLE_BYTE_MAX])) == bytes([SINGLE_BYTE_MAX]) + assert encode_rlp(bytes([SINGLE_BYTE_MAX])) == bytes([SINGLE_BYTE_MAX]) # 0x80 = short string encoding - assert encode(bytes([SINGLE_BYTE_MAX + 1])) == bytes([0x81, 0x80]) + assert encode_rlp(bytes([SINGLE_BYTE_MAX + 1])) == bytes([0x81, 0x80]) def test_short_string_max_boundary(self) -> None: """Verify SHORT_STRING_MAX_LEN boundary (55 vs 56 bytes).""" # 55 bytes = short string encoding (prefix 0xb7) data_55 = b"a" * SHORT_STRING_MAX_LEN - encoded_55 = encode(data_55) + encoded_55 = encode_rlp(data_55) assert encoded_55[0] == SHORT_STRING_PREFIX + SHORT_STRING_MAX_LEN # 0xb7 # 56 bytes = long string encoding (prefix 0xb8) data_56 = b"a" * (SHORT_STRING_MAX_LEN + 1) - encoded_56 = encode(data_56) + encoded_56 = encode_rlp(data_56) assert encoded_56[0] == LONG_STRING_PREFIX # 0xb8 def test_short_list_max_boundary(self) -> None: """Verify SHORT_LIST_MAX_LEN boundary (55 vs 56 bytes payload).""" # 55 bytes payload = short list encoding (prefix 0xf7) items_55: list[RLPItem] = [b"a" for _ in range(SHORT_LIST_MAX_LEN)] - encoded_55 = encode(items_55) + encoded_55 = encode_rlp(items_55) assert encoded_55[0] == SHORT_LIST_PREFIX + SHORT_LIST_MAX_LEN # 0xf7 # 56 bytes payload = long list encoding (prefix 0xf8) items_56: list[RLPItem] = [b"a" for _ in range(SHORT_LIST_MAX_LEN + 1)] - encoded_56 = encode(items_56) + encoded_56 = encode_rlp(items_56) assert encoded_56[0] == LONG_LIST_PREFIX # 0xf8 def test_prefix_boundaries(self) -> None: