Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions tests/unit/admin/test_routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,16 @@ def test_includeme():
"/admin/ip-addresses/{ip_address}",
domain=warehouse,
),
pretend.call(
"admin.ip_address.ban",
"/admin/ip-addresses/{ip_address}/ban",
domain=warehouse,
),
pretend.call(
"admin.ip_address.unban",
"/admin/ip-addresses/{ip_address}/unban",
domain=warehouse,
),
pretend.call("admin.project.list", "/admin/projects/", domain=warehouse),
pretend.call(
"admin.project.detail",
Expand Down
74 changes: 72 additions & 2 deletions tests/unit/admin/views/test_ipaddresses.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
# SPDX-License-Identifier: Apache-2.0

from datetime import datetime

import pretend
import pytest

from pyramid.httpexceptions import HTTPBadRequest
from pyramid.httpexceptions import HTTPBadRequest, HTTPSeeOther

from tests.common.db.accounts import UserUniqueLoginFactory
from tests.common.db.ip_addresses import IpAddressFactory
from warehouse.admin.views import ip_addresses as ip_views
from warehouse.ip_addresses.models import IpAddress
from warehouse.ip_addresses.models import BanReason, IpAddress


class TestIpAddressList:
Expand Down Expand Up @@ -75,3 +77,71 @@ def test_ip_address_found_with_unique_logins(self, db_request):
"ip_address": db_request.ip_address,
"unique_logins": [unique_login],
}


class TestBanIpAddress:
def test_ban_ip_address_no_ip_address(self, db_request):
db_request.matchdict["ip_address"] = None

with pytest.raises(HTTPBadRequest):
ip_views.ban_ip(db_request)

def test_ban_ip_address_not_found(self, db_request):
db_request.matchdict["ip_address"] = "69.69.69.69"

with pytest.raises(HTTPBadRequest):
ip_views.ban_ip(db_request)

def test_ban_ip_address_banned(self, db_request):
ip_address = IpAddressFactory.create(is_banned=False)
db_request.matchdict["ip_address"] = str(ip_address.ip_address)
db_request.route_path = pretend.stub(
__call__=(
lambda *args, **kwargs: f"/admin/ip-addresses/{ip_address.ip_address}"
)
)
db_request.session.flash = pretend.call_recorder(lambda *args, **kwargs: None)

resp = ip_views.ban_ip(db_request)

assert isinstance(resp, HTTPSeeOther)
assert resp.location == f"/admin/ip-addresses/{ip_address.ip_address}"
assert ip_address.is_banned
assert ip_address.ban_reason == BanReason.ADMINISTRATIVE
assert ip_address.ban_date is not None


class TestUnbanIpAddress:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

question: worth adding a test case and handling for unbanning an already-unbanned entry, or are we fine with doing the unban operation regardless of current state?

def test_unban_ip_address_no_ip_address(self, db_request):
db_request.matchdict["ip_address"] = None

with pytest.raises(HTTPBadRequest):
ip_views.unban_ip(db_request)

def test_unban_ip_address_not_found(self, db_request):
db_request.matchdict["ip_address"] = "69.69.69.69"

with pytest.raises(HTTPBadRequest):
ip_views.unban_ip(db_request)

def test_unban_ip_address_unbanned(self, db_request):
ip_address = IpAddressFactory.create(
is_banned=True,
ban_reason=BanReason.ADMINISTRATIVE,
ban_date=datetime.utcnow(),
)
db_request.matchdict["ip_address"] = str(ip_address.ip_address)
db_request.route_path = pretend.stub(
__call__=(
lambda *args, **kwargs: f"/admin/ip-addresses/{ip_address.ip_address}"
)
)
db_request.session.flash = pretend.call_recorder(lambda *args, **kwargs: None)

resp = ip_views.unban_ip(db_request)

