Merge remote-tracking branch 'upstream/master' into plugin_tests

This commit is contained in:
Brad Warren 2015-07-13 12:01:14 -07:00
commit 44f69c525d
176 changed files with 1112 additions and 798 deletions

View file

@ -8,4 +8,5 @@
.git
.tox
venv
venv3
docs

2
.gitignore vendored
View file

@ -4,7 +4,7 @@
build/
dist/
/venv/
/.tox/
/venv3/
letsencrypt.log
# coverage

View file

@ -240,7 +240,9 @@ ignore-mixin-members=yes
# List of module names for which member attributes should not be checked
# (useful for modules/projects where namespaces are manipulated during runtime
# and thus existing member attributes cannot be deduced by static analysis
ignored-modules=pkg_resources,confargparse,argparse
ignored-modules=pkg_resources,confargparse,argparse,six.moves,six.moves.urllib
# import errors ignored only in 1.4.4
# https://bitbucket.org/logilab/pylint/commits/cd000904c9e2
# List of classes names for which member attributes should not be checked
# (useful for classes with attributes dynamically set).

View file

@ -12,15 +12,16 @@ env:
global:
- GOPATH=/tmp/go
matrix:
- TOXENV=py26
- TOXENV=py27
- TOXENV=py26 BOULDER_INTEGRATION=1
- TOXENV=py27 BOULDER_INTEGRATION=1
- TOXENV=py33
- TOXENV=py34
- TOXENV=lint
- TOXENV=cover
install: "travis_retry pip install tox coveralls"
before_script: '[ "${TOXENV:0:2}" != "py" ] || ./tests/boulder-start.sh'
# TODO: eliminate substring slice bashism
script: 'travis_retry tox && ([ "${TOXENV:0:2}" != "py" ] || (source .tox/$TOXENV/bin/activate && ./tests/boulder-integration.sh))'
before_script: '[ "xxx$BOULDER_INTEGRATION" = "xxx" ] || ./tests/boulder-start.sh'
script: 'travis_retry tox && ([ "xxx$BOULDER_INTEGRATION" = "xxx" ] || (source .tox/$TOXENV/bin/activate && ./tests/boulder-integration.sh))'
after_success: '[ "$TOXENV" == "cover" ] && coveralls'

View file

@ -44,13 +44,17 @@ COPY setup.py README.rst CHANGES.rst MANIFEST.in /opt/letsencrypt/src/
COPY letsencrypt /opt/letsencrypt/src/letsencrypt/
COPY acme /opt/letsencrypt/src/acme/
COPY letsencrypt_apache /opt/letsencrypt/src/letsencrypt_apache/
COPY letsencrypt_nginx /opt/letsencrypt/src/letsencrypt_nginx/
COPY letsencrypt-apache /opt/letsencrypt/src/letsencrypt-apache/
COPY letsencrypt-nginx /opt/letsencrypt/src/letsencrypt-nginx/
# requirements.txt not installed!
RUN virtualenv --no-site-packages -p python2 /opt/letsencrypt/venv && \
/opt/letsencrypt/venv/bin/pip install -e /opt/letsencrypt/src
/opt/letsencrypt/venv/bin/pip install \
-e /opt/letsencrypt/src/acme \
-e /opt/letsencrypt/src \
-e /opt/letsencrypt/src/letsencrypt-apache \
-e /opt/letsencrypt/src/letsencrypt-nginx
# install in editable mode (-e) to save space: it's not possible to
# "rm -rf /opt/letsencrypt/src" (it's stays in the underlaying image);

View file

@ -1,15 +1,7 @@
include requirements.txt
include README.rst
include CHANGES.rst
include CONTRIBUTING.md
include linter_plugin.py
include letsencrypt/EULA
recursive-include letsencrypt/tests/testdata *
recursive-include acme/schemata *.json
recursive-include acme/jose/testdata *
recursive-include letsencrypt_apache/tests/testdata *
include letsencrypt_apache/options-ssl-apache.conf
recursive-include letsencrypt_nginx/tests/testdata *
include letsencrypt_nginx/options-ssl-nginx.conf

2
Vagrantfile vendored
View file

@ -10,7 +10,7 @@ cd /vagrant
sudo ./bootstrap/ubuntu.sh
if [ ! -d "venv" ]; then
virtualenv --no-site-packages -p python2 venv
./venv/bin/pip install -r requirements.txt -e .[dev,docs,testing]
./venv/bin/pip install -r requirements.txt -e acme -e .[dev,docs,testing] -e letsencrypt-apache -e letsencrypt-nginx
fi
SETUP_SCRIPT

1
acme/MANIFEST.in Normal file
View file

@ -0,0 +1 @@
recursive-include acme/testdata *

View file

