Merge branch 'master' into revoker

Conflicts:
	letsencrypt/client/auth_handler.py
	letsencrypt/client/client.py
	letsencrypt/client/crypto_util.py
	letsencrypt/client/interfaces.py
	letsencrypt/client/le_util.py
	letsencrypt/client/revoker.py
	letsencrypt/client/standalone_authenticator.py
	letsencrypt/client/tests/apache/dvsni_test.py
	letsencrypt/client/tests/apache/obj_test.py
	letsencrypt/client/tests/recovery_token_test.py
	letsencrypt/client/tests/standalone_authenticator_test.py
	letsencrypt/scripts/main.py
This commit is contained in:
James Kasten 2015-02-18 03:36:53 -08:00
commit 04ecf813bd
67 changed files with 2297 additions and 849 deletions

View file

@ -19,7 +19,7 @@ persistent=yes
# List of plugins (as comma separated values of python modules names) to load,
# usually to register additional checkers.
load-plugins=
load-plugins=linter_plugin
[MESSAGES CONTROL]

View file

@ -1,4 +1,4 @@
include README.rst CHANGES.rst
include README.rst CHANGES.rst linter_plugin.py
recursive-include letsencrypt *.json
recursive-include letsencrypt *.sh
recursive-include letsencrypt *.conf

5
docs/api/acme/errors.rst Normal file
View file

@ -0,0 +1,5 @@
:mod:`letsencrypt.acme.errors`
------------------------------
.. automodule:: letsencrypt.acme.errors
:members:

View file

@ -0,0 +1,5 @@
:mod:`letsencrypt.acme.interfaces`
----------------------------------
.. automodule:: letsencrypt.acme.interfaces
:members:

5
docs/api/acme/jose.rst Normal file
View file

@ -0,0 +1,5 @@
:mod:`letsencrypt.acme.jose`
----------------------------
.. automodule:: letsencrypt.acme.jose
:members:

View file

@ -0,0 +1,5 @@
:mod:`letsencrypt.acme.messages`
--------------------------------
.. automodule:: letsencrypt.acme.messages
:members:

5
docs/api/acme/other.rst Normal file
View file

@ -0,0 +1,5 @@
:mod:`letsencrypt.acme.other`
-----------------------------
.. automodule:: letsencrypt.acme.other
:members:

5
docs/api/acme/util.rst Normal file
View file

@ -0,0 +1,5 @@
:mod:`letsencrypt.acme.util`
----------------------------
.. automodule:: letsencrypt.acme.util
:members:

View file

@ -1,5 +0,0 @@
:mod:`letsencrypt.client.acme`
------------------------------
.. automodule:: letsencrypt.client.acme
:members:

View file

@ -0,0 +1,5 @@
:mod:`letsencrypt.client.standalone_authenticator`
--------------------------------------------------
.. automodule:: letsencrypt.client.standalone_authenticator
:members:

View file

@ -24,7 +24,8 @@ Ubuntu
::
sudo apt-get install python python-setuptools python-virtualenv python-dev \
gcc swig dialog libaugeas0 libssl-dev ca-certificates
gcc swig dialog libaugeas0 libssl-dev libffi-dev \
ca-certificates
Mac OSX

View file

@ -0,0 +1 @@
"""ACME protocol implementation."""

View file

@ -0,0 +1,13 @@
"""ACME errors."""
class Error(Exception):
"""Generic ACME error."""
class ValidationError(Error):
"""ACME message validation error."""
class UnrecognizedMessageTypeError(ValidationError):
"""Unrecognized ACME message type error."""
class SchemaValidationError(ValidationError):
"""JSON schema ACME message validation error."""

View file

@ -0,0 +1,22 @@
"""ACME interfaces."""
import zope.interface
# pylint: disable=no-self-argument,no-method-argument,no-init,inherit-non-class
class IJSONSerializable(zope.interface.Interface):
# pylint: disable=too-few-public-methods
"""JSON serializable object."""
def to_json():
"""Prepare JSON serializable object.
:returns: JSON object ready to be serialized. Note, however, that
this might return other
:class:`letsencrypt.acme.interfaces.IJSONSerializable`
objects, that haven't been serialized yet, which is fine as
long as :func:`letsencrypt.acme.util.dump_ijsonserializable`
is used.
:rtype: dict
"""

100
letsencrypt/acme/jose.py Normal file
View file

@ -0,0 +1,100 @@
"""JOSE."""
import base64
import binascii
import Crypto.PublicKey.RSA
from letsencrypt.acme import util
def _leading_zeros(arg):
if len(arg) % 2:
return '0' + arg
return arg
class JWK(util.JSONDeSerializable, util.ImmutableMap):
# pylint: disable=too-few-public-methods
"""JSON Web Key.
.. todo:: Currently works for RSA public keys only.
"""
__slots__ = ('key',)
schema = util.load_schema('jwk')
@classmethod
def _encode_param(cls, param):
"""Encode numeric key parameter."""
return b64encode(binascii.unhexlify(
_leading_zeros(hex(param)[2:].rstrip('L'))))
@classmethod
def _decode_param(cls, param):
"""Decode numeric key parameter."""
return long(binascii.hexlify(b64decode(param)), 16)
def to_json(self):
"""Serialize to JSON."""
return {
'kty': 'RSA', # TODO
'n': self._encode_param(self.key.n),
'e': self._encode_param(self.key.e),
}
@classmethod
def _from_valid_json(cls, jobj):
assert 'RSA' == jobj['kty'] # TODO
return cls(key=Crypto.PublicKey.RSA.construct(
(cls._decode_param(jobj['n']), cls._decode_param(jobj['e']))))
# https://tools.ietf.org/html/draft-ietf-jose-json-web-signature-37#appendix-C
#
# Jose Base64:
#
# - URL-safe Base64
#
# - padding stripped
def b64encode(data):
"""JOSE Base64 encode.
:param data: Data to be encoded.
:type data: str or bytearray
:returns: JOSE Base64 string.
:rtype: str
:raises TypeError: if `data` is of incorrect type
"""
if not isinstance(data, str):
raise TypeError('argument should be str or bytearray')
return base64.urlsafe_b64encode(data).rstrip('=')
def b64decode(data):
"""JOSE Base64 decode.
:param data: Base64 string to be decoded. If it's unicode, then
only ASCII characters are allowed.
:type data: str or unicode
:returns: Decoded data.
:raises TypeError: if input is of incorrect type
:raises ValueError: if input is unicode with non-ASCII characters
"""
if isinstance(data, unicode):
try:
data = data.encode('ascii')
except UnicodeEncodeError:
raise ValueError(
'unicode argument should contain only ASCII characters')
elif not isinstance(data, str):
raise TypeError('argument should be a str or unicode')
return base64.urlsafe_b64decode(data + '=' * (4 - (len(data) % 4)))

View file

@ -0,0 +1,120 @@
"""Tests for letsencrypt.acme.jose."""
import pkg_resources
import unittest
import Crypto.PublicKey.RSA
RSA256_KEY = Crypto.PublicKey.RSA.importKey(pkg_resources.resource_string(
'letsencrypt.client.tests', 'testdata/rsa256_key.pem'))
RSA512_KEY = Crypto.PublicKey.RSA.importKey(pkg_resources.resource_string(
'letsencrypt.client.tests', 'testdata/rsa512_key.pem'))
class JWKTest(unittest.TestCase):
"""Tests fro letsencrypt.acme.jose.JWK."""
def setUp(self):
from letsencrypt.acme.jose import JWK
self.jwk256 = JWK(key=RSA256_KEY.publickey())
self.jwk256json = {
'kty': 'RSA',
'e': 'AQAB',
'n': 'rHVztFHtH92ucFJD_N_HW9AsdRsUuHUBBBDlHwNlRd3fp5'
'80rv2-6QWE30cWgdmJS86ObRz6lUTor4R0T-3C5Q',
}
self.jwk512 = JWK(key=RSA512_KEY.publickey())
self.jwk512json = {
'kty': 'RSA',
'e': 'AQAB',
'n': '9LYRcVE3Nr-qleecEcX8JwVDnjeG1X7ucsCasuuZM0e09c'
'mYuUzxIkMjO_9x4AVcvXXRXPEV-LzWWkfkTlzRMw',
}
def test_equals(self):
self.assertEqual(self.jwk256, self.jwk256)
self.assertEqual(self.jwk512, self.jwk512)
def test_not_equals(self):
self.assertNotEqual(self.jwk256, self.jwk512)
self.assertNotEqual(self.jwk512, self.jwk256)
def test_to_json(self):
self.assertEqual(self.jwk256.to_json(), self.jwk256json)
self.assertEqual(self.jwk512.to_json(), self.jwk512json)
def test_from_json(self):
from letsencrypt.acme.jose import JWK
self.assertEqual(self.jwk256, JWK.from_json(self.jwk256json))
# TODO: fix schemata to allow RSA512
#self.assertEqual(self.jwk512, JWK.from_json(self.jwk512json))
# https://en.wikipedia.org/wiki/Base64#Examples
B64_PADDING_EXAMPLES = {
'any carnal pleasure.': ('YW55IGNhcm5hbCBwbGVhc3VyZS4', '='),
'any carnal pleasure': ('YW55IGNhcm5hbCBwbGVhc3VyZQ', '=='),
'any carnal pleasur': ('YW55IGNhcm5hbCBwbGVhc3Vy', ''),
'any carnal pleasu': ('YW55IGNhcm5hbCBwbGVhc3U', '='),
'any carnal pleas': ('YW55IGNhcm5hbCBwbGVhcw', '=='),
}
B64_URL_UNSAFE_EXAMPLES = {
chr(251) + chr(239): '--8',
chr(255) * 2: '__8',
}
class B64EncodeTest(unittest.TestCase):
"""Tests for letsencrypt.acme.jose.b64encode."""
@classmethod
def _call(cls, data):
from letsencrypt.acme.jose import b64encode
return b64encode(data)
def test_unsafe_url(self):
for text, b64 in B64_URL_UNSAFE_EXAMPLES.iteritems():
self.assertEqual(self._call(text), b64)
def test_different_paddings(self):
for text, (b64, _) in B64_PADDING_EXAMPLES.iteritems():
self.assertEqual(self._call(text), b64)
def test_unicode_fails_with_type_error(self):
self.assertRaises(TypeError, self._call, u'some unicode')
class B64DecodeTest(unittest.TestCase):
"""Tests for letsencrypt.acme.jose.b64decode."""
@classmethod
def _call(cls, data):
from letsencrypt.acme.jose import b64decode
return b64decode(data)
def test_unsafe_url(self):
for text, b64 in B64_URL_UNSAFE_EXAMPLES.iteritems():
self.assertEqual(self._call(b64), text)
def test_input_without_padding(self):
for text, (b64, _) in B64_PADDING_EXAMPLES.iteritems():
self.assertEqual(self._call(b64), text)
def test_input_with_padding(self):
for text, (b64, pad) in B64_PADDING_EXAMPLES.iteritems():
self.assertEqual(self._call(b64 + pad), text)
def test_unicode_with_ascii(self):
self.assertEqual(self._call(u'YQ'), 'a')
def test_non_ascii_unicode_fails(self):
self.assertRaises(ValueError, self._call, u'\u0105')
def test_type_error_no_unicode_or_str(self):
self.assertRaises(TypeError, self._call, object())
if __name__ == '__main__':
unittest.main()

View file

