diff --git a/docs/using.rst b/docs/using.rst index 441bf1623..63d287921 100644 --- a/docs/using.rst +++ b/docs/using.rst @@ -24,7 +24,7 @@ Ubuntu :: sudo apt-get install python python-setuptools python-virtualenv python-dev \ - gcc swig dialog libaugeas0 libssl-dev ca-certificates + gcc swig dialog libaugeas0 libssl-dev ca-certificates openssl Mac OSX diff --git a/letsencrypt/client/errors.py b/letsencrypt/client/errors.py index 6a3739832..28ef3b9b2 100644 --- a/letsencrypt/client/errors.py +++ b/letsencrypt/client/errors.py @@ -31,3 +31,7 @@ class LetsEncryptMisconfigurationError(LetsEncryptConfiguratorError): class LetsEncryptDvsniError(LetsEncryptConfiguratorError): """Let's Encrypt DVSNI error.""" + + +class LetsEncryptValidationError(LetsEncryptClientError): + pass diff --git a/letsencrypt/client/interfaces.py b/letsencrypt/client/interfaces.py index 9e35a754a..47c564799 100644 --- a/letsencrypt/client/interfaces.py +++ b/letsencrypt/client/interfaces.py @@ -168,7 +168,7 @@ class IDisplay(zope.interface.Interface): """Ask the user whether they would like to redirect to HTTPS.""" -class IValidator(object): +class IValidator(zope.interface.Interface): """Configuration validator.""" def redirect(name): @@ -177,8 +177,11 @@ class IValidator(object): def ocsp_stapling(name): """Verify ocsp stapling for domain.""" - def https(names): + def https(name): """Verifiy HTTPS is enabled for domain.""" def hsts(name): """Verify HSTS header is enabled.""" + + def spdy(name): + """Verify SPDY is enabled.""" diff --git a/letsencrypt/client/tests/validator_test.py b/letsencrypt/client/tests/validator_test.py new file mode 100644 index 000000000..08074cd54 --- /dev/null +++ b/letsencrypt/client/tests/validator_test.py @@ -0,0 +1,72 @@ +import unittest + +import responses +from requests.exceptions import ConnectionError +from letsencrypt.client.errors import LetsEncryptValidationError +from letsencrypt.client.validator import Validator + + +def _add(secure=False, **kwargs): + url = "{}://test.com".format("https" if secure else "http") + return responses.add(responses.GET, url, **kwargs) + + +class ValidatorTest(unittest.TestCase): + @responses.activate + def test_succesful_redirect(self): + _add(status=301, adding_headers={"location": "https://test.com"}) + self.assertTrue(Validator().redirect("test.com")) + + @responses.activate + def test_redirect_missing_location(self): + _add(status=301) + self.assertFalse(Validator().redirect("test.com")) + + @responses.activate + def test_redirect_wrong_status_code(self): + _add(status=201, adding_headers={"location": "https://test.com"}) + self.assertFalse(Validator().redirect("test.com")) + + @responses.activate + def test_redirect_wrong_redirect_code(self): + _add(status=303, adding_headers={"location": "https://test.com"}) + self.assertRaises(LetsEncryptValidationError, Validator().redirect, "test.com") + + @responses.activate + def test_https_fail(self): + self.assertRaises(ConnectionError, Validator().https, "test.com") + + @responses.activate + def test_https_success(self): + _add(secure=True, body="blaa") + self.assertTrue(Validator().https("test.com")) + + @responses.activate + def test_hsts_empty(self): + _add(secure=True, adding_headers={"strict-transport-security": ""}) + self.assertFalse(Validator().hsts("test.com")) + + @responses.activate + def test_hsts_malformed(self): + _add(secure=True, adding_headers={"strict-transport-security": "sdfal"}) + self.assertRaises(LetsEncryptValidationError, Validator().hsts, "test.com") + + @responses.activate + def test_hsts_expire(self): + _add(secure=True, adding_headers={"strict-transport-security": "max-age=3600"}) + self.assertRaises(LetsEncryptValidationError, Validator().hsts, "test.com") + + @responses.activate + def test_hsts(self): + _add(secure=True, adding_headers={"strict-transport-security": "max-age=31536000"}) + self.assertTrue(Validator().hsts("test.com")) + + @responses.activate + def test_hsts(self): + headers = {"strict-transport-security": "max-age=31536000;includeSubDomains"} + _add(secure=True, adding_headers=headers) + self.assertTrue(Validator().hsts("test.com")) + + +if __name__ == '__main__': + unittest.main() diff --git a/letsencrypt/client/validator.py b/letsencrypt/client/validator.py new file mode 100644 index 000000000..7e46f415b --- /dev/null +++ b/letsencrypt/client/validator.py @@ -0,0 +1,133 @@ +"""Validators to determine the current webserver configuration""" +from subprocess import PIPE, Popen +import logging +import re + +import requests +import zope.interface + +from letsencrypt.client import interfaces +from letsencrypt.client.errors import LetsEncryptValidationError + +log = logging.getLogger(__name__) + +OCSP_OPENSSL_CMD = "openssl s_client -connect {hostname}:443" +OCSP_OPENSSL_DELIMITER = "OCSP response:" +OCSP_OPENSSL_NO_RESPONSE = "no response sent" +PROTOCOLS_OPENSSL_DELIMITER = "Protocols advertised by server:" +SPDY_PROTOCOL_RE = re.compile(r"^spdy/\d(\.\d)?$") + + +def _openssl(hostname, args, input=None): + """ + Call openssl binary in client mode. + + :raises LetsEncryptValidationError if openssl exits with error-code + :param hostname: server to connect to (on port 443) + :param args: arguments (list) to append to default ones + :param input: stdin to binary + :return: (stdout, stderr) + """ + openssl_cmd = OCSP_OPENSSL_CMD.format(hostname=hostname) + openssl_cmd = openssl_cmd.split(" ") + list(args) + + log.debug("Calling openssl binary with arguments: %s", openssl_cmd[1:]) + openssl = Popen(openssl_cmd, stdin=PIPE, stdout=PIPE, stderr=PIPE) + + stdout, stderr = openssl.communicate(input=input) + log.debug("OpenSSL stdout: %s", stdout) + log.debug("OpenSSL stderr: %s", stderr) + + if openssl.returncode != 0: + error_msg = "OpenSSL quit with error-code: {openssl.returncode}" + raise LetsEncryptValidationError(error_msg.format(openssl=openssl)) + + return stdout, stderr + + +def _filter_startswith(strings, start): + """Yields all strings which start with given string.""" + for string in strings: + if string.startswith(start): + yield string + + +class Validator(object): + """Collection of functions to test a live webserver's configuration""" + zope.interface.implements(interfaces.IValidator) + + def redirect(self, hostname): + """Test whether webserver redirects to secure connection.""" + response = requests.get("http://" + hostname, allow_redirects=False) + + if response.status_code not in (301, 303): + return False + + redirect_location = response.headers.get("location", "") + if not redirect_location.startswith("https://"): + return False + + if response.status_code != 301: + error_msg = "Server did not redirect with permanent code." + raise LetsEncryptValidationError(error_msg) + + return True + + def https(self, hostname): + """Test whether webserver supports HTTPS""" + requests.get("https://" + hostname, verify=True) + return True + + def hsts(self, hostname): + """Test for HTTP Strict Transport Security header""" + headers = requests.get("https://" + hostname).headers + hsts_header = headers.get("strict-transport-security") + + if not hsts_header: + return False + + # Split directives following RFC6797, section 6.1 + directives = [d.split("=") for d in hsts_header.split(";")] + max_age = [d for d in directives if d[0] == "max-age"] + + if not max_age: + error_msg = "Server responded with invalid HSTS header field." + raise LetsEncryptValidationError(error_msg) + + try: + max_age_name, max_age_value = max_age[0] + max_age_value = int(max_age_value) + except ValueError: + error_msg = "Server responded with invalid HSTS header field." + raise LetsEncryptValidationError(error_msg) + + # Test whether HSTS does not expire for at least two weeks. + if max_age_value <= (2 * 7 * 24 * 3600): + error_msg = "HSTS should not expire in less than two weeks." + raise LetsEncryptValidationError(error_msg) + + return True + + def ocsp_stapling(self, hostname): + """Test for OCSP stapling. See RFC 6066, section 8.""" + stdout, stderr = _openssl(hostname, ["-tls1", "-tlsextdebug", "-status"], input="QUIT\n") + ocsp_status = next(_filter_startswith(stdout.split("\n"), OCSP_OPENSSL_DELIMITER)) + return OCSP_OPENSSL_NO_RESPONSE not in ocsp_status + + def _get_nextgen_protocols(self, hostname): + """Return a set with all 'nextgen' protocols supported by server (reported by openssl).""" + stdout, stderr = _openssl(hostname, ["-nextprotoneg", "''"], input="QUIT\n") + delimiter_line = list(_filter_startswith(stdout.split("\n"), PROTOCOLS_OPENSSL_DELIMITER)) + + if not delimiter_line: + return set() + + protocols = delimiter_line[0].split(PROTOCOLS_OPENSSL_DELIMITER)[1] + return set(p.strip() for p in protocols.split(",")) + + def spdy(self, hostname): + """Test for SPDY support""" + # SPDY is supported if we recognise at least one protocol + next_gen_protocols = self._get_nextgen_protocols(hostname) + spdy_protocols = filter(SPDY_PROTOCOL_RE.match, next_gen_protocols) + return bool(list(spdy_protocols)) diff --git a/requirements.txt b/requirements.txt index a95a0807f..37993ae3c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,4 @@ +responses==0.3.0 M2Crypto==0.22.3 python2-pythondialog jsonschema==2.4.0