Add basic HTTP01 support to Apache

* Add a simple version of HTTP01

* remove cert from chall name

* make directory work on 2.2

* cleanup challenges when finished

* import shutil

* fixup perform and cleanup tests

* Add tests for http_01.py
This commit is contained in:
Brad Warren 2018-01-10 20:14:56 -08:00
parent 9e95208101
commit 2ba334a182
5 changed files with 278 additions and 36 deletions

View file

@ -24,9 +24,10 @@ from certbot_apache import apache_util
from certbot_apache import augeas_configurator
from certbot_apache import constants
from certbot_apache import display_ops
from certbot_apache import tls_sni_01
from certbot_apache import http_01
from certbot_apache import obj
from certbot_apache import parser
from certbot_apache import tls_sni_01
from collections import defaultdict
@ -163,6 +164,9 @@ class ApacheConfigurator(augeas_configurator.AugeasConfigurator):
"ensure-http-header": self._set_http_header,
"staple-ocsp": self._enable_ocsp_stapling}
# This will be set during the perform function
self.http_doer = None
@property
def mod_ssl_conf(self):
"""Full absolute path to SSL configuration file."""
@ -1855,7 +1859,7 @@ class ApacheConfigurator(augeas_configurator.AugeasConfigurator):
###########################################################################
def get_chall_pref(self, unused_domain): # pylint: disable=no-self-use
"""Return list of challenge preferences."""
return [challenges.TLSSNI01]
return [challenges.TLSSNI01, challenges.HTTP01]
def perform(self, achalls):
"""Perform the configuration related challenge.
@ -1867,16 +1871,21 @@ class ApacheConfigurator(augeas_configurator.AugeasConfigurator):
"""
self._chall_out.update(achalls)
responses = [None] * len(achalls)
chall_doer = tls_sni_01.ApacheTlsSni01(self)
self.http_doer = http_01.ApacheHttp01(self)
sni_doer = tls_sni_01.ApacheTlsSni01(self)
for i, achall in enumerate(achalls):
# Currently also have chall_doer hold associated index of the
# challenge. This helps to put all of the responses back together
# when they are all complete.
chall_doer.add_chall(achall, i)
if isinstance(achall.chall, challenges.HTTP01):
self.http_doer.add_chall(achall, i)
else: # tls-sni-01
sni_doer.add_chall(achall, i)
sni_response = chall_doer.perform()
if sni_response:
http_response = self.http_doer.perform()
sni_response = sni_doer.perform()
if http_response or sni_response:
# Must reload in order to activate the challenges.
# Handled here because we may be able to load up other challenge
# types
@ -1886,14 +1895,18 @@ class ApacheConfigurator(augeas_configurator.AugeasConfigurator):
# of identifying when the new configuration is being used.
time.sleep(3)
# Go through all of the challenges and assign them to the proper
# place in the responses return value. All responses must be in the
# same order as the original challenges.
for i, resp in enumerate(sni_response):
responses[chall_doer.indices[i]] = resp
self._update_responses(responses, http_response, self.http_doer)
self._update_responses(responses, sni_response, sni_doer)
return responses
def _update_responses(self, responses, chall_response, chall_doer):
# Go through all of the challenges and assign them to the proper
# place in the responses return value. All responses must be in the
# same order as the original challenges.
for i, resp in enumerate(chall_response):
responses[chall_doer.indices[i]] = resp
def cleanup(self, achalls):
"""Revert all challenges."""
self._chall_out.difference_update(achalls)
@ -1903,6 +1916,7 @@ class ApacheConfigurator(augeas_configurator.AugeasConfigurator):
self.revert_challenge_config()
self.restart()
self.parser.reset_modules()
self.http_doer.cleanup()
def install_ssl_options_conf(self, options_ssl, options_ssl_digest):
"""Copy Certbot's SSL options file into the system's config dir if required."""

View file