@ -0,0 +1,500 @@
"""ACME protocol messages."""
import M2Crypto
import zope.interface
from letsencrypt.acme import errors
from letsencrypt.acme import interfaces
from letsencrypt.acme import jose
from letsencrypt.acme import other
from letsencrypt.acme import util
class Message(util.JSONDeSerializable, util.ImmutableMap):
"""ACME message.
Messages are considered immutable.
"""
zope.interface.implements(interfaces.IJSONSerializable)
acme_type = NotImplemented
"""ACME message "type" field. Subclasses must override."""
TYPES = {}
"""Message types registered for JSON deserialization"""
@classmethod
def register(cls, msg_cls):
"""Register class for JSON deserialization."""
cls.TYPES[msg_cls.acme_type] = msg_cls
return msg_cls
def to_json(self):
"""Get JSON serializable object.
:returns: Serializable JSON object representing ACME message.
:meth:`validate` will almost certainly not work, due to reasons
explained in :class:`letsencrypt.acme.interfaces.IJSONSerializable`.
:rtype: dict
"""
jobj = self._fields_to_json()
jobj["type"] = self.acme_type
return jobj
def _fields_to_json(self):
"""Prepare ACME message fields for JSON serialiazation.
Subclasses must override this method.
:returns: Serializable JSON object containg all ACME message fields
apart from "type".
:rtype: dict
"""
raise NotImplementedError()
@classmethod
def get_msg_cls(cls, jobj):
"""Get the registered class for ``jobj``."""
if cls in cls.TYPES.itervalues():
# cls is already registered Message type, force to use it
# so that, e.g Revocation.from_json(jobj) fails if
# jobj["type"] != "revocation".
return cls
if not isinstance(jobj, dict):
raise errors.ValidationError(
"{0} is not a dictionary object".format(jobj))
try:
msg_type = jobj["type"]
except KeyError:
raise errors.ValidationError("missing type field")
try:
msg_cls = cls.TYPES[msg_type]
except KeyError:
raise errors.UnrecognizedMessageTypeError(msg_type)
return msg_cls
@classmethod
def from_json(cls, jobj, validate=True):
"""Deserialize validated ACME message from JSON string.
:param str jobj: JSON object.
:param bool validate: Validate against schema before deserializing.
Useful if :class:`JWK` is part of already validated json object.
:raises letsencrypt.acme.errors.ValidationError: if validation
was unsuccessful
:returns: Valid ACME message.
:rtype: subclass of :class:`Message`
"""
msg_cls = cls.get_msg_cls(jobj)
if validate:
msg_cls.validate_json(jobj)
# pylint: disable=protected-access
return msg_cls._from_valid_json(jobj)
@Message.register # pylint: disable=too-few-public-methods
class Challenge(Message):
"""ACME "challenge" message."""
acme_type = "challenge"
schema = util.load_schema(acme_type)
__slots__ = ("session_id", "nonce", "challenges", "combinations")
def _fields_to_json(self):
fields = {
"sessionID": self.session_id,
"nonce": jose.b64encode(self.nonce),
"challenges": self.challenges,
}
if self.combinations:
fields["combinations"] = self.combinations
return fields
@classmethod
def _from_valid_json(cls, jobj):
return cls(session_id=jobj["sessionID"],
nonce=jose.b64decode(jobj["nonce"]),
challenges=jobj["challenges"],
combinations=jobj.get("combinations", []))
@Message.register # pylint: disable=too-few-public-methods
class ChallengeRequest(Message):
"""ACME "challengeRequest" message.
:ivar str identifier: Domain name.
"""
acme_type = "challengeRequest"
schema = util.load_schema(acme_type)
__slots__ = ("identifier",)
def _fields_to_json(self):
return {
"identifier": self.identifier,
}
@classmethod
def _from_valid_json(cls, jobj):
return cls(identifier=jobj["identifier"])
@Message.register # pylint: disable=too-few-public-methods
class Authorization(Message):
"""ACME "authorization" message."""
acme_type = "authorization"
schema = util.load_schema(acme_type)
__slots__ = ("recovery_token", "identifier", "jwk")
def _fields_to_json(self):
fields = {}
if self.recovery_token is not None:
fields["recoveryToken"] = self.recovery_token
if self.identifier is not None:
fields["identifier"] = self.identifier
if self.jwk is not None:
fields["jwk"] = self.jwk
return fields
@classmethod
def _from_valid_json(cls, jobj):
jwk = jobj.get("jwk")
if jwk is not None:
jwk = jose.JWK.from_json(jwk, validate=False)
return cls(recovery_token=jobj.get("recoveryToken"),
identifier=jobj.get("identifier"), jwk=jwk)
@Message.register
class AuthorizationRequest(Message):
"""ACME "authorizationRequest" message.
:ivar str session_id: "sessionID" from the server challenge
:ivar str nonce: Nonce from the server challenge
:ivar list responses: List of completed challenges
:ivar signature: Signature (:class:`letsencrypt.acme.other.Signature`).
:ivar contact: TODO
"""
acme_type = "authorizationRequest"
schema = util.load_schema(acme_type)
__slots__ = ("session_id", "nonce", "responses", "signature", "contact")
@classmethod
def create(cls, name, key, sig_nonce=None, **kwargs):
"""Create signed "authorizationRequest".
:param str name: Hostname
:param key: Key used for signing.
:type key: :class:`Crypto.PublicKey.RSA`
:param str sig_nonce: Nonce used for signature. Useful for testing.
:kwargs: Any other arguments accepted by the class constructor.
:returns: Signed "authorizationRequest" ACME message.
:rtype: :class:`AuthorizationRequest`
"""
# pylint: disable=too-many-arguments
signature = other.Signature.from_msg(
name + kwargs["nonce"], key, sig_nonce)
return cls(
signature=signature, contact=kwargs.pop("contact", []), **kwargs)
def verify(self, name):
"""Verify signature.
.. warning:: Caller must check that the public key encoded in the
:attr:`signature`'s :class:`letsencrypt.acme.jose.JWK` object
is the correct key for a given context.
:param str name: Hostname
:returns: True iff ``signature`` can be verified, False otherwise.
:rtype: bool
"""
return self.signature.verify(name + self.nonce)
def _fields_to_json(self):
fields = {
"sessionID": self.session_id,
"nonce": jose.b64encode(self.nonce),
"responses": self.responses,
"signature": self.signature,
}
if self.contact:
fields["contact"] = self.contact
return fields
@classmethod
def _from_valid_json(cls, jobj):
return cls(session_id=jobj["sessionID"],
nonce=jose.b64decode(jobj["nonce"]),
responses=jobj["responses"],
signature=other.Signature.from_json(
jobj["signature"], validate=False),
contact=jobj.get("contact", []))
@Message.register # pylint: disable=too-few-public-methods
class Certificate(Message):
"""ACME "certificate" message.
:ivar certificate: The certificate (:class:`M2Crypto.X509.X509`
wrapped in :class:`letsencrypt.acme.util.ComparableX509`).
:ivar list chain: Chain of certificates (:class:`M2Crypto.X509.X509`
wrapped in :class:`letsencrypt.acme.util.ComparableX509` ).
"""
acme_type = "certificate"
schema = util.load_schema(acme_type)
__slots__ = ("certificate", "chain", "refresh")
def _fields_to_json(self):
fields = {"certificate": self._encode_cert(self.certificate)}
if self.chain:
fields["chain"] = [self._encode_cert(cert) for cert in self.chain]
if self.refresh is not None:
fields["refresh"] = self.refresh
return fields
@classmethod
def _decode_cert(cls, b64der):
return util.ComparableX509(M2Crypto.X509.load_cert_der_string(
jose.b64decode(b64der)))
@classmethod
def _encode_cert(cls, cert):
return jose.b64encode(cert.as_der())
@classmethod
def _from_valid_json(cls, jobj):
return cls(certificate=cls._decode_cert(jobj["certificate"]),
chain=[cls._decode_cert(cert) for cert in
jobj.get("chain", [])],
refresh=jobj.get("refresh"))
@Message.register
class CertificateRequest(Message):
"""ACME "certificateRequest" message.
:ivar csr: Certificate Signing Request (:class:`M2Crypto.X509.Request`
wrapped in :class:`letsencrypt.acme.util.ComparableX509`.
:ivar signature: Signature (:class:`letsencrypt.acme.other.Signature`).
"""
acme_type = "certificateRequest"
schema = util.load_schema(acme_type)
__slots__ = ("csr", "signature")
@classmethod
def create(cls, key, sig_nonce=None, **kwargs):
"""Create signed "certificateRequest".
:param key: Key used for signing.
:type key: :class:`Crypto.PublicKey.RSA`
:param str sig_nonce: Nonce used for signature. Useful for testing.
:kwargs: Any other arguments accepted by the class constructor.
:returns: Signed "certificateRequest" ACME message.
:rtype: :class:`CertificateRequest`
"""
return cls(signature=other.Signature.from_msg(
kwargs["csr"].as_der(), key, sig_nonce), **kwargs)
def verify(self):
"""Verify signature.
.. warning:: Caller must check that the public key encoded in the
:attr:`signature`'s :class:`letsencrypt.acme.jose.JWK` object
is the correct key for a given context.
:returns: True iff ``signature`` can be verified, False otherwise.
:rtype: bool
"""
return self.signature.verify(self.csr.as_der())
@classmethod
def _decode_csr(cls, b64der):
return util.ComparableX509(M2Crypto.X509.load_request_der_string(
jose.b64decode(b64der)))
@classmethod
def _encode_csr(cls, csr):
return jose.b64encode(csr.as_der())
def _fields_to_json(self):
return {
"csr": self._encode_csr(self.csr),
"signature": self.signature,
}
@classmethod
def _from_valid_json(cls, jobj):
return cls(csr=cls._decode_csr(jobj["csr"]),
signature=other.Signature.from_json(
jobj["signature"], validate=False))
@Message.register # pylint: disable=too-few-public-methods
class Defer(Message):
"""ACME "defer" message."""
acme_type = "defer"
schema = util.load_schema(acme_type)
__slots__ = ("token", "interval", "message")
def _fields_to_json(self):
fields = {"token": self.token}
if self.interval is not None:
fields["interval"] = self.interval
if self.message is not None:
fields["message"] = self.message
return fields
@classmethod
def _from_valid_json(cls, jobj):
return cls(token=jobj["token"], interval=jobj.get("interval"),
message=jobj.get("message"))
@Message.register # pylint: disable=too-few-public-methods
class Error(Message):
"""ACME "error" message."""
acme_type = "error"
schema = util.load_schema(acme_type)
__slots__ = ("error", "message", "more_info")
CODES = {
"malformed": "The request message was malformed",
"unauthorized": "The client lacks sufficient authorization",
"serverInternal": "The server experienced an internal error",
"notSupported": "The request type is not supported",
"unknown": "The server does not recognize an ID/token in the request",
"badCSR": "The CSR is unacceptable (e.g., due to a short key)",
}
def _fields_to_json(self):
fields = {"error": self.error}
if self.message is not None:
fields["message"] = self.message
if self.more_info is not None:
fields["moreInfo"] = self.more_info
return fields
@classmethod
def _from_valid_json(cls, jobj):
return cls(error=jobj["error"], message=jobj.get("message"),
more_info=jobj.get("moreInfo"))
@Message.register # pylint: disable=too-few-public-methods
class Revocation(Message):
"""ACME "revocation" message."""
acme_type = "revocation"
schema = util.load_schema(acme_type)
__slots__ = ()
def _fields_to_json(self):
return {}
@classmethod
def _from_valid_json(cls, jobj):
return cls()
@Message.register
class RevocationRequest(Message):
"""ACME "revocationRequest" message.
:ivar certificate: Certificate (:class:`M2Crypto.X509.X509`
wrapped in :class:`letsencrypt.acme.util.ComparableX509`).
:ivar signature: Signature (:class:`letsencrypt.acme.other.Signature`).
"""
acme_type = "revocationRequest"
schema = util.load_schema(acme_type)
__slots__ = ("certificate", "signature")
@classmethod
def create(cls, key, sig_nonce=None, **kwargs):
"""Create signed "revocationRequest".
:param key: Key used for signing.
:type key: :class:`Crypto.PublicKey.RSA`
:param str sig_nonce: Nonce used for signature. Useful for testing.
:kwargs: Any other arguments accepted by the class constructor.
:returns: Signed "revocationRequest" ACME message.
:rtype: :class:`RevocationRequest`
"""
return cls(signature=other.Signature.from_msg(
kwargs["certificate"].as_der(), key, sig_nonce), **kwargs)
def verify(self):
"""Verify signature.
.. warning:: Caller must check that the public key encoded in the
:attr:`signature`'s :class:`letsencrypt.acme.jose.JWK` object
is the correct key for a given context.
:returns: True iff ``signature`` can be verified, False otherwise.
:rtype: bool
"""
return self.signature.verify(self.certificate.as_der())
@classmethod
def _decode_cert(cls, b64der):
return util.ComparableX509(M2Crypto.X509.load_cert_der_string(
jose.b64decode(b64der)))
@classmethod
def _encode_cert(cls, cert):
return jose.b64encode(cert.as_der())
def _fields_to_json(self):
return {
"certificate": self._encode_cert(self.certificate),
"signature": self.signature,
}
@classmethod
def _from_valid_json(cls, jobj):
return cls(certificate=cls._decode_cert(jobj["certificate"]),
signature=other.Signature.from_json(
jobj["signature"], validate=False))
@Message.register # pylint: disable=too-few-public-methods
class StatusRequest(Message):
"""ACME "statusRequest" message.
:ivar unicode token: Token provided in ACME "defer" message.
"""
acme_type = "statusRequest"
schema = util.load_schema(acme_type)
__slots__ = ("token",)
def _fields_to_json(self):
return {"token": self.token}
@classmethod
def _from_valid_json(cls, jobj):
return cls(token=jobj["token"])

View file

@ -0,0 +1,484 @@
"""Tests for letsencrypt.acme.messages."""
import pkg_resources
import unittest
import Crypto.PublicKey.RSA
import M2Crypto.X509
import mock
from letsencrypt.acme import errors
from letsencrypt.acme import jose
from letsencrypt.acme import other
from letsencrypt.acme import util
KEY = Crypto.PublicKey.RSA.importKey(pkg_resources.resource_string(
'letsencrypt.client.tests', 'testdata/rsa256_key.pem'))
CERT = util.ComparableX509(M2Crypto.X509.load_cert(
pkg_resources.resource_filename(
'letsencrypt.client.tests', 'testdata/cert.pem')))
CSR = util.ComparableX509(M2Crypto.X509.load_request(
pkg_resources.resource_filename(
'letsencrypt.client.tests', 'testdata/csr.pem')))
class MessageTest(unittest.TestCase):
"""Tests for letsencrypt.acme.messages.Message."""
def setUp(self):
# pylint: disable=missing-docstring,too-few-public-methods
from letsencrypt.acme.messages import Message
class TestMessage(Message):
acme_type = 'test'
schema = {
'type': 'object',
'properties': {
'price': {'type': 'number'},
'name': {'type': 'string'},
},
}
@classmethod
def _from_valid_json(cls, jobj):
return jobj
def _fields_to_json(self):
return {'foo': 'bar'}
self.msg_cls = TestMessage
def test_to_json(self):
self.assertEqual(self.msg_cls().to_json(), {
'type': 'test',
'foo': 'bar',
})
def test_fields_to_json_not_implemented(self):
from letsencrypt.acme.messages import Message
# pylint: disable=protected-access
self.assertRaises(NotImplementedError, Message()._fields_to_json)
@classmethod
def _from_json(cls, jobj, validate=True):
from letsencrypt.acme.messages import Message
return Message.from_json(jobj, validate)
def test_from_json_non_dict_fails(self):
self.assertRaises(errors.ValidationError, self._from_json, [])
def test_from_json_dict_no_type_fails(self):
self.assertRaises(errors.ValidationError, self._from_json, {})
def test_from_json_unknown_type_fails(self):
self.assertRaises(errors.UnrecognizedMessageTypeError,
self._from_json, {'type': 'bar'})
@mock.patch('letsencrypt.acme.messages.Message.TYPES')
def test_from_json_validate_errors(self, types):
types.__getitem__.side_effect = lambda x: {'foo': self.msg_cls}[x]
self.assertRaises(errors.SchemaValidationError,
self._from_json, {'type': 'foo', 'price': 'asd'})
@mock.patch('letsencrypt.acme.messages.Message.TYPES')
def test_from_json_valid_returns_cls(self, types):
types.__getitem__.side_effect = lambda x: {'foo': self.msg_cls}[x]
self.assertEqual(self._from_json({'type': 'foo'}, validate=False),
{'type': 'foo'})
class ChallengeTest(unittest.TestCase):
def setUp(self):
challenges = [
{'type': 'simpleHttps', 'token': 'IlirfxKKXAsHtmzK29Pj8A'},
{'type': 'dns', 'token': 'DGyRejmCefe7v4NfDGDKfA'},
{'type': 'recoveryToken'},
]
combinations = [[0, 2], [1, 2]]
from letsencrypt.acme.messages import Challenge
self.msg = Challenge(
session_id='aefoGaavieG9Wihuk2aufai3aeZ5EeW4',
nonce='\xec\xd6\xf2oYH\xeb\x13\xd5#q\xe0\xdd\xa2\x92\xa9',
challenges=challenges, combinations=combinations)
self.jmsg = {
'type': 'challenge',
'sessionID': 'aefoGaavieG9Wihuk2aufai3aeZ5EeW4',
'nonce': '7Nbyb1lI6xPVI3Hg3aKSqQ',
'challenges': challenges,
'combinations': combinations,
}
def test_to_json(self):
self.assertEqual(self.msg.to_json(), self.jmsg)
def test_from_json(self):
from letsencrypt.acme.messages import Challenge
self.assertEqual(Challenge.from_json(self.jmsg), self.msg)
def test_json_without_optionals(self):
del self.jmsg['combinations']
from letsencrypt.acme.messages import Challenge
msg = Challenge.from_json(self.jmsg)
self.assertEqual(msg.combinations, [])
self.assertEqual(msg.to_json(), self.jmsg)
class ChallengeRequestTest(unittest.TestCase):
def setUp(self):
from letsencrypt.acme.messages import ChallengeRequest
self.msg = ChallengeRequest(identifier='example.com')
self.jmsg = {
'type': 'challengeRequest',
'identifier': 'example.com',
}
def test_to_json(self):
self.assertEqual(self.msg.to_json(), self.jmsg)
def test_from_json(self):
from letsencrypt.acme.messages import ChallengeRequest
self.assertEqual(ChallengeRequest.from_json(self.jmsg), self.msg)
class AuthorizationTest(unittest.TestCase):
def setUp(self):
jwk = jose.JWK(key=KEY.publickey())
from letsencrypt.acme.messages import Authorization
self.msg = Authorization(recovery_token='tok', jwk=jwk,
identifier='example.com')
self.jmsg = {
'type': 'authorization',
'recoveryToken': 'tok',
'identifier': 'example.com',
'jwk': jwk,
}
def test_to_json(self):
self.assertEqual(self.msg.to_json(), self.jmsg)
def test_from_json(self):
self.jmsg['jwk'] = self.jmsg['jwk'].to_json()
from letsencrypt.acme.messages import Authorization
self.assertEqual(Authorization.from_json(self.jmsg), self.msg)
def test_json_without_optionals(self):
del self.jmsg['recoveryToken']
del self.jmsg['identifier']
del self.jmsg['jwk']
from letsencrypt.acme.messages import Authorization
msg = Authorization.from_json(self.jmsg)
self.assertTrue(msg.recovery_token is None)
self.assertTrue(msg.identifier is None)
self.assertTrue(msg.jwk is None)
self.assertEqual(self.jmsg, msg.to_json())
class AuthorizationRequestTest(unittest.TestCase):
def setUp(self):
self.responses = [
{'type': 'simpleHttps', 'path': 'Hf5GrX4Q7EBax9hc2jJnfw'},
None, # null
{'type': 'recoveryToken', 'token': '23029d88d9e123e'},
]
self.contact = ["mailto:cert-admin@example.com", "tel:+12025551212"]
signature = other.Signature(
alg='RS256', jwk=jose.JWK(key=KEY.publickey()),
sig='-v\xd8\xc2\xa3\xba0\xd6\x92\x16\xb5.\xbe\xa1[\x04\xbe'
'\x1b\xa1X\xd2)\x18\x94\x8f\xd7\xd0\xc0\xbbcI`W\xdf v'
'\xe4\xed\xe8\x03J\xe8\xc8<?\xc8W\x94\x94cj(\xe7\xaa$'
'\x92\xe9\x96\x11\xc2\xefx\x0bR',
nonce='\xab?\x08o\xe6\x81$\x9f\xa1\xc9\x025\x1c\x1b\xa5+')
from letsencrypt.acme.messages import AuthorizationRequest
self.msg = AuthorizationRequest(
session_id='aefoGaavieG9Wihuk2aufai3aeZ5EeW4',
nonce='\xec\xd6\xf2oYH\xeb\x13\xd5#q\xe0\xdd\xa2\x92\xa9',
responses=self.responses,
signature=signature,
contact=self.contact,
)
self.jmsg_to = {
'type': 'authorizationRequest',
'sessionID': 'aefoGaavieG9Wihuk2aufai3aeZ5EeW4',
'nonce': '7Nbyb1lI6xPVI3Hg3aKSqQ',
'responses': self.responses,
'signature': signature,
'contact': self.contact,
}
self.jmsg_from = {
'type': 'authorizationRequest',
'sessionID': 'aefoGaavieG9Wihuk2aufai3aeZ5EeW4',
'nonce': '7Nbyb1lI6xPVI3Hg3aKSqQ',
'responses': self.responses,
'signature': signature.to_json(),
'contact': self.contact,
}
self.jmsg_from['signature']['jwk'] = self.jmsg_from[
'signature']['jwk'].to_json()
def test_create(self):
from letsencrypt.acme.messages import AuthorizationRequest
self.assertEqual(self.msg, AuthorizationRequest.create(
name='example.com', key=KEY, responses=self.responses,
nonce='\xec\xd6\xf2oYH\xeb\x13\xd5#q\xe0\xdd\xa2\x92\xa9',
session_id='aefoGaavieG9Wihuk2aufai3aeZ5EeW4',
sig_nonce='\xab?\x08o\xe6\x81$\x9f\xa1\xc9\x025\x1c\x1b\xa5+',
contact=self.contact))
def test_verify(self):
self.assertTrue(self.msg.verify('example.com'))
def test_to_json(self):
self.assertEqual(self.msg.to_json(), self.jmsg_to)
def test_from_json(self):
from letsencrypt.acme.messages import AuthorizationRequest
self.assertEqual(
self.msg, AuthorizationRequest.from_json(self.jmsg_from))
def test_json_without_optionals(self):
del self.jmsg_from['contact']
del self.jmsg_to['contact']
from letsencrypt.acme.messages import AuthorizationRequest
msg = AuthorizationRequest.from_json(self.jmsg_from)
self.assertEqual(msg.contact, [])
self.assertEqual(self.jmsg_to, msg.to_json())
class CertificateTest(unittest.TestCase):
def setUp(self):
refresh = 'https://example.com/refresh/Dr8eAwTVQfSS/'
from letsencrypt.acme.messages import Certificate
self.msg = Certificate(
certificate=CERT, chain=[CERT], refresh=refresh)
self.jmsg = {
'type': 'certificate',
'certificate': jose.b64encode(CERT.as_der()),
'chain': [jose.b64encode(CERT.as_der())],
'refresh': refresh,
}
def test_to_json(self):
self.assertEqual(self.msg.to_json(), self.jmsg)
def test_from_json(self):
from letsencrypt.acme.messages import Certificate
self.assertEqual(Certificate.from_json(self.jmsg), self.msg)
def test_json_without_optionals(self):
del self.jmsg['chain']
del self.jmsg['refresh']
from letsencrypt.acme.messages import Certificate
msg = Certificate.from_json(self.jmsg)
self.assertEqual(msg.chain, [])
self.assertTrue(msg.refresh is None)
self.assertEqual(self.jmsg, msg.to_json())
class CertificateRequestTest(unittest.TestCase):
def setUp(self):
signature = other.Signature(
alg='RS256', jwk=jose.JWK(key=KEY.publickey()),
sig='\x15\xed\x84\xaa:\xf2DO\x0e9 \xbcg\xf8\xc0\xcf\x87\x9a'
'\x95\xeb\xffT[\x84[\xec\x85\x7f\x8eK\xe9\xc2\x12\xc8Q'
'\xafo\xc6h\x07\xba\xa6\xdf\xd1\xa7"$\xba=Z\x13n\x14\x0b'
'k\xfe\xee\xb4\xe4\xc8\x05\x9a\x08\xa7',
nonce='\xec\xd6\xf2oYH\xeb\x13\xd5#q\xe0\xdd\xa2\x92\xa9')
from letsencrypt.acme.messages import CertificateRequest
self.msg = CertificateRequest(csr=CSR, signature=signature)
self.jmsg = {
'type': 'certificateRequest',
'csr': jose.b64encode(CSR.as_der()),
'signature': signature,
}
def test_create(self):
from letsencrypt.acme.messages import CertificateRequest
self.assertEqual(self.msg, CertificateRequest.create(
csr=CSR, key=KEY,
sig_nonce='\xec\xd6\xf2oYH\xeb\x13\xd5#q\xe0\xdd\xa2\x92\xa9'))
def test_verify(self):
self.assertTrue(self.msg.verify())
def test_to_json(self):
self.assertEqual(self.msg.to_json(), self.jmsg)
def test_from_json(self):
from letsencrypt.acme.messages import CertificateRequest
self.jmsg['signature'] = self.jmsg['signature'].to_json()
self.jmsg['signature']['jwk'] = self.jmsg['signature']['jwk'].to_json()
self.assertEqual(self.msg, CertificateRequest.from_json(self.jmsg))
class DeferTest(unittest.TestCase):
def setUp(self):
from letsencrypt.acme.messages import Defer
self.msg = Defer(
token='O7-s9MNq1siZHlgrMzi9_A', interval=60,
message='Warming up the HSM')
self.jmsg = {
'type': 'defer',
'token': 'O7-s9MNq1siZHlgrMzi9_A',
'interval': 60,
'message': 'Warming up the HSM',
}
def test_to_json(self):
self.assertEqual(self.msg.to_json(), self.jmsg)
def test_from_json(self):
from letsencrypt.acme.messages import Defer
self.assertEqual(Defer.from_json(self.jmsg), self.msg)
def test_json_without_optionals(self):
del self.jmsg['interval']
del self.jmsg['message']
from letsencrypt.acme.messages import Defer
msg = Defer.from_json(self.jmsg)
self.assertTrue(msg.interval is None)
self.assertTrue(msg.message is None)
self.assertEqual(self.jmsg, msg.to_json())
class ErrorTest(unittest.TestCase):
def setUp(self):
from letsencrypt.acme.messages import Error
self.msg = Error(
error='badCSR', message='RSA keys must be at least 2048 bits long',
more_info='https://ca.example.com/documentation/csr-requirements')
self.jmsg = {
'type': 'error',
'error': 'badCSR',
'message':'RSA keys must be at least 2048 bits long',
'moreInfo': 'https://ca.example.com/documentation/csr-requirements',
}
def test_to_json(self):
self.assertEqual(self.msg.to_json(), self.jmsg)
def test_from_json(self):
from letsencrypt.acme.messages import Error
self.assertEqual(Error.from_json(self.jmsg), self.msg)
def test_json_without_optionals(self):
del self.jmsg['message']
del self.jmsg['moreInfo']
from letsencrypt.acme.messages import Error
msg = Error.from_json(self.jmsg)
self.assertTrue(msg.message is None)
self.assertTrue(msg.more_info is None)
self.assertEqual(self.jmsg, msg.to_json())
class RevocationTest(unittest.TestCase):
def setUp(self):
from letsencrypt.acme.messages import Revocation
self.msg = Revocation()
self.jmsg = {
'type': 'revocation',
}
def test_to_json(self):
self.assertEqual(self.msg.to_json(), self.jmsg)
def test_from_json(self):
from letsencrypt.acme.messages import Revocation
self.assertEqual(Revocation.from_json(self.jmsg), self.msg)
class RevocationRequestTest(unittest.TestCase):
def setUp(self):
self.sig_nonce = '\xec\xd6\xf2oYH\xeb\x13\xd5#q\xe0\xdd\xa2\x92\xa9'
signature = other.Signature(
alg='RS256', jwk=jose.JWK(key=KEY.publickey()),
sig='eJ\xfe\x12"U\x87\x8b\xbf/ ,\xdeP\xb2\xdc1\xb00\xe5\x1dB'
'\xfch<\xc6\x9eH@!\x1c\x16\xb2\x0b_\xc4\xddP\x89\xc8\xce?'
'\x16g\x069I\xb9\xb3\x91\xb9\x0e$3\x9f\x87\x8e\x82\xca\xc5'
's\xd9\xd0\xe7',
nonce=self.sig_nonce)
from letsencrypt.acme.messages import RevocationRequest
self.msg = RevocationRequest(certificate=CERT, signature=signature)
self.jmsg = {
'type': 'revocationRequest',
'certificate': jose.b64encode(CERT.as_der()),
'signature': signature,
}
def test_create(self):
from letsencrypt.acme.messages import RevocationRequest
self.assertEqual(self.msg, RevocationRequest.create(
certificate=CERT, key=KEY, sig_nonce=self.sig_nonce))
def test_verify(self):
self.assertTrue(self.msg.verify())
def test_to_json(self):
self.assertEqual(self.msg.to_json(), self.jmsg)
def test_from_json(self):
self.jmsg['signature'] = self.jmsg['signature'].to_json()
self.jmsg['signature']['jwk'] = self.jmsg['signature']['jwk'].to_json()
from letsencrypt.acme.messages import RevocationRequest
self.assertEqual(self.msg, RevocationRequest.from_json(self.jmsg))
class StatusRequestTest(unittest.TestCase):
def setUp(self):
from letsencrypt.acme.messages import StatusRequest
self.msg = StatusRequest(token=u'O7-s9MNq1siZHlgrMzi9_A')
self.jmsg = {
'type': 'statusRequest',
'token': u'O7-s9MNq1siZHlgrMzi9_A',
}
def test_to_json(self):
self.assertEqual(self.msg.to_json(), self.jmsg)
def test_from_json(self):
from letsencrypt.acme.messages import StatusRequest
self.assertEqual(StatusRequest.from_json(self.jmsg), self.msg)
if __name__ == '__main__':
unittest.main()

