diff --git a/tests/unit/manage/test_views.py b/tests/unit/manage/test_views.py index 9f8f2f723350..d45ba2d3757e 100644 --- a/tests/unit/manage/test_views.py +++ b/tests/unit/manage/test_views.py @@ -7551,6 +7551,90 @@ def test_add_oidc_publisher_already_registered_with_project( ) ] + def test_add_oidc_publisher_already_registered_after_normalization( + self, monkeypatch, db_request + ): + publisher = GitHubPublisher( + repository_name="some-repository", + repository_owner="some-owner", + repository_owner_id="666", + workflow_filename="some-workflow-filename.yml", + environment="some-environment", + ) + post_body = MultiDict( + { + "owner": "some-owner", + "repository": "some-repository", + "workflow_filename": "some-workflow-filename.yml", + "environment": "SOME-environment", + } + ) + db_request.user = UserFactory.create() + EmailFactory(user=db_request.user, verified=True, primary=True) + db_request.db.add(publisher) + db_request.db.flush() # To get it in the DB + + project = pretend.stub( + name="fakeproject", + oidc_publishers=[publisher], + record_event=pretend.call_recorder(lambda *a, **kw: None), + ) + + db_request.registry = pretend.stub( + settings={ + "github.token": "fake-api-token", + } + ) + db_request.flags = pretend.stub( + disallow_oidc=pretend.call_recorder(lambda f=None: False) + ) + db_request.session = pretend.stub( + flash=pretend.call_recorder(lambda *a, **kw: None) + ) + db_request.POST = post_body + + view = views.ManageOIDCPublisherViews(project, db_request) + monkeypatch.setattr( + views.GitHubPublisherForm, + "_lookup_owner", + lambda *a: {"login": "some-owner", "id": "some-owner-id"}, + ) + + monkeypatch.setattr( + view, "_hit_ratelimits", pretend.call_recorder(lambda: None) + ) + monkeypatch.setattr( + view, "_check_ratelimits", pretend.call_recorder(lambda: None) + ) + + assert view.add_github_oidc_publisher() == { + "disabled": { + "GitHub": False, + "GitLab": False, + "Google": False, + "ActiveState": False, + }, + "project": project, + "github_publisher_form": view.github_publisher_form, + "gitlab_publisher_form": view.gitlab_publisher_form, + "google_publisher_form": view.google_publisher_form, + "activestate_publisher_form": view.activestate_publisher_form, + "prefilled_provider": view.prefilled_provider, + } + assert view.metrics.increment.calls == [ + pretend.call( + "warehouse.oidc.add_publisher.attempt", + tags=["publisher:GitHub"], + ), + ] + assert project.record_event.calls == [] + assert db_request.session.flash.calls == [ + pretend.call( + f"{str(publisher)} is already registered with fakeproject", + queue="error", + ) + ] + @pytest.mark.parametrize( ("view_name", "publisher_name"), [ diff --git a/tests/unit/oidc/models/test_activestate.py b/tests/unit/oidc/models/test_activestate.py index 8797b2d451fe..1902ab039f03 100644 --- a/tests/unit/oidc/models/test_activestate.py +++ b/tests/unit/oidc/models/test_activestate.py @@ -37,14 +37,6 @@ SUBJECT = f"org:{ORG_URL_NAME}:project:{PROJECT_NAME}" -def test_lookup_strategies(): - assert ( - len(ActiveStatePublisher.__lookup_strategies__) - == len(PendingActiveStatePublisher.__lookup_strategies__) - == 1 - ) - - def new_signed_claims( sub: str = SUBJECT, actor: str = ACTOR, @@ -381,6 +373,13 @@ def test_exists(self, db_request, exists_in_db): assert publisher.exists(db_request.db) == exists_in_db + def test_lookup_no_matching_publishers(self, db_request): + signed_claims = new_signed_claims(actor_id="my_id") + + with pytest.raises(InvalidPublisherError) as e: + ActiveStatePublisher.lookup_by_claims(db_request.db, signed_claims) + assert str(e.value) == "Publisher with matching claims was not found" + class TestPendingActiveStatePublisher: def test_reify_does_not_exist_yet(self, db_request): diff --git a/tests/unit/oidc/models/test_core.py b/tests/unit/oidc/models/test_core.py index 0832e4dddd9c..bd1ed8080969 100644 --- a/tests/unit/oidc/models/test_core.py +++ b/tests/unit/oidc/models/test_core.py @@ -43,9 +43,8 @@ def test_check_claim_invariant(): class TestOIDCPublisher: def test_lookup_by_claims_raises(self): - with pytest.raises(errors.InvalidPublisherError) as e: + with pytest.raises(NotImplementedError): _core.OIDCPublisher.lookup_by_claims(pretend.stub(), pretend.stub()) - assert str(e.value) == "All lookup strategies exhausted" def test_oidc_publisher_not_default_verifiable(self): publisher = _core.OIDCPublisher(projects=[]) diff --git a/tests/unit/oidc/models/test_github.py b/tests/unit/oidc/models/test_github.py index aafa23ff4f46..e095596d2663 100644 --- a/tests/unit/oidc/models/test_github.py +++ b/tests/unit/oidc/models/test_github.py @@ -96,13 +96,6 @@ def test_check_sub(claim): class TestGitHubPublisher: - def test_lookup_strategies(self): - assert ( - len(github.GitHubPublisher.__lookup_strategies__) - == len(github.PendingGitHubPublisher.__lookup_strategies__) - == 2 - ) - @pytest.mark.parametrize("environment", [None, "some_environment"]) def test_lookup_fails_invalid_workflow_ref(self, environment): signed_claims = { @@ -116,7 +109,8 @@ def test_lookup_fails_invalid_workflow_ref(self, environment): # The `job_workflow_ref` is malformed, so no queries are performed. with pytest.raises( - errors.InvalidPublisherError, match="All lookup strategies exhausted" + errors.InvalidPublisherError, + match="Could not job extract workflow filename from OIDC claims", ): github.GitHubPublisher.lookup_by_claims(pretend.stub(), signed_claims) @@ -165,6 +159,28 @@ def test_lookup_escapes(self, db_request, environment, workflow_a, workflow_b): == workflow ) + def test_lookup_no_matching_publishers(self, db_request): + GitHubPublisherFactory( + id="aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa", + repository_owner="foo", + repository_name="bar", + repository_owner_id="1234", + workflow_filename="release.yml", + environment="environment", + ) + signed_claims = { + "repository": "foo/bar", + "job_workflow_ref": ( + "foo/bar/.github/workflows/release.yml@refs/heads/main" + ), + "repository_owner_id": "1234", + "environment": "another_environment", + } + + with pytest.raises(errors.InvalidPublisherError) as e: + github.GitHubPublisher.lookup_by_claims(db_request.db, signed_claims) + assert str(e.value) == "Publisher with matching claims was not found" + def test_github_publisher_all_known_claims(self): assert github.GitHubPublisher.all_known_claims() == { # required verifiable claims diff --git a/tests/unit/oidc/models/test_gitlab.py b/tests/unit/oidc/models/test_gitlab.py index 0202a3e7b893..0ff5708f43b6 100644 --- a/tests/unit/oidc/models/test_gitlab.py +++ b/tests/unit/oidc/models/test_gitlab.py @@ -64,13 +64,6 @@ def test_check_sub(claim): class TestGitLabPublisher: - def test_lookup_strategies(self): - assert ( - len(gitlab.GitLabPublisher.__lookup_strategies__) - == len(gitlab.PendingGitLabPublisher.__lookup_strategies__) - == 2 - ) - @pytest.mark.parametrize("environment", [None, "some_environment"]) def test_lookup_fails_invalid_ci_config_ref_uri(self, environment): signed_claims = { @@ -83,7 +76,8 @@ def test_lookup_fails_invalid_ci_config_ref_uri(self, environment): # The `ci_config_ref_uri` is malformed, so no queries are performed. with pytest.raises( - errors.InvalidPublisherError, match="All lookup strategies exhausted" + errors.InvalidPublisherError, + match="Could not extract workflow filename from OIDC claims", ): gitlab.GitLabPublisher.lookup_by_claims(pretend.stub(), signed_claims) @@ -131,6 +125,15 @@ def test_lookup_escapes( == workflow_filepath ) + def test_lookup_no_matching_publisher(self, db_request): + signed_claims = { + "project_path": "foo/bar", + "ci_config_ref_uri": ("gitlab.com/foo/bar//.gitlab-ci.yml@refs/heads/main"), + } + with pytest.raises(errors.InvalidPublisherError) as e: + gitlab.GitLabPublisher.lookup_by_claims(db_request.db, signed_claims) + assert str(e.value) == "Publisher with matching claims was not found" + def test_gitlab_publisher_all_known_claims(self): assert gitlab.GitLabPublisher.all_known_claims() == { # required verifiable claims diff --git a/tests/unit/oidc/models/test_google.py b/tests/unit/oidc/models/test_google.py index 04a3c0a2de20..a63886d306a7 100644 --- a/tests/unit/oidc/models/test_google.py +++ b/tests/unit/oidc/models/test_google.py @@ -19,14 +19,6 @@ from warehouse.oidc.models import _core, google -def test_lookup_strategies(): - assert ( - len(google.GooglePublisher.__lookup_strategies__) - == len(google.PendingGooglePublisher.__lookup_strategies__) - == 2 - ) - - class TestGooglePublisher: def test_publisher_name(self): publisher = google.GooglePublisher(email="fake@example.com") @@ -196,6 +188,16 @@ def test_google_publisher_sub_is_optional(self, expected_sub, actual_sub, valid) ) assert str(e.value) == "Check failed for optional claim 'sub'" + def test_lookup_no_matching_publishers(self, db_request): + signed_claims = { + "email": "fake@example.com", + "email_verified": True, + } + + with pytest.raises(errors.InvalidPublisherError) as e: + google.GooglePublisher.lookup_by_claims(db_request.db, signed_claims) + assert str(e.value) == "Publisher with matching claims was not found" + @pytest.mark.parametrize("exists_in_db", [True, False]) def test_exists(self, db_request, exists_in_db): publisher = google.GooglePublisher( diff --git a/warehouse/oidc/models/_core.py b/warehouse/oidc/models/_core.py index 4cbc3511c2d1..7ffd2dc7bafc 100644 --- a/warehouse/oidc/models/_core.py +++ b/warehouse/oidc/models/_core.py @@ -13,7 +13,7 @@ from __future__ import annotations from collections.abc import Callable -from typing import TYPE_CHECKING, Any, TypedDict, TypeVar, Unpack +from typing import TYPE_CHECKING, Any, Self, TypedDict, TypeVar, Unpack import rfc3986 import sentry_sdk @@ -162,32 +162,15 @@ class OIDCPublisherMixin: # required and optional attributes, and thus can't be naively looked # up from a raw claim set. # - # Each subclass should explicitly override this list to contain - # class methods that take a `SignedClaims` and return a SQLAlchemy - # expression that, when queried, should produce exactly one or no result. - # This list should be ordered by specificity, e.g. selecting for the - # expression with the most optional constraints first, and ending with - # the expression with only required constraints. + # Each subclass should explicitly override this method, which takes + # a set of claims (`SignedClaims`) and returns a Publisher. + # In case that multiple publishers satisfy the given claims, the + # most specific publisher should be the one returned, i.e. the one with + # the most optional constraints satisfied. # - # TODO(ww): In principle this list is computable directly from - # `__required_verifiable_claims__` and `__optional_verifiable_claims__`, - # but there are a few problems: those claim sets don't map to their - # "equivalent" column (only to an instantiated property), and may not - # even have an "equivalent" column. - __lookup_strategies__: list = [] - @classmethod - def lookup_by_claims(cls, session, signed_claims: SignedClaims): - for lookup in cls.__lookup_strategies__: - query = lookup(cls, signed_claims) - if not query: - # We might not build a query if we know the claim set can't - # satisfy it. If that's the case, then we skip. - continue - - if publisher := query.with_session(session).one_or_none(): - return publisher - raise InvalidPublisherError("All lookup strategies exhausted") + def lookup_by_claims(cls, session, signed_claims: SignedClaims) -> Self: + raise NotImplementedError @classmethod def all_known_claims(cls) -> set[str]: @@ -244,7 +227,7 @@ def check_claims_existence(cls, signed_claims: SignedClaims) -> None: with sentry_sdk.new_scope() as scope: scope.fingerprint = [claim_name] sentry_sdk.capture_message( - f"JWT for {cls.__name__} is missing claim: " f"{claim_name}" + f"JWT for {cls.__name__} is missing claim: {claim_name}" ) raise InvalidPublisherError(f"Missing claim {claim_name!r}") diff --git a/warehouse/oidc/models/activestate.py b/warehouse/oidc/models/activestate.py index 9544e6ac762f..936c2b43a8eb 100644 --- a/warehouse/oidc/models/activestate.py +++ b/warehouse/oidc/models/activestate.py @@ -13,7 +13,7 @@ import urllib -from typing import Any +from typing import Any, Self from sqlalchemy import ForeignKey, String, UniqueConstraint, and_, exists from sqlalchemy.dialects.postgresql import UUID @@ -92,18 +92,6 @@ class ActiveStatePublisherMixin: "project_visibility", } - @staticmethod - def __lookup_all__(klass, signed_claims: SignedClaims): - return Query(klass).filter_by( - organization=signed_claims["organization"], - activestate_project_name=signed_claims["project"], - actor_id=signed_claims["actor_id"], - ) - - __lookup_strategies__ = [ - __lookup_all__, - ] - @property def sub(self) -> str: return f"org:{self.organization}:project:{self.activestate_project_name}" @@ -147,6 +135,17 @@ def exists(self, session) -> bool: ) ).scalar() + @classmethod + def lookup_by_claims(cls, session, signed_claims: SignedClaims) -> Self: + query: Query = Query(cls).filter_by( + organization=signed_claims["organization"], + activestate_project_name=signed_claims["project"], + actor_id=signed_claims["actor_id"], + ) + if publisher := query.with_session(session).one_or_none(): + return publisher + raise InvalidPublisherError("Publisher with matching claims was not found") + class ActiveStatePublisher(ActiveStatePublisherMixin, OIDCPublisher): __tablename__ = "activestate_oidc_publishers" diff --git a/warehouse/oidc/models/github.py b/warehouse/oidc/models/github.py index e728fbab3b11..aaa1beca02fc 100644 --- a/warehouse/oidc/models/github.py +++ b/warehouse/oidc/models/github.py @@ -12,8 +12,9 @@ import re -from typing import Any +from typing import Any, Self +from more_itertools import first_true from pypi_attestations import GitHubPublisher as GitHubIdentity, Publisher from sqlalchemy import ForeignKey, String, UniqueConstraint, and_, exists from sqlalchemy.dialects.postgresql import UUID @@ -192,49 +193,50 @@ class GitHubPublisherMixin: "ref_protected", } - @staticmethod - def __lookup_all__(klass, signed_claims: SignedClaims) -> Query | None: - # This lookup requires the environment claim to be present; - # if it isn't, bail out early. - if not (environment := signed_claims.get("environment")): - return None - - repository = signed_claims["repository"] - repository_owner, repository_name = repository.split("/", 1) - workflow_ref = signed_claims["job_workflow_ref"] - - if not (workflow_filename := _extract_workflow_filename(workflow_ref)): - return None + # Get the most specific publisher from a list of publishers, + # where publishers constrained with an environment are more + # specific than publishers not constrained on environment. + @classmethod + def _get_publisher_for_environment( + cls, publishers: list[Self], environment: str | None + ) -> Self | None: + if environment: + if specific_publisher := first_true( + publishers, pred=lambda p: p.environment == environment.lower() + ): + return specific_publisher + + if general_publisher := first_true( + publishers, pred=lambda p: p.environment == "" + ): + return general_publisher - return Query(klass).filter_by( - repository_name=repository_name, - repository_owner=repository_owner, - repository_owner_id=signed_claims["repository_owner_id"], - environment=environment.lower(), - workflow_filename=workflow_filename, - ) + return None - @staticmethod - def __lookup_no_environment__(klass, signed_claims: SignedClaims) -> Query | None: + @classmethod + def lookup_by_claims(cls, session, signed_claims: SignedClaims) -> Self: repository = signed_claims["repository"] repository_owner, repository_name = repository.split("/", 1) - workflow_ref = signed_claims["job_workflow_ref"] + job_workflow_ref = signed_claims["job_workflow_ref"] + environment = signed_claims.get("environment") - if not (workflow_filename := _extract_workflow_filename(workflow_ref)): - return None + if not (job_workflow_filename := _extract_workflow_filename(job_workflow_ref)): + raise InvalidPublisherError( + "Could not job extract workflow filename from OIDC claims" + ) - return Query(klass).filter_by( + query: Query = Query(cls).filter_by( repository_name=repository_name, repository_owner=repository_owner, repository_owner_id=signed_claims["repository_owner_id"], - environment="", - workflow_filename=workflow_filename, + workflow_filename=job_workflow_filename, ) + publishers = query.with_session(session).all() - __lookup_strategies__ = [ - __lookup_all__, - __lookup_no_environment__, - ] + if publisher := cls._get_publisher_for_environment(publishers, environment): + return publisher + else: + raise InvalidPublisherError("Publisher with matching claims was not found") @property def _workflow_slug(self): diff --git a/warehouse/oidc/models/gitlab.py b/warehouse/oidc/models/gitlab.py index baf25a921091..5ec9916d7f33 100644 --- a/warehouse/oidc/models/gitlab.py +++ b/warehouse/oidc/models/gitlab.py @@ -12,8 +12,9 @@ import re -from typing import Any +from typing import Any, Self +from more_itertools import first_true from pypi_attestations import GitLabPublisher as GitLabIdentity, Publisher from sqlalchemy import ForeignKey, String, UniqueConstraint, and_, exists from sqlalchemy.dialects.postgresql import UUID @@ -188,47 +189,48 @@ class GitLabPublisherMixin: "groups_direct", } - @staticmethod - def __lookup_all__(klass, signed_claims: SignedClaims) -> Query | None: - # This lookup requires the environment claim to be present; - # if it isn't, bail out early. - if not (environment := signed_claims.get("environment")): - return None - - project_path = signed_claims["project_path"] - ci_config_ref_uri = signed_claims["ci_config_ref_uri"] - namespace, project = project_path.rsplit("/", 1) - - if not (workflow_filepath := _extract_workflow_filepath(ci_config_ref_uri)): - return None + # Get the most specific publisher from a list of publishers, + # where publishers constrained with an environment are more + # specific than publishers not constrained on environment. + @classmethod + def _get_publisher_for_environment( + cls, publishers: list[Self], environment: str | None + ) -> Self | None: + if environment: + if specific_publisher := first_true( + publishers, pred=lambda p: p.environment == environment.lower() + ): + return specific_publisher + + if general_publisher := first_true( + publishers, pred=lambda p: p.environment == "" + ): + return general_publisher - return Query(klass).filter_by( - namespace=namespace, - project=project, - environment=environment, - workflow_filepath=workflow_filepath, - ) + return None - @staticmethod - def __lookup_no_environment__(klass, signed_claims: SignedClaims) -> Query | None: + @classmethod + def lookup_by_claims(cls, session, signed_claims: SignedClaims) -> Self: project_path = signed_claims["project_path"] ci_config_ref_uri = signed_claims["ci_config_ref_uri"] namespace, project = project_path.rsplit("/", 1) + environment = signed_claims.get("environment") if not (workflow_filepath := _extract_workflow_filepath(ci_config_ref_uri)): - return None + raise InvalidPublisherError( + "Could not extract workflow filename from OIDC claims" + ) - return Query(klass).filter_by( + query: Query = Query(cls).filter_by( namespace=namespace, project=project, - environment="", workflow_filepath=workflow_filepath, ) + publishers = query.with_session(session).all() + if publisher := cls._get_publisher_for_environment(publishers, environment): + return publisher - __lookup_strategies__ = [ - __lookup_all__, - __lookup_no_environment__, - ] + raise InvalidPublisherError("Publisher with matching claims was not found") @property def project_path(self): diff --git a/warehouse/oidc/models/google.py b/warehouse/oidc/models/google.py index 04476d22cc6e..5ee90e855edd 100644 --- a/warehouse/oidc/models/google.py +++ b/warehouse/oidc/models/google.py @@ -10,13 +10,15 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import Any +from typing import Any, Self +from more_itertools import first_true from pypi_attestations import GooglePublisher as GoogleIdentity, Publisher from sqlalchemy import ForeignKey, String, UniqueConstraint, and_, exists from sqlalchemy.dialects.postgresql import UUID from sqlalchemy.orm import Query, mapped_column +from warehouse.oidc.errors import InvalidPublisherError from warehouse.oidc.interfaces import SignedClaims from warehouse.oidc.models._core import ( CheckClaimCallable, @@ -68,20 +70,21 @@ class GooglePublisherMixin: __unchecked_claims__ = {"azp", "google"} - @staticmethod - def __lookup_all__(klass, signed_claims: SignedClaims) -> Query | None: - return Query(klass).filter_by( - email=signed_claims["email"], sub=signed_claims["sub"] - ) + @classmethod + def lookup_by_claims(cls, session, signed_claims: SignedClaims) -> Self: + query: Query = Query(cls).filter_by(email=signed_claims["email"]) + publishers = query.with_session(session).all() + + if sub := signed_claims.get("sub"): + if specific_publisher := first_true( + publishers, pred=lambda p: p.sub == sub + ): + return specific_publisher - @staticmethod - def __lookup_no_sub__(klass, signed_claims: SignedClaims) -> Query | None: - return Query(klass).filter_by(email=signed_claims["email"], sub="") + if general_publisher := first_true(publishers, pred=lambda p: p.sub == ""): + return general_publisher - __lookup_strategies__ = [ - __lookup_all__, - __lookup_no_sub__, - ] + raise InvalidPublisherError("Publisher with matching claims was not found") @property def publisher_name(self):