From c678652b3c8e8c953858796422cbcada170d0b06 Mon Sep 17 00:00:00 2001 From: Adrien Ferrand Date: Wed, 27 Oct 2021 01:02:59 +0200 Subject: [PATCH] Add generic methods to save some casts, and fix lint --- acme/acme/challenges.py | 20 +++++++++++++++++--- acme/acme/client.py | 29 ++++++++++++++--------------- acme/acme/messages.py | 29 ++++++++++++++++++++++------- 3 files changed, 53 insertions(+), 25 deletions(-) diff --git a/acme/acme/challenges.py b/acme/acme/challenges.py index 884c76789..250a42845 100644 --- a/acme/acme/challenges.py +++ b/acme/acme/challenges.py @@ -12,6 +12,8 @@ from typing import Mapping from typing import Optional from typing import Tuple from typing import Type +from typing import TypeVar +from typing import Union from cryptography.hazmat.primitives import hashes import josepy as jose @@ -27,14 +29,26 @@ from acme.mixins import TypeMixin logger = logging.getLogger(__name__) +T = TypeVar('T', bound='_JSONObjectWithFields') +R = TypeVar('R', bound='Challenge') -class Challenge(jose.TypedJSONObjectWithFields): + +# TODO: Remove this class once JSONObjectWithFields in josepy becomes generic. +class _JSONObjectWithFields(jose.JSONObjectWithFields): + """Generic version of jose.JSONObjectWithFields""" + + @classmethod + def from_json(cls: Type[T], jobj: Mapping[str, Any]) -> T: + return cast(T, super().from_json(jobj)) + + +class Challenge(_JSONObjectWithFields): # _fields_to_partial_json """ACME challenge.""" TYPES: Dict[str, Type['Challenge']] = {} @classmethod - def from_json(cls, jobj: Mapping[str, Any]) -> jose.TypedJSONObjectWithFields: + def from_json(cls: Type[R], jobj: Mapping[str, Any]) -> Union[R, 'UnrecognizedChallenge']: try: return super().from_json(jobj) except jose.UnrecognizedTypeError as error: @@ -42,7 +56,7 @@ class Challenge(jose.TypedJSONObjectWithFields): return UnrecognizedChallenge.from_json(jobj) -class ChallengeResponse(ResourceMixin, TypeMixin, jose.TypedJSONObjectWithFields): +class ChallengeResponse(ResourceMixin, TypeMixin, _JSONObjectWithFields): # _fields_to_partial_json """ACME challenge response.""" TYPES: Dict[str, Type['ChallengeResponse']] = {} diff --git a/acme/acme/client.py b/acme/acme/client.py index 397f30b5f..2380f3e0e 100644 --- a/acme/acme/client.py +++ b/acme/acme/client.py @@ -326,8 +326,7 @@ class Client(ClientBase): """ return self.update_registration( - cast(messages.RegistrationResource, regr.update( - body=regr.body.update(agreement=regr.terms_of_service)))) + regr.update(body=regr.body.update(agreement=regr.terms_of_service))) def request_challenges(self, identifier: messages.Identifier, new_authzr_uri: Optional[str] = None) -> messages.AuthorizationResource: @@ -544,7 +543,7 @@ class Client(ClientBase): raise errors.ClientError('Location header missing') if response.headers['Location'] != certr.uri: raise errors.UnexpectedUpdate(response.text) - return cast(messages.CertificateResource, certr.update(body=cert)) + return certr.update(body=cert) def refresh(self, certr: messages.CertificateResource) -> messages.CertificateResource: """Refresh certificate. @@ -677,7 +676,7 @@ class ClientV2(ClientBase): only_existing_reg = regr.body.update(only_return_existing=True) response = self._post(self.directory['newAccount'], only_existing_reg) updated_uri = response.headers['Location'] - new_regr = cast(messages.RegistrationResource, regr.update(uri=updated_uri)) + new_regr = regr.update(uri=updated_uri) self.net.account = new_regr return new_regr @@ -703,12 +702,12 @@ class ClientV2(ClientBase): value=ips)) order = messages.NewOrder(identifiers=identifiers) response = self._post(self.directory['newOrder'], order) - body = cast(messages.Order, messages.Order.from_json(response.json())) + body = messages.Order.from_json(response.json()) authorizations = [] # pylint has trouble understanding our josepy based objects which use # things like custom metaclass logic. body.authorizations should be a # list of strings containing URLs so let's disable this check here. - for url in body.authorizations: # pylint: disable=not-an-iterable + for url in body.authorizations: # pylint: disable=not-an-iterable,no-member authorizations.append(self._authzr_from_response(self._post_as_get(url), uri=url)) return messages.OrderResource( body=body, @@ -775,7 +774,7 @@ class ClientV2(ClientBase): failed.append(authzr) if failed: raise errors.ValidationError(failed) - return cast(messages.OrderResource, orderr.update(authorizations=responses)) + return orderr.update(authorizations=responses) def finalize_order(self, orderr: messages.OrderResource, deadline: datetime.datetime, fetch_alternative_chains: bool = False) -> messages.OrderResource: @@ -797,11 +796,11 @@ class ClientV2(ClientBase): while datetime.datetime.now() < deadline: time.sleep(1) response = self._post_as_get(orderr.uri) - body = cast(messages.Order, messages.Order.from_json(response.json())) - if body.error is not None: - raise errors.IssuanceError(body.error) - if body.certificate is not None: - certificate_response = self._post_as_get(body.certificate) + body = messages.Order.from_json(response.json()) + if body.error is not None: # pylint: disable=no-member + raise errors.IssuanceError(body.error) # pylint: disable=no-member + if body.certificate is not None: # pylint: disable=no-member + certificate_response = self._post_as_get(body.certificate) # pylint: disable=no-member orderr = orderr.update(body=body, fullchain_pem=certificate_response.text) if fetch_alternative_chains: alt_chains_urls = self._get_links(certificate_response, 'alternate') @@ -909,7 +908,7 @@ class BackwardsCompatibleClientV2: client_v2 = cast(ClientV2, self.client) if "terms_of_service" in client_v2.directory.meta: _assess_tos(client_v2.directory.meta.terms_of_service) - regr = cast(messages.NewRegistration, regr.update(terms_of_service_agreed=True)) + regr = regr.update(terms_of_service_agreed=True) return client_v2.new_account(regr) def new_order(self, csr_pem: bytes) -> messages.OrderResource: @@ -977,7 +976,7 @@ class BackwardsCompatibleClientV2: cast(OpenSSL.crypto.X509, cast(jose.ComparableX509, certr.body).wrapped)).decode() chain_str = crypto_util.dump_pyopenssl_chain(chain).decode() - return cast(messages.OrderResource, orderr.update(fullchain_pem=(cert + chain_str))) + return orderr.update(fullchain_pem=(cert + chain_str)) return cast(ClientV2, self.client).finalize_order( orderr, deadline, fetch_alternative_chains) @@ -1129,7 +1128,7 @@ class ClientNetwork: 'Ignoring wrong Content-Type (%r) for JSON Error', response_ct) try: - raise cast(messages.Error, messages.Error.from_json(jobj)) + raise messages.Error.from_json(jobj) # pylint: disable=raising-non-exception except jose.DeserializationError as error: # Couldn't deserialize JSON object raise errors.ClientError((response, error)) diff --git a/acme/acme/messages.py b/acme/acme/messages.py index b55340780..d1482be34 100644 --- a/acme/acme/messages.py +++ b/acme/acme/messages.py @@ -8,9 +8,10 @@ from typing import Iterator from typing import List from typing import Mapping from typing import MutableMapping +from typing import Optional from typing import Tuple from typing import Type -from typing import Optional +from typing import TypeVar from typing import Union import josepy as jose @@ -64,6 +65,20 @@ ERROR_TYPE_DESCRIPTIONS = dict( ERROR_TYPE_DESCRIPTIONS.update(dict( # add errors with old prefix, deprecate me (OLD_ERROR_PREFIX + name, desc) for name, desc in ERROR_CODES.items())) +T = TypeVar('T', bound='_JSONObjectWithFields') + + +# TODO: Remove this class once JSONObjectWithFields and ImmutableMap in josepy becomes generic. +class _JSONObjectWithFields(jose.JSONObjectWithFields): + """Generic version of jose.JSONObjectWithFields""" + + @classmethod + def from_json(cls: Type[T], jobj: Mapping[str, Any]) -> T: + return cast(T, super().from_json(jobj)) + + def update(self: T, **kwargs: Any) -> T: + return cast(T, super().update(**kwargs)) + def is_acme_error(err: BaseException) -> bool: """Check if argument is an ACME error.""" @@ -72,7 +87,7 @@ def is_acme_error(err: BaseException) -> bool: return False -class Error(jose.JSONObjectWithFields, errors.Error): +class Error(_JSONObjectWithFields, errors.Error): """ACME error. https://tools.ietf.org/html/draft-ietf-appsawg-http-problem-00 @@ -188,7 +203,7 @@ IDENTIFIER_FQDN = IdentifierType('dns') # IdentifierDNS in Boulder IDENTIFIER_IP = IdentifierType('ip') # IdentifierIP in pebble - not in Boulder yet -class Identifier(jose.JSONObjectWithFields): +class Identifier(_JSONObjectWithFields): """ACME identifier. :ivar IdentifierType typ: @@ -273,7 +288,7 @@ class Directory(jose.JSONDeSerializable): return cls(jobj) -class Resource(jose.JSONObjectWithFields): +class Resource(_JSONObjectWithFields): """ACME Resource. :ivar acme.messages.ResourceBody body: Resource body. @@ -291,7 +306,7 @@ class ResourceWithURI(Resource): uri: str = jose.field('uri') # no ChallengeResource.uri -class ResourceBody(jose.JSONObjectWithFields): +class ResourceBody(_JSONObjectWithFields): """ACME Resource Body.""" @@ -558,7 +573,7 @@ class Authorization(ResourceBody): # that challenge is redefined. Let's ignore the type check here. @challenges.decoder # type: ignore def challenges(value: List[Dict[str, Any]]) -> Tuple[ChallengeBody, ...]: # type: ignore[misc] # pylint: disable=no-self-argument,missing-function-docstring - return tuple(cast(ChallengeBody, ChallengeBody.from_json(chall)) for chall in value) + return tuple(ChallengeBody.from_json(chall) for chall in value) @property def resolved_combinations(self) -> Tuple[Tuple[ChallengeBody, ...], ...]: @@ -658,7 +673,7 @@ class Order(ResourceBody): # that identifiers is redefined. Let's ignore the type check here. @identifiers.decoder # type: ignore def identifiers(value: List[Dict[str, Any]]) -> Tuple[Identifier, ...]: # type: ignore[misc] # pylint: disable=no-self-argument,missing-function-docstring - return tuple(cast(Identifier, Identifier.from_json(identifier)) for identifier in value) + return tuple(Identifier.from_json(identifier) for identifier in value) class OrderResource(ResourceWithURI):