Merge branch 'validator' of git://github.com/martijnbastiaan/lets-encrypt-preview into martijnbastiaan-validator

Conflicts:
	README.md
	letsencrypt/client/interfaces.py
This commit is contained in:
James Kasten 2015-01-29 03:48:07 -08:00
commit 435b7a126a
6 changed files with 216 additions and 3 deletions

View file

@ -24,7 +24,7 @@ Ubuntu
::
sudo apt-get install python python-setuptools python-virtualenv python-dev \
gcc swig dialog libaugeas0 libssl-dev ca-certificates
gcc swig dialog libaugeas0 libssl-dev ca-certificates openssl
Mac OSX

View file

@ -31,3 +31,7 @@ class LetsEncryptMisconfigurationError(LetsEncryptConfiguratorError):
class LetsEncryptDvsniError(LetsEncryptConfiguratorError):
"""Let's Encrypt DVSNI error."""
class LetsEncryptValidationError(LetsEncryptClientError):
pass

View file

@ -168,7 +168,7 @@ class IDisplay(zope.interface.Interface):
"""Ask the user whether they would like to redirect to HTTPS."""
class IValidator(object):
class IValidator(zope.interface.Interface):
"""Configuration validator."""
def redirect(name):
@ -177,8 +177,11 @@ class IValidator(object):
def ocsp_stapling(name):
"""Verify ocsp stapling for domain."""
def https(names):
def https(name):
"""Verifiy HTTPS is enabled for domain."""
def hsts(name):
"""Verify HSTS header is enabled."""
def spdy(name):
"""Verify SPDY is enabled."""

View file

@ -0,0 +1,72 @@
import unittest
import responses
from requests.exceptions import ConnectionError
from letsencrypt.client.errors import LetsEncryptValidationError
from letsencrypt.client.validator import Validator
def _add(secure=False, **kwargs):
url = "{}://test.com".format("https" if secure else "http")
return responses.add(responses.GET, url, **kwargs)
class ValidatorTest(unittest.TestCase):
@responses.activate
def test_succesful_redirect(self):
_add(status=301, adding_headers={"location": "https://test.com"})
self.assertTrue(Validator().redirect("test.com"))
@responses.activate
def test_redirect_missing_location(self):
_add(status=301)
self.assertFalse(Validator().redirect("test.com"))
@responses.activate
def test_redirect_wrong_status_code(self):
_add(status=201, adding_headers={"location": "https://test.com"})
self.assertFalse(Validator().redirect("test.com"))
@responses.activate
def test_redirect_wrong_redirect_code(self):
_add(status=303, adding_headers={"location": "https://test.com"})
self.assertRaises(LetsEncryptValidationError, Validator().redirect, "test.com")
@responses.activate
def test_https_fail(self):
self.assertRaises(ConnectionError, Validator().https, "test.com")
@responses.activate
def test_https_success(self):
_add(secure=True, body="blaa")
self.assertTrue(Validator().https("test.com"))
@responses.activate
def test_hsts_empty(self):
_add(secure=True, adding_headers={"strict-transport-security": ""})
self.assertFalse(Validator().hsts("test.com"))
@responses.activate
def test_hsts_malformed(self):
_add(secure=True, adding_headers={"strict-transport-security": "sdfal"})
self.assertRaises(LetsEncryptValidationError, Validator().hsts, "test.com")
@responses.activate
def test_hsts_expire(self):
_add(secure=True, adding_headers={"strict-transport-security": "max-age=3600"})
self.assertRaises(LetsEncryptValidationError, Validator().hsts, "test.com")
@responses.activate
def test_hsts(self):
_add(secure=True, adding_headers={"strict-transport-security": "max-age=31536000"})
self.assertTrue(Validator().hsts("test.com"))
@responses.activate
def test_hsts(self):
headers = {"strict-transport-security": "max-age=31536000;includeSubDomains"}
_add(secure=True, adding_headers=headers)
self.assertTrue(Validator().hsts("test.com"))
if __name__ == '__main__':
unittest.main()

View file