83
letsencrypt/acme/other.py Normal file
View file

@ -0,0 +1,83 @@
"""JSON objects in ACME protocol other than messages."""
import logging
from Crypto import Random
import Crypto.Hash.SHA256
import Crypto.Signature.PKCS1_v1_5
from letsencrypt.acme import jose
from letsencrypt.acme import util
class Signature(util.JSONDeSerializable, util.ImmutableMap):
"""ACME signature.
:ivar str alg: Signature algorithm.
:ivar str sig: Signature.
:ivar str nonce: Nonce.
:ivar jwk: JWK.
:type jwk: :class:`letsencrypt.acme.jose.JWK`
.. todo:: Currently works for RSA keys only.
"""
__slots__ = ('alg', 'sig', 'nonce', 'jwk')
schema = util.load_schema('signature')
NONCE_LEN = 16
"""Size of nonce in bytes, as specified in the ACME protocol."""
@classmethod
def from_msg(cls, msg, key, nonce=None):
"""Create signature with nonce prepended to the message.
.. todo:: Protect against crypto unicode errors... is this sufficient?
Do I need to escape?
:param str msg: Message to be signed.
:param key: Key used for signing.
:type key: :class:`Crypto.PublicKey.RSA`
:param nonce: Nonce to be used. If None, nonce of
:const:`NONCE_LEN` size will be randomly generated.
:type nonce: str or None
"""
if nonce is None:
nonce = Random.get_random_bytes(cls.NONCE_LEN)
msg_with_nonce = nonce + msg
hashed = Crypto.Hash.SHA256.new(msg_with_nonce)
sig = Crypto.Signature.PKCS1_v1_5.new(key).sign(hashed)
logging.debug('%s signed as %s', msg_with_nonce, sig)
return cls(alg='RS256', sig=sig, nonce=nonce,
jwk=jose.JWK(key=key.publickey()))
def verify(self, msg):
"""Verify the signature.
:param str msg: Message that was used in signing.
"""
hashed = Crypto.Hash.SHA256.new(self.nonce + msg)
return Crypto.Signature.PKCS1_v1_5.new(self.jwk.key).verify(
hashed, self.sig)
def to_json(self):
"""Prepare JSON serializable object."""
return {
'alg': self.alg,
'sig': jose.b64encode(self.sig),
'nonce': jose.b64encode(self.nonce),
'jwk': self.jwk,
}
@classmethod
def _from_valid_json(cls, jobj):
return cls(alg=jobj['alg'], sig=jose.b64decode(jobj['sig']),
nonce=jose.b64decode(jobj['nonce']),
jwk=jose.JWK.from_json(jobj['jwk'], validate=False))

View file

@ -0,0 +1,87 @@
"""Tests for letsencrypt.acme.sig."""
import pkg_resources
import unittest
import Crypto.PublicKey.RSA
from letsencrypt.acme import jose
RSA256_KEY = Crypto.PublicKey.RSA.importKey(pkg_resources.resource_string(
'letsencrypt.client.tests', 'testdata/rsa256_key.pem'))
class SigatureTest(unittest.TestCase):
# pylint: disable=too-many-instance-attributes
"""Tests for letsencrypt.acme.sig.Signature."""
def setUp(self):
self.msg = 'message'
self.alg = 'RS256'
self.sig = ('IC\xd8*\xe7\x14\x9e\x19S\xb7\xcf\xec3\x12\xe2\x8a\x03'
'\x98u\xff\xf0\x94\xe2\xd7<\x8f\xa8\xed\xa4KN\xc3\xaa'
'\xb9X\xc3w\xaa\xc0_\xd0\x05$y>l#\x10<\x96\xd2\xcdr\xa3'
'\x1b\xa1\xf5!f\xef\xc64\xb6\x13')
self.nonce = '\xec\xd6\xf2oYH\xeb\x13\xd5#q\xe0\xdd\xa2\x92\xa9'
self.jwk = jose.JWK(key=RSA256_KEY.publickey())
b64sig = ('SUPYKucUnhlTt8_sMxLiigOYdf_wlOLXPI-o7aRLTsOquVjDd6r'
'AX9AFJHk-bCMQPJbSzXKjG6H1IWbvxjS2Ew')
b64nonce = '7Nbyb1lI6xPVI3Hg3aKSqQ'
self.jsig_to = {
'nonce': b64nonce,
'alg': self.alg,
'jwk': self.jwk,
'sig': b64sig,
}
self.jsig_from = {
'nonce': b64nonce,
'alg': self.alg,
'jwk': self.jwk.to_json(),
'sig': b64sig,
}
from letsencrypt.acme.other import Signature
self.signature = Signature(
alg=self.alg, sig=self.sig, nonce=self.nonce, jwk=self.jwk)
def test_attributes(self):
self.assertEqual(self.signature.nonce, self.nonce)
self.assertEqual(self.signature.alg, self.alg)
self.assertEqual(self.signature.sig, self.sig)
self.assertEqual(self.signature.jwk, self.jwk)
def test_verify_good_succeeds(self):
self.assertTrue(self.signature.verify(self.msg))
def test_verify_bad_fails(self):
self.assertFalse(self.signature.verify(self.msg + 'x'))
@classmethod
def _from_msg(cls, *args, **kwargs):
from letsencrypt.acme.other import Signature
return Signature.from_msg(*args, **kwargs)
def test_create_from_msg(self):
signature = self._from_msg(self.msg, RSA256_KEY, self.nonce)
self.assertEqual(self.signature, signature)
def test_create_from_msg_random_nonce(self):
signature = self._from_msg(self.msg, RSA256_KEY)
self.assertEqual(signature.alg, self.alg)
self.assertEqual(signature.jwk, self.jwk)
self.assertTrue(signature.verify(self.msg))
def test_to_json(self):
self.assertEqual(self.signature.to_json(), self.jsig_to)
def test_from_json(self):
from letsencrypt.acme.other import Signature
# pylint: disable=protected-access
self.assertEqual(
self.signature, Signature._from_valid_json(self.jsig_from))
if __name__ == '__main__':
unittest.main()

View file

@ -15,7 +15,7 @@
"type": "string"
},
"jwk": {
"$ref": "file:letsencrypt/client/schemata/jwk.json"
"$ref": "file:letsencrypt/acme/schemata/jwk.json"
}
}
}

View file

@ -15,14 +15,14 @@
"type": "string"
},
"signature" : {
"$ref": "file:letsencrypt/client/schemata/signature.json"
"$ref": "file:letsencrypt/acme/schemata/signature.json"
},
"responses": {
"type": "array",
"minItems": 1,
"items": {
"anyOf": [
{ "$ref": "file:letsencrypt/client/schemata/responseobject.json" },
{ "$ref": "file:letsencrypt/acme/schemata/responseobject.json" },
{ "type": "null" }
]
}

View file

@ -13,7 +13,7 @@
"pattern": "^[-_=0-9A-Za-z]+$"
},
"signature" : {
"$ref": "file:letsencrypt/client/schemata/signature.json"
"$ref": "file:letsencrypt/acme/schemata/signature.json"
}
}
}

View file

@ -18,7 +18,7 @@
"type": "array",
"minItems": 1,
"items": {
"$ref": "file:letsencrypt/client/schemata/challengeobject.json"
"$ref": "file:letsencrypt/acme/schemata/challengeobject.json"
}
},
"combinations": {

View file

@ -59,7 +59,7 @@
"pattern": "^[-_=0-9A-Za-z]+$"
},
"signature": {
"$ref": "file:letsencrypt/client/schemata/signature.json"
"$ref": "file:letsencrypt/acme/schemata/signature.json"
}
}
},

View file

@ -12,7 +12,7 @@
"type" : "string"
},
"signature" : {
"$ref": "file:letsencrypt/client/schemata/signature.json"
"$ref": "file:letsencrypt/acme/schemata/signature.json"
}
}
}

147
letsencrypt/acme/util.py Normal file
View file

@ -0,0 +1,147 @@
"""ACME utilities."""
import json
import pkg_resources
import jsonschema
import zope.interface
from letsencrypt.acme import errors
from letsencrypt.acme import interfaces
class ComparableX509(object): # pylint: disable=too-few-public-methods
"""Wrapper for M2Crypto.X509.* objects that supports __eq__.
Wraps around:
- :class:`M2Crypto.X509.X509`
- :class:`M2Crypto.X509.Request`
"""
def __init__(self, wrapped):
self._wrapped = wrapped
def __getattr__(self, name):
return getattr(self._wrapped, name)
def __eq__(self, other):
return self.as_der() == other.as_der()
def load_schema(name):
"""Load JSON schema from distribution."""
return json.load(open(pkg_resources.resource_filename(
__name__, "schemata/%s.json" % name)))
class JSONDeSerializable(object):
"""JSON (de)serializable object."""
zope.interface.implements(interfaces.IJSONSerializable)
schema = NotImplemented
@classmethod
def validate_json(cls, jobj):
"""Validate JSON object against schema.
:raises letsencrypt.acme.errors.SchemaValidationError: if object
couldn't be validated.
"""
try:
jsonschema.validate(jobj, cls.schema)
except jsonschema.ValidationError as error:
raise errors.SchemaValidationError(error)
@classmethod
def from_json(cls, jobj, validate=True):
"""Deserialize from JSON.
Note that the input ``jobj`` has not been sanitized in any way.
:param jobj: JSON object.
:param bool validate: Validate against schema before deserializing.
Useful if :class:`JWK` is part of already validated json object.
:raises letsencrypt.acme.errors.SchemaValidationError: if ``validate``
was ``True`` and object couldn't be validated.
:returns: instance of the class
"""
if validate:
cls.validate_json(jobj)
return cls._from_valid_json(jobj)
@classmethod
def _from_valid_json(cls, jobj):
"""Deserializa from valid JSON object.
:param jobj: JSON object that has been validated against schema.
"""
raise NotImplementedError()
@classmethod
def json_loads(cls, json_string, validate=True):
"""Load JSON string."""
return cls.from_json(json.loads(json_string), validate)
def to_json(self):
"""Prepare JSON serializable object."""
raise NotImplementedError()
def json_dumps(self):
"""Dump to JSON string using proper serializer.
:returns: JSON serialized string.
:rtype: str
"""
return json.dumps(self, default=dump_ijsonserializable)
def dump_ijsonserializable(python_object):
"""Serialize IJSONSerializable to JSON.
This is meant to be passed to :func:`json.dumps` as ``default``
argument.
"""
# providedBy | pylint: disable=no-member
if interfaces.IJSONSerializable.providedBy(python_object):
return python_object.to_json()
else:
raise TypeError(repr(python_object) + ' is not JSON serializable')
class ImmutableMap(object): # pylint: disable=too-few-public-methods
"""Immutable key to value mapping with attribute access."""
__slots__ = ()
"""Must be overriden in subclasses."""
def __init__(self, **kwargs):
if set(kwargs) != set(self.__slots__):
raise TypeError(
'__init__() takes exactly the following arguments: {0} '
'({1} given)'.format(', '.join(self.__slots__),
', '.join(kwargs) if kwargs else 'none'))
for slot in self.__slots__:
object.__setattr__(self, slot, kwargs.pop(slot))
def __setattr__(self, name, value):
raise AttributeError("can't set attribute")
def __eq__(self, other):
return isinstance(other, self.__class__) and all(
getattr(self, slot) == getattr(other, slot)
for slot in self.__slots__)
def __hash__(self):
return hash(tuple(getattr(self, slot) for slot in self.__slots__))
def __repr__(self):
return '{0}({1})'.format(self.__class__.__name__, ', '.join(
'{0}={1!r}'.format(slot, getattr(self, slot))
for slot in self.__slots__))

