Add unit test for deploying cert

This commit is contained in:
yan 2015-04-15 23:11:35 -07:00
parent eeb81cbf1f
commit f050fcfa58
4 changed files with 116 additions and 95 deletions

View file

@ -95,6 +95,8 @@ class NginxConfigurator(object):
Nginx doesn't have a cert chain directive, so the last parameter is
always ignored. It expects the cert file to have the concatenated chain.
.. note:: This doesn't save the config files!
:param str domain: domain to deploy certificate
:param str cert: certificate filename
:param str key: private key filename
@ -122,7 +124,6 @@ class NginxConfigurator(object):
", ".join(str(addr) for addr in vhost.addrs)))
self.save_notes += "\tssl_certificate %s\n" % cert
self.save_notes += "\tssl_certificate_key %s\n" % key
self.save()
#######################
# Vhost parsing methods
@ -424,6 +425,7 @@ class NginxConfigurator(object):
self.reverter.add_to_checkpoint(save_files,
self.save_notes)
# Change 'ext' to something else to not override existing conf files
self.parser.filedump(ext='')
if title and not temporary:
self.reverter.finalize_checkpoint(title)

View file

@ -8,6 +8,9 @@ from letsencrypt.client.plugins.nginx import parser
class NginxDvsni(object):
"""Class performs DVSNI challenges within the Nginx configurator.
.. todo:: This is basically copied-and-pasted from the Apache equivalent.
It doesn't actually work yet.
:ivar configurator: NginxConfigurator object
:type configurator: :class:`~nginx.configurator.NginxConfigurator`

View file

@ -115,16 +115,9 @@ class NginxParser(object):
lambda x: servers[filename].append(x[1]))
# Find 'include' statements in server blocks and append their trees
for server in servers[filename]:
for directive in server:
if (self._is_include_directive(directive)):
included_files = glob.glob(
self.abs_path(directive[1]))
for f in included_files:
try:
server.extend(self.parsed[f])
except:
pass
for i, server in enumerate(servers[filename]):
new_server = self._get_included_directives(server)
servers[filename][i] = new_server
for filename in servers:
for server in servers[filename]:
@ -140,6 +133,26 @@ class NginxParser(object):
return vhosts
def _get_included_directives(self, block):
"""Returns array with the "include" directives expanded out by
concatenating the contents of the included file to the block.
:param list block:
:rtype: list
"""
result = list(block) # Copy the list to keep self.parsed idempotent
for directive in block:
if (self._is_include_directive(directive)):
included_files = glob.glob(
self.abs_path(directive[1]))
for f in included_files:
try:
result.extend(self.parsed[f])
except:
pass
return result
def _parse_server(self, server):
"""Parses a list of server directives.
@ -270,8 +283,9 @@ class NginxParser(object):
# Can't be a server block
return False
new_entry = self._get_included_directives(entry)
server_names = set()
for item in entry:
for item in new_entry:
if type(item) != list:
# Can't be a server block
return False
@ -323,6 +337,7 @@ class NginxParser(object):
lambda x: self._has_server_names(x, names),
lambda x: self._replace_directives(x, directives))
else:
print('adding server directives for %s' % filename)
_do_for_subarray(self.parsed[filename],
lambda x: self._has_server_names(x, names),
lambda x: x.extend(directives))

View file

