diff --git a/acme/acme/challenges.py b/acme/acme/challenges.py index 9000b370a..97e3bed88 100644 --- a/acme/acme/challenges.py +++ b/acme/acme/challenges.py @@ -23,9 +23,6 @@ import requests from acme import crypto_util from acme import errors -from acme import fields -from acme.mixins import ResourceMixin -from acme.mixins import TypeMixin logger = logging.getLogger(__name__) @@ -47,12 +44,17 @@ class Challenge(jose.TypedJSONObjectWithFields): return UnrecognizedChallenge.from_json(jobj) -class ChallengeResponse(ResourceMixin, TypeMixin, jose.TypedJSONObjectWithFields): +class ChallengeResponse(jose.TypedJSONObjectWithFields): # _fields_to_partial_json """ACME challenge response.""" TYPES: Dict[str, Type['ChallengeResponse']] = {} - resource_type = 'challenge' - resource: str = fields.resource(resource_type) + + def to_partial_json(self) -> Dict[str, Any]: + # Removes the `type` field which is inserted by TypedJSONObjectWithFields.to_partial_json. + # This field breaks RFC8555 compliance. + jobj = super().to_partial_json() + jobj.pop(self.type_field_name, None) + return jobj class UnrecognizedChallenge(Challenge): diff --git a/acme/acme/client.py b/acme/acme/client.py index e1dc9040f..105e9298f 100644 --- a/acme/acme/client.py +++ b/acme/acme/client.py @@ -1,23 +1,13 @@ """ACME client API.""" -# pylint: disable=too-many-lines -# This pylint disable can be deleted once the deprecated ACMEv1 code is -# removed. import base64 -import collections import datetime from email.utils import parsedate_tz -import heapq import http.client as http_client import logging import re -import sys import time -from types import ModuleType from typing import Any -from typing import Callable from typing import cast -from typing import Dict -from typing import Iterable from typing import List from typing import Mapping from typing import Optional @@ -25,7 +15,6 @@ from typing import Set from typing import Text from typing import Tuple from typing import Union -import warnings import josepy as jose import OpenSSL @@ -39,572 +28,13 @@ from acme import crypto_util from acme import errors from acme import jws from acme import messages -from acme.mixins import VersionedLEACMEMixin logger = logging.getLogger(__name__) DEFAULT_NETWORK_TIMEOUT = 45 -DER_CONTENT_TYPE = 'application/pkix-cert' - -class ClientBase: - """ACME client base object. - - :ivar messages.Directory directory: - :ivar .ClientNetwork net: Client network. - :ivar int acme_version: ACME protocol version. 1 or 2. - """ - def __init__(self, directory: messages.Directory, net: 'ClientNetwork', - acme_version: int) -> None: - """Initialize. - - :param .messages.Directory directory: Directory Resource - :param .ClientNetwork net: Client network. - :param int acme_version: ACME protocol version. 1 or 2. - """ - self.directory = directory - self.net = net - self.acme_version = acme_version - - @classmethod - def _regr_from_response(cls, response: requests.Response, uri: Optional[str] = None, - terms_of_service: Optional[str] = None - ) -> messages.RegistrationResource: - if 'terms-of-service' in response.links: - terms_of_service = response.links['terms-of-service']['url'] - - return messages.RegistrationResource( - body=messages.Registration.from_json(response.json()), - uri=response.headers.get('Location', uri), - terms_of_service=terms_of_service) - - def _send_recv_regr(self, regr: messages.RegistrationResource, - body: messages.Registration) -> messages.RegistrationResource: - response = self._post(regr.uri, body) - - # TODO: Boulder returns httplib.ACCEPTED - #assert response.status_code == httplib.OK - - # TODO: Boulder does not set Location or Link on update - # (c.f. acme-spec #94) - - return self._regr_from_response( - response, uri=regr.uri, - terms_of_service=regr.terms_of_service) - - def _post(self, *args: Any, **kwargs: Any) -> requests.Response: - """Wrapper around self.net.post that adds the acme_version. - - """ - kwargs.setdefault('acme_version', self.acme_version) - if hasattr(self.directory, 'newNonce'): - kwargs.setdefault('new_nonce_url', getattr(self.directory, 'newNonce')) - return self.net.post(*args, **kwargs) - - def update_registration(self, regr: messages.RegistrationResource, - update: Optional[messages.Registration] = None - ) -> messages.RegistrationResource: - """Update registration. - - :param messages.RegistrationResource regr: Registration Resource. - :param messages.Registration update: Updated body of the - resource. If not provided, body will be taken from `regr`. - - :returns: Updated Registration Resource. - :rtype: `.RegistrationResource` - - """ - update = regr.body if update is None else update - body = messages.UpdateRegistration(**dict(update)) - updated_regr = self._send_recv_regr(regr, body=body) - self.net.account = updated_regr - return updated_regr - - def deactivate_registration(self, regr: messages.RegistrationResource - ) -> messages.RegistrationResource: - """Deactivate registration. - - :param messages.RegistrationResource regr: The Registration Resource - to be deactivated. - - :returns: The Registration resource that was deactivated. - :rtype: `.RegistrationResource` - - """ - return self.update_registration(regr, messages.Registration.from_json( - {"status": "deactivated", "contact": None})) - - def deactivate_authorization(self, - authzr: messages.AuthorizationResource - ) -> messages.AuthorizationResource: - """Deactivate authorization. - - :param messages.AuthorizationResource authzr: The Authorization resource - to be deactivated. - - :returns: The Authorization resource that was deactivated. - :rtype: `.AuthorizationResource` - - """ - body = messages.UpdateAuthorization(status='deactivated') - response = self._post(authzr.uri, body) - return self._authzr_from_response(response, - authzr.body.identifier, authzr.uri) - - def _authzr_from_response(self, response: requests.Response, - identifier: Optional[messages.Identifier] = None, - uri: Optional[str] = None) -> messages.AuthorizationResource: - authzr = messages.AuthorizationResource( - body=messages.Authorization.from_json(response.json()), - uri=response.headers.get('Location', uri)) - if identifier is not None and authzr.body.identifier != identifier: # pylint: disable=no-member - raise errors.UnexpectedUpdate(authzr) - return authzr - - def answer_challenge(self, challb: messages.ChallengeBody, - response: challenges.ChallengeResponse) -> messages.ChallengeResource: - """Answer challenge. - - :param challb: Challenge Resource body. - :type challb: `.ChallengeBody` - - :param response: Corresponding Challenge response - :type response: `.challenges.ChallengeResponse` - - :returns: Challenge Resource with updated body. - :rtype: `.ChallengeResource` - - :raises .UnexpectedUpdate: - - """ - resp = self._post(challb.uri, response) - try: - authzr_uri = resp.links['up']['url'] - except KeyError: - raise errors.ClientError('"up" Link header missing') - challr = messages.ChallengeResource( - authzr_uri=authzr_uri, - body=messages.ChallengeBody.from_json(resp.json())) - # TODO: check that challr.uri == resp.headers['Location']? - if challr.uri != challb.uri: - raise errors.UnexpectedUpdate(challr.uri) - return challr - - @classmethod - def retry_after(cls, response: requests.Response, default: int) -> datetime.datetime: - """Compute next `poll` time based on response ``Retry-After`` header. - - Handles integers and various datestring formats per - https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.37 - - :param requests.Response response: Response from `poll`. - :param int default: Default value (in seconds), used when - ``Retry-After`` header is not present or invalid. - - :returns: Time point when next `poll` should be performed. - :rtype: `datetime.datetime` - - """ - retry_after = response.headers.get('Retry-After', str(default)) - try: - seconds = int(retry_after) - except ValueError: - # The RFC 2822 parser handles all of RFC 2616's cases in modern - # environments (primarily HTTP 1.1+ but also py27+) - when = parsedate_tz(retry_after) - if when is not None: - try: - tz_secs = datetime.timedelta(when[-1] if when[-1] is not None else 0) - return datetime.datetime(*when[:7]) - tz_secs - except (ValueError, OverflowError): - pass - seconds = default - - return datetime.datetime.now() + datetime.timedelta(seconds=seconds) - - def _revoke(self, cert: jose.ComparableX509, rsn: int, url: str) -> None: - """Revoke certificate. - - :param .ComparableX509 cert: `OpenSSL.crypto.X509` wrapped in - `.ComparableX509` - - :param int rsn: Reason code for certificate revocation. - - :param str url: ACME URL to post to - - :raises .ClientError: If revocation is unsuccessful. - - """ - response = self._post(url, - messages.Revocation( - certificate=cert, - reason=rsn)) - if response.status_code != http_client.OK: - raise errors.ClientError( - 'Successful revocation must return HTTP OK status') - - -class Client(ClientBase): - """ACME client for a v1 API. - - .. deprecated:: 1.18.0 - Use :class:`ClientV2` instead. - - .. todo:: - Clean up raised error types hierarchy, document, and handle (wrap) - instances of `.DeserializationError` raised in `from_json()`. - - :ivar messages.Directory directory: - :ivar key: `josepy.JWK` (private) - :ivar alg: `josepy.JWASignature` - :ivar bool verify_ssl: Verify SSL certificates? - :ivar .ClientNetwork net: Client network. Useful for testing. If not - supplied, it will be initialized using `key`, `alg` and - `verify_ssl`. - - """ - - def __init__(self, directory: messages.Directory, key: jose.JWK, - alg: jose.JWASignature=jose.RS256, verify_ssl: bool = True, - net: Optional['ClientNetwork'] = None) -> None: - """Initialize. - - :param directory: Directory Resource (`.messages.Directory`) or - URI from which the resource will be downloaded. - - """ - self.key = key - if net is None: - net = ClientNetwork(key, alg=alg, verify_ssl=verify_ssl) - - if isinstance(directory, str): - directory = messages.Directory.from_json( - net.get(directory).json()) - super().__init__(directory=directory, - net=net, acme_version=1) - - def register(self, new_reg: Optional[messages.NewRegistration] = None - ) -> messages.RegistrationResource: - """Register. - - :param .NewRegistration new_reg: - - :returns: Registration Resource. - :rtype: `.RegistrationResource` - - """ - new_reg = messages.NewRegistration() if new_reg is None else new_reg - response = self._post(self.directory[new_reg], new_reg) - # TODO: handle errors - assert response.status_code == http_client.CREATED - - # "Instance of 'Field' has no key/contact member" bug: - return self._regr_from_response(response) - - def query_registration(self, regr: messages.RegistrationResource - ) -> messages.RegistrationResource: - """Query server about registration. - - :param messages.RegistrationResource regr: Existing Registration - Resource. - - """ - return self._send_recv_regr(regr, messages.UpdateRegistration()) - - def agree_to_tos(self, regr: messages.RegistrationResource - ) -> messages.RegistrationResource: - """Agree to the terms-of-service. - - Agree to the terms-of-service in a Registration Resource. - - :param regr: Registration Resource. - :type regr: `.RegistrationResource` - - :returns: Updated Registration Resource. - :rtype: `.RegistrationResource` - - """ - return self.update_registration( - regr.update(body=regr.body.update(agreement=regr.terms_of_service))) - - def request_challenges(self, identifier: messages.Identifier, - new_authzr_uri: Optional[str] = None) -> messages.AuthorizationResource: - """Request challenges. - - :param .messages.Identifier identifier: Identifier to be challenged. - :param str new_authzr_uri: Deprecated. Do not use. - - :returns: Authorization Resource. - :rtype: `.AuthorizationResource` - - :raises errors.WildcardUnsupportedError: if a wildcard is requested - - """ - if new_authzr_uri is not None: - logger.debug("request_challenges with new_authzr_uri deprecated.") - - if identifier.value.startswith("*"): - raise errors.WildcardUnsupportedError( - "Requesting an authorization for a wildcard name is" - " forbidden by this version of the ACME protocol.") - - new_authz = messages.NewAuthorization(identifier=identifier) - response = self._post(self.directory.new_authz, new_authz) - # TODO: handle errors - assert response.status_code == http_client.CREATED - return self._authzr_from_response(response, identifier) - - def request_domain_challenges(self, domain: str,new_authzr_uri: Optional[str] = None - ) -> messages.AuthorizationResource: - """Request challenges for domain names. - - This is simply a convenience function that wraps around - `request_challenges`, but works with domain names instead of - generic identifiers. See ``request_challenges`` for more - documentation. - - :param str domain: Domain name to be challenged. - :param str new_authzr_uri: Deprecated. Do not use. - - :returns: Authorization Resource. - :rtype: `.AuthorizationResource` - - :raises errors.WildcardUnsupportedError: if a wildcard is requested - - """ - return self.request_challenges(messages.Identifier( - typ=messages.IDENTIFIER_FQDN, value=domain), new_authzr_uri) - - def request_issuance(self, csr: jose.ComparableX509, - authzrs: Iterable[messages.AuthorizationResource] - ) -> messages.CertificateResource: - """Request issuance. - - :param csr: CSR - :type csr: `OpenSSL.crypto.X509Req` wrapped in `.ComparableX509` - - :param authzrs: `list` of `.AuthorizationResource` - - :returns: Issued certificate - :rtype: `.messages.CertificateResource` - - """ - assert authzrs, "Authorizations list is empty" - logger.debug("Requesting issuance...") - - # TODO: assert len(authzrs) == number of SANs - req = messages.CertificateRequest(csr=csr) - - content_type = DER_CONTENT_TYPE # TODO: add 'cert_type 'argument - response = self._post( - self.directory.new_cert, - req, - content_type=content_type, - headers={'Accept': content_type}) - - cert_chain_uri = response.links.get('up', {}).get('url') - - try: - uri = response.headers['Location'] - except KeyError: - raise errors.ClientError('"Location" Header missing') - - return messages.CertificateResource( - uri=uri, authzrs=authzrs, cert_chain_uri=cert_chain_uri, - body=jose.ComparableX509(OpenSSL.crypto.load_certificate( - OpenSSL.crypto.FILETYPE_ASN1, response.content))) - - def poll(self, authzr: messages.AuthorizationResource - ) -> Tuple[messages.AuthorizationResource, requests.Response]: - """Poll Authorization Resource for status. - - :param authzr: Authorization Resource - :type authzr: `.AuthorizationResource` - - :returns: Updated Authorization Resource and HTTP response. - - :rtype: (`.AuthorizationResource`, `requests.Response`) - - """ - response = self.net.get(authzr.uri) - updated_authzr = self._authzr_from_response( - response, authzr.body.identifier, authzr.uri) - return updated_authzr, response - - def poll_and_request_issuance(self, csr: jose.ComparableX509, - authzrs: Iterable[messages.AuthorizationResource], - mintime: int = 5, max_attempts: int = 10 - ) -> Tuple[messages.CertificateResource, - Tuple[messages.AuthorizationResource, ...]]: - """Poll and request issuance. - - This function polls all provided Authorization Resource URIs - until all challenges are valid, respecting ``Retry-After`` HTTP - headers, and then calls `request_issuance`. - - :param .ComparableX509 csr: CSR (`OpenSSL.crypto.X509Req` - wrapped in `.ComparableX509`) - :param authzrs: `list` of `.AuthorizationResource` - :param int mintime: Minimum time before next attempt, used if - ``Retry-After`` is not present in the response. - :param int max_attempts: Maximum number of attempts (per - authorization) before `PollError` with non-empty ``waiting`` - is raised. - - :returns: ``(cert, updated_authzrs)`` `tuple` where ``cert`` is - the issued certificate (`.messages.CertificateResource`), - and ``updated_authzrs`` is a `tuple` consisting of updated - Authorization Resources (`.AuthorizationResource`) as - present in the responses from server, and in the same order - as the input ``authzrs``. - :rtype: `tuple` - - :raises PollError: in case of timeout or if some authorization - was marked by the CA as invalid - - """ - assert max_attempts > 0 - attempts: Dict[messages.AuthorizationResource, int] = collections.defaultdict(int) - exhausted = set() - - # priority queue with datetime.datetime (based on Retry-After) as key, - # and original Authorization Resource as value - waiting = [ - (datetime.datetime.now(), index, authzr) - for index, authzr in enumerate(authzrs) - ] - heapq.heapify(waiting) - # mapping between original Authorization Resource and the most - # recently updated one - updated = {authzr: authzr for authzr in authzrs} - - while waiting: - # find the smallest Retry-After, and sleep if necessary - when, index, authzr = heapq.heappop(waiting) - now = datetime.datetime.now() - if when > now: - seconds = (when - now).seconds - logger.debug('Sleeping for %d seconds', seconds) - time.sleep(seconds) - - # Note that we poll with the latest updated Authorization - # URI, which might have a different URI than initial one - updated_authzr, response = self.poll(updated[authzr]) - updated[authzr] = updated_authzr - - attempts[authzr] += 1 - if updated_authzr.body.status not in ( # pylint: disable=no-member - messages.STATUS_VALID, messages.STATUS_INVALID): - if attempts[authzr] < max_attempts: - # push back to the priority queue, with updated retry_after - heapq.heappush(waiting, (self.retry_after( - response, default=mintime), index, authzr)) - else: - exhausted.add(authzr) - - if exhausted or any(authzr.body.status == messages.STATUS_INVALID - for authzr in updated.values()): - raise errors.PollError(exhausted, updated) - - updated_authzrs = tuple(updated[authzr] for authzr in authzrs) - return self.request_issuance(csr, updated_authzrs), updated_authzrs - - def _get_cert(self, uri: str) -> Tuple[requests.Response, jose.ComparableX509]: - """Returns certificate from URI. - - :param str uri: URI of certificate - - :returns: tuple of the form - (response, :class:`josepy.util.ComparableX509`) - :rtype: tuple - - """ - content_type = DER_CONTENT_TYPE # TODO: make it a param - response = self.net.get(uri, headers={'Accept': content_type}, - content_type=content_type) - return response, jose.ComparableX509(OpenSSL.crypto.load_certificate( - OpenSSL.crypto.FILETYPE_ASN1, response.content)) - - def check_cert(self, certr: messages.CertificateResource) -> messages.CertificateResource: - """Check for new cert. - - :param certr: Certificate Resource - :type certr: `.CertificateResource` - - :returns: Updated Certificate Resource. - :rtype: `.CertificateResource` - - """ - # TODO: acme-spec 5.1 table action should be renamed to - # "refresh cert", and this method integrated with self.refresh - response, cert = self._get_cert(certr.uri) - if 'Location' not in response.headers: - raise errors.ClientError('Location header missing') - if response.headers['Location'] != certr.uri: - raise errors.UnexpectedUpdate(response.text) - return certr.update(body=cert) - - def refresh(self, certr: messages.CertificateResource) -> messages.CertificateResource: - """Refresh certificate. - - :param certr: Certificate Resource - :type certr: `.CertificateResource` - - :returns: Updated Certificate Resource. - :rtype: `.CertificateResource` - - """ - # TODO: If a client sends a refresh request and the server is - # not willing to refresh the certificate, the server MUST - # respond with status code 403 (Forbidden) - return self.check_cert(certr) - - def fetch_chain(self, certr: messages.CertificateResource, - max_length: int = 10) -> List[jose.ComparableX509]: - """Fetch chain for certificate. - - :param .CertificateResource certr: Certificate Resource - :param int max_length: Maximum allowed length of the chain. - Note that each element in the certificate requires new - ``HTTP GET`` request, and the length of the chain is - controlled by the ACME CA. - - :raises errors.Error: if recursion exceeds `max_length` - - :returns: Certificate chain for the Certificate Resource. It is - a list ordered so that the first element is a signer of the - certificate from Certificate Resource. Will be empty if - ``cert_chain_uri`` is ``None``. - :rtype: `list` of `OpenSSL.crypto.X509` wrapped in `.ComparableX509` - - """ - chain: List[jose.ComparableX509] = [] - uri = certr.cert_chain_uri - while uri is not None and len(chain) < max_length: - response, cert = self._get_cert(uri) - uri = response.links.get('up', {}).get('url') - chain.append(cert) - if uri is not None: - raise errors.Error( - "Recursion limit reached. Didn't get {0}".format(uri)) - return chain - - def revoke(self, cert: jose.ComparableX509, rsn: int) -> None: - """Revoke certificate. - - :param .ComparableX509 cert: `OpenSSL.crypto.X509` wrapped in - `.ComparableX509` - - :param int rsn: Reason code for certificate revocation. - - :raises .ClientError: If revocation is unsuccessful. - - """ - self._revoke(cert, rsn, self.directory[messages.Revocation]) - - -class ClientV2(ClientBase): +class ClientV2: """ACME client for a v2 API. :ivar messages.Directory directory: @@ -617,7 +47,8 @@ class ClientV2(ClientBase): :param .messages.Directory directory: Directory Resource :param .ClientNetwork net: Client network. """ - super().__init__(directory=directory, net=net, acme_version=2) + self.directory = directory + self.net = net def new_account(self, new_account: messages.NewRegistration) -> messages.RegistrationResource: """Register. @@ -664,8 +95,13 @@ class ClientV2(ClientBase): """ # https://github.com/certbot/certbot/issues/6155 - new_regr = self._get_v2_account(regr) - return super().update_registration(new_regr, update) + regr = self._get_v2_account(regr) + + update = regr.body if update is None else update + body = messages.UpdateRegistration(**dict(update)) + updated_regr = self._send_recv_regr(regr, body=body) + self.net.account = updated_regr + return updated_regr def _get_v2_account(self, regr: messages.RegistrationResource, update_body: bool = False ) -> messages.RegistrationResource: @@ -827,7 +263,9 @@ class ClientV2(ClientBase): def external_account_required(self) -> bool: """Checks if ACME server requires External Account Binding authentication.""" - return hasattr(self.directory, 'meta') and self.directory.meta.external_account_required + return hasattr(self.directory, 'meta') and \ + hasattr(self.directory.meta, 'external_account_required') and \ + self.directory.meta.external_account_required def _post_as_get(self, *args: Any, **kwargs: Any) -> requests.Response: """ @@ -853,138 +291,156 @@ class ClientV2(ClientBase): return [l['url'] for l in links if 'rel' in l and 'url' in l and l['rel'] == relation_type] + @classmethod + def get_directory(cls, url: str, net: 'ClientNetwork') -> messages.Directory: + """ + Retrieves the ACME directory (RFC 8555 section 7.1.1) from the ACME server. + :param str url: the URL where the ACME directory is available + :param ClientNetwork net: the ClientNetwork to use to make the request -class BackwardsCompatibleClientV2: - """ACME client wrapper that tends towards V2-style calls, but - supports V1 servers. + :returns: the ACME directory object + :rtype: messages.Directory + """ + return messages.Directory.from_json(net.get(url).json()) - .. deprecated:: 1.18.0 - Use :class:`ClientV2` instead. - - .. note:: While this class handles the majority of the differences - between versions of the ACME protocol, if you need to support an - ACME server based on version 3 or older of the IETF ACME draft - that uses combinations in authorizations (or lack thereof) to - signal that the client needs to complete something other than - any single challenge in the authorization to make it valid, the - user of this class needs to understand and handle these - differences themselves. This does not apply to either of Let's - Encrypt's endpoints where successfully completing any challenge - in an authorization will make it valid. - - :ivar int acme_version: 1 or 2, corresponding to the Let's Encrypt endpoint - :ivar .ClientBase client: either Client or ClientV2 - """ - - def __init__(self, net: 'ClientNetwork', key: jose.JWK, server: str) -> None: - directory = messages.Directory.from_json(net.get(server).json()) - self.acme_version = self._acme_version_from_directory(directory) - self.client: Union[Client, ClientV2] - if self.acme_version == 1: - self.client = Client(directory, key=key, net=net) - else: - self.client = ClientV2(directory, net=net) - - def __getattr__(self, name: str) -> Any: - return getattr(self.client, name) - - def new_account_and_tos(self, regr: messages.NewRegistration, - check_tos_cb: Optional[Callable[[str], None]] = None + @classmethod + def _regr_from_response(cls, response: requests.Response, uri: Optional[str] = None, + terms_of_service: Optional[str] = None ) -> messages.RegistrationResource: - """Combined register and agree_tos for V1, new_account for V2 + if 'terms-of-service' in response.links: + terms_of_service = response.links['terms-of-service']['url'] - :param .NewRegistration regr: - :param callable check_tos_cb: callback that raises an error if - the check does not work - """ - def _assess_tos(tos: str) -> None: - if check_tos_cb is not None: - check_tos_cb(tos) - if self.acme_version == 1: - client_v1 = cast(Client, self.client) - regr_res = client_v1.register(regr) - if regr_res.terms_of_service is not None: - _assess_tos(regr_res.terms_of_service) - return client_v1.agree_to_tos(regr_res) - return regr_res - else: - client_v2 = cast(ClientV2, self.client) - if ("terms_of_service" in client_v2.directory.meta and - client_v2.directory.meta.terms_of_service is not None): - _assess_tos(client_v2.directory.meta.terms_of_service) - regr = regr.update(terms_of_service_agreed=True) - return client_v2.new_account(regr) + return messages.RegistrationResource( + body=messages.Registration.from_json(response.json()), + uri=response.headers.get('Location', uri), + terms_of_service=terms_of_service) - def new_order(self, csr_pem: bytes) -> messages.OrderResource: - """Request a new Order object from the server. + def _send_recv_regr(self, regr: messages.RegistrationResource, + body: messages.Registration) -> messages.RegistrationResource: + response = self._post(regr.uri, body) - If using ACMEv1, returns a dummy OrderResource with only - the authorizations field filled in. + # TODO: Boulder returns httplib.ACCEPTED + #assert response.status_code == httplib.OK - :param bytes csr_pem: A CSR in PEM format. + # TODO: Boulder does not set Location or Link on update + # (c.f. acme-spec #94) - :returns: The newly created order. - :rtype: OrderResource + return self._regr_from_response( + response, uri=regr.uri, + terms_of_service=regr.terms_of_service) - :raises errors.WildcardUnsupportedError: if a wildcard domain is - requested but unsupported by the ACME version + def _post(self, *args: Any, **kwargs: Any) -> requests.Response: + """Wrapper around self.net.post that adds the newNonce URL. + + This is used to retry the request in case of a badNonce error. """ - if self.acme_version == 1: - client_v1 = cast(Client, self.client) - csr = OpenSSL.crypto.load_certificate_request(OpenSSL.crypto.FILETYPE_PEM, csr_pem) - # pylint: disable=protected-access - dnsNames = crypto_util._pyopenssl_cert_or_req_all_names(csr) - authorizations = [] - for domain in dnsNames: - authorizations.append(client_v1.request_domain_challenges(domain)) - return messages.OrderResource(authorizations=authorizations, csr_pem=csr_pem) - return cast(ClientV2, self.client).new_order(csr_pem) + kwargs.setdefault('new_nonce_url', getattr(self.directory, 'newNonce')) + return self.net.post(*args, **kwargs) - def finalize_order(self, orderr: messages.OrderResource, deadline: datetime.datetime, - fetch_alternative_chains: bool = False) -> messages.OrderResource: - """Finalize an order and obtain a certificate. + def deactivate_registration(self, regr: messages.RegistrationResource + ) -> messages.RegistrationResource: + """Deactivate registration. - :param messages.OrderResource orderr: order to finalize - :param datetime.datetime deadline: when to stop polling and timeout - :param bool fetch_alternative_chains: whether to also fetch alternative - certificate chains + :param messages.RegistrationResource regr: The Registration Resource + to be deactivated. - :returns: finalized order - :rtype: messages.OrderResource + :returns: The Registration resource that was deactivated. + :rtype: `.RegistrationResource` """ - if self.acme_version == 1: - client_v1 = cast(Client, self.client) - csr_pem = orderr.csr_pem - certr = client_v1.request_issuance( - jose.ComparableX509( - OpenSSL.crypto.load_certificate_request(OpenSSL.crypto.FILETYPE_PEM, csr_pem)), - orderr.authorizations) + return self.update_registration(regr, messages.Registration.from_json( + {"status": "deactivated", "contact": None})) - chain = None - while datetime.datetime.now() < deadline: + def deactivate_authorization(self, + authzr: messages.AuthorizationResource + ) -> messages.AuthorizationResource: + """Deactivate authorization. + + :param messages.AuthorizationResource authzr: The Authorization resource + to be deactivated. + + :returns: The Authorization resource that was deactivated. + :rtype: `.AuthorizationResource` + + """ + body = messages.UpdateAuthorization(status='deactivated') + response = self._post(authzr.uri, body) + return self._authzr_from_response(response, + authzr.body.identifier, authzr.uri) + + def _authzr_from_response(self, response: requests.Response, + identifier: Optional[messages.Identifier] = None, + uri: Optional[str] = None) -> messages.AuthorizationResource: + authzr = messages.AuthorizationResource( + body=messages.Authorization.from_json(response.json()), + uri=response.headers.get('Location', uri)) + if identifier is not None and authzr.body.identifier != identifier: # pylint: disable=no-member + raise errors.UnexpectedUpdate(authzr) + return authzr + + def answer_challenge(self, challb: messages.ChallengeBody, + response: challenges.ChallengeResponse) -> messages.ChallengeResource: + """Answer challenge. + + :param challb: Challenge Resource body. + :type challb: `.ChallengeBody` + + :param response: Corresponding Challenge response + :type response: `.challenges.ChallengeResponse` + + :returns: Challenge Resource with updated body. + :rtype: `.ChallengeResource` + + :raises .UnexpectedUpdate: + + """ + resp = self._post(challb.uri, response) + try: + authzr_uri = resp.links['up']['url'] + except KeyError: + raise errors.ClientError('"up" Link header missing') + challr = messages.ChallengeResource( + authzr_uri=authzr_uri, + body=messages.ChallengeBody.from_json(resp.json())) + # TODO: check that challr.uri == resp.headers['Location']? + if challr.uri != challb.uri: + raise errors.UnexpectedUpdate(challr.uri) + return challr + + @classmethod + def retry_after(cls, response: requests.Response, default: int) -> datetime.datetime: + """Compute next `poll` time based on response ``Retry-After`` header. + + Handles integers and various datestring formats per + https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.37 + + :param requests.Response response: Response from `poll`. + :param int default: Default value (in seconds), used when + ``Retry-After`` header is not present or invalid. + + :returns: Time point when next `poll` should be performed. + :rtype: `datetime.datetime` + + """ + retry_after = response.headers.get('Retry-After', str(default)) + try: + seconds = int(retry_after) + except ValueError: + # The RFC 2822 parser handles all of RFC 2616's cases in modern + # environments (primarily HTTP 1.1+ but also py27+) + when = parsedate_tz(retry_after) + if when is not None: try: - chain = client_v1.fetch_chain(certr) - break - except errors.Error: - time.sleep(1) + tz_secs = datetime.timedelta(when[-1] if when[-1] is not None else 0) + return datetime.datetime(*when[:7]) - tz_secs + except (ValueError, OverflowError): + pass + seconds = default - if chain is None: - raise errors.TimeoutError( - 'Failed to fetch chain. You should not deploy the generated ' - 'certificate, please rerun the command for a new one.') + return datetime.datetime.now() + datetime.timedelta(seconds=seconds) - cert = OpenSSL.crypto.dump_certificate( - OpenSSL.crypto.FILETYPE_PEM, - cast(OpenSSL.crypto.X509, cast(jose.ComparableX509, certr.body).wrapped)).decode() - chain_str = crypto_util.dump_pyopenssl_chain(chain).decode() - - return orderr.update(fullchain_pem=(cert + chain_str)) - return cast(ClientV2, self.client).finalize_order( - orderr, deadline, fetch_alternative_chains) - - def revoke(self, cert: jose.ComparableX509, rsn: int) -> None: + def _revoke(self, cert: jose.ComparableX509, rsn: int, url: str) -> None: """Revoke certificate. :param .ComparableX509 cert: `OpenSSL.crypto.X509` wrapped in @@ -992,23 +448,18 @@ class BackwardsCompatibleClientV2: :param int rsn: Reason code for certificate revocation. + :param str url: ACME URL to post to + :raises .ClientError: If revocation is unsuccessful. """ - self.client.revoke(cert, rsn) - - def _acme_version_from_directory(self, directory: messages.Directory) -> int: - if hasattr(directory, 'newNonce'): - return 2 - return 1 - - def external_account_required(self) -> bool: - """Checks if the server requires an external account for ACMEv2 servers. - - Always return False for ACMEv1 servers, as it doesn't use External Account Binding.""" - if self.acme_version == 1: - return False - return cast(ClientV2, self.client).external_account_required() + response = self._post(url, + messages.Revocation( + certificate=cert, + reason=rsn)) + if response.status_code != http_client.OK: + raise errors.ClientError( + 'Successful revocation must return HTTP OK status') class ClientNetwork: @@ -1025,8 +476,8 @@ class ClientNetwork: :param josepy.JWK key: Account private key :param messages.RegistrationResource account: Account object. Required if you are - planning to use .post() with acme_version=2 for anything other than - creating a new account; may be set later after registering. + planning to use .post() for anything other than creating a new account; + may be set later after registering. :param josepy.JWASignature alg: Algorithm to use in signing JWS. :param bool verify_ssl: Whether to verify certificates on SSL connections. :param str user_agent: String to send as User-Agent header. @@ -1062,8 +513,7 @@ class ClientNetwork: except Exception: # pylint: disable=broad-except pass - def _wrap_in_jws(self, obj: jose.JSONDeSerializable, nonce: str, url: str, - acme_version: int) -> str: + def _wrap_in_jws(self, obj: jose.JSONDeSerializable, nonce: str, url: str) -> str: """Wrap `JSONDeSerializable` object in JWS. .. todo:: Implement ``acmePath``. @@ -1074,20 +524,17 @@ class ClientNetwork: :rtype: str """ - if isinstance(obj, VersionedLEACMEMixin): - obj.le_acme_version = acme_version jobj = obj.json_dumps(indent=2).encode() if obj else b'' logger.debug('JWS payload:\n%s', jobj) kwargs = { "alg": self.alg, "nonce": nonce, + "url": url } - if acme_version == 2: - kwargs["url"] = url - # newAccount and revokeCert work without the kid - # newAccount must not have kid - if self.account is not None: - kwargs["kid"] = self.account["uri"] + # newAccount and revokeCert work without the kid + # newAccount must not have kid + if self.account is not None: + kwargs["kid"] = self.account["uri"] kwargs["key"] = self.key return jws.JWS.sign(jobj, **cast(Mapping[str, Any], kwargs)).json_dumps(indent=2) @@ -1201,15 +648,11 @@ class ClientNetwork: host, path, _err_no, err_msg = m.groups() raise ValueError(f"Requesting {host}{path}:{err_msg}") - # If the Content-Type is DER or an Accept header was sent in the - # request, the response may not be UTF-8 encoded. In this case, we - # don't set response.encoding and log the base64 response instead of - # raw bytes to keep binary data out of the logs. This code can be - # simplified to only check for an Accept header in the request when - # ACMEv1 support is dropped. + # If an Accept header was sent in the request, the response may not be + # UTF-8 encoded. In this case, we don't set response.encoding and log + # the base64 response instead of raw bytes to keep binary data out of the logs. debug_content: Union[bytes, str] - if (response.headers.get("Content-Type") == DER_CONTENT_TYPE or - "Accept" in kwargs["headers"]): + if "Accept" in kwargs["headers"]: debug_content = base64.b64encode(response.content) else: # We set response.encoding so response.text knows the response is @@ -1280,44 +723,11 @@ class ClientNetwork: raise def _post_once(self, url: str, obj: jose.JSONDeSerializable, - content_type: str = JOSE_CONTENT_TYPE, acme_version: int = 1, - **kwargs: Any) -> requests.Response: + content_type: str = JOSE_CONTENT_TYPE, **kwargs: Any) -> requests.Response: new_nonce_url = kwargs.pop('new_nonce_url', None) - data = self._wrap_in_jws(obj, self._get_nonce(url, new_nonce_url), url, acme_version) + data = self._wrap_in_jws(obj, self._get_nonce(url, new_nonce_url), url) kwargs.setdefault('headers', {'Content-Type': content_type}) response = self._send_request('POST', url, data=data, **kwargs) response = self._check_response(response, content_type=content_type) self._add_nonce(response) return response - - -# This class takes a similar approach to the cryptography project to deprecate attributes -# in public modules. See the _ModuleWithDeprecation class here: -# https://github.com/pyca/cryptography/blob/91105952739442a74582d3e62b3d2111365b0dc7/src/cryptography/utils.py#L129 -class _ClientDeprecationModule: - """ - Internal class delegating to a module, and displaying warnings when attributes - related to deprecated attributes in the acme.client module. - """ - def __init__(self, module: ModuleType) -> None: - self.__dict__['_module'] = module - - def __getattr__(self, attr: str) -> Any: - if attr in ('Client', 'BackwardsCompatibleClientV2'): - warnings.warn('The {0} attribute in acme.client is deprecated ' - 'and will be removed soon.'.format(attr), - DeprecationWarning, stacklevel=2) - return getattr(self._module, attr) - - def __setattr__(self, attr: str, value: Any) -> None: # pragma: no cover - setattr(self._module, attr, value) - - def __delattr__(self, attr: str) -> None: # pragma: no cover - delattr(self._module, attr) - - def __dir__(self) -> List[str]: # pragma: no cover - return ['_module'] + dir(self._module) - - -# Patching ourselves to warn about deprecation and planned removal of some elements in the module. -sys.modules[__name__] = cast(ModuleType, _ClientDeprecationModule(sys.modules[__name__])) diff --git a/acme/acme/fields.py b/acme/acme/fields.py index 191231df2..d642d10c5 100644 --- a/acme/acme/fields.py +++ b/acme/acme/fields.py @@ -51,22 +51,6 @@ class RFC3339Field(jose.Field): raise jose.DeserializationError(error) -class Resource(jose.Field): - """Resource MITM field.""" - - def __init__(self, resource_type: str, *args: Any, **kwargs: Any) -> None: - self.resource_type = resource_type - kwargs['default'] = resource_type - super().__init__('resource', *args, **kwargs) - - def decode(self, value: Any) -> Any: - if value != self.resource_type: - raise jose.DeserializationError( - 'Wrong resource type: {0} instead of {1}'.format( - value, self.resource_type)) - return value - - def fixed(json_name: str, value: Any) -> Any: """Generates a type-friendly Fixed field.""" return Fixed(json_name, value) @@ -75,8 +59,3 @@ def fixed(json_name: str, value: Any) -> Any: def rfc3339(json_name: str, omitempty: bool = False) -> Any: """Generates a type-friendly RFC3339 field.""" return RFC3339Field(json_name, omitempty=omitempty) - - -def resource(resource_type: str) -> Any: - """Generates a type-friendly Resource field.""" - return Resource(resource_type) diff --git a/acme/acme/magic_typing.py b/acme/acme/magic_typing.py deleted file mode 100644 index b05d2c4bc..000000000 --- a/acme/acme/magic_typing.py +++ /dev/null @@ -1,18 +0,0 @@ -"""Simple shim around the typing module. - -This was useful when this code supported Python 2 and typing wasn't always -available. This code is being kept for now for backwards compatibility. - -""" -import warnings -from typing import * # pylint: disable=wildcard-import, unused-wildcard-import -from typing import Any - -warnings.warn("acme.magic_typing is deprecated and will be removed in a future release.", - DeprecationWarning) - - -class TypingClass: - """Ignore import errors by getting anything""" - def __getattr__(self, name: str) -> Any: - return None # pragma: no cover diff --git a/acme/acme/messages.py b/acme/acme/messages.py index 9b9ef5de2..6c4e23815 100644 --- a/acme/acme/messages.py +++ b/acme/acme/messages.py @@ -11,9 +11,7 @@ from typing import MutableMapping from typing import Optional from typing import Tuple from typing import Type -from typing import TYPE_CHECKING from typing import TypeVar -from typing import Union import josepy as jose @@ -22,14 +20,8 @@ from acme import errors from acme import fields from acme import jws from acme import util -from acme.mixins import ResourceMixin -if TYPE_CHECKING: - from typing_extensions import Protocol # pragma: no cover -else: - Protocol = object -OLD_ERROR_PREFIX = "urn:acme:error:" ERROR_PREFIX = "urn:ietf:params:acme:error:" ERROR_CODES = { @@ -67,15 +59,13 @@ ERROR_CODES = { ERROR_TYPE_DESCRIPTIONS = {**{ ERROR_PREFIX + name: desc for name, desc in ERROR_CODES.items() -}, **{ # add errors with old prefix, deprecate me - OLD_ERROR_PREFIX + name: desc for name, desc in ERROR_CODES.items() }} def is_acme_error(err: BaseException) -> bool: """Check if argument is an ACME error.""" if isinstance(err, Error) and (err.typ is not None): - return (ERROR_PREFIX in err.typ) or (OLD_ERROR_PREFIX in err.typ) + return ERROR_PREFIX in err.typ return False @@ -223,25 +213,15 @@ STATUS_READY = Status('ready') STATUS_DEACTIVATED = Status('deactivated') -class HasResourceType(Protocol): - """ - Represents a class with a resource_type class parameter of type string. - """ - resource_type: str = NotImplemented - - -GenericHasResourceType = TypeVar("GenericHasResourceType", bound=HasResourceType) - - class Directory(jose.JSONDeSerializable): - """Directory.""" + """Directory. - _REGISTERED_TYPES: Dict[str, Type[HasResourceType]] = {} + Directory resources must be accessed by the exact field name in RFC8555 (section 9.7.5). + """ class Meta(jose.JSONObjectWithFields): """Directory Meta.""" - _terms_of_service: str = jose.field('terms-of-service', omitempty=True) - _terms_of_service_v2: str = jose.field('termsOfService', omitempty=True) + _terms_of_service: str = jose.field('termsOfService', omitempty=True) website: str = jose.field('website', omitempty=True) caa_identities: List[str] = jose.field('caaIdentities', omitempty=True) external_account_required: bool = jose.field('externalAccountRequired', omitempty=True) @@ -253,7 +233,7 @@ class Directory(jose.JSONDeSerializable): @property def terms_of_service(self) -> str: """URL for the CA TOS""" - return self._terms_of_service or self._terms_of_service_v2 + return self._terms_of_service def __iter__(self) -> Iterator[str]: # When iterating over fields, use the external name 'terms_of_service' instead of @@ -264,41 +244,23 @@ class Directory(jose.JSONDeSerializable): def _internal_name(self, name: str) -> str: return '_' + name if name == 'terms_of_service' else name - @classmethod - def _canon_key(cls, key: Union[str, HasResourceType, Type[HasResourceType]]) -> str: - if isinstance(key, str): - return key - return key.resource_type - - @classmethod - def register(cls, - resource_body_cls: Type[GenericHasResourceType]) -> Type[GenericHasResourceType]: - """Register resource.""" - resource_type = resource_body_cls.resource_type - assert resource_type not in cls._REGISTERED_TYPES - cls._REGISTERED_TYPES[resource_type] = resource_body_cls - return resource_body_cls - def __init__(self, jobj: Mapping[str, Any]) -> None: - canon_jobj = util.map_keys(jobj, self._canon_key) - # TODO: check that everything is an absolute URL; acme-spec is - # not clear on that - self._jobj = canon_jobj + self._jobj = jobj def __getattr__(self, name: str) -> Any: try: - return self[name.replace('_', '-')] + return self[name] except KeyError as error: raise AttributeError(str(error)) - def __getitem__(self, name: Union[str, HasResourceType, Type[HasResourceType]]) -> Any: + def __getitem__(self, name: str) -> Any: try: - return self._jobj[self._canon_key(name)] + return self._jobj[name] except KeyError: - raise KeyError('Directory field "' + self._canon_key(name) + '" not found') + raise KeyError(f'Directory field "{name}" not found') def to_partial_json(self) -> Dict[str, Any]: - return self._jobj + return util.map_keys(self._jobj, lambda k: k) @classmethod def from_json(cls, jobj: MutableMapping[str, Any]) -> 'Directory': @@ -459,17 +421,12 @@ class Registration(ResourceBody): return self._filter_contact(self.email_prefix) -@Directory.register -class NewRegistration(ResourceMixin, Registration): +class NewRegistration(Registration): """New registration.""" - resource_type = 'new-reg' - resource: str = fields.resource(resource_type) -class UpdateRegistration(ResourceMixin, Registration): +class UpdateRegistration(Registration): """Update registration.""" - resource_type = 'reg' - resource: str = fields.resource(resource_type) class RegistrationResource(ResourceWithURI): @@ -507,7 +464,6 @@ class ChallengeBody(ResourceBody): # challenge object supports either one, but should be accessed through the # name "uri". In Client.answer_challenge, whichever one is set will be # used. - _uri: str = jose.field('uri', omitempty=True, default=None) _url: str = jose.field('url', omitempty=True, default=None) status: Status = jose.field('status', decoder=Status.from_json, omitempty=True, default=STATUS_PENDING) @@ -536,7 +492,7 @@ class ChallengeBody(ResourceBody): @property def uri(self) -> str: """The URL of this challenge.""" - return self._url or self._uri + return self._url def __getattr__(self, name: str) -> Any: return getattr(self.chall, name) @@ -545,10 +501,10 @@ class ChallengeBody(ResourceBody): # When iterating over fields, use the external name 'uri' instead of # the internal '_uri'. for name in super().__iter__(): - yield name[1:] if name == '_uri' else name + yield 'uri' if name == '_url' else name def _internal_name(self, name: str) -> str: - return '_' + name if name == 'uri' else name + return '_url' if name == 'uri' else name class ChallengeResource(Resource): @@ -572,15 +528,12 @@ class Authorization(ResourceBody): :ivar acme.messages.Identifier identifier: :ivar list challenges: `list` of `.ChallengeBody` - :ivar tuple combinations: Challenge combinations (`tuple` of `tuple` - of `int`, as opposed to `list` of `list` from the spec). :ivar acme.messages.Status status: :ivar datetime.datetime expires: """ identifier: Identifier = jose.field('identifier', decoder=Identifier.from_json, omitempty=True) challenges: List[ChallengeBody] = jose.field('challenges', omitempty=True) - combinations: Tuple[Tuple[int, ...], ...] = jose.field('combinations', omitempty=True) status: Status = jose.field('status', omitempty=True, decoder=Status.from_json) # TODO: 'expires' is allowed for Authorization Resources in @@ -596,24 +549,13 @@ class Authorization(ResourceBody): def challenges(value: List[Dict[str, Any]]) -> Tuple[ChallengeBody, ...]: # type: ignore[misc] # pylint: disable=no-self-argument,missing-function-docstring return tuple(ChallengeBody.from_json(chall) for chall in value) - @property - def resolved_combinations(self) -> Tuple[Tuple[ChallengeBody, ...], ...]: - """Combinations with challenges instead of indices.""" - return tuple(tuple(self.challenges[idx] for idx in combo) - for combo in self.combinations) # pylint: disable=not-an-iterable - -@Directory.register -class NewAuthorization(ResourceMixin, Authorization): +class NewAuthorization(Authorization): """New authorization.""" - resource_type = 'new-authz' - resource: str = fields.resource(resource_type) -class UpdateAuthorization(ResourceMixin, Authorization): +class UpdateAuthorization(Authorization): """Update authorization.""" - resource_type = 'authz' - resource: str = fields.resource(resource_type) class AuthorizationResource(ResourceWithURI): @@ -627,16 +569,13 @@ class AuthorizationResource(ResourceWithURI): new_cert_uri: str = jose.field('new_cert_uri', omitempty=True) -@Directory.register -class CertificateRequest(ResourceMixin, jose.JSONObjectWithFields): - """ACME new-cert request. +class CertificateRequest(jose.JSONObjectWithFields): + """ACME newOrder request. :ivar jose.ComparableX509 csr: `OpenSSL.crypto.X509Req` wrapped in `.ComparableX509` """ - resource_type = 'new-cert' - resource: str = fields.resource(resource_type) csr: jose.ComparableX509 = jose.field('csr', decoder=jose.decode_csr, encoder=jose.encode_csr) @@ -653,16 +592,13 @@ class CertificateResource(ResourceWithURI): authzrs: Tuple[AuthorizationResource, ...] = jose.field('authzrs') -@Directory.register -class Revocation(ResourceMixin, jose.JSONObjectWithFields): +class Revocation(jose.JSONObjectWithFields): """Revocation message. :ivar jose.ComparableX509 certificate: `OpenSSL.crypto.X509` wrapped in `jose.ComparableX509` """ - resource_type = 'revoke-cert' - resource: str = fields.resource(resource_type) certificate: jose.ComparableX509 = jose.field( 'certificate', decoder=jose.decode_cert, encoder=jose.encode_cert) reason: int = jose.field('reason') @@ -719,7 +655,5 @@ class OrderResource(ResourceWithURI): omitempty=True) -@Directory.register class NewOrder(Order): """New order.""" - resource_type = 'new-order' diff --git a/acme/acme/mixins.py b/acme/acme/mixins.py deleted file mode 100644 index e6e678d60..000000000 --- a/acme/acme/mixins.py +++ /dev/null @@ -1,68 +0,0 @@ -"""Useful mixins for Challenge and Resource objects""" -from typing import Any -from typing import Dict - - -class VersionedLEACMEMixin: - """This mixin stores the version of Let's Encrypt's endpoint being used.""" - @property - def le_acme_version(self) -> int: - """Define the version of ACME protocol to use""" - return getattr(self, '_le_acme_version', 1) - - @le_acme_version.setter - def le_acme_version(self, version: int) -> None: - # We need to use object.__setattr__ to not depend on the specific implementation of - # __setattr__ in current class (eg. jose.TypedJSONObjectWithFields raises AttributeError - # for any attempt to set an attribute to make objects immutable). - object.__setattr__(self, '_le_acme_version', version) - - def __setattr__(self, key: str, value: Any) -> None: - if key == 'le_acme_version': - # Required for @property to operate properly. See comment above. - object.__setattr__(self, key, value) - else: - super().__setattr__(key, value) # pragma: no cover - - -class ResourceMixin(VersionedLEACMEMixin): - """ - This mixin generates a RFC8555 compliant JWS payload - by removing the `resource` field if needed (eg. ACME v2 protocol). - """ - def to_partial_json(self) -> Dict[str, Any]: - """See josepy.JSONDeserializable.to_partial_json()""" - return _safe_jobj_compliance(super(), - 'to_partial_json', 'resource') - - def fields_to_partial_json(self) -> Dict[str, Any]: - """See josepy.JSONObjectWithFields.fields_to_partial_json()""" - return _safe_jobj_compliance(super(), - 'fields_to_partial_json', 'resource') - - -class TypeMixin(VersionedLEACMEMixin): - """ - This mixin allows generation of a RFC8555 compliant JWS payload - by removing the `type` field if needed (eg. ACME v2 protocol). - """ - def to_partial_json(self) -> Dict[str, Any]: - """See josepy.JSONDeserializable.to_partial_json()""" - return _safe_jobj_compliance(super(), - 'to_partial_json', 'type') - - def fields_to_partial_json(self) -> Dict[str, Any]: - """See josepy.JSONObjectWithFields.fields_to_partial_json()""" - return _safe_jobj_compliance(super(), - 'fields_to_partial_json', 'type') - - -def _safe_jobj_compliance(instance: Any, jobj_method: str, - uncompliant_field: str) -> Dict[str, Any]: - if hasattr(instance, jobj_method): - jobj: Dict[str, Any] = getattr(instance, jobj_method)() - if instance.le_acme_version == 2: - jobj.pop(uncompliant_field, None) - return jobj - - raise AttributeError(f'Method {jobj_method}() is not implemented.') # pragma: no cover diff --git a/acme/examples/http01_example.py b/acme/examples/http01_example.py index 2dc197d09..ab62ecbcc 100644 --- a/acme/examples/http01_example.py +++ b/acme/examples/http01_example.py @@ -163,7 +163,7 @@ def example_http(): # Register account and accept TOS net = client.ClientNetwork(acc_key, user_agent=USER_AGENT) - directory = messages.Directory.from_json(net.get(DIRECTORY_URL).json()) + directory = client.ClientV2.get_directory(DIRECTORY_URL, net) client_acme = client.ClientV2(directory, net=net) # Terms of Service URL is in client_acme.directory.meta.terms_of_service @@ -215,8 +215,7 @@ def example_http(): try: regr = client_acme.query_registration(regr) except errors.Error as err: - if err.typ == messages.OLD_ERROR_PREFIX + 'unauthorized' \ - or err.typ == messages.ERROR_PREFIX + 'unauthorized': + if err.typ == messages.ERROR_PREFIX + 'unauthorized': # Status is deactivated. pass raise diff --git a/acme/tests/challenges_test.py b/acme/tests/challenges_test.py index d7815a6c3..f9e886f64 100644 --- a/acme/tests/challenges_test.py +++ b/acme/tests/challenges_test.py @@ -92,8 +92,7 @@ class DNS01ResponseTest(unittest.TestCase): self.response = self.chall.response(KEY) def test_to_partial_json(self): - self.assertEqual({k: v for k, v in self.jmsg.items() if k != 'keyAuthorization'}, - self.msg.to_partial_json()) + self.assertEqual({}, self.msg.to_partial_json()) def test_from_json(self): from acme.challenges import DNS01Response @@ -163,8 +162,7 @@ class HTTP01ResponseTest(unittest.TestCase): self.response = self.chall.response(KEY) def test_to_partial_json(self): - self.assertEqual({k: v for k, v in self.jmsg.items() if k != 'keyAuthorization'}, - self.msg.to_partial_json()) + self.assertEqual({}, self.msg.to_partial_json()) def test_from_json(self): from acme.challenges import HTTP01Response @@ -274,8 +272,7 @@ class TLSALPN01ResponseTest(unittest.TestCase): } def test_to_partial_json(self): - self.assertEqual({k: v for k, v in self.jmsg.items() if k != 'keyAuthorization'}, - self.response.to_partial_json()) + self.assertEqual({}, self.response.to_partial_json()) def test_from_json(self): from acme.challenges import TLSALPN01Response @@ -461,8 +458,6 @@ class DNSResponseTest(unittest.TestCase): from acme.challenges import DNSResponse self.msg = DNSResponse(validation=self.validation) self.jmsg_to = { - 'resource': 'challenge', - 'type': 'dns', 'validation': self.validation, } self.jmsg_from = { @@ -492,7 +487,6 @@ class JWSPayloadRFC8555Compliant(unittest.TestCase): from acme.challenges import HTTP01Response challenge_body = HTTP01Response() - challenge_body.le_acme_version = 2 jobj = challenge_body.json_dumps(indent=2).encode() # RFC8555 states that challenge responses must have an empty payload. diff --git a/acme/tests/client_test.py b/acme/tests/client_test.py index 7ce28b4fe..1b00e0b90 100644 --- a/acme/tests/client_test.py +++ b/acme/tests/client_test.py @@ -3,52 +3,37 @@ import copy import datetime import http.client as http_client -import ipaddress import json import unittest from typing import Dict from unittest import mock import josepy as jose -import OpenSSL import requests from acme import challenges from acme import errors from acme import jws as acme_jws from acme import messages -from acme.mixins import VersionedLEACMEMixin +from acme.client import ClientV2 import messages_test import test_util -CERT_DER = test_util.load_vector('cert.der') CERT_SAN_PEM = test_util.load_vector('cert-san.pem') -CSR_SAN_PEM = test_util.load_vector('csr-san.pem') CSR_MIXED_PEM = test_util.load_vector('csr-mixed.pem') KEY = jose.JWKRSA.load(test_util.load_vector('rsa512_key.pem')) -KEY2 = jose.JWKRSA.load(test_util.load_vector('rsa256_key.pem')) - -DIRECTORY_V1 = messages.Directory({ - messages.NewRegistration: - 'https://www.letsencrypt-demo.org/acme/new-reg', - messages.Revocation: - 'https://www.letsencrypt-demo.org/acme/revoke-cert', - messages.NewAuthorization: - 'https://www.letsencrypt-demo.org/acme/new-authz', - messages.CertificateRequest: - 'https://www.letsencrypt-demo.org/acme/new-cert', -}) DIRECTORY_V2 = messages.Directory({ 'newAccount': 'https://www.letsencrypt-demo.org/acme/new-account', 'newNonce': 'https://www.letsencrypt-demo.org/acme/new-nonce', 'newOrder': 'https://www.letsencrypt-demo.org/acme/new-order', 'revokeCert': 'https://www.letsencrypt-demo.org/acme/revoke-cert', + 'meta': messages.Directory.Meta(), }) -class ClientTestBase(unittest.TestCase): - """Base for tests in acme.client.""" +class ClientV2Test(unittest.TestCase): + """Tests for acme.client.ClientV2.""" def setUp(self): self.response = mock.MagicMock( @@ -80,650 +65,15 @@ class ClientTestBase(unittest.TestCase): self.authz = messages.Authorization( identifier=messages.Identifier( typ=messages.IDENTIFIER_FQDN, value='example.com'), - challenges=(challb,), combinations=None) + challenges=(challb,)) self.authzr = messages.AuthorizationResource( body=self.authz, uri=authzr_uri) # Reason code for revocation self.rsn = 1 - -class BackwardsCompatibleClientV2Test(ClientTestBase): - """Tests for acme.client.BackwardsCompatibleClientV2.""" - - def setUp(self): - super().setUp() - # contains a loaded cert - self.certr = messages.CertificateResource( - body=messages_test.CERT) - - loaded = OpenSSL.crypto.load_certificate( - OpenSSL.crypto.FILETYPE_PEM, CERT_SAN_PEM) - wrapped = jose.ComparableX509(loaded) - self.chain = [wrapped, wrapped] - - self.cert_pem = OpenSSL.crypto.dump_certificate( - OpenSSL.crypto.FILETYPE_PEM, messages_test.CERT.wrapped).decode() - - single_chain = OpenSSL.crypto.dump_certificate( - OpenSSL.crypto.FILETYPE_PEM, loaded).decode() - self.chain_pem = single_chain + single_chain - - self.fullchain_pem = self.cert_pem + self.chain_pem - - self.orderr = messages.OrderResource( - csr_pem=CSR_SAN_PEM) - - def _init(self): - uri = 'http://www.letsencrypt-demo.org/directory' - from acme.client import BackwardsCompatibleClientV2 - return BackwardsCompatibleClientV2(net=self.net, - key=KEY, server=uri) - - def test_init_downloads_directory(self): - uri = 'http://www.letsencrypt-demo.org/directory' - from acme.client import BackwardsCompatibleClientV2 - BackwardsCompatibleClientV2(net=self.net, - key=KEY, server=uri) - self.net.get.assert_called_once_with(uri) - - def test_init_acme_version(self): - self.response.json.return_value = DIRECTORY_V1.to_json() - client = self._init() - self.assertEqual(client.acme_version, 1) - - self.response.json.return_value = DIRECTORY_V2.to_json() - client = self._init() - self.assertEqual(client.acme_version, 2) - - def test_query_registration_client_v2(self): - self.response.json.return_value = DIRECTORY_V2.to_json() - client = self._init() - self.response.json.return_value = self.regr.body.to_json() - self.response.headers = {'Location': 'https://www.letsencrypt-demo.org/acme/reg/1'} - self.assertEqual(self.regr, client.query_registration(self.regr)) - - def test_forwarding(self): - self.response.json.return_value = DIRECTORY_V1.to_json() - client = self._init() - self.assertEqual(client.directory, client.client.directory) - self.assertEqual(client.key, KEY) - self.assertEqual(client.deactivate_registration, client.client.deactivate_registration) - self.assertRaises(AttributeError, client.__getattr__, 'nonexistent') - self.assertRaises(AttributeError, client.__getattr__, 'new_account_and_tos') - self.assertRaises(AttributeError, client.__getattr__, 'new_account') - - def test_new_account_and_tos(self): - # v2 no tos - self.response.json.return_value = DIRECTORY_V2.to_json() - with mock.patch('acme.client.ClientV2') as mock_client: - client = self._init() - client.new_account_and_tos(self.new_reg) - mock_client().new_account.assert_called_with(self.new_reg) - - # v2 tos good - with mock.patch('acme.client.ClientV2') as mock_client: - mock_client().directory.meta.__contains__.return_value = True - client = self._init() - client.new_account_and_tos(self.new_reg, lambda x: True) - mock_client().new_account.assert_called_with( - self.new_reg.update(terms_of_service_agreed=True)) - - # v2 tos bad - with mock.patch('acme.client.ClientV2') as mock_client: - mock_client().directory.meta.__contains__.return_value = True - client = self._init() - def _tos_cb(tos): - raise errors.Error - self.assertRaises(errors.Error, client.new_account_and_tos, - self.new_reg, _tos_cb) - mock_client().new_account.assert_not_called() - - # v1 yes tos - self.response.json.return_value = DIRECTORY_V1.to_json() - with mock.patch('acme.client.Client') as mock_client: - regr = mock.MagicMock(terms_of_service="TOS") - mock_client().register.return_value = regr - client = self._init() - client.new_account_and_tos(self.new_reg) - mock_client().register.assert_called_once_with(self.new_reg) - mock_client().agree_to_tos.assert_called_once_with(regr) - - # v1 no tos - with mock.patch('acme.client.Client') as mock_client: - regr = mock.MagicMock(terms_of_service=None) - mock_client().register.return_value = regr - client = self._init() - client.new_account_and_tos(self.new_reg) - mock_client().register.assert_called_once_with(self.new_reg) - mock_client().agree_to_tos.assert_not_called() - - @mock.patch('OpenSSL.crypto.load_certificate_request') - @mock.patch('acme.crypto_util._pyopenssl_cert_or_req_all_names') - def test_new_order_v1(self, mock__pyopenssl_cert_or_req_all_names, - unused_mock_load_certificate_request): - self.response.json.return_value = DIRECTORY_V1.to_json() - mock__pyopenssl_cert_or_req_all_names.return_value = ['example.com', 'www.example.com'] - mock_csr_pem = mock.MagicMock() - with mock.patch('acme.client.Client') as mock_client: - mock_client().request_domain_challenges.return_value = mock.sentinel.auth - client = self._init() - orderr = client.new_order(mock_csr_pem) - self.assertEqual(orderr.authorizations, [mock.sentinel.auth, mock.sentinel.auth]) - - def test_new_order_v2(self): - self.response.json.return_value = DIRECTORY_V2.to_json() - mock_csr_pem = mock.MagicMock() - with mock.patch('acme.client.ClientV2') as mock_client: - client = self._init() - client.new_order(mock_csr_pem) - mock_client().new_order.assert_called_once_with(mock_csr_pem) - - @mock.patch('acme.client.Client') - def test_finalize_order_v1_success(self, mock_client): - self.response.json.return_value = DIRECTORY_V1.to_json() - - mock_client().request_issuance.return_value = self.certr - mock_client().fetch_chain.return_value = self.chain - - deadline = datetime.datetime(9999, 9, 9) - client = self._init() - result = client.finalize_order(self.orderr, deadline) - self.assertEqual(result.fullchain_pem, self.fullchain_pem) - mock_client().fetch_chain.assert_called_once_with(self.certr) - - @mock.patch('acme.client.Client') - def test_finalize_order_v1_fetch_chain_error(self, mock_client): - self.response.json.return_value = DIRECTORY_V1.to_json() - - mock_client().request_issuance.return_value = self.certr - mock_client().fetch_chain.return_value = self.chain - mock_client().fetch_chain.side_effect = [errors.Error, self.chain] - - deadline = datetime.datetime(9999, 9, 9) - client = self._init() - result = client.finalize_order(self.orderr, deadline) - self.assertEqual(result.fullchain_pem, self.fullchain_pem) - self.assertEqual(mock_client().fetch_chain.call_count, 2) - - @mock.patch('acme.client.Client') - def test_finalize_order_v1_timeout(self, mock_client): - self.response.json.return_value = DIRECTORY_V1.to_json() - - mock_client().request_issuance.return_value = self.certr - - deadline = deadline = datetime.datetime.now() - datetime.timedelta(seconds=60) - client = self._init() - self.assertRaises(errors.TimeoutError, client.finalize_order, - self.orderr, deadline) - - def test_finalize_order_v2(self): - self.response.json.return_value = DIRECTORY_V2.to_json() - mock_orderr = mock.MagicMock() - mock_deadline = mock.MagicMock() - with mock.patch('acme.client.ClientV2') as mock_client: - client = self._init() - client.finalize_order(mock_orderr, mock_deadline) - mock_client().finalize_order.assert_called_once_with(mock_orderr, mock_deadline, False) - - def test_revoke(self): - self.response.json.return_value = DIRECTORY_V1.to_json() - with mock.patch('acme.client.Client') as mock_client: - client = self._init() - client.revoke(messages_test.CERT, self.rsn) - mock_client().revoke.assert_called_once_with(messages_test.CERT, self.rsn) - - self.response.json.return_value = DIRECTORY_V2.to_json() - with mock.patch('acme.client.ClientV2') as mock_client: - client = self._init() - client.revoke(messages_test.CERT, self.rsn) - mock_client().revoke.assert_called_once_with(messages_test.CERT, self.rsn) - - def test_update_registration(self): - self.response.json.return_value = DIRECTORY_V1.to_json() - with mock.patch('acme.client.Client') as mock_client: - client = self._init() - client.update_registration(mock.sentinel.regr, None) - mock_client().update_registration.assert_called_once_with(mock.sentinel.regr, None) - - # newNonce present means it will pick acme_version 2 - def test_external_account_required_true(self): - self.response.json.return_value = messages.Directory({ - 'newNonce': 'http://letsencrypt-test.com/acme/new-nonce', - 'meta': messages.Directory.Meta(external_account_required=True), - }).to_json() - - client = self._init() - - self.assertTrue(client.external_account_required()) - - # newNonce present means it will pick acme_version 2 - def test_external_account_required_false(self): - self.response.json.return_value = messages.Directory({ - 'newNonce': 'http://letsencrypt-test.com/acme/new-nonce', - 'meta': messages.Directory.Meta(external_account_required=False), - }).to_json() - - client = self._init() - - self.assertFalse(client.external_account_required()) - - def test_external_account_required_false_v1(self): - self.response.json.return_value = messages.Directory({ - 'meta': messages.Directory.Meta(external_account_required=False), - }).to_json() - - client = self._init() - - self.assertFalse(client.external_account_required()) - - -class ClientTest(ClientTestBase): - """Tests for acme.client.Client.""" - - def setUp(self): - super().setUp() - - self.directory = DIRECTORY_V1 - - # Registration - self.regr = self.regr.update( - terms_of_service='https://www.letsencrypt-demo.org/tos') - - # Request issuance - self.certr = messages.CertificateResource( - body=messages_test.CERT, authzrs=(self.authzr,), - uri='https://www.letsencrypt-demo.org/acme/cert/1', - cert_chain_uri='https://www.letsencrypt-demo.org/ca') - - from acme.client import Client - self.client = Client( - directory=self.directory, key=KEY, alg=jose.RS256, net=self.net) - - def test_init_downloads_directory(self): - uri = 'http://www.letsencrypt-demo.org/directory' - from acme.client import Client - self.client = Client( - directory=uri, key=KEY, alg=jose.RS256, net=self.net) - self.net.get.assert_called_once_with(uri) - - @mock.patch('acme.client.ClientNetwork') - def test_init_without_net(self, mock_net): - mock_net.return_value = mock.sentinel.net - alg = jose.RS256 - from acme.client import Client - self.client = Client( - directory=self.directory, key=KEY, alg=alg) - mock_net.called_once_with(KEY, alg=alg, verify_ssl=True) - self.assertEqual(self.client.net, mock.sentinel.net) - - def test_register(self): - # "Instance of 'Field' has no to_json/update member" bug: - self.response.status_code = http_client.CREATED - self.response.json.return_value = self.regr.body.to_json() - self.response.headers['Location'] = self.regr.uri - self.response.links.update({ - 'terms-of-service': {'url': self.regr.terms_of_service}, - }) - - self.assertEqual(self.regr, self.client.register(self.new_reg)) - # TODO: test POST call arguments - - def test_update_registration(self): - # "Instance of 'Field' has no to_json/update member" bug: - self.response.headers['Location'] = self.regr.uri - self.response.json.return_value = self.regr.body.to_json() - self.assertEqual(self.regr, self.client.update_registration(self.regr)) - # TODO: test POST call arguments - - # TODO: split here and separate test - self.response.json.return_value = self.regr.body.update( - contact=()).to_json() - - def test_deactivate_account(self): - self.response.headers['Location'] = self.regr.uri - self.response.json.return_value = self.regr.body.to_json() - self.assertEqual(self.regr, - self.client.deactivate_registration(self.regr)) - - def test_query_registration(self): - self.response.json.return_value = self.regr.body.to_json() - self.assertEqual(self.regr, self.client.query_registration(self.regr)) - - def test_agree_to_tos(self): - self.client.update_registration = mock.Mock() - self.client.agree_to_tos(self.regr) - regr = self.client.update_registration.call_args[0][0] - self.assertEqual(self.regr.terms_of_service, regr.body.agreement) - - def _prepare_response_for_request_challenges(self): - self.response.status_code = http_client.CREATED - self.response.headers['Location'] = self.authzr.uri - self.response.json.return_value = self.authz.to_json() - - def test_request_challenges(self): - self._prepare_response_for_request_challenges() - self.client.request_challenges(self.identifier) - self.net.post.assert_called_once_with( - self.directory.new_authz, - messages.NewAuthorization(identifier=self.identifier), - acme_version=1) - - def test_request_challenges_deprecated_arg(self): - self._prepare_response_for_request_challenges() - self.client.request_challenges(self.identifier, new_authzr_uri="hi") - self.net.post.assert_called_once_with( - self.directory.new_authz, - messages.NewAuthorization(identifier=self.identifier), - acme_version=1) - - def test_request_challenges_custom_uri(self): - self._prepare_response_for_request_challenges() - self.client.request_challenges(self.identifier) - self.net.post.assert_called_once_with( - 'https://www.letsencrypt-demo.org/acme/new-authz', mock.ANY, - acme_version=1) - - def test_request_challenges_unexpected_update(self): - self._prepare_response_for_request_challenges() - self.response.json.return_value = self.authz.update( - identifier=self.identifier.update(value='foo')).to_json() - self.assertRaises( - errors.UnexpectedUpdate, self.client.request_challenges, - self.identifier) - - def test_request_challenges_wildcard(self): - wildcard_identifier = messages.Identifier( - typ=messages.IDENTIFIER_FQDN, value='*.example.org') - self.assertRaises( - errors.WildcardUnsupportedError, self.client.request_challenges, - wildcard_identifier) - - def test_request_domain_challenges(self): - self.client.request_challenges = mock.MagicMock() - self.assertEqual( - self.client.request_challenges(self.identifier), - self.client.request_domain_challenges('example.com')) - - def test_answer_challenge(self): - self.response.links['up'] = {'url': self.challr.authzr_uri} - self.response.json.return_value = self.challr.body.to_json() - - chall_response = challenges.DNSResponse(validation=None) - - self.client.answer_challenge(self.challr.body, chall_response) - - # TODO: split here and separate test - self.assertRaises(errors.UnexpectedUpdate, self.client.answer_challenge, - self.challr.body.update(uri='foo'), chall_response) - - def test_answer_challenge_missing_next(self): - self.assertRaises( - errors.ClientError, self.client.answer_challenge, - self.challr.body, challenges.DNSResponse(validation=None)) - - def test_retry_after_date(self): - self.response.headers['Retry-After'] = 'Fri, 31 Dec 1999 23:59:59 GMT' - self.assertEqual( - datetime.datetime(1999, 12, 31, 23, 59, 59), - self.client.retry_after(response=self.response, default=10)) - - @mock.patch('acme.client.datetime') - def test_retry_after_invalid(self, dt_mock): - dt_mock.datetime.now.return_value = datetime.datetime(2015, 3, 27) - dt_mock.timedelta = datetime.timedelta - - self.response.headers['Retry-After'] = 'foooo' - self.assertEqual( - datetime.datetime(2015, 3, 27, 0, 0, 10), - self.client.retry_after(response=self.response, default=10)) - - @mock.patch('acme.client.datetime') - def test_retry_after_overflow(self, dt_mock): - dt_mock.datetime.now.return_value = datetime.datetime(2015, 3, 27) - dt_mock.timedelta = datetime.timedelta - dt_mock.datetime.side_effect = datetime.datetime - - self.response.headers['Retry-After'] = "Tue, 116 Feb 2016 11:50:00 MST" - self.assertEqual( - datetime.datetime(2015, 3, 27, 0, 0, 10), - self.client.retry_after(response=self.response, default=10)) - - @mock.patch('acme.client.datetime') - def test_retry_after_seconds(self, dt_mock): - dt_mock.datetime.now.return_value = datetime.datetime(2015, 3, 27) - dt_mock.timedelta = datetime.timedelta - - self.response.headers['Retry-After'] = '50' - self.assertEqual( - datetime.datetime(2015, 3, 27, 0, 0, 50), - self.client.retry_after(response=self.response, default=10)) - - @mock.patch('acme.client.datetime') - def test_retry_after_missing(self, dt_mock): - dt_mock.datetime.now.return_value = datetime.datetime(2015, 3, 27) - dt_mock.timedelta = datetime.timedelta - - self.assertEqual( - datetime.datetime(2015, 3, 27, 0, 0, 10), - self.client.retry_after(response=self.response, default=10)) - - def test_poll(self): - self.response.json.return_value = self.authzr.body.to_json() - self.assertEqual((self.authzr, self.response), - self.client.poll(self.authzr)) - - # TODO: split here and separate test - self.response.json.return_value = self.authz.update( - identifier=self.identifier.update(value='foo')).to_json() - self.assertRaises( - errors.UnexpectedUpdate, self.client.poll, self.authzr) - - def test_request_issuance(self): - self.response.content = CERT_DER - self.response.headers['Location'] = self.certr.uri - self.response.links['up'] = {'url': self.certr.cert_chain_uri} - self.assertEqual(self.certr, self.client.request_issuance( - messages_test.CSR, (self.authzr,))) - # TODO: check POST args - - def test_request_issuance_missing_up(self): - self.response.content = CERT_DER - self.response.headers['Location'] = self.certr.uri - self.assertEqual( - self.certr.update(cert_chain_uri=None), - self.client.request_issuance(messages_test.CSR, (self.authzr,))) - - def test_request_issuance_missing_location(self): - self.assertRaises( - errors.ClientError, self.client.request_issuance, - messages_test.CSR, (self.authzr,)) - - @mock.patch('acme.client.datetime') - @mock.patch('acme.client.time') - def test_poll_and_request_issuance(self, time_mock, dt_mock): - # clock.dt | pylint: disable=no-member - clock = mock.MagicMock(dt=datetime.datetime(2015, 3, 27)) - - def sleep(seconds): - """increment clock""" - clock.dt += datetime.timedelta(seconds=seconds) - time_mock.sleep.side_effect = sleep - - def now(): - """return current clock value""" - return clock.dt - dt_mock.datetime.now.side_effect = now - dt_mock.timedelta = datetime.timedelta - - def poll(authzr): # pylint: disable=missing-docstring - # record poll start time based on the current clock value - authzr.times.append(clock.dt) - - # suppose it takes 2 seconds for server to produce the - # result, increment clock - clock.dt += datetime.timedelta(seconds=2) - - if len(authzr.retries) == 1: # no more retries - done = mock.MagicMock(uri=authzr.uri, times=authzr.times) - done.body.status = authzr.retries[0] - return done, [] - - # response (2nd result tuple element) is reduced to only - # Retry-After header contents represented as integer - # seconds; authzr.retries is a list of Retry-After - # headers, head(retries) is peeled of as a current - # Retry-After header, and tail(retries) is persisted for - # later poll() calls - return (mock.MagicMock(retries=authzr.retries[1:], - uri=authzr.uri + '.', times=authzr.times), - authzr.retries[0]) - self.client.poll = mock.MagicMock(side_effect=poll) - - mintime = 7 - - def retry_after(response, default): - # pylint: disable=missing-docstring - # check that poll_and_request_issuance correctly passes mintime - self.assertEqual(default, mintime) - return clock.dt + datetime.timedelta(seconds=response) - self.client.retry_after = mock.MagicMock(side_effect=retry_after) - - def request_issuance(csr, authzrs): # pylint: disable=missing-docstring - return csr, authzrs - self.client.request_issuance = mock.MagicMock( - side_effect=request_issuance) - - csr = mock.MagicMock() - authzrs = ( - mock.MagicMock(uri='a', times=[], retries=( - 8, 20, 30, messages.STATUS_VALID)), - mock.MagicMock(uri='b', times=[], retries=( - 5, messages.STATUS_VALID)), - ) - - cert, updated_authzrs = self.client.poll_and_request_issuance( - csr, authzrs, mintime=mintime, - # make sure that max_attempts is per-authorization, rather - # than global - max_attempts=max(len(authzrs[0].retries), len(authzrs[1].retries))) - self.assertIs(cert[0], csr) - self.assertIs(cert[1], updated_authzrs) - self.assertEqual(updated_authzrs[0].uri, 'a...') - self.assertEqual(updated_authzrs[1].uri, 'b.') - self.assertEqual(updated_authzrs[0].times, [ - datetime.datetime(2015, 3, 27), - # a is scheduled for 10, but b is polling [9..11), so it - # will be picked up as soon as b is finished, without - # additional sleeping - datetime.datetime(2015, 3, 27, 0, 0, 11), - datetime.datetime(2015, 3, 27, 0, 0, 33), - datetime.datetime(2015, 3, 27, 0, 1, 5), - ]) - self.assertEqual(updated_authzrs[1].times, [ - datetime.datetime(2015, 3, 27, 0, 0, 2), - datetime.datetime(2015, 3, 27, 0, 0, 9), - ]) - self.assertEqual(clock.dt, datetime.datetime(2015, 3, 27, 0, 1, 7)) - - # CA sets invalid | TODO: move to a separate test - invalid_authzr = mock.MagicMock( - times=[], retries=[messages.STATUS_INVALID]) - self.assertRaises( - errors.PollError, self.client.poll_and_request_issuance, - csr, authzrs=(invalid_authzr,), mintime=mintime) - - # exceeded max_attempts | TODO: move to a separate test - self.assertRaises( - errors.PollError, self.client.poll_and_request_issuance, - csr, authzrs, mintime=mintime, max_attempts=2) - - def test_deactivate_authorization(self): - authzb = self.authzr.body.update(status=messages.STATUS_DEACTIVATED) - self.response.json.return_value = authzb.to_json() - authzr = self.client.deactivate_authorization(self.authzr) - self.assertEqual(authzb, authzr.body) - self.assertEqual(self.client.net.post.call_count, 1) - self.assertIn(self.authzr.uri, self.net.post.call_args_list[0][0]) - - def test_check_cert(self): - self.response.headers['Location'] = self.certr.uri - self.response.content = CERT_DER - self.assertEqual(self.certr.update(body=messages_test.CERT), - self.client.check_cert(self.certr)) - - # TODO: split here and separate test - self.response.headers['Location'] = 'foo' - self.assertRaises( - errors.UnexpectedUpdate, self.client.check_cert, self.certr) - - def test_check_cert_missing_location(self): - self.response.content = CERT_DER - self.assertRaises( - errors.ClientError, self.client.check_cert, self.certr) - - def test_refresh(self): - self.client.check_cert = mock.MagicMock() - self.assertEqual( - self.client.check_cert(self.certr), self.client.refresh(self.certr)) - - def test_fetch_chain_no_up_link(self): - self.assertEqual([], self.client.fetch_chain(self.certr.update( - cert_chain_uri=None))) - - def test_fetch_chain_single(self): - # pylint: disable=protected-access - self.client._get_cert = mock.MagicMock() - self.client._get_cert.return_value = ( - mock.MagicMock(links={}), "certificate") - self.assertEqual([self.client._get_cert(self.certr.cert_chain_uri)[1]], - self.client.fetch_chain(self.certr)) - - def test_fetch_chain_max(self): - # pylint: disable=protected-access - up_response = mock.MagicMock(links={'up': {'url': 'http://cert'}}) - noup_response = mock.MagicMock(links={}) - self.client._get_cert = mock.MagicMock() - self.client._get_cert.side_effect = [ - (up_response, "cert")] * 9 + [(noup_response, "last_cert")] - chain = self.client.fetch_chain(self.certr, max_length=10) - self.assertEqual(chain, ["cert"] * 9 + ["last_cert"]) - - def test_fetch_chain_too_many(self): # recursive - # pylint: disable=protected-access - response = mock.MagicMock(links={'up': {'url': 'http://cert'}}) - self.client._get_cert = mock.MagicMock() - self.client._get_cert.return_value = (response, "certificate") - self.assertRaises(errors.Error, self.client.fetch_chain, self.certr) - - def test_revoke(self): - self.client.revoke(self.certr.body, self.rsn) - self.net.post.assert_called_once_with( - self.directory[messages.Revocation], mock.ANY, acme_version=1) - - def test_revocation_payload(self): - obj = messages.Revocation(certificate=self.certr.body, reason=self.rsn) - self.assertIn('reason', obj.to_partial_json().keys()) - self.assertEqual(self.rsn, obj.to_partial_json()['reason']) - - def test_revoke_bad_status_raises_error(self): - self.response.status_code = http_client.METHOD_NOT_ALLOWED - self.assertRaises( - errors.ClientError, - self.client.revoke, - self.certr, - self.rsn) - - -class ClientV2Test(ClientTestBase): - """Tests for acme.client.ClientV2.""" - - def setUp(self): - super().setUp() - self.directory = DIRECTORY_V2 - from acme.client import ClientV2 self.client = ClientV2(self.directory, self.net) self.new_reg = self.new_reg.update(terms_of_service_agreed=True) @@ -752,11 +102,40 @@ class ClientV2Test(ClientTestBase): self.assertEqual(self.regr, self.client.new_account(self.new_reg)) + def test_new_account_tos_link(self): + self.response.status_code = http_client.CREATED + self.response.json.return_value = self.regr.body.to_json() + self.response.headers['Location'] = self.regr.uri + self.response.links.update({ + 'terms-of-service': {'url': 'https://www.letsencrypt-demo.org/tos'}, + }) + + self.assertEqual(self.client.new_account(self.new_reg).terms_of_service, + 'https://www.letsencrypt-demo.org/tos') + + def test_new_account_conflict(self): self.response.status_code = http_client.OK self.response.headers['Location'] = self.regr.uri self.assertRaises(errors.ConflictError, self.client.new_account, self.new_reg) + def test_deactivate_account(self): + deactivated_regr = self.regr.update( + body=self.regr.body.update(status='deactivated')) + self.response.json.return_value = deactivated_regr.body.to_json() + self.response.status_code = http_client.OK + self.response.headers['Location'] = self.regr.uri + self.assertEqual(self.client.deactivate_registration(self.regr), deactivated_regr) + + def test_deactivate_authorization(self): + deactivated_authz = self.authzr.update( + body=self.authzr.body.update(status=messages.STATUS_DEACTIVATED)) + self.response.json.return_value = deactivated_authz.body.to_json() + authzr = self.client.deactivate_authorization(self.authzr) + self.assertEqual(deactivated_authz.body, authzr.body) + self.assertEqual(self.client.net.post.call_count, 1) + self.assertIn(self.authzr.uri, self.net.post.call_args_list[0][0]) + def test_new_order(self): order_response = copy.deepcopy(self.response) order_response.status_code = http_client.CREATED @@ -775,6 +154,20 @@ class ClientV2Test(ClientTestBase): mock_post_as_get.side_effect = (authz_response, authz_response2) self.assertEqual(self.client.new_order(CSR_MIXED_PEM), self.orderr) + def test_answer_challege(self): + self.response.links['up'] = {'url': self.challr.authzr_uri} + self.response.json.return_value = self.challr.body.to_json() + chall_response = challenges.DNSResponse(validation=None) + self.client.answer_challenge(self.challr.body, chall_response) + + self.assertRaises(errors.UnexpectedUpdate, self.client.answer_challenge, + self.challr.body.update(uri='foo'), chall_response) + + def test_answer_challenge_missing_next(self): + self.assertRaises( + errors.ClientError, self.client.answer_challenge, + self.challr.body, challenges.DNSResponse(validation=None)) + @mock.patch('acme.client.datetime') def test_poll_and_finalize(self, mock_datetime): mock_datetime.datetime.now.return_value = datetime.datetime(2018, 2, 15) @@ -821,6 +214,11 @@ class ClientV2Test(ClientTestBase): self.authz.to_json(), self.authz2.to_json(), updated_authz2.to_json()) self.assertEqual(self.client.poll_authorizations(self.orderr, deadline), updated_orderr) + def test_poll_unexpected_update(self): + updated_authz = self.authz.update(identifier=self.identifier.update(value='foo')) + self.response.json.return_value = updated_authz.to_json() + self.assertRaises(errors.UnexpectedUpdate, self.client.poll, self.authzr) + def test_finalize_order_success(self): updated_order = self.order.update( certificate='https://www.letsencrypt-demo.org/acme/cert/', @@ -872,9 +270,9 @@ class ClientV2Test(ClientTestBase): deadline = datetime.datetime(9999, 9, 9) resp = self.client.finalize_order(self.orderr, deadline, fetch_alternative_chains=True) self.net.post.assert_any_call('https://example.com/acme/cert/1', - mock.ANY, acme_version=2, new_nonce_url=mock.ANY) + mock.ANY, new_nonce_url=mock.ANY) self.net.post.assert_any_call('https://example.com/acme/cert/2', - mock.ANY, acme_version=2, new_nonce_url=mock.ANY) + mock.ANY, new_nonce_url=mock.ANY) self.assertEqual(resp, updated_orderr) del self.response.headers['Link'] @@ -884,8 +282,15 @@ class ClientV2Test(ClientTestBase): def test_revoke(self): self.client.revoke(messages_test.CERT, self.rsn) self.net.post.assert_called_once_with( - self.directory["revokeCert"], mock.ANY, acme_version=2, - new_nonce_url=DIRECTORY_V2['newNonce']) + self.directory["revokeCert"], mock.ANY, new_nonce_url=DIRECTORY_V2['newNonce']) + + def test_revoke_bad_status_raises_error(self): + self.response.status_code = http_client.METHOD_NOT_ALLOWED + self.assertRaises( + errors.ClientError, + self.client.revoke, + messages_test.CERT, + self.rsn) def test_update_registration(self): # "Instance of 'Field' has no to_json/update member" bug: @@ -916,6 +321,11 @@ class ClientV2Test(ClientTestBase): def test_external_account_required_default(self): self.assertFalse(self.client.external_account_required()) + def test_query_registration_client(self): + self.response.json.return_value = self.regr.body.to_json() + self.response.headers['Location'] = 'https://www.letsencrypt-demo.org/acme/reg/1' + self.assertEqual(self.regr, self.client.query_registration(self.regr)) + def test_post_as_get(self): with mock.patch('acme.client.ClientV2._authzr_from_response') as mock_client: mock_client.return_value = self.authzr2 @@ -923,12 +333,64 @@ class ClientV2Test(ClientTestBase): self.client.poll(self.authzr2) # pylint: disable=protected-access self.client.net.post.assert_called_once_with( - self.authzr2.uri, None, acme_version=2, + self.authzr2.uri, None, new_nonce_url='https://www.letsencrypt-demo.org/acme/new-nonce') self.client.net.get.assert_not_called() + def test_retry_after_date(self): + self.response.headers['Retry-After'] = 'Fri, 31 Dec 1999 23:59:59 GMT' + self.assertEqual( + datetime.datetime(1999, 12, 31, 23, 59, 59), + self.client.retry_after(response=self.response, default=10)) -class MockJSONDeSerializable(VersionedLEACMEMixin, jose.JSONDeSerializable): + @mock.patch('acme.client.datetime') + def test_retry_after_invalid(self, dt_mock): + dt_mock.datetime.now.return_value = datetime.datetime(2015, 3, 27) + dt_mock.timedelta = datetime.timedelta + + self.response.headers['Retry-After'] = 'foooo' + self.assertEqual( + datetime.datetime(2015, 3, 27, 0, 0, 10), + self.client.retry_after(response=self.response, default=10)) + + @mock.patch('acme.client.datetime') + def test_retry_after_overflow(self, dt_mock): + dt_mock.datetime.now.return_value = datetime.datetime(2015, 3, 27) + dt_mock.timedelta = datetime.timedelta + dt_mock.datetime.side_effect = datetime.datetime + + self.response.headers['Retry-After'] = "Tue, 116 Feb 2016 11:50:00 MST" + self.assertEqual( + datetime.datetime(2015, 3, 27, 0, 0, 10), + self.client.retry_after(response=self.response, default=10)) + + @mock.patch('acme.client.datetime') + def test_retry_after_seconds(self, dt_mock): + dt_mock.datetime.now.return_value = datetime.datetime(2015, 3, 27) + dt_mock.timedelta = datetime.timedelta + + self.response.headers['Retry-After'] = '50' + self.assertEqual( + datetime.datetime(2015, 3, 27, 0, 0, 50), + self.client.retry_after(response=self.response, default=10)) + + @mock.patch('acme.client.datetime') + def test_retry_after_missing(self, dt_mock): + dt_mock.datetime.now.return_value = datetime.datetime(2015, 3, 27) + dt_mock.timedelta = datetime.timedelta + + self.assertEqual( + datetime.datetime(2015, 3, 27, 0, 0, 10), + self.client.retry_after(response=self.response, default=10)) + + def test_get_directory(self): + self.response.json.return_value = DIRECTORY_V2.to_json() + self.assertEqual( + DIRECTORY_V2.to_partial_json(), + ClientV2.get_directory('https://example.com/dir', self.net).to_partial_json()) + + +class MockJSONDeSerializable(jose.JSONDeSerializable): # pylint: disable=missing-docstring def __init__(self, value): self.value = value @@ -963,8 +425,7 @@ class ClientNetworkTest(unittest.TestCase): def test_wrap_in_jws(self): # pylint: disable=protected-access jws_dump = self.net._wrap_in_jws( - MockJSONDeSerializable('foo'), nonce=b'Tg', url="url", - acme_version=1) + MockJSONDeSerializable('foo'), nonce=b'Tg', url="url") jws = acme_jws.JWS.json_loads(jws_dump) self.assertEqual(json.loads(jws.payload.decode()), {'foo': 'foo'}) self.assertEqual(jws.signature.combined.nonce, b'Tg') @@ -973,8 +434,7 @@ class ClientNetworkTest(unittest.TestCase): self.net.account = {'uri': 'acct-uri'} # pylint: disable=protected-access jws_dump = self.net._wrap_in_jws( - MockJSONDeSerializable('foo'), nonce=b'Tg', url="url", - acme_version=2) + MockJSONDeSerializable('foo'), nonce=b'Tg', url="url") jws = acme_jws.JWS.json_loads(jws_dump) self.assertEqual(json.loads(jws.payload.decode()), {'foo': 'foo'}) self.assertEqual(jws.signature.combined.nonce, b'Tg') @@ -1080,14 +540,13 @@ class ClientNetworkTest(unittest.TestCase): self.net.session = mock.MagicMock() self.net.session.request.return_value = mock.MagicMock( ok=True, status_code=http_client.OK, - headers={"Content-Type": "application/pkix-cert"}, content=b"hi") # pylint: disable=protected-access self.net._send_request('HEAD', 'http://example.com/', 'foo', - timeout=mock.ANY, bar='baz') + timeout=mock.ANY, bar='baz', headers={'Accept': 'application/pkix-cert'}) mock_logger.debug.assert_called_with( 'Received response:\nHTTP %d\n%s\n\n%s', 200, - 'Content-Type: application/pkix-cert', b'aGk=') + '', b'aGk=') def test_send_request_post(self): self.net.session = mock.MagicMock() @@ -1260,13 +719,13 @@ class ClientNetworkWithMockedResponseTest(unittest.TestCase): 'uri', self.obj, content_type=self.content_type)) self.assertTrue(self.response.checked) self.net._wrap_in_jws.assert_called_once_with( - self.obj, jose.b64decode(self.all_nonces.pop()), "uri", 1) + self.obj, jose.b64decode(self.all_nonces.pop()), "uri") self.available_nonces = [] self.assertRaises(errors.MissingNonce, self.net.post, 'uri', self.obj, content_type=self.content_type) self.net._wrap_in_jws.assert_called_with( - self.obj, jose.b64decode(self.all_nonces.pop()), "uri", 1) + self.obj, jose.b64decode(self.all_nonces.pop()), "uri") def test_post_wrong_initial_nonce(self): # HEAD self.available_nonces = [b'f', jose.b64encode(b'good')] @@ -1324,14 +783,13 @@ class ClientNetworkWithMockedResponseTest(unittest.TestCase): check_response = mock.MagicMock() self.net._check_response = check_response self.assertRaises(errors.ClientError, self.net.post, 'uri', - self.obj, content_type=self.content_type, acme_version=2, + self.obj, content_type=self.content_type, new_nonce_url='new_nonce_uri') self.assertEqual(check_response.call_count, 1) def test_new_nonce_uri_removed(self): self.content_type = None - self.net.post('uri', self.obj, content_type=None, - acme_version=2, new_nonce_url='new_nonce_uri') + self.net.post('uri', self.obj, content_type=None, new_nonce_url='new_nonce_uri') class ClientNetworkSourceAddressBindingTest(unittest.TestCase): diff --git a/acme/tests/fields_test.py b/acme/tests/fields_test.py index 4cc167f9c..b53798f00 100644 --- a/acme/tests/fields_test.py +++ b/acme/tests/fields_test.py @@ -54,19 +54,5 @@ class RFC3339FieldTest(unittest.TestCase): jose.DeserializationError, RFC3339Field.default_decoder, '') -class ResourceTest(unittest.TestCase): - """Tests for acme.fields.Resource.""" - - def setUp(self): - from acme.fields import Resource - self.field = Resource('x') - - def test_decode_good(self): - self.assertEqual('x', self.field.decode('x')) - - def test_decode_wrong(self): - self.assertRaises(jose.DeserializationError, self.field.decode, 'y') - - if __name__ == '__main__': unittest.main() # pragma: no cover diff --git a/acme/tests/magic_typing_test.py b/acme/tests/magic_typing_test.py deleted file mode 100644 index d470337bd..000000000 --- a/acme/tests/magic_typing_test.py +++ /dev/null @@ -1,30 +0,0 @@ -"""Tests for acme.magic_typing.""" -import sys -import unittest -import warnings -from unittest import mock - - -class MagicTypingTest(unittest.TestCase): - """Tests for acme.magic_typing.""" - def test_import_success(self): - try: - import typing as temp_typing - except ImportError: # pragma: no cover - temp_typing = None # pragma: no cover - typing_class_mock = mock.MagicMock() - text_mock = mock.MagicMock() - typing_class_mock.Text = text_mock - sys.modules['typing'] = typing_class_mock - if 'acme.magic_typing' in sys.modules: - del sys.modules['acme.magic_typing'] # pragma: no cover - with warnings.catch_warnings(): - warnings.filterwarnings("ignore", category=DeprecationWarning) - from acme.magic_typing import Text - self.assertEqual(Text, text_mock) - del sys.modules['acme.magic_typing'] - sys.modules['typing'] = temp_typing - - -if __name__ == '__main__': - unittest.main() # pragma: no cover diff --git a/acme/tests/messages_test.py b/acme/tests/messages_test.py index cf7e7629a..cce4e409f 100644 --- a/acme/tests/messages_test.py +++ b/acme/tests/messages_test.py @@ -134,8 +134,8 @@ class DirectoryTest(unittest.TestCase): def setUp(self): from acme.messages import Directory self.dir = Directory({ - 'new-reg': 'reg', - mock.MagicMock(resource_type='new-cert'): 'cert', + 'newReg': 'reg', + 'newCert': 'cert', 'meta': Directory.Meta( terms_of_service='https://example.com/acme/terms', website='https://www.example.com/', @@ -148,26 +148,23 @@ class DirectoryTest(unittest.TestCase): Directory({'foo': 'bar'}) def test_getitem(self): - self.assertEqual('reg', self.dir['new-reg']) - from acme.messages import NewRegistration - self.assertEqual('reg', self.dir[NewRegistration]) - self.assertEqual('reg', self.dir[NewRegistration()]) + self.assertEqual('reg', self.dir['newReg']) def test_getitem_fails_with_key_error(self): self.assertRaises(KeyError, self.dir.__getitem__, 'foo') def test_getattr(self): - self.assertEqual('reg', self.dir.new_reg) + self.assertEqual('reg', self.dir.newReg) def test_getattr_fails_with_attribute_error(self): self.assertRaises(AttributeError, self.dir.__getattr__, 'foo') def test_to_json(self): self.assertEqual(self.dir.to_json(), { - 'new-reg': 'reg', - 'new-cert': 'cert', + 'newReg': 'reg', + 'newCert': 'cert', 'meta': { - 'terms-of-service': 'https://example.com/acme/terms', + 'termsOfService': 'https://example.com/acme/terms', 'website': 'https://www.example.com/', 'caaIdentities': ['example.com'], }, @@ -287,7 +284,7 @@ class UpdateRegistrationTest(unittest.TestCase): def test_empty(self): from acme.messages import UpdateRegistration jstring = '{"resource": "reg"}' - self.assertEqual(jstring, UpdateRegistration().json_dumps()) + self.assertEqual('{}', UpdateRegistration().json_dumps()) self.assertEqual( UpdateRegistration(), UpdateRegistration.json_loads(jstring)) @@ -335,7 +332,7 @@ class ChallengeBodyTest(unittest.TestCase): error=error) self.jobj_to = { - 'uri': 'http://challb', + 'url': 'http://challb', 'status': self.status, 'type': 'dns', 'token': 'evaGxfADs6pSRb2LAv9IZf17Dt3juxGJ-PCt92wr-oA', @@ -382,20 +379,17 @@ class AuthorizationTest(unittest.TestCase): chall=challenges.DNS( token=b'DGyRejmCefe7v4NfDGDKfA')), ) - combinations = ((0,), (1,)) from acme.messages import Authorization from acme.messages import Identifier from acme.messages import IDENTIFIER_FQDN identifier = Identifier(typ=IDENTIFIER_FQDN, value='example.com') self.authz = Authorization( - identifier=identifier, combinations=combinations, - challenges=self.challbs) + identifier=identifier, challenges=self.challbs) self.jobj_from = { 'identifier': identifier.to_json(), 'challenges': [challb.to_json() for challb in self.challbs], - 'combinations': combinations, } def test_from_json(self): @@ -406,12 +400,6 @@ class AuthorizationTest(unittest.TestCase): from acme.messages import Authorization hash(Authorization.from_json(self.jobj_from)) - def test_resolved_combinations(self): - self.assertEqual(self.authz.resolved_combinations, ( - (self.challbs[0],), - (self.challbs[1],), - )) - class AuthorizationResourceTest(unittest.TestCase): """Tests for acme.messages.AuthorizationResource.""" @@ -502,7 +490,6 @@ class JWSPayloadRFC8555Compliant(unittest.TestCase): from acme.messages import NewAuthorization new_order = NewAuthorization() - new_order.le_acme_version = 2 jobj = new_order.json_dumps(indent=2).encode() # RFC8555 states that JWS bodies must not have a resource field. diff --git a/certbot/certbot/_internal/account.py b/certbot/certbot/_internal/account.py index d8f002948..8ebc71de3 100644 --- a/certbot/certbot/_internal/account.py +++ b/certbot/certbot/_internal/account.py @@ -20,7 +20,7 @@ import pytz from acme import fields as acme_fields from acme import messages -from acme.client import ClientBase +from acme.client import ClientV2 from certbot import configuration from certbot import errors from certbot import interfaces @@ -114,7 +114,7 @@ class AccountMemoryStorage(interfaces.AccountStorage): def find_all(self) -> List[Account]: return list(self.accounts.values()) - def save(self, account: Account, client: ClientBase) -> None: + def save(self, account: Account, client: ClientV2) -> None: if account.id in self.accounts: logger.debug("Overwriting account: %s", account.id) self.accounts[account.id] = account @@ -243,11 +243,11 @@ class AccountFileStorage(interfaces.AccountStorage): def load(self, account_id: str) -> Account: return self._load_for_server_path(account_id, self.config.server_path) - def save(self, account: Account, client: ClientBase) -> None: + def save(self, account: Account, client: ClientV2) -> None: """Create a new account. :param Account account: account to create - :param ClientBase client: ACME client associated to the account + :param ClientV2 client: ACME client associated to the account """ try: @@ -258,11 +258,11 @@ class AccountFileStorage(interfaces.AccountStorage): except IOError as error: raise errors.AccountStorageError(error) - def update_regr(self, account: Account, client: ClientBase) -> None: + def update_regr(self, account: Account, client: ClientV2) -> None: """Update the registration resource. :param Account account: account to update - :param ClientBase client: ACME client associated to the account + :param ClientV2 client: ACME client associated to the account """ try: @@ -358,7 +358,7 @@ class AccountFileStorage(interfaces.AccountStorage): with util.safe_open(self._key_path(dir_path), "w", chmod=0o400) as key_file: key_file.write(account.key.json_dumps()) - def _update_regr(self, account: Account, acme: ClientBase, dir_path: str) -> None: + def _update_regr(self, account: Account, acme: ClientV2, dir_path: str) -> None: with open(self._regr_path(dir_path), "w") as regr_file: regr = account.regr # If we have a value for new-authz, save it for forwards diff --git a/certbot/certbot/_internal/auth_handler.py b/certbot/certbot/_internal/auth_handler.py index 979ef0220..05feaadc0 100644 --- a/certbot/certbot/_internal/auth_handler.py +++ b/certbot/certbot/_internal/auth_handler.py @@ -36,7 +36,7 @@ class AuthHandler: :class:`~acme.challenges.Challenge` types :type auth: certbot.interfaces.Authenticator - :ivar acme.client.BackwardsCompatibleClientV2 acme_client: ACME client API. + :ivar acme.client.ClientV2 acme_client: ACME client API. :ivar account: Client's Account :type account: :class:`certbot._internal.account.Account` @@ -226,15 +226,10 @@ class AuthHandler: logger.info("Performing the following challenges:") for authzr in pending_authzrs: authzr_challenges = authzr.body.challenges - if self.acme.acme_version == 1: - combinations = authzr.body.combinations - else: - combinations = tuple((i,) for i in range(len(authzr_challenges))) path = gen_challenge_path( authzr_challenges, - self._get_chall_pref(authzr.body.identifier.value), - combinations) + self._get_chall_pref(authzr.body.identifier.value)) achalls.extend(self._challenge_factory(authzr, path)) @@ -387,12 +382,9 @@ def challb_to_achall(challb: messages.ChallengeBody, account_key: josepy.JWK, def gen_challenge_path(challbs: List[messages.ChallengeBody], - preferences: List[Type[challenges.Challenge]], - combinations: Tuple[Tuple[int, ...], ...]) -> Tuple[int, ...]: + preferences: List[Type[challenges.Challenge]]) -> Tuple[int, ...]: """Generate a plan to get authority over the identity. - .. todo:: This can be possibly be rewritten to use resolved_combinations. - :param tuple challbs: A tuple of challenges (:class:`acme.messages.Challenge`) from :class:`acme.messages.AuthorizationResource` to be @@ -402,10 +394,6 @@ def gen_challenge_path(challbs: List[messages.ChallengeBody], :param list preferences: List of challenge preferences for domain (:class:`acme.challenges.Challenge` subclasses) - :param tuple combinations: A collection of sets of challenges from - :class:`acme.messages.Challenge`, each of which would - be sufficient to prove possession of the identifier. - :returns: list of indices from ``challenges``. :rtype: list @@ -413,21 +401,6 @@ def gen_challenge_path(challbs: List[messages.ChallengeBody], path cannot be created that satisfies the CA given the preferences and combinations. - """ - if combinations: - return _find_smart_path(challbs, preferences, combinations) - return _find_dumb_path(challbs, preferences) - - -def _find_smart_path(challbs: List[messages.ChallengeBody], - preferences: List[Type[challenges.Challenge]], - combinations: Tuple[Tuple[int, ...], ...] - ) -> Tuple[int, ...]: - """Find challenge path with server hints. - - Can be called if combinations is included. Function uses a simple - ranking system to choose the combo with the lowest cost. - """ chall_cost = {} max_cost = 1 @@ -441,6 +414,8 @@ def _find_smart_path(challbs: List[messages.ChallengeBody], # Set above completing all of the available challenges best_combo_cost = max_cost + combinations = tuple((i,) for i in range(len(challbs))) + combo_total = 0 for combo in combinations: for challenge_index in combo: @@ -459,28 +434,6 @@ def _find_smart_path(challbs: List[messages.ChallengeBody], return best_combo -def _find_dumb_path(challbs: List[messages.ChallengeBody], - preferences: List[Type[challenges.Challenge]]) -> Tuple[int, ...]: - """Find challenge path without server hints. - - Should be called if the combinations hint is not included by the - server. This function either returns a path containing all - challenges provided by the CA or raises an exception. - - """ - path = [] - for i, challb in enumerate(challbs): - # supported is set to True if the challenge type is supported - supported = next((True for pref_c in preferences - if isinstance(challb.chall, pref_c)), False) - if supported: - path.append(i) - else: - raise _report_no_chall_path(challbs) - - return tuple(path) - - def _report_no_chall_path(challbs: List[messages.ChallengeBody]) -> errors.AuthorizationError: """Logs and return a raisable error reporting that no satisfiable chall path exists. diff --git a/certbot/certbot/_internal/client.py b/certbot/certbot/_internal/client.py index bcd9713db..89c0e498a 100644 --- a/certbot/certbot/_internal/client.py +++ b/certbot/certbot/_internal/client.py @@ -10,7 +10,6 @@ from typing import IO from typing import List from typing import Optional from typing import Tuple -import warnings from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives.asymmetric.rsa import generate_private_key @@ -70,16 +69,8 @@ def acme_from_config_key(config: configuration.NamespaceConfig, key: jose.JWK, verify_ssl=(not config.no_verify_ssl), user_agent=determine_user_agent(config)) - with warnings.catch_warnings(): - warnings.simplefilter("ignore", DeprecationWarning) - - client = acme_client.BackwardsCompatibleClientV2(net, key, config.server) - if client.acme_version == 1: - logger.warning( - "Certbot is configured to use an ACMEv1 server (%s). ACMEv1 support is deprecated" - " and will soon be removed. See https://community.letsencrypt.org/t/143839 for " - "more information.", config.server) - return cast(acme_client.ClientV2, client) + directory = acme_client.ClientV2.get_directory(config.server, net) + return acme_client.ClientV2(directory, net) def determine_user_agent(config: configuration.NamespaceConfig) -> str: @@ -256,18 +247,13 @@ def perform_registration(acme: acme_client.ClientV2, config: configuration.Names " Please use --eab-kid and --eab-hmac-key.") raise errors.Error(msg) + tos = acme.directory.meta.terms_of_service + if tos_cb and tos: + tos_cb(tos) + try: - newreg = messages.NewRegistration.from_data( - email=config.email, external_account_binding=eab) - # Until ACME v1 support is removed from Certbot, we actually need the provided - # ACME client to be a wrapper of type BackwardsCompatibleClientV2. - # TODO: Remove this cast and rewrite the logic when the client is actually a ClientV2 - try: - return cast(acme_client.BackwardsCompatibleClientV2, - acme).new_account_and_tos(newreg, tos_cb) - except AttributeError: - raise errors.Error("The ACME client must be an instance of " - "acme.client.BackwardsCompatibleClientV2") + return acme.new_account(messages.NewRegistration.from_data( + email=config.email, terms_of_service_agreed=True, external_account_binding=eab)) except messages.Error as e: if e.code in ("invalidEmail", "invalidContact"): if config.noninteractive_mode: @@ -291,8 +277,8 @@ class Client: :ivar .Authenticator auth: Prepared (`.Authenticator.prepare`) authenticator that can solve ACME challenges. :ivar .Installer installer: Installer. - :ivar acme.client.BackwardsCompatibleClientV2 acme: Optional ACME - client API handle. You might already have one from `register`. + :ivar acme.client.ClientV2 acme: Optional ACME client API handle. You might + already have one from `register`. """ diff --git a/certbot/certbot/interfaces.py b/certbot/certbot/interfaces.py index 0d12ffd2e..00c37303a 100644 --- a/certbot/certbot/interfaces.py +++ b/certbot/certbot/interfaces.py @@ -18,7 +18,7 @@ import zope.interface from acme.challenges import Challenge from acme.challenges import ChallengeResponse -from acme.client import ClientBase +from acme.client import ClientV2 from certbot import configuration from certbot.achallenges import AnnotatedChallenge @@ -53,7 +53,7 @@ class AccountStorage(metaclass=ABCMeta): raise NotImplementedError() @abstractmethod - def save(self, account: 'Account', client: ClientBase) -> None: # pragma: no cover + def save(self, account: 'Account', client: ClientV2) -> None: # pragma: no cover """Save account. :raises .AccountStorageError: if account could not be saved diff --git a/certbot/certbot/tests/acme_util.py b/certbot/certbot/tests/acme_util.py index d8ee7f9a8..6412f5dc7 100644 --- a/certbot/certbot/tests/acme_util.py +++ b/certbot/certbot/tests/acme_util.py @@ -3,7 +3,6 @@ import datetime from typing import Any from typing import Dict from typing import Iterable -from typing import Tuple import josepy as jose @@ -24,12 +23,6 @@ DNS01_2 = challenges.DNS01(token=b"cafecafecafecafecafecafe0feedbac") CHALLENGES = [HTTP01, DNS01] -def gen_combos(challbs: Iterable[messages.ChallengeBody]) -> Tuple[Tuple[int], ...]: - """Generate natural combinations for challbs.""" - # completing a single DV challenge satisfies the CA - return tuple((i,) for i, _ in enumerate(challbs)) - - def chall_to_challb(chall: challenges.Challenge, status: messages.Status) -> messages.ChallengeBody: """Return ChallengeBody from Challenge.""" kwargs = { @@ -61,15 +54,13 @@ ACHALLENGES = [HTTP01_A, DNS01_A] def gen_authzr(authz_status: messages.Status, domain: str, challs: Iterable[challenges.Challenge], - statuses: Iterable[messages.Status], - combos: bool = True) -> messages.AuthorizationResource: + statuses: Iterable[messages.Status]) -> messages.AuthorizationResource: """Generate an authorization resource. :param authz_status: Status object :type authz_status: :class:`acme.messages.Status` :param list challs: Challenge objects :param list statuses: status of each challenge object - :param bool combos: Whether or not to add combinations """ challbs = tuple( @@ -81,8 +72,6 @@ def gen_authzr(authz_status: messages.Status, domain: str, challs: Iterable[chal typ=messages.IDENTIFIER_FQDN, value=domain), "challenges": challbs, } - if combos: - authz_kwargs.update({"combinations": gen_combos(challbs)}) if authz_status == messages.STATUS_VALID: authz_kwargs.update({ "status": authz_status, diff --git a/certbot/tests/auth_handler_test.py b/certbot/tests/auth_handler_test.py index e13dfbfe5..9adbcdfbc 100644 --- a/certbot/tests/auth_handler_test.py +++ b/certbot/tests/auth_handler_test.py @@ -33,7 +33,7 @@ class ChallengeFactoryTest(unittest.TestCase): self.authzr = acme_util.gen_authzr( messages.STATUS_PENDING, "test", acme_util.CHALLENGES, - [messages.STATUS_PENDING] * 6, False) + [messages.STATUS_PENDING] * 6) def test_all(self): achalls = self.handler._challenge_factory( @@ -81,7 +81,6 @@ class HandleAuthorizationsTest(unittest.TestCase): self.mock_account = mock.MagicMock() self.mock_net = mock.MagicMock(spec=acme_client.ClientV2) - self.mock_net.acme_version = 1 self.mock_net.retry_after.side_effect = acme_client.ClientV2.retry_after self.handler = AuthHandler( @@ -92,8 +91,8 @@ class HandleAuthorizationsTest(unittest.TestCase): def tearDown(self): logging.disable(logging.NOTSET) - def _test_name1_http_01_1_common(self, combos): - authzr = gen_dom_authzr(domain="0", challs=acme_util.CHALLENGES, combos=combos) + def _test_name1_http_01_1_common(self): + authzr = gen_dom_authzr(domain="0", challs=acme_util.CHALLENGES) mock_order = mock.MagicMock(authorizations=[authzr]) self.mock_net.poll.side_effect = _gen_mock_on_poll(retry=1, wait_value=30) @@ -117,39 +116,14 @@ class HandleAuthorizationsTest(unittest.TestCase): self.assertEqual(len(authzr), 1) - def test_name1_http_01_1_acme_1(self): - self._test_name1_http_01_1_common(combos=True) - def test_name1_http_01_1_acme_2(self): - self.mock_net.acme_version = 2 - self._test_name1_http_01_1_common(combos=False) - - def test_name1_http_01_1_dns_1_acme_1(self): - self.mock_net.poll.side_effect = _gen_mock_on_poll() - self.mock_auth.get_chall_pref.return_value.append(challenges.DNS01) - - authzr = gen_dom_authzr(domain="0", challs=acme_util.CHALLENGES, combos=False) - mock_order = mock.MagicMock(authorizations=[authzr]) - authzr = self.handler.handle_authorizations(mock_order, self.mock_config) - - self.assertEqual(self.mock_net.answer_challenge.call_count, 2) - - self.assertEqual(self.mock_net.poll.call_count, 1) - - self.assertEqual(self.mock_auth.cleanup.call_count, 1) - # Test if list first element is http-01, use typ because it is an achall - for achall in self.mock_auth.cleanup.call_args[0][0]: - self.assertIn(achall.typ, ["http-01", "dns-01"]) - - # Length of authorizations list - self.assertEqual(len(authzr), 1) + self._test_name1_http_01_1_common() def test_name1_http_01_1_dns_1_acme_2(self): - self.mock_net.acme_version = 2 self.mock_net.poll.side_effect = _gen_mock_on_poll() self.mock_auth.get_chall_pref.return_value.append(challenges.DNS01) - authzr = gen_dom_authzr(domain="0", challs=acme_util.CHALLENGES, combos=False) + authzr = gen_dom_authzr(domain="0", challs=acme_util.CHALLENGES) mock_order = mock.MagicMock(authorizations=[authzr]) authzr = self.handler.handle_authorizations(mock_order, self.mock_config) @@ -165,7 +139,7 @@ class HandleAuthorizationsTest(unittest.TestCase): # Length of authorizations list self.assertEqual(len(authzr), 1) - def _test_name3_http_01_3_common(self, combos): + def test_name3_http_01_3_common_acme_2(self): authzrs = [gen_dom_authzr(domain="0", challs=acme_util.CHALLENGES), gen_dom_authzr(domain="1", challs=acme_util.CHALLENGES), gen_dom_authzr(domain="2", challs=acme_util.CHALLENGES)] @@ -183,13 +157,6 @@ class HandleAuthorizationsTest(unittest.TestCase): self.assertEqual(len(authzr), 3) - def test_name3_http_01_3_common_acme_1(self): - self._test_name3_http_01_3_common(combos=True) - - def test_name3_http_01_3_common_acme_2(self): - self.mock_net.acme_version = 2 - self._test_name3_http_01_3_common(combos=False) - def test_debug_challenges(self): config = mock.Mock(debug_challenges=True, verbose_count=0) authzrs = [gen_dom_authzr(domain="0", challs=acme_util.CHALLENGES)] @@ -269,8 +236,8 @@ class HandleAuthorizationsTest(unittest.TestCase): self.assertRaises(errors.AuthorizationError, self.handler.handle_authorizations, mock_order, self.mock_config) - def _test_preferred_challenge_choice_common(self, combos): - authzrs = [gen_dom_authzr(domain="0", challs=acme_util.CHALLENGES, combos=combos)] + def test_preferred_challenge_choice_common_acme_2(self): + authzrs = [gen_dom_authzr(domain="0", challs=acme_util.CHALLENGES)] mock_order = mock.MagicMock(authorizations=authzrs) self.mock_auth.get_chall_pref.return_value.append(challenges.HTTP01) @@ -285,28 +252,14 @@ class HandleAuthorizationsTest(unittest.TestCase): self.assertEqual( self.mock_auth.cleanup.call_args[0][0][0].typ, "http-01") - def test_preferred_challenge_choice_common_acme_1(self): - self._test_preferred_challenge_choice_common(combos=True) - - def test_preferred_challenge_choice_common_acme_2(self): - self.mock_net.acme_version = 2 - self._test_preferred_challenge_choice_common(combos=False) - - def _test_preferred_challenges_not_supported_common(self, combos): - authzrs = [gen_dom_authzr(domain="0", challs=acme_util.CHALLENGES, combos=combos)] + def test_preferred_challenges_not_supported_acme_2(self): + authzrs = [gen_dom_authzr(domain="0", challs=acme_util.CHALLENGES)] mock_order = mock.MagicMock(authorizations=authzrs) self.handler.pref_challs.append(challenges.DNS01.typ) self.assertRaises( errors.AuthorizationError, self.handler.handle_authorizations, mock_order, self.mock_config) - def test_preferred_challenges_not_supported_acme_1(self): - self._test_preferred_challenges_not_supported_common(combos=True) - - def test_preferred_challenges_not_supported_acme_2(self): - self.mock_net.acme_version = 2 - self._test_preferred_challenges_not_supported_common(combos=False) - def test_dns_only_challenge_not_supported(self): authzrs = [gen_dom_authzr(domain="0", challs=[acme_util.DNS01])] mock_order = mock.MagicMock(authorizations=authzrs) @@ -317,7 +270,7 @@ class HandleAuthorizationsTest(unittest.TestCase): def test_perform_error(self): self.mock_auth.perform.side_effect = errors.AuthorizationError - authzr = gen_dom_authzr(domain="0", challs=acme_util.CHALLENGES, combos=True) + authzr = gen_dom_authzr(domain="0", challs=acme_util.CHALLENGES) mock_order = mock.MagicMock(authorizations=[authzr]) self.assertRaises(errors.AuthorizationError, self.handler.handle_authorizations, mock_order, self.mock_config) @@ -392,7 +345,7 @@ class HandleAuthorizationsTest(unittest.TestCase): authzr = acme_util.gen_authzr( messages.STATUS_PENDING, "0", [acme_util.DNS01], - [messages.STATUS_PENDING], False) + [messages.STATUS_PENDING]) mock_order = mock.MagicMock(authorizations=[authzr]) self.assertRaises( errors.AuthorizationError, self.handler.handle_authorizations, @@ -404,7 +357,7 @@ class HandleAuthorizationsTest(unittest.TestCase): authzr = acme_util.gen_authzr( messages.STATUS_VALID, "0", [acme_util.DNS01], - [messages.STATUS_VALID], False) + [messages.STATUS_VALID]) mock_order = mock.MagicMock(authorizations=[authzr]) self.handler.handle_authorizations(mock_order, self.mock_config) @@ -426,7 +379,7 @@ class HandleAuthorizationsTest(unittest.TestCase): ("is_valid_but_will_fail", messages.STATUS_VALID)] to_deactivate = [acme_util.gen_authzr(a[1], a[0], [acme_util.HTTP01], - [a[1], False]) for a in to_deactivate] + [a[1]]) for a in to_deactivate] orderr = mock.MagicMock(authorizations=to_deactivate) self.mock_net.deactivate_authorization.side_effect = _mock_deactivate @@ -452,8 +405,7 @@ def _gen_mock_on_poll(status=messages.STATUS_VALID, retry=0, wait_value=1): effective_status, authzr.body.identifier.value, [challb.chall for challb in authzr.body.challenges], - [effective_status] * len(authzr.body.challenges), - authzr.body.combinations) + [effective_status] * len(authzr.body.challenges)) return updated_azr, mock.MagicMock(headers={'Retry-After': str(wait_value)}) return _mock @@ -477,8 +429,6 @@ class ChallbToAchallTest(unittest.TestCase): class GenChallengePathTest(unittest.TestCase): """Tests for certbot._internal.auth_handler.gen_challenge_path. - .. todo:: Add more tests for dumb_path... depending on what we want to do. - """ def setUp(self): logging.disable(logging.FATAL) @@ -487,34 +437,25 @@ class GenChallengePathTest(unittest.TestCase): logging.disable(logging.NOTSET) @classmethod - def _call(cls, challbs, preferences, combinations): + def _call(cls, challbs, preferences): from certbot._internal.auth_handler import gen_challenge_path - return gen_challenge_path(challbs, preferences, combinations) + return gen_challenge_path(challbs, preferences) def test_common_case(self): """Given DNS01 and HTTP01 with appropriate combos.""" challbs = (acme_util.DNS01_P, acme_util.HTTP01_P) prefs = [challenges.DNS01, challenges.HTTP01] - combos = ((0,), (1,)) - # Smart then trivial dumb path test - self.assertEqual(self._call(challbs, prefs, combos), (0,)) - self.assertTrue(self._call(challbs, prefs, None)) - # Rearrange order... - self.assertEqual(self._call(challbs[::-1], prefs, combos), (1,)) - self.assertTrue(self._call(challbs[::-1], prefs, None)) + self.assertEqual(self._call(challbs, prefs), (0,)) + self.assertEqual(self._call(challbs[::-1], prefs), (1,)) def test_not_supported(self): - challbs = (acme_util.DNS01_P, acme_util.HTTP01_P) + challbs = (acme_util.DNS01_P,) prefs = [challenges.HTTP01] - combos = ((0, 1),) - # smart path fails because no challs in perfs satisfies combos + # smart path fails because no challs in prefs satisfies combos self.assertRaises( - errors.AuthorizationError, self._call, challbs, prefs, combos) - # dumb path fails because all challbs are not supported - self.assertRaises( - errors.AuthorizationError, self._call, challbs, prefs, None) + errors.AuthorizationError, self._call, challbs, prefs) class ReportFailedAuthzrsTest(unittest.TestCase): @@ -615,11 +556,11 @@ def gen_auth_resp(chall_list): for chall in chall_list] -def gen_dom_authzr(domain, challs, combos=True): +def gen_dom_authzr(domain, challs): """Generates new authzr for domains.""" return acme_util.gen_authzr( messages.STATUS_PENDING, domain, challs, - [messages.STATUS_PENDING] * len(challs), combos) + [messages.STATUS_PENDING] * len(challs)) if __name__ == "__main__": diff --git a/certbot/tests/client_test.py b/certbot/tests/client_test.py index 28daefea8..e51be7f9b 100644 --- a/certbot/tests/client_test.py +++ b/certbot/tests/client_test.py @@ -69,13 +69,13 @@ class RegisterTest(test_util.ConfigTestCase): self.config.register_unsafely_without_email = False self.config.email = "alias@example.com" self.account_storage = account.AccountMemoryStorage() + self.tos_cb = mock.MagicMock() with mock.patch("zope.component.provideUtility"): display_obj.set_display(MagicMock()) def _call(self): from certbot._internal.client import register - tos_cb = mock.MagicMock() - return register(self.config, self.account_storage, tos_cb) + return register(self.config, self.account_storage, self.tos_cb) @staticmethod def _public_key_mock(): @@ -98,31 +98,42 @@ class RegisterTest(test_util.ConfigTestCase): @staticmethod @contextlib.contextmanager def _patched_acme_client(): - # This function is written this way to avoid deprecation warnings that - # are raised when BackwardsCompatibleClientV2 is accessed on the real - # acme.client module. with mock.patch('certbot._internal.client.acme_client') as mock_acme_client: - yield mock_acme_client.BackwardsCompatibleClientV2 + yield mock_acme_client.ClientV2 def test_no_tos(self): with self._patched_acme_client() as mock_client: - mock_client.new_account_and_tos().terms_of_service = "http://tos" + mock_client.new_account().terms_of_service = "http://tos" mock_client().external_account_required.side_effect = self._false_mock with mock.patch("certbot._internal.eff.prepare_subscription") as mock_prepare: - mock_client().new_account_and_tos.side_effect = errors.Error + mock_client().new_account.side_effect = errors.Error self.assertRaises(errors.Error, self._call) self.assertIs(mock_prepare.called, False) - mock_client().new_account_and_tos.side_effect = None + mock_client().new_account.side_effect = None self._call() self.assertIs(mock_prepare.called, True) + @mock.patch('certbot._internal.eff.prepare_subscription') + def test_empty_meta(self, unused_mock_prepare): + # Test that we can handle an ACME server which does not implement the 'meta' + # directory object (for terms-of-service handling). + with self._patched_acme_client() as mock_client: + from acme.messages import Directory + mock_client().directory = Directory.from_json({}) + + mock_client().external_account_required.side_effect = self._false_mock + + self._call() + self.assertIs(self.tos_cb.called, False) + @test_util.patch_display_util() def test_it(self, unused_mock_get_utility): with self._patched_acme_client() as mock_client: mock_client().external_account_required.side_effect = self._false_mock with mock.patch("certbot._internal.eff.handle_subscription"): self._call() + self.assertIs(self.tos_cb.called, True) @mock.patch("certbot._internal.client.display_ops.get_email") def test_email_retry(self, mock_get_email): @@ -133,7 +144,7 @@ class RegisterTest(test_util.ConfigTestCase): with self._patched_acme_client() as mock_client: mock_client().external_account_required.side_effect = self._false_mock with mock.patch("certbot._internal.eff.prepare_subscription") as mock_prepare: - mock_client().new_account_and_tos.side_effect = [mx_err, mock.MagicMock()] + mock_client().new_account.side_effect = [mx_err, mock.MagicMock()] self._call() self.assertEqual(mock_get_email.call_count, 1) self.assertIs(mock_prepare.called, True) @@ -146,7 +157,7 @@ class RegisterTest(test_util.ConfigTestCase): with self._patched_acme_client() as mock_client: mock_client().external_account_required.side_effect = self._false_mock with mock.patch("certbot._internal.eff.handle_subscription"): - mock_client().new_account_and_tos.side_effect = [mx_err, mock.MagicMock()] + mock_client().new_account.side_effect = [mx_err, mock.MagicMock()] self.assertRaises(errors.Error, self._call) def test_needs_email(self): @@ -176,7 +187,7 @@ class RegisterTest(test_util.ConfigTestCase): # check Certbot did not ask the user to provide an email self.assertIs(mock_get_email.called, False) # check Certbot created an account with no email. Contact should return empty - self.assertFalse(mock_client().new_account_and_tos.call_args[0][0].contact) + self.assertFalse(mock_client().new_account.call_args[0][0].contact) @test_util.patch_display_util() def test_with_eab_arguments(self, unused_mock_get_utility): @@ -228,7 +239,7 @@ class RegisterTest(test_util.ConfigTestCase): ) mock_client().external_account_required.side_effect = self._false_mock with mock.patch("certbot._internal.eff.handle_subscription") as mock_handle: - mock_client().new_account_and_tos.side_effect = [mx_err, mock.MagicMock()] + mock_client().new_account.side_effect = [mx_err, mock.MagicMock()] self.assertRaises(messages.Error, self._call) self.assertIs(mock_handle.called, False) @@ -245,7 +256,7 @@ class ClientTestCommon(test_util.ConfigTestCase): from certbot._internal.client import Client with mock.patch("certbot._internal.client.acme_client") as acme: - self.acme_client = acme.BackwardsCompatibleClientV2 + self.acme_client = acme.ClientV2 self.acme = self.acme_client.return_value = mock.MagicMock() self.client_network = acme.ClientNetwork self.client = Client( diff --git a/certbot/tests/main_test.py b/certbot/tests/main_test.py index 61632fc8f..ad443f1ca 100644 --- a/certbot/tests/main_test.py +++ b/certbot/tests/main_test.py @@ -388,7 +388,7 @@ class RevokeTest(test_util.TempDirTestCase): mock.patch('certbot._internal.main._determine_account'), mock.patch('certbot._internal.main.display_ops.success_revocation') ] - self.mock_acme_client = patches[0].start().BackwardsCompatibleClientV2 + self.mock_acme_client = patches[0].start().ClientV2 patches[1].start() self.mock_determine_account = patches[2].start() self.mock_success_revoke = patches[3].start() @@ -418,12 +418,19 @@ class RevokeTest(test_util.TempDirTestCase): from certbot._internal.main import revoke revoke(config, plugins) + def _mock_set_by_cli(self, mocked: mock.MagicMock, key: str, value: bool) -> None: + def set_by_cli(k: str) -> bool: + if key == k: + return value + return mock.DEFAULT + mocked.side_effect = set_by_cli + @mock.patch('certbot._internal.main._delete_if_appropriate') @mock.patch('certbot._internal.main.client.acme_client') def test_revoke_with_reason(self, mock_acme_client, mock_delete_if_appropriate): mock_delete_if_appropriate.return_value = False - mock_revoke = mock_acme_client.BackwardsCompatibleClientV2().revoke + mock_revoke = mock_acme_client.ClientV2().revoke expected = [] for reason, code in constants.REVOCATION_REASONS.items(): args = 'revoke --cert-path={0} --reason {1}'.format(self.tmp_cert_path, reason).split() @@ -438,42 +445,56 @@ class RevokeTest(test_util.TempDirTestCase): @mock.patch('certbot._internal.main._delete_if_appropriate') @mock.patch('certbot._internal.storage.RenewableCert') @mock.patch('certbot._internal.storage.renewal_file_for_certname') - def test_revoke_by_certname(self, unused_mock_renewal_file_for_certname, - mock_cert, mock_delete_if_appropriate): + @mock.patch('certbot._internal.client.acme_from_config_key') + @mock.patch('certbot._internal.cli.set_by_cli') + def test_revoke_by_certname(self, mock_set_by_cli, mock_acme_from_config, + unused_mock_renewal_file_for_certname, mock_cert, + mock_delete_if_appropriate): + self._mock_set_by_cli(mock_set_by_cli, "server", False) + mock_acme_from_config.return_value = self.mock_acme_client mock_cert.return_value = mock.MagicMock(cert_path=self.tmp_cert_path, server="https://acme.example") args = 'revoke --cert-name=example.com'.split() mock_delete_if_appropriate.return_value = False self._call(args) - self.mock_acme_client.assert_called_once_with(mock.ANY, mock.ANY, 'https://acme.example') + self.assertEqual(mock_acme_from_config.call_args_list[0][0][0].server, + 'https://acme.example') self.mock_success_revoke.assert_called_once_with(self.tmp_cert_path) @mock.patch('certbot._internal.main._delete_if_appropriate') @mock.patch('certbot._internal.storage.RenewableCert') @mock.patch('certbot._internal.storage.renewal_file_for_certname') - def test_revoke_by_certname_and_server(self, unused_mock_renewal_file_for_certname, - mock_cert, mock_delete_if_appropriate): + @mock.patch('certbot._internal.client.acme_from_config_key') + @mock.patch('certbot._internal.cli.set_by_cli') + def test_revoke_by_certname_and_server(self, mock_set_by_cli, mock_acme_from_config, + unused_mock_renewal_file_for_certname, mock_cert, + mock_delete_if_appropriate): """Revoking with --server should use the server from the CLI""" + self._mock_set_by_cli(mock_set_by_cli, "server", True) mock_cert.return_value = mock.MagicMock(cert_path=self.tmp_cert_path, server="https://acme.example") args = 'revoke --cert-name=example.com --server https://other.example'.split() mock_delete_if_appropriate.return_value = False self._call(args) - self.mock_acme_client.assert_called_once_with(mock.ANY, mock.ANY, 'https://other.example') + self.assertEqual(mock_acme_from_config.call_args_list[0][0][0].server, + 'https://other.example') self.mock_success_revoke.assert_called_once_with(self.tmp_cert_path) @mock.patch('certbot._internal.main._delete_if_appropriate') @mock.patch('certbot._internal.storage.RenewableCert') @mock.patch('certbot._internal.storage.renewal_file_for_certname') - def test_revoke_by_certname_empty_server(self, unused_mock_renewal_file_for_certname, + @mock.patch('certbot._internal.client.acme_from_config_key') + @mock.patch('certbot._internal.cli.set_by_cli') + def test_revoke_by_certname_empty_server(self, mock_set_by_cli, mock_acme_from_config, + unused_mock_renewal_file_for_certname, mock_cert, mock_delete_if_appropriate): """Revoking with --cert-name where the lineage server is empty shouldn't crash """ mock_cert.return_value = mock.MagicMock(cert_path=self.tmp_cert_path, server=None) args = 'revoke --cert-name=example.com'.split() mock_delete_if_appropriate.return_value = False self._call(args) - self.mock_acme_client.assert_called_once_with( - mock.ANY, mock.ANY, constants.CLI_DEFAULTS['server']) + self.assertEqual(mock_acme_from_config.call_args_list[0][0][0].server, + constants.CLI_DEFAULTS['server']) self.mock_success_revoke.assert_called_once_with(self.tmp_cert_path) @mock.patch('certbot._internal.main._delete_if_appropriate') @@ -1566,11 +1587,12 @@ class MainTest(test_util.ConfigTestCase): self._call_no_clientmock(['--cert-path', SS_CERT_PATH, '--key-path', RSA2048_KEY_PATH, '--server', server, 'revoke']) with open(RSA2048_KEY_PATH, 'rb') as f: - mock_acme_client.BackwardsCompatibleClientV2.assert_called_once_with( - mock.ANY, jose.JWK.load(f.read()), server) + self.assertEqual(mock_acme_client.ClientV2.call_count, 1) + self.assertEqual(mock_acme_client.ClientNetwork.call_args[0][0], + jose.JWK.load(f.read())) with open(SS_CERT_PATH, 'rb') as f: cert = crypto_util.pyopenssl_load_certificate(f.read())[0] - mock_revoke = mock_acme_client.BackwardsCompatibleClientV2().revoke + mock_revoke = mock_acme_client.ClientV2().revoke mock_revoke.assert_called_once_with( jose.ComparableX509(cert), mock.ANY)