acme: remove Client and BackwardsCompatibleClientV2 (#9356)

* acme: remove Client and BackwardsCompatibleClientV2

* remove ClientTestBase and some unused variables

* add ClientV2.get_directory

* tweak ToS callback code

* acme: update example to use ClientV2.get_directory

* simplify ToS callback further into one step

* further removal of acmev1-related code

- remove acme.client.ClientBase
- remove acme.mixins.VersionedLEACMEMixin
- remove acme.client.DER_CONTENT_TYPE
- remove various ACMEv1 special cases
- remove acme.messages.ChallengeResources.combinations

* remove .mixins.ResourceMixin, fields.resource, fields.Resource
and resource field from various .message classes.

* simplify acme.messages.Directory:

- remove Directory.register
- remove HasResourceType and GenericHasResourceType
- remove ability to look up Directory resources by anything other
  than the exact field name in RFC8555 (section 9.7.5)

* remove acme.messages.OLD_ERROR_PREFIX and support the old prefix

* remove acme.mixins

* reorder imports

* add comment to Directory about resource lookups

* s/new-cert/newOrder/

* get rid of `resource` sillyness in tests

* remove acmev1 terms-of-service support from directory
This commit is contained in:
alexzorin 2022-09-07 07:36:55 +10:00 committed by GitHub
parent d8e45c286d
commit 804ca32314
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
20 changed files with 452 additions and 1917 deletions

View file

@ -23,9 +23,6 @@ import requests
from acme import crypto_util
from acme import errors
from acme import fields
from acme.mixins import ResourceMixin
from acme.mixins import TypeMixin
logger = logging.getLogger(__name__)
@ -47,12 +44,17 @@ class Challenge(jose.TypedJSONObjectWithFields):
return UnrecognizedChallenge.from_json(jobj)
class ChallengeResponse(ResourceMixin, TypeMixin, jose.TypedJSONObjectWithFields):
class ChallengeResponse(jose.TypedJSONObjectWithFields):
# _fields_to_partial_json
"""ACME challenge response."""
TYPES: Dict[str, Type['ChallengeResponse']] = {}
resource_type = 'challenge'
resource: str = fields.resource(resource_type)
def to_partial_json(self) -> Dict[str, Any]:
# Removes the `type` field which is inserted by TypedJSONObjectWithFields.to_partial_json.
# This field breaks RFC8555 compliance.
jobj = super().to_partial_json()
jobj.pop(self.type_field_name, None)
return jobj
class UnrecognizedChallenge(Challenge):

File diff suppressed because it is too large Load diff

View file

@ -51,22 +51,6 @@ class RFC3339Field(jose.Field):
raise jose.DeserializationError(error)
class Resource(jose.Field):
"""Resource MITM field."""
def __init__(self, resource_type: str, *args: Any, **kwargs: Any) -> None:
self.resource_type = resource_type
kwargs['default'] = resource_type
super().__init__('resource', *args, **kwargs)
def decode(self, value: Any) -> Any:
if value != self.resource_type:
raise jose.DeserializationError(
'Wrong resource type: {0} instead of {1}'.format(
value, self.resource_type))
return value
def fixed(json_name: str, value: Any) -> Any:
"""Generates a type-friendly Fixed field."""
return Fixed(json_name, value)
@ -75,8 +59,3 @@ def fixed(json_name: str, value: Any) -> Any:
def rfc3339(json_name: str, omitempty: bool = False) -> Any:
"""Generates a type-friendly RFC3339 field."""
return RFC3339Field(json_name, omitempty=omitempty)
def resource(resource_type: str) -> Any:
"""Generates a type-friendly Resource field."""
return Resource(resource_type)

View file

@ -1,18 +0,0 @@
"""Simple shim around the typing module.
This was useful when this code supported Python 2 and typing wasn't always
available. This code is being kept for now for backwards compatibility.
"""
import warnings
from typing import * # pylint: disable=wildcard-import, unused-wildcard-import
from typing import Any
warnings.warn("acme.magic_typing is deprecated and will be removed in a future release.",
DeprecationWarning)
class TypingClass:
"""Ignore import errors by getting anything"""
def __getattr__(self, name: str) -> Any:
return None # pragma: no cover

View file

@ -11,9 +11,7 @@ from typing import MutableMapping
from typing import Optional
from typing import Tuple
from typing import Type
from typing import TYPE_CHECKING
from typing import TypeVar
from typing import Union
import josepy as jose
@ -22,14 +20,8 @@ from acme import errors
from acme import fields
from acme import jws
from acme import util
from acme.mixins import ResourceMixin
if TYPE_CHECKING:
from typing_extensions import Protocol # pragma: no cover
else:
Protocol = object
OLD_ERROR_PREFIX = "urn:acme:error:"
ERROR_PREFIX = "urn:ietf:params:acme:error:"
ERROR_CODES = {
@ -67,15 +59,13 @@ ERROR_CODES = {
ERROR_TYPE_DESCRIPTIONS = {**{
ERROR_PREFIX + name: desc for name, desc in ERROR_CODES.items()
}, **{ # add errors with old prefix, deprecate me
OLD_ERROR_PREFIX + name: desc for name, desc in ERROR_CODES.items()
}}
def is_acme_error(err: BaseException) -> bool:
"""Check if argument is an ACME error."""
if isinstance(err, Error) and (err.typ is not None):
return (ERROR_PREFIX in err.typ) or (OLD_ERROR_PREFIX in err.typ)
return ERROR_PREFIX in err.typ
return False
@ -223,25 +213,15 @@ STATUS_READY = Status('ready')
STATUS_DEACTIVATED = Status('deactivated')
class HasResourceType(Protocol):
"""
Represents a class with a resource_type class parameter of type string.
"""
resource_type: str = NotImplemented
GenericHasResourceType = TypeVar("GenericHasResourceType", bound=HasResourceType)
class Directory(jose.JSONDeSerializable):
"""Directory."""
"""Directory.
_REGISTERED_TYPES: Dict[str, Type[HasResourceType]] = {}
Directory resources must be accessed by the exact field name in RFC8555 (section 9.7.5).
"""
class Meta(jose.JSONObjectWithFields):
"""Directory Meta."""
_terms_of_service: str = jose.field('terms-of-service', omitempty=True)
_terms_of_service_v2: str = jose.field('termsOfService', omitempty=True)
_terms_of_service: str = jose.field('termsOfService', omitempty=True)
website: str = jose.field('website', omitempty=True)
caa_identities: List[str] = jose.field('caaIdentities', omitempty=True)
external_account_required: bool = jose.field('externalAccountRequired', omitempty=True)
@ -253,7 +233,7 @@ class Directory(jose.JSONDeSerializable):
@property
def terms_of_service(self) -> str:
"""URL for the CA TOS"""
return self._terms_of_service or self._terms_of_service_v2
return self._terms_of_service
def __iter__(self) -> Iterator[str]:
# When iterating over fields, use the external name 'terms_of_service' instead of
@ -264,41 +244,23 @@ class Directory(jose.JSONDeSerializable):
def _internal_name(self, name: str) -> str:
return '_' + name if name == 'terms_of_service' else name
@classmethod
def _canon_key(cls, key: Union[str, HasResourceType, Type[HasResourceType]]) -> str:
if isinstance(key, str):
return key
return key.resource_type
@classmethod
def register(cls,
resource_body_cls: Type[GenericHasResourceType]) -> Type[GenericHasResourceType]:
"""Register resource."""
resource_type = resource_body_cls.resource_type
assert resource_type not in cls._REGISTERED_TYPES
cls._REGISTERED_TYPES[resource_type] = resource_body_cls
return resource_body_cls
def __init__(self, jobj: Mapping[str, Any]) -> None:
canon_jobj = util.map_keys(jobj, self._canon_key)
# TODO: check that everything is an absolute URL; acme-spec is
# not clear on that
self._jobj = canon_jobj
self._jobj = jobj
def __getattr__(self, name: str) -> Any:
try:
return self[name.replace('_', '-')]
return self[name]
except KeyError as error:
raise AttributeError(str(error))
def __getitem__(self, name: Union[str, HasResourceType, Type[HasResourceType]]) -> Any:
def __getitem__(self, name: str) -> Any:
try:
return self._jobj[self._canon_key(name)]
return self._jobj[name]
except KeyError:
raise KeyError('Directory field "' + self._canon_key(name) + '" not found')
raise KeyError(f'Directory field "{name}" not found')
def to_partial_json(self) -> Dict[str, Any]:
return self._jobj
return util.map_keys(self._jobj, lambda k: k)
@classmethod
def from_json(cls, jobj: MutableMapping[str, Any]) -> 'Directory':
@ -459,17 +421,12 @@ class Registration(ResourceBody):
return self._filter_contact(self.email_prefix)
@Directory.register
class NewRegistration(ResourceMixin, Registration):
class NewRegistration(Registration):
"""New registration."""
resource_type = 'new-reg'
resource: str = fields.resource(resource_type)
class UpdateRegistration(ResourceMixin, Registration):
class UpdateRegistration(Registration):
"""Update registration."""
resource_type = 'reg'
resource: str = fields.resource(resource_type)
class RegistrationResource(ResourceWithURI):
@ -507,7 +464,6 @@ class ChallengeBody(ResourceBody):
# challenge object supports either one, but should be accessed through the
# name "uri". In Client.answer_challenge, whichever one is set will be
# used.
_uri: str = jose.field('uri', omitempty=True, default=None)
_url: str = jose.field('url', omitempty=True, default=None)
status: Status = jose.field('status', decoder=Status.from_json,
omitempty=True, default=STATUS_PENDING)
@ -536,7 +492,7 @@ class ChallengeBody(ResourceBody):
@property
def uri(self) -> str:
"""The URL of this challenge."""
return self._url or self._uri
return self._url
def __getattr__(self, name: str) -> Any:
return getattr(self.chall, name)
@ -545,10 +501,10 @@ class ChallengeBody(ResourceBody):
# When iterating over fields, use the external name 'uri' instead of
# the internal '_uri'.
for name in super().__iter__():
yield name[1:] if name == '_uri' else name
yield 'uri' if name == '_url' else name
def _internal_name(self, name: str) -> str:
return '_' + name if name == 'uri' else name
return '_url' if name == 'uri' else name
class ChallengeResource(Resource):
@ -572,15 +528,12 @@ class Authorization(ResourceBody):
:ivar acme.messages.Identifier identifier:
:ivar list challenges: `list` of `.ChallengeBody`
:ivar tuple combinations: Challenge combinations (`tuple` of `tuple`
of `int`, as opposed to `list` of `list` from the spec).
:ivar acme.messages.Status status:
:ivar datetime.datetime expires:
"""
identifier: Identifier = jose.field('identifier', decoder=Identifier.from_json, omitempty=True)
challenges: List[ChallengeBody] = jose.field('challenges', omitempty=True)
combinations: Tuple[Tuple[int, ...], ...] = jose.field('combinations', omitempty=True)
status: Status = jose.field('status', omitempty=True, decoder=Status.from_json)
# TODO: 'expires' is allowed for Authorization Resources in
@ -596,24 +549,13 @@ class Authorization(ResourceBody):
def challenges(value: List[Dict[str, Any]]) -> Tuple[ChallengeBody, ...]: # type: ignore[misc] # pylint: disable=no-self-argument,missing-function-docstring
return tuple(ChallengeBody.from_json(chall) for chall in value)
@property
def resolved_combinations(self) -> Tuple[Tuple[ChallengeBody, ...], ...]:
"""Combinations with challenges instead of indices."""
return tuple(tuple(self.challenges[idx] for idx in combo)
for combo in self.combinations) # pylint: disable=not-an-iterable
@Directory.register
class NewAuthorization(ResourceMixin, Authorization):
class NewAuthorization(Authorization):
"""New authorization."""
resource_type = 'new-authz'
resource: str = fields.resource(resource_type)
class UpdateAuthorization(ResourceMixin, Authorization):
class UpdateAuthorization(Authorization):
"""Update authorization."""
resource_type = 'authz'
resource: str = fields.resource(resource_type)
class AuthorizationResource(ResourceWithURI):
@ -627,16 +569,13 @@ class AuthorizationResource(ResourceWithURI):
new_cert_uri: str = jose.field('new_cert_uri', omitempty=True)
@Directory.register
class CertificateRequest(ResourceMixin, jose.JSONObjectWithFields):
"""ACME new-cert request.
class CertificateRequest(jose.JSONObjectWithFields):
"""ACME newOrder request.
:ivar jose.ComparableX509 csr:
`OpenSSL.crypto.X509Req` wrapped in `.ComparableX509`
"""
resource_type = 'new-cert'
resource: str = fields.resource(resource_type)
csr: jose.ComparableX509 = jose.field('csr', decoder=jose.decode_csr, encoder=jose.encode_csr)
@ -653,16 +592,13 @@ class CertificateResource(ResourceWithURI):
authzrs: Tuple[AuthorizationResource, ...] = jose.field('authzrs')
@Directory.register
class Revocation(ResourceMixin, jose.JSONObjectWithFields):
class Revocation(jose.JSONObjectWithFields):
"""Revocation message.
:ivar jose.ComparableX509 certificate: `OpenSSL.crypto.X509` wrapped in
`jose.ComparableX509`
"""
resource_type = 'revoke-cert'
resource: str = fields.resource(resource_type)
certificate: jose.ComparableX509 = jose.field(
'certificate', decoder=jose.decode_cert, encoder=jose.encode_cert)
reason: int = jose.field('reason')
@ -719,7 +655,5 @@ class OrderResource(ResourceWithURI):
omitempty=True)
@Directory.register
class NewOrder(Order):
"""New order."""
resource_type = 'new-order'

View file

@ -1,68 +0,0 @@
"""Useful mixins for Challenge and Resource objects"""
from typing import Any
from typing import Dict
class VersionedLEACMEMixin:
"""This mixin stores the version of Let's Encrypt's endpoint being used."""
@property
def le_acme_version(self) -> int:
"""Define the version of ACME protocol to use"""
return getattr(self, '_le_acme_version', 1)
@le_acme_version.setter
def le_acme_version(self, version: int) -> None:
# We need to use object.__setattr__ to not depend on the specific implementation of
# __setattr__ in current class (eg. jose.TypedJSONObjectWithFields raises AttributeError
# for any attempt to set an attribute to make objects immutable).
object.__setattr__(self, '_le_acme_version', version)
def __setattr__(self, key: str, value: Any) -> None:
if key == 'le_acme_version':
# Required for @property to operate properly. See comment above.
object.__setattr__(self, key, value)
else:
super().__setattr__(key, value) # pragma: no cover
class ResourceMixin(VersionedLEACMEMixin):
"""
This mixin generates a RFC8555 compliant JWS payload
by removing the `resource` field if needed (eg. ACME v2 protocol).
"""
def to_partial_json(self) -> Dict[str, Any]:
"""See josepy.JSONDeserializable.to_partial_json()"""
return _safe_jobj_compliance(super(),
'to_partial_json', 'resource')
def fields_to_partial_json(self) -> Dict[str, Any]:
"""See josepy.JSONObjectWithFields.fields_to_partial_json()"""
return _safe_jobj_compliance(super(),
'fields_to_partial_json', 'resource')
class TypeMixin(VersionedLEACMEMixin):
"""
This mixin allows generation of a RFC8555 compliant JWS payload
by removing the `type` field if needed (eg. ACME v2 protocol).
"""
def to_partial_json(self) -> Dict[str, Any]:
"""See josepy.JSONDeserializable.to_partial_json()"""
return _safe_jobj_compliance(super(),
'to_partial_json', 'type')
def fields_to_partial_json(self) -> Dict[str, Any]:
"""See josepy.JSONObjectWithFields.fields_to_partial_json()"""
return _safe_jobj_compliance(super(),
'fields_to_partial_json', 'type')
def _safe_jobj_compliance(instance: Any, jobj_method: str,
uncompliant_field: str) -> Dict[str, Any]:
if hasattr(instance, jobj_method):
jobj: Dict[str, Any] = getattr(instance, jobj_method)()
if instance.le_acme_version == 2:
jobj.pop(uncompliant_field, None)
return jobj
raise AttributeError(f'Method {jobj_method}() is not implemented.') # pragma: no cover

View file

@ -163,7 +163,7 @@ def example_http():
# Register account and accept TOS
net = client.ClientNetwork(acc_key, user_agent=USER_AGENT)
directory = messages.Directory.from_json(net.get(DIRECTORY_URL).json())
directory = client.ClientV2.get_directory(DIRECTORY_URL, net)
client_acme = client.ClientV2(directory, net=net)
# Terms of Service URL is in client_acme.directory.meta.terms_of_service
@ -215,8 +215,7 @@ def example_http():
try:
regr = client_acme.query_registration(regr)
except errors.Error as err:
if err.typ == messages.OLD_ERROR_PREFIX + 'unauthorized' \
or err.typ == messages.ERROR_PREFIX + 'unauthorized':
if err.typ == messages.ERROR_PREFIX + 'unauthorized':
# Status is deactivated.
pass
raise

View file

@ -92,8 +92,7 @@ class DNS01ResponseTest(unittest.TestCase):
self.response = self.chall.response(KEY)
def test_to_partial_json(self):
self.assertEqual({k: v for k, v in self.jmsg.items() if k != 'keyAuthorization'},
self.msg.to_partial_json())
self.assertEqual({}, self.msg.to_partial_json())
def test_from_json(self):
from acme.challenges import DNS01Response
@ -163,8 +162,7 @@ class HTTP01ResponseTest(unittest.TestCase):
self.response = self.chall.response(KEY)
def test_to_partial_json(self):
self.assertEqual({k: v for k, v in self.jmsg.items() if k != 'keyAuthorization'},
self.msg.to_partial_json())
self.assertEqual({}, self.msg.to_partial_json())
def test_from_json(self):
from acme.challenges import HTTP01Response
@ -274,8 +272,7 @@ class TLSALPN01ResponseTest(unittest.TestCase):
}
def test_to_partial_json(self):
self.assertEqual({k: v for k, v in self.jmsg.items() if k != 'keyAuthorization'},
self.response.to_partial_json())
self.assertEqual({}, self.response.to_partial_json())
def test_from_json(self):
from acme.challenges import TLSALPN01Response
@ -461,8 +458,6 @@ class DNSResponseTest(unittest.TestCase):
from acme.challenges import DNSResponse
self.msg = DNSResponse(validation=self.validation)
self.jmsg_to = {
'resource': 'challenge',
'type': 'dns',
'validation': self.validation,
}
self.jmsg_from = {
@ -492,7 +487,6 @@ class JWSPayloadRFC8555Compliant(unittest.TestCase):
from acme.challenges import HTTP01Response
challenge_body = HTTP01Response()
challenge_body.le_acme_version = 2
jobj = challenge_body.json_dumps(indent=2).encode()
# RFC8555 states that challenge responses must have an empty payload.

View file

@ -3,52 +3,37 @@
import copy
import datetime
import http.client as http_client
import ipaddress
import json
import unittest
from typing import Dict
from unittest import mock
import josepy as jose
import OpenSSL
import requests
from acme import challenges
from acme import errors
from acme import jws as acme_jws
from acme import messages
from acme.mixins import VersionedLEACMEMixin
from acme.client import ClientV2
import messages_test
import test_util
CERT_DER = test_util.load_vector('cert.der')
CERT_SAN_PEM = test_util.load_vector('cert-san.pem')
CSR_SAN_PEM = test_util.load_vector('csr-san.pem')
CSR_MIXED_PEM = test_util.load_vector('csr-mixed.pem')
KEY = jose.JWKRSA.load(test_util.load_vector('rsa512_key.pem'))
KEY2 = jose.JWKRSA.load(test_util.load_vector('rsa256_key.pem'))
DIRECTORY_V1 = messages.Directory({
messages.NewRegistration:
'https://www.letsencrypt-demo.org/acme/new-reg',
messages.Revocation:
'https://www.letsencrypt-demo.org/acme/revoke-cert',
messages.NewAuthorization:
'https://www.letsencrypt-demo.org/acme/new-authz',
messages.CertificateRequest:
'https://www.letsencrypt-demo.org/acme/new-cert',
})
DIRECTORY_V2 = messages.Directory({
'newAccount': 'https://www.letsencrypt-demo.org/acme/new-account',
'newNonce': 'https://www.letsencrypt-demo.org/acme/new-nonce',
'newOrder': 'https://www.letsencrypt-demo.org/acme/new-order',
'revokeCert': 'https://www.letsencrypt-demo.org/acme/revoke-cert',
'meta': messages.Directory.Meta(),
})
class ClientTestBase(unittest.TestCase):
"""Base for tests in acme.client."""
class ClientV2Test(unittest.TestCase):
"""Tests for acme.client.ClientV2."""
def setUp(self):
self.response = mock.MagicMock(
@ -80,650 +65,15 @@ class ClientTestBase(unittest.TestCase):
self.authz = messages.Authorization(
identifier=messages.Identifier(
typ=messages.IDENTIFIER_FQDN, value='example.com'),
challenges=(challb,), combinations=None)
challenges=(challb,))
self.authzr = messages.AuthorizationResource(
body=self.authz, uri=authzr_uri)
# Reason code for revocation
self.rsn = 1
class BackwardsCompatibleClientV2Test(ClientTestBase):
"""Tests for acme.client.BackwardsCompatibleClientV2."""
def setUp(self):
super().setUp()
# contains a loaded cert
self.certr = messages.CertificateResource(
body=messages_test.CERT)
loaded = OpenSSL.crypto.load_certificate(
OpenSSL.crypto.FILETYPE_PEM, CERT_SAN_PEM)
wrapped = jose.ComparableX509(loaded)
self.chain = [wrapped, wrapped]
self.cert_pem = OpenSSL.crypto.dump_certificate(
OpenSSL.crypto.FILETYPE_PEM, messages_test.CERT.wrapped).decode()
single_chain = OpenSSL.crypto.dump_certificate(
OpenSSL.crypto.FILETYPE_PEM, loaded).decode()
self.chain_pem = single_chain + single_chain
self.fullchain_pem = self.cert_pem + self.chain_pem
self.orderr = messages.OrderResource(
csr_pem=CSR_SAN_PEM)
def _init(self):
uri = 'http://www.letsencrypt-demo.org/directory'
from acme.client import BackwardsCompatibleClientV2
return BackwardsCompatibleClientV2(net=self.net,
key=KEY, server=uri)
def test_init_downloads_directory(self):
uri = 'http://www.letsencrypt-demo.org/directory'
from acme.client import BackwardsCompatibleClientV2
BackwardsCompatibleClientV2(net=self.net,
key=KEY, server=uri)
self.net.get.assert_called_once_with(uri)
def test_init_acme_version(self):
self.response.json.return_value = DIRECTORY_V1.to_json()
client = self._init()
self.assertEqual(client.acme_version, 1)
self.response.json.return_value = DIRECTORY_V2.to_json()
client = self._init()
self.assertEqual(client.acme_version, 2)
def test_query_registration_client_v2(self):
self.response.json.return_value = DIRECTORY_V2.to_json()
client = self._init()
self.response.json.return_value = self.regr.body.to_json()
self.response.headers = {'Location': 'https://www.letsencrypt-demo.org/acme/reg/1'}
self.assertEqual(self.regr, client.query_registration(self.regr))
def test_forwarding(self):
self.response.json.return_value = DIRECTORY_V1.to_json()
client = self._init()
self.assertEqual(client.directory, client.client.directory)
self.assertEqual(client.key, KEY)
self.assertEqual(client.deactivate_registration, client.client.deactivate_registration)
self.assertRaises(AttributeError, client.__getattr__, 'nonexistent')
self.assertRaises(AttributeError, client.__getattr__, 'new_account_and_tos')
self.assertRaises(AttributeError, client.__getattr__, 'new_account')
def test_new_account_and_tos(self):
# v2 no tos
self.response.json.return_value = DIRECTORY_V2.to_json()
with mock.patch('acme.client.ClientV2') as mock_client:
client = self._init()
client.new_account_and_tos(self.new_reg)
mock_client().new_account.assert_called_with(self.new_reg)
# v2 tos good
with mock.patch('acme.client.ClientV2') as mock_client:
mock_client().directory.meta.__contains__.return_value = True
client = self._init()
client.new_account_and_tos(self.new_reg, lambda x: True)
mock_client().new_account.assert_called_with(
self.new_reg.update(terms_of_service_agreed=True))
# v2 tos bad
with mock.patch('acme.client.ClientV2') as mock_client:
mock_client().directory.meta.__contains__.return_value = True
client = self._init()
def _tos_cb(tos):
raise errors.Error
self.assertRaises(errors.Error, client.new_account_and_tos,
self.new_reg, _tos_cb)
mock_client().new_account.assert_not_called()
# v1 yes tos
self.response.json.return_value = DIRECTORY_V1.to_json()
with mock.patch('acme.client.Client') as mock_client:
regr = mock.MagicMock(terms_of_service="TOS")
mock_client().register.return_value = regr
client = self._init()
client.new_account_and_tos(self.new_reg)
mock_client().register.assert_called_once_with(self.new_reg)
mock_client().agree_to_tos.assert_called_once_with(regr)
# v1 no tos
with mock.patch('acme.client.Client') as mock_client:
regr = mock.MagicMock(terms_of_service=None)
mock_client().register.return_value = regr
client = self._init()
client.new_account_and_tos(self.new_reg)
mock_client().register.assert_called_once_with(self.new_reg)
mock_client().agree_to_tos.assert_not_called()
@mock.patch('OpenSSL.crypto.load_certificate_request')
@mock.patch('acme.crypto_util._pyopenssl_cert_or_req_all_names')
def test_new_order_v1(self, mock__pyopenssl_cert_or_req_all_names,
unused_mock_load_certificate_request):
self.response.json.return_value = DIRECTORY_V1.to_json()
mock__pyopenssl_cert_or_req_all_names.return_value = ['example.com', 'www.example.com']
mock_csr_pem = mock.MagicMock()
with mock.patch('acme.client.Client') as mock_client:
mock_client().request_domain_challenges.return_value = mock.sentinel.auth
client = self._init()
orderr = client.new_order(mock_csr_pem)
self.assertEqual(orderr.authorizations, [mock.sentinel.auth, mock.sentinel.auth])
def test_new_order_v2(self):
self.response.json.return_value = DIRECTORY_V2.to_json()
mock_csr_pem = mock.MagicMock()
with mock.patch('acme.client.ClientV2') as mock_client:
client = self._init()
client.new_order(mock_csr_pem)
mock_client().new_order.assert_called_once_with(mock_csr_pem)
@mock.patch('acme.client.Client')
def test_finalize_order_v1_success(self, mock_client):
self.response.json.return_value = DIRECTORY_V1.to_json()
mock_client().request_issuance.return_value = self.certr
mock_client().fetch_chain.return_value = self.chain
deadline = datetime.datetime(9999, 9, 9)
client = self._init()
result = client.finalize_order(self.orderr, deadline)
self.assertEqual(result.fullchain_pem, self.fullchain_pem)
mock_client().fetch_chain.assert_called_once_with(self.certr)
@mock.patch('acme.client.Client')
def test_finalize_order_v1_fetch_chain_error(self, mock_client):
self.response.json.return_value = DIRECTORY_V1.to_json()
mock_client().request_issuance.return_value = self.certr
mock_client().fetch_chain.return_value = self.chain
mock_client().fetch_chain.side_effect = [errors.Error, self.chain]
deadline = datetime.datetime(9999, 9, 9)
client = self._init()
result = client.finalize_order(self.orderr, deadline)
self.assertEqual(result.fullchain_pem, self.fullchain_pem)
self.assertEqual(mock_client().fetch_chain.call_count, 2)
@mock.patch('acme.client.Client')
def test_finalize_order_v1_timeout(self, mock_client):
self.response.json.return_value = DIRECTORY_V1.to_json()
mock_client().request_issuance.return_value = self.certr
deadline = deadline = datetime.datetime.now() - datetime.timedelta(seconds=60)
client = self._init()
self.assertRaises(errors.TimeoutError, client.finalize_order,
self.orderr, deadline)
def test_finalize_order_v2(self):
self.response.json.return_value = DIRECTORY_V2.to_json()
mock_orderr = mock.MagicMock()
mock_deadline = mock.MagicMock()
with mock.patch('acme.client.ClientV2') as mock_client:
client = self._init()
client.finalize_order(mock_orderr, mock_deadline)
mock_client().finalize_order.assert_called_once_with(mock_orderr, mock_deadline, False)
def test_revoke(self):
self.response.json.return_value = DIRECTORY_V1.to_json()
with mock.patch('acme.client.Client') as mock_client:
client = self._init()
client.revoke(messages_test.CERT, self.rsn)
mock_client().revoke.assert_called_once_with(messages_test.CERT, self.rsn)
self.response.json.return_value = DIRECTORY_V2.to_json()
with mock.patch('acme.client.ClientV2') as mock_client:
client = self._init()
client.revoke(messages_test.CERT, self.rsn)
mock_client().revoke.assert_called_once_with(messages_test.CERT, self.rsn)
def test_update_registration(self):
self.response.json.return_value = DIRECTORY_V1.to_json()
with mock.patch('acme.client.Client') as mock_client:
client = self._init()
client.update_registration(mock.sentinel.regr, None)
mock_client().update_registration.assert_called_once_with(mock.sentinel.regr, None)
# newNonce present means it will pick acme_version 2
def test_external_account_required_true(self):
self.response.json.return_value = messages.Directory({
'newNonce': 'http://letsencrypt-test.com/acme/new-nonce',
'meta': messages.Directory.Meta(external_account_required=True),
}).to_json()
client = self._init()
self.assertTrue(client.external_account_required())
# newNonce present means it will pick acme_version 2
def test_external_account_required_false(self):
self.response.json.return_value = messages.Directory({
'newNonce': 'http://letsencrypt-test.com/acme/new-nonce',
'meta': messages.Directory.Meta(external_account_required=False),
}).to_json()
client = self._init()
self.assertFalse(client.external_account_required())
def test_external_account_required_false_v1(self):
self.response.json.return_value = messages.Directory({
'meta': messages.Directory.Meta(external_account_required=False),
}).to_json()
client = self._init()
self.assertFalse(client.external_account_required())
class ClientTest(ClientTestBase):
"""Tests for acme.client.Client."""
def setUp(self):
super().setUp()
self.directory = DIRECTORY_V1
# Registration
self.regr = self.regr.update(
terms_of_service='https://www.letsencrypt-demo.org/tos')
# Request issuance
self.certr = messages.CertificateResource(
body=messages_test.CERT, authzrs=(self.authzr,),
uri='https://www.letsencrypt-demo.org/acme/cert/1',
cert_chain_uri='https://www.letsencrypt-demo.org/ca')
from acme.client import Client
self.client = Client(
directory=self.directory, key=KEY, alg=jose.RS256, net=self.net)
def test_init_downloads_directory(self):
uri = 'http://www.letsencrypt-demo.org/directory'
from acme.client import Client
self.client = Client(
directory=uri, key=KEY, alg=jose.RS256, net=self.net)
self.net.get.assert_called_once_with(uri)
@mock.patch('acme.client.ClientNetwork')
def test_init_without_net(self, mock_net):
mock_net.return_value = mock.sentinel.net
alg = jose.RS256
from acme.client import Client
self.client = Client(
directory=self.directory, key=KEY, alg=alg)
mock_net.called_once_with(KEY, alg=alg, verify_ssl=True)
self.assertEqual(self.client.net, mock.sentinel.net)
def test_register(self):
# "Instance of 'Field' has no to_json/update member" bug:
self.response.status_code = http_client.CREATED
self.response.json.return_value = self.regr.body.to_json()
self.response.headers['Location'] = self.regr.uri
self.response.links.update({
'terms-of-service': {'url': self.regr.terms_of_service},
})
self.assertEqual(self.regr, self.client.register(self.new_reg))
# TODO: test POST call arguments
def test_update_registration(self):
# "Instance of 'Field' has no to_json/update member" bug:
self.response.headers['Location'] = self.regr.uri
self.response.json.return_value = self.regr.body.to_json()
self.assertEqual(self.regr, self.client.update_registration(self.regr))
# TODO: test POST call arguments
# TODO: split here and separate test
self.response.json.return_value = self.regr.body.update(
contact=()).to_json()
def test_deactivate_account(self):
self.response.headers['Location'] = self.regr.uri
self.response.json.return_value = self.regr.body.to_json()
self.assertEqual(self.regr,
self.client.deactivate_registration(self.regr))
def test_query_registration(self):
self.response.json.return_value = self.regr.body.to_json()
self.assertEqual(self.regr, self.client.query_registration(self.regr))
def test_agree_to_tos(self):
self.client.update_registration = mock.Mock()
self.client.agree_to_tos(self.regr)
regr = self.client.update_registration.call_args[0][0]
self.assertEqual(self.regr.terms_of_service, regr.body.agreement)
def _prepare_response_for_request_challenges(self):
self.response.status_code = http_client.CREATED
self.response.headers['Location'] = self.authzr.uri
self.response.json.return_value = self.authz.to_json()
def test_request_challenges(self):
self._prepare_response_for_request_challenges()
self.client.request_challenges(self.identifier)
self.net.post.assert_called_once_with(
self.directory.new_authz,
messages.NewAuthorization(identifier=self.identifier),
acme_version=1)
def test_request_challenges_deprecated_arg(self):
self._prepare_response_for_request_challenges()
self.client.request_challenges(self.identifier, new_authzr_uri="hi")
self.net.post.assert_called_once_with(
self.directory.new_authz,
messages.NewAuthorization(identifier=self.identifier),
acme_version=1)
def test_request_challenges_custom_uri(self):
self._prepare_response_for_request_challenges()
self.client.request_challenges(self.identifier)
self.net.post.assert_called_once_with(
'https://www.letsencrypt-demo.org/acme/new-authz', mock.ANY,
acme_version=1)
def test_request_challenges_unexpected_update(self):
self._prepare_response_for_request_challenges()
self.response.json.return_value = self.authz.update(
identifier=self.identifier.update(value='foo')).to_json()
self.assertRaises(
errors.UnexpectedUpdate, self.client.request_challenges,
self.identifier)
def test_request_challenges_wildcard(self):
wildcard_identifier = messages.Identifier(
typ=messages.IDENTIFIER_FQDN, value='*.example.org')
self.assertRaises(
errors.WildcardUnsupportedError, self.client.request_challenges,
wildcard_identifier)
def test_request_domain_challenges(self):
self.client.request_challenges = mock.MagicMock()
self.assertEqual(
self.client.request_challenges(self.identifier),
self.client.request_domain_challenges('example.com'))
def test_answer_challenge(self):
self.response.links['up'] = {'url': self.challr.authzr_uri}
self.response.json.return_value = self.challr.body.to_json()
chall_response = challenges.DNSResponse(validation=None)
self.client.answer_challenge(self.challr.body, chall_response)
# TODO: split here and separate test
self.assertRaises(errors.UnexpectedUpdate, self.client.answer_challenge,
self.challr.body.update(uri='foo'), chall_response)
def test_answer_challenge_missing_next(self):
self.assertRaises(
errors.ClientError, self.client.answer_challenge,
self.challr.body, challenges.DNSResponse(validation=None))
def test_retry_after_date(self):
self.response.headers['Retry-After'] = 'Fri, 31 Dec 1999 23:59:59 GMT'
self.assertEqual(
datetime.datetime(1999, 12, 31, 23, 59, 59),
self.client.retry_after(response=self.response, default=10))
@mock.patch('acme.client.datetime')
def test_retry_after_invalid(self, dt_mock):
dt_mock.datetime.now.return_value = datetime.datetime(2015, 3, 27)
dt_mock.timedelta = datetime.timedelta
self.response.headers['Retry-After'] = 'foooo'
self.assertEqual(
datetime.datetime(2015, 3, 27, 0, 0, 10),
self.client.retry_after(response=self.response, default=10))
@mock.patch('acme.client.datetime')
def test_retry_after_overflow(self, dt_mock):
dt_mock.datetime.now.return_value = datetime.datetime(2015, 3, 27)
dt_mock.timedelta = datetime.timedelta
dt_mock.datetime.side_effect = datetime.datetime
self.response.headers['Retry-After'] = "Tue, 116 Feb 2016 11:50:00 MST"
self.assertEqual(
datetime.datetime(2015, 3, 27, 0, 0, 10),
self.client.retry_after(response=self.response, default=10))
@mock.patch('acme.client.datetime')
def test_retry_after_seconds(self, dt_mock):
dt_mock.datetime.now.return_value = datetime.datetime(2015, 3, 27)
dt_mock.timedelta = datetime.timedelta
self.response.headers['Retry-After'] = '50'
self.assertEqual(
datetime.datetime(2015, 3, 27, 0, 0, 50),
self.client.retry_after(response=self.response, default=10))
@mock.patch('acme.client.datetime')
def test_retry_after_missing(self, dt_mock):
dt_mock.datetime.now.return_value = datetime.datetime(2015, 3, 27)
dt_mock.timedelta = datetime.timedelta
self.assertEqual(
datetime.datetime(2015, 3, 27, 0, 0, 10),
self.client.retry_after(response=self.response, default=10))
def test_poll(self):
self.response.json.return_value = self.authzr.body.to_json()
self.assertEqual((self.authzr, self.response),
self.client.poll(self.authzr))
# TODO: split here and separate test
self.response.json.return_value = self.authz.update(
identifier=self.identifier.update(value='foo')).to_json()
self.assertRaises(
errors.UnexpectedUpdate, self.client.poll, self.authzr)
def test_request_issuance(self):
self.response.content = CERT_DER
self.response.headers['Location'] = self.certr.uri
self.response.links['up'] = {'url': self.certr.cert_chain_uri}
self.assertEqual(self.certr, self.client.request_issuance(
messages_test.CSR, (self.authzr,)))
# TODO: check POST args
def test_request_issuance_missing_up(self):
self.response.content = CERT_DER
self.response.headers['Location'] = self.certr.uri
self.assertEqual(
self.certr.update(cert_chain_uri=None),
self.client.request_issuance(messages_test.CSR, (self.authzr,)))
def test_request_issuance_missing_location(self):
self.assertRaises(
errors.ClientError, self.client.request_issuance,
messages_test.CSR, (self.authzr,))
@mock.patch('acme.client.datetime')
@mock.patch('acme.client.time')
def test_poll_and_request_issuance(self, time_mock, dt_mock):
# clock.dt | pylint: disable=no-member
clock = mock.MagicMock(dt=datetime.datetime(2015, 3, 27))
def sleep(seconds):
"""increment clock"""
clock.dt += datetime.timedelta(seconds=seconds)
time_mock.sleep.side_effect = sleep
def now():
"""return current clock value"""
return clock.dt
dt_mock.datetime.now.side_effect = now
dt_mock.timedelta = datetime.timedelta
def poll(authzr): # pylint: disable=missing-docstring
# record poll start time based on the current clock value
authzr.times.append(clock.dt)
# suppose it takes 2 seconds for server to produce the
# result, increment clock
clock.dt += datetime.timedelta(seconds=2)
if len(authzr.retries) == 1: # no more retries
done = mock.MagicMock(uri=authzr.uri, times=authzr.times)
done.body.status = authzr.retries[0]
return done, []
# response (2nd result tuple element) is reduced to only
# Retry-After header contents represented as integer
# seconds; authzr.retries is a list of Retry-After
# headers, head(retries) is peeled of as a current
# Retry-After header, and tail(retries) is persisted for
# later poll() calls
return (mock.MagicMock(retries=authzr.retries[1:],
uri=authzr.uri + '.', times=authzr.times),
authzr.retries[0])
self.client.poll = mock.MagicMock(side_effect=poll)
mintime = 7
def retry_after(response, default):
# pylint: disable=missing-docstring
# check that poll_and_request_issuance correctly passes mintime
self.assertEqual(default, mintime)
return clock.dt + datetime.timedelta(seconds=response)
self.client.retry_after = mock.MagicMock(side_effect=retry_after)
def request_issuance(csr, authzrs): # pylint: disable=missing-docstring
return csr, authzrs
self.client.request_issuance = mock.MagicMock(
side_effect=request_issuance)
csr = mock.MagicMock()
authzrs = (
mock.MagicMock(uri='a', times=[], retries=(
8, 20, 30, messages.STATUS_VALID)),
mock.MagicMock(uri='b', times=[], retries=(
5, messages.STATUS_VALID)),
)
cert, updated_authzrs = self.client.poll_and_request_issuance(
csr, authzrs, mintime=mintime,
# make sure that max_attempts is per-authorization, rather
# than global
max_attempts=max(len(authzrs[0].retries), len(authzrs[1].retries)))
self.assertIs(cert[0], csr)
self.assertIs(cert[1], updated_authzrs)
self.assertEqual(updated_authzrs[0].uri, 'a...')
self.assertEqual(updated_authzrs[1].uri, 'b.')
self.assertEqual(updated_authzrs[0].times, [
datetime.datetime(2015, 3, 27),
# a is scheduled for 10, but b is polling [9..11), so it
# will be picked up as soon as b is finished, without
# additional sleeping
datetime.datetime(2015, 3, 27, 0, 0, 11),
datetime.datetime(2015, 3, 27, 0, 0, 33),
datetime.datetime(2015, 3, 27, 0, 1, 5),
])
self.assertEqual(updated_authzrs[1].times, [
datetime.datetime(2015, 3, 27, 0, 0, 2),
datetime.datetime(2015, 3, 27, 0, 0, 9),
])
self.assertEqual(clock.dt, datetime.datetime(2015, 3, 27, 0, 1, 7))
# CA sets invalid | TODO: move to a separate test
invalid_authzr = mock.MagicMock(
times=[], retries=[messages.STATUS_INVALID])
self.assertRaises(
errors.PollError, self.client.poll_and_request_issuance,
csr, authzrs=(invalid_authzr,), mintime=mintime)
# exceeded max_attempts | TODO: move to a separate test
self.assertRaises(
errors.PollError, self.client.poll_and_request_issuance,
csr, authzrs, mintime=mintime, max_attempts=2)
def test_deactivate_authorization(self):
authzb = self.authzr.body.update(status=messages.STATUS_DEACTIVATED)
self.response.json.return_value = authzb.to_json()
authzr = self.client.deactivate_authorization(self.authzr)
self.assertEqual(authzb, authzr.body)
self.assertEqual(self.client.net.post.call_count, 1)
self.assertIn(self.authzr.uri, self.net.post.call_args_list[0][0])
def test_check_cert(self):
self.response.headers['Location'] = self.certr.uri
self.response.content = CERT_DER
self.assertEqual(self.certr.update(body=messages_test.CERT),
self.client.check_cert(self.certr))
# TODO: split here and separate test
self.response.headers['Location'] = 'foo'
self.assertRaises(
errors.UnexpectedUpdate, self.client.check_cert, self.certr)
def test_check_cert_missing_location(self):
self.response.content = CERT_DER
self.assertRaises(
errors.ClientError, self.client.check_cert, self.certr)
def test_refresh(self):
self.client.check_cert = mock.MagicMock()
self.assertEqual(
self.client.check_cert(self.certr), self.client.refresh(self.certr))
def test_fetch_chain_no_up_link(self):
self.assertEqual([], self.client.fetch_chain(self.certr.update(
cert_chain_uri=None)))
def test_fetch_chain_single(self):
# pylint: disable=protected-access
self.client._get_cert = mock.MagicMock()
self.client._get_cert.return_value = (
mock.MagicMock(links={}), "certificate")
self.assertEqual([self.client._get_cert(self.certr.cert_chain_uri)[1]],
self.client.fetch_chain(self.certr))
def test_fetch_chain_max(self):
# pylint: disable=protected-access
up_response = mock.MagicMock(links={'up': {'url': 'http://cert'}})
noup_response = mock.MagicMock(links={})
self.client._get_cert = mock.MagicMock()
self.client._get_cert.side_effect = [
(up_response, "cert")] * 9 + [(noup_response, "last_cert")]
chain = self.client.fetch_chain(self.certr, max_length=10)
self.assertEqual(chain, ["cert"] * 9 + ["last_cert"])
def test_fetch_chain_too_many(self): # recursive
# pylint: disable=protected-access
response = mock.MagicMock(links={'up': {'url': 'http://cert'}})
self.client._get_cert = mock.MagicMock()
self.client._get_cert.return_value = (response, "certificate")
self.assertRaises(errors.Error, self.client.fetch_chain, self.certr)
def test_revoke(self):
self.client.revoke(self.certr.body, self.rsn)
self.net.post.assert_called_once_with(
self.directory[messages.Revocation], mock.ANY, acme_version=1)
def test_revocation_payload(self):
obj = messages.Revocation(certificate=self.certr.body, reason=self.rsn)
self.assertIn('reason', obj.to_partial_json().keys())
self.assertEqual(self.rsn, obj.to_partial_json()['reason'])
def test_revoke_bad_status_raises_error(self):
self.response.status_code = http_client.METHOD_NOT_ALLOWED
self.assertRaises(
errors.ClientError,
self.client.revoke,
self.certr,
self.rsn)
class ClientV2Test(ClientTestBase):
"""Tests for acme.client.ClientV2."""
def setUp(self):
super().setUp()
self.directory = DIRECTORY_V2
from acme.client import ClientV2
self.client = ClientV2(self.directory, self.net)
self.new_reg = self.new_reg.update(terms_of_service_agreed=True)
@ -752,11 +102,40 @@ class ClientV2Test(ClientTestBase):
self.assertEqual(self.regr, self.client.new_account(self.new_reg))
def test_new_account_tos_link(self):
self.response.status_code = http_client.CREATED
self.response.json.return_value = self.regr.body.to_json()
self.response.headers['Location'] = self.regr.uri
self.response.links.update({
'terms-of-service': {'url': 'https://www.letsencrypt-demo.org/tos'},
})
self.assertEqual(self.client.new_account(self.new_reg).terms_of_service,
'https://www.letsencrypt-demo.org/tos')
def test_new_account_conflict(self):
self.response.status_code = http_client.OK
self.response.headers['Location'] = self.regr.uri
self.assertRaises(errors.ConflictError, self.client.new_account, self.new_reg)
def test_deactivate_account(self):
deactivated_regr = self.regr.update(
body=self.regr.body.update(status='deactivated'))
self.response.json.return_value = deactivated_regr.body.to_json()
self.response.status_code = http_client.OK
self.response.headers['Location'] = self.regr.uri
self.assertEqual(self.client.deactivate_registration(self.regr), deactivated_regr)
def test_deactivate_authorization(self):
deactivated_authz = self.authzr.update(
body=self.authzr.body.update(status=messages.STATUS_DEACTIVATED))
self.response.json.return_value = deactivated_authz.body.to_json()
authzr = self.client.deactivate_authorization(self.authzr)
self.assertEqual(deactivated_authz.body, authzr.body)
self.assertEqual(self.client.net.post.call_count, 1)
self.assertIn(self.authzr.uri, self.net.post.call_args_list[0][0])
def test_new_order(self):
order_response = copy.deepcopy(self.response)
order_response.status_code = http_client.CREATED
@ -775,6 +154,20 @@ class ClientV2Test(ClientTestBase):
mock_post_as_get.side_effect = (authz_response, authz_response2)
self.assertEqual(self.client.new_order(CSR_MIXED_PEM), self.orderr)
def test_answer_challege(self):
self.response.links['up'] = {'url': self.challr.authzr_uri}
self.response.json.return_value = self.challr.body.to_json()
chall_response = challenges.DNSResponse(validation=None)
self.client.answer_challenge(self.challr.body, chall_response)
self.assertRaises(errors.UnexpectedUpdate, self.client.answer_challenge,
self.challr.body.update(uri='foo'), chall_response)
def test_answer_challenge_missing_next(self):
self.assertRaises(
errors.ClientError, self.client.answer_challenge,
self.challr.body, challenges.DNSResponse(validation=None))
@mock.patch('acme.client.datetime')
def test_poll_and_finalize(self, mock_datetime):
mock_datetime.datetime.now.return_value = datetime.datetime(2018, 2, 15)
@ -821,6 +214,11 @@ class ClientV2Test(ClientTestBase):
self.authz.to_json(), self.authz2.to_json(), updated_authz2.to_json())
self.assertEqual(self.client.poll_authorizations(self.orderr, deadline), updated_orderr)
def test_poll_unexpected_update(self):
updated_authz = self.authz.update(identifier=self.identifier.update(value='foo'))
self.response.json.return_value = updated_authz.to_json()
self.assertRaises(errors.UnexpectedUpdate, self.client.poll, self.authzr)
def test_finalize_order_success(self):
updated_order = self.order.update(
certificate='https://www.letsencrypt-demo.org/acme/cert/',
@ -872,9 +270,9 @@ class ClientV2Test(ClientTestBase):
deadline = datetime.datetime(9999, 9, 9)
resp = self.client.finalize_order(self.orderr, deadline, fetch_alternative_chains=True)
self.net.post.assert_any_call('https://example.com/acme/cert/1',
mock.ANY, acme_version=2, new_nonce_url=mock.ANY)
mock.ANY, new_nonce_url=mock.ANY)
self.net.post.assert_any_call('https://example.com/acme/cert/2',
mock.ANY, acme_version=2, new_nonce_url=mock.ANY)
mock.ANY, new_nonce_url=mock.ANY)
self.assertEqual(resp, updated_orderr)
del self.response.headers['Link']
@ -884,8 +282,15 @@ class ClientV2Test(ClientTestBase):
def test_revoke(self):
self.client.revoke(messages_test.CERT, self.rsn)
self.net.post.assert_called_once_with(
self.directory["revokeCert"], mock.ANY, acme_version=2,
new_nonce_url=DIRECTORY_V2['newNonce'])
self.directory["revokeCert"], mock.ANY, new_nonce_url=DIRECTORY_V2['newNonce'])
def test_revoke_bad_status_raises_error(self):
self.response.status_code = http_client.METHOD_NOT_ALLOWED
self.assertRaises(
errors.ClientError,
self.client.revoke,
messages_test.CERT,
self.rsn)
def test_update_registration(self):
# "Instance of 'Field' has no to_json/update member" bug:
@ -916,6 +321,11 @@ class ClientV2Test(ClientTestBase):
def test_external_account_required_default(self):
self.assertFalse(self.client.external_account_required())
def test_query_registration_client(self):
self.response.json.return_value = self.regr.body.to_json()
self.response.headers['Location'] = 'https://www.letsencrypt-demo.org/acme/reg/1'
self.assertEqual(self.regr, self.client.query_registration(self.regr))
def test_post_as_get(self):
with mock.patch('acme.client.ClientV2._authzr_from_response') as mock_client:
mock_client.return_value = self.authzr2
@ -923,12 +333,64 @@ class ClientV2Test(ClientTestBase):
self.client.poll(self.authzr2) # pylint: disable=protected-access
self.client.net.post.assert_called_once_with(
self.authzr2.uri, None, acme_version=2,
self.authzr2.uri, None,
new_nonce_url='https://www.letsencrypt-demo.org/acme/new-nonce')
self.client.net.get.assert_not_called()
def test_retry_after_date(self):
self.response.headers['Retry-After'] = 'Fri, 31 Dec 1999 23:59:59 GMT'
self.assertEqual(
datetime.datetime(1999, 12, 31, 23, 59, 59),
self.client.retry_after(response=self.response, default=10))
class MockJSONDeSerializable(VersionedLEACMEMixin, jose.JSONDeSerializable):
@mock.patch('acme.client.datetime')
def test_retry_after_invalid(self, dt_mock):
dt_mock.datetime.now.return_value = datetime.datetime(2015, 3, 27)
dt_mock.timedelta = datetime.timedelta
self.response.headers['Retry-After'] = 'foooo'
self.assertEqual(
datetime.datetime(2015, 3, 27, 0, 0, 10),
self.client.retry_after(response=self.response, default=10))
@mock.patch('acme.client.datetime')
def test_retry_after_overflow(self, dt_mock):
dt_mock.datetime.now.return_value = datetime.datetime(2015, 3, 27)
dt_mock.timedelta = datetime.timedelta
dt_mock.datetime.side_effect = datetime.datetime
self.response.headers['Retry-After'] = "Tue, 116 Feb 2016 11:50:00 MST"
self.assertEqual(
datetime.datetime(2015, 3, 27, 0, 0, 10),
self.client.retry_after(response=self.response, default=10))
@mock.patch('acme.client.datetime')
def test_retry_after_seconds(self, dt_mock):
dt_mock.datetime.now.return_value = datetime.datetime(2015, 3, 27)
dt_mock.timedelta = datetime.timedelta
self.response.headers['Retry-After'] = '50'
self.assertEqual(
datetime.datetime(2015, 3, 27, 0, 0, 50),
self.client.retry_after(response=self.response, default=10))
@mock.patch('acme.client.datetime')
def test_retry_after_missing(self, dt_mock):
dt_mock.datetime.now.return_value = datetime.datetime(2015, 3, 27)
dt_mock.timedelta = datetime.timedelta
self.assertEqual(
datetime.datetime(2015, 3, 27, 0, 0, 10),
self.client.retry_after(response=self.response, default=10))
def test_get_directory(self):
self.response.json.return_value = DIRECTORY_V2.to_json()
self.assertEqual(
DIRECTORY_V2.to_partial_json(),
ClientV2.get_directory('https://example.com/dir', self.net).to_partial_json())
class MockJSONDeSerializable(jose.JSONDeSerializable):
# pylint: disable=missing-docstring
def __init__(self, value):
self.value = value
@ -963,8 +425,7 @@ class ClientNetworkTest(unittest.TestCase):
def test_wrap_in_jws(self):
# pylint: disable=protected-access
jws_dump = self.net._wrap_in_jws(
MockJSONDeSerializable('foo'), nonce=b'Tg', url="url",
acme_version=1)
MockJSONDeSerializable('foo'), nonce=b'Tg', url="url")
jws = acme_jws.JWS.json_loads(jws_dump)
self.assertEqual(json.loads(jws.payload.decode()), {'foo': 'foo'})
self.assertEqual(jws.signature.combined.nonce, b'Tg')
@ -973,8 +434,7 @@ class ClientNetworkTest(unittest.TestCase):
self.net.account = {'uri': 'acct-uri'}
# pylint: disable=protected-access
jws_dump = self.net._wrap_in_jws(
MockJSONDeSerializable('foo'), nonce=b'Tg', url="url",
acme_version=2)
MockJSONDeSerializable('foo'), nonce=b'Tg', url="url")
jws = acme_jws.JWS.json_loads(jws_dump)
self.assertEqual(json.loads(jws.payload.decode()), {'foo': 'foo'})
self.assertEqual(jws.signature.combined.nonce, b'Tg')
@ -1080,14 +540,13 @@ class ClientNetworkTest(unittest.TestCase):
self.net.session = mock.MagicMock()
self.net.session.request.return_value = mock.MagicMock(
ok=True, status_code=http_client.OK,
headers={"Content-Type": "application/pkix-cert"},
content=b"hi")
# pylint: disable=protected-access
self.net._send_request('HEAD', 'http://example.com/', 'foo',
timeout=mock.ANY, bar='baz')
timeout=mock.ANY, bar='baz', headers={'Accept': 'application/pkix-cert'})
mock_logger.debug.assert_called_with(
'Received response:\nHTTP %d\n%s\n\n%s', 200,
'Content-Type: application/pkix-cert', b'aGk=')
'', b'aGk=')
def test_send_request_post(self):
self.net.session = mock.MagicMock()
@ -1260,13 +719,13 @@ class ClientNetworkWithMockedResponseTest(unittest.TestCase):
'uri', self.obj, content_type=self.content_type))
self.assertTrue(self.response.checked)
self.net._wrap_in_jws.assert_called_once_with(
self.obj, jose.b64decode(self.all_nonces.pop()), "uri", 1)
self.obj, jose.b64decode(self.all_nonces.pop()), "uri")
self.available_nonces = []
self.assertRaises(errors.MissingNonce, self.net.post,
'uri', self.obj, content_type=self.content_type)
self.net._wrap_in_jws.assert_called_with(
self.obj, jose.b64decode(self.all_nonces.pop()), "uri", 1)
self.obj, jose.b64decode(self.all_nonces.pop()), "uri")
def test_post_wrong_initial_nonce(self): # HEAD
self.available_nonces = [b'f', jose.b64encode(b'good')]
@ -1324,14 +783,13 @@ class ClientNetworkWithMockedResponseTest(unittest.TestCase):
check_response = mock.MagicMock()
self.net._check_response = check_response
self.assertRaises(errors.ClientError, self.net.post, 'uri',
self.obj, content_type=self.content_type, acme_version=2,
self.obj, content_type=self.content_type,
new_nonce_url='new_nonce_uri')
self.assertEqual(check_response.call_count, 1)
def test_new_nonce_uri_removed(self):
self.content_type = None
self.net.post('uri', self.obj, content_type=None,
acme_version=2, new_nonce_url='new_nonce_uri')
self.net.post('uri', self.obj, content_type=None, new_nonce_url='new_nonce_uri')
class ClientNetworkSourceAddressBindingTest(unittest.TestCase):