View file

@ -0,0 +1,167 @@
"""Tests for letsencrypt.acme.util."""
import functools
import json
import unittest
import zope.interface
from letsencrypt.acme import errors
from letsencrypt.acme import interfaces
class MockJSONSerialiazable(object):
# pylint: disable=missing-docstring,too-few-public-methods,no-self-use
zope.interface.implements(interfaces.IJSONSerializable)
def to_json(self):
return [3, 2, 1]
class JSONDeSerializableTest(unittest.TestCase):
"""Tests for letsencrypt.acme.util.JSONDeSerializable."""
def setUp(self):
from letsencrypt.acme.util import JSONDeSerializable
class Tester(JSONDeSerializable):
# pylint: disable=missing-docstring,no-self-use,
# pylint: disable=too-few-public-methods
zope.interface.implements(interfaces.IJSONSerializable)
schema = {'type': 'integer'}
def __init__(self, jobj):
self.jobj = jobj
@classmethod
def _from_valid_json(cls, jobj):
return cls(jobj)
def to_json(self):
return {'foo': MockJSONSerialiazable()}
self.tester_cls = Tester
def test_validate_invalid_json(self):
self.assertRaises(errors.SchemaValidationError,
self.tester_cls.validate_json, 'bang!')
def test_validate_valid_json(self):
self.tester_cls.validate_json(5)
def test_from_json(self):
self.assertEqual(5, self.tester_cls.from_json(5, validate=True).jobj)
def test_from_json_no_validation(self):
self.assertEqual(['1', 2], self.tester_cls.from_json(
['1', 2], validate=False).jobj)
def test_from_valid_json_raises_error(self):
from letsencrypt.acme.util import JSONDeSerializable
# pylint: disable=protected-access
self.assertRaises(
NotImplementedError, JSONDeSerializable._from_valid_json, 'foo')
def test_json_loads(self):
tester = self.tester_cls.json_loads('5', validate=True)
self.assertEqual(tester.jobj, 5)
def test_json_loads_no_validation(self):
self.assertEqual(
'foo', self.tester_cls.json_loads('"foo"', validate=False).jobj)
def test_to_json_raises_error(self):
from letsencrypt.acme.util import JSONDeSerializable
self.assertRaises(NotImplementedError, JSONDeSerializable().to_json)
def test_json_dumps(self):
self.assertEqual(
self.tester_cls('foo').json_dumps(), '{"foo": [3, 2, 1]}')
class DumpIJSONSerializableTest(unittest.TestCase):
"""Tests for letsencrypt.acme.util.dump_ijsonserializable."""
@classmethod
def _call(cls, obj):
from letsencrypt.acme.util import dump_ijsonserializable
return json.dumps(obj, default=dump_ijsonserializable)
def test_json_type(self):
self.assertEqual('5', self._call(5))
def test_ijsonserializable(self):
self.assertEqual('[3, 2, 1]', self._call(MockJSONSerialiazable()))
def test_raises_type_error(self):
self.assertRaises(TypeError, self._call, object())
class ImmutableMapTest(unittest.TestCase):
"""Tests for letsencrypt.acme.util.ImmutableMap."""
def setUp(self):
# pylint: disable=invalid-name,too-few-public-methods
# pylint: disable=missing-docstring
from letsencrypt.acme.util import ImmutableMap
class A(ImmutableMap):
__slots__ = ('x', 'y')
class B(ImmutableMap):
__slots__ = ('x', 'y')
self.A = A
self.B = B
self.a1 = self.A(x=1, y=2)
self.a1_swap = self.A(y=2, x=1)
self.a2 = self.A(x=3, y=4)
self.b = self.B(x=1, y=2)
def test_order_of_args_does_not_matter(self):
self.assertEqual(self.a1, self.a1_swap)
def test_type_error_on_missing(self):
self.assertRaises(TypeError, self.A, x=1)
self.assertRaises(TypeError, self.A, y=2)
def test_type_error_on_unrecognized(self):
self.assertRaises(TypeError, self.A, x=1, z=2)
self.assertRaises(TypeError, self.A, x=1, y=2, z=3)
def test_get_attr(self):
self.assertEqual(1, self.a1.x)
self.assertEqual(2, self.a1.y)
self.assertEqual(1, self.a1_swap.x)
self.assertEqual(2, self.a1_swap.y)
def test_set_attr_raises_attribute_error(self):
self.assertRaises(
AttributeError, functools.partial(self.a1.__setattr__, 'x'), 10)
def test_equal(self):
self.assertEqual(self.a1, self.a1)
self.assertEqual(self.a2, self.a2)
self.assertNotEqual(self.a1, self.a2)
def test_same_slots_diff_cls_not_equal(self):
self.assertEqual(self.a1.x, self.b.x)
self.assertEqual(self.a1.y, self.b.y)
self.assertNotEqual(self.a1, self.b)
def test_hash(self):
self.assertEqual(hash((1, 2)), hash(self.a1))
def test_unhashable(self):
self.assertRaises(TypeError, self.A(x=1, y={}).__hash__)
def test_repr(self):
self.assertEqual('A(x=1, y=2)', repr(self.a1))
self.assertEqual('A(x=1, y=2)', repr(self.a1_swap))
self.assertEqual('B(x=1, y=2)', repr(self.b))
self.assertEqual("B(x='foo', y='bar')", repr(self.B(x='foo', y='bar')))
if __name__ == '__main__':
unittest.main()

View file

@ -1,155 +0,0 @@
"""ACME protocol messages."""
import json
import pkg_resources
import jsonschema
from letsencrypt.client import crypto_util
from letsencrypt.client import le_util
SCHEMATA = dict([
(schema, json.load(open(pkg_resources.resource_filename(
__name__, "schemata/%s.json" % schema)))) for schema in [
"authorization",
"authorizationRequest",
"certificate",
"certificateRequest",
"challenge",
"challengeRequest",
"defer",
"error",
"revocation",
"revocationRequest",
"statusRequest"
]
])
def acme_object_validate(json_string, schemata=None):
"""Validate a JSON string against the ACME protocol using JSON Schema.
:param str json_string: Well-formed input JSON string.
:param dict schemata: Mapping from type name to JSON Schema
definition. Useful for testing.
:returns: None if validation was successful.
:raises jsonschema.ValidationError: if validation was unsuccessful
:raises ValueError: if the object cannot even be parsed as valid JSON
"""
schemata = SCHEMATA if schemata is None else schemata
json_object = json.loads(json_string)
if not isinstance(json_object, dict):
raise jsonschema.ValidationError("this is not a dictionary object")
if "type" not in json_object:
raise jsonschema.ValidationError("missing type field")
if json_object["type"] not in schemata:
raise jsonschema.ValidationError(
"unknown type %s" % json_object["type"])
jsonschema.validate(json_object, schemata[json_object["type"]])
def pretty(json_string):
"""Return a pretty-printed version of any JSON string.
Useful when printing out protocol messages for debugging purposes.
"""
return json.dumps(json.loads(json_string), indent=4)
def challenge_request(name):
"""Create ACME "challengeRequest message.
:param str name: Domain name
:returns: ACME "challengeRequest" message.
:rtype: dict
"""
return {
"type": "challengeRequest",
"identifier": name,
}
def authorization_request(req_id, name, server_nonce, responses, key,
nonce=None):
"""Create ACME "authorizationRequest" message.
:param str req_id: SessionID from the server challenge
:param str name: Hostname
:param str server_nonce: Nonce from the server challenge
:param list responses: List of completed challenges
:param str key: Key in string form. Accepted formats
are the same as for `Crypto.PublicKey.RSA.importKey`.
:param str nonce: Nonce used for signature. Useful for testing.
:returns: ACME "authorizationRequest" message.
:rtype: dict
"""
return {
"type": "authorizationRequest",
"sessionID": req_id,
"nonce": server_nonce,
"responses": responses,
"signature": crypto_util.create_sig(
name + le_util.jose_b64decode(server_nonce), key, nonce),
}
def certificate_request(csr_der, key, nonce=None):
"""Create ACME "certificateRequest" message.
:param str csr_der: DER encoded CSR.
:param str key: Key in string form. Accepted formats
are the same as for `Crypto.PublicKey.RSA.importKey`.
:param str nonce: Nonce used for signature. Useful for testing.
:returns: ACME "certificateRequest" message.
:rtype: dict
"""
return {
"type": "certificateRequest",
"csr": le_util.jose_b64encode(csr_der),
"signature": crypto_util.create_sig(csr_der, key, nonce),
}
def revocation_request(cert_der, key, nonce=None):
"""Create ACME "revocationRequest" message.
:param str cert_der: DER encoded certificate.
:param str key: Key in string form. Accepted formats
are the same as for `Crypto.PublicKey.RSA.importKey`.
:param str nonce: Nonce used for signature. Useful for testing.
:returns: ACME "revocationRequest" message.
:rtype: dict
"""
return {
"type": "revocationRequest",
"certificate": le_util.jose_b64encode(cert_der),
"signature": crypto_util.create_sig(cert_der, key, nonce),
}
def status_request(token):
"""Create ACME "statusRequest" message.
:param unicode token: Token provided in ACME "defer" message.
:returns: ACME "statusRequest" message.
:rtype: dict
"""
return {
"type": "statusRequest",
"token": token,
}

View file

@ -457,11 +457,11 @@ class ApacheConfigurator(augeas_configurator.AugeasConfigurator):
ssl_addr_p = self.aug.match(
addr_match % (ssl_fp, parser.case_i('VirtualHost')))
for i in range(len(ssl_addr_p)):
for addr in ssl_addr_p:
old_addr = obj.Addr.fromstring(
str(self.aug.get(ssl_addr_p[i])))
str(self.aug.get(addr)))
ssl_addr = old_addr.get_addr_obj("443")
self.aug.set(ssl_addr_p[i], str(ssl_addr))
self.aug.set(addr, str(ssl_addr))
ssl_addrs.add(ssl_addr)
# Add directives

View file

@ -77,13 +77,12 @@ class ApacheParser(object):
"""
self.aug.set(aug_conf_path + "/directive[last() + 1]", directive)
if type(arg) is not list:
self.aug.set(aug_conf_path + "/directive[last()]/arg", arg)
if isinstance(arg, list):
for i, value in enumerate(arg, 1):
self.aug.set(
"%s/directive[last()]/arg[%d]" % (aug_conf_path, i), value)
else:
for i in range(len(arg)):
self.aug.set("%s/directive[last()]/arg[%d]" %
(aug_conf_path, (i+1)),
arg[i])
self.aug.set(aug_conf_path + "/directive[last()]/arg", arg)
def find_dir(self, directive, arg=None, start=None):
"""Finds directive in the configuration.
@ -96,7 +95,7 @@ class ApacheParser(object):
Note: Augeas is inherently case sensitive while Apache is case
insensitive. Augeas 1.0 allows case insensitive regexes like
regexp(/Listen/, 'i'), however the version currently supported
regexp(/Listen/, "i"), however the version currently supported
by Ubuntu 0.10 does not. Thus I have included my own case insensitive
transformation by calling case_i() on everything to maintain
compatibility.
@ -119,10 +118,11 @@ class ApacheParser(object):
# No regexp code
# if arg is None:
# matches = self.aug.match(start +
# "//*[self::directive='"+directive+"']/arg")
# "//*[self::directive='" + directive + "']/arg")
# else:
# matches = self.aug.match(start +
# "//*[self::directive='" + directive+"']/* [self::arg='" + arg + "']")
# "//*[self::directive='" + directive +
# "']/* [self::arg='" + arg + "']")
# includes = self.aug.match(start +
# "//* [self::directive='Include']/* [label()='arg']")
@ -313,8 +313,8 @@ class ApacheParser(object):
self.root + "/*/*/*.augsave",
self.root + "/*/*/*~"]
for i in range(len(excl)):
self.aug.set("/augeas/load/Httpd/excl[%d]" % (i+1), excl[i])
for i, excluded in enumerate(excl, 1):
self.aug.set("/augeas/load/Httpd/excl[%d]" % i, excluded)
self.aug.load()

View file

@ -2,7 +2,10 @@
import logging
import sys
from letsencrypt.client import acme
import Crypto.PublicKey.RSA
from letsencrypt.acme import messages
from letsencrypt.client import challenge_util
from letsencrypt.client import constants
from letsencrypt.client import errors
@ -53,7 +56,9 @@ class AuthHandler(object): # pylint: disable=too-many-instance-attributes
"""Add a challenge message to the AuthHandler.
:param str domain: domain for authorization
:param dict msg: ACME challenge message
:param msg: ACME "challenge" message
:type msg: :class:`letsencrypt.acme.message.Challenge`
:param authkey: authorized key for the challenge
:type authkey: :class:`letsencrypt.client.le_util.Key`
@ -64,7 +69,7 @@ class AuthHandler(object): # pylint: disable=too-many-instance-attributes
"Multiple ACMEChallengeMessages for the same domain "
"is not supported.")
self.domains.append(domain)
self.responses[domain] = ["null"] * len(msg["challenges"])
self.responses[domain] = ["null"] * len(msg.challenges)
self.msgs[domain] = msg
self.authkey[domain] = authkey
@ -102,18 +107,19 @@ class AuthHandler(object): # pylint: disable=too-many-instance-attributes
:param str domain: domain that is requesting authorization
:returns: ACME "authorization" message.
:rtype: dict
:rtype: :class:`letsencrypt.acme.messages.Authorization`
"""
try:
auth = self.network.send_and_receive_expected(
acme.authorization_request(
self.msgs[domain]["sessionID"],
domain,
self.msgs[domain]["nonce"],
self.responses[domain],
self.authkey[domain].pem),
"authorization")
messages.AuthorizationRequest.create(
session_id=self.msgs[domain].session_id,
nonce=self.msgs[domain].nonce,
responses=self.responses[domain],
name=domain,
key=Crypto.PublicKey.RSA.importKey(
self.authkey[domain].pem)),
messages.Authorization)
logging.info("Received Authorization for %s", domain)
return auth
except errors.LetsEncryptClientError as err:
@ -129,14 +135,15 @@ class AuthHandler(object): # pylint: disable=too-many-instance-attributes
.. todo:: It might be worth it to try different challenges to
find one that doesn't throw an exception
.. todo:: separate into more functions
"""
logging.info("Performing the following challenges:")
for dom in self.domains:
self.paths[dom] = gen_challenge_path(
self.msgs[dom]["challenges"],
self.msgs[dom].challenges,
self._get_chall_pref(dom),
self.msgs[dom].get("combinations", None))
self.msgs[dom].combinations)
self.dv_c[dom], self.client_c[dom] = self._challenge_factory(
dom, self.paths[dom])
@ -145,15 +152,19 @@ class AuthHandler(object): # pylint: disable=too-many-instance-attributes
# Order is important here as we will not expose the outside
# Authenticator to our own indices.
flat_client = []
flat_auth = []
flat_dv = []
for dom in self.domains:
flat_client.extend(ichall.chall for ichall in self.client_c[dom])
flat_auth.extend(ichall.chall for ichall in self.dv_c[dom])
flat_dv.extend(ichall.chall for ichall in self.dv_c[dom])
client_resp = []
dv_resp = []
try:
if flat_client:
client_resp = self.client_auth.perform(flat_client)
if flat_auth:
dv_resp = self.dv_auth.perform(flat_auth)
if flat_dv:
dv_resp = self.dv_auth.perform(flat_dv)
# This will catch both specific types of errors.
except errors.LetsEncryptAuthHandlerError as err:
logging.critical("Failure in setting up challenges:")
@ -168,8 +179,10 @@ class AuthHandler(object): # pylint: disable=too-many-instance-attributes
logging.info("Ready for verification...")
# Assemble Responses
self._assign_responses(client_resp, self.client_c)
self._assign_responses(dv_resp, self.dv_c)
if client_resp:
self._assign_responses(client_resp, self.client_c)
if dv_resp:
self._assign_responses(dv_resp, self.dv_c)
def _assign_responses(self, flat_list, ichall_dict):
"""Assign responses from flat_list back to the IndexedChall dicts.
@ -213,9 +226,13 @@ class AuthHandler(object): # pylint: disable=too-many-instance-attributes
# These are indexed challenges... give just the challenges to the auth
# Chose to make these lists instead of a generator to make it easier to
# work with...
self.dv_auth.cleanup([ichall.chall for ichall in self.dv_c[domain]])
self.client_auth.cleanup(
[ichall.chall for ichall in self.client_c[domain]])
dv_list = [ichall.chall for ichall in self.dv_c[domain]]
client_list = [ichall.chall for ichall in self.client_c[domain]]
if dv_list:
self.dv_auth.cleanup(dv_list)
if client_list:
self.client_auth.cleanup(client_list)
def _cleanup_state(self, delete_list):
"""Cleanup state after an authorization is received.
@ -252,7 +269,7 @@ class AuthHandler(object): # pylint: disable=too-many-instance-attributes
recognized
"""
challenges = self.msgs[domain]["challenges"]
challenges = self.msgs[domain].challenges
dv_chall = []
client_chall = []

