revisions

This commit is contained in:
James Kasten 2015-02-23 04:26:43 -08:00
parent 5d76c0feb1
commit fa0c3d2b9f
19 changed files with 273 additions and 356 deletions

View file

@ -74,6 +74,8 @@ class ApacheConfigurator(augeas_configurator.AugeasConfigurator):
"""
zope.interface.implements(interfaces.IAuthenticator, interfaces.IInstaller)
description = "Apache Web Server"
def __init__(self, config, version=None):
"""Initialize an Apache Configurator.
@ -87,6 +89,19 @@ class ApacheConfigurator(augeas_configurator.AugeasConfigurator):
if os.geteuid() == 0:
self.verify_setup()
# Add name_server association dict
self.assoc = dict()
# Add number of outstanding challenges
self.chall_out = 0
# These will be set in the prepare function
self.parser = None
self.version = version
self.vhosts = None
self.enhance_func = {"redirect": self._enable_redirect}
def prepare(self):
"""Prepare the authenticator/installer."""
self.parser = parser.ApacheParser(
self.aug, self.config.apache_server_root,
self.config.apache_mod_ssl_conf)
@ -94,14 +109,11 @@ class ApacheConfigurator(augeas_configurator.AugeasConfigurator):
self.check_parsing_errors("httpd.aug")
# Set Version
self.version = self.get_version() if version is None else version
if self.version is None:
self.version = self.get_version()
# Get all of the available vhosts
self.vhosts = self.get_virtual_hosts()
# Add name_server association dict
self.assoc = dict()
# Add number of outstanding challenges
self.chall_out = 0
# Enable mod_ssl if it isn't already enabled
# This is Let's Encrypt... we enable mod_ssl on initialization :)
@ -110,9 +122,9 @@ class ApacheConfigurator(augeas_configurator.AugeasConfigurator):
# on initialization
self._prepare_server_https()
self.enhance_func = {"redirect": self._enable_redirect}
temp_install(self.config.apache_mod_ssl_conf)
def deploy_cert(self, domain, cert, key, cert_chain=None):
"""Deploys certificate to specified virtual host.

View file

@ -5,7 +5,6 @@ import sys
import Crypto.PublicKey.RSA
import M2Crypto
import zope.component
from letsencrypt.acme import messages
from letsencrypt.acme import util as acme_util
@ -14,14 +13,13 @@ from letsencrypt.client import auth_handler
from letsencrypt.client import client_authenticator
from letsencrypt.client import crypto_util
from letsencrypt.client import errors
from letsencrypt.client import interfaces
from letsencrypt.client import le_util
from letsencrypt.client import network
from letsencrypt.client import reverter
from letsencrypt.client import revoker
from letsencrypt.client.apache import configurator
from letsencrypt.client.display import ops
from letsencrypt.client.display import ops as display_ops
from letsencrypt.client.display import enhancements
@ -50,7 +48,8 @@ class Client(object):
"""Initialize a client.
:param dv_auth: IAuthenticator that can solve the
:const:`letsencrypt.client.constants.DV_CHALLENGES`
:const:`letsencrypt.client.constants.DV_CHALLENGES`.
:func:`letsencrypt.client.interfaces.IAuthenticator.prepare`
:type dv_auth: :class:`letsencrypt.client.interfaces.IAuthenticator`
"""
@ -200,7 +199,7 @@ class Client(object):
# sites may have been enabled / final cleanup
self.installer.restart()
ops.success_installation(domains)
display_ops.success_installation(domains)
def enhance_config(self, domains, redirect=None):
"""Enhance the configuration.
@ -357,6 +356,8 @@ def determine_authenticator(all_auths):
:returns: Valid Authenticator object or None
:raises :class:`letsencrypt.client.errors.LetsEncryptClientError`
"""
# Available Authenticator objects
avail_auths = []
@ -365,29 +366,20 @@ def determine_authenticator(all_auths):
for pot_auth in all_auths:
try:
# I do not think this a great solution but haven't come up with
# anything better yet... other than constricting init functions for
# authenticators
if len(pot_auth) == 2:
# pylint: disable=no-value-for-parameter
avail_auths.append((pot_auth[0], pot_auth[1]()))
elif len(pot_auth) == 3:
avail_auths.append((pot_auth[0], pot_auth[1](pot_auth[2])))
else:
raise errors.LetsEncryptClientError(
"IAuthenticator: Number of parameters not supported")
pot_auth.prepare()
avail_auths.append(pot_auth)
except errors.LetsEncryptMisconfigurationError as err:
errs[pot_auth[1]] = err
avail_auths.append((pot_auth[0], pot_auth[1]))
errs[pot_auth] = err
avail_auths.append(pot_auth)
except errors.LetsEncryptNoInstallationError:
pass
continue
if len(avail_auths) > 1:
auth = ops.choose_authenticator(avail_auths, errs)
auth = display_ops.choose_authenticator(avail_auths, errs)
elif len(avail_auths) == 1:
auth = avail_auths[0][1]
auth = avail_auths[0]
else:
auth = None
raise errors.LetsEncryptClientError("No Authenticators available.")
if auth in errs:
logging.error("Please fix the configuration for the Authenticator. "
@ -406,26 +398,20 @@ def determine_installer(config):
"""
try:
return configurator.ApacheConfigurator(config)
installer = configurator.ApacheConfigurator(config)
installer.prepare()
return installer
except errors.LetsEncryptNoInstallationError:
logging.info("Unable to find a way to install the certificate.")
return None
except errors.LetsEncryptMisconfigurationError:
# This will have to be changed in the future...
return installer
def rollback(checkpoints, config):
"""Revert configuration the specified number of checkpoints.
.. note:: If another installer uses something other than the reverter class
to do their configuration changes, the correct reverter will have to be
determined.
.. note:: This function restarts the server even if there weren't any
rollbacks. The user may be confused or made an error and simply needs
to restart the server.
.. todo:: This function will have to change depending on the functionality
of future installers. Perhaps the interface should define errors that
are thrown for the various functions.
:param int checkpoints: Number of checkpoints to revert.
:param config: Configuration.
@ -433,11 +419,7 @@ def rollback(checkpoints, config):
"""
# Misconfigurations are only a slight problems... allow the user to rollback
try:
installer = determine_installer(config)
except errors.LetsEncryptMisconfigurationError:
_misconfigured_rollback(checkpoints, config)
return
installer = determine_installer(config)
# No Errors occurred during init... proceed normally
# If installer is None... couldn't find an installer... there shouldn't be
@ -447,43 +429,6 @@ def rollback(checkpoints, config):
installer.restart()
def _misconfigured_rollback(checkpoints, config):
"""Handles the case where the Installer is misconfigured.
:param int checkpoints: Number of checkpoints to revert.
:param config: Configuration.
:type config: :class:`letsencrypt.client.interfaces.IConfig`
"""
yes = zope.component.getUtility(interfaces.IDisplay).yesno(
"Oh, no! The web server is currently misconfigured.{0}{0}"
"Would you still like to rollback the "
"configuration?".format(os.linesep))
if not yes:
logging.info("The error message is above.")
logging.info("Configuration was not rolled back.")
return
logging.info("Rolling back using the Reverter module")
# recovery routine has probably already been run by installer
# in the__init__ attempt, run it again for safety... it shouldn't hurt
# Also... not sure how future installers will handle recovery.
rev = reverter.Reverter(config)
rev.recovery_routine()
rev.rollback_checkpoints(checkpoints)
# We should try to restart the server
try:
installer = determine_installer(config)
installer.restart()
logging.info("Hooray! Rollback solved the misconfiguration!")
logging.info("Your web server is back up and running.")
except errors.LetsEncryptMisconfigurationError:
logging.warning(
"Rollback was unable to solve the misconfiguration issues")
def revoke(config, no_confirm, cert, authkey):
"""Revoke certificates.
@ -493,14 +438,9 @@ def revoke(config, no_confirm, cert, authkey):
"""
# Misconfigurations don't really matter. Determine installer better choose
# correctly though.
try:
installer = determine_installer(config)
except errors.LetsEncryptMisconfigurationError:
zope.component.getUtility(interfaces.IDisplay).notification(
"The web server is currently misconfigured. Some "
"abilities like seeing which certificates are currently "
"installed may not be available.")
installer = None
# This will need some better prepared or properly configured parameter...
# I will figure it out later...
installer = determine_installer(config)
revoc = revoker.Revoker(installer, config, no_confirm)
# Cert is most selective, so it is chosen first.

