mirror of
https://github.com/certbot/certbot.git
synced 2026-06-04 14:26:10 -04:00
client.py changes for ACMEv2 (#5287)
* Implement ACMEv2 signing of POST bodies. * Add account, and make acme_version explicit. * Remove separate NewAccount. * Rename to add v2. * Add terms_of_service_agreed. * Split out wrap_in_jws_v2 test. * Re-add too-many-public-methods. * Split Client into ClientBase / Client / ClientV2 * Use camelCase for newAccount. * Make acme_version optional parameter on .post(). This allows us to instantiate a ClientNetwork before knowing the version. * Add kid unconditionally.
This commit is contained in:
parent
e085ff06a1
commit
9baf75d6c8
3 changed files with 258 additions and 141 deletions
|
|
@ -39,39 +39,24 @@ DEFAULT_NETWORK_TIMEOUT = 45
|
|||
DER_CONTENT_TYPE = 'application/pkix-cert'
|
||||
|
||||
|
||||
class Client(object): # pylint: disable=too-many-instance-attributes
|
||||
"""ACME client.
|
||||
|
||||
.. todo::
|
||||
Clean up raised error types hierarchy, document, and handle (wrap)
|
||||
instances of `.DeserializationError` raised in `from_json()`.
|
||||
class ClientBase(object): # pylint: disable=too-many-instance-attributes
|
||||
"""ACME client base object.
|
||||
|
||||
:ivar messages.Directory directory:
|
||||
:ivar key: `.JWK` (private)
|
||||
:ivar alg: `.JWASignature`
|
||||
:ivar bool verify_ssl: Verify SSL certificates?
|
||||
:ivar .ClientNetwork net: Client network. Useful for testing. If not
|
||||
supplied, it will be initialized using `key`, `alg` and
|
||||
`verify_ssl`.
|
||||
|
||||
:ivar .ClientNetwork net: Client network.
|
||||
:ivar int acme_version: ACME protocol version. 1 or 2.
|
||||
"""
|
||||
|
||||
def __init__(self, directory, key, alg=jose.RS256, verify_ssl=True,
|
||||
net=None):
|
||||
def __init__(self, directory, net, acme_version):
|
||||
"""Initialize.
|
||||
|
||||
:param directory: Directory Resource (`.messages.Directory`) or
|
||||
URI from which the resource will be downloaded.
|
||||
|
||||
:param .messages.Directory directory: Directory Resource
|
||||
:param .ClientNetwork net: Client network.
|
||||
:param int acme_version: ACME protocol version. 1 or 2.
|
||||
"""
|
||||
self.key = key
|
||||
self.net = ClientNetwork(key, alg, verify_ssl) if net is None else net
|
||||
|
||||
if isinstance(directory, six.string_types):
|
||||
self.directory = messages.Directory.from_json(
|
||||
self.net.get(directory).json())
|
||||
else:
|
||||
self.directory = directory
|
||||
self.directory = directory
|
||||
self.net = net
|
||||
self.acme_version = acme_version
|
||||
|
||||
@classmethod
|
||||
def _regr_from_response(cls, response, uri=None, terms_of_service=None):
|
||||
|
|
@ -83,28 +68,8 @@ class Client(object): # pylint: disable=too-many-instance-attributes
|
|||
uri=response.headers.get('Location', uri),
|
||||
terms_of_service=terms_of_service)
|
||||
|
||||
def register(self, new_reg=None):
|
||||
"""Register.
|
||||
|
||||
:param .NewRegistration new_reg:
|
||||
|
||||
:returns: Registration Resource.
|
||||
:rtype: `.RegistrationResource`
|
||||
|
||||
"""
|
||||
new_reg = messages.NewRegistration() if new_reg is None else new_reg
|
||||
assert isinstance(new_reg, messages.NewRegistration)
|
||||
|
||||
response = self.net.post(self.directory[new_reg], new_reg)
|
||||
# TODO: handle errors
|
||||
assert response.status_code == http_client.CREATED
|
||||
|
||||
# "Instance of 'Field' has no key/contact member" bug:
|
||||
# pylint: disable=no-member
|
||||
return self._regr_from_response(response)
|
||||
|
||||
def _send_recv_regr(self, regr, body):
|
||||
response = self.net.post(regr.uri, body)
|
||||
response = self.net.post(regr.uri, body, acme_version=self.acme_version)
|
||||
|
||||
# TODO: Boulder returns httplib.ACCEPTED
|
||||
#assert response.status_code == httplib.OK
|
||||
|
|
@ -153,21 +118,6 @@ class Client(object): # pylint: disable=too-many-instance-attributes
|
|||
"""
|
||||
return self._send_recv_regr(regr, messages.UpdateRegistration())
|
||||
|
||||
def agree_to_tos(self, regr):
|
||||
"""Agree to the terms-of-service.
|
||||
|
||||
Agree to the terms-of-service in a Registration Resource.
|
||||
|
||||
:param regr: Registration Resource.
|
||||
:type regr: `.RegistrationResource`
|
||||
|
||||
:returns: Updated Registration Resource.
|
||||
:rtype: `.RegistrationResource`
|
||||
|
||||
"""
|
||||
return self.update_registration(
|
||||
regr.update(body=regr.body.update(agreement=regr.terms_of_service)))
|
||||
|
||||
def _authzr_from_response(self, response, identifier, uri=None):
|
||||
authzr = messages.AuthorizationResource(
|
||||
body=messages.Authorization.from_json(response.json()),
|
||||
|
|
@ -176,42 +126,6 @@ class Client(object): # pylint: disable=too-many-instance-attributes
|
|||
raise errors.UnexpectedUpdate(authzr)
|
||||
return authzr
|
||||
|
||||
def request_challenges(self, identifier, new_authzr_uri=None):
|
||||
"""Request challenges.
|
||||
|
||||
:param .messages.Identifier identifier: Identifier to be challenged.
|
||||
:param str new_authzr_uri: Deprecated. Do not use.
|
||||
|
||||
:returns: Authorization Resource.
|
||||
:rtype: `.AuthorizationResource`
|
||||
|
||||
"""
|
||||
if new_authzr_uri is not None:
|
||||
logger.debug("request_challenges with new_authzr_uri deprecated.")
|
||||
new_authz = messages.NewAuthorization(identifier=identifier)
|
||||
response = self.net.post(self.directory.new_authz, new_authz)
|
||||
# TODO: handle errors
|
||||
assert response.status_code == http_client.CREATED
|
||||
return self._authzr_from_response(response, identifier)
|
||||
|
||||
def request_domain_challenges(self, domain, new_authzr_uri=None):
|
||||
"""Request challenges for domain names.
|
||||
|
||||
This is simply a convenience function that wraps around
|
||||
`request_challenges`, but works with domain names instead of
|
||||
generic identifiers. See ``request_challenges`` for more
|
||||
documentation.
|
||||
|
||||
:param str domain: Domain name to be challenged.
|
||||
:param str new_authzr_uri: Deprecated. Do not use.
|
||||
|
||||
:returns: Authorization Resource.
|
||||
:rtype: `.AuthorizationResource`
|
||||
|
||||
"""
|
||||
return self.request_challenges(messages.Identifier(
|
||||
typ=messages.IDENTIFIER_FQDN, value=domain), new_authzr_uri)
|
||||
|
||||
def answer_challenge(self, challb, response):
|
||||
"""Answer challenge.
|
||||
|
||||
|
|
@ -227,7 +141,8 @@ class Client(object): # pylint: disable=too-many-instance-attributes
|
|||
:raises .UnexpectedUpdate:
|
||||
|
||||
"""
|
||||
response = self.net.post(challb.uri, response)
|
||||
response = self.net.post(challb.uri, response,
|
||||
acme_version=self.acme_version)
|
||||
try:
|
||||
authzr_uri = response.links['up']['url']
|
||||
except KeyError:
|
||||
|
|
@ -288,6 +203,133 @@ class Client(object): # pylint: disable=too-many-instance-attributes
|
|||
response, authzr.body.identifier, authzr.uri)
|
||||
return updated_authzr, response
|
||||
|
||||
def revoke(self, cert, rsn):
|
||||
"""Revoke certificate.
|
||||
|
||||
:param .ComparableX509 cert: `OpenSSL.crypto.X509` wrapped in
|
||||
`.ComparableX509`
|
||||
|
||||
:param int rsn: Reason code for certificate revocation.
|
||||
|
||||
:raises .ClientError: If revocation is unsuccessful.
|
||||
|
||||
"""
|
||||
response = self.net.post(self.directory[messages.Revocation],
|
||||
messages.Revocation(
|
||||
certificate=cert,
|
||||
reason=rsn),
|
||||
content_type=None,
|
||||
acme_version=self.acme_version)
|
||||
if response.status_code != http_client.OK:
|
||||
raise errors.ClientError(
|
||||
'Successful revocation must return HTTP OK status')
|
||||
|
||||
class Client(ClientBase):
|
||||
"""ACME client for a v1 API.
|
||||
|
||||
.. todo::
|
||||
Clean up raised error types hierarchy, document, and handle (wrap)
|
||||
instances of `.DeserializationError` raised in `from_json()`.
|
||||
|
||||
:ivar messages.Directory directory:
|
||||
:ivar key: `.JWK` (private)
|
||||
:ivar alg: `.JWASignature`
|
||||
:ivar bool verify_ssl: Verify SSL certificates?
|
||||
:ivar .ClientNetwork net: Client network. Useful for testing. If not
|
||||
supplied, it will be initialized using `key`, `alg` and
|
||||
`verify_ssl`.
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self, directory, key, alg=jose.RS256, verify_ssl=True,
|
||||
net=None):
|
||||
"""Initialize.
|
||||
|
||||
:param directory: Directory Resource (`.messages.Directory`) or
|
||||
URI from which the resource will be downloaded.
|
||||
|
||||
"""
|
||||
# pylint: disable=too-many-arguments
|
||||
self.key = key
|
||||
self.net = ClientNetwork(key, alg=alg, verify_ssl=verify_ssl) if net is None else net
|
||||
|
||||
if isinstance(directory, six.string_types):
|
||||
directory = messages.Directory.from_json(
|
||||
self.net.get(directory).json())
|
||||
super(Client, self).__init__(directory=directory,
|
||||
net=net, acme_version=1)
|
||||
|
||||
def register(self, new_reg=None):
|
||||
"""Register.
|
||||
|
||||
:param .NewRegistration new_reg:
|
||||
|
||||
:returns: Registration Resource.
|
||||
:rtype: `.RegistrationResource`
|
||||
|
||||
"""
|
||||
new_reg = messages.NewRegistration() if new_reg is None else new_reg
|
||||
response = self.net.post(self.directory[new_reg], new_reg,
|
||||
acme_version=1)
|
||||
# TODO: handle errors
|
||||
assert response.status_code == http_client.CREATED
|
||||
|
||||
# "Instance of 'Field' has no key/contact member" bug:
|
||||
# pylint: disable=no-member
|
||||
return self._regr_from_response(response)
|
||||
|
||||
def agree_to_tos(self, regr):
|
||||
"""Agree to the terms-of-service.
|
||||
|
||||
Agree to the terms-of-service in a Registration Resource.
|
||||
|
||||
:param regr: Registration Resource.
|
||||
:type regr: `.RegistrationResource`
|
||||
|
||||
:returns: Updated Registration Resource.
|
||||
:rtype: `.RegistrationResource`
|
||||
|
||||
"""
|
||||
return self.update_registration(
|
||||
regr.update(body=regr.body.update(agreement=regr.terms_of_service)))
|
||||
|
||||
def request_challenges(self, identifier, new_authzr_uri=None):
|
||||
"""Request challenges.
|
||||
|
||||
:param .messages.Identifier identifier: Identifier to be challenged.
|
||||
:param str new_authzr_uri: Deprecated. Do not use.
|
||||
|
||||
:returns: Authorization Resource.
|
||||
:rtype: `.AuthorizationResource`
|
||||
|
||||
"""
|
||||
if new_authzr_uri is not None:
|
||||
logger.debug("request_challenges with new_authzr_uri deprecated.")
|
||||
new_authz = messages.NewAuthorization(identifier=identifier)
|
||||
response = self.net.post(self.directory.new_authz, new_authz,
|
||||
acme_version=1)
|
||||
# TODO: handle errors
|
||||
assert response.status_code == http_client.CREATED
|
||||
return self._authzr_from_response(response, identifier)
|
||||
|
||||
def request_domain_challenges(self, domain, new_authzr_uri=None):
|
||||
"""Request challenges for domain names.
|
||||
|
||||
This is simply a convenience function that wraps around
|
||||
`request_challenges`, but works with domain names instead of
|
||||
generic identifiers. See ``request_challenges`` for more
|
||||
documentation.
|
||||
|
||||
:param str domain: Domain name to be challenged.
|
||||
:param str new_authzr_uri: Deprecated. Do not use.
|
||||
|
||||
:returns: Authorization Resource.
|
||||
:rtype: `.AuthorizationResource`
|
||||
|
||||
"""
|
||||
return self.request_challenges(messages.Identifier(
|
||||
typ=messages.IDENTIFIER_FQDN, value=domain), new_authzr_uri)
|
||||
|
||||
def request_issuance(self, csr, authzrs):
|
||||
"""Request issuance.
|
||||
|
||||
|
|
@ -311,7 +353,8 @@ class Client(object): # pylint: disable=too-many-instance-attributes
|
|||
self.directory.new_cert,
|
||||
req,
|
||||
content_type=content_type,
|
||||
headers={'Accept': content_type})
|
||||
headers={'Accept': content_type},
|
||||
acme_version=1)
|
||||
|
||||
cert_chain_uri = response.links.get('up', {}).get('url')
|
||||
|
||||
|
|
@ -481,37 +524,65 @@ class Client(object): # pylint: disable=too-many-instance-attributes
|
|||
"Recursion limit reached. Didn't get {0}".format(uri))
|
||||
return chain
|
||||
|
||||
def revoke(self, cert, rsn):
|
||||
"""Revoke certificate.
|
||||
|
||||
:param .ComparableX509 cert: `OpenSSL.crypto.X509` wrapped in
|
||||
`.ComparableX509`
|
||||
|
||||
:param int rsn: Reason code for certificate revocation.
|
||||
class ClientV2(ClientBase):
|
||||
"""ACME client for a v2 API.
|
||||
|
||||
:raises .ClientError: If revocation is unsuccessful.
|
||||
:ivar messages.Directory directory:
|
||||
:ivar .ClientNetwork net: Client network.
|
||||
"""
|
||||
|
||||
def __init__(self, directory, net):
|
||||
"""Initialize.
|
||||
|
||||
:param .messages.Directory directory: Directory Resource
|
||||
:param .ClientNetwork net: Client network.
|
||||
"""
|
||||
super(ClientV2, self).__init__(directory=directory,
|
||||
net=net, acme_version=2)
|
||||
|
||||
def new_account(self, new_account):
|
||||
"""Register.
|
||||
|
||||
:param .NewRegistration new_account:
|
||||
|
||||
:returns: Registration Resource.
|
||||
:rtype: `.RegistrationResource`
|
||||
|
||||
"""
|
||||
response = self.net.post(self.directory[messages.Revocation],
|
||||
messages.Revocation(
|
||||
certificate=cert,
|
||||
reason=rsn),
|
||||
content_type=None)
|
||||
if response.status_code != http_client.OK:
|
||||
raise errors.ClientError(
|
||||
'Successful revocation must return HTTP OK status')
|
||||
response = self.net.post(self.directory['newAccount'], new_account,
|
||||
acme_version=2)
|
||||
# "Instance of 'Field' has no key/contact member" bug:
|
||||
# pylint: disable=no-member
|
||||
return self._regr_from_response(response)
|
||||
|
||||
|
||||
class ClientNetwork(object): # pylint: disable=too-many-instance-attributes
|
||||
"""Client network."""
|
||||
"""Wrapper around requests that signs POSTs for authentication.
|
||||
|
||||
Also adds user agent, and handles Content-Type.
|
||||
"""
|
||||
JSON_CONTENT_TYPE = 'application/json'
|
||||
JOSE_CONTENT_TYPE = 'application/jose+json'
|
||||
JSON_ERROR_CONTENT_TYPE = 'application/problem+json'
|
||||
REPLAY_NONCE_HEADER = 'Replay-Nonce'
|
||||
|
||||
def __init__(self, key, alg=jose.RS256, verify_ssl=True,
|
||||
"""Initialize.
|
||||
|
||||
:param key: Account private key
|
||||
:param messages.Registration account: Account object. Required if you are
|
||||
planning to use .post() with acme_version=2.
|
||||
:param josepy.JWASignature alg: Algoritm to use in signing JWS.
|
||||
:param bool verify_ssl: Whether to verify certificates on SSL connections.
|
||||
:param str user_agent: String to send as User-Agent header.
|
||||
:param float timeout: Timeout for requests.
|
||||
"""
|
||||
def __init__(self, key, account=None, alg=jose.RS256, verify_ssl=True,
|
||||
user_agent='acme-python', timeout=DEFAULT_NETWORK_TIMEOUT):
|
||||
# pylint: disable=too-many-arguments
|
||||
self.key = key
|
||||
self.account = account
|
||||
self.alg = alg
|
||||
self.verify_ssl = verify_ssl
|
||||
self._nonces = set()
|
||||
|
|
@ -527,21 +598,29 @@ class ClientNetwork(object): # pylint: disable=too-many-instance-attributes
|
|||
except Exception: # pylint: disable=broad-except
|
||||
pass
|
||||
|
||||
def _wrap_in_jws(self, obj, nonce):
|
||||
def _wrap_in_jws(self, obj, nonce, url, acme_version):
|
||||
"""Wrap `JSONDeSerializable` object in JWS.
|
||||
|
||||
.. todo:: Implement ``acmePath``.
|
||||
|
||||
:param .JSONDeSerializable obj:
|
||||
:param str url: The URL to which this object will be POSTed
|
||||
:param bytes nonce:
|
||||
:rtype: `.JWS`
|
||||
|
||||
"""
|
||||
jobj = obj.json_dumps(indent=2).encode()
|
||||
logger.debug('JWS payload:\n%s', jobj)
|
||||
return jws.JWS.sign(
|
||||
payload=jobj, key=self.key, alg=self.alg,
|
||||
nonce=nonce).json_dumps(indent=2)
|
||||
kwargs = {
|
||||
"alg": self.alg,
|
||||
"nonce": nonce
|
||||
}
|
||||
if acme_version == 2:
|
||||
kwargs["url"] = url
|
||||
kwargs["kid"] = self.account["uri"]
|
||||
kwargs["key"] = self.key
|
||||
# pylint: disable=star-args
|
||||
return jws.JWS.sign(jobj, **kwargs).json_dumps(indent=2)
|
||||
|
||||
@classmethod
|
||||
def _check_response(cls, response, content_type=None):
|
||||
|
|
@ -714,8 +793,9 @@ class ClientNetwork(object): # pylint: disable=too-many-instance-attributes
|
|||
else:
|
||||
raise
|
||||
|
||||
def _post_once(self, url, obj, content_type=JOSE_CONTENT_TYPE, **kwargs):
|
||||
data = self._wrap_in_jws(obj, self._get_nonce(url))
|
||||
def _post_once(self, url, obj, content_type=JOSE_CONTENT_TYPE,
|
||||
acme_version=1, **kwargs):
|
||||
data = self._wrap_in_jws(obj, self._get_nonce(url), url, acme_version)
|
||||
kwargs.setdefault('headers', {'Content-Type': content_type})
|
||||
response = self._send_request('POST', url, data=data, **kwargs)
|
||||
self._add_nonce(response)
|
||||
|
|
|
|||
|
|
@ -104,6 +104,23 @@ class ClientTest(unittest.TestCase):
|
|||
self.assertEqual(self.regr, self.client.register(self.new_reg))
|
||||
# TODO: test POST call arguments
|
||||
|
||||
def test_new_account_v2(self):
|
||||
directory = messages.Directory({
|
||||
"newAccount": 'https://www.letsencrypt-demo.org/acme/new-account',
|
||||
})
|
||||
from acme.client import ClientV2
|
||||
client = ClientV2(directory, self.net)
|
||||
self.response.status_code = http_client.CREATED
|
||||
self.response.json.return_value = self.regr.body.to_json()
|
||||
self.response.headers['Location'] = self.regr.uri
|
||||
|
||||
self.regr = messages.RegistrationResource(
|
||||
body=messages.Registration(
|
||||
contact=self.contact, key=KEY.public_key()),
|
||||
uri='https://www.letsencrypt-demo.org/acme/reg/1')
|
||||
|
||||
self.assertEqual(self.regr, client.new_account(self.regr))
|
||||
|
||||
def test_update_registration(self):
|
||||
# "Instance of 'Field' has no to_json/update member" bug:
|
||||
# pylint: disable=no-member
|
||||
|
|
@ -142,20 +159,23 @@ class ClientTest(unittest.TestCase):
|
|||
self.client.request_challenges(self.identifier)
|
||||
self.net.post.assert_called_once_with(
|
||||
self.directory.new_authz,
|
||||
messages.NewAuthorization(identifier=self.identifier))
|
||||
messages.NewAuthorization(identifier=self.identifier),
|
||||
acme_version=1)
|
||||
|
||||
def test_request_challenges_deprecated_arg(self):
|
||||
self._prepare_response_for_request_challenges()
|
||||
self.client.request_challenges(self.identifier, new_authzr_uri="hi")
|
||||
self.net.post.assert_called_once_with(
|
||||
self.directory.new_authz,
|
||||
messages.NewAuthorization(identifier=self.identifier))
|
||||
messages.NewAuthorization(identifier=self.identifier),
|
||||
acme_version=1)
|
||||
|
||||
def test_request_challenges_custom_uri(self):
|
||||
self._prepare_response_for_request_challenges()
|
||||
self.client.request_challenges(self.identifier)
|
||||
self.net.post.assert_called_once_with(
|
||||
'https://www.letsencrypt-demo.org/acme/new-authz', mock.ANY)
|
||||
'https://www.letsencrypt-demo.org/acme/new-authz', mock.ANY,
|
||||
acme_version=1)
|
||||
|
||||
def test_request_challenges_unexpected_update(self):
|
||||
self._prepare_response_for_request_challenges()
|
||||
|
|
@ -417,7 +437,8 @@ class ClientTest(unittest.TestCase):
|
|||
def test_revoke(self):
|
||||
self.client.revoke(self.certr.body, self.rsn)
|
||||
self.net.post.assert_called_once_with(
|
||||
self.directory[messages.Revocation], mock.ANY, content_type=None)
|
||||
self.directory[messages.Revocation], mock.ANY, content_type=None,
|
||||
acme_version=1)
|
||||
|
||||
def test_revocation_payload(self):
|
||||
obj = messages.Revocation(certificate=self.certr.body, reason=self.rsn)
|
||||
|
|
@ -432,9 +453,22 @@ class ClientTest(unittest.TestCase):
|
|||
self.certr,
|
||||
self.rsn)
|
||||
|
||||
class MockJSONDeSerializable(jose.JSONDeSerializable):
|
||||
# pylint: disable=missing-docstring
|
||||
def __init__(self, value):
|
||||
self.value = value
|
||||
|
||||
def to_partial_json(self):
|
||||
return {'foo': self.value}
|
||||
|
||||
@classmethod
|
||||
def from_json(cls, value):
|
||||
pass # pragma: no cover
|
||||
|
||||
|
||||
class ClientNetworkTest(unittest.TestCase):
|
||||
"""Tests for acme.client.ClientNetwork."""
|
||||
# pylint: disable=too-many-public-methods
|
||||
|
||||
def setUp(self):
|
||||
self.verify_ssl = mock.MagicMock()
|
||||
|
|
@ -453,25 +487,27 @@ class ClientNetworkTest(unittest.TestCase):
|
|||
self.assertTrue(self.net.verify_ssl is self.verify_ssl)
|
||||
|
||||
def test_wrap_in_jws(self):
|
||||
class MockJSONDeSerializable(jose.JSONDeSerializable):
|
||||
# pylint: disable=missing-docstring
|
||||
def __init__(self, value):
|
||||
self.value = value
|
||||
|
||||
def to_partial_json(self):
|
||||
return {'foo': self.value}
|
||||
|
||||
@classmethod
|
||||
def from_json(cls, value):
|
||||
pass # pragma: no cover
|
||||
|
||||
# pylint: disable=protected-access
|
||||
jws_dump = self.net._wrap_in_jws(
|
||||
MockJSONDeSerializable('foo'), nonce=b'Tg')
|
||||
MockJSONDeSerializable('foo'), nonce=b'Tg', url="url",
|
||||
acme_version=1)
|
||||
jws = acme_jws.JWS.json_loads(jws_dump)
|
||||
self.assertEqual(json.loads(jws.payload.decode()), {'foo': 'foo'})
|
||||
self.assertEqual(jws.signature.combined.nonce, b'Tg')
|
||||
|
||||
def test_wrap_in_jws_v2(self):
|
||||
self.net.account = {'uri': 'acct-uri'}
|
||||
# pylint: disable=protected-access
|
||||
jws_dump = self.net._wrap_in_jws(
|
||||
MockJSONDeSerializable('foo'), nonce=b'Tg', url="url",
|
||||
acme_version=2)
|
||||
jws = acme_jws.JWS.json_loads(jws_dump)
|
||||
self.assertEqual(json.loads(jws.payload.decode()), {'foo': 'foo'})
|
||||
self.assertEqual(jws.signature.combined.nonce, b'Tg')
|
||||
self.assertEqual(jws.signature.combined.kid, u'acct-uri')
|
||||
self.assertEqual(jws.signature.combined.url, u'url')
|
||||
|
||||
|
||||
def test_check_response_not_ok_jobj_no_error(self):
|
||||
self.response.ok = False
|
||||
self.response.json.return_value = {}
|
||||
|
|
@ -701,13 +737,13 @@ class ClientNetworkWithMockedResponseTest(unittest.TestCase):
|
|||
self.assertEqual(self.checked_response, self.net.post(
|
||||
'uri', self.obj, content_type=self.content_type))
|
||||
self.net._wrap_in_jws.assert_called_once_with(
|
||||
self.obj, jose.b64decode(self.all_nonces.pop()))
|
||||
self.obj, jose.b64decode(self.all_nonces.pop()), "uri", 1)
|
||||
|
||||
self.available_nonces = []
|
||||
self.assertRaises(errors.MissingNonce, self.net.post,
|
||||
'uri', self.obj, content_type=self.content_type)
|
||||
self.net._wrap_in_jws.assert_called_with(
|
||||
self.obj, jose.b64decode(self.all_nonces.pop()))
|
||||
self.obj, jose.b64decode(self.all_nonces.pop()), "uri", 1)
|
||||
|
||||
def test_post_wrong_initial_nonce(self): # HEAD
|
||||
self.available_nonces = [b'f', jose.b64encode(b'good')]
|
||||
|
|
|
|||
|
|
@ -251,6 +251,7 @@ class Registration(ResourceBody):
|
|||
contact = jose.Field('contact', omitempty=True, default=())
|
||||
agreement = jose.Field('agreement', omitempty=True)
|
||||
status = jose.Field('status', omitempty=True)
|
||||
terms_of_service_agreed = jose.Field('terms-of-service-agreed', omitempty=True)
|
||||
|
||||
phone_prefix = 'tel:'
|
||||
email_prefix = 'mailto:'
|
||||
|
|
|
|||
Loading…
Reference in a new issue