1919from jwt import PyJWKClient , ExpiredSignatureError , InvalidTokenError , DecodeError
2020from jwt import InvalidAudienceError , InvalidIssuerError , InvalidSignatureError
2121from firebase_admin import _utils
22+ from firebase_admin import _http_client
2223
2324_APP_CHECK_ATTRIBUTE = '_app_check'
2425
2526def _get_app_check_service (app ) -> Any :
2627 return _utils .get_app_service (app , _APP_CHECK_ATTRIBUTE , _AppCheckService )
2728
28- def verify_token (token : str , app = None ) -> Dict [str , Any ]:
29+ def verify_token (token : str , app = None , consume : bool = False ) -> Dict [str , Any ]:
2930 """Verifies a Firebase App Check token.
3031
3132 Args:
3233 token: A token from App Check.
3334 app: An App instance (optional).
35+ consume: A boolean indicating whether to consume the token (optional).
3436
3537 Returns:
3638 Dict[str, Any]: The token's decoded claims.
@@ -40,7 +42,7 @@ def verify_token(token: str, app=None) -> Dict[str, Any]:
4042 or if the token's headers or payload are invalid.
4143 PyJWKClientError: If PyJWKClient fails to fetch a valid signing key.
4244 """
43- return _get_app_check_service (app ).verify_token (token )
45+ return _get_app_check_service (app ).verify_token (token , consume )
4446
4547class _AppCheckService :
4648 """Service class that implements Firebase App Check functionality."""
@@ -50,6 +52,7 @@ class _AppCheckService:
5052 _project_id = None
5153 _scoped_project_id = None
5254 _jwks_client = None
55+ _http_client = None
5356
5457 _APP_CHECK_HEADERS = {
5558 'x-goog-api-client' : _utils .get_metrics_header (),
@@ -68,9 +71,12 @@ def __init__(self, app):
6871 # Default lifespan is 300 seconds (5 minutes) so we change it to 21600 seconds (6 hours).
6972 self ._jwks_client = PyJWKClient (
7073 self ._JWKS_URL , lifespan = 21600 , headers = self ._APP_CHECK_HEADERS )
74+ self ._http_client = _http_client .JsonHttpClient (
75+ credential = app .credential ,
76+ base_url = 'https://firebaseappcheck.googleapis.com/v1beta' )
7177
7278
73- def verify_token (self , token : str ) -> Dict [str , Any ]:
79+ def verify_token (self , token : str , consume : bool = False ) -> Dict [str , Any ]:
7480 """Verifies a Firebase App Check token."""
7581 _Validators .check_string ("app check token" , token )
7682
@@ -87,8 +93,18 @@ def verify_token(self, token: str) -> Dict[str, Any]:
8793 ) from exception
8894
8995 verified_claims ['app_id' ] = verified_claims .get ('sub' )
96+ if consume :
97+ already_consumed = self ._verify_replay_protection (token )
98+ verified_claims ['already_consumed' ] = already_consumed
9099 return verified_claims
91100
101+ def _verify_replay_protection (self , token : str ) -> bool :
102+ """Verifies the token's consumption status."""
103+ path = f'/{ self ._scoped_project_id } :verifyAppCheckToken'
104+ body = {'app_check_token' : token }
105+ response = self ._http_client .body ('post' , path , json = body )
106+ return response .get ('alreadyConsumed' , False )
107+
92108 def _has_valid_token_headers (self , headers : Any ) -> None :
93109 """Checks whether the token has valid headers for App Check."""
94110 # Ensure the token's header has type JWT
0 commit comments