assert isinstance(resp, HTTPSeeOther)
assert resp.location == f"/admin/ip-addresses/{ip_address.ip_address}"
assert not ip_address.is_banned
assert ip_address.ban_reason is None
assert ip_address.ban_date is None
1 change: 1 addition & 0 deletions tests/unit/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -559,6 +559,7 @@ def test_root_factory_access_control_list():
Permissions.AdminFlagsRead,
Permissions.AdminFlagsWrite,
Permissions.AdminIpAddressesRead,
Permissions.AdminIpAddressesWrite,
Permissions.AdminJournalRead,
Permissions.AdminMacaroonsRead,
Permissions.AdminMacaroonsWrite,
Expand Down
10 changes: 10 additions & 0 deletions warehouse/admin/routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,16 @@ def includeme(config):
"/admin/ip-addresses/{ip_address}",
domain=warehouse,
)
config.add_route(
"admin.ip_address.ban",
"/admin/ip-addresses/{ip_address}/ban",
domain=warehouse,
)
config.add_route(
"admin.ip_address.unban",
"/admin/ip-addresses/{ip_address}/unban",
domain=warehouse,
)

# Project related Admin pages
config.add_route("admin.project.list", "/admin/projects/", domain=warehouse)
Expand Down
17 changes: 16 additions & 1 deletion warehouse/admin/templates/admin/ip_addresses/detail.html
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,24 @@ <h3 class="card-title"><code>IpAddress</code> Record</h3>
<dd class="col-sm-8">{{ ip_address.ban_date }}</dd>

<dt class="col-sm-4">Ban Reason:</dt>
<dd class="col-sm-8">{{ ip_address.ban_reason.value }}</dd>
<dd class="col-sm-8">{{ ip_address.ban_reason.value if ip_address.ban_reason is not none else 'None' }}</dd>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was reading more about Jinja2 filters recently, and I think this could also be expressed as:

Suggested change
<dd class="col-sm-8">{{ ip_address.ban_reason.value if ip_address.ban_reason is not none else 'None' }}</dd>
<dd class="col-sm-8">{{ ip_address.ban_reason.value | default('None') }}</dd>

but that means that the .value call would have failed a lookup on None.

Maybe we should instead change the BanReason from an Enum to a StrEnum, like we do in other places, and thus don't need to refer to it by .value?

A larger refactor, but probably worthwhile to consider as a follow-up.

</dl>
</div> <!-- /.card-body -->
<div class="card-footer">
{% if request.has_permission(Permissions.AdminIpAddressesWrite) %}
{% if not ip_address.is_banned %}
<form method="POST" action="{{ request.route_path('admin.ip_address.ban', ip_address=ip_address.ip_address) }}">
<input name="csrf_token" type="hidden" value="{{ request.session.get_csrf_token() }}">
<button type="submit" class="btn btn-danger">Ban this IP address</button>
</form>
{% else %}
<form method="POST" action="{{ request.route_path('admin.ip_address.unban', ip_address=ip_address.ip_address) }}">
<input name="csrf_token" type="hidden" value="{{ request.session.get_csrf_token() }}">
<button type="submit" class="btn btn-warning">Unban this IP address</button>
</form>
{% endif %}
{% endif %}
</div> <!-- /.card-footer -->
</div> <!-- /.card -->

<div class="card">
Expand Down
62 changes: 60 additions & 2 deletions warehouse/admin/views/ip_addresses.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,16 @@

import typing

from datetime import datetime

from paginate_sqlalchemy import SqlalchemyOrmPage as SQLAlchemyORMPage
from pyramid.httpexceptions import HTTPBadRequest
from pyramid.httpexceptions import HTTPBadRequest, HTTPSeeOther
from pyramid.view import view_config
from sqlalchemy.exc import NoResultFound

from warehouse.accounts.models import UserUniqueLogin
from warehouse.authnz import Permissions
from warehouse.ip_addresses.models import IpAddress
from warehouse.ip_addresses.models import BanReason, IpAddress
from warehouse.utils.paginate import paginate_url_factory

