Fix reverse searches

This commit is contained in:
Joona Hoikkala 2019-12-13 16:14:09 +02:00
parent 9e5bca4bbf
commit 88cb4c8547
No known key found for this signature in database
GPG key ID: D5AA86BBF9B29A5C
5 changed files with 172 additions and 1 deletions

View file

@ -52,6 +52,23 @@ def get_internal_aug_path(vhost_path):
"""
return _split_aug_path(vhost_path)[1]
def normalize_filepath(path, root):
"""Attempts to normalize path to absolute path.
:param str path: The file path
:param str root: The working directory root
:returns: Normalized absolute path to a file
:rtype: str
"""
# Expands references to user home
path = os.path.expanduser(path)
# Removes ../ etc.
path = os.path.normpath(path)
if not os.path.isabs(path):
path = os.path.join(root, path)
return path
def _split_aug_path(vhost_path):
"""Splits an Augeas path into a file path and an internal path.

View file

@ -857,7 +857,7 @@ class ApacheConfigurator(common.Installer):
return None
macro = False
if "/macro/" in path.lower():
if self.parser.find_blocks_from_include_tree("macro", path):
macro = True
vhost_enabled = self.parser.parsed_in_original(filename)

View file

@ -13,6 +13,7 @@ from acme.magic_typing import List # pylint: disable=unused-import, no-name-in-
from acme.magic_typing import Set # pylint: disable=unused-import, no-name-in-module
from certbot import errors
from certbot.compat import os
from certbot_apache._internal import apache_util
from certbot_apache._internal import constants
logger = logging.getLogger(__name__)
@ -87,6 +88,10 @@ class ApacheParser(object):
if self.find_dir("Define", exclude=False):
raise errors.PluginError("Error parsing runtime variables")
# Store information about includes. This must be stored to mitigate
# the performance penalty.
self.includes = dict() # type: Dict[str, str]
def init_augeas(self):
""" Initialize the actual Augeas instance """
@ -245,6 +250,7 @@ class ApacheParser(object):
new_dir = os.path.dirname(inc_path)
new_file = os.path.basename(inc_path)
self.existing_paths.setdefault(new_dir, []).append(new_file)
self.includes = self._find_all_includes()
def add_mod(self, mod_name):
"""Shortcut for updating parser modules."""
@ -442,6 +448,10 @@ class ApacheParser(object):
else:
for i, arg in enumerate(args):
self.aug.set("%s/arg[%d]" % (nvh_path, i + 1), arg)
# Catch include and includeoptional
if directive.lower().startswith("include"):
# Refresh the list of includes
self.includes = self._find_all_includes()
def get_ifmod(self, aug_conf_path, mod, beginning=False):
"""Returns the path to <IfMod mod> and creates one if it doesn't exist.
@ -560,6 +570,122 @@ class ApacheParser(object):
results.append(comment)
return results
def find_blocks_from_include_tree(self, block, path):
"""Recursively searches through parents of this path and returns
a list of Augeas paths of the Apache configuration blocks that
match the searched name. This search is case insensitive.
:param str block: Case insensitive name of the blocks to search
:param str path: Augeas path for the object which parents to search
:returns: A list of Augeas paths to found blocks
:rtype: list
"""
block_paths = [] # type: List[str]
# Queue tracks the paths that we need to search for
queue = list()
# Searched paths tracks the paths that we already tried to
# find includes for. Avoids infinite loops.
searched_paths = set() # type: Set[str]
searchpath = path
while searchpath:
# Add the results to the result list
block_paths += self._find_all_blocks_from_path(block, searchpath)
parents = self.find_includes_for_path(searchpath)
# Handle the search queue
searched_paths.add(searchpath)
for parent in parents:
if parent not in searched_paths:
queue.append(parent)
try:
searchpath = queue.pop()
except IndexError:
searchpath = None
return block_paths
def _find_all_blocks_from_path(self, block, path):
"""Helper function for find_block_from_include_tree to find all
occurances of block name from the current path.
:param str block: Name of the block to find
:param str path: The Augeas path to search the blocks from
:returns: A list of Augeas paths to the searched blocks
:rtype: list of str
"""
block_paths = [] # type: List[str]
# Find blocks in the middle of path as well as in the end
block = "/{}(/|$)".format(block)
# Not found
if block.lower() not in path.lower():
return block_paths
startidx = 0
found = re.search(block, path[startidx:], flags=re.IGNORECASE)
while found:
if found:
block_paths.append(path[:found.end()])
startidx = found.end()
found = re.search(block, path[startidx:], flags=re.IGNORECASE)
# We want the list to be from the leaf to the root
block_paths.reverse()
return block_paths
def find_includes_for_path(self, path):
"""Searches for Include or IncludeOptional directive
that adds the configuration file in Augeas path to the Augeas
DOM and Apache configuration.
:param str path: Augeas path to search Include directive for
:returns: Augeas paths of the Include directives that included
the file in provided Augeas path
:rtype: list of str
"""
# Check if the path is in the root configuration
if path.startswith(get_aug_path(self.loc["root"])):
return []
if not self.includes:
self.includes = self._find_all_includes()
filepath = apache_util.get_file_path(path)
include_paths = []
for inc in self.includes:
incpath = apache_util.normalize_filepath(
self.includes[inc],
os.path.dirname(self.loc["root"])
)
if fnmatch.fnmatch(filepath, incpath):
# Should end with /arg
include_paths.append(inc)
return include_paths
def _find_all_includes(self):
"""A helper function to find and return all the active Include and
IncludeOptional directives from the configuration. The arguments
are also resolved, and a dictionary containing the Augeas path
and the argument value is returned.
:returns: A dictionary of Include and IncludeOptional directives
and their values.
"""
includes = self.find_dir("Include", start="/files")
includes += self.find_dir("IncludeOptional", start="/files")
inc_dict = dict()
for inc in includes:
# Remove the /arg from directive path for dictionary key
incpath = inc.rpartition("/arg")[0]
inc_dict[incpath] = self.get_arg(inc)
return inc_dict
def find_dir(self, directive, arg=None, start=None, exclude=True):
"""Finds directive in the configuration.

View file

@ -326,6 +326,29 @@ class BasicParserTest(util.ParserTest):
self.assertEqual(len(comm), 1)
self.assertTrue(self.parser.loc["name"] in comm[0])
def test_find_blocks_from_include_tree(self):
default_ssl = None
for vh in self.config.vhosts:
if vh.path.endswith("default-ssl.conf/IfModule/VirtualHost"):
default_ssl = vh
break
assert default_ssl is not None
# Need to add this manually as apache2ctl cannot be run for tests
self.parser.modules.add("mod_ssl.c")
blocks = self.parser.find_blocks_from_include_tree("ifmodule",
default_ssl.path)
self.assertEqual(len(blocks), 2)
self.assertTrue(blocks[0].endswith(
"sites-enabled/default-ssl.conf/IfModule"
))
self.assertTrue(blocks[1].endswith(
"apache2.conf/IfModule"
))
notfound = self.parser.find_blocks_from_include_tree("nonexistent",
default_ssl.path)
self.assertEqual(notfound, [])
class ParserInitTest(util.ApacheTest):
def setUp(self): # pylint: disable=arguments-differ

View file

@ -204,4 +204,9 @@ IncludeOptional sites-enabled/*.conf
</VirtualHost>
# Custom re-include of a single file to test multiple ancestor searches
<IfModule mod_ssl.c>
IncludeOptional sites-enabled/default-ssl.conf
</IfModule>
# vim: syntax=apache ts=4 sw=4 sts=4 sr noet