Fix Pylint upgrade issues

* Remove unsupported pylint disable options
    * star-args removed in Pylint 1.4.3
    * abstract-class-little-used removed in Pylint 1.4.3

* Fixes new lint errors

* Copy dummy-variable-rgx expression to new ignored-argument-names expression to ignore unused funtion arguments

* Notable changes
    * Refactor to satisfy Pylint no-else-return warning
    * Fix Pylint inconsistent-return-statements warning
    * Refactor to satisfy consider-iterating-dictionary
    * Remove methods with only super call to satisfy useless-super-delegation
    * Refactor too-many-nested-statements where possible
    * Suppress type checked errors where member is dynamically added (notably derived from josepy.JSONObjectWithFields)
    * Remove None default of func parameter for ExitHandler and ErrorHandler

Resolves #5973
This commit is contained in:
James Payne 2018-05-14 16:33:30 +00:00
parent 24974b07ba
commit 5300d7d71f
90 changed files with 292 additions and 347 deletions

View file

@ -189,6 +189,9 @@ init-import=no
# not used).
dummy-variables-rgx=(unused)?_.*|dummy
# A regular expression matching ignored argument variable names
ignored-argument-names=(unused)?_.*|dummy
# List of additional names supposed to be defined in builtins. Remember that
# you should avoid to define new builtins when possible.
additional-builtins=

View file

@ -95,6 +95,7 @@ class _TokenChallenge(Challenge):
"""
# TODO: check that path combined with uri does not go above
# URI_ROOT_PATH!
# pylint: disable=unsupported-membership-test
return b'..' not in self.token and b'/' not in self.token
@ -142,7 +143,7 @@ class KeyAuthorizationChallengeResponse(ChallengeResponse):
@six.add_metaclass(abc.ABCMeta)
class KeyAuthorizationChallenge(_TokenChallenge):
# pylint: disable=abstract-class-little-used,too-many-ancestors
# pylint: disable=too-many-ancestors
"""Challenge based on Key Authorization.
:param response_cls: Subclass of `KeyAuthorizationChallengeResponse`
@ -174,7 +175,7 @@ class KeyAuthorizationChallenge(_TokenChallenge):
:rtype: KeyAuthorizationChallengeResponse
"""
return self.response_cls(
return self.response_cls( # pylint: disable=not-callable
key_authorization=self.key_authorization(account_key))
@abc.abstractmethod

View file

@ -6,7 +6,7 @@ import mock
import OpenSSL
import requests
from six.moves.urllib import parse as urllib_parse # pylint: disable=import-error
from six.moves.urllib import parse as urllib_parse # pylint: disable=import-error,relative-import
from acme import errors
from acme import test_util

View file

@ -6,16 +6,16 @@ from email.utils import parsedate_tz
import heapq
import logging
import time
import re
import sys
import six
from six.moves import http_client # pylint: disable=import-error
import josepy as jose
import OpenSSL
import re
from requests_toolbelt.adapters.source import SourceAddressAdapter
import requests
from requests.adapters import HTTPAdapter
import sys
from requests_toolbelt.adapters.source import SourceAddressAdapter
from acme import crypto_util
from acme import errors
@ -134,7 +134,7 @@ class ClientBase(object): # pylint: disable=too-many-instance-attributes
authzr = messages.AuthorizationResource(
body=messages.Authorization.from_json(response.json()),
uri=response.headers.get('Location', uri))
if identifier is not None and authzr.body.identifier != identifier:
if identifier is not None and authzr.body.identifier != identifier: # pylint: disable=no-member
raise errors.UnexpectedUpdate(authzr)
return authzr
@ -608,7 +608,7 @@ class ClientV2(ClientBase):
response = self._post(self.directory['newOrder'], order)
body = messages.Order.from_json(response.json())
authorizations = []
for url in body.authorizations:
for url in body.authorizations: # pylint: disable=not-an-iterable
authorizations.append(self._authzr_from_response(self.net.get(url), uri=url))
return messages.OrderResource(
body=body,
@ -640,7 +640,7 @@ class ClientV2(ClientBase):
for url in orderr.body.authorizations:
while datetime.datetime.now() < deadline:
authzr = self._authzr_from_response(self.net.get(url), uri=url)
if authzr.body.status != messages.STATUS_PENDING:
if authzr.body.status != messages.STATUS_PENDING: # pylint: disable=no-member
responses.append(authzr)
break
time.sleep(1)
@ -654,7 +654,7 @@ class ClientV2(ClientBase):
for chall in authzr.body.challenges:
if chall.error != None:
failed.append(authzr)
if len(failed) > 0:
if failed:
raise errors.ValidationError(failed)
return orderr.update(authorizations=responses)
@ -778,8 +778,7 @@ class BackwardsCompatibleClientV2(object):
for domain in dnsNames:
authorizations.append(self.client.request_domain_challenges(domain))
return messages.OrderResource(authorizations=authorizations, csr_pem=csr_pem)
else:
return self.client.new_order(csr_pem)
return self.client.new_order(csr_pem)
def finalize_order(self, orderr, deadline):
"""Finalize an order and obtain a certificate.
@ -812,12 +811,11 @@ class BackwardsCompatibleClientV2(object):
'certificate, please rerun the command for a new one.')
cert = OpenSSL.crypto.dump_certificate(
OpenSSL.crypto.FILETYPE_PEM, certr.body.wrapped).decode()
OpenSSL.crypto.FILETYPE_PEM, certr.body.wrapped).decode() # pylint: disable=no-member
chain = crypto_util.dump_pyopenssl_chain(chain).decode()
return orderr.update(fullchain_pem=(cert + chain))
else:
return self.client.finalize_order(orderr, deadline)
return self.client.finalize_order(orderr, deadline)
def revoke(self, cert, rsn):
"""Revoke certificate.
@ -835,8 +833,7 @@ class BackwardsCompatibleClientV2(object):
def _acme_version_from_directory(self, directory):
if hasattr(directory, 'newNonce'):
return 2
else:
return 1
return 1
class ClientNetwork(object): # pylint: disable=too-many-instance-attributes
@ -913,7 +910,6 @@ class ClientNetwork(object): # pylint: disable=too-many-instance-attributes
if self.account is not None:
kwargs["kid"] = self.account["uri"]
kwargs["key"] = self.key
# pylint: disable=star-args
return jws.JWS.sign(jobj, **kwargs).json_dumps(indent=2)
@classmethod
@ -1057,7 +1053,7 @@ class ClientNetwork(object): # pylint: disable=too-many-instance-attributes
if self.REPLAY_NONCE_HEADER in response.headers:
nonce = response.headers[self.REPLAY_NONCE_HEADER]
try:
decoded_nonce = jws.Header._fields['nonce'].decode(nonce)
decoded_nonce = jws.Header._fields['nonce'].decode(nonce) # pylint: disable=no-member
except jose.DeserializationError as error:
raise errors.BadNonce(nonce, error)
logger.debug('Storing nonce: %s', nonce)

View file

@ -63,7 +63,7 @@ class ClientTestBase(unittest.TestCase):
reg = messages.Registration(
contact=self.contact, key=KEY.public_key())
the_arg = dict(reg) # type: Dict
self.new_reg = messages.NewRegistration(**the_arg) # pylint: disable=star-args
self.new_reg = messages.NewRegistration(**the_arg)
self.regr = messages.RegistrationResource(
body=reg, uri='https://www.letsencrypt-demo.org/acme/reg/1')
@ -403,7 +403,7 @@ class ClientTest(ClientTestBase):
def test_answer_challenge(self):
self.response.links['up'] = {'url': self.challr.authzr_uri}
self.response.json.return_value = self.challr.body.to_json()
self.response.json.return_value = self.challr.body.to_json() # pylint: disable=no-member
chall_response = challenges.DNSResponse(validation=None)
@ -411,7 +411,7 @@ class ClientTest(ClientTestBase):
# TODO: split here and separate test
self.assertRaises(errors.UnexpectedUpdate, self.client.answer_challenge,
self.challr.body.update(uri='foo'), chall_response)
self.challr.body.update(uri='foo'), chall_response) # pylint: disable=no-member
def test_answer_challenge_missing_next(self):
self.assertRaises(
@ -465,7 +465,7 @@ class ClientTest(ClientTestBase):
self.client.retry_after(response=self.response, default=10))
def test_poll(self):
self.response.json.return_value = self.authzr.body.to_json()
self.response.json.return_value = self.authzr.body.to_json() # pylint: disable=no-member
self.assertEqual((self.authzr, self.response),
self.client.poll(self.authzr))
@ -694,7 +694,7 @@ class ClientV2Test(ClientTestBase):
def test_new_account(self):
self.response.status_code = http_client.CREATED
self.response.json.return_value = self.regr.body.to_json()
self.response.json.return_value = self.regr.body.to_json() # pylint: disable=no-member
self.response.headers['Location'] = self.regr.uri
self.assertEqual(self.regr, self.client.new_account(self.new_reg))
@ -743,7 +743,7 @@ class ClientV2Test(ClientTestBase):
def test_poll_authorizations_failure(self):
deadline = datetime.datetime(9999, 9, 9)
challb = self.challr.body.update(status=messages.STATUS_INVALID,
challb = self.challr.body.update(status=messages.STATUS_INVALID, # pylint: disable=no-member
error=messages.Error.with_code('unauthorized'))
authz = self.authz.update(status=messages.STATUS_INVALID, challenges=(challb,))
self.response.json.return_value = authz.to_json()
@ -799,7 +799,7 @@ class MockJSONDeSerializable(jose.JSONDeSerializable):
return {'foo': self.value}
@classmethod
def from_json(cls, value):
def from_json(cls, jobj):
pass # pragma: no cover
@ -829,7 +829,7 @@ class ClientNetworkTest(unittest.TestCase):
MockJSONDeSerializable('foo'), nonce=b'Tg', url="url",
acme_version=1)
jws = acme_jws.JWS.json_loads(jws_dump)
self.assertEqual(json.loads(jws.payload.decode()), {'foo': 'foo'})
self.assertEqual(json.loads(jws.payload.decode()), {'foo': 'foo'}) # pylint: disable=no-member
self.assertEqual(jws.signature.combined.nonce, b'Tg')
def test_wrap_in_jws_v2(self):
@ -839,7 +839,7 @@ class ClientNetworkTest(unittest.TestCase):
MockJSONDeSerializable('foo'), nonce=b'Tg', url="url",
acme_version=2)
jws = acme_jws.JWS.json_loads(jws_dump)
self.assertEqual(json.loads(jws.payload.decode()), {'foo': 'foo'})
self.assertEqual(json.loads(jws.payload.decode()), {'foo': 'foo'}) # pylint: disable=no-member
self.assertEqual(jws.signature.combined.nonce, b'Tg')
self.assertEqual(jws.signature.combined.kid, u'acct-uri')
self.assertEqual(jws.signature.combined.url, u'url')

View file

@ -138,7 +138,6 @@ def probe_sni(name, host, port=443, timeout=300,
host_protocol_agnostic = None if host == '::' or host == '0' else host
try:
# pylint: disable=star-args
logger.debug("Attempting to connect to %s:%d%s.", host_protocol_agnostic, port,
" from {0}:{1}".format(source_address[0], source_address[1]) if \
socket_kwargs else "")
@ -194,8 +193,7 @@ def _pyopenssl_cert_or_req_all_names(loaded_cert_or_req):
if common_name is None:
return sans
else:
return [common_name] + [d for d in sans if d != common_name]
return [common_name] + [d for d in sans if d != common_name]
def _pyopenssl_cert_or_req_san(cert_or_req):
"""Get Subject Alternative Names from certificate or CSR using pyOpenSSL.

