Merge pull request #2179 from joohoi/apache_pep8

PEP8 - fixes to apache plugin
This commit is contained in:
Peter Eckersley 2016-01-15 15:14:30 -08:00
commit 10214e2668
12 changed files with 182 additions and 130 deletions

View file

@ -120,7 +120,8 @@ class AugeasConfigurator(common.Plugin):
self.reverter.add_to_temp_checkpoint(
save_files, self.save_notes)
else:
self.reverter.add_to_checkpoint(save_files, self.save_notes)
self.reverter.add_to_checkpoint(save_files,
self.save_notes)
except errors.ReverterError as err:
raise errors.PluginError(str(err))

View file

@ -133,7 +133,8 @@ class ApacheConfigurator(augeas_configurator.AugeasConfigurator):
@property
def mod_ssl_conf(self):
"""Full absolute path to SSL configuration file."""
return os.path.join(self.config.config_dir, constants.MOD_SSL_CONF_DEST)
return os.path.join(self.config.config_dir,
constants.MOD_SSL_CONF_DEST)
def prepare(self):
"""Prepare the authenticator/installer.
@ -191,15 +192,15 @@ class ApacheConfigurator(augeas_configurator.AugeasConfigurator):
return matches
def deploy_cert(self, domain, cert_path, key_path,
chain_path=None, fullchain_path=None): # pylint: disable=unused-argument
chain_path=None, fullchain_path=None):
"""Deploys certificate to specified virtual host.
Currently tries to find the last directives to deploy the cert in
the VHost associated with the given domain. If it can't find the
directives, it searches the "included" confs. The function verifies that
it has located the three directives and finally modifies them to point
to the correct destination. After the certificate is installed, the
VirtualHost is enabled if it isn't already.
directives, it searches the "included" confs. The function verifies
that it has located the three directives and finally modifies them
to point to the correct destination. After the certificate is
installed, the VirtualHost is enabled if it isn't already.
.. todo:: Might be nice to remove chain directive if none exists
This shouldn't happen within letsencrypt though
@ -215,8 +216,10 @@ class ApacheConfigurator(augeas_configurator.AugeasConfigurator):
# cert_key... can all be parsed appropriately
self.prepare_server_https("443")
path = {"cert_path": self.parser.find_dir("SSLCertificateFile", None, vhost.path),
"cert_key": self.parser.find_dir("SSLCertificateKeyFile", None, vhost.path)}
path = {"cert_path": self.parser.find_dir("SSLCertificateFile",
None, vhost.path),
"cert_key": self.parser.find_dir("SSLCertificateKeyFile",
None, vhost.path)}
# Only include if a certificate chain is specified
if chain_path is not None:
@ -246,7 +249,8 @@ class ApacheConfigurator(augeas_configurator.AugeasConfigurator):
self.parser.add_dir(vhost.path,
"SSLCertificateChainFile", chain_path)
else:
raise errors.PluginError("--chain-path is required for your version of Apache")
raise errors.PluginError("--chain-path is required for your "
"version of Apache")
else:
if not fullchain_path:
raise errors.PluginError("Please provide the --fullchain-path\
@ -320,7 +324,8 @@ class ApacheConfigurator(augeas_configurator.AugeasConfigurator):
elif not vhost.ssl:
addrs = self._get_proposed_addrs(vhost, "443")
# TODO: Conflicts is too conservative
if not any(vhost.enabled and vhost.conflicts(addrs) for vhost in self.vhosts):
if not any(vhost.enabled and vhost.conflicts(addrs) for
vhost in self.vhosts):
vhost = self.make_vhost_ssl(vhost)
else:
logger.error(
@ -588,15 +593,16 @@ class ApacheConfigurator(augeas_configurator.AugeasConfigurator):
self.prepare_https_modules(temp)
# Check for Listen <port>
# Note: This could be made to also look for ip:443 combo
listens = [self.parser.get_arg(x).split()[0] for x in self.parser.find_dir("Listen")]
listens = [self.parser.get_arg(x).split()[0] for
x in self.parser.find_dir("Listen")]
# In case no Listens are set (which really is a broken apache config)
if not listens:
listens = ["80"]
if port in listens:
return
for listen in listens:
# For any listen statement, check if the machine also listens on Port 443.
# If not, add such a listen statement.
# For any listen statement, check if the machine also listens on
# Port 443. If not, add such a listen statement.
if len(listen.split(":")) == 1:
# Its listening to all interfaces
if port not in listens:
@ -624,8 +630,9 @@ class ApacheConfigurator(augeas_configurator.AugeasConfigurator):
self.parser.add_dir_to_ifmodssl(
parser.get_aug_path(
self.parser.loc["listen"]), "Listen", args)
self.save_notes += "Added Listen %s:%s directive to %s\n" % (
ip, port, self.parser.loc["listen"])
self.save_notes += ("Added Listen %s:%s directive to "
"%s\n") % (ip, port,
self.parser.loc["listen"])
listens.append("%s:%s" % (ip, port))
def prepare_https_modules(self, temp):
@ -824,20 +831,25 @@ class ApacheConfigurator(augeas_configurator.AugeasConfigurator):
def _clean_vhost(self, vhost):
# remove duplicated or conflicting ssl directives
self._deduplicate_directives(vhost.path,
["SSLCertificateFile", "SSLCertificateKeyFile"])
["SSLCertificateFile",
"SSLCertificateKeyFile"])
# remove all problematic directives
self._remove_directives(vhost.path, ["SSLCertificateChainFile"])
def _deduplicate_directives(self, vh_path, directives):
for directive in directives:
while len(self.parser.find_dir(directive, None, vh_path, False)) > 1:
directive_path = self.parser.find_dir(directive, None, vh_path, False)
while len(self.parser.find_dir(directive, None,
vh_path, False)) > 1:
directive_path = self.parser.find_dir(directive, None,
vh_path, False)
self.aug.remove(re.sub(r"/\w*$", "", directive_path[0]))
def _remove_directives(self, vh_path, directives):
for directive in directives:
while len(self.parser.find_dir(directive, None, vh_path, False)) > 0:
directive_path = self.parser.find_dir(directive, None, vh_path, False)
while len(self.parser.find_dir(directive, None,
vh_path, False)) > 0:
directive_path = self.parser.find_dir(directive, None,
vh_path, False)
self.aug.remove(re.sub(r"/\w*$", "", directive_path[0]))
def _add_dummy_ssl_directives(self, vh_path):
@ -864,7 +876,8 @@ class ApacheConfigurator(augeas_configurator.AugeasConfigurator):
for addr in vhost.addrs:
for test_vh in self.vhosts:
if (vhost.filep != test_vh.filep and
any(test_addr == addr for test_addr in test_vh.addrs) and
any(test_addr == addr for
test_addr in test_vh.addrs) and
not self.is_name_vhost(addr)):
self.add_name_vhost(addr)
logger.info("Enabling NameVirtualHosts on %s", addr)
@ -873,9 +886,9 @@ class ApacheConfigurator(augeas_configurator.AugeasConfigurator):
if need_to_save:
self.save()
############################################################################
######################################################################
# Enhancements
############################################################################
######################################################################
def supported_enhancements(self): # pylint: disable=no-self-use
"""Returns currently supported enhancements."""
return ["redirect", "ensure-http-header"]
@ -936,14 +949,14 @@ class ApacheConfigurator(augeas_configurator.AugeasConfigurator):
# Add directives to server
self.parser.add_dir(ssl_vhost.path, "Header",
constants.HEADER_ARGS[header_substring])
constants.HEADER_ARGS[header_substring])
self.save_notes += ("Adding %s header to ssl vhost in %s\n" %
(header_substring, ssl_vhost.filep))
(header_substring, ssl_vhost.filep))
self.save()
logger.info("Adding %s header to ssl vhost in %s", header_substring,
ssl_vhost.filep)
ssl_vhost.filep)
def _verify_no_matching_http_header(self, ssl_vhost, header_substring):
"""Checks to see if an there is an existing Header directive that
@ -963,14 +976,15 @@ class ApacheConfigurator(augeas_configurator.AugeasConfigurator):
header_substring exists
"""
header_path = self.parser.find_dir("Header", None, start=ssl_vhost.path)
header_path = self.parser.find_dir("Header", None,
start=ssl_vhost.path)
if header_path:
# "Existing Header directive for virtualhost"
pat = '(?:[ "]|^)(%s)(?:[ "]|$)' % (header_substring.lower())
for match in header_path:
if re.search(pat, self.aug.get(match).lower()):
raise errors.PluginEnhancementAlreadyPresent(
"Existing %s header" % (header_substring))
"Existing %s header" % (header_substring))
def _enable_redirect(self, ssl_vhost, unused_options):
"""Redirect all equivalent HTTP traffic to ssl_vhost.
@ -1019,7 +1033,6 @@ class ApacheConfigurator(augeas_configurator.AugeasConfigurator):
# Check if LetsEncrypt redirection already exists
self._verify_no_letsencrypt_redirect(general_vh)
# Note: if code flow gets here it means we didn't find the exact
# letsencrypt RewriteRule config for redirection. Finding
# another RewriteRule is likely to be fine in most or all cases,
@ -1038,10 +1051,10 @@ class ApacheConfigurator(augeas_configurator.AugeasConfigurator):
if self.get_version() >= (2, 3, 9):
self.parser.add_dir(general_vh.path, "RewriteRule",
constants.REWRITE_HTTPS_ARGS_WITH_END)
constants.REWRITE_HTTPS_ARGS_WITH_END)
else:
self.parser.add_dir(general_vh.path, "RewriteRule",
constants.REWRITE_HTTPS_ARGS)
constants.REWRITE_HTTPS_ARGS)
self.save_notes += ("Redirecting host in %s to ssl vhost in %s\n" %
(general_vh.filep, ssl_vhost.filep))
@ -1063,7 +1076,7 @@ class ApacheConfigurator(augeas_configurator.AugeasConfigurator):
letsencrypt redirection WriteRule exists in virtual host.
"""
rewrite_path = self.parser.find_dir(
"RewriteRule", None, start=vhost.path)
"RewriteRule", None, start=vhost.path)
# There can be other RewriteRule directive lines in vhost config.
# rewrite_args_dict keys are directive ids and the corresponding value
@ -1078,12 +1091,12 @@ class ApacheConfigurator(augeas_configurator.AugeasConfigurator):
if rewrite_args_dict:
redirect_args = [constants.REWRITE_HTTPS_ARGS,
constants.REWRITE_HTTPS_ARGS_WITH_END]
constants.REWRITE_HTTPS_ARGS_WITH_END]
for matches in rewrite_args_dict.values():
if [self.aug.get(x) for x in matches] in redirect_args:
raise errors.PluginEnhancementAlreadyPresent(
"Let's Encrypt has already enabled redirection")
"Let's Encrypt has already enabled redirection")
def _is_rewrite_exists(self, vhost):
"""Checks if there exists a RewriteRule directive in vhost
@ -1096,7 +1109,7 @@ class ApacheConfigurator(augeas_configurator.AugeasConfigurator):
"""
rewrite_path = self.parser.find_dir(
"RewriteRule", None, start=vhost.path)
"RewriteRule", None, start=vhost.path)
return bool(rewrite_path)
def _is_rewrite_engine_on(self, vhost):
@ -1107,7 +1120,7 @@ class ApacheConfigurator(augeas_configurator.AugeasConfigurator):
"""
rewrite_engine_path = self.parser.find_dir("RewriteEngine", "on",
start=vhost.path)
start=vhost.path)
if rewrite_engine_path:
return self.parser.get_arg(rewrite_engine_path[0])
return False
@ -1153,7 +1166,6 @@ class ApacheConfigurator(augeas_configurator.AugeasConfigurator):
else:
rewrite_rule_args = constants.REWRITE_HTTPS_ARGS
return ("<VirtualHost %s>\n"
"%s \n"
"%s \n"
@ -1165,7 +1177,8 @@ class ApacheConfigurator(augeas_configurator.AugeasConfigurator):
"ErrorLog /var/log/apache2/redirect.error.log\n"
"LogLevel warn\n"
"</VirtualHost>\n"
% (" ".join(str(addr) for addr in self._get_proposed_addrs(ssl_vhost)),
% (" ".join(str(addr) for
addr in self._get_proposed_addrs(ssl_vhost)),
servername, serveralias,
" ".join(rewrite_rule_args)))
@ -1179,7 +1192,8 @@ class ApacheConfigurator(augeas_configurator.AugeasConfigurator):
if len(ssl_vhost.name) < (255 - (len(redirect_filename) + 1)):
redirect_filename = "le-redirect-%s.conf" % ssl_vhost.name
redirect_filepath = os.path.join(self.conf("vhost-root"), redirect_filename)
redirect_filepath = os.path.join(self.conf("vhost-root"),
redirect_filename)
# Register the new file that will be created
# Note: always register the creation before writing to ensure file will
@ -1207,7 +1221,7 @@ class ApacheConfigurator(augeas_configurator.AugeasConfigurator):
return None
def _get_proposed_addrs(self, vhost, port="80"): # pylint: disable=no-self-use
def _get_proposed_addrs(self, vhost, port="80"):
"""Return all addrs of vhost with the port replaced with the specified.
:param obj.VirtualHost ssl_vhost: Original Vhost
@ -1287,7 +1301,8 @@ class ApacheConfigurator(augeas_configurator.AugeasConfigurator):
.. note:: Does not make sure that the site correctly works or that all
modules are enabled appropriately.
.. todo:: This function should number subdomains before the domain vhost
.. todo:: This function should number subdomains before the domain
vhost
.. todo:: Make sure link is not broken...

