Merge pull request #3046 from certbot/csr++

Use CN from CSRs
This commit is contained in:
Noah Swartz 2016-05-24 22:18:01 -07:00
commit b4f7b1cfab
7 changed files with 201 additions and 91 deletions

View file

@ -6,10 +6,8 @@ import logging
import logging.handlers
import os
import sys
import traceback
import configargparse
import OpenSSL
import six
import certbot
@ -336,36 +334,41 @@ class HelpfulArgumentParser(object):
# Do any post-parsing homework here
if self.verb == "renew":
parsed_args.noninteractive_mode = True
if parsed_args.staging or parsed_args.dry_run:
if parsed_args.server not in (flag_default("server"), constants.STAGING_URI):
conflicts = ["--staging"] if parsed_args.staging else []
conflicts += ["--dry-run"] if parsed_args.dry_run else []
raise errors.Error("--server value conflicts with {0}".format(
" and ".join(conflicts)))
parsed_args.server = constants.STAGING_URI
if parsed_args.dry_run:
if self.verb not in ["certonly", "renew"]:
raise errors.Error("--dry-run currently only works with the "
"'certonly' or 'renew' subcommands (%r)" % self.verb)
parsed_args.break_my_certs = parsed_args.staging = True
if glob.glob(os.path.join(parsed_args.config_dir, constants.ACCOUNTS_DIR, "*")):
# The user has a prod account, but might not have a staging
# one; we don't want to start trying to perform interactive registration
parsed_args.tos = True
parsed_args.register_unsafely_without_email = True
self.set_test_server(parsed_args)
if parsed_args.csr:
if parsed_args.allow_subset_of_names:
raise errors.Error("--allow-subset-of-names "
"cannot be used with --csr")
self.handle_csr(parsed_args)
hooks.validate_hooks(parsed_args)
return parsed_args
def set_test_server(self, parsed_args):
"""We have --staging/--dry-run; perform sanity check and set config.server"""
if parsed_args.server not in (flag_default("server"), constants.STAGING_URI):
conflicts = ["--staging"] if parsed_args.staging else []
conflicts += ["--dry-run"] if parsed_args.dry_run else []
raise errors.Error("--server value conflicts with {0}".format(
" and ".join(conflicts)))
parsed_args.server = constants.STAGING_URI
if parsed_args.dry_run:
if self.verb not in ["certonly", "renew"]:
raise errors.Error("--dry-run currently only works with the "
"'certonly' or 'renew' subcommands (%r)" % self.verb)
parsed_args.break_my_certs = parsed_args.staging = True
if glob.glob(os.path.join(parsed_args.config_dir, constants.ACCOUNTS_DIR, "*")):
# The user has a prod account, but might not have a staging
# one; we don't want to start trying to perform interactive registration
parsed_args.tos = True
parsed_args.register_unsafely_without_email = True
def handle_csr(self, parsed_args):
"""Process a --csr flag."""
if parsed_args.verb != "certonly":
@ -373,21 +376,11 @@ class HelpfulArgumentParser(object):
"when obtaining a new or replacement "
"via the certonly command. Please try the "
"certonly command instead.")
if parsed_args.allow_subset_of_names:
raise errors.Error("--allow-subset-of-names cannot be used with --csr")
try:
csr = le_util.CSR(file=parsed_args.csr[0], data=parsed_args.csr[1], form="der")
typ = OpenSSL.crypto.FILETYPE_ASN1
domains = crypto_util.get_sans_from_csr(csr.data, OpenSSL.crypto.FILETYPE_ASN1)
except OpenSSL.crypto.Error:
try:
e1 = traceback.format_exc()
typ = OpenSSL.crypto.FILETYPE_PEM
csr = le_util.CSR(file=parsed_args.csr[0], data=parsed_args.csr[1], form="pem")
domains = crypto_util.get_sans_from_csr(csr.data, typ)
except OpenSSL.crypto.Error:
logger.debug("DER CSR parse error %s", e1)
logger.debug("PEM CSR parse error %s", traceback.format_exc())
raise errors.Error("Failed to parse CSR file: {0}".format(parsed_args.csr[0]))
csrfile, contents = parsed_args.csr[0:2]
typ, csr, domains = crypto_util.import_csr_file(csrfile, contents)
# This is not necessary for webroot to work, however,
# obtain_certificate_from_csr requires parsed_args.domains to be set

