Fixing remaining typing issues

This commit is contained in:
Adrien Ferrand 2022-01-19 09:06:21 +01:00
parent f62eabc94e
commit 2ef0a702c9
10 changed files with 104 additions and 75 deletions

View file

@ -65,6 +65,7 @@ Translates over to:
]
"""
from typing import Any
from typing import cast
from typing import Dict
from typing import Iterable
from typing import List
@ -93,7 +94,8 @@ class AugeasParserNode(interfaces.ParserNode):
self.filepath: str = filepath
self.dirty: bool = dirty
self.metadata: Dict[str, Any] = metadata
self.parser: parser.ApacheParser = self.metadata.get("augeasparser")
self.parser: parser.ApacheParser = cast(parser.ApacheParser,
self.metadata.get("augeasparser"))
try:
if self.metadata["augeaspath"].endswith("/"):
raise errors.PluginError(
@ -146,14 +148,16 @@ class AugeasParserNode(interfaces.ParserNode):
}
# Check if the file was included from the root config or initial state
enabled = self.parser.parsed_in_original(
apache_util.get_file_path(path)
)
file_path = apache_util.get_file_path(path)
if file_path is None:
raise ValueError(f"No file path found for vhost: {path}.")
enabled = self.parser.parsed_in_original(file_path)
return AugeasBlockNode(name=name,
enabled=enabled,
ancestor=assertions.PASS,
filepath=apache_util.get_file_path(path),
filepath=file_path,
metadata=metadata)
def _aug_get_name(self, path: str) -> str:
@ -232,7 +236,7 @@ class AugeasDirectiveNode(AugeasParserNode):
self.parser.aug.set(param_path, param)
@property
def parameters(self) -> Tuple[str, ...]:
def parameters(self) -> Tuple[Optional[str], ...]:
"""
Fetches the parameters from Augeas tree, ensuring that the sequence always
represents the current state
@ -283,9 +287,10 @@ class AugeasBlockNode(AugeasDirectiveNode):
# Create the new block
self.parser.aug.insert(insertpath, name, before)
# Check if the file was included from the root config or initial state
enabled = self.parser.parsed_in_original(
apache_util.get_file_path(realpath)
)
file_path = apache_util.get_file_path(realpath)
if file_path is None:
raise errors.Error(f"No file path found for vhost: {realpath}")
enabled = self.parser.parsed_in_original(file_path)
# Parameters will be set at the initialization of the new object
return AugeasBlockNode(
@ -293,7 +298,7 @@ class AugeasBlockNode(AugeasDirectiveNode):
parameters=parameters,
enabled=enabled,
ancestor=assertions.PASS,
filepath=apache_util.get_file_path(realpath),
filepath=file_path,
metadata=new_metadata,
)
@ -317,16 +322,17 @@ class AugeasBlockNode(AugeasDirectiveNode):
# Set the directive key
self.parser.aug.set(realpath, name)
# Check if the file was included from the root config or initial state
enabled = self.parser.parsed_in_original(
apache_util.get_file_path(realpath)
)
file_path = apache_util.get_file_path(realpath)
if file_path is None:
raise errors.Error(f"No file path found for vhost: {realpath}")
enabled = self.parser.parsed_in_original(file_path)
return AugeasDirectiveNode(
name=name,
parameters=parameters,
enabled=enabled,
ancestor=assertions.PASS,
filepath=apache_util.get_file_path(realpath),
filepath=file_path,
metadata=new_metadata,
)
@ -359,7 +365,7 @@ class AugeasBlockNode(AugeasDirectiveNode):
"""Recursive search of BlockNodes from the sequence of children"""
nodes: List["AugeasBlockNode"] = []
paths = self._aug_find_blocks(name)
paths: Iterable[str] = self._aug_find_blocks(name)
if exclude:
paths = self.parser.exclude_dirs(paths)
for path in paths:

View file