@ -0,0 +1,133 @@
"""Validators to determine the current webserver configuration"""
from subprocess import PIPE, Popen
import logging
import re
import requests
import zope.interface
from letsencrypt.client import interfaces
from letsencrypt.client.errors import LetsEncryptValidationError
log = logging.getLogger(__name__)
OCSP_OPENSSL_CMD = "openssl s_client -connect {hostname}:443"
OCSP_OPENSSL_DELIMITER = "OCSP response:"
OCSP_OPENSSL_NO_RESPONSE = "no response sent"
PROTOCOLS_OPENSSL_DELIMITER = "Protocols advertised by server:"
SPDY_PROTOCOL_RE = re.compile(r"^spdy/\d(\.\d)?$")
def _openssl(hostname, args, input=None):
"""
Call openssl binary in client mode.
:raises LetsEncryptValidationError if openssl exits with error-code
:param hostname: server to connect to (on port 443)
:param args: arguments (list) to append to default ones
:param input: stdin to binary
:return: (stdout, stderr)
"""
openssl_cmd = OCSP_OPENSSL_CMD.format(hostname=hostname)
openssl_cmd = openssl_cmd.split(" ") + list(args)
log.debug("Calling openssl binary with arguments: %s", openssl_cmd[1:])
openssl = Popen(openssl_cmd, stdin=PIPE, stdout=PIPE, stderr=PIPE)
stdout, stderr = openssl.communicate(input=input)
log.debug("OpenSSL stdout: %s", stdout)
log.debug("OpenSSL stderr: %s", stderr)
if openssl.returncode != 0:
error_msg = "OpenSSL quit with error-code: {openssl.returncode}"
raise LetsEncryptValidationError(error_msg.format(openssl=openssl))
return stdout, stderr
def _filter_startswith(strings, start):
"""Yields all strings which start with given string."""
for string in strings:
if string.startswith(start):
yield string
class Validator(object):
"""Collection of functions to test a live webserver's configuration"""
zope.interface.implements(interfaces.IValidator)
def redirect(self, hostname):
"""Test whether webserver redirects to secure connection."""
response = requests.get("http://" + hostname, allow_redirects=False)
if response.status_code not in (301, 303):
return False
redirect_location = response.headers.get("location", "")
if not redirect_location.startswith("https://"):
return False
if response.status_code != 301:
error_msg = "Server did not redirect with permanent code."
raise LetsEncryptValidationError(error_msg)
return True
def https(self, hostname):
"""Test whether webserver supports HTTPS"""
requests.get("https://" + hostname, verify=True)
return True
def hsts(self, hostname):
"""Test for HTTP Strict Transport Security header"""
headers = requests.get("https://" + hostname).headers
hsts_header = headers.get("strict-transport-security")
if not hsts_header:
return False
# Split directives following RFC6797, section 6.1
directives = [d.split("=") for d in hsts_header.split(";")]
max_age = [d for d in directives if d[0] == "max-age"]
if not max_age:
error_msg = "Server responded with invalid HSTS header field."
raise LetsEncryptValidationError(error_msg)
try:
max_age_name, max_age_value = max_age[0]
max_age_value = int(max_age_value)
except ValueError:
error_msg = "Server responded with invalid HSTS header field."
raise LetsEncryptValidationError(error_msg)
# Test whether HSTS does not expire for at least two weeks.
if max_age_value <= (2 * 7 * 24 * 3600):
error_msg = "HSTS should not expire in less than two weeks."
raise LetsEncryptValidationError(error_msg)
return True
def ocsp_stapling(self, hostname):
"""Test for OCSP stapling. See RFC 6066, section 8."""
stdout, stderr = _openssl(hostname, ["-tls1", "-tlsextdebug", "-status"], input="QUIT\n")
ocsp_status = next(_filter_startswith(stdout.split("\n"), OCSP_OPENSSL_DELIMITER))
return OCSP_OPENSSL_NO_RESPONSE not in ocsp_status
def _get_nextgen_protocols(self, hostname):
"""Return a set with all 'nextgen' protocols supported by server (reported by openssl)."""
stdout, stderr = _openssl(hostname, ["-nextprotoneg", "''"], input="QUIT\n")
delimiter_line = list(_filter_startswith(stdout.split("\n"), PROTOCOLS_OPENSSL_DELIMITER))
if not delimiter_line:
return set()
protocols = delimiter_line[0].split(PROTOCOLS_OPENSSL_DELIMITER)[1]
return set(p.strip() for p in protocols.split(","))
def spdy(self, hostname):
"""Test for SPDY support"""
# SPDY is supported if we recognise at least one protocol
next_gen_protocols = self._get_nextgen_protocols(hostname)
spdy_protocols = filter(SPDY_PROTOCOL_RE.match, next_gen_protocols)
return bool(list(spdy_protocols))

View file

@ -1,3 +1,4 @@
responses==0.3.0
M2Crypto==0.22.3
python2-pythondialog
jsonschema==2.4.0