Bring in @Kuba PEP-8 BranchMerge branch 'kuba-pep8'

This commit is contained in:
James Kasten 2014-11-23 16:02:01 -08:00
commit 26e5535d21
5 changed files with 666 additions and 394 deletions

View file

@ -1,9 +1,12 @@
"""Validate JSON objects as ACME protocol messages."""
"""ACME protocol messages."""
import json
import pkg_resources
import jsonschema
from letsencrypt.client import crypto_util
from letsencrypt.client import le_util
SCHEMATA = dict([
(schema, json.load(open(pkg_resources.resource_filename(
@ -57,3 +60,110 @@ def pretty(json_string):
"""
return json.dumps(json.loads(json_string), indent=4)
def challenge_request(names):
"""Create ACME "challengeRequest message.
TODO: Temporarily only enabling one name
:param names: TODO
:type names: list
:returns: ACME "challengeRequest" message.
:rtype: dict
"""
return {
"type": "challengeRequest",
"identifier": names[0],
}
def authorization_request(req_id, name, server_nonce, responses, key_file):
"""Create ACME "authoriazationRequest" message.
:param req_id: TODO
:type req_id: TODO
:param name: TODO
:type name: TODO
:param server_nonce: TODO
:type server_nonce: TODO
:param responses: TODO
:type response: TODO
:param key_file: TODO
:type key_file: TODO
:returns: ACME "authoriazationRequest" message.
:rtype: dict
"""
return {
"type": "authorizationRequest",
"sessionID": req_id,
"nonce": server_nonce,
"responses": responses,
"signature": crypto_util.create_sig(
name + le_util.b64_url_dec(server_nonce), key_file),
}
def certificate_request(csr_der, key):
"""Create ACME "certificateRequest" message.
:param csr_der: TODO
:type csr_der: TODO
:param key: TODO
:type key: TODO
:returns: ACME "certificateRequest" message.
:rtype: dict
"""
return {
"type": "certificateRequest",
"csr": le_util.b64_url_enc(csr_der),
"signature": crypto_util.create_sig(csr_der, key),
}
def revocation_request(key_file, cert_der):
"""Create ACME "revocationRequest" message.
:param key_file: Path to a file containing RSA key. Accepted formats
are the same as for `Crypto.PublicKey.RSA.importKey`.
:type key_file: str
:param cert_der: DER encoded certificate.
:type cert_der: str
:returns: ACME "revocationRequest" message.
:rtype: dict
"""
return {
"type": "revocationRequest",
"certificate": le_util.b64_url_enc(cert_der),
"signature": crypto_util.create_sig(cert_der, key_file),
}
def status_request(token):
"""Create ACME "statusRequest" message.
:param token: Token provided in ACME "defer" message.
:type token: str
:returns: ACME "statusRequest" message.
:rtype: dict
"""
return {
"type": "statusRequest",
"token": token,
}

View file

@ -1,3 +1,10 @@
"""ACME challenge."""
import sys
from letsencrypt.client import CONFIG
from letsencrypt.client import logger
class Challenge(object):
def __init__(self, configurator):
@ -11,3 +18,119 @@ class Challenge(object):
def cleanup(self):
raise NotImplementedError()
def gen_challenge_path(challenges, combos=None):
"""Generate a plan to get authority over the identity.
TODO: Make sure that the challenges are feasible...
Example: Do you have the recovery key?
:param challenges: A list of challenges from ACME "challenge"
server message to be fulfilled by the client
in order to prove possession of the identifier.
:type challenges: list
:param combos: A collection of sets of challenges from ACME
"challenge" server message ("combinations"),
each of which would be sufficient to prove
possession of the identifier.
:type combos: list or None
:returns: List of indices from `challenges`.
:rtype: list
"""
if combos:
return _find_smart_path(challenges, combos)
else:
return _find_dumb_path(challenges)
def _find_smart_path(challenges, combos):
"""
Can be called if combinations is included
Function uses a simple ranking system to choose the combo with the
lowest cost
:param challenges: A list of challenges from ACME "challenge"
server message to be fulfilled by the client
in order to prove possession of the identifier.
:type challenges: list
:param combos: A collection of sets of challenges from ACME
"challenge" server message ("combinations"),
each of which would be sufficient to prove
possession of the identifier.
:type combos: list or None
:returns: List of indices from `challenges`.
:rtype: list
"""
chall_cost = {}
max_cost = 0
for i, chall in enumerate(CONFIG.CHALLENGE_PREFERENCES):
chall_cost[chall] = i
max_cost += i
best_combo = []
# Set above completing all of the available challenges
best_combo_cost = max_cost + 1
combo_total = 0
for combo in combos:
for challenge_index in combo:
combo_total += chall_cost.get(challenges[
challenge_index]["type"], max_cost)
if combo_total < best_combo_cost:
best_combo = combo
best_combo_cost = combo_total
combo_total = 0
if not best_combo:
logger.fatal("Client does not support any combination of "
"challenges to satisfy ACME server")
sys.exit(22)
return best_combo
def _find_dumb_path(challenges):
"""
Should be called if the combinations hint is not included by the server
This function returns the best path that does not contain multiple
mutually exclusive challenges
:param challanges: A list of challenges from ACME "challenge"
server message to be fulfilled by the client
in order to prove possession of the identifier.
:type challenges: list
:returns: List of indices from `challenges`.
:rtype: list
"""
# Add logic for a crappy server
# Choose a DV
path = []
for pref_c in CONFIG.CHALLENGE_PREFERENCES:
for i, offered_challenge in enumerate(challenges):
if (pref_c == offered_challenge["type"] and
is_preferred(offered_challenge["type"], path)):
path.append((i, offered_challenge["type"]))
return [i for (i, _) in path]
def is_preferred(offered_challenge_type, path):
for _, challenge_type in path:
for mutually_exclusive in CONFIG.EXCLUSIVE_CHALLENGES:
# Second part is in case we eventually allow multiple names
# to be challenges at the same time
if (challenge_type in mutually_exclusive and
offered_challenge_type in mutually_exclusive and
challenge_type != offered_challenge_type):
return False
return True

File diff suppressed because it is too large Load diff

View file

@ -36,7 +36,9 @@ class Configurator(object):
def get_all_certs_keys(self):
"""Retrieve all certs and keys set in configuration.
returns: list of tuples with form [(cert, key, path)]
:returns: List of tuples with form [(cert, key, path)].
:rtype: list
"""
raise NotImplementedError()

View file

@ -11,6 +11,7 @@ import M2Crypto
from letsencrypt.client import CONFIG
from letsencrypt.client import le_util
from letsencrypt.client import logger
def b64_cert_to_pem(b64_der_cert):
@ -18,36 +19,54 @@ def b64_cert_to_pem(b64_der_cert):
le_util.b64_url_dec(b64_der_cert)).as_pem()
def create_sig(msg, key_file, signer_nonce=None,
signer_nonce_len=CONFIG.NONCE_SIZE):
# DOES prepend signer_nonce to message
# TODO: Change this over to M2Crypto... PKey
# Protect against crypto unicode errors... is this sufficient?
# Do I need to escape?
def create_sig(msg, key_file, nonce=None, nonce_len=CONFIG.NONCE_SIZE):
"""Create signature with nonce prepended to the message.
TODO: Change this over to M2Crypto... PKey
Protect against crypto unicode errors... is this sufficient?
Do I need to escape?
:param msg: Message to be signed
:type msg: Anything with __str__ method
:param key_file: Path to a file containing RSA key. Accepted formats
are the same as for `Crypto.PublicKey.RSA.importKey`.
:type key_file: str
:param nonce: Nonce to be used. If None, nonce of `nonce_len` size
will be randomly genereted.
:type nonce: str or None
:param nonce_len: Size of the automaticaly generated nonce.
:type nonce_len: int
:returns: Signature.
:rtype: dict
"""
msg = str(msg)
key = Crypto.PublicKey.RSA.importKey(open(key_file).read())
if signer_nonce is None:
signer_nonce = Random.get_random_bytes(signer_nonce_len)
hashed = Crypto.Hash.SHA256.new(signer_nonce + msg)
signer = Crypto.Signature.PKCS1_v1_5.new(key)
signature = signer.sign(hashed)
#print "signing:", signer_nonce + msg
#print "signature:", signature
nonce = Random.get_random_bytes(nonce_len) if nonce is None else nonce
msg_with_nonce = nonce + msg
hashed = Crypto.Hash.SHA256.new(msg_with_nonce)
signature = Crypto.Signature.PKCS1_v1_5.new(key).sign(hashed)
logger.debug('%s signed as %s' % (msg_with_nonce, signature))
n_bytes = binascii.unhexlify(leading_zeros(hex(key.n)[2:].replace("L", "")))
e_bytes = binascii.unhexlify(leading_zeros(hex(key.e)[2:].replace("L", "")))
n_encoded = le_util.b64_url_enc(n_bytes)
e_encoded = le_util.b64_url_enc(e_bytes)
signer_nonce_encoded = le_util.b64_url_enc(signer_nonce)
sig_encoded = le_util.b64_url_enc(signature)
jwk = {"kty": "RSA", "n": n_encoded, "e": e_encoded}
signature = {
"nonce": signer_nonce_encoded,
return {
"nonce": le_util.b64_url_enc(nonce),
"alg": "RS256",
"jwk": jwk,
"sig": sig_encoded
"jwk": {
"kty": "RSA",
"n": le_util.b64_url_enc(n_bytes),
"e": le_util.b64_url_enc(e_bytes),
},
"sig": le_util.b64_url_enc(signature),
}
# return json.dumps(signature)
return signature
def leading_zeros(arg):
@ -151,6 +170,14 @@ def make_ss_cert(key_file, domains):
def get_cert_info(filename):
"""Get certificate info.
:param filename: Name of file containing certificate in PEM format.
:type filename: str
:rtype: dict
"""
# M2Crypto Library only supports RSA right now
cert = M2Crypto.X509.load_cert(filename)