Fixed unit tests and lint

This commit is contained in:
Brad Warren 2015-08-05 15:39:31 -07:00
parent cfabfa1a67
commit 14c150ae17
16 changed files with 111 additions and 105 deletions

View file

@ -211,7 +211,7 @@ class SimpleHTTPResponse(ChallengeResponse):
http_response.text, http_response.headers)
if self.CONTENT_TYPE != http_response.headers.get(
"Content-Type", self.CONTENT_TYPE):
"Content-Type", self.CONTENT_TYPE):
return False
try:

View file

@ -145,28 +145,34 @@ class SimpleHTTPResponseTest(unittest.TestCase):
@mock.patch("acme.challenges.requests.get")
def test_simple_verify_good_token(self, mock_get):
account_key = jose.JWKRSA.load(test_util.load_vector('rsa512_key.pem'))
for resp in self.resp_http, self.resp_https:
mock_get.reset_mock()
validation = resp.gen_validation(self.chall, account_key)
mock_get.return_value = mock.MagicMock(
text=self.chall.token, headers=self.good_headers)
self.assertTrue(resp.simple_verify(self.chall, "local"))
mock_get.assert_called_once_with(resp.uri("local"), verify=False)
text=validation.json_dumps(), headers=self.good_headers)
self.assertTrue(resp.simple_verify(self.chall, "local", None))
mock_get.assert_called_once_with(resp.uri(
"local", self.chall), verify=False)
@mock.patch("acme.challenges.requests.get")
def test_simple_verify_bad_token(self, mock_get):
mock_get.return_value = mock.MagicMock(
text=self.chall.token + "!", headers=self.good_headers)
self.assertFalse(self.resp_http.simple_verify(self.chall, "local"))
self.assertFalse(self.resp_http.simple_verify(
self.chall, "local", None))
@mock.patch("acme.challenges.requests.get")
def test_simple_verify_bad_content_type(self, mock_get):
mock_get().text = self.chall.token
self.assertFalse(self.resp_http.simple_verify(self.chall, "local"))
self.assertFalse(self.resp_http.simple_verify(
self.chall, "local", None))
@mock.patch("acme.challenges.requests.get")
def test_simple_verify_connection_error(self, mock_get):
mock_get.side_effect = requests.exceptions.RequestException
self.assertFalse(self.resp_http.simple_verify(self.chall, "local"))
self.assertFalse(self.resp_http.simple_verify(
self.chall, "local", None))
@mock.patch("acme.challenges.requests.get")
def test_simple_verify_port(self, mock_get):

View file

@ -24,8 +24,8 @@ class Fixed(jose.Field):
def encode(self, value):
if value != self.value:
logger.warn('Overriding fixed field ({0}) with {1}'.format(
self.json_name, value))
logger.warn(
'Overriding fixed field (%s) with %r', self.json_name, value)
return value

View file

