mirror of
https://github.com/certbot/certbot.git
synced 2026-04-24 07:40:02 -04:00
JWS
This commit is contained in:
parent
8208a7f4d5
commit
7def7df897
7 changed files with 701 additions and 4 deletions
|
|
@ -21,6 +21,13 @@ JSON Web Key
|
|||
:members:
|
||||
|
||||
|
||||
JSON Web Signature
|
||||
------------------
|
||||
|
||||
.. automodule:: letsencrypt.acme.jose.jws
|
||||
:members:
|
||||
|
||||
|
||||
Implementation details
|
||||
----------------------
|
||||
|
||||
|
|
|
|||
|
|
@ -6,6 +6,7 @@ particular the following RFCs:
|
|||
|
||||
- `JSON Web Algorithms (JWA)`_
|
||||
- `JSON Web Key (JWK)`_
|
||||
- `JSON Web Signature (JWS)`_
|
||||
|
||||
|
||||
.. _`Javascript Object Signing and Encryption (Active WG)`:
|
||||
|
|
@ -17,6 +18,9 @@ particular the following RFCs:
|
|||
.. _`JSON Web Key (JWK)`:
|
||||
https://datatracker.ietf.org/doc/draft-ietf-jose-json-web-key/
|
||||
|
||||
.. _`JSON Web Signature (JWS)`:
|
||||
https://datatracker.ietf.org/doc/draft-ietf-jose-json-web-signature/
|
||||
|
||||
"""
|
||||
from letsencrypt.acme.jose.b64 import (
|
||||
b64decode,
|
||||
|
|
@ -62,6 +66,8 @@ from letsencrypt.acme.jose.jwk import (
|
|||
JWKRSA,
|
||||
)
|
||||
|
||||
from letsencrypt.acme.jose.jws import JWS
|
||||
|
||||
from letsencrypt.acme.jose.util import (
|
||||
ComparableX509,
|
||||
ImmutableMap,
|
||||
|
|
|
|||
|
|
@ -1,4 +1,5 @@
|
|||
"""JSON Web Key."""
|
||||
import abc
|
||||
import binascii
|
||||
|
||||
import Crypto.PublicKey.RSA
|
||||
|
|
@ -6,6 +7,7 @@ import Crypto.PublicKey.RSA
|
|||
from letsencrypt.acme.jose import b64
|
||||
from letsencrypt.acme.jose import errors
|
||||
from letsencrypt.acme.jose import json_util
|
||||
from letsencrypt.acme.jose import util
|
||||
|
||||
|
||||
class JWK(json_util.TypedJSONObjectWithFields):
|
||||
|
|
@ -14,6 +16,20 @@ class JWK(json_util.TypedJSONObjectWithFields):
|
|||
type_field_name = 'kty'
|
||||
TYPES = {}
|
||||
|
||||
@util.abstractclassmethod
|
||||
def load(cls, string): # pragma: no cover
|
||||
"""Load key from normalized string form."""
|
||||
raise NotImplementedError()
|
||||
|
||||
@abc.abstractmethod
|
||||
def public(self): # pragma: no cover
|
||||
"""Generate JWK with public key.
|
||||
|
||||
For symmetric cryptosystems, this would return ``self``.
|
||||
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
|
||||
@JWK.register
|
||||
class JWKES(JWK): # pragma: no cover
|
||||
|
|
@ -32,6 +48,13 @@ class JWKES(JWK): # pragma: no cover
|
|||
def fields_from_json(cls, jobj):
|
||||
raise NotImplementedError()
|
||||
|
||||
@classmethod
|
||||
def load(cls, string):
|
||||
raise NotImplementedError()
|
||||
|
||||
def public(self):
|
||||
raise NotImplementedError()
|
||||
|
||||
|
||||
@JWK.register
|
||||
class JWKOct(JWK):
|
||||
|
|
@ -50,6 +73,13 @@ class JWKOct(JWK):
|
|||
def fields_from_json(cls, jobj):
|
||||
return cls(key=jobj['k'])
|
||||
|
||||
@classmethod
|
||||
def load(cls, string):
|
||||
return cls(key=string)
|
||||
|
||||
def public(self):
|
||||
return self
|
||||
|
||||
|
||||
@JWK.register
|
||||
class JWKRSA(JWK):
|
||||
|
|
@ -75,16 +105,19 @@ class JWKRSA(JWK):
|
|||
raise errors.DeserializationError()
|
||||
|
||||
@classmethod
|
||||
def load(cls, key):
|
||||
def load(cls, string):
|
||||
"""Load RSA key from string.
|
||||
|
||||
:param str key: RSA key in string form.
|
||||
:param str string: RSA key in string form.
|
||||
|
||||
:returns:
|
||||
:rtype: :class:`JWKRSA`
|
||||
|
||||
"""
|
||||
return cls(key=Crypto.PublicKey.RSA.importKey(key))
|
||||
return cls(key=Crypto.PublicKey.RSA.importKey(string))
|
||||
|
||||
def public(self):
|
||||
return type(self)(key=self.key.publickey())
|
||||
|
||||
@classmethod
|
||||
def fields_from_json(cls, jobj):
|
||||
|
|
@ -97,4 +130,3 @@ class JWKRSA(JWK):
|
|||
'n': self._encode_param(self.key.n),
|
||||
'e': self._encode_param(self.key.e),
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -29,6 +29,13 @@ class JWKOctTest(unittest.TestCase):
|
|||
from letsencrypt.acme.jose.jwk import JWKOct
|
||||
self.assertEqual(self.jwk, JWKOct.from_json(self.jobj))
|
||||
|
||||
def test_load(self):
|
||||
from letsencrypt.acme.jose.jwk import JWKOct
|
||||
self.assertEqual(self.jwk, JWKOct.load('foo'))
|
||||
|
||||
def test_public(self):
|
||||
self.assertTrue(self.jwk.public() is self.jwk)
|
||||
|
||||
|
||||
class JWKRSATest(unittest.TestCase):
|
||||
"""Tests for letsencrypt.acme.jose.jwk.JWKRSA."""
|
||||
|
|
@ -36,6 +43,7 @@ class JWKRSATest(unittest.TestCase):
|
|||
def setUp(self):
|
||||
from letsencrypt.acme.jose.jwk import JWKRSA
|
||||
self.jwk256 = JWKRSA(key=RSA256_KEY.publickey())
|
||||
self.jwk256_private = JWKRSA(key=RSA256_KEY)
|
||||
self.jwk256json = {
|
||||
'kty': 'RSA',
|
||||
'e': 'AQAB',
|
||||
|
|
@ -65,6 +73,9 @@ class JWKRSATest(unittest.TestCase):
|
|||
'letsencrypt.client.tests',
|
||||
os.path.join('testdata', 'rsa256_key.pem'))))
|
||||
|
||||
def test_public(self):
|
||||
self.assertEqual(self.jwk256, self.jwk256_private.public())
|
||||
|
||||
def test_to_json(self):
|
||||
self.assertEqual(self.jwk256.to_json(), self.jwk256json)
|
||||
self.assertEqual(self.jwk512.to_json(), self.jwk512json)
|
||||
|
|
|
|||
406
letsencrypt/acme/jose/jws.py
Normal file
406
letsencrypt/acme/jose/jws.py
Normal file
|
|
@ -0,0 +1,406 @@
|
|||
"""JOSE Web Signature."""
|
||||
import argparse
|
||||
import base64
|
||||
import sys
|
||||
|
||||
import M2Crypto.X509
|
||||
|
||||
from letsencrypt.acme.jose import b64
|
||||
from letsencrypt.acme.jose import errors
|
||||
from letsencrypt.acme.jose import json_util
|
||||
from letsencrypt.acme.jose import jwa
|
||||
from letsencrypt.acme.jose import jwk
|
||||
from letsencrypt.acme.jose import util
|
||||
|
||||
|
||||
class MediaType(object):
|
||||
"""MediaType field encoder/decoder."""
|
||||
|
||||
PREFIX = 'application/'
|
||||
"""MIME Media Type and Content Type prefix."""
|
||||
|
||||
@classmethod
|
||||
def decode(cls, value):
|
||||
"""Decoder."""
|
||||
# 4.1.10
|
||||
if '/' not in value:
|
||||
if ';' in value:
|
||||
raise errors.DeserializationError('Unexpected semi-colon')
|
||||
return cls.PREFIX + value
|
||||
return value
|
||||
|
||||
@classmethod
|
||||
def encode(cls, value):
|
||||
"""Encoder."""
|
||||
# 4.1.10
|
||||
if ';' not in value:
|
||||
assert value.startswith(cls.PREFIX)
|
||||
return value[len(cls.PREFIX):]
|
||||
return value
|
||||
|
||||
|
||||
class Header(json_util.JSONObjectWithFields):
|
||||
"""JOSE Header.
|
||||
|
||||
.. warning:: This class supports **only** Registered Header
|
||||
Parameter Names (as defined in section 4.1 of the
|
||||
protocol). If you need Public Header Parameter Names (4.2)
|
||||
or Private Header Parameter Names (4.3), you must subclass
|
||||
and override :meth:`from_json` and :meth:`to_json`
|
||||
appropriately.
|
||||
|
||||
.. warning:: This class does not support any extensions through
|
||||
the "crit" (Critical) Header Parameter (4.1.11) and as a
|
||||
conforming implementation, :meth:`from_json` treats its
|
||||
occurence as an error. Please subclass if you seek for
|
||||
a diferent behaviour.
|
||||
|
||||
:ivar x5tS256: "x5t#S256"
|
||||
:ivar str typ: MIME Media Type, inc. :const:`MediaType.PREFIX`.
|
||||
:ivar str cty: Content-Type, inc. :const:`MediaType.PREFIX`.
|
||||
|
||||
"""
|
||||
alg = json_util.Field(
|
||||
'alg', decoder=jwa.JWASignature.from_json, omitempty=True)
|
||||
jku = json_util.Field('jku', omitempty=True)
|
||||
jwk = json_util.Field('jwk', decoder=jwk.JWK.from_json, omitempty=True)
|
||||
kid = json_util.Field('kid', omitempty=True)
|
||||
x5u = json_util.Field('x5u', omitempty=True)
|
||||
x5c = json_util.Field('x5c', omitempty=True, default=())
|
||||
x5t = json_util.Field(
|
||||
'x5t', decoder=json_util.decode_b64jose, omitempty=True)
|
||||
x5tS256 = json_util.Field(
|
||||
'x5t#S256', decoder=json_util.decode_b64jose, omitempty=True)
|
||||
typ = json_util.Field('typ', encoder=MediaType.encode,
|
||||
decoder=MediaType.decode, omitempty=True)
|
||||
cty = json_util.Field('cty', encoder=MediaType.encode,
|
||||
decoder=MediaType.decode, omitempty=True)
|
||||
crit = json_util.Field('crit', omitempty=True, default=())
|
||||
|
||||
def not_omitted(self):
|
||||
"""Fields that would not be omitted in the JSON object."""
|
||||
return dict((name, getattr(self, name))
|
||||
for name, field in self._fields.iteritems()
|
||||
if not field.omit(getattr(self, name)))
|
||||
|
||||
def __add__(self, other):
|
||||
if not isinstance(other, type(self)):
|
||||
raise TypeError('Header cannot be added to: {0}'.format(
|
||||
type(other)))
|
||||
|
||||
not_omitted_self = self.not_omitted()
|
||||
not_omitted_other = other.not_omitted()
|
||||
|
||||
if set(not_omitted_self).intersection(not_omitted_other):
|
||||
raise TypeError('Addition of overlapping headers not defined')
|
||||
|
||||
not_omitted_self.update(not_omitted_other)
|
||||
return type(self)(**not_omitted_self) # pylint: disable=star-args
|
||||
|
||||
def find_key(self):
|
||||
"""Find key based on header.
|
||||
|
||||
.. todo:: Supports only "jwk" header parameter lookup.
|
||||
|
||||
:returns: (Public) key found in the header.
|
||||
:rtype: :class:`letsencrypt.acme.jose.jwk.JWK`
|
||||
|
||||
:raises letsencrypt.acme.jose.errors.Error: if key could not be found
|
||||
|
||||
"""
|
||||
if self.jwk is None:
|
||||
raise errors.Error('No key found')
|
||||
return self.jwk
|
||||
|
||||
@crit.decoder
|
||||
def crit(unused_value):
|
||||
# pylint: disable=missing-docstring,no-self-argument,no-self-use
|
||||
raise errors.DeserializationError(
|
||||
'"crit" is not supported, please subclass')
|
||||
|
||||
# x5c does NOT use JOSE Base64 (4.1.6)
|
||||
|
||||
@x5c.encoder
|
||||
def x5c(value): # pylint: disable=missing-docstring,no-self-argument
|
||||
return [base64.b64encode(cert.as_der()) for cert in value]
|
||||
|
||||
@x5c.decoder
|
||||
def x5c(value): # pylint: disable=missing-docstring,no-self-argument
|
||||
try:
|
||||
return tuple(util.ComparableX509(M2Crypto.X509.load_cert_der_string(
|
||||
base64.b64decode(cert))) for cert in value)
|
||||
except M2Crypto.X509.X509Error as error:
|
||||
raise errors.DeserializationError(error)
|
||||
|
||||
|
||||
class Signature(json_util.JSONObjectWithFields):
|
||||
"""JWS Signature.
|
||||
|
||||
:ivar combined: Combined Header (protected and unprotected,
|
||||
:class:`Header`).
|
||||
:ivar unicode protected: JWS protected header (Jose Base-64 decoded).
|
||||
:ivar header: JWS Unprotected Header (:class:`Header`).
|
||||
:ivar str signature: The signature.
|
||||
|
||||
"""
|
||||
header_cls = Header
|
||||
|
||||
__slots__ = ('combined',)
|
||||
protected = json_util.Field(
|
||||
'protected', omitempty=True, default='',
|
||||
decoder=json_util.decode_b64jose, encoder=b64.b64encode) # TODO: utf-8?
|
||||
header = json_util.Field(
|
||||
'header', omitempty=True, default=header_cls(),
|
||||
decoder=header_cls.from_json)
|
||||
signature = json_util.Field(
|
||||
'signature', decoder=json_util.decode_b64jose,
|
||||
encoder=b64.b64encode)
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
if 'combined' not in kwargs:
|
||||
kwargs = self._with_combined(kwargs)
|
||||
super(Signature, self).__init__(**kwargs)
|
||||
assert self.combined.alg is not None
|
||||
|
||||
@classmethod
|
||||
def _with_combined(cls, kwargs):
|
||||
assert 'combined' not in kwargs
|
||||
header = kwargs.get('header', cls._fields['header'].default)
|
||||
protected = kwargs.get('protected', cls._fields['protected'].default)
|
||||
|
||||
if protected:
|
||||
combined = header + cls.header_cls.json_loads(protected)
|
||||
else:
|
||||
combined = header
|
||||
|
||||
kwargs['combined'] = combined
|
||||
return kwargs
|
||||
|
||||
def verify(self, payload, key=None):
|
||||
"""Verify.
|
||||
|
||||
:param key: Key used for verification.
|
||||
:type key: :class:`letsencrypt.acme.jose.jwk.JWK`
|
||||
|
||||
"""
|
||||
key = self.combined.find_key() if key is None else key
|
||||
return self.combined.alg.verify(
|
||||
key=key.key, sig=self.signature,
|
||||
msg=(b64.b64encode(self.protected) + '.' +
|
||||
b64.b64encode(payload)))
|
||||
|
||||
@classmethod
|
||||
def sign(cls, payload, key, alg, include_jwk=True,
|
||||
protect=frozenset(), **kwargs):
|
||||
"""Sign.
|
||||
|
||||
:param key: Key for signature.
|
||||
:type key: :class:`letsencrypt.acme.jose.jwk.JWK`
|
||||
|
||||
"""
|
||||
assert isinstance(key, alg.kty)
|
||||
|
||||
header_params = kwargs
|
||||
header_params['alg'] = alg
|
||||
if include_jwk:
|
||||
header_params['jwk'] = key.public()
|
||||
|
||||
assert set(header_params).issubset(cls.header_cls._fields)
|
||||
assert protect.issubset(cls.header_cls._fields)
|
||||
|
||||
protected_params = {}
|
||||
for header in protect:
|
||||
protected_params[header] = header_params.pop(header)
|
||||
if protected_params:
|
||||
# pylint: disable=star-args
|
||||
protected = cls.header_cls(**protected_params).json_dumps()
|
||||
else:
|
||||
protected = ''
|
||||
|
||||
header = cls.header_cls(**header_params) # pylint: disable=star-args
|
||||
signature = alg.sign(key.key, b64.b64encode(protected)
|
||||
+ '.' + b64.b64encode(payload))
|
||||
|
||||
return cls(protected=protected, header=header, signature=signature)
|
||||
|
||||
def fields_to_json(self):
|
||||
fields = super(Signature, self).fields_to_json()
|
||||
if not fields['header'].not_omitted():
|
||||
del fields['header']
|
||||
return fields
|
||||
|
||||
@classmethod
|
||||
def fields_from_json(cls, jobj):
|
||||
fields = super(Signature, cls).fields_from_json(jobj)
|
||||
fields_with_combined = cls._with_combined(fields)
|
||||
if 'alg' not in fields_with_combined['combined'].not_omitted():
|
||||
raise errors.DeserializationError('alg not present')
|
||||
return fields_with_combined
|
||||
|
||||
|
||||
class JWS(json_util.JSONObjectWithFields):
|
||||
"""JSON Web Signature.
|
||||
|
||||
from letsencrypt.acme.jose import interfaces
|
||||
|
||||
:ivar str payload: JWS Payload.
|
||||
:ivar str signaturea: JWS Signatures.
|
||||
|
||||
"""
|
||||
__slots__ = ('payload', 'signatures')
|
||||
|
||||
def verify(self, key=None):
|
||||
"""Verify."""
|
||||
return all(sig.verify(self.payload, key) for sig in self.signatures)
|
||||
|
||||
@classmethod
|
||||
def sign(cls, payload, **kwargs):
|
||||
"""Sign."""
|
||||
return cls(payload=payload, signatures=(
|
||||
Signature.sign(payload=payload, **kwargs),))
|
||||
|
||||
@property
|
||||
def signature(self):
|
||||
"""Get a singleton signature.
|
||||
|
||||
:rtype: :class:`Signature`
|
||||
|
||||
"""
|
||||
assert len(self.signatures) == 1
|
||||
return self.signatures[0]
|
||||
|
||||
def to_compact(self):
|
||||
"""Compact serialization."""
|
||||
assert len(self.signatures) == 1
|
||||
|
||||
assert 'alg' not in self.signature.header.not_omitted()
|
||||
# ... it must be in protected
|
||||
|
||||
return '{0}.{1}.{2}'.format(
|
||||
b64.b64encode(self.signature.protected),
|
||||
b64.b64encode(self.payload),
|
||||
b64.b64encode(self.signature.signature))
|
||||
|
||||
@classmethod
|
||||
def from_compact(cls, compact):
|
||||
"""Compact deserialization."""
|
||||
try:
|
||||
protected, payload, signature = compact.split('.')
|
||||
except ValueError:
|
||||
raise errors.DeserializationError(
|
||||
'Compact JWS serialization should comprise of exactly'
|
||||
' 3 dot-separated components')
|
||||
sig = Signature(protected=json_util.decode_b64jose(protected),
|
||||
signature=json_util.decode_b64jose(signature))
|
||||
return cls(payload=json_util.decode_b64jose(payload), signatures=(sig,))
|
||||
|
||||
def to_json(self, flat=True): # pylint: disable=arguments-differ
|
||||
assert self.signatures
|
||||
payload = b64.b64encode(self.payload)
|
||||
|
||||
if flat and len(self.signatures) == 1:
|
||||
ret = self.signatures[0].to_json()
|
||||
ret['payload'] = payload
|
||||
return ret
|
||||
else:
|
||||
return {
|
||||
'payload': payload,
|
||||
'signatures': self.signatures,
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def from_json(cls, jobj):
|
||||
if 'signature' in jobj and 'signatures' in jobj:
|
||||
raise errors.DeserializationError('Flat mixed with non-flat')
|
||||
elif 'signature' in jobj: # flat
|
||||
return cls(payload=json_util.decode_b64jose(jobj.pop('payload')),
|
||||
signatures=(Signature.from_json(jobj),))
|
||||
else:
|
||||
return cls(payload=json_util.decode_b64jose(jobj['payload']),
|
||||
signatures=tuple(Signature.from_json(sig)
|
||||
for sig in jobj['signatures']))
|
||||
|
||||
class CLI(object):
|
||||
"""JWS CLI."""
|
||||
|
||||
@classmethod
|
||||
def sign(cls, args):
|
||||
"""Sign."""
|
||||
key = args.alg.kty.load(args.key.read())
|
||||
if args.protect is None:
|
||||
args.protect = []
|
||||
if args.compact:
|
||||
args.protect.append('alg')
|
||||
|
||||
sig = JWS.sign(payload=sys.stdin.read(), key=key, alg=args.alg,
|
||||
protect=set(args.protect))
|
||||
|
||||
if args.compact:
|
||||
print sig.to_compact()
|
||||
else: # JSON
|
||||
print sig.json_dumps_pretty()
|
||||
|
||||
@classmethod
|
||||
def verify(cls, args):
|
||||
"""Verify."""
|
||||
if args.compact:
|
||||
sig = JWS.from_compact(sys.stdin.read())
|
||||
else: # JSON
|
||||
try:
|
||||
sig = JWS.json_loads(sys.stdin.read())
|
||||
except errors.Error as error:
|
||||
print error
|
||||
return -1
|
||||
|
||||
if args.key is not None:
|
||||
assert args.kty is not None
|
||||
key = args.kty.load(args.key.read())
|
||||
else:
|
||||
key = None
|
||||
|
||||
sys.stdout.write(sig.payload)
|
||||
return int(not sig.verify(key=key))
|
||||
|
||||
@classmethod
|
||||
def _alg_type(cls, arg):
|
||||
return jwa.JWASignature.from_json(arg)
|
||||
|
||||
@classmethod
|
||||
def _header_type(cls, arg):
|
||||
assert arg in Signature.header_cls._fields
|
||||
return arg
|
||||
|
||||
@classmethod
|
||||
def _kty_type(cls, arg):
|
||||
assert arg in jwk.JWK.TYPES
|
||||
return jwk.JWK.TYPES[arg]
|
||||
|
||||
@classmethod
|
||||
def run(cls, args=sys.argv[1:]):
|
||||
"""Parse arguments and sign/verify."""
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument('--compact', action='store_true')
|
||||
|
||||
subparsers = parser.add_subparsers()
|
||||
parser_sign = subparsers.add_parser('sign')
|
||||
parser_sign.set_defaults(func=cls.sign)
|
||||
parser_sign.add_argument(
|
||||
'-k', '--key', type=argparse.FileType(), required=True)
|
||||
parser_sign.add_argument(
|
||||
'-a', '--alg', type=cls._alg_type, default=jwa.RS256)
|
||||
parser_sign.add_argument(
|
||||
'-p', '--protect', action='append', type=cls._header_type)
|
||||
|
||||
parser_verify = subparsers.add_parser('verify')
|
||||
parser_verify.set_defaults(func=cls.verify)
|
||||
parser_verify.add_argument(
|
||||
'-k', '--key', type=argparse.FileType(), required=False)
|
||||
parser_verify.add_argument(
|
||||
'--kty', type=cls._kty_type, required=False)
|
||||
|
||||
parsed = parser.parse_args(args)
|
||||
return parsed.func(parsed)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
exit(CLI.run()) # pragma: no cover
|
||||
234
letsencrypt/acme/jose/jws_test.py
Normal file
234
letsencrypt/acme/jose/jws_test.py
Normal file
|
|
@ -0,0 +1,234 @@
|
|||
"""Tests for letsencrypt.acme.jose.jws."""
|
||||
import base64
|
||||
import os
|
||||
import pkg_resources
|
||||
import unittest
|
||||
|
||||
import Crypto.PublicKey.RSA
|
||||
import M2Crypto.X509
|
||||
import mock
|
||||
|
||||
from letsencrypt.acme.jose import b64
|
||||
from letsencrypt.acme.jose import errors
|
||||
from letsencrypt.acme.jose import jwa
|
||||
from letsencrypt.acme.jose import jwk
|
||||
from letsencrypt.acme.jose import util
|
||||
|
||||
|
||||
CERT = util.ComparableX509(M2Crypto.X509.load_cert(
|
||||
pkg_resources.resource_filename(
|
||||
'letsencrypt.client.tests', 'testdata/cert.pem')))
|
||||
RSA512_KEY = Crypto.PublicKey.RSA.importKey(pkg_resources.resource_string(
|
||||
__name__, os.path.join('testdata', 'rsa512_key.pem')))
|
||||
|
||||
|
||||
class MediaTypeTest(unittest.TestCase):
|
||||
"""Tests for letsencrypt.acme.jose.jws.MediaType."""
|
||||
|
||||
def test_decode(self):
|
||||
from letsencrypt.acme.jose.jws import MediaType
|
||||
self.assertEqual('application/app', MediaType.decode('application/app'))
|
||||
self.assertEqual('application/app', MediaType.decode('app'))
|
||||
self.assertRaises(
|
||||
errors.DeserializationError, MediaType.decode, 'app;foo')
|
||||
|
||||
def test_encode(self):
|
||||
from letsencrypt.acme.jose.jws import MediaType
|
||||
self.assertEqual('app', MediaType.encode('application/app'))
|
||||
self.assertEqual('application/app;foo',
|
||||
MediaType.encode('application/app;foo'))
|
||||
|
||||
|
||||
class HeaderTest(unittest.TestCase):
|
||||
"""Tests for letsencrypt.acme.jose.jws.Header."""
|
||||
|
||||
def setUp(self):
|
||||
from letsencrypt.acme.jose.jws import Header
|
||||
self.header1 = Header(jwk='foo')
|
||||
self.header2 = Header(jwk='bar')
|
||||
self.crit = Header(crit=('a', 'b'))
|
||||
self.empty = Header()
|
||||
|
||||
def test_add_non_empty(self):
|
||||
from letsencrypt.acme.jose.jws import Header
|
||||
self.assertEqual(Header(jwk='foo', crit=('a', 'b')),
|
||||
self.header1 + self.crit)
|
||||
|
||||
def test_add_empty(self):
|
||||
self.assertEqual(self.header1, self.header1 + self.empty)
|
||||
self.assertEqual(self.header1, self.empty + self.header1)
|
||||
|
||||
def test_add_overlapping_error(self):
|
||||
self.assertRaises(TypeError, self.header1.__add__, self.header2)
|
||||
|
||||
def test_add_wrong_type_error(self):
|
||||
self.assertRaises(TypeError, self.header1.__add__, 'xxx')
|
||||
|
||||
def test_crit_decode_always_errors(self):
|
||||
from letsencrypt.acme.jose.jws import Header
|
||||
self.assertRaises(errors.DeserializationError, Header.from_json,
|
||||
{'crit': ['a', 'b']})
|
||||
|
||||
def test_x5c_decoding(self):
|
||||
from letsencrypt.acme.jose.jws import Header
|
||||
header = Header(x5c=(CERT, CERT))
|
||||
jobj = header.to_json()
|
||||
cert_b64 = base64.b64encode(CERT.as_der())
|
||||
self.assertEqual(jobj, {'x5c': [cert_b64, cert_b64]})
|
||||
self.assertEqual(header, Header.from_json(jobj))
|
||||
jobj['x5c'][0] = base64.b64encode('xxx' + CERT.as_der())
|
||||
self.assertRaises(errors.DeserializationError, Header.from_json, jobj)
|
||||
|
||||
def test_find_key(self):
|
||||
self.assertEqual('foo', self.header1.find_key())
|
||||
self.assertEqual('bar', self.header2.find_key())
|
||||
self.assertRaises(errors.Error, self.crit.find_key)
|
||||
|
||||
|
||||
class SignatureTest(unittest.TestCase):
|
||||
"""Tests for letsencrypt.acme.jose.jws.Signature."""
|
||||
|
||||
def test_from_json(self):
|
||||
from letsencrypt.acme.jose.jws import Header
|
||||
from letsencrypt.acme.jose.jws import Signature
|
||||
self.assertEqual(
|
||||
Signature(signature='foo', header=Header(alg=jwa.RS256)),
|
||||
Signature.from_json(
|
||||
{'signature': 'Zm9v', 'header': {'alg': 'RS256'}}))
|
||||
|
||||
def test_from_json_no_alg_error(self):
|
||||
from letsencrypt.acme.jose.jws import Signature
|
||||
self.assertRaises(errors.DeserializationError,
|
||||
Signature.from_json, {'signature': 'foo'})
|
||||
|
||||
|
||||
class JWSTest(unittest.TestCase):
|
||||
"""Tests for letsencrypt.acme.jose.jws.JWS."""
|
||||
|
||||
def setUp(self):
|
||||
self.privkey = jwk.JWKRSA(key=RSA512_KEY)
|
||||
self.pubkey = self.privkey.public()
|
||||
|
||||
from letsencrypt.acme.jose.jws import JWS
|
||||
self.unprotected = JWS.sign(
|
||||
payload='foo', key=self.privkey, alg=jwa.RS256)
|
||||
self.protected = JWS.sign(
|
||||
payload='foo', key=self.privkey, alg=jwa.RS256,
|
||||
protect=frozenset(['jwk', 'alg']))
|
||||
self.mixed = JWS.sign(
|
||||
payload='foo', key=self.privkey, alg=jwa.RS256,
|
||||
protect=frozenset(['alg']))
|
||||
|
||||
def test_pubkey_jwk(self):
|
||||
self.assertEqual(self.unprotected.signature.combined.jwk, self.pubkey)
|
||||
self.assertEqual(self.protected.signature.combined.jwk, self.pubkey)
|
||||
self.assertEqual(self.mixed.signature.combined.jwk, self.pubkey)
|
||||
|
||||
def test_sign_unprotected(self):
|
||||
self.assertTrue(self.unprotected.verify())
|
||||
|
||||
def test_sign_protected(self):
|
||||
self.assertTrue(self.protected.verify())
|
||||
|
||||
def test_sign_mixed(self):
|
||||
self.assertTrue(self.mixed.verify())
|
||||
|
||||
def test_compact_lost_unprotected(self):
|
||||
compact = self.mixed.to_compact()
|
||||
self.assertEqual(
|
||||
'eyJhbGciOiAiUlMyNTYifQ.Zm9v.KBvYScRMEqJlp2xsReoY3CNDpVCWEU'
|
||||
'1PyRrf44nPBsmyQz__iuNR56pPNcACeHzJQnXhTVTxqFgjge2i_vw9NA',
|
||||
compact)
|
||||
|
||||
from letsencrypt.acme.jose.jws import JWS
|
||||
mixed = JWS.from_compact(compact)
|
||||
|
||||
self.assertNotEqual(self.mixed, mixed)
|
||||
self.assertEqual(
|
||||
set(['alg']), set(mixed.signature.combined.not_omitted()))
|
||||
|
||||
def test_from_compact_missing_components(self):
|
||||
from letsencrypt.acme.jose.jws import JWS
|
||||
self.assertRaises(errors.DeserializationError, JWS.from_compact, '.')
|
||||
|
||||
def test_json_omitempty(self):
|
||||
protected_jobj = self.protected.to_json(flat=True)
|
||||
unprotected_jobj = self.unprotected.to_json(flat=True)
|
||||
|
||||
self.assertTrue('protected' not in unprotected_jobj)
|
||||
self.assertTrue('header' not in protected_jobj)
|
||||
|
||||
unprotected_jobj['header'] = unprotected_jobj[
|
||||
'header'].fully_serialize()
|
||||
|
||||
from letsencrypt.acme.jose.jws import JWS
|
||||
self.assertEqual(JWS.from_json(protected_jobj), self.protected)
|
||||
self.assertEqual(JWS.from_json(unprotected_jobj), self.unprotected)
|
||||
|
||||
def test_json_flat(self):
|
||||
jobj_to = {
|
||||
'signature': b64.b64encode(self.mixed.signature.signature),
|
||||
'payload': b64.b64encode('foo'),
|
||||
'header': self.mixed.signature.header,
|
||||
'protected': b64.b64encode(self.mixed.signature.protected),
|
||||
}
|
||||
jobj_from = jobj_to.copy()
|
||||
jobj_from['header'] = jobj_from['header'].fully_serialize()
|
||||
|
||||
self.assertEqual(self.mixed.to_json(flat=True), jobj_to)
|
||||
from letsencrypt.acme.jose.jws import JWS
|
||||
self.assertEqual(self.mixed, JWS.from_json(jobj_from))
|
||||
|
||||
def test_json_not_flat(self):
|
||||
jobj_to = {
|
||||
'signatures': (self.mixed.signature,),
|
||||
'payload': b64.b64encode('foo'),
|
||||
}
|
||||
jobj_from = jobj_to.copy()
|
||||
jobj_from['signatures'] = [jobj_to['signatures'][0].fully_serialize()]
|
||||
|
||||
self.assertEqual(self.mixed.to_json(flat=False), jobj_to)
|
||||
from letsencrypt.acme.jose.jws import JWS
|
||||
self.assertEqual(self.mixed, JWS.from_json(jobj_from))
|
||||
|
||||
def test_from_json_mixed_flat(self):
|
||||
from letsencrypt.acme.jose.jws import JWS
|
||||
self.assertRaises(errors.DeserializationError, JWS.from_json,
|
||||
{'signatures': (), 'signature': 'foo'})
|
||||
|
||||
|
||||
class CLITest(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.key_path = pkg_resources.resource_filename(
|
||||
__name__, os.path.join('testdata', 'rsa512_key.pem'))
|
||||
|
||||
def test_unverified(self):
|
||||
from letsencrypt.acme.jose.jws import CLI
|
||||
with mock.patch('sys.stdin') as sin, mock.patch('sys.stdout'):
|
||||
sin.read.return_value = '{"payload": "foo", "signature": "xxx"}'
|
||||
self.assertEqual(-1, CLI.run(['verify']))
|
||||
|
||||
def test_json(self):
|
||||
from letsencrypt.acme.jose.jws import CLI
|
||||
|
||||
with mock.patch('sys.stdin') as sin, mock.patch('sys.stdout') as sout:
|
||||
sin.read.return_value = 'foo'
|
||||
CLI.run(['sign', '-k', self.key_path, '-a', 'RS256',
|
||||
'-p', 'jwk'])
|
||||
sin.read.return_value = sout.write.mock_calls[0][1][0]
|
||||
self.assertEqual(0, CLI.run(['verify']))
|
||||
|
||||
def test_compact(self):
|
||||
from letsencrypt.acme.jose.jws import CLI
|
||||
|
||||
with mock.patch('sys.stdin') as sin, mock.patch('sys.stdout') as sout:
|
||||
sin.read.return_value = 'foo'
|
||||
CLI.run(['--compact', 'sign', '-k', self.key_path])
|
||||
sin.read.return_value = sout.write.mock_calls[0][1][0]
|
||||
self.assertEqual(0, CLI.run([
|
||||
'--compact', 'verify', '--kty', 'RSA', '-k', self.key_path]))
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
1
setup.py
1
setup.py
|
|
@ -107,6 +107,7 @@ setup(
|
|||
entry_points={
|
||||
'console_scripts': [
|
||||
'letsencrypt = letsencrypt.scripts.main:main',
|
||||
'jws = letsencrypt.acme.jose.jws:CLI.run',
|
||||
],
|
||||
},
|
||||
|
||||
|
|
|
|||
Loading…
Reference in a new issue