View file

@ -24,7 +24,7 @@ class HeaderTest(unittest.TestCase):
def test_nonce_decoder(self):
from acme.jws import Header
nonce_field = Header._fields['nonce']
nonce_field = Header._fields['nonce'] # pylint: disable=no-member
self.assertRaises(
jose.DeserializationError, nonce_field.decode, self.wrong_nonce)

View file

@ -40,8 +40,7 @@ def is_acme_error(err):
"""Check if argument is an ACME error."""
if isinstance(err, Error) and (err.typ is not None):
return (ERROR_PREFIX in err.typ) or (OLD_ERROR_PREFIX in err.typ)
else:
return False
return False
@six.python_2_unicode_compatible
@ -96,6 +95,7 @@ class Error(jose.JSONObjectWithFields, errors.Error):
code = str(self.typ).split(':')[-1]
if code in ERROR_CODES:
return code
return None
def __str__(self):
return b' :: '.join(
@ -110,18 +110,19 @@ class _Constant(jose.JSONDeSerializable, collections.Hashable): # type: ignore
POSSIBLE_NAMES = NotImplemented
def __init__(self, name):
self.POSSIBLE_NAMES[name] = self
super(_Constant, self).__init__()
self.POSSIBLE_NAMES[name] = self # pylint: disable=unsupported-assignment-operation
self.name = name
def to_partial_json(self):
return self.name
@classmethod
def from_json(cls, value):
if value not in cls.POSSIBLE_NAMES:
def from_json(cls, jobj):
if jobj not in cls.POSSIBLE_NAMES: # pylint: disable=unsupported-membership-test
raise jose.DeserializationError(
'{0} not recognized'.format(cls.__name__))
return cls.POSSIBLE_NAMES[value]
return cls.POSSIBLE_NAMES[jobj] # pylint: disable=unsubscriptable-object
def __repr__(self):
return '{0}({1})'.format(self.__class__.__name__, self.name)
@ -179,7 +180,6 @@ class Directory(jose.JSONDeSerializable):
def __init__(self, **kwargs):
kwargs = dict((self._internal_name(k), v) for k, v in kwargs.items())
# pylint: disable=star-args
super(Directory.Meta, self).__init__(**kwargs)
@property
@ -291,7 +291,7 @@ class Registration(ResourceBody):
def _filter_contact(self, prefix):
return tuple(
detail[len(prefix):] for detail in self.contact
detail[len(prefix):] for detail in self.contact # pylint: disable=not-an-iterable
if detail.startswith(prefix))
@property
@ -363,7 +363,6 @@ class ChallengeBody(ResourceBody):
def __init__(self, **kwargs):
kwargs = dict((self._internal_name(k), v) for k, v in kwargs.items())
# pylint: disable=star-args
super(ChallengeBody, self).__init__(**kwargs)
def encode(self, name):
@ -446,7 +445,7 @@ class Authorization(ResourceBody):
def resolved_combinations(self):
"""Combinations with challenges instead of indices."""
return tuple(tuple(self.challenges[idx] for idx in combo)
for combo in self.combinations)
for combo in self.combinations) # pylint: disable=not-an-iterable
@Directory.register

View file

@ -82,7 +82,7 @@ class BaseDualNetworkedServers(object):
kwargs["ipv6"] = ip_version
new_address = (server_address[0],) + (port,) + server_address[2:]
new_args = (new_address,) + remaining_args
server = ServerClass(*new_args, **kwargs) # pylint: disable=star-args
server = ServerClass(*new_args, **kwargs)
except socket.error:
logger.debug("Failed to bind to %s:%s using %s", new_address[0],
new_address[1], "IPv6" if ip_version else "IPv4")
@ -91,7 +91,7 @@ class BaseDualNetworkedServers(object):
# If two servers are set up and port 0 was passed in, ensure we always
# bind to the same port for both servers.
port = server.socket.getsockname()[1]
if len(self.servers) == 0:
if not self.servers:
raise socket.error("Could not bind to IPv4 or IPv6.")
def serve_forever(self):

View file

@ -4,8 +4,8 @@
"""
import os
import pkg_resources
import unittest
import pkg_resources
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
@ -92,5 +92,4 @@ def skip_unless(condition, reason): # pragma: no cover
return unittest.skipUnless(condition, reason)
elif condition:
return lambda cls: cls
else:
return lambda cls: None
return lambda cls: None

View file

@ -3,12 +3,15 @@
import fnmatch
import logging
import os
import pkg_resources
import re
import six
import socket
import time
from collections import defaultdict
import pkg_resources
import six
import zope.component
import zope.interface
@ -32,8 +35,6 @@ from certbot_apache import obj
from certbot_apache import parser
from certbot_apache import tls_sni_01
from collections import defaultdict
logger = logging.getLogger(__name__)
@ -214,7 +215,7 @@ class ApacheConfigurator(augeas_configurator.AugeasConfigurator):
'.'.join(str(i) for i in self.version))
if self.version < (2, 2):
raise errors.NotSupportedError(
"Apache Version %s not supported.", str(self.version))
"Apache Version {0} not supported.".format(str(self.version)))
if not self._check_aug_version():
raise errors.NotSupportedError(
@ -245,7 +246,7 @@ class ApacheConfigurator(augeas_configurator.AugeasConfigurator):
except (OSError, errors.LockError):
logger.debug("Encountered error:", exc_info=True)
raise errors.PluginError(
"Unable to lock %s", self.conf("server-root"))
"Unable to lock {0}".format(self.conf("server-root")))
def _check_aug_version(self):
""" Checks that we have recent enough version of libaugeas.
@ -352,6 +353,7 @@ class ApacheConfigurator(augeas_configurator.AugeasConfigurator):
"""
if len(name.split(".")) == len(domain.split(".")):
return fnmatch.fnmatch(name, domain)
return None
def _choose_vhosts_wildcard(self, domain, create_ssl=True):
@ -678,7 +680,7 @@ class ApacheConfigurator(augeas_configurator.AugeasConfigurator):
if name:
all_names.add(name)
if len(vhost_macro) > 0:
if vhost_macro:
zope.component.getUtility(interfaces.IDisplay).notification(
"Apache mod_macro seems to be in use in file(s):\n{0}"
"\n\nUnfortunately mod_macro is not yet supported".format(
@ -1024,6 +1026,7 @@ class ApacheConfigurator(augeas_configurator.AugeasConfigurator):
# Ugly but takes care of protocol def, eg: 1.1.1.1:443 https
if listen.split(":")[-1].split(" ")[0] == port:
return True
return None
def prepare_https_modules(self, temp):
"""Helper method for prepare_server_https, taking care of enabling
@ -1170,8 +1173,7 @@ class ApacheConfigurator(augeas_configurator.AugeasConfigurator):
if fp.endswith(".conf"):
return fp[:-(len(".conf"))] + self.conf("le_vhost_ext")
else:
return fp + self.conf("le_vhost_ext")
return fp + self.conf("le_vhost_ext")
def _sift_rewrite_rule(self, line):
"""Decides whether a line should be copied to a SSL vhost.
@ -1391,8 +1393,7 @@ class ApacheConfigurator(augeas_configurator.AugeasConfigurator):
def _remove_directives(self, vh_path, directives):
for directive in directives:
while len(self.parser.find_dir(directive, None,
vh_path, False)) > 0:
while self.parser.find_dir(directive, None, vh_path, False):
directive_path = self.parser.find_dir(directive, None,
vh_path, False)
self.aug.remove(re.sub(r"/\w*$", "", directive_path[0]))

View file

@ -26,7 +26,7 @@ def select_vhost_multiple(vhosts):
return list()
tags_list = [vhost.display_repr()+"\n" for vhost in vhosts]
# Remove the extra newline from the last entry
if len(tags_list):
if tags_list:
tags_list[-1] = tags_list[-1][:-1]
code, names = zope.component.getUtility(interfaces.IDisplay).checklist(
"Which VirtualHosts would you like to install the wildcard certificate for?",
@ -62,8 +62,7 @@ def select_vhost(domain, vhosts):
code, tag = _vhost_menu(domain, vhosts)
if code == display_util.OK:
return vhosts[tag]
else:
return None
return None
def _vhost_menu(domain, vhosts):
"""Select an appropriate Apache Vhost.
@ -93,7 +92,7 @@ def _vhost_menu(domain, vhosts):
for vhost in vhosts:
if len(vhost.get_names()) == 1:
disp_name = next(iter(vhost.get_names()))
elif len(vhost.get_names()) == 0:
elif not vhost.get_names():
disp_name = ""
else:
disp_name = "Multiple Names"

View file

