Merge pull request #135 from letsencrypt/configurator_tests

Configurator and Client Refactor
This commit is contained in:
James Kasten 2014-12-21 16:33:17 -08:00
commit 4de3b6a340
18 changed files with 1603 additions and 1367 deletions

View file

@ -0,0 +1 @@
"""Let's Encrypt client.apache."""

View file

@ -0,0 +1,91 @@
"""Module contains classes used by the Apache Configurator."""
class Addr(object):
"""Represents an Apache VirtualHost address.
:param str addr: addr part of vhost address
:param str port: port number or *, or ""
"""
def __init__(self, tup):
self.tup = tup
@classmethod
def fromstring(cls, str_addr):
"""Initialize Addr from string."""
tup = str_addr.partition(':')
return cls((tup[0], tup[2]))
def __str__(self):
if self.tup[1]:
return "%s:%s" % self.tup
return self.tup[0]
def __eq__(self, other):
if isinstance(other, self.__class__):
return self.tup == other.tup
return False
def __hash__(self):
return hash(self.tup)
def get_addr(self):
"""Return addr part of Addr object."""
return self.tup[0]
def get_port(self):
"""Return port."""
return self.tup[1]
def get_addr_obj(self, port):
"""Return new address object with same addr and new port."""
return self.__class__((self.tup[0], port))
# pylint: disable=too-few-public-methods
class VirtualHost(object):
"""Represents an Apache Virtualhost.
:ivar str filep: file path of VH
:ivar str path: Augeas path to virtual host
:ivar set addrs: Virtual Host addresses (:class:`set` of :class:`Addr`)
:ivar set names: Server names/aliases of vhost
(:class:`list` of :class:`str`)
:ivar bool ssl: SSLEngine on in vhost
:ivar bool enabled: Virtual host is enabled
"""
def __init__(self, filep, path, addrs, ssl, enabled, names=None):
"""Initialize a VH."""
self.filep = filep
self.path = path
self.addrs = addrs
self.names = set() if names is None else set(names)
self.ssl = ssl
self.enabled = enabled
def add_name(self, name):
"""Add name to vhost."""
self.names.add(name)
def __str__(self):
addr_str = ", ".join(str(addr) for addr in self.addrs)
return ("file: %s\n"
"vh_path: %s\n"
"addrs: %s\n"
"names: %s\n"
"ssl: %s\n"
"enabled: %s" % (self.filep, self.path, addr_str,
self.names, self.ssl, self.enabled))
def __eq__(self, other):
if isinstance(other, self.__class__):
return (self.filep == other.filep and self.path == other.path and
self.addrs == other.addrs and
self.names == other.names and
self.ssl == other.ssl and self.enabled == other.enabled)
return False

View file

@ -0,0 +1,401 @@
"""ApacheParser is a member object of the ApacheConfigurator class."""
import os
import re
from letsencrypt.client import errors
class ApacheParser(object):
"""Class handles the fine details of parsing the Apache Configuration."""
def __init__(self, aug, root, ssl_options):
# Find configuration root and make sure augeas can parse it.
self.aug = aug
self.root = root
self.loc = self._set_locations(ssl_options)
self._parse_file(self.loc["root"])
# Must also attempt to parse sites-available or equivalent
# Sites-available is not included naturally in configuration
self._parse_file(os.path.join(self.root, "sites-available/*"))
# This problem has been fixed in Augeas 1.0
self.standardize_excl()
def add_dir_to_ifmodssl(self, aug_conf_path, directive, val):
"""Adds directive and value to IfMod ssl block.
Adds given directive and value along configuration path within
an IfMod mod_ssl.c block. If the IfMod block does not exist in
the file, it is created.
:param str aug_conf_path: Desired Augeas config path to add directive
:param str directive: Directive you would like to add
:param str val: Value of directive ie. Listen 443, 443 is the value
"""
# TODO: Add error checking code... does the path given even exist?
# Does it throw exceptions?
if_mod_path = self._get_ifmod(aug_conf_path, "mod_ssl.c")
# IfModule can have only one valid argument, so append after
self.aug.insert(if_mod_path + "arg", "directive", False)
nvh_path = if_mod_path + "directive[1]"
self.aug.set(nvh_path, directive)
self.aug.set(nvh_path + "/arg", val)
def _get_ifmod(self, aug_conf_path, mod):
"""Returns the path to <IfMod mod> and creates one if it doesn't exist.
:param str aug_conf_path: Augeas configuration path
:param str mod: module ie. mod_ssl.c
"""
if_mods = self.aug.match(("%s/IfModule/*[self::arg='%s']" %
(aug_conf_path, mod)))
if len(if_mods) == 0:
self.aug.set("%s/IfModule[last() + 1]" % aug_conf_path, "")
self.aug.set("%s/IfModule[last()]/arg" % aug_conf_path, mod)
if_mods = self.aug.match(("%s/IfModule/*[self::arg='%s']" %
(aug_conf_path, mod)))
# Strip off "arg" at end of first ifmod path
return if_mods[0][:len(if_mods[0]) - 3]
def add_dir(self, aug_conf_path, directive, arg):
"""Appends directive to the end fo the file given by aug_conf_path.
.. note:: Not added to AugeasConfigurator because it may depend
on the lens
:param str aug_conf_path: Augeas configuration path to add directive
:param str directive: Directive to add
:param str arg: Value of the directive. ie. Listen 443, 443 is arg
"""
self.aug.set(aug_conf_path + "/directive[last() + 1]", directive)
if type(arg) is not list:
self.aug.set(aug_conf_path + "/directive[last()]/arg", arg)
else:
for i in range(len(arg)):
self.aug.set("%s/directive[last()]/arg[%d]" %
(aug_conf_path, (i+1)),
arg[i])
def find_dir(self, directive, arg=None, start=None):
"""Finds directive in the configuration.
Recursively searches through config files to find directives
Directives should be in the form of a case insensitive regex currently
.. todo:: Add order to directives returned. Last directive comes last..
.. todo:: arg should probably be a list
Note: Augeas is inherently case sensitive while Apache is case
insensitive. Augeas 1.0 allows case insensitive regexes like
regexp(/Listen/, 'i'), however the version currently supported
by Ubuntu 0.10 does not. Thus I have included my own case insensitive
transformation by calling case_i() on everything to maintain
compatibility.
:param str directive: Directive to look for
:param arg: Specific value direcitve must have, None if all should
be considered
:type arg: str or None
:param str start: Beginning Augeas path to begin looking
"""
# Cannot place member variable in the definition of the function so...
if not start:
start = get_aug_path(self.loc["root"])
# Debug code
# print "find_dir:", directive, "arg:", arg, " | Looking in:", start
# No regexp code
# if arg is None:
# matches = self.aug.match(start +
# "//*[self::directive='"+directive+"']/arg")
# else:
# matches = self.aug.match(start +
# "//*[self::directive='" + directive+"']/* [self::arg='" + arg + "']")
# includes = self.aug.match(start +
# "//* [self::directive='Include']/* [label()='arg']")
if arg is None:
matches = self.aug.match(("%s//*[self::directive=~regexp('%s')]/arg"
% (start, directive)))
else:
matches = self.aug.match(("%s//*[self::directive=~regexp('%s')]/*"
"[self::arg=~regexp('%s')]" %
(start, directive, arg)))
incl_regex = "(%s)|(%s)" % (case_i('Include'),
case_i('IncludeOptional'))
includes = self.aug.match(("%s//* [self::directive=~regexp('%s')]/* "
"[label()='arg']" % (start, incl_regex)))
# for inc in includes:
# print inc, self.aug.get(inc)
for include in includes:
# start[6:] to strip off /files
matches.extend(self.find_dir(
directive, arg, self._get_include_path(
strip_dir(start[6:]), self.aug.get(include))))
return matches
def _get_include_path(self, cur_dir, arg):
"""Converts an Apache Include directive into Augeas path.
Converts an Apache Include directive argument into an Augeas
searchable path
.. todo:: convert to use os.path.join()
:param str cur_dir: current working directory
:param str arg: Argument of Include directive
:returns: Augeas path string
:rtype: str
"""
# Sanity check argument - maybe
# Question: what can the attacker do with control over this string
# Effect parse file... maybe exploit unknown errors in Augeas
# If the attacker can Include anything though... and this function
# only operates on Apache real config data... then the attacker has
# already won.
# Perhaps it is better to simply check the permissions on all
# included files?
# check_config to validate apache config doesn't work because it
# would create a race condition between the check and this input
# TODO: Maybe... although I am convinced we have lost if
# Apache files can't be trusted. The augeas include path
# should be made to be exact.
# Check to make sure only expected characters are used <- maybe remove
# validChars = re.compile("[a-zA-Z0-9.*?_-/]*")
# matchObj = validChars.match(arg)
# if matchObj.group() != arg:
# logging.error("Error: Invalid regexp characters in %s", arg)
# return []
# Standardize the include argument based on server root
if not arg.startswith("/"):
arg = cur_dir + arg
# conf/ is a special variable for ServerRoot in Apache
elif arg.startswith("conf/"):
arg = self.root + arg[5:]
# TODO: Test if Apache allows ../ or ~/ for Includes
# Attempts to add a transform to the file if one does not already exist
self._parse_file(arg)
# Argument represents an fnmatch regular expression, convert it
# Split up the path and convert each into an Augeas accepted regex
# then reassemble
if "*" in arg or "?" in arg:
split_arg = arg.split("/")
for idx, split in enumerate(split_arg):
# * and ? are the two special fnmatch characters
if "*" in split or "?" in split:
# Turn it into a augeas regex
# TODO: Can this instead be an augeas glob instead of regex
split_arg[idx] = ("* [label()=~regexp('%s')]" %
self.fnmatch_to_re(split))
# Reassemble the argument
arg = "/".join(split_arg)
# If the include is a directory, just return the directory as a file
if arg.endswith("/"):
return get_aug_path(arg[:len(arg)-1])
return get_aug_path(arg)
def fnmatch_to_re(self, clean_fn_match): # pylint: disable=no-self-use
"""Method converts Apache's basic fnmatch to regular expression.
:param str clean_fn_match: Apache style filename match, similar to globs
:returns: regex suitable for augeas
:rtype: str
"""
regex = ""
for letter in clean_fn_match:
if letter == '.':
regex = regex + r"\."
elif letter == '*':
regex = regex + ".*"
# According to apache.org ? shouldn't appear
# but in case it is valid...
elif letter == '?':
regex = regex + "."
else:
regex = regex + letter
return regex
def _parse_file(self, file_path):
"""Parse file with Augeas
Checks to see if file_path is parsed by Augeas
If file_path isn't parsed, the file is added and Augeas is reloaded
:param str file_path: Apache config file path
"""
# Test if augeas included file for Httpd.lens
# Note: This works for augeas globs, ie. *.conf
inc_test = self.aug.match(
"/augeas/load/Httpd/incl [. ='%s']" % file_path)
if not inc_test:
# Load up files
# self.httpd_incl.append(file_path)
# self.aug.add_transform("Httpd.lns",
# self.httpd_incl, None, self.httpd_excl)
self._add_httpd_transform(file_path)
self.aug.load()
def standardize_excl(self):
"""Standardize the excl arguments for the Httpd lens in Augeas.
Note: Hack!
Standardize the excl arguments for the Httpd lens in Augeas
Servers sometimes give incorrect defaults
Note: This problem should be fixed in Augeas 1.0. Unfortunately,
Augeas 0.10 appears to be the most popular version currently.
"""
# attempt to protect against augeas error in 0.10.0 - ubuntu
# *.augsave -> /*.augsave upon augeas.load()
# Try to avoid bad httpd files
# There has to be a better way... but after a day and a half of testing
# I had no luck
# This is a hack... work around... submit to augeas if still not fixed
excl = ["*.augnew", "*.augsave", "*.dpkg-dist", "*.dpkg-bak",
"*.dpkg-new", "*.dpkg-old", "*.rpmsave", "*.rpmnew",
"*~",
self.root + "*.augsave",
self.root + "*~",
self.root + "*/*augsave",
self.root + "*/*~",
self.root + "*/*/*.augsave",
self.root + "*/*/*~"]
for i in range(len(excl)):
self.aug.set("/augeas/load/Httpd/excl[%d]" % (i+1), excl[i])
self.aug.load()
def _add_httpd_transform(self, incl):
"""Add a transform to Augeas.
This function will correctly add a transform to augeas
The existing augeas.add_transform in python is broken.
:param str incl: TODO
"""
last_include = self.aug.match("/augeas/load/Httpd/incl [last()]")
self.aug.insert(last_include[0], "incl", False)
self.aug.set("/augeas/load/Httpd/incl[last()]", incl)
def _set_locations(self, ssl_options):
"""Set default location for directives.
Locations are given as file_paths
.. todo:: Make sure that files are included
"""
root = self._find_config_root()
default = self._set_user_config_file(root)
temp = os.path.join(self.root, "ports.conf")
if os.path.isfile(temp):
listen = temp
name = temp
else:
listen = default
name = default
return {"root": root, "default": default, "listen": listen,
"name": name, "ssl_options": ssl_options}
def _find_config_root(self):
"""Find the Apache Configuration Root file."""
location = ["apache2.conf", "httpd.conf"]
for name in location:
if os.path.isfile(os.path.join(self.root, name)):
return os.path.join(self.root, name)
raise errors.LetsEncryptConfiguratorError(
"Could not find configuration root")
def _set_user_config_file(self, root):
"""Set the appropriate user configuration file
.. todo:: This will have to be updated for other distros versions
:param str filename: optional filename that will be used as the
user config
"""
# Basic check to see if httpd.conf exists and
# in heirarchy via direct include
# httpd.conf was very common as a user file in Apache 2.2
if (os.path.isfile(os.path.join(self.root, 'httpd.conf')) and
self.find_dir(
case_i("Include"), case_i("httpd.conf"), root)):
return os.path.join(self.root, 'httpd.conf')
else:
return os.path.join(self.root + 'apache2.conf')
def case_i(string):
"""Returns case insensitive regex.
Returns a sloppy, but necessary version of a case insensitive regex.
Any string should be able to be submitted and the string is
escaped and then made case insensitive.
May be replaced by a more proper /i once augeas 1.0 is widely
supported.
:param str string: string to make case i regex
"""
return "".join(["["+c.upper()+c.lower()+"]"
if c.isalpha() else c for c in re.escape(string)])
def get_aug_path(file_path):
"""Return augeas path for full filepath.
:param str file_path: Full filepath
"""
return "/files%s" % file_path
def strip_dir(path):
"""Returns directory of file path.
.. todo:: Replace this with Python standard function
:param str path: path is a file path. not an augeas section or
directive path
:returns: directory
:rtype: str
"""
index = path.rfind("/")
if index > 0:
return path[:index+1]
# No directory
return ""

View file

@ -8,15 +8,12 @@ import time
import augeas
from letsencrypt.client import CONFIG
from letsencrypt.client import configurator
from letsencrypt.client import le_util
class AugeasConfigurator(configurator.Configurator):
class AugeasConfigurator(object):
"""Base Augeas Configurator class.
.. todo:: Fix generic exception handling.
:ivar aug: Augeas object
:type aug: :class:`augeas.Augeas`
@ -32,7 +29,6 @@ class AugeasConfigurator(configurator.Configurator):
(used mostly for testing)
"""
super(AugeasConfigurator, self).__init__()
if not direc:
direc = {"backup": CONFIG.BACKUP_DIR,
@ -291,8 +287,7 @@ class AugeasConfigurator(configurator.Configurator):
for idx, path in enumerate(filepaths):
shutil.copy2(os.path.join(
cp_dir,
os.path.basename(path) + '_' + str(idx)),
path)
os.path.basename(path) + '_' + str(idx)), path)
except (IOError, OSError):
# This file is required in all checkpoints.
logging.error("Unable to recover files from %s", cp_dir)
@ -329,7 +324,7 @@ class AugeasConfigurator(configurator.Configurator):
return True, ""
# pylint: disable=no-self-use
# pylint: disable=no-self-use, anomalous-backslash-in-string
def register_file_creation(self, temporary, *files):
"""Register the creation of all files during letsencrypt execution.

View file

@ -1,27 +1,23 @@
"""ACME protocol client class and helper functions."""
import collections
import csv
import json
import logging
import os
import shutil
import socket
import string
import sys
import time
import jsonschema
import M2Crypto
import requests
from letsencrypt.client import acme
from letsencrypt.client import apache_configurator
from letsencrypt.client import challenge
from letsencrypt.client import CONFIG
from letsencrypt.client import crypto_util
from letsencrypt.client import display
from letsencrypt.client import errors
from letsencrypt.client import le_util
from letsencrypt.client import network
# it's weird to point to chocolate servers via raw IPv6 addresses, and
@ -33,112 +29,71 @@ ALLOW_RAW_IPV6_SERVER = False
class Client(object):
"""ACME protocol client.
:ivar config: Configurator.
:type config: :class:`letsencrypt.client.configurator.Configurator`
:ivar str server: Certificate authority server
:ivar str server_url: Full URL of the CSR server
:ivar csr: Certificate Signing Request
:type csr: :class:`CSR`
:ivar network: Network object for sending and receiving messages
:type network: :class:`letsencrypt.client.network.Network`
:ivar list names: Domain names (:class:`list` of :class:`str`).
:ivar privkey: Private key
:type privkey: :class:`Key`
:ivar authkey: Authorization Key
:type authkey: :class:`letsencrypt.client.client.Client.Key`
:ivar bool use_curses: Use curses UI
:ivar auth: Object that supports the IAuthenticator interface.
:type auth: :class:`letsencrypt.client.interfaces.IAuthenticator`
:ivar installer: Object supporting the IInstaller interface.
:type installer: :class:`letsencrypt.client.interfaces.IInstraller`
"""
Key = collections.namedtuple("Key", "file pem")
CSR = collections.namedtuple("CSR", "file data type")
CSR = collections.namedtuple("CSR", "file data form")
def __init__(self, server, csr=CSR(None, None, None),
privkey=Key(None, None), use_curses=True):
def __init__(self, server, names, authkey, auth, installer):
"""Initialize a client."""
self.server = server
self.server_url = "https://%s/acme/" % self.server
self.names = []
self.use_curses = use_curses
self.network = network.Network(server)
self.names = names
self.authkey = authkey
self.csr = csr
self.privkey = privkey
self._validate_csr_key_cli() # TODO: catch exceptions
sanity_check_names([server] + names)
# TODO: Can probably figure out which configurator to use
# without special packaging based on system info Command
# line arg or client function to discover
self.config = apache_configurator.ApacheConfigurator(
CONFIG.SERVER_ROOT)
self.auth = auth
self.installer = installer
def authenticate(self, domains=None, eula=False, redirect=None):
"""
def obtain_certificate(self, csr,
cert_path=CONFIG.CERT_PATH,
chain_path=CONFIG.CHAIN_PATH):
"""Obtains a certificate from the ACME server.
:param list domains: List of domains
:param bool eula: EULA accepted
:param csr: A valid CSR in DER format for the certificate the client
intends to receive.
:type csr: :class:`CSR`
:param redirect: If traffic should be forwarded from HTTP to HTTPS.
:type redirect: bool or None
:param str cert_path: Full desired path to end certificate.
:param str chain_path: Full desired path to end chain file.
:raises errors.LetsEncryptClientError: CSR does not contain one of the
specified names.
:returns: cert_file, chain_file (paths to respective files)
:rtype: `tuple` of `str`
"""
domains = [] if domains is None else domains
# Check configuration
if not self.config.config_test():
sys.exit(1)
# Display preview warning
if not eula:
with open('EULA') as eula_file:
if not display.generic_yesno(eula_file.read(),
"Agree", "Cancel"):
sys.exit(0)
# Display screen to select domains to validate
if domains:
sanity_check_names([self.server] + domains)
self.names = domains
else:
# This function adds all names
# found within the config to self.names
# Then filters them based on user selection
code, self.names = display.filter_names(self.get_all_names())
if code == display.OK and self.names:
# TODO: Allow multiple names once it is setup
self.names = [self.names[0]]
else:
sys.exit(0)
# Request Challenges
challenge_msg = self.acme_challenge()
# Make sure we have key and csr to perform challenges
self.init_key_csr()
# Perform Challenges
responses, challenge_objs = self.verify_identity(challenge_msg)
# Get Authorization
self.acme_authorization(challenge_msg, challenge_objs, responses)
# Retrieve certificate
certificate_dict = self.acme_certificate(self.csr.data)
certificate_dict = self.acme_certificate(csr.data)
# Find set of virtual hosts to deploy certificates to
vhost = self.get_virtual_hosts(self.names)
# Install Certificate
cert_file = self.install_certificate(certificate_dict, vhost)
# Perform optimal config changes
self.optimize_config(vhost, redirect)
self.config.save("Completed Let's Encrypt Authentication")
# Save Certificate
cert_file, chain_file = self.save_certificate(
certificate_dict, cert_path, chain_path)
self.store_cert_key(cert_file, False)
return cert_file, chain_file
def acme_challenge(self):
"""Handle ACME "challenge" phase.
@ -148,7 +103,7 @@ class Client(object):
:rtype: dict
"""
return self.send_and_receive_expected(
return self.network.send_and_receive_expected(
acme.challenge_request(self.names[0]), "challenge")
def acme_authorization(self, challenge_msg, chal_objs, responses):
@ -156,20 +111,21 @@ class Client(object):
:param dict challenge_msg: ACME "challenge" message.
:param chal_objs: TODO
:param chal_objs: TODO - this will be a new object...
:param responses: TODO
:returns: ACME "authorization" message.
:rtype: dict
"""
auth_dict = self.send(acme.authorization_request(
challenge_msg["sessionID"], self.names[0],
challenge_msg["nonce"], responses, self.privkey.pem))
try:
return self.is_expected_msg(auth_dict, "authorization")
except:
return self.network.send_and_receive_expected(
acme.authorization_request(
challenge_msg["sessionID"], self.names[0],
challenge_msg["nonce"], responses, self.authkey.pem),
"authorization")
except errors.LetsEncryptClientError as err:
logging.fatal(str(err))
logging.fatal(
"Failed Authorization procedure - cleaning up challenges")
sys.exit(1)
@ -186,207 +142,25 @@ class Client(object):
"""
logging.info("Preparing and sending CSR...")
return self.send_and_receive_expected(
acme.certificate_request(csr_der, self.privkey.pem), "certificate")
return self.network.send_and_receive_expected(
acme.certificate_request(csr_der, self.authkey.pem), "certificate")
def acme_revocation(self, cert):
"""Handle ACME "revocation" phase.
# pylint: disable=no-self-use
def save_certificate(self, certificate_dict, cert_path, chain_path):
"""Saves the certificate received from the ACME server.
:param dict cert: TODO
:param dict certificate_dict: certificate message from server
:param str cert_path: Path to attempt to save the cert file
:param str chain_path: Path to attempt to save the chain file
:returns: ACME "revocation" message.
:rtype: dict
:returns: cert_file, chain_file (absolute paths to the actual files)
:rtype: `tuple` of `str`
"""
cert_der = M2Crypto.X509.load_cert(cert["backup_cert_file"]).as_der()
with open(cert["backup_key_file"], 'rU') as backup_key_file:
key = backup_key_file.read()
revocation = self.send_and_receive_expected(
acme.revocation_request(cert_der, key), "revocation")
display.generic_notification(
"You have successfully revoked the certificate for "
"%s" % cert["cn"], width=70, height=9)
remove_cert_key(cert)
self.list_certs_keys()
return revocation
def send(self, msg):
"""Send ACME message to server.
:param dict msg: ACME message (JSON serializable).
:returns: Server response message.
:rtype: dict
:raises TypeError: if `msg` is not JSON serializable
:raises jsonschema.ValidationError: if not valid ACME message
:raises errors.LetsEncryptClientError: in case of connection error
or if response from server is not a valid ACME message.
"""
json_encoded = json.dumps(msg)
acme.acme_object_validate(json_encoded)
try:
response = requests.post(
self.server_url,
data=json_encoded,
headers={"Content-Type": "application/json"},
)
except requests.exceptions.RequestException as error:
raise errors.LetsEncryptClientError(
'Sending ACME message to server has failed: %s' % error)
try:
acme.acme_object_validate(response.content)
except ValueError:
raise errors.LetsEncryptClientError(
'Server did not send JSON serializable message')
except jsonschema.ValidationError as error:
raise errors.LetsEncryptClientError(
'Response from server is not a valid ACME message')
return response.json()
def send_and_receive_expected(self, msg, expected):
"""Send ACME message to server and return expected message.
:param dict msg: ACME message (JSON serializable).
:param str expected: Name of the expected response ACME message type.
:returns: ACME response message of expected type.
:rtype: dict
:raises errors.LetsEncryptClientError: An exception is thrown
"""
response = self.send(msg)
try:
return self.is_expected_msg(response, expected)
except: # TODO: too generic exception
raise errors.LetsEncryptClientError(
'Expected message (%s) not received' % expected)
def is_expected_msg(self, response, expected, delay=3, rounds=20):
"""Is reponse expected ACME message?
:param dict response: ACME response message from server.
:param str expected: Name of the expected response ACME message type.
:param int delay: Number of seconds to delay before next round
in case of ACME "defer" response message.
:param int rounds: Number of resend attempts in case of ACME "defer"
reponse message.
:returns: ACME response message from server.
:rtype: dict
:raises LetsEncryptClientError: if server sent ACME "error" message
"""
for _ in xrange(rounds):
if response["type"] == expected:
return response
elif response["type"] == "error":
logging.error(
"%s: %s - More Info: %s", response["error"],
response.get("message", ""), response.get("moreInfo", ""))
raise errors.LetsEncryptClientError(response["error"])
elif response["type"] == "defer":
logging.info("Waiting for %d seconds...", delay)
time.sleep(delay)
response = self.send(acme.status_request(response["token"]))
else:
logging.fatal("Received unexpected message")
logging.fatal("Expected: %s" % expected)
logging.fatal("Received: " + response)
sys.exit(33)
logging.error(
"Server has deferred past the max of %d seconds", rounds * delay)
def list_certs_keys(self):
"""List trusted Let's Encrypt certificates."""
list_file = os.path.join(CONFIG.CERT_KEY_BACKUP, "LIST")
certs = []
if not os.path.isfile(list_file):
logging.info(
"You don't have any certificates saved from letsencrypt")
return
c_sha1_vh = {}
for (cert, _, path) in self.config.get_all_certs_keys():
try:
c_sha1_vh[M2Crypto.X509.load_cert(
cert).get_fingerprint(md='sha1')] = path
except:
continue
with open(list_file, 'rb') as csvfile:
csvreader = csv.reader(csvfile)
for row in csvreader:
cert = crypto_util.get_cert_info(row[1])
b_k = os.path.join(CONFIG.CERT_KEY_BACKUP,
os.path.basename(row[2]) + "_" + row[0])
b_c = os.path.join(CONFIG.CERT_KEY_BACKUP,
os.path.basename(row[1]) + "_" + row[0])
cert.update({
"orig_key_file": row[2],
"orig_cert_file": row[1],
"idx": int(row[0]),
"backup_key_file": b_k,
"backup_cert_file": b_c,
"installed": c_sha1_vh.get(cert["fingerprint"], ""),
})
certs.append(cert)
if certs:
self.choose_certs(certs)
else:
display.generic_notification(
"There are not any trusted Let's Encrypt "
"certificates for this server.")
def choose_certs(self, certs):
"""Display choose certificates menu.
:param list certs: List of cert dicts.
"""
code, tag = display.display_certs(certs)
if code == display.OK:
cert = certs[tag]
if display.confirm_revocation(cert):
self.acme_revocation(cert)
else:
self.choose_certs(certs)
elif code == display.HELP:
cert = certs[tag]
display.more_info_cert(cert)
self.choose_certs(certs)
else:
exit(0)
def install_certificate(self, certificate_dict, vhost):
"""Install certificate
:returns: Path to a certificate file.
:rtype: str
:raises IOError: If unable to find room to write the cert files
"""
cert_chain_abspath = None
cert_fd, cert_file = le_util.unique_file(CONFIG.CERT_PATH, 0o644)
cert_fd, cert_file = le_util.unique_file(cert_path, 0o644)
cert_fd.write(
crypto_util.b64_cert_to_pem(certificate_dict["certificate"]))
cert_fd.close()
@ -394,7 +168,7 @@ class Client(object):
"Server issued certificate; certificate written to %s", cert_file)
if certificate_dict.get("chain", None):
chain_fd, chain_fn = le_util.unique_file(CONFIG.CHAIN_PATH, 0o644)
chain_fd, chain_fn = le_util.unique_file(chain_path, 0o644)
for cert in certificate_dict.get("chain", []):
chain_fd.write(crypto_util.b64_cert_to_pem(cert))
chain_fd.close()
@ -404,40 +178,56 @@ class Client(object):
# This expects a valid chain file
cert_chain_abspath = os.path.abspath(chain_fn)
return os.path.abspath(cert_file), cert_chain_abspath
def deploy_certificate(self, privkey, cert_file, chain_file):
"""Install certificate
:returns: Path to a certificate file.
:rtype: str
"""
# Find set of virtual hosts to deploy certificates to
vhost = self.get_virtual_hosts(self.names)
chain = None if chain_file is None else os.path.abspath(chain_file)
for host in vhost:
self.config.deploy_cert(host,
os.path.abspath(cert_file),
os.path.abspath(self.privkey.file),
cert_chain_abspath)
self.installer.deploy_cert(host,
os.path.abspath(cert_file),
os.path.abspath(privkey.file),
chain)
# Enable any vhost that was issued to, but not enabled
if not host.enabled:
logging.info("Enabling Site %s", host.filep)
self.config.enable_site(host)
self.installer.enable_site(host)
self.installer.save("Deployed Let's Encrypt Certificate")
# sites may have been enabled / final cleanup
self.config.restart(quiet=self.use_curses)
self.installer.restart()
display.success_installation(self.names)
return cert_file
return vhost
def optimize_config(self, vhost, redirect=None):
"""Optimize the configuration.
.. todo:: Handle multiple vhosts
:param vhost: vhost to optimize
:type vhost: :class:`apache_configurator.VH`
:type vhost: :class:`letsencrypt.client.apache.obj.VirtualHost`
:param redirect: If traffic should be forwarded from HTTP to HTTPS.
:type redirect: bool or None
"""
# TODO: this should most definitely be moved to __init__
if redirect is None:
redirect = display.redirect_by_default()
if redirect:
self.redirect_to_ssl(vhost)
self.config.restart(quiet=self.use_curses)
self.installer.restart()
# if self.ocsp_stapling is None:
# q = ("Would you like to protect the privacy of your users "
@ -458,7 +248,7 @@ class Client(object):
logging.info("Cleaning up challenges...")
for chall in challenges:
if chall["type"] in CONFIG.CONFIG_CHALLENGES:
self.config.cleanup()
self.auth.cleanup()
else:
# Handle other cleanup if needed
pass
@ -490,11 +280,11 @@ class Client(object):
for i, c_obj in enumerate(challenge_objs):
resp = "null"
if c_obj["type"] in CONFIG.CONFIG_CHALLENGES:
resp = self.config.perform(c_obj)
resp = self.auth.perform(c_obj)
else:
# Handle RecoveryToken type challenges
pass
self._assign_responses(resp, indices[i], responses)
logging.info(
@ -502,6 +292,7 @@ class Client(object):
return responses, challenge_objs
# pylint: disable=no-self-use
def _assign_responses(self, resp, index_list, responses):
"""Assign chall_response to appropriate places in response list.
@ -513,14 +304,13 @@ class Client(object):
"""
if isinstance(resp, list):
assert(len(resp) == len(index_list))
assert len(resp) == len(index_list)
for j, index in enumerate(index_list):
responses[index] = resp[j]
else:
else:
for index in index_list:
responses[index] = resp
def store_cert_key(self, cert_file, encrypt=False):
"""Store certificate key.
@ -549,17 +339,17 @@ class Client(object):
for row in csvreader:
idx = int(row[0]) + 1
csvwriter = csv.writer(csvfile)
csvwriter.writerow([str(idx), cert_file, self.privkey.file])
csvwriter.writerow([str(idx), cert_file, self.authkey.file])
else:
with open(list_file, 'wb') as csvfile:
csvwriter = csv.writer(csvfile)
csvwriter.writerow(["0", cert_file, self.privkey.file])
csvwriter.writerow(["0", cert_file, self.authkey.file])
shutil.copy2(self.privkey.file,
shutil.copy2(self.authkey.file,
os.path.join(
CONFIG.CERT_KEY_BACKUP,
os.path.basename(self.privkey.file) + "_" + str(idx)))
os.path.basename(self.authkey.file) + "_" + str(idx)))
shutil.copy2(cert_file,
os.path.join(
CONFIG.CERT_KEY_BACKUP,
@ -571,16 +361,16 @@ class Client(object):
"""Redirect all traffic from HTTP to HTTPS
:param vhost: list of ssl_vhosts
:type vhost: :class:`apache_configurator.VH`
:type vhost: :class:`letsencrypt.client.interfaces.IInstaller`
"""
for ssl_vh in vhost:
success, redirect_vhost = self.config.enable_redirect(ssl_vh)
success, redirect_vhost = self.installer.enable_redirect(ssl_vh)
logging.info(
"\nRedirect vhost: %s - %s ", redirect_vhost.filep, success)
# If successful, make sure redirect site is enabled
if success:
self.config.enable_site(redirect_vhost)
self.installer.enable_site(redirect_vhost)
def get_virtual_hosts(self, domains):
"""Retrieve the appropriate virtual host for the domain
@ -588,12 +378,12 @@ class Client(object):
:param list domains: Domains to find ssl vhosts for
:returns: associated vhosts
:rtype: :class:`apache_configurator.VH`
:rtype: :class:`letsencrypt.client.apache.obj.VirtualHost`
"""
vhost = set()
for name in domains:
host = self.config.choose_virtual_host(name)
host = self.installer.choose_virtual_host(name)
if host is not None:
vhost.add(host)
return vhost
@ -646,129 +436,95 @@ class Client(object):
challenge_objs.append({
"type": "dvsni",
"list_sni_tuple": sni_todo,
"dvsni_key": self.privkey,
"dvsni_key": self.authkey,
})
challenge_obj_indices.append(sni_satisfies)
logging.debug(sni_todo)
return challenge_objs, challenge_obj_indices
def init_key_csr(self):
"""Initializes privkey and csr.
Inits key and CSR using provided files or generating new files
if necessary. Both will be saved in PEM format on the
filesystem. The CSR is placed into DER format to allow
the namedtuple to easily work with the protocol.
def validate_key_csr(privkey, csr, names):
"""Validate CSR and key files.
"""
if not self.privkey.file:
key_pem = crypto_util.make_key(CONFIG.RSA_KEY_SIZE)
Verifies that the client key and csr arguments are valid and
correspond to one another.
# Save file
le_util.make_or_verify_dir(CONFIG.KEY_DIR, 0o700)
key_f, key_filename = le_util.unique_file(
os.path.join(CONFIG.KEY_DIR, "key-letsencrypt.pem"), 0o600)
key_f.write(key_pem)
key_f.close()
logging.info("Generating key: %s", key_filename)
self.privkey = Client.Key(key_filename, key_pem)
if not self.csr.file:
csr_pem, csr_der = crypto_util.make_csr(
self.privkey.pem, self.names)
# Save CSR
le_util.make_or_verify_dir(CONFIG.CERT_DIR, 0o755)
csr_f, csr_filename = le_util.unique_file(
os.path.join(CONFIG.CERT_DIR, "csr-letsencrypt.pem"), 0o644)
csr_f.write(csr_pem)
csr_f.close()
logging.info("Creating CSR: %s", csr_filename)
self.csr = Client.CSR(csr_filename, csr_der, "der")
elif self.csr.type != "der":
# The user is going to pass in a pem format file
# That is why we must conver it to der since the
# protocol uses der exclusively.
csr_obj = M2Crypto.X509.load_request_string(self.csr.data)
self.csr = Client.CSR(self.csr.file, csr_obj.as_der(), "der")
def _validate_csr_key_cli(self):
"""Validate CSR and key files.
Verifies that the client key and csr arguments are valid and
correspond to one another.
:raises LetsEncryptClientError: if validation fails
"""
# TODO: Handle all of these problems appropriately
# The client can eventually do things like prompt the user
# and allow the user to take more appropriate actions
# If CSR is provided, it must be readable and valid.
if self.csr.data and not crypto_util.valid_csr(self.csr.data):
raise errors.LetsEncryptClientError(
"The provided CSR is not a valid CSR")
# If key is provided, it must be readable and valid.
if (self.privkey.pem and
not crypto_util.valid_privkey(self.privkey.pem)):
raise errors.LetsEncryptClientError(
"The provided key is not a valid key")
# If CSR and key are provided, the key must be the same key used
# in the CSR.
if self.csr.data and self.privkey.pem:
if not crypto_util.csr_matches_pubkey(
self.csr.data, self.privkey.pem):
raise errors.LetsEncryptClientError(
"The key and CSR do not match")
def get_all_names(self):
"""Return all valid names in the configuration."""
names = list(self.config.get_all_names())
sanity_check_names(names)
if not names:
logging.fatal("No domain names were found in your apache config")
logging.fatal("Either specify which names you would like "
"letsencrypt to validate or add server names "
"to your virtual hosts")
sys.exit(1)
return names
def remove_cert_key(cert):
"""Remove certificate key.
:param dict cert:
:raises LetsEncryptClientError: if validation fails
"""
list_file = os.path.join(CONFIG.CERT_KEY_BACKUP, "LIST")
list_file2 = os.path.join(CONFIG.CERT_KEY_BACKUP, "LIST.tmp")
# TODO: Handle all of these problems appropriately
# The client can eventually do things like prompt the user
# and allow the user to take more appropriate actions
with open(list_file, 'rb') as orgfile:
csvreader = csv.reader(orgfile)
if csr.form == "der":
csr_obj = M2Crypto.X509.load_request_der_string(csr.data)
csr = Client.CSR(csr.file, csr_obj.as_pem(), "der")
with open(list_file2, 'wb') as newfile:
csvwriter = csv.writer(newfile)
# If CSR is provided, it must be readable and valid.
if csr.data and not crypto_util.valid_csr(csr.data):
raise errors.LetsEncryptClientError(
"The provided CSR is not a valid CSR")
for row in csvreader:
if not (row[0] == str(cert["idx"]) and
row[1] == cert["orig_cert_file"] and
row[2] == cert["orig_key_file"]):
csvwriter.writerow(row)
# If key is provided, it must be readable and valid.
if privkey.pem and not crypto_util.valid_privkey(privkey.pem):
raise errors.LetsEncryptClientError(
"The provided key is not a valid key")
shutil.copy2(list_file2, list_file)
os.remove(list_file2)
os.remove(cert["backup_cert_file"])
os.remove(cert["backup_key_file"])
# If CSR and key are provided, the key must be the same key used
# in the CSR.
if csr.data and privkey.pem:
if not crypto_util.csr_matches_pubkey(
csr.data, privkey.pem):
raise errors.LetsEncryptClientError(
"The key and CSR do not match")
def init_key():
"""Initializes privkey.
Inits key and CSR using provided files or generating new files
if necessary. Both will be saved in PEM format on the
filesystem. The CSR is placed into DER format to allow
the namedtuple to easily work with the protocol.
"""
key_pem = crypto_util.make_key(CONFIG.RSA_KEY_SIZE)
# Save file
le_util.make_or_verify_dir(CONFIG.KEY_DIR, 0o700)
key_f, key_filename = le_util.unique_file(
os.path.join(CONFIG.KEY_DIR, "key-letsencrypt.pem"), 0o600)
key_f.write(key_pem)
key_f.close()
logging.info("Generating key: %s", key_filename)
return Client.Key(key_filename, key_pem)
def init_csr(privkey, names):
"""Initialize a CSR with the given private key."""
csr_pem, csr_der = crypto_util.make_csr(privkey.pem, names)
# Save CSR
le_util.make_or_verify_dir(CONFIG.CERT_DIR, 0o755)
csr_f, csr_filename = le_util.unique_file(
os.path.join(CONFIG.CERT_DIR, "csr-letsencrypt.pem"), 0o644)
csr_f.write(csr_pem)
csr_f.close()
logging.info("Creating CSR: %s", csr_filename)
return Client.CSR(csr_filename, csr_der, "der")
def csr_pem_to_der(csr):
"""Convert pem CSR to der."""
csr_obj = M2Crypto.X509.load_request_string(csr.data)
return Client.CSR(csr.file, csr_obj.as_der(), "der")
def sanity_check_names(names):
@ -808,5 +564,5 @@ def is_hostname_sane(hostname):
# is this a valid IPv6 address?
socket.getaddrinfo(hostname, 443, socket.AF_INET6)
return True
except:
except socket.error:
return False

View file

@ -1,98 +0,0 @@
"""Configurator."""
class Configurator(object):
"""Generic Let's Encrypt configurator.
Class represents all possible webservers and configuration editors
This includes the generic webserver which wont have configuration
files at all, but instead create a new process to handle the DVSNI
and other challenges.
"""
def deploy_cert(self, vhost, cert, key, cert_chain=None):
"""Deploy certificate.
:param vhost
:param str cert: CSR
:param str key: Private key
"""
raise NotImplementedError()
def choose_virtual_host(self, name):
"""Chooses a virtual host based on a given domain name."""
raise NotImplementedError()
def get_all_names(self):
"""Returns all names found in the configuration."""
raise NotImplementedError()
def enable_redirect(self, ssl_vhost):
"""Redirect all traffic to the given ssl_vhost (port 80 => 443)."""
raise NotImplementedError()
def enable_hsts(self, ssl_vhost):
"""Enable HSTS on the given ssl_vhost."""
raise NotImplementedError()
def enable_ocsp_stapling(self, ssl_vhost):
"""Enable OCSP stapling on given ssl_vhost."""
raise NotImplementedError()
def get_all_certs_keys(self):
"""Retrieve all certs and keys set in configuration.
:returns: List of tuples with form [(cert, key, path)].
:rtype: list
"""
raise NotImplementedError()
def enable_site(self, vhost):
"""Enable the site at the given vhost."""
raise NotImplementedError()
def save(self, title=None, temporary=False):
"""Saves all changes to the configuration files.
Both title and temporary are needed because a save may be
intended to be permanent, but the save is not ready to be a full
checkpoint
:param str title: The title of the save. If a title is given, the
configuration will be saved as a new checkpoint and put in a
timestamped directory. `title` has no effect if temporary is true.
:param bool temporary: Indicates whether the changes made will
be quickly reversed in the future (challenges)
"""
raise NotImplementedError()
def revert_challenge_config(self):
"""Reload the users original configuration files."""
raise NotImplementedError()
def rollback_checkpoints(self, rollback=1):
"""Revert `rollback` number of configuration checkpoints."""
raise NotImplementedError()
def display_checkpoints(self):
"""Display the saved configuration checkpoints."""
raise NotImplementedError()
def config_test(self):
"""Make sure the configuration is valid."""
raise NotImplementedError()
def restart(self, quiet=False):
"""Restart or refresh the server content."""
raise NotImplementedError()
def perform(self, chall_dict):
"""Perform the given challenge"""
raise NotImplementedError()
def cleanup(self):
"""Cleanup configuration changes from challenge."""
raise NotImplementedError()

View file

@ -47,8 +47,8 @@ def create_sig(msg, key_str, nonce=None, nonce_len=CONFIG.NONCE_SIZE):
logging.debug('%s signed as %s', msg_with_nonce, signature)
n_bytes = binascii.unhexlify(leading_zeros(hex(key.n)[2:].rstrip("L")))
e_bytes = binascii.unhexlify(leading_zeros(hex(key.e)[2:].rstrip("L")))
n_bytes = binascii.unhexlify(_leading_zeros(hex(key.n)[2:].rstrip("L")))
e_bytes = binascii.unhexlify(_leading_zeros(hex(key.e)[2:].rstrip("L")))
return {
"nonce": le_util.jose_b64encode(nonce),
@ -62,7 +62,7 @@ def create_sig(msg, key_str, nonce=None, nonce_len=CONFIG.NONCE_SIZE):
}
def leading_zeros(arg):
def _leading_zeros(arg):
if len(arg) % 2:
return "0" + arg
return arg

View file

@ -0,0 +1,91 @@
"""Interfaces."""
import zope.interface
class IAuthenticator(zope.interface.Interface):
"""Generic Let's Encrypt Authenticator.
Class represents all possible tools processes that have the
ability to perform challenges and attain a certificate.
"""
def perform(chall_dict):
"""Perform the given challenge"""
def cleanup():
"""Revert changes and shutdown after challenges complete."""
class IInstaller(zope.interface.Interface):
"""Generic Let's Encrypt Installer Interface.
Represents any server that an X509 certificate can be placed.
With a focus on HTTPS optimizations.
.. todo:: All optimizations should be of the form .enable("hsts")
This will make it general towards any optimization... we should also
define a function to glean what optimizations are available.
Perhaps with text that describes the optimizations...
"""
def get_all_names():
"""Returns all names that may be authenticated."""
def deploy_cert(vhost, cert, key, cert_chain=None):
"""Deploy certificate.
:param vhost
:param str cert: CSR
:param str key: Private key
"""
def choose_virtual_host(name):
"""Chooses a virtual host based on a given domain name."""
def enable_redirect(ssl_vhost):
"""Redirect all traffic to the given ssl_vhost (port 80 => 443)."""
def enable_hsts(ssl_vhost):
"""Enable HSTS on the given ssl_vhost."""
def enable_ocsp_stapling(ssl_vhost):
"""Enable OCSP stapling on given ssl_vhost."""
def get_all_certs_keys():
"""Retrieve all certs and keys set in configuration.
:returns: List of tuples with form [(cert, key, path)].
:rtype: list
"""
def enable_site(vhost):
"""Enable the site at the given vhost."""
def save(title=None, temporary=False):
"""Saves all changes to the configuration files.
Both title and temporary are needed because a save may be
intended to be permanent, but the save is not ready to be a full
checkpoint
:param str title: The title of the save. If a title is given, the
configuration will be saved as a new checkpoint and put in a
timestamped directory. `title` has no effect if temporary is true.
:param bool temporary: Indicates whether the changes made will
be quickly reversed in the future (challenges)
"""
def rollback_checkpoints(rollback=1):
"""Revert `rollback` number of configuration checkpoints."""
def display_checkpoints():
"""Display the saved configuration checkpoints."""
def config_test():
"""Make sure the configuration is valid."""
def restart():
"""Restart or refresh the server content."""

View file

@ -0,0 +1,119 @@
"""Network Module."""
import json
import logging
import sys
import time
import jsonschema
import requests
from letsencrypt.client import acme
from letsencrypt.client import errors
class Network(object):
"""Class for communicating with ACME servers.
:ivar str server: Certificate authority server
:ivar str server_url: Full URL of the CSR server
"""
def __init__(self, server):
self.server = server
self.server_url = "https://%s/acme/" % self.server
def send(self, msg):
"""Send ACME message to server.
:param dict msg: ACME message (JSON serializable).
:returns: Server response message.
:rtype: dict
:raises TypeError: if `msg` is not JSON serializable
:raises jsonschema.ValidationError: if not valid ACME message
:raises errors.LetsEncryptClientError: in case of connection error
or if response from server is not a valid ACME message.
"""
json_encoded = json.dumps(msg)
acme.acme_object_validate(json_encoded)
try:
response = requests.post(
self.server_url,
data=json_encoded,
headers={"Content-Type": "application/json"},
)
except requests.exceptions.RequestException as error:
raise errors.LetsEncryptClientError(
'Sending ACME message to server has failed: %s' % error)
try:
acme.acme_object_validate(response.content)
except ValueError:
raise errors.LetsEncryptClientError(
'Server did not send JSON serializable message')
except jsonschema.ValidationError as error:
raise errors.LetsEncryptClientError(
'Response from server is not a valid ACME message')
return response.json()
def send_and_receive_expected(self, msg, expected):
"""Send ACME message to server and return expected message.
:param dict msg: ACME message (JSON serializable).
:param str expected: Name of the expected response ACME message type.
:returns: ACME response message of expected type.
:rtype: dict
:raises errors.LetsEncryptClientError: An exception is thrown
"""
response = self.send(msg)
try:
return self.is_expected_msg(response, expected)
except: # TODO: too generic exception
raise errors.LetsEncryptClientError(
'Expected message (%s) not received' % expected)
def is_expected_msg(self, response, expected, delay=3, rounds=20):
"""Is reponse expected ACME message?
:param dict response: ACME response message from server.
:param str expected: Name of the expected response ACME message type.
:param int delay: Number of seconds to delay before next round
in case of ACME "defer" response message.
:param int rounds: Number of resend attempts in case of ACME "defer"
reponse message.
:returns: ACME response message from server.
:rtype: dict
:raises LetsEncryptClientError: if server sent ACME "error" message
"""
for _ in xrange(rounds):
if response["type"] == expected:
return response
elif response["type"] == "error":
logging.error(
"%s: %s - More Info: %s", response["error"],
response.get("message", ""), response.get("moreInfo", ""))
raise errors.LetsEncryptClientError(response["error"])
elif response["type"] == "defer":
logging.info("Waiting for %d seconds...", delay)
time.sleep(delay)
response = self.send(acme.status_request(response["token"]))
else:
logging.fatal("Received unexpected message")
logging.fatal("Expected: %s", expected)
logging.fatal("Received: %s", response)
sys.exit(33)
logging.error(
"Server has deferred past the max of %d seconds", rounds * delay)

View file

@ -0,0 +1,137 @@
"""Revoker module to enable LE revocations."""
import csv
import logging
import os
import shutil
import M2Crypto
from letsencrypt.client import acme
from letsencrypt.client import CONFIG
from letsencrypt.client import crypto_util
from letsencrypt.client import display
from letsencrypt.client import network
class Revoker(object):
"""A revocation class for LE."""
def __init__(self, server, installer):
self.network = network.Network(server)
self.installer = installer
def acme_revocation(self, cert):
"""Handle ACME "revocation" phase.
:param dict cert: TODO
:returns: ACME "revocation" message.
:rtype: dict
"""
cert_der = M2Crypto.X509.load_cert(cert["backup_cert_file"]).as_der()
with open(cert["backup_key_file"], 'rU') as backup_key_file:
key = backup_key_file.read()
revocation = self.network.send_and_receive_expected(
acme.revocation_request(cert_der, key), "revocation")
display.generic_notification(
"You have successfully revoked the certificate for "
"%s" % cert["cn"], width=70, height=9)
self.remove_cert_key(cert)
self.list_certs_keys()
return revocation
def list_certs_keys(self):
"""List trusted Let's Encrypt certificates."""
list_file = os.path.join(CONFIG.CERT_KEY_BACKUP, "LIST")
certs = []
if not os.path.isfile(list_file):
logging.info(
"You don't have any certificates saved from letsencrypt")
return
c_sha1_vh = {}
for (cert, _, path) in self.installer.get_all_certs_keys():
try:
c_sha1_vh[M2Crypto.X509.load_cert(
cert).get_fingerprint(md='sha1')] = path
except:
continue
with open(list_file, 'rb') as csvfile:
csvreader = csv.reader(csvfile)
for row in csvreader:
cert = crypto_util.get_cert_info(row[1])
b_k = os.path.join(CONFIG.CERT_KEY_BACKUP,
os.path.basename(row[2]) + "_" + row[0])
b_c = os.path.join(CONFIG.CERT_KEY_BACKUP,
os.path.basename(row[1]) + "_" + row[0])
cert.update({
"orig_key_file": row[2],
"orig_cert_file": row[1],
"idx": int(row[0]),
"backup_key_file": b_k,
"backup_cert_file": b_c,
"installed": c_sha1_vh.get(cert["fingerprint"], ""),
})
certs.append(cert)
if certs:
self.choose_certs(certs)
else:
display.generic_notification(
"There are not any trusted Let's Encrypt "
"certificates for this server.")
def choose_certs(self, certs):
"""Display choose certificates menu.
:param list certs: List of cert dicts.
"""
code, tag = display.display_certs(certs)
if code == display.OK:
cert = certs[tag]
if display.confirm_revocation(cert):
self.acme_revocation(cert)
else:
self.choose_certs(certs)
elif code == display.HELP:
cert = certs[tag]
display.more_info_cert(cert)
self.choose_certs(certs)
else:
exit(0)
# pylint: disable=no-self-use
def remove_cert_key(self, cert):
"""Remove certificate and key.
:param dict cert: Cert dict used throughout revocation
"""
list_file = os.path.join(CONFIG.CERT_KEY_BACKUP, "LIST")
list_file2 = os.path.join(CONFIG.CERT_KEY_BACKUP, "LIST.tmp")
with open(list_file, 'rb') as orgfile:
csvreader = csv.reader(orgfile)
with open(list_file2, 'wb') as newfile:
csvwriter = csv.writer(newfile)
for row in csvreader:
if not (row[0] == str(cert["idx"]) and
row[1] == cert["orig_cert_file"] and
row[2] == cert["orig_key_file"]):
csvwriter.writerow(row)
shutil.copy2(list_file2, list_file)
os.remove(list_file2)
os.remove(cert["backup_cert_file"])
os.remove(cert["backup_key_file"])

View file

@ -1,22 +1,19 @@
"""Test for letsencrypt.client.apache_configurator."""
import os
import pkg_resources
import re
import shutil
import sys
import tempfile
import unittest
import mock
from letsencrypt.client import apache_configurator
from letsencrypt.client import CONFIG
from letsencrypt.client import display
from letsencrypt.client import errors
from letsencrypt.client.apache import configurator
from letsencrypt.client.apache import obj
from letsencrypt.client.apache import parser
UBUNTU_CONFIGS = pkg_resources.resource_filename(
__name__, "testdata/debian_apache_2_4")
from letsencrypt.client.tests import config_util
class TwoVhost80Test(unittest.TestCase):
@ -25,101 +22,36 @@ class TwoVhost80Test(unittest.TestCase):
def setUp(self):
display.set_display(display.NcursesDisplay())
self.temp_dir = os.path.join(
tempfile.mkdtemp("temp"), "debian_apache_2_4")
self.config_dir = tempfile.mkdtemp("config")
self.work_dir = tempfile.mkdtemp("work")
self.temp_dir, self.config_dir, self.work_dir = config_util.dir_setup(
"debian_apache_2_4/two_vhost_80")
shutil.copytree(UBUNTU_CONFIGS, self.temp_dir, symlinks=True)
temp_options = pkg_resources.resource_filename(
"letsencrypt.client", os.path.basename(CONFIG.OPTIONS_SSL_CONF))
shutil.copyfile(
temp_options, os.path.join(self.config_dir, "options-ssl.conf"))
self.ssl_options = config_util.setup_apache_ssl_options(self.config_dir)
# Final slash is currently important
self.config_path = os.path.join(self.temp_dir, "two_vhost_80/apache2/")
self.ssl_options = os.path.join(self.config_dir, "options-ssl.conf")
backups = os.path.join(self.work_dir, "backups")
self.config_path = os.path.join(
self.temp_dir, "debian_apache_2_4/two_vhost_80/apache2/")
with mock.patch("letsencrypt.client.apache_configurator."
"subprocess.Popen") as mock_popen:
# This just states that the ssl module is already loaded
mock_popen().communicate.return_value = ("ssl_module", "")
self.config = apache_configurator.ApacheConfigurator(
self.config_path,
{
"backup": backups,
"temp": os.path.join(self.work_dir, "temp_checkpoint"),
"progress": os.path.join(backups, "IN_PROGRESS"),
"config": self.config_dir,
"work": self.work_dir,
},
self.ssl_options,
(2, 4, 7))
self.config = config_util.get_apache_configurator(
self.config_path, self.config_dir, self.work_dir, self.ssl_options)
prefix = os.path.join(
self.temp_dir, "two_vhost_80/apache2/sites-available")
aug_pre = "/files" + prefix
self.vh_truth = [
apache_configurator.VH(
os.path.join(prefix, "encryption-example.conf"),
os.path.join(aug_pre, "encryption-example.conf/VirtualHost"),
["*:80"], False, True, ["encryption-example.demo"]),
apache_configurator.VH(
os.path.join(prefix, "default-ssl.conf"),
os.path.join(aug_pre, "default-ssl.conf/IfModule/VirtualHost"),
["_default_:443"], True, False),
apache_configurator.VH(
os.path.join(prefix, "000-default.conf"),
os.path.join(aug_pre, "000-default.conf/VirtualHost"),
["*:80"], False, True, ["ip-172-30-0-17"]),
apache_configurator.VH(
os.path.join(prefix, "letsencrypt.conf"),
os.path.join(aug_pre, "letsencrypt.conf/VirtualHost"),
["*:80"], False, True, ["letsencrypt.demo"]),
]
self.vh_truth = config_util.get_vh_truth(
self.temp_dir, "debian_apache_2_4/two_vhost_80")
def tearDown(self):
shutil.rmtree(self.temp_dir)
shutil.rmtree(self.config_dir)
shutil.rmtree(self.work_dir)
def test_parse_file(self):
"""Test parse_file.
letsencrypt.conf is chosen as the test file as it will not be
included during the normal course of execution.
"""
file_path = os.path.join(
self.config_path, "sites-available", "letsencrypt.conf")
self.config._parse_file(file_path) # pylint: disable=protected-access
# search for the httpd incl
matches = self.config.aug.match(
"/augeas/load/Httpd/incl [. ='%s']" % file_path)
self.assertTrue(matches)
def test_get_all_names(self):
names = self.config.get_all_names()
self.assertEqual(set(names), set(
self.assertEqual(names, set(
['letsencrypt.demo', 'encryption-example.demo', 'ip-172-30-0-17']))
def test_find_directive(self):
test = self.config.find_directive(
apache_configurator.case_i("Listen"), "443")
# This will only look in enabled hosts
test2 = self.config.find_directive(
apache_configurator.case_i("documentroot"))
self.assertEqual(len(test), 2)
self.assertEqual(len(test2), 3)
def test_get_virtual_hosts(self):
vhs = self.config.get_virtual_hosts()
self.assertEqual(len(vhs), 4)
found = 0
for vhost in vhs:
for truth in self.vh_truth:
if vhost == truth:
@ -134,67 +66,45 @@ class TwoVhost80Test(unittest.TestCase):
self.assertTrue(self.config.is_site_enabled(self.vh_truth[2].filep))
self.assertTrue(self.config.is_site_enabled(self.vh_truth[3].filep))
def test_add_dir(self):
aug_default = "/files" + self.config.location["default"]
self.config.add_dir(
aug_default, "AddDirective", "test")
self.assertTrue(
self.config.find_directive("AddDirective", "test", aug_default))
def test_deploy_cert(self):
self.config.deploy_cert(
self.vh_truth[1],
"example/cert.pem", "example/key.pem", "example/cert_chain.pem")
loc_cert = self.config.find_directive(
apache_configurator.case_i("sslcertificatefile"),
loc_cert = self.config.parser.find_dir(
parser.case_i("sslcertificatefile"),
re.escape("example/cert.pem"), self.vh_truth[1].path)
loc_key = self.config.find_directive(
apache_configurator.case_i("sslcertificateKeyfile"),
loc_key = self.config.parser.find_dir(
parser.case_i("sslcertificateKeyfile"),
re.escape("example/key.pem"), self.vh_truth[1].path)
loc_chain = self.config.find_directive(
apache_configurator.case_i("SSLCertificateChainFile"),
loc_chain = self.config.parser.find_dir(
parser.case_i("SSLCertificateChainFile"),
re.escape("example/cert_chain.pem"), self.vh_truth[1].path)
# Verify one directive was found in the correct file
self.assertEqual(len(loc_cert), 1)
self.assertEqual(apache_configurator.get_file_path(loc_cert[0]),
self.assertEqual(configurator.get_file_path(loc_cert[0]),
self.vh_truth[1].filep)
self.assertEqual(len(loc_key), 1)
self.assertEqual(apache_configurator.get_file_path(loc_key[0]),
self.assertEqual(configurator.get_file_path(loc_key[0]),
self.vh_truth[1].filep)
self.assertEqual(len(loc_chain), 1)
self.assertEqual(apache_configurator.get_file_path(loc_chain[0]),
self.assertEqual(configurator.get_file_path(loc_chain[0]),
self.vh_truth[1].filep)
def test_is_name_vhost(self):
self.assertTrue(self.config.is_name_vhost("*:80"))
addr = obj.Addr.fromstring("*:80")
self.assertTrue(self.config.is_name_vhost(addr))
self.config.version = (2, 2)
self.assertFalse(self.config.is_name_vhost("*:80"))
self.assertFalse(self.config.is_name_vhost(addr))
def test_add_name_vhost(self):
self.config.add_name_vhost("*:443")
# self.config.save(temporary=True)
self.assertTrue(self.config.find_directive(
self.assertTrue(self.config.parser.find_dir(
"NameVirtualHost", re.escape("*:443")))
def test_add_dir_to_ifmodssl(self):
"""test _add_dir_to_ifmodssl.
Path must be valid before attempting to add to augeas
"""
self.config._add_dir_to_ifmodssl( # pylint: disable=protected-access
"/files" + self.config.location["default"], "FakeDirective", "123")
matches = self.config.find_directive("FakeDirective", "123")
self.assertEqual(len(matches), 1)
self.assertTrue("IfModule" in matches[0])
def test_make_vhost_ssl(self):
ssl_vhost = self.config.make_vhost_ssl(self.vh_truth[0])
@ -205,16 +115,17 @@ class TwoVhost80Test(unittest.TestCase):
self.assertEqual(ssl_vhost.path,
"/files" + ssl_vhost.filep + "/IfModule/VirtualHost")
self.assertEqual(ssl_vhost.addrs, ["*:443"])
self.assertEqual(ssl_vhost.names, ["encryption-example.demo"])
self.assertEqual(len(ssl_vhost.addrs), 1)
self.assertEqual(set([obj.Addr.fromstring("*:443")]), ssl_vhost.addrs)
self.assertEqual(ssl_vhost.names, set(["encryption-example.demo"]))
self.assertTrue(ssl_vhost.ssl)
self.assertFalse(ssl_vhost.enabled)
self.assertTrue(self.config.find_directive(
self.assertTrue(self.config.parser.find_dir(
"SSLCertificateFile", None, ssl_vhost.path))
self.assertTrue(self.config.find_directive(
self.assertTrue(self.config.parser.find_dir(
"SSLCertificateKeyFile", None, ssl_vhost.path))
self.assertTrue(self.config.find_directive(
self.assertTrue(self.config.parser.find_dir(
"Include", self.ssl_options, ssl_vhost.path))
self.assertEqual(self.config.is_name_vhost(self.vh_truth[0]),
@ -222,7 +133,7 @@ class TwoVhost80Test(unittest.TestCase):
self.assertEqual(len(self.config.vhosts), 5)
@mock.patch("letsencrypt.client.apache_configurator."
@mock.patch("letsencrypt.client.apache.configurator."
"subprocess.Popen")
def test_get_version(self, mock_popen):
mock_popen().communicate.return_value = (
@ -247,6 +158,5 @@ class TwoVhost80Test(unittest.TestCase):
self.assertRaises(
errors.LetsEncryptConfiguratorError, self.config.get_version)
if __name__ == '__main__':
unittest.main()

View file

@ -0,0 +1,61 @@
import unittest
from letsencrypt.client.apache import obj
class AddrTest(unittest.TestCase):
"""Test the Addr class."""
def setUp(self):
self.addr1 = obj.Addr.fromstring("192.168.1.1")
self.addr2 = obj.Addr.fromstring("192.168.1.1:*")
self.addr3 = obj.Addr.fromstring("192.168.1.1:80")
def test_fromstring(self):
self.assertEqual(self.addr1.get_addr(), "192.168.1.1")
self.assertEqual(self.addr1.get_port(), "")
self.assertEqual(self.addr2.get_addr(), "192.168.1.1")
self.assertEqual(self.addr2.get_port(), "*")
self.assertEqual(self.addr3.get_addr(), "192.168.1.1")
self.assertEqual(self.addr3.get_port(), "80")
def test_str(self):
self.assertEqual(str(self.addr1), "192.168.1.1")
self.assertEqual(str(self.addr2), "192.168.1.1:*")
self.assertEqual(str(self.addr3), "192.168.1.1:80")
def test_get_addr_obj(self):
self.assertEqual(str(self.addr1.get_addr_obj("443")), "192.168.1.1:443")
self.assertEqual(str(self.addr2.get_addr_obj("")), "192.168.1.1")
self.assertEqual(str(self.addr1.get_addr_obj("*")), "192.168.1.1:*")
def test_eq(self):
self.assertEqual(self.addr1, self.addr2.get_addr_obj(""))
self.assertNotEqual(self.addr1, self.addr2)
# This is specifically designed to hit line 28 but coverage denies me
# the satisfaction :(
self.assertNotEqual(self.addr1, 3333)
def test_set_inclusion(self):
set_a = set([self.addr1, self.addr2])
addr1b = obj.Addr.fromstring("192.168.1.1")
addr2b = obj.Addr.fromstring("192.168.1.1:*")
set_b = set([addr1b, addr2b])
self.assertEqual(set_a, set_b)
class VirtualHostTest(unittest.TestCase):
"""Test the VirtualHost class."""
def setUp(self):
self.vhost1 = obj.VirtualHost(
"filep", "vh_path",
set([obj.Addr.fromstring("localhost")]), False, False)
def test_eq(self):
vhost1b = obj.VirtualHost(
"filep", "vh_path",
set([obj.Addr.fromstring("localhost")]), False, False)
self.assertEqual(vhost1b, self.vhost1)
self.assertEqual(str(vhost1b), str(self.vhost1))
self.assertNotEqual(vhost1b, 1234)

View file

@ -0,0 +1,112 @@
import os
import shutil
import sys
import unittest
import augeas
import mock
from letsencrypt.client import display
from letsencrypt.client import errors
from letsencrypt.client.apache import parser
from letsencrypt.client.tests import config_util
class ApacheParserTest(unittest.TestCase):
def setUp(self):
display.set_display(display.FileDisplay(sys.stdout))
self.temp_dir, self.config_dir, self.work_dir = config_util.dir_setup(
"debian_apache_2_4/two_vhost_80")
self.ssl_options = config_util.setup_apache_ssl_options(self.config_dir)
# Final slash is currently important
self.config_path = os.path.join(
self.temp_dir, "debian_apache_2_4/two_vhost_80/apache2/")
self.parser = parser.ApacheParser(
augeas.Augeas(flags=augeas.Augeas.NONE),
self.config_path, self.ssl_options)
def tearDown(self):
shutil.rmtree(self.temp_dir)
shutil.rmtree(self.config_dir)
shutil.rmtree(self.work_dir)
def test_parse_file(self):
"""Test parse_file.
letsencrypt.conf is chosen as the test file as it will not be
included during the normal course of execution.
"""
file_path = os.path.join(
self.config_path, "sites-available", "letsencrypt.conf")
# pylint: disable=protected-access
self.parser._parse_file(file_path)
# search for the httpd incl
matches = self.parser.aug.match(
"/augeas/load/Httpd/incl [. ='%s']" % file_path)
self.assertTrue(matches)
def test_find_dir(self):
test = self.parser.find_dir(parser.case_i("Listen"), "443")
# This will only look in enabled hosts
test2 = self.parser.find_dir(
parser.case_i("documentroot"))
self.assertEqual(len(test), 2)
self.assertEqual(len(test2), 3)
def test_add_dir(self):
aug_default = "/files" + self.parser.loc["default"]
self.parser.add_dir(aug_default, "AddDirective", "test")
self.assertTrue(
self.parser.find_dir("AddDirective", "test", aug_default))
self.parser.add_dir(aug_default, "AddList", ["1", "2", "3", "4"])
matches = self.parser.find_dir("AddList", None, aug_default)
for i, match in enumerate(matches):
self.assertEqual(self.parser.aug.get(match), str(i + 1))
def test_add_dir_to_ifmodssl(self):
"""test add_dir_to_ifmodssl.
Path must be valid before attempting to add to augeas
"""
self.parser.add_dir_to_ifmodssl(
parser.get_aug_path(self.parser.loc["default"]),
"FakeDirective", "123")
matches = self.parser.find_dir("FakeDirective", "123")
self.assertEqual(len(matches), 1)
self.assertTrue("IfModule" in matches[0])
def test_get_aug_path(self):
self.assertEqual(
"/files/etc/apache", parser.get_aug_path("/etc/apache"))
def test_set_locations(self):
with mock.patch("letsencrypt.client.apache.parser."
"os.path") as mock_path:
mock_path.isfile.return_value = False
# pylint: disable=protected-access
self.assertRaises(errors.LetsEncryptConfiguratorError,
self.parser._set_locations, self.ssl_options)
mock_path.isfile.side_effect = [True, False, False]
# pylint: disable=protected-access
results = self.parser._set_locations(self.ssl_options)
self.assertEqual(results["default"], results["listen"])
self.assertEqual(results["default"], results["name"])

View file

@ -0,0 +1,93 @@
import os
import pkg_resources
import shutil
import tempfile
import mock
from letsencrypt.client import CONFIG
from letsencrypt.client.apache import configurator
from letsencrypt.client.apache import obj
def dir_setup(test_dir="debian_apache_2_4/two_vhost_80"):
"""Setup the directories necessary for the configurator."""
temp_dir = tempfile.mkdtemp("temp")
config_dir = tempfile.mkdtemp("config")
work_dir = tempfile.mkdtemp("work")
test_configs = pkg_resources.resource_filename(
__name__, "testdata/%s" % test_dir)
shutil.copytree(
test_configs, os.path.join(temp_dir, test_dir), symlinks=True)
return temp_dir, config_dir, work_dir
def setup_apache_ssl_options(config_dir):
"""Move the ssl_options into position and return the path."""
option_path = os.path.join(config_dir, "options-ssl.conf")
temp_options = pkg_resources.resource_filename(
"letsencrypt.client", os.path.basename(CONFIG.OPTIONS_SSL_CONF))
shutil.copyfile(
temp_options, option_path)
return option_path
def get_apache_configurator(
config_path, config_dir, work_dir, ssl_options, version=(2, 4, 7)):
"""Create an Apache Configurator with the specified options."""
backups = os.path.join(work_dir, "backups")
with mock.patch("letsencrypt.client.apache.configurator."
"subprocess.Popen") as mock_popen:
# This just states that the ssl module is already loaded
mock_popen().communicate.return_value = ("ssl_module", "")
config = configurator.ApacheConfigurator(
config_path,
{
"backup": backups,
"temp": os.path.join(work_dir, "temp_checkpoint"),
"progress": os.path.join(backups, "IN_PROGRESS"),
"config": config_dir,
"work": work_dir,
},
ssl_options,
version)
return config
def get_vh_truth(temp_dir, config_name):
"""Return the ground truth for the specified directory."""
if config_name == "debian_apache_2_4/two_vhost_80":
prefix = os.path.join(
temp_dir, config_name, "apache2/sites-available")
aug_pre = "/files" + prefix
vh_truth = [
obj.VirtualHost(
os.path.join(prefix, "encryption-example.conf"),
os.path.join(aug_pre, "encryption-example.conf/VirtualHost"),
set([obj.Addr.fromstring("*:80")]),
False, True, set(["encryption-example.demo"])),
obj.VirtualHost(
os.path.join(prefix, "default-ssl.conf"),
os.path.join(aug_pre, "default-ssl.conf/IfModule/VirtualHost"),
set([obj.Addr.fromstring("_default_:443")]), True, False),
obj.VirtualHost(
os.path.join(prefix, "000-default.conf"),
os.path.join(aug_pre, "000-default.conf/VirtualHost"),
set([obj.Addr.fromstring("*:80")]), False, True,
set(["ip-172-30-0-17"])),
obj.VirtualHost(
os.path.join(prefix, "letsencrypt.conf"),
os.path.join(aug_pre, "letsencrypt.conf/VirtualHost"),
set([obj.Addr.fromstring("*:80")]), False, True,
set(["letsencrypt.demo"])),
]
return vh_truth
return None

View file

@ -5,11 +5,14 @@ import logging
import os
import sys
from letsencrypt.client import apache_configurator
from letsencrypt.client import CONFIG
from letsencrypt.client import client
from letsencrypt.client import display
from letsencrypt.client import interfaces
from letsencrypt.client import errors
from letsencrypt.client import log
from letsencrypt.client import revoker
from letsencrypt.client.apache import configurator
def main():
@ -62,9 +65,39 @@ def main():
# Set up logging
logger = logging.getLogger()
logger.setLevel(logging.INFO) # TODO: --log
logger.setLevel(logging.INFO)
if args.use_curses:
logger.addHandler(log.DialogHandler())
display.set_display(display.NcursesDisplay())
else:
display.set_display(display.FileDisplay(sys.stdout))
installer = determine_installer()
server = CONFIG.ACME_SERVER if args.server is None else args.server
if args.revoke:
revoc = revoker.Revoker(server, installer)
revoc.list_certs_keys()
sys.exit()
if args.rollback > 0:
rollback(installer, args.rollback)
sys.exit()
if args.view_checkpoints:
view_checkpoints(installer)
sys.exit()
# Use the same object if possible
if interfaces.IAuthenticator.providedBy(installer):
auth = installer
else:
auth = determine_authenticator()
if not args.eula:
display_eula()
domains = choose_names(installer) if args.domains is None else args.domains
# Enforce '--privkey' is set along with '--csr'.
if args.csr and not args.privkey:
@ -72,36 +105,88 @@ def main():
"with the certificate signing request file (--csr)"
.format(os.linesep))
if args.use_curses:
display.set_display(display.NcursesDisplay())
else:
display.set_display(display.FileDisplay(sys.stdout))
if args.rollback > 0:
rollback(apache_configurator.ApacheConfigurator(), args.rollback)
sys.exit()
if args.view_checkpoints:
view_checkpoints(apache_configurator.ApacheConfigurator())
sys.exit()
server = args.server is None and CONFIG.ACME_SERVER or args.server
# Prepare for init of Client
if args.privkey is None:
privkey = client.Client.Key(None, None)
privkey = client.init_key()
else:
privkey = client.Client.Key(args.privkey[0], args.privkey[1])
if args.csr is None:
csr = client.Client.CSR(None, None, None)
csr = client.init_csr(privkey, domains)
else:
csr = client.Client.CSR(args.csr[0], args.csr[1], "pem")
csr = client.csr_pem_to_der(
client.Client.CSR(args.csr[0], args.csr[1], "pem"))
acme = client.Client(server, csr, privkey, args.use_curses)
if args.revoke:
acme.list_certs_keys()
acme = client.Client(server, domains, privkey, auth, installer)
# Validate the key and csr
client.validate_key_csr(privkey, csr, domains)
cert_file, chain_file = acme.obtain_certificate(csr)
vhost = acme.deploy_certificate(privkey, cert_file, chain_file)
acme.optimize_config(vhost, args.redirect)
def display_eula():
"""Displays the end user agreement."""
with open('EULA') as eula_file:
if not display.generic_yesno(
eula_file.read(), "Agree", "Cancel"):
sys.exit(0)
def choose_names(installer):
"""Display screen to select domains to validate.
:param installer: An installer object
:type installer: :class:`letsencrypt.client.interfaces.IInstaller`
"""
# This function adds all names
# found within the config to self.names
# Then filters them based on user selection
code, names = display.filter_names(get_all_names(installer))
if code == display.OK and names:
# TODO: Allow multiple names once it is setup
return [names[0]]
else:
acme.authenticate(args.domains, args.eula, args.redirect)
sys.exit(0)
def get_all_names(installer):
"""Return all valid names in the configuration.
:param installer: An installer object
:type installer: :class:`letsencrypt.client.interfaces.IInstaller`
"""
names = list(installer.get_all_names())
client.sanity_check_names(names)
if not names:
logging.fatal("No domain names were found in your installation")
logging.fatal("Either specify which names you would like "
"letsencrypt to validate or add server names "
"to your virtual hosts")
sys.exit(1)
return names
# This should be controlled by commandline parameters
def determine_authenticator():
"""Returns a valid authenticator."""
try:
return configurator.ApacheConfigurator()
except errors.LetsEncryptConfiguratorError:
logging.info("Unable to find a way to authenticate.")
def determine_installer():
"""Returns a valid installer if one exists."""
try:
return configurator.ApacheConfigurator()
except errors.LetsEncryptConfiguratorError:
logging.info("Unable to find a way to install the certificate.")
def read_file(filename):
@ -143,6 +228,5 @@ def view_checkpoints(config):
"""
config.display_checkpoints()
if __name__ == "__main__":
main()

View file

@ -11,6 +11,7 @@ install_requires = [
'python-augeas',
'python2-pythondialog',
'requests',
'zope.interface',
]
docs_extras = [
@ -35,6 +36,7 @@ setup(
packages=[
'letsencrypt',
'letsencrypt.client',
'letsencrypt.client.apache',
'letsencrypt.client.tests',
'letsencrypt.scripts',
],

View file

@ -14,7 +14,7 @@ commands =
[testenv:cover]
commands =
python setup.py dev
python setup.py nosetests --with-coverage --cover-min-percentage=44
python setup.py nosetests --with-coverage --cover-min-percentage=47
[testenv:lint]
commands =