Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 32 additions & 33 deletions mcpgateway/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -2505,53 +2505,52 @@ async def admin_login_handler(request: Request, db: Session = Depends(get_db)) -
root_path = request.scope.get("root_path", "")
return RedirectResponse(url=f"{root_path}/admin/login?error=missing_fields", status_code=303)

# Authenticate using the email auth service
# First-Party
from mcpgateway.services.email_auth_service import EmailAuthService # pylint: disable=import-outside-toplevel # pylint: disable=import-outside-toplevel
from mcpgateway.services.email_auth_service import EmailAuthService

auth_service = EmailAuthService(db)

try:
# Authenticate user
LOGGER.debug(f"Attempting authentication for {email}")
user = await auth_service.authenticate_user(email, password)
LOGGER.debug(f"Authentication result: {user}")

if not user:
LOGGER.warning(f"Authentication failed for {email} - user is None")
root_path = request.scope.get("root_path", "")
return RedirectResponse(url=f"{root_path}/admin/login?error=invalid_credentials", status_code=303)

# Create JWT token with proper audience and issuer claims
# First-Party
from mcpgateway.routers.email_auth import create_access_token # pylint: disable=import-outside-toplevel
LOGGER.debug(f"Attempting authentication for {email}")
user = await auth_service.authenticate_user(email, password)
LOGGER.debug(f"Authentication result: {user}")

token, _ = await create_access_token(user) # expires_seconds not needed here

# Create redirect response
if not user:
root_path = request.scope.get("root_path", "")
response = RedirectResponse(url=f"{root_path}/admin", status_code=303)
return RedirectResponse(url=f"{root_path}/admin/login?error=invalid_credentials", status_code=303)

# First-Party
from mcpgateway.routers.email_auth import create_access_token

# Set JWT token as secure cookie
# First-Party
from mcpgateway.utils.security_cookies import set_auth_cookie # pylint: disable=import-outside-toplevel
token, _ = await create_access_token(user)

set_auth_cookie(response, token, remember_me=False)
root_path = request.scope.get("root_path", "")
response = RedirectResponse(url=f"{root_path}/admin", status_code=303)

LOGGER.info(f"Admin user {email} logged in successfully")
return response
# First-Party
from mcpgateway.utils.security_cookies import set_auth_cookie

except Exception as e:
LOGGER.warning(f"Login failed for {email}: {e}")
set_auth_cookie(response, token, remember_me=False)

if settings.secure_cookies and settings.environment == "development":
LOGGER.warning("Login failed - set SECURE_COOKIES to false in config for HTTP development")
# ✅ Set or clear the security reminder cookie
if email.lower() == "[email protected]" and password == "changeme":
response.set_cookie(
key="pwd_is_default",
value="true",
max_age=3600 * 24, # 1 day
httponly=False, # JS needs to read it
secure=False, # set True for HTTPS environments
samesite="Lax",
)
LOGGER.debug("Set cookie: pwd_is_default=true for [email protected]")
else:
response.delete_cookie("pwd_is_default")
LOGGER.debug("Cleared cookie: pwd_is_default")

root_path = request.scope.get("root_path", "")
return RedirectResponse(url=f"{root_path}/admin/login?error=invalid_credentials", status_code=303)
LOGGER.info(f"Admin user {email} logged in successfully")
return response

except Exception as e:
LOGGER.error(f"Login handler error: {e}")
LOGGER.exception(f"Login handler error: {e}")
root_path = request.scope.get("root_path", "")
return RedirectResponse(url=f"{root_path}/admin/login?error=server_error", status_code=303)

Expand Down
225 changes: 225 additions & 0 deletions mcpgateway/templates/admin.html
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@
rel="stylesheet"
/>
<link href="{{ root_path }}/static/admin.css" rel="stylesheet" />
<link
rel="stylesheet"
href="https://cdnjs.cloudflare.com/ajax/libs/font-awesome/6.4.0/css/all.min.css"
/>
<style>
/* HTMX indicator visibility for bulk import only */
#bulk-import-indicator {
Expand Down Expand Up @@ -95,6 +99,25 @@