View file

@ -6,6 +6,7 @@
"""
import logging
import os
import traceback
import OpenSSL
import pyrfc3339
@ -179,6 +180,30 @@ def csr_matches_pubkey(csr, privkey):
return False
def import_csr_file(csrfile, data):
"""Import a CSR file, which can be either PEM or DER.
:param str csrfile: CSR filename
:param str data: contents of the CSR file
:returns: (`OpenSSL.crypto.FILETYPE_PEM` or `OpenSSL.crypto.FILETYPE_ASN1`,
le_util.CSR object representing the CSR,
list of domains requested in the CSR)
:rtype: tuple
"""
for form, typ in (("der", OpenSSL.crypto.FILETYPE_ASN1,),
("pem", OpenSSL.crypto.FILETYPE_PEM,),):
try:
domains = get_names_from_csr(data, typ)
except OpenSSL.crypto.Error:
logger.debug("CSR parse error (form=%s, typ=%s):", form, typ)
logger.debug(traceback.format_exc())
continue
return typ, le_util.CSR(file=csrfile, data=data, form=form), domains
raise errors.Error("Failed to parse CSR file: {0}".format(csrfile))
def make_key(bits):
"""Generate PEM encoded RSA key.
@ -228,15 +253,20 @@ def pyopenssl_load_certificate(data):
str(error) for error in openssl_errors)))
def _get_sans_from_cert_or_req(cert_or_req_str, load_func,
typ=OpenSSL.crypto.FILETYPE_PEM):
def _load_cert_or_req(cert_or_req_str, load_func,
typ=OpenSSL.crypto.FILETYPE_PEM):
try:
cert_or_req = load_func(typ, cert_or_req_str)
return load_func(typ, cert_or_req_str)
except OpenSSL.crypto.Error as error:
logger.exception(error)
raise
def _get_sans_from_cert_or_req(cert_or_req_str, load_func,
typ=OpenSSL.crypto.FILETYPE_PEM):
# pylint: disable=protected-access
return acme_crypto_util._pyopenssl_cert_or_req_san(cert_or_req)
return acme_crypto_util._pyopenssl_cert_or_req_san(_load_cert_or_req(
cert_or_req_str, load_func, typ))
def get_sans_from_cert(cert, typ=OpenSSL.crypto.FILETYPE_PEM):
@ -267,6 +297,25 @@ def get_sans_from_csr(csr, typ=OpenSSL.crypto.FILETYPE_PEM):
csr, OpenSSL.crypto.load_certificate_request, typ)
def get_names_from_csr(csr, typ=OpenSSL.crypto.FILETYPE_PEM):
"""Get a list of domains from a CSR, including the CN if it is set.
:param str csr: CSR (encoded).
:param typ: `OpenSSL.crypto.FILETYPE_PEM` or `OpenSSL.crypto.FILETYPE_ASN1`
:returns: A list of domain names.
:rtype: list
"""
loaded_csr = _load_cert_or_req(
csr, OpenSSL.crypto.load_certificate_request, typ)
# Use a set to avoid duplication with CN and Subject Alt Names
domains = set(d for d in (loaded_csr.get_subject().CN,) if d is not None)
# pylint: disable=protected-access
domains.update(acme_crypto_util._pyopenssl_cert_or_req_san(loaded_csr))
return list(domains)
def dump_pyopenssl_chain(chain, filetype=OpenSSL.crypto.FILETYPE_PEM):
"""Dump certificate chain into a bundle.

View file

@ -681,9 +681,6 @@ def main(cli_args=sys.argv[1:]):
displayer = display_util.NoninteractiveDisplay(sys.stdout)
elif config.text_mode:
displayer = display_util.FileDisplay(sys.stdout)
elif config.verb == "renew":
config.noninteractive_mode = True
displayer = display_util.NoninteractiveDisplay(sys.stdout)
else:
displayer = display_util.NcursesDisplay()
zope.component.provideUtility(displayer)

View file

@ -349,8 +349,9 @@ class CLITest(unittest.TestCase): # pylint: disable=too-many-public-methods
['-d', '204.11.231.35'])
def test_csr_with_besteffort(self):
args = ["--csr", CSR, "--allow-subset-of-names"]
self.assertRaises(errors.Error, self._call, args)
self.assertRaises(
errors.Error, self._call,
'certonly --csr {0} --allow-subset-of-names'.format(CSR).split())
def test_run_with_csr(self):
# This is an error because you can only use --csr with certonly
@ -361,6 +362,17 @@ class CLITest(unittest.TestCase): # pylint: disable=too-many-public-methods
return
assert False, "Expected supplying --csr to fail with default verb"
def test_csr_with_no_domains(self):
self.assertRaises(
errors.Error, self._call,
'certonly --csr {0}'.format(
test_util.vector_path('csr-nonames.pem')).split())
def test_csr_with_inconsistent_domains(self):
self.assertRaises(
errors.Error, self._call,
'certonly -d example.org --csr {0}'.format(CSR).split())
def _get_argument_parser(self):
plugins = disco.PluginsRegistry.find_all()
return functools.partial(cli.prepare_and_parse_args, plugins)

View file

@ -134,57 +134,39 @@ class ClientTest(unittest.TestCase):
self.acme.fetch_chain.assert_called_once_with(mock.sentinel.certr)
# FIXME move parts of this to test_cli.py...
@mock.patch("certbot.client.logger")
def test_obtain_certificate_from_csr(self, mock_logger):
self._mock_obtain_certificate()
from certbot import cli
test_csr = le_util.CSR(form="der", file=None, data=CSR_SAN)
mock_parsed_args = mock.MagicMock()
# The CLI should believe that this is a certonly request, because
# a CSR would not be allowed with other kinds of requests!
mock_parsed_args.verb = "certonly"
with mock.patch("certbot.client.le_util.CSR") as mock_CSR:
mock_CSR.return_value = test_csr
mock_parsed_args.domains = self.eg_domains[:]
mock_parser = mock.MagicMock(cli.HelpfulArgumentParser)
cli.HelpfulArgumentParser.handle_csr(mock_parser, mock_parsed_args)
auth_handler = self.client.auth_handler
# Now provoke an inconsistent domains error...
mock_parsed_args.domains.append("hippopotamus.io")
self.assertRaises(errors.ConfigurationError,
cli.HelpfulArgumentParser.handle_csr, mock_parser, mock_parsed_args)
authzr = self.client.auth_handler.get_authorizations(self.eg_domains, False)
self.assertEqual(
(mock.sentinel.certr, mock.sentinel.chain),
self.client.obtain_certificate_from_csr(
self.eg_domains,
test_csr,
authzr=authzr))
# and that the cert was obtained correctly
self._check_obtain_certificate()
# Test for authzr=None
self.assertEqual(
(mock.sentinel.certr, mock.sentinel.chain),
self.client.obtain_certificate_from_csr(
self.eg_domains,
test_csr,
authzr=None))
self.client.auth_handler.get_authorizations.assert_called_with(
self.eg_domains)
# Test for no auth_handler
self.client.auth_handler = None
self.assertRaises(
errors.Error,
self.client.obtain_certificate_from_csr,
authzr = auth_handler.get_authorizations(self.eg_domains, False)
self.assertEqual(
(mock.sentinel.certr, mock.sentinel.chain),
self.client.obtain_certificate_from_csr(
self.eg_domains,
test_csr)
mock_logger.warning.assert_called_once_with(mock.ANY)
test_csr,
authzr=authzr))
# and that the cert was obtained correctly
self._check_obtain_certificate()
# Test for authzr=None
self.assertEqual(
(mock.sentinel.certr, mock.sentinel.chain),
self.client.obtain_certificate_from_csr(
self.eg_domains,
test_csr,
authzr=None))
auth_handler.get_authorizations.assert_called_with(self.eg_domains)
# Test for no auth_handler
self.client.auth_handler = None
self.assertRaises(
errors.Error,
self.client.obtain_certificate_from_csr,
self.eg_domains,
test_csr)
mock_logger.warning.assert_called_once_with(mock.ANY)
@mock.patch("certbot.client.crypto_util")
def test_obtain_certificate(self, mock_crypto_util):

View file

@ -10,6 +10,7 @@ import zope.component
from certbot import errors
from certbot import interfaces
from certbot import le_util
from certbot.tests import test_util
@ -159,6 +160,44 @@ class CSRMatchesPubkeyTest(unittest.TestCase):
test_util.load_vector('csr.pem'), RSA256_KEY))
class ImportCSRFileTest(unittest.TestCase):
"""Tests for certbot.certbot_util.import_csr_file."""
@classmethod
def _call(cls, *args, **kwargs):
from certbot.crypto_util import import_csr_file
return import_csr_file(*args, **kwargs)
def test_der_csr(self):
csrfile = test_util.vector_path('csr.der')
data = test_util.load_vector('csr.der')
self.assertEqual(
(OpenSSL.crypto.FILETYPE_ASN1,
le_util.CSR(file=csrfile,
data=data,
form="der"),
["example.com"],),
self._call(csrfile, data))
def test_pem_csr(self):
csrfile = test_util.vector_path('csr.pem')
data = test_util.load_vector('csr.pem')
self.assertEqual(
(OpenSSL.crypto.FILETYPE_PEM,
le_util.CSR(file=csrfile,
data=data,
form="pem"),
["example.com"],),
self._call(csrfile, data))
def test_bad_csr(self):
self.assertRaises(errors.Error, self._call,
test_util.vector_path('cert.pem'),
test_util.load_vector('cert.pem'))
class MakeKeyTest(unittest.TestCase): # pylint: disable=too-few-public-methods
"""Tests for certbot.crypto_util.make_key."""
@ -234,6 +273,36 @@ class GetSANsFromCSRTest(unittest.TestCase):
[], self._call(test_util.load_vector('csr-nosans.pem')))
class GetNamesFromCSRTest(unittest.TestCase):
"""Tests for certbot.crypto_util.get_names_from_csr."""
@classmethod
def _call(cls, *args, **kwargs):
from certbot.crypto_util import get_names_from_csr
return get_names_from_csr(*args, **kwargs)
def test_extract_one_san(self):
self.assertEqual(['example.com'], self._call(
test_util.load_vector('csr.pem')))
def test_extract_two_sans(self):
self.assertEqual(set(('example.com', 'www.example.com',)), set(
self._call(test_util.load_vector('csr-san.pem'))))
def test_extract_six_sans(self):
self.assertEqual(
set(self._call(test_util.load_vector('csr-6sans.pem'))),
set(("example.com", "example.org", "example.net",
"example.info", "subdomain.example.com",
"other.subdomain.example.com",)))
def test_parse_non_csr(self):
self.assertRaises(OpenSSL.crypto.Error, self._call, "hello there")
def test_parse_no_sans(self):
self.assertEqual(["example.org"],
self._call(test_util.load_vector('csr-nosans.pem')))
class CertLoaderTest(unittest.TestCase):
"""Tests for certbot.crypto_util.pyopenssl_load_certificate"""

View file

@ -0,0 +1,8 @@
-----BEGIN CERTIFICATE REQUEST-----
MIH/MIGqAgEAMEUxCzAJBgNVBAYTAkFVMRMwEQYDVQQIDApTb21lLVN0YXRlMSEw
HwYDVQQKDBhJbnRlcm5ldCBXaWRnaXRzIFB0eSBMdGQwXDANBgkqhkiG9w0BAQEF
AANLADBIAkEArHVztFHtH92ucFJD/N/HW9AsdRsUuHUBBBDlHwNlRd3fp580rv2+
6QWE30cWgdmJS86ObRz6lUTor4R0T+3C5QIDAQABoAAwDQYJKoZIhvcNAQELBQAD
QQBt9XLSZ9DGfWcGGaBUTCiSY7lWBegpNlCeo8pK3ydWmKpjcza+j7lF5paph2LH
lKWVQ8+xwYMscGWK0NApHGco
-----END CERTIFICATE REQUEST-----