Skip to content

Commit 17b7e0c

Browse files
committed
pyjwt: ES256 algorithm support for PyJWT.
Add optional support for ES256 JWT signing/verifying to PyJWT using @dmazzella's cryptography port. Signed-off-by: Jonah Bron <[email protected]>
1 parent 45ead11 commit 17b7e0c

File tree

2 files changed

+123
-17
lines changed

2 files changed

+123
-17
lines changed

python-ecosys/pyjwt/jwt.py

+75-12
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,17 @@
44
import json
55
from time import time
66

7+
# Optionally depend on https://github.com/dmazzella/ucryptography
8+
try:
9+
# Try importing from ucryptography port.
10+
import cryptography
11+
from cryptography import hashes, ec, serialization, utils
12+
13+
_ec_supported = True
14+
except ImportError:
15+
# No cryptography library available, no EC256 support.
16+
_ec_supported = False
17+
718

819
def _to_b64url(data):
920
return (
@@ -19,6 +30,28 @@ def _from_b64url(data):
1930
return binascii.a2b_base64(data.replace(b"-", b"+").replace(b"_", b"/") + b"===")
2031

2132

33+
def _sig_der_to_jws(signed):
34+
"""Accept a DER signature and convert to JSON Web Signature bytes.
35+
36+
`cryptography` produces signatures encoded in DER ASN.1 binary format.
37+
JSON Web Algorithm instead encodes the signature as the point coordinates
38+
as bigendian byte strings concatenated.
39+
40+
See https://datatracker.ietf.org/doc/html/rfc7518#section-3.4
41+
"""
42+
r, s = utils.decode_dss_signature(signed)
43+
return r.to_bytes(32, "big") + s.to_bytes(32, "big")
44+
45+
46+
def _sig_jws_to_der(signed):
47+
"""Accept a JSON Web Signature and convert to a DER signature.
48+
49+
See `_sig_der_to_jws()`
50+
"""
51+
r, s = int.from_bytes(signed[0:32], "big"), int.from_bytes(signed[32:], "big")
52+
return utils.encode_dss_signature(r, s)
53+
54+
2255
class exceptions:
2356
class PyJWTError(Exception):
2457
pass
@@ -37,19 +70,32 @@ class ExpiredSignatureError(PyJWTError):
3770

3871

3972
def encode(payload, key, algorithm="HS256"):
40-
if algorithm != "HS256":
73+
if algorithm != "HS256" and algorithm != "ES256":
4174
raise exceptions.InvalidAlgorithmError
4275

43-
if isinstance(key, str):
44-
key = key.encode()
4576
header = _to_b64url(json.dumps({"typ": "JWT", "alg": algorithm}).encode())
4677
payload = _to_b64url(json.dumps(payload).encode())
47-
signature = _to_b64url(hmac.new(key, header + b"." + payload, hashlib.sha256).digest())
78+
79+
if algorithm == "HS256":
80+
if isinstance(key, str):
81+
key = key.encode()
82+
signature = _to_b64url(hmac.new(key, header + b"." + payload, hashlib.sha256).digest())
83+
elif algorithm == "ES256":
84+
if not _ec_supported:
85+
raise exceptions.InvalidAlgorithmError(
86+
"Required dependencies for ES256 are not available"
87+
)
88+
if isinstance(key, int):
89+
key = ec.derive_private_key(key, ec.SECP256R1())
90+
signature = _to_b64url(
91+
_sig_der_to_jws(key.sign(header + b"." + payload, ec.ECDSA(hashes.SHA256())))
92+
)
93+
4894
return (header + b"." + payload + b"." + signature).decode()
4995

5096

51-
def decode(token, key, algorithms=["HS256"]):
52-
if "HS256" not in algorithms:
97+
def decode(token, key, algorithms=["HS256", "ES256"]):
98+
if "HS256" not in algorithms and "ES256" not in algorithms:
5399
raise exceptions.InvalidAlgorithmError
54100

55101
parts = token.encode().split(b".")
@@ -63,14 +109,31 @@ def decode(token, key, algorithms=["HS256"]):
63109
except Exception:
64110
raise exceptions.InvalidTokenError
65111

66-
if header["alg"] not in algorithms or header["alg"] != "HS256":
112+
if header["alg"] not in algorithms or (header["alg"] != "HS256" and header["alg"] != "ES256"):
67113
raise exceptions.InvalidAlgorithmError
68114

69-
if isinstance(key, str):
70-
key = key.encode()
71-
calculated_signature = hmac.new(key, parts[0] + b"." + parts[1], hashlib.sha256).digest()
72-
if signature != calculated_signature:
73-
raise exceptions.InvalidSignatureError
115+
if header["alg"] == "HS256":
116+
if isinstance(key, str):
117+
key = key.encode()
118+
calculated_signature = hmac.new(key, parts[0] + b"." + parts[1], hashlib.sha256).digest()
119+
if signature != calculated_signature:
120+
raise exceptions.InvalidSignatureError
121+
elif header["alg"] == "ES256":
122+
if not _ec_supported:
123+
raise exceptions.InvalidAlgorithmError(
124+
"Required dependencies for ES256 are not available"
125+
)
126+
127+
if isinstance(key, bytes):
128+
key = ec.EllipticCurvePublicKey.from_encoded_point(key, ec.SECP256R1())
129+
try:
130+
key.verify(
131+
_sig_jws_to_der(signature),
132+
parts[0] + b"." + parts[1],
133+
ec.ECDSA(hashes.SHA256()),
134+
)
135+
except cryptography.exceptions.InvalidSignature:
136+
raise exceptions.InvalidSignatureError
74137

75138
if "exp" in payload:
76139
if time() > payload["exp"]:

python-ecosys/pyjwt/test_jwt.py

+48-5
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,71 @@
11
import jwt
22
from time import time
33

4+
"""
5+
Run tests by executing:
6+
7+
```
8+
mpremote fs cp jwt.py :lib/jwt.py + run test_jwt.py
9+
```
10+
11+
Only the full test suite can be run if
12+
[ucryptography](https://github.com/dmazzella/ucryptography) is present in the
13+
firmware.
14+
"""
15+
16+
# Indentation
17+
I = " "
18+
19+
print("Testing HS256")
420
secret_key = "top-secret!"
521

622
token = jwt.encode({"user": "joe"}, secret_key, algorithm="HS256")
7-
print(token)
823
decoded = jwt.decode(token, secret_key, algorithms=["HS256"])
924
if decoded != {"user": "joe"}:
1025
raise Exception("Invalid decoded JWT")
1126
else:
12-
print("Encode/decode test: OK")
27+
print(I, "Encode/decode test: OK")
1328

1429
try:
1530
decoded = jwt.decode(token, "wrong-secret", algorithms=["HS256"])
1631
except jwt.exceptions.InvalidSignatureError:
17-
print("Invalid signature test: OK")
32+
print(I, "Invalid signature test: OK")
1833
else:
1934
raise Exception("Invalid JWT should have failed decoding")
2035

2136
token = jwt.encode({"user": "joe", "exp": time() - 1}, secret_key)
22-
print(token)
2337
try:
2438
decoded = jwt.decode(token, secret_key, algorithms=["HS256"])
2539
except jwt.exceptions.ExpiredSignatureError:
26-
print("Expired token test: OK")
40+
print(I, "Expired token test: OK")
2741
else:
2842
raise Exception("Expired JWT should have failed decoding")
43+
44+
45+
print("Testing ES256")
46+
try:
47+
from cryptography import ec
48+
except ImportError:
49+
raise Exception("No cryptography lib present, can't test ES256")
50+
51+
private_key = ec.derive_private_key(
52+
0xEB6DFB26C7A3C23D33C60F7C7BA61B6893451F2643E0737B20759E457825EE75, ec.SECP256R1()
53+
)
54+
wrong_private_key = ec.derive_private_key(
55+
0x25D91A0DA38F69283A0CE32B87D82817CA4E134A1693BE6083C2292BF562A451, ec.SECP256R1()
56+
)
57+
58+
token = jwt.encode({"user": "joe"}, private_key, algorithm="ES256")
59+
decoded = jwt.decode(token, private_key.public_key(), algorithms=["ES256"])
60+
if decoded != {"user": "joe"}:
61+
raise Exception("Invalid decoded JWT")
62+
else:
63+
print(I, "Encode/decode test: OK")
64+
65+
token = jwt.encode({"user": "joe"}, private_key, algorithm="ES256")
66+
try:
67+
decoded = jwt.decode(token + "a", wrong_private_key.public_key(), algorithms=["ES256"])
68+
except jwt.exceptions.InvalidSignatureError:
69+
print(I, "Invalid signature test: OK")
70+
else:
71+
raise Exception("Invalid JWT should have fialed decoding")

0 commit comments

Comments
 (0)