@ -9,6 +9,7 @@ import time
from collections import defaultdict
from typing import Any
from typing import Callable
from typing import cast
from typing import DefaultDict
from typing import Dict
from typing import Iterable
@ -471,8 +472,7 @@ class ApacheConfigurator(common.Configurator):
"""Initializes the ApacheParser"""
# If user provided vhost_root value in command line, use it
return parser.ApacheParser(
self.options.server_root, self.conf("vhost-root"),
self.version, configurator=self)
self.options.server_root, self, self.conf("vhost-root"), version=self.version)
def get_parsernode_root(self, metadata: Dict[str, Any]) -> dualparser.DualBlockNode:
"""Initializes the ParserNode parser root instance."""
@ -560,14 +560,14 @@ class ApacheConfigurator(common.Configurator):
return list(matched)
def _raise_no_suitable_vhost_error(self, target_name: str) -> None:
def _generate_no_suitable_vhost_error(self, target_name: str) -> errors.PluginError:
"""
Notifies the user that Certbot could not find a vhost to secure
and raises an error.
:param str target_name: The server name that could not be mapped
:raises errors.PluginError: Raised unconditionally
"""
raise errors.PluginError(
return errors.PluginError(
"Certbot could not find a VirtualHost for {0} in the Apache "
"configuration. Please create a VirtualHost with a ServerName "
"matching {0} and try again.".format(target_name)
@ -611,7 +611,7 @@ class ApacheConfigurator(common.Configurator):
dialog_output = display_ops.select_vhost_multiple(list(dialog_input))
if not dialog_output:
self._raise_no_suitable_vhost_error(domain)
raise self._generate_no_suitable_vhost_error(domain)
# Make sure we create SSL vhosts for the ones that are HTTP only
# if requested.
@ -734,11 +734,12 @@ class ApacheConfigurator(common.Configurator):
# to get created if a non-ssl vhost is selected.
return self._choose_vhost_from_list(target_name, temp=not create_if_no_ssl)
def _choose_vhost_from_list(self, target_name: str, temp: bool = False) -> obj.VirtualHost:
def _choose_vhost_from_list(self, target_name: str,
temp: bool = False) -> obj.VirtualHost:
# Select a vhost from a list
vhost = display_ops.select_vhost(target_name, self.vhosts)
if vhost is None:
self._raise_no_suitable_vhost_error(target_name)
raise self._generate_no_suitable_vhost_error(target_name)
if temp:
return vhost
if not vhost.ssl:
@ -922,7 +923,7 @@ class ApacheConfigurator(common.Configurator):
return ""
def _get_vhost_names(self, path: Optional[str]) -> Tuple[str, List[str]]:
def _get_vhost_names(self, path: Optional[str]) -> Tuple[Optional[str], List[Optional[str]]]:
"""Helper method for getting the ServerName and
ServerAlias values from vhost in path
@ -959,7 +960,7 @@ class ApacheConfigurator(common.Configurator):
servername, serveraliases = self._get_vhost_names(host.path)
for alias in serveraliases:
if not host.modmacro:
if not host.modmacro and alias is not None:
host.aliases.add(alias)
if not host.modmacro:
@ -974,14 +975,16 @@ class ApacheConfigurator(common.Configurator):
:rtype: :class:`~certbot_apache._internal.obj.VirtualHost`
"""
addrs: Set[Addr] = set()
addrs: Set[obj.Addr] = set()
try:
args = self.parser.aug.match(path + "/arg")
except RuntimeError:
logger.warning("Encountered a problem while parsing file: %s, skipping", path)
return None
for arg in args:
addrs.add(obj.Addr.fromstring(self.parser.get_arg(arg)))
arg_value = self.parser.get_arg(arg)
if arg_value is not None:
addrs.add(obj.Addr.fromstring(arg_value))
is_ssl = False
if self.parser.find_dir("SSLEngine", "on", start=path, exclude=False):
@ -1134,8 +1137,11 @@ class ApacheConfigurator(common.Configurator):
# Check if the VirtualHost is contained in a mod_macro block
if node.find_ancestors("Macro"):
macro = True
# VirtualHost V2 is part of migration to the pure-Python Apache parser project. It is not
# used on production as of now.
# TODO: Use a meaning full value for augeas path instead of an empty string
vhost = obj.VirtualHost(
node.filepath, None, addrs, is_ssl, enabled, modmacro=macro, node=node
node.filepath, "", addrs, is_ssl, enabled, modmacro=macro, node=node
)
self._populate_vhost_names_v2(vhost)
return vhost
@ -1231,8 +1237,8 @@ class ApacheConfigurator(common.Configurator):
# Check for Listen <port>
# Note: This could be made to also look for ip:443 combo
listens = [self.parser.get_arg(x).split()[0] for
x in self.parser.find_dir("Listen")]
directives = [self.parser.get_arg(x) for x in self.parser.find_dir("Listen")]
listens = [directive.split()[0] for directive in directives if directive]
# Listen already in place
if self._has_port_already(listens, port):
@ -1421,7 +1427,10 @@ class ApacheConfigurator(common.Configurator):
# We know the length is one because of the assertion above
# Create the Vhost object
ssl_vhost: obj.VirtualHost = self._create_vhost(vh_p)
vhost = self._create_vhost(vh_p)
if not vhost:
raise errors.Error("Could not create a vhost")
ssl_vhost: obj.VirtualHost = vhost
ssl_vhost.ancestor = nonssl_vhost
self.vhosts.append(ssl_vhost)
@ -1716,7 +1725,7 @@ class ApacheConfigurator(common.Configurator):
self.parser.add_dir(vh_path, "ServerAlias", target_name)
self._add_servernames(vhost)
def _has_matching_wildcard(self, vh_path: Optional[str], target_name: Optional[str]) -> bool:
def _has_matching_wildcard(self, vh_path: str, target_name: str) -> bool:
"""Is target_name already included in a wildcard in the vhost?
:param str vh_path: Augeas path to the vhost
@ -1766,14 +1775,14 @@ class ApacheConfigurator(common.Configurator):
if need_to_save:
self.save()
def find_vhost_by_id(self, id_str: str) -> Optional[obj.VirtualHost]:
def find_vhost_by_id(self, id_str: str) -> obj.VirtualHost:
"""
Searches through VirtualHosts and tries to match the id in a comment
:param str id_str: Id string for matching
:returns: The matched VirtualHost or None
:rtype: :class:`~certbot_apache._internal.obj.VirtualHost` or None
:returns: The matched VirtualHost
:rtype: :class:`~certbot_apache._internal.obj.VirtualHost`
:raises .errors.PluginError: If no VirtualHost is found
"""
@ -1802,8 +1811,9 @@ class ApacheConfigurator(common.Configurator):
id_comment = self.parser.find_comments(search_comment, vhost.path)
if id_comment:
# Use the first value, multiple ones shouldn't exist
comment: str = self.parser.get_arg(id_comment[0])
return comment.split(" ")[-1]
comment = self.parser.get_arg(id_comment[0])
if comment is not None:
return comment.split(" ")[-1]
return None
def add_vhost_id(self, vhost: obj.VirtualHost) -> Optional[str]:
@ -1888,7 +1898,7 @@ class ApacheConfigurator(common.Configurator):
raise
def _autohsts_increase(
self, vhost: obj.VirtualHost, id_str: str, nextstep: Union[int, float]
self, vhost: obj.VirtualHost, id_str: str, nextstep: int
) -> None:
"""Increase the AutoHSTS max-age value
@ -2262,9 +2272,11 @@ class ApacheConfigurator(common.Configurator):
self.parser.aug.load()
# Make a new vhost data structure and add it to the lists
new_vhost: obj.VirtualHost = self._create_vhost(parser.get_aug_path(
new_vhost = self._create_vhost(parser.get_aug_path(
self._escape(redirect_filepath))
)
if new_vhost is None:
raise errors.Error("Could not create a new vhost")
self.vhosts.append(new_vhost)
self._enhanced_vhosts["redirect"].add(new_vhost)
@ -2526,7 +2538,7 @@ class ApacheConfigurator(common.Configurator):
"""
self._chall_out.update(achalls)
responses = [None] * len(achalls)
responses: List[Optional[Challenge]] = [None] * len(achalls)
http_doer = http_01.ApacheHttp01(self)
for i, achall in enumerate(achalls):
@ -2548,11 +2560,12 @@ class ApacheConfigurator(common.Configurator):
self._update_responses(responses, http_response, http_doer)
return responses
# We assume all challenges has been fulfilled as described in the function documentation.
return cast(List[Challenge], responses)
def _update_responses(
self,
responses: Iterable[challenges.HTTP01Response],
responses: List[Optional[challenges.HTTP01Response]],
chall_response: List[Challenge],
chall_doer: http_01.ApacheHttp01
) -> None:
@ -2653,6 +2666,8 @@ class ApacheConfigurator(common.Configurator):
# Add ID to the VirtualHost for mapping back to it later
uniq_id = self.add_vhost_id(ssl_vhost)
if uniq_id is None:
raise errors.Error("Could not generate a unique id")
self.save_notes += "Adding unique ID {0} to VirtualHost in {1}\n".format(
uniq_id, ssl_vhost.filep)
# Add the actual HSTS header
@ -2684,7 +2699,7 @@ class ApacheConfigurator(common.Configurator):
if config["timestamp"] + constants.AUTOHSTS_FREQ > curtime:
# Skip if last increase was < AUTOHSTS_FREQ ago
continue
nextstep = config["laststep"] + 1
nextstep = cast(int, config["laststep"]) + 1
if nextstep < len(constants.AUTOHSTS_STEPS):
# If installer hasn't been prepared yet, do it now
if not self._prepared:

View file

@ -133,8 +133,8 @@ class DualDirectiveNode(DualNodeBase):
kwargs.setdefault("primary", None)
kwargs.setdefault("secondary", None)
primary: Optional[DualDirectiveNode] = kwargs.pop("primary")
secondary: Optional[apacheparser.ApacheDirectiveNode] = kwargs.pop("secondary")
primary = kwargs.pop("primary")
secondary = kwargs.pop("secondary")
if primary or secondary:
assert primary and secondary
@ -282,7 +282,7 @@ class DualBlockNode(DualNodeBase):
return self._find_helper(DualDirectiveNode, "find_directives", name,
exclude=exclude)
def find_comments(self, comment: DualCommentNode) -> Sequence[apacheparser.ApacheParserNode]:
def find_comments(self, comment: str) -> Sequence[apacheparser.ApacheParserNode]:
"""
Performs a search for CommentNodes using both implementations and
checks the results. This is built upon the assumption that unimplemented

View file

@ -3,6 +3,7 @@ import logging
from typing import Any
from typing import cast
from typing import List
from typing import Optional
from certbot import errors
from certbot import util
@ -76,8 +77,7 @@ class CentOSConfigurator(configurator.ApacheConfigurator):
def get_parser(self) -> "CentOSParser":
"""Initializes the ApacheParser"""
return CentOSParser(
self.options.server_root, self.options.vhost_root,
self.version, configurator=self)
self.options.server_root, self, self.options.vhost_root, self.version)
def _deploy_cert(self, *args: Any, **kwargs: Any): # pylint: disable=arguments-differ
"""
@ -111,7 +111,7 @@ class CentOSConfigurator(configurator.ApacheConfigurator):
"use, and run Certbot again.")
raise MisconfigurationError(msg)
else:
loadmod_args = path_args
loadmod_args = [arg for arg in path_args if arg]
centos_parser: CentOSParser = cast(CentOSParser, self.parser)
if centos_parser.not_modssl_ifmodule(noarg_path):

View file

@ -38,8 +38,7 @@ class FedoraConfigurator(configurator.ApacheConfigurator):
def get_parser(self) -> "FedoraParser":
"""Initializes the ApacheParser"""
return FedoraParser(
self.options.server_root, self.options.vhost_root,
self.version, configurator=self)
self.options.server_root, self, self.options.vhost_root, self.version)
def _try_restart_fedora(self) -> None:
"""

View file

@ -31,8 +31,7 @@ class GentooConfigurator(configurator.ApacheConfigurator):
def get_parser(self) -> "GentooParser":
"""Initializes the ApacheParser"""
return GentooParser(
self.options.server_root, self.options.vhost_root,
self.version, configurator=self)
self.options.server_root, self, self.options.vhost_root, self.version)
class GentooParser(parser.ApacheParser):

View file

@ -47,9 +47,8 @@ class ApacheParser:
arg_var_interpreter: Pattern = re.compile(r"\$\{[^ \}]*}")
fnmatch_chars: Set[str] = {"*", "?", "\\", "[", "]"}
def __init__(self, root: str, vhostroot: Optional[str] = None,
version: Tuple[int, ...] = (2, 4),
configurator: Optional["ApacheConfigurator"] = None) -> None:
def __init__(self, root: str, configurator: "ApacheConfigurator",
vhostroot: str, version: Tuple[int, ...] = (2, 4)) -> None:
# Note: Order is important here.
# Needed for calling save() with reverter functionality that resides in
@ -445,7 +444,8 @@ class ApacheParser:
else:
self.aug.set(aug_conf_path + "/directive[last()]/arg", args)
def add_dir_beginning(self, aug_conf_path: str, dirname: str, args: List[str]) -> None:
def add_dir_beginning(self, aug_conf_path: str, dirname: str,
args: Union[List[str], str]) -> None:
"""Adds the directive to the beginning of defined aug_conf_path.
:param str aug_conf_path: Augeas configuration path to add directive
@ -590,7 +590,7 @@ class ApacheParser:
allargs = self.aug.match(match + '*')
return [self.get_arg(arg) for arg in allargs]
def get_arg(self, match: Optional[str]) -> str:
def get_arg(self, match: Optional[str]) -> Optional[str]:
"""Uses augeas.get to get argument value and interprets result.
This also converts all variables and parameters appropriately.
@ -605,6 +605,7 @@ class ApacheParser:
# e.g. strip now, not later
if not value:
return None
value = value.strip("'\"")
variables = ApacheParser.arg_var_interpreter.findall(value)
@ -624,7 +625,7 @@ class ApacheParser:
"""
return get_aug_path(self.loc["root"])
def exclude_dirs(self, matches: Set[str]) -> List[str]:
def exclude_dirs(self, matches: Iterable[str]) -> List[str]:
"""Exclude directives that are not loaded into the configuration."""
filters = [("ifmodule", self.modules.keys()), ("ifdefine", self.variables)]
@ -686,7 +687,7 @@ class ApacheParser:
arg = os.path.normpath(arg)
return arg
def _get_include_path(self, arg: str) -> str:
def _get_include_path(self, arg: Optional[str]) -> Optional[str]:
"""Converts an Apache Include directive into Augeas path.
Converts an Apache Include directive argument into an Augeas
@ -706,6 +707,8 @@ class ApacheParser:
# if matchObj.group() != arg:
# logger.error("Error: Invalid regexp characters in %s", arg)
# return []
if arg is None:
return None
arg = self.standard_path_from_server_root(arg)
# Attempts to add a transform to the file if one does not already exist
@ -774,7 +777,7 @@ class ApacheParser:
self._add_httpd_transform(filepath)
self.aug.load()
def parsed_in_current(self, filep: str) -> bool:
def parsed_in_current(self, filep: Optional[str]) -> bool:
"""Checks if the file path is parsed by current Augeas parser config
ie. returns True if the file is found on a path that's found in live
Augeas configuration.
@ -784,9 +787,11 @@ class ApacheParser:
:returns: True if file is parsed in existing configuration tree
:rtype: bool
"""
if not filep:
return False
return self._parsed_by_parser_paths(filep, self.parser_paths)
def parsed_in_original(self, filep: str) -> bool:
def parsed_in_original(self, filep: Optional[str]) -> bool:
"""Checks if the file path is parsed by existing Apache config.
ie. returns True if the file is found on a path that matches Include or
IncludeOptional statement in the Apache configuration.
@ -796,6 +801,8 @@ class ApacheParser:
:returns: True if file is parsed in existing configuration tree
:rtype: bool
"""
if not filep:
return False
return self._parsed_by_parser_paths(filep, self.existing_paths)
def _parsed_by_parser_paths(self, filep: str, paths: Mapping[str, List[str]]) -> bool:

View file

@ -345,8 +345,8 @@ class ParserInitTest(util.ApacheTest):
self.config.config_test = mock.Mock()
self.assertRaises(
errors.NoInstallationError, ApacheParser,
os.path.relpath(self.config_path), "/dummy/vhostpath",
version=(2, 4, 22), configurator=self.config)
os.path.relpath(self.config_path), self.config,
"/dummy/vhostpath", version=(2, 4, 22))
def test_init_old_aug(self):
from certbot_apache._internal.parser import ApacheParser
@ -354,8 +354,8 @@ class ParserInitTest(util.ApacheTest):
mock_c.return_value = False
self.assertRaises(
errors.NotSupportedError,
ApacheParser, os.path.relpath(self.config_path),
"/dummy/vhostpath", version=(2, 4, 22), configurator=self.config)
ApacheParser, os.path.relpath(self.config_path), self.config,
"/dummy/vhostpath", version=(2, 4, 22))
@mock.patch("certbot_apache._internal.apache_util._get_runtime_cfg")
def test_unparseable(self, mock_cfg):
@ -363,8 +363,8 @@ class ParserInitTest(util.ApacheTest):
mock_cfg.return_value = ('Define: TEST')
self.assertRaises(
errors.PluginError,
ApacheParser, os.path.relpath(self.config_path),
"/dummy/vhostpath", version=(2, 2, 22), configurator=self.config)
ApacheParser, os.path.relpath(self.config_path), self.config,
"/dummy/vhostpath", version=(2, 2, 22))
def test_root_normalized(self):
from certbot_apache._internal.parser import ApacheParser
@ -375,7 +375,7 @@ class ParserInitTest(util.ApacheTest):
self.temp_dir,
"debian_apache_2_4/////multiple_vhosts/../multiple_vhosts/apache2")
parser = ApacheParser(path, "/dummy/vhostpath", configurator=self.config)
parser = ApacheParser(path, self.config, "/dummy/vhostpath")
self.assertEqual(parser.root, self.config_path)
@ -384,8 +384,7 @@ class ParserInitTest(util.ApacheTest):
with mock.patch("certbot_apache._internal.parser.ApacheParser."
"update_runtime_variables"):
parser = ApacheParser(
os.path.relpath(self.config_path),
"/dummy/vhostpath", configurator=self.config)
os.path.relpath(self.config_path), self.config, "/dummy/vhostpath")
self.assertEqual(parser.root, self.config_path)
@ -394,8 +393,7 @@ class ParserInitTest(util.ApacheTest):
with mock.patch("certbot_apache._internal.parser.ApacheParser."
"update_runtime_variables"):
parser = ApacheParser(
self.config_path + os.path.sep,
"/dummy/vhostpath", configurator=self.config)
self.config_path + os.path.sep, self.config, "/dummy/vhostpath")
self.assertEqual(parser.root, self.config_path)