@ -0,0 +1,94 @@
"""A class that performs HTTP-01 challenges for Apache"""
import logging
import os
import shutil
import tempfile
from certbot.plugins import common
logger = logging.getLogger(__name__)
class ApacheHttp01(common.TLSSNI01):
"""Class that performs HTPP-01 challenges within the Apache configurator."""
CONFIG_TEMPLATE24 = """\
Alias /.well-known/acme-challenge {0}
<Directory {0} >
Require all granted
</Directory>
"""
CONFIG_TEMPLATE22 = """\
Alias /.well-known/acme-challenge {0}
<Directory {0} >
Order allow,deny
Allow from all
</Directory>
"""
def __init__(self, *args, **kwargs):
super(ApacheHttp01, self).__init__(*args, **kwargs)
self.challenge_conf = os.path.join(
self.configurator.conf("challenge-location"),
"le_http_01_challenge.conf")
self.challenge_dir = None
def perform(self):
"""Perform all HTTP-01 challenges."""
if not self.achalls:
return []
# Save any changes to the configuration as a precaution
# About to make temporary changes to the config
self.configurator.save("Changes before challenge setup", True)
responses = self._set_up_challenges()
self._mod_config()
# Save reversible changes
self.configurator.save("HTTP Challenge", True)
return responses
def cleanup(self):
"""Cleanup the challenge directory."""
shutil.rmtree(self.challenge_dir, ignore_errors=True)
self.challenge_dir = None
def _mod_config(self):
self.configurator.parser.add_include(
self.configurator.parser.loc["default"], self.challenge_conf)
self.configurator.reverter.register_file_creation(
True, self.challenge_conf)
if self.configurator.version < (2, 4):
config_template = self.CONFIG_TEMPLATE22
else:
config_template = self.CONFIG_TEMPLATE24
config_text = config_template.format(self.challenge_dir)
logger.debug("writing a config file with text:\n %s", config_text)
with open(self.challenge_conf, "w") as new_conf:
new_conf.write(config_text)
def _set_up_challenges(self):
self.challenge_dir = tempfile.mkdtemp()
os.chmod(self.challenge_dir, 0o755)
responses = []
for achall in self.achalls:
responses.append(self._set_up_challenge(achall))
return responses
def _set_up_challenge(self, achall):
response, validation = achall.response_and_validation()
name = os.path.join(self.challenge_dir, achall.chall.encode("token"))
with open(name, 'wb') as f:
f.write(validation.encode())
os.chmod(name, 0o644)
return response

View file

@ -676,23 +676,33 @@ class MultipleVhostsTest(util.ApacheTest):
self.config._add_name_vhost_if_necessary(self.vh_truth[0])
self.assertEqual(self.config.add_name_vhost.call_count, 2)
@mock.patch("certbot_apache.configurator.http_01.ApacheHttp01.perform")
@mock.patch("certbot_apache.configurator.tls_sni_01.ApacheTlsSni01.perform")
@mock.patch("certbot_apache.configurator.ApacheConfigurator.restart")
def test_perform(self, mock_restart, mock_perform):
def test_perform(self, mock_restart, mock_tls_perform, mock_http_perform):
# Only tests functionality specific to configurator.perform
# Note: As more challenges are offered this will have to be expanded
account_key, achall1, achall2 = self.get_achalls()
account_key, achalls = self.get_key_and_achalls()
expected = [
achall1.response(account_key),
achall2.response(account_key),
]
all_expected = []
http_expected = []
tls_expected = []
for achall in achalls:
response = achall.response(account_key)
if isinstance(achall.chall, challenges.HTTP01):
http_expected.append(response)
else:
tls_expected.append(response)
all_expected.append(response)
mock_perform.return_value = expected
responses = self.config.perform([achall1, achall2])
mock_http_perform.return_value = http_expected
mock_tls_perform.return_value = tls_expected
self.assertEqual(mock_perform.call_count, 1)
self.assertEqual(responses, expected)
responses = self.config.perform(achalls)
self.assertEqual(mock_http_perform.call_count, 1)
self.assertEqual(mock_tls_perform.call_count, 1)
self.assertEqual(responses, all_expected)
self.assertEqual(mock_restart.call_count, 1)
@ -700,30 +710,38 @@ class MultipleVhostsTest(util.ApacheTest):
@mock.patch("certbot_apache.parser.ApacheParser._get_runtime_cfg")
def test_cleanup(self, mock_cfg, mock_restart):
mock_cfg.return_value = ""
_, achall1, achall2 = self.get_achalls()
_, achalls = self.get_key_and_achalls()
self.config.http_doer = mock.MagicMock()
self.config._chall_out.add(achall1) # pylint: disable=protected-access
self.config._chall_out.add(achall2) # pylint: disable=protected-access
for achall in achalls:
self.config._chall_out.add(achall) # pylint: disable=protected-access
self.config.cleanup([achall1])
self.assertFalse(mock_restart.called)
self.config.cleanup([achall2])
self.assertTrue(mock_restart.called)
for i, achall in enumerate(achalls):
self.config.cleanup([achall])
if i == len(achalls) - 1:
self.assertTrue(mock_restart.called)
self.assertTrue(self.config.http_doer.cleanup.called)
else:
self.assertFalse(mock_restart.called)
self.assertFalse(self.config.http_doer.cleanup.called)
@mock.patch("certbot_apache.configurator.ApacheConfigurator.restart")
@mock.patch("certbot_apache.parser.ApacheParser._get_runtime_cfg")
def test_cleanup_no_errors(self, mock_cfg, mock_restart):
mock_cfg.return_value = ""
_, achall1, achall2 = self.get_achalls()
_, achalls = self.get_key_and_achalls()
self.config.http_doer = mock.MagicMock()
self.config._chall_out.add(achall1) # pylint: disable=protected-access
for achall in achalls:
self.config._chall_out.add(achall) # pylint: disable=protected-access
self.config.cleanup([achall2])
self.config.cleanup([achalls[-1]])
self.assertFalse(mock_restart.called)
self.assertFalse(self.config.http_doer.cleanup.called)
self.config.cleanup([achall1, achall2])
self.config.cleanup(achalls)
self.assertTrue(mock_restart.called)
self.assertTrue(self.config.http_doer.cleanup.called)
@mock.patch("certbot.util.run_script")
def test_get_version(self, mock_script):
@ -1151,7 +1169,7 @@ class MultipleVhostsTest(util.ApacheTest):
not_rewriterule = "NotRewriteRule ^ ..."
self.assertFalse(self.config._sift_rewrite_rule(not_rewriterule))
def get_achalls(self):
def get_key_and_achalls(self):
"""Return testing achallenges."""
account_key = self.rsa512jwk
achall1 = achallenges.KeyAuthorizationAnnotatedChallenge(
@ -1166,8 +1184,12 @@ class MultipleVhostsTest(util.ApacheTest):
token=b"uqnaPzxtrndteOqtrXb0Asl5gOJfWAnnx6QJyvcmlDU"),
"pending"),
domain="certbot.demo", account_key=account_key)
achall3 = achallenges.KeyAuthorizationAnnotatedChallenge(
challb=acme_util.chall_to_challb(
challenges.HTTP01(token=(b'x' * 16)), "pending"),
domain="example.org", account_key=account_key)
return account_key, achall1, achall2
return account_key, (achall1, achall2, achall3)
def test_make_addrs_sni_ready(self):
self.config.version = (2, 2)

