OpsMate Implementation Plan
Refactor: ExecOps → OpsMate
Scope: Complete replacement (ExecOps becomes OpsMate)
Infra: Keep Docker (Neo4j/Postgres/Redis), add AWS Boto3
Sentinel: Keep PR compliance work
New Features: Night Watchman, Zombie Hunter, Access Guard
┌─────────────────────────────────────────────────────────────┐
│ OpsMate AI Service │
│ (FastAPI) │
├─────────────────────────────────────────────────────────────┤
│ Verticals: │
│ ├── Sentinel (PR Compliance) ← Keep existing │
│ ├── Watchman (Night Watchman) ← NEW │
│ ├── Hunter (Zombie Hunter) ← NEW │
│ └── Guard (Access Guard) ← NEW │
├─────────────────────────────────────────────────────────────┤
│ Integrations: │
│ ├── AWS Boto3 (EC2, EBS, IAM) │
│ ├── GitHub (Commits, Activity) │
│ ├── Linear (Tickets, Status) │
│ ├── Slack (Notifications, Actions) │
│ └── Lemon Squeezy (Subscriptions) │
├─────────────────────────────────────────────────────────────┤
│ Data Layer: │
│ ├── Neo4j (Graph Context: Devs, Repos, Instances) │
│ ├── PostgreSQL (User Config, SOPs, Webhooks) │
│ └── Redis (Celery Tasks, Caching) │
└─────────────────────────────────────────────────────────────┘
Phase 1: AWS Infrastructure Integration
1.1 AWS Boto3 Client (src/integrations/aws.py)
EC2 instance management (start/stop by tags)
EBS volume scanning (unattached volumes)
IAM user access tracking
Cost estimation utilities
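A minimal sketch of the start/stop-by-tags item above, assuming the wrapper is handed a standard boto3 EC2 client (for example `boto3.client("ec2")`). The function names are hypothetical, and pagination of `describe_instances` is ignored for brevity.

```python
from typing import Any


def find_instance_ids_by_tag(ec2_client: Any, key: str, value: str,
                             state: str = "running") -> list:
    """Return IDs of instances tagged key=value that are in the given state."""
    response = ec2_client.describe_instances(
        Filters=[
            {"Name": f"tag:{key}", "Values": [value]},
            {"Name": "instance-state-name", "Values": [state]},
        ]
    )
    return [
        instance["InstanceId"]
        for reservation in response["Reservations"]
        for instance in reservation["Instances"]
    ]


def stop_instances_by_tag(ec2_client: Any, key: str, value: str) -> list:
    """Stop every running instance tagged key=value and return the IDs."""
    ids = find_instance_ids_by_tag(ec2_client, key, value)
    if ids:  # stop_instances rejects an empty ID list
        ec2_client.stop_instances(InstanceIds=ids)
    return ids
```

Taking the client as a parameter keeps the helpers testable with a stub or with moto, without touching a real account.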
1.2 AWS Graph Schema Extension
(:EC2Instance {id, name, tag_env, tag_team, state})
(:EBSVolume {id, size, state, cost_monthly})
(:IAMUser {id, name, email, last_active})
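One way these nodes could be written to Neo4j is an idempotent, parameterized MERGE keyed on `id`. The helper below is a hypothetical sketch that only builds the query and its parameters; how they are executed depends on the existing graph module, which this plan does not show.

```python
EC2_FIELDS = ("id", "name", "tag_env", "tag_team", "state")


def ec2_instance_upsert(instance: dict) -> tuple:
    """Build a parameterized Cypher upsert for one (:EC2Instance) node."""
    query = (
        "MERGE (i:EC2Instance {id: $id}) "
        "SET i.name = $name, i.tag_env = $tag_env, "
        "i.tag_team = $tag_team, i.state = $state"
    )
    # Missing properties become None, which Neo4j stores as an absent property.
    params = {field: instance.get(field) for field in EC2_FIELDS}
    return query, params
```

Using MERGE rather than CREATE means repeated syncs update the same node instead of duplicating it.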
1.3 AWS Config Models (src/schemas/aws.py)
from dataclasses import dataclass
from typing import Optional

@dataclass
class EC2Instance:
    id: str
    name: str
    tag_env: str   # 'staging', 'production'
    tag_team: str
    state: str     # 'running', 'stopped'

@dataclass
class ZombieVolume:
    volume_id: str
    size_gb: int
    cost_monthly: float
    attached_instance: Optional[str]
Phase 2: Night Watchman Agent
2.1 Watchman Agent (src/agents/watchman/)
state.py: WatchmanState TypedDict
nodes.py: context_check, should_shutdown, execute_shutdown
graph.py: StateGraph with schedule trigger
async def should_shutdown(org_id: str) -> tuple[bool, str]:
    # 1. Check GitHub activity (Neo4j): developers with commits in the last 30 minutes.
    # Assumes r.timestamp is a Neo4j datetime and graph.query returns a list of row dicts.
    rows = await graph.query("""
        MATCH (d:Developer)-[:COMMITTED]->(r:Repo)
        WHERE r.timestamp > datetime() - duration({minutes: 30})
        RETURN count(DISTINCT d) AS active_devs
    """)
    active_devs = rows[0]["active_devs"] if rows else 0
    # 2. Check Linear urgency
    urgent_tickets = await linear.get_urgent_tickets()
    if active_devs > 0:
        return False, "Active commits detected"
    if urgent_tickets:
        return False, f"{len(urgent_tickets)} urgent tickets"
    return True, "Team is offline"
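The decision itself can be separated from the Neo4j and Linear calls so that it is testable without I/O. The sketch below assumes a shape for `WatchmanState`; the field names are hypothetical, since the plan only names the TypedDict.

```python
from typing import TypedDict


class WatchmanState(TypedDict):
    org_id: str
    active_devs: int       # developers with recent commits
    urgent_tickets: int    # open urgent Linear tickets
    should_shutdown: bool
    reason: str


def decide(state: WatchmanState) -> WatchmanState:
    """Pure decision node: reads the gathered context, returns an updated state."""
    if state["active_devs"] > 0:
        return {**state, "should_shutdown": False, "reason": "Active commits detected"}
    if state["urgent_tickets"] > 0:
        return {**state, "should_shutdown": False,
                "reason": f"{state['urgent_tickets']} urgent tickets"}
    return {**state, "should_shutdown": True, "reason": "Team is offline"}
```

A node of this form takes a state and returns a state, which is the shape graph-based agent frameworks generally expect, and the same function can be called directly in unit tests.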
2.3 Schedule Trigger (Celery Beat)
from celery import Celery
from celery.schedules import crontab

app = Celery()
app.conf.beat_schedule = {
    'night-watchman': {
        'task': 'watchman.run',
        'schedule': crontab(hour=20, minute=0),  # 8 PM
    },
}
Phase 3: Zombie Hunter Agent
3.1 Hunter Agent (src/agents/hunter/)
Scan for unattached EBS volumes
Calculate monthly waste
Generate Slack action blocks with delete buttons
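import boto3
from datetime import datetime, timedelta, timezone

async def find_zombies(org_id: str) -> list:
    """Return unattached volumes and old snapshots.

    boto3 calls block, so run this off the event loop in production.
    """
    # The collection API (.volumes, .snapshots) belongs to the boto3 resource, not the client
    ec2 = boto3.resource('ec2')
    zombies = []
    # Find unattached volumes
    for vol in ec2.volumes.filter(
        Filters=[{'Name': 'status', 'Values': ['available']}]
    ):
        zombies.append(ZombieVolume(
            volume_id=vol.id,
            size_gb=vol.size,
            cost_monthly=vol.size * 0.08,  # ~$0.08/GB/month
            attached_instance=None,
        ))
    # Find old snapshots (older than 30 days); start_time is timezone-aware
    cutoff = datetime.now(timezone.utc) - timedelta(days=30)
    for snap in ec2.snapshots.filter(OwnerIds=['self']):
        if snap.start_time < cutoff:
            # ZombieSnapshot still needs a model in src/schemas/aws.py;
            # snapshots expose volume_size (GiB), not size
            zombies.append(ZombieSnapshot(snap.id, cost_monthly=snap.volume_size * 0.05))
    return zombies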
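The cost arithmetic can be kept in one small helper. The sketch below assumes the flat rates from the comments above (USD 0.08 per GB-month for volumes, 0.05 for snapshots); real prices vary by region and volume type, and the names are hypothetical. `Decimal` avoids float rounding drift when amounts are summed.

```python
from decimal import Decimal, ROUND_HALF_UP

# Assumed flat rates in USD per GB-month, taken from the plan's comments.
EBS_RATE = Decimal("0.08")
SNAPSHOT_RATE = Decimal("0.05")


def monthly_cost(size_gb: int, rate: Decimal) -> Decimal:
    """Monthly cost of one resource, rounded to cents."""
    return (Decimal(size_gb) * rate).quantize(Decimal("0.01"), rounding=ROUND_HALF_UP)


def total_waste(volume_sizes_gb, snapshot_sizes_gb=()) -> Decimal:
    """Sum the monthly cost of all zombie volumes and snapshots."""
    volumes = sum((monthly_cost(s, EBS_RATE) for s in volume_sizes_gb), Decimal("0"))
    snapshots = sum((monthly_cost(s, SNAPSHOT_RATE) for s in snapshot_sizes_gb), Decimal("0"))
    return volumes + snapshots
```

Under these rates a 155 GB unattached volume comes to $12.40 per month.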
{
  "blocks": [
    {
      "type": "section",
      "text": {"type": "mrkdwn", "text": "🧟 *Zombie Found!*"}
    },
    {
      "type": "section",
      "fields": [
        {"type": "mrkdwn", "text": "*Volume:* `vol-12345`"},
        {"type": "mrkdwn", "text": "*Waste:* $12.40/month"}
      ]
    },
    {
      "type": "actions",
      "elements": [
        {"type": "button", "text": {"type": "plain_text", "text": "Delete ($12.40/mo)"}, "action_id": "delete_volume"}
      ]
    }
  ]
}
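A payload like the one above could be generated per volume. This is a sketch with a hypothetical function name; it puts the volume ID in the button's `value` so the interaction handler knows which volume was approved for deletion. Note that Block Kit button labels must be `plain_text` objects rather than bare strings.

```python
def zombie_alert_blocks(volume_id: str, cost_monthly: float) -> dict:
    """Build the Slack Block Kit payload for one zombie volume."""
    cost = f"${cost_monthly:.2f}"
    return {
        "blocks": [
            {"type": "section",
             "text": {"type": "mrkdwn", "text": "🧟 *Zombie Found!*"}},
            {"type": "section",
             "fields": [
                 {"type": "mrkdwn", "text": f"*Volume:* `{volume_id}`"},
                 {"type": "mrkdwn", "text": f"*Waste:* {cost}/month"},
             ]},
            {"type": "actions",
             "elements": [{
                 "type": "button",
                 "text": {"type": "plain_text", "text": f"Delete ({cost}/mo)"},
                 "style": "danger",
                 "action_id": "delete_volume",
                 "value": volume_id,  # echoed back in the interaction payload
             }]},
        ]
    }
```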
Phase 4: Access Guard Agent
4.1 Guard Agent (src/agents/guard/)
Monitor Slack workspace membership
Monitor GitHub org membership
Auto-revoke AWS IAM access on departure (with human approval)
4.2 Departure Detection (Safe with Grace Period)
def normalize_email(email: str) -> str:
    """Normalize email, handling plus addressing and aliases."""
    email = email.lower().strip()
    if '@' not in email:
        return email  # malformed address: return as-is instead of raising
    # Handle Gmail/Google aliases (user+tag@gmail.com)
    local, domain = email.split('@', 1)
    if domain in ('gmail.com', 'googlemail.com'):
        local = local.split('+')[0]
    return f"{local}@{domain}"
async def check_departures(org_id: str, grace_period_days: int = 7):
    """Check for departed users with configurable grace period.

    Args:
        org_id: Organization ID
        grace_period_days: Days to wait before flagging as departed
    """
    # Get current Slack members (with retry on transient failure)
    slack_members = await retry_slack_call(slack.get_workspace_members)
    # Get current GitHub org members (with retry on transient failure)
    gh_members = await retry_github_call(github.get_org_members)
    # Normalize and dedupe emails
    slack_emails = {normalize_email(m.email) for m in slack_members if m.email}
    gh_emails = {normalize_email(m.email) for m in gh_members if m.email}
    # Get IAM users
    iam_users = await aws.list_iam_users()
    departed = []
    for user in iam_users:
        email = normalize_email(user.email) if user.email else ""
        if not email:
            continue
        # Record which platforms the user is missing from (either one flags the user)
        missing_from = []
        if email not in slack_emails:
            missing_from.append("slack")
        if email not in gh_emails:
            missing_from.append("github")
        if missing_from:
            # TODO: apply the grace period here (e.g. compare user.last_active with
            # grace_period_days); for now the value is only passed to the approval step
            departed.append({
                "user": user,
                "missing_from": missing_from,
                "grace_period_days": grace_period_days,
            })
    return departed
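The grace period is passed through above but never evaluated. A minimal sketch of that check follows, assuming `last_active` is a timezone-aware datetime (as on the `:IAMUser` node) and that a user with no recorded activity counts as past the grace period. Both are assumptions, and the function name is hypothetical.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def past_grace_period(last_active: Optional[datetime],
                      grace_period_days: int,
                      now: Optional[datetime] = None) -> bool:
    """True when the user has been inactive for longer than the grace period."""
    now = now or datetime.now(timezone.utc)
    if last_active is None:
        # Assumption: no recorded activity is treated as already expired.
        return True
    return now - last_active > timedelta(days=grace_period_days)
```

Accepting `now` as a parameter keeps the function deterministic in tests. Because revocation still requires human approval, treating unknown activity as expired only produces a review request and does not remove access by itself.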
4.3 Manual Approval Flow (Safe, No Auto-Revocation)
from datetime import datetime, timezone

async def request_access_revocation_approval(departed_user: dict):
    """Send Slack approval request for human review.

    Never revoke access automatically - require explicit approval.
    """
    user = departed_user["user"]
    missing = ", ".join(departed_user["missing_from"])
    # 1. Log detection event to audit trail BEFORE any action
    await audit_log.log({
        "event": "departure_detected",
        "user": user.name,
        "email": user.email,
        "missing_from": missing,
        "grace_period": departed_user["grace_period_days"],
        "timestamp": datetime.now(timezone.utc),  # utcnow() is deprecated and naive
    })
    # 2. Send approval request to Slack
    approval_id = await slack.request_approval(
        channel="#security",
        title="AWS Access Revocation Request",
        blocks=[
            {
                "type": "section",
                "text": {
                    "type": "mrkdwn",
                    "text": f"*🚨 Departure Detected*\n"
                            f"User: `{user.name}` <mailto:{user.email}|{user.email}>\n"
                            f"Missing from: {missing}\n"
                            f"Grace period: {departed_user['grace_period_days']} days",
                },
            },
            {
                "type": "actions",
                "elements": [
                    # Block Kit button labels must be plain_text objects, not bare strings
                    {"type": "button", "text": {"type": "plain_text", "text": "✅ Approve Revocation"},
                     "style": "danger", "action_id": "approve_revocation"},
                    {"type": "button", "text": {"type": "plain_text", "text": "❌ Keep Access"},
                     "action_id": "keep_access"},
                ],
            },
        ],
    )
    return approval_id
async def revoke_access(user: IAMUser, approved_by: str):
    """Perform access revocation after human approval.

    All actions are logged to audit trail.
    """
    # 1. Log pre-revocation audit entry
    await audit_log.log({
        "event": "revocation_initiated",
        "user": user.name,
        "approved_by": approved_by,
        "timestamp": datetime.now(timezone.utc),
    })
    # 2. Remove from IAM groups
    removed_groups = await aws.remove_user_from_groups(user)
    await audit_log.log({
        "event": "groups_removed",
        "user": user.name,
        "groups": removed_groups,
    })
    # 3. Revoke access keys
    deleted_keys = await aws.delete_access_keys(user)
    await audit_log.log({
        "event": "access_keys_deleted",
        "user": user.name,
        "keys": deleted_keys,
    })
    # 4. Notify in Slack
    await slack.send_admin_alert(
        f"🔐 Revoked AWS access for {user.name} (approved by {approved_by})\n"
        f"Groups removed: {', '.join(removed_groups)}\n"
        f"Keys deleted: {len(deleted_keys)}"
    )
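The two AWS helpers called by `revoke_access` are not defined in this plan. One possible shape, sketched synchronously against a standard boto3 IAM client (for example `boto3.client("iam")`) and taking a user name rather than the `IAMUser` object, is shown below. Pagination is ignored and the signatures are assumptions.

```python
from typing import Any


def remove_user_from_groups(iam_client: Any, user_name: str) -> list:
    """Remove the user from every IAM group and return the group names."""
    groups = iam_client.list_groups_for_user(UserName=user_name)["Groups"]
    names = [group["GroupName"] for group in groups]
    for name in names:
        iam_client.remove_user_from_group(GroupName=name, UserName=user_name)
    return names


def delete_access_keys(iam_client: Any, user_name: str) -> list:
    """Delete all of the user's access keys and return the key IDs."""
    keys = iam_client.list_access_keys(UserName=user_name)["AccessKeyMetadata"]
    key_ids = [key["AccessKeyId"] for key in keys]
    for key_id in key_ids:
        iam_client.delete_access_key(UserName=user_name, AccessKeyId=key_id)
    return key_ids
```

Returning the affected names and key IDs gives the audit log entries above something concrete to record. An async wrapper would need to run these blocking calls in a thread.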
Phase 5: Lemon Squeezy Payments
5.1 Payment Integration (src/integrations/payments.py)
Webhook handler for subscription events
Customer portal link generation
Usage-based pricing tracking
5.2 Subscription Models (src/schemas/billing.py)
@dataclass
class Subscription :
id : str
customer_id : str
status : str # 'active', 'past_due', 'cancelled'
plan : str # 'starter', 'pro', 'enterprise'
monthly_cost : int # cents
@dataclass
class UsageRecord :
id : str
subscription_id : str
action_type : str # 'shutdown', 'zombie_found', 'access_revoked'
quantity : int
timestamp : datetime
5.3 Webhook Handler (Secure with Signature + Idempotency)
import asyncio
import hashlib
import hmac
import json

from fastapi import Depends, Request
from fastapi.responses import JSONResponse
from redis.asyncio import Redis  # async client (assumes get_redis provides one)

# Hold references to in-flight tasks so they are not garbage-collected mid-run
_background_tasks: set = set()

@app.post("/webhooks/lemonsqueezy")
async def lemonsqueezy_webhook(request: Request, redis: Redis = Depends(get_redis)):
    # 1. Verify signature (a missing header is treated as invalid)
    signature = request.headers.get('X-Signature', '')
    raw_body = await request.body()
    expected = hmac.new(
        settings.LEMON_SQUEEZY_SECRET.encode(),
        raw_body,
        hashlib.sha256,
    ).hexdigest()
    if not hmac.compare_digest(signature.encode(), expected.encode()):
        return JSONResponse(status_code=401, content={"error": "Invalid signature"})
    # 2. Idempotency check: SET NX claims the key atomically (24h TTL)
    payload = json.loads(raw_body)
    event_name = payload.get('meta', {}).get('event_name')  # nested under "meta"
    resource_id = payload.get('data', {}).get('id')
    idempotency_key = f"lemonsqueezy:{event_name}:{resource_id}"
    if not await redis.set(idempotency_key, "1", nx=True, ex=86400):
        return {"status": "already_processed"}
    # 3. Acknowledge immediately, process asynchronously
    task = asyncio.create_task(process_webhook_event(payload))
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)
    return {"status": "received"}

async def process_webhook_event(payload: dict):
    event_name = payload.get('meta', {}).get('event_name')
    try:
        if event_name == 'subscription_created':
            await handle_subscription_created(payload)
        elif event_name == 'subscription_cancelled':
            await handle_subscription_cancelled(payload)
        elif event_name == 'subscription_payment_failed':
            await handle_payment_failed(payload)
    except Exception as e:
        logger.error(f"Webhook processing failed: {e}")
        # Could retry via Celery or dead-letter queue
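The signature check can be pulled out into a pure function so it is testable without FastAPI or Redis. This is a sketch with a hypothetical function name; it assumes the header carries a hex-encoded HMAC-SHA256 of the raw request body, as the handler above does.

```python
import hashlib
import hmac
from typing import Optional


def verify_signature(secret: str, raw_body: bytes, signature: Optional[str]) -> bool:
    """Return True when signature is the hex HMAC-SHA256 of raw_body under secret."""
    if not signature:
        return False  # missing header: reject instead of raising inside compare_digest
    expected = hmac.new(secret.encode(), raw_body, hashlib.sha256).hexdigest()
    # Compare as bytes in constant time; bytes also tolerate non-ASCII header values.
    return hmac.compare_digest(signature.encode(), expected.encode())
```

The comparison must run against the raw bytes of the request body, not a re-serialized copy of the parsed JSON, since any change in whitespace or key order changes the digest.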
Phase 6: Updated Project Structure
ai-service/
├── src/ai_service/
│ ├── agents/
│ │ ├── sentinel/ # Keep existing (PR compliance)
│ │ ├── watchman/ # NEW: Night Watchman
│ │ │ ├── state.py
│ │ │ ├── nodes.py
│ │ │ └── graph.py
│ │ ├── hunter/ # NEW: Zombie Hunter
│ │ │ ├── state.py
│ │ │ ├── nodes.py
│ │ │ └── scanner.py
│ │ └── guard/ # NEW: Access Guard
│ │ ├── state.py
│ │ ├── nodes.py
│ │ └── detector.py
│ ├── integrations/
│ │ ├── github.py # Keep existing
│ │ ├── linear.py # Keep existing
│ │ ├── slack.py # Keep existing
│ │ ├── aws.py # NEW: Boto3 wrapper
│ │ └── payments.py # NEW: Lemon Squeezy
│ ├── memory/
│ │ ├── graph.py # Extend with AWS nodes
│ │ └── checkpoint.py # Keep existing
│ ├── schemas/
│ │ ├── aws.py # NEW: AWS models
│ │ ├── billing.py # NEW: Payment models
│ │ └── sentinel.py # Keep existing
│ └── tasks/
│ └── scheduler.py # NEW: Celery beat schedule
├── tests/
│ ├── test_sentinel.py # Keep existing
│ ├── test_watchman.py # NEW
│ ├── test_hunter.py # NEW
│ └── test_guard.py # NEW
└── pyproject.toml # Add boto3, lemonsqueezy
[dependencies]
# Existing...
"boto3>=1.35.0",
"botocore>=1.35.0",
"lemonsqueezy>=0.2.0", # Requires Python >= 3.10
# NOTE: lemonsqueezy is a thin wrapper around the Lemon Squeezy API.
# For production, consider using direct API calls (httpx) if stability
# is a concern, as the library may have limited maintenance.
[dev-dependencies]
# Existing...
"moto>=5.0.0", # AWS mocking for tests
Unit tests:
  Watchman: Context check logic, shutdown decision
  Hunter: Zombie detection, cost calculation
  Guard: Departure detection, revocation workflow
  AWS: Mock EC2, EBS, IAM operations
Integration tests:
  Full Watchman workflow (cron trigger → context → action)
  Hunter scan → Slack block → delete action
  Guard detection → revocation → notification
Evaluation:
  Watchman decision quality (shutdown vs keep online)
  Hunter cost accuracy
  Guard access revocation correctness
Feature                    Status          Tests
AWS Boto3 Integration      Not Started     0
Night Watchman Agent       Not Started     0
Zombie Hunter Agent        Not Started     0
Access Guard Agent         Not Started     0
Lemon Squeezy Payments     Not Started     0
Sentinel (PR Compliance)   Keep Existing   32
Phase             Files   Tests   Complexity
AWS Integration   1       8       Medium
Watchman Agent    3       12      High
Hunter Agent      3       8       Medium
Guard Agent       3       8       Medium
Payments          1       6       Low
Total             11      42      -