Deferred EFF subscription until the first certificate is successfully issued (#8076)

* Base logic

* Various controls when email is None

* Adapt eff tests

* Forward compatibility

* Also for csr

* Explicit regr or meta updates in account objects

* Adapt logic to ask for eff subscription during registering

* Adapt tests

* Move dry-run control

* Add some relevant controls on handle_subscription call checks
This commit is contained in:
Adrien Ferrand 2020-06-19 00:58:19 +02:00 committed by GitHub
parent 70c8481fd8
commit 860af81fef
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 303 additions and 138 deletions

View file

@ -15,6 +15,7 @@ import zope.component
from acme import fields as acme_fields
from acme import messages
from acme.client import ClientBase # pylint: disable=unused-import
from certbot import errors
from certbot import interfaces
from certbot import util
@ -39,6 +40,8 @@ class Account(object):
:ivar datetime.datetime creation_dt: Creation date and time (UTC).
:ivar str creation_host: FQDN of host, where account has been created.
:ivar str register_to_eff: If not None, Certbot will register the provided
email during the account registration.
.. note:: ``creation_dt`` and ``creation_host`` are useful in
cross-machine migration scenarios.
@ -46,15 +49,16 @@ class Account(object):
"""
creation_dt = acme_fields.RFC3339Field("creation_dt")
creation_host = jose.Field("creation_host")
register_to_eff = jose.Field("register_to_eff", omitempty=True)
def __init__(self, regr, key, meta=None):
self.key = key
self.regr = regr
self.meta = self.Meta(
# pyrfc3339 drops microseconds, make sure __eq__ is sane
creation_dt=datetime.datetime.now(
tz=pytz.UTC).replace(microsecond=0),
creation_host=socket.getfqdn()) if meta is None else meta
creation_dt=datetime.datetime.now(tz=pytz.UTC).replace(microsecond=0),
creation_host=socket.getfqdn(),
register_to_eff=None) if meta is None else meta
# try MD5, else use MD5 in non-security mode (e.g. for FIPS systems / RHEL)
try:
@ -242,15 +246,47 @@ class AccountFileStorage(interfaces.AccountStorage):
return self._load_for_server_path(account_id, self.config.server_path)
def save(self, account, client):
self._save(account, client, regr_only=False)
# type: (Account, ClientBase) -> None
"""Create a new account.
def save_regr(self, account, acme):
"""Save the registration resource.
:param Account account: account whose regr should be saved
:param Account account: account to create
:param ClientBase client: ACME client associated to the account
"""
self._save(account, acme, regr_only=True)
try:
dir_path = self._prepare(account)
self._create(account, dir_path)
self._update_meta(account, dir_path)
self._update_regr(account, client, dir_path)
except IOError as error:
raise errors.AccountStorageError(error)
def update_regr(self, account, client):
# type: (Account, ClientBase) -> None
"""Update the registration resource.
:param Account account: account to update
:param ClientBase client: ACME client associated to the account
"""
try:
dir_path = self._prepare(account)
self._update_regr(account, client, dir_path)
except IOError as error:
raise errors.AccountStorageError(error)
def update_meta(self, account):
# type: (Account) -> None
"""Update the meta resource.
:param Account account: account to update
"""
try:
dir_path = self._prepare(account)
self._update_meta(account, dir_path)
except IOError as error:
raise errors.AccountStorageError(error)
def delete(self, account_id):
"""Delete registration info from disk
@ -318,32 +354,36 @@ class AccountFileStorage(interfaces.AccountStorage):
return dir_path
def _save(self, account, acme, regr_only):
def _prepare(self, account):
# type: (Account) -> str
account_dir_path = self._account_dir_path(account.id)
util.make_or_verify_dir(account_dir_path, 0o700, self.config.strict_permissions)
try:
with open(self._regr_path(account_dir_path), "w") as regr_file:
regr = account.regr
# If we have a value for new-authz, save it for forwards
# compatibility with older versions of Certbot. If we don't
# have a value for new-authz, this is an ACMEv2 directory where
# an older version of Certbot won't work anyway.
if hasattr(acme.directory, "new-authz"):
regr = RegistrationResourceWithNewAuthzrURI(
new_authzr_uri=acme.directory.new_authz,
body={},
uri=regr.uri)
else:
regr = messages.RegistrationResource(
body={},
uri=regr.uri)
regr_file.write(regr.json_dumps())
if not regr_only:
with util.safe_open(self._key_path(account_dir_path),
"w", chmod=0o400) as key_file:
key_file.write(account.key.json_dumps())
with open(self._metadata_path(
account_dir_path), "w") as metadata_file:
metadata_file.write(account.meta.json_dumps())
except IOError as error:
raise errors.AccountStorageError(error)
return account_dir_path
def _create(self, account, dir_path):
# type: (Account, str) -> None
with util.safe_open(self._key_path(dir_path), "w", chmod=0o400) as key_file:
key_file.write(account.key.json_dumps())
def _update_regr(self, account, acme, dir_path):
# type: (Account, ClientBase, str) -> None
with open(self._regr_path(dir_path), "w") as regr_file:
regr = account.regr
# If we have a value for new-authz, save it for forwards
# compatibility with older versions of Certbot. If we don't
# have a value for new-authz, this is an ACMEv2 directory where
# an older version of Certbot won't work anyway.
if hasattr(acme.directory, "new-authz"):
regr = RegistrationResourceWithNewAuthzrURI(
new_authzr_uri=acme.directory.new_authz,
body={},
uri=regr.uri)
else:
regr = messages.RegistrationResource(
body={},
uri=regr.uri)
regr_file.write(regr.json_dumps())
def _update_meta(self, account, dir_path):
with open(self._metadata_path(dir_path), "w") as metadata_file:
metadata_file.write(account.meta.json_dumps())

View file

@ -178,7 +178,7 @@ def register(config, account_storage, tos_cb=None):
account.report_new_account(config)
account_storage.save(acc, acme)
eff.handle_subscription(config)
eff.prepare_subscription(config, acc)
return acc, acme
@ -389,6 +389,7 @@ class Client(object):
authzr = self.auth_handler.handle_authorizations(orderr, best_effort)
return orderr.update(authorizations=authzr)
def obtain_and_enroll_certificate(self, domains, certname):
"""Obtain and enroll certificate.

View file

@ -4,32 +4,68 @@ import logging
import requests
import zope.component
from acme.magic_typing import Optional # pylint: disable=unused-import
from certbot import interfaces
from certbot._internal import constants
from certbot._internal.account import Account # pylint: disable=unused-import
from certbot._internal.account import AccountFileStorage
from certbot.interfaces import IConfig # pylint: disable=unused-import
logger = logging.getLogger(__name__)
def handle_subscription(config):
"""High level function to take care of EFF newsletter subscriptions.
def prepare_subscription(config, acc):
# type: (IConfig, Account) -> None
"""High level function to store potential EFF newsletter subscriptions.
The user may be asked if they want to sign up for the newsletter if
they have not already specified.
they have not given their explicit approval or refusal using --eff-mail
or --no-eff-mail flag.
:param .IConfig config: Client configuration.
Decision about EFF subscription will be stored in the account metadata.
:param IConfig config: Client configuration.
:param Account acc: Current client account.
"""
if config.email is None:
if config.eff_email:
_report_failure("you didn't provide an e-mail address")
if config.eff_email is False:
return
if config.eff_email is None:
config.eff_email = _want_subscription()
if config.eff_email:
subscribe(config.email)
if config.eff_email is True:
if config.email is None:
_report_failure("you didn't provide an e-mail address")
else:
acc.meta = acc.meta.update(register_to_eff=config.email)
elif config.email and _want_subscription():
acc.meta = acc.meta.update(register_to_eff=config.email)
if acc.meta.register_to_eff:
storage = AccountFileStorage(config)
storage.update_meta(acc)
def handle_subscription(config, acc):
# type: (IConfig, Account) -> None
"""High level function to take care of EFF newsletter subscriptions.
Once subscription is handled, it will not be handled again.
:param IConfig config: Client configuration.
:param Account acc: Current client account.
"""
if config.dry_run:
return
if acc.meta.register_to_eff:
subscribe(acc.meta.register_to_eff)
acc.meta = acc.meta.update(register_to_eff=None)
storage = AccountFileStorage(config)
storage.update_meta(acc)
def _want_subscription():
# type: () -> bool
"""Does the user want to be subscribed to the EFF newsletter?
:returns: True if we should subscribe the user, otherwise, False
@ -37,16 +73,17 @@ def _want_subscription():
"""
prompt = (
'Would you be willing to share your email address with the '
"Electronic Frontier Foundation, a founding partner of the Let's "
'Encrypt project and the non-profit organization that develops '
"Certbot? We'd like to send you email about our work encrypting "
'Would you be willing, once your first certificate is successfully issued, '
'to share your email address with the Electronic Frontier Foundation, a '
"founding partner of the Let's Encrypt project and the non-profit organization "
"that develops Certbot? We'd like to send you email about our work encrypting "
"the web, EFF news, campaigns, and ways to support digital freedom. ")
display = zope.component.getUtility(interfaces.IDisplay)
return display.yesno(prompt, default=False)
def subscribe(email):
# type: (str) -> None
"""Subscribe the user to the EFF mailing list.
:param str email: the e-mail address to subscribe
@ -56,11 +93,13 @@ def subscribe(email):
data = {'data_type': 'json',
'email': email,
'form_id': 'eff_supporters_library_subscribe_form'}
logger.info('Subscribe to the EFF mailing list (email: %s).', email)
logger.debug('Sending POST request to %s:\n%s', url, data)
_check_response(requests.post(url, data=data))
def _check_response(response):
# type: (requests.Response) -> None
"""Check for errors in the server's response.
If an error occurred, it will be reported to the user.
@ -81,6 +120,7 @@ def _check_response(response):
def _report_failure(reason=None):
# type: (Optional[str]) -> None
"""Notify the user of failing to sign them up for the newsletter.
:param reason: a phrase describing what the problem was

View file

@ -721,11 +721,12 @@ def update_account(config, unused_plugins):
# the v2 uri. Since it's the same object on disk, put it back to the v1 uri
# so that we can also continue to use the account object with acmev1.
acc.regr = acc.regr.update(uri=prev_regr_uri)
account_storage.save_regr(acc, cb_client.acme)
eff.handle_subscription(config)
account_storage.update_regr(acc, cb_client.acme)
eff.prepare_subscription(config, acc)
add_msg("Your e-mail address was updated to {0}.".format(config.email))
return None
def _install_cert(config, le_client, domains, lineage=None):
"""Install a cert
@ -1116,6 +1117,7 @@ def run(config, plugins):
display_ops.success_renewal(domains)
_suggest_donation_if_appropriate(config)
eff.handle_subscription(config, le_client.account)
return None
@ -1189,6 +1191,7 @@ def renew_cert(config, plugins, lineage):
notify("new certificate deployed with reload of {0} server; fullchain is {1}".format(
config.installer, lineage.fullchain), pause=False)
def certonly(config, plugins):
"""Authenticate & obtain cert, but do not install it.
@ -1220,6 +1223,7 @@ def certonly(config, plugins):
cert_path, fullchain_path = _csr_get_and_save_cert(config, le_client)
_report_new_cert(config, cert_path, fullchain_path)
_suggest_donation_if_appropriate(config)
eff.handle_subscription(config, le_client.account)
return
domains, certname = _find_domains_or_certname(config, installer)
@ -1237,6 +1241,8 @@ def certonly(config, plugins):
key_path = lineage.key_path if lineage else None
_report_new_cert(config, cert_path, fullchain_path, key_path)
_suggest_donation_if_appropriate(config)
eff.handle_subscription(config, le_client.account)
def renew(config, unused_plugins):
"""Renew previously-obtained certificates.

View file

@ -17,6 +17,7 @@ from certbot.compat import misc
from certbot.compat import os
import certbot.tests.util as test_util
KEY = jose.JWKRSA.load(test_util.load_vector("rsa512_key.pem"))
@ -56,6 +57,32 @@ class AccountTest(unittest.TestCase):
self.assertTrue(repr(self.acc).startswith(
"<Account(i_am_a_regr, 7adac10320f585ddf118429c0c4af2cd, Meta("))
class MetaTest(unittest.TestCase):
"""Tests for certbot._internal.account.Meta."""
def test_deserialize_partial(self):
from certbot._internal.account import Account
meta = Account.Meta.json_loads(
'{'
' "creation_dt": "2020-06-13T07:46:45Z",'
' "creation_host": "hyperion.localdomain"'
'}')
self.assertIsNotNone(meta.creation_dt)
self.assertIsNotNone(meta.creation_host)
self.assertIsNone(meta.register_to_eff)
def test_deserialize_full(self):
from certbot._internal.account import Account
meta = Account.Meta.json_loads(
'{'
' "creation_dt": "2020-06-13T07:46:45Z",'
' "creation_host": "hyperion.localdomain",'
' "register_to_eff": "bar"'
'}')
self.assertIsNotNone(meta.creation_dt)
self.assertIsNotNone(meta.creation_host)
self.assertIsNotNone(meta.register_to_eff)
class ReportNewAccountTest(test_util.ConfigTestCase):
"""Tests for certbot._internal.account.report_new_account."""
@ -138,15 +165,23 @@ class AccountFileStorageTest(test_util.ConfigTestCase):
regr = json.load(f)
self.assertTrue("new_authzr_uri" in regr)
def test_save_regr(self):
self.storage.save_regr(self.acc, self.mock_client)
def test_update_regr(self):
self.storage.update_regr(self.acc, self.mock_client)
account_path = os.path.join(self.config.accounts_dir, self.acc.id)
self.assertTrue(os.path.exists(account_path))
self.assertTrue(os.path.exists(os.path.join(
account_path, "regr.json")))
for file_name in "meta.json", "private_key.json":
self.assertFalse(os.path.exists(
os.path.join(account_path, file_name)))
self.assertTrue(os.path.exists(os.path.join(account_path, "regr.json")))
self.assertFalse(os.path.exists(os.path.join(account_path, "meta.json")))
self.assertFalse(os.path.exists(os.path.join(account_path, "private_key.json")))
def test_update_meta(self):
self.storage.update_meta(self.acc)
account_path = os.path.join(self.config.accounts_dir, self.acc.id)
self.assertTrue(os.path.exists(account_path))
self.assertTrue(os.path.exists(os.path.join(account_path, "meta.json")))
self.assertFalse(os.path.exists(os.path.join(account_path, "regr.json")))
self.assertFalse(os.path.exists(os.path.join(account_path, "private_key.json")))
def test_find_all(self):
self.storage.save(self.acc, self.mock_client)

View file

@ -17,6 +17,7 @@ from certbot.compat import filesystem
from certbot.compat import os
import certbot.tests.util as test_util
KEY = test_util.load_vector("rsa512_key.pem")
CSR_SAN = test_util.load_vector("csr-san_512.pem")
@ -91,15 +92,15 @@ class RegisterTest(test_util.ConfigTestCase):
with mock.patch("certbot._internal.client.acme_client.BackwardsCompatibleClientV2") as mock_client:
mock_client.new_account_and_tos().terms_of_service = "http://tos"
mock_client().external_account_required.side_effect = self._false_mock
with mock.patch("certbot._internal.eff.handle_subscription") as mock_handle:
with mock.patch("certbot._internal.eff.prepare_subscription") as mock_prepare:
with mock.patch("certbot._internal.account.report_new_account"):
mock_client().new_account_and_tos.side_effect = errors.Error
self.assertRaises(errors.Error, self._call)
self.assertFalse(mock_handle.called)
self.assertFalse(mock_prepare.called)
mock_client().new_account_and_tos.side_effect = None
self._call()
self.assertTrue(mock_handle.called)
self.assertTrue(mock_prepare.called)
def test_it(self):
with mock.patch("certbot._internal.client.acme_client.BackwardsCompatibleClientV2") as mock_client:
@ -117,11 +118,11 @@ class RegisterTest(test_util.ConfigTestCase):
mx_err = messages.Error.with_code('invalidContact', detail=msg)
with mock.patch("certbot._internal.client.acme_client.BackwardsCompatibleClientV2") as mock_client:
mock_client().external_account_required.side_effect = self._false_mock
with mock.patch("certbot._internal.eff.handle_subscription") as mock_handle:
with mock.patch("certbot._internal.eff.prepare_subscription") as mock_prepare:
mock_client().new_account_and_tos.side_effect = [mx_err, mock.MagicMock()]
self._call()
self.assertEqual(mock_get_email.call_count, 1)
self.assertTrue(mock_handle.called)
self.assertTrue(mock_prepare.called)
@mock.patch("certbot._internal.account.report_new_account")
def test_email_invalid_noninteractive(self, _rep):
@ -141,7 +142,7 @@ class RegisterTest(test_util.ConfigTestCase):
@mock.patch("certbot._internal.client.logger")
def test_without_email(self, mock_logger):
with mock.patch("certbot._internal.eff.handle_subscription") as mock_handle:
with mock.patch("certbot._internal.eff.prepare_subscription") as mock_prepare:
with mock.patch("certbot._internal.client.acme_client.BackwardsCompatibleClientV2") as mock_clnt:
mock_clnt().external_account_required.side_effect = self._false_mock
with mock.patch("certbot._internal.account.report_new_account"):
@ -150,7 +151,7 @@ class RegisterTest(test_util.ConfigTestCase):
self.config.dry_run = False
self._call()
mock_logger.info.assert_called_once_with(mock.ANY)
self.assertTrue(mock_handle.called)
self.assertTrue(mock_prepare.called)
@mock.patch("certbot._internal.account.report_new_account")
@mock.patch("certbot._internal.client.display_ops.get_email")

View file

@ -1,82 +1,91 @@
"""Tests for certbot._internal.eff."""
import datetime
import unittest
try:
import mock
except ImportError: # pragma: no cover
except ImportError: # pragma: no cover
from unittest import mock
import josepy
import pytz
import requests
from acme import messages
from certbot._internal import account
from certbot._internal import constants
import certbot.tests.util as test_util
class HandleSubscriptionTest(test_util.ConfigTestCase):
"""Tests for certbot._internal.eff.handle_subscription."""
_KEY = josepy.JWKRSA.load(test_util.load_vector("rsa512_key.pem"))
class SubscriptionTest(test_util.ConfigTestCase):
"""Abstract class for subscription tests."""
def setUp(self):
super(HandleSubscriptionTest, self).setUp()
self.email = 'certbot@example.org'
self.config.email = self.email
super(SubscriptionTest, self).setUp()
self.account = account.Account(
regr=messages.RegistrationResource(
uri=None, body=messages.Registration(),
new_authzr_uri='hi'),
key=_KEY,
meta=account.Account.Meta(
creation_host='test.certbot.org',
creation_dt=datetime.datetime(
2015, 7, 4, 14, 4, 10, tzinfo=pytz.UTC)))
self.config.email = 'certbot@example.org'
self.config.eff_email = None
class PrepareSubscriptionTest(SubscriptionTest):
"""Tests for certbot._internal.eff.prepare_subscription."""
def _call(self):
from certbot._internal.eff import handle_subscription
return handle_subscription(self.config)
from certbot._internal.eff import prepare_subscription
prepare_subscription(self.config, self.account)
@test_util.patch_get_utility()
@mock.patch('certbot._internal.eff.subscribe')
def test_failure(self, mock_subscribe, mock_get_utility):
def test_failure(self, mock_get_utility):
self.config.email = None
self.config.eff_email = True
self._call()
self.assertFalse(mock_subscribe.called)
self.assertFalse(mock_get_utility().yesno.called)
actual = mock_get_utility().add_message.call_args[0][0]
expected_part = "because you didn't provide an e-mail address"
self.assertTrue(expected_part in actual)
@mock.patch('certbot._internal.eff.subscribe')
def test_no_subscribe_with_no_prompt(self, mock_subscribe):
self.config.eff_email = False
with test_util.patch_get_utility() as mock_get_utility:
self._call()
self.assertFalse(mock_subscribe.called)
self._assert_no_get_utility_calls(mock_get_utility)
self.assertIsNone(self.account.meta.register_to_eff)
@test_util.patch_get_utility()
@mock.patch('certbot._internal.eff.subscribe')
def test_subscribe_with_no_prompt(self, mock_subscribe, mock_get_utility):
def test_will_not_subscribe_with_no_prompt(self, mock_get_utility):
self.config.eff_email = False
self._call()
self._assert_no_get_utility_calls(mock_get_utility)
self.assertIsNone(self.account.meta.register_to_eff)
@test_util.patch_get_utility()
def test_will_subscribe_with_no_prompt(self, mock_get_utility):
self.config.eff_email = True
self._call()
self._assert_subscribed(mock_subscribe)
self._assert_no_get_utility_calls(mock_get_utility)
self.assertEqual(self.account.meta.register_to_eff, self.config.email)
@test_util.patch_get_utility()
def test_will_not_subscribe_with_prompt(self, mock_get_utility):
mock_get_utility().yesno.return_value = False
self._call()
self.assertFalse(mock_get_utility().add_message.called)
self._assert_correct_yesno_call(mock_get_utility)
self.assertIsNone(self.account.meta.register_to_eff)
@test_util.patch_get_utility()
def test_will_subscribe_with_prompt(self, mock_get_utility):
mock_get_utility().yesno.return_value = True
self._call()
self.assertFalse(mock_get_utility().add_message.called)
self._assert_correct_yesno_call(mock_get_utility)
self.assertEqual(self.account.meta.register_to_eff, self.config.email)
def _assert_no_get_utility_calls(self, mock_get_utility):
self.assertFalse(mock_get_utility().yesno.called)
self.assertFalse(mock_get_utility().add_message.called)
@test_util.patch_get_utility()
@mock.patch('certbot._internal.eff.subscribe')
def test_subscribe_with_prompt(self, mock_subscribe, mock_get_utility):
mock_get_utility().yesno.return_value = True
self._call()
self._assert_subscribed(mock_subscribe)
self.assertFalse(mock_get_utility().add_message.called)
self._assert_correct_yesno_call(mock_get_utility)
def _assert_subscribed(self, mock_subscribe):
self.assertTrue(mock_subscribe.called)
self.assertEqual(mock_subscribe.call_args[0][0], self.email)
@test_util.patch_get_utility()
@mock.patch('certbot._internal.eff.subscribe')
def test_no_subscribe_with_prompt(self, mock_subscribe, mock_get_utility):
mock_get_utility().yesno.return_value = False
self._call()
self.assertFalse(mock_subscribe.called)
self.assertFalse(mock_get_utility().add_message.called)
self._assert_correct_yesno_call(mock_get_utility)
def _assert_correct_yesno_call(self, mock_get_utility):
self.assertTrue(mock_get_utility().yesno.called)
call_args, call_kwargs = mock_get_utility().yesno.call_args
@ -86,6 +95,25 @@ class HandleSubscriptionTest(test_util.ConfigTestCase):
self.assertFalse(call_kwargs.get('default', True))
class HandleSubscriptionTest(SubscriptionTest):
"""Tests for certbot._internal.eff.handle_subscription."""
def _call(self):
from certbot._internal.eff import handle_subscription
handle_subscription(self.config, self.account)
@mock.patch('certbot._internal.eff.subscribe')
def test_no_subscribe(self, mock_subscribe):
self._call()
self.assertFalse(mock_subscribe.called)
@mock.patch('certbot._internal.eff.subscribe')
def test_subscribe(self, mock_subscribe):
self.account.meta = self.account.meta.update(register_to_eff=self.config.email)
self._call()
self.assertTrue(mock_subscribe.called)
self.assertEqual(mock_subscribe.call_args[0][0], self.config.email)
class SubscribeTest(unittest.TestCase):
"""Tests for certbot._internal.eff.subscribe."""
def setUp(self):

View file

@ -39,6 +39,7 @@ from certbot.compat import os
from certbot.plugins import enhancements
import certbot.tests.util as test_util
CERT_PATH = test_util.vector_path('cert_512.pem')
CERT = test_util.vector_path('cert_512.pem')
CSR = test_util.vector_path('csr_512.der')
@ -71,7 +72,9 @@ class RunTest(test_util.ConfigTestCase):
mock.patch('certbot._internal.main._init_le_client'),
mock.patch('certbot._internal.main._suggest_donation_if_appropriate'),
mock.patch('certbot._internal.main._report_new_cert'),
mock.patch('certbot._internal.main._find_cert')]
mock.patch('certbot._internal.main._find_cert'),
mock.patch('certbot._internal.eff.handle_subscription'),
]
self.mock_auth = patches[0].start()
self.mock_success_installation = patches[1].start()
@ -80,6 +83,7 @@ class RunTest(test_util.ConfigTestCase):
self.mock_suggest_donation = patches[4].start()
self.mock_report_cert = patches[5].start()
self.mock_find_cert = patches[6].start()
self.mock_subscription = patches[7].start()
for patch in patches:
self.addCleanup(patch.stop)
@ -137,7 +141,8 @@ class CertonlyTest(unittest.TestCase):
with mock.patch('certbot._internal.main._init_le_client') as mock_init:
with mock.patch('certbot._internal.main._suggest_donation_if_appropriate'):
main.certonly(config, plugins)
with mock.patch('certbot._internal.eff.handle_subscription'):
main.certonly(config, plugins)
return mock_init() # returns the client
@ -589,13 +594,14 @@ class MainTest(test_util.ConfigTestCase):
args.extend(['--standalone', '-d', 'eg.is'])
self._cli_missing_flag(args, "register before running")
@mock.patch('certbot._internal.eff.handle_subscription')
@mock.patch('certbot._internal.log.post_arg_parse_setup')
@mock.patch('certbot._internal.main._report_new_cert')
@mock.patch('certbot._internal.main.client.acme_client.Client')
@mock.patch('certbot._internal.main._determine_account')
@mock.patch('certbot._internal.main.client.Client.obtain_and_enroll_certificate')
@mock.patch('certbot._internal.main._get_and_save_cert')
def test_user_agent(self, gsc, _obt, det, _client, _, __):
def test_user_agent(self, gsc, _obt, det, _client, _, __, ___):
# Normally the client is totally mocked out, but here we need more
# arguments to automate it...
args = ["--standalone", "certonly", "-m", "none@none.com",
@ -695,10 +701,11 @@ class MainTest(test_util.ConfigTestCase):
self.assertTrue(mock_getcert.called)
self.assertTrue(mock_inst.called)
@mock.patch('certbot._internal.eff.handle_subscription')
@mock.patch('certbot._internal.log.post_arg_parse_setup')
@mock.patch('certbot._internal.main._report_new_cert')
@mock.patch('certbot.util.exe_exists')
def test_configurator_selection(self, mock_exe_exists, _, __):
def test_configurator_selection(self, mock_exe_exists, _, __, ___):
mock_exe_exists.return_value = True
real_plugins = disco.PluginsRegistry.find_all()
args = ['--apache', '--authenticator', 'standalone']
@ -929,9 +936,10 @@ class MainTest(test_util.ConfigTestCase):
# Asserts we don't suggest donating after a successful dry run
self.assertEqual(mock_get_utility().add_message.call_count, 1)
@mock.patch('certbot._internal.eff.handle_subscription')
@mock.patch('certbot.crypto_util.notAfter')
@test_util.patch_get_utility()
def test_certonly_new_request_success(self, mock_get_utility, mock_notAfter):
def test_certonly_new_request_success(self, mock_get_utility, mock_notAfter, mock_subscription):
cert_path = os.path.normpath(os.path.join(self.config.config_dir, 'live/foo.bar'))
key_path = os.path.normpath(os.path.join(self.config.config_dir, 'live/baz.qux'))
date = '1970-01-01'
@ -950,12 +958,15 @@ class MainTest(test_util.ConfigTestCase):
self.assertTrue(key_path in cert_msg)
self.assertTrue(
'donate' in mock_get_utility().add_message.call_args[0][0])
self.assertTrue(mock_subscription.called)
def test_certonly_new_request_failure(self):
@mock.patch('certbot._internal.eff.handle_subscription')
def test_certonly_new_request_failure(self, mock_subscription):
mock_client = mock.MagicMock()
mock_client.obtain_and_enroll_certificate.return_value = False
self.assertRaises(errors.Error,
self._certonly_new_request_common, mock_client)
self.assertFalse(mock_subscription.called)
def _test_renewal_common(self, due_for_renewal, extra_args, log_out=None,
args=None, should_renew=True, error_expected=False,
@ -995,21 +1006,22 @@ class MainTest(test_util.ConfigTestCase):
with mock.patch('certbot._internal.main.renewal.crypto_util') \
as mock_crypto_util:
mock_crypto_util.notAfter.return_value = expiry_date
if not args:
args = ['-d', 'isnot.org', '-a', 'standalone', 'certonly']
if extra_args:
args += extra_args
try:
ret, stdout, _, _ = self._call(args, stdout)
if ret:
print("Returned", ret)
raise AssertionError(ret)
assert not error_expected, "renewal should have errored"
except: # pylint: disable=bare-except
if not error_expected:
raise AssertionError(
"Unexpected renewal error:\n" +
traceback.format_exc())
with mock.patch('certbot._internal.eff.handle_subscription'):
if not args:
args = ['-d', 'isnot.org', '-a', 'standalone', 'certonly']
if extra_args:
args += extra_args
try:
ret, stdout, _, _ = self._call(args, stdout)
if ret:
print("Returned", ret)
raise AssertionError(ret)
assert not error_expected, "renewal should have errored"
except: # pylint: disable=bare-except
if not error_expected:
raise AssertionError(
"Unexpected renewal error:\n" +
traceback.format_exc())
if should_renew:
if reuse_key:
@ -1310,13 +1322,15 @@ class MainTest(test_util.ConfigTestCase):
return mock_get_utility
def test_certonly_csr(self):
@mock.patch('certbot._internal.eff.handle_subscription')
def test_certonly_csr(self, mock_subscription):
mock_get_utility = self._test_certonly_csr_common()
cert_msg = mock_get_utility().add_message.call_args_list[0][0][0]
self.assertTrue('fullchain.pem' in cert_msg)
self.assertFalse('Your key file has been saved at' in cert_msg)
self.assertTrue(
'donate' in mock_get_utility().add_message.call_args[0][0])
self.assertTrue(mock_subscription.called)
def test_certonly_csr_dry_run(self):
mock_get_utility = self._test_certonly_csr_common(['--dry-run'])
@ -1395,7 +1409,7 @@ class MainTest(test_util.ConfigTestCase):
def test_update_account_with_email(self, mock_utility, mock_email):
email = "user@example.com"
mock_email.return_value = email
with mock.patch('certbot._internal.eff.handle_subscription') as mock_handle:
with mock.patch('certbot._internal.eff.prepare_subscription') as mock_prepare:
with mock.patch('certbot._internal.main._determine_account') as mocked_det:
with mock.patch('certbot._internal.main.account') as mocked_account:
with mock.patch('certbot._internal.main.client') as mocked_client:
@ -1415,10 +1429,10 @@ class MainTest(test_util.ConfigTestCase):
self.assertTrue(
cb_client.acme.update_registration.called)
# and we saved the updated registration on disk
self.assertTrue(mocked_storage.save_regr.called)
self.assertTrue(mocked_storage.update_regr.called)
self.assertTrue(
email in mock_utility().add_message.call_args[0][0])
self.assertTrue(mock_handle.called)
self.assertTrue(mock_prepare.called)
@mock.patch('certbot._internal.plugins.selection.choose_configurator_plugins')
@mock.patch('certbot._internal.updater._run_updaters')