Merge pull request #593 from kuba/account-resource-json

Rewrite accounts and registration
This commit is contained in:
James Kasten 2015-07-10 00:37:17 -07:00
commit cb3863b5fd
39 changed files with 935 additions and 836 deletions

View file

@ -510,7 +510,8 @@ class ClientNetworkWithMockedResponseTest(unittest.TestCase):
def test_head(self):
self.assertEqual(self.response, self.net.head('url', 'foo', bar='baz'))
self.send_request.assert_called_once('HEAD', 'url', 'foo', bar='baz')
self.send_request.assert_called_once_with(
'HEAD', 'url', 'foo', bar='baz')
def test_get(self):
self.assertEqual(self.checked_response, self.net.get(

View file

@ -182,30 +182,6 @@ class Registration(interfaces.ClientRequestableResource, ResourceBody):
"""All emails found in the ``contact`` field."""
return self._filter_contact(self.email_prefix)
@property
def phone(self):
"""Phone.
Picks any phone from `phones` or ``None`` if not available.
"""
try:
return self.phones[0]
except IndexError:
return None
@property
def email(self):
"""Email.
Picks any email from `emails` or ``None`` if not available.
"""
try:
return self.emails[0]
except IndexError:
return None
class RegistrationResource(interfaces.ClientRequestableResource,
ResourceWithURI):

View file

@ -147,18 +147,6 @@ class RegistrationTest(unittest.TestCase):
def test_emails(self):
self.assertEqual(('admin@foo.com',), self.reg.emails)
def test_phone(self):
self.assertEqual('1234', self.reg.phone)
def test_phone_none(self):
self.assertTrue(self.reg_none.phone is None)
def test_email(self):
self.assertEqual('admin@foo.com', self.reg.email)
def test_email_none(self):
self.assertTrue(self.reg_none.email is None)
def test_to_partial_json(self):
self.assertEqual(self.jobj_to, self.reg.to_partial_json())

View file

@ -1,5 +0,0 @@
:mod:`letsencrypt.network`
--------------------------
.. automodule:: letsencrypt.network
:members:

View file

@ -1,233 +1,203 @@
"""Creates ACME accounts for server."""
import datetime
import hashlib
import logging
import os
import re
import socket
import configobj
from cryptography.hazmat.primitives import serialization
import pyrfc3339
import pytz
import zope.component
from acme import fields as acme_fields
from acme import jose
from acme import messages
from letsencrypt import crypto_util
from letsencrypt import errors
from letsencrypt import interfaces
from letsencrypt import le_util
from letsencrypt.display import util as display_util
logger = logging.getLogger(__name__)
class Account(object):
class Account(object): # pylint: disable=too-few-public-methods
"""ACME protocol registration.
:ivar config: Client configuration object
:type config: :class:`~letsencrypt.interfaces.IConfig`
:ivar key: Account/Authorized Key
:type key: :class:`~letsencrypt.le_util.Key`
:ivar str email: Client's email address
:ivar str phone: Client's phone number
:ivar regr: Registration Resource
:type regr: :class:`~acme.messages.RegistrationResource`
:ivar .RegistrationResource regr: Registration Resource
:ivar .JWK key: Authorized Account Key
:ivar .Meta: Account metadata
:ivar str id: Globally unique account identifier.
"""
# Just make sure we don't get pwned
# Make sure that it also doesn't start with a period or have two consecutive
# periods <- this needs to be done in addition to the regex
EMAIL_REGEX = re.compile("[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+$")
class Meta(jose.JSONObjectWithFields):
"""Account metadata
def __init__(self, config, key, email=None, phone=None, regr=None):
le_util.make_or_verify_dir(
config.accounts_dir, 0o700, os.geteuid())
self.key = key
self.config = config
if email is not None and self.safe_email(email):
self.email = email
else:
self.email = None
self.phone = phone
:ivar datetime.datetime creation_dt: Creation date and time (UTC).
:ivar str creation_host: FQDN of host, where account has been created.
self.regr = regr
@property
def uri(self):
"""URI link for new registrations."""
if self.regr is not None:
return self.regr.uri
else:
return None
@property
def new_authzr_uri(self): # pylint: disable=missing-docstring
if self.regr is not None:
return self.regr.new_authzr_uri
else:
return None
@property
def terms_of_service(self): # pylint: disable=missing-docstring
if self.regr is not None:
return self.regr.terms_of_service
else:
return None
@property
def recovery_token(self): # pylint: disable=missing-docstring
if self.regr is not None and self.regr.body is not None:
return self.regr.body.recovery_token
else:
return None
def save(self):
"""Save account to disk."""
le_util.make_or_verify_dir(
self.config.accounts_dir, 0o700, os.geteuid())
acc_config = configobj.ConfigObj()
acc_config.filename = os.path.join(
self.config.accounts_dir, self._get_config_filename(self.email))
acc_config.initial_comment = [
"DO NOT EDIT THIS FILE",
"Account information for %s under %s" % (
self._get_config_filename(self.email), self.config.server),
]
acc_config["key"] = self.key.file
acc_config["phone"] = self.phone
if self.regr is not None:
acc_config["RegistrationResource"] = {}
acc_config["RegistrationResource"]["uri"] = self.uri
acc_config["RegistrationResource"]["new_authzr_uri"] = (
self.new_authzr_uri)
acc_config["RegistrationResource"]["terms_of_service"] = (
self.terms_of_service)
regr_dict = self.regr.body.to_json()
acc_config["RegistrationResource"]["body"] = regr_dict
acc_config.write()
@classmethod
def _get_config_filename(cls, email):
return email if email is not None and email else "default"
@classmethod
def from_existing_account(cls, config, email=None):
"""Populate an account from an existing email."""
config_fp = os.path.join(
config.accounts_dir, cls._get_config_filename(email))
return cls._from_config_fp(config, config_fp)
@classmethod
def _from_config_fp(cls, config, config_fp):
try:
acc_config = configobj.ConfigObj(
infile=config_fp, file_error=True, create_empty=False)
except IOError:
raise errors.Error(
"Account for %s does not exist" % os.path.basename(config_fp))
if os.path.basename(config_fp) != "default":
email = os.path.basename(config_fp)
else:
email = None
phone = acc_config["phone"] if acc_config["phone"] != "None" else None
with open(acc_config["key"]) as key_file:
key = le_util.Key(acc_config["key"], key_file.read())
if "RegistrationResource" in acc_config:
acc_config_rr = acc_config["RegistrationResource"]
regr = messages.RegistrationResource(
uri=acc_config_rr["uri"],
new_authzr_uri=acc_config_rr["new_authzr_uri"],
terms_of_service=acc_config_rr["terms_of_service"],
body=messages.Registration.from_json(acc_config_rr["body"]))
else:
regr = None
return cls(config, key, email, phone, regr)
@classmethod
def get_accounts(cls, config):
"""Return all current accounts.
:param config: Configuration
:type config: :class:`letsencrypt.interfaces.IConfig`
.. note:: ``creation_dt`` and ``creation_host`` are useful in
cross-machine migration scenarios.
"""
creation_dt = acme_fields.RFC3339Field("creation_dt")
creation_host = jose.Field("creation_host")
def __init__(self, regr, key, meta=None):
self.key = key
self.regr = regr
self.meta = self.Meta(
# pyrfc3339 drops microseconds, make sure __eq__ is sane
creation_dt=datetime.datetime.now(
tz=pytz.UTC).replace(microsecond=0),
creation_host=socket.getfqdn()) if meta is None else meta
self.id = hashlib.md5( # pylint: disable=invalid-name
self.key.key.public_key().public_bytes(
encoding=serialization.Encoding.DER,
format=serialization.PublicFormat.SubjectPublicKeyInfo)
).hexdigest()
# Implementation note: Email? Multiple accounts can have the
# same email address. Registration URI? Assigned by the
# server, not guaranteed to be stable over time, nor
# cannonical URI can be generated. ACME protocol doesn't allow
# account key (and thus its fingerprint) to be updated...
@property
def slug(self):
"""Short account identification string, useful for UI."""
return "{1}@{0} ({2})".format(pyrfc3339.generate(
self.meta.creation_dt), self.meta.creation_host, self.id[:4])
def __repr__(self):
return "<{0}({1})>".format(self.__class__.__name__, self.id)
def __eq__(self, other):
return (isinstance(other, self.__class__) and
self.key == other.key and self.regr == other.regr and
self.meta == other.meta)
def report_new_account(acc, config):
"""Informs the user about their new Let's Encrypt account."""
reporter = zope.component.queryUtility(interfaces.IReporter)
if reporter is None:
return
reporter.add_message(
"Your account credentials have been saved in your Let's Encrypt "
"configuration directory at {0}. You should make a secure backup "
"of this folder now. This configuration directory will also "
"contain certificates and private keys obtained by Let's Encrypt "
"so making regular backups of this folder is ideal.".format(
config.config_dir),
reporter.MEDIUM_PRIORITY, True)
assert acc.regr.body.recovery_token is not None
recovery_msg = ("If you lose your account credentials, you can recover "
"them using the token \"{0}\". You must write that down "
"and put it in a safe place.".format(
acc.regr.body.recovery_token))
if acc.regr.body.emails:
recovery_msg += (" Another recovery method will be e-mails sent to "
"{0}.".format(", ".join(acc.regr.body.emails)))
reporter.add_message(recovery_msg, reporter.HIGH_PRIORITY, True)
class AccountMemoryStorage(interfaces.AccountStorage):
"""In-memory account strage."""
def __init__(self, initial_accounts=None):
self.accounts = initial_accounts if initial_accounts is not None else {}
def find_all(self):
return self.accounts.values()
def save(self, account):
if account.id in self.accounts:
logger.debug("Overwriting account: %s", account.id)
self.accounts[account.id] = account
def load(self, account_id):
try:
filenames = os.listdir(config.accounts_dir)
return self.accounts[account_id]
except KeyError:
raise errors.AccountNotFound(account_id)
class AccountFileStorage(interfaces.AccountStorage):
"""Accounts file storage.
:ivar .IConfig config: Client configuration
"""
def __init__(self, config):
le_util.make_or_verify_dir(config.accounts_dir, 0o700, os.geteuid())
self.config = config
def _account_dir_path(self, account_id):
return os.path.join(self.config.accounts_dir, account_id)
@classmethod
def _regr_path(cls, account_dir_path):
return os.path.join(account_dir_path, "regr.json")
@classmethod
def _key_path(cls, account_dir_path):
return os.path.join(account_dir_path, "private_key.json")
@classmethod
def _metadata_path(cls, account_dir_path):
return os.path.join(account_dir_path, "meta.json")
def find_all(self):
try:
candidates = os.listdir(self.config.accounts_dir)
except OSError:
return []
accounts = []
for name in filenames:
# Not some directory ie. keys
config_fp = os.path.join(config.accounts_dir, name)
if os.path.isfile(config_fp):
accounts.append(cls._from_config_fp(config, config_fp))
for account_id in candidates:
try:
accounts.append(self.load(account_id))
except errors.AccountStorageError:
logger.debug("Account loading problem", exc_info=True)
return accounts
@classmethod
def from_prompts(cls, config):
"""Generate an account from prompted user input.
def load(self, account_id):
account_dir_path = self._account_dir_path(account_id)
if not os.path.isdir(account_dir_path):
raise errors.AccountNotFound(
"Account at %s does not exist" % account_dir_path)
:param config: Configuration
:type config: :class:`letsencrypt.interfaces.IConfig`
try:
with open(self._regr_path(account_dir_path)) as regr_file:
regr = messages.RegistrationResource.json_loads(regr_file.read())
with open(self._key_path(account_dir_path)) as key_file:
key = jose.JWK.json_loads(key_file.read())
with open(self._metadata_path(account_dir_path)) as metadata_file:
meta = Account.Meta.json_loads(metadata_file.read())
except IOError as error:
raise errors.AccountStorageError(error)
:returns: Account or None
:rtype: :class:`letsencrypt.account.Account`
acc = Account(regr, key, meta)
if acc.id != account_id:
raise errors.AccountStorageError(
"Account ids mismatch (expected: {0}, found: {1}".format(
account_id, acc.id))
return acc
"""
while True:
code, email = zope.component.getUtility(interfaces.IDisplay).input(
"Enter email address")
if code == display_util.OK:
try:
return cls.from_email(config, email)
except errors.Error:
continue
else:
return None
@classmethod
def from_email(cls, config, email):
"""Generate a new account from an email address.
:param config: Configuration
:type config: :class:`letsencrypt.interfaces.IConfig`
:param str email: Email address
:raises .errors.Error: If invalid email address is given.
"""
if not email or cls.safe_email(email):
email = email if email else None
le_util.make_or_verify_dir(
config.account_keys_dir, 0o700, os.geteuid())
key = crypto_util.init_save_key(
config.rsa_key_size, config.account_keys_dir,
cls._get_config_filename(email))
return cls(config, key, email)
raise errors.Error("Invalid email address.")
@classmethod
def safe_email(cls, email):
"""Scrub email address before using it."""
if cls.EMAIL_REGEX.match(email):
return not email.startswith(".") and ".." not in email
else:
logger.warn("Invalid email address: %s.", email)
return False
def save(self, account):
account_dir_path = self._account_dir_path(account.id)
le_util.make_or_verify_dir(account_dir_path, 0o700, os.geteuid())
try:
with open(self._regr_path(account_dir_path), "w") as regr_file:
regr_file.write(account.regr.json_dumps())
with le_util.safe_open(self._key_path(account_dir_path),
"w", chmod=0o400) as key_file:
key_file.write(account.key.json_dumps())
with open(self._metadata_path(account_dir_path), "w") as metadata_file:
metadata_file.write(account.meta.json_dumps())
except IOError as error:
raise errors.AccountStorageError(error)

View file

@ -57,7 +57,7 @@ class DVSNI(AnnotatedChallenge):
"""
response = challenges.DVSNIResponse(s=s)
cert_pem = crypto_util.make_ss_cert(self.key.pem, [
cert_pem = crypto_util.make_ss_cert(self.key, [
self.domain, self.nonce_domain, response.z_domain(self.challb)])
return cert_pem, response

View file

@ -28,9 +28,7 @@ class AuthHandler(object):
:class:`~acme.challenges.ContinuityChallenge` types
:type cont_auth: :class:`letsencrypt.interfaces.IAuthenticator`
:ivar network: Network object for sending and receiving authorization
messages
:type network: :class:`letsencrypt.network.Network`
:ivar acme.client.Client acme: ACME client API.
:ivar account: Client's Account
:type account: :class:`letsencrypt.account.Account`
@ -43,10 +41,10 @@ class AuthHandler(object):
form of :class:`letsencrypt.achallenges.AnnotatedChallenge`
"""
def __init__(self, dv_auth, cont_auth, network, account):
def __init__(self, dv_auth, cont_auth, acme, account):
self.dv_auth = dv_auth
self.cont_auth = cont_auth
self.network = network
self.acme = acme
self.account = account
self.authzr = dict()
@ -71,8 +69,8 @@ class AuthHandler(object):
"""
for domain in domains:
self.authzr[domain] = self.network.request_domain_challenges(
domain, self.account.new_authzr_uri)
self.authzr[domain] = self.acme.request_domain_challenges(
domain, self.account.regr.new_authzr_uri)
self._choose_challenges(domains)
@ -157,7 +155,7 @@ class AuthHandler(object):
for achall, resp in itertools.izip(achalls, resps):
# Don't send challenges for None and False authenticator responses
if resp:
self.network.answer_challenge(achall.challb, resp)
self.acme.answer_challenge(achall.challb, resp)
# TODO: answer_challenge returns challr, with URI,
# that can be used in _find_updated_challr
# comparisons...
@ -211,7 +209,7 @@ class AuthHandler(object):
completed = []
failed = []
self.authzr[domain], _ = self.network.poll(self.authzr[domain])
self.authzr[domain], _ = self.acme.poll(self.authzr[domain])
if self.authzr[domain].body.status == messages.STATUS_VALID:
return achalls, []

View file

@ -6,6 +6,7 @@ import functools
import logging
import logging.handlers
import os
import pkg_resources
import sys
import time
import traceback
@ -76,23 +77,6 @@ More detailed help:
"""
def _account_init(args, config):
# Prepare for init of Client
if args.email is None:
return client.determine_account(config)
else:
try:
# The way to get the default would be args.email = ""
# First try existing account
return account.Account.from_existing_account(config, args.email)
except errors.Error:
try:
# Try to make an account based on the email address
return account.Account.from_email(config, args.email)
except errors.Error:
return None
def _find_domains(args, installer):
if args.domains is None:
domains = display_ops.choose_names(installer)
@ -106,30 +90,77 @@ def _find_domains(args, installer):
return domains
def _init_acme(config, acc, authenticator, installer):
acme = client.Client(config, acc, authenticator, installer)
def _determine_account(args, config):
"""Determine which account to use.
# Validate the key and csr
client.validate_key_csr(acc.key)
In order to make the renewer (configuration de/serialization) happy,
if ``args.account`` is ``None``, it will be updated based on the
user input. Same for ``args.email``.
:param argparse.Namespace args: CLI arguments
:param letsencrypt.interface.IConfig config: Configuration object
:param .AccountStorage account_storage: Account storage.
:returns: Account and optionally ACME client API (biproduct of new
registration).
:rtype: `tuple` of `letsencrypt.account.Account` and
`acme.client.Client`
"""
account_storage = account.AccountFileStorage(config)
acme = None
if args.account is not None:
acc = account_storage.load(args.account)
else:
accounts = account_storage.find_all()
if len(accounts) > 1:
acc = display_ops.choose_account(accounts)
elif len(accounts) == 1:
acc = accounts[0]
else: # no account registered yet
if args.email is None:
args.email = display_ops.get_email()
if not args.email: # get_email might return ""
args.email = None
def _tos_cb(regr):
if args.tos:
return True
msg = ("Please read the Terms of Service at {0}. You "
"must agree in order to register with the ACME "
"server at {1}".format(
regr.terms_of_service, config.server))
return zope.component.getUtility(interfaces.IDisplay).yesno(
msg, "Agree", "Cancel")
if authenticator is not None:
if acc.regr is None:
try:
acme.register()
acc, acme = client.register(
config, account_storage, tos_cb=_tos_cb)
except errors.Error as error:
logger.debug(error)
raise errors.Error("Unable to register an account with ACME "
"server")
logger.debug(error, exc_info=True)
raise errors.Error(
"Unable to register an account with ACME server")
return acme
args.account = acc.id
return acc, acme
def _init_le_client(args, config, authenticator, installer):
if authenticator is not None:
# if authenticator was given, then we will need account...
acc, acme = _determine_account(args, config)
logger.debug("Picked account: %r", acc)
# XXX
#crypto_util.validate_key_csr(acc.key)
else:
acc, acme = None, None
return client.Client(config, acc, authenticator, installer, acme=acme)
def run(args, config, plugins):
"""Obtain a certificate and install."""
acc = _account_init(args, config)
if acc is None:
return None
if args.configurator is not None and (args.installer is not None or
args.authenticator is not None):
return ("Either --configurator or --authenticator/--installer"
@ -150,14 +181,15 @@ def run(args, config, plugins):
return "Configurator could not be determined"
domains = _find_domains(args, installer)
# TODO: Handle errors from _init_acme?
acme = _init_acme(config, acc, authenticator, installer)
lineage = acme.obtain_and_enroll_certificate(
# TODO: Handle errors from _init_le_client?
le_client = _init_le_client(args, config, authenticator, installer)
lineage = le_client.obtain_and_enroll_certificate(
domains, authenticator, installer, plugins)
if not lineage:
return "Certificate could not be obtained"
acme.deploy_certificate(domains, lineage.privkey, lineage.cert, lineage.chain)
acme.enhance_config(domains, args.redirect)
le_client.deploy_certificate(
domains, lineage.privkey, lineage.cert, lineage.chain)
le_client.enhance_config(domains, args.redirect)
def auth(args, config, plugins):
@ -169,10 +201,6 @@ def auth(args, config, plugins):
# supplied, check if CSR matches given domains?
return "--domains and --csr are mutually exclusive"
acc = _account_init(args, config)
if acc is None:
return None
authenticator = display_ops.pick_authenticator(
config, args.authenticator, plugins)
if authenticator is None:
@ -183,16 +211,17 @@ def auth(args, config, plugins):
else:
installer = None
# TODO: Handle errors from _init_acme?
acme = _init_acme(config, acc, authenticator, installer)
# TODO: Handle errors from _init_le_client?
le_client = _init_le_client(args, config, authenticator, installer)
if args.csr is not None:
certr, chain = acme.obtain_certificate_from_csr(le_util.CSR(
certr, chain = le_client.obtain_certificate_from_csr(le_util.CSR(
file=args.csr[0], data=args.csr[1], form="der"))
acme.save_certificate(certr, chain, args.cert_path, args.chain_path)
le_client.save_certificate(
certr, chain, args.cert_path, args.chain_path)
else:
domains = _find_domains(args, installer)
if not acme.obtain_and_enroll_certificate(
if not le_client.obtain_and_enroll_certificate(
domains, authenticator, installer, plugins):
return "Certificate could not be obtained"
@ -200,18 +229,16 @@ def auth(args, config, plugins):
def install(args, config, plugins):
"""Install a previously obtained cert in a server."""
# XXX: Update for renewer/RenewableCert
acc = _account_init(args, config)
if acc is None:
return None
installer = display_ops.pick_installer(config, args.installer, plugins)
if installer is None:
return "Installer could not be determined"
domains = _find_domains(args, installer)
acme = _init_acme(config, acc, authenticator=None, installer=installer)
le_client = _init_le_client(
args, config, authenticator=None, installer=installer)
assert args.cert_path is not None # required=True in the subparser
acme.deploy_certificate(domains, args.key_path, args.cert_path, args.chain_path)
acme.enhance_config(domains, args.redirect)
le_client.deploy_certificate(
domains, args.key_path, args.cert_path, args.chain_path)
le_client.enhance_config(domains, args.redirect)
def revoke(args, unused_config, unused_plugins):
@ -468,8 +495,14 @@ def create_parser(plugins, args):
"automation", "--no-confirm", dest="no_confirm", action="store_true",
help="Turn off confirmation screens, currently used for --revoke")
helpful.add(
"automation", "--agree-eula", "-e", dest="tos", action="store_true",
"automation", "--agree-eula", dest="eula", action="store_true",
help="Agree to the Let's Encrypt Developer Preview EULA")
helpful.add(
"automation", "--agree-tos", dest="tos", action="store_true",
help="Agree to the Let's Encrypt Subscriber Agreement")
helpful.add(
"automation", "--account", metavar="ACCOUNT_ID",
help="Account ID to use")
helpful.add_group(
"testing", description="The following flags are meant for "
@ -724,6 +757,13 @@ def main(cli_args=sys.argv[1:]):
zope.component.provideUtility(report)
atexit.register(report.atexit_print_messages)
# TODO: remove developer EULA prompt for the launch
if not config.eula:
eula = pkg_resources.resource_string("letsencrypt", "EULA")
if not zope.component.getUtility(interfaces.IDisplay).yesno(
eula, "Agree", "Cancel"):
raise errors.Error("Must agree to TOS")
if not os.geteuid() == 0:
logger.warning(
"Root (sudo) is required to run most of letsencrypt functionality.")

View file

@ -1,13 +1,15 @@
"""ACME protocol client class and helper functions."""
"""Let's Encrypt client API."""
import logging
import os
import pkg_resources
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric import rsa
import OpenSSL
import zope.component
from acme import client as acme_client
from acme import jose
from acme.jose import jwk
from acme import messages
from letsencrypt import account
from letsencrypt import auth_handler
@ -18,7 +20,6 @@ from letsencrypt import crypto_util
from letsencrypt import errors
from letsencrypt import interfaces
from letsencrypt import le_util
from letsencrypt import network
from letsencrypt import reverter
from letsencrypt import revoker
from letsencrypt import storage
@ -30,48 +31,107 @@ from letsencrypt.display import enhancements
logger = logging.getLogger(__name__)
def _acme_from_config_key(config, key):
# TODO: Allow for other alg types besides RS256
return acme_client.Client(new_reg_uri=config.server, key=key,
verify_ssl=(not config.no_verify_ssl))
def register(config, account_storage, tos_cb=None):
"""Register new account with an ACME CA.
This function takes care of generating fresh private key,
registering the account, optionally accepting CA Terms of Service
and finally saving the account. It should be called prior to
initialization of `Client`, unless account has already been created.
:param .IConfig config: Client configuration.
:param .AccountStorage account_storage: Account storage where newly
registered account will be saved to. Save happens only after TOS
acceptance step, so any account private keys or
`.RegistrationResource` will not be persisted if `tos_cb`
returns ``False``.
:param tos_cb: If ACME CA requires the user to accept a Terms of
Service before registering account, client action is
necessary. For example, a CLI tool would prompt the user
acceptance. `tos_cb` must be a callable that should accept
`.RegistrationResource` and return a `bool`: ``True`` iff the
Terms of Service present in the contained
`.Registration.terms_of_service` is accepted by the client, and
``False`` otherwise. ``tos_cb`` will be called only if the
client acction is necessary, i.e. when ``terms_of_service is not
None``. This argument is optional, if not supplied it will
default to automatic acceptance!
:raises letsencrypt.errors.Error: In case of any client problems, in
particular registration failure, or unaccepted Terms of Service.
:raises acme.errors.Error: In case of any protocol problems.
:returns: Newly registered and saved account, as well as protocol
API handle (should be used in `Client` initialization).
:rtype: `tuple` of `.Account` and `acme.client.Client`
"""
# Log non-standard actions, potentially wrong API calls
if account_storage.find_all():
logger.info("There are already existing accounts for %s", config.server)
if config.email is None:
logger.warn("Registering without email!")
# Each new registration shall use a fresh new key
key = jose.JWKRSA(key=jose.ComparableRSAKey(
rsa.generate_private_key(
public_exponent=65537,
key_size=config.rsa_key_size,
backend=default_backend())))
acme = _acme_from_config_key(config, key)
# TODO: add phone?
regr = acme.register(messages.Registration.from_data(email=config.email))
if regr.terms_of_service is not None:
if tos_cb is not None and not tos_cb(regr):
raise errors.Error(
"Registration cannot proceed without accepting "
"Terms of Service.")
regr = acme.agree_to_tos(regr)
acc = account.Account(regr, key)
account.report_new_account(acc, config)
account_storage.save(acc)
return acc, acme
class Client(object):
"""ACME protocol client.
:ivar network: Network object for sending and receiving messages
:type network: :class:`letsencrypt.network.Network`
:ivar account: Account object used for registration
:type account: :class:`letsencrypt.account.Account`
:ivar auth_handler: Object that supports the IAuthenticator interface.
auth_handler contains both a dv_authenticator and a
continuity_authenticator
:type auth_handler: :class:`letsencrypt.auth_handler.AuthHandler`
:ivar installer: Object supporting the IInstaller interface.
:type installer: :class:`letsencrypt.interfaces.IInstaller`
:ivar config: Configuration.
:type config: :class:`~letsencrypt.interfaces.IConfig`
:ivar .IConfig config: Client configuration.
:ivar .Account account: Account registered with `register`.
:ivar .AuthHandler auth_handler: Authorizations handler that will
dispatch DV and Continuity challenges to appropriate
authenticators (providing `.IAuthenticator` interface).
:ivar .IInstaller installer: Installer.
:ivar acme.client.Client acme: Optional ACME client API handle.
You might already have one from `register`.
"""
def __init__(self, config, account_, dv_auth, installer):
def __init__(self, config, account_, dv_auth, installer, acme=None):
"""Initialize a client.
:param dv_auth: IAuthenticator that can solve the
:const:`letsencrypt.constants.DV_CHALLENGES`.
The :meth:`~letsencrypt.interfaces.IAuthenticator.prepare`
must have already been run.
:type dv_auth: :class:`letsencrypt.interfaces.IAuthenticator`
:param .IAuthenticator dv_auth: Prepared (`.IAuthenticator.prepare`)
authenticator that can solve the `.constants.DV_CHALLENGES`.
"""
self.config = config
self.account = account_
self.installer = installer
# TODO: Allow for other alg types besides RS256
self.network = network.Network(
config.server, jwk.JWKRSA.load(self.account.key.pem),
verify_ssl=(not config.no_verify_ssl))
self.config = config
# Initialize ACME if account is provided
if acme is None and self.account is not None:
acme = _acme_from_config_key(config, self.account.key)
self.acme = acme
# TODO: Check if self.config.enroll_autorenew is None. If
# so, set it based to the default: figure out if dv_auth is
@ -81,53 +141,10 @@ class Client(object):
cont_auth = continuity_auth.ContinuityAuthenticator(config,
installer)
self.auth_handler = auth_handler.AuthHandler(
dv_auth, cont_auth, self.network, self.account)
dv_auth, cont_auth, self.acme, self.account)
else:
self.auth_handler = None
def register(self):
"""New Registration with the ACME server."""
self.account = self.network.register_from_account(self.account)
if self.account.terms_of_service is not None:
if not self.config.tos:
# TODO: Replace with self.account.terms_of_service
eula = pkg_resources.resource_string("letsencrypt", "EULA")
agree = zope.component.getUtility(interfaces.IDisplay).yesno(
eula, "Agree", "Cancel")
else:
agree = True
if agree:
self.account.regr = self.network.agree_to_tos(self.account.regr)
else:
# What is the proper response here...
raise errors.Error("Must agree to TOS")
self.account.save()
self._report_new_account()
def _report_new_account(self):
"""Informs the user about their new Let's Encrypt account."""
reporter = zope.component.getUtility(interfaces.IReporter)
reporter.add_message(
"Your account credentials have been saved in your Let's Encrypt "
"configuration directory at {0}. You should make a secure backup "
"of this folder now. This configuration directory will also "
"contain certificates and private keys obtained by Let's Encrypt "
"so making regular backups of this folder is ideal.".format(
self.config.config_dir),
reporter.MEDIUM_PRIORITY, True)
assert self.account.recovery_token is not None
recovery_msg = ("If you lose your account credentials, you can recover "
"them using the token \"{0}\". You must write that down "
"and put it in a safe place.".format(
self.account.recovery_token))
if self.account.email is not None:
recovery_msg += (" Another recovery method will be e-mails sent to "
"{0}.".format(self.account.email))
reporter.add_message(recovery_msg, reporter.HIGH_PRIORITY, True)
def _obtain_certificate(self, domains, csr):
"""Obtain certificate.
@ -155,11 +172,11 @@ class Client(object):
logger.debug("CSR: %s, domains: %s", csr, domains)
authzr = self.auth_handler.get_authorizations(domains)
certr = self.network.request_issuance(
certr = self.acme.request_issuance(
jose.ComparableX509(OpenSSL.crypto.load_certificate_request(
OpenSSL.crypto.FILETYPE_ASN1, csr.data)),
authzr)
return certr, self.network.fetch_chain(certr)
return certr, self.acme.fetch_chain(certr)
def obtain_certificate_from_csr(self, csr):
"""Obtain certficiate from CSR.
@ -451,28 +468,6 @@ def validate_key_csr(privkey, csr=None):
raise errors.Error("The key and CSR do not match")
def determine_account(config):
"""Determine which account to use.
Will create an account if necessary.
:param config: Configuration object
:type config: :class:`letsencrypt.interfaces.IConfig`
:returns: Account
:rtype: :class:`letsencrypt.account.Account`
"""
accounts = account.Account.get_accounts(config)
if len(accounts) == 1:
return accounts[0]
elif len(accounts) > 1:
return display_ops.choose_account(accounts)
return account.Account.from_prompts(config)
def rollback(default_installer, checkpoints, config, plugins):
"""Revert configuration the specified number of checkpoints.

View file

@ -18,7 +18,6 @@ class NamespaceConfig(object):
paths defined in :py:mod:`letsencrypt.constants`:
- `accounts_dir`
- `account_keys_dir`
- `cert_dir`
- `cert_key_backup`
- `in_progress_dir`
@ -51,10 +50,6 @@ class NamespaceConfig(object):
return os.path.join(
self.namespace.config_dir, constants.ACCOUNTS_DIR, self.server_path)
@property
def account_keys_dir(self): #pylint: disable=missing-docstring
return os.path.join(self.accounts_dir, constants.ACCOUNT_KEYS_DIR)
@property
def backup_dir(self): # pylint: disable=missing-docstring
return os.path.join(self.namespace.work_dir, constants.BACKUP_DIR)

View file

@ -65,9 +65,6 @@ CONFIG_DIRS_MODE = 0o755
ACCOUNTS_DIR = "accounts"
"""Directory where all accounts are saved."""
ACCOUNT_KEYS_DIR = "keys"
"""Directory where account keys are saved. Relative to `ACCOUNTS_DIR`."""
BACKUP_DIR = "backups"
"""Directory (relative to `IConfig.work_dir`) where backups are kept."""

View file

@ -8,8 +8,11 @@ import datetime
import logging
import os
from cryptography.hazmat.primitives import serialization
import OpenSSL
from acme import jose
from letsencrypt import errors
from letsencrypt import le_util
@ -212,15 +215,21 @@ def pyopenssl_load_certificate(data):
return _pyopenssl_load(data, OpenSSL.crypto.load_certificate)
def make_ss_cert(key_str, domains, not_before=None,
def make_ss_cert(key, domains, not_before=None,
validity=(7 * 24 * 60 * 60)):
"""Returns new self-signed cert in PEM form.
Uses key_str and contains all domains.
Uses key and contains all domains.
"""
if isinstance(key, jose.JWK):
key = key.key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption())
assert domains, "Must provide one or more hostnames for the cert."
pkey = OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM, key_str)
pkey = OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM, key)
cert = OpenSSL.crypto.X509()
cert.set_serial_number(1337)
cert.set_version(2)

View file

@ -5,6 +5,7 @@ import os
import zope.component
from letsencrypt import interfaces
from letsencrypt import le_util
from letsencrypt.display import util as display_util
@ -112,6 +113,24 @@ def pick_configurator(
(interfaces.IAuthenticator, interfaces.IInstaller))
def get_email():
"""Prompt for valid email address.
:returns: Email or ``None`` if cancelled by user.
:rtype: str
"""
while True:
code, email = zope.component.getUtility(interfaces.IDisplay).input(
"Enter email address")
if code == display_util.OK:
if le_util.safe_email(email):
return email
else:
return None
def choose_account(accounts):
"""Choose an account.
@ -120,11 +139,7 @@ def choose_account(accounts):
"""
# Note this will get more complicated once we start recording authorizations
labels = [
"%s | %s" % (acc.email.ljust(display_util.WIDTH - 39),
acc.phone if acc.phone is not None else "")
for acc in accounts
]
labels = [acc.slug for acc in accounts]
code, index = util(interfaces.IDisplay).menu(
"Please choose an account", labels)

View file

@ -5,6 +5,14 @@ class Error(Exception):
"""Generic Let's Encrypt client error."""
class AccountStorageError(Error):
"""Generic `.AccountStorage` error."""
class AccountNotFound(AccountStorageError):
"""Account not found error."""
class ReverterError(Error):
"""Let's Encrypt Reverter error."""

View file

@ -1,10 +1,46 @@
"""Let's Encrypt client interfaces."""
import abc
import zope.interface
# pylint: disable=no-self-argument,no-method-argument,no-init,inherit-non-class
# pylint: disable=too-few-public-methods
class AccountStorage(object):
"""Accounts storage interface."""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def find_all(self): # pragma: no cover
"""Find all accounts.
:returns: All found accounts.
:rtype: list
"""
raise NotImplementedError()
@abc.abstractmethod
def load(self, account_id): # pragma: no cover
"""Load an account by its id.
:raises .AccountNotFound: if account could not be found
:raises .AccountStorageError: if account could not be loaded
"""
raise NotImplementedError()
@abc.abstractmethod
def save(self, account): # pragma: no cover
"""Save account.
:raises .AccountStorageError: if account could not be saved
"""
raise NotImplementedError()
class IPluginFactory(zope.interface.Interface):
"""IPlugin factory.
@ -160,8 +196,6 @@ class IConfig(zope.interface.Interface):
accounts_dir = zope.interface.Attribute(
"Directory where all account information is stored.")
account_keys_dir = zope.interface.Attribute(
"Directory where all account keys are stored.")
backup_dir = zope.interface.Attribute("Configuration backups directory.")
cert_dir = zope.interface.Attribute(
"Directory where newly generated Certificate Signing Requests "

View file

@ -1,12 +1,17 @@
"""Utilities for all Let's Encrypt."""
import collections
import errno
import logging
import os
import re
import stat
from letsencrypt import errors
logger = logging.getLogger(__name__)
Key = collections.namedtuple("Key", "file pem")
# Note: form is the type of data, "pem" or "der"
CSR = collections.namedtuple("CSR", "file data form")
@ -53,16 +58,30 @@ def check_permissions(filepath, mode, uid=0):
return stat.S_IMODE(file_stat.st_mode) == mode and file_stat.st_uid == uid
def _safely_attempt_open(fname, mode):
file_d = os.open(fname, os.O_CREAT | os.O_EXCL | os.O_RDWR, mode)
return os.fdopen(file_d, "w"), fname
def safe_open(path, mode="w", chmod=None, buffering=None):
"""Safely open a file.
:param str path: Path to a file.
:param str mode: Same os `mode` for `open`.
:param int chmod: Same as `mode` for `os.open`, uses Python defaults
if ``None``.
:param int buffering: Same as `bufsize` for `os.fdopen`, uses Python
defaults if ``None``.
"""
# pylint: disable=star-args
open_args = () if chmod is None else (chmod,)
fdopen_args = () if buffering is None else (buffering,)
return os.fdopen(
os.open(path, os.O_CREAT | os.O_EXCL | os.O_RDWR, *open_args),
mode, *fdopen_args)
def _unique_file(path, filename_pat, count, mode):
while True:
current_path = os.path.join(path, filename_pat(count))
try:
return _safely_attempt_open(
os.path.join(path, filename_pat(count)), mode)
return safe_open(current_path, chmod=mode), current_path
except OSError as err:
# "File exists," is okay, try a different name.
if err.errno != errno.EEXIST:
@ -100,9 +119,9 @@ def unique_lineage_name(path, filename, mode=0o777):
specified location.
"""
preferred_path = os.path.join(path, "%s.conf" % (filename))
try:
return _safely_attempt_open(
os.path.join(path, "%s.conf" % (filename)), mode=mode)
return safe_open(preferred_path, chmod=mode), preferred_path
except OSError as err:
if err.errno != errno.EEXIST:
raise
@ -118,3 +137,16 @@ def safely_remove(path):
except OSError as err:
if err.errno != errno.ENOENT:
raise
# Just make sure we don't get pwned... Make sure that it also doesn't
# start with a period or have two consecutive periods <- this needs to
# be done in addition to the regex
EMAIL_REGEX = re.compile("[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+$")
def safe_email(email):
"""Scrub email address before using it."""
if EMAIL_REGEX.match(email) is not None:
return not email.startswith(".") and ".." not in email
else:
logger.warn("Invalid email address: %s.", email)
return False

View file

@ -1,23 +0,0 @@
"""Networking for ACME protocol."""
from acme import client
from acme import messages
class Network(client.Client):
"""ACME networking."""
def register_from_account(self, account):
"""Register with server.
.. todo:: this should probably not be a part of network...
:param account: Account
:type account: :class:`letsencrypt.account.Account`
:returns: Updated account
:rtype: :class:`letsencrypt.account.Account`
"""
account.regr = self.register(messages.Registration.from_data(
email=account.email, phone=account.phone))
return account

View file

@ -4,12 +4,14 @@ import pkg_resources
import shutil
import tempfile
from cryptography.hazmat.primitives import serialization
import zope.interface
from acme.jose import util as jose_util
from letsencrypt import constants
from letsencrypt import interfaces
from letsencrypt import le_util
def option_namespace(name):
@ -144,7 +146,7 @@ class Dvsni(object):
if idx is not None:
self.indices.append(idx)
def get_cert_file(self, achall):
def get_cert_path(self, achall):
"""Returns standardized name for challenge certificate.
:param achall: Annotated DVSNI challenge.
@ -157,19 +159,34 @@ class Dvsni(object):
return os.path.join(
self.configurator.config.work_dir, achall.nonce_domain + ".crt")
def get_key_path(self, achall):
"""Get standardized path to challenge key."""
return os.path.join(
self.configurator.config.work_dir, achall.nonce_domain + '.pem')
def _setup_challenge_cert(self, achall, s=None):
# pylint: disable=invalid-name
"""Generate and write out challenge certificate."""
cert_path = self.get_cert_file(achall)
cert_path = self.get_cert_path(achall)
key_path = self.get_key_path(achall)
# Register the path before you write out the file
self.configurator.reverter.register_file_creation(True, key_path)
self.configurator.reverter.register_file_creation(True, cert_path)
cert_pem, response = achall.gen_cert_and_response(s)
# Write out challenge cert
with open(cert_path, "w") as cert_chall_fd:
with open(cert_path, "wb") as cert_chall_fd:
cert_chall_fd.write(cert_pem)
# Write out challenge key
key_pem = achall.key.key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption())
with le_util.safe_open(key_path, 'wb', chmod=0o400) as key_file:
key_file.write(key_pem)
return response

View file

@ -150,22 +150,28 @@ class DvsniTest(unittest.TestCase):
# open context managers more elegantly. It avoids dealing with
# __enter__ and __exit__ calls.
# http://www.voidspace.org.uk/python/mock/helpers.html#mock.mock_open
m_open = mock.mock_open()
mock_open, mock_safe_open = mock.mock_open(), mock.mock_open()
response = challenges.DVSNIResponse(s="randomS1")
achall = mock.MagicMock(nonce=self.achalls[0].nonce,
nonce_domain=self.achalls[0].nonce_domain)
achall.gen_cert_and_response.return_value = ("pem", response)
with mock.patch("letsencrypt.plugins.common.open", m_open, create=True):
# pylint: disable=protected-access
self.assertEqual(response, self.sni._setup_challenge_cert(
achall, "randomS1"))
with mock.patch("letsencrypt.plugins.common.open",
mock_open, create=True):
with mock.patch("letsencrypt.plugins.common.le_util.safe_open",
mock_safe_open):
# pylint: disable=protected-access
self.assertEqual(response, self.sni._setup_challenge_cert(
achall, "randomS1"))
self.assertTrue(m_open.called)
self.assertEqual(
m_open.call_args[0], (self.sni.get_cert_file(achall), "w"))
self.assertEqual(m_open().write.call_args[0][0], "pem")
# pylint: disable=no-member
mock_open.assert_called_once_with(self.sni.get_cert_path(achall), "wb")
mock_open.return_value.write.assert_called_once_with("pem")
mock_safe_open.assert_called_once_with(
self.sni.get_key_path(achall), "wb", chmod=0o400)
mock_safe_open.return_value.write.assert_called_once_with(
achall.key.key.private_bytes())
if __name__ == "__main__":

View file

@ -41,7 +41,7 @@ class ManualAuthenticatorTest(unittest.TestCase):
resp = challenges.SimpleHTTPResponse(tls=False, path='Zm9v')
self.assertEqual([resp], self.auth.perform(self.achalls))
mock_raw_input.assert_called_once()
self.assertEqual(1, mock_raw_input.call_count)
mock_verify.assert_called_with(self.achalls[0].challb, "foo.com", 4430)
message = mock_stdout.write.mock_calls[0][1][0]

View file

@ -6,6 +6,7 @@ import socket
import sys
import time
from cryptography.hazmat.primitives import serialization
import OpenSSL
import zope.component
import zope.interface
@ -214,7 +215,10 @@ class StandaloneAuthenticator(common.Plugin):
# Signal that we've successfully bound TCP port
os.kill(self.parent_pid, signal.SIGIO)
self.private_key = OpenSSL.crypto.load_privatekey(
OpenSSL.crypto.FILETYPE_PEM, key.pem)
OpenSSL.crypto.FILETYPE_PEM, key.key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption()))
while True:
self.connection, _ = self.sock.accept()

View file

@ -6,21 +6,27 @@ import signal
import socket
import unittest
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
import mock
import OpenSSL
from acme import challenges
from acme import jose
from letsencrypt import achallenges
from letsencrypt import le_util
from letsencrypt.tests import acme_util
KEY = le_util.Key("foo", pkg_resources.resource_string(
"acme.jose", os.path.join("testdata", "rsa512_key.pem")))
KEY_PATH = pkg_resources.resource_filename(
"acme.jose", os.path.join("testdata", "rsa512_key.pem"))
KEY_DATA = pkg_resources.resource_string(
"acme.jose", os.path.join("testdata", "rsa512_key.pem"))
KEY = jose.JWKRSA(key=jose.ComparableRSAKey(serialization.load_pem_private_key(
KEY_DATA, password=None, backend=default_backend())))
PRIVATE_KEY = OpenSSL.crypto.load_privatekey(
OpenSSL.crypto.FILETYPE_PEM, KEY.pem)
OpenSSL.crypto.FILETYPE_PEM, KEY_DATA)
CONFIG = mock.Mock(dvsni_port=5001)

View file

@ -15,6 +15,7 @@ import configobj
import OpenSSL
import zope.component
from letsencrypt import account
from letsencrypt import configuration
from letsencrypt import cli
from letsencrypt import client
@ -76,15 +77,13 @@ def renew(cert, old_version):
authenticator = authenticator.init(config)
authenticator.prepare()
account = client.determine_account(config)
# TODO: are there other ways to get the right account object, e.g.
# based on the email parameter that might be present in
# renewalparams?
acc = account.AccountFileStorage(config).load(
account_id=renewalparams["account"])
our_client = client.Client(config, account, authenticator, None)
le_client = client.Client(config, acc, authenticator, None)
with open(cert.version("cert", old_version)) as f:
sans = crypto_util.get_sans_from_cert(f.read())
new_certr, new_chain, new_key, _ = our_client.obtain_certificate(sans)
new_certr, new_chain, new_key, _ = le_client.obtain_certificate(sans)
if new_chain is not None:
# XXX: Assumes that there was no key change. We need logic
# for figuring out whether there was or not. Probably

View file

@ -15,12 +15,12 @@ import tempfile
import OpenSSL
from acme import client as acme_client
from acme.jose import util as jose_util
from letsencrypt import crypto_util
from letsencrypt import errors
from letsencrypt import le_util
from letsencrypt import network
from letsencrypt.display import util as display_util
from letsencrypt.display import revocation
@ -34,8 +34,7 @@ class Revoker(object):
.. todo:: Add a method to specify your own certificate for revocation - CLI
:ivar network: Network object
:type network: :class:`letsencrypt.network`
:ivar .acme.client.Client acme: ACME client
:ivar installer: Installer object
:type installer: :class:`~letsencrypt.interfaces.IInstaller`
@ -48,7 +47,7 @@ class Revoker(object):
"""
def __init__(self, installer, config, no_confirm=False):
# XXX
self.network = network.Network(new_reg_uri=None, key=None, alg=None)
self.acme = acme_client.Client(new_reg_uri=None, key=None, alg=None)
self.installer = installer
self.config = config
@ -263,7 +262,7 @@ class Revoker(object):
raise errors.RevokerError(
"Corrupted backup key file: %s" % cert.backup_key_path)
return self.network.revoke(cert=None) # XXX
return self.acme.revoke(cert=None) # XXX
def _remove_certs_keys(self, cert_list): # pylint: disable=no-self-use
"""Remove certificate and key.

View file

@ -1,208 +1,186 @@
"""Tests for letsencrypt.account."""
import logging
import mock
import datetime
import os
import pkg_resources
import shutil
import stat
import tempfile
import unittest
import mock
import pytz
from acme import jose
from acme import messages
from letsencrypt import configuration
from letsencrypt import errors
from letsencrypt import le_util
from letsencrypt.display import util as display_util
KEY = jose.JWKRSA.load(pkg_resources.resource_string(
__name__, os.path.join("testdata", "rsa512_key.pem")))
class AccountTest(unittest.TestCase):
"""Tests letsencrypt.account.Account."""
"""Tests for letsencrypt.account.Account."""
def setUp(self):
from letsencrypt.account import Account
self.regr = mock.MagicMock()
self.meta = Account.Meta(
creation_host="test.letsencrypt.org",
creation_dt=datetime.datetime(
2015, 7, 4, 14, 4, 10, tzinfo=pytz.UTC))
self.acc = Account(self.regr, KEY, self.meta)
logging.disable(logging.CRITICAL)
with mock.patch("letsencrypt.account.socket") as mock_socket:
mock_socket.getfqdn.return_value = "test.letsencrypt.org"
with mock.patch("letsencrypt.account.datetime") as mock_dt:
mock_dt.datetime.now.return_value = self.meta.creation_dt
self.acc_no_meta = Account(self.regr, KEY)
self.accounts_dir = tempfile.mkdtemp("accounts")
self.account_keys_dir = os.path.join(self.accounts_dir, "keys")
os.makedirs(self.account_keys_dir, 0o700)
def test_init(self):
self.assertEqual(self.regr, self.acc.regr)
self.assertEqual(KEY, self.acc.key)
self.assertEqual(self.meta, self.acc_no_meta.meta)
def test_id(self):
self.assertEqual(
self.acc.id, "2ba35a3bdf380ed76a5ac9e740568395")
def test_slug(self):
self.assertEqual(
self.acc.slug, "test.letsencrypt.org@2015-07-04T14:04:10Z (2ba3)")
def test_repr(self):
self.assertEqual(
repr(self.acc),
"<Account(2ba35a3bdf380ed76a5ac9e740568395)>")
class ReportNewAccountTest(unittest.TestCase):
"""Tests for letsencrypt.account.report_new_account."""
def setUp(self):
self.config = mock.MagicMock(config_dir='/etc/letsencrypt')
reg = messages.Registration.from_data(email="rhino@jungle.io")
reg = reg.update(recovery_token="ECCENTRIC INVISIBILITY RHINOCEROS")
self.acc = mock.MagicMock(regr=messages.RegistrationResource(
uri=None, new_authzr_uri=None, body=reg))
def _call(self):
from letsencrypt.account import report_new_account
report_new_account(self.acc, self.config)
@mock.patch("letsencrypt.client.zope.component.queryUtility")
def test_no_reporter(self, mock_zope):
mock_zope.return_value = None
self._call()
@mock.patch("letsencrypt.client.zope.component.queryUtility")
def test_it(self, mock_zope):
self._call()
call_list = mock_zope().add_message.call_args_list
self.assertTrue(self.config.config_dir in call_list[0][0][0])
self.assertTrue(self.acc.regr.body.recovery_token in call_list[1][0][0])
self.assertTrue(
", ".join(self.acc.regr.body.emails) in call_list[1][0][0])
class AccountMemoryStorageTest(unittest.TestCase):
"""Tests for letsencrypt.account.AccountMemoryStorage."""
def setUp(self):
from letsencrypt.account import AccountMemoryStorage
self.storage = AccountMemoryStorage()
def test_it(self):
account = mock.Mock(id="x")
self.assertEqual([], self.storage.find_all())
self.assertRaises(errors.AccountNotFound, self.storage.load, "x")
self.storage.save(account)
self.assertEqual([account], self.storage.find_all())
self.assertEqual(account, self.storage.load("x"))
self.storage.save(account)
self.assertEqual([account], self.storage.find_all())
class AccountFileStorageTest(unittest.TestCase):
"""Tests for letsencrypt.account.AccountFileStorage."""
def setUp(self):
self.tmp = tempfile.mkdtemp()
self.config = mock.MagicMock(
spec=configuration.NamespaceConfig, accounts_dir=self.accounts_dir,
account_keys_dir=self.account_keys_dir, rsa_key_size=2048,
server="letsencrypt-demo.org")
accounts_dir=os.path.join(self.tmp, "accounts"))
from letsencrypt.account import AccountFileStorage
self.storage = AccountFileStorage(self.config)
key_file = pkg_resources.resource_filename(
"acme.jose", os.path.join("testdata", "rsa512_key.pem"))
key_pem = pkg_resources.resource_string(
"acme.jose", os.path.join("testdata", "rsa512_key.pem"))
self.key = le_util.Key(key_file, key_pem)
self.email = "client@letsencrypt.org"
self.regr = messages.RegistrationResource(
uri="uri",
new_authzr_uri="new_authzr_uri",
terms_of_service="terms_of_service",
body=messages.Registration(
recovery_token="recovery_token", agreement="agreement")
)
self.test_account = Account(
self.config, self.key, self.email, None, self.regr)
from letsencrypt.account import Account
self.acc = Account(
regr=messages.RegistrationResource(
uri=None, new_authzr_uri=None, body=messages.Registration()),
key=KEY)
def tearDown(self):
shutil.rmtree(self.accounts_dir)
logging.disable(logging.NOTSET)
shutil.rmtree(self.tmp)
@mock.patch("letsencrypt.account.zope.component.getUtility")
@mock.patch("letsencrypt.account.crypto_util.init_save_key")
def test_prompts(self, mock_key, mock_util):
from letsencrypt.account import Account
def test_init_creates_dir(self):
self.assertTrue(os.path.isdir(self.config.accounts_dir))
mock_util().input.return_value = (display_util.OK, self.email)
mock_key.return_value = self.key
def test_save_and_restore(self):
self.storage.save(self.acc)
account_path = os.path.join(self.config.accounts_dir, self.acc.id)
self.assertTrue(os.path.exists(account_path))
for file_name in "regr.json", "meta.json", "private_key.json":
self.assertTrue(os.path.exists(
os.path.join(account_path, file_name)))
self.assertEqual("0400", oct(os.stat(os.path.join(
account_path, "private_key.json"))[stat.ST_MODE] & 0o777))
acc = Account.from_prompts(self.config)
self.assertEqual(acc.email, self.email)
self.assertEqual(acc.key, self.key)
self.assertEqual(acc.config, self.config)
# restore
self.assertEqual(self.acc, self.storage.load(self.acc.id))
@mock.patch("letsencrypt.account.zope.component.getUtility")
@mock.patch("letsencrypt.account.Account.from_email")
def test_prompts_bad_email(self, mock_from_email, mock_util):
from letsencrypt.account import Account
def test_find_all(self):
self.storage.save(self.acc)
self.assertEqual([self.acc], self.storage.find_all())
mock_from_email.side_effect = (errors.Error, "acc")
mock_util().input.return_value = (display_util.OK, self.email)
def test_find_all_none_empty_list(self):
self.assertEqual([], self.storage.find_all())
self.assertEqual(Account.from_prompts(self.config), "acc")
def test_find_all_accounts_dir_absent(self):
os.rmdir(self.config.accounts_dir)
self.assertEqual([], self.storage.find_all())
def test_find_all_load_skips(self):
self.storage.load = mock.MagicMock(
side_effect=["x", errors.AccountStorageError, "z"])
with mock.patch("letsencrypt.account.os.listdir") as mock_listdir:
mock_listdir.return_value = ["x", "y", "z"]
self.assertEqual(["x", "z"], self.storage.find_all())
@mock.patch("letsencrypt.account.zope.component.getUtility")
@mock.patch("letsencrypt.account.crypto_util.init_save_key")
def test_prompts_empty_email(self, mock_key, mock_util):
from letsencrypt.account import Account
def test_load_non_existent_raises_error(self):
self.assertRaises(errors.AccountNotFound, self.storage.load, "missing")
mock_util().input.return_value = (display_util.OK, "")
acc = Account.from_prompts(self.config)
self.assertTrue(acc.email is None)
# _get_config_filename | pylint: disable=protected-access
mock_key.assert_called_once_with(
mock.ANY, mock.ANY, acc._get_config_filename(None))
def test_load_id_mismatch_raises_error(self):
self.storage.save(self.acc)
shutil.move(os.path.join(self.config.accounts_dir, self.acc.id),
os.path.join(self.config.accounts_dir, "x" + self.acc.id))
self.assertRaises(errors.AccountStorageError, self.storage.load,
"x" + self.acc.id)
@mock.patch("letsencrypt.account.zope.component.getUtility")
def test_prompts_cancel(self, mock_util):
from letsencrypt.account import Account
def test_load_ioerror(self):
self.storage.save(self.acc)
mock_open = mock.mock_open()
mock_open.side_effect = IOError
with mock.patch("__builtin__.open", mock_open):
self.assertRaises(
errors.AccountStorageError, self.storage.load, self.acc.id)
mock_util().input.return_value = (display_util.CANCEL, "")
self.assertTrue(Account.from_prompts(self.config) is None)
def test_from_email(self):
from letsencrypt.account import Account
self.assertRaises(
errors.Error, Account.from_email, self.config, "not_valid...email")
def test_save_from_existing_account(self):
from letsencrypt.account import Account
self.test_account.save()
acc = Account.from_existing_account(self.config, self.email)
self.assertEqual(acc.key, self.test_account.key)
self.assertEqual(acc.email, self.test_account.email)
self.assertEqual(acc.phone, self.test_account.phone)
self.assertEqual(acc.regr, self.test_account.regr)
def test_properties(self):
self.assertEqual(self.test_account.uri, "uri")
self.assertEqual(self.test_account.new_authzr_uri, "new_authzr_uri")
self.assertEqual(self.test_account.terms_of_service, "terms_of_service")
self.assertEqual(self.test_account.recovery_token, "recovery_token")
def test_partial_properties(self):
from letsencrypt.account import Account
partial = Account(self.config, self.key)
self.assertTrue(partial.uri is None)
self.assertTrue(partial.new_authzr_uri is None)
self.assertTrue(partial.terms_of_service is None)
self.assertTrue(partial.recovery_token is None)
def test_partial_account_default(self):
from letsencrypt.account import Account
partial = Account(self.config, self.key)
partial.save()
acc = Account.from_existing_account(self.config)
self.assertEqual(partial.key, acc.key)
self.assertEqual(partial.email, acc.email)
self.assertEqual(partial.phone, acc.phone)
self.assertEqual(partial.regr, acc.regr)
def test_get_accounts(self):
from letsencrypt.account import Account
accs = Account.get_accounts(self.config)
self.assertFalse(accs)
self.test_account.save()
accs = Account.get_accounts(self.config)
self.assertEqual(len(accs), 1)
self.assertEqual(accs[0].email, self.test_account.email)
acc2 = Account(self.config, self.key, "testing_email@gmail.com")
acc2.save()
accs = Account.get_accounts(self.config)
self.assertEqual(len(accs), 2)
def test_get_accounts_no_accounts(self):
from letsencrypt.account import Account
self.assertEqual(Account.get_accounts(
mock.Mock(accounts_dir="non-existant")), [])
def test_failed_existing_account(self):
from letsencrypt.account import Account
self.assertRaises(errors.Error, Account.from_existing_account,
self.config, "non-existant@email.org")
class SafeEmailTest(unittest.TestCase):
"""Test safe_email."""
def setUp(self):
logging.disable(logging.CRITICAL)
def tearDown(self):
logging.disable(logging.NOTSET)
@classmethod
def _call(cls, addr):
from letsencrypt.account import Account
return Account.safe_email(addr)
def test_valid_emails(self):
addrs = [
"letsencrypt@letsencrypt.org",
"tbd.ade@gmail.com",
"abc_def.jdk@hotmail.museum",
]
for addr in addrs:
self.assertTrue(self._call(addr), "%s failed." % addr)
def test_invalid_emails(self):
addrs = [
"letsencrypt@letsencrypt..org",
".tbd.ade@gmail.com",
"~/abc_def.jdk@hotmail.museum",
]
for addr in addrs:
self.assertFalse(self._call(addr), "%s failed." % addr)
def test_save_ioerrors(self):
mock_open = mock.mock_open()
mock_open.side_effect = IOError # TODO: [None, None, IOError]
with mock.patch("__builtin__.open", mock_open):
self.assertRaises(
errors.AccountStorageError, self.storage.save, self.acc)
if __name__ == "__main__":

View file

@ -3,12 +3,14 @@ import os
import pkg_resources
import unittest
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
import OpenSSL
from acme import challenges
from acme import jose
from letsencrypt import crypto_util
from letsencrypt import le_util
from letsencrypt.tests import acme_util
@ -19,8 +21,11 @@ class DVSNITest(unittest.TestCase):
self.chall = acme_util.chall_to_challb(
challenges.DVSNI(r="r_value", nonce="12345ABCDE"), "pending")
self.response = challenges.DVSNIResponse()
key = le_util.Key("path", pkg_resources.resource_string(
"acme.jose", os.path.join("testdata", "rsa512_key.pem")))
key = jose.JWKRSA(key=jose.ComparableRSAKey(
serialization.load_pem_private_key(
pkg_resources.resource_string(
"acme.jose", os.path.join("testdata", "rsa512_key.pem")),
password=None, backend=default_backend())))
from letsencrypt.achallenges import DVSNI
self.achall = DVSNI(challb=self.chall, domain="example.com", key=key)

View file

@ -6,11 +6,11 @@ import unittest
import mock
from acme import challenges
from acme import client as acme_client
from acme import messages
from letsencrypt import errors
from letsencrypt import le_util
from letsencrypt import network
from letsencrypt.tests import acme_util
@ -86,7 +86,7 @@ class GetAuthorizationsTest(unittest.TestCase):
self.mock_dv_auth.perform.side_effect = gen_auth_resp
self.mock_account = mock.Mock(key=le_util.Key("file_path", "PEM"))
self.mock_net = mock.MagicMock(spec=network.Network)
self.mock_net = mock.MagicMock(spec=acme_client.Client)
self.handler = AuthHandler(
self.mock_dv_auth, self.mock_cont_auth,

View file

@ -8,6 +8,8 @@ import unittest
import mock
from letsencrypt import account
from letsencrypt import configuration
from letsencrypt import errors
@ -26,7 +28,8 @@ class CLITest(unittest.TestCase):
def _call(self, args):
from letsencrypt import cli
args = ['--text', '--config-dir', self.config_dir,
'--work-dir', self.work_dir, '--logs-dir', self.logs_dir] + args
'--work-dir', self.work_dir, '--logs-dir', self.logs_dir,
'--agree-eula'] + args
with mock.patch('letsencrypt.cli.sys.stdout') as stdout:
with mock.patch('letsencrypt.cli.sys.stderr') as stderr:
with mock.patch('letsencrypt.cli.client') as client:
@ -42,7 +45,7 @@ class CLITest(unittest.TestCase):
def test_rollback(self):
_, _, _, client = self._call(['rollback'])
client.rollback.assert_called_once()
self.assertEqual(1, client.rollback.call_count)
_, _, _, client = self._call(['rollback', '--checkpoints', '123'])
client.rollback.assert_called_once_with(
@ -50,7 +53,7 @@ class CLITest(unittest.TestCase):
def test_config_changes(self):
_, _, _, client = self._call(['config_changes'])
client.view_config_changes.assert_called_once()
self.assertEqual(1, client.view_config_changes.call_count)
def test_plugins(self):
flags = ['--init', '--prepare', '--authenticators', '--installers']
@ -96,5 +99,68 @@ class CLITest(unittest.TestCase):
traceback.format_exception_only(KeyboardInterrupt, interrupt)))
class DetermineAccountTest(unittest.TestCase):
"""Tests for letsencrypt.cli._determine_account."""
def setUp(self):
self.args = mock.MagicMock(account=None, email=None)
self.config = configuration.NamespaceConfig(self.args)
self.accs = [mock.MagicMock(id="x"), mock.MagicMock(id="y")]
self.account_storage = account.AccountMemoryStorage()
def _call(self):
# pylint: disable=protected-access
from letsencrypt.cli import _determine_account
with mock.patch("letsencrypt.cli.account.AccountFileStorage") as mock_storage:
mock_storage.return_value = self.account_storage
return _determine_account(self.args, self.config)
def test_args_account_set(self):
self.account_storage.save(self.accs[1])
self.args.account = self.accs[1].id
self.assertEqual((self.accs[1], None), self._call())
self.assertEqual(self.accs[1].id, self.args.account)
self.assertTrue(self.args.email is None)
def test_single_account(self):
self.account_storage.save(self.accs[0])
self.assertEqual((self.accs[0], None), self._call())
self.assertEqual(self.accs[0].id, self.args.account)
self.assertTrue(self.args.email is None)
@mock.patch("letsencrypt.client.display_ops.choose_account")
def test_multiple_accounts(self, mock_choose_accounts):
for acc in self.accs:
self.account_storage.save(acc)
mock_choose_accounts.return_value = self.accs[1]
self.assertEqual((self.accs[1], None), self._call())
self.assertEqual(
set(mock_choose_accounts.call_args[0][0]), set(self.accs))
self.assertEqual(self.accs[1].id, self.args.account)
self.assertTrue(self.args.email is None)
@mock.patch("letsencrypt.client.display_ops.get_email")
def test_no_accounts_no_email(self, mock_get_email):
mock_get_email.return_value = "foo@bar.baz"
with mock.patch("letsencrypt.cli.client") as client:
client.register.return_value = (
self.accs[0], mock.sentinel.acme)
self.assertEqual((self.accs[0], mock.sentinel.acme), self._call())
client.register.assert_called_once_with(
self.config, self.account_storage, tos_cb=mock.ANY)
self.assertEqual(self.accs[0].id, self.args.account)
self.assertEqual("foo@bar.baz", self.args.email)
def test_no_accounts_email(self):
self.args.email = "other email"
with mock.patch("letsencrypt.cli.client") as client:
client.register.return_value = (self.accs[1], mock.sentinel.acme)
self._call()
self.assertEqual(self.accs[1].id, self.args.account)
self.assertEqual("other email", self.args.email)
if __name__ == '__main__':
unittest.main() # pragma: no cover

