git push origin masterMerge branch 'csr_utility_functions'

This commit is contained in:
James Kasten 2014-11-24 18:59:43 -08:00
commit 316e686ed5
2 changed files with 137 additions and 22 deletions

View file

@ -47,13 +47,7 @@ class Client(object):
self.csr_file = cert_signing_request
self.key_file = private_key
# If CSR is provided, the private key should also be provided.
# TODO: Make sure key was actually used in CSR
# TODO: Make sure key has proper permissions
if self.csr_file and not self.key_file:
logger.fatal("Please provide the private key file used in \
generating the provided CSR")
sys.exit(1)
self.__validate_csr_key_cli()
self.server_url = "https://%s/acme/" % self.server
@ -98,6 +92,9 @@ class Client(object):
# Get key and csr to perform challenges
_, csr_der = self.get_key_csr_pem()
if not crypto_util.csr_matches_names(self.csr_file, self.names):
raise Exception("CSR subject does not contain the specified name")
# Perform Challenges
responses, challenge_objs = self.verify_identity(challenge_msg)
# Get Authorization
@ -595,10 +592,8 @@ class Client(object):
def get_key_csr_pem(self, csr_return_format='der'):
"""
Returns key and CSR using provided files or generating new files if
necessary. Both will be saved in pem format on the filesystem.
The CSR can optionally be returned in DER format as the CSR cannot be
loaded back into M2Crypto.
"""
necessary. Both will be saved in PEM format on the filesystem.
The CSR can optionally be returned in DER format."""
key_pem = None
csr_pem = None
if not self.key_file:
@ -627,17 +622,51 @@ class Client(object):
csr_f.close()
logger.info("Creating CSR: %s" % self.csr_file)
else:
# TODO fix this der situation
try:
csr_pem = open(self.csr_file).read().replace("\r", "")
csr = M2Crypto.X509.load_request(self.csr_file)
csr_pem, csr_der = csr.as_pem(), csr.as_der()
except:
logger.fatal("Unable to open CSR file: %s" % self.csr_file)
sys.exit(1)
if csr_return_format == 'der':
return key_pem, csr_der
else:
return key_pem, csr_pem
return key_pem, csr_pem
def __validate_csr_key_cli(self):
"""
Verifies that the client key and csr arguments are valid and correspond
to one another"""
# If CSR is provided, the private key should also be provided.
if self.csr_file and not self.key_file:
logger.fatal(("Please provide the private key file used in "
"generating the provided CSR"))
sys.exit(1)
# If CSR is provided, it must be readable and valid.
try:
if self.csr_file and not crypto_util.valid_csr(self.csr_file):
logger.fatal("The provided CSR is not a valid CSR")
sys.exit(1)
except IOError, e:
raise Exception("The provided CSR could not be read")
# If key is provided, it must be readable and valid.
try:
if self.key_file and not crypto_util.valid_privkey(self.key_file):
logger.fatal("The provided key is not a valid key")
sys.exit(1)
except IOError, e:
raise Exception("The provided key could not be read")
# If CSR and key are provided, the key must be the same key used
# in the CSR.
if self.csr_file and self.key_file:
try:
if not crypto_util.csr_matches_pubkey(self.csr_file,
self.key_file):
raise Exception("The key and CSR do not match")
except IOError, e:
raise Exception("The key or CSR files could not be read")
# def choice_of_ca(self):
# choices = self.get_cas()

View file

@ -54,8 +54,8 @@ def create_sig(msg, key_file, nonce=None, nonce_len=CONFIG.NONCE_SIZE):
logger.debug('%s signed as %s' % (msg_with_nonce, signature))
n_bytes = binascii.unhexlify(leading_zeros(hex(key.n)[2:].replace("L", "")))
e_bytes = binascii.unhexlify(leading_zeros(hex(key.e)[2:].replace("L", "")))
n_bytes = binascii.unhexlify(leading_zeros(hex(key.n)[2:].rstrip("L")))
e_bytes = binascii.unhexlify(leading_zeros(hex(key.e)[2:].rstrip("L")))
return {
"nonce": le_util.b64_url_enc(nonce),
@ -148,12 +148,12 @@ def make_ss_cert(key_file, domains):
cert.set_not_before(current)
cert.set_not_after(expire)
name = cert.get_subject()
name.C = "US"
name.ST = "Michigan"
name.L = "Ann Arbor"
name.O = "University of Michigan and the EFF"
name.CN = domains[0]
subject = cert.get_subject()
subject.C = "US"
subject.ST = "Michigan"
subject.L = "Ann Arbor"
subject.O = "University of Michigan and the EFF"
subject.CN = domains[0]
cert.set_issuer(cert.get_subject())
cert.add_ext(M2Crypto.X509.new_extension('basicConstraints', 'CA:FALSE'))
@ -198,3 +198,89 @@ def get_cert_info(filename):
"pub_key": "RSA " + str(cert.get_pubkey().size() * 8),
}
# WARNING: the csr and private key file are possible attack vectors for TOCTOU
# We should either...
# A. Do more checks to verify that the CSR is trusted/valid
# B. Audit the parsing code for vulnerabilities
def valid_csr(csr_filename):
"""Check if csr_filename is a valid CSR for the given domains.
(Currently, could raise non-X.509-related errors such as IOError
associated with problems reading the file.)
:param csr_filename: Path to the purported CSR file.
:type csr_filename: str
:returns: Validity of CSR.
:rtype: bool"""
try:
csr = M2Crypto.X509.load_request(csr_filename)
return bool(csr.verify(csr.get_pubkey()))
except M2Crypto.X509.X509Error:
return False
def csr_matches_names(csr_filename, domains):
"""Check if csr_filename contains the subject of one of the domains
M2Crypto currently does not expose the OpenSSL interface to
also check the SAN extension. This is insufficient for full testing
(Currently, could raise non-X.509-related errors such as IOError
associated with problems reading the file.)
:param csr_filename: Path to the purported CSR file.
:type csr_filename: str
:param domains: domains the csr should contain
:type domains: list
:returns: If the csr subject contains one of the domains
:rtype: bool"""
try:
csr = M2Crypto.X509.load_request(csr_filename)
subject = csr.get_subject()
return subject.CN in domains
except M2Crypto.X509.X509Error:
return False
def valid_privkey(privkey_filename):
"""Check if privkey_filename is a valid RSA private key. (Currently,
could raise non-RSA-related errors such as IOError associated with
problems reading the file.)
:param privkey_filename: Path to the purported private key file.
:type privkey_filename: str
:returns: Validity of private key.
:rtype: bool"""
try:
privkey = M2Crypto.RSA.load_key(privkey_filename)
return bool(privkey.check_key())
except M2Crypto.RSA.RSAError:
return False
def csr_matches_pubkey(csr_filename, privkey_filename):
"""Check if the private key in the file corresponds to the subject
public key in the CSR.
:param csr_filename: Path to the purported CSR file.
:type csr_filename: str
:param privkey_filename: Path to the purported private key file.
:type privkey_filename: str
:returns: Correspondence of private key to CSR subject public key.
:rtype: bool"""
csr = M2Crypto.X509.load_request(csr_filename)
privkey = M2Crypto.RSA.load_key(privkey_filename)
csr_pub = csr.get_pubkey().get_rsa().pub()
privkey_pub = privkey.pub()
return csr_pub == privkey_pub