View file

@ -129,35 +129,35 @@ def make_ss_cert(key_str, domains, not_before=None,
pubkey = M2Crypto.EVP.PKey()
pubkey.assign_rsa(rsa_key)
m2_cert = M2Crypto.X509.X509()
m2_cert.set_pubkey(pubkey)
m2_cert.set_serial_number(1337)
m2_cert.set_version(2)
cert = M2Crypto.X509.X509()
cert.set_pubkey(pubkey)
cert.set_serial_number(1337)
cert.set_version(2)
current_ts = long(time.time() if not_before is None else not_before)
current = M2Crypto.ASN1.ASN1_UTCTIME()
current.set_time(current_ts)
expire = M2Crypto.ASN1.ASN1_UTCTIME()
expire.set_time(current_ts + validity)
m2_cert.set_not_before(current)
m2_cert.set_not_after(expire)
cert.set_not_before(current)
cert.set_not_after(expire)
subject = m2_cert.get_subject()
subject = cert.get_subject()
subject.C = "US"
subject.ST = "Michigan"
subject.L = "Ann Arbor"
subject.O = "University of Michigan and the EFF"
subject.CN = domains[0]
m2_cert.set_issuer(m2_cert.get_subject())
cert.set_issuer(cert.get_subject())
if len(domains) > 1:
m2_cert.add_ext(M2Crypto.X509.new_extension(
cert.add_ext(M2Crypto.X509.new_extension(
"basicConstraints", "CA:FALSE"))
m2_cert.add_ext(M2Crypto.X509.new_extension(
cert.add_ext(M2Crypto.X509.new_extension(
"subjectAltName", ", ".join(["DNS:%s" % d for d in domains])))
m2_cert.sign(pubkey, "sha256")
assert m2_cert.verify(pubkey)
assert m2_cert.verify()
cert.sign(pubkey, "sha256")
assert cert.verify(pubkey)
assert cert.verify()
# print check_purpose(,0
return m2_cert.as_pem()
return cert.as_pem()

View file

@ -8,6 +8,10 @@ from letsencrypt.client import interfaces
from letsencrypt.client.display import util as display_util
# Used to make easier to read code.
util = zope.component.getUtility # pylint: disable=invalid-name
def ask(enhancement):
"""Display the enhancement to the user.
@ -22,9 +26,10 @@ def ask(enhancement):
"""
try:
return dispatch[enhancement]()
# Call the appropriate function based on the enhancement
return DISPATCH[enhancement]()
except KeyError:
logging.error("Unsupported enhancement given to ask()")
logging.error("Unsupported enhancement given to ask(): %s", enhancement)
raise errors.LetsEncryptClientError("Unsupported Enhancement")
@ -50,9 +55,6 @@ def redirect_by_default():
return selection == 1
util = zope.component.getUtility # pylint: disable=invalid-name
dispatch = { # pylint: disable=invalid-name
DISPATCH = {
"redirect": redirect_by_default
}

View file

@ -14,18 +14,17 @@ util = zope.component.getUtility # pylint: disable=invalid-name
def choose_authenticator(auths, errs):
"""Allow the user to choose their authenticator.
:param list auths: Where each is a tuple of the form
('description', 'IAuthenticator') where IAuthenticator is a
:class:`letsencrypt.client.interfaces.IAuthenticator` object or class
:param list auths: Where each of type
:class:`letsencrypt.client.interfaces.IAuthenticator` object
:param dict errs: Mapping IAuthenticator objects to error messages
:returns: Authenticator selected
:rtype: :class:`letsencrypt.client.interfaces.IAuthenticator`
"""
descs = [auth[0] if auth[1] not in errs else "%s (Misconfigured)" % auth[0]
descs = [auth.description if auth not in errs
else "%s (Misconfigured)" % auth.description
for auth in auths]
iauths = [auth[1] for auth in auths]
while True:
code, index = util(interfaces.IDisplay).menu(
@ -33,12 +32,12 @@ def choose_authenticator(auths, errs):
descs, help_label="More Info")
if code == display_util.OK:
return iauths[index]
return auths[index]
elif code == display_util.HELP:
if iauths[index] in errs:
msg = "Reported Error: %s" % errs[iauths[index]]
if auths[index] in errs:
msg = "Reported Error: %s" % errs[auths[index]]
else:
msg = iauths[index].more_info()
msg = auths[index].more_info()
util(interfaces.IDisplay).notification(
msg, height=display_util.HEIGHT)
else:
@ -95,7 +94,7 @@ def _filter_names(names):
def _choose_names_manually():
"""Manualy input names for those without an installer."""
"""Manually input names for those without an installer."""
code, input_ = util(interfaces.IDisplay).input(
"Please enter in your domain name(s) (comma and/or space separated) ")

View file

@ -6,29 +6,11 @@ import zope.component
from letsencrypt.client import interfaces
from letsencrypt.client.display import util as display_util
# Convenience call to make for easier to read lines.
util = zope.component.getUtility # pylint: disable=invalid-name
def choose_certs(certs):
"""Choose a certificate from a menu.
:param list certs: List of cert dicts.
:returns: selection (zero-based index)
:rtype: int
"""
while True:
code, selection = _display_certs(certs)
if code == display_util.OK:
return selection
elif code == display_util.HELP:
more_info_cert(certs[selection])
else:
exit(0)
def _display_certs(certs):
def display_certs(certs):
"""Display the certificates in a menu for revocation.
:param list certs: each is a :class:`letsencrypt.client.revoker.Cert`
@ -65,10 +47,9 @@ def confirm_revocation(cert):
:rtype: bool
"""
text = ("{0}Are you sure you would like to revoke the following "
"certificate:{0}".format(os.linesep))
text += cert.pretty_print()
text += "This action cannot be reversed!"
text = ("Are you sure you would like to revoke the following "
"certificate:{0}{cert}This action cannot be "
"reversed!".format(os.linesep, cert=cert.pretty_print()))
return util(interfaces.IDisplay).yesno(text)
@ -78,8 +59,8 @@ def more_info_cert(cert):
:param dict cert: cert dict used throughout revoker.py
"""
text = "{0}Certificate Information:{0}".format(os.linesep)
text += cert.pretty_print()
text = "Certificate Information:{0}{1}".format(
os.linesep, cert.pretty_print())
util(interfaces.IDisplay).notification(text, height=display_util.HEIGHT)

View file

@ -1,4 +1,4 @@
"""Lets Encrypt display."""
"""Let's Encrypt display."""
import os
import textwrap
@ -13,10 +13,10 @@ HEIGHT = 20
# Display exit codes
OK = "ok"
"""Display exit code indicating user acceptance"""
"""Display exit code indicating user acceptance."""
CANCEL = "cancel"
"""Display exit code for a user canceling the display"""
"""Display exit code for a user canceling the display."""
HELP = "help"
"""Display exit code when for when the user requests more help."""
@ -63,10 +63,7 @@ class NcursesDisplay(object):
:rtype: tuple
"""
if help_label:
help_button = True
else:
help_button = False
help_button = bool(help_label)
# Can accept either tuples or just the actual choices
if choices and isinstance(choices[0], tuple):
@ -111,7 +108,7 @@ class NcursesDisplay(object):
return self.dialog.inputbox(message, width=self.width)
def yesno(self, message, yes_label="Yes", no_label="No"):
"""Display a Yes/No dialog box
"""Display a Yes/No dialog box.
Yes and No label must begin with different letters.
@ -198,7 +195,7 @@ class FileDisplay(object):
def input(self, message):
# pylint: disable=no-self-use
"""Accept input from the user
"""Accept input from the user.
:param str message: message to display to the user
@ -219,7 +216,8 @@ class FileDisplay(object):
def yesno(self, message, yes_label="Yes", no_label="No"):
"""Query the user with a yes/no question.
Yes and No label must begin with different letters
Yes and No label must begin with different letters, and must contain at
least one letter each.
:param str message: question for the user
:param str yes_label: Label of the "Yes" parameter
@ -336,7 +334,7 @@ class FileDisplay(object):
self.outfile.write(side_frame)
def _wrap_lines(self, msg): # pylint: disable=no-self-use
"""Format lines nicely to 80 chars
"""Format lines nicely to 80 chars.
:param str msg: Original message

View file

@ -13,6 +13,16 @@ class IAuthenticator(zope.interface.Interface):
"""
def prepare():
"""Prepare the authenticator.
Finish up any additional initialization.
:raises
:class:`letsencrypt.client.errors.LetsEncryptMisconfigurationError`
when full initialization cannot be completed.
"""
def get_chall_pref(domain):
"""Return list of challenge preferences.
@ -118,6 +128,16 @@ class IInstaller(zope.interface.Interface):
"""
def prepare():
"""Prepare the installer.
Finish up any additional initialization.
:raises
:class:`letsencrypt.client.errors.LetsEncryptMisconfigurationError`
when full initialization cannot be completed.
"""
def get_all_names():
"""Returns all names that may be authenticated."""

View file

@ -11,6 +11,7 @@ import csv
import logging
import os
import shutil
import tempfile
import Crypto.PublicKey.RSA
import M2Crypto
@ -64,7 +65,14 @@ class Revoker(object):
"""
certs = []
clean_pem = Crypto.PublicKey.RSA.importKey(authkey.pem).exportKey("PEM")
try:
clean_pem = Crypto.PublicKey.RSA.importKey(
authkey.pem).exportKey("PEM")
# https://www.dlitz.net/software/pycrypto/api/current/Crypto.PublicKey.RSA-module.html
except (IndexError, ValueError, TypeError):
raise errors.LetsEncryptRevokerError(
"Invalid key file specified to revoke_from_key")
with open(self.list_path, "rb") as csvfile:
csvreader = csv.reader(csvfile)
for row in csvreader:
@ -73,9 +81,17 @@ class Revoker(object):
# Note: The key can be different than the pub key found in the
# certificate.
_, b_k = self._row_to_backup(row)
if clean_pem == Crypto.PublicKey.RSA.importKey(
open(b_k).read()).exportKey("PEM"):
certs.append(Cert.fromrow(row, self.config.cert_key_backup))
try:
if clean_pem == Crypto.PublicKey.RSA.importKey(
open(b_k).read()).exportKey("PEM"):
certs.append(
Cert.fromrow(row, self.config.cert_key_backup))
except (IndexError, ValueError, TypeError):
# This should never happen given the assumptions of the
# module. If it does, it is probably best to delete the
# the offending key/cert. For now... just raise an exception
raise errors.LetsEncryptRevokerError(
"%s - backup file is corrupted.")
if certs:
self._safe_revoke(certs)
@ -100,7 +116,7 @@ class Revoker(object):
for row in csvreader:
cert = Cert.fromrow(row, self.config.cert_key_backup)
if cert == cert_to_revoke:
if cert.get_der() == cert_to_revoke.get_der():
self._safe_revoke([cert])
return
@ -114,15 +130,17 @@ class Revoker(object):
while True:
if certs:
selection = revocation.choose_certs(certs)
code, selection = revocation.display_certs(certs)
revoked_certs = self._safe_revoke([certs[selection]])
# Since we are currently only revoking one cert at a time...
if revoked_certs:
# This is safer than using remove as Revoker.Certs only
# check the DER value of the cert. There could potentially
# be multiple backup certs with the same value.
del certs[selection]
if code == display_util.OK:
revoked_certs = self._safe_revoke([certs[selection]])
# Since we are currently only revoking one cert at a time...
if revoked_certs:
del certs[selection]
elif code == display_util.HELP:
revocation.more_info_cert(certs[selection])
else:
return
else:
logging.info(
"There are not any trusted Let's Encrypt "
@ -158,7 +176,7 @@ class Revoker(object):
return certs
def _get_installed_locations(self):
"""Get installed locations of certificates
"""Get installed locations of certificates.
:returns: map from cert sha1 fingerprint to :class:`list` of vhosts
where the certificate is installed.
@ -257,8 +275,7 @@ class Revoker(object):
:class:`letsencrypt.client.revoker.Cert`
"""
list_path2 = os.path.join(self.config.cert_key_backup, "LIST.tmp")
list_path2 = tempfile.mktemp(".tmp", "LIST")
idx = 0
with open(self.list_path, "rb") as orgfile:
@ -447,24 +464,14 @@ class Cert(object):
self.backup_path = backup
self.backup_key_path = backup_key
def get_installed_msg(self):
"""Access installed message."""
return ", ".join(self.installed)
def get_subject(self):
"""Get subject."""
return self.cert.get_subject().as_text()
# I would rather not have outside classes messing with Cert directly.
# (I would like to change M2Crypto -> something else without issues)
def get_cn(self):
"""Get common name."""
return self.cert.get_subject().CN
def get_issuer(self):
"""Get issuer."""
return self.cert.get_issuer().as_text()
def get_fingerprint(self):
"""Get sha1 fingerprint."""
"""Get SHA1"""
return self.cert.get_fingerprint(md="sha1")
def get_not_before(self):
@ -475,13 +482,16 @@ class Cert(object):
"""Get not_valid_after field."""
return self.cert.get_not_after().get_datetime()
def get_serial(self):
"""Get serial number."""
self.cert.get_serial_number()
def get_der(self):
"""Get certificate in der format."""
return self.cert.as_der()
def get_pub_key(self):
"""Get public key size."""
# .. todo:: M2Crypto doesn't support ECC, this will have to be updated
"""Get public key size.
.. todo:: M2Crypto doesn't support ECC, this will have to be updated
"""
return "RSA " + str(self.cert.get_pubkey().size() * 8)
def get_san(self):
@ -492,16 +502,17 @@ class Cert(object):
return ""
def __str__(self):
text = []
text.append("Subject: %s" % self.get_subject())
text.append("SAN: %s" % self.get_san())
text.append("Issuer: %s" % self.get_issuer())
text.append("Public Key: %s" % self.get_pub_key())
text.append("Not Before: %s" % str(self.get_not_before()))
text.append("Not After: %s" % str(self.get_not_after()))
text.append("Serial Number: %s" % self.get_serial())
text.append("SHA1: %s%s" % (self.get_fingerprint(), os.linesep))
text.append("Installed: %s" % self.get_installed_msg())
text = [
"Subject: %s" % self.cert.get_subject().as_text(),
"SAN: %s" % self.get_san(),
"Issuer: %s" % self.cert.get_issuer().as_text(),
"Public Key: %s" % self.get_pub_key(),
"Not Before: %s" % str(self.get_not_before()),
"Not After: %s" % str(self.get_not_after()),
"Serial Number: %s" % self.cert.get_serial_number(),
"SHA1: %s%s" % (self.get_fingerprint(), os.linesep),
"Installed: %s" % ", ".join(self.installed),
]
if self.orig is not None:
if self.orig.status == "":
@ -521,6 +532,3 @@ class Cert(object):
"""Nicely frames a cert str"""
frame = "-" * (display_util.WIDTH - 4) + os.linesep
return "{frame}{cert}{frame}".format(frame=frame, cert=str(self))
def __eq__(self, other):
return self.cert.as_der() == other.cert.as_der()

View file

@ -29,6 +29,8 @@ class StandaloneAuthenticator(object):
"""
zope.interface.implements(interfaces.IAuthenticator)
description = "Standalone Authenticator"
def __init__(self):
self.child_pid = None
self.parent_pid = os.getpid()
@ -39,6 +41,14 @@ class StandaloneAuthenticator(object):
self.private_key = None
self.ssl_conn = None
def prepare(self):
"""There is nothing left to setup.
.. todo:: This should probably do the port check
"""
pass
def client_signal_handler(self, sig, unused_frame):
"""Signal handler for the parent process.

View file

@ -75,6 +75,8 @@ def get_apache_configurator(
work_dir=work_dir),
version)
config.prepare()
return config

View file

@ -12,55 +12,55 @@ class DetermineAuthenticatorTest(unittest.TestCase):
from letsencrypt.client.standalone_authenticator \
import StandaloneAuthenticator
self.mock_stand = mock.MagicMock(spec=StandaloneAuthenticator)
self.mock_apache = mock.MagicMock(spec=ApacheConfigurator)
self.mock_stand = mock.MagicMock(
spec=StandaloneAuthenticator, description="Apache Web Server")
self.mock_apache = mock.MagicMock(
spec=ApacheConfigurator, description="Standalone Authenticator")
self.mock_config = mock.Mock()
self.all_auths = [
("Apache Web Server", self.mock_apache, self.mock_config),
("Standalone", self.mock_stand),
]
self.all_auths = [self.mock_apache, self.mock_stand]
@classmethod
def _call(cls, all_auths):
from letsencrypt.client.client import determine_authenticator
return determine_authenticator(all_auths)
@mock.patch("letsencrypt.client.client.ops.choose_authenticator")
@mock.patch("letsencrypt.client.client.display_ops.choose_authenticator")
def test_accept_two(self, mock_choose):
mock_choose.return_value = self.mock_stand()
self.assertEqual(self._call(self.all_auths), self.mock_stand())
def test_accept_one(self):
self.mock_apache.prepare.return_value = self.mock_apache
self.assertEqual(
self._call(self.all_auths[:1]), self.mock_apache(self.mock_config))
self._call(self.all_auths[:1]), self.mock_apache)
def test_no_installation_one(self):
self.mock_apache.side_effect = errors.LetsEncryptNoInstallationError
self.mock_apache.prepare.side_effect = \
errors.LetsEncryptNoInstallationError
self.assertEqual(self._call(self.all_auths), self.mock_stand())
self.assertEqual(self._call(self.all_auths), self.mock_stand)
def test_no_installations(self):
self.mock_apache.side_effect = errors.LetsEncryptNoInstallationError
self.mock_stand.side_effect = errors.LetsEncryptNoInstallationError
self.mock_apache.prepare.side_effect = \
errors.LetsEncryptNoInstallationError
self.mock_stand.prepare.side_effect = \
errors.LetsEncryptNoInstallationError
self.assertTrue(self._call(self.all_auths) is None)
self.assertRaises(errors.LetsEncryptClientError,
self._call,
self.all_auths)
@mock.patch("letsencrypt.client.client.logging")
@mock.patch("letsencrypt.client.client.ops.choose_authenticator")
def test_misconfigured(self, mock_choose, mock_log): # pylint: disable=unused-argument
self.mock_apache.side_effect = errors.LetsEncryptMisconfigurationError
@mock.patch("letsencrypt.client.client.display_ops.choose_authenticator")
def test_misconfigured(self, mock_choose, unused_log):
self.mock_apache.prepare.side_effect = \
errors.LetsEncryptMisconfigurationError
mock_choose.return_value = self.mock_apache
self.assertRaises(SystemExit, self._call, self.all_auths)
def test_too_many_params(self):
self.assertRaises(
errors.LetsEncryptClientError,
self._call,
[("desc", self.mock_apache, "1", "2", "3", "4", "5")])
class RollbackTest(unittest.TestCase):
"""Test the rollback function."""
@ -82,59 +82,6 @@ class RollbackTest(unittest.TestCase):
self.assertEqual(self.m_install().rollback_checkpoints.call_count, 1)
self.assertEqual(self.m_install().restart.call_count, 1)
@mock.patch("letsencrypt.client.client.zope.component.getUtility")
@mock.patch("letsencrypt.client.reverter.Reverter")
@mock.patch("letsencrypt.client.client.determine_installer")
def test_misconfiguration_fixed(self, mock_det, mock_rev, mock_input):
mock_det.side_effect = [errors.LetsEncryptMisconfigurationError,
self.m_install]
mock_input().yesno.return_value = True
self._call(1)
# Don't rollback twice... (only on one object)
self.assertEqual(self.m_install().rollback_checkpoints.call_count, 0)
self.assertEqual(mock_rev().rollback_checkpoints.call_count, 1)
# Only restart once
self.assertEqual(self.m_install.restart.call_count, 1)
@mock.patch("letsencrypt.client.client.zope.component.getUtility")
@mock.patch("letsencrypt.client.client.logging.warning")
@mock.patch("letsencrypt.client.reverter.Reverter")
@mock.patch("letsencrypt.client.client.determine_installer")
def test_misconfiguration_remains(
self, mock_det, mock_rev, mock_warn, mock_input):
mock_det.side_effect = errors.LetsEncryptMisconfigurationError
mock_input().yesno.return_value = True
self._call(1)
# Don't rollback twice... (only on one object)
self.assertEqual(self.m_install().rollback_checkpoints.call_count, 0)
self.assertEqual(mock_rev().rollback_checkpoints.call_count, 1)
# Never call restart because init never succeeds
self.assertEqual(self.m_install().restart.call_count, 0)
# There should be a warning about the remaining problem
self.assertEqual(mock_warn.call_count, 1)
@mock.patch("letsencrypt.client.client.zope.component.getUtility")
@mock.patch("letsencrypt.client.reverter.Reverter")
@mock.patch("letsencrypt.client.client.determine_installer")
def test_user_decides_to_manually_investigate(
self, mock_det, mock_rev, mock_input):
mock_det.side_effect = errors.LetsEncryptMisconfigurationError
mock_input().yesno.return_value = False
self._call(1)
# Neither is ever called
self.assertEqual(self.m_install().rollback_checkpoints.call_count, 0)
self.assertEqual(mock_rev().rollback_checkpoints.call_count, 0)
@mock.patch("letsencrypt.client.client.determine_installer")
def test_no_installer(self, mock_det):
mock_det.return_value = None

View file

@ -62,8 +62,7 @@ class MakeKeyTest(unittest.TestCase): # pylint: disable=too-few-public-methods
def test_it(self): # pylint: disable=no-self-use
from letsencrypt.client.crypto_util import make_key
# This individual test was taking over 6 seconds...
# I have shortened it... to aid debugging the rest of the project
# Do not test larger keys as it takes too long.
M2Crypto.RSA.load_key_string(make_key(1024))
@ -94,43 +93,5 @@ class MakeSSCertTest(unittest.TestCase):
make_ss_cert(RSA256_KEY, ['example.com', 'www.example.com'])
# class GetCertInfoTest(unittest.TestCase):
# """Tests for letsencrypt.client.crypto_util.get_cert_info."""
#
# def setUp(self):
# self.cert_info = {
# 'not_before': datetime.datetime(
# 2014, 12, 11, 22, 34, 45, tzinfo=M2Crypto.ASN1.UTC),
# 'not_after': datetime.datetime(
# 2014, 12, 18, 22, 34, 45, tzinfo=M2Crypto.ASN1.UTC),
# 'subject': 'C=US, ST=Michigan, L=Ann Arbor, O=University '
# 'of Michigan and the EFF, CN=example.com',
# 'cn': 'example.com',
# 'issuer': 'C=US, ST=Michigan, L=Ann Arbor, O=University '
# 'of Michigan and the EFF, CN=example.com',
# 'serial': 1337L,
# 'pub_key': 'RSA 512',
# }
#
# def _call(self, name):
# from letsencrypt.client.crypto_util import get_cert_info
# self.assertEqual(get_cert_info(pkg_resources.resource_filename(
# __name__, os.path.join('testdata', name))), self.cert_info)
#
# def test_single_domain(self):
# self.cert_info.update({
# 'san': '',
# 'fingerprint': '9F8CE01450D288467C3326AC0457E351939C72E',
# })
# self._call('cert.pem')
#
# def test_san(self):
# self.cert_info.update({
# 'san': 'DNS:example.com, DNS:www.example.com',
# 'fingerprint': '62F7110431B8E8F55905DBE5592518F9634AC50A',
# })
# self._call('cert-san.pem')
if __name__ == '__main__':
unittest.main()

View file

@ -1,4 +1,4 @@
"""Test display.ops."""
"""Test letsencrypt.client.display.ops."""
import sys
import unittest
@ -17,10 +17,7 @@ class ChooseAuthenticatorTest(unittest.TestCase):
self.mock_apache().more_info.return_value = "Apache Info"
self.mock_stand().more_info.return_value = "Standalone Info"
self.auths = [
("Apache Tag", self.mock_apache),
("Standalone Tag", self.mock_stand)
]
self.auths = [self.mock_apache, self.mock_stand]
self.errs = {self.mock_apache: "This is an error message."}
@ -58,7 +55,7 @@ class ChooseAuthenticatorTest(unittest.TestCase):
class GenHttpsNamesTest(unittest.TestCase):
"""Test _gen_https_names"""
"""Test _gen_https_names."""
def setUp(self):
zope.component.provideUtility(display_util.FileDisplay(sys.stdout))

View file

@ -10,7 +10,7 @@ import zope.component
from letsencrypt.client.display import util as display_util
class ChooseCertsTest(unittest.TestCase):
class DisplayCertsTest(unittest.TestCase):
def setUp(self):
from letsencrypt.client.revoker import Cert
base_package = "letsencrypt.client.tests"
@ -25,33 +25,37 @@ class ChooseCertsTest(unittest.TestCase):
@classmethod
def _call(cls, certs):
from letsencrypt.client.display.revocation import choose_certs
return choose_certs(certs)
from letsencrypt.client.display.revocation import display_certs
return display_certs(certs)
@mock.patch("letsencrypt.client.display.revocation.util")
def test_revocation(self, mock_util):
mock_util().menu.return_value = (display_util.OK, 0)
choice = self._call(self.certs)
code, choice = self._call(self.certs)
self.assertEqual(display_util.OK, code)
self.assertTrue(self.certs[choice] == self.cert0)
@mock.patch("letsencrypt.client.display.revocation.util")
def test_cancel(self, mock_util):
mock_util().menu.return_value = (display_util.CANCEL, -1)
self.assertRaises(SystemExit, self._call, self.certs)
code, _ = self._call(self.certs)
self.assertEqual(display_util.CANCEL, code)
class MoreInfoCertTest(unittest.TestCase):
# pylint: disable=too-few-public-methods
@classmethod
def _call(cls, cert):
from letsencrypt.client.display.revocation import more_info_cert
more_info_cert(cert)
@mock.patch("letsencrypt.client.display.revocation.util")
def test_more_info(self, mock_util):
mock_util().menu.side_effect = [
(display_util.HELP, 1),
(display_util.OK, 1),
]
self._call(mock.MagicMock())
choice = self._call(self.certs)
self.assertTrue(self.certs[choice] == self.cert1)
self.assertEqual(mock_util().notification.call_count, 1)

View file

@ -330,7 +330,7 @@ class TestFullCheckpointsReverter(unittest.TestCase):
self.assertEqual(read_in(self.config2), "directive-dir2")
self.assertFalse(os.path.isfile(config3))
@mock.patch("letsencrypt.client.client.zope.component.getUtility")
@mock.patch("letsencrypt.client.reverter.zope.component.getUtility")
def test_view_config_changes(self, mock_output):
"""This is not strict as this is subject to change."""
self._setup_three_checkpoints()

View file

@ -1,4 +1,4 @@
"""Test :mod:`letsencrypt.client.revoker`."""
"""Test letsencrypt.client.revoker."""
import csv
import os
import pkg_resources
@ -10,6 +10,8 @@ import mock
from letsencrypt.client import errors
from letsencrypt.client import le_util
from letsencrypt.client.apache import configurator
from letsencrypt.client.display import util as display_util
class RevokerBase(unittest.TestCase): # pylint: disable=too-few-public-methods
@ -56,8 +58,9 @@ class RevokerTest(RevokerBase):
self._store_certs()
self.mock_installer = mock.MagicMock()
self.revoker = Revoker(self.mock_installer, self.mock_config)
self.revoker = Revoker(
mock.MagicMock(spec=configurator.ApacheConfigurator),
self.mock_config)
def tearDown(self):
shutil.rmtree(self.backup_dir)
@ -77,6 +80,18 @@ class RevokerTest(RevokerBase):
self.assertEqual(mock_net.call_count, 2)
@mock.patch("letsencrypt.client.revoker.Crypto.PublicKey.RSA.importKey")
def test_revoke_by_invalid_keys(self, mock_import):
mock_import.side_effect = ValueError
self.assertRaises(errors.LetsEncryptRevokerError,
self.revoker.revoke_from_key,
self.key)
mock_import.side_effect = [mock.Mock(), IndexError]
self.assertRaises(errors.LetsEncryptRevokerError,
self.revoker.revoke_from_key,
self.key)
@mock.patch("letsencrypt.client.revoker.network."
"Network.send_and_receive_expected")
@mock.patch("letsencrypt.client.revoker.revocation")
@ -95,8 +110,6 @@ class RevokerTest(RevokerBase):
# No revocation went through
self.assertEqual(mock_net.call_count, 0)
@mock.patch("letsencrypt.client.revoker.network."
"Network.send_and_receive_expected")
@mock.patch("letsencrypt.client.revoker.revocation")
@ -140,9 +153,13 @@ class RevokerTest(RevokerBase):
@mock.patch("letsencrypt.client.revoker.revocation")
def test_revoke_by_menu(self, mock_display, mock_net):
mock_display().confirm_revocation.return_value = True
mock_display.choose_certs.side_effect = [0, SystemExit]
mock_display.display_certs.side_effect = [
(display_util.HELP, 0),
(display_util.OK, 0),
(display_util.CANCEL, -1),
]
self.assertRaises(SystemExit, self.revoker.revoke_from_menu)
self.revoker.revoke_from_menu()
row0 = self.certs[0].get_row()
row1 = self.certs[1].get_row()
@ -153,6 +170,7 @@ class RevokerTest(RevokerBase):
self.assertTrue(self._backups_exist(row1))
self.assertEqual(mock_net.call_count, 1)
self.assertEqual(mock_display.more_info_cert.call_count, 1)
@mock.patch("letsencrypt.client.revoker.logging")
@mock.patch("letsencrypt.client.revoker.network."
@ -160,7 +178,7 @@ class RevokerTest(RevokerBase):
@mock.patch("letsencrypt.client.revoker.revocation")
def test_revoke_by_menu_delete_all(self, mock_display, mock_net, mock_log):
mock_display().confirm_revocation.return_value = True
mock_display.choose_certs.return_value = 0
mock_display.display_certs.return_value = (display_util.OK, 0)
self.revoker.revoke_from_menu()
@ -288,7 +306,7 @@ class RevokerClassMethodsTest(RevokerBase):
self.assertTrue(os.path.isfile(self.list_path))
rows = self._get_rows()
i = 0
for i, row in enumerate(rows):
# pylint: disable=protected-access
self.assertTrue(os.path.isfile(
@ -297,7 +315,7 @@ class RevokerClassMethodsTest(RevokerBase):
Revoker._get_backup(self.backup_dir, i, self.key_path)))
self.assertEqual([str(i), self.paths[i], self.key_path], row)
self.assertEqual(i, 1)
self.assertEqual(len(rows), 2)
def test_store_one_mixed(self):
from letsencrypt.client.revoker import Revoker

View file

@ -549,10 +549,26 @@ class MoreInfoTest(unittest.TestCase):
StandaloneAuthenticator
self.authenticator = StandaloneAuthenticator()
def test_chall_pref(self):
def test_more_info(self):
"""Make sure exceptions aren't raised."""
self.authenticator.more_info()
class InitTest(unittest.TestCase):
"""Tests for more_info() method. (trivially)"""
def setUp(self):
from letsencrypt.client.standalone_authenticator import \
StandaloneAuthenticator
self.authenticator = StandaloneAuthenticator()
def test_prepare(self):
"""Make sure exceptions aren't raised.
.. todo:: Add on more once things are setup appropriately.
"""
self.authenticator.prepare()
if __name__ == "__main__":
unittest.main()

View file

@ -22,7 +22,7 @@ from letsencrypt.client import log
from letsencrypt.client import standalone_authenticator as standalone
from letsencrypt.client.apache import configurator
from letsencrypt.client.display import util as display_util
from letsencrypt.client.display import ops
from letsencrypt.client.display import ops as display_ops
def create_parser():
@ -124,7 +124,7 @@ def main(): # pylint: disable=too-many-branches
client.view_config_changes(config)
sys.exit()
if args.revoke or args.rev_cert or args.rev_key:
if args.revoke or args.rev_cert is not None or args.rev_key is not None:
client.revoke(config, args.no_confirm, args.rev_cert, args.rev_key)
sys.exit()
@ -135,10 +135,9 @@ def main(): # pylint: disable=too-many-branches
if not args.eula:
display_eula()
# list of (Description, Known Authenticator classes, init arguments)
all_auths = [
("Apache Web Server", configurator.ApacheConfigurator, config),
("Standalone Authenticator", standalone.StandaloneAuthenticator),
configurator.ApacheConfigurator(config),
standalone.StandaloneAuthenticator(),
]
auth = client.determine_authenticator(all_auths)
if auth is None:
@ -152,7 +151,10 @@ def main(): # pylint: disable=too-many-branches
# This is simple and avoids confusion right now.
installer = None
doms = ops.choose_names(installer) if args.domains is None else args.domains
if args.domains is None:
doms = display_ops.choose_names(installer)
else:
doms = args.domains
# Prepare for init of Client
if args.authkey is None: