mirror of
https://github.com/certbot/certbot.git
synced 2026-05-28 04:34:11 -04:00
Merge pull request #3375 from yan12125/python3-certbot-unittests
Enable unit tests of certbot core on Python 3
This commit is contained in:
commit
8aadacbbb3
26 changed files with 142 additions and 99 deletions
|
|
@ -8,6 +8,7 @@ import socket
|
|||
from cryptography.hazmat.primitives import serialization
|
||||
import pyrfc3339
|
||||
import pytz
|
||||
import six
|
||||
import zope.component
|
||||
|
||||
from acme import fields as acme_fields
|
||||
|
|
@ -108,7 +109,7 @@ class AccountMemoryStorage(interfaces.AccountStorage):
|
|||
self.accounts = initial_accounts if initial_accounts is not None else {}
|
||||
|
||||
def find_all(self):
|
||||
return self.accounts.values()
|
||||
return list(six.itervalues(self.accounts))
|
||||
|
||||
def save(self, account):
|
||||
if account.id in self.accounts:
|
||||
|
|
|
|||
|
|
@ -246,8 +246,7 @@ class Client(object):
|
|||
domains,
|
||||
self.config.allow_subset_of_names)
|
||||
|
||||
auth_domains = set(a.body.identifier.value.encode('ascii')
|
||||
for a in authzr)
|
||||
auth_domains = set(a.body.identifier.value for a in authzr)
|
||||
domains = [d for d in domains if d in auth_domains]
|
||||
|
||||
# Create CSR from names
|
||||
|
|
@ -317,7 +316,7 @@ class Client(object):
|
|||
self.config.strict_permissions)
|
||||
|
||||
cert_pem = OpenSSL.crypto.dump_certificate(
|
||||
OpenSSL.crypto.FILETYPE_PEM, certr.body.wrapped)
|
||||
OpenSSL.crypto.FILETYPE_PEM, certr.body.wrapped).decode('ascii')
|
||||
|
||||
cert_file, abs_cert_path = _open_pem_file('cert_path', cert_path)
|
||||
|
||||
|
|
|
|||
|
|
@ -10,6 +10,7 @@ import traceback
|
|||
|
||||
import OpenSSL
|
||||
import pyrfc3339
|
||||
import six
|
||||
import zope.component
|
||||
|
||||
from acme import crypto_util as acme_crypto_util
|
||||
|
|
@ -115,16 +116,16 @@ def make_csr(key_str, domains, must_staple=False):
|
|||
# TODO: put SAN if len(domains) > 1
|
||||
extensions = [
|
||||
OpenSSL.crypto.X509Extension(
|
||||
"subjectAltName",
|
||||
b"subjectAltName",
|
||||
critical=False,
|
||||
value=", ".join("DNS:%s" % d for d in domains)
|
||||
value=", ".join("DNS:%s" % d for d in domains).encode('ascii')
|
||||
)
|
||||
]
|
||||
if must_staple:
|
||||
extensions.append(OpenSSL.crypto.X509Extension(
|
||||
"1.3.6.1.5.5.7.1.24",
|
||||
b"1.3.6.1.5.5.7.1.24",
|
||||
critical=False,
|
||||
value="DER:30:03:02:01:05"))
|
||||
value=b"DER:30:03:02:01:05"))
|
||||
req.add_extensions(extensions)
|
||||
req.set_version(2)
|
||||
req.set_pubkey(pkey)
|
||||
|
|
@ -350,7 +351,7 @@ def dump_pyopenssl_chain(chain, filetype=OpenSSL.crypto.FILETYPE_PEM):
|
|||
if isinstance(cert, jose.ComparableX509):
|
||||
# pylint: disable=protected-access
|
||||
cert = cert.wrapped
|
||||
return OpenSSL.crypto.dump_certificate(filetype, cert)
|
||||
return OpenSSL.crypto.dump_certificate(filetype, cert).decode('ascii')
|
||||
|
||||
# assumes that OpenSSL.crypto.dump_certificate includes ending
|
||||
# newline character
|
||||
|
|
@ -395,8 +396,14 @@ def _notAfterBefore(cert_path, method):
|
|||
with open(cert_path) as f:
|
||||
x509 = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
|
||||
f.read())
|
||||
# pyopenssl always returns bytes
|
||||
timestamp = method(x509)
|
||||
reformatted_timestamp = [timestamp[0:4], "-", timestamp[4:6], "-",
|
||||
timestamp[6:8], "T", timestamp[8:10], ":",
|
||||
timestamp[10:12], ":", timestamp[12:]]
|
||||
return pyrfc3339.parse("".join(reformatted_timestamp))
|
||||
reformatted_timestamp = [timestamp[0:4], b"-", timestamp[4:6], b"-",
|
||||
timestamp[6:8], b"T", timestamp[8:10], b":",
|
||||
timestamp[10:12], b":", timestamp[12:]]
|
||||
timestamp_str = b"".join(reformatted_timestamp)
|
||||
# pyrfc3339 uses "native" strings. That is, bytes on Python 2 and unicode
|
||||
# on Python 3
|
||||
if six.PY3:
|
||||
timestamp_str = timestamp_str.decode('ascii')
|
||||
return pyrfc3339.parse(timestamp_str)
|
||||
|
|
|
|||
|
|
@ -180,7 +180,12 @@ def _choose_names_manually():
|
|||
try:
|
||||
domain_list[i] = util.enforce_domain_sanity(domain)
|
||||
except errors.ConfigurationError as e:
|
||||
invalid_domains[domain] = e.message
|
||||
try: # Python 2
|
||||
# pylint: disable=no-member
|
||||
err_msg = e.message.encode('utf-8')
|
||||
except AttributeError:
|
||||
err_msg = str(e)
|
||||
invalid_domains[domain] = err_msg
|
||||
|
||||
if len(invalid_domains):
|
||||
retry_message = (
|
||||
|
|
|
|||
|
|
@ -4,6 +4,7 @@ import os
|
|||
import textwrap
|
||||
|
||||
import dialog
|
||||
import six
|
||||
import zope.interface
|
||||
|
||||
from certbot import interfaces
|
||||
|
|
@ -253,7 +254,7 @@ class FileDisplay(object):
|
|||
"{line}{frame}{line}{msg}{line}{frame}{line}".format(
|
||||
line=os.linesep, frame=side_frame, msg=message))
|
||||
if pause:
|
||||
raw_input("Press Enter to Continue")
|
||||
six.moves.input("Press Enter to Continue")
|
||||
|
||||
def menu(self, message, choices, ok_label="", cancel_label="",
|
||||
help_label="", **unused_kwargs):
|
||||
|
|
@ -295,7 +296,7 @@ class FileDisplay(object):
|
|||
:rtype: tuple
|
||||
|
||||
"""
|
||||
ans = raw_input(
|
||||
ans = six.moves.input(
|
||||
textwrap.fill(
|
||||
"%s (Enter 'c' to cancel): " % message,
|
||||
80,
|
||||
|
|
@ -330,7 +331,7 @@ class FileDisplay(object):
|
|||
os.linesep, frame=side_frame, msg=message))
|
||||
|
||||
while True:
|
||||
ans = raw_input("{yes}/{no}: ".format(
|
||||
ans = six.moves.input("{yes}/{no}: ".format(
|
||||
yes=_parens_around_char(yes_label),
|
||||
no=_parens_around_char(no_label)))
|
||||
|
||||
|
|
@ -468,7 +469,7 @@ class FileDisplay(object):
|
|||
input_msg = ("Press 1 [enter] to confirm the selection "
|
||||
"(press 'c' to cancel): ")
|
||||
while selection < 1:
|
||||
ans = raw_input(input_msg)
|
||||
ans = six.moves.input(input_msg)
|
||||
if ans.startswith("c") or ans.startswith("C"):
|
||||
return CANCEL, -1
|
||||
try:
|
||||
|
|
|
|||
|
|
@ -255,4 +255,4 @@ class PluginsRegistry(collections.Mapping):
|
|||
def __str__(self):
|
||||
if not self._plugins:
|
||||
return "No plugins"
|
||||
return "\n\n".join(str(p_ep) for p_ep in self._plugins.itervalues())
|
||||
return "\n\n".join(str(p_ep) for p_ep in six.itervalues(self._plugins))
|
||||
|
|
|
|||
|
|
@ -3,6 +3,7 @@ import unittest
|
|||
|
||||
import mock
|
||||
import pkg_resources
|
||||
import six
|
||||
import zope.interface
|
||||
|
||||
from certbot import errors
|
||||
|
|
@ -50,7 +51,7 @@ class PluginEntryPointTest(unittest.TestCase):
|
|||
EP_SA: "sa",
|
||||
}
|
||||
|
||||
for entry_point, name in names.iteritems():
|
||||
for entry_point, name in six.iteritems(names):
|
||||
self.assertEqual(
|
||||
name, PluginEntryPoint.entry_point_to_plugin_name(entry_point))
|
||||
|
||||
|
|
|
|||
|
|
@ -51,7 +51,7 @@ class AuthenticatorTest(unittest.TestCase):
|
|||
@mock.patch("certbot.plugins.manual.zope.component.getUtility")
|
||||
@mock.patch("certbot.plugins.manual.sys.stdout")
|
||||
@mock.patch("acme.challenges.HTTP01Response.simple_verify")
|
||||
@mock.patch("__builtin__.raw_input")
|
||||
@mock.patch("six.moves.input")
|
||||
def test_perform(self, mock_raw_input, mock_verify, mock_stdout, mock_interaction):
|
||||
mock_verify.return_value = True
|
||||
mock_interaction().yesno.return_value = True
|
||||
|
|
|
|||
|
|
@ -4,6 +4,7 @@ from __future__ import print_function
|
|||
import os
|
||||
import logging
|
||||
|
||||
import six
|
||||
import zope.component
|
||||
|
||||
from certbot import errors
|
||||
|
|
@ -78,7 +79,7 @@ def pick_plugin(config, default, plugins, question, ifaces):
|
|||
|
||||
if len(prepared) > 1:
|
||||
logger.debug("Multiple candidate plugins: %s", prepared)
|
||||
plugin_ep = choose_plugin(prepared.values(), question)
|
||||
plugin_ep = choose_plugin(list(six.itervalues(prepared)), question)
|
||||
if plugin_ep is None:
|
||||
return None
|
||||
else:
|
||||
|
|
|
|||
|
|
@ -63,7 +63,7 @@ class AlreadyListeningTestNoPsutil(unittest.TestCase):
|
|||
def test_ports_available(self, mock_getutil):
|
||||
import certbot.plugins.util as plugins_util
|
||||
# Ensure we don't get error
|
||||
with mock.patch("socket._socketobject.bind"):
|
||||
with mock.patch("socket.socket.bind"):
|
||||
self.assertFalse(plugins_util.already_listening(80))
|
||||
self.assertFalse(plugins_util.already_listening(80, True))
|
||||
self.assertEqual(mock_getutil.call_count, 0)
|
||||
|
|
@ -73,7 +73,7 @@ class AlreadyListeningTestNoPsutil(unittest.TestCase):
|
|||
sys.modules["psutil"] = None
|
||||
import certbot.plugins.util as plugins_util
|
||||
import socket
|
||||
with mock.patch("socket._socketobject.bind", side_effect=socket.error):
|
||||
with mock.patch("socket.socket.bind", side_effect=socket.error):
|
||||
self.assertTrue(plugins_util.already_listening(80))
|
||||
self.assertTrue(plugins_util.already_listening(80, True))
|
||||
with mock.patch("socket.socket", side_effect=socket.error):
|
||||
|
|
|
|||
|
|
@ -206,7 +206,7 @@ to serve all files under specified web root ({0})."""
|
|||
old_umask = os.umask(0o022)
|
||||
|
||||
try:
|
||||
with open(validation_path, "w") as validation_file:
|
||||
with open(validation_path, "wb") as validation_file:
|
||||
validation_file.write(validation.encode())
|
||||
finally:
|
||||
os.umask(old_umask)
|
||||
|
|
|
|||
|
|
@ -7,7 +7,7 @@ import shutil
|
|||
import time
|
||||
import traceback
|
||||
|
||||
|
||||
import six
|
||||
import zope.component
|
||||
|
||||
from certbot import constants
|
||||
|
|
@ -310,7 +310,9 @@ class Reverter(object):
|
|||
|
||||
def _run_undo_commands(self, filepath): # pylint: disable=no-self-use
|
||||
"""Run all commands in a file."""
|
||||
with open(filepath, 'rb') as csvfile:
|
||||
# NOTE: csv module uses native strings. That is, bytes on Python 2 and
|
||||
# unicode on Python 3
|
||||
with open(filepath, 'r') as csvfile:
|
||||
csvreader = csv.reader(csvfile)
|
||||
for command in reversed(list(csvreader)):
|
||||
try:
|
||||
|
|
@ -408,9 +410,9 @@ class Reverter(object):
|
|||
command_file = None
|
||||
try:
|
||||
if os.path.isfile(commands_fp):
|
||||
command_file = open(commands_fp, "ab")
|
||||
command_file = open(commands_fp, "a")
|
||||
else:
|
||||
command_file = open(commands_fp, "wb")
|
||||
command_file = open(commands_fp, "w")
|
||||
|
||||
csvwriter = csv.writer(command_file)
|
||||
csvwriter.writerow(command)
|
||||
|
|
@ -569,7 +571,7 @@ class Reverter(object):
|
|||
# It is possible save checkpoints faster than 1 per second resulting in
|
||||
# collisions in the naming convention.
|
||||
|
||||
for _ in xrange(2):
|
||||
for _ in six.moves.range(2):
|
||||
timestamp = self._checkpoint_timestamp()
|
||||
final_dir = os.path.join(self.config.backup_dir, timestamp)
|
||||
try:
|
||||
|
|
|
|||
|
|
@ -90,7 +90,7 @@ def write_renewal_config(o_filename, n_filename, target, relevant_data):
|
|||
# TODO: add human-readable comments explaining other available
|
||||
# parameters
|
||||
logger.debug("Writing new config %s.", n_filename)
|
||||
with open(n_filename, "w") as f:
|
||||
with open(n_filename, "wb") as f:
|
||||
config.write(outfile=f)
|
||||
return config
|
||||
|
||||
|
|
|
|||
|
|
@ -131,8 +131,8 @@ class AccountFileStorageTest(unittest.TestCase):
|
|||
for file_name in "regr.json", "meta.json", "private_key.json":
|
||||
self.assertTrue(os.path.exists(
|
||||
os.path.join(account_path, file_name)))
|
||||
self.assertEqual("0400", oct(os.stat(os.path.join(
|
||||
account_path, "private_key.json"))[stat.ST_MODE] & 0o777))
|
||||
self.assertTrue(oct(os.stat(os.path.join(
|
||||
account_path, "private_key.json"))[stat.ST_MODE] & 0o777) in ("0400", "0o400"))
|
||||
|
||||
# restore
|
||||
self.assertEqual(self.acc, self.storage.load(self.acc.id))
|
||||
|
|
@ -179,14 +179,14 @@ class AccountFileStorageTest(unittest.TestCase):
|
|||
self.storage.save(self.acc)
|
||||
mock_open = mock.mock_open()
|
||||
mock_open.side_effect = IOError
|
||||
with mock.patch("__builtin__.open", mock_open):
|
||||
with mock.patch("six.moves.builtins.open", mock_open):
|
||||
self.assertRaises(
|
||||
errors.AccountStorageError, self.storage.load, self.acc.id)
|
||||
|
||||
def test_save_ioerrors(self):
|
||||
mock_open = mock.mock_open()
|
||||
mock_open.side_effect = IOError # TODO: [None, None, IOError]
|
||||
with mock.patch("__builtin__.open", mock_open):
|
||||
with mock.patch("six.moves.builtins.open", mock_open):
|
||||
self.assertRaises(
|
||||
errors.AccountStorageError, self.storage.save, self.acc)
|
||||
|
||||
|
|
|
|||
|
|
@ -1,6 +1,7 @@
|
|||
"""ACME utilities for testing."""
|
||||
import datetime
|
||||
import itertools
|
||||
|
||||
import six
|
||||
|
||||
from acme import challenges
|
||||
from acme import jose
|
||||
|
|
@ -13,10 +14,10 @@ KEY = test_util.load_rsa_private_key('rsa512_key.pem')
|
|||
|
||||
# Challenges
|
||||
HTTP01 = challenges.HTTP01(
|
||||
token="evaGxfADs6pSRb2LAv9IZf17Dt3juxGJ+PCt92wr+oA")
|
||||
token=b"evaGxfADs6pSRb2LAv9IZf17Dt3juxGJ+PCt92wr+oA")
|
||||
TLSSNI01 = challenges.TLSSNI01(
|
||||
token=jose.b64decode(b"evaGxfADs6pSRb2LAv9IZf17Dt3juxGJyPCt92wrDoA"))
|
||||
DNS = challenges.DNS(token="17817c66b60ce2e4012dfad92657527a")
|
||||
DNS = challenges.DNS(token=b"17817c66b60ce2e4012dfad92657527a")
|
||||
|
||||
CHALLENGES = [HTTP01, TLSSNI01, DNS]
|
||||
|
||||
|
|
@ -62,7 +63,7 @@ def gen_authzr(authz_status, domain, challs, statuses, combos=True):
|
|||
# pylint: disable=redefined-outer-name
|
||||
challbs = tuple(
|
||||
chall_to_challb(chall, status)
|
||||
for chall, status in itertools.izip(challs, statuses)
|
||||
for chall, status in six.moves.zip(challs, statuses)
|
||||
)
|
||||
authz_kwargs = {
|
||||
"identifier": messages.Identifier(
|
||||
|
|
|
|||
|
|
@ -4,6 +4,7 @@ import logging
|
|||
import unittest
|
||||
|
||||
import mock
|
||||
import six
|
||||
|
||||
from acme import challenges
|
||||
from acme import client as acme_client
|
||||
|
|
@ -93,7 +94,7 @@ class GetAuthorizationsTest(unittest.TestCase):
|
|||
|
||||
self.assertEqual(mock_poll.call_count, 1)
|
||||
chall_update = mock_poll.call_args[0][0]
|
||||
self.assertEqual(chall_update.keys(), ["0"])
|
||||
self.assertEqual(list(six.iterkeys(chall_update)), ["0"])
|
||||
self.assertEqual(len(chall_update.values()), 1)
|
||||
|
||||
self.assertEqual(self.mock_auth.cleanup.call_count, 1)
|
||||
|
|
@ -118,7 +119,7 @@ class GetAuthorizationsTest(unittest.TestCase):
|
|||
|
||||
self.assertEqual(mock_poll.call_count, 1)
|
||||
chall_update = mock_poll.call_args[0][0]
|
||||
self.assertEqual(chall_update.keys(), ["0"])
|
||||
self.assertEqual(list(six.iterkeys(chall_update)), ["0"])
|
||||
self.assertEqual(len(chall_update.values()), 1)
|
||||
|
||||
self.assertEqual(self.mock_auth.cleanup.call_count, 1)
|
||||
|
|
@ -143,12 +144,12 @@ class GetAuthorizationsTest(unittest.TestCase):
|
|||
# Check poll call
|
||||
self.assertEqual(mock_poll.call_count, 1)
|
||||
chall_update = mock_poll.call_args[0][0]
|
||||
self.assertEqual(len(chall_update.keys()), 3)
|
||||
self.assertTrue("0" in chall_update.keys())
|
||||
self.assertEqual(len(list(six.iterkeys(chall_update))), 3)
|
||||
self.assertTrue("0" in list(six.iterkeys(chall_update)))
|
||||
self.assertEqual(len(chall_update["0"]), 1)
|
||||
self.assertTrue("1" in chall_update.keys())
|
||||
self.assertTrue("1" in list(six.iterkeys(chall_update)))
|
||||
self.assertEqual(len(chall_update["1"]), 1)
|
||||
self.assertTrue("2" in chall_update.keys())
|
||||
self.assertTrue("2" in list(six.iterkeys(chall_update)))
|
||||
self.assertEqual(len(chall_update["2"]), 1)
|
||||
|
||||
self.assertEqual(self.mock_auth.cleanup.call_count, 1)
|
||||
|
|
@ -167,7 +168,7 @@ class GetAuthorizationsTest(unittest.TestCase):
|
|||
self.assertRaises(errors.AuthorizationError, self.handler.get_authorizations, [])
|
||||
|
||||
def _validate_all(self, unused_1, unused_2):
|
||||
for dom in self.handler.authzr.keys():
|
||||
for dom in six.iterkeys(self.handler.authzr):
|
||||
azr = self.handler.authzr[dom]
|
||||
self.handler.authzr[dom] = acme_util.gen_authzr(
|
||||
messages.STATUS_VALID,
|
||||
|
|
@ -317,7 +318,7 @@ class GenChallengePathTest(unittest.TestCase):
|
|||
|
||||
"""
|
||||
def setUp(self):
|
||||
logging.disable(logging.fatal)
|
||||
logging.disable(logging.FATAL)
|
||||
|
||||
def tearDown(self):
|
||||
logging.disable(logging.NOTSET)
|
||||
|
|
|
|||
|
|
@ -136,7 +136,8 @@ class CLITest(unittest.TestCase): # pylint: disable=too-many-public-methods
|
|||
try:
|
||||
with mock.patch('certbot.main.sys.stderr'):
|
||||
main.main(self.standard_args + args[:]) # NOTE: parser can alter its args!
|
||||
except errors.MissingCommandlineFlag as exc:
|
||||
except errors.MissingCommandlineFlag as exc_:
|
||||
exc = exc_
|
||||
self.assertTrue(message in str(exc))
|
||||
self.assertTrue(exc is not None)
|
||||
|
||||
|
|
@ -263,7 +264,7 @@ class CLITest(unittest.TestCase): # pylint: disable=too-many-public-methods
|
|||
flags = ['--init', '--prepare', '--authenticators', '--installers']
|
||||
for args in itertools.chain(
|
||||
*(itertools.combinations(flags, r)
|
||||
for r in xrange(len(flags)))):
|
||||
for r in six.moves.range(len(flags)))):
|
||||
self._call(['plugins'] + list(args))
|
||||
|
||||
@mock.patch('certbot.main.plugins_disco')
|
||||
|
|
@ -332,7 +333,7 @@ class CLITest(unittest.TestCase): # pylint: disable=too-many-public-methods
|
|||
self._call(['-a', 'bad_auth', 'certonly'])
|
||||
assert False, "Exception should have been raised"
|
||||
except errors.PluginSelectionError as e:
|
||||
self.assertTrue('The requested bad_auth plugin does not appear' in e.message)
|
||||
self.assertTrue('The requested bad_auth plugin does not appear' in str(e))
|
||||
|
||||
def test_check_config_sanity_domain(self):
|
||||
# Punycode
|
||||
|
|
@ -427,9 +428,9 @@ class CLITest(unittest.TestCase): # pylint: disable=too-many-public-methods
|
|||
"The following flags didn't conflict with "
|
||||
'--server: {0}'.format(', '.join(conflicting_args)))
|
||||
except errors.Error as error:
|
||||
self.assertTrue('--server' in error.message)
|
||||
self.assertTrue('--server' in str(error))
|
||||
for arg in conflicting_args:
|
||||
self.assertTrue(arg in error.message)
|
||||
self.assertTrue(arg in str(error))
|
||||
|
||||
def test_must_staple_flag(self):
|
||||
parse = self._get_argument_parser()
|
||||
|
|
@ -855,10 +856,10 @@ class CLITest(unittest.TestCase): # pylint: disable=too-many-public-methods
|
|||
server = 'foo.bar'
|
||||
self._call_no_clientmock(['--cert-path', CERT, '--key-path', KEY,
|
||||
'--server', server, 'revoke'])
|
||||
with open(KEY) as f:
|
||||
with open(KEY, 'rb') as f:
|
||||
mock_acme_client.Client.assert_called_once_with(
|
||||
server, key=jose.JWK.load(f.read()), net=mock.ANY)
|
||||
with open(CERT) as f:
|
||||
with open(CERT, 'rb') as f:
|
||||
cert = crypto_util.pyopenssl_load_certificate(f.read())[0]
|
||||
mock_revoke = mock_acme_client.Client().revoke
|
||||
mock_revoke.assert_called_once_with(jose.ComparableX509(cert))
|
||||
|
|
@ -885,7 +886,7 @@ class CLITest(unittest.TestCase): # pylint: disable=too-many-public-methods
|
|||
config.verbose_count = 1
|
||||
main._handle_exception(
|
||||
Exception, exc_value=exception, trace=None, config=None)
|
||||
mock_open().write.assert_called_once_with(''.join(
|
||||
mock_open().write.assert_any_call(''.join(
|
||||
traceback.format_exception_only(Exception, exception)))
|
||||
error_msg = mock_sys.exit.call_args_list[0][0][0]
|
||||
self.assertTrue('unexpected error' in error_msg)
|
||||
|
|
@ -935,8 +936,8 @@ class CLITest(unittest.TestCase): # pylint: disable=too-many-public-methods
|
|||
self.assertRaises(
|
||||
argparse.ArgumentTypeError, cli.read_file, rel_test_path)
|
||||
|
||||
test_contents = 'bar\n'
|
||||
with open(rel_test_path, 'w') as f:
|
||||
test_contents = b'bar\n'
|
||||
with open(rel_test_path, 'wb') as f:
|
||||
f.write(test_contents)
|
||||
|
||||
path, contents = cli.read_file(rel_test_path)
|
||||
|
|
@ -1106,7 +1107,7 @@ class DuplicativeCertsTest(storage_test.BaseRenewableCertTest):
|
|||
def test_find_duplicative_names(self, unused_makedir):
|
||||
from certbot.main import _find_duplicative_certs
|
||||
test_cert = test_util.load_vector('cert-san.pem')
|
||||
with open(self.test_rc.cert, 'w') as f:
|
||||
with open(self.test_rc.cert, 'wb') as f:
|
||||
f.write(test_cert)
|
||||
|
||||
# No overlap at all
|
||||
|
|
|
|||
|
|
@ -232,11 +232,11 @@ class ClientTest(unittest.TestCase):
|
|||
self.assertEqual(os.path.dirname(fullchain_path),
|
||||
os.path.dirname(candidate_fullchain_path))
|
||||
|
||||
with open(cert_path, "r") as cert_file:
|
||||
with open(cert_path, "rb") as cert_file:
|
||||
cert_contents = cert_file.read()
|
||||
self.assertEqual(cert_contents, test_util.load_vector(certs[0]))
|
||||
|
||||
with open(chain_path, "r") as chain_file:
|
||||
with open(chain_path, "rb") as chain_file:
|
||||
chain_contents = chain_file.read()
|
||||
self.assertEqual(chain_contents, test_util.load_vector(certs[1]) +
|
||||
test_util.load_vector(certs[2]))
|
||||
|
|
|
|||
|
|
@ -60,6 +60,7 @@ class NamespaceConfigTest(unittest.TestCase):
|
|||
config_base = "foo"
|
||||
work_base = "bar"
|
||||
logs_base = "baz"
|
||||
server = "mock.server"
|
||||
|
||||
mock_namespace = mock.MagicMock(spec=['config_dir', 'work_dir',
|
||||
'logs_dir', 'http01_port',
|
||||
|
|
@ -68,6 +69,7 @@ class NamespaceConfigTest(unittest.TestCase):
|
|||
mock_namespace.config_dir = config_base
|
||||
mock_namespace.work_dir = work_base
|
||||
mock_namespace.logs_dir = logs_base
|
||||
mock_namespace.server = server
|
||||
config = NamespaceConfig(mock_namespace)
|
||||
|
||||
self.assertTrue(os.path.isabs(config.config_dir))
|
||||
|
|
|
|||
|
|
@ -111,7 +111,7 @@ class MakeCSRTest(unittest.TestCase):
|
|||
# OpenSSL.crypto.X509Extension doesn't give us the extension's raw OID,
|
||||
# and the shortname field is just "UNDEF"
|
||||
must_staple_exts = [e for e in csr.get_extensions()
|
||||
if e.get_data() == "0\x03\x02\x01\x05"]
|
||||
if e.get_data() == b"0\x03\x02\x01\x05"]
|
||||
self.assertEqual(len(must_staple_exts), 1,
|
||||
"Expected exactly one Must Staple extension")
|
||||
|
||||
|
|
@ -341,7 +341,7 @@ class CertLoaderTest(unittest.TestCase):
|
|||
|
||||
def test_load_invalid_cert(self):
|
||||
from certbot.crypto_util import pyopenssl_load_certificate
|
||||
bad_cert_data = CERT.replace("BEGIN CERTIFICATE", "ASDFASDFASDF!!!")
|
||||
bad_cert_data = CERT.replace(b"BEGIN CERTIFICATE", b"ASDFASDFASDF!!!")
|
||||
self.assertRaises(
|
||||
errors.Error, pyopenssl_load_certificate, bad_cert_data)
|
||||
|
||||
|
|
|
|||
|
|
@ -151,7 +151,7 @@ class FileOutputDisplayTest(unittest.TestCase):
|
|||
self.assertTrue("message" in string)
|
||||
|
||||
def test_notification_pause(self):
|
||||
with mock.patch("__builtin__.raw_input", return_value="enter"):
|
||||
with mock.patch("six.moves.input", return_value="enter"):
|
||||
self.displayer.notification("message")
|
||||
|
||||
self.assertTrue("message" in self.mock_stdout.write.call_args[0][0])
|
||||
|
|
@ -164,31 +164,31 @@ class FileOutputDisplayTest(unittest.TestCase):
|
|||
self.assertEqual(ret, (display_util.OK, 0))
|
||||
|
||||
def test_input_cancel(self):
|
||||
with mock.patch("__builtin__.raw_input", return_value="c"):
|
||||
with mock.patch("six.moves.input", return_value="c"):
|
||||
code, _ = self.displayer.input("message")
|
||||
|
||||
self.assertTrue(code, display_util.CANCEL)
|
||||
|
||||
def test_input_normal(self):
|
||||
with mock.patch("__builtin__.raw_input", return_value="domain.com"):
|
||||
with mock.patch("six.moves.input", return_value="domain.com"):
|
||||
code, input_ = self.displayer.input("message")
|
||||
|
||||
self.assertEqual(code, display_util.OK)
|
||||
self.assertEqual(input_, "domain.com")
|
||||
|
||||
def test_yesno(self):
|
||||
with mock.patch("__builtin__.raw_input", return_value="Yes"):
|
||||
with mock.patch("six.moves.input", return_value="Yes"):
|
||||
self.assertTrue(self.displayer.yesno("message"))
|
||||
with mock.patch("__builtin__.raw_input", return_value="y"):
|
||||
with mock.patch("six.moves.input", return_value="y"):
|
||||
self.assertTrue(self.displayer.yesno("message"))
|
||||
with mock.patch("__builtin__.raw_input", side_effect=["maybe", "y"]):
|
||||
with mock.patch("six.moves.input", side_effect=["maybe", "y"]):
|
||||
self.assertTrue(self.displayer.yesno("message"))
|
||||
with mock.patch("__builtin__.raw_input", return_value="No"):
|
||||
with mock.patch("six.moves.input", return_value="No"):
|
||||
self.assertFalse(self.displayer.yesno("message"))
|
||||
with mock.patch("__builtin__.raw_input", side_effect=["cancel", "n"]):
|
||||
with mock.patch("six.moves.input", side_effect=["cancel", "n"]):
|
||||
self.assertFalse(self.displayer.yesno("message"))
|
||||
|
||||
with mock.patch("__builtin__.raw_input", return_value="a"):
|
||||
with mock.patch("six.moves.input", return_value="a"):
|
||||
self.assertTrue(self.displayer.yesno("msg", yes_label="Agree"))
|
||||
|
||||
@mock.patch("certbot.display.util.FileDisplay.input")
|
||||
|
|
@ -275,11 +275,11 @@ class FileOutputDisplayTest(unittest.TestCase):
|
|||
|
||||
def test_get_valid_int_ans_valid(self):
|
||||
# pylint: disable=protected-access
|
||||
with mock.patch("__builtin__.raw_input", return_value="1"):
|
||||
with mock.patch("six.moves.input", return_value="1"):
|
||||
self.assertEqual(
|
||||
self.displayer._get_valid_int_ans(1), (display_util.OK, 1))
|
||||
ans = "2"
|
||||
with mock.patch("__builtin__.raw_input", return_value=ans):
|
||||
with mock.patch("six.moves.input", return_value=ans):
|
||||
self.assertEqual(
|
||||
self.displayer._get_valid_int_ans(3),
|
||||
(display_util.OK, int(ans)))
|
||||
|
|
@ -292,7 +292,7 @@ class FileOutputDisplayTest(unittest.TestCase):
|
|||
["c"],
|
||||
]
|
||||
for ans in answers:
|
||||
with mock.patch("__builtin__.raw_input", side_effect=ans):
|
||||
with mock.patch("six.moves.input", side_effect=ans):
|
||||
self.assertEqual(
|
||||
self.displayer._get_valid_int_ans(3),
|
||||
(display_util.CANCEL, -1))
|
||||
|
|
|
|||
|
|
@ -1,6 +1,5 @@
|
|||
"""Test certbot.reverter."""
|
||||
import csv
|
||||
import itertools
|
||||
import logging
|
||||
import os
|
||||
import shutil
|
||||
|
|
@ -8,6 +7,7 @@ import tempfile
|
|||
import unittest
|
||||
|
||||
import mock
|
||||
import six
|
||||
|
||||
from certbot import errors
|
||||
|
||||
|
|
@ -153,7 +153,7 @@ class ReverterCheckpointLocalTest(unittest.TestCase):
|
|||
|
||||
act_coms = get_undo_commands(self.config.temp_checkpoint_dir)
|
||||
|
||||
for a_com, com in itertools.izip(act_coms, coms):
|
||||
for a_com, com in six.moves.zip(act_coms, coms):
|
||||
self.assertEqual(a_com, com)
|
||||
|
||||
def test_bad_register_undo_command(self):
|
||||
|
|
|
|||
|
|
@ -9,6 +9,7 @@ import unittest
|
|||
import configobj
|
||||
import mock
|
||||
import pytz
|
||||
import six
|
||||
|
||||
import certbot
|
||||
from certbot import cli
|
||||
|
|
@ -92,8 +93,8 @@ class BaseRenewableCertTest(unittest.TestCase):
|
|||
os.symlink(os.path.join(os.path.pardir, os.path.pardir, "archive",
|
||||
"example.org", "{0}{1}.pem".format(kind, ver)),
|
||||
link)
|
||||
with open(link, "w") as f:
|
||||
f.write(kind if value is None else value)
|
||||
with open(link, "wb") as f:
|
||||
f.write(kind.encode('ascii') if value is None else value)
|
||||
|
||||
def _write_out_ex_kinds(self):
|
||||
for kind in ALL_FOUR:
|
||||
|
|
@ -235,7 +236,7 @@ class RenewableCertTests(BaseRenewableCertTest):
|
|||
self.assertEqual(self.test_rc.current_version("cert"), None)
|
||||
|
||||
def test_latest_and_next_versions(self):
|
||||
for ver in xrange(1, 6):
|
||||
for ver in six.moves.range(1, 6):
|
||||
for kind in ALL_FOUR:
|
||||
self._write_out_kind(kind, ver)
|
||||
self.assertEqual(self.test_rc.latest_common_version(), 5)
|
||||
|
|
@ -258,7 +259,7 @@ class RenewableCertTests(BaseRenewableCertTest):
|
|||
self.assertEqual(self.test_rc.next_free_version(), 18)
|
||||
|
||||
def test_update_link_to(self):
|
||||
for ver in xrange(1, 6):
|
||||
for ver in six.moves.range(1, 6):
|
||||
for kind in ALL_FOUR:
|
||||
self._write_out_kind(kind, ver)
|
||||
self.assertEqual(ver, self.test_rc.current_version(kind))
|
||||
|
|
@ -285,12 +286,12 @@ class RenewableCertTests(BaseRenewableCertTest):
|
|||
os.path.basename(self.test_rc.version("cert", 8)))
|
||||
|
||||
def test_update_all_links_to_success(self):
|
||||
for ver in xrange(1, 6):
|
||||
for ver in six.moves.range(1, 6):
|
||||
for kind in ALL_FOUR:
|
||||
self._write_out_kind(kind, ver)
|
||||
self.assertEqual(ver, self.test_rc.current_version(kind))
|
||||
self.assertEqual(self.test_rc.latest_common_version(), 5)
|
||||
for ver in xrange(1, 6):
|
||||
for ver in six.moves.range(1, 6):
|
||||
self.test_rc.update_all_links_to(ver)
|
||||
for kind in ALL_FOUR:
|
||||
self.assertEqual(ver, self.test_rc.current_version(kind))
|
||||
|
|
@ -330,11 +331,11 @@ class RenewableCertTests(BaseRenewableCertTest):
|
|||
self.assertEqual(self.test_rc.current_version(kind), 11)
|
||||
|
||||
def test_has_pending_deployment(self):
|
||||
for ver in xrange(1, 6):
|
||||
for ver in six.moves.range(1, 6):
|
||||
for kind in ALL_FOUR:
|
||||
self._write_out_kind(kind, ver)
|
||||
self.assertEqual(ver, self.test_rc.current_version(kind))
|
||||
for ver in xrange(1, 6):
|
||||
for ver in six.moves.range(1, 6):
|
||||
self.test_rc.update_all_links_to(ver)
|
||||
for kind in ALL_FOUR:
|
||||
self.assertEqual(ver, self.test_rc.current_version(kind))
|
||||
|
|
@ -373,10 +374,10 @@ class RenewableCertTests(BaseRenewableCertTest):
|
|||
self._write_out_ex_kinds()
|
||||
|
||||
self.test_rc.update_all_links_to(12)
|
||||
with open(self.test_rc.cert, "w") as f:
|
||||
with open(self.test_rc.cert, "wb") as f:
|
||||
f.write(test_cert)
|
||||
self.test_rc.update_all_links_to(11)
|
||||
with open(self.test_rc.cert, "w") as f:
|
||||
with open(self.test_rc.cert, "wb") as f:
|
||||
f.write(test_cert)
|
||||
|
||||
mock_datetime.timedelta = datetime.timedelta
|
||||
|
|
@ -426,7 +427,7 @@ class RenewableCertTests(BaseRenewableCertTest):
|
|||
self.assertFalse(self.test_rc.should_autodeploy())
|
||||
self.test_rc.configuration["autodeploy"] = "1"
|
||||
# No pending deployment
|
||||
for ver in xrange(1, 6):
|
||||
for ver in six.moves.range(1, 6):
|
||||
for kind in ALL_FOUR:
|
||||
self._write_out_kind(kind, ver)
|
||||
self.assertFalse(self.test_rc.should_autodeploy())
|
||||
|
|
@ -461,7 +462,7 @@ class RenewableCertTests(BaseRenewableCertTest):
|
|||
# (to avoid instantiating parser)
|
||||
mock_rv.side_effect = lambda x: x
|
||||
|
||||
for ver in xrange(1, 6):
|
||||
for ver in six.moves.range(1, 6):
|
||||
for kind in ALL_FOUR:
|
||||
self._write_out_kind(kind, ver)
|
||||
self.test_rc.update_all_links_to(3)
|
||||
|
|
@ -492,7 +493,7 @@ class RenewableCertTests(BaseRenewableCertTest):
|
|||
self.test_rc.version("privkey", i))))
|
||||
|
||||
for kind in ALL_FOUR:
|
||||
self.assertEqual(self.test_rc.available_versions(kind), range(1, 9))
|
||||
self.assertEqual(self.test_rc.available_versions(kind), list(six.moves.range(1, 9)))
|
||||
self.assertEqual(self.test_rc.current_version(kind), 3)
|
||||
# Test updating from latest version rather than old version
|
||||
self.test_rc.update_all_links_to(8)
|
||||
|
|
@ -501,7 +502,7 @@ class RenewableCertTests(BaseRenewableCertTest):
|
|||
"attempt", self.cli_config))
|
||||
for kind in ALL_FOUR:
|
||||
self.assertEqual(self.test_rc.available_versions(kind),
|
||||
range(1, 10))
|
||||
list(six.moves.range(1, 10)))
|
||||
self.assertEqual(self.test_rc.current_version(kind), 8)
|
||||
with open(self.test_rc.version("fullchain", 9)) as f:
|
||||
self.assertEqual(f.read(), "last" + "attempt")
|
||||
|
|
|
|||
|
|
@ -189,6 +189,12 @@ class UniqueFileTest(unittest.TestCase):
|
|||
self.assertTrue(basename3.endswith("foo.txt"))
|
||||
|
||||
|
||||
try:
|
||||
file_type = file
|
||||
except NameError:
|
||||
import io
|
||||
file_type = io.TextIOWrapper
|
||||
|
||||
class UniqueLineageNameTest(unittest.TestCase):
|
||||
"""Tests for certbot.util.unique_lineage_name."""
|
||||
|
||||
|
|
@ -204,13 +210,13 @@ class UniqueLineageNameTest(unittest.TestCase):
|
|||
|
||||
def test_basic(self):
|
||||
f, path = self._call("wow")
|
||||
self.assertTrue(isinstance(f, file))
|
||||
self.assertTrue(isinstance(f, file_type))
|
||||
self.assertEqual(os.path.join(self.root_path, "wow.conf"), path)
|
||||
|
||||
def test_multiple(self):
|
||||
for _ in xrange(10):
|
||||
for _ in six.moves.range(10):
|
||||
f, name = self._call("wow")
|
||||
self.assertTrue(isinstance(f, file))
|
||||
self.assertTrue(isinstance(f, file_type))
|
||||
self.assertTrue(isinstance(name, str))
|
||||
self.assertTrue("wow-0009.conf" in name)
|
||||
|
||||
|
|
|
|||
|
|
@ -402,18 +402,27 @@ def enforce_domain_sanity(domain):
|
|||
:returns: The domain cast to `str`, with ASCII-only contents
|
||||
:rtype: str
|
||||
"""
|
||||
if isinstance(domain, six.text_type):
|
||||
wildcard_marker = u"*."
|
||||
punycode_marker = u"xn--"
|
||||
else:
|
||||
wildcard_marker = b"*."
|
||||
punycode_marker = b"xn--"
|
||||
|
||||
# Check if there's a wildcard domain
|
||||
if domain.startswith("*."):
|
||||
if domain.startswith(wildcard_marker):
|
||||
raise errors.ConfigurationError(
|
||||
"Wildcard domains are not supported: {0}".format(domain))
|
||||
# Punycode
|
||||
if "xn--" in domain:
|
||||
if punycode_marker in domain:
|
||||
raise errors.ConfigurationError(
|
||||
"Punycode domains are not presently supported: {0}".format(domain))
|
||||
|
||||
# Unicode
|
||||
try:
|
||||
domain = domain.encode('ascii').lower()
|
||||
if isinstance(domain, six.binary_type):
|
||||
domain = domain.decode('utf-8')
|
||||
domain.encode('ascii')
|
||||
except UnicodeError:
|
||||
error_fmt = (u"Internationalized domain names "
|
||||
"are not presently supported: {0}")
|
||||
|
|
@ -422,11 +431,10 @@ def enforce_domain_sanity(domain):
|
|||
else:
|
||||
raise errors.ConfigurationError(str(error_fmt).format(domain))
|
||||
|
||||
if six.PY3:
|
||||
domain = domain.decode('ascii')
|
||||
domain = domain.lower()
|
||||
|
||||
# Remove trailing dot
|
||||
domain = domain[:-1] if domain.endswith('.') else domain
|
||||
domain = domain[:-1] if domain.endswith(u'.') else domain
|
||||
|
||||
# Explain separately that IP addresses aren't allowed (apart from not
|
||||
# being FQDNs) because hope springs eternal concerning this point
|
||||
|
|
|
|||
6
tox.ini
6
tox.ini
|
|
@ -41,16 +41,22 @@ deps =
|
|||
commands =
|
||||
pip install -e acme[dns,dev]
|
||||
nosetests -v acme
|
||||
pip install -e .[dev]
|
||||
nosetests -v certbot
|
||||
|
||||
[testenv:py34]
|
||||
commands =
|
||||
pip install -e acme[dns,dev]
|
||||
nosetests -v acme
|
||||
pip install -e .[dev]
|
||||
nosetests -v certbot
|
||||
|
||||
[testenv:py35]
|
||||
commands =
|
||||
pip install -e acme[dns,dev]
|
||||
nosetests -v acme
|
||||
pip install -e .[dev]
|
||||
nosetests -v certbot
|
||||
|
||||
[testenv:cover]
|
||||
basepython = python2.7
|
||||
|
|
|
|||
Loading…
Reference in a new issue