Merge pull request #527 from kuba/acme-client

Remove ACME v00 code, move networking to acme.client
This commit is contained in:
James Kasten 2015-06-22 16:24:06 -04:00
commit ac8d9e4ded
47 changed files with 793 additions and 2285 deletions

View file

@ -18,6 +18,13 @@ KEY = jose.HashableRSAKey(Crypto.PublicKey.RSA.importKey(
'acme.jose', os.path.join('testdata', 'rsa512_key.pem'))))
class ChallengeResponseTest(unittest.TestCase):
def test_from_json_none(self):
from acme.challenges import ChallengeResponse
self.assertTrue(ChallengeResponse.from_json(None) is None)
class SimpleHTTPTest(unittest.TestCase):
def setUp(self):

View file

@ -1,4 +1,4 @@
"""Networking for ACME protocol v02."""
"""ACME client API."""
import datetime
import heapq
import httplib
@ -9,19 +9,18 @@ import M2Crypto
import requests
import werkzeug
from acme import errors
from acme import jose
from acme import jws as acme_jws
from acme import messages2
from letsencrypt import errors
from acme import jws
from acme import messages
# https://urllib3.readthedocs.org/en/latest/security.html#insecureplatformwarning
requests.packages.urllib3.contrib.pyopenssl.inject_into_urllib3()
class Network(object):
"""ACME networking.
class Client(object): # pylint: disable=too-many-instance-attributes
"""ACME client.
.. todo::
Clean up raised error types hierarchy, document, and handle (wrap)
@ -33,8 +32,6 @@ class Network(object):
:ivar bool verify_ssl: Verify SSL certificates?
"""
# TODO: Move below to acme module?
DER_CONTENT_TYPE = 'application/pkix-cert'
JSON_CONTENT_TYPE = 'application/json'
JSON_ERROR_CONTENT_TYPE = 'application/problem+json'
@ -58,7 +55,7 @@ class Network(object):
"""
dumps = obj.json_dumps()
logging.debug('Serialized JSON: %s', dumps)
return acme_jws.JWS.sign(
return jws.JWS.sign(
payload=dumps, key=self.key, alg=self.alg, nonce=nonce).json_dumps()
@classmethod
@ -75,10 +72,9 @@ class Network(object):
function will raise an error. Otherwise, wrong Content-Type
is ignored, but logged.
:raises letsencrypt.messages2.Error: If server response body
:raises .messages.Error: If server response body
carries HTTP Problem (draft-ietf-appsawg-http-problem-00).
:raises letsencrypt.errors.NetworkError: In case of other
networking errors.
:raises .ClientError: In case of other networking errors.
"""
response_ct = response.headers.get('Content-Type')
@ -98,13 +94,13 @@ class Network(object):
try:
logging.error("Error: %s", jobj)
logging.error("Response from server: %s", response.content)
raise messages2.Error.from_json(jobj)
raise messages.Error.from_json(jobj)
except jose.DeserializationError as error:
# Couldn't deserialize JSON object
raise errors.NetworkError((response, error))
raise errors.ClientError((response, error))
else:
# response is not JSON object
raise errors.NetworkError(response)
raise errors.ClientError(response)
else:
if jobj is not None and response_ct != cls.JSON_CONTENT_TYPE:
logging.debug(
@ -112,13 +108,13 @@ class Network(object):
'response', response_ct)
if content_type == cls.JSON_CONTENT_TYPE and jobj is None:
raise errors.NetworkError(
raise errors.ClientError(
'Unexpected response Content-Type: {0}'.format(response_ct))
def _get(self, uri, content_type=JSON_CONTENT_TYPE, **kwargs):
"""Send GET request.
:raises letsencrypt.errors.NetworkError:
:raises .ClientError:
:returns: HTTP Response
:rtype: `requests.Response`
@ -129,22 +125,22 @@ class Network(object):
try:
response = requests.get(uri, **kwargs)
except requests.exceptions.RequestException as error:
raise errors.NetworkError(error)
raise errors.ClientError(error)
self._check_response(response, content_type=content_type)
return response
def _add_nonce(self, response):
if self.REPLAY_NONCE_HEADER in response.headers:
nonce = response.headers[self.REPLAY_NONCE_HEADER]
error = acme_jws.Header.validate_nonce(nonce)
error = jws.Header.validate_nonce(nonce)
if error is None:
logging.debug('Storing nonce: %r', nonce)
self._nonces.add(nonce)
else:
raise errors.NetworkError('Invalid nonce ({0}): {1}'.format(
raise errors.ClientError('Invalid nonce ({0}): {1}'.format(
nonce, error))
else:
raise errors.NetworkError(
raise errors.ClientError(
'Server {0} response did not include a replay nonce'.format(
response.request.method))
@ -160,7 +156,7 @@ class Network(object):
:param JSONDeSerializable obj: Will be wrapped in JWS.
:param str content_type: Expected ``Content-Type``, fails if not set.
:raises acme.messages2.NetworkError:
:raises acme.messages.ClientError:
:returns: HTTP Response
:rtype: `requests.Response`
@ -172,7 +168,7 @@ class Network(object):
try:
response = requests.post(uri, data=data, **kwargs)
except requests.exceptions.RequestException as error:
raise errors.NetworkError(error)
raise errors.ClientError(error)
logging.debug('Received response %s: %r', response, response.text)
self._add_nonce(response)
@ -190,15 +186,15 @@ class Network(object):
try:
new_authzr_uri = response.links['next']['url']
except KeyError:
raise errors.NetworkError('"next" link missing')
raise errors.ClientError('"next" link missing')
return messages2.RegistrationResource(
body=messages2.Registration.from_json(response.json()),
return messages.RegistrationResource(
body=messages.Registration.from_json(response.json()),
uri=response.headers.get('Location', uri),
new_authzr_uri=new_authzr_uri,
terms_of_service=terms_of_service)
def register(self, contact=messages2.Registration._fields[
def register(self, contact=messages.Registration._fields[
'contact'].default):
"""Register.
@ -208,10 +204,10 @@ class Network(object):
:returns: Registration Resource.
:rtype: `.RegistrationResource`
:raises letsencrypt.errors.UnexpectedUpdate:
:raises .UnexpectedUpdate:
"""
new_reg = messages2.Registration(contact=contact)
new_reg = messages.Registration(contact=contact)
response = self._post(self.new_reg_uri, new_reg)
assert response.status_code == httplib.CREATED # TODO: handle errors
@ -222,24 +218,6 @@ class Network(object):
return regr
def register_from_account(self, account):
"""Register with server.
:param account: Account
:type account: :class:`letsencrypt.account.Account`
:returns: Updated account
:rtype: :class:`letsencrypt.account.Account`
"""
details = (
"mailto:" + account.email if account.email is not None else None,
"tel:" + account.phone if account.phone is not None else None,
)
account.regr = self.register(contact=tuple(
det for det in details if det is not None))
return account
def update_registration(self, regr):
"""Update registration.
@ -257,12 +235,12 @@ class Network(object):
# TODO: Boulder does not set Location or Link on update
# (c.f. acme-spec #94)
updated_regr = self._regr_from_response(
response, uri=regr.uri, new_authzr_uri=regr.new_authzr_uri,
terms_of_service=regr.terms_of_service)
if updated_regr != regr:
raise errors.UnexpectedUpdate(regr)
return updated_regr
def agree_to_tos(self, regr):
@ -287,10 +265,10 @@ class Network(object):
try:
new_cert_uri = response.links['next']['url']
except KeyError:
raise errors.NetworkError('"next" link missing')
raise errors.ClientError('"next" link missing')
authzr = messages2.AuthorizationResource(
body=messages2.Authorization.from_json(response.json()),
authzr = messages.AuthorizationResource(
body=messages.Authorization.from_json(response.json()),
uri=response.headers.get('Location', uri),
new_cert_uri=new_cert_uri)
if authzr.body.identifier != identifier:
@ -301,7 +279,7 @@ class Network(object):
"""Request challenges.
:param identifier: Identifier to be challenged.
:type identifier: `.messages2.Identifier`
:type identifier: `.messages.Identifier`
:param str new_authzr_uri: new-authorization URI
@ -309,7 +287,7 @@ class Network(object):
:rtype: `.AuthorizationResource`
"""
new_authz = messages2.Authorization(identifier=identifier)
new_authz = messages.Authorization(identifier=identifier)
response = self._post(new_authzr_uri, new_authz)
assert response.status_code == httplib.CREATED # TODO: handle errors
return self._authzr_from_response(response, identifier)
@ -328,8 +306,8 @@ class Network(object):
:rtype: `.AuthorizationResource`
"""
return self.request_challenges(messages2.Identifier(
typ=messages2.IDENTIFIER_FQDN, value=domain), new_authz_uri)
return self.request_challenges(messages.Identifier(
typ=messages.IDENTIFIER_FQDN, value=domain), new_authz_uri)
def answer_challenge(self, challb, response):
"""Answer challenge.
@ -350,10 +328,10 @@ class Network(object):
try:
authzr_uri = response.links['up']['url']
except KeyError:
raise errors.NetworkError('"up" Link header missing')
challr = messages2.ChallengeResource(
raise errors.ClientError('"up" Link header missing')
challr = messages.ChallengeResource(
authzr_uri=authzr_uri,
body=messages2.ChallengeBody.from_json(response.json()))
body=messages.ChallengeBody.from_json(response.json()))
# TODO: check that challr.uri == response.headers['Location']?
if challr.uri != challb.uri:
raise errors.UnexpectedUpdate(challr.uri)
@ -412,14 +390,14 @@ class Network(object):
:param authzrs: `list` of `.AuthorizationResource`
:returns: Issued certificate
:rtype: `.messages2.CertificateResource`
:rtype: `.messages.CertificateResource`
"""
assert authzrs, "Authorizations list is empty"
logging.debug("Requesting issuance...")
# TODO: assert len(authzrs) == number of SANs
req = messages2.CertificateRequest(
req = messages.CertificateRequest(
csr=csr, authorizations=tuple(authzr.uri for authzr in authzrs))
content_type = self.DER_CONTENT_TYPE # TODO: add 'cert_type 'argument
@ -434,9 +412,9 @@ class Network(object):
try:
uri = response.headers['Location']
except KeyError:
raise errors.NetworkError('"Location" Header missing')
raise errors.ClientError('"Location" Header missing')
return messages2.CertificateResource(
return messages.CertificateResource(
uri=uri, authzrs=authzrs, cert_chain_uri=cert_chain_uri,
body=jose.ComparableX509(
M2Crypto.X509.load_cert_der_string(response.content)))
@ -459,7 +437,7 @@ class Network(object):
``Retry-After`` is not present in the response.
:returns: ``(cert, updated_authzrs)`` `tuple` where ``cert`` is
the issued certificate (`.messages2.CertificateResource.),
the issued certificate (`.messages.CertificateResource.),
and ``updated_authzrs`` is a `tuple` consisting of updated
Authorization Resources (`.AuthorizationResource`) as
present in the responses from server, and in the same order
@ -488,7 +466,7 @@ class Network(object):
updated_authzr, response = self.poll(updated[authzr])
updated[authzr] = updated_authzr
if updated_authzr.body.status != messages2.STATUS_VALID:
if updated_authzr.body.status != messages.STATUS_VALID:
# push back to the priority queue, with updated retry_after
heapq.heappush(waiting, (self.retry_after(
response, default=mintime), authzr))
@ -526,7 +504,7 @@ class Network(object):
# "refresh cert", and this method integrated with self.refresh
response, cert = self._get_cert(certr.uri)
if 'Location' not in response.headers:
raise errors.NetworkError('Location header missing')
raise errors.ClientError('Location header missing')
if response.headers['Location'] != certr.uri:
raise errors.UnexpectedUpdate(response.text)
return certr.update(body=cert)
@ -561,22 +539,21 @@ class Network(object):
else:
return None
def revoke(self, certr, when=messages2.Revocation.NOW):
def revoke(self, certr, when=messages.Revocation.NOW):
"""Revoke certificate.
:param certr: Certificate Resource
:type certr: `.CertificateResource`
:param when: When should the revocation take place? Takes
the same values as `.messages2.Revocation.revoke`.
the same values as `.messages.Revocation.revoke`.
:raises letsencrypt.errors.NetworkError: If revocation is
unsuccessful.
:raises .ClientError: If revocation is unsuccessful.
"""
rev = messages2.Revocation(revoke=when, authorizations=tuple(
rev = messages.Revocation(revoke=when, authorizations=tuple(
authzr.uri for authzr in certr.authzrs))
response = self._post(certr.uri, rev)
if response.status_code != httplib.OK:
raise errors.NetworkError(
raise errors.ClientError(
'Successful revocation must return HTTP OK status')

View file

@ -1,10 +1,8 @@
"""Tests for letsencrypt.network2."""
"""Tests for acme.client."""
import datetime
import httplib
import os
import pkg_resources
import shutil
import tempfile
import unittest
import M2Crypto
@ -12,31 +10,28 @@ import mock
import requests
from acme import challenges
from acme import errors
from acme import jose
from acme import jws as acme_jws
from acme import messages2
from letsencrypt import account
from letsencrypt import errors
from acme import messages
CERT = jose.ComparableX509(M2Crypto.X509.load_cert_string(
pkg_resources.resource_string(
__name__, os.path.join('testdata', 'cert.pem'))))
CERT2 = jose.ComparableX509(M2Crypto.X509.load_cert_string(
pkg_resources.resource_string(
__name__, os.path.join('testdata', 'cert-san.pem'))))
'acme.jose', os.path.join('testdata', 'cert.der')),
M2Crypto.X509.FORMAT_DER))
CSR = jose.ComparableX509(M2Crypto.X509.load_request_string(
pkg_resources.resource_string(
__name__, os.path.join('testdata', 'csr.pem'))))
'acme.jose', os.path.join('testdata', 'csr.der')),
M2Crypto.X509.FORMAT_DER))
KEY = jose.JWKRSA.load(pkg_resources.resource_string(
'acme.jose', os.path.join('testdata', 'rsa512_key.pem')))
KEY2 = jose.JWKRSA.load(pkg_resources.resource_string(
'acme.jose', os.path.join('testdata', 'rsa256_key.pem')))
class NetworkTest(unittest.TestCase):
"""Tests for letsencrypt.network2.Network."""
class ClientTest(unittest.TestCase):
"""Tests for acme.client.Client."""
# pylint: disable=too-many-instance-attributes,too-many-public-methods
@ -44,8 +39,8 @@ class NetworkTest(unittest.TestCase):
self.verify_ssl = mock.MagicMock()
self.wrap_in_jws = mock.MagicMock(return_value=mock.sentinel.wrapped)
from letsencrypt.network2 import Network
self.net = Network(
from acme.client import Client
self.net = Client(
new_reg_uri='https://www.letsencrypt-demo.org/acme/new-reg',
key=KEY, alg=jose.RS256, verify_ssl=self.verify_ssl)
self.nonce = jose.b64encode('Nonce')
@ -58,44 +53,39 @@ class NetworkTest(unittest.TestCase):
self.post = mock.MagicMock(return_value=self.response)
self.get = mock.MagicMock(return_value=self.response)
self.identifier = messages2.Identifier(
typ=messages2.IDENTIFIER_FQDN, value='example.com')
self.config = mock.Mock(accounts_dir=tempfile.mkdtemp())
self.identifier = messages.Identifier(
typ=messages.IDENTIFIER_FQDN, value='example.com')
# Registration
self.contact = ('mailto:cert-admin@example.com', 'tel:+12025551212')
reg = messages2.Registration(
reg = messages.Registration(
contact=self.contact, key=KEY.public(), recovery_token='t')
self.regr = messages2.RegistrationResource(
self.regr = messages.RegistrationResource(
body=reg, uri='https://www.letsencrypt-demo.org/acme/reg/1',
new_authzr_uri='https://www.letsencrypt-demo.org/acme/new-reg',
terms_of_service='https://www.letsencrypt-demo.org/tos')
# Authorization
authzr_uri = 'https://www.letsencrypt-demo.org/acme/authz/1'
challb = messages2.ChallengeBody(
uri=(authzr_uri + '/1'), status=messages2.STATUS_VALID,
challb = messages.ChallengeBody(
uri=(authzr_uri + '/1'), status=messages.STATUS_VALID,
chall=challenges.DNS(token='foo'))
self.challr = messages2.ChallengeResource(
self.challr = messages.ChallengeResource(
body=challb, authzr_uri=authzr_uri)
self.authz = messages2.Authorization(
identifier=messages2.Identifier(
typ=messages2.IDENTIFIER_FQDN, value='example.com'),
self.authz = messages.Authorization(
identifier=messages.Identifier(
typ=messages.IDENTIFIER_FQDN, value='example.com'),
challenges=(challb,), combinations=None)
self.authzr = messages2.AuthorizationResource(
self.authzr = messages.AuthorizationResource(
body=self.authz, uri=authzr_uri,
new_cert_uri='https://www.letsencrypt-demo.org/acme/new-cert')
# Request issuance
self.certr = messages2.CertificateResource(
self.certr = messages.CertificateResource(
body=CERT, authzrs=(self.authzr,),
uri='https://www.letsencrypt-demo.org/acme/cert/1',
cert_chain_uri='https://www.letsencrypt-demo.org/ca')
def tearDown(self):
shutil.rmtree(self.config.accounts_dir)
def _mock_post_get(self):
# pylint: disable=protected-access
self.net._post = self.post
@ -127,22 +117,22 @@ class NetworkTest(unittest.TestCase):
self.response.json.return_value = {}
# pylint: disable=protected-access
self.assertRaises(
errors.NetworkError, self.net._check_response, self.response)
errors.ClientError, self.net._check_response, self.response)
def test_check_response_not_ok_jobj_error(self):
self.response.ok = False
self.response.json.return_value = messages2.Error(
self.response.json.return_value = messages.Error(
detail='foo', typ='serverInternal', title='some title').to_json()
# pylint: disable=protected-access
self.assertRaises(
messages2.Error, self.net._check_response, self.response)
messages.Error, self.net._check_response, self.response)
def test_check_response_not_ok_no_jobj(self):
self.response.ok = False
self.response.json.side_effect = ValueError
# pylint: disable=protected-access
self.assertRaises(
errors.NetworkError, self.net._check_response, self.response)
errors.ClientError, self.net._check_response, self.response)
def test_check_response_ok_no_jobj_ct_required(self):
self.response.json.side_effect = ValueError
@ -150,7 +140,7 @@ class NetworkTest(unittest.TestCase):
self.response.headers['Content-Type'] = response_ct
# pylint: disable=protected-access
self.assertRaises(
errors.NetworkError, self.net._check_response, self.response,
errors.ClientError, self.net._check_response, self.response,
content_type=self.net.JSON_CONTENT_TYPE)
def test_check_response_ok_no_jobj_no_ct(self):
@ -167,14 +157,14 @@ class NetworkTest(unittest.TestCase):
# pylint: disable=protected-access
self.net._check_response(self.response)
@mock.patch('letsencrypt.network2.requests')
@mock.patch('acme.client.requests')
def test_get_requests_error_passthrough(self, requests_mock):
requests_mock.exceptions = requests.exceptions
requests_mock.get.side_effect = requests.exceptions.RequestException
# pylint: disable=protected-access
self.assertRaises(errors.NetworkError, self.net._get, 'uri')
self.assertRaises(errors.ClientError, self.net._get, 'uri')
@mock.patch('letsencrypt.network2.requests')
@mock.patch('acme.client.requests')
def test_get(self, requests_mock):
# pylint: disable=protected-access
self.net._check_response = mock.MagicMock()
@ -186,16 +176,16 @@ class NetworkTest(unittest.TestCase):
# pylint: disable=protected-access
self.net._wrap_in_jws = self.wrap_in_jws
@mock.patch('letsencrypt.network2.requests')
@mock.patch('acme.client.requests')
def test_post_requests_error_passthrough(self, requests_mock):
requests_mock.exceptions = requests.exceptions
requests_mock.post.side_effect = requests.exceptions.RequestException
# pylint: disable=protected-access
self._mock_wrap_in_jws()
self.assertRaises(
errors.NetworkError, self.net._post, 'uri', mock.sentinel.obj)
errors.ClientError, self.net._post, 'uri', mock.sentinel.obj)
@mock.patch('letsencrypt.network2.requests')
@mock.patch('acme.client.requests')
def test_post(self, requests_mock):
# pylint: disable=protected-access
self.net._check_response = mock.MagicMock()
@ -206,7 +196,7 @@ class NetworkTest(unittest.TestCase):
self.net._check_response.assert_called_once_with(
requests_mock.post('uri', mock.sentinel.wrapped), content_type='ct')
@mock.patch('letsencrypt.network2.requests')
@mock.patch('acme.client.requests')
def test_post_replay_nonce_handling(self, requests_mock):
# pylint: disable=protected-access
self.net._check_response = mock.MagicMock()
@ -214,7 +204,7 @@ class NetworkTest(unittest.TestCase):
self.net._nonces.clear()
self.assertRaises(
errors.NetworkError, self.net._post, 'uri', mock.sentinel.obj)
errors.ClientError, self.net._post, 'uri', mock.sentinel.obj)
nonce2 = jose.b64encode('Nonce2')
requests_mock.head('uri').headers = {
@ -231,9 +221,9 @@ class NetworkTest(unittest.TestCase):
# wrong nonce
requests_mock.post('uri').headers = {self.net.REPLAY_NONCE_HEADER: 'F'}
self.assertRaises(
errors.NetworkError, self.net._post, 'uri', mock.sentinel.obj)
errors.ClientError, self.net._post, 'uri', mock.sentinel.obj)
@mock.patch('letsencrypt.client.network2.requests')
@mock.patch('acme.client.requests')
def test_get_post_verify_ssl(self, requests_mock):
# pylint: disable=protected-access
self._mock_wrap_in_jws()
@ -274,30 +264,7 @@ class NetworkTest(unittest.TestCase):
self.response.status_code = httplib.CREATED
self._mock_post_get()
self.assertRaises(
errors.NetworkError, self.net.register, self.regr.body)
def test_register_from_account(self):
self.net.register = mock.Mock()
acc = account.Account(
self.config, 'key', email='cert-admin@example.com',
phone='+12025551212')
self.net.register_from_account(acc)
self.net.register.assert_called_with(contact=self.contact)
def test_register_from_account_partial_info(self):
self.net.register = mock.Mock()
acc = account.Account(
self.config, 'key', email='cert-admin@example.com')
acc2 = account.Account(self.config, 'key')
self.net.register_from_account(acc)
self.net.register.assert_called_with(
contact=('mailto:cert-admin@example.com',))
self.net.register_from_account(acc2)
self.net.register.assert_called_with(contact=())
errors.ClientError, self.net.register, self.regr.body)
def test_update_registration(self):
self.response.headers['Location'] = self.regr.uri
@ -339,7 +306,7 @@ class NetworkTest(unittest.TestCase):
self.response.status_code = httplib.CREATED
self._mock_post_get()
self.assertRaises(
errors.NetworkError, self.net.request_challenges,
errors.ClientError, self.net.request_challenges,
self.identifier, self.regr)
def test_request_domain_challenges(self):
@ -363,7 +330,7 @@ class NetworkTest(unittest.TestCase):
def test_answer_challenge_missing_next(self):
self._mock_post_get()
self.assertRaises(errors.NetworkError, self.net.answer_challenge,
self.assertRaises(errors.ClientError, self.net.answer_challenge,
self.challr.body, challenges.DNSResponse())
def test_retry_after_date(self):
@ -372,7 +339,7 @@ class NetworkTest(unittest.TestCase):
datetime.datetime(1999, 12, 31, 23, 59, 59),
self.net.retry_after(response=self.response, default=10))
@mock.patch('letsencrypt.network2.datetime')
@mock.patch('acme.client.datetime')
def test_retry_after_invalid(self, dt_mock):
dt_mock.datetime.now.return_value = datetime.datetime(2015, 3, 27)
dt_mock.timedelta = datetime.timedelta
@ -382,7 +349,7 @@ class NetworkTest(unittest.TestCase):
datetime.datetime(2015, 3, 27, 0, 0, 10),
self.net.retry_after(response=self.response, default=10))
@mock.patch('letsencrypt.network2.datetime')
@mock.patch('acme.client.datetime')
def test_retry_after_seconds(self, dt_mock):
dt_mock.datetime.now.return_value = datetime.datetime(2015, 3, 27)
dt_mock.timedelta = datetime.timedelta
@ -392,7 +359,7 @@ class NetworkTest(unittest.TestCase):
datetime.datetime(2015, 3, 27, 0, 0, 50),
self.net.retry_after(response=self.response, default=10))
@mock.patch('letsencrypt.network2.datetime')
@mock.patch('acme.client.datetime')
def test_retry_after_missing(self, dt_mock):
dt_mock.datetime.now.return_value = datetime.datetime(2015, 3, 27)
dt_mock.timedelta = datetime.timedelta
@ -432,11 +399,11 @@ class NetworkTest(unittest.TestCase):
def test_request_issuance_missing_location(self):
self._mock_post_get()
self.assertRaises(
errors.NetworkError, self.net.request_issuance,
errors.ClientError, self.net.request_issuance,
CSR, (self.authzr,))
@mock.patch('letsencrypt.network2.datetime')
@mock.patch('letsencrypt.network2.time')
@mock.patch('acme.client.datetime')
@mock.patch('acme.client.time')
def test_poll_and_request_issuance(self, time_mock, dt_mock):
# clock.dt | pylint: disable=no-member
clock = mock.MagicMock(dt=datetime.datetime(2015, 3, 27))
@ -462,7 +429,7 @@ class NetworkTest(unittest.TestCase):
if not authzr.retries: # no more retries
done = mock.MagicMock(uri=authzr.uri, times=authzr.times)
done.body.status = messages2.STATUS_VALID
done.body.status = messages.STATUS_VALID
return done, []
# response (2nd result tuple element) is reduced to only
@ -517,10 +484,10 @@ class NetworkTest(unittest.TestCase):
def test_check_cert(self):
self.response.headers['Location'] = self.certr.uri
self.response.content = CERT2.as_der()
self.response.content = CERT.as_der()
self._mock_post_get()
self.assertEqual(
self.certr.update(body=CERT2), self.net.check_cert(self.certr))
self.certr.update(body=CERT), self.net.check_cert(self.certr))
# TODO: split here and separate test
self.response.headers['Location'] = 'foo'
@ -528,9 +495,9 @@ class NetworkTest(unittest.TestCase):
errors.UnexpectedUpdate, self.net.check_cert, self.certr)
def test_check_cert_missing_location(self):
self.response.content = CERT2.as_der()
self.response.content = CERT.as_der()
self._mock_post_get()
self.assertRaises(errors.NetworkError, self.net.check_cert, self.certr)
self.assertRaises(errors.ClientError, self.net.check_cert, self.certr)
def test_refresh(self):
self.net.check_cert = mock.MagicMock()
@ -550,13 +517,13 @@ class NetworkTest(unittest.TestCase):
def test_revoke(self):
self._mock_post_get()
self.net.revoke(self.certr, when=messages2.Revocation.NOW)
self.net.revoke(self.certr, when=messages.Revocation.NOW)
self.post.assert_called_once_with(self.certr.uri, mock.ANY)
def test_revoke_bad_status_raises_error(self):
self.response.status_code = httplib.METHOD_NOT_ALLOWED
self._mock_post_get()
self.assertRaises(errors.NetworkError, self.net.revoke, self.certr)
self.assertRaises(errors.ClientError, self.net.revoke, self.certr)
if __name__ == '__main__':

View file

@ -1,8 +1,15 @@
"""ACME errors."""
from acme.jose import errors as jose_errors
class Error(Exception):
"""Generic ACME error."""
class SchemaValidationError(jose_errors.DeserializationError):
"""JSON schema ACME object validation error."""
class ClientError(Error):
"""Network error."""
class UnexpectedUpdate(ClientError):
"""Unexpected update."""

View file

@ -4,7 +4,8 @@ The following command has been used to generate test keys:
and for the CSR:
python -c from 'letsencrypt.crypto_util import make_csr;
import pkg_resources; open("csr2.pem",
"w").write(make_csr(pkg_resources.resource_string("letsencrypt.tests",
"testdata/rsa512_key.pem"), ["example2.com"])[0])'
openssl req -key rsa512_key.pem -new -subj '/CN=example.com' -outform DER > csr.der
and for the certificate:
openssl req -key rsa512_key.pem -new -subj '/CN=example.com' -x509 -outform DER > cert.der

BIN
acme/jose/testdata/cert.der vendored Normal file

Binary file not shown.

BIN
acme/jose/testdata/csr.der vendored Normal file

Binary file not shown.

View file

@ -1,10 +0,0 @@
-----BEGIN CERTIFICATE REQUEST-----
MIIBXzCCAQkCAQAwejELMAkGA1UEBhMCVVMxETAPBgNVBAgMCE1pY2hpZ2FuMRIw
EAYDVQQHDAlBbm4gQXJib3IxDDAKBgNVBAoMA0VGRjEfMB0GA1UECwwWVW5pdmVy
c2l0eSBvZiBNaWNoaWdhbjEVMBMGA1UEAwwMZXhhbXBsZTIuY29tMFwwDQYJKoZI
hvcNAQEBBQADSwAwSAJBAPS2EXFRNza/qpXnnBHF/CcFQ543htV+7nLAmrLrmTNH
tPXJmLlM8SJDIzv/ceAFXL110VzxFfi81lpH5E5c0TMCAwEAAaAqMCgGCSqGSIb3
DQEJDjEbMBkwFwYDVR0RBBAwDoIMZXhhbXBsZTIuY29tMA0GCSqGSIb3DQEBCwUA
A0EAwsdL4FLIgISKV4vXFmc6QTV7CjBiP4XmPFbeN+gMFdR7QcnRyyxSpXxB0v8Z
oqYboP5LGFt9zC6/9GyjcI9/IQ==
-----END CERTIFICATE REQUEST-----

View file

@ -1,106 +1,241 @@
"""ACME protocol v00 messages.
.. warning:: This module is an implementation of the draft `ACME
protocol version 00`_, and not the "RESTified" `ACME protocol version
01`_ or later. It should work with `older Node.js implementation`_,
but will definitely not work with Boulder_. It is kept for reference
purposes only.
.. _`ACME protocol version 00`:
https://github.com/letsencrypt/acme-spec/blob/v00/draft-barnes-acme.md
.. _`ACME protocol version 01`:
https://github.com/letsencrypt/acme-spec/blob/v01/draft-barnes-acme.md
.. _Boulder: https://github.com/letsencrypt/boulder
.. _`older Node.js implementation`:
https://github.com/letsencrypt/node-acme/commit/f42aa5b7fad4cd2fc289653c4ab14f18052367b3
"""
import jsonschema
"""ACME protocol messages."""
from acme import challenges
from acme import errors
from acme import fields
from acme import jose
from acme import other
from acme import util
class Message(jose.TypedJSONObjectWithFields):
# _fields_to_partial_json | pylint: disable=abstract-method
# pylint: disable=too-few-public-methods
"""ACME message."""
TYPES = {}
type_field_name = "type"
class Error(jose.JSONObjectWithFields, Exception):
"""ACME error.
schema = NotImplemented
"""JSON schema the object is tested against in :meth:`from_json`.
Subclasses must overrride it with a value that is acceptable by
:func:`jsonschema.validate`, most probably using
:func:`acme.util.load_schema`.
https://tools.ietf.org/html/draft-ietf-appsawg-http-problem-00
"""
ERROR_TYPE_NAMESPACE = 'urn:acme:error:'
ERROR_TYPE_DESCRIPTIONS = {
'malformed': 'The request message was malformed',
'unauthorized': 'The client lacks sufficient authorization',
'serverInternal': 'The server experienced an internal error',
'badCSR': 'The CSR is unacceptable (e.g., due to a short key)',
'badNonce': 'The client sent an unacceptable anti-replay nonce',
}
typ = jose.Field('type')
title = jose.Field('title', omitempty=True)
detail = jose.Field('detail')
@typ.encoder
def typ(value): # pylint: disable=missing-docstring,no-self-argument
return Error.ERROR_TYPE_NAMESPACE + value
@typ.decoder
def typ(value): # pylint: disable=missing-docstring,no-self-argument
# pylint thinks isinstance(value, Error), so startswith is not found
# pylint: disable=no-member
if not value.startswith(Error.ERROR_TYPE_NAMESPACE):
raise jose.DeserializationError('Missing error type prefix')
without_prefix = value[len(Error.ERROR_TYPE_NAMESPACE):]
if without_prefix not in Error.ERROR_TYPE_DESCRIPTIONS:
raise jose.DeserializationError('Error type not recognized')
return without_prefix
@property
def description(self):
"""Hardcoded error description based on its type."""
return self.ERROR_TYPE_DESCRIPTIONS[self.typ]
def __str__(self):
if self.typ is not None:
return ' :: '.join([self.typ, self.description, self.detail])
else:
return str(self.detail)
class _Constant(jose.JSONDeSerializable):
"""ACME constant."""
__slots__ = ('name',)
POSSIBLE_NAMES = NotImplemented
def __init__(self, name):
self.POSSIBLE_NAMES[name] = self
self.name = name
def to_partial_json(self):
return self.name
@classmethod
def from_json(cls, jobj):
"""Deserialize from (possibly invalid) JSON object.
def from_json(cls, value):
if value not in cls.POSSIBLE_NAMES:
raise jose.DeserializationError(
'{0} not recognized'.format(cls.__name__))
return cls.POSSIBLE_NAMES[value]
Note that the input ``jobj`` has not been sanitized in any way.
def __repr__(self):
return '{0}({1})'.format(self.__class__.__name__, self.name)
:param jobj: JSON object.
def __eq__(self, other):
return isinstance(other, type(self)) and other.name == self.name
:raises acme.errors.SchemaValidationError: if the input
JSON object could not be validated against JSON schema specified
in :attr:`schema`.
:raises acme.jose.errors.DeserializationError: for any
other generic error in decoding.
:returns: instance of the class
"""
msg_cls = cls.get_type_cls(jobj)
# TODO: is that schema testing still relevant?
try:
jsonschema.validate(jobj, msg_cls.schema)
except jsonschema.ValidationError as error:
raise errors.SchemaValidationError(error)
return super(Message, cls).from_json(jobj)
def __ne__(self, other):
return not self.__eq__(other)
@Message.register # pylint: disable=too-few-public-methods
class Challenge(Message):
"""ACME "challenge" message.
class Status(_Constant):
"""ACME "status" field."""
POSSIBLE_NAMES = {}
STATUS_UNKNOWN = Status('unknown')
STATUS_PENDING = Status('pending')
STATUS_PROCESSING = Status('processing')
STATUS_VALID = Status('valid')
STATUS_INVALID = Status('invalid')
STATUS_REVOKED = Status('revoked')
:ivar str nonce: Random data, **not** base64-encoded.
:ivar list challenges: List of
:class:`~acme.challenges.Challenge` objects.
.. todo::
1. can challenges contain two challenges of the same type?
2. can challenges contain duplicates?
3. check "combinations" indices are in valid range
4. turn "combinations" elements into sets?
5. turn "combinations" into set?
class IdentifierType(_Constant):
"""ACME identifier type."""
POSSIBLE_NAMES = {}
IDENTIFIER_FQDN = IdentifierType('dns') # IdentifierDNS in Boulder
class Identifier(jose.JSONObjectWithFields):
"""ACME identifier.
:ivar acme.messages.IdentifierType typ:
"""
typ = "challenge"
schema = util.load_schema(typ)
typ = jose.Field('type', decoder=IdentifierType.from_json)
value = jose.Field('value')
session_id = jose.Field("sessionID")
nonce = jose.Field("nonce", encoder=jose.b64encode,
decoder=jose.decode_b64jose)
challenges = jose.Field("challenges")
combinations = jose.Field("combinations", omitempty=True, default=())
class Resource(jose.ImmutableMap):
"""ACME Resource.
:ivar acme.messages.ResourceBody body: Resource body.
:ivar str uri: Location of the resource.
"""
__slots__ = ('body', 'uri')
class ResourceBody(jose.JSONObjectWithFields):
"""ACME Resource Body."""
class RegistrationResource(Resource):
"""Registration Resource.
:ivar acme.messages.Registration body:
:ivar str new_authzr_uri: URI found in the 'next' ``Link`` header
:ivar str terms_of_service: URL for the CA TOS.
"""
__slots__ = ('body', 'uri', 'new_authzr_uri', 'terms_of_service')
class Registration(ResourceBody):
"""Registration Resource Body.
:ivar acme.jose.jwk.JWK key: Public key.
:ivar tuple contact: Contact information following ACME spec
"""
# on new-reg key server ignores 'key' and populates it based on
# JWS.signature.combined.jwk
key = jose.Field('key', omitempty=True, decoder=jose.JWK.from_json)
contact = jose.Field('contact', omitempty=True, default=())
recovery_token = jose.Field('recoveryToken', omitempty=True)
agreement = jose.Field('agreement', omitempty=True)
class ChallengeResource(Resource, jose.JSONObjectWithFields):
"""Challenge Resource.
:ivar acme.messages.ChallengeBody body:
:ivar str authzr_uri: URI found in the 'up' ``Link`` header.
"""
__slots__ = ('body', 'authzr_uri')
@property
def uri(self): # pylint: disable=missing-docstring,no-self-argument
# bug? 'method already defined line None'
# pylint: disable=function-redefined
return self.body.uri
class ChallengeBody(ResourceBody):
"""Challenge Resource Body.
.. todo::
Confusingly, this has a similar name to `.challenges.Challenge`,
as well as `.achallenges.AnnotatedChallenge`. Please use names
such as ``challb`` to distinguish instances of this class from
``achall``.
:ivar acme.challenges.Challenge: Wrapped challenge.
Conveniently, all challenge fields are proxied, i.e. you can
call ``challb.x`` to get ``challb.chall.x`` contents.
:ivar acme.messages.Status status:
:ivar datetime.datetime validated:
"""
__slots__ = ('chall',)
uri = jose.Field('uri')
status = jose.Field('status', decoder=Status.from_json)
validated = fields.RFC3339Field('validated', omitempty=True)
def to_partial_json(self):
jobj = super(ChallengeBody, self).to_partial_json()
jobj.update(self.chall.to_partial_json())
return jobj
@classmethod
def fields_from_json(cls, jobj):
jobj_fields = super(ChallengeBody, cls).fields_from_json(jobj)
jobj_fields['chall'] = challenges.Challenge.from_json(jobj)
return jobj_fields
def __getattr__(self, name):
return getattr(self.chall, name)
class AuthorizationResource(Resource):
"""Authorization Resource.
:ivar acme.messages.Authorization body:
:ivar str new_cert_uri: URI found in the 'next' ``Link`` header
"""
__slots__ = ('body', 'uri', 'new_cert_uri')
class Authorization(ResourceBody):
"""Authorization Resource Body.
:ivar acme.messages.Identifier identifier:
:ivar list challenges: `list` of `.ChallengeBody`
:ivar tuple combinations: Challenge combinations (`tuple` of `tuple`
of `int`, as opposed to `list` of `list` from the spec).
:ivar acme.jose.jwk.JWK key: Public key.
:ivar tuple contact:
:ivar acme.messages.Status status:
:ivar datetime.datetime expires:
"""
identifier = jose.Field('identifier', decoder=Identifier.from_json)
challenges = jose.Field('challenges', omitempty=True)
combinations = jose.Field('combinations', omitempty=True)
status = jose.Field('status', omitempty=True, decoder=Status.from_json)
# TODO: 'expires' is allowed for Authorization Resources in
# general, but for Key Authorization '[t]he "expires" field MUST
# be absent'... then acme-spec gives example with 'expires'
# present... That's confusing!
expires = fields.RFC3339Field('expires', omitempty=True)
@challenges.decoder
def challenges(value): # pylint: disable=missing-docstring,no-self-argument
return tuple(challenges.Challenge.from_json(chall) for chall in value)
return tuple(ChallengeBody.from_json(chall) for chall in value)
@property
def resolved_combinations(self):
@ -109,259 +244,55 @@ class Challenge(Message):
for combo in self.combinations)
@Message.register # pylint: disable=too-few-public-methods
class ChallengeRequest(Message):
"""ACME "challengeRequest" message."""
typ = "challengeRequest"
schema = util.load_schema(typ)
identifier = jose.Field("identifier")
class CertificateRequest(jose.JSONObjectWithFields):
"""ACME new-cert request.
@Message.register # pylint: disable=too-few-public-methods
class Authorization(Message):
"""ACME "authorization" message.
:ivar jwk: :class:`acme.jose.JWK`
:ivar acme.jose.util.ComparableX509 csr:
`M2Crypto.X509.Request` wrapped in `.ComparableX509`
:ivar tuple authorizations: `tuple` of URIs (`str`)
"""
typ = "authorization"
schema = util.load_schema(typ)
recovery_token = jose.Field("recoveryToken", omitempty=True)
identifier = jose.Field("identifier", omitempty=True)
jwk = jose.Field("jwk", decoder=jose.JWK.from_json, omitempty=True)
csr = jose.Field('csr', decoder=jose.decode_csr, encoder=jose.encode_csr)
authorizations = jose.Field('authorizations', decoder=tuple)
@Message.register
class AuthorizationRequest(Message):
"""ACME "authorizationRequest" message.
class CertificateResource(Resource):
"""Certificate Resource.
:ivar str nonce: Random data from the corresponding
:attr:`Challenge.nonce`, **not** base64-encoded.
:ivar list responses: List of completed challenges (
:class:`acme.challenges.ChallengeResponse`).
:ivar signature: Signature (:class:`acme.other.Signature`).
:ivar acme.jose.util.ComparableX509 body:
`M2Crypto.X509.X509` wrapped in `.ComparableX509`
:ivar str cert_chain_uri: URI found in the 'up' ``Link`` header
:ivar tuple authzrs: `tuple` of `AuthorizationResource`.
"""
typ = "authorizationRequest"
schema = util.load_schema(typ)
session_id = jose.Field("sessionID")
nonce = jose.Field("nonce", encoder=jose.b64encode,
decoder=jose.decode_b64jose)
responses = jose.Field("responses")
signature = jose.Field("signature", decoder=other.Signature.from_json)
contact = jose.Field("contact", omitempty=True, default=())
@responses.decoder
def responses(value): # pylint: disable=missing-docstring,no-self-argument
return tuple(challenges.ChallengeResponse.from_json(chall)
for chall in value)
@classmethod
def create(cls, name, key, sig_nonce=None, **kwargs):
"""Create signed "authorizationRequest".
:param str name: Hostname
:param key: Key used for signing.
:type key: :class:`Crypto.PublicKey.RSA`
:param str sig_nonce: Nonce used for signature. Useful for testing.
:kwargs: Any other arguments accepted by the class constructor.
:returns: Signed "authorizationRequest" ACME message.
:rtype: :class:`AuthorizationRequest`
"""
# pylint: disable=too-many-arguments
signature = other.Signature.from_msg(
name + kwargs["nonce"], key, sig_nonce)
return cls(
signature=signature, contact=kwargs.pop("contact", ()), **kwargs)
def verify(self, name):
"""Verify signature.
.. warning:: Caller must check that the public key encoded in the
:attr:`signature`'s :class:`acme.jose.JWK` object
is the correct key for a given context.
:param str name: Hostname
:returns: True iff ``signature`` can be verified, False otherwise.
:rtype: bool
"""
# self.signature is not Field | pylint: disable=no-member
return self.signature.verify(name + self.nonce)
__slots__ = ('body', 'uri', 'cert_chain_uri', 'authzrs')
@Message.register # pylint: disable=too-few-public-methods
class Certificate(Message):
"""ACME "certificate" message.
class Revocation(jose.JSONObjectWithFields):
"""Revocation message.
:ivar certificate: The certificate (:class:`M2Crypto.X509.X509`
wrapped in :class:`acme.util.ComparableX509`).
:ivar list chain: Chain of certificates (:class:`M2Crypto.X509.X509`
wrapped in :class:`acme.util.ComparableX509` ).
:ivar revoke: Either a `datetime.datetime` or `Revocation.NOW`.
:ivar tuple authorizations: Same as `CertificateRequest.authorizations`
"""
typ = "certificate"
schema = util.load_schema(typ)
certificate = jose.Field("certificate", encoder=jose.encode_cert,
decoder=jose.decode_cert)
chain = jose.Field("chain", omitempty=True, default=())
refresh = jose.Field("refresh", omitempty=True)
NOW = 'now'
"""A possible value for `revoke`, denoting that certificate should
be revoked now."""
@chain.decoder
def chain(value): # pylint: disable=missing-docstring,no-self-argument
return tuple(jose.decode_cert(cert) for cert in value)
revoke = jose.Field('revoke')
authorizations = CertificateRequest._fields['authorizations']
@chain.encoder
def chain(value): # pylint: disable=missing-docstring,no-self-argument
return tuple(jose.encode_cert(cert) for cert in value)
@revoke.decoder
def revoke(value): # pylint: disable=missing-docstring,no-self-argument
if value == Revocation.NOW:
return value
else:
return fields.RFC3339Field.default_decoder(value)
@Message.register
class CertificateRequest(Message):
"""ACME "certificateRequest" message.
:ivar csr: Certificate Signing Request (:class:`M2Crypto.X509.Request`
wrapped in :class:`acme.util.ComparableX509`.
:ivar signature: Signature (:class:`acme.other.Signature`).
"""
typ = "certificateRequest"
schema = util.load_schema(typ)
csr = jose.Field("csr", encoder=jose.encode_csr,
decoder=jose.decode_csr)
signature = jose.Field("signature", decoder=other.Signature.from_json)
@classmethod
def create(cls, key, sig_nonce=None, **kwargs):
"""Create signed "certificateRequest".
:param key: Key used for signing.
:type key: :class:`Crypto.PublicKey.RSA`
:param str sig_nonce: Nonce used for signature. Useful for testing.
:kwargs: Any other arguments accepted by the class constructor.
:returns: Signed "certificateRequest" ACME message.
:rtype: :class:`CertificateRequest`
"""
return cls(signature=other.Signature.from_msg(
kwargs["csr"].as_der(), key, sig_nonce), **kwargs)
def verify(self):
"""Verify signature.
.. warning:: Caller must check that the public key encoded in the
:attr:`signature`'s :class:`acme.jose.JWK` object
is the correct key for a given context.
:returns: True iff ``signature`` can be verified, False otherwise.
:rtype: bool
"""
# self.signature is not Field | pylint: disable=no-member
return self.signature.verify(self.csr.as_der())
@Message.register # pylint: disable=too-few-public-methods
class Defer(Message):
"""ACME "defer" message."""
typ = "defer"
schema = util.load_schema(typ)
token = jose.Field("token")
interval = jose.Field("interval", omitempty=True)
message = jose.Field("message", omitempty=True)
@Message.register # pylint: disable=too-few-public-methods
class Error(Message):
"""ACME "error" message."""
typ = "error"
schema = util.load_schema(typ)
error = jose.Field("error")
message = jose.Field("message", omitempty=True)
more_info = jose.Field("moreInfo", omitempty=True)
MESSAGE_CODES = {
"malformed": "The request message was malformed",
"unauthorized": "The client lacks sufficient authorization",
"serverInternal": "The server experienced an internal error",
"notSupported": "The request type is not supported",
"unknown": "The server does not recognize an ID/token in the request",
"badCSR": "The CSR is unacceptable (e.g., due to a short key)",
}
@Message.register # pylint: disable=too-few-public-methods
class Revocation(Message):
"""ACME "revocation" message."""
typ = "revocation"
schema = util.load_schema(typ)
@Message.register
class RevocationRequest(Message):
"""ACME "revocationRequest" message.
:ivar certificate: Certificate (:class:`M2Crypto.X509.X509`
wrapped in :class:`acme.util.ComparableX509`).
:ivar signature: Signature (:class:`acme.other.Signature`).
"""
typ = "revocationRequest"
schema = util.load_schema(typ)
certificate = jose.Field("certificate", decoder=jose.decode_cert,
encoder=jose.encode_cert)
signature = jose.Field("signature", decoder=other.Signature.from_json)
@classmethod
def create(cls, key, sig_nonce=None, **kwargs):
"""Create signed "revocationRequest".
:param key: Key used for signing.
:type key: :class:`Crypto.PublicKey.RSA`
:param str sig_nonce: Nonce used for signature. Useful for testing.
:kwargs: Any other arguments accepted by the class constructor.
:returns: Signed "revocationRequest" ACME message.
:rtype: :class:`RevocationRequest`
"""
return cls(signature=other.Signature.from_msg(
kwargs["certificate"].as_der(), key, sig_nonce), **kwargs)
def verify(self):
"""Verify signature.
.. warning:: Caller must check that the public key encoded in the
:attr:`signature`'s :class:`acme.jose.JWK` object
is the correct key for a given context.
:returns: True iff ``signature`` can be verified, False otherwise.
:rtype: bool
"""
# self.signature is not Field | pylint: disable=no-member
return self.signature.verify(self.certificate.as_der())
@Message.register # pylint: disable=too-few-public-methods
class StatusRequest(Message):
"""ACME "statusRequest" message."""
typ = "statusRequest"
schema = util.load_schema(typ)
token = jose.Field("token")
@revoke.encoder
def revoke(value): # pylint: disable=missing-docstring,no-self-argument
if value == Revocation.NOW:
return value
else:
return fields.RFC3339Field.default_encoder(value)

View file

@ -1,298 +0,0 @@
"""ACME protocol messages."""
from acme import challenges
from acme import fields
from acme import jose
class Error(jose.JSONObjectWithFields, Exception):
"""ACME error.
https://tools.ietf.org/html/draft-ietf-appsawg-http-problem-00
"""
ERROR_TYPE_NAMESPACE = 'urn:acme:error:'
ERROR_TYPE_DESCRIPTIONS = {
'malformed': 'The request message was malformed',
'unauthorized': 'The client lacks sufficient authorization',
'serverInternal': 'The server experienced an internal error',
'badCSR': 'The CSR is unacceptable (e.g., due to a short key)',
'badNonce': 'The client sent an unacceptable anti-replay nonce',
}
typ = jose.Field('type')
title = jose.Field('title', omitempty=True)
detail = jose.Field('detail')
@typ.encoder
def typ(value): # pylint: disable=missing-docstring,no-self-argument
return Error.ERROR_TYPE_NAMESPACE + value
@typ.decoder
def typ(value): # pylint: disable=missing-docstring,no-self-argument
# pylint thinks isinstance(value, Error), so startswith is not found
# pylint: disable=no-member
if not value.startswith(Error.ERROR_TYPE_NAMESPACE):
raise jose.DeserializationError('Missing error type prefix')
without_prefix = value[len(Error.ERROR_TYPE_NAMESPACE):]
if without_prefix not in Error.ERROR_TYPE_DESCRIPTIONS:
raise jose.DeserializationError('Error type not recognized')
return without_prefix
@property
def description(self):
"""Hardcoded error description based on its type."""
return self.ERROR_TYPE_DESCRIPTIONS[self.typ]
def __str__(self):
if self.typ is not None:
return ' :: '.join([self.typ, self.description, self.detail])
else:
return str(self.detail)
class _Constant(jose.JSONDeSerializable):
"""ACME constant."""
__slots__ = ('name',)
POSSIBLE_NAMES = NotImplemented
def __init__(self, name):
self.POSSIBLE_NAMES[name] = self
self.name = name
def to_partial_json(self):
return self.name
@classmethod
def from_json(cls, value):
if value not in cls.POSSIBLE_NAMES:
raise jose.DeserializationError(
'{0} not recognized'.format(cls.__name__))
return cls.POSSIBLE_NAMES[value]
def __repr__(self):
return '{0}({1})'.format(self.__class__.__name__, self.name)
def __eq__(self, other):
return isinstance(other, type(self)) and other.name == self.name
def __ne__(self, other):
return not self.__eq__(other)
class Status(_Constant):
"""ACME "status" field."""
POSSIBLE_NAMES = {}
STATUS_UNKNOWN = Status('unknown')
STATUS_PENDING = Status('pending')
STATUS_PROCESSING = Status('processing')
STATUS_VALID = Status('valid')
STATUS_INVALID = Status('invalid')
STATUS_REVOKED = Status('revoked')
class IdentifierType(_Constant):
"""ACME identifier type."""
POSSIBLE_NAMES = {}
IDENTIFIER_FQDN = IdentifierType('dns') # IdentifierDNS in Boulder
class Identifier(jose.JSONObjectWithFields):
"""ACME identifier.
:ivar acme.messages2.IdentifierType typ:
"""
typ = jose.Field('type', decoder=IdentifierType.from_json)
value = jose.Field('value')
class Resource(jose.ImmutableMap):
"""ACME Resource.
:ivar acme.messages2.ResourceBody body: Resource body.
:ivar str uri: Location of the resource.
"""
__slots__ = ('body', 'uri')
class ResourceBody(jose.JSONObjectWithFields):
"""ACME Resource Body."""
class RegistrationResource(Resource):
"""Registration Resource.
:ivar acme.messages2.Registration body:
:ivar str new_authzr_uri: URI found in the 'next' ``Link`` header
:ivar str terms_of_service: URL for the CA TOS.
"""
__slots__ = ('body', 'uri', 'new_authzr_uri', 'terms_of_service')
class Registration(ResourceBody):
"""Registration Resource Body.
:ivar acme.jose.jwk.JWK key: Public key.
:ivar tuple contact: Contact information following ACME spec
"""
# on new-reg key server ignores 'key' and populates it based on
# JWS.signature.combined.jwk
key = jose.Field('key', omitempty=True, decoder=jose.JWK.from_json)
contact = jose.Field('contact', omitempty=True, default=())
recovery_token = jose.Field('recoveryToken', omitempty=True)
agreement = jose.Field('agreement', omitempty=True)
class ChallengeResource(Resource, jose.JSONObjectWithFields):
"""Challenge Resource.
:ivar acme.messages2.ChallengeBody body:
:ivar str authzr_uri: URI found in the 'up' ``Link`` header.
"""
__slots__ = ('body', 'authzr_uri')
@property
def uri(self): # pylint: disable=missing-docstring,no-self-argument
# bug? 'method already defined line None'
# pylint: disable=function-redefined
return self.body.uri
class ChallengeBody(ResourceBody):
"""Challenge Resource Body.
.. todo::
Confusingly, this has a similar name to `.challenges.Challenge`,
as well as `.achallenges.AnnotatedChallenge`. Please use names
such as ``challb`` to distinguish instances of this class from
``achall``.
:ivar acme.challenges.Challenge: Wrapped challenge.
Conveniently, all challenge fields are proxied, i.e. you can
call ``challb.x`` to get ``challb.chall.x`` contents.
:ivar acme.messages2.Status status:
:ivar datetime.datetime validated:
"""
__slots__ = ('chall',)
uri = jose.Field('uri')
status = jose.Field('status', decoder=Status.from_json)
validated = fields.RFC3339Field('validated', omitempty=True)
def to_partial_json(self):
jobj = super(ChallengeBody, self).to_partial_json()
jobj.update(self.chall.to_partial_json())
return jobj
@classmethod
def fields_from_json(cls, jobj):
jobj_fields = super(ChallengeBody, cls).fields_from_json(jobj)
jobj_fields['chall'] = challenges.Challenge.from_json(jobj)
return jobj_fields
def __getattr__(self, name):
return getattr(self.chall, name)
class AuthorizationResource(Resource):
"""Authorization Resource.
:ivar acme.messages2.Authorization body:
:ivar str new_cert_uri: URI found in the 'next' ``Link`` header
"""
__slots__ = ('body', 'uri', 'new_cert_uri')
class Authorization(ResourceBody):
"""Authorization Resource Body.
:ivar acme.messages2.Identifier identifier:
:ivar list challenges: `list` of `.ChallengeBody`
:ivar tuple combinations: Challenge combinations (`tuple` of `tuple`
of `int`, as opposed to `list` of `list` from the spec).
:ivar acme.jose.jwk.JWK key: Public key.
:ivar tuple contact:
:ivar acme.messages2.Status status:
:ivar datetime.datetime expires:
"""
identifier = jose.Field('identifier', decoder=Identifier.from_json)
challenges = jose.Field('challenges', omitempty=True)
combinations = jose.Field('combinations', omitempty=True)
status = jose.Field('status', omitempty=True, decoder=Status.from_json)
# TODO: 'expires' is allowed for Authorization Resources in
# general, but for Key Authorization '[t]he "expires" field MUST
# be absent'... then acme-spec gives example with 'expires'
# present... That's confusing!
expires = fields.RFC3339Field('expires', omitempty=True)
@challenges.decoder
def challenges(value): # pylint: disable=missing-docstring,no-self-argument
return tuple(ChallengeBody.from_json(chall) for chall in value)
@property
def resolved_combinations(self):
"""Combinations with challenges instead of indices."""
return tuple(tuple(self.challenges[idx] for idx in combo)
for combo in self.combinations)
class CertificateRequest(jose.JSONObjectWithFields):
"""ACME new-cert request.
:ivar acme.jose.util.ComparableX509 csr:
`M2Crypto.X509.Request` wrapped in `.ComparableX509`
:ivar tuple authorizations: `tuple` of URIs (`str`)
"""
csr = jose.Field('csr', decoder=jose.decode_csr, encoder=jose.encode_csr)
authorizations = jose.Field('authorizations', decoder=tuple)
class CertificateResource(Resource):
"""Certificate Resource.
:ivar acme.jose.util.ComparableX509 body:
`M2Crypto.X509.X509` wrapped in `.ComparableX509`
:ivar str cert_chain_uri: URI found in the 'up' ``Link`` header
:ivar tuple authzrs: `tuple` of `AuthorizationResource`.
"""
__slots__ = ('body', 'uri', 'cert_chain_uri', 'authzrs')
class Revocation(jose.JSONObjectWithFields):
"""Revocation message.
:ivar revoke: Either a `datetime.datetime` or `Revocation.NOW`.
:ivar tuple authorizations: Same as `CertificateRequest.authorizations`
"""
NOW = 'now'
"""A possible value for `revoke`, denoting that certificate should
be revoked now."""
revoke = jose.Field('revoke')
authorizations = CertificateRequest._fields['authorizations']
@revoke.decoder
def revoke(value): # pylint: disable=missing-docstring,no-self-argument
if value == Revocation.NOW:
return value
else:
return fields.RFC3339Field.default_decoder(value)
@revoke.encoder
def revoke(value): # pylint: disable=missing-docstring,no-self-argument
if value == Revocation.NOW:
return value
else:
return fields.RFC3339Field.default_encoder(value)

View file

@ -1,250 +0,0 @@
"""Tests for acme.messages2."""
import datetime
import os
import pkg_resources
import unittest
import mock
import pytz
from Crypto.PublicKey import RSA
from acme import challenges
from acme import jose
KEY = jose.util.HashableRSAKey(RSA.importKey(pkg_resources.resource_string(
'acme.jose', os.path.join('testdata', 'rsa512_key.pem'))))
class ErrorTest(unittest.TestCase):
"""Tests for acme.messages2.Error."""
def setUp(self):
from acme.messages2 import Error
self.error = Error(detail='foo', typ='malformed', title='title')
self.jobj = {'detail': 'foo', 'title': 'some title'}
def test_typ_prefix(self):
self.assertEqual('malformed', self.error.typ)
self.assertEqual(
'urn:acme:error:malformed', self.error.to_partial_json()['type'])
self.assertEqual(
'malformed', self.error.from_json(self.error.to_partial_json()).typ)
def test_typ_decoder_missing_prefix(self):
from acme.messages2 import Error
self.jobj['type'] = 'malformed'
self.assertRaises(jose.DeserializationError, Error.from_json, self.jobj)
self.jobj['type'] = 'not valid bare type'
self.assertRaises(jose.DeserializationError, Error.from_json, self.jobj)
def test_typ_decoder_not_recognized(self):
from acme.messages2 import Error
self.jobj['type'] = 'urn:acme:error:baz'
self.assertRaises(jose.DeserializationError, Error.from_json, self.jobj)
def test_description(self):
self.assertEqual(
'The request message was malformed', self.error.description)
def test_from_json_hashable(self):
from acme.messages2 import Error
hash(Error.from_json(self.error.to_json()))
def test_str(self):
self.assertEqual(
'malformed :: The request message was malformed :: foo',
str(self.error))
self.assertEqual('foo', str(self.error.update(typ=None)))
class ConstantTest(unittest.TestCase):
"""Tests for acme.messages2._Constant."""
def setUp(self):
from acme.messages2 import _Constant
class MockConstant(_Constant): # pylint: disable=missing-docstring
POSSIBLE_NAMES = {}
self.MockConstant = MockConstant # pylint: disable=invalid-name
self.const_a = MockConstant('a')
self.const_b = MockConstant('b')
def test_to_partial_json(self):
self.assertEqual('a', self.const_a.to_partial_json())
self.assertEqual('b', self.const_b.to_partial_json())
def test_from_json(self):
self.assertEqual(self.const_a, self.MockConstant.from_json('a'))
self.assertRaises(
jose.DeserializationError, self.MockConstant.from_json, 'c')
def test_from_json_hashable(self):
hash(self.MockConstant.from_json('a'))
def test_repr(self):
self.assertEqual('MockConstant(a)', repr(self.const_a))
self.assertEqual('MockConstant(b)', repr(self.const_b))
def test_equality(self):
const_a_prime = self.MockConstant('a')
self.assertFalse(self.const_a == self.const_b)
self.assertTrue(self.const_a == const_a_prime)
self.assertTrue(self.const_a != self.const_b)
self.assertFalse(self.const_a != const_a_prime)
class RegistrationTest(unittest.TestCase):
"""Tests for acme.messages2.Registration."""
def setUp(self):
key = jose.jwk.JWKRSA(key=KEY.publickey())
contact = ('mailto:letsencrypt-client@letsencrypt.org',)
recovery_token = 'XYZ'
agreement = 'https://letsencrypt.org/terms'
from acme.messages2 import Registration
self.reg = Registration(
key=key, contact=contact, recovery_token=recovery_token,
agreement=agreement)
self.jobj_to = {
'contact': contact,
'recoveryToken': recovery_token,
'agreement': agreement,
'key': key,
}
self.jobj_from = self.jobj_to.copy()
self.jobj_from['key'] = key.to_json()
def test_to_partial_json(self):
self.assertEqual(self.jobj_to, self.reg.to_partial_json())
def test_from_json(self):
from acme.messages2 import Registration
self.assertEqual(self.reg, Registration.from_json(self.jobj_from))
def test_from_json_hashable(self):
from acme.messages2 import Registration
hash(Registration.from_json(self.jobj_from))
class ChallengeResourceTest(unittest.TestCase):
"""Tests for acme.messages2.ChallengeResource."""
def test_uri(self):
from acme.messages2 import ChallengeResource
self.assertEqual('http://challb', ChallengeResource(body=mock.MagicMock(
uri='http://challb'), authzr_uri='http://authz').uri)
class ChallengeBodyTest(unittest.TestCase):
"""Tests for acme.messages2.ChallengeBody."""
def setUp(self):
self.chall = challenges.DNS(token='foo')
from acme.messages2 import ChallengeBody
from acme.messages2 import STATUS_VALID
self.status = STATUS_VALID
self.challb = ChallengeBody(
uri='http://challb', chall=self.chall, status=self.status)
self.jobj_to = {
'uri': 'http://challb',
'status': self.status,
'type': 'dns',
'token': 'foo',
}
self.jobj_from = self.jobj_to.copy()
self.jobj_from['status'] = 'valid'
def test_to_partial_json(self):
self.assertEqual(self.jobj_to, self.challb.to_partial_json())
def test_from_json(self):
from acme.messages2 import ChallengeBody
self.assertEqual(self.challb, ChallengeBody.from_json(self.jobj_from))
def test_from_json_hashable(self):
from acme.messages2 import ChallengeBody
hash(ChallengeBody.from_json(self.jobj_from))
def test_proxy(self):
self.assertEqual('foo', self.challb.token)
class AuthorizationTest(unittest.TestCase):
"""Tests for acme.messages2.Authorization."""
def setUp(self):
from acme.messages2 import ChallengeBody
from acme.messages2 import STATUS_VALID
self.challbs = (
ChallengeBody(
uri='http://challb1', status=STATUS_VALID,
chall=challenges.SimpleHTTP(token='IlirfxKKXAsHtmzK29Pj8A')),
ChallengeBody(uri='http://challb2', status=STATUS_VALID,
chall=challenges.DNS(token='DGyRejmCefe7v4NfDGDKfA')),
ChallengeBody(uri='http://challb3', status=STATUS_VALID,
chall=challenges.RecoveryToken()),
)
combinations = ((0, 2), (1, 2))
from acme.messages2 import Authorization
from acme.messages2 import Identifier
from acme.messages2 import IDENTIFIER_FQDN
identifier = Identifier(typ=IDENTIFIER_FQDN, value='example.com')
self.authz = Authorization(
identifier=identifier, combinations=combinations,
challenges=self.challbs)
self.jobj_from = {
'identifier': identifier.to_json(),
'challenges': [challb.to_json() for challb in self.challbs],
'combinations': combinations,
}
def test_from_json(self):
from acme.messages2 import Authorization
Authorization.from_json(self.jobj_from)
def test_from_json_hashable(self):
from acme.messages2 import Authorization
hash(Authorization.from_json(self.jobj_from))
def test_resolved_combinations(self):
self.assertEqual(self.authz.resolved_combinations, (
(self.challbs[0], self.challbs[2]),
(self.challbs[1], self.challbs[2]),
))
class RevocationTest(unittest.TestCase):
"""Tests for acme.messages2.RevocationTest."""
def setUp(self):
from acme.messages2 import Revocation
self.rev_now = Revocation(authorizations=(), revoke=Revocation.NOW)
self.rev_date = Revocation(authorizations=(), revoke=datetime.datetime(
2015, 3, 27, tzinfo=pytz.utc))
self.jobj_now = {'authorizations': (), 'revoke': Revocation.NOW}
self.jobj_date = {'authorizations': (),
'revoke': '2015-03-27T00:00:00Z'}
def test_revoke_decoder(self):
from acme.messages2 import Revocation
self.assertEqual(self.rev_now, Revocation.from_json(self.jobj_now))
self.assertEqual(self.rev_date, Revocation.from_json(self.jobj_date))
def test_revoke_encoder(self):
self.assertEqual(self.jobj_now, self.rev_now.to_partial_json())
self.assertEqual(self.jobj_date, self.rev_date.to_partial_json())
def test_from_json_hashable(self):
from acme.messages2 import Revocation
hash(Revocation.from_json(self.rev_now.to_json()))
if __name__ == '__main__':
unittest.main() # pragma: no cover

View file

@ -1,479 +1,249 @@
"""Tests for acme.messages."""
import datetime
import os
import pkg_resources
import unittest
import Crypto.PublicKey.RSA
import M2Crypto
import mock
import pytz
from Crypto.PublicKey import RSA
from acme import challenges
from acme import errors
from acme import jose
from acme import other
KEY = jose.HashableRSAKey(Crypto.PublicKey.RSA.importKey(
pkg_resources.resource_string(
'acme.jose', os.path.join('testdata', 'rsa512_key.pem'))))
CERT = jose.ComparableX509(M2Crypto.X509.load_cert(
pkg_resources.resource_filename(
'letsencrypt.tests', os.path.join('testdata', 'cert.pem'))))
CSR = jose.ComparableX509(M2Crypto.X509.load_request(
pkg_resources.resource_filename(
'letsencrypt.tests', os.path.join('testdata', 'csr.pem'))))
CSR2 = jose.ComparableX509(M2Crypto.X509.load_request(
pkg_resources.resource_filename(
'acme.jose', os.path.join('testdata', 'csr2.pem'))))
class MessageTest(unittest.TestCase):
"""Tests for acme.messages.Message."""
def setUp(self):
# pylint: disable=missing-docstring,too-few-public-methods
from acme.messages import Message
class MockParentMessage(Message):
# pylint: disable=abstract-method
TYPES = {}
@MockParentMessage.register
class MockMessage(MockParentMessage):
typ = 'test'
schema = {
'type': 'object',
'properties': {
'price': {'type': 'number'},
'name': {'type': 'string'},
},
}
price = jose.Field('price')
name = jose.Field('name')
self.parent_cls = MockParentMessage
self.msg = MockMessage(price=123, name='foo')
def test_from_json_validates(self):
self.assertRaises(errors.SchemaValidationError,
self.parent_cls.from_json,
{'type': 'test', 'price': 'asd'})
class ChallengeTest(unittest.TestCase):
def setUp(self):
challs = (
challenges.SimpleHTTP(token='IlirfxKKXAsHtmzK29Pj8A'),
challenges.DNS(token='DGyRejmCefe7v4NfDGDKfA'),
challenges.RecoveryToken(),
)
combinations = ((0, 2), (1, 2))
from acme.messages import Challenge
self.msg = Challenge(
session_id='aefoGaavieG9Wihuk2aufai3aeZ5EeW4',
nonce='\xec\xd6\xf2oYH\xeb\x13\xd5#q\xe0\xdd\xa2\x92\xa9',
challenges=challs, combinations=combinations)
self.jmsg_to = {
'type': 'challenge',
'sessionID': 'aefoGaavieG9Wihuk2aufai3aeZ5EeW4',
'nonce': '7Nbyb1lI6xPVI3Hg3aKSqQ',
'challenges': challs,
'combinations': combinations,
}
self.jmsg_from = {
'type': 'challenge',
'sessionID': 'aefoGaavieG9Wihuk2aufai3aeZ5EeW4',
'nonce': '7Nbyb1lI6xPVI3Hg3aKSqQ',
'challenges': [chall.to_json() for chall in challs],
'combinations': [[0, 2], [1, 2]], # TODO array tuples
}
def test_resolved_combinations(self):
self.assertEqual(self.msg.resolved_combinations, (
(
challenges.SimpleHTTP(token='IlirfxKKXAsHtmzK29Pj8A'),
challenges.RecoveryToken()
),
(
challenges.DNS(token='DGyRejmCefe7v4NfDGDKfA'),
challenges.RecoveryToken(),
)
))
def test_to_partial_json(self):
self.assertEqual(self.msg.to_partial_json(), self.jmsg_to)
def test_from_json(self):
from acme.messages import Challenge
self.assertEqual(Challenge.from_json(self.jmsg_from), self.msg)
def test_json_without_optionals(self):
del self.jmsg_from['combinations']
del self.jmsg_to['combinations']
from acme.messages import Challenge
msg = Challenge.from_json(self.jmsg_from)
self.assertEqual(msg.combinations, ())
self.assertEqual(msg.to_partial_json(), self.jmsg_to)
class ChallengeRequestTest(unittest.TestCase):
def setUp(self):
from acme.messages import ChallengeRequest
self.msg = ChallengeRequest(identifier='example.com')
self.jmsg = {
'type': 'challengeRequest',
'identifier': 'example.com',
}
def test_to_partial_json(self):
self.assertEqual(self.msg.to_partial_json(), self.jmsg)
def test_from_json(self):
from acme.messages import ChallengeRequest
self.assertEqual(ChallengeRequest.from_json(self.jmsg), self.msg)
class AuthorizationTest(unittest.TestCase):
def setUp(self):
jwk = jose.JWKRSA(key=KEY.publickey())
from acme.messages import Authorization
self.msg = Authorization(recovery_token='tok', jwk=jwk,
identifier='example.com')
self.jmsg = {
'type': 'authorization',
'recoveryToken': 'tok',
'identifier': 'example.com',
'jwk': jwk,
}
def test_to_partial_json(self):
self.assertEqual(self.msg.to_partial_json(), self.jmsg)
def test_from_json(self):
self.jmsg['jwk'] = self.jmsg['jwk'].to_partial_json()
from acme.messages import Authorization
self.assertEqual(Authorization.from_json(self.jmsg), self.msg)
def test_json_without_optionals(self):
del self.jmsg['recoveryToken']
del self.jmsg['identifier']
del self.jmsg['jwk']
from acme.messages import Authorization
msg = Authorization.from_json(self.jmsg)
self.assertTrue(msg.recovery_token is None)
self.assertTrue(msg.identifier is None)
self.assertTrue(msg.jwk is None)
self.assertEqual(self.jmsg, msg.to_partial_json())
class AuthorizationRequestTest(unittest.TestCase):
def setUp(self):
self.responses = (
challenges.SimpleHTTPResponse(path='Hf5GrX4Q7EBax9hc2jJnfw'),
None, # null
challenges.RecoveryTokenResponse(token='23029d88d9e123e'),
)
self.contact = ("mailto:cert-admin@example.com", "tel:+12025551212")
signature = other.Signature(
alg=jose.RS256, jwk=jose.JWKRSA(key=KEY.publickey()),
sig='-v\xd8\xc2\xa3\xba0\xd6\x92\x16\xb5.\xbe\xa1[\x04\xbe'
'\x1b\xa1X\xd2)\x18\x94\x8f\xd7\xd0\xc0\xbbcI`W\xdf v'
'\xe4\xed\xe8\x03J\xe8\xc8<?\xc8W\x94\x94cj(\xe7\xaa$'
'\x92\xe9\x96\x11\xc2\xefx\x0bR',
nonce='\xab?\x08o\xe6\x81$\x9f\xa1\xc9\x025\x1c\x1b\xa5+')
from acme.messages import AuthorizationRequest
self.msg = AuthorizationRequest(
session_id='aefoGaavieG9Wihuk2aufai3aeZ5EeW4',
nonce='\xec\xd6\xf2oYH\xeb\x13\xd5#q\xe0\xdd\xa2\x92\xa9',
responses=self.responses,
signature=signature,
contact=self.contact,
)
self.jmsg_to = {
'type': 'authorizationRequest',
'sessionID': 'aefoGaavieG9Wihuk2aufai3aeZ5EeW4',
'nonce': '7Nbyb1lI6xPVI3Hg3aKSqQ',
'responses': self.responses,
'signature': signature,
'contact': self.contact,
}
self.jmsg_from = {
'type': 'authorizationRequest',
'sessionID': 'aefoGaavieG9Wihuk2aufai3aeZ5EeW4',
'nonce': '7Nbyb1lI6xPVI3Hg3aKSqQ',
'responses': [None if response is None else response.to_json()
for response in self.responses],
'signature': signature.to_json(),
# TODO: schema validation doesn't recognize tuples as
# arrays :(
'contact': list(self.contact),
}
def test_create(self):
from acme.messages import AuthorizationRequest
self.assertEqual(self.msg, AuthorizationRequest.create(
name='example.com', key=KEY, responses=self.responses,
nonce='\xec\xd6\xf2oYH\xeb\x13\xd5#q\xe0\xdd\xa2\x92\xa9',
session_id='aefoGaavieG9Wihuk2aufai3aeZ5EeW4',
sig_nonce='\xab?\x08o\xe6\x81$\x9f\xa1\xc9\x025\x1c\x1b\xa5+',
contact=self.contact))
def test_verify(self):
self.assertTrue(self.msg.verify('example.com'))
def test_to_partial_json(self):
self.assertEqual(self.msg.to_partial_json(), self.jmsg_to)
def test_from_json(self):
from acme.messages import AuthorizationRequest
self.assertEqual(
self.msg, AuthorizationRequest.from_json(self.jmsg_from))
def test_json_without_optionals(self):
del self.jmsg_from['contact']
del self.jmsg_to['contact']
from acme.messages import AuthorizationRequest
msg = AuthorizationRequest.from_json(self.jmsg_from)
self.assertEqual(msg.contact, ())
self.assertEqual(self.jmsg_to, msg.to_partial_json())
class CertificateTest(unittest.TestCase):
def setUp(self):
refresh = 'https://example.com/refresh/Dr8eAwTVQfSS/'
from acme.messages import Certificate
self.msg = Certificate(
certificate=CERT, chain=(CERT,), refresh=refresh)
self.jmsg_to = {
'type': 'certificate',
'certificate': jose.b64encode(CERT.as_der()),
'chain': (jose.b64encode(CERT.as_der()),),
'refresh': refresh,
}
self.jmsg_from = self.jmsg_to.copy()
# TODO: schema validation array tuples
self.jmsg_from['chain'] = list(self.jmsg_from['chain'])
def test_to_partial_json(self):
self.assertEqual(self.msg.to_partial_json(), self.jmsg_to)
def test_from_json(self):
from acme.messages import Certificate
self.assertEqual(Certificate.from_json(self.jmsg_from), self.msg)
def test_json_without_optionals(self):
del self.jmsg_from['chain']
del self.jmsg_from['refresh']
del self.jmsg_to['chain']
del self.jmsg_to['refresh']
from acme.messages import Certificate
msg = Certificate.from_json(self.jmsg_from)
self.assertEqual(msg.chain, ())
self.assertTrue(msg.refresh is None)
self.assertEqual(self.jmsg_to, msg.to_partial_json())
class CertificateRequestTest(unittest.TestCase):
def setUp(self):
signature = other.Signature(
alg=jose.RS256, jwk=jose.JWKRSA(key=KEY.publickey()),
sig='\x15\xed\x84\xaa:\xf2DO\x0e9 \xbcg\xf8\xc0\xcf\x87\x9a'
'\x95\xeb\xffT[\x84[\xec\x85\x7f\x8eK\xe9\xc2\x12\xc8Q'
'\xafo\xc6h\x07\xba\xa6\xdf\xd1\xa7"$\xba=Z\x13n\x14\x0b'
'k\xfe\xee\xb4\xe4\xc8\x05\x9a\x08\xa7',
nonce='\xec\xd6\xf2oYH\xeb\x13\xd5#q\xe0\xdd\xa2\x92\xa9')
from acme.messages import CertificateRequest
self.msg = CertificateRequest(csr=CSR, signature=signature)
self.jmsg_to = {
'type': 'certificateRequest',
'csr': jose.b64encode(CSR.as_der()),
'signature': signature,
}
self.jmsg_from = self.jmsg_to.copy()
self.jmsg_from['signature'] = self.jmsg_from['signature'].to_json()
def test_create(self):
from acme.messages import CertificateRequest
self.assertEqual(self.msg, CertificateRequest.create(
csr=CSR, key=KEY,
sig_nonce='\xec\xd6\xf2oYH\xeb\x13\xd5#q\xe0\xdd\xa2\x92\xa9'))
def test_verify(self):
self.assertTrue(self.msg.verify())
def test_to_partial_json(self):
self.assertEqual(self.msg.to_partial_json(), self.jmsg_to)
def test_from_json(self):
from acme.messages import CertificateRequest
self.assertEqual(self.msg, CertificateRequest.from_json(self.jmsg_from))
class DeferTest(unittest.TestCase):
def setUp(self):
from acme.messages import Defer
self.msg = Defer(
token='O7-s9MNq1siZHlgrMzi9_A', interval=60,
message='Warming up the HSM')
self.jmsg = {
'type': 'defer',
'token': 'O7-s9MNq1siZHlgrMzi9_A',
'interval': 60,
'message': 'Warming up the HSM',
}
def test_to_partial_json(self):
self.assertEqual(self.msg.to_partial_json(), self.jmsg)
def test_from_json(self):
from acme.messages import Defer
self.assertEqual(Defer.from_json(self.jmsg), self.msg)
def test_json_without_optionals(self):
del self.jmsg['interval']
del self.jmsg['message']
from acme.messages import Defer
msg = Defer.from_json(self.jmsg)
self.assertTrue(msg.interval is None)
self.assertTrue(msg.message is None)
self.assertEqual(self.jmsg, msg.to_partial_json())
KEY = jose.util.HashableRSAKey(RSA.importKey(pkg_resources.resource_string(
'acme.jose', os.path.join('testdata', 'rsa512_key.pem'))))
class ErrorTest(unittest.TestCase):
"""Tests for acme.messages.Error."""
def setUp(self):
from acme.messages import Error
self.msg = Error(
error='badCSR', message='RSA keys must be at least 2048 bits long',
more_info='https://ca.example.com/documentation/csr-requirements')
self.error = Error(detail='foo', typ='malformed', title='title')
self.jobj = {'detail': 'foo', 'title': 'some title'}
self.jmsg = {
'type': 'error',
'error': 'badCSR',
'message':'RSA keys must be at least 2048 bits long',
'moreInfo': 'https://ca.example.com/documentation/csr-requirements',
}
def test_typ_prefix(self):
self.assertEqual('malformed', self.error.typ)
self.assertEqual(
'urn:acme:error:malformed', self.error.to_partial_json()['type'])
self.assertEqual(
'malformed', self.error.from_json(self.error.to_partial_json()).typ)
def test_typ_decoder_missing_prefix(self):
from acme.messages import Error
self.jobj['type'] = 'malformed'
self.assertRaises(jose.DeserializationError, Error.from_json, self.jobj)
self.jobj['type'] = 'not valid bare type'
self.assertRaises(jose.DeserializationError, Error.from_json, self.jobj)
def test_typ_decoder_not_recognized(self):
from acme.messages import Error
self.jobj['type'] = 'urn:acme:error:baz'
self.assertRaises(jose.DeserializationError, Error.from_json, self.jobj)
def test_description(self):
self.assertEqual(
'The request message was malformed', self.error.description)
def test_from_json_hashable(self):
from acme.messages import Error
hash(Error.from_json(self.error.to_json()))
def test_str(self):
self.assertEqual(
'malformed :: The request message was malformed :: foo',
str(self.error))
self.assertEqual('foo', str(self.error.update(typ=None)))
class ConstantTest(unittest.TestCase):
"""Tests for acme.messages._Constant."""
def setUp(self):
from acme.messages import _Constant
class MockConstant(_Constant): # pylint: disable=missing-docstring
POSSIBLE_NAMES = {}
self.MockConstant = MockConstant # pylint: disable=invalid-name
self.const_a = MockConstant('a')
self.const_b = MockConstant('b')
def test_to_partial_json(self):
self.assertEqual(self.msg.to_partial_json(), self.jmsg)
self.assertEqual('a', self.const_a.to_partial_json())
self.assertEqual('b', self.const_b.to_partial_json())
def test_from_json(self):
from acme.messages import Error
self.assertEqual(Error.from_json(self.jmsg), self.msg)
self.assertEqual(self.const_a, self.MockConstant.from_json('a'))
self.assertRaises(
jose.DeserializationError, self.MockConstant.from_json, 'c')
def test_json_without_optionals(self):
del self.jmsg['message']
del self.jmsg['moreInfo']
def test_from_json_hashable(self):
hash(self.MockConstant.from_json('a'))
from acme.messages import Error
msg = Error.from_json(self.jmsg)
def test_repr(self):
self.assertEqual('MockConstant(a)', repr(self.const_a))
self.assertEqual('MockConstant(b)', repr(self.const_b))
self.assertTrue(msg.message is None)
self.assertTrue(msg.more_info is None)
self.assertEqual(self.jmsg, msg.to_partial_json())
def test_equality(self):
const_a_prime = self.MockConstant('a')
self.assertFalse(self.const_a == self.const_b)
self.assertTrue(self.const_a == const_a_prime)
self.assertTrue(self.const_a != self.const_b)
self.assertFalse(self.const_a != const_a_prime)
class RegistrationTest(unittest.TestCase):
"""Tests for acme.messages.Registration."""
def setUp(self):
key = jose.jwk.JWKRSA(key=KEY.publickey())
contact = ('mailto:letsencrypt-client@letsencrypt.org',)
recovery_token = 'XYZ'
agreement = 'https://letsencrypt.org/terms'
from acme.messages import Registration
self.reg = Registration(
key=key, contact=contact, recovery_token=recovery_token,
agreement=agreement)
self.jobj_to = {
'contact': contact,
'recoveryToken': recovery_token,
'agreement': agreement,
'key': key,
}
self.jobj_from = self.jobj_to.copy()
self.jobj_from['key'] = key.to_json()
def test_to_partial_json(self):
self.assertEqual(self.jobj_to, self.reg.to_partial_json())
def test_from_json(self):
from acme.messages import Registration
self.assertEqual(self.reg, Registration.from_json(self.jobj_from))
def test_from_json_hashable(self):
from acme.messages import Registration
hash(Registration.from_json(self.jobj_from))
class ChallengeResourceTest(unittest.TestCase):
"""Tests for acme.messages.ChallengeResource."""
def test_uri(self):
from acme.messages import ChallengeResource
self.assertEqual('http://challb', ChallengeResource(body=mock.MagicMock(
uri='http://challb'), authzr_uri='http://authz').uri)
class ChallengeBodyTest(unittest.TestCase):
"""Tests for acme.messages.ChallengeBody."""
def setUp(self):
self.chall = challenges.DNS(token='foo')
from acme.messages import ChallengeBody
from acme.messages import STATUS_VALID
self.status = STATUS_VALID
self.challb = ChallengeBody(
uri='http://challb', chall=self.chall, status=self.status)
self.jobj_to = {
'uri': 'http://challb',
'status': self.status,
'type': 'dns',
'token': 'foo',
}
self.jobj_from = self.jobj_to.copy()
self.jobj_from['status'] = 'valid'
def test_to_partial_json(self):
self.assertEqual(self.jobj_to, self.challb.to_partial_json())
def test_from_json(self):
from acme.messages import ChallengeBody
self.assertEqual(self.challb, ChallengeBody.from_json(self.jobj_from))
def test_from_json_hashable(self):
from acme.messages import ChallengeBody
hash(ChallengeBody.from_json(self.jobj_from))
def test_proxy(self):
self.assertEqual('foo', self.challb.token)
class AuthorizationTest(unittest.TestCase):
"""Tests for acme.messages.Authorization."""
def setUp(self):
from acme.messages import ChallengeBody
from acme.messages import STATUS_VALID
self.challbs = (
ChallengeBody(
uri='http://challb1', status=STATUS_VALID,
chall=challenges.SimpleHTTP(token='IlirfxKKXAsHtmzK29Pj8A')),
ChallengeBody(uri='http://challb2', status=STATUS_VALID,
chall=challenges.DNS(token='DGyRejmCefe7v4NfDGDKfA')),
ChallengeBody(uri='http://challb3', status=STATUS_VALID,
chall=challenges.RecoveryToken()),
)
combinations = ((0, 2), (1, 2))
from acme.messages import Authorization
from acme.messages import Identifier
from acme.messages import IDENTIFIER_FQDN
identifier = Identifier(typ=IDENTIFIER_FQDN, value='example.com')
self.authz = Authorization(
identifier=identifier, combinations=combinations,
challenges=self.challbs)
self.jobj_from = {
'identifier': identifier.to_json(),
'challenges': [challb.to_json() for challb in self.challbs],
'combinations': combinations,
}
def test_from_json(self):
from acme.messages import Authorization
Authorization.from_json(self.jobj_from)
def test_from_json_hashable(self):
from acme.messages import Authorization
hash(Authorization.from_json(self.jobj_from))
def test_resolved_combinations(self):
self.assertEqual(self.authz.resolved_combinations, (
(self.challbs[0], self.challbs[2]),
(self.challbs[1], self.challbs[2]),
))
class RevocationTest(unittest.TestCase):
"""Tests for acme.messages.RevocationTest."""
def setUp(self):
from acme.messages import Revocation
self.msg = Revocation()
self.jmsg = {'type': 'revocation'}
self.rev_now = Revocation(authorizations=(), revoke=Revocation.NOW)
self.rev_date = Revocation(authorizations=(), revoke=datetime.datetime(
2015, 3, 27, tzinfo=pytz.utc))
self.jobj_now = {'authorizations': (), 'revoke': Revocation.NOW}
self.jobj_date = {'authorizations': (),
'revoke': '2015-03-27T00:00:00Z'}
def test_to_partial_json(self):
self.assertEqual(self.msg.to_partial_json(), self.jmsg)
def test_from_json(self):
def test_revoke_decoder(self):
from acme.messages import Revocation
self.assertEqual(Revocation.from_json(self.jmsg), self.msg)
self.assertEqual(self.rev_now, Revocation.from_json(self.jobj_now))
self.assertEqual(self.rev_date, Revocation.from_json(self.jobj_date))
def test_revoke_encoder(self):
self.assertEqual(self.jobj_now, self.rev_now.to_partial_json())
self.assertEqual(self.jobj_date, self.rev_date.to_partial_json())
class RevocationRequestTest(unittest.TestCase):
def setUp(self):
self.sig_nonce = '\xec\xd6\xf2oYH\xeb\x13\xd5#q\xe0\xdd\xa2\x92\xa9'
signature = other.Signature(
alg=jose.RS256, jwk=jose.JWKRSA(key=KEY.publickey()),
sig='eJ\xfe\x12"U\x87\x8b\xbf/ ,\xdeP\xb2\xdc1\xb00\xe5\x1dB'
'\xfch<\xc6\x9eH@!\x1c\x16\xb2\x0b_\xc4\xddP\x89\xc8\xce?'
'\x16g\x069I\xb9\xb3\x91\xb9\x0e$3\x9f\x87\x8e\x82\xca\xc5'
's\xd9\xd0\xe7',
nonce=self.sig_nonce)
from acme.messages import RevocationRequest
self.msg = RevocationRequest(certificate=CERT, signature=signature)
self.jmsg_to = {
'type': 'revocationRequest',
'certificate': jose.b64encode(CERT.as_der()),
'signature': signature,
}
self.jmsg_from = self.jmsg_to.copy()
self.jmsg_from['signature'] = self.jmsg_from['signature'].to_json()
def test_create(self):
from acme.messages import RevocationRequest
self.assertEqual(self.msg, RevocationRequest.create(
certificate=CERT, key=KEY, sig_nonce=self.sig_nonce))
def test_verify(self):
self.assertTrue(self.msg.verify())
def test_to_partial_json(self):
self.assertEqual(self.msg.to_partial_json(), self.jmsg_to)
def test_from_json(self):
from acme.messages import RevocationRequest
self.assertEqual(self.msg, RevocationRequest.from_json(self.jmsg_from))
class StatusRequestTest(unittest.TestCase):
def setUp(self):
from acme.messages import StatusRequest
self.msg = StatusRequest(token=u'O7-s9MNq1siZHlgrMzi9_A')
self.jmsg = {
'type': 'statusRequest',
'token': u'O7-s9MNq1siZHlgrMzi9_A',
}
def test_to_partial_json(self):
self.assertEqual(self.msg.to_partial_json(), self.jmsg)
def test_from_json(self):
from acme.messages import StatusRequest
self.assertEqual(StatusRequest.from_json(self.jmsg), self.msg)
def test_from_json_hashable(self):
from acme.messages import Revocation
hash(Revocation.from_json(self.rev_now.to_json()))
if __name__ == '__main__':

View file

@ -1,21 +0,0 @@
{
"id": "https://letsencrypt.org/schema/01/authorization#",
"$schema": "http://json-schema.org/draft-04/schema#",
"description": "Schema for an authorization message",
"type": "object",
"required": ["type"],
"properties": {
"type" : {
"enum" : [ "authorization" ]
},
"recoveryToken" : {
"type": "string"
},
"identifier" : {
"type": "string"
},
"jwk": {
"$ref": "file:acme/schemata/jwk.json"
}
}
}

View file

@ -1,38 +0,0 @@
{
"id": "https://letsencrypt.org/schema/01/authorizationRequest#",
"$schema": "http://json-schema.org/draft-04/schema#",
"description": "Schema for an authorizationRequest message",
"type": "object",
"required": ["type", "sessionID", "nonce", "signature", "responses"],
"properties": {
"type" : {
"enum" : [ "authorizationRequest" ]
},
"sessionID" : {
"type" : "string"
},
"nonce" : {
"type": "string"
},
"signature" : {
"$ref": "file:acme/schemata/signature.json"
},
"responses": {
"type": "array",
"minItems": 1,
"items": {
"anyOf": [
{ "$ref": "file:acme/schemata/responseobject.json" },
{ "type": "null" }
]
}
},
"contact": {
"type": "array",
"minItems": 1,
"items": {
"type": "string"
}
}
}
}

View file

@ -1,25 +0,0 @@
{
"id": "https://letsencrypt.org/schema/01/certificate#",
"$schema": "http://json-schema.org/draft-04/schema#",
"description": "Schema for a certificate message",
"type": "object",
"required": ["type", "certificate"],
"properties": {
"type" : {
"enum" : [ "certificate" ]
},
"certificate" : {
"type" : "string"
},
"chain" : {
"type": "array",
"minItems": 1,
"items": {
"type": "string"
}
},
"refresh" : {
"type": "string"
}
}
}

View file

@ -1,19 +0,0 @@
{
"id": "https://letsencrypt.org/schema/01/certificateRequest#",
"$schema": "http://json-schema.org/draft-04/schema#",
"description": "Schema for a certificateRequest message",
"type": "object",
"required": ["type", "csr", "signature"],
"properties": {
"type" : {
"enum" : [ "certificateRequest" ]
},
"csr" : {
"type" : "string" ,
"pattern": "^[-_=0-9A-Za-z]+$"
},
"signature" : {
"$ref": "file:acme/schemata/signature.json"
}
}
}

View file

@ -1,36 +0,0 @@
{
"id": "https://letsencrypt.org/schema/01/challenge#",
"$schema": "http://json-schema.org/draft-04/schema#",
"description": "Schema for a challenge message",
"type": "object",
"required": ["type", "sessionID", "nonce", "challenges"],
"properties": {
"type" : {
"enum" : [ "challenge" ]
},
"sessionID" : {
"type" : "string"
},
"nonce" : {
"type": "string"
},
"challenges": {
"type": "array",
"minItems": 1,
"items": {
"$ref": "file:acme/schemata/challengeobject.json"
}
},
"combinations": {
"type": "array",
"minItems": 1,
"items": {
"type": "array",
"minItems": 1,
"items": {
"type": "integer"
}
}
}
}
}

View file

@ -1,15 +0,0 @@
{
"id": "https://letsencrypt.org/schema/01/challengeRequest#",
"$schema": "http://json-schema.org/draft-04/schema#",
"description": "Schema for a challengeRequest message",
"type": "object",
"required": ["type", "identifier"],
"properties": {
"type" : {
"enum" : [ "challengeRequest" ]
},
"identifier" : {
"type": "string"
}
}
}

View file

@ -1,130 +0,0 @@
{
"id": "https://letsencrypt.org/schema/01/challengeobject#",
"$schema": "http://json-schema.org/draft-04/schema#",
"description": "Subschema for an individual challenge (within challenge)",
"anyOf": [
{ "type": "object",
"required": ["type", "token"],
"properties": {
"type": {
"enum" : [ "simpleHttp" ]
},
"token": {
"type": "string"
}
}
},
{ "type": "object",
"required": ["type", "r", "nonce"],
"properties": {
"type": {
"enum" : [ "dvsni" ]
},
"r": {
"type" : [ "string" ],
"pattern": "^[-_=0-9A-Za-z]+$"
},
"nonce": {
"type": "string",
"pattern": "^[0-9a-f]+$"
}
}
},
{ "type": "object",
"required": ["type"],
"properties": {
"type": {
"enum" : [ "recoveryContact" ]
},
"activationURL": {
"type" : "string"
},
"successURL": {
"type": "string"
},
"contact": {
"type": "string"
}
}
},
{ "type": "object",
"required": ["type"],
"properties": {
"type": {
"enum" : [ "recoveryToken" ]
}
}
},
{ "type": "object",
"required": ["type", "alg", "nonce", "hints"],
"properties": {
"type": {
"enum" : [ "proofOfPossession" ]
},
"alg": {
"type": "string"
},
"nonce": {
"type": "string",
"pattern": "^[-_=0-9A-Za-z]+$"
},
"hints": {
"type": "object",
"properties": {
"jwk": {
"type": "object"
},
"certFingerprints": {
"type": "array",
"minItems": 1,
"items": {
"type": "string",
"pattern": "^[0-9a-f]+$"
}
},
"subjectKeyIdentifiers": {
"type": "array",
"minItems": 1,
"items": {
"type": "string",
"pattern": "^[0-9a-f]+$"
}
},
"serialNumbers": {
"type": "array",
"minItems": 1,
"items": {
"type": "integer"
}
},
"issuers": {
"type": "array",
"minItems": 1,
"items": {
"type": "string"
}
},
"authorizedFor": {
"type": "array",
"minItems": 1,
"items": {
"type": "string"
}
}
}
}
}
},
{ "type": "object",
"required": ["type", "token"],
"properties": {
"type": {
"enum" : [ "dns" ]
},
"token": {
"type": "string"
}
}
}
]
}

View file

@ -1,21 +0,0 @@
{
"id": "https://letsencrypt.org/schema/01/defer#",
"$schema": "http://json-schema.org/draft-04/schema#",
"description": "Schema for a defer message",
"type": "object",
"required": ["type", "token"],
"properties": {
"type" : {
"enum" : [ "defer" ]
},
"token" : {
"type": "string"
},
"interval" : {
"type": "integer"
},
"message": {
"type": "string"
}
}
}

View file

@ -1,21 +0,0 @@
{
"id": "https://letsencrypt.org/schema/01/error#",
"$schema": "http://json-schema.org/draft-04/schema#",
"description": "Schema for an error message",
"type": "object",
"required": ["type", "error"],
"properties": {
"type" : {
"enum" : [ "error" ]
},
"error" : {
"enum" : [ "malformed", "unauthorized", "serverInternal", "nonSupported", "unknown", "badCSR" ]
},
"message" : {
"type": "string"
},
"moreInfo": {
"type": "string"
}
}
}

View file

@ -1,19 +0,0 @@
{
"id": "https://letsencrypt.org/schema/01/jwk#",
"$schema": "http://json-schema.org/draft-04/schema#",
"description": "Schema for a jwk (**kty RSA/e=65537 ONLY**)",
"type": "object",
"required": ["kty", "e", "n"],
"properties": {
"kty": {
"enum" : [ "RSA" ]
},
"e": {
"enum" : [ "AQAB" ]
},
"n": {
"type": "string",
"pattern": "^[-_=0-9A-Za-z]+$"
}
}
}

View file

@ -1,75 +0,0 @@
{
"id": "https://letsencrypt.org/schema/01/responseobject#",
"$schema": "http://json-schema.org/draft-04/schema#",
"description": "Subschema for an individual challenge response (within authorizationRequest)",
"anyOf": [
{ "type": "object",
"required": ["type", "path"],
"properties": {
"type": {
"enum" : [ "simpleHttp" ]
},
"path": {
"type": "string"
}
}
},
{ "type": "object",
"required": ["type", "s"],
"properties": {
"type": {
"enum" : [ "dvsni" ]
},
"s": {
"type" : [ "string" ],
"pattern": "^[-_=0-9A-Za-z]+$"
}
}
},
{ "type": "object",
"required": ["type"],
"properties": {
"type": {
"enum" : [ "recoveryContact" ]
},
"token": {
"type" : "string"
}
}
},
{ "type": "object",
"required": ["type"],
"properties": {
"type": {
"enum" : [ "recoveryToken" ]
},
"token": {
"type" : "string"
}
}
},
{ "type": "object",
"required": ["type", "nonce", "signature"],
"properties": {
"type": {
"enum" : [ "proofOfPossession" ]
},
"nonce": {
"type": "string",
"pattern": "^[-_=0-9A-Za-z]+$"
},
"signature": {
"$ref": "file:acme/schemata/signature.json"
}
}
},
{ "type": "object",
"required": ["type"],
"properties": {
"type": {
"enum" : [ "dns" ]
}
}
}
]
}

View file

@ -1,12 +0,0 @@
{
"id": "https://letsencrypt.org/schema/01/revocation#",
"$schema": "http://json-schema.org/draft-04/schema#",
"description": "Schema for a revocation message",
"type": "object",
"required": ["type"],
"properties": {
"type" : {
"enum" : [ "revocation" ]
}
}
}

View file

@ -1,18 +0,0 @@
{
"id": "https://letsencrypt.org/schema/01/revocationRequest#",
"$schema": "http://json-schema.org/draft-04/schema#",
"description": "Schema for a revocationRequest message",
"type": "object",
"required": ["type", "certificate", "signature"],
"properties": {
"type" : {
"enum" : [ "revocationRequest" ]
},
"certificate" : {
"type" : "string"
},
"signature" : {
"$ref": "file:acme/schemata/signature.json"
}
}
}

View file

@ -1,71 +0,0 @@
{
"id": "https://letsencrypt.org/schema/01/signature#",
"$schema": "http://json-schema.org/draft-04/schema#",
"description": "Schema for a signature (alg RS256/e=65537 or P-256 ONLY)",
"type": "object",
"required": ["alg", "nonce", "sig", "jwk"],
"properties": {
"anyOf": [
{
"alg" : {
"enum" : [ "RS256" ]
},
"nonce" : {
"type" : "string"
},
"sig" : {
"type": "string",
"pattern": "^[-_=0-9A-Za-z]+$"
},
"jwk": {
"type": "object",
"required": ["kty", "e", "n"],
"properties": {
"kty": {
"enum" : [ "RSA" ]
},
"e": {
"enum" : [ "AQAB" ]
},
"n": {
"type": "string",
"pattern": "^[-_=0-9A-Za-z]+$"
}
}
}
},
{
"alg" : {
"enum" : [ "ES256" ]
},
"nonce" : {
"type" : "string"
},
"sig" : {
"type": "string",
"pattern": "^[-_=0-9A-Za-z]+$"
},
"jwk": {
"type": "object",
"required": ["kty", "crv", "x", "y"],
"properties": {
"kty": {
"enum" : [ "EC" ]
},
"crv": {
"enum" : [ "P-256" ]
},
"x": {
"type": "string",
"pattern": "^[-_=0-9A-Za-z]+$"
},
"y": {
"type": "string",
"pattern": "^[-_=0-9A-Za-z]+$"
}
}
}
}
]
}
}

View file

@ -1,15 +0,0 @@
{
"id": "https://letsencrypt.org/schema/01/statusRequest#",
"$schema": "http://json-schema.org/draft-04/schema#",
"description": "Schema for a statusRequest message",
"type": "object",
"required": ["type", "token"],
"properties": {
"type" : {
"enum" : [ "statusRequest" ]
},
"token" : {
"type": "string"
}
}
}

View file

@ -1,5 +0,0 @@
:mod:`letsencrypt.network2`
---------------------------
.. automodule:: letsencrypt.network2
:members:

View file

@ -7,21 +7,19 @@
:members:
Client
------
.. automodule:: acme.client
:members:
Messages
--------
v00
~~~
.. automodule:: acme.messages
:members:
v02
~~~
.. automodule:: acme.messages2
:members:
Challenges
----------

45
examples/acme_client.py Normal file
View file

@ -0,0 +1,45 @@
"""Example script showing how to use acme client API."""
import logging
import os
import pkg_resources
import Crypto.PublicKey.RSA
import M2Crypto
from acme import client
from acme import messages
from acme import jose
logging.basicConfig(level=logging.DEBUG)
NEW_REG_URL = 'https://www.letsencrypt-demo.org/acme/new-reg'
BITS = 2048 # minimum for Boulder
DOMAIN = 'example1.com' # example.com is ignored by Boulder
key = jose.JWKRSA.load(
Crypto.PublicKey.RSA.generate(BITS).exportKey(format="PEM"))
acme = client.Client(NEW_REG_URL, key)
regr = acme.register(contact=())
logging.info('Auto-accepting TOS: %s', regr.terms_of_service)
acme.update_registration(regr.update(
body=regr.body.update(agreement=regr.terms_of_service)))
logging.debug(regr)
authzr = acme.request_challenges(
identifier=messages.Identifier(typ=messages.IDENTIFIER_FQDN, value=DOMAIN),
new_authzr_uri=regr.new_authzr_uri)
logging.debug(authzr)
authzr, authzr_response = acme.poll(authzr)
csr = M2Crypto.X509.load_request_string(pkg_resources.resource_string(
'acme.jose', os.path.join('testdata', 'csr.der')),
M2Crypto.X509.FORMAT_DER)
try:
acme.request_issuance(csr, (authzr,))
except messages.Error as error:
print ("This script is doomed to fail as no authorization "
"challenges are ever solved. Error from server: {0}".format(error))

View file

@ -1,42 +0,0 @@
import logging
import os
import pkg_resources
import M2Crypto
from acme import messages2
from acme import jose
from letsencrypt import network2
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
NEW_REG_URL = 'https://www.letsencrypt-demo.org/acme/new-reg'
key = jose.JWKRSA.load(pkg_resources.resource_string(
'acme.jose', os.path.join('testdata', 'rsa512_key.pem')))
net = network2.Network(NEW_REG_URL, key)
regr = net.register(contact=(
'mailto:cert-admin@example.com', 'tel:+12025551212'))
logging.info('Auto-accepting TOS: %s', regr.terms_of_service)
net.update_registration(regr.update(
body=regr.body.update(agreement=regr.terms_of_service)))
logging.debug(regr)
authzr = net.request_challenges(
identifier=messages2.Identifier(
typ=messages2.IDENTIFIER_FQDN, value='example1.com'),
new_authzr_uri=regr.new_authzr_uri)
logging.debug(authzr)
authzr, authzr_response = net.poll(authzr)
csr = M2Crypto.X509.load_request_string(pkg_resources.resource_string(
'letsencrypt.tests', os.path.join('testdata', 'csr.pem')))
try:
net.request_issuance(csr, (authzr,))
except messages2.Error as error:
print error.detail

View file

@ -6,7 +6,7 @@ import re
import configobj
import zope.component
from acme import messages2
from acme import messages
from letsencrypt import crypto_util
from letsencrypt import errors
@ -28,7 +28,7 @@ class Account(object):
:ivar str phone: Client's phone number
:ivar regr: Registration Resource
:type regr: :class:`~acme.messages2.RegistrationResource`
:type regr: :class:`~acme.messages.RegistrationResource`
"""
@ -141,11 +141,11 @@ class Account(object):
if "RegistrationResource" in acc_config:
acc_config_rr = acc_config["RegistrationResource"]
regr = messages2.RegistrationResource(
regr = messages.RegistrationResource(
uri=acc_config_rr["uri"],
new_authzr_uri=acc_config_rr["new_authzr_uri"],
terms_of_service=acc_config_rr["terms_of_service"],
body=messages2.Registration.from_json(acc_config_rr["body"]))
body=messages.Registration.from_json(acc_config_rr["body"]))
else:
regr = None

View file

@ -5,11 +5,11 @@ Please use names such as ``achall`` to distiguish from variables "of type"
and :class:`.ChallengeBody` (denoted by ``challb``)::
from acme import challenges
from acme import messages2
from acme import messages
from letsencrypt import achallenges
chall = challenges.DNS(token='foo')
challb = messages2.ChallengeBody(chall=chall)
challb = messages.ChallengeBody(chall=chall)
achall = achallenges.DNS(chall=challb, domain='example.com')
Note, that all annotated challenges act as a proxy objects::

View file

@ -4,7 +4,7 @@ import logging
import time
from acme import challenges
from acme import messages2
from acme import messages
from letsencrypt import achallenges
from letsencrypt import constants
@ -24,13 +24,13 @@ class AuthHandler(object):
:ivar network: Network object for sending and receiving authorization
messages
:type network: :class:`letsencrypt.network2.Network`
:type network: :class:`letsencrypt.network.Network`
:ivar account: Client's Account
:type account: :class:`letsencrypt.account.Account`
:ivar dict authzr: ACME Authorization Resource dict where keys are domains
and values are :class:`acme.messages2.AuthorizationResource`
and values are :class:`acme.messages.AuthorizationResource`
:ivar list dv_c: DV challenges in the form of
:class:`letsencrypt.achallenges.AnnotatedChallenge`
:ivar list cont_c: Continuity challenges in the
@ -82,7 +82,7 @@ class AuthHandler(object):
self.verify_authzr_complete()
# Only return valid authorizations
return [authzr for authzr in self.authzr.values()
if authzr.body.status == messages2.STATUS_VALID]
if authzr.body.status == messages.STATUS_VALID]
def _choose_challenges(self, domains):
"""Retrieve necessary challenges to satisfy server."""
@ -198,7 +198,7 @@ class AuthHandler(object):
failed = []
self.authzr[domain], _ = self.network.poll(self.authzr[domain])
if self.authzr[domain].body.status == messages2.STATUS_VALID:
if self.authzr[domain].body.status == messages.STATUS_VALID:
return achalls, []
# Note: if the whole authorization is invalid, the individual failed
@ -207,9 +207,9 @@ class AuthHandler(object):
status = self._get_chall_status(self.authzr[domain], achall)
# This does nothing for challenges that have yet to be decided yet.
if status == messages2.STATUS_VALID:
if status == messages.STATUS_VALID:
completed.append(achall)
elif status == messages2.STATUS_INVALID:
elif status == messages.STATUS_INVALID:
failed.append(achall)
return completed, failed
@ -221,7 +221,7 @@ class AuthHandler(object):
each challenge resource.
:param authzr: Authorization Resource
:type authzr: :class:`acme.messages2.AuthorizationResource`
:type authzr: :class:`acme.messages.AuthorizationResource`
:param achall: Annotated challenge for which to get status
:type achall: :class:`letsencrypt.achallenges.AnnotatedChallenge`
@ -279,8 +279,8 @@ class AuthHandler(object):
"""
for authzr in self.authzr.values():
if (authzr.body.status != messages2.STATUS_VALID and
authzr.body.status != messages2.STATUS_INVALID):
if (authzr.body.status != messages.STATUS_VALID and
authzr.body.status != messages.STATUS_INVALID):
raise errors.AuthorizationError("Incomplete authorizations")
def _challenge_factory(self, domain, path):
@ -321,7 +321,7 @@ def challb_to_achall(challb, key, domain):
"""Converts a ChallengeBody object to an AnnotatedChallenge.
:param challb: ChallengeBody
:type challb: :class:`acme.messages2.ChallengeBody`
:type challb: :class:`acme.messages.ChallengeBody`
:param key: Key
:type key: :class:`letsencrypt.le_util.Key`
@ -370,8 +370,8 @@ def gen_challenge_path(challbs, preferences, combinations):
.. todo:: This can be possibly be rewritten to use resolved_combinations.
:param tuple challbs: A tuple of challenges
(:class:`acme.messages2.Challenge`) from
:class:`acme.messages2.AuthorizationResource` to be
(:class:`acme.messages.Challenge`) from
:class:`acme.messages.AuthorizationResource` to be
fulfilled by the client in order to prove possession of the
identifier.

View file

@ -16,7 +16,7 @@ from letsencrypt import crypto_util
from letsencrypt import errors
from letsencrypt import interfaces
from letsencrypt import le_util
from letsencrypt import network2
from letsencrypt import network
from letsencrypt import reverter
from letsencrypt import revoker
from letsencrypt import storage
@ -29,7 +29,7 @@ class Client(object):
"""ACME protocol client.
:ivar network: Network object for sending and receiving messages
:type network: :class:`letsencrypt.network2.Network`
:type network: :class:`letsencrypt.network.Network`
:ivar account: Account object used for registration
:type account: :class:`letsencrypt.account.Account`
@ -62,7 +62,7 @@ class Client(object):
self.installer = installer
# TODO: Allow for other alg types besides RS256
self.network = network2.Network(
self.network = network.Network(
config.server, jwk.JWKRSA.load(self.account.key.pem),
verify_ssl=(not config.no_verify_ssl))

View file

@ -5,14 +5,6 @@ class LetsEncryptClientError(Exception):
"""Generic Let's Encrypt client error."""
class NetworkError(LetsEncryptClientError):
"""Network error."""
class UnexpectedUpdate(NetworkError):
"""Unexpected update."""
class LetsEncryptReverterError(LetsEncryptClientError):
"""Let's Encrypt Reverter error."""

View file

@ -1,121 +1,26 @@
"""Network Module."""
import logging
import sys
import time
import requests
from acme import jose
from acme import messages
from letsencrypt import errors
"""Networking for ACME protocol."""
from acme import client
# https://urllib3.readthedocs.org/en/latest/security.html#insecureplatformwarning
requests.packages.urllib3.contrib.pyopenssl.inject_into_urllib3()
class Network(client.Client):
"""ACME networking."""
logging.getLogger("requests").setLevel(logging.WARNING)
def register_from_account(self, account):
"""Register with server.
.. todo:: this should probably not be a part of network...
class Network(object):
"""Class for communicating with ACME servers.
:param account: Account
:type account: :class:`letsencrypt.account.Account`
:ivar str server_url: Full URL of the ACME service
"""
def __init__(self, server):
"""Initialize Network instance.
:param str server: ACME (CA) server[:port]
:returns: Updated account
:rtype: :class:`letsencrypt.account.Account`
"""
self.server_url = "https://%s/acme/" % server
def send(self, msg):
"""Send ACME message to server.
:param msg: ACME message.
:type msg: :class:`acme.messages.Message`
:returns: Server response message.
:rtype: :class:`acme.messages.Message`
:raises acme.errors.ValidationError: if `msg` is not
valid serializable ACME JSON message.
:raises errors.LetsEncryptClientError: in case of connection error
or if response from server is not a valid ACME message.
"""
try:
response = requests.post(
self.server_url,
data=msg.json_dumps(),
headers={"Content-Type": "application/json"},
verify=True
)
except requests.exceptions.RequestException as error:
raise errors.LetsEncryptClientError(
'Sending ACME message to server has failed: %s' % error)
json_string = response.json()
try:
return messages.Message.from_json(json_string)
except jose.DeserializationError as error:
logging.error(json_string)
raise # TODO
def send_and_receive_expected(self, msg, expected):
"""Send ACME message to server and return expected message.
:param msg: ACME message.
:type msg: :class:`acme.Message`
:returns: ACME response message of expected type.
:rtype: :class:`acme.messages.Message`
:raises errors.LetsEncryptClientError: An exception is thrown
"""
response = self.send(msg)
return self.is_expected_msg(response, expected)
def is_expected_msg(self, response, expected, delay=3, rounds=20):
"""Is response expected ACME message?
:param response: ACME response message from server.
:type response: :class:`acme.messages.Message`
:param expected: Expected response type.
:type expected: subclass of :class:`acme.messages.Message`
:param int delay: Number of seconds to delay before next round
in case of ACME "defer" response message.
:param int rounds: Number of resend attempts in case of ACME "defer"
response message.
:returns: ACME response message from server.
:rtype: :class:`acme.messages.Message`
:raises LetsEncryptClientError: if server sent ACME "error" message
"""
for _ in xrange(rounds):
if isinstance(response, expected):
return response
elif isinstance(response, messages.Error):
logging.error("%s", response)
raise errors.LetsEncryptClientError(response.error)
elif isinstance(response, messages.Defer):
logging.info("Waiting for %d seconds...", delay)
time.sleep(delay)
response = self.send(
messages.StatusRequest(token=response.token))
else:
logging.fatal("Received unexpected message")
logging.fatal("Expected: %s", expected)
logging.fatal("Received: %s", response)
sys.exit(33)
logging.error(
"Server has deferred past the max of %d seconds", rounds * delay)
details = (
"mailto:" + account.email if account.email is not None else None,
"tel:" + account.phone if account.phone is not None else None,
)
account.regr = self.register(contact=tuple(
det for det in details if det is not None))
return account

View file

@ -16,7 +16,6 @@ import tempfile
import Crypto.PublicKey.RSA
import M2Crypto
from acme import messages
from acme.jose import util as jose_util
from letsencrypt import errors
@ -45,7 +44,9 @@ class Revoker(object):
"""
def __init__(self, installer, config, no_confirm=False):
self.network = network.Network(config.server)
# XXX
self.network = network.Network(new_reg_uri=None, key=None, alg=None)
self.installer = installer
self.config = config
self.no_confirm = no_confirm
@ -238,6 +239,8 @@ class Revoker(object):
:returns: TODO
"""
# XXX | pylint: disable=unused-variable
# These will both have to change in the future away from M2Crypto
# pylint: disable=protected-access
certificate = jose_util.ComparableX509(cert._cert)
@ -250,10 +253,7 @@ class Revoker(object):
raise errors.LetsEncryptRevokerError(
"Corrupted backup key file: %s" % cert.backup_key_path)
# TODO: Catch error associated with already revoked and proceed.
return self.network.send_and_receive_expected(
messages.RevocationRequest.create(certificate=certificate, key=key),
messages.Revocation)
return self.network.revoke(certr=None) # XXX
def _remove_certs_keys(self, cert_list): # pylint: disable=no-self-use
"""Remove certificate and key.

View file

@ -7,7 +7,7 @@ import shutil
import tempfile
import unittest
from acme import messages2
from acme import messages
from letsencrypt import configuration
from letsencrypt import errors
@ -40,11 +40,11 @@ class AccountTest(unittest.TestCase):
self.key = le_util.Key(key_file, key_pem)
self.email = "client@letsencrypt.org"
self.regr = messages2.RegistrationResource(
self.regr = messages.RegistrationResource(
uri="uri",
new_authzr_uri="new_authzr_uri",
terms_of_service="terms_of_service",
body=messages2.Registration(
body=messages.Registration(
recovery_token="recovery_token", agreement="agreement")
)

View file

@ -8,7 +8,7 @@ import Crypto.PublicKey.RSA
from acme import challenges
from acme import jose
from acme import messages2
from acme import messages
KEY = jose.HashableRSAKey(Crypto.PublicKey.RSA.importKey(
@ -78,19 +78,19 @@ def chall_to_challb(chall, status): # pylint: disable=redefined-outer-name
"status": status,
}
if status == messages2.STATUS_VALID:
if status == messages.STATUS_VALID:
kwargs.update({"validated": datetime.datetime.now()})
return messages2.ChallengeBody(**kwargs) # pylint: disable=star-args
return messages.ChallengeBody(**kwargs) # pylint: disable=star-args
# Pending ChallengeBody objects
DVSNI_P = chall_to_challb(DVSNI, messages2.STATUS_PENDING)
SIMPLE_HTTP_P = chall_to_challb(SIMPLE_HTTP, messages2.STATUS_PENDING)
DNS_P = chall_to_challb(DNS, messages2.STATUS_PENDING)
RECOVERY_CONTACT_P = chall_to_challb(RECOVERY_CONTACT, messages2.STATUS_PENDING)
RECOVERY_TOKEN_P = chall_to_challb(RECOVERY_TOKEN, messages2.STATUS_PENDING)
POP_P = chall_to_challb(POP, messages2.STATUS_PENDING)
DVSNI_P = chall_to_challb(DVSNI, messages.STATUS_PENDING)
SIMPLE_HTTP_P = chall_to_challb(SIMPLE_HTTP, messages.STATUS_PENDING)
DNS_P = chall_to_challb(DNS, messages.STATUS_PENDING)
RECOVERY_CONTACT_P = chall_to_challb(RECOVERY_CONTACT, messages.STATUS_PENDING)
RECOVERY_TOKEN_P = chall_to_challb(RECOVERY_TOKEN, messages.STATUS_PENDING)
POP_P = chall_to_challb(POP, messages.STATUS_PENDING)
CHALLENGES_P = [SIMPLE_HTTP_P, DVSNI_P, DNS_P,
RECOVERY_CONTACT_P, RECOVERY_TOKEN_P, POP_P]
@ -106,7 +106,7 @@ def gen_authzr(authz_status, domain, challs, statuses, combos=True):
"""Generate an authorization resource.
:param authz_status: Status object
:type authz_status: :class:`acme.messages2.Status`
:type authz_status: :class:`acme.messages.Status`
:param list challs: Challenge objects
:param list statuses: status of each challenge object
:param bool combos: Whether or not to add combinations
@ -118,13 +118,13 @@ def gen_authzr(authz_status, domain, challs, statuses, combos=True):
for chall, status in itertools.izip(challs, statuses)
)
authz_kwargs = {
"identifier": messages2.Identifier(
typ=messages2.IDENTIFIER_FQDN, value=domain),
"identifier": messages.Identifier(
typ=messages.IDENTIFIER_FQDN, value=domain),
"challenges": challbs,
}
if combos:
authz_kwargs.update({"combinations": gen_combos(challbs)})
if authz_status == messages2.STATUS_VALID:
if authz_status == messages.STATUS_VALID:
authz_kwargs.update({
"status": authz_status,
"expires": datetime.datetime.now() + datetime.timedelta(days=31),
@ -135,8 +135,8 @@ def gen_authzr(authz_status, domain, challs, statuses, combos=True):
})
# pylint: disable=star-args
return messages2.AuthorizationResource(
return messages.AuthorizationResource(
uri="https://trusted.ca/new-authz-resource",
new_cert_uri="https://trusted.ca/new-cert",
body=messages2.Authorization(**authz_kwargs)
body=messages.Authorization(**authz_kwargs)
)

View file

@ -6,11 +6,11 @@ import unittest
import mock
from acme import challenges
from acme import messages2
from acme import messages
from letsencrypt import errors
from letsencrypt import le_util
from letsencrypt import network2
from letsencrypt import network
from letsencrypt.tests import acme_util
@ -37,8 +37,8 @@ class ChallengeFactoryTest(unittest.TestCase):
self.dom = "test"
self.handler.authzr[self.dom] = acme_util.gen_authzr(
messages2.STATUS_PENDING, self.dom, acme_util.CHALLENGES,
[messages2.STATUS_PENDING]*6, False)
messages.STATUS_PENDING, self.dom, acme_util.CHALLENGES,
[messages.STATUS_PENDING]*6, False)
def test_all(self):
cont_c, dv_c = self.handler._challenge_factory(self.dom, range(0, 6))
@ -57,9 +57,9 @@ class ChallengeFactoryTest(unittest.TestCase):
def test_unrecognized(self):
self.handler.authzr["failure.com"] = acme_util.gen_authzr(
messages2.STATUS_PENDING, "failure.com",
messages.STATUS_PENDING, "failure.com",
[mock.Mock(chall="chall", typ="unrecognized")],
[messages2.STATUS_PENDING])
[messages.STATUS_PENDING])
self.assertRaises(errors.LetsEncryptClientError,
self.handler._challenge_factory, "failure.com", [0])
@ -86,7 +86,7 @@ class GetAuthorizationsTest(unittest.TestCase):
self.mock_dv_auth.perform.side_effect = gen_auth_resp
self.mock_account = mock.Mock(key=le_util.Key("file_path", "PEM"))
self.mock_net = mock.MagicMock(spec=network2.Network)
self.mock_net = mock.MagicMock(spec=network.Network)
self.handler = AuthHandler(
self.mock_dv_auth, self.mock_cont_auth,
@ -160,10 +160,10 @@ class GetAuthorizationsTest(unittest.TestCase):
for dom in self.handler.authzr.keys():
azr = self.handler.authzr[dom]
self.handler.authzr[dom] = acme_util.gen_authzr(
messages2.STATUS_VALID,
messages.STATUS_VALID,
dom,
[challb.chall for challb in azr.body.challenges],
[messages2.STATUS_VALID]*len(azr.body.challenges),
[messages.STATUS_VALID]*len(azr.body.challenges),
azr.body.combinations)
@ -182,16 +182,16 @@ class PollChallengesTest(unittest.TestCase):
self.doms = ["0", "1", "2"]
self.handler.authzr[self.doms[0]] = acme_util.gen_authzr(
messages2.STATUS_PENDING, self.doms[0],
acme_util.DV_CHALLENGES, [messages2.STATUS_PENDING]*3, False)
messages.STATUS_PENDING, self.doms[0],
acme_util.DV_CHALLENGES, [messages.STATUS_PENDING]*3, False)
self.handler.authzr[self.doms[1]] = acme_util.gen_authzr(
messages2.STATUS_PENDING, self.doms[1],
acme_util.DV_CHALLENGES, [messages2.STATUS_PENDING]*3, False)
messages.STATUS_PENDING, self.doms[1],
acme_util.DV_CHALLENGES, [messages.STATUS_PENDING]*3, False)
self.handler.authzr[self.doms[2]] = acme_util.gen_authzr(
messages2.STATUS_PENDING, self.doms[2],
acme_util.DV_CHALLENGES, [messages2.STATUS_PENDING]*3, False)
messages.STATUS_PENDING, self.doms[2],
acme_util.DV_CHALLENGES, [messages.STATUS_PENDING]*3, False)
self.chall_update = {}
for dom in self.doms:
@ -205,7 +205,7 @@ class PollChallengesTest(unittest.TestCase):
self.handler._poll_challenges(self.chall_update, False)
for authzr in self.handler.authzr.values():
self.assertEqual(authzr.body.status, messages2.STATUS_VALID)
self.assertEqual(authzr.body.status, messages.STATUS_VALID)
@mock.patch("letsencrypt.auth_handler.time")
def test_poll_challenges_failure_best_effort(self, unused_mock_time):
@ -213,7 +213,7 @@ class PollChallengesTest(unittest.TestCase):
self.handler._poll_challenges(self.chall_update, True)
for authzr in self.handler.authzr.values():
self.assertEqual(authzr.body.status, messages2.STATUS_PENDING)
self.assertEqual(authzr.body.status, messages.STATUS_PENDING)
@mock.patch("letsencrypt.auth_handler.time")
def test_poll_challenges_failure(self, unused_mock_time):
@ -241,10 +241,10 @@ class PollChallengesTest(unittest.TestCase):
# Basically it didn't raise an error and it stopped earlier than
# Making all challenges invalid which would make mock_poll_solve_one
# change authzr to invalid
return self._mock_poll_solve_one_chall(authzr, messages2.STATUS_VALID)
return self._mock_poll_solve_one_chall(authzr, messages.STATUS_VALID)
def _mock_poll_solve_one_invalid(self, authzr):
return self._mock_poll_solve_one_chall(authzr, messages2.STATUS_INVALID)
return self._mock_poll_solve_one_chall(authzr, messages.STATUS_INVALID)
def _mock_poll_solve_one_chall(self, authzr, desired_status):
# pylint: disable=no-self-use
@ -269,10 +269,10 @@ class PollChallengesTest(unittest.TestCase):
else:
status_ = authzr.body.status
new_authzr = messages2.AuthorizationResource(
new_authzr = messages.AuthorizationResource(
uri=authzr.uri,
new_cert_uri=authzr.new_cert_uri,
body=messages2.Authorization(
body=messages.Authorization(
identifier=authzr.body.identifier,
challenges=new_challbs,
combinations=authzr.body.combinations,
@ -429,8 +429,8 @@ def gen_auth_resp(chall_list):
def gen_dom_authzr(domain, unused_new_authzr_uri, challs):
"""Generates new authzr for domains."""
return acme_util.gen_authzr(
messages2.STATUS_PENDING, domain, challs,
[messages2.STATUS_PENDING]*len(challs))
messages.STATUS_PENDING, domain, challs,
[messages.STATUS_PENDING]*len(challs))
if __name__ == "__main__":

View file

@ -26,14 +26,14 @@ class ClientTest(unittest.TestCase):
self.account = mock.MagicMock(**{"key.pem": KEY})
from letsencrypt.client import Client
with mock.patch("letsencrypt.client.network2") as network2:
with mock.patch("letsencrypt.client.network") as network:
self.client = Client(
config=self.config, account_=self.account, dv_auth=None,
installer=None)
self.network2 = network2
self.network = network
def test_init_network_verify_ssl(self):
self.network2.Network.assert_called_once_with(
self.network.Network.assert_called_once_with(
mock.ANY, mock.ANY, verify_ssl=True)
@mock.patch("letsencrypt.client.zope.component.getUtility")

View file

@ -0,0 +1,50 @@
"""Tests for letsencrypt.network."""
import shutil
import tempfile
import unittest
import mock
from letsencrypt import account
class NetworkTest(unittest.TestCase):
"""Tests for letsencrypt.network.Network."""
def setUp(self):
from letsencrypt.network import Network
self.net = Network(
new_reg_uri=None, key=None, alg=None, verify_ssl=None)
self.config = mock.Mock(accounts_dir=tempfile.mkdtemp())
self.contact = ('mailto:cert-admin@example.com', 'tel:+12025551212')
def tearDown(self):
shutil.rmtree(self.config.accounts_dir)
def test_register_from_account(self):
self.net.register = mock.Mock()
acc = account.Account(
self.config, 'key', email='cert-admin@example.com',
phone='+12025551212')
self.net.register_from_account(acc)
self.net.register.assert_called_with(contact=self.contact)
def test_register_from_account_partial_info(self):
self.net.register = mock.Mock()
acc = account.Account(
self.config, 'key', email='cert-admin@example.com')
acc2 = account.Account(self.config, 'key')
self.net.register_from_account(acc)
self.net.register.assert_called_with(
contact=('mailto:cert-admin@example.com',))
self.net.register_from_account(acc2)
self.net.register.assert_called_with(contact=())
if __name__ == '__main__':
unittest.main() # pragma: no cover

View file

@ -8,7 +8,7 @@ import mock
from acme import challenges
from acme import jose
from acme import messages2
from acme import messages
from letsencrypt import achallenges
from letsencrypt import proof_of_possession
@ -48,8 +48,8 @@ class ProofOfPossessionTest(unittest.TestCase):
issuers=(), authorized_for=())
chall = challenges.ProofOfPossession(
alg=jose.RS256, nonce='zczv4HMLVe_0kimJ25Juig', hints=hints)
challb = messages2.ChallengeBody(
chall=chall, uri="http://example", status=messages2.STATUS_PENDING)
challb = messages.ChallengeBody(
chall=chall, uri="http://example", status=messages.STATUS_PENDING)
self.achall = achallenges.ProofOfPossession(
challb=challb, domain="example.com")
@ -60,8 +60,8 @@ class ProofOfPossessionTest(unittest.TestCase):
issuers=(), authorized_for=())
chall = challenges.ProofOfPossession(
alg=jose.HS512, nonce='zczv4HMLVe_0kimJ25Juig', hints=hints)
challb = messages2.ChallengeBody(
chall=chall, uri="http://example", status=messages2.STATUS_PENDING)
challb = messages.ChallengeBody(
chall=chall, uri="http://example", status=messages.STATUS_PENDING)
self.achall = achallenges.ProofOfPossession(
challb=challb, domain="example.com")
self.assertEqual(self.proof_of_pos.perform(self.achall), None)

View file

@ -63,7 +63,7 @@ class RevokerTest(RevokerBase):
def tearDown(self):
shutil.rmtree(self.backup_dir)
@mock.patch("letsencrypt.revoker.network.Network.send_and_receive_expected")
@mock.patch("letsencrypt.network.Network.revoke")
@mock.patch("letsencrypt.revoker.revocation")
def test_revoke_by_key_all(self, mock_display, mock_net):
mock_display().confirm_revocation.return_value = True
@ -89,7 +89,7 @@ class RevokerTest(RevokerBase):
self.revoker.revoke_from_key,
self.key)
@mock.patch("letsencrypt.revoker.network.Network.send_and_receive_expected")
@mock.patch("letsencrypt.network.Network.revoke")
@mock.patch("letsencrypt.revoker.revocation")
def test_revoke_by_wrong_key(self, mock_display, mock_net):
mock_display().confirm_revocation.return_value = True
@ -105,7 +105,7 @@ class RevokerTest(RevokerBase):
# No revocation went through
self.assertEqual(mock_net.call_count, 0)
@mock.patch("letsencrypt.revoker.network.Network.send_and_receive_expected")
@mock.patch("letsencrypt.network.Network.revoke")
@mock.patch("letsencrypt.revoker.revocation")
def test_revoke_by_cert(self, mock_display, mock_net):
mock_display().confirm_revocation.return_value = True
@ -122,7 +122,7 @@ class RevokerTest(RevokerBase):
self.assertEqual(mock_net.call_count, 1)
@mock.patch("letsencrypt.revoker.network.Network.send_and_receive_expected")
@mock.patch("letsencrypt.network.Network.revoke")
@mock.patch("letsencrypt.revoker.revocation")
def test_revoke_by_cert_not_found(self, mock_display, mock_net):
mock_display().confirm_revocation.return_value = True
@ -141,7 +141,7 @@ class RevokerTest(RevokerBase):
self.assertEqual(mock_net.call_count, 1)
@mock.patch("letsencrypt.revoker.network.Network.send_and_receive_expected")
@mock.patch("letsencrypt.network.Network.revoke")
@mock.patch("letsencrypt.revoker.revocation")
def test_revoke_by_menu(self, mock_display, mock_net):
mock_display().confirm_revocation.return_value = True
@ -165,7 +165,7 @@ class RevokerTest(RevokerBase):
self.assertEqual(mock_display.more_info_cert.call_count, 1)
@mock.patch("letsencrypt.revoker.logging")
@mock.patch("letsencrypt.revoker.network.Network.send_and_receive_expected")
@mock.patch("letsencrypt.network.Network.revoke")
@mock.patch("letsencrypt.revoker.revocation")
def test_revoke_by_menu_delete_all(self, mock_display, mock_net, mock_log):
mock_display().confirm_revocation.return_value = True

View file

@ -5,7 +5,7 @@ import unittest
import mock
from acme import challenges
from acme import messages2
from acme import messages
from letsencrypt import achallenges
from letsencrypt import errors
@ -165,20 +165,20 @@ class NginxConfiguratorTest(util.NginxTest):
# Note: As more challenges are offered this will have to be expanded
auth_key = le_util.Key(self.rsa256_file, self.rsa256_pem)
achall1 = achallenges.DVSNI(
challb=messages2.ChallengeBody(
challb=messages.ChallengeBody(
chall=challenges.DVSNI(
r="foo",
nonce="bar"),
uri="https://ca.org/chall0_uri",
status=messages2.Status("pending"),
status=messages.Status("pending"),
), domain="localhost", key=auth_key)
achall2 = achallenges.DVSNI(
challb=messages2.ChallengeBody(
challb=messages.ChallengeBody(
chall=challenges.DVSNI(
r="abc",
nonce="def"),
uri="https://ca.org/chall1_uri",
status=messages2.Status("pending"),
status=messages.Status("pending"),
), domain="example.com", key=auth_key)
dvsni_ret_val = [

View file

@ -32,7 +32,6 @@ install_requires = [
'argparse',
'ConfigArgParse',
'configobj',
'jsonschema<2.5.1', # https://github.com/Julian/jsonschema/issues/233
'mock',
'ndg-httpsclient', # urllib3 InsecurePlatformWarning (#304)
'parsedatetime',