Full apacheparser implementation.

This commit is contained in:
sydneyli 2019-10-03 18:48:09 -07:00
parent 6148e5c355
commit 9bfce3177e
8 changed files with 214 additions and 53 deletions

View file

@ -1,10 +1,26 @@
""" apacheconfig implementation of the ParserNode interfaces """
import glob
from certbot import errors
from certbot.compat import os
from certbot_apache import assertions
from certbot_apache import interfaces
from certbot_apache import parsernode_util as util
def _load_file(filename, metadata):
with open(filename) as f:
ast = metadata['loader'].loads(f.read())
metadata = metadata.copy()
metadata['ac_ast'] = ast
return ApacheBlockNode(name=assertions.PASS,
ancestor=None,
filepath=filename,
metadata=metadata)
class ApacheParserNode(interfaces.ParserNode):
""" apacheconfig implementation of ParserNode interface.
@ -62,6 +78,19 @@ class ApacheDirectiveNode(ApacheParserNode):
self.enabled = enabled
self.include = None
# Include processing
if self.name and self.name.lower() in ["include", "includeoptional"]:
value = self.parameters[0]
path = os.path.join(os.path.dirname(self.metadata['serverroot']), value)
if os.path.isdir(path):
path += "/*"
filepaths = glob.glob(path)
for filepath in filepaths:
if filepath not in self.metadata['parsed_files']:
node = _load_file(filepath, self.metadata)
self.metadata['parsed_files'][filepath] = node
self.include = set(filepaths)
def __eq__(self, other): # pragma: no cover
if isinstance(other, self.__class__):
return (self.name == other.name and
@ -75,7 +104,52 @@ class ApacheDirectiveNode(ApacheParserNode):
def set_parameters(self, parameters):
"""Sets the parameters for DirectiveNode"""
pass
self.parameters = tuple(parameters)
self._raw.value = tuple(" ".join(parameters))
def _parameters_from_string(text):
text = text.strip()
words = []
word = ""
quote = None
escape = False
for c in text:
if c.isspace() and not quote:
if word:
words.append(word)
word = ""
else:
word += c
if not escape:
if not quote and c in "\"\'":
quote = c
elif c == quote:
words.append(word[1:-1])
word = ""
quote = None
escape = c == "\\"
if word:
words.append(word)
return tuple(words)
def _recursive_generator(node, files_visited=None):
# iterator through children, and recursively expands through blocks and includes
if not files_visited:
files_visited = set([node.filepath])
for child in node.children:
yield child
if isinstance(child, ApacheBlockNode):
for subchild in _recursive_generator(child, files_visited):
yield subchild
if isinstance(child, ApacheDirectiveNode) and child.include:
for filename in child.include:
if filename not in files_visited:
files_visited.add(filename)
file_ast = node.metadata['parsed_files'][filename]
for subchild in _recursive_generator(file_ast, files_visited):
yield subchild
class ApacheBlockNode(ApacheDirectiveNode):
@ -83,9 +157,31 @@ class ApacheBlockNode(ApacheDirectiveNode):
def __init__(self, **kwargs):
super(ApacheBlockNode, self).__init__(**kwargs)
self.children = ()
self._raw_children = self._raw
children = []
for raw_node in self._raw_children:
metadata = self.metadata.copy()
metadata['ac_ast'] = raw_node
if raw_node.typestring == "comment":
# TODO: Why does the other implementation cut off both the "#" and initial spaces?
node = ApacheCommentNode(comment=raw_node.name[2:], metadata=metadata,
ancestor=self, filepath=self.filepath)
elif raw_node.typestring == "block":
parameters = _parameters_from_string(raw_node.arguments)
node = ApacheBlockNode(name=raw_node.tag, parameters=parameters,
metadata=metadata, ancestor=self,
filepath=self.filepath)
else:
parameters = ()
if raw_node.value:
parameters = _parameters_from_string(raw_node.value)
node = ApacheDirectiveNode(name=raw_node.name,
parameters=parameters, metadata=metadata,
ancestor=self, filepath=self.filepath)
children.append(node)
self.children = tuple(children)
def __eq__(self, other): # pragma: no cover
def __eq__(self, other):
if isinstance(other, self.__class__):
return (self.name == other.name and
self.filepath == other.filepath and
@ -99,61 +195,98 @@ class ApacheBlockNode(ApacheDirectiveNode):
def add_child_block(self, name, parameters=None, position=None): # pylint: disable=unused-argument
"""Adds a new BlockNode to the sequence of children"""
new_block = ApacheBlockNode(name=assertions.PASS,
parameters=assertions.PASS,
ancestor=self,
filepath=assertions.PASS,
metadata=self.metadata)
self.children += (new_block,)
parameters_string = " " + " ".join(parameters) if parameters else ""
position = len(self._raw_children) if not position else position
position = min(len(self._raw_children), position)
raw_block = self._raw_children.add(position, "\n<%s%s>\n</%s>" %
(name, parameters_string, name))
metadata = self.metadata.copy()
metadata['ac_ast'] = raw_block
if not parameters:
parameters = []
new_block = ApacheBlockNode(name=name, parameters=tuple(parameters),
ancestor=self, metadata=metadata, filepath=self.filepath)
# Update metadata
children = list(self.children)
children.insert(position, new_block)
self.children = tuple(children)
return new_block
def add_child_directive(self, name, parameters=None, position=None): # pylint: disable=unused-argument
"""Adds a new DirectiveNode to the sequence of children"""
new_dir = ApacheDirectiveNode(name=assertions.PASS,
parameters=assertions.PASS,
ancestor=self,
filepath=assertions.PASS,
metadata=self.metadata)
self.children += (new_dir,)
parameters_string = " " + " ".join(parameters) if parameters else ""
position = len(self._raw_children) if not position else position
raw_item = self._raw_children.add(position, "\n%s%s" % (name, parameters_string))
metadata = self.metadata.copy()
metadata['ac_ast'] = raw_item
if not parameters:
parameters = []
new_dir = ApacheDirectiveNode(name=name, parameters=tuple(parameters), ancestor=self,
metadata=metadata, filepath=self.filepath)
# Update metadata
children = list(self.children)
children.insert(position, new_dir)
self.children = tuple(children)
return new_dir
# pylint: disable=unused-argument
def add_child_comment(self, comment="", position=None): # pragma: no cover
"""Adds a new CommentNode to the sequence of children"""
new_comment = ApacheCommentNode(comment=assertions.PASS,
position = len(self._raw_children) if not position else position
raw_comment = self._raw_children.add(position, comment)
metadata = self.metadata.copy()
metadata['ac_ast'] = raw_comment
new_comment = ApacheCommentNode(comment=comment, metadata=metadata,
ancestor=self,
filepath=assertions.PASS,
metadata=self.metadata)
self.children += (new_comment,)
filepath=self.filepath)
# Update metadata
children = list(self.children)
children.insert(position, new_comment)
self.children = tuple(children)
return new_comment
# TODO: Implement exclude
def find_blocks(self, name, exclude=True): # pylint: disable=unused-argument
"""Recursive search of BlockNodes from the sequence of children"""
return [ApacheBlockNode(name=assertions.PASS,
parameters=assertions.PASS,
ancestor=self,
filepath=assertions.PASS,
metadata=self.metadata)]
blocks = []
for child in _recursive_generator(self):
if isinstance(child, ApacheBlockNode) and child.name.lower() == name.lower():
blocks.append(child)
return blocks
def find_directives(self, name, exclude=True): # pylint: disable=unused-argument
"""Recursive search of DirectiveNodes from the sequence of children"""
return [ApacheDirectiveNode(name=assertions.PASS,
parameters=assertions.PASS,
ancestor=self,
filepath=assertions.PASS,
metadata=self.metadata)]
directives = []
for child in _recursive_generator(self):
if isinstance(child, ApacheDirectiveNode) and child.name.lower() == name.lower():
directives.append(child)
return directives
def find_comments(self, comment, exact=False): # pylint: disable=unused-argument
"""Recursive search of DirectiveNodes from the sequence of children"""
return [ApacheCommentNode(comment=assertions.PASS,
ancestor=self,
filepath=assertions.PASS,
metadata=self.metadata)]
comments = []
for child in _recursive_generator(self):
# TODO: Is this the correct metric for matching comments?
if isinstance(child, ApacheCommentNode) and comment in child.comment:
comments.append(child)
return comments
def delete_child(self, child): # pragma: no cover
"""Deletes a ParserNode from the sequence of children"""
pass
index = -1
i = None
for i, elem in enumerate(self.children):
if elem == child:
index = i
break
if index < 0:
raise errors.PluginError("Could not find child node to delete")
children_list = list(self.children)
thing = children_list.pop(i)
self.children = tuple(children_list)
self._raw_children.remove(i)
return thing
def unsaved_files(self): # pragma: no cover
"""Returns a list of unsaved filepaths"""

View file

@ -39,7 +39,6 @@ def assertEqualComment(first, second): # pragma: no cover
def _assertEqualDirectiveComponents(first, second): # pragma: no cover
""" Handles assertion for instance variables for DirectiveNode and BlockNode"""
# Enabled value cannot be asserted, because Augeas implementation
# is unable to figure that out.
# assert first.enabled == second.enabled

View file

@ -372,8 +372,8 @@ class AugeasBlockNode(AugeasDirectiveNode):
exception if it's unable to do so.
:param AugeasParserNode: child: A node to delete.
"""
if not self.parser.aug.remove(child.metadata["augeaspath"]):
if not self.parser.aug.remove(child.metadata["augeaspath"]):
raise errors.PluginError(
("Could not delete child node, the Augeas path: {} doesn't " +
"seem to exist.").format(child.metadata["augeaspath"])

View file

@ -18,6 +18,9 @@ import zope.interface
from acme import challenges
from acme.magic_typing import DefaultDict, Dict, List, Set, Union # pylint: disable=unused-import, no-name-in-module
from apacheconfig import make_writable_loader
from apacheconfig import flavors
from certbot import errors
from certbot import interfaces
from certbot import util
@ -255,12 +258,7 @@ class ApacheConfigurator(common.Installer):
self.recovery_routine()
# Perform the actual Augeas initialization to be able to react
self.parser = self.get_parser()
# Set up ParserNode root
pn_meta = {"augeasparser": self.parser,
"augeaspath": self.parser.get_root_augpath(),
"ac_ast": None}
self.parser_root = self.get_parsernode_root(pn_meta)
self.parser_root = self.get_parsernode_root()
# Check for errors in parsing files with Augeas
self.parser.check_parsing_errors("httpd.aug")
@ -357,20 +355,31 @@ class ApacheConfigurator(common.Installer):
self.option("server_root"), self.conf("vhost-root"),
self.version, configurator=self)
def get_parsernode_root(self, metadata):
def get_parsernode_root(self):
"""Initializes the ParserNode parser root instance."""
apache_vars = dict()
apache_vars["defines"] = apache_util.parse_defines(self.option("ctl"))
apache_vars["includes"] = apache_util.parse_includes(self.option("ctl"))
apache_vars["modules"] = apache_util.parse_modules(self.option("ctl"))
metadata["apache_vars"] = apache_vars
with make_writable_loader(**flavors.NATIVE_APACHE) as loader:
with open(self.parser.loc["root"]) as f:
ac_ast = loader.loads(f.read())
# Set up ParserNode root
pn_meta = {"augeasparser": self.parser,
"augeaspath": self.parser.get_root_augpath(),
"serverroot": self.parser.loc["root"],
"ac_ast": ac_ast,
"parsed_files": {},
"loader": loader}
pn_meta["apache_vars"] = apache_vars
return dualparser.DualBlockNode(
name=assertions.PASS,
ancestor=None,
filepath=self.parser.loc["root"],
metadata=metadata
metadata=pn_meta
)
def _wildcard_domain(self, domain):

