mirror of
https://github.com/certbot/certbot.git
synced 2026-06-03 13:59:02 -04:00
Added validator code
This commit is contained in:
commit
303f7ffe32
4 changed files with 192 additions and 6 deletions
|
|
@ -68,3 +68,6 @@ class MisconfigurationError(PluginError):
|
|||
|
||||
class RevokerError(Error):
|
||||
"""Let's Encrypt Revoker error."""
|
||||
|
||||
class ValidationError(Error):
|
||||
"""Let's Encrypt Validation error."""
|
||||
|
|
|
|||
|
|
@ -384,18 +384,18 @@ class IDisplay(zope.interface.Interface):
|
|||
class IValidator(zope.interface.Interface):
|
||||
"""Configuration validator."""
|
||||
|
||||
def redirect(name):
|
||||
def redirect(hostname, port=80, headers=None):
|
||||
"""Verify redirect to HTTPS."""
|
||||
|
||||
def ocsp_stapling(name):
|
||||
"""Verify ocsp stapling for domain."""
|
||||
|
||||
def https(names):
|
||||
def https(hostname, port=443, headers=None):
|
||||
"""Verify HTTPS is enabled for domain."""
|
||||
|
||||
def hsts(name):
|
||||
def hsts(hostname):
|
||||
"""Verify HSTS header is enabled."""
|
||||
|
||||
def ocsp_stapling(hostname):
|
||||
"""Verify ocsp stapling for domain."""
|
||||
|
||||
|
||||
class IReporter(zope.interface.Interface):
|
||||
"""Interface to collect and display information to the user."""
|
||||
|
|
|
|||
106
letsencrypt/tests/validator_test.py
Normal file
106
letsencrypt/tests/validator_test.py
Normal file
|
|
@ -0,0 +1,106 @@
|
|||
"""Tests for letsencrypt.validator."""
|
||||
import requests
|
||||
import unittest
|
||||
|
||||
import mock
|
||||
|
||||
from letsencrypt import errors
|
||||
from letsencrypt import validator
|
||||
|
||||
|
||||
class ValidatorTest(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.validator = validator.Validator()
|
||||
|
||||
@mock.patch("letsencrypt.validator.requests.get")
|
||||
def test_succesful_redirect(self, mock_get_request):
|
||||
mock_get_request.return_value = create_response(
|
||||
301, {"location" : "https://test.com"})
|
||||
self.assertTrue(self.validator.redirect("test.com"))
|
||||
|
||||
@mock.patch("letsencrypt.validator.requests.get")
|
||||
def test_redirect_missing_location(self, mock_get_request):
|
||||
mock_get_request.return_value = create_response(301)
|
||||
self.assertFalse(self.validator.redirect("test.com"))
|
||||
|
||||
@mock.patch("letsencrypt.validator.requests.get")
|
||||
def test_redirect_wrong_status_code(self, mock_get_request):
|
||||
mock_get_request.return_value = create_response(
|
||||
201, {"location" : "https://test.com"})
|
||||
self.assertFalse(self.validator.redirect("test.com"))
|
||||
|
||||
@mock.patch("letsencrypt.validator.requests.get")
|
||||
def test_redirect_wrong_redirect_code(self, mock_get_request):
|
||||
mock_get_request.return_value = create_response(
|
||||
303, {"location" : "https://test.com"})
|
||||
self.assertRaises(
|
||||
errors.ValidationError, self.validator.redirect, "test.com")
|
||||
|
||||
@mock.patch("letsencrypt.validator.requests.get")
|
||||
def test_https_fail(self, mock_get_request):
|
||||
mock_get_request.side_effect = [requests.exceptions.ConnectionError]
|
||||
self.assertRaises(
|
||||
requests.exceptions.ConnectionError, self.validator.https, "test.com")
|
||||
|
||||
def test_https_success(self):
|
||||
with mock.patch("letsencrypt.validator.requests.get"):
|
||||
self.assertTrue(self.validator.https(
|
||||
"test.com", headers={"Host" : "test.com"}))
|
||||
|
||||
@mock.patch("letsencrypt.validator.requests.get")
|
||||
def test_hsts_empty(self, mock_get_request):
|
||||
mock_get_request.return_value = create_response(
|
||||
headers={"strict-transport-security": ""})
|
||||
self.assertFalse(self.validator.hsts("test.com"))
|
||||
|
||||
@mock.patch("letsencrypt.validator.requests.get")
|
||||
def test_hsts_malformed(self, mock_get_request):
|
||||
mock_get_request.return_value = create_response(
|
||||
headers={"strict-transport-security": "sdfal"})
|
||||
self.assertRaises(
|
||||
errors.ValidationError, self.validator.hsts, "test.com")
|
||||
|
||||
@mock.patch("letsencrypt.validator.requests.get")
|
||||
def test_hsts_bad_max_age(self, mock_get_request):
|
||||
mock_get_request.return_value = create_response(
|
||||
headers={"strict-transport-security": "max-age=not-an-int"})
|
||||
self.assertRaises(
|
||||
errors.ValidationError, self.validator.hsts, "test.com")
|
||||
|
||||
@mock.patch("letsencrypt.validator.requests.get")
|
||||
def test_hsts_expire(self, mock_get_request):
|
||||
mock_get_request.return_value = create_response(
|
||||
headers={"strict-transport-security": "max-age=3600"})
|
||||
self.assertRaises(
|
||||
errors.ValidationError, self.validator.hsts, "test.com")
|
||||
|
||||
@mock.patch("letsencrypt.validator.requests.get")
|
||||
def test_hsts(self, mock_get_request):
|
||||
mock_get_request.return_value = create_response(
|
||||
headers={"strict-transport-security": "max-age=31536000"})
|
||||
self.assertTrue(self.validator.hsts("test.com"))
|
||||
|
||||
@mock.patch("letsencrypt.validator.requests.get")
|
||||
def test_hsts_include_subdomains(self, mock_get_request):
|
||||
mock_get_request.return_value = create_response(
|
||||
headers={"strict-transport-security":
|
||||
"max-age=31536000;includeSubDomains"})
|
||||
self.assertTrue(self.validator.hsts("test.com"))
|
||||
|
||||
def test_ocsp_stapling(self):
|
||||
self.assertRaises(
|
||||
NotImplementedError, self.validator.ocsp_stapling, "test.com")
|
||||
|
||||
def create_response(status_code=200, headers=None):
|
||||
"""Creates a requests.Response object for testing"""
|
||||
response = requests.Response()
|
||||
response.status_code = status_code
|
||||
|
||||
if headers:
|
||||
response.headers = headers
|
||||
|
||||
return response
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
77
letsencrypt/validator.py
Normal file
77
letsencrypt/validator.py
Normal file
|
|
@ -0,0 +1,77 @@
|
|||
"""Validators to determine the current webserver configuration"""
|
||||
import requests
|
||||
import zope.interface
|
||||
|
||||
from letsencrypt import errors
|
||||
from letsencrypt import interfaces
|
||||
|
||||
|
||||
class Validator(object):
|
||||
# pylint: disable=no-self-use
|
||||
"""Collection of functions to test a live webserver's configuration"""
|
||||
zope.interface.implements(interfaces.IValidator)
|
||||
|
||||
def redirect(self, hostname, port=80, headers=None):
|
||||
"""Test whether webserver redirects to secure connection."""
|
||||
response = _get("http", hostname, port, headers)
|
||||
|
||||
if response.status_code not in (301, 303):
|
||||
return False
|
||||
|
||||
redirect_location = response.headers.get("location", "")
|
||||
if not redirect_location.startswith("https://"):
|
||||
return False
|
||||
|
||||
if response.status_code != 301:
|
||||
error_msg = "Server did not redirect with permanent code."
|
||||
raise errors.ValidationError(error_msg)
|
||||
|
||||
return True
|
||||
|
||||
def https(self, hostname, port=443, headers=None):
|
||||
"""Test whether webserver supports HTTPS"""
|
||||
_get("https", hostname, port, headers)
|
||||
return True
|
||||
|
||||
def hsts(self, hostname):
|
||||
"""Test for HTTP Strict Transport Security header"""
|
||||
headers = requests.get("https://" + hostname).headers
|
||||
hsts_header = headers.get("strict-transport-security")
|
||||
|
||||
if not hsts_header:
|
||||
return False
|
||||
|
||||
# Split directives following RFC6797, section 6.1
|
||||
directives = [d.split("=") for d in hsts_header.split(";")]
|
||||
max_age = [d for d in directives if d[0] == "max-age"]
|
||||
|
||||
if not max_age:
|
||||
error_msg = "Server responded with invalid HSTS header field."
|
||||
raise errors.ValidationError(error_msg)
|
||||
|
||||
try:
|
||||
_, max_age_value = max_age[0]
|
||||
max_age_value = int(max_age_value)
|
||||
except ValueError:
|
||||
error_msg = "Server responded with invalid HSTS header field."
|
||||
raise errors.ValidationError(error_msg)
|
||||
|
||||
# Test whether HSTS does not expire for at least two weeks.
|
||||
if max_age_value <= (2 * 7 * 24 * 3600):
|
||||
error_msg = "HSTS should not expire in less than two weeks."
|
||||
raise errors.ValidationError(error_msg)
|
||||
|
||||
return True
|
||||
|
||||
def ocsp_stapling(self, name):
|
||||
"""Verify ocsp stapling for domain."""
|
||||
raise NotImplementedError()
|
||||
|
||||
|
||||
def _get(scheme, hostname, port, headers, **kwargs):
|
||||
"""Makes a GET request for specified resource"""
|
||||
url = "{0}://{1}:{2}".format(scheme, hostname, port)
|
||||
if headers:
|
||||
return requests.get(url, headers=headers, **kwargs)
|
||||
else:
|
||||
return requests.get(url, **kwargs)
|
||||
Loading…
Reference in a new issue