Merge pull request #570 from kuba/simplehttp

SimpleHTTP fixes
This commit is contained in:
James Kasten 2015-07-02 09:19:21 -07:00
commit e140eca4f3
7 changed files with 152 additions and 53 deletions

View file

@ -2,13 +2,18 @@
import binascii
import functools
import hashlib
import logging
import Crypto.Random
import requests
from acme import jose
from acme import other
logger = logging.getLogger(__name__)
# pylint: disable=too-few-public-methods
@ -63,6 +68,8 @@ class SimpleHTTPResponse(ChallengeResponse):
MAX_PATH_LEN = 25
"""Maximum allowed `path` length."""
CONTENT_TYPE = "text/plain"
@property
def good_path(self):
"""Is `path` good?
@ -72,6 +79,8 @@ class SimpleHTTPResponse(ChallengeResponse):
[RFC4648]", base64.b64decode ignores those characters
"""
# TODO: check that path combined with uri does not go above
# URI_ROOT_PATH!
return len(self.path) <= 25
@property
@ -79,6 +88,11 @@ class SimpleHTTPResponse(ChallengeResponse):
"""URL scheme for the provisioned resource."""
return "https" if self.tls else "http"
@property
def port(self):
"""Port that the ACME client should be listening for validation."""
return 443 if self.tls else 80
def uri(self, domain):
"""Create an URI to the provisioned resource.
@ -91,6 +105,51 @@ class SimpleHTTPResponse(ChallengeResponse):
return self._URI_TEMPLATE.format(
scheme=self.scheme, domain=domain, path=self.path)
def simple_verify(self, chall, domain, port=None):
"""Simple verify.
According to the ACME specification, "the ACME server MUST
ignore the certificate provided by the HTTPS server", so
``requests.get`` is called with ``verify=False``.
:param .SimpleHTTP chall: Corresponding challenge.
:param str domain: Domain name being verified.
:param int port: Port used in the validation.
:returns: ``True`` iff validation is successful, ``False``
otherwise.
:rtype: bool
"""
# TODO: ACME specification defines URI template that doesn't
# allow to use a custom port... Make sure port is not in the
# request URI, if it's standard.
if port is not None and port != self.port:
logger.warn(
"Using non-standard port for SimpleHTTP verification: %s", port)
domain += ":{0}".format(port)
uri = self.uri(domain)
logger.debug("Verifying %s at %s...", chall.typ, uri)
try:
http_response = requests.get(uri, verify=False)
except requests.exceptions.RequestException as error:
logger.error("Unable to reach %s: %s", uri, error)
return False
logger.debug(
"Received %s. Headers: %s", http_response, http_response.headers)
good_token = http_response.text == chall.token
if not good_token:
logger.error(
"Unable to verify %s! Expected: %r, returned: %r.",
uri, chall.token, http_response.text)
# TODO: spec contradicts itself, c.f.
# https://github.com/letsencrypt/acme-spec/pull/156/files#r33136438
good_ct = self.CONTENT_TYPE == http_response.headers.get(
"Content-Type", self.CONTENT_TYPE)
return self.good_path and good_ct and good_token
@Challenge.register
class DVSNI(DVChallenge):

View file

@ -5,6 +5,9 @@ import unittest
import Crypto.PublicKey.RSA
import M2Crypto
import mock
import requests
import urlparse
from acme import jose
from acme import other
@ -49,6 +52,7 @@ class SimpleHTTPTest(unittest.TestCase):
class SimpleHTTPResponseTest(unittest.TestCase):
# pylint: disable=too-many-instance-attributes
def setUp(self):
from acme.challenges import SimpleHTTPResponse
@ -66,6 +70,12 @@ class SimpleHTTPResponseTest(unittest.TestCase):
'tls': True,
}
from acme.challenges import SimpleHTTP
self.chall = SimpleHTTP(token="foo")
self.resp_http = SimpleHTTPResponse(path="bar", tls=False)
self.resp_https = SimpleHTTPResponse(path="bar", tls=True)
self.good_headers = {'Content-Type': SimpleHTTPResponse.CONTENT_TYPE}
def test_good_path(self):
self.assertTrue(self.msg_http.good_path)
self.assertTrue(self.msg_https.good_path)
@ -76,11 +86,17 @@ class SimpleHTTPResponseTest(unittest.TestCase):
self.assertEqual('http', self.msg_http.scheme)
self.assertEqual('https', self.msg_https.scheme)
def test_port(self):
self.assertEqual(80, self.msg_http.port)
self.assertEqual(443, self.msg_https.port)
def test_uri(self):
self.assertEqual('http://example.com/.well-known/acme-challenge/'
'6tbIMBC5Anhl5bOlWT5ZFA', self.msg_http.uri('example.com'))
self.assertEqual('https://example.com/.well-known/acme-challenge/'
'6tbIMBC5Anhl5bOlWT5ZFA', self.msg_https.uri('example.com'))
self.assertEqual(
'http://example.com/.well-known/acme-challenge/'
'6tbIMBC5Anhl5bOlWT5ZFA', self.msg_http.uri('example.com'))
self.assertEqual(
'https://example.com/.well-known/acme-challenge/'
'6tbIMBC5Anhl5bOlWT5ZFA', self.msg_https.uri('example.com'))
def test_to_partial_json(self):
self.assertEqual(self.jmsg_http, self.msg_http.to_partial_json())
@ -98,6 +114,37 @@ class SimpleHTTPResponseTest(unittest.TestCase):
hash(SimpleHTTPResponse.from_json(self.jmsg_http))
hash(SimpleHTTPResponse.from_json(self.jmsg_https))
@mock.patch("acme.challenges.requests.get")
def test_simple_verify_good_token(self, mock_get):
for resp in self.resp_http, self.resp_https:
mock_get.reset_mock()
mock_get.return_value = mock.MagicMock(
text=self.chall.token, headers=self.good_headers)
self.assertTrue(resp.simple_verify(self.chall, "local"))
mock_get.assert_called_once_with(resp.uri("local"), verify=False)
@mock.patch("acme.challenges.requests.get")
def test_simple_verify_bad_token(self, mock_get):
mock_get.return_value = mock.MagicMock(
text=self.chall.token + "!", headers=self.good_headers)
self.assertFalse(self.resp_http.simple_verify(self.chall, "local"))
@mock.patch("acme.challenges.requests.get")
def test_simple_verify_bad_content_type(self, mock_get):
mock_get().text = self.chall.token
self.assertFalse(self.resp_http.simple_verify(self.chall, "local"))
@mock.patch("acme.challenges.requests.get")
def test_simple_verify_connection_error(self, mock_get):
mock_get.side_effect = requests.exceptions.RequestException
self.assertFalse(self.resp_http.simple_verify(self.chall, "local"))
@mock.patch("acme.challenges.requests.get")
def test_simple_verify_port(self, mock_get):
self.resp_http.simple_verify(self.chall, "local", 4430)
self.assertEqual("local:4430", urlparse.urlparse(
mock_get.mock_calls[0][1][0]).netloc)
class DVSNITest(unittest.TestCase):

View file

@ -480,9 +480,11 @@ def create_parser(plugins, args):
"testing", "--no-verify-ssl", action="store_true",
help=config_help("no_verify_ssl"),
default=flag_default("no_verify_ssl"))
helpful.add( # TODO: apache and nginx plugins do NOT respect it (#479)
helpful.add( # TODO: apache plugin does NOT respect it (#479)
"testing", "--dvsni-port", type=int, default=flag_default("dvsni_port"),
help=config_help("dvsni_port"))
helpful.add("testing", "--simple-http-port", type=int,
help=config_help("simple_http_port"))
helpful.add("testing", "--no-simple-http-tls", action="store_true",
help=config_help("no_simple_http_tls"))

View file

@ -188,6 +188,8 @@ class IConfig(zope.interface.Interface):
no_simple_http_tls = zope.interface.Attribute(
"Do not use TLS when solving SimpleHTTP challenges.")
simple_http_port = zope.interface.Attribute(
"Port used in the SimpleHttp challenge.")
class IInstaller(IPlugin):

View file

@ -1,9 +1,7 @@
"""Manual plugin."""
import logging
import os
import sys
import requests
import zope.component
import zope.interface
@ -14,9 +12,6 @@ from letsencrypt import interfaces
from letsencrypt.plugins import common
logger = logging.getLogger(__name__)
class ManualAuthenticator(common.Plugin):
"""Manual Authenticator.
@ -34,36 +29,47 @@ Make sure your web server displays the following content at
{achall.token}
Content-Type header MUST be set to {ct}.
If you don't have HTTP server configured, you can run the following
command on the target server (as root):
{command}
"""
# "cd /tmp/letsencrypt" makes sure user doesn't serve /root,
# separate "public_html" ensures that cert.pem/key.pem are not
# served and makes it more obvious that Python command will serve
# anything recursively under the cwd
HTTP_TEMPLATE = """\
mkdir -p {response.URI_ROOT_PATH}
mkdir -p /tmp/letsencrypt/public_html/{response.URI_ROOT_PATH}
cd /tmp/letsencrypt/public_html
echo -n {achall.token} > {response.URI_ROOT_PATH}/{response.path}
# run only once per server:
python -m SimpleHTTPServer 80"""
python -c "import BaseHTTPServer, SimpleHTTPServer; \\
SimpleHTTPServer.SimpleHTTPRequestHandler.extensions_map = {{'': '{ct}'}}; \\
s = BaseHTTPServer.HTTPServer(('', {port}), SimpleHTTPServer.SimpleHTTPRequestHandler); \\
s.serve_forever()" """
"""Non-TLS command template."""
# https://www.piware.de/2011/01/creating-an-https-server-in-python/
HTTPS_TEMPLATE = """\
mkdir -p {response.URI_ROOT_PATH} # run only once per server
mkdir -p /tmp/letsencrypt/public_html/{response.URI_ROOT_PATH}
cd /tmp/letsencrypt/public_html
echo -n {achall.token} > {response.URI_ROOT_PATH}/{response.path}
# run only once per server:
openssl req -new -newkey rsa:4096 -subj "/" -days 1 -nodes -x509 -keyout key.pem -out cert.pem
openssl req -new -newkey rsa:4096 -subj "/" -days 1 -nodes -x509 -keyout ../key.pem -out ../cert.pem
python -c "import BaseHTTPServer, SimpleHTTPServer, ssl; \\
s = BaseHTTPServer.HTTPServer(('', 443), SimpleHTTPServer.SimpleHTTPRequestHandler); \\
s.socket = ssl.wrap_socket(s.socket, keyfile='key.pem', certfile='cert.pem'); \\
SimpleHTTPServer.SimpleHTTPRequestHandler.extensions_map = {{'': '{ct}'}}; \\
s = BaseHTTPServer.HTTPServer(('', {port}), SimpleHTTPServer.SimpleHTTPRequestHandler); \\
s.socket = ssl.wrap_socket(s.socket, keyfile='../key.pem', certfile='../cert.pem'); \\
s.serve_forever()" """
"""TLS command template.
According to the ACME specification, "the ACME server MUST ignore
the certificate provided by the HTTPS server", so the first command
generates temporary self-signed certificate. For the same reason
``requests.get`` in `_verify` sets ``verify=False``. Python HTTPS
server command serves the ``token`` on all URIs.
generates temporary self-signed certificate.
"""
@ -105,11 +111,14 @@ binary for temporary key/certificate generation.""".replace("\n", "")
assert response.good_path # is encoded os.urandom(18) good?
self._notify_and_wait(self.MESSAGE_TEMPLATE.format(
achall=achall, response=response,
uri=response.uri(achall.domain),
command=self.template.format(achall=achall, response=response)))
achall=achall, response=response, uri=response.uri(achall.domain),
ct=response.CONTENT_TYPE, command=self.template.format(
achall=achall, response=response, ct=response.CONTENT_TYPE,
port=(response.port if self.config.simple_http_port is None
else self.config.simple_http_port))))
if self._verify(achall, response):
if response.simple_verify(
achall.challb, achall.domain, self.config.simple_http_port):
return response
else:
return None
@ -121,21 +130,5 @@ binary for temporary key/certificate generation.""".replace("\n", "")
sys.stdout.write(message)
raw_input("Press ENTER to continue")
def _verify(self, achall, chall_response): # pylint: disable=no-self-use
uri = chall_response.uri(achall.domain)
logger.debug("Verifying %s...", uri)
try:
response = requests.get(uri, verify=False)
except requests.exceptions.ConnectionError as error:
logger.exception(error)
return False
ret = response.text == achall.token
if not ret:
logger.error("Unable to verify %s! Expected: %r, returned: %r.",
uri, achall.token, response.text)
return ret
def cleanup(self, achalls): # pylint: disable=missing-docstring,no-self-use
pass # pragma: no cover