@ -35,12 +35,7 @@ class DVChallenge(Challenge): # pylint: disable=abstract-method
class ChallengeResponse(interfaces.ClientRequestableResource,
jose.TypedJSONObjectWithFields):
# _fields_to_partial_json | pylint: disable=abstract-method
"""ACME challenge response.
:ivar str mitm_resource: ACME resource identifier used in client
HTTPS requests in order to protect against MITM.
"""
"""ACME challenge response."""
TYPES = {}
resource_type = 'challenge'
@ -56,14 +51,23 @@ class ChallengeResponse(interfaces.ClientRequestableResource,
@Challenge.register
class SimpleHTTP(DVChallenge):
"""ACME "simpleHttp" challenge."""
"""ACME "simpleHttp" challenge.
:ivar unicode token:
"""
typ = "simpleHttp"
token = jose.Field("token")
@ChallengeResponse.register
class SimpleHTTPResponse(ChallengeResponse):
"""ACME "simpleHttp" challenge response."""
"""ACME "simpleHttp" challenge response.
:ivar unicode path:
:ivar unicode tls:
"""
typ = "simpleHttp"
path = jose.Field("path")
tls = jose.Field("tls", default=True, omitempty=True)
@ -107,7 +111,7 @@ class SimpleHTTPResponse(ChallengeResponse):
Forms an URI to the HTTPS server provisioned resource
(containing :attr:`~SimpleHTTP.token`).
:param str domain: Domain name being verified.
:param unicode domain: Domain name being verified.
"""
return self._URI_TEMPLATE.format(
@ -121,7 +125,7 @@ class SimpleHTTPResponse(ChallengeResponse):
``requests.get`` is called with ``verify=False``.
:param .SimpleHTTP chall: Corresponding challenge.
:param str domain: Domain name being verified.
:param unicode domain: Domain name being verified.
:param int port: Port used in the validation.
:returns: ``True`` iff validation is successful, ``False``
@ -163,13 +167,13 @@ class SimpleHTTPResponse(ChallengeResponse):
class DVSNI(DVChallenge):
"""ACME "dvsni" challenge.
:ivar str r: Random data, **not** base64-encoded.
:ivar str nonce: Random data, **not** hex-encoded.
:ivar bytes r: Random data, **not** base64-encoded.
:ivar bytes nonce: Random data, **not** hex-encoded.
"""
typ = "dvsni"
DOMAIN_SUFFIX = ".acme.invalid"
DOMAIN_SUFFIX = b".acme.invalid"
"""Domain name suffix."""
R_SIZE = 32
@ -181,15 +185,19 @@ class DVSNI(DVChallenge):
PORT = 443
"""Port to perform DVSNI challenge."""
r = jose.Field("r", encoder=jose.b64encode, # pylint: disable=invalid-name
r = jose.Field("r", encoder=jose.encode_b64jose, # pylint: disable=invalid-name
decoder=functools.partial(jose.decode_b64jose, size=R_SIZE))
nonce = jose.Field("nonce", encoder=binascii.hexlify,
nonce = jose.Field("nonce", encoder=jose.encode_hex16,
decoder=functools.partial(functools.partial(
jose.decode_hex16, size=NONCE_SIZE)))
@property
def nonce_domain(self):
"""Domain name used in SNI."""
"""Domain name used in SNI.
:rtype: bytes
"""
return binascii.hexlify(self.nonce) + self.DOMAIN_SUFFIX
@ -197,7 +205,7 @@ class DVSNI(DVChallenge):
class DVSNIResponse(ChallengeResponse):
"""ACME "dvsni" challenge response.
:param str s: Random data, **not** base64-encoded.
:param bytes s: Random data, **not** base64-encoded.
"""
typ = "dvsni"
@ -208,7 +216,7 @@ class DVSNIResponse(ChallengeResponse):
S_SIZE = 32
"""Required size of the :attr:`s` in bytes."""
s = jose.Field("s", encoder=jose.b64encode, # pylint: disable=invalid-name
s = jose.Field("s", encoder=jose.encode_b64jose, # pylint: disable=invalid-name
decoder=functools.partial(jose.decode_b64jose, size=S_SIZE))
def __init__(self, s=None, *args, **kwargs):
@ -221,11 +229,13 @@ class DVSNIResponse(ChallengeResponse):
:param challenge: Corresponding challenge.
:type challenge: :class:`DVSNI`
:rtype: bytes
"""
z = hashlib.new("sha256") # pylint: disable=invalid-name
z.update(chall.r)
z.update(self.s)
return z.hexdigest()
return z.hexdigest().encode()
def z_domain(self, chall):
"""Domain name for certificate subjectAltName."""
@ -233,7 +243,13 @@ class DVSNIResponse(ChallengeResponse):
@Challenge.register
class RecoveryContact(ContinuityChallenge):
"""ACME "recoveryContact" challenge."""
"""ACME "recoveryContact" challenge.
:ivar unicode activation_url:
:ivar unicode success_url:
:ivar unicode contact:
"""
typ = "recoveryContact"
activation_url = jose.Field("activationURL", omitempty=True)
@ -243,7 +259,11 @@ class RecoveryContact(ContinuityChallenge):
@ChallengeResponse.register
class RecoveryContactResponse(ChallengeResponse):
"""ACME "recoveryContact" challenge response."""
"""ACME "recoveryContact" challenge response.
:ivar unicode token:
"""
typ = "recoveryContact"
token = jose.Field("token", omitempty=True)
@ -256,7 +276,11 @@ class RecoveryToken(ContinuityChallenge):
@ChallengeResponse.register
class RecoveryTokenResponse(ChallengeResponse):
"""ACME "recoveryToken" challenge response."""
"""ACME "recoveryToken" challenge response.
:ivar unicode token:
"""
typ = "recoveryToken"
token = jose.Field("token", omitempty=True)
@ -265,7 +289,8 @@ class RecoveryTokenResponse(ChallengeResponse):
class ProofOfPossession(ContinuityChallenge):
"""ACME "proofOfPossession" challenge.
:ivar str nonce: Random data, **not** base64-encoded.
:ivar .JWAAlgorithm alg:
:ivar bytes nonce: Random data, **not** base64-encoded.
:ivar hints: Various clues for the client (:class:`Hints`).
"""
@ -277,8 +302,12 @@ class ProofOfPossession(ContinuityChallenge):
"""Hints for "proofOfPossession" challenge.
:ivar jwk: JSON Web Key (:class:`acme.jose.JWK`)
:ivar list certs: List of :class:`acme.jose.ComparableX509`
:ivar tuple cert_fingerprints: `tuple` of `unicode`
:ivar tuple certs: Sequence of :class:`acme.jose.ComparableX509`
certificates.
:ivar tuple subject_key_identifiers: `tuple` of `unicode`
:ivar tuple issuers: `tuple` of `unicode`
:ivar tuple authorized_for: `tuple` of `unicode`
"""
jwk = jose.Field("jwk", decoder=jose.JWK.from_json)
@ -301,7 +330,7 @@ class ProofOfPossession(ContinuityChallenge):
alg = jose.Field("alg", decoder=jose.JWASignature.from_json)
nonce = jose.Field(
"nonce", encoder=jose.b64encode, decoder=functools.partial(
"nonce", encoder=jose.encode_b64jose, decoder=functools.partial(
jose.decode_b64jose, size=NONCE_SIZE))
hints = jose.Field("hints", decoder=Hints.from_json)
@ -310,8 +339,8 @@ class ProofOfPossession(ContinuityChallenge):
class ProofOfPossessionResponse(ChallengeResponse):
"""ACME "proofOfPossession" challenge response.
:ivar str nonce: Random data, **not** base64-encoded.
:ivar signature: :class:`~acme.other.Signature` of this message.
:ivar bytes nonce: Random data, **not** base64-encoded.
:ivar acme.other.Signature signature: Sugnature of this message.
"""
typ = "proofOfPossession"
@ -319,7 +348,7 @@ class ProofOfPossessionResponse(ChallengeResponse):
NONCE_SIZE = ProofOfPossession.NONCE_SIZE
nonce = jose.Field(
"nonce", encoder=jose.b64encode, decoder=functools.partial(
"nonce", encoder=jose.encode_b64jose, decoder=functools.partial(
jose.decode_b64jose, size=NONCE_SIZE))
signature = jose.Field("signature", decoder=other.Signature.from_json)
@ -331,7 +360,11 @@ class ProofOfPossessionResponse(ChallengeResponse):
@Challenge.register
class DNS(DVChallenge):
"""ACME "dns" challenge."""
"""ACME "dns" challenge.
:ivar unicode token:
"""
typ = "dns"
token = jose.Field("token")

View file

@ -1,26 +1,19 @@
"""Tests for acme.challenges."""
import os
import pkg_resources
import unittest
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
import mock
import OpenSSL
import requests
import urlparse
from six.moves.urllib import parse as urllib_parse # pylint: disable=import-error
from acme import jose
from acme import other
from acme import test_util
CERT = jose.ComparableX509(OpenSSL.crypto.load_certificate(
OpenSSL.crypto.FILETYPE_PEM, pkg_resources.resource_string(
'letsencrypt.tests', os.path.join('testdata', 'cert.pem'))))
KEY = serialization.load_pem_private_key(
pkg_resources.resource_string(
'acme.jose', os.path.join('testdata', 'rsa512_key.pem')),
password=None, backend=default_backend())
CERT = test_util.load_cert('cert.pem')
KEY = test_util.load_rsa_private_key('rsa512_key.pem')
class ChallengeResponseTest(unittest.TestCase):
@ -144,7 +137,7 @@ class SimpleHTTPResponseTest(unittest.TestCase):
@mock.patch("acme.challenges.requests.get")
def test_simple_verify_port(self, mock_get):
self.resp_http.simple_verify(self.chall, "local", 4430)
self.assertEqual("local:4430", urlparse.urlparse(
self.assertEqual("local:4430", urllib_parse.urlparse(
mock_get.mock_calls[0][1][0]).netloc)
@ -153,9 +146,9 @@ class DVSNITest(unittest.TestCase):
def setUp(self):
from acme.challenges import DVSNI
self.msg = DVSNI(
r="O*\xb4-\xad\xec\x95>\xed\xa9\r0\x94\xe8\x97\x9c&6"
"\xbf'\xb3\xed\x9a9nX\x0f'\\m\xe7\x12",
nonce='\xa8-_\xf8\xeft\r\x12\x88\x1fm<"w\xab.')
r=b"O*\xb4-\xad\xec\x95>\xed\xa9\r0\x94\xe8\x97\x9c&6"
b"\xbf'\xb3\xed\x9a9nX\x0f'\\m\xe7\x12",
nonce=b'\xa8-_\xf8\xeft\r\x12\x88\x1fm<"w\xab.')
self.jmsg = {
'type': 'dvsni',
'r': 'Tyq0La3slT7tqQ0wlOiXnCY2vyez7Zo5blgPJ1xt5xI',
@ -163,7 +156,7 @@ class DVSNITest(unittest.TestCase):
}
def test_nonce_domain(self):
self.assertEqual('a82d5ff8ef740d12881f6d3c2277ab2e.acme.invalid',
self.assertEqual(b'a82d5ff8ef740d12881f6d3c2277ab2e.acme.invalid',
self.msg.nonce_domain)
def test_to_partial_json(self):
@ -195,8 +188,8 @@ class DVSNIResponseTest(unittest.TestCase):
def setUp(self):
from acme.challenges import DVSNIResponse
self.msg = DVSNIResponse(
s='\xf5\xd6\xe3\xb2]\xe0L\x0bN\x9cKJ\x14I\xa1K\xa3#\xf9\xa8'
'\xcd\x8c7\x0e\x99\x19)\xdc\xb7\xf3\x9bw')
s=b'\xf5\xd6\xe3\xb2]\xe0L\x0bN\x9cKJ\x14I\xa1K\xa3#\xf9\xa8'
b'\xcd\x8c7\x0e\x99\x19)\xdc\xb7\xf3\x9bw')
self.jmsg = {
'type': 'dvsni',
's': '9dbjsl3gTAtOnEtKFEmhS6Mj-ajNjDcOmRkp3Lfzm3c',
@ -205,15 +198,14 @@ class DVSNIResponseTest(unittest.TestCase):
def test_z_and_domain(self):
from acme.challenges import DVSNI
challenge = DVSNI(
r="O*\xb4-\xad\xec\x95>\xed\xa9\r0\x94\xe8\x97\x9c&6"
"\xbf'\xb3\xed\x9a9nX\x0f'\\m\xe7\x12",
nonce=long('439736375371401115242521957580409149254868992063'
'44333654741504362774620418661L'))
r=b"O*\xb4-\xad\xec\x95>\xed\xa9\r0\x94\xe8\x97\x9c&6"
b"\xbf'\xb3\xed\x9a9nX\x0f'\\m\xe7\x12",
nonce=int('439736375371401115242521957580409149254868992063'
'44333654741504362774620418661'))
# pylint: disable=invalid-name
z = '38e612b0397cc2624a07d351d7ef50e46134c0213d9ed52f7d7c611acaeed41b'
z = b'38e612b0397cc2624a07d351d7ef50e46134c0213d9ed52f7d7c611acaeed41b'
self.assertEqual(z, self.msg.z(challenge))
self.assertEqual(
'{0}.acme.invalid'.format(z), self.msg.z_domain(challenge))
self.assertEqual(z + b'.acme.invalid', self.msg.z_domain(challenge))
def test_to_partial_json(self):
self.assertEqual(self.jmsg, self.msg.to_partial_json())
@ -370,7 +362,7 @@ class ProofOfPossessionHintsTest(unittest.TestCase):
self.jmsg_to = {
'jwk': jwk,
'certFingerprints': cert_fingerprints,
'certs': (jose.b64encode(OpenSSL.crypto.dump_certificate(
'certs': (jose.encode_b64jose(OpenSSL.crypto.dump_certificate(
OpenSSL.crypto.FILETYPE_ASN1, CERT)),),
'subjectKeyIdentifiers': subject_key_identifiers,
'serialNumbers': serial_numbers,
@ -421,7 +413,7 @@ class ProofOfPossessionTest(unittest.TestCase):
issuers=(), authorized_for=())
self.msg = ProofOfPossession(
alg=jose.RS256, hints=hints,
nonce='xD\xf9\xb9\xdbU\xed\xaa\x17\xf1y|\x81\x88\x99 ')
nonce=b'xD\xf9\xb9\xdbU\xed\xaa\x17\xf1y|\x81\x88\x99 ')
self.jmsg_to = {
'type': 'proofOfPossession',
@ -457,16 +449,16 @@ class ProofOfPossessionResponseTest(unittest.TestCase):
# mistake here...
signature = other.Signature(
alg=jose.RS256, jwk=jose.JWKRSA(key=KEY.public_key()),
sig='\xa7\xc1\xe7\xe82o\xbc\xcd\xd0\x1e\x010#Z|\xaf\x15\x83'
'\x94\x8f#\x9b\nQo(\x80\x15,\x08\xfcz\x1d\xfd\xfd.\xaap'
'\xfa\x06\xd1\xa2f\x8d8X2>%d\xbd%\xe1T\xdd\xaa0\x18\xde'
'\x99\x08\xf0\x0e{',
nonce='\x99\xc7Q\xb3f2\xbc\xdci\xfe\xd6\x98k\xc67\xdf',
sig=b'\xa7\xc1\xe7\xe82o\xbc\xcd\xd0\x1e\x010#Z|\xaf\x15\x83'
b'\x94\x8f#\x9b\nQo(\x80\x15,\x08\xfcz\x1d\xfd\xfd.\xaap'
b'\xfa\x06\xd1\xa2f\x8d8X2>%d\xbd%\xe1T\xdd\xaa0\x18\xde'
b'\x99\x08\xf0\x0e{',
nonce=b'\x99\xc7Q\xb3f2\xbc\xdci\xfe\xd6\x98k\xc67\xdf',
)
from acme.challenges import ProofOfPossessionResponse
self.msg = ProofOfPossessionResponse(
nonce='xD\xf9\xb9\xdbU\xed\xaa\x17\xf1y|\x81\x88\x99 ',
nonce=b'xD\xf9\xb9\xdbU\xed\xaa\x17\xf1y|\x81\x88\x99 ',
signature=signature)
self.jmsg_to = {

View file

@ -1,13 +1,15 @@
"""ACME client API."""
import datetime
import heapq
import httplib
import json
import logging
import time
from six.moves import http_client # pylint: disable=import-error
import OpenSSL
import requests
import six
import werkzeug
from acme import errors
@ -19,7 +21,8 @@ from acme import messages
logger = logging.getLogger(__name__)
# https://urllib3.readthedocs.org/en/latest/security.html#insecureplatformwarning
requests.packages.urllib3.contrib.pyopenssl.inject_into_urllib3()
if six.PY2:
requests.packages.urllib3.contrib.pyopenssl.inject_into_urllib3()
class Client(object): # pylint: disable=too-many-instance-attributes
@ -80,7 +83,8 @@ class Client(object): # pylint: disable=too-many-instance-attributes
new_reg = messages.Registration() if new_reg is None else new_reg
response = self.net.post(self.new_reg_uri, new_reg)
assert response.status_code == httplib.CREATED # TODO: handle errors
# TODO: handle errors
assert response.status_code == http_client.CREATED
# "Instance of 'Field' has no key/contact member" bug:
# pylint: disable=no-member
@ -162,7 +166,8 @@ class Client(object): # pylint: disable=too-many-instance-attributes
"""
new_authz = messages.Authorization(identifier=identifier)
response = self.net.post(new_authzr_uri, new_authz)
assert response.status_code == httplib.CREATED # TODO: handle errors
# TODO: handle errors
assert response.status_code == http_client.CREATED
return self._authzr_from_response(response, identifier)
def request_domain_challenges(self, domain, new_authz_uri):
@ -424,7 +429,7 @@ class Client(object): # pylint: disable=too-many-instance-attributes
"""
response = self.net.post(messages.Revocation.url(self.new_reg_uri),
messages.Revocation(certificate=cert))
if response.status_code != httplib.OK:
if response.status_code != http_client.OK:
raise errors.ClientError(
'Successful revocation must return HTTP OK status')
@ -447,12 +452,13 @@ class ClientNetwork(object):
.. todo:: Implement ``acmePath``.
:param .ClientRequestableResource obj:
:param bytes nonce:
:rtype: `.JWS`
"""
jobj = obj.to_json()
jobj['resource'] = obj.resource_type
dumps = json.dumps(jobj)
dumps = json.dumps(jobj).encode()
logger.debug('Serialized JSON: %s', dumps)
return jws.JWS.sign(
payload=dumps, key=self.key, alg=self.alg, nonce=nonce).json_dumps()
@ -555,12 +561,12 @@ class ClientNetwork(object):
def _add_nonce(self, response):
if self.REPLAY_NONCE_HEADER in response.headers:
nonce = response.headers[self.REPLAY_NONCE_HEADER]
error = jws.Header.validate_nonce(nonce)
if error is None:
logger.debug('Storing nonce: %r', nonce)
self._nonces.add(nonce)
else:
try:
decoded_nonce = jws.Header._fields['nonce'].decode(nonce)
except jose.DeserializationError as error:
raise errors.BadNonce(nonce, error)
logger.debug('Storing nonce: %r', decoded_nonce)
self._nonces.add(decoded_nonce)
else:
raise errors.MissingNonce(response)

View file

@ -1,11 +1,10 @@
"""Tests for acme.client."""
import datetime
import httplib
import json
import os
import pkg_resources
import unittest
from six.moves import http_client # pylint: disable=import-error
import mock
import requests
@ -15,14 +14,12 @@ from acme import jose
from acme import jws as acme_jws
from acme import messages
from acme import messages_test
from acme import test_util
CERT_DER = pkg_resources.resource_string(
'acme.jose', os.path.join('testdata', 'cert.der'))
KEY = jose.JWKRSA.load(pkg_resources.resource_string(
'acme.jose', os.path.join('testdata', 'rsa512_key.pem')))
KEY2 = jose.JWKRSA.load(pkg_resources.resource_string(
'acme.jose', os.path.join('testdata', 'rsa256_key.pem')))
CERT_DER = test_util.load_vector('cert.der')
KEY = jose.JWKRSA.load(test_util.load_vector('rsa512_key.pem'))
KEY2 = jose.JWKRSA.load(test_util.load_vector('rsa256_key.pem'))
class ClientTest(unittest.TestCase):
@ -31,7 +28,7 @@ class ClientTest(unittest.TestCase):
def setUp(self):
self.response = mock.MagicMock(
ok=True, status_code=httplib.OK, headers={}, links={})
ok=True, status_code=http_client.OK, headers={}, links={})
self.net = mock.MagicMock()
self.net.post.return_value = self.response
self.net.get.return_value = self.response
@ -77,7 +74,7 @@ class ClientTest(unittest.TestCase):
def test_register(self):
# "Instance of 'Field' has no to_json/update member" bug:
# pylint: disable=no-member
self.response.status_code = httplib.CREATED
self.response.status_code = http_client.CREATED
self.response.json.return_value = self.regr.body.to_json()
self.response.headers['Location'] = self.regr.uri
self.response.links.update({
@ -95,7 +92,7 @@ class ClientTest(unittest.TestCase):
errors.UnexpectedUpdate, self.client.register, self.regr.body)
def test_register_missing_next(self):
self.response.status_code = httplib.CREATED
self.response.status_code = http_client.CREATED
self.assertRaises(
errors.ClientError, self.client.register, self.regr.body)
@ -119,7 +116,7 @@ class ClientTest(unittest.TestCase):
self.assertEqual(self.regr.terms_of_service, regr.body.agreement)
def test_request_challenges(self):
self.response.status_code = httplib.CREATED
self.response.status_code = http_client.CREATED
self.response.headers['Location'] = self.authzr.uri
self.response.json.return_value = self.authz.to_json()
self.response.links = {
@ -137,7 +134,7 @@ class ClientTest(unittest.TestCase):
self.identifier, self.authzr.uri)
def test_request_challenges_missing_next(self):
self.response.status_code = httplib.CREATED
self.response.status_code = http_client.CREATED
self.assertRaises(
errors.ClientError, self.client.request_challenges,
self.identifier, self.regr)
@ -349,7 +346,7 @@ class ClientTest(unittest.TestCase):
self.client.new_reg_uri), mock.ANY)
def test_revoke_bad_status_raises_error(self):
self.response.status_code = httplib.METHOD_NOT_ALLOWED
self.response.status_code = http_client.METHOD_NOT_ALLOWED
self.assertRaises(errors.ClientError, self.client.revoke, self.certr)
@ -364,7 +361,7 @@ class ClientNetworkTest(unittest.TestCase):
self.net = ClientNetwork(
key=KEY, alg=jose.RS256, verify_ssl=self.verify_ssl)
self.response = mock.MagicMock(ok=True, status_code=httplib.OK)
self.response = mock.MagicMock(ok=True, status_code=http_client.OK)
self.response.headers = {}
self.response.links = {}
@ -384,12 +381,11 @@ class ClientNetworkTest(unittest.TestCase):
pass # pragma: no cover
# pylint: disable=protected-access
jws_dump = self.net._wrap_in_jws(
MockClientRequestableResource('foo'), nonce='Tg')
MockClientRequestableResource('foo'), nonce=b'Tg')
jws = acme_jws.JWS.json_loads(jws_dump)
self.assertEqual(json.loads(jws.payload),
self.assertEqual(json.loads(jws.payload.decode()),
{'foo': 'foo', 'resource': 'mock'})
self.assertEqual(jws.signature.combined.nonce, 'Tg')
# TODO: check that nonce is in protected header
self.assertEqual(jws.signature.combined.nonce, b'Tg')
def test_check_response_not_ok_jobj_no_error(self):
self.response.ok = False
@ -477,7 +473,7 @@ class ClientNetworkWithMockedResponseTest(unittest.TestCase):
from acme.client import ClientNetwork
self.net = ClientNetwork(key=None, alg=None)
self.response = mock.MagicMock(ok=True, status_code=httplib.OK)
self.response = mock.MagicMock(ok=True, status_code=http_client.OK)
self.response.headers = {}
self.response.links = {}
self.checked_response = mock.MagicMock()
@ -485,13 +481,14 @@ class ClientNetworkWithMockedResponseTest(unittest.TestCase):
self.wrapped_obj = mock.MagicMock()
self.content_type = mock.sentinel.content_type
self.all_nonces = [jose.b64encode('Nonce'), jose.b64encode('Nonce2')]
self.all_nonces = [jose.b64encode(b'Nonce'), jose.b64encode(b'Nonce2')]
self.available_nonces = self.all_nonces[:]
def send_request(*args, **kwargs):
# pylint: disable=unused-argument,missing-docstring
if self.available_nonces:
self.response.headers = {
self.net.REPLAY_NONCE_HEADER: self.available_nonces.pop()}
self.net.REPLAY_NONCE_HEADER:
self.available_nonces.pop().decode()}
else:
self.response.headers = {}
return self.response
@ -523,21 +520,21 @@ class ClientNetworkWithMockedResponseTest(unittest.TestCase):
self.assertEqual(self.checked_response, self.net.post(
'uri', self.obj, content_type=self.content_type))
self.net._wrap_in_jws.assert_called_once_with(
self.obj, self.all_nonces.pop())
self.obj, jose.b64decode(self.all_nonces.pop()))
assert not self.available_nonces
self.assertRaises(errors.MissingNonce, self.net.post,
'uri', self.obj, content_type=self.content_type)
self.net._wrap_in_jws.assert_called_with(
self.obj, self.all_nonces.pop())
self.obj, jose.b64decode(self.all_nonces.pop()))
def test_post_wrong_initial_nonce(self): # HEAD
self.available_nonces = ['f', jose.b64encode('good')]
self.available_nonces = [b'f', jose.b64encode(b'good')]
self.assertRaises(errors.BadNonce, self.net.post, 'uri',
self.obj, content_type=self.content_type)
def test_post_wrong_post_response_nonce(self):
self.available_nonces = [jose.b64encode('good'), 'f']
self.available_nonces = [jose.b64encode(b'good'), b'f']
self.assertRaises(errors.BadNonce, self.net.post, 'uri',
self.obj, content_type=self.content_type)

View file

@ -5,7 +5,7 @@ from acme import jose
class ClientRequestableResource(jose.JSONDeSerializable):
"""Resource that can be requested by client.
:ivar str resource_type: ACME resource identifier used in client
:ivar unicode resource_type: ACME resource identifier used in client
HTTPS requests in order to protect against MITM.
"""

View file

@ -44,8 +44,10 @@ from acme.jose.json_util import (
decode_cert,
decode_csr,
decode_hex16,
encode_b64jose,
encode_cert,
encode_csr,
encode_hex16,
)
from acme.jose.jwa import (

View file

@ -9,28 +9,30 @@
.. _`JOSE Base64`:
https://tools.ietf.org/html/draft-ietf-jose-json-web-signature-37#appendix-C
.. warning:: Do NOT try to call this module "base64",
as it will "shadow" the standard library.
.. Do NOT try to call this module "base64", as it will "shadow" the
standard library.
"""
import base64
import six
def b64encode(data):
"""JOSE Base64 encode.
:param data: Data to be encoded.
:type data: str or bytearray
:type data: `bytes`
:returns: JOSE Base64 string.
:rtype: str
:rtype: bytes
:raises TypeError: if `data` is of incorrect type
"""
if not isinstance(data, str):
raise TypeError('argument should be str or bytearray')
return base64.urlsafe_b64encode(data).rstrip('=')
if not isinstance(data, six.binary_type):
raise TypeError('argument should be {0}'.format(six.binary_type))
return base64.urlsafe_b64encode(data).rstrip(b'=')
def b64decode(data):
@ -38,21 +40,22 @@ def b64decode(data):
:param data: Base64 string to be decoded. If it's unicode, then
only ASCII characters are allowed.
:type data: str or unicode
:type data: `bytes` or `unicode`
:returns: Decoded data.
:rtype: bytes
:raises TypeError: if input is of incorrect type
:raises ValueError: if input is unicode with non-ASCII characters
"""
if isinstance(data, unicode):
if isinstance(data, six.string_types):
try:
data = data.encode('ascii')
except UnicodeEncodeError:
raise ValueError(
'unicode argument should contain only ASCII characters')
elif not isinstance(data, str):
elif not isinstance(data, six.binary_type):
raise TypeError('argument should be a str or unicode')
return base64.urlsafe_b64decode(data + '=' * (4 - (len(data) % 4)))
return base64.urlsafe_b64decode(data + b'=' * (4 - (len(data) % 4)))

View file

@ -1,20 +1,22 @@
"""Tests for acme.jose.b64."""
import unittest
import six
# https://en.wikipedia.org/wiki/Base64#Examples
B64_PADDING_EXAMPLES = {
'any carnal pleasure.': ('YW55IGNhcm5hbCBwbGVhc3VyZS4', '='),
'any carnal pleasure': ('YW55IGNhcm5hbCBwbGVhc3VyZQ', '=='),
'any carnal pleasur': ('YW55IGNhcm5hbCBwbGVhc3Vy', ''),
'any carnal pleasu': ('YW55IGNhcm5hbCBwbGVhc3U', '='),
'any carnal pleas': ('YW55IGNhcm5hbCBwbGVhcw', '=='),
b'any carnal pleasure.': (b'YW55IGNhcm5hbCBwbGVhc3VyZS4', b'='),
b'any carnal pleasure': (b'YW55IGNhcm5hbCBwbGVhc3VyZQ', b'=='),
b'any carnal pleasur': (b'YW55IGNhcm5hbCBwbGVhc3Vy', b''),
b'any carnal pleasu': (b'YW55IGNhcm5hbCBwbGVhc3U', b'='),
b'any carnal pleas': (b'YW55IGNhcm5hbCBwbGVhcw', b'=='),
}
B64_URL_UNSAFE_EXAMPLES = {
chr(251) + chr(239): '--8',
chr(255) * 2: '__8',
six.int2byte(251) + six.int2byte(239): b'--8',
six.int2byte(255) * 2: b'__8',
}
@ -26,12 +28,15 @@ class B64EncodeTest(unittest.TestCase):
from acme.jose.b64 import b64encode
return b64encode(data)
def test_empty(self):
self.assertEqual(self._call(b''), b'')
def test_unsafe_url(self):
for text, b64 in B64_URL_UNSAFE_EXAMPLES.iteritems():
for text, b64 in six.iteritems(B64_URL_UNSAFE_EXAMPLES):
self.assertEqual(self._call(text), b64)
def test_different_paddings(self):
for text, (b64, _) in B64_PADDING_EXAMPLES.iteritems():
for text, (b64, _) in six.iteritems(B64_PADDING_EXAMPLES):
self.assertEqual(self._call(text), b64)
def test_unicode_fails_with_type_error(self):
@ -47,24 +52,24 @@ class B64DecodeTest(unittest.TestCase):
return b64decode(data)
def test_unsafe_url(self):
for text, b64 in B64_URL_UNSAFE_EXAMPLES.iteritems():
for text, b64 in six.iteritems(B64_URL_UNSAFE_EXAMPLES):
self.assertEqual(self._call(b64), text)
def test_input_without_padding(self):
for text, (b64, _) in B64_PADDING_EXAMPLES.iteritems():
for text, (b64, _) in six.iteritems(B64_PADDING_EXAMPLES):
self.assertEqual(self._call(b64), text)
def test_input_with_padding(self):
for text, (b64, pad) in B64_PADDING_EXAMPLES.iteritems():
for text, (b64, pad) in six.iteritems(B64_PADDING_EXAMPLES):
self.assertEqual(self._call(b64 + pad), text)
def test_unicode_with_ascii(self):
self.assertEqual(self._call(u'YQ'), 'a')
self.assertEqual(self._call(u'YQ'), b'a')
def test_non_ascii_unicode_fails(self):
self.assertRaises(ValueError, self._call, u'\u0105')
def test_type_error_no_unicode_or_str(self):
def test_type_error_no_unicode_or_bytes(self):
self.assertRaises(TypeError, self._call, object())

View file

@ -3,12 +3,15 @@ import abc
import collections
import json
import six
from acme.jose import util
# pylint: disable=no-self-argument,no-method-argument,no-init,inherit-non-class
# pylint: disable=too-few-public-methods
@six.add_metaclass(abc.ABCMeta)
class JSONDeSerializable(object):
# pylint: disable=too-few-public-methods
"""Interface for (de)serializable JSON objects.
@ -96,7 +99,6 @@ class JSONDeSerializable(object):
return Bar()
"""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def to_partial_json(self): # pragma: no cover
@ -133,7 +135,7 @@ class JSONDeSerializable(object):
def _serialize(obj):
if isinstance(obj, JSONDeSerializable):
return _serialize(obj.to_partial_json())
if isinstance(obj, basestring): # strings are sequence
if isinstance(obj, six.string_types): # strings are Sequence
return obj
elif isinstance(obj, list):
return [_serialize(subobj) for subobj in obj]
@ -143,14 +145,14 @@ class JSONDeSerializable(object):
return tuple(_serialize(subobj) for subobj in obj)
elif isinstance(obj, collections.Mapping):
return dict((_serialize(key), _serialize(value))
for key, value in obj.iteritems())
for key, value in six.iteritems(obj))
else:
return obj
return _serialize(self)
@util.abstractclassmethod
def from_json(cls, unused_jobj):
def from_json(cls, jobj): # pylint: disable=unused-argument
"""Deserialize a decoded JSON document.
:param jobj: Python object, composed of only other basic data
@ -182,7 +184,11 @@ class JSONDeSerializable(object):
return json.dumps(self, default=self.json_dump_default, **kwargs)
def json_dumps_pretty(self):
"""Dump the object to pretty JSON document string."""
"""Dump the object to pretty JSON document string.
:rtype: str
"""
return self.json_dumps(sort_keys=True, indent=4, separators=(',', ': '))
@classmethod
@ -190,7 +196,7 @@ class JSONDeSerializable(object):
"""Serialize Python object.
This function is meant to be passed as ``default`` to
:func:`json.load` or :func:`json.loads`. They call
:func:`json.dump` or :func:`json.dumps`. They call
``default(python_object)`` only for non-basic Python types, so
this function necessarily raises :class:`TypeError` if
``python_object`` is not an instance of

View file

@ -11,6 +11,7 @@ import binascii
import logging
import OpenSSL
import six
from acme.jose import b64
from acme.jose import errors
@ -109,7 +110,7 @@ class Field(object):
elif isinstance(value, dict):
return util.frozendict(
dict((cls.default_decoder(key), cls.default_decoder(value))
for key, value in value.iteritems()))
for key, value in six.iteritems(value)))
else: # integer or string
return value
@ -167,17 +168,20 @@ class JSONObjectWithFieldsMeta(abc.ABCMeta):
for base in bases:
fields.update(getattr(base, '_fields', {}))
# Do not reorder, this class might override fields from base classes!
for key, value in dikt.items(): # not iterkeys() (in-place edit!)
for key, value in tuple(six.iteritems(dikt)):
# not six.iterkeys() (in-place edit!)
if isinstance(value, Field):
fields[key] = dikt.pop(key)
dikt['_orig_slots'] = dikt.get('__slots__', ())
dikt['__slots__'] = tuple(list(dikt['_orig_slots']) + fields.keys())
dikt['__slots__'] = tuple(
list(dikt['_orig_slots']) + list(six.iterkeys(fields)))
dikt['_fields'] = fields
return abc.ABCMeta.__new__(mcs, name, bases, dikt)
@six.add_metaclass(JSONObjectWithFieldsMeta)
class JSONObjectWithFields(util.ImmutableMap, interfaces.JSONDeSerializable):
# pylint: disable=too-few-public-methods
"""JSON object with fields.
@ -205,13 +209,12 @@ class JSONObjectWithFields(util.ImmutableMap, interfaces.JSONDeSerializable):
assert Foo(bar='baz').bar == 'baz'
"""
__metaclass__ = JSONObjectWithFieldsMeta
@classmethod
def _defaults(cls):
"""Get default fields values."""
return dict([(slot, field.default) for slot, field
in cls._fields.iteritems() if field.omitempty])
in six.iteritems(cls._fields) if field.omitempty])
def __init__(self, **kwargs):
# pylint: disable=star-args
@ -222,7 +225,7 @@ class JSONObjectWithFields(util.ImmutableMap, interfaces.JSONDeSerializable):
"""Serialize fields to JSON."""
jobj = {}
omitted = set()
for slot, field in self._fields.iteritems():
for slot, field in six.iteritems(self._fields):
value = getattr(self, slot)
if field.omit(value):
@ -246,7 +249,7 @@ class JSONObjectWithFields(util.ImmutableMap, interfaces.JSONDeSerializable):
@classmethod
def _check_required(cls, jobj):
missing = set()
for _, field in cls._fields.iteritems():
for _, field in six.iteritems(cls._fields):
if not field.omitempty and field.json_name not in jobj:
missing.add(field.json_name)
@ -260,7 +263,7 @@ class JSONObjectWithFields(util.ImmutableMap, interfaces.JSONDeSerializable):
"""Deserialize fields from JSON."""
cls._check_required(jobj)
fields = {}
for slot, field in cls._fields.iteritems():
for slot, field in six.iteritems(cls._fields):
if field.json_name not in jobj and field.omitempty:
fields[slot] = field.default
else:
@ -278,17 +281,31 @@ class JSONObjectWithFields(util.ImmutableMap, interfaces.JSONDeSerializable):
return cls(**cls.fields_from_json(jobj))
def encode_b64jose(data):
"""Encode JOSE Base-64 field.
:param bytes data:
:rtype: `unicode`
"""
# b64encode produces ASCII characters only
return b64.b64encode(data).decode('ascii')
def decode_b64jose(data, size=None, minimum=False):
"""Decode JOSE Base-64 field.
:param unicode data:
:param int size: Required length (after decoding).
:param bool minimum: If ``True``, then `size` will be treated as
minimum required length, as opposed to exact equality.
:rtype: bytes
"""
error_cls = TypeError if six.PY2 else binascii.Error
try:
decoded = b64.b64decode(data)
except TypeError as error:
decoded = b64.b64decode(data.encode())
except error_cls as error:
raise errors.DeserializationError(error)
if size is not None and ((not minimum and len(decoded) != size)
@ -297,35 +314,53 @@ def decode_b64jose(data, size=None, minimum=False):
return decoded
def encode_hex16(value):
"""Hexlify.
:param bytes value:
:rtype: unicode
"""
return binascii.hexlify(value).decode()
def decode_hex16(value, size=None, minimum=False):
"""Decode hexlified field.
:param unicode value:
:param int size: Required length (after decoding).
:param bool minimum: If ``True``, then `size` will be treated as
minimum required length, as opposed to exact equality.
:rtype: bytes
"""
value = value.encode()
if size is not None and ((not minimum and len(value) != size * 2)
or (minimum and len(value) < size * 2)):
raise errors.DeserializationError()
error_cls = TypeError if six.PY2 else binascii.Error
try:
return binascii.unhexlify(value)
except TypeError as error:
except error_cls as error:
raise errors.DeserializationError(error)
def encode_cert(cert):
"""Encode certificate as JOSE Base-64 DER.
:param cert: Certificate.
:type cert: :class:`acme.jose.util.ComparableX509`
:type cert: `OpenSSL.crypto.X509` wrapped in `.ComparableX509`
:rtype: unicode
"""
return b64.b64encode(OpenSSL.crypto.dump_certificate(
return encode_b64jose(OpenSSL.crypto.dump_certificate(
OpenSSL.crypto.FILETYPE_ASN1, cert))
def decode_cert(b64der):
"""Decode JOSE Base-64 DER-encoded certificate."""
"""Decode JOSE Base-64 DER-encoded certificate.
:param unicode b64der:
:rtype: `OpenSSL.crypto.X509` wrapped in `.ComparableX509`
"""
try:
return util.ComparableX509(OpenSSL.crypto.load_certificate(
OpenSSL.crypto.FILETYPE_ASN1, decode_b64jose(b64der)))
@ -333,12 +368,22 @@ def decode_cert(b64der):
raise errors.DeserializationError(error)
def encode_csr(csr):
"""Encode CSR as JOSE Base-64 DER."""
return b64.b64encode(OpenSSL.crypto.dump_certificate_request(
"""Encode CSR as JOSE Base-64 DER.
:type csr: `OpenSSL.crypto.X509Req` wrapped in `.ComparableX509`
:rtype: unicode
"""
return encode_b64jose(OpenSSL.crypto.dump_certificate_request(
OpenSSL.crypto.FILETYPE_ASN1, csr))
def decode_csr(b64der):
"""Decode JOSE Base-64 DER-encoded CSR."""
"""Decode JOSE Base-64 DER-encoded CSR.
:param unicode b64der:
:rtype: `OpenSSL.crypto.X509Req` wrapped in `.ComparableX509`
"""
try:
return util.ComparableX509(OpenSSL.crypto.load_certificate_request(
OpenSSL.crypto.FILETYPE_ASN1, decode_b64jose(b64der)))
@ -372,7 +417,7 @@ class TypedJSONObjectWithFields(JSONObjectWithFields):
@classmethod
def get_type_cls(cls, jobj):
"""Get the registered class for ``jobj``."""
if cls in cls.TYPES.itervalues():
if cls in six.itervalues(cls.TYPES):
assert jobj[cls.type_field_name]
# cls is already registered type_cls, force to use it
# so that, e.g Revocation.from_json(jobj) fails if

View file

@ -1,23 +1,19 @@
"""Tests for acme.jose.json_util."""
import itertools
import os
import pkg_resources
import unittest
import mock
import OpenSSL
import six
from acme import test_util
from acme.jose import errors
from acme.jose import interfaces
from acme.jose import util
CERT = util.ComparableX509(OpenSSL.crypto.load_certificate(
OpenSSL.crypto.FILETYPE_PEM, pkg_resources.resource_string(
'letsencrypt.tests', os.path.join('testdata', 'cert.pem'))))
CSR = util.ComparableX509(OpenSSL.crypto.load_certificate_request(
OpenSSL.crypto.FILETYPE_PEM, pkg_resources.resource_string(
'letsencrypt.tests', os.path.join('testdata', 'csr.pem'))))
CERT = test_util.load_cert('cert.pem')
CSR = test_util.load_csr('csr.pem')
class FieldTest(unittest.TestCase):
@ -97,8 +93,8 @@ class JSONObjectWithFieldsMetaTest(unittest.TestCase):
self.field2 = Field('Baz2')
# pylint: disable=invalid-name,missing-docstring,too-few-public-methods
# pylint: disable=blacklisted-name
@six.add_metaclass(JSONObjectWithFieldsMeta)
class A(object):
__metaclass__ = JSONObjectWithFieldsMeta
__slots__ = ('bar',)
baz = self.field
class B(A):
@ -212,62 +208,82 @@ class JSONObjectWithFieldsTest(unittest.TestCase):
class DeEncodersTest(unittest.TestCase):
def setUp(self):
self.b64_cert = (
'MIIB3jCCAYigAwIBAgICBTkwDQYJKoZIhvcNAQELBQAwdzELMAkGA1UEBhM'
'CVVMxETAPBgNVBAgMCE1pY2hpZ2FuMRIwEAYDVQQHDAlBbm4gQXJib3IxKz'
'ApBgNVBAoMIlVuaXZlcnNpdHkgb2YgTWljaGlnYW4gYW5kIHRoZSBFRkYxF'
'DASBgNVBAMMC2V4YW1wbGUuY29tMB4XDTE0MTIxMTIyMzQ0NVoXDTE0MTIx'
'ODIyMzQ0NVowdzELMAkGA1UEBhMCVVMxETAPBgNVBAgMCE1pY2hpZ2FuMRI'
'wEAYDVQQHDAlBbm4gQXJib3IxKzApBgNVBAoMIlVuaXZlcnNpdHkgb2YgTW'
'ljaGlnYW4gYW5kIHRoZSBFRkYxFDASBgNVBAMMC2V4YW1wbGUuY29tMFwwD'
'QYJKoZIhvcNAQEBBQADSwAwSAJBAKx1c7RR7R_drnBSQ_zfx1vQLHUbFLh1'
'AQQQ5R8DZUXd36efNK79vukFhN9HFoHZiUvOjm0c-pVE6K-EdE_twuUCAwE'
'AATANBgkqhkiG9w0BAQsFAANBAC24z0IdwIVKSlntksllvr6zJepBH5fMnd'
'fk3XJp10jT6VE-14KNtjh02a56GoraAvJAT5_H67E8GvJ_ocNnB_o'
u'MIIB3jCCAYigAwIBAgICBTkwDQYJKoZIhvcNAQELBQAwdzELMAkGA1UEBhM'
u'CVVMxETAPBgNVBAgMCE1pY2hpZ2FuMRIwEAYDVQQHDAlBbm4gQXJib3IxKz'
u'ApBgNVBAoMIlVuaXZlcnNpdHkgb2YgTWljaGlnYW4gYW5kIHRoZSBFRkYxF'
u'DASBgNVBAMMC2V4YW1wbGUuY29tMB4XDTE0MTIxMTIyMzQ0NVoXDTE0MTIx'
u'ODIyMzQ0NVowdzELMAkGA1UEBhMCVVMxETAPBgNVBAgMCE1pY2hpZ2FuMRI'
u'wEAYDVQQHDAlBbm4gQXJib3IxKzApBgNVBAoMIlVuaXZlcnNpdHkgb2YgTW'
u'ljaGlnYW4gYW5kIHRoZSBFRkYxFDASBgNVBAMMC2V4YW1wbGUuY29tMFwwD'
u'QYJKoZIhvcNAQEBBQADSwAwSAJBAKx1c7RR7R_drnBSQ_zfx1vQLHUbFLh1'
u'AQQQ5R8DZUXd36efNK79vukFhN9HFoHZiUvOjm0c-pVE6K-EdE_twuUCAwE'
u'AATANBgkqhkiG9w0BAQsFAANBAC24z0IdwIVKSlntksllvr6zJepBH5fMnd'
u'fk3XJp10jT6VE-14KNtjh02a56GoraAvJAT5_H67E8GvJ_ocNnB_o'
)
self.b64_csr = (
'MIIBXTCCAQcCAQAweTELMAkGA1UEBhMCVVMxETAPBgNVBAgMCE1pY2hpZ2F'
'uMRIwEAYDVQQHDAlBbm4gQXJib3IxDDAKBgNVBAoMA0VGRjEfMB0GA1UECw'
'wWVW5pdmVyc2l0eSBvZiBNaWNoaWdhbjEUMBIGA1UEAwwLZXhhbXBsZS5jb'
'20wXDANBgkqhkiG9w0BAQEFAANLADBIAkEArHVztFHtH92ucFJD_N_HW9As'
'dRsUuHUBBBDlHwNlRd3fp580rv2-6QWE30cWgdmJS86ObRz6lUTor4R0T-3'
'C5QIDAQABoCkwJwYJKoZIhvcNAQkOMRowGDAWBgNVHREEDzANggtleGFtcG'
'xlLmNvbTANBgkqhkiG9w0BAQsFAANBAHJH_O6BtC9aGzEVCMGOZ7z9iIRHW'
'Szr9x_bOzn7hLwsbXPAgO1QxEwL-X-4g20Gn9XBE1N9W6HCIEut2d8wACg'
u'MIIBXTCCAQcCAQAweTELMAkGA1UEBhMCVVMxETAPBgNVBAgMCE1pY2hpZ2F'
u'uMRIwEAYDVQQHDAlBbm4gQXJib3IxDDAKBgNVBAoMA0VGRjEfMB0GA1UECw'
u'wWVW5pdmVyc2l0eSBvZiBNaWNoaWdhbjEUMBIGA1UEAwwLZXhhbXBsZS5jb'
u'20wXDANBgkqhkiG9w0BAQEFAANLADBIAkEArHVztFHtH92ucFJD_N_HW9As'
u'dRsUuHUBBBDlHwNlRd3fp580rv2-6QWE30cWgdmJS86ObRz6lUTor4R0T-3'
u'C5QIDAQABoCkwJwYJKoZIhvcNAQkOMRowGDAWBgNVHREEDzANggtleGFtcG'
u'xlLmNvbTANBgkqhkiG9w0BAQsFAANBAHJH_O6BtC9aGzEVCMGOZ7z9iIRHW'
u'Szr9x_bOzn7hLwsbXPAgO1QxEwL-X-4g20Gn9XBE1N9W6HCIEut2d8wACg'
)
def test_decode_b64_jose_padding_error(self):
from acme.jose.json_util import decode_b64jose
self.assertRaises(errors.DeserializationError, decode_b64jose, 'x')
def test_encode_b64jose(self):
from acme.jose.json_util import encode_b64jose
encoded = encode_b64jose(b'x')
self.assertTrue(isinstance(encoded, six.string_types))
self.assertEqual(u'eA', encoded)
def test_decode_b64_jose_size(self):
def test_decode_b64jose(self):
from acme.jose.json_util import decode_b64jose
self.assertEqual('foo', decode_b64jose('Zm9v', size=3))
self.assertRaises(
errors.DeserializationError, decode_b64jose, 'Zm9v', size=2)
self.assertRaises(
errors.DeserializationError, decode_b64jose, 'Zm9v', size=4)
decoded = decode_b64jose(u'eA')
self.assertTrue(isinstance(decoded, six.binary_type))
self.assertEqual(b'x', decoded)
def test_decode_b64_jose_minimum_size(self):
def test_decode_b64jose_padding_error(self):
from acme.jose.json_util import decode_b64jose
self.assertEqual('foo', decode_b64jose('Zm9v', size=3, minimum=True))
self.assertEqual('foo', decode_b64jose('Zm9v', size=2, minimum=True))
self.assertRaises(errors.DeserializationError, decode_b64jose, u'x')
def test_decode_b64jose_size(self):
from acme.jose.json_util import decode_b64jose
self.assertEqual(b'foo', decode_b64jose(u'Zm9v', size=3))
self.assertRaises(
errors.DeserializationError, decode_b64jose, u'Zm9v', size=2)
self.assertRaises(
errors.DeserializationError, decode_b64jose, u'Zm9v', size=4)
def test_decode_b64jose_minimum_size(self):
from acme.jose.json_util import decode_b64jose
self.assertEqual(b'foo', decode_b64jose(u'Zm9v', size=3, minimum=True))
self.assertEqual(b'foo', decode_b64jose(u'Zm9v', size=2, minimum=True))
self.assertRaises(errors.DeserializationError, decode_b64jose,
'Zm9v', size=4, minimum=True)
u'Zm9v', size=4, minimum=True)
def test_encode_hex16(self):
from acme.jose.json_util import encode_hex16
encoded = encode_hex16(b'foo')
self.assertEqual(u'666f6f', encoded)
self.assertTrue(isinstance(encoded, six.string_types))
def test_decode_hex16(self):
from acme.jose.json_util import decode_hex16
self.assertEqual('foo', decode_hex16('666f6f'))
decoded = decode_hex16(u'666f6f')
self.assertEqual(b'foo', decoded)
self.assertTrue(isinstance(decoded, six.binary_type))
def test_decode_hex16_minimum_size(self):
from acme.jose.json_util import decode_hex16
self.assertEqual('foo', decode_hex16('666f6f', size=3, minimum=True))
self.assertEqual('foo', decode_hex16('666f6f', size=2, minimum=True))
self.assertEqual(b'foo', decode_hex16(u'666f6f', size=3, minimum=True))
self.assertEqual(b'foo', decode_hex16(u'666f6f', size=2, minimum=True))
self.assertRaises(errors.DeserializationError, decode_hex16,
'666f6f', size=4, minimum=True)
u'666f6f', size=4, minimum=True)
def test_decode_hex16_odd_length(self):
from acme.jose.json_util import decode_hex16
self.assertRaises(errors.DeserializationError, decode_hex16, 'x')
self.assertRaises(errors.DeserializationError, decode_hex16, u'x')
def test_encode_cert(self):
from acme.jose.json_util import encode_cert
@ -278,7 +294,7 @@ class DeEncodersTest(unittest.TestCase):
cert = decode_cert(self.b64_cert)
self.assertTrue(isinstance(cert, util.ComparableX509))
self.assertEqual(cert, CERT)
self.assertRaises(errors.DeserializationError, decode_cert, '')
self.assertRaises(errors.DeserializationError, decode_cert, u'')
def test_encode_csr(self):
from acme.jose.json_util import encode_csr
@ -289,7 +305,7 @@ class DeEncodersTest(unittest.TestCase):
csr = decode_csr(self.b64_csr)
self.assertTrue(isinstance(csr, util.ComparableX509))
self.assertEqual(csr, CSR)
self.assertRaises(errors.DeserializationError, decode_csr, '')
self.assertRaises(errors.DeserializationError, decode_csr, u'')
class TypedJSONObjectWithFieldsTest(unittest.TestCase):

View file

@ -4,6 +4,7 @@ https://tools.ietf.org/html/draft-ietf-jose-json-web-algorithms-40
"""
import abc
import collections
import logging
import cryptography.exceptions
@ -27,7 +28,7 @@ class JWA(interfaces.JSONDeSerializable): # pylint: disable=abstract-method
"""JSON Web Algorithm."""
class JWASignature(JWA):
class JWASignature(JWA, collections.Hashable):
"""JSON Web Signature Algorithm."""
SIGNATURES = {}
@ -39,6 +40,9 @@ class JWASignature(JWA):
return NotImplemented
return self.name == other.name
def __hash__(self):
return hash((self.__class__, self.name))
def __ne__(self, other):
return not self == other

View file

@ -1,19 +1,14 @@
"""Tests for acme.jose.jwa."""
import os
import pkg_resources
import unittest
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from acme import test_util
from acme.jose import errors
from acme.jose import jwk_test
RSA1024_KEY = serialization.load_pem_private_key(
pkg_resources.resource_string(
__name__, os.path.join('testdata', 'rsa1024_key.pem')),
password=None, backend=default_backend())
RSA256_KEY = test_util.load_rsa_private_key('rsa256_key.pem')
RSA512_KEY = test_util.load_rsa_private_key('rsa512_key.pem')
RSA1024_KEY = test_util.load_rsa_private_key('rsa1024_key.pem')
class JWASignatureTest(unittest.TestCase):
@ -63,12 +58,12 @@ class JWAHSTest(unittest.TestCase): # pylint: disable=too-few-public-methods
def test_it(self):
from acme.jose.jwa import HS256
sig = (
"\xceR\xea\xcd\x94\xab\xcf\xfb\xe0\xacA.:\x1a'\x08i\xe2\xc4"
"\r\x85+\x0e\x85\xaeUZ\xd4\xb3\x97zO"
b"\xceR\xea\xcd\x94\xab\xcf\xfb\xe0\xacA.:\x1a'\x08i\xe2\xc4"
b"\r\x85+\x0e\x85\xaeUZ\xd4\xb3\x97zO"
)
self.assertEqual(HS256.sign('some key', 'foo'), sig)
self.assertTrue(HS256.verify('some key', 'foo', sig) is True)
self.assertTrue(HS256.verify('some key', 'foo', sig + '!') is False)
self.assertEqual(HS256.sign(b'some key', b'foo'), sig)
self.assertTrue(HS256.verify(b'some key', b'foo', sig) is True)
self.assertTrue(HS256.verify(b'some key', b'foo', sig + b'!') is False)
class JWARSTest(unittest.TestCase):
@ -76,33 +71,33 @@ class JWARSTest(unittest.TestCase):
def test_sign_no_private_part(self):
from acme.jose.jwa import RS256
self.assertRaises(
errors.Error, RS256.sign, jwk_test.RSA512_KEY.public_key(), 'foo')
errors.Error, RS256.sign, RSA512_KEY.public_key(), b'foo')
def test_sign_key_too_small(self):
from acme.jose.jwa import RS256
from acme.jose.jwa import PS256
self.assertRaises(errors.Error, RS256.sign, jwk_test.RSA256_KEY, 'foo')
self.assertRaises(errors.Error, PS256.sign, jwk_test.RSA256_KEY, 'foo')
self.assertRaises(errors.Error, RS256.sign, RSA256_KEY, b'foo')
self.assertRaises(errors.Error, PS256.sign, RSA256_KEY, b'foo')
def test_rs(self):
from acme.jose.jwa import RS256
sig = (
'|\xc6\xb2\xa4\xab(\x87\x99\xfa*:\xea\xf8\xa0N&}\x9f\x0f\xc0O'
'\xc6t\xa3\xe6\xfa\xbb"\x15Y\x80Y\xe0\x81\xb8\x88)\xba\x0c\x9c'
'\xa4\x99\x1e\x19&\xd8\xc7\x99S\x97\xfc\x85\x0cOV\xe6\x07\x99'
'\xd2\xb9.>}\xfd'
b'|\xc6\xb2\xa4\xab(\x87\x99\xfa*:\xea\xf8\xa0N&}\x9f\x0f\xc0O'
b'\xc6t\xa3\xe6\xfa\xbb"\x15Y\x80Y\xe0\x81\xb8\x88)\xba\x0c\x9c'
b'\xa4\x99\x1e\x19&\xd8\xc7\x99S\x97\xfc\x85\x0cOV\xe6\x07\x99'
b'\xd2\xb9.>}\xfd'
)
self.assertEqual(RS256.sign(jwk_test.RSA512_KEY, 'foo'), sig)
self.assertTrue(RS256.verify(
jwk_test.RSA512_KEY.public_key(), 'foo', sig))
self.assertEqual(RS256.sign(RSA512_KEY, b'foo'), sig)
self.assertTrue(RS256.verify(RSA512_KEY.public_key(), b'foo', sig))
self.assertFalse(RS256.verify(
jwk_test.RSA512_KEY.public_key(), 'foo', sig + '!'))
RSA512_KEY.public_key(), b'foo', sig + b'!'))
def test_ps(self):
from acme.jose.jwa import PS256
sig = PS256.sign(RSA1024_KEY, 'foo')
self.assertTrue(PS256.verify(RSA1024_KEY.public_key(), 'foo', sig))
self.assertFalse(PS256.verify(RSA1024_KEY.public_key(), 'foo', sig + '!'))
sig = PS256.sign(RSA1024_KEY, b'foo')
self.assertTrue(PS256.verify(RSA1024_KEY.public_key(), b'foo', sig))
self.assertFalse(PS256.verify(
RSA1024_KEY.public_key(), b'foo', sig + b'!'))
if __name__ == '__main__':

View file

@ -9,7 +9,8 @@ from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives.asymmetric import rsa
from acme.jose import b64
import six
from acme.jose import errors
from acme.jose import json_util
from acme.jose import util
@ -87,7 +88,7 @@ class JWK(json_util.TypedJSONObjectWithFields):
key, cls.cryptography_key_types):
raise errors.Error("Unable to deserialize {0} into {1}".format(
key.__class__, cls.__class__))
for jwk_cls in cls.TYPES.itervalues():
for jwk_cls in six.itervalues(cls.TYPES):
if isinstance(key, jwk_cls.cryptography_key_types):
return jwk_cls(key=key)
raise errors.Error("Unsupported algorithm: {0}".format(key.__class__))
@ -127,11 +128,11 @@ class JWKOct(JWK):
# algorithm intended to be used with the key, unless the
# application uses another means or convention to determine
# the algorithm used.
return {'k': self.key}
return {'k': json_util.encode_b64jose(self.key)}
@classmethod
def fields_from_json(cls, jobj):
return cls(key=jobj['k'])
return cls(key=json_util.decode_b64jose(jobj['k']))
def public_key(self):
return self
@ -158,18 +159,25 @@ class JWKRSA(JWK):
@classmethod
def _encode_param(cls, data):
"""Encode Base64urlUInt.
:type data: long
:rtype: unicode
"""
def _leading_zeros(arg):
if len(arg) % 2:
return '0' + arg
return arg
return b64.b64encode(binascii.unhexlify(
return json_util.encode_b64jose(binascii.unhexlify(
_leading_zeros(hex(data)[2:].rstrip('L'))))
@classmethod
def _decode_param(cls, data):
"""Decode Base64urlUInt."""
try:
return long(binascii.hexlify(json_util.decode_b64jose(data)), 16)
return int(binascii.hexlify(json_util.decode_b64jose(data)), 16)
except ValueError: # invalid literal for long() with base 16
raise errors.DeserializationError()
@ -198,17 +206,20 @@ class JWKRSA(JWK):
raise errors.Error(
"Some private parameters are missing: {0}".format(
all_params))
p, q, dp, dq, qi = tuple(cls._decode_param(x) for x in all_params)
p, q, dp, dq, qi = tuple(
cls._decode_param(x) for x in all_params)
# TODO: check for oth
else:
p, q = rsa.rsa_recover_prime_factors(n, e, d) # cryptography>=0.8
# cryptography>=0.8
p, q = rsa.rsa_recover_prime_factors(n, e, d)
dp = rsa.rsa_crt_dmp1(d, p)
dq = rsa.rsa_crt_dmq1(d, q)
qi = rsa.rsa_crt_iqmp(p, q)
key = rsa.RSAPrivateNumbers(
p, q, d, dp, dq, qi, public_numbers).private_key(default_backend())
p, q, d, dp, dq, qi, public_numbers).private_key(
default_backend())
return cls(key=key)
@ -234,4 +245,4 @@ class JWKRSA(JWK):
'qi': private.iqmp,
}
return dict((key, self._encode_param(value))
for key, value in params.iteritems())
for key, value in six.iteritems(params))

View file

@ -1,25 +1,16 @@
"""Tests for acme.jose.jwk."""
import os
import pkg_resources
import unittest
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from acme import test_util
from acme.jose import errors
from acme.jose import json_util
from acme.jose import util
DSA_PEM = pkg_resources.resource_string(
'letsencrypt.tests', os.path.join('testdata', 'dsa512_key.pem'))
RSA256_KEY = serialization.load_pem_private_key(
pkg_resources.resource_string(
__name__, os.path.join('testdata', 'rsa256_key.pem')),
password=None, backend=default_backend())
RSA512_KEY = serialization.load_pem_private_key(
pkg_resources.resource_string(
__name__, os.path.join('testdata', 'rsa512_key.pem')),
password=None, backend=default_backend())
DSA_PEM = test_util.load_vector('dsa512_key.pem')
RSA256_KEY = test_util.load_rsa_private_key('rsa256_key.pem')
RSA512_KEY = test_util.load_rsa_private_key('rsa512_key.pem')
class JWKTest(unittest.TestCase):
@ -39,8 +30,8 @@ class JWKOctTest(unittest.TestCase):
def setUp(self):
from acme.jose.jwk import JWKOct
self.jwk = JWKOct(key='foo')
self.jobj = {'kty': 'oct', 'k': 'foo'}
self.jwk = JWKOct(key=b'foo')
self.jobj = {'kty': 'oct', 'k': json_util.encode_b64jose(b'foo')}
def test_to_partial_json(self):
self.assertEqual(self.jwk.to_partial_json(), self.jobj)
@ -55,7 +46,7 @@ class JWKOctTest(unittest.TestCase):
def test_load(self):
from acme.jose.jwk import JWKOct
self.assertEqual(self.jwk, JWKOct.load('foo'))
self.assertEqual(self.jwk, JWKOct.load(b'foo'))
def test_public_key(self):
self.assertTrue(self.jwk.public_key() is self.jwk)
@ -73,8 +64,9 @@ class JWKRSATest(unittest.TestCase):
'e': 'AQAB',
'n': 'm2Fylv-Uz7trgTW8EBHP3FQSMeZs2GNQ6VRo1sIVJEk',
}
self.jwk256_comparable = JWKRSA(key=util.ComparableRSAKey(
RSA256_KEY.public_key()))
# pylint: disable=protected-access
self.jwk256_not_comparable = JWKRSA(
key=RSA256_KEY.public_key()._wrapped)
self.jwk512 = JWKRSA(key=RSA512_KEY.public_key())
self.jwk512json = {
'kty': 'RSA',
@ -96,9 +88,16 @@ class JWKRSATest(unittest.TestCase):
'qi': 'oi45cEkbVoJjAbnQpFY87Q',
})
def test_init_comparable(self):
self.assertTrue(isinstance(self.jwk256.key, util.ComparableRSAKey))
self.assertEqual(self.jwk256, self.jwk256_comparable)
def test_init_auto_comparable(self):
self.assertTrue(isinstance(
self.jwk256_not_comparable.key, util.ComparableRSAKey))
self.assertEqual(self.jwk256, self.jwk256_not_comparable)
def test_encode_param_zero(self):
from acme.jose.jwk import JWKRSA
# pylint: disable=protected-access
# TODO: move encode/decode _param to separate class
self.assertEqual('AA', JWKRSA._encode_param(0))
def test_equals(self):
self.assertEqual(self.jwk256, self.jwk256)
@ -110,9 +109,8 @@ class JWKRSATest(unittest.TestCase):
def test_load(self):
from acme.jose.jwk import JWKRSA
self.assertEqual(
self.private, JWKRSA.load(pkg_resources.resource_string(
__name__, os.path.join('testdata', 'rsa256_key.pem'))))
self.assertEqual(self.private, JWKRSA.load(
test_util.load_vector('rsa256_key.pem')))
def test_public_key(self):
self.assertEqual(self.jwk256, self.private.public_key())

View file

@ -4,6 +4,7 @@ import base64
import sys
import OpenSSL
import six
from acme.jose import b64
from acme.jose import errors
@ -80,7 +81,7 @@ class Header(json_util.JSONObjectWithFields):
def not_omitted(self):
"""Fields that would not be omitted in the JSON object."""
return dict((name, getattr(self, name))
for name, field in self._fields.iteritems()
for name, field in six.iteritems(self._fields)
if not field.omit(getattr(self, name)))
def __add__(self, other):
@ -148,15 +149,22 @@ class Signature(json_util.JSONObjectWithFields):
header_cls = Header
__slots__ = ('combined',)
protected = json_util.Field(
'protected', omitempty=True, default='',
decoder=json_util.decode_b64jose, encoder=b64.b64encode) # TODO: utf-8?
protected = json_util.Field('protected', omitempty=True, default='')
header = json_util.Field(
'header', omitempty=True, default=header_cls(),
decoder=header_cls.from_json)
signature = json_util.Field(
'signature', decoder=json_util.decode_b64jose,
encoder=b64.b64encode)
encoder=json_util.encode_b64jose)
@protected.encoder
def protected(value): # pylint: disable=missing-docstring,no-self-argument
# wrong type guess (Signature, not bytes) | pylint: disable=no-member
return json_util.encode_b64jose(value.encode('utf-8'))
@protected.decoder
def protected(value): # pylint: disable=missing-docstring,no-self-argument
return json_util.decode_b64jose(value).decode('utf-8')
def __init__(self, **kwargs):
if 'combined' not in kwargs:
@ -178,6 +186,11 @@ class Signature(json_util.JSONObjectWithFields):
kwargs['combined'] = combined
return kwargs
@classmethod
def _msg(cls, protected, payload):
return (b64.b64encode(protected.encode('utf-8')) + b'.' +
b64.b64encode(payload))
def verify(self, payload, key=None):
"""Verify.
@ -188,8 +201,7 @@ class Signature(json_util.JSONObjectWithFields):
key = self.combined.find_key() if key is None else key
return self.combined.alg.verify(
key=key.key, sig=self.signature,
msg=(b64.b64encode(self.protected) + '.' +
b64.b64encode(payload)))
msg=self._msg(self.protected, payload))
@classmethod
def sign(cls, payload, key, alg, include_jwk=True,
@ -220,8 +232,7 @@ class Signature(json_util.JSONObjectWithFields):
protected = ''
header = cls.header_cls(**header_params) # pylint: disable=star-args
signature = alg.sign(key.key, b64.b64encode(protected)
+ '.' + b64.b64encode(payload))
signature = alg.sign(key.key, cls._msg(protected, payload))
return cls(protected=protected, header=header, signature=signature)
@ -244,7 +255,7 @@ class JWS(json_util.JSONObjectWithFields):
"""JSON Web Signature.
:ivar str payload: JWS Payload.
:ivar str signaturea: JWS Signatures.
:ivar str signature: JWS Signatures.
"""
__slots__ = ('payload', 'signatures')
@ -272,33 +283,45 @@ class JWS(json_util.JSONObjectWithFields):
return self.signatures[0]
def to_compact(self):
"""Compact serialization."""
"""Compact serialization.
:rtype: bytes
"""
assert len(self.signatures) == 1
assert 'alg' not in self.signature.header.not_omitted()
# ... it must be in protected
return '{0}.{1}.{2}'.format(
b64.b64encode(self.signature.protected),
b64.b64encode(self.payload),
return (
b64.b64encode(self.signature.protected.encode('utf-8'))
+ b'.' +
b64.b64encode(self.payload)
+ b'.' +
b64.b64encode(self.signature.signature))
@classmethod
def from_compact(cls, compact):
"""Compact deserialization."""
"""Compact deserialization.
:param bytes compact:
"""
try:
protected, payload, signature = compact.split('.')
protected, payload, signature = compact.split(b'.')
except ValueError:
raise errors.DeserializationError(
'Compact JWS serialization should comprise of exactly'
' 3 dot-separated components')
sig = cls.signature_cls(protected=json_util.decode_b64jose(protected),
signature=json_util.decode_b64jose(signature))
return cls(payload=json_util.decode_b64jose(payload), signatures=(sig,))
sig = cls.signature_cls(
protected=b64.b64decode(protected).decode('utf-8'),
signature=b64.b64decode(signature))
return cls(payload=b64.b64decode(payload), signatures=(sig,))
def to_partial_json(self, flat=True): # pylint: disable=arguments-differ
assert self.signatures
payload = b64.b64encode(self.payload)
payload = json_util.encode_b64jose(self.payload)
if flat and len(self.signatures) == 1:
ret = self.signatures[0].to_partial_json()
@ -329,34 +352,36 @@ class CLI(object):
def sign(cls, args):
"""Sign."""
key = args.alg.kty.load(args.key.read())
args.key.close()
if args.protect is None:
args.protect = []
if args.compact:
args.protect.append('alg')
sig = JWS.sign(payload=sys.stdin.read(), key=key, alg=args.alg,
sig = JWS.sign(payload=sys.stdin.read().encode(), key=key, alg=args.alg,
protect=set(args.protect))
if args.compact:
print sig.to_compact()
six.print_(sig.to_compact().decode('utf-8'))
else: # JSON
print sig.json_dumps_pretty()
six.print_(sig.json_dumps_pretty())
@classmethod
def verify(cls, args):
"""Verify."""
if args.compact:
sig = JWS.from_compact(sys.stdin.read())
sig = JWS.from_compact(sys.stdin.read().encode())
else: # JSON
try:
sig = JWS.json_loads(sys.stdin.read())
except errors.Error as error:
print error
six.print_(error)
return -1
if args.key is not None:
assert args.kty is not None
key = args.kty.load(args.key.read()).public_key()
args.key.close()
else:
key = None
@ -387,7 +412,7 @@ class CLI(object):
parser_sign = subparsers.add_parser('sign')
parser_sign.set_defaults(func=cls.sign)
parser_sign.add_argument(
'-k', '--key', type=argparse.FileType(), required=True)
'-k', '--key', type=argparse.FileType('rb'), required=True)
parser_sign.add_argument(
'-a', '--alg', type=cls._alg_type, default=jwa.RS256)
parser_sign.add_argument(
@ -396,7 +421,7 @@ class CLI(object):
parser_verify = subparsers.add_parser('verify')
parser_verify.set_defaults(func=cls.verify)
parser_verify.add_argument(
'-k', '--key', type=argparse.FileType(), required=False)
'-k', '--key', type=argparse.FileType('rb'), required=False)
parser_verify.add_argument(
'--kty', type=cls._kty_type, required=False)

View file

@ -1,28 +1,20 @@
"""Tests for acme.jose.jws."""
import base64
import os
import pkg_resources
import unittest
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
import mock
import OpenSSL
from acme.jose import b64
from acme import test_util
from acme.jose import errors
from acme.jose import json_util
from acme.jose import jwa
from acme.jose import jwk
from acme.jose import util
CERT = util.ComparableX509(OpenSSL.crypto.load_certificate(
OpenSSL.crypto.FILETYPE_PEM, pkg_resources.resource_string(
'letsencrypt.tests', 'testdata/cert.pem')))
RSA512_KEY = serialization.load_pem_private_key(
pkg_resources.resource_string(
__name__, os.path.join('testdata', 'rsa512_key.pem')),
password=None, backend=default_backend())
CERT = test_util.load_cert('cert.pem')
KEY = jwk.JWKRSA.load(test_util.load_vector('rsa512_key.pem'))
class MediaTypeTest(unittest.TestCase):
@ -81,7 +73,7 @@ class HeaderTest(unittest.TestCase):
self.assertEqual(jobj, {'x5c': [cert_b64, cert_b64]})
self.assertEqual(header, Header.from_json(jobj))
jobj['x5c'][0] = base64.b64encode(
'xxx' + OpenSSL.crypto.dump_certificate(
b'xxx' + OpenSSL.crypto.dump_certificate(
OpenSSL.crypto.FILETYPE_ASN1, CERT))
self.assertRaises(errors.DeserializationError, Header.from_json, jobj)
@ -98,7 +90,7 @@ class SignatureTest(unittest.TestCase):
from acme.jose.jws import Header
from acme.jose.jws import Signature
self.assertEqual(
Signature(signature='foo', header=Header(alg=jwa.RS256)),
Signature(signature=b'foo', header=Header(alg=jwa.RS256)),
Signature.from_json(
{'signature': 'Zm9v', 'header': {'alg': 'RS256'}}))
@ -112,17 +104,17 @@ class JWSTest(unittest.TestCase):
"""Tests for acme.jose.jws.JWS."""
def setUp(self):
self.privkey = jwk.JWKRSA(key=RSA512_KEY)
self.privkey = KEY
self.pubkey = self.privkey.public_key()
from acme.jose.jws import JWS
self.unprotected = JWS.sign(
payload='foo', key=self.privkey, alg=jwa.RS256)
payload=b'foo', key=self.privkey, alg=jwa.RS256)
self.protected = JWS.sign(
payload='foo', key=self.privkey, alg=jwa.RS256,
payload=b'foo', key=self.privkey, alg=jwa.RS256,
protect=frozenset(['jwk', 'alg']))
self.mixed = JWS.sign(
payload='foo', key=self.privkey, alg=jwa.RS256,
payload=b'foo', key=self.privkey, alg=jwa.RS256,
protect=frozenset(['alg']))
def test_pubkey_jwk(self):
@ -142,8 +134,8 @@ class JWSTest(unittest.TestCase):
def test_compact_lost_unprotected(self):
compact = self.mixed.to_compact()
self.assertEqual(
'eyJhbGciOiAiUlMyNTYifQ.Zm9v.OHdxFVj73l5LpxbFp1AmYX4yJM0Pyb'
'_893n1zQjpim_eLS5J1F61lkvrCrCDErTEJnBGOGesJ72M7b6Ve1cAJA',
b'eyJhbGciOiAiUlMyNTYifQ.Zm9v.OHdxFVj73l5LpxbFp1AmYX4yJM0Pyb'
b'_893n1zQjpim_eLS5J1F61lkvrCrCDErTEJnBGOGesJ72M7b6Ve1cAJA',
compact)
from acme.jose.jws import JWS
@ -155,7 +147,7 @@ class JWSTest(unittest.TestCase):
def test_from_compact_missing_components(self):
from acme.jose.jws import JWS
self.assertRaises(errors.DeserializationError, JWS.from_compact, '.')
self.assertRaises(errors.DeserializationError, JWS.from_compact, b'.')
def test_json_omitempty(self):
protected_jobj = self.protected.to_partial_json(flat=True)
@ -172,10 +164,12 @@ class JWSTest(unittest.TestCase):
def test_json_flat(self):
jobj_to = {
'signature': b64.b64encode(self.mixed.signature.signature),
'payload': b64.b64encode('foo'),
'signature': json_util.encode_b64jose(
self.mixed.signature.signature),
'payload': json_util.encode_b64jose(b'foo'),
'header': self.mixed.signature.header,
'protected': b64.b64encode(self.mixed.signature.protected),
'protected': json_util.encode_b64jose(
self.mixed.signature.protected.encode('utf-8')),
}
jobj_from = jobj_to.copy()
jobj_from['header'] = jobj_from['header'].to_json()
@ -187,7 +181,7 @@ class JWSTest(unittest.TestCase):
def test_json_not_flat(self):
jobj_to = {
'signatures': (self.mixed.signature,),
'payload': b64.b64encode('foo'),
'payload': json_util.encode_b64jose(b'foo'),
}
jobj_from = jobj_to.copy()
jobj_from['signatures'] = [jobj_to['signatures'][0].to_json()]
@ -209,8 +203,7 @@ class JWSTest(unittest.TestCase):
class CLITest(unittest.TestCase):
def setUp(self):
self.key_path = pkg_resources.resource_filename(
__name__, os.path.join('testdata', 'rsa512_key.pem'))
self.key_path = test_util.vector_path('rsa512_key.pem')
def test_unverified(self):
from acme.jose.jws import CLI

View file

@ -3,6 +3,7 @@ import collections
from cryptography.hazmat.primitives.asymmetric import rsa
import OpenSSL
import six
class abstractclassmethod(classmethod):
@ -156,7 +157,8 @@ class ImmutableMap(collections.Mapping, collections.Hashable):
def __repr__(self):
return '{0}({1})'.format(self.__class__.__name__, ', '.join(
'{0}={1!r}'.format(key, value) for key, value in self.iteritems()))
'{0}={1!r}'.format(key, value)
for key, value in six.iteritems(self)))
class frozendict(collections.Mapping, collections.Hashable):
@ -174,7 +176,7 @@ class frozendict(collections.Mapping, collections.Hashable):
# TODO: support generators/iterators
object.__setattr__(self, '_items', items)
object.__setattr__(self, '_keys', tuple(sorted(items.iterkeys())))
object.__setattr__(self, '_keys', tuple(sorted(six.iterkeys(items))))
def __getitem__(self, key):
return self._items[key]
@ -185,8 +187,11 @@ class frozendict(collections.Mapping, collections.Hashable):
def __len__(self):
return len(self._items)
def _sorted_items(self):
return tuple((key, self[key]) for key in self._keys)
def __hash__(self):
return hash(tuple((key, value) for key, value in self.items()))
return hash(self._sorted_items())
def __getattr__(self, name):
try:
@ -198,5 +203,5 @@ class frozendict(collections.Mapping, collections.Hashable):
raise AttributeError("can't set attribute")
def __repr__(self):
return 'frozendict({0})'.format(', '.join(
'{0}={1!r}'.format(key, value) for key, value in self.iteritems()))
return 'frozendict({0})'.format(', '.join('{0}={1!r}'.format(
key, value) for key, value in self._sorted_items()))

View file

@ -1,31 +1,24 @@
"""Tests for acme.jose.util."""
import functools
import os
import pkg_resources
import unittest
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
import OpenSSL
import six
from acme import test_util
class ComparableX509Test(unittest.TestCase):
"""Tests for acme.jose.util.ComparableX509."""
def setUp(self):
from acme.jose.util import ComparableX509
def _load(method, filename): # pylint: disable=missing-docstring
return ComparableX509(method(
OpenSSL.crypto.FILETYPE_PEM, pkg_resources.resource_string(
'letsencrypt.tests', os.path.join('testdata', filename))))
# test_util.load_{csr,cert} return ComparableX509
self.req1 = test_util.load_csr('csr.pem')
self.req2 = test_util.load_csr('csr.pem')
self.req_other = test_util.load_csr('csr-san.pem')
self.req1 = _load(OpenSSL.crypto.load_certificate_request, 'csr.pem')
self.req2 = _load(OpenSSL.crypto.load_certificate_request, 'csr.pem')
self.req_other = _load(OpenSSL.crypto.load_certificate_request, 'csr-san.pem')
self.cert1 = _load(OpenSSL.crypto.load_certificate, 'cert.pem')
self.cert2 = _load(OpenSSL.crypto.load_certificate, 'cert.pem')
self.cert_other = _load(OpenSSL.crypto.load_certificate, 'cert-san.pem')
self.cert1 = test_util.load_cert('cert.pem')
self.cert2 = test_util.load_cert('cert.pem')
self.cert_other = test_util.load_cert('cert-san.pem')
def test_eq(self):
self.assertEqual(self.req1, self.req2)
@ -56,19 +49,10 @@ class ComparableRSAKeyTest(unittest.TestCase):
"""Tests for acme.jose.util.ComparableRSAKey."""
def setUp(self):
from acme.jose.util import ComparableRSAKey
backend = default_backend()
def load_key(): # pylint: disable=missing-docstring
return ComparableRSAKey(serialization.load_pem_private_key(
pkg_resources.resource_string(
__name__, os.path.join('testdata', 'rsa256_key.pem')),
password=None, backend=backend))
self.key = load_key()
self.key_same = load_key()
self.key2 = ComparableRSAKey(serialization.load_pem_private_key(
pkg_resources.resource_string(
__name__, os.path.join('testdata', 'rsa512_key.pem')),
password=None, backend=backend))
# test_utl.load_rsa_private_key return ComparableRSAKey
self.key = test_util.load_rsa_private_key('rsa256_key.pem')
self.key_same = test_util.load_rsa_private_key('rsa256_key.pem')
self.key2 = test_util.load_rsa_private_key('rsa512_key.pem')
def test_getattr_proxy(self):
self.assertEqual(256, self.key.key_size)
@ -186,13 +170,13 @@ class frozendictTest(unittest.TestCase): # pylint: disable=invalid-name
def test_init_other_raises_type_error(self):
from acme.jose.util import frozendict
# specifically fail for generators...
self.assertRaises(TypeError, frozendict, {'a': 'b'}.iteritems())
self.assertRaises(TypeError, frozendict, six.iteritems({'a': 'b'}))
def test_len(self):
self.assertEqual(2, len(self.fdict))
def test_hash(self):
self.assertEqual(1278944519403861804, hash(self.fdict))
self.assertTrue(isinstance(hash(self.fdict), int))
def test_getattr_proxy(self):
self.assertEqual(1, self.fdict.x)

View file

@ -1,5 +1,4 @@
"""ACME JOSE JWS."""
from acme import errors
from acme import jose
@ -9,29 +8,15 @@ class Header(jose.Header):
.. todo:: Implement ``acmePath``.
"""
nonce = jose.Field('nonce', omitempty=True)
@classmethod
def validate_nonce(cls, nonce):
"""Validate nonce.
:returns: ``None`` if ``nonce`` is valid, decoding errors otherwise.
"""
try:
jose.b64decode(nonce)
except (ValueError, TypeError) as error:
return error
else:
return None
nonce = jose.Field('nonce', omitempty=True, encoder=jose.encode_b64jose)
@nonce.decoder
def nonce(value): # pylint: disable=missing-docstring,no-self-argument
error = Header.validate_nonce(value)
if error is not None:
try:
return jose.decode_b64jose(value)
except jose.DeserializationError as error:
# TODO: custom error
raise errors.Error("Invalid nonce: {0}".format(error))
return value
raise jose.DeserializationError("Invalid nonce: {0}".format(error))
class Signature(jose.Signature):