View file

@ -4,9 +4,10 @@ import hashlib
from Crypto import Random
from letsencrypt.acme import jose
from letsencrypt.client import constants
from letsencrypt.client import crypto_util
from letsencrypt.client import le_util
# Authenticator Challenges
@ -45,7 +46,7 @@ def dvsni_gen_cert(name, r_b64, nonce, key):
"""
# Generate S
dvsni_s = Random.get_random_bytes(constants.S_SIZE)
dvsni_r = le_util.jose_b64decode(r_b64)
dvsni_r = jose.b64decode(r_b64)
# Generate extension
ext = _dvsni_gen_ext(dvsni_r, dvsni_s)
@ -53,7 +54,7 @@ def dvsni_gen_cert(name, r_b64, nonce, key):
cert_pem = crypto_util.make_ss_cert(
key.pem, [nonce + constants.DVSNI_DOMAIN_SUFFIX, name, ext])
return cert_pem, le_util.jose_b64encode(dvsni_s)
return cert_pem, jose.b64encode(dvsni_s)
def _dvsni_gen_ext(dvsni_r, dvsni_s):

View file

@ -3,10 +3,13 @@ import logging
import os
import sys
import Crypto.PublicKey.RSA
import M2Crypto
import zope.component
from letsencrypt.client import acme
from letsencrypt.acme import messages
from letsencrypt.acme import util as acme_util
from letsencrypt.client import auth_handler
from letsencrypt.client import client_authenticator
from letsencrypt.client import crypto_util
@ -42,7 +45,6 @@ class Client(object):
:type config: :class:`~letsencrypt.client.interfaces.IConfig`
"""
zope.interface.implements(interfaces.IAuthenticator)
def __init__(self, config, authkey, dv_auth, installer):
"""Initialize a client.
@ -94,11 +96,11 @@ class Client(object):
csr = init_csr(self.authkey, domains, self.config.cert_dir)
# Retrieve certificate
certificate_dict = self.acme_certificate(csr.data)
certificate_msg = self.acme_certificate(csr.data)
# Save Certificate
cert_file, chain_file = self.save_certificate(
certificate_dict, self.config.cert_path, self.config.chain_path)
certificate_msg, self.config.cert_path, self.config.chain_path)
revoker.Revoker.store_cert_key(cert_file, self.authkey.file, False)
@ -108,11 +110,12 @@ class Client(object):
"""Handle ACME "challenge" phase.
:returns: ACME "challenge" message.
:rtype: dict
:rtype: :class:`letsencrypt.acme.messages.Challenge`
"""
return self.network.send_and_receive_expected(
acme.challenge_request(domain), "challenge")
messages.ChallengeRequest(identifier=domain),
messages.Challenge)
def acme_certificate(self, csr_der):
"""Handle ACME "certificate" phase.
@ -120,18 +123,24 @@ class Client(object):
:param str csr_der: CSR in DER format.
:returns: ACME "certificate" message.
:rtype: dict
:rtype: :class:`letsencrypt.acme.message.Certificate`
"""
logging.info("Preparing and sending CSR...")
return self.network.send_and_receive_expected(
acme.certificate_request(csr_der, self.authkey.pem), "certificate")
messages.CertificateRequest.create(
csr=acme_util.ComparableX509(
M2Crypto.X509.load_request_der_string(csr_der)),
key=Crypto.PublicKey.RSA.importKey(self.authkey.pem)),
messages.Certificate)
def save_certificate(self, certificate_dict, cert_path, chain_path):
def save_certificate(self, certificate_msg, cert_path, chain_path):
# pylint: disable=no-self-use
"""Saves the certificate received from the ACME server.
:param dict certificate_dict: certificate message from server
:param certificate_msg: ACME "certificate" message from server.
:type certificate_msg: :class:`letsencrypt.acme.messages.Certificate`
:param str cert_path: Path to attempt to save the cert file
:param str chain_path: Path to attempt to save the chain file
@ -143,16 +152,15 @@ class Client(object):
"""
cert_chain_abspath = None
cert_fd, cert_file = le_util.unique_file(cert_path, 0o644)
cert_fd.write(
crypto_util.b64_cert_to_pem(certificate_dict["certificate"]))
cert_fd.write(certificate_msg.certificate.as_pem())
cert_fd.close()
logging.info(
"Server issued certificate; certificate written to %s", cert_file)
if certificate_dict.get("chain", None):
if certificate_msg.chain:
chain_fd, chain_fn = le_util.unique_file(chain_path, 0o644)
for cert in certificate_dict.get("chain", []):
chain_fd.write(crypto_util.b64_cert_to_pem(cert))
for cert in certificate_msg.chain:
chain_fd.write(cert.to_pem())
chain_fd.close()
logging.info("Cert chain written to %s", chain_fn)
@ -335,14 +343,6 @@ def init_csr(privkey, names, cert_dir):
return le_util.CSR(csr_filename, csr_der, "der")
def csr_pem_to_der(csr):
"""Convert pem CSR to der."""
csr_obj = M2Crypto.X509.load_request_string(csr.data)
return le_util.CSR(csr.file, csr_obj.as_der(), "der")
# This should be controlled by commandline parameters
def determine_authenticator(config):
"""Returns a valid IAuthenticator.

View file

@ -66,3 +66,7 @@ IConfig.work_dir. Used for easy revocation."""
REC_TOKEN_DIR = "recovery_tokens"
"""Directory where all recovery tokens are saved (relative to
IConfig.work_dir)."""
NETSTAT = "/bin/netstat"
"""Location of netstat binary for checking whether a listener is already
running on the specified port (Linux-specific)."""

View file

@ -4,70 +4,14 @@
is capable of handling the signatures.
"""
import binascii
import logging
import time
from Crypto import Random
import Crypto.Hash.SHA256
import Crypto.PublicKey.RSA
import Crypto.Signature.PKCS1_v1_5
import M2Crypto
from letsencrypt.client import constants
from letsencrypt.client import le_util
def create_sig(msg, key_str, nonce=None):
"""Create signature with nonce prepended to the message.
.. todo:: Protect against crypto unicode errors... is this sufficient?
Do I need to escape?
:param str msg: Message to be signed
:param str key_str: Key in string form. Accepted formats
are the same as for `Crypto.PublicKey.RSA.importKey`.
:param str msg: Message to be signed
:param str nonce: Nonce to be used (required size)
:returns: Signature.
:rtype: dict
"""
key = Crypto.PublicKey.RSA.importKey(key_str)
if nonce is None:
nonce = Random.get_random_bytes(constants.NONCE_SIZE)
assert len(nonce) == constants.NONCE_SIZE
msg_with_nonce = nonce + msg
hashed = Crypto.Hash.SHA256.new(msg_with_nonce)
signature = Crypto.Signature.PKCS1_v1_5.new(key).sign(hashed)
logging.debug("%s signed as %s", msg_with_nonce, signature)
n_bytes = binascii.unhexlify(_leading_zeros(hex(key.n)[2:].rstrip("L")))
e_bytes = binascii.unhexlify(_leading_zeros(hex(key.e)[2:].rstrip("L")))
return {
"nonce": le_util.jose_b64encode(nonce),
"alg": "RS256",
"jwk": {
"kty": "RSA",
"n": le_util.jose_b64encode(n_bytes),
"e": le_util.jose_b64encode(e_bytes),
},
"sig": le_util.jose_b64encode(signature),
}
def _leading_zeros(arg):
if len(arg) % 2:
return "0" + arg
return arg
def make_csr(key_str, domains):
"""Generate a CSR.

View file

@ -30,8 +30,11 @@ class IAuthenticator(zope.interface.Interface):
:param list chall_list: List of namedtuple types defined in
:mod:`letsencrypt.client.challenge_util` (``DvsniChall``, etc.).
- chall_list will never be empty
- chall_list will only contain types found within
:func:`get_chall_pref`
:returns: Challenge responses or if it cannot be completed then:
:returns: ACME Challenge responses or if it cannot be completed then:
``None``
Authenticator can perform challenge, but can't at this time
@ -45,8 +48,12 @@ class IAuthenticator(zope.interface.Interface):
def cleanup(chall_list):
"""Revert changes and shutdown after challenges complete.
:param list chall_list: namedtuple types defined in
:mod:`letsencrypt.client.challenge_util` (``DvsniChall``, etc.).
:param list chall_list: List of namedtuple types defined in
:mod:`letsencrypt.client.challenge_util` (``DvsniChall``, etc.)
- Only challenges given previously in the perform function will be
found in chall_list.
- chall_list will never be empty
"""

View file

@ -1,5 +1,4 @@
"""Utilities for all Let's Encrypt."""
import base64
import collections
import errno
import os
@ -12,7 +11,6 @@ Key = collections.namedtuple("Key", "file pem")
# Note: form is the type of data, "pem" or "der"
CSR = collections.namedtuple("CSR", "file data form")
def make_or_verify_dir(directory, mode=0o755, uid=0):
"""Make sure directory exists with proper permissions.
@ -83,53 +81,3 @@ def safely_remove(path):
except OSError as err:
if err.errno != errno.ENOENT:
raise
# https://tools.ietf.org/html/draft-ietf-jose-json-web-signature-37#appendix-C
#
# Jose Base64:
#
# - URL-safe Base64
#
# - padding stripped
def jose_b64encode(data):
"""JOSE Base64 encode.
:param data: Data to be encoded.
:type data: str or bytearray
:returns: JOSE Base64 string.
:rtype: str
:raises TypeError: if `data` is of incorrect type
"""
if not isinstance(data, str):
raise TypeError("argument should be str or bytearray")
return base64.urlsafe_b64encode(data).rstrip("=")
def jose_b64decode(data):
"""JOSE Base64 decode.
:param data: Base64 string to be decoded. If it's unicode, then
only ASCII characters are allowed.
:type data: str or unicode
:returns: Decoded data.
:raises TypeError: if input is of incorrect type
:raises ValueError: if input is unicode with non-ASCII characters
"""
if isinstance(data, unicode):
try:
data = data.encode("ascii")
except UnicodeEncodeError:
raise ValueError(
"unicode argument should contain only ASCII characters")
elif not isinstance(data, str):
raise TypeError("argument should be a str or unicode")
return base64.urlsafe_b64decode(data + "=" * (4 - (len(data) % 4)))

View file

@ -1,13 +1,12 @@
"""Network Module."""
import json
import logging
import sys
import time
import jsonschema
import requests
from letsencrypt.client import acme
from letsencrypt.acme import messages
from letsencrypt.client import errors
@ -31,10 +30,11 @@ class Network(object):
def send(self, msg):
"""Send ACME message to server.
:param dict msg: ACME message (JSON serializable).
:param msg: ACME message.
:type msg: :class:`letsencrypt.acme.messages.Message`
:returns: Server response message.
:rtype: dict
:rtype: :class:`letsencrypt.acme.messages.Message`
:raises TypeError: if `msg` is not JSON serializable
:raises jsonschema.ValidationError: if not valid ACME message
@ -42,13 +42,10 @@ class Network(object):
or if response from server is not a valid ACME message.
"""
json_encoded = json.dumps(msg)
acme.acme_object_validate(json_encoded)
try:
response = requests.post(
self.server_url,
data=json_encoded,
data=msg.json_dumps(),
headers={"Content-Type": "application/json"},
verify=True
)
@ -56,66 +53,55 @@ class Network(object):
raise errors.LetsEncryptClientError(
'Sending ACME message to server has failed: %s' % error)
try:
acme.acme_object_validate(response.content)
except ValueError:
raise errors.LetsEncryptClientError(
'Server did not send JSON serializable message')
except jsonschema.ValidationError as error:
raise errors.LetsEncryptClientError(
'Response from server is not a valid ACME message')
return response.json()
return messages.Message.from_json(response.json(), validate=True)
def send_and_receive_expected(self, msg, expected):
"""Send ACME message to server and return expected message.
:param dict msg: ACME message (JSON serializable).
:param str expected: Name of the expected response ACME message type.
:param msg: ACME message.
:type msg: :class:`letsencrypt.acme.Message`
:returns: ACME response message of expected type.
:rtype: dict
:rtype: :class:`letsencrypt.acme.messages.Message`
:raises errors.LetsEncryptClientError: An exception is thrown
"""
response = self.send(msg)
try:
return self.is_expected_msg(response, expected)
except: # TODO: too generic exception
raise errors.LetsEncryptClientError(
'Expected message (%s) not received' % expected)
return self.is_expected_msg(response, expected)
def is_expected_msg(self, response, expected, delay=3, rounds=20):
"""Is response expected ACME message?
:param dict response: ACME response message from server.
:param str expected: Name of the expected response ACME message type.
:param response: ACME response message from server.
:type response: :class:`letsencrypt.acme.messages.Message`
:param expected: Expected response type.
:type expected: subclass of :class:`letsencrypt.acme.messages.Message`
:param int delay: Number of seconds to delay before next round
in case of ACME "defer" response message.
:param int rounds: Number of resend attempts in case of ACME "defer"
response message.
:returns: ACME response message from server.
:rtype: dict
:rtype: :class:`letsencrypt.acme.messages.Message`
:raises LetsEncryptClientError: if server sent ACME "error" message
"""
for _ in xrange(rounds):
if response["type"] == expected:
if isinstance(response, expected):
return response
elif response["type"] == "error":
logging.error(
"%s: %s - More Info: %s", response["error"],
response.get("message", ""), response.get("moreInfo", ""))
raise errors.LetsEncryptClientError(response["error"])
elif response["type"] == "defer":
elif isinstance(response, messages.Error):
logging.error("%s", response)
raise errors.LetsEncryptClientError(response.error)
elif isinstance(response, messages.Defer):
logging.info("Waiting for %d seconds...", delay)
time.sleep(delay)
response = self.send(acme.status_request(response["token"]))
response = self.send(
messages.StatusRequest(token=response.token))
else:
logging.fatal("Received unexpected message")
logging.fatal("Expected: %s", expected)

View file

@ -424,7 +424,8 @@ class Reverter(object):
# It is possible save checkpoints faster than 1 per second resulting in
# collisions in the naming convention.
cur_time = time.time()
for _ in range(10):
for _ in xrange(10):
final_dir = os.path.join(self.config.backup_dir, str(cur_time))
try:
os.rename(self.config.in_progress_dir, final_dir)

View file

@ -12,12 +12,14 @@ import logging
import os
import shutil
import Crypto.PublicKey.RSA
import M2Crypto
from letsencrypt.client import acme
from letsencrypt.acme import messages
from letsencrypt.acme import util as acme_util
from letsencrypt.client import errors
from letsencrypt.client import le_util
from letsencrypt.client import network
from letsencrypt.client.display import display_util
@ -134,7 +136,9 @@ class Revoker(object):
# TODO: Catch error associated with already revoked and proceed.
return self.network.send_and_receive_expected(
acme.revocation_request(cert_der, key), "revocation")
messages.RevocationRequest.create(
certificate=certificate, key=key),
messages.Revocation)
def display_menu(self):
"""List trusted Let's Encrypt certificates."""

View file

