mirror of
https://github.com/certbot/certbot.git
synced 2026-06-04 06:15:36 -04:00
Merge branch 'warnings-are-errors' into test-everything-warnings-2
This commit is contained in:
commit
7ef96cc528
67 changed files with 1180 additions and 242 deletions
|
|
@ -10,12 +10,16 @@ Certbot adheres to [Semantic Versioning](http://semver.org/).
|
|||
|
||||
### Changed
|
||||
|
||||
*
|
||||
* Removed documentation mentions of `#letsencrypt` IRC on Freenode.
|
||||
* Write README to the base of (config-dir)/live directory
|
||||
* `--manual` will explicitly warn users that earlier challenges should remain in place when setting up subsequent challenges.
|
||||
|
||||
### Fixed
|
||||
|
||||
* Match Nginx parser update in allowing variable names to start with `${`.
|
||||
* Fix ranking of vhosts in Nginx so that all port-matching vhosts come first
|
||||
* Correct OVH integration tests on machines without internet access.
|
||||
* Stop caching the results of ipv6_info in http01.py
|
||||
|
||||
## 0.27.1 - 2018-09-06
|
||||
|
||||
|
|
|
|||
|
|
@ -6,7 +6,7 @@ Anyone who has gone through the trouble of setting up a secure website knows wha
|
|||
|
||||
How you use Certbot depends on the configuration of your web server. The best way to get started is to use our `interactive guide <https://certbot.eff.org>`_. It generates instructions based on your configuration settings. In most cases, you’ll need `root or administrator access <https://certbot.eff.org/faq/#does-certbot-require-root-administrator-privileges>`_ to your web server to run Certbot.
|
||||
|
||||
If you’re using a hosted service and don’t have direct access to your web server, you might not be able to use Certbot. Check with your hosting provider for documentation about uploading certificates or using certificates issued by Let’s Encrypt.
|
||||
Certbot is meant to be run directly on your web server, not on your personal computer. If you’re using a hosted service and don’t have direct access to your web server, you might not be able to use Certbot. Check with your hosting provider for documentation about uploading certificates or using certificates issued by Let’s Encrypt.
|
||||
|
||||
Certbot is a fully-featured, extensible client for the Let's
|
||||
Encrypt CA (or any other CA that speaks the `ACME
|
||||
|
|
@ -91,8 +91,6 @@ Main Website: https://certbot.eff.org
|
|||
|
||||
Let's Encrypt Website: https://letsencrypt.org
|
||||
|
||||
IRC Channel: #letsencrypt on `Freenode`_
|
||||
|
||||
Community: https://community.letsencrypt.org
|
||||
|
||||
ACME spec: http://ietf-wg-acme.github.io/acme/
|
||||
|
|
@ -101,8 +99,6 @@ ACME working area in github: https://github.com/ietf-wg-acme/acme
|
|||
|
||||
|build-status| |coverage| |docs| |container|
|
||||
|
||||
.. _Freenode: https://webchat.freenode.net?channels=%23letsencrypt
|
||||
|
||||
.. |build-status| image:: https://travis-ci.org/certbot/certbot.svg?branch=master
|
||||
:target: https://travis-ci.org/certbot/certbot
|
||||
:alt: Travis CI status
|
||||
|
|
|
|||
|
|
@ -665,7 +665,7 @@ class ClientTest(ClientTestBase):
|
|||
def test_revocation_payload(self):
|
||||
obj = messages.Revocation(certificate=self.certr.body, reason=self.rsn)
|
||||
self.assertTrue('reason' in obj.to_partial_json().keys())
|
||||
self.assertEquals(self.rsn, obj.to_partial_json()['reason'])
|
||||
self.assertEqual(self.rsn, obj.to_partial_json()['reason'])
|
||||
|
||||
def test_revoke_bad_status_raises_error(self):
|
||||
self.response.status_code = http_client.METHOD_NOT_ALLOWED
|
||||
|
|
@ -1038,8 +1038,8 @@ class ClientNetworkTest(unittest.TestCase):
|
|||
|
||||
# Requests Library Exceptions
|
||||
except requests.exceptions.ConnectionError as z: #pragma: no cover
|
||||
self.assertEqual("('Connection aborted.', "
|
||||
"error(111, 'Connection refused'))", str(z))
|
||||
self.assertTrue("('Connection aborted.', error(111, 'Connection refused'))"
|
||||
== str(z) or "[WinError 10061]" in str(z))
|
||||
|
||||
class ClientNetworkWithMockedResponseTest(unittest.TestCase):
|
||||
"""Tests for acme.client.ClientNetwork which mock out response."""
|
||||
|
|
|
|||
|
|
@ -136,22 +136,16 @@ def probe_sni(name, host, port=443, timeout=300,
|
|||
|
||||
socket_kwargs = {'source_address': source_address}
|
||||
|
||||
host_protocol_agnostic = host
|
||||
if host == '::' or host == '0':
|
||||
# https://github.com/python/typeshed/pull/2136
|
||||
# while PR is not merged, we need to ignore
|
||||
host_protocol_agnostic = None
|
||||
|
||||
try:
|
||||
# pylint: disable=star-args
|
||||
logger.debug(
|
||||
"Attempting to connect to %s:%d%s.", host_protocol_agnostic, port,
|
||||
"Attempting to connect to %s:%d%s.", host, port,
|
||||
" from {0}:{1}".format(
|
||||
source_address[0],
|
||||
source_address[1]
|
||||
) if socket_kwargs else ""
|
||||
)
|
||||
socket_tuple = (host_protocol_agnostic, port) # type: Tuple[Optional[str], int]
|
||||
socket_tuple = (host, port) # type: Tuple[str, int]
|
||||
sock = socket.create_connection(socket_tuple, **socket_kwargs) # type: ignore
|
||||
except socket.error as error:
|
||||
raise errors.Error(error)
|
||||
|
|
|
|||
|
|
@ -209,8 +209,8 @@ class MakeCSRTest(unittest.TestCase):
|
|||
# have a get_extensions() method, so we skip this test if the method
|
||||
# isn't available.
|
||||
if hasattr(csr, 'get_extensions'):
|
||||
self.assertEquals(len(csr.get_extensions()), 1)
|
||||
self.assertEquals(csr.get_extensions()[0].get_data(),
|
||||
self.assertEqual(len(csr.get_extensions()), 1)
|
||||
self.assertEqual(csr.get_extensions()[0].get_data(),
|
||||
OpenSSL.crypto.X509Extension(
|
||||
b'subjectAltName',
|
||||
critical=False,
|
||||
|
|
@ -227,7 +227,7 @@ class MakeCSRTest(unittest.TestCase):
|
|||
# have a get_extensions() method, so we skip this test if the method
|
||||
# isn't available.
|
||||
if hasattr(csr, 'get_extensions'):
|
||||
self.assertEquals(len(csr.get_extensions()), 2)
|
||||
self.assertEqual(len(csr.get_extensions()), 2)
|
||||
# NOTE: Ideally we would filter by the TLS Feature OID, but
|
||||
# OpenSSL.crypto.X509Extension doesn't give us the extension's raw OID,
|
||||
# and the shortname field is just "UNDEF"
|
||||
|
|
|
|||
|
|
@ -523,7 +523,7 @@ class Order(ResourceBody):
|
|||
"""
|
||||
identifiers = jose.Field('identifiers', omitempty=True)
|
||||
status = jose.Field('status', decoder=Status.from_json,
|
||||
omitempty=True, default=STATUS_PENDING)
|
||||
omitempty=True)
|
||||
authorizations = jose.Field('authorizations', omitempty=True)
|
||||
certificate = jose.Field('certificate', omitempty=True)
|
||||
finalize = jose.Field('finalize', omitempty=True)
|
||||
|
|
@ -553,4 +553,3 @@ class OrderResource(ResourceWithURI):
|
|||
class NewOrder(Order):
|
||||
"""New order."""
|
||||
resource_type = 'new-order'
|
||||
resource = fields.Resource(resource_type)
|
||||
|
|
|
|||
|
|
@ -424,6 +424,19 @@ class OrderResourceTest(unittest.TestCase):
|
|||
'authorizations': None,
|
||||
})
|
||||
|
||||
class NewOrderTest(unittest.TestCase):
|
||||
"""Tests for acme.messages.NewOrder."""
|
||||
|
||||
def setUp(self):
|
||||
from acme.messages import NewOrder
|
||||
self.reg = NewOrder(
|
||||
identifiers=mock.sentinel.identifiers)
|
||||
|
||||
def test_to_partial_json(self):
|
||||
self.assertEqual(self.reg.to_json(), {
|
||||
'identifiers': mock.sentinel.identifiers,
|
||||
})
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main() # pragma: no cover
|
||||
|
|
|
|||
|
|
@ -48,7 +48,7 @@ class TLSSNI01ServerTest(unittest.TestCase):
|
|||
test_util.load_cert('rsa2048_cert.pem'),
|
||||
)}
|
||||
from acme.standalone import TLSSNI01Server
|
||||
self.server = TLSSNI01Server(("", 0), certs=self.certs)
|
||||
self.server = TLSSNI01Server(('localhost', 0), certs=self.certs)
|
||||
# pylint: disable=no-member
|
||||
self.thread = threading.Thread(target=self.server.serve_forever)
|
||||
self.thread.start()
|
||||
|
|
@ -133,8 +133,11 @@ class BaseDualNetworkedServersTest(unittest.TestCase):
|
|||
self.address_family = socket.AF_INET
|
||||
socketserver.TCPServer.__init__(self, *args, **kwargs)
|
||||
if ipv6:
|
||||
# NB: On Windows, socket.IPPROTO_IPV6 constant may be missing.
|
||||
# We use the corresponding value (41) instead.
|
||||
level = getattr(socket, "IPPROTO_IPV6", 41)
|
||||
# pylint: disable=no-member
|
||||
self.socket.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1)
|
||||
self.socket.setsockopt(level, socket.IPV6_V6ONLY, 1)
|
||||
try:
|
||||
self.server_bind()
|
||||
self.server_activate()
|
||||
|
|
@ -147,15 +150,15 @@ class BaseDualNetworkedServersTest(unittest.TestCase):
|
|||
mock_bind.side_effect = socket.error
|
||||
from acme.standalone import BaseDualNetworkedServers
|
||||
self.assertRaises(socket.error, BaseDualNetworkedServers,
|
||||
BaseDualNetworkedServersTest.SingleProtocolServer,
|
||||
("", 0),
|
||||
socketserver.BaseRequestHandler)
|
||||
BaseDualNetworkedServersTest.SingleProtocolServer,
|
||||
('', 0),
|
||||
socketserver.BaseRequestHandler)
|
||||
|
||||
def test_ports_equal(self):
|
||||
from acme.standalone import BaseDualNetworkedServers
|
||||
servers = BaseDualNetworkedServers(
|
||||
BaseDualNetworkedServersTest.SingleProtocolServer,
|
||||
("", 0),
|
||||
('', 0),
|
||||
socketserver.BaseRequestHandler)
|
||||
socknames = servers.getsocknames()
|
||||
prev_port = None
|
||||
|
|
@ -177,7 +180,7 @@ class TLSSNI01DualNetworkedServersTest(unittest.TestCase):
|
|||
test_util.load_cert('rsa2048_cert.pem'),
|
||||
)}
|
||||
from acme.standalone import TLSSNI01DualNetworkedServers
|
||||
self.servers = TLSSNI01DualNetworkedServers(("", 0), certs=self.certs)
|
||||
self.servers = TLSSNI01DualNetworkedServers(('localhost', 0), certs=self.certs)
|
||||
self.servers.serve_forever()
|
||||
|
||||
def tearDown(self):
|
||||
|
|
@ -245,6 +248,7 @@ class HTTP01DualNetworkedServersTest(unittest.TestCase):
|
|||
self.assertFalse(self._test_http01(add=False))
|
||||
|
||||
|
||||
@test_util.broken_on_windows
|
||||
class TestSimpleTLSSNI01Server(unittest.TestCase):
|
||||
"""Tests for acme.standalone.simple_tls_sni_01_server."""
|
||||
|
||||
|
|
|
|||
|
|
@ -4,6 +4,7 @@
|
|||
|
||||
"""
|
||||
import os
|
||||
import sys
|
||||
import pkg_resources
|
||||
import unittest
|
||||
|
||||
|
|
@ -94,3 +95,11 @@ def skip_unless(condition, reason): # pragma: no cover
|
|||
return lambda cls: cls
|
||||
else:
|
||||
return lambda cls: None
|
||||
|
||||
def broken_on_windows(function):
|
||||
"""Decorator to skip temporarily a broken test on Windows."""
|
||||
reason = 'Test is broken and ignored on windows but should be fixed.'
|
||||
return unittest.skipIf(
|
||||
sys.platform == 'win32'
|
||||
and os.environ.get('SKIP_BROKEN_TESTS_ON_WINDOWS', 'true') == 'true',
|
||||
reason)(function)
|
||||
|
|
|
|||
25
appveyor.yml
25
appveyor.yml
|
|
@ -1,12 +1,29 @@
|
|||
# AppVeyor CI pipeline, executed on Windows Server 2016/2012 R2
|
||||
image:
|
||||
# => Windows Server 2012 R2
|
||||
- Visual Studio 2015
|
||||
# => Windows Server 2016
|
||||
- Visual Studio 2017
|
||||
|
||||
branches:
|
||||
only:
|
||||
- master
|
||||
- /^\d+\.\d+\.x$/ # version branches like X.X.X
|
||||
- /^\d+\.\d+\.x$/ # Version branches like X.X.X
|
||||
- /^test-.*$/
|
||||
|
||||
install:
|
||||
# Use Python 3.7 by default
|
||||
- "SET PATH=C:\\Python37;C:\\Python37\\Scripts;%PATH%"
|
||||
# Check env
|
||||
- "python --version"
|
||||
# Upgrade pip to avoid warnings
|
||||
- "python -m pip install --upgrade pip"
|
||||
# Ready to install tox and coverage
|
||||
- "pip install tox codecov"
|
||||
|
||||
build: off
|
||||
|
||||
test_script:
|
||||
- ps: Write-Host "Hello, world!"
|
||||
|
||||
- tox -c tox-win.ini -e py34,py35,py36,py37-cover
|
||||
|
||||
on_success:
|
||||
- codecov
|
||||
|
|
|
|||
|
|
@ -64,12 +64,12 @@ class AutoHSTSTest(util.ApacheTest):
|
|||
|
||||
self.config.enable_autohsts(mock.MagicMock(), ["ocspvhost.com"])
|
||||
# Verify initial value
|
||||
self.assertEquals(self.get_autohsts_value(self.vh_truth[7].path),
|
||||
self.assertEqual(self.get_autohsts_value(self.vh_truth[7].path),
|
||||
initial_val)
|
||||
# Increase
|
||||
self.config.update_autohsts(mock.MagicMock())
|
||||
# Verify increased value
|
||||
self.assertEquals(self.get_autohsts_value(self.vh_truth[7].path),
|
||||
self.assertEqual(self.get_autohsts_value(self.vh_truth[7].path),
|
||||
inc_val)
|
||||
self.assertTrue(mock_prepare.called)
|
||||
|
||||
|
|
@ -80,7 +80,7 @@ class AutoHSTSTest(util.ApacheTest):
|
|||
initial_val = maxage.format(constants.AUTOHSTS_STEPS[0])
|
||||
self.config.enable_autohsts(mock.MagicMock(), ["ocspvhost.com"])
|
||||
# Verify initial value
|
||||
self.assertEquals(self.get_autohsts_value(self.vh_truth[7].path),
|
||||
self.assertEqual(self.get_autohsts_value(self.vh_truth[7].path),
|
||||
initial_val)
|
||||
|
||||
self.config.update_autohsts(mock.MagicMock())
|
||||
|
|
@ -117,14 +117,14 @@ class AutoHSTSTest(util.ApacheTest):
|
|||
self.config.update_autohsts(mock.MagicMock())
|
||||
# Value should match pre-permanent increment step
|
||||
cur_val = maxage.format(constants.AUTOHSTS_STEPS[i+1])
|
||||
self.assertEquals(self.get_autohsts_value(self.vh_truth[7].path),
|
||||
self.assertEqual(self.get_autohsts_value(self.vh_truth[7].path),
|
||||
cur_val)
|
||||
# Ensure that the value is raised to max
|
||||
self.assertEquals(self.get_autohsts_value(self.vh_truth[7].path),
|
||||
maxage.format(constants.AUTOHSTS_STEPS[-1]))
|
||||
# Make permanent
|
||||
self.config.deploy_autohsts(mock_lineage)
|
||||
self.assertEquals(self.get_autohsts_value(self.vh_truth[7].path),
|
||||
self.assertEqual(self.get_autohsts_value(self.vh_truth[7].path),
|
||||
max_val)
|
||||
|
||||
def test_autohsts_update_noop(self):
|
||||
|
|
@ -156,7 +156,7 @@ class AutoHSTSTest(util.ApacheTest):
|
|||
mock_id.return_value = "1234567"
|
||||
self.config.enable_autohsts(mock.MagicMock(),
|
||||
["ocspvhost.com", "ocspvhost.com"])
|
||||
self.assertEquals(mock_id.call_count, 1)
|
||||
self.assertEqual(mock_id.call_count, 1)
|
||||
|
||||
def test_autohsts_remove_orphaned(self):
|
||||
# pylint: disable=protected-access
|
||||
|
|
|
|||
|
|
@ -81,9 +81,9 @@ class MultipleVhostsTestCentOS(util.ApacheTest):
|
|||
mock_osi.return_value = ("centos", "7")
|
||||
self.config.parser.update_runtime_variables()
|
||||
|
||||
self.assertEquals(mock_get.call_count, 3)
|
||||
self.assertEquals(len(self.config.parser.modules), 4)
|
||||
self.assertEquals(len(self.config.parser.variables), 2)
|
||||
self.assertEqual(mock_get.call_count, 3)
|
||||
self.assertEqual(len(self.config.parser.modules), 4)
|
||||
self.assertEqual(len(self.config.parser.variables), 2)
|
||||
self.assertTrue("TEST2" in self.config.parser.variables.keys())
|
||||
self.assertTrue("mod_another.c" in self.config.parser.modules)
|
||||
|
||||
|
|
@ -127,7 +127,7 @@ class MultipleVhostsTestCentOS(util.ApacheTest):
|
|||
def test_alt_restart_works(self, mock_run_script):
|
||||
mock_run_script.side_effect = [None, errors.SubprocessError, None]
|
||||
self.config.restart()
|
||||
self.assertEquals(mock_run_script.call_count, 3)
|
||||
self.assertEqual(mock_run_script.call_count, 3)
|
||||
|
||||
@mock.patch("certbot_apache.configurator.util.run_script")
|
||||
def test_alt_restart_errors(self, mock_run_script):
|
||||
|
|
|
|||
|
|
@ -1402,11 +1402,11 @@ class MultipleVhostsTest(util.ApacheTest):
|
|||
vhs = self.config._choose_vhosts_wildcard("*.certbot.demo",
|
||||
create_ssl=True)
|
||||
# Check that the dialog was called with one vh: certbot.demo
|
||||
self.assertEquals(mock_select_vhs.call_args[0][0][0], self.vh_truth[3])
|
||||
self.assertEquals(len(mock_select_vhs.call_args_list), 1)
|
||||
self.assertEqual(mock_select_vhs.call_args[0][0][0], self.vh_truth[3])
|
||||
self.assertEqual(len(mock_select_vhs.call_args_list), 1)
|
||||
|
||||
# And the actual returned values
|
||||
self.assertEquals(len(vhs), 1)
|
||||
self.assertEqual(len(vhs), 1)
|
||||
self.assertTrue(vhs[0].name == "certbot.demo")
|
||||
self.assertTrue(vhs[0].ssl)
|
||||
|
||||
|
|
@ -1421,7 +1421,7 @@ class MultipleVhostsTest(util.ApacheTest):
|
|||
vhs = self.config._choose_vhosts_wildcard("*.certbot.demo",
|
||||
create_ssl=False)
|
||||
self.assertFalse(mock_makessl.called)
|
||||
self.assertEquals(vhs[0], self.vh_truth[1])
|
||||
self.assertEqual(vhs[0], self.vh_truth[1])
|
||||
|
||||
@mock.patch("certbot_apache.configurator.ApacheConfigurator._vhosts_for_wildcard")
|
||||
@mock.patch("certbot_apache.configurator.ApacheConfigurator.make_vhost_ssl")
|
||||
|
|
@ -1434,15 +1434,15 @@ class MultipleVhostsTest(util.ApacheTest):
|
|||
mock_select_vhs.return_value = [self.vh_truth[7]]
|
||||
vhs = self.config._choose_vhosts_wildcard("whatever",
|
||||
create_ssl=True)
|
||||
self.assertEquals(mock_select_vhs.call_args[0][0][0], self.vh_truth[7])
|
||||
self.assertEquals(len(mock_select_vhs.call_args_list), 1)
|
||||
self.assertEqual(mock_select_vhs.call_args[0][0][0], self.vh_truth[7])
|
||||
self.assertEqual(len(mock_select_vhs.call_args_list), 1)
|
||||
# Ensure that make_vhost_ssl was not called, vhost.ssl == true
|
||||
self.assertFalse(mock_makessl.called)
|
||||
|
||||
# And the actual returned values
|
||||
self.assertEquals(len(vhs), 1)
|
||||
self.assertEqual(len(vhs), 1)
|
||||
self.assertTrue(vhs[0].ssl)
|
||||
self.assertEquals(vhs[0], self.vh_truth[7])
|
||||
self.assertEqual(vhs[0], self.vh_truth[7])
|
||||
|
||||
|
||||
def test_deploy_cert_wildcard(self):
|
||||
|
|
@ -1455,7 +1455,7 @@ class MultipleVhostsTest(util.ApacheTest):
|
|||
self.config.deploy_cert("*.wildcard.example.org", "/tmp/path",
|
||||
"/tmp/path", "/tmp/path", "/tmp/path")
|
||||
self.assertTrue(mock_dep.called)
|
||||
self.assertEquals(len(mock_dep.call_args_list), 1)
|
||||
self.assertEqual(len(mock_dep.call_args_list), 1)
|
||||
self.assertEqual(self.vh_truth[7], mock_dep.call_args_list[0][0][0])
|
||||
|
||||
@mock.patch("certbot_apache.display_ops.select_vhost_multiple")
|
||||
|
|
|
|||
|
|
@ -121,15 +121,15 @@ class MultipleVhostsTestGentoo(util.ApacheTest):
|
|||
mock_osi.return_value = ("gentoo", "123")
|
||||
self.config.parser.update_runtime_variables()
|
||||
|
||||
self.assertEquals(mock_get.call_count, 1)
|
||||
self.assertEquals(len(self.config.parser.modules), 4)
|
||||
self.assertEqual(mock_get.call_count, 1)
|
||||
self.assertEqual(len(self.config.parser.modules), 4)
|
||||
self.assertTrue("mod_another.c" in self.config.parser.modules)
|
||||
|
||||
@mock.patch("certbot_apache.configurator.util.run_script")
|
||||
def test_alt_restart_works(self, mock_run_script):
|
||||
mock_run_script.side_effect = [None, errors.SubprocessError, None]
|
||||
self.config.restart()
|
||||
self.assertEquals(mock_run_script.call_count, 3)
|
||||
self.assertEqual(mock_run_script.call_count, 3)
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main() # pragma: no cover
|
||||
|
|
|
|||
|
|
@ -84,7 +84,7 @@ class BasicParserTest(util.ParserTest):
|
|||
self.assertEqual(self.parser.aug.get(match), str(i + 1))
|
||||
|
||||
def test_empty_arg(self):
|
||||
self.assertEquals(None,
|
||||
self.assertEqual(None,
|
||||
self.parser.get_arg("/files/whatever/nonexistent"))
|
||||
|
||||
def test_add_dir_to_ifmodssl(self):
|
||||
|
|
@ -303,7 +303,7 @@ class BasicParserTest(util.ParserTest):
|
|||
from certbot_apache.parser import get_aug_path
|
||||
self.parser.add_comment(get_aug_path(self.parser.loc["name"]), "123456")
|
||||
comm = self.parser.find_comments("123456")
|
||||
self.assertEquals(len(comm), 1)
|
||||
self.assertEqual(len(comm), 1)
|
||||
self.assertTrue(self.parser.loc["name"] in comm[0])
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -276,9 +276,9 @@ class GoogleClientTest(unittest.TestCase):
|
|||
[{'managedZones': [{'id': self.zone}]}])
|
||||
# Record name mocked in setUp
|
||||
found = client.get_existing_txt_rrset(self.zone, "_acme-challenge.example.org")
|
||||
self.assertEquals(found, ["\"example-txt-contents\""])
|
||||
self.assertEqual(found, ["\"example-txt-contents\""])
|
||||
not_found = client.get_existing_txt_rrset(self.zone, "nonexistent.tld")
|
||||
self.assertEquals(not_found, None)
|
||||
self.assertEqual(not_found, None)
|
||||
|
||||
@mock.patch('oauth2client.service_account.ServiceAccountCredentials.from_json_keyfile_name')
|
||||
@mock.patch('certbot_dns_google.dns_google.open',
|
||||
|
|
|
|||
|
|
@ -14,7 +14,11 @@ Named Arguments
|
|||
DNS to propagate before asking the
|
||||
ACME server to verify the DNS
|
||||
record.
|
||||
(Default: 960)
|
||||
(Default: 1200 because Linode
|
||||
updates its first DNS every 15
|
||||
minutes and we allow 5 more minutes
|
||||
for the update to reach the other 5
|
||||
servers)
|
||||
========================================== ===================================
|
||||
|
||||
|
||||
|
|
@ -74,13 +78,15 @@ Examples
|
|||
-d www.example.com
|
||||
|
||||
.. code-block:: bash
|
||||
:caption: To acquire a certificate for ``example.com``, waiting 60 seconds
|
||||
for DNS propagation
|
||||
:caption: To acquire a certificate for ``example.com``, waiting 1000 seconds
|
||||
for DNS propagation (Linode updates its first DNS every 15 minutes
|
||||
and we allow some extra time for the update to reach the other 5
|
||||
servers)
|
||||
|
||||
certbot certonly \\
|
||||
--dns-linode \\
|
||||
--dns-linode-credentials ~/.secrets/certbot/linode.ini \\
|
||||
--dns-linode-propagation-seconds 60 \\
|
||||
--dns-linode-propagation-seconds 1000 \\
|
||||
-d example.com
|
||||
|
||||
"""
|
||||
|
|
|
|||
|
|
@ -29,7 +29,7 @@ class Authenticator(dns_common.DNSAuthenticator):
|
|||
|
||||
@classmethod
|
||||
def add_parser_arguments(cls, add): # pylint: disable=arguments-differ
|
||||
super(Authenticator, cls).add_parser_arguments(add, default_propagation_seconds=960)
|
||||
super(Authenticator, cls).add_parser_arguments(add, default_propagation_seconds=1200)
|
||||
add('credentials', help='Linode credentials INI file.')
|
||||
|
||||
def more_info(self): # pylint: disable=missing-docstring,no-self-use
|
||||
|
|
|
|||
|
|
@ -8,7 +8,6 @@ import tempfile
|
|||
import time
|
||||
|
||||
import OpenSSL
|
||||
import six
|
||||
import zope.interface
|
||||
|
||||
from acme import challenges
|
||||
|
|
@ -32,6 +31,12 @@ from certbot_nginx import obj # pylint: disable=unused-import
|
|||
from acme.magic_typing import List, Dict, Set # pylint: disable=unused-import, no-name-in-module
|
||||
|
||||
|
||||
NAME_RANK = 0
|
||||
START_WILDCARD_RANK = 1
|
||||
END_WILDCARD_RANK = 2
|
||||
REGEX_RANK = 3
|
||||
NO_SSL_MODIFIER = 4
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
|
@ -405,7 +410,8 @@ class NginxConfigurator(common.Installer):
|
|||
"""
|
||||
if not matches:
|
||||
return None
|
||||
elif matches[0]['rank'] in six.moves.range(2, 6):
|
||||
elif matches[0]['rank'] in [START_WILDCARD_RANK, END_WILDCARD_RANK,
|
||||
START_WILDCARD_RANK + NO_SSL_MODIFIER, END_WILDCARD_RANK + NO_SSL_MODIFIER]:
|
||||
# Wildcard match - need to find the longest one
|
||||
rank = matches[0]['rank']
|
||||
wildcards = [x for x in matches if x['rank'] == rank]
|
||||
|
|
@ -414,10 +420,9 @@ class NginxConfigurator(common.Installer):
|
|||
# Exact or regex match
|
||||
return matches[0]['vhost']
|
||||
|
||||
|
||||
def _rank_matches_by_name_and_ssl(self, vhost_list, target_name):
|
||||
def _rank_matches_by_name(self, vhost_list, target_name):
|
||||
"""Returns a ranked list of vhosts from vhost_list that match target_name.
|
||||
The ranking gives preference to SSL vhosts.
|
||||
This method should always be followed by a call to _select_best_name_match.
|
||||
|
||||
:param list vhost_list: list of vhosts to filter and rank
|
||||
:param str target_name: The name to match
|
||||
|
|
@ -437,21 +442,37 @@ class NginxConfigurator(common.Installer):
|
|||
if name_type == 'exact':
|
||||
matches.append({'vhost': vhost,
|
||||
'name': name,
|
||||
'rank': 0 if vhost.ssl else 1})
|
||||
'rank': NAME_RANK})
|
||||
elif name_type == 'wildcard_start':
|
||||
matches.append({'vhost': vhost,
|
||||
'name': name,
|
||||
'rank': 2 if vhost.ssl else 3})
|
||||
'rank': START_WILDCARD_RANK})
|
||||
elif name_type == 'wildcard_end':
|
||||
matches.append({'vhost': vhost,
|
||||
'name': name,
|
||||
'rank': 4 if vhost.ssl else 5})
|
||||
'rank': END_WILDCARD_RANK})
|
||||
elif name_type == 'regex':
|
||||
matches.append({'vhost': vhost,
|
||||
'name': name,
|
||||
'rank': 6 if vhost.ssl else 7})
|
||||
'rank': REGEX_RANK})
|
||||
return sorted(matches, key=lambda x: x['rank'])
|
||||
|
||||
def _rank_matches_by_name_and_ssl(self, vhost_list, target_name):
|
||||
"""Returns a ranked list of vhosts from vhost_list that match target_name.
|
||||
The ranking gives preference to SSLishness before name match level.
|
||||
|
||||
:param list vhost_list: list of vhosts to filter and rank
|
||||
:param str target_name: The name to match
|
||||
:returns: list of dicts containing the vhost, the matching name, and
|
||||
the numerical rank
|
||||
:rtype: list
|
||||
|
||||
"""
|
||||
matches = self._rank_matches_by_name(vhost_list, target_name)
|
||||
for match in matches:
|
||||
if not match['vhost'].ssl:
|
||||
match['rank'] += NO_SSL_MODIFIER
|
||||
return sorted(matches, key=lambda x: x['rank'])
|
||||
|
||||
def choose_redirect_vhosts(self, target_name, port, create_if_no_match=False):
|
||||
"""Chooses a single virtual host for redirect enhancement.
|
||||
|
|
@ -531,9 +552,7 @@ class NginxConfigurator(common.Installer):
|
|||
|
||||
matching_vhosts = [vhost for vhost in all_vhosts if _vhost_matches(vhost, port)]
|
||||
|
||||
# We can use this ranking function because sslishness doesn't matter to us, and
|
||||
# there shouldn't be conflicting plaintextish servers listening on 80.
|
||||
return self._rank_matches_by_name_and_ssl(matching_vhosts, target_name)
|
||||
return self._rank_matches_by_name(matching_vhosts, target_name)
|
||||
|
||||
def get_all_names(self):
|
||||
"""Returns all names found in the Nginx Configuration.
|
||||
|
|
@ -568,6 +587,7 @@ class NginxConfigurator(common.Installer):
|
|||
return util.get_filtered_names(all_names)
|
||||
|
||||
def _get_snakeoil_paths(self):
|
||||
"""Generate invalid certs that let us create ssl directives for Nginx"""
|
||||
# TODO: generate only once
|
||||
tmp_dir = os.path.join(self.config.work_dir, "snakeoil")
|
||||
le_key = crypto_util.init_save_key(
|
||||
|
|
|
|||
|
|
@ -40,8 +40,6 @@ class NginxHttp01(common.ChallengePerformer):
|
|||
super(NginxHttp01, self).__init__(configurator)
|
||||
self.challenge_conf = os.path.join(
|
||||
configurator.config.config_dir, "le_http_01_cert_challenge.conf")
|
||||
self._ipv6 = None
|
||||
self._ipv6only = None
|
||||
|
||||
def perform(self):
|
||||
"""Perform a challenge on Nginx.
|
||||
|
|
@ -102,6 +100,7 @@ class NginxHttp01(common.ChallengePerformer):
|
|||
config = [self._make_or_mod_server_block(achall) for achall in self.achalls]
|
||||
config = [x for x in config if x is not None]
|
||||
config = nginxparser.UnspacedList(config)
|
||||
logger.debug("Generated server block:\n%s", str(config))
|
||||
|
||||
self.configurator.reverter.register_file_creation(
|
||||
True, self.challenge_conf)
|
||||
|
|
@ -120,9 +119,7 @@ class NginxHttp01(common.ChallengePerformer):
|
|||
self.configurator.config.http01_port)
|
||||
port = self.configurator.config.http01_port
|
||||
|
||||
if self._ipv6 is None or self._ipv6only is None:
|
||||
self._ipv6, self._ipv6only = self.configurator.ipv6_info(port)
|
||||
ipv6, ipv6only = self._ipv6, self._ipv6only
|
||||
ipv6, ipv6only = self.configurator.ipv6_info(port)
|
||||
|
||||
if ipv6:
|
||||
# If IPv6 is active in Nginx configuration
|
||||
|
|
|
|||
392
certbot-nginx/certbot_nginx/parser_obj.py
Normal file
392
certbot-nginx/certbot_nginx/parser_obj.py
Normal file
|
|
@ -0,0 +1,392 @@
|
|||
""" This file contains parsing routines and object classes to help derive meaning from
|
||||
raw lists of tokens from pyparsing. """
|
||||
|
||||
import abc
|
||||
import logging
|
||||
import six
|
||||
|
||||
from certbot import errors
|
||||
|
||||
from acme.magic_typing import List # pylint: disable=unused-import, no-name-in-module
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
COMMENT = " managed by Certbot"
|
||||
COMMENT_BLOCK = ["#", COMMENT]
|
||||
|
||||
class Parsable(object):
|
||||
""" Abstract base class for "Parsable" objects whose underlying representation
|
||||
is a tree of lists.
|
||||
|
||||
:param .Parsable parent: This object's parsed parent in the tree
|
||||
"""
|
||||
|
||||
__metaclass__ = abc.ABCMeta
|
||||
|
||||
def __init__(self, parent=None):
|
||||
self._data = [] # type: List[object]
|
||||
self._tabs = None
|
||||
self.parent = parent
|
||||
|
||||
@classmethod
|
||||
def parsing_hooks(cls):
|
||||
"""Returns object types that this class should be able to `parse` recusrively.
|
||||
The order of the objects indicates the order in which the parser should
|
||||
try to parse each subitem.
|
||||
:returns: A list of Parsable classes.
|
||||
:rtype list:
|
||||
"""
|
||||
return (Block, Sentence, Statements)
|
||||
|
||||
@staticmethod
|
||||
@abc.abstractmethod
|
||||
def should_parse(lists):
|
||||
""" Returns whether the contents of `lists` can be parsed into this object.
|
||||
|
||||
:returns: Whether `lists` can be parsed as this object.
|
||||
:rtype bool:
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
@abc.abstractmethod
|
||||
def parse(self, raw_list, add_spaces=False):
|
||||
""" Loads information into this object from underlying raw_list structure.
|
||||
Each Parsable object might make different assumptions about the structure of
|
||||
raw_list.
|
||||
|
||||
:param list raw_list: A list or sublist of tokens from pyparsing, containing whitespace
|
||||
as separate tokens.
|
||||
:param bool add_spaces: If set, the method can and should manipulate and insert spacing
|
||||
between non-whitespace tokens and lists to delimit them.
|
||||
:raises .errors.MisconfigurationError: when the assumptions about the structure of
|
||||
raw_list are not met.
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
@abc.abstractmethod
|
||||
def iterate(self, expanded=False, match=None):
|
||||
""" Iterates across this object. If this object is a leaf object, only yields
|
||||
itself. If it contains references other parsing objects, and `expanded` is set,
|
||||
this function should first yield itself, then recursively iterate across all of them.
|
||||
:param bool expanded: Whether to recursively iterate on possible children.
|
||||
:param callable match: If provided, an object is only iterated if this callable
|
||||
returns True when called on that object.
|
||||
|
||||
:returns: Iterator over desired objects.
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
@abc.abstractmethod
|
||||
def get_tabs(self):
|
||||
""" Guess at the tabbing style of this parsed object, based on whitespace.
|
||||
|
||||
If this object is a leaf, it deducts the tabbing based on its own contents.
|
||||
Other objects may guess by calling `get_tabs` recursively on child objects.
|
||||
|
||||
:returns: Guess at tabbing for this object. Should only return whitespace strings
|
||||
that does not contain newlines.
|
||||
:rtype str:
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
@abc.abstractmethod
|
||||
def set_tabs(self, tabs=" "):
|
||||
"""This tries to set and alter the tabbing of the current object to a desired
|
||||
whitespace string. Primarily meant for objects that were constructed, so they
|
||||
can conform to surrounding whitespace.
|
||||
|
||||
:param str tabs: A whitespace string (not containing newlines).
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
def dump(self, include_spaces=False):
|
||||
""" Dumps back to pyparsing-like list tree. The opposite of `parse`.
|
||||
|
||||
Note: if this object has not been modified, `dump` with `include_spaces=True`
|
||||
should always return the original input of `parse`.
|
||||
|
||||
:param bool include_spaces: If set to False, magically hides whitespace tokens from
|
||||
dumped output.
|
||||
|
||||
:returns: Pyparsing-like list tree.
|
||||
:rtype list:
|
||||
"""
|
||||
return [elem.dump(include_spaces) for elem in self._data]
|
||||
|
||||
class Statements(Parsable):
|
||||
""" A group or list of "Statements". A Statement is either a Block or a Sentence.
|
||||
|
||||
The underlying representation is simply a list of these Statement objects, with
|
||||
an extra `_trailing_whitespace` string to keep track of the whitespace that does not
|
||||
precede any more statements.
|
||||
"""
|
||||
def __init__(self, parent=None):
|
||||
super(Statements, self).__init__(parent)
|
||||
self._trailing_whitespace = None
|
||||
|
||||
# ======== Begin overridden functions
|
||||
|
||||
@staticmethod
|
||||
def should_parse(lists):
|
||||
return isinstance(lists, list)
|
||||
|
||||
def set_tabs(self, tabs=" "):
|
||||
""" Sets the tabbing for this set of statements. Does this by calling `set_tabs`
|
||||
on each of the child statements.
|
||||
|
||||
Then, if a parent is present, sets trailing whitespace to parent tabbing. This
|
||||
is so that the trailing } of any Block that contains Statements lines up
|
||||
with parent tabbing.
|
||||
"""
|
||||
for statement in self._data:
|
||||
statement.set_tabs(tabs)
|
||||
if self.parent is not None:
|
||||
self._trailing_whitespace = "\n" + self.parent.get_tabs()
|
||||
|
||||
def parse(self, parse_this, add_spaces=False):
|
||||
""" Parses a list of statements.
|
||||
Expects all elements in `parse_this` to be parseable by `type(self).parsing_hooks`,
|
||||
with an optional whitespace string at the last index of `parse_this`.
|
||||
"""
|
||||
if not isinstance(parse_this, list):
|
||||
raise errors.MisconfigurationError("Statements parsing expects a list!")
|
||||
# If there's a trailing whitespace in the list of statements, keep track of it.
|
||||
if len(parse_this) > 0 and isinstance(parse_this[-1], six.string_types) \
|
||||
and parse_this[-1].isspace():
|
||||
self._trailing_whitespace = parse_this[-1]
|
||||
parse_this = parse_this[:-1]
|
||||
self._data = [parse_raw(elem, self, add_spaces) for elem in parse_this]
|
||||
|
||||
def get_tabs(self):
|
||||
""" Takes a guess at the tabbing of all contained Statements by retrieving the
|
||||
tabbing of the first Statement."""
|
||||
if len(self._data) > 0:
|
||||
return self._data[0].get_tabs()
|
||||
return ""
|
||||
|
||||
def dump(self, include_spaces=False):
|
||||
""" Dumps this object by first dumping each statement, then appending its
|
||||
trailing whitespace (if `include_spaces` is set) """
|
||||
data = super(Statements, self).dump(include_spaces)
|
||||
if include_spaces and self._trailing_whitespace is not None:
|
||||
return data + [self._trailing_whitespace]
|
||||
return data
|
||||
|
||||
def iterate(self, expanded=False, match=None):
|
||||
""" Combines each statement's iterator. """
|
||||
for elem in self._data:
|
||||
for sub_elem in elem.iterate(expanded, match):
|
||||
yield sub_elem
|
||||
|
||||
# ======== End overridden functions
|
||||
|
||||
def _space_list(list_):
|
||||
""" Inserts whitespace between adjacent non-whitespace tokens. """
|
||||
spaced_statement = [] # type: List[str]
|
||||
for i in reversed(six.moves.xrange(len(list_))):
|
||||
spaced_statement.insert(0, list_[i])
|
||||
if i > 0 and not list_[i].isspace() and not list_[i-1].isspace():
|
||||
spaced_statement.insert(0, " ")
|
||||
return spaced_statement
|
||||
|
||||
class Sentence(Parsable):
|
||||
""" A list of words. Non-whitespace words are typically separated with whitespace tokens. """
|
||||
|
||||
# ======== Begin overridden functions
|
||||
|
||||
@staticmethod
|
||||
def should_parse(lists):
|
||||
""" Returns True if `lists` can be parseable as a `Sentence`-- that is,
|
||||
every element is a string type.
|
||||
|
||||
:param list lists: The raw unparsed list to check.
|
||||
|
||||
:returns: whether this lists is parseable by `Sentence`.
|
||||
"""
|
||||
return isinstance(lists, list) and len(lists) > 0 and \
|
||||
all([isinstance(elem, six.string_types) for elem in lists])
|
||||
|
||||
def parse(self, parse_this, add_spaces=False):
|
||||
""" Parses a list of string types into this object.
|
||||
If add_spaces is set, adds whitespace tokens between adjacent non-whitespace tokens."""
|
||||
if add_spaces:
|
||||
parse_this = _space_list(parse_this)
|
||||
if not isinstance(parse_this, list) or \
|
||||
any([not isinstance(elem, six.string_types) for elem in parse_this]):
|
||||
raise errors.MisconfigurationError("Sentence parsing expects a list of string types.")
|
||||
self._data = parse_this
|
||||
|
||||
def iterate(self, expanded=False, match=None):
|
||||
""" Simply yields itself. """
|
||||
if match is None or match(self):
|
||||
yield self
|
||||
|
||||
def set_tabs(self, tabs=" "):
|
||||
""" Sets the tabbing on this sentence. Inserts a newline and `tabs` at the
|
||||
beginning of `self._data`. """
|
||||
if self._data[0].isspace():
|
||||
return
|
||||
self._data.insert(0, "\n" + tabs)
|
||||
|
||||
def dump(self, include_spaces=False):
|
||||
""" Dumps this sentence. If include_spaces is set, includes whitespace tokens."""
|
||||
if not include_spaces:
|
||||
return self.words
|
||||
return self._data
|
||||
|
||||
def get_tabs(self):
|
||||
""" Guesses at the tabbing of this sentence. If the first element is whitespace,
|
||||
returns the whitespace after the rightmost newline in the string. """
|
||||
first = self._data[0]
|
||||
if not first.isspace():
|
||||
return ""
|
||||
rindex = first.rfind("\n")
|
||||
return first[rindex+1:]
|
||||
|
||||
# ======== End overridden functions
|
||||
|
||||
@property
|
||||
def words(self):
|
||||
""" Iterates over words, but without spaces. Like Unspaced List. """
|
||||
return [word.strip("\"\'") for word in self._data if not word.isspace()]
|
||||
|
||||
def __getitem__(self, index):
|
||||
return self.words[index]
|
||||
|
||||
def __contains__(self, word):
|
||||
return word in self.words
|
||||
|
||||
class Block(Parsable):
|
||||
""" Any sort of bloc, denoted by a block name and curly braces, like so:
|
||||
The parsed block:
|
||||
block name {
|
||||
content 1;
|
||||
content 2;
|
||||
}
|
||||
might be represented with the list [names, contents], where
|
||||
names = ["block", " ", "name", " "]
|
||||
contents = [["\n ", "content", " ", "1"], ["\n ", "content", " ", "2"], "\n"]
|
||||
"""
|
||||
def __init__(self, parent=None):
|
||||
super(Block, self).__init__(parent)
|
||||
self.names = None # type: Sentence
|
||||
self.contents = None # type: Block
|
||||
|
||||
@staticmethod
|
||||
def should_parse(lists):
|
||||
""" Returns True if `lists` can be parseable as a `Block`-- that is,
|
||||
it's got a length of 2, the first element is a `Sentence` and the second can be
|
||||
a `Statements`.
|
||||
|
||||
:param list lists: The raw unparsed list to check.
|
||||
|
||||
:returns: whether this lists is parseable by `Block`. """
|
||||
return isinstance(lists, list) and len(lists) == 2 and \
|
||||
Sentence.should_parse(lists[0]) and isinstance(lists[1], list)
|
||||
|
||||
def set_tabs(self, tabs=" "):
|
||||
""" Sets tabs by setting equivalent tabbing on names, then adding tabbing
|
||||
to contents."""
|
||||
self.names.set_tabs(tabs)
|
||||
self.contents.set_tabs(tabs + " ")
|
||||
|
||||
def iterate(self, expanded=False, match=None):
|
||||
""" Iterator over self, and if expanded is set, over its contents. """
|
||||
if match is None or match(self):
|
||||
yield self
|
||||
if expanded:
|
||||
for elem in self.contents.iterate(expanded, match):
|
||||
yield elem
|
||||
|
||||
def parse(self, parse_this, add_spaces=False):
|
||||
""" Parses a list that resembles a block.
|
||||
|
||||
The assumptions that this routine makes are:
|
||||
1. the first element of `parse_this` is a valid Sentence.
|
||||
2. the second element of `parse_this` is a valid Statement.
|
||||
If add_spaces is set, we call it recursively on `names` and `contents`, and
|
||||
add an extra trailing space to `names` (to separate the block's opening bracket
|
||||
and the block name).
|
||||
"""
|
||||
if not Block.should_parse(parse_this):
|
||||
raise errors.MisconfigurationError("Block parsing expects a list of length 2. "
|
||||
"First element should be a list of string types (the bloc names), "
|
||||
"and second should be another list of statements (the bloc content).")
|
||||
self.names = Sentence(self)
|
||||
if add_spaces:
|
||||
parse_this[0].append(" ")
|
||||
self.names.parse(parse_this[0], add_spaces)
|
||||
self.contents = Statements(self)
|
||||
self.contents.parse(parse_this[1], add_spaces)
|
||||
self._data = [self.names, self.contents]
|
||||
|
||||
def get_tabs(self):
|
||||
""" Guesses tabbing by retrieving tabbing guess of self.names. """
|
||||
return self.names.get_tabs()
|
||||
|
||||
def _is_comment(parsed_obj):
|
||||
""" Checks whether parsed_obj is a comment.
|
||||
|
||||
:param .Parsable parsed_obj:
|
||||
|
||||
:returns: whether parsed_obj represents a comment sentence.
|
||||
:rtype bool:
|
||||
"""
|
||||
if not isinstance(parsed_obj, Sentence):
|
||||
return False
|
||||
return parsed_obj.words[0] == "#"
|
||||
|
||||
def _is_certbot_comment(parsed_obj):
|
||||
""" Checks whether parsed_obj is a "managed by Certbot" comment.
|
||||
|
||||
:param .Parsable parsed_obj:
|
||||
|
||||
:returns: whether parsed_obj is a "managed by Certbot" comment.
|
||||
:rtype bool:
|
||||
"""
|
||||
if not _is_comment(parsed_obj):
|
||||
return False
|
||||
if len(parsed_obj.words) != len(COMMENT_BLOCK):
|
||||
return False
|
||||
for i, word in enumerate(parsed_obj.words):
|
||||
if word != COMMENT_BLOCK[i]:
|
||||
return False
|
||||
return True
|
||||
|
||||
def _certbot_comment(parent, preceding_spaces=4):
|
||||
""" A "Managed by Certbot" comment.
|
||||
:param int preceding_spaces: Number of spaces between the end of the previous
|
||||
statement and the comment.
|
||||
:returns: Sentence containing the comment.
|
||||
:rtype: .Sentence
|
||||
"""
|
||||
result = Sentence(parent)
|
||||
result.parse([" " * preceding_spaces] + COMMENT_BLOCK)
|
||||
return result
|
||||
|
||||
def _choose_parser(parent, list_):
|
||||
""" Choose a parser from type(parent).parsing_hooks, depending on whichever hook
|
||||
returns True first. """
|
||||
hooks = Parsable.parsing_hooks()
|
||||
if parent:
|
||||
hooks = type(parent).parsing_hooks()
|
||||
for type_ in hooks:
|
||||
if type_.should_parse(list_):
|
||||
return type_(parent)
|
||||
raise errors.MisconfigurationError(
|
||||
"None of the parsing hooks succeeded, so we don't know how to parse this set of lists.")
|
||||
|
||||
def parse_raw(lists_, parent=None, add_spaces=False):
|
||||
""" Primary parsing factory function.
|
||||
|
||||
:param list lists_: raw lists from pyparsing to parse.
|
||||
:param .Parent parent: The parent containing this object.
|
||||
:param bool add_spaces: Whether to pass add_spaces to the parser.
|
||||
|
||||
:returns .Parsable: The parsed object.
|
||||
|
||||
:raises errors.MisconfigurationError: If no parsing hook passes, and we can't
|
||||
determine which type to parse the raw lists into.
|
||||
"""
|
||||
parser = _choose_parser(parent, lists_)
|
||||
parser.parse(lists_, add_spaces)
|
||||
return parser
|
||||
|
|
@ -128,22 +128,39 @@ class NginxConfiguratorTest(util.NginxTest):
|
|||
['#', parser.COMMENT]]]],
|
||||
parsed[0])
|
||||
|
||||
def test_choose_vhosts(self):
|
||||
localhost_conf = set(['localhost', r'~^(www\.)?(example|bar)\.'])
|
||||
server_conf = set(['somename', 'another.alias', 'alias'])
|
||||
example_conf = set(['.example.com', 'example.*'])
|
||||
foo_conf = set(['*.www.foo.com', '*.www.example.com'])
|
||||
ipv6_conf = set(['ipv6.com'])
|
||||
def test_choose_vhosts_alias(self):
|
||||
self._test_choose_vhosts_common('alias', 'server_conf')
|
||||
|
||||
results = {'localhost': localhost_conf,
|
||||
'alias': server_conf,
|
||||
'example.com': example_conf,
|
||||
'example.com.uk.test': example_conf,
|
||||
'www.example.com': example_conf,
|
||||
'test.www.example.com': foo_conf,
|
||||
'abc.www.foo.com': foo_conf,
|
||||
'www.bar.co.uk': localhost_conf,
|
||||
'ipv6.com': ipv6_conf}
|
||||
def test_choose_vhosts_example_com(self):
|
||||
self._test_choose_vhosts_common('example.com', 'example_conf')
|
||||
|
||||
def test_choose_vhosts_localhost(self):
|
||||
self._test_choose_vhosts_common('localhost', 'localhost_conf')
|
||||
|
||||
def test_choose_vhosts_example_com_uk_test(self):
|
||||
self._test_choose_vhosts_common('example.com.uk.test', 'example_conf')
|
||||
|
||||
def test_choose_vhosts_www_example_com(self):
|
||||
self._test_choose_vhosts_common('www.example.com', 'example_conf')
|
||||
|
||||
def test_choose_vhosts_test_www_example_com(self):
|
||||
self._test_choose_vhosts_common('test.www.example.com', 'foo_conf')
|
||||
|
||||
def test_choose_vhosts_abc_www_foo_com(self):
|
||||
self._test_choose_vhosts_common('abc.www.foo.com', 'foo_conf')
|
||||
|
||||
def test_choose_vhosts_www_bar_co_uk(self):
|
||||
self._test_choose_vhosts_common('www.bar.co.uk', 'localhost_conf')
|
||||
|
||||
def test_choose_vhosts_ipv6_com(self):
|
||||
self._test_choose_vhosts_common('ipv6.com', 'ipv6_conf')
|
||||
|
||||
def _test_choose_vhosts_common(self, name, conf):
|
||||
conf_names = {'localhost_conf': set(['localhost', r'~^(www\.)?(example|bar)\.']),
|
||||
'server_conf': set(['somename', 'another.alias', 'alias']),
|
||||
'example_conf': set(['.example.com', 'example.*']),
|
||||
'foo_conf': set(['*.www.foo.com', '*.www.example.com']),
|
||||
'ipv6_conf': set(['ipv6.com'])}
|
||||
|
||||
conf_path = {'localhost': "etc_nginx/nginx.conf",
|
||||
'alias': "etc_nginx/nginx.conf",
|
||||
|
|
@ -155,31 +172,31 @@ class NginxConfiguratorTest(util.NginxTest):
|
|||
'www.bar.co.uk': "etc_nginx/nginx.conf",
|
||||
'ipv6.com': "etc_nginx/sites-enabled/ipv6.com"}
|
||||
|
||||
vhost = self.config.choose_vhosts(name)[0]
|
||||
path = os.path.relpath(vhost.filep, self.temp_dir)
|
||||
|
||||
self.assertEqual(conf_names[conf], vhost.names)
|
||||
self.assertEqual(conf_path[name], path)
|
||||
# IPv6 specific checks
|
||||
if name == "ipv6.com":
|
||||
self.assertTrue(vhost.ipv6_enabled())
|
||||
# Make sure that we have SSL enabled also for IPv6 addr
|
||||
self.assertTrue(
|
||||
any([True for x in vhost.addrs if x.ssl and x.ipv6]))
|
||||
|
||||
def test_choose_vhosts_bad(self):
|
||||
bad_results = ['www.foo.com', 'example', 't.www.bar.co',
|
||||
'69.255.225.155']
|
||||
|
||||
for name in results:
|
||||
vhost = self.config.choose_vhosts(name)[0]
|
||||
path = os.path.relpath(vhost.filep, self.temp_dir)
|
||||
|
||||
self.assertEqual(results[name], vhost.names)
|
||||
self.assertEqual(conf_path[name], path)
|
||||
# IPv6 specific checks
|
||||
if name == "ipv6.com":
|
||||
self.assertTrue(vhost.ipv6_enabled())
|
||||
# Make sure that we have SSL enabled also for IPv6 addr
|
||||
self.assertTrue(
|
||||
any([True for x in vhost.addrs if x.ssl and x.ipv6]))
|
||||
|
||||
for name in bad_results:
|
||||
self.assertRaises(errors.MisconfigurationError,
|
||||
self.config.choose_vhosts, name)
|
||||
|
||||
def test_ipv6only(self):
|
||||
# ipv6_info: (ipv6_active, ipv6only_present)
|
||||
self.assertEquals((True, False), self.config.ipv6_info("80"))
|
||||
self.assertEqual((True, False), self.config.ipv6_info("80"))
|
||||
# Port 443 has ipv6only=on because of ipv6ssl.com vhost
|
||||
self.assertEquals((True, True), self.config.ipv6_info("443"))
|
||||
self.assertEqual((True, True), self.config.ipv6_info("443"))
|
||||
|
||||
def test_ipv6only_detection(self):
|
||||
self.config.version = (1, 3, 1)
|
||||
|
|
@ -790,7 +807,7 @@ class NginxConfiguratorTest(util.NginxTest):
|
|||
self.assertTrue(vhost in mock_select_vhs.call_args[0][0])
|
||||
|
||||
# And the actual returned values
|
||||
self.assertEquals(len(vhs), 1)
|
||||
self.assertEqual(len(vhs), 1)
|
||||
self.assertEqual(vhs[0], vhost)
|
||||
|
||||
def test_choose_vhosts_wildcard_redirect(self):
|
||||
|
|
@ -806,7 +823,7 @@ class NginxConfiguratorTest(util.NginxTest):
|
|||
self.assertTrue(vhost in mock_select_vhs.call_args[0][0])
|
||||
|
||||
# And the actual returned values
|
||||
self.assertEquals(len(vhs), 1)
|
||||
self.assertEqual(len(vhs), 1)
|
||||
self.assertEqual(vhs[0], vhost)
|
||||
|
||||
def test_deploy_cert_wildcard(self):
|
||||
|
|
@ -821,7 +838,7 @@ class NginxConfiguratorTest(util.NginxTest):
|
|||
self.config.deploy_cert("*.com", "/tmp/path",
|
||||
"/tmp/path", "/tmp/path", "/tmp/path")
|
||||
self.assertTrue(mock_dep.called)
|
||||
self.assertEquals(len(mock_dep.call_args_list), 1)
|
||||
self.assertEqual(len(mock_dep.call_args_list), 1)
|
||||
self.assertEqual(vhost, mock_dep.call_args_list[0][0][0])
|
||||
|
||||
@mock.patch("certbot_nginx.display_ops.select_vhost_multiple")
|
||||
|
|
|
|||
|
|
@ -12,6 +12,7 @@ from certbot import achallenges
|
|||
from certbot.plugins import common_test
|
||||
from certbot.tests import acme_util
|
||||
|
||||
from certbot_nginx.obj import Addr
|
||||
from certbot_nginx.tests import util
|
||||
|
||||
|
||||
|
|
@ -108,6 +109,41 @@ class HttpPerformTest(util.NginxTest):
|
|||
# self.assertEqual(vhost.addrs, set(v_addr2_print))
|
||||
# self.assertEqual(vhost.names, set([response.z_domain.decode('ascii')]))
|
||||
|
||||
@mock.patch("certbot_nginx.configurator.NginxConfigurator.ipv6_info")
|
||||
def test_default_listen_addresses_no_memoization(self, ipv6_info):
|
||||
# pylint: disable=protected-access
|
||||
ipv6_info.return_value = (True, True)
|
||||
self.http01._default_listen_addresses()
|
||||
self.assertEqual(ipv6_info.call_count, 1)
|
||||
ipv6_info.return_value = (False, False)
|
||||
self.http01._default_listen_addresses()
|
||||
self.assertEqual(ipv6_info.call_count, 2)
|
||||
|
||||
@mock.patch("certbot_nginx.configurator.NginxConfigurator.ipv6_info")
|
||||
def test_default_listen_addresses_t_t(self, ipv6_info):
|
||||
# pylint: disable=protected-access
|
||||
ipv6_info.return_value = (True, True)
|
||||
addrs = self.http01._default_listen_addresses()
|
||||
http_addr = Addr.fromstring("80")
|
||||
http_ipv6_addr = Addr.fromstring("[::]:80")
|
||||
self.assertEqual(addrs, [http_addr, http_ipv6_addr])
|
||||
|
||||
@mock.patch("certbot_nginx.configurator.NginxConfigurator.ipv6_info")
|
||||
def test_default_listen_addresses_t_f(self, ipv6_info):
|
||||
# pylint: disable=protected-access
|
||||
ipv6_info.return_value = (True, False)
|
||||
addrs = self.http01._default_listen_addresses()
|
||||
http_addr = Addr.fromstring("80")
|
||||
http_ipv6_addr = Addr.fromstring("[::]:80 ipv6only=on")
|
||||
self.assertEqual(addrs, [http_addr, http_ipv6_addr])
|
||||
|
||||
@mock.patch("certbot_nginx.configurator.NginxConfigurator.ipv6_info")
|
||||
def test_default_listen_addresses_f_f(self, ipv6_info):
|
||||
# pylint: disable=protected-access
|
||||
ipv6_info.return_value = (False, False)
|
||||
addrs = self.http01._default_listen_addresses()
|
||||
http_addr = Addr.fromstring("80")
|
||||
self.assertEqual(addrs, [http_addr])
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main() # pragma: no cover
|
||||
|
|
|
|||
253
certbot-nginx/certbot_nginx/tests/parser_obj_test.py
Normal file
253
certbot-nginx/certbot_nginx/tests/parser_obj_test.py
Normal file
|
|
@ -0,0 +1,253 @@
|
|||
""" Tests for functions and classes in parser_obj.py """
|
||||
|
||||
import unittest
|
||||
import mock
|
||||
|
||||
from certbot_nginx.parser_obj import parse_raw
|
||||
from certbot_nginx.parser_obj import COMMENT_BLOCK
|
||||
|
||||
class CommentHelpersTest(unittest.TestCase):
|
||||
def test_is_comment(self):
|
||||
from certbot_nginx.parser_obj import _is_comment
|
||||
self.assertTrue(_is_comment(parse_raw(['#'])))
|
||||
self.assertTrue(_is_comment(parse_raw(['#', ' literally anything else'])))
|
||||
self.assertFalse(_is_comment(parse_raw(['not', 'even', 'a', 'comment'])))
|
||||
|
||||
def test_is_certbot_comment(self):
|
||||
from certbot_nginx.parser_obj import _is_certbot_comment
|
||||
self.assertTrue(_is_certbot_comment(
|
||||
parse_raw(COMMENT_BLOCK)))
|
||||
self.assertFalse(_is_certbot_comment(
|
||||
parse_raw(['#', ' not a certbot comment'])))
|
||||
self.assertFalse(_is_certbot_comment(
|
||||
parse_raw(['#', ' managed by Certbot', ' also not a certbot comment'])))
|
||||
self.assertFalse(_is_certbot_comment(
|
||||
parse_raw(['not', 'even', 'a', 'comment'])))
|
||||
|
||||
def test_certbot_comment(self):
|
||||
from certbot_nginx.parser_obj import _certbot_comment, _is_certbot_comment
|
||||
comment = _certbot_comment(None)
|
||||
self.assertTrue(_is_certbot_comment(comment))
|
||||
self.assertEqual(comment.dump(), COMMENT_BLOCK)
|
||||
self.assertEqual(comment.dump(True), [' '] + COMMENT_BLOCK)
|
||||
self.assertEqual(_certbot_comment(None, 2).dump(True),
|
||||
[' '] + COMMENT_BLOCK)
|
||||
|
||||
class ParsingHooksTest(unittest.TestCase):
|
||||
def test_is_sentence(self):
|
||||
from certbot_nginx.parser_obj import Sentence
|
||||
self.assertFalse(Sentence.should_parse([]))
|
||||
self.assertTrue(Sentence.should_parse(['']))
|
||||
self.assertTrue(Sentence.should_parse(['word']))
|
||||
self.assertTrue(Sentence.should_parse(['two', 'words']))
|
||||
self.assertFalse(Sentence.should_parse([[]]))
|
||||
self.assertFalse(Sentence.should_parse(['word', []]))
|
||||
|
||||
def test_is_block(self):
|
||||
from certbot_nginx.parser_obj import Block
|
||||
self.assertFalse(Block.should_parse([]))
|
||||
self.assertFalse(Block.should_parse(['']))
|
||||
self.assertFalse(Block.should_parse(['two', 'words']))
|
||||
self.assertFalse(Block.should_parse([[[]], []]))
|
||||
self.assertFalse(Block.should_parse([['block_name'], ['hi', []], []]))
|
||||
self.assertFalse(Block.should_parse([['block_name'], 'lol']))
|
||||
self.assertTrue(Block.should_parse([['block_name'], ['hi', []]]))
|
||||
self.assertTrue(Block.should_parse([['hello'], []]))
|
||||
self.assertTrue(Block.should_parse([['block_name'], [['many'], ['statements'], 'here']]))
|
||||
self.assertTrue(Block.should_parse([['if', ' ', '(whatever)'], ['hi']]))
|
||||
|
||||
def test_parse_raw(self):
|
||||
fake_parser1 = mock.Mock()
|
||||
fake_parser1.should_parse = lambda x: True
|
||||
fake_parser2 = mock.Mock()
|
||||
fake_parser2.should_parse = lambda x: False
|
||||
# First encountered "match" should parse.
|
||||
parse_raw([])
|
||||
fake_parser1.called_once()
|
||||
fake_parser2.not_called()
|
||||
fake_parser1.reset_mock()
|
||||
# "match" that returns False shouldn't parse.
|
||||
parse_raw([])
|
||||
fake_parser1.not_called()
|
||||
fake_parser2.called_once()
|
||||
|
||||
@mock.patch("certbot_nginx.parser_obj.Parsable.parsing_hooks")
|
||||
def test_parse_raw_no_match(self, parsing_hooks):
|
||||
from certbot import errors
|
||||
fake_parser1 = mock.Mock()
|
||||
fake_parser1.should_parse = lambda x: False
|
||||
parsing_hooks.return_value = (fake_parser1,)
|
||||
self.assertRaises(errors.MisconfigurationError, parse_raw, [])
|
||||
parsing_hooks.return_value = tuple()
|
||||
self.assertRaises(errors.MisconfigurationError, parse_raw, [])
|
||||
|
||||
def test_parse_raw_passes_add_spaces(self):
|
||||
fake_parser1 = mock.Mock()
|
||||
fake_parser1.should_parse = lambda x: True
|
||||
parse_raw([])
|
||||
fake_parser1.parse.called_with([None])
|
||||
parse_raw([], add_spaces=True)
|
||||
fake_parser1.parse.called_with([None, True])
|
||||
|
||||
class SentenceTest(unittest.TestCase):
|
||||
def setUp(self):
|
||||
from certbot_nginx.parser_obj import Sentence
|
||||
self.sentence = Sentence(None)
|
||||
|
||||
def test_parse_bad_sentence_raises_error(self):
|
||||
from certbot import errors
|
||||
self.assertRaises(errors.MisconfigurationError, self.sentence.parse, 'lol')
|
||||
self.assertRaises(errors.MisconfigurationError, self.sentence.parse, [[]])
|
||||
self.assertRaises(errors.MisconfigurationError, self.sentence.parse, [5])
|
||||
|
||||
def test_parse_sentence_words_hides_spaces(self):
|
||||
og_sentence = ['\r\n', 'hello', ' ', ' ', '\t\n ', 'lol', ' ', 'spaces']
|
||||
self.sentence.parse(og_sentence)
|
||||
self.assertEquals(self.sentence.words, ['hello', 'lol', 'spaces'])
|
||||
self.assertEquals(self.sentence.dump(), ['hello', 'lol', 'spaces'])
|
||||
self.assertEquals(self.sentence.dump(True), og_sentence)
|
||||
|
||||
def test_parse_sentence_with_add_spaces(self):
|
||||
self.sentence.parse(['hi', 'there'], add_spaces=True)
|
||||
self.assertEquals(self.sentence.dump(True), ['hi', ' ', 'there'])
|
||||
self.sentence.parse(['one', ' ', 'space', 'none'], add_spaces=True)
|
||||
self.assertEquals(self.sentence.dump(True), ['one', ' ', 'space', ' ', 'none'])
|
||||
|
||||
def test_iterate(self):
|
||||
expected = [['1', '2', '3']]
|
||||
self.sentence.parse(['1', ' ', '2', ' ', '3'])
|
||||
for i, sentence in enumerate(self.sentence.iterate()):
|
||||
self.assertEquals(sentence.dump(), expected[i])
|
||||
|
||||
def test_set_tabs(self):
|
||||
self.sentence.parse(['tabs', 'pls'], add_spaces=True)
|
||||
self.sentence.set_tabs()
|
||||
self.assertEquals(self.sentence.dump(True)[0], '\n ')
|
||||
self.sentence.parse(['tabs', 'pls'], add_spaces=True)
|
||||
|
||||
def test_get_tabs(self):
|
||||
self.sentence.parse(['no', 'tabs'])
|
||||
self.assertEquals(self.sentence.get_tabs(), '')
|
||||
self.sentence.parse(['\n \n ', 'tabs'])
|
||||
self.assertEquals(self.sentence.get_tabs(), ' ')
|
||||
self.sentence.parse(['\n\t ', 'tabs'])
|
||||
self.assertEquals(self.sentence.get_tabs(), '\t ')
|
||||
self.sentence.parse(['\n\t \n', 'tabs'])
|
||||
self.assertEquals(self.sentence.get_tabs(), '')
|
||||
|
||||
class BlockTest(unittest.TestCase):
|
||||
def setUp(self):
|
||||
from certbot_nginx.parser_obj import Block
|
||||
self.bloc = Block(None)
|
||||
self.name = ['server', 'name']
|
||||
self.contents = [['thing', '1'], ['thing', '2'], ['another', 'one']]
|
||||
self.bloc.parse([self.name, self.contents])
|
||||
|
||||
def test_iterate(self):
|
||||
# Iterates itself normally
|
||||
self.assertEquals(self.bloc, next(self.bloc.iterate()))
|
||||
# Iterates contents while expanded
|
||||
expected = [self.bloc.dump()] + self.contents
|
||||
for i, elem in enumerate(self.bloc.iterate(expanded=True)):
|
||||
self.assertEquals(expected[i], elem.dump())
|
||||
|
||||
def test_iterate_match(self):
|
||||
# can match on contents while expanded
|
||||
from certbot_nginx.parser_obj import Block, Sentence
|
||||
expected = [['thing', '1'], ['thing', '2']]
|
||||
for i, elem in enumerate(self.bloc.iterate(expanded=True,
|
||||
match=lambda x: isinstance(x, Sentence) and 'thing' in x.words)):
|
||||
self.assertEquals(expected[i], elem.dump())
|
||||
# can match on self
|
||||
self.assertEquals(self.bloc, next(self.bloc.iterate(
|
||||
expanded=True,
|
||||
match=lambda x: isinstance(x, Block) and 'server' in x.names)))
|
||||
|
||||
def test_parse_with_added_spaces(self):
|
||||
import copy
|
||||
self.bloc.parse([copy.copy(self.name), self.contents], add_spaces=True)
|
||||
self.assertEquals(self.bloc.dump(), [self.name, self.contents])
|
||||
self.assertEquals(self.bloc.dump(True), [
|
||||
['server', ' ', 'name', ' '],
|
||||
[['thing', ' ', '1'],
|
||||
['thing', ' ', '2'],
|
||||
['another', ' ', 'one']]])
|
||||
|
||||
def test_bad_parse_raises_error(self):
|
||||
from certbot import errors
|
||||
self.assertRaises(errors.MisconfigurationError, self.bloc.parse, [[[]], [[]]])
|
||||
self.assertRaises(errors.MisconfigurationError, self.bloc.parse, ['lol'])
|
||||
self.assertRaises(errors.MisconfigurationError, self.bloc.parse, ['fake', 'news'])
|
||||
|
||||
def test_set_tabs(self):
|
||||
self.bloc.set_tabs()
|
||||
self.assertEquals(self.bloc.names.dump(True)[0], '\n ')
|
||||
for elem in self.bloc.contents.dump(True)[:-1]:
|
||||
self.assertEquals(elem[0], '\n ')
|
||||
self.assertEquals(self.bloc.contents.dump(True)[-1][0], '\n')
|
||||
|
||||
def test_get_tabs(self):
|
||||
self.bloc.parse([[' \n \t', 'lol'], []])
|
||||
self.assertEquals(self.bloc.get_tabs(), ' \t')
|
||||
|
||||
class StatementsTest(unittest.TestCase):
|
||||
def setUp(self):
|
||||
from certbot_nginx.parser_obj import Statements
|
||||
self.statements = Statements(None)
|
||||
self.raw = [
|
||||
['sentence', 'one'],
|
||||
['sentence', 'two'],
|
||||
['and', 'another']
|
||||
]
|
||||
self.raw_spaced = [
|
||||
['\n ', 'sentence', ' ', 'one'],
|
||||
['\n ', 'sentence', ' ', 'two'],
|
||||
['\n ', 'and', ' ', 'another'],
|
||||
'\n\n'
|
||||
]
|
||||
|
||||
def test_set_tabs(self):
|
||||
self.statements.parse(self.raw)
|
||||
self.statements.set_tabs()
|
||||
for statement in self.statements.iterate():
|
||||
self.assertEquals(statement.dump(True)[0], '\n ')
|
||||
|
||||
def test_set_tabs_with_parent(self):
|
||||
# Trailing whitespace should inherit from parent tabbing.
|
||||
self.statements.parse(self.raw)
|
||||
self.statements.parent = mock.Mock()
|
||||
self.statements.parent.get_tabs.return_value = '\t\t'
|
||||
self.statements.set_tabs()
|
||||
for statement in self.statements.iterate():
|
||||
self.assertEquals(statement.dump(True)[0], '\n ')
|
||||
self.assertEquals(self.statements.dump(True)[-1], '\n\t\t')
|
||||
|
||||
def test_get_tabs(self):
|
||||
self.raw[0].insert(0, '\n \n \t')
|
||||
self.statements.parse(self.raw)
|
||||
self.assertEquals(self.statements.get_tabs(), ' \t')
|
||||
self.statements.parse([])
|
||||
self.assertEquals(self.statements.get_tabs(), '')
|
||||
|
||||
def test_parse_with_added_spaces(self):
|
||||
self.statements.parse(self.raw, add_spaces=True)
|
||||
self.assertEquals(self.statements.dump(True)[0], ['sentence', ' ', 'one'])
|
||||
|
||||
def test_parse_bad_list_raises_error(self):
|
||||
from certbot import errors
|
||||
self.assertRaises(errors.MisconfigurationError, self.statements.parse, 'lol not a list')
|
||||
|
||||
def test_parse_hides_trailing_whitespace(self):
|
||||
self.statements.parse(self.raw + ['\n\n '])
|
||||
self.assertTrue(isinstance(self.statements.dump()[-1], list))
|
||||
self.assertTrue(self.statements.dump(True)[-1].isspace())
|
||||
self.assertEquals(self.statements.dump(True)[-1], '\n\n ')
|
||||
|
||||
def test_iterate(self):
|
||||
self.statements.parse(self.raw)
|
||||
expected = [['sentence', 'one'], ['sentence', 'two']]
|
||||
for i, elem in enumerate(self.statements.iterate(match=lambda x: 'sentence' in x)):
|
||||
self.assertEquals(expected[i], elem.dump())
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main() # pragma: no cover
|
||||
|
|
@ -51,9 +51,6 @@ class NginxTlsSni01(common.TLSSNI01):
|
|||
default_addr = "{0} ssl".format(
|
||||
self.configurator.config.tls_sni_01_port)
|
||||
|
||||
ipv6, ipv6only = self.configurator.ipv6_info(
|
||||
self.configurator.config.tls_sni_01_port)
|
||||
|
||||
for achall in self.achalls:
|
||||
vhosts = self.configurator.choose_vhosts(achall.domain, create_if_no_match=True)
|
||||
|
||||
|
|
@ -61,6 +58,9 @@ class NginxTlsSni01(common.TLSSNI01):
|
|||
if vhosts and vhosts[0].addrs:
|
||||
addresses.append(list(vhosts[0].addrs))
|
||||
else:
|
||||
# choose_vhosts might have modified vhosts, so put this after
|
||||
ipv6, ipv6only = self.configurator.ipv6_info(
|
||||
self.configurator.config.tls_sni_01_port)
|
||||
if ipv6:
|
||||
# If IPv6 is active in Nginx configuration
|
||||
ipv6_addr = "[::]:{0} ssl".format(
|
||||
|
|
@ -141,6 +141,8 @@ class NginxTlsSni01(common.TLSSNI01):
|
|||
with open(self.challenge_conf, "w") as new_conf:
|
||||
nginxparser.dump(config, new_conf)
|
||||
|
||||
logger.debug("Generated server block:\n%s", str(config))
|
||||
|
||||
def _make_server_block(self, achall, addrs):
|
||||
"""Creates a server block for a challenge.
|
||||
|
||||
|
|
|
|||
|
|
@ -35,6 +35,7 @@ test_deployment_and_rollback() {
|
|||
}
|
||||
|
||||
export default_server="default_server"
|
||||
nginx -v
|
||||
reload_nginx
|
||||
certbot_test_nginx --domains nginx.wtf run
|
||||
test_deployment_and_rollback nginx.wtf
|
||||
|
|
|
|||
|
|
@ -85,7 +85,7 @@ class PostConfTest(unittest.TestCase):
|
|||
self.config.set('extra_param', 'another_value')
|
||||
self.config.flush()
|
||||
arguments = mock_out.call_args_list[-1][0][0]
|
||||
self.assertEquals('-e', arguments[0])
|
||||
self.assertEqual('-e', arguments[0])
|
||||
self.assertTrue('default_parameter=new_value' in arguments)
|
||||
self.assertTrue('extra_param=another_value' in arguments)
|
||||
|
||||
|
|
|
|||
|
|
@ -2,7 +2,7 @@
|
|||
Compatibility layer to run certbot both on Linux and Windows.
|
||||
|
||||
The approach used here is similar to Modernizr for Web browsers.
|
||||
We do not check the plateform type to determine if a particular logic is supported.
|
||||
We do not check the platform type to determine if a particular logic is supported.
|
||||
Instead, we apply a logic, and then fallback to another logic if first logic
|
||||
is not supported at runtime.
|
||||
|
||||
|
|
@ -13,6 +13,7 @@ import select
|
|||
import sys
|
||||
import errno
|
||||
import ctypes
|
||||
import stat
|
||||
|
||||
from certbot import errors
|
||||
|
||||
|
|
@ -138,3 +139,12 @@ def release_locked_file(fd, path):
|
|||
raise
|
||||
finally:
|
||||
os.close(fd)
|
||||
|
||||
def compare_file_modes(mode1, mode2):
|
||||
"""Return true if the two modes can be considered as equals for this platform"""
|
||||
if 'fcntl' in sys.modules:
|
||||
# Linux specific: standard compare
|
||||
return oct(stat.S_IMODE(mode1)) == oct(stat.S_IMODE(mode2))
|
||||
# Windows specific: most of mode bits are ignored on Windows. Only check user R/W rights.
|
||||
return (stat.S_IMODE(mode1) & stat.S_IREAD == stat.S_IMODE(mode2) & stat.S_IREAD
|
||||
and stat.S_IMODE(mode1) & stat.S_IWRITE == stat.S_IMODE(mode2) & stat.S_IWRITE)
|
||||
|
|
|
|||
|
|
@ -449,14 +449,17 @@ def _notAfterBefore(cert_path, method):
|
|||
def sha256sum(filename):
|
||||
"""Compute a sha256sum of a file.
|
||||
|
||||
NB: In given file, platform specific newlines characters will be converted
|
||||
into their equivalent unicode counterparts before calculating the hash.
|
||||
|
||||
:param str filename: path to the file whose hash will be computed
|
||||
|
||||
:returns: sha256 digest of the file in hexadecimal
|
||||
:rtype: str
|
||||
"""
|
||||
sha256 = hashlib.sha256()
|
||||
with open(filename, 'rb') as f:
|
||||
sha256.update(f.read())
|
||||
with open(filename, 'rU') as file_d:
|
||||
sha256.update(file_d.read().encode('UTF-8'))
|
||||
return sha256.hexdigest()
|
||||
|
||||
def cert_and_chain_from_fullchain(fullchain_pem):
|
||||
|
|
|
|||
|
|
@ -49,9 +49,9 @@ class Completer(object):
|
|||
readline.set_completer(self.complete)
|
||||
readline.set_completer_delims(' \t\n;')
|
||||
|
||||
# readline can be implemented using GNU readline or libedit
|
||||
# readline can be implemented using GNU readline, pyreadline or libedit
|
||||
# which have different configuration syntax
|
||||
if 'libedit' in readline.__doc__:
|
||||
if readline.__doc__ is not None and 'libedit' in readline.__doc__:
|
||||
readline.parse_and_bind('bind ^I rl_complete')
|
||||
else:
|
||||
readline.parse_and_bind('tab: complete')
|
||||
|
|
|
|||
|
|
@ -53,7 +53,7 @@ def _wrap_lines(msg):
|
|||
break_long_words=False,
|
||||
break_on_hyphens=False))
|
||||
|
||||
return os.linesep.join(fixed_l)
|
||||
return '\n'.join(fixed_l)
|
||||
|
||||
|
||||
def input_with_timeout(prompt=None, timeout=36000.0):
|
||||
|
|
|
|||
|
|
@ -1135,7 +1135,8 @@ def _csr_get_and_save_cert(config, le_client):
|
|||
"Dry run: skipping saving certificate to %s", config.cert_path)
|
||||
return None, None
|
||||
cert_path, _, fullchain_path = le_client.save_certificate(
|
||||
cert, chain, config.cert_path, config.chain_path, config.fullchain_path)
|
||||
cert, chain, os.path.normpath(config.cert_path),
|
||||
os.path.normpath(config.chain_path), os.path.normpath(config.fullchain_path))
|
||||
return cert_path, fullchain_path
|
||||
|
||||
def renew_cert(config, plugins, lineage):
|
||||
|
|
|
|||
|
|
@ -37,11 +37,11 @@ class EnhancementTest(test_util.ConfigTestCase):
|
|||
self.assertTrue([i for i in enabled if i["name"] == "somethingelse"])
|
||||
|
||||
def test_are_requested(self):
|
||||
self.assertEquals(
|
||||
self.assertEqual(
|
||||
len([i for i in enhancements.enabled_enhancements(self.config)]), 0)
|
||||
self.assertFalse(enhancements.are_requested(self.config))
|
||||
self.config.auto_hsts = True
|
||||
self.assertEquals(
|
||||
self.assertEqual(
|
||||
len([i for i in enhancements.enabled_enhancements(self.config)]), 1)
|
||||
self.assertTrue(enhancements.are_requested(self.config))
|
||||
|
||||
|
|
@ -57,7 +57,7 @@ class EnhancementTest(test_util.ConfigTestCase):
|
|||
lineage = "lineage"
|
||||
enhancements.enable(lineage, domains, self.mockinstaller, self.config)
|
||||
self.assertTrue(self.mockinstaller.enable_autohsts.called)
|
||||
self.assertEquals(self.mockinstaller.enable_autohsts.call_args[0],
|
||||
self.assertEqual(self.mockinstaller.enable_autohsts.call_args[0],
|
||||
(lineage, domains))
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -94,6 +94,16 @@ using the secret key
|
|||
{key}
|
||||
when it receives a TLS ClientHello with the SNI extension set to
|
||||
{sni_domain}
|
||||
"""
|
||||
_SUBSEQUENT_CHALLENGE_INSTRUCTIONS = """
|
||||
(This must be set up in addition to the previous challenges; do not remove,
|
||||
replace, or undo the previous challenge tasks yet.)
|
||||
"""
|
||||
_SUBSEQUENT_DNS_CHALLENGE_INSTRUCTIONS = """
|
||||
(This must be set up in addition to the previous challenges; do not remove,
|
||||
replace, or undo the previous challenge tasks yet. Note that you might be
|
||||
asked to create multiple distinct TXT records with the same name. This is
|
||||
permitted by DNS standards.)
|
||||
"""
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
|
|
@ -103,6 +113,8 @@ when it receives a TLS ClientHello with the SNI extension set to
|
|||
self.env = dict() \
|
||||
# type: Dict[achallenges.KeyAuthorizationAnnotatedChallenge, Dict[str, str]]
|
||||
self.tls_sni_01 = None
|
||||
self.subsequent_dns_challenge = False
|
||||
self.subsequent_any_challenge = False
|
||||
|
||||
@classmethod
|
||||
def add_parser_arguments(cls, add):
|
||||
|
|
@ -212,8 +224,17 @@ when it receives a TLS ClientHello with the SNI extension set to
|
|||
key=self.tls_sni_01.get_key_path(achall),
|
||||
port=self.config.tls_sni_01_port,
|
||||
sni_domain=self.tls_sni_01.get_z_domain(achall))
|
||||
if isinstance(achall.chall, challenges.DNS01):
|
||||
if self.subsequent_dns_challenge:
|
||||
# 2nd or later dns-01 challenge
|
||||
msg += self._SUBSEQUENT_DNS_CHALLENGE_INSTRUCTIONS
|
||||
self.subsequent_dns_challenge = True
|
||||
elif self.subsequent_any_challenge:
|
||||
# 2nd or later challenge of another type
|
||||
msg += self._SUBSEQUENT_CHALLENGE_INSTRUCTIONS
|
||||
display = zope.component.getUtility(interfaces.IDisplay)
|
||||
display.notification(msg, wrap=False, force_interactive=True)
|
||||
self.subsequent_any_challenge = True
|
||||
|
||||
def cleanup(self, achalls): # pylint: disable=missing-docstring
|
||||
if self.conf('cleanup-hook'):
|
||||
|
|
|
|||
|
|
@ -4,6 +4,7 @@ import unittest
|
|||
|
||||
import six
|
||||
import mock
|
||||
import sys
|
||||
|
||||
from acme import challenges
|
||||
|
||||
|
|
@ -20,8 +21,9 @@ class AuthenticatorTest(test_util.TempDirTestCase):
|
|||
super(AuthenticatorTest, self).setUp()
|
||||
self.http_achall = acme_util.HTTP01_A
|
||||
self.dns_achall = acme_util.DNS01_A
|
||||
self.dns_achall_2 = acme_util.DNS01_A_2
|
||||
self.tls_sni_achall = acme_util.TLSSNI01_A
|
||||
self.achalls = [self.http_achall, self.dns_achall, self.tls_sni_achall]
|
||||
self.achalls = [self.http_achall, self.dns_achall, self.tls_sni_achall, self.dns_achall_2]
|
||||
for d in ["config_dir", "work_dir", "in_progress"]:
|
||||
os.mkdir(os.path.join(self.tempdir, d))
|
||||
# "backup_dir" and "temp_checkpoint_dir" get created in
|
||||
|
|
@ -74,12 +76,14 @@ class AuthenticatorTest(test_util.TempDirTestCase):
|
|||
def test_script_perform(self):
|
||||
self.config.manual_public_ip_logging_ok = True
|
||||
self.config.manual_auth_hook = (
|
||||
'echo ${CERTBOT_DOMAIN}; '
|
||||
'echo ${CERTBOT_TOKEN:-notoken}; '
|
||||
'echo ${CERTBOT_CERT_PATH:-nocert}; '
|
||||
'echo ${CERTBOT_KEY_PATH:-nokey}; '
|
||||
'echo ${CERTBOT_SNI_DOMAIN:-nosnidomain}; '
|
||||
'echo ${CERTBOT_VALIDATION:-novalidation};')
|
||||
'{0} -c "from __future__ import print_function;'
|
||||
'import os; print(os.environ.get(\'CERTBOT_DOMAIN\'));'
|
||||
'print(os.environ.get(\'CERTBOT_TOKEN\', \'notoken\'));'
|
||||
'print(os.environ.get(\'CERTBOT_CERT_PATH\', \'nocert\'));'
|
||||
'print(os.environ.get(\'CERTBOT_KEY_PATH\', \'nokey\'));'
|
||||
'print(os.environ.get(\'CERTBOT_SNI_DOMAIN\', \'nosnidomain\'));'
|
||||
'print(os.environ.get(\'CERTBOT_VALIDATION\', \'novalidation\'));"'
|
||||
.format(sys.executable))
|
||||
dns_expected = '{0}\n{1}\n{2}\n{3}\n{4}\n{5}'.format(
|
||||
self.dns_achall.domain, 'notoken',
|
||||
'nocert', 'nokey', 'nosnidomain',
|
||||
|
|
@ -127,6 +131,7 @@ class AuthenticatorTest(test_util.TempDirTestCase):
|
|||
achall.validation(achall.account_key) in args[0])
|
||||
self.assertFalse(kwargs['wrap'])
|
||||
|
||||
@test_util.broken_on_windows
|
||||
def test_cleanup(self):
|
||||
self.config.manual_public_ip_logging_ok = True
|
||||
self.config.manual_auth_hook = 'echo foo;'
|
||||
|
|
|
|||
|
|
@ -197,7 +197,7 @@ class GetUnpreparedInstallerTest(test_util.ConfigTestCase):
|
|||
|
||||
def test_no_installer_defined(self):
|
||||
self.config.configurator = None
|
||||
self.assertEquals(self._call(), None)
|
||||
self.assertEqual(self._call(), None)
|
||||
|
||||
def test_no_available_installers(self):
|
||||
self.config.configurator = "apache"
|
||||
|
|
|
|||
|
|
@ -4,13 +4,14 @@ import unittest
|
|||
|
||||
import mock
|
||||
|
||||
|
||||
class GetPrefixTest(unittest.TestCase):
|
||||
"""Tests for certbot.plugins.get_prefixes."""
|
||||
def test_get_prefix(self):
|
||||
from certbot.plugins.util import get_prefixes
|
||||
self.assertEqual(get_prefixes('/a/b/c'), ['/a/b/c', '/a/b', '/a', '/'])
|
||||
self.assertEqual(get_prefixes('/'), ['/'])
|
||||
self.assertEqual(
|
||||
get_prefixes('/a/b/c'),
|
||||
[os.path.normpath(path) for path in ['/a/b/c', '/a/b', '/a', '/']])
|
||||
self.assertEqual(get_prefixes('/'), [os.path.normpath('/')])
|
||||
self.assertEqual(get_prefixes('a'), ['a'])
|
||||
|
||||
class PathSurgeryTest(unittest.TestCase):
|
||||
|
|
|
|||
|
|
@ -4,9 +4,9 @@ from __future__ import print_function
|
|||
|
||||
import argparse
|
||||
import errno
|
||||
import json
|
||||
import os
|
||||
import shutil
|
||||
import stat
|
||||
import tempfile
|
||||
import unittest
|
||||
|
||||
|
|
@ -17,6 +17,7 @@ import six
|
|||
from acme import challenges
|
||||
|
||||
from certbot import achallenges
|
||||
from certbot import compat
|
||||
from certbot import errors
|
||||
from certbot.display import util as display_util
|
||||
|
||||
|
|
@ -142,6 +143,7 @@ class AuthenticatorTest(unittest.TestCase):
|
|||
self.assertRaises(errors.PluginError, self.auth.perform, [])
|
||||
os.chmod(self.path, 0o700)
|
||||
|
||||
@test_util.skip_on_windows('On Windows, there is no chown.')
|
||||
@mock.patch("certbot.plugins.webroot.os.chown")
|
||||
def test_failed_chown(self, mock_chown):
|
||||
mock_chown.side_effect = OSError(errno.EACCES, "msg")
|
||||
|
|
@ -169,16 +171,14 @@ class AuthenticatorTest(unittest.TestCase):
|
|||
# Remove exec bit from permission check, so that it
|
||||
# matches the file
|
||||
self.auth.perform([self.achall])
|
||||
path_permissions = stat.S_IMODE(os.stat(self.validation_path).st_mode)
|
||||
self.assertEqual(path_permissions, 0o644)
|
||||
self.assertTrue(compat.compare_file_modes(os.stat(self.validation_path).st_mode, 0o644))
|
||||
|
||||
# Check permissions of the directories
|
||||
|
||||
for dirpath, dirnames, _ in os.walk(self.path):
|
||||
for directory in dirnames:
|
||||
full_path = os.path.join(dirpath, directory)
|
||||
dir_permissions = stat.S_IMODE(os.stat(full_path).st_mode)
|
||||
self.assertEqual(dir_permissions, 0o755)
|
||||
self.assertTrue(compat.compare_file_modes(os.stat(full_path).st_mode, 0o755))
|
||||
|
||||
parent_gid = os.stat(self.path).st_gid
|
||||
parent_uid = os.stat(self.path).st_uid
|
||||
|
|
@ -274,7 +274,7 @@ class WebrootActionTest(unittest.TestCase):
|
|||
|
||||
def test_webroot_map_action(self):
|
||||
args = self.parser.parse_args(
|
||||
["--webroot-map", '{{"thing.com":"{0}"}}'.format(self.path)])
|
||||
["--webroot-map", json.dumps({'thing.com': self.path})])
|
||||
self.assertEqual(args.webroot_map["thing.com"], self.path)
|
||||
|
||||
def test_domain_before_webroot(self):
|
||||
|
|
|
|||
|
|
@ -301,7 +301,7 @@ def renew_cert(config, domains, le_client, lineage):
|
|||
domains = lineage.names()
|
||||
# The private key is the existing lineage private key if reuse_key is set.
|
||||
# Otherwise, generate a fresh private key by passing None.
|
||||
new_key = lineage.privkey if config.reuse_key else None
|
||||
new_key = os.path.normpath(lineage.privkey) if config.reuse_key else None
|
||||
new_cert, new_chain, new_key, _ = le_client.obtain_certificate(domains, new_key)
|
||||
if config.dry_run:
|
||||
logger.debug("Dry run: skipping updating lineage at %s",
|
||||
|
|
|
|||
|
|
@ -214,6 +214,26 @@ def get_link_target(link):
|
|||
target = os.path.join(os.path.dirname(link), target)
|
||||
return os.path.abspath(target)
|
||||
|
||||
def _write_live_readme_to(readme_path, is_base_dir=False):
|
||||
prefix = ""
|
||||
if is_base_dir:
|
||||
prefix = "[cert name]/"
|
||||
with open(readme_path, "w") as f:
|
||||
logger.debug("Writing README to %s.", readme_path)
|
||||
f.write("This directory contains your keys and certificates.\n\n"
|
||||
"`{prefix}privkey.pem` : the private key for your certificate.\n"
|
||||
"`{prefix}fullchain.pem`: the certificate file used in most server software.\n"
|
||||
"`{prefix}chain.pem` : used for OCSP stapling in Nginx >=1.3.7.\n"
|
||||
"`{prefix}cert.pem` : will break many server configurations, and "
|
||||
"should not be used\n"
|
||||
" without reading further documentation (see link below).\n\n"
|
||||
"WARNING: DO NOT MOVE OR RENAME THESE FILES!\n"
|
||||
" Certbot expects these files to remain in this location in order\n"
|
||||
" to function properly!\n\n"
|
||||
"We recommend not moving these files. For more information, see the Certbot\n"
|
||||
"User Guide at https://certbot.eff.org/docs/using.html#where-are-my-"
|
||||
"certificates.\n".format(prefix=prefix))
|
||||
|
||||
|
||||
def _relevant(option):
|
||||
"""
|
||||
|
|
@ -1003,6 +1023,9 @@ class RenewableCert(object):
|
|||
logger.debug("Creating directory %s.", i)
|
||||
config_file, config_filename = util.unique_lineage_name(
|
||||
cli_config.renewal_configs_dir, lineagename)
|
||||
base_readme_path = os.path.join(cli_config.live_dir, README)
|
||||
if not os.path.exists(base_readme_path):
|
||||
_write_live_readme_to(base_readme_path, is_base_dir=True)
|
||||
|
||||
# Determine where on disk everything will go
|
||||
# lineagename will now potentially be modified based on which
|
||||
|
|
@ -1045,21 +1068,7 @@ class RenewableCert(object):
|
|||
|
||||
# Write a README file to the live directory
|
||||
readme_path = os.path.join(live_dir, README)
|
||||
with open(readme_path, "w") as f:
|
||||
logger.debug("Writing README to %s.", readme_path)
|
||||
f.write("This directory contains your keys and certificates.\n\n"
|
||||
"`privkey.pem` : the private key for your certificate.\n"
|
||||
"`fullchain.pem`: the certificate file used in most server software.\n"
|
||||
"`chain.pem` : used for OCSP stapling in Nginx >=1.3.7.\n"
|
||||
"`cert.pem` : will break many server configurations, and "
|
||||
"should not be used\n"
|
||||
" without reading further documentation (see link below).\n\n"
|
||||
"WARNING: DO NOT MOVE THESE FILES!\n"
|
||||
" Certbot expects these files to remain in this location in order\n"
|
||||
" to function properly!\n\n"
|
||||
"We recommend not moving these files. For more information, see the Certbot\n"
|
||||
"User Guide at https://certbot.eff.org/docs/using.html#where-are-my-"
|
||||
"certificates.\n")
|
||||
_write_live_readme_to(readme_path)
|
||||
|
||||
# Document what we've done in a new renewal config file
|
||||
config_file.close()
|
||||
|
|
|
|||
|
|
@ -116,6 +116,7 @@ class AccountFileStorageTest(test_util.ConfigTestCase):
|
|||
def test_init_creates_dir(self):
|
||||
self.assertTrue(os.path.isdir(self.config.accounts_dir))
|
||||
|
||||
@test_util.broken_on_windows
|
||||
def test_save_and_restore(self):
|
||||
self.storage.save(self.acc, self.mock_client)
|
||||
account_path = os.path.join(self.config.accounts_dir, self.acc.id)
|
||||
|
|
@ -218,12 +219,14 @@ class AccountFileStorageTest(test_util.ConfigTestCase):
|
|||
self._set_server('https://acme-staging.api.letsencrypt.org/directory')
|
||||
self.assertEqual([], self.storage.find_all())
|
||||
|
||||
@test_util.broken_on_windows
|
||||
def test_upgrade_version_staging(self):
|
||||
self._set_server('https://acme-staging.api.letsencrypt.org/directory')
|
||||
self.storage.save(self.acc, self.mock_client)
|
||||
self._set_server('https://acme-staging-v02.api.letsencrypt.org/directory')
|
||||
self.assertEqual([self.acc], self.storage.find_all())
|
||||
|
||||
@test_util.broken_on_windows
|
||||
def test_upgrade_version_production(self):
|
||||
self._set_server('https://acme-v01.api.letsencrypt.org/directory')
|
||||
self.storage.save(self.acc, self.mock_client)
|
||||
|
|
@ -241,6 +244,7 @@ class AccountFileStorageTest(test_util.ConfigTestCase):
|
|||
self._set_server('https://acme-staging-v02.api.letsencrypt.org/directory')
|
||||
self.assertEqual([], self.storage.find_all())
|
||||
|
||||
@test_util.broken_on_windows
|
||||
def test_upgrade_load(self):
|
||||
self._set_server('https://acme-staging.api.letsencrypt.org/directory')
|
||||
self.storage.save(self.acc, self.mock_client)
|
||||
|
|
@ -249,6 +253,7 @@ class AccountFileStorageTest(test_util.ConfigTestCase):
|
|||
account = self.storage.load(self.acc.id)
|
||||
self.assertEqual(prev_account, account)
|
||||
|
||||
@test_util.broken_on_windows
|
||||
def test_upgrade_load_single_account(self):
|
||||
self._set_server('https://acme-staging.api.letsencrypt.org/directory')
|
||||
self.storage.save(self.acc, self.mock_client)
|
||||
|
|
@ -273,6 +278,7 @@ class AccountFileStorageTest(test_util.ConfigTestCase):
|
|||
errors.AccountStorageError, self.storage.save,
|
||||
self.acc, self.mock_client)
|
||||
|
||||
@test_util.broken_on_windows
|
||||
def test_delete(self):
|
||||
self.storage.save(self.acc, self.mock_client)
|
||||
self.storage.delete(self.acc.id)
|
||||
|
|
@ -307,10 +313,12 @@ class AccountFileStorageTest(test_util.ConfigTestCase):
|
|||
self._set_server('https://acme-staging-v02.api.letsencrypt.org/directory')
|
||||
self.assertRaises(errors.AccountNotFound, self.storage.load, self.acc.id)
|
||||
|
||||
@test_util.broken_on_windows
|
||||
def test_delete_folders_up(self):
|
||||
self._test_delete_folders('https://acme-staging.api.letsencrypt.org/directory')
|
||||
self._assert_symlinked_account_removed()
|
||||
|
||||
@test_util.broken_on_windows
|
||||
def test_delete_folders_down(self):
|
||||
self._test_delete_folders('https://acme-staging-v02.api.letsencrypt.org/directory')
|
||||
self._assert_symlinked_account_removed()
|
||||
|
|
@ -320,10 +328,12 @@ class AccountFileStorageTest(test_util.ConfigTestCase):
|
|||
with open(os.path.join(self.config.accounts_dir, 'foo'), 'w') as f:
|
||||
f.write('bar')
|
||||
|
||||
@test_util.broken_on_windows
|
||||
def test_delete_shared_account_up(self):
|
||||
self._set_server_and_stop_symlink('https://acme-staging-v02.api.letsencrypt.org/directory')
|
||||
self._test_delete_folders('https://acme-staging.api.letsencrypt.org/directory')
|
||||
|
||||
@test_util.broken_on_windows
|
||||
def test_delete_shared_account_down(self):
|
||||
self._set_server_and_stop_symlink('https://acme-staging-v02.api.letsencrypt.org/directory')
|
||||
self._test_delete_folders('https://acme-staging-v02.api.letsencrypt.org/directory')
|
||||
|
|
|
|||
|
|
@ -21,6 +21,7 @@ HTTP01 = challenges.HTTP01(
|
|||
TLSSNI01 = challenges.TLSSNI01(
|
||||
token=jose.b64decode(b"evaGxfADs6pSRb2LAv9IZf17Dt3juxGJyPCt92wrDoA"))
|
||||
DNS01 = challenges.DNS01(token=b"17817c66b60ce2e4012dfad92657527a")
|
||||
DNS01_2 = challenges.DNS01(token=b"cafecafecafecafecafecafe0feedbac")
|
||||
|
||||
CHALLENGES = [HTTP01, TLSSNI01, DNS01]
|
||||
|
||||
|
|
@ -49,6 +50,7 @@ def chall_to_challb(chall, status): # pylint: disable=redefined-outer-name
|
|||
TLSSNI01_P = chall_to_challb(TLSSNI01, messages.STATUS_PENDING)
|
||||
HTTP01_P = chall_to_challb(HTTP01, messages.STATUS_PENDING)
|
||||
DNS01_P = chall_to_challb(DNS01, messages.STATUS_PENDING)
|
||||
DNS01_P_2 = chall_to_challb(DNS01_2, messages.STATUS_PENDING)
|
||||
|
||||
CHALLENGES_P = [HTTP01_P, TLSSNI01_P, DNS01_P]
|
||||
|
||||
|
|
@ -57,6 +59,7 @@ CHALLENGES_P = [HTTP01_P, TLSSNI01_P, DNS01_P]
|
|||
HTTP01_A = auth_handler.challb_to_achall(HTTP01_P, JWK, "example.com")
|
||||
TLSSNI01_A = auth_handler.challb_to_achall(TLSSNI01_P, JWK, "example.net")
|
||||
DNS01_A = auth_handler.challb_to_achall(DNS01_P, JWK, "example.org")
|
||||
DNS01_A_2 = auth_handler.challb_to_achall(DNS01_P_2, JWK, "esimerkki.example.org")
|
||||
|
||||
ACHALLENGES = [HTTP01_A, TLSSNI01_A, DNS01_A]
|
||||
|
||||
|
|
|
|||
|
|
@ -204,7 +204,7 @@ class CertificatesTest(BaseCertManagerTest):
|
|||
shutil.rmtree(empty_tempdir)
|
||||
|
||||
@mock.patch('certbot.cert_manager.ocsp.RevocationChecker.ocsp_revoked')
|
||||
def test_report_human_readable(self, mock_revoked):
|
||||
def test_report_human_readable(self, mock_revoked): #pylint: disable=too-many-statements
|
||||
mock_revoked.return_value = None
|
||||
from certbot import cert_manager
|
||||
import datetime, pytz
|
||||
|
|
@ -228,7 +228,7 @@ class CertificatesTest(BaseCertManagerTest):
|
|||
cert.target_expiry += datetime.timedelta(hours=2)
|
||||
# pylint: disable=protected-access
|
||||
out = get_report()
|
||||
self.assertTrue('1 hour(s)' in out)
|
||||
self.assertTrue('1 hour(s)' in out or '2 hour(s)' in out)
|
||||
self.assertTrue('VALID' in out and not 'INVALID' in out)
|
||||
|
||||
cert.target_expiry += datetime.timedelta(days=1)
|
||||
|
|
@ -589,7 +589,7 @@ class GetCertnameTest(unittest.TestCase):
|
|||
from certbot import cert_manager
|
||||
prompt = "Which certificate would you"
|
||||
self.mock_get_utility().menu.return_value = (display_util.OK, 0)
|
||||
self.assertEquals(
|
||||
self.assertEqual(
|
||||
cert_manager.get_certnames(
|
||||
self.config, "verb", allow_multiple=False), ['example.com'])
|
||||
self.assertTrue(
|
||||
|
|
@ -603,11 +603,11 @@ class GetCertnameTest(unittest.TestCase):
|
|||
from certbot import cert_manager
|
||||
prompt = "custom prompt"
|
||||
self.mock_get_utility().menu.return_value = (display_util.OK, 0)
|
||||
self.assertEquals(
|
||||
self.assertEqual(
|
||||
cert_manager.get_certnames(
|
||||
self.config, "verb", allow_multiple=False, custom_prompt=prompt),
|
||||
['example.com'])
|
||||
self.assertEquals(self.mock_get_utility().menu.call_args[0][0],
|
||||
self.assertEqual(self.mock_get_utility().menu.call_args[0][0],
|
||||
prompt)
|
||||
|
||||
@mock.patch('certbot.storage.renewal_conf_files')
|
||||
|
|
@ -631,7 +631,7 @@ class GetCertnameTest(unittest.TestCase):
|
|||
prompt = "Which certificate(s) would you"
|
||||
self.mock_get_utility().checklist.return_value = (display_util.OK,
|
||||
['example.com'])
|
||||
self.assertEquals(
|
||||
self.assertEqual(
|
||||
cert_manager.get_certnames(
|
||||
self.config, "verb", allow_multiple=True), ['example.com'])
|
||||
self.assertTrue(
|
||||
|
|
@ -646,11 +646,11 @@ class GetCertnameTest(unittest.TestCase):
|
|||
prompt = "custom prompt"
|
||||
self.mock_get_utility().checklist.return_value = (display_util.OK,
|
||||
['example.com'])
|
||||
self.assertEquals(
|
||||
self.assertEqual(
|
||||
cert_manager.get_certnames(
|
||||
self.config, "verb", allow_multiple=True, custom_prompt=prompt),
|
||||
['example.com'])
|
||||
self.assertEquals(
|
||||
self.assertEqual(
|
||||
self.mock_get_utility().checklist.call_args[0][0],
|
||||
prompt)
|
||||
|
||||
|
|
|
|||
|
|
@ -76,6 +76,7 @@ class ParseTest(unittest.TestCase): # pylint: disable=too-many-public-methods
|
|||
|
||||
return output.getvalue()
|
||||
|
||||
@test_util.broken_on_windows
|
||||
@mock.patch("certbot.cli.flag_default")
|
||||
def test_cli_ini_domains(self, mock_flag_default):
|
||||
tmp_config = tempfile.NamedTemporaryFile()
|
||||
|
|
|
|||
|
|
@ -487,7 +487,7 @@ class EnhanceConfigTest(ClientTestCommon):
|
|||
self.config.hsts = True
|
||||
self._test_with_already_existing()
|
||||
self.assertTrue(mock_log.warning.called)
|
||||
self.assertEquals(mock_log.warning.call_args[0][1],
|
||||
self.assertEqual(mock_log.warning.call_args[0][1],
|
||||
'Strict-Transport-Security')
|
||||
|
||||
@mock.patch("certbot.client.logger")
|
||||
|
|
@ -495,7 +495,7 @@ class EnhanceConfigTest(ClientTestCommon):
|
|||
self.config.redirect = True
|
||||
self._test_with_already_existing()
|
||||
self.assertTrue(mock_log.warning.called)
|
||||
self.assertEquals(mock_log.warning.call_args[0][1],
|
||||
self.assertEqual(mock_log.warning.call_args[0][1],
|
||||
'redirect')
|
||||
|
||||
def test_no_ask_hsts(self):
|
||||
|
|
|
|||
|
|
@ -140,7 +140,7 @@ class ImportCSRFileTest(unittest.TestCase):
|
|||
util.CSR(file=csrfile,
|
||||
data=data_pem,
|
||||
form="pem"),
|
||||
["Example.com"],),
|
||||
["Example.com"]),
|
||||
self._call(csrfile, data))
|
||||
|
||||
def test_pem_csr(self):
|
||||
|
|
@ -376,7 +376,6 @@ class NotAfterTest(unittest.TestCase):
|
|||
|
||||
class Sha256sumTest(unittest.TestCase):
|
||||
"""Tests for certbot.crypto_util.notAfter"""
|
||||
|
||||
def test_sha256sum(self):
|
||||
from certbot.crypto_util import sha256sum
|
||||
self.assertEqual(sha256sum(CERT_PATH),
|
||||
|
|
|
|||
|
|
@ -1,6 +1,9 @@
|
|||
"""Test certbot.display.completer."""
|
||||
import os
|
||||
import readline
|
||||
try:
|
||||
import readline # pylint: disable=import-error
|
||||
except ImportError:
|
||||
import certbot.display.dummy_readline as readline # type: ignore
|
||||
import string
|
||||
import sys
|
||||
import unittest
|
||||
|
|
@ -9,9 +12,9 @@ import mock
|
|||
from six.moves import reload_module # pylint: disable=import-error
|
||||
|
||||
from acme.magic_typing import List # pylint: disable=unused-import, no-name-in-module
|
||||
from certbot.tests.util import TempDirTestCase
|
||||
import certbot.tests.util as test_util
|
||||
|
||||
class CompleterTest(TempDirTestCase):
|
||||
class CompleterTest(test_util.TempDirTestCase):
|
||||
"""Test certbot.display.completer.Completer."""
|
||||
|
||||
def setUp(self):
|
||||
|
|
@ -47,6 +50,8 @@ class CompleterTest(TempDirTestCase):
|
|||
completion = my_completer.complete(self.tempdir, num_paths)
|
||||
self.assertEqual(completion, None)
|
||||
|
||||
@unittest.skipIf('readline' not in sys.modules,
|
||||
reason='Not relevant if readline is not available.')
|
||||
def test_import_error(self):
|
||||
original_readline = sys.modules['readline']
|
||||
sys.modules['readline'] = None
|
||||
|
|
@ -91,7 +96,7 @@ class CompleterTest(TempDirTestCase):
|
|||
|
||||
def enable_tab_completion(unused_command):
|
||||
"""Enables readline tab completion using the system specific syntax."""
|
||||
libedit = 'libedit' in readline.__doc__
|
||||
libedit = readline.__doc__ is not None and 'libedit' in readline.__doc__
|
||||
command = 'bind ^I rl_complete' if libedit else 'tab: complete'
|
||||
readline.parse_and_bind(command)
|
||||
|
||||
|
|
|
|||
|
|
@ -502,9 +502,9 @@ class ChooseValuesTest(unittest.TestCase):
|
|||
items = ["first", "second", "third"]
|
||||
mock_util().checklist.return_value = (display_util.OK, [items[2]])
|
||||
result = self._call(items, None)
|
||||
self.assertEquals(result, [items[2]])
|
||||
self.assertEqual(result, [items[2]])
|
||||
self.assertTrue(mock_util().checklist.called)
|
||||
self.assertEquals(mock_util().checklist.call_args[0][0], None)
|
||||
self.assertEqual(mock_util().checklist.call_args[0][0], None)
|
||||
|
||||
@test_util.patch_get_utility("certbot.display.ops.z_util")
|
||||
def test_choose_names_success_question(self, mock_util):
|
||||
|
|
@ -512,9 +512,9 @@ class ChooseValuesTest(unittest.TestCase):
|
|||
question = "Which one?"
|
||||
mock_util().checklist.return_value = (display_util.OK, [items[1]])
|
||||
result = self._call(items, question)
|
||||
self.assertEquals(result, [items[1]])
|
||||
self.assertEqual(result, [items[1]])
|
||||
self.assertTrue(mock_util().checklist.called)
|
||||
self.assertEquals(mock_util().checklist.call_args[0][0], question)
|
||||
self.assertEqual(mock_util().checklist.call_args[0][0], question)
|
||||
|
||||
@test_util.patch_get_utility("certbot.display.ops.z_util")
|
||||
def test_choose_names_user_cancel(self, mock_util):
|
||||
|
|
@ -522,9 +522,9 @@ class ChooseValuesTest(unittest.TestCase):
|
|||
question = "Want to cancel?"
|
||||
mock_util().checklist.return_value = (display_util.CANCEL, [])
|
||||
result = self._call(items, question)
|
||||
self.assertEquals(result, [])
|
||||
self.assertEqual(result, [])
|
||||
self.assertTrue(mock_util().checklist.called)
|
||||
self.assertEquals(mock_util().checklist.call_args[0][0], question)
|
||||
self.assertEqual(mock_util().checklist.call_args[0][0], question)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
|
|
|||
|
|
@ -1,6 +1,5 @@
|
|||
"""Test :mod:`certbot.display.util`."""
|
||||
import inspect
|
||||
import os
|
||||
import socket
|
||||
import tempfile
|
||||
import unittest
|
||||
|
|
@ -10,7 +9,6 @@ import mock
|
|||
|
||||
from certbot import errors
|
||||
from certbot import interfaces
|
||||
|
||||
from certbot.display import util as display_util
|
||||
|
||||
|
||||
|
|
@ -281,10 +279,10 @@ class FileOutputDisplayTest(unittest.TestCase):
|
|||
msg = ("This is just a weak test{0}"
|
||||
"This function is only meant to be for easy viewing{0}"
|
||||
"Test a really really really really really really really really "
|
||||
"really really really really long line...".format(os.linesep))
|
||||
"really really really really long line...".format('\n'))
|
||||
text = display_util._wrap_lines(msg)
|
||||
|
||||
self.assertEqual(text.count(os.linesep), 3)
|
||||
self.assertEqual(text.count('\n'), 3)
|
||||
|
||||
def test_get_valid_int_ans_valid(self):
|
||||
# pylint: disable=protected-access
|
||||
|
|
|
|||
|
|
@ -10,6 +10,7 @@ import mock
|
|||
from acme.magic_typing import Callable, Dict, Union
|
||||
# pylint: enable=unused-import, no-name-in-module
|
||||
|
||||
import certbot.tests.util as test_util
|
||||
|
||||
def get_signals(signums):
|
||||
"""Get the handlers for an iterable of signums."""
|
||||
|
|
@ -65,6 +66,8 @@ class ErrorHandlerTest(unittest.TestCase):
|
|||
self.init_func.assert_called_once_with(*self.init_args,
|
||||
**self.init_kwargs)
|
||||
|
||||
# On Windows, this test kills pytest itself !
|
||||
@test_util.broken_on_windows
|
||||
def test_context_manager_with_signal(self):
|
||||
init_signals = get_signals(self.signals)
|
||||
with signal_receiver(self.signals) as signals_received:
|
||||
|
|
@ -95,6 +98,8 @@ class ErrorHandlerTest(unittest.TestCase):
|
|||
**self.init_kwargs)
|
||||
bad_func.assert_called_once_with()
|
||||
|
||||
# On Windows, this test kills pytest itself !
|
||||
@test_util.broken_on_windows
|
||||
def test_bad_recovery_with_signal(self):
|
||||
sig1 = self.signals[0]
|
||||
sig2 = self.signals[-1]
|
||||
|
|
@ -144,5 +149,10 @@ class ExitHandlerTest(ErrorHandlerTest):
|
|||
**self.init_kwargs)
|
||||
func.assert_called_once_with()
|
||||
|
||||
# On Windows, this test kills pytest itself !
|
||||
@test_util.broken_on_windows
|
||||
def test_bad_recovery_with_signal(self):
|
||||
super(ExitHandlerTest, self).test_bad_recovery_with_signal()
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main() # pragma: no cover
|
||||
|
|
|
|||
|
|
@ -37,6 +37,7 @@ class ValidateHookTest(util.TempDirTestCase):
|
|||
from certbot.hooks import validate_hook
|
||||
return validate_hook(*args, **kwargs)
|
||||
|
||||
@util.broken_on_windows
|
||||
def test_not_executable(self):
|
||||
file_path = os.path.join(self.tempdir, "foo")
|
||||
# create a non-executable file
|
||||
|
|
|
|||
|
|
@ -10,6 +10,7 @@ from certbot import errors
|
|||
from certbot.tests import util as test_util
|
||||
|
||||
|
||||
@test_util.broken_on_windows
|
||||
class LockDirTest(test_util.TempDirTestCase):
|
||||
"""Tests for certbot.lock.lock_dir."""
|
||||
@classmethod
|
||||
|
|
@ -24,6 +25,7 @@ class LockDirTest(test_util.TempDirTestCase):
|
|||
test_util.lock_and_call(assert_raises, lock_path)
|
||||
|
||||
|
||||
@test_util.broken_on_windows
|
||||
class LockFileTest(test_util.TempDirTestCase):
|
||||
"""Tests for certbot.lock.LockFile."""
|
||||
@classmethod
|
||||
|
|
|
|||
|
|
@ -12,6 +12,7 @@ import six
|
|||
from acme import messages
|
||||
from acme.magic_typing import Optional # pylint: disable=unused-import, no-name-in-module
|
||||
|
||||
from certbot import compat
|
||||
from certbot import constants
|
||||
from certbot import errors
|
||||
from certbot import util
|
||||
|
|
@ -259,7 +260,7 @@ class TempHandlerTest(unittest.TestCase):
|
|||
|
||||
def test_permissions(self):
|
||||
self.assertTrue(
|
||||
util.check_permissions(self.handler.path, 0o600, os.getuid()))
|
||||
util.check_permissions(self.handler.path, 0o600, compat.os_geteuid()))
|
||||
|
||||
def test_delete(self):
|
||||
self.handler.close()
|
||||
|
|
|
|||
|
|
@ -4,6 +4,7 @@
|
|||
from __future__ import print_function
|
||||
|
||||
import itertools
|
||||
import json
|
||||
import mock
|
||||
import os
|
||||
import shutil
|
||||
|
|
@ -11,6 +12,8 @@ import traceback
|
|||
import unittest
|
||||
import datetime
|
||||
import pytz
|
||||
import tempfile
|
||||
import sys
|
||||
|
||||
import josepy as jose
|
||||
import six
|
||||
|
|
@ -588,12 +591,14 @@ class MainTest(test_util.ConfigTestCase): # pylint: disable=too-many-public-met
|
|||
self.assertTrue(message in str(exc))
|
||||
self.assertTrue(exc is not None)
|
||||
|
||||
@test_util.broken_on_windows
|
||||
def test_noninteractive(self):
|
||||
args = ['-n', 'certonly']
|
||||
self._cli_missing_flag(args, "specify a plugin")
|
||||
args.extend(['--standalone', '-d', 'eg.is'])
|
||||
self._cli_missing_flag(args, "register before running")
|
||||
|
||||
@test_util.broken_on_windows
|
||||
@mock.patch('certbot.main._report_new_cert')
|
||||
@mock.patch('certbot.main.client.acme_client.Client')
|
||||
@mock.patch('certbot.main._determine_account')
|
||||
|
|
@ -635,43 +640,46 @@ class MainTest(test_util.ConfigTestCase): # pylint: disable=too-many-public-met
|
|||
@mock.patch('certbot.main.plug_sel.record_chosen_plugins')
|
||||
@mock.patch('certbot.main.plug_sel.pick_installer')
|
||||
def test_installer_certname(self, _inst, _rec, mock_install):
|
||||
mock_lineage = mock.MagicMock(cert_path="/tmp/cert", chain_path="/tmp/chain",
|
||||
fullchain_path="/tmp/chain",
|
||||
key_path="/tmp/privkey")
|
||||
mock_lineage = mock.MagicMock(cert_path=test_util.temp_join('cert'),
|
||||
chain_path=test_util.temp_join('chain'),
|
||||
fullchain_path=test_util.temp_join('chain'),
|
||||
key_path=test_util.temp_join('privkey'))
|
||||
|
||||
with mock.patch("certbot.cert_manager.lineage_for_certname") as mock_getlin:
|
||||
mock_getlin.return_value = mock_lineage
|
||||
self._call(['install', '--cert-name', 'whatever'], mockisfile=True)
|
||||
call_config = mock_install.call_args[0][0]
|
||||
self.assertEqual(call_config.cert_path, "/tmp/cert")
|
||||
self.assertEqual(call_config.fullchain_path, "/tmp/chain")
|
||||
self.assertEqual(call_config.key_path, "/tmp/privkey")
|
||||
self.assertEqual(call_config.cert_path, test_util.temp_join('cert'))
|
||||
self.assertEqual(call_config.fullchain_path, test_util.temp_join('chain'))
|
||||
self.assertEqual(call_config.key_path, test_util.temp_join('privkey'))
|
||||
|
||||
@test_util.broken_on_windows
|
||||
@mock.patch('certbot.main._install_cert')
|
||||
@mock.patch('certbot.main.plug_sel.record_chosen_plugins')
|
||||
@mock.patch('certbot.main.plug_sel.pick_installer')
|
||||
def test_installer_param_override(self, _inst, _rec, mock_install):
|
||||
mock_lineage = mock.MagicMock(cert_path="/tmp/cert", chain_path="/tmp/chain",
|
||||
fullchain_path="/tmp/chain",
|
||||
key_path="/tmp/privkey")
|
||||
mock_lineage = mock.MagicMock(cert_path=test_util.temp_join('cert'),
|
||||
chain_path=test_util.temp_join('chain'),
|
||||
fullchain_path=test_util.temp_join('chain'),
|
||||
key_path=test_util.temp_join('privkey'))
|
||||
with mock.patch("certbot.cert_manager.lineage_for_certname") as mock_getlin:
|
||||
mock_getlin.return_value = mock_lineage
|
||||
self._call(['install', '--cert-name', 'whatever',
|
||||
'--key-path', '/tmp/overriding_privkey'], mockisfile=True)
|
||||
'--key-path', test_util.temp_join('overriding_privkey')], mockisfile=True)
|
||||
call_config = mock_install.call_args[0][0]
|
||||
self.assertEqual(call_config.cert_path, "/tmp/cert")
|
||||
self.assertEqual(call_config.fullchain_path, "/tmp/chain")
|
||||
self.assertEqual(call_config.chain_path, "/tmp/chain")
|
||||
self.assertEqual(call_config.key_path, "/tmp/overriding_privkey")
|
||||
self.assertEqual(call_config.cert_path, test_util.temp_join('cert'))
|
||||
self.assertEqual(call_config.fullchain_path, test_util.temp_join('chain'))
|
||||
self.assertEqual(call_config.chain_path, test_util.temp_join('chain'))
|
||||
self.assertEqual(call_config.key_path, test_util.temp_join('overriding_privkey'))
|
||||
|
||||
mock_install.reset()
|
||||
|
||||
self._call(['install', '--cert-name', 'whatever',
|
||||
'--cert-path', '/tmp/overriding_cert'], mockisfile=True)
|
||||
'--cert-path', test_util.temp_join('overriding_cert')], mockisfile=True)
|
||||
call_config = mock_install.call_args[0][0]
|
||||
self.assertEqual(call_config.cert_path, "/tmp/overriding_cert")
|
||||
self.assertEqual(call_config.fullchain_path, "/tmp/chain")
|
||||
self.assertEqual(call_config.key_path, "/tmp/privkey")
|
||||
self.assertEqual(call_config.cert_path, test_util.temp_join('overriding_cert'))
|
||||
self.assertEqual(call_config.fullchain_path, test_util.temp_join('chain'))
|
||||
self.assertEqual(call_config.key_path, test_util.temp_join('privkey'))
|
||||
|
||||
@mock.patch('certbot.main.plug_sel.record_chosen_plugins')
|
||||
@mock.patch('certbot.main.plug_sel.pick_installer')
|
||||
|
|
@ -686,15 +694,17 @@ class MainTest(test_util.ConfigTestCase): # pylint: disable=too-many-public-met
|
|||
@mock.patch('certbot.cert_manager.get_certnames')
|
||||
@mock.patch('certbot.main._install_cert')
|
||||
def test_installer_select_cert(self, mock_inst, mock_getcert, _inst, _rec):
|
||||
mock_lineage = mock.MagicMock(cert_path="/tmp/cert", chain_path="/tmp/chain",
|
||||
fullchain_path="/tmp/chain",
|
||||
key_path="/tmp/privkey")
|
||||
mock_lineage = mock.MagicMock(cert_path=test_util.temp_join('cert'),
|
||||
chain_path=test_util.temp_join('chain'),
|
||||
fullchain_path=test_util.temp_join('chain'),
|
||||
key_path=test_util.temp_join('privkey'))
|
||||
with mock.patch("certbot.cert_manager.lineage_for_certname") as mock_getlin:
|
||||
mock_getlin.return_value = mock_lineage
|
||||
self._call(['install'], mockisfile=True)
|
||||
self.assertTrue(mock_getcert.called)
|
||||
self.assertTrue(mock_inst.called)
|
||||
|
||||
@test_util.broken_on_windows
|
||||
@mock.patch('certbot.main._report_new_cert')
|
||||
@mock.patch('certbot.util.exe_exists')
|
||||
def test_configurator_selection(self, mock_exe_exists, unused_report):
|
||||
|
|
@ -710,7 +720,8 @@ class MainTest(test_util.ConfigTestCase): # pylint: disable=too-many-public-met
|
|||
# ret, _, _, _ = self._call(args)
|
||||
# self.assertTrue("Too many flags setting" in ret)
|
||||
|
||||
args = ["install", "--nginx", "--cert-path", "/tmp/blah", "--key-path", "/tmp/blah",
|
||||
args = ["install", "--nginx", "--cert-path",
|
||||
test_util.temp_join('blah'), "--key-path", test_util.temp_join('blah'),
|
||||
"--nginx-server-root", "/nonexistent/thing", "-d",
|
||||
"example.com", "--debug"]
|
||||
if "nginx" in real_plugins:
|
||||
|
|
@ -733,6 +744,7 @@ class MainTest(test_util.ConfigTestCase): # pylint: disable=too-many-public-met
|
|||
self._call(["auth", "--standalone"])
|
||||
self.assertEqual(1, mock_certonly.call_count)
|
||||
|
||||
@test_util.broken_on_windows
|
||||
def test_rollback(self):
|
||||
_, _, _, client = self._call(['rollback'])
|
||||
self.assertEqual(1, client.rollback.call_count)
|
||||
|
|
@ -760,6 +772,7 @@ class MainTest(test_util.ConfigTestCase): # pylint: disable=too-many-public-met
|
|||
self._call_no_clientmock(['delete'])
|
||||
self.assertEqual(1, mock_cert_manager.call_count)
|
||||
|
||||
@test_util.broken_on_windows
|
||||
def test_plugins(self):
|
||||
flags = ['--init', '--prepare', '--authenticators', '--installers']
|
||||
for args in itertools.chain(
|
||||
|
|
@ -1014,7 +1027,8 @@ class MainTest(test_util.ConfigTestCase): # pylint: disable=too-many-public-met
|
|||
# The location of the previous live privkey.pem is passed
|
||||
# to obtain_certificate
|
||||
mock_client.obtain_certificate.assert_called_once_with(['isnot.org'],
|
||||
os.path.join(self.config.config_dir, "live/sample-renewal/privkey.pem"))
|
||||
os.path.normpath(os.path.join(
|
||||
self.config.config_dir, "live/sample-renewal/privkey.pem")))
|
||||
else:
|
||||
mock_client.obtain_certificate.assert_called_once_with(['isnot.org'], None)
|
||||
else:
|
||||
|
|
@ -1039,6 +1053,7 @@ class MainTest(test_util.ConfigTestCase): # pylint: disable=too-many-public-met
|
|||
self.assertTrue('fullchain.pem' in cert_msg)
|
||||
self.assertTrue('donate' in get_utility().add_message.call_args[0][0])
|
||||
|
||||
@test_util.broken_on_windows
|
||||
@mock.patch('certbot.crypto_util.notAfter')
|
||||
def test_certonly_renewal_triggers(self, unused_notafter):
|
||||
# --dry-run should force renewal
|
||||
|
|
@ -1087,6 +1102,7 @@ class MainTest(test_util.ConfigTestCase): # pylint: disable=too-many-public-met
|
|||
self.assertTrue('No renewals were attempted.' in stdout.getvalue())
|
||||
self.assertTrue('The following certs are not due for renewal yet:' in stdout.getvalue())
|
||||
|
||||
@test_util.broken_on_windows
|
||||
def test_quiet_renew(self):
|
||||
test_util.make_lineage(self.config.config_dir, 'sample-renewal.conf')
|
||||
args = ["renew", "--dry-run"]
|
||||
|
|
@ -1204,7 +1220,7 @@ class MainTest(test_util.ConfigTestCase): # pylint: disable=too-many-public-met
|
|||
renewalparams = {'authenticator': 'webroot'}
|
||||
self._test_renew_common(
|
||||
renewalparams=renewalparams, assert_oc_called=True,
|
||||
args=['renew', '--webroot-map', '{"example.com": "/tmp"}'])
|
||||
args=['renew', '--webroot-map', json.dumps({'example.com': tempfile.gettempdir()})])
|
||||
|
||||
def test_renew_reconstitute_error(self):
|
||||
# pylint: disable=protected-access
|
||||
|
|
@ -1234,7 +1250,9 @@ class MainTest(test_util.ConfigTestCase): # pylint: disable=too-many-public-met
|
|||
def test_no_renewal_with_hooks(self):
|
||||
_, _, stdout = self._test_renewal_common(
|
||||
due_for_renewal=False, extra_args=None, should_renew=False,
|
||||
args=['renew', '--post-hook', 'echo hello world'])
|
||||
args=['renew', '--post-hook',
|
||||
'{0} -c "from __future__ import print_function; print(\'hello world\');"'
|
||||
.format(sys.executable)])
|
||||
self.assertTrue('No hooks were run.' in stdout.getvalue())
|
||||
|
||||
@test_util.patch_get_utility()
|
||||
|
|
@ -1254,13 +1272,19 @@ class MainTest(test_util.ConfigTestCase): # pylint: disable=too-many-public-met
|
|||
chain = 'chain'
|
||||
mock_client = mock.MagicMock()
|
||||
mock_client.obtain_certificate_from_csr.return_value = (certr, chain)
|
||||
cert_path = '/etc/letsencrypt/live/example.com/cert_512.pem'
|
||||
full_path = '/etc/letsencrypt/live/example.com/fullchain.pem'
|
||||
cert_path = os.path.normpath(os.path.join(
|
||||
self.config.config_dir,
|
||||
'live/example.com/cert_512.pem'))
|
||||
full_path = os.path.normpath(os.path.join(
|
||||
self.config.config_dir,
|
||||
'live/example.com/fullchain.pem'))
|
||||
mock_client.save_certificate.return_value = cert_path, None, full_path
|
||||
with mock.patch('certbot.main._init_le_client') as mock_init:
|
||||
mock_init.return_value = mock_client
|
||||
with test_util.patch_get_utility() as mock_get_utility:
|
||||
chain_path = '/etc/letsencrypt/live/example.com/chain.pem'
|
||||
chain_path = os.path.normpath(os.path.join(
|
||||
self.config.config_dir,
|
||||
'live/example.com/chain.pem'))
|
||||
args = ('-a standalone certonly --csr {0} --cert-path {1} '
|
||||
'--chain-path {2} --fullchain-path {3}').format(
|
||||
CSR, cert_path, chain_path, full_path).split()
|
||||
|
|
@ -1334,6 +1358,7 @@ class MainTest(test_util.ConfigTestCase): # pylint: disable=too-many-public-met
|
|||
self._call(['-c', test_util.vector_path('cli.ini')])
|
||||
self.assertTrue(mocked_run.called)
|
||||
|
||||
@test_util.broken_on_windows
|
||||
def test_register(self):
|
||||
with mock.patch('certbot.main.client') as mocked_client:
|
||||
acc = mock.MagicMock()
|
||||
|
|
@ -1627,7 +1652,7 @@ class EnhanceTest(test_util.ConfigTestCase):
|
|||
mock_lineage.return_value = mock.MagicMock(chain_path="/tmp/nonexistent")
|
||||
self._call(['enhance', '--auto-hsts'])
|
||||
self.assertTrue(self.mockinstaller.enable_autohsts.called)
|
||||
self.assertEquals(self.mockinstaller.enable_autohsts.call_args[0][1],
|
||||
self.assertEqual(self.mockinstaller.enable_autohsts.call_args[0][1],
|
||||
["example.com", "another.tld"])
|
||||
|
||||
@mock.patch('certbot.cert_manager.lineage_for_certname')
|
||||
|
|
|
|||
|
|
@ -53,7 +53,7 @@ class RenewUpdaterTest(test_util.ConfigTestCase):
|
|||
self.config.dry_run = True
|
||||
updater.run_generic_updaters(self.config, None, None)
|
||||
self.assertTrue(mock_log.called)
|
||||
self.assertEquals(mock_log.call_args[0][0],
|
||||
self.assertEqual(mock_log.call_args[0][0],
|
||||
"Skipping updaters in dry-run mode.")
|
||||
|
||||
@mock.patch("certbot.updater.logger.debug")
|
||||
|
|
@ -61,7 +61,7 @@ class RenewUpdaterTest(test_util.ConfigTestCase):
|
|||
self.config.dry_run = True
|
||||
updater.run_renewal_deployer(self.config, None, None)
|
||||
self.assertTrue(mock_log.called)
|
||||
self.assertEquals(mock_log.call_args[0][0],
|
||||
self.assertEqual(mock_log.call_args[0][0],
|
||||
"Skipping renewal deployer in dry-run mode.")
|
||||
|
||||
@mock.patch('certbot.plugins.selection.get_unprepared_installer')
|
||||
|
|
|
|||
|
|
@ -50,6 +50,7 @@ class ReverterCheckpointLocalTest(test_util.ConfigTestCase):
|
|||
x = f.read()
|
||||
self.assertTrue("No changes" in x)
|
||||
|
||||
@test_util.broken_on_windows
|
||||
def test_basic_add_to_temp_checkpoint(self):
|
||||
# These shouldn't conflict even though they are both named config.txt
|
||||
self.reverter.add_to_temp_checkpoint(self.sets[0], "save1")
|
||||
|
|
@ -91,6 +92,7 @@ class ReverterCheckpointLocalTest(test_util.ConfigTestCase):
|
|||
self.assertRaises(errors.ReverterError, self.reverter.add_to_checkpoint,
|
||||
set([config3]), "invalid save")
|
||||
|
||||
@test_util.broken_on_windows
|
||||
def test_multiple_saves_and_temp_revert(self):
|
||||
self.reverter.add_to_temp_checkpoint(self.sets[0], "save1")
|
||||
update_file(self.config1, "updated-directive")
|
||||
|
|
@ -120,6 +122,7 @@ class ReverterCheckpointLocalTest(test_util.ConfigTestCase):
|
|||
self.assertFalse(os.path.isfile(config3))
|
||||
self.assertFalse(os.path.isfile(config4))
|
||||
|
||||
@test_util.broken_on_windows
|
||||
def test_multiple_registration_same_file(self):
|
||||
self.reverter.register_file_creation(True, self.config1)
|
||||
self.reverter.register_file_creation(True, self.config1)
|
||||
|
|
@ -144,6 +147,7 @@ class ReverterCheckpointLocalTest(test_util.ConfigTestCase):
|
|||
errors.ReverterError, self.reverter.register_file_creation,
|
||||
"filepath")
|
||||
|
||||
@test_util.broken_on_windows
|
||||
def test_register_undo_command(self):
|
||||
coms = [
|
||||
["a2dismod", "ssl"],
|
||||
|
|
@ -166,6 +170,7 @@ class ReverterCheckpointLocalTest(test_util.ConfigTestCase):
|
|||
errors.ReverterError, self.reverter.register_undo_command,
|
||||
True, ["command"])
|
||||
|
||||
@test_util.broken_on_windows
|
||||
@mock.patch("certbot.util.run_script")
|
||||
def test_run_undo_commands(self, mock_run):
|
||||
mock_run.side_effect = ["", errors.SubprocessError]
|
||||
|
|
@ -229,6 +234,7 @@ class ReverterCheckpointLocalTest(test_util.ConfigTestCase):
|
|||
self.assertRaises(
|
||||
errors.ReverterError, self.reverter.revert_temporary_config)
|
||||
|
||||
@test_util.broken_on_windows
|
||||
@mock.patch("certbot.reverter.logger.warning")
|
||||
def test_recover_checkpoint_missing_new_files(self, mock_warn):
|
||||
self.reverter.register_file_creation(
|
||||
|
|
@ -243,6 +249,7 @@ class ReverterCheckpointLocalTest(test_util.ConfigTestCase):
|
|||
self.assertRaises(
|
||||
errors.ReverterError, self.reverter.revert_temporary_config)
|
||||
|
||||
@test_util.broken_on_windows
|
||||
def test_recovery_routine_temp_and_perm(self):
|
||||
# Register a new perm checkpoint file
|
||||
config3 = os.path.join(self.dir1, "config3.txt")
|
||||
|
|
@ -306,6 +313,7 @@ class TestFullCheckpointsReverter(test_util.ConfigTestCase):
|
|||
self.assertRaises(
|
||||
errors.ReverterError, self.reverter.rollback_checkpoints, "one")
|
||||
|
||||
@test_util.broken_on_windows
|
||||
def test_rollback_finalize_checkpoint_valid_inputs(self):
|
||||
|
||||
config3 = self._setup_three_checkpoints()
|
||||
|
|
@ -357,6 +365,7 @@ class TestFullCheckpointsReverter(test_util.ConfigTestCase):
|
|||
self.assertRaises(
|
||||
errors.ReverterError, self.reverter.finalize_checkpoint, "Title")
|
||||
|
||||
@test_util.broken_on_windows
|
||||
@mock.patch("certbot.reverter.logger")
|
||||
def test_rollback_too_many(self, mock_logger):
|
||||
# Test no exist warning...
|
||||
|
|
@ -369,6 +378,7 @@ class TestFullCheckpointsReverter(test_util.ConfigTestCase):
|
|||
self.reverter.rollback_checkpoints(4)
|
||||
self.assertEqual(mock_logger.warning.call_count, 1)
|
||||
|
||||
@test_util.broken_on_windows
|
||||
def test_multi_rollback(self):
|
||||
config3 = self._setup_three_checkpoints()
|
||||
self.reverter.rollback_checkpoints(3)
|
||||
|
|
|
|||
|
|
@ -480,6 +480,7 @@ class RenewableCertTests(BaseRenewableCertTest):
|
|||
self.assertTrue(self.test_rc.should_autorenew())
|
||||
mock_ocsp.return_value = False
|
||||
|
||||
@test_util.broken_on_windows
|
||||
@mock.patch("certbot.storage.relevant_values")
|
||||
def test_save_successor(self, mock_rv):
|
||||
# Mock relevant_values() to claim that all values are relevant here
|
||||
|
|
@ -625,6 +626,8 @@ class RenewableCertTests(BaseRenewableCertTest):
|
|||
self.assertTrue(result._consistent())
|
||||
self.assertTrue(os.path.exists(os.path.join(
|
||||
self.config.renewal_configs_dir, "the-lineage.com.conf")))
|
||||
self.assertTrue(os.path.exists(os.path.join(
|
||||
self.config.live_dir, "README")))
|
||||
self.assertTrue(os.path.exists(os.path.join(
|
||||
self.config.live_dir, "the-lineage.com", "README")))
|
||||
with open(result.fullchain, "rb") as f:
|
||||
|
|
|
|||
|
|
@ -9,6 +9,8 @@ import pkg_resources
|
|||
import shutil
|
||||
import tempfile
|
||||
import unittest
|
||||
import sys
|
||||
import warnings
|
||||
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
from cryptography.hazmat.primitives import serialization
|
||||
|
|
@ -36,8 +38,15 @@ def vector_path(*names):
|
|||
def load_vector(*names):
|
||||
"""Load contents of a test vector."""
|
||||
# luckily, resource_string opens file in binary mode
|
||||
return pkg_resources.resource_string(
|
||||
data = pkg_resources.resource_string(
|
||||
__name__, os.path.join('testdata', *names))
|
||||
# Try at most to convert CRLF to LF when data is text
|
||||
try:
|
||||
return data.decode().replace('\r\n', '\n').encode()
|
||||
except ValueError:
|
||||
# Failed to process the file with standard encoding.
|
||||
# Most likely not a text file, return its bytes untouched.
|
||||
return data
|
||||
|
||||
|
||||
def _guess_loader(filename, loader_pem, loader_der):
|
||||
|
|
@ -314,10 +323,21 @@ class TempDirTestCase(unittest.TestCase):
|
|||
"""Base test class which sets up and tears down a temporary directory"""
|
||||
|
||||
def setUp(self):
|
||||
"""Execute before test"""
|
||||
self.tempdir = tempfile.mkdtemp()
|
||||
|
||||
def tearDown(self):
|
||||
shutil.rmtree(self.tempdir)
|
||||
"""Execute after test"""
|
||||
# Then we have various files which are not correctly closed at the time of tearDown.
|
||||
# On Windows, it is visible for the same reasons as above.
|
||||
# For know, we log them until a proper file close handling is written.
|
||||
def onerror_handler(_, path, excinfo):
|
||||
"""On error handler"""
|
||||
message = ('Following error occurred when deleting the tempdir {0}'
|
||||
' for path {1} during tearDown process: {2}'
|
||||
.format(self.tempdir, path, str(excinfo)))
|
||||
warnings.warn(message)
|
||||
shutil.rmtree(self.tempdir, onerror=onerror_handler)
|
||||
|
||||
class ConfigTestCase(TempDirTestCase):
|
||||
"""Test class which sets up a NamespaceConfig object.
|
||||
|
|
@ -378,3 +398,25 @@ def hold_lock(cv, lock_path): # pragma: no cover
|
|||
cv.notify()
|
||||
cv.wait()
|
||||
my_lock.release()
|
||||
|
||||
def skip_on_windows(reason):
|
||||
"""Decorator to skip permanently a test on Windows. A reason is required."""
|
||||
def wrapper(function):
|
||||
"""Wrapped version"""
|
||||
return unittest.skipIf(sys.platform == 'win32', reason)(function)
|
||||
return wrapper
|
||||
|
||||
def broken_on_windows(function):
|
||||
"""Decorator to skip temporarily a broken test on Windows."""
|
||||
reason = 'Test is broken and ignored on windows but should be fixed.'
|
||||
return unittest.skipIf(
|
||||
sys.platform == 'win32'
|
||||
and os.environ.get('SKIP_BROKEN_TESTS_ON_WINDOWS', 'true') == 'true',
|
||||
reason)(function)
|
||||
|
||||
def temp_join(path):
|
||||
"""
|
||||
Return the given path joined to the tempdir path for the current platform
|
||||
Eg.: 'cert' => /tmp/cert (Linux) or 'C:\\Users\\currentuser\\AppData\\Temp\\cert' (Windows)
|
||||
"""
|
||||
return os.path.join(tempfile.gettempdir(), path)
|
||||
|
|
|
|||
|
|
@ -3,7 +3,6 @@ import argparse
|
|||
import errno
|
||||
import os
|
||||
import shutil
|
||||
import stat
|
||||
import unittest
|
||||
|
||||
import mock
|
||||
|
|
@ -89,6 +88,7 @@ class LockDirUntilExit(test_util.TempDirTestCase):
|
|||
import certbot.util
|
||||
reload_module(certbot.util)
|
||||
|
||||
@test_util.broken_on_windows
|
||||
@mock.patch('certbot.util.logger')
|
||||
@mock.patch('certbot.util.atexit_register')
|
||||
def test_it(self, mock_register, mock_logger):
|
||||
|
|
@ -140,9 +140,9 @@ class MakeOrVerifyDirTest(test_util.TempDirTestCase):
|
|||
super(MakeOrVerifyDirTest, self).setUp()
|
||||
|
||||
self.path = os.path.join(self.tempdir, "foo")
|
||||
os.mkdir(self.path, 0o400)
|
||||
os.mkdir(self.path, 0o600)
|
||||
|
||||
self.uid = os.getuid()
|
||||
self.uid = compat.os_geteuid()
|
||||
|
||||
def _call(self, directory, mode):
|
||||
from certbot.util import make_or_verify_dir
|
||||
|
|
@ -152,14 +152,15 @@ class MakeOrVerifyDirTest(test_util.TempDirTestCase):
|
|||
path = os.path.join(self.tempdir, "bar")
|
||||
self._call(path, 0o650)
|
||||
self.assertTrue(os.path.isdir(path))
|
||||
self.assertEqual(stat.S_IMODE(os.stat(path).st_mode), 0o650)
|
||||
self.assertTrue(compat.compare_file_modes(os.stat(path).st_mode, 0o650))
|
||||
|
||||
def test_existing_correct_mode_does_not_fail(self):
|
||||
self._call(self.path, 0o400)
|
||||
self.assertEqual(stat.S_IMODE(os.stat(self.path).st_mode), 0o400)
|
||||
self._call(self.path, 0o600)
|
||||
self.assertTrue(compat.compare_file_modes(os.stat(self.path).st_mode, 0o600))
|
||||
|
||||
@test_util.skip_on_windows('Umask modes are mostly ignored on Windows.')
|
||||
def test_existing_wrong_mode_fails(self):
|
||||
self.assertRaises(errors.Error, self._call, self.path, 0o600)
|
||||
self.assertRaises(errors.Error, self._call, self.path, 0o400)
|
||||
|
||||
def test_reraises_os_error(self):
|
||||
with mock.patch.object(os, "makedirs") as makedirs:
|
||||
|
|
@ -178,7 +179,7 @@ class CheckPermissionsTest(test_util.TempDirTestCase):
|
|||
def setUp(self):
|
||||
super(CheckPermissionsTest, self).setUp()
|
||||
|
||||
self.uid = os.getuid()
|
||||
self.uid = compat.os_geteuid()
|
||||
|
||||
def _call(self, mode):
|
||||
from certbot.util import check_permissions
|
||||
|
|
@ -212,8 +213,8 @@ class UniqueFileTest(test_util.TempDirTestCase):
|
|||
self.assertEqual(open(name).read(), "bar")
|
||||
|
||||
def test_right_mode(self):
|
||||
self.assertEqual(0o700, os.stat(self._call(0o700)[1]).st_mode & 0o777)
|
||||
self.assertEqual(0o100, os.stat(self._call(0o100)[1]).st_mode & 0o777)
|
||||
self.assertTrue(compat.compare_file_modes(0o700, os.stat(self._call(0o700)[1]).st_mode))
|
||||
self.assertTrue(compat.compare_file_modes(0o600, os.stat(self._call(0o600)[1]).st_mode))
|
||||
|
||||
def test_default_exists(self):
|
||||
name1 = self._call()[1] # create 0000_foo.txt
|
||||
|
|
@ -513,17 +514,16 @@ class OsInfoTest(unittest.TestCase):
|
|||
|
||||
def test_systemd_os_release(self):
|
||||
from certbot.util import (get_os_info, get_systemd_os_info,
|
||||
get_os_info_ua)
|
||||
get_os_info_ua)
|
||||
|
||||
with mock.patch('os.path.isfile', return_value=True):
|
||||
self.assertEqual(get_os_info(
|
||||
test_util.vector_path("os-release"))[0], 'systemdos')
|
||||
self.assertEqual(get_os_info(
|
||||
test_util.vector_path("os-release"))[1], '42')
|
||||
self.assertEqual(get_systemd_os_info("/dev/null"), ("", ""))
|
||||
self.assertEqual(get_systemd_os_info(os.devnull), ("", ""))
|
||||
self.assertEqual(get_os_info_ua(
|
||||
test_util.vector_path("os-release")),
|
||||
"SystemdOS")
|
||||
test_util.vector_path("os-release")), "SystemdOS")
|
||||
with mock.patch('os.path.isfile', return_value=False):
|
||||
self.assertEqual(get_systemd_os_info(), ("", ""))
|
||||
|
||||
|
|
|
|||
|
|
@ -12,7 +12,6 @@ import platform
|
|||
import re
|
||||
import six
|
||||
import socket
|
||||
import stat
|
||||
import subprocess
|
||||
import sys
|
||||
|
||||
|
|
@ -21,6 +20,7 @@ from collections import OrderedDict
|
|||
import configargparse
|
||||
|
||||
from acme.magic_typing import Tuple, Union # pylint: disable=unused-import, no-name-in-module
|
||||
from certbot import compat
|
||||
from certbot import constants
|
||||
from certbot import errors
|
||||
from certbot import lock
|
||||
|
|
@ -204,7 +204,7 @@ def check_permissions(filepath, mode, uid=0):
|
|||
|
||||
"""
|
||||
file_stat = os.stat(filepath)
|
||||
return stat.S_IMODE(file_stat.st_mode) == mode and file_stat.st_uid == uid
|
||||
return compat.compare_file_modes(file_stat.st_mode, mode) and file_stat.st_uid == uid
|
||||
|
||||
|
||||
def safe_open(path, mode="w", chmod=None, buffering=None):
|
||||
|
|
|
|||
|
|
@ -9,6 +9,8 @@ Get Certbot
|
|||
About Certbot
|
||||
=============
|
||||
|
||||
*Certbot is meant to be run directly on a web server*, normally by a system administrator. In most cases, running Certbot on your personal computer is not a useful option. The instructions below relate to installing and running Certbot on a server.
|
||||
|
||||
Certbot is packaged for many common operating systems and web servers. Check whether
|
||||
``certbot`` (or ``letsencrypt``) is packaged for your web server's OS by visiting
|
||||
certbot.eff.org_, where you will also find the correct installation instructions for
|
||||
|
|
|
|||
|
|
@ -988,9 +988,6 @@ Getting help
|
|||
If you're having problems, we recommend posting on the Let's Encrypt
|
||||
`Community Forum <https://community.letsencrypt.org>`_.
|
||||
|
||||
You can also chat with us on IRC: `(#letsencrypt @
|
||||
freenode) <https://webchat.freenode.net?channels=%23letsencrypt>`_
|
||||
|
||||
If you find a bug in the software, please do report it in our `issue
|
||||
tracker <https://github.com/certbot/certbot/issues>`_. Remember to
|
||||
give us as much information as possible:
|
||||
|
|
|
|||
4
pytest.ini
Normal file
4
pytest.ini
Normal file
|
|
@ -0,0 +1,4 @@
|
|||
[pytest]
|
||||
addopts = --numprocesses auto --pyargs
|
||||
filterwarnings =
|
||||
error::DeprecationWarning
|
||||
|
|
@ -25,6 +25,7 @@ certbot_test_no_force_renew () {
|
|||
omit_patterns="*/*.egg-info/*,*/dns_common*,*/setup.py,*/test_*,*/tests/*"
|
||||
omit_patterns="$omit_patterns,*_test.py,*_test_*,certbot-apache/*"
|
||||
omit_patterns="$omit_patterns,certbot-compatibility-test/*,certbot-dns*/"
|
||||
omit_patterns="$omit_patterns,certbot-nginx/certbot_nginx/parser_obj.py"
|
||||
coverage run \
|
||||
--append \
|
||||
--source $sources \
|
||||
|
|
|
|||
|
|
@ -14,6 +14,7 @@ fi
|
|||
|
||||
temp_cwd=$(mktemp -d)
|
||||
trap "rm -rf $temp_cwd" EXIT
|
||||
cp pytest.ini "$temp_cwd"
|
||||
|
||||
set -x
|
||||
for requirement in "$@" ; do
|
||||
|
|
@ -24,6 +25,6 @@ for requirement in "$@" ; do
|
|||
pkg="certbot"
|
||||
fi
|
||||
cd "$temp_cwd"
|
||||
pytest --numprocesses auto --quiet --pyargs $pkg
|
||||
pytest --quiet $pkg
|
||||
cd -
|
||||
done
|
||||
|
|
|
|||
13
tox-win.ini
Normal file
13
tox-win.ini
Normal file
|
|
@ -0,0 +1,13 @@
|
|||
[tox]
|
||||
skipsdist = True
|
||||
envlist = py{34,35,36,37}-cover
|
||||
|
||||
[testenv]
|
||||
deps = -e acme[dev]
|
||||
-e .[dev]
|
||||
commands = pytest -n auto --pyargs acme
|
||||
pytest -n auto --pyargs certbot
|
||||
|
||||
[testenv:cover]
|
||||
commands = pytest -n auto --cov acme --pyargs acme
|
||||
pytest -n auto --cov certbot --cov-append --pyargs certbot
|
||||
|
|
@ -61,7 +61,7 @@ cover () {
|
|||
fi
|
||||
|
||||
pkg_dir=$(echo "$1" | tr _ -)
|
||||
pytest --cov "$pkg_dir" --cov-append --cov-report= --numprocesses "auto" --pyargs "$1"
|
||||
pytest --cov "$pkg_dir" --cov-append --cov-report= "$1"
|
||||
coverage report --fail-under="$min" --include="$pkg_dir/*" --show-missing
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Reference in a new issue