View file

@ -2,7 +2,6 @@
import unittest
import mock
import requests
from acme import challenges
@ -15,7 +14,8 @@ class ManualAuthenticatorTest(unittest.TestCase):
def setUp(self):
from letsencrypt.plugins.manual import ManualAuthenticator
self.config = mock.MagicMock(no_simple_http_tls=True)
self.config = mock.MagicMock(
no_simple_http_tls=True, simple_http_port=4430)
self.auth = ManualAuthenticator(config=self.config, name="manual")
self.achalls = [achallenges.SimpleHTTP(
challb=acme_util.SIMPLE_HTTP, domain="foo.com", key=None)]
@ -32,28 +32,25 @@ class ManualAuthenticatorTest(unittest.TestCase):
@mock.patch("letsencrypt.plugins.manual.sys.stdout")
@mock.patch("letsencrypt.plugins.manual.os.urandom")
@mock.patch("letsencrypt.plugins.manual.requests.get")
@mock.patch("acme.challenges.SimpleHTTPResponse.simple_verify")
@mock.patch("__builtin__.raw_input")
def test_perform(self, mock_raw_input, mock_get, mock_urandom, mock_stdout):
def test_perform(self, mock_raw_input, mock_verify, mock_urandom,
mock_stdout):
mock_urandom.return_value = "foo"
mock_get().text = self.achalls[0].token
mock_verify.return_value = True
self.assertEqual(
[challenges.SimpleHTTPResponse(tls=False, path='Zm9v')],
self.auth.perform(self.achalls))
resp = challenges.SimpleHTTPResponse(tls=False, path='Zm9v')
self.assertEqual([resp], self.auth.perform(self.achalls))
mock_raw_input.assert_called_once()
mock_get.assert_called_with(
"http://foo.com/.well-known/acme-challenge/Zm9v", verify=False)
mock_verify.assert_called_with(self.achalls[0].challb, "foo.com", 4430)
message = mock_stdout.write.mock_calls[0][1][0]
self.assertTrue(self.achalls[0].token in message)
self.assertTrue('Zm9v' in message)
mock_get().text = self.achalls[0].token + '!'
mock_verify.return_value = False
self.assertEqual([None], self.auth.perform(self.achalls))
mock_get.side_effect = requests.exceptions.ConnectionError
self.assertEqual([None], self.auth.perform(self.achalls))
if __name__ == "__main__":
unittest.main() # pragma: no cover

View file

@ -61,7 +61,6 @@ letsencrypt_install_requires = [
'pyrfc3339',
'python2-pythondialog>=3.2.2rc1', # Debian squeeze support, cf. #280
'pytz',
'requests',
'zope.component',
'zope.interface',
'M2Crypto',