Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 68 additions & 51 deletions mcpgateway/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -2505,53 +2505,52 @@ async def admin_login_handler(request: Request, db: Session = Depends(get_db)) -
root_path = request.scope.get("root_path", "")
return RedirectResponse(url=f"{root_path}/admin/login?error=missing_fields", status_code=303)

# Authenticate using the email auth service
# First-Party
from mcpgateway.services.email_auth_service import EmailAuthService # pylint: disable=import-outside-toplevel # pylint: disable=import-outside-toplevel
from mcpgateway.services.email_auth_service import EmailAuthService

auth_service = EmailAuthService(db)

try:
# Authenticate user
LOGGER.debug(f"Attempting authentication for {email}")
user = await auth_service.authenticate_user(email, password)
LOGGER.debug(f"Authentication result: {user}")

if not user:
LOGGER.warning(f"Authentication failed for {email} - user is None")
root_path = request.scope.get("root_path", "")
return RedirectResponse(url=f"{root_path}/admin/login?error=invalid_credentials", status_code=303)

# Create JWT token with proper audience and issuer claims
# First-Party
from mcpgateway.routers.email_auth import create_access_token # pylint: disable=import-outside-toplevel
LOGGER.debug(f"Attempting authentication for {email}")
user = await auth_service.authenticate_user(email, password)
LOGGER.debug(f"Authentication result: {user}")

token, _ = await create_access_token(user) # expires_seconds not needed here

# Create redirect response
if not user:
root_path = request.scope.get("root_path", "")
response = RedirectResponse(url=f"{root_path}/admin", status_code=303)
return RedirectResponse(url=f"{root_path}/admin/login?error=invalid_credentials", status_code=303)

# Set JWT token as secure cookie
# First-Party
from mcpgateway.utils.security_cookies import set_auth_cookie # pylint: disable=import-outside-toplevel
# First-Party
from mcpgateway.routers.email_auth import create_access_token

set_auth_cookie(response, token, remember_me=False)
token, _ = await create_access_token(user)

LOGGER.info(f"Admin user {email} logged in successfully")
return response
root_path = request.scope.get("root_path", "")
response = RedirectResponse(url=f"{root_path}/admin", status_code=303)

except Exception as e:
LOGGER.warning(f"Login failed for {email}: {e}")
# First-Party
from mcpgateway.utils.security_cookies import set_auth_cookie

if settings.secure_cookies and settings.environment == "development":
LOGGER.warning("Login failed - set SECURE_COOKIES to false in config for HTTP development")
set_auth_cookie(response, token, remember_me=False)

root_path = request.scope.get("root_path", "")
return RedirectResponse(url=f"{root_path}/admin/login?error=invalid_credentials", status_code=303)
# ✅ Set or clear the security reminder cookie
if email.lower() == "[email protected]" and password == "changeme":
response.set_cookie(
key="pwd_is_default",
value="true",
max_age=3600 * 24, # 1 day
httponly=False, # JS needs to read it
secure=False, # set True for HTTPS environments
samesite="Lax",
)
LOGGER.debug("Set cookie: pwd_is_default=true for [email protected]")
else:
response.delete_cookie("pwd_is_default")
LOGGER.debug("Cleared cookie: pwd_is_default")

LOGGER.info(f"Admin user {email} logged in successfully")
return response

except Exception as e:
LOGGER.error(f"Login handler error: {e}")
LOGGER.exception(f"Login handler error: {e}")
root_path = request.scope.get("root_path", "")
return RedirectResponse(url=f"{root_path}/admin/login?error=server_error", status_code=303)

Expand Down Expand Up @@ -4377,7 +4376,8 @@ async def admin_update_user(
db: Session = Depends(get_db),
_user=Depends(get_current_user_with_permissions),
) -> HTMLResponse:
"""Update user via admin UI.
"""
Update user via admin UI.

Args:
user_email: Email of user to update
Expand All @@ -4392,53 +4392,70 @@ async def admin_update_user(

try:
# First-Party
from mcpgateway.services.email_auth_service import EmailAuthService # pylint: disable=import-outside-toplevel # pylint: disable=import-outside-toplevel
from mcpgateway.services.email_auth_service import EmailAuthService # pylint: disable=import-outside-toplevel

auth_service = EmailAuthService(db)

# URL decode the email

decoded_email = urllib.parse.unquote(user_email)

form = await request.form()
full_name = form.get("full_name")
is_admin = form.get("is_admin") == "on"
password = form.get("password")
confirm_password = form.get("confirm_password")

# Validate password confirmation if password is being changed
# Validate password confirmation
if password and password != confirm_password:
return HTMLResponse(content='<div class="text-red-500">Passwords do not match</div>', status_code=400)

# Check if trying to remove admin privileges from last admin
# Prevent removing admin role from the last admin
user_obj = await auth_service.get_user_by_email(decoded_email)
if user_obj and user_obj.is_admin and not is_admin:
# This user is currently an admin and we're trying to remove admin privileges
if await auth_service.is_last_active_admin(decoded_email):
return HTMLResponse(content='<div class="text-red-500">Cannot remove administrator privileges from the last remaining admin user</div>', status_code=400)

# Update user
fn_val = form.get("full_name")
pw_val = form.get("password")
full_name = fn_val if isinstance(fn_val, str) else None
password = pw_val if isinstance(pw_val, str) else None
await auth_service.update_user(email=decoded_email, full_name=full_name, is_admin=is_admin, password=password if password else None)
# ✅ Update user record
await auth_service.update_user(
email=decoded_email,
full_name=full_name if isinstance(full_name, str) else None,
is_admin=is_admin,
password=password if isinstance(password, str) and password else None,
)

# Return success message with auto-close and refresh
# ✅ Create a response object
success_html = """
<div class="text-green-500 text-center p-4">
<p>User updated successfully</p>
<script>
// Hide banner immediately if password was updated
document.dispatchEvent(new Event('passwordUpdated'));
setTimeout(() => {
// Close the modal
hideUserEditModal();
// Refresh the users list
htmx.trigger(document.getElementById('users-list'), 'load');
}, 1500);
</script>
</div>
"""
return HTMLResponse(content=success_html)

response = HTMLResponse(content=success_html)

# ✅ Manage "pwd_is_default" cookie for [email protected]
if decoded_email.lower() == "[email protected]":
if password == "changeme":
response.set_cookie(
key="pwd_is_default",
value="true",
max_age=3600 * 24, # 1 day
httponly=False, # allow JS read
secure=False, # True for HTTPS
samesite="Lax",
)
LOGGER.debug("Set cookie: pwd_is_default=true (default password restored)")
elif password: # password updated to something else
response.delete_cookie("pwd_is_default")
LOGGER.debug("Cleared cookie: pwd_is_default (non-default password set)")

LOGGER.info(f"User {decoded_email} updated successfully")
return response

except Exception as e:
LOGGER.error(f"Error updating user {user_email}: {e}")
Expand Down
Loading