Skip to content

Commit

Permalink
better fields_from_json
Browse files Browse the repository at this point in the history
  • Loading branch information
atombrella committed May 16, 2021
1 parent 85cc184 commit ea7d219
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 50 deletions.
3 changes: 3 additions & 0 deletions src/josepy/jwa.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,3 +250,6 @@ def _verify(self, key, msg, asn1sig):
ES384 = JWASignature.register(_JWAEC('ES384', hashes.SHA384))
#: ECDSA using P-521 and SHA-512
ES512 = JWASignature.register(_JWAEC('ES512', hashes.SHA512))

# Also implement RFC 8037, signing for OKP key type
# hashes.BLAKE2b
4 changes: 4 additions & 0 deletions src/josepy/jwa_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@
EC_P256_KEY = test_util.load_ec_private_key('ec_p256_key.pem')
EC_P384_KEY = test_util.load_ec_private_key('ec_p384_key.pem')
EC_P521_KEY = test_util.load_ec_private_key('ec_p521_key.pem')
OKP_ED25519_KEY = test_util.load_ec_private_key('ed25519_key.pem')
OKP_ED448_KEY = test_util.load_ec_private_key('ed448_key.pem')
OKP_X25519_KEY = test_util.load_ec_private_key('x25519_key.pem')
OKP_X448_KEY = test_util.load_ec_private_key('x448_key.pem')


class JWASignatureTest(unittest.TestCase):
Expand Down
59 changes: 36 additions & 23 deletions src/josepy/jwk.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import logging
import math

from typing import Dict, Optional, Sequence, Type, Union
from typing import Dict, Optional, Sequence, Tuple, Type, Union

import cryptography.exceptions
from cryptography.hazmat.backends import default_backend
Expand All @@ -18,7 +18,6 @@
)

from josepy import errors, json_util, util
from josepy.util import ComparableOKPKey

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -341,9 +340,7 @@ def fields_to_partial_json(self):
params['d'] = private.private_value
else:
raise errors.SerializationError(
'Supplied key is neither of type EllipticCurvePublicKey '
'nor EllipticCurvePrivateKey'
)
'Supplied key is neither of type EllipticCurvePublicKey nor EllipticCurvePrivateKey')
params['x'] = public.x
params['y'] = public.y
params = {
Expand Down Expand Up @@ -388,7 +385,7 @@ class JWKOKP(JWK):
or :class:`~cryptography.hazmat.primitives.asymmetric.ed448.Ed448PublicKey`
or :class:`~cryptography.hazmat.primitives.asymmetric.ed25519.Ed25519PrivateKey`
or :class:`~cryptography.hazmat.primitives.asymmetric.ed25519.Ed25519PublicKey`
or :ivar: :key :class:`~cryptography.hazmat.primitives.asymmetric.x448.X448PrivateKey`
or :class:`~cryptography.hazmat.primitives.asymmetric.x448.X448PrivateKey`
or :class:`~cryptography.hazmat.primitives.asymmetric.x448.X448PublicKey`
or :class:`~cryptography.hazmat.primitives.asymmetric.x25519.X25519PrivateKey`
or :class:`~cryptography.hazmat.primitives.asymmetric.x25519.X25519PublicKey`
Expand All @@ -405,6 +402,12 @@ class JWKOKP(JWK):
x448.X448PrivateKey, x448.X448PublicKey,
)
required = ('crv', JWK.type_field_name, 'x')
crv_to_pub_priv: Dict[str, Tuple] = {
"Ed25519": (ed25519.Ed25519PublicKey, ed25519.Ed25519PrivateKey),
"Ed448": (ed448.Ed448PublicKey, ed448.Ed448PrivateKey),
"X25519": (x25519.X25519PublicKey, x25519.X25519PrivateKey),
"X448": (x448.X448PublicKey, x448.X448PrivateKey),
}

def __init__(self, *args, **kwargs):
if 'key' in kwargs and not isinstance(kwargs['key'], util.ComparableOKPKey):
Expand All @@ -415,41 +418,38 @@ def public_key(self):
return self.key._wrapped.__class__.public_key()

def _key_to_crv(self):
if isinstance(self.key._wrapped, (ed25519.Ed25519PrivateKey, ed25519.Ed25519PrivateKey)):
if isinstance(self.key._wrapped, (ed25519.Ed25519PublicKey, ed25519.Ed25519PrivateKey)):
return "Ed25519"
elif isinstance(self.key._wrapped, (ed448.Ed448PrivateKey, ed448.Ed448PrivateKey)):
elif isinstance(self.key._wrapped, (ed448.Ed448PublicKey, ed448.Ed448PrivateKey)):
return "Ed448"
elif isinstance(self.key._wrapped, (x25519.X25519PrivateKey, x25519.X25519PrivateKey)):
elif isinstance(self.key._wrapped, (x25519.X25519PublicKey, x25519.X25519PrivateKey)):
return "X25519"
elif isinstance(self.key._wrapped, (x448.X448PrivateKey, x448.X448PrivateKey)):
elif isinstance(self.key._wrapped, (x448.X448PublicKey, x448.X448PrivateKey)):
return "X448"
return NotImplemented

