mirror of
https://github.com/certbot/certbot.git
synced 2026-06-07 07:42:08 -04:00
Add accounts and tests
This commit is contained in:
parent
214c0e9355
commit
1e97c0c598
4 changed files with 268 additions and 36 deletions
|
|
@ -1,6 +1,4 @@
|
|||
import json
|
||||
import os
|
||||
import sys
|
||||
|
||||
import configobj
|
||||
import zope.component
|
||||
|
|
@ -13,6 +11,7 @@ from letsencrypt.client import interfaces
|
|||
from letsencrypt.client import le_util
|
||||
|
||||
from letsencrypt.client.display import ops as display_ops
|
||||
from letsencrypt.client.display import util as display_util
|
||||
|
||||
|
||||
class Account(object):
|
||||
|
|
@ -26,51 +25,85 @@ class Account(object):
|
|||
:ivar str email: Client's email address
|
||||
:ivar str phone: Client's phone number
|
||||
|
||||
:ivar bool save: Whether or not to save the account information
|
||||
|
||||
:ivar regr: Registration Resource
|
||||
:type regr: :class:`~letsencrypt.acme.messages2.RegistrationResource`
|
||||
|
||||
"""
|
||||
|
||||
# Just make sure we don't get pwned
|
||||
# Make sure that it also doesn't start with a period or have two consecutive
|
||||
# periods <- this needs to be done in addition to the regex
|
||||
EMAIL_REGEX = "[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+"
|
||||
|
||||
def __init__(self, config, key, email=None, phone=None, regr=None):
|
||||
self.key = key
|
||||
self.config = config
|
||||
self.email = email
|
||||
if email is not None:
|
||||
self.email = self.scrub_email(email)
|
||||
else:
|
||||
self.email = None
|
||||
self.phone = phone
|
||||
|
||||
self.regr = regr
|
||||
|
||||
@property
|
||||
def uri(self):
|
||||
"""URI link for new registrations."""
|
||||
if self.regr is not None:
|
||||
return self.regr.uri
|
||||
|
||||
@property
|
||||
def new_authzr_uri(self): # pylint: disable=missing-docstring
|
||||
if self.regr is not None:
|
||||
return self.regr.new_authzr_uri
|
||||
|
||||
@property
|
||||
def terms_of_service(self): # pylint: disable=missing-docstring
|
||||
if self.regr is not None:
|
||||
return self.regr.terms_of_service
|
||||
|
||||
@property
|
||||
def recovery_token(self): # pylint: disable=missing-docstring
|
||||
if self.regr is not None and self.regr.body is not None:
|
||||
return self.regr.body.recovery_token
|
||||
|
||||
def save(self):
|
||||
# account_dir = le_util.make_or_verify_dir(
|
||||
# os.path.join(self.config.config_dir, "accounts"))
|
||||
# account_key_dir = le_util.make_or_verify_dir(
|
||||
# os.path.join(account_dir, "keys"), 0o700)
|
||||
"""Save account to disk."""
|
||||
le_util.make_or_verify_dir(self.accounts_dir)
|
||||
|
||||
acc_config = configobj.ConfigObj()
|
||||
# acc_config.filename = os.path.join(
|
||||
# account_dir, self._get_config_filename())
|
||||
acc_config.filename = sys.stdout
|
||||
acc_config.filename = os.path.join(
|
||||
self.config.accounts_dir, self._get_config_filename(self.email))
|
||||
|
||||
acc_config.initial_comment = [
|
||||
"Account information for %s under %s" % (
|
||||
self._get_config_filename(self.email), self.config.server)]
|
||||
acc_config["key"] = self.key.path
|
||||
|
||||
acc_config["key"] = self.key.file
|
||||
acc_config["phone"] = self.phone
|
||||
|
||||
regr_json = self.regr.to_json()
|
||||
regr_dict = json.loads(regr_json)
|
||||
if self.regr is not None:
|
||||
acc_config["RegistrationResource"] = {}
|
||||
acc_config["RegistrationResource"]["uri"] = self.uri
|
||||
acc_config["RegistrationResource"]["new_authzr_uri"] = (
|
||||
self.new_authzr_uri)
|
||||
acc_config["RegistrationResource"]["terms_of_service"] = (
|
||||
self.terms_of_service)
|
||||
|
||||
regr_dict = self.regr.body.to_json()
|
||||
acc_config["RegistrationResource"]["body"] = regr_dict
|
||||
|
||||
acc_config["regr"] = regr_dict
|
||||
acc_config.write()
|
||||
|
||||
@classmethod
|
||||
def _get_config_filename(self, email):
|
||||
def _get_config_filename(cls, email):
|
||||
return email if email is not None else "default"
|
||||
|
||||
@classmethod
|
||||
def from_existing_account(cls, config, email=None):
|
||||
"""Populate an account from an existing email."""
|
||||
accounts_dir = os.path.join(
|
||||
config.config_dir, "accounts", config.server)
|
||||
config.accounts_dir)
|
||||
config_fp = os.path.join(accounts_dir, cls._get_config_filename(email))
|
||||
return cls._from_config_fp(config, config_fp)
|
||||
|
||||
|
|
@ -82,20 +115,36 @@ class Account(object):
|
|||
except IOError:
|
||||
raise errors.LetsEncryptClientError(
|
||||
"Account for %s does not exist" % os.path.basename(config_fp))
|
||||
json_regr = json.dumps(acc_config["regr"])
|
||||
return cls(config, acc_config["key"], acc_config["email"],
|
||||
acc_config["phone"],
|
||||
messages2.RegistrationResource.from_json(json_regr))
|
||||
|
||||
if os.path.basename(config_fp) != "default":
|
||||
email = os.path.basename(config_fp)
|
||||
else:
|
||||
email = None
|
||||
phone = acc_config["phone"] if acc_config["phone"] != "None" else None
|
||||
|
||||
with open(acc_config["key"]) as key_file:
|
||||
key = le_util.Key(acc_config["key"], key_file.read())
|
||||
|
||||
if "RegistrationResource" in acc_config:
|
||||
acc_config_rr = acc_config["RegistrationResource"]
|
||||
regr = messages2.RegistrationResource(
|
||||
uri=acc_config_rr["uri"],
|
||||
new_authzr_uri=acc_config_rr["new_authzr_uri"],
|
||||
terms_of_service=acc_config_rr["terms_of_service"],
|
||||
body=messages2.Registration.from_json(acc_config_rr["body"]))
|
||||
else:
|
||||
regr = None
|
||||
|
||||
return cls(config, key, email, phone, regr)
|
||||
|
||||
@classmethod
|
||||
def choose_account(cls, config):
|
||||
"""Choose one of the available accounts."""
|
||||
accounts = []
|
||||
accounts_dir = os.path.join(config.config_dir, "accounts")
|
||||
filenames = os.listdir(accounts_dir)
|
||||
filenames = os.listdir(config.accounts_dir)
|
||||
for name in filenames:
|
||||
# Not some directory ie. keys
|
||||
config_fp = os.path.join(accounts_dir, name)
|
||||
config_fp = os.path.join(config.accounts_dir, name)
|
||||
if os.path.isfile(config_fp):
|
||||
accounts.append(cls._from_config_fp(config, config_fp))
|
||||
|
||||
|
|
@ -108,8 +157,23 @@ class Account(object):
|
|||
|
||||
@classmethod
|
||||
def from_prompts(cls, config):
|
||||
email = zope.component.getUtility(interfaces.IDisplay).input(
|
||||
"""Generate an account from prompted user input."""
|
||||
code, email = zope.component.getUtility(interfaces.IDisplay).input(
|
||||
"Enter email address")
|
||||
key_dir = os.path.join(config.config_dir, "accounts", config.server, "keys")
|
||||
key = crypto_util.init_save_key(2048, config.accounts_dir, email)
|
||||
return cls(config, email, key)
|
||||
if code == display_util.OK:
|
||||
email = email if email != "" else None
|
||||
|
||||
print config.account_keys_dir
|
||||
le_util.make_or_verify_dir(
|
||||
config.account_keys_dir, 0o700, os.geteuid())
|
||||
key = crypto_util.init_save_key(
|
||||
2048, config.account_keys_dir, email)
|
||||
return cls(config, key, email)
|
||||
|
||||
return None
|
||||
|
||||
@classmethod
|
||||
def scrub_email(cls, email):
|
||||
"""Scrub email address before using it."""
|
||||
# TODO: Fill in
|
||||
return email
|
||||
|
|
|
|||
|
|
@ -42,17 +42,18 @@ def choose_authenticator(auths, errs):
|
|||
else:
|
||||
return
|
||||
|
||||
|
||||
def choose_account(accounts):
|
||||
"""Choose an account.
|
||||
|
||||
:param list accounts: where each is of type
|
||||
:param list accounts: Containing at least one
|
||||
:class:`~letsencrypt.client.account.Account`
|
||||
|
||||
"""
|
||||
# Note this will get more complicated once we start recording authorizations
|
||||
|
||||
labels = [
|
||||
"%s | %s" % (acc.email.ljust(display_util.WIDTH - 39), acc.phone)
|
||||
"%s | %s" % (acc.email.ljust(display_util.WIDTH - 39),
|
||||
acc.phone if acc.phone is not None else "")
|
||||
for acc in accounts
|
||||
]
|
||||
|
||||
|
|
@ -63,6 +64,7 @@ def choose_account(accounts):
|
|||
else:
|
||||
return None
|
||||
|
||||
|
||||
def choose_names(installer):
|
||||
"""Display screen to select domains to validate.
|
||||
|
||||
|
|
|
|||
|
|
@ -1,10 +1,132 @@
|
|||
import mock
|
||||
import os
|
||||
import pkg_resources
|
||||
import shutil
|
||||
import sys
|
||||
import tempfile
|
||||
import unittest
|
||||
|
||||
import zope.component
|
||||
|
||||
from letsencrypt.acme import messages2
|
||||
|
||||
from letsencrypt.client import account
|
||||
from letsencrypt.client import configuration
|
||||
from letsencrypt.client import le_util
|
||||
|
||||
from letsencrypt.client.display import util as display_util
|
||||
|
||||
|
||||
mock_config = mock.MagicMock(spec=configuration.NamespaceConfig)
|
||||
acc = account.Account.from_prompts(mock_config)
|
||||
class AccountTest(unittest.TestCase):
|
||||
"""Tests letsencrypt.client.account.Account."""
|
||||
|
||||
acc.save()
|
||||
def setUp(self):
|
||||
self.accounts_dir = tempfile.mkdtemp("accounts")
|
||||
self.account_keys_dir = os.path.join(self.accounts_dir, "keys")
|
||||
os.makedirs(self.account_keys_dir, 0o700)
|
||||
|
||||
self.config = mock.MagicMock(
|
||||
spec=configuration.NamespaceConfig, accounts_dir=self.accounts_dir,
|
||||
account_keys_dir=self.account_keys_dir,
|
||||
server="letsencrypt-demo.org")
|
||||
|
||||
rsa256_file = pkg_resources.resource_filename(
|
||||
"letsencrypt.client.tests", "testdata/rsa256_key.pem")
|
||||
rsa256_pem = pkg_resources.resource_string(
|
||||
"letsencrypt.client.tests", "testdata/rsa256_key.pem")
|
||||
|
||||
self.key = le_util.Key(rsa256_file, rsa256_pem)
|
||||
self.email = "client@letsencrypt.org"
|
||||
self.regr = messages2.RegistrationResource(
|
||||
uri="uri",
|
||||
new_authzr_uri="new_authzr_uri",
|
||||
terms_of_service="terms_of_service",
|
||||
body=messages2.Registration(
|
||||
recovery_token="recovery_token", agreement="agreement")
|
||||
)
|
||||
|
||||
self.test_account = account.Account(
|
||||
self.config, self.key, self.email, None, self.regr)
|
||||
|
||||
def tearDown(self):
|
||||
shutil.rmtree(self.accounts_dir)
|
||||
|
||||
@mock.patch("letsencrypt.client.account.zope.component.getUtility")
|
||||
@mock.patch("letsencrypt.client.account.crypto_util.init_save_key")
|
||||
def test_prompts(self, mock_key, mock_util):
|
||||
displayer = display_util.FileDisplay(sys.stdout)
|
||||
zope.component.provideUtility(displayer)
|
||||
|
||||
mock_util().input.return_value = (display_util.OK, self.email)
|
||||
|
||||
mock_key.return_value = self.key
|
||||
acc = account.Account.from_prompts(self.config)
|
||||
|
||||
self.assertEqual(acc.email, self.email)
|
||||
self.assertEqual(acc.key, self.key)
|
||||
self.assertEqual(acc.config, self.config)
|
||||
|
||||
def test_save(self):
|
||||
self.test_account.save()
|
||||
self._read_out_config(self.email)
|
||||
|
||||
def test_save_from_existing_account(self):
|
||||
self.test_account.save()
|
||||
acc = account.Account.from_existing_account(self.config, self.email)
|
||||
|
||||
self.assertEqual(acc.key, self.test_account.key)
|
||||
self.assertEqual(acc.email, self.test_account.email)
|
||||
self.assertEqual(acc.phone, self.test_account.phone)
|
||||
self.assertEqual(acc.regr, self.test_account.regr)
|
||||
|
||||
def test_properties(self):
|
||||
self.assertEqual(self.test_account.uri, "uri")
|
||||
self.assertEqual(self.test_account.new_authzr_uri, "new_authzr_uri")
|
||||
self.assertEqual(self.test_account.terms_of_service, "terms_of_service")
|
||||
self.assertEqual(self.test_account.recovery_token, "recovery_token")
|
||||
|
||||
def test_partial_properties(self):
|
||||
partial = account.Account(self.config, self.key)
|
||||
|
||||
self.assertTrue(partial.uri is None)
|
||||
self.assertTrue(partial.new_authzr_uri is None)
|
||||
self.assertTrue(partial.terms_of_service is None)
|
||||
self.assertTrue(partial.recovery_token is None)
|
||||
|
||||
|
||||
def test_partial_account_default(self):
|
||||
partial = account.Account(self.config, self.key)
|
||||
partial.save()
|
||||
|
||||
acc = account.Account.from_existing_account(self.config)
|
||||
|
||||
self.assertEqual(partial.key, acc.key)
|
||||
self.assertEqual(partial.email, acc.email)
|
||||
self.assertEqual(partial.phone, acc.phone)
|
||||
self.assertEqual(partial.regr, acc.regr)
|
||||
|
||||
@mock.patch("letsencrypt.client.account.display_ops.choose_account")
|
||||
def test_choose_account(self, mock_op):
|
||||
mock_op.return_value = self.test_account
|
||||
|
||||
# Test 0
|
||||
self.assertTrue(account.Account.choose_account(self.config) is None)
|
||||
|
||||
# Test 1
|
||||
self.test_account.save()
|
||||
acc = account.Account.choose_account(self.config)
|
||||
self.assertEqual(acc.email, self.test_account.email)
|
||||
|
||||
# Test multiple
|
||||
self.assertFalse(mock_op.called)
|
||||
acc2 = account.Account(self.config, self.key)
|
||||
acc2.save()
|
||||
test_acc = account.Account.choose_account(self.config)
|
||||
self.assertTrue(mock_op.called)
|
||||
self.assertTrue(test_acc.email, self.test_account.email)
|
||||
|
||||
def _read_out_config(self, filep):
|
||||
print open(os.path.join(self.accounts_dir, filep)).read()
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
|
@ -1,10 +1,13 @@
|
|||
"""Test letsencrypt.client.display.ops."""
|
||||
import os
|
||||
import sys
|
||||
import tempfile
|
||||
import unittest
|
||||
|
||||
import mock
|
||||
import zope.component
|
||||
|
||||
from letsencrypt.client import le_util
|
||||
from letsencrypt.client.display import util as display_util
|
||||
|
||||
|
||||
|
|
@ -50,10 +53,51 @@ class ChooseAuthenticatorTest(unittest.TestCase):
|
|||
@mock.patch("letsencrypt.client.display.ops.util")
|
||||
def test_no_choice(self, mock_util):
|
||||
mock_util().menu.return_value = (display_util.CANCEL, 0)
|
||||
|
||||
self.assertTrue(self._call(self.auths, {}) is None)
|
||||
|
||||
|
||||
class ChooseAccountTest(unittest.TestCase):
|
||||
"""Test choose_account."""
|
||||
def setUp(self):
|
||||
from letsencrypt.client import account
|
||||
zope.component.provideUtility(display_util.FileDisplay(sys.stdout))
|
||||
|
||||
self.accounts_dir = tempfile.mkdtemp("accounts")
|
||||
self.account_keys_dir = os.path.join(self.accounts_dir, "keys")
|
||||
os.makedirs(self.account_keys_dir, 0o700)
|
||||
|
||||
self.config = mock.MagicMock(
|
||||
accounts_dir=self.accounts_dir,
|
||||
account_keys_dir=self.account_keys_dir,
|
||||
server="letsencrypt-demo.org")
|
||||
self.key = le_util.Key("keypath", "pem")
|
||||
|
||||
self.acc1 = account.Account(self.config, self.key, "email1")
|
||||
self.acc2 = account.Account(self.config, self.key, "email2", "phone")
|
||||
self.acc1.save()
|
||||
self.acc2.save()
|
||||
|
||||
@classmethod
|
||||
def _call(cls, accounts):
|
||||
from letsencrypt.client.display import ops
|
||||
return ops.choose_account(accounts)
|
||||
|
||||
@mock.patch("letsencrypt.client.display.ops.util")
|
||||
def test_one(self, mock_util):
|
||||
mock_util().menu.return_value = (display_util.OK, 0)
|
||||
self.assertEqual(self._call([self.acc1]), self.acc1)
|
||||
|
||||
@mock.patch("letsencrypt.client.display.ops.util")
|
||||
def test_two(self, mock_util):
|
||||
mock_util().menu.return_value = (display_util.OK, 1)
|
||||
self.assertEqual(self._call([self.acc1, self.acc2]), self.acc2)
|
||||
|
||||
@mock.patch("letsencrypt.client.display.ops.util")
|
||||
def test_cancel(self, mock_util):
|
||||
mock_util().menu.return_value = (display_util.CANCEL, 1)
|
||||
self.assertTrue(self._call([self.acc1, self.acc2]) is None)
|
||||
|
||||
|
||||
class GenHttpsNamesTest(unittest.TestCase):
|
||||
"""Test _gen_https_names."""
|
||||
def setUp(self):
|
||||
|
|
|
|||
Loading…
Reference in a new issue