Merged in master

This commit is contained in:
Brad Warren 2015-08-03 18:32:32 -07:00
commit 5ae2bd06cf
46 changed files with 2722 additions and 930 deletions

View file

@ -14,6 +14,7 @@ before_install:
env:
global:
- GOPATH=/tmp/go
- PATH=$GOPATH/bin:$PATH
matrix:
- TOXENV=py26 BOULDER_INTEGRATION=1
- TOXENV=py27 BOULDER_INTEGRATION=1
@ -28,7 +29,7 @@ addons:
- le.wtf
install: "travis_retry pip install tox coveralls"
before_script: '[ "xxx$BOULDER_INTEGRATION" = "xxx" ] || ./tests/boulder-start.sh amqp'
before_script: '[ "xxx$BOULDER_INTEGRATION" = "xxx" ] || ./tests/boulder-start.sh'
script: 'travis_retry tox && ([ "xxx$BOULDER_INTEGRATION" = "xxx" ] || (source .tox/$TOXENV/bin/activate && ./tests/boulder-integration.sh))'
after_success: '[ "$TOXENV" == "cover" ] && coveralls'

View file

@ -3,6 +3,7 @@ import logging
import augeas
from letsencrypt import errors
from letsencrypt import reverter
from letsencrypt.plugins import common
@ -23,7 +24,6 @@ class AugeasConfigurator(common.Plugin):
:type reverter: :class:`letsencrypt.reverter.Reverter`
"""
def __init__(self, *args, **kwargs):
super(AugeasConfigurator, self).__init__(*args, **kwargs)
@ -38,13 +38,16 @@ class AugeasConfigurator(common.Plugin):
# because this will change the underlying configuration and potential
# vhosts
self.reverter = reverter.Reverter(self.config)
self.reverter.recovery_routine()
self.recovery_routine()
def check_parsing_errors(self, lens):
"""Verify Augeas can parse all of the lens files.
:param str lens: lens to check for errors
:raises .errors.PluginError: If there has been an error in parsing with
the specified lens.
"""
error_files = self.aug.match("/augeas//error")
@ -54,11 +57,13 @@ class AugeasConfigurator(common.Plugin):
lens_path = self.aug.get(path + "/lens")
# As aug.get may return null
if lens_path and lens in lens_path:
logger.error(
msg = (
"There has been an error in parsing the file (%s): %s",
# Strip off /augeas/files and /error
path[13:len(path) - 6], self.aug.get(path + "/message"))
raise errors.PluginError(msg)
# TODO: Cleanup this function
def save(self, title=None, temporary=False):
"""Saves all changes to the configuration files.
@ -73,6 +78,9 @@ class AugeasConfigurator(common.Plugin):
:param bool temporary: Indicates whether the changes made will
be quickly reversed in the future (ie. challenges)
:raises .errors.PluginError: If there was an error in Augeas, in an
attempt to save the configuration, or an error creating a checkpoint
"""
save_state = self.aug.get("/augeas/save")
self.aug.set("/augeas/save", "noop")
@ -85,7 +93,8 @@ class AugeasConfigurator(common.Plugin):
self._log_save_errors(ex_errs)
# Erase Save Notes
self.save_notes = ""
return False
raise errors.PluginError(
"Error saving files, check logs for more info.")
# Retrieve list of modified files
# Note: Noop saves can cause the file to be listed twice, I used a
@ -99,22 +108,26 @@ class AugeasConfigurator(common.Plugin):
for path in save_paths:
save_files.add(self.aug.get(path)[6:])
# Create Checkpoint
if temporary:
self.reverter.add_to_temp_checkpoint(
save_files, self.save_notes)
else:
self.reverter.add_to_checkpoint(save_files, self.save_notes)
try:
# Create Checkpoint
if temporary:
self.reverter.add_to_temp_checkpoint(
save_files, self.save_notes)
else:
self.reverter.add_to_checkpoint(save_files, self.save_notes)
except errors.ReverterError as err:
raise errors.PluginError(str(err))
if title and not temporary:
self.reverter.finalize_checkpoint(title)
try:
self.reverter.finalize_checkpoint(title)
except errors.ReverterError as err:
raise errors.PluginError(str(err))
self.aug.set("/augeas/save", save_state)
self.save_notes = ""
self.aug.save()
return True
def _log_save_errors(self, ex_errs):
"""Log errors due to bad Augeas save.
@ -135,14 +148,26 @@ class AugeasConfigurator(common.Plugin):
Reverts all modified files that have not been saved as a checkpoint
:raises .errors.PluginError: If unable to recover the configuration
"""
self.reverter.recovery_routine()
try:
self.reverter.recovery_routine()
except errors.ReverterError as err:
raise errors.PluginError(str(err))
# Need to reload configuration after these changes take effect
self.aug.load()
def revert_challenge_config(self):
"""Used to cleanup challenge configurations."""
self.reverter.revert_temporary_config()
"""Used to cleanup challenge configurations.
:raises .errors.PluginError: If unable to revert the challenge config.
"""
try:
self.reverter.revert_temporary_config()
except errors.ReverterError as err:
raise errors.PluginError(str(err))
self.aug.load()
def rollback_checkpoints(self, rollback=1):
@ -150,10 +175,24 @@ class AugeasConfigurator(common.Plugin):
:param int rollback: Number of checkpoints to revert
:raises .errors.PluginError: If there is a problem with the input or
the function is unable to correctly revert the configuration
"""
self.reverter.rollback_checkpoints(rollback)
try:
self.reverter.rollback_checkpoints(rollback)
except errors.ReverterError as err:
raise errors.PluginError(str(err))
self.aug.load()
def view_config_changes(self):
"""Show all of the configuration changes that have taken place."""
self.reverter.view_config_changes()
"""Show all of the configuration changes that have taken place.
:raises .errors.PluginError: If there is a problem while processing
the checkpoints directories.
"""
try:
self.reverter.view_config_changes()
except errors.ReverterError as err:
raise errors.PluginError(str(err))

File diff suppressed because it is too large Load diff

View file

