diff --git a/mcpgateway/admin.py b/mcpgateway/admin.py index 5f22df961..8a6110893 100644 --- a/mcpgateway/admin.py +++ b/mcpgateway/admin.py @@ -2505,53 +2505,52 @@ async def admin_login_handler(request: Request, db: Session = Depends(get_db)) - root_path = request.scope.get("root_path", "") return RedirectResponse(url=f"{root_path}/admin/login?error=missing_fields", status_code=303) - # Authenticate using the email auth service # First-Party - from mcpgateway.services.email_auth_service import EmailAuthService # pylint: disable=import-outside-toplevel # pylint: disable=import-outside-toplevel + from mcpgateway.services.email_auth_service import EmailAuthService auth_service = EmailAuthService(db) - try: - # Authenticate user - LOGGER.debug(f"Attempting authentication for {email}") - user = await auth_service.authenticate_user(email, password) - LOGGER.debug(f"Authentication result: {user}") - - if not user: - LOGGER.warning(f"Authentication failed for {email} - user is None") - root_path = request.scope.get("root_path", "") - return RedirectResponse(url=f"{root_path}/admin/login?error=invalid_credentials", status_code=303) - - # Create JWT token with proper audience and issuer claims - # First-Party - from mcpgateway.routers.email_auth import create_access_token # pylint: disable=import-outside-toplevel + LOGGER.debug(f"Attempting authentication for {email}") + user = await auth_service.authenticate_user(email, password) + LOGGER.debug(f"Authentication result: {user}") - token, _ = await create_access_token(user) # expires_seconds not needed here - - # Create redirect response + if not user: root_path = request.scope.get("root_path", "") - response = RedirectResponse(url=f"{root_path}/admin", status_code=303) + return RedirectResponse(url=f"{root_path}/admin/login?error=invalid_credentials", status_code=303) - # Set JWT token as secure cookie - # First-Party - from mcpgateway.utils.security_cookies import set_auth_cookie # pylint: disable=import-outside-toplevel + # First-Party + from mcpgateway.routers.email_auth import create_access_token - set_auth_cookie(response, token, remember_me=False) + token, _ = await create_access_token(user) - LOGGER.info(f"Admin user {email} logged in successfully") - return response + root_path = request.scope.get("root_path", "") + response = RedirectResponse(url=f"{root_path}/admin", status_code=303) - except Exception as e: - LOGGER.warning(f"Login failed for {email}: {e}") + # First-Party + from mcpgateway.utils.security_cookies import set_auth_cookie - if settings.secure_cookies and settings.environment == "development": - LOGGER.warning("Login failed - set SECURE_COOKIES to false in config for HTTP development") + set_auth_cookie(response, token, remember_me=False) - root_path = request.scope.get("root_path", "") - return RedirectResponse(url=f"{root_path}/admin/login?error=invalid_credentials", status_code=303) + # ✅ Set or clear the security reminder cookie + if email.lower() == "admin@example.com" and password == "changeme": + response.set_cookie( + key="pwd_is_default", + value="true", + max_age=3600 * 24, # 1 day + httponly=False, # JS needs to read it + secure=False, # set True for HTTPS environments + samesite="Lax", + ) + LOGGER.debug("Set cookie: pwd_is_default=true for admin@example.com") + else: + response.delete_cookie("pwd_is_default") + LOGGER.debug("Cleared cookie: pwd_is_default") + + LOGGER.info(f"Admin user {email} logged in successfully") + return response except Exception as e: - LOGGER.error(f"Login handler error: {e}") + LOGGER.exception(f"Login handler error: {e}") root_path = request.scope.get("root_path", "") return RedirectResponse(url=f"{root_path}/admin/login?error=server_error", status_code=303) @@ -4377,7 +4376,8 @@ async def admin_update_user( db: Session = Depends(get_db), _user=Depends(get_current_user_with_permissions), ) -> HTMLResponse: - """Update user via admin UI. + """ + Update user via admin UI. Args: user_email: Email of user to update @@ -4392,53 +4392,70 @@ async def admin_update_user( try: # First-Party - from mcpgateway.services.email_auth_service import EmailAuthService # pylint: disable=import-outside-toplevel # pylint: disable=import-outside-toplevel + from mcpgateway.services.email_auth_service import EmailAuthService # pylint: disable=import-outside-toplevel auth_service = EmailAuthService(db) - # URL decode the email - decoded_email = urllib.parse.unquote(user_email) - form = await request.form() full_name = form.get("full_name") is_admin = form.get("is_admin") == "on" password = form.get("password") confirm_password = form.get("confirm_password") - # Validate password confirmation if password is being changed + # Validate password confirmation if password and password != confirm_password: return HTMLResponse(content='
Passwords do not match
', status_code=400) - # Check if trying to remove admin privileges from last admin + # Prevent removing admin role from the last admin user_obj = await auth_service.get_user_by_email(decoded_email) if user_obj and user_obj.is_admin and not is_admin: - # This user is currently an admin and we're trying to remove admin privileges if await auth_service.is_last_active_admin(decoded_email): return HTMLResponse(content='
Cannot remove administrator privileges from the last remaining admin user
', status_code=400) - # Update user - fn_val = form.get("full_name") - pw_val = form.get("password") - full_name = fn_val if isinstance(fn_val, str) else None - password = pw_val if isinstance(pw_val, str) else None - await auth_service.update_user(email=decoded_email, full_name=full_name, is_admin=is_admin, password=password if password else None) + # ✅ Update user record + await auth_service.update_user( + email=decoded_email, + full_name=full_name if isinstance(full_name, str) else None, + is_admin=is_admin, + password=password if isinstance(password, str) and password else None, + ) - # Return success message with auto-close and refresh + # ✅ Create a response object success_html = """

User updated successfully

""" - return HTMLResponse(content=success_html) + + response = HTMLResponse(content=success_html) + + # ✅ Manage "pwd_is_default" cookie for admin@example.com + if decoded_email.lower() == "admin@example.com": + if password == "changeme": + response.set_cookie( + key="pwd_is_default", + value="true", + max_age=3600 * 24, # 1 day + httponly=False, # allow JS read + secure=False, # True for HTTPS + samesite="Lax", + ) + LOGGER.debug("Set cookie: pwd_is_default=true (default password restored)") + elif password: # password updated to something else + response.delete_cookie("pwd_is_default") + LOGGER.debug("Cleared cookie: pwd_is_default (non-default password set)") + + LOGGER.info(f"User {decoded_email} updated successfully") + return response except Exception as e: LOGGER.error(f"Error updating user {user_email}: {e}") diff --git a/mcpgateway/templates/admin.html b/mcpgateway/templates/admin.html index 15746df80..163bd08be 100644 --- a/mcpgateway/templates/admin.html +++ b/mcpgateway/templates/admin.html @@ -63,6 +63,10 @@ rel="stylesheet" /> +