Merge the manual and script plugins (#3890)

* Start of combined manual/script plugin

* Return str from hooks.execute, not bytes

* finish manual/script rewrite

* delete old manual and script plugins

* manually specify we want chall.token

* use consistent quotes

* specify chall for uri

* s/script/hook

* fix spacing on instructions

* remove unneeded response argument

* make achall more helpful

* simplify perform

* remove old test files

* add start of manual_tests

* fix ParseTest.test_help

* stop using manual_test_mode in cli tests

* Revert "make achall more helpful"

This reverts commit 54b01cea30.

* use bad response/validation methods on achalls

* simplify perform and cleanup environment

* finish manual tests

* Add HTTP manual hook integration test

* add manual http scripts

* Add manual DNS script integration test

* remove references to the script plugin

* they're hooks, not scripts

* add --manual-public-ip-logging-ok to integration tests

* use --pref-chall for dns integration

* does dns work?

* validate hooks

* test hook validation

* Revert "does dns work?"

This reverts commit 1224cc2961.

* busy wait in manual-http-auth

* remove DNS script test for now

* Fix challenge prefix and add trailing .

* Add comment about universal_newlines

* Fix typo from 0464ba2c4

* fix nits and typos

* Generalize HookCOmmandNotFound error

* Add verify_exe_exists

* Don't duplicate code in hooks.py

* Revert changes to hooks.py

* Use consistent hook error messages
This commit is contained in:
Brad Warren 2016-12-22 08:24:08 -08:00 committed by GitHub
parent 44d5886429
commit 39f5551305
15 changed files with 237 additions and 655 deletions

View file

@ -72,7 +72,7 @@ obtain, install, and renew certificates:
--standalone Run a standalone webserver for authentication
%s
--webroot Place files in a server's webroot folder for authentication
--manual Obtain certs interactively, or using shell script hoooks
--manual Obtain certs interactively, or using shell script hooks
-n Run non-interactively
--test-cert Obtain a test cert from a staging server
@ -100,7 +100,7 @@ More detailed help:
all, automation, commands, paths, security, testing, or any of the
subcommands or plugins (certonly, renew, install, register, nginx,
apache, standalone, webroot, script, etc.)
apache, standalone, webroot, etc.)
"""
@ -1153,8 +1153,6 @@ def _plugins_parsing(helpful, plugins):
"--nginx", action="store_true", help="Obtain and install certs using Nginx")
helpful.add(["plugins", "certonly"], "--standalone", action="store_true",
help='Obtain certs using a "standalone" webserver.')
helpful.add(["plugins", "certonly"], "--script", action="store_true",
help='Obtain certs using shell script(s)')
helpful.add(["plugins", "certonly"], "--manual", action="store_true",
help='Provide laborious manual instructions for obtaining a cert')
helpful.add(["plugins", "certonly"], "--webroot", action="store_true",

View file

@ -84,7 +84,10 @@ def execute(shell_cmd):
:returns: `tuple` (`str` stderr, `str` stdout)"""
cmd = Popen(shell_cmd, shell=True, stdout=PIPE, stderr=PIPE)
# universal_newlines causes Popen.communicate()
# to return str objects instead of bytes in Python 3
cmd = Popen(shell_cmd, shell=True, stdout=PIPE,
stderr=PIPE, universal_newlines=True)
out, err = cmd.communicate()
if cmd.returncode != 0:
logger.error('Hook command "%s" returned error code %d',

View file

@ -1,56 +1,49 @@
"""Manual plugin."""
"""Manual authenticator plugin"""
import os
import logging
import pipes
import shutil
import socket
import subprocess
import sys
import tempfile
import time
import six
import zope.component
import zope.interface
from acme import challenges
from acme import errors as acme_errors
from certbot import errors
from certbot import interfaces
from certbot import errors
from certbot import hooks
from certbot.plugins import common
logger = logging.getLogger(__name__)
@zope.interface.implementer(interfaces.IAuthenticator)
@zope.interface.provider(interfaces.IPluginFactory)
class Authenticator(common.Plugin):
"""Manual Authenticator.
"""Manual authenticator
This plugin requires user's manual intervention in setting up a HTTP
server for solving http-01 challenges and thus does not need to be
run as a privileged process. Alternatively shows instructions on how
to use Python's built-in HTTP server.
.. todo:: Support for `~.challenges.TLSSNI01`.
This plugin allows the user to perform the domain validation
challenge(s) themselves. This either be done manually by the user or
through shell scripts provided to Certbot.
"""
description = 'Manual configuration or run your own shell scripts'
hidden = True
description = "Manually configure an HTTP server"
MESSAGE_TEMPLATE = {
"dns-01": """\
long_description = (
'Authenticate through manual configuration or custom shell scripts. '
'When using shell scripts, an authenticator script must be provided. '
'The environment variables available to this script are '
'$CERTBOT_DOMAIN which contains the domain being authenticated, '
'$CERTBOT_VALIDATION which is the validation string, and '
'$CERTBOT_TOKEN which is the filename of the resource requested when '
'performing an HTTP-01 challenge. An additional cleanup script can '
'also be provided and can use the additional variable '
'$CERTBOT_AUTH_OUTPUT which contains the stdout output from the auth '
'script.')
_DNS_INSTRUCTIONS = """\
Please deploy a DNS TXT record under the name
{domain} with the following value:
{validation}
Once this is deployed,
""",
"http-01": """\
Once this is deployed,"""
_HTTP_INSTRUCTIONS = """\
Make sure your web server displays the following content at
{uri} before continuing:
@ -59,204 +52,114 @@ Make sure your web server displays the following content at
If you don't have HTTP server configured, you can run the following
command on the target server (as root):
{command}
"""}
# a disclaimer about your current IP being transmitted to Let's Encrypt's servers.
IP_DISCLAIMER = """\
NOTE: The IP of this machine will be publicly logged as having requested this certificate. \
If you're running certbot in manual mode on a machine that is not your server, \
please ensure you're okay with that.
Are you OK with your IP being logged?
"""
# "cd /tmp/certbot" makes sure user doesn't serve /root,
# separate "public_html" ensures that cert.pem/key.pem are not
# served and makes it more obvious that Python command will serve
# anything recursively under the cwd
CMD_TEMPLATE = """\
mkdir -p {root}/public_html/{achall.URI_ROOT_PATH}
cd {root}/public_html
mkdir -p /tmp/certbot/public_html/{achall.URI_ROOT_PATH}
cd /tmp/certbot/public_html
printf "%s" {validation} > {achall.URI_ROOT_PATH}/{encoded_token}
# run only once per server:
$(command -v python2 || command -v python2.7 || command -v python2.6) -c \\
"import BaseHTTPServer, SimpleHTTPServer; \\
s = BaseHTTPServer.HTTPServer(('', {port}), SimpleHTTPServer.SimpleHTTPRequestHandler); \\
s.serve_forever()" """
"""Command template."""
def __init__(self, *args, **kwargs):
super(Authenticator, self).__init__(*args, **kwargs)
self._root = (tempfile.mkdtemp() if self.conf("test-mode")
else "/tmp/certbot")
self._httpd = None
self.env = dict()
@classmethod
def add_parser_arguments(cls, add):
add("test-mode", action="store_true",
help="Test mode. Executes the manual command in subprocess.")
add("public-ip-logging-ok", action="store_true",
help="Automatically allows public IP logging. (default: Ask)")
add('auth-hook',
help='Path or command to execute for the authentication script')
add('cleanup-hook',
help='Path or command to execute for the cleanup script')
add('public-ip-logging-ok', action='store_true',
help='Automatically allows public IP logging (default: Ask)')
def prepare(self): # pylint: disable=missing-docstring,no-self-use
if self.config.noninteractive_mode and not self.conf("test-mode"):
raise errors.PluginError("Running manual mode non-interactively is not supported")
def prepare(self): # pylint: disable=missing-docstring
if self.config.noninteractive_mode and not self.conf('auth-hook'):
raise errors.PluginError(
'An authentication script must be provided with --{0} when '
'using the manual plugin non-interactively.'.format(
self.option_name('auth-hook')))
self._validate_hooks()
def _validate_hooks(self):
if self.config.validate_hooks:
for name in ('auth-hook', 'cleanup-hook'):
hook = self.conf(name)
if hook is not None:
hook_prefix = self.option_name(name)[:-len('-hook')]
hooks.validate_hook(hook, hook_prefix)
def more_info(self): # pylint: disable=missing-docstring,no-self-use
return ("This plugin requires user's manual intervention in setting "
"up challenges to prove control of a domain and does not need "
"to be run as a privileged process. When solving "
"http-01 challenges, the user is responsible for setting up "
"an HTTP server. Alternatively, instructions are shown on how "
"to use Python's built-in HTTP server. The user is "
"responsible for configuration of a domain's DNS when solving "
"dns-01 challenges. The type of challenges used can be "
"controlled through the --preferred-challenges flag.")
return (
'This plugin allows the user to customize setup for domain '
'validation challenges either through shell scripts provided by '
'the user or by performing the setup manually.')
def get_chall_pref(self, domain):
# pylint: disable=missing-docstring,no-self-use,unused-argument
return [challenges.HTTP01, challenges.DNS01]
def perform(self, achalls):
# pylint: disable=missing-docstring
self._get_ip_logging_permission()
mapping = {"http-01": self._perform_http01_challenge,
"dns-01": self._perform_dns01_challenge}
def perform(self, achalls): # pylint: disable=missing-docstring
self._verify_ip_logging_ok()
if self.conf('auth-hook'):
perform_achall = self._perform_achall_with_script
else:
perform_achall = self._perform_achall_manually
responses = []
# TODO: group achalls by the same socket.gethostbyname(_ex)
# and prompt only once per server (one "echo -n" per domain)
for achall in achalls:
responses.append(mapping[achall.typ](achall))
perform_achall(achall)
responses.append(achall.response(achall.account_key))
return responses
@classmethod
def _test_mode_busy_wait(cls, port):
while True:
time.sleep(1)
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
sock.connect(("localhost", port))
except socket.error: # pragma: no cover
pass
def _verify_ip_logging_ok(self):
if not self.conf('public-ip-logging-ok'):
cli_flag = '--{0}'.format(self.option_name('public-ip-logging-ok'))
msg = ('NOTE: The IP of this machine will be publicly logged as '
"having requested this certificate. If you're running "
'certbot in manual mode on a machine that is not your '
"server, please ensure you're okay with that.\n\n"
'Are you OK with your IP being logged?')
display = zope.component.getUtility(interfaces.IDisplay)
if display.yesno(msg, cli_flag=cli_flag, force_interactive=True):
setattr(self.config, self.dest('public-ip-logging-ok'), True)
else:
break
finally:
sock.close()
raise errors.PluginError('Must agree to IP logging to proceed')
def cleanup(self, achalls):
# pylint: disable=missing-docstring
for achall in achalls:
if isinstance(achall.chall, challenges.HTTP01):
self._cleanup_http01_challenge(achall)
def _perform_http01_challenge(self, achall):
# same path for each challenge response would be easier for
# users, but will not work if multiple domains point at the
# same server: default command doesn't support virtual hosts
response, validation = achall.response_and_validation()
port = (response.port if self.config.http01_port is None
else int(self.config.http01_port))
command = self.CMD_TEMPLATE.format(
root=self._root, achall=achall, response=response,
# TODO(kuba): pipes still necessary?
validation=pipes.quote(validation),
encoded_token=achall.chall.encode("token"),
port=port)
if self.conf("test-mode"):
logger.debug("Test mode. Executing the manual command: %s", command)
# sh shipped with OS X does't support echo -n, but supports printf
try:
self._httpd = subprocess.Popen(
command,
# don't care about setting stdout and stderr,
# we're in test mode anyway
shell=True,
executable=None,
# "preexec_fn" is UNIX specific, but so is "command"
preexec_fn=os.setsid)
except OSError as error: # ValueError should not happen!
logger.debug(
"Couldn't execute manual command: %s", error, exc_info=True)
return False
logger.debug("Manual command running as PID %s.", self._httpd.pid)
# give it some time to bootstrap, before we try to verify
# (cert generation in case of simpleHttpS might take time)
self._test_mode_busy_wait(port)
if self._httpd.poll() is not None:
raise errors.Error("Couldn't execute manual command")
def _perform_achall_with_script(self, achall):
env = dict(CERTBOT_DOMAIN=achall.domain,
CERTBOT_VALIDATION=achall.validation(achall.account_key))
if isinstance(achall.chall, challenges.HTTP01):
env['CERTBOT_TOKEN'] = achall.chall.encode('token')
else:
self._notify_and_wait(
self._get_message(achall).format(
validation=validation,
response=response,
uri=achall.chall.uri(achall.domain),
command=command))
os.environ.pop('CERTBOT_TOKEN', None)
os.environ.update(env)
_, out = hooks.execute(self.conf('auth-hook'))
env['CERTBOT_AUTH_OUTPUT'] = out.strip()
self.env[achall.domain] = env
if not response.simple_verify(
achall.chall, achall.domain,
achall.account_key.public_key(), self.config.http01_port):
logger.warning("Self-verify of challenge failed.")
return response
def _perform_dns01_challenge(self, achall):
response, validation = achall.response_and_validation()
if not self.conf("test-mode"):
self._notify_and_wait(
self._get_message(achall).format(
validation=validation,
domain=achall.validation_domain_name(achall.domain),
response=response))
try:
verification_status = response.simple_verify(
achall.chall, achall.domain,
achall.account_key.public_key())
except acme_errors.DependencyError:
logger.warning("Self verification requires optional "
"dependency `dnspython` to be installed.")
def _perform_achall_manually(self, achall):
validation = achall.validation(achall.account_key)
if isinstance(achall.chall, challenges.HTTP01):
msg = self._HTTP_INSTRUCTIONS.format(
achall=achall, encoded_token=achall.chall.encode('token'),
port=self.config.http01_port,
uri=achall.chall.uri(achall.domain), validation=validation)
else:
if not verification_status:
logger.warning("Self-verify of challenge failed.")
assert isinstance(achall.chall, challenges.DNS01)
msg = self._DNS_INSTRUCTIONS.format(
domain=achall.validation_domain_name(achall.domain),
validation=validation)
display = zope.component.getUtility(interfaces.IDisplay)
display.notification(msg, wrap=False, force_interactive=True)
return response
def _cleanup_http01_challenge(self, achall):
# pylint: disable=missing-docstring,unused-argument
if self.conf("test-mode"):
assert self._httpd is not None, (
"cleanup() must be called after perform()")
if self._httpd.poll() is None:
logger.debug("Terminating manual command process")
self._httpd.terminate()
else:
logger.debug("Manual command process already terminated "
"with %s code", self._httpd.returncode)
shutil.rmtree(self._root)
def _notify_and_wait(self, message):
# pylint: disable=no-self-use
# TODO: IDisplay wraps messages, breaking the command
#answer = zope.component.getUtility(interfaces.IDisplay).notification(
# message=message, pause=True)
sys.stdout.write(message)
six.moves.input("Press ENTER to continue")
def _get_ip_logging_permission(self):
# pylint: disable=missing-docstring
if not (self.conf("test-mode") or self.conf("public-ip-logging-ok")):
if not zope.component.getUtility(interfaces.IDisplay).yesno(
self.IP_DISCLAIMER, "Yes", "No",
cli_flag="--manual-public-ip-logging-ok",
force_interactive=True):
raise errors.PluginError("Must agree to IP logging to proceed")
else:
self.config.namespace.manual_public_ip_logging_ok = True
def _get_message(self, achall):
# pylint: disable=missing-docstring,no-self-use,unused-argument
return self.MESSAGE_TEMPLATE.get(achall.chall.typ, "")
def cleanup(self, achalls): # pylint: disable=missing-docstring
if self.conf('cleanup-hook'):
for achall in achalls:
env = self.env.pop(achall.domain)
if 'CERTBOT_TOKEN' not in env:
os.environ.pop('CERTBOT_TOKEN', None)
os.environ.update(env)
hooks.execute(self.conf('cleanup-hook'))

