Major refactor of client and main

This commit is contained in:
James Kasten 2014-12-17 06:27:21 -08:00
parent edbe0f451d
commit 083024cb86
8 changed files with 374 additions and 327 deletions

View file

@ -8,21 +8,18 @@ import socket
import subprocess
import sys
import zope.interface
from letsencrypt.client import augeas_configurator
from letsencrypt.client import challenge_util
from letsencrypt.client import CONFIG
from letsencrypt.client import errors
from letsencrypt.client import interfaces
from letsencrypt.client import le_util
from letsencrypt.client.apache import obj
from letsencrypt.client.apache import parser
# Configurator should be turned into a Singleton
# Note: Apache 2.4 NameVirtualHost directive is deprecated... all vhost twins
# are considered name based vhosts by default. The use of the directive will
# emit a warning.
# TODO: Augeas sections ie. <VirtualHost>, <IfModule> beginning and closing
# tags need to be the same case, otherwise Augeas doesn't recognize them.
# This is not able to be completely remedied by regular expressions because
@ -75,6 +72,8 @@ class ApacheConfigurator(augeas_configurator.AugeasConfigurator):
:ivar dict assoc: Mapping between domains and vhosts
"""
zope.interface.implements(interfaces.IAuthenticator, interfaces.IInstaller)
def __init__(self, server_root=CONFIG.SERVER_ROOT, direc=None,
ssl_options=CONFIG.OPTIONS_SSL_CONF, version=None):
"""Initialize an Apache Configurator.
@ -131,9 +130,6 @@ class ApacheConfigurator(augeas_configurator.AugeasConfigurator):
# if it is desired. There may be instances where correct configuration
# isn't required on startup.
# TODO: This function can be improved to ensure that the final directives
# are being modified whether that be in the include files or in the
# virtualhost declaration - these directives can be overwritten
def deploy_cert(self, vhost, cert, key, cert_chain=None):
"""Deploys certificate to specified virtual host.
@ -765,11 +761,13 @@ class ApacheConfigurator(augeas_configurator.AugeasConfigurator):
return vhost
return None
# TODO - both of these
# TODO: Handle ths as outlined in Interfaces.
def enable_ocsp_stapling(self, ssl_vhost):
"""Enable OCSP Stapling."""
return False
def enable_hsts(self, ssl_vhost):
"""Enable HSTS."""
return False
def get_all_certs_keys(self):
@ -848,7 +846,7 @@ class ApacheConfigurator(augeas_configurator.AugeasConfigurator):
return True
return False
def restart(self, quiet=False): # pylint: disable=no-self-use
def restart(self): # pylint: disable=no-self-use
"""Restarts apache server.
:returns: Success
@ -1009,7 +1007,7 @@ class ApacheConfigurator(augeas_configurator.AugeasConfigurator):
# Save reversible changes and restart the server
self.save("SNI Challenge", True)
self.restart(True)
self.restart()
return responses
@ -1017,7 +1015,7 @@ class ApacheConfigurator(augeas_configurator.AugeasConfigurator):
"""Revert all challenges."""
self.revert_challenge_config()
self.restart(True)
self.restart()
# TODO: Variable names
def dvsni_mod_config(self, list_sni_tuple, dvsni_key,

View file

@ -8,11 +8,10 @@ import time
import augeas
from letsencrypt.client import CONFIG
from letsencrypt.client import configurator
from letsencrypt.client import le_util
class AugeasConfigurator(configurator.Configurator):
class AugeasConfigurator(object):
"""Base Augeas Configurator class.
:ivar aug: Augeas object
@ -289,8 +288,7 @@ class AugeasConfigurator(configurator.Configurator):
for idx, path in enumerate(filepaths):
shutil.copy2(os.path.join(
cp_dir,
os.path.basename(path) + '_' + str(idx)),
path)
os.path.basename(path) + '_' + str(idx)), path)
except (IOError, OSError):
# This file is required in all checkpoints.
logging.error("Unable to recover files from %s", cp_dir)
@ -327,7 +325,7 @@ class AugeasConfigurator(configurator.Configurator):
return True, ""
# pylint: disable=no-self-use
# pylint: disable=no-self-use, anomalous-backslash-in-string
def register_file_creation(self, temporary, *files):
"""Register the creation of all files during letsencrypt execution.

View file

@ -15,7 +15,6 @@ import M2Crypto
import requests
from letsencrypt.client import acme
from letsencrypt.client import apache_configurator
from letsencrypt.client import challenge
from letsencrypt.client import CONFIG
from letsencrypt.client import crypto_util
@ -30,120 +29,77 @@ from letsencrypt.client import le_util
ALLOW_RAW_IPV6_SERVER = False
# TODO: Look up sphinx doc for an interface
class Client(object):
"""ACME protocol client.
:ivar config: Configurator.
:type config: :class:`letsencrypt.client.configurator.Configurator`
:ivar str server: Certificate authority server
:ivar str server_url: Full URL of the CSR server
:ivar csr: Certificate Signing Request
:type csr: :class:`CSR`
:ivar list names: Domain names (:class:`list` of :class:`str`).
:ivar privkey: Private key
:type privkey: :class:`Key`
:ivar auth: Object that supports the IAuthenticator interface.
:type auth: :class:`letsencrypt.client.interfaces.IAuthenticator`
:ivar bool use_curses: Use curses UI
:ivar installer: Object supporting the IInstaller interface.
:type installer: :class:`letsencrypt.client.interfaces.IInstraller`
"""
Key = collections.namedtuple("Key", "file pem")
CSR = collections.namedtuple("CSR", "file data type")
CSR = collections.namedtuple("CSR", "file data form")
def __init__(self, server, csr=CSR(None, None, None),
privkey=Key(None, None), use_curses=True):
def __init__(self, server, names, authkey, auth, installer):
"""Initialize a client."""
self.server = server
self.server_url = "https://%s/acme/" % self.server
self.names = []
self.use_curses = use_curses
self.names = names
self.authkey = authkey
self.csr = csr
self.privkey = privkey
self._validate_csr_key_cli() # TODO: catch exceptions
sanity_check_names([server] + names)
# TODO: Can probably figure out which configurator to use
# without special packaging based on system info Command
# line arg or client function to discover
self.config = apache_configurator.ApacheConfigurator(
CONFIG.SERVER_ROOT)
self.auth = auth
self.installer = installer
def authenticate(self, domains=None, eula=False, redirect=None):
"""
def obtain_certificate(self, privkey, csr,
cert_path=CONFIG.CERT_PATH,
chain_path=CONFIG.CHAIN_PATH):
"""Obtains a certificate from the ACME server.
:param list domains: List of domains
:param bool eula: EULA accepted
.. todo:: Check for case when privkey is not authkey and adjust
this function accordingly.
:param redirect: If traffic should be forwarded from HTTP to HTTPS.
:type redirect: bool or None
:param privkey: A valid private key that corresponds to the csr
:type privkey: :class:`Key`
:raises errors.LetsEncryptClientError: CSR does not contain one of the
specified names.
:param csr: A valid CSR in der format that corresponds to privkey
:type csr: :class:`CSR`
:param str cert_path: Full desired path to end certificate.
:param str chain_path: Full desired path to end chain file.
:returns: cert_file, chain_file (paths to respective files)
:rtype: `tuple` of `str`
"""
domains = [] if domains is None else domains
# Check configuration
if not self.config.config_test():
sys.exit(1)
# Display preview warning
if not eula:
with open('EULA') as eula_file:
if not display.generic_yesno(eula_file.read(),
"Agree", "Cancel"):
sys.exit(0)
# Display screen to select domains to validate
if domains:
sanity_check_names([self.server] + domains)
self.names = domains
else:
# This function adds all names
# found within the config to self.names
# Then filters them based on user selection
code, self.names = display.filter_names(self.get_all_names())
if code == display.OK and self.names:
# TODO: Allow multiple names once it is setup
self.names = [self.names[0]]
else:
sys.exit(0)
# Request Challenges
challenge_msg = self.acme_challenge()
# Make sure we have key and csr to perform challenges
self.init_key_csr()
# TODO: Handle this exception/problem
if not crypto_util.csr_matches_names(self.csr.data, self.names):
raise errors.LetsEncryptClientError(
"CSR subject does not contain one of the specified names")
# Perform Challenges
responses, challenge_objs = self.verify_identity(challenge_msg)
# Get Authorization
self.acme_authorization(challenge_msg, challenge_objs, responses)
# Retrieve certificate
certificate_dict = self.acme_certificate(self.csr.data)
certificate_dict = self.acme_certificate(csr.data)
# Find set of virtual hosts to deploy certificates to
vhost = self.get_virtual_hosts(self.names)
# Install Certificate
cert_file = self.install_certificate(certificate_dict, vhost)
# Perform optimal config changes
self.optimize_config(vhost, redirect)
self.config.save("Completed Let's Encrypt Authentication")
# Save Certificate
cert_file, chain_file = self.save_certificate(
certificate_dict, cert_path, chain_path)
self.store_cert_key(cert_file, False)
return cert_file, chain_file
def acme_challenge(self):
"""Handle ACME "challenge" phase.
@ -161,7 +117,7 @@ class Client(object):
:param dict challenge_msg: ACME "challenge" message.
:param chal_objs: TODO
:param chal_objs: TODO - this will be a new object...
:param responses: TODO
:returns: ACME "authorization" message.
@ -170,7 +126,7 @@ class Client(object):
"""
auth_dict = self.send(acme.authorization_request(
challenge_msg["sessionID"], self.names[0],
challenge_msg["nonce"], responses, self.privkey.pem))
challenge_msg["nonce"], responses, self.authkey.pem))
try:
return self.is_expected_msg(auth_dict, "authorization")
@ -192,7 +148,7 @@ class Client(object):
"""
logging.info("Preparing and sending CSR...")
return self.send_and_receive_expected(
acme.certificate_request(csr_der, self.privkey.pem), "certificate")
acme.certificate_request(csr_der, self.authkey.pem), "certificate")
def acme_revocation(self, cert):
"""Handle ACME "revocation" phase.
@ -280,12 +236,9 @@ class Client(object):
"""Is reponse expected ACME message?
:param dict response: ACME response message from server.
:param str expected: Name of the expected response ACME message type.
:param int delay: Number of seconds to delay before next round
in case of ACME "defer" response message.
:param int rounds: Number of resend attempts in case of ACME "defer"
reponse message.
@ -312,7 +265,7 @@ class Client(object):
else:
logging.fatal("Received unexpected message")
logging.fatal("Expected: %s" % expected)
logging.fatal("Received: " + response)
logging.fatal("Received: %s" % response)
sys.exit(33)
logging.error(
@ -329,7 +282,7 @@ class Client(object):
return
c_sha1_vh = {}
for (cert, _, path) in self.config.get_all_certs_keys():
for (cert, _, path) in self.installer.get_all_certs_keys():
try:
c_sha1_vh[M2Crypto.X509.load_cert(
cert).get_fingerprint(md='sha1')] = path
@ -369,7 +322,7 @@ class Client(object):
"""
code, tag = display.display_certs(certs)
if code == display.OK:
cert = certs[tag]
if display.confirm_revocation(cert):
@ -383,15 +336,21 @@ class Client(object):
else:
exit(0)
def install_certificate(self, certificate_dict, vhost):
"""Install certificate
def save_certificate(self, certificate_dict, cert_path, chain_path):
"""Saves the certificate received from the ACME server.
:returns: Path to a certificate file.
:rtype: str
:param dict certificate_dict: certificate message from server
:param str cert_path: Path to attempt to save the cert file
:param str chain_path: Path to attempt to save the chain file
:returns: cert_file, chain_file (absolute paths to the actual files)
:rtype: `tuple` of `str`
:raises IOError: If unable to find room to write the cert files
"""
cert_chain_abspath = None
cert_fd, cert_file = le_util.unique_file(CONFIG.CERT_PATH, 0o644)
cert_fd, cert_file = le_util.unique_file(cert_path, 0o644)
cert_fd.write(
crypto_util.b64_cert_to_pem(certificate_dict["certificate"]))
cert_fd.close()
@ -399,7 +358,7 @@ class Client(object):
"Server issued certificate; certificate written to %s", cert_file)
if certificate_dict.get("chain", None):
chain_fd, chain_fn = le_util.unique_file(CONFIG.CHAIN_PATH, 0o644)
chain_fd, chain_fn = le_util.unique_file(chain_path, 0o644)
for cert in certificate_dict.get("chain", []):
chain_fd.write(crypto_util.b64_cert_to_pem(cert))
chain_fd.close()
@ -409,26 +368,43 @@ class Client(object):
# This expects a valid chain file
cert_chain_abspath = os.path.abspath(chain_fn)
return os.path.abspath(cert_file), cert_chain_abspath
def deploy_certificate(self, privkey, cert_file, chain_file):
"""Install certificate
:returns: Path to a certificate file.
:rtype: str
"""
# Find set of virtual hosts to deploy certificates to
vhost = self.get_virtual_hosts(self.names)
chain = None if chain_file is None else os.path.abspath(chain_file)
for host in vhost:
self.config.deploy_cert(host,
os.path.abspath(cert_file),
os.path.abspath(self.privkey.file),
cert_chain_abspath)
self.installer.deploy_cert(host,
os.path.abspath(cert_file),
os.path.abspath(privkey.file),
chain)
# Enable any vhost that was issued to, but not enabled
if not host.enabled:
logging.info("Enabling Site %s", host.filep)
self.config.enable_site(host)
self.installer.enable_site(host)
self.installer.save("Deployed Let's Encrypt Certificate")
# sites may have been enabled / final cleanup
self.config.restart(quiet=self.use_curses)
self.installer.restart()
display.success_installation(self.names)
return cert_file
return vhost
def optimize_config(self, vhost, redirect=None):
"""Optimize the configuration.
.. todo:: Handle multiple vhosts
:param vhost: vhost to optimize
:type vhost: :class:`apache_configurator.VH`
@ -436,13 +412,12 @@ class Client(object):
:type redirect: bool or None
"""
# TODO: this should most definitely be moved to __init__
if redirect is None:
redirect = display.redirect_by_default()
if redirect:
self.redirect_to_ssl(vhost)
self.config.restart(quiet=self.use_curses)
self.installer.restart()
# if self.ocsp_stapling is None:
# q = ("Would you like to protect the privacy of your users "
@ -463,7 +438,7 @@ class Client(object):
logging.info("Cleaning up challenges...")
for chall in challenges:
if chall["type"] in CONFIG.CONFIG_CHALLENGES:
self.config.cleanup()
self.auth.cleanup()
else:
# Handle other cleanup if needed
pass
@ -495,11 +470,11 @@ class Client(object):
for i, c_obj in enumerate(challenge_objs):
resp = "null"
if c_obj["type"] in CONFIG.CONFIG_CHALLENGES:
resp = self.config.perform(c_obj)
resp = self.auth.perform(c_obj)
else:
# Handle RecoveryToken type challenges
pass
self._assign_responses(resp, indices[i], responses)
logging.info(
@ -518,14 +493,13 @@ class Client(object):
"""
if isinstance(resp, list):
assert(len(resp) == len(index_list))
assert len(resp) == len(index_list)
for j, index in enumerate(index_list):
responses[index] = resp[j]
else:
else:
for index in index_list:
responses[index] = resp
def store_cert_key(self, cert_file, encrypt=False):
"""Store certificate key.
@ -554,17 +528,17 @@ class Client(object):
for row in csvreader:
idx = int(row[0]) + 1
csvwriter = csv.writer(csvfile)
csvwriter.writerow([str(idx), cert_file, self.privkey.file])
csvwriter.writerow([str(idx), cert_file, self.authkey.file])
else:
with open(list_file, 'wb') as csvfile:
csvwriter = csv.writer(csvfile)
csvwriter.writerow(["0", cert_file, self.privkey.file])
csvwriter.writerow(["0", cert_file, self.authkey.file])
shutil.copy2(self.privkey.file,
shutil.copy2(self.authkey.file,
os.path.join(
CONFIG.CERT_KEY_BACKUP,
os.path.basename(self.privkey.file) + "_" + str(idx)))
os.path.basename(self.authkey.file) + "_" + str(idx)))
shutil.copy2(cert_file,
os.path.join(
CONFIG.CERT_KEY_BACKUP,
@ -576,16 +550,16 @@ class Client(object):
"""Redirect all traffic from HTTP to HTTPS
:param vhost: list of ssl_vhosts
:type vhost: :class:`apache_configurator.VH`
:type vhost: :class:`letsencrypt.client.apache.obj.VH`
"""
for ssl_vh in vhost:
success, redirect_vhost = self.config.enable_redirect(ssl_vh)
success, redirect_vhost = self.installer.enable_redirect(ssl_vh)
logging.info(
"\nRedirect vhost: %s - %s ", redirect_vhost.filep, success)
# If successful, make sure redirect site is enabled
if success:
self.config.enable_site(redirect_vhost)
self.installer.enable_site(redirect_vhost)
def get_virtual_hosts(self, domains):
"""Retrieve the appropriate virtual host for the domain
@ -598,7 +572,7 @@ class Client(object):
"""
vhost = set()
for name in domains:
host = self.config.choose_virtual_host(name)
host = self.installer.choose_virtual_host(name)
if host is not None:
vhost.add(host)
return vhost
@ -651,108 +625,106 @@ class Client(object):
challenge_objs.append({
"type": "dvsni",
"list_sni_tuple": sni_todo,
"dvsni_key": self.privkey,
"dvsni_key": self.authkey,
})
challenge_obj_indices.append(sni_satisfies)
logging.debug(sni_todo)
return challenge_objs, challenge_obj_indices
def init_key_csr(self):
"""Initializes privkey and csr.
Inits key and CSR using provided files or generating new files
if necessary. Both will be saved in PEM format on the
filesystem. The CSR is placed into DER format to allow
the namedtuple to easily work with the protocol.
def validate_key_csr(privkey, csr, names):
"""Validate CSR and key files.
"""
if not self.privkey.file:
key_pem = crypto_util.make_key(CONFIG.RSA_KEY_SIZE)
Verifies that the client key and csr arguments are valid and
correspond to one another.
# Save file
le_util.make_or_verify_dir(CONFIG.KEY_DIR, 0o700)
key_f, key_filename = le_util.unique_file(
os.path.join(CONFIG.KEY_DIR, "key-letsencrypt.pem"), 0o600)
key_f.write(key_pem)
key_f.close()
:raises LetsEncryptClientError: if validation fails
logging.info("Generating key: %s", key_filename)
"""
# TODO: Handle all of these problems appropriately
# The client can eventually do things like prompt the user
# and allow the user to take more appropriate actions
self.privkey = Client.Key(key_filename, key_pem)
if csr.form == "der":
csr_obj = M2Crypto.X509.load_request_der_string(csr.data)
csr = Client.CSR(csr.file, csr_obj.as_pem(), "der")
if not self.csr.file:
csr_pem, csr_der = crypto_util.make_csr(
self.privkey.pem, self.names)
# If CSR is provided, it must be readable and valid.
if csr.data and not crypto_util.valid_csr(csr.data):
raise errors.LetsEncryptClientError(
"The provided CSR is not a valid CSR")
# Save CSR
le_util.make_or_verify_dir(CONFIG.CERT_DIR, 0o755)
csr_f, csr_filename = le_util.unique_file(
os.path.join(CONFIG.CERT_DIR, "csr-letsencrypt.pem"), 0o644)
csr_f.write(csr_pem)
csr_f.close()
# If key is provided, it must be readable and valid.
if privkey.pem and not crypto_util.valid_privkey(privkey.pem):
raise errors.LetsEncryptClientError(
"The provided key is not a valid key")
logging.info("Creating CSR: %s", csr_filename)
self.csr = Client.CSR(csr_filename, csr_der, "der")
elif self.csr.type != "der":
# The user is going to pass in a pem format file
# That is why we must conver it to der since the
# protocol uses der exclusively.
csr_obj = M2Crypto.X509.load_request_string(self.csr.data)
self.csr = Client.CSR(self.csr.file, csr_obj.as_der(), "der")
def _validate_csr_key_cli(self):
"""Validate CSR and key files.
Verifies that the client key and csr arguments are valid and
correspond to one another.
:raises LetsEncryptClientError: if validation fails
"""
# TODO: Handle all of these problems appropriately
# The client can eventually do things like prompt the user
# and allow the user to take more appropriate actions
# If CSR is provided, it must be readable and valid.
if self.csr.data and not crypto_util.valid_csr(self.csr.data):
# If CSR and key are provided, the key must be the same key used
# in the CSR.
if csr.data and privkey.pem:
if not crypto_util.csr_matches_pubkey(
csr.data, privkey.pem):
raise errors.LetsEncryptClientError(
"The provided CSR is not a valid CSR")
"The key and CSR do not match")
# If key is provided, it must be readable and valid.
if (self.privkey.pem and
not crypto_util.valid_privkey(self.privkey.pem)):
raise errors.LetsEncryptClientError(
"The provided key is not a valid key")
if not crypto_util.csr_matches_names(csr.data, names):
raise errors.LetsEncryptClientError(
"CSR subject does not contain one of the specified names")
# If CSR and key are provided, the key must be the same key used
# in the CSR.
if self.csr.data and self.privkey.pem:
if not crypto_util.csr_matches_pubkey(
self.csr.data, self.privkey.pem):
raise errors.LetsEncryptClientError(
"The key and CSR do not match")
def get_all_names(self):
"""Return all valid names in the configuration."""
names = list(self.config.get_all_names())
sanity_check_names(names)
def init_key():
"""Initializes privkey.
if not names:
logging.fatal("No domain names were found in your apache config")
logging.fatal("Either specify which names you would like "
"letsencrypt to validate or add server names "
"to your virtual hosts")
sys.exit(1)
Inits key and CSR using provided files or generating new files
if necessary. Both will be saved in PEM format on the
filesystem. The CSR is placed into DER format to allow
the namedtuple to easily work with the protocol.
return names
"""
key_pem = crypto_util.make_key(CONFIG.RSA_KEY_SIZE)
# Save file
le_util.make_or_verify_dir(CONFIG.KEY_DIR, 0o700)
key_f, key_filename = le_util.unique_file(
os.path.join(CONFIG.KEY_DIR, "key-letsencrypt.pem"), 0o600)
key_f.write(key_pem)
key_f.close()
logging.info("Generating key: %s", key_filename)
return Client.Key(key_filename, key_pem)
def init_csr(privkey, names):
"""Initialize a CSR with the given private key."""
csr_pem, csr_der = crypto_util.make_csr(
privkey.pem, names)
# Save CSR
le_util.make_or_verify_dir(CONFIG.CERT_DIR, 0o755)
csr_f, csr_filename = le_util.unique_file(
os.path.join(CONFIG.CERT_DIR, "csr-letsencrypt.pem"), 0o644)
csr_f.write(csr_pem)
csr_f.close()
logging.info("Creating CSR: %s", csr_filename)
return Client.CSR(csr_filename, csr_der, "der")
def csr_pem_to_der(csr):
"""Convert pem CSR to der."""
csr_obj = M2Crypto.X509.load_request_string(csr.data)
return Client.CSR(csr.file, csr_obj.as_der(), "der")
def remove_cert_key(cert):
"""Remove certificate key.
"""Remove certificate and key.
:param dict cert:
:param dict cert: Cert dict used throughout revocation
"""
list_file = os.path.join(CONFIG.CERT_KEY_BACKUP, "LIST")

View file

@ -1,98 +0,0 @@
"""Configurator."""
class Configurator(object):
"""Generic Let's Encrypt configurator.
Class represents all possible webservers and configuration editors
This includes the generic webserver which wont have configuration
files at all, but instead create a new process to handle the DVSNI
and other challenges.
"""
def deploy_cert(self, vhost, cert, key, cert_chain=None):
"""Deploy certificate.
:param vhost
:param str cert: CSR
:param str key: Private key
"""
raise NotImplementedError()
def choose_virtual_host(self, name):
"""Chooses a virtual host based on a given domain name."""
raise NotImplementedError()
def get_all_names(self):
"""Returns all names found in the configuration."""
raise NotImplementedError()
def enable_redirect(self, ssl_vhost):
"""Redirect all traffic to the given ssl_vhost (port 80 => 443)."""
raise NotImplementedError()
def enable_hsts(self, ssl_vhost):
"""Enable HSTS on the given ssl_vhost."""
raise NotImplementedError()
def enable_ocsp_stapling(self, ssl_vhost):
"""Enable OCSP stapling on given ssl_vhost."""
raise NotImplementedError()
def get_all_certs_keys(self):
"""Retrieve all certs and keys set in configuration.
:returns: List of tuples with form [(cert, key, path)].
:rtype: list
"""
raise NotImplementedError()
def enable_site(self, vhost):
"""Enable the site at the given vhost."""
raise NotImplementedError()
def save(self, title=None, temporary=False):
"""Saves all changes to the configuration files.
Both title and temporary are needed because a save may be
intended to be permanent, but the save is not ready to be a full
checkpoint
:param str title: The title of the save. If a title is given, the
configuration will be saved as a new checkpoint and put in a
timestamped directory. `title` has no effect if temporary is true.
:param bool temporary: Indicates whether the changes made will
be quickly reversed in the future (challenges)
"""
raise NotImplementedError()
def revert_challenge_config(self):
"""Reload the users original configuration files."""
raise NotImplementedError()
def rollback_checkpoints(self, rollback=1):
"""Revert `rollback` number of configuration checkpoints."""
raise NotImplementedError()
def display_checkpoints(self):
"""Display the saved configuration checkpoints."""
raise NotImplementedError()
def config_test(self):
"""Make sure the configuration is valid."""
raise NotImplementedError()
def restart(self, quiet=False):
"""Restart or refresh the server content."""
raise NotImplementedError()
def perform(self, chall_dict):
"""Perform the given challenge"""
raise NotImplementedError()
def cleanup(self):
"""Cleanup configuration changes from challenge."""
raise NotImplementedError()

View file

@ -15,8 +15,6 @@ from letsencrypt.client import CONFIG
from letsencrypt.client import le_util
# TODO: All of these functions need unit tests
def b64_cert_to_pem(b64_der_cert):
return M2Crypto.X509.load_cert_der_string(
le_util.jose_b64decode(b64_der_cert)).as_pem()
@ -55,8 +53,8 @@ def create_sig(msg, key_str, nonce=None, nonce_len=CONFIG.NONCE_SIZE):
logging.debug('%s signed as %s', msg_with_nonce, signature)
n_bytes = binascii.unhexlify(leading_zeros(hex(key.n)[2:].rstrip("L")))
e_bytes = binascii.unhexlify(leading_zeros(hex(key.e)[2:].rstrip("L")))
n_bytes = binascii.unhexlify(_leading_zeros(hex(key.n)[2:].rstrip("L")))
e_bytes = binascii.unhexlify(_leading_zeros(hex(key.e)[2:].rstrip("L")))
return {
"nonce": le_util.jose_b64encode(nonce),
@ -70,7 +68,7 @@ def create_sig(msg, key_str, nonce=None, nonce_len=CONFIG.NONCE_SIZE):
}
def leading_zeros(arg):
def _leading_zeros(arg):
if len(arg) % 2:
return "0" + arg
return arg
@ -82,9 +80,8 @@ def sha256(arg):
# based on M2Crypto unit test written by Toby Allsopp
def make_key(bits=CONFIG.RSA_KEY_SIZE):
"""
Returns new RSA key in PEM form with specified bits
"""
"""Returns new RSA key in PEM form with specified bits."""
# Python Crypto module doesn't produce any stdout
key = Crypto.PublicKey.RSA.generate(bits)
# rsa = M2Crypto.RSA.gen_key(bits, 65537)
@ -229,7 +226,7 @@ def csr_matches_names(csr, domains):
M2Crypto currently does not expose the OpenSSL interface to
also check the SAN extension. This is insufficient for full testing
:param str csr: CSR file contents
:param str csr: CSR file contents in pem form
:param list domains: Domains the CSR should contain.
@ -238,7 +235,7 @@ def csr_matches_names(csr, domains):
"""
try:
csr_obj = M2Crypto.X509.load_request_der_string(csr)
csr_obj = M2Crypto.X509.load_request_string(csr)
return csr_obj.get_subject().CN in domains
except M2Crypto.X509.X509Error:
return False

View file

@ -0,0 +1,92 @@
"""Interfaces."""
import zope.interface
class IAuthenticator(zope.interface.Interface):
"""Generic Let's Encrypt Authenticator.
Class represents all possible tools processes that have the
ability to perform challenges and attain a certificate.
"""
def perform(chall_dict):
"""Perform the given challenge"""
def cleanup():
"""Revert changes and shutdown after challenges complete."""
class IInstaller(zope.interface.Interface):
"""Generic Let's Encrypt Installer Interface.
Represents any server that an X509 certificate can be placed.
With a focus on HTTPS optimizations.
.. todo:: All optimizations should be of the form .enable("hsts")
This will make it general towards any optimization... we should also
define a function to glean what optimizations are available.
Perhaps with text that describes the optimizations...
"""
def get_all_names():
"""Returns all names that may be authenticated."""
def deploy_cert(vhost, cert, key, cert_chain=None):
"""Deploy certificate.
:param vhost
:param str cert: CSR
:param str key: Private key
"""
def choose_virtual_host(name):
"""Chooses a virtual host based on a given domain name."""
def enable_redirect(ssl_vhost):
"""Redirect all traffic to the given ssl_vhost (port 80 => 443)."""
def enable_hsts(ssl_vhost):
"""Enable HSTS on the given ssl_vhost."""
def enable_ocsp_stapling(ssl_vhost):
"""Enable OCSP stapling on given ssl_vhost."""
def get_all_certs_keys():
"""Retrieve all certs and keys set in configuration.
:returns: List of tuples with form [(cert, key, path)].
:rtype: list
"""
def enable_site(vhost):
"""Enable the site at the given vhost."""
def save(title=None, temporary=False):
"""Saves all changes to the configuration files.
Both title and temporary are needed because a save may be
intended to be permanent, but the save is not ready to be a full
checkpoint
:param str title: The title of the save. If a title is given, the
configuration will be saved as a new checkpoint and put in a
timestamped directory. `title` has no effect if temporary is true.
:param bool temporary: Indicates whether the changes made will
be quickly reversed in the future (challenges)
"""
def rollback_checkpoints(rollback=1):
"""Revert `rollback` number of configuration checkpoints."""
def display_checkpoints():
"""Display the saved configuration checkpoints."""
def config_test():
"""Make sure the configuration is valid."""
def restart():
"""Restart or refresh the server content."""

View file

@ -5,10 +5,14 @@ import logging
import os
import sys
import zope.interface
from letsencrypt.client import apache_configurator
from letsencrypt.client import CONFIG
from letsencrypt.client import client
from letsencrypt.client import display
from letsencrypt.client import interfaces
from letsencrypt.client import errors
from letsencrypt.client import log
@ -87,21 +91,105 @@ def main():
server = args.server is None and CONFIG.ACME_SERVER or args.server
if not args.eula:
display_eula()
auth = determine_authenticator()
# Use the same object if possible
if interfaces.IInstaller.providedBy(auth):
installer = auth
else:
installer = determine_installer()
domains = choose_names(installer) if args.domains is None else args.domains
# Prepare for init of Client
if args.privkey is None:
privkey = client.Client.Key(None, None)
privkey = client.init_key()
else:
privkey = client.Client.Key(args.privkey[0], args.privkey[1])
if args.csr is None:
csr = client.Client.CSR(None, None, None)
csr = client.init_csr(privkey, domains)
else:
csr = client.Client.CSR(args.csr[0], args.csr[1], "pem")
csr = client.csr_pem_to_der(
client.Client.CSR(args.csr[0], args.csr[1], "pem"))
acme = client.Client(server, csr, privkey, args.use_curses)
acme = client.Client(server, domains, privkey, auth, installer)
if args.revoke:
acme.list_certs_keys()
else:
acme.authenticate(args.domains, args.eula, args.redirect)
# Validate the key and csr
client.validate_key_csr(privkey, csr, domains)
cert_file, chain_file = acme.obtain_certificate(privkey, csr)
vhost = acme.deploy_certificate(privkey, cert_file, chain_file)
acme.optimize_config(vhost, args.redirect)
def display_eula():
"""Displays the end user agreement."""
with open('EULA') as eula_file:
if not display.generic_yesno(
eula_file.read(), "Agree", "Cancel"):
sys.exit(0)
def choose_names(installer):
"""Display screen to select domains to validate.
:param installer: An installer object
:type installer: :class:`letsencrypt.client.interfaces.IInstaller`
"""
# This function adds all names
# found within the config to self.names
# Then filters them based on user selection
code, names = display.filter_names(get_all_names(installer))
if code == display.OK and names:
# TODO: Allow multiple names once it is setup
return [names[0]]
else:
sys.exit(0)
def get_all_names(installer):
"""Return all valid names in the configuration.
:param installer: An installer object
:type installer: :class:`letsencrypt.client.interfaces.IInstaller`
"""
names = list(installer.get_all_names())
client.sanity_check_names(names)
if not names:
logging.fatal("No domain names were found in your installation")
logging.fatal("Either specify which names you would like "
"letsencrypt to validate or add server names "
"to your virtual hosts")
sys.exit(1)
return names
# This should be controlled by commandline parameters
def determine_authenticator():
"""Returns a valid authenticator."""
try:
return apache_configurator.ApacheConfigurator()
except errors.LetsEncryptConfiguratorError:
log.info("Unable to find a way to authenticate.")
def determine_installer():
"""Returns a valid installer if one exists."""
try:
return apache_configurator.ApacheConfigurator()
except errors.LetsEncryptConfiguratorError:
log.info("Unable to find a way to install the certificate.")
def read_file(filename):
@ -143,6 +231,5 @@ def view_checkpoints(config):
"""
config.display_checkpoints()
if __name__ == "__main__":
main()

View file

@ -11,6 +11,7 @@ install_requires = [
'python-augeas',
'python2-pythondialog',
'requests',
'zope.interface',
]
docs_extras = [