View file

@ -73,7 +73,7 @@ class ParserTest(ApacheTest):
with mock.patch("certbot_apache._internal.parser.ApacheParser."
"update_runtime_variables"):
self.parser = ApacheParser(
self.config_path, self.vhost_path, configurator=self.config)
self.config_path, self.config, self.vhost_path)
def get_apache_configurator(

View file

@ -12,6 +12,8 @@ from typing import Iterable
from typing import List
from typing import Optional
from typing import Set
from typing import Type
from typing import TypeVar
from typing import Tuple
import pkg_resources
@ -244,6 +246,9 @@ class Configurator(Installer, interfaces.Authenticator, metaclass=ABCMeta):
"""
GenericAddr = TypeVar("GenericAddr", bound="Addr")
class Addr:
r"""Represents an virtual host address.
@ -256,7 +261,7 @@ class Addr:
self.ipv6 = ipv6
@classmethod
def fromstring(cls, str_addr: str) -> 'Addr':
def fromstring(cls: Type[GenericAddr], str_addr: str) -> GenericAddr:
"""Initialize Addr from string."""
if str_addr.startswith('['):
# ipv6 addresses starts with [
@ -301,7 +306,7 @@ class Addr:
"""Return port."""
return self.tup[1]
def get_addr_obj(self, port: str) -> "Addr":
def get_addr_obj(self: GenericAddr, port: str) -> GenericAddr:
"""Return new address object with same addr and new port."""
return self.__class__((self.tup[0], port), self.ipv6)