@ -1,15 +1,8 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""An authenticator that doesn't rely on any existing server program.
This authenticator creates its own ephemeral TCP listener on the specified
port in order to respond to incoming DVSNI challenges from the certificate
authority."""
"""Standalone authenticator."""
import os
import signal
import socket
import subprocess
import sys
import time
@ -26,11 +19,14 @@ from letsencrypt.client import interfaces
class StandaloneAuthenticator(object):
# pylint: disable=too-many-instance-attributes
"""The StandaloneAuthenticator class itself.
"""Standalone authenticator.
This authenticator can be invoked by the Let's Encrypt client
according to the IAuthenticator API interface. It creates a local
TCP listener on a specified port and satisfies DVSNI challenges."""
This authenticator creates its own ephemeral TCP listener on the
specified port in order to respond to incoming DVSNI challenges from
the certificate authority. Therefore, it does not rely on any
existing server program.
"""
zope.interface.implements(interfaces.IAuthenticator)
def __init__(self):
@ -49,10 +45,12 @@ class StandaloneAuthenticator(object):
This handler receives inter-process communication from the
child process in the form of Unix signals.
:param int sig: Which signal the process received."""
# subprocess → client READY : SIGIO
# subprocess → client INUSE : SIGUSR1
# subprocess → client CANTBIND: SIGUSR2
:param int sig: Which signal the process received.
"""
# subprocess to client READY: SIGIO
# subprocess to client INUSE: SIGUSR1
# subprocess to client CANTBIND: SIGUSR2
if sig == signal.SIGIO:
self.subproc_state = "ready"
elif sig == signal.SIGUSR1:
@ -69,8 +67,10 @@ class StandaloneAuthenticator(object):
This handler receives inter-process communication from the parent
process in the form of Unix signals.
:param int sig: Which signal the process received."""
# client → subprocess CLEANUP : SIGINT
:param int sig: Which signal the process received.
"""
# client to subprocess CLEANUP : SIGINT
if sig == signal.SIGINT:
try:
self.ssl_conn.shutdown()
@ -91,6 +91,7 @@ class StandaloneAuthenticator(object):
# reported here and none of them should impede us from
# exiting as gracefully as possible.
pass
os.kill(self.parent_pid, signal.SIGUSR1)
sys.exit(0)
@ -101,18 +102,20 @@ class StandaloneAuthenticator(object):
connection when an incoming connection provides an SNI name
(in order to serve the appropriate certificate, if any).
:param OpenSSL.Connection connection: The TLS connection object
on which the SNI extension was received."""
:param connection: The TLS connection object on which the SNI
extension was received.
:type connection: :class:`OpenSSL.Connection`
"""
sni_name = connection.get_servername()
if sni_name in self.tasks:
pem_cert = self.tasks[sni_name]
else:
# TODO: Should we really present a certificate if we get an
# unexpected SNI name? Or should we just disconnect?
# unexpected SNI name? Or should we just disconnect?
pem_cert = self.tasks.values()[0]
cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
pem_cert)
cert = OpenSSL.crypto.load_certificate(
OpenSSL.crypto.FILETYPE_PEM, pem_cert)
new_ctx = OpenSSL.SSL.Context(OpenSSL.SSL.TLSv1_METHOD)
new_ctx.set_verify(OpenSSL.SSL.VERIFY_NONE, lambda: False)
new_ctx.use_certificate(cert)
@ -122,32 +125,37 @@ class StandaloneAuthenticator(object):
def do_parent_process(self, port, delay_amount=5):
"""Perform the parent process side of the TCP listener task.
This should only be called by start_listener(). We will wait
up to delay_amount seconds to hear from the child process via
a signal.
This should only be called by :meth:`start_listener`. We will
wait up to delay_amount seconds to hear from the child process
via a signal.
:param int port: Which TCP port to bind.
:param float delay_amount: How long in seconds to wait for the
subprocess to notify us whether it succeeded.
subprocess to notify us whether it succeeded.
:returns: True or False according to whether we were notified
that the child process succeeded or failed in binding the port."""
:returns: ``True`` or ``False`` according to whether we were notified
that the child process succeeded or failed in binding the port.
:rtype: bool
"""
signal.signal(signal.SIGIO, self.client_signal_handler)
signal.signal(signal.SIGUSR1, self.client_signal_handler)
signal.signal(signal.SIGUSR2, self.client_signal_handler)
display = zope.component.getUtility(interfaces.IDisplay)
start_time = time.time()
while time.time() < start_time + delay_amount:
if self.subproc_state == "ready":
return True
if self.subproc_state == "inuse":
elif self.subproc_state == "inuse":
display.generic_notification(
"Could not bind TCP port {0} because it is already in "
"use it is already in use by another process on this "
"system (such as a web server).".format(port))
"use by another process on this system (such as a web "
"server). Please stop the program in question and then "
"try again.".format(port))
return False
if self.subproc_state == "cantbind":
elif self.subproc_state == "cantbind":
display.generic_notification(
"Could not bind TCP port {0} because you don't have "
"the appropriate permissions (for example, you "
@ -155,23 +163,28 @@ class StandaloneAuthenticator(object):
"root).".format(port))
return False
time.sleep(0.1)
display.generic_notification(
"Subprocess unexpectedly timed out while trying to bind TCP "
"port {0}.".format(port))
return False
def do_child_process(self, port, key):
"""Perform the child process side of the TCP listener task.
This should only be called by start_listener().
This should only be called by :meth:`start_listener`.
Normally does not return; instead, the child process exits from
within this function or from within the child process signal
handler.
:param int port: Which TCP port to bind.
:param le_util.Key key: The private key to use to respond to
DVSNI challenge requests."""
:param key: The private key to use to respond to DVSNI challenge
requests.
:type key: `letsencrypt.client.le_util.Key`
"""
signal.signal(signal.SIGINT, self.subproc_signal_handler)
self.sock = socket.socket()
try:
@ -217,14 +230,20 @@ class StandaloneAuthenticator(object):
self.ssl_conn.close()
def start_listener(self, port, key):
"""Create a child process which will start a TCP listener on the
"""Start listener.
Create a child process which will start a TCP listener on the
specified port to perform the specified DVSNI challenges.
:param int port: The TCP port to bind.
:param le_util.Key key: The private key to use to respond to
DVSNI challenge requests.
:returns: True or False to indicate success or failure creating
the subprocess.
:param key: The private key to use to respond to DVSNI challenge
requests.
:type key: :class:`letsencrypt.client.le_util.Key`
:returns: ``True`` or ``False`` to indicate success or failure creating
the subprocess.
:rtype: bool
"""
fork_result = os.fork()
Crypto.Random.atfork()
@ -241,35 +260,85 @@ class StandaloneAuthenticator(object):
# should terminate via sys.exit().
return self.do_child_process(port, key)
def already_listening(self, port): # pylint: disable=no-self-use
"""Check if a process is already listening on the port.
If so, also tell the user via a display notification.
.. warning::
The current implementation is Linux-specific. (On other
operating systems, it will simply not detect bound ports.)
This function can only usefully be run as root.
:param int port: The TCP port in question.
:returns: True or False."""
try:
proc = subprocess.Popen(
[constants.NETSTAT, "-nta", "--program"],
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout, _ = proc.communicate()
if proc.wait() != 0:
raise OSError("netstat subprocess failed")
lines = [x.split() for x in stdout.split("\n")[2:] if x]
listeners = [L[6] for L in lines if
# IPv4 socket case
(L[0] == 'tcp' and L[5] == 'LISTEN' \
and L[3] == '0.0.0.0:{0}'.format(port)) or \
# IPv6 socket case
(L[0] == 'tcp6' and L[5] == 'LISTEN' \
and L[3] == ':::{0}'.format(port))]
if listeners:
pid, name = listeners[0].split("/")
display = zope.component.getUtility(interfaces.IDisplay)
display.generic_notification(
"The program {0} (process ID {1}) is already listening "
"on TCP port {2}. This will prevent us from binding to "
"that port. Please stop the {0} program temporarily "
"and then try again.".format(name, pid, port))
return True
except (OSError, ValueError, IndexError):
# A sign that this command isn't available or usable this
# way on this operating system, or there was something
# unexpected about the format of the netstat output; we will
# not be able to recover from this condition.
pass
return False
# IAuthenticator method implementations follow
def get_chall_pref(self, unused_domain):
# pylint: disable=no-self-use
"""IAuthenticator interface method get_chall_pref.
def get_chall_pref(self, unused_domain): # pylint: disable=no-self-use
"""Get challenge preferences.
IAuthenticator interface method get_chall_pref.
Return a list of challenge types that this authenticator
can perform for this domain. In the case of the
StandaloneAuthenticator, the only challenge type that can ever
be performed is dvsni.
:returns: A list containing only 'dvsni'."""
:returns: A list containing only 'dvsni'.
"""
return ["dvsni"]
def perform(self, chall_list):
"""IAuthenticator interface method perform.
"""Perform the challenge.
Attempt to perform the
specified challenges, returning the status of each. For the
StandaloneAuthenticator, because there is no convenient way to add
additional requests, this should only be invoked once; subsequent
invocations are an error. To perform validations for multiple
independent sets of domains, a separate StandaloneAuthenticator
should be instantiated.
.. warning::
For the StandaloneAuthenticator, because there is no convenient
way to add additional requests, this should only be invoked
once; subsequent invocations are an error. To perform
validations for multiple independent sets of domains, a separate
StandaloneAuthenticator should be instantiated.
:param list chall_list: A list of the the challenge objects to
be attempted by this authenticator.
:returns: A list in the same order containing, in each position,
the successfully configured challenge, False, or None."""
:param list chall_list: List of namedtuple types defined in
:mod:`letsencrypt.client.challenge_util` (``DvsniChall``, etc.)
:returns: ACME Challenge DVSNI responses following IAuthenticator
interface.
:rtype: :class:`list` of :class`dict`
"""
if self.child_pid or self.tasks:
# We should not be willing to continue with perform
# if there were existing pending challenges.
@ -295,6 +364,12 @@ class StandaloneAuthenticator(object):
results_if_failure.append(False)
if not self.tasks:
raise ValueError("nothing for .perform() to do")
if self.already_listening(constants.DVSNI_CHALLENGE_PORT):
# If we know a process is already listening on this port,
# tell the user, and don't even attempt to bind it. (This
# test is Linux-specific and won't indicate that the port
# is bound if invoked on a different operating system.)
return results_if_failure
# Try to do the authentication; note that this creates
# the listener subprocess via os.fork()
if self.start_listener(constants.DVSNI_CHALLENGE_PORT, key):
@ -305,17 +380,14 @@ class StandaloneAuthenticator(object):
return results_if_failure
def cleanup(self, chall_list):
"""IAuthenticator interface method cleanup.
"""Clean up.
Remove each of the specified challenges from the list of
challenges that still need to be performed. (In the case of
the StandaloneAuthenticator, if some challenges are removed
from the list, the authenticator socket will still respond to
those challenges.) Once all challenges have been removed from
the list, the listener is deactivated and stops listening.
If some challenges are removed from the list, the authenticator
socket will still respond to those challenges. Once all
challenges have been removed from the list, the listener is
deactivated and stops listening.
:param list chall_list: A list of the the challenge objects to
be deactivated."""
"""
# Remove this from pending tasks list
for chall in chall_list:
assert isinstance(chall, challenge_util.DvsniChall)

View file

@ -1,158 +0,0 @@
"""Tests for letsencrypt.client.acme."""
import pkg_resources
import unittest
import jsonschema
class ACMEObjectValidateTest(unittest.TestCase):
"""Tests for letsencrypt.client.acme.acme_object_validate."""
def setUp(self):
self.schemata = {
'foo': {
'type': 'object',
'properties': {
'price': {'type': 'number'},
'name': {'type': 'string'},
},
},
}
def _call(self, json_string):
from letsencrypt.client.acme import acme_object_validate
return acme_object_validate(json_string, self.schemata)
def _test_fails(self, json_string):
self.assertRaises(jsonschema.ValidationError, self._call, json_string)
def test_non_dictionary_fails(self):
self._test_fails('[]')
def test_dict_without_type_fails(self):
self._test_fails('{}')
def test_unknown_type_fails(self):
self._test_fails('{"type": "bar"}')
def test_valid_returns_none(self):
self.assertTrue(self._call('{"type": "foo"}') is None)
def test_invalid_fails(self):
self._test_fails('{"type": "foo", "price": "asd"}')
class PrettyTest(unittest.TestCase): # pylint: disable=too-few-public-methods
"""Tests for letsencrypt.client.acme.pretty."""
@classmethod
def _call(cls, json_string):
from letsencrypt.client.acme import pretty
return pretty(json_string)
def test_it(self):
self.assertEqual(
self._call('{"foo": {"bar": "baz"}}'),
'{\n "foo": {\n "bar": "baz"\n }\n}')
class MessageFactoriesTest(unittest.TestCase):
"""Tests for ACME message factories from letsencrypt.client.acme."""
def setUp(self):
self.privkey = pkg_resources.resource_string(
__name__, 'testdata/rsa256_key.pem')
self.nonce = '\xec\xd6\xf2oYH\xeb\x13\xd5#q\xe0\xdd\xa2\x92\xa9'
self.b64nonce = '7Nbyb1lI6xPVI3Hg3aKSqQ'
@classmethod
def _validate(cls, msg):
from letsencrypt.client.acme import SCHEMATA
jsonschema.validate(msg, SCHEMATA[msg['type']])
def test_challenge_request(self):
from letsencrypt.client.acme import challenge_request
msg = challenge_request('example.com')
self._validate(msg)
self.assertEqual(msg, {
'type': 'challengeRequest',
'identifier': 'example.com',
})
def test_authorization_request(self):
from letsencrypt.client.acme import authorization_request
responses = [
{
'type': 'simpleHttps',
'path': 'Hf5GrX4Q7EBax9hc2jJnfw',
},
None, # null
{
'type': 'recoveryToken',
'token': '23029d88d9e123e',
}
]
msg = authorization_request(
'aefoGaavieG9Wihuk2aufai3aeZ5EeW4',
'example.com',
'czpsrF0KMH6dgajig3TGHw',
responses,
self.privkey,
self.nonce,
)
self._validate(msg)
self.assertEqual(
msg.pop('signature')['sig'],
'VkpReso87ogwGul2MGck96TkYs4QoblIgNthgrm9O7EBGlzCRCnTHnx'
'bj6loqaC4f5bn1rgS927Gp1Kvbqnmqg'
)
self.assertEqual(msg, {
'type': 'authorizationRequest',
'sessionID': 'aefoGaavieG9Wihuk2aufai3aeZ5EeW4',
'nonce': 'czpsrF0KMH6dgajig3TGHw',
'responses': responses,
})
def test_certificate_request(self):
from letsencrypt.client.acme import certificate_request
msg = certificate_request(
'TODO: real DER CSR?', self.privkey, self.nonce)
self._validate(msg)
self.assertEqual(
msg.pop('signature')['sig'],
'HEQVN4MU1yDrArP2T7WZQ12XlHCn5DgTPgb5eWT5_vjRPppLSNe6uWE'
'x9SFwG9d9umqn49nZCSW7uskA2lcW6Q'
)
self.assertEqual(msg, {
'type': 'certificateRequest',
'csr': 'VE9ETzogcmVhbCBERVIgQ1NSPw',
})
def test_revocation_request(self):
from letsencrypt.client.acme import revocation_request
msg = revocation_request(
'TODO: real DER cert?', self.privkey, self.nonce)
self._validate(msg)
self.assertEqual(
msg.pop('signature')['sig'],
'ABXA1IsyTalTXIojxmGnIUGyZASmvqEvTQ98jJ5KFs2FTswLEmsoqFX'
'fU6l5_fous-tsbXOfLN-7PjfZ5XWPvg'
)
self.assertEqual(msg, {
'type': 'revocationRequest',
'certificate': 'VE9ETzogcmVhbCBERVIgY2VydD8',
})
def test_status_request(self):
from letsencrypt.client.acme import status_request
msg = status_request(u'O7-s9MNq1siZHlgrMzi9_A')
self._validate(msg)
self.assertEqual(msg, {
'type': 'statusRequest',
'token': u'O7-s9MNq1siZHlgrMzi9_A',
})
if __name__ == '__main__':
unittest.main()

View file

@ -26,7 +26,7 @@ CHALLENGES = {
"successURL": "https://example.ca/confirmrecovery/bb1b9928932",
"contact": "c********n@example.com"
},
"recoveryTokent":
"recoveryToken":
{
"type": "recoveryToken"
},
@ -80,7 +80,6 @@ def gen_combos(challs):
"""Generate natural combinations for challs."""
dv_chall = []
renewal_chall = []
combos = []
for i, chall in enumerate(challs):
if chall["type"] in constants.DV_CHALLENGES:
@ -89,24 +88,5 @@ def gen_combos(challs):
renewal_chall.append(i)
# Gen combos for 1 of each type
for i in range(len(dv_chall)):
for j in range(len(renewal_chall)):
combos.append([i, j])
return combos
def get_chall_msg(iden, nonce, challenges, combos=None):
"""Produce an ACME challenge message."""
chall_msg = {
"type": "challenge",
"sessionID": iden,
"nonce": nonce,
"challenges": challenges
}
if combos is None:
return chall_msg
chall_msg["combinations"] = combos
return chall_msg
return [[i, j] for i in xrange(len(dv_chall))
for j in xrange(len(renewal_chall))]

View file

@ -9,6 +9,7 @@ from letsencrypt.client import challenge_util
from letsencrypt.client import constants
from letsencrypt.client import le_util
from letsencrypt.client.apache.obj import Addr
from letsencrypt.client.tests.apache import util
@ -134,7 +135,6 @@ class DvsniPerformTest(util.ApacheTest):
self.assertEqual(responses[i]["s"], "randomS%d" % i)
def test_mod_config(self):
from letsencrypt.client.apache.obj import Addr
for chall in self.challs:
self.sni.add_chall(chall)
v_addr1 = [Addr(("1.2.3.4", "443")), Addr(("5.6.7.8", "443"))]

View file

@ -31,9 +31,7 @@ class AddrTest(unittest.TestCase):
def test_eq(self):
self.assertEqual(self.addr1, self.addr2.get_addr_obj(""))
self.assertNotEqual(self.addr1, self.addr2)
# This is specifically designed to hit line 28 but coverage denies me
# the satisfaction :(
self.assertNotEqual(self.addr1, 3333)
self.assertFalse(self.addr1 == 3333)
def test_set_inclusion(self):
from letsencrypt.client.apache.obj import Addr
@ -63,7 +61,7 @@ class VirtualHostTest(unittest.TestCase):
self.assertEqual(vhost1b, self.vhost1)
self.assertEqual(str(vhost1b), str(self.vhost1))
self.assertNotEqual(vhost1b, 1234)
self.assertFalse(vhost1b == 1234)
if __name__ == "__main__":

View file

@ -4,8 +4,11 @@ import unittest
import mock
from letsencrypt.acme import messages
from letsencrypt.client import challenge_util
from letsencrypt.client import errors
from letsencrypt.client.tests import acme_util
@ -25,8 +28,8 @@ class SatisfyChallengesTest(unittest.TestCase):
def setUp(self):
from letsencrypt.client.auth_handler import AuthHandler
self.mock_dv_auth = mock.MagicMock(name='ApacheConfigurator')
self.mock_client_auth = mock.MagicMock(name='ClientAuthenticator')
self.mock_dv_auth = mock.MagicMock(name="ApacheConfigurator")
self.mock_client_auth = mock.MagicMock(name="ClientAuthenticator")
self.mock_dv_auth.get_chall_pref.return_value = ["dvsni"]
self.mock_client_auth.get_chall_pref.return_value = ["recoveryToken"]
@ -45,7 +48,8 @@ class SatisfyChallengesTest(unittest.TestCase):
def test_name1_dvsni1(self):
dom = "0"
challenge = [acme_util.CHALLENGES["dvsni"]]
msg = acme_util.get_chall_msg(dom, "nonce0", challenge)
msg = messages.Challenge(session_id=dom, nonce="nonce0",
challenges=challenge, combinations=[])
self.handler.add_chall_msg(dom, msg, "dummy_key")
self.handler._satisfy_challenges() # pylint: disable=protected-access
@ -59,12 +63,37 @@ class SatisfyChallengesTest(unittest.TestCase):
self.assertEqual(len(self.handler.dv_c[dom]), 1)
self.assertEqual(len(self.handler.client_c[dom]), 0)
def test_name1_rectok1(self):
dom = "0"
challenge = [acme_util.CHALLENGES["recoveryToken"]]
msg = messages.Challenge(session_id=dom, nonce="nonce0",
challenges=challenge, combinations=[])
self.handler.add_chall_msg(dom, msg, "dummy_key")
self.handler._satisfy_challenges() # pylint: disable=protected-access
self.assertEqual(len(self.handler.responses), 1)
self.assertEqual(len(self.handler.responses[dom]), 1)
# Test if statement for dv_auth perform
self.assertEqual(self.mock_client_auth.perform.call_count, 1)
self.assertEqual(self.mock_dv_auth.perform.call_count, 0)
self.assertEqual("RecTokenChall0", self.handler.responses[dom][0])
# Assert 1 domain
self.assertEqual(len(self.handler.dv_c), 1)
self.assertEqual(len(self.handler.client_c), 1)
# Assert 1 auth challenge, 0 dv
self.assertEqual(len(self.handler.dv_c[dom]), 0)
self.assertEqual(len(self.handler.client_c[dom]), 1)
def test_name5_dvsni5(self):
challenge = [acme_util.CHALLENGES["dvsni"]]
for i in xrange(5):
self.handler.add_chall_msg(
str(i),
acme_util.get_chall_msg(str(i), "nonce%d" % i, challenge),
messages.Challenge(session_id=str(i), nonce="nonce%d" % i,
challenges=challenge, combinations=[]),
"dummy_key")
self.handler._satisfy_challenges() # pylint: disable=protected-access
@ -74,6 +103,10 @@ class SatisfyChallengesTest(unittest.TestCase):
self.assertEqual(len(self.handler.client_c), 5)
# Each message contains 1 auth, 0 client
# Test proper call count for methods
self.assertEqual(self.mock_client_auth.perform.call_count, 0)
self.assertEqual(self.mock_dv_auth.perform.call_count, 1)
for i in xrange(5):
dom = str(i)
self.assertEqual(len(self.handler.responses[dom]), 1)
@ -91,7 +124,8 @@ class SatisfyChallengesTest(unittest.TestCase):
combos = acme_util.gen_combos(challenges)
self.handler.add_chall_msg(
dom,
acme_util.get_chall_msg("0", "nonce0", challenges, combos),
messages.Challenge(session_id="0", nonce="nonce0",
challenges=challenges, combinations=combos),
"dummy_key")
path = gen_path(["simpleHttps"], challenges)
@ -103,6 +137,10 @@ class SatisfyChallengesTest(unittest.TestCase):
self.assertEqual(len(self.handler.dv_c), 1)
self.assertEqual(len(self.handler.client_c), 1)
# Test if statement for client_auth perform
self.assertEqual(self.mock_client_auth.perform.call_count, 0)
self.assertEqual(self.mock_dv_auth.perform.call_count, 1)
self.assertEqual(
self.handler.responses[dom],
self._get_exp_response(dom, path, challenges))
@ -120,7 +158,8 @@ class SatisfyChallengesTest(unittest.TestCase):
combos = acme_util.gen_combos(challenges)
self.handler.add_chall_msg(
dom,
acme_util.get_chall_msg(dom, "nonce0", challenges, combos),
messages.Challenge(session_id=dom, nonce="nonce0",
challenges=challenges, combinations=combos),
"dummy_key")
path = gen_path(["simpleHttps", "recoveryToken"], challenges)
@ -150,8 +189,9 @@ class SatisfyChallengesTest(unittest.TestCase):
for i in xrange(5):
self.handler.add_chall_msg(
str(i),
acme_util.get_chall_msg(
str(i), "nonce%d" % i, challenges, combos),
messages.Challenge(
session_id=str(i), nonce="nonce%d" % i,
challenges=challenges, combinations=combos),
"dummy_key")
path = gen_path(["dvsni", "recoveryContact"], challenges)
@ -166,7 +206,7 @@ class SatisfyChallengesTest(unittest.TestCase):
self.assertEqual(len(self.handler.dv_c), 5)
self.assertEqual(len(self.handler.client_c), 5)
for i in range(5):
for i in xrange(5):
dom = str(i)
self.assertEqual(
self.handler.responses[dom],
@ -199,8 +239,9 @@ class SatisfyChallengesTest(unittest.TestCase):
paths.append(gen_path(chosen_chall[i], challenge_list[i]))
self.handler.add_chall_msg(
dom,
acme_util.get_chall_msg(
dom, "nonce%d" % i, challenge_list[i]),
messages.Challenge(
session_id=dom, nonce="nonce%d" % i,
challenges=challenge_list[i], combinations=[]),
"dummy_key")
mock_chall_path.side_effect = paths
@ -247,40 +288,47 @@ class SatisfyChallengesTest(unittest.TestCase):
for i in xrange(3):
self.handler.add_chall_msg(
str(i),
acme_util.get_chall_msg(
str(i), "nonce%d" % i, challenges, combos),
messages.Challenge(
session_id=str(i), nonce="nonce%d" % i,
challenges=challenges, combinations=combos),
"dummy_key")
mock_chall_path.return_value = gen_path(
["dvsni", "proofOfPossession"], challenges)
mock_chall_path.side_effect = [
gen_path(["dvsni", "proofOfPossession"], challenges),
gen_path(["proofOfPossession"], challenges),
gen_path(["dvsni"], challenges),
]
# This may change in the future... but for now catch the error
self.assertRaises(errors.LetsEncryptAuthHandlerError,
self.handler._satisfy_challenges)
# Verify cleanup is actually run correctly
self.assertEqual(self.mock_dv_auth.cleanup.call_count, 3)
self.assertEqual(self.mock_client_auth.cleanup.call_count, 3)
self.assertEqual(self.mock_dv_auth.cleanup.call_count, 2)
self.assertEqual(self.mock_client_auth.cleanup.call_count, 2)
dv_cleanup_args = self.mock_dv_auth.cleanup.call_args_list
client_cleanup_args = self.mock_client_auth.cleanup.call_args_list
# Check DV cleanup
mock_cleanup_args = self.mock_dv_auth.cleanup.call_args_list
for i in xrange(3):
# Assert length of arg list was 1
arg_chall_list = mock_cleanup_args[i][0][0]
self.assertEqual(len(arg_chall_list), 1)
self.assertTrue(isinstance(arg_chall_list[0],
challenge_util.DvsniChall))
for i in xrange(2):
dv_chall_list = dv_cleanup_args[i][0][0]
self.assertEqual(len(dv_chall_list), 1)
self.assertTrue(
isinstance(dv_chall_list[0], challenge_util.DvsniChall))
# Check Auth cleanup
mock_cleanup_args = self.mock_client_auth.cleanup.call_args_list
for i in xrange(3):
arg_chall_list = mock_cleanup_args[i][0][0]
self.assertEqual(len(arg_chall_list), 1)
self.assertTrue(isinstance(arg_chall_list[0],
challenge_util.PopChall))
for i in xrange(2):
client_chall_list = client_cleanup_args[i][0][0]
self.assertEqual(len(client_chall_list), 1)
self.assertTrue(
isinstance(client_chall_list[0], challenge_util.PopChall))
def _get_exp_response(self, domain, path, challenges): # pylint: disable=no-self-use
def _get_exp_response(self, domain, path, challenges):
# pylint: disable=no-self-use
exp_resp = ["null"] * len(challenges)
for i in path:
exp_resp[i] = TRANSLATE[challenges[i]["type"]] + str(domain)
@ -293,8 +341,8 @@ class GetAuthorizationsTest(unittest.TestCase):
def setUp(self):
from letsencrypt.client.auth_handler import AuthHandler
self.mock_dv_auth = mock.MagicMock(name='ApacheConfigurator')
self.mock_client_auth = mock.MagicMock(name='ClientAuthenticator')
self.mock_dv_auth = mock.MagicMock(name="ApacheConfigurator")
self.mock_client_auth = mock.MagicMock(name="ClientAuthenticator")
self.mock_sat_chall = mock.MagicMock(name="_satisfy_challenges")
self.mock_acme_auth = mock.MagicMock(name="acme_authorization")
@ -313,7 +361,8 @@ class GetAuthorizationsTest(unittest.TestCase):
for i in xrange(3):
self.handler.add_chall_msg(
str(i),
acme_util.get_chall_msg(str(i), "nonce%d" % i, challenge),
messages.Challenge(session_id=str(i), nonce="nonce%d" % i,
challenges=challenge, combinations=[]),
"dummy_key")
self.mock_sat_chall.side_effect = self._sat_solved_at_once
@ -341,7 +390,8 @@ class GetAuthorizationsTest(unittest.TestCase):
challenges = acme_util.get_challenges()
self.handler.add_chall_msg(
"0",
acme_util.get_chall_msg("0", "nonce0", challenges),
messages.Challenge(session_id="0", nonce="nonce0",
challenges=challenges, combinations=[]),
"dummy_key")
# Don't do anything to satisfy challenges
@ -356,7 +406,7 @@ class GetAuthorizationsTest(unittest.TestCase):
def _sat_failure(self):
dom = "0"
self.handler.paths[dom] = gen_path(
["dns", "recoveryToken"], self.handler.msgs[dom]["challenges"])
["dns", "recoveryToken"], self.handler.msgs[dom].challenges)
dv_c, c_c = self.handler._challenge_factory(
dom, self.handler.paths[dom])
self.handler.dv_c[dom], self.handler.client_c[dom] = dv_c, c_c
@ -369,7 +419,8 @@ class GetAuthorizationsTest(unittest.TestCase):
dom = str(i)
self.handler.add_chall_msg(
dom,
acme_util.get_chall_msg(dom, "nonce%d" % i, challs[i]),
messages.Challenge(session_id=dom, nonce="nonce%d" % i,
challenges=challs[i], combinations=[]),
"dummy_key")
self.mock_sat_chall.side_effect = self._sat_incremental
@ -462,7 +513,7 @@ class PathSatisfiedTest(unittest.TestCase):
def gen_auth_resp(chall_list):
"""Generate a dummy authorization response."""
return ["%s%s" % (type(chall).__name__, chall.domain)
return ["%s%s" % (chall.__class__.__name__, chall.domain)
for chall in chall_list]
@ -484,5 +535,5 @@ def gen_path(str_list, challenges):
return path
if __name__ == '__main__':
if __name__ == "__main__":
unittest.main()

View file

@ -6,6 +6,8 @@ import unittest
import M2Crypto
from letsencrypt.acme import jose
from letsencrypt.client import challenge_util
from letsencrypt.client import constants
from letsencrypt.client import le_util
@ -19,7 +21,7 @@ class DvsniGenCertTest(unittest.TestCase):
"""Basic test for straightline code."""
domain = "example.com"
dvsni_r = "r_value"
r_b64 = le_util.jose_b64encode(dvsni_r)
r_b64 = jose.b64encode(dvsni_r)
pem = pkg_resources.resource_string(
__name__, os.path.join("testdata", "rsa256_key.pem"))
key = le_util.Key("path", pem)
@ -28,7 +30,7 @@ class DvsniGenCertTest(unittest.TestCase):
# pylint: disable=protected-access
ext = challenge_util._dvsni_gen_ext(
dvsni_r, le_util.jose_b64decode(s_b64))
dvsni_r, jose.b64decode(s_b64))
self._standard_check_cert(cert_pem, domain, nonce, ext)
def _standard_check_cert(self, pem, domain, nonce, ext):

View file

@ -3,6 +3,9 @@ import unittest
import mock
from letsencrypt.client import challenge_util
from letsencrypt.client import errors
class PerformTest(unittest.TestCase):
"""Test client perform function."""
@ -16,33 +19,27 @@ class PerformTest(unittest.TestCase):
name="rec_token_perform", side_effect=gen_client_resp)
def test_rec_token1(self):
from letsencrypt.client.challenge_util import RecTokenChall
token = RecTokenChall("0")
token = challenge_util.RecTokenChall("0")
responses = self.auth.perform([token])
self.assertEqual(responses, ["RecTokenChall0"])
def test_rec_token5(self):
from letsencrypt.client.challenge_util import RecTokenChall
tokens = []
for i in range(5):
tokens.append(RecTokenChall(str(i)))
for i in xrange(5):
tokens.append(challenge_util.RecTokenChall(str(i)))
responses = self.auth.perform(tokens)
self.assertEqual(len(responses), 5)
for i in range(5):
for i in xrange(5):
self.assertEqual(responses[i], "RecTokenChall%d" % i)
def test_unexpected(self):
from letsencrypt.client.challenge_util import DvsniChall
from letsencrypt.client.errors import LetsEncryptClientAuthError
unexpected = DvsniChall("0", "rb64", "123", "invalid_key")
unexpected = challenge_util.DvsniChall(
"0", "rb64", "123", "invalid_key")
self.assertRaises(
LetsEncryptClientAuthError, self.auth.perform, [unexpected])
errors.LetsEncryptClientAuthError, self.auth.perform, [unexpected])
class CleanupTest(unittest.TestCase):
@ -57,9 +54,8 @@ class CleanupTest(unittest.TestCase):
self.auth.rec_token.cleanup = self.mock_cleanup
def test_rec_token2(self):
from letsencrypt.client.challenge_util import RecTokenChall
token1 = RecTokenChall("0")
token2 = RecTokenChall("1")
token1 = challenge_util.RecTokenChall("0")
token2 = challenge_util.RecTokenChall("1")
self.auth.cleanup([token1, token2])
@ -67,20 +63,16 @@ class CleanupTest(unittest.TestCase):
[mock.call(token1), mock.call(token2)])
def test_unexpected(self):
from letsencrypt.client.challenge_util import DvsniChall
from letsencrypt.client.challenge_util import RecTokenChall
from letsencrypt.client.errors import LetsEncryptClientAuthError
token = challenge_util.RecTokenChall("0")
unexpected = challenge_util.DvsniChall("0", "rb64", "123", "dummy_key")
token = RecTokenChall("0")
unexpected = DvsniChall("0", "rb64", "123", "dummy_key")
self.assertRaises(
LetsEncryptClientAuthError, self.auth.cleanup, [token, unexpected])
self.assertRaises(errors.LetsEncryptClientAuthError,
self.auth.cleanup, [token, unexpected])
def gen_client_resp(chall):
"""Generate a dummy response."""
return "%s%s" % (type(chall).__name__, chall.domain)
return "%s%s" % (chall.__class__.__name__, chall.domain)
if __name__ == '__main__':