View file

@ -0,0 +1,112 @@
"""Test for certbot_apache.http_01."""
import os
import unittest
from acme import challenges
from certbot import achallenges
from certbot.tests import acme_util
from certbot_apache.tests import util
NUM_ACHALLS = 3
class ApacheHttp01TestMeta(type):
"""Generates parmeterized tests for testing perform."""
def __new__(mcs, name, bases, class_dict):
def _gen_test(num_achalls, minor_version):
def _test(self):
achalls = self.achalls[:num_achalls]
self.config.version = (2, minor_version)
self.common_perform_test(achalls)
return _test
for i in range(1, NUM_ACHALLS + 1):
for j in (2, 4):
test_name = "test_perform_{0}_{1}".format(i, j)
class_dict[test_name] = _gen_test(i, j)
return type.__new__(mcs, name, bases, class_dict)
class ApacheHttp01Test(util.ApacheTest):
"""Test for certbot_apache.http_01.ApacheHttp01."""
__metaclass__ = ApacheHttp01TestMeta
def setUp(self, *args, **kwargs):
super(ApacheHttp01Test, self).setUp(*args, **kwargs)
self.maxDiff = None
self.account_key = self.rsa512jwk
self.achalls = []
for i in range(NUM_ACHALLS):
self.achalls.append(
achallenges.KeyAuthorizationAnnotatedChallenge(
challb=acme_util.chall_to_challb(
challenges.HTTP01(token=((chr(ord('a') + i) * 16))),
"pending"),
domain="example{0}.com".format(i),
account_key=self.account_key))
from certbot_apache.http_01 import ApacheHttp01
self.http = ApacheHttp01(self.config)
def test_empty_perform(self):
self.assertFalse(self.http.perform())
def common_perform_test(self, achalls):
"""Tests perform with the given achalls."""
for achall in achalls:
self.http.add_chall(achall)
expected_response = [
achall.response(self.account_key) for achall in achalls]
self.assertEqual(self.http.perform(), expected_response)
self.assertTrue(os.path.isdir(self.http.challenge_dir))
self._has_min_permissions(self.http.challenge_dir, 0o755)
self._test_challenge_conf()
for achall in achalls:
self._test_challenge_file(achall)
challenge_dir = self.http.challenge_dir
self.http.cleanup()
self.assertFalse(os.path.exists(challenge_dir))
def _test_challenge_conf(self):
self.assertEqual(
len(self.config.parser.find_dir(
"Include", self.http.challenge_conf)), 1)
with open(self.http.challenge_conf) as f:
conf_contents = f.read()
alias_fmt = "Alias /.well-known/acme-challenge {0}"
alias = alias_fmt.format(self.http.challenge_dir)
self.assertTrue(alias in conf_contents)
if self.config.version < (2, 4):
self.assertTrue("Allow from all" in conf_contents)
else:
self.assertTrue("Require all granted" in conf_contents)
def _test_challenge_file(self, achall):
name = os.path.join(self.http.challenge_dir, achall.chall.encode("token"))
validation = achall.validation(self.account_key)
self._has_min_permissions(name, 0o644)
with open(name, 'rb') as f:
self.assertEqual(f.read(), validation.encode())
def _has_min_permissions(self, path, min_mode):
"""Tests the given file has at least the permissions in mode."""
st_mode = os.stat(path).st_mode
self.assertEqual(st_mode, st_mode | min_mode)
if __name__ == "__main__":
unittest.main() # pragma: no cover

View file

@ -1,6 +1,6 @@
"""Test for certbot_apache.tls_sni_01."""
import unittest
import shutil
import unittest
import mock