mirror of
https://github.com/certbot/certbot.git
synced 2026-05-28 04:34:11 -04:00
Merge branch 'master' of github.com:letsencrypt/lets-encrypt-preview
Conflicts: letsencrypt/client/client.py
This commit is contained in:
commit
506c603e30
8 changed files with 252 additions and 63 deletions
|
|
@ -52,10 +52,10 @@ INVALID_EXT = ".acme.invalid"
|
|||
CHALLENGE_PREFERENCES = ["dvsni", "recoveryToken"]
|
||||
|
||||
# Mutually Exclusive Challenges - only solve 1
|
||||
EXCLUSIVE_CHALLENGES = [set(["dvsni", "simpleHttps"])]
|
||||
EXCLUSIVE_CHALLENGES = [frozenset(["dvsni", "simpleHttps"])]
|
||||
|
||||
# These are challenges that must be solved by a Configurator object
|
||||
CONFIG_CHALLENGES = {"dvsni", "simpleHttps"}
|
||||
CONFIG_CHALLENGES = frozenset(["dvsni", "simpleHttps"])
|
||||
|
||||
# Rewrite rule arguments used for redirections to https vhost
|
||||
REWRITE_HTTPS_ARGS = [
|
||||
|
|
|
|||
|
|
@ -108,15 +108,15 @@ def authorization_request(req_id, name, server_nonce, responses, key_file):
|
|||
"nonce": server_nonce,
|
||||
"responses": responses,
|
||||
"signature": crypto_util.create_sig(
|
||||
name + le_util.b64_url_dec(server_nonce), key_file),
|
||||
name + le_util.jose_b64decode(server_nonce), key_file),
|
||||
}
|
||||
|
||||
|
||||
def certificate_request(csr_der, key):
|
||||
"""Create ACME "certificateRequest" message.
|
||||
|
||||
:param csr_der: TODO
|
||||
:type csr_der: TODO
|
||||
:param csr_der: DER encoded CSR.
|
||||
:type csr_der: str
|
||||
|
||||
:param key: TODO
|
||||
:type key: TODO
|
||||
|
|
@ -127,7 +127,7 @@ def certificate_request(csr_der, key):
|
|||
"""
|
||||
return {
|
||||
"type": "certificateRequest",
|
||||
"csr": le_util.b64_url_enc(csr_der),
|
||||
"csr": le_util.jose_b64encode(csr_der),
|
||||
"signature": crypto_util.create_sig(csr_der, key),
|
||||
}
|
||||
|
||||
|
|
@ -148,7 +148,7 @@ def revocation_request(key_file, cert_der):
|
|||
"""
|
||||
return {
|
||||
"type": "revocationRequest",
|
||||
"certificate": le_util.b64_url_enc(cert_der),
|
||||
"certificate": le_util.jose_b64encode(cert_der),
|
||||
"signature": crypto_util.create_sig(cert_der, key_file),
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -50,8 +50,8 @@ class PrettyTest(unittest.TestCase):
|
|||
|
||||
def test_it(self):
|
||||
self.assertEqual(
|
||||
self._call('{"foo": "bar", "foo2": "bar2"}'),
|
||||
'{\n "foo2": "bar2", \n "foo": "bar"\n}')
|
||||
self._call('{"foo": {"bar": "baz"}}'),
|
||||
'{\n "foo": {\n "bar": "baz"\n }\n}')
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
|
|
|||
|
|
@ -1106,7 +1106,7 @@ LogLevel warn \n\
|
|||
# Create all of the challenge certs
|
||||
for t in chall_dict["listSNITuple"]:
|
||||
# Need to decode from base64
|
||||
r = le_util.b64_url_dec(t[1])
|
||||
r = le_util.jose_b64decode(t[1])
|
||||
ext = self.dvsni_gen_ext(r, s)
|
||||
self.dvsni_create_chall_cert(t[0], ext, t[2], chall_dict["dvsni_key"])
|
||||
|
||||
|
|
@ -1116,7 +1116,7 @@ LogLevel warn \n\
|
|||
self.save("SNI Challenge", True)
|
||||
self.restart(True)
|
||||
|
||||
s = le_util.b64_url_enc(s)
|
||||
s = le_util.jose_b64encode(s)
|
||||
return {"type":"dvsni", "s":s}
|
||||
|
||||
def cleanup(self):
|
||||
|
|
|
|||
|
|
@ -158,8 +158,8 @@ class Client(object):
|
|||
def acme_certificate(self, csr_der):
|
||||
"""Handle ACME "certificate" phase.
|
||||
|
||||
:param csr_der: TODO
|
||||
:type csr_der: TODO
|
||||
:param csr_der: CSR in DER format.
|
||||
:type csr_der: str
|
||||
|
||||
:returns: ACME "certificate" message.
|
||||
:rtype: dict
|
||||
|
|
@ -590,10 +590,23 @@ class Client(object):
|
|||
return challenge_objs, challenge_obj_indices
|
||||
|
||||
def get_key_csr_pem(self, csr_return_format='der'):
|
||||
"""Return key and CSR, generate if necessary.
|
||||
|
||||
Returns key and CSR using provided files or generating new files
|
||||
if necessary. Both will be saved in PEM format on the
|
||||
filesystem. The CSR can optionally be returned in DER format as
|
||||
the CSR cannot be loaded back into M2Crypto.
|
||||
|
||||
:param csr_return_format: If "der" returned CSR is in DER format,
|
||||
PEM otherwise.
|
||||
:param csr_return_format: str
|
||||
|
||||
:returns: A pair of `(key, csr)`, where `key` is PEM encoded `str`
|
||||
and `csr` is PEM/DER (depedning on `csr_return_format`
|
||||
encoded `str`.
|
||||
:rtype: tuple
|
||||
|
||||
"""
|
||||
Returns key and CSR using provided files or generating new files if
|
||||
necessary. Both will be saved in PEM format on the filesystem.
|
||||
The CSR can optionally be returned in DER format."""
|
||||
key_pem = None
|
||||
csr_pem = None
|
||||
if not self.key_file:
|
||||
|
|
|
|||
|
|
@ -16,7 +16,7 @@ from letsencrypt.client import logger
|
|||
|
||||
def b64_cert_to_pem(b64_der_cert):
|
||||
return M2Crypto.X509.load_cert_der_string(
|
||||
le_util.b64_url_dec(b64_der_cert)).as_pem()
|
||||
le_util.jose_b64decode(b64_der_cert)).as_pem()
|
||||
|
||||
|
||||
def create_sig(msg, key_file, nonce=None, nonce_len=CONFIG.NONCE_SIZE):
|
||||
|
|
@ -58,14 +58,14 @@ def create_sig(msg, key_file, nonce=None, nonce_len=CONFIG.NONCE_SIZE):
|
|||
e_bytes = binascii.unhexlify(leading_zeros(hex(key.e)[2:].rstrip("L")))
|
||||
|
||||
return {
|
||||
"nonce": le_util.b64_url_enc(nonce),
|
||||
"nonce": le_util.jose_b64encode(nonce),
|
||||
"alg": "RS256",
|
||||
"jwk": {
|
||||
"kty": "RSA",
|
||||
"n": le_util.b64_url_enc(n_bytes),
|
||||
"e": le_util.b64_url_enc(e_bytes),
|
||||
"n": le_util.jose_b64encode(n_bytes),
|
||||
"e": le_util.jose_b64encode(e_bytes),
|
||||
},
|
||||
"sig": le_util.b64_url_enc(signature),
|
||||
"sig": le_util.jose_b64encode(signature),
|
||||
}
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -1,41 +1,63 @@
|
|||
"""Utilities for all Let's Encrypt."""
|
||||
import base64
|
||||
import grp
|
||||
import errno
|
||||
import os
|
||||
import pwd
|
||||
import stat
|
||||
import sys
|
||||
|
||||
from letsencrypt.client import logger
|
||||
|
||||
|
||||
def make_or_verify_dir(directory, permissions=0755, uid=0):
|
||||
def make_or_verify_dir(directory, mode=0755, uid=0):
|
||||
"""Make sure directory exists with proper permissions.
|
||||
|
||||
:param directory: Path to a directry.
|
||||
:type directory: str
|
||||
|
||||
:param mode: Diretory mode.
|
||||
:type mode: int
|
||||
|
||||
:param uid: Directory owner.
|
||||
:type uid: int
|
||||
|
||||
:raises: Exception -- TODO
|
||||
|
||||
"""
|
||||
try:
|
||||
os.makedirs(directory, permissions)
|
||||
os.makedirs(directory, mode)
|
||||
except OSError as exception:
|
||||
if exception.errno == errno.EEXIST:
|
||||
if not check_permissions(directory, permissions, uid):
|
||||
logger.fatal("%s exists and does not contain the proper permissions or owner" % directory)
|
||||
sys.exit(57)
|
||||
if not check_permissions(directory, mode, uid):
|
||||
raise Exception('%s exists and does not contain the proper '
|
||||
'permissions or owner' % directory)
|
||||
else:
|
||||
raise
|
||||
|
||||
def check_permissions(filepath, mode, uid=0):
|
||||
file_stat = os.stat(filepath)
|
||||
if stat.S_IMODE(file_stat.st_mode) != mode:
|
||||
return False
|
||||
return file_stat.st_uid == uid
|
||||
|
||||
def unique_file(default_name, mode = 0777):
|
||||
"""
|
||||
Safely finds a unique file for writing only (by default)
|
||||
def check_permissions(filepath, mode, uid=0):
|
||||
"""Check file or directory permissions.
|
||||
|
||||
:param filepath: Path to the tested file (or directory).
|
||||
:type filepath: str
|
||||
|
||||
:param mode: Expected file mode.
|
||||
:type mode: int
|
||||
|
||||
:param uid: Expected file owner.
|
||||
:type uid: int
|
||||
|
||||
:returns: bool -- True if `mode` and `uid` match, False otherwise.
|
||||
|
||||
"""
|
||||
file_stat = os.stat(filepath)
|
||||
return stat.S_IMODE(file_stat.st_mode) == mode and file_stat.st_uid == uid
|
||||
|
||||
|
||||
def unique_file(default_name, mode=0777):
|
||||
"""Safely finds a unique file for writing only (by default)."""
|
||||
count = 1
|
||||
f_parsed = os.path.splitext(default_name)
|
||||
while 1:
|
||||
try:
|
||||
fd = os.open(default_name, os.O_CREAT|os.O_EXCL|os.O_RDWR, mode)
|
||||
fd = os.open(
|
||||
default_name, os.O_CREAT | os.O_EXCL | os.O_RDWR, mode)
|
||||
return os.fdopen(fd, 'w'), default_name
|
||||
except OSError:
|
||||
pass
|
||||
|
|
@ -43,31 +65,51 @@ def unique_file(default_name, mode = 0777):
|
|||
count += 1
|
||||
|
||||
|
||||
# https://tools.ietf.org/html/draft-ietf-jose-json-web-signature-37#appendix-C
|
||||
#
|
||||
# Jose Base64:
|
||||
#
|
||||
# - URL-safe Base64
|
||||
#
|
||||
# - padding stripped
|
||||
|
||||
def drop_privs():
|
||||
nogroup = grp.getgrnam("nogroup").gr_gid
|
||||
nobody = pwd.getpwnam("nobody").pw_uid
|
||||
os.setgid(nogroup)
|
||||
os.setgroups([])
|
||||
os.setuid(nobody)
|
||||
|
||||
# Quick implementations of b64 url safe encode/decode
|
||||
# We will include a proper library in the future if the library
|
||||
# doesn't conflict with our existing dependencies
|
||||
def b64_url_enc(s):
|
||||
try:
|
||||
s = s.encode("utf8")
|
||||
except:
|
||||
pass
|
||||
def jose_b64encode(data):
|
||||
"""JOSE Base64 encode.
|
||||
|
||||
i = base64.urlsafe_b64encode(s)
|
||||
return i.rstrip("=")
|
||||
:param data: Data to be encoded.
|
||||
:type data: str or bytearray
|
||||
|
||||
def b64_url_dec(s):
|
||||
try:
|
||||
s = s.encode("utf8")
|
||||
except:
|
||||
pass
|
||||
:raises: TypeError
|
||||
|
||||
pad = '=' * (4 - (len(s) % 4))
|
||||
return base64.urlsafe_b64decode(s + pad)
|
||||
:returns: JOSE Base64 string.
|
||||
:rtype: str
|
||||
|
||||
"""
|
||||
if not isinstance(data, str):
|
||||
raise TypeError('argument should be str or bytearray')
|
||||
return base64.urlsafe_b64encode(data).rstrip('=')
|
||||
|
||||
|
||||
def jose_b64decode(data):
|
||||
"""JOSE Base64 decode.
|
||||
|
||||
:param data: Base64 string to be decoded. If it's unicode, then
|
||||
only ASCII characters are allowed.
|
||||
:type data: str or unicode
|
||||
|
||||
:raises: ValueError, TypeError
|
||||
|
||||
:returns: Decoded data.
|
||||
|
||||
"""
|
||||
if isinstance(data, unicode):
|
||||
try:
|
||||
data = data.encode('ascii')
|
||||
except UnicodeEncodeError:
|
||||
raise ValueError(
|
||||
'unicode argument should contain only ASCII characters')
|
||||
elif not isinstance(data, str):
|
||||
raise TypeError('argument should be a str or unicode')
|
||||
|
||||
return base64.urlsafe_b64decode(data + '=' * (4 - (len(data) % 4)))
|
||||
|
|
|
|||
134
letsencrypt/client/le_util_test.py
Normal file
134
letsencrypt/client/le_util_test.py
Normal file
|
|
@ -0,0 +1,134 @@
|
|||
"""Tests for letsencrypt.client.le_util."""
|
||||
import os
|
||||
import shutil
|
||||
import tempfile
|
||||
import unittest
|
||||
|
||||
|
||||
class MakeOrVerifyDirTest(unittest.TestCase):
|
||||
"""Tests for letsencrypt.client.le_util.make_or_verify_dir.
|
||||
|
||||
Note that it is not possible to test for a wrong directory owner,
|
||||
as this testing script would have to be run as root.
|
||||
|
||||
"""
|
||||
|
||||
def setUp(self):
|
||||
self.root_path = tempfile.mkdtemp()
|
||||
self.path = os.path.join(self.root_path, 'foo')
|
||||
os.mkdir(self.path, 0400)
|
||||
|
||||
self.uid = os.getuid()
|
||||
|
||||
def tearDown(self):
|
||||
shutil.rmtree(self.root_path, ignore_errors=True)
|
||||
|
||||
def _call(self, directory, mode):
|
||||
from letsencrypt.client.le_util import make_or_verify_dir
|
||||
return make_or_verify_dir(directory, mode, self.uid)
|
||||
|
||||
def test_creates_dir_when_missing(self):
|
||||
path = os.path.join(self.root_path, 'bar')
|
||||
self._call(path, 0650)
|
||||
self.assertTrue(os.path.isdir(path))
|
||||
# TODO: check mode
|
||||
|
||||
def test_existing_correct_mode_does_not_fail(self):
|
||||
self._call(self.path, 0400)
|
||||
# TODO: check mode
|
||||
|
||||
def test_existing_wrong_mode_fails(self):
|
||||
self.assertRaises(Exception, self._call, self.path, 0600)
|
||||
|
||||
|
||||
class CheckPermissionsTest(unittest.TestCase):
|
||||
"""Tests for letsencrypt.client.le_util.check_permissions.
|
||||
|
||||
Note that it is not possible to test for a wrong file owner,
|
||||
as this testing script would have to be run as root.
|
||||
|
||||
"""
|
||||
|
||||
def setUp(self):
|
||||
_, self.path = tempfile.mkstemp()
|
||||
self.uid = os.getuid()
|
||||
|
||||
def tearDown(self):
|
||||
os.remove(self.path)
|
||||
|
||||
def _call(self, mode):
|
||||
from letsencrypt.client.le_util import check_permissions
|
||||
return check_permissions(self.path, mode, self.uid)
|
||||
|
||||
def test_ok_mode(self):
|
||||
os.chmod(self.path, 0600)
|
||||
self.assertTrue(self._call(0600))
|
||||
|
||||
def test_wrong_mode(self):
|
||||
os.chmod(self.path, 0400)
|
||||
self.assertFalse(self._call(0600))
|
||||
|
||||
|
||||
# https://en.wikipedia.org/wiki/Base64#Examples
|
||||
JOSE_B64_PADDING_EXAMPLES = {
|
||||
'any carnal pleasure.': ('YW55IGNhcm5hbCBwbGVhc3VyZS4', '='),
|
||||
'any carnal pleasure': ('YW55IGNhcm5hbCBwbGVhc3VyZQ', '=='),
|
||||
'any carnal pleasur': ('YW55IGNhcm5hbCBwbGVhc3Vy', ''),
|
||||
'any carnal pleasu': ('YW55IGNhcm5hbCBwbGVhc3U', '='),
|
||||
'any carnal pleas': ('YW55IGNhcm5hbCBwbGVhcw', '=='),
|
||||
}
|
||||
|
||||
|
||||
B64_URL_UNSAFE_EXAMPLES = {
|
||||
chr(251) + chr(239): '--8',
|
||||
chr(255) * 2: '__8',
|
||||
}
|
||||
|
||||
|
||||
class JOSEB64EncodeTest(unittest.TestCase):
|
||||
"""Tests for letsencrypt.client.le_util.jose_b64encode."""
|
||||
|
||||
def _call(self, data):
|
||||
from letsencrypt.client.le_util import jose_b64encode
|
||||
return jose_b64encode(data)
|
||||
|
||||
def test_unsafe_url(self):
|
||||
for text, b64 in B64_URL_UNSAFE_EXAMPLES.iteritems():
|
||||
self.assertEqual(self._call(text), b64)
|
||||
|
||||
def test_different_paddings(self):
|
||||
for text, (b64, _) in JOSE_B64_PADDING_EXAMPLES.iteritems():
|
||||
self.assertEqual(self._call(text), b64)
|
||||
|
||||
def test_unicode_fails_with_type_error(self):
|
||||
self.assertRaises(TypeError, self._call, u'some unicode')
|
||||
|
||||
|
||||
class JOSEB64DecodeTest(unittest.TestCase):
|
||||
"""Tests for letsencrypt.client.le_util.jose_b64decode."""
|
||||
|
||||
def _call(self, data):
|
||||
from letsencrypt.client.le_util import jose_b64decode
|
||||
return jose_b64decode(data)
|
||||
|
||||
def test_unsafe_url(self):
|
||||
for text, b64 in B64_URL_UNSAFE_EXAMPLES.iteritems():
|
||||
self.assertEqual(self._call(b64), text)
|
||||
|
||||
def test_input_without_padding(self):
|
||||
for text, (b64, _) in JOSE_B64_PADDING_EXAMPLES.iteritems():
|
||||
self.assertEqual(self._call(b64), text)
|
||||
|
||||
def test_input_with_padding(self):
|
||||
for text, (b64, pad) in JOSE_B64_PADDING_EXAMPLES.iteritems():
|
||||
self.assertEqual(self._call(b64 + pad), text)
|
||||
|
||||
def test_unicode_with_ascii(self):
|
||||
self.assertEqual(self._call(u'YQ'), 'a')
|
||||
|
||||
def test_non_ascii_unicode_fails(self):
|
||||
self.assertRaises(ValueError, self._call, u'\u0105')
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
Loading…
Reference in a new issue