Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
0413cd0
implemented datastore agnostic interface
Winter-Soren Sep 20, 2025
8053df6
rafactored a comment
Winter-Soren Sep 20, 2025
34884d5
Merge branch 'main' into feat/945-persistent-storage-for-peerstore
seetadev Sep 21, 2025
f1c24e8
Merge branch 'main' into feat/945-persistent-storage-for-peerstore
seetadev Sep 22, 2025
1aea25f
fix type errors and linting issues
Winter-Soren Sep 22, 2025
966da26
restore test_websockets modifed import for python versions
Winter-Soren Sep 22, 2025
4b73783
Merge branch 'main' into feat/945-persistent-storage-for-peerstore
seetadev Sep 22, 2025
d970e80
removed __init__ from peer directory
Winter-Soren Sep 22, 2025
5e2970c
Merge branch 'feat/945-persistent-storage-for-peerstore' of https://g…
Winter-Soren Sep 22, 2025
d07153b
Merge branch 'main' into feat/945-persistent-storage-for-peerstore
seetadev Sep 23, 2025
61ff804
Merge branch 'main' into feat/945-persistent-storage-for-peerstore
seetadev Sep 24, 2025
2d78c0b
Merge branch 'main' into feat/945-persistent-storage-for-peerstore
Winter-Soren Sep 25, 2025
0a78afc
Merge branch 'main' into feat/945-persistent-storage-for-peerstore
seetadev Sep 29, 2025
08b3744
Merge branch 'main' into feat/945-persistent-storage-for-peerstore
seetadev Oct 4, 2025
0ff9c9e
Merge branch 'main' into feat/945-persistent-storage-for-peerstore
Winter-Soren Oct 6, 2025
e9bf229
Merge branch 'main' into feat/945-persistent-storage-for-peerstore
seetadev Oct 6, 2025
ed2f7a0
Merge branch 'main' into feat/945-persistent-storage-for-peerstore
seetadev Oct 10, 2025
c3a8beb
Merge branch 'main' into feat/945-persistent-storage-for-peerstore
seetadev Oct 13, 2025
0dc0b50
Merge branch 'main' into feat/945-persistent-storage-for-peerstore
seetadev Oct 13, 2025
5f84cbc
Merge branch 'main' into feat/945-persistent-storage-for-peerstore
seetadev Oct 20, 2025
c6bf179
refactored all data sources to use trio instead of asyncio
Winter-Soren Oct 22, 2025
9416257
added PeerData info and proper error handling
Winter-Soren Oct 22, 2025
c5cf6fd
added an example for persistent peerstore
Winter-Soren Oct 22, 2025
c7414c7
added an example for persistent peerstore
Winter-Soren Oct 22, 2025
e91fa05
added a newsfragement
Winter-Soren Oct 22, 2025
ea3f29f
added comprehensive test suite
Winter-Soren Oct 22, 2025
71b8796
Merge branch 'main' into feat/945-persistent-storage-for-peerstore
seetadev Oct 23, 2025
b2979ab
test suite moved into tests/core/peer to be caught by CI
Winter-Soren Oct 24, 2025
7f1d398
Merge branch 'main' into feat/945-persistent-storage-for-peerstore
Winter-Soren Oct 24, 2025
f37e8a9
fix: replace assert statements with proper ValueError exceptions in d…
Winter-Soren Oct 24, 2025
c305a11
test: add comprehensive tests for datastore error handling
Winter-Soren Oct 24, 2025
0bfd833
fix: implement background persistence thread to eliminate data corrup…
Winter-Soren Oct 27, 2025
050a3b5
fixed a test case failed due to race condition in the peer connection…
Winter-Soren Oct 27, 2025
15967c0
Merge branch 'main' into feat/945-persistent-storage-for-peerstore
Winter-Soren Oct 27, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
253 changes: 253 additions & 0 deletions examples/persistent_peerstore/persistent_peerstore.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,253 @@
#!/usr/bin/env python3
"""
Example demonstrating the usage of PersistentPeerStore.

This example shows how to use the PersistentPeerStore with different datastore backends
to maintain peer information across application restarts.
"""

from pathlib import Path
import tempfile

from multiaddr import Multiaddr
import trio

from libp2p.peer.id import ID
from libp2p.peer.persistent_peerstore import PersistentPeerStore
from libp2p.peer.persistent_peerstore_factory import (
create_leveldb_peerstore,
create_memory_peerstore,
create_rocksdb_peerstore,
create_sqlite_peerstore,
)


async def demonstrate_peerstore_operations(peerstore: PersistentPeerStore, name: str):
"""Demonstrate basic peerstore operations."""
print(f"\n=== {name} PeerStore Demo ===")

# Create some test peer IDs
peer_id_1 = ID.from_base58("QmPeer1")
peer_id_2 = ID.from_base58("QmPeer2")

# Add addresses for peers
addr1 = Multiaddr("/ip4/127.0.0.1/tcp/4001")
addr2 = Multiaddr("/ip4/192.168.1.1/tcp/4002")

