Implementaiton of find_* directives

This commit is contained in:
m0namon 2020-01-23 18:01:14 -08:00
parent d263537301
commit e54cd0a534
6 changed files with 140 additions and 33 deletions

View file

@ -1,10 +1,26 @@
""" apacheconfig implementation of the ParserNode interfaces """
import glob
from certbot import errors
from certbot.compat import os
from certbot_apache._internal import assertions
from certbot_apache._internal import interfaces
from certbot_apache._internal import parsernode_util as util
def _load_file(filename, metadata):
with open(filename) as f:
ast = metadata['loader'].loads(f.read())
metadata = metadata.copy()
metadata['ac_ast'] = ast
return ApacheBlockNode(name=assertions.PASS,
ancestor=None,
filepath=filename,
metadata=metadata)
class ApacheParserNode(interfaces.ParserNode):
""" apacheconfig implementation of ParserNode interface.
@ -62,6 +78,19 @@ class ApacheDirectiveNode(ApacheParserNode):
self.enabled = enabled
self.include = None
# Include processing
if self.name and self.name.lower() in ["include", "includeoptional"]:
value = self.parameters[0]
path = os.path.join(self.metadata['serverroot'], value)
if os.path.isdir(path):
path += "/*" # TODO (mona): test
filepaths = glob.glob(path)
for filepath in filepaths:
if filepath not in self.metadata['parsed_files']:
node = _load_file(filepath, self.metadata)
self.metadata['parsed_files'][filepath] = node
self.include = set(filepaths)
def __eq__(self, other): # pragma: no cover
if isinstance(other, self.__class__):
return (self.name == other.name and
@ -75,7 +104,28 @@ class ApacheDirectiveNode(ApacheParserNode):
def set_parameters(self, _parameters):
"""Sets the parameters for DirectiveNode"""
return
self.parameters = tuple(_parameters)
self._raw.value = tuple(" ".join(_parameters))
def _recursive_generator(node, exclude=True, files_visited=None):
# iterator through children, and recursively expands through blocks and includes
if not files_visited:
files_visited = set([node.filepath])
for child in node.children:
if exclude and not isinstance(child, ApacheCommentNode) and not child.enabled:
continue
yield child
if isinstance(child, ApacheBlockNode):
for subchild in _recursive_generator(child, exclude, files_visited):
yield subchild
if isinstance(child, ApacheDirectiveNode) and child.include:
for filename in child.include:
if filename not in files_visited:
files_visited.add(filename)
file_ast = node.metadata['parsed_files'][filename]
for subchild in _recursive_generator(file_ast, exclude, files_visited):
yield subchild
class ApacheBlockNode(ApacheDirectiveNode):
@ -83,9 +133,32 @@ class ApacheBlockNode(ApacheDirectiveNode):
def __init__(self, **kwargs):
super(ApacheBlockNode, self).__init__(**kwargs)
self.children = ()
self._raw_children = self._raw
children = []
def __eq__(self, other): # pragma: no cover
for raw_node in self._raw_children:
metadata = self.metadata.copy()
metadata['ac_ast'] = raw_node
if raw_node.typestring == "comment":
node = ApacheCommentNode(comment=raw_node.name[2:],
metadata=metadata, ancestor=self,
filepath=self.filepath)
elif raw_node.typestring == "block": # TODO (mona) mypy annotations
parameters = util.parameters_from_string(raw_node.arguments)
node = ApacheBlockNode(name=raw_node.tag, parameters=parameters,
metadata=metadata, ancestor=self,
filepath=self.filepath, enabled=self.enabled)
else:
parameters = ()
if raw_node.value: # TODO (mona) mypy annotations
parameters = util.parameters_from_string(raw_node.value)
node = ApacheDirectiveNode(name=raw_node.name, parameters=parameters,
metadata=metadata, ancestor=self,
filepath=self.filepath, enabled=self.enabled)
children.append(node)
self.children = tuple(children)
def __eq__(self, other): # TODO (mona): test
if isinstance(other, self.__class__):
return (self.name == other.name and
self.filepath == other.filepath and
@ -128,28 +201,31 @@ class ApacheBlockNode(ApacheDirectiveNode):
self.children += (new_comment,)
return new_comment
def find_blocks(self, name, exclude=True): # pylint: disable=unused-argument
# TODO: Implement exclude
def find_blocks(self, name, exclude=False): # pylint: disable=unused-argument
"""Recursive search of BlockNodes from the sequence of children"""
return [ApacheBlockNode(name=assertions.PASS,
parameters=assertions.PASS,
ancestor=self,
filepath=assertions.PASS,
metadata=self.metadata)]
blocks = []
for child in _recursive_generator(self, exclude=exclude):
if isinstance(child, ApacheBlockNode) and child.name.lower() == name.lower():
blocks.append(child)
return blocks
def find_directives(self, name, exclude=True): # pylint: disable=unused-argument
"""Recursive search of DirectiveNodes from the sequence of children"""
return [ApacheDirectiveNode(name=assertions.PASS,
parameters=assertions.PASS,
ancestor=self,
filepath=assertions.PASS,
metadata=self.metadata)]
directives = []
for child in _recursive_generator(self, exclude=exclude):
if isinstance(child, ApacheDirectiveNode) and child.name.lower() == name.lower():
directives.append(child)
return directives
def find_comments(self, comment, exact=False): # pylint: disable=unused-argument
"""Recursive search of DirectiveNodes from the sequence of children"""
return [ApacheCommentNode(comment=assertions.PASS,
ancestor=self,
filepath=assertions.PASS,
metadata=self.metadata)]
comments = []
for child in _recursive_generator(self):
# TODO: Is this the correct metric for matching comments?
if isinstance(child, ApacheCommentNode) and comment in child.comment:
comments.append(child)
return comments
def delete_child(self, child): # pragma: no cover
"""Deletes a ParserNode from the sequence of children"""

View file

@ -389,8 +389,8 @@ class AugeasBlockNode(AugeasDirectiveNode):
exception if it's unable to do so.
:param AugeasParserNode: child: A node to delete.
"""
if not self.parser.aug.remove(child.metadata["augeaspath"]):
if not self.parser.aug.remove(child.metadata["augeaspath"]):
raise errors.PluginError(
("Could not delete child node, the Augeas path: {} doesn't " +
"seem to exist.").format(child.metadata["augeaspath"])

View file

@ -265,6 +265,8 @@ class ApacheConfigurator(common.Installer):
# Set up ParserNode root
pn_meta = {"augeasparser": self.parser,
"augeaspath": self.parser.get_root_augpath(),
# TODO (mona): audit the use of serverroot here
"serverroot": self.option("server_root"),
"ac_ast": None}
if self.USE_PARSERNODE and HAS_APACHECONFIG:
self.parser_root = self.get_parsernode_root(pn_meta)
@ -374,17 +376,19 @@ class ApacheConfigurator(common.Installer):
apache_vars["modules"] = apache_util.parse_modules(self.option("ctl"))
metadata["apache_vars"] = apache_vars
with open(self.parser.loc["root"]) as f:
with apacheconfig.make_loader(writable=True,
**apacheconfig.flavors.NATIVE_APACHE) as loader:
with apacheconfig.make_loader(writable=True,
**apacheconfig.flavors.NATIVE_APACHE) as loader:
with open(self.parser.loc["root"]) as f:
metadata["ac_ast"] = loader.loads(f.read())
metadata["loader"] = loader
metadata["parsed_files"] = {}
return dualparser.DualBlockNode(
name=assertions.PASS,
ancestor=None,
filepath=self.parser.loc["root"],
metadata=metadata
)
return dualparser.DualBlockNode(
name=assertions.PASS,
ancestor=None,
filepath=self.parser.loc["root"],
metadata=metadata
)
def _wildcard_domain(self, domain):
"""

View file

@ -127,3 +127,27 @@ def directivenode_kwargs(kwargs):
parameters = kwargs.pop("parameters")
enabled = kwargs.pop("enabled")
return name, parameters, enabled, kwargs
def parameters_from_string(text):
# TODO (mona) document this function
text = text.strip()
words = []
word = ""
quote = None
for c in text:
if c.isspace() and not quote:
if word:
words.append(word)
word = ""
else:
word += c
if not quote and c in "\"\'":
quote = c
elif c == quote:
words.append(word[1:-1])
word = ""
quote = None
if word:
words.append(word)
return tuple(words)

View file

@ -155,11 +155,11 @@ class AugeasParserNodeTest(util.ApacheTest): # pylint: disable=too-many-public-
def test_add_child_comment(self):
newc = self.config.parser_root.primary.add_child_comment("The content")
comments = self.config.parser_root.find_comments("The content")
comments = self.config.parser_root.primary.find_comments("The content")
self.assertEqual(len(comments), 1)
self.assertEqual(
newc.metadata["augeaspath"],
comments[0].primary.metadata["augeaspath"]
comments[0].metadata["augeaspath"]
)
self.assertEqual(newc.comment, comments[0].comment)
@ -288,11 +288,11 @@ class AugeasParserNodeTest(util.ApacheTest): # pylint: disable=too-many-public-
["with", "parameters"],
position=0
)
dirs = self.config.parser_root.find_directives("ThisWasAdded")
dirs = self.config.parser_root.primary.find_directives("ThisWasAdded")
self.assertEqual(len(dirs), 1)
self.assertEqual(dirs[0].parameters, ("with", "parameters"))
# The new directive was added to the very first line of the config
self.assertTrue(dirs[0].primary.metadata["augeaspath"].endswith("[1]"))
self.assertTrue(dirs[0].metadata["augeaspath"].endswith("[1]"))
def test_add_child_directive_exception(self):
self.assertRaises(

View file

@ -15,7 +15,10 @@ class DualParserNodeTest(unittest.TestCase): # pylint: disable=too-many-public-
parser_mock = mock.MagicMock()
parser_mock.aug.match.return_value = []
parser_mock.get_arg.return_value = []
self.metadata = {"augeasparser": parser_mock, "augeaspath": "/invalid", "ac_ast": None}
ast_mock = mock.MagicMock()
ast_mock.__iter__.return_value = iter([])
self.metadata = {"augeasparser": parser_mock,
"augeaspath": "/invalid", "ac_ast": ast_mock}
self.block = dualparser.DualBlockNode(name="block",
ancestor=None,
filepath="/tmp/something",