@ -26,7 +26,7 @@ class Addr(common.Addr):
def __repr__(self):
return "certbot_apache.obj.Addr(" + repr(self.tup) + ")"
def __hash__(self):
def __hash__(self): # pylint: disable=useless-super-delegation
# Python 3 requires explicit overridden for __hash__ if __eq__ or
# __cmp__ is overridden. See https://bugs.python.org/issue2235
return super(Addr, self).__hash__()
@ -47,8 +47,7 @@ class Addr(common.Addr):
return 0
elif self.get_addr() == "*":
return 1
else:
return 2
return 2
def conflicts(self, addr):
r"""Returns if address could conflict with correct function of self.

View file

@ -56,5 +56,5 @@ class CentOSParser(parser.ApacheParser):
def parse_sysconfig_var(self):
""" Parses Apache CLI options from CentOS configuration file """
defines = apache_util.parse_define_file(self.sysconfig_filep, "OPTIONS")
for k in defines.keys():
for k in defines:
self.variables[k] = defines[k]

View file

@ -51,7 +51,7 @@ class DebianConfigurator(configurator.ApacheConfigurator):
"""
if vhost.enabled:
return
return None
enabled_path = ("%s/sites-enabled/%s" %
(self.parser.root,
@ -68,7 +68,7 @@ class DebianConfigurator(configurator.ApacheConfigurator):
enabled_path) == vhost.filep:
# Already in shape
vhost.enabled = True
return
return None
else:
logger.warning(
"Could not symlink %s to %s, got error: %s", enabled_path,
@ -81,6 +81,7 @@ class DebianConfigurator(configurator.ApacheConfigurator):
vhost.enabled = True
logger.info("Enabling available site: %s", vhost.filep)
self.save_notes += "Enabled site %s\n" % vhost.filep
return None
def enable_mod(self, mod_name, temp=False):
# pylint: disable=unused-argument

View file

@ -56,7 +56,7 @@ class GentooParser(parser.ApacheParser):
""" Parses Apache CLI options from Gentoo configuration file """
defines = apache_util.parse_define_file(self.apacheconfig_filep,
"APACHE2_OPTS")
for k in defines.keys():
for k in defines:
self.variables[k] = defines[k]
def update_modules(self):

View file

@ -82,7 +82,7 @@ class ApacheParser(object):
:param str inc_path: path of file to include
"""
if len(self.find_dir(case_i("Include"), inc_path)) == 0:
if not self.find_dir(case_i("Include"), inc_path):
logger.debug("Adding Include %s to %s",
inc_path, get_aug_path(main_config))
self.add_dir(
@ -138,7 +138,7 @@ class ApacheParser(object):
mods.add(os.path.basename(mod_filename)[:-2] + "c")
else:
logger.debug("Could not read LoadModule directive from " +
"Augeas path: {0}".format(match_name[6:]))
"Augeas path: %s", match_name[6:])
self.modules.update(mods)
def update_runtime_variables(self):
@ -228,8 +228,8 @@ class ApacheParser(object):
"Error running command %s for runtime parameters!%s",
command, os.linesep)
raise errors.MisconfigurationError(
"Error accessing loaded Apache parameters: %s",
command)
"Error accessing loaded Apache parameters: {0}".format(
command))
# Small errors that do not impede
if proc.returncode != 0:
logger.warning("Error in checking parameter list: %s", stderr)
@ -255,12 +255,12 @@ class ApacheParser(object):
"""
filtered = []
if args == 1:
for i in range(len(matches)):
if matches[i].endswith("/arg"):
for i, match in enumerate(matches):
if match.endswith("/arg"):
filtered.append(matches[i][:-4])
else:
for i in range(len(matches)):
if matches[i].endswith("/arg[%d]" % args):
for i, match in enumerate(matches):
if match.endswith("/arg[%d]" % args):
# Make sure we don't cause an IndexError (end of list)
# Check to make sure arg + 1 doesn't exist
if (i == (len(matches) - 1) or
@ -305,7 +305,7 @@ class ApacheParser(object):
"""
if_mods = self.aug.match(("%s/IfModule/*[self::arg='%s']" %
(aug_conf_path, mod)))
if len(if_mods) == 0:
if not if_mods:
self.aug.set("%s/IfModule[last() + 1]" % aug_conf_path, "")
self.aug.set("%s/IfModule[last()]/arg" % aug_conf_path, mod)
if_mods = self.aug.match(("%s/IfModule/*[self::arg='%s']" %
@ -569,9 +569,8 @@ class ApacheParser(object):
if sys.version_info < (3, 6):
# This strips off final /Z(?ms)
return fnmatch.translate(clean_fn_match)[:-7]
else: # pragma: no cover
# Since Python 3.6, it returns a different pattern like (?s:.*\.load)\Z
return fnmatch.translate(clean_fn_match)[4:-3]
# Since Python 3.6, it returns a different pattern like (?s:.*\.load)\Z
return fnmatch.translate(clean_fn_match)[4:-3] # pragma: no cover
def parse_file(self, filepath):
"""Parse file with Augeas
@ -653,10 +652,7 @@ class ApacheParser(object):
use_new = False
else:
use_new = True
if new_file_match == "*":
remove_old = True
else:
remove_old = False
remove_old = new_file_match == "*"
except KeyError:
use_new = True
remove_old = False

View file

@ -340,6 +340,7 @@ class MultipleVhostsTest(util.ApacheTest):
"""Mock method for parser.find_dir"""
if directive == "Include" and argument.endswith("options-ssl-apache.conf"):
return ["/path/to/whatever"]
return None # pragma: no cover
mock_add = mock.MagicMock()
self.config.parser.add_dir = mock_add
@ -451,8 +452,7 @@ class MultipleVhostsTest(util.ApacheTest):
but an SSLCertificateKeyFile directive is missing."""
if "SSLCertificateFile" in args:
return ["example/cert.pem"]
else:
return []
return []
mock_find_dir = mock.MagicMock(return_value=[])
mock_find_dir.side_effect = side_effect
@ -1025,7 +1025,7 @@ class MultipleVhostsTest(util.ApacheTest):
# pylint: disable=protected-access
http_vh = self.config._get_http_vhost(ssl_vh)
self.assertTrue(http_vh.ssl == False)
self.assertFalse(http_vh.ssl)
@mock.patch("certbot.util.run_script")
@mock.patch("certbot.util.exe_exists")
@ -1390,7 +1390,7 @@ class MultipleVhostsTest(util.ApacheTest):
# pylint: disable=protected-access
cases = {u"*.example.org": True, b"*.x.example.org": True,
u"a.example.org": False, b"a.x.example.org": False}
for key in cases.keys():
for key in cases:
self.assertEqual(self.config._wildcard_domain(key), cases[key])
def test_choose_vhosts_wildcard(self):
@ -1514,7 +1514,7 @@ class AugeasVhostsTest(util.ApacheTest):
def test_choosevhost_works(self):
path = "debian_apache_2_4/augeas_vhosts/apache2/sites-available/old,default.conf"
chosen_vhost = self.config._create_vhost(path)
self.assertTrue(chosen_vhost == None or chosen_vhost.path == path)
self.assertTrue(chosen_vhost is None or chosen_vhost.path == path)
@mock.patch("certbot_apache.configurator.ApacheConfigurator._create_vhost")
def test_get_vhost_continue(self, mock_vhost):

View file

@ -14,7 +14,7 @@ class EntryPointTest(unittest.TestCase):
def test_get_configurator(self):
with mock.patch("certbot.util.get_os_info") as mock_info:
for distro in entrypoint.OVERRIDE_CLASSES.keys():
for distro in entrypoint.OVERRIDE_CLASSES:
mock_info.return_value = (distro, "whatever")
self.assertEqual(entrypoint.get_configurator(),
entrypoint.OVERRIDE_CLASSES[distro])
@ -23,7 +23,7 @@ class EntryPointTest(unittest.TestCase):
with mock.patch("certbot.util.get_os_info") as mock_info:
mock_info.return_value = ("nonexistent", "irrelevant")
with mock.patch("certbot.util.get_systemd_os_like") as mock_like:
for like in entrypoint.OVERRIDE_CLASSES.keys():
for like in entrypoint.OVERRIDE_CLASSES:
mock_like.return_value = [like]
self.assertEqual(entrypoint.get_configurator(),
entrypoint.OVERRIDE_CLASSES[like])

View file

@ -113,6 +113,7 @@ class MultipleVhostsTestGentoo(util.ApacheTest):
"""Mock httpd process stdout"""
if command == ['apache2ctl', 'modules']:
return mod_val
return None # pragma: no cover
mock_get.side_effect = mock_get_cfg
self.config.parser.modules = set()

View file

@ -1,7 +1,7 @@
"""Test for certbot_apache.http_01."""
import mock
import os
import unittest
import mock
from acme import challenges
from acme.magic_typing import List # pylint: disable=unused-import, no-name-in-module
@ -19,7 +19,7 @@ NUM_ACHALLS = 3
class ApacheHttp01Test(util.ApacheTest):
"""Test for certbot_apache.http_01.ApacheHttp01."""
def setUp(self, *args, **kwargs):
def setUp(self, *args, **kwargs): # pylint: disable=arguments-differ
super(ApacheHttp01Test, self).setUp(*args, **kwargs)
self.account_key = self.rsa512jwk
@ -77,7 +77,7 @@ class ApacheHttp01Test(util.ApacheTest):
calls = mock_enmod.call_args_list
other_calls = []
for call in calls:
if "rewrite" != call[0][0]:
if call[0][0] != "rewrite":
other_calls.append(call)
# If these lists are equal, we never enabled mod_rewrite

View file

@ -234,6 +234,7 @@ class BasicParserTest(util.ParserTest):
return inc_val
elif cmd[-1] == "DUMP_MODULES":
return mod_val
return None # pragma: no cover
mock_cfg.side_effect = mock_get_vars

View file

@ -4,14 +4,14 @@ import unittest
import mock
from six.moves import xrange # pylint: disable=redefined-builtin, import-error
from certbot import errors
from certbot.plugins import common_test
from certbot_apache import obj
from certbot_apache.tests import util
from six.moves import xrange # pylint: disable=redefined-builtin, import-error
class TlsSniPerformTest(util.ApacheTest):
"""Test the ApacheTlsSni01 challenge."""

View file

@ -115,8 +115,7 @@ def get_apache_configurator( # pylint: disable=too-many-arguments, too-many-loc
"""Mock default vhost path"""
if key == "vhost_root":
return vhost_path
else:
return orig_os_constant(key)
return orig_os_constant(key)
with mock.patch("certbot_apache.configurator.ApacheConfigurator.constant") as mock_cons:
mock_cons.side_effect = mock_os_constant

View file

@ -135,7 +135,7 @@ class ApacheTlsSni01(common.TLSSNI01):
return addrs
for addr in vhost.addrs:
if "_default_" == addr.get_addr():
if addr.get_addr() == "_default_":
addrs.add(default_addr)
else:
addrs.add(

View file

@ -88,8 +88,7 @@ class Proxy(object):
"""Returns the set of domain names that can be tested against"""
if self._test_names:
return self._test_names
else:
return {"example.com"}
return {"example.com"}
def deploy_cert(self, domain, cert_path, key_path, chain_path=None,
fullchain_path=None):

View file

@ -19,10 +19,6 @@ class Proxy(configurators_common.Proxy):
# pylint: disable=too-many-instance-attributes
"""A common base for Nginx test configurators"""
def __init__(self, args):
"""Initializes the plugin with the given command line args"""
super(Proxy, self).__init__(args)
def load_config(self):
"""Loads the next configuration for the plugin to test"""
config = super(Proxy, self).load_config()
@ -48,7 +44,7 @@ class Proxy(configurators_common.Proxy):
def _prepare_configurator(self):
"""Prepares the Nginx plugin for testing"""
for k in constants.CLI_DEFAULTS.keys():
for k in constants.CLI_DEFAULTS:
setattr(self.le_config, "nginx_" + k, constants.os_constant(k))
conf = configuration.NamespaceConfig(self.le_config)
@ -75,12 +71,19 @@ def _get_names(config):
all_names = set()
for root, _dirs, files in os.walk(config):
for this_file in files:
for line in open(os.path.join(root, this_file)):
if line.strip().startswith("server_name"):
names = line.partition("server_name")[2].rpartition(";")[0]
for n in names.split():
# Filter out wildcards in both all_names and test_names
if not n.startswith("*."):
all_names.add(n)
update_names = _get_server_names(root, this_file)
all_names.update(update_names)
non_ip_names = set(n for n in all_names if not util.IP_REGEX.match(n))
return all_names, non_ip_names
def _get_server_names(root, filename):
"""Returns all names in a config file path"""
all_names = set()
for line in open(os.path.join(root, filename)):
if line.strip().startswith("server_name"):
names = line.partition("server_name")[2].rpartition(";")[0]
for n in names.split():
# Filter out wildcards in both all_names and test_names
if not n.startswith("*."):
all_names.add(n)
return all_names

View file

@ -6,8 +6,9 @@ import certbot.interfaces
# pylint: disable=no-self-argument,no-method-argument
class IPluginProxy(zope.interface.Interface):
class IPluginProxy(zope.interface.Interface): # pylint: disable=inherit-non-class
"""Wraps a Certbot plugin"""
http_port = zope.interface.Attribute(
"The port to connect to on localhost for HTTP traffic")
@ -17,7 +18,7 @@ class IPluginProxy(zope.interface.Interface):
def add_parser_arguments(cls, parser):
"""Adds command line arguments needed by the parser"""
def __init__(args):
def __init__(args): # pylint: disable=super-init-not-called
"""Initializes the plugin with the given command line args"""
def cleanup_from_tests(): # type: ignore

View file

@ -240,9 +240,8 @@ def test_rollback(plugin, config, backup):
if _dirs_are_unequal(config, backup):
logger.error("*** Rollback failed for config `%s`", config)
return False
else:
logger.info("Rollback succeeded")
return True
logger.info("Rollback succeeded")
return True
def _create_backup(config, temp_dir):
@ -257,7 +256,7 @@ def _create_backup(config, temp_dir):
def _dirs_are_unequal(dir1, dir2):
"""Returns True if dir1 and dir2 are unequal"""
dircmps = [filecmp.dircmp(dir1, dir2)]
while len(dircmps):
while dircmps:
dircmp = dircmps.pop()
if dircmp.left_only or dircmp.right_only:
logger.error("The following files and directories are only "

View file

@ -35,7 +35,7 @@ def create_le_config(parent_dir):
config["domains"] = None
return argparse.Namespace(**config) # pylint: disable=star-args
return argparse.Namespace(**config)
def extract_configs(configs, parent_dir):

View file

@ -1,6 +1,6 @@
"""Tests for certbot_compatibility_test.validator."""
import requests
import unittest
import requests
import mock
import OpenSSL

View file

@ -159,7 +159,7 @@ class _CloudflareClient(object):
'you have supplied valid Cloudflare API credentials.{2}'
.format(code, e, ' ({0})'.format(hint) if hint else ''))
if len(zones) > 0:
if zones:
zone_id = zones[0]['id']
logger.debug('Found zone_id of %s for %s using name %s', zone_id, domain, zone_name)
return zone_id
@ -191,9 +191,9 @@ class _CloudflareClient(object):
logger.debug('Encountered CloudFlareAPIError getting TXT record_id: %s', e)
records = []
if len(records) > 0:
if records:
# Cleanup is returning the system to the state we found it. If, for some reason,
# there are multiple matching records, we only delete one because we only added one.
return records[0]['id']
else:
logger.debug('Unable to find TXT record.')
logger.debug('Unable to find TXT record.')
return None

View file

@ -154,7 +154,7 @@ class _DigitalOceanClient(object):
for guess in domain_name_guesses:
matches = [domain for domain in domains if domain.name == guess]
if len(matches) > 0:
if matches:
domain = matches[0]
logger.debug('Found base domain for %s using name %s', domain_name, guess)
return domain

View file

@ -79,7 +79,7 @@ class _DNSMadeEasyLexiconClient(dns_common_lexicon.LexiconClient):
def _handle_http_error(self, e, domain_name):
if domain_name in str(e) and str(e).startswith('404 Client Error: Not Found for url:'):
return
return None
hint = None
if str(e).startswith('403 Client Error: Forbidden for url:'):

View file

@ -274,7 +274,7 @@ class _GoogleClient(object):
raise errors.PluginError('Encountered error finding managed zone: {0}'
.format(e))
if len(zones) > 0:
if zones:
zone_id = zones[0]['id']
logger.debug('Found id of %s for %s using name %s', zone_id, domain, zone_name)
return zone_id
@ -303,5 +303,4 @@ class _GoogleClient(object):
if isinstance(content, bytes):
return content.decode()
else:
return content
return content

View file

@ -73,11 +73,10 @@ class _NS1LexiconClient(dns_common_lexicon.LexiconClient):
def _handle_http_error(self, e, domain_name):
if domain_name in str(e) and (str(e).startswith('404 Client Error: Not Found for url:') or \
str(e).startswith("400 Client Error: Bad Request for url:")):
return # Expected errors when zone name guess is wrong
else:
hint = None
if str(e).startswith('401 Client Error: Unauthorized for url:'):
hint = 'Is your API key correct?'
return None # Expected errors when zone name guess is wrong
hint = None
if str(e).startswith('401 Client Error: Unauthorized for url:'):
hint = 'Is your API key correct?'
return errors.PluginError('Error determining zone identifier: {0}.{1}'
.format(e, ' ({0})'.format(hint) if hint else ''))
return errors.PluginError('Error determining zone identifier: {0}.{1}'
.format(e, ' ({0})'.format(hint) if hint else ''))

View file

@ -220,4 +220,3 @@ class _RFC2136Client(object):
except Exception as e:
raise errors.PluginError('Encountered error when making query: {0}'
.format(e))

View file

@ -171,7 +171,7 @@ class RFC2136ClientTest(unittest.TestCase):
result = self.rfc2136_client._query_soa(DOMAIN)
query_mock.assert_called_with(mock.ANY, SERVER, port=PORT)
self.assertTrue(result == True)
self.assertTrue(result)
@mock.patch("dns.query.udp")
def test_query_soa_not_found(self, query_mock):
@ -181,7 +181,7 @@ class RFC2136ClientTest(unittest.TestCase):
result = self.rfc2136_client._query_soa(DOMAIN)
query_mock.assert_called_with(mock.ANY, SERVER, port=PORT)
self.assertTrue(result == False)
self.assertFalse(result)
@mock.patch("dns.query.udp")
def test_query_soa_wraps_errors(self, query_mock):

View file

@ -44,7 +44,7 @@ class Authenticator(dns_common.DNSAuthenticator):
def _setup_credentials(self):
pass
def _perform(self, domain, validation_domain_name, validation): # pylint: disable=missing-docstring
def _perform(self, domain, validation_name, validation): # pylint: disable=missing-docstring
pass
def perform(self, achalls):
@ -65,9 +65,9 @@ class Authenticator(dns_common.DNSAuthenticator):
raise errors.PluginError("\n".join([str(e), INSTRUCTIONS]))
return [achall.response(achall.account_key) for achall in achalls]
def _cleanup(self, domain, validation_domain_name, validation):
def _cleanup(self, domain, validation_name, validation):
try:
self._change_txt_record("DELETE", validation_domain_name, validation)
self._change_txt_record("DELETE", validation_name, validation)
except (NoCredentialsError, ClientError) as e:
logger.debug('Encountered error during cleanup: %s', e, exc_info=True)

View file

@ -13,6 +13,7 @@ import zope.interface
from acme import challenges
from acme import crypto_util as acme_crypto_util
from acme.magic_typing import List, Dict, Set # pylint: disable=unused-import, no-name-in-module
from certbot import constants as core_constants
from certbot import crypto_util
@ -29,7 +30,6 @@ from certbot_nginx import parser
from certbot_nginx import tls_sni_01
from certbot_nginx import http_01
from certbot_nginx import obj # pylint: disable=unused-import
from acme.magic_typing import List, Dict, Set # pylint: disable=unused-import, no-name-in-module
@ -157,7 +157,7 @@ class NginxConfigurator(common.Installer):
except (OSError, errors.LockError):
logger.debug('Encountered error:', exc_info=True)
raise errors.PluginError(
'Unable to lock %s', self.conf('server-root'))
'Unable to lock {0}'.format(self.conf('server-root')))
# Entry point in main.py for installing cert
@ -398,9 +398,8 @@ class NginxConfigurator(common.Installer):
rank = matches[0]['rank']
wildcards = [x for x in matches if x['rank'] == rank]
return max(wildcards, key=lambda x: len(x['name']))['vhost']
else:
# Exact or regex match
return matches[0]['vhost']
# Exact or regex match
return matches[0]['vhost']
def _rank_matches_by_name_and_ssl(self, vhost_list, target_name):
@ -479,25 +478,23 @@ class NginxConfigurator(common.Installer):
if matching_port == "" or matching_port is None:
# if no port is specified, Nginx defaults to listening on port 80.
return test_port == self.DEFAULT_LISTEN_PORT
else:
return test_port == matching_port
return test_port == matching_port
def _vhost_listening_on_port_no_ssl(self, vhost, port):
found_matching_port = False
if len(vhost.addrs) == 0:
if not vhost.addrs:
# if there are no listen directives at all, Nginx defaults to
# listening on port 80.
found_matching_port = (port == self.DEFAULT_LISTEN_PORT)
else:
for addr in vhost.addrs:
if self._port_matches(port, addr.get_port()) and addr.ssl == False:
if self._port_matches(port, addr.get_port()) and not addr.ssl:
found_matching_port = True
if found_matching_port:
# make sure we don't have an 'ssl on' directive
return not self.parser.has_ssl_on_directive(vhost)
else:
return False
return False
def _get_redirect_ranked_matches(self, target_name, port):
"""Gets a ranked list of plaintextish port-listening vhosts matching target_name
@ -586,7 +583,7 @@ class NginxConfigurator(common.Installer):
# If the vhost was implicitly listening on the default Nginx port,
# have it continue to do so.
if len(vhost.addrs) == 0:
if not vhost.addrs:
listen_block = [['\n ', 'listen', ' ', self.DEFAULT_LISTEN_PORT]]
self.parser.add_server_directives(vhost, listen_block)

View file

@ -22,7 +22,7 @@ def select_vhost_multiple(vhosts):
return list()
tags_list = [vhost.display_repr()+"\n" for vhost in vhosts]
# Remove the extra newline from the last entry
if len(tags_list):
if tags_list:
tags_list[-1] = tags_list[-1][:-1]
code, names = zope.component.getUtility(interfaces.IDisplay).checklist(
"Which server blocks would you like to modify?",

View file

@ -4,13 +4,13 @@ import logging
import os
from acme import challenges
from acme.magic_typing import List # pylint: disable=unused-import, no-name-in-module
from certbot import errors
from certbot.plugins import common
from certbot_nginx import obj
from certbot_nginx import nginxparser
from acme.magic_typing import List # pylint: disable=unused-import, no-name-in-module
logger = logging.getLogger(__name__)
@ -207,3 +207,4 @@ class NginxHttp01(common.ChallengePerformer):
' ', '$1', ' ', 'break']]
self.configurator.parser.add_server_directives(vhost,
rewrite_directive, insert_at_top=True)
return None

View file

@ -84,7 +84,7 @@ class Addr(common.Addr):
port = tup[2]
# The rest of the parts are options; we only care about ssl and default
while len(parts) > 0:
while parts:
nextpart = parts.pop()
if nextpart == 'ssl':
ssl = True
@ -120,7 +120,7 @@ class Addr(common.Addr):
def __repr__(self):
return "Addr(" + self.__str__() + ")"
def __hash__(self):
def __hash__(self): # pylint: disable=useless-super-delegation
# Python 3 requires explicit overridden for __hash__
# See certbot-apache/certbot_apache/obj.py for more information
return super(Addr, self).__hash__()
@ -224,15 +224,17 @@ class VirtualHost(object): # pylint: disable=too-few-public-methods
for a in self.addrs:
if a.ipv6:
return True
return False
def ipv4_enabled(self):
"""Return true if one or more of the listen directives in vhost are IPv4
only"""
if self.addrs is None or len(self.addrs) == 0:
if not self.addrs:
return True
for a in self.addrs:
if not a.ipv6:
return True
return False
def display_repr(self):
"""Return a representation of VHost to be used in dialog"""
@ -250,7 +252,7 @@ def _find_directive(directives, directive_name, match_content=None):
"""Find a directive of type directive_name in directives. If match_content is given,
Searches for `match_content` in the directive arguments.
"""
if not directives or isinstance(directives, six.string_types) or len(directives) == 0:
if not directives or isinstance(directives, six.string_types):
return None
# If match_content is None, just match on directive type. Otherwise, match on

View file

@ -4,8 +4,8 @@ import functools
import glob
import logging
import os
import pyparsing
import re
import pyparsing
import six
@ -52,6 +52,7 @@ class NginxParser(object):
:param str filepath: The path to the files to parse, as a glob
"""
# pylint: disable=too-many-nested-blocks
filepath = self.abs_path(filepath)
trees = self._parse_files(filepath)
for tree in trees:
@ -82,8 +83,7 @@ class NginxParser(object):
"""
if not os.path.isabs(path):
return os.path.join(self.root, path)
else:
return path
return path
def _build_addr_to_ssl(self):
"""Builds a map from address to whether it listens on ssl in any server block
@ -381,7 +381,7 @@ class NginxParser(object):
if only_directives is not None:
new_directives = nginxparser.UnspacedList([])
for directive in raw_in_parsed[1]:
if len(directive) > 0 and directive[0] in only_directives:
if directive and directive[0] in only_directives:
new_directives.append(directive)
raw_in_parsed[1] = new_directives
@ -394,7 +394,7 @@ class NginxParser(object):
addr.default = False
addr.ipv6only = False
for directive in enclosing_block[new_vhost.path[-1]][1]:
if len(directive) > 0 and directive[0] == 'listen':
if directive and directive[0] == 'listen':
if 'default_server' in directive:
del directive[directive.index('default_server')]
if 'default' in directive:
@ -460,19 +460,19 @@ def get_best_match(target_name, names):
elif _regex_match(target_name, name):
regex.append(name)
if len(exact) > 0:
if exact:
# There can be more than one exact match; e.g. eff.org, .eff.org
match = min(exact, key=len)
return ('exact', match)
if len(wildcard_start) > 0:
if wildcard_start:
# Return the longest wildcard
match = max(wildcard_start, key=len)
return ('wildcard_start', match)
if len(wildcard_end) > 0:
if wildcard_end:
# Return the longest wildcard
match = max(wildcard_end, key=len)
return ('wildcard_end', match)
if len(regex) > 0:
if regex:
# Just return the first one for now
match = regex[0]
return ('regex', match)
@ -517,10 +517,7 @@ def _regex_match(target_name, name):
# After tilde is a perl-compatible regex
try:
regex = re.compile(name[1:])
if re.match(regex, target_name):
return True
else:
return False
return re.match(regex, target_name)
except re.error: # pragma: no cover
# perl-compatible regexes are sometimes not recognized by python
return False

View file

@ -17,9 +17,6 @@ from acme.magic_typing import List # pylint: disable=unused-import, no-name-in-m
class NginxParserTest(util.NginxTest): #pylint: disable=too-many-public-methods
"""Nginx Parser Test."""
def setUp(self):
super(NginxParserTest, self).setUp()
def tearDown(self):
shutil.rmtree(self.temp_dir)
shutil.rmtree(self.config_dir)

View file

@ -1,9 +1,9 @@
"""Common utilities for certbot_nginx."""
import copy
import os
import pkg_resources
import tempfile
import unittest
import pkg_resources
import josepy as jose
import mock
@ -113,8 +113,7 @@ def contains_at_depth(haystack, needle, n):
return False
if n == 0:
return needle in haystack
else:
for item in haystack:
if contains_at_depth(item, needle, n - 1):
return True
return False
for item in haystack:
if contains_at_depth(item, needle, n - 1):
return True
return False

View file

@ -107,7 +107,7 @@ class AccountMemoryStorage(interfaces.AccountStorage):
def find_all(self):
return list(six.itervalues(self.accounts))
def save(self, account, acme):
def save(self, account, client):
# pylint: disable=unused-argument
if account.id in self.accounts:
logger.debug("Overwriting account: %s", account.id)
@ -193,8 +193,8 @@ class AccountFileStorage(interfaces.AccountStorage):
account_id, acc.id))
return acc
def save(self, account, acme):
self._save(account, acme, regr_only=False)
def save(self, account, client):
self._save(account, client, regr_only=False)
def save_regr(self, account, acme):
"""Save the registration resource.

View file

@ -374,7 +374,7 @@ def challb_to_achall(challb, account_key, domain):
return achallenges.DNS(challb=challb, domain=domain)
else:
raise errors.Error(
"Received unsupported challenge of type: %s", chall.typ)
"Received unsupported challenge of type: {0}".format(chall.typ))
def gen_challenge_path(challbs, preferences, combinations):
@ -405,8 +405,7 @@ def gen_challenge_path(challbs, preferences, combinations):
"""
if combinations:
return _find_smart_path(challbs, preferences, combinations)
else:
return _find_dumb_path(challbs, preferences)
return _find_dumb_path(challbs, preferences)
def _find_smart_path(challbs, preferences, combinations):

View file

@ -2,9 +2,9 @@
import datetime
import logging
import os
import pytz
import re
import traceback
import pytz
import zope.component
from certbot import crypto_util
@ -181,10 +181,9 @@ def _archive_files(candidate_lineage, filetype):
archive_dir = candidate_lineage.archive_dir
pattern = [os.path.join(archive_dir, f) for f in os.listdir(archive_dir)
if re.match("{0}[0-9]*.pem".format(filetype), f)]
if len(pattern) > 0:
if pattern:
return pattern
else:
return None
return None
def _acceptable_matches():
""" Generates the list that's passed to match_and_check_overlaps. Is its own function to

View file

@ -293,8 +293,7 @@ def config_help(name, hidden=False):
"""Extract the help message for an `.IConfig` attribute."""
if hidden:
return argparse.SUPPRESS
else:
return interfaces.IConfig[name].__doc__
return interfaces.IConfig[name].__doc__
class HelpfulArgumentGroup(object):
@ -510,7 +509,7 @@ class HelpfulArgumentParser(object):
# Help that are synonyms for --help subcommands
COMMANDS_TOPICS = ["command", "commands", "subcommand", "subcommands", "verbs"]
def _list_subcommands(self):
longest = max(len(v) for v in VERB_HELP_MAP.keys())
longest = max(len(v) for v in VERB_HELP_MAP)
text = "The full list of available SUBCOMMANDS is:\n\n"
for verb, props in sorted(VERB_HELP):
@ -538,7 +537,7 @@ class HelpfulArgumentParser(object):
apache_doc = "(the certbot apache plugin is not installed)"
usage = SHORT_USAGE
if help_arg == True:
if help_arg is True:
self.notify(usage + COMMAND_OVERVIEW % (apache_doc, nginx_doc) + HELP_USAGE)
sys.exit(0)
elif help_arg in self.COMMANDS_TOPICS:
@ -839,8 +838,7 @@ class HelpfulArgumentParser(object):
return dict([(t, True) for t in self.help_topics])
elif not chosen_topic:
return dict([(t, False) for t in self.help_topics])
else:
return dict([(t, t == chosen_topic) for t in self.help_topics])
return dict([(t, t == chosen_topic) for t in self.help_topics])
def _add_all_groups(helpful):
helpful.add_group("automation", description="Flags for automating execution & other tweaks")

View file

@ -293,7 +293,7 @@ class Client(object):
orderr = self._get_order_and_authorizations(csr.data, self.config.allow_subset_of_names)
authzr = orderr.authorizations
auth_domains = set(a.body.identifier.value for a in authzr)
auth_domains = set(a.body.identifier.value for a in authzr) # pylint: disable=not-an-iterable
successful_domains = [d for d in domains if d in auth_domains]
# allow_subset_of_names is currently disabled for wildcard
@ -362,11 +362,10 @@ class Client(object):
logger.debug("Dry run: Skipping creating new lineage for %s",
new_name)
return None
else:
return storage.RenewableCert.new_lineage(
new_name, cert,
key.pem, chain,
self.config)
return storage.RenewableCert.new_lineage(
new_name, cert,
key.pem, chain,
self.config)
def _choose_lineagename(self, domains, certname):
"""Chooses a name for the new lineage.
@ -385,8 +384,7 @@ class Client(object):
elif util.is_wildcard_domain(domains[0]):
# Don't make files and directories starting with *.
return domains[0][2:]
else:
return domains[0]
return domains[0]
def save_certificate(self, cert_pem, chain_pem,
cert_path, chain_path, fullchain_path):
@ -670,9 +668,8 @@ def _open_pem_file(cli_arg_path, pem_path):
if cli.set_by_cli(cli_arg_path):
return util.safe_open(pem_path, chmod=0o644, mode="wb"),\
os.path.abspath(pem_path)
else:
uniq = util.unique_file(pem_path, 0o644, "wb")
return uniq[0], os.path.abspath(uniq[1])
uniq = util.unique_file(pem_path, 0o644, "wb")
return uniq[0], os.path.abspath(uniq[1])
def _save_chain(chain_pem, chain_file):
"""Saves chain_pem at a unique path based on chain_path.

View file

@ -2,7 +2,7 @@
import copy
import os
from six.moves.urllib import parse # pylint: disable=import-error
from six.moves.urllib import parse # pylint: disable=import-error,relative-import
import zope.interface
from certbot import constants

View file

@ -83,8 +83,7 @@ def choose_account(accounts):
"Please choose an account", labels, force_interactive=True)
if code == display_util.OK:
return accounts[index]
else:
return None
return None
def choose_values(values, question=None):
"""Display screen to let user pick one or multiple values from the provided
@ -99,8 +98,7 @@ def choose_values(values, question=None):
question, tags=values, force_interactive=True)
if code == display_util.OK and items:
return items
else:
return []
return []
def choose_names(installer, question=None):
"""Display screen to select domains to validate.
@ -129,8 +127,7 @@ def choose_names(installer, question=None):
code, names = _filter_names(names, question)
if code == display_util.OK and names:
return names
else:
return []
return []
def get_valid_domains(domains):
@ -214,7 +211,7 @@ def _choose_names_manually(prompt_prefix=""):
except errors.ConfigurationError as e:
invalid_domains[domain] = str(e)
if len(invalid_domains):
if invalid_domains:
retry_message = (
"One or more of the entered domain names was not valid:"
"{0}{0}").format(os.linesep)

View file

@ -183,8 +183,7 @@ class FileDisplay(object):
if ans == "c" or ans == "C":
return CANCEL, "-1"
else:
return OK, ans
return OK, ans
def yesno(self, message, yes_label="Yes", no_label="No", default=None,
cli_flag=None, force_interactive=False, **unused_kwargs):
@ -260,7 +259,7 @@ class FileDisplay(object):
force_interactive=True)
if code == OK:
if len(ans.strip()) == 0:
if not ans.strip():
ans = " ".join(str(x) for x in range(1, len(tags)+1))
indices = separate_list_input(ans)
selected_tags = self._scrub_checklist_input(indices, tags)
@ -528,8 +527,7 @@ class NoninteractiveDisplay(object):
"""
if default is None:
self._interaction_fail(message, cli_flag)
else:
return OK, default
return OK, default
def yesno(self, message, yes_label=None, no_label=None,
default=None, cli_flag=None, **unused_kwargs):
@ -546,8 +544,7 @@ class NoninteractiveDisplay(object):
"""
if default is None:
self._interaction_fail(message, cli_flag)
else:
return default
return default
def checklist(self, message, tags, default=None,
cli_flag=None, **unused_kwargs):
@ -565,8 +562,7 @@ class NoninteractiveDisplay(object):
"""
if default is None:
self._interaction_fail(message, cli_flag, "? ".join(tags))
else:
return OK, default
return OK, default
def directory_select(self, message, default=None,
cli_flag=None, **unused_kwargs):

View file

@ -73,7 +73,7 @@ def _check_response(response):
logger.debug('Received response:\n%s', response.content)
try:
response.raise_for_status()
if response.json()['status'] == False:
if not response.json()['status']:
_report_failure('your e-mail address appears to be invalid')
except requests.exceptions.HTTPError:
_report_failure()

View file

@ -53,7 +53,7 @@ class ErrorHandler(object):
deferred until they finish.
"""
def __init__(self, func=None, *args, **kwargs):
def __init__(self, func, *args, **kwargs):
self.call_on_regular_exit = False
self.body_executed = False
self.funcs = []
@ -147,7 +147,6 @@ class ExitHandler(ErrorHandler):
In addition to cleaning up on all signals, also cleans up on
regular exit.
"""
def __init__(self, func=None, *args, **kwargs):
def __init__(self, func, *args, **kwargs):
ErrorHandler.__init__(self, func, *args, **kwargs)
self.call_on_regular_exit = True

View file

@ -73,7 +73,7 @@ class IPluginFactory(zope.interface.Interface):
description = zope.interface.Attribute("Short plugin description")
def __call__(config, name):
def __call__(config, name): # pylint: disable=signature-differs
"""Create new `IPlugin`.
:param IConfig config: Configuration.