def fields_to_partial_json(self) -> Dict:
params = {}
print(dir(self))
if self.key.is_private():
params['d'] = json_util.encode_b64jose(self.key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encoding=serialization.Encoding.Raw,
format=serialization.PrivateFormat.Raw,
encryption_algorithm=serialization.NoEncryption()
))
params['x'] = self.key.public_key().public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo,
encoding=serialization.Encoding.Raw,
format=serialization.PublicFormat.Raw,
)
else:
params['x'] = json_util.encode_b64jose(self.key.public_bytes(
serialization.Encoding.Raw,
serialization.PublicFormat.Raw,
serialization.NoEncryption(),
))
params['crv'] = self._key_to_crv()
return params

@classmethod
def fields_from_json(cls, jobj) -> ComparableOKPKey:
# this was mostly copy/pasted from some source. Find out which.
def fields_from_json(cls, jobj):
try:
if isinstance(jobj, str):
obj = json.loads(jobj)
Expand All @@ -464,17 +464,30 @@ def fields_from_json(cls, jobj) -> ComparableOKPKey:
raise errors.DeserializationError("Not an Octet Key Pair")

curve = obj.get("crv")
if curve not in ("Ed25519", "Ed448", "X25519", "X448"):
if curve not in cls.crv_to_pub_priv:
raise errors.DeserializationError(f"Invalid curve: {curve}")

if "x" not in obj:
raise errors.DeserializationError('OKP should have "x" parameter')
x = json_util.decode_b64jose(jobj.get("x"))

try:
if "d" not in obj:
return jobj["key"]._wrapped.__class__.from_public_bytes(x) # noqa
d = json_util.decode_b64jose(obj.get("d"))
return jobj["key"]._wrapped.__class__.from_private_bytes(d) # noqa
if "d" not in obj: # public key
pub_class: Union[
ed25519.Ed25519PublicKey,
ed448.Ed448PublicKey,
x25519.X25519PublicKey,
x448.X448PublicKey,
] = cls.crv_to_pub_priv[curve][0]
return cls(key=pub_class.from_public_bytes(x))
else: # private key
d = json_util.decode_b64jose(obj.get("d"))
priv_key_class: Union[
ed25519.Ed25519PrivateKey,
ed448.Ed448PrivateKey,
x25519.X25519PrivateKey,
x448.X448PrivateKey,
] = cls.crv_to_pub_priv[curve][1]
return cls(key=priv_key_class.from_private_bytes(d))
except ValueError as err:
raise errors.DeserializationError("Invalid key parameter") from err
43 changes: 29 additions & 14 deletions src/josepy/jwk_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,8 +229,7 @@ def setUp(self):
self.jwk = self.private

def test_init_auto_comparable(self):
self.assertIsInstance(
self.jwk256_not_comparable.key, util.ComparableECKey)
self.assertIsInstance(self.jwk256_not_comparable.key, util.ComparableECKey)
self.assertEqual(self.jwk256, self.jwk256_not_comparable)

def test_encode_param_zero(self):
Expand Down Expand Up @@ -328,9 +327,9 @@ class JWKOKPTest(unittest.TestCase):
"""Tests for josepy.jwk.JWKOKP."""
# pylint: disable=too-many-instance-attributes

# What to put in the thumbprint
# TODO: write the thumbprint
thumbprint = (

b'kPrK_qmxVWaYVA9wwBF6Iuo3vVzz7TxHCTwXBygrS4k'
)

