mirror of
https://github.com/certbot/certbot.git
synced 2026-06-08 16:22:18 -04:00
fix stuff and make changes to help monkeytype
This commit is contained in:
parent
5de37679ff
commit
13a61542fc
7 changed files with 68 additions and 82 deletions
|
|
@ -8,6 +8,7 @@ from unittest import mock
|
|||
import pytest
|
||||
|
||||
from certbot import errors
|
||||
from certbot._internal import lock
|
||||
from certbot.compat import os
|
||||
from certbot.tests import util as test_util
|
||||
|
||||
|
|
@ -23,32 +24,22 @@ else:
|
|||
|
||||
class LockDirTest(test_util.TempDirTestCase):
|
||||
"""Tests for certbot._internal.lock.lock_dir."""
|
||||
@classmethod
|
||||
def _call(cls, *args, **kwargs):
|
||||
from certbot._internal.lock import lock_dir
|
||||
return lock_dir(*args, **kwargs)
|
||||
|
||||
def test_it(self) -> None:
|
||||
assert_raises = functools.partial(
|
||||
self.assertRaises, errors.LockError, self._call, self.tempdir)
|
||||
self.assertRaises, errors.LockError, lock.lock_dir, self.tempdir)
|
||||
lock_path = os.path.join(self.tempdir, '.certbot.lock')
|
||||
test_util.lock_and_call(assert_raises, lock_path)
|
||||
|
||||
|
||||
class LockFileTest(test_util.TempDirTestCase):
|
||||
"""Tests for certbot._internal.lock.LockFile."""
|
||||
@classmethod
|
||||
def _call(cls, *args, **kwargs) -> LockFile:
|
||||
from certbot._internal.lock import LockFile
|
||||
return LockFile(*args, **kwargs)
|
||||
|
||||
def setUp(self) -> None:
|
||||
super().setUp()
|
||||
self.lock_path = os.path.join(self.tempdir, 'test.lock')
|
||||
|
||||
def test_acquire_without_deletion(self) -> None:
|
||||
# acquire the lock in another process but don't delete the file
|
||||
child = multiprocessing.Process(target=self._call,
|
||||
child = multiprocessing.Process(target=lock.LockFile,
|
||||
args=(self.lock_path,))
|
||||
child.start()
|
||||
child.join()
|
||||
|
|
@ -60,11 +51,11 @@ class LockFileTest(test_util.TempDirTestCase):
|
|||
|
||||
def test_contention(self) -> None:
|
||||
assert_raises = functools.partial(
|
||||
self.assertRaises, errors.LockError, self._call, self.lock_path)
|
||||
self.assertRaises, errors.LockError, lock.LockFile, self.lock_path)
|
||||
test_util.lock_and_call(assert_raises, self.lock_path)
|
||||
|
||||
def test_locked_repr(self) -> None:
|
||||
lock_file = self._call(self.lock_path)
|
||||
lock_file = lock.LockFile(self.lock_path)
|
||||
try:
|
||||
locked_repr = repr(lock_file)
|
||||
self._test_repr_common(lock_file, locked_repr)
|
||||
|
|
@ -73,13 +64,13 @@ class LockFileTest(test_util.TempDirTestCase):
|
|||
lock_file.release()
|
||||
|
||||
def test_released_repr(self) -> None:
|
||||
lock_file = self._call(self.lock_path)
|
||||
lock_file = lock.LockFile(self.lock_path)
|
||||
lock_file.release()
|
||||
released_repr = repr(lock_file)
|
||||
self._test_repr_common(lock_file, released_repr)
|
||||
assert 'released' in released_repr
|
||||
|
||||
def _test_repr_common(self, lock_file: LockFile, lock_repr: str) -> None:
|
||||
def _test_repr_common(self, lock_file: lock.LockFile, lock_repr: str) -> None:
|
||||
assert lock_file.__class__.__name__ in lock_repr
|
||||
assert self.lock_path in lock_repr
|
||||
|
||||
|
|
@ -100,11 +91,11 @@ class LockFileTest(test_util.TempDirTestCase):
|
|||
|
||||
with mock.patch('certbot._internal.lock.filesystem.os.stat') as mock_stat:
|
||||
mock_stat.side_effect = delete_and_stat
|
||||
self._call(self.lock_path)
|
||||
lock.LockFile(self.lock_path)
|
||||
assert len(should_delete) == 0
|
||||
|
||||
def test_removed(self) -> None:
|
||||
lock_file = self._call(self.lock_path)
|
||||
lock_file = lock.LockFile(self.lock_path)
|
||||
lock_file.release()
|
||||
assert not os.path.exists(self.lock_path)
|
||||
|
||||
|
|
@ -117,7 +108,7 @@ class LockFileTest(test_util.TempDirTestCase):
|
|||
with mock.patch(mocked_function) as mock_lock:
|
||||
mock_lock.side_effect = IOError(msg)
|
||||
try:
|
||||
self._call(self.lock_path)
|
||||
lock.LockFile(self.lock_path)
|
||||
except IOError as err:
|
||||
assert msg in str(err)
|
||||
else: # pragma: no cover
|
||||
|
|
@ -133,7 +124,7 @@ class LockFileTest(test_util.TempDirTestCase):
|
|||
with mock.patch(mock_function) as mock_os:
|
||||
mock_os.side_effect = OSError(msg)
|
||||
try:
|
||||
self._call(self.lock_path)
|
||||
lock.LockFile(self.lock_path)
|
||||
except OSError as err:
|
||||
assert msg in str(err)
|
||||
else: # pragma: no cover
|
||||
|
|
|
|||
|
|
@ -361,7 +361,7 @@ class PostArgParseExceptHookTest(unittest.TestCase):
|
|||
mock_logger, output = self._test_common(exc_type, debug=False)
|
||||
mock_logger.error.assert_called_once_with('Exiting due to user request.')
|
||||
|
||||
def _test_common(self, error_type: Union[Type[ValueError], Type[BaseException], Type[PluginError], Type[KeyboardInterrupt], Callable], debug: bool, quiet: bool=False) -> Tuple[MagicMock, str]:
|
||||
def _test_common(self, error_type: Union[Type[BaseException], Callable], debug: bool, quiet: bool=False) -> Tuple[MagicMock, str]:
|
||||
"""Returns the mocked logger and stderr output."""
|
||||
mock_err = io.StringIO()
|
||||
|
||||
|
|
@ -387,7 +387,7 @@ class PostArgParseExceptHookTest(unittest.TestCase):
|
|||
output = mock_err.getvalue()
|
||||
return mock_logger, output
|
||||
|
||||
def _assert_exception_logged(self, log_func: MagicMock, exc_type: Union[Type[Error], Type[ValueError], Type[PluginError], Type[BaseException]]) -> None:
|
||||
def _assert_exception_logged(self, log_func: MagicMock, exc_type: Type[BaseException]) -> None:
|
||||
assert log_func.called
|
||||
call_kwargs = log_func.call_args[1]
|
||||
assert 'exc_info' in call_kwargs
|
||||
|
|
|
|||
|
|
@ -20,7 +20,7 @@ import josepy as jose
|
|||
import pytest
|
||||
import pytz
|
||||
|
||||
from acme.messages import Error as acme_error
|
||||
#from acme.messages import Error as acme_error
|
||||
from certbot import configuration
|
||||
from certbot import crypto_util
|
||||
from certbot import errors
|
||||
|
|
@ -888,16 +888,16 @@ class DetermineAccountTest(test_util.ConfigTestCase):
|
|||
err_msg = "Some error message raised by Certbot"
|
||||
self._register_error_common(err_msg, errors.Error(err_msg))
|
||||
|
||||
def test_register_error_acme_type_and_detail(self):
|
||||
err_msg = ("Error returned by the ACME server: must agree to terms of service")
|
||||
exception = acme_error(typ = "urn:ietf:params:acme:error:malformed",
|
||||
detail = "must agree to terms of service")
|
||||
self._register_error_common(err_msg, exception)
|
||||
#def test_register_error_acme_type_and_detail(self):
|
||||
# err_msg = ("Error returned by the ACME server: must agree to terms of service")
|
||||
# exception = acme_error(typ = "urn:ietf:params:acme:error:malformed",
|
||||
# detail = "must agree to terms of service")
|
||||
# self._register_error_common(err_msg, exception)
|
||||
|
||||
def test_register_error_acme_type_only(self):
|
||||
err_msg = ("Error returned by the ACME server: The server experienced an internal error")
|
||||
exception = acme_error(typ = "urn:ietf:params:acme:error:serverInternal")
|
||||
self._register_error_common(err_msg, exception)
|
||||
#def test_register_error_acme_type_only(self):
|
||||
# err_msg = ("Error returned by the ACME server: The server experienced an internal error")
|
||||
# exception = acme_error(typ = "urn:ietf:params:acme:error:serverInternal")
|
||||
# self._register_error_common(err_msg, exception)
|
||||
|
||||
|
||||
class MainTest(test_util.ConfigTestCase):
|
||||
|
|
|
|||
|
|
@ -11,6 +11,7 @@ import pytest
|
|||
|
||||
from certbot import errors
|
||||
from certbot import interfaces
|
||||
from certbot._internal.plugins import disco
|
||||
from certbot._internal.plugins import null
|
||||
from certbot._internal.plugins import standalone
|
||||
from certbot._internal.plugins import webroot
|
||||
|
|
@ -41,12 +42,9 @@ class PluginEntryPointTest(unittest.TestCase):
|
|||
self.ep3 = pkg_resources.EntryPoint(
|
||||
"ep3", "a.ep3", dist=mock.MagicMock(key="p3"))
|
||||
|
||||
from certbot._internal.plugins.disco import PluginEntryPoint
|
||||
self.plugin_ep = PluginEntryPoint(EP_SA)
|
||||
self.plugin_ep = disco.PluginEntryPoint(EP_SA)
|
||||
|
||||
def test_entry_point_to_plugin_name_not_prefixed(self) -> None:
|
||||
from certbot._internal.plugins.disco import PluginEntryPoint
|
||||
|
||||
names = {
|
||||
self.ep1: "ep1",
|
||||
self.ep1prim: "ep1",
|
||||
|
|
@ -56,7 +54,7 @@ class PluginEntryPointTest(unittest.TestCase):
|
|||
}
|
||||
|
||||
for entry_point, name in names.items():
|
||||
assert name == PluginEntryPoint.entry_point_to_plugin_name(entry_point)
|
||||
assert name == disco.PluginEntryPoint.entry_point_to_plugin_name(entry_point)
|
||||
|
||||
def test_description(self) -> None:
|
||||
assert "server locally" in self.plugin_ep.description
|
||||
|
|
@ -162,22 +160,16 @@ class PluginEntryPointTest(unittest.TestCase):
|
|||
class PluginsRegistryTest(unittest.TestCase):
|
||||
"""Tests for certbot._internal.plugins.disco.PluginsRegistry."""
|
||||
|
||||
@classmethod
|
||||
def _create_new_registry(cls, plugins: Dict[str, Union[MagicMock, str]]) -> PluginsRegistry:
|
||||
from certbot._internal.plugins.disco import PluginsRegistry
|
||||
return PluginsRegistry(plugins)
|
||||
|
||||
def setUp(self) -> None:
|
||||
self.plugin_ep = mock.MagicMock()
|
||||
self.plugin_ep.name = "mock"
|
||||
self.plugin_ep.__hash__.side_effect = TypeError
|
||||
self.plugins = {self.plugin_ep.name: self.plugin_ep}
|
||||
self.reg = self._create_new_registry(self.plugins)
|
||||
self.reg = disco.PluginsRegistry(self.plugins)
|
||||
self.ep1 = pkg_resources.EntryPoint(
|
||||
"ep1", "p1.ep1", dist=mock.MagicMock(key="p1"))
|
||||
|
||||
def test_find_all(self) -> None:
|
||||
from certbot._internal.plugins.disco import PluginsRegistry
|
||||
with mock.patch("certbot._internal.plugins.disco.pkg_resources") as mock_pkg:
|
||||
mock_pkg.iter_entry_points.side_effect = [
|
||||
iter([EP_SA]), iter([EP_WR, self.ep1])
|
||||
|
|
@ -186,7 +178,7 @@ class PluginsRegistryTest(unittest.TestCase):
|
|||
mock_load.side_effect = [
|
||||
standalone.Authenticator, webroot.Authenticator,
|
||||
null.Installer, null.Installer]
|
||||
plugins = PluginsRegistry.find_all()
|
||||
plugins = disco.PluginsRegistry.find_all()
|
||||
assert plugins["sa"].plugin_cls is standalone.Authenticator
|
||||
assert plugins["sa"].entry_point is EP_SA
|
||||
assert plugins["wr"].plugin_cls is webroot.Authenticator
|
||||
|
|
@ -202,7 +194,7 @@ class PluginsRegistryTest(unittest.TestCase):
|
|||
assert ["mock"] == list(self.reg)
|
||||
|
||||
def test_len(self) -> None:
|
||||
assert 0 == len(self._create_new_registry({}))
|
||||
assert 0 == len(disco.PluginsRegistry({}))
|
||||
assert 1 == len(self.reg)
|
||||
|
||||
def test_init(self) -> None:
|
||||
|
|
@ -233,7 +225,7 @@ class PluginsRegistryTest(unittest.TestCase):
|
|||
c: mock.MagicMock(prepare=functools.partial(order.append, c))
|
||||
for c in string.ascii_letters
|
||||
}
|
||||
reg = self._create_new_registry(plugins)
|
||||
reg = disco.PluginsRegistry(plugins)
|
||||
reg.prepare()
|
||||
# order of prepare calls must be sorted to prevent deadlock
|
||||
# caused by plugins acquiring locks during prepare
|
||||
|
|
@ -257,11 +249,11 @@ class PluginsRegistryTest(unittest.TestCase):
|
|||
repr(self.reg)
|
||||
|
||||
def test_str(self) -> None:
|
||||
assert "No plugins" == str(self._create_new_registry({}))
|
||||
assert "No plugins" == str(disco.PluginsRegistry({}))
|
||||
self.plugin_ep.__str__ = lambda _: "Mock"
|
||||
assert "Mock" == str(self.reg)
|
||||
plugins = {self.plugin_ep.name: self.plugin_ep, "foo": "Bar"}
|
||||
reg = self._create_new_registry(plugins)
|
||||
reg = disco.PluginsRegistry(plugins)
|
||||
assert "Bar\n\nMock" == str(reg)
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -17,29 +17,31 @@ from certbot.plugins import dns_test_common
|
|||
from certbot.tests import util as test_util
|
||||
|
||||
|
||||
class _FakeDNSAuthenticator(dns_common.DNSAuthenticator):
|
||||
_setup_credentials = mock.MagicMock()
|
||||
_perform = mock.MagicMock()
|
||||
_cleanup = mock.MagicMock()
|
||||
|
||||
def more_info(self): # pylint: disable=missing-docstring,no-self-use
|
||||
return 'A fake authenticator for testing.'
|
||||
|
||||
|
||||
class _FakeConfig:
|
||||
fake_propagation_seconds = 0
|
||||
fake_config_key = 1
|
||||
fake_other_key = None
|
||||
fake_file_path = None
|
||||
|
||||
|
||||
class DNSAuthenticatorTest(test_util.TempDirTestCase, dns_test_common.BaseAuthenticatorTest):
|
||||
# pylint: disable=protected-access
|
||||
|
||||
class _FakeDNSAuthenticator(dns_common.DNSAuthenticator):
|
||||
_setup_credentials = mock.MagicMock()
|
||||
_perform = mock.MagicMock()
|
||||
_cleanup = mock.MagicMock()
|
||||
|
||||
def more_info(self): # pylint: disable=missing-docstring,no-self-use
|
||||
return 'A fake authenticator for testing.'
|
||||
|
||||
class _FakeConfig:
|
||||
fake_propagation_seconds = 0
|
||||
fake_config_key = 1
|
||||
fake_other_key = None
|
||||
fake_file_path = None
|
||||
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
|
||||
self.config = DNSAuthenticatorTest._FakeConfig()
|
||||
self.config = _FakeConfig()
|
||||
|
||||
self.auth = DNSAuthenticatorTest._FakeDNSAuthenticator(self.config, "fake")
|
||||
self.auth = _FakeDNSAuthenticator(self.config, "fake")
|
||||
|
||||
@test_util.patch_display_util()
|
||||
def test_perform(self, unused_mock_get_utility):
|
||||
|
|
@ -126,21 +128,22 @@ class DNSAuthenticatorTest(test_util.TempDirTestCase, dns_test_common.BaseAuthen
|
|||
self.auth.auth_hint([mock.MagicMock()])
|
||||
|
||||
|
||||
class _MockLoggingHandler(logging.Handler):
|
||||
messages = None
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
self.reset()
|
||||
super().__init__(*args, **kwargs)
|
||||
|
||||
def emit(self, record):
|
||||
self.messages[record.levelname.lower()].append(record.getMessage())
|
||||
|
||||
def reset(self):
|
||||
"""Allows the handler to be reset between tests."""
|
||||
self.messages = collections.defaultdict(list)
|
||||
|
||||
|
||||
class CredentialsConfigurationTest(test_util.TempDirTestCase):
|
||||
class _MockLoggingHandler(logging.Handler):
|
||||
messages = None
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
self.reset()
|
||||
super().__init__(*args, **kwargs)
|
||||
|
||||
def emit(self, record):
|
||||
self.messages[record.levelname.lower()].append(record.getMessage())
|
||||
|
||||
def reset(self):
|
||||
"""Allows the handler to be reset between tests."""
|
||||
self.messages = collections.defaultdict(list)
|
||||
|
||||
def test_valid_file(self):
|
||||
path = os.path.join(self.tempdir, 'too-permissive-file.ini')
|
||||
|
||||
|
|
@ -157,7 +160,7 @@ class CredentialsConfigurationTest(test_util.TempDirTestCase):
|
|||
dns_common.CredentialsConfiguration(path)
|
||||
|
||||
def test_valid_file_with_unsafe_permissions(self):
|
||||
log = self._MockLoggingHandler()
|
||||
log = _MockLoggingHandler()
|
||||
dns_common.logger.addHandler(log)
|
||||
|
||||
path = os.path.join(self.tempdir, 'too-permissive-file.ini')
|
||||
|
|
|
|||
|
|
@ -20,7 +20,7 @@ from unittest.mock import MagicMock, Mock
|
|||
class ConveniencePickPluginTest(unittest.TestCase):
|
||||
"""Tests for certbot._internal.plugins.selection.pick_*."""
|
||||
|
||||
def _test(self, fun: Callable, ifaces: Union[Tuple[Type[Authenticator], Type[Installer]], Tuple[Type[Installer]], Tuple[Type[Authenticator]]]) -> None:
|
||||
def _test(self, fun: Callable, ifaces: Tuple[Type[interfaces.Plugin], ...]) -> None:
|
||||
config = mock.Mock()
|
||||
default = mock.Mock()
|
||||
plugins = mock.Mock()
|
||||
|
|
|
|||
|
|
@ -35,7 +35,7 @@ class ServerManagerTest(unittest.TestCase):
|
|||
assert self.mgr.certs is self.certs
|
||||
assert self.mgr.http_01_resources is self.http_01_resources
|
||||
|
||||
def _test_run_stop(self, challenge_type: Type[HTTP01]) -> None:
|
||||
def _test_run_stop(self, challenge_type: Type[challenges.HTTP01]) -> None:
|
||||
server = self.mgr.run(port=0, challenge_type=challenge_type)
|
||||
port = server.getsocknames()[0][1]
|
||||
assert self.mgr.running() == {port: server}
|
||||
|
|
|
|||
Loading…
Reference in a new issue