7
7
8
8
try :
9
9
import jwt
10
+ import jwt .utils
10
11
except ImportError :
11
12
jwt = None
12
13
14
+ try :
15
+ from cryptography .hazmat .primitives .serialization import load_pem_private_key
16
+ except ImportError :
17
+ load_pem_private_key = None
18
+
13
19
14
20
class Token (abc .ABC ):
15
21
def __init__ (self , token : str , token_type : str ):
@@ -36,18 +42,19 @@ def token(self) -> Token:
36
42
37
43
class JwtTokenSource (TokenSource ):
38
44
def __init__ (
39
- self ,
40
- signing_method : str ,
41
- private_key : typing .Optional [str ] = None ,
42
- private_key_file : typing .Optional [str ] = None ,
43
- key_id : typing .Optional [str ] = None ,
44
- issuer : typing .Optional [str ] = None ,
45
- subject : typing .Optional [str ] = None ,
46
- audience : typing .Union [typing .List [str ], str , None ] = None ,
47
- id : typing .Optional [str ] = None ,
48
- token_ttl_seconds : int = 3600 ,
45
+ self ,
46
+ signing_method : str ,
47
+ private_key : typing .Optional [str ] = None ,
48
+ private_key_file : typing .Optional [str ] = None ,
49
+ key_id : typing .Optional [str ] = None ,
50
+ issuer : typing .Optional [str ] = None ,
51
+ subject : typing .Optional [str ] = None ,
52
+ audience : typing .Union [typing .List [str ], str , None ] = None ,
53
+ id : typing .Optional [str ] = None ,
54
+ token_ttl_seconds : int = 3600 ,
49
55
):
50
56
assert jwt is not None , "Install pyjwt library to use jwt tokens"
57
+ assert load_pem_private_key is not None , "Install cryptography library to use jwt tokens"
51
58
self ._signing_method = signing_method
52
59
self ._key_id = key_id
53
60
if private_key and private_key_file :
@@ -57,7 +64,7 @@ def __init__(
57
64
self ._private_key = private_key
58
65
if private_key_file :
59
66
private_key_file = os .path .expanduser (private_key_file )
60
- with open (private_key_file , "r " ) as key_file :
67
+ with open (private_key_file , "rb " ) as key_file :
61
68
self ._private_key = key_file .read ()
62
69
self ._issuer = issuer
63
70
self ._subject = subject
@@ -70,6 +77,10 @@ def __init__(
70
77
raise Exception ("JWT: no private key specified" )
71
78
if self ._token_ttl_seconds <= 0 :
72
79
raise Exception ("JWT: invalid jwt token TTL" )
80
+ if isinstance (self ._private_key , str ):
81
+ self ._private_key = self ._private_key .encode ()
82
+ if isinstance (self ._private_key , bytes ) and jwt .utils .is_pem_format (self ._private_key ):
83
+ self ._private_key = load_pem_private_key (self ._private_key , password = None )
73
84
74
85
def token (self ) -> Token :
75
86
now = time .time ()
0 commit comments