diff --git a/letsencrypt/plugins/common.py b/letsencrypt/plugins/common.py index 32bee2b49..90296c5c7 100644 --- a/letsencrypt/plugins/common.py +++ b/letsencrypt/plugins/common.py @@ -1,8 +1,14 @@ """Plugin common functions.""" +import os +import pkg_resources +import shutil +import tempfile + import zope.interface from acme.jose import util as jose_util +from letsencrypt import constants from letsencrypt import interfaces @@ -69,3 +75,127 @@ class Plugin(object): with unique plugin name prefix. """ + +# other + +class Addr(object): + r"""Represents an virtual host address. + + :param str addr: addr part of vhost address + :param str port: port number or \*, or "" + + """ + def __init__(self, tup): + self.tup = tup + + @classmethod + def fromstring(cls, str_addr): + """Initialize Addr from string.""" + tup = str_addr.partition(':') + return cls((tup[0], tup[2])) + + def __str__(self): + if self.tup[1]: + return "%s:%s" % self.tup + return self.tup[0] + + def __eq__(self, other): + if isinstance(other, self.__class__): + return self.tup == other.tup + return False + + def __hash__(self): + return hash(self.tup) + + def get_addr(self): + """Return addr part of Addr object.""" + return self.tup[0] + + def get_port(self): + """Return port.""" + return self.tup[1] + + def get_addr_obj(self, port): + """Return new address object with same addr and new port.""" + return self.__class__((self.tup[0], port)) + + +class Dvsni(object): + """Class that perform DVSNI challenges.""" + + def __init__(self, configurator): + self.configurator = configurator + self.achalls = [] + self.indices = [] + self.challenge_conf = os.path.join( + configurator.config.config_dir, "le_dvsni_cert_challenge.conf") + # self.completed = 0 + + def add_chall(self, achall, idx=None): + """Add challenge to DVSNI object to perform at once. + + :param achall: Annotated DVSNI challenge. + :type achall: :class:`letsencrypt.achallenges.DVSNI` + + :param int idx: index to challenge in a larger array + + """ + self.achalls.append(achall) + if idx is not None: + self.indices.append(idx) + + def get_cert_file(self, achall): + """Returns standardized name for challenge certificate. + + :param achall: Annotated DVSNI challenge. + :type achall: :class:`letsencrypt.achallenges.DVSNI` + + :returns: certificate file name + :rtype: str + + """ + return os.path.join( + self.configurator.config.work_dir, achall.nonce_domain + ".crt") + + def _setup_challenge_cert(self, achall, s=None): + # pylint: disable=invalid-name + """Generate and write out challenge certificate.""" + cert_path = self.get_cert_file(achall) + # Register the path before you write out the file + self.configurator.reverter.register_file_creation(True, cert_path) + + cert_pem, response = achall.gen_cert_and_response(s) + + # Write out challenge cert + with open(cert_path, "w") as cert_chall_fd: + cert_chall_fd.write(cert_pem) + + return response + + +# test utils + +def setup_ssl_options(config_dir, mod_ssl_conf): + """Move the ssl_options into position and return the path.""" + option_path = os.path.join(config_dir, "options-ssl.conf") + shutil.copyfile(mod_ssl_conf, option_path) + return option_path + + +def dir_setup(test_dir, pkg): + """Setup the directories necessary for the configurator.""" + temp_dir = tempfile.mkdtemp("temp") + config_dir = tempfile.mkdtemp("config") + work_dir = tempfile.mkdtemp("work") + + os.chmod(temp_dir, constants.CONFIG_DIRS_MODE) + os.chmod(config_dir, constants.CONFIG_DIRS_MODE) + os.chmod(work_dir, constants.CONFIG_DIRS_MODE) + + test_configs = pkg_resources.resource_filename( + pkg, os.path.join("testdata", test_dir)) + + shutil.copytree( + test_configs, os.path.join(temp_dir, test_dir), symlinks=True) + + return temp_dir, config_dir, work_dir diff --git a/letsencrypt/plugins/common_test.py b/letsencrypt/plugins/common_test.py index 12dd18bdf..6de86f2b8 100644 --- a/letsencrypt/plugins/common_test.py +++ b/letsencrypt/plugins/common_test.py @@ -1,8 +1,16 @@ """Tests for letsencrypt.plugins.common.""" +import pkg_resources import unittest import mock +from acme import challenges + +from letsencrypt import achallenges +from letsencrypt import le_util + +from letsencrypt.tests import acme_util + class NamespaceFunctionsTest(unittest.TestCase): """Tests for letsencrypt.plugins.common.*_namespace functions.""" @@ -57,5 +65,103 @@ class PluginTest(unittest.TestCase): "--mock-foo-bar", dest="different_to_foo_bar", x=1, y=None) +class AddrTest(unittest.TestCase): + """Tests for letsencrypt.client.plugins.common.Addr.""" + + def setUp(self): + from letsencrypt.plugins.common import Addr + self.addr1 = Addr.fromstring("192.168.1.1") + self.addr2 = Addr.fromstring("192.168.1.1:*") + self.addr3 = Addr.fromstring("192.168.1.1:80") + + def test_fromstring(self): + self.assertEqual(self.addr1.get_addr(), "192.168.1.1") + self.assertEqual(self.addr1.get_port(), "") + self.assertEqual(self.addr2.get_addr(), "192.168.1.1") + self.assertEqual(self.addr2.get_port(), "*") + self.assertEqual(self.addr3.get_addr(), "192.168.1.1") + self.assertEqual(self.addr3.get_port(), "80") + + def test_str(self): + self.assertEqual(str(self.addr1), "192.168.1.1") + self.assertEqual(str(self.addr2), "192.168.1.1:*") + self.assertEqual(str(self.addr3), "192.168.1.1:80") + + def test_get_addr_obj(self): + self.assertEqual(str(self.addr1.get_addr_obj("443")), "192.168.1.1:443") + self.assertEqual(str(self.addr2.get_addr_obj("")), "192.168.1.1") + self.assertEqual(str(self.addr1.get_addr_obj("*")), "192.168.1.1:*") + + def test_eq(self): + self.assertEqual(self.addr1, self.addr2.get_addr_obj("")) + self.assertNotEqual(self.addr1, self.addr2) + self.assertFalse(self.addr1 == 3333) + + def test_set_inclusion(self): + from letsencrypt.plugins.common import Addr + set_a = set([self.addr1, self.addr2]) + addr1b = Addr.fromstring("192.168.1.1") + addr2b = Addr.fromstring("192.168.1.1:*") + set_b = set([addr1b, addr2b]) + + self.assertEqual(set_a, set_b) + + +class DvsniTest(unittest.TestCase): + """Tests for letsencrypt.plugins.common.DvsniTest.""" + + rsa256_file = pkg_resources.resource_filename( + "acme.jose", "testdata/rsa256_key.pem") + rsa256_pem = pkg_resources.resource_string( + "acme.jose", "testdata/rsa256_key.pem") + + auth_key = le_util.Key(rsa256_file, rsa256_pem) + achalls = [ + achallenges.DVSNI( + challb=acme_util.chall_to_challb( + challenges.DVSNI( + r="\x8c\x8a\xbf_-f\\cw\xee\xd6\xf8/\xa5\xe3\xfd\xeb9" + "\xf1\xf5\xb9\xefVM\xc9w\xa4u\x9c\xe1\x87\xb4", + nonce="7\xbc^\xb7]>\x00\xa1\x9bOcU\x84^Z\x18", + ), "pending"), + domain="encryption-example.demo", key=auth_key), + achallenges.DVSNI( + challb=acme_util.chall_to_challb( + challenges.DVSNI( + r="\xba\xa9\xda? """ - def __init__(self, configurator): - self.configurator = configurator - self.achalls = [] - self.indices = [] - self.challenge_conf = os.path.join( - configurator.config.config_dir, "le_dvsni_cert_challenge.conf") - # self.completed = 0 - - def add_chall(self, achall, idx=None): - """Add challenge to DVSNI object to perform at once. - - :param achall: Annotated DVSNI challenge. - :type achall: :class:`letsencrypt.achallenges.DVSNI` - - :param int idx: index to challenge in a larger array - - """ - self.achalls.append(achall) - if idx is not None: - self.indices.append(idx) def perform(self): """Peform a DVSNI challenge.""" @@ -107,28 +89,12 @@ class ApacheDvsni(object): return responses - def _setup_challenge_cert(self, achall, s=None): - # pylint: disable=invalid-name - """Generate and write out challenge certificate.""" - cert_path = self.get_cert_file(achall) - # Register the path before you write out the file - self.configurator.reverter.register_file_creation(True, cert_path) - - cert_pem, response = achall.gen_cert_and_response(s) - - # Write out challenge cert - with open(cert_path, "w") as cert_chall_fd: - cert_chall_fd.write(cert_pem) - - return response - def _mod_config(self, ll_addrs): """Modifies Apache config files to include challenge vhosts. Result: Apache config includes virtual servers for issued challs - :param list ll_addrs: list of list of - :class:`letsencrypt.plugins.apache.obj.Addr` to apply + :param list ll_addrs: list of list of `~.common.Addr` to apply """ # TODO: Use ip address of existing vhost instead of relying on FQDN @@ -167,7 +133,7 @@ class ApacheDvsni(object): :type achall: :class:`letsencrypt.achallenges.DVSNI` :param list ip_addrs: addresses of challenged domain - :class:`list` of type :class:`~apache.obj.Addr` + :class:`list` of type `~.common.Addr` :returns: virtual host configuration text :rtype: str @@ -186,16 +152,3 @@ class ApacheDvsni(object): ssl_options_conf_path=self.configurator.parser.loc["ssl_options"], cert_path=self.get_cert_file(achall), key_path=achall.key.file, document_root=document_root).replace("\n", os.linesep) - - def get_cert_file(self, achall): - """Returns standardized name for challenge certificate. - - :param achall: Annotated DVSNI challenge. - :type achall: :class:`letsencrypt.achallenges.DVSNI` - - :returns: certificate file name - :rtype: str - - """ - return os.path.join( - self.configurator.config.work_dir, achall.nonce_domain + ".crt") diff --git a/letsencrypt_apache/obj.py b/letsencrypt_apache/obj.py index 905e3f192..fecf46ff9 100644 --- a/letsencrypt_apache/obj.py +++ b/letsencrypt_apache/obj.py @@ -1,54 +1,13 @@ """Module contains classes used by the Apache Configurator.""" -class Addr(object): - r"""Represents an Apache VirtualHost address. - - :param str addr: addr part of vhost address - :param str port: port number or \*, or "" - - """ - def __init__(self, tup): - self.tup = tup - - @classmethod - def fromstring(cls, str_addr): - """Initialize Addr from string.""" - tup = str_addr.partition(':') - return cls((tup[0], tup[2])) - - def __str__(self): - if self.tup[1]: - return "%s:%s" % self.tup - return self.tup[0] - - def __eq__(self, other): - if isinstance(other, self.__class__): - return self.tup == other.tup - return False - - def __hash__(self): - return hash(self.tup) - - def get_addr(self): - """Return addr part of Addr object.""" - return self.tup[0] - - def get_port(self): - """Return port.""" - return self.tup[1] - - def get_addr_obj(self, port): - """Return new address object with same addr and new port.""" - return self.__class__((self.tup[0], port)) - - class VirtualHost(object): # pylint: disable=too-few-public-methods """Represents an Apache Virtualhost. :ivar str filep: file path of VH :ivar str path: Augeas path to virtual host - :ivar set addrs: Virtual Host addresses (:class:`set` of :class:`Addr`) + :ivar set addrs: Virtual Host addresses (:class:`set` of + :class:`common.Addr`) :ivar set names: Server names/aliases of vhost (:class:`list` of :class:`str`) diff --git a/letsencrypt_apache/tests/configurator_test.py b/letsencrypt_apache/tests/configurator_test.py index 11b88f9e5..c3064eb5b 100644 --- a/letsencrypt_apache/tests/configurator_test.py +++ b/letsencrypt_apache/tests/configurator_test.py @@ -12,10 +12,11 @@ from letsencrypt import achallenges from letsencrypt import errors from letsencrypt import le_util +from letsencrypt.plugins import common + from letsencrypt.tests import acme_util from letsencrypt_apache import configurator -from letsencrypt_apache import obj from letsencrypt_apache import parser from letsencrypt_apache.tests import util @@ -112,7 +113,7 @@ class TwoVhost80Test(util.ApacheTest): self.vh_truth[1].filep) def test_is_name_vhost(self): - addr = obj.Addr.fromstring("*:80") + addr = common.Addr.fromstring("*:80") self.assertTrue(self.config.is_name_vhost(addr)) self.config.version = (2, 2) self.assertFalse(self.config.is_name_vhost(addr)) @@ -133,7 +134,7 @@ class TwoVhost80Test(util.ApacheTest): self.assertEqual(ssl_vhost.path, "/files" + ssl_vhost.filep + "/IfModule/VirtualHost") self.assertEqual(len(ssl_vhost.addrs), 1) - self.assertEqual(set([obj.Addr.fromstring("*:443")]), ssl_vhost.addrs) + self.assertEqual(set([common.Addr.fromstring("*:443")]), ssl_vhost.addrs) self.assertEqual(ssl_vhost.names, set(["encryption-example.demo"])) self.assertTrue(ssl_vhost.ssl) self.assertFalse(ssl_vhost.enabled) diff --git a/letsencrypt_apache/tests/dvsni_test.py b/letsencrypt_apache/tests/dvsni_test.py index 321dce42c..bf43bc359 100644 --- a/letsencrypt_apache/tests/dvsni_test.py +++ b/letsencrypt_apache/tests/dvsni_test.py @@ -1,5 +1,4 @@ """Test for letsencrypt_apache.dvsni.""" -import pkg_resources import unittest import shutil @@ -7,18 +6,17 @@ import mock from acme import challenges -from letsencrypt import achallenges -from letsencrypt import le_util +from letsencrypt.plugins import common +from letsencrypt.plugins import common_test -from letsencrypt.tests import acme_util - -from letsencrypt_apache import obj from letsencrypt_apache.tests import util class DvsniPerformTest(util.ApacheTest): """Test the ApacheDVSNI challenge.""" + achalls = common_test.DvsniTest.achalls + def setUp(self): super(DvsniPerformTest, self).setUp() @@ -32,32 +30,6 @@ class DvsniPerformTest(util.ApacheTest): from letsencrypt_apache import dvsni self.sni = dvsni.ApacheDvsni(config) - rsa256_file = pkg_resources.resource_filename( - "acme.jose", "testdata/rsa256_key.pem") - rsa256_pem = pkg_resources.resource_string( - "acme.jose", "testdata/rsa256_key.pem") - - auth_key = le_util.Key(rsa256_file, rsa256_pem) - self.achalls = [ - achallenges.DVSNI( - challb=acme_util.chall_to_challb( - challenges.DVSNI( - r="\x8c\x8a\xbf_-f\\cw\xee\xd6\xf8/\xa5\xe3\xfd\xeb9" - "\xf1\xf5\xb9\xefVM\xc9w\xa4u\x9c\xe1\x87\xb4", - nonce="7\xbc^\xb7]>\x00\xa1\x9bOcU\x84^Z\x18", - ), "pending"), - domain="encryption-example.demo", key=auth_key), - achallenges.DVSNI( - challb=acme_util.chall_to_challb( - challenges.DVSNI( - r="\xba\xa9\xda?