@ -163,7 +163,8 @@ class ApacheDvsni(common.Dvsni):
# parses it as "\n"... c.f.:
# https://docs.python.org/2.7/reference/lexical_analysis.html
return self.VHOST_TEMPLATE.format(
vhost=ips, server_name=achall.nonce_domain,
vhost=ips,
server_name=achall.gen_response(achall.account.key).z_domain,
ssl_options_conf_path=self.configurator.mod_ssl_conf,
cert_path=self.get_cert_path(achall),
key_path=self.get_key_path(achall),

View file

@ -11,7 +11,6 @@ from acme import challenges
from letsencrypt import achallenges
from letsencrypt import errors
from letsencrypt import le_util
from letsencrypt.tests import acme_util
@ -374,11 +373,11 @@ class TwoVhost80Test(util.ApacheTest):
def test_perform(self, mock_restart, mock_dvsni_perform):
# Only tests functionality specific to configurator.perform
# Note: As more challenges are offered this will have to be expanded
_, achall1, achall2 = self.get_achalls()
account_key, achall1, achall2 = self.get_achalls()
dvsni_ret_val = [
challenges.DVSNIResponse(s="randomS1"),
challenges.DVSNIResponse(s="randomS2"),
achall1.gen_response(account_key.key),
achall2.gen_response(account_key.key),
]
mock_dvsni_perform.return_value = dvsni_ret_val
@ -585,23 +584,21 @@ class TwoVhost80Test(util.ApacheTest):
def get_achalls(self):
"""Return testing achallenges."""
auth_key = le_util.Key(self.rsa256_file, self.rsa256_pem)
account = mock.MagicMock(key=self.rsa512jwk)
achall1 = achallenges.DVSNI(
challb=acme_util.chall_to_challb(
challenges.DVSNI(
r="jIq_Xy1mXGN37tb4L6Xj_es58fW571ZNyXekdZzhh7Q",
nonce="37bc5eb75d3e00a19b4f6355845e5a18"),
token="jIq_Xy1mXGN37tb4L6Xj_es58fW571ZNyXekdZzhh7Q"),
"pending"),
domain="encryption-example.demo", key=auth_key)
domain="encryption-example.demo", account=account)
achall2 = achallenges.DVSNI(
challb=acme_util.chall_to_challb(
challenges.DVSNI(
r="uqnaPzxtrndteOqtrXb0Asl5gOJfWAnnx6QJyvcmlDU",
nonce="59ed014cac95f77057b1d7a1b2c596ba"),
token="uqnaPzxtrndteOqtrXb0Asl5gOJfWAnnx6QJyvcmlDU"),
"pending"),
domain="letsencrypt.demo", key=auth_key)
domain="letsencrypt.demo", account=account)
return auth_key, achall1, achall2
return account, achall1, achall2
def test_make_addrs_sni_ready(self):
self.config.version = (2, 2)

View file

