Implement ACMEv2 signing of POST bodies.

This commit is contained in:
Jacob Hoffman-Andrews 2017-11-27 14:03:56 -08:00
parent 686fa36b3b
commit a1d4f47ccc
2 changed files with 44 additions and 15 deletions

View file

@ -48,6 +48,7 @@ class Client(object): # pylint: disable=too-many-instance-attributes
:ivar messages.Directory directory:
:ivar key: `.JWK` (private)
:ivar account: `.Account` (private)
:ivar alg: `.JWASignature`
:ivar bool verify_ssl: Verify SSL certificates?
:ivar .ClientNetwork net: Client network. Useful for testing. If not
@ -56,7 +57,7 @@ class Client(object): # pylint: disable=too-many-instance-attributes
"""
def __init__(self, directory, key, alg=jose.RS256, verify_ssl=True,
def __init__(self, directory, key, account=None, alg=jose.RS256, verify_ssl=True,
net=None):
"""Initialize.
@ -65,7 +66,9 @@ class Client(object): # pylint: disable=too-many-instance-attributes
"""
self.key = key
self.net = ClientNetwork(key, alg, verify_ssl) if net is None else net
self.account = account
self.net = ClientNetwork(key, account=account, alg=alg,
verify_ssl=verify_ssl) if net is None else net
if isinstance(directory, six.string_types):
self.directory = messages.Directory.from_json(
@ -92,10 +95,15 @@ class Client(object): # pylint: disable=too-many-instance-attributes
:rtype: `.RegistrationResource`
"""
new_reg = messages.NewRegistration() if new_reg is None else new_reg
assert isinstance(new_reg, messages.NewRegistration)
if hasattr(self.directory, 'new_account'):
url = self.directory.new_account
new_reg = messages.NewAccount() if new_reg is None else new_reg
else:
url = self.directory.new_reg
new_reg = messages.NewRegistration() if new_reg is None else new_reg
assert isinstance(new_reg, messages.NewRegistration)
response = self.net.post(self.directory[new_reg], new_reg)
response = self.net.post(url, new_reg)
# TODO: handle errors
assert response.status_code == http_client.CREATED
@ -509,15 +517,18 @@ class ClientNetwork(object): # pylint: disable=too-many-instance-attributes
JSON_ERROR_CONTENT_TYPE = 'application/problem+json'
REPLAY_NONCE_HEADER = 'Replay-Nonce'
def __init__(self, key, alg=jose.RS256, verify_ssl=True,
user_agent='acme-python', timeout=DEFAULT_NETWORK_TIMEOUT):
def __init__(self, key, account=None, alg=jose.RS256, verify_ssl=True,
user_agent='acme-python', timeout=DEFAULT_NETWORK_TIMEOUT,
acme_version=2):
self.key = key
self.account = account
self.alg = alg
self.verify_ssl = verify_ssl
self._nonces = set()
self.user_agent = user_agent
self.session = requests.Session()
self._default_timeout = timeout
self.acme_version = acme_version
def __del__(self):
# Try to close the session, but don't show exceptions to the
@ -527,7 +538,7 @@ class ClientNetwork(object): # pylint: disable=too-many-instance-attributes
except Exception: # pylint: disable=broad-except
pass
def _wrap_in_jws(self, obj, nonce):
def _wrap_in_jws(self, obj, nonce, url):
"""Wrap `JSONDeSerializable` object in JWS.
.. todo:: Implement ``acmePath``.
@ -539,9 +550,17 @@ class ClientNetwork(object): # pylint: disable=too-many-instance-attributes
"""
jobj = obj.json_dumps(indent=2).encode()
logger.debug('JWS payload:\n%s', jobj)
return jws.JWS.sign(
payload=jobj, key=self.key, alg=self.alg,
nonce=nonce).json_dumps(indent=2)
kwargs = {
"alg": self.alg,
"nonce": nonce
}
if self.acme_version is 2:
# new ACME spec
kwargs["url"] = url
if self.account is not None:
kwargs["kid"] = self.account["uri"]
kwargs["key"] = self.key
return jws.JWS.sign(jobj, **kwargs).json_dumps(indent=2)
@classmethod
def _check_response(cls, response, content_type=None):
@ -715,7 +734,7 @@ class ClientNetwork(object): # pylint: disable=too-many-instance-attributes
raise
def _post_once(self, url, obj, content_type=JOSE_CONTENT_TYPE, **kwargs):
data = self._wrap_in_jws(obj, self._get_nonce(url))
data = self._wrap_in_jws(obj, self._get_nonce(url), url)
kwargs.setdefault('headers', {'Content-Type': content_type})
response = self._send_request('POST', url, data=data, **kwargs)
self._add_nonce(response)

View file

@ -467,11 +467,21 @@ class ClientNetworkTest(unittest.TestCase):
# pylint: disable=protected-access
jws_dump = self.net._wrap_in_jws(
MockJSONDeSerializable('foo'), nonce=b'Tg')
MockJSONDeSerializable('foo'), nonce=b'Tg', url="url")
jws = acme_jws.JWS.json_loads(jws_dump)
self.assertEqual(json.loads(jws.payload.decode()), {'foo': 'foo'})
self.assertEqual(jws.signature.combined.nonce, b'Tg')
self.net.account = {'uri': 'acct-uri'}
jws_dump = self.net._wrap_in_jws(
MockJSONDeSerializable('foo'), nonce=b'Tg', url="url")
jws = acme_jws.JWS.json_loads(jws_dump)
self.assertEqual(json.loads(jws.payload.decode()), {'foo': 'foo'})
self.assertEqual(jws.signature.combined.nonce, b'Tg')
self.assertEqual(jws.signature.combined.kid, u'acct-uri')
self.assertEqual(jws.signature.combined.url, u'url')
def test_check_response_not_ok_jobj_no_error(self):
self.response.ok = False
self.response.json.return_value = {}
@ -701,13 +711,13 @@ class ClientNetworkWithMockedResponseTest(unittest.TestCase):
self.assertEqual(self.checked_response, self.net.post(
'uri', self.obj, content_type=self.content_type))
self.net._wrap_in_jws.assert_called_once_with(
self.obj, jose.b64decode(self.all_nonces.pop()))
self.obj, jose.b64decode(self.all_nonces.pop()), "uri")
self.available_nonces = []
self.assertRaises(errors.MissingNonce, self.net.post,
'uri', self.obj, content_type=self.content_type)
self.net._wrap_in_jws.assert_called_with(
self.obj, jose.b64decode(self.all_nonces.pop()))
self.obj, jose.b64decode(self.all_nonces.pop()), "uri")
def test_post_wrong_initial_nonce(self): # HEAD
self.available_nonces = [b'f', jose.b64encode(b'good')]