From fadad74d480b8bf83aad6e84870a87997f2a536f Mon Sep 17 00:00:00 2001 From: Jakub Warmuz Date: Fri, 27 Mar 2015 10:33:07 +0000 Subject: [PATCH] Test, lint, and docs for network2 --- letsencrypt/acme/messages2.py | 17 +- letsencrypt/acme/messages2_test.py | 2 +- letsencrypt/client/network2.py | 201 ++++++---- letsencrypt/client/tests/network2_test.py | 453 ++++++++++++++++++++++ 4 files changed, 595 insertions(+), 78 deletions(-) create mode 100644 letsencrypt/client/tests/network2_test.py diff --git a/letsencrypt/acme/messages2.py b/letsencrypt/acme/messages2.py index 49ca24e73..f3ce53665 100644 --- a/letsencrypt/acme/messages2.py +++ b/letsencrypt/acme/messages2.py @@ -117,10 +117,10 @@ class RegistrationResource(Resource): :ivar body: `Registration` :ivar str uri: URI of the resource. - :ivar new_authz_uri: URI found in the 'next' Link header + :ivar new_authzr_uri: URI found in the 'next' Link header """ - __slots__ = ('body', 'uri', 'new_authz_uri', 'terms_of_service') + __slots__ = ('body', 'uri', 'new_authzr_uri', 'terms_of_service') class Registration(ResourceBody): @@ -138,10 +138,10 @@ class ChallengeResource(Resource, jose.JSONObjectWithFields): """Challenge resource. :ivar body: `.challenges.ChallengeBody` - :ivar authz_uri: URI found in the 'up' Link header. + :ivar authzr_uri: URI found in the 'up' Link header. """ - __slots__ = ('body', 'authz_uri') + __slots__ = ('body', 'authzr_uri') @property def uri(self): # pylint: disable=missing-docstring,no-self-argument @@ -217,10 +217,7 @@ class Authorization(ResourceBody): @challenges.decoder def challenges(value): # pylint: disable=missing-docstring,no-self-argument - return tuple( - ChallengeResource( - body=ChallengeBody.from_json(chall), authz_uri=None) - for chall in value) + return tuple(ChallengeBody.from_json(chall) for chall in value) @property def resolved_combinations(self): @@ -232,7 +229,7 @@ class Authorization(ResourceBody): class CertificateRequest(jose.JSONObjectWithFields): """ACME new-cert request. - :ivar csr: `M2Crypto.X509.Request` + :ivar csr: `M2Crypto.X509.Request` wrapped in `.ComparableX509` """ csr = jose.Field('csr', decoder=jose.decode_csr, encoder=jose.encode_csr) @@ -242,7 +239,7 @@ class CertificateRequest(jose.JSONObjectWithFields): class CertificateResource(Resource): """Authorization resource. - :ivar body: `M2Crypto.X509.X509` + :ivar body: `M2Crypto.X509.X509` wrapped in `.ComparableX509` :ivar cert_chain_uri: URI found in the 'up' Link header :ivar authzrs: `list` of `AuthorizationResource`. diff --git a/letsencrypt/acme/messages2_test.py b/letsencrypt/acme/messages2_test.py index 3d94e2bf2..5297d6362 100644 --- a/letsencrypt/acme/messages2_test.py +++ b/letsencrypt/acme/messages2_test.py @@ -72,7 +72,7 @@ class ChallengeResourceTest(unittest.TestCase): def test_uri(self): from letsencrypt.acme.messages2 import ChallengeResource self.assertEqual('http://challb', ChallengeResource(body=mock.MagicMock( - uri='http://challb'), authz_uri='http://authz').uri) + uri='http://challb'), authzr_uri='http://authz').uri) class ChallengeBodyTest(unittest.TestCase): diff --git a/letsencrypt/client/network2.py b/letsencrypt/client/network2.py index c1789808d..13c3e8149 100644 --- a/letsencrypt/client/network2.py +++ b/letsencrypt/client/network2.py @@ -6,12 +6,10 @@ import itertools import logging import time +import M2Crypto import requests import werkzeug -import M2Crypto - -from letsencrypt.acme import challenges from letsencrypt.acme import jose from letsencrypt.acme import messages2 @@ -40,9 +38,13 @@ class Network(object): self.key = key self.alg = alg - def _wrap_in_jws(self, data): - """Wrap `JSONDeSerializable` object in JWS.""" - dumps = data.json_dumps() + def _wrap_in_jws(self, obj): + """Wrap `JSONDeSerializable` object in JWS. + + :rtype: `.JWS` + + """ + dumps = obj.json_dumps() logging.debug('Serialized JSON: %s', dumps) return jose.JWS.sign( payload=dumps, key=self.key, alg=self.alg).json_dumps() @@ -52,11 +54,12 @@ class Network(object): """Check response content and its type. .. note:: - Checking is not strict: skips wrong server response Content-Type - if response is an expected JSON object (c.f. Boulder #56). + Checking is not strict: wrong server response ``Content-Type`` + HTTP header is ignored if response is an expected JSON object + (c.f. Boulder #56). """ - response_ct = response.headers['content-type'] + response_ct = response.headers.get('Content-Type') try: # TODO: response.json() is called twice, once here, and @@ -81,15 +84,12 @@ class Network(object): # response is not JSON object raise errors.NetworkError(response) else: - if jobj is not None and ( - response_ct != cls.JSON_CONTENT_TYPE or - response_ct != cls.JSON_ERROR_CONTENT_TYPE): + if jobj is not None and response_ct != cls.JSON_CONTENT_TYPE: logging.debug( 'Ignoring wrong Content-Type (%r) for JSON decodable ' 'response', response_ct) - if (content_type is not None and response_ct != content_type - and content_type != cls.JSON_CONTENT_TYPE): + if content_type == cls.JSON_CONTENT_TYPE and jobj is None: raise errors.NetworkError( 'Unexpected response Content-Type: {0}'.format(response_ct)) @@ -106,13 +106,13 @@ class Network(object): response = requests.get(uri, **kwargs) except requests.exceptions.RequestException as error: raise errors.NetworkError(error) - self._check_response(response, content_type) + self._check_response(response, content_type=content_type) return response def _post(self, uri, data, content_type=JSON_CONTENT_TYPE, **kwargs): """Send POST data. - :param str content_type: Expected Content-Type, fails if not set. + :param str content_type: Expected ``Content-Type``, fails if not set. :raises letsencrypt.acme.messages2.NetworkError: @@ -127,31 +127,35 @@ class Network(object): raise errors.NetworkError(error) logging.debug('Received response %s: %s', response, response.text) - self._check_response(response, content_type) + self._check_response(response, content_type=content_type) return response @classmethod - def _regr_from_response(cls, response, uri=None, new_authz_uri=None): + def _regr_from_response(cls, response, uri=None, new_authzr_uri=None, + terms_of_service=None): terms_of_service = ( - response.links['next']['url'] - if 'terms-of-service' in response.links else None) + response.links['terms-of-service']['url'] + if 'terms-of-service' in response.links else terms_of_service) - if new_authz_uri is None: + if new_authzr_uri is None: try: - new_authz_uri = response.links['next']['url'] + new_authzr_uri = response.links['next']['url'] except KeyError: raise errors.NetworkError('"next" link missing') return messages2.RegistrationResource( body=messages2.Registration.from_json(response.json()), - uri=response.headers.get('location', uri), - new_authz_uri=new_authz_uri, + uri=response.headers.get('Location', uri), + new_authzr_uri=new_authzr_uri, terms_of_service=terms_of_service) def register(self, contact=messages2.Registration._fields[ 'contact'].default): """Register. + :param contact: Contact list, as accpeted by `.RegistrationResource` + :type contact: `tuple` + :returns: Registration Resource. :rtype: `.RegistrationResource` @@ -188,11 +192,11 @@ class Network(object): # (c.f. acme-spec #94) updated_regr = self._regr_from_response( - response, uri=regr.uri, new_authz_uri=regr.new_authz_uri) + response, uri=regr.uri, new_authzr_uri=regr.new_authzr_uri, + terms_of_service=regr.terms_of_service) if updated_regr != regr: - pass # TODO: Boulder reregisters with new recoveryToken and new URI - #raise errors.UnexpectedUpdate(regr) + raise errors.UnexpectedUpdate(regr) return updated_regr def _authzr_from_response(self, response, identifier, @@ -205,7 +209,7 @@ class Network(object): authzr = messages2.AuthorizationResource( body=messages2.Authorization.from_json(response.json()), - uri=response.headers.get('location', uri), + uri=response.headers.get('Location', uri), new_cert_uri=new_cert_uri) if (authzr.body.key != self.key.public() or authzr.body.identifier != identifier): @@ -223,33 +227,44 @@ class Network(object): """ new_authz = messages2.Authorization(identifier=identifier) - response = self._post(regr.new_authz_uri, self._wrap_in_jws(new_authz)) + response = self._post(regr.new_authzr_uri, self._wrap_in_jws(new_authz)) assert response.status_code == httplib.CREATED # TODO: handle errors return self._authzr_from_response(response, identifier) - def answer_challenge(self, challr, response): + def request_domain_challenges(self, domain, regr): + """Request challenges for domain names.""" + return self.request_challenges(messages2.Identifier( + typ=messages2.IDENTIFIER_FQDN, value=domain), regr) + + def answer_challenge(self, challb, response): """Answer challenge. - :param challr: Corresponding challenge resource. - :type challr: `.ChallengeResource` + :param challb: Challenge Resource body. + :type challb: `.ChallengeBody` - :param response: Challenge response + :param response: Corresponding Challenge response :type response: `.challenges.ChallengeResponse` - :returns: Updated challenge resource. + :returns: Challenge resource with updated body. :rtype: `.ChallengeResource` :raises errors.UnexpectedUpdate: """ - response = self._post(challr.uri, self._wrap_in_jws(response)) - if response.headers['location'] != challr.uri: - raise errors.UnexpectedUpdate(response.headers['location']) - updated_challr = challr.update( - body=challenges.Challenge.from_json(response.json())) - return updated_challr + response = self._post(challb.uri, self._wrap_in_jws(response)) + try: + authzr_uri = response.links['up']['url'] + except KeyError: + raise errors.NetworkError('"up" Link header missing') + challr = messages2.ChallengeResource( + authzr_uri=authzr_uri, + body=messages2.ChallengeBody.from_json(response.json())) + # TODO: check that challr.uri == response.headers['Location']? + if challr.uri != challb.uri: + raise errors.UnexpectedUpdate(challr.uri) + return challr - def answer_challenges(self, challrs, responses): + def answer_challenges(self, challbs, responses): """Answer multiple challenges. .. note:: This is a convenience function to make integration @@ -257,18 +272,35 @@ class Network(object): once restification is over. """ - return [self.answer_challenge(challr, response) - for challr, response in itertools.izip(challrs, responses)] + return [self.answer_challenge(challb, response) + for challb, response in itertools.izip(challbs, responses)] @classmethod - def _retry_after(cls, response, mintime): - retry_after = response.headers.get('Retry-After', str(mintime)) + def retry_after(cls, response, default): + """Compute next `poll` time based on response ``Retry-After`` header. + + :param response: Response from `poll`. + :type response: `requests.Response` + + :param int default: Default value (in seconds), used when + ``Retry-After`` header is not present or invalid. + + :returns: Time point when next `poll` should be performed. + :rtype: `datetime.datetime` + + """ + retry_after = response.headers.get('Retry-After', str(default)) try: seconds = int(retry_after) except ValueError: - return werkzeug.parse_date(retry_after) # pylint: disable=no-member - else: - return datetime.datetime.now() + datetime.timedelta(seconds=seconds) + # pylint: disable=no-member + decoded = werkzeug.parse_date(retry_after) # RFC1123 + if decoded is None: + seconds = default + else: + return decoded + + return datetime.datetime.now() + datetime.timedelta(seconds=seconds) def poll(self, authzr): """Poll Authorization Resource for status. @@ -284,7 +316,7 @@ class Network(object): response = self._get(authzr.uri) updated_authzr = self._authzr_from_response( response, authzr.body.identifier, authzr.uri, authzr.new_cert_uri) - # TODO check UnexpectedUpdate + # TODO: check and raise UnexpectedUpdate return updated_authzr, response @@ -292,11 +324,16 @@ class Network(object): """Request issuance. :param csr: CSR - :type csr: `M2Crypto.X509.Request` + :type csr: `M2Crypto.X509.Request` wrapped in `.ComparableX509` :param authzrs: `list` of `.AuthorizationResource` + :returns: Issued certificate + :rtype: `.messages2.CertificateResource` + """ + assert authzrs, "Authorizations list is empty" + # TODO: assert len(authzrs) == number of SANs req = messages2.CertificateRequest( csr=csr, authorizations=tuple(authzr.uri for authzr in authzrs)) @@ -308,18 +345,46 @@ class Network(object): content_type=content_type, headers={'Accept': content_type}) + try: + cert_chain_uri = response.links['up']['url'] + except KeyError: + raise errors.NetworkError('"up" Link missing') + + try: + uri = response.headers['Location'] + except KeyError: + raise errors.NetworkError('"Location" Header missing') + return messages2.CertificateResource( - authzrs=authzrs, - body=M2Crypto.X509.load_cert_der_string(response.text), - cert_chain_uri=response.links['up']['url']) + uri=uri, authzrs=authzrs, cert_chain_uri=cert_chain_uri, + body=jose.ComparableX509( + M2Crypto.X509.load_cert_der_string(response.content))) def poll_and_request_issuance(self, csr, authzrs, mintime=5): """Poll and request issuance. - :param int mintime: Minimum time before next attempt. + This function polls all provided Authorization Resource URIs + until all challenges are valid, respecting ``Retry-After`` HTTP + headers, and then calls `request_issuance`. .. todo:: add `max_attempts` or `timeout` + :param csr: CSR. + :type csr: `M2Crypto.X509.Request` wrapped in `.ComparableX509` + + :param authzrs: `list` of `.AuthorizationResource` + + :param int mintime: Minimum time before next attempt, used if + ``Retry-After`` is not present in the response. + + :returns: ``(cert, updated_authzrs)`` `tuple` where ``cert`` is + the issued certificate (`.messages2.CertificateResource.), + and ``updated_authzrs`` is a `tuple` consisting of updated + Authorization Resources (`.AuthorizationResource`) as + present in the responses from server, and in the same order + as the input ``authzrs``. + :rtype: `tuple` + """ # priority queue with datetime (based od Retry-After) as key, # and original Authorization Resource as value @@ -337,25 +402,25 @@ class Network(object): logging.debug('Sleeping for %d seconds', seconds) time.sleep(seconds) - updated_authzr, response = self.poll(authzr) + # Note that we poll with the latest updated Authorization + # URI, which might have a different URI than initial one + updated_authzr, response = self.poll(updated[authzr]) updated[authzr] = updated_authzr - # URI must not change throughout, as we are polling - # original Authorization Resource URI only - assert updated_authzr.uri == authzr if updated_authzr.body.status != messages2.STATUS_VALID: # push back to the priority queue, with updated retry_after - heapq.heappush(waiting, (self._retry_after( - response, mintime=mintime), authzr)) + heapq.heappush(waiting, (self.retry_after( + response, default=mintime), authzr)) - return self.request_issuance(csr, authzrs), tuple( - updated[authzr] for authzr in authzrs) + updated_authzrs = tuple(updated[authzr] for authzr in authzrs) + return self.request_issuance(csr, updated_authzrs), updated_authzrs def _get_cert(self, uri): content_type = self.DER_CONTENT_TYPE # TODO: make it a param response = self._get(uri, headers={'Accept': content_type}, content_type=content_type) - return response, M2Crypto.X509.load_cert_der_string(response.text) + return response, jose.ComparableX509( + M2Crypto.X509.load_cert_der_string(response.content)) def check_cert(self, certr): """Check for new cert. @@ -370,7 +435,9 @@ class Network(object): # TODO: acme-spec 5.1 table action should be renamed to # "refresh cert", and this method integrated with self.refresh response, cert = self._get_cert(certr.uri) - if not response.headers['location'] != certr.uri: + if 'Location' not in response.headers: + raise errors.NetworkError('Location header missing') + if response.headers['Location'] != certr.uri: raise errors.UnexpectedUpdate(response.text) return certr.update(body=cert) @@ -393,7 +460,7 @@ class Network(object): :type certr: `.CertificateResource` :returns: Certificate chain - :rtype: `M2Crypto.X509.X509` + :rtype: `M2Crypto.X509.X509` wrapped in `.ComparableX509` """ return self._get_cert(certr.cert_chain_uri) @@ -401,8 +468,8 @@ class Network(object): def revoke(self, certr, when=messages2.Revocation.NOW): """Revoke certificate. - :param when: When should the revocation take place. - :type when: `.Revocation.When` + :param when: When should the revocation take place? Takes + the same values as `.messages2.Revocation.revoke`. """ rev = messages2.Revocation(revoke=when, authorizations=tuple( diff --git a/letsencrypt/client/tests/network2_test.py b/letsencrypt/client/tests/network2_test.py new file mode 100644 index 000000000..d7aa74929 --- /dev/null +++ b/letsencrypt/client/tests/network2_test.py @@ -0,0 +1,453 @@ +"""Tests for letsencrypt.client.network2.""" +import datetime +import httplib +import os +import pkg_resources +import unittest + +import M2Crypto +import mock +import requests + +from letsencrypt.client import errors + +from letsencrypt.acme import challenges +from letsencrypt.acme import jose +from letsencrypt.acme import messages2 + + +CERT = jose.ComparableX509(M2Crypto.X509.load_cert_string( + pkg_resources.resource_string( + __name__, os.path.join('testdata/cert.pem')))) +CERT2 = jose.ComparableX509(M2Crypto.X509.load_cert_string( + pkg_resources.resource_string( + __name__, os.path.join('testdata/cert-san.pem')))) +CSR = jose.ComparableX509(M2Crypto.X509.load_request_string( + pkg_resources.resource_string( + __name__, os.path.join('testdata/csr.pem')))) +KEY = jose.JWKRSA.load(pkg_resources.resource_string( + __name__, os.path.join('testdata/rsa512_key.pem'))) +KEY2 = jose.JWKRSA.load(pkg_resources.resource_string( + __name__, os.path.join('testdata/rsa256_key.pem'))) + + +class NetworkTest(unittest.TestCase): + """Tests for letsencrypt.client.network2.Network.""" + + # pylint: disable=too-many-instance-attributes,too-many-public-methods + + def setUp(self): + from letsencrypt.client.network2 import Network + self.net = Network( + new_reg_uri='https://www.letsencrypt-demo.org/acme/new-reg', + key=KEY, alg=jose.RS256) + self.response = mock.MagicMock(ok=True, status_code=httplib.OK) + self.response.headers = {} + self.response.links = {} + + self.identifier = messages2.Identifier( + typ=messages2.IDENTIFIER_FQDN, value='example.com') + + # Registration + self.contact = ('mailto:cert-admin@example.com', 'tel:+12025551212') + reg = messages2.Registration( + contact=self.contact, key=KEY.public(), recovery_token='t') + self.regr = messages2.RegistrationResource( + body=reg, uri='https://www.letsencrypt-demo.org/acme/reg/1', + new_authzr_uri='https://www.letsencrypt-demo.org/acme/new-reg', + terms_of_service='https://www.letsencrypt-demo.org/tos') + + # Authorization + authzr_uri = 'https://www.letsencrypt-demo.org/acme/authz/1' + challb = messages2.ChallengeBody( + uri=(authzr_uri + '/1'), status=messages2.STATUS_VALID, + chall=challenges.DNS(token='foo')) + self.challr = messages2.ChallengeResource( + body=challb, authzr_uri=authzr_uri) + self.authz = messages2.Authorization( + identifier=messages2.Identifier( + typ=messages2.IDENTIFIER_FQDN, value='example.com'), + challenges=(challb,), combinations=None, key=KEY.public()) + self.authzr = messages2.AuthorizationResource( + body=self.authz, uri=authzr_uri, + new_cert_uri='https://www.letsencrypt-demo.org/acme/new-cert') + + # Request issuance + self.certr = messages2.CertificateResource( + body=CERT, authzrs=(self.authzr,), + uri='https://www.letsencrypt-demo.org/acme/cert/1', + cert_chain_uri='https://www.letsencrypt-demo.org/ca') + + def _mock_post_get(self): + # pylint: disable=protected-access + self.net._post = mock.MagicMock(return_value=self.response) + self.net._get = mock.MagicMock(return_value=self.response) + + def test_wrap_in_jws(self): + class MockJSONDeSerializable(jose.JSONDeSerializable): + # pylint: disable=missing-docstring + def __init__(self, value): + self.value = value + def to_json(self): + return self.value + @classmethod + def from_json(cls, value): + return cls(value) + # pylint: disable=protected-access + jws = self.net._wrap_in_jws(MockJSONDeSerializable('foo')) + self.assertEqual(jose.JWS.json_loads(jws).payload, '"foo"') + + def test_check_response_not_ok_jobj_no_error(self): + self.response.ok = False + self.response.json.return_value = {} + # pylint: disable=protected-access + self.assertRaises( + errors.NetworkError, self.net._check_response, self.response) + + def test_check_response_not_ok_jobj_error(self): + self.response.ok = False + self.response.json.return_value = messages2.Error(detail='foo') + # pylint: disable=protected-access + self.assertRaises( + messages2.Error, self.net._check_response, self.response) + + def test_check_response_not_ok_no_jobj(self): + self.response.ok = False + self.response.json.side_effect = ValueError + # pylint: disable=protected-access + self.assertRaises( + errors.NetworkError, self.net._check_response, self.response) + + def test_check_response_ok_no_jobj_ct_required(self): + self.response.json.side_effect = ValueError + for response_ct in [self.net.JSON_CONTENT_TYPE, 'foo']: + self.response.headers['Content-Type'] = response_ct + # pylint: disable=protected-access + self.assertRaises( + errors.NetworkError, self.net._check_response, self.response, + content_type=self.net.JSON_CONTENT_TYPE) + + def test_check_response_ok_no_jobj_no_ct(self): + self.response.json.side_effect = ValueError + for response_ct in [self.net.JSON_CONTENT_TYPE, 'foo']: + self.response.headers['Content-Type'] = response_ct + # pylint: disable=protected-access + self.net._check_response(self.response) + + def test_check_response_jobj(self): + self.response.json.return_value = {} + for response_ct in [self.net.JSON_CONTENT_TYPE, 'foo']: + self.response.headers['Content-Type'] = response_ct + # pylint: disable=protected-access + self.net._check_response(self.response) + + @mock.patch('letsencrypt.client.network2.requests') + def test_get_requests_error_passthrough(self, requests_mock): + requests_mock.exceptions = requests.exceptions + requests_mock.get.side_effect = requests.exceptions.RequestException + # pylint: disable=protected-access + self.assertRaises(errors.NetworkError, self.net._get, 'uri') + + @mock.patch('letsencrypt.client.network2.requests') + def test_get(self, requests_mock): + # pylint: disable=protected-access + self.net._check_response = mock.MagicMock() + self.net._get('uri', content_type='ct') + self.net._check_response.assert_called_once_with( + requests_mock.get('uri'), content_type='ct') + + @mock.patch('letsencrypt.client.network2.requests') + def test_post_requests_error_passthrough(self, requests_mock): + requests_mock.exceptions = requests.exceptions + requests_mock.post.side_effect = requests.exceptions.RequestException + # pylint: disable=protected-access + self.assertRaises(errors.NetworkError, self.net._post, 'uri', 'data') + + @mock.patch('letsencrypt.client.network2.requests') + def test_post(self, requests_mock): + # pylint: disable=protected-access + self.net._check_response = mock.MagicMock() + self.net._post('uri', 'data', content_type='ct') + self.net._check_response.assert_called_once_with( + requests_mock.post('uri', 'data'), content_type='ct') + + def test_register(self): + self.response.status_code = httplib.CREATED + self.response.json.return_value = self.regr.body.fully_serialize() + self.response.headers['Location'] = self.regr.uri + self.response.links.update({ + 'next': {'url': self.regr.new_authzr_uri}, + 'terms-of-service': {'url': self.regr.terms_of_service}, + }) + + self._mock_post_get() + self.assertEqual(self.regr, self.net.register(self.contact)) + # TODO: test POST call arguments + + # TODO: split here and separate test + reg_wrong_key = self.regr.body.update(key=KEY2.public()) + self.response.json.return_value = reg_wrong_key.fully_serialize() + self.assertRaises( + errors.UnexpectedUpdate, self.net.register, self.contact) + + def test_register_missing_next(self): + self.response.status_code = httplib.CREATED + self._mock_post_get() + self.assertRaises( + errors.NetworkError, self.net.register, self.regr.body) + + def test_update_registration(self): + self.response.headers['Location'] = self.regr.uri + self.response.json.return_value = self.regr.body.fully_serialize() + self._mock_post_get() + self.assertEqual(self.regr, self.net.update_registration(self.regr)) + + # TODO: split here and separate test + self.response.json.return_value = self.regr.body.update( + contact=()).fully_serialize() + self.assertRaises( + errors.UnexpectedUpdate, self.net.update_registration, self.regr) + + def test_request_challenges(self): + self.response.status_code = httplib.CREATED + self.response.headers['Location'] = self.authzr.uri + self.response.json.return_value = self.authz.fully_serialize() + self.response.links = { + 'next': {'url': self.authzr.new_cert_uri}, + } + + self._mock_post_get() + self.net.request_challenges(self.identifier, self.regr) + # TODO: test POST call arguments + + # TODO: split here and separate test + authz_wrong_key = self.authz.update(key=KEY2.public()) + self.response.json.return_value = authz_wrong_key.fully_serialize() + self.assertRaises( + errors.UnexpectedUpdate, self.net.request_challenges, + self.identifier, self.regr) + + def test_request_challenges_missing_next(self): + self.response.status_code = httplib.CREATED + self._mock_post_get() + self.assertRaises( + errors.NetworkError, self.net.request_challenges, + self.identifier, self.regr) + + def test_request_domain_challenges(self): + self.net.request_challenges = mock.MagicMock() + self.assertEqual( + self.net.request_challenges(self.identifier), + self.net.request_domain_challenges('example.com', self.regr)) + + def test_answer_challenge(self): + self.response.links['up'] = {'url': self.challr.authzr_uri} + self.response.json.return_value = self.challr.body.fully_serialize() + + chall_response = challenges.DNSResponse() + + self._mock_post_get() + self.net.answer_challenge(self.challr.body, chall_response) + + # TODO: split here and separate test + self.assertRaises(errors.UnexpectedUpdate, self.net.answer_challenge, + self.challr.body.update(uri='foo'), chall_response) + + def test_answer_challenge_missing_next(self): + self._mock_post_get() + self.assertRaises(errors.NetworkError, self.net.answer_challenge, + self.challr.body, challenges.DNSResponse()) + + def test_answer_challenges(self): + self.net.answer_challenge = mock.MagicMock() + self.assertEqual( + [self.net.answer_challenge( + self.challr.body, challenges.DNSResponse())], + self.net.answer_challenges( + [self.challr.body], [challenges.DNSResponse()])) + + def test_retry_after_date(self): + self.response.headers['Retry-After'] = 'Fri, 31 Dec 1999 23:59:59 GMT' + self.assertEqual( + datetime.datetime(1999, 12, 31, 23, 59, 59), + self.net.retry_after(response=self.response, default=10)) + + @mock.patch('letsencrypt.client.network2.datetime') + def test_retry_after_invalid(self, dt_mock): + dt_mock.datetime.now.return_value = datetime.datetime(2015, 3, 27) + dt_mock.timedelta = datetime.timedelta + + self.response.headers['Retry-After'] = 'foooo' + self.assertEqual( + datetime.datetime(2015, 3, 27, 0, 0, 10), + self.net.retry_after(response=self.response, default=10)) + + @mock.patch('letsencrypt.client.network2.datetime') + def test_retry_after_seconds(self, dt_mock): + dt_mock.datetime.now.return_value = datetime.datetime(2015, 3, 27) + dt_mock.timedelta = datetime.timedelta + + self.response.headers['Retry-After'] = '50' + self.assertEqual( + datetime.datetime(2015, 3, 27, 0, 0, 50), + self.net.retry_after(response=self.response, default=10)) + + @mock.patch('letsencrypt.client.network2.datetime') + def test_retry_after_missing(self, dt_mock): + dt_mock.datetime.now.return_value = datetime.datetime(2015, 3, 27) + dt_mock.timedelta = datetime.timedelta + + self.assertEqual( + datetime.datetime(2015, 3, 27, 0, 0, 10), + self.net.retry_after(response=self.response, default=10)) + + def test_poll(self): + self.response.json.return_value = self.authzr.body.fully_serialize() + self._mock_post_get() + self.assertEqual((self.authzr, self.response), + self.net.poll(self.authzr)) + + def test_request_issuance(self): + self.response.content = CERT.as_der() + self.response.headers['Location'] = self.certr.uri + self.response.links['up'] = {'url': self.certr.cert_chain_uri} + self._mock_post_get() + self.assertEqual( + self.certr, self.net.request_issuance(CSR, (self.authzr,))) + # TODO: check POST args + + def test_request_issuance_missing_up(self): + self._mock_post_get() + self.assertRaises( + errors.NetworkError, self.net.request_issuance, + CSR, (self.authzr,)) + + def test_request_issuance_missing_location(self): + self.response.links['up'] = {'url': self.certr.cert_chain_uri} + self._mock_post_get() + self.assertRaises( + errors.NetworkError, self.net.request_issuance, + CSR, (self.authzr,)) + + @mock.patch('letsencrypt.client.network2.datetime') + @mock.patch('letsencrypt.client.network2.time') + def test_poll_and_request_issuance(self, time_mock, dt_mock): + # clock.dt | pylint: disable=no-member + clock = mock.MagicMock(dt=datetime.datetime(2015, 3, 27)) + + def sleep(seconds): + """increment clock""" + clock.dt += datetime.timedelta(seconds=seconds) + time_mock.sleep.side_effect = sleep + + def now(): + """return current clock value""" + return clock.dt + dt_mock.datetime.now.side_effect = now + dt_mock.timedelta = datetime.timedelta + + def poll(authzr): # pylint: disable=missing-docstring + # record poll start time based on the current clock value + authzr.times.append(clock.dt) + + # suppose it takes 2 seconds for server to produce the + # result, increment clock + clock.dt += datetime.timedelta(seconds=2) + + if not authzr.retries: # no more retries + done = mock.MagicMock(uri=authzr.uri, times=authzr.times) + done.body.status = messages2.STATUS_VALID + return done, [] + + # response (2nd result tuple element) is reduced to only + # Retry-After header contents represented as integer + # seconds; authzr.retries is a list of Retry-After + # headers, head(retries) is peeled of as a current + # Retry-After header, and tail(retries) is persisted for + # later poll() calls + return (mock.MagicMock(retries=authzr.retries[1:], + uri=authzr.uri + '.', times=authzr.times), + authzr.retries[0]) + self.net.poll = mock.MagicMock(side_effect=poll) + + mintime = 7 + + def retry_after(response, default): # pylint: disable=missing-docstring + # check that poll_and_request_issuance correctly passes mintime + self.assertEqual(default, mintime) + return clock.dt + datetime.timedelta(seconds=response) + self.net.retry_after = mock.MagicMock(side_effect=retry_after) + + def request_issuance(csr, authzrs): # pylint: disable=missing-docstring + return csr, authzrs + self.net.request_issuance = mock.MagicMock(side_effect=request_issuance) + + csr = mock.MagicMock() + authzrs = ( + mock.MagicMock(uri='a', times=[], retries=(8, 20, 30)), + mock.MagicMock(uri='b', times=[], retries=(5,)), + ) + + cert, updated_authzrs = self.net.poll_and_request_issuance( + csr, authzrs, mintime=mintime) + self.assertTrue(cert[0] is csr) + self.assertTrue(cert[1] is updated_authzrs) + self.assertEqual(updated_authzrs[0].uri, 'a...') + self.assertEqual(updated_authzrs[1].uri, 'b.') + self.assertEqual(updated_authzrs[0].times, [ + datetime.datetime(2015, 3, 27), + # a is scheduled for 10, but b is polling [9..11), so it + # will be picked up as soon as b is finished, without + # additional sleeping + datetime.datetime(2015, 3, 27, 0, 0, 11), + datetime.datetime(2015, 3, 27, 0, 0, 33), + datetime.datetime(2015, 3, 27, 0, 1, 5), + ]) + self.assertEqual(updated_authzrs[1].times, [ + datetime.datetime(2015, 3, 27, 0, 0, 2), + datetime.datetime(2015, 3, 27, 0, 0, 9), + ]) + self.assertEqual(clock.dt, datetime.datetime(2015, 3, 27, 0, 1, 7)) + + def test_check_cert(self): + self.response.headers['Location'] = self.certr.uri + self.response.content = CERT2.as_der() + self._mock_post_get() + self.assertEqual( + self.certr.update(body=CERT2), self.net.check_cert(self.certr)) + + # TODO: split here and separate test + self.response.headers['Location'] = 'foo' + self.assertRaises( + errors.UnexpectedUpdate, self.net.check_cert, self.certr) + + def test_check_cert_missing_location(self): + self.response.content = CERT2.as_der() + self._mock_post_get() + self.assertRaises(errors.NetworkError, self.net.check_cert, self.certr) + + def test_refresh(self): + self.net.check_cert = mock.MagicMock() + self.assertEqual( + self.net.check_cert(self.certr), self.net.refresh(self.certr)) + + def test_fetch_chain(self): + # pylint: disable=protected-access + self.net._get_cert = mock.MagicMock() + self.assertEqual(self.net._get_cert(self.certr.cert_chain_uri), + self.net.fetch_chain(self.certr)) + + def test_revoke(self): + self._mock_post_get() + self.net.revoke(self.certr, when=messages2.Revocation.NOW) + # pylint: disable=protected-access + self.net._post.assert_called_once_with(self.certr.uri, mock.ANY) + + def test_revoke_bad_status_raises_error(self): + self.response.status_code = httplib.METHOD_NOT_ALLOWED + self._mock_post_get() + self.assertRaises(errors.NetworkError, self.net.revoke, self.certr) + + +if __name__ == '__main__': + unittest.main()