@ -4,8 +4,6 @@ import shutil
import mock
from acme import challenges
from letsencrypt.plugins import common_test
from letsencrypt_apache import obj
@ -15,6 +13,7 @@ from letsencrypt_apache.tests import util
class DvsniPerformTest(util.ApacheTest):
"""Test the ApacheDVSNI challenge."""
auth_key = common_test.DvsniTest.auth_key
achalls = common_test.DvsniTest.achalls
def setUp(self): # pylint: disable=arguments-differ
@ -44,8 +43,8 @@ class DvsniPerformTest(util.ApacheTest):
achall = self.achalls[0]
self.sni.add_chall(achall)
mock_setup_cert = mock.MagicMock(
return_value=challenges.DVSNIResponse(s="randomS1"))
response = self.achalls[0].gen_response(self.auth_key)
mock_setup_cert = mock.MagicMock(return_value=response)
# pylint: disable=protected-access
self.sni._setup_challenge_cert = mock_setup_cert
@ -58,22 +57,22 @@ class DvsniPerformTest(util.ApacheTest):
len(self.sni.configurator.parser.find_dir(
"Include", self.sni.challenge_conf)), 1)
self.assertEqual(len(responses), 1)
self.assertEqual(responses[0].s, "randomS1")
self.assertEqual(responses[0], response)
def test_perform2(self):
# Avoid load module
self.sni.configurator.parser.modules.add("ssl_module")
acme_responses = []
for achall in self.achalls:
self.sni.add_chall(achall)
acme_responses.append(achall.gen_response(self.auth_key))
mock_setup_cert = mock.MagicMock(side_effect=[
challenges.DVSNIResponse(s="randomS0"),
challenges.DVSNIResponse(s="randomS1")])
mock_setup_cert = mock.MagicMock(side_effect=acme_responses)
# pylint: disable=protected-access
self.sni._setup_challenge_cert = mock_setup_cert
responses = self.sni.perform()
sni_responses = self.sni.perform()
self.assertEqual(mock_setup_cert.call_count, 2)
@ -87,13 +86,16 @@ class DvsniPerformTest(util.ApacheTest):
len(self.sni.configurator.parser.find_dir(
"Include", self.sni.challenge_conf)),
1)
self.assertEqual(len(responses), 2)
self.assertEqual(len(sni_responses), 2)
for i in xrange(2):
self.assertEqual(responses[i].s, "randomS%d" % i)
self.assertEqual(sni_responses[i], acme_responses[i])
def test_mod_config(self):
z_domains = []
for achall in self.achalls:
self.sni.add_chall(achall)
z_domain = achall.gen_response(self.auth_key).z_domain
z_domains.append(set([z_domain]))
self.sni._mod_config() # pylint: disable=protected-access
self.sni.configurator.save()
@ -111,9 +113,7 @@ class DvsniPerformTest(util.ApacheTest):
for vhost in vhs:
self.assertEqual(vhost.addrs, set([obj.Addr.fromstring("*:443")]))
names = vhost.get_names()
self.assertTrue(
names == set([self.achalls[0].nonce_domain]) or
names == set([self.achalls[1].nonce_domain]))
self.assertTrue(names in z_domains)
def test_get_dvsni_addrs_default(self):
self.sni.configurator.choose_vhost = mock.Mock(

View file

@ -1,6 +1,5 @@
"""Common utilities for letsencrypt_apache."""
import os
import pkg_resources
import sys
import unittest
@ -8,10 +7,14 @@ import augeas
import mock
import zope.component
from acme import jose
from letsencrypt.display import util as display_util
from letsencrypt.plugins import common
from letsencrypt.tests import test_util
from letsencrypt_apache import configurator
from letsencrypt_apache import constants
from letsencrypt_apache import obj
@ -34,10 +37,8 @@ class ApacheTest(unittest.TestCase): # pylint: disable=too-few-public-methods
self.config_path = os.path.join(self.temp_dir, config_root)
self.rsa256_file = pkg_resources.resource_filename(
"letsencrypt.tests", os.path.join("testdata", "rsa256_key.pem"))
self.rsa256_pem = pkg_resources.resource_string(
"letsencrypt.tests", os.path.join("testdata", "rsa256_key.pem"))
self.rsa512jwk = jose.JWKRSA.load(test_util.load_vector(
"rsa512_key.pem"))
class ParserTest(ApacheTest): # pytlint: disable=too-few-public-methods

View file

@ -132,7 +132,8 @@ class NginxDvsni(common.Dvsni):
block = [['listen', str(addr)] for addr in addrs]
block.extend([['server_name', achall.nonce_domain],
block.extend([['server_name',
achall.gen_response(achall.account.key).z_domain],
['include', self.configurator.parser.loc["ssl_options"]],
# access and error logs necessary for
# integration testing (non-root)

View file

@ -11,7 +11,6 @@ from acme import messages
from letsencrypt import achallenges
from letsencrypt import errors
from letsencrypt import le_util
from letsencrypt_nginx.tests import util
@ -174,27 +173,23 @@ class NginxConfiguratorTest(util.NginxTest):
def test_perform(self, mock_restart, mock_dvsni_perform):
# Only tests functionality specific to configurator.perform
# Note: As more challenges are offered this will have to be expanded
auth_key = le_util.Key(self.rsa256_file, self.rsa256_pem)
account = mock.MagicMock(key=self.rsa512jwk)
achall1 = achallenges.DVSNI(
challb=messages.ChallengeBody(
chall=challenges.DVSNI(
r="foo",
nonce="bar"),
chall=challenges.DVSNI(token="kNdwjwOeX0I_A8DXt9Msmg"),
uri="https://ca.org/chall0_uri",
status=messages.Status("pending"),
), domain="localhost", key=auth_key)
), domain="localhost", account=account)
achall2 = achallenges.DVSNI(
challb=messages.ChallengeBody(
chall=challenges.DVSNI(
r="abc",
nonce="def"),
chall=challenges.DVSNI(token="m8TdO1qik4JVFtgPPurJmg"),
uri="https://ca.org/chall1_uri",
status=messages.Status("pending"),
), domain="example.com", key=auth_key)
), domain="example.com", account=account)
dvsni_ret_val = [
challenges.DVSNIResponse(s="irrelevant"),
challenges.DVSNIResponse(s="arbitrary"),
achall1.gen_response(account.key),
achall2.gen_response(account.key),
]
mock_dvsni_perform.return_value = dvsni_ret_val

View file

@ -19,31 +19,26 @@ from letsencrypt_nginx.tests import util
class DvsniPerformTest(util.NginxTest):
"""Test the NginxDVSNI challenge."""
account = mock.MagicMock(key=common_test.DvsniTest.auth_key)
achalls = [
achallenges.DVSNI(
challb=acme_util.chall_to_challb(
challenges.DVSNI(
r="foo",
nonce="bar"
), "pending"),
domain="www.example.com", key=common_test.DvsniTest.auth_key),
challenges.DVSNI(token="kNdwjwOeX0I_A8DXt9Msmg"), "pending"),
domain="www.example.com", account=account),
achallenges.DVSNI(
challb=acme_util.chall_to_challb(
challenges.DVSNI(
r="\xba\xa9\xda?<m\xaewmx\xea\xad\xadv\xf4\x02\xc9y\x80"
"\xe2_X\t\xe7\xc7\xa4\t\xca\xf7&\x945",
nonce="Y\xed\x01L\xac\x95\xf7pW\xb1\xd7"
"\xa1\xb2\xc5\x96\xba"
token="\xba\xa9\xda?<m\xaewmx\xea\xad\xadv\xf4\x02\xc9y"
"\x80\xe2_X\t\xe7\xc7\xa4\t\xca\xf7&\x945"
), "pending"),
domain="blah", key=common_test.DvsniTest.auth_key),
domain="blah", account=account),
achallenges.DVSNI(
challb=acme_util.chall_to_challb(
challenges.DVSNI(
r="\x8c\x8a\xbf_-f\\cw\xee\xd6\xf8/\xa5\xe3\xfd\xeb9"
"\xf1\xf5\xb9\xefVM\xc9w\xa4u\x9c\xe1\x87\xb4",
nonce="7\xbc^\xb7]>\x00\xa1\x9bOcU\x84^Z\x18"
token="\x8c\x8a\xbf_-f\\cw\xee\xd6\xf8/\xa5\xe3\xfd"
"\xeb9\xf1\xf5\xb9\xefVM\xc9w\xa4u\x9c\xe1\x87\xb4"
), "pending"),
domain="www.example.org", key=common_test.DvsniTest.auth_key)
domain="www.example.org", account=account),
]
@ -76,8 +71,8 @@ class DvsniPerformTest(util.NginxTest):
@mock.patch("letsencrypt_nginx.configurator.NginxConfigurator.save")
def test_perform1(self, mock_save):
self.sni.add_chall(self.achalls[0])
mock_setup_cert = mock.MagicMock(
return_value=challenges.DVSNIResponse(s="nginxS1"))
response = self.achalls[0].gen_response(self.account.key)
mock_setup_cert = mock.MagicMock(return_value=response)
# pylint: disable=protected-access
self.sni._setup_challenge_cert = mock_setup_cert
@ -85,7 +80,7 @@ class DvsniPerformTest(util.NginxTest):
responses = self.sni.perform()
mock_setup_cert.assert_called_once_with(self.achalls[0])
self.assertEqual([challenges.DVSNIResponse(s="nginxS1")], responses)
self.assertEqual([response], responses)
self.assertEqual(mock_save.call_count, 2)
# Make sure challenge config is included in main config
@ -94,17 +89,16 @@ class DvsniPerformTest(util.NginxTest):
self.assertTrue(['include', self.sni.challenge_conf] in http[1])
def test_perform2(self):
acme_responses = []
for achall in self.achalls:
self.sni.add_chall(achall)
acme_responses.append(achall.gen_response(self.account.key))
mock_setup_cert = mock.MagicMock(side_effect=[
challenges.DVSNIResponse(s="nginxS0"),
challenges.DVSNIResponse(s="nginxS1"),
challenges.DVSNIResponse(s="nginxS2")])
mock_setup_cert = mock.MagicMock(side_effect=acme_responses)
# pylint: disable=protected-access
self.sni._setup_challenge_cert = mock_setup_cert
responses = self.sni.perform()
sni_responses = self.sni.perform()
self.assertEqual(mock_setup_cert.call_count, 3)
@ -117,9 +111,9 @@ class DvsniPerformTest(util.NginxTest):
self.assertTrue(['include', self.sni.challenge_conf] in http[1])
self.assertTrue(['server_name', 'blah'] in http[1][-2][1])
self.assertEqual(len(responses), 3)
self.assertEqual(len(sni_responses), 3)
for i in xrange(3):
self.assertEqual(responses[i].s, "nginxS%d" % i)
self.assertEqual(sni_responses[i], acme_responses[i])
def test_mod_config(self):
self.sni.add_chall(self.achalls[0])
@ -144,12 +138,11 @@ class DvsniPerformTest(util.NginxTest):
for vhost in vhs:
if vhost.addrs == set(v_addr1):
self.assertEqual(
vhost.names, set([self.achalls[0].nonce_domain]))
response = self.achalls[0].gen_response(self.account.key)
else:
response = self.achalls[2].gen_response(self.account.key)
self.assertEqual(vhost.addrs, set(v_addr2))
self.assertEqual(
vhost.names, set([self.achalls[2].nonce_domain]))
self.assertEqual(vhost.names, set([response.z_domain]))
self.assertEqual(len(vhs), 2)

