mirror of
https://github.com/certbot/certbot.git
synced 2026-06-08 16:22:18 -04:00
Add generic methods to save some casts, and fix lint
This commit is contained in:
parent
2cc4ff200a
commit
c678652b3c
3 changed files with 53 additions and 25 deletions
|
|
@ -12,6 +12,8 @@ from typing import Mapping
|
|||
from typing import Optional
|
||||
from typing import Tuple
|
||||
from typing import Type
|
||||
from typing import TypeVar
|
||||
from typing import Union
|
||||
|
||||
from cryptography.hazmat.primitives import hashes
|
||||
import josepy as jose
|
||||
|
|
@ -27,14 +29,26 @@ from acme.mixins import TypeMixin
|
|||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
T = TypeVar('T', bound='_JSONObjectWithFields')
|
||||
R = TypeVar('R', bound='Challenge')
|
||||
|
||||
class Challenge(jose.TypedJSONObjectWithFields):
|
||||
|
||||
# TODO: Remove this class once JSONObjectWithFields in josepy becomes generic.
|
||||
class _JSONObjectWithFields(jose.JSONObjectWithFields):
|
||||
"""Generic version of jose.JSONObjectWithFields"""
|
||||
|
||||
@classmethod
|
||||
def from_json(cls: Type[T], jobj: Mapping[str, Any]) -> T:
|
||||
return cast(T, super().from_json(jobj))
|
||||
|
||||
|
||||
class Challenge(_JSONObjectWithFields):
|
||||
# _fields_to_partial_json
|
||||
"""ACME challenge."""
|
||||
TYPES: Dict[str, Type['Challenge']] = {}
|
||||
|
||||
@classmethod
|
||||
def from_json(cls, jobj: Mapping[str, Any]) -> jose.TypedJSONObjectWithFields:
|
||||
def from_json(cls: Type[R], jobj: Mapping[str, Any]) -> Union[R, 'UnrecognizedChallenge']:
|
||||
try:
|
||||
return super().from_json(jobj)
|
||||
except jose.UnrecognizedTypeError as error:
|
||||
|
|
@ -42,7 +56,7 @@ class Challenge(jose.TypedJSONObjectWithFields):
|
|||
return UnrecognizedChallenge.from_json(jobj)
|
||||
|
||||
|
||||
class ChallengeResponse(ResourceMixin, TypeMixin, jose.TypedJSONObjectWithFields):
|
||||
class ChallengeResponse(ResourceMixin, TypeMixin, _JSONObjectWithFields):
|
||||
# _fields_to_partial_json
|
||||
"""ACME challenge response."""
|
||||
TYPES: Dict[str, Type['ChallengeResponse']] = {}
|
||||
|
|
|
|||
|
|
@ -326,8 +326,7 @@ class Client(ClientBase):
|
|||
|
||||
"""
|
||||
return self.update_registration(
|
||||
cast(messages.RegistrationResource, regr.update(
|
||||
body=regr.body.update(agreement=regr.terms_of_service))))
|
||||
regr.update(body=regr.body.update(agreement=regr.terms_of_service)))
|
||||
|
||||
def request_challenges(self, identifier: messages.Identifier,
|
||||
new_authzr_uri: Optional[str] = None) -> messages.AuthorizationResource:
|
||||
|
|
@ -544,7 +543,7 @@ class Client(ClientBase):
|
|||
raise errors.ClientError('Location header missing')
|
||||
if response.headers['Location'] != certr.uri:
|
||||
raise errors.UnexpectedUpdate(response.text)
|
||||
return cast(messages.CertificateResource, certr.update(body=cert))
|
||||
return certr.update(body=cert)
|
||||
|
||||
def refresh(self, certr: messages.CertificateResource) -> messages.CertificateResource:
|
||||
"""Refresh certificate.
|
||||
|
|
@ -677,7 +676,7 @@ class ClientV2(ClientBase):
|
|||
only_existing_reg = regr.body.update(only_return_existing=True)
|
||||
response = self._post(self.directory['newAccount'], only_existing_reg)
|
||||
updated_uri = response.headers['Location']
|
||||
new_regr = cast(messages.RegistrationResource, regr.update(uri=updated_uri))
|
||||
new_regr = regr.update(uri=updated_uri)
|
||||
self.net.account = new_regr
|
||||
return new_regr
|
||||
|
||||
|
|
@ -703,12 +702,12 @@ class ClientV2(ClientBase):
|
|||
value=ips))
|
||||
order = messages.NewOrder(identifiers=identifiers)
|
||||
response = self._post(self.directory['newOrder'], order)
|
||||
body = cast(messages.Order, messages.Order.from_json(response.json()))
|
||||
body = messages.Order.from_json(response.json())
|
||||
authorizations = []
|
||||
# pylint has trouble understanding our josepy based objects which use
|
||||
# things like custom metaclass logic. body.authorizations should be a
|
||||
# list of strings containing URLs so let's disable this check here.
|
||||
for url in body.authorizations: # pylint: disable=not-an-iterable
|
||||
for url in body.authorizations: # pylint: disable=not-an-iterable,no-member
|
||||
authorizations.append(self._authzr_from_response(self._post_as_get(url), uri=url))
|
||||
return messages.OrderResource(
|
||||
body=body,
|
||||
|
|
@ -775,7 +774,7 @@ class ClientV2(ClientBase):
|
|||
failed.append(authzr)
|
||||
if failed:
|
||||
raise errors.ValidationError(failed)
|
||||
return cast(messages.OrderResource, orderr.update(authorizations=responses))
|
||||
return orderr.update(authorizations=responses)
|
||||
|
||||
def finalize_order(self, orderr: messages.OrderResource, deadline: datetime.datetime,
|
||||
fetch_alternative_chains: bool = False) -> messages.OrderResource:
|
||||
|
|
@ -797,11 +796,11 @@ class ClientV2(ClientBase):
|
|||
while datetime.datetime.now() < deadline:
|
||||
time.sleep(1)
|
||||
response = self._post_as_get(orderr.uri)
|
||||
body = cast(messages.Order, messages.Order.from_json(response.json()))
|
||||
if body.error is not None:
|
||||
raise errors.IssuanceError(body.error)
|
||||
if body.certificate is not None:
|
||||
certificate_response = self._post_as_get(body.certificate)
|
||||
body = messages.Order.from_json(response.json())
|
||||
if body.error is not None: # pylint: disable=no-member
|
||||
raise errors.IssuanceError(body.error) # pylint: disable=no-member
|
||||
if body.certificate is not None: # pylint: disable=no-member
|
||||
certificate_response = self._post_as_get(body.certificate) # pylint: disable=no-member
|
||||
orderr = orderr.update(body=body, fullchain_pem=certificate_response.text)
|
||||
if fetch_alternative_chains:
|
||||
alt_chains_urls = self._get_links(certificate_response, 'alternate')
|
||||
|
|
@ -909,7 +908,7 @@ class BackwardsCompatibleClientV2:
|
|||
client_v2 = cast(ClientV2, self.client)
|
||||
if "terms_of_service" in client_v2.directory.meta:
|
||||
_assess_tos(client_v2.directory.meta.terms_of_service)
|
||||
regr = cast(messages.NewRegistration, regr.update(terms_of_service_agreed=True))
|
||||
regr = regr.update(terms_of_service_agreed=True)
|
||||
return client_v2.new_account(regr)
|
||||
|
||||
def new_order(self, csr_pem: bytes) -> messages.OrderResource:
|
||||
|
|
@ -977,7 +976,7 @@ class BackwardsCompatibleClientV2:
|
|||
cast(OpenSSL.crypto.X509, cast(jose.ComparableX509, certr.body).wrapped)).decode()
|
||||
chain_str = crypto_util.dump_pyopenssl_chain(chain).decode()
|
||||
|
||||
return cast(messages.OrderResource, orderr.update(fullchain_pem=(cert + chain_str)))
|
||||
return orderr.update(fullchain_pem=(cert + chain_str))
|
||||
return cast(ClientV2, self.client).finalize_order(
|
||||
orderr, deadline, fetch_alternative_chains)
|
||||
|
||||
|
|
@ -1129,7 +1128,7 @@ class ClientNetwork:
|
|||
'Ignoring wrong Content-Type (%r) for JSON Error',
|
||||
response_ct)
|
||||
try:
|
||||
raise cast(messages.Error, messages.Error.from_json(jobj))
|
||||
raise messages.Error.from_json(jobj) # pylint: disable=raising-non-exception
|
||||
except jose.DeserializationError as error:
|
||||
# Couldn't deserialize JSON object
|
||||
raise errors.ClientError((response, error))
|
||||
|
|
|
|||
|
|
@ -8,9 +8,10 @@ from typing import Iterator
|
|||
from typing import List
|
||||
from typing import Mapping
|
||||
from typing import MutableMapping
|
||||
from typing import Optional
|
||||
from typing import Tuple
|
||||
from typing import Type
|
||||
from typing import Optional
|
||||
from typing import TypeVar
|
||||
from typing import Union
|
||||
|
||||
import josepy as jose
|
||||
|
|
@ -64,6 +65,20 @@ ERROR_TYPE_DESCRIPTIONS = dict(
|
|||
ERROR_TYPE_DESCRIPTIONS.update(dict( # add errors with old prefix, deprecate me
|
||||
(OLD_ERROR_PREFIX + name, desc) for name, desc in ERROR_CODES.items()))
|
||||
|
||||
T = TypeVar('T', bound='_JSONObjectWithFields')
|
||||
|
||||
|
||||
# TODO: Remove this class once JSONObjectWithFields and ImmutableMap in josepy becomes generic.
|
||||
class _JSONObjectWithFields(jose.JSONObjectWithFields):
|
||||
"""Generic version of jose.JSONObjectWithFields"""
|
||||
|
||||
@classmethod
|
||||
def from_json(cls: Type[T], jobj: Mapping[str, Any]) -> T:
|
||||
return cast(T, super().from_json(jobj))
|
||||
|
||||
def update(self: T, **kwargs: Any) -> T:
|
||||
return cast(T, super().update(**kwargs))
|
||||
|
||||
|
||||
def is_acme_error(err: BaseException) -> bool:
|
||||
"""Check if argument is an ACME error."""
|
||||
|
|
@ -72,7 +87,7 @@ def is_acme_error(err: BaseException) -> bool:
|
|||
return False
|
||||
|
||||
|
||||
class Error(jose.JSONObjectWithFields, errors.Error):
|
||||
class Error(_JSONObjectWithFields, errors.Error):
|
||||
"""ACME error.
|
||||
|
||||
https://tools.ietf.org/html/draft-ietf-appsawg-http-problem-00
|
||||
|
|
@ -188,7 +203,7 @@ IDENTIFIER_FQDN = IdentifierType('dns') # IdentifierDNS in Boulder
|
|||
IDENTIFIER_IP = IdentifierType('ip') # IdentifierIP in pebble - not in Boulder yet
|
||||
|
||||
|
||||
class Identifier(jose.JSONObjectWithFields):
|
||||
class Identifier(_JSONObjectWithFields):
|
||||
"""ACME identifier.
|
||||
|
||||
:ivar IdentifierType typ:
|
||||
|
|
@ -273,7 +288,7 @@ class Directory(jose.JSONDeSerializable):
|
|||
return cls(jobj)
|
||||
|
||||
|
||||
class Resource(jose.JSONObjectWithFields):
|
||||
class Resource(_JSONObjectWithFields):
|
||||
"""ACME Resource.
|
||||
|
||||
:ivar acme.messages.ResourceBody body: Resource body.
|
||||
|
|
@ -291,7 +306,7 @@ class ResourceWithURI(Resource):
|
|||
uri: str = jose.field('uri') # no ChallengeResource.uri
|
||||
|
||||
|
||||
class ResourceBody(jose.JSONObjectWithFields):
|
||||
class ResourceBody(_JSONObjectWithFields):
|
||||
"""ACME Resource Body."""
|
||||
|
||||
|
||||
|
|
@ -558,7 +573,7 @@ class Authorization(ResourceBody):
|
|||
# that challenge is redefined. Let's ignore the type check here.
|
||||
@challenges.decoder # type: ignore
|
||||
def challenges(value: List[Dict[str, Any]]) -> Tuple[ChallengeBody, ...]: # type: ignore[misc] # pylint: disable=no-self-argument,missing-function-docstring
|
||||
return tuple(cast(ChallengeBody, ChallengeBody.from_json(chall)) for chall in value)
|
||||
return tuple(ChallengeBody.from_json(chall) for chall in value)
|
||||
|
||||
@property
|
||||
def resolved_combinations(self) -> Tuple[Tuple[ChallengeBody, ...], ...]:
|
||||
|
|
@ -658,7 +673,7 @@ class Order(ResourceBody):
|
|||
# that identifiers is redefined. Let's ignore the type check here.
|
||||
@identifiers.decoder # type: ignore
|
||||
def identifiers(value: List[Dict[str, Any]]) -> Tuple[Identifier, ...]: # type: ignore[misc] # pylint: disable=no-self-argument,missing-function-docstring
|
||||
return tuple(cast(Identifier, Identifier.from_json(identifier)) for identifier in value)
|
||||
return tuple(Identifier.from_json(identifier) for identifier in value)
|
||||
|
||||
|
||||
class OrderResource(ResourceWithURI):
|
||||
|
|
|
|||
Loading…
Reference in a new issue