Minimal, production-ready news feed data layer for MongoDB/Redis.
- 🗄️ Singleton Mongo/Redis clients with connection pooling
- 📰 Article upsert, dedupe, and retrieval with URL hashing
- 👥 User matching by watchlist (extensible)
- 📡 Feed fan-out with Redis ZADD (see the sketch after this list)
- ✅ Pydantic schema validation for articles
- 🔄 Idempotent index creation and operations
- 🧪 Comprehensive testing with mongomock support
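
Fan-out is not covered by the examples below, so here is a minimal sketch of the general ZADD pattern using redis-py directly. The `fan_out_article` helper, the `feed:{user_id}` key format, and the timestamp-as-score convention are illustrative assumptions, not part of the documented vynn_core API.

```python
import redis

r = redis.Redis.from_url("redis://localhost:6379/0")

def fan_out_article(article_id: str, published_ts: float, user_ids: list[str]) -> None:
    """Push one article onto each matched user's feed, scored by publish time."""
    pipe = r.pipeline()
    for user_id in user_ids:
        # ZADD keeps each feed sorted by timestamp, so reads can page newest-first.
        pipe.zadd(f"feed:{user_id}", {article_id: published_ts})
        # Optional: cap each feed at 1000 entries.
        pipe.zremrangebyrank(f"feed:{user_id}", 0, -1001)
    pipe.execute()
```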
```bash
pip install -e libs/vynn_core
```

`vynn_core` automatically loads environment variables from a `.env` file in your project root.
Step 1: Create .env file in your project
```env
# In your backend/application root directory
MONGO_URI=mongodb+srv://username:[email protected]/
MONGO_DB=your-database-name
REDIS_URL=redis://localhost:6379/0
```

Step 2: Import vynn_core (it will automatically load .env)
```python
# vynn_core will automatically find and load your .env file
from vynn_core import Article, init_indexes, upsert_articles

# Initialize database
init_indexes()
```

Debugging Configuration Issues
```python
from vynn_core.config import validate_config

# This will show you what .env file was found and which variables are loaded
config_info = validate_config()
print(config_info)
```

```python
from vynn_core import Article, init_indexes, upsert_articles, find_recent
from datetime import datetime
# Initialize database (run once)
init_indexes()
# Create and save articles
articles = [{
    "url": "https://example.com/nvda-earnings",
    "title": "NVIDIA Reports Strong Q4 Earnings",
    "summary": "Record revenue driven by AI chip demand...",
    "source": "TechNews",
    "publishedAt": datetime.utcnow(),
    "entities": {"tickers": ["NVDA"], "keywords": ["earnings", "AI"]},
    "quality": {"llmScore": 8.5, "reason": "High relevance and recent news"}
}]
result = upsert_articles(articles)
print(f"Created: {len(result['created'])}, Updated: {len(result['updated'])}")
# Retrieve recent articles
recent = find_recent(limit=10)
for article in recent:
    print(f"{article['title']} - {article['source']}")
```

```python
from vynn_core import Article, upsert_articles
# Process scraped articles
def process_scraped_articles(scraped_data_list):
    articles = []
    for data in scraped_data_list:
        # Convert to vynn_core format
        article = Article(
            url=data["url"],
            title=data["title"],
            summary=data["summary"],
            source=data["source"],
            publishedAt=data["published_at"],
            entities={"tickers": data.get("tickers", []), "keywords": data.get("keywords", [])},
            quality={"llmScore": data.get("score", 5.0), "reason": "Scraped content"}
        )
        articles.append(article.to_mongo_dict())

    # Save to database with automatic deduplication
    return upsert_articles(articles)
# Use in your scraper
result = process_scraped_articles(your_scraped_articles)
```

- `init_indexes()` - Initialize database indexes (idempotent)
- `test_connection()` - Test MongoDB connectivity
- `upsert_articles(docs)` - Save articles with deduplication
- `get_articles_by_ids(ids)` - Retrieve articles by ObjectId
- `find_recent(limit, source)` - Get recent articles
- `get_article_by_url(url)` - Find article by URL
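
The Quick Start above already shows `init_indexes`, `upsert_articles`, and `find_recent`; the remaining helpers follow the same pattern. A minimal calling sketch (the boolean return of `test_connection` and the dict-or-None return of `get_article_by_url` are assumptions based on the descriptions above):

```python
from vynn_core import test_connection, get_article_by_url, get_articles_by_ids, find_recent

# Fail fast if MongoDB is unreachable (assumes a truthy return on success)
if not test_connection():
    raise RuntimeError("MongoDB connection failed")

# Look up a single article by URL (assumed to return the document or None)
article = get_article_by_url("https://example.com/nvda-earnings")

# Fetch a batch by ObjectId, e.g. ids pulled from a Redis feed
if article is not None:
    batch = get_articles_by_ids([article["_id"]])

# find_recent also accepts a source filter
latest_technews = find_recent(limit=5, source="TechNews")
```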
- `Article` - Pydantic model with auto URL hashing
  - Auto-generates `urlHash` from the URL (UTM params removed)
  - Validates data structure and types
- `url_hash(url)` - Generate SHA256 hash from clean URL
- `utc_now()` - Get current UTC datetime
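
A short sketch of how the model and helpers fit together. It assumes `url_hash` and `utc_now` are exported from the top-level package and that the validated model exposes the hash as an `urlHash` attribute; adjust the imports to the actual module layout if they differ.

```python
from vynn_core import Article, url_hash, utc_now  # import path assumed

tracked_url = "https://example.com/nvda-earnings?utm_source=newsletter&utm_medium=email"

article = Article(
    url=tracked_url,
    title="NVIDIA Reports Strong Q4 Earnings",
    summary="Record revenue driven by AI chip demand...",
    source="TechNews",
    publishedAt=utc_now(),
    entities={"tickers": ["NVDA"], "keywords": ["earnings"]},
    quality={"llmScore": 8.5, "reason": "High relevance"},
)

# UTM parameters are stripped before hashing, so the tracked URL and the clean URL
# dedupe to the same document.
assert article.urlHash == url_hash("https://example.com/nvda-earnings")
```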
```bash
python test_functionality.py
python test_mongodb.py
```

```javascript
{
"_id": ObjectId,
"url": "https://example.com/article",
"urlHash": "sha256_hash_of_clean_url", // Unique index
"title": "Article Title",
"summary": "Article summary...",
"source": "Source Name",
"image": "https://example.com/image.jpg", // Optional
"publishedAt": ISODate,
"entities": {
"tickers": ["NVDA", "AAPL"],
"keywords": ["earnings", "AI"]
},
"quality": {
"llmScore": 8.5,
"reason": "High relevance and recent news"
},
"createdAt": ISODate,
"updatedAt": ISODate
}
```

- `urlHash` (unique) - For deduplication
- `publishedAt, source` (compound) - For recent queries
- `publishedAt` (descending) - For time-based queries
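
For orientation, `init_indexes()` presumably creates something equivalent to the pymongo calls below. This is a sketch only: the collection name `articles`, the connection details, and the exact key directions are assumptions, apart from the background index creation noted under performance below.

```python
from pymongo import MongoClient, ASCENDING, DESCENDING

# Collection name and connection details are assumptions for illustration.
db = MongoClient("mongodb://localhost:27017/")["your-database-name"]
articles = db["articles"]

# Unique hash of the normalized URL -> deduplication on upsert
articles.create_index([("urlHash", ASCENDING)], unique=True, background=True)

# Compound index for "recent articles from source X" queries
articles.create_index([("publishedAt", DESCENDING), ("source", ASCENDING)], background=True)

# Descending publish time for recency scans
articles.create_index([("publishedAt", DESCENDING)], background=True)
```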
The package includes comprehensive error handling and logging:
- Connection failures are logged and re-raised
- Invalid articles are skipped with logging
- Duplicate key errors are handled gracefully
- All database operations include try-catch blocks
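
In practice callers rarely need their own defensive code, but connection errors do propagate. A minimal calling-side sketch (the logger name and fallback behavior are illustrative, not part of the package):

```python
import logging
from pymongo.errors import PyMongoError

from vynn_core import init_indexes, upsert_articles

logger = logging.getLogger("ingest")

def safe_ingest(docs):
    """Upsert a batch; vynn_core skips invalid docs and absorbs duplicate keys."""
    try:
        init_indexes()  # idempotent, safe to call on every run
        return upsert_articles(docs)
    except PyMongoError:
        # Connection failures are logged inside vynn_core and re-raised,
        # so the caller decides whether to retry, queue, or fail the job.
        logger.exception("MongoDB unavailable; deferring ingest batch")
        return None
```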
- Uses MongoDB connection pooling
- Batch operations for efficiency
- Background index creation
- URL normalization removes UTM parameters
- Automatic deduplication by URL hash
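
The normalization step is internal to the package, but conceptually it amounts to something like the sketch below; the exact rules (for example, which query parameters besides `utm_*` are dropped) are assumptions.

```python
import hashlib
from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit

def normalize_and_hash(url: str) -> str:
    """Drop UTM tracking parameters, then SHA256-hash the cleaned URL."""
    scheme, netloc, path, query, _fragment = urlsplit(url)
    kept = [(k, v) for k, v in parse_qsl(query, keep_blank_values=True)
            if not k.lower().startswith("utm_")]
    clean = urlunsplit((scheme, netloc, path, urlencode(kept), ""))
    return hashlib.sha256(clean.encode("utf-8")).hexdigest()

# Both variants collapse to one hash, so upsert_articles keeps a single document.
assert normalize_and_hash("https://example.com/article?utm_source=x") == \
       normalize_and_hash("https://example.com/article")
```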
For detailed integration examples, see INTEGRATION.md.