if typing.TYPE_CHECKING:
Expand Down Expand Up @@ -66,3 +68,59 @@ def ip_address_detail(request: Request) -> dict[str, IpAddress]:
)

return {"ip_address": ip_address, "unique_logins": unique_logins}


@view_config(
route_name="admin.ip_address.ban",
permission=Permissions.AdminIpAddressesWrite,
request_method="POST",
uses_session=True,
require_methods=["POST"],
)
def ban_ip(request: Request):
ip_address_str = request.matchdict["ip_address"]
try:
ip_address = (
request.db.query(IpAddress).filter_by(ip_address=ip_address_str).one()
)
except NoResultFound:
raise HTTPBadRequest("No matching IP Address found.")

ip_address.is_banned = True
ip_address.ban_reason = BanReason.ADMINISTRATIVE
ip_address.ban_date = datetime.utcnow()

request.session.flash(f"Banned IP address {ip_address.ip_address}", queue="success")

return HTTPSeeOther(
request.route_path("admin.ip_address.detail", ip_address=ip_address.ip_address)
)


@view_config(
route_name="admin.ip_address.unban",
permission=Permissions.AdminIpAddressesWrite,
request_method="POST",
uses_session=True,
require_methods=["POST"],
)
def unban_ip(request: Request):
ip_address_str = request.matchdict["ip_address"]
try:
ip_address = (
request.db.query(IpAddress).filter_by(ip_address=ip_address_str).one()
)
except NoResultFound:
raise HTTPBadRequest("No matching IP Address found.")

ip_address.is_banned = False
ip_address.ban_reason = None
ip_address.ban_date = None

request.session.flash(
f"Unbanned IP address {ip_address.ip_address}", queue="success"
)

return HTTPSeeOther(
request.route_path("admin.ip_address.detail", ip_address=ip_address.ip_address)
)
1 change: 1 addition & 0 deletions warehouse/authnz/_permissions.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ class Permissions(StrEnum):
AdminFlagsWrite = "admin:flags:write"

AdminIpAddressesRead = "admin:ip-addresses:read"
AdminIpAddressesWrite = "admin:ip-addresses:write"
AdminJournalRead = "admin:journal:read"

AdminMacaroonsRead = "admin:macaroons:read"
Expand Down
1 change: 1 addition & 0 deletions warehouse/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ class RootFactory:
Permissions.AdminFlagsRead,
Permissions.AdminFlagsWrite,
Permissions.AdminIpAddressesRead,
Permissions.AdminIpAddressesWrite,
Permissions.AdminJournalRead,
Permissions.AdminMacaroonsRead,
Permissions.AdminMacaroonsWrite,
Expand Down
1 change: 1 addition & 0 deletions warehouse/ip_addresses/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

class BanReason(enum.Enum):
AUTHENTICATION_ATTEMPTS = "authentication-attempts"
ADMINISTRATIVE = "administrative"


class IpAddress(db.Model):
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
# SPDX-License-Identifier: Apache-2.0
"""
Add ADMINISTRATIVE to BanReason enum

Revision ID: 31ac9b5e1e8b
Revises: a6cae8e65f1a
Create Date: 2025-12-22 18:19:43.751813
"""


from alembic import op
from alembic_postgresql_enum import TableReference

revision = "31ac9b5e1e8b"
down_revision = "a6cae8e65f1a"


def upgrade():
op.sync_enum_values(
enum_schema="public",
enum_name="banreason",
new_values=["authentication-attempts", "administrative"],
affected_columns=[
TableReference(
table_schema="public",
table_name="ip_addresses",
column_name="ban_reason",
)
],
enum_values_to_rename=[],
)


def downgrade():
op.sync_enum_values(
enum_schema="public",
enum_name="banreason",
new_values=["authentication-attempts"],
affected_columns=[
TableReference(
table_schema="public",
table_name="ip_addresses",
column_name="ban_reason",
)
],
enum_values_to_rename=[],
)