Updated tests to update challenge changes

This commit is contained in:
Brad Warren 2015-08-13 18:41:49 -07:00
commit 0aaf9f2be7
56 changed files with 983 additions and 950 deletions

View file

@ -2,6 +2,7 @@ language: python
services:
- rabbitmq
- mysql
# http://docs.travis-ci.com/user/ci-environment/#CI-environment-OS
before_install:
@ -18,8 +19,6 @@ env:
matrix:
- TOXENV=py26 BOULDER_INTEGRATION=1
- TOXENV=py27 BOULDER_INTEGRATION=1
- TOXENV=py33
- TOXENV=py34
- TOXENV=lint
- TOXENV=cover
@ -27,6 +26,7 @@ env:
addons:
hosts:
- le.wtf
mariadb: "10.0"
install: "travis_retry pip install tox coveralls"
before_script: '[ "xxx$BOULDER_INTEGRATION" = "xxx" ] || ./tests/boulder-start.sh'

65
Dockerfile-dev Normal file
View file

@ -0,0 +1,65 @@
# This Dockerfile builds an image for development.
FROM ubuntu:trusty
MAINTAINER Jakub Warmuz <jakub@warmuz.org>
MAINTAINER William Budington <bill@eff.org>
MAINTAINER Yan <yan@eff.org>
# Note: this only exposes the port to other docker containers. You
# still have to bind to 443@host at runtime, as per the ACME spec.
EXPOSE 443
# TODO: make sure --config-dir and --work-dir cannot be changed
# through the CLI (letsencrypt-docker wrapper that uses standalone
# authenticator and text mode only?)
VOLUME /etc/letsencrypt /var/lib/letsencrypt
WORKDIR /opt/letsencrypt
# no need to mkdir anything:
# https://docs.docker.com/reference/builder/#copy
# If <dest> doesn't exist, it is created along with all missing
# directories in its path.
# TODO: Install non-default Python versions for tox.
# TODO: Install Apache/Nginx for plugin development.
COPY bootstrap/ubuntu.sh /opt/letsencrypt/src/
RUN /opt/letsencrypt/src/ubuntu.sh && \
apt-get clean && \
rm -rf /var/lib/apt/lists/* \
/tmp/* \
/var/tmp/*
# the above is not likely to change, so by putting it further up the
# Dockerfile we make sure we cache as much as possible
COPY setup.py README.rst CHANGES.rst MANIFEST.in requirements.txt EULA linter_plugin.py tox.cover.sh tox.ini /opt/letsencrypt/src/
# all above files are necessary for setup.py, however, package source
# code directory has to be copied separately to a subdirectory...
# https://docs.docker.com/reference/builder/#copy: "If <src> is a
# directory, the entire contents of the directory are copied,
# including filesystem metadata. Note: The directory itself is not
# copied, just its contents." Order again matters, three files are far
# more likely to be cached than the whole project directory
COPY letsencrypt /opt/letsencrypt/src/letsencrypt/
COPY acme /opt/letsencrypt/src/acme/
COPY letsencrypt-apache /opt/letsencrypt/src/letsencrypt-apache/
COPY letsencrypt-nginx /opt/letsencrypt/src/letsencrypt-nginx/
COPY tests /opt/letsencrypt/src/tests/
RUN virtualenv --no-site-packages -p python2 /opt/letsencrypt/venv && \
/opt/letsencrypt/venv/bin/pip install \
-r /opt/letsencrypt/src/requirements.txt \
-e /opt/letsencrypt/src/acme \
-e /opt/letsencrypt/src \
-e /opt/letsencrypt/src/letsencrypt-apache \
-e /opt/letsencrypt/src/letsencrypt-nginx \
-e /opt/letsencrypt/src[dev,docs,testing]
# install in editable mode (-e) to save space: it's not possible to
# "rm -rf /opt/letsencrypt/src" (it's stays in the underlaying image);
# this might also help in debugging: you can "docker run --entrypoint
# bash" and investigate, apply patches, etc.
ENV PATH /opt/letsencrypt/venv/bin:$PATH

View file

@ -1,12 +1,18 @@
.. notice for github users
Official **documentation**, including `installation instructions`_, is
available at https://letsencrypt.readthedocs.org.
Disclaimer
==========
Generic information about Let's Encrypt project can be found at
https://letsencrypt.org. Please read `Frequently Asked Questions (FAQ)
<https://letsencrypt.org/faq/>`_.
This is a **DEVELOPER PREVIEW** intended for developers and testers only.
**DO NOT RUN THIS CODE ON A PRODUCTION SERVER. IT WILL INSTALL CERTIFICATES
SIGNED BY A TEST CA, AND WILL CAUSE CERT WARNINGS FOR USERS.**
Browser-trusted certificates will be available in the coming months.
For more information regarding the status of the project, please see
https://letsencrypt.org. Be sure to checkout the
`Frequently Asked Questions (FAQ) <https://letsencrypt.org/faq/>`_.
About the Let's Encrypt Client
==============================
@ -18,7 +24,7 @@ In short: getting and installing SSL/TLS certificates made easy (`watch demo vid
The Let's Encrypt Client is a tool to automatically receive and install
X.509 certificates to enable TLS on servers. The client will
interoperate with the Let's Encrypt CA which will be issuing browser-trusted
certificates for free beginning the summer of 2015.
certificates for free.
It's all automated:
@ -32,7 +38,7 @@ All you need to do to sign a single domain is::
user@www:~$ sudo letsencrypt -d www.example.org auth
For multiple domains (SAN) use::
user@www:~$ sudo letsencrypt -d www.example.org -d example.org auth
and if you have a compatible web server (Apache or Nginx), Let's Encrypt can
@ -67,15 +73,6 @@ server automatically!::
.. _watch demo video: https://www.youtube.com/watch?v=Gas_sSB-5SU
Disclaimer
----------
This is a **DEVELOPER PREVIEW** intended for developers and testers only.
**DO NOT RUN THIS CODE ON A PRODUCTION SERVER. IT WILL INSTALL CERTIFICATES
SIGNED BY A TEST CA, AND WILL CAUSE CERT WARNINGS FOR USERS.**
Current Features
----------------
@ -99,6 +96,13 @@ Current Features
* Free and Open Source Software, made with Python.
Installation Instructions
-------------------------
Official **documentation**, including `installation instructions`_, is
available at https://letsencrypt.readthedocs.org.
Links
-----

View file

@ -1,13 +1,9 @@
"""ACME Identifier Validation Challenges."""
import binascii
import functools
import hashlib
import logging
import os
import socket
from cryptography.hazmat.backends import default_backend
from cryptography import x509
import OpenSSL
import requests
@ -54,43 +50,45 @@ class SimpleHTTP(DVChallenge):
"""
typ = "simpleHttp"
token = jose.Field("token")
TOKEN_SIZE = 128 / 8 # Based on the entropy value from the spec
"""Minimum size of the :attr:`token` in bytes."""
# TODO: acme-spec doesn't specify token as base64-encoded value
token = jose.Field(
"token", encoder=jose.encode_b64jose, decoder=functools.partial(
jose.decode_b64jose, size=TOKEN_SIZE, minimum=True))
@property
def good_token(self): # XXX: @token.decoder
"""Is `token` good?
.. todo:: acme-spec wants "It MUST NOT contain any non-ASCII
characters", but it should also warrant that it doesn't
contain ".." or "/"...
"""
# TODO: check that path combined with uri does not go above
# URI_ROOT_PATH!
return b'..' not in self.token and b'/' not in self.token
@ChallengeResponse.register
class SimpleHTTPResponse(ChallengeResponse):
"""ACME "simpleHttp" challenge response.
:ivar unicode path:
:ivar unicode tls:
:ivar bool tls:
"""
typ = "simpleHttp"
path = jose.Field("path")
tls = jose.Field("tls", default=True, omitempty=True)
URI_ROOT_PATH = ".well-known/acme-challenge"
"""URI root path for the server provisioned resource."""
_URI_TEMPLATE = "{scheme}://{domain}/" + URI_ROOT_PATH + "/{path}"
_URI_TEMPLATE = "{scheme}://{domain}/" + URI_ROOT_PATH + "/{token}"
MAX_PATH_LEN = 25
"""Maximum allowed `path` length."""
CONTENT_TYPE = "text/plain"
@property
def good_path(self):
"""Is `path` good?
.. todo:: acme-spec: "The value MUST be comprised entirely of
characters from the URL-safe alphabet for Base64 encoding
[RFC4648]", base64.b64decode ignores those characters
"""
# TODO: check that path combined with uri does not go above
# URI_ROOT_PATH!
return len(self.path) <= 25
CONTENT_TYPE = "application/jose+json"
@property
def scheme(self):
@ -102,19 +100,73 @@ class SimpleHTTPResponse(ChallengeResponse):
"""Port that the ACME client should be listening for validation."""
return 443 if self.tls else 80
def uri(self, domain):
def uri(self, domain, chall):
"""Create an URI to the provisioned resource.
Forms an URI to the HTTPS server provisioned resource
(containing :attr:`~SimpleHTTP.token`).
:param unicode domain: Domain name being verified.
:param challenges.SimpleHTTP chall:
"""
return self._URI_TEMPLATE.format(
scheme=self.scheme, domain=domain, path=self.path)
scheme=self.scheme, domain=domain, token=chall.encode("token"))
def simple_verify(self, chall, domain, port=None):
def gen_resource(self, chall):
"""Generate provisioned resource.
:param .SimpleHTTP chall:
:rtype: SimpleHTTPProvisionedResource
"""
return SimpleHTTPProvisionedResource(token=chall.token, tls=self.tls)
def gen_validation(self, chall, account_key, alg=jose.RS256, **kwargs):
"""Generate validation.
:param .SimpleHTTP chall:
:param .JWK account_key: Private account key.
:param .JWA alg:
:returns: `.SimpleHTTPProvisionedResource` signed in `.JWS`
:rtype: .JWS
"""
return jose.JWS.sign(
payload=self.gen_resource(chall).json_dumps(
sort_keys=True).encode('utf-8'),
key=account_key, alg=alg, **kwargs)
def check_validation(self, validation, chall, account_public_key):
"""Check validation.
:param .JWS validation:
:param .SimpleHTTP chall:
:type account_public_key:
`~cryptography.hazmat.primitives.asymmetric.rsa.RSAPublicKey`
or
`~cryptography.hazmat.primitives.asymmetric.dsa.DSAPublicKey`
or
`~cryptography.hazmat.primitives.asymmetric.ec.EllipticCurvePublicKey`
wrapped in `.ComparableKey
:rtype: bool
"""
if not validation.verify(key=account_public_key):
return False
try:
resource = SimpleHTTPProvisionedResource.json_loads(
validation.payload.decode('utf-8'))
except jose.DeserializationError as error:
logger.debug(error)
return False
return resource.token == chall.token and resource.tls == self.tls
def simple_verify(self, chall, domain, account_public_key, port=None):
"""Simple verify.
According to the ACME specification, "the ACME server MUST
@ -123,6 +175,16 @@ class SimpleHTTPResponse(ChallengeResponse):
:param .SimpleHTTP chall: Corresponding challenge.
:param unicode domain: Domain name being verified.
:param account_public_key: Public key for the key pair
being authorized. If ``None`` key verification is not
performed!
:type account_public_key:
`~cryptography.hazmat.primitives.asymmetric.rsa.RSAPublicKey`
or
`~cryptography.hazmat.primitives.asymmetric.dsa.DSAPublicKey`
or
`~cryptography.hazmat.primitives.asymmetric.ec.EllipticCurvePublicKey`
wrapped in `.ComparableKey
:param int port: Port used in the validation.
:returns: ``True`` iff validation is successful, ``False``
@ -138,77 +200,67 @@ class SimpleHTTPResponse(ChallengeResponse):
"Using non-standard port for SimpleHTTP verification: %s", port)
domain += ":{0}".format(port)
uri = self.uri(domain)
uri = self.uri(domain, chall)
logger.debug("Verifying %s at %s...", chall.typ, uri)
try:
http_response = requests.get(uri, verify=False)
except requests.exceptions.RequestException as error:
logger.error("Unable to reach %s: %s", uri, error)
return False
logger.debug(
"Received %s. Headers: %s", http_response, http_response.headers)
logger.debug("Received %s: %s. Headers: %s", http_response,
http_response.text, http_response.headers)
good_token = http_response.text == chall.token
if not good_token:
logger.error(
"Unable to verify %s! Expected: %r, returned: %r.",
uri, chall.token, http_response.text)
# TODO: spec contradicts itself, c.f.
# https://github.com/letsencrypt/acme-spec/pull/156/files#r33136438
good_ct = self.CONTENT_TYPE == http_response.headers.get(
"Content-Type", self.CONTENT_TYPE)
return self.good_path and good_ct and good_token
if self.CONTENT_TYPE != http_response.headers.get(
"Content-Type", self.CONTENT_TYPE):
return False
try:
validation = jose.JWS.json_loads(http_response.text)
except jose.DeserializationError as error:
logger.debug(error)
return False
return self.check_validation(validation, chall, account_public_key)
class SimpleHTTPProvisionedResource(jose.JSONObjectWithFields):
"""SimpleHTTP provisioned resource."""
typ = fields.Fixed("type", SimpleHTTP.typ)
token = SimpleHTTP._fields["token"]
# If the "tls" field is not included in the response, then
# validation object MUST have its "tls" field set to "true".
tls = jose.Field("tls", omitempty=False)
@Challenge.register
class DVSNI(DVChallenge):
"""ACME "dvsni" challenge.
:ivar bytes r: Random data, **not** base64-encoded.
:ivar bytes nonce: Random data, **not** hex-encoded.
:ivar bytes token: Random data, **not** base64-encoded.
"""
typ = "dvsni"
DOMAIN_SUFFIX = b".acme.invalid"
"""Domain name suffix."""
R_SIZE = 32
"""Required size of the :attr:`r` in bytes."""
NONCE_SIZE = 16
"""Required size of the :attr:`nonce` in bytes."""
PORT = 443
"""Port to perform DVSNI challenge."""
r = jose.Field("r", encoder=jose.encode_b64jose, # pylint: disable=invalid-name
decoder=functools.partial(jose.decode_b64jose, size=R_SIZE))
nonce = jose.Field("nonce", encoder=jose.encode_hex16,
decoder=functools.partial(functools.partial(
jose.decode_hex16, size=NONCE_SIZE)))
TOKEN_SIZE = 128 / 8 # Based on the entropy value from the spec
"""Minimum size of the :attr:`token` in bytes."""
@property
def nonce_domain(self):
"""Domain name used in SNI.
token = jose.Field(
"token", encoder=jose.encode_b64jose, decoder=functools.partial(
jose.decode_b64jose, size=TOKEN_SIZE, minimum=True))
:rtype: bytes
def gen_response(self, account_key, alg=jose.RS256, **kwargs):
"""Generate response.
:param .JWK account_key: Private account key.
:rtype: .DVSNIResponse
"""
return binascii.hexlify(self.nonce) + self.DOMAIN_SUFFIX
def probe_cert(self, domain, **kwargs):
"""Probe DVSNI challenge certificate."""
if "host" not in kwargs:
host = socket.gethostbyname(domain)
logging.debug('%s resolved to %s', domain, host)
kwargs["host"] = host
kwargs.setdefault("port", self.PORT)
kwargs["name"] = self.nonce_domain
# TODO: try different methods?
# pylint: disable=protected-access
return crypto_util.probe_sni(**kwargs)
return DVSNIResponse(validation=jose.JWS.sign(
payload=self.json_dumps(sort_keys=True).encode('utf-8'),
key=account_key, alg=alg, **kwargs))
@ChallengeResponse.register
@ -220,105 +272,138 @@ class DVSNIResponse(ChallengeResponse):
"""
typ = "dvsni"
DOMAIN_SUFFIX = DVSNI.DOMAIN_SUFFIX
DOMAIN_SUFFIX = b".acme.invalid"
"""Domain name suffix."""
S_SIZE = 32
"""Required size of the :attr:`s` in bytes."""
PORT = DVSNI.PORT
"""Port to perform DVSNI challenge."""
s = jose.Field("s", encoder=jose.encode_b64jose, # pylint: disable=invalid-name
decoder=functools.partial(jose.decode_b64jose, size=S_SIZE))
validation = jose.Field("validation", decoder=jose.JWS.from_json)
def __init__(self, s=None, *args, **kwargs):
s = os.urandom(self.S_SIZE) if s is None else s
super(DVSNIResponse, self).__init__(s=s, *args, **kwargs)
def z(self, chall): # pylint: disable=invalid-name
"""Compute the parameter ``z``.
:param challenge: Corresponding challenge.
:type challenge: :class:`DVSNI`
@property
def z(self): # pylint: disable=invalid-name
"""The ``z`` parameter.
:rtype: bytes
"""
z = hashlib.new("sha256") # pylint: disable=invalid-name
z.update(chall.r)
z.update(self.s)
return z.hexdigest().encode()
# Instance of 'Field' has no 'signature' member
# pylint: disable=no-member
return hashlib.sha256(self.validation.signature.encode(
"signature").encode("utf-8")).hexdigest().encode()
def z_domain(self, chall):
@property
def z_domain(self):
"""Domain name for certificate subjectAltName.
:rtype bytes:
:rtype: bytes
"""
return self.z(chall) + self.DOMAIN_SUFFIX
z = self.z # pylint: disable=invalid-name
return z[:32] + b'.' + z[32:] + self.DOMAIN_SUFFIX
def gen_cert(self, chall, domain, key):
@property
def chall(self):
"""Get challenge encoded in the `validation` payload.
:rtype: DVSNI
"""
# pylint: disable=no-member
return DVSNI.json_loads(self.validation.payload.decode('utf-8'))
def gen_cert(self, key=None, bits=2048):
"""Generate DVSNI certificate.
:param .DVSNI chall: Corresponding challenge.
:param unicode domain:
:param OpenSSL.crypto.PKey
:param OpenSSL.crypto.PKey key: Optional private key used in
certificate generation. If not provided (``None``), then
fresh key will be generated.
:param int bits: Number of bits for newly generated key.
:rtype: `tuple` of `OpenSSL.crypto.X509` and
`OpenSSL.crypto.PKey`
"""
if key is None:
key = OpenSSL.crypto.PKey()
key.generate_key(OpenSSL.crypto.TYPE_RSA, bits)
return crypto_util.gen_ss_cert(key, [
domain, chall.nonce_domain.decode(), self.z_domain(chall).decode()])
# z_domain is too big to fit into CN, hence first dummy domain
'dummy', self.z_domain.decode()], force_san=True), key
def simple_verify(self, chall, domain, public_key, **kwargs):
def probe_cert(self, domain, **kwargs):
"""Probe DVSNI challenge certificate.
:param unicode domain:
"""
if "host" not in kwargs:
host = socket.gethostbyname(domain)
logging.debug('%s resolved to %s', domain, host)
kwargs["host"] = host
kwargs.setdefault("port", self.PORT)
kwargs["name"] = self.z_domain
# TODO: try different methods?
# pylint: disable=protected-access
return crypto_util.probe_sni(**kwargs)
def verify_cert(self, cert):
"""Verify DVSNI challenge certificate."""
# pylint: disable=protected-access
sans = crypto_util._pyopenssl_cert_or_req_san(cert)
logging.debug('Certificate %s. SANs: %s', cert.digest('sha1'), sans)
return self.z_domain.decode() in sans
def simple_verify(self, chall, domain, account_public_key,
cert=None, **kwargs):
"""Simple verify.
Probes DVSNI certificate and checks it using `verify_cert`;
hence all arguments documented in `verify_cert`.
"""
try:
cert = chall.probe_cert(domain=domain, **kwargs)
except errors.Error as error:
logger.debug(error, exc_info=True)
return False
return self.verify_cert(chall, domain, public_key, cert)
def verify_cert(self, chall, domain, public_key, cert):
"""Verify DVSNI certificate.
Verify ``validation`` using ``account_public_key``, optionally
probe DVSNI certificate and check using `verify_cert`.
:param .challenges.DVSNI chall: Corresponding challenge.
:param str domain: Domain name being validated.
:param public_key: Public key for the key pair
being authorized. If ``None`` key verification is not
performed!
:type public_key:
:type account_public_key:
`~cryptography.hazmat.primitives.asymmetric.rsa.RSAPublicKey`
or
`~cryptography.hazmat.primitives.asymmetric.dsa.DSAPublicKey`
or
`~cryptography.hazmat.primitives.asymmetric.ec.EllipticCurvePublicKey`
wrapped in `.ComparableKey
:param OpenSSL.crypto.X509 cert:
:param OpenSSL.crypto.X509 cert: Optional certificate. If not
provided (``None``) certificate will be retrieved using
`probe_cert`.
:returns: ``True`` iff client's control of the domain has been
verified, ``False`` otherwise.
:rtype: bool
"""
# TODO: check "It is a valid self-signed certificate" and
# return False if not
# pylint: disable=protected-access
sans = crypto_util._pyopenssl_cert_or_req_san(cert)
logging.debug('Certificate %s. SANs: %s', cert.digest('sha1'), sans)
cert = x509.load_der_x509_certificate(
OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_ASN1, cert),
default_backend())
if public_key is None:
logging.warn('No key verification is performed')
elif public_key != jose.ComparableKey(cert.public_key()):
# pylint: disable=no-member
if not self.validation.verify(key=account_public_key):
return False
return domain in sans and self.z_domain(chall).decode() in sans
# TODO: it's not checked that payload has exectly 2 fields!
try:
decoded_chall = self.chall
except jose.DeserializationError as error:
logger.debug(error, exc_info=True)
return False
if decoded_chall.token != chall.token:
logger.debug("Wrong token: expected %r, found %r",
chall.token, decoded_chall.token)
return False
if cert is None:
try:
cert = self.probe_cert(domain=domain, **kwargs)
except errors.Error as error:
logger.debug(error, exc_info=True)
return False
return self.verify_cert(cert)
@Challenge.register
@ -348,23 +433,6 @@ class RecoveryContactResponse(ChallengeResponse):
token = jose.Field("token", omitempty=True)
@Challenge.register
class RecoveryToken(ContinuityChallenge):
"""ACME "recoveryToken" challenge."""
typ = "recoveryToken"
@ChallengeResponse.register
class RecoveryTokenResponse(ChallengeResponse):
"""ACME "recoveryToken" challenge response.
:ivar unicode token:
"""
typ = "recoveryToken"
token = jose.Field("token", omitempty=True)
@Challenge.register
class ProofOfPossession(ContinuityChallenge):
"""ACME "proofOfPossession" challenge.

View file

@ -22,10 +22,11 @@ class SimpleHTTPTest(unittest.TestCase):
def setUp(self):
from acme.challenges import SimpleHTTP
self.msg = SimpleHTTP(
token='evaGxfADs6pSRb2LAv9IZf17Dt3juxGJ+PCt92wr+oA')
token=jose.decode_b64jose(
'evaGxfADs6pSRb2LAv9IZf17Dt3juxGJ+PCt92wr+oA'))
self.jmsg = {
'type': 'simpleHttp',
'token': 'evaGxfADs6pSRb2LAv9IZf17Dt3juxGJ+PCt92wr+oA',
'token': 'evaGxfADs6pSRb2LAv9IZf17Dt3juxGJ-PCt92wr-oA',
}
def test_to_partial_json(self):
@ -39,56 +40,36 @@ class SimpleHTTPTest(unittest.TestCase):
from acme.challenges import SimpleHTTP
hash(SimpleHTTP.from_json(self.jmsg))
def test_good_token(self):
self.assertTrue(self.msg.good_token)
self.assertFalse(
self.msg.update(token=b'..').good_token)
class SimpleHTTPResponseTest(unittest.TestCase):
# pylint: disable=too-many-instance-attributes
def setUp(self):
from acme.challenges import SimpleHTTPResponse
self.msg_http = SimpleHTTPResponse(
path='6tbIMBC5Anhl5bOlWT5ZFA', tls=False)
self.msg_https = SimpleHTTPResponse(path='6tbIMBC5Anhl5bOlWT5ZFA')
self.msg_http = SimpleHTTPResponse(tls=False)
self.msg_https = SimpleHTTPResponse(tls=True)
self.jmsg_http = {
'resource': 'challenge',
'type': 'simpleHttp',
'path': '6tbIMBC5Anhl5bOlWT5ZFA',
'tls': False,
}
self.jmsg_https = {
'resource': 'challenge',
'type': 'simpleHttp',
'path': '6tbIMBC5Anhl5bOlWT5ZFA',
'tls': True,
}
from acme.challenges import SimpleHTTP
self.chall = SimpleHTTP(token="foo")
self.resp_http = SimpleHTTPResponse(path="bar", tls=False)
self.resp_https = SimpleHTTPResponse(path="bar", tls=True)
self.chall = SimpleHTTP(token=(b"x" * 16))
self.resp_http = SimpleHTTPResponse(tls=False)
self.resp_https = SimpleHTTPResponse(tls=True)
self.good_headers = {'Content-Type': SimpleHTTPResponse.CONTENT_TYPE}
def test_good_path(self):
self.assertTrue(self.msg_http.good_path)
self.assertTrue(self.msg_https.good_path)
self.assertFalse(
self.msg_http.update(path=(self.msg_http.path * 10)).good_path)
def test_scheme(self):
self.assertEqual('http', self.msg_http.scheme)
self.assertEqual('https', self.msg_https.scheme)
def test_port(self):
self.assertEqual(80, self.msg_http.port)
self.assertEqual(443, self.msg_https.port)
def test_uri(self):
self.assertEqual(
'http://example.com/.well-known/acme-challenge/'
'6tbIMBC5Anhl5bOlWT5ZFA', self.msg_http.uri('example.com'))
self.assertEqual(
'https://example.com/.well-known/acme-challenge/'
'6tbIMBC5Anhl5bOlWT5ZFA', self.msg_https.uri('example.com'))
def test_to_partial_json(self):
self.assertEqual(self.jmsg_http, self.msg_http.to_partial_json())
self.assertEqual(self.jmsg_https, self.msg_https.to_partial_json())
@ -105,34 +86,98 @@ class SimpleHTTPResponseTest(unittest.TestCase):
hash(SimpleHTTPResponse.from_json(self.jmsg_http))
hash(SimpleHTTPResponse.from_json(self.jmsg_https))
def test_scheme(self):
self.assertEqual('http', self.msg_http.scheme)
self.assertEqual('https', self.msg_https.scheme)
def test_port(self):
self.assertEqual(80, self.msg_http.port)
self.assertEqual(443, self.msg_https.port)
def test_uri(self):
self.assertEqual(
'http://example.com/.well-known/acme-challenge/'
'eHh4eHh4eHh4eHh4eHh4eA', self.msg_http.uri(
'example.com', self.chall))
self.assertEqual(
'https://example.com/.well-known/acme-challenge/'
'eHh4eHh4eHh4eHh4eHh4eA', self.msg_https.uri(
'example.com', self.chall))
def test_gen_check_validation(self):
account_key = jose.JWKRSA.load(test_util.load_vector('rsa512_key.pem'))
self.assertTrue(self.resp_http.check_validation(
validation=self.resp_http.gen_validation(self.chall, account_key),
chall=self.chall, account_public_key=account_key.public_key()))
def test_gen_check_validation_wrong_key(self):
key1 = jose.JWKRSA.load(test_util.load_vector('rsa512_key.pem'))
key2 = jose.JWKRSA.load(test_util.load_vector('rsa1024_key.pem'))
self.assertFalse(self.resp_http.check_validation(
validation=self.resp_http.gen_validation(self.chall, key1),
chall=self.chall, account_public_key=key2.public_key()))
def test_check_validation_wrong_payload(self):
account_key = jose.JWKRSA.load(test_util.load_vector('rsa512_key.pem'))
validations = tuple(
jose.JWS.sign(payload=payload, alg=jose.RS256, key=account_key)
for payload in (b'', b'{}', self.chall.json_dumps().encode('utf-8'),
self.resp_http.json_dumps().encode('utf-8'))
)
for validation in validations:
self.assertFalse(self.resp_http.check_validation(
validation=validation, chall=self.chall,
account_public_key=account_key.public_key()))
def test_check_validation_wrong_fields(self):
resource = self.resp_http.gen_resource(self.chall)
account_key = jose.JWKRSA.load(test_util.load_vector('rsa512_key.pem'))
validations = tuple(
jose.JWS.sign(payload=bad_resource.json_dumps().encode('utf-8'),
alg=jose.RS256, key=account_key)
for bad_resource in (resource.update(tls=True),
resource.update(token=b'x'*20))
)
for validation in validations:
self.assertFalse(self.resp_http.check_validation(
validation=validation, chall=self.chall,
account_public_key=account_key.public_key()))
@mock.patch("acme.challenges.requests.get")
def test_simple_verify_good_token(self, mock_get):
account_key = jose.JWKRSA.load(test_util.load_vector('rsa512_key.pem'))
for resp in self.resp_http, self.resp_https:
mock_get.reset_mock()
validation = resp.gen_validation(self.chall, account_key)
mock_get.return_value = mock.MagicMock(
text=self.chall.token, headers=self.good_headers)
self.assertTrue(resp.simple_verify(self.chall, "local"))
mock_get.assert_called_once_with(resp.uri("local"), verify=False)
text=validation.json_dumps(), headers=self.good_headers)
self.assertTrue(resp.simple_verify(self.chall, "local", None))
mock_get.assert_called_once_with(resp.uri(
"local", self.chall), verify=False)
@mock.patch("acme.challenges.requests.get")
def test_simple_verify_bad_token(self, mock_get):
mock_get.return_value = mock.MagicMock(
text=self.chall.token + "!", headers=self.good_headers)
self.assertFalse(self.resp_http.simple_verify(self.chall, "local"))
self.assertFalse(self.resp_http.simple_verify(
self.chall, "local", None))
@mock.patch("acme.challenges.requests.get")
def test_simple_verify_bad_content_type(self, mock_get):
mock_get().text = self.chall.token
self.assertFalse(self.resp_http.simple_verify(self.chall, "local"))
self.assertFalse(self.resp_http.simple_verify(
self.chall, "local", None))
@mock.patch("acme.challenges.requests.get")
def test_simple_verify_connection_error(self, mock_get):
mock_get.side_effect = requests.exceptions.RequestException
self.assertFalse(self.resp_http.simple_verify(self.chall, "local"))
self.assertFalse(self.resp_http.simple_verify(
self.chall, "local", None))
@mock.patch("acme.challenges.requests.get")
def test_simple_verify_port(self, mock_get):
self.resp_http.simple_verify(self.chall, "local", 4430)
self.resp_http.simple_verify(
self.chall, domain="local", account_public_key=None, port=4430)
self.assertEqual("local:4430", urllib_parse.urlparse(
mock_get.mock_calls[0][1][0]).netloc)
@ -142,19 +187,12 @@ class DVSNITest(unittest.TestCase):
def setUp(self):
from acme.challenges import DVSNI
self.msg = DVSNI(
r=b"O*\xb4-\xad\xec\x95>\xed\xa9\r0\x94\xe8\x97\x9c&6"
b"\xbf'\xb3\xed\x9a9nX\x0f'\\m\xe7\x12",
nonce=b'\xa8-_\xf8\xeft\r\x12\x88\x1fm<"w\xab.')
token=jose.b64decode('a82d5ff8ef740d12881f6d3c2277ab2e'))
self.jmsg = {
'type': 'dvsni',
'r': 'Tyq0La3slT7tqQ0wlOiXnCY2vyez7Zo5blgPJ1xt5xI',
'nonce': 'a82d5ff8ef740d12881f6d3c2277ab2e',
'token': 'a82d5ff8ef740d12881f6d3c2277ab2e',
}
def test_nonce_domain(self):
self.assertEqual(b'a82d5ff8ef740d12881f6d3c2277ab2e.acme.invalid',
self.msg.nonce_domain)
def test_to_partial_json(self):
self.assertEqual(self.jmsg, self.msg.to_partial_json())
@ -166,17 +204,66 @@ class DVSNITest(unittest.TestCase):
from acme.challenges import DVSNI
hash(DVSNI.from_json(self.jmsg))
def test_from_json_invalid_r_length(self):
def test_from_json_invalid_token_length(self):
from acme.challenges import DVSNI
self.jmsg['r'] = 'abcd'
self.jmsg['token'] = jose.encode_b64jose(b'abcd')
self.assertRaises(
jose.DeserializationError, DVSNI.from_json, self.jmsg)
def test_from_json_invalid_nonce_length(self):
def test_gen_response(self):
key = jose.JWKRSA(key=KEY)
from acme.challenges import DVSNI
self.jmsg['nonce'] = 'abcd'
self.assertRaises(
jose.DeserializationError, DVSNI.from_json, self.jmsg)
self.assertEqual(self.msg, DVSNI.json_loads(
self.msg.gen_response(key).validation.payload.decode()))
class DVSNIResponseTest(unittest.TestCase):
# pylint: disable=too-many-instance-attributes
def setUp(self):
self.key = jose.JWKRSA(key=KEY)
from acme.challenges import DVSNI
self.chall = DVSNI(
token=jose.b64decode(b'a82d5ff8ef740d12881f6d3c2277ab2e'))
from acme.challenges import DVSNIResponse
self.validation = jose.JWS.sign(
payload=self.chall.json_dumps(sort_keys=True).encode(),
key=self.key, alg=jose.RS256)
self.msg = DVSNIResponse(validation=self.validation)
self.jmsg_to = {
'resource': 'challenge',
'type': 'dvsni',
'validation': self.validation,
}
self.jmsg_from = {
'resource': 'challenge',
'type': 'dvsni',
'validation': self.validation.to_json(),
}
# pylint: disable=invalid-name
label1 = b'e2df3498860637c667fedadc5a8494ec'
label2 = b'09dcc75553c9b3bd73662b50e71b1e42'
self.z = label1 + label2
self.z_domain = label1 + b'.' + label2 + b'.acme.invalid'
self.domain = 'foo.com'
def test_z_and_domain(self):
self.assertEqual(self.z, self.msg.z)
self.assertEqual(self.z_domain, self.msg.z_domain)
def test_to_partial_json(self):
self.assertEqual(self.jmsg_to, self.msg.to_partial_json())
def test_from_json(self):
from acme.challenges import DVSNIResponse
self.assertEqual(self.msg, DVSNIResponse.from_json(self.jmsg_from))
def test_from_json_hashable(self):
from acme.challenges import DVSNIResponse
hash(DVSNIResponse.from_json(self.jmsg_from))
@mock.patch('acme.challenges.socket.gethostbyname')
@mock.patch('acme.challenges.crypto_util.probe_sni')
@ -186,7 +273,7 @@ class DVSNITest(unittest.TestCase):
mock_gethostbyname.assert_called_once_with('foo.com')
mock_probe_sni.assert_called_once_with(
host='127.0.0.1', port=self.msg.PORT,
name=b'a82d5ff8ef740d12881f6d3c2277ab2e.acme.invalid')
name=self.z_domain)
self.msg.probe_cert('foo.com', host='8.8.8.8')
mock_probe_sni.assert_called_with(
@ -203,88 +290,54 @@ class DVSNITest(unittest.TestCase):
self.msg.probe_cert('foo.com', name=b'xxx')
mock_probe_sni.assert_called_with(
host=mock.ANY, port=mock.ANY,
name=b'a82d5ff8ef740d12881f6d3c2277ab2e.acme.invalid')
name=self.z_domain)
def test_gen_verify_cert(self):
key1 = test_util.load_pyopenssl_private_key('rsa512_key.pem')
cert, key2 = self.msg.gen_cert(key1)
self.assertEqual(key1, key2)
self.assertTrue(self.msg.verify_cert(cert))
class DVSNIResponseTest(unittest.TestCase):
def test_gen_verify_cert_gen_key(self):
cert, key = self.msg.gen_cert()
self.assertTrue(isinstance(key, OpenSSL.crypto.PKey))
self.assertTrue(self.msg.verify_cert(cert))
def setUp(self):
from acme.challenges import DVSNIResponse
# pylint: disable=invalid-name
s = '9dbjsl3gTAtOnEtKFEmhS6Mj-ajNjDcOmRkp3Lfzm3c'
self.msg = DVSNIResponse(s=jose.decode_b64jose(s))
self.jmsg = {
'resource': 'challenge',
'type': 'dvsni',
's': s,
}
def test_verify_bad_cert(self):
self.assertFalse(self.msg.verify_cert(test_util.load_cert('cert.pem')))
from acme.challenges import DVSNI
self.chall = DVSNI(
r=jose.decode_b64jose('Tyq0La3slT7tqQ0wlOiXnCY2vyez7Zo5blgPJ1xt5xI'),
nonce=jose.decode_b64jose('a82d5ff8ef740d12881f6d3c2277ab2e'))
self.z = (b'38e612b0397cc2624a07d351d7ef50e4'
b'6134c0213d9ed52f7d7c611acaeed41b')
self.domain = 'foo.com'
self.key = test_util.load_pyopenssl_private_key('rsa512_key.pem')
self.public_key = test_util.load_rsa_private_key(
'rsa512_key.pem').public_key()
def test_simple_verify_wrong_account_key(self):
self.assertFalse(self.msg.simple_verify(
self.chall, self.domain, jose.JWKRSA.load(
test_util.load_vector('rsa256_key.pem')).public_key()))
def test_z_and_domain(self):
# pylint: disable=invalid-name
self.assertEqual(self.z, self.msg.z(self.chall))
self.assertEqual(
self.z + b'.acme.invalid', self.msg.z_domain(self.chall))
def test_simple_verify_wrong_payload(self):
for payload in b'', b'{}':
msg = self.msg.update(validation=jose.JWS.sign(
payload=payload, key=self.key, alg=jose.RS256))
self.assertFalse(msg.simple_verify(
self.chall, self.domain, self.key.public_key()))
def test_to_partial_json(self):
self.assertEqual(self.jmsg, self.msg.to_partial_json())
def test_simple_verify_wrong_token(self):
msg = self.msg.update(validation=jose.JWS.sign(
payload=self.chall.update(token=b'b'*20).json_dumps().encode(),
key=self.key, alg=jose.RS256))
self.assertFalse(msg.simple_verify(
self.chall, self.domain, self.key.public_key()))
def test_from_json(self):
from acme.challenges import DVSNIResponse
self.assertEqual(self.msg, DVSNIResponse.from_json(self.jmsg))
def test_from_json_hashable(self):
from acme.challenges import DVSNIResponse
hash(DVSNIResponse.from_json(self.jmsg))
@mock.patch('acme.challenges.DVSNIResponse.verify_cert')
@mock.patch('acme.challenges.DVSNIResponse.verify_cert', autospec=True)
def test_simple_verify(self, mock_verify_cert):
chall = mock.Mock()
chall.probe_cert.return_value = mock.sentinel.cert
mock_verify_cert.return_value = 'x'
self.assertEqual('x', self.msg.simple_verify(
chall, mock.sentinel.domain, mock.sentinel.key))
chall.probe_cert.assert_called_once_with(domain=mock.sentinel.domain)
self.msg.verify_cert.assert_called_once_with(
chall, mock.sentinel.domain, mock.sentinel.key,
mock.sentinel.cert)
mock_verify_cert.return_value = mock.sentinel.verification
self.assertEqual(mock.sentinel.verification, self.msg.simple_verify(
self.chall, self.domain, self.key.public_key(),
cert=mock.sentinel.cert))
mock_verify_cert.assert_called_once_with(self.msg, mock.sentinel.cert)
def test_simple_verify_false_on_probe_error(self):
chall = mock.Mock()
chall.probe_cert.side_effect = errors.Error
self.assertFalse(self.msg.simple_verify(
chall=chall, domain=None, public_key=None))
def test_gen_verify_cert_postive_no_key(self):
cert = self.msg.gen_cert(self.chall, self.domain, self.key)
self.assertTrue(self.msg.verify_cert(
self.chall, self.domain, public_key=None, cert=cert))
def test_gen_verify_cert_postive_with_key(self):
cert = self.msg.gen_cert(self.chall, self.domain, self.key)
self.assertTrue(self.msg.verify_cert(
self.chall, self.domain, public_key=self.public_key, cert=cert))
def test_gen_verify_cert_negative_with_wrong_key(self):
cert = self.msg.gen_cert(self.chall, self.domain, self.key)
key = test_util.load_rsa_private_key('rsa256_key.pem').public_key()
self.assertFalse(self.msg.verify_cert(
self.chall, self.domain, public_key=key, cert=cert))
def test_gen_verify_cert_negative(self):
cert = self.msg.gen_cert(self.chall, self.domain + 'x', self.key)
self.assertFalse(self.msg.verify_cert(
self.chall, self.domain, public_key=None, cert=cert))
self.chall, self.domain, self.key.public_key()))
class RecoveryContactTest(unittest.TestCase):
@ -360,58 +413,6 @@ class RecoveryContactResponseTest(unittest.TestCase):
self.assertEqual(self.jmsg, msg.to_partial_json())
class RecoveryTokenTest(unittest.TestCase):
def setUp(self):
from acme.challenges import RecoveryToken
self.msg = RecoveryToken()
self.jmsg = {'type': 'recoveryToken'}
def test_to_partial_json(self):
self.assertEqual(self.jmsg, self.msg.to_partial_json())
def test_from_json(self):
from acme.challenges import RecoveryToken
self.assertEqual(self.msg, RecoveryToken.from_json(self.jmsg))
def test_from_json_hashable(self):
from acme.challenges import RecoveryToken
hash(RecoveryToken.from_json(self.jmsg))
class RecoveryTokenResponseTest(unittest.TestCase):
def setUp(self):
from acme.challenges import RecoveryTokenResponse
self.msg = RecoveryTokenResponse(token='23029d88d9e123e')
self.jmsg = {
'resource': 'challenge',
'type': 'recoveryToken',
'token': '23029d88d9e123e'
}
def test_to_partial_json(self):
self.assertEqual(self.jmsg, self.msg.to_partial_json())
def test_from_json(self):
from acme.challenges import RecoveryTokenResponse
self.assertEqual(
self.msg, RecoveryTokenResponse.from_json(self.jmsg))
def test_from_json_hashable(self):
from acme.challenges import RecoveryTokenResponse
hash(RecoveryTokenResponse.from_json(self.jmsg))
def test_json_without_optionals(self):
del self.jmsg['token']
from acme.challenges import RecoveryTokenResponse
msg = RecoveryTokenResponse.from_json(self.jmsg)
self.assertTrue(msg.token is None)
self.assertEqual(self.jmsg, msg.to_partial_json())
class ProofOfPossessionHintsTest(unittest.TestCase):
def setUp(self):

View file

@ -44,7 +44,7 @@ class ClientTest(unittest.TestCase):
# Registration
self.contact = ('mailto:cert-admin@example.com', 'tel:+12025551212')
reg = messages.Registration(
contact=self.contact, key=KEY.public_key(), recovery_token='t')
contact=self.contact, key=KEY.public_key())
self.new_reg = messages.NewRegistration(**dict(reg))
self.regr = messages.RegistrationResource(
body=reg, uri='https://www.letsencrypt-demo.org/acme/reg/1',

View file

@ -155,13 +155,18 @@ def _pyopenssl_cert_or_req_san(cert_or_req):
for part in parts if part.startswith(prefix)]
def gen_ss_cert(key, domains, not_before=None, validity=(7 * 24 * 60 * 60)):
def gen_ss_cert(key, domains, not_before=None,
validity=(7 * 24 * 60 * 60), force_san=True):
"""Generate new self-signed certificate.
:type domains: `list` of `unicode`
:param OpenSSL.crypto.PKey key:
:param bool force_san:
Uses key and contains all domains.
If more than one domain is provided, all of the domains are put into
``subjectAltName`` X.509 extension and first domain is set as the
subject CN. If only one domain is provided no ``subjectAltName``
extension is used, unless `force_san` is ``True``.
"""
assert domains, "Must provide one or more hostnames for the cert."
@ -178,7 +183,7 @@ def gen_ss_cert(key, domains, not_before=None, validity=(7 * 24 * 60 * 60)):
# TODO: what to put into cert.get_subject()?
cert.set_issuer(cert.get_subject())
if len(domains) > 1:
if force_san or len(domains) > 1:
extensions.append(OpenSSL.crypto.X509Extension(
b"subjectAltName",
critical=False,

View file

@ -1,9 +1,34 @@
"""ACME JSON fields."""
import logging
import pyrfc3339
from acme import jose
logger = logging.getLogger(__name__)
class Fixed(jose.Field):
"""Fixed field."""
def __init__(self, json_name, value):
self.value = value
super(Fixed, self).__init__(
json_name=json_name, default=value, omitempty=False)
def decode(self, value):
if value != self.value:
raise jose.DeserializationError('Expected {0!r}'.format(self.value))
return self.value
def encode(self, value):
if value != self.value:
logger.warn(
'Overriding fixed field (%s) with %r', self.json_name, value)
return value
class RFC3339Field(jose.Field):
"""RFC3339 field encoder/decoder.
@ -31,8 +56,6 @@ class Resource(jose.Field):
def __init__(self, resource_type, *args, **kwargs):
self.resource_type = resource_type
super(Resource, self).__init__(
# TODO: omitempty used only to trick
# JSONObjectWithFieldsMeta._defaults..., server implementation
'resource', default=resource_type, *args, **kwargs)
def decode(self, value):

View file

@ -7,6 +7,26 @@ import pytz
from acme import jose
class FixedTest(unittest.TestCase):
"""Tests for acme.fields.Fixed."""
def setUp(self):
from acme.fields import Fixed
self.field = Fixed('name', 'x')
def test_decode(self):
self.assertEqual('x', self.field.decode('x'))
def test_decode_bad(self):
self.assertRaises(jose.DeserializationError, self.field.decode, 'y')
def test_encode(self):
self.assertEqual('x', self.field.encode('x'))
def test_encode_override(self):
self.assertEqual('y', self.field.encode('y'))
class RFC3339FieldTest(unittest.TestCase):
"""Tests for acme.fields.RFC3339Field."""

View file

@ -8,6 +8,10 @@ class Error(Exception):
class DeserializationError(Error):
"""JSON deserialization error."""
def __str__(self):
return "Deserialization error: {0}".format(
super(DeserializationError, self).__str__())
class SerializationError(Error):
"""JSON serialization error."""

View file

@ -5,6 +5,7 @@ import json
import six
from acme.jose import errors
from acme.jose import util
# pylint: disable=no-self-argument,no-method-argument,no-init,inherit-non-class
@ -172,7 +173,11 @@ class JSONDeSerializable(object):
@classmethod
def json_loads(cls, json_string):
"""Deserialize from JSON document string."""
return cls.from_json(json.loads(json_string))
try:
loads = json.loads(json_string)
except ValueError as error:
raise errors.DeserializationError(error)
return cls.from_json(loads)
def json_dumps(self, **kwargs):
"""Dump to JSON string using proper serializer.

View file

@ -221,6 +221,22 @@ class JSONObjectWithFields(util.ImmutableMap, interfaces.JSONDeSerializable):
super(JSONObjectWithFields, self).__init__(
**(dict(self._defaults(), **kwargs)))
def encode(self, name):
"""Encode a single field.
:param str name: Name of the field to be encoded.
:raises erors.SerializationError: if field cannot be serialized
:raises errors.Error: if field could not be found
"""
try:
field = self._fields[name]
except KeyError:
raise errors.Error("Field not found: {0}".format(name))
return field.encode(getattr(self, name))
def fields_to_partial_json(self):
"""Serialize fields to JSON."""
jobj = {}
@ -310,7 +326,8 @@ def decode_b64jose(data, size=None, minimum=False):
if size is not None and ((not minimum and len(decoded) != size)
or (minimum and len(decoded) < size)):
raise errors.DeserializationError()
raise errors.DeserializationError(
"Expected at least or exactly {0} bytes".format(size))
return decoded
@ -418,7 +435,9 @@ class TypedJSONObjectWithFields(JSONObjectWithFields):
def get_type_cls(cls, jobj):
"""Get the registered class for ``jobj``."""
if cls in six.itervalues(cls.TYPES):
assert jobj[cls.type_field_name]
if cls.type_field_name not in jobj:
raise errors.DeserializationError(
"Missing type field ({0})".format(cls.type_field_name))
# cls is already registered type_cls, force to use it
# so that, e.g Revocation.from_json(jobj) fails if
# jobj["type"] != "revocation".

View file

@ -160,6 +160,18 @@ class JSONObjectWithFieldsTest(unittest.TestCase):
def test_init_defaults(self):
self.assertEqual(self.mock, self.MockJSONObjectWithFields(y=2, z=3))
def test_encode(self):
self.assertEqual(10, self.MockJSONObjectWithFields(
x=5, y=0, z=0).encode("x"))
def test_encode_wrong_field(self):
self.assertRaises(errors.Error, self.mock.encode, 'foo')
def test_encode_serialization_error_passthrough(self):
self.assertRaises(
errors.SerializationError,
self.MockJSONObjectWithFields(y=500, z=None).encode, "y")
def test_fields_to_partial_json_omits_empty(self):
self.assertEqual(self.mock.fields_to_partial_json(), {'y': 2, 'Z': 3})

View file

@ -156,7 +156,6 @@ class Registration(ResourceBody):
:ivar acme.jose.jwk.JWK key: Public key.
:ivar tuple contact: Contact information following ACME spec,
`tuple` of `unicode`.
:ivar unicode recovery_token:
:ivar unicode agreement:
"""
@ -164,7 +163,6 @@ class Registration(ResourceBody):
# JWS.signature.combined.jwk
key = jose.Field('key', omitempty=True, decoder=jose.JWK.from_json)
contact = jose.Field('contact', omitempty=True, default=())
recovery_token = jose.Field('recoveryToken', omitempty=True)
agreement = jose.Field('agreement', omitempty=True)
phone_prefix = 'tel:'

View file

@ -101,18 +101,14 @@ class RegistrationTest(unittest.TestCase):
'mailto:admin@foo.com',
'tel:1234',
)
recovery_token = 'XYZ'
agreement = 'https://letsencrypt.org/terms'
from acme.messages import Registration
self.reg = Registration(
key=key, contact=contact, recovery_token=recovery_token,
agreement=agreement)
self.reg = Registration(key=key, contact=contact, agreement=agreement)
self.reg_none = Registration()
self.jobj_to = {
'contact': contact,
'recoveryToken': recovery_token,
'agreement': agreement,
'key': key,
}
@ -228,11 +224,12 @@ class AuthorizationTest(unittest.TestCase):
self.challbs = (
ChallengeBody(
uri='http://challb1', status=STATUS_VALID,
chall=challenges.SimpleHTTP(token='IlirfxKKXAsHtmzK29Pj8A')),
chall=challenges.SimpleHTTP(token=b'IlirfxKKXAsHtmzK29Pj8A')),
ChallengeBody(uri='http://challb2', status=STATUS_VALID,
chall=challenges.DNS(token='DGyRejmCefe7v4NfDGDKfA')),
chall=challenges.DNS(
token=b'DGyRejmCefe7v4NfDGDKfA')),
ChallengeBody(uri='http://challb3', status=STATUS_VALID,
chall=challenges.RecoveryToken()),
chall=challenges.RecoveryContact()),
)
combinations = ((0, 2), (1, 2))

View file

@ -52,7 +52,7 @@ The following tools are there to help you:
before submitting a new pull request.
- ``tox -e cover`` checks the test coverage only. Calling the
``./tox-cover.sh`` script directly might be a bit quicker, though.
``./tox.cover.sh`` script directly might be a bit quicker, though.
- ``tox -e lint`` checks the style of the whole project, while
``pylint --rcfile=.pylintrc path`` will check a single file or
@ -78,7 +78,7 @@ patient - it will take some time... Once its ready, you will see
./tests/boulder-integration.sh && echo OK || echo FAIL
If you would like to test `lesencrypt_nginx` plugin (highly
If you would like to test `letsencrypt_nginx` plugin (highly
encouraged) make sure to install prerequisites as listed in
``tests/integration/nginx.sh``:
@ -121,6 +121,28 @@ Support for other Linux distributions coming soon.
.. _related issue: https://github.com/ClusterHQ/flocker/issues/516
Docker
------
OSX users will probably find it easiest to set up a Docker container for
development. Let's Encrypt comes with a Dockerfile (``Dockerfile-dev``)
for doing so. To use Docker on OSX, install boot2docker using the
instructions at https://docs.docker.com/installation/mac/ and start it
from the command line (``boot2docker init``).
To build the development Docker image::
docker build -t letsencrypt -f Dockerfile-dev .
Now run tests inside the Docker image:
.. code-block:: shell
docker run -it letsencrypt bash
cd src
tox -e py27
Code components and layout
==========================

View file

@ -163,7 +163,8 @@ class ApacheDvsni(common.Dvsni):
# parses it as "\n"... c.f.:
# https://docs.python.org/2.7/reference/lexical_analysis.html
return self.VHOST_TEMPLATE.format(
vhost=ips, server_name=achall.nonce_domain,
vhost=ips,
server_name=achall.gen_response(achall.account_key).z_domain,
ssl_options_conf_path=self.configurator.mod_ssl_conf,
cert_path=self.get_cert_path(achall),
key_path=self.get_key_path(achall),

View file

@ -11,7 +11,6 @@ from acme import challenges
from letsencrypt import achallenges
from letsencrypt import errors
from letsencrypt import le_util
from letsencrypt.tests import acme_util
@ -374,11 +373,11 @@ class TwoVhost80Test(util.ApacheTest):
def test_perform(self, mock_restart, mock_dvsni_perform):
# Only tests functionality specific to configurator.perform
# Note: As more challenges are offered this will have to be expanded
_, achall1, achall2 = self.get_achalls()
account_key, achall1, achall2 = self.get_achalls()
dvsni_ret_val = [
challenges.DVSNIResponse(s="randomS1"),
challenges.DVSNIResponse(s="randomS2"),
achall1.gen_response(account_key),
achall2.gen_response(account_key),
]
mock_dvsni_perform.return_value = dvsni_ret_val
@ -585,23 +584,21 @@ class TwoVhost80Test(util.ApacheTest):
def get_achalls(self):
"""Return testing achallenges."""
auth_key = le_util.Key(self.rsa256_file, self.rsa256_pem)
account_key = self.rsa512jwk
achall1 = achallenges.DVSNI(
challb=acme_util.chall_to_challb(
challenges.DVSNI(
r="jIq_Xy1mXGN37tb4L6Xj_es58fW571ZNyXekdZzhh7Q",
nonce="37bc5eb75d3e00a19b4f6355845e5a18"),
token="jIq_Xy1mXGN37tb4L6Xj_es58fW571ZNyXekdZzhh7Q"),
"pending"),
domain="encryption-example.demo", key=auth_key)
domain="encryption-example.demo", account_key=account_key)
achall2 = achallenges.DVSNI(
challb=acme_util.chall_to_challb(
challenges.DVSNI(
r="uqnaPzxtrndteOqtrXb0Asl5gOJfWAnnx6QJyvcmlDU",
nonce="59ed014cac95f77057b1d7a1b2c596ba"),
token="uqnaPzxtrndteOqtrXb0Asl5gOJfWAnnx6QJyvcmlDU"),
"pending"),
domain="letsencrypt.demo", key=auth_key)
domain="letsencrypt.demo", account_key=account_key)
return auth_key, achall1, achall2
return account_key, achall1, achall2
def test_make_addrs_sni_ready(self):
self.config.version = (2, 2)

View file

@ -4,8 +4,6 @@ import shutil
import mock
from acme import challenges
from letsencrypt.plugins import common_test
from letsencrypt_apache import obj
@ -15,6 +13,7 @@ from letsencrypt_apache.tests import util
class DvsniPerformTest(util.ApacheTest):
"""Test the ApacheDVSNI challenge."""
auth_key = common_test.DvsniTest.auth_key
achalls = common_test.DvsniTest.achalls
def setUp(self): # pylint: disable=arguments-differ
@ -44,8 +43,8 @@ class DvsniPerformTest(util.ApacheTest):
achall = self.achalls[0]
self.sni.add_chall(achall)
mock_setup_cert = mock.MagicMock(
return_value=challenges.DVSNIResponse(s="randomS1"))
response = self.achalls[0].gen_response(self.auth_key)
mock_setup_cert = mock.MagicMock(return_value=response)
# pylint: disable=protected-access
self.sni._setup_challenge_cert = mock_setup_cert
@ -58,22 +57,22 @@ class DvsniPerformTest(util.ApacheTest):
len(self.sni.configurator.parser.find_dir(
"Include", self.sni.challenge_conf)), 1)
self.assertEqual(len(responses), 1)
self.assertEqual(responses[0].s, "randomS1")
self.assertEqual(responses[0], response)
def test_perform2(self):
# Avoid load module
self.sni.configurator.parser.modules.add("ssl_module")
acme_responses = []
for achall in self.achalls:
self.sni.add_chall(achall)
acme_responses.append(achall.gen_response(self.auth_key))
mock_setup_cert = mock.MagicMock(side_effect=[
challenges.DVSNIResponse(s="randomS0"),
challenges.DVSNIResponse(s="randomS1")])
mock_setup_cert = mock.MagicMock(side_effect=acme_responses)
# pylint: disable=protected-access
self.sni._setup_challenge_cert = mock_setup_cert
responses = self.sni.perform()
sni_responses = self.sni.perform()
self.assertEqual(mock_setup_cert.call_count, 2)
@ -87,13 +86,16 @@ class DvsniPerformTest(util.ApacheTest):
len(self.sni.configurator.parser.find_dir(
"Include", self.sni.challenge_conf)),
1)
self.assertEqual(len(responses), 2)
self.assertEqual(len(sni_responses), 2)
for i in xrange(2):
self.assertEqual(responses[i].s, "randomS%d" % i)
self.assertEqual(sni_responses[i], acme_responses[i])
def test_mod_config(self):
z_domains = []
for achall in self.achalls:
self.sni.add_chall(achall)
z_domain = achall.gen_response(self.auth_key).z_domain
z_domains.append(set([z_domain]))
self.sni._mod_config() # pylint: disable=protected-access
self.sni.configurator.save()
@ -111,9 +113,7 @@ class DvsniPerformTest(util.ApacheTest):
for vhost in vhs:
self.assertEqual(vhost.addrs, set([obj.Addr.fromstring("*:443")]))
names = vhost.get_names()
self.assertTrue(
names == set([self.achalls[0].nonce_domain]) or
names == set([self.achalls[1].nonce_domain]))
self.assertTrue(names in z_domains)
def test_get_dvsni_addrs_default(self):
self.sni.configurator.choose_vhost = mock.Mock(

View file

@ -1,6 +1,5 @@
"""Common utilities for letsencrypt_apache."""
import os
import pkg_resources
import sys
import unittest
@ -8,10 +7,14 @@ import augeas
import mock
import zope.component
from acme import jose
from letsencrypt.display import util as display_util
from letsencrypt.plugins import common
from letsencrypt.tests import test_util
from letsencrypt_apache import configurator
from letsencrypt_apache import constants
from letsencrypt_apache import obj
@ -34,10 +37,8 @@ class ApacheTest(unittest.TestCase): # pylint: disable=too-few-public-methods
self.config_path = os.path.join(self.temp_dir, config_root)
self.rsa256_file = pkg_resources.resource_filename(
"letsencrypt.tests", os.path.join("testdata", "rsa256_key.pem"))
self.rsa256_pem = pkg_resources.resource_string(
"letsencrypt.tests", os.path.join("testdata", "rsa256_key.pem"))
self.rsa512jwk = jose.JWKRSA.load(test_util.load_vector(
"rsa512_key.pem"))
class ParserTest(ApacheTest): # pytlint: disable=too-few-public-methods

View file

@ -63,7 +63,7 @@ def test_authenticator(plugin, config, temp_dir):
elif isinstance(responses[i], challenges.DVSNIResponse):
verify = functools.partial(responses[i].simple_verify, achalls[i],
achalls[i].domain,
util.JWK.key.public_key(),
util.JWK.public_key(),
host="127.0.0.1",
port=plugin.https_port)
if _try_until_true(verify):
@ -101,12 +101,11 @@ def _create_achalls(plugin):
for chall_type in prefs:
if chall_type == challenges.DVSNI:
chall = challenges.DVSNI(
r=os.urandom(challenges.DVSNI.R_SIZE),
nonce=os.urandom(challenges.DVSNI.NONCE_SIZE))
token=os.urandom(challenges.DVSNI.TOKEN_SIZE))
challb = acme_util.chall_to_challb(
chall, messages.STATUS_PENDING)
achall = achallenges.DVSNI(
challb=challb, domain=domain, key=util.JWK)
challb=challb, domain=domain, account_key=util.JWK)
achalls.append(achall)
return achalls
@ -249,6 +248,7 @@ def _create_backup(config, temp_dir):
shutil.rmtree(backup, ignore_errors=True)
shutil.copytree(config, backup, symlinks=True)
print backup
return backup

View file

@ -132,7 +132,8 @@ class NginxDvsni(common.Dvsni):
block = [['listen', str(addr)] for addr in addrs]
block.extend([['server_name', achall.nonce_domain],
block.extend([['server_name',
achall.gen_response(achall.account_key).z_domain],
['include', self.configurator.parser.loc["ssl_options"]],
# access and error logs necessary for
# integration testing (non-root)

View file

@ -387,9 +387,6 @@ def _wildcard_match(target_name, name, start):
parts.reverse()
match_parts.reverse()
if len(match_parts) == 0:
return False
# The first part must be a wildcard or blank, e.g. '.eff.org'
first = match_parts.pop(0)
if first != '*' and first != '':

View file

@ -11,7 +11,6 @@ from acme import messages
from letsencrypt import achallenges
from letsencrypt import errors
from letsencrypt import le_util
from letsencrypt_nginx.tests import util
@ -174,27 +173,22 @@ class NginxConfiguratorTest(util.NginxTest):
def test_perform(self, mock_restart, mock_dvsni_perform):
# Only tests functionality specific to configurator.perform
# Note: As more challenges are offered this will have to be expanded
auth_key = le_util.Key(self.rsa256_file, self.rsa256_pem)
achall1 = achallenges.DVSNI(
challb=messages.ChallengeBody(
chall=challenges.DVSNI(
r="foo",
nonce="bar"),
chall=challenges.DVSNI(token="kNdwjwOeX0I_A8DXt9Msmg"),
uri="https://ca.org/chall0_uri",
status=messages.Status("pending"),
), domain="localhost", key=auth_key)
), domain="localhost", account_key=self.rsa512jwk)
achall2 = achallenges.DVSNI(
challb=messages.ChallengeBody(
chall=challenges.DVSNI(
r="abc",
nonce="def"),
chall=challenges.DVSNI(token="m8TdO1qik4JVFtgPPurJmg"),
uri="https://ca.org/chall1_uri",
status=messages.Status("pending"),
), domain="example.com", key=auth_key)
), domain="example.com", account_key=self.rsa512jwk)
dvsni_ret_val = [
challenges.DVSNIResponse(s="irrelevant"),
challenges.DVSNIResponse(s="arbitrary"),
achall1.gen_response(self.rsa512jwk),
achall2.gen_response(self.rsa512jwk),
]
mock_dvsni_perform.return_value = dvsni_ret_val

View file

@ -19,31 +19,26 @@ from letsencrypt_nginx.tests import util
class DvsniPerformTest(util.NginxTest):
"""Test the NginxDVSNI challenge."""
account_key = common_test.DvsniTest.auth_key
achalls = [
achallenges.DVSNI(
challb=acme_util.chall_to_challb(
challenges.DVSNI(
r="foo",
nonce="bar"
), "pending"),
domain="www.example.com", key=common_test.DvsniTest.auth_key),
challenges.DVSNI(token="kNdwjwOeX0I_A8DXt9Msmg"), "pending"),
domain="www.example.com", account_key=account_key),
achallenges.DVSNI(
challb=acme_util.chall_to_challb(
challenges.DVSNI(
r="\xba\xa9\xda?<m\xaewmx\xea\xad\xadv\xf4\x02\xc9y\x80"
"\xe2_X\t\xe7\xc7\xa4\t\xca\xf7&\x945",
nonce="Y\xed\x01L\xac\x95\xf7pW\xb1\xd7"
"\xa1\xb2\xc5\x96\xba"
token="\xba\xa9\xda?<m\xaewmx\xea\xad\xadv\xf4\x02\xc9y"
"\x80\xe2_X\t\xe7\xc7\xa4\t\xca\xf7&\x945"
), "pending"),
domain="blah", key=common_test.DvsniTest.auth_key),
domain="blah", account_key=account_key),
achallenges.DVSNI(
challb=acme_util.chall_to_challb(
challenges.DVSNI(
r="\x8c\x8a\xbf_-f\\cw\xee\xd6\xf8/\xa5\xe3\xfd\xeb9"
"\xf1\xf5\xb9\xefVM\xc9w\xa4u\x9c\xe1\x87\xb4",
nonce="7\xbc^\xb7]>\x00\xa1\x9bOcU\x84^Z\x18"
token="\x8c\x8a\xbf_-f\\cw\xee\xd6\xf8/\xa5\xe3\xfd"
"\xeb9\xf1\xf5\xb9\xefVM\xc9w\xa4u\x9c\xe1\x87\xb4"
), "pending"),
domain="www.example.org", key=common_test.DvsniTest.auth_key)
domain="www.example.org", account_key=account_key),
]
@ -76,8 +71,8 @@ class DvsniPerformTest(util.NginxTest):
@mock.patch("letsencrypt_nginx.configurator.NginxConfigurator.save")
def test_perform1(self, mock_save):
self.sni.add_chall(self.achalls[0])
mock_setup_cert = mock.MagicMock(
return_value=challenges.DVSNIResponse(s="nginxS1"))
response = self.achalls[0].gen_response(self.account_key)
mock_setup_cert = mock.MagicMock(return_value=response)
# pylint: disable=protected-access
self.sni._setup_challenge_cert = mock_setup_cert
@ -85,7 +80,7 @@ class DvsniPerformTest(util.NginxTest):
responses = self.sni.perform()
mock_setup_cert.assert_called_once_with(self.achalls[0])
self.assertEqual([challenges.DVSNIResponse(s="nginxS1")], responses)
self.assertEqual([response], responses)
self.assertEqual(mock_save.call_count, 2)
# Make sure challenge config is included in main config
@ -94,17 +89,16 @@ class DvsniPerformTest(util.NginxTest):
self.assertTrue(['include', self.sni.challenge_conf] in http[1])
def test_perform2(self):
acme_responses = []
for achall in self.achalls:
self.sni.add_chall(achall)
acme_responses.append(achall.gen_response(self.account_key))
mock_setup_cert = mock.MagicMock(side_effect=[
challenges.DVSNIResponse(s="nginxS0"),
challenges.DVSNIResponse(s="nginxS1"),
challenges.DVSNIResponse(s="nginxS2")])
mock_setup_cert = mock.MagicMock(side_effect=acme_responses)
# pylint: disable=protected-access
self.sni._setup_challenge_cert = mock_setup_cert
responses = self.sni.perform()
sni_responses = self.sni.perform()
self.assertEqual(mock_setup_cert.call_count, 3)
@ -117,9 +111,9 @@ class DvsniPerformTest(util.NginxTest):
self.assertTrue(['include', self.sni.challenge_conf] in http[1])
self.assertTrue(['server_name', 'blah'] in http[1][-2][1])
self.assertEqual(len(responses), 3)
self.assertEqual(len(sni_responses), 3)
for i in xrange(3):
self.assertEqual(responses[i].s, "nginxS%d" % i)
self.assertEqual(sni_responses[i], acme_responses[i])
def test_mod_config(self):
self.sni.add_chall(self.achalls[0])
@ -144,12 +138,11 @@ class DvsniPerformTest(util.NginxTest):
for vhost in vhs:
if vhost.addrs == set(v_addr1):
self.assertEqual(
vhost.names, set([self.achalls[0].nonce_domain]))
response = self.achalls[0].gen_response(self.account_key)
else:
response = self.achalls[2].gen_response(self.account_key)
self.assertEqual(vhost.addrs, set(v_addr2))
self.assertEqual(
vhost.names, set([self.achalls[2].nonce_domain]))
self.assertEqual(vhost.names, set([response.z_domain]))
self.assertEqual(len(vhs), 2)

View file

@ -5,6 +5,10 @@ import unittest
import mock
from acme import jose
from letsencrypt.tests import test_util
from letsencrypt.plugins import common
from letsencrypt_nginx import constants
@ -25,10 +29,8 @@ class NginxTest(unittest.TestCase): # pylint: disable=too-few-public-methods
self.config_path = os.path.join(self.temp_dir, "etc_nginx")
self.rsa256_file = pkg_resources.resource_filename(
"letsencrypt.tests", os.path.join("testdata", "rsa256_key.pem"))
self.rsa256_pem = pkg_resources.resource_string(
"letsencrypt.tests", os.path.join("testdata", "rsa256_key.pem"))
self.rsa512jwk = jose.JWKRSA.load(test_util.load_vector(
"rsa512_key.pem"))
def get_data_filename(filename):

View file

@ -20,6 +20,7 @@ events {
}
http {
server_names_hash_bucket_size 2048;
# Set an array of temp and cache file options that will otherwise default to
# restricted locations accessible only to root.
client_body_temp_path $root/client_body;

View file

@ -94,15 +94,11 @@ def report_new_account(acc, config):
config.config_dir),
reporter.MEDIUM_PRIORITY, True)
assert acc.regr.body.recovery_token is not None
recovery_msg = ("If you lose your account credentials, you can recover "
"them using the token \"{0}\". You must write that down "
"and put it in a safe place.".format(
acc.regr.body.recovery_token))
if acc.regr.body.emails:
recovery_msg += (" Another recovery method will be e-mails sent to "
"{0}.".format(", ".join(acc.regr.body.emails)))
reporter.add_message(recovery_msg, reporter.HIGH_PRIORITY, True)
recovery_msg = ("If you lose your account credentials, you can "
"recover through e-mails sent to {0}.".format(
", ".join(acc.regr.body.emails)))
reporter.add_message(recovery_msg, reporter.HIGH_PRIORITY, True)
class AccountMemoryStorage(interfaces.AccountStorage):

View file

@ -17,18 +17,21 @@ Note, that all annotated challenges act as a proxy objects::
achall.token == challb.token
"""
import logging
import OpenSSL
from acme import challenges
from acme.jose import util as jose_util
from acme import jose
from letsencrypt import crypto_util
logger = logging.getLogger(__name__)
# pylint: disable=too-few-public-methods
class AnnotatedChallenge(jose_util.ImmutableMap):
class AnnotatedChallenge(jose.ImmutableMap):
"""Client annotated challenge.
Wraps around server provided challenge and annotates with data
@ -45,33 +48,68 @@ class AnnotatedChallenge(jose_util.ImmutableMap):
class DVSNI(AnnotatedChallenge):
"""Client annotated "dvsni" ACME challenge."""
__slots__ = ('challb', 'domain', 'key')
"""Client annotated "dvsni" ACME challenge.
:ivar .JWK account_key: Authorized Account Key
"""
__slots__ = ('challb', 'domain', 'account_key')
acme_type = challenges.DVSNI
def gen_cert_and_response(self, s=None): # pylint: disable=invalid-name
def gen_cert_and_response(self, key_pem=None, bits=2048, alg=jose.RS256):
"""Generate a DVSNI cert and response.
:returns: ``(cert_pem, response)`` tuple, where ``cert_pem`` is the PEM
encoded certificate and ``response`` is an instance
:class:`acme.challenges.DVSNIResponse`.
:param bytes key_pem: Private PEM-encoded key used for
certificate generation. If none provided, a fresh key will
be generated.
:param int bits: Number of bits for fresh key generation.
:param .JWAAlgorithm alg:
:returns: ``(response, cert_pem, key_pem)`` tuple, where
``response`` is an instance of
`acme.challenges.DVSNIResponse`, ``cert_pem`` is the
PEM-encoded certificate and ``key_pem`` is PEM-encoded
private key.
:rtype: tuple
"""
key = crypto_util.private_jwk_to_pyopenssl(self.key)
response = challenges.DVSNIResponse(s=s)
cert = response.gen_cert(self.challb.chall, self.domain, key)
key = None if key_pem is None else OpenSSL.crypto.load_privatekey(
OpenSSL.crypto.FILETYPE_PEM, key_pem)
response = self.challb.chall.gen_response(self.account_key, alg=alg)
cert, key = response.gen_cert(key=key, bits=bits)
cert_pem = OpenSSL.crypto.dump_certificate(
OpenSSL.crypto.FILETYPE_PEM, cert)
key_pem = OpenSSL.crypto.dump_privatekey(
OpenSSL.crypto.FILETYPE_PEM, key)
return cert_pem, response
return response, cert_pem, key_pem
class SimpleHTTP(AnnotatedChallenge):
"""Client annotated "simpleHttp" ACME challenge."""
__slots__ = ('challb', 'domain', 'key')
__slots__ = ('challb', 'domain', 'account_key')
acme_type = challenges.SimpleHTTP
def gen_response_and_validation(self, tls):
"""Generates a SimpleHTTP response and validation.
:param bool tls: True if TLS should be used
:returns: ``(response, validation)`` tuple, where ``response`` is
an instance of `acme.challenges.SimpleHTTPResponse` and
``validation`` is an instance of
`acme.challenges.SimpleHTTPProvisionedResource`.
:rtype: tuple
"""
response = challenges.SimpleHTTPResponse(tls=tls)
validation = response.gen_validation(
self.challb.chall, self.account_key)
logger.debug("Simple HTTP validation payload: %s", validation.payload)
return response, validation
class DNS(AnnotatedChallenge):
"""Client annotated "dns" ACME challenge."""
@ -85,12 +123,6 @@ class RecoveryContact(AnnotatedChallenge):
acme_type = challenges.RecoveryContact
class RecoveryToken(AnnotatedChallenge):
"""Client annotated "recoveryToken" ACME challenge."""
__slots__ = ('challb', 'domain')
acme_type = challenges.RecoveryToken
class ProofOfPossession(AnnotatedChallenge):
"""Client annotated "proofOfPossession" ACME challenge."""
__slots__ = ('challb', 'domain')

View file

@ -332,15 +332,11 @@ class AuthHandler(object):
return cont_chall, dv_chall
def challb_to_achall(challb, key, domain):
def challb_to_achall(challb, account_key, domain):
"""Converts a ChallengeBody object to an AnnotatedChallenge.
:param challb: ChallengeBody
:type challb: :class:`acme.messages.ChallengeBody`
:param key: Key
:type key: :class:`letsencrypt.le_util.Key`
:param .ChallengeBody challb: ChallengeBody
:param .JWK account_key: Authorized Account Key
:param str domain: Domain of the challb
:returns: Appropriate AnnotatedChallenge
@ -352,14 +348,12 @@ def challb_to_achall(challb, key, domain):
if isinstance(chall, challenges.DVSNI):
return achallenges.DVSNI(
challb=challb, domain=domain, key=key)
challb=challb, domain=domain, account_key=account_key)
elif isinstance(chall, challenges.SimpleHTTP):
return achallenges.SimpleHTTP(
challb=challb, domain=domain, key=key)
challb=challb, domain=domain, account_key=account_key)
elif isinstance(chall, challenges.DNS):
return achallenges.DNS(challb=challb, domain=domain)
elif isinstance(chall, challenges.RecoveryToken):
return achallenges.RecoveryToken(challb=challb, domain=domain)
elif isinstance(chall, challenges.RecoveryContact):
return achallenges.RecoveryContact(
challb=challb, domain=domain)

View file

@ -22,7 +22,6 @@ class NamespaceConfig(object):
- `cert_key_backup`
- `in_progress_dir`
- `key_dir`
- `rec_token_dir`
- `renewer_config_file`
- `temp_checkpoint_dir`
@ -71,11 +70,6 @@ class NamespaceConfig(object):
def key_dir(self): # pylint: disable=missing-docstring
return os.path.join(self.namespace.config_dir, constants.KEY_DIR)
# TODO: This should probably include the server name
@property
def rec_token_dir(self): # pylint: disable=missing-docstring
return os.path.join(self.namespace.work_dir, constants.REC_TOKEN_DIR)
@property
def temp_checkpoint_dir(self): # pylint: disable=missing-docstring
return os.path.join(

View file

@ -88,10 +88,6 @@ LIVE_DIR = "live"
TEMP_CHECKPOINT_DIR = "temp_checkpoint"
"""Temporary checkpoint directory (relative to `IConfig.work_dir`)."""
REC_TOKEN_DIR = "recovery_tokens"
"""Directory where all recovery tokens are saved (relative to
`IConfig.work_dir`)."""
RENEWAL_CONFIGS_DIR = "configs"
"""Renewal configs directory, relative to `IConfig.config_dir`."""

View file

@ -7,16 +7,12 @@ from letsencrypt import achallenges
from letsencrypt import errors
from letsencrypt import interfaces
from letsencrypt import proof_of_possession
from letsencrypt import recovery_token
class ContinuityAuthenticator(object):
"""IAuthenticator for
:const:`~acme.challenges.ContinuityChallenge` class challenges.
:ivar rec_token: Performs "recoveryToken" challenges.
:type rec_token: :class:`letsencrypt.recovery_token.RecoveryToken`
:ivar proof_of_pos: Performs "proofOfPossession" challenges.
:type proof_of_pos:
:class:`letsencrypt.proof_of_possession.Proof_of_Possession`
@ -25,7 +21,7 @@ class ContinuityAuthenticator(object):
zope.interface.implements(interfaces.IAuthenticator)
# This will have an installer soon for get_key/cert purposes
def __init__(self, config, installer):
def __init__(self, config, installer): # pylint: disable=unused-argument
"""Initialize Client Authenticator.
:param config: Configuration.
@ -35,13 +31,11 @@ class ContinuityAuthenticator(object):
:type installer: :class:`letsencrypt.interfaces.IInstaller`
"""
self.rec_token = recovery_token.RecoveryToken(
config.server, config.rec_token_dir)
self.proof_of_pos = proof_of_possession.ProofOfPossession(installer)
def get_chall_pref(self, unused_domain): # pylint: disable=no-self-use
"""Return list of challenge preferences."""
return [challenges.ProofOfPossession, challenges.RecoveryToken]
return [challenges.ProofOfPossession]
def perform(self, achalls):
"""Perform client specific challenges for IAuthenticator"""
@ -49,16 +43,12 @@ class ContinuityAuthenticator(object):
for achall in achalls:
if isinstance(achall, achallenges.ProofOfPossession):
responses.append(self.proof_of_pos.perform(achall))
elif isinstance(achall, achallenges.RecoveryToken):
responses.append(self.rec_token.perform(achall))
else:
raise errors.ContAuthError("Unexpected Challenge")
return responses
def cleanup(self, achalls):
def cleanup(self, achalls): # pylint: disable=no-self-use
"""Cleanup call for IAuthenticator."""
for achall in achalls:
if isinstance(achall, achallenges.RecoveryToken):
self.rec_token.cleanup(achall)
elif not isinstance(achall, achallenges.ProofOfPossession):
if not isinstance(achall, achallenges.ProofOfPossession):
raise errors.ContAuthError("Unexpected Challenge")

View file

@ -8,7 +8,6 @@ import datetime
import logging
import os
from cryptography.hazmat.primitives import serialization
import OpenSSL
from acme import crypto_util as acme_crypto_util
@ -215,15 +214,6 @@ def pyopenssl_load_certificate(data):
return _pyopenssl_load(data, OpenSSL.crypto.load_certificate)
def private_jwk_to_pyopenssl(jwk):
"""Convert private JWK to pyOpenSSL key."""
key_pem = jwk.key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption())
return OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM, key_pem)
def _get_sans_from_cert_or_req(
cert_or_req_str, load_func, typ=OpenSSL.crypto.FILETYPE_PEM):
try:
@ -238,7 +228,7 @@ def _get_sans_from_cert_or_req(
def get_sans_from_cert(cert, typ=OpenSSL.crypto.FILETYPE_PEM):
"""Get a list of Subject Alternative Names from a certificate.
:param str csr: Certificate (encoded).
:param str cert: Certificate (encoded).
:param typ: `OpenSSL.crypto.FILETYPE_PEM` or `OpenSSL.crypto.FILETYPE_ASN1`
:returns: A list of Subject Alternative Names.

View file

@ -5,10 +5,6 @@ class Error(Exception):
"""Generic Let's Encrypt client error."""
class SubprocessError(Error):
"""Subprocess handling error."""
class AccountStorageError(Error):
"""Generic `.AccountStorage` error."""
@ -21,6 +17,14 @@ class ReverterError(Error):
"""Let's Encrypt Reverter error."""
class SubprocessError(Error):
"""Subprocess handling error."""
class CertStorageError(Error):
"""Generic `.CertStorage` error."""
# Auth Handler Errors
class AuthorizationError(Error):
"""Authorization error."""

View file

@ -215,8 +215,6 @@ class IConfig(zope.interface.Interface):
in_progress_dir = zope.interface.Attribute(
"Directory used before a permanent checkpoint is finalized.")
key_dir = zope.interface.Attribute("Keys storage.")
rec_token_dir = zope.interface.Attribute(
"Directory where all recovery tokens are saved.")
temp_checkpoint_dir = zope.interface.Attribute(
"Temporary checkpoint directory.")

View file

@ -5,7 +5,6 @@ import re
import shutil
import tempfile
from cryptography.hazmat.primitives import serialization
import zope.interface
from acme.jose import util as jose_util
@ -163,13 +162,13 @@ class Dvsni(object):
:rtype: str
"""
return os.path.join(
self.configurator.config.work_dir, achall.nonce_domain + ".crt")
return os.path.join(self.configurator.config.work_dir,
achall.chall.encode("token") + ".crt")
def get_key_path(self, achall):
"""Get standardized path to challenge key."""
return os.path.join(
self.configurator.config.work_dir, achall.nonce_domain + '.pem')
return os.path.join(self.configurator.config.work_dir,
achall.chall.encode("token") + '.pem')
def _setup_challenge_cert(self, achall, s=None):
# pylint: disable=invalid-name
@ -180,17 +179,11 @@ class Dvsni(object):
self.configurator.reverter.register_file_creation(True, key_path)
self.configurator.reverter.register_file_creation(True, cert_path)
cert_pem, response = achall.gen_cert_and_response(s)
response, cert_pem, key_pem = achall.gen_cert_and_response(s)
# Write out challenge cert
# Write out challenge cert and key
with open(cert_path, "wb") as cert_chall_fd:
cert_chall_fd.write(cert_pem)
# Write out challenge key
key_pem = achall.key.key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption())
with le_util.safe_open(key_path, 'wb', chmod=0o400) as key_file:
key_file.write(key_pem)

View file

@ -1,16 +1,15 @@
"""Tests for letsencrypt.plugins.common."""
import os
import pkg_resources
import unittest
import mock
from acme import challenges
from acme import jose
from letsencrypt import achallenges
from letsencrypt import le_util
from letsencrypt.tests import acme_util
from letsencrypt.tests import test_util
class NamespaceFunctionsTest(unittest.TestCase):
@ -111,30 +110,16 @@ class AddrTest(unittest.TestCase):
class DvsniTest(unittest.TestCase):
"""Tests for letsencrypt.plugins.common.DvsniTest."""
rsa256_file = pkg_resources.resource_filename(
"letsencrypt.tests", os.path.join("testdata", "rsa256_key.pem"))
rsa256_pem = pkg_resources.resource_string(
"letsencrypt.tests", os.path.join("testdata", "rsa256_key.pem"))
auth_key = le_util.Key(rsa256_file, rsa256_pem)
auth_key = jose.JWKRSA.load(test_util.load_vector("rsa512_key.pem"))
achalls = [
achallenges.DVSNI(
challb=acme_util.chall_to_challb(
challenges.DVSNI(
r="\x8c\x8a\xbf_-f\\cw\xee\xd6\xf8/\xa5\xe3\xfd\xeb9"
"\xf1\xf5\xb9\xefVM\xc9w\xa4u\x9c\xe1\x87\xb4",
nonce="7\xbc^\xb7]>\x00\xa1\x9bOcU\x84^Z\x18",
), "pending"),
domain="encryption-example.demo", key=auth_key),
challenges.DVSNI(token=b'dvsni1'), "pending"),
domain="encryption-example.demo", account_key=auth_key),
achallenges.DVSNI(
challb=acme_util.chall_to_challb(
challenges.DVSNI(
r="\xba\xa9\xda?<m\xaewmx\xea\xad\xadv\xf4\x02\xc9y\x80"
"\xe2_X\t\xe7\xc7\xa4\t\xca\xf7&\x945",
nonce="Y\xed\x01L\xac\x95\xf7pW\xb1\xd7\xa1\xb2\xc5"
"\x96\xba",
), "pending"),
domain="letsencrypt.demo", key=auth_key),
challenges.DVSNI(token=b'dvsni2'), "pending"),
domain="letsencrypt.demo", account_key=auth_key),
]
def setUp(self):
@ -153,10 +138,9 @@ class DvsniTest(unittest.TestCase):
# http://www.voidspace.org.uk/python/mock/helpers.html#mock.mock_open
mock_open, mock_safe_open = mock.mock_open(), mock.mock_open()
response = challenges.DVSNIResponse(s="randomS1")
achall = mock.MagicMock(nonce=self.achalls[0].nonce,
nonce_domain=self.achalls[0].nonce_domain)
achall.gen_cert_and_response.return_value = ("pem", response)
response = challenges.DVSNIResponse(validation=mock.Mock())
achall = mock.MagicMock()
achall.gen_cert_and_response.return_value = (response, "cert", "key")
with mock.patch("letsencrypt.plugins.common.open",
mock_open, create=True):
@ -168,11 +152,10 @@ class DvsniTest(unittest.TestCase):
# pylint: disable=no-member
mock_open.assert_called_once_with(self.sni.get_cert_path(achall), "wb")
mock_open.return_value.write.assert_called_once_with("pem")
mock_open.return_value.write.assert_called_once_with("cert")
mock_safe_open.assert_called_once_with(
self.sni.get_key_path(achall), "wb", chmod=0o400)
mock_safe_open.return_value.write.assert_called_once_with(
achall.key.key.private_bytes())
mock_safe_open.return_value.write.assert_called_once_with("key")
if __name__ == "__main__":

View file

@ -1,6 +1,7 @@
"""Manual plugin."""
import os
import logging
import pipes
import shutil
import signal
import subprocess
@ -12,7 +13,6 @@ import zope.component
import zope.interface
from acme import challenges
from acme import jose
from letsencrypt import errors
from letsencrypt import interfaces
@ -55,7 +55,7 @@ command on the target server (as root):
HTTP_TEMPLATE = """\
mkdir -p {root}/public_html/{response.URI_ROOT_PATH}
cd {root}/public_html
echo -n {achall.token} > {response.URI_ROOT_PATH}/{response.path}
echo -n {validation} > {response.URI_ROOT_PATH}/{encoded_token}
# run only once per server:
python -c "import BaseHTTPServer, SimpleHTTPServer; \\
SimpleHTTPServer.SimpleHTTPRequestHandler.extensions_map = {{'': '{ct}'}}; \\
@ -67,7 +67,7 @@ s.serve_forever()" """
HTTPS_TEMPLATE = """\
mkdir -p {root}/public_html/{response.URI_ROOT_PATH}
cd {root}/public_html
echo -n {achall.token} > {response.URI_ROOT_PATH}/{response.path}
echo -n {validation} > {response.URI_ROOT_PATH}/{encoded_token}
# run only once per server:
openssl req -new -newkey rsa:4096 -subj "/" -days 1 -nodes -x509 -keyout ../key.pem -out ../cert.pem
python -c "import BaseHTTPServer, SimpleHTTPServer, ssl; \\
@ -124,13 +124,13 @@ binary for temporary key/certificate generation.""".replace("\n", "")
# same path for each challenge response would be easier for
# users, but will not work if multiple domains point at the
# same server: default command doesn't support virtual hosts
response = challenges.SimpleHTTPResponse(
path=jose.b64encode(os.urandom(18)),
response, validation = achall.gen_response_and_validation(
tls=(not self.config.no_simple_http_tls))
assert response.good_path # is encoded os.urandom(18) good?
command = self.template.format(
root=self._root, achall=achall, response=response,
validation=pipes.quote(validation.json_dumps()),
encoded_token=achall.chall.encode("token"),
ct=response.CONTENT_TYPE, port=(
response.port if self.config.simple_http_port is None
else self.config.simple_http_port))
@ -157,11 +157,12 @@ binary for temporary key/certificate generation.""".replace("\n", "")
else:
self._notify_and_wait(self.MESSAGE_TEMPLATE.format(
achall=achall, response=response,
uri=response.uri(achall.domain), ct=response.CONTENT_TYPE,
command=command))
uri=response.uri(achall.domain, achall.challb.chall),
ct=response.CONTENT_TYPE, command=command))
if response.simple_verify(
achall.challb, achall.domain, self.config.simple_http_port):
achall.chall, achall.domain,
achall.account_key.public_key(), self.config.simple_http_port):
return response
else:
if self.conf("test-mode") and self._httpd.poll() is not None:

View file

@ -5,11 +5,16 @@ import unittest
import mock
from acme import challenges
from acme import jose
from letsencrypt import achallenges
from letsencrypt import errors
from letsencrypt.tests import acme_util
from letsencrypt.tests import test_util
KEY = jose.JWKRSA.load(test_util.load_vector("rsa512_key.pem"))
class ManualAuthenticatorTest(unittest.TestCase):
@ -22,7 +27,7 @@ class ManualAuthenticatorTest(unittest.TestCase):
manual_test_mode=False)
self.auth = ManualAuthenticator(config=self.config, name="manual")
self.achalls = [achallenges.SimpleHTTP(
challb=acme_util.SIMPLE_HTTP, domain="foo.com", key=None)]
challb=acme_util.SIMPLE_HTTP_P, domain="foo.com", account_key=KEY)]
config_test_mode = mock.MagicMock(
no_simple_http_tls=True, simple_http_port=4430,
@ -46,17 +51,17 @@ class ManualAuthenticatorTest(unittest.TestCase):
@mock.patch("__builtin__.raw_input")
def test_perform(self, mock_raw_input, mock_verify, mock_urandom,
mock_stdout):
mock_urandom.return_value = "foo"
mock_urandom.side_effect = nonrandom_urandom
mock_verify.return_value = True
resp = challenges.SimpleHTTPResponse(tls=False, path='Zm9v')
resp = challenges.SimpleHTTPResponse(tls=False)
self.assertEqual([resp], self.auth.perform(self.achalls))
self.assertEqual(1, mock_raw_input.call_count)
mock_verify.assert_called_with(self.achalls[0].challb, "foo.com", 4430)
mock_verify.assert_called_with(
self.achalls[0].challb.chall, "foo.com", KEY.public_key(), 4430)
message = mock_stdout.write.mock_calls[0][1][0]
self.assertTrue(self.achalls[0].token in message)
self.assertTrue('Zm9v' in message)
mock_verify.return_value = False
self.assertEqual([None], self.auth.perform(self.achalls))
@ -101,5 +106,10 @@ class ManualAuthenticatorTest(unittest.TestCase):
mock_killpg.assert_called_once_with(1234, signal.SIGTERM)
def nonrandom_urandom(num_bytes):
"""Returns a string of length num_bytes"""
return "x" * num_bytes
if __name__ == "__main__":
unittest.main() # pragma: no cover

View file

@ -6,7 +6,6 @@ import socket
import sys
import time
from cryptography.hazmat.primitives import serialization
import OpenSSL
import zope.component
import zope.interface
@ -14,6 +13,7 @@ import zope.interface
from acme import challenges
from letsencrypt import achallenges
from letsencrypt import crypto_util
from letsencrypt import interfaces
from letsencrypt.plugins import common
@ -28,6 +28,11 @@ class StandaloneAuthenticator(common.Plugin):
the certificate authority. Therefore, it does not rely on any
existing server program.
:param OpenSSL.crypto.PKey private_key: DVSNI challenge certificate
key.
:param sni_names: Mapping from z_domain (`bytes`) to PEM-encoded
certificate (`bytes`).
"""
zope.interface.implements(interfaces.IAuthenticator)
zope.interface.classProvides(interfaces.IPluginFactory)
@ -40,9 +45,12 @@ class StandaloneAuthenticator(common.Plugin):
self.parent_pid = os.getpid()
self.subproc_state = None
self.tasks = {}
self.sni_names = {}
self.sock = None
self.connection = None
self.private_key = None
self.key_pem = crypto_util.make_key(bits=2048)
self.private_key = OpenSSL.crypto.load_privatekey(
OpenSSL.crypto.FILETYPE_PEM, self.key_pem)
self.ssl_conn = None
def prepare(self):
@ -121,12 +129,12 @@ class StandaloneAuthenticator(common.Plugin):
"""
sni_name = connection.get_servername()
if sni_name in self.tasks:
pem_cert = self.tasks[sni_name]
if sni_name in self.sni_names:
pem_cert = self.sni_names[sni_name]
else:
# TODO: Should we really present a certificate if we get an
# unexpected SNI name? Or should we just disconnect?
pem_cert = self.tasks.values()[0]
pem_cert = next(self.sni_names.itervalues())
cert = OpenSSL.crypto.load_certificate(
OpenSSL.crypto.FILETYPE_PEM, pem_cert)
new_ctx = OpenSSL.SSL.Context(OpenSSL.SSL.TLSv1_METHOD)
@ -179,7 +187,7 @@ class StandaloneAuthenticator(common.Plugin):
return False
def do_child_process(self, port, key):
def do_child_process(self, port):
"""Perform the child process side of the TCP listener task.
This should only be called by :meth:`start_listener`.
@ -189,9 +197,6 @@ class StandaloneAuthenticator(common.Plugin):
handler.
:param int port: Which TCP port to bind.
:param key: The private key to use to respond to DVSNI challenge
requests.
:type key: `letsencrypt.le_util.Key`
"""
signal.signal(signal.SIGINT, self.subproc_signal_handler)
@ -218,11 +223,6 @@ class StandaloneAuthenticator(common.Plugin):
self.sock.listen(1)
# Signal that we've successfully bound TCP port
os.kill(self.parent_pid, signal.SIGIO)
self.private_key = OpenSSL.crypto.load_privatekey(
OpenSSL.crypto.FILETYPE_PEM, key.key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption()))
while True:
self.connection, _ = self.sock.accept()
@ -245,16 +245,13 @@ class StandaloneAuthenticator(common.Plugin):
self.ssl_conn.shutdown()
self.ssl_conn.close()
def start_listener(self, port, key):
def start_listener(self, port):
"""Start listener.
Create a child process which will start a TCP listener on the
specified port to perform the specified DVSNI challenges.
:param int port: The TCP port to bind.
:param key: The private key to use to respond to DVSNI challenge
requests.
:type key: :class:`letsencrypt.le_util.Key`
:returns: ``True`` or ``False`` to indicate success or failure creating
the subprocess.
@ -290,7 +287,7 @@ class StandaloneAuthenticator(common.Plugin):
self.child_pid = os.getpid()
# do_child_process() is normally not expected to return but
# should terminate via sys.exit().
return self.do_child_process(port, key)
return self.do_child_process(port)
def already_listening(self, port): # pylint: disable=no-self-use
"""Check if a process is already listening on the port.
@ -368,12 +365,14 @@ class StandaloneAuthenticator(common.Plugin):
results_if_failure = []
if not achalls or not isinstance(achalls, list):
raise ValueError(".perform() was called without challenge list")
# TODO: "bits" should be user-configurable
for achall in achalls:
if isinstance(achall, achallenges.DVSNI):
# We will attempt to do it
key = achall.key # TODO: bug; one key per start_listener
cert_pem, response = achall.gen_cert_and_response()
self.tasks[achall.nonce_domain] = cert_pem
response, cert_pem, _ = achall.gen_cert_and_response(
key_pem=self.key_pem)
self.sni_names[response.z_domain] = cert_pem
self.tasks[achall.token] = cert_pem
results_if_success.append(response)
results_if_failure.append(None)
else:
@ -392,7 +391,7 @@ class StandaloneAuthenticator(common.Plugin):
return results_if_failure
# Try to do the authentication; note that this creates
# the listener subprocess via os.fork()
if self.start_listener(self.config.dvsni_port, key):
if self.start_listener(self.config.dvsni_port):
return results_if_success
else:
# TODO: This should probably raise a DVAuthError exception
@ -411,8 +410,8 @@ class StandaloneAuthenticator(common.Plugin):
# Remove this from pending tasks list
for achall in achalls:
assert isinstance(achall, achallenges.DVSNI)
if achall.nonce_domain in self.tasks:
del self.tasks[achall.nonce_domain]
if achall.token in self.tasks:
del self.tasks[achall.token]
else:
# Could not find the challenge to remove!
raise ValueError("could not find the challenge to remove")

View file

@ -1,13 +1,10 @@
"""Tests for letsencrypt.plugins.standalone.authenticator."""
import os
import pkg_resources
import psutil
import signal
import socket
import unittest
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
import mock
import OpenSSL
@ -17,16 +14,13 @@ from acme import jose
from letsencrypt import achallenges
from letsencrypt.tests import acme_util
from letsencrypt.tests import test_util
KEY_PATH = pkg_resources.resource_filename(
"letsencrypt.tests", os.path.join("testdata", "rsa512_key.pem"))
KEY_DATA = pkg_resources.resource_string(
"letsencrypt.tests", os.path.join("testdata", "rsa512_key.pem"))
KEY = jose.JWKRSA(key=jose.ComparableRSAKey(serialization.load_pem_private_key(
KEY_DATA, password=None, backend=default_backend())))
PRIVATE_KEY = OpenSSL.crypto.load_privatekey(
OpenSSL.crypto.FILETYPE_PEM, KEY_DATA)
ACCOUNT_KEY = jose.JWKRSA.load(test_util.load_vector("rsa512_key.pem"))
CHALL_KEY_PEM = test_util.load_vector("rsa512_key_2.pem")
CHALL_KEY = OpenSSL.crypto.load_privatekey(
OpenSSL.crypto.FILETYPE_PEM, CHALL_KEY_PEM)
CONFIG = mock.Mock(dvsni_port=5001)
@ -80,9 +74,11 @@ class SNICallbackTest(unittest.TestCase):
self.authenticator = StandaloneAuthenticator(config=CONFIG, name=None)
self.cert = achallenges.DVSNI(
challb=acme_util.DVSNI_P,
domain="example.com", key=KEY).gen_cert_and_response()[0]
self.authenticator.private_key = PRIVATE_KEY
self.authenticator.tasks = {"abcdef.acme.invalid": self.cert}
domain="example.com",
account_key=ACCOUNT_KEY
).gen_cert_and_response(key_pem=CHALL_KEY_PEM)[1]
self.authenticator.private_key = CHALL_KEY
self.authenticator.sni_names = {"abcdef.acme.invalid": self.cert}
self.authenticator.child_pid = 12345
def test_real_servername(self):
@ -116,7 +112,7 @@ class ClientSignalHandlerTest(unittest.TestCase):
from letsencrypt.plugins.standalone.authenticator import \
StandaloneAuthenticator
self.authenticator = StandaloneAuthenticator(config=CONFIG, name=None)
self.authenticator.tasks = {"foononce.acme.invalid": "stuff"}
self.authenticator.tasks = {"footoken.acme.invalid": "stuff"}
self.authenticator.child_pid = 12345
def test_client_signal_handler(self):
@ -145,7 +141,7 @@ class SubprocSignalHandlerTest(unittest.TestCase):
from letsencrypt.plugins.standalone.authenticator import \
StandaloneAuthenticator
self.authenticator = StandaloneAuthenticator(config=CONFIG, name=None)
self.authenticator.tasks = {"foononce.acme.invalid": "stuff"}
self.authenticator.tasks = {"footoken.acme.invalid": "stuff"}
self.authenticator.child_pid = 12345
self.authenticator.parent_pid = 23456
@ -303,12 +299,12 @@ class PerformTest(unittest.TestCase):
self.achall1 = achallenges.DVSNI(
challb=acme_util.chall_to_challb(
challenges.DVSNI(r="whee", nonce="foo"), "pending"),
domain="foo.example.com", key=KEY)
challenges.DVSNI(token=b"foo"), "pending"),
domain="foo.example.com", account_key=ACCOUNT_KEY)
self.achall2 = achallenges.DVSNI(
challb=acme_util.chall_to_challb(
challenges.DVSNI(r="whee", nonce="bar"), "pending"),
domain="bar.example.com", key=KEY)
challenges.DVSNI(token=b"bar"), "pending"),
domain="bar.example.com", account_key=ACCOUNT_KEY)
bad_achall = ("This", "Represents", "A Non-DVSNI", "Challenge")
self.achalls = [self.achall1, self.achall2, bad_achall]
@ -326,16 +322,16 @@ class PerformTest(unittest.TestCase):
result = self.authenticator.perform(self.achalls)
self.assertEqual(len(self.authenticator.tasks), 2)
self.assertTrue(
self.authenticator.tasks.has_key(self.achall1.nonce_domain))
self.authenticator.tasks.has_key(self.achall1.token))
self.assertTrue(
self.authenticator.tasks.has_key(self.achall2.nonce_domain))
self.authenticator.tasks.has_key(self.achall2.token))
self.assertTrue(isinstance(result, list))
self.assertEqual(len(result), 3)
self.assertTrue(isinstance(result[0], challenges.ChallengeResponse))
self.assertTrue(isinstance(result[1], challenges.ChallengeResponse))
self.assertFalse(result[2])
self.authenticator.start_listener.assert_called_once_with(
CONFIG.dvsni_port, KEY)
CONFIG.dvsni_port)
def test_cannot_perform(self):
"""What happens if start_listener() returns False."""
@ -345,17 +341,17 @@ class PerformTest(unittest.TestCase):
result = self.authenticator.perform(self.achalls)
self.assertEqual(len(self.authenticator.tasks), 2)
self.assertTrue(
self.authenticator.tasks.has_key(self.achall1.nonce_domain))
self.authenticator.tasks.has_key(self.achall1.token))
self.assertTrue(
self.authenticator.tasks.has_key(self.achall2.nonce_domain))
self.authenticator.tasks.has_key(self.achall2.token))
self.assertTrue(isinstance(result, list))
self.assertEqual(len(result), 3)
self.assertEqual(result, [None, None, False])
self.authenticator.start_listener.assert_called_once_with(
CONFIG.dvsni_port, KEY)
CONFIG.dvsni_port)
def test_perform_with_pending_tasks(self):
self.authenticator.tasks = {"foononce.acme.invalid": "cert_data"}
self.authenticator.tasks = {"footoken.acme.invalid": "cert_data"}
extra_achall = acme_util.DVSNI_P
self.assertRaises(
ValueError, self.authenticator.perform, [extra_achall])
@ -384,7 +380,7 @@ class StartListenerTest(unittest.TestCase):
self.authenticator.do_parent_process = mock.Mock()
self.authenticator.do_parent_process.return_value = True
mock_fork.return_value = 22222
result = self.authenticator.start_listener(1717, "key")
result = self.authenticator.start_listener(1717)
# start_listener is expected to return the True or False return
# value from do_parent_process.
self.assertTrue(result)
@ -396,10 +392,9 @@ class StartListenerTest(unittest.TestCase):
self.authenticator.do_parent_process = mock.Mock()
self.authenticator.do_child_process = mock.Mock()
mock_fork.return_value = 0
self.authenticator.start_listener(1717, "key")
self.authenticator.start_listener(1717)
self.assertEqual(self.authenticator.child_pid, os.getpid())
self.authenticator.do_child_process.assert_called_once_with(
1717, "key")
self.authenticator.do_child_process.assert_called_once_with(1717)
class DoParentProcessTest(unittest.TestCase):
@ -452,9 +447,10 @@ class DoChildProcessTest(unittest.TestCase):
self.authenticator = StandaloneAuthenticator(config=CONFIG, name=None)
self.cert = achallenges.DVSNI(
challb=acme_util.chall_to_challb(
challenges.DVSNI(r=("x" * 32), nonce="abcdef"), "pending"),
domain="example.com", key=KEY).gen_cert_and_response()[0]
self.authenticator.private_key = PRIVATE_KEY
challenges.DVSNI(token=b"abcdef"), "pending"),
domain="example.com", account_key=ACCOUNT_KEY).gen_cert_and_response(
key_pem=CHALL_KEY_PEM)[1]
self.authenticator.private_key = CHALL_KEY
self.authenticator.tasks = {"abcdef.acme.invalid": self.cert}
self.authenticator.parent_pid = 12345
@ -475,7 +471,7 @@ class DoChildProcessTest(unittest.TestCase):
# do_child_process code assumes that calling sys.exit() will
# cause subsequent code not to be executed.)
self.assertRaises(
IndentationError, self.authenticator.do_child_process, 1717, KEY)
IndentationError, self.authenticator.do_child_process, 1717)
mock_exit.assert_called_once_with(1)
mock_kill.assert_called_once_with(12345, signal.SIGUSR2)
@ -490,7 +486,7 @@ class DoChildProcessTest(unittest.TestCase):
sample_socket.bind.side_effect = eaccess
mock_socket.return_value = sample_socket
self.assertRaises(
IndentationError, self.authenticator.do_child_process, 1717, KEY)
IndentationError, self.authenticator.do_child_process, 1717)
mock_exit.assert_called_once_with(1)
mock_kill.assert_called_once_with(12345, signal.SIGUSR1)
@ -506,7 +502,7 @@ class DoChildProcessTest(unittest.TestCase):
sample_socket.bind.side_effect = eio
mock_socket.return_value = sample_socket
self.assertRaises(
socket.error, self.authenticator.do_child_process, 1717, KEY)
socket.error, self.authenticator.do_child_process, 1717)
@mock.patch("letsencrypt.plugins.standalone.authenticator."
"OpenSSL.SSL.Connection")
@ -519,7 +515,7 @@ class DoChildProcessTest(unittest.TestCase):
mock_socket.return_value = sample_socket
mock_connection.return_value = mock.MagicMock()
self.assertRaises(
CallableExhausted, self.authenticator.do_child_process, 1717, KEY)
CallableExhausted, self.authenticator.do_child_process, 1717)
mock_socket.assert_called_once_with()
sample_socket.bind.assert_called_once_with(("0.0.0.0", 1717))
sample_socket.listen.assert_called_once_with(1)
@ -538,9 +534,9 @@ class CleanupTest(unittest.TestCase):
self.authenticator = StandaloneAuthenticator(config=CONFIG, name=None)
self.achall = achallenges.DVSNI(
challb=acme_util.chall_to_challb(
challenges.DVSNI(r="whee", nonce="foononce"), "pending"),
domain="foo.example.com", key="key")
self.authenticator.tasks = {self.achall.nonce_domain: "stuff"}
challenges.DVSNI(token=b"footoken"), "pending"),
domain="foo.example.com", account_key="key")
self.authenticator.tasks = {self.achall.token: "stuff"}
self.authenticator.child_pid = 12345
@mock.patch("letsencrypt.plugins.standalone.authenticator.os.kill")
@ -558,8 +554,8 @@ class CleanupTest(unittest.TestCase):
self.assertRaises(
ValueError, self.authenticator.cleanup, [achallenges.DVSNI(
challb=acme_util.chall_to_challb(
challenges.DVSNI(r="whee", nonce="badnonce"), "pending"),
domain="bad.example.com", key="key")])
challenges.DVSNI(token=b"badtoken"), "pending"),
domain="bad.example.com", account_key="key")])
class MoreInfoTest(unittest.TestCase):

View file

@ -1,72 +0,0 @@
"""Recovery Token Identifier Validation Challenge."""
import errno
import os
import zope.component
from acme import challenges
from letsencrypt import le_util
from letsencrypt import interfaces
class RecoveryToken(object):
"""Recovery Token Identifier Validation Challenge.
Based on draft-barnes-acme, section 6.4.
"""
def __init__(self, server, direc):
self.token_dir = os.path.join(direc, server)
def perform(self, chall):
"""Perform the Recovery Token Challenge.
:param chall: Recovery Token Challenge
:type chall: :class:`letsencrypt.achallenges.RecoveryToken`
:returns: response
:rtype: dict
"""
token_fp = os.path.join(self.token_dir, chall.domain)
if os.path.isfile(token_fp):
with open(token_fp) as token_fd:
return challenges.RecoveryTokenResponse(token=token_fd.read())
cancel, token = zope.component.getUtility(
interfaces.IDisplay).input(
"%s - Input Recovery Token: " % chall.domain)
if cancel != 1:
return challenges.RecoveryTokenResponse(token=token)
return None
def cleanup(self, chall):
"""Cleanup the saved recovery token if it exists.
:param chall: Recovery Token Challenge
:type chall: :class:`letsencrypt.achallenges.RecoveryToken`
"""
try:
le_util.safely_remove(os.path.join(self.token_dir, chall.domain))
except OSError as err:
if err.errno != errno.ENOENT:
raise
def requires_human(self, domain):
"""Indicates whether or not domain can be auto solved."""
return not os.path.isfile(os.path.join(self.token_dir, domain))
def store_token(self, domain, token):
"""Store token for later automatic use.
:param str domain: domain associated with the token
:param str token: token from authorization
"""
le_util.make_or_verify_dir(self.token_dir, 0o700, os.geteuid())
with open(os.path.join(self.token_dir, domain), "w") as token_fd:
token_fd.write(str(token))

View file

@ -20,6 +20,7 @@ from letsencrypt import configuration
from letsencrypt import cli
from letsencrypt import client
from letsencrypt import crypto_util
from letsencrypt import errors
from letsencrypt import notify
from letsencrypt import storage
@ -164,7 +165,7 @@ def main(config=None, args=sys.argv[1:]):
# dramatically improve performance for large deployments
# where autorenewal is widely turned off.
cert = storage.RenewableCert(rc_config, cli_config=cli_config)
except ValueError:
except errors.CertStorageError:
# This indicates an invalid renewal configuration file, such
# as one missing a required parameter (in the future, perhaps
# also one that is internally inconsistent or is missing a

View file

@ -11,6 +11,7 @@ import pytz
import pyrfc3339
from letsencrypt import constants
from letsencrypt import errors
from letsencrypt import le_util
ALL_FOUR = ("cert", "privkey", "chain", "fullchain")
@ -90,7 +91,7 @@ class RenewableCert(object): # pylint: disable=too-many-instance-attributes
renewal config file.
:param .RenewerConfiguration cli_config:
:raises ValueError: if the configuration file's name didn't end
:raises .CertStorageError: if the configuration file's name didn't end
in ".conf", or the file is missing or broken.
:raises TypeError: if the provided renewal configuration isn't a
ConfigObj object.
@ -99,7 +100,8 @@ class RenewableCert(object): # pylint: disable=too-many-instance-attributes
self.cli_config = cli_config
if isinstance(configfile, configobj.ConfigObj):
if not os.path.basename(configfile.filename).endswith(".conf"):
raise ValueError("renewal config file name must end in .conf")
raise errors.CertStorageError(
"renewal config file name must end in .conf")
self.lineagename = os.path.basename(
configfile.filename)[:-len(".conf")]
else:
@ -117,8 +119,9 @@ class RenewableCert(object): # pylint: disable=too-many-instance-attributes
self.configuration.merge(self.configfile)
if not all(x in self.configuration for x in ALL_FOUR):
raise ValueError("renewal config file {0} is missing a required "
"file reference".format(configfile))
raise errors.CertStorageError(
"renewal config file {0} is missing a required "
"file reference".format(configfile))
self.cert = self.configuration["cert"]
self.privkey = self.configuration["privkey"]
@ -213,7 +216,7 @@ class RenewableCert(object): # pylint: disable=too-many-instance-attributes
"""
if kind not in ALL_FOUR:
raise ValueError("unknown kind of item")
raise errors.CertStorageError("unknown kind of item")
link = getattr(self, kind)
if not os.path.exists(link):
return None
@ -236,7 +239,7 @@ class RenewableCert(object): # pylint: disable=too-many-instance-attributes
"""
if kind not in ALL_FOUR:
raise ValueError("unknown kind of item")
raise errors.CertStorageError("unknown kind of item")
pattern = re.compile(r"^{0}([0-9]+)\.pem$".format(kind))
target = self.current_target(kind)
if target is None or not os.path.exists(target):
@ -263,12 +266,12 @@ class RenewableCert(object): # pylint: disable=too-many-instance-attributes
"""
if kind not in ALL_FOUR:
raise ValueError("unknown kind of item")
raise errors.CertStorageError("unknown kind of item")
where = os.path.dirname(self.current_target(kind))
return os.path.join(where, "{0}{1}.pem".format(kind, version))
def available_versions(self, kind):
"""Which lternative versions of the specified kind of item exist?
"""Which alternative versions of the specified kind of item exist?
The archive directory where the current version is stored is
consulted to obtain the list of alternatives.
@ -281,7 +284,7 @@ class RenewableCert(object): # pylint: disable=too-many-instance-attributes
"""
if kind not in ALL_FOUR:
raise ValueError("unknown kind of item")
raise errors.CertStorageError("unknown kind of item")
where = os.path.dirname(self.current_target(kind))
files = os.listdir(where)
pattern = re.compile(r"^{0}([0-9]+)\.pem$".format(kind))
@ -308,7 +311,7 @@ class RenewableCert(object): # pylint: disable=too-many-instance-attributes
:rtype: int
"""
# TODO: this can raise ValueError if there is no version overlap
# TODO: this can raise CertStorageError if there is no version overlap
# (it should probably return None instead)
# TODO: this can raise a spurious AttributeError if the current
# link for any kind is missing (it should probably return None)
@ -355,7 +358,7 @@ class RenewableCert(object): # pylint: disable=too-many-instance-attributes
"""
if kind not in ALL_FOUR:
raise ValueError("unknown kind of item")
raise errors.CertStorageError("unknown kind of item")
link = getattr(self, kind)
filename = "{0}{1}.pem".format(kind, version)
# Relative rather than absolute target directory
@ -550,7 +553,8 @@ class RenewableCert(object): # pylint: disable=too-many-instance-attributes
config_file, config_filename = le_util.unique_lineage_name(
cli_config.renewal_configs_dir, lineagename)
if not config_filename.endswith(".conf"):
raise ValueError("renewal config file name must end in .conf")
raise errors.CertStorageError(
"renewal config file name must end in .conf")
# Determine where on disk everything will go
# lineagename will now potentially be modified based on which
@ -559,9 +563,11 @@ class RenewableCert(object): # pylint: disable=too-many-instance-attributes
archive = os.path.join(cli_config.archive_dir, lineagename)
live_dir = os.path.join(cli_config.live_dir, lineagename)
if os.path.exists(archive):
raise ValueError("archive directory exists for " + lineagename)
raise errors.CertStorageError(
"archive directory exists for " + lineagename)
if os.path.exists(live_dir):
raise ValueError("live directory exists for " + lineagename)
raise errors.CertStorageError(
"live directory exists for " + lineagename)
os.mkdir(archive)
os.mkdir(live_dir)
relative_archive = os.path.join("..", "..", "archive", lineagename)

View file

@ -63,7 +63,6 @@ class ReportNewAccountTest(unittest.TestCase):
def setUp(self):
self.config = mock.MagicMock(config_dir="/etc/letsencrypt")
reg = messages.Registration.from_data(email="rhino@jungle.io")
reg = reg.update(recovery_token="ECCENTRIC INVISIBILITY RHINOCEROS")
self.acc = mock.MagicMock(regr=messages.RegistrationResource(
uri=None, new_authzr_uri=None, body=reg))
@ -81,7 +80,6 @@ class ReportNewAccountTest(unittest.TestCase):
self._call()
call_list = mock_zope().add_message.call_args_list
self.assertTrue(self.config.config_dir in call_list[0][0][0])
self.assertTrue(self.acc.regr.body.recovery_token in call_list[1][0][0])
self.assertTrue(
", ".join(self.acc.regr.body.emails) in call_list[1][0][0])

View file

@ -1,10 +1,7 @@
"""Tests for letsencrypt.achallenges."""
import unittest
import OpenSSL
from acme import challenges
from acme import crypto_util as acme_crypto_util
from acme import jose
from letsencrypt.tests import acme_util
@ -15,28 +12,20 @@ class DVSNITest(unittest.TestCase):
"""Tests for letsencrypt.achallenges.DVSNI."""
def setUp(self):
self.chall = acme_util.chall_to_challb(
challenges.DVSNI(r="r_value", nonce="12345ABCDE"), "pending")
self.response = challenges.DVSNIResponse()
self.challb = acme_util.chall_to_challb(acme_util.DVSNI, "pending")
key = jose.JWKRSA.load(test_util.load_vector("rsa512_key.pem"))
from letsencrypt.achallenges import DVSNI
self.achall = DVSNI(challb=self.chall, domain="example.com", key=key)
self.achall = DVSNI(
challb=self.challb, domain="example.com", account_key=key)
def test_proxy(self):
self.assertEqual(self.chall.r, self.achall.r)
self.assertEqual(self.chall.nonce, self.achall.nonce)
self.assertEqual(self.challb.token, self.achall.token)
def test_gen_cert_and_response(self):
cert_pem, _ = self.achall.gen_cert_and_response(s=self.response.s)
cert = OpenSSL.crypto.load_certificate(
OpenSSL.crypto.FILETYPE_PEM, cert_pem)
self.assertEqual(cert.get_subject().CN, "example.com")
# pylint: disable=protected-access
self.assertEqual(acme_crypto_util._pyopenssl_cert_or_req_san(cert), [
"example.com", self.chall.nonce_domain,
self.response.z_domain(self.chall)])
response, cert_pem, key_pem = self.achall.gen_cert_and_response()
self.assertTrue(isinstance(response, challenges.DVSNIResponse))
self.assertTrue(isinstance(cert_pem, bytes))
self.assertTrue(isinstance(key_pem, bytes))
if __name__ == "__main__":

View file

@ -15,14 +15,12 @@ KEY = test_util.load_rsa_private_key('rsa512_key.pem')
SIMPLE_HTTP = challenges.SimpleHTTP(
token="evaGxfADs6pSRb2LAv9IZf17Dt3juxGJ+PCt92wr+oA")
DVSNI = challenges.DVSNI(
r=jose.b64decode("Tyq0La3slT7tqQ0wlOiXnCY2vyez7Zo5blgPJ1xt5xI"),
nonce=jose.b64decode("a82d5ff8ef740d12881f6d3c2277ab2e"))
token=jose.b64decode(b"evaGxfADs6pSRb2LAv9IZf17Dt3juxGJyPCt92wrDoA"))
DNS = challenges.DNS(token="17817c66b60ce2e4012dfad92657527a")
RECOVERY_CONTACT = challenges.RecoveryContact(
activation_url="https://example.ca/sendrecovery/a5bd99383fb0",
success_url="https://example.ca/confirmrecovery/bb1b9928932",
contact="c********n@example.com")
RECOVERY_TOKEN = challenges.RecoveryToken()
POP = challenges.ProofOfPossession(
alg="RS256", nonce=jose.b64decode("eET5udtV7aoX8Xl8gYiZIA"),
hints=challenges.ProofOfPossession.Hints(
@ -43,7 +41,7 @@ POP = challenges.ProofOfPossession(
)
)
CHALLENGES = [SIMPLE_HTTP, DVSNI, DNS, RECOVERY_CONTACT, RECOVERY_TOKEN, POP]
CHALLENGES = [SIMPLE_HTTP, DVSNI, DNS, RECOVERY_CONTACT, POP]
DV_CHALLENGES = [chall for chall in CHALLENGES
if isinstance(chall, challenges.DVChallenge)]
CONT_CHALLENGES = [chall for chall in CHALLENGES
@ -85,11 +83,9 @@ DVSNI_P = chall_to_challb(DVSNI, messages.STATUS_PENDING)
SIMPLE_HTTP_P = chall_to_challb(SIMPLE_HTTP, messages.STATUS_PENDING)
DNS_P = chall_to_challb(DNS, messages.STATUS_PENDING)
RECOVERY_CONTACT_P = chall_to_challb(RECOVERY_CONTACT, messages.STATUS_PENDING)
RECOVERY_TOKEN_P = chall_to_challb(RECOVERY_TOKEN, messages.STATUS_PENDING)
POP_P = chall_to_challb(POP, messages.STATUS_PENDING)
CHALLENGES_P = [SIMPLE_HTTP_P, DVSNI_P, DNS_P,
RECOVERY_CONTACT_P, RECOVERY_TOKEN_P, POP_P]
CHALLENGES_P = [SIMPLE_HTTP_P, DVSNI_P, DNS_P, RECOVERY_CONTACT_P, POP_P]
DV_CHALLENGES_P = [challb for challb in CHALLENGES_P
if isinstance(challb.chall, challenges.DVChallenge)]
CONT_CHALLENGES_P = [

View file

@ -19,7 +19,6 @@ TRANSLATE = {
"dvsni": "DVSNI",
"simpleHttp": "SimpleHTTP",
"dns": "DNS",
"recoveryToken": "RecoveryToken",
"recoveryContact": "RecoveryContact",
"proofOfPossession": "ProofOfPossession",
}
@ -41,7 +40,8 @@ class ChallengeFactoryTest(unittest.TestCase):
[messages.STATUS_PENDING]*6, False)
def test_all(self):
cont_c, dv_c = self.handler._challenge_factory(self.dom, range(0, 6))
cont_c, dv_c = self.handler._challenge_factory(
self.dom, range(0, len(acme_util.CHALLENGES)))
self.assertEqual(
[achall.chall for achall in cont_c], acme_util.CONT_CHALLENGES)
@ -49,10 +49,10 @@ class ChallengeFactoryTest(unittest.TestCase):
[achall.chall for achall in dv_c], acme_util.DV_CHALLENGES)
def test_one_dv_one_cont(self):
cont_c, dv_c = self.handler._challenge_factory(self.dom, [1, 4])
cont_c, dv_c = self.handler._challenge_factory(self.dom, [1, 3])
self.assertEqual(
[achall.chall for achall in cont_c], [acme_util.RECOVERY_TOKEN])
[achall.chall for achall in cont_c], [acme_util.RECOVERY_CONTACT])
self.assertEqual([achall.chall for achall in dv_c], [acme_util.DVSNI])
def test_unrecognized(self):
@ -80,7 +80,7 @@ class GetAuthorizationsTest(unittest.TestCase):
self.mock_dv_auth.get_chall_pref.return_value = [challenges.DVSNI]
self.mock_cont_auth.get_chall_pref.return_value = [
challenges.RecoveryToken]
challenges.RecoveryContact]
self.mock_cont_auth.perform.side_effect = gen_auth_resp
self.mock_dv_auth.perform.side_effect = gen_auth_resp
@ -196,7 +196,7 @@ class PollChallengesTest(unittest.TestCase):
self.chall_update = {}
for dom in self.doms:
self.chall_update[dom] = [
challb_to_achall(challb, "dummy_key", dom)
challb_to_achall(challb, mock.Mock(key="dummy_key"), dom)
for challb in self.handler.authzr[dom].body.challenges]
@mock.patch("letsencrypt.auth_handler.time")
@ -313,11 +313,11 @@ class GenChallengePathTest(unittest.TestCase):
self.assertTrue(self._call(challbs[::-1], prefs, None))
def test_common_case_with_continuity(self):
challbs = (acme_util.RECOVERY_TOKEN_P,
challbs = (acme_util.POP_P,
acme_util.RECOVERY_CONTACT_P,
acme_util.DVSNI_P,
acme_util.SIMPLE_HTTP_P)
prefs = [challenges.RecoveryToken, challenges.DVSNI]
prefs = [challenges.ProofOfPossession, challenges.DVSNI]
combos = acme_util.gen_combos(challbs)
self.assertEqual(self._call(challbs, prefs, combos), (0, 2))
@ -325,21 +325,19 @@ class GenChallengePathTest(unittest.TestCase):
self.assertTrue(self._call(challbs, prefs, None))
def test_full_cont_server(self):
challbs = (acme_util.RECOVERY_TOKEN_P,
acme_util.RECOVERY_CONTACT_P,
challbs = (acme_util.RECOVERY_CONTACT_P,
acme_util.POP_P,
acme_util.DVSNI_P,
acme_util.SIMPLE_HTTP_P,
acme_util.DNS_P)
# Typical webserver client that can do everything except DNS
# Attempted to make the order realistic
prefs = [challenges.RecoveryToken,
challenges.ProofOfPossession,
prefs = [challenges.ProofOfPossession,
challenges.SimpleHTTP,
challenges.DVSNI,
challenges.RecoveryContact]
combos = acme_util.gen_combos(challbs)
self.assertEqual(self._call(challbs, prefs, combos), (0, 4))
self.assertEqual(self._call(challbs, prefs, combos), (1, 3))
# Dumb path trivial test
self.assertTrue(self._call(challbs, prefs, None))
@ -438,19 +436,19 @@ class ReportFailedChallsTest(unittest.TestCase):
self.simple_http = achallenges.SimpleHTTP(
challb=messages.ChallengeBody(**kwargs),# pylint: disable=star-args
domain="example.com",
key=acme_util.KEY)
account_key="key")
kwargs["chall"] = acme_util.DVSNI
self.dvsni_same = achallenges.DVSNI(
challb=messages.ChallengeBody(**kwargs),# pylint: disable=star-args
domain="example.com",
key=acme_util.KEY)
account_key="key")
kwargs["error"] = messages.Error(typ="dnssec", detail="detail")
self.dvsni_diff = achallenges.DVSNI(
challb=messages.ChallengeBody(**kwargs),# pylint: disable=star-args
domain="foo.bar",
key=acme_util.KEY)
account_key="key")
@mock.patch("letsencrypt.auth_handler.zope.component.getUtility")
def test_same_error_and_domain(self, mock_zope):

View file

@ -36,7 +36,6 @@ class NamespaceConfigTest(unittest.TestCase):
constants.CERT_DIR = 'certs'
constants.IN_PROGRESS_DIR = '../p'
constants.KEY_DIR = 'keys'
constants.REC_TOKEN_DIR = '/r'
constants.TEMP_CHECKPOINT_DIR = 't'
self.assertEqual(
@ -47,7 +46,6 @@ class NamespaceConfigTest(unittest.TestCase):
self.config.cert_key_backup, '/tmp/foo/c/acme-server.org:443/new')
self.assertEqual(self.config.in_progress_dir, '/tmp/foo/../p')
self.assertEqual(self.config.key_dir, '/tmp/config/keys')
self.assertEqual(self.config.rec_token_dir, '/r')
self.assertEqual(self.config.temp_checkpoint_dir, '/tmp/foo/t')

View file

@ -17,54 +17,30 @@ class PerformTest(unittest.TestCase):
self.auth = ContinuityAuthenticator(
mock.MagicMock(server="demo_server.org"), None)
self.auth.rec_token.perform = mock.MagicMock(
name="rec_token_perform", side_effect=gen_client_resp)
self.auth.proof_of_pos.perform = mock.MagicMock(
name="proof_of_pos_perform", side_effect=gen_client_resp)
def test_rec_token1(self):
token = achallenges.RecoveryToken(challb=None, domain="0")
responses = self.auth.perform([token])
self.assertEqual(responses, ["RecoveryToken0"])
def test_rec_token5(self):
tokens = []
for i in xrange(5):
tokens.append(achallenges.RecoveryToken(challb=None, domain=str(i)))
responses = self.auth.perform(tokens)
self.assertEqual(len(responses), 5)
for i in xrange(5):
self.assertEqual(responses[i], "RecoveryToken%d" % i)
def test_pop_and_rec_token(self):
def test_pop(self):
achalls = []
for i in xrange(4):
if i % 2 == 0:
achalls.append(achallenges.RecoveryToken(challb=None,
domain=str(i)))
else:
achalls.append(achallenges.ProofOfPossession(challb=None,
domain=str(i)))
achalls.append(achallenges.ProofOfPossession(
challb=None, domain=str(i)))
responses = self.auth.perform(achalls)
self.assertEqual(len(responses), 4)
for i in xrange(4):
if i % 2 == 0:
self.assertEqual(responses[i], "RecoveryToken%d" % i)
else:
self.assertEqual(responses[i], "ProofOfPossession%d" % i)
self.assertEqual(responses[i], "ProofOfPossession%d" % i)
def test_unexpected(self):
self.assertRaises(
errors.ContAuthError, self.auth.perform, [
achallenges.DVSNI(challb=None, domain="0", key="invalid_key")])
achallenges.DVSNI(
challb=None, domain="0", account_key="invalid_key"),])
def test_chall_pref(self):
self.assertEqual(
self.auth.get_chall_pref("example.com"),
[challenges.ProofOfPossession, challenges.RecoveryToken])
[challenges.ProofOfPossession])
class CleanupTest(unittest.TestCase):
@ -75,24 +51,11 @@ class CleanupTest(unittest.TestCase):
self.auth = ContinuityAuthenticator(
mock.MagicMock(server="demo_server.org"), None)
self.mock_cleanup = mock.MagicMock(name="rec_token_cleanup")
self.auth.rec_token.cleanup = self.mock_cleanup
def test_rec_token2(self):
token1 = achallenges.RecoveryToken(challb=None, domain="0")
token2 = achallenges.RecoveryToken(challb=None, domain="1")
self.auth.cleanup([token1, token2])
self.assertEqual(self.mock_cleanup.call_args_list,
[mock.call(token1), mock.call(token2)])
def test_unexpected(self):
token = achallenges.RecoveryToken(challb=None, domain="0")
unexpected = achallenges.DVSNI(challb=None, domain="0", key="dummy_key")
self.assertRaises(
errors.ContAuthError, self.auth.cleanup, [token, unexpected])
unexpected = achallenges.DVSNI(
challb=None, domain="0", account_key="dummy_key")
self.assertRaises(errors.ContAuthError, self.auth.cleanup, [unexpected])
def gen_client_resp(chall):

View file

@ -227,6 +227,37 @@ class UniqueLineageNameTest(unittest.TestCase):
self.assertRaises(OSError, self._call, "wow")
class SafelyRemoveTest(unittest.TestCase):
"""Tests for letsencrypt.le_util.safely_remove."""
def setUp(self):
self.tmp = tempfile.mkdtemp()
self.path = os.path.join(self.tmp, "foo")
def tearDown(self):
shutil.rmtree(self.tmp)
def _call(self):
from letsencrypt.le_util import safely_remove
return safely_remove(self.path)
def test_exists(self):
with open(self.path, "w"):
pass # just create the file
self._call()
self.assertFalse(os.path.exists(self.path))
def test_missing(self):
self._call()
# no error, yay!
self.assertFalse(os.path.exists(self.path))
@mock.patch("letsencrypt.le_util.os.remove")
def test_other_error_passthrough(self, mock_remove):
mock_remove.side_effect = OSError
self.assertRaises(OSError, self._call)
class SafeEmailTest(unittest.TestCase):
"""Test safe_email."""
@classmethod

View file

@ -1,80 +0,0 @@
"""Tests for recovery_token.py."""
import os
import unittest
import shutil
import tempfile
import mock
from acme import challenges
from letsencrypt import achallenges
class RecoveryTokenTest(unittest.TestCase):
def setUp(self):
from letsencrypt.recovery_token import RecoveryToken
server = "demo_server"
self.base_dir = tempfile.mkdtemp("tokens")
self.token_dir = os.path.join(self.base_dir, server)
self.rec_token = RecoveryToken(server, self.base_dir)
def tearDown(self):
shutil.rmtree(self.base_dir)
def test_store_token(self):
self.rec_token.store_token("example.com", 111)
path = os.path.join(self.token_dir, "example.com")
self.assertTrue(os.path.isfile(path))
with open(path) as token_fd:
self.assertEqual(token_fd.read(), "111")
def test_requires_human(self):
self.rec_token.store_token("example2.com", 222)
self.assertFalse(self.rec_token.requires_human("example2.com"))
self.assertTrue(self.rec_token.requires_human("example3.com"))
def test_cleanup(self):
self.rec_token.store_token("example3.com", 333)
self.assertFalse(self.rec_token.requires_human("example3.com"))
self.rec_token.cleanup(achallenges.RecoveryToken(
challb=challenges.RecoveryToken(), domain="example3.com"))
self.assertTrue(self.rec_token.requires_human("example3.com"))
# Shouldn't throw an error
self.rec_token.cleanup(achallenges.RecoveryToken(
challb=None, domain="example4.com"))
# SHOULD throw an error (OSError other than nonexistent file)
self.assertRaises(
OSError, self.rec_token.cleanup,
achallenges.RecoveryToken(
challb=None, domain=("a" + "r" * 10000 + ".com")))
def test_perform_stored(self):
self.rec_token.store_token("example4.com", 444)
response = self.rec_token.perform(
achallenges.RecoveryToken(
challb=challenges.RecoveryToken(), domain="example4.com"))
self.assertEqual(
response, challenges.RecoveryTokenResponse(token="444"))
@mock.patch("letsencrypt.recovery_token.zope.component.getUtility")
def test_perform_not_stored(self, mock_input):
mock_input().input.side_effect = [(0, "555"), (1, "000")]
response = self.rec_token.perform(
achallenges.RecoveryToken(
challb=challenges.RecoveryToken(), domain="example5.com"))
self.assertEqual(
response, challenges.RecoveryTokenResponse(token="555"))
response = self.rec_token.perform(
achallenges.RecoveryToken(
challb=challenges.RecoveryToken(), domain="example6.com"))
self.assertTrue(response is None)
if __name__ == "__main__":
unittest.main() # pragma: no cover

View file

@ -10,6 +10,7 @@ import mock
import pytz
from letsencrypt import configuration
from letsencrypt import errors
from letsencrypt.storage import ALL_FOUR
from letsencrypt.tests import test_util
@ -78,7 +79,8 @@ class RenewableCertTests(unittest.TestCase):
for kind in ALL_FOUR:
config["cert"] = "nonexistent_" + kind + ".pem"
config.filename = "nonexistent_sillyfile"
self.assertRaises(ValueError, storage.RenewableCert, config, defaults)
self.assertRaises(
errors.CertStorageError, storage.RenewableCert, config, defaults)
self.assertRaises(TypeError, storage.RenewableCert, "fun", defaults)
def test_renewal_incomplete_config(self):
@ -92,7 +94,8 @@ class RenewableCertTests(unittest.TestCase):
config["chain"] = "imaginary_chain.pem"
config["fullchain"] = "imaginary_fullchain.pem"
config.filename = "imaginary_config.conf"
self.assertRaises(ValueError, storage.RenewableCert, config, defaults)
self.assertRaises(
errors.CertStorageError, storage.RenewableCert, config, defaults)
def test_consistent(self): # pylint: disable=too-many-statements
oldcert = self.test_rc.cert
@ -481,11 +484,13 @@ class RenewableCertTests(unittest.TestCase):
# Now trigger the detection of already existing files
os.mkdir(os.path.join(
self.cli_config.live_dir, "the-lineage.com-0002"))
self.assertRaises(ValueError, storage.RenewableCert.new_lineage,
self.assertRaises(errors.CertStorageError,
storage.RenewableCert.new_lineage,
"the-lineage.com", "cert3", "privkey3", "chain3",
None, self.defaults, self.cli_config)
os.mkdir(os.path.join(self.cli_config.archive_dir, "other-example.com"))
self.assertRaises(ValueError, storage.RenewableCert.new_lineage,
self.assertRaises(errors.CertStorageError,
storage.RenewableCert.new_lineage,
"other-example.com", "cert4", "privkey4", "chain4",
None, self.defaults, self.cli_config)
# Make sure it can accept renewal parameters
@ -518,20 +523,27 @@ class RenewableCertTests(unittest.TestCase):
def test_invalid_config_filename(self, mock_uln):
from letsencrypt import storage
mock_uln.return_value = "this_does_not_end_with_dot_conf", "yikes"
self.assertRaises(ValueError, storage.RenewableCert.new_lineage,
self.assertRaises(errors.CertStorageError,
storage.RenewableCert.new_lineage,
"example.com", "cert", "privkey", "chain",
None, self.defaults, self.cli_config)
def test_bad_kind(self):
self.assertRaises(ValueError, self.test_rc.current_target, "elephant")
self.assertRaises(ValueError, self.test_rc.current_version, "elephant")
self.assertRaises(ValueError, self.test_rc.version, "elephant", 17)
self.assertRaises(ValueError, self.test_rc.available_versions,
"elephant")
self.assertRaises(ValueError, self.test_rc.newest_available_version,
"elephant")
self.assertRaises(ValueError, self.test_rc.update_link_to,
"elephant", 17)
self.assertRaises(
errors.CertStorageError, self.test_rc.current_target, "elephant")
self.assertRaises(
errors.CertStorageError, self.test_rc.current_version, "elephant")
self.assertRaises(
errors.CertStorageError, self.test_rc.version, "elephant", 17)
self.assertRaises(
errors.CertStorageError,
self.test_rc.available_versions, "elephant")
self.assertRaises(
errors.CertStorageError,
self.test_rc.newest_available_version, "elephant")
self.assertRaises(
errors.CertStorageError,
self.test_rc.update_link_to, "elephant", 17)
def test_ocsp_revoked(self):
# XXX: This is currently hardcoded to False due to a lack of an
@ -651,7 +663,7 @@ class RenewableCertTests(unittest.TestCase):
f.write("incomplete = configfile\n")
renewer.main(self.defaults, args=[
'--config-dir', self.cli_config.config_dir])
# The ValueError is caught inside and nothing happens.
# The errors.CertStorageError is caught inside and nothing happens.
if __name__ == "__main__":

View file

@ -1,14 +1,10 @@
#!/bin/sh -xe
#!/bin/sh -x
# Download and run Boulder instance for integration testing
export GOPATH="${GOPATH:-/tmp/go}"
# $ go get github.com/letsencrypt/boulder
# package github.com/letsencrypt/boulder
# imports github.com/letsencrypt/boulder
# imports github.com/letsencrypt/boulder: no buildable Go source files in /tmp/go/src/github.com/letsencrypt/boulder
go get -d github.com/letsencrypt/boulder/cmd/boulder
go get -d github.com/letsencrypt/boulder
cd $GOPATH/src/github.com/letsencrypt/boulder
./test/create_db.sh
./start.py &
# Hopefully start.py bootstraps before integration test is started...

15
tox.ini
View file

@ -6,7 +6,7 @@
# acme and letsencrypt are not yet on pypi, so when Tox invokes
# "install *.zip", it will not find deps
skipsdist = true
envlist = py26,py27,py33,py34,cover,lint
envlist = py26,py27,cover,lint
[testenv]
commands =
@ -22,16 +22,6 @@ setenv =
PYTHONHASHSEED = 0
# https://testrun.org/tox/latest/example/basic.html#special-handling-of-pythonhas
[testenv:py33]
commands =
pip install -e acme[testing]
nosetests acme
[testenv:py34]
commands =
pip install -e acme[testing]
nosetests acme
[testenv:cover]
basepython = python2.7
commands =
@ -45,8 +35,9 @@ basepython = python2.7
# duplicate code checking; if one of the commands fails, others will
# continue, but tox return code will reflect previous error
commands =
pip install -r requirements.txt -e acme -e .[dev] -e letsencrypt-apache -e letsencrypt-nginx
pip install -r requirements.txt -e acme -e .[dev] -e letsencrypt-apache -e letsencrypt-nginx -e letsencrypt-compatibility-test
pylint --rcfile=.pylintrc letsencrypt
pylint --rcfile=.pylintrc acme/acme
pylint --rcfile=.pylintrc letsencrypt-apache/letsencrypt_apache
pylint --rcfile=.pylintrc letsencrypt-nginx/letsencrypt_nginx
pylint --rcfile=.pylintrc letsencrypt-compatibility-test/letsencrypt_compatibility_test