print(f"Adding addresses for {peer_id_1}")
peerstore.add_addrs(peer_id_1, [addr1], 3600) # 1 hour TTL

print(f"Adding addresses for {peer_id_2}")
peerstore.add_addrs(peer_id_2, [addr2], 7200) # 2 hours TTL

# Add protocols
print(f"Adding protocols for {peer_id_1}")
peerstore.add_protocols(peer_id_1, ["/ipfs/ping/1.0.0", "/ipfs/id/1.0.0"])

print(f"Adding protocols for {peer_id_2}")
peerstore.add_protocols(peer_id_2, ["/ipfs/ping/1.0.0", "/ipfs/kad/1.0.0"])

# Add metadata
print(f"Adding metadata for {peer_id_1}")
peerstore.put(peer_id_1, "agent", "go-libp2p/0.1.0")
peerstore.put(peer_id_1, "version", "1.0.0")

print(f"Adding metadata for {peer_id_2}")
peerstore.put(peer_id_2, "agent", "js-libp2p/0.1.0")
peerstore.put(peer_id_2, "version", "2.0.0")

# Record latency metrics
print(f"Recording latency for {peer_id_1}")
peerstore.record_latency(peer_id_1, 0.05) # 50ms

print(f"Recording latency for {peer_id_2}")
peerstore.record_latency(peer_id_2, 0.1) # 100ms

# Retrieve and display information
print(f"\nRetrieved peer info for {peer_id_1}:")
try:
peer_info = peerstore.peer_info(peer_id_1)
print(f" Addresses: {[str(addr) for addr in peer_info.addrs]}")
except Exception as e:
print(f" Error: {e}")

print(f"\nRetrieved protocols for {peer_id_1}:")
try:
protocols = peerstore.get_protocols(peer_id_1)
print(f" Protocols: {protocols}")
except Exception as e:
print(f" Error: {e}")

print(f"\nRetrieved metadata for {peer_id_1}:")
try:
agent = peerstore.get(peer_id_1, "agent")
version = peerstore.get(peer_id_1, "version")
print(f" Agent: {agent}")
print(f" Version: {version}")
except Exception as e:
print(f" Error: {e}")

print(f"\nRetrieved latency for {peer_id_1}:")
try:
latency = peerstore.latency_EWMA(peer_id_1)
print(f" Latency EWMA: {latency:.3f}s")
except Exception as e:
print(f" Error: {e}")

# List all peers
print(f"\nAll peer IDs: {[str(pid) for pid in peerstore.peer_ids()]}")
print(f"Valid peer IDs: {[str(pid) for pid in peerstore.valid_peer_ids()]}")
print(f"Peers with addresses: {[str(pid) for pid in peerstore.peers_with_addrs()]}")


async def demonstrate_persistence():
"""Demonstrate persistence across restarts."""
print("\n=== Persistence Demo ===")

# Create a temporary directory for SQLite database
with tempfile.TemporaryDirectory() as temp_dir:
db_path = Path(temp_dir) / "peerstore.db"

# First session - add some data
print("First session: Adding peer data...")
peerstore1 = create_sqlite_peerstore(str(db_path))

peer_id = ID.from_base58("QmPersistentPeer")
addr = Multiaddr("/ip4/10.0.0.1/tcp/4001")

peerstore1.add_addrs(peer_id, [addr], 3600)
peerstore1.add_protocols(peer_id, ["/ipfs/ping/1.0.0"])
peerstore1.put(peer_id, "session", "first")

print(f"Added peer {peer_id} with address {addr}")
print(f"Peer protocols: {peerstore1.get_protocols(peer_id)}")
print(f"Peer metadata: {peerstore1.get(peer_id, 'session')}")

# Trigger persistence by calling an async method
await peerstore1._load_peer_data(peer_id)

# Close the first peerstore
await peerstore1.close()

# Second session - data should persist
print("\nSecond session: Reopening peerstore...")
peerstore2 = create_sqlite_peerstore(str(db_path))

# Trigger data loading by calling an async method
await peerstore2._load_peer_data(peer_id)

# Check if data persisted
try:
peer_info = peerstore2.peer_info(peer_id)
print(f"Retrieved peer info: {[str(addr) for addr in peer_info.addrs]}")
print(f"Retrieved protocols: {peerstore2.get_protocols(peer_id)}")
print(f"Retrieved metadata: {peerstore2.get(peer_id, 'session')}")
print("✅ Data persisted successfully!")
except Exception as e:
print(f"❌ Data did not persist: {e}")

# Update data in second session
peerstore2.put(peer_id, "session", "second")
print(f"Updated metadata: {peerstore2.get(peer_id, 'session')}")

await peerstore2.close()


async def demonstrate_different_backends():
"""Demonstrate different datastore backends."""
print("\n=== Different Backend Demo ===")

