Rounds out missing features and is now on par with ConfigParser.py.

Still missing logging, composibility and a couple of attributes.
This commit is contained in:
pypoet 2015-10-14 02:49:34 -04:00
parent fe17c873c0
commit 147f58bdbc

165
Config.py
View file

@ -1,11 +1,14 @@
from datetime import datetime
from dateutil import parser
import json
import pprint
"""Idea here being to start with something that is decomposed so it's easier to
make do json in *and* out, differences between configs and config extension.
"""
def parse_bool_from_json(value, attr_name):
if value in ('true', '1', 1, 'yes'):
bool_value = True
@ -14,26 +17,27 @@ def parse_bool_from_json(value, attr_name):
elif value in (True, False):
bool_value = value
else:
raise ValueError('Config value %s is an invalid boolean value.' % attr_name)
raise ConfigError('Config value %s is an invalid boolean value.' % attr_name)
return bool_value
def parse_timestamp(value, attr_name):
#TODO support full extended timestamp "2014-06-06T14:30:16+00:00" as well
if isinstance(value, datetime):
dt = value
else:
try:
ts = int(value)
dt = datetime.fromtimestamp(ts)
except:
raise ValueError('Config value %s is an invalid timestamp integer.' % attr_name)
return dt
return value
try:
ts = int(value)
return datetime.fromtimestamp(ts)
except (TypeError, ValueError):
pass
try:
return parser.parse(value)
except (TypeError, ValueError):
raise ConfigError('Config value %s is an invalid date or timestamp.' % attr_name)
def verify_member_of(value, member_list, attr_name):
if value not in member_list:
raise ValueError('Config value "%s" must be one of (%s)' % (
raise ConfigError('Config value "%s" must be one of (%s)' % (
attr_name, ', '.join(member_list))
)
return value
@ -41,13 +45,67 @@ def verify_member_of(value, member_list, attr_name):
def verify_string(value, attr_name, max_length=200):
if not isinstance(value, (str, unicode)):
raise TypeError('Config value %s must be a string.' % attr_name)
raise ConfigError('Config value %s must be a string.' % attr_name)
if len(value) > max_length:
raise ValueError('Config value %s is too long.' % attr_name)
raise ConfigError('Config value %s is too long.' % attr_name)
return value
class Config(object):
def to_dict(config_dict):
"""Cleans up BaseConfig children to be serialized."""
d = {}
for key, val in config_dict.iteritems():
if isinstance(val, BaseConfig):
d[key] = to_dict(val._data)
elif isinstance(val, datetime):
d[key] = val.strftime('%Y-%m-%dT%H:%M:%S%z')
elif isinstance(val, dict):
d[key] = to_dict(val)
else:
d[key] = val
return d
class BaseConfig(object):
"""Top level config class for common methods."""
def __init__(self):
# container for validated properties with JSON names
self._data = {}
def __repr__(self):
s = '< %s %s >' % (self.__class__.__name__,
pprint.pformat(self._data))
return s
def to_json(self):
d = to_dict(self._data)
return json.dumps(d)
def write_to_json_file(self, json_filename, f_open=open):
data = self.to_json()
try:
with f_open(json_filename, 'w') as f:
f.write(data)
except IOError:
raise
def load_from_json_file(self, json_filename, f_open=open):
try:
with f_open(json_filename, 'r') as f:
json_str = f.read()
json_dict = json.loads(json_str)
except IOError:
raise
except ValueError:
raise ConfigError('No valid JSON found in file: %s' % json_filename)
self.from_json_dict(json_dict)
def from_json_dict(self, json_dict):
raise NotImplmented('BaseConfig should not be populated.')
class Config(BaseConfig):
"""Config container for StartTLS Everywhere configuration.
Intended as a simple container that unifies where validatation occurs,
@ -59,37 +117,20 @@ class Config(object):
"""
def __init__(self):
# container for validated properties with JSON names
self._data = {}
self.tls_policies = []
self.acceptable_mxs = []
super(self.__class__, self).__init__()
self._data['tls-policies'] = {}
self._data['acceptable-mxs'] = {}
def __add__(self, other_config):
"""Allow addition but not really of *full* configs, need to flesh that out."""
#TODO add this
raise NotImplemented
def __repr__(self):
#TODO fix this generically, and maybe put it in the inheritence tree
s = '<StartTLS-Everywhere Config\n%s\n>' % (self._data.iteritems())
return s
def update(self, other_config):
"""Update properties of config from a 'newer' config and force verification."""
#TODO add this
raise NotImplemented
def load_from_json_file(self, json_filename, f_open=open):
#TODO add robust catching and checking
# try:
with f_open(json_filename, 'r') as f:
json_str = f.read()
json_dict = json.loads(json_str)
# except oserr
# except json parse err
self.from_json_dict(json_dict)
def from_json_dict(self, json_dict):
"""Assign JSON data to Config properties and declare sub-objects.
@ -107,17 +148,13 @@ class Config(object):
elif key == 'timestamp':
self.timestamp = val
elif key == 'tls-policies':
self.tls_policies = self.make_tls_policy_dict(val)
self.make_tls_policy_dict(val)
elif key == 'acceptable-mxs':
self.acceptable_mxs = self.make_acceptable_mxs_dict(val)
self.make_acceptable_mxs_dict(val)
else:
#TODO log warning
print 'Unknown attribute "%s", skipping' % key
def to_json(self):
#TODO implement output and make sure it can be re-input with identical results
raise NotImplemented
@property
def author(self):
return self._data.get('author')
@ -151,47 +188,42 @@ class Config(object):
self._data['timestamp'] = parse_timestamp(value, 'timestamp')
def make_tls_policy_dict(self, policy_dict):
tls_policy_dict = {}
tls_policy_dict = self._data['tls-policies']
for domain_suffix, settings in policy_dict.iteritems():
new_domain_policy = TLSPolicy(domain_suffix)
#TODO define config errs and use
#try
new_domain_policy.from_json_dict(settings)
#except config err
try:
new_domain_policy.from_json_dict(settings)
except ConfigError as e:
raise
tls_policy_dict[domain_suffix] = new_domain_policy
return tls_policy_dict
def make_acceptable_mxs_dict(self, mxs_dict):
acceptable_mxs_dict = {}
acceptable_mxs_dict = self._data['acceptable-mxs']
for domain, settings in mxs_dict.iteritems():
new_domain_policy = AcceptableMX(domain)
#TODO define config errs and use
#try
new_domain_policy.from_json_dict(settings)
#except config err
try:
new_domain_policy.from_json_dict(settings)
except ConfigError as e:
raise
acceptable_mxs_dict[domain] = new_domain_policy
return acceptable_mxs_dict
def is_valid(self):
#TODO implement with checks to make sure domains don't overlap
# and every acceptable mx has a tls policy, etc.
raise NotImplemented
class TLSPolicy(object):
class TLSPolicy(BaseConfig):
ENFORCE_MODES = ('enforce', 'log-only')
TLS_VERSIONS = ('TLSv1', 'TLSv1.1', 'TLSv1.2', 'TLSv1.3')
def __init__(self, domain_suffix):
# container for validated properties with JSON names
self._data = {}
super(self.__class__, self).__init__()
self.domain_suffix = domain_suffix
#TODO add me
self.accept_spki_hashs = None
#TODO add me
self.error_notification = None
#TODO add support for two designed but yet unsupported attrs
# self._data['accept-spki-hashs'] = None
# self._data['error-notification'] = None
def from_json_dict(self, json_dict):
for key, val in json_dict.iteritems():
@ -268,7 +300,7 @@ class TLSPolicy(object):
self._data['require-valid-certificate'] = parse_bool_from_json(value, 'require-valid-certificate')
class AcceptableMX(object):
class AcceptableMX(BaseConfig):
"""Holds acceptable MX domain suffixes for a single mail serving domain.
Such as for gmail.com that single mail serving suffix domain is:
@ -278,9 +310,8 @@ class AcceptableMX(object):
for the suffix domains.
"""
def __init__(self, domain):
super(self.__class__, self).__init__()
self.domain = domain
# container for validated properties with JSON names
self._data = {}
self._data['accept-mx-domains'] = []
def add_acceptable_mx(self, domain_suffix):
@ -313,3 +344,9 @@ class AcceptableMX(object):
else:
#TODO add logging for this
print 'warning: unknown key %s' % key
class ConfigError(ValueError):
def __init__(self, message):
super(self.__class__, self).__init__(message)