View file

@ -73,7 +73,8 @@ AUGEAS_LENS_DIR = pkg_resources.resource_filename(
REWRITE_HTTPS_ARGS = [
"^", "https://%{SERVER_NAME}%{REQUEST_URI}", "[L,QSA,R=permanent]"]
"""Apache version<2.3.9 rewrite rule arguments used for redirections to https vhost"""
"""Apache version<2.3.9 rewrite rule arguments used for redirections to
https vhost"""
REWRITE_HTTPS_ARGS_WITH_END = [
"^", "https://%{SERVER_NAME}%{REQUEST_URI}", "[END,QSA,R=permanent]"]
@ -81,14 +82,14 @@ REWRITE_HTTPS_ARGS_WITH_END = [
https vhost"""
HSTS_ARGS = ["always", "set", "Strict-Transport-Security",
"\"max-age=31536000\""]
"\"max-age=31536000\""]
"""Apache header arguments for HSTS"""
UIR_ARGS = ["always", "set", "Content-Security-Policy",
"upgrade-insecure-requests"]
"upgrade-insecure-requests"]
HEADER_ARGS = {"Strict-Transport-Security": HSTS_ARGS,
"Upgrade-Insecure-Requests": UIR_ARGS}
"Upgrade-Insecure-Requests": UIR_ARGS}
def os_constant(key):

View file

@ -79,8 +79,9 @@ def _vhost_menu(domain, vhosts):
)
code, tag = zope.component.getUtility(interfaces.IDisplay).menu(
"We were unable to find a vhost with a ServerName or Address of {0}.{1}"
"Which virtual host would you like to choose?".format(
"We were unable to find a vhost with a ServerName "
"or Address of {0}.{1}Which virtual host would you "
"like to choose?".format(
domain, os.linesep),
choices, help_label="More Info", ok_label="Select")

View file

@ -96,11 +96,12 @@ class ApacheParser(object):
def update_runtime_variables(self):
""""
.. note:: Compile time variables (apache2ctl -V) are not used within the
dynamic configuration files. These should not be parsed or
.. note:: Compile time variables (apache2ctl -V) are not used within
the dynamic configuration files. These should not be parsed or
interpreted.
.. todo:: Create separate compile time variables... simply for arg_get()
.. todo:: Create separate compile time variables...
simply for arg_get()
"""
stdout = self._get_runtime_cfg()
@ -177,7 +178,8 @@ class ApacheParser(object):
# Make sure we don't cause an IndexError (end of list)
# Check to make sure arg + 1 doesn't exist
if (i == (len(matches) - 1) or
not matches[i + 1].endswith("/arg[%d]" % (args + 1))):
not matches[i + 1].endswith("/arg[%d]" %
(args + 1))):
filtered.append(matches[i][:-len("/arg[%d]" % args)])
return filtered
@ -311,8 +313,6 @@ class ApacheParser(object):
for match in matches:
dir_ = self.aug.get(match).lower()
if dir_ == "include" or dir_ == "includeoptional":
# start[6:] to strip off /files
#print self._get_include_path(self.get_arg(match +"/arg")), directive, arg
ordered_matches.extend(self.find_dir(
directive, arg,
self._get_include_path(self.get_arg(match + "/arg")),
@ -331,8 +331,8 @@ class ApacheParser(object):
"""
value = self.aug.get(match)
# No need to strip quotes for variables, as apache2ctl already does this
# but we do need to strip quotes for all normal arguments.
# No need to strip quotes for variables, as apache2ctl already does
# this, but we do need to strip quotes for all normal arguments.
# Note: normal argument may be a quoted variable
# e.g. strip now, not later
@ -454,7 +454,7 @@ class ApacheParser(object):
https://apr.apache.org/docs/apr/2.0/apr__fnmatch_8h_source.html
http://apache2.sourcearchive.com/documentation/2.2.16-6/apr__fnmatch_8h_source.html
:param str clean_fn_match: Apache style filename match, similar to globs
:param str clean_fn_match: Apache style filename match, like globs
:returns: regex suitable for augeas
:rtype: str

View file

@ -96,7 +96,8 @@ class ComplexParserTest(util.ParserTest):
else:
self.assertFalse(self.parser.find_dir("FNMATCH_DIRECTIVE"))
# NOTE: Only run one test per function otherwise you will have inf recursion
# NOTE: Only run one test per function otherwise you will have
# inf recursion
def test_include(self):
self.verify_fnmatch("test_fnmatch.?onf")
@ -104,7 +105,8 @@ class ComplexParserTest(util.ParserTest):
self.verify_fnmatch("../complex_parsing/[te][te]st_*.?onf")
def test_include_fullpath(self):
self.verify_fnmatch(os.path.join(self.config_path, "test_fnmatch.conf"))
self.verify_fnmatch(os.path.join(self.config_path,
"test_fnmatch.conf"))
def test_include_fullpath_trailing_slash(self):
self.verify_fnmatch(self.config_path + "//")

View file

@ -35,10 +35,10 @@ class TwoVhost80Test(util.ApacheTest):
def mock_deploy_cert(self, config):
"""A test for a mock deploy cert"""
self.config.real_deploy_cert = self.config.deploy_cert
def mocked_deploy_cert(*args, **kwargs):
"""a helper to mock a deployed cert"""
with mock.patch(
"letsencrypt_apache.configurator.ApacheConfigurator.enable_mod"):
with mock.patch("letsencrypt_apache.configurator.ApacheConfigurator.enable_mod"):
config.real_deploy_cert(*args, **kwargs)
self.config.deploy_cert = mocked_deploy_cert
return self.config
@ -70,7 +70,8 @@ class TwoVhost80Test(util.ApacheTest):
def test_prepare_old_aug(self, mock_exe_exists, _):
mock_exe_exists.return_value = True
self.config.config_test = mock.Mock()
self.config._check_aug_version = mock.Mock(return_value=False) # pylint: disable=protected-access
# pylint: disable=protected-access
self.config._check_aug_version = mock.Mock(return_value=False)
self.assertRaises(
errors.NotSupportedError, self.config.prepare)
@ -110,8 +111,8 @@ class TwoVhost80Test(util.ApacheTest):
def test_add_servernames_alias(self):
self.config.parser.add_dir(
self.vh_truth[2].path, "ServerAlias", ["*.le.co"])
self.config._add_servernames(self.vh_truth[2]) # pylint: disable=protected-access
# pylint: disable=protected-access
self.config._add_servernames(self.vh_truth[2])
self.assertEqual(
self.vh_truth[2].get_names(), set(["*.le.co", "ip-172-30-0-17"]))
@ -177,7 +178,8 @@ class TwoVhost80Test(util.ApacheTest):
def test_choose_vhost_select_vhost_conflicting_non_ssl(self, mock_select):
mock_select.return_value = self.vh_truth[3]
conflicting_vhost = obj.VirtualHost(
"path", "aug_path", set([obj.Addr.fromstring("*:443")]), True, True)
"path", "aug_path", set([obj.Addr.fromstring("*:443")]),
True, True)
self.config.vhosts.append(conflicting_vhost)
self.assertRaises(
@ -196,7 +198,8 @@ class TwoVhost80Test(util.ApacheTest):
def test_find_best_vhost_variety(self):
# pylint: disable=protected-access
ssl_vh = obj.VirtualHost(
"fp", "ap", set([obj.Addr(("*", "443")), obj.Addr(("zombo.com",))]),
"fp", "ap", set([obj.Addr(("*", "443")),
obj.Addr(("zombo.com",))]),
True, False)
self.config.vhosts.append(ssl_vh)
self.assertEqual(self.config._find_best_vhost("zombo.com"), ssl_vh)
@ -277,7 +280,8 @@ class TwoVhost80Test(util.ApacheTest):
def test_deploy_cert_newssl(self):
self.config = util.get_apache_configurator(
self.config_path, self.vhost_path, self.config_dir, self.work_dir, version=(2, 4, 16))
self.config_path, self.vhost_path, self.config_dir,
self.work_dir, version=(2, 4, 16))
self.config.parser.modules.add("ssl_module")
self.config.parser.modules.add("mod_ssl.c")
@ -295,7 +299,8 @@ class TwoVhost80Test(util.ApacheTest):
self.assertTrue("ssl_module" in self.config.parser.modules)
loc_cert = self.config.parser.find_dir(
"sslcertificatefile", "example/fullchain.pem", self.vh_truth[1].path)
"sslcertificatefile", "example/fullchain.pem",
self.vh_truth[1].path)
loc_key = self.config.parser.find_dir(
"sslcertificateKeyfile", "example/key.pem", self.vh_truth[1].path)
@ -310,7 +315,8 @@ class TwoVhost80Test(util.ApacheTest):
def test_deploy_cert_newssl_no_fullchain(self):
self.config = util.get_apache_configurator(
self.config_path, self.vhost_path, self.config_dir, self.work_dir, version=(2, 4, 16))
self.config_path, self.vhost_path, self.config_dir,
self.work_dir, version=(2, 4, 16))
self.config = self.mock_deploy_cert(self.config)
self.config.parser.modules.add("ssl_module")
@ -320,11 +326,13 @@ class TwoVhost80Test(util.ApacheTest):
self.config.assoc["random.demo"] = self.vh_truth[1]
self.assertRaises(errors.PluginError,
lambda: self.config.deploy_cert(
"random.demo", "example/cert.pem", "example/key.pem"))
"random.demo", "example/cert.pem",
"example/key.pem"))
def test_deploy_cert_old_apache_no_chain(self):
self.config = util.get_apache_configurator(
self.config_path, self.vhost_path, self.config_dir, self.work_dir, version=(2, 4, 7))
self.config_path, self.vhost_path, self.config_dir,
self.work_dir, version=(2, 4, 7))
self.config = self.mock_deploy_cert(self.config)
self.config.parser.modules.add("ssl_module")
@ -334,7 +342,8 @@ class TwoVhost80Test(util.ApacheTest):
self.config.assoc["random.demo"] = self.vh_truth[1]
self.assertRaises(errors.PluginError,
lambda: self.config.deploy_cert(
"random.demo", "example/cert.pem", "example/key.pem"))
"random.demo", "example/cert.pem",
"example/key.pem"))
def test_deploy_cert(self):
self.config.parser.modules.add("ssl_module")
@ -442,7 +451,8 @@ class TwoVhost80Test(util.ApacheTest):
# Test Listen statements with specific ip listeed
self.config.prepare_server_https("443")
# Should only be 2 here, as the third interface already listens to the correct port
# Should only be 2 here, as the third interface
# already listens to the correct port
self.assertEqual(mock_add_dir.call_count, 2)
# Check argument to new Listen statements
@ -456,9 +466,12 @@ class TwoVhost80Test(util.ApacheTest):
# Test
self.config.prepare_server_https("8080", temp=True)
self.assertEqual(mock_add_dir.call_count, 3)
self.assertEqual(mock_add_dir.call_args_list[0][0][2], ["1.2.3.4:8080", "https"])
self.assertEqual(mock_add_dir.call_args_list[1][0][2], ["[::1]:8080", "https"])
self.assertEqual(mock_add_dir.call_args_list[2][0][2], ["1.1.1.1:8080", "https"])
self.assertEqual(mock_add_dir.call_args_list[0][0][2],
["1.2.3.4:8080", "https"])
self.assertEqual(mock_add_dir.call_args_list[1][0][2],
["[::1]:8080", "https"])
self.assertEqual(mock_add_dir.call_args_list[2][0][2],
["1.1.1.1:8080", "https"])
def test_prepare_server_https_mixed_listen(self):
@ -476,7 +489,8 @@ class TwoVhost80Test(util.ApacheTest):
# Test Listen statements with specific ip listeed
self.config.prepare_server_https("443")
# Should only be 2 here, as the third interface already listens to the correct port
# Should only be 2 here, as the third interface
# already listens to the correct port
self.assertEqual(mock_add_dir.call_count, 0)
def test_make_vhost_ssl(self):
@ -510,7 +524,8 @@ class TwoVhost80Test(util.ApacheTest):
for directive in ["SSLCertificateFile", "SSLCertificateKeyFile",
"SSLCertificateChainFile", "SSLCACertificatePath"]:
for _ in range(10):
self.config.parser.add_dir(self.vh_truth[1].path, directive, ["bogus"])
self.config.parser.add_dir(self.vh_truth[1].path,
directive, ["bogus"])
self.config.save()
self.config._clean_vhost(self.vh_truth[1])
@ -536,23 +551,24 @@ class TwoVhost80Test(util.ApacheTest):
# pylint: disable=protected-access
DIRECTIVE = "Foo"
for _ in range(10):
self.config.parser.add_dir(self.vh_truth[1].path, DIRECTIVE, ["bar"])
self.config.parser.add_dir(self.vh_truth[1].path,
DIRECTIVE, ["bar"])
self.config.save()
self.config._deduplicate_directives(self.vh_truth[1].path, [DIRECTIVE])
self.config.save()
self.assertEqual(
len(self.config.parser.find_dir(
DIRECTIVE, None, self.vh_truth[1].path, False)),
1)
len(self.config.parser.find_dir(
DIRECTIVE, None, self.vh_truth[1].path, False)), 1)
def test_remove_directives(self):
# pylint: disable=protected-access
DIRECTIVES = ["Foo", "Bar"]
for directive in DIRECTIVES:
for _ in range(10):
self.config.parser.add_dir(self.vh_truth[1].path, directive, ["baz"])
self.config.parser.add_dir(self.vh_truth[1].path,
directive, ["baz"])
self.config.save()
self.config._remove_directives(self.vh_truth[1].path, DIRECTIVES)
@ -560,9 +576,8 @@ class TwoVhost80Test(util.ApacheTest):
for directive in DIRECTIVES:
self.assertEqual(
len(self.config.parser.find_dir(
directive, None, self.vh_truth[1].path, False)),
0)
len(self.config.parser.find_dir(
directive, None, self.vh_truth[1].path, False)), 0)
def test_make_vhost_ssl_extra_vhs(self):
self.config.aug.match = mock.Mock(return_value=["p1", "p2"])
@ -651,7 +666,8 @@ class TwoVhost80Test(util.ApacheTest):
self.assertRaises(errors.PluginError, self.config.get_version)
mock_script.return_value = (
"Server Version: Apache/2.3{0} Apache/2.4.7".format(os.linesep), "")
"Server Version: Apache/2.3{0} Apache/2.4.7".format(
os.linesep), "")
self.assertRaises(errors.PluginError, self.config.get_version)
mock_script.side_effect = errors.SubprocessError("Can't find program")
@ -675,7 +691,8 @@ class TwoVhost80Test(util.ApacheTest):
def test_config_test_bad_process(self, mock_run_script):
mock_run_script.side_effect = errors.SubprocessError
self.assertRaises(errors.MisconfigurationError, self.config.config_test)
self.assertRaises(errors.MisconfigurationError,
self.config.config_test)
def test_get_all_certs_keys(self):
c_k = self.config.get_all_certs_keys()
@ -687,7 +704,8 @@ class TwoVhost80Test(util.ApacheTest):
self.assertTrue("default-ssl" in path)
def test_get_all_certs_keys_malformed_conf(self):
self.config.parser.find_dir = mock.Mock(side_effect=[["path"], [], ["path"], []])
self.config.parser.find_dir = mock.Mock(
side_effect=[["path"], [], ["path"], []])
c_k = self.config.get_all_certs_keys()
self.assertFalse(c_k)
@ -708,13 +726,13 @@ class TwoVhost80Test(util.ApacheTest):
def test_supported_enhancements(self):
self.assertTrue(isinstance(self.config.supported_enhancements(), list))
@mock.patch("letsencrypt.le_util.exe_exists")
def test_enhance_unknown_vhost(self, mock_exe):
self.config.parser.modules.add("rewrite_module")
mock_exe.return_value = True
ssl_vh = obj.VirtualHost(
"fp", "ap", set([obj.Addr(("*", "443")), obj.Addr(("satoshi.com",))]),
"fp", "ap", set([obj.Addr(("*", "443")),
obj.Addr(("satoshi.com",))]),
True, False)
self.config.vhosts.append(ssl_vh)
self.assertRaises(
@ -735,7 +753,7 @@ class TwoVhost80Test(util.ApacheTest):
# This will create an ssl vhost for letsencrypt.demo
self.config.enhance("letsencrypt.demo", "ensure-http-header",
"Strict-Transport-Security")
"Strict-Transport-Security")
self.assertTrue("headers_module" in self.config.parser.modules)
@ -745,7 +763,7 @@ class TwoVhost80Test(util.ApacheTest):
# These are not immediately available in find_dir even with save() and
# load(). They must be found in sites-available
hsts_header = self.config.parser.find_dir(
"Header", None, ssl_vhost.path)
"Header", None, ssl_vhost.path)
# four args to HSTS header
self.assertEqual(len(hsts_header), 4)
@ -757,12 +775,12 @@ class TwoVhost80Test(util.ApacheTest):
# This will create an ssl vhost for letsencrypt.demo
self.config.enhance("encryption-example.demo", "ensure-http-header",
"Strict-Transport-Security")
"Strict-Transport-Security")
self.assertRaises(
errors.PluginEnhancementAlreadyPresent,
self.config.enhance, "encryption-example.demo", "ensure-http-header",
"Strict-Transport-Security")
self.config.enhance, "encryption-example.demo",
"ensure-http-header", "Strict-Transport-Security")
@mock.patch("letsencrypt.le_util.run_script")
@mock.patch("letsencrypt.le_util.exe_exists")
@ -773,7 +791,7 @@ class TwoVhost80Test(util.ApacheTest):
# This will create an ssl vhost for letsencrypt.demo
self.config.enhance("letsencrypt.demo", "ensure-http-header",
"Upgrade-Insecure-Requests")
"Upgrade-Insecure-Requests")
self.assertTrue("headers_module" in self.config.parser.modules)
@ -783,7 +801,7 @@ class TwoVhost80Test(util.ApacheTest):
# These are not immediately available in find_dir even with save() and
# load(). They must be found in sites-available
uir_header = self.config.parser.find_dir(
"Header", None, ssl_vhost.path)
"Header", None, ssl_vhost.path)
# four args to HSTS header
self.assertEqual(len(uir_header), 4)
@ -795,14 +813,12 @@ class TwoVhost80Test(util.ApacheTest):
# This will create an ssl vhost for letsencrypt.demo
self.config.enhance("encryption-example.demo", "ensure-http-header",
"Upgrade-Insecure-Requests")
"Upgrade-Insecure-Requests")
self.assertRaises(
errors.PluginEnhancementAlreadyPresent,
self.config.enhance, "encryption-example.demo", "ensure-http-header",
"Upgrade-Insecure-Requests")
self.config.enhance, "encryption-example.demo",
"ensure-http-header", "Upgrade-Insecure-Requests")
@mock.patch("letsencrypt.le_util.run_script")
@mock.patch("letsencrypt.le_util.exe_exists")
@ -836,7 +852,8 @@ class TwoVhost80Test(util.ApacheTest):
self.config.get_version = mock.Mock(return_value=(2, 3, 9))
self.config.parser.add_dir(
self.vh_truth[3].path, "RewriteRule", ["Unknown"])
self.assertTrue(self.config._is_rewrite_exists(self.vh_truth[3])) # pylint: disable=protected-access
# pylint: disable=protected-access
self.assertTrue(self.config._is_rewrite_exists(self.vh_truth[3]))
def test_rewrite_engine_exists(self):
# Skip the enable mod
@ -844,8 +861,8 @@ class TwoVhost80Test(util.ApacheTest):
self.config.get_version = mock.Mock(return_value=(2, 3, 9))
self.config.parser.add_dir(
self.vh_truth[3].path, "RewriteEngine", "on")
self.assertTrue(self.config._is_rewrite_engine_on(self.vh_truth[3])) # pylint: disable=protected-access
# pylint: disable=protected-access
self.assertTrue(self.config._is_rewrite_engine_on(self.vh_truth[3]))
@mock.patch("letsencrypt.le_util.run_script")
@mock.patch("letsencrypt.le_util.exe_exists")
@ -857,7 +874,7 @@ class TwoVhost80Test(util.ApacheTest):
# Create a preexisting rewrite rule
self.config.parser.add_dir(
self.vh_truth[3].path, "RewriteRule", ["UnknownPattern",
"UnknownTarget"])
"UnknownTarget"])
self.config.save()
# This will create an ssl vhost for letsencrypt.demo
@ -879,11 +896,11 @@ class TwoVhost80Test(util.ApacheTest):
self.assertTrue("rewrite_module" in self.config.parser.modules)
def test_redirect_with_conflict(self):
self.config.parser.modules.add("rewrite_module")
ssl_vh = obj.VirtualHost(
"fp", "ap", set([obj.Addr(("*", "443")), obj.Addr(("zombo.com",))]),
"fp", "ap", set([obj.Addr(("*", "443")),
obj.Addr(("zombo.com",))]),
True, False)
# No names ^ this guy should conflict.
@ -908,7 +925,8 @@ class TwoVhost80Test(util.ApacheTest):
self.vh_truth[1].name = "default.com"
self.vh_truth[1].aliases = set(["yes.default.com"])
self.config._enable_redirect(self.vh_truth[1], "") # pylint: disable=protected-access
# pylint: disable=protected-access
self.config._enable_redirect(self.vh_truth[1], "")
self.assertEqual(len(self.config.vhosts), 7)
def test_create_own_redirect_for_old_apache_version(self):
@ -918,7 +936,8 @@ class TwoVhost80Test(util.ApacheTest):
self.vh_truth[1].name = "default.com"
self.vh_truth[1].aliases = set(["yes.default.com"])
self.config._enable_redirect(self.vh_truth[1], "") # pylint: disable=protected-access
# pylint: disable=protected-access
self.config._enable_redirect(self.vh_truth[1], "")
self.assertEqual(len(self.config.vhosts), 7)
def test_sift_line(self):
@ -942,10 +961,10 @@ class TwoVhost80Test(util.ApacheTest):
http_vhost.path, "RewriteEngine", "on")
self.config.parser.add_dir(
http_vhost.path, "RewriteRule",
["^",
"https://%{SERVER_NAME}%{REQUEST_URI}",
"[L,QSA,R=permanent]"])
http_vhost.path, "RewriteRule",
["^",
"https://%{SERVER_NAME}%{REQUEST_URI}",
"[L,QSA,R=permanent]"])
self.config.save()
ssl_vhost = self.config.make_vhost_ssl(self.vh_truth[0])
@ -954,8 +973,9 @@ class TwoVhost80Test(util.ApacheTest):
"RewriteEngine", "on", ssl_vhost.path, False))
conf_text = open(ssl_vhost.filep).read()
commented_rewrite_rule = \
"# RewriteRule ^ https://%{SERVER_NAME}%{REQUEST_URI} [L,QSA,R=permanent]"
commented_rewrite_rule = ("# RewriteRule ^ "
"https://%{SERVER_NAME}%{REQUEST_URI} "
"[L,QSA,R=permanent]")
self.assertTrue(commented_rewrite_rule in conf_text)
mock_get_utility().add_message.assert_called_once_with(mock.ANY,
mock.ANY)
@ -990,9 +1010,11 @@ class TwoVhost80Test(util.ApacheTest):
def test_aug_version(self):
mock_match = mock.Mock(return_value=["something"])
self.config.aug.match = mock_match
self.assertEquals(self.config._check_aug_version(), ["something"]) # pylint: disable=protected-access
# pylint: disable=protected-access
self.assertEquals(self.config._check_aug_version(),
["something"])
self.config.aug.match.side_effect = RuntimeError
self.assertFalse(self.config._check_aug_version()) # pylint: disable=protected-access
self.assertFalse(self.config._check_aug_version())
if __name__ == "__main__":