View file

@ -1,134 +1,112 @@
"""Tests for certbot.plugins.manual."""
"""Tests for certbot.plugins.manual"""
import os
import unittest
import six
import mock
from acme import challenges
from acme import errors as acme_errors
from acme import jose
from certbot import achallenges
from certbot import errors
from certbot.tests import acme_util
from certbot.tests import util as test_util
KEY = jose.JWKRSA.load(test_util.load_vector("rsa512_key.pem"))
class AuthenticatorTest(unittest.TestCase):
"""Tests for certbot.plugins.manual.Authenticator."""
def setUp(self):
from certbot.plugins.manual import Authenticator
self.http_achall = acme_util.HTTP01_A
self.dns_achall = acme_util.DNS01_A
self.achalls = [self.http_achall, self.dns_achall]
self.config = mock.MagicMock(
http01_port=8080, manual_test_mode=False,
manual_public_ip_logging_ok=False, noninteractive_mode=True)
self.auth = Authenticator(config=self.config, name="manual")
http01_port=0, manual_auth_hook=None, manual_cleanup_hook=None,
manual_public_ip_logging_ok=False, noninteractive_mode=False,
validate_hooks=False)
self.http01 = achallenges.KeyAuthorizationAnnotatedChallenge(
challb=acme_util.HTTP01_P, domain="foo.com", account_key=KEY)
self.dns01 = achallenges.KeyAuthorizationAnnotatedChallenge(
challb=acme_util.DNS01_P, domain="foo.com", account_key=KEY)
from certbot.plugins.manual import Authenticator
self.auth = Authenticator(self.config, name='manual')
self.achalls = [self.http01, self.dns01]
config_test_mode = mock.MagicMock(
http01_port=8080, manual_test_mode=True, noninteractive_mode=True)
self.auth_test_mode = Authenticator(
config=config_test_mode, name="manual")
def test_prepare(self):
def test_prepare_no_hook_noninteractive(self):
self.config.noninteractive_mode = True
self.assertRaises(errors.PluginError, self.auth.prepare)
self.auth_test_mode.prepare() # error not raised
def test_prepare_bad_hook(self):
self.config.manual_auth_hook = os.path.abspath(os.sep) # is / on UNIX
self.config.validate_hooks = True
self.assertRaises(errors.HookCommandNotFound, self.auth.prepare)
def test_more_info(self):
self.assertTrue(isinstance(self.auth.more_info(), str))
self.assertTrue(isinstance(self.auth.more_info(), six.string_types))
def test_get_chall_pref(self):
self.assertTrue(all(issubclass(pref, challenges.Challenge)
for pref in self.auth.get_chall_pref("foo.com")))
self.assertEqual(self.auth.get_chall_pref('example.org'),
[challenges.HTTP01, challenges.DNS01])
@mock.patch("certbot.plugins.manual.zope.component.getUtility")
def test_perform_empty(self, mock_interaction):
mock_interaction().yesno.return_value = True
self.assertEqual([], self.auth.perform([]))
@mock.patch('certbot.plugins.manual.zope.component.getUtility')
def test_ip_logging_not_ok(self, mock_get_utility):
mock_get_utility().yesno.return_value = False
self.assertRaises(errors.PluginError, self.auth.perform, [])
@mock.patch("certbot.plugins.manual.zope.component.getUtility")
@mock.patch("certbot.plugins.manual.sys.stdout")
@mock.patch("acme.challenges.HTTP01Response.simple_verify")
@mock.patch("six.moves.input")
def test_perform(self, mock_raw_input, mock_verify, mock_stdout, mock_interaction):
mock_verify.return_value = True
mock_interaction().yesno.return_value = True
@mock.patch('certbot.plugins.manual.zope.component.getUtility')
def test_ip_logging_ok(self, mock_get_utility):
mock_get_utility().yesno.return_value = True
self.auth.perform([])
self.assertTrue(self.config.manual_public_ip_logging_ok)
resp_http = self.http01.response(KEY)
resp_dns = self.dns01.response(KEY)
def test_script_perform(self):
self.config.manual_public_ip_logging_ok = True
self.config.manual_auth_hook = (
'echo $CERTBOT_DOMAIN; echo ${CERTBOT_TOKEN:-notoken}; '
'echo $CERTBOT_VALIDATION;')
dns_expected = '{0}\n{1}\n{2}'.format(
self.dns_achall.domain, 'notoken',
self.dns_achall.validation(self.dns_achall.account_key))
http_expected = '{0}\n{1}\n{2}'.format(
self.http_achall.domain, self.http_achall.chall.encode('token'),
self.http_achall.validation(self.http_achall.account_key))
self.assertEqual([resp_http, resp_dns], self.auth.perform(self.achalls))
self.assertEqual(2, mock_raw_input.call_count)
mock_verify.assert_called_with(
self.http01.challb.chall, "foo.com", KEY.public_key(), 8080)
self.assertEqual(
self.auth.perform(self.achalls),
[achall.response(achall.account_key) for achall in self.achalls])
self.assertEqual(
self.auth.env[self.dns_achall.domain]['CERTBOT_AUTH_OUTPUT'],
dns_expected)
self.assertEqual(
self.auth.env[self.http_achall.domain]['CERTBOT_AUTH_OUTPUT'],
http_expected)
message = mock_stdout.write.mock_calls[0][1][0]
self.assertTrue(self.http01.chall.encode("token") in message)
@mock.patch('certbot.plugins.manual.zope.component.getUtility')
def test_manual_perform(self, mock_get_utility):
self.config.manual_public_ip_logging_ok = True
self.assertEqual(
self.auth.perform(self.achalls),
[achall.response(achall.account_key) for achall in self.achalls])
for i, (args, kwargs) in enumerate(mock_get_utility().notification.call_args_list):
achall = self.achalls[i]
self.assertTrue(achall.validation(achall.account_key) in args[0])
self.assertFalse(kwargs['wrap'])
mock_verify.return_value = False
with mock.patch("certbot.plugins.manual.logger") as mock_logger:
self.auth.perform(self.achalls)
self.assertEqual(2, mock_logger.warning.call_count)
def test_cleanup(self):
self.config.manual_public_ip_logging_ok = True
self.config.manual_auth_hook = 'echo foo;'
self.config.manual_cleanup_hook = '# cleanup'
self.auth.perform(self.achalls)
@mock.patch("certbot.plugins.manual.zope.component.getUtility")
@mock.patch("acme.challenges.DNS01Response.simple_verify")
@mock.patch("six.moves.input")
def test_perform_missing_dependency(self, mock_raw_input, mock_verify, mock_interaction):
mock_interaction().yesno.return_value = True
mock_verify.side_effect = acme_errors.DependencyError()
for achall in self.achalls:
self.auth.cleanup([achall])
self.assertEqual(os.environ['CERTBOT_AUTH_OUTPUT'], 'foo')
self.assertEqual(os.environ['CERTBOT_DOMAIN'], achall.domain)
self.assertEqual(
os.environ['CERTBOT_VALIDATION'],
achall.validation(achall.account_key))
with mock.patch("certbot.plugins.manual.logger") as mock_logger:
self.auth.perform([self.dns01])
self.assertEqual(1, mock_logger.warning.call_count)
mock_raw_input.assert_called_once_with("Press ENTER to continue")
@mock.patch("certbot.plugins.manual.zope.component.getUtility")
@mock.patch("certbot.plugins.manual.Authenticator._notify_and_wait")
def test_disagree_with_ip_logging(self, mock_notify, mock_interaction):
mock_interaction().yesno.return_value = False
mock_notify.side_effect = errors.Error("Exception not raised, \
continued execution even after disagreeing with IP logging")
self.assertRaises(errors.PluginError, self.auth.perform, self.achalls)
@mock.patch("certbot.plugins.manual.subprocess.Popen", autospec=True)
def test_perform_test_command_oserror(self, mock_popen):
mock_popen.side_effect = OSError
self.assertEqual([False], self.auth_test_mode.perform([self.http01]))
@mock.patch("certbot.plugins.manual.socket.socket")
@mock.patch("certbot.plugins.manual.time.sleep", autospec=True)
@mock.patch("certbot.plugins.manual.subprocess.Popen", autospec=True)
def test_perform_test_command_run_failure(
self, mock_popen, unused_mock_sleep, unused_mock_socket):
mock_popen.poll.return_value = 10
mock_popen.return_value.pid = 1234
self.assertRaises(
errors.Error, self.auth_test_mode.perform, self.achalls)
def test_cleanup_test_mode_already_terminated(self):
# pylint: disable=protected-access
self.auth_test_mode._httpd = httpd = mock.Mock()
httpd.poll.return_value = 0
self.auth_test_mode.cleanup(self.achalls)
def test_cleanup_test_mode_kills_still_running(self):
# pylint: disable=protected-access
self.auth_test_mode._httpd = httpd = mock.Mock(pid=1234)
httpd.poll.return_value = None
self.auth_test_mode.cleanup(self.achalls)
httpd.terminate.assert_called_once_with()
if isinstance(achall.chall, challenges.HTTP01):
self.assertEqual(
os.environ['CERTBOT_TOKEN'],
achall.chall.encode('token'))
else:
self.assertFalse('CERTBOT_TOKEN' in os.environ)
if __name__ == "__main__":
if __name__ == '__main__':
unittest.main() # pragma: no cover

View file

@ -1,161 +0,0 @@
"""Script-based Authenticator."""
import logging
import os
import sys
import zope.interface
from acme import challenges
from certbot import errors
from certbot import interfaces
from certbot import hooks
from certbot.plugins import common
logger = logging.getLogger(__name__)
CHALLENGES = ["http-01", "dns-01"]
@zope.interface.implementer(interfaces.IAuthenticator)
@zope.interface.provider(interfaces.IPluginFactory)
class Authenticator(common.Plugin):
"""Script authenticator
calls user defined script to perform authentication and
optionally cleanup.
"""
description = "Authenticate using user provided script(s)"
long_description = ("Authenticate using user provided script(s). " +
"Authenticator script has the following environment " +
"variables available for it: " +
"CERTBOT_DOMAIN - The domain being authenticated " +
"CERTBOT_VALIDATION - The validation string " +
"CERTBOT_TOKEN - Resource name part of HTTP-01 " +
"challenge (HTTP-01 only). " +
"Cleanup script has all the above, and additional " +
"var: CERTBOT_AUTH_OUTPUT - stdout output from the " +
"authenticator"
)
def __init__(self, *args, **kwargs):
super(Authenticator, self).__init__(*args, **kwargs)
self.cleanup_script = None
self.auth_script = None
self.challenges = []
@classmethod
def add_parser_arguments(cls, add):
add("auth", default=None, required=False,
help="path or command for the authentication script")
add("cleanup", default=None, required=False,
help="path or command for the cleanup script")
@property
def supported_challenges(self):
"""Challenges supported by this plugin."""
return self.challenges
def more_info(self): # pylint: disable=missing-docstring
return("This authenticator enables user to perform authentication " +
"using shell script(s).")
def prepare(self):
"""Prepare script plugin, check challenge, scripts and register them"""
pref_challenges = self.config.pref_challs
for c in pref_challenges:
if c.typ in CHALLENGES:
self.challenges.append(c)
if not self.challenges and len(pref_challenges):
# Challenges requested, but not supported
raise errors.PluginError(
"Unfortunately script plugin doesn't yet support " +
"the requested challenges")
# Challenge not defined on cli, set default
if not self.challenges:
self.challenges.append(challenges.Challenge.TYPES["http-01"])
if not self.conf("auth"):
raise errors.PluginError("Parameter --script-auth is required " +
"for script plugin")
self._prepare_scripts()
def _prepare_scripts(self):
"""Helper method for prepare, to take care of validating scripts"""
script_path = self.conf("auth")
cleanup_path = self.conf("cleanup")
if self.config.validate_hooks:
hooks.validate_hook(script_path, "script_auth")
self.auth_script = script_path
if cleanup_path:
if self.config.validate_hooks:
hooks.validate_hook(cleanup_path, "script_cleanup")
self.cleanup_script = cleanup_path
def get_chall_pref(self, domain):
"""Return challenge(s) we're answering to """
# pylint: disable=unused-argument
return self.challenges
def perform(self, achalls):
"""Perform the authentication per challenge"""
mapping = {"http-01": self._setup_env_http,
"dns-01": self._setup_env_dns}
responses = []
for achall in achalls:
response, validation = achall.response_and_validation()
# Setup env vars
mapping[achall.typ](achall, validation)
output = self.execute(self.auth_script)
if output:
self._write_auth_output(output)
responses.append(response)
return responses
def _setup_env_http(self, achall, validation):
"""Write environment variables for http challenge"""
ev = dict()
ev["CERTBOT_TOKEN"] = achall.chall.encode("token")
ev["CERTBOT_VALIDATION"] = validation
ev["CERTBOT_DOMAIN"] = achall.domain
os.environ.update(ev)
def _setup_env_dns(self, achall, validation):
"""Write environment variables for dns challenge"""
ev = dict()
ev["CERTBOT_VALIDATION"] = validation
ev["CERTBOT_DOMAIN"] = achall.domain
os.environ.update(ev)
def _write_auth_output(self, out):
"""Write output from auth script to env var for
cleanup to act upon"""
os.environ.update({"CERTBOT_AUTH_OUTPUT": out.strip()})
def _normalize_string(self, value):
"""Return string instead of bytestring for Python3.
Helper function for writing env vars, as os.environ needs str"""
if isinstance(value, bytes):
value = value.decode(sys.getdefaultencoding())
return str(value)
def execute(self, shell_cmd):
"""Run a script.
:param str shell_cmd: Command to run
:returns: `str` stdout output"""
_, out = hooks.execute(shell_cmd)
return self._normalize_string(out)
def cleanup(self, achalls): # pylint: disable=unused-argument
"""Run cleanup.sh """
if self.cleanup_script:
self.execute(self.cleanup_script)

View file

@ -1,170 +0,0 @@
"""Tests for certbot.plugins.manual."""
import os
import tempfile
import unittest
import mock
from acme import challenges
from acme import jose
from certbot import achallenges
from certbot import errors
from certbot.tests import acme_util
from certbot.tests import util as test_util
KEY = jose.JWKRSA.load(test_util.load_vector("rsa512_key.pem"))
class AuthenticatorTest(unittest.TestCase):
"""Tests for certbot.plugins.script.Authenticator."""
def setUp(self):
from certbot.plugins.script import Authenticator
self.auth_return_value = "return from auth\n"
self.script_nonexec = create_script(b'# empty')
self.script_exec = create_script_exec(b'echo "return from auth\n"')
self.config = mock.MagicMock(
script_auth=self.script_exec,
script_cleanup=self.script_exec,
pref_challs=[challenges.Challenge.TYPES["http-01"],
challenges.Challenge.TYPES["dns-01"],
challenges.Challenge.TYPES["tls-sni-01"]])
self.tlssni_config = mock.MagicMock(
script_auth=self.script_exec,
script_cleanup=self.script_exec,
pref_challs=[challenges.Challenge.TYPES["tls-sni-01"]])
self.nochall_config = mock.MagicMock(
script_auth=self.script_exec,
script_cleanup=self.script_exec,
)
self.default = Authenticator(config=self.config, name="script")
self.onlytlssni = Authenticator(config=self.tlssni_config,
name="script")
self.nochall = Authenticator(config=self.nochall_config,
name="script")
self.http01 = achallenges.KeyAuthorizationAnnotatedChallenge(
challb=acme_util.HTTP01_P, domain="foo.com", account_key=KEY)
self.dns01 = achallenges.KeyAuthorizationAnnotatedChallenge(
challb=acme_util.DNS01_P, domain="foo.com", account_key=KEY)
self.achalls = [self.http01, self.dns01]
def tearDown(self):
os.remove(self.script_exec)
os.remove(self.script_nonexec)
def test_prepare_normal(self):
"""Test prepare with typical configuration"""
from certbot.plugins.script import Authenticator
# Erroring combinations in from of (auth_script, cleanup_script, error)
for v in [("/NONEXISTENT/script.sh", "/NONEXISTENT/script.sh",
errors.HookCommandNotFound),
(self.script_nonexec, "/NONEXISTENT/script.sh",
errors.HookCommandNotFound),
(self.script_exec, "/NONEXISTENT/script.sh",
errors.HookCommandNotFound),
("/NONEXISTENT/script.sh", self.script_nonexec,
errors.HookCommandNotFound),
("/NONEXISTENT/script.sh", self.script_exec,
errors.HookCommandNotFound),
(None, self.script_exec,
errors.PluginError)]:
testconf = mock.MagicMock(
script_auth=v[0],
script_cleanup=v[1],
pref_challs=[challenges.Challenge.TYPES["http-01"]])
testauth = Authenticator(config=testconf, name="script")
self.assertRaises(v[2], testauth.prepare)
# This should not error
self.default.prepare()
self.assertEqual(len(self.default.challenges), 2)
def test_prepare_tlssni(self):
"""Test for provided, but unsupported challenge type"""
self.assertRaises(errors.PluginError, self.onlytlssni.prepare)
def test_prepare_nochall(self):
"""Test for default challenge"""
self.nochall.prepare()
self.assertEqual(len(self.nochall.challenges), 1)
def test_more_info(self):
self.assertTrue(isinstance(self.default.more_info(), str))
def test_get_chall_pref(self):
self.default.prepare()
self.assertTrue(all(issubclass(pref, challenges.Challenge)
for pref in self.default.get_chall_pref(
"foo.com")))
def test_get_supported_challenges(self):
self.default.prepare()
self.assertTrue(all(issubclass(sup, challenges.Challenge)
for sup in self.default.supported_challenges))
def test_perform(self):
resp_http = self.http01.response(KEY)
resp_dns = self.dns01.response(KEY)
self.default.prepare()
# Check for the env vars prior to the run
self.assertFalse("CERTBOT_VALIDATION" in os.environ.keys())
self.assertFalse("CERTBOT_DOMAIN" in os.environ.keys())
self.assertFalse("CERTBOT_AUTH_OUTPUT" in os.environ.keys())
pref_resp = self.default.perform(self.achalls)
self.assertEqual([resp_http, resp_dns], pref_resp)
# Check for the env vars post run
self.assertTrue("CERTBOT_VALIDATION" in os.environ.keys())
self.assertTrue("CERTBOT_DOMAIN" in os.environ.keys())
self.assertTrue("CERTBOT_AUTH_OUTPUT" in os.environ.keys())
self.assertEqual(os.environ["CERTBOT_AUTH_OUTPUT"],
self.auth_return_value.strip())
@mock.patch('certbot.plugins.script.Authenticator.execute')
def test_cleanup(self, mock_exec):
mock_exec.return_value = (0, None, None)
self.default.prepare()
self.default.cleanup(self.achalls)
self.assertEqual(mock_exec.call_count, 1)
@mock.patch('certbot.hooks.Popen')
def test_execute(self, mock_popen):
proc = mock.Mock()
# tuple values: stdout, stderr, errorcode, num_of_logger_calls
for t in [("", "", 0, 0),
(self.auth_return_value, "", 0, 0),
(None, "stderr_output", 0, 1),
("whatever", "stderr_output", 1, 2),
(b'bytestring outval', "", 0, 0)]:
proc = mock.Mock()
attrs = {'communicate.return_value': (t[0], t[1]),
'returncode': t[2]}
proc.configure_mock(**attrs) # pylint: disable=star-args
mock_popen.return_value = proc
with mock.patch('certbot.hooks.logger.error') as mock_log:
output = self.default.execute(self.script_exec)
self.assertEqual(mock_log.call_count, t[3])
self.assertTrue(isinstance(output, str))
def create_script(contents):
""" Helper to create temporary file """
f = tempfile.NamedTemporaryFile(delete=False, prefix='.sh')
f.write(contents)
f.close()
return f.name
def create_script_exec(contents):
""" Helper to create temporary file with exec permissions"""
fname = create_script(contents)
os.chmod(fname, 0o700)
return fname

View file

@ -133,7 +133,7 @@ def choose_plugin(prepared, question):
else:
return None
noninstaller_plugins = ["webroot", "manual", "standalone", "script"]
noninstaller_plugins = ["webroot", "manual", "standalone"]
def record_chosen_plugins(config, plugins, auth, inst):
"Update the config entries to reflect the plugins we actually selected."
@ -238,8 +238,6 @@ def cli_plugin_requests(config):
req_auth = set_configurator(req_auth, "webroot")
if config.manual:
req_auth = set_configurator(req_auth, "manual")
if config.script:
req_auth = set_configurator(req_auth, "script")
logger.debug("Requested authenticator %s and installer %s", req_auth, req_inst)
return req_auth, req_inst

View file

@ -7,9 +7,12 @@ from acme import challenges
from acme import jose
from acme import messages
from certbot import auth_handler
from certbot.tests import util
JWK = jose.JWK.load(util.load_vector('rsa512_key.pem'))
KEY = util.load_rsa_private_key('rsa512_key.pem')
# Challenges
@ -50,6 +53,14 @@ DNS01_P = chall_to_challb(DNS01, messages.STATUS_PENDING)
CHALLENGES_P = [HTTP01_P, TLSSNI01_P, DNS01_P]
# AnnotatedChallenge objects
HTTP01_A = auth_handler.challb_to_achall(HTTP01_P, JWK, "example.com")
TLSSNI01_A = auth_handler.challb_to_achall(TLSSNI01_P, JWK, "example.net")
DNS01_A = auth_handler.challb_to_achall(DNS01_P, JWK, "example.org")
ACHALLENGES = [HTTP01_A, TLSSNI01_A, DNS01_A]
def gen_authzr(authz_status, domain, challs, statuses, combos=True):
"""Generate an authorization resource.

View file

@ -81,7 +81,7 @@ class ParseTest(unittest.TestCase):
out = self._help_output(['--help', 'all'])
self.assertTrue("--configurator" in out)
self.assertTrue("how a cert is deployed" in out)
self.assertTrue("--manual-test-mode" in out)
self.assertTrue("--webroot-path" in out)
self.assertTrue("--text" not in out)
self.assertTrue("--dialog" not in out)
self.assertTrue("%s" not in out)
@ -91,7 +91,7 @@ class ParseTest(unittest.TestCase):
if "nginx" in self.plugins:
# may be false while building distributions without plugins
self.assertTrue("--nginx-ctl" in out)
self.assertTrue("--manual-test-mode" not in out)
self.assertTrue("--webroot-path" not in out)
self.assertTrue("--checkpoints" not in out)
out = self._help_output(['-h'])
@ -102,7 +102,7 @@ class ParseTest(unittest.TestCase):
self.assertTrue("(the certbot nginx plugin is not" in out)
out = self._help_output(['--help', 'plugins'])
self.assertTrue("--manual-test-mode" not in out)
self.assertTrue("--webroot-path" not in out)
self.assertTrue("--prepare" in out)
self.assertTrue('"plugins" subcommand' in out)
@ -305,22 +305,22 @@ class SetByCliTest(unittest.TestCase):
def test_report_config_interaction_str(self):
cli.report_config_interaction('manual_public_ip_logging_ok',
'manual_test_mode')
cli.report_config_interaction('manual_test_mode', 'manual')
'manual_auth_hook')
cli.report_config_interaction('manual_auth_hook', 'manual')
self._test_report_config_interaction_common()
def test_report_config_interaction_iterable(self):
cli.report_config_interaction(('manual_public_ip_logging_ok',),
('manual_test_mode',))
cli.report_config_interaction(('manual_test_mode',), ('manual',))
('manual_auth_hook',))
cli.report_config_interaction(('manual_auth_hook',), ('manual',))
self._test_report_config_interaction_common()
def _test_report_config_interaction_common(self):
"""Tests implied interaction between manual flags.
--manual implies --manual-test-mode which implies
--manual implies --manual-auth-hook which implies
--manual-public-ip-logging-ok. These interactions don't actually
exist in the client, but are used here for testing purposes.
@ -328,13 +328,13 @@ class SetByCliTest(unittest.TestCase):
args = ['--manual']
verb = 'renew'
for v in ('manual', 'manual_test_mode', 'manual_public_ip_logging_ok'):
for v in ('manual', 'manual_auth_hook', 'manual_public_ip_logging_ok'):
self.assertTrue(_call_set_by_cli(v, args, verb))
cli.set_by_cli.detector = None
args = ['--manual-test-mode']
for v in ('manual_test_mode', 'manual_public_ip_logging_ok'):
args = ['--manual-auth-hook', 'command']
for v in ('manual_auth_hook', 'manual_public_ip_logging_ok'):
self.assertTrue(_call_set_by_cli(v, args, verb))
self.assertFalse(_call_set_by_cli('manual', args, verb))

