Remove Recovery Token.

This commit is contained in:
Jakub Warmuz 2015-07-25 11:53:37 +00:00
parent 2028c3a454
commit e0651ad050
No known key found for this signature in database
GPG key ID: 2A7BAD3A489B52EA
20 changed files with 60 additions and 338 deletions

View file

@ -359,23 +359,6 @@ class RecoveryContactResponse(ChallengeResponse):
token = jose.Field("token", omitempty=True)
@Challenge.register
class RecoveryToken(ContinuityChallenge):
"""ACME "recoveryToken" challenge."""
typ = "recoveryToken"
@ChallengeResponse.register
class RecoveryTokenResponse(ChallengeResponse):
"""ACME "recoveryToken" challenge response.
:ivar unicode token:
"""
typ = "recoveryToken"
token = jose.Field("token", omitempty=True)
@Challenge.register
class ProofOfPossession(ContinuityChallenge):
"""ACME "proofOfPossession" challenge.

View file

@ -368,58 +368,6 @@ class RecoveryContactResponseTest(unittest.TestCase):
self.assertEqual(self.jmsg, msg.to_partial_json())
class RecoveryTokenTest(unittest.TestCase):
def setUp(self):
from acme.challenges import RecoveryToken
self.msg = RecoveryToken()
self.jmsg = {'type': 'recoveryToken'}
def test_to_partial_json(self):
self.assertEqual(self.jmsg, self.msg.to_partial_json())
def test_from_json(self):
from acme.challenges import RecoveryToken
self.assertEqual(self.msg, RecoveryToken.from_json(self.jmsg))
def test_from_json_hashable(self):
from acme.challenges import RecoveryToken
hash(RecoveryToken.from_json(self.jmsg))
class RecoveryTokenResponseTest(unittest.TestCase):
def setUp(self):
from acme.challenges import RecoveryTokenResponse
self.msg = RecoveryTokenResponse(token='23029d88d9e123e')
self.jmsg = {
'resource': 'challenge',
'type': 'recoveryToken',
'token': '23029d88d9e123e'
}
def test_to_partial_json(self):
self.assertEqual(self.jmsg, self.msg.to_partial_json())
def test_from_json(self):
from acme.challenges import RecoveryTokenResponse
self.assertEqual(
self.msg, RecoveryTokenResponse.from_json(self.jmsg))
def test_from_json_hashable(self):
from acme.challenges import RecoveryTokenResponse
hash(RecoveryTokenResponse.from_json(self.jmsg))
def test_json_without_optionals(self):
del self.jmsg['token']
from acme.challenges import RecoveryTokenResponse
msg = RecoveryTokenResponse.from_json(self.jmsg)
self.assertTrue(msg.token is None)
self.assertEqual(self.jmsg, msg.to_partial_json())
class ProofOfPossessionHintsTest(unittest.TestCase):
def setUp(self):

View file

@ -44,7 +44,7 @@ class ClientTest(unittest.TestCase):
# Registration
self.contact = ('mailto:cert-admin@example.com', 'tel:+12025551212')
reg = messages.Registration(
contact=self.contact, key=KEY.public_key(), recovery_token='t')
contact=self.contact, key=KEY.public_key())
self.new_reg = messages.NewRegistration(**dict(reg))
self.regr = messages.RegistrationResource(
body=reg, uri='https://www.letsencrypt-demo.org/acme/reg/1',

View file

@ -156,7 +156,6 @@ class Registration(ResourceBody):
:ivar acme.jose.jwk.JWK key: Public key.
:ivar tuple contact: Contact information following ACME spec,
`tuple` of `unicode`.
:ivar unicode recovery_token:
:ivar unicode agreement:
"""
@ -164,7 +163,6 @@ class Registration(ResourceBody):
# JWS.signature.combined.jwk
key = jose.Field('key', omitempty=True, decoder=jose.JWK.from_json)
contact = jose.Field('contact', omitempty=True, default=())
recovery_token = jose.Field('recoveryToken', omitempty=True)
agreement = jose.Field('agreement', omitempty=True)
phone_prefix = 'tel:'

