100% unittests for reverter, code cleanup

This commit is contained in:
James Kasten 2015-01-22 21:51:25 -08:00
parent 7a238bd0de
commit 417183165e
7 changed files with 316 additions and 69 deletions

View file

@ -0,0 +1,5 @@
:mod:`letsencrypt.client.reverter`
---------------------------------
.. automodule:: letsencrypt.client.reverter
:members:

View file

@ -108,4 +108,4 @@ CERT_DELETE_MSG = "This certificate has either been deleted or moved"
Indicates that the original certificate has been moved/deleted.
"""
"""

View file

@ -519,6 +519,7 @@ class ApacheConfigurator(augeas_configurator.AugeasConfigurator):
return ssl_vhost
# pylint: disable=no-method-argument,no-self-use,unused-argument
def supported_enhancements():
"""Returns currently supported enhancements."""
return ["redirect"]

View file

@ -88,7 +88,7 @@ class ApacheDvsni(object):
# Create all of the challenge certs
for chall in self.dvsni_chall:
cert_path = self.get_cert_file(chall.nonce)
self.config.reverter.register_file_creation(cert_path)
self.config.reverter.register_file_creation(True, cert_path)
s_b64 = challenge_util.dvsni_gen_cert(
cert_path, chall.domain, chall.r_b64, chall.nonce, chall.key)

View file

@ -1,9 +1,5 @@
"""Class of Augeas Configurators."""
import logging
import os
import sys
import shutil
import time
import augeas
@ -117,7 +113,7 @@ class AugeasConfigurator(object):
self.reverter.add_to_checkpoint(save_files, self.save_notes)
if title and not temporary:
success = self.reverter.finalize_checkpoint(title)
self.reverter.finalize_checkpoint(title)
self.aug.set("/augeas/save", save_state)
self.save_notes = ""
@ -169,4 +165,4 @@ class AugeasConfigurator(object):
def view_config_changes(self):
"""Show all of the configuration changes that have taken place."""
self.reverter.show_config_changes()
self.reverter.view_config_changes()

View file

