Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

py-vapid: Basic VAPID header generation based on PyPi py_vapid #827

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
12 changes: 12 additions & 0 deletions python-ecosys/py_vapid/manifest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
metadata(
version="0.1.0",
pypi="py-vapid",
author="Jonah Bron <hi@jonah.id>",
description="""
VAPID
""",
)

require("pyjwt")

package("py_vapid")
50 changes: 50 additions & 0 deletions python-ecosys/py_vapid/py_vapid/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
"""
Based on https://github.com/web-push-libs/vapid
"""

import binascii
import time
import jwt

from cryptography import serialization


def _to_b64url(data):
return (
binascii.b2a_base64(data)
.rstrip(b"\n")
.rstrip(b"=")
.replace(b"+", b"-")
.replace(b"/", b"_")
)


class Vapid:
def __init__(self, private_key):
self._private_key = private_key

def sign(self, claims):
claim = claims
if "exp" not in claim:
# Default to expiring 24 hours into the future (the max).
# https://datatracker.ietf.org/doc/html/rfc8292#section-2
exp = int(time.time()) + 86400
# Correct the epoch offset if not the Unix standard.
if time.gmtime(0)[0] == 2000:
exp += 946684800 # Unix timestamp of 2000-01-01

claim["exp"] = exp

token = jwt.encode(claim, self._private_key, "ES256")
public_key = _to_b64url(
self._private_key.public_key().public_bytes(
encoding=serialization.Encoding.X962,
format=serialization.PublicFormat.UncompressedPoint,
)
).decode()

return {"Authorization": f"vapid t={token},k={public_key}"}


# Re-export for interface compatibility with PyPi py-vapid
Vapid02 = Vapid
111 changes: 111 additions & 0 deletions python-ecosys/py_vapid/test_vapid.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
import jwt
import py_vapid
from time import time
from cryptography import ec
from machine import RTC


"""
Run tests by executing:

```
mpremote fs cp py_vapid/__init__.py :lib/py_vapid.py + run test_vapid.py
```

The [ucryptography](https://github.com/dmazzella/ucryptography) library must
be present in the firmware for this library and tests to work.
"""

rtc = RTC()

GOLDEN_0 = (
0xEB6DFB26C7A3C23D33C60F7C7BA61B6893451F2643E0737B20759E457825EE75,
(2010, 1, 1, 0, 0, 0, 0, 0),
{
"aud": "https://updates.push.services.mozilla.com",
"sub": "mailto:admin@example.com",
"exp": 9876543,
},
"vapid t=eyJ0eXAiOiAiSldUIiwgImFsZyI6ICJFUzI1NiJ9.eyJhdWQiOiAiaHR0cHM6Ly91cGRhdGVzLnB1c2guc2VydmljZXMubW96aWxsYS5jb20iLCAic3ViIjogIm1haWx0bzphZG1pbkBleGFtcGxlLmNvbSIsICJleHAiOiA5ODc2NTQzfQ.DLB6PF2RApzk0n0oH-Kv_Onuwg9C7VXakM-GlEMCwj50rQ7G0hF_vLIYzCPeXT8Hu8Uup900YBapZ9y45vc8QA,k=BKoKs6nJ3466nCEQ5TvFkBIGBKSGplPTUBzJlLXM13I8S0SF-o_NSB-Q4At3BeLSrZVptEd5xBuGRXCKMe_YRg8",
)

GOLDEN_1 = (
0x4370082632776C74FDC5517AC12881413A60B25D10E863296AD67E4260A3BF56,
(2015, 1, 1, 0, 0, 0, 0, 0),
{
"aud": "https://updates.push.services.mozilla.com",
"sub": "mailto:admin@example.com",
},
"vapid t=eyJ0eXAiOiAiSldUIiwgImFsZyI6ICJFUzI1NiJ9.eyJleHAiOiAxNDIwMTU2ODAwLCAic3ViIjogIm1haWx0bzphZG1pbkBleGFtcGxlLmNvbSIsICJhdWQiOiAiaHR0cHM6Ly91cGRhdGVzLnB1c2guc2VydmljZXMubW96aWxsYS5jb20ifQ.NlVtqjGWy-hvNtoScrwAv-4cpNYrgUJ4EVgtxTnIn-haPtBSpak7aQN518tVYelQB1TZqc0bxAjWfK9QvZUbOA,k=BGEwf7m9F3vCvOuPeN4pEZ91t-dpSmg_y8ZXMfOyl-f22zw10ho_4EeBqZj2-NtW_Kb98b6tGjOKO_-TJiWvyfo",
)

# Set of opaquely known-good scenarios to check against
golden_test_cases = [GOLDEN_0, GOLDEN_1]


# Test basic validation of claim
private_key_0 = ec.derive_private_key(
0x5C76C15BBC541E7BF6987557124A6E6EB745723B1CF20E2ED2A3ED5B7C16DD46, ec.SECP256R1()
)
vapid = py_vapid.Vapid(private_key=private_key_0)
rtc.datetime((2018, 1, 1, 0, 0, 0, 0, 0))
headers = vapid.sign(
{
"aud": "https://fcm.googleapis.com",
"sub": "mailto:foo@bar.com",
"exp": 1493315200,
}
)

