Move letsencrypt.network to acme.client.

This commit is contained in:
Jakub Warmuz 2015-06-18 10:38:20 +00:00
parent a278d53f52
commit b4d63cbbb3
No known key found for this signature in database
GPG key ID: 2A7BAD3A489B52EA
11 changed files with 1115 additions and 1098 deletions

559
acme/client.py Normal file
View file

@ -0,0 +1,559 @@
"""ACME client API."""
import datetime
import heapq
import httplib
import logging
import time
import M2Crypto
import requests
import werkzeug
from acme import errors
from acme import jose
from acme import jws
from acme import messages
# https://urllib3.readthedocs.org/en/latest/security.html#insecureplatformwarning
requests.packages.urllib3.contrib.pyopenssl.inject_into_urllib3()
class Client(object):
"""ACME client.
.. todo::
Clean up raised error types hierarchy, document, and handle (wrap)
instances of `.DeserializationError` raised in `from_json()`.
:ivar str new_reg_uri: Location of new-reg
:ivar key: `.JWK` (private)
:ivar alg: `.JWASignature`
:ivar bool verify_ssl: Verify SSL certificates?
"""
DER_CONTENT_TYPE = 'application/pkix-cert'
JSON_CONTENT_TYPE = 'application/json'
JSON_ERROR_CONTENT_TYPE = 'application/problem+json'
REPLAY_NONCE_HEADER = 'Replay-Nonce'
def __init__(self, new_reg_uri, key, alg=jose.RS256, verify_ssl=True):
self.new_reg_uri = new_reg_uri
self.key = key
self.alg = alg
self.verify_ssl = verify_ssl
self._nonces = set()
def _wrap_in_jws(self, obj, nonce):
"""Wrap `JSONDeSerializable` object in JWS.
.. todo:: Implement ``acmePath``.
:param JSONDeSerializable obj:
:rtype: `.JWS`
"""
dumps = obj.json_dumps()
logging.debug('Serialized JSON: %s', dumps)
return jws.JWS.sign(
payload=dumps, key=self.key, alg=self.alg, nonce=nonce).json_dumps()
@classmethod
def _check_response(cls, response, content_type=None):
"""Check response content and its type.
.. note::
Checking is not strict: wrong server response ``Content-Type``
HTTP header is ignored if response is an expected JSON object
(c.f. Boulder #56).
:param str content_type: Expected Content-Type response header.
If JSON is expected and not present in server response, this
function will raise an error. Otherwise, wrong Content-Type
is ignored, but logged.
:raises .messages.Error: If server response body
carries HTTP Problem (draft-ietf-appsawg-http-problem-00).
:raises .ClientError: In case of other networking errors.
"""
response_ct = response.headers.get('Content-Type')
try:
# TODO: response.json() is called twice, once here, and
# once in _get and _post clients
jobj = response.json()
except ValueError as error:
jobj = None
if not response.ok:
if jobj is not None:
if response_ct != cls.JSON_ERROR_CONTENT_TYPE:
logging.debug(
'Ignoring wrong Content-Type (%r) for JSON Error',
response_ct)
try:
logging.error("Error: %s", jobj)
logging.error("Response from server: %s", response.content)
raise messages.Error.from_json(jobj)
except jose.DeserializationError as error:
# Couldn't deserialize JSON object
raise errors.ClientError((response, error))
else:
# response is not JSON object
raise errors.ClientError(response)
else:
if jobj is not None and response_ct != cls.JSON_CONTENT_TYPE:
logging.debug(
'Ignoring wrong Content-Type (%r) for JSON decodable '
'response', response_ct)
if content_type == cls.JSON_CONTENT_TYPE and jobj is None:
raise errors.ClientError(
'Unexpected response Content-Type: {0}'.format(response_ct))
def _get(self, uri, content_type=JSON_CONTENT_TYPE, **kwargs):
"""Send GET request.
:raises .ClientError:
:returns: HTTP Response
:rtype: `requests.Response`
"""
logging.debug('Sending GET request to %s', uri)
kwargs.setdefault('verify', self.verify_ssl)
try:
response = requests.get(uri, **kwargs)
except requests.exceptions.RequestException as error:
raise errors.ClientError(error)
self._check_response(response, content_type=content_type)
return response
def _add_nonce(self, response):
if self.REPLAY_NONCE_HEADER in response.headers:
nonce = response.headers[self.REPLAY_NONCE_HEADER]
error = jws.Header.validate_nonce(nonce)
if error is None:
logging.debug('Storing nonce: %r', nonce)
self._nonces.add(nonce)
else:
raise errors.ClientError('Invalid nonce ({0}): {1}'.format(
nonce, error))
else:
raise errors.ClientError(
'Server {0} response did not include a replay nonce'.format(
response.request.method))
def _get_nonce(self, uri):
if not self._nonces:
logging.debug('Requesting fresh nonce by sending HEAD to %s', uri)
self._add_nonce(requests.head(uri))
return self._nonces.pop()
def _post(self, uri, obj, content_type=JSON_CONTENT_TYPE, **kwargs):
"""Send POST data.
:param JSONDeSerializable obj: Will be wrapped in JWS.
:param str content_type: Expected ``Content-Type``, fails if not set.
:raises acme.messages.ClientError:
:returns: HTTP Response
:rtype: `requests.Response`
"""
data = self._wrap_in_jws(obj, self._get_nonce(uri))
logging.debug('Sending POST data to %s: %s', uri, data)
kwargs.setdefault('verify', self.verify_ssl)
try:
response = requests.post(uri, data=data, **kwargs)
except requests.exceptions.RequestException as error:
raise errors.ClientError(error)
logging.debug('Received response %s: %r', response, response.text)
self._add_nonce(response)
self._check_response(response, content_type=content_type)
return response
@classmethod
def _regr_from_response(cls, response, uri=None, new_authzr_uri=None,
terms_of_service=None):
terms_of_service = (
response.links['terms-of-service']['url']
if 'terms-of-service' in response.links else terms_of_service)
if new_authzr_uri is None:
try:
new_authzr_uri = response.links['next']['url']
except KeyError:
raise errors.ClientError('"next" link missing')
return messages.RegistrationResource(
body=messages.Registration.from_json(response.json()),
uri=response.headers.get('Location', uri),
new_authzr_uri=new_authzr_uri,
terms_of_service=terms_of_service)
def register(self, contact=messages.Registration._fields[
'contact'].default):
"""Register.
:param contact: Contact list, as accepted by `.Registration`
:type contact: `tuple`
:returns: Registration Resource.
:rtype: `.RegistrationResource`
:raises .UnexpectedUpdate:
"""
new_reg = messages.Registration(contact=contact)
response = self._post(self.new_reg_uri, new_reg)
assert response.status_code == httplib.CREATED # TODO: handle errors
regr = self._regr_from_response(response)
if regr.body.key != self.key.public() or regr.body.contact != contact:
raise errors.UnexpectedUpdate(regr)
return regr
def update_registration(self, regr):
"""Update registration.
:pram regr: Registration Resource.
:type regr: `.RegistrationResource`
:returns: Updated Registration Resource.
:rtype: `.RegistrationResource`
"""
response = self._post(regr.uri, regr.body)
# TODO: Boulder returns httplib.ACCEPTED
#assert response.status_code == httplib.OK
# TODO: Boulder does not set Location or Link on update
# (c.f. acme-spec #94)
updated_regr = self._regr_from_response(
response, uri=regr.uri, new_authzr_uri=regr.new_authzr_uri,
terms_of_service=regr.terms_of_service)
if updated_regr != regr:
raise errors.UnexpectedUpdate(regr)
return updated_regr
def agree_to_tos(self, regr):
"""Agree to the terms-of-service.
Agree to the terms-of-service in a Registration Resource.
:param regr: Registration Resource.
:type regr: `.RegistrationResource`
:returns: Updated Registration Resource.
:rtype: `.RegistrationResource`
"""
return self.update_registration(
regr.update(body=regr.body.update(agreement=regr.terms_of_service)))
def _authzr_from_response(self, response, identifier,
uri=None, new_cert_uri=None):
# pylint: disable=no-self-use
if new_cert_uri is None:
try:
new_cert_uri = response.links['next']['url']
except KeyError:
raise errors.ClientError('"next" link missing')
authzr = messages.AuthorizationResource(
body=messages.Authorization.from_json(response.json()),
uri=response.headers.get('Location', uri),
new_cert_uri=new_cert_uri)
if authzr.body.identifier != identifier:
raise errors.UnexpectedUpdate(authzr)
return authzr
def request_challenges(self, identifier, new_authzr_uri):
"""Request challenges.
:param identifier: Identifier to be challenged.
:type identifier: `.messages.Identifier`
:param str new_authzr_uri: new-authorization URI
:returns: Authorization Resource.
:rtype: `.AuthorizationResource`
"""
new_authz = messages.Authorization(identifier=identifier)
response = self._post(new_authzr_uri, new_authz)
assert response.status_code == httplib.CREATED # TODO: handle errors
return self._authzr_from_response(response, identifier)
def request_domain_challenges(self, domain, new_authz_uri):
"""Request challenges for domain names.
This is simply a convenience function that wraps around
`request_challenges`, but works with domain names instead of
generic identifiers.
:param str domain: Domain name to be challenged.
:param str new_authzr_uri: new-authorization URI
:returns: Authorization Resource.
:rtype: `.AuthorizationResource`
"""
return self.request_challenges(messages.Identifier(
typ=messages.IDENTIFIER_FQDN, value=domain), new_authz_uri)
def answer_challenge(self, challb, response):
"""Answer challenge.
:param challb: Challenge Resource body.
:type challb: `.ChallengeBody`
:param response: Corresponding Challenge response
:type response: `.challenges.ChallengeResponse`
:returns: Challenge Resource with updated body.
:rtype: `.ChallengeResource`
:raises errors.UnexpectedUpdate:
"""
response = self._post(challb.uri, response)
try:
authzr_uri = response.links['up']['url']
except KeyError:
raise errors.ClientError('"up" Link header missing')
challr = messages.ChallengeResource(
authzr_uri=authzr_uri,
body=messages.ChallengeBody.from_json(response.json()))
# TODO: check that challr.uri == response.headers['Location']?
if challr.uri != challb.uri:
raise errors.UnexpectedUpdate(challr.uri)
return challr
@classmethod
def retry_after(cls, response, default):
"""Compute next `poll` time based on response ``Retry-After`` header.
:param response: Response from `poll`.
:type response: `requests.Response`
:param int default: Default value (in seconds), used when
``Retry-After`` header is not present or invalid.
:returns: Time point when next `poll` should be performed.
:rtype: `datetime.datetime`
"""
retry_after = response.headers.get('Retry-After', str(default))
try:
seconds = int(retry_after)
except ValueError:
# pylint: disable=no-member
decoded = werkzeug.parse_date(retry_after) # RFC1123
if decoded is None:
seconds = default
else:
return decoded
return datetime.datetime.now() + datetime.timedelta(seconds=seconds)
def poll(self, authzr):
"""Poll Authorization Resource for status.
:param authzr: Authorization Resource
:type authzr: `.AuthorizationResource`
:returns: Updated Authorization Resource and HTTP response.
:rtype: (`.AuthorizationResource`, `requests.Response`)
"""
response = self._get(authzr.uri)
updated_authzr = self._authzr_from_response(
response, authzr.body.identifier, authzr.uri, authzr.new_cert_uri)
# TODO: check and raise UnexpectedUpdate
return updated_authzr, response
def request_issuance(self, csr, authzrs):
"""Request issuance.
:param csr: CSR
:type csr: `M2Crypto.X509.Request` wrapped in `.ComparableX509`
:param authzrs: `list` of `.AuthorizationResource`
:returns: Issued certificate
:rtype: `.messages.CertificateResource`
"""
assert authzrs, "Authorizations list is empty"
logging.debug("Requesting issuance...")
# TODO: assert len(authzrs) == number of SANs
req = messages.CertificateRequest(
csr=csr, authorizations=tuple(authzr.uri for authzr in authzrs))
content_type = self.DER_CONTENT_TYPE # TODO: add 'cert_type 'argument
response = self._post(
authzrs[0].new_cert_uri, # TODO: acme-spec #90
req,
content_type=content_type,
headers={'Accept': content_type})
cert_chain_uri = response.links.get('up', {}).get('url')
try:
uri = response.headers['Location']
except KeyError:
raise errors.ClientError('"Location" Header missing')
return messages.CertificateResource(
uri=uri, authzrs=authzrs, cert_chain_uri=cert_chain_uri,
body=jose.ComparableX509(
M2Crypto.X509.load_cert_der_string(response.content)))
def poll_and_request_issuance(self, csr, authzrs, mintime=5):
"""Poll and request issuance.
This function polls all provided Authorization Resource URIs
until all challenges are valid, respecting ``Retry-After`` HTTP
headers, and then calls `request_issuance`.
.. todo:: add `max_attempts` or `timeout`
:param csr: CSR.
:type csr: `M2Crypto.X509.Request` wrapped in `.ComparableX509`
:param authzrs: `list` of `.AuthorizationResource`
:param int mintime: Minimum time before next attempt, used if
``Retry-After`` is not present in the response.
:returns: ``(cert, updated_authzrs)`` `tuple` where ``cert`` is
the issued certificate (`.messages.CertificateResource.),
and ``updated_authzrs`` is a `tuple` consisting of updated
Authorization Resources (`.AuthorizationResource`) as
present in the responses from server, and in the same order
as the input ``authzrs``.
:rtype: `tuple`
"""
# priority queue with datetime (based on Retry-After) as key,
# and original Authorization Resource as value
waiting = [(datetime.datetime.now(), authzr) for authzr in authzrs]
# mapping between original Authorization Resource and the most
# recently updated one
updated = dict((authzr, authzr) for authzr in authzrs)
while waiting:
# find the smallest Retry-After, and sleep if necessary
when, authzr = heapq.heappop(waiting)
now = datetime.datetime.now()
if when > now:
seconds = (when - now).seconds
logging.debug('Sleeping for %d seconds', seconds)
time.sleep(seconds)
# Note that we poll with the latest updated Authorization
# URI, which might have a different URI than initial one
updated_authzr, response = self.poll(updated[authzr])
updated[authzr] = updated_authzr
if updated_authzr.body.status != messages.STATUS_VALID:
# push back to the priority queue, with updated retry_after
heapq.heappush(waiting, (self.retry_after(
response, default=mintime), authzr))
updated_authzrs = tuple(updated[authzr] for authzr in authzrs)
return self.request_issuance(csr, updated_authzrs), updated_authzrs
def _get_cert(self, uri):
"""Returns certificate from URI.
:param str uri: URI of certificate
:returns: tuple of the form
(response, :class:`acme.jose.ComparableX509`)
:rtype: tuple
"""
content_type = self.DER_CONTENT_TYPE # TODO: make it a param
response = self._get(uri, headers={'Accept': content_type},
content_type=content_type)
return response, jose.ComparableX509(
M2Crypto.X509.load_cert_der_string(response.content))
def check_cert(self, certr):
"""Check for new cert.
:param certr: Certificate Resource
:type certr: `.CertificateResource`
:returns: Updated Certificate Resource.
:rtype: `.CertificateResource`
"""
# TODO: acme-spec 5.1 table action should be renamed to
# "refresh cert", and this method integrated with self.refresh
response, cert = self._get_cert(certr.uri)
if 'Location' not in response.headers:
raise errors.ClientError('Location header missing')
if response.headers['Location'] != certr.uri:
raise errors.UnexpectedUpdate(response.text)
return certr.update(body=cert)
def refresh(self, certr):
"""Refresh certificate.
:param certr: Certificate Resource
:type certr: `.CertificateResource`
:returns: Updated Certificate Resource.
:rtype: `.CertificateResource`
"""
# TODO: If a client sends a refresh request and the server is
# not willing to refresh the certificate, the server MUST
# respond with status code 403 (Forbidden)
return self.check_cert(certr)
def fetch_chain(self, certr):
"""Fetch chain for certificate.
:param certr: Certificate Resource
:type certr: `.CertificateResource`
:returns: Certificate chain, or `None` if no "up" Link was provided.
:rtype: `M2Crypto.X509.X509` wrapped in `.ComparableX509`
"""
if certr.cert_chain_uri is not None:
return self._get_cert(certr.cert_chain_uri)[1]
else:
return None
def revoke(self, certr, when=messages.Revocation.NOW):
"""Revoke certificate.
:param certr: Certificate Resource
:type certr: `.CertificateResource`
:param when: When should the revocation take place? Takes
the same values as `.messages.Revocation.revoke`.
:raises .ClientError: If revocation is unsuccessful.
"""
rev = messages.Revocation(revoke=when, authorizations=tuple(
authzr.uri for authzr in certr.authzrs))
response = self._post(certr.uri, rev)
if response.status_code != httplib.OK:
raise errors.ClientError(
'Successful revocation must return HTTP OK status')