View file

@ -131,7 +131,6 @@ setup(
'null = certbot.plugins.null:Installer',
'standalone = certbot.plugins.standalone:Authenticator',
'webroot = certbot.plugins.webroot:Authenticator',
'script = certbot.plugins.script:Authenticator',
],
},
)

View file

@ -44,7 +44,12 @@ python_server_pid=$!
common --domains le2.wtf --preferred-challenges http-01 run
kill $python_server_pid
common -a manual -d le.wtf auth --rsa-key-size 4096
common certonly -a manual -d le.wtf --rsa-key-size 4096 \
--manual-auth-hook ./tests/manual-http-auth.sh \
--manual-cleanup-hook ./tests/manual-http-cleanup.sh
common certonly -a manual -d dns.le.wtf --preferred-challenges dns-01 \
--manual-auth-hook ./tests/manual-dns-auth.sh
export CSR_PATH="${root}/csr.der" KEY_PATH="${root}/key.pem" \
OPENSSL_CNF=examples/openssl.cnf

View file

@ -25,7 +25,7 @@ certbot_test_no_force_renew () {
--no-verify-ssl \
--tls-sni-01-port $tls_sni_01_port \
--http-01-port $http_01_port \
--manual-test-mode \
--manual-public-ip-logging-ok \
$store_flags \
--non-interactive \
--no-redirect \

4
tests/manual-dns-auth.sh Executable file
View file

@ -0,0 +1,4 @@
#!/bin/sh
curl -X POST 'http://localhost:8055/set-txt' -d \
"{\"host\": \"_acme-challenge.$CERTBOT_DOMAIN.\", \
\"value\": \"$CERTBOT_VALIDATION\"}"

12
tests/manual-http-auth.sh Executable file
View file

@ -0,0 +1,12 @@
#!/bin/sh
uri_path=".well-known/acme-challenge/$CERTBOT_TOKEN"
cd $(mktemp -d)
mkdir -p $(dirname $uri_path)
echo $CERTBOT_VALIDATION > $uri_path
python -m SimpleHTTPServer $http_01_port >/dev/null 2>&1 &
server_pid=$!
while ! curl "http://localhost:$http_01_port/$uri_path" >/dev/null 2>&1; do
sleep 1s
done
echo $server_pid

2
tests/manual-http-cleanup.sh Executable file
View file

@ -0,0 +1,2 @@
#!/bin/sh
kill $CERTBOT_AUTH_OUTPUT