View file

@ -64,6 +64,8 @@ class DualNodeBase(object):
new_nodes.append(nodeclass(primary=c,
secondary=secondary_res[0]))
else:
print len(primary_res)
print len(secondary_res)
assert len(primary_res) == len(secondary_res)
matches = self._create_matching_list(primary_res, secondary_res)
for p, s in matches:
@ -237,7 +239,8 @@ class DualBlockNode(DualNodeBase):
raise AssertionError("Could not find a matching node.")
return matched
def find_blocks(self, name, exclude=True):
# TODO: Re-set default "exclude" to True when apacheconfig version is implemented.
def find_blocks(self, name, exclude=False):
"""
Performs a search for BlockNodes using both implementations and does simple
checks for results. This is built upon the assumption that unimplemented
@ -249,7 +252,7 @@ class DualBlockNode(DualNodeBase):
return self._find_helper(DualBlockNode, "find_blocks", name,
exclude=exclude)
def find_directives(self, name, exclude=True):
def find_directives(self, name, exclude=False):
"""
Performs a search for DirectiveNodes using both implementations and
checks the results. This is built upon the assumption that unimplemented

View file

@ -20,6 +20,19 @@ class AugeasParserNodeTest(util.ApacheTest): # pylint: disable=too-many-public-
self.vh_truth = util.get_vh_truth(
self.temp_dir, "debian_apache_2_4/multiple_vhosts")
def test_parameters_from_string(self):
from certbot_apache.apacheparser import _parameters_from_string
cases = [
(" a b\t c d", ("a", "b", "c", "d")),
("a \"b\" c", ("a", "b", "c")),
("a \" b c \"", ("a", " b c ")),
("a \"b \\\"c\\\"\"", ("a", "b \\\"c\\\"")),
("a \'b \"c\"\'", ("a", "b \"c\"")),
]
for case, expected in cases:
result = _parameters_from_string(case)
self.assertEqual(result, expected)
def test_save(self):
with mock.patch('certbot_apache.parser.ApacheParser.save') as mock_save:
self.config.parser_root.save("A save message")
@ -61,7 +74,7 @@ class AugeasParserNodeTest(util.ApacheTest): # pylint: disable=too-many-public-
def test_find_directive_found(self):
directives = self.config.parser_root.find_directives("Listen")
self.assertEqual(len(directives), 1)
self.assertEqual(len(directives), 3)
self.assertTrue(directives[0].filepath.endswith("/apache2/ports.conf"))
self.assertEqual(directives[0].parameters, (u'80',))
@ -103,7 +116,7 @@ class AugeasParserNodeTest(util.ApacheTest): # pylint: disable=too-many-public-
def test_set_parameters_atinit(self):
from certbot_apache.augeasparser import AugeasDirectiveNode
servernames = self.config.parser_root.find_directives("servername")
servernames = self.config.parser_root.find_directives("servername", exclude=False)
setparam = "certbot_apache.augeasparser.AugeasDirectiveNode.set_parameters"
with mock.patch(setparam) as mock_set:
AugeasDirectiveNode(
@ -146,11 +159,11 @@ class AugeasParserNodeTest(util.ApacheTest): # pylint: disable=too-many-public-
def test_add_child_comment(self):
newc = self.config.parser_root.primary.add_child_comment("The content")
comments = self.config.parser_root.find_comments("The content")
comments = self.config.parser_root.primary.find_comments("The content")
self.assertEqual(len(comments), 1)
self.assertEqual(
newc.metadata["augeaspath"],
comments[0].primary.metadata["augeaspath"]
comments[0].metadata["augeaspath"]
)
self.assertEqual(newc.comment, comments[0].comment)

View file

@ -15,7 +15,10 @@ class DualParserNodeTest(unittest.TestCase): # pylint: disable=too-many-public-
parser_mock = mock.MagicMock()
parser_mock.aug.match.return_value = []
parser_mock.get_arg.return_value = []
self.metadata = {"augeasparser": parser_mock, "augeaspath": "/invalid", "ac_ast": None}
ast_mock = mock.MagicMock()
ast_mock.__iter__.return_value = iter([])
self.metadata = {"augeasparser": parser_mock,
"augeaspath": "/invalid", "ac_ast": ast_mock}
self.block = dualparser.DualBlockNode(name="block",
ancestor=None,
filepath="/tmp/something",

View file

@ -11,6 +11,7 @@ version = '0.40.0.dev0'
install_requires = [
'acme>=0.29.0',
'certbot>=0.39.0',
'apacheconfig',
'mock',
'python-augeas',
'setuptools',