1+ import datetime
12import functools
23import getpass
34import json
45import logging
5- from typing import TYPE_CHECKING , Callable , Optional , Type , cast
6+ import time
7+ import typing as t
8+ from typing import cast
69from urllib .parse import urlparse
710
11+ import requests .auth
812from id import AmbientCredentialError # type: ignore
913from id import detect_credential
1014
1115# keyring has an indirect dependency on PyCA cryptography, which has no
1216# pre-built wheels for ppc64le and s390x, see #1158.
13- if TYPE_CHECKING :
17+ if t . TYPE_CHECKING :
1418 import keyring
1519 from keyring .errors import NoKeyringError
1620else :
2630
2731logger = logging .getLogger (__name__ )
2832
33+ TOKEN_USERNAME : t .Final [str ] = "__token__"
34+ #: Tokens expire after 15 minutes, let's start allowing renewal/replacement
35+ #: after 10 minutes that way if we fail, we may still have time to replace it
36+ #: before it expires. Thus, if our current time + this threshold is past the
37+ #: greater or equal to the expiration time, we should start trying to replace
38+ #: the token.
39+ TOKEN_RENEWAL_THRESHOLD : t .Final [datetime .timedelta ] = datetime .timedelta (
40+ minutes = 5 ,
41+ )
42+
2943
3044class CredentialInput :
3145 def __init__ (
32- self , username : Optional [str ] = None , password : Optional [str ] = None
46+ self , username : t . Optional [str ] = None , password : t . Optional [str ] = None
3347 ) -> None :
3448 self .username = username
3549 self .password = password
3650
3751
52+ class TrustedPublishingTokenRetrievalError (t .TypedDict ):
53+ code : str
54+ description : str
55+
56+
57+ class TrustedPublishingToken (t .TypedDict , total = False ):
58+ message : t .Optional [str ]
59+ errors : t .Optional [list [TrustedPublishingTokenRetrievalError ]]
60+ token : t .Optional [str ]
61+ success : t .Optional [bool ]
62+ # Depends on https://github.com/pypi/warehouse/issues/18235
63+ expires : t .Optional [int ]
64+
65+
66+ class TrustedPublishingAuthenticator (requests .auth .AuthBase ):
67+ def __init__ (
68+ self ,
69+ resolver : "Resolver" ,
70+ ) -> None :
71+ self .resolver = resolver
72+
73+ def __call__ (
74+ self , request : "requests.models.PreparedRequest"
75+ ) -> "requests.models.PreparedRequest" :
76+ token = self .resolver .make_trusted_publishing_token ()
77+ if token is None :
78+ raise exceptions .TrustedPublishingFailure (
79+ "Expected a trusted publishing token but got None"
80+ )
81+
82+ # Instead of reconstructing Basic Auth headers ourself, let's just
83+ # rely on the underlying class to do the right thing.
84+ basic_auth = requests .auth .HTTPBasicAuth (
85+ username = TOKEN_USERNAME ,
86+ password = token ,
87+ )
88+ return cast (requests .models .PreparedRequest , basic_auth (request ))
89+
90+
3891class Resolver :
92+ _tp_token : t .Optional [TrustedPublishingToken ] = None
93+ _expires : t .Optional [int ] = None
94+
3995 def __init__ (
4096 self ,
4197 config : utils .RepositoryConfig ,
@@ -44,16 +100,37 @@ def __init__(
44100 self .config = config
45101 self .input = input
46102
103+ @property
104+ @functools .lru_cache ()
105+ def authenticator (self ) -> "requests.auth.AuthBase" :
106+ username = self .username
107+ password = self .password
108+ if self ._tp_token :
109+ # If `self.password` ended up getting a Trusted Publishing token,
110+ # we've cached it here so we should use that as the authenticator.
111+ # We have a custom authenticator so we can repeatedly invoke
112+ # `make_trusted_publishing_token` which if the token is 10 minutes
113+ # old or more, we should get a new one automatically.
114+ return TrustedPublishingAuthenticator (resolver = self )
115+ if username and password :
116+ return requests .auth .HTTPBasicAuth (
117+ username = username ,
118+ password = password ,
119+ )
120+ raise exceptions .InvalidConfiguration (
121+ "could not determine credentials for configured repository"
122+ )
123+
47124 @classmethod
48- def choose (cls , interactive : bool ) -> Type ["Resolver" ]:
125+ def choose (cls , interactive : bool ) -> t . Type ["Resolver" ]:
49126 return cls if interactive else Private
50127
51128 @property
52129 @functools .lru_cache ()
53- def username (self ) -> Optional [str ]:
130+ def username (self ) -> t . Optional [str ]:
54131 if self .is_pypi () and not self .input .username :
55132 # Default username.
56- self .input .username = "__token__"
133+ self .input .username = TOKEN_USERNAME
57134
58135 return utils .get_userpass_value (
59136 self .input .username ,
@@ -64,15 +141,23 @@ def username(self) -> Optional[str]:
64141
65142 @property
66143 @functools .lru_cache ()
67- def password (self ) -> Optional [str ]:
144+ def password (self ) -> t . Optional [str ]:
68145 return utils .get_userpass_value (
69146 self .input .password ,
70147 self .config ,
71148 key = "password" ,
72149 prompt_strategy = self .password_from_keyring_or_trusted_publishing_or_prompt ,
73150 )
74151
75- def make_trusted_publishing_token (self ) -> Optional [str ]:
152+ def _has_valid_cached_tp_token (self ) -> bool :
153+ return self ._tp_token is not None and (
154+ int (time .time ()) + TOKEN_RENEWAL_THRESHOLD .seconds
155+ < cast (int , self ._tp_token .get ("expires" , self ._expires ))
156+ )
157+
158+ def _make_trusted_publishing_token (self ) -> t .Optional [TrustedPublishingToken ]:
159+ if self ._has_valid_cached_tp_token ():
160+ return self ._tp_token
76161 # Trusted publishing (OpenID Connect): get one token from the CI
77162 # system, and exchange that for a PyPI token.
78163 repository_domain = cast (str , urlparse (self .system ).netloc )
@@ -97,7 +182,13 @@ def make_trusted_publishing_token(self) -> Optional[str]:
97182
98183 if oidc_token is None :
99184 logger .debug ("This environment is not supported for trusted publishing" )
100- return None # Fall back to prompting for a token (if possible)
185+ if self ._tp_token and int (time .time ()) > cast (
186+ int , self ._tp_token .get ("expires" , self ._expires )
187+ ):
188+ return None # Fall back to prompting for a token (if possible)
189+ # The cached trusted publishing token may still be valid for a
190+ # while longer, let's continue using it instead of prompting
191+ return self ._tp_token
101192
102193 logger .debug ("Got OIDC token for audience %s" , audience )
103194
@@ -121,18 +212,26 @@ def make_trusted_publishing_token(self) -> Optional[str]:
121212 for error in mint_token_payload ["errors" ]
122213 )
123214 raise exceptions .TrustedPublishingFailure (
124- "The token request failed; the index server gave the following "
125- f"reasons:\n \n { reasons } "
215+ "The token request failed; the index server gave the following"
216+ f" reasons:\n \n { reasons } "
126217 )
127218
128219 logger .debug ("Minted upload token for trusted publishing" )
220+ self ._tp_token = cast (TrustedPublishingToken , mint_token_payload )
221+ self ._expires = int (time .time ()) + 900
222+ return self ._tp_token
223+
224+ def make_trusted_publishing_token (self ) -> t .Optional [str ]:
225+ mint_token_payload = self ._make_trusted_publishing_token ()
226+ if not mint_token_payload :
227+ return None
129228 return cast (str , mint_token_payload ["token" ])
130229
131230 @property
132- def system (self ) -> Optional [str ]:
231+ def system (self ) -> t . Optional [str ]:
133232 return self .config ["repository" ]
134233
135- def get_username_from_keyring (self ) -> Optional [str ]:
234+ def get_username_from_keyring (self ) -> t . Optional [str ]:
136235 if keyring is None :
137236 logger .info ("keyring module is not available" )
138237 return None
@@ -149,7 +248,7 @@ def get_username_from_keyring(self) -> Optional[str]:
149248 logger .warning ("Error getting username from keyring" , exc_info = exc )
150249 return None
151250
152- def get_password_from_keyring (self ) -> Optional [str ]:
251+ def get_password_from_keyring (self ) -> t . Optional [str ]:
153252 if keyring is None :
154253 logger .info ("keyring module is not available" )
155254 return None
@@ -178,7 +277,7 @@ def password_from_keyring_or_trusted_publishing_or_prompt(self) -> str:
178277 logger .info ("password set from keyring" )
179278 return password
180279
181- if self .is_pypi () and self .username == "__token__" :
280+ if self .is_pypi () and self .username == TOKEN_USERNAME :
182281 logger .debug (
183282 "Trying to use trusted publishing (no token was explicitly provided)"
184283 )
@@ -190,7 +289,7 @@ def password_from_keyring_or_trusted_publishing_or_prompt(self) -> str:
190289
191290 return self .prompt (what , getpass .getpass )
192291
193- def prompt (self , what : str , how : Callable [..., str ]) -> str :
292+ def prompt (self , what : str , how : t . Callable [..., str ]) -> str :
194293 return how (f"Enter your { what } : " )
195294
196295 def is_pypi (self ) -> bool :
@@ -204,5 +303,5 @@ def is_pypi(self) -> bool:
204303
205304
206305class Private (Resolver ):
207- def prompt (self , what : str , how : Optional [Callable [..., str ]] = None ) -> str :
306+ def prompt (self , what : str , how : t . Optional [t . Callable [..., str ]] = None ) -> str :
208307 raise exceptions .NonInteractive (f"Credential not found for { what } ." )
0 commit comments