View file

@ -180,8 +180,7 @@ class ColoredStreamHandler(logging.StreamHandler):
out = super(ColoredStreamHandler, self).format(record)
if self.colored and record.levelno >= self.red_level:
return ''.join((util.ANSI_SGR_RED, out, util.ANSI_SGR_RESET))
else:
return out
return out
class MemoryHandler(logging.handlers.MemoryHandler):

View file

@ -175,6 +175,7 @@ def _handle_subset_cert_request(config, domains, cert):
raise errors.Error(USER_CANCELLED)
# pylint: disable=inconsistent-return-statements
def _handle_identical_cert_request(config, lineage):
"""Figure out what to do if a lineage has the same names as a previously obtained one
@ -222,8 +223,7 @@ def _handle_identical_cert_request(config, lineage):
return "reinstall", lineage
elif response[1] == 1:
return "renew", lineage
else:
assert False, "This is impossible"
assert False, "This is imporssible"
def _find_lineage_for_domains(config, domains):
"""Determine whether there are duplicated names and how to handle
@ -262,6 +262,7 @@ def _find_lineage_for_domains(config, domains):
return _handle_identical_cert_request(config, ident_names_cert)
elif subset_names_cert is not None:
return _handle_subset_cert_request(config, domains, subset_names_cert)
return None, None
def _find_cert(config, domains, certname):
"""Finds an existing certificate object given domains and/or a certificate name.
@ -340,7 +341,7 @@ def _get_added_removed(after, before):
def _format_list(character, strings):
"""Format list with given character
"""
if len(strings) == 0:
if not strings:
formatted = "{br}(None)"
else:
formatted = "{br}{ch} " + "{br}{ch} ".join(strings)
@ -515,6 +516,7 @@ def _determine_account(config):
raise errors.Error(
"Registration cannot proceed without accepting "
"Terms of Service.")
return None
try:
acc, acme = client.register(
config, account_storage, tos_cb=_tos_cb)
@ -686,6 +688,7 @@ def unregister(config, unused_plugins):
account_files.delete(config.account)
reporter_util.add_message("Account deactivated.", reporter_util.MEDIUM_PRIORITY)
return None
def register(config, unused_plugins):
@ -710,7 +713,7 @@ def register(config, unused_plugins):
# registering a new account
if not config.update_registration:
if len(accounts) > 0:
if accounts:
# TODO: add a flag to register a duplicate account (this will
# also require extending _determine_account's behavior
# or else extracting the registration code from there)
@ -719,10 +722,10 @@ def register(config, unused_plugins):
"unsupported.")
# _determine_account will register an account
_determine_account(config)
return
return None
# --update-registration
if len(accounts) == 0:
if not accounts:
return "Could not find an existing account to update."
if config.email is None:
if config.register_unsafely_without_email:
@ -739,6 +742,7 @@ def register(config, unused_plugins):
account_storage.save_regr(acc, cb_client.acme)
eff.handle_subscription(config)
add_msg("Your e-mail address was updated to {0}.".format(config.email))
return None
def _install_cert(config, le_client, domains, lineage=None):
"""Install a cert
@ -802,6 +806,7 @@ def install(config, plugins):
raise errors.ConfigurationError("Path to certificate or key was not defined. "
"If your certificate is managed by Certbot, please use --cert-name "
"to define which certificate you would like to install.")
return None
def _populate_from_certname(config):
"""Helper function for install to populate missing config values from lineage
@ -911,6 +916,7 @@ def enhance(config, plugins):
config.chain_path = lineage.chain_path
le_client = _init_le_client(config, authenticator=None, installer=installer)
le_client.enhance_config(domains, config.chain_path, ask_redirect=False)
return None
def rollback(config, plugins):
@ -1050,6 +1056,7 @@ def revoke(config, unused_plugins): # TODO: coop with renewal config
return str(e)
display_ops.success_revocation(config.cert_path[0])
return None
def run(config, plugins): # pylint: disable=too-many-branches,too-many-locals
@ -1096,6 +1103,7 @@ def run(config, plugins): # pylint: disable=too-many-branches,too-many-locals
display_ops.success_renewal(domains)
_suggest_donation_if_appropriate(config)
return None
def _csr_get_and_save_cert(config, le_client):
@ -1280,7 +1288,7 @@ def set_displayer(config):
zope.component.provideUtility(displayer)
def main(cli_args=sys.argv[1:]):
def main(cli_args=None):
"""Command line argument parsing and main script execution.
:returns: result of requested command
@ -1289,6 +1297,8 @@ def main(cli_args=sys.argv[1:]):
:raises errors.Error: error if plugin command is not supported
"""
if cli_args is None:
cli_args = sys.argv[1:]
log.pre_arg_parse_setup()
plugins = plugins_disco.PluginsRegistry.find_all()

View file

@ -89,9 +89,8 @@ class RevocationChecker(object):
host = url.partition("://")[2].rstrip("/")
if host:
return url, host
else:
logger.info("Cannot process OCSP host from URL (%s) in cert at %s", url, cert_path)
return None, None
logger.info("Cannot process OCSP host from URL (%s) in cert at %s", url, cert_path)
return None, None
def _translate_ocsp_query(cert_path, ocsp_output, ocsp_errors):
"""Parse openssl's weird output to work out what it means."""
@ -117,4 +116,3 @@ def _translate_ocsp_query(cert_path, ocsp_output, ocsp_errors):
logger.warn("Unable to properly parse OCSP output: %s\nstderr:%s",
ocsp_output, ocsp_errors)
return False