530
acme/client_test.py Normal file
View file

@ -0,0 +1,530 @@
"""Tests for acme.client."""
import datetime
import httplib
import os
import pkg_resources
import unittest
import M2Crypto
import mock
import requests
from acme import challenges
from acme import errors
from acme import jose
from acme import jws as acme_jws
from acme import messages
CERT = jose.ComparableX509(M2Crypto.X509.load_cert_string(
pkg_resources.resource_string(
'acme.jose', os.path.join('testdata', 'cert.der')),
M2Crypto.X509.FORMAT_DER))
CSR = jose.ComparableX509(M2Crypto.X509.load_request_string(
pkg_resources.resource_string(
'acme.jose', os.path.join('testdata', 'csr.der')),
M2Crypto.X509.FORMAT_DER))
KEY = jose.JWKRSA.load(pkg_resources.resource_string(
'acme.jose', os.path.join('testdata', 'rsa512_key.pem')))
KEY2 = jose.JWKRSA.load(pkg_resources.resource_string(
'acme.jose', os.path.join('testdata', 'rsa256_key.pem')))
class ClientTest(unittest.TestCase):
"""Tests for acme.client.Client."""
# pylint: disable=too-many-instance-attributes,too-many-public-methods
def setUp(self):
self.verify_ssl = mock.MagicMock()
self.wrap_in_jws = mock.MagicMock(return_value=mock.sentinel.wrapped)
from acme.client import Client
self.net = Client(
new_reg_uri='https://www.letsencrypt-demo.org/acme/new-reg',
key=KEY, alg=jose.RS256, verify_ssl=self.verify_ssl)
self.nonce = jose.b64encode('Nonce')
self.net._nonces.add(self.nonce) # pylint: disable=protected-access
self.response = mock.MagicMock(ok=True, status_code=httplib.OK)
self.response.headers = {}
self.response.links = {}
self.post = mock.MagicMock(return_value=self.response)
self.get = mock.MagicMock(return_value=self.response)
self.identifier = messages.Identifier(
typ=messages.IDENTIFIER_FQDN, value='example.com')
# Registration
self.contact = ('mailto:cert-admin@example.com', 'tel:+12025551212')
reg = messages.Registration(
contact=self.contact, key=KEY.public(), recovery_token='t')
self.regr = messages.RegistrationResource(
body=reg, uri='https://www.letsencrypt-demo.org/acme/reg/1',
new_authzr_uri='https://www.letsencrypt-demo.org/acme/new-reg',
terms_of_service='https://www.letsencrypt-demo.org/tos')
# Authorization
authzr_uri = 'https://www.letsencrypt-demo.org/acme/authz/1'
challb = messages.ChallengeBody(
uri=(authzr_uri + '/1'), status=messages.STATUS_VALID,
chall=challenges.DNS(token='foo'))
self.challr = messages.ChallengeResource(
body=challb, authzr_uri=authzr_uri)
self.authz = messages.Authorization(
identifier=messages.Identifier(
typ=messages.IDENTIFIER_FQDN, value='example.com'),
challenges=(challb,), combinations=None)
self.authzr = messages.AuthorizationResource(
body=self.authz, uri=authzr_uri,
new_cert_uri='https://www.letsencrypt-demo.org/acme/new-cert')
# Request issuance
self.certr = messages.CertificateResource(
body=CERT, authzrs=(self.authzr,),
uri='https://www.letsencrypt-demo.org/acme/cert/1',
cert_chain_uri='https://www.letsencrypt-demo.org/ca')
def _mock_post_get(self):
# pylint: disable=protected-access
self.net._post = self.post
self.net._get = self.get
def test_init(self):
self.assertTrue(self.net.verify_ssl is self.verify_ssl)
def test_wrap_in_jws(self):
class MockJSONDeSerializable(jose.JSONDeSerializable):
# pylint: disable=missing-docstring
def __init__(self, value):
self.value = value
def to_partial_json(self):
return self.value
@classmethod
def from_json(cls, value):
pass # pragma: no cover
# pylint: disable=protected-access
jws_dump = self.net._wrap_in_jws(
MockJSONDeSerializable('foo'), nonce='Tg')
jws = acme_jws.JWS.json_loads(jws_dump)
self.assertEqual(jws.payload, '"foo"')
self.assertEqual(jws.signature.combined.nonce, 'Tg')
# TODO: check that nonce is in protected header
def test_check_response_not_ok_jobj_no_error(self):
self.response.ok = False
self.response.json.return_value = {}
# pylint: disable=protected-access
self.assertRaises(
errors.ClientError, self.net._check_response, self.response)
def test_check_response_not_ok_jobj_error(self):
self.response.ok = False
self.response.json.return_value = messages.Error(
detail='foo', typ='serverInternal', title='some title').to_json()
# pylint: disable=protected-access
self.assertRaises(
messages.Error, self.net._check_response, self.response)
def test_check_response_not_ok_no_jobj(self):
self.response.ok = False
self.response.json.side_effect = ValueError
# pylint: disable=protected-access
self.assertRaises(
errors.ClientError, self.net._check_response, self.response)
def test_check_response_ok_no_jobj_ct_required(self):
self.response.json.side_effect = ValueError
for response_ct in [self.net.JSON_CONTENT_TYPE, 'foo']:
self.response.headers['Content-Type'] = response_ct
# pylint: disable=protected-access
self.assertRaises(
errors.ClientError, self.net._check_response, self.response,
content_type=self.net.JSON_CONTENT_TYPE)
def test_check_response_ok_no_jobj_no_ct(self):
self.response.json.side_effect = ValueError
for response_ct in [self.net.JSON_CONTENT_TYPE, 'foo']:
self.response.headers['Content-Type'] = response_ct
# pylint: disable=protected-access
self.net._check_response(self.response)
def test_check_response_jobj(self):
self.response.json.return_value = {}
for response_ct in [self.net.JSON_CONTENT_TYPE, 'foo']:
self.response.headers['Content-Type'] = response_ct
# pylint: disable=protected-access
self.net._check_response(self.response)
@mock.patch('acme.client.requests')
def test_get_requests_error_passthrough(self, requests_mock):
requests_mock.exceptions = requests.exceptions
requests_mock.get.side_effect = requests.exceptions.RequestException
# pylint: disable=protected-access
self.assertRaises(errors.ClientError, self.net._get, 'uri')
@mock.patch('acme.client.requests')
def test_get(self, requests_mock):
# pylint: disable=protected-access
self.net._check_response = mock.MagicMock()
self.net._get('uri', content_type='ct')
self.net._check_response.assert_called_once_with(
requests_mock.get('uri'), content_type='ct')
def _mock_wrap_in_jws(self):
# pylint: disable=protected-access
self.net._wrap_in_jws = self.wrap_in_jws
@mock.patch('acme.client.requests')
def test_post_requests_error_passthrough(self, requests_mock):
requests_mock.exceptions = requests.exceptions
requests_mock.post.side_effect = requests.exceptions.RequestException
# pylint: disable=protected-access
self._mock_wrap_in_jws()
self.assertRaises(
errors.ClientError, self.net._post, 'uri', mock.sentinel.obj)
@mock.patch('acme.client.requests')
def test_post(self, requests_mock):
# pylint: disable=protected-access
self.net._check_response = mock.MagicMock()
self._mock_wrap_in_jws()
requests_mock.post().headers = {
self.net.REPLAY_NONCE_HEADER: self.nonce}
self.net._post('uri', mock.sentinel.obj, content_type='ct')
self.net._check_response.assert_called_once_with(
requests_mock.post('uri', mock.sentinel.wrapped), content_type='ct')
@mock.patch('acme.client.requests')
def test_post_replay_nonce_handling(self, requests_mock):
# pylint: disable=protected-access
self.net._check_response = mock.MagicMock()
self._mock_wrap_in_jws()
self.net._nonces.clear()
self.assertRaises(
errors.ClientError, self.net._post, 'uri', mock.sentinel.obj)
nonce2 = jose.b64encode('Nonce2')
requests_mock.head('uri').headers = {
self.net.REPLAY_NONCE_HEADER: nonce2}
requests_mock.post('uri').headers = {
self.net.REPLAY_NONCE_HEADER: self.nonce}
self.net._post('uri', mock.sentinel.obj)
requests_mock.head.assert_called_with('uri')
self.wrap_in_jws.assert_called_once_with(mock.sentinel.obj, nonce2)
self.assertEqual(self.net._nonces, set([self.nonce]))
# wrong nonce
requests_mock.post('uri').headers = {self.net.REPLAY_NONCE_HEADER: 'F'}
self.assertRaises(
errors.ClientError, self.net._post, 'uri', mock.sentinel.obj)
@mock.patch('acme.client.requests')
def test_get_post_verify_ssl(self, requests_mock):
# pylint: disable=protected-access
self._mock_wrap_in_jws()
self.net._check_response = mock.MagicMock()
for verify_ssl in [True, False]:
self.net.verify_ssl = verify_ssl
self.net._get('uri')
self.net._nonces.add('N')
requests_mock.post().headers = {
self.net.REPLAY_NONCE_HEADER: self.nonce}
self.net._post('uri', mock.sentinel.obj)
requests_mock.get.assert_called_once_with('uri', verify=verify_ssl)
requests_mock.post.assert_called_with(
'uri', data=mock.sentinel.wrapped, verify=verify_ssl)
requests_mock.reset_mock()
def test_register(self):
self.response.status_code = httplib.CREATED
self.response.json.return_value = self.regr.body.to_json()
self.response.headers['Location'] = self.regr.uri
self.response.links.update({
'next': {'url': self.regr.new_authzr_uri},
'terms-of-service': {'url': self.regr.terms_of_service},
})
self._mock_post_get()
self.assertEqual(self.regr, self.net.register(self.contact))
# TODO: test POST call arguments
# TODO: split here and separate test
reg_wrong_key = self.regr.body.update(key=KEY2.public())
self.response.json.return_value = reg_wrong_key.to_json()
self.assertRaises(
errors.UnexpectedUpdate, self.net.register, self.contact)
def test_register_missing_next(self):
self.response.status_code = httplib.CREATED
self._mock_post_get()
self.assertRaises(
errors.ClientError, self.net.register, self.regr.body)
def test_update_registration(self):
self.response.headers['Location'] = self.regr.uri
self.response.json.return_value = self.regr.body.to_json()
self._mock_post_get()
self.assertEqual(self.regr, self.net.update_registration(self.regr))
# TODO: split here and separate test
self.response.json.return_value = self.regr.body.update(
contact=()).to_json()
self.assertRaises(
errors.UnexpectedUpdate, self.net.update_registration, self.regr)
def test_agree_to_tos(self):
self.net.update_registration = mock.Mock()
self.net.agree_to_tos(self.regr)
regr = self.net.update_registration.call_args[0][0]
self.assertEqual(self.regr.terms_of_service, regr.body.agreement)
def test_request_challenges(self):
self.response.status_code = httplib.CREATED
self.response.headers['Location'] = self.authzr.uri
self.response.json.return_value = self.authz.to_json()
self.response.links = {
'next': {'url': self.authzr.new_cert_uri},
}
self._mock_post_get()
self.net.request_challenges(self.identifier, self.authzr.uri)
# TODO: test POST call arguments
# TODO: split here and separate test
self.response.json.return_value = self.authz.update(
identifier=self.identifier.update(value='foo')).to_json()
self.assertRaises(errors.UnexpectedUpdate, self.net.request_challenges,
self.identifier, self.authzr.uri)
def test_request_challenges_missing_next(self):
self.response.status_code = httplib.CREATED
self._mock_post_get()
self.assertRaises(
errors.ClientError, self.net.request_challenges,
self.identifier, self.regr)
def test_request_domain_challenges(self):
self.net.request_challenges = mock.MagicMock()
self.assertEqual(
self.net.request_challenges(self.identifier),
self.net.request_domain_challenges('example.com', self.regr))
def test_answer_challenge(self):
self.response.links['up'] = {'url': self.challr.authzr_uri}
self.response.json.return_value = self.challr.body.to_json()
chall_response = challenges.DNSResponse()
self._mock_post_get()
self.net.answer_challenge(self.challr.body, chall_response)
# TODO: split here and separate test
self.assertRaises(errors.UnexpectedUpdate, self.net.answer_challenge,
self.challr.body.update(uri='foo'), chall_response)
def test_answer_challenge_missing_next(self):
self._mock_post_get()
self.assertRaises(errors.ClientError, self.net.answer_challenge,
self.challr.body, challenges.DNSResponse())
def test_retry_after_date(self):
self.response.headers['Retry-After'] = 'Fri, 31 Dec 1999 23:59:59 GMT'
self.assertEqual(
datetime.datetime(1999, 12, 31, 23, 59, 59),
self.net.retry_after(response=self.response, default=10))
@mock.patch('acme.client.datetime')
def test_retry_after_invalid(self, dt_mock):
dt_mock.datetime.now.return_value = datetime.datetime(2015, 3, 27)
dt_mock.timedelta = datetime.timedelta
self.response.headers['Retry-After'] = 'foooo'
self.assertEqual(
datetime.datetime(2015, 3, 27, 0, 0, 10),
self.net.retry_after(response=self.response, default=10))
@mock.patch('acme.client.datetime')
def test_retry_after_seconds(self, dt_mock):
dt_mock.datetime.now.return_value = datetime.datetime(2015, 3, 27)
dt_mock.timedelta = datetime.timedelta
self.response.headers['Retry-After'] = '50'
self.assertEqual(
datetime.datetime(2015, 3, 27, 0, 0, 50),
self.net.retry_after(response=self.response, default=10))
@mock.patch('acme.client.datetime')
def test_retry_after_missing(self, dt_mock):
dt_mock.datetime.now.return_value = datetime.datetime(2015, 3, 27)
dt_mock.timedelta = datetime.timedelta
self.assertEqual(
datetime.datetime(2015, 3, 27, 0, 0, 10),
self.net.retry_after(response=self.response, default=10))
def test_poll(self):
self.response.json.return_value = self.authzr.body.to_json()
self._mock_post_get()
self.assertEqual((self.authzr, self.response),
self.net.poll(self.authzr))
# TODO: split here and separate test
self.response.json.return_value = self.authz.update(
identifier=self.identifier.update(value='foo')).to_json()
self.assertRaises(errors.UnexpectedUpdate, self.net.poll, self.authzr)
def test_request_issuance(self):
self.response.content = CERT.as_der()
self.response.headers['Location'] = self.certr.uri
self.response.links['up'] = {'url': self.certr.cert_chain_uri}
self._mock_post_get()
self.assertEqual(
self.certr, self.net.request_issuance(CSR, (self.authzr,)))
# TODO: check POST args
def test_request_issuance_missing_up(self):
self.response.content = CERT.as_der()
self.response.headers['Location'] = self.certr.uri
self._mock_post_get()
self.assertEqual(
self.certr.update(cert_chain_uri=None),
self.net.request_issuance(CSR, (self.authzr,)))
def test_request_issuance_missing_location(self):
self._mock_post_get()
self.assertRaises(
errors.ClientError, self.net.request_issuance,
CSR, (self.authzr,))
@mock.patch('acme.client.datetime')
@mock.patch('acme.client.time')
def test_poll_and_request_issuance(self, time_mock, dt_mock):
# clock.dt | pylint: disable=no-member
clock = mock.MagicMock(dt=datetime.datetime(2015, 3, 27))
def sleep(seconds):
"""increment clock"""
clock.dt += datetime.timedelta(seconds=seconds)
time_mock.sleep.side_effect = sleep
def now():
"""return current clock value"""
return clock.dt
dt_mock.datetime.now.side_effect = now
dt_mock.timedelta = datetime.timedelta
def poll(authzr): # pylint: disable=missing-docstring
# record poll start time based on the current clock value
authzr.times.append(clock.dt)
# suppose it takes 2 seconds for server to produce the
# result, increment clock
clock.dt += datetime.timedelta(seconds=2)
if not authzr.retries: # no more retries
done = mock.MagicMock(uri=authzr.uri, times=authzr.times)
done.body.status = messages.STATUS_VALID
return done, []
# response (2nd result tuple element) is reduced to only
# Retry-After header contents represented as integer
# seconds; authzr.retries is a list of Retry-After
# headers, head(retries) is peeled of as a current
# Retry-After header, and tail(retries) is persisted for
# later poll() calls
return (mock.MagicMock(retries=authzr.retries[1:],
uri=authzr.uri + '.', times=authzr.times),
authzr.retries[0])
self.net.poll = mock.MagicMock(side_effect=poll)
mintime = 7
def retry_after(response, default): # pylint: disable=missing-docstring
# check that poll_and_request_issuance correctly passes mintime
self.assertEqual(default, mintime)
return clock.dt + datetime.timedelta(seconds=response)
self.net.retry_after = mock.MagicMock(side_effect=retry_after)
def request_issuance(csr, authzrs): # pylint: disable=missing-docstring
return csr, authzrs
self.net.request_issuance = mock.MagicMock(side_effect=request_issuance)
csr = mock.MagicMock()
authzrs = (
mock.MagicMock(uri='a', times=[], retries=(8, 20, 30)),
mock.MagicMock(uri='b', times=[], retries=(5,)),
)
cert, updated_authzrs = self.net.poll_and_request_issuance(
csr, authzrs, mintime=mintime)
self.assertTrue(cert[0] is csr)
self.assertTrue(cert[1] is updated_authzrs)
self.assertEqual(updated_authzrs[0].uri, 'a...')
self.assertEqual(updated_authzrs[1].uri, 'b.')
self.assertEqual(updated_authzrs[0].times, [
datetime.datetime(2015, 3, 27),
# a is scheduled for 10, but b is polling [9..11), so it
# will be picked up as soon as b is finished, without
# additional sleeping
datetime.datetime(2015, 3, 27, 0, 0, 11),
datetime.datetime(2015, 3, 27, 0, 0, 33),
datetime.datetime(2015, 3, 27, 0, 1, 5),
])
self.assertEqual(updated_authzrs[1].times, [
datetime.datetime(2015, 3, 27, 0, 0, 2),
datetime.datetime(2015, 3, 27, 0, 0, 9),
])
self.assertEqual(clock.dt, datetime.datetime(2015, 3, 27, 0, 1, 7))
def test_check_cert(self):
self.response.headers['Location'] = self.certr.uri
self.response.content = CERT.as_der()
self._mock_post_get()
self.assertEqual(
self.certr.update(body=CERT), self.net.check_cert(self.certr))
# TODO: split here and separate test
self.response.headers['Location'] = 'foo'
self.assertRaises(
errors.UnexpectedUpdate, self.net.check_cert, self.certr)
def test_check_cert_missing_location(self):
self.response.content = CERT.as_der()
self._mock_post_get()
self.assertRaises(errors.ClientError, self.net.check_cert, self.certr)
def test_refresh(self):
self.net.check_cert = mock.MagicMock()
self.assertEqual(
self.net.check_cert(self.certr), self.net.refresh(self.certr))
def test_fetch_chain(self):
# pylint: disable=protected-access
self.net._get_cert = mock.MagicMock()
self.net._get_cert.return_value = ("response", "certificate")
self.assertEqual(self.net._get_cert(self.certr.cert_chain_uri)[1],
self.net.fetch_chain(self.certr))
def test_fetch_chain_no_up_link(self):
self.assertTrue(self.net.fetch_chain(self.certr.update(
cert_chain_uri=None)) is None)
def test_revoke(self):
self._mock_post_get()
self.net.revoke(self.certr, when=messages.Revocation.NOW)
self.post.assert_called_once_with(self.certr.uri, mock.ANY)
def test_revoke_bad_status_raises_error(self):
self.response.status_code = httplib.METHOD_NOT_ALLOWED
self._mock_post_get()
self.assertRaises(errors.ClientError, self.net.revoke, self.certr)
if __name__ == '__main__':
unittest.main() # pragma: no cover

