diff --git a/letsencrypt/client/CONFIG.py b/letsencrypt/client/CONFIG.py index bccfc777f..1c11510c1 100644 --- a/letsencrypt/client/CONFIG.py +++ b/letsencrypt/client/CONFIG.py @@ -52,10 +52,10 @@ INVALID_EXT = ".acme.invalid" CHALLENGE_PREFERENCES = ["dvsni", "recoveryToken"] # Mutually Exclusive Challenges - only solve 1 -EXCLUSIVE_CHALLENGES = [set(["dvsni", "simpleHttps"])] +EXCLUSIVE_CHALLENGES = [frozenset(["dvsni", "simpleHttps"])] # These are challenges that must be solved by a Configurator object -CONFIG_CHALLENGES = {"dvsni", "simpleHttps"} +CONFIG_CHALLENGES = frozenset(["dvsni", "simpleHttps"]) # Rewrite rule arguments used for redirections to https vhost REWRITE_HTTPS_ARGS = [ diff --git a/letsencrypt/client/acme.py b/letsencrypt/client/acme.py index ecf5a9ac8..392546b29 100644 --- a/letsencrypt/client/acme.py +++ b/letsencrypt/client/acme.py @@ -108,15 +108,15 @@ def authorization_request(req_id, name, server_nonce, responses, key_file): "nonce": server_nonce, "responses": responses, "signature": crypto_util.create_sig( - name + le_util.b64_url_dec(server_nonce), key_file), + name + le_util.jose_b64decode(server_nonce), key_file), } def certificate_request(csr_der, key): """Create ACME "certificateRequest" message. - :param csr_der: TODO - :type csr_der: TODO + :param csr_der: DER encoded CSR. + :type csr_der: str :param key: TODO :type key: TODO @@ -127,7 +127,7 @@ def certificate_request(csr_der, key): """ return { "type": "certificateRequest", - "csr": le_util.b64_url_enc(csr_der), + "csr": le_util.jose_b64encode(csr_der), "signature": crypto_util.create_sig(csr_der, key), } @@ -148,7 +148,7 @@ def revocation_request(key_file, cert_der): """ return { "type": "revocationRequest", - "certificate": le_util.b64_url_enc(cert_der), + "certificate": le_util.jose_b64encode(cert_der), "signature": crypto_util.create_sig(cert_der, key_file), } diff --git a/letsencrypt/client/acme_test.py b/letsencrypt/client/acme_test.py index e5eae6c9a..b4d552c0b 100644 --- a/letsencrypt/client/acme_test.py +++ b/letsencrypt/client/acme_test.py @@ -50,8 +50,8 @@ class PrettyTest(unittest.TestCase): def test_it(self): self.assertEqual( - self._call('{"foo": "bar", "foo2": "bar2"}'), - '{\n "foo2": "bar2", \n "foo": "bar"\n}') + self._call('{"foo": {"bar": "baz"}}'), + '{\n "foo": {\n "bar": "baz"\n }\n}') if __name__ == '__main__': diff --git a/letsencrypt/client/apache_configurator.py b/letsencrypt/client/apache_configurator.py index 9612b5872..b95d5c8bb 100644 --- a/letsencrypt/client/apache_configurator.py +++ b/letsencrypt/client/apache_configurator.py @@ -1106,7 +1106,7 @@ LogLevel warn \n\ # Create all of the challenge certs for t in chall_dict["listSNITuple"]: # Need to decode from base64 - r = le_util.b64_url_dec(t[1]) + r = le_util.jose_b64decode(t[1]) ext = self.dvsni_gen_ext(r, s) self.dvsni_create_chall_cert(t[0], ext, t[2], chall_dict["dvsni_key"]) @@ -1116,7 +1116,7 @@ LogLevel warn \n\ self.save("SNI Challenge", True) self.restart(True) - s = le_util.b64_url_enc(s) + s = le_util.jose_b64encode(s) return {"type":"dvsni", "s":s} def cleanup(self): diff --git a/letsencrypt/client/client.py b/letsencrypt/client/client.py index 2c976b34c..b4dc5542d 100644 --- a/letsencrypt/client/client.py +++ b/letsencrypt/client/client.py @@ -158,8 +158,8 @@ class Client(object): def acme_certificate(self, csr_der): """Handle ACME "certificate" phase. - :param csr_der: TODO - :type csr_der: TODO + :param csr_der: CSR in DER format. + :type csr_der: str :returns: ACME "certificate" message. :rtype: dict @@ -590,10 +590,23 @@ class Client(object): return challenge_objs, challenge_obj_indices def get_key_csr_pem(self, csr_return_format='der'): + """Return key and CSR, generate if necessary. + + Returns key and CSR using provided files or generating new files + if necessary. Both will be saved in PEM format on the + filesystem. The CSR can optionally be returned in DER format as + the CSR cannot be loaded back into M2Crypto. + + :param csr_return_format: If "der" returned CSR is in DER format, + PEM otherwise. + :param csr_return_format: str + + :returns: A pair of `(key, csr)`, where `key` is PEM encoded `str` + and `csr` is PEM/DER (depedning on `csr_return_format` + encoded `str`. + :rtype: tuple + """ - Returns key and CSR using provided files or generating new files if - necessary. Both will be saved in PEM format on the filesystem. - The CSR can optionally be returned in DER format.""" key_pem = None csr_pem = None if not self.key_file: diff --git a/letsencrypt/client/crypto_util.py b/letsencrypt/client/crypto_util.py index 2196f25ba..39b363362 100644 --- a/letsencrypt/client/crypto_util.py +++ b/letsencrypt/client/crypto_util.py @@ -16,7 +16,7 @@ from letsencrypt.client import logger def b64_cert_to_pem(b64_der_cert): return M2Crypto.X509.load_cert_der_string( - le_util.b64_url_dec(b64_der_cert)).as_pem() + le_util.jose_b64decode(b64_der_cert)).as_pem() def create_sig(msg, key_file, nonce=None, nonce_len=CONFIG.NONCE_SIZE): @@ -58,14 +58,14 @@ def create_sig(msg, key_file, nonce=None, nonce_len=CONFIG.NONCE_SIZE): e_bytes = binascii.unhexlify(leading_zeros(hex(key.e)[2:].rstrip("L"))) return { - "nonce": le_util.b64_url_enc(nonce), + "nonce": le_util.jose_b64encode(nonce), "alg": "RS256", "jwk": { "kty": "RSA", - "n": le_util.b64_url_enc(n_bytes), - "e": le_util.b64_url_enc(e_bytes), + "n": le_util.jose_b64encode(n_bytes), + "e": le_util.jose_b64encode(e_bytes), }, - "sig": le_util.b64_url_enc(signature), + "sig": le_util.jose_b64encode(signature), } diff --git a/letsencrypt/client/le_util.py b/letsencrypt/client/le_util.py index f9afd8171..19070858f 100644 --- a/letsencrypt/client/le_util.py +++ b/letsencrypt/client/le_util.py @@ -1,41 +1,63 @@ """Utilities for all Let's Encrypt.""" import base64 -import grp import errno import os -import pwd import stat -import sys - -from letsencrypt.client import logger -def make_or_verify_dir(directory, permissions=0755, uid=0): +def make_or_verify_dir(directory, mode=0755, uid=0): + """Make sure directory exists with proper permissions. + + :param directory: Path to a directry. + :type directory: str + + :param mode: Diretory mode. + :type mode: int + + :param uid: Directory owner. + :type uid: int + + :raises: Exception -- TODO + + """ try: - os.makedirs(directory, permissions) + os.makedirs(directory, mode) except OSError as exception: if exception.errno == errno.EEXIST: - if not check_permissions(directory, permissions, uid): - logger.fatal("%s exists and does not contain the proper permissions or owner" % directory) - sys.exit(57) + if not check_permissions(directory, mode, uid): + raise Exception('%s exists and does not contain the proper ' + 'permissions or owner' % directory) else: raise -def check_permissions(filepath, mode, uid=0): - file_stat = os.stat(filepath) - if stat.S_IMODE(file_stat.st_mode) != mode: - return False - return file_stat.st_uid == uid -def unique_file(default_name, mode = 0777): - """ - Safely finds a unique file for writing only (by default) +def check_permissions(filepath, mode, uid=0): + """Check file or directory permissions. + + :param filepath: Path to the tested file (or directory). + :type filepath: str + + :param mode: Expected file mode. + :type mode: int + + :param uid: Expected file owner. + :type uid: int + + :returns: bool -- True if `mode` and `uid` match, False otherwise. + """ + file_stat = os.stat(filepath) + return stat.S_IMODE(file_stat.st_mode) == mode and file_stat.st_uid == uid + + +def unique_file(default_name, mode=0777): + """Safely finds a unique file for writing only (by default).""" count = 1 f_parsed = os.path.splitext(default_name) while 1: try: - fd = os.open(default_name, os.O_CREAT|os.O_EXCL|os.O_RDWR, mode) + fd = os.open( + default_name, os.O_CREAT | os.O_EXCL | os.O_RDWR, mode) return os.fdopen(fd, 'w'), default_name except OSError: pass @@ -43,31 +65,51 @@ def unique_file(default_name, mode = 0777): count += 1 +# https://tools.ietf.org/html/draft-ietf-jose-json-web-signature-37#appendix-C +# +# Jose Base64: +# +# - URL-safe Base64 +# +# - padding stripped -def drop_privs(): - nogroup = grp.getgrnam("nogroup").gr_gid - nobody = pwd.getpwnam("nobody").pw_uid - os.setgid(nogroup) - os.setgroups([]) - os.setuid(nobody) -# Quick implementations of b64 url safe encode/decode -# We will include a proper library in the future if the library -# doesn't conflict with our existing dependencies -def b64_url_enc(s): - try: - s = s.encode("utf8") - except: - pass +def jose_b64encode(data): + """JOSE Base64 encode. - i = base64.urlsafe_b64encode(s) - return i.rstrip("=") + :param data: Data to be encoded. + :type data: str or bytearray -def b64_url_dec(s): - try: - s = s.encode("utf8") - except: - pass + :raises: TypeError - pad = '=' * (4 - (len(s) % 4)) - return base64.urlsafe_b64decode(s + pad) + :returns: JOSE Base64 string. + :rtype: str + + """ + if not isinstance(data, str): + raise TypeError('argument should be str or bytearray') + return base64.urlsafe_b64encode(data).rstrip('=') + + +def jose_b64decode(data): + """JOSE Base64 decode. + + :param data: Base64 string to be decoded. If it's unicode, then + only ASCII characters are allowed. + :type data: str or unicode + + :raises: ValueError, TypeError + + :returns: Decoded data. + + """ + if isinstance(data, unicode): + try: + data = data.encode('ascii') + except UnicodeEncodeError: + raise ValueError( + 'unicode argument should contain only ASCII characters') + elif not isinstance(data, str): + raise TypeError('argument should be a str or unicode') + + return base64.urlsafe_b64decode(data + '=' * (4 - (len(data) % 4))) diff --git a/letsencrypt/client/le_util_test.py b/letsencrypt/client/le_util_test.py new file mode 100644 index 000000000..30743c24a --- /dev/null +++ b/letsencrypt/client/le_util_test.py @@ -0,0 +1,134 @@ +"""Tests for letsencrypt.client.le_util.""" +import os +import shutil +import tempfile +import unittest + + +class MakeOrVerifyDirTest(unittest.TestCase): + """Tests for letsencrypt.client.le_util.make_or_verify_dir. + + Note that it is not possible to test for a wrong directory owner, + as this testing script would have to be run as root. + + """ + + def setUp(self): + self.root_path = tempfile.mkdtemp() + self.path = os.path.join(self.root_path, 'foo') + os.mkdir(self.path, 0400) + + self.uid = os.getuid() + + def tearDown(self): + shutil.rmtree(self.root_path, ignore_errors=True) + + def _call(self, directory, mode): + from letsencrypt.client.le_util import make_or_verify_dir + return make_or_verify_dir(directory, mode, self.uid) + + def test_creates_dir_when_missing(self): + path = os.path.join(self.root_path, 'bar') + self._call(path, 0650) + self.assertTrue(os.path.isdir(path)) + # TODO: check mode + + def test_existing_correct_mode_does_not_fail(self): + self._call(self.path, 0400) + # TODO: check mode + + def test_existing_wrong_mode_fails(self): + self.assertRaises(Exception, self._call, self.path, 0600) + + +class CheckPermissionsTest(unittest.TestCase): + """Tests for letsencrypt.client.le_util.check_permissions. + + Note that it is not possible to test for a wrong file owner, + as this testing script would have to be run as root. + + """ + + def setUp(self): + _, self.path = tempfile.mkstemp() + self.uid = os.getuid() + + def tearDown(self): + os.remove(self.path) + + def _call(self, mode): + from letsencrypt.client.le_util import check_permissions + return check_permissions(self.path, mode, self.uid) + + def test_ok_mode(self): + os.chmod(self.path, 0600) + self.assertTrue(self._call(0600)) + + def test_wrong_mode(self): + os.chmod(self.path, 0400) + self.assertFalse(self._call(0600)) + + +# https://en.wikipedia.org/wiki/Base64#Examples +JOSE_B64_PADDING_EXAMPLES = { + 'any carnal pleasure.': ('YW55IGNhcm5hbCBwbGVhc3VyZS4', '='), + 'any carnal pleasure': ('YW55IGNhcm5hbCBwbGVhc3VyZQ', '=='), + 'any carnal pleasur': ('YW55IGNhcm5hbCBwbGVhc3Vy', ''), + 'any carnal pleasu': ('YW55IGNhcm5hbCBwbGVhc3U', '='), + 'any carnal pleas': ('YW55IGNhcm5hbCBwbGVhcw', '=='), +} + + +B64_URL_UNSAFE_EXAMPLES = { + chr(251) + chr(239): '--8', + chr(255) * 2: '__8', +} + + +class JOSEB64EncodeTest(unittest.TestCase): + """Tests for letsencrypt.client.le_util.jose_b64encode.""" + + def _call(self, data): + from letsencrypt.client.le_util import jose_b64encode + return jose_b64encode(data) + + def test_unsafe_url(self): + for text, b64 in B64_URL_UNSAFE_EXAMPLES.iteritems(): + self.assertEqual(self._call(text), b64) + + def test_different_paddings(self): + for text, (b64, _) in JOSE_B64_PADDING_EXAMPLES.iteritems(): + self.assertEqual(self._call(text), b64) + + def test_unicode_fails_with_type_error(self): + self.assertRaises(TypeError, self._call, u'some unicode') + + +class JOSEB64DecodeTest(unittest.TestCase): + """Tests for letsencrypt.client.le_util.jose_b64decode.""" + + def _call(self, data): + from letsencrypt.client.le_util import jose_b64decode + return jose_b64decode(data) + + def test_unsafe_url(self): + for text, b64 in B64_URL_UNSAFE_EXAMPLES.iteritems(): + self.assertEqual(self._call(b64), text) + + def test_input_without_padding(self): + for text, (b64, _) in JOSE_B64_PADDING_EXAMPLES.iteritems(): + self.assertEqual(self._call(b64), text) + + def test_input_with_padding(self): + for text, (b64, pad) in JOSE_B64_PADDING_EXAMPLES.iteritems(): + self.assertEqual(self._call(b64 + pad), text) + + def test_unicode_with_ascii(self): + self.assertEqual(self._call(u'YQ'), 'a') + + def test_non_ascii_unicode_fails(self): + self.assertRaises(ValueError, self._call, u'\u0105') + + +if __name__ == '__main__': + unittest.main()