Test, lint, and docs for network2

This commit is contained in:
Jakub Warmuz 2015-03-27 10:33:07 +00:00
parent 1349b5241c
commit fadad74d48
No known key found for this signature in database
GPG key ID: 2A7BAD3A489B52EA
4 changed files with 595 additions and 78 deletions

View file

@ -117,10 +117,10 @@ class RegistrationResource(Resource):
:ivar body: `Registration`
:ivar str uri: URI of the resource.
:ivar new_authz_uri: URI found in the 'next' Link header
:ivar new_authzr_uri: URI found in the 'next' Link header
"""
__slots__ = ('body', 'uri', 'new_authz_uri', 'terms_of_service')
__slots__ = ('body', 'uri', 'new_authzr_uri', 'terms_of_service')
class Registration(ResourceBody):
@ -138,10 +138,10 @@ class ChallengeResource(Resource, jose.JSONObjectWithFields):
"""Challenge resource.
:ivar body: `.challenges.ChallengeBody`
:ivar authz_uri: URI found in the 'up' Link header.
:ivar authzr_uri: URI found in the 'up' Link header.
"""
__slots__ = ('body', 'authz_uri')
__slots__ = ('body', 'authzr_uri')
@property
def uri(self): # pylint: disable=missing-docstring,no-self-argument
@ -217,10 +217,7 @@ class Authorization(ResourceBody):
@challenges.decoder
def challenges(value): # pylint: disable=missing-docstring,no-self-argument
return tuple(
ChallengeResource(
body=ChallengeBody.from_json(chall), authz_uri=None)
for chall in value)
return tuple(ChallengeBody.from_json(chall) for chall in value)
@property
def resolved_combinations(self):
@ -232,7 +229,7 @@ class Authorization(ResourceBody):
class CertificateRequest(jose.JSONObjectWithFields):
"""ACME new-cert request.
:ivar csr: `M2Crypto.X509.Request`
:ivar csr: `M2Crypto.X509.Request` wrapped in `.ComparableX509`
"""
csr = jose.Field('csr', decoder=jose.decode_csr, encoder=jose.encode_csr)
@ -242,7 +239,7 @@ class CertificateRequest(jose.JSONObjectWithFields):
class CertificateResource(Resource):
"""Authorization resource.
:ivar body: `M2Crypto.X509.X509`
:ivar body: `M2Crypto.X509.X509` wrapped in `.ComparableX509`
:ivar cert_chain_uri: URI found in the 'up' Link header
:ivar authzrs: `list` of `AuthorizationResource`.

View file

@ -72,7 +72,7 @@ class ChallengeResourceTest(unittest.TestCase):
def test_uri(self):
from letsencrypt.acme.messages2 import ChallengeResource
self.assertEqual('http://challb', ChallengeResource(body=mock.MagicMock(
uri='http://challb'), authz_uri='http://authz').uri)
uri='http://challb'), authzr_uri='http://authz').uri)
class ChallengeBodyTest(unittest.TestCase):

View file