View file

@ -2,8 +2,6 @@
import os
import unittest
import pkg_resources
import shutil
import tempfile
import configobj
import OpenSSL
@ -13,6 +11,7 @@ from acme import jose
from letsencrypt import account
from letsencrypt import configuration
from letsencrypt import errors
from letsencrypt import le_util
@ -22,6 +21,38 @@ CSR_SAN = pkg_resources.resource_string(
__name__, os.path.join("testdata", "csr-san.der"))
class RegisterTest(unittest.TestCase):
"""Tests for letsencrypt.client.register."""
def setUp(self):
self.config = mock.MagicMock(rsa_key_size=1024)
self.account_storage = account.AccountMemoryStorage()
self.tos_cb = mock.MagicMock()
def _call(self):
from letsencrypt.client import register
return register(self.config, self.account_storage, self.tos_cb)
def test_no_tos(self):
with mock.patch("letsencrypt.client.acme_client.Client") as mock_client:
mock_client.register().terms_of_service = "http://tos"
with mock.patch("letsencrypt.account.report_new_account"):
self.tos_cb.return_value = False
self.assertRaises(errors.Error, self._call)
self.tos_cb.return_value = True
self._call()
self.tos_cb = None
self._call()
def test_it(self):
with mock.patch("letsencrypt.client.acme_client.Client"):
with mock.patch("letsencrypt.account."
"report_new_account"):
self._call()
class ClientTest(unittest.TestCase):
"""Tests for letsencrypt.client.Client."""
@ -32,29 +63,30 @@ class ClientTest(unittest.TestCase):
self.account = mock.MagicMock(**{"key.pem": KEY})
from letsencrypt.client import Client
with mock.patch("letsencrypt.client.network.Network") as network:
with mock.patch("letsencrypt.client.acme_client.Client") as acme:
self.acme_client = acme
self.acme = acme.return_value = mock.MagicMock()
self.client = Client(
config=self.config, account_=self.account,
dv_auth=None, installer=None)
self.network = network
def test_init_network_verify_ssl(self):
self.network.assert_called_once_with(
mock.ANY, mock.ANY, verify_ssl=True)
def test_init_acme_verify_ssl(self):
self.acme_client.assert_called_once_with(
new_reg_uri=mock.ANY, key=mock.ANY, verify_ssl=True)
def _mock_obtain_certificate(self):
self.client.auth_handler = mock.MagicMock()
self.network().request_issuance.return_value = mock.sentinel.certr
self.network().fetch_chain.return_value = mock.sentinel.chain
self.acme.request_issuance.return_value = mock.sentinel.certr
self.acme.fetch_chain.return_value = mock.sentinel.chain
def _check_obtain_certificate(self):
self.client.auth_handler.get_authorizations.assert_called_once_with(
["example.com", "www.example.com"])
self.network.request_issuance.assert_callend_once_with(
self.acme.request_issuance.assert_called_once_with(
jose.ComparableX509(OpenSSL.crypto.load_certificate_request(
OpenSSL.crypto.FILETYPE_ASN1, CSR_SAN)),
self.client.auth_handler.get_authorizations())
self.network().fetch_chain.assert_called_once_with(mock.sentinel.certr)
self.acme.fetch_chain.assert_called_once_with(mock.sentinel.certr)
def test_obtain_certificate_from_csr(self):
self._mock_obtain_certificate()
@ -83,18 +115,6 @@ class ClientTest(unittest.TestCase):
mock.sentinel.key, domains, self.config.cert_dir)
self._check_obtain_certificate()
@mock.patch("letsencrypt.client.zope.component.getUtility")
def test_report_new_account(self, mock_zope):
# pylint: disable=protected-access
self.account.recovery_token = "ECCENTRIC INVISIBILITY RHINOCEROS"
self.account.email = "rhino@jungle.io"
self.client._report_new_account()
call_list = mock_zope().add_message.call_args_list
self.assertTrue(self.config.config_dir in call_list[0][0][0])
self.assertTrue(self.account.recovery_token in call_list[1][0][0])
self.assertTrue(self.account.email in call_list[1][0][0])
@mock.patch("letsencrypt.client.zope.component.getUtility")
def test_report_renewal_status(self, mock_zope):
# pylint: disable=protected-access
@ -128,50 +148,6 @@ class ClientTest(unittest.TestCase):
self.assertTrue(cert.cli_config.renewal_configs_dir in msg)
class DetermineAccountTest(unittest.TestCase):
"""Tests for letsencrypt.client.determine_authenticator."""
def setUp(self):
self.accounts_dir = tempfile.mkdtemp("accounts")
account_keys_dir = os.path.join(self.accounts_dir, "keys")
os.makedirs(account_keys_dir, 0o700)
self.config = mock.MagicMock(
spec=configuration.NamespaceConfig, accounts_dir=self.accounts_dir,
account_keys_dir=account_keys_dir, rsa_key_size=2048,
server="letsencrypt-demo.org")
def tearDown(self):
shutil.rmtree(self.accounts_dir)
@mock.patch("letsencrypt.account.Account.from_prompts")
@mock.patch("letsencrypt.client.display_ops.choose_account")
def test_determine_account(self, mock_op, mock_prompt):
"""Test determine account"""
from letsencrypt import client
key = le_util.Key(tempfile.mkstemp()[1], "pem")
test_acc = account.Account(self.config, key, "email1@gmail.com")
mock_op.return_value = test_acc
# Test 0
mock_prompt.return_value = None
self.assertTrue(client.determine_account(self.config) is None)
# Test 1
test_acc.save()
acc = client.determine_account(self.config)
self.assertEqual(acc.email, test_acc.email)
# Test multiple
self.assertFalse(mock_op.called)
acc2 = account.Account(self.config, key)
acc2.save()
chosen_acc = client.determine_account(self.config)
self.assertTrue(mock_op.called)
self.assertTrue(chosen_acc.email, test_acc.email)
class RollbackTest(unittest.TestCase):
"""Tests for letsencrypt.client.rollback."""