View file

@ -5,6 +5,10 @@ import unittest
import mock
from acme import jose
from letsencrypt.tests import test_util
from letsencrypt.plugins import common
from letsencrypt_nginx import constants
@ -25,10 +29,8 @@ class NginxTest(unittest.TestCase): # pylint: disable=too-few-public-methods
self.config_path = os.path.join(self.temp_dir, "etc_nginx")
self.rsa256_file = pkg_resources.resource_filename(
"letsencrypt.tests", os.path.join("testdata", "rsa256_key.pem"))
self.rsa256_pem = pkg_resources.resource_string(
"letsencrypt.tests", os.path.join("testdata", "rsa256_key.pem"))
self.rsa512jwk = jose.JWKRSA.load(test_util.load_vector(
"rsa512_key.pem"))
def get_data_filename(filename):

View file

@ -18,7 +18,6 @@ Note, that all annotated challenges act as a proxy objects::
"""
import logging
import os
import OpenSSL
@ -93,9 +92,21 @@ class SimpleHTTP(AnnotatedChallenge):
acme_type = challenges.SimpleHTTP
def gen_response_and_validation(self, tls):
"""Generates a SimpleHTTP response and validation.
:param bool tls: True if TLS should be used
:returns: ``(response, validation)`` tuple, where ``response`` is
an instance of `acme.challenges.SimpleHTTPResponse` and
``validation`` is an instance of
`acme.challenges.SimpleHTTPProvisionedResource`.
:rtype: tuple
"""
response = challenges.SimpleHTTPResponse(tls=tls)
validation = response.gen_validation(self.chall, self.account.key)
validation = response.gen_validation(
self.challb.chall, self.account.key)
logger.debug("Simple HTTP validation payload: %s", validation.payload)
return response, validation

View file

@ -1,16 +1,15 @@
"""Tests for letsencrypt.plugins.common."""
import os
import pkg_resources
import unittest
import mock
from acme import challenges
from acme import jose
from letsencrypt import achallenges
from letsencrypt import le_util
from letsencrypt.tests import acme_util
from letsencrypt.tests import test_util
class NamespaceFunctionsTest(unittest.TestCase):
@ -111,12 +110,7 @@ class AddrTest(unittest.TestCase):
class DvsniTest(unittest.TestCase):
"""Tests for letsencrypt.plugins.common.DvsniTest."""
rsa256_file = pkg_resources.resource_filename(
"letsencrypt.tests", os.path.join("testdata", "rsa256_key.pem"))
rsa256_pem = pkg_resources.resource_string(
"letsencrypt.tests", os.path.join("testdata", "rsa256_key.pem"))
auth_key = le_util.Key(rsa256_file, rsa256_pem)
auth_key = jose.JWKRSA.load(test_util.load_vector("rsa512_key.pem"))
achalls = [
achallenges.DVSNI(
challb=acme_util.chall_to_challb(

View file

@ -13,7 +13,6 @@ import zope.component
import zope.interface
from acme import challenges
from acme import jose
from letsencrypt import errors
from letsencrypt import interfaces
@ -158,8 +157,8 @@ binary for temporary key/certificate generation.""".replace("\n", "")
else:
self._notify_and_wait(self.MESSAGE_TEMPLATE.format(
achall=achall, response=response,
uri=response.uri(achall.domain), ct=response.CONTENT_TYPE,
command=command))
uri=response.uri(achall.domain, achall.challb.chall),
ct=response.CONTENT_TYPE, command=command))
if response.simple_verify(
achall.chall, achall.domain,

View file

@ -5,11 +5,16 @@ import unittest
import mock
from acme import challenges
from acme import jose
from letsencrypt import achallenges
from letsencrypt import errors
from letsencrypt.tests import acme_util
from letsencrypt.tests import test_util
KEY = jose.JWKRSA.load(test_util.load_vector("rsa512_key.pem"))
class ManualAuthenticatorTest(unittest.TestCase):
@ -21,8 +26,9 @@ class ManualAuthenticatorTest(unittest.TestCase):
no_simple_http_tls=True, simple_http_port=4430,
manual_test_mode=False)
self.auth = ManualAuthenticator(config=self.config, name="manual")
account = mock.MagicMock(key=KEY)
self.achalls = [achallenges.SimpleHTTP(
challb=acme_util.SIMPLE_HTTP, domain="foo.com", key=None)]
challb=acme_util.SIMPLE_HTTP_P, domain="foo.com", account=account)]
config_test_mode = mock.MagicMock(
no_simple_http_tls=True, simple_http_port=4430,
@ -49,14 +55,14 @@ class ManualAuthenticatorTest(unittest.TestCase):
mock_urandom.return_value = "foo"
mock_verify.return_value = True
resp = challenges.SimpleHTTPResponse(tls=False, path='Zm9v')
resp = challenges.SimpleHTTPResponse(tls=False)
self.assertEqual([resp], self.auth.perform(self.achalls))
self.assertEqual(1, mock_raw_input.call_count)
mock_verify.assert_called_with(self.achalls[0].challb, "foo.com", 4430)
mock_verify.assert_called_with(
self.achalls[0].challb.chall, "foo.com", KEY.public_key(), 4430)
message = mock_stdout.write.mock_calls[0][1][0]
self.assertTrue(self.achalls[0].token in message)
self.assertTrue('Zm9v' in message)
mock_verify.return_value = False
self.assertEqual([None], self.auth.perform(self.achalls))

View file

@ -436,7 +436,7 @@ class ReportFailedChallsTest(unittest.TestCase):
self.simple_http = achallenges.SimpleHTTP(
challb=messages.ChallengeBody(**kwargs),# pylint: disable=star-args
domain="example.com",
key=acme_util.KEY)
account=mock.Mock(key=acme_util.KEY))
kwargs["chall"] = acme_util.DVSNI
self.dvsni_same = achallenges.DVSNI(