View file

@ -10,43 +10,6 @@ RSA256_KEY = pkg_resources.resource_string(__name__, 'testdata/rsa256_key.pem')
RSA512_KEY = pkg_resources.resource_string(__name__, 'testdata/rsa512_key.pem')
class CreateSigTest(unittest.TestCase):
"""Tests for letsencrypt.client.crypto_util.create_sig."""
def setUp(self):
self.nonce = '\xec\xd6\xf2oYH\xeb\x13\xd5#q\xe0\xdd\xa2\x92\xa9'
self.b64nonce = '7Nbyb1lI6xPVI3Hg3aKSqQ'
self.signature = {
'nonce': self.b64nonce,
'alg': 'RS256',
'jwk': {
'kty': 'RSA',
'e': 'AQAB',
'n': 'rHVztFHtH92ucFJD_N_HW9AsdRsUuHUBBBDlHwNlRd3fp5'
'80rv2-6QWE30cWgdmJS86ObRz6lUTor4R0T-3C5Q',
},
'sig': 'SUPYKucUnhlTt8_sMxLiigOYdf_wlOLXPI-o7aRLTsOquVjDd6r'
'AX9AFJHk-bCMQPJbSzXKjG6H1IWbvxjS2Ew',
}
@classmethod
def _call(cls, *args, **kwargs):
from letsencrypt.client.crypto_util import create_sig
return create_sig(*args, **kwargs)
def test_it(self):
self.assertEqual(
self._call('message', RSA256_KEY, self.nonce), self.signature)
def test_random_nonce(self):
signature = self._call('message', RSA256_KEY)
signature.pop('sig')
signature.pop('nonce')
del self.signature['sig']
del self.signature['nonce']
self.assertEqual(signature, self.signature)
class ValidCSRTest(unittest.TestCase):
"""Tests for letsencrypt.client.crypto_util.valid_csr."""
@ -169,17 +132,5 @@ class MakeSSCertTest(unittest.TestCase):
# self._call('cert-san.pem')
class B64CertToPEMTest(unittest.TestCase):
# pylint: disable=too-few-public-methods
"""Tests for letsencrypt.client.crypto_util.b64_cert_to_pem."""
def test_it(self):
from letsencrypt.client.crypto_util import b64_cert_to_pem
self.assertEqual(
b64_cert_to_pem(pkg_resources.resource_string(
__name__, 'testdata/cert.b64jose')),
pkg_resources.resource_string(__name__, 'testdata/cert.pem'))
if __name__ == '__main__':
unittest.main()