View file

@ -31,7 +31,6 @@ class NamespaceConfigTest(unittest.TestCase):
@mock.patch('letsencrypt.configuration.constants')
def test_dynamic_dirs(self, constants):
constants.ACCOUNTS_DIR = 'acc'
constants.ACCOUNT_KEYS_DIR = 'keys'
constants.BACKUP_DIR = 'backups'
constants.CERT_KEY_BACKUP_DIR = 'c/'
constants.CERT_DIR = 'certs'
@ -42,9 +41,6 @@ class NamespaceConfigTest(unittest.TestCase):
self.assertEqual(
self.config.accounts_dir, '/tmp/config/acc/acme-server.org:443/new')
self.assertEqual(
self.config.account_keys_dir,
'/tmp/config/acc/acme-server.org:443/new/keys')
self.assertEqual(self.config.backup_dir, '/tmp/foo/backups')
self.assertEqual(self.config.cert_dir, '/tmp/config/certs')
self.assertEqual(

View file

@ -1,5 +1,6 @@
"""Test letsencrypt.display.ops."""
import os
import pkg_resources
import sys
import tempfile
import unittest
@ -7,13 +8,19 @@ import unittest
import mock
import zope.component
from acme import jose
from acme import messages
from letsencrypt import account
from letsencrypt import interfaces
from letsencrypt import le_util
from letsencrypt.display import util as display_util
KEY = jose.JWKRSA.load(pkg_resources.resource_string(
"letsencrypt.tests", os.path.join("testdata", "rsa512_key.pem")))
class ChoosePluginTest(unittest.TestCase):
"""Tests for letsencrypt.display.ops.choose_plugin."""
@ -73,11 +80,11 @@ class PickPluginTest(unittest.TestCase):
def test_default_provided(self):
self.default = "foo"
self._call()
self.reg.filter.assert_called_once()
self.assertEqual(1, self.reg.filter.call_count)
def test_no_default(self):
self._call()
self.reg.filter.assert_called_once()
self.assertEqual(1, self.reg.ifaces.call_count)
def test_no_candidate(self):
self.assertTrue(self._call() is None)
@ -140,8 +147,40 @@ class ConveniencePickPluginTest(unittest.TestCase):
interfaces.IAuthenticator, interfaces.IInstaller))
class GetEmailTest(unittest.TestCase):
"""Tests for letsencrypt.display.ops.get_email."""
def setUp(self):
mock_display = mock.MagicMock()
self.input = mock_display.input
zope.component.provideUtility(mock_display, interfaces.IDisplay)
@classmethod
def _call(cls):
from letsencrypt.display.ops import get_email
return get_email()
def test_cancel_none(self):
self.input.return_value = (display_util.CANCEL, "foo@bar.baz")
self.assertTrue(self._call() is None)
def test_ok_safe(self):
self.input.return_value = (display_util.OK, "foo@bar.baz")
with mock.patch("letsencrypt.display.ops.le_util"
".safe_email") as mock_safe_email:
mock_safe_email.return_value = True
self.assertTrue(self._call() is "foo@bar.baz")
def test_ok_not_safe(self):
self.input.return_value = (display_util.OK, "foo@bar.baz")
with mock.patch("letsencrypt.display.ops.le_util"
".safe_email") as mock_safe_email:
mock_safe_email.side_effect = [False, True]
self.assertTrue(self._call() is "foo@bar.baz")
class ChooseAccountTest(unittest.TestCase):
"""Test choose_account."""
"""Tests for letsencrypt.display.ops.choose_account."""
def setUp(self):
zope.component.provideUtility(display_util.FileDisplay(sys.stdout))
@ -153,13 +192,14 @@ class ChooseAccountTest(unittest.TestCase):
accounts_dir=self.accounts_dir,
account_keys_dir=self.account_keys_dir,
server="letsencrypt-demo.org")
self.key = le_util.Key("keypath", "pem")
self.key = KEY
self.acc1 = account.Account(self.config, self.key, "email1@g.com")
self.acc2 = account.Account(
self.config, self.key, "email2@g.com", "phone")
self.acc1.save()
self.acc2.save()
self.acc1 = account.Account(messages.RegistrationResource(
uri=None, new_authzr_uri=None, body=messages.Registration.from_data(
email="email1@g.com")), self.key)
self.acc2 = account.Account(messages.RegistrationResource(
uri=None, new_authzr_uri=None, body=messages.Registration.from_data(
email="email2@g.com", phone="phone")), self.key)
@classmethod
def _call(cls, accounts):