View file

@ -101,18 +101,14 @@ class RegistrationTest(unittest.TestCase):
'mailto:admin@foo.com',
'tel:1234',
)
recovery_token = 'XYZ'
agreement = 'https://letsencrypt.org/terms'
from acme.messages import Registration
self.reg = Registration(
key=key, contact=contact, recovery_token=recovery_token,
agreement=agreement)
self.reg = Registration(key=key, contact=contact, agreement=agreement)
self.reg_none = Registration()
self.jobj_to = {
'contact': contact,
'recoveryToken': recovery_token,
'agreement': agreement,
'key': key,
}
@ -232,7 +228,7 @@ class AuthorizationTest(unittest.TestCase):
ChallengeBody(uri='http://challb2', status=STATUS_VALID,
chall=challenges.DNS(token='DGyRejmCefe7v4NfDGDKfA')),
ChallengeBody(uri='http://challb3', status=STATUS_VALID,
chall=challenges.RecoveryToken()),
chall=challenges.RecoveryContact()),
)
combinations = ((0, 2), (1, 2))

View file

@ -94,15 +94,11 @@ def report_new_account(acc, config):
config.config_dir),
reporter.MEDIUM_PRIORITY, True)
assert acc.regr.body.recovery_token is not None
recovery_msg = ("If you lose your account credentials, you can recover "
"them using the token \"{0}\". You must write that down "
"and put it in a safe place.".format(
acc.regr.body.recovery_token))
if acc.regr.body.emails:
recovery_msg += (" Another recovery method will be e-mails sent to "
"{0}.".format(", ".join(acc.regr.body.emails)))
reporter.add_message(recovery_msg, reporter.HIGH_PRIORITY, True)
recovery_msg = ("If you lose your account credentials, you can "
"recover through e-mails sent to {0}.".format(
", ".join(acc.regr.body.emails)))
reporter.add_message(recovery_msg, reporter.HIGH_PRIORITY, True)
class AccountMemoryStorage(interfaces.AccountStorage):

View file

@ -99,12 +99,6 @@ class RecoveryContact(AnnotatedChallenge):
acme_type = challenges.RecoveryContact
class RecoveryToken(AnnotatedChallenge):
"""Client annotated "recoveryToken" ACME challenge."""
__slots__ = ('challb', 'domain')
acme_type = challenges.RecoveryToken
class ProofOfPossession(AnnotatedChallenge):
"""Client annotated "proofOfPossession" ACME challenge."""
__slots__ = ('challb', 'domain')

View file

@ -349,8 +349,6 @@ def challb_to_achall(challb, account, domain):
challb=challb, domain=domain, key=account.key)
elif isinstance(chall, challenges.DNS):
return achallenges.DNS(challb=challb, domain=domain)
elif isinstance(chall, challenges.RecoveryToken):
return achallenges.RecoveryToken(challb=challb, domain=domain)
elif isinstance(chall, challenges.RecoveryContact):
return achallenges.RecoveryContact(
challb=challb, domain=domain)

View file