@ -6,12 +6,10 @@ import itertools
import logging
import time
import M2Crypto
import requests
import werkzeug
import M2Crypto
from letsencrypt.acme import challenges
from letsencrypt.acme import jose
from letsencrypt.acme import messages2
@ -40,9 +38,13 @@ class Network(object):
self.key = key
self.alg = alg
def _wrap_in_jws(self, data):
"""Wrap `JSONDeSerializable` object in JWS."""
dumps = data.json_dumps()
def _wrap_in_jws(self, obj):
"""Wrap `JSONDeSerializable` object in JWS.
:rtype: `.JWS`
"""
dumps = obj.json_dumps()
logging.debug('Serialized JSON: %s', dumps)
return jose.JWS.sign(
payload=dumps, key=self.key, alg=self.alg).json_dumps()
@ -52,11 +54,12 @@ class Network(object):
"""Check response content and its type.
.. note::
Checking is not strict: skips wrong server response Content-Type
if response is an expected JSON object (c.f. Boulder #56).
Checking is not strict: wrong server response ``Content-Type``
HTTP header is ignored if response is an expected JSON object
(c.f. Boulder #56).
"""
response_ct = response.headers['content-type']
response_ct = response.headers.get('Content-Type')
try:
# TODO: response.json() is called twice, once here, and
@ -81,15 +84,12 @@ class Network(object):
# response is not JSON object
raise errors.NetworkError(response)
else:
if jobj is not None and (
response_ct != cls.JSON_CONTENT_TYPE or
response_ct != cls.JSON_ERROR_CONTENT_TYPE):
if jobj is not None and response_ct != cls.JSON_CONTENT_TYPE:
logging.debug(
'Ignoring wrong Content-Type (%r) for JSON decodable '
'response', response_ct)
if (content_type is not None and response_ct != content_type
and content_type != cls.JSON_CONTENT_TYPE):
if content_type == cls.JSON_CONTENT_TYPE and jobj is None:
raise errors.NetworkError(
'Unexpected response Content-Type: {0}'.format(response_ct))
@ -106,13 +106,13 @@ class Network(object):
response = requests.get(uri, **kwargs)
except requests.exceptions.RequestException as error:
raise errors.NetworkError(error)
self._check_response(response, content_type)
self._check_response(response, content_type=content_type)
return response
def _post(self, uri, data, content_type=JSON_CONTENT_TYPE, **kwargs):
"""Send POST data.
:param str content_type: Expected Content-Type, fails if not set.
:param str content_type: Expected ``Content-Type``, fails if not set.
:raises letsencrypt.acme.messages2.NetworkError:
@ -127,31 +127,35 @@ class Network(object):
raise errors.NetworkError(error)
logging.debug('Received response %s: %s', response, response.text)
self._check_response(response, content_type)
self._check_response(response, content_type=content_type)
return response
@classmethod
def _regr_from_response(cls, response, uri=None, new_authz_uri=None):
def _regr_from_response(cls, response, uri=None, new_authzr_uri=None,
terms_of_service=None):
terms_of_service = (
response.links['next']['url']
if 'terms-of-service' in response.links else None)
response.links['terms-of-service']['url']
if 'terms-of-service' in response.links else terms_of_service)
if new_authz_uri is None:
if new_authzr_uri is None:
try:
new_authz_uri = response.links['next']['url']
new_authzr_uri = response.links['next']['url']
except KeyError:
raise errors.NetworkError('"next" link missing')
return messages2.RegistrationResource(
body=messages2.Registration.from_json(response.json()),
uri=response.headers.get('location', uri),
new_authz_uri=new_authz_uri,
uri=response.headers.get('Location', uri),
new_authzr_uri=new_authzr_uri,
terms_of_service=terms_of_service)
def register(self, contact=messages2.Registration._fields[
'contact'].default):
"""Register.
:param contact: Contact list, as accpeted by `.RegistrationResource`
:type contact: `tuple`
:returns: Registration Resource.
:rtype: `.RegistrationResource`
@ -188,11 +192,11 @@ class Network(object):
# (c.f. acme-spec #94)
updated_regr = self._regr_from_response(
response, uri=regr.uri, new_authz_uri=regr.new_authz_uri)
response, uri=regr.uri, new_authzr_uri=regr.new_authzr_uri,
terms_of_service=regr.terms_of_service)
if updated_regr != regr:
pass
# TODO: Boulder reregisters with new recoveryToken and new URI
#raise errors.UnexpectedUpdate(regr)
raise errors.UnexpectedUpdate(regr)
return updated_regr
def _authzr_from_response(self, response, identifier,
@ -205,7 +209,7 @@ class Network(object):
authzr = messages2.AuthorizationResource(
body=messages2.Authorization.from_json(response.json()),
uri=response.headers.get('location', uri),
uri=response.headers.get('Location', uri),
new_cert_uri=new_cert_uri)
if (authzr.body.key != self.key.public()
or authzr.body.identifier != identifier):
@ -223,33 +227,44 @@ class Network(object):
"""
new_authz = messages2.Authorization(identifier=identifier)
response = self._post(regr.new_authz_uri, self._wrap_in_jws(new_authz))
response = self._post(regr.new_authzr_uri, self._wrap_in_jws(new_authz))
assert response.status_code == httplib.CREATED # TODO: handle errors
return self._authzr_from_response(response, identifier)
def answer_challenge(self, challr, response):
def request_domain_challenges(self, domain, regr):
"""Request challenges for domain names."""
return self.request_challenges(messages2.Identifier(
typ=messages2.IDENTIFIER_FQDN, value=domain), regr)
def answer_challenge(self, challb, response):
"""Answer challenge.
:param challr: Corresponding challenge resource.
:type challr: `.ChallengeResource`
:param challb: Challenge Resource body.
:type challb: `.ChallengeBody`
:param response: Challenge response
:param response: Corresponding Challenge response
:type response: `.challenges.ChallengeResponse`
:returns: Updated challenge resource.
:returns: Challenge resource with updated body.
:rtype: `.ChallengeResource`
:raises errors.UnexpectedUpdate:
"""
response = self._post(challr.uri, self._wrap_in_jws(response))
if response.headers['location'] != challr.uri:
raise errors.UnexpectedUpdate(response.headers['location'])
updated_challr = challr.update(
body=challenges.Challenge.from_json(response.json()))
return updated_challr
response = self._post(challb.uri, self._wrap_in_jws(response))
try:
authzr_uri = response.links['up']['url']
except KeyError:
raise errors.NetworkError('"up" Link header missing')
challr = messages2.ChallengeResource(
authzr_uri=authzr_uri,
body=messages2.ChallengeBody.from_json(response.json()))
# TODO: check that challr.uri == response.headers['Location']?
if challr.uri != challb.uri:
raise errors.UnexpectedUpdate(challr.uri)
return challr
def answer_challenges(self, challrs, responses):
def answer_challenges(self, challbs, responses):
"""Answer multiple challenges.
.. note:: This is a convenience function to make integration
@ -257,18 +272,35 @@ class Network(object):
once restification is over.
"""
return [self.answer_challenge(challr, response)
for challr, response in itertools.izip(challrs, responses)]
return [self.answer_challenge(challb, response)
for challb, response in itertools.izip(challbs, responses)]
@classmethod
def _retry_after(cls, response, mintime):
retry_after = response.headers.get('Retry-After', str(mintime))
def retry_after(cls, response, default):
"""Compute next `poll` time based on response ``Retry-After`` header.
:param response: Response from `poll`.
:type response: `requests.Response`
:param int default: Default value (in seconds), used when
``Retry-After`` header is not present or invalid.
:returns: Time point when next `poll` should be performed.
:rtype: `datetime.datetime`
"""
retry_after = response.headers.get('Retry-After', str(default))
try:
seconds = int(retry_after)
except ValueError:
return werkzeug.parse_date(retry_after) # pylint: disable=no-member
else:
return datetime.datetime.now() + datetime.timedelta(seconds=seconds)
# pylint: disable=no-member
decoded = werkzeug.parse_date(retry_after) # RFC1123
if decoded is None:
seconds = default
else:
return decoded
return datetime.datetime.now() + datetime.timedelta(seconds=seconds)
def poll(self, authzr):
"""Poll Authorization Resource for status.
@ -284,7 +316,7 @@ class Network(object):
response = self._get(authzr.uri)
updated_authzr = self._authzr_from_response(
response, authzr.body.identifier, authzr.uri, authzr.new_cert_uri)
# TODO check UnexpectedUpdate
# TODO: check and raise UnexpectedUpdate
return updated_authzr, response
@ -292,11 +324,16 @@ class Network(object):
"""Request issuance.
:param csr: CSR
:type csr: `M2Crypto.X509.Request`
:type csr: `M2Crypto.X509.Request` wrapped in `.ComparableX509`
:param authzrs: `list` of `.AuthorizationResource`
:returns: Issued certificate
:rtype: `.messages2.CertificateResource`
"""
assert authzrs, "Authorizations list is empty"
# TODO: assert len(authzrs) == number of SANs
req = messages2.CertificateRequest(
csr=csr, authorizations=tuple(authzr.uri for authzr in authzrs))
@ -308,18 +345,46 @@ class Network(object):
content_type=content_type,
headers={'Accept': content_type})
try:
cert_chain_uri = response.links['up']['url']
except KeyError:
raise errors.NetworkError('"up" Link missing')
try:
uri = response.headers['Location']
except KeyError:
raise errors.NetworkError('"Location" Header missing')
return messages2.CertificateResource(
authzrs=authzrs,
body=M2Crypto.X509.load_cert_der_string(response.text),
cert_chain_uri=response.links['up']['url'])
uri=uri, authzrs=authzrs, cert_chain_uri=cert_chain_uri,
body=jose.ComparableX509(
M2Crypto.X509.load_cert_der_string(response.content)))
def poll_and_request_issuance(self, csr, authzrs, mintime=5):
"""Poll and request issuance.
:param int mintime: Minimum time before next attempt.
This function polls all provided Authorization Resource URIs
until all challenges are valid, respecting ``Retry-After`` HTTP
headers, and then calls `request_issuance`.
.. todo:: add `max_attempts` or `timeout`
:param csr: CSR.
:type csr: `M2Crypto.X509.Request` wrapped in `.ComparableX509`
:param authzrs: `list` of `.AuthorizationResource`
:param int mintime: Minimum time before next attempt, used if
``Retry-After`` is not present in the response.
:returns: ``(cert, updated_authzrs)`` `tuple` where ``cert`` is
the issued certificate (`.messages2.CertificateResource.),
and ``updated_authzrs`` is a `tuple` consisting of updated
Authorization Resources (`.AuthorizationResource`) as
present in the responses from server, and in the same order
as the input ``authzrs``.
:rtype: `tuple`
"""
# priority queue with datetime (based od Retry-After) as key,
# and original Authorization Resource as value
@ -337,25 +402,25 @@ class Network(object):
logging.debug('Sleeping for %d seconds', seconds)
time.sleep(seconds)
updated_authzr, response = self.poll(authzr)
# Note that we poll with the latest updated Authorization
# URI, which might have a different URI than initial one
updated_authzr, response = self.poll(updated[authzr])
updated[authzr] = updated_authzr
# URI must not change throughout, as we are polling
# original Authorization Resource URI only
assert updated_authzr.uri == authzr
if updated_authzr.body.status != messages2.STATUS_VALID:
# push back to the priority queue, with updated retry_after
heapq.heappush(waiting, (self._retry_after(
response, mintime=mintime), authzr))
heapq.heappush(waiting, (self.retry_after(
response, default=mintime), authzr))
return self.request_issuance(csr, authzrs), tuple(
updated[authzr] for authzr in authzrs)
updated_authzrs = tuple(updated[authzr] for authzr in authzrs)
return self.request_issuance(csr, updated_authzrs), updated_authzrs
def _get_cert(self, uri):
content_type = self.DER_CONTENT_TYPE # TODO: make it a param
response = self._get(uri, headers={'Accept': content_type},
content_type=content_type)
return response, M2Crypto.X509.load_cert_der_string(response.text)
return response, jose.ComparableX509(
M2Crypto.X509.load_cert_der_string(response.content))
def check_cert(self, certr):
"""Check for new cert.
@ -370,7 +435,9 @@ class Network(object):
# TODO: acme-spec 5.1 table action should be renamed to
# "refresh cert", and this method integrated with self.refresh
response, cert = self._get_cert(certr.uri)
if not response.headers['location'] != certr.uri:
if 'Location' not in response.headers:
raise errors.NetworkError('Location header missing')
if response.headers['Location'] != certr.uri:
raise errors.UnexpectedUpdate(response.text)
return certr.update(body=cert)
@ -393,7 +460,7 @@ class Network(object):
:type certr: `.CertificateResource`
:returns: Certificate chain
:rtype: `M2Crypto.X509.X509`
:rtype: `M2Crypto.X509.X509` wrapped in `.ComparableX509`
"""
return self._get_cert(certr.cert_chain_uri)
@ -401,8 +468,8 @@ class Network(object):
def revoke(self, certr, when=messages2.Revocation.NOW):
"""Revoke certificate.
:param when: When should the revocation take place.
:type when: `.Revocation.When`
:param when: When should the revocation take place? Takes
the same values as `.messages2.Revocation.revoke`.
"""
rev = messages2.Revocation(revoke=when, authorizations=tuple(

View file

@ -0,0 +1,453 @@
"""Tests for letsencrypt.client.network2."""
import datetime
import httplib
import os
import pkg_resources
import unittest
import M2Crypto
import mock
import requests
from letsencrypt.client import errors
from letsencrypt.acme import challenges
from letsencrypt.acme import jose
from letsencrypt.acme import messages2
CERT = jose.ComparableX509(M2Crypto.X509.load_cert_string(
pkg_resources.resource_string(
__name__, os.path.join('testdata/cert.pem'))))
CERT2 = jose.ComparableX509(M2Crypto.X509.load_cert_string(
pkg_resources.resource_string(
__name__, os.path.join('testdata/cert-san.pem'))))
CSR = jose.ComparableX509(M2Crypto.X509.load_request_string(
pkg_resources.resource_string(
__name__, os.path.join('testdata/csr.pem'))))
KEY = jose.JWKRSA.load(pkg_resources.resource_string(
__name__, os.path.join('testdata/rsa512_key.pem')))
KEY2 = jose.JWKRSA.load(pkg_resources.resource_string(
__name__, os.path.join('testdata/rsa256_key.pem')))
class NetworkTest(unittest.TestCase):
"""Tests for letsencrypt.client.network2.Network."""
# pylint: disable=too-many-instance-attributes,too-many-public-methods
def setUp(self):
from letsencrypt.client.network2 import Network
self.net = Network(
new_reg_uri='https://www.letsencrypt-demo.org/acme/new-reg',
key=KEY, alg=jose.RS256)
self.response = mock.MagicMock(ok=True, status_code=httplib.OK)
self.response.headers = {}
self.response.links = {}
self.identifier = messages2.Identifier(
typ=messages2.IDENTIFIER_FQDN, value='example.com')
# Registration
self.contact = ('mailto:cert-admin@example.com', 'tel:+12025551212')
reg = messages2.Registration(
contact=self.contact, key=KEY.public(), recovery_token='t')
self.regr = messages2.RegistrationResource(
body=reg, uri='https://www.letsencrypt-demo.org/acme/reg/1',
new_authzr_uri='https://www.letsencrypt-demo.org/acme/new-reg',
terms_of_service='https://www.letsencrypt-demo.org/tos')
# Authorization
authzr_uri = 'https://www.letsencrypt-demo.org/acme/authz/1'
challb = messages2.ChallengeBody(
uri=(authzr_uri + '/1'), status=messages2.STATUS_VALID,
chall=challenges.DNS(token='foo'))
self.challr = messages2.ChallengeResource(
body=challb, authzr_uri=authzr_uri)
self.authz = messages2.Authorization(
identifier=messages2.Identifier(
typ=messages2.IDENTIFIER_FQDN, value='example.com'),
challenges=(challb,), combinations=None, key=KEY.public())
self.authzr = messages2.AuthorizationResource(
body=self.authz, uri=authzr_uri,
new_cert_uri='https://www.letsencrypt-demo.org/acme/new-cert')
# Request issuance
self.certr = messages2.CertificateResource(
body=CERT, authzrs=(self.authzr,),
uri='https://www.letsencrypt-demo.org/acme/cert/1',
cert_chain_uri='https://www.letsencrypt-demo.org/ca')
def _mock_post_get(self):
# pylint: disable=protected-access
self.net._post = mock.MagicMock(return_value=self.response)
self.net._get = mock.MagicMock(return_value=self.response)
def test_wrap_in_jws(self):
class MockJSONDeSerializable(jose.JSONDeSerializable):
# pylint: disable=missing-docstring
def __init__(self, value):
self.value = value
def to_json(self):
return self.value
@classmethod
def from_json(cls, value):
return cls(value)
# pylint: disable=protected-access
jws = self.net._wrap_in_jws(MockJSONDeSerializable('foo'))
self.assertEqual(jose.JWS.json_loads(jws).payload, '"foo"')
def test_check_response_not_ok_jobj_no_error(self):
self.response.ok = False
self.response.json.return_value = {}
# pylint: disable=protected-access
self.assertRaises(
errors.NetworkError, self.net._check_response, self.response)
def test_check_response_not_ok_jobj_error(self):
self.response.ok = False
self.response.json.return_value = messages2.Error(detail='foo')
# pylint: disable=protected-access
self.assertRaises(
messages2.Error, self.net._check_response, self.response)
def test_check_response_not_ok_no_jobj(self):
self.response.ok = False
self.response.json.side_effect = ValueError
# pylint: disable=protected-access
self.assertRaises(
errors.NetworkError, self.net._check_response, self.response)
def test_check_response_ok_no_jobj_ct_required(self):
self.response.json.side_effect = ValueError
for response_ct in [self.net.JSON_CONTENT_TYPE, 'foo']:
self.response.headers['Content-Type'] = response_ct
# pylint: disable=protected-access
self.assertRaises(
errors.NetworkError, self.net._check_response, self.response,
content_type=self.net.JSON_CONTENT_TYPE)
def test_check_response_ok_no_jobj_no_ct(self):
self.response.json.side_effect = ValueError
for response_ct in [self.net.JSON_CONTENT_TYPE, 'foo']:
self.response.headers['Content-Type'] = response_ct
# pylint: disable=protected-access
self.net._check_response(self.response)
def test_check_response_jobj(self):
self.response.json.return_value = {}
for response_ct in [self.net.JSON_CONTENT_TYPE, 'foo']:
self.response.headers['Content-Type'] = response_ct
# pylint: disable=protected-access
self.net._check_response(self.response)
@mock.patch('letsencrypt.client.network2.requests')
def test_get_requests_error_passthrough(self, requests_mock):
requests_mock.exceptions = requests.exceptions
requests_mock.get.side_effect = requests.exceptions.RequestException
# pylint: disable=protected-access
self.assertRaises(errors.NetworkError, self.net._get, 'uri')
@mock.patch('letsencrypt.client.network2.requests')
def test_get(self, requests_mock):
# pylint: disable=protected-access
self.net._check_response = mock.MagicMock()
self.net._get('uri', content_type='ct')
self.net._check_response.assert_called_once_with(
requests_mock.get('uri'), content_type='ct')
@mock.patch('letsencrypt.client.network2.requests')
def test_post_requests_error_passthrough(self, requests_mock):
requests_mock.exceptions = requests.exceptions
requests_mock.post.side_effect = requests.exceptions.RequestException
# pylint: disable=protected-access
self.assertRaises(errors.NetworkError, self.net._post, 'uri', 'data')
@mock.patch('letsencrypt.client.network2.requests')
def test_post(self, requests_mock):
# pylint: disable=protected-access
self.net._check_response = mock.MagicMock()
self.net._post('uri', 'data', content_type='ct')
self.net._check_response.assert_called_once_with(
requests_mock.post('uri', 'data'), content_type='ct')
def test_register(self):
self.response.status_code = httplib.CREATED
self.response.json.return_value = self.regr.body.fully_serialize()
self.response.headers['Location'] = self.regr.uri
self.response.links.update({
'next': {'url': self.regr.new_authzr_uri},
'terms-of-service': {'url': self.regr.terms_of_service},
})
self._mock_post_get()
self.assertEqual(self.regr, self.net.register(self.contact))
# TODO: test POST call arguments
# TODO: split here and separate test
reg_wrong_key = self.regr.body.update(key=KEY2.public())
self.response.json.return_value = reg_wrong_key.fully_serialize()
self.assertRaises(
errors.UnexpectedUpdate, self.net.register, self.contact)
def test_register_missing_next(self):
self.response.status_code = httplib.CREATED
self._mock_post_get()
self.assertRaises(
errors.NetworkError, self.net.register, self.regr.body)
def test_update_registration(self):
self.response.headers['Location'] = self.regr.uri
self.response.json.return_value = self.regr.body.fully_serialize()
self._mock_post_get()
self.assertEqual(self.regr, self.net.update_registration(self.regr))
# TODO: split here and separate test
self.response.json.return_value = self.regr.body.update(
contact=()).fully_serialize()
self.assertRaises(
errors.UnexpectedUpdate, self.net.update_registration, self.regr)
def test_request_challenges(self):
self.response.status_code = httplib.CREATED
self.response.headers['Location'] = self.authzr.uri
self.response.json.return_value = self.authz.fully_serialize()
self.response.links = {
'next': {'url': self.authzr.new_cert_uri},
}
self._mock_post_get()
self.net.request_challenges(self.identifier, self.regr)
# TODO: test POST call arguments
# TODO: split here and separate test
authz_wrong_key = self.authz.update(key=KEY2.public())
self.response.json.return_value = authz_wrong_key.fully_serialize()
self.assertRaises(
errors.UnexpectedUpdate, self.net.request_challenges,
self.identifier, self.regr)
def test_request_challenges_missing_next(self):
self.response.status_code = httplib.CREATED
self._mock_post_get()
self.assertRaises(
errors.NetworkError, self.net.request_challenges,
self.identifier, self.regr)
def test_request_domain_challenges(self):
self.net.request_challenges = mock.MagicMock()
self.assertEqual(
self.net.request_challenges(self.identifier),
self.net.request_domain_challenges('example.com', self.regr))
def test_answer_challenge(self):
self.response.links['up'] = {'url': self.challr.authzr_uri}
self.response.json.return_value = self.challr.body.fully_serialize()
chall_response = challenges.DNSResponse()
self._mock_post_get()
self.net.answer_challenge(self.challr.body, chall_response)
# TODO: split here and separate test
self.assertRaises(errors.UnexpectedUpdate, self.net.answer_challenge,
self.challr.body.update(uri='foo'), chall_response)
def test_answer_challenge_missing_next(self):
self._mock_post_get()
self.assertRaises(errors.NetworkError, self.net.answer_challenge,
self.challr.body, challenges.DNSResponse())
def test_answer_challenges(self):
self.net.answer_challenge = mock.MagicMock()
self.assertEqual(
[self.net.answer_challenge(
self.challr.body, challenges.DNSResponse())],
self.net.answer_challenges(
[self.challr.body], [challenges.DNSResponse()]))
def test_retry_after_date(self):
self.response.headers['Retry-After'] = 'Fri, 31 Dec 1999 23:59:59 GMT'
self.assertEqual(
datetime.datetime(1999, 12, 31, 23, 59, 59),
self.net.retry_after(response=self.response, default=10))
@mock.patch('letsencrypt.client.network2.datetime')
def test_retry_after_invalid(self, dt_mock):
dt_mock.datetime.now.return_value = datetime.datetime(2015, 3, 27)
dt_mock.timedelta = datetime.timedelta
self.response.headers['Retry-After'] = 'foooo'
self.assertEqual(
datetime.datetime(2015, 3, 27, 0, 0, 10),
self.net.retry_after(response=self.response, default=10))
@mock.patch('letsencrypt.client.network2.datetime')
def test_retry_after_seconds(self, dt_mock):
dt_mock.datetime.now.return_value = datetime.datetime(2015, 3, 27)
dt_mock.timedelta = datetime.timedelta
self.response.headers['Retry-After'] = '50'
self.assertEqual(
datetime.datetime(2015, 3, 27, 0, 0, 50),
self.net.retry_after(response=self.response, default=10))
@mock.patch('letsencrypt.client.network2.datetime')
def test_retry_after_missing(self, dt_mock):
dt_mock.datetime.now.return_value = datetime.datetime(2015, 3, 27)
dt_mock.timedelta = datetime.timedelta
self.assertEqual(
datetime.datetime(2015, 3, 27, 0, 0, 10),
self.net.retry_after(response=self.response, default=10))
def test_poll(self):
self.response.json.return_value = self.authzr.body.fully_serialize()
self._mock_post_get()
self.assertEqual((self.authzr, self.response),
self.net.poll(self.authzr))
def test_request_issuance(self):
self.response.content = CERT.as_der()
self.response.headers['Location'] = self.certr.uri
self.response.links['up'] = {'url': self.certr.cert_chain_uri}
self._mock_post_get()
self.assertEqual(
self.certr, self.net.request_issuance(CSR, (self.authzr,)))
# TODO: check POST args
def test_request_issuance_missing_up(self):
self._mock_post_get()
self.assertRaises(
errors.NetworkError, self.net.request_issuance,
CSR, (self.authzr,))
def test_request_issuance_missing_location(self):
self.response.links['up'] = {'url': self.certr.cert_chain_uri}
self._mock_post_get()
self.assertRaises(
errors.NetworkError, self.net.request_issuance,
CSR, (self.authzr,))
@mock.patch('letsencrypt.client.network2.datetime')
@mock.patch('letsencrypt.client.network2.time')
def test_poll_and_request_issuance(self, time_mock, dt_mock):
# clock.dt | pylint: disable=no-member
clock = mock.MagicMock(dt=datetime.datetime(2015, 3, 27))
def sleep(seconds):
"""increment clock"""
clock.dt += datetime.timedelta(seconds=seconds)
time_mock.sleep.side_effect = sleep
def now():
"""return current clock value"""
return clock.dt
dt_mock.datetime.now.side_effect = now
dt_mock.timedelta = datetime.timedelta
def poll(authzr): # pylint: disable=missing-docstring
# record poll start time based on the current clock value
authzr.times.append(clock.dt)
# suppose it takes 2 seconds for server to produce the
# result, increment clock
clock.dt += datetime.timedelta(seconds=2)
if not authzr.retries: # no more retries
done = mock.MagicMock(uri=authzr.uri, times=authzr.times)
done.body.status = messages2.STATUS_VALID
return done, []
# response (2nd result tuple element) is reduced to only
# Retry-After header contents represented as integer
# seconds; authzr.retries is a list of Retry-After
# headers, head(retries) is peeled of as a current
# Retry-After header, and tail(retries) is persisted for
# later poll() calls
return (mock.MagicMock(retries=authzr.retries[1:],
uri=authzr.uri + '.', times=authzr.times),
authzr.retries[0])
self.net.poll = mock.MagicMock(side_effect=poll)
mintime = 7
def retry_after(response, default): # pylint: disable=missing-docstring
# check that poll_and_request_issuance correctly passes mintime
self.assertEqual(default, mintime)
return clock.dt + datetime.timedelta(seconds=response)
self.net.retry_after = mock.MagicMock(side_effect=retry_after)
def request_issuance(csr, authzrs): # pylint: disable=missing-docstring
return csr, authzrs
self.net.request_issuance = mock.MagicMock(side_effect=request_issuance)
csr = mock.MagicMock()
authzrs = (
mock.MagicMock(uri='a', times=[], retries=(8, 20, 30)),
mock.MagicMock(uri='b', times=[], retries=(5,)),
)
cert, updated_authzrs = self.net.poll_and_request_issuance(
csr, authzrs, mintime=mintime)
self.assertTrue(cert[0] is csr)
self.assertTrue(cert[1] is updated_authzrs)
self.assertEqual(updated_authzrs[0].uri, 'a...')
self.assertEqual(updated_authzrs[1].uri, 'b.')
self.assertEqual(updated_authzrs[0].times, [
datetime.datetime(2015, 3, 27),
# a is scheduled for 10, but b is polling [9..11), so it
# will be picked up as soon as b is finished, without
# additional sleeping
datetime.datetime(2015, 3, 27, 0, 0, 11),
datetime.datetime(2015, 3, 27, 0, 0, 33),
datetime.datetime(2015, 3, 27, 0, 1, 5),
])
self.assertEqual(updated_authzrs[1].times, [
datetime.datetime(2015, 3, 27, 0, 0, 2),
datetime.datetime(2015, 3, 27, 0, 0, 9),
])
self.assertEqual(clock.dt, datetime.datetime(2015, 3, 27, 0, 1, 7))
def test_check_cert(self):
self.response.headers['Location'] = self.certr.uri
self.response.content = CERT2.as_der()
self._mock_post_get()
self.assertEqual(
self.certr.update(body=CERT2), self.net.check_cert(self.certr))
# TODO: split here and separate test
self.response.headers['Location'] = 'foo'
self.assertRaises(
errors.UnexpectedUpdate, self.net.check_cert, self.certr)
def test_check_cert_missing_location(self):
self.response.content = CERT2.as_der()
self._mock_post_get()
self.assertRaises(errors.NetworkError, self.net.check_cert, self.certr)
def test_refresh(self):
self.net.check_cert = mock.MagicMock()
self.assertEqual(
self.net.check_cert(self.certr), self.net.refresh(self.certr))
def test_fetch_chain(self):
# pylint: disable=protected-access
self.net._get_cert = mock.MagicMock()
self.assertEqual(self.net._get_cert(self.certr.cert_chain_uri),
self.net.fetch_chain(self.certr))
def test_revoke(self):
self._mock_post_get()
self.net.revoke(self.certr, when=messages2.Revocation.NOW)
# pylint: disable=protected-access
self.net._post.assert_called_once_with(self.certr.uri, mock.ANY)
def test_revoke_bad_status_raises_error(self):
self.response.status_code = httplib.METHOD_NOT_ALLOWED
self._mock_post_get()
self.assertRaises(errors.NetworkError, self.net.revoke, self.certr)
if __name__ == '__main__':
unittest.main()