View file

@ -21,7 +21,7 @@ class MakeOrVerifyDirTest(unittest.TestCase):
def setUp(self):
self.root_path = tempfile.mkdtemp()
self.path = os.path.join(self.root_path, 'foo')
self.path = os.path.join(self.root_path, "foo")
os.mkdir(self.path, 0o400)
self.uid = os.getuid()
@ -34,7 +34,7 @@ class MakeOrVerifyDirTest(unittest.TestCase):
return make_or_verify_dir(directory, mode, self.uid)
def test_creates_dir_when_missing(self):
path = os.path.join(self.root_path, 'bar')
path = os.path.join(self.root_path, "bar")
self._call(path, 0o650)
self.assertTrue(os.path.isdir(path))
self.assertEqual(stat.S_IMODE(os.stat(path).st_mode), 0o650)
@ -47,9 +47,9 @@ class MakeOrVerifyDirTest(unittest.TestCase):
self.assertRaises(errors.Error, self._call, self.path, 0o600)
def test_reraises_os_error(self):
with mock.patch.object(os, 'makedirs') as makedirs:
with mock.patch.object(os, "makedirs") as makedirs:
makedirs.side_effect = OSError()
self.assertRaises(OSError, self._call, 'bar', 12312312)
self.assertRaises(OSError, self._call, "bar", 12312312)
class CheckPermissionsTest(unittest.TestCase):
@ -85,7 +85,7 @@ class UniqueFileTest(unittest.TestCase):
def setUp(self):
self.root_path = tempfile.mkdtemp()
self.default_name = os.path.join(self.root_path, 'foo.txt')
self.default_name = os.path.join(self.root_path, "foo.txt")
def tearDown(self):
shutil.rmtree(self.root_path, ignore_errors=True)
@ -96,9 +96,9 @@ class UniqueFileTest(unittest.TestCase):
def test_returns_fd_for_writing(self):
fd, name = self._call()
fd.write('bar')
fd.write("bar")
fd.close()
self.assertEqual(open(name).read(), 'bar')
self.assertEqual(open(name).read(), "bar")
def test_right_mode(self):
self.assertEqual(0o700, os.stat(self._call(0o700)[1]).st_mode & 0o777)
@ -118,11 +118,11 @@ class UniqueFileTest(unittest.TestCase):
self.assertEqual(os.path.dirname(name3), self.root_path)
basename1 = os.path.basename(name2)
self.assertTrue(basename1.endswith('foo.txt'))
self.assertTrue(basename1.endswith("foo.txt"))
basename2 = os.path.basename(name2)
self.assertTrue(basename2.endswith('foo.txt'))
self.assertTrue(basename2.endswith("foo.txt"))
basename3 = os.path.basename(name3)
self.assertTrue(basename3.endswith('foo.txt'))
self.assertTrue(basename3.endswith("foo.txt"))
class UniqueLineageNameTest(unittest.TestCase):
@ -139,9 +139,9 @@ class UniqueLineageNameTest(unittest.TestCase):
return unique_lineage_name(self.root_path, filename, mode)
def test_basic(self):
f, name = self._call("wow")
f, path = self._call("wow")
self.assertTrue(isinstance(f, file))
self.assertTrue(isinstance(name, str))
self.assertEqual(os.path.join(self.root_path, "wow.conf"), path)
def test_multiple(self):
for _ in xrange(10):
@ -165,5 +165,32 @@ class UniqueLineageNameTest(unittest.TestCase):
mock_fdopen.side_effect = err
self.assertRaises(OSError, self._call, "wow")
if __name__ == '__main__':
class SafeEmailTest(unittest.TestCase):
"""Test safe_email."""
@classmethod
def _call(cls, addr):
from letsencrypt.le_util import safe_email
return safe_email(addr)
def test_valid_emails(self):
addrs = [
"letsencrypt@letsencrypt.org",
"tbd.ade@gmail.com",
"abc_def.jdk@hotmail.museum",
]
for addr in addrs:
self.assertTrue(self._call(addr), "%s failed." % addr)
def test_invalid_emails(self):
addrs = [
"letsencrypt@letsencrypt..org",
".tbd.ade@gmail.com",
"~/abc_def.jdk@hotmail.museum",
]
for addr in addrs:
self.assertFalse(self._call(addr), "%s failed." % addr)
if __name__ == "__main__":
unittest.main() # pragma: no cover

