diff --git a/python/py_vapid/__init__.py b/python/py_vapid/__init__.py index 37b01e2..0529061 100644 --- a/python/py_vapid/__init__.py +++ b/python/py_vapid/__init__.py @@ -9,7 +9,7 @@ import re from cryptography.hazmat.backends import default_backend -from cryptography.hazmat.primitives.asymmetric import ec +from cryptography.hazmat.primitives.asymmetric import ec, utils as ecutils from cryptography.hazmat.primitives import serialization from cryptography.hazmat.primitives import hashes @@ -63,6 +63,16 @@ def from_raw(cls, private_raw): backend=default_backend()) return cls(key) + @classmethod + def from_raw_public(cls, public_raw): + key = ec.EllipticCurvePublicNumbers.from_encoded_point( + curve=ec.SECP256R1(), + data=b64urldecode(public_raw) + ).public_key(default_backend()) + ss = cls() + ss._public_key = key + return ss + @classmethod def from_pem(cls, private_key): """Initialize VAPID using a private key in PEM format. @@ -113,6 +123,16 @@ def from_file(cls, private_key_file=None): logging.error("Could not open private key file: %s", repr(exc)) raise VapidException(exc) + @classmethod + def verify(cls, key, auth): + # TODO: add v2 validation + tokens = auth.rsplit(' ', 1)[1].rsplit('.', 1) + kp = cls().from_raw_public(key.encode()) + return kp.verify_token( + validation_token=tokens[0].encode(), + verification_token=tokens[1] + ) + @property def private_key(self): """The VAPID private ECDSA key""" @@ -184,21 +204,6 @@ def save_public_key(self, key_file): file.write(self.public_pem()) file.close() - def validate(self, validation_token): - """Sign a Valdiation token from the dashboard - - :param validation_token: Short validation token from the dev dashboard - :type validation_token: str - :returns: corresponding token for key verification - :rtype: str - - """ - sig = self.private_key.sign( - validation_token, - signature_algorithm=ec.ECDSA(hashes.SHA256())) - verification_token = b64urlencode(sig) - return verification_token - def verify_token(self, validation_token, verification_token): """Internally used to verify the verification token is correct. @@ -211,8 +216,10 @@ def verify_token(self, validation_token, verification_token): """ hsig = b64urldecode(verification_token.encode('utf8')) + r = int(binascii.hexlify(hsig[:32]), 16) + s = int(binascii.hexlify(hsig[32:]), 16) return self.public_key.verify( - hsig, + ecutils.encode_dss_signature(r, s), validation_token, signature_algorithm=ec.ECDSA(hashes.SHA256()) ) diff --git a/python/py_vapid/jwt.py b/python/py_vapid/jwt.py index bdcc2a7..a296690 100644 --- a/python/py_vapid/jwt.py +++ b/python/py_vapid/jwt.py @@ -6,13 +6,12 @@ from cryptography.hazmat.primitives.asymmetric import ec, utils from cryptography.hazmat.primitives import hashes -from py_vapid.utils import b64urldecode, b64urlencode +from py_vapid.utils import b64urldecode, b64urlencode, num_to_bytes def extract_signature(auth): - """Fix the JWT auth token - - convert a ecdsa integer pair into an OpenSSL DER pair. + """Extracts the payload and signature from a JWT, converting from RFC7518 + to RFC 3279 :param auth: A JWT Authorization Token. :type auth: str @@ -23,7 +22,7 @@ def extract_signature(auth): payload, asig = auth.encode('utf8').rsplit(b'.', 1) sig = b64urldecode(asig) if len(sig) != 64: - return payload, sig + raise InvalidSignature() encoded = utils.encode_dss_signature( s=int(binascii.hexlify(sig[32:]), 16), @@ -35,8 +34,6 @@ def extract_signature(auth): def decode(token, key): """Decode a web token into an assertion dictionary - This attempts to rectify both ecdsa and openssl generated signatures. - :param token: VAPID auth token :type token: str :param key: bitarray containing the public key @@ -80,9 +77,12 @@ def sign(claims, key): """ header = b64urlencode(b"""{"typ":"JWT","alg":"ES256"}""") + # Unfortunately, chrome seems to require the claims to be sorted. claims = b64urlencode(json.dumps(claims, - separators=(',', ':')).encode('utf8')) + separators=(',', ':'), + sort_keys=True).encode('utf8')) token = "{}.{}".format(header, claims) rsig = key.sign(token.encode('utf8'), ec.ECDSA(hashes.SHA256())) - sig = b64urlencode(rsig) + (r, s) = utils.decode_dss_signature(rsig) + sig = b64urlencode(num_to_bytes(r) + num_to_bytes(s)) return "{}.{}".format(token, sig) diff --git a/python/py_vapid/tests/test_vapid.py b/python/py_vapid/tests/test_vapid.py index 0b55a50..6a4322a 100644 --- a/python/py_vapid/tests/test_vapid.py +++ b/python/py_vapid/tests/test_vapid.py @@ -6,12 +6,8 @@ from nose.tools import eq_, ok_ from mock import patch, Mock -from cryptography.hazmat.primitives.asymmetric import ec, utils -from cryptography.hazmat.primitives import hashes - from py_vapid import Vapid01, Vapid02, VapidException from py_vapid.jwt import decode -from py_vapid.utils import b64urldecode # This is a private key in DER form. T_DER = """ @@ -126,17 +122,6 @@ def test_from_raw(self): v = Vapid01.from_raw(T_RAW) self.check_keys(v) - def test_validate(self): - v = Vapid01.from_file("/tmp/private") - msg = "foobar".encode('utf8') - vtoken = v.validate(msg) - ok_(v.public_key.verify( - base64.urlsafe_b64decode(self.repad(vtoken).encode()), - msg, - ec.ECDSA(hashes.SHA256()))) - # test verify - ok_(v.verify_token(msg, vtoken)) - def test_sign_01(self): v = Vapid01.from_file("/tmp/private") claims = {"aud": "https://example.com", @@ -152,6 +137,12 @@ def test_sign_01(self): result = v.sign(claims) eq_(result['Crypto-Key'], 'p256ecdsa=' + T_PUBLIC_RAW.decode('utf8')) + # Verify using the same function as Integration + # this should ensure that the r,s sign values are correctly formed + ok_(Vapid01.verify( + key=result['Crypto-Key'].split('=')[1], + auth=result['Authorization'] + )) def test_sign_02(self): v = Vapid02.from_file("/tmp/private") @@ -174,26 +165,17 @@ def test_sign_02(self): for k in claims: eq_(t_val[k], claims[k]) - def test_alt_sign(self): - """ecdsa uses a raw key pair to sign, openssl uses a DER.""" - v = Vapid01.from_file("/tmp/private") - claims = {"aud": "https://example.com", - "sub": "mailto:admin@example.com", - "foo": "extra value"} - # Get a signed token. - result = v.sign(claims) - # Convert the dss into raw. - auth, sig = result.get('Authorization').split(' ')[1].rsplit('.', 1) - ss = utils.decode_dss_signature(b64urldecode(sig.encode('utf8'))) - new_sig = binascii.b2a_base64( - binascii.unhexlify("%064x%064x" % ss) - ).strip().strip(b'=').decode() - new_auth = auth + '.' + new_sig - # phew, all that done, now check - pkey = result.get("Crypto-Key").split('=')[1] - items = decode(new_auth, pkey) - - eq_(items, claims) + def test_integration(self): + # These values were taken from a test page. DO NOT ALTER! + key = ("BDd3_hVL9fZi9Ybo2UUzA284WG5FZR30_95YeZJsiApwXKpNcF1rRPF3foI" + "iBHXRdJI2Qhumhf6_LFTeZaNndIo") + + auth = ("WebPush eyJ0eXAiOiJKV1QiLCJhbGciOiJFUzI1NiJ9.eyJhdWQiOiJod" + "HRwczovL3VwZGF0ZXMucHVzaC5zZXJ2aWNlcy5tb3ppbGxhLmNvbSIsImV" + "4cCI6MTQ5NDY3MTQ3MCwic3ViIjoibWFpbHRvOnNpbXBsZS1wdXNoLWRlb" + "W9AZ2F1bnRmYWNlLmNvLnVrIn0.LqPi86T-HJ71TXHAYFptZEHD7Wlfjcc" + "4u5jYZ17WpqOlqDcW-5Wtx3x1OgYX19alhJ9oLumlS2VzEvNioZolQA") + ok_(Vapid01.verify(key=key, auth=auth)) def test_bad_sign(self): v = Vapid01.from_file("/tmp/private") diff --git a/python/py_vapid/utils.py b/python/py_vapid/utils.py index c966d6c..44c821e 100644 --- a/python/py_vapid/utils.py +++ b/python/py_vapid/utils.py @@ -1,4 +1,5 @@ import base64 +import binascii def b64urldecode(data): @@ -23,3 +24,13 @@ def b64urlencode(data): """ return base64.urlsafe_b64encode(data).replace(b'=', b'').decode('utf8') + + +def num_to_bytes(n): + """Returns the byte representation of an integer, in big-endian order. + :param n: The integer to encode. + :type n: int + :returns bytes + """ + h = '%x' % n + return binascii.unhexlify('0' * (len(h) % 2) + h) diff --git a/python/setup.py b/python/setup.py index 3c37580..6bfec9a 100644 --- a/python/setup.py +++ b/python/setup.py @@ -3,7 +3,7 @@ from setuptools import setup, find_packages -__version__ = "1.2.2" +__version__ = "1.2.3" def read_from(file):