Use the ACMEv2 newNonce endpoint when a new nonce is needed (#6442)

Also, add checking to the newNonce HEAD request, and check responses in general before attempting to save a nonce, for a better error message.

* check response before adding nonce to the pool

* fix tests so that they test what they're supposed to test, and also allow the order of _add_nonce and _check_response to be switched

* make _get_nonce take acme_version

* Send HEAD to newNonce endpoint when using ACMEv2

* check the HEAD newNonce response

* remove unnecessary try; get returns None if the item doesn't exist

* instead of setting new_nonce_url on ClientNetwork, use the saved directory in ClientBase and pass that into ClientNetwork.post

* no need to test acme_version in _get_nonce

* pop new_nonce_url out of kwargs before passing to _send_request
This commit is contained in:
ohemorange 2018-11-02 17:32:33 -07:00 committed by GitHub
parent a1af42bc5f
commit 2c1964c639
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 75 additions and 20 deletions

View file

@ -7,6 +7,7 @@ Certbot adheres to [Semantic Versioning](http://semver.org/).
### Added
* `revoke` accepts `--cert-name`, and doesn't accept both `--cert-name` and `--cert-path`.
* Use the ACMEv2 newNonce endpoint when a new nonce is needed, and newNonce is available in the directory.
### Changed

View file

@ -89,6 +89,8 @@ class ClientBase(object): # pylint: disable=too-many-instance-attributes
"""
kwargs.setdefault('acme_version', self.acme_version)
if hasattr(self.directory, 'newNonce'):
kwargs.setdefault('new_nonce_url', getattr(self.directory, 'newNonce'))
return self.net.post(*args, **kwargs)
def update_registration(self, regr, update=None):
@ -1106,10 +1108,15 @@ class ClientNetwork(object): # pylint: disable=too-many-instance-attributes
else:
raise errors.MissingNonce(response)
def _get_nonce(self, url):
def _get_nonce(self, url, new_nonce_url):
if not self._nonces:
logger.debug('Requesting fresh nonce')
self._add_nonce(self.head(url))
if new_nonce_url is None:
response = self.head(url)
else:
# request a new nonce from the acme newNonce endpoint
response = self._check_response(self.head(new_nonce_url), content_type=None)
self._add_nonce(response)
return self._nonces.pop()
def post(self, *args, **kwargs):
@ -1130,8 +1137,13 @@ class ClientNetwork(object): # pylint: disable=too-many-instance-attributes
def _post_once(self, url, obj, content_type=JOSE_CONTENT_TYPE,
acme_version=1, **kwargs):
data = self._wrap_in_jws(obj, self._get_nonce(url), url, acme_version)
try:
new_nonce_url = kwargs.pop('new_nonce_url')
except KeyError:
new_nonce_url = None
data = self._wrap_in_jws(obj, self._get_nonce(url, new_nonce_url), url, acme_version)
kwargs.setdefault('headers', {'Content-Type': content_type})
response = self._send_request('POST', url, data=data, **kwargs)
response = self._check_response(response, content_type=content_type)
self._add_nonce(response)
return self._check_response(response, content_type=content_type)
return response

View file

@ -805,7 +805,8 @@ class ClientV2Test(ClientTestBase):
def test_revoke(self):
self.client.revoke(messages_test.CERT, self.rsn)
self.net.post.assert_called_once_with(
self.directory["revokeCert"], mock.ANY, acme_version=2)
self.directory["revokeCert"], mock.ANY, acme_version=2,
new_nonce_url=DIRECTORY_V2['newNonce'])
def test_update_registration(self):
# "Instance of 'Field' has no to_json/update member" bug:
@ -1052,7 +1053,10 @@ class ClientNetworkWithMockedResponseTest(unittest.TestCase):
self.response = mock.MagicMock(ok=True, status_code=http_client.OK)
self.response.headers = {}
self.response.links = {}
self.checked_response = mock.MagicMock()
self.response.checked = False
self.acmev1_nonce_response = mock.MagicMock(ok=False,
status_code=http_client.METHOD_NOT_ALLOWED)
self.acmev1_nonce_response.headers = {}
self.obj = mock.MagicMock()
self.wrapped_obj = mock.MagicMock()
self.content_type = mock.sentinel.content_type
@ -1064,13 +1068,21 @@ class ClientNetworkWithMockedResponseTest(unittest.TestCase):
def send_request(*args, **kwargs):
# pylint: disable=unused-argument,missing-docstring
self.assertFalse("new_nonce_url" in kwargs)
method = args[0]
uri = args[1]
if method == 'HEAD' and uri != "new_nonce_uri":
response = self.acmev1_nonce_response
else:
response = self.response
if self.available_nonces:
self.response.headers = {
response.headers = {
self.net.REPLAY_NONCE_HEADER:
self.available_nonces.pop().decode()}
else:
self.response.headers = {}
return self.response
response.headers = {}
return response
# pylint: disable=protected-access
self.net._send_request = self.send_request = mock.MagicMock(
@ -1082,28 +1094,39 @@ class ClientNetworkWithMockedResponseTest(unittest.TestCase):
# pylint: disable=missing-docstring
self.assertEqual(self.response, response)
self.assertEqual(self.content_type, content_type)
return self.checked_response
self.assertTrue(self.response.ok)
self.response.checked = True
return self.response
def test_head(self):
self.assertEqual(self.response, self.net.head(
self.assertEqual(self.acmev1_nonce_response, self.net.head(
'http://example.com/', 'foo', bar='baz'))
self.send_request.assert_called_once_with(
'HEAD', 'http://example.com/', 'foo', bar='baz')
def test_head_v2(self):
self.assertEqual(self.response, self.net.head(
'new_nonce_uri', 'foo', bar='baz'))
self.send_request.assert_called_once_with(
'HEAD', 'new_nonce_uri', 'foo', bar='baz')
def test_get(self):
self.assertEqual(self.checked_response, self.net.get(
self.assertEqual(self.response, self.net.get(
'http://example.com/', content_type=self.content_type, bar='baz'))
self.assertTrue(self.response.checked)
self.send_request.assert_called_once_with(
'GET', 'http://example.com/', bar='baz')
def test_post_no_content_type(self):
self.content_type = self.net.JOSE_CONTENT_TYPE
self.assertEqual(self.checked_response, self.net.post('uri', self.obj))
self.assertEqual(self.response, self.net.post('uri', self.obj))
self.assertTrue(self.response.checked)
def test_post(self):
# pylint: disable=protected-access
self.assertEqual(self.checked_response, self.net.post(
self.assertEqual(self.response, self.net.post(
'uri', self.obj, content_type=self.content_type))
self.assertTrue(self.response.checked)
self.net._wrap_in_jws.assert_called_once_with(
self.obj, jose.b64decode(self.all_nonces.pop()), "uri", 1)
@ -1135,7 +1158,7 @@ class ClientNetworkWithMockedResponseTest(unittest.TestCase):
def test_post_not_retried(self):
check_response = mock.MagicMock()
check_response.side_effect = [messages.Error.with_code('malformed'),
self.checked_response]
self.response]
# pylint: disable=protected-access
self.net._check_response = check_response
@ -1143,13 +1166,12 @@ class ClientNetworkWithMockedResponseTest(unittest.TestCase):
self.obj, content_type=self.content_type)
def test_post_successful_retry(self):
check_response = mock.MagicMock()
check_response.side_effect = [messages.Error.with_code('badNonce'),
self.checked_response]
post_once = mock.MagicMock()
post_once.side_effect = [messages.Error.with_code('badNonce'),
self.response]
# pylint: disable=protected-access
self.net._check_response = check_response
self.assertEqual(self.checked_response, self.net.post(
self.assertEqual(self.response, self.net.post(
'uri', self.obj, content_type=self.content_type))
def test_head_get_post_error_passthrough(self):
@ -1160,6 +1182,26 @@ class ClientNetworkWithMockedResponseTest(unittest.TestCase):
self.assertRaises(requests.exceptions.RequestException,
self.net.post, 'uri', obj=self.obj)
def test_post_bad_nonce_head(self):
# pylint: disable=protected-access
# regression test for https://github.com/certbot/certbot/issues/6092
bad_response = mock.MagicMock(ok=False, status_code=http_client.SERVICE_UNAVAILABLE)
self.net._send_request = mock.MagicMock()
self.net._send_request.return_value = bad_response
self.content_type = None
check_response = mock.MagicMock()
self.net._check_response = check_response
self.assertRaises(errors.ClientError, self.net.post, 'uri',
self.obj, content_type=self.content_type, acme_version=2,
new_nonce_url='new_nonce_uri')
self.assertEqual(check_response.call_count, 1)
def test_new_nonce_uri_removed(self):
self.content_type = None
self.net.post('uri', self.obj, content_type=None,
acme_version=2, new_nonce_url='new_nonce_uri')
class ClientNetworkSourceAddressBindingTest(unittest.TestCase):
"""Tests that if ClientNetwork has a source IP set manually, the underlying library has
used the provided source address."""