From 92d666da2ca71c96b5144a691969026a7fa7a111 Mon Sep 17 00:00:00 2001 From: Adrien Ferrand Date: Wed, 27 Oct 2021 00:01:03 +0200 Subject: [PATCH] Finish new typing --- acme/acme/challenges.py | 23 ++++++---- acme/acme/client.py | 44 ++++++++++-------- acme/acme/crypto_util.py | 5 +- acme/acme/messages.py | 69 ++++++++++++++++------------ certbot/certbot/_internal/account.py | 4 +- 5 files changed, 82 insertions(+), 63 deletions(-) diff --git a/acme/acme/challenges.py b/acme/acme/challenges.py index de842a261..e20118987 100644 --- a/acme/acme/challenges.py +++ b/acme/acme/challenges.py @@ -34,7 +34,7 @@ class Challenge(jose.TypedJSONObjectWithFields): TYPES: Dict[str, Type['Challenge']] = {} @classmethod - def from_json(cls, jobj: Mapping[str, Any]) -> 'Challenge': + def from_json(cls, jobj: Mapping[str, Any]) -> jose.TypedJSONObjectWithFields: try: return super().from_json(jobj) except jose.UnrecognizedTypeError as error: @@ -62,6 +62,7 @@ class UnrecognizedChallenge(Challenge): :ivar jobj: Original JSON decoded object. """ + jobj: Dict[str, Any] def __init__(self, jobj: Mapping[str, Any]) -> None: super().__init__() @@ -85,7 +86,7 @@ class _TokenChallenge(Challenge): """Minimum size of the :attr:`token` in bytes.""" # TODO: acme-spec doesn't specify token as base64-encoded value - token: bytes = jose.Field( + token: bytes = jose.field( "token", encoder=jose.encode_b64jose, decoder=functools.partial( jose.decode_b64jose, size=TOKEN_SIZE, minimum=True)) @@ -108,10 +109,10 @@ class _TokenChallenge(Challenge): class KeyAuthorizationChallengeResponse(ChallengeResponse): """Response to Challenges based on Key Authorization. - :param unicode key_authorization: + :param str key_authorization: """ - key_authorization = jose.Field("keyAuthorization") + key_authorization: str = jose.field("keyAuthorization") thumbprint_hash_function = hashes.SHA256 def verify(self, chall: 'KeyAuthorizationChallenge', account_public_key: jose.JWK) -> bool: @@ -523,7 +524,7 @@ class TLSALPN01(KeyAuthorizationChallenge): """Generate validation. :param JWK account_key: - :param unicode domain: Domain verified by the challenge. + :param str domain: Domain verified by the challenge. :param OpenSSL.crypto.PKey cert_key: Optional private key used in certificate generation. If not provided (``None``), then fresh key will be generated. @@ -531,9 +532,12 @@ class TLSALPN01(KeyAuthorizationChallenge): :rtype: `tuple` of `OpenSSL.crypto.X509` and `OpenSSL.crypto.PKey` """ - return self.response(account_key).gen_cert( + domain = kwargs.get('domain') + if not isinstance(domain, str): + raise errors.Error("Parameter domain should be a string.") + return cast(TLSALPN01Response, self.response(account_key)).gen_cert( key=kwargs.get('cert_key'), - domain=kwargs.get('domain')) + domain=domain) @staticmethod def is_supported() -> bool: @@ -599,8 +603,7 @@ class DNS(_TokenChallenge): :rtype: DNSResponse """ - return DNSResponse(validation=self.gen_validation( - account_key, **kwargs)) + return DNSResponse(validation=self.gen_validation(account_key, **kwargs)) def validation_domain_name(self, name: str) -> str: """Domain name for TXT validation record. @@ -620,7 +623,7 @@ class DNSResponse(ChallengeResponse): """ typ = "dns" - validation = jose.Field("validation", decoder=jose.JWS.from_json) + validation: jose.JWS = jose.field("validation", decoder=jose.JWS.from_json) def check_validation(self, chall: 'DNS', account_public_key: jose.JWK) -> bool: """Check validation. diff --git a/acme/acme/client.py b/acme/acme/client.py index ae2f261aa..f74719211 100644 --- a/acme/acme/client.py +++ b/acme/acme/client.py @@ -19,6 +19,7 @@ from typing import cast from typing import Dict from typing import Iterable from typing import List +from typing import Mapping from typing import Optional from typing import Set from typing import Text @@ -130,8 +131,9 @@ class ClientBase: :rtype: `.RegistrationResource` """ - return self.update_registration(regr, messages.Registration.from_json( - {"status": "deactivated", "contact": None})) + return self.update_registration(regr, cast(messages.Registration, + messages.Registration.from_json( + {"status": "deactivated", "contact": None}))) def deactivate_authorization(self, authzr: messages.AuthorizationResource @@ -324,7 +326,8 @@ class Client(ClientBase): """ return self.update_registration( - regr.update(body=regr.body.update(agreement=regr.terms_of_service))) + cast(messages.RegistrationResource, regr.update( + body=regr.body.update(agreement=regr.terms_of_service)))) def request_challenges(self, identifier: messages.Identifier, new_authzr_uri: Optional[str] = None) -> messages.AuthorizationResource: @@ -541,7 +544,7 @@ class Client(ClientBase): raise errors.ClientError('Location header missing') if response.headers['Location'] != certr.uri: raise errors.UnexpectedUpdate(response.text) - return certr.update(body=cert) + return cast(messages.CertificateResource, certr.update(body=cert)) def refresh(self, certr: messages.CertificateResource) -> messages.CertificateResource: """Refresh certificate. @@ -674,7 +677,7 @@ class ClientV2(ClientBase): only_existing_reg = regr.body.update(only_return_existing=True) response = self._post(self.directory['newAccount'], only_existing_reg) updated_uri = response.headers['Location'] - new_regr = regr.update(uri=updated_uri) + new_regr = cast(messages.RegistrationResource, regr.update(uri=updated_uri)) self.net.account = new_regr return new_regr @@ -700,7 +703,7 @@ class ClientV2(ClientBase): value=ips)) order = messages.NewOrder(identifiers=identifiers) response = self._post(self.directory['newOrder'], order) - body = messages.Order.from_json(response.json()) + body = cast(messages.Order, messages.Order.from_json(response.json())) authorizations = [] # pylint has trouble understanding our josepy based objects which use # things like custom metaclass logic. body.authorizations should be a @@ -772,7 +775,7 @@ class ClientV2(ClientBase): failed.append(authzr) if failed: raise errors.ValidationError(failed) - return orderr.update(authorizations=responses) + return cast(messages.OrderResource, orderr.update(authorizations=responses)) def finalize_order(self, orderr: messages.OrderResource, deadline: datetime.datetime, fetch_alternative_chains: bool = False) -> messages.OrderResource: @@ -794,7 +797,7 @@ class ClientV2(ClientBase): while datetime.datetime.now() < deadline: time.sleep(1) response = self._post_as_get(orderr.uri) - body = messages.Order.from_json(response.json()) + body = cast(messages.Order, messages.Order.from_json(response.json())) if body.error is not None: raise errors.IssuanceError(body.error) if body.certificate is not None: @@ -897,16 +900,16 @@ class BackwardsCompatibleClientV2: check_tos_cb(tos) if self.acme_version == 1: client_v1 = cast(Client, self.client) - regr = client_v1.register(regr) - if regr.terms_of_service is not None: - _assess_tos(regr.terms_of_service) - return client_v1.agree_to_tos(regr) - return regr + regr_res = client_v1.register(regr) + if regr_res.terms_of_service is not None: + _assess_tos(regr_res.terms_of_service) + return client_v1.agree_to_tos(regr_res) + return regr_res else: client_v2 = cast(ClientV2, self.client) if "terms_of_service" in client_v2.directory.meta: _assess_tos(client_v2.directory.meta.terms_of_service) - regr = regr.update(terms_of_service_agreed=True) + regr = cast(messages.NewRegistration, regr.update(terms_of_service_agreed=True)) return client_v2.new_account(regr) def new_order(self, csr_pem: bytes) -> messages.OrderResource: @@ -970,10 +973,11 @@ class BackwardsCompatibleClientV2: 'certificate, please rerun the command for a new one.') cert = OpenSSL.crypto.dump_certificate( - OpenSSL.crypto.FILETYPE_PEM, certr.body.wrapped).decode() + OpenSSL.crypto.FILETYPE_PEM, + cast(OpenSSL.crypto.X509, cast(jose.ComparableX509, certr.body).wrapped)).decode() chain_str = crypto_util.dump_pyopenssl_chain(chain).decode() - return orderr.update(fullchain_pem=(cert + chain_str)) + return cast(messages.OrderResource, orderr.update(fullchain_pem=(cert + chain_str))) return cast(ClientV2, self.client).finalize_order( orderr, deadline, fetch_alternative_chains) @@ -1056,7 +1060,7 @@ class ClientNetwork: pass def _wrap_in_jws(self, obj: jose.JSONDeSerializable, nonce: str, url: str, - acme_version: int) -> jose.JWS: + acme_version: int) -> str: """Wrap `JSONDeSerializable` object in JWS. .. todo:: Implement ``acmePath``. @@ -1064,7 +1068,7 @@ class ClientNetwork: :param josepy.JSONDeSerializable obj: :param str url: The URL to which this object will be POSTed :param str nonce: - :rtype: `josepy.JWS` + :rtype: str """ if isinstance(obj, VersionedLEACMEMixin): @@ -1082,7 +1086,7 @@ class ClientNetwork: if self.account is not None: kwargs["kid"] = self.account["uri"] kwargs["key"] = self.key - return jws.JWS.sign(jobj, **kwargs).json_dumps(indent=2) + return jws.JWS.sign(jobj, **cast(Mapping[str, Any], kwargs)).json_dumps(indent=2) @classmethod def _check_response(cls, response: requests.Response, @@ -1125,7 +1129,7 @@ class ClientNetwork: 'Ignoring wrong Content-Type (%r) for JSON Error', response_ct) try: - raise messages.Error.from_json(jobj) + raise cast(messages.Error, messages.Error.from_json(jobj)) except jose.DeserializationError as error: # Couldn't deserialize JSON object raise errors.ClientError((response, error)) diff --git a/acme/acme/crypto_util.py b/acme/acme/crypto_util.py index 681dd18b1..99e11ee7c 100644 --- a/acme/acme/crypto_util.py +++ b/acme/acme/crypto_util.py @@ -410,7 +410,8 @@ def gen_ss_cert(key: crypto.PKey, domains: Optional[List[str]] = None, return cert -def dump_pyopenssl_chain(chain: List[crypto.X509], filetype: int = crypto.FILETYPE_PEM) -> bytes: +def dump_pyopenssl_chain(chain: Union[List[jose.ComparableX509], List[crypto.X509]], + filetype: int = crypto.FILETYPE_PEM) -> bytes: """Dump certificate chain into a bundle. :param list chain: List of `OpenSSL.crypto.X509` (or wrapped in @@ -425,6 +426,8 @@ def dump_pyopenssl_chain(chain: List[crypto.X509], filetype: int = crypto.FILETY def _dump_cert(cert: Union[jose.ComparableX509, crypto.X509]) -> bytes: if isinstance(cert, jose.ComparableX509): + if isinstance(cert.wrapped, crypto.X509Req): + raise errors.Error("Unexpected certificate signing request provided.") cert = cert.wrapped return crypto.dump_certificate(filetype, cert) diff --git a/acme/acme/messages.py b/acme/acme/messages.py index 0043a68e9..f29ecc057 100644 --- a/acme/acme/messages.py +++ b/acme/acme/messages.py @@ -1,6 +1,7 @@ """ACME protocol messages.""" from collections.abc import Hashable import json +from typing import cast from typing import Any from typing import Dict from typing import Iterator @@ -10,6 +11,7 @@ from typing import MutableMapping from typing import Tuple from typing import Type from typing import Optional +from typing import Union import josepy as jose @@ -98,7 +100,7 @@ class Error(jose.JSONObjectWithFields, errors.Error): typ = ERROR_PREFIX + code # Mypy will not understand that the Error constructor accepts a named argument # "typ" because of josepy magic. Let's ignore the type check here. - return cls(typ=typ, **kwargs) # type: ignore + return cls(typ=typ, **kwargs) @property def description(self) -> Optional[str]: @@ -164,7 +166,7 @@ class _Constant(jose.JSONDeSerializable, Hashable): class Status(_Constant): """ACME "status" field.""" - POSSIBLE_NAMES: Dict[str, 'Status'] = {} + POSSIBLE_NAMES: Dict[str, _Constant] = {} STATUS_UNKNOWN = Status('unknown') @@ -179,7 +181,7 @@ STATUS_DEACTIVATED = Status('deactivated') class IdentifierType(_Constant): """ACME identifier type.""" - POSSIBLE_NAMES: Dict[str, 'IdentifierType'] = {} + POSSIBLE_NAMES: Dict[str, _Constant] = {} IDENTIFIER_FQDN = IdentifierType('dns') # IdentifierDNS in Boulder @@ -200,15 +202,15 @@ class Identifier(jose.JSONObjectWithFields): class Directory(jose.JSONDeSerializable): """Directory.""" - _REGISTERED_TYPES: Dict[str, Type['Directory']] = {} + _REGISTERED_TYPES: Dict[str, Type[jose.JSONObjectWithFields]] = {} class Meta(jose.JSONObjectWithFields): """Directory Meta.""" - _terms_of_service = jose.Field('terms-of-service', omitempty=True) - _terms_of_service_v2 = jose.Field('termsOfService', omitempty=True) - website = jose.Field('website', omitempty=True) - caa_identities = jose.Field('caaIdentities', omitempty=True) - external_account_required = jose.Field('externalAccountRequired', omitempty=True) + _terms_of_service: str = jose.field('terms-of-service', omitempty=True) + _terms_of_service_v2: str = jose.field('termsOfService', omitempty=True) + website: str = jose.field('website', omitempty=True) + caa_identities: List[str] = jose.field('caaIdentities', omitempty=True) + external_account_required: bool = jose.field('externalAccountRequired', omitempty=True) def __init__(self, **kwargs: Any) -> None: kwargs = {self._internal_name(k): v for k, v in kwargs.items()} @@ -229,13 +231,17 @@ class Directory(jose.JSONDeSerializable): return '_' + name if name == 'terms_of_service' else name @classmethod - def _canon_key(cls, key: str) -> str: - return getattr(key, 'resource_type', key) + def _canon_key(cls, key: Union[jose.JSONObjectWithFields, str]) -> str: + return key if isinstance(key, str) else getattr(key, 'resource_type') @classmethod - def register(cls, resource_body_cls: Type['Directory']) -> Type['Directory']: + def register(cls, resource_body_cls: Type[jose.JSONObjectWithFields] + ) -> Type[jose.JSONObjectWithFields]: """Register resource.""" - resource_type = resource_body_cls.resource_type + resource_type = getattr(resource_body_cls, 'resource_type') + if not resource_type: + raise errors.Error(f'Error, current resource {resource_body_cls} ' + f'do not declare a resource_type field.') assert resource_type not in cls._REGISTERED_TYPES cls._REGISTERED_TYPES[resource_type] = resource_body_cls return resource_body_cls @@ -252,7 +258,7 @@ class Directory(jose.JSONDeSerializable): except KeyError as error: raise AttributeError(str(error)) - def __getitem__(self, name: str) -> Any: + def __getitem__(self, name: Union[str, jose.JSONObjectWithFields]) -> Any: try: return self._jobj[self._canon_key(name)] except KeyError: @@ -298,7 +304,8 @@ class ExternalAccountBinding: """Create External Account Binding Resource from contact details, kid and hmac.""" key_json = json.dumps(account_public_key.to_partial_json()).encode() - decoded_hmac_key = jose.b64.b64decode(hmac_key) + # Fix type with TO_DEFINE + decoded_hmac_key = jose.b64.b64decode(hmac_key) # type: ignore url = directory["newAccount"] eab = jws.JWS.sign(key_json, jose.jwk.JWKOct(key=decoded_hmac_key), @@ -313,7 +320,7 @@ class Registration(ResourceBody): :ivar jose.JWK key: Public key. :ivar tuple contact: Contact information following ACME spec, - `tuple` of `unicode`. + `tuple` of `str`. :ivar str agreement: """ @@ -432,8 +439,8 @@ class RegistrationResource(ResourceWithURI): """Registration Resource. :ivar acme.messages.Registration body: - :ivar unicode new_authzr_uri: Deprecated. Do not use. - :ivar unicode terms_of_service: URL for the CA TOS. + :ivar str new_authzr_uri: Deprecated. Do not use. + :ivar str terms_of_service: URL for the CA TOS. """ body: Registration = jose.field('body', decoder=Registration.from_json) @@ -549,11 +556,11 @@ class Authorization(ResourceBody): # Mypy does not understand the josepy magic happening here, and falsely claims # that challenge is redefined. Let's ignore the type check here. @challenges.decoder # type: ignore - def challenges(value: List[Mapping[str, Any]]) -> Tuple[ChallengeBody, ...]: # pylint: disable=no-self-argument,missing-function-docstring - return tuple(ChallengeBody.from_json(chall) for chall in value) + def challenges(value: List[Mapping[str, Any]]) -> Tuple[ChallengeBody, ...]: # type: ignore[misc] # pylint: disable=no-self-argument,missing-function-docstring + return tuple(cast(ChallengeBody, ChallengeBody.from_json(chall)) for chall in value) @property - def resolved_combinations(self) -> Tuple[Tuple[Dict[str, Any], ...], ...]: + def resolved_combinations(self) -> Tuple[Tuple[ChallengeBody, ...], ...]: """Combinations with challenges instead of indices.""" return tuple(tuple(self.challenges[idx] for idx in combo) for combo in self.combinations) # pylint: disable=not-an-iterable @@ -621,7 +628,7 @@ class Revocation(ResourceMixin, jose.JSONObjectWithFields): resource = fields.Resource(resource_type) certificate: jose.ComparableX509 = jose.field( 'certificate', decoder=jose.decode_cert, encoder=jose.encode_cert) - reason = jose.Field('reason') + reason: str = jose.field('reason') class Order(ResourceBody): @@ -649,29 +656,31 @@ class Order(ResourceBody): # Mypy does not understand the josepy magic happening here, and falsely claims # that identifiers is redefined. Let's ignore the type check here. @identifiers.decoder # type: ignore - def identifiers(value: List[Mapping[str, Any]]) -> Tuple[Identifier, ...]: # pylint: disable=no-self-argument,missing-function-docstring - return tuple(Identifier.from_json(identifier) for identifier in value) + def identifiers(value: List[Mapping[str, Any]]) -> Tuple[Identifier, ...]: # type: ignore[misc] # pylint: disable=no-self-argument,missing-function-docstring + return tuple(cast(Identifier, Identifier.from_json(identifier)) for identifier in value) class OrderResource(ResourceWithURI): """Order Resource. :ivar acme.messages.Order body: - :ivar str csr_pem: The CSR this Order will be finalized with. + :ivar bytes csr_pem: The CSR this Order will be finalized with. :ivar authorizations: Fully-fetched AuthorizationResource objects. :vartype authorizations: `list` of `acme.messages.AuthorizationResource` - :ivar str fullchain_pem: The fetched contents of the certificate URL + :ivar bytes fullchain_pem: The fetched contents of the certificate URL produced once the order was finalized, if it's present. :ivar alternative_fullchains_pem: The fetched contents of alternative certificate chain URLs produced once the order was finalized, if present and requested during finalization. - :vartype alternative_fullchains_pem: `list` of `str` + :vartype alternative_fullchains_pem: `list` of `bytes` """ body: Order = jose.field('body', decoder=Order.from_json) - csr_pem: str = jose.field('csr_pem', omitempty=True) + csr_pem: bytes = jose.field('csr_pem', omitempty=True) authorizations: List[AuthorizationResource] = jose.field('authorizations') - fullchain_pem: str = jose.field('fullchain_pem', omitempty=True) - alternative_fullchains_pem: List[str] = jose.field('alternative_fullchains_pem', omitempty=True) + fullchain_pem: bytes = jose.field('fullchain_pem', omitempty=True) + alternative_fullchains_pem: List[bytes] = jose.field('alternative_fullchains_pem', + omitempty=True) + @Directory.register class NewOrder(Order): diff --git a/certbot/certbot/_internal/account.py b/certbot/certbot/_internal/account.py index 3d45b8ed6..70ee8ae06 100644 --- a/certbot/certbot/_internal/account.py +++ b/certbot/certbot/_internal/account.py @@ -50,8 +50,8 @@ class Account: """ creation_dt = acme_fields.RFC3339Field("creation_dt") - creation_host = jose.Field("creation_host") - register_to_eff = jose.Field("register_to_eff", omitempty=True) + creation_host: str = jose.field("creation_host") + register_to_eff: str = jose.field("register_to_eff", omitempty=True) def __init__(self, regr, key, meta=None): self.key = key