View file

@ -1,53 +0,0 @@
"""Tests for letsencrypt.network."""
import shutil
import tempfile
import unittest
import mock
from acme import messages
from letsencrypt import account
class NetworkTest(unittest.TestCase):
"""Tests for letsencrypt.network.Network."""
def setUp(self):
from letsencrypt.network import Network
self.net = Network(
new_reg_uri=None, key=None, alg=None, verify_ssl=None)
self.config = mock.Mock(accounts_dir=tempfile.mkdtemp())
def tearDown(self):
shutil.rmtree(self.config.accounts_dir)
def test_register_from_account(self):
self.net.register = mock.Mock()
acc = account.Account(
self.config, 'key', email='cert-admin@example.com',
phone='+12025551212')
self.net.register_from_account(acc)
self.net.register.assert_called_once()
self.assertEqual(
set(self.net.register.mock_calls[0][1][0].contact),
set(('mailto:cert-admin@example.com', 'tel:+12025551212')))
def test_register_from_account_partial_info(self):
self.net.register = mock.Mock()
acc = account.Account(
self.config, 'key', email='cert-admin@example.com')
acc2 = account.Account(self.config, 'key')
self.net.register_from_account(acc)
self.net.register.assert_called_with(messages.Registration(
contact=('mailto:cert-admin@example.com',)))
self.net.register_from_account(acc2)
self.net.register.assert_called_with(messages.Registration())
if __name__ == '__main__':
unittest.main() # pragma: no cover

