Refactor augeas_configurator.py functionality to configurator.py and parser.py accordingly. (#7181)

This pull request moves the functionality within `AugeasConfigurator` that previously existed as a parent class of `ApacheConfigurator` to `ApacheConfigurator` and `ApacheParser` accordingly.

Most of the methods were moved as-is, and one (`recovery_routine()`) was completely removed. Few of the methods had to be split between the configurator and parser, good example of this is `save()`.

The Augeas object now lives completely within the `ApacheParser`.

* Remove augeasconfigurator

* Fix references

* Adjust tests accordingly

* Simplify test

* Address review comments

* Address review comments

* Move test_recovery_routine_reload
This commit is contained in:
Joona Hoikkala 2019-06-28 18:39:13 +03:00 committed by Brad Warren
parent 4fc0ef0fbe
commit c08a4dec2d
11 changed files with 322 additions and 361 deletions

View file

@ -1,207 +0,0 @@
"""Class of Augeas Configurators."""
import logging
from certbot import errors
from certbot.plugins import common
from certbot_apache import constants
logger = logging.getLogger(__name__)
class AugeasConfigurator(common.Installer):
"""Base Augeas Configurator class.
:ivar config: Configuration.
:type config: :class:`~certbot.interfaces.IConfig`
:ivar aug: Augeas object
:type aug: :class:`augeas.Augeas`
:ivar str save_notes: Human-readable configuration change notes
:ivar reverter: saves and reverts checkpoints
:type reverter: :class:`certbot.reverter.Reverter`
"""
def __init__(self, *args, **kwargs):
super(AugeasConfigurator, self).__init__(*args, **kwargs)
# Placeholder for augeas
self.aug = None
self.save_notes = ""
def init_augeas(self):
""" Initialize the actual Augeas instance """
import augeas
self.aug = augeas.Augeas(
# specify a directory to load our preferred lens from
loadpath=constants.AUGEAS_LENS_DIR,
# Do not save backup (we do it ourselves), do not load
# anything by default
flags=(augeas.Augeas.NONE |
augeas.Augeas.NO_MODL_AUTOLOAD |
augeas.Augeas.ENABLE_SPAN))
# See if any temporary changes need to be recovered
# This needs to occur before VirtualHost objects are setup...
# because this will change the underlying configuration and potential
# vhosts
self.recovery_routine()
def check_parsing_errors(self, lens):
"""Verify Augeas can parse all of the lens files.
:param str lens: lens to check for errors
:raises .errors.PluginError: If there has been an error in parsing with
the specified lens.
"""
error_files = self.aug.match("/augeas//error")
for path in error_files:
# Check to see if it was an error resulting from the use of
# the httpd lens
lens_path = self.aug.get(path + "/lens")
# As aug.get may return null
if lens_path and lens in lens_path:
msg = (
"There has been an error in parsing the file {0} on line {1}: "
"{2}".format(
# Strip off /augeas/files and /error
path[13:len(path) - 6],
self.aug.get(path + "/line"),
self.aug.get(path + "/message")))
raise errors.PluginError(msg)
def ensure_augeas_state(self):
"""Makes sure that all Augeas dom changes are written to files to avoid
loss of configuration directives when doing additional augeas parsing,
causing a possible augeas.load() resulting dom reset
"""
if self.unsaved_files():
self.save_notes += "(autosave)"
self.save()
def unsaved_files(self):
"""Lists files that have modified Augeas DOM but the changes have not
been written to the filesystem yet, used by `self.save()` and
ApacheConfigurator to check the file state.
:raises .errors.PluginError: If there was an error in Augeas, in
an attempt to save the configuration, or an error creating a
checkpoint
:returns: `set` of unsaved files
"""
save_state = self.aug.get("/augeas/save")
self.aug.set("/augeas/save", "noop")
# Existing Errors
ex_errs = self.aug.match("/augeas//error")
try:
# This is a noop save
self.aug.save()
except (RuntimeError, IOError):
self._log_save_errors(ex_errs)
# Erase Save Notes
self.save_notes = ""
raise errors.PluginError(
"Error saving files, check logs for more info.")
# Return the original save method
self.aug.set("/augeas/save", save_state)
# Retrieve list of modified files
# Note: Noop saves can cause the file to be listed twice, I used a
# set to remove this possibility. This is a known augeas 0.10 error.
save_paths = self.aug.match("/augeas/events/saved")
save_files = set()
if save_paths:
for path in save_paths:
save_files.add(self.aug.get(path)[6:])
return save_files
def save(self, title=None, temporary=False):
"""Saves all changes to the configuration files.
This function first checks for save errors, if none are found,
all configuration changes made will be saved. According to the
function parameters. If an exception is raised, a new checkpoint
was not created.
:param str title: The title of the save. If a title is given, the
configuration will be saved as a new checkpoint and put in a
timestamped directory.
:param bool temporary: Indicates whether the changes made will
be quickly reversed in the future (ie. challenges)
"""
save_files = self.unsaved_files()
if save_files:
self.add_to_checkpoint(save_files,
self.save_notes, temporary=temporary)
self.save_notes = ""
self.aug.save()
# Force reload if files were modified
# This is needed to recalculate augeas directive span
if save_files:
for sf in save_files:
self.aug.remove("/files/"+sf)
self.aug.load()
if title and not temporary:
self.finalize_checkpoint(title)
def _log_save_errors(self, ex_errs):
"""Log errors due to bad Augeas save.
:param list ex_errs: Existing errors before save
"""
# Check for the root of save problems
new_errs = self.aug.match("/augeas//error")
# logger.error("During Save - %s", mod_conf)
logger.error("Unable to save files: %s. Attempted Save Notes: %s",
", ".join(err[13:len(err) - 6] for err in new_errs
# Only new errors caused by recent save
if err not in ex_errs), self.save_notes)
# Wrapper functions for Reverter class
def recovery_routine(self):
"""Revert all previously modified files.
Reverts all modified files that have not been saved as a checkpoint
:raises .errors.PluginError: If unable to recover the configuration
"""
super(AugeasConfigurator, self).recovery_routine()
# Need to reload configuration after these changes take effect
self.aug.load()
def revert_challenge_config(self):
"""Used to cleanup challenge configurations.
:raises .errors.PluginError: If unable to revert the challenge config.
"""
self.revert_temporary_config()
self.aug.load()
def rollback_checkpoints(self, rollback=1):
"""Rollback saved checkpoints.
:param int rollback: Number of checkpoints to revert
:raises .errors.PluginError: If there is a problem with the input or
the function is unable to correctly revert the configuration
"""
super(AugeasConfigurator, self).rollback_checkpoints(rollback)
self.aug.load()

View file

@ -1,4 +1,4 @@
"""Apache Configuration based off of Augeas Configurator."""
"""Apache Configurator."""
# pylint: disable=too-many-lines
import copy
import fnmatch
@ -29,7 +29,6 @@ from certbot.plugins.util import path_surgery
from certbot.plugins.enhancements import AutoHSTSEnhancement
from certbot_apache import apache_util
from certbot_apache import augeas_configurator
from certbot_apache import constants
from certbot_apache import display_ops
from certbot_apache import http_01
@ -70,7 +69,7 @@ logger = logging.getLogger(__name__)
@zope.interface.implementer(interfaces.IAuthenticator, interfaces.IInstaller)
@zope.interface.provider(interfaces.IPluginFactory)
class ApacheConfigurator(augeas_configurator.AugeasConfigurator):
class ApacheConfigurator(common.Installer):
# pylint: disable=too-many-instance-attributes,too-many-public-methods
"""Apache configurator.
@ -198,6 +197,8 @@ class ApacheConfigurator(augeas_configurator.AugeasConfigurator):
self._enhanced_vhosts = defaultdict(set) # type: DefaultDict[str, Set[obj.VirtualHost]]
# Temporary state for AutoHSTS enhancement
self._autohsts = {} # type: Dict[str, Dict[str, Union[int, float]]]
# Reverter save notes
self.save_notes = ""
# These will be set in the prepare function
self._prepared = False
@ -228,12 +229,6 @@ class ApacheConfigurator(augeas_configurator.AugeasConfigurator):
:raises .errors.PluginError: If there is any other error
"""
# Perform the actual Augeas initialization to be able to react
try:
self.init_augeas()
except ImportError:
raise errors.NoInstallationError("Problem in Augeas installation")
self._prepare_options()
# Verify Apache is installed
@ -251,16 +246,14 @@ class ApacheConfigurator(augeas_configurator.AugeasConfigurator):
raise errors.NotSupportedError(
"Apache Version {0} not supported.".format(str(self.version)))
if not self._check_aug_version():
raise errors.NotSupportedError(
"Apache plugin support requires libaugeas0 and augeas-lenses "
"version 1.2.0 or higher, please make sure you have you have "
"those installed.")
# Recover from previous crash before Augeas initialization to have the
# correct parse tree from the get go.
self.recovery_routine()
# Perform the actual Augeas initialization to be able to react
self.parser = self.get_parser()
# Check for errors in parsing files with Augeas
self.check_parsing_errors("httpd.aug")
self.parser.check_parsing_errors("httpd.aug")
# Get all of the available vhosts
self.vhosts = self.get_virtual_hosts()
@ -279,6 +272,67 @@ class ApacheConfigurator(augeas_configurator.AugeasConfigurator):
" Apache configuration?".format(self.option("server_root")))
self._prepared = True
def save(self, title=None, temporary=False):
"""Saves all changes to the configuration files.
This function first checks for save errors, if none are found,
all configuration changes made will be saved. According to the
function parameters. If an exception is raised, a new checkpoint
was not created.
:param str title: The title of the save. If a title is given, the
configuration will be saved as a new checkpoint and put in a
timestamped directory.
:param bool temporary: Indicates whether the changes made will
be quickly reversed in the future (ie. challenges)
"""
save_files = self.parser.unsaved_files()
if save_files:
self.add_to_checkpoint(save_files,
self.save_notes, temporary=temporary)
# Handle the parser specific tasks
self.parser.save(save_files)
if title and not temporary:
self.finalize_checkpoint(title)
def recovery_routine(self):
"""Revert all previously modified files.
Reverts all modified files that have not been saved as a checkpoint
:raises .errors.PluginError: If unable to recover the configuration
"""
super(ApacheConfigurator, self).recovery_routine()
# Reload configuration after these changes take effect if needed
# ie. ApacheParser has been initialized.
if self.parser:
# TODO: wrap into non-implementation specific parser interface
self.parser.aug.load()
def revert_challenge_config(self):
"""Used to cleanup challenge configurations.
:raises .errors.PluginError: If unable to revert the challenge config.
"""
self.revert_temporary_config()
self.parser.aug.load()
def rollback_checkpoints(self, rollback=1):
"""Rollback saved checkpoints.
:param int rollback: Number of checkpoints to revert
:raises .errors.PluginError: If there is a problem with the input or
the function is unable to correctly revert the configuration
"""
super(ApacheConfigurator, self).rollback_checkpoints(rollback)
self.parser.aug.load()
def _verify_exe_availability(self, exe):
"""Checks availability of Apache executable"""
if not util.exe_exists(exe):
@ -286,26 +340,11 @@ class ApacheConfigurator(augeas_configurator.AugeasConfigurator):
raise errors.NoInstallationError(
'Cannot find Apache executable {0}'.format(exe))
def _check_aug_version(self):
""" Checks that we have recent enough version of libaugeas.
If augeas version is recent enough, it will support case insensitive
regexp matching"""
self.aug.set("/test/path/testing/arg", "aRgUMeNT")
try:
matches = self.aug.match(
"/test//*[self::arg=~regexp('argument', 'i')]")
except RuntimeError:
self.aug.remove("/test/path")
return False
self.aug.remove("/test/path")
return matches
def get_parser(self):
"""Initializes the ApacheParser"""
# If user provided vhost_root value in command line, use it
return parser.ApacheParser(
self.aug, self.option("server_root"), self.conf("vhost-root"),
self.option("server_root"), self.conf("vhost-root"),
self.version, configurator=self)
def _wildcard_domain(self, domain):
@ -484,8 +523,8 @@ class ApacheConfigurator(augeas_configurator.AugeasConfigurator):
# install SSLCertificateFile, SSLCertificateKeyFile,
# and SSLCertificateChainFile directives
set_cert_path = cert_path
self.aug.set(path["cert_path"][-1], cert_path)
self.aug.set(path["cert_key"][-1], key_path)
self.parser.aug.set(path["cert_path"][-1], cert_path)
self.parser.aug.set(path["cert_key"][-1], key_path)
if chain_path is not None:
self.parser.add_dir(vhost.path,
"SSLCertificateChainFile", chain_path)
@ -497,8 +536,8 @@ class ApacheConfigurator(augeas_configurator.AugeasConfigurator):
raise errors.PluginError("Please provide the --fullchain-path "
"option pointing to your full chain file")
set_cert_path = fullchain_path
self.aug.set(path["cert_path"][-1], fullchain_path)
self.aug.set(path["cert_key"][-1], key_path)
self.parser.aug.set(path["cert_path"][-1], fullchain_path)
self.parser.aug.set(path["cert_key"][-1], key_path)
# Enable the new vhost if needed
if not vhost.enabled:
@ -798,7 +837,7 @@ class ApacheConfigurator(augeas_configurator.AugeasConfigurator):
"""
addrs = set()
try:
args = self.aug.match(path + "/arg")
args = self.parser.aug.match(path + "/arg")
except RuntimeError:
logger.warning("Encountered a problem while parsing file: %s, skipping", path)
return None
@ -816,7 +855,7 @@ class ApacheConfigurator(augeas_configurator.AugeasConfigurator):
is_ssl = True
filename = apache_util.get_file_path(
self.aug.get("/augeas/files%s/path" % apache_util.get_file_path(path)))
self.parser.aug.get("/augeas/files%s/path" % apache_util.get_file_path(path)))
if filename is None:
return None
@ -846,7 +885,7 @@ class ApacheConfigurator(augeas_configurator.AugeasConfigurator):
# Make a list of parser paths because the parser_paths
# dictionary may be modified during the loop.
for vhost_path in list(self.parser.parser_paths):
paths = self.aug.match(
paths = self.parser.aug.match(
("/files%s//*[label()=~regexp('%s')]" %
(vhost_path, parser.case_i("VirtualHost"))))
paths = [path for path in paths if
@ -1100,16 +1139,16 @@ class ApacheConfigurator(augeas_configurator.AugeasConfigurator):
avail_fp = nonssl_vhost.filep
ssl_fp = self._get_ssl_vhost_path(avail_fp)
orig_matches = self.aug.match("/files%s//* [label()=~regexp('%s')]" %
orig_matches = self.parser.aug.match("/files%s//* [label()=~regexp('%s')]" %
(self._escape(ssl_fp),
parser.case_i("VirtualHost")))
self._copy_create_ssl_vhost_skeleton(nonssl_vhost, ssl_fp)
# Reload augeas to take into account the new vhost
self.aug.load()
self.parser.aug.load()
# Get Vhost augeas path for new vhost
new_matches = self.aug.match("/files%s//* [label()=~regexp('%s')]" %
new_matches = self.parser.aug.match("/files%s//* [label()=~regexp('%s')]" %
(self._escape(ssl_fp),
parser.case_i("VirtualHost")))
@ -1120,7 +1159,7 @@ class ApacheConfigurator(augeas_configurator.AugeasConfigurator):
# Make Augeas aware of the new vhost
self.parser.parse_file(ssl_fp)
# Try to search again
new_matches = self.aug.match(
new_matches = self.parser.aug.match(
"/files%s//* [label()=~regexp('%s')]" %
(self._escape(ssl_fp),
parser.case_i("VirtualHost")))
@ -1270,8 +1309,8 @@ class ApacheConfigurator(augeas_configurator.AugeasConfigurator):
"vhost for your HTTPS site located at {1} because they have "
"the potential to create redirection loops.".format(
vhost.filep, ssl_fp), reporter.MEDIUM_PRIORITY)
self.aug.set("/augeas/files%s/mtime" % (self._escape(ssl_fp)), "0")
self.aug.set("/augeas/files%s/mtime" % (self._escape(vhost.filep)), "0")
self.parser.aug.set("/augeas/files%s/mtime" % (self._escape(ssl_fp)), "0")
self.parser.aug.set("/augeas/files%s/mtime" % (self._escape(vhost.filep)), "0")
def _sift_rewrite_rules(self, contents):
""" Helper function for _copy_create_ssl_vhost_skeleton to prepare the
@ -1346,7 +1385,7 @@ class ApacheConfigurator(augeas_configurator.AugeasConfigurator):
"""
try:
span_val = self.aug.span(vhost.path)
span_val = self.parser.aug.span(vhost.path)
except ValueError:
logger.critical("Error while reading the VirtualHost %s from "
"file %s", vhost.name, vhost.filep, exc_info=True)
@ -1381,13 +1420,13 @@ class ApacheConfigurator(augeas_configurator.AugeasConfigurator):
def _update_ssl_vhosts_addrs(self, vh_path):
ssl_addrs = set()
ssl_addr_p = self.aug.match(vh_path + "/arg")
ssl_addr_p = self.parser.aug.match(vh_path + "/arg")
for addr in ssl_addr_p:
old_addr = obj.Addr.fromstring(
str(self.parser.get_arg(addr)))
ssl_addr = old_addr.get_addr_obj("443")
self.aug.set(addr, str(ssl_addr))
self.parser.aug.set(addr, str(ssl_addr))
ssl_addrs.add(ssl_addr)
return ssl_addrs
@ -1406,14 +1445,14 @@ class ApacheConfigurator(augeas_configurator.AugeasConfigurator):
vh_path, False)) > 1:
directive_path = self.parser.find_dir(directive, None,
vh_path, False)
self.aug.remove(re.sub(r"/\w*$", "", directive_path[0]))
self.parser.aug.remove(re.sub(r"/\w*$", "", directive_path[0]))
def _remove_directives(self, vh_path, directives):
for directive in directives:
while self.parser.find_dir(directive, None, vh_path, False):
directive_path = self.parser.find_dir(directive, None,
vh_path, False)
self.aug.remove(re.sub(r"/\w*$", "", directive_path[0]))
self.parser.aug.remove(re.sub(r"/\w*$", "", directive_path[0]))
def _add_dummy_ssl_directives(self, vh_path):
self.parser.add_dir(vh_path, "SSLCertificateFile",
@ -1452,7 +1491,7 @@ class ApacheConfigurator(augeas_configurator.AugeasConfigurator):
"""
matches = self.parser.find_dir(
"ServerAlias", start=vh_path, exclude=False)
aliases = (self.aug.get(match) for match in matches)
aliases = (self.parser.aug.get(match) for match in matches)
return self.domain_in_names(aliases, target_name)
def _add_name_vhost_if_necessary(self, vhost):
@ -1635,7 +1674,7 @@ class ApacheConfigurator(augeas_configurator.AugeasConfigurator):
if header_path:
pat = '(?:[ "]|^)(strict-transport-security)(?:[ "]|$)'
for match in header_path:
if re.search(pat, self.aug.get(match).lower()):
if re.search(pat, self.parser.aug.get(match).lower()):
hsts_dirpath = match
if not hsts_dirpath:
err_msg = ("Certbot was unable to find the existing HSTS header "
@ -1649,7 +1688,7 @@ class ApacheConfigurator(augeas_configurator.AugeasConfigurator):
# Our match statement was for string strict-transport-security, but
# we need to update the value instead. The next index is for the value
hsts_dirpath = hsts_dirpath.replace("arg[3]", "arg[4]")
self.aug.set(hsts_dirpath, hsts_maxage)
self.parser.aug.set(hsts_dirpath, hsts_maxage)
note_msg = ("Increasing HSTS max-age value to {0} for VirtualHost "
"in {1}\n".format(nextstep_value, vhost.filep))
logger.debug(note_msg)
@ -1731,7 +1770,7 @@ class ApacheConfigurator(augeas_configurator.AugeasConfigurator):
# We'll simply delete the directive, so that we'll have a
# consistent OCSP cache path.
if stapling_cache_aug_path:
self.aug.remove(
self.parser.aug.remove(
re.sub(r"/\w*$", "", stapling_cache_aug_path[0]))
self.parser.add_dir_to_ifmodssl(ssl_vhost_aug_path,
@ -1808,7 +1847,7 @@ class ApacheConfigurator(augeas_configurator.AugeasConfigurator):
# "Existing Header directive for virtualhost"
pat = '(?:[ "]|^)(%s)(?:[ "]|$)' % (header_substring.lower())
for match in header_path:
if re.search(pat, self.aug.get(match).lower()):
if re.search(pat, self.parser.aug.get(match).lower()):
raise errors.PluginEnhancementAlreadyPresent(
"Existing %s header" % (header_substring))
@ -1935,11 +1974,11 @@ class ApacheConfigurator(augeas_configurator.AugeasConfigurator):
constants.REWRITE_HTTPS_ARGS_WITH_END]
for dir_path, args_paths in rewrite_args_dict.items():
arg_vals = [self.aug.get(x) for x in args_paths]
arg_vals = [self.parser.aug.get(x) for x in args_paths]
# Search for past redirection rule, delete it, set the new one
if arg_vals in constants.OLD_REWRITE_HTTPS_ARGS:
self.aug.remove(dir_path)
self.parser.aug.remove(dir_path)
self._set_https_redirection_rewrite_rule(vhost)
self.save()
raise errors.PluginEnhancementAlreadyPresent(
@ -1995,7 +2034,7 @@ class ApacheConfigurator(augeas_configurator.AugeasConfigurator):
redirect_filepath = self._write_out_redirect(ssl_vhost, text)
self.aug.load()
self.parser.aug.load()
# Make a new vhost data structure and add it to the lists
new_vhost = self._create_vhost(parser.get_aug_path(self._escape(redirect_filepath)))
self.vhosts.append(new_vhost)

View file

@ -86,7 +86,7 @@ class CentOSConfigurator(configurator.ApacheConfigurator):
def get_parser(self):
"""Initializes the ApacheParser"""
return CentOSParser(
self.aug, self.option("server_root"), self.option("vhost_root"),
self.option("server_root"), self.option("vhost_root"),
self.version, configurator=self)
def _deploy_cert(self, *args, **kwargs): # pylint: disable=arguments-differ
@ -155,7 +155,7 @@ class CentOSConfigurator(configurator.ApacheConfigurator):
for loadmod_path in loadmod_paths:
nodir_path = loadmod_path.split("/directive")[0]
# Remove the old LoadModule directive
self.aug.remove(loadmod_path)
self.parser.aug.remove(loadmod_path)
# Create a new IfModule !mod_ssl.c if not already found on path
ssl_ifmod = self.parser.get_ifmod(nodir_path, "!mod_ssl.c",

View file

@ -51,7 +51,7 @@ class FedoraConfigurator(configurator.ApacheConfigurator):
def get_parser(self):
"""Initializes the ApacheParser"""
return FedoraParser(
self.aug, self.option("server_root"), self.option("vhost_root"),
self.option("server_root"), self.option("vhost_root"),
self.version, configurator=self)
def _try_restart_fedora(self):

View file

@ -44,7 +44,7 @@ class GentooConfigurator(configurator.ApacheConfigurator):
def get_parser(self):
"""Initializes the ApacheParser"""
return GentooParser(
self.aug, self.option("server_root"), self.option("vhost_root"),
self.option("server_root"), self.option("vhost_root"),
self.version, configurator=self)

View file

@ -13,6 +13,8 @@ from acme.magic_typing import Dict, List, Set # pylint: disable=unused-import,
from certbot import errors
from certbot.compat import os
from certbot_apache import constants
logger = logging.getLogger(__name__)
@ -32,7 +34,7 @@ class ApacheParser(object):
arg_var_interpreter = re.compile(r"\$\{[^ \}]*}")
fnmatch_chars = set(["*", "?", "\\", "[", "]"])
def __init__(self, aug, root, vhostroot=None, version=(2, 4),
def __init__(self, root, vhostroot=None, version=(2, 4),
configurator=None):
# Note: Order is important here.
@ -41,11 +43,20 @@ class ApacheParser(object):
# issues with aug.load() after adding new files / defines to parse tree
self.configurator = configurator
# Initialize augeas
self.aug = None
self.init_augeas()
if not self.check_aug_version():
raise errors.NotSupportedError(
"Apache plugin support requires libaugeas0 and augeas-lenses "
"version 1.2.0 or higher, please make sure you have you have "
"those installed.")
self.modules = set() # type: Set[str]
self.parser_paths = {} # type: Dict[str, List[str]]
self.variables = {} # type: Dict[str, str]
self.aug = aug
# Find configuration root and make sure augeas can parse it.
self.root = os.path.abspath(root)
self.loc = {"root": self._find_config_root()}
@ -77,6 +88,146 @@ class ApacheParser(object):
if self.find_dir("Define", exclude=False):
raise errors.PluginError("Error parsing runtime variables")
def init_augeas(self):
""" Initialize the actual Augeas instance """
try:
import augeas
except ImportError: # pragma: no cover
raise errors.NoInstallationError("Problem in Augeas installation")
self.aug = augeas.Augeas(
# specify a directory to load our preferred lens from
loadpath=constants.AUGEAS_LENS_DIR,
# Do not save backup (we do it ourselves), do not load
# anything by default
flags=(augeas.Augeas.NONE |
augeas.Augeas.NO_MODL_AUTOLOAD |
augeas.Augeas.ENABLE_SPAN))
def check_parsing_errors(self, lens):
"""Verify Augeas can parse all of the lens files.
:param str lens: lens to check for errors
:raises .errors.PluginError: If there has been an error in parsing with
the specified lens.
"""
error_files = self.aug.match("/augeas//error")
for path in error_files:
# Check to see if it was an error resulting from the use of
# the httpd lens
lens_path = self.aug.get(path + "/lens")
# As aug.get may return null
if lens_path and lens in lens_path:
msg = (
"There has been an error in parsing the file {0} on line {1}: "
"{2}".format(
# Strip off /augeas/files and /error
path[13:len(path) - 6],
self.aug.get(path + "/line"),
self.aug.get(path + "/message")))
raise errors.PluginError(msg)
def check_aug_version(self):
""" Checks that we have recent enough version of libaugeas.
If augeas version is recent enough, it will support case insensitive
regexp matching"""
self.aug.set("/test/path/testing/arg", "aRgUMeNT")
try:
matches = self.aug.match(
"/test//*[self::arg=~regexp('argument', 'i')]")
except RuntimeError:
self.aug.remove("/test/path")
return False
self.aug.remove("/test/path")
return matches
def unsaved_files(self):
"""Lists files that have modified Augeas DOM but the changes have not
been written to the filesystem yet, used by `self.save()` and
ApacheConfigurator to check the file state.
:raises .errors.PluginError: If there was an error in Augeas, in
an attempt to save the configuration, or an error creating a
checkpoint
:returns: `set` of unsaved files
"""
save_state = self.aug.get("/augeas/save")
self.aug.set("/augeas/save", "noop")
# Existing Errors
ex_errs = self.aug.match("/augeas//error")
try:
# This is a noop save
self.aug.save()
except (RuntimeError, IOError):
self._log_save_errors(ex_errs)
# Erase Save Notes
self.configurator.save_notes = ""
raise errors.PluginError(
"Error saving files, check logs for more info.")
# Return the original save method
self.aug.set("/augeas/save", save_state)
# Retrieve list of modified files
# Note: Noop saves can cause the file to be listed twice, I used a
# set to remove this possibility. This is a known augeas 0.10 error.
save_paths = self.aug.match("/augeas/events/saved")
save_files = set()
if save_paths:
for path in save_paths:
save_files.add(self.aug.get(path)[6:])
return save_files
def ensure_augeas_state(self):
"""Makes sure that all Augeas dom changes are written to files to avoid
loss of configuration directives when doing additional augeas parsing,
causing a possible augeas.load() resulting dom reset
"""
if self.unsaved_files():
self.configurator.save_notes += "(autosave)"
self.configurator.save()
def save(self, save_files):
"""Saves all changes to the configuration files.
save() is called from ApacheConfigurator to handle the parser specific
tasks of saving.
:param list save_files: list of strings of file paths that we need to save.
"""
self.configurator.save_notes = ""
self.aug.save()
# Force reload if files were modified
# This is needed to recalculate augeas directive span
if save_files:
for sf in save_files:
self.aug.remove("/files/"+sf)
self.aug.load()
def _log_save_errors(self, ex_errs):
"""Log errors due to bad Augeas save.
:param list ex_errs: Existing errors before save
"""
# Check for the root of save problems
new_errs = self.aug.match("/augeas//error")
# logger.error("During Save - %s", mod_conf)
logger.error("Unable to save files: %s. Attempted Save Notes: %s",
", ".join(err[13:len(err) - 6] for err in new_errs
# Only new errors caused by recent save
if err not in ex_errs), self.configurator.save_notes)
def add_include(self, main_config, inc_path):
"""Add Include for a new configuration file if one does not exist
@ -658,8 +809,7 @@ class ApacheParser(object):
use_new, remove_old = self._check_path_actions(filepath)
# Ensure that we have the latest Augeas DOM state on disk before
# calling aug.load() which reloads the state from disk
if self.configurator:
self.configurator.ensure_augeas_state()
self.ensure_augeas_state()
# Test if augeas included file for Httpd.lens
# Note: This works for augeas globs, ie. *.conf
if use_new:

View file

@ -165,10 +165,6 @@ class CentOS6Tests(util.ApacheTest):
"LoadModule", "ssl_module", start=self.vh_truth[1].path, exclude=False)
self.assertEqual(len(post_loadmods), 1)
def test_loadmod_non_duplicate(self):
# the modules/mod_ssl.so exists in ssl.conf
sslmod_args = ["ssl_module", "modules/mod_somethingelse.so"]
@ -197,7 +193,7 @@ class CentOS6Tests(util.ApacheTest):
exclude=False)
for mod in orig_loadmods:
noarg_path = mod.rpartition("/")[0]
self.config.aug.remove(noarg_path)
self.config.parser.aug.remove(noarg_path)
self.config.save()
self.config.deploy_cert(
"random.demo", "example/cert.pem", "example/key.pem",

View file

@ -1,21 +1,20 @@
"""Test for certbot_apache.augeas_configurator."""
"""Test for certbot_apache.configurator implementations of reverter"""
import shutil
import unittest
import mock
from certbot import errors
from certbot.compat import os
from certbot_apache.tests import util
class AugeasConfiguratorTest(util.ApacheTest):
"""Test for Augeas Configurator base class."""
class ConfiguratorReverterTest(util.ApacheTest):
"""Test for ApacheConfigurator reverter methods"""
def setUp(self): # pylint: disable=arguments-differ
super(AugeasConfiguratorTest, self).setUp()
super(ConfiguratorReverterTest, self).setUp()
self.config = util.get_apache_configurator(
self.config_path, self.vhost_path, self.config_dir, self.work_dir)
@ -28,20 +27,6 @@ class AugeasConfiguratorTest(util.ApacheTest):
shutil.rmtree(self.work_dir)
shutil.rmtree(self.temp_dir)
def test_bad_parse(self):
# pylint: disable=protected-access
self.config.parser.parse_file(os.path.join(
self.config.parser.root, "conf-available", "bad_conf_file.conf"))
self.assertRaises(
errors.PluginError, self.config.check_parsing_errors, "httpd.aug")
def test_bad_save(self):
mock_save = mock.Mock()
mock_save.side_effect = IOError
self.config.aug.save = mock_save
self.assertRaises(errors.PluginError, self.config.save)
def test_bad_save_checkpoint(self):
self.config.reverter.add_to_checkpoint = mock.Mock(
side_effect=errors.ReverterError)
@ -63,23 +48,9 @@ class AugeasConfiguratorTest(util.ApacheTest):
self.assertTrue(mock_finalize.is_called)
def test_recovery_routine(self):
mock_load = mock.Mock()
self.config.aug.load = mock_load
self.config.recovery_routine()
self.assertEqual(mock_load.call_count, 1)
def test_recovery_routine_error(self):
self.config.reverter.recovery_routine = mock.Mock(
side_effect=errors.ReverterError)
self.assertRaises(
errors.PluginError, self.config.recovery_routine)
def test_revert_challenge_config(self):
mock_load = mock.Mock()
self.config.aug.load = mock_load
self.config.parser.aug.load = mock_load
self.config.revert_challenge_config()
self.assertEqual(mock_load.call_count, 1)
@ -93,7 +64,7 @@ class AugeasConfiguratorTest(util.ApacheTest):
def test_rollback_checkpoints(self):
mock_load = mock.Mock()
self.config.aug.load = mock_load
self.config.parser.aug.load = mock_load
self.config.rollback_checkpoints()
self.assertEqual(mock_load.call_count, 1)
@ -111,6 +82,12 @@ class AugeasConfiguratorTest(util.ApacheTest):
side_effect=errors.ReverterError)
self.assertRaises(errors.PluginError, self.config.view_config_changes)
def test_recovery_routine_reload(self):
mock_load = mock.Mock()
self.config.parser.aug.load = mock_load
self.config.recovery_routine()
self.assertEqual(mock_load.call_count, 1)
if __name__ == "__main__":
unittest.main() # pragma: no cover

View file

@ -51,25 +51,14 @@ class MultipleVhostsTest(util.ApacheTest):
self.config.deploy_cert = mocked_deploy_cert
return self.config
@mock.patch("certbot_apache.configurator.ApacheConfigurator.init_augeas")
@mock.patch("certbot_apache.configurator.path_surgery")
def test_prepare_no_install(self, mock_surgery, _init_augeas):
def test_prepare_no_install(self, mock_surgery):
silly_path = {"PATH": "/tmp/nothingness2342"}
mock_surgery.return_value = False
with mock.patch.dict('os.environ', silly_path):
self.assertRaises(errors.NoInstallationError, self.config.prepare)
self.assertEqual(mock_surgery.call_count, 1)
@mock.patch("certbot_apache.augeas_configurator.AugeasConfigurator.init_augeas")
def test_prepare_no_augeas(self, mock_init_augeas):
""" Test augeas initialization ImportError """
def side_effect_error():
""" Side effect error for the test """
raise ImportError
mock_init_augeas.side_effect = side_effect_error
self.assertRaises(
errors.NoInstallationError, self.config.prepare)
@mock.patch("certbot_apache.parser.ApacheParser")
@mock.patch("certbot_apache.configurator.util.exe_exists")
def test_prepare_version(self, mock_exe_exists, _):
@ -81,16 +70,6 @@ class MultipleVhostsTest(util.ApacheTest):
self.assertRaises(
errors.NotSupportedError, self.config.prepare)
@mock.patch("certbot_apache.parser.ApacheParser")
@mock.patch("certbot_apache.configurator.util.exe_exists")
def test_prepare_old_aug(self, mock_exe_exists, _):
mock_exe_exists.return_value = True
self.config.config_test = mock.Mock()
# pylint: disable=protected-access
self.config._check_aug_version = mock.Mock(return_value=False)
self.assertRaises(
errors.NotSupportedError, self.config.prepare)
def test_prepare_locked(self):
server_root = self.config.conf("server-root")
self.config.config_test = mock.Mock()
@ -675,7 +654,7 @@ class MultipleVhostsTest(util.ApacheTest):
# span excludes the closing </VirtualHost> tag in older versions
# of Augeas
return_value = [self.vh_truth[0].filep, 1, 12, 0, 0, 0, 1142]
with mock.patch.object(self.config.aug, 'span') as mock_span:
with mock.patch.object(self.config.parser.aug, 'span') as mock_span:
mock_span.return_value = return_value
self.test_make_vhost_ssl()
@ -683,7 +662,7 @@ class MultipleVhostsTest(util.ApacheTest):
# span includes the closing </VirtualHost> tag in newer versions
# of Augeas
return_value = [self.vh_truth[0].filep, 1, 12, 0, 0, 0, 1157]
with mock.patch.object(self.config.aug, 'span') as mock_span:
with mock.patch.object(self.config.parser.aug, 'span') as mock_span:
mock_span.return_value = return_value
self.test_make_vhost_ssl()
@ -1232,7 +1211,7 @@ class MultipleVhostsTest(util.ApacheTest):
except errors.PluginEnhancementAlreadyPresent:
args_paths = self.config.parser.find_dir(
"RewriteRule", None, http_vhost.path, False)
arg_vals = [self.config.aug.get(x) for x in args_paths]
arg_vals = [self.config.parser.aug.get(x) for x in args_paths]
self.assertEqual(arg_vals, constants.REWRITE_HTTPS_ARGS)
@ -1335,15 +1314,6 @@ class MultipleVhostsTest(util.ApacheTest):
return account_key, (achall1, achall2, achall3)
def test_aug_version(self):
mock_match = mock.Mock(return_value=["something"])
self.config.aug.match = mock_match
# pylint: disable=protected-access
self.assertEqual(self.config._check_aug_version(),
["something"])
self.config.aug.match.side_effect = RuntimeError
self.assertFalse(self.config._check_aug_version())
def test_enable_site_nondebian(self):
inc_path = "/path/to/wherever"
vhost = self.vh_truth[0]
@ -1512,7 +1482,7 @@ class MultipleVhostsTest(util.ApacheTest):
self.assertEqual(first_id, second_id)
def test_realpath_replaces_symlink(self):
orig_match = self.config.aug.match
orig_match = self.config.parser.aug.match
mock_vhost = copy.deepcopy(self.vh_truth[0])
mock_vhost.filep = mock_vhost.filep.replace('sites-enabled', u'sites-available')
mock_vhost.path = mock_vhost.path.replace('sites-enabled', 'sites-available')
@ -1526,7 +1496,7 @@ class MultipleVhostsTest(util.ApacheTest):
return orig_match(aug_expr)
self.config.parser.parser_paths = ["/mocked/path"]
self.config.aug.match = mock_match
self.config.parser.aug.match = mock_match
vhs = self.config.get_virtual_hosts()
self.assertEqual(len(vhs), 2)
self.assertTrue(vhs[0] == self.vh_truth[1])
@ -1552,8 +1522,8 @@ class AugeasVhostsTest(util.ApacheTest):
self.work_dir)
def test_choosevhost_with_illegal_name(self):
self.config.aug = mock.MagicMock()
self.config.aug.match.side_effect = RuntimeError
self.config.parser.aug = mock.MagicMock()
self.config.parser.aug.match.side_effect = RuntimeError
path = "debian_apache_2_4/augeas_vhosts/apache2/sites-available/old-and-default.conf"
chosen_vhost = self.config._create_vhost(path)
self.assertEqual(None, chosen_vhost)

View file

@ -1,8 +1,8 @@
# pylint: disable=too-many-public-methods
"""Tests for certbot_apache.parser."""
import shutil
import unittest
import augeas
import mock
from certbot import errors
@ -22,6 +22,27 @@ class BasicParserTest(util.ParserTest):
shutil.rmtree(self.config_dir)
shutil.rmtree(self.work_dir)
def test_bad_parse(self):
self.parser.parse_file(os.path.join(self.parser.root,
"conf-available", "bad_conf_file.conf"))
self.assertRaises(
errors.PluginError, self.parser.check_parsing_errors, "httpd.aug")
def test_bad_save(self):
mock_save = mock.Mock()
mock_save.side_effect = IOError
self.parser.aug.save = mock_save
self.assertRaises(errors.PluginError, self.parser.unsaved_files)
def test_aug_version(self):
mock_match = mock.Mock(return_value=["something"])
self.parser.aug.match = mock_match
# pylint: disable=protected-access
self.assertEqual(self.parser.check_aug_version(),
["something"])
self.parser.aug.match.side_effect = RuntimeError
self.assertFalse(self.parser.check_aug_version())
def test_find_config_root_no_root(self):
# pylint: disable=protected-access
os.remove(self.parser.loc["root"])
@ -311,21 +332,38 @@ class BasicParserTest(util.ParserTest):
class ParserInitTest(util.ApacheTest):
def setUp(self): # pylint: disable=arguments-differ
super(ParserInitTest, self).setUp()
self.aug = augeas.Augeas(
flags=augeas.Augeas.NONE | augeas.Augeas.NO_MODL_AUTOLOAD)
def tearDown(self):
shutil.rmtree(self.temp_dir)
shutil.rmtree(self.config_dir)
shutil.rmtree(self.work_dir)
@mock.patch("certbot_apache.parser.ApacheParser.init_augeas")
def test_prepare_no_augeas(self, mock_init_augeas):
from certbot_apache.parser import ApacheParser
mock_init_augeas.side_effect = errors.NoInstallationError
self.config.config_test = mock.Mock()
self.assertRaises(
errors.NoInstallationError, ApacheParser,
os.path.relpath(self.config_path), "/dummy/vhostpath",
version=(2, 4, 22), configurator=self.config)
def test_init_old_aug(self):
from certbot_apache.parser import ApacheParser
with mock.patch("certbot_apache.parser.ApacheParser.check_aug_version") as mock_c:
mock_c.return_value = False
self.assertRaises(
errors.NotSupportedError,
ApacheParser, os.path.relpath(self.config_path),
"/dummy/vhostpath", version=(2, 4, 22), configurator=self.config)
@mock.patch("certbot_apache.parser.ApacheParser._get_runtime_cfg")
def test_unparseable(self, mock_cfg):
from certbot_apache.parser import ApacheParser
mock_cfg.return_value = ('Define: TEST')
self.assertRaises(
errors.PluginError,
ApacheParser, self.aug, os.path.relpath(self.config_path),
ApacheParser, os.path.relpath(self.config_path),
"/dummy/vhostpath", version=(2, 2, 22), configurator=self.config)
def test_root_normalized(self):
@ -337,8 +375,7 @@ class ParserInitTest(util.ApacheTest):
self.temp_dir,
"debian_apache_2_4/////multiple_vhosts/../multiple_vhosts/apache2")
parser = ApacheParser(self.aug, path,
"/dummy/vhostpath", configurator=self.config)
parser = ApacheParser(path, "/dummy/vhostpath", configurator=self.config)
self.assertEqual(parser.root, self.config_path)
@ -347,7 +384,7 @@ class ParserInitTest(util.ApacheTest):
with mock.patch("certbot_apache.parser.ApacheParser."
"update_runtime_variables"):
parser = ApacheParser(
self.aug, os.path.relpath(self.config_path),
os.path.relpath(self.config_path),
"/dummy/vhostpath", configurator=self.config)
self.assertEqual(parser.root, self.config_path)
@ -357,7 +394,7 @@ class ParserInitTest(util.ApacheTest):
with mock.patch("certbot_apache.parser.ApacheParser."
"update_runtime_variables"):
parser = ApacheParser(
self.aug, self.config_path + os.path.sep,
self.config_path + os.path.sep,
"/dummy/vhostpath", configurator=self.config)
self.assertEqual(parser.root, self.config_path)

View file

@ -78,8 +78,7 @@ class ParserTest(ApacheTest):
with mock.patch("certbot_apache.parser.ApacheParser."
"update_runtime_variables"):
self.parser = ApacheParser(
self.aug, self.config_path, self.vhost_path,
configurator=self.config)
self.config_path, self.vhost_path, configurator=self.config)
def get_apache_configurator( # pylint: disable=too-many-arguments, too-many-locals