<body class="bg-gray-100 dark:bg-gray-900">
<div class="w-full max-w-screen-2xl mx-auto px-4 py-8">
<!-- Security reminder shown only when local admin email is entered -->
<div
id="security-reminder"
class="mb-6 p-4 bg-amber-50 dark:bg-amber-900/20 border border-amber-200 dark:border-amber-800 text-amber-700 dark:text-amber-400 rounded-xl text-sm hidden"
data-admin-email="{{ platform_admin_email | default('[email protected]') }}"
data-user-mgmt-href="{{ root_path }}/admin/users"
>
<div class="flex items-start">
<i class="fas fa-shield-alt mr-2 mt-0.5 flex-shrink-0"></i>
<div class="space-y-1">
<div class="font-medium">Security reminder</div>
<div class="text-sm">
Please change your password in the User Management page.
<a id="user-mgmt-link" class="underline font-medium" href="{{ root_path }}/admin/users">Open User Management</a>.
</div>
</div>
</div>
</div>

<header
id="sticky-header"
class="sticky top-0 left-0 right-0 z-50 w-full"
Expand Down Expand Up @@ -11454,6 +11477,208 @@ <h3 class="text-lg font-medium text-gray-900 dark:text-white mb-4">
document.body.removeChild(a);
URL.revokeObjectURL(url);
}