actual_token = headers["Authorization"].split(" ")[1].split(",")[0].split("=")[1]
actual_decoded_claim = jwt.decode(actual_token, private_key_0.public_key(), "ES256")
assert (
actual_decoded_claim["aud"] == "https://fcm.googleapis.com"
), f"Claim audience '{actual_decoded_claim['aud']}' does not match input"
assert (
actual_decoded_claim["sub"] == "mailto:foo@bar.com"
), f"Claim subscriber '{actual_decoded_claim['sub']}' does not match input"
assert (
actual_decoded_claim["exp"] == 1493315200
), f"Claim exp '{actual_decoded_claim['exp']}' does not match input"
print(f"Test claim validation: Passed")


# Test auto expiration date population
private_key_1 = ec.derive_private_key(
0x5C76C15BBC541E7BF6987557124A6E6EB745723B1CF20E2ED2A3ED5B7C16DD46, ec.SECP256R1()
)
vapid = py_vapid.Vapid(private_key=private_key_1)
rtc.datetime((2017, 1, 1, 0, 0, 0, 0, 0))
headers = vapid.sign(
{
"aud": "https://updates.push.services.mozilla.com",
"sub": "mailto:admin@example.com",
}
)

actual_token = headers["Authorization"].split(" ")[1].split(",")[0].split("=")[1]
actual_decoded_claim = jwt.decode(actual_token, private_key_1.public_key(), "ES256")
assert (
actual_decoded_claim["exp"] == 1483315200
), f"Claim exp '{actual_decoded_claim['exp']}' does not match expected 2017-01-02 value"
print(f"Test auto expiry: Passed")


# Because they provide the least information about what could have gone wrong,
# Run golden test cases after all more specific tests pass first.
for case_no, case in enumerate(golden_test_cases):
private_key_number, curr_time, claim, expected_id = case
try:
private_key = ec.derive_private_key(private_key_number, ec.SECP256R1())
vapid = py_vapid.Vapid(private_key=private_key)
rtc.datetime(curr_time)
headers = vapid.sign(claim)

assert (
headers["Authorization"] == expected_id
), f"Authorization header '{headers['Authorization']}' does not match golden test case {case_no}"
print(f"Golden test case {case_no}: Passed")
except Exception as e:
print(f"Golden test case {case_no}: Failed")
raise e
87 changes: 75 additions & 12 deletions python-ecosys/pyjwt/jwt.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,17 @@
import json
from time import time

# Optionally depend on https://github.com/dmazzella/ucryptography
try:
# Try importing from ucryptography port.
import cryptography
from cryptography import hashes, ec, serialization, utils

_ec_supported = True
except ImportError:
# No cryptography library available, no EC256 support.
_ec_supported = False