View file

@ -47,7 +47,8 @@ class VirtualHostTest(unittest.TestCase):
self.assertTrue(self.vhost1.conflicts([self.addr2]))
self.assertFalse(self.vhost1.conflicts([self.addr_default]))
self.assertFalse(self.vhost2.conflicts([self.addr1, self.addr_default]))
self.assertFalse(self.vhost2.conflicts([self.addr1,
self.addr_default]))
def test_same_server(self):
from letsencrypt_apache.obj import VirtualHost

View file

@ -118,7 +118,8 @@ class BasicParserTest(util.ParserTest):
# pylint: disable=protected-access
path = os.path.join(self.parser.root, "httpd.conf")
open(path, 'w').close()
self.parser.add_dir(self.parser.loc["default"], "Include", "httpd.conf")
self.parser.add_dir(self.parser.loc["default"], "Include",
"httpd.conf")
self.assertEqual(
path, self.parser._set_user_config_file())

View file

@ -9,6 +9,8 @@ from letsencrypt.plugins import common_test
from letsencrypt_apache import obj
from letsencrypt_apache.tests import util
from six.moves import xrange # pylint: disable=redefined-builtin, import-error
class TlsSniPerformTest(util.ApacheTest):
"""Test the ApacheTlsSni01 challenge."""
@ -58,7 +60,7 @@ class TlsSniPerformTest(util.ApacheTest):
mock_setup_cert.assert_called_once_with(achall)
# Check to make sure challenge config path is included in apache config.
# Check to make sure challenge config path is included in apache config
self.assertEqual(
len(self.sni.configurator.parser.find_dir(
"Include", self.sni.challenge_conf)), 1)
@ -78,8 +80,7 @@ class TlsSniPerformTest(util.ApacheTest):
# pylint: disable=protected-access
self.sni._setup_challenge_cert = mock_setup_cert
with mock.patch(
"letsencrypt_apache.configurator.ApacheConfigurator.enable_mod"):
with mock.patch("letsencrypt_apache.configurator.ApacheConfigurator.enable_mod"):
sni_responses = self.sni.perform()
self.assertEqual(mock_setup_cert.call_count, 2)
@ -126,13 +127,15 @@ class TlsSniPerformTest(util.ApacheTest):
def test_get_addrs_default(self):
self.sni.configurator.choose_vhost = mock.Mock(
return_value=obj.VirtualHost(
"path", "aug_path", set([obj.Addr.fromstring("_default_:443")]),
"path", "aug_path",
set([obj.Addr.fromstring("_default_:443")]),
False, False)
)
# pylint: disable=protected-access
self.assertEqual(
set([obj.Addr.fromstring("*:443")]),
self.sni._get_addrs(self.achalls[0])) # pylint: disable=protected-access
self.sni._get_addrs(self.achalls[0]))
if __name__ == "__main__":

View file

@ -76,7 +76,8 @@ class ParserTest(ApacheTest): # pytlint: disable=too-few-public-methods
def get_apache_configurator(
config_path, vhost_path, config_dir, work_dir, version=(2, 4, 7), conf=None):
config_path, vhost_path,
config_dir, work_dir, version=(2, 4, 7), conf=None):
"""Create an Apache Configurator with the specified options.
:param conf: Function that returns binary paths. self.conf in Configurator
@ -143,10 +144,12 @@ def get_vh_truth(temp_dir, config_name):
os.path.join(prefix, "mod_macro-example.conf"),
os.path.join(aug_pre,
"mod_macro-example.conf/Macro/VirtualHost"),
set([obj.Addr.fromstring("*:80")]), False, True, modmacro=True),
set([obj.Addr.fromstring("*:80")]), False, True,
modmacro=True),
obj.VirtualHost(
os.path.join(prefix, "default-ssl-port-only.conf"),
os.path.join(aug_pre, "default-ssl-port-only.conf/IfModule/VirtualHost"),
os.path.join(aug_pre, ("default-ssl-port-only.conf/"
"IfModule/VirtualHost")),
set([obj.Addr.fromstring("_default_:443")]), True, False),
]
return vh_truth

View file

@ -10,6 +10,7 @@ from letsencrypt_apache import parser
logger = logging.getLogger(__name__)
class ApacheTlsSni01(common.TLSSNI01):
"""Class that performs TLS-SNI-01 challenges within the Apache configurator
@ -125,7 +126,8 @@ class ApacheTlsSni01(common.TLSSNI01):
addrs.add(default_addr)
else:
addrs.add(
addr.get_sni_addr(self.configurator.config.tls_sni_01_port))
addr.get_sni_addr(
self.configurator.config.tls_sni_01_port))
return addrs