# Memory backend (not persistent)
print("\n1. Memory Backend (not persistent):")
memory_peerstore = create_memory_peerstore()
await demonstrate_peerstore_operations(memory_peerstore, "Memory")
await memory_peerstore.close()

# SQLite backend
print("\n2. SQLite Backend:")
with tempfile.TemporaryDirectory() as temp_dir:
sqlite_peerstore = create_sqlite_peerstore(Path(temp_dir) / "sqlite.db")
await demonstrate_peerstore_operations(sqlite_peerstore, "SQLite")
await sqlite_peerstore.close()

# LevelDB backend (if available)
print("\n3. LevelDB Backend:")
try:
with tempfile.TemporaryDirectory() as temp_dir:
leveldb_peerstore = create_leveldb_peerstore(Path(temp_dir) / "leveldb")
await demonstrate_peerstore_operations(leveldb_peerstore, "LevelDB")
await leveldb_peerstore.close()
except ImportError:
print("LevelDB backend not available (plyvel not installed)")

# RocksDB backend (if available)
print("\n4. RocksDB Backend:")
try:
with tempfile.TemporaryDirectory() as temp_dir:
rocksdb_peerstore = create_rocksdb_peerstore(Path(temp_dir) / "rocksdb")
await demonstrate_peerstore_operations(rocksdb_peerstore, "RocksDB")
await rocksdb_peerstore.close()
except ImportError:
print("RocksDB backend not available (pyrocksdb not installed)")


async def demonstrate_async_operations():
"""Demonstrate async operations and cleanup."""
print("\n=== Async Operations Demo ===")

with tempfile.TemporaryDirectory() as temp_dir:
peerstore = create_sqlite_peerstore(Path(temp_dir) / "async.db")

# Start cleanup task
print("Starting cleanup task...")
async with trio.open_nursery() as nursery:
nursery.start_soon(peerstore.start_cleanup_task, 1) # 1 second interval

# Add some peers
peer_id = ID.from_base58("QmAsyncPeer")
addr = Multiaddr("/ip4/127.0.0.1/tcp/4001")
peerstore.add_addrs(peer_id, [addr], 1) # 1 second TTL

print("Added peer with 1-second TTL")
print(f"Peer addresses: {[str(addr) for addr in peerstore.addrs(peer_id)]}")

# Wait for expiration
print("Waiting for peer to expire...")
await trio.sleep(2)

# Check if peer expired
try:
addrs = peerstore.addrs(peer_id)
print(f"Peer still has addresses: {[str(addr) for addr in addrs]}")
except Exception as e:
print(f"Peer expired: {e}")

# Stop the cleanup task
nursery.cancel_scope.cancel()

await peerstore.close()


async def main():
"""Main demonstration function."""
print("PersistentPeerStore Usage Examples")
print("=" * 50)

# Demonstrate basic operations
await demonstrate_peerstore_operations(create_memory_peerstore(), "Basic")

# Demonstrate persistence
await demonstrate_persistence()

# Demonstrate different backends
await demonstrate_different_backends()

# Demonstrate async operations
await demonstrate_async_operations()

print("\n" + "=" * 50)
print("All examples completed!")


if __name__ == "__main__":
trio.run(main)
7 changes: 7 additions & 0 deletions libp2p/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,13 @@
PeerStore,
create_signed_peer_record,
)
from libp2p.peer.persistent_peerstore_factory import (
create_persistent_peerstore,
create_sqlite_peerstore,
create_memory_peerstore,
create_leveldb_peerstore,
create_rocksdb_peerstore,
)
from libp2p.security.insecure.transport import (
PLAINTEXT_PROTOCOL_ID,
InsecureTransport,
Expand Down
5 changes: 0 additions & 5 deletions libp2p/peer/README.md

This file was deleted.

Empty file removed libp2p/peer/__init__.py
Empty file.
35 changes: 35 additions & 0 deletions libp2p/peer/datastore/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
"""
Datastore abstraction layer for persistent peer storage.

This module provides a pluggable datastore interface that allows different
storage backends to be used for persistent peer storage, similar to the
go-datastore interface in go-libp2p.

The datastore interface is completely backend-agnostic, allowing users to
choose from various storage backends including:
- SQLite (for simple file-based storage)
- LevelDB (for high-performance key-value storage)
- RocksDB (for advanced features and performance)
- BadgerDB (for Go-like performance)
- In-memory (for testing and development)
- Custom backends (user-defined implementations)

All backends implement the same IDatastore interface, making the peerstore
completely portable across different storage technologies.
"""

from .base import IDatastore, IBatchingDatastore, IBatch
from .sqlite import SQLiteDatastore
from .memory import MemoryDatastore
from .leveldb import LevelDBDatastore
from .rocksdb import RocksDBDatastore

__all__ = [
"IDatastore",
"IBatchingDatastore",
"IBatch",
"SQLiteDatastore",
"MemoryDatastore",
"LevelDBDatastore",
"RocksDBDatastore",
]
Loading
Loading