Finish new typing

This commit is contained in:
Adrien Ferrand 2021-10-27 00:01:03 +02:00
parent 6c353aa4b4
commit 92d666da2c
5 changed files with 82 additions and 63 deletions

View file

@ -34,7 +34,7 @@ class Challenge(jose.TypedJSONObjectWithFields):
TYPES: Dict[str, Type['Challenge']] = {}
@classmethod
def from_json(cls, jobj: Mapping[str, Any]) -> 'Challenge':
def from_json(cls, jobj: Mapping[str, Any]) -> jose.TypedJSONObjectWithFields:
try:
return super().from_json(jobj)
except jose.UnrecognizedTypeError as error:
@ -62,6 +62,7 @@ class UnrecognizedChallenge(Challenge):
:ivar jobj: Original JSON decoded object.
"""
jobj: Dict[str, Any]
def __init__(self, jobj: Mapping[str, Any]) -> None:
super().__init__()
@ -85,7 +86,7 @@ class _TokenChallenge(Challenge):
"""Minimum size of the :attr:`token` in bytes."""
# TODO: acme-spec doesn't specify token as base64-encoded value
token: bytes = jose.Field(
token: bytes = jose.field(
"token", encoder=jose.encode_b64jose, decoder=functools.partial(
jose.decode_b64jose, size=TOKEN_SIZE, minimum=True))
@ -108,10 +109,10 @@ class _TokenChallenge(Challenge):
class KeyAuthorizationChallengeResponse(ChallengeResponse):
"""Response to Challenges based on Key Authorization.
:param unicode key_authorization:
:param str key_authorization:
"""
key_authorization = jose.Field("keyAuthorization")
key_authorization: str = jose.field("keyAuthorization")
thumbprint_hash_function = hashes.SHA256
def verify(self, chall: 'KeyAuthorizationChallenge', account_public_key: jose.JWK) -> bool:
@ -523,7 +524,7 @@ class TLSALPN01(KeyAuthorizationChallenge):
"""Generate validation.
:param JWK account_key:
:param unicode domain: Domain verified by the challenge.
:param str domain: Domain verified by the challenge.
:param OpenSSL.crypto.PKey cert_key: Optional private key used
in certificate generation. If not provided (``None``), then
fresh key will be generated.
@ -531,9 +532,12 @@ class TLSALPN01(KeyAuthorizationChallenge):
:rtype: `tuple` of `OpenSSL.crypto.X509` and `OpenSSL.crypto.PKey`
"""
return self.response(account_key).gen_cert(
domain = kwargs.get('domain')
if not isinstance(domain, str):
raise errors.Error("Parameter domain should be a string.")
return cast(TLSALPN01Response, self.response(account_key)).gen_cert(
key=kwargs.get('cert_key'),
domain=kwargs.get('domain'))
domain=domain)
@staticmethod
def is_supported() -> bool:
@ -599,8 +603,7 @@ class DNS(_TokenChallenge):
:rtype: DNSResponse
"""
return DNSResponse(validation=self.gen_validation(
account_key, **kwargs))
return DNSResponse(validation=self.gen_validation(account_key, **kwargs))
def validation_domain_name(self, name: str) -> str:
"""Domain name for TXT validation record.
@ -620,7 +623,7 @@ class DNSResponse(ChallengeResponse):
"""
typ = "dns"
validation = jose.Field("validation", decoder=jose.JWS.from_json)
validation: jose.JWS = jose.field("validation", decoder=jose.JWS.from_json)
def check_validation(self, chall: 'DNS', account_public_key: jose.JWK) -> bool:
"""Check validation.

View file

@ -19,6 +19,7 @@ from typing import cast
from typing import Dict
from typing import Iterable
from typing import List
from typing import Mapping
from typing import Optional
from typing import Set
from typing import Text
@ -130,8 +131,9 @@ class ClientBase:
:rtype: `.RegistrationResource`
"""
return self.update_registration(regr, messages.Registration.from_json(
{"status": "deactivated", "contact": None}))
return self.update_registration(regr, cast(messages.Registration,
messages.Registration.from_json(
{"status": "deactivated", "contact": None})))
def deactivate_authorization(self,
authzr: messages.AuthorizationResource
@ -324,7 +326,8 @@ class Client(ClientBase):
"""
return self.update_registration(
regr.update(body=regr.body.update(agreement=regr.terms_of_service)))
cast(messages.RegistrationResource, regr.update(
body=regr.body.update(agreement=regr.terms_of_service))))
def request_challenges(self, identifier: messages.Identifier,
new_authzr_uri: Optional[str] = None) -> messages.AuthorizationResource:
@ -541,7 +544,7 @@ class Client(ClientBase):
raise errors.ClientError('Location header missing')
if response.headers['Location'] != certr.uri:
raise errors.UnexpectedUpdate(response.text)
return certr.update(body=cert)
return cast(messages.CertificateResource, certr.update(body=cert))
def refresh(self, certr: messages.CertificateResource) -> messages.CertificateResource:
"""Refresh certificate.
@ -674,7 +677,7 @@ class ClientV2(ClientBase):
only_existing_reg = regr.body.update(only_return_existing=True)
response = self._post(self.directory['newAccount'], only_existing_reg)
updated_uri = response.headers['Location']
new_regr = regr.update(uri=updated_uri)
new_regr = cast(messages.RegistrationResource, regr.update(uri=updated_uri))
self.net.account = new_regr
return new_regr
@ -700,7 +703,7 @@ class ClientV2(ClientBase):
value=ips))
order = messages.NewOrder(identifiers=identifiers)
response = self._post(self.directory['newOrder'], order)
body = messages.Order.from_json(response.json())
body = cast(messages.Order, messages.Order.from_json(response.json()))
authorizations = []
# pylint has trouble understanding our josepy based objects which use
# things like custom metaclass logic. body.authorizations should be a
@ -772,7 +775,7 @@ class ClientV2(ClientBase):
failed.append(authzr)
if failed:
raise errors.ValidationError(failed)
return orderr.update(authorizations=responses)
return cast(messages.OrderResource, orderr.update(authorizations=responses))
def finalize_order(self, orderr: messages.OrderResource, deadline: datetime.datetime,
fetch_alternative_chains: bool = False) -> messages.OrderResource:
@ -794,7 +797,7 @@ class ClientV2(ClientBase):
while datetime.datetime.now() < deadline:
time.sleep(1)
response = self._post_as_get(orderr.uri)
body = messages.Order.from_json(response.json())
body = cast(messages.Order, messages.Order.from_json(response.json()))
if body.error is not None:
raise errors.IssuanceError(body.error)
if body.certificate is not None:
@ -897,16 +900,16 @@ class BackwardsCompatibleClientV2:
check_tos_cb(tos)
if self.acme_version == 1:
client_v1 = cast(Client, self.client)
regr = client_v1.register(regr)
if regr.terms_of_service is not None:
_assess_tos(regr.terms_of_service)
return client_v1.agree_to_tos(regr)
return regr
regr_res = client_v1.register(regr)
if regr_res.terms_of_service is not None:
_assess_tos(regr_res.terms_of_service)
return client_v1.agree_to_tos(regr_res)
return regr_res
else:
client_v2 = cast(ClientV2, self.client)
if "terms_of_service" in client_v2.directory.meta:
_assess_tos(client_v2.directory.meta.terms_of_service)
regr = regr.update(terms_of_service_agreed=True)
regr = cast(messages.NewRegistration, regr.update(terms_of_service_agreed=True))
return client_v2.new_account(regr)
def new_order(self, csr_pem: bytes) -> messages.OrderResource:
@ -970,10 +973,11 @@ class BackwardsCompatibleClientV2:
'certificate, please rerun the command for a new one.')
cert = OpenSSL.crypto.dump_certificate(
OpenSSL.crypto.FILETYPE_PEM, certr.body.wrapped).decode()
OpenSSL.crypto.FILETYPE_PEM,
cast(OpenSSL.crypto.X509, cast(jose.ComparableX509, certr.body).wrapped)).decode()
chain_str = crypto_util.dump_pyopenssl_chain(chain).decode()
return orderr.update(fullchain_pem=(cert + chain_str))
return cast(messages.OrderResource, orderr.update(fullchain_pem=(cert + chain_str)))
return cast(ClientV2, self.client).finalize_order(
orderr, deadline, fetch_alternative_chains)
@ -1056,7 +1060,7 @@ class ClientNetwork:
pass
def _wrap_in_jws(self, obj: jose.JSONDeSerializable, nonce: str, url: str,
acme_version: int) -> jose.JWS:
acme_version: int) -> str:
"""Wrap `JSONDeSerializable` object in JWS.
.. todo:: Implement ``acmePath``.
@ -1064,7 +1068,7 @@ class ClientNetwork:
:param josepy.JSONDeSerializable obj:
:param str url: The URL to which this object will be POSTed
:param str nonce:
:rtype: `josepy.JWS`
:rtype: str
"""
if isinstance(obj, VersionedLEACMEMixin):
@ -1082,7 +1086,7 @@ class ClientNetwork:
if self.account is not None:
kwargs["kid"] = self.account["uri"]
kwargs["key"] = self.key
return jws.JWS.sign(jobj, **kwargs).json_dumps(indent=2)
return jws.JWS.sign(jobj, **cast(Mapping[str, Any], kwargs)).json_dumps(indent=2)
@classmethod
def _check_response(cls, response: requests.Response,
@ -1125,7 +1129,7 @@ class ClientNetwork:
'Ignoring wrong Content-Type (%r) for JSON Error',
response_ct)
try:
raise messages.Error.from_json(jobj)
raise cast(messages.Error, messages.Error.from_json(jobj))
except jose.DeserializationError as error:
# Couldn't deserialize JSON object
raise errors.ClientError((response, error))

View file

@ -410,7 +410,8 @@ def gen_ss_cert(key: crypto.PKey, domains: Optional[List[str]] = None,
return cert
def dump_pyopenssl_chain(chain: List[crypto.X509], filetype: int = crypto.FILETYPE_PEM) -> bytes:
def dump_pyopenssl_chain(chain: Union[List[jose.ComparableX509], List[crypto.X509]],
filetype: int = crypto.FILETYPE_PEM) -> bytes:
"""Dump certificate chain into a bundle.
:param list chain: List of `OpenSSL.crypto.X509` (or wrapped in
@ -425,6 +426,8 @@ def dump_pyopenssl_chain(chain: List[crypto.X509], filetype: int = crypto.FILETY
def _dump_cert(cert: Union[jose.ComparableX509, crypto.X509]) -> bytes:
if isinstance(cert, jose.ComparableX509):
if isinstance(cert.wrapped, crypto.X509Req):
raise errors.Error("Unexpected certificate signing request provided.")
cert = cert.wrapped
return crypto.dump_certificate(filetype, cert)

View file

@ -1,6 +1,7 @@
"""ACME protocol messages."""
from collections.abc import Hashable
import json
from typing import cast
from typing import Any
from typing import Dict
from typing import Iterator
@ -10,6 +11,7 @@ from typing import MutableMapping
from typing import Tuple
from typing import Type
from typing import Optional
from typing import Union
import josepy as jose
@ -98,7 +100,7 @@ class Error(jose.JSONObjectWithFields, errors.Error):
typ = ERROR_PREFIX + code
# Mypy will not understand that the Error constructor accepts a named argument
# "typ" because of josepy magic. Let's ignore the type check here.
return cls(typ=typ, **kwargs) # type: ignore
return cls(typ=typ, **kwargs)
@property
def description(self) -> Optional[str]:
@ -164,7 +166,7 @@ class _Constant(jose.JSONDeSerializable, Hashable):
class Status(_Constant):
"""ACME "status" field."""
POSSIBLE_NAMES: Dict[str, 'Status'] = {}
POSSIBLE_NAMES: Dict[str, _Constant] = {}
STATUS_UNKNOWN = Status('unknown')
@ -179,7 +181,7 @@ STATUS_DEACTIVATED = Status('deactivated')
class IdentifierType(_Constant):
"""ACME identifier type."""
POSSIBLE_NAMES: Dict[str, 'IdentifierType'] = {}
POSSIBLE_NAMES: Dict[str, _Constant] = {}
IDENTIFIER_FQDN = IdentifierType('dns') # IdentifierDNS in Boulder
@ -200,15 +202,15 @@ class Identifier(jose.JSONObjectWithFields):
class Directory(jose.JSONDeSerializable):
"""Directory."""
_REGISTERED_TYPES: Dict[str, Type['Directory']] = {}
_REGISTERED_TYPES: Dict[str, Type[jose.JSONObjectWithFields]] = {}
class Meta(jose.JSONObjectWithFields):
"""Directory Meta."""
_terms_of_service = jose.Field('terms-of-service', omitempty=True)
_terms_of_service_v2 = jose.Field('termsOfService', omitempty=True)
website = jose.Field('website', omitempty=True)
caa_identities = jose.Field('caaIdentities', omitempty=True)
external_account_required = jose.Field('externalAccountRequired', omitempty=True)
_terms_of_service: str = jose.field('terms-of-service', omitempty=True)
_terms_of_service_v2: str = jose.field('termsOfService', omitempty=True)
website: str = jose.field('website', omitempty=True)
caa_identities: List[str] = jose.field('caaIdentities', omitempty=True)
external_account_required: bool = jose.field('externalAccountRequired', omitempty=True)
def __init__(self, **kwargs: Any) -> None:
kwargs = {self._internal_name(k): v for k, v in kwargs.items()}
@ -229,13 +231,17 @@ class Directory(jose.JSONDeSerializable):
return '_' + name if name == 'terms_of_service' else name
@classmethod
def _canon_key(cls, key: str) -> str:
return getattr(key, 'resource_type', key)
def _canon_key(cls, key: Union[jose.JSONObjectWithFields, str]) -> str:
return key if isinstance(key, str) else getattr(key, 'resource_type')
@classmethod
def register(cls, resource_body_cls: Type['Directory']) -> Type['Directory']:
def register(cls, resource_body_cls: Type[jose.JSONObjectWithFields]
) -> Type[jose.JSONObjectWithFields]:
"""Register resource."""
resource_type = resource_body_cls.resource_type
resource_type = getattr(resource_body_cls, 'resource_type')
if not resource_type:
raise errors.Error(f'Error, current resource {resource_body_cls} '
f'do not declare a resource_type field.')
assert resource_type not in cls._REGISTERED_TYPES
cls._REGISTERED_TYPES[resource_type] = resource_body_cls
return resource_body_cls
@ -252,7 +258,7 @@ class Directory(jose.JSONDeSerializable):
except KeyError as error:
raise AttributeError(str(error))
def __getitem__(self, name: str) -> Any:
def __getitem__(self, name: Union[str, jose.JSONObjectWithFields]) -> Any:
try:
return self._jobj[self._canon_key(name)]
except KeyError:
@ -298,7 +304,8 @@ class ExternalAccountBinding:
"""Create External Account Binding Resource from contact details, kid and hmac."""
key_json = json.dumps(account_public_key.to_partial_json()).encode()
decoded_hmac_key = jose.b64.b64decode(hmac_key)
# Fix type with TO_DEFINE
decoded_hmac_key = jose.b64.b64decode(hmac_key) # type: ignore
url = directory["newAccount"]
eab = jws.JWS.sign(key_json, jose.jwk.JWKOct(key=decoded_hmac_key),
@ -313,7 +320,7 @@ class Registration(ResourceBody):
:ivar jose.JWK key: Public key.
:ivar tuple contact: Contact information following ACME spec,
`tuple` of `unicode`.
`tuple` of `str`.
:ivar str agreement:
"""
@ -432,8 +439,8 @@ class RegistrationResource(ResourceWithURI):
"""Registration Resource.
:ivar acme.messages.Registration body:
:ivar unicode new_authzr_uri: Deprecated. Do not use.
:ivar unicode terms_of_service: URL for the CA TOS.
:ivar str new_authzr_uri: Deprecated. Do not use.
:ivar str terms_of_service: URL for the CA TOS.
"""
body: Registration = jose.field('body', decoder=Registration.from_json)
@ -549,11 +556,11 @@ class Authorization(ResourceBody):
# Mypy does not understand the josepy magic happening here, and falsely claims
# that challenge is redefined. Let's ignore the type check here.
@challenges.decoder # type: ignore
def challenges(value: List[Mapping[str, Any]]) -> Tuple[ChallengeBody, ...]: # pylint: disable=no-self-argument,missing-function-docstring
return tuple(ChallengeBody.from_json(chall) for chall in value)
def challenges(value: List[Mapping[str, Any]]) -> Tuple[ChallengeBody, ...]: # type: ignore[misc] # pylint: disable=no-self-argument,missing-function-docstring
return tuple(cast(ChallengeBody, ChallengeBody.from_json(chall)) for chall in value)
@property
def resolved_combinations(self) -> Tuple[Tuple[Dict[str, Any], ...], ...]:
def resolved_combinations(self) -> Tuple[Tuple[ChallengeBody, ...], ...]:
"""Combinations with challenges instead of indices."""
return tuple(tuple(self.challenges[idx] for idx in combo)
for combo in self.combinations) # pylint: disable=not-an-iterable
@ -621,7 +628,7 @@ class Revocation(ResourceMixin, jose.JSONObjectWithFields):
resource = fields.Resource(resource_type)
certificate: jose.ComparableX509 = jose.field(
'certificate', decoder=jose.decode_cert, encoder=jose.encode_cert)
reason = jose.Field('reason')
reason: str = jose.field('reason')
class Order(ResourceBody):
@ -649,29 +656,31 @@ class Order(ResourceBody):
# Mypy does not understand the josepy magic happening here, and falsely claims
# that identifiers is redefined. Let's ignore the type check here.
@identifiers.decoder # type: ignore
def identifiers(value: List[Mapping[str, Any]]) -> Tuple[Identifier, ...]: # pylint: disable=no-self-argument,missing-function-docstring
return tuple(Identifier.from_json(identifier) for identifier in value)
def identifiers(value: List[Mapping[str, Any]]) -> Tuple[Identifier, ...]: # type: ignore[misc] # pylint: disable=no-self-argument,missing-function-docstring
return tuple(cast(Identifier, Identifier.from_json(identifier)) for identifier in value)
class OrderResource(ResourceWithURI):
"""Order Resource.
:ivar acme.messages.Order body:
:ivar str csr_pem: The CSR this Order will be finalized with.
:ivar bytes csr_pem: The CSR this Order will be finalized with.
:ivar authorizations: Fully-fetched AuthorizationResource objects.
:vartype authorizations: `list` of `acme.messages.AuthorizationResource`
:ivar str fullchain_pem: The fetched contents of the certificate URL
:ivar bytes fullchain_pem: The fetched contents of the certificate URL
produced once the order was finalized, if it's present.
:ivar alternative_fullchains_pem: The fetched contents of alternative certificate
chain URLs produced once the order was finalized, if present and requested during
finalization.
:vartype alternative_fullchains_pem: `list` of `str`
:vartype alternative_fullchains_pem: `list` of `bytes`
"""
body: Order = jose.field('body', decoder=Order.from_json)
csr_pem: str = jose.field('csr_pem', omitempty=True)
csr_pem: bytes = jose.field('csr_pem', omitempty=True)
authorizations: List[AuthorizationResource] = jose.field('authorizations')
fullchain_pem: str = jose.field('fullchain_pem', omitempty=True)
alternative_fullchains_pem: List[str] = jose.field('alternative_fullchains_pem', omitempty=True)
fullchain_pem: bytes = jose.field('fullchain_pem', omitempty=True)
alternative_fullchains_pem: List[bytes] = jose.field('alternative_fullchains_pem',
omitempty=True)
@Directory.register
class NewOrder(Order):

View file

@ -50,8 +50,8 @@ class Account:
"""
creation_dt = acme_fields.RFC3339Field("creation_dt")
creation_host = jose.Field("creation_host")
register_to_eff = jose.Field("register_to_eff", omitempty=True)
creation_host: str = jose.field("creation_host")
register_to_eff: str = jose.field("register_to_eff", omitempty=True)
def __init__(self, regr, key, meta=None):
self.key = key