@ -22,7 +22,6 @@ class NamespaceConfig(object):
- `cert_key_backup`
- `in_progress_dir`
- `key_dir`
- `rec_token_dir`
- `renewer_config_file`
- `temp_checkpoint_dir`
@ -71,11 +70,6 @@ class NamespaceConfig(object):
def key_dir(self): # pylint: disable=missing-docstring
return os.path.join(self.namespace.config_dir, constants.KEY_DIR)
# TODO: This should probably include the server name
@property
def rec_token_dir(self): # pylint: disable=missing-docstring
return os.path.join(self.namespace.work_dir, constants.REC_TOKEN_DIR)
@property
def temp_checkpoint_dir(self): # pylint: disable=missing-docstring
return os.path.join(

View file

@ -88,10 +88,6 @@ LIVE_DIR = "live"
TEMP_CHECKPOINT_DIR = "temp_checkpoint"
"""Temporary checkpoint directory (relative to `IConfig.work_dir`)."""
REC_TOKEN_DIR = "recovery_tokens"
"""Directory where all recovery tokens are saved (relative to
`IConfig.work_dir`)."""
RENEWAL_CONFIGS_DIR = "configs"
"""Renewal configs directory, relative to `IConfig.config_dir`."""

View file

@ -7,16 +7,12 @@ from letsencrypt import achallenges
from letsencrypt import errors
from letsencrypt import interfaces
from letsencrypt import proof_of_possession
from letsencrypt import recovery_token
class ContinuityAuthenticator(object):
"""IAuthenticator for
:const:`~acme.challenges.ContinuityChallenge` class challenges.
:ivar rec_token: Performs "recoveryToken" challenges.
:type rec_token: :class:`letsencrypt.recovery_token.RecoveryToken`
:ivar proof_of_pos: Performs "proofOfPossession" challenges.
:type proof_of_pos:
:class:`letsencrypt.proof_of_possession.Proof_of_Possession`
@ -25,7 +21,7 @@ class ContinuityAuthenticator(object):
zope.interface.implements(interfaces.IAuthenticator)
# This will have an installer soon for get_key/cert purposes
def __init__(self, config, installer):
def __init__(self, config, installer): # pylint: disable=unused-argument
"""Initialize Client Authenticator.
:param config: Configuration.
@ -35,13 +31,11 @@ class ContinuityAuthenticator(object):
:type installer: :class:`letsencrypt.interfaces.IInstaller`
"""
self.rec_token = recovery_token.RecoveryToken(
config.server, config.rec_token_dir)
self.proof_of_pos = proof_of_possession.ProofOfPossession(installer)
def get_chall_pref(self, unused_domain): # pylint: disable=no-self-use
"""Return list of challenge preferences."""
return [challenges.ProofOfPossession, challenges.RecoveryToken]
return [challenges.ProofOfPossession]
def perform(self, achalls):
"""Perform client specific challenges for IAuthenticator"""
@ -49,16 +43,12 @@ class ContinuityAuthenticator(object):
for achall in achalls:
if isinstance(achall, achallenges.ProofOfPossession):
responses.append(self.proof_of_pos.perform(achall))
elif isinstance(achall, achallenges.RecoveryToken):
responses.append(self.rec_token.perform(achall))
else:
raise errors.ContAuthError("Unexpected Challenge")
return responses
def cleanup(self, achalls):
def cleanup(self, achalls): # pylint: disable=no-self-use
"""Cleanup call for IAuthenticator."""
for achall in achalls:
if isinstance(achall, achallenges.RecoveryToken):
self.rec_token.cleanup(achall)
elif not isinstance(achall, achallenges.ProofOfPossession):
if not isinstance(achall, achallenges.ProofOfPossession):
raise errors.ContAuthError("Unexpected Challenge")

View file

@ -206,8 +206,6 @@ class IConfig(zope.interface.Interface):
in_progress_dir = zope.interface.Attribute(
"Directory used before a permanent checkpoint is finalized.")
key_dir = zope.interface.Attribute("Keys storage.")
rec_token_dir = zope.interface.Attribute(
"Directory where all recovery tokens are saved.")
temp_checkpoint_dir = zope.interface.Attribute(
"Temporary checkpoint directory.")

View file

@ -1,72 +0,0 @@
"""Recovery Token Identifier Validation Challenge."""
import errno
import os
import zope.component
from acme import challenges
from letsencrypt import le_util
from letsencrypt import interfaces
class RecoveryToken(object):
"""Recovery Token Identifier Validation Challenge.
Based on draft-barnes-acme, section 6.4.
"""
def __init__(self, server, direc):
self.token_dir = os.path.join(direc, server)
def perform(self, chall):
"""Perform the Recovery Token Challenge.
:param chall: Recovery Token Challenge
:type chall: :class:`letsencrypt.achallenges.RecoveryToken`
:returns: response
:rtype: dict
"""
token_fp = os.path.join(self.token_dir, chall.domain)
if os.path.isfile(token_fp):
with open(token_fp) as token_fd:
return challenges.RecoveryTokenResponse(token=token_fd.read())
cancel, token = zope.component.getUtility(
interfaces.IDisplay).input(
"%s - Input Recovery Token: " % chall.domain)
if cancel != 1:
return challenges.RecoveryTokenResponse(token=token)
return None
def cleanup(self, chall):
"""Cleanup the saved recovery token if it exists.
:param chall: Recovery Token Challenge
:type chall: :class:`letsencrypt.achallenges.RecoveryToken`
"""
try:
le_util.safely_remove(os.path.join(self.token_dir, chall.domain))
except OSError as err:
if err.errno != errno.ENOENT:
raise
def requires_human(self, domain):
"""Indicates whether or not domain can be auto solved."""
return not os.path.isfile(os.path.join(self.token_dir, domain))
def store_token(self, domain, token):
"""Store token for later automatic use.
:param str domain: domain associated with the token
:param str token: token from authorization
"""
le_util.make_or_verify_dir(self.token_dir, 0o700, os.geteuid())
with open(os.path.join(self.token_dir, domain), "w") as token_fd:
token_fd.write(str(token))

View file

@ -63,7 +63,6 @@ class ReportNewAccountTest(unittest.TestCase):
def setUp(self):
self.config = mock.MagicMock(config_dir="/etc/letsencrypt")
reg = messages.Registration.from_data(email="rhino@jungle.io")
reg = reg.update(recovery_token="ECCENTRIC INVISIBILITY RHINOCEROS")
self.acc = mock.MagicMock(regr=messages.RegistrationResource(
uri=None, new_authzr_uri=None, body=reg))
@ -81,7 +80,6 @@ class ReportNewAccountTest(unittest.TestCase):
self._call()
call_list = mock_zope().add_message.call_args_list
self.assertTrue(self.config.config_dir in call_list[0][0][0])
self.assertTrue(self.acc.regr.body.recovery_token in call_list[1][0][0])
self.assertTrue(
", ".join(self.acc.regr.body.emails) in call_list[1][0][0])

View file

@ -21,7 +21,6 @@ RECOVERY_CONTACT = challenges.RecoveryContact(
activation_url="https://example.ca/sendrecovery/a5bd99383fb0",
success_url="https://example.ca/confirmrecovery/bb1b9928932",
contact="c********n@example.com")
RECOVERY_TOKEN = challenges.RecoveryToken()
POP = challenges.ProofOfPossession(
alg="RS256", nonce=jose.b64decode("eET5udtV7aoX8Xl8gYiZIA"),
hints=challenges.ProofOfPossession.Hints(
@ -42,7 +41,7 @@ POP = challenges.ProofOfPossession(
)
)
CHALLENGES = [SIMPLE_HTTP, DVSNI, DNS, RECOVERY_CONTACT, RECOVERY_TOKEN, POP]
CHALLENGES = [SIMPLE_HTTP, DVSNI, DNS, RECOVERY_CONTACT, POP]
DV_CHALLENGES = [chall for chall in CHALLENGES
if isinstance(chall, challenges.DVChallenge)]
CONT_CHALLENGES = [chall for chall in CHALLENGES
@ -84,11 +83,9 @@ DVSNI_P = chall_to_challb(DVSNI, messages.STATUS_PENDING)
SIMPLE_HTTP_P = chall_to_challb(SIMPLE_HTTP, messages.STATUS_PENDING)
DNS_P = chall_to_challb(DNS, messages.STATUS_PENDING)
RECOVERY_CONTACT_P = chall_to_challb(RECOVERY_CONTACT, messages.STATUS_PENDING)
RECOVERY_TOKEN_P = chall_to_challb(RECOVERY_TOKEN, messages.STATUS_PENDING)
POP_P = chall_to_challb(POP, messages.STATUS_PENDING)
CHALLENGES_P = [SIMPLE_HTTP_P, DVSNI_P, DNS_P,
RECOVERY_CONTACT_P, RECOVERY_TOKEN_P, POP_P]
CHALLENGES_P = [SIMPLE_HTTP_P, DVSNI_P, DNS_P, RECOVERY_CONTACT_P, POP_P]
DV_CHALLENGES_P = [challb for challb in CHALLENGES_P
if isinstance(challb.chall, challenges.DVChallenge)]
CONT_CHALLENGES_P = [

View file

@ -19,7 +19,6 @@ TRANSLATE = {
"dvsni": "DVSNI",
"simpleHttp": "SimpleHTTP",
"dns": "DNS",
"recoveryToken": "RecoveryToken",
"recoveryContact": "RecoveryContact",
"proofOfPossession": "ProofOfPossession",
}
@ -41,7 +40,8 @@ class ChallengeFactoryTest(unittest.TestCase):
[messages.STATUS_PENDING]*6, False)
def test_all(self):
cont_c, dv_c = self.handler._challenge_factory(self.dom, range(0, 6))
cont_c, dv_c = self.handler._challenge_factory(
self.dom, range(0, len(acme_util.CHALLENGES)))
self.assertEqual(
[achall.chall for achall in cont_c], acme_util.CONT_CHALLENGES)
@ -49,10 +49,10 @@ class ChallengeFactoryTest(unittest.TestCase):
[achall.chall for achall in dv_c], acme_util.DV_CHALLENGES)
def test_one_dv_one_cont(self):
cont_c, dv_c = self.handler._challenge_factory(self.dom, [1, 4])
cont_c, dv_c = self.handler._challenge_factory(self.dom, [1, 3])
self.assertEqual(
[achall.chall for achall in cont_c], [acme_util.RECOVERY_TOKEN])
[achall.chall for achall in cont_c], [acme_util.RECOVERY_CONTACT])
self.assertEqual([achall.chall for achall in dv_c], [acme_util.DVSNI])
def test_unrecognized(self):
@ -80,7 +80,7 @@ class GetAuthorizationsTest(unittest.TestCase):
self.mock_dv_auth.get_chall_pref.return_value = [challenges.DVSNI]
self.mock_cont_auth.get_chall_pref.return_value = [
challenges.RecoveryToken]
challenges.RecoveryContact]
self.mock_cont_auth.perform.side_effect = gen_auth_resp
self.mock_dv_auth.perform.side_effect = gen_auth_resp
@ -313,11 +313,11 @@ class GenChallengePathTest(unittest.TestCase):
self.assertTrue(self._call(challbs[::-1], prefs, None))
def test_common_case_with_continuity(self):
challbs = (acme_util.RECOVERY_TOKEN_P,
challbs = (acme_util.POP_P,
acme_util.RECOVERY_CONTACT_P,
acme_util.DVSNI_P,
acme_util.SIMPLE_HTTP_P)
prefs = [challenges.RecoveryToken, challenges.DVSNI]
prefs = [challenges.ProofOfPossession, challenges.DVSNI]
combos = acme_util.gen_combos(challbs)
self.assertEqual(self._call(challbs, prefs, combos), (0, 2))
@ -325,21 +325,19 @@ class GenChallengePathTest(unittest.TestCase):
self.assertTrue(self._call(challbs, prefs, None))
def test_full_cont_server(self):
challbs = (acme_util.RECOVERY_TOKEN_P,
acme_util.RECOVERY_CONTACT_P,
challbs = (acme_util.RECOVERY_CONTACT_P,
acme_util.POP_P,
acme_util.DVSNI_P,
acme_util.SIMPLE_HTTP_P,
acme_util.DNS_P)
# Typical webserver client that can do everything except DNS
# Attempted to make the order realistic
prefs = [challenges.RecoveryToken,
challenges.ProofOfPossession,
prefs = [challenges.ProofOfPossession,
challenges.SimpleHTTP,
challenges.DVSNI,
challenges.RecoveryContact]
combos = acme_util.gen_combos(challbs)
self.assertEqual(self._call(challbs, prefs, combos), (0, 4))
self.assertEqual(self._call(challbs, prefs, combos), (1, 3))
# Dumb path trivial test
self.assertTrue(self._call(challbs, prefs, None))

View file

@ -36,7 +36,6 @@ class NamespaceConfigTest(unittest.TestCase):
constants.CERT_DIR = 'certs'
constants.IN_PROGRESS_DIR = '../p'
constants.KEY_DIR = 'keys'
constants.REC_TOKEN_DIR = '/r'
constants.TEMP_CHECKPOINT_DIR = 't'
self.assertEqual(
@ -47,7 +46,6 @@ class NamespaceConfigTest(unittest.TestCase):
self.config.cert_key_backup, '/tmp/foo/c/acme-server.org:443/new')
self.assertEqual(self.config.in_progress_dir, '/tmp/foo/../p')
self.assertEqual(self.config.key_dir, '/tmp/config/keys')
self.assertEqual(self.config.rec_token_dir, '/r')
self.assertEqual(self.config.temp_checkpoint_dir, '/tmp/foo/t')

View file

@ -17,44 +17,19 @@ class PerformTest(unittest.TestCase):
self.auth = ContinuityAuthenticator(
mock.MagicMock(server="demo_server.org"), None)
self.auth.rec_token.perform = mock.MagicMock(
name="rec_token_perform", side_effect=gen_client_resp)
self.auth.proof_of_pos.perform = mock.MagicMock(
name="proof_of_pos_perform", side_effect=gen_client_resp)
def test_rec_token1(self):
token = achallenges.RecoveryToken(challb=None, domain="0")
responses = self.auth.perform([token])
self.assertEqual(responses, ["RecoveryToken0"])
def test_rec_token5(self):
tokens = []
for i in xrange(5):
tokens.append(achallenges.RecoveryToken(challb=None, domain=str(i)))
responses = self.auth.perform(tokens)
self.assertEqual(len(responses), 5)
for i in xrange(5):
self.assertEqual(responses[i], "RecoveryToken%d" % i)
def test_pop_and_rec_token(self):
def test_pop(self):
achalls = []
for i in xrange(4):
if i % 2 == 0:
achalls.append(achallenges.RecoveryToken(challb=None,
domain=str(i)))
else:
achalls.append(achallenges.ProofOfPossession(challb=None,
domain=str(i)))
achalls.append(achallenges.ProofOfPossession(
challb=None, domain=str(i)))
responses = self.auth.perform(achalls)
self.assertEqual(len(responses), 4)
for i in xrange(4):
if i % 2 == 0:
self.assertEqual(responses[i], "RecoveryToken%d" % i)
else:
self.assertEqual(responses[i], "ProofOfPossession%d" % i)
self.assertEqual(responses[i], "ProofOfPossession%d" % i)
def test_unexpected(self):
self.assertRaises(
@ -65,7 +40,7 @@ class PerformTest(unittest.TestCase):
def test_chall_pref(self):
self.assertEqual(
self.auth.get_chall_pref("example.com"),
[challenges.ProofOfPossession, challenges.RecoveryToken])
[challenges.ProofOfPossession])
class CleanupTest(unittest.TestCase):
@ -76,25 +51,11 @@ class CleanupTest(unittest.TestCase):
self.auth = ContinuityAuthenticator(
mock.MagicMock(server="demo_server.org"), None)
self.mock_cleanup = mock.MagicMock(name="rec_token_cleanup")
self.auth.rec_token.cleanup = self.mock_cleanup
def test_rec_token2(self):
token1 = achallenges.RecoveryToken(challb=None, domain="0")
token2 = achallenges.RecoveryToken(challb=None, domain="1")
self.auth.cleanup([token1, token2])
self.assertEqual(self.mock_cleanup.call_args_list,
[mock.call(token1), mock.call(token2)])
def test_unexpected(self):
token = achallenges.RecoveryToken(challb=None, domain="0")
unexpected = achallenges.DVSNI(
challb=None, domain="0", account=mock.Mock("dummy_key"))
self.assertRaises(
errors.ContAuthError, self.auth.cleanup, [token, unexpected])
self.assertRaises(errors.ContAuthError, self.auth.cleanup, [unexpected])
def gen_client_resp(chall):

View file

@ -166,6 +166,37 @@ class UniqueLineageNameTest(unittest.TestCase):
self.assertRaises(OSError, self._call, "wow")
class SafelyRemoveTest(unittest.TestCase):
"""Tests for letsencrypt.le_util.safely_remove."""
def setUp(self):
self.tmp = tempfile.mkdtemp()
self.path = os.path.join(self.tmp, "foo")
def tearDown(self):
shutil.rmtree(self.tmp)
def _call(self):
from letsencrypt.le_util import safely_remove
return safely_remove(self.path)
def test_exists(self):
with open(self.path, "w"):
pass # just create the file
self._call()
self.assertFalse(os.path.exists(self.path))
def test_missing(self):
self._call()
# no error, yay!
self.assertFalse(os.path.exists(self.path))
@mock.patch("letsencrypt.le_util.os.remove")
def test_other_error_passthrough(self, mock_remove):
mock_remove.side_effect = OSError
self.assertRaises(OSError, self._call)
class SafeEmailTest(unittest.TestCase):
"""Test safe_email."""
@classmethod

View file

@ -1,80 +0,0 @@
"""Tests for recovery_token.py."""
import os
import unittest
import shutil
import tempfile
import mock
from acme import challenges
from letsencrypt import achallenges
class RecoveryTokenTest(unittest.TestCase):
def setUp(self):
from letsencrypt.recovery_token import RecoveryToken
server = "demo_server"
self.base_dir = tempfile.mkdtemp("tokens")
self.token_dir = os.path.join(self.base_dir, server)
self.rec_token = RecoveryToken(server, self.base_dir)
def tearDown(self):
shutil.rmtree(self.base_dir)
def test_store_token(self):
self.rec_token.store_token("example.com", 111)
path = os.path.join(self.token_dir, "example.com")
self.assertTrue(os.path.isfile(path))
with open(path) as token_fd:
self.assertEqual(token_fd.read(), "111")
def test_requires_human(self):
self.rec_token.store_token("example2.com", 222)
self.assertFalse(self.rec_token.requires_human("example2.com"))
self.assertTrue(self.rec_token.requires_human("example3.com"))
def test_cleanup(self):
self.rec_token.store_token("example3.com", 333)
self.assertFalse(self.rec_token.requires_human("example3.com"))
self.rec_token.cleanup(achallenges.RecoveryToken(
challb=challenges.RecoveryToken(), domain="example3.com"))
self.assertTrue(self.rec_token.requires_human("example3.com"))
# Shouldn't throw an error
self.rec_token.cleanup(achallenges.RecoveryToken(
challb=None, domain="example4.com"))
# SHOULD throw an error (OSError other than nonexistent file)
self.assertRaises(
OSError, self.rec_token.cleanup,
achallenges.RecoveryToken(
challb=None, domain=("a" + "r" * 10000 + ".com")))
def test_perform_stored(self):
self.rec_token.store_token("example4.com", 444)
response = self.rec_token.perform(
achallenges.RecoveryToken(
challb=challenges.RecoveryToken(), domain="example4.com"))
self.assertEqual(
response, challenges.RecoveryTokenResponse(token="444"))
@mock.patch("letsencrypt.recovery_token.zope.component.getUtility")
def test_perform_not_stored(self, mock_input):
mock_input().input.side_effect = [(0, "555"), (1, "000")]
response = self.rec_token.perform(
achallenges.RecoveryToken(
challb=challenges.RecoveryToken(), domain="example5.com"))
self.assertEqual(
response, challenges.RecoveryTokenResponse(token="555"))
response = self.rec_token.perform(
achallenges.RecoveryToken(
challb=challenges.RecoveryToken(), domain="example6.com"))
self.assertTrue(response is None)
if __name__ == "__main__":
unittest.main() # pragma: no cover