diff --git a/examples/dht_messaging/__init__.py b/examples/dht_messaging/__init__.py new file mode 100644 index 000000000..19b4deccb --- /dev/null +++ b/examples/dht_messaging/__init__.py @@ -0,0 +1,16 @@ +""" +DHT Messaging Examples - ETH Delhi Hackathon + +This package contains examples demonstrating decentralized messaging +using Kademlia DHT for peer discovery, eliminating the need for +bootstrap servers. + +Examples: +- dht_messaging.py: PubSub messaging with DHT peer discovery +- dht_direct_messaging.py: Direct peer-to-peer messaging with DHT +- demo.py: Automated demo script for multiple nodes +""" + +__version__ = "1.0.0" +__author__ = "ETH Delhi Hackathon Team" +__description__ = "Decentralized Messaging with DHT Bootstrap" diff --git a/examples/dht_messaging/demo.py b/examples/dht_messaging/demo.py new file mode 100644 index 000000000..54052883b --- /dev/null +++ b/examples/dht_messaging/demo.py @@ -0,0 +1,176 @@ +#!/usr/bin/env python + +""" +DHT Messaging Demo - ETH Delhi Hackathon + +This demo script shows how to run multiple DHT messaging nodes that can +discover each other without bootstrap servers and communicate directly. + +Usage: + python demo.py --mode pubsub # Run pubsub messaging demo + python demo.py --mode direct # Run direct messaging demo + python demo.py --mode both # Run both demos +""" + +import argparse +import asyncio +import logging +import subprocess +import sys +import time +from pathlib import Path + +# Configure logging +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", +) +logger = logging.getLogger("dht-messaging-demo") + + +class DHTMessagingDemo: + """Demo runner for DHT messaging examples.""" + + def __init__(self, mode: str): + self.mode = mode + self.processes = [] + self.script_dir = Path(__file__).parent + + def run_pubsub_demo(self): + """Run the pubsub messaging demo with multiple nodes.""" + logger.info("๐Ÿš€ Starting DHT PubSub Messaging Demo") + logger.info("=" * 60) + + # Start 3 nodes for the demo + nodes = [ + {"username": "Alice", "port": 8001}, + {"username": "Bob", "port": 8002}, + {"username": "Charlie", "port": 8003}, + ] + + for node in nodes: + cmd = [ + sys.executable, + str(self.script_dir / "dht_messaging.py"), + "-p", str(node["port"]), + "-u", node["username"], + "-v" + ] + + logger.info(f"Starting {node['username']} on port {node['port']}") + process = subprocess.Popen(cmd) + self.processes.append(process) + time.sleep(2) # Give each node time to start + + logger.info("\n๐ŸŽ‰ Demo nodes started!") + logger.info("๐Ÿ’ก Each node will discover others via DHT and start messaging") + logger.info("๐Ÿ’ก Type messages in any terminal to see them broadcast") + logger.info("๐Ÿ’ก Type 'quit' in any terminal to stop that node") + logger.info("\nPress Ctrl+C to stop all nodes") + + def run_direct_demo(self): + """Run the direct messaging demo with multiple nodes.""" + logger.info("๐Ÿš€ Starting DHT Direct Messaging Demo") + logger.info("=" * 60) + + # Start 3 nodes for the demo + nodes = [ + {"username": "Alice", "port": 9001}, + {"username": "Bob", "port": 9002}, + {"username": "Charlie", "port": 9003}, + ] + + for node in nodes: + cmd = [ + sys.executable, + str(self.script_dir / "dht_direct_messaging.py"), + "-p", str(node["port"]), + "-u", node["username"], + "-v" + ] + + logger.info(f"Starting {node['username']} on port {node['port']}") + process = subprocess.Popen(cmd) + self.processes.append(process) + time.sleep(2) # Give each node time to start + + logger.info("\n๐ŸŽ‰ Demo nodes started!") + logger.info("๐Ÿ’ก Each node will discover others via DHT") + logger.info("๐Ÿ’ก Use 'msg ' to send direct messages") + logger.info("๐Ÿ’ก Use 'peers' to see connected peers") + logger.info("๐Ÿ’ก Type 'quit' in any terminal to stop that node") + logger.info("\nPress Ctrl+C to stop all nodes") + + def run_both_demos(self): + """Run both pubsub and direct messaging demos.""" + logger.info("๐Ÿš€ Starting Both DHT Messaging Demos") + logger.info("=" * 60) + + # Start pubsub demo + self.run_pubsub_demo() + time.sleep(5) + + # Start direct messaging demo + self.run_direct_demo() + + def run(self): + """Run the demo based on the selected mode.""" + try: + if self.mode == "pubsub": + self.run_pubsub_demo() + elif self.mode == "direct": + self.run_direct_demo() + elif self.mode == "both": + self.run_both_demos() + else: + logger.error(f"โŒ Unknown mode: {self.mode}") + return + + # Wait for user to stop + while True: + time.sleep(1) + + except KeyboardInterrupt: + logger.info("\n๐Ÿ›‘ Stopping demo...") + self.stop_all_processes() + except Exception as e: + logger.error(f"โŒ Demo error: {e}") + self.stop_all_processes() + + def stop_all_processes(self): + """Stop all running processes.""" + for process in self.processes: + try: + process.terminate() + process.wait(timeout=5) + except subprocess.TimeoutExpired: + process.kill() + except Exception as e: + logger.error(f"โŒ Error stopping process: {e}") + + logger.info("โœ… All demo processes stopped") + + +def main(): + """Main entry point.""" + parser = argparse.ArgumentParser( + description="DHT Messaging Demo - ETH Delhi Hackathon" + ) + parser.add_argument( + "--mode", + choices=["pubsub", "direct", "both"], + default="pubsub", + help="Demo mode to run" + ) + + args = parser.parse_args() + + logger.info("๐ŸŽฏ ETH Delhi Hackathon - DHT Messaging Demo") + logger.info("๐Ÿ“‹ Demonstrating serverless peer discovery and messaging") + + demo = DHTMessagingDemo(args.mode) + demo.run() + + +if __name__ == "__main__": + main() diff --git a/examples/dht_messaging/dht_direct_messaging.py b/examples/dht_messaging/dht_direct_messaging.py new file mode 100644 index 000000000..66ff8fb56 --- /dev/null +++ b/examples/dht_messaging/dht_direct_messaging.py @@ -0,0 +1,445 @@ +#!/usr/bin/env python + +""" +Advanced DHT Direct Messaging - ETH Delhi Hackathon + +This example demonstrates direct peer-to-peer messaging using DHT for peer discovery. +Peers can find each other by peer ID through DHT lookups and establish direct +connections for private messaging without relying on pubsub topics. + +Key Features: +- DHT-based peer discovery (no bootstrap servers) +- Direct peer-to-peer messaging +- Peer lookup by ID through DHT +- Private messaging channels +- Serverless architecture +""" + +import argparse +import asyncio +import json +import logging +import secrets +import sys +import time +from typing import Dict, Optional, Set + +import base58 +import trio +from multiaddr import Multiaddr + +from libp2p import new_host +from libp2p.abc import IHost, INetStream +from libp2p.crypto.secp256k1 import create_new_key_pair +from libp2p.custom_types import TProtocol +from libp2p.kad_dht.kad_dht import DHTMode, KadDHT +from libp2p.network.stream.exceptions import StreamClosed, StreamEOF, StreamReset +from libp2p.peer.peerinfo import info_from_p2p_addr +from libp2p.stream_muxer.mplex.mplex import MPLEX_PROTOCOL_ID, Mplex +from libp2p.tools.async_service import background_trio_service +from libp2p.utils.address_validation import find_free_port, get_available_interfaces + +# Configure logging +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", +) +logger = logging.getLogger("dht-direct-messaging") + +# Protocol IDs +DIRECT_MESSAGING_PROTOCOL = TProtocol("/dht-direct-messaging/1.0.0") + +# Global state +discovered_peers: Set[str] = set() +peer_discovery_event = trio.Event() +active_connections: Dict[str, INetStream] = {} + + +class DHTDirectMessagingNode: + """ + A decentralized direct messaging node that uses DHT for peer discovery + and direct streams for private messaging. + """ + + def __init__(self, port: int, username: str = None): + self.port = port + self.username = username or f"user_{secrets.token_hex(4)}" + self.host: Optional[IHost] = None + self.dht: Optional[KadDHT] = None + self.termination_event = trio.Event() + self.connected_peers: Set[str] = set() + self.peer_usernames: Dict[str, str] = {} + + async def start(self) -> None: + """Start the DHT direct messaging node.""" + try: + # Create host with random key pair + key_pair = create_new_key_pair(secrets.token_bytes(32)) + self.host = new_host( + key_pair=key_pair, + muxer_opt={MPLEX_PROTOCOL_ID: Mplex}, + ) + + # Get available interfaces + listen_addrs = get_available_interfaces(self.port) + + async with self.host.run(listen_addrs=listen_addrs), trio.open_nursery() as nursery: + # Start peer store cleanup + nursery.start_soon(self.host.get_peerstore().start_cleanup_task, 60) + + logger.info(f"๐Ÿš€ Starting DHT Direct Messaging Node: {self.username}") + logger.info(f"๐Ÿ“ Peer ID: {self.host.get_id()}") + logger.info(f"๐ŸŒ Listening on: {listen_addrs}") + + # Initialize DHT in SERVER mode for peer discovery + self.dht = KadDHT(self.host, DHTMode.SERVER, enable_random_walk=True) + + # Set up stream handler for incoming direct messages + self.host.set_stream_handler(DIRECT_MESSAGING_PROTOCOL, self._handle_incoming_stream) + + # Start services + async with background_trio_service(self.dht): + logger.info("โœ… DHT service started successfully") + + # Start background tasks + nursery.start_soon(self._peer_discovery_loop) + nursery.start_soon(self._peer_connection_loop) + nursery.start_soon(self._user_interface_loop) + nursery.start_soon(self._status_monitor) + + # Wait for termination + await self.termination_event.wait() + + except Exception as e: + logger.error(f"โŒ Error starting node: {e}") + raise + + async def _handle_incoming_stream(self, stream: INetStream) -> None: + """Handle incoming direct messaging streams.""" + try: + peer_id = stream.muxed_conn.peer_id + peer_id_str = peer_id.pretty() + + logger.info(f"๐Ÿ“จ New direct message connection from {peer_id_str}") + + # Store the connection + active_connections[peer_id_str] = stream + + # Start receiving messages from this peer + async with trio.open_nursery() as nursery: + nursery.start_soon(self._receive_direct_messages, stream, peer_id_str) + + except Exception as e: + logger.error(f"โŒ Error handling incoming stream: {e}") + + async def _receive_direct_messages(self, stream: INetStream, peer_id_str: str) -> None: + """Receive direct messages from a specific peer.""" + try: + while not self.termination_event.is_set(): + try: + # Read message length first + length_bytes = await stream.read(4) + if not length_bytes: + break + + message_length = int.from_bytes(length_bytes, 'big') + + # Read the actual message + message_bytes = await stream.read(message_length) + if not message_bytes: + break + + # Parse the message + message_data = json.loads(message_bytes.decode('utf-8')) + message_type = message_data.get('type') + + if message_type == 'message': + content = message_data.get('content', '') + sender_username = message_data.get('username', 'Unknown') + + # Store peer username + self.peer_usernames[peer_id_str] = sender_username + + print(f"\n๐Ÿ’ฌ [{sender_username}]: {content}") + print(f"{self.username}> ", end="", flush=True) + + elif message_type == 'username': + username = message_data.get('username', 'Unknown') + self.peer_usernames[peer_id_str] = username + print(f"\n๐Ÿ‘ค {peer_id_str[:8]} is now known as {username}") + print(f"{self.username}> ", end="", flush=True) + + except (StreamClosed, StreamEOF, StreamReset): + logger.info(f"๐Ÿ“ค Connection closed with {peer_id_str}") + break + except Exception as e: + logger.error(f"โŒ Error receiving message from {peer_id_str}: {e}") + break + + except Exception as e: + logger.error(f"โŒ Error in receive direct messages: {e}") + finally: + # Clean up connection + if peer_id_str in active_connections: + del active_connections[peer_id_str] + + async def _peer_discovery_loop(self) -> None: + """Continuously discover peers through DHT.""" + logger.info("๐Ÿ” Starting peer discovery loop...") + + while not self.termination_event.is_set(): + try: + # Get connected peers from DHT routing table + routing_peers = list(self.dht.routing_table.peers.keys()) + + # Get connected peers from host + connected_peers = set(self.host.get_connected_peers()) + + # Find new peers to connect to + new_peers = set(routing_peers) - connected_peers - {self.host.get_id()} + + if new_peers: + logger.info(f"๐Ÿ” Discovered {len(new_peers)} new peers via DHT") + for peer_id in new_peers: + peer_id_str = peer_id.pretty() + if peer_id_str not in discovered_peers: + discovered_peers.add(peer_id_str) + logger.info(f"โž• New peer discovered: {peer_id_str}") + + # Signal that we have discovered peers + if discovered_peers and not peer_discovery_event.is_set(): + peer_discovery_event.set() + + await trio.sleep(5) # Check every 5 seconds + + except Exception as e: + logger.error(f"โŒ Error in peer discovery: {e}") + await trio.sleep(5) + + async def _peer_connection_loop(self) -> None: + """Continuously attempt to connect to discovered peers.""" + logger.info("๐Ÿ”— Starting peer connection loop...") + + while not self.termination_event.is_set(): + try: + # Wait for peer discovery + await peer_discovery_event.wait() + + # Try to connect to discovered peers + for peer_id_str in list(discovered_peers): + if peer_id_str not in self.connected_peers: + try: + # Convert string back to peer ID + peer_id = self.host.get_id().__class__.from_string(peer_id_str) + + # Get peer info from DHT + peer_info = await self.dht.peer_routing.find_peer(peer_id) + if peer_info: + logger.info(f"๐Ÿ”— Attempting to connect to {peer_id_str}") + await self.host.connect(peer_info) + self.connected_peers.add(peer_id_str) + logger.info(f"โœ… Connected to {peer_id_str}") + + # Send our username to the peer + await self._send_username_to_peer(peer_id) + + else: + logger.debug(f"โš ๏ธ No peer info found for {peer_id_str}") + except Exception as e: + logger.debug(f"โŒ Failed to connect to {peer_id_str}: {e}") + + await trio.sleep(10) # Try connections every 10 seconds + + except Exception as e: + logger.error(f"โŒ Error in peer connection: {e}") + await trio.sleep(10) + + async def _send_username_to_peer(self, peer_id) -> None: + """Send our username to a peer.""" + try: + stream = await self.host.new_stream(peer_id, [DIRECT_MESSAGING_PROTOCOL]) + + # Send username message + username_message = { + 'type': 'username', + 'username': self.username + } + + message_bytes = json.dumps(username_message).encode('utf-8') + length_bytes = len(message_bytes).to_bytes(4, 'big') + + await stream.write(length_bytes + message_bytes) + + # Store the connection + peer_id_str = peer_id.pretty() + active_connections[peer_id_str] = stream + + # Start receiving messages from this peer + async with trio.open_nursery() as nursery: + nursery.start_soon(self._receive_direct_messages, stream, peer_id_str) + + except Exception as e: + logger.error(f"โŒ Error sending username to {peer_id.pretty()}: {e}") + + async def _send_direct_message(self, peer_id_str: str, message: str) -> None: + """Send a direct message to a specific peer.""" + try: + if peer_id_str not in active_connections: + logger.error(f"โŒ No active connection to {peer_id_str}") + return + + stream = active_connections[peer_id_str] + + # Create message + message_data = { + 'type': 'message', + 'content': message, + 'username': self.username + } + + message_bytes = json.dumps(message_data).encode('utf-8') + length_bytes = len(message_bytes).to_bytes(4, 'big') + + await stream.write(length_bytes + message_bytes) + print(f"๐Ÿ“ค Sent to {peer_id_str[:8]}: {message}") + + except Exception as e: + logger.error(f"โŒ Error sending message to {peer_id_str}: {e}") + + async def _user_interface_loop(self) -> None: + """Handle user interface and commands.""" + print(f"\n๐ŸŽ‰ Welcome to DHT Direct Messaging, {self.username}!") + print("๐Ÿ’ก Commands:") + print(" - 'msg ' - Send direct message") + print(" - 'peers' - Show connected peers") + print(" - 'discover' - Show discovered peers") + print(" - 'quit' - Exit") + print(f"\n{self.username}> ", end="", flush=True) + + while not self.termination_event.is_set(): + try: + # Use trio's run_sync_in_worker_thread for non-blocking input + user_input = await trio.to_thread.run_sync(input) + + if user_input.lower() == "quit": + self.termination_event.set() + break + elif user_input.lower() == "peers": + connected_count = len(self.connected_peers) + print(f"๐Ÿ”— Connected peers: {connected_count}") + for peer_id in self.connected_peers: + username = self.peer_usernames.get(peer_id, "Unknown") + print(f" - {peer_id[:8]} ({username})") + print(f"{self.username}> ", end="", flush=True) + elif user_input.lower() == "discover": + discovered_count = len(discovered_peers) + print(f"๐Ÿ” Discovered peers: {discovered_count}") + for peer_id in discovered_peers: + status = "โœ…" if peer_id in self.connected_peers else "โณ" + username = self.peer_usernames.get(peer_id, "Unknown") + print(f" {status} {peer_id[:8]} ({username})") + print(f"{self.username}> ", end="", flush=True) + elif user_input.startswith("msg "): + parts = user_input.split(" ", 2) + if len(parts) >= 3: + peer_id = parts[1] + message = parts[2] + await self._send_direct_message(peer_id, message) + else: + print("โŒ Usage: msg ") + print(f"{self.username}> ", end="", flush=True) + elif user_input.strip(): + print("โŒ Unknown command. Type 'quit' to exit.") + print(f"{self.username}> ", end="", flush=True) + else: + print(f"{self.username}> ", end="", flush=True) + + except Exception as e: + logger.error(f"โŒ Error in user interface: {e}") + await trio.sleep(1) + + async def _status_monitor(self) -> None: + """Monitor and display node status.""" + while not self.termination_event.is_set(): + try: + connected_count = len(self.connected_peers) + discovered_count = len(discovered_peers) + routing_peers = len(self.dht.routing_table.peers) + active_conns = len(active_connections) + + if connected_count > 0 or discovered_count > 0: + logger.info( + f"๐Ÿ“Š Status - Connected: {connected_count}, " + f"Discovered: {discovered_count}, " + f"Routing: {routing_peers}, " + f"Active: {active_conns}" + ) + + await trio.sleep(30) # Status every 30 seconds + + except Exception as e: + logger.error(f"โŒ Error in status monitor: {e}") + await trio.sleep(30) + + def stop(self) -> None: + """Stop the node.""" + self.termination_event.set() + + +async def run_dht_direct_messaging(port: int, username: str = None) -> None: + """Run the DHT direct messaging node.""" + node = DHTDirectMessagingNode(port, username) + try: + await node.start() + except KeyboardInterrupt: + logger.info("๐Ÿ›‘ Shutting down...") + node.stop() + except Exception as e: + logger.error(f"โŒ Fatal error: {e}") + raise + + +def main(): + """Main entry point.""" + parser = argparse.ArgumentParser( + description="DHT Direct Messaging - ETH Delhi Hackathon" + ) + parser.add_argument( + "-p", "--port", + type=int, + default=0, + help="Port to listen on (0 for random)" + ) + parser.add_argument( + "-u", "--username", + type=str, + help="Username for this node" + ) + parser.add_argument( + "-v", "--verbose", + action="store_true", + help="Enable verbose logging" + ) + + args = parser.parse_args() + + if args.verbose: + logging.getLogger().setLevel(logging.DEBUG) + + if args.port == 0: + args.port = find_free_port() + + logger.info("๐Ÿš€ Starting DHT Direct Messaging System") + logger.info("๐ŸŽฏ ETH Delhi Hackathon - Direct Messaging with DHT Bootstrap") + + try: + trio.run(run_dht_direct_messaging, args.port, args.username) + except KeyboardInterrupt: + logger.info("๐Ÿ‘‹ Goodbye!") + except Exception as e: + logger.error(f"โŒ Application failed: {e}") + sys.exit(1) + + +if __name__ == "__main__": + main() diff --git a/examples/dht_messaging/dht_messaging.py b/examples/dht_messaging/dht_messaging.py new file mode 100644 index 000000000..8c416cb5a --- /dev/null +++ b/examples/dht_messaging/dht_messaging.py @@ -0,0 +1,351 @@ +#!/usr/bin/env python + +""" +Decentralized Messaging with DHT Bootstrap - ETH Delhi Hackathon + +This example demonstrates a truly decentralized messaging system that uses +Kademlia DHT for peer discovery and routing, eliminating the need for +hardcoded bootstrap servers. Peers can find each other by peer ID through +DHT lookups and establish direct connections for messaging. + +Key Features: +- DHT-based peer discovery (no bootstrap servers) +- PubSub messaging over discovered peers +- Automatic peer routing through DHT +- Serverless architecture +""" + +import argparse +import asyncio +import logging +import random +import secrets +import sys +import time +from typing import Optional + +import base58 +import trio +from multiaddr import Multiaddr + +from libp2p import new_host +from libp2p.abc import IHost +from libp2p.crypto.secp256k1 import create_new_key_pair +from libp2p.custom_types import TProtocol +from libp2p.kad_dht.kad_dht import DHTMode, KadDHT +from libp2p.peer.peerinfo import info_from_p2p_addr +from libp2p.pubsub.gossipsub import GossipSub +from libp2p.pubsub.pubsub import Pubsub +from libp2p.stream_muxer.mplex.mplex import MPLEX_PROTOCOL_ID, Mplex +from libp2p.tools.async_service import background_trio_service +from libp2p.utils.address_validation import find_free_port, get_available_interfaces + +# Configure logging +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", +) +logger = logging.getLogger("dht-messaging") + +# Protocol IDs +GOSSIPSUB_PROTOCOL_ID = TProtocol("/meshsub/1.0.0") +MESSAGING_TOPIC = "dht-messaging" + +# Global variables for peer discovery +discovered_peers = set() +peer_discovery_event = trio.Event() + + +class DHTMessagingNode: + """ + A decentralized messaging node that uses DHT for peer discovery + and PubSub for messaging. + """ + + def __init__(self, port: int, username: str = None): + self.port = port + self.username = username or f"user_{secrets.token_hex(4)}" + self.host: Optional[IHost] = None + self.dht: Optional[KadDHT] = None + self.pubsub: Optional[Pubsub] = None + self.gossipsub: Optional[GossipSub] = None + self.termination_event = trio.Event() + self.connected_peers = set() + + async def start(self) -> None: + """Start the DHT messaging node.""" + try: + # Create host with random key pair + key_pair = create_new_key_pair(secrets.token_bytes(32)) + self.host = new_host( + key_pair=key_pair, + muxer_opt={MPLEX_PROTOCOL_ID: Mplex}, + ) + + # Get available interfaces + listen_addrs = get_available_interfaces(self.port) + + async with self.host.run(listen_addrs=listen_addrs), trio.open_nursery() as nursery: + # Start peer store cleanup + nursery.start_soon(self.host.get_peerstore().start_cleanup_task, 60) + + logger.info(f"๐Ÿš€ Starting DHT Messaging Node: {self.username}") + logger.info(f"๐Ÿ“ Peer ID: {self.host.get_id()}") + logger.info(f"๐ŸŒ Listening on: {listen_addrs}") + + # Initialize DHT in SERVER mode for peer discovery + self.dht = KadDHT(self.host, DHTMode.SERVER, enable_random_walk=True) + + # Initialize GossipSub and PubSub + self.gossipsub = GossipSub( + protocols=[GOSSIPSUB_PROTOCOL_ID], + degree=4, # Maintain 4 peers in mesh + degree_low=2, # Lower bound + degree_high=6, # Upper bound + time_to_live=60, + gossip_window=3, + gossip_history=5, + heartbeat_initial_delay=1.0, + heartbeat_interval=10, + ) + self.pubsub = Pubsub(self.host, self.gossipsub) + + # Start services + async with background_trio_service(self.dht): + async with background_trio_service(self.pubsub): + async with background_trio_service(self.gossipsub): + await self.pubsub.wait_until_ready() + logger.info("โœ… All services started successfully") + + # Start background tasks + nursery.start_soon(self._peer_discovery_loop) + nursery.start_soon(self._peer_connection_loop) + nursery.start_soon(self._messaging_loop) + nursery.start_soon(self._status_monitor) + + # Wait for termination + await self.termination_event.wait() + + except Exception as e: + logger.error(f"โŒ Error starting node: {e}") + raise + + async def _peer_discovery_loop(self) -> None: + """Continuously discover peers through DHT.""" + logger.info("๐Ÿ” Starting peer discovery loop...") + + while not self.termination_event.is_set(): + try: + # Get connected peers from DHT routing table + routing_peers = list(self.dht.routing_table.peers.keys()) + + # Get connected peers from host + connected_peers = set(self.host.get_connected_peers()) + + # Find new peers to connect to + new_peers = set(routing_peers) - connected_peers - {self.host.get_id()} + + if new_peers: + logger.info(f"๐Ÿ” Discovered {len(new_peers)} new peers via DHT") + for peer_id in new_peers: + if peer_id not in discovered_peers: + discovered_peers.add(peer_id) + logger.info(f"โž• New peer discovered: {peer_id.pretty()}") + + # Signal that we have discovered peers + if discovered_peers and not peer_discovery_event.is_set(): + peer_discovery_event.set() + + await trio.sleep(5) # Check every 5 seconds + + except Exception as e: + logger.error(f"โŒ Error in peer discovery: {e}") + await trio.sleep(5) + + async def _peer_connection_loop(self) -> None: + """Continuously attempt to connect to discovered peers.""" + logger.info("๐Ÿ”— Starting peer connection loop...") + + while not self.termination_event.is_set(): + try: + # Wait for peer discovery + await peer_discovery_event.wait() + + # Try to connect to discovered peers + for peer_id in list(discovered_peers): + if peer_id not in self.connected_peers and peer_id != self.host.get_id(): + try: + # Get peer info from DHT + peer_info = await self.dht.peer_routing.find_peer(peer_id) + if peer_info: + logger.info(f"๐Ÿ”— Attempting to connect to {peer_id.pretty()}") + await self.host.connect(peer_info) + self.connected_peers.add(peer_id) + logger.info(f"โœ… Connected to {peer_id.pretty()}") + else: + logger.debug(f"โš ๏ธ No peer info found for {peer_id.pretty()}") + except Exception as e: + logger.debug(f"โŒ Failed to connect to {peer_id.pretty()}: {e}") + + await trio.sleep(10) # Try connections every 10 seconds + + except Exception as e: + logger.error(f"โŒ Error in peer connection: {e}") + await trio.sleep(10) + + async def _messaging_loop(self) -> None: + """Handle messaging functionality.""" + logger.info("๐Ÿ’ฌ Starting messaging loop...") + + # Subscribe to messaging topic + subscription = await self.pubsub.subscribe(MESSAGING_TOPIC) + logger.info(f"๐Ÿ“ข Subscribed to topic: {MESSAGING_TOPIC}") + + # Start message receiving task + async with trio.open_nursery() as nursery: + nursery.start_soon(self._receive_messages, subscription) + nursery.start_soon(self._send_messages) + + async def _receive_messages(self, subscription) -> None: + """Receive and display messages from other peers.""" + while not self.termination_event.is_set(): + try: + message = await subscription.get() + sender_id = base58.b58encode(message.from_id).decode()[:8] + message_text = message.data.decode('utf-8') + + # Don't display our own messages + if message.from_id != self.host.get_id().to_bytes(): + print(f"\n๐Ÿ’ฌ [{sender_id}]: {message_text}") + print(f"{self.username}> ", end="", flush=True) + + except Exception as e: + logger.error(f"โŒ Error receiving message: {e}") + await trio.sleep(1) + + async def _send_messages(self) -> None: + """Send messages from user input.""" + print(f"\n๐ŸŽ‰ Welcome to DHT Messaging, {self.username}!") + print("๐Ÿ’ก Type messages to send to all connected peers") + print("๐Ÿ’ก Type 'quit' to exit") + print("๐Ÿ’ก Type 'peers' to see connected peers") + print("๐Ÿ’ก Type 'discover' to see discovered peers") + print(f"\n{self.username}> ", end="", flush=True) + + while not self.termination_event.is_set(): + try: + # Use trio's run_sync_in_worker_thread for non-blocking input + user_input = await trio.to_thread.run_sync(input) + + if user_input.lower() == "quit": + self.termination_event.set() + break + elif user_input.lower() == "peers": + connected_count = len(self.connected_peers) + print(f"๐Ÿ”— Connected peers: {connected_count}") + for peer_id in self.connected_peers: + print(f" - {peer_id.pretty()}") + print(f"{self.username}> ", end="", flush=True) + elif user_input.lower() == "discover": + discovered_count = len(discovered_peers) + print(f"๐Ÿ” Discovered peers: {discovered_count}") + for peer_id in discovered_peers: + status = "โœ…" if peer_id in self.connected_peers else "โณ" + print(f" {status} {peer_id.pretty()}") + print(f"{self.username}> ", end="", flush=True) + elif user_input.strip(): + # Send message + message = f"[{self.username}]: {user_input}" + await self.pubsub.publish(MESSAGING_TOPIC, message.encode()) + print(f"๐Ÿ“ค Sent: {user_input}") + print(f"{self.username}> ", end="", flush=True) + else: + print(f"{self.username}> ", end="", flush=True) + + except Exception as e: + logger.error(f"โŒ Error in send messages: {e}") + await trio.sleep(1) + + async def _status_monitor(self) -> None: + """Monitor and display node status.""" + while not self.termination_event.is_set(): + try: + connected_count = len(self.connected_peers) + discovered_count = len(discovered_peers) + routing_peers = len(self.dht.routing_table.peers) + + if connected_count > 0 or discovered_count > 0: + logger.info( + f"๐Ÿ“Š Status - Connected: {connected_count}, " + f"Discovered: {discovered_count}, " + f"Routing: {routing_peers}" + ) + + await trio.sleep(30) # Status every 30 seconds + + except Exception as e: + logger.error(f"โŒ Error in status monitor: {e}") + await trio.sleep(30) + + def stop(self) -> None: + """Stop the node.""" + self.termination_event.set() + + +async def run_dht_messaging(port: int, username: str = None) -> None: + """Run the DHT messaging node.""" + node = DHTMessagingNode(port, username) + try: + await node.start() + except KeyboardInterrupt: + logger.info("๐Ÿ›‘ Shutting down...") + node.stop() + except Exception as e: + logger.error(f"โŒ Fatal error: {e}") + raise + + +def main(): + """Main entry point.""" + parser = argparse.ArgumentParser( + description="Decentralized Messaging with DHT Bootstrap - ETH Delhi Hackathon" + ) + parser.add_argument( + "-p", "--port", + type=int, + default=0, + help="Port to listen on (0 for random)" + ) + parser.add_argument( + "-u", "--username", + type=str, + help="Username for this node" + ) + parser.add_argument( + "-v", "--verbose", + action="store_true", + help="Enable verbose logging" + ) + + args = parser.parse_args() + + if args.verbose: + logging.getLogger().setLevel(logging.DEBUG) + + if args.port == 0: + args.port = find_free_port() + + logger.info("๐Ÿš€ Starting DHT Messaging System") + logger.info("๐ŸŽฏ ETH Delhi Hackathon - Decentralized Messaging with DHT Bootstrap") + + try: + trio.run(run_dht_messaging, args.port, args.username) + except KeyboardInterrupt: + logger.info("๐Ÿ‘‹ Goodbye!") + except Exception as e: + logger.error(f"โŒ Application failed: {e}") + sys.exit(1) + + +if __name__ == "__main__": + main() diff --git a/examples/dht_messaging/test_implementation.py b/examples/dht_messaging/test_implementation.py new file mode 100644 index 000000000..1b5949bb7 --- /dev/null +++ b/examples/dht_messaging/test_implementation.py @@ -0,0 +1,99 @@ +#!/usr/bin/env python + +""" +Test script for DHT Messaging Implementation + +This script tests the basic functionality of the DHT messaging system +to ensure it works correctly. +""" + +import asyncio +import logging +import sys +import time +from pathlib import Path + +# Add the parent directory to the path to import our modules +sys.path.insert(0, str(Path(__file__).parent)) + +from dht_messaging import DHTMessagingNode + +# Configure logging +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", +) +logger = logging.getLogger("test-dht-messaging") + + +async def test_dht_messaging(): + """Test the DHT messaging functionality.""" + logger.info("๐Ÿงช Testing DHT Messaging Implementation") + + try: + # Create a test node + node = DHTMessagingNode(port=0, username="TestUser") + + # Start the node in a background task + async with asyncio.TaskGroup() as tg: + # Start the node + node_task = tg.create_task(node.start()) + + # Wait a bit for the node to initialize + await asyncio.sleep(5) + + # Check if the node is running + if node.host and node.dht and node.pubsub: + logger.info("โœ… Node initialized successfully") + logger.info(f"๐Ÿ“ Peer ID: {node.host.get_id()}") + logger.info(f"๐ŸŒ Listening addresses: {node.host.get_addrs()}") + + # Check DHT status + routing_peers = len(node.dht.routing_table.peers) + logger.info(f"๐Ÿ” DHT routing table has {routing_peers} peers") + + # Check PubSub status + if node.pubsub: + logger.info("๐Ÿ“ข PubSub service is running") + + logger.info("โœ… All components initialized successfully") + else: + logger.error("โŒ Node failed to initialize properly") + return False + + # Stop the node after testing + node.stop() + + logger.info("๐ŸŽ‰ Test completed successfully!") + return True + + except Exception as e: + logger.error(f"โŒ Test failed: {e}") + return False + + +async def main(): + """Main test function.""" + logger.info("๐Ÿš€ Starting DHT Messaging Tests") + logger.info("=" * 50) + + success = await test_dht_messaging() + + if success: + logger.info("โœ… All tests passed!") + return 0 + else: + logger.error("โŒ Tests failed!") + return 1 + + +if __name__ == "__main__": + try: + exit_code = asyncio.run(main()) + sys.exit(exit_code) + except KeyboardInterrupt: + logger.info("๐Ÿ›‘ Test interrupted by user") + sys.exit(1) + except Exception as e: + logger.error(f"โŒ Test error: {e}") + sys.exit(1) diff --git a/libp2p/tools/constants.py b/libp2p/tools/constants.py index 5a62d1694..2349e16aa 100644 --- a/libp2p/tools/constants.py +++ b/libp2p/tools/constants.py @@ -72,6 +72,7 @@ class GossipsubParams(NamedTuple): px_peers_count: int = 16 prune_back_off: int = 60 unsubscribe_back_off: int = 10 + flood_publish: bool = False GOSSIPSUB_PARAMS = GossipsubParams() diff --git a/newsfragments/713.feature.rst b/newsfragments/713.feature.rst new file mode 100644 index 000000000..6c0bb3bc0 --- /dev/null +++ b/newsfragments/713.feature.rst @@ -0,0 +1 @@ +Added flood publishing. diff --git a/tests/core/pubsub/test_gossipsub.py b/tests/core/pubsub/test_gossipsub.py index 5c341d0bf..036126160 100644 --- a/tests/core/pubsub/test_gossipsub.py +++ b/tests/core/pubsub/test_gossipsub.py @@ -601,6 +601,48 @@ async def test_sparse_connect(): @pytest.mark.trio +async def test_flood_publish(): + async with PubsubFactory.create_batch_with_gossipsub( + 6, + degree=2, + degree_low=1, + degree_high=3, + flood_publish=False, + ) as pubsubs_gsub: + routers: list[GossipSub] = [] + for pubsub in pubsubs_gsub: + assert isinstance(pubsub.router, GossipSub) + routers.append(pubsub.router) + hosts = [ps.host for ps in pubsubs_gsub] + + topic = "flood_test_topic" + queues = [await pubsub.subscribe(topic) for pubsub in pubsubs_gsub] + + # connect host 0 to all other hosts + await one_to_all_connect(hosts, 0) + + # wait for connections to be established + await trio.sleep(1) + + # publish a message from the first host + msg_content = b"flood_msg" + await pubsubs_gsub[0].publish(topic, msg_content) + + # wait for messages to propagate + await trio.sleep(0.5) + + print(routers[0].mesh[topic]) + if routers[0].pubsub: + print(routers[0].pubsub.peer_topics) + + # verify all nodes received the message + for queue in queues: + msg = await queue.get() + assert msg.data == msg_content, ( + f"node did not receive expected message: {msg.data}" + ) + + async def test_connect_some_with_fewer_hosts_than_degree(): """Test connect_some when there are fewer hosts than degree.""" # Create 3 hosts with degree=5 diff --git a/tests/utils/factories.py b/tests/utils/factories.py index d94cc83e5..09604c5af 100644 --- a/tests/utils/factories.py +++ b/tests/utils/factories.py @@ -622,6 +622,7 @@ async def create_batch_with_gossipsub( px_peers_count: int = GOSSIPSUB_PARAMS.px_peers_count, prune_back_off: int = GOSSIPSUB_PARAMS.prune_back_off, unsubscribe_back_off: int = GOSSIPSUB_PARAMS.unsubscribe_back_off, + flood_publish: bool = GOSSIPSUB_PARAMS.flood_publish, security_protocol: TProtocol | None = None, muxer_opt: TMuxerOptions | None = None, msg_id_constructor: None @@ -646,6 +647,7 @@ async def create_batch_with_gossipsub( px_peers_count=px_peers_count, prune_back_off=prune_back_off, unsubscribe_back_off=unsubscribe_back_off, + flood_publish=flood_publish, ) else: gossipsubs = GossipsubFactory.create_batch( @@ -664,6 +666,7 @@ async def create_batch_with_gossipsub( px_peers_count=px_peers_count, prune_back_off=prune_back_off, unsubscribe_back_off=unsubscribe_back_off, + flood_publish=flood_publish, ) async with cls._create_batch_with_router(