def _to_b64url(data):
return (
Expand All @@ -19,6 +30,28 @@ def _from_b64url(data):
return binascii.a2b_base64(data.replace(b"-", b"+").replace(b"_", b"/") + b"===")


def _sig_der_to_jws(signed):
"""Accept a DER signature and convert to JSON Web Signature bytes.

`cryptography` produces signatures encoded in DER ASN.1 binary format.
JSON Web Algorithm instead encodes the signature as the point coordinates
as bigendian byte strings concatenated.

See https://datatracker.ietf.org/doc/html/rfc7518#section-3.4
"""
r, s = utils.decode_dss_signature(signed)
return r.to_bytes(32, "big") + s.to_bytes(32, "big")


def _sig_jws_to_der(signed):
"""Accept a JSON Web Signature and convert to a DER signature.

See `_sig_der_to_jws()`
"""
r, s = int.from_bytes(signed[0:32], "big"), int.from_bytes(signed[32:], "big")
return utils.encode_dss_signature(r, s)


class exceptions:
class PyJWTError(Exception):
pass
Expand All @@ -37,19 +70,32 @@ class ExpiredSignatureError(PyJWTError):


def encode(payload, key, algorithm="HS256"):
if algorithm != "HS256":
if algorithm != "HS256" and algorithm != "ES256":
raise exceptions.InvalidAlgorithmError

if isinstance(key, str):
key = key.encode()
header = _to_b64url(json.dumps({"typ": "JWT", "alg": algorithm}).encode())
payload = _to_b64url(json.dumps(payload).encode())
signature = _to_b64url(hmac.new(key, header + b"." + payload, hashlib.sha256).digest())

if algorithm == "HS256":
if isinstance(key, str):
key = key.encode()
signature = _to_b64url(hmac.new(key, header + b"." + payload, hashlib.sha256).digest())
elif algorithm == "ES256":
if not _ec_supported:
raise exceptions.InvalidAlgorithmError(
"Required dependencies for ES256 are not available"
)
if isinstance(key, int):
key = ec.derive_private_key(key, ec.SECP256R1())
signature = _to_b64url(
_sig_der_to_jws(key.sign(header + b"." + payload, ec.ECDSA(hashes.SHA256())))
)

return (header + b"." + payload + b"." + signature).decode()


def decode(token, key, algorithms=["HS256"]):
if "HS256" not in algorithms:
def decode(token, key, algorithms=["HS256", "ES256"]):
if "HS256" not in algorithms and "ES256" not in algorithms:
raise exceptions.InvalidAlgorithmError

parts = token.encode().split(b".")
Expand All @@ -63,14 +109,31 @@ def decode(token, key, algorithms=["HS256"]):
except Exception:
raise exceptions.InvalidTokenError

if header["alg"] not in algorithms or header["alg"] != "HS256":
if header["alg"] not in algorithms or (header["alg"] != "HS256" and header["alg"] != "ES256"):
raise exceptions.InvalidAlgorithmError

if isinstance(key, str):
key = key.encode()
calculated_signature = hmac.new(key, parts[0] + b"." + parts[1], hashlib.sha256).digest()
if signature != calculated_signature:
raise exceptions.InvalidSignatureError
if header["alg"] == "HS256":
if isinstance(key, str):
key = key.encode()
calculated_signature = hmac.new(key, parts[0] + b"." + parts[1], hashlib.sha256).digest()
if signature != calculated_signature:
raise exceptions.InvalidSignatureError
elif header["alg"] == "ES256":
if not _ec_supported:
raise exceptions.InvalidAlgorithmError(
"Required dependencies for ES256 are not available"
)

if isinstance(key, bytes):
key = ec.EllipticCurvePublicKey.from_encoded_point(key, ec.SECP256R1())
try:
key.verify(
_sig_jws_to_der(signature),
parts[0] + b"." + parts[1],
ec.ECDSA(hashes.SHA256()),
)
except cryptography.exceptions.InvalidSignature:
raise exceptions.InvalidSignatureError

if "exp" in payload:
if time() > payload["exp"]:
Expand Down
11 changes: 10 additions & 1 deletion python-ecosys/pyjwt/manifest.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,13 @@
metadata(version="0.1.0", pypi="pyjwt")
metadata(
version="0.2.0",
pypi="pyjwt",
description="""
JWT library for MicroPython. Supports HMAC (HS256) encoding essentially.
Optionally supports ECDSA (ES256) asymmetric-key signing/verification when the
[dmazella/ucryptography](https://github.com/dmazzella/ucryptography/) library
is available in the MicroPython firmware.
""",
)

require("hmac")

Expand Down
53 changes: 48 additions & 5 deletions python-ecosys/pyjwt/test_jwt.py
Original file line number Diff line number Diff line change
@@ -1,28 +1,71 @@
import jwt
from time import time

"""
Run tests by executing:

```
mpremote fs cp jwt.py :lib/jwt.py + run test_jwt.py
```

Only the full test suite can be run if
[ucryptography](https://github.com/dmazzella/ucryptography) is present in the
firmware.
"""

# Indentation
I = " "

print("Testing HS256")
secret_key = "top-secret!"

token = jwt.encode({"user": "joe"}, secret_key, algorithm="HS256")
print(token)
decoded = jwt.decode(token, secret_key, algorithms=["HS256"])
if decoded != {"user": "joe"}:
raise Exception("Invalid decoded JWT")
else:
print("Encode/decode test: OK")
print(I, "Encode/decode test: OK")

try:
decoded = jwt.decode(token, "wrong-secret", algorithms=["HS256"])
except jwt.exceptions.InvalidSignatureError:
print("Invalid signature test: OK")
print(I, "Invalid signature test: OK")
else:
raise Exception("Invalid JWT should have failed decoding")

token = jwt.encode({"user": "joe", "exp": time() - 1}, secret_key)
print(token)
try:
decoded = jwt.decode(token, secret_key, algorithms=["HS256"])
except jwt.exceptions.ExpiredSignatureError:
print("Expired token test: OK")
print(I, "Expired token test: OK")
else:
raise Exception("Expired JWT should have failed decoding")


print("Testing ES256")
try:
from cryptography import ec
except ImportError:
raise Exception("No cryptography lib present, can't test ES256")

private_key = ec.derive_private_key(
0xEB6DFB26C7A3C23D33C60F7C7BA61B6893451F2643E0737B20759E457825EE75, ec.SECP256R1()
)
wrong_private_key = ec.derive_private_key(
0x25D91A0DA38F69283A0CE32B87D82817CA4E134A1693BE6083C2292BF562A451, ec.SECP256R1()
)

token = jwt.encode({"user": "joe"}, private_key, algorithm="ES256")
decoded = jwt.decode(token, private_key.public_key(), algorithms=["ES256"])
if decoded != {"user": "joe"}:
raise Exception("Invalid decoded JWT")
else:
print(I, "Encode/decode test: OK")

token = jwt.encode({"user": "joe"}, private_key, algorithm="ES256")
try:
decoded = jwt.decode(token + "a", wrong_private_key.public_key(), algorithms=["ES256"])
except jwt.exceptions.InvalidSignatureError:
print(I, "Invalid signature test: OK")
else:
raise Exception("Invalid JWT should have fialed decoding")