View file

@ -122,71 +122,5 @@ class UniqueFileTest(unittest.TestCase):
self.assertTrue(basename3.endswith('foo.txt'))
# https://en.wikipedia.org/wiki/Base64#Examples
JOSE_B64_PADDING_EXAMPLES = {
'any carnal pleasure.': ('YW55IGNhcm5hbCBwbGVhc3VyZS4', '='),
'any carnal pleasure': ('YW55IGNhcm5hbCBwbGVhc3VyZQ', '=='),
'any carnal pleasur': ('YW55IGNhcm5hbCBwbGVhc3Vy', ''),
'any carnal pleasu': ('YW55IGNhcm5hbCBwbGVhc3U', '='),
'any carnal pleas': ('YW55IGNhcm5hbCBwbGVhcw', '=='),
}
B64_URL_UNSAFE_EXAMPLES = {
chr(251) + chr(239): '--8',
chr(255) * 2: '__8',
}
class JOSEB64EncodeTest(unittest.TestCase):
"""Tests for letsencrypt.client.le_util.jose_b64encode."""
@classmethod
def _call(cls, data):
from letsencrypt.client.le_util import jose_b64encode
return jose_b64encode(data)
def test_unsafe_url(self):
for text, b64 in B64_URL_UNSAFE_EXAMPLES.iteritems():
self.assertEqual(self._call(text), b64)
def test_different_paddings(self):
for text, (b64, _) in JOSE_B64_PADDING_EXAMPLES.iteritems():
self.assertEqual(self._call(text), b64)
def test_unicode_fails_with_type_error(self):
self.assertRaises(TypeError, self._call, u'some unicode')
class JOSEB64DecodeTest(unittest.TestCase):
"""Tests for letsencrypt.client.le_util.jose_b64decode."""
@classmethod
def _call(cls, data):
from letsencrypt.client.le_util import jose_b64decode
return jose_b64decode(data)
def test_unsafe_url(self):
for text, b64 in B64_URL_UNSAFE_EXAMPLES.iteritems():
self.assertEqual(self._call(b64), text)
def test_input_without_padding(self):
for text, (b64, _) in JOSE_B64_PADDING_EXAMPLES.iteritems():
self.assertEqual(self._call(b64), text)
def test_input_with_padding(self):
for text, (b64, pad) in JOSE_B64_PADDING_EXAMPLES.iteritems():
self.assertEqual(self._call(b64 + pad), text)
def test_unicode_with_ascii(self):
self.assertEqual(self._call(u'YQ'), 'a')
def test_non_ascii_unicode_fails(self):
self.assertRaises(ValueError, self._call, u'\u0105')
def test_type_error_no_unicode_or_str(self):
self.assertRaises(TypeError, self._call, object())
if __name__ == '__main__':
unittest.main()

View file

@ -6,6 +6,8 @@ import tempfile
import mock
from letsencrypt.client import challenge_util
class RecoveryTokenTest(unittest.TestCase):
def setUp(self):
@ -31,32 +33,32 @@ class RecoveryTokenTest(unittest.TestCase):
self.assertTrue(self.rec_token.requires_human("example3.com"))
def test_cleanup(self):
from letsencrypt.client.challenge_util import RecTokenChall
self.rec_token.store_token("example3.com", 333)
self.assertFalse(self.rec_token.requires_human("example3.com"))
self.rec_token.cleanup(RecTokenChall("example3.com"))
self.rec_token.cleanup(challenge_util.RecTokenChall("example3.com"))
self.assertTrue(self.rec_token.requires_human("example3.com"))
# Shouldn't throw an error
self.rec_token.cleanup(RecTokenChall("example4.com"))
self.rec_token.cleanup(challenge_util.RecTokenChall("example4.com"))
def test_perform_stored(self):
from letsencrypt.client.challenge_util import RecTokenChall
self.rec_token.store_token("example4.com", 444)
response = self.rec_token.perform(RecTokenChall("example4.com"))
response = self.rec_token.perform(
challenge_util.RecTokenChall("example4.com"))
self.assertEqual(response, {"type": "recoveryToken", "token": "444"})
@mock.patch("letsencrypt.client.recovery_token.zope.component.getUtility")
def test_perform_not_stored(self, mock_input):
from letsencrypt.client.challenge_util import RecTokenChall
mock_input().input.side_effect = [(0, "555"), (1, "000")]
response = self.rec_token.perform(RecTokenChall("example5.com"))
response = self.rec_token.perform(
challenge_util.RecTokenChall("example5.com"))
self.assertEqual(response, {"type": "recoveryToken", "token": "555"})
response = self.rec_token.perform(RecTokenChall("example6.com"))
response = self.rec_token.perform(
challenge_util.RecTokenChall("example6.com"))
self.assertTrue(response is None)

View file

@ -1,17 +1,16 @@
#!/usr/bin/env python
"""Tests for standalone_authenticator.py."""
import mock
import unittest
"""Tests for letsencrypt.client.standalone_authenticator."""
import os
import pkg_resources
import signal
import socket
import unittest
import mock
import OpenSSL.crypto
import OpenSSL.SSL
from letsencrypt.acme import jose
from letsencrypt.client import challenge_util
from letsencrypt.client import le_util
@ -20,7 +19,7 @@ from letsencrypt.client import le_util
# after one iteration, based on.
# http://igorsobreira.com/2013/03/17/testing-infinite-loops.html
class SocketAcceptOnlyNTimes(object):
class _SocketAcceptOnlyNTimes(object):
# pylint: disable=too-few-public-methods
"""
Callable that will raise `CallableExhausted`
@ -39,6 +38,7 @@ class SocketAcceptOnlyNTimes(object):
# Modified here for a single use as socket.accept()
return (mock.MagicMock(), "ignored")
class CallableExhausted(Exception):
# pylint: disable=too-few-public-methods
"""Exception raised when a method is called more than the
@ -63,9 +63,9 @@ class SNICallbackTest(unittest.TestCase):
from letsencrypt.client.standalone_authenticator import \
StandaloneAuthenticator
self.authenticator = StandaloneAuthenticator()
name, r_b64 = "example.com", le_util.jose_b64encode("x" * 32)
name, r_b64 = "example.com", jose.b64encode("x" * 32)
test_key = pkg_resources.resource_string(
__name__, 'testdata/rsa256_key.pem')
__name__, "testdata/rsa256_key.pem")
nonce, key = "abcdef", le_util.Key("foo", test_key)
self.cert = challenge_util.dvsni_gen_cert(name, r_b64, nonce, key)[0]
private_key = OpenSSL.crypto.load_privatekey(
@ -98,6 +98,7 @@ class SNICallbackTest(unittest.TestCase):
called_ctx = connection.set_context.call_args[0][0]
self.assertTrue(isinstance(called_ctx, OpenSSL.SSL.Context))
class ClientSignalHandlerTest(unittest.TestCase):
"""Tests for client_signal_handler() method."""
def setUp(self):
@ -179,6 +180,79 @@ class SubprocSignalHandlerTest(unittest.TestCase):
mock_exit.assert_called_once_with(0)
class AlreadyListeningTest(unittest.TestCase):
"""Tests for already_listening() method."""
def setUp(self):
from letsencrypt.client.standalone_authenticator import \
StandaloneAuthenticator
self.authenticator = StandaloneAuthenticator()
@mock.patch("letsencrypt.client.standalone_authenticator.subprocess.Popen")
def test_subprocess_fails(self, mock_popen):
subprocess_object = mock.MagicMock()
subprocess_object.communicate.return_value = ("foo", "bar")
subprocess_object.wait.return_value = 1
mock_popen.return_value = subprocess_object
result = self.authenticator.already_listening(17)
self.assertFalse(result)
subprocess_object.wait.assert_called_once_with()
@mock.patch("letsencrypt.client.standalone_authenticator.subprocess.Popen")
def test_no_relevant_line(self, mock_popen):
# pylint: disable=line-too-long,trailing-whitespace
subprocess_object = mock.MagicMock()
subprocess_object.communicate.return_value = (
"""Active Internet connections (servers and established)
Proto Recv-Q Send-Q Local Address Foreign Address State PID/Program name
tcp 0 0 127.0.1.1:53 0.0.0.0:* LISTEN 1234/foo
tcp 0 0 127.0.0.1:631 0.0.0.0:* LISTEN 2345/bar
tcp 0 0 0.0.0.0:180 0.0.0.0:* LISTEN 11111/hello """,
"I am the standard error")
subprocess_object.wait.return_value = 0
mock_popen.return_value = subprocess_object
result = self.authenticator.already_listening(17)
self.assertFalse(result)
@mock.patch("letsencrypt.client.standalone_authenticator.subprocess.Popen")
@mock.patch("letsencrypt.client.standalone_authenticator."
"zope.component.getUtility")
def test_has_relevant_line(self, mock_get_utility, mock_popen):
# pylint: disable=line-too-long,trailing-whitespace
subprocess_object = mock.MagicMock()
subprocess_object.communicate.return_value = (
"""Active Internet connections (servers and established)
Proto Recv-Q Send-Q Local Address Foreign Address State PID/Program name
tcp 0 0 127.0.1.1:53 0.0.0.0:* LISTEN 1234/foo
tcp 0 0 127.0.0.1:631 0.0.0.0:* LISTEN 2345/bar
tcp 0 0 0.0.0.0:17 0.0.0.0:* LISTEN 11111/hello
tcp 0 0 0.0.0.0:1728 0.0.0.0:* LISTEN 2345/bar """,
"I am the standard error")
subprocess_object.wait.return_value = 0
mock_popen.return_value = subprocess_object
result = self.authenticator.already_listening(17)
self.assertTrue(result)
self.assertEqual(mock_get_utility.call_count, 1)
@mock.patch("letsencrypt.client.standalone_authenticator.subprocess.Popen")
@mock.patch("letsencrypt.client.standalone_authenticator."
"zope.component.getUtility")
def test_has_relevant_ipv6_line(self, mock_get_utility, mock_popen):
# pylint: disable=line-too-long,trailing-whitespace
subprocess_object = mock.MagicMock()
subprocess_object.communicate.return_value = (
"""Active Internet connections (servers and established)
Proto Recv-Q Send-Q Local Address Foreign Address State PID/Program name
tcp 0 0 127.0.1.1:53 0.0.0.0:* LISTEN 1234/foo
tcp 0 0 127.0.0.1:631 0.0.0.0:* LISTEN 2345/bar
tcp6 0 0 :::17 :::* LISTEN 11111/hello
tcp 0 0 0.0.0.0:1728 0.0.0.0:* LISTEN 2345/bar """,
"I am the standard error")
subprocess_object.wait.return_value = 0
mock_popen.return_value = subprocess_object
result = self.authenticator.already_listening(17)
self.assertTrue(result)
self.assertEqual(mock_get_utility.call_count, 1)
class PerformTest(unittest.TestCase):
"""Tests for perform() method."""
def setUp(self):
@ -186,10 +260,21 @@ class PerformTest(unittest.TestCase):
StandaloneAuthenticator
self.authenticator = StandaloneAuthenticator()
def test_perform_when_already_listening(self):
test_key = pkg_resources.resource_string(
__name__, "testdata/rsa256_key.pem")
key = le_util.Key("something", test_key)
chall1 = challenge_util.DvsniChall(
"foo.example.com", "whee", "foononce", key)
self.authenticator.already_listening = mock.Mock()
self.authenticator.already_listening.return_value = True
result = self.authenticator.perform([chall1])
self.assertEqual(result, [None])
def test_can_perform(self):
"""What happens if start_listener() returns True."""
test_key = pkg_resources.resource_string(
__name__, 'testdata/rsa256_key.pem')
__name__, "testdata/rsa256_key.pem")
key = le_util.Key("something", test_key)
chall1 = challenge_util.DvsniChall(
"foo.example.com", "whee", "foononce", key)
@ -216,7 +301,7 @@ class PerformTest(unittest.TestCase):
def test_cannot_perform(self):
"""What happens if start_listener() returns False."""
test_key = pkg_resources.resource_string(
__name__, 'testdata/rsa256_key.pem')
__name__, "testdata/rsa256_key.pem")
key = le_util.Key("something", test_key)
chall1 = challenge_util.DvsniChall(
"foo.example.com", "whee", "foononce", key)
@ -345,9 +430,9 @@ class DoChildProcessTest(unittest.TestCase):
from letsencrypt.client.standalone_authenticator import \
StandaloneAuthenticator
self.authenticator = StandaloneAuthenticator()
name, r_b64 = "example.com", le_util.jose_b64encode("x" * 32)
name, r_b64 = "example.com", jose.b64encode("x" * 32)
test_key = pkg_resources.resource_string(
__name__, 'testdata/rsa256_key.pem')
__name__, "testdata/rsa256_key.pem")
nonce, key = "abcdef", le_util.Key("foo", test_key)
self.key = key
self.cert = challenge_util.dvsni_gen_cert(name, r_b64, nonce, key)[0]
@ -412,10 +497,10 @@ class DoChildProcessTest(unittest.TestCase):
"OpenSSL.SSL.Connection")
@mock.patch("letsencrypt.client.standalone_authenticator.socket.socket")
@mock.patch("letsencrypt.client.standalone_authenticator.os.kill")
def test_do_child_process_success(self, mock_kill, mock_socket,
mock_connection):
def test_do_child_process_success(
self, mock_kill, mock_socket, mock_connection):
sample_socket = mock.MagicMock()
sample_socket.accept.side_effect = SocketAcceptOnlyNTimes(2)
sample_socket.accept.side_effect = _SocketAcceptOnlyNTimes(2)
mock_socket.return_value = sample_socket
mock_connection.return_value = mock.MagicMock()
self.assertRaises(
@ -457,5 +542,5 @@ class CleanupTest(unittest.TestCase):
self.assertRaises(ValueError, self.authenticator.cleanup, [chall])
if __name__ == '__main__':
if __name__ == "__main__":
unittest.main()

23
linter_plugin.py Normal file
View file

@ -0,0 +1,23 @@
"""Let's Encrypt ACME PyLint plugin.
http://docs.pylint.org/plugins.html
"""
from astroid import MANAGER
from astroid import nodes
def register(unused_linter):
"""Register this module as PyLint plugin."""
def _transform(cls):
# fix the "no-member" error on instances of
# letsencrypt.acme.util.ImmutableMap subclasses (instance
# attributes are initialized dynamically based on __slots__)
if (('Message' in cls.basenames or 'ImmutableMap' in cls.basenames or
'util.ImmutableMap' in cls.basenames) and (cls.slots() is not None)):
for slot in cls.slots():
cls.locals[slot.value] = [nodes.EmptyNode()]
MANAGER.register_transform(nodes.Class, _transform)

View file

@ -61,6 +61,7 @@ setup(
url="https://letsencrypt.org",
packages=[
'letsencrypt',
'letsencrypt.acme',
'letsencrypt.client',
'letsencrypt.client.apache',
'letsencrypt.client.display',

View file

@ -10,11 +10,14 @@ commands =
pip install -e .[testing]
python setup.py test -q # -q does not suppress errors
setenv =
PYTHONPATH = {toxinidir}
[testenv:cover]
basepython = python2.7
commands =
pip install -e .[testing]
python setup.py nosetests --with-coverage --cover-min-percentage=66
python setup.py nosetests --with-coverage --cover-min-percentage=73
[testenv:lint]
# recent versions of pylint do not support Python 2.6 (#97, #187)