@ -6,6 +6,7 @@ CLI_DEFAULTS = dict(
server_root="/etc/apache2",
ctl="apache2ctl",
enmod="a2enmod",
dismod="a2dismod",
init_script="/etc/init.d/apache2",
le_vhost_ext="-le-ssl.conf",
)
@ -20,5 +21,5 @@ MOD_SSL_CONF_SRC = pkg_resources.resource_filename(
distribution."""
REWRITE_HTTPS_ARGS = [
"^.*$", "https://%{SERVER_NAME}%{REQUEST_URI}", "[L,R=permanent]"]
"^", "https://%{SERVER_NAME}%{REQUEST_URI}", "[L,QSA,R=permanent]"]
"""Apache rewrite rule arguments used for redirections to https vhost"""

View file

@ -60,9 +60,9 @@ def _vhost_menu(domain, vhosts):
choices = []
for vhost in vhosts:
if len(vhost.names) == 1:
disp_name = next(iter(vhost.names))
elif len(vhost.names) == 0:
if len(vhost.get_names()) == 1:
disp_name = next(iter(vhost.get_names()))
elif len(vhost.get_names()) == 0:
disp_name = ""
else:
disp_name = "Multiple Names"

View file

@ -3,6 +3,7 @@ import os
from letsencrypt.plugins import common
from letsencrypt_apache import obj
from letsencrypt_apache import parser
@ -44,6 +45,13 @@ class ApacheDvsni(common.Dvsni):
"""
def __init__(self, *args, **kwargs):
super(ApacheDvsni, self).__init__(*args, **kwargs)
self.challenge_conf = os.path.join(
self.configurator.conf("server-root"),
"le_dvsni_cert_challenge.conf")
def perform(self):
"""Peform a DVSNI challenge."""
if not self.achalls:
@ -52,20 +60,9 @@ class ApacheDvsni(common.Dvsni):
# About to make temporary changes to the config
self.configurator.save()
addresses = []
default_addr = "*:443"
for achall in self.achalls:
vhost = self.configurator.choose_vhost(achall.domain)
# TODO - @jdkasten review this code to make sure it makes sense
self.configurator.make_server_sni_ready(vhost, default_addr)
for addr in vhost.addrs:
if "_default_" == addr.get_addr():
addresses.append([default_addr])
break
else:
addresses.append(list(vhost.addrs))
# Prepare the server for HTTPS
self.configurator.prepare_server_https(
str(self.configurator.config.dvsni_port))
responses = []
@ -74,25 +71,32 @@ class ApacheDvsni(common.Dvsni):
responses.append(self._setup_challenge_cert(achall))
# Setup the configuration
self._mod_config(addresses)
dvsni_addrs = self._mod_config()
self.configurator.make_addrs_sni_ready(dvsni_addrs)
# Save reversible changes
self.configurator.save("SNI Challenge", True)
return responses
def _mod_config(self, ll_addrs):
def _mod_config(self):
"""Modifies Apache config files to include challenge vhosts.
Result: Apache config includes virtual servers for issued challs
:param list ll_addrs: list of list of `~.common.Addr` to apply
:returns: All DVSNI addresses used
:rtype: set
"""
# TODO: Use ip address of existing vhost instead of relying on FQDN
dvsni_addrs = set()
config_text = "<IfModule mod_ssl.c>\n"
for idx, lis in enumerate(ll_addrs):
config_text += self._get_config_text(self.achalls[idx], lis)
for achall in self.achalls:
achall_addrs = self.get_dvsni_addrs(achall)
dvsni_addrs.update(achall_addrs)
config_text += self._get_config_text(achall, achall_addrs)
config_text += "</IfModule>\n"
self._conf_include_check(self.configurator.parser.loc["default"])
@ -102,6 +106,25 @@ class ApacheDvsni(common.Dvsni):
with open(self.challenge_conf, "w") as new_conf:
new_conf.write(config_text)
return dvsni_addrs
def get_dvsni_addrs(self, achall):
"""Return the Apache addresses needed for DVSNI."""
vhost = self.configurator.choose_vhost(achall.domain)
# TODO: Checkout _default_ rules.
dvsni_addrs = set()
default_addr = obj.Addr(("*", str(self.configurator.config.dvsni_port)))
for addr in vhost.addrs:
if "_default_" == addr.get_addr():
dvsni_addrs.add(default_addr)
else:
dvsni_addrs.add(
addr.get_sni_addr(self.configurator.config.dvsni_port))
return dvsni_addrs
def _conf_include_check(self, main_config):
"""Adds DVSNI challenge conf file into configuration.
@ -125,7 +148,7 @@ class ApacheDvsni(common.Dvsni):
:type achall: :class:`letsencrypt.achallenges.DVSNI`
:param list ip_addrs: addresses of challenged domain
:class:`list` of type `~.common.Addr`
:class:`list` of type `~.obj.Addr`
:returns: virtual host configuration text
:rtype: str
@ -141,7 +164,7 @@ class ApacheDvsni(common.Dvsni):
# https://docs.python.org/2.7/reference/lexical_analysis.html
return self.VHOST_TEMPLATE.format(
vhost=ips, server_name=achall.nonce_domain,
ssl_options_conf_path=self.configurator.parser.loc["ssl_options"],
ssl_options_conf_path=self.configurator.mod_ssl_conf,
cert_path=self.get_cert_path(achall),
key_path=self.get_key_path(achall),
document_root=document_root).replace("\n", os.linesep)

View file

@ -1,4 +1,88 @@
"""Module contains classes used by the Apache Configurator."""
import re
from letsencrypt.plugins import common
class Addr(common.Addr):
"""Represents an Apache address."""
def __eq__(self, other):
"""This is defined as equalivalent within Apache.
ip_addr:* == ip_addr
"""
if isinstance(other, self.__class__):
return ((self.tup == other.tup) or
(self.tup[0] == other.tup[0]
and self.is_wildcard() and other.is_wildcard()))
return False
def __ne__(self, other):
return not self.__eq__(other)
def _addr_less_specific(self, addr):
"""Returns if addr.get_addr() is more specific than self.get_addr()."""
# pylint: disable=protected-access
return addr._rank_specific_addr() > self._rank_specific_addr()
def _rank_specific_addr(self):
"""Returns numerical rank for get_addr()
:returns: 2 - FQ, 1 - wildcard, 0 - _default_
:rtype: int
"""
if self.get_addr() == "_default_":
return 0
elif self.get_addr() == "*":
return 1
else:
return 2
def conflicts(self, addr):
"""Returns if address could conflict with correct function of self.
Could addr take away service provided by self within Apache?
.. note::IP Address is more important than wildcard.
Connection from 127.0.0.1:80 with choices of *:80 and 127.0.0.1:*
chooses 127.0.0.1:*
.. todo:: Handle domain name addrs...
Examples:
127.0.0.1:*.conflicts(127.0.0.1:443) - True
127.0.0.1:443.conflicts(127.0.0.1:*) - False
*:443.conflicts(*:80) - False
_default_:443.conflicts(*:443) - True
"""
if self._addr_less_specific(addr):
return True
elif self.get_addr() == addr.get_addr():
if self.is_wildcard() or self.get_port() == addr.get_port():
return True
return False
def is_wildcard(self):
"""Returns if address has a wildcard port."""
return self.tup[1] == "*" or not self.tup[1]
def get_sni_addr(self, port):
"""Returns the least specific address that resolves on the port.
Example:
1.2.3.4:443 -> 1.2.3.4:<port>
1.2.3.4:* -> 1.2.3.4:*
:param str port: Desired port
"""
if self.is_wildcard():
return self
return self.get_addr_obj(port)
class VirtualHost(object): # pylint: disable=too-few-public-methods
@ -8,39 +92,56 @@ class VirtualHost(object): # pylint: disable=too-few-public-methods
:ivar str path: Augeas path to virtual host
:ivar set addrs: Virtual Host addresses (:class:`set` of
:class:`common.Addr`)
:ivar set names: Server names/aliases of vhost
:ivar str name: ServerName of VHost
:ivar list aliases: Server aliases of vhost
(:class:`list` of :class:`str`)
:ivar bool ssl: SSLEngine on in vhost
:ivar bool enabled: Virtual host is enabled
https://httpd.apache.org/docs/2.4/vhosts/details.html
.. todo:: Any vhost that includes the magic _default_ wildcard is given the
same ServerName as the main server.
"""
def __init__(self, filep, path, addrs, ssl, enabled, names=None):
# ?: is used for not returning enclosed characters
strip_name = re.compile(r"^(?:.+://)?([^ :$]*)")
def __init__(self, filep, path, addrs, ssl, enabled, name=None, aliases=None):
# pylint: disable=too-many-arguments
"""Initialize a VH."""
self.filep = filep
self.path = path
self.addrs = addrs
self.names = set() if names is None else set(names)
self.name = name
self.aliases = aliases if aliases is not None else set()
self.ssl = ssl
self.enabled = enabled
def add_name(self, name):
"""Add name to vhost."""
self.names.add(name)
def get_names(self):
"""Return a set of all names."""
all_names = set()
all_names.update(self.aliases)
# Strip out any scheme:// and <port> field from servername
if self.name is not None:
all_names.add(VirtualHost.strip_name.findall(self.name)[0])
return all_names
def __str__(self):
return (
"File: {filename}\n"
"Vhost path: {vhpath}\n"
"Addresses: {addrs}\n"
"Names: {names}\n"
"Name: {name}\n"
"Aliases: {aliases}\n"
"TLS Enabled: {tls}\n"
"Site Enabled: {active}".format(
filename=self.filep,
vhpath=self.path,
addrs=", ".join(str(addr) for addr in self.addrs),
names=", ".join(name for name in self.names),
name=self.name if self.name is not None else "",
aliases=", ".join(name for name in self.aliases),
tls="Yes" if self.ssl else "No",
active="Yes" if self.enabled else "No"))
@ -48,7 +149,73 @@ class VirtualHost(object): # pylint: disable=too-few-public-methods
if isinstance(other, self.__class__):
return (self.filep == other.filep and self.path == other.path and
self.addrs == other.addrs and
self.names == other.names and
self.get_names() == other.get_names() and
self.ssl == other.ssl and self.enabled == other.enabled)
return False
def __ne__(self, other):
return not self.__eq__(other)
def conflicts(self, addrs):
"""See if vhost conflicts with any of the addrs.
This determines whether or not these addresses would/could overwrite
the vhost addresses.
:param addrs: Iterable Addresses
:type addrs: Iterable :class:~obj.Addr
:returns: If addresses conflicts with vhost
:rtype: bool
"""
for pot_addr in addrs:
for addr in self.addrs:
if addr.conflicts(pot_addr):
return True
return False
def same_server(self, vhost):
"""Determines if the vhost is the same 'server'.
Used in redirection - indicates whether or not the two virtual hosts
serve on the exact same IP combinations, but different ports.
.. todo:: Handle _default_
"""
if vhost.get_names() != self.get_names():
return False
# If equal and set is not empty... assume same server
if self.name is not None or self.aliases:
return True
# Both sets of names are empty.
# Make conservative educated guess... this is very restrictive
# Consider adding more safety checks.
if len(vhost.addrs) != len(self.addrs):
return False
# already_found acts to keep everything very conservative.
# Don't allow multiple ip:ports in same set.
already_found = set()
for addr in vhost.addrs:
for local_addr in self.addrs:
if (local_addr.get_addr() == addr.get_addr() and
local_addr != addr and
local_addr.get_addr() not in already_found):
# This intends to make sure we aren't double counting...
# e.g. 127.0.0.1:* - We require same number of addrs
# currently
already_found.add(local_addr.get_addr())
break
else:
return False
return True

View file

@ -1,33 +1,175 @@
"""ApacheParser is a member object of the ApacheConfigurator class."""
import fnmatch
import itertools
import logging
import os
import re
import subprocess
from letsencrypt import errors
logger = logging.getLogger(__name__)
class ApacheParser(object):
"""Class handles the fine details of parsing the Apache Configuration.
:ivar str root: Normalized abosulte path to the server root
.. todo:: Make parsing general... remove sites-available etc...
:ivar str root: Normalized absolute path to the server root
directory. Without trailing slash.
:ivar str root: Server root
:ivar set modules: All module names that are currently enabled.
:ivar dict loc: Location to place directives, root - configuration origin,
default - user config file, name - NameVirtualHost,
"""
arg_var_interpreter = re.compile(r"\$\{[^ \}]*}")
fnmatch_chars = set(["*", "?", "\\", "[", "]"])
def __init__(self, aug, root, ctl):
# Note: Order is important here.
# This uses the binary, so it can be done first.
# https://httpd.apache.org/docs/2.4/mod/core.html#define
# https://httpd.apache.org/docs/2.4/mod/core.html#ifdefine
# This only handles invocation parameters and Define directives!
self.variables = {}
self.update_runtime_variables(ctl)
def __init__(self, aug, root, ssl_options):
# Find configuration root and make sure augeas can parse it.
self.aug = aug
# Find configuration root and make sure augeas can parse it.
self.root = os.path.abspath(root)
self.loc = self._set_locations(ssl_options)
self.loc = {"root": self._find_config_root()}
self._parse_file(self.loc["root"])
# This problem has been fixed in Augeas 1.0
self.standardize_excl()
# Temporarily set modules to be empty, so that find_dirs can work
# https://httpd.apache.org/docs/2.4/mod/core.html#ifmodule
# This needs to come before locations are set.
self.modules = set()
self._init_modules()
# Set up rest of locations
self.loc.update(self._set_locations())
# Must also attempt to parse sites-available or equivalent
# Sites-available is not included naturally in configuration
self._parse_file(os.path.join(self.root, "sites-available") + "/*")
# This problem has been fixed in Augeas 1.0
self.standardize_excl()
def _init_modules(self):
"""Iterates on the configuration until no new modules are loaded.
def add_dir_to_ifmodssl(self, aug_conf_path, directive, val):
..todo:: This should be attempted to be done with a binary to avoid
the iteration issue. Else... parse and enable mods at same time.
"""
matches = self.find_dir("LoadModule")
iterator = iter(matches)
# Make sure prev_size != cur_size for do: while: iteration
prev_size = -1
while len(self.modules) != prev_size:
prev_size = len(self.modules)
for match_name, match_filename in itertools.izip(
iterator, iterator):
self.modules.add(self.get_arg(match_name))
self.modules.add(
os.path.basename(self.get_arg(match_filename))[:-2] + "c")
def update_runtime_variables(self, ctl):
""""
.. note:: Compile time variables (apache2ctl -V) are not used within the
dynamic configuration files. These should not be parsed or
interpreted.
.. todo:: Create separate compile time variables... simply for arg_get()
"""
stdout = self._get_runtime_cfg(ctl)
variables = dict()
matches = re.compile(r"Define: ([^ \n]*)").findall(stdout)
try:
matches.remove("DUMP_RUN_CFG")
except ValueError:
raise errors.PluginError("Unable to parse runtime variables")
for match in matches:
if match.count("=") > 1:
logger.error("Unexpected number of equal signs in "
"apache2ctl -D DUMP_RUN_CFG")
raise errors.PluginError(
"Error parsing Apache runtime variables")
parts = match.partition("=")
variables[parts[0]] = parts[2]
self.variables = variables
def _get_runtime_cfg(self, ctl): # pylint: disable=no-self-use
"""Get runtime configuration info.
:returns: stdout from DUMP_RUN_CFG
"""
try:
proc = subprocess.Popen(
[ctl, "-D", "DUMP_RUN_CFG"],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
stdout, stderr = proc.communicate()
except (OSError, ValueError):
logger.error(
"Error accessing %s for runtime parameters!%s", ctl, os.linesep)
raise errors.MisconfigurationError(
"Error accessing loaded Apache parameters: %s", ctl)
# Small errors that do not impede
if proc.returncode != 0:
logger.warn("Error in checking parameter list: %s", stderr)
raise errors.MisconfigurationError(
"Apache is unable to check whether or not the module is "
"loaded because Apache is misconfigured.")
return stdout
def filter_args_num(self, matches, args): # pylint: disable=no-self-use
"""Filter out directives with specific number of arguments.
This function makes the assumption that all related arguments are given
in order. Thus /files/apache/directive[5]/arg[2] must come immediately
after /files/apache/directive[5]/arg[1]. Runs in 1 linear pass.
:param string matches: Matches of all directives with arg nodes
:param int args: Number of args you would like to filter
:returns: List of directives that contain # of arguments.
(arg is stripped off)
"""
filtered = []
if args == 1:
for i in range(len(matches)):
if matches[i].endswith("/arg"):
filtered.append(matches[i][:-4])
else:
for i in range(len(matches)):
if matches[i].endswith("/arg[%d]" % args):
# Make sure we don't cause an IndexError (end of list)
# Check to make sure arg + 1 doesn't exist
if (i == (len(matches) - 1) or
not matches[i + 1].endswith("/arg[%d]" % (args + 1))):
filtered.append(matches[i][:-len("/arg[%d]" % args)])
return filtered
def add_dir_to_ifmodssl(self, aug_conf_path, directive, args):
"""Adds directive and value to IfMod ssl block.
Adds given directive and value along configuration path within
@ -35,8 +177,9 @@ class ApacheParser(object):
the file, it is created.
:param str aug_conf_path: Desired Augeas config path to add directive
:param str directive: Directive you would like to add
:param str val: Value of directive ie. Listen 443, 443 is the value
:param str directive: Directive you would like to add, e.g. Listen
:param args: Values of the directive; str "443" or list of str
:type args: list
"""
# TODO: Add error checking code... does the path given even exist?
@ -46,7 +189,12 @@ class ApacheParser(object):
self.aug.insert(if_mod_path + "arg", "directive", False)
nvh_path = if_mod_path + "directive[1]"
self.aug.set(nvh_path, directive)
self.aug.set(nvh_path + "/arg", val)
if len(args) == 1:
self.aug.set(nvh_path + "/arg", args[0])
else:
for i, arg in enumerate(args):
self.aug.set("%s/arg[%d]" % (nvh_path, i+1), arg)
def _get_ifmod(self, aug_conf_path, mod):
"""Returns the path to <IfMod mod> and creates one if it doesn't exist.
@ -65,7 +213,7 @@ class ApacheParser(object):
# Strip off "arg" at end of first ifmod path
return if_mods[0][:len(if_mods[0]) - 3]
def add_dir(self, aug_conf_path, directive, arg):
def add_dir(self, aug_conf_path, directive, args):
"""Appends directive to the end fo the file given by aug_conf_path.
.. note:: Not added to AugeasConfigurator because it may depend
@ -73,24 +221,24 @@ class ApacheParser(object):
:param str aug_conf_path: Augeas configuration path to add directive
:param str directive: Directive to add
:param str arg: Value of the directive. ie. Listen 443, 443 is arg
:param args: Value of the directive. ie. Listen 443, 443 is arg
:type args: list or str
"""
self.aug.set(aug_conf_path + "/directive[last() + 1]", directive)
if isinstance(arg, list):
for i, value in enumerate(arg, 1):
if isinstance(args, list):
for i, value in enumerate(args, 1):
self.aug.set(
"%s/directive[last()]/arg[%d]" % (aug_conf_path, i), value)
else:
self.aug.set(aug_conf_path + "/directive[last()]/arg", arg)
self.aug.set(aug_conf_path + "/directive[last()]/arg", args)
def find_dir(self, directive, arg=None, start=None):
def find_dir(self, directive, arg=None, start=None, exclude=True):
"""Finds directive in the configuration.
Recursively searches through config files to find directives
Directives should be in the form of a case insensitive regex currently
.. todo:: Add order to directives returned. Last directive comes last..
.. todo:: arg should probably be a list
Note: Augeas is inherently case sensitive while Apache is case
@ -101,20 +249,19 @@ class ApacheParser(object):
compatibility.
:param str directive: Directive to look for
:param arg: Specific value directive must have, None if all should
be considered
:type arg: str or None
:param str start: Beginning Augeas path to begin looking
:param bool exclude: Whether or not to exclude directives based on
variables and enabled modules
"""
# Cannot place member variable in the definition of the function so...
if not start:
start = get_aug_path(self.loc["root"])
# Debug code
# print "find_dir:", directive, "arg:", arg, " | Looking in:", start
# No regexp code
# if arg is None:
# matches = self.aug.match(start +
@ -127,32 +274,101 @@ class ApacheParser(object):
# includes = self.aug.match(start +
# "//* [self::directive='Include']/* [label()='arg']")
regex = "(%s)|(%s)|(%s)" % (case_i(directive),
case_i("Include"),
case_i("IncludeOptional"))
matches = self.aug.match(
"%s//*[self::directive=~regexp('%s')]" % (start, regex))
if exclude:
matches = self._exclude_dirs(matches)
if arg is None:
matches = self.aug.match(("%s//*[self::directive=~regexp('%s')]/arg"
% (start, directive)))
arg_suffix = "/arg"
else:
matches = self.aug.match(("%s//*[self::directive=~regexp('%s')]/*"
"[self::arg=~regexp('%s')]" %
(start, directive, arg)))
arg_suffix = "/*[self::arg=~regexp('%s')]" % case_i(arg)
incl_regex = "(%s)|(%s)" % (case_i('Include'),
case_i('IncludeOptional'))
ordered_matches = []
includes = self.aug.match(("%s//* [self::directive=~regexp('%s')]/* "
"[label()='arg']" % (start, incl_regex)))
# TODO: Wildcards should be included in alphabetical order
# https://httpd.apache.org/docs/2.4/mod/core.html#include
for match in matches:
dir_ = self.aug.get(match).lower()
if dir_ == "include" or dir_ == "includeoptional":
# start[6:] to strip off /files
#print self._get_include_path(self.get_arg(match +"/arg")), directive, arg
ordered_matches.extend(self.find_dir(
directive, arg,
self._get_include_path(self.get_arg(match + "/arg")),
exclude))
# This additionally allows Include
if dir_ == directive.lower():
ordered_matches.extend(self.aug.match(match + arg_suffix))
# for inc in includes:
# print inc, self.aug.get(inc)
return ordered_matches
for include in includes:
# start[6:] to strip off /files
matches.extend(self.find_dir(
directive, arg, self._get_include_path(
strip_dir(start[6:]), self.aug.get(include))))
def get_arg(self, match):
"""Uses augeas.get to get argument value and interprets result.
return matches
This also converts all variables and parameters appropriately.
def _get_include_path(self, cur_dir, arg):
"""
value = self.aug.get(match)
variables = ApacheParser.arg_var_interpreter.findall(value)
for var in variables:
# Strip off ${ and }
try:
value = value.replace(var, self.variables[var[2:-1]])
except KeyError:
raise errors.PluginError("Error Parsing variable: %s" % var)
return value
def _exclude_dirs(self, matches):
"""Exclude directives that are not loaded into the configuration."""
filters = [("ifmodule", self.modules), ("ifdefine", self.variables)]
valid_matches = []
for match in matches:
for filter_ in filters:
if not self._pass_filter(match, filter_):
break
else:
valid_matches.append(match)
return valid_matches
def _pass_filter(self, match, filter_):
"""Determine if directive passes a filter.
:param str match: Augeas path
:param list filter: list of tuples of form
[("lowercase if directive", set of relevant parameters)]
"""
match_l = match.lower()
last_match_idx = match_l.find(filter_[0])
while last_match_idx != -1:
# Check args
end_of_if = match_l.find("/", last_match_idx)
# This should be aug.get (vars are not used e.g. parser.aug_get)
expression = self.aug.get(match[:end_of_if] + "/arg")
if expression.startswith("!"):
# Strip off "!"
if expression[1:] in filter_[1]:
return False
else:
if expression not in filter_[1]:
return False
last_match_idx = match_l.find(filter_[0], end_of_if)
return True
def _get_include_path(self, arg):
"""Converts an Apache Include directive into Augeas path.
Converts an Apache Include directive argument into an Augeas
@ -160,29 +376,12 @@ class ApacheParser(object):
.. todo:: convert to use os.path.join()
:param str cur_dir: current working directory
:param str arg: Argument of Include directive
:returns: Augeas path string
:rtype: str
"""
# Sanity check argument - maybe
# Question: what can the attacker do with control over this string
# Effect parse file... maybe exploit unknown errors in Augeas
# If the attacker can Include anything though... and this function
# only operates on Apache real config data... then the attacker has
# already won.
# Perhaps it is better to simply check the permissions on all
# included files?
# check_config to validate apache config doesn't work because it
# would create a race condition between the check and this input
# TODO: Maybe... although I am convinced we have lost if
# Apache files can't be trusted. The augeas include path
# should be made to be exact.
# Check to make sure only expected characters are used <- maybe remove
# validChars = re.compile("[a-zA-Z0-9.*?_-/]*")
# matchObj = validChars.match(arg)
@ -192,58 +391,48 @@ class ApacheParser(object):
# Standardize the include argument based on server root
if not arg.startswith("/"):
arg = cur_dir + arg
# conf/ is a special variable for ServerRoot in Apache
elif arg.startswith("conf/"):
arg = self.root + arg[4:]
# TODO: Test if Apache allows ../ or ~/ for Includes
# Normpath will condense ../
arg = os.path.normpath(os.path.join(self.root, arg))
# Attempts to add a transform to the file if one does not already exist
self._parse_file(arg)
if os.path.isdir(arg):
self._parse_file(os.path.join(arg, "*"))
else:
self._parse_file(arg)
# Argument represents an fnmatch regular expression, convert it
# Split up the path and convert each into an Augeas accepted regex
# then reassemble
if "*" in arg or "?" in arg:
split_arg = arg.split("/")
for idx, split in enumerate(split_arg):
# * and ? are the two special fnmatch characters
if "*" in split or "?" in split:
# Turn it into a augeas regex
# TODO: Can this instead be an augeas glob instead of regex
split_arg[idx] = ("* [label()=~regexp('%s')]" %
self.fnmatch_to_re(split))
# Reassemble the argument
arg = "/".join(split_arg)
split_arg = arg.split("/")
for idx, split in enumerate(split_arg):
if any(char in ApacheParser.fnmatch_chars for char in split):
# Turn it into a augeas regex
# TODO: Can this instead be an augeas glob instead of regex
split_arg[idx] = ("* [label()=~regexp('%s')]" %
self.fnmatch_to_re(split))
# Reassemble the argument
# Note: This also normalizes the argument /serverroot/ -> /serverroot
arg = "/".join(split_arg)
# If the include is a directory, just return the directory as a file
if arg.endswith("/"):
return get_aug_path(arg[:len(arg)-1])
return get_aug_path(arg)
def fnmatch_to_re(self, clean_fn_match): # pylint: disable=no-self-use
"""Method converts Apache's basic fnmatch to regular expression.
Assumption - Configs are assumed to be well-formed and only writable by
privileged users.
https://apr.apache.org/docs/apr/2.0/apr__fnmatch_8h_source.html
http://apache2.sourcearchive.com/documentation/2.2.16-6/apr__fnmatch_8h_source.html
:param str clean_fn_match: Apache style filename match, similar to globs
:returns: regex suitable for augeas
:rtype: str
"""
# Checkout fnmatch.py in venv/local/lib/python2.7/fnmatch.py
regex = ""
for letter in clean_fn_match:
if letter == '.':
regex = regex + r"\."
elif letter == '*':
regex = regex + ".*"
# According to apache.org ? shouldn't appear
# but in case it is valid...
elif letter == '?':
regex = regex + "."
else:
regex = regex + letter
return regex
# This strips off final /Z(?ms)
return fnmatch.translate(clean_fn_match)[:-7]
def _parse_file(self, filepath):
"""Parse file with Augeas
@ -318,15 +507,14 @@ class ApacheParser(object):
self.aug.load()
def _set_locations(self, ssl_options):
def _set_locations(self):
"""Set default location for directives.
Locations are given as file_paths
.. todo:: Make sure that files are included
"""
root = self._find_config_root()
default = self._set_user_config_file(root)
default = self._set_user_config_file()
temp = os.path.join(self.root, "ports.conf")
if os.path.isfile(temp):
@ -336,8 +524,7 @@ class ApacheParser(object):
listen = default
name = default
return {"root": root, "default": default, "listen": listen,
"name": name, "ssl_options": ssl_options}
return {"default": default, "listen": listen, "name": name}
def _find_config_root(self):
"""Find the Apache Configuration Root file."""
@ -349,7 +536,7 @@ class ApacheParser(object):
raise errors.NoInstallationError("Could not find configuration root")
def _set_user_config_file(self, root):
def _set_user_config_file(self):
"""Set the appropriate user configuration file
.. todo:: This will have to be updated for other distros versions
@ -360,12 +547,11 @@ class ApacheParser(object):
# Basic check to see if httpd.conf exists and
# in hierarchy via direct include
# httpd.conf was very common as a user file in Apache 2.2
if (os.path.isfile(os.path.join(self.root, 'httpd.conf')) and
self.find_dir(
case_i("Include"), case_i("httpd.conf"), root)):
return os.path.join(self.root, 'httpd.conf')
if (os.path.isfile(os.path.join(self.root, "httpd.conf")) and
self.find_dir("Include", "httpd.conf", self.loc["root"])):
return os.path.join(self.root, "httpd.conf")
else:
return os.path.join(self.root, 'apache2.conf')
return os.path.join(self.root, "apache2.conf")
def case_i(string):
@ -391,22 +577,3 @@ def get_aug_path(file_path):
"""
return "/files%s" % file_path
def strip_dir(path):
"""Returns directory of file path.
.. todo:: Replace this with Python standard function
:param str path: path is a file path. not an augeas section or
directive path
:returns: directory
:rtype: str
"""
index = path.rfind("/")
if index > 0:
return path[:index+1]
# No directory
return ""

View file

@ -0,0 +1,115 @@
"""Test for letsencrypt_apache.augeas_configurator."""
import os
import shutil
import unittest
import mock
from letsencrypt import errors
from letsencrypt_apache.tests import util
class AugeasConfiguratorTest(util.ApacheTest):
"""Test for Augeas Configurator base class."""
def setUp(self): # pylint: disable=arguments-differ
super(AugeasConfiguratorTest, self).setUp()
self.config = util.get_apache_configurator(
self.config_path, self.config_dir, self.work_dir)
self.vh_truth = util.get_vh_truth(
self.temp_dir, "debian_apache_2_4/two_vhost_80")
def tearDown(self):
shutil.rmtree(self.config_dir)
shutil.rmtree(self.work_dir)
shutil.rmtree(self.temp_dir)
def test_bad_parse(self):
# pylint: disable=protected-access
self.config.parser._parse_file(os.path.join(
self.config.parser.root, "conf-available", "bad_conf_file.conf"))
self.assertRaises(
errors.PluginError, self.config.check_parsing_errors, "httpd.aug")
def test_bad_save(self):
mock_save = mock.Mock()
mock_save.side_effect = IOError
self.config.aug.save = mock_save
self.assertRaises(errors.PluginError, self.config.save)
def test_bad_save_checkpoint(self):
self.config.reverter.add_to_checkpoint = mock.Mock(
side_effect=errors.ReverterError)
self.config.parser.add_dir(
self.vh_truth[0].path, "Test", "bad_save_ckpt")
self.assertRaises(errors.PluginError, self.config.save)
def test_bad_save_finalize_checkpoint(self):
self.config.reverter.finalize_checkpoint = mock.Mock(
side_effect=errors.ReverterError)
self.config.parser.add_dir(
self.vh_truth[0].path, "Test", "bad_save_ckpt")
self.assertRaises(errors.PluginError, self.config.save, "Title")
def test_finalize_save(self):
mock_finalize = mock.Mock()
self.config.reverter = mock_finalize
self.config.save("Example Title")
self.assertTrue(mock_finalize.is_called)
def test_recovery_routine(self):
mock_load = mock.Mock()
self.config.aug.load = mock_load
self.config.recovery_routine()
self.assertEqual(mock_load.call_count, 1)
def test_recovery_routine_error(self):
self.config.reverter.recovery_routine = mock.Mock(
side_effect=errors.ReverterError)
self.assertRaises(
errors.PluginError, self.config.recovery_routine)
def test_revert_challenge_config(self):
mock_load = mock.Mock()
self.config.aug.load = mock_load
self.config.revert_challenge_config()
self.assertEqual(mock_load.call_count, 1)
def test_revert_challenge_config_error(self):
self.config.reverter.revert_temporary_config = mock.Mock(
side_effect=errors.ReverterError)
self.assertRaises(
errors.PluginError, self.config.revert_challenge_config)
def test_rollback_checkpoints(self):
mock_load = mock.Mock()
self.config.aug.load = mock_load
self.config.rollback_checkpoints()
self.assertEqual(mock_load.call_count, 1)
def test_rollback_error(self):
self.config.reverter.rollback_checkpoints = mock.Mock(
side_effect=errors.ReverterError)
self.assertRaises(errors.PluginError, self.config.rollback_checkpoints)
def test_view_config_changes(self):
self.config.view_config_changes()
def test_view_config_changes_error(self):
self.config.reverter.view_config_changes = mock.Mock(
side_effect=errors.ReverterError)
self.assertRaises(errors.PluginError, self.config.view_config_changes)
if __name__ == "__main__":
unittest.main() # pragma: no cover

View file

@ -0,0 +1,112 @@
"""Tests for letsencrypt_apache.parser."""
import os
import shutil
import unittest
from letsencrypt import errors
from letsencrypt_apache.tests import util
class ComplexParserTest(util.ParserTest):
"""Apache Parser Test."""
def setUp(self): # pylint: disable=arguments-differ
super(ComplexParserTest, self).setUp(
"complex_parsing", "complex_parsing")
self.setup_variables()
# This needs to happen after due to setup_variables not being run
# until after
self.parser._init_modules() # pylint: disable=protected-access
def tearDown(self):
shutil.rmtree(self.temp_dir)
shutil.rmtree(self.config_dir)
shutil.rmtree(self.work_dir)
def setup_variables(self):
"""Set up variables for parser."""
self.parser.variables.update(
{
"COMPLEX": "",
"tls_port": "1234",
"fnmatch_filename": "test_fnmatch.conf",
}
)
def test_filter_args_num(self):
"""Note: This may also fail do to Include conf-enabled/ syntax."""
matches = self.parser.find_dir("TestArgsDirective")
self.assertEqual(len(self.parser.filter_args_num(matches, 1)), 3)
self.assertEqual(len(self.parser.filter_args_num(matches, 2)), 2)
self.assertEqual(len(self.parser.filter_args_num(matches, 3)), 1)
def test_basic_variable_parsing(self):
matches = self.parser.find_dir("TestVariablePort")
self.assertEqual(len(matches), 1)
self.assertEqual(self.parser.get_arg(matches[0]), "1234")
def test_invalid_variable_parsing(self):
del self.parser.variables["tls_port"]
matches = self.parser.find_dir("TestVariablePort")
self.assertRaises(
errors.PluginError, self.parser.get_arg, matches[0])
def test_basic_ifdefine(self):
self.assertEqual(len(self.parser.find_dir("VAR_DIRECTIVE")), 2)
self.assertEqual(len(self.parser.find_dir("INVALID_VAR_DIRECTIVE")), 0)
def test_basic_ifmodule(self):
self.assertEqual(len(self.parser.find_dir("MOD_DIRECTIVE")), 2)
self.assertEqual(
len(self.parser.find_dir("INVALID_MOD_DIRECTIVE")), 0)
def test_nested(self):
self.assertEqual(len(self.parser.find_dir("NESTED_DIRECTIVE")), 3)
self.assertEqual(
len(self.parser.find_dir("INVALID_NESTED_DIRECTIVE")), 0)
def test_load_modules(self):
"""If only first is found, there is bad variable parsing."""
self.assertTrue("status_module" in self.parser.modules)
self.assertTrue("mod_status.c" in self.parser.modules)
# This is in an IfDefine
self.assertTrue("ssl_module" in self.parser.modules)
self.assertTrue("mod_ssl.c" in self.parser.modules)
def verify_fnmatch(self, arg, hit=True):
"""Test if Include was correctly parsed."""
from letsencrypt_apache import parser
self.parser.add_dir(parser.get_aug_path(self.parser.loc["default"]),
"Include", [arg])
if hit:
self.assertTrue(self.parser.find_dir("FNMATCH_DIRECTIVE"))
else:
self.assertFalse(self.parser.find_dir("FNMATCH_DIRECTIVE"))
def test_include(self):
self.verify_fnmatch("test_fnmatch.?onf")
def test_include_complex(self):
self.verify_fnmatch("../complex_parsing/[te][te]st_*.?onf")
def test_include_fullpath(self):
self.verify_fnmatch(os.path.join(self.config_path, "test_fnmatch.conf"))
def test_include_variable(self):
self.verify_fnmatch("../complex_parsing/${fnmatch_filename}")
def test_include_missing(self):
# This should miss
self.verify_fnmatch("test_*.onf", False)
if __name__ == "__main__":
unittest.main() # pragma: no cover

View file

@ -1,7 +1,8 @@
# pylint: disable=too-many-public-methods
"""Test for letsencrypt_apache.configurator."""
import os
import re
import shutil
import socket
import unittest
import mock
@ -12,27 +13,22 @@ from letsencrypt import achallenges
from letsencrypt import errors
from letsencrypt import le_util
from letsencrypt.plugins import common
from letsencrypt.tests import acme_util
from letsencrypt_apache import configurator
from letsencrypt_apache import parser
from letsencrypt_apache import obj
from letsencrypt_apache.tests import util
class TwoVhost80Test(util.ApacheTest):
"""Test two standard well configured HTTP vhosts."""
"""Test two standard well-configured HTTP vhosts."""
def setUp(self):
def setUp(self): # pylint: disable=arguments-differ
super(TwoVhost80Test, self).setUp()
with mock.patch("letsencrypt_apache.configurator.ApacheConfigurator."
"mod_loaded") as mock_load:
mock_load.return_value = True
self.config = util.get_apache_configurator(
self.config_path, self.config_dir, self.work_dir)
self.config = util.get_apache_configurator(
self.config_path, self.config_dir, self.work_dir)
self.vh_truth = util.get_vh_truth(
self.temp_dir, "debian_apache_2_4/two_vhost_80")
@ -42,16 +38,55 @@ class TwoVhost80Test(util.ApacheTest):
shutil.rmtree(self.config_dir)
shutil.rmtree(self.work_dir)
@mock.patch("letsencrypt_apache.parser.ApacheParser")
def test_prepare_version(self, _):
self.config.version = None
self.config.config_test = mock.Mock()
self.config.get_version = mock.Mock(return_value=(1, 1))
self.assertRaises(
errors.NotSupportedError, self.config.prepare)
def test_add_parser_arguments(self): # pylint: disable=no-self-use
from letsencrypt_apache.configurator import ApacheConfigurator
# Weak test..
ApacheConfigurator.add_parser_arguments(mock.MagicMock())
def test_get_all_names(self):
names = self.config.get_all_names()
self.assertEqual(names, set(
["letsencrypt.demo", "encryption-example.demo", "ip-172-30-0-17"]))
@mock.patch("letsencrypt_apache.configurator.socket.gethostbyaddr")
def test_get_all_names_addrs(self, mock_gethost):
mock_gethost.side_effect = [("google.com", "", ""), socket.error]
vhost = obj.VirtualHost(
"fp", "ap",
set([obj.Addr(("8.8.8.8", "443")),
obj.Addr(("zombo.com",)),
obj.Addr(("192.168.1.2"))]),
True, False)
self.config.vhosts.append(vhost)
names = self.config.get_all_names()
self.assertEqual(len(names), 5)
self.assertTrue("zombo.com" in names)
self.assertTrue("google.com" in names)
self.assertTrue("letsencrypt.demo" in names)
def test_add_servernames_alias(self):
self.config.parser.add_dir(
self.vh_truth[2].path, "ServerAlias", ["*.le.co"])
self.config._add_servernames(self.vh_truth[2]) # pylint: disable=protected-access
self.assertEqual(
self.vh_truth[2].get_names(), set(["*.le.co", "ip-172-30-0-17"]))
def test_get_virtual_hosts(self):
"""Make sure all vhosts are being properly found.
.. note:: If test fails, only finding 1 Vhost... it is likely that
it is a problem with is_enabled.
it is a problem with is_enabled. If finding only 3, likely is_ssl
"""
vhs = self.config.get_virtual_hosts()
@ -63,9 +98,77 @@ class TwoVhost80Test(util.ApacheTest):
if vhost == truth:
found += 1
break
else:
raise Exception("Missed: %s" % vhost) # pragma: no cover
self.assertEqual(found, 4)
@mock.patch("letsencrypt_apache.display_ops.select_vhost")
def test_choose_vhost_none_avail(self, mock_select):
mock_select.return_value = None
self.assertRaises(
errors.PluginError, self.config.choose_vhost, "none.com")
@mock.patch("letsencrypt_apache.display_ops.select_vhost")
def test_choose_vhost_select_vhost_ssl(self, mock_select):
mock_select.return_value = self.vh_truth[1]
self.assertEqual(
self.vh_truth[1], self.config.choose_vhost("none.com"))
@mock.patch("letsencrypt_apache.display_ops.select_vhost")
def test_choose_vhost_select_vhost_non_ssl(self, mock_select):
mock_select.return_value = self.vh_truth[0]
chosen_vhost = self.config.choose_vhost("none.com")
self.assertEqual(
self.vh_truth[0].get_names(), chosen_vhost.get_names())
# Make sure we go from HTTP -> HTTPS
self.assertFalse(self.vh_truth[0].ssl)
self.assertTrue(chosen_vhost.ssl)
@mock.patch("letsencrypt_apache.display_ops.select_vhost")
def test_choose_vhost_select_vhost_conflicting_non_ssl(self, mock_select):
mock_select.return_value = self.vh_truth[3]
conflicting_vhost = obj.VirtualHost(
"path", "aug_path", set([obj.Addr.fromstring("*:443")]), True, True)
self.config.vhosts.append(conflicting_vhost)
self.assertRaises(
errors.PluginError, self.config.choose_vhost, "none.com")
def test_find_best_vhost(self):
# pylint: disable=protected-access
self.assertEqual(
self.vh_truth[3], self.config._find_best_vhost("letsencrypt.demo"))
self.assertEqual(
self.vh_truth[0],
self.config._find_best_vhost("encryption-example.demo"))
self.assertTrue(
self.config._find_best_vhost("does-not-exist.com") is None)
def test_find_best_vhost_variety(self):
# pylint: disable=protected-access
ssl_vh = obj.VirtualHost(
"fp", "ap", set([obj.Addr(("*", "443")), obj.Addr(("zombo.com",))]),
True, False)
self.config.vhosts.append(ssl_vh)
self.assertEqual(self.config._find_best_vhost("zombo.com"), ssl_vh)
def test_find_best_vhost_default(self):
# pylint: disable=protected-access
# Assume only the two default vhosts.
self.config.vhosts = [
vh for vh in self.config.vhosts
if vh.name not in ["letsencrypt.demo", "encryption-example.demo"]
]
self.assertEqual(
self.config._find_best_vhost("example.demo"), self.vh_truth[2])
def test_non_default_vhosts(self):
# pylint: disable=protected-access
self.assertEqual(len(self.config._non_default_vhosts()), 3)
def test_is_site_enabled(self):
"""Test if site is enabled.
@ -80,7 +183,59 @@ class TwoVhost80Test(util.ApacheTest):
self.assertTrue(self.config.is_site_enabled(self.vh_truth[2].filep))
self.assertTrue(self.config.is_site_enabled(self.vh_truth[3].filep))
@mock.patch("letsencrypt.le_util.run_script")
@mock.patch("letsencrypt.le_util.exe_exists")
@mock.patch("letsencrypt_apache.parser.subprocess.Popen")
def test_enable_mod(self, mock_popen, mock_exe_exists, mock_run_script):
mock_popen().communicate.return_value = ("Define: DUMP_RUN_CFG", "")
mock_popen().returncode = 0
mock_exe_exists.return_value = True
self.config.enable_mod("ssl")
self.assertTrue("ssl_module" in self.config.parser.modules)
self.assertTrue("mod_ssl.c" in self.config.parser.modules)
self.assertTrue(mock_run_script.called)
def test_enable_mod_unsupported_dirs(self):
shutil.rmtree(os.path.join(self.config.parser.root, "mods-enabled"))
self.assertRaises(
errors.NotSupportedError, self.config.enable_mod, "ssl")
@mock.patch("letsencrypt.le_util.exe_exists")
def test_enable_mod_no_disable(self, mock_exe_exists):
mock_exe_exists.return_value = False
self.assertRaises(
errors.MisconfigurationError, self.config.enable_mod, "ssl")
@mock.patch("letsencrypt.le_util.run_script")
@mock.patch("letsencrypt.le_util.exe_exists")
@mock.patch("letsencrypt_apache.parser.subprocess.Popen")
def test_enable_site(self, mock_popen, mock_exe_exists, mock_run_script):
mock_popen().returncode = 0
mock_popen().communicate.return_value = ("Define: DUMP_RUN_CFG", "")
mock_exe_exists.return_value = True
# Default 443 vhost
self.assertFalse(self.vh_truth[1].enabled)
self.config.enable_site(self.vh_truth[1])
self.assertTrue(self.vh_truth[1].enabled)
# Mod enabled
self.assertTrue(mock_run_script.called)
# Go again to make sure nothing fails
self.config.enable_site(self.vh_truth[1])
def test_enable_site_failure(self):
self.assertRaises(
errors.NotSupportedError,
self.config.enable_site,
obj.VirtualHost("asdf", "afsaf", set(), False, False))
def test_deploy_cert(self):
self.config.parser.modules.add("ssl_module")
self.config.parser.modules.add("mod_ssl.c")
# Get the default 443 vhost
self.config.assoc["random.demo"] = self.vh_truth[1]
self.config.deploy_cert(
@ -88,15 +243,17 @@ class TwoVhost80Test(util.ApacheTest):
"example/cert.pem", "example/key.pem", "example/cert_chain.pem")
self.config.save()
# Verify ssl_module was enabled.
self.assertTrue(self.vh_truth[1].enabled)
self.assertTrue("ssl_module" in self.config.parser.modules)
loc_cert = self.config.parser.find_dir(
parser.case_i("sslcertificatefile"),
re.escape("example/cert.pem"), self.vh_truth[1].path)
"sslcertificatefile", "example/cert.pem", self.vh_truth[1].path)
loc_key = self.config.parser.find_dir(
parser.case_i("sslcertificateKeyfile"),
re.escape("example/key.pem"), self.vh_truth[1].path)
"sslcertificateKeyfile", "example/key.pem", self.vh_truth[1].path)
loc_chain = self.config.parser.find_dir(
parser.case_i("SSLCertificateChainFile"),
re.escape("example/cert_chain.pem"), self.vh_truth[1].path)
"SSLCertificateChainFile", "example/cert_chain.pem",
self.vh_truth[1].path)
# Verify one directive was found in the correct file
self.assertEqual(len(loc_cert), 1)
@ -111,16 +268,53 @@ class TwoVhost80Test(util.ApacheTest):
self.assertEqual(configurator.get_file_path(loc_chain[0]),
self.vh_truth[1].filep)
# One more time for chain directive setting
self.config.deploy_cert(
"random.demo",
"two/cert.pem", "two/key.pem", "two/cert_chain.pem")
self.assertTrue(self.config.parser.find_dir(
"SSLCertificateChainFile", "two/cert_chain.pem",
self.vh_truth[1].path))
def test_deploy_cert_invalid_vhost(self):
self.config.parser.modules.add("ssl_module")
mock_find = mock.MagicMock()
mock_find.return_value = []
self.config.parser.find_dir = mock_find
# Get the default 443 vhost
self.config.assoc["random.demo"] = self.vh_truth[1]
self.assertRaises(
errors.PluginError, self.config.deploy_cert, "random.demo",
"example/cert.pem", "example/key.pem", "example/cert_chain.pem")
def test_is_name_vhost(self):
addr = common.Addr.fromstring("*:80")
addr = obj.Addr.fromstring("*:80")
self.assertTrue(self.config.is_name_vhost(addr))
self.config.version = (2, 2)
self.assertFalse(self.config.is_name_vhost(addr))
def test_add_name_vhost(self):
self.config.add_name_vhost("*:443")
self.config.add_name_vhost(obj.Addr.fromstring("*:443"))
self.config.add_name_vhost(obj.Addr.fromstring("*:80"))
self.assertTrue(self.config.parser.find_dir(
"NameVirtualHost", re.escape("*:443")))
"NameVirtualHost", "*:443", exclude=False))
self.assertTrue(self.config.parser.find_dir(
"NameVirtualHost", "*:80"))
def test_prepare_server_https(self):
self.config.parser.modules.add("ssl_module")
mock_find = mock.Mock()
mock_add_dir = mock.Mock()
mock_find.return_value = []
# This will test the Add listen
self.config.parser.find_dir = mock_find
self.config.parser.add_dir_to_ifmodssl = mock_add_dir
self.config.prepare_server_https("443")
self.config.prepare_server_https("8080")
self.assertEqual(mock_add_dir.call_count, 2)
def test_make_vhost_ssl(self):
ssl_vhost = self.config.make_vhost_ssl(self.vh_truth[0])
@ -133,28 +327,264 @@ class TwoVhost80Test(util.ApacheTest):
self.assertEqual(ssl_vhost.path,
"/files" + ssl_vhost.filep + "/IfModule/VirtualHost")
self.assertEqual(len(ssl_vhost.addrs), 1)
self.assertEqual(set([common.Addr.fromstring("*:443")]), ssl_vhost.addrs)
self.assertEqual(ssl_vhost.names, set(["encryption-example.demo"]))
self.assertEqual(set([obj.Addr.fromstring("*:443")]), ssl_vhost.addrs)
self.assertEqual(ssl_vhost.name, "encryption-example.demo")
self.assertTrue(ssl_vhost.ssl)
self.assertFalse(ssl_vhost.enabled)
self.assertTrue(self.config.parser.find_dir(
"SSLCertificateFile", None, ssl_vhost.path))
"SSLCertificateFile", None, ssl_vhost.path, False))
self.assertTrue(self.config.parser.find_dir(
"SSLCertificateKeyFile", None, ssl_vhost.path))
self.assertTrue(self.config.parser.find_dir(
"Include", self.ssl_options, ssl_vhost.path))
"SSLCertificateKeyFile", None, ssl_vhost.path, False))
self.assertEqual(self.config.is_name_vhost(self.vh_truth[0]),
self.config.is_name_vhost(ssl_vhost))
self.assertEqual(len(self.config.vhosts), 5)
def test_make_vhost_ssl_extra_vhs(self):
self.config.aug.match = mock.Mock(return_value=["p1", "p2"])
self.assertRaises(
errors.PluginError, self.config.make_vhost_ssl, self.vh_truth[0])
def test_make_vhost_ssl_bad_write(self):
mock_open = mock.mock_open()
# This calls open
self.config.reverter.register_file_creation = mock.Mock()
mock_open.side_effect = IOError
with mock.patch("__builtin__.open", mock_open):
self.assertRaises(
errors.PluginError,
self.config.make_vhost_ssl, self.vh_truth[0])
def test_get_ssl_vhost_path(self):
# pylint: disable=protected-access
self.assertTrue(
self.config._get_ssl_vhost_path("example_path").endswith(".conf"))
def test_add_name_vhost_if_necessary(self):
# pylint: disable=protected-access
self.config.save = mock.Mock()
self.config.version = (2, 2)
self.config._add_name_vhost_if_necessary(self.vh_truth[0])
self.assertTrue(self.config.save.called)
@mock.patch("letsencrypt_apache.configurator.dvsni.ApacheDvsni.perform")
@mock.patch("letsencrypt_apache.configurator.ApacheConfigurator.restart")
def test_perform(self, mock_restart, mock_dvsni_perform):
# Only tests functionality specific to configurator.perform
# Note: As more challenges are offered this will have to be expanded
_, achall1, achall2 = self.get_achalls()
dvsni_ret_val = [
challenges.DVSNIResponse(s="randomS1"),
challenges.DVSNIResponse(s="randomS2"),
]
mock_dvsni_perform.return_value = dvsni_ret_val
responses = self.config.perform([achall1, achall2])
self.assertEqual(mock_dvsni_perform.call_count, 1)
self.assertEqual(responses, dvsni_ret_val)
self.assertEqual(mock_restart.call_count, 1)
@mock.patch("letsencrypt_apache.configurator.ApacheConfigurator.restart")
def test_cleanup(self, mock_restart):
_, achall1, achall2 = self.get_achalls()
self.config._chall_out.add(achall1) # pylint: disable=protected-access
self.config._chall_out.add(achall2) # pylint: disable=protected-access
self.config.cleanup([achall1])
self.assertFalse(mock_restart.called)
self.config.cleanup([achall2])
self.assertTrue(mock_restart.called)
@mock.patch("letsencrypt_apache.configurator.ApacheConfigurator.restart")
def test_cleanup_no_errors(self, mock_restart):
_, achall1, achall2 = self.get_achalls()
self.config._chall_out.add(achall1) # pylint: disable=protected-access
self.config.cleanup([achall2])
self.assertFalse(mock_restart.called)
self.config.cleanup([achall1, achall2])
self.assertTrue(mock_restart.called)
@mock.patch("letsencrypt.le_util.run_script")
def test_get_version(self, mock_script):
mock_script.return_value = (
"Server Version: Apache/2.4.2 (Debian)", "")
self.assertEqual(self.config.get_version(), (2, 4, 2))
mock_script.return_value = (
"Server Version: Apache/2 (Linux)", "")
self.assertEqual(self.config.get_version(), (2,))
mock_script.return_value = (
"Server Version: Apache (Debian)", "")
self.assertRaises(errors.PluginError, self.config.get_version)
mock_script.return_value = (
"Server Version: Apache/2.3{0} Apache/2.4.7".format(os.linesep), "")
self.assertRaises(errors.PluginError, self.config.get_version)
mock_script.side_effect = errors.SubprocessError("Can't find program")
self.assertRaises(errors.PluginError, self.config.get_version)
@mock.patch("letsencrypt_apache.configurator.subprocess.Popen")
def test_restart(self, mock_popen):
"""These will be changed soon enough with reload."""
mock_popen().returncode = 0
mock_popen().communicate.return_value = ("", "")
self.config.restart()
@mock.patch("letsencrypt_apache.configurator.subprocess.Popen")
def test_restart_bad_process(self, mock_popen):
mock_popen.side_effect = OSError
self.assertRaises(errors.MisconfigurationError, self.config.restart)
@mock.patch("letsencrypt_apache.configurator.subprocess.Popen")
def test_restart_failure(self, mock_popen):
mock_popen().communicate.return_value = ("", "")
mock_popen().returncode = 1
self.assertRaises(errors.MisconfigurationError, self.config.restart)
@mock.patch("letsencrypt.le_util.run_script")
def test_config_test(self, _):
self.config.config_test()
@mock.patch("letsencrypt.le_util.run_script")
def test_config_test_bad_process(self, mock_run_script):
mock_run_script.side_effect = errors.SubprocessError
self.assertRaises(errors.MisconfigurationError, self.config.config_test)
def test_get_all_certs_keys(self):
c_k = self.config.get_all_certs_keys()
self.assertEqual(len(c_k), 1)
cert, key, path = next(iter(c_k))
self.assertTrue("cert" in cert)
self.assertTrue("key" in key)
self.assertTrue("default-ssl.conf" in path)
def test_get_all_certs_keys_malformed_conf(self):
self.config.parser.find_dir = mock.Mock(side_effect=[["path"], []])
c_k = self.config.get_all_certs_keys()
self.assertFalse(c_k)
def test_more_info(self):
self.assertTrue(self.config.more_info())
def test_get_chall_pref(self):
self.assertTrue(isinstance(self.config.get_chall_pref(""), list))
def test_temp_install(self):
from letsencrypt_apache.configurator import temp_install
path = os.path.join(self.work_dir, "test_it")
temp_install(path)
self.assertTrue(os.path.isfile(path))
# TEST ENHANCEMENTS
def test_supported_enhancements(self):
self.assertTrue(isinstance(self.config.supported_enhancements(), list))
def test_enhance_unknown_enhancement(self):
self.assertRaises(
errors.PluginError,
self.config.enhance, "letsencrypt.demo", "unknown_enhancement")
@mock.patch("letsencrypt.le_util.run_script")
@mock.patch("letsencrypt.le_util.exe_exists")
def test_redirect_well_formed_http(self, mock_exe, _):
self.config.parser.update_runtime_variables = mock.Mock()
mock_exe.return_value = True
# This will create an ssl vhost for letsencrypt.demo
self.config.enhance("letsencrypt.demo", "redirect")
# These are not immediately available in find_dir even with save() and
# load(). They must be found in sites-available
rw_engine = self.config.parser.find_dir(
"RewriteEngine", "on", self.vh_truth[3].path)
rw_rule = self.config.parser.find_dir(
"RewriteRule", None, self.vh_truth[3].path)
self.assertEqual(len(rw_engine), 1)
# three args to rw_rule
self.assertEqual(len(rw_rule), 3)
self.assertTrue(rw_engine[0].startswith(self.vh_truth[3].path))
self.assertTrue(rw_rule[0].startswith(self.vh_truth[3].path))
self.assertTrue("rewrite_module" in self.config.parser.modules)
def test_redirect_with_conflict(self):
self.config.parser.modules.add("rewrite_module")
ssl_vh = obj.VirtualHost(
"fp", "ap", set([obj.Addr(("*", "443")), obj.Addr(("zombo.com",))]),
True, False)
# No names ^ this guy should conflict.
# pylint: disable=protected-access
self.assertRaises(
errors.PluginError, self.config._enable_redirect, ssl_vh, "")
def test_redirect_twice(self):
# Skip the enable mod
self.config.parser.modules.add("rewrite_module")
self.config.enhance("encryption-example.demo", "redirect")
self.assertRaises(
errors.PluginError,
self.config.enhance, "encryption-example.demo", "redirect")
def test_unknown_rewrite(self):
# Skip the enable mod
self.config.parser.modules.add("rewrite_module")
self.config.parser.add_dir(
self.vh_truth[3].path, "RewriteRule", ["Unknown"])
self.config.save()
self.assertRaises(
errors.PluginError,
self.config.enhance, "letsencrypt.demo", "redirect")
def test_unknown_rewrite2(self):
# Skip the enable mod
self.config.parser.modules.add("rewrite_module")
self.config.parser.add_dir(
self.vh_truth[3].path, "RewriteRule", ["Unknown", "2", "3"])
self.config.save()
self.assertRaises(
errors.PluginError,
self.config.enhance, "letsencrypt.demo", "redirect")
def test_unknown_redirect(self):
# Skip the enable mod
self.config.parser.modules.add("rewrite_module")
self.config.parser.add_dir(
self.vh_truth[3].path, "Redirect", ["Unknown"])
self.config.save()
self.assertRaises(
errors.PluginError,
self.config.enhance, "letsencrypt.demo", "redirect")
def test_create_own_redirect(self):
self.config.parser.modules.add("rewrite_module")
# For full testing... give names...
self.vh_truth[1].name = "default.com"
self.vh_truth[1].aliases = set(["yes.default.com"])
self.config._enable_redirect(self.vh_truth[1], "") # pylint: disable=protected-access
self.assertEqual(len(self.config.vhosts), 5)
def get_achalls(self):
"""Return testing achallenges."""
auth_key = le_util.Key(self.rsa256_file, self.rsa256_pem)
achall1 = achallenges.DVSNI(
challb=acme_util.chall_to_challb(
@ -171,39 +601,16 @@ class TwoVhost80Test(util.ApacheTest):
"pending"),
domain="letsencrypt.demo", key=auth_key)
dvsni_ret_val = [
challenges.DVSNIResponse(s="randomS1"),
challenges.DVSNIResponse(s="randomS2"),
]
return auth_key, achall1, achall2
mock_dvsni_perform.return_value = dvsni_ret_val
responses = self.config.perform([achall1, achall2])
self.assertEqual(mock_dvsni_perform.call_count, 1)
self.assertEqual(responses, dvsni_ret_val)
self.assertEqual(mock_restart.call_count, 1)
@mock.patch("letsencrypt_apache.configurator.subprocess.Popen")
def test_get_version(self, mock_popen):
mock_popen().communicate.return_value = (
"Server Version: Apache/2.4.2 (Debian)", "")
self.assertEqual(self.config.get_version(), (2, 4, 2))
mock_popen().communicate.return_value = (
"Server Version: Apache/2 (Linux)", "")
self.assertEqual(self.config.get_version(), (2,))
mock_popen().communicate.return_value = (
"Server Version: Apache (Debian)", "")
self.assertRaises(errors.PluginError, self.config.get_version)
mock_popen().communicate.return_value = (
"Server Version: Apache/2.3{0} Apache/2.4.7".format(os.linesep), "")
self.assertRaises(errors.PluginError, self.config.get_version)
mock_popen.side_effect = OSError("Can't find program")
self.assertRaises(errors.PluginError, self.config.get_version)
def test_make_addrs_sni_ready(self):
self.config.version = (2, 2)
self.config.make_addrs_sni_ready(
set([obj.Addr.fromstring("*:443"), obj.Addr.fromstring("*:80")]))
self.assertTrue(self.config.parser.find_dir(
"NameVirtualHost", "*:80", exclude=False))
self.assertTrue(self.config.parser.find_dir(
"NameVirtualHost", "*:443", exclude=False))
if __name__ == "__main__":

View file

@ -5,10 +5,12 @@ import unittest
import mock
import zope.component
from letsencrypt_apache.tests import util
from letsencrypt.display import util as display_util
from letsencrypt_apache import obj
from letsencrypt_apache.tests import util
class SelectVhostTest(unittest.TestCase):
"""Tests for letsencrypt_apache.display_ops.select_vhost."""
@ -53,6 +55,18 @@ class SelectVhostTest(unittest.TestCase):
self.assertEqual(mock_logger.debug.call_count, 1)
@mock.patch("letsencrypt_apache.display_ops.zope.component.getUtility")
def test_multiple_names(self, mock_util):
mock_util().menu.return_value = (display_util.OK, 4)
self.vhosts.append(
obj.VirtualHost(
"path", "aug_path", set([obj.Addr.fromstring("*:80")]),
False, False,
"wildcard.com", set(["*.wildcard.com"])))
self.assertEqual(self.vhosts[4], self._call(self.vhosts))
if __name__ == "__main__":
unittest.main() # pragma: no cover

View file

@ -6,9 +6,9 @@ import mock
from acme import challenges
from letsencrypt.plugins import common
from letsencrypt.plugins import common_test
from letsencrypt_apache import obj
from letsencrypt_apache.tests import util
@ -17,14 +17,12 @@ class DvsniPerformTest(util.ApacheTest):
achalls = common_test.DvsniTest.achalls
def setUp(self):
def setUp(self): # pylint: disable=arguments-differ
super(DvsniPerformTest, self).setUp()
with mock.patch("letsencrypt_apache.configurator.ApacheConfigurator."
"mod_loaded") as mock_load:
mock_load.return_value = True
config = util.get_apache_configurator(
self.config_path, self.config_dir, self.work_dir)
config = util.get_apache_configurator(
self.config_path, self.config_dir, self.work_dir)
config.config.dvsni_port = 443
from letsencrypt_apache import dvsni
self.sni = dvsni.ApacheDvsni(config)
@ -38,7 +36,12 @@ class DvsniPerformTest(util.ApacheTest):
resp = self.sni.perform()
self.assertEqual(len(resp), 0)
def test_perform1(self):
@mock.patch("letsencrypt.le_util.exe_exists")
@mock.patch("letsencrypt.le_util.run_script")
def test_perform1(self, _, mock_exists):
mock_exists.return_value = True
self.sni.configurator.parser.update_runtime_variables = mock.Mock()
achall = self.achalls[0]
self.sni.add_chall(achall)
mock_setup_cert = mock.MagicMock(
@ -53,12 +56,14 @@ class DvsniPerformTest(util.ApacheTest):
# Check to make sure challenge config path is included in apache config.
self.assertEqual(
len(self.sni.configurator.parser.find_dir(
"Include", self.sni.challenge_conf)),
1)
"Include", self.sni.challenge_conf)), 1)
self.assertEqual(len(responses), 1)
self.assertEqual(responses[0].s, "randomS1")
def test_perform2(self):
# Avoid load module
self.sni.configurator.parser.modules.add("ssl_module")
for achall in self.achalls:
self.sni.add_chall(achall)
@ -89,13 +94,8 @@ class DvsniPerformTest(util.ApacheTest):
def test_mod_config(self):
for achall in self.achalls:
self.sni.add_chall(achall)
v_addr1 = [common.Addr(("1.2.3.4", "443")),
common.Addr(("5.6.7.8", "443"))]
v_addr2 = [common.Addr(("127.0.0.1", "443"))]
ll_addr = []
ll_addr.append(v_addr1)
ll_addr.append(v_addr2)
self.sni._mod_config(ll_addr) # pylint: disable=protected-access
self.sni._mod_config() # pylint: disable=protected-access
self.sni.configurator.save()
self.sni.configurator.parser.find_dir(
@ -109,15 +109,22 @@ class DvsniPerformTest(util.ApacheTest):
vhs.append(self.sni.configurator._create_vhost(match))
self.assertEqual(len(vhs), 2)
for vhost in vhs:
if vhost.addrs == set(v_addr1):
self.assertEqual(
vhost.names,
set([self.achalls[0].nonce_domain]))
else:
self.assertEqual(vhost.addrs, set(v_addr2))
self.assertEqual(
vhost.names,
set([self.achalls[1].nonce_domain]))
self.assertEqual(vhost.addrs, set([obj.Addr.fromstring("*:443")]))
names = vhost.get_names()
self.assertTrue(
names == set([self.achalls[0].nonce_domain]) or
names == set([self.achalls[1].nonce_domain]))
def test_get_dvsni_addrs_default(self):
self.sni.configurator.choose_vhost = mock.Mock(
return_value=obj.VirtualHost(
"path", "aug_path", set([obj.Addr.fromstring("_default_:443")]),
False, False)
)
self.assertEqual(
set([obj.Addr.fromstring("*:443")]),
self.sni.get_dvsni_addrs(self.achalls[0]))
if __name__ == "__main__":

View file

@ -1,27 +1,135 @@
"""Tests for letsencrypt_apache.obj."""
import unittest
from letsencrypt.plugins import common
class VirtualHostTest(unittest.TestCase):
"""Test the VirtualHost class."""
def setUp(self):
from letsencrypt_apache.obj import Addr
from letsencrypt_apache.obj import VirtualHost
self.addr1 = Addr.fromstring("127.0.0.1")
self.addr2 = Addr.fromstring("127.0.0.1:443")
self.addr_default = Addr.fromstring("_default_:443")
self.vhost1 = VirtualHost(
"filep", "vh_path",
set([common.Addr.fromstring("localhost")]), False, False)
"filep", "vh_path", set([self.addr1]), False, False, "localhost")
self.vhost1b = VirtualHost(
"filep", "vh_path", set([self.addr1]), False, False, "localhost")
self.vhost2 = VirtualHost(
"fp", "vhp", set([self.addr2]), False, False, "localhost")
def test_eq(self):
from letsencrypt_apache.obj import VirtualHost
vhost1b = VirtualHost(
"filep", "vh_path",
set([common.Addr.fromstring("localhost")]), False, False)
self.assertTrue(self.vhost1b == self.vhost1)
self.assertFalse(self.vhost1 == self.vhost2)
self.assertEqual(str(self.vhost1b), str(self.vhost1))
self.assertFalse(self.vhost1b == 1234)
self.assertEqual(vhost1b, self.vhost1)
self.assertEqual(str(vhost1b), str(self.vhost1))
self.assertFalse(vhost1b == 1234)
def test_ne(self):
self.assertTrue(self.vhost1 != self.vhost2)
self.assertFalse(self.vhost1 != self.vhost1b)
def test_conflicts(self):
from letsencrypt_apache.obj import Addr
from letsencrypt_apache.obj import VirtualHost
complex_vh = VirtualHost(
"fp", "vhp",
set([Addr.fromstring("*:443"), Addr.fromstring("1.2.3.4:443")]),
False, False)
self.assertTrue(complex_vh.conflicts([self.addr1]))
self.assertTrue(complex_vh.conflicts([self.addr2]))
self.assertFalse(complex_vh.conflicts([self.addr_default]))
self.assertTrue(self.vhost1.conflicts([self.addr2]))
self.assertFalse(self.vhost1.conflicts([self.addr_default]))
self.assertFalse(self.vhost2.conflicts([self.addr1, self.addr_default]))
def test_same_server(self):
from letsencrypt_apache.obj import VirtualHost
no_name1 = VirtualHost(
"fp", "vhp", set([self.addr1]), False, False, None)
no_name2 = VirtualHost(
"fp", "vhp", set([self.addr2]), False, False, None)
no_name3 = VirtualHost(
"fp", "vhp", set([self.addr_default]),
False, False, None)
no_name4 = VirtualHost(
"fp", "vhp", set([self.addr2, self.addr_default]),
False, False, None)
self.assertTrue(self.vhost1.same_server(self.vhost2))
self.assertTrue(no_name1.same_server(no_name2))
self.assertFalse(self.vhost1.same_server(no_name1))
self.assertFalse(no_name1.same_server(no_name3))
self.assertFalse(no_name1.same_server(no_name4))
class AddrTest(unittest.TestCase):
"""Test obj.Addr."""
def setUp(self):
from letsencrypt_apache.obj import Addr
self.addr = Addr.fromstring("*:443")
self.addr1 = Addr.fromstring("127.0.0.1")
self.addr2 = Addr.fromstring("127.0.0.1:*")
self.addr_defined = Addr.fromstring("127.0.0.1:443")
self.addr_default = Addr.fromstring("_default_:443")
def test_wildcard(self):
self.assertFalse(self.addr.is_wildcard())
self.assertTrue(self.addr1.is_wildcard())
self.assertTrue(self.addr2.is_wildcard())
def test_get_sni_addr(self):
from letsencrypt_apache.obj import Addr
self.assertEqual(
self.addr.get_sni_addr("443"), Addr.fromstring("*:443"))
self.assertEqual(
self.addr.get_sni_addr("225"), Addr.fromstring("*:225"))
self.assertEqual(
self.addr1.get_sni_addr("443"), Addr.fromstring("127.0.0.1"))
def test_conflicts(self):
# Note: Defined IP is more important than defined port in match
self.assertTrue(self.addr.conflicts(self.addr1))
self.assertTrue(self.addr.conflicts(self.addr2))
self.assertTrue(self.addr.conflicts(self.addr_defined))
self.assertFalse(self.addr.conflicts(self.addr_default))
self.assertFalse(self.addr1.conflicts(self.addr))
self.assertTrue(self.addr1.conflicts(self.addr_defined))
self.assertFalse(self.addr1.conflicts(self.addr_default))
self.assertFalse(self.addr_defined.conflicts(self.addr1))
self.assertFalse(self.addr_defined.conflicts(self.addr2))
self.assertFalse(self.addr_defined.conflicts(self.addr))
self.assertFalse(self.addr_defined.conflicts(self.addr_default))
self.assertTrue(self.addr_default.conflicts(self.addr))
self.assertTrue(self.addr_default.conflicts(self.addr1))
self.assertTrue(self.addr_default.conflicts(self.addr_defined))
# Self test
self.assertTrue(self.addr.conflicts(self.addr))
self.assertTrue(self.addr1.conflicts(self.addr1))
# This is a tricky one...
self.assertTrue(self.addr1.conflicts(self.addr2))
def test_equal(self):
self.assertTrue(self.addr1 == self.addr2)
self.assertFalse(self.addr == self.addr1)
self.assertFalse(self.addr == 123)
def test_not_equal(self):
self.assertFalse(self.addr1 != self.addr2)
self.assertTrue(self.addr != self.addr1)
if __name__ == "__main__":

View file

@ -1,52 +1,32 @@
"""Tests for letsencrypt_apache.parser."""
import os
import shutil
import sys
import unittest
import augeas
import mock
import zope.component
from letsencrypt import errors
from letsencrypt.display import util as display_util
from letsencrypt_apache.tests import util
class ApacheParserTest(util.ApacheTest):
class BasicParserTest(util.ParserTest):
"""Apache Parser Test."""
def setUp(self):
super(ApacheParserTest, self).setUp()
zope.component.provideUtility(display_util.FileDisplay(sys.stdout))
from letsencrypt_apache.parser import ApacheParser
self.aug = augeas.Augeas(flags=augeas.Augeas.NONE)
self.parser = ApacheParser(self.aug, self.config_path, self.ssl_options)
def setUp(self): # pylint: disable=arguments-differ
super(BasicParserTest, self).setUp()
def tearDown(self):
shutil.rmtree(self.temp_dir)
shutil.rmtree(self.config_dir)
shutil.rmtree(self.work_dir)
def test_root_normalized(self):
from letsencrypt_apache.parser import ApacheParser
path = os.path.join(self.temp_dir, "debian_apache_2_4/////"
"two_vhost_80/../two_vhost_80/apache2")
parser = ApacheParser(self.aug, path, None)
self.assertEqual(parser.root, self.config_path)
def test_root_absolute(self):
from letsencrypt_apache.parser import ApacheParser
parser = ApacheParser(self.aug, os.path.relpath(self.config_path), None)
self.assertEqual(parser.root, self.config_path)
def test_root_no_trailing_slash(self):
from letsencrypt_apache.parser import ApacheParser
parser = ApacheParser(self.aug, self.config_path + os.path.sep, None)
self.assertEqual(parser.root, self.config_path)
def test_find_config_root_no_root(self):
# pylint: disable=protected-access
os.remove(self.parser.loc["root"])
self.assertRaises(
errors.NoInstallationError, self.parser._find_config_root)
def test_parse_file(self):
"""Test parse_file.
@ -67,11 +47,11 @@ class ApacheParserTest(util.ApacheTest):
self.assertTrue(matches)
def test_find_dir(self):
from letsencrypt_apache.parser import case_i
test = self.parser.find_dir(case_i("Listen"), "443")
test = self.parser.find_dir("Listen", "80")
# This will only look in enabled hosts
test2 = self.parser.find_dir(case_i("documentroot"))
self.assertEqual(len(test), 2)
test2 = self.parser.find_dir("documentroot")
self.assertEqual(len(test), 1)
self.assertEqual(len(test2), 3)
def test_add_dir(self):
@ -93,15 +73,32 @@ class ApacheParserTest(util.ApacheTest):
"""
from letsencrypt_apache.parser import get_aug_path
# This makes sure that find_dir will work
self.parser.modules.add("mod_ssl.c")
self.parser.add_dir_to_ifmodssl(
get_aug_path(self.parser.loc["default"]),
"FakeDirective", "123")
"FakeDirective", ["123"])
matches = self.parser.find_dir("FakeDirective", "123")
self.assertEqual(len(matches), 1)
self.assertTrue("IfModule" in matches[0])
def test_add_dir_to_ifmodssl_multiple(self):
from letsencrypt_apache.parser import get_aug_path
# This makes sure that find_dir will work
self.parser.modules.add("mod_ssl.c")
self.parser.add_dir_to_ifmodssl(
get_aug_path(self.parser.loc["default"]),
"FakeDirective", ["123", "456", "789"])
matches = self.parser.find_dir("FakeDirective")
self.assertEqual(len(matches), 3)
self.assertTrue("IfModule" in matches[0])
def test_get_aug_path(self):
from letsencrypt_apache.parser import get_aug_path
self.assertEqual("/files/etc/apache", get_aug_path("/etc/apache"))
@ -109,20 +106,114 @@ class ApacheParserTest(util.ApacheTest):
def test_set_locations(self):
with mock.patch("letsencrypt_apache.parser.os.path") as mock_path:
mock_path.isfile.return_value = False
# pylint: disable=protected-access
self.assertRaises(errors.PluginError,
self.parser._set_locations, self.ssl_options)
mock_path.isfile.side_effect = [True, False, False]
# pylint: disable=protected-access
results = self.parser._set_locations(self.ssl_options)
results = self.parser._set_locations()
self.assertEqual(results["default"], results["listen"])
self.assertEqual(results["default"], results["name"])
def test_set_user_config_file(self):
# pylint: disable=protected-access
path = os.path.join(self.parser.root, "httpd.conf")
open(path, 'w').close()
self.parser.add_dir(self.parser.loc["default"], "Include", "httpd.conf")
self.assertEqual(
path, self.parser._set_user_config_file())
@mock.patch("letsencrypt_apache.parser.ApacheParser._get_runtime_cfg")
def test_update_runtime_variables(self, mock_cfg):
mock_cfg.return_value = (
'ServerRoot: "/etc/apache2"\n'
'Main DocumentRoot: "/var/www"\n'
'Main ErrorLog: "/var/log/apache2/error.log"\n'
'Mutex ssl-stapling: using_defaults\n'
'Mutex ssl-cache: using_defaults\n'
'Mutex default: dir="/var/lock/apache2" mechanism=fcntl\n'
'Mutex watchdog-callback: using_defaults\n'
'PidFile: "/var/run/apache2/apache2.pid"\n'
'Define: TEST\n'
'Define: DUMP_RUN_CFG\n'
'Define: U_MICH\n'
'Define: TLS=443\n'
'Define: example_path=Documents/path\n'
'User: name="www-data" id=33 not_used\n'
'Group: name="www-data" id=33 not_used\n'
)
expected_vars = {"TEST": "", "U_MICH": "", "TLS": "443",
"example_path":"Documents/path"}
self.parser.update_runtime_variables("ctl")
self.assertEqual(self.parser.variables, expected_vars)
@mock.patch("letsencrypt_apache.parser.ApacheParser._get_runtime_cfg")
def test_update_runtime_vars_bad_output(self, mock_cfg):
mock_cfg.return_value = "Define: TLS=443=24"
self.assertRaises(
errors.PluginError, self.parser.update_runtime_variables, "ctl")
mock_cfg.return_value = "Define: DUMP_RUN_CFG\nDefine: TLS=443=24"
self.assertRaises(
errors.PluginError, self.parser.update_runtime_variables, "ctl")
@mock.patch("letsencrypt_apache.parser.subprocess.Popen")
def test_update_runtime_vars_bad_ctl(self, mock_popen):
mock_popen.side_effect = OSError
self.assertRaises(
errors.MisconfigurationError,
self.parser.update_runtime_variables, "ctl")
@mock.patch("letsencrypt_apache.parser.subprocess.Popen")
def test_update_runtime_vars_bad_exit(self, mock_popen):
mock_popen().communicate.return_value = ("", "")
mock_popen.returncode = -1
self.assertRaises(
errors.MisconfigurationError,
self.parser.update_runtime_variables, "ctl")
class ParserInitTest(util.ApacheTest):
def setUp(self): # pylint: disable=arguments-differ
super(ParserInitTest, self).setUp()
self.aug = augeas.Augeas(
flags=augeas.Augeas.NONE | augeas.Augeas.NO_MODL_AUTOLOAD)
def tearDown(self):
shutil.rmtree(self.temp_dir)
shutil.rmtree(self.config_dir)
shutil.rmtree(self.work_dir)
def test_root_normalized(self):
from letsencrypt_apache.parser import ApacheParser
with mock.patch("letsencrypt_apache.parser.ApacheParser."
"update_runtime_variables"):
path = os.path.join(
self.temp_dir,
"debian_apache_2_4/////two_vhost_80/../two_vhost_80/apache2")
parser = ApacheParser(self.aug, path, "dummy_ctl")
self.assertEqual(parser.root, self.config_path)
def test_root_absolute(self):
from letsencrypt_apache.parser import ApacheParser
with mock.patch("letsencrypt_apache.parser.ApacheParser."
"update_runtime_variables"):
parser = ApacheParser(
self.aug, os.path.relpath(self.config_path), "dummy_ctl")
self.assertEqual(parser.root, self.config_path)
def test_root_no_trailing_slash(self):
from letsencrypt_apache.parser import ApacheParser
with mock.patch("letsencrypt_apache.parser.ApacheParser."
"update_runtime_variables"):
parser = ApacheParser(
self.aug, self.config_path + os.path.sep, "dummy_ctl")
self.assertEqual(parser.root, self.config_path)
if __name__ == "__main__":
unittest.main() # pragma: no cover

View file

@ -0,0 +1,53 @@
# Global configuration
PidFile ${APACHE_PID_FILE}
#
# Timeout: The number of seconds before receives and sends time out.
#
Timeout 300
#
# KeepAlive: Whether or not to allow persistent connections (more than
# one request per connection). Set to "Off" to deactivate.
#
KeepAlive On
# These need to be set in /etc/apache2/envvars
User ${APACHE_RUN_USER}
Group ${APACHE_RUN_GROUP}
ErrorLog ${APACHE_LOG_DIR}/error.log
LogLevel warn
# Include module configuration:
IncludeOptional mods-enabled/*.load
IncludeOptional mods-enabled/*.conf
<Directory />
Options FollowSymLinks
AllowOverride None
Require all denied
</Directory>
<Directory /var/www/>
Options Indexes FollowSymLinks
AllowOverride None
Require all granted
</Directory>
# Include generic snippets of statements
IncludeOptional conf-enabled/
# Include the virtual host configurations:
IncludeOptional sites-enabled/*.conf
Define COMPLEX
Define tls_port 1234
Define fnmatch_filename test_fnmatch.conf
Include test_variables.conf
# vim: syntax=apache ts=4 sw=4 sts=4 sr noet

View file

@ -0,0 +1,9 @@
# 3 - one arg directives
# 2 - two arg directives
# 1 - three arg directives
TestArgsDirective one_arg
TestArgsDirective one_arg two_arg
TestArgsDirective one_arg
TestArgsDirective one_arg two_arg
TestArgsDirective one_arg two_arg three_arg
TestArgsDirective one_arg

View file

@ -0,0 +1 @@
FNMATCH_DIRECTIVE Success

View file

@ -0,0 +1,65 @@
TestVariablePort ${tls_port}
LoadModule status_module modules/mod_status.so
# Basic IfDefine
<IfDefine COMPLEX>
VAR_DIRECTIVE success
LoadModule ssl_module modules/mod_ssl.so
</IfDefine>
<IfDefine !COMPLEX>
INVALID_VAR_DIRECTIVE failure
</IfDefine>
<IfDefine NOT_COMPLEX>
INVALID_VAR_DIRECTIVE failure
</IfDefine>
<IfDefine !NOT_COMPLEX>
VAR_DIRECTIVE failure
</IfDefine>
# Basic IfModule
<IfModule ssl_module>
MOD_DIRECTIVE Success
</IfModule>
<IfModule !ssl_module>
INVALID_MOD_DIRECTIVE failure
</IfModule>
<IfModule fake_module>
INVALID_MOD_DIRECTIVE failure
</IfModule>
<IfModule !fake_module>
MOD_DIRECTIVE Success
</IfModule>
# Nested Tests
<IfModule status_module>
<IfDefine COMPLEX>
NESTED_DIRECTIVE success
<IfModule mod_ssl.c>
NESTED_DIRECTIVE success
</IfModule>
<IfModule !mod_ssl.c>
INVALID_NESTED_DIRECTIVE failure
</IfModule>
</IfDefine>
<IfDefine !COMPLEX>
INVALID_NESTED_DIRECTIVE failure
<IfModule ssl_module>
INVALID_NESTED_DIRECTIVE failure
</IfModule>
</IfDefine>
NESTED_DIRECTIVE success
</IfModule>

View file

@ -0,0 +1,5 @@
<VirtualHost 1.1.1.1>
ServerName invalid.net
</virtualHost>

View file

@ -0,0 +1 @@
LoadModule rewrite_module /usr/lib/apache2/modules/mod_rewrite.so

View file

@ -1,9 +1,14 @@
"""Common utilities for letsencrypt_apache."""
import os
import pkg_resources
import sys
import unittest
import augeas
import mock
import zope.component
from letsencrypt.display import util as display_util
from letsencrypt.plugins import common
@ -14,19 +19,20 @@ from letsencrypt_apache import obj
class ApacheTest(unittest.TestCase): # pylint: disable=too-few-public-methods
def setUp(self):
def setUp(self, test_dir="debian_apache_2_4/two_vhost_80",
config_root="debian_apache_2_4/two_vhost_80/apache2"):
# pylint: disable=arguments-differ
super(ApacheTest, self).setUp()
self.temp_dir, self.config_dir, self.work_dir = common.dir_setup(
test_dir="debian_apache_2_4/two_vhost_80",
test_dir=test_dir,
pkg="letsencrypt_apache.tests")
self.ssl_options = common.setup_ssl_options(
self.config_dir, constants.MOD_SSL_CONF_SRC,
constants.MOD_SSL_CONF_DEST)
self.config_path = os.path.join(
self.temp_dir, "debian_apache_2_4/two_vhost_80/apache2")
self.config_path = os.path.join(self.temp_dir, config_root)
self.rsa256_file = pkg_resources.resource_filename(
"letsencrypt.tests", os.path.join("testdata", "rsa256_key.pem"))
@ -34,29 +40,56 @@ class ApacheTest(unittest.TestCase): # pylint: disable=too-few-public-methods
"letsencrypt.tests", os.path.join("testdata", "rsa256_key.pem"))
def get_apache_configurator(
config_path, config_dir, work_dir, version=(2, 4, 7)):
"""Create an Apache Configurator with the specified options."""
class ParserTest(ApacheTest): # pytlint: disable=too-few-public-methods
def setUp(self, test_dir="debian_apache_2_4/two_vhost_80",
config_root="debian_apache_2_4/two_vhost_80/apache2"):
super(ParserTest, self).setUp(test_dir, config_root)
zope.component.provideUtility(display_util.FileDisplay(sys.stdout))
from letsencrypt_apache.parser import ApacheParser
self.aug = augeas.Augeas(
flags=augeas.Augeas.NONE | augeas.Augeas.NO_MODL_AUTOLOAD)
with mock.patch("letsencrypt_apache.parser.ApacheParser."
"update_runtime_variables"):
self.parser = ApacheParser(
self.aug, self.config_path, "dummy_ctl_path")
def get_apache_configurator(
config_path, config_dir, work_dir, version=(2, 4, 7), conf=None):
"""Create an Apache Configurator with the specified options.
:param conf: Function that returns binary paths. self.conf in Configurator
"""
backups = os.path.join(work_dir, "backups")
with mock.patch("letsencrypt_apache.configurator."
"subprocess.Popen") as mock_popen:
# This just states that the ssl module is already loaded
mock_popen().communicate.return_value = ("ssl_module", "")
config = configurator.ApacheConfigurator(
config=mock.MagicMock(
apache_server_root=config_path,
apache_le_vhost_ext=constants.CLI_DEFAULTS["le_vhost_ext"],
backup_dir=backups,
config_dir=config_dir,
temp_checkpoint_dir=os.path.join(work_dir, "temp_checkpoints"),
in_progress_dir=os.path.join(backups, "IN_PROGRESS"),
work_dir=work_dir),
name="apache",
version=version)
with mock.patch("letsencrypt_apache.parser.ApacheParser."
"update_runtime_variables"):
# This indicates config_test passes
mock_popen().communicate.return_value = ("Fine output", "No problems")
mock_popen().returncode = 0
config.prepare()
config = configurator.ApacheConfigurator(
config=mock.MagicMock(
apache_server_root=config_path,
apache_le_vhost_ext=constants.CLI_DEFAULTS["le_vhost_ext"],
backup_dir=backups,
config_dir=config_dir,
temp_checkpoint_dir=os.path.join(work_dir, "temp_checkpoints"),
in_progress_dir=os.path.join(backups, "IN_PROGRESS"),
work_dir=work_dir),
name="apache",
version=version)
# This allows testing scripts to set it a bit more quickly
if conf is not None:
config.conf = conf # pragma: no cover
config.prepare()
return config
@ -71,23 +104,23 @@ def get_vh_truth(temp_dir, config_name):
obj.VirtualHost(
os.path.join(prefix, "encryption-example.conf"),
os.path.join(aug_pre, "encryption-example.conf/VirtualHost"),
set([common.Addr.fromstring("*:80")]),
False, True, set(["encryption-example.demo"])),
set([obj.Addr.fromstring("*:80")]),
False, True, "encryption-example.demo"),
obj.VirtualHost(
os.path.join(prefix, "default-ssl.conf"),
os.path.join(aug_pre, "default-ssl.conf/IfModule/VirtualHost"),
set([common.Addr.fromstring("_default_:443")]), True, False),
set([obj.Addr.fromstring("_default_:443")]), True, False),
obj.VirtualHost(
os.path.join(prefix, "000-default.conf"),
os.path.join(aug_pre, "000-default.conf/VirtualHost"),
set([common.Addr.fromstring("*:80")]), False, True,
set(["ip-172-30-0-17"])),
set([obj.Addr.fromstring("*:80")]), False, True,
"ip-172-30-0-17"),
obj.VirtualHost(
os.path.join(prefix, "letsencrypt.conf"),
os.path.join(aug_pre, "letsencrypt.conf/VirtualHost"),
set([common.Addr.fromstring("*:80")]), False, True,
set(["letsencrypt.demo"])),
set([obj.Addr.fromstring("*:80")]), False, True,
"letsencrypt.demo"),
]
return vh_truth
return None
return None # pragma: no cover

View file

@ -20,4 +20,5 @@ setup(
'apache = letsencrypt_apache.configurator:ApacheConfigurator',
],
},
include_package_data=True,
)

View file

@ -16,6 +16,7 @@ from letsencrypt_compatibility_test.configurators import common as configurators
APACHE_VERSION_REGEX = re.compile(r"Apache/([0-9\.]*)", re.IGNORECASE)
APACHE_COMMANDS = ["apachectl", "a2enmod"]
class Proxy(configurators_common.Proxy):
@ -29,24 +30,58 @@ class Proxy(configurators_common.Proxy):
super(Proxy, self).__init__(args)
self.le_config.apache_le_vhost_ext = "-le-ssl.conf"
self._patches = list()
subprocess_patch = mock.patch(
"letsencrypt_apache.configurator.subprocess")
subprocess_mock = subprocess_patch.start()
subprocess_mock.check_call = self.check_call_in_docker
subprocess_mock.Popen = self.popen_in_docker
self._patches.append(subprocess_patch)
display_patch = mock.patch(
"letsencrypt_apache.configurator.display_ops.select_vhost")
display_mock = display_patch.start()
display_mock.side_effect = le_errors.PluginError(
"Unable to determine vhost")
self._patches.append(display_mock)
self._setup_mock()
self.modules = self.server_root = self.test_conf = self.version = None
self._apache_configurator = self._all_names = self._test_names = None
def _setup_mock(self):
"""Replaces specific modules with mock.MagicMock"""
mock_subprocess = mock.MagicMock()
mock_subprocess.check_call = self.check_call
mock_subprocess.Popen = self.popen
mock.patch(
"letsencrypt_apache.configurator.subprocess",
mock_subprocess).start()
mock.patch(
"letsencrypt_apache.parser.subprocess",
mock_subprocess).start()
mock.patch(
"letsencrypt.le_util.subprocess",
mock_subprocess).start()
mock.patch(
"letsencrypt_apache.configurator.le_util.exe_exists",
_is_apache_command).start()
patch = mock.patch(
"letsencrypt_apache.configurator.display_ops.select_vhost")
mock_display = patch.start()
mock_display.side_effect = le_errors.PluginError(
"Unable to determine vhost")
def check_call(self, command, *args, **kwargs):
"""If command is an Apache command, command is executed in the
running docker image. Otherwise, subprocess.check_call is used.
"""
if _is_apache_command(command):
command = _modify_command(command)
return super(Proxy, self).check_call(command, *args, **kwargs)
else:
return subprocess.check_call(command, *args, **kwargs)
def popen(self, command, *args, **kwargs):
"""If command is an Apache command, command is executed in the
running docker image. Otherwise, subprocess.Popen is used.
"""
if _is_apache_command(command):
command = _modify_command(command)
return super(Proxy, self).popen(command, *args, **kwargs)
else:
return subprocess.Popen(command, *args, **kwargs)
def __getattr__(self, name):
"""Wraps the Apache Configurator methods"""
method = getattr(self._apache_configurator, name, None)
@ -59,8 +94,7 @@ class Proxy(configurators_common.Proxy):
"""Loads the next configuration for the plugin to test"""
if hasattr(self.le_config, "apache_init_script"):
try:
self.check_call_in_docker(
[self.le_config.apache_init_script, "stop"])
self.check_call([self.le_config.apache_init_script, "stop"])
except errors.Error:
raise errors.Error(
"Failed to stop previous apache config from running")
@ -79,9 +113,8 @@ class Proxy(configurators_common.Proxy):
self._prepare_configurator(server_root, config_file)
try:
self.check_call_in_docker(
"apachectl -d {0} -f {1} -k start".format(
server_root, config_file))
self.check_call("apachectl -d {0} -f {1} -k start".format(
server_root, config_file))
except errors.Error:
raise errors.Error(
"Apache failed to load {0} before tests started".format(
@ -115,6 +148,7 @@ class Proxy(configurators_common.Proxy):
self.le_config.apache_ctl = "apachectl -d {0} -f {1}".format(
server_root, config_file)
self.le_config.apache_enmod = "a2enmod.sh {0}".format(server_root)
self.le_config.apache_dismod = self.le_config.apache_enmod
self.le_config.apache_init_script = self.le_config.apache_ctl + " -k"
self._apache_configurator = configurator.ApacheConfigurator(
@ -125,8 +159,7 @@ class Proxy(configurators_common.Proxy):
def cleanup_from_tests(self):
"""Performs any necessary cleanup from running plugin tests"""
super(Proxy, self).cleanup_from_tests()
for patch in self._patches:
patch.stop()
mock.patch.stopall()
def get_all_names_answer(self):
"""Returns the set of domain names that the plugin should find"""
@ -150,6 +183,30 @@ class Proxy(configurators_common.Proxy):
domain, cert_path, key_path, chain_path)
def _is_apache_command(command):
"""Returns true if command is an Apache command"""
if isinstance(command, list):
command = command[0]
for apache_command in APACHE_COMMANDS:
if command.startswith(apache_command):
return True
return False
def _modify_command(command):
"""Modifies command so configtest works inside the docker image"""
if isinstance(command, list):
for i in xrange(len(command)):
if command[i] == "configtest":
command[i] = "-t"
else:
command = command.replace("configtest", "-t")
return command
def _create_test_conf(server_root, apache_config):
"""Creates a test config file and adds it to the Apache config"""
test_conf = os.path.join(server_root, "test.conf")

View file

@ -3,7 +3,6 @@ import logging
import os
import shutil
import tempfile
import threading
import docker
@ -47,7 +46,7 @@ class Proxy(object):
self._docker_client = docker.Client(
base_url=self.args.docker_url, version="auto")
self.http_port, self.https_port = util.get_two_free_ports()
self._container_id = self._log_thread = None
self._container_id = None
def has_more_configs(self):
"""Returns true if there are more configs to test"""
@ -56,7 +55,6 @@ class Proxy(object):
def cleanup_from_tests(self):
"""Performs any necessary cleanup from running plugin tests"""
self._docker_client.stop(self._container_id, 0)
self._log_thread.join()
if not self.args.no_remove:
self._docker_client.remove_container(self._container_id)
@ -87,26 +85,18 @@ class Proxy(object):
self._container_id = container["Id"]
self._docker_client.start(self._container_id)
self._log_thread = threading.Thread(target=self._start_log_thread)
self._log_thread.start()
def _start_log_thread(self):
client = docker.Client(base_url=self.args.docker_url, version="auto")
for line in client.logs(self._container_id, stream=True):
logger.debug(line.rstrip())
def check_call_in_docker(
self, command, *args, **kwargs): # pylint: disable=unused-argument
def check_call(self, command, *args, **kwargs):
# pylint: disable=unused-argument
"""Simulates a call to check_call but executes the command in the
running docker image
"""
if self.popen_in_docker(command).returncode:
if self.popen(command).returncode:
raise errors.Error(
"{0} exited with a nonzero value".format(command))
def popen_in_docker(
self, command, *args, **kwargs): # pylint: disable=unused-argument
def popen(self, command, *args, **kwargs):
# pylint: disable=unused-argument
"""Simulates a call to Popen but executes the command in the
running docker image

View file

@ -6,6 +6,7 @@ import logging
import os
import shutil
import tempfile
import time
import OpenSSL
@ -60,11 +61,12 @@ def test_authenticator(plugin, config, temp_dir):
type(achalls[i]), achalls[i].domain, config)
success = False
elif isinstance(responses[i], challenges.DVSNIResponse):
if responses[i].simple_verify(achalls[i],
achalls[i].domain,
util.JWK.key.public_key(),
host="127.0.0.1",
port=plugin.https_port):
verify = functools.partial(responses[i].simple_verify, achalls[i],
achalls[i].domain,
util.JWK.key.public_key(),
host="127.0.0.1",
port=plugin.https_port)
if _try_until_true(verify):
logger.info(
"DVSNI verification for %s succeeded", achalls[i].domain)
else:
@ -149,10 +151,11 @@ def test_deploy_cert(plugin, temp_dir, domains):
if not _save_and_restart(plugin, "deployed"):
return False
verify_cert = validator.Validator().certificate
success = True
for domain in domains:
if not verify_cert(cert, domain, "127.0.0.1", plugin.https_port):
verify = functools.partial(validator.Validator().certificate, cert,
domain, "127.0.0.1", plugin.https_port)
if not _try_until_true(verify):
logger.error("Could not verify certificate for domain %s", domain)
success = False
@ -175,18 +178,18 @@ def test_enhancements(plugin, domains):
try:
plugin.enhance(domain, "redirect")
except le_errors.Error as error:
logger.error("Plugin failed to enable redirect for %s:", domain)
logger.exception(error)
return False
# Don't immediately fail because a redirect may already be enabled
logger.warning("Plugin failed to enable redirect for %s:", domain)
logger.warning("%s", error)
if not _save_and_restart(plugin, "enhanced"):
return False
verify_redirect = functools.partial(
validator.Validator().redirect, "localhost", plugin.http_port)
success = True
for domain in domains:
if not verify_redirect(headers={"Host" : domain}):
verify = functools.partial(validator.Validator().redirect, "localhost",
plugin.http_port, headers={"Host" : domain})
if not _try_until_true(verify):
logger.error("Improper redirect for domain %s", domain)
success = False
@ -196,6 +199,17 @@ def test_enhancements(plugin, domains):
return success
def _try_until_true(func, max_tries=3):
"""Calls func up to max_tries times until it returns True"""
for _ in xrange(0, max_tries):
if func():
return True
else:
time.sleep(1)
return False
def _save_and_restart(plugin, title=None):
"""Saves and restart the plugin, returning True if no errors occurred"""
try:

View file

@ -244,22 +244,15 @@ class NginxConfigurator(common.Plugin):
"""
all_names = set()
# Kept in same function to avoid multiple compilations of the regex
priv_ip_regex = (r"(^127\.0\.0\.1)|(^10\.)|(^172\.1[6-9]\.)|"
r"(^172\.2[0-9]\.)|(^172\.3[0-1]\.)|(^192\.168\.)")
private_ips = re.compile(priv_ip_regex)
hostname_regex = r"^(([a-z0-9]|[a-z0-9][a-z0-9\-]*[a-z0-9])\.)*[a-z]+$"
hostnames = re.compile(hostname_regex, re.IGNORECASE)
for vhost in self.parser.get_vhosts():
all_names.update(vhost.names)
for addr in vhost.addrs:
host = addr.get_addr()
if hostnames.match(host):
if common.hostname_regex.match(host):
# If it's a hostname, add it to the names.
all_names.add(host)
elif not private_ips.match(host):
elif not common.private_ips_regex.match(host):
# If it isn't a private IP, do a reverse DNS lookup
# TODO: IPv6 support
try:
@ -451,7 +444,7 @@ class NginxConfigurator(common.Plugin):
# nginx < 0.8.48 uses machine hostname as default server_name instead of
# the empty string
if nginx_version < (0, 8, 48):
raise errors.PluginError("Nginx version must be 0.8.48+")
raise errors.NotSupportedError("Nginx version must be 0.8.48+")
return nginx_version

View file

@ -37,7 +37,7 @@ class RawNginxParser(object):
+ Group(ZeroOrMore(Group(comment | assignment) | block))
+ right_bracket)
script = OneOrMore(Group(comment | assignment) | block) + stringEnd
script = OneOrMore(Group(comment | assignment) ^ block) + stringEnd
def __init__(self, source):
self.source = source

View file

@ -249,7 +249,7 @@ class NginxConfiguratorTest(util.NginxTest):
" (based on LLVM 3.5svn)",
"TLS SNI support enabled",
"configure arguments: --with-http_ssl_module"]))
self.assertRaises(errors.PluginError, self.config.get_version)
self.assertRaises(errors.NotSupportedError, self.config.get_version)
mock_popen.side_effect = OSError("Can't find program")
self.assertRaises(errors.PluginError, self.config.get_version)

View file

@ -5,7 +5,7 @@ import unittest
from pyparsing import ParseException
from letsencrypt_nginx.nginxparser import (
RawNginxParser, load, dumps, dump)
RawNginxParser, loads, load, dumps, dump)
from letsencrypt_nginx.tests import util
@ -160,6 +160,13 @@ class TestRawNginxParser(unittest.TestCase):
['#', ' listen 80;']]],
])
def test_issue_518(self):
parsed = loads('if ($http_accept ~* "webp") { set $webp "true"; }')
self.assertEqual(parsed, [
[['if', '($http_accept ~* "webp")'],
[['set', '$webp "true"']]]
])
if __name__ == '__main__':
unittest.main() # pragma: no cover

View file

@ -19,4 +19,5 @@ setup(
'nginx = letsencrypt_nginx.configurator:NginxConfigurator',
],
},
include_package_data=True,
)

View file

@ -172,7 +172,7 @@ def run(args, config, plugins):
authenticator = display_ops.pick_authenticator(
config, args.authenticator, plugins)
else:
# TODO: this assume that user doesn't want to pick authenticator
# TODO: this assumes that user doesn't want to pick authenticator
# and installer separately...
authenticator = installer = display_ops.pick_configurator(
config, args.configurator, plugins)

View file

@ -16,9 +16,9 @@ CLI_DEFAULTS = dict(
"letsencrypt", "cli.ini"),
],
verbose_count=-(logging.WARNING / 10),
server="https://www.letsencrypt-demo.org/acme/new-reg",
server="https://acme-staging.api.letsencrypt.org/acme/new-reg",
rsa_key_size=2048,
rollback_checkpoints=0,
rollback_checkpoints=1,
config_dir="/etc/letsencrypt",
work_dir="/var/lib/letsencrypt",
logs_dir="/var/log/letsencrypt",

View file

@ -82,6 +82,8 @@ def pick_plugin(config, default, plugins, question, ifaces):
elif len(prepared) == 1:
plugin_ep = prepared.values()[0]
logger.debug("Single candidate plugin: %s", plugin_ep)
if plugin_ep.misconfigured:
return None
return plugin_ep.init()
else:
logger.debug("No candidate plugin")
@ -90,7 +92,7 @@ def pick_plugin(config, default, plugins, question, ifaces):
def pick_authenticator(
config, default, plugins, question="How would you "
"like to authenticate with Let's Encrypt CA?"):
"like to authenticate with the Let's Encrypt CA?"):
"""Pick authentication plugin."""
return pick_plugin(
config, default, plugins, question, (interfaces.IAuthenticator,))

View file

@ -5,6 +5,10 @@ class Error(Exception):
"""Generic Let's Encrypt client error."""
class SubprocessError(Error):
"""Subprocess handling error."""
class AccountStorageError(Error):
"""Generic `.AccountStorage` error."""
@ -65,6 +69,9 @@ class NoInstallationError(PluginError):
class MisconfigurationError(PluginError):
"""Let's Encrypt Misconfiguration error."""
class NotSupportedError(PluginError):
"""Let's Encrypt Plugin function not supported error."""
class RevokerError(Error):
"""Let's Encrypt Revoker error."""

View file

@ -102,14 +102,19 @@ class IPlugin(zope.interface.Interface):
def prepare():
"""Prepare the plugin.
Finish up any additional initialization.
Finish up any additional initialization.
:raises .MisconfigurationError:
when full initialization cannot be completed. Plugin will
be displayed on a list of available plugins.
:raises .NoInstallationError:
when the necessary programs/files cannot be located. Plugin
will NOT be displayed on a list of available plugins.
:raises .PluginError:
when full initialization cannot be completed.
:raises .MisconfigurationError:
when full initialization cannot be completed. Plugin will
be displayed on a list of available plugins.
:raises .NoInstallationError:
when the necessary programs/files cannot be located. Plugin
will NOT be displayed on a list of available plugins.
:raises .NotSupportedError:
when the installation is recognized, but the version is not
currently supported.
"""
@ -166,6 +171,8 @@ class IAuthenticator(IPlugin):
:rtype: :class:`list` of
:class:`acme.challenges.ChallengeResponse`
:raises .PluginError: If challenges cannot be performed
"""
def cleanup(achalls):
@ -175,6 +182,8 @@ class IAuthenticator(IPlugin):
:class:`~letsencrypt.achallenges.AnnotatedChallenge`
instances, a subset of those previously passed to :func:`perform`.
:raises PluginError: if original configuration cannot be restored
"""
@ -248,6 +257,8 @@ class IInstaller(IPlugin):
:param str key_path: absolute path to the private key file
:param str chain_path: absolute path to the certificate chain file
:raises .PluginError: when cert cannot be deployed
"""
def enhance(domain, enhancement, options=None):
@ -261,6 +272,9 @@ class IInstaller(IPlugin):
:const:`~letsencrypt.constants.ENHANCEMENTS`
for expected options for each enhancement.
:raises .PluginError: If Enhancement is not supported, or if
an error occurs during the enhancement.
"""
def supported_enhancements():
@ -299,19 +313,37 @@ class IInstaller(IPlugin):
:param bool temporary: Indicates whether the changes made will
be quickly reversed in the future (challenges)
:raises .PluginError: when save is unsuccessful
"""
def rollback_checkpoints(rollback=1):
"""Revert `rollback` number of configuration checkpoints."""
"""Revert `rollback` number of configuration checkpoints.
:raises .PluginError: when configuration cannot be fully reverted
"""
def view_config_changes():
"""Display all of the LE config changes."""
"""Display all of the LE config changes.
:raises .PluginError: when config changes cannot be parsed
"""
def config_test():
"""Make sure the configuration is valid."""
"""Make sure the configuration is valid.
:raises .MisconfigurationError: when the config is not in a usable state
"""
def restart():
"""Restart or refresh the server content."""
"""Restart or refresh the server content.
:raises .PluginError: when server cannot be restarted
"""
class IDisplay(zope.interface.Interface):

View file

@ -4,6 +4,7 @@ import errno
import logging
import os
import re
import subprocess
import stat
from letsencrypt import errors
@ -17,6 +18,58 @@ Key = collections.namedtuple("Key", "file pem")
CSR = collections.namedtuple("CSR", "file data form")
def run_script(params):
"""Run the script with the given params.
:param list params: List of parameters to pass to Popen
"""
try:
proc = subprocess.Popen(params,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
except (OSError, ValueError):
msg = "Unable to run the command: %s" % " ".join(params)
logger.error(msg)
raise errors.SubprocessError(msg)
stdout, stderr = proc.communicate()
if proc.returncode != 0:
msg = "Error while running %s.\n%s\n%s" % (
" ".join(params), stdout, stderr)
# Enter recovery routine...
logger.error(msg)
raise errors.SubprocessError(msg)
return stdout, stderr
def exe_exists(exe):
"""Determine whether path/name refers to an executable.
:param str exe: Executable path or name
:returns: If exe is a valid executable
:rtype: bool
"""
def is_exe(path):
"""Determine if path is an exe."""
return os.path.isfile(path) and os.access(path, os.X_OK)
path, _ = os.path.split(exe)
if path:
return is_exe(exe)
else:
for path in os.environ["PATH"].split(os.pathsep):
if is_exe(os.path.join(path, exe)):
return True
return False
def make_or_verify_dir(directory, mode=0o755, uid=0):
"""Make sure directory exists with proper permissions.

View file

@ -1,6 +1,7 @@
"""Plugin common functions."""
import os
import pkg_resources
import re
import shutil
import tempfile
@ -22,6 +23,12 @@ def dest_namespace(name):
"""ArgumentParser dest namespace (prefix of all destinations)."""
return name + "_"
private_ips_regex = re.compile( # pylint: disable=invalid-name
r"(^127\.0\.0\.1)|(^10\.)|(^172\.1[6-9]\.)|"
r"(^172\.2[0-9]\.)|(^172\.3[0-1]\.)|(^192\.168\.)")
hostname_regex = re.compile( # pylint: disable=invalid-name
r"^(([a-z0-9]|[a-z0-9][a-z0-9\-]*[a-z0-9])\.)*[a-z]+$", re.IGNORECASE)
class Plugin(object):
"""Generic plugin."""

View file

@ -1,4 +1,5 @@
"""Reverter class saves configuration checkpoints and allows for recovery."""
import csv
import logging
import os
import shutil
@ -20,6 +21,8 @@ logger = logging.getLogger(__name__)
class Reverter(object):
"""Reverter Class - save and revert configuration checkpoints.
.. note:: Consider moving everything over to CSV format.
:param config: Configuration.
:type config: :class:`letsencrypt.interfaces.IConfig`
@ -27,6 +30,9 @@ class Reverter(object):
def __init__(self, config):
self.config = config
le_util.make_or_verify_dir(
config.backup_dir, constants.CONFIG_DIRS_MODE, os.geteuid())
def revert_temporary_config(self):
"""Reload users original configuration files after a temporary save.
@ -91,6 +97,8 @@ class Reverter(object):
.. todo:: Decide on a policy for error handling, OSError IOError...
:raises .errors.ReverterError: If invalid directory structure.
"""
backups = os.listdir(self.config.backup_dir)
backups.sort(reverse=True)
@ -98,6 +106,7 @@ class Reverter(object):
if not backups:
logger.info("The Let's Encrypt client has not saved any backups "
"of your configuration")
return
# Make sure there isn't anything unexpected in the backup folder
# There should only be timestamped (float) directories
@ -201,7 +210,7 @@ class Reverter(object):
notes_fd.write(save_notes)
def _read_and_append(self, filepath): # pylint: disable=no-self-use
"""Reads the file lines and returns a fd.
"""Reads the file lines and returns a file obj.
Read the file returning the lines, and a pointer to the end of the file.
@ -227,6 +236,10 @@ class Reverter(object):
:raises errors.ReverterError: If unable to recover checkpoint
"""
# Undo all commands
if os.path.isfile(os.path.join(cp_dir, "COMMANDS")):
self._run_undo_commands(os.path.join(cp_dir, "COMMANDS"))
# Revert all changed files
if os.path.isfile(os.path.join(cp_dir, "FILEPATHS")):
try:
with open(os.path.join(cp_dir, "FILEPATHS")) as paths_fd:
@ -251,6 +264,17 @@ class Reverter(object):
raise errors.ReverterError(
"Unable to remove directory: %s" % cp_dir)
def _run_undo_commands(self, filepath): # pylint: disable=no-self-use
"""Run all commands in a file."""
with open(filepath, 'rb') as csvfile:
csvreader = csv.reader(csvfile)
for command in reversed(list(csvreader)):
try:
le_util.run_script(command)
except errors.SubprocessError:
logger.error(
"Unable to run undo command: %s", " ".join(command))
def _check_tempfile_saves(self, save_files):
"""Verify save isn't overwriting any temporary files.
@ -303,13 +327,7 @@ class Reverter(object):
raise errors.ReverterError(
"Forgot to provide files to registration call")
if temporary:
cp_dir = self.config.temp_checkpoint_dir
else:
cp_dir = self.config.in_progress_dir
le_util.make_or_verify_dir(
cp_dir, constants.CONFIG_DIRS_MODE, os.geteuid())
cp_dir = self._get_cp_dir(temporary)
# Append all new files (that aren't already registered)
new_fd = None
@ -328,6 +346,53 @@ class Reverter(object):
if new_fd is not None:
new_fd.close()
def register_undo_command(self, temporary, command):
"""Register a command to be run to undo actions taken.
.. warning:: This function does not enforce order of operations in terms
of file modification vs. command registration. All undo commands
are run first before all normal files are reverted to their previous
state. If you need to maintain strict order, you may create
checkpoints before and after the the command registration. This
function may be improved in the future based on demand.
:param bool temporary: Whether the command should be saved in the
IN_PROGRESS or TEMPORARY checkpoints.
:param command: Command to be run.
:type command: list of str
"""
commands_fp = os.path.join(self._get_cp_dir(temporary), "COMMANDS")
command_file = None
try:
if os.path.isfile(commands_fp):
command_file = open(commands_fp, "ab")
else:
command_file = open(commands_fp, "wb")
csvwriter = csv.writer(command_file)
csvwriter.writerow(command)
except (IOError, OSError):
logger.error("Unable to register undo command")
raise errors.ReverterError(
"Unable to register undo command.")
finally:
if command_file is not None:
command_file.close()
def _get_cp_dir(self, temporary):
"""Return the proper reverter directory."""
if temporary:
cp_dir = self.config.temp_checkpoint_dir
else:
cp_dir = self.config.in_progress_dir
le_util.make_or_verify_dir(
cp_dir, constants.CONFIG_DIRS_MODE, os.geteuid())
return cp_dir
def recovery_routine(self):
"""Revert configuration to most recent finalized checkpoint.
@ -335,6 +400,8 @@ class Reverter(object):
finalized. This is useful to protect against crashes and other
execution interruptions.
:raises .errors.ReverterError: If unable to recover the configuration
"""
# First, any changes found in IConfig.temp_checkpoint_dir are removed,
# then IN_PROGRESS changes are removed The order is important.
@ -380,7 +447,7 @@ class Reverter(object):
os.remove(path)
else:
logger.warning(
"File: %s - Could not be found to be deleted%s"
"File: %s - Could not be found to be deleted %s - "
"LE probably shut down unexpectedly",
os.linesep, path)
except (IOError, OSError):

View file

@ -92,9 +92,19 @@ class PickPluginTest(unittest.TestCase):
def test_single(self):
plugin_ep = mock.MagicMock()
plugin_ep.init.return_value = "foo"
plugin_ep.misconfigured = False
self.reg.ifaces().verify().available.return_value = {"bar": plugin_ep}
self.assertEqual("foo", self._call())
def test_single_misconfigured(self):
plugin_ep = mock.MagicMock()
plugin_ep.init.return_value = "foo"
plugin_ep.misconfigured = True
self.reg.ifaces().verify().available.return_value = {"bar": plugin_ep}
self.assertTrue(self._call() is None)
def test_multiple(self):
plugin_ep = mock.MagicMock()
plugin_ep.init.return_value = "foo"

View file

@ -11,6 +11,67 @@ import mock
from letsencrypt import errors
class RunScriptTest(unittest.TestCase):
"""Tests for letsencrypt.le_util.run_script."""
@classmethod
def _call(cls, params):
from letsencrypt.le_util import run_script
return run_script(params)
@mock.patch("letsencrypt.le_util.subprocess.Popen")
def test_default(self, mock_popen):
"""These will be changed soon enough with reload."""
mock_popen().returncode = 0
mock_popen().communicate.return_value = ("stdout", "stderr")
out, err = self._call(["test"])
self.assertEqual(out, "stdout")
self.assertEqual(err, "stderr")
@mock.patch("letsencrypt.le_util.subprocess.Popen")
def test_bad_process(self, mock_popen):
mock_popen.side_effect = OSError
self.assertRaises(errors.SubprocessError, self._call, ["test"])
@mock.patch("letsencrypt.le_util.subprocess.Popen")
def test_failure(self, mock_popen):
mock_popen().communicate.return_value = ("", "")
mock_popen().returncode = 1
self.assertRaises(errors.SubprocessError, self._call, ["test"])
class ExeExistsTest(unittest.TestCase):
"""Tests for letsencrypt.le_util.exe_exists."""
@classmethod
def _call(cls, exe):
from letsencrypt.le_util import exe_exists
return exe_exists(exe)
@mock.patch("letsencrypt.le_util.os.path.isfile")
@mock.patch("letsencrypt.le_util.os.access")
def test_full_path(self, mock_access, mock_isfile):
mock_access.return_value = True
mock_isfile.return_value = True
self.assertTrue(self._call("/path/to/exe"))
@mock.patch("letsencrypt.le_util.os.path.isfile")
@mock.patch("letsencrypt.le_util.os.access")
def test_on_path(self, mock_access, mock_isfile):
mock_access.return_value = True
mock_isfile.return_value = True
self.assertTrue(self._call("exe"))
@mock.patch("letsencrypt.le_util.os.path.isfile")
@mock.patch("letsencrypt.le_util.os.access")
def test_not_found(self, mock_access, mock_isfile):
mock_access.return_value = False
mock_isfile.return_value = True
self.assertFalse(self._call("exe"))
class MakeOrVerifyDirTest(unittest.TestCase):
"""Tests for letsencrypt.le_util.make_or_verify_dir.

View file

@ -1,4 +1,6 @@
"""Test letsencrypt.reverter."""
import csv
import itertools
import logging
import os
import shutil
@ -11,7 +13,7 @@ from letsencrypt import errors
class ReverterCheckpointLocalTest(unittest.TestCase):
# pylint: disable=too-many-instance-attributes
# pylint: disable=too-many-instance-attributes, too-many-public-methods
"""Test the Reverter Class."""
def setUp(self):
from letsencrypt.reverter import Reverter
@ -126,6 +128,42 @@ class ReverterCheckpointLocalTest(unittest.TestCase):
errors.ReverterError, self.reverter.register_file_creation,
"filepath")
def test_register_undo_command(self):
coms = [
["a2dismod", "ssl"],
["a2dismod", "rewrite"],
["cleanslate"]
]
for com in coms:
self.reverter.register_undo_command(True, com)
act_coms = get_undo_commands(self.config.temp_checkpoint_dir)
for a_com, com in itertools.izip(act_coms, coms):
self.assertEqual(a_com, com)
def test_bad_register_undo_command(self):
m_open = mock.mock_open()
with mock.patch("letsencrypt.reverter.open", m_open, create=True):
m_open.side_effect = OSError("bad open")
self.assertRaises(
errors.ReverterError, self.reverter.register_undo_command,
True, ["command"])
@mock.patch("letsencrypt.le_util.run_script")
def test_run_undo_commands(self, mock_run):
mock_run.side_effect = ["", errors.SubprocessError]
coms = [
["invalid_command"],
["a2dismod", "ssl"],
]
for com in coms:
self.reverter.register_undo_command(True, com)
self.reverter.revert_temporary_config()
self.assertEqual(mock_run.call_count, 2)
def test_recovery_routine_in_progress_failure(self):
self.reverter.add_to_checkpoint(self.sets[0], "perm save")
@ -377,7 +415,6 @@ def setup_work_direc():
"""
work_dir = tempfile.mkdtemp("work")
backup_dir = os.path.join(work_dir, "backup")
os.makedirs(backup_dir)
return mock.MagicMock(
work_dir=work_dir, backup_dir=backup_dir,
@ -391,9 +428,9 @@ def setup_test_files():
dir2 = tempfile.mkdtemp("dir2")
config1 = os.path.join(dir1, "config.txt")
config2 = os.path.join(dir2, "config.txt")
with open(config1, 'w') as file_fd:
with open(config1, "w") as file_fd:
file_fd.write("directive-dir1")
with open(config2, 'w') as file_fd:
with open(config2, "w") as file_fd:
file_fd.write("directive-dir2")
sets = [set([config1]),
@ -405,30 +442,36 @@ def setup_test_files():
def get_save_notes(dire):
"""Read save notes"""
return read_in(os.path.join(dire, 'CHANGES_SINCE'))
return read_in(os.path.join(dire, "CHANGES_SINCE"))
def get_filepaths(dire):
"""Get Filepaths"""
return read_in(os.path.join(dire, 'FILEPATHS'))
return read_in(os.path.join(dire, "FILEPATHS"))
def get_new_files(dire):
"""Get new files."""
return read_in(os.path.join(dire, 'NEW_FILES')).splitlines()
return read_in(os.path.join(dire, "NEW_FILES")).splitlines()
def get_undo_commands(dire):
"""Get new files."""
with open(os.path.join(dire, "COMMANDS")) as csvfile:
return list(csv.reader(csvfile))
def read_in(path):
"""Read in a file, return the str"""
with open(path, 'r') as file_fd:
with open(path, "r") as file_fd:
return file_fd.read()
def update_file(filename, string):
"""Update a file with a new value."""
with open(filename, 'w') as file_fd:
with open(filename, "w") as file_fd:
file_fd.write(string)
if __name__ == '__main__':
if __name__ == "__main__":
unittest.main() # pragma: no cover

View file

@ -10,11 +10,5 @@ export GOPATH="${GOPATH:-/tmp/go}"
go get -d github.com/letsencrypt/boulder/cmd/boulder
cd $GOPATH/src/github.com/letsencrypt/boulder
make -j4 # Travis has 2 cores per build instance.
if [ "$1" = "amqp" ];
then
./start.py &
else
./start.sh &
fi
# Hopefully start.py/start.sh bootstraps before integration test is started...
./start.py &
# Hopefully start.py bootstraps before integration test is started...

View file

@ -2,7 +2,9 @@
if [ "xxx$root" = "xxx" ];
then
root="$(mktemp -d)"
# The -t is required on OS X. It provides a template file path for
# the kernel to use.
root="$(mktemp -d -t leitXXXX)"
echo "Root integration tests directory: $root"
fi
store_flags="--config-dir $root/conf --work-dir $root/work"

View file

@ -22,5 +22,5 @@ rm -f .coverage # --cover-erase is off, make sure stats are correct
# after_success)
cover letsencrypt 97 && \
cover acme 100 && \
cover letsencrypt_apache 78 && \
cover letsencrypt_apache 100 && \
cover letsencrypt_nginx 96