@ -6,7 +6,9 @@ import mock
from letsencrypt.acme import challenges
from letsencrypt.client import achallenges
from letsencrypt.client import errors
from letsencrypt.client import le_util
from letsencrypt.client.plugins.nginx.tests import util
@ -92,65 +94,66 @@ class NginxConfiguratorTest(util.NginxTest):
self.assertTrue('nginx.conf' in self.config.more_info())
def test_deploy_cert(self):
pass
# Get the default 443 vhost
# self.config.assoc["random.demo"] = self.vh_truth[1]
# self.config.deploy_cert(
# "random.demo",
# "example/cert.pem", "example/key.pem", "example/cert_chain.pem")
# self.config.save()
#
# loc_cert = self.config.parser.find_dir(
# parser.case_i("sslcertificatefile"),
# re.escape("example/cert.pem"), self.vh_truth[1].path)
# loc_key = self.config.parser.find_dir(
# parser.case_i("sslcertificateKeyfile"),
# re.escape("example/key.pem"), self.vh_truth[1].path)
# loc_chain = self.config.parser.find_dir(
# parser.case_i("SSLCertificateChainFile"),
# re.escape("example/cert_chain.pem"), self.vh_truth[1].path)
#
# # Verify one directive was found in the correct file
# self.assertEqual(len(loc_cert), 1)
# self.assertEqual(configurator.get_file_path(loc_cert[0]),
# self.vh_truth[1].filep)
#
# self.assertEqual(len(loc_key), 1)
# self.assertEqual(configurator.get_file_path(loc_key[0]),
# self.vh_truth[1].filep)
#
# self.assertEqual(len(loc_chain), 1)
# self.assertEqual(configurator.get_file_path(loc_chain[0]),
# self.vh_truth[1].filep)
server_conf = self.config.parser.abs_path('server.conf')
nginx_conf = self.config.parser.abs_path('nginx.conf')
example_conf = self.config.parser.abs_path('sites-enabled/example.com')
def test_make_vhost_ssl(self):
pass
# ssl_vhost = self.config.make_vhost_ssl(self.vh_truth[0])
#
# self.assertEqual(
# ssl_vhost.filep,
# os.path.join(self.config_path, "sites-available",
# "encryption-example-le-ssl.conf"))
#
# self.assertEqual(ssl_vhost.path,
# "/files" + ssl_vhost.filep + "/IfModule/VirtualHost")
# self.assertEqual(len(ssl_vhost.addrs), 1)
# self.assertEqual(set([obj.Addr.fromstring("*:443")]), ssl_vhost.addrs)
# self.assertEqual(ssl_vhost.names, set(["encryption-example.demo"]))
# self.assertTrue(ssl_vhost.ssl)
# self.assertFalse(ssl_vhost.enabled)
#
# self.assertTrue(self.config.parser.find_dir(
# "SSLCertificateFile", None, ssl_vhost.path))
# self.assertTrue(self.config.parser.find_dir(
# "SSLCertificateKeyFile", None, ssl_vhost.path))
# self.assertTrue(self.config.parser.find_dir(
# "Include", self.ssl_options, ssl_vhost.path))
#
# self.assertEqual(self.config.is_name_vhost(self.vh_truth[0]),
# self.config.is_name_vhost(ssl_vhost))
#
# self.assertEqual(len(self.config.vhosts), 5)
# Get the default 443 vhost
self.config.deploy_cert(
"www.example.com",
"example/cert.pem", "example/key.pem")
self.config.deploy_cert(
"another.alias",
"/etc/nginx/cert.pem", "/etc/nginx/key.pem")
self.config.save()
self.config.parser.load()
self.assertEqual([[['server'],
[['listen', '69.50.225.155:9000'],
['listen', '127.0.0.1'],
['server_name', '.example.com'],
['server_name', 'example.*'],
['listen', '443 ssl'],
['ssl_certificate', 'example/cert.pem'],
['ssl_certificate_key', 'example/key.pem'],
['include',
self.config.parser.loc["ssl_options"]]]]],
self.config.parser.parsed[example_conf])
self.assertEqual([['server_name', 'somename alias another.alias']],
self.config.parser.parsed[server_conf])
self.assertEqual([['server'],
[['listen', '8000'],
['listen', 'somename:8080'],
['include', 'server.conf'],
[['location', '/'],
[['root', 'html'],
['index', 'index.html index.htm']]],
['listen', '443 ssl'],
['ssl_certificate', '/etc/nginx/cert.pem'],
['ssl_certificate_key', '/etc/nginx/key.pem'],
['include',
self.config.parser.loc["ssl_options"]]]],
self.config.parser.parsed[nginx_conf][-1][-1][-3])
def test_get_all_certs_keys(self):
nginx_conf = self.config.parser.abs_path('nginx.conf')
example_conf = self.config.parser.abs_path('sites-enabled/example.com')
# Get the default 443 vhost
self.config.deploy_cert(
"www.example.com",
"example/cert.pem", "example/key.pem")
self.config.deploy_cert(
"another.alias",
"/etc/nginx/cert.pem", "/etc/nginx/key.pem")
self.config.save()
self.config.parser.load()
self.assertEqual(set([
('example/cert.pem', 'example/key.pem', example_conf),
('/etc/nginx/cert.pem', '/etc/nginx/key.pem', nginx_conf),
]), self.config.get_all_certs_keys())
@mock.patch("letsencrypt.client.plugins.nginx.configurator."
"dvsni.NginxDvsni.perform")
@ -159,31 +162,29 @@ class NginxConfiguratorTest(util.NginxTest):
def test_perform(self, mock_restart, mock_dvsni_perform):
# Only tests functionality specific to configurator.perform
# Note: As more challenges are offered this will have to be expanded
pass
# auth_key = le_util.Key(self.rsa256_file, self.rsa256_pem)
# achall1 = achallenges.DVSNI(
# chall=challenges.DVSNI(
# r="jIq_Xy1mXGN37tb4L6Xj_es58fW571ZNyXekdZzhh7Q",
# nonce="37bc5eb75d3e00a19b4f6355845e5a18"),
# domain="encryption-example.demo", key=auth_key)
# achall2 = achallenges.DVSNI(
# chall=challenges.DVSNI(
# r="uqnaPzxtrndteOqtrXb0Asl5gOJfWAnnx6QJyvcmlDU",
# nonce="59ed014cac95f77057b1d7a1b2c596ba"),
# domain="letsencrypt.demo", key=auth_key)
#
# dvsni_ret_val = [
# challenges.DVSNIResponse(s="randomS1"),
# challenges.DVSNIResponse(s="randomS2"),
# ]
#
# mock_dvsni_perform.return_value = dvsni_ret_val
# responses = self.config.perform([achall1, achall2])
#
# self.assertEqual(mock_dvsni_perform.call_count, 1)
# self.assertEqual(responses, dvsni_ret_val)
#
# self.assertEqual(mock_restart.call_count, 1)
auth_key = le_util.Key(self.rsa256_file, self.rsa256_pem)
achall1 = achallenges.DVSNI(
chall=challenges.DVSNI(
r="jIq_Xy1mXGN37tb4L6Xj_es58fW571ZNyXekdZzhh7Q",
nonce="37bc5eb75d3e00a19b4f6355845e5a18"),
domain="encryption-example.demo", key=auth_key)
achall2 = achallenges.DVSNI(
chall=challenges.DVSNI(
r="uqnaPzxtrndteOqtrXb0Asl5gOJfWAnnx6QJyvcmlDU",
nonce="59ed014cac95f77057b1d7a1b2c596ba"),
domain="letsencrypt.demo", key=auth_key)
dvsni_ret_val = [
challenges.DVSNIResponse(s="randomS1"),
challenges.DVSNIResponse(s="randomS2"),
]
mock_dvsni_perform.return_value = dvsni_ret_val
responses = self.config.perform([achall1, achall2])
self.assertEqual(mock_dvsni_perform.call_count, 1)
self.assertEqual(responses, dvsni_ret_val)
self.assertEqual(mock_restart.call_count, 1)
@mock.patch("letsencrypt.client.plugins.nginx.configurator."
"subprocess.Popen")