View file

@ -299,9 +299,8 @@ class Addr(object):
# too long, truncate
addr_list = addr_list[0:len(result)]
append_to_end = False
for i in range(0, len(addr_list)):
block = addr_list[i]
if len(block) == 0:
for i, block in enumerate(addr_list):
if not block:
# encountered ::, so rest of the blocks should be
# appended to the end
append_to_end = True

View file

@ -2,11 +2,10 @@
import collections
import itertools
import logging
import pkg_resources
import six
from collections import OrderedDict
import zope.interface
import zope.interface.verify
@ -184,7 +183,11 @@ class PluginsRegistry(collections.Mapping):
# This prevents deadlock caused by plugins acquiring a lock
# and ensures at least one concurrent Certbot instance will run
# successfully.
self._plugins = OrderedDict(sorted(six.iteritems(plugins)))
# Pylint checks for super init, but also claims the super
# has no __init__member
# pylint: disable=super-init-not-called
self._plugins = collections.OrderedDict(sorted(six.iteritems(plugins)))
@classmethod
def find_all(cls):
@ -233,7 +236,6 @@ class PluginsRegistry(collections.Mapping):
def ifaces(self, *ifaces_groups):
"""Filter plugins based on interfaces."""
# pylint: disable=star-args
return self.filter(lambda p_ep: p_ep.ifaces(*ifaces_groups))
def verify(self, ifaces):
@ -269,8 +271,7 @@ class PluginsRegistry(collections.Mapping):
assert len(candidates) <= 1
if candidates:
return candidates[0]
else:
return None
return None
def __repr__(self):
return "{0}({1})".format(

View file

@ -83,7 +83,7 @@ class DNSAuthenticator(common.Plugin):
raise NotImplementedError()
@abc.abstractmethod
def _perform(self, domain, validation_domain_name, validation): # pragma: no cover
def _perform(self, domain, validation_name, validation): # pragma: no cover
"""
Performs a dns-01 challenge by creating a DNS TXT record.
@ -95,7 +95,7 @@ class DNSAuthenticator(common.Plugin):
raise NotImplementedError()
@abc.abstractmethod
def _cleanup(self, domain, validation_domain_name, validation): # pragma: no cover
def _cleanup(self, domain, validation_name, validation): # pragma: no cover
"""
Deletes the DNS TXT record which would have been created by `_perform_achall`.

View file

@ -95,3 +95,4 @@ class LexiconClient(object):
if not str(e).startswith('No domain found'):
return errors.PluginError('Unexpected error determining zone identifier for {0}: {1}'
.format(domain_name, e))
return None

View file

@ -22,10 +22,6 @@ class DNSAuthenticatorTest(util.TempDirTestCase, dns_test_common.BaseAuthenticat
_perform = mock.MagicMock()
_cleanup = mock.MagicMock()
def __init__(self, *args, **kwargs):
# pylint: disable=protected-access
super(DNSAuthenticatorTest._FakeDNSAuthenticator, self).__init__(*args, **kwargs)
def more_info(self): # pylint: disable=missing-docstring,no-self-use
return 'A fake authenticator for testing.'

View file

@ -137,8 +137,7 @@ class AuthenticatorTest(test_util.TempDirTestCase):
self.auth.cleanup([achall])
self.assertEqual(os.environ['CERTBOT_AUTH_OUTPUT'], 'foo')
self.assertEqual(os.environ['CERTBOT_DOMAIN'], achall.domain)
if (isinstance(achall.chall, challenges.HTTP01) or
isinstance(achall.chall, challenges.DNS01)):
if isinstance(achall.chall, (challenges.HTTP01, challenges.DNS01)):
self.assertEqual(
os.environ['CERTBOT_VALIDATION'],
achall.validation(achall.account_key))

View file

@ -82,8 +82,7 @@ def pick_plugin(config, default, plugins, question, ifaces):
plugin_ep = choose_plugin(list(six.itervalues(prepared)), question)
if plugin_ep is None:
return None
else:
return plugin_ep.init()
return plugin_ep.init()
elif len(prepared) == 1:
plugin_ep = list(prepared.values())[0]
logger.debug("Single candidate plugin: %s", plugin_ep)

View file

@ -225,7 +225,8 @@ class Authenticator(common.Plugin):
try:
return self._perform_single(achall)
except errors.StandaloneBindError as error:
_handle_perform_error(error)
if not _handle_perform_error(error):
raise
def _perform_single(self, achall):
if isinstance(achall.chall, challenges.HTTP01):
@ -282,5 +283,5 @@ def _handle_perform_error(error):
"Cancel", default=False)
if not should_retry:
raise errors.PluginError(msg)
else:
raise
return True
return False

View file

@ -1,8 +1,8 @@
"""Tests for certbot.plugins.storage.PluginStorage"""
import json
import mock
import os
import unittest
import mock
from certbot import errors

View file

@ -17,7 +17,7 @@ def get_prefixes(path):
"""
prefix = path
prefixes = []
while len(prefix) > 0:
while prefix:
prefixes.append(prefix)
prefix, _ = os.path.split(prefix)
# break once we hit '/'
@ -49,8 +49,7 @@ def path_surgery(cmd):
if util.exe_exists(cmd):
return True
else:
expanded = " expanded" if any(added) else ""
logger.warning("Failed to find executable %s in%s PATH: %s", cmd,
expanded, path)
return False
expanded = " expanded" if any(added) else ""
logger.warning("Failed to find executable %s in%s PATH: %s", cmd,
expanded, path)
return False

View file

@ -220,7 +220,7 @@ to serve all files under specified web root ({0})."""
self.performed[root_path].remove(achall)
not_removed = []
while len(self._created_dirs) > 0:
while self._created_dirs:
path = self._created_dirs.pop()
try:
os.rmdir(path)

View file

@ -146,7 +146,7 @@ class Reverter(object):
if not backups:
logger.info("Certbot has not saved backups of your configuration")
return
return None
# Make sure there isn't anything unexpected in the backup folder
# There should only be timestamped (float) directories
try:
@ -182,6 +182,7 @@ class Reverter(object):
return os.linesep.join(output)
zope.component.getUtility(interfaces.IDisplay).notification(
os.linesep.join(output), force_interactive=True, pause=False)
return None
def add_to_temp_checkpoint(self, save_files, save_notes):
"""Add files to temporary checkpoint.

View file

@ -5,11 +5,11 @@ import logging
import os
import re
import stat
import shutil
import configobj
import parsedatetime
import pytz
import shutil
import six
import certbot
@ -272,8 +272,7 @@ def full_archive_path(config_obj, cli_config, lineagename):
"""
if config_obj and "archive_dir" in config_obj:
return config_obj["archive_dir"]
else:
return os.path.join(cli_config.default_archive_dir, lineagename)
return os.path.join(cli_config.default_archive_dir, lineagename)
def _full_live_path(cli_config, lineagename):
"""Returns the full default live path for a lineagename"""
@ -481,8 +480,7 @@ class RenewableCert(object):
server = self.configuration["renewalparams"].get("server", None)
if server:
return util.is_staging(server)
else:
return False
return False
def _check_symlinks(self):
"""Raises an exception if a symlink doesn't exist"""
@ -671,9 +669,8 @@ class RenewableCert(object):
matches = pattern.match(os.path.basename(target))
if matches:
return int(matches.groups()[0])
else:
logger.debug("No matches for target %s.", kind)
return None
logger.debug("No matches for target %s.", kind)
return None
def version(self, kind, version):
"""The filename that corresponds to the specified version and kind.

View file

@ -42,7 +42,7 @@ def chall_to_challb(chall, status): # pylint: disable=redefined-outer-name
if status == messages.STATUS_VALID:
kwargs.update({"validated": datetime.datetime.now()})
return messages.ChallengeBody(**kwargs) # pylint: disable=star-args
return messages.ChallengeBody(**kwargs)
# Pending ChallengeBody objects
@ -93,7 +93,6 @@ def gen_authzr(authz_status, domain, challs, statuses, combos=True):
"status": authz_status,
})
# pylint: disable=star-args
return messages.AuthorizationResource(
uri="https://trusted.ca/new-authz-resource",
body=messages.Authorization(**authz_kwargs)

View file

@ -518,21 +518,18 @@ class ReportFailedChallsTest(unittest.TestCase):
self.assertTrue(kwargs["error"].description is not None)
self.http01 = achallenges.KeyAuthorizationAnnotatedChallenge(
# pylint: disable=star-args
challb=messages.ChallengeBody(**kwargs),
domain="example.com",
account_key="key")
kwargs["chall"] = acme_util.TLSSNI01
self.tls_sni_same = achallenges.KeyAuthorizationAnnotatedChallenge(
# pylint: disable=star-args
challb=messages.ChallengeBody(**kwargs),
domain="example.com",
account_key="key")
kwargs["error"] = messages.Error(typ="dnssec", detail="detail")
self.tls_sni_diff = achallenges.KeyAuthorizationAnnotatedChallenge(
# pylint: disable=star-args
challb=messages.ChallengeBody(**kwargs),
domain="foo.bar",
account_key="key")

View file

@ -205,9 +205,11 @@ class CertificatesTest(BaseCertManagerTest):
@mock.patch('certbot.cert_manager.ocsp.RevocationChecker.ocsp_revoked')
def test_report_human_readable(self, mock_revoked):
# pylint: disable=too-many-statements
mock_revoked.return_value = None
from certbot import cert_manager
import datetime, pytz
import datetime
import pytz
expiry = pytz.UTC.fromutc(datetime.datetime.utcnow())
cert = mock.MagicMock(lineagename="nameone")
@ -229,20 +231,20 @@ class CertificatesTest(BaseCertManagerTest):
# pylint: disable=protected-access
out = get_report()
self.assertTrue('1 hour(s)' in out)
self.assertTrue('VALID' in out and not 'INVALID' in out)
self.assertTrue('VALID' in out and 'INVALID' not in out)
cert.target_expiry += datetime.timedelta(days=1)
# pylint: disable=protected-access
out = get_report()
self.assertTrue('1 day' in out)
self.assertFalse('under' in out)
self.assertTrue('VALID' in out and not 'INVALID' in out)
self.assertTrue('VALID' in out and 'INVALID' not in out)
cert.target_expiry += datetime.timedelta(days=2)
# pylint: disable=protected-access
out = get_report()
self.assertTrue('3 days' in out)
self.assertTrue('VALID' in out and not 'INVALID' in out)
self.assertTrue('VALID' in out and 'INVALID' not in out)
cert.is_test_cert = True
mock_revoked.return_value = True

View file

@ -110,7 +110,6 @@ class ClientTestCommon(test_util.ConfigTestCase):
self.config.no_verify_ssl = False
self.config.allow_subset_of_names = False
# pylint: disable=star-args
self.account = mock.MagicMock(**{"key.pem": KEY})
from certbot.client import Client

View file

@ -189,9 +189,6 @@ class VerifyCertSetup(unittest.TestCase):
class VerifyRenewableCertTest(VerifyCertSetup):
"""Tests for certbot.crypto_util.verify_renewable_cert."""
def setUp(self):
super(VerifyRenewableCertTest, self).setUp()
def _call(self, renewable_cert):
from certbot.crypto_util import verify_renewable_cert
return verify_renewable_cert(renewable_cert)
@ -207,9 +204,6 @@ class VerifyRenewableCertTest(VerifyCertSetup):
class VerifyRenewableCertSigTest(VerifyCertSetup):
"""Tests for certbot.crypto_util.verify_renewable_cert."""
def setUp(self):
super(VerifyRenewableCertSigTest, self).setUp()
def _call(self, renewable_cert):
from certbot.crypto_util import verify_renewable_cert_sig
return verify_renewable_cert_sig(renewable_cert)
@ -225,9 +219,6 @@ class VerifyRenewableCertSigTest(VerifyCertSetup):
class VerifyFullchainTest(VerifyCertSetup):
"""Tests for certbot.crypto_util.verify_fullchain."""
def setUp(self):
super(VerifyFullchainTest, self).setUp()
def _call(self, renewable_cert):
from certbot.crypto_util import verify_fullchain
return verify_fullchain(renewable_cert)
@ -246,9 +237,6 @@ class VerifyFullchainTest(VerifyCertSetup):
class VerifyCertMatchesPrivKeyTest(VerifyCertSetup):
"""Tests for certbot.crypto_util.verify_cert_matches_priv_key."""
def setUp(self):
super(VerifyCertMatchesPrivKeyTest, self).setUp()
def _call(self, renewable_cert):
from certbot.crypto_util import verify_cert_matches_priv_key
return verify_cert_matches_priv_key(renewable_cert.cert, renewable_cert.privkey)

View file

@ -43,7 +43,7 @@ class GetEmailTest(unittest.TestCase):
mock_input.return_value = (display_util.OK, "foo@bar.baz")
with mock.patch("certbot.display.ops.util.safe_email") as mock_safe_email:
mock_safe_email.return_value = True
self.assertTrue(self._call() is "foo@bar.baz")
self.assertTrue(self._call() == "foo@bar.baz")
@test_util.patch_get_utility("certbot.display.ops.z_util")
def test_ok_not_safe(self, mock_get_utility):
@ -51,7 +51,7 @@ class GetEmailTest(unittest.TestCase):
mock_input.return_value = (display_util.OK, "foo@bar.baz")
with mock.patch("certbot.display.ops.util.safe_email") as mock_safe_email:
mock_safe_email.side_effect = [False, True]
self.assertTrue(self._call() is "foo@bar.baz")
self.assertTrue(self._call() == "foo@bar.baz")
@test_util.patch_get_utility("certbot.display.ops.z_util")
def test_invalid_flag(self, mock_get_utility):

View file

@ -226,7 +226,6 @@ class FileOutputDisplayTest(unittest.TestCase):
@mock.patch("certbot.display.util.input_with_timeout")
def test_directory_select(self, mock_input):
# pylint: disable=star-args
args = ["msg", "/var/www/html", "--flag", True]
user_input = "/var/www/html"
mock_input.return_value = user_input
@ -315,7 +314,9 @@ class FileOutputDisplayTest(unittest.TestCase):
def test_methods_take_force_interactive(self):
# Every IDisplay method implemented by FileDisplay must take
# force_interactive to prevent workflow regressions.
for name in interfaces.IDisplay.names(): # pylint: disable=no-member
# Use pylint code for disable to keep on single line under line length limit
for name in interfaces.IDisplay.names(): # pylint: disable=no-member,E1120
arg_spec = inspect.getargspec(getattr(self.displayer, name))
self.assertTrue("force_interactive" in arg_spec.args)
@ -374,7 +375,9 @@ class NoninteractiveDisplayTest(unittest.TestCase):
# should take **kwargs because every method of FileDisplay must
# take force_interactive which doesn't apply to
# NoninteractiveDisplay.
for name in interfaces.IDisplay.names(): # pylint: disable=no-member
# Use pylint code for disable to keep on single line under line length limit
for name in interfaces.IDisplay.names(): # pylint: disable=no-member,E1120
method = getattr(self.displayer, name)
# asserts method accepts arbitrary keyword arguments
self.assertFalse(inspect.getargspec(method).keywords is None)

View file

@ -1,8 +1,8 @@
"""Tests for certbot.eff."""
import requests
import unittest
import mock
import requests
from certbot import constants
import certbot.tests.util as test_util

View file

@ -280,7 +280,6 @@ class PreArgParseExceptHookTest(unittest.TestCase):
@mock.patch('certbot.log.post_arg_parse_except_hook')
def test_it(self, mock_post_arg_parse_except_hook):
# pylint: disable=star-args
memory_handler = mock.MagicMock()
args = ('some', 'args',)
kwargs = {'some': 'kwargs'}
@ -356,7 +355,6 @@ class PostArgParseExceptHookTest(unittest.TestCase):
mock_logger.error.side_effect = write_err
with mock.patch('certbot.log.sys.stderr', mock_err):
try:
# pylint: disable=star-args
self._call(
*exc_info, debug=debug, log_path=self.log_path)
except SystemExit as exit_err:
@ -406,13 +404,13 @@ class ExitWithLogPathTest(test_util.TempDirTestCase):
self.assertTrue('logfiles' in err_str)
self.assertTrue(self.tempdir in err_str)
# pylint: disable=inconsistent-return-statements
def _test_common(self, *args, **kwargs):
try:
self._call(*args, **kwargs)
except SystemExit as err:
return str(err)
else: # pragma: no cover
self.fail('SystemExit was not raised.')
self.fail('SystemExit was not raised.') # pragma: no cover
if __name__ == "__main__":

View file

@ -4,12 +4,12 @@
from __future__ import print_function
import itertools
import mock
import os
import shutil
import traceback
import unittest
import datetime
import mock
import pytz
import josepy as jose
@ -606,8 +606,7 @@ class MainTest(test_util.ConfigTestCase): # pylint: disable=too-many-public-met
fn.endswith("chain") or
fn.endswith("privkey")):
return True
else:
return orig_open(fn, *args, **kwargs)
return orig_open(fn, *args, **kwargs)
with mock.patch("os.path.isfile") as mock_if:
mock_if.side_effect = mock_isfile

View file

@ -90,7 +90,7 @@ class OCSPTest(unittest.TestCase):
@mock.patch('certbot.ocsp.logger')
@mock.patch('certbot.util.run_script')
def test_translate_ocsp(self, mock_run, mock_log):
# pylint: disable=protected-access,star-args
# pylint: disable=protected-access
mock_run.return_value = openssl_confused
from certbot import ocsp
self.assertEqual(ocsp._translate_ocsp_query(*openssl_happy), False)

View file

@ -1,6 +1,6 @@
"""Tests for certbot.renewal"""
import mock
import unittest
import mock
from acme import challenges
@ -12,9 +12,6 @@ import certbot.tests.util as test_util
class RenewalTest(test_util.ConfigTestCase):
def setUp(self):
super(RenewalTest, self).setUp()
@mock.patch('certbot.cli.set_by_cli')
def test_ancient_webroot_renewal_conf(self, mock_set_by_cli):
mock_set_by_cli.return_value = False
@ -34,9 +31,6 @@ class RenewalTest(test_util.ConfigTestCase):
class RestoreRequiredConfigElementsTest(test_util.ConfigTestCase):
"""Tests for certbot.renewal.restore_required_config_elements."""
def setUp(self):
super(RestoreRequiredConfigElementsTest, self).setUp()
@classmethod
def _call(cls, *args, **kwargs):
from certbot.renewal import restore_required_config_elements

View file

@ -1,7 +1,7 @@
"""Tests for certbot.reporter."""
import mock
import sys
import unittest
import mock
import six

View file

@ -5,10 +5,10 @@
"""
import multiprocessing
import os
import pkg_resources
import shutil
import tempfile
import unittest
import pkg_resources
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
@ -101,8 +101,7 @@ def skip_unless(condition, reason): # pragma: no cover
return unittest.skipUnless(condition, reason)
elif condition:
return lambda cls: cls
else:
return lambda cls: None
return lambda cls: None
def make_lineage(config_dir, testfile):
@ -254,13 +253,13 @@ class FreezableMock(object):
if name in ('return_value', 'side_effect'):
return setattr(self._mock, name, value)
else:
return object.__setattr__(self, name, value)
return object.__setattr__(self, name, value)
def _create_get_utility_mock():
display = FreezableMock()
for name in interfaces.IDisplay.names(): # pylint: disable=no-member
# Use pylint code for disable to keep on single line under line length limit
for name in interfaces.IDisplay.names(): # pylint: disable=no-member,E1120
if name != 'notification':
frozen_mock = FreezableMock(frozen=True, func=_assert_valid_call)
setattr(display, name, frozen_mock)
@ -284,7 +283,8 @@ def _create_get_utility_mock_with_stdout(stdout):
display = FreezableMock()
for name in interfaces.IDisplay.names(): # pylint: disable=no-member
# Use pylint code for disable to keep on single line under line length limit
for name in interfaces.IDisplay.names(): # pylint: disable=no-member,E1120
if name == 'notification':
frozen_mock = FreezableMock(frozen=True,
func=_write_msg)
@ -306,7 +306,6 @@ def _assert_valid_call(*args, **kwargs):
assert_kwargs['cli_flag'] = kwargs.get('cli_flag', None)
assert_kwargs['force_interactive'] = kwargs.get('force_interactive', False)
# pylint: disable=star-args
display_util.assert_valid_call(*assert_args, **assert_kwargs)

View file

@ -551,7 +551,7 @@ class OsInfoTest(unittest.TestCase):
comm_mock = mock.Mock()
comm_attrs = {'communicate.return_value':
('42.42.42', 'error')}
comm_mock.configure_mock(**comm_attrs) # pylint: disable=star-args
comm_mock.configure_mock(**comm_attrs)
popen_mock.return_value = comm_mock
self.assertEqual(get_os_info()[0], 'darwin')
self.assertEqual(get_os_info()[1], '42.42.42')
@ -607,7 +607,7 @@ class AtexitRegisterTest(unittest.TestCase):
self.assertTrue(mock_atexit.register.called)
args, kwargs = mock_atexit.register.call_args
atexit_func = args[0]
atexit_func(*args[1:], **kwargs) # pylint: disable=star-args
atexit_func(*args[1:], **kwargs)
if __name__ == "__main__":

View file

@ -2,6 +2,7 @@
import argparse
import atexit
import collections
from collections import OrderedDict
# distutils.version under virtualenv confuses pylint
# For more info, see: https://github.com/PyCQA/pylint/issues/73
import distutils.version # pylint: disable=import-error,no-name-in-module
@ -10,14 +11,12 @@ import logging
import os
import platform
import re
import six
import socket
import stat
import subprocess
import sys
from collections import OrderedDict
import six
import configargparse
from certbot import constants
@ -217,7 +216,6 @@ def safe_open(path, mode="w", chmod=None, buffering=None):
defaults if ``None``.
"""
# pylint: disable=star-args
open_args = () if chmod is None else (chmod,)
fdopen_args = () if buffering is None else (buffering,)
return os.fdopen(
@ -465,9 +463,8 @@ def safe_email(email):
"""Scrub email address before using it."""
if EMAIL_REGEX.match(email) is not None:
return not email.startswith(".") and ".." not in email
else:
logger.warning("Invalid email address: %s.", email)
return False
logger.warning("Invalid email address: %s.", email)
return False
class _ShowWarning(argparse.Action):

View file

@ -2,11 +2,11 @@
import argparse
import functools
import os
import pkg_resources
import subprocess
import tarfile
import tempfile
import unittest
import pkg_resources
import mock
# six is used in mock.patch()