// Security reminder functionality for admin page
document.addEventListener('DOMContentLoaded', function() {
const banner = document.getElementById('security-reminder');
if (!banner) return;

const adminEmail = (banner.getAttribute('data-admin-email') || '[email protected]').toLowerCase();
const userMgmtLink = document.getElementById('user-mgmt-link');
const userMgmtHref = banner.getAttribute('data-user-mgmt-href');
if (userMgmtLink && userMgmtHref) userMgmtLink.href = userMgmtHref;

// Extract current user email from header
const currentUserElement = document.querySelector('.header-user');
let currentUserEmail = '';
if (currentUserElement) {
const textContent = currentUserElement.textContent || currentUserElement.innerText || '';
currentUserEmail = (textContent.replace(/\s+/g, ' ').trim().toLowerCase().split(' ').pop() || '');
}

// Helper already defined above
function getCookie(name) {
const value = '; ' + document.cookie;
const parts = value.split('; ' + name + '=');
if (parts.length === 2) return parts.pop().split(';').shift();
return '';
}


// Infer provider from either server-provided value or JWT cookie
function inferAuthProvider() {
try {
// Prefer server-provided hint if available
if (window.AUTH_PROVIDER && typeof window.AUTH_PROVIDER === 'string') {
return window.AUTH_PROVIDER.toLowerCase();
}

// Try to decode JWT cookie
const token = getCookie('jwt_token');
if (!token) return '';

const parts = token.split('.');
if (parts.length !== 3) return '';

// Base64URL decode payload
const b64 = parts[1].replace(/-/g, '+').replace(/_/g, '/');
const payloadJson = JSON.parse(atob(b64));
if (!payloadJson) return '';

const raw =
(payloadJson.auth_provider ||
payloadJson.provider ||
payloadJson.idp ||
payloadJson.iss ||
'').toString().toLowerCase();

// Normalize common SSO signals
if (!raw) return '';
if (
raw.includes('google') ||
raw.includes('okta') ||
raw.includes('github') ||
raw.includes('oidc') ||
raw.includes('oauth') ||
raw.includes('saml') ||
raw.includes('ibm') ||
raw.includes('verify')
) {
return 'sso';
}
if (raw.includes('local') || raw.includes('email') || raw.includes('password')) {
return 'local';
}

// If 'iss' is a URL, classify common IdPs as SSO
if (raw.startsWith('http')) {
if (
raw.includes('google') ||
raw.includes('okta') ||
raw.includes('github') ||
raw.includes('verify.ibm.com')
) {
return 'sso';
}
}

// Unknown means we cannot confidently classify
return '';
} catch {
return '';
}
}

const provider = inferAuthProvider();
const isSSO =
provider === 'sso' ||
['google', 'okta', 'github', 'ibm', 'verify', 'oidc', 'oauth', 'saml'].some(p => provider.includes(p));


// let currentPassword = '';
// try {
// const token = getCookie('jwt_token');
// if (token) {
// const payload = JSON.parse(atob(token.split('.')[1]));
// currentPassword = (payload.password || '').toLowerCase();
// }
// } catch {
// currentPassword = '';
// }

// Fallback: check if backend sets pwd_is_default cookie
const pwdIsDefault = getCookie('pwd_is_default') === 'true';


// Show banner only if the logged-in user is the platform admin AND the session is NOT SSO
const shouldShow =
!!currentUserEmail &&
currentUserEmail === adminEmail &&
!isSSO &&
pwdIsDefault;

if (shouldShow) {
banner.classList.remove('hidden');
} else {
banner.classList.add('hidden');
}
});
</script>
<!-- Add this helper just before </body> -->
<script>
(function () {
function activateUsersPanel() {
// Try to find the Users panel by common selectors
const usersPanel =
document.getElementById('users-panel') ||
document.querySelector('[data-panel="users"]');

if (usersPanel) {
// Hide other panels if a tabbed SPA structure exists
const panels = document.querySelectorAll('.tab-panel,[data-panel]');
if (panels.length) {
panels.forEach((p) => {
const isUsers = p === usersPanel || p.getAttribute('data-panel') === 'users';
p.classList.toggle('hidden', !isUsers);
});
} else {
// Minimal fallback: ensure users panel is visible
usersPanel.classList.remove('hidden');
}

// Optional: mark Users tab active if present
const maybeTabs = document.querySelectorAll(
'a[data-testid="users-tab"], nav a[href="#users"]'
);
if (maybeTabs.length) {
maybeTabs.forEach((a) => a.classList.add('active'));
}
}
}

function routeToUsers(e) {
if (e) e.preventDefault();
// If already on admin SPA, just switch panels
if (location.pathname.endsWith('/admin')) {
if (location.hash !== '#users') history.replaceState(null, '', '#users');
// Defer to allow any SPA init to complete
setTimeout(activateUsersPanel, 0);
} else {
// Fallback: navigate to SPA with deep-link
location.href = '{{ root_path }}/admin#users';
}
}

function fixUserLinks() {
// Normalize any links pointing to /admin/users → #users and intercept click
const selectors = [
'a[href="/admin/users"]',
'a[href$="/admin/users"]',
'a[href^="{{ root_path }}/admin/users"]',
'#user-mgmt-link'
];
document.querySelectorAll(selectors.join(',')).forEach((a) => {
try {
a.setAttribute('href', '#users');
a.addEventListener('click', routeToUsers, { capture: true });
} catch {}
});
}

document.addEventListener('DOMContentLoaded', () => {
fixUserLinks();

// Open Users panel on deep-link
if (location.hash === '#users') {
setTimeout(activateUsersPanel, 0);
}

// Keep SPA in sync on hash changes
window.addEventListener('hashchange', () => {
if (location.hash === '#users') activateUsersPanel();
});
});
})();
</script>
</body>
</html>
23 changes: 15 additions & 8 deletions tests/unit/mcpgateway/cache/test_resource_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,23 @@ def test_get_missing(cache):


def test_expiration(cache):
"""Test that cache entry expires after TTL."""
# Use a more generous sleep duration to account for timing variability
cache.set("foo", "bar")

# Sleep for 1.5 seconds (50% longer than TTL) to ensure expiration
# This accounts for system load, clock precision, and floating point issues
"""Test that a cache entry expires after its TTL."""

# Set a test key with a known short TTL (0.5 seconds)
test_key = "foo"
test_value = "bar"

# Set the cache entry
cache.set(test_key, test_value)

# Immediately check the value exists
assert cache.get(test_key) == test_value

# Sleep slightly longer than TTL to ensure expiration
time.sleep(1.5)

# Entry should definitely be expired now
assert cache.get("foo") is None
# Now the cache entry should be expired
assert cache.get(test_key) is None


def test_lru_eviction(cache):
Expand Down
Loading