make manual use --preferred-challenges flag

This commit is contained in:
Brad Warren 2016-08-29 15:44:31 -07:00
parent 7b7477ab94
commit 322546b8d1
5 changed files with 22 additions and 120 deletions

View file

@ -4,31 +4,27 @@ import logging
import pipes
import shutil
import signal
import socket
import subprocess
import sys
import tempfile
import time
import six
import zope.component
import zope.interface
from functools import partial
from acme import challenges
from acme import errors as acme_errors
from certbot import errors
from certbot import interfaces
from certbot.plugins import common, util
from certbot.util import busy_wait
from certbot.plugins import common
logger = logging.getLogger(__name__)
SUPPORTED_CHALLENGES = [challenges.HTTP01, challenges.DNS01]
@zope.interface.implementer(interfaces.IAuthenticator)
@zope.interface.provider(interfaces.IPluginFactory)
class Authenticator(common.Plugin):
@ -99,23 +95,10 @@ s.serve_forever()" """
@classmethod
def add_parser_arguments(cls, add):
validator = partial(util.supported_challenges_validator,
supported=SUPPORTED_CHALLENGES)
add("test-mode", action="store_true",
help="Test mode. Executes the manual command in subprocess.")
add("public-ip-logging-ok", action="store_true",
help="Automatically allows public IP logging.")
add("supported-challenges",
help="Supported challenges. Preferred in the order they are listed.",
type=validator,
default="http-01")
@property
def supported_challenges(self):
"""Challenges supported by this plugin."""
return [challenges.Challenge.TYPES[name] for name in
self.conf("supported-challenges").split(",")]
def prepare(self): # pylint: disable=missing-docstring,no-self-use
if self.config.noninteractive_mode and not self.conf("test-mode"):
@ -132,7 +115,7 @@ s.serve_forever()" """
def get_chall_pref(self, domain):
# pylint: disable=missing-docstring,no-self-use,unused-argument
return self.supported_challenges
return [challenges.HTTP01, challenges.DNS01]
def perform(self, achalls):
# pylint: disable=missing-docstring
@ -145,6 +128,20 @@ s.serve_forever()" """
responses.append(mapping[achall.typ](achall))
return responses
@classmethod
def _test_mode_busy_wait(cls, port):
while True:
time.sleep(1)
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
sock.connect(("localhost", port))
except socket.error: # pragma: no cover
pass
else:
break
finally:
sock.close()
def cleanup(self, achalls):
# pylint: disable=missing-docstring
for achall in achalls:
@ -184,7 +181,7 @@ s.serve_forever()" """
logger.debug("Manual command running as PID %s.", self._httpd.pid)
# give it some time to bootstrap, before we try to verify
# (cert generation in case of simpleHttpS might take time)
busy_wait(port)
self._test_mode_busy_wait(port)
if self._httpd.poll() is not None:
raise errors.Error("Couldn't execute manual command")

View file

@ -32,7 +32,7 @@ class AuthenticatorTest(unittest.TestCase):
self.http01 = achallenges.KeyAuthorizationAnnotatedChallenge(
challb=acme_util.HTTP01_P, domain="foo.com", account_key=KEY)
self.dns01 = achallenges.KeyAuthorizationAnnotatedChallenge(
challb=acme_util.DNS01_P, domain="foo.com", account_key=KEY)
challb=acme_util.DNS01_P, domain="foo.com", account_key=KEY)
self.achalls = [self.http01, self.dns01]
@ -106,8 +106,8 @@ class AuthenticatorTest(unittest.TestCase):
mock_popen.side_effect = OSError
self.assertEqual([False], self.auth_test_mode.perform([self.http01]))
@mock.patch("certbot.util.socket.socket")
@mock.patch("certbot.util.time.sleep", autospec=True)
@mock.patch("certbot.plugins.manual.socket.socket")
@mock.patch("certbot.plugins.manual.time.sleep", autospec=True)
@mock.patch("certbot.plugins.manual.subprocess.Popen", autospec=True)
def test_perform_test_command_run_failure(
self, mock_popen, unused_mock_sleep, unused_mock_socket):

View file

@ -1,13 +1,10 @@
"""Plugin utilities."""
import argparse
import logging
import os
import socket
import zope.component
from acme import challenges
from certbot import interfaces
from certbot import util
@ -157,36 +154,3 @@ def already_listening_psutil(port, renewer=False):
# name (AccessDenied).
pass
return False
def supported_challenges_validator(data, supported=None):
"""Supported challenges validator for the `argparse`.
It should be passed as `type` argument to `add_argument`.
:param str data: input value representing the supported_challenges
:returns: original value if valid
"""
challs = data.split(",")
supported = supported or []
# tls-sni-01 was dvsni during private beta
if "dvsni" in challs:
logger.info("Updating legacy standalone_supported_challenges value")
challs = [challenges.TLSSNI01.typ if chall == "dvsni" else chall
for chall in challs]
data = ",".join(challs)
unrecognized = [name for name in challs
if name not in challenges.Challenge.TYPES]
if unrecognized:
raise argparse.ArgumentTypeError(
"Unrecognized challenges: {0}".format(", ".join(unrecognized)))
choices = set(chall.typ for chall in supported)
if not set(challs).issubset(choices):
raise argparse.ArgumentTypeError(
"Plugin does not support the following (valid) "
"challenges: {0}".format(", ".join(set(challs) - choices)))
return data

View file

@ -1,5 +1,4 @@
"""Tests for certbot.plugins.util."""
import argparse
import os
import unittest
import sys
@ -212,42 +211,5 @@ class AlreadyListeningTestPsutil(unittest.TestCase):
mock_net.side_effect = psutil.AccessDenied("")
self.assertFalse(self._call(12345))
class SupportedChallengesValidatorTest(unittest.TestCase):
"""Tests for plugins.standalone.supported_challenges_validator."""
def _call(self, data):
from certbot.plugins.util import supported_challenges_validator
from acme import challenges
supported = [challenges.HTTP01, challenges.DNS01, challenges.TLSSNI01]
return supported_challenges_validator(data, supported=supported)
def test_correct(self):
self.assertEqual("tls-sni-01", self._call("tls-sni-01"))
self.assertEqual("http-01", self._call("http-01"))
self.assertEqual("tls-sni-01,http-01", self._call("tls-sni-01,http-01"))
self.assertEqual("http-01,tls-sni-01", self._call("http-01,tls-sni-01"))
def test_unrecognized(self):
from acme import challenges
assert "foo" not in challenges.Challenge.TYPES
self.assertRaises(argparse.ArgumentTypeError, self._call, "foo")
def test_not_subset(self):
self.assertRaises(argparse.ArgumentTypeError, self._call, "dns")
def test_dvsni(self):
self.assertEqual("tls-sni-01", self._call("dvsni"))
self.assertEqual("http-01,tls-sni-01", self._call("http-01,dvsni"))
self.assertEqual("tls-sni-01,http-01", self._call("dvsni,http-01"))
def test_dns01(self):
self.assertEqual("dns-01", self._call("dns-01"))
self.assertEqual("http-01,dns-01", self._call("http-01,dns-01"))
self.assertEqual("dns-01,http-01", self._call("dns-01,http-01"))
if __name__ == "__main__":
unittest.main() # pragma: no cover

View file

@ -14,7 +14,6 @@ import socket
import stat
import subprocess
import sys
import time
import configargparse
@ -475,23 +474,3 @@ def get_strict_version(normalized):
# strict version ending with "a" and a number designates a pre-release
# pylint: disable=no-member
return distutils.version.StrictVersion(normalized.replace(".dev", "a"))
def busy_wait(port, host="localhost"):
"""Artificialy wait a fixed amount of time on a specific host and port
:param str port: port of the connection
:param str host: hostname of the connection, "localhost" if None
"""
while True:
time.sleep(1)
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
sock.connect((host, port))
except socket.error: # pragma: no cover
pass
else:
break
finally:
sock.close()