52
acme/acme/jws_test.py Normal file
View file

@ -0,0 +1,52 @@
"""Tests for acme.jws."""
import unittest
from acme import jose
from acme import test_util
KEY = jose.JWKRSA.load(test_util.load_vector('rsa512_key.pem'))
class HeaderTest(unittest.TestCase):
"""Tests for acme.jws.Header."""
good_nonce = jose.encode_b64jose(b'foo')
wrong_nonce = u'F'
# Following just makes sure wrong_nonce is wrong
try:
jose.b64decode(wrong_nonce)
except (ValueError, TypeError):
assert True
else:
assert False # pragma: no cover
def test_nonce_decoder(self):
from acme.jws import Header
nonce_field = Header._fields['nonce']
self.assertRaises(
jose.DeserializationError, nonce_field.decode, self.wrong_nonce)
self.assertEqual(b'foo', nonce_field.decode(self.good_nonce))
class JWSTest(unittest.TestCase):
"""Tests for acme.jws.JWS."""
def setUp(self):
self.privkey = KEY
self.pubkey = self.privkey.public_key()
self.nonce = jose.b64encode(b'Nonce')
def test_it(self):
from acme.jws import JWS
jws = JWS.sign(payload=b'foo', key=self.privkey,
alg=jose.RS256, nonce=self.nonce)
self.assertEqual(jws.signature.combined.nonce, self.nonce)
# TODO: check that nonce is in protected header
self.assertEqual(jws, JWS.from_json(jws.to_json()))
if __name__ == '__main__':
unittest.main() # pragma: no cover

