From 6f32c41da3c2e6613ed9173211607fa76d9221e5 Mon Sep 17 00:00:00 2001 From: Jakub Warmuz Date: Sat, 22 Nov 2014 20:57:31 +0100 Subject: [PATCH 01/11] le_util cleanup, le_util_test --- letsencrypt/client/apache_configurator.py | 4 +- letsencrypt/client/client.py | 6 +- letsencrypt/client/crypto_util.py | 10 +- letsencrypt/client/le_util.py | 123 ++++++++++++++-------- letsencrypt/client/le_util_test.py | 104 ++++++++++++++++++ 5 files changed, 195 insertions(+), 52 deletions(-) create mode 100644 letsencrypt/client/le_util_test.py diff --git a/letsencrypt/client/apache_configurator.py b/letsencrypt/client/apache_configurator.py index 9612b5872..b95d5c8bb 100644 --- a/letsencrypt/client/apache_configurator.py +++ b/letsencrypt/client/apache_configurator.py @@ -1106,7 +1106,7 @@ LogLevel warn \n\ # Create all of the challenge certs for t in chall_dict["listSNITuple"]: # Need to decode from base64 - r = le_util.b64_url_dec(t[1]) + r = le_util.jose_b64decode(t[1]) ext = self.dvsni_gen_ext(r, s) self.dvsni_create_chall_cert(t[0], ext, t[2], chall_dict["dvsni_key"]) @@ -1116,7 +1116,7 @@ LogLevel warn \n\ self.save("SNI Challenge", True) self.restart(True) - s = le_util.b64_url_enc(s) + s = le_util.jose_b64encode(s) return {"type":"dvsni", "s":s} def cleanup(self): diff --git a/letsencrypt/client/client.py b/letsencrypt/client/client.py index 7516eee05..36ab435cc 100644 --- a/letsencrypt/client/client.py +++ b/letsencrypt/client/client.py @@ -244,7 +244,7 @@ class Client(object): def revocation_request(self, key_file, cert_der): return {"type":"revocationRequest", - "certificate":le_util.b64_url_enc(cert_der), + "certificate":le_util.jose_b64encode(cert_der), "signature":crypto_util.create_sig(cert_der, key_file)} @@ -306,7 +306,7 @@ class Client(object): def certificate_request(self, csr_der, key): logger.info("Preparing and sending CSR..") return {"type":"certificateRequest", - "csr":le_util.b64_url_enc(csr_der), + "csr":le_util.jose_b64encode(csr_der), "signature":crypto_util.create_sig(csr_der, self.key_file)} def cleanup_challenges(self, challenge_objs): @@ -351,7 +351,7 @@ class Client(object): "nonce":server_nonce} auth_req["signature"] = crypto_util.create_sig( - name + le_util.b64_url_dec(server_nonce), self.key_file) + name + le_util.jose_b64decode(server_nonce), self.key_file) auth_req["responses"] = responses return auth_req diff --git a/letsencrypt/client/crypto_util.py b/letsencrypt/client/crypto_util.py index 815e795cd..f09dfd51f 100644 --- a/letsencrypt/client/crypto_util.py +++ b/letsencrypt/client/crypto_util.py @@ -15,7 +15,7 @@ from letsencrypt.client import le_util def b64_cert_to_pem(b64_der_cert): return M2Crypto.X509.load_cert_der_string( - le_util.b64_url_dec(b64_der_cert)).as_pem() + le_util.jose_b64decode(b64_der_cert)).as_pem() def create_sig(msg, key_file, signer_nonce=None, @@ -35,10 +35,10 @@ def create_sig(msg, key_file, signer_nonce=None, #print "signature:", signature n_bytes = binascii.unhexlify(leading_zeros(hex(key.n)[2:].replace("L", ""))) e_bytes = binascii.unhexlify(leading_zeros(hex(key.e)[2:].replace("L", ""))) - n_encoded = le_util.b64_url_enc(n_bytes) - e_encoded = le_util.b64_url_enc(e_bytes) - signer_nonce_encoded = le_util.b64_url_enc(signer_nonce) - sig_encoded = le_util.b64_url_enc(signature) + n_encoded = le_util.jose_b64encode(n_bytes) + e_encoded = le_util.jose_b64encode(e_bytes) + signer_nonce_encoded = le_util.jose_b64encode(signer_nonce) + sig_encoded = le_util.jose_b64encode(signature) jwk = {"kty": "RSA", "n": n_encoded, "e": e_encoded} signature = { "nonce": signer_nonce_encoded, diff --git a/letsencrypt/client/le_util.py b/letsencrypt/client/le_util.py index f9afd8171..68a8c405d 100644 --- a/letsencrypt/client/le_util.py +++ b/letsencrypt/client/le_util.py @@ -1,41 +1,63 @@ """Utilities for all Let's Encrypt.""" import base64 -import grp import errno import os -import pwd import stat -import sys - -from letsencrypt.client import logger -def make_or_verify_dir(directory, permissions=0755, uid=0): +def make_or_verify_dir(directory, mode=0755, uid=0): + """Make sure directory exists with proper permissions. + + :param directory: Path to a directry. + :type directory: str + + :param mode: Diretory mode. + :type mode: int + + :param uid: Directory owner. + :type uid: int + + :raises: Exception -- TODO + + """ try: - os.makedirs(directory, permissions) + os.makedirs(directory, mode) except OSError as exception: if exception.errno == errno.EEXIST: - if not check_permissions(directory, permissions, uid): - logger.fatal("%s exists and does not contain the proper permissions or owner" % directory) - sys.exit(57) + if not check_permissions(directory, mode, uid): + raise Exception('%s exists and does not contain the proper ' + 'permissions or owner' % directory) else: raise -def check_permissions(filepath, mode, uid=0): - file_stat = os.stat(filepath) - if stat.S_IMODE(file_stat.st_mode) != mode: - return False - return file_stat.st_uid == uid -def unique_file(default_name, mode = 0777): - """ - Safely finds a unique file for writing only (by default) +def check_permissions(filepath, mode, uid=0): + """Check file or directory permissions. + + :param filepath: Path to the tested file (or directory). + :type filepath: str + + :param mode: Expected file mode. + :type mode: int + + :param uid: Expected file owner. + :type uid: int + + :returns: bool -- True if `mode` and `uid` match, False otherwise. + """ + file_stat = os.stat(filepath) + return stat.S_IMODE(file_stat.st_mode) == mode and file_stat.st_uid == uid + + +def unique_file(default_name, mode=0777): + """Safely finds a unique file for writing only (by default).""" count = 1 f_parsed = os.path.splitext(default_name) while 1: try: - fd = os.open(default_name, os.O_CREAT|os.O_EXCL|os.O_RDWR, mode) + fd = os.open( + default_name, os.O_CREAT | os.O_EXCL | os.O_RDWR, mode) return os.fdopen(fd, 'w'), default_name except OSError: pass @@ -43,31 +65,48 @@ def unique_file(default_name, mode = 0777): count += 1 +def _to_utf8(arg): + """Normalize to UTF-8 string.""" + return arg.encode('utf-8') if isinstance(arg, unicode) else arg -def drop_privs(): - nogroup = grp.getgrnam("nogroup").gr_gid - nobody = pwd.getpwnam("nobody").pw_uid - os.setgid(nogroup) - os.setgroups([]) - os.setuid(nobody) -# Quick implementations of b64 url safe encode/decode -# We will include a proper library in the future if the library -# doesn't conflict with our existing dependencies -def b64_url_enc(s): - try: - s = s.encode("utf8") - except: - pass +def jose_b64encode(arg): + """JOSE Base64 encode. - i = base64.urlsafe_b64encode(s) - return i.rstrip("=") + JOSE Base64: + - URL-safe Base64 + - padding stripped -def b64_url_dec(s): - try: - s = s.encode("utf8") - except: - pass + https://tools.ietf.org/html/draft-ietf-jose-json-web-signature-37#appendix-C - pad = '=' * (4 - (len(s) % 4)) - return base64.urlsafe_b64decode(s + pad) + :param arg: String to be encoded. Unicode input will be encoded + to UTF-8 before Base64 encoding. + :type arg: str or unicode + + :returns: JOSE Base64 string. + :rtype: str + + """ + return base64.urlsafe_b64encode(_to_utf8(arg)).rstrip('=') + + +def jose_b64decode(arg): + """JOSE Base64 decode. + + Jose Base64: + - URL-safe Base64 + - padding stripped + + https://tools.ietf.org/html/draft-ietf-jose-json-web-signature-37#appendix-C + + :param arg: Base64 string to be decoded. Unicode input will be + encoded to UTF-8 before Base64 decoding. + :type arg: str or unicode + + :returns: Decoded string. + :rtype: str + + """ + normalized = _to_utf8(arg) + return base64.urlsafe_b64decode( + normalized + '=' * (4 - (len(normalized) % 4))) diff --git a/letsencrypt/client/le_util_test.py b/letsencrypt/client/le_util_test.py new file mode 100644 index 000000000..66c21b473 --- /dev/null +++ b/letsencrypt/client/le_util_test.py @@ -0,0 +1,104 @@ +"""Tests for letsencrypt.client.le_util.""" +import os +import shutil +import tempfile +import unittest + + +class MakeOrVerifyDirTest(unittest.TestCase): + """Tests for letsencrypt.client.le_util.make_or_verify_dir. + + Note that it is not possible to test for a wrong directory owner, + as this testing script would have to be run as root. + + """ + + def setUp(self): + self.root_path = tempfile.mkdtemp() + self.path = os.path.join(self.root_path, 'foo') + os.mkdir(self.path, 0400) + + self.uid = os.getuid() + + def tearDown(self): + shutil.rmtree(self.root_path, ignore_errors=True) + + def _call(self, directory, mode): + from letsencrypt.client.le_util import make_or_verify_dir + return make_or_verify_dir(directory, mode, self.uid) + + def test_creates_dir_when_missing(self): + path = os.path.join(self.root_path, 'bar') + self._call(path, 0650) + self.assertTrue(os.path.isdir(path)) + # TODO: check mode + + def test_existing_correct_mode_does_not_fail(self): + self._call(self.path, 0400) + # TODO: check mode + + def test_existing_wrong_mode_fails(self): + self.assertRaises(Exception, self._call, self.path, 0600) + + +class CheckPermissionsTest(unittest.TestCase): + """Tests for letsencrypt.client.le_util.check_permissions. + + Note that it is not possible to test for a wrong file owner, + as this testing script would have to be run as root. + + """ + + def setUp(self): + _, self.path = tempfile.mkstemp() + self.uid = os.getuid() + + def tearDown(self): + os.remove(self.path) + + def _call(self, mode): + from letsencrypt.client.le_util import check_permissions + return check_permissions(self.path, mode, self.uid) + + def test_ok_mode(self): + os.chmod(self.path, 0600) + self.assertTrue(self._call(0600)) + + def test_wrong_mode(self): + os.chmod(self.path, 0400) + self.assertFalse(self._call(0600)) + + +class JOSEB64EncodeTest(unittest.TestCase): + """Tests for letsencrypt.client.le_util.jose_b64encode.""" + + def _call(self, arg): + from letsencrypt.client.le_util import jose_b64encode + return jose_b64encode(arg) + + def test_str(self): + self.assertEqual(self._call('foo'), 'Zm9v') + + def test_unicode(self): + self.assertEqual(self._call(u'\u0105'), 'xIU') + + +class JOSEB64DecodeTest(unittest.TestCase): + """Tests for letsencrypt.client.le_util.jose_b64decode.""" + + def _call(self, arg): + from letsencrypt.client.le_util import jose_b64decode + return jose_b64decode(arg) + + def test_str(self): + self.assertEqual(self._call('Zm9v='), 'foo') + + def test_unicode(self): + self.assertEqual(self._call(u'XIU='), '\\\x85') + + def test_fills_padding(self): + self.assertEqual(self._call('Zm9v'), 'foo') + + +if __name__ == '__main__': + unittest.main() From af4d9558069dc88b9daf95d6085a62fad9753e47 Mon Sep 17 00:00:00 2001 From: Jakub Warmuz Date: Sun, 23 Nov 2014 02:37:34 +0100 Subject: [PATCH 02/11] Fix JOSE encoding mess --- letsencrypt/client/le_util.py | 53 +++++++++++++++--------------- letsencrypt/client/le_util_test.py | 26 +++++++-------- 2 files changed, 39 insertions(+), 40 deletions(-) diff --git a/letsencrypt/client/le_util.py b/letsencrypt/client/le_util.py index 68a8c405d..31edfd13b 100644 --- a/letsencrypt/client/le_util.py +++ b/letsencrypt/client/le_util.py @@ -65,48 +65,47 @@ def unique_file(default_name, mode=0777): count += 1 -def _to_utf8(arg): - """Normalize to UTF-8 string.""" - return arg.encode('utf-8') if isinstance(arg, unicode) else arg +# https://tools.ietf.org/html/draft-ietf-jose-json-web-signature-37#appendix-C +# +# Jose Base64: +# +# - URL-safe Base64 +# +# - padding stripped -def jose_b64encode(arg): +def jose_b64encode(data, encoding='utf-8'): """JOSE Base64 encode. - JOSE Base64: - - URL-safe Base64 - - padding stripped + :param data: Data to be encoded. + :type data: str or unicode - https://tools.ietf.org/html/draft-ietf-jose-json-web-signature-37#appendix-C - - :param arg: String to be encoded. Unicode input will be encoded - to UTF-8 before Base64 encoding. - :type arg: str or unicode + :param encoding: Name of the encoding to be performed before + Base64 encoding. If not None, then `data` + has to be unicode. + :type encoding: str or None :returns: JOSE Base64 string. :rtype: str """ - return base64.urlsafe_b64encode(_to_utf8(arg)).rstrip('=') + encoded = data if encoding is None else data.encode(encoding) + return base64.urlsafe_b64encode(encoded).rstrip('=') -def jose_b64decode(arg): +def jose_b64decode(arg, decoding='utf-8'): """JOSE Base64 decode. - Jose Base64: - - URL-safe Base64 - - padding stripped + :param arg: Base64 string to be decoded. + :type arg: str - https://tools.ietf.org/html/draft-ietf-jose-json-web-signature-37#appendix-C + :param decoding: Name of the encoding to be performed after + Base64 decoding. + :type decoding: str or None - :param arg: Base64 string to be decoded. Unicode input will be - encoded to UTF-8 before Base64 decoding. - :type arg: str or unicode - - :returns: Decoded string. - :rtype: str + :returns: Decoded data. Unicode if `decoding` is not None. + :rtype: str or unicode """ - normalized = _to_utf8(arg) - return base64.urlsafe_b64decode( - normalized + '=' * (4 - (len(normalized) % 4))) + decoded = base64.urlsafe_b64decode(arg + '=' * (4 - (len(arg) % 4))) + return decoded if decoding is None else decoded.decode(decoding) diff --git a/letsencrypt/client/le_util_test.py b/letsencrypt/client/le_util_test.py index 66c21b473..b5126218b 100644 --- a/letsencrypt/client/le_util_test.py +++ b/letsencrypt/client/le_util_test.py @@ -72,32 +72,32 @@ class CheckPermissionsTest(unittest.TestCase): class JOSEB64EncodeTest(unittest.TestCase): """Tests for letsencrypt.client.le_util.jose_b64encode.""" - def _call(self, arg): + def _call(self, data, encoding): from letsencrypt.client.le_util import jose_b64encode - return jose_b64encode(arg) + return jose_b64encode(data, encoding) - def test_str(self): - self.assertEqual(self._call('foo'), 'Zm9v') + def test_without_encoding(self): + self.assertEqual(self._call('foo', None), 'Zm9v') - def test_unicode(self): - self.assertEqual(self._call(u'\u0105'), 'xIU') + def test_with_encoding(self): + self.assertEqual(self._call(u'\u0105', 'utf-8'), 'xIU') class JOSEB64DecodeTest(unittest.TestCase): """Tests for letsencrypt.client.le_util.jose_b64decode.""" - def _call(self, arg): + def _call(self, jose_b64_string, decoding): from letsencrypt.client.le_util import jose_b64decode - return jose_b64decode(arg) + return jose_b64decode(jose_b64_string, decoding) - def test_str(self): - self.assertEqual(self._call('Zm9v='), 'foo') + def test_without_decoding(self): + self.assertEqual(self._call('Zm9v=', None), 'foo') - def test_unicode(self): - self.assertEqual(self._call(u'XIU='), '\\\x85') + def test_with_encoding(self): + self.assertEqual(self._call('xIU=', 'utf-8'), u'\u0105') def test_fills_padding(self): - self.assertEqual(self._call('Zm9v'), 'foo') + self.assertEqual(self._call('Zm9v', None), 'foo') if __name__ == '__main__': From 22bea4c9750238a36fa96872e026d7a86a0094b4 Mon Sep 17 00:00:00 2001 From: Jakub Warmuz Date: Sun, 23 Nov 2014 22:17:34 +0100 Subject: [PATCH 03/11] Better tests for jose b64 padding --- letsencrypt/client/le_util_test.py | 41 +++++++++++++++++++++++++----- 1 file changed, 34 insertions(+), 7 deletions(-) diff --git a/letsencrypt/client/le_util_test.py b/letsencrypt/client/le_util_test.py index b5126218b..7f8bc8c58 100644 --- a/letsencrypt/client/le_util_test.py +++ b/letsencrypt/client/le_util_test.py @@ -69,6 +69,22 @@ class CheckPermissionsTest(unittest.TestCase): self.assertFalse(self._call(0600)) +# https://en.wikipedia.org/wiki/Base64#Examples +JOSE_B64_PADDING_EXAMPLES = { + 'any carnal pleasure.': ('YW55IGNhcm5hbCBwbGVhc3VyZS4', '='), + 'any carnal pleasure': ('YW55IGNhcm5hbCBwbGVhc3VyZQ', '=='), + 'any carnal pleasur': ('YW55IGNhcm5hbCBwbGVhc3Vy', ''), + 'any carnal pleasu': ('YW55IGNhcm5hbCBwbGVhc3U', '='), + 'any carnal pleas': ('YW55IGNhcm5hbCBwbGVhcw', '=='), +} + + +B64_URL_UNSAFE_EXAMPLES = { + chr(251) + chr(239): '--8', + chr(255) * 2: '__8', +} + + class JOSEB64EncodeTest(unittest.TestCase): """Tests for letsencrypt.client.le_util.jose_b64encode.""" @@ -76,8 +92,13 @@ class JOSEB64EncodeTest(unittest.TestCase): from letsencrypt.client.le_util import jose_b64encode return jose_b64encode(data, encoding) - def test_without_encoding(self): - self.assertEqual(self._call('foo', None), 'Zm9v') + def test_unsafe_url(self): + for text, b64 in B64_URL_UNSAFE_EXAMPLES.iteritems(): + self.assertEqual(self._call(text, None), b64) + + def test_different_paddings(self): + for text, (b64, _) in JOSE_B64_PADDING_EXAMPLES.iteritems(): + self.assertEqual(self._call(text, None), b64) def test_with_encoding(self): self.assertEqual(self._call(u'\u0105', 'utf-8'), 'xIU') @@ -90,15 +111,21 @@ class JOSEB64DecodeTest(unittest.TestCase): from letsencrypt.client.le_util import jose_b64decode return jose_b64decode(jose_b64_string, decoding) - def test_without_decoding(self): - self.assertEqual(self._call('Zm9v=', None), 'foo') + def test_unsafe_url(self): + for text, b64 in B64_URL_UNSAFE_EXAMPLES.iteritems(): + self.assertEqual(self._call(b64, None), text) + + def test_input_without_padding(self): + for text, (b64, _) in JOSE_B64_PADDING_EXAMPLES.iteritems(): + self.assertEqual(self._call(b64, None), text) + + def test_input_with_padding(self): + for text, (b64, pad) in JOSE_B64_PADDING_EXAMPLES.iteritems(): + self.assertEqual(self._call(b64 + pad, None), text) def test_with_encoding(self): self.assertEqual(self._call('xIU=', 'utf-8'), u'\u0105') - def test_fills_padding(self): - self.assertEqual(self._call('Zm9v', None), 'foo') - if __name__ == '__main__': unittest.main() From 421f54127140d4c82e14b3ee65617d4f41ddaf33 Mon Sep 17 00:00:00 2001 From: Jakub Warmuz Date: Sun, 23 Nov 2014 22:27:22 +0100 Subject: [PATCH 04/11] Allow unicode input for JOSE Base64 --- letsencrypt/client/le_util.py | 5 +++-- letsencrypt/client/le_util_test.py | 3 +++ 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/letsencrypt/client/le_util.py b/letsencrypt/client/le_util.py index 31edfd13b..8a87d7c09 100644 --- a/letsencrypt/client/le_util.py +++ b/letsencrypt/client/le_util.py @@ -97,7 +97,7 @@ def jose_b64decode(arg, decoding='utf-8'): """JOSE Base64 decode. :param arg: Base64 string to be decoded. - :type arg: str + :type arg: str or unicode :param decoding: Name of the encoding to be performed after Base64 decoding. @@ -107,5 +107,6 @@ def jose_b64decode(arg, decoding='utf-8'): :rtype: str or unicode """ - decoded = base64.urlsafe_b64decode(arg + '=' * (4 - (len(arg) % 4))) + ascii = arg.encode('ascii') # equivalent to str(arg), for unicode input + decoded = base64.urlsafe_b64decode(ascii + '=' * (4 - (len(ascii) % 4))) return decoded if decoding is None else decoded.decode(decoding) diff --git a/letsencrypt/client/le_util_test.py b/letsencrypt/client/le_util_test.py index 7f8bc8c58..6c2714a8d 100644 --- a/letsencrypt/client/le_util_test.py +++ b/letsencrypt/client/le_util_test.py @@ -126,6 +126,9 @@ class JOSEB64DecodeTest(unittest.TestCase): def test_with_encoding(self): self.assertEqual(self._call('xIU=', 'utf-8'), u'\u0105') + def test_unicode(self): + self.assertEqual(self._call(u'YQ', None), 'a') + if __name__ == '__main__': unittest.main() From c2d0acb0637e14d6e30ed56be3eebac4d336155a Mon Sep 17 00:00:00 2001 From: Jakub Warmuz Date: Mon, 24 Nov 2014 02:16:24 +0100 Subject: [PATCH 05/11] Remove heisenbug from acme_test --- letsencrypt/client/acme_test.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/letsencrypt/client/acme_test.py b/letsencrypt/client/acme_test.py index e5eae6c9a..b4d552c0b 100644 --- a/letsencrypt/client/acme_test.py +++ b/letsencrypt/client/acme_test.py @@ -50,8 +50,8 @@ class PrettyTest(unittest.TestCase): def test_it(self): self.assertEqual( - self._call('{"foo": "bar", "foo2": "bar2"}'), - '{\n "foo2": "bar2", \n "foo": "bar"\n}') + self._call('{"foo": {"bar": "baz"}}'), + '{\n "foo": {\n "bar": "baz"\n }\n}') if __name__ == '__main__': From 1c1d9221c82e5c52763ef8133a1bfe4616cd81e7 Mon Sep 17 00:00:00 2001 From: Jakub Warmuz Date: Mon, 24 Nov 2014 02:23:27 +0100 Subject: [PATCH 06/11] py26 doesn't support set {} constructor. Use frozenset. --- letsencrypt/client/CONFIG.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/letsencrypt/client/CONFIG.py b/letsencrypt/client/CONFIG.py index bccfc777f..1c11510c1 100644 --- a/letsencrypt/client/CONFIG.py +++ b/letsencrypt/client/CONFIG.py @@ -52,10 +52,10 @@ INVALID_EXT = ".acme.invalid" CHALLENGE_PREFERENCES = ["dvsni", "recoveryToken"] # Mutually Exclusive Challenges - only solve 1 -EXCLUSIVE_CHALLENGES = [set(["dvsni", "simpleHttps"])] +EXCLUSIVE_CHALLENGES = [frozenset(["dvsni", "simpleHttps"])] # These are challenges that must be solved by a Configurator object -CONFIG_CHALLENGES = {"dvsni", "simpleHttps"} +CONFIG_CHALLENGES = frozenset(["dvsni", "simpleHttps"]) # Rewrite rule arguments used for redirections to https vhost REWRITE_HTTPS_ARGS = [ From aef18c4413c4d480eef786a9ae47efd024e57da7 Mon Sep 17 00:00:00 2001 From: Jakub Warmuz Date: Mon, 24 Nov 2014 11:00:08 +0100 Subject: [PATCH 07/11] JOSE Base64: encode str only, decode str or ascii unicode --- letsencrypt/client/le_util.py | 41 ++++++++++++++++-------------- letsencrypt/client/le_util_test.py | 30 +++++++++++----------- 2 files changed, 37 insertions(+), 34 deletions(-) diff --git a/letsencrypt/client/le_util.py b/letsencrypt/client/le_util.py index 8a87d7c09..19070858f 100644 --- a/letsencrypt/client/le_util.py +++ b/letsencrypt/client/le_util.py @@ -74,39 +74,42 @@ def unique_file(default_name, mode=0777): # - padding stripped -def jose_b64encode(data, encoding='utf-8'): +def jose_b64encode(data): """JOSE Base64 encode. :param data: Data to be encoded. - :type data: str or unicode + :type data: str or bytearray - :param encoding: Name of the encoding to be performed before - Base64 encoding. If not None, then `data` - has to be unicode. - :type encoding: str or None + :raises: TypeError :returns: JOSE Base64 string. :rtype: str """ - encoded = data if encoding is None else data.encode(encoding) - return base64.urlsafe_b64encode(encoded).rstrip('=') + if not isinstance(data, str): + raise TypeError('argument should be str or bytearray') + return base64.urlsafe_b64encode(data).rstrip('=') -def jose_b64decode(arg, decoding='utf-8'): +def jose_b64decode(data): """JOSE Base64 decode. - :param arg: Base64 string to be decoded. - :type arg: str or unicode + :param data: Base64 string to be decoded. If it's unicode, then + only ASCII characters are allowed. + :type data: str or unicode - :param decoding: Name of the encoding to be performed after - Base64 decoding. - :type decoding: str or None + :raises: ValueError, TypeError - :returns: Decoded data. Unicode if `decoding` is not None. - :rtype: str or unicode + :returns: Decoded data. """ - ascii = arg.encode('ascii') # equivalent to str(arg), for unicode input - decoded = base64.urlsafe_b64decode(ascii + '=' * (4 - (len(ascii) % 4))) - return decoded if decoding is None else decoded.decode(decoding) + if isinstance(data, unicode): + try: + data = data.encode('ascii') + except UnicodeEncodeError: + raise ValueError( + 'unicode argument should contain only ASCII characters') + elif not isinstance(data, str): + raise TypeError('argument should be a str or unicode') + + return base64.urlsafe_b64decode(data + '=' * (4 - (len(data) % 4))) diff --git a/letsencrypt/client/le_util_test.py b/letsencrypt/client/le_util_test.py index 6c2714a8d..30743c24a 100644 --- a/letsencrypt/client/le_util_test.py +++ b/letsencrypt/client/le_util_test.py @@ -88,46 +88,46 @@ B64_URL_UNSAFE_EXAMPLES = { class JOSEB64EncodeTest(unittest.TestCase): """Tests for letsencrypt.client.le_util.jose_b64encode.""" - def _call(self, data, encoding): + def _call(self, data): from letsencrypt.client.le_util import jose_b64encode - return jose_b64encode(data, encoding) + return jose_b64encode(data) def test_unsafe_url(self): for text, b64 in B64_URL_UNSAFE_EXAMPLES.iteritems(): - self.assertEqual(self._call(text, None), b64) + self.assertEqual(self._call(text), b64) def test_different_paddings(self): for text, (b64, _) in JOSE_B64_PADDING_EXAMPLES.iteritems(): - self.assertEqual(self._call(text, None), b64) + self.assertEqual(self._call(text), b64) - def test_with_encoding(self): - self.assertEqual(self._call(u'\u0105', 'utf-8'), 'xIU') + def test_unicode_fails_with_type_error(self): + self.assertRaises(TypeError, self._call, u'some unicode') class JOSEB64DecodeTest(unittest.TestCase): """Tests for letsencrypt.client.le_util.jose_b64decode.""" - def _call(self, jose_b64_string, decoding): + def _call(self, data): from letsencrypt.client.le_util import jose_b64decode - return jose_b64decode(jose_b64_string, decoding) + return jose_b64decode(data) def test_unsafe_url(self): for text, b64 in B64_URL_UNSAFE_EXAMPLES.iteritems(): - self.assertEqual(self._call(b64, None), text) + self.assertEqual(self._call(b64), text) def test_input_without_padding(self): for text, (b64, _) in JOSE_B64_PADDING_EXAMPLES.iteritems(): - self.assertEqual(self._call(b64, None), text) + self.assertEqual(self._call(b64), text) def test_input_with_padding(self): for text, (b64, pad) in JOSE_B64_PADDING_EXAMPLES.iteritems(): - self.assertEqual(self._call(b64 + pad, None), text) + self.assertEqual(self._call(b64 + pad), text) - def test_with_encoding(self): - self.assertEqual(self._call('xIU=', 'utf-8'), u'\u0105') + def test_unicode_with_ascii(self): + self.assertEqual(self._call(u'YQ'), 'a') - def test_unicode(self): - self.assertEqual(self._call(u'YQ', None), 'a') + def test_non_ascii_unicode_fails(self): + self.assertRaises(ValueError, self._call, u'\u0105') if __name__ == '__main__': From 882170559d9fc94ddbb5334a36645222a8541e5c Mon Sep 17 00:00:00 2001 From: Jakub Warmuz Date: Mon, 24 Nov 2014 13:15:09 +0100 Subject: [PATCH 08/11] TODO encode? comments for jose_b64encode --- letsencrypt/client/acme.py | 4 ++-- letsencrypt/client/apache_configurator.py | 2 +- letsencrypt/client/crypto_util.py | 8 ++++---- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/letsencrypt/client/acme.py b/letsencrypt/client/acme.py index 53000d491..dcd8157e3 100644 --- a/letsencrypt/client/acme.py +++ b/letsencrypt/client/acme.py @@ -127,7 +127,7 @@ def certificate_request(csr_der, key): """ return { "type": "certificateRequest", - "csr": le_util.jose_b64encode(csr_der), + "csr": le_util.jose_b64encode(csr_der), # TODO: csr_der.encode? "signature": crypto_util.create_sig(csr_der, key), } @@ -148,7 +148,7 @@ def revocation_request(key_file, cert_der): """ return { "type": "revocationRequest", - "certificate": le_util.jose_b64encode(cert_der), + "certificate": le_util.jose_b64encode(cert_der), # TODO: csr_der.encode? "signature": crypto_util.create_sig(cert_der, key_file), } diff --git a/letsencrypt/client/apache_configurator.py b/letsencrypt/client/apache_configurator.py index b95d5c8bb..389530e56 100644 --- a/letsencrypt/client/apache_configurator.py +++ b/letsencrypt/client/apache_configurator.py @@ -1116,7 +1116,7 @@ LogLevel warn \n\ self.save("SNI Challenge", True) self.restart(True) - s = le_util.jose_b64encode(s) + s = le_util.jose_b64encode(s) # TODO: s.encode? return {"type":"dvsni", "s":s} def cleanup(self): diff --git a/letsencrypt/client/crypto_util.py b/letsencrypt/client/crypto_util.py index 0ba6451b0..9185ec2ca 100644 --- a/letsencrypt/client/crypto_util.py +++ b/letsencrypt/client/crypto_util.py @@ -58,14 +58,14 @@ def create_sig(msg, key_file, nonce=None, nonce_len=CONFIG.NONCE_SIZE): e_bytes = binascii.unhexlify(leading_zeros(hex(key.e)[2:].replace("L", ""))) return { - "nonce": le_util.jose_b64encode(nonce), + "nonce": le_util.jose_b64encode(nonce), # TODO: nonce.encode? "alg": "RS256", "jwk": { "kty": "RSA", - "n": le_util.jose_b64encode(n_bytes), - "e": le_util.jose_b64encode(e_bytes), + "n": le_util.jose_b64encode(n_bytes), # TODO: n_bytes.encode? + "e": le_util.jose_b64encode(e_bytes), # TODO: e_bytes.encode? }, - "sig": le_util.jose_b64encode(signature), + "sig": le_util.jose_b64encode(signature), # TODO: signature.encode? } From 0dd530a4d12b7f52b0edd9ce4e8bcfaeeabebc59 Mon Sep 17 00:00:00 2001 From: Jakub Warmuz Date: Mon, 24 Nov 2014 13:26:21 +0100 Subject: [PATCH 09/11] Fix TODO encode? in create_sig - n, e are `int`s, applied transformations make it `str` - Crypto.Signature.PKCS1_v1_5.new [1] returns PKCS115_SigScheme, and its `sign()` method returns `str` [2] - docs is OK: requires `nonce` to be `str` - `create_sig` is not called with custom `nonce` argument anywhere in the code [1] https://www.dlitz.net/software/pycrypto/api/2.6/Crypto.Signature.PKCS1_v1_5-module.html#new [2] https://www.dlitz.net/software/pycrypto/api/2.6/Crypto.Signature.PKCS1_v1_5.PKCS115_SigScheme-class.html#sign --- letsencrypt/client/crypto_util.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/letsencrypt/client/crypto_util.py b/letsencrypt/client/crypto_util.py index 9185ec2ca..0ba6451b0 100644 --- a/letsencrypt/client/crypto_util.py +++ b/letsencrypt/client/crypto_util.py @@ -58,14 +58,14 @@ def create_sig(msg, key_file, nonce=None, nonce_len=CONFIG.NONCE_SIZE): e_bytes = binascii.unhexlify(leading_zeros(hex(key.e)[2:].replace("L", ""))) return { - "nonce": le_util.jose_b64encode(nonce), # TODO: nonce.encode? + "nonce": le_util.jose_b64encode(nonce), "alg": "RS256", "jwk": { "kty": "RSA", - "n": le_util.jose_b64encode(n_bytes), # TODO: n_bytes.encode? - "e": le_util.jose_b64encode(e_bytes), # TODO: e_bytes.encode? + "n": le_util.jose_b64encode(n_bytes), + "e": le_util.jose_b64encode(e_bytes), }, - "sig": le_util.jose_b64encode(signature), # TODO: signature.encode? + "sig": le_util.jose_b64encode(signature), } From 018ebd40879aa5b5156d71b9505e7dbd5ce12e68 Mon Sep 17 00:00:00 2001 From: Jakub Warmuz Date: Mon, 24 Nov 2014 13:32:55 +0100 Subject: [PATCH 10/11] Fix TODO encode? in apache_configurator. `s = Random.get_random_bytes(CONFIG.S_SIZE)`, so `s` is `str`. --- letsencrypt/client/apache_configurator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/letsencrypt/client/apache_configurator.py b/letsencrypt/client/apache_configurator.py index 389530e56..b95d5c8bb 100644 --- a/letsencrypt/client/apache_configurator.py +++ b/letsencrypt/client/apache_configurator.py @@ -1116,7 +1116,7 @@ LogLevel warn \n\ self.save("SNI Challenge", True) self.restart(True) - s = le_util.jose_b64encode(s) # TODO: s.encode? + s = le_util.jose_b64encode(s) return {"type":"dvsni", "s":s} def cleanup(self): From 43ae01b4c81a75e5442abec989cbcd2673835100 Mon Sep 17 00:00:00 2001 From: Jakub Warmuz Date: Mon, 24 Nov 2014 13:45:15 +0100 Subject: [PATCH 11/11] Fix TODO encode? in acme.py --- letsencrypt/client/acme.py | 8 ++++---- letsencrypt/client/client.py | 25 ++++++++++++++++++------- 2 files changed, 22 insertions(+), 11 deletions(-) diff --git a/letsencrypt/client/acme.py b/letsencrypt/client/acme.py index dcd8157e3..392546b29 100644 --- a/letsencrypt/client/acme.py +++ b/letsencrypt/client/acme.py @@ -115,8 +115,8 @@ def authorization_request(req_id, name, server_nonce, responses, key_file): def certificate_request(csr_der, key): """Create ACME "certificateRequest" message. - :param csr_der: TODO - :type csr_der: TODO + :param csr_der: DER encoded CSR. + :type csr_der: str :param key: TODO :type key: TODO @@ -127,7 +127,7 @@ def certificate_request(csr_der, key): """ return { "type": "certificateRequest", - "csr": le_util.jose_b64encode(csr_der), # TODO: csr_der.encode? + "csr": le_util.jose_b64encode(csr_der), "signature": crypto_util.create_sig(csr_der, key), } @@ -148,7 +148,7 @@ def revocation_request(key_file, cert_der): """ return { "type": "revocationRequest", - "certificate": le_util.jose_b64encode(cert_der), # TODO: csr_der.encode? + "certificate": le_util.jose_b64encode(cert_der), "signature": crypto_util.create_sig(cert_der, key_file), } diff --git a/letsencrypt/client/client.py b/letsencrypt/client/client.py index 590714d6f..43e3ae17e 100644 --- a/letsencrypt/client/client.py +++ b/letsencrypt/client/client.py @@ -161,8 +161,8 @@ class Client(object): def acme_certificate(self, csr_der): """Handle ACME "certificate" phase. - :param csr_der: TODO - :type csr_der: TODO + :param csr_der: CSR in DER format. + :type csr_der: str :returns: ACME "certificate" message. :rtype: dict @@ -593,11 +593,22 @@ class Client(object): return challenge_objs, challenge_obj_indices def get_key_csr_pem(self, csr_return_format='der'): - """ - Returns key and CSR using provided files or generating new files if - necessary. Both will be saved in pem format on the filesystem. - The CSR can optionally be returned in DER format as the CSR cannot be - loaded back into M2Crypto. + """Return key and CSR, generate if necessary. + + Returns key and CSR using provided files or generating new files + if necessary. Both will be saved in PEM format on the + filesystem. The CSR can optionally be returned in DER format as + the CSR cannot be loaded back into M2Crypto. + + :param csr_return_format: If "der" returned CSR is in DER format, + PEM otherwise. + :param csr_return_format: str + + :returns: A pair of `(key, csr)`, where `key` is PEM encoded `str` + and `csr` is PEM/DER (depedning on `csr_return_format` + encoded `str`. + :rtype: tuple + """ key_pem = None csr_pem = None