@ -2,7 +2,6 @@
import logging
import os
import shutil
import sys
import time
from letsencrypt.client import CONFIG
@ -59,8 +58,8 @@ class Reverter(object):
backups.sort()
if len(backups) < rollback:
logging.error("Unable to rollback %d checkpoints, only %d exist",
rollback, len(backups))
logging.warning("Unable to rollback %d checkpoints, only %d exist",
rollback, len(backups))
while rollback > 0 and backups:
cp_dir = os.path.join(self.direc['backup'], backups.pop())
@ -77,9 +76,10 @@ class Reverter(object):
All checkpoints are printed to the console.
Note: Any 'IN_PROGRESS' checkpoints will be removed by the cleanup
.. note:: Any 'IN_PROGRESS' checkpoints will be removed by the cleanup
script found in the constructor, before this function would ever be
called.
.. todo:: Decide on a policy for error handling, OSError IOError...
"""
backups = os.listdir(self.direc['backup'])
@ -88,6 +88,7 @@ class Reverter(object):
if not backups:
logging.info("Letsencrypt has not saved any backups of your "
"configuration")
return
# Make sure there isn't anything unexpected in the backup folder
# There should only be timestamped (float) directories
try:
@ -95,7 +96,7 @@ class Reverter(object):
float(bkup)
except ValueError:
raise errors.LetsEncryptReverterError(
"Invalid directories in {}".format(self.direc['backup']))
"Invalid directories in {0}".format(self.direc['backup']))
for bkup in backups:
print time.ctime(float(bkup))
@ -107,21 +108,16 @@ class Reverter(object):
with open(os.path.join(cur_dir, "FILEPATHS")) as paths_fd:
filepaths = paths_fd.read().splitlines()
for path in filepaths:
print " {}".format(path)
print " {0}".format(path)
try:
if os.path.isfile(os.path.join(cur_dir, "NEW_FILES")):
with open(os.path.join(cur_dir, "NEW_FILES")) as new_fd:
print "New Configuration Files:"
filepaths = new_fd.read().splitlines()
for path in filepaths:
print " {}".format(path)
except (IOError, OSError) as err:
logging.error(str(err))
raise errors.LetsEncryptReverterError(
"Unable to read new files in checkpoint"
"- {}".format(cur_dir))
print "\n"
if os.path.isfile(os.path.join(cur_dir, "NEW_FILES")):
with open(os.path.join(cur_dir, "NEW_FILES")) as new_fd:
print "New Configuration Files:"
filepaths = new_fd.read().splitlines()
for path in filepaths:
print " {0}".format(path)
print "{0}".format(os.linesep)
def add_to_temp_checkpoint(self, save_files, save_notes):
"""Add files to temporary checkpoint
@ -151,6 +147,10 @@ class Reverter(object):
:param set save_files: set of files to save
:param str save_notes: notes about changes made during the save
:raises IOError: If unable to open cp_dir + FILEPATHS file
:raises :class:`letsencrypt.client.errors.LetsEncryptReverterError: If
unable to add checkpoint
"""
le_util.make_or_verify_dir(cp_dir, 0o755, os.geteuid())
@ -172,9 +172,19 @@ class Reverter(object):
# Tag files with index so multiple files can
# have the same filename
logging.debug("Creating backup of %s", filename)
shutil.copy2(filename, os.path.join(
cp_dir, os.path.basename(filename) + "_" + str(idx)))
op_fd.write(filename + '\n')
try:
shutil.copy2(filename, os.path.join(
cp_dir, os.path.basename(filename) + "_" + str(idx)))
op_fd.write(filename + '\n')
# http://stackoverflow.com/questions/4726260/effective-use-of-python-shutil-copy2
except IOError:
op_fd.close()
logging.error(
"Unable to add file %s to checkpoint %s",
filename, cp_dir)
raise errors.LetsEncryptReverterError(
"Unable to add file {0} to checkpoint "
"{1}".format(filename, cp_dir))
idx += 1
op_fd.close()
@ -256,10 +266,19 @@ class Reverter(object):
:param bool temporary: If the file creation registry is for
a temp or permanent save.
:param \*files: file paths (str) to be registered
:raises :class:`letsencrypt.client.errors.LetsEncryptReverterError`: If
call does not contain necessary parameters or if the file creation
is unable to be registered.
"""
# Make sure some files are provided... as this is an error
# Made this mistake in my initial implementation of apache.dvsni.py
if not files:
raise errors.LetsEncryptReverterError(
"Forgot to provide files to registration call")
if temporary:
cp_dir = self.direc['temp']
else:
@ -269,14 +288,16 @@ class Reverter(object):
try:
with open(os.path.join(cp_dir, "NEW_FILES"), 'a') as new_fd:
for file_path in files:
new_fd.write("%s\n" % file_path)
new_fd.write("{0}{1}".format(file_path, os.linesep))
except (IOError, OSError):
logging.error("ERROR: Unable to register file creation")
logging.error("Unable to register file creation(s) - %s", files)
raise errors.LetsEncryptReverterError(
"Unable to register file creation(s) - {0}".format(files))
def recovery_routine(self):
"""Revert all previously modified files.
First, any changes found in self.direc["temp"] are removed,
First, any changes found in self.direc['temp'] are removed,
then IN_PROGRESS changes are removed The order is important.
IN_PROGRESS is unable to add files that are already added by a TEMP
change. Thus TEMP must be rolled back first because that will be the
@ -289,12 +310,12 @@ class Reverter(object):
self._recover_checkpoint(self.direc['progress'])
except errors.LetsEncryptReverterError:
# We have a partial or incomplete recovery
# Not as egregious
logging.fatal("Incomplete or failed recovery for %s",
logging.fatal("Incomplete or failed recovery for IN_PROGRESS "
"checkpoint - %s",
self.direc['progress'])
raise errors.LetsEncryptReverterError(
"Incomplete or failed recovery for "
"%s" % self.direc['progress'])
"Incomplete or failed recovery for IN_PROGRESS checkpoint "
"- %s" % self.direc['progress'])
# pylint: disable=no-self-use
def _remove_contained_files(self, file_list):
@ -305,6 +326,9 @@ class Reverter(object):
:returns: Success
:rtype: bool
:raises :class:`letsencrypt.client.errors.LetsEncryptReverterError`: If
all files within file_list cannot be removed
"""
# Check to see that file exists to differentiate can't find file_list
# and can't remove filepaths within file_list errors.
@ -319,13 +343,16 @@ class Reverter(object):
if os.path.lexists(path):
os.remove(path)
else:
logging.warn(
"File: %s - Could not be found to be deleted\n"
"LE probably shut down unexpectedly", path)
logging.warning(
"File: %s - Could not be found to be deleted%s"
"LE probably shut down unexpectedly",
os.linesep, path)
except (IOError, OSError):
logging.fatal(
"Unable to remove filepaths contained within %s", file_list)
sys.exit(41)
raise errors.LetsEncryptReverterError(
"Unable to remove filepaths contained within "
"{0}".format(file_list))
return True
@ -337,9 +364,14 @@ class Reverter(object):
Move self.direc['progress'] to Backups directory and
rename the directory as a timestamp
:param str title: Title describing checkpoint
:raises :class:`letsencrypt.client.errors.LetsEncryptReverterError`
"""
# Check to make sure an "in progress" directory exists
if not os.path.isdir(self.direc['progress']):
logging.warning("No IN_PROGRESS checkpoint to finalize")
return
changes_since_path = os.path.join(
@ -354,7 +386,6 @@ class Reverter(object):
changes_tmp.write(changes_orig.read())
shutil.move(changes_since_tmp_path, changes_since_path)
except (IOError, OSError):
logging.error("Unable to finalize checkpoint - adding title")
raise errors.LetsEncryptReverterError("Unable to add title")

View file

@ -5,7 +5,10 @@ import shutil
import tempfile
import unittest
class ReverterTest(unittest.TestCase):
import mock
# pylint: disable=invalid-name,protected-access,too-many-instance-attributes
class ReverterCheckpointLocalTest(unittest.TestCase):
"""Test the Reverter Class."""
def setUp(self):
from letsencrypt.client.reverter import Reverter
@ -13,26 +16,12 @@ class ReverterTest(unittest.TestCase):
# Disable spurious errors... we are trying to test for them
logging.disable(logging.CRITICAL)
self.work_dir = tempfile.mkdtemp("work")
backup = os.path.join(self.work_dir, "backup")
self.direc = {'backup': backup,
'temp': os.path.join(self.work_dir, "temp"),
'progress': os.path.join(backup, "progress")}
self.work_dir, self.direc = setup_work_direc()
self.reverter = Reverter(self.direc)
self.dir1 = tempfile.mkdtemp("dir1")
self.dir2 = tempfile.mkdtemp("dir2")
self.config1 = os.path.join(self.dir1, "config.txt")
self.config2 = os.path.join(self.dir2, "config.txt")
with open(self.config1, 'w') as file_fd:
file_fd.write("directive-dir1")
with open(self.config2, 'w') as file_fd:
file_fd.write("directive-dir2")
self.sets = [set([self.config1]),
set([self.config2]),
set([self.config1, self.config2])]
tup = setup_test_files()
self.config1, self.config2, self.dir1, self.dir2, self.sets = tup
def tearDown(self):
shutil.rmtree(self.work_dir)
@ -53,6 +42,17 @@ class ReverterTest(unittest.TestCase):
get_filepaths(self.direc['temp']),
"{0}\n{1}\n".format(self.config1, self.config2))
def test_add_to_checkpoint_copy_failure(self):
from letsencrypt.client.errors import LetsEncryptReverterError
with mock.patch("letsencrypt.client.reverter."
"shutil.copy2") as mock_copy2:
mock_copy2.side_effect = IOError("bad copy")
self.assertRaises(LetsEncryptReverterError,
self.reverter.add_to_checkpoint,
self.sets[0],
"save1")
def test_checkpoint_conflict(self):
"""Make sure that checkpoint errors are thrown appropriately."""
from letsencrypt.client.errors import LetsEncryptReverterError
@ -77,7 +77,7 @@ class ReverterTest(unittest.TestCase):
self.reverter.add_to_checkpoint,
set([config3]), "invalid save")
def test_multiple_saves_and_rollback(self):
def test_multiple_saves_and_temp_revert(self):
self.reverter.add_to_temp_checkpoint(self.sets[0], "save1")
update_file(self.config1, "updated-directive")
self.reverter.add_to_temp_checkpoint(self.sets[0], "save2-updated dir")
@ -86,6 +86,113 @@ class ReverterTest(unittest.TestCase):
self.reverter.revert_temporary_config()
self.assertEqual(read_in(self.config1), "directive-dir1")
def test_multiple_registration_fail_and_revert(self):
config3 = os.path.join(self.dir1, "config3.txt")
update_file(config3, "Config3")
config4 = os.path.join(self.dir2, "config4.txt")
update_file(config4, "Config4")
# Test multiple registrations and two registrations at once
self.reverter.register_file_creation(True, self.config1)
self.reverter.register_file_creation(True, self.config2)
self.reverter.register_file_creation(True, config3, config4)
# Simulate Let's Encrypt crash... recovery routine is run
self.reverter.recovery_routine()
self.assertFalse(os.path.isfile(self.config1))
self.assertFalse(os.path.isfile(self.config2))
self.assertFalse(os.path.isfile(config3))
self.assertFalse(os.path.isfile(config4))
def test_register_file_creation_write_error(self):
from letsencrypt.client.errors import LetsEncryptReverterError
m_open = mock.mock_open()
with mock.patch("letsencrypt.client.reverter.open",
m_open, create=True):
m_open.side_effect = OSError("bad open")
self.assertRaises(LetsEncryptReverterError,
self.reverter.register_file_creation,
True, self.config1)
def test_bad_registration(self):
from letsencrypt.client.errors import LetsEncryptReverterError
# Made this mistake and want to make sure it doesn't happen again...
self.assertRaises(LetsEncryptReverterError,
self.reverter.register_file_creation,
"filepath")
def test_recovery_routine_in_progress_failure(self):
from letsencrypt.client.errors import LetsEncryptReverterError
self.reverter.add_to_checkpoint(self.sets[0], "perm save")
self.reverter._recover_checkpoint = mock.MagicMock(
side_effect=LetsEncryptReverterError)
self.assertRaises(LetsEncryptReverterError,
self.reverter.recovery_routine)
def test_recover_checkpoint_revert_temp_failures(self):
from letsencrypt.client.errors import LetsEncryptReverterError
mock_recover = mock.MagicMock(side_effect=LetsEncryptReverterError("e"))
self.reverter._recover_checkpoint = mock_recover
self.reverter.add_to_temp_checkpoint(self.sets[0], "config1 save")
self.assertRaises(LetsEncryptReverterError,
self.reverter.revert_temporary_config)
def test_recover_checkpoint_rollback_failure(self):
from letsencrypt.client.errors import LetsEncryptReverterError
mock_recover = mock.MagicMock(side_effect=LetsEncryptReverterError("e"))
self.reverter._recover_checkpoint = mock_recover
self.reverter.add_to_checkpoint(self.sets[0], "config1 save")
self.reverter.finalize_checkpoint("Title")
self.assertRaises(LetsEncryptReverterError,
self.reverter.rollback_checkpoints, 1)
def test_recover_checkpoint_copy_failure(self):
from letsencrypt.client.errors import LetsEncryptReverterError
self.reverter.add_to_temp_checkpoint(self.sets[0], "save1")
with mock.patch("letsencrypt.client.reverter.shutil."
"copy2") as mock_copy2:
mock_copy2.side_effect = OSError("bad copy")
self.assertRaises(LetsEncryptReverterError,
self.reverter.revert_temporary_config)
def test_recover_checkpoint_rm_failure(self):
from letsencrypt.client.errors import LetsEncryptReverterError
self.reverter.add_to_temp_checkpoint(self.sets[0], "temp save")
with mock.patch("letsencrypt.client.reverter.shutil."
"rmtree") as mock_rmtree:
mock_rmtree.side_effect = OSError("Cannot remove tree")
self.assertRaises(LetsEncryptReverterError,
self.reverter.revert_temporary_config)
@mock.patch("letsencrypt.client.reverter.logging.warning")
def test_recover_checkpoint_missing_new_files(self, mock_warn):
self.reverter.register_file_creation(
True, os.path.join(self.dir1, "missing_file.txt"))
self.reverter.revert_temporary_config()
self.assertEqual(mock_warn.call_count, 1)
@mock.patch("letsencrypt.client.reverter.os.remove")
def test_recover_checkpoint_remove_failure(self, mock_remove):
from letsencrypt.client.errors import LetsEncryptReverterError
self.reverter.register_file_creation(True, self.config1)
mock_remove.side_effect = OSError("Can't remove")
self.assertRaises(LetsEncryptReverterError,
self.reverter.revert_temporary_config)
def test_recovery_routine_temp_and_perm(self):
# Register a new perm checkpoint file
config3 = os.path.join(self.dir1, "config3.txt")
@ -119,6 +226,25 @@ class ReverterTest(unittest.TestCase):
self.assertEqual(read_in(self.config1), "directive-dir1")
self.assertEqual(read_in(self.config2), "directive-dir2")
# pylint: disable=invalid-name,protected-access,too-many-instance-attributes
class TestFullCheckpointsReverter(unittest.TestCase):
"""Tests functions having to deal with full checkpoints."""
def setUp(self):
from letsencrypt.client.reverter import Reverter
# Disable spurious errors...
logging.disable(logging.CRITICAL)
self.work_dir, self.direc = setup_work_direc()
self.reverter = Reverter(self.direc)
tup = setup_test_files()
self.config1, self.config2, self.dir1, self.dir2, self.sets = tup
def tearDown(self):
shutil.rmtree(self.work_dir)
shutil.rmtree(self.dir1)
shutil.rmtree(self.dir2)
def test_rollback_improper_inputs(self):
from letsencrypt.client.errors import LetsEncryptReverterError
self.assertRaises(
@ -160,6 +286,38 @@ class ReverterTest(unittest.TestCase):
self.reverter.rollback_checkpoints(1)
self.assertEqual(read_in(self.config1), "directive-dir1")
@mock.patch("letsencrypt.client.reverter.logging.warning")
def test_finalize_checkpoint_no_in_progress(self, mock_warn):
self.reverter.finalize_checkpoint("No checkpoint... should warn")
self.assertEqual(mock_warn.call_count, 1)
@mock.patch("letsencrypt.client.reverter.shutil.move")
def test_finalize_checkpoint_cannot_title(self, mock_move):
from letsencrypt.client.errors import LetsEncryptReverterError
self.reverter.add_to_checkpoint(self.sets[0], "perm save")
mock_move.side_effect = OSError("cannot move")
self.assertRaises(LetsEncryptReverterError,
self.reverter.finalize_checkpoint,
"Title")
@mock.patch("letsencrypt.client.reverter.os.rename")
def test_finalize_checkpoint_no_rename_directory(self, mock_rename):
from letsencrypt.client.errors import LetsEncryptReverterError
self.reverter.add_to_checkpoint(self.sets[0], "perm save")
mock_rename.side_effect = OSError
self.assertRaises(LetsEncryptReverterError,
self.reverter.finalize_checkpoint,
"Title")
@mock.patch("letsencrypt.client.reverter.logging")
def test_rollback_too_many(self, mock_logging):
self.reverter.rollback_checkpoints(1)
self.assertEqual(mock_logging.warning.call_count, 1)
def test_multi_rollback(self):
config3 = self._setup_three_checkpoints()
self.reverter.rollback_checkpoints(3)
@ -174,6 +332,20 @@ class ReverterTest(unittest.TestCase):
# Just make sure it doesn't throw any errors.
self.reverter.view_config_changes()
@mock.patch("letsencrypt.client.reverter.logging")
def test_view_config_changes_no_backups(self, mock_logging):
self.reverter.view_config_changes()
self.assertTrue(mock_logging.info.call_count > 0)
def test_view_config_changes_bad_backups_dir(self):
from letsencrypt.client.errors import LetsEncryptReverterError
# There shouldn't be any "in progess directories when this is called
# It must just be clean checkpoints
os.makedirs(os.path.join(self.direc['backup'], "in_progress"))
self.assertRaises(LetsEncryptReverterError,
self.reverter.view_config_changes)
def _setup_three_checkpoints(self):
"""Generate some finalized checkpoints."""
# Checkpoint1 - config1
@ -203,14 +375,56 @@ class ReverterTest(unittest.TestCase):
return config3
def get_save_notes(dir):
class QuickInitReverterTest(unittest.TestCase):
"""Quick test of init."""
def test_init(self):
from letsencrypt.client.reverter import Reverter
rev = Reverter()
# Verify direc is set
self.assertTrue(rev.direc['backup'])
self.assertTrue(rev.direc['temp'])
self.assertTrue(rev.direc['progress'])
def setup_work_direc():
"""Setup directories."""
work_dir = tempfile.mkdtemp("work")
backup = os.path.join(work_dir, "backup")
os.makedirs(backup)
direc = {'backup': backup,
'temp': os.path.join(work_dir, "temp"),
'progress': os.path.join(backup, "progress")}
return work_dir, direc
def setup_test_files():
"""Setup sample configuration files."""
dir1 = tempfile.mkdtemp("dir1")
dir2 = tempfile.mkdtemp("dir2")
config1 = os.path.join(dir1, "config.txt")
config2 = os.path.join(dir2, "config.txt")
with open(config1, 'w') as file_fd:
file_fd.write("directive-dir1")
with open(config2, 'w') as file_fd:
file_fd.write("directive-dir2")
sets = [set([config1]),
set([config2]),
set([config1, config2])]
return config1, config2, dir1, dir2, sets
def get_save_notes(dire):
"""Read save notes"""
return read_in(os.path.join(dir, 'CHANGES_SINCE'))
return read_in(os.path.join(dire, 'CHANGES_SINCE'))
def get_filepaths(dir):
def get_filepaths(dire):
"""Get Filepaths"""
return read_in(os.path.join(dir, 'FILEPATHS'))
return read_in(os.path.join(dire, 'FILEPATHS'))
def read_in(path):
@ -219,11 +433,11 @@ def read_in(path):
return file_fd.read()
def update_file(filename, str):
def update_file(filename, string):
"""Update a file with a new value."""
with open(filename, 'w') as file_fd:
file_fd.write(str)
file_fd.write(string)
if __name__ == '__main__':
unittest.main()
unittest.main()