View file

@ -54,19 +54,5 @@ class RFC3339FieldTest(unittest.TestCase):
jose.DeserializationError, RFC3339Field.default_decoder, '')
class ResourceTest(unittest.TestCase):
"""Tests for acme.fields.Resource."""
def setUp(self):
from acme.fields import Resource
self.field = Resource('x')
def test_decode_good(self):
self.assertEqual('x', self.field.decode('x'))
def test_decode_wrong(self):
self.assertRaises(jose.DeserializationError, self.field.decode, 'y')
if __name__ == '__main__':
unittest.main() # pragma: no cover

View file

@ -1,30 +0,0 @@
"""Tests for acme.magic_typing."""
import sys
import unittest
import warnings
from unittest import mock
class MagicTypingTest(unittest.TestCase):
"""Tests for acme.magic_typing."""
def test_import_success(self):
try:
import typing as temp_typing
except ImportError: # pragma: no cover
temp_typing = None # pragma: no cover
typing_class_mock = mock.MagicMock()
text_mock = mock.MagicMock()
typing_class_mock.Text = text_mock
sys.modules['typing'] = typing_class_mock
if 'acme.magic_typing' in sys.modules:
del sys.modules['acme.magic_typing'] # pragma: no cover
with warnings.catch_warnings():
warnings.filterwarnings("ignore", category=DeprecationWarning)
from acme.magic_typing import Text
self.assertEqual(Text, text_mock)
del sys.modules['acme.magic_typing']
sys.modules['typing'] = temp_typing
if __name__ == '__main__':
unittest.main() # pragma: no cover