View file

@ -556,9 +556,9 @@ class RenewableCertTests(unittest.TestCase):
datetime.timedelta(intended[time]))
@mock.patch("letsencrypt.renewer.plugins_disco")
@mock.patch("letsencrypt.client.determine_account")
@mock.patch("letsencrypt.account.AccountFileStorage")
@mock.patch("letsencrypt.client.Client")
def test_renew(self, mock_c, mock_da, mock_pd):
def test_renew(self, mock_c, mock_acc_storage, mock_pd):
from letsencrypt import renewer
test_cert = pkg_resources.resource_string(
@ -580,6 +580,7 @@ class RenewableCertTests(unittest.TestCase):
self.test_rc.configfile["renewalparams"]["server"] = "acme.example.com"
self.test_rc.configfile["renewalparams"]["authenticator"] = "fake"
self.test_rc.configfile["renewalparams"]["dvsni_port"] = "4430"
self.test_rc.configfile["renewalparams"]["account"] = "abcde"
mock_auth = mock.MagicMock()
mock_pd.PluginsRegistry.find_all.return_value = {"apache": mock_auth}
# Fails because "fake" != "apache"
@ -594,7 +595,7 @@ class RenewableCertTests(unittest.TestCase):
self.assertEqual(2, renewer.renew(self.test_rc, 1))
# TODO: We could also make several assertions about calls that should
# have been made to the mock functions here.
self.assertEqual(mock_da.call_count, 1)
mock_acc_storage().load.assert_called_once_with(account_id="abcde")
mock_client.obtain_certificate.return_value = (
mock.sentinel.certr, None, mock.sentinel.key, mock.sentinel.csr)
# This should fail because the renewal itself appears to fail

View file

@ -69,9 +69,9 @@ class RevokerTest(RevokerBase):
def tearDown(self):
shutil.rmtree(self.backup_dir)
@mock.patch("letsencrypt.network.Network.revoke")
@mock.patch("acme.client.Client.revoke")
@mock.patch("letsencrypt.revoker.revocation")
def test_revoke_by_key_all(self, mock_display, mock_net):
def test_revoke_by_key_all(self, mock_display, mock_acme):
mock_display().confirm_revocation.return_value = True
self.revoker.revoke_from_key(self.key)
@ -81,7 +81,7 @@ class RevokerTest(RevokerBase):
for i in xrange(2):
self.assertFalse(self._backups_exist(self.certs[i].get_row()))
self.assertEqual(mock_net.call_count, 2)
self.assertEqual(mock_acme.call_count, 2)
@mock.patch("letsencrypt.revoker.OpenSSL.crypto.load_privatekey")
def test_revoke_by_invalid_keys(self, mock_load_privatekey):
@ -93,9 +93,9 @@ class RevokerTest(RevokerBase):
self.assertRaises(
errors.RevokerError, self.revoker.revoke_from_key, self.key)
@mock.patch("letsencrypt.network.Network.revoke")
@mock.patch("acme.client.Client.revoke")
@mock.patch("letsencrypt.revoker.revocation")
def test_revoke_by_wrong_key(self, mock_display, mock_net):
def test_revoke_by_wrong_key(self, mock_display, mock_acme):
mock_display().confirm_revocation.return_value = True
key_path = pkg_resources.resource_filename(
@ -107,11 +107,11 @@ class RevokerTest(RevokerBase):
# Nothing was removed
self.assertEqual(len(self._get_rows()), 2)
# No revocation went through
self.assertEqual(mock_net.call_count, 0)
self.assertEqual(mock_acme.call_count, 0)
@mock.patch("letsencrypt.network.Network.revoke")
@mock.patch("acme.client.Client.revoke")
@mock.patch("letsencrypt.revoker.revocation")
def test_revoke_by_cert(self, mock_display, mock_net):
def test_revoke_by_cert(self, mock_display, mock_acme):
mock_display().confirm_revocation.return_value = True
self.revoker.revoke_from_cert(self.paths[1])
@ -124,11 +124,11 @@ class RevokerTest(RevokerBase):
self.assertTrue(self._backups_exist(row0))
self.assertFalse(self._backups_exist(row1))
self.assertEqual(mock_net.call_count, 1)
self.assertEqual(mock_acme.call_count, 1)
@mock.patch("letsencrypt.network.Network.revoke")
@mock.patch("acme.client.Client.revoke")
@mock.patch("letsencrypt.revoker.revocation")
def test_revoke_by_cert_not_found(self, mock_display, mock_net):
def test_revoke_by_cert_not_found(self, mock_display, mock_acme):
mock_display().confirm_revocation.return_value = True
self.revoker.revoke_from_cert(self.paths[0])
@ -143,11 +143,11 @@ class RevokerTest(RevokerBase):
self.assertTrue(self._backups_exist(row1))
self.assertFalse(self._backups_exist(row0))
self.assertEqual(mock_net.call_count, 1)
self.assertEqual(mock_acme.call_count, 1)
@mock.patch("letsencrypt.network.Network.revoke")
@mock.patch("acme.client.Client.revoke")
@mock.patch("letsencrypt.revoker.revocation")
def test_revoke_by_menu(self, mock_display, mock_net):
def test_revoke_by_menu(self, mock_display, mock_acme):
mock_display().confirm_revocation.return_value = True
mock_display.display_certs.side_effect = [
(display_util.HELP, 0),
@ -165,13 +165,13 @@ class RevokerTest(RevokerBase):
self.assertFalse(self._backups_exist(row0))
self.assertTrue(self._backups_exist(row1))
self.assertEqual(mock_net.call_count, 1)
self.assertEqual(mock_acme.call_count, 1)
self.assertEqual(mock_display.more_info_cert.call_count, 1)
@mock.patch("letsencrypt.revoker.logger")
@mock.patch("letsencrypt.network.Network.revoke")
@mock.patch("acme.client.Client.revoke")
@mock.patch("letsencrypt.revoker.revocation")
def test_revoke_by_menu_delete_all(self, mock_display, mock_net, mock_log):
def test_revoke_by_menu_delete_all(self, mock_display, mock_acme, mock_log):
mock_display().confirm_revocation.return_value = True
mock_display.display_certs.return_value = (display_util.OK, 0)
@ -183,7 +183,7 @@ class RevokerTest(RevokerBase):
for i in xrange(2):
self.assertFalse(self._backups_exist(self.certs[i].get_row()))
self.assertEqual(mock_net.call_count, 2)
self.assertEqual(mock_acme.call_count, 2)
# Info is called when there aren't any certs left...
self.assertTrue(mock_log.info.called)

View file

@ -142,5 +142,6 @@ class ApacheDvsni(common.Dvsni):
return self.VHOST_TEMPLATE.format(
vhost=ips, server_name=achall.nonce_domain,
ssl_options_conf_path=self.configurator.parser.loc["ssl_options"],
cert_path=self.get_cert_file(achall), key_path=achall.key.file,
cert_path=self.get_cert_path(achall),
key_path=self.get_key_path(achall),
document_root=document_root).replace("\n", os.linesep)

View file

@ -140,8 +140,8 @@ class NginxDvsni(common.Dvsni):
self.configurator.config.work_dir, 'access.log')],
['error_log', os.path.join(
self.configurator.config.work_dir, 'error.log')],
['ssl_certificate', self.get_cert_file(achall)],
['ssl_certificate_key', achall.key.file],
['ssl_certificate', self.get_cert_path(achall)],
['ssl_certificate_key', self.get_key_path(achall)],
[['location', '/'], [['root', document_root]]]])
return [['server'], block]

