Fixing some errors.

This commit is contained in:
Mads Jensen 2021-12-30 21:42:48 +01:00
parent 2c496138bc
commit b1c5362929
5 changed files with 21 additions and 23 deletions

View file

@ -242,7 +242,7 @@ class AugeasDirectiveNode(AugeasParserNode):
"""
return tuple(self._aug_get_params(self.metadata["augeaspath"]))
def _aug_get_params(self, path: str) -> List[str]:
def _aug_get_params(self, path: str) -> List[Optional[str]]:
"""Helper function to get parameters for DirectiveNodes and BlockNodes"""
arg_paths = self.parser.aug.match(path + "/arg")

View file

@ -31,7 +31,6 @@ from certbot.compat import os
from certbot.display import util as display_util
from certbot.interfaces import RenewableCert
from certbot.plugins import common
from certbot.plugins.common import Addr
from certbot.plugins.enhancements import AutoHSTSEnhancement
from certbot.plugins.util import path_surgery
from certbot_apache._internal import apache_util
@ -258,7 +257,6 @@ class ApacheConfigurator(common.Configurator):
# These will be set in the prepare function
self._prepared: bool = False
self.parser_root: Optional[dualparser.DualBlockNode] = None
self.parser: parser.ApacheParser
self.version = version
self._openssl_version = openssl_version
self.vhosts: List[obj.VirtualHost]
@ -628,8 +626,7 @@ class ApacheConfigurator(common.Configurator):
def _deploy_cert(
self, vhost: obj.VirtualHost, cert_path: str, key_path: str,
chain_path: str,
fullchain_path: str
chain_path: Optional[str] = None, fullchain_path: Optional[str] = None,
) -> None:
"""
Helper function for deploy_cert() that handles the actual deployment
@ -904,7 +901,7 @@ class ApacheConfigurator(common.Configurator):
return util.get_filtered_names(all_names)
def get_name_from_ip(self, addr: Addr) -> str:
def get_name_from_ip(self, addr: obj.Addr) -> str:
"""Returns a reverse dns name if available.
:param addr: IP Address
@ -976,7 +973,7 @@ class ApacheConfigurator(common.Configurator):
:rtype: :class:`~certbot_apache._internal.obj.VirtualHost`
"""
addrs: Set[Addr] = set()
addrs: Set[obj.Addr] = set()
try:
args = self.parser.aug.match(path + "/arg")
except RuntimeError:
@ -1658,8 +1655,8 @@ class ApacheConfigurator(common.Configurator):
vh_contents[content_index] = line[:line_index]
break
def _update_ssl_vhosts_addrs(self, vh_path: str) -> Set[Addr]:
ssl_addrs: Set[Addr] = set()
def _update_ssl_vhosts_addrs(self, vh_path: str) -> Set[obj.Addr]:
ssl_addrs: Set[obj.Addr] = set()
ssl_addr_p: List[str] = self.parser.aug.match(vh_path + "/arg")
for addr in ssl_addr_p:
@ -1890,7 +1887,7 @@ class ApacheConfigurator(common.Configurator):
raise
def _autohsts_increase(
self, vhost: Optional[obj.VirtualHost], id_str: str, nextstep: Union[int, float]
self, vhost: obj.VirtualHost, id_str: str, nextstep: Union[int, float]
) -> None:
"""Increase the AutoHSTS max-age value
@ -2236,15 +2233,14 @@ class ApacheConfigurator(common.Configurator):
"RewriteRule", None, start=vhost.path)
return bool(rewrite_path)
def _is_rewrite_engine_on(self, vhost: obj.VirtualHost) -> Union[str, bool]:
def _is_rewrite_engine_on(self, vhost: obj.VirtualHost) -> Optional[Union[str, bool]]:
"""Checks if a RewriteEngine directive is on
:param vhost: vhost to check
:type vhost: :class:`~certbot_apache._internal.obj.VirtualHost`
"""
rewrite_engine_path_list = self.parser.find_dir("RewriteEngine", "on",
start=vhost.path)
rewrite_engine_path_list = self.parser.find_dir("RewriteEngine", "on", start=vhost.path)
if rewrite_engine_path_list:
for re_path in rewrite_engine_path_list:
# A RewriteEngine directive may also be included in per
@ -2265,7 +2261,9 @@ class ApacheConfigurator(common.Configurator):
self.parser.aug.load()
# Make a new vhost data structure and add it to the lists
new_vhost = self._create_vhost(parser.get_aug_path(self._escape(redirect_filepath)))
new_vhost: obj.VirtualHost = self._create_vhost(parser.get_aug_path(
self._escape(redirect_filepath))
)
self.vhosts.append(new_vhost)
self._enhanced_vhosts["redirect"].add(new_vhost)
@ -2354,7 +2352,7 @@ class ApacheConfigurator(common.Configurator):
return None
def _get_proposed_addrs(self, vhost: obj.VirtualHost, port: str = "80") -> Iterable[Addr]:
def _get_proposed_addrs(self, vhost: obj.VirtualHost, port: str = "80") -> Iterable[obj.Addr]:
"""Return all addrs of vhost with the port replaced with the specified.
:param obj.VirtualHost ssl_vhost: Original Vhost
@ -2553,7 +2551,7 @@ class ApacheConfigurator(common.Configurator):
def _update_responses(
self,
responses: List[Optional[challenges.HTTP01Response]],
responses: Sequence[challenges.HTTP01Response],
chall_response: List[Challenge],
chall_doer: http_01.ApacheHttp01
) -> None:

View file

@ -3,6 +3,8 @@ import errno
import logging
from typing import Any
from typing import List
from typing import Optional
from typing import Sequence
from typing import Set
from typing import TYPE_CHECKING
@ -67,7 +69,7 @@ class ApacheHttp01(common.ChallengePerformer):
"http_challenges")
self.moded_vhosts: Set[VirtualHost] = set()
def perform(self) -> List[HTTP01Response]:
def perform(self) -> Sequence[Optional[HTTP01Response]]:
"""Perform all HTTP-01 challenges."""
if not self.achalls:
return []

View file

@ -1,6 +1,5 @@
"""Module contains classes used by the Apache Configurator."""
import re
from typing import AbstractSet
from typing import Any
from typing import Iterable
from typing import Optional
@ -127,7 +126,7 @@ class VirtualHost:
strip_name: Pattern = re.compile(r"^(?:.+://)?([^ :$]*)")
def __init__(
self, filepath: str, path: str, addrs: Set["Addr"],
self, filepath: str, path: Optional[str], addrs: Set[Addr],
ssl: bool, enabled: bool, name: Optional[str] = None,
aliases: Optional[Set[str]] = None, modmacro: bool = False,
ancestor: Optional["VirtualHost"] = None, node = None):

View file

@ -3,6 +3,7 @@ import copy
import fnmatch
import logging
import re
from typing import Collection
from typing import Dict
from typing import Iterable
from typing import KeysView
@ -436,7 +437,7 @@ class ApacheParser:
:type args: list or str
"""
self.aug.set(aug_conf_path + "/directive[last() + 1]", directive)
self.aug.set(aug_conf_path or "" + "/directive[last() + 1]", directive)
if isinstance(args, list):
for i, value in enumerate(args, 1):
self.aug.set(
@ -637,9 +638,7 @@ class ApacheParser:
valid_matches.append(match)
return valid_matches
def _pass_filter(
self, match: str, filter_: Union[Tuple[str, KeysView[str]], Tuple[str, Dict[str, str]]]
) -> bool:
def _pass_filter(self, match: str, filter_: Tuple[str, Collection[str]]) -> bool:
"""Determine if directive passes a filter.
:param str match: Augeas path