View file

@ -134,8 +134,8 @@ class DirectoryTest(unittest.TestCase):
def setUp(self):
from acme.messages import Directory
self.dir = Directory({
'new-reg': 'reg',
mock.MagicMock(resource_type='new-cert'): 'cert',
'newReg': 'reg',
'newCert': 'cert',
'meta': Directory.Meta(
terms_of_service='https://example.com/acme/terms',
website='https://www.example.com/',
@ -148,26 +148,23 @@ class DirectoryTest(unittest.TestCase):
Directory({'foo': 'bar'})
def test_getitem(self):
self.assertEqual('reg', self.dir['new-reg'])
from acme.messages import NewRegistration
self.assertEqual('reg', self.dir[NewRegistration])
self.assertEqual('reg', self.dir[NewRegistration()])
self.assertEqual('reg', self.dir['newReg'])
def test_getitem_fails_with_key_error(self):
self.assertRaises(KeyError, self.dir.__getitem__, 'foo')
def test_getattr(self):
self.assertEqual('reg', self.dir.new_reg)
self.assertEqual('reg', self.dir.newReg)
def test_getattr_fails_with_attribute_error(self):
self.assertRaises(AttributeError, self.dir.__getattr__, 'foo')
def test_to_json(self):
self.assertEqual(self.dir.to_json(), {
'new-reg': 'reg',
'new-cert': 'cert',
'newReg': 'reg',
'newCert': 'cert',
'meta': {
'terms-of-service': 'https://example.com/acme/terms',
'termsOfService': 'https://example.com/acme/terms',
'website': 'https://www.example.com/',
'caaIdentities': ['example.com'],
},
@ -287,7 +284,7 @@ class UpdateRegistrationTest(unittest.TestCase):
def test_empty(self):
from acme.messages import UpdateRegistration
jstring = '{"resource": "reg"}'
self.assertEqual(jstring, UpdateRegistration().json_dumps())
self.assertEqual('{}', UpdateRegistration().json_dumps())
self.assertEqual(
UpdateRegistration(), UpdateRegistration.json_loads(jstring))
@ -335,7 +332,7 @@ class ChallengeBodyTest(unittest.TestCase):
error=error)
self.jobj_to = {
'uri': 'http://challb',
'url': 'http://challb',
'status': self.status,
'type': 'dns',
'token': 'evaGxfADs6pSRb2LAv9IZf17Dt3juxGJ-PCt92wr-oA',
@ -382,20 +379,17 @@ class AuthorizationTest(unittest.TestCase):
chall=challenges.DNS(
token=b'DGyRejmCefe7v4NfDGDKfA')),
)
combinations = ((0,), (1,))
from acme.messages import Authorization
from acme.messages import Identifier
from acme.messages import IDENTIFIER_FQDN
identifier = Identifier(typ=IDENTIFIER_FQDN, value='example.com')
self.authz = Authorization(
identifier=identifier, combinations=combinations,
challenges=self.challbs)
identifier=identifier, challenges=self.challbs)
self.jobj_from = {
'identifier': identifier.to_json(),
'challenges': [challb.to_json() for challb in self.challbs],
'combinations': combinations,
}
def test_from_json(self):
@ -406,12 +400,6 @@ class AuthorizationTest(unittest.TestCase):
from acme.messages import Authorization
hash(Authorization.from_json(self.jobj_from))
def test_resolved_combinations(self):
self.assertEqual(self.authz.resolved_combinations, (
(self.challbs[0],),
(self.challbs[1],),
))
class AuthorizationResourceTest(unittest.TestCase):
"""Tests for acme.messages.AuthorizationResource."""
@ -502,7 +490,6 @@ class JWSPayloadRFC8555Compliant(unittest.TestCase):
from acme.messages import NewAuthorization
new_order = NewAuthorization()
new_order.le_acme_version = 2
jobj = new_order.json_dumps(indent=2).encode()
# RFC8555 states that JWS bodies must not have a resource field.

View file

@ -20,7 +20,7 @@ import pytz
from acme import fields as acme_fields
from acme import messages
from acme.client import ClientBase
from acme.client import ClientV2
from certbot import configuration
from certbot import errors
from certbot import interfaces
@ -114,7 +114,7 @@ class AccountMemoryStorage(interfaces.AccountStorage):
def find_all(self) -> List[Account]:
return list(self.accounts.values())
def save(self, account: Account, client: ClientBase) -> None:
def save(self, account: Account, client: ClientV2) -> None:
if account.id in self.accounts:
logger.debug("Overwriting account: %s", account.id)
self.accounts[account.id] = account
@ -243,11 +243,11 @@ class AccountFileStorage(interfaces.AccountStorage):
def load(self, account_id: str) -> Account:
return self._load_for_server_path(account_id, self.config.server_path)
def save(self, account: Account, client: ClientBase) -> None:
def save(self, account: Account, client: ClientV2) -> None:
"""Create a new account.
:param Account account: account to create
:param ClientBase client: ACME client associated to the account
:param ClientV2 client: ACME client associated to the account
"""
try:
@ -258,11 +258,11 @@ class AccountFileStorage(interfaces.AccountStorage):
except IOError as error:
raise errors.AccountStorageError(error)
def update_regr(self, account: Account, client: ClientBase) -> None:
def update_regr(self, account: Account, client: ClientV2) -> None:
"""Update the registration resource.
:param Account account: account to update
:param ClientBase client: ACME client associated to the account
:param ClientV2 client: ACME client associated to the account
"""
try:
@ -358,7 +358,7 @@ class AccountFileStorage(interfaces.AccountStorage):
with util.safe_open(self._key_path(dir_path), "w", chmod=0o400) as key_file:
key_file.write(account.key.json_dumps())
def _update_regr(self, account: Account, acme: ClientBase, dir_path: str) -> None:
def _update_regr(self, account: Account, acme: ClientV2, dir_path: str) -> None:
with open(self._regr_path(dir_path), "w") as regr_file:
regr = account.regr
# If we have a value for new-authz, save it for forwards

View file

@ -36,7 +36,7 @@ class AuthHandler:
:class:`~acme.challenges.Challenge` types
:type auth: certbot.interfaces.Authenticator
:ivar acme.client.BackwardsCompatibleClientV2 acme_client: ACME client API.
:ivar acme.client.ClientV2 acme_client: ACME client API.
:ivar account: Client's Account
:type account: :class:`certbot._internal.account.Account`
@ -226,15 +226,10 @@ class AuthHandler:
logger.info("Performing the following challenges:")
for authzr in pending_authzrs:
authzr_challenges = authzr.body.challenges
if self.acme.acme_version == 1:
combinations = authzr.body.combinations
else:
combinations = tuple((i,) for i in range(len(authzr_challenges)))
path = gen_challenge_path(
authzr_challenges,
self._get_chall_pref(authzr.body.identifier.value),
combinations)
self._get_chall_pref(authzr.body.identifier.value))
achalls.extend(self._challenge_factory(authzr, path))
@ -387,12 +382,9 @@ def challb_to_achall(challb: messages.ChallengeBody, account_key: josepy.JWK,
def gen_challenge_path(challbs: List[messages.ChallengeBody],
preferences: List[Type[challenges.Challenge]],
combinations: Tuple[Tuple[int, ...], ...]) -> Tuple[int, ...]:
preferences: List[Type[challenges.Challenge]]) -> Tuple[int, ...]:
"""Generate a plan to get authority over the identity.
.. todo:: This can be possibly be rewritten to use resolved_combinations.
:param tuple challbs: A tuple of challenges
(:class:`acme.messages.Challenge`) from
:class:`acme.messages.AuthorizationResource` to be
@ -402,10 +394,6 @@ def gen_challenge_path(challbs: List[messages.ChallengeBody],
:param list preferences: List of challenge preferences for domain
(:class:`acme.challenges.Challenge` subclasses)
:param tuple combinations: A collection of sets of challenges from
:class:`acme.messages.Challenge`, each of which would
be sufficient to prove possession of the identifier.
:returns: list of indices from ``challenges``.
:rtype: list
@ -413,21 +401,6 @@ def gen_challenge_path(challbs: List[messages.ChallengeBody],
path cannot be created that satisfies the CA given the preferences and
combinations.
"""
if combinations:
return _find_smart_path(challbs, preferences, combinations)
return _find_dumb_path(challbs, preferences)
def _find_smart_path(challbs: List[messages.ChallengeBody],
preferences: List[Type[challenges.Challenge]],
combinations: Tuple[Tuple[int, ...], ...]
) -> Tuple[int, ...]:
"""Find challenge path with server hints.
Can be called if combinations is included. Function uses a simple
ranking system to choose the combo with the lowest cost.
"""
chall_cost = {}
max_cost = 1
@ -441,6 +414,8 @@ def _find_smart_path(challbs: List[messages.ChallengeBody],
# Set above completing all of the available challenges
best_combo_cost = max_cost
combinations = tuple((i,) for i in range(len(challbs)))
combo_total = 0
for combo in combinations:
for challenge_index in combo:
@ -459,28 +434,6 @@ def _find_smart_path(challbs: List[messages.ChallengeBody],
return best_combo
def _find_dumb_path(challbs: List[messages.ChallengeBody],
preferences: List[Type[challenges.Challenge]]) -> Tuple[int, ...]:
"""Find challenge path without server hints.
Should be called if the combinations hint is not included by the
server. This function either returns a path containing all
challenges provided by the CA or raises an exception.
"""
path = []
for i, challb in enumerate(challbs):
# supported is set to True if the challenge type is supported
supported = next((True for pref_c in preferences
if isinstance(challb.chall, pref_c)), False)
if supported:
path.append(i)
else:
raise _report_no_chall_path(challbs)
return tuple(path)
def _report_no_chall_path(challbs: List[messages.ChallengeBody]) -> errors.AuthorizationError:
"""Logs and return a raisable error reporting that no satisfiable chall path exists.

View file

@ -10,7 +10,6 @@ from typing import IO
from typing import List
from typing import Optional
from typing import Tuple
import warnings
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric.rsa import generate_private_key
@ -70,16 +69,8 @@ def acme_from_config_key(config: configuration.NamespaceConfig, key: jose.JWK,
verify_ssl=(not config.no_verify_ssl),
user_agent=determine_user_agent(config))
with warnings.catch_warnings():
warnings.simplefilter("ignore", DeprecationWarning)
client = acme_client.BackwardsCompatibleClientV2(net, key, config.server)
if client.acme_version == 1:
logger.warning(
"Certbot is configured to use an ACMEv1 server (%s). ACMEv1 support is deprecated"
" and will soon be removed. See https://community.letsencrypt.org/t/143839 for "
"more information.", config.server)
return cast(acme_client.ClientV2, client)
directory = acme_client.ClientV2.get_directory(config.server, net)
return acme_client.ClientV2(directory, net)
def determine_user_agent(config: configuration.NamespaceConfig) -> str:
@ -256,18 +247,13 @@ def perform_registration(acme: acme_client.ClientV2, config: configuration.Names
" Please use --eab-kid and --eab-hmac-key.")
raise errors.Error(msg)
tos = acme.directory.meta.terms_of_service
if tos_cb and tos:
tos_cb(tos)
try:
newreg = messages.NewRegistration.from_data(
email=config.email, external_account_binding=eab)
# Until ACME v1 support is removed from Certbot, we actually need the provided
# ACME client to be a wrapper of type BackwardsCompatibleClientV2.
# TODO: Remove this cast and rewrite the logic when the client is actually a ClientV2
try:
return cast(acme_client.BackwardsCompatibleClientV2,
acme).new_account_and_tos(newreg, tos_cb)
except AttributeError:
raise errors.Error("The ACME client must be an instance of "
"acme.client.BackwardsCompatibleClientV2")
return acme.new_account(messages.NewRegistration.from_data(
email=config.email, terms_of_service_agreed=True, external_account_binding=eab))
except messages.Error as e:
if e.code in ("invalidEmail", "invalidContact"):
if config.noninteractive_mode:
@ -291,8 +277,8 @@ class Client:
:ivar .Authenticator auth: Prepared (`.Authenticator.prepare`)
authenticator that can solve ACME challenges.
:ivar .Installer installer: Installer.
:ivar acme.client.BackwardsCompatibleClientV2 acme: Optional ACME
client API handle. You might already have one from `register`.
:ivar acme.client.ClientV2 acme: Optional ACME client API handle. You might
already have one from `register`.
"""

View file

@ -18,7 +18,7 @@ import zope.interface
from acme.challenges import Challenge
from acme.challenges import ChallengeResponse
from acme.client import ClientBase
from acme.client import ClientV2
from certbot import configuration
from certbot.achallenges import AnnotatedChallenge
@ -53,7 +53,7 @@ class AccountStorage(metaclass=ABCMeta):
raise NotImplementedError()
@abstractmethod
def save(self, account: 'Account', client: ClientBase) -> None: # pragma: no cover
def save(self, account: 'Account', client: ClientV2) -> None: # pragma: no cover
"""Save account.
:raises .AccountStorageError: if account could not be saved

View file

@ -3,7 +3,6 @@ import datetime
from typing import Any
from typing import Dict
from typing import Iterable
from typing import Tuple
import josepy as jose
@ -24,12 +23,6 @@ DNS01_2 = challenges.DNS01(token=b"cafecafecafecafecafecafe0feedbac")
CHALLENGES = [HTTP01, DNS01]
def gen_combos(challbs: Iterable[messages.ChallengeBody]) -> Tuple[Tuple[int], ...]:
"""Generate natural combinations for challbs."""
# completing a single DV challenge satisfies the CA
return tuple((i,) for i, _ in enumerate(challbs))
def chall_to_challb(chall: challenges.Challenge, status: messages.Status) -> messages.ChallengeBody:
"""Return ChallengeBody from Challenge."""
kwargs = {
@ -61,15 +54,13 @@ ACHALLENGES = [HTTP01_A, DNS01_A]
def gen_authzr(authz_status: messages.Status, domain: str, challs: Iterable[challenges.Challenge],
statuses: Iterable[messages.Status],
combos: bool = True) -> messages.AuthorizationResource:
statuses: Iterable[messages.Status]) -> messages.AuthorizationResource:
"""Generate an authorization resource.
:param authz_status: Status object
:type authz_status: :class:`acme.messages.Status`
:param list challs: Challenge objects
:param list statuses: status of each challenge object
:param bool combos: Whether or not to add combinations
"""
challbs = tuple(
@ -81,8 +72,6 @@ def gen_authzr(authz_status: messages.Status, domain: str, challs: Iterable[chal
typ=messages.IDENTIFIER_FQDN, value=domain),
"challenges": challbs,
}
if combos:
authz_kwargs.update({"combinations": gen_combos(challbs)})
if authz_status == messages.STATUS_VALID:
authz_kwargs.update({
"status": authz_status,

View file

@ -33,7 +33,7 @@ class ChallengeFactoryTest(unittest.TestCase):
self.authzr = acme_util.gen_authzr(
messages.STATUS_PENDING, "test", acme_util.CHALLENGES,
[messages.STATUS_PENDING] * 6, False)
[messages.STATUS_PENDING] * 6)
def test_all(self):
achalls = self.handler._challenge_factory(
@ -81,7 +81,6 @@ class HandleAuthorizationsTest(unittest.TestCase):
self.mock_account = mock.MagicMock()
self.mock_net = mock.MagicMock(spec=acme_client.ClientV2)
self.mock_net.acme_version = 1
self.mock_net.retry_after.side_effect = acme_client.ClientV2.retry_after
self.handler = AuthHandler(
@ -92,8 +91,8 @@ class HandleAuthorizationsTest(unittest.TestCase):
def tearDown(self):
logging.disable(logging.NOTSET)
def _test_name1_http_01_1_common(self, combos):
authzr = gen_dom_authzr(domain="0", challs=acme_util.CHALLENGES, combos=combos)
def _test_name1_http_01_1_common(self):
authzr = gen_dom_authzr(domain="0", challs=acme_util.CHALLENGES)
mock_order = mock.MagicMock(authorizations=[authzr])
self.mock_net.poll.side_effect = _gen_mock_on_poll(retry=1, wait_value=30)
@ -117,39 +116,14 @@ class HandleAuthorizationsTest(unittest.TestCase):
self.assertEqual(len(authzr), 1)
def test_name1_http_01_1_acme_1(self):
self._test_name1_http_01_1_common(combos=True)
def test_name1_http_01_1_acme_2(self):
self.mock_net.acme_version = 2
self._test_name1_http_01_1_common(combos=False)
def test_name1_http_01_1_dns_1_acme_1(self):
self.mock_net.poll.side_effect = _gen_mock_on_poll()
self.mock_auth.get_chall_pref.return_value.append(challenges.DNS01)
authzr = gen_dom_authzr(domain="0", challs=acme_util.CHALLENGES, combos=False)
mock_order = mock.MagicMock(authorizations=[authzr])
authzr = self.handler.handle_authorizations(mock_order, self.mock_config)
self.assertEqual(self.mock_net.answer_challenge.call_count, 2)
self.assertEqual(self.mock_net.poll.call_count, 1)
self.assertEqual(self.mock_auth.cleanup.call_count, 1)
# Test if list first element is http-01, use typ because it is an achall
for achall in self.mock_auth.cleanup.call_args[0][0]:
self.assertIn(achall.typ, ["http-01", "dns-01"])
# Length of authorizations list
self.assertEqual(len(authzr), 1)
self._test_name1_http_01_1_common()
def test_name1_http_01_1_dns_1_acme_2(self):
self.mock_net.acme_version = 2
self.mock_net.poll.side_effect = _gen_mock_on_poll()
self.mock_auth.get_chall_pref.return_value.append(challenges.DNS01)
authzr = gen_dom_authzr(domain="0", challs=acme_util.CHALLENGES, combos=False)
authzr = gen_dom_authzr(domain="0", challs=acme_util.CHALLENGES)
mock_order = mock.MagicMock(authorizations=[authzr])
authzr = self.handler.handle_authorizations(mock_order, self.mock_config)
@ -165,7 +139,7 @@ class HandleAuthorizationsTest(unittest.TestCase):
# Length of authorizations list
self.assertEqual(len(authzr), 1)
def _test_name3_http_01_3_common(self, combos):
def test_name3_http_01_3_common_acme_2(self):
authzrs = [gen_dom_authzr(domain="0", challs=acme_util.CHALLENGES),
gen_dom_authzr(domain="1", challs=acme_util.CHALLENGES),
gen_dom_authzr(domain="2", challs=acme_util.CHALLENGES)]
@ -183,13 +157,6 @@ class HandleAuthorizationsTest(unittest.TestCase):
self.assertEqual(len(authzr), 3)
def test_name3_http_01_3_common_acme_1(self):
self._test_name3_http_01_3_common(combos=True)
def test_name3_http_01_3_common_acme_2(self):
self.mock_net.acme_version = 2
self._test_name3_http_01_3_common(combos=False)
def test_debug_challenges(self):
config = mock.Mock(debug_challenges=True, verbose_count=0)
authzrs = [gen_dom_authzr(domain="0", challs=acme_util.CHALLENGES)]
@ -269,8 +236,8 @@ class HandleAuthorizationsTest(unittest.TestCase):
self.assertRaises(errors.AuthorizationError, self.handler.handle_authorizations,
mock_order, self.mock_config)
def _test_preferred_challenge_choice_common(self, combos):
authzrs = [gen_dom_authzr(domain="0", challs=acme_util.CHALLENGES, combos=combos)]
def test_preferred_challenge_choice_common_acme_2(self):
authzrs = [gen_dom_authzr(domain="0", challs=acme_util.CHALLENGES)]
mock_order = mock.MagicMock(authorizations=authzrs)
self.mock_auth.get_chall_pref.return_value.append(challenges.HTTP01)
@ -285,28 +252,14 @@ class HandleAuthorizationsTest(unittest.TestCase):
self.assertEqual(
self.mock_auth.cleanup.call_args[0][0][0].typ, "http-01")
def test_preferred_challenge_choice_common_acme_1(self):
self._test_preferred_challenge_choice_common(combos=True)
def test_preferred_challenge_choice_common_acme_2(self):
self.mock_net.acme_version = 2
self._test_preferred_challenge_choice_common(combos=False)
def _test_preferred_challenges_not_supported_common(self, combos):
authzrs = [gen_dom_authzr(domain="0", challs=acme_util.CHALLENGES, combos=combos)]
def test_preferred_challenges_not_supported_acme_2(self):
authzrs = [gen_dom_authzr(domain="0", challs=acme_util.CHALLENGES)]
mock_order = mock.MagicMock(authorizations=authzrs)
self.handler.pref_challs.append(challenges.DNS01.typ)
self.assertRaises(
errors.AuthorizationError, self.handler.handle_authorizations,
mock_order, self.mock_config)
def test_preferred_challenges_not_supported_acme_1(self):
self._test_preferred_challenges_not_supported_common(combos=True)
def test_preferred_challenges_not_supported_acme_2(self):
self.mock_net.acme_version = 2
self._test_preferred_challenges_not_supported_common(combos=False)
def test_dns_only_challenge_not_supported(self):
authzrs = [gen_dom_authzr(domain="0", challs=[acme_util.DNS01])]
mock_order = mock.MagicMock(authorizations=authzrs)
@ -317,7 +270,7 @@ class HandleAuthorizationsTest(unittest.TestCase):
def test_perform_error(self):
self.mock_auth.perform.side_effect = errors.AuthorizationError
authzr = gen_dom_authzr(domain="0", challs=acme_util.CHALLENGES, combos=True)
authzr = gen_dom_authzr(domain="0", challs=acme_util.CHALLENGES)
mock_order = mock.MagicMock(authorizations=[authzr])
self.assertRaises(errors.AuthorizationError, self.handler.handle_authorizations,
mock_order, self.mock_config)
@ -392,7 +345,7 @@ class HandleAuthorizationsTest(unittest.TestCase):
authzr = acme_util.gen_authzr(
messages.STATUS_PENDING, "0",
[acme_util.DNS01],
[messages.STATUS_PENDING], False)
[messages.STATUS_PENDING])
mock_order = mock.MagicMock(authorizations=[authzr])
self.assertRaises(
errors.AuthorizationError, self.handler.handle_authorizations,
@ -404,7 +357,7 @@ class HandleAuthorizationsTest(unittest.TestCase):
authzr = acme_util.gen_authzr(
messages.STATUS_VALID, "0",
[acme_util.DNS01],
[messages.STATUS_VALID], False)
[messages.STATUS_VALID])
mock_order = mock.MagicMock(authorizations=[authzr])
self.handler.handle_authorizations(mock_order, self.mock_config)
@ -426,7 +379,7 @@ class HandleAuthorizationsTest(unittest.TestCase):
("is_valid_but_will_fail", messages.STATUS_VALID)]
to_deactivate = [acme_util.gen_authzr(a[1], a[0], [acme_util.HTTP01],
[a[1], False]) for a in to_deactivate]
[a[1]]) for a in to_deactivate]
orderr = mock.MagicMock(authorizations=to_deactivate)
self.mock_net.deactivate_authorization.side_effect = _mock_deactivate
@ -452,8 +405,7 @@ def _gen_mock_on_poll(status=messages.STATUS_VALID, retry=0, wait_value=1):
effective_status,
authzr.body.identifier.value,
[challb.chall for challb in authzr.body.challenges],
[effective_status] * len(authzr.body.challenges),
authzr.body.combinations)
[effective_status] * len(authzr.body.challenges))
return updated_azr, mock.MagicMock(headers={'Retry-After': str(wait_value)})
return _mock
@ -477,8 +429,6 @@ class ChallbToAchallTest(unittest.TestCase):
class GenChallengePathTest(unittest.TestCase):
"""Tests for certbot._internal.auth_handler.gen_challenge_path.
.. todo:: Add more tests for dumb_path... depending on what we want to do.
"""
def setUp(self):
logging.disable(logging.FATAL)
@ -487,34 +437,25 @@ class GenChallengePathTest(unittest.TestCase):
logging.disable(logging.NOTSET)
@classmethod
def _call(cls, challbs, preferences, combinations):
def _call(cls, challbs, preferences):
from certbot._internal.auth_handler import gen_challenge_path
return gen_challenge_path(challbs, preferences, combinations)
return gen_challenge_path(challbs, preferences)
def test_common_case(self):
"""Given DNS01 and HTTP01 with appropriate combos."""
challbs = (acme_util.DNS01_P, acme_util.HTTP01_P)
prefs = [challenges.DNS01, challenges.HTTP01]
combos = ((0,), (1,))
# Smart then trivial dumb path test
self.assertEqual(self._call(challbs, prefs, combos), (0,))
self.assertTrue(self._call(challbs, prefs, None))
# Rearrange order...
self.assertEqual(self._call(challbs[::-1], prefs, combos), (1,))
self.assertTrue(self._call(challbs[::-1], prefs, None))
self.assertEqual(self._call(challbs, prefs), (0,))
self.assertEqual(self._call(challbs[::-1], prefs), (1,))
def test_not_supported(self):
challbs = (acme_util.DNS01_P, acme_util.HTTP01_P)
challbs = (acme_util.DNS01_P,)
prefs = [challenges.HTTP01]
combos = ((0, 1),)
# smart path fails because no challs in perfs satisfies combos
# smart path fails because no challs in prefs satisfies combos
self.assertRaises(
errors.AuthorizationError, self._call, challbs, prefs, combos)
# dumb path fails because all challbs are not supported
self.assertRaises(
errors.AuthorizationError, self._call, challbs, prefs, None)
errors.AuthorizationError, self._call, challbs, prefs)
class ReportFailedAuthzrsTest(unittest.TestCase):
@ -615,11 +556,11 @@ def gen_auth_resp(chall_list):
for chall in chall_list]
def gen_dom_authzr(domain, challs, combos=True):
def gen_dom_authzr(domain, challs):
"""Generates new authzr for domains."""
return acme_util.gen_authzr(
messages.STATUS_PENDING, domain, challs,
[messages.STATUS_PENDING] * len(challs), combos)
[messages.STATUS_PENDING] * len(challs))
if __name__ == "__main__":

View file

@ -69,13 +69,13 @@ class RegisterTest(test_util.ConfigTestCase):
self.config.register_unsafely_without_email = False
self.config.email = "alias@example.com"
self.account_storage = account.AccountMemoryStorage()
self.tos_cb = mock.MagicMock()
with mock.patch("zope.component.provideUtility"):
display_obj.set_display(MagicMock())
def _call(self):
from certbot._internal.client import register
tos_cb = mock.MagicMock()
return register(self.config, self.account_storage, tos_cb)
return register(self.config, self.account_storage, self.tos_cb)
@staticmethod
def _public_key_mock():
@ -98,31 +98,42 @@ class RegisterTest(test_util.ConfigTestCase):
@staticmethod
@contextlib.contextmanager
def _patched_acme_client():
# This function is written this way to avoid deprecation warnings that
# are raised when BackwardsCompatibleClientV2 is accessed on the real
# acme.client module.
with mock.patch('certbot._internal.client.acme_client') as mock_acme_client:
yield mock_acme_client.BackwardsCompatibleClientV2
yield mock_acme_client.ClientV2
def test_no_tos(self):
with self._patched_acme_client() as mock_client:
mock_client.new_account_and_tos().terms_of_service = "http://tos"
mock_client.new_account().terms_of_service = "http://tos"
mock_client().external_account_required.side_effect = self._false_mock
with mock.patch("certbot._internal.eff.prepare_subscription") as mock_prepare:
mock_client().new_account_and_tos.side_effect = errors.Error
mock_client().new_account.side_effect = errors.Error
self.assertRaises(errors.Error, self._call)
self.assertIs(mock_prepare.called, False)
mock_client().new_account_and_tos.side_effect = None
mock_client().new_account.side_effect = None
self._call()
self.assertIs(mock_prepare.called, True)
@mock.patch('certbot._internal.eff.prepare_subscription')
def test_empty_meta(self, unused_mock_prepare):
# Test that we can handle an ACME server which does not implement the 'meta'
# directory object (for terms-of-service handling).
with self._patched_acme_client() as mock_client:
from acme.messages import Directory
mock_client().directory = Directory.from_json({})
mock_client().external_account_required.side_effect = self._false_mock
self._call()
self.assertIs(self.tos_cb.called, False)
@test_util.patch_display_util()
def test_it(self, unused_mock_get_utility):
with self._patched_acme_client() as mock_client:
mock_client().external_account_required.side_effect = self._false_mock
with mock.patch("certbot._internal.eff.handle_subscription"):
self._call()
self.assertIs(self.tos_cb.called, True)
@mock.patch("certbot._internal.client.display_ops.get_email")
def test_email_retry(self, mock_get_email):
@ -133,7 +144,7 @@ class RegisterTest(test_util.ConfigTestCase):
with self._patched_acme_client() as mock_client:
mock_client().external_account_required.side_effect = self._false_mock
with mock.patch("certbot._internal.eff.prepare_subscription") as mock_prepare:
mock_client().new_account_and_tos.side_effect = [mx_err, mock.MagicMock()]
mock_client().new_account.side_effect = [mx_err, mock.MagicMock()]
self._call()
self.assertEqual(mock_get_email.call_count, 1)
self.assertIs(mock_prepare.called, True)
@ -146,7 +157,7 @@ class RegisterTest(test_util.ConfigTestCase):
with self._patched_acme_client() as mock_client:
mock_client().external_account_required.side_effect = self._false_mock
with mock.patch("certbot._internal.eff.handle_subscription"):
mock_client().new_account_and_tos.side_effect = [mx_err, mock.MagicMock()]
mock_client().new_account.side_effect = [mx_err, mock.MagicMock()]
self.assertRaises(errors.Error, self._call)
def test_needs_email(self):
@ -176,7 +187,7 @@ class RegisterTest(test_util.ConfigTestCase):
# check Certbot did not ask the user to provide an email
self.assertIs(mock_get_email.called, False)
# check Certbot created an account with no email. Contact should return empty
self.assertFalse(mock_client().new_account_and_tos.call_args[0][0].contact)
self.assertFalse(mock_client().new_account.call_args[0][0].contact)
@test_util.patch_display_util()
def test_with_eab_arguments(self, unused_mock_get_utility):
@ -228,7 +239,7 @@ class RegisterTest(test_util.ConfigTestCase):
)
mock_client().external_account_required.side_effect = self._false_mock
with mock.patch("certbot._internal.eff.handle_subscription") as mock_handle:
mock_client().new_account_and_tos.side_effect = [mx_err, mock.MagicMock()]
mock_client().new_account.side_effect = [mx_err, mock.MagicMock()]
self.assertRaises(messages.Error, self._call)
self.assertIs(mock_handle.called, False)
@ -245,7 +256,7 @@ class ClientTestCommon(test_util.ConfigTestCase):
from certbot._internal.client import Client
with mock.patch("certbot._internal.client.acme_client") as acme:
self.acme_client = acme.BackwardsCompatibleClientV2
self.acme_client = acme.ClientV2
self.acme = self.acme_client.return_value = mock.MagicMock()
self.client_network = acme.ClientNetwork
self.client = Client(

View file

@ -388,7 +388,7 @@ class RevokeTest(test_util.TempDirTestCase):
mock.patch('certbot._internal.main._determine_account'),
mock.patch('certbot._internal.main.display_ops.success_revocation')
]
self.mock_acme_client = patches[0].start().BackwardsCompatibleClientV2
self.mock_acme_client = patches[0].start().ClientV2
patches[1].start()
self.mock_determine_account = patches[2].start()
self.mock_success_revoke = patches[3].start()
@ -418,12 +418,19 @@ class RevokeTest(test_util.TempDirTestCase):
from certbot._internal.main import revoke
revoke(config, plugins)
def _mock_set_by_cli(self, mocked: mock.MagicMock, key: str, value: bool) -> None:
def set_by_cli(k: str) -> bool:
if key == k:
return value
return mock.DEFAULT
mocked.side_effect = set_by_cli
@mock.patch('certbot._internal.main._delete_if_appropriate')
@mock.patch('certbot._internal.main.client.acme_client')
def test_revoke_with_reason(self, mock_acme_client,
mock_delete_if_appropriate):
mock_delete_if_appropriate.return_value = False
mock_revoke = mock_acme_client.BackwardsCompatibleClientV2().revoke
mock_revoke = mock_acme_client.ClientV2().revoke
expected = []
for reason, code in constants.REVOCATION_REASONS.items():
args = 'revoke --cert-path={0} --reason {1}'.format(self.tmp_cert_path, reason).split()
@ -438,42 +445,56 @@ class RevokeTest(test_util.TempDirTestCase):
@mock.patch('certbot._internal.main._delete_if_appropriate')
@mock.patch('certbot._internal.storage.RenewableCert')
@mock.patch('certbot._internal.storage.renewal_file_for_certname')
def test_revoke_by_certname(self, unused_mock_renewal_file_for_certname,
mock_cert, mock_delete_if_appropriate):
@mock.patch('certbot._internal.client.acme_from_config_key')
@mock.patch('certbot._internal.cli.set_by_cli')
def test_revoke_by_certname(self, mock_set_by_cli, mock_acme_from_config,
unused_mock_renewal_file_for_certname, mock_cert,
mock_delete_if_appropriate):
self._mock_set_by_cli(mock_set_by_cli, "server", False)
mock_acme_from_config.return_value = self.mock_acme_client
mock_cert.return_value = mock.MagicMock(cert_path=self.tmp_cert_path,
server="https://acme.example")
args = 'revoke --cert-name=example.com'.split()
mock_delete_if_appropriate.return_value = False
self._call(args)
self.mock_acme_client.assert_called_once_with(mock.ANY, mock.ANY, 'https://acme.example')
self.assertEqual(mock_acme_from_config.call_args_list[0][0][0].server,
'https://acme.example')
self.mock_success_revoke.assert_called_once_with(self.tmp_cert_path)
@mock.patch('certbot._internal.main._delete_if_appropriate')
@mock.patch('certbot._internal.storage.RenewableCert')
@mock.patch('certbot._internal.storage.renewal_file_for_certname')
def test_revoke_by_certname_and_server(self, unused_mock_renewal_file_for_certname,
mock_cert, mock_delete_if_appropriate):
@mock.patch('certbot._internal.client.acme_from_config_key')
@mock.patch('certbot._internal.cli.set_by_cli')
def test_revoke_by_certname_and_server(self, mock_set_by_cli, mock_acme_from_config,
unused_mock_renewal_file_for_certname, mock_cert,
mock_delete_if_appropriate):
"""Revoking with --server should use the server from the CLI"""
self._mock_set_by_cli(mock_set_by_cli, "server", True)
mock_cert.return_value = mock.MagicMock(cert_path=self.tmp_cert_path,
server="https://acme.example")
args = 'revoke --cert-name=example.com --server https://other.example'.split()
mock_delete_if_appropriate.return_value = False
self._call(args)
self.mock_acme_client.assert_called_once_with(mock.ANY, mock.ANY, 'https://other.example')
self.assertEqual(mock_acme_from_config.call_args_list[0][0][0].server,
'https://other.example')
self.mock_success_revoke.assert_called_once_with(self.tmp_cert_path)
@mock.patch('certbot._internal.main._delete_if_appropriate')
@mock.patch('certbot._internal.storage.RenewableCert')
@mock.patch('certbot._internal.storage.renewal_file_for_certname')
def test_revoke_by_certname_empty_server(self, unused_mock_renewal_file_for_certname,
@mock.patch('certbot._internal.client.acme_from_config_key')
@mock.patch('certbot._internal.cli.set_by_cli')
def test_revoke_by_certname_empty_server(self, mock_set_by_cli, mock_acme_from_config,
unused_mock_renewal_file_for_certname,
mock_cert, mock_delete_if_appropriate):
"""Revoking with --cert-name where the lineage server is empty shouldn't crash """
mock_cert.return_value = mock.MagicMock(cert_path=self.tmp_cert_path, server=None)
args = 'revoke --cert-name=example.com'.split()
mock_delete_if_appropriate.return_value = False
self._call(args)
self.mock_acme_client.assert_called_once_with(
mock.ANY, mock.ANY, constants.CLI_DEFAULTS['server'])
self.assertEqual(mock_acme_from_config.call_args_list[0][0][0].server,
constants.CLI_DEFAULTS['server'])
self.mock_success_revoke.assert_called_once_with(self.tmp_cert_path)
@mock.patch('certbot._internal.main._delete_if_appropriate')
@ -1566,11 +1587,12 @@ class MainTest(test_util.ConfigTestCase):
self._call_no_clientmock(['--cert-path', SS_CERT_PATH, '--key-path', RSA2048_KEY_PATH,
'--server', server, 'revoke'])
with open(RSA2048_KEY_PATH, 'rb') as f:
mock_acme_client.BackwardsCompatibleClientV2.assert_called_once_with(
mock.ANY, jose.JWK.load(f.read()), server)
self.assertEqual(mock_acme_client.ClientV2.call_count, 1)
self.assertEqual(mock_acme_client.ClientNetwork.call_args[0][0],
jose.JWK.load(f.read()))
with open(SS_CERT_PATH, 'rb') as f:
cert = crypto_util.pyopenssl_load_certificate(f.read())[0]
mock_revoke = mock_acme_client.BackwardsCompatibleClientV2().revoke
mock_revoke = mock_acme_client.ClientV2().revoke
mock_revoke.assert_called_once_with(
jose.ComparableX509(cert),
mock.ANY)