View file

@ -39,7 +39,7 @@ acme_install_requires = [
# rsa_recover_prime_factors (>=0.8)
'cryptography>=0.8',
#'letsencrypt' # TODO: uses testdata vectors
'mock',
'mock<1.1.0', # py26
'pyrfc3339',
'ndg-httpsclient', # urllib3 InsecurePlatformWarning (#304)
'pyasn1', # urllib3 InsecurePlatformWarning (#304)
@ -54,7 +54,7 @@ letsencrypt_install_requires = [
'ConfigArgParse',
'configobj',
#'cryptography>=0.7', # load_pem_x509_certificate, version pin mismatch
'mock',
'mock<1.1.0', # py26
'parsedatetime',
'psutil>=2.1.0', # net_connections introduced in 2.1.0
# https://pyopenssl.readthedocs.org/en/latest/api/crypto.html#OpenSSL.crypto.X509Req.get_extensions
@ -68,7 +68,7 @@ letsencrypt_install_requires = [
letsencrypt_apache_install_requires = [
#'acme',
#'letsencrypt',
'mock',
'mock<1.1.0', # py26
'python-augeas',
'zope.component',
'zope.interface',
@ -77,7 +77,7 @@ letsencrypt_nginx_install_requires = [
#'acme',
#'letsencrypt',
'pyparsing>=1.5.5', # Python3 support; perhaps unnecessary?
'mock',
'mock<1.1.0', # py26
'zope.interface',
]
@ -86,7 +86,7 @@ install_requires = [
'cryptography>=0.8',
'ConfigArgParse',
'configobj',
'mock',
'mock<1.1.0', # py26
'ndg-httpsclient', # urllib3 InsecurePlatformWarning (#304)
'parsedatetime',
'psutil>=2.1.0', # net_connections introduced in 2.1.0

View file

@ -18,6 +18,7 @@ letsencrypt_test () {
$store_flags \
--text \
--agree-eula \
--agree-tos \
--email "" \
--debug \
-vvvvvvv \