View file

@ -1,5 +1,7 @@
"""ACME protocol messages."""
import urlparse
import collections
from six.moves.urllib import parse as urllib_parse # pylint: disable=import-error
from acme import challenges
from acme import fields
@ -12,6 +14,10 @@ class Error(jose.JSONObjectWithFields, Exception):
https://tools.ietf.org/html/draft-ietf-appsawg-http-problem-00
:ivar unicode typ:
:ivar unicode title:
:ivar unicode detail:
"""
ERROR_TYPE_NAMESPACE = 'urn:acme:error:'
ERROR_TYPE_DESCRIPTIONS = {
@ -49,7 +55,11 @@ class Error(jose.JSONObjectWithFields, Exception):
@property
def description(self):
"""Hardcoded error description based on its type."""
"""Hardcoded error description based on its type.
:rtype: unicode
"""
return self.ERROR_TYPE_DESCRIPTIONS[self.typ]
def __str__(self):
@ -59,7 +69,7 @@ class Error(jose.JSONObjectWithFields, Exception):
return str(self.detail)
class _Constant(jose.JSONDeSerializable):
class _Constant(jose.JSONDeSerializable, collections.Hashable):
"""ACME constant."""
__slots__ = ('name',)
POSSIBLE_NAMES = NotImplemented
@ -84,6 +94,9 @@ class _Constant(jose.JSONDeSerializable):
def __eq__(self, other):
return isinstance(other, type(self)) and other.name == self.name
def __hash__(self):
return hash((self.__class__, self.name))
def __ne__(self, other):
return not self == other
@ -108,7 +121,8 @@ IDENTIFIER_FQDN = IdentifierType('dns') # IdentifierDNS in Boulder
class Identifier(jose.JSONObjectWithFields):
"""ACME identifier.
:ivar acme.messages.IdentifierType typ:
:ivar IdentifierType typ:
:ivar unicode value:
"""
typ = jose.Field('type', decoder=IdentifierType.from_json)
@ -127,7 +141,7 @@ class Resource(jose.JSONObjectWithFields):
class ResourceWithURI(Resource):
"""ACME Resource with URI.
:ivar str uri: Location of the resource.
:ivar unicode uri: Location of the resource.
"""
uri = jose.Field('uri') # no ChallengeResource.uri
@ -141,7 +155,10 @@ class Registration(interfaces.ClientRequestableResource, ResourceBody):
"""Registration Resource Body.
:ivar acme.jose.jwk.JWK key: Public key.
:ivar tuple contact: Contact information following ACME spec
:ivar tuple contact: Contact information following ACME spec,
`tuple` of `unicode`.
:ivar unicode recovery_token:
:ivar unicode agreement:
"""
resource_type = 'new-reg'
@ -188,8 +205,8 @@ class RegistrationResource(interfaces.ClientRequestableResource,
"""Registration Resource.
:ivar acme.messages.Registration body:
:ivar str new_authzr_uri: URI found in the 'next' ``Link`` header
:ivar str terms_of_service: URL for the CA TOS.
:ivar unicode new_authzr_uri: URI found in the 'next' ``Link`` header
:ivar unicode terms_of_service: URL for the CA TOS.
"""
resource_type = 'reg'
@ -212,6 +229,7 @@ class ChallengeBody(ResourceBody):
call ``challb.x`` to get ``challb.chall.x`` contents.
:ivar acme.messages.Status status:
:ivar datetime.datetime validated:
:ivar Error error:
"""
__slots__ = ('chall',)
@ -241,7 +259,7 @@ class ChallengeResource(Resource):
"""Challenge Resource.
:ivar acme.messages.ChallengeBody body:
:ivar str authzr_uri: URI found in the 'up' ``Link`` header.
:ivar unicode authzr_uri: URI found in the 'up' ``Link`` header.
"""
body = jose.Field('body', decoder=ChallengeBody.from_json)
@ -261,8 +279,6 @@ class Authorization(interfaces.ClientRequestableResource, ResourceBody):
:ivar list challenges: `list` of `.ChallengeBody`
:ivar tuple combinations: Challenge combinations (`tuple` of `tuple`
of `int`, as opposed to `list` of `list` from the spec).
:ivar acme.jose.jwk.JWK key: Public key.
:ivar tuple contact:
:ivar acme.messages.Status status:
:ivar datetime.datetime expires:
@ -294,7 +310,7 @@ class AuthorizationResource(ResourceWithURI):
"""Authorization Resource.
:ivar acme.messages.Authorization body:
:ivar str new_cert_uri: URI found in the 'next' ``Link`` header
:ivar unicode new_cert_uri: URI found in the 'next' ``Link`` header
"""
body = jose.Field('body', decoder=Authorization.from_json)
@ -321,7 +337,7 @@ class CertificateResource(interfaces.ClientRequestableResource,
:ivar acme.jose.util.ComparableX509 body:
`OpenSSL.crypto.X509` wrapped in `.ComparableX509`
:ivar str cert_chain_uri: URI found in the 'up' ``Link`` header
:ivar unicode cert_chain_uri: URI found in the 'up' ``Link`` header
:ivar tuple authzrs: `tuple` of `AuthorizationResource`.
"""
@ -353,4 +369,4 @@ class Revocation(interfaces.ClientRequestableResource,
:param str base: New Registration Resource or server (root) URL.
"""
return urlparse.urljoin(base, cls.PATH)
return urllib_parse.urljoin(base, cls.PATH)

View file

@ -1,30 +1,16 @@
"""Tests for acme.messages."""
import os
import pkg_resources
import unittest
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
import mock
import OpenSSL
from acme import challenges
from acme import jose
from acme import test_util
CERT = jose.ComparableX509(OpenSSL.crypto.load_certificate(
OpenSSL.crypto.FILETYPE_ASN1, pkg_resources.resource_string(
'acme.jose', os.path.join('testdata', 'cert.der'))))
CSR = jose.ComparableX509(OpenSSL.crypto.load_certificate_request(
OpenSSL.crypto.FILETYPE_ASN1, pkg_resources.resource_string(
'acme.jose', os.path.join('testdata', 'csr.der'))))
KEY = serialization.load_pem_private_key(
pkg_resources.resource_string(
'acme.jose', os.path.join('testdata', 'rsa512_key.pem')),
password=None, backend=default_backend())
CERT = jose.ComparableX509(OpenSSL.crypto.load_certificate(
OpenSSL.crypto.FILETYPE_ASN1, pkg_resources.resource_string(
'acme.jose', os.path.join('testdata', 'cert.der'))))
CERT = test_util.load_cert('cert.der')
CSR = test_util.load_csr('csr.der')
KEY = test_util.load_rsa_private_key('rsa512_key.pem')
class ErrorTest(unittest.TestCase):

View file

@ -12,22 +12,20 @@ logger = logging.getLogger(__name__)
class Signature(jose.JSONObjectWithFields):
"""ACME signature.
:ivar str alg: Signature algorithm.
:ivar str sig: Signature.
:ivar str nonce: Nonce.
:ivar jwk: JWK.
:type jwk: :class:`JWK`
:ivar .JWASignature alg: Signature algorithm.
:ivar bytes sig: Signature.
:ivar bytes nonce: Nonce.
:ivar .JWK jwk: JWK.
"""
NONCE_SIZE = 16
"""Minimum size of nonce in bytes."""
alg = jose.Field('alg', decoder=jose.JWASignature.from_json)
sig = jose.Field('sig', encoder=jose.b64encode,
sig = jose.Field('sig', encoder=jose.encode_b64jose,
decoder=jose.decode_b64jose)
nonce = jose.Field(
'nonce', encoder=jose.b64encode, decoder=functools.partial(
'nonce', encoder=jose.encode_b64jose, decoder=functools.partial(
jose.decode_b64jose, size=NONCE_SIZE, minimum=True))
jwk = jose.Field('jwk', decoder=jose.JWK.from_json)
@ -35,27 +33,26 @@ class Signature(jose.JSONObjectWithFields):
def from_msg(cls, msg, key, nonce=None, nonce_size=None, alg=jose.RS256):
"""Create signature with nonce prepended to the message.
.. todo:: Protect against crypto unicode errors... is this sufficient?
Do I need to escape?
:param str msg: Message to be signed.
:param bytes msg: Message to be signed.
:param key: Key used for signing.
:type key: `cryptography.hazmat.primitives.assymetric.rsa.RSAPrivateKey`
(optionally wrapped in `.ComparableRSAKey`).
:param str nonce: Nonce to be used. If None, nonce of
:param bytes nonce: Nonce to be used. If None, nonce of
``nonce_size`` will be randomly generated.
:param int nonce_size: Size of the automatically generated nonce.
Defaults to :const:`NONCE_SIZE`.
:param .JWASignature alg:
"""
nonce_size = cls.NONCE_SIZE if nonce_size is None else nonce_size
nonce = os.urandom(nonce_size) if nonce is None else nonce
msg_with_nonce = nonce + msg
sig = alg.sign(key, nonce + msg)
logger.debug('%s signed as %s', msg_with_nonce, sig)
logger.debug('%r signed as %r', msg_with_nonce, sig)
return cls(alg=alg, sig=sig, nonce=nonce,
jwk=alg.kty(key=key.public_key()))
@ -63,7 +60,7 @@ class Signature(jose.JSONObjectWithFields):
def verify(self, msg):
"""Verify the signature.
:param str msg: Message that was used in signing.
:param bytes msg: Message that was used in signing.
"""
# self.alg is not Field, but JWA | pylint: disable=no-member

View file

@ -1,18 +1,11 @@
"""Tests for acme.sig."""
import os
import pkg_resources
import unittest
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from acme import jose
from acme import test_util
KEY = serialization.load_pem_private_key(
pkg_resources.resource_string(
'acme.jose', os.path.join('testdata', 'rsa512_key.pem')),
password=None, backend=default_backend())
KEY = test_util.load_rsa_private_key('rsa512_key.pem')
class SignatureTest(unittest.TestCase):
@ -20,12 +13,12 @@ class SignatureTest(unittest.TestCase):
"""Tests for acme.sig.Signature."""
def setUp(self):
self.msg = 'message'
self.sig = ('IC\xd8*\xe7\x14\x9e\x19S\xb7\xcf\xec3\x12\xe2\x8a\x03'
'\x98u\xff\xf0\x94\xe2\xd7<\x8f\xa8\xed\xa4KN\xc3\xaa'
'\xb9X\xc3w\xaa\xc0_\xd0\x05$y>l#\x10<\x96\xd2\xcdr\xa3'
'\x1b\xa1\xf5!f\xef\xc64\xb6\x13')
self.nonce = '\xec\xd6\xf2oYH\xeb\x13\xd5#q\xe0\xdd\xa2\x92\xa9'
self.msg = b'message'
self.sig = (b'IC\xd8*\xe7\x14\x9e\x19S\xb7\xcf\xec3\x12\xe2\x8a\x03'
b'\x98u\xff\xf0\x94\xe2\xd7<\x8f\xa8\xed\xa4KN\xc3\xaa'
b'\xb9X\xc3w\xaa\xc0_\xd0\x05$y>l#\x10<\x96\xd2\xcdr\xa3'
b'\x1b\xa1\xf5!f\xef\xc64\xb6\x13')
self.nonce = b'\xec\xd6\xf2oYH\xeb\x13\xd5#q\xe0\xdd\xa2\x92\xa9'
self.alg = jose.RS256
self.jwk = jose.JWKRSA(key=KEY.public_key())
@ -61,7 +54,7 @@ class SignatureTest(unittest.TestCase):
self.assertTrue(self.signature.verify(self.msg))
def test_verify_bad_fails(self):
self.assertFalse(self.signature.verify(self.msg + 'x'))
self.assertFalse(self.signature.verify(self.msg + b'x'))
@classmethod
def _from_msg(cls, *args, **kwargs):

57
acme/acme/test_util.py Normal file
View file

@ -0,0 +1,57 @@
# Symlinked in letsencrypt/tests/test_util.py, casues duplicate-code
# warning that cannot be disabled locally.
"""Test utilities.
.. warning:: This module is not part of the public API.
"""
import os
import pkg_resources
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
import OpenSSL
from acme import jose
def vector_path(*names):
"""Path to a test vector."""
return pkg_resources.resource_filename(
__name__, os.path.join('testdata', *names))
def load_vector(*names):
"""Load contents of a test vector."""
# luckily, resource_string opens file in binary mode
return pkg_resources.resource_string(
__name__, os.path.join('testdata', *names))
def _guess_loader(filename, loader_pem, loader_der):
_, ext = os.path.splitext(filename)
if ext.lower() == '.pem':
return loader_pem
elif ext.lower() == '.der':
return loader_der
else: # pragma: no cover
raise ValueError("Loader could not be recognized based on extension")
def load_cert(*names):
"""Load certificate."""
loader = _guess_loader(
names[-1], OpenSSL.crypto.FILETYPE_PEM, OpenSSL.crypto.FILETYPE_ASN1)
return jose.ComparableX509(OpenSSL.crypto.load_certificate(
loader, load_vector(*names)))
def load_csr(*names):
"""Load certificate request."""
loader = _guess_loader(
names[-1], OpenSSL.crypto.FILETYPE_PEM, OpenSSL.crypto.FILETYPE_ASN1)
return jose.ComparableX509(OpenSSL.crypto.load_certificate_request(
loader, load_vector(*names)))
def load_rsa_private_key(*names):
"""Load RSA private key."""
loader = _guess_loader(names[-1], serialization.load_pem_private_key,
serialization.load_der_private_key)
return jose.ComparableRSAKey(loader(
load_vector(*names), password=None, backend=default_backend()))

View file

@ -1,3 +1,7 @@
In order for acme.test_util._guess_loader to work properly, make sure
to use appropriate extension for vector filenames: .pem for PEM and
.der for DER.
The following command has been used to generate test keys:
for x in 256 512 1024; do openssl genrsa -out rsa${k}_key.pem $k; done

14
acme/acme/testdata/cert-san.pem vendored Normal file
View file

@ -0,0 +1,14 @@
-----BEGIN CERTIFICATE-----
MIICFjCCAcCgAwIBAgICBTkwDQYJKoZIhvcNAQELBQAwdzELMAkGA1UEBhMCVVMx
ETAPBgNVBAgMCE1pY2hpZ2FuMRIwEAYDVQQHDAlBbm4gQXJib3IxKzApBgNVBAoM
IlVuaXZlcnNpdHkgb2YgTWljaGlnYW4gYW5kIHRoZSBFRkYxFDASBgNVBAMMC2V4
YW1wbGUuY29tMB4XDTE0MTIxMTIyMzQ0NVoXDTE0MTIxODIyMzQ0NVowdzELMAkG
A1UEBhMCVVMxETAPBgNVBAgMCE1pY2hpZ2FuMRIwEAYDVQQHDAlBbm4gQXJib3Ix
KzApBgNVBAoMIlVuaXZlcnNpdHkgb2YgTWljaGlnYW4gYW5kIHRoZSBFRkYxFDAS
BgNVBAMMC2V4YW1wbGUuY29tMFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBAKx1c7RR
7R/drnBSQ/zfx1vQLHUbFLh1AQQQ5R8DZUXd36efNK79vukFhN9HFoHZiUvOjm0c
+pVE6K+EdE/twuUCAwEAAaM2MDQwCQYDVR0TBAIwADAnBgNVHREEIDAeggtleGFt
cGxlLmNvbYIPd3d3LmV4YW1wbGUuY29tMA0GCSqGSIb3DQEBCwUAA0EASuvNKFTF
nTJsvnSXn52f4BMZJJ2id/kW7+r+FJRm+L20gKQ1aqq8d3e/lzRUrv5SMf1TAOe7
RDjyGMKy5ZgM2w==
-----END CERTIFICATE-----

13
acme/acme/testdata/cert.pem vendored Normal file
View file

@ -0,0 +1,13 @@
-----BEGIN CERTIFICATE-----
MIIB3jCCAYigAwIBAgICBTkwDQYJKoZIhvcNAQELBQAwdzELMAkGA1UEBhMCVVMx
ETAPBgNVBAgMCE1pY2hpZ2FuMRIwEAYDVQQHDAlBbm4gQXJib3IxKzApBgNVBAoM
IlVuaXZlcnNpdHkgb2YgTWljaGlnYW4gYW5kIHRoZSBFRkYxFDASBgNVBAMMC2V4
YW1wbGUuY29tMB4XDTE0MTIxMTIyMzQ0NVoXDTE0MTIxODIyMzQ0NVowdzELMAkG
A1UEBhMCVVMxETAPBgNVBAgMCE1pY2hpZ2FuMRIwEAYDVQQHDAlBbm4gQXJib3Ix
KzApBgNVBAoMIlVuaXZlcnNpdHkgb2YgTWljaGlnYW4gYW5kIHRoZSBFRkYxFDAS
BgNVBAMMC2V4YW1wbGUuY29tMFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBAKx1c7RR
7R/drnBSQ/zfx1vQLHUbFLh1AQQQ5R8DZUXd36efNK79vukFhN9HFoHZiUvOjm0c
+pVE6K+EdE/twuUCAwEAATANBgkqhkiG9w0BAQsFAANBAC24z0IdwIVKSlntksll
vr6zJepBH5fMndfk3XJp10jT6VE+14KNtjh02a56GoraAvJAT5/H67E8GvJ/ocNn
B/o=
-----END CERTIFICATE-----

10
acme/acme/testdata/csr-san.pem vendored Normal file
View file

@ -0,0 +1,10 @@
-----BEGIN CERTIFICATE REQUEST-----
MIIBbjCCARgCAQAweTELMAkGA1UEBhMCVVMxETAPBgNVBAgMCE1pY2hpZ2FuMRIw
EAYDVQQHDAlBbm4gQXJib3IxDDAKBgNVBAoMA0VGRjEfMB0GA1UECwwWVW5pdmVy
c2l0eSBvZiBNaWNoaWdhbjEUMBIGA1UEAwwLZXhhbXBsZS5jb20wXDANBgkqhkiG
9w0BAQEFAANLADBIAkEArHVztFHtH92ucFJD/N/HW9AsdRsUuHUBBBDlHwNlRd3f
p580rv2+6QWE30cWgdmJS86ObRz6lUTor4R0T+3C5QIDAQABoDowOAYJKoZIhvcN
AQkOMSswKTAnBgNVHREEIDAeggtleGFtcGxlLmNvbYIPd3d3LmV4YW1wbGUuY29t
MA0GCSqGSIb3DQEBCwUAA0EAZGBM8J1rRs7onFgtc76mOeoT1c3v0ZsEmxQfb2Wy
tmReY6X1N4cs38D9VSow+VMRu2LWkKvzS7RUFSaTaeQz1A==
-----END CERTIFICATE REQUEST-----

10
acme/acme/testdata/csr.pem vendored Normal file
View file

@ -0,0 +1,10 @@
-----BEGIN CERTIFICATE REQUEST-----
MIIBXTCCAQcCAQAweTELMAkGA1UEBhMCVVMxETAPBgNVBAgMCE1pY2hpZ2FuMRIw
EAYDVQQHDAlBbm4gQXJib3IxDDAKBgNVBAoMA0VGRjEfMB0GA1UECwwWVW5pdmVy
c2l0eSBvZiBNaWNoaWdhbjEUMBIGA1UEAwwLZXhhbXBsZS5jb20wXDANBgkqhkiG
9w0BAQEFAANLADBIAkEArHVztFHtH92ucFJD/N/HW9AsdRsUuHUBBBDlHwNlRd3f
p580rv2+6QWE30cWgdmJS86ObRz6lUTor4R0T+3C5QIDAQABoCkwJwYJKoZIhvcN
AQkOMRowGDAWBgNVHREEDzANggtleGFtcGxlLmNvbTANBgkqhkiG9w0BAQsFAANB
AHJH/O6BtC9aGzEVCMGOZ7z9iIRHWSzr9x/bOzn7hLwsbXPAgO1QxEwL+X+4g20G
n9XBE1N9W6HCIEut2d8wACg=
-----END CERTIFICATE REQUEST-----

14
acme/acme/testdata/dsa512_key.pem vendored Normal file
View file

@ -0,0 +1,14 @@
-----BEGIN DSA PARAMETERS-----
MIGdAkEAwebEoGBfokKQeALHHnAZMQwYU35ILEBdV8oUmzv7qpSVUoHihyqfn6GC
OixAKSP8EJYcTilIqPbFbfFyOPlbLwIVANoFHEDiQgknAvKrG78pHzAJdQSPAkEA
qfka5Bnl+CeEMpzVZGrOVqZE/LFdZK9eT6YtWjzqtIkf3hwXUVxJsTnBG4xmrfvl
41pgNJpgu99YOYqPpS0g7A==
-----END DSA PARAMETERS-----
-----BEGIN DSA PRIVATE KEY-----
MIH5AgEAAkEAwebEoGBfokKQeALHHnAZMQwYU35ILEBdV8oUmzv7qpSVUoHihyqf
n6GCOixAKSP8EJYcTilIqPbFbfFyOPlbLwIVANoFHEDiQgknAvKrG78pHzAJdQSP
AkEAqfka5Bnl+CeEMpzVZGrOVqZE/LFdZK9eT6YtWjzqtIkf3hwXUVxJsTnBG4xm
rfvl41pgNJpgu99YOYqPpS0g7AJATQ2LUzjGQSM6UljcPY5I2OD9THkUR9kH2tth
zZd70UoI9btrVaTizgqYShuok94glSQNK0H92JgUk3scJPaAkAIVAMDn61h6vrCE
mNv063So6E+eYaIN
-----END DSA PRIVATE KEY-----

View file

@ -1,61 +0,0 @@
"""Tests for acme.jws."""
import os
import pkg_resources
import unittest
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from acme import errors
from acme import jose
RSA512_KEY = serialization.load_pem_private_key(
pkg_resources.resource_string(
'acme.jose', os.path.join('testdata', 'rsa512_key.pem')),
password=None, backend=default_backend())
class HeaderTest(unittest.TestCase):
"""Tests for acme.jws.Header."""
good_nonce = jose.b64encode('foo')
wrong_nonce = 'F'
# Following just makes sure wrong_nonce is wrong
try:
jose.b64decode(wrong_nonce)
except (ValueError, TypeError):
assert True
else:
assert False # pragma: no cover
def test_validate_nonce(self):
from acme.jws import Header
self.assertTrue(Header.validate_nonce(self.good_nonce) is None)
self.assertFalse(Header.validate_nonce(self.wrong_nonce) is None)
def test_nonce_decoder(self):
from acme.jws import Header
nonce_field = Header._fields['nonce']
self.assertRaises(errors.Error, nonce_field.decode, self.wrong_nonce)
self.assertEqual(self.good_nonce, nonce_field.decode(self.good_nonce))
class JWSTest(unittest.TestCase):
"""Tests for acme.jws.JWS."""
def setUp(self):
self.privkey = jose.JWKRSA(key=RSA512_KEY)
self.pubkey = self.privkey.public_key()
self.nonce = jose.b64encode('Nonce')
def test_it(self):
from acme.jws import JWS
jws = JWS.sign(payload='foo', key=self.privkey,
alg=jose.RS256, nonce=self.nonce)
JWS.from_json(jws.to_json())
if __name__ == '__main__':
unittest.main() # pragma: no cover

40
acme/setup.py Normal file
View file

@ -0,0 +1,40 @@
from setuptools import setup
from setuptools import find_packages
install_requires = [
'argparse',
# load_pem_private/public_key (>=0.6)
# rsa_recover_prime_factors (>=0.8)
'cryptography>=0.8',
'mock<1.1.0', # py26
'pyrfc3339',
'ndg-httpsclient', # urllib3 InsecurePlatformWarning (#304)
'pyasn1', # urllib3 InsecurePlatformWarning (#304)
'PyOpenSSL',
'pytz',
'requests',
'six',
'werkzeug',
]
testing_extras = [
'nose',
'tox',
]
setup(
name='acme',
packages=find_packages(),
install_requires=install_requires,
extras_require={
'testing': testing_extras,
},
entry_points={
'console_scripts': [
'jws = acme.jose.jws:CLI.run',
],
},
test_suite='acme',
)

View file

@ -36,6 +36,8 @@ with codecs.open(init_fn, encoding='utf8') as fd:
# add these directories to sys.path here. If the directory is relative to the
# documentation root, use os.path.abspath to make it absolute, like shown here.
sys.path.insert(0, os.path.abspath(os.path.join(here, '..')))
for pkg in 'acme', 'letsencrypt-apache', 'letsencrypt-nginx':
sys.path.insert(0, os.path.abspath(os.path.join(here, '..', pkg)))
# -- General configuration ------------------------------------------------

View file

@ -26,7 +26,7 @@ Install the development packages:
.. code-block:: shell
pip install -r requirements.txt -e .[dev,docs,testing]
pip install -r requirements.txt -e acme -e .[dev,docs,testing] -e letsencrypt-apache -e letsencrypt-nginx
.. note:: `-e` (short for `--editable`) turns on *editable mode* in
which any source code changes in the current working

View file

@ -105,10 +105,13 @@ Centos 7
Installation
============
.. "pip install acme" doesn't search for "acme" in cwd, just like "pip
install -e acme" does
.. code-block:: shell
virtualenv --no-site-packages -p python2 venv
./venv/bin/pip install -r requirements.txt .
./venv/bin/pip install -r requirements.txt acme/ . letsencrypt-apache/ letsencrypt-nginx/
.. warning:: Please do **not** use ``python setup.py install``. Please
do **not** attempt the installation commands as

View file

@ -41,7 +41,7 @@ authzr, authzr_response = acme.poll(authzr)
csr = OpenSSL.crypto.load_certificate_request(
OpenSSL.crypto.FILETYPE_ASN1, pkg_resources.resource_string(
'acme.jose', os.path.join('testdata', 'csr.der')))
'acme', os.path.join('testdata', 'csr.der')))
try:
acme.request_issuance(csr, (authzr,))
except messages.Error as error:

View file

@ -0,0 +1,2 @@
recursive-include letsencrypt_apache/tests/testdata *
include letsencrypt_apache/options-ssl-apache.conf

Some files were not shown because too many files have changed in this diff Show more