From cdf59a8b7f5342836ca2b39602f50bd9f8602f4f Mon Sep 17 00:00:00 2001 From: Martijn Bastiaan Date: Sun, 18 Jan 2015 16:50:59 +0100 Subject: [PATCH] Fixed pylint warnings, added test skeleton, implemented regular expression for SPDY test --- letsencrypt/client/tests/validator_test.py | 5 ++ letsencrypt/client/validator.py | 94 +++++++++++----------- 2 files changed, 50 insertions(+), 49 deletions(-) create mode 100644 letsencrypt/client/tests/validator_test.py diff --git a/letsencrypt/client/tests/validator_test.py b/letsencrypt/client/tests/validator_test.py new file mode 100644 index 000000000..c6561bdf3 --- /dev/null +++ b/letsencrypt/client/tests/validator_test.py @@ -0,0 +1,5 @@ +import unittest + + +class ValidatorTest(unittest.TestCase): + pass diff --git a/letsencrypt/client/validator.py b/letsencrypt/client/validator.py index 52f951687..6a0cd1e81 100644 --- a/letsencrypt/client/validator.py +++ b/letsencrypt/client/validator.py @@ -1,6 +1,7 @@ """Validators to determine the current webserver configuration""" from subprocess import PIPE, Popen import logging +import re import requests import zope.interface @@ -14,7 +15,8 @@ OCSP_OPENSSL_CMD = "openssl s_client -connect {hostname}:443" OCSP_OPENSSL_DELIMITER = "OCSP response:" OCSP_OPENSSL_NO_RESPONSE = "no response sent" PROTOCOLS_OPENSSL_DELIMITER = "Protocols advertised by server:" -SPDY_PROTOCOLS = {"spdy/3.1", "spdy/3"} +SPDY_PROTOCOL_RE = re.compile(r"^spdy/\d(\.\d)?$") + def _openssl(hostname, args, input=None): """ @@ -26,26 +28,36 @@ def _openssl(hostname, args, input=None): :param input: stdin to binary :return: (stdout, stderr) """ - openssl_cmd = OCSP_OPENSSL_CMD.format(hostname=hostname).split(" ") + list(args) + openssl_cmd = OCSP_OPENSSL_CMD.format(**locals()).split(" ") + list(args) - log.debug("Calling openssl binary with arguments: " + str(openssl_cmd[1:])) + log.debug("Calling openssl binary with arguments: %s", openssl_cmd[1:]) openssl = Popen(openssl_cmd, stdin=PIPE, stdout=PIPE, stderr=PIPE) stdout, stderr = openssl.communicate(input=input) - log.debug("OpenSSL stdout: " + stdout) - log.debug("OpenSSL stderr: " + stderr) + log.debug("OpenSSL stdout: %s", stdout) + log.debug("OpenSSL stderr: %s", stderr) if openssl.returncode != 0: - raise LetsEncryptValidationError("OpenSSL quit with error-code: {openssl.returncode}.".format(openssl=openssl)) + error_msg = "OpenSSL quit with error-code: {openssl.returncode}" + raise LetsEncryptValidationError(error_msg.format(openssl=openssl)) return stdout, stderr +def _filter_startswith(strings, start): + """Yields all strings which start with given string.""" + for string in strings: + if string.startswith(start): + yield string + + class Validator(object): + """Collection of functions to test a live webserver's configuration""" zope.interface.implements(interfaces.IValidator) - def redirect(self, name): - response = requests.get("http://" + name, allow_redirects=False) + def redirect(self, hostname): + """Test whether webserver redirects to secure connection.""" + response = requests.get("http://" + hostname, allow_redirects=False) if response.status_code not in (301, 303): return False @@ -55,73 +67,57 @@ class Validator(object): return False if response.status_code != 301: - raise LetsEncryptValidationError("Server did not redirect with permanent code.") + error_msg = "Server did not redirect with permanent code." + raise LetsEncryptValidationError(error_msg) return True - def https(self, name): - requests.get("https://" + name, verify=True) + def https(self, hostname): + """Test whether webserver supports HTTPS""" + requests.get("https://" + hostname, verify=True) return True - def hsts(self, name): - headers = requests.get("https://" + name).headers + def hsts(self, hostname): + """Test for HTTP Strict Transport Security header""" + headers = requests.get("https://" + hostname).headers hsts_header = headers.get("strict-transport-security") - + if not hsts_header: return False # Split directives following RFC6797, section 6.1 directives = [d.split("=") for d in hsts_header.split(";")] max_age = [d for d in directives if d[0] == "max-age"][0] - + try: max_age_name, max_age_value = max_age max_age_value = int(max_age_value) except ValueError: - raise LetsEncryptValidationError("Server responded with invalid HSTS header field.") + error_msg = "Server responded with invalid HSTS header field." + raise LetsEncryptValidationError(error_msg) return True - def ocsp_stapling(self, name): - stdout, stderr = _openssl(name, ["-tls1", "-tlsextdebug", "-status"], input="QUIT\n") - ocsp_status = next(line for line in stdout.split("\n") if line.startswith(OCSP_OPENSSL_DELIMITER)) + def ocsp_stapling(self, hostname): + """Test for OCSP stapling. See RFC 6066, section 8.""" + stdout, stderr = _openssl(hostname, ["-tls1", "-tlsextdebug", "-status"], input="QUIT\n") + ocsp_status = next(_filter_startswith(stdout.split("\n"), OCSP_OPENSSL_DELIMITER)) return OCSP_OPENSSL_NO_RESPONSE not in ocsp_status - def _get_nextgen_protocols(self, name): + def _get_nextgen_protocols(self, hostname): """Return a set with all 'nextgen' protocols supported by server (reported by openssl).""" - stdout, stderr = _openssl(name, ["-nextprotoneg", "''"], input="QUIT\n") - delimiter_line = list(line for line in stdout.split("\n") if line.startswith(PROTOCOLS_OPENSSL_DELIMITER)) + stdout, stderr = _openssl(hostname, ["-nextprotoneg", "''"], input="QUIT\n") + delimiter_line = list(_filter_startswith(stdout.split("\n"), PROTOCOLS_OPENSSL_DELIMITER)) if not delimiter_line: return set() protocols = delimiter_line[0].split(PROTOCOLS_OPENSSL_DELIMITER)[1] - return {p.strip() for p in protocols.split(",")} + return set(p.strip() for p in protocols.split(",")) - def spdy(self, name): + def spdy(self, hostname): + """Test for SPDY support""" # SPDY is supported if we recognise at least one protocol - return bool(self._get_nextgen_protocols(name) & SPDY_PROTOCOLS) - - - -if __name__ == '__main__': - print("letsencrypt.org:") - print(Validator().ocsp_stapling("letsencrypt.org")) - print(Validator().hsts("letsencrypt.org")) - print(Validator().https("letsencrypt.org")) - print(Validator().redirect("letsencrypt.org")) - print(Validator().ocsp_stapling("letsencrypt.org")) - print(Validator().spdy("letsencrypt.org")) - print(Validator()._get_nextgen_protocols("letsencrypt.org")) - - print("\ntweakers.net:") - print(Validator().hsts("tweakers.net")) - print(Validator().https("tweakers.net")) - print(Validator().redirect("tweakers.net")) - print(Validator().ocsp_stapling("tweakers.net")) - print(Validator().spdy("tweakers.net")) - print(Validator()._get_nextgen_protocols("tweakers.net")) - - print("\nnon-existing-domain.net:") - print(Validator().ocsp_stapling("non-existing-domain.net")) - + next_gen_protocols = self._get_nextgen_protocols(hostname) + spdy_protocols = filter(SPDY_PROTOCOL_RE.match, next_gen_protocols) + return bool(list(spdy_protocols))