View file

@ -1,8 +1,15 @@
"""ACME errors."""
from acme.jose import errors as jose_errors
class Error(Exception):
"""Generic ACME error."""
class SchemaValidationError(jose_errors.DeserializationError):
"""JSON schema ACME object validation error."""
class ClientError(Error):
"""Network error."""
class UnexpectedUpdate(ClientError):
"""Unexpected update."""

View file

@ -4,7 +4,8 @@ The following command has been used to generate test keys:
and for the CSR:
python -c from 'letsencrypt.crypto_util import make_csr;
import pkg_resources; open("csr2.pem",
"w").write(make_csr(pkg_resources.resource_string("letsencrypt.tests",
"testdata/rsa512_key.pem"), ["example2.com"])[0])'
openssl req -key rsa512_key.pem -new -subj '/CN=example.com' -outform DER > csr.der
and for the certificate:
openssl req -key rsa512_key.pem -new -subj '/CN=example.com' -x509 -outform DER > cert.der

BIN
acme/jose/testdata/cert.der vendored Normal file

Binary file not shown.

BIN
acme/jose/testdata/csr.der vendored Normal file

Binary file not shown.

View file

@ -1,10 +0,0 @@
-----BEGIN CERTIFICATE REQUEST-----
MIIBXzCCAQkCAQAwejELMAkGA1UEBhMCVVMxETAPBgNVBAgMCE1pY2hpZ2FuMRIw
EAYDVQQHDAlBbm4gQXJib3IxDDAKBgNVBAoMA0VGRjEfMB0GA1UECwwWVW5pdmVy
c2l0eSBvZiBNaWNoaWdhbjEVMBMGA1UEAwwMZXhhbXBsZTIuY29tMFwwDQYJKoZI
hvcNAQEBBQADSwAwSAJBAPS2EXFRNza/qpXnnBHF/CcFQ543htV+7nLAmrLrmTNH
tPXJmLlM8SJDIzv/ceAFXL110VzxFfi81lpH5E5c0TMCAwEAAaAqMCgGCSqGSIb3
DQEJDjEbMBkwFwYDVR0RBBAwDoIMZXhhbXBsZTIuY29tMA0GCSqGSIb3DQEBCwUA
A0EAwsdL4FLIgISKV4vXFmc6QTV7CjBiP4XmPFbeN+gMFdR7QcnRyyxSpXxB0v8Z
oqYboP5LGFt9zC6/9GyjcI9/IQ==
-----END CERTIFICATE REQUEST-----

View file

@ -7,6 +7,13 @@
:members:
Client
------
.. automodule:: acme.client
:members:
Messages
--------

View file

@ -5,14 +5,6 @@ class LetsEncryptClientError(Exception):
"""Generic Let's Encrypt client error."""
class NetworkError(LetsEncryptClientError):
"""Network error."""
class UnexpectedUpdate(NetworkError):
"""Unexpected update."""
class LetsEncryptReverterError(LetsEncryptClientError):
"""Let's Encrypt Reverter error."""

View file

@ -1,230 +1,15 @@
"""Networking for ACME protocol v02."""
import datetime
import heapq
import httplib
import logging
import time
import M2Crypto
import requests
import werkzeug
from acme import jose
from acme import jws as acme_jws
from acme import messages
from letsencrypt import errors
"""Networking for ACME protocol."""
from acme import client
# https://urllib3.readthedocs.org/en/latest/security.html#insecureplatformwarning
requests.packages.urllib3.contrib.pyopenssl.inject_into_urllib3()
class Network(object):
"""ACME networking.
.. todo::
Clean up raised error types hierarchy, document, and handle (wrap)
instances of `.DeserializationError` raised in `from_json()`.
:ivar str new_reg_uri: Location of new-reg
:ivar key: `.JWK` (private)
:ivar alg: `.JWASignature`
:ivar bool verify_ssl: Verify SSL certificates?
"""
# TODO: Move below to acme module?
DER_CONTENT_TYPE = 'application/pkix-cert'
JSON_CONTENT_TYPE = 'application/json'
JSON_ERROR_CONTENT_TYPE = 'application/problem+json'
REPLAY_NONCE_HEADER = 'Replay-Nonce'
def __init__(self, new_reg_uri, key, alg=jose.RS256, verify_ssl=True):
self.new_reg_uri = new_reg_uri
self.key = key
self.alg = alg
self.verify_ssl = verify_ssl
self._nonces = set()
def _wrap_in_jws(self, obj, nonce):
"""Wrap `JSONDeSerializable` object in JWS.
.. todo:: Implement ``acmePath``.
:param JSONDeSerializable obj:
:rtype: `.JWS`
"""
dumps = obj.json_dumps()
logging.debug('Serialized JSON: %s', dumps)
return acme_jws.JWS.sign(
payload=dumps, key=self.key, alg=self.alg, nonce=nonce).json_dumps()
@classmethod
def _check_response(cls, response, content_type=None):
"""Check response content and its type.
.. note::
Checking is not strict: wrong server response ``Content-Type``
HTTP header is ignored if response is an expected JSON object
(c.f. Boulder #56).
:param str content_type: Expected Content-Type response header.
If JSON is expected and not present in server response, this
function will raise an error. Otherwise, wrong Content-Type
is ignored, but logged.
:raises letsencrypt.messages.Error: If server response body
carries HTTP Problem (draft-ietf-appsawg-http-problem-00).
:raises letsencrypt.errors.NetworkError: In case of other
networking errors.
"""
response_ct = response.headers.get('Content-Type')
try:
# TODO: response.json() is called twice, once here, and
# once in _get and _post clients
jobj = response.json()
except ValueError as error:
jobj = None
if not response.ok:
if jobj is not None:
if response_ct != cls.JSON_ERROR_CONTENT_TYPE:
logging.debug(
'Ignoring wrong Content-Type (%r) for JSON Error',
response_ct)
try:
logging.error("Error: %s", jobj)
logging.error("Response from server: %s", response.content)
raise messages.Error.from_json(jobj)
except jose.DeserializationError as error:
# Couldn't deserialize JSON object
raise errors.NetworkError((response, error))
else:
# response is not JSON object
raise errors.NetworkError(response)
else:
if jobj is not None and response_ct != cls.JSON_CONTENT_TYPE:
logging.debug(
'Ignoring wrong Content-Type (%r) for JSON decodable '
'response', response_ct)
if content_type == cls.JSON_CONTENT_TYPE and jobj is None:
raise errors.NetworkError(
'Unexpected response Content-Type: {0}'.format(response_ct))
def _get(self, uri, content_type=JSON_CONTENT_TYPE, **kwargs):
"""Send GET request.
:raises letsencrypt.errors.NetworkError:
:returns: HTTP Response
:rtype: `requests.Response`
"""
logging.debug('Sending GET request to %s', uri)
kwargs.setdefault('verify', self.verify_ssl)
try:
response = requests.get(uri, **kwargs)
except requests.exceptions.RequestException as error:
raise errors.NetworkError(error)
self._check_response(response, content_type=content_type)
return response
def _add_nonce(self, response):
if self.REPLAY_NONCE_HEADER in response.headers:
nonce = response.headers[self.REPLAY_NONCE_HEADER]
error = acme_jws.Header.validate_nonce(nonce)
if error is None:
logging.debug('Storing nonce: %r', nonce)
self._nonces.add(nonce)
else:
raise errors.NetworkError('Invalid nonce ({0}): {1}'.format(
nonce, error))
else:
raise errors.NetworkError(
'Server {0} response did not include a replay nonce'.format(
response.request.method))
def _get_nonce(self, uri):
if not self._nonces:
logging.debug('Requesting fresh nonce by sending HEAD to %s', uri)
self._add_nonce(requests.head(uri))
return self._nonces.pop()
def _post(self, uri, obj, content_type=JSON_CONTENT_TYPE, **kwargs):
"""Send POST data.
:param JSONDeSerializable obj: Will be wrapped in JWS.
:param str content_type: Expected ``Content-Type``, fails if not set.
:raises acme.messages.NetworkError:
:returns: HTTP Response
:rtype: `requests.Response`
"""
data = self._wrap_in_jws(obj, self._get_nonce(uri))
logging.debug('Sending POST data to %s: %s', uri, data)
kwargs.setdefault('verify', self.verify_ssl)
try:
response = requests.post(uri, data=data, **kwargs)
except requests.exceptions.RequestException as error:
raise errors.NetworkError(error)
logging.debug('Received response %s: %r', response, response.text)
self._add_nonce(response)
self._check_response(response, content_type=content_type)
return response
@classmethod
def _regr_from_response(cls, response, uri=None, new_authzr_uri=None,
terms_of_service=None):
terms_of_service = (
response.links['terms-of-service']['url']
if 'terms-of-service' in response.links else terms_of_service)
if new_authzr_uri is None:
try:
new_authzr_uri = response.links['next']['url']
except KeyError:
raise errors.NetworkError('"next" link missing')
return messages.RegistrationResource(
body=messages.Registration.from_json(response.json()),
uri=response.headers.get('Location', uri),
new_authzr_uri=new_authzr_uri,
terms_of_service=terms_of_service)
def register(self, contact=messages.Registration._fields[
'contact'].default):
"""Register.
:param contact: Contact list, as accepted by `.Registration`
:type contact: `tuple`
:returns: Registration Resource.
:rtype: `.RegistrationResource`
:raises letsencrypt.errors.UnexpectedUpdate:
"""
new_reg = messages.Registration(contact=contact)
response = self._post(self.new_reg_uri, new_reg)
assert response.status_code == httplib.CREATED # TODO: handle errors
regr = self._regr_from_response(response)
if regr.body.key != self.key.public() or regr.body.contact != contact:
raise errors.UnexpectedUpdate(regr)
return regr
class Network(client.Client):
"""ACME networking."""
def register_from_account(self, account):
"""Register with server.
.. todo:: this should probably not be a part of network...
:param account: Account
:type account: :class:`letsencrypt.account.Account`
@ -239,344 +24,3 @@ class Network(object):
account.regr = self.register(contact=tuple(
det for det in details if det is not None))
return account
def update_registration(self, regr):
"""Update registration.
:pram regr: Registration Resource.
:type regr: `.RegistrationResource`
:returns: Updated Registration Resource.
:rtype: `.RegistrationResource`
"""
response = self._post(regr.uri, regr.body)
# TODO: Boulder returns httplib.ACCEPTED
#assert response.status_code == httplib.OK
# TODO: Boulder does not set Location or Link on update
# (c.f. acme-spec #94)
updated_regr = self._regr_from_response(
response, uri=regr.uri, new_authzr_uri=regr.new_authzr_uri,
terms_of_service=regr.terms_of_service)
if updated_regr != regr:
raise errors.UnexpectedUpdate(regr)
return updated_regr
def agree_to_tos(self, regr):
"""Agree to the terms-of-service.
Agree to the terms-of-service in a Registration Resource.
:param regr: Registration Resource.
:type regr: `.RegistrationResource`
:returns: Updated Registration Resource.
:rtype: `.RegistrationResource`
"""
return self.update_registration(
regr.update(body=regr.body.update(agreement=regr.terms_of_service)))
def _authzr_from_response(self, response, identifier,
uri=None, new_cert_uri=None):
# pylint: disable=no-self-use
if new_cert_uri is None:
try:
new_cert_uri = response.links['next']['url']
except KeyError:
raise errors.NetworkError('"next" link missing')
authzr = messages.AuthorizationResource(
body=messages.Authorization.from_json(response.json()),
uri=response.headers.get('Location', uri),
new_cert_uri=new_cert_uri)
if authzr.body.identifier != identifier:
raise errors.UnexpectedUpdate(authzr)
return authzr
def request_challenges(self, identifier, new_authzr_uri):
"""Request challenges.
:param identifier: Identifier to be challenged.
:type identifier: `.messages.Identifier`
:param str new_authzr_uri: new-authorization URI
:returns: Authorization Resource.
:rtype: `.AuthorizationResource`
"""
new_authz = messages.Authorization(identifier=identifier)
response = self._post(new_authzr_uri, new_authz)
assert response.status_code == httplib.CREATED # TODO: handle errors
return self._authzr_from_response(response, identifier)
def request_domain_challenges(self, domain, new_authz_uri):
"""Request challenges for domain names.
This is simply a convenience function that wraps around
`request_challenges`, but works with domain names instead of
generic identifiers.
:param str domain: Domain name to be challenged.
:param str new_authzr_uri: new-authorization URI
:returns: Authorization Resource.
:rtype: `.AuthorizationResource`
"""
return self.request_challenges(messages.Identifier(
typ=messages.IDENTIFIER_FQDN, value=domain), new_authz_uri)
def answer_challenge(self, challb, response):
"""Answer challenge.
:param challb: Challenge Resource body.
:type challb: `.ChallengeBody`
:param response: Corresponding Challenge response
:type response: `.challenges.ChallengeResponse`
:returns: Challenge Resource with updated body.
:rtype: `.ChallengeResource`
:raises errors.UnexpectedUpdate:
"""
response = self._post(challb.uri, response)
try:
authzr_uri = response.links['up']['url']
except KeyError:
raise errors.NetworkError('"up" Link header missing')
challr = messages.ChallengeResource(
authzr_uri=authzr_uri,
body=messages.ChallengeBody.from_json(response.json()))
# TODO: check that challr.uri == response.headers['Location']?
if challr.uri != challb.uri:
raise errors.UnexpectedUpdate(challr.uri)
return challr
@classmethod
def retry_after(cls, response, default):
"""Compute next `poll` time based on response ``Retry-After`` header.
:param response: Response from `poll`.
:type response: `requests.Response`
:param int default: Default value (in seconds), used when
``Retry-After`` header is not present or invalid.
:returns: Time point when next `poll` should be performed.
:rtype: `datetime.datetime`
"""
retry_after = response.headers.get('Retry-After', str(default))
try:
seconds = int(retry_after)
except ValueError:
# pylint: disable=no-member
decoded = werkzeug.parse_date(retry_after) # RFC1123
if decoded is None:
seconds = default
else:
return decoded
return datetime.datetime.now() + datetime.timedelta(seconds=seconds)
def poll(self, authzr):
"""Poll Authorization Resource for status.
:param authzr: Authorization Resource
:type authzr: `.AuthorizationResource`
:returns: Updated Authorization Resource and HTTP response.
:rtype: (`.AuthorizationResource`, `requests.Response`)
"""
response = self._get(authzr.uri)
updated_authzr = self._authzr_from_response(
response, authzr.body.identifier, authzr.uri, authzr.new_cert_uri)
# TODO: check and raise UnexpectedUpdate
return updated_authzr, response
def request_issuance(self, csr, authzrs):
"""Request issuance.
:param csr: CSR
:type csr: `M2Crypto.X509.Request` wrapped in `.ComparableX509`
:param authzrs: `list` of `.AuthorizationResource`
:returns: Issued certificate
:rtype: `.messages.CertificateResource`
"""
assert authzrs, "Authorizations list is empty"
logging.debug("Requesting issuance...")
# TODO: assert len(authzrs) == number of SANs
req = messages.CertificateRequest(
csr=csr, authorizations=tuple(authzr.uri for authzr in authzrs))
content_type = self.DER_CONTENT_TYPE # TODO: add 'cert_type 'argument
response = self._post(
authzrs[0].new_cert_uri, # TODO: acme-spec #90
req,
content_type=content_type,
headers={'Accept': content_type})
cert_chain_uri = response.links.get('up', {}).get('url')
try:
uri = response.headers['Location']
except KeyError:
raise errors.NetworkError('"Location" Header missing')
return messages.CertificateResource(
uri=uri, authzrs=authzrs, cert_chain_uri=cert_chain_uri,
body=jose.ComparableX509(
M2Crypto.X509.load_cert_der_string(response.content)))
def poll_and_request_issuance(self, csr, authzrs, mintime=5):
"""Poll and request issuance.
This function polls all provided Authorization Resource URIs
until all challenges are valid, respecting ``Retry-After`` HTTP
headers, and then calls `request_issuance`.
.. todo:: add `max_attempts` or `timeout`
:param csr: CSR.
:type csr: `M2Crypto.X509.Request` wrapped in `.ComparableX509`
:param authzrs: `list` of `.AuthorizationResource`
:param int mintime: Minimum time before next attempt, used if
``Retry-After`` is not present in the response.
:returns: ``(cert, updated_authzrs)`` `tuple` where ``cert`` is
the issued certificate (`.messages.CertificateResource.),
and ``updated_authzrs`` is a `tuple` consisting of updated
Authorization Resources (`.AuthorizationResource`) as
present in the responses from server, and in the same order
as the input ``authzrs``.
:rtype: `tuple`
"""
# priority queue with datetime (based on Retry-After) as key,
# and original Authorization Resource as value
waiting = [(datetime.datetime.now(), authzr) for authzr in authzrs]
# mapping between original Authorization Resource and the most
# recently updated one
updated = dict((authzr, authzr) for authzr in authzrs)
while waiting:
# find the smallest Retry-After, and sleep if necessary
when, authzr = heapq.heappop(waiting)
now = datetime.datetime.now()
if when > now:
seconds = (when - now).seconds
logging.debug('Sleeping for %d seconds', seconds)
time.sleep(seconds)
# Note that we poll with the latest updated Authorization
# URI, which might have a different URI than initial one
updated_authzr, response = self.poll(updated[authzr])
updated[authzr] = updated_authzr
if updated_authzr.body.status != messages.STATUS_VALID:
# push back to the priority queue, with updated retry_after
heapq.heappush(waiting, (self.retry_after(
response, default=mintime), authzr))
updated_authzrs = tuple(updated[authzr] for authzr in authzrs)
return self.request_issuance(csr, updated_authzrs), updated_authzrs
def _get_cert(self, uri):
"""Returns certificate from URI.
:param str uri: URI of certificate
:returns: tuple of the form
(response, :class:`acme.jose.ComparableX509`)
:rtype: tuple
"""
content_type = self.DER_CONTENT_TYPE # TODO: make it a param
response = self._get(uri, headers={'Accept': content_type},
content_type=content_type)
return response, jose.ComparableX509(
M2Crypto.X509.load_cert_der_string(response.content))
def check_cert(self, certr):
"""Check for new cert.
:param certr: Certificate Resource
:type certr: `.CertificateResource`
:returns: Updated Certificate Resource.
:rtype: `.CertificateResource`
"""
# TODO: acme-spec 5.1 table action should be renamed to
# "refresh cert", and this method integrated with self.refresh
response, cert = self._get_cert(certr.uri)
if 'Location' not in response.headers:
raise errors.NetworkError('Location header missing')
if response.headers['Location'] != certr.uri:
raise errors.UnexpectedUpdate(response.text)
return certr.update(body=cert)
def refresh(self, certr):
"""Refresh certificate.
:param certr: Certificate Resource
:type certr: `.CertificateResource`
:returns: Updated Certificate Resource.
:rtype: `.CertificateResource`
"""
# TODO: If a client sends a refresh request and the server is
# not willing to refresh the certificate, the server MUST
# respond with status code 403 (Forbidden)
return self.check_cert(certr)
def fetch_chain(self, certr):
"""Fetch chain for certificate.
:param certr: Certificate Resource
:type certr: `.CertificateResource`
:returns: Certificate chain, or `None` if no "up" Link was provided.
:rtype: `M2Crypto.X509.X509` wrapped in `.ComparableX509`
"""
if certr.cert_chain_uri is not None:
return self._get_cert(certr.cert_chain_uri)[1]
else:
return None
def revoke(self, certr, when=messages.Revocation.NOW):
"""Revoke certificate.
:param certr: Certificate Resource
:type certr: `.CertificateResource`
:param when: When should the revocation take place? Takes
the same values as `.messages.Revocation.revoke`.
:raises letsencrypt.errors.NetworkError: If revocation is
unsuccessful.
"""
rev = messages.Revocation(revoke=when, authorizations=tuple(
authzr.uri for authzr in certr.authzrs))
response = self._post(certr.uri, rev)
if response.status_code != httplib.OK:
raise errors.NetworkError(
'Successful revocation must return HTTP OK status')

View file

@ -1,281 +1,27 @@
"""Tests for letsencrypt.network."""
import datetime
import httplib
import os
import pkg_resources
import shutil
import tempfile
import unittest
import M2Crypto
import mock
import requests
from acme import challenges
from acme import jose
from acme import jws as acme_jws
from acme import messages
from letsencrypt import account
from letsencrypt import errors
CERT = jose.ComparableX509(M2Crypto.X509.load_cert_string(
pkg_resources.resource_string(
__name__, os.path.join('testdata', 'cert.pem'))))
CERT2 = jose.ComparableX509(M2Crypto.X509.load_cert_string(
pkg_resources.resource_string(
__name__, os.path.join('testdata', 'cert-san.pem'))))
CSR = jose.ComparableX509(M2Crypto.X509.load_request_string(
pkg_resources.resource_string(
__name__, os.path.join('testdata', 'csr.pem'))))
KEY = jose.JWKRSA.load(pkg_resources.resource_string(
'acme.jose', os.path.join('testdata', 'rsa512_key.pem')))
KEY2 = jose.JWKRSA.load(pkg_resources.resource_string(
'acme.jose', os.path.join('testdata', 'rsa256_key.pem')))
class NetworkTest(unittest.TestCase):
"""Tests for letsencrypt.network.Network."""
# pylint: disable=too-many-instance-attributes,too-many-public-methods
def setUp(self):
self.verify_ssl = mock.MagicMock()
self.wrap_in_jws = mock.MagicMock(return_value=mock.sentinel.wrapped)
from letsencrypt.network import Network
self.net = Network(
new_reg_uri='https://www.letsencrypt-demo.org/acme/new-reg',
key=KEY, alg=jose.RS256, verify_ssl=self.verify_ssl)
self.nonce = jose.b64encode('Nonce')
self.net._nonces.add(self.nonce) # pylint: disable=protected-access
self.response = mock.MagicMock(ok=True, status_code=httplib.OK)
self.response.headers = {}
self.response.links = {}
self.post = mock.MagicMock(return_value=self.response)
self.get = mock.MagicMock(return_value=self.response)
self.identifier = messages.Identifier(
typ=messages.IDENTIFIER_FQDN, value='example.com')
new_reg_uri=None, key=None, alg=None, verify_ssl=None)
self.config = mock.Mock(accounts_dir=tempfile.mkdtemp())
# Registration
self.contact = ('mailto:cert-admin@example.com', 'tel:+12025551212')
reg = messages.Registration(
contact=self.contact, key=KEY.public(), recovery_token='t')
self.regr = messages.RegistrationResource(
body=reg, uri='https://www.letsencrypt-demo.org/acme/reg/1',
new_authzr_uri='https://www.letsencrypt-demo.org/acme/new-reg',
terms_of_service='https://www.letsencrypt-demo.org/tos')
# Authorization
authzr_uri = 'https://www.letsencrypt-demo.org/acme/authz/1'
challb = messages.ChallengeBody(
uri=(authzr_uri + '/1'), status=messages.STATUS_VALID,
chall=challenges.DNS(token='foo'))
self.challr = messages.ChallengeResource(
body=challb, authzr_uri=authzr_uri)
self.authz = messages.Authorization(
identifier=messages.Identifier(
typ=messages.IDENTIFIER_FQDN, value='example.com'),
challenges=(challb,), combinations=None)
self.authzr = messages.AuthorizationResource(
body=self.authz, uri=authzr_uri,
new_cert_uri='https://www.letsencrypt-demo.org/acme/new-cert')
# Request issuance
self.certr = messages.CertificateResource(
body=CERT, authzrs=(self.authzr,),
uri='https://www.letsencrypt-demo.org/acme/cert/1',
cert_chain_uri='https://www.letsencrypt-demo.org/ca')
def tearDown(self):
shutil.rmtree(self.config.accounts_dir)
def _mock_post_get(self):
# pylint: disable=protected-access
self.net._post = self.post
self.net._get = self.get
def test_init(self):
self.assertTrue(self.net.verify_ssl is self.verify_ssl)
def test_wrap_in_jws(self):
class MockJSONDeSerializable(jose.JSONDeSerializable):
# pylint: disable=missing-docstring
def __init__(self, value):
self.value = value
def to_partial_json(self):
return self.value
@classmethod
def from_json(cls, value):
pass # pragma: no cover
# pylint: disable=protected-access
jws_dump = self.net._wrap_in_jws(
MockJSONDeSerializable('foo'), nonce='Tg')
jws = acme_jws.JWS.json_loads(jws_dump)
self.assertEqual(jws.payload, '"foo"')
self.assertEqual(jws.signature.combined.nonce, 'Tg')
# TODO: check that nonce is in protected header
def test_check_response_not_ok_jobj_no_error(self):
self.response.ok = False
self.response.json.return_value = {}
# pylint: disable=protected-access
self.assertRaises(
errors.NetworkError, self.net._check_response, self.response)
def test_check_response_not_ok_jobj_error(self):
self.response.ok = False
self.response.json.return_value = messages.Error(
detail='foo', typ='serverInternal', title='some title').to_json()
# pylint: disable=protected-access
self.assertRaises(
messages.Error, self.net._check_response, self.response)
def test_check_response_not_ok_no_jobj(self):
self.response.ok = False
self.response.json.side_effect = ValueError
# pylint: disable=protected-access
self.assertRaises(
errors.NetworkError, self.net._check_response, self.response)
def test_check_response_ok_no_jobj_ct_required(self):
self.response.json.side_effect = ValueError
for response_ct in [self.net.JSON_CONTENT_TYPE, 'foo']:
self.response.headers['Content-Type'] = response_ct
# pylint: disable=protected-access
self.assertRaises(
errors.NetworkError, self.net._check_response, self.response,
content_type=self.net.JSON_CONTENT_TYPE)
def test_check_response_ok_no_jobj_no_ct(self):
self.response.json.side_effect = ValueError
for response_ct in [self.net.JSON_CONTENT_TYPE, 'foo']:
self.response.headers['Content-Type'] = response_ct
# pylint: disable=protected-access
self.net._check_response(self.response)
def test_check_response_jobj(self):
self.response.json.return_value = {}
for response_ct in [self.net.JSON_CONTENT_TYPE, 'foo']:
self.response.headers['Content-Type'] = response_ct
# pylint: disable=protected-access
self.net._check_response(self.response)
@mock.patch('letsencrypt.network.requests')
def test_get_requests_error_passthrough(self, requests_mock):
requests_mock.exceptions = requests.exceptions
requests_mock.get.side_effect = requests.exceptions.RequestException
# pylint: disable=protected-access
self.assertRaises(errors.NetworkError, self.net._get, 'uri')
@mock.patch('letsencrypt.network.requests')
def test_get(self, requests_mock):
# pylint: disable=protected-access
self.net._check_response = mock.MagicMock()
self.net._get('uri', content_type='ct')
self.net._check_response.assert_called_once_with(
requests_mock.get('uri'), content_type='ct')
def _mock_wrap_in_jws(self):
# pylint: disable=protected-access
self.net._wrap_in_jws = self.wrap_in_jws
@mock.patch('letsencrypt.network.requests')
def test_post_requests_error_passthrough(self, requests_mock):
requests_mock.exceptions = requests.exceptions
requests_mock.post.side_effect = requests.exceptions.RequestException
# pylint: disable=protected-access
self._mock_wrap_in_jws()
self.assertRaises(
errors.NetworkError, self.net._post, 'uri', mock.sentinel.obj)
@mock.patch('letsencrypt.network.requests')
def test_post(self, requests_mock):
# pylint: disable=protected-access
self.net._check_response = mock.MagicMock()
self._mock_wrap_in_jws()
requests_mock.post().headers = {
self.net.REPLAY_NONCE_HEADER: self.nonce}
self.net._post('uri', mock.sentinel.obj, content_type='ct')
self.net._check_response.assert_called_once_with(
requests_mock.post('uri', mock.sentinel.wrapped), content_type='ct')
@mock.patch('letsencrypt.network.requests')
def test_post_replay_nonce_handling(self, requests_mock):
# pylint: disable=protected-access
self.net._check_response = mock.MagicMock()
self._mock_wrap_in_jws()
self.net._nonces.clear()
self.assertRaises(
errors.NetworkError, self.net._post, 'uri', mock.sentinel.obj)
nonce2 = jose.b64encode('Nonce2')
requests_mock.head('uri').headers = {
self.net.REPLAY_NONCE_HEADER: nonce2}
requests_mock.post('uri').headers = {
self.net.REPLAY_NONCE_HEADER: self.nonce}
self.net._post('uri', mock.sentinel.obj)
requests_mock.head.assert_called_with('uri')
self.wrap_in_jws.assert_called_once_with(mock.sentinel.obj, nonce2)
self.assertEqual(self.net._nonces, set([self.nonce]))
# wrong nonce
requests_mock.post('uri').headers = {self.net.REPLAY_NONCE_HEADER: 'F'}
self.assertRaises(
errors.NetworkError, self.net._post, 'uri', mock.sentinel.obj)
@mock.patch('letsencrypt.client.network.requests')
def test_get_post_verify_ssl(self, requests_mock):
# pylint: disable=protected-access
self._mock_wrap_in_jws()
self.net._check_response = mock.MagicMock()
for verify_ssl in [True, False]:
self.net.verify_ssl = verify_ssl
self.net._get('uri')
self.net._nonces.add('N')
requests_mock.post().headers = {
self.net.REPLAY_NONCE_HEADER: self.nonce}
self.net._post('uri', mock.sentinel.obj)
requests_mock.get.assert_called_once_with('uri', verify=verify_ssl)
requests_mock.post.assert_called_with(
'uri', data=mock.sentinel.wrapped, verify=verify_ssl)
requests_mock.reset_mock()
def test_register(self):
self.response.status_code = httplib.CREATED
self.response.json.return_value = self.regr.body.to_json()
self.response.headers['Location'] = self.regr.uri
self.response.links.update({
'next': {'url': self.regr.new_authzr_uri},
'terms-of-service': {'url': self.regr.terms_of_service},
})
self._mock_post_get()
self.assertEqual(self.regr, self.net.register(self.contact))
# TODO: test POST call arguments
# TODO: split here and separate test
reg_wrong_key = self.regr.body.update(key=KEY2.public())
self.response.json.return_value = reg_wrong_key.to_json()
self.assertRaises(
errors.UnexpectedUpdate, self.net.register, self.contact)
def test_register_missing_next(self):
self.response.status_code = httplib.CREATED
self._mock_post_get()
self.assertRaises(
errors.NetworkError, self.net.register, self.regr.body)
def test_register_from_account(self):
self.net.register = mock.Mock()
acc = account.Account(
@ -299,265 +45,6 @@ class NetworkTest(unittest.TestCase):
self.net.register_from_account(acc2)
self.net.register.assert_called_with(contact=())
def test_update_registration(self):
self.response.headers['Location'] = self.regr.uri
self.response.json.return_value = self.regr.body.to_json()
self._mock_post_get()
self.assertEqual(self.regr, self.net.update_registration(self.regr))
# TODO: split here and separate test
self.response.json.return_value = self.regr.body.update(
contact=()).to_json()
self.assertRaises(
errors.UnexpectedUpdate, self.net.update_registration, self.regr)
def test_agree_to_tos(self):
self.net.update_registration = mock.Mock()
self.net.agree_to_tos(self.regr)
regr = self.net.update_registration.call_args[0][0]
self.assertEqual(self.regr.terms_of_service, regr.body.agreement)
def test_request_challenges(self):
self.response.status_code = httplib.CREATED
self.response.headers['Location'] = self.authzr.uri
self.response.json.return_value = self.authz.to_json()
self.response.links = {
'next': {'url': self.authzr.new_cert_uri},
}
self._mock_post_get()
self.net.request_challenges(self.identifier, self.authzr.uri)
# TODO: test POST call arguments
# TODO: split here and separate test
self.response.json.return_value = self.authz.update(
identifier=self.identifier.update(value='foo')).to_json()
self.assertRaises(errors.UnexpectedUpdate, self.net.request_challenges,
self.identifier, self.authzr.uri)
def test_request_challenges_missing_next(self):
self.response.status_code = httplib.CREATED
self._mock_post_get()
self.assertRaises(
errors.NetworkError, self.net.request_challenges,
self.identifier, self.regr)
def test_request_domain_challenges(self):
self.net.request_challenges = mock.MagicMock()
self.assertEqual(
self.net.request_challenges(self.identifier),
self.net.request_domain_challenges('example.com', self.regr))
def test_answer_challenge(self):
self.response.links['up'] = {'url': self.challr.authzr_uri}
self.response.json.return_value = self.challr.body.to_json()
chall_response = challenges.DNSResponse()
self._mock_post_get()
self.net.answer_challenge(self.challr.body, chall_response)
# TODO: split here and separate test
self.assertRaises(errors.UnexpectedUpdate, self.net.answer_challenge,
self.challr.body.update(uri='foo'), chall_response)
def test_answer_challenge_missing_next(self):
self._mock_post_get()
self.assertRaises(errors.NetworkError, self.net.answer_challenge,
self.challr.body, challenges.DNSResponse())
def test_retry_after_date(self):
self.response.headers['Retry-After'] = 'Fri, 31 Dec 1999 23:59:59 GMT'
self.assertEqual(
datetime.datetime(1999, 12, 31, 23, 59, 59),
self.net.retry_after(response=self.response, default=10))
@mock.patch('letsencrypt.network.datetime')
def test_retry_after_invalid(self, dt_mock):
dt_mock.datetime.now.return_value = datetime.datetime(2015, 3, 27)
dt_mock.timedelta = datetime.timedelta
self.response.headers['Retry-After'] = 'foooo'
self.assertEqual(
datetime.datetime(2015, 3, 27, 0, 0, 10),
self.net.retry_after(response=self.response, default=10))
@mock.patch('letsencrypt.network.datetime')
def test_retry_after_seconds(self, dt_mock):
dt_mock.datetime.now.return_value = datetime.datetime(2015, 3, 27)
dt_mock.timedelta = datetime.timedelta
self.response.headers['Retry-After'] = '50'
self.assertEqual(
datetime.datetime(2015, 3, 27, 0, 0, 50),
self.net.retry_after(response=self.response, default=10))
@mock.patch('letsencrypt.network.datetime')
def test_retry_after_missing(self, dt_mock):
dt_mock.datetime.now.return_value = datetime.datetime(2015, 3, 27)
dt_mock.timedelta = datetime.timedelta
self.assertEqual(
datetime.datetime(2015, 3, 27, 0, 0, 10),
self.net.retry_after(response=self.response, default=10))
def test_poll(self):
self.response.json.return_value = self.authzr.body.to_json()
self._mock_post_get()
self.assertEqual((self.authzr, self.response),
self.net.poll(self.authzr))
# TODO: split here and separate test
self.response.json.return_value = self.authz.update(
identifier=self.identifier.update(value='foo')).to_json()
self.assertRaises(errors.UnexpectedUpdate, self.net.poll, self.authzr)
def test_request_issuance(self):
self.response.content = CERT.as_der()
self.response.headers['Location'] = self.certr.uri
self.response.links['up'] = {'url': self.certr.cert_chain_uri}
self._mock_post_get()
self.assertEqual(
self.certr, self.net.request_issuance(CSR, (self.authzr,)))
# TODO: check POST args
def test_request_issuance_missing_up(self):
self.response.content = CERT.as_der()
self.response.headers['Location'] = self.certr.uri
self._mock_post_get()
self.assertEqual(
self.certr.update(cert_chain_uri=None),
self.net.request_issuance(CSR, (self.authzr,)))
def test_request_issuance_missing_location(self):
self._mock_post_get()
self.assertRaises(
errors.NetworkError, self.net.request_issuance,
CSR, (self.authzr,))
@mock.patch('letsencrypt.network.datetime')
@mock.patch('letsencrypt.network.time')
def test_poll_and_request_issuance(self, time_mock, dt_mock):
# clock.dt | pylint: disable=no-member
clock = mock.MagicMock(dt=datetime.datetime(2015, 3, 27))
def sleep(seconds):
"""increment clock"""
clock.dt += datetime.timedelta(seconds=seconds)
time_mock.sleep.side_effect = sleep
def now():
"""return current clock value"""
return clock.dt
dt_mock.datetime.now.side_effect = now
dt_mock.timedelta = datetime.timedelta
def poll(authzr): # pylint: disable=missing-docstring
# record poll start time based on the current clock value
authzr.times.append(clock.dt)
# suppose it takes 2 seconds for server to produce the
# result, increment clock
clock.dt += datetime.timedelta(seconds=2)
if not authzr.retries: # no more retries
done = mock.MagicMock(uri=authzr.uri, times=authzr.times)
done.body.status = messages.STATUS_VALID
return done, []
# response (2nd result tuple element) is reduced to only
# Retry-After header contents represented as integer
# seconds; authzr.retries is a list of Retry-After
# headers, head(retries) is peeled of as a current
# Retry-After header, and tail(retries) is persisted for
# later poll() calls
return (mock.MagicMock(retries=authzr.retries[1:],
uri=authzr.uri + '.', times=authzr.times),
authzr.retries[0])
self.net.poll = mock.MagicMock(side_effect=poll)
mintime = 7
def retry_after(response, default): # pylint: disable=missing-docstring
# check that poll_and_request_issuance correctly passes mintime
self.assertEqual(default, mintime)
return clock.dt + datetime.timedelta(seconds=response)
self.net.retry_after = mock.MagicMock(side_effect=retry_after)
def request_issuance(csr, authzrs): # pylint: disable=missing-docstring
return csr, authzrs
self.net.request_issuance = mock.MagicMock(side_effect=request_issuance)
csr = mock.MagicMock()
authzrs = (
mock.MagicMock(uri='a', times=[], retries=(8, 20, 30)),
mock.MagicMock(uri='b', times=[], retries=(5,)),
)
cert, updated_authzrs = self.net.poll_and_request_issuance(
csr, authzrs, mintime=mintime)
self.assertTrue(cert[0] is csr)
self.assertTrue(cert[1] is updated_authzrs)
self.assertEqual(updated_authzrs[0].uri, 'a...')
self.assertEqual(updated_authzrs[1].uri, 'b.')
self.assertEqual(updated_authzrs[0].times, [
datetime.datetime(2015, 3, 27),
# a is scheduled for 10, but b is polling [9..11), so it
# will be picked up as soon as b is finished, without
# additional sleeping
datetime.datetime(2015, 3, 27, 0, 0, 11),
datetime.datetime(2015, 3, 27, 0, 0, 33),
datetime.datetime(2015, 3, 27, 0, 1, 5),
])
self.assertEqual(updated_authzrs[1].times, [
datetime.datetime(2015, 3, 27, 0, 0, 2),
datetime.datetime(2015, 3, 27, 0, 0, 9),
])
self.assertEqual(clock.dt, datetime.datetime(2015, 3, 27, 0, 1, 7))
def test_check_cert(self):
self.response.headers['Location'] = self.certr.uri
self.response.content = CERT2.as_der()
self._mock_post_get()
self.assertEqual(
self.certr.update(body=CERT2), self.net.check_cert(self.certr))
# TODO: split here and separate test
self.response.headers['Location'] = 'foo'
self.assertRaises(
errors.UnexpectedUpdate, self.net.check_cert, self.certr)
def test_check_cert_missing_location(self):
self.response.content = CERT2.as_der()
self._mock_post_get()
self.assertRaises(errors.NetworkError, self.net.check_cert, self.certr)
def test_refresh(self):
self.net.check_cert = mock.MagicMock()
self.assertEqual(
self.net.check_cert(self.certr), self.net.refresh(self.certr))
def test_fetch_chain(self):
# pylint: disable=protected-access
self.net._get_cert = mock.MagicMock()
self.net._get_cert.return_value = ("response", "certificate")
self.assertEqual(self.net._get_cert(self.certr.cert_chain_uri)[1],
self.net.fetch_chain(self.certr))
def test_fetch_chain_no_up_link(self):
self.assertTrue(self.net.fetch_chain(self.certr.update(
cert_chain_uri=None)) is None)
def test_revoke(self):
self._mock_post_get()
self.net.revoke(self.certr, when=messages.Revocation.NOW)
self.post.assert_called_once_with(self.certr.uri, mock.ANY)
def test_revoke_bad_status_raises_error(self):
self.response.status_code = httplib.METHOD_NOT_ALLOWED
self._mock_post_get()
self.assertRaises(errors.NetworkError, self.net.revoke, self.certr)
if __name__ == '__main__':
unittest.main() # pragma: no cover