def setUp(self):
Expand All @@ -341,14 +340,14 @@ def setUp(self):
self.x448_key = JWKOKP(key=X448_KEY.public_key())
self.private = self.x448_key
self.jwk = self.private
# Test vectors taken from
# Test vectors taken from RFC 8037, A.2
self.jwked25519json = {
'kty': 'OKP',
'crv': 'Ed25519',
'x': '',
'x': '11qYAYKxCrfVS_7TyWQHOg7hcvPapiMlrwIaaPcHURo',
}
self.jwked448json = {
'kty': 'EC',
'kty': 'OKP',
'crv': 'Ed448',
'x':
"9b08f7cc31b7e3e67d22d5aea121074a273bd2b83de09c63faa73d2c"
Expand All @@ -357,27 +356,25 @@ def setUp(self):
# Test vectors taken from
# https://datatracker.ietf.org/doc/html/rfc7748#section-6.1
self.jwkx25519json = {
'kty': 'EC',
'kty': 'OKP',
'crv': 'X25519',
'x': '8520f0098930a754748b7ddcb43ef75a0dbf3a0d26381af4eba4a98eaa9b4e6a',
}
self.jwkx448json = {
'kty': 'EC',
'kty': 'OKP',
'crv': 'X448',
'x': 'jjQtV-fA7J_tK8dPzYq7jRPNjF8r5p6LW2R25S2Gw5U',
}

def test_encode_ed448(self):
from josepy.jwk import JWKOKP
import josepy
data = b"""-----BEGIN PRIVATE KEY-----
MEcCAQAwBQYDK2VxBDsEOfqsAFWdop10FFPW7Ha2tx2AZh0Ii+jfL2wFXU/dY/fe
iU7/vrGmQ+ux26NkgzfploOHZjEmltLJ9w==
-----END PRIVATE KEY-----"""
key = JWKOKP.load(data)
data = key.to_partial_json()
x = josepy.json_util.encode_b64jose(data['x'])
self.assertEqual(len(x), 195)
partial = key.to_partial_json()
self.assertEqual(partial['crv'], 'Ed448')

def test_encode_ed25519(self):
import josepy
Expand All @@ -388,7 +385,25 @@ def test_encode_ed25519(self):
key = JWKOKP.load(data)
data = key.to_partial_json()
x = josepy.json_util.encode_b64jose(data['x'])
self.assertEqual(len(x), 151)
self.assertEqual(x, "9ujoz88QZL05w2lhaqUbBaBpwmM12Y7Y8Ybfwjibk-I")

def test_from_json_ed25519(self):
from josepy.jwk import JWK
key = JWK.from_json(self.jwked25519json)
with self.subTest(key=[
self.jwked448json, self.jwked25519json,
self.jwkx25519json, self.jwkx448json,
]):
self.assertIsInstance(key.key, util.ComparableOKPKey)

def test_fields_to_json(self):
from josepy.jwk import JWK
data = b"""-----BEGIN PRIVATE KEY-----
MC4CAQAwBQYDK2VwBCIEIPIAha9VqyHHpY1GtEW8JXWqLU5mrPRhXPwJqCtL3bWZ
-----END PRIVATE KEY-----"""
key = JWK.load(data)
data = key.fields_to_partial_json()
self.assertEqual(data['crv'], "Ed25519")

def test_init_auto_comparable(self):
self.assertIsInstance(self.x448_key.key, util.ComparableOKPKey)
Expand Down
18 changes: 5 additions & 13 deletions src/josepy/util.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
"""JOSE utilities."""
from collections.abc import Hashable, Mapping
from typing import Union

import OpenSSL
from cryptography.hazmat.backends import default_backend
Expand Down Expand Up @@ -168,34 +167,27 @@ class ComparableOKPKey(ComparableKey):
"""Wrapper for ``cryptography`` OKP keys.
Wraps around:
- :class:`~cryptography.hazmat.primitives.asymmetric.ed25519.Ed25519PrivateKey`
- :class:`~cryptography.hazmat.primitives.asymmetric.ed25519.Ed25519PublicKey`
- :class:`~cryptography.hazmat.primitives.asymmetric.ed448.Ed448PrivateKey`
- :class:`~cryptography.hazmat.primitives.asymmetric.ed25519.Ed25519PrivateKey`
- :class:`~cryptography.hazmat.primitives.asymmetric.ed448.Ed448PublicKey`
- :class:`~cryptography.hazmat.primitives.asymmetric.x25519.X25519PrivateKey`
- :class:`~cryptography.hazmat.primitives.asymmetric.ed448.Ed448PrivateKey`
- :class:`~cryptography.hazmat.primitives.asymmetric.x25519.X25519PublicKey`
- :class:`~cryptography.hazmat.primitives.asymmetric.x448.X448PrivateKey`
- :class:`~cryptography.hazmat.primitives.asymmetric.x25519.X25519PrivateKey`
- :class:`~cryptography.hazmat.primitives.asymmetric.x448.X448PublicKey`
- :class:`~cryptography.hazmat.primitives.asymmetric.x448.X448PrivateKey`
"""

def __hash__(self):
return hash((self.__class__, self._wrapped.curve.name, self._wrapped.x))

def is_private(self):
def is_private(self) -> bool:
return isinstance(
self._wrapped, (
ed25519.Ed25519PrivateKey, ed448.Ed448PrivateKey,
x25519.X25519PrivateKey, x448.X448PrivateKey
)
)

def public_key(self) -> Union[
ed25519.Ed25519PublicKey, ed448.Ed448PublicKey,
x25519.X25519PublicKey, x448.X448PublicKey,
]:
"""Get wrapped public key."""
return self._wrapped.public_key()


class ImmutableMap(Mapping, Hashable):
# pylint: disable=too-few-public-methods
Expand Down

0 comments on commit ea7d219

Please sign in to comment.