diff --git a/acme/challenges_test.py b/acme/challenges_test.py index 4c61c0e3d..f0b025ad3 100644 --- a/acme/challenges_test.py +++ b/acme/challenges_test.py @@ -18,6 +18,13 @@ KEY = jose.HashableRSAKey(Crypto.PublicKey.RSA.importKey( 'acme.jose', os.path.join('testdata', 'rsa512_key.pem')))) +class ChallengeResponseTest(unittest.TestCase): + + def test_from_json_none(self): + from acme.challenges import ChallengeResponse + self.assertTrue(ChallengeResponse.from_json(None) is None) + + class SimpleHTTPTest(unittest.TestCase): def setUp(self): diff --git a/letsencrypt/network2.py b/acme/client.py similarity index 84% rename from letsencrypt/network2.py rename to acme/client.py index a20194a79..629048d03 100644 --- a/letsencrypt/network2.py +++ b/acme/client.py @@ -1,4 +1,4 @@ -"""Networking for ACME protocol v02.""" +"""ACME client API.""" import datetime import heapq import httplib @@ -9,19 +9,18 @@ import M2Crypto import requests import werkzeug +from acme import errors from acme import jose -from acme import jws as acme_jws -from acme import messages2 - -from letsencrypt import errors +from acme import jws +from acme import messages # https://urllib3.readthedocs.org/en/latest/security.html#insecureplatformwarning requests.packages.urllib3.contrib.pyopenssl.inject_into_urllib3() -class Network(object): - """ACME networking. +class Client(object): # pylint: disable=too-many-instance-attributes + """ACME client. .. todo:: Clean up raised error types hierarchy, document, and handle (wrap) @@ -33,8 +32,6 @@ class Network(object): :ivar bool verify_ssl: Verify SSL certificates? """ - - # TODO: Move below to acme module? DER_CONTENT_TYPE = 'application/pkix-cert' JSON_CONTENT_TYPE = 'application/json' JSON_ERROR_CONTENT_TYPE = 'application/problem+json' @@ -58,7 +55,7 @@ class Network(object): """ dumps = obj.json_dumps() logging.debug('Serialized JSON: %s', dumps) - return acme_jws.JWS.sign( + return jws.JWS.sign( payload=dumps, key=self.key, alg=self.alg, nonce=nonce).json_dumps() @classmethod @@ -75,10 +72,9 @@ class Network(object): function will raise an error. Otherwise, wrong Content-Type is ignored, but logged. - :raises letsencrypt.messages2.Error: If server response body + :raises .messages.Error: If server response body carries HTTP Problem (draft-ietf-appsawg-http-problem-00). - :raises letsencrypt.errors.NetworkError: In case of other - networking errors. + :raises .ClientError: In case of other networking errors. """ response_ct = response.headers.get('Content-Type') @@ -98,13 +94,13 @@ class Network(object): try: logging.error("Error: %s", jobj) logging.error("Response from server: %s", response.content) - raise messages2.Error.from_json(jobj) + raise messages.Error.from_json(jobj) except jose.DeserializationError as error: # Couldn't deserialize JSON object - raise errors.NetworkError((response, error)) + raise errors.ClientError((response, error)) else: # response is not JSON object - raise errors.NetworkError(response) + raise errors.ClientError(response) else: if jobj is not None and response_ct != cls.JSON_CONTENT_TYPE: logging.debug( @@ -112,13 +108,13 @@ class Network(object): 'response', response_ct) if content_type == cls.JSON_CONTENT_TYPE and jobj is None: - raise errors.NetworkError( + raise errors.ClientError( 'Unexpected response Content-Type: {0}'.format(response_ct)) def _get(self, uri, content_type=JSON_CONTENT_TYPE, **kwargs): """Send GET request. - :raises letsencrypt.errors.NetworkError: + :raises .ClientError: :returns: HTTP Response :rtype: `requests.Response` @@ -129,22 +125,22 @@ class Network(object): try: response = requests.get(uri, **kwargs) except requests.exceptions.RequestException as error: - raise errors.NetworkError(error) + raise errors.ClientError(error) self._check_response(response, content_type=content_type) return response def _add_nonce(self, response): if self.REPLAY_NONCE_HEADER in response.headers: nonce = response.headers[self.REPLAY_NONCE_HEADER] - error = acme_jws.Header.validate_nonce(nonce) + error = jws.Header.validate_nonce(nonce) if error is None: logging.debug('Storing nonce: %r', nonce) self._nonces.add(nonce) else: - raise errors.NetworkError('Invalid nonce ({0}): {1}'.format( + raise errors.ClientError('Invalid nonce ({0}): {1}'.format( nonce, error)) else: - raise errors.NetworkError( + raise errors.ClientError( 'Server {0} response did not include a replay nonce'.format( response.request.method)) @@ -160,7 +156,7 @@ class Network(object): :param JSONDeSerializable obj: Will be wrapped in JWS. :param str content_type: Expected ``Content-Type``, fails if not set. - :raises acme.messages2.NetworkError: + :raises acme.messages.ClientError: :returns: HTTP Response :rtype: `requests.Response` @@ -172,7 +168,7 @@ class Network(object): try: response = requests.post(uri, data=data, **kwargs) except requests.exceptions.RequestException as error: - raise errors.NetworkError(error) + raise errors.ClientError(error) logging.debug('Received response %s: %r', response, response.text) self._add_nonce(response) @@ -190,15 +186,15 @@ class Network(object): try: new_authzr_uri = response.links['next']['url'] except KeyError: - raise errors.NetworkError('"next" link missing') + raise errors.ClientError('"next" link missing') - return messages2.RegistrationResource( - body=messages2.Registration.from_json(response.json()), + return messages.RegistrationResource( + body=messages.Registration.from_json(response.json()), uri=response.headers.get('Location', uri), new_authzr_uri=new_authzr_uri, terms_of_service=terms_of_service) - def register(self, contact=messages2.Registration._fields[ + def register(self, contact=messages.Registration._fields[ 'contact'].default): """Register. @@ -208,10 +204,10 @@ class Network(object): :returns: Registration Resource. :rtype: `.RegistrationResource` - :raises letsencrypt.errors.UnexpectedUpdate: + :raises .UnexpectedUpdate: """ - new_reg = messages2.Registration(contact=contact) + new_reg = messages.Registration(contact=contact) response = self._post(self.new_reg_uri, new_reg) assert response.status_code == httplib.CREATED # TODO: handle errors @@ -222,24 +218,6 @@ class Network(object): return regr - def register_from_account(self, account): - """Register with server. - - :param account: Account - :type account: :class:`letsencrypt.account.Account` - - :returns: Updated account - :rtype: :class:`letsencrypt.account.Account` - - """ - details = ( - "mailto:" + account.email if account.email is not None else None, - "tel:" + account.phone if account.phone is not None else None, - ) - account.regr = self.register(contact=tuple( - det for det in details if det is not None)) - return account - def update_registration(self, regr): """Update registration. @@ -257,12 +235,12 @@ class Network(object): # TODO: Boulder does not set Location or Link on update # (c.f. acme-spec #94) + updated_regr = self._regr_from_response( response, uri=regr.uri, new_authzr_uri=regr.new_authzr_uri, terms_of_service=regr.terms_of_service) if updated_regr != regr: raise errors.UnexpectedUpdate(regr) - return updated_regr def agree_to_tos(self, regr): @@ -287,10 +265,10 @@ class Network(object): try: new_cert_uri = response.links['next']['url'] except KeyError: - raise errors.NetworkError('"next" link missing') + raise errors.ClientError('"next" link missing') - authzr = messages2.AuthorizationResource( - body=messages2.Authorization.from_json(response.json()), + authzr = messages.AuthorizationResource( + body=messages.Authorization.from_json(response.json()), uri=response.headers.get('Location', uri), new_cert_uri=new_cert_uri) if authzr.body.identifier != identifier: @@ -301,7 +279,7 @@ class Network(object): """Request challenges. :param identifier: Identifier to be challenged. - :type identifier: `.messages2.Identifier` + :type identifier: `.messages.Identifier` :param str new_authzr_uri: new-authorization URI @@ -309,7 +287,7 @@ class Network(object): :rtype: `.AuthorizationResource` """ - new_authz = messages2.Authorization(identifier=identifier) + new_authz = messages.Authorization(identifier=identifier) response = self._post(new_authzr_uri, new_authz) assert response.status_code == httplib.CREATED # TODO: handle errors return self._authzr_from_response(response, identifier) @@ -328,8 +306,8 @@ class Network(object): :rtype: `.AuthorizationResource` """ - return self.request_challenges(messages2.Identifier( - typ=messages2.IDENTIFIER_FQDN, value=domain), new_authz_uri) + return self.request_challenges(messages.Identifier( + typ=messages.IDENTIFIER_FQDN, value=domain), new_authz_uri) def answer_challenge(self, challb, response): """Answer challenge. @@ -350,10 +328,10 @@ class Network(object): try: authzr_uri = response.links['up']['url'] except KeyError: - raise errors.NetworkError('"up" Link header missing') - challr = messages2.ChallengeResource( + raise errors.ClientError('"up" Link header missing') + challr = messages.ChallengeResource( authzr_uri=authzr_uri, - body=messages2.ChallengeBody.from_json(response.json())) + body=messages.ChallengeBody.from_json(response.json())) # TODO: check that challr.uri == response.headers['Location']? if challr.uri != challb.uri: raise errors.UnexpectedUpdate(challr.uri) @@ -412,14 +390,14 @@ class Network(object): :param authzrs: `list` of `.AuthorizationResource` :returns: Issued certificate - :rtype: `.messages2.CertificateResource` + :rtype: `.messages.CertificateResource` """ assert authzrs, "Authorizations list is empty" logging.debug("Requesting issuance...") # TODO: assert len(authzrs) == number of SANs - req = messages2.CertificateRequest( + req = messages.CertificateRequest( csr=csr, authorizations=tuple(authzr.uri for authzr in authzrs)) content_type = self.DER_CONTENT_TYPE # TODO: add 'cert_type 'argument @@ -434,9 +412,9 @@ class Network(object): try: uri = response.headers['Location'] except KeyError: - raise errors.NetworkError('"Location" Header missing') + raise errors.ClientError('"Location" Header missing') - return messages2.CertificateResource( + return messages.CertificateResource( uri=uri, authzrs=authzrs, cert_chain_uri=cert_chain_uri, body=jose.ComparableX509( M2Crypto.X509.load_cert_der_string(response.content))) @@ -459,7 +437,7 @@ class Network(object): ``Retry-After`` is not present in the response. :returns: ``(cert, updated_authzrs)`` `tuple` where ``cert`` is - the issued certificate (`.messages2.CertificateResource.), + the issued certificate (`.messages.CertificateResource.), and ``updated_authzrs`` is a `tuple` consisting of updated Authorization Resources (`.AuthorizationResource`) as present in the responses from server, and in the same order @@ -488,7 +466,7 @@ class Network(object): updated_authzr, response = self.poll(updated[authzr]) updated[authzr] = updated_authzr - if updated_authzr.body.status != messages2.STATUS_VALID: + if updated_authzr.body.status != messages.STATUS_VALID: # push back to the priority queue, with updated retry_after heapq.heappush(waiting, (self.retry_after( response, default=mintime), authzr)) @@ -526,7 +504,7 @@ class Network(object): # "refresh cert", and this method integrated with self.refresh response, cert = self._get_cert(certr.uri) if 'Location' not in response.headers: - raise errors.NetworkError('Location header missing') + raise errors.ClientError('Location header missing') if response.headers['Location'] != certr.uri: raise errors.UnexpectedUpdate(response.text) return certr.update(body=cert) @@ -561,22 +539,21 @@ class Network(object): else: return None - def revoke(self, certr, when=messages2.Revocation.NOW): + def revoke(self, certr, when=messages.Revocation.NOW): """Revoke certificate. :param certr: Certificate Resource :type certr: `.CertificateResource` :param when: When should the revocation take place? Takes - the same values as `.messages2.Revocation.revoke`. + the same values as `.messages.Revocation.revoke`. - :raises letsencrypt.errors.NetworkError: If revocation is - unsuccessful. + :raises .ClientError: If revocation is unsuccessful. """ - rev = messages2.Revocation(revoke=when, authorizations=tuple( + rev = messages.Revocation(revoke=when, authorizations=tuple( authzr.uri for authzr in certr.authzrs)) response = self._post(certr.uri, rev) if response.status_code != httplib.OK: - raise errors.NetworkError( + raise errors.ClientError( 'Successful revocation must return HTTP OK status') diff --git a/letsencrypt/tests/network2_test.py b/acme/client_test.py similarity index 82% rename from letsencrypt/tests/network2_test.py rename to acme/client_test.py index 3f745ffa7..5e4cc1720 100644 --- a/letsencrypt/tests/network2_test.py +++ b/acme/client_test.py @@ -1,10 +1,8 @@ -"""Tests for letsencrypt.network2.""" +"""Tests for acme.client.""" import datetime import httplib import os import pkg_resources -import shutil -import tempfile import unittest import M2Crypto @@ -12,31 +10,28 @@ import mock import requests from acme import challenges +from acme import errors from acme import jose from acme import jws as acme_jws -from acme import messages2 - -from letsencrypt import account -from letsencrypt import errors +from acme import messages CERT = jose.ComparableX509(M2Crypto.X509.load_cert_string( pkg_resources.resource_string( - __name__, os.path.join('testdata', 'cert.pem')))) -CERT2 = jose.ComparableX509(M2Crypto.X509.load_cert_string( - pkg_resources.resource_string( - __name__, os.path.join('testdata', 'cert-san.pem')))) + 'acme.jose', os.path.join('testdata', 'cert.der')), + M2Crypto.X509.FORMAT_DER)) CSR = jose.ComparableX509(M2Crypto.X509.load_request_string( pkg_resources.resource_string( - __name__, os.path.join('testdata', 'csr.pem')))) + 'acme.jose', os.path.join('testdata', 'csr.der')), + M2Crypto.X509.FORMAT_DER)) KEY = jose.JWKRSA.load(pkg_resources.resource_string( 'acme.jose', os.path.join('testdata', 'rsa512_key.pem'))) KEY2 = jose.JWKRSA.load(pkg_resources.resource_string( 'acme.jose', os.path.join('testdata', 'rsa256_key.pem'))) -class NetworkTest(unittest.TestCase): - """Tests for letsencrypt.network2.Network.""" +class ClientTest(unittest.TestCase): + """Tests for acme.client.Client.""" # pylint: disable=too-many-instance-attributes,too-many-public-methods @@ -44,8 +39,8 @@ class NetworkTest(unittest.TestCase): self.verify_ssl = mock.MagicMock() self.wrap_in_jws = mock.MagicMock(return_value=mock.sentinel.wrapped) - from letsencrypt.network2 import Network - self.net = Network( + from acme.client import Client + self.net = Client( new_reg_uri='https://www.letsencrypt-demo.org/acme/new-reg', key=KEY, alg=jose.RS256, verify_ssl=self.verify_ssl) self.nonce = jose.b64encode('Nonce') @@ -58,44 +53,39 @@ class NetworkTest(unittest.TestCase): self.post = mock.MagicMock(return_value=self.response) self.get = mock.MagicMock(return_value=self.response) - self.identifier = messages2.Identifier( - typ=messages2.IDENTIFIER_FQDN, value='example.com') - - self.config = mock.Mock(accounts_dir=tempfile.mkdtemp()) + self.identifier = messages.Identifier( + typ=messages.IDENTIFIER_FQDN, value='example.com') # Registration self.contact = ('mailto:cert-admin@example.com', 'tel:+12025551212') - reg = messages2.Registration( + reg = messages.Registration( contact=self.contact, key=KEY.public(), recovery_token='t') - self.regr = messages2.RegistrationResource( + self.regr = messages.RegistrationResource( body=reg, uri='https://www.letsencrypt-demo.org/acme/reg/1', new_authzr_uri='https://www.letsencrypt-demo.org/acme/new-reg', terms_of_service='https://www.letsencrypt-demo.org/tos') # Authorization authzr_uri = 'https://www.letsencrypt-demo.org/acme/authz/1' - challb = messages2.ChallengeBody( - uri=(authzr_uri + '/1'), status=messages2.STATUS_VALID, + challb = messages.ChallengeBody( + uri=(authzr_uri + '/1'), status=messages.STATUS_VALID, chall=challenges.DNS(token='foo')) - self.challr = messages2.ChallengeResource( + self.challr = messages.ChallengeResource( body=challb, authzr_uri=authzr_uri) - self.authz = messages2.Authorization( - identifier=messages2.Identifier( - typ=messages2.IDENTIFIER_FQDN, value='example.com'), + self.authz = messages.Authorization( + identifier=messages.Identifier( + typ=messages.IDENTIFIER_FQDN, value='example.com'), challenges=(challb,), combinations=None) - self.authzr = messages2.AuthorizationResource( + self.authzr = messages.AuthorizationResource( body=self.authz, uri=authzr_uri, new_cert_uri='https://www.letsencrypt-demo.org/acme/new-cert') # Request issuance - self.certr = messages2.CertificateResource( + self.certr = messages.CertificateResource( body=CERT, authzrs=(self.authzr,), uri='https://www.letsencrypt-demo.org/acme/cert/1', cert_chain_uri='https://www.letsencrypt-demo.org/ca') - def tearDown(self): - shutil.rmtree(self.config.accounts_dir) - def _mock_post_get(self): # pylint: disable=protected-access self.net._post = self.post @@ -127,22 +117,22 @@ class NetworkTest(unittest.TestCase): self.response.json.return_value = {} # pylint: disable=protected-access self.assertRaises( - errors.NetworkError, self.net._check_response, self.response) + errors.ClientError, self.net._check_response, self.response) def test_check_response_not_ok_jobj_error(self): self.response.ok = False - self.response.json.return_value = messages2.Error( + self.response.json.return_value = messages.Error( detail='foo', typ='serverInternal', title='some title').to_json() # pylint: disable=protected-access self.assertRaises( - messages2.Error, self.net._check_response, self.response) + messages.Error, self.net._check_response, self.response) def test_check_response_not_ok_no_jobj(self): self.response.ok = False self.response.json.side_effect = ValueError # pylint: disable=protected-access self.assertRaises( - errors.NetworkError, self.net._check_response, self.response) + errors.ClientError, self.net._check_response, self.response) def test_check_response_ok_no_jobj_ct_required(self): self.response.json.side_effect = ValueError @@ -150,7 +140,7 @@ class NetworkTest(unittest.TestCase): self.response.headers['Content-Type'] = response_ct # pylint: disable=protected-access self.assertRaises( - errors.NetworkError, self.net._check_response, self.response, + errors.ClientError, self.net._check_response, self.response, content_type=self.net.JSON_CONTENT_TYPE) def test_check_response_ok_no_jobj_no_ct(self): @@ -167,14 +157,14 @@ class NetworkTest(unittest.TestCase): # pylint: disable=protected-access self.net._check_response(self.response) - @mock.patch('letsencrypt.network2.requests') + @mock.patch('acme.client.requests') def test_get_requests_error_passthrough(self, requests_mock): requests_mock.exceptions = requests.exceptions requests_mock.get.side_effect = requests.exceptions.RequestException # pylint: disable=protected-access - self.assertRaises(errors.NetworkError, self.net._get, 'uri') + self.assertRaises(errors.ClientError, self.net._get, 'uri') - @mock.patch('letsencrypt.network2.requests') + @mock.patch('acme.client.requests') def test_get(self, requests_mock): # pylint: disable=protected-access self.net._check_response = mock.MagicMock() @@ -186,16 +176,16 @@ class NetworkTest(unittest.TestCase): # pylint: disable=protected-access self.net._wrap_in_jws = self.wrap_in_jws - @mock.patch('letsencrypt.network2.requests') + @mock.patch('acme.client.requests') def test_post_requests_error_passthrough(self, requests_mock): requests_mock.exceptions = requests.exceptions requests_mock.post.side_effect = requests.exceptions.RequestException # pylint: disable=protected-access self._mock_wrap_in_jws() self.assertRaises( - errors.NetworkError, self.net._post, 'uri', mock.sentinel.obj) + errors.ClientError, self.net._post, 'uri', mock.sentinel.obj) - @mock.patch('letsencrypt.network2.requests') + @mock.patch('acme.client.requests') def test_post(self, requests_mock): # pylint: disable=protected-access self.net._check_response = mock.MagicMock() @@ -206,7 +196,7 @@ class NetworkTest(unittest.TestCase): self.net._check_response.assert_called_once_with( requests_mock.post('uri', mock.sentinel.wrapped), content_type='ct') - @mock.patch('letsencrypt.network2.requests') + @mock.patch('acme.client.requests') def test_post_replay_nonce_handling(self, requests_mock): # pylint: disable=protected-access self.net._check_response = mock.MagicMock() @@ -214,7 +204,7 @@ class NetworkTest(unittest.TestCase): self.net._nonces.clear() self.assertRaises( - errors.NetworkError, self.net._post, 'uri', mock.sentinel.obj) + errors.ClientError, self.net._post, 'uri', mock.sentinel.obj) nonce2 = jose.b64encode('Nonce2') requests_mock.head('uri').headers = { @@ -231,9 +221,9 @@ class NetworkTest(unittest.TestCase): # wrong nonce requests_mock.post('uri').headers = {self.net.REPLAY_NONCE_HEADER: 'F'} self.assertRaises( - errors.NetworkError, self.net._post, 'uri', mock.sentinel.obj) + errors.ClientError, self.net._post, 'uri', mock.sentinel.obj) - @mock.patch('letsencrypt.client.network2.requests') + @mock.patch('acme.client.requests') def test_get_post_verify_ssl(self, requests_mock): # pylint: disable=protected-access self._mock_wrap_in_jws() @@ -274,30 +264,7 @@ class NetworkTest(unittest.TestCase): self.response.status_code = httplib.CREATED self._mock_post_get() self.assertRaises( - errors.NetworkError, self.net.register, self.regr.body) - - def test_register_from_account(self): - self.net.register = mock.Mock() - acc = account.Account( - self.config, 'key', email='cert-admin@example.com', - phone='+12025551212') - - self.net.register_from_account(acc) - - self.net.register.assert_called_with(contact=self.contact) - - def test_register_from_account_partial_info(self): - self.net.register = mock.Mock() - acc = account.Account( - self.config, 'key', email='cert-admin@example.com') - acc2 = account.Account(self.config, 'key') - - self.net.register_from_account(acc) - self.net.register.assert_called_with( - contact=('mailto:cert-admin@example.com',)) - - self.net.register_from_account(acc2) - self.net.register.assert_called_with(contact=()) + errors.ClientError, self.net.register, self.regr.body) def test_update_registration(self): self.response.headers['Location'] = self.regr.uri @@ -339,7 +306,7 @@ class NetworkTest(unittest.TestCase): self.response.status_code = httplib.CREATED self._mock_post_get() self.assertRaises( - errors.NetworkError, self.net.request_challenges, + errors.ClientError, self.net.request_challenges, self.identifier, self.regr) def test_request_domain_challenges(self): @@ -363,7 +330,7 @@ class NetworkTest(unittest.TestCase): def test_answer_challenge_missing_next(self): self._mock_post_get() - self.assertRaises(errors.NetworkError, self.net.answer_challenge, + self.assertRaises(errors.ClientError, self.net.answer_challenge, self.challr.body, challenges.DNSResponse()) def test_retry_after_date(self): @@ -372,7 +339,7 @@ class NetworkTest(unittest.TestCase): datetime.datetime(1999, 12, 31, 23, 59, 59), self.net.retry_after(response=self.response, default=10)) - @mock.patch('letsencrypt.network2.datetime') + @mock.patch('acme.client.datetime') def test_retry_after_invalid(self, dt_mock): dt_mock.datetime.now.return_value = datetime.datetime(2015, 3, 27) dt_mock.timedelta = datetime.timedelta @@ -382,7 +349,7 @@ class NetworkTest(unittest.TestCase): datetime.datetime(2015, 3, 27, 0, 0, 10), self.net.retry_after(response=self.response, default=10)) - @mock.patch('letsencrypt.network2.datetime') + @mock.patch('acme.client.datetime') def test_retry_after_seconds(self, dt_mock): dt_mock.datetime.now.return_value = datetime.datetime(2015, 3, 27) dt_mock.timedelta = datetime.timedelta @@ -392,7 +359,7 @@ class NetworkTest(unittest.TestCase): datetime.datetime(2015, 3, 27, 0, 0, 50), self.net.retry_after(response=self.response, default=10)) - @mock.patch('letsencrypt.network2.datetime') + @mock.patch('acme.client.datetime') def test_retry_after_missing(self, dt_mock): dt_mock.datetime.now.return_value = datetime.datetime(2015, 3, 27) dt_mock.timedelta = datetime.timedelta @@ -432,11 +399,11 @@ class NetworkTest(unittest.TestCase): def test_request_issuance_missing_location(self): self._mock_post_get() self.assertRaises( - errors.NetworkError, self.net.request_issuance, + errors.ClientError, self.net.request_issuance, CSR, (self.authzr,)) - @mock.patch('letsencrypt.network2.datetime') - @mock.patch('letsencrypt.network2.time') + @mock.patch('acme.client.datetime') + @mock.patch('acme.client.time') def test_poll_and_request_issuance(self, time_mock, dt_mock): # clock.dt | pylint: disable=no-member clock = mock.MagicMock(dt=datetime.datetime(2015, 3, 27)) @@ -462,7 +429,7 @@ class NetworkTest(unittest.TestCase): if not authzr.retries: # no more retries done = mock.MagicMock(uri=authzr.uri, times=authzr.times) - done.body.status = messages2.STATUS_VALID + done.body.status = messages.STATUS_VALID return done, [] # response (2nd result tuple element) is reduced to only @@ -517,10 +484,10 @@ class NetworkTest(unittest.TestCase): def test_check_cert(self): self.response.headers['Location'] = self.certr.uri - self.response.content = CERT2.as_der() + self.response.content = CERT.as_der() self._mock_post_get() self.assertEqual( - self.certr.update(body=CERT2), self.net.check_cert(self.certr)) + self.certr.update(body=CERT), self.net.check_cert(self.certr)) # TODO: split here and separate test self.response.headers['Location'] = 'foo' @@ -528,9 +495,9 @@ class NetworkTest(unittest.TestCase): errors.UnexpectedUpdate, self.net.check_cert, self.certr) def test_check_cert_missing_location(self): - self.response.content = CERT2.as_der() + self.response.content = CERT.as_der() self._mock_post_get() - self.assertRaises(errors.NetworkError, self.net.check_cert, self.certr) + self.assertRaises(errors.ClientError, self.net.check_cert, self.certr) def test_refresh(self): self.net.check_cert = mock.MagicMock() @@ -550,13 +517,13 @@ class NetworkTest(unittest.TestCase): def test_revoke(self): self._mock_post_get() - self.net.revoke(self.certr, when=messages2.Revocation.NOW) + self.net.revoke(self.certr, when=messages.Revocation.NOW) self.post.assert_called_once_with(self.certr.uri, mock.ANY) def test_revoke_bad_status_raises_error(self): self.response.status_code = httplib.METHOD_NOT_ALLOWED self._mock_post_get() - self.assertRaises(errors.NetworkError, self.net.revoke, self.certr) + self.assertRaises(errors.ClientError, self.net.revoke, self.certr) if __name__ == '__main__': diff --git a/acme/errors.py b/acme/errors.py index 957e781af..5046d7aee 100644 --- a/acme/errors.py +++ b/acme/errors.py @@ -1,8 +1,15 @@ """ACME errors.""" from acme.jose import errors as jose_errors + class Error(Exception): """Generic ACME error.""" class SchemaValidationError(jose_errors.DeserializationError): """JSON schema ACME object validation error.""" + +class ClientError(Error): + """Network error.""" + +class UnexpectedUpdate(ClientError): + """Unexpected update.""" diff --git a/acme/jose/testdata/README b/acme/jose/testdata/README index 72ec557e0..be3d8b2f7 100644 --- a/acme/jose/testdata/README +++ b/acme/jose/testdata/README @@ -4,7 +4,8 @@ The following command has been used to generate test keys: and for the CSR: - python -c from 'letsencrypt.crypto_util import make_csr; - import pkg_resources; open("csr2.pem", - "w").write(make_csr(pkg_resources.resource_string("letsencrypt.tests", - "testdata/rsa512_key.pem"), ["example2.com"])[0])' + openssl req -key rsa512_key.pem -new -subj '/CN=example.com' -outform DER > csr.der + +and for the certificate: + + openssl req -key rsa512_key.pem -new -subj '/CN=example.com' -x509 -outform DER > cert.der diff --git a/acme/jose/testdata/cert.der b/acme/jose/testdata/cert.der new file mode 100644 index 000000000..5f1018505 Binary files /dev/null and b/acme/jose/testdata/cert.der differ diff --git a/acme/jose/testdata/csr.der b/acme/jose/testdata/csr.der new file mode 100644 index 000000000..adc29ff18 Binary files /dev/null and b/acme/jose/testdata/csr.der differ diff --git a/acme/jose/testdata/csr2.pem b/acme/jose/testdata/csr2.pem deleted file mode 100644 index bd059a448..000000000 --- a/acme/jose/testdata/csr2.pem +++ /dev/null @@ -1,10 +0,0 @@ ------BEGIN CERTIFICATE REQUEST----- -MIIBXzCCAQkCAQAwejELMAkGA1UEBhMCVVMxETAPBgNVBAgMCE1pY2hpZ2FuMRIw -EAYDVQQHDAlBbm4gQXJib3IxDDAKBgNVBAoMA0VGRjEfMB0GA1UECwwWVW5pdmVy -c2l0eSBvZiBNaWNoaWdhbjEVMBMGA1UEAwwMZXhhbXBsZTIuY29tMFwwDQYJKoZI -hvcNAQEBBQADSwAwSAJBAPS2EXFRNza/qpXnnBHF/CcFQ543htV+7nLAmrLrmTNH -tPXJmLlM8SJDIzv/ceAFXL110VzxFfi81lpH5E5c0TMCAwEAAaAqMCgGCSqGSIb3 -DQEJDjEbMBkwFwYDVR0RBBAwDoIMZXhhbXBsZTIuY29tMA0GCSqGSIb3DQEBCwUA -A0EAwsdL4FLIgISKV4vXFmc6QTV7CjBiP4XmPFbeN+gMFdR7QcnRyyxSpXxB0v8Z -oqYboP5LGFt9zC6/9GyjcI9/IQ== ------END CERTIFICATE REQUEST----- diff --git a/acme/messages.py b/acme/messages.py index 6d46f894c..aa041caed 100644 --- a/acme/messages.py +++ b/acme/messages.py @@ -1,106 +1,241 @@ -"""ACME protocol v00 messages. - -.. warning:: This module is an implementation of the draft `ACME - protocol version 00`_, and not the "RESTified" `ACME protocol version - 01`_ or later. It should work with `older Node.js implementation`_, - but will definitely not work with Boulder_. It is kept for reference - purposes only. - - -.. _`ACME protocol version 00`: - https://github.com/letsencrypt/acme-spec/blob/v00/draft-barnes-acme.md - -.. _`ACME protocol version 01`: - https://github.com/letsencrypt/acme-spec/blob/v01/draft-barnes-acme.md - -.. _Boulder: https://github.com/letsencrypt/boulder - -.. _`older Node.js implementation`: - https://github.com/letsencrypt/node-acme/commit/f42aa5b7fad4cd2fc289653c4ab14f18052367b3 - - -""" -import jsonschema - +"""ACME protocol messages.""" from acme import challenges -from acme import errors +from acme import fields from acme import jose -from acme import other -from acme import util -class Message(jose.TypedJSONObjectWithFields): - # _fields_to_partial_json | pylint: disable=abstract-method - # pylint: disable=too-few-public-methods - """ACME message.""" - TYPES = {} - type_field_name = "type" +class Error(jose.JSONObjectWithFields, Exception): + """ACME error. - schema = NotImplemented - """JSON schema the object is tested against in :meth:`from_json`. - - Subclasses must overrride it with a value that is acceptable by - :func:`jsonschema.validate`, most probably using - :func:`acme.util.load_schema`. + https://tools.ietf.org/html/draft-ietf-appsawg-http-problem-00 """ + ERROR_TYPE_NAMESPACE = 'urn:acme:error:' + ERROR_TYPE_DESCRIPTIONS = { + 'malformed': 'The request message was malformed', + 'unauthorized': 'The client lacks sufficient authorization', + 'serverInternal': 'The server experienced an internal error', + 'badCSR': 'The CSR is unacceptable (e.g., due to a short key)', + 'badNonce': 'The client sent an unacceptable anti-replay nonce', + } + + typ = jose.Field('type') + title = jose.Field('title', omitempty=True) + detail = jose.Field('detail') + + @typ.encoder + def typ(value): # pylint: disable=missing-docstring,no-self-argument + return Error.ERROR_TYPE_NAMESPACE + value + + @typ.decoder + def typ(value): # pylint: disable=missing-docstring,no-self-argument + # pylint thinks isinstance(value, Error), so startswith is not found + # pylint: disable=no-member + if not value.startswith(Error.ERROR_TYPE_NAMESPACE): + raise jose.DeserializationError('Missing error type prefix') + + without_prefix = value[len(Error.ERROR_TYPE_NAMESPACE):] + if without_prefix not in Error.ERROR_TYPE_DESCRIPTIONS: + raise jose.DeserializationError('Error type not recognized') + + return without_prefix + + @property + def description(self): + """Hardcoded error description based on its type.""" + return self.ERROR_TYPE_DESCRIPTIONS[self.typ] + + def __str__(self): + if self.typ is not None: + return ' :: '.join([self.typ, self.description, self.detail]) + else: + return str(self.detail) + +class _Constant(jose.JSONDeSerializable): + """ACME constant.""" + __slots__ = ('name',) + POSSIBLE_NAMES = NotImplemented + + def __init__(self, name): + self.POSSIBLE_NAMES[name] = self + self.name = name + + def to_partial_json(self): + return self.name @classmethod - def from_json(cls, jobj): - """Deserialize from (possibly invalid) JSON object. + def from_json(cls, value): + if value not in cls.POSSIBLE_NAMES: + raise jose.DeserializationError( + '{0} not recognized'.format(cls.__name__)) + return cls.POSSIBLE_NAMES[value] - Note that the input ``jobj`` has not been sanitized in any way. + def __repr__(self): + return '{0}({1})'.format(self.__class__.__name__, self.name) - :param jobj: JSON object. + def __eq__(self, other): + return isinstance(other, type(self)) and other.name == self.name - :raises acme.errors.SchemaValidationError: if the input - JSON object could not be validated against JSON schema specified - in :attr:`schema`. - :raises acme.jose.errors.DeserializationError: for any - other generic error in decoding. - - :returns: instance of the class - - """ - msg_cls = cls.get_type_cls(jobj) - - # TODO: is that schema testing still relevant? - try: - jsonschema.validate(jobj, msg_cls.schema) - except jsonschema.ValidationError as error: - raise errors.SchemaValidationError(error) - - return super(Message, cls).from_json(jobj) + def __ne__(self, other): + return not self.__eq__(other) -@Message.register # pylint: disable=too-few-public-methods -class Challenge(Message): - """ACME "challenge" message. +class Status(_Constant): + """ACME "status" field.""" + POSSIBLE_NAMES = {} +STATUS_UNKNOWN = Status('unknown') +STATUS_PENDING = Status('pending') +STATUS_PROCESSING = Status('processing') +STATUS_VALID = Status('valid') +STATUS_INVALID = Status('invalid') +STATUS_REVOKED = Status('revoked') - :ivar str nonce: Random data, **not** base64-encoded. - :ivar list challenges: List of - :class:`~acme.challenges.Challenge` objects. - .. todo:: - 1. can challenges contain two challenges of the same type? - 2. can challenges contain duplicates? - 3. check "combinations" indices are in valid range - 4. turn "combinations" elements into sets? - 5. turn "combinations" into set? +class IdentifierType(_Constant): + """ACME identifier type.""" + POSSIBLE_NAMES = {} +IDENTIFIER_FQDN = IdentifierType('dns') # IdentifierDNS in Boulder + + +class Identifier(jose.JSONObjectWithFields): + """ACME identifier. + + :ivar acme.messages.IdentifierType typ: """ - typ = "challenge" - schema = util.load_schema(typ) + typ = jose.Field('type', decoder=IdentifierType.from_json) + value = jose.Field('value') - session_id = jose.Field("sessionID") - nonce = jose.Field("nonce", encoder=jose.b64encode, - decoder=jose.decode_b64jose) - challenges = jose.Field("challenges") - combinations = jose.Field("combinations", omitempty=True, default=()) + +class Resource(jose.ImmutableMap): + """ACME Resource. + + :ivar acme.messages.ResourceBody body: Resource body. + :ivar str uri: Location of the resource. + + """ + __slots__ = ('body', 'uri') + + +class ResourceBody(jose.JSONObjectWithFields): + """ACME Resource Body.""" + + +class RegistrationResource(Resource): + """Registration Resource. + + :ivar acme.messages.Registration body: + :ivar str new_authzr_uri: URI found in the 'next' ``Link`` header + :ivar str terms_of_service: URL for the CA TOS. + + """ + __slots__ = ('body', 'uri', 'new_authzr_uri', 'terms_of_service') + + +class Registration(ResourceBody): + """Registration Resource Body. + + :ivar acme.jose.jwk.JWK key: Public key. + :ivar tuple contact: Contact information following ACME spec + + """ + # on new-reg key server ignores 'key' and populates it based on + # JWS.signature.combined.jwk + key = jose.Field('key', omitempty=True, decoder=jose.JWK.from_json) + contact = jose.Field('contact', omitempty=True, default=()) + recovery_token = jose.Field('recoveryToken', omitempty=True) + agreement = jose.Field('agreement', omitempty=True) + + +class ChallengeResource(Resource, jose.JSONObjectWithFields): + """Challenge Resource. + + :ivar acme.messages.ChallengeBody body: + :ivar str authzr_uri: URI found in the 'up' ``Link`` header. + + """ + __slots__ = ('body', 'authzr_uri') + + @property + def uri(self): # pylint: disable=missing-docstring,no-self-argument + # bug? 'method already defined line None' + # pylint: disable=function-redefined + return self.body.uri + + +class ChallengeBody(ResourceBody): + """Challenge Resource Body. + + .. todo:: + Confusingly, this has a similar name to `.challenges.Challenge`, + as well as `.achallenges.AnnotatedChallenge`. Please use names + such as ``challb`` to distinguish instances of this class from + ``achall``. + + :ivar acme.challenges.Challenge: Wrapped challenge. + Conveniently, all challenge fields are proxied, i.e. you can + call ``challb.x`` to get ``challb.chall.x`` contents. + :ivar acme.messages.Status status: + :ivar datetime.datetime validated: + + """ + __slots__ = ('chall',) + uri = jose.Field('uri') + status = jose.Field('status', decoder=Status.from_json) + validated = fields.RFC3339Field('validated', omitempty=True) + + def to_partial_json(self): + jobj = super(ChallengeBody, self).to_partial_json() + jobj.update(self.chall.to_partial_json()) + return jobj + + @classmethod + def fields_from_json(cls, jobj): + jobj_fields = super(ChallengeBody, cls).fields_from_json(jobj) + jobj_fields['chall'] = challenges.Challenge.from_json(jobj) + return jobj_fields + + def __getattr__(self, name): + return getattr(self.chall, name) + + +class AuthorizationResource(Resource): + """Authorization Resource. + + :ivar acme.messages.Authorization body: + :ivar str new_cert_uri: URI found in the 'next' ``Link`` header + + """ + __slots__ = ('body', 'uri', 'new_cert_uri') + + +class Authorization(ResourceBody): + """Authorization Resource Body. + + :ivar acme.messages.Identifier identifier: + :ivar list challenges: `list` of `.ChallengeBody` + :ivar tuple combinations: Challenge combinations (`tuple` of `tuple` + of `int`, as opposed to `list` of `list` from the spec). + :ivar acme.jose.jwk.JWK key: Public key. + :ivar tuple contact: + :ivar acme.messages.Status status: + :ivar datetime.datetime expires: + + """ + identifier = jose.Field('identifier', decoder=Identifier.from_json) + challenges = jose.Field('challenges', omitempty=True) + combinations = jose.Field('combinations', omitempty=True) + + status = jose.Field('status', omitempty=True, decoder=Status.from_json) + # TODO: 'expires' is allowed for Authorization Resources in + # general, but for Key Authorization '[t]he "expires" field MUST + # be absent'... then acme-spec gives example with 'expires' + # present... That's confusing! + expires = fields.RFC3339Field('expires', omitempty=True) @challenges.decoder def challenges(value): # pylint: disable=missing-docstring,no-self-argument - return tuple(challenges.Challenge.from_json(chall) for chall in value) + return tuple(ChallengeBody.from_json(chall) for chall in value) @property def resolved_combinations(self): @@ -109,259 +244,55 @@ class Challenge(Message): for combo in self.combinations) -@Message.register # pylint: disable=too-few-public-methods -class ChallengeRequest(Message): - """ACME "challengeRequest" message.""" - typ = "challengeRequest" - schema = util.load_schema(typ) - identifier = jose.Field("identifier") +class CertificateRequest(jose.JSONObjectWithFields): + """ACME new-cert request. - -@Message.register # pylint: disable=too-few-public-methods -class Authorization(Message): - """ACME "authorization" message. - - :ivar jwk: :class:`acme.jose.JWK` + :ivar acme.jose.util.ComparableX509 csr: + `M2Crypto.X509.Request` wrapped in `.ComparableX509` + :ivar tuple authorizations: `tuple` of URIs (`str`) """ - typ = "authorization" - schema = util.load_schema(typ) - - recovery_token = jose.Field("recoveryToken", omitempty=True) - identifier = jose.Field("identifier", omitempty=True) - jwk = jose.Field("jwk", decoder=jose.JWK.from_json, omitempty=True) + csr = jose.Field('csr', decoder=jose.decode_csr, encoder=jose.encode_csr) + authorizations = jose.Field('authorizations', decoder=tuple) -@Message.register -class AuthorizationRequest(Message): - """ACME "authorizationRequest" message. +class CertificateResource(Resource): + """Certificate Resource. - :ivar str nonce: Random data from the corresponding - :attr:`Challenge.nonce`, **not** base64-encoded. - :ivar list responses: List of completed challenges ( - :class:`acme.challenges.ChallengeResponse`). - :ivar signature: Signature (:class:`acme.other.Signature`). + :ivar acme.jose.util.ComparableX509 body: + `M2Crypto.X509.X509` wrapped in `.ComparableX509` + :ivar str cert_chain_uri: URI found in the 'up' ``Link`` header + :ivar tuple authzrs: `tuple` of `AuthorizationResource`. """ - typ = "authorizationRequest" - schema = util.load_schema(typ) - - session_id = jose.Field("sessionID") - nonce = jose.Field("nonce", encoder=jose.b64encode, - decoder=jose.decode_b64jose) - responses = jose.Field("responses") - signature = jose.Field("signature", decoder=other.Signature.from_json) - contact = jose.Field("contact", omitempty=True, default=()) - - @responses.decoder - def responses(value): # pylint: disable=missing-docstring,no-self-argument - return tuple(challenges.ChallengeResponse.from_json(chall) - for chall in value) - - @classmethod - def create(cls, name, key, sig_nonce=None, **kwargs): - """Create signed "authorizationRequest". - - :param str name: Hostname - - :param key: Key used for signing. - :type key: :class:`Crypto.PublicKey.RSA` - - :param str sig_nonce: Nonce used for signature. Useful for testing. - :kwargs: Any other arguments accepted by the class constructor. - - :returns: Signed "authorizationRequest" ACME message. - :rtype: :class:`AuthorizationRequest` - - """ - # pylint: disable=too-many-arguments - signature = other.Signature.from_msg( - name + kwargs["nonce"], key, sig_nonce) - return cls( - signature=signature, contact=kwargs.pop("contact", ()), **kwargs) - - def verify(self, name): - """Verify signature. - - .. warning:: Caller must check that the public key encoded in the - :attr:`signature`'s :class:`acme.jose.JWK` object - is the correct key for a given context. - - :param str name: Hostname - - :returns: True iff ``signature`` can be verified, False otherwise. - :rtype: bool - - """ - # self.signature is not Field | pylint: disable=no-member - return self.signature.verify(name + self.nonce) + __slots__ = ('body', 'uri', 'cert_chain_uri', 'authzrs') -@Message.register # pylint: disable=too-few-public-methods -class Certificate(Message): - """ACME "certificate" message. +class Revocation(jose.JSONObjectWithFields): + """Revocation message. - :ivar certificate: The certificate (:class:`M2Crypto.X509.X509` - wrapped in :class:`acme.util.ComparableX509`). - - :ivar list chain: Chain of certificates (:class:`M2Crypto.X509.X509` - wrapped in :class:`acme.util.ComparableX509` ). + :ivar revoke: Either a `datetime.datetime` or `Revocation.NOW`. + :ivar tuple authorizations: Same as `CertificateRequest.authorizations` """ - typ = "certificate" - schema = util.load_schema(typ) - certificate = jose.Field("certificate", encoder=jose.encode_cert, - decoder=jose.decode_cert) - chain = jose.Field("chain", omitempty=True, default=()) - refresh = jose.Field("refresh", omitempty=True) + NOW = 'now' + """A possible value for `revoke`, denoting that certificate should + be revoked now.""" - @chain.decoder - def chain(value): # pylint: disable=missing-docstring,no-self-argument - return tuple(jose.decode_cert(cert) for cert in value) + revoke = jose.Field('revoke') + authorizations = CertificateRequest._fields['authorizations'] - @chain.encoder - def chain(value): # pylint: disable=missing-docstring,no-self-argument - return tuple(jose.encode_cert(cert) for cert in value) + @revoke.decoder + def revoke(value): # pylint: disable=missing-docstring,no-self-argument + if value == Revocation.NOW: + return value + else: + return fields.RFC3339Field.default_decoder(value) - -@Message.register -class CertificateRequest(Message): - """ACME "certificateRequest" message. - - :ivar csr: Certificate Signing Request (:class:`M2Crypto.X509.Request` - wrapped in :class:`acme.util.ComparableX509`. - :ivar signature: Signature (:class:`acme.other.Signature`). - - """ - typ = "certificateRequest" - schema = util.load_schema(typ) - - csr = jose.Field("csr", encoder=jose.encode_csr, - decoder=jose.decode_csr) - signature = jose.Field("signature", decoder=other.Signature.from_json) - - @classmethod - def create(cls, key, sig_nonce=None, **kwargs): - """Create signed "certificateRequest". - - :param key: Key used for signing. - :type key: :class:`Crypto.PublicKey.RSA` - - :param str sig_nonce: Nonce used for signature. Useful for testing. - :kwargs: Any other arguments accepted by the class constructor. - - :returns: Signed "certificateRequest" ACME message. - :rtype: :class:`CertificateRequest` - - """ - return cls(signature=other.Signature.from_msg( - kwargs["csr"].as_der(), key, sig_nonce), **kwargs) - - def verify(self): - """Verify signature. - - .. warning:: Caller must check that the public key encoded in the - :attr:`signature`'s :class:`acme.jose.JWK` object - is the correct key for a given context. - - :returns: True iff ``signature`` can be verified, False otherwise. - :rtype: bool - - """ - # self.signature is not Field | pylint: disable=no-member - return self.signature.verify(self.csr.as_der()) - - -@Message.register # pylint: disable=too-few-public-methods -class Defer(Message): - """ACME "defer" message.""" - typ = "defer" - schema = util.load_schema(typ) - - token = jose.Field("token") - interval = jose.Field("interval", omitempty=True) - message = jose.Field("message", omitempty=True) - - -@Message.register # pylint: disable=too-few-public-methods -class Error(Message): - """ACME "error" message.""" - typ = "error" - schema = util.load_schema(typ) - - error = jose.Field("error") - message = jose.Field("message", omitempty=True) - more_info = jose.Field("moreInfo", omitempty=True) - - MESSAGE_CODES = { - "malformed": "The request message was malformed", - "unauthorized": "The client lacks sufficient authorization", - "serverInternal": "The server experienced an internal error", - "notSupported": "The request type is not supported", - "unknown": "The server does not recognize an ID/token in the request", - "badCSR": "The CSR is unacceptable (e.g., due to a short key)", - } - - -@Message.register # pylint: disable=too-few-public-methods -class Revocation(Message): - """ACME "revocation" message.""" - typ = "revocation" - schema = util.load_schema(typ) - - -@Message.register -class RevocationRequest(Message): - """ACME "revocationRequest" message. - - :ivar certificate: Certificate (:class:`M2Crypto.X509.X509` - wrapped in :class:`acme.util.ComparableX509`). - :ivar signature: Signature (:class:`acme.other.Signature`). - - """ - typ = "revocationRequest" - schema = util.load_schema(typ) - - certificate = jose.Field("certificate", decoder=jose.decode_cert, - encoder=jose.encode_cert) - signature = jose.Field("signature", decoder=other.Signature.from_json) - - @classmethod - def create(cls, key, sig_nonce=None, **kwargs): - """Create signed "revocationRequest". - - :param key: Key used for signing. - :type key: :class:`Crypto.PublicKey.RSA` - - :param str sig_nonce: Nonce used for signature. Useful for testing. - :kwargs: Any other arguments accepted by the class constructor. - - :returns: Signed "revocationRequest" ACME message. - :rtype: :class:`RevocationRequest` - - """ - return cls(signature=other.Signature.from_msg( - kwargs["certificate"].as_der(), key, sig_nonce), **kwargs) - - def verify(self): - """Verify signature. - - .. warning:: Caller must check that the public key encoded in the - :attr:`signature`'s :class:`acme.jose.JWK` object - is the correct key for a given context. - - :returns: True iff ``signature`` can be verified, False otherwise. - :rtype: bool - - """ - # self.signature is not Field | pylint: disable=no-member - return self.signature.verify(self.certificate.as_der()) - - -@Message.register # pylint: disable=too-few-public-methods -class StatusRequest(Message): - """ACME "statusRequest" message.""" - typ = "statusRequest" - schema = util.load_schema(typ) - token = jose.Field("token") + @revoke.encoder + def revoke(value): # pylint: disable=missing-docstring,no-self-argument + if value == Revocation.NOW: + return value + else: + return fields.RFC3339Field.default_encoder(value) diff --git a/acme/messages2.py b/acme/messages2.py deleted file mode 100644 index 15b4521de..000000000 --- a/acme/messages2.py +++ /dev/null @@ -1,298 +0,0 @@ -"""ACME protocol messages.""" -from acme import challenges -from acme import fields -from acme import jose - - -class Error(jose.JSONObjectWithFields, Exception): - """ACME error. - - https://tools.ietf.org/html/draft-ietf-appsawg-http-problem-00 - - """ - ERROR_TYPE_NAMESPACE = 'urn:acme:error:' - ERROR_TYPE_DESCRIPTIONS = { - 'malformed': 'The request message was malformed', - 'unauthorized': 'The client lacks sufficient authorization', - 'serverInternal': 'The server experienced an internal error', - 'badCSR': 'The CSR is unacceptable (e.g., due to a short key)', - 'badNonce': 'The client sent an unacceptable anti-replay nonce', - } - - typ = jose.Field('type') - title = jose.Field('title', omitempty=True) - detail = jose.Field('detail') - - @typ.encoder - def typ(value): # pylint: disable=missing-docstring,no-self-argument - return Error.ERROR_TYPE_NAMESPACE + value - - @typ.decoder - def typ(value): # pylint: disable=missing-docstring,no-self-argument - # pylint thinks isinstance(value, Error), so startswith is not found - # pylint: disable=no-member - if not value.startswith(Error.ERROR_TYPE_NAMESPACE): - raise jose.DeserializationError('Missing error type prefix') - - without_prefix = value[len(Error.ERROR_TYPE_NAMESPACE):] - if without_prefix not in Error.ERROR_TYPE_DESCRIPTIONS: - raise jose.DeserializationError('Error type not recognized') - - return without_prefix - - @property - def description(self): - """Hardcoded error description based on its type.""" - return self.ERROR_TYPE_DESCRIPTIONS[self.typ] - - def __str__(self): - if self.typ is not None: - return ' :: '.join([self.typ, self.description, self.detail]) - else: - return str(self.detail) - -class _Constant(jose.JSONDeSerializable): - """ACME constant.""" - __slots__ = ('name',) - POSSIBLE_NAMES = NotImplemented - - def __init__(self, name): - self.POSSIBLE_NAMES[name] = self - self.name = name - - def to_partial_json(self): - return self.name - - @classmethod - def from_json(cls, value): - if value not in cls.POSSIBLE_NAMES: - raise jose.DeserializationError( - '{0} not recognized'.format(cls.__name__)) - return cls.POSSIBLE_NAMES[value] - - def __repr__(self): - return '{0}({1})'.format(self.__class__.__name__, self.name) - - def __eq__(self, other): - return isinstance(other, type(self)) and other.name == self.name - - def __ne__(self, other): - return not self.__eq__(other) - - -class Status(_Constant): - """ACME "status" field.""" - POSSIBLE_NAMES = {} -STATUS_UNKNOWN = Status('unknown') -STATUS_PENDING = Status('pending') -STATUS_PROCESSING = Status('processing') -STATUS_VALID = Status('valid') -STATUS_INVALID = Status('invalid') -STATUS_REVOKED = Status('revoked') - - -class IdentifierType(_Constant): - """ACME identifier type.""" - POSSIBLE_NAMES = {} -IDENTIFIER_FQDN = IdentifierType('dns') # IdentifierDNS in Boulder - - -class Identifier(jose.JSONObjectWithFields): - """ACME identifier. - - :ivar acme.messages2.IdentifierType typ: - - """ - typ = jose.Field('type', decoder=IdentifierType.from_json) - value = jose.Field('value') - - -class Resource(jose.ImmutableMap): - """ACME Resource. - - :ivar acme.messages2.ResourceBody body: Resource body. - :ivar str uri: Location of the resource. - - """ - __slots__ = ('body', 'uri') - - -class ResourceBody(jose.JSONObjectWithFields): - """ACME Resource Body.""" - - -class RegistrationResource(Resource): - """Registration Resource. - - :ivar acme.messages2.Registration body: - :ivar str new_authzr_uri: URI found in the 'next' ``Link`` header - :ivar str terms_of_service: URL for the CA TOS. - - """ - __slots__ = ('body', 'uri', 'new_authzr_uri', 'terms_of_service') - - -class Registration(ResourceBody): - """Registration Resource Body. - - :ivar acme.jose.jwk.JWK key: Public key. - :ivar tuple contact: Contact information following ACME spec - - """ - # on new-reg key server ignores 'key' and populates it based on - # JWS.signature.combined.jwk - key = jose.Field('key', omitempty=True, decoder=jose.JWK.from_json) - contact = jose.Field('contact', omitempty=True, default=()) - recovery_token = jose.Field('recoveryToken', omitempty=True) - agreement = jose.Field('agreement', omitempty=True) - - -class ChallengeResource(Resource, jose.JSONObjectWithFields): - """Challenge Resource. - - :ivar acme.messages2.ChallengeBody body: - :ivar str authzr_uri: URI found in the 'up' ``Link`` header. - - """ - __slots__ = ('body', 'authzr_uri') - - @property - def uri(self): # pylint: disable=missing-docstring,no-self-argument - # bug? 'method already defined line None' - # pylint: disable=function-redefined - return self.body.uri - - -class ChallengeBody(ResourceBody): - """Challenge Resource Body. - - .. todo:: - Confusingly, this has a similar name to `.challenges.Challenge`, - as well as `.achallenges.AnnotatedChallenge`. Please use names - such as ``challb`` to distinguish instances of this class from - ``achall``. - - :ivar acme.challenges.Challenge: Wrapped challenge. - Conveniently, all challenge fields are proxied, i.e. you can - call ``challb.x`` to get ``challb.chall.x`` contents. - :ivar acme.messages2.Status status: - :ivar datetime.datetime validated: - - """ - __slots__ = ('chall',) - uri = jose.Field('uri') - status = jose.Field('status', decoder=Status.from_json) - validated = fields.RFC3339Field('validated', omitempty=True) - - def to_partial_json(self): - jobj = super(ChallengeBody, self).to_partial_json() - jobj.update(self.chall.to_partial_json()) - return jobj - - @classmethod - def fields_from_json(cls, jobj): - jobj_fields = super(ChallengeBody, cls).fields_from_json(jobj) - jobj_fields['chall'] = challenges.Challenge.from_json(jobj) - return jobj_fields - - def __getattr__(self, name): - return getattr(self.chall, name) - - -class AuthorizationResource(Resource): - """Authorization Resource. - - :ivar acme.messages2.Authorization body: - :ivar str new_cert_uri: URI found in the 'next' ``Link`` header - - """ - __slots__ = ('body', 'uri', 'new_cert_uri') - - -class Authorization(ResourceBody): - """Authorization Resource Body. - - :ivar acme.messages2.Identifier identifier: - :ivar list challenges: `list` of `.ChallengeBody` - :ivar tuple combinations: Challenge combinations (`tuple` of `tuple` - of `int`, as opposed to `list` of `list` from the spec). - :ivar acme.jose.jwk.JWK key: Public key. - :ivar tuple contact: - :ivar acme.messages2.Status status: - :ivar datetime.datetime expires: - - """ - identifier = jose.Field('identifier', decoder=Identifier.from_json) - challenges = jose.Field('challenges', omitempty=True) - combinations = jose.Field('combinations', omitempty=True) - - status = jose.Field('status', omitempty=True, decoder=Status.from_json) - # TODO: 'expires' is allowed for Authorization Resources in - # general, but for Key Authorization '[t]he "expires" field MUST - # be absent'... then acme-spec gives example with 'expires' - # present... That's confusing! - expires = fields.RFC3339Field('expires', omitempty=True) - - @challenges.decoder - def challenges(value): # pylint: disable=missing-docstring,no-self-argument - return tuple(ChallengeBody.from_json(chall) for chall in value) - - @property - def resolved_combinations(self): - """Combinations with challenges instead of indices.""" - return tuple(tuple(self.challenges[idx] for idx in combo) - for combo in self.combinations) - - -class CertificateRequest(jose.JSONObjectWithFields): - """ACME new-cert request. - - :ivar acme.jose.util.ComparableX509 csr: - `M2Crypto.X509.Request` wrapped in `.ComparableX509` - :ivar tuple authorizations: `tuple` of URIs (`str`) - - """ - csr = jose.Field('csr', decoder=jose.decode_csr, encoder=jose.encode_csr) - authorizations = jose.Field('authorizations', decoder=tuple) - - -class CertificateResource(Resource): - """Certificate Resource. - - :ivar acme.jose.util.ComparableX509 body: - `M2Crypto.X509.X509` wrapped in `.ComparableX509` - :ivar str cert_chain_uri: URI found in the 'up' ``Link`` header - :ivar tuple authzrs: `tuple` of `AuthorizationResource`. - - """ - __slots__ = ('body', 'uri', 'cert_chain_uri', 'authzrs') - - -class Revocation(jose.JSONObjectWithFields): - """Revocation message. - - :ivar revoke: Either a `datetime.datetime` or `Revocation.NOW`. - :ivar tuple authorizations: Same as `CertificateRequest.authorizations` - - """ - - NOW = 'now' - """A possible value for `revoke`, denoting that certificate should - be revoked now.""" - - revoke = jose.Field('revoke') - authorizations = CertificateRequest._fields['authorizations'] - - @revoke.decoder - def revoke(value): # pylint: disable=missing-docstring,no-self-argument - if value == Revocation.NOW: - return value - else: - return fields.RFC3339Field.default_decoder(value) - - @revoke.encoder - def revoke(value): # pylint: disable=missing-docstring,no-self-argument - if value == Revocation.NOW: - return value - else: - return fields.RFC3339Field.default_encoder(value) diff --git a/acme/messages2_test.py b/acme/messages2_test.py deleted file mode 100644 index 72ffc954a..000000000 --- a/acme/messages2_test.py +++ /dev/null @@ -1,250 +0,0 @@ -"""Tests for acme.messages2.""" -import datetime -import os -import pkg_resources -import unittest - -import mock -import pytz -from Crypto.PublicKey import RSA - -from acme import challenges -from acme import jose - - -KEY = jose.util.HashableRSAKey(RSA.importKey(pkg_resources.resource_string( - 'acme.jose', os.path.join('testdata', 'rsa512_key.pem')))) - - -class ErrorTest(unittest.TestCase): - """Tests for acme.messages2.Error.""" - - def setUp(self): - from acme.messages2 import Error - self.error = Error(detail='foo', typ='malformed', title='title') - self.jobj = {'detail': 'foo', 'title': 'some title'} - - def test_typ_prefix(self): - self.assertEqual('malformed', self.error.typ) - self.assertEqual( - 'urn:acme:error:malformed', self.error.to_partial_json()['type']) - self.assertEqual( - 'malformed', self.error.from_json(self.error.to_partial_json()).typ) - - def test_typ_decoder_missing_prefix(self): - from acme.messages2 import Error - self.jobj['type'] = 'malformed' - self.assertRaises(jose.DeserializationError, Error.from_json, self.jobj) - self.jobj['type'] = 'not valid bare type' - self.assertRaises(jose.DeserializationError, Error.from_json, self.jobj) - - def test_typ_decoder_not_recognized(self): - from acme.messages2 import Error - self.jobj['type'] = 'urn:acme:error:baz' - self.assertRaises(jose.DeserializationError, Error.from_json, self.jobj) - - def test_description(self): - self.assertEqual( - 'The request message was malformed', self.error.description) - - def test_from_json_hashable(self): - from acme.messages2 import Error - hash(Error.from_json(self.error.to_json())) - - def test_str(self): - self.assertEqual( - 'malformed :: The request message was malformed :: foo', - str(self.error)) - self.assertEqual('foo', str(self.error.update(typ=None))) - - -class ConstantTest(unittest.TestCase): - """Tests for acme.messages2._Constant.""" - - def setUp(self): - from acme.messages2 import _Constant - class MockConstant(_Constant): # pylint: disable=missing-docstring - POSSIBLE_NAMES = {} - - self.MockConstant = MockConstant # pylint: disable=invalid-name - self.const_a = MockConstant('a') - self.const_b = MockConstant('b') - - def test_to_partial_json(self): - self.assertEqual('a', self.const_a.to_partial_json()) - self.assertEqual('b', self.const_b.to_partial_json()) - - def test_from_json(self): - self.assertEqual(self.const_a, self.MockConstant.from_json('a')) - self.assertRaises( - jose.DeserializationError, self.MockConstant.from_json, 'c') - - def test_from_json_hashable(self): - hash(self.MockConstant.from_json('a')) - - def test_repr(self): - self.assertEqual('MockConstant(a)', repr(self.const_a)) - self.assertEqual('MockConstant(b)', repr(self.const_b)) - - def test_equality(self): - const_a_prime = self.MockConstant('a') - self.assertFalse(self.const_a == self.const_b) - self.assertTrue(self.const_a == const_a_prime) - - self.assertTrue(self.const_a != self.const_b) - self.assertFalse(self.const_a != const_a_prime) - -class RegistrationTest(unittest.TestCase): - """Tests for acme.messages2.Registration.""" - - def setUp(self): - key = jose.jwk.JWKRSA(key=KEY.publickey()) - contact = ('mailto:letsencrypt-client@letsencrypt.org',) - recovery_token = 'XYZ' - agreement = 'https://letsencrypt.org/terms' - - from acme.messages2 import Registration - self.reg = Registration( - key=key, contact=contact, recovery_token=recovery_token, - agreement=agreement) - - self.jobj_to = { - 'contact': contact, - 'recoveryToken': recovery_token, - 'agreement': agreement, - 'key': key, - } - self.jobj_from = self.jobj_to.copy() - self.jobj_from['key'] = key.to_json() - - def test_to_partial_json(self): - self.assertEqual(self.jobj_to, self.reg.to_partial_json()) - - def test_from_json(self): - from acme.messages2 import Registration - self.assertEqual(self.reg, Registration.from_json(self.jobj_from)) - - def test_from_json_hashable(self): - from acme.messages2 import Registration - hash(Registration.from_json(self.jobj_from)) - - -class ChallengeResourceTest(unittest.TestCase): - """Tests for acme.messages2.ChallengeResource.""" - - def test_uri(self): - from acme.messages2 import ChallengeResource - self.assertEqual('http://challb', ChallengeResource(body=mock.MagicMock( - uri='http://challb'), authzr_uri='http://authz').uri) - - -class ChallengeBodyTest(unittest.TestCase): - """Tests for acme.messages2.ChallengeBody.""" - - def setUp(self): - self.chall = challenges.DNS(token='foo') - - from acme.messages2 import ChallengeBody - from acme.messages2 import STATUS_VALID - self.status = STATUS_VALID - self.challb = ChallengeBody( - uri='http://challb', chall=self.chall, status=self.status) - - self.jobj_to = { - 'uri': 'http://challb', - 'status': self.status, - 'type': 'dns', - 'token': 'foo', - } - self.jobj_from = self.jobj_to.copy() - self.jobj_from['status'] = 'valid' - - def test_to_partial_json(self): - self.assertEqual(self.jobj_to, self.challb.to_partial_json()) - - def test_from_json(self): - from acme.messages2 import ChallengeBody - self.assertEqual(self.challb, ChallengeBody.from_json(self.jobj_from)) - - def test_from_json_hashable(self): - from acme.messages2 import ChallengeBody - hash(ChallengeBody.from_json(self.jobj_from)) - - def test_proxy(self): - self.assertEqual('foo', self.challb.token) - - -class AuthorizationTest(unittest.TestCase): - """Tests for acme.messages2.Authorization.""" - - def setUp(self): - from acme.messages2 import ChallengeBody - from acme.messages2 import STATUS_VALID - self.challbs = ( - ChallengeBody( - uri='http://challb1', status=STATUS_VALID, - chall=challenges.SimpleHTTP(token='IlirfxKKXAsHtmzK29Pj8A')), - ChallengeBody(uri='http://challb2', status=STATUS_VALID, - chall=challenges.DNS(token='DGyRejmCefe7v4NfDGDKfA')), - ChallengeBody(uri='http://challb3', status=STATUS_VALID, - chall=challenges.RecoveryToken()), - ) - combinations = ((0, 2), (1, 2)) - - from acme.messages2 import Authorization - from acme.messages2 import Identifier - from acme.messages2 import IDENTIFIER_FQDN - identifier = Identifier(typ=IDENTIFIER_FQDN, value='example.com') - self.authz = Authorization( - identifier=identifier, combinations=combinations, - challenges=self.challbs) - - self.jobj_from = { - 'identifier': identifier.to_json(), - 'challenges': [challb.to_json() for challb in self.challbs], - 'combinations': combinations, - } - - def test_from_json(self): - from acme.messages2 import Authorization - Authorization.from_json(self.jobj_from) - - def test_from_json_hashable(self): - from acme.messages2 import Authorization - hash(Authorization.from_json(self.jobj_from)) - - def test_resolved_combinations(self): - self.assertEqual(self.authz.resolved_combinations, ( - (self.challbs[0], self.challbs[2]), - (self.challbs[1], self.challbs[2]), - )) - - -class RevocationTest(unittest.TestCase): - """Tests for acme.messages2.RevocationTest.""" - - def setUp(self): - from acme.messages2 import Revocation - self.rev_now = Revocation(authorizations=(), revoke=Revocation.NOW) - self.rev_date = Revocation(authorizations=(), revoke=datetime.datetime( - 2015, 3, 27, tzinfo=pytz.utc)) - self.jobj_now = {'authorizations': (), 'revoke': Revocation.NOW} - self.jobj_date = {'authorizations': (), - 'revoke': '2015-03-27T00:00:00Z'} - - def test_revoke_decoder(self): - from acme.messages2 import Revocation - self.assertEqual(self.rev_now, Revocation.from_json(self.jobj_now)) - self.assertEqual(self.rev_date, Revocation.from_json(self.jobj_date)) - - def test_revoke_encoder(self): - self.assertEqual(self.jobj_now, self.rev_now.to_partial_json()) - self.assertEqual(self.jobj_date, self.rev_date.to_partial_json()) - - def test_from_json_hashable(self): - from acme.messages2 import Revocation - hash(Revocation.from_json(self.rev_now.to_json())) - - -if __name__ == '__main__': - unittest.main() # pragma: no cover diff --git a/acme/messages_test.py b/acme/messages_test.py index baff2a21a..4f86d7809 100644 --- a/acme/messages_test.py +++ b/acme/messages_test.py @@ -1,479 +1,249 @@ """Tests for acme.messages.""" +import datetime import os import pkg_resources import unittest -import Crypto.PublicKey.RSA -import M2Crypto +import mock +import pytz +from Crypto.PublicKey import RSA from acme import challenges -from acme import errors from acme import jose -from acme import other -KEY = jose.HashableRSAKey(Crypto.PublicKey.RSA.importKey( - pkg_resources.resource_string( - 'acme.jose', os.path.join('testdata', 'rsa512_key.pem')))) -CERT = jose.ComparableX509(M2Crypto.X509.load_cert( - pkg_resources.resource_filename( - 'letsencrypt.tests', os.path.join('testdata', 'cert.pem')))) -CSR = jose.ComparableX509(M2Crypto.X509.load_request( - pkg_resources.resource_filename( - 'letsencrypt.tests', os.path.join('testdata', 'csr.pem')))) -CSR2 = jose.ComparableX509(M2Crypto.X509.load_request( - pkg_resources.resource_filename( - 'acme.jose', os.path.join('testdata', 'csr2.pem')))) - - -class MessageTest(unittest.TestCase): - """Tests for acme.messages.Message.""" - - def setUp(self): - # pylint: disable=missing-docstring,too-few-public-methods - from acme.messages import Message - - class MockParentMessage(Message): - # pylint: disable=abstract-method - TYPES = {} - - @MockParentMessage.register - class MockMessage(MockParentMessage): - typ = 'test' - schema = { - 'type': 'object', - 'properties': { - 'price': {'type': 'number'}, - 'name': {'type': 'string'}, - }, - } - price = jose.Field('price') - name = jose.Field('name') - - self.parent_cls = MockParentMessage - self.msg = MockMessage(price=123, name='foo') - - def test_from_json_validates(self): - self.assertRaises(errors.SchemaValidationError, - self.parent_cls.from_json, - {'type': 'test', 'price': 'asd'}) - - -class ChallengeTest(unittest.TestCase): - - def setUp(self): - challs = ( - challenges.SimpleHTTP(token='IlirfxKKXAsHtmzK29Pj8A'), - challenges.DNS(token='DGyRejmCefe7v4NfDGDKfA'), - challenges.RecoveryToken(), - ) - combinations = ((0, 2), (1, 2)) - - from acme.messages import Challenge - self.msg = Challenge( - session_id='aefoGaavieG9Wihuk2aufai3aeZ5EeW4', - nonce='\xec\xd6\xf2oYH\xeb\x13\xd5#q\xe0\xdd\xa2\x92\xa9', - challenges=challs, combinations=combinations) - - self.jmsg_to = { - 'type': 'challenge', - 'sessionID': 'aefoGaavieG9Wihuk2aufai3aeZ5EeW4', - 'nonce': '7Nbyb1lI6xPVI3Hg3aKSqQ', - 'challenges': challs, - 'combinations': combinations, - } - - self.jmsg_from = { - 'type': 'challenge', - 'sessionID': 'aefoGaavieG9Wihuk2aufai3aeZ5EeW4', - 'nonce': '7Nbyb1lI6xPVI3Hg3aKSqQ', - 'challenges': [chall.to_json() for chall in challs], - 'combinations': [[0, 2], [1, 2]], # TODO array tuples - } - - def test_resolved_combinations(self): - self.assertEqual(self.msg.resolved_combinations, ( - ( - challenges.SimpleHTTP(token='IlirfxKKXAsHtmzK29Pj8A'), - challenges.RecoveryToken() - ), - ( - challenges.DNS(token='DGyRejmCefe7v4NfDGDKfA'), - challenges.RecoveryToken(), - ) - )) - - def test_to_partial_json(self): - self.assertEqual(self.msg.to_partial_json(), self.jmsg_to) - - def test_from_json(self): - from acme.messages import Challenge - self.assertEqual(Challenge.from_json(self.jmsg_from), self.msg) - - def test_json_without_optionals(self): - del self.jmsg_from['combinations'] - del self.jmsg_to['combinations'] - - from acme.messages import Challenge - msg = Challenge.from_json(self.jmsg_from) - - self.assertEqual(msg.combinations, ()) - self.assertEqual(msg.to_partial_json(), self.jmsg_to) - - -class ChallengeRequestTest(unittest.TestCase): - - def setUp(self): - from acme.messages import ChallengeRequest - self.msg = ChallengeRequest(identifier='example.com') - - self.jmsg = { - 'type': 'challengeRequest', - 'identifier': 'example.com', - } - - def test_to_partial_json(self): - self.assertEqual(self.msg.to_partial_json(), self.jmsg) - - def test_from_json(self): - from acme.messages import ChallengeRequest - self.assertEqual(ChallengeRequest.from_json(self.jmsg), self.msg) - - -class AuthorizationTest(unittest.TestCase): - - def setUp(self): - jwk = jose.JWKRSA(key=KEY.publickey()) - - from acme.messages import Authorization - self.msg = Authorization(recovery_token='tok', jwk=jwk, - identifier='example.com') - - self.jmsg = { - 'type': 'authorization', - 'recoveryToken': 'tok', - 'identifier': 'example.com', - 'jwk': jwk, - } - - def test_to_partial_json(self): - self.assertEqual(self.msg.to_partial_json(), self.jmsg) - - def test_from_json(self): - self.jmsg['jwk'] = self.jmsg['jwk'].to_partial_json() - - from acme.messages import Authorization - self.assertEqual(Authorization.from_json(self.jmsg), self.msg) - - def test_json_without_optionals(self): - del self.jmsg['recoveryToken'] - del self.jmsg['identifier'] - del self.jmsg['jwk'] - - from acme.messages import Authorization - msg = Authorization.from_json(self.jmsg) - - self.assertTrue(msg.recovery_token is None) - self.assertTrue(msg.identifier is None) - self.assertTrue(msg.jwk is None) - self.assertEqual(self.jmsg, msg.to_partial_json()) - - -class AuthorizationRequestTest(unittest.TestCase): - - def setUp(self): - self.responses = ( - challenges.SimpleHTTPResponse(path='Hf5GrX4Q7EBax9hc2jJnfw'), - None, # null - challenges.RecoveryTokenResponse(token='23029d88d9e123e'), - ) - self.contact = ("mailto:cert-admin@example.com", "tel:+12025551212") - signature = other.Signature( - alg=jose.RS256, jwk=jose.JWKRSA(key=KEY.publickey()), - sig='-v\xd8\xc2\xa3\xba0\xd6\x92\x16\xb5.\xbe\xa1[\x04\xbe' - '\x1b\xa1X\xd2)\x18\x94\x8f\xd7\xd0\xc0\xbbcI`W\xdf v' - '\xe4\xed\xe8\x03J\xe8\xc8