diff --git a/tests/unit/admin/test_routes.py b/tests/unit/admin/test_routes.py index 061278902b7a..796baf3c4b34 100644 --- a/tests/unit/admin/test_routes.py +++ b/tests/unit/admin/test_routes.py @@ -228,6 +228,16 @@ def test_includeme(): "/admin/ip-addresses/{ip_address}", domain=warehouse, ), + pretend.call( + "admin.ip_address.ban", + "/admin/ip-addresses/{ip_address}/ban", + domain=warehouse, + ), + pretend.call( + "admin.ip_address.unban", + "/admin/ip-addresses/{ip_address}/unban", + domain=warehouse, + ), pretend.call("admin.project.list", "/admin/projects/", domain=warehouse), pretend.call( "admin.project.detail", diff --git a/tests/unit/admin/views/test_ipaddresses.py b/tests/unit/admin/views/test_ipaddresses.py index 5c6b408f3d0a..7e90fdd2f75c 100644 --- a/tests/unit/admin/views/test_ipaddresses.py +++ b/tests/unit/admin/views/test_ipaddresses.py @@ -1,14 +1,16 @@ # SPDX-License-Identifier: Apache-2.0 +from datetime import datetime + import pretend import pytest -from pyramid.httpexceptions import HTTPBadRequest +from pyramid.httpexceptions import HTTPBadRequest, HTTPSeeOther from tests.common.db.accounts import UserUniqueLoginFactory from tests.common.db.ip_addresses import IpAddressFactory from warehouse.admin.views import ip_addresses as ip_views -from warehouse.ip_addresses.models import IpAddress +from warehouse.ip_addresses.models import BanReason, IpAddress class TestIpAddressList: @@ -75,3 +77,71 @@ def test_ip_address_found_with_unique_logins(self, db_request): "ip_address": db_request.ip_address, "unique_logins": [unique_login], } + + +class TestBanIpAddress: + def test_ban_ip_address_no_ip_address(self, db_request): + db_request.matchdict["ip_address"] = None + + with pytest.raises(HTTPBadRequest): + ip_views.ban_ip(db_request) + + def test_ban_ip_address_not_found(self, db_request): + db_request.matchdict["ip_address"] = "69.69.69.69" + + with pytest.raises(HTTPBadRequest): + ip_views.ban_ip(db_request) + + def test_ban_ip_address_banned(self, db_request): + ip_address = IpAddressFactory.create(is_banned=False) + db_request.matchdict["ip_address"] = str(ip_address.ip_address) + db_request.route_path = pretend.stub( + __call__=( + lambda *args, **kwargs: f"/admin/ip-addresses/{ip_address.ip_address}" + ) + ) + db_request.session.flash = pretend.call_recorder(lambda *args, **kwargs: None) + + resp = ip_views.ban_ip(db_request) + + assert isinstance(resp, HTTPSeeOther) + assert resp.location == f"/admin/ip-addresses/{ip_address.ip_address}" + assert ip_address.is_banned + assert ip_address.ban_reason == BanReason.ADMINISTRATIVE + assert ip_address.ban_date is not None + + +class TestUnbanIpAddress: + def test_unban_ip_address_no_ip_address(self, db_request): + db_request.matchdict["ip_address"] = None + + with pytest.raises(HTTPBadRequest): + ip_views.unban_ip(db_request) + + def test_unban_ip_address_not_found(self, db_request): + db_request.matchdict["ip_address"] = "69.69.69.69" + + with pytest.raises(HTTPBadRequest): + ip_views.unban_ip(db_request) + + def test_unban_ip_address_unbanned(self, db_request): + ip_address = IpAddressFactory.create( + is_banned=True, + ban_reason=BanReason.ADMINISTRATIVE, + ban_date=datetime.utcnow(), + ) + db_request.matchdict["ip_address"] = str(ip_address.ip_address) + db_request.route_path = pretend.stub( + __call__=( + lambda *args, **kwargs: f"/admin/ip-addresses/{ip_address.ip_address}" + ) + ) + db_request.session.flash = pretend.call_recorder(lambda *args, **kwargs: None) + + resp = ip_views.unban_ip(db_request) + + assert isinstance(resp, HTTPSeeOther) + assert resp.location == f"/admin/ip-addresses/{ip_address.ip_address}" + assert not ip_address.is_banned + assert ip_address.ban_reason is None + assert ip_address.ban_date is None diff --git a/tests/unit/test_config.py b/tests/unit/test_config.py index 872efe0f0c3b..e51e2d64fb48 100644 --- a/tests/unit/test_config.py +++ b/tests/unit/test_config.py @@ -559,6 +559,7 @@ def test_root_factory_access_control_list(): Permissions.AdminFlagsRead, Permissions.AdminFlagsWrite, Permissions.AdminIpAddressesRead, + Permissions.AdminIpAddressesWrite, Permissions.AdminJournalRead, Permissions.AdminMacaroonsRead, Permissions.AdminMacaroonsWrite, diff --git a/warehouse/admin/routes.py b/warehouse/admin/routes.py index 8012ca66ddf2..1c2e07e82c6c 100644 --- a/warehouse/admin/routes.py +++ b/warehouse/admin/routes.py @@ -230,6 +230,16 @@ def includeme(config): "/admin/ip-addresses/{ip_address}", domain=warehouse, ) + config.add_route( + "admin.ip_address.ban", + "/admin/ip-addresses/{ip_address}/ban", + domain=warehouse, + ) + config.add_route( + "admin.ip_address.unban", + "/admin/ip-addresses/{ip_address}/unban", + domain=warehouse, + ) # Project related Admin pages config.add_route("admin.project.list", "/admin/projects/", domain=warehouse) diff --git a/warehouse/admin/templates/admin/ip_addresses/detail.html b/warehouse/admin/templates/admin/ip_addresses/detail.html index 6350122027ef..e58dfe3c97a0 100644 --- a/warehouse/admin/templates/admin/ip_addresses/detail.html +++ b/warehouse/admin/templates/admin/ip_addresses/detail.html @@ -38,9 +38,24 @@

IpAddress Record

{{ ip_address.ban_date }}
Ban Reason:
-
{{ ip_address.ban_reason.value }}
+
{{ ip_address.ban_reason.value if ip_address.ban_reason is not none else 'None' }}
+
diff --git a/warehouse/admin/views/ip_addresses.py b/warehouse/admin/views/ip_addresses.py index 66b87370feda..c27f796dfead 100644 --- a/warehouse/admin/views/ip_addresses.py +++ b/warehouse/admin/views/ip_addresses.py @@ -4,14 +4,16 @@ import typing +from datetime import datetime + from paginate_sqlalchemy import SqlalchemyOrmPage as SQLAlchemyORMPage -from pyramid.httpexceptions import HTTPBadRequest +from pyramid.httpexceptions import HTTPBadRequest, HTTPSeeOther from pyramid.view import view_config from sqlalchemy.exc import NoResultFound from warehouse.accounts.models import UserUniqueLogin from warehouse.authnz import Permissions -from warehouse.ip_addresses.models import IpAddress +from warehouse.ip_addresses.models import BanReason, IpAddress from warehouse.utils.paginate import paginate_url_factory if typing.TYPE_CHECKING: @@ -66,3 +68,59 @@ def ip_address_detail(request: Request) -> dict[str, IpAddress]: ) return {"ip_address": ip_address, "unique_logins": unique_logins} + + +@view_config( + route_name="admin.ip_address.ban", + permission=Permissions.AdminIpAddressesWrite, + request_method="POST", + uses_session=True, + require_methods=["POST"], +) +def ban_ip(request: Request): + ip_address_str = request.matchdict["ip_address"] + try: + ip_address = ( + request.db.query(IpAddress).filter_by(ip_address=ip_address_str).one() + ) + except NoResultFound: + raise HTTPBadRequest("No matching IP Address found.") + + ip_address.is_banned = True + ip_address.ban_reason = BanReason.ADMINISTRATIVE + ip_address.ban_date = datetime.utcnow() + + request.session.flash(f"Banned IP address {ip_address.ip_address}", queue="success") + + return HTTPSeeOther( + request.route_path("admin.ip_address.detail", ip_address=ip_address.ip_address) + ) + + +@view_config( + route_name="admin.ip_address.unban", + permission=Permissions.AdminIpAddressesWrite, + request_method="POST", + uses_session=True, + require_methods=["POST"], +) +def unban_ip(request: Request): + ip_address_str = request.matchdict["ip_address"] + try: + ip_address = ( + request.db.query(IpAddress).filter_by(ip_address=ip_address_str).one() + ) + except NoResultFound: + raise HTTPBadRequest("No matching IP Address found.") + + ip_address.is_banned = False + ip_address.ban_reason = None + ip_address.ban_date = None + + request.session.flash( + f"Unbanned IP address {ip_address.ip_address}", queue="success" + ) + + return HTTPSeeOther( + request.route_path("admin.ip_address.detail", ip_address=ip_address.ip_address) + ) diff --git a/warehouse/authnz/_permissions.py b/warehouse/authnz/_permissions.py index d1e443ac623e..def4203bc151 100644 --- a/warehouse/authnz/_permissions.py +++ b/warehouse/authnz/_permissions.py @@ -39,6 +39,7 @@ class Permissions(StrEnum): AdminFlagsWrite = "admin:flags:write" AdminIpAddressesRead = "admin:ip-addresses:read" + AdminIpAddressesWrite = "admin:ip-addresses:write" AdminJournalRead = "admin:journal:read" AdminMacaroonsRead = "admin:macaroons:read" diff --git a/warehouse/config.py b/warehouse/config.py index aa677803f04a..cfb75ea184c3 100644 --- a/warehouse/config.py +++ b/warehouse/config.py @@ -69,6 +69,7 @@ class RootFactory: Permissions.AdminFlagsRead, Permissions.AdminFlagsWrite, Permissions.AdminIpAddressesRead, + Permissions.AdminIpAddressesWrite, Permissions.AdminJournalRead, Permissions.AdminMacaroonsRead, Permissions.AdminMacaroonsWrite, diff --git a/warehouse/ip_addresses/models.py b/warehouse/ip_addresses/models.py index 0143716d92af..0878f9d317f5 100644 --- a/warehouse/ip_addresses/models.py +++ b/warehouse/ip_addresses/models.py @@ -21,6 +21,7 @@ class BanReason(enum.Enum): AUTHENTICATION_ATTEMPTS = "authentication-attempts" + ADMINISTRATIVE = "administrative" class IpAddress(db.Model): diff --git a/warehouse/migrations/versions/31ac9b5e1e8b_add_administrative_to_banreason_enum.py b/warehouse/migrations/versions/31ac9b5e1e8b_add_administrative_to_banreason_enum.py new file mode 100644 index 000000000000..7ce968cf5429 --- /dev/null +++ b/warehouse/migrations/versions/31ac9b5e1e8b_add_administrative_to_banreason_enum.py @@ -0,0 +1,47 @@ +# SPDX-License-Identifier: Apache-2.0 +""" +Add ADMINISTRATIVE to BanReason enum + +Revision ID: 31ac9b5e1e8b +Revises: a6cae8e65f1a +Create Date: 2025-12-22 18:19:43.751813 +""" + + +from alembic import op +from alembic_postgresql_enum import TableReference + +revision = "31ac9b5e1e8b" +down_revision = "a6cae8e65f1a" + + +def upgrade(): + op.sync_enum_values( + enum_schema="public", + enum_name="banreason", + new_values=["authentication-attempts", "administrative"], + affected_columns=[ + TableReference( + table_schema="public", + table_name="ip_addresses", + column_name="ban_reason", + ) + ], + enum_values_to_rename=[], + ) + + +def downgrade(): + op.sync_enum_values( + enum_schema="public", + enum_name="banreason", + new_values=["authentication-attempts"], + affected_columns=[ + TableReference( + table_schema="public", + table_name="ip_addresses", + column_name="ban_reason", + ) + ], + enum_values_to_rename=[], + )