Fix updates and merges, add testing to make sure they stay fixed.

This commit is contained in:
pypoet 2015-10-23 18:26:26 -07:00
parent 6da5de6b19
commit 9a71b18b85
2 changed files with 128 additions and 10 deletions

View file

@ -1,6 +1,7 @@
from datetime import datetime
from dateutil import parser
import json
import logging
import pprint
@ -8,6 +9,10 @@ import pprint
make do json in *and* out, differences between configs and config extension.
"""
#TODO scope logging and handlers better, control verbosity by command line flags
logger = logging.getLogger(__name__)
logger.addHandler(logging.StreamHandler())
def parse_bool_from_json(value, attr_name):
if value in ('true', '1', 1, 'yes'):
@ -90,7 +95,11 @@ class BaseConfig(object):
return s
def update(self, newer_config, merge=False, **kwargs):
# removed 'merge' kw arg - and it was passed to constructor
# make a note to not do that, consume it on the param list
fresh_config = self.__class__(**kwargs)
logger.debug('from parent update kwargs %s' % kwargs)
logger.debug('from parent update merge %s' % merge)
if not isinstance(newer_config, self.__class__):
raise ConfigError('Attempting to update a %s with a %s' % (
self.__class__,
@ -108,6 +117,7 @@ class BaseConfig(object):
def merge(self, newer_config, **kwargs):
kwargs['merge'] = True
logger.debug('from parent merge: %s' % kwargs)
return self.update(newer_config, **kwargs)
def to_json(self):
@ -163,8 +173,6 @@ class Config(BaseConfig):
#TODO add this
new_config = Config()
raise NotImplemented
def from_json_dict(self, json_dict):
"""Assign JSON data to Config properties and declare sub-objects.
@ -187,8 +195,7 @@ class Config(BaseConfig):
elif key == 'acceptable-mxs':
self.make_acceptable_mxs_dict(val)
else:
#TODO log warning
print 'Unknown attribute "%s", skipping' % key
logger.warn('Unknown attribute "%s", skipping' % key)
@property
def author(self):
@ -294,8 +301,7 @@ class TLSPolicy(BaseConfig):
elif key == 'require-valid-certificate':
self.require_valid_certificate = val
else:
#TODO wat, log this instead
print 'Unknown key %s' % key
logger.warn('Unknown key %s' % key)
def is_valid(self):
"""Do simple check that config contains all required values.
@ -313,9 +319,17 @@ class TLSPolicy(BaseConfig):
return True
def update(self, newer_policy, **kwargs):
if not kwargs.get('domain_suffix'):
kwargs['domain_suffix'] = self.domain_suffix
fresh_policy = super(self.__class__, self).update(newer_policy,
domain_suffix=self.domain_suffix)
fresh_policy.domain_suffix = self.domain_suffix
**kwargs)
logger.debug('from TLS child update %s' % kwargs)
return fresh_policy
def merge(self, newer_policy, **kwargs):
logger.debug('from TLS child merge: %s' % kwargs)
fresh_policy = super(self.__class__, self).merge(newer_policy,
domain_suffix=self.domain_suffix)
return fresh_policy
@property
@ -385,6 +399,14 @@ class AcceptableMX(BaseConfig):
unique_domain_suffixes.add(domain_suffix)
self._data['accept-mx-domains'] = list(unique_domain_suffixes)
@property
def comment(self):
return self._data.get('comment')
@comment.setter
def comment(self, value):
self._data['comment'] = verify_string(value, 'comment')
def is_valid(self):
"""Check to make sure there is one acceptable domain suffix.
@ -407,9 +429,32 @@ class AcceptableMX(BaseConfig):
self.add_acceptable_mx(domain_suffix)
else:
self.add_acceptable_mx(val)
elif key == 'comment':
self.comment = val
else:
#TODO add logging for this
print 'warning: unknown key %s' % key
logger.warn('warning: unknown key %s' % key)
def update(self, newer_policy, **kwargs):
logger.debug('from MX child update got %s' % kwargs)
if not kwargs.get('domain'):
kwargs['domain'] = self.domain
fresh_policy = super(self.__class__, self).update(newer_policy,
**kwargs)
if kwargs.get('merge'):
new_accepted_mxs = set(self.accept_mx_domains)
new_accepted_mxs = new_accepted_mxs.union(newer_policy.accept_mx_domains)
else:
new_accepted_mxs = newer_policy.accept_mx_domains
for domain in new_accepted_mxs:
fresh_policy.add_acceptable_mx(domain)
return fresh_policy
def merge(self, newer_policy, **kwargs):
logger.debug('from MX child merge: %s' % kwargs)
fresh_policy = super(self.__class__, self).merge(newer_policy,
**kwargs)
return fresh_policy
class ConfigError(ValueError):

73
TestConfig.py Normal file
View file

@ -0,0 +1,73 @@
import copy
import logging
import unittest
import Config
logger = logging.getLogger(__name__)
logger.addHandler(logging.StreamHandler())
class TestTLSPolicy(unittest.TestCase):
def setUp(self):
self.old_config = Config.TLSPolicy(domain_suffix='.eff.org')
self.old_config.comment = 'Testing EFF.org TLS policy'
self.old_config.require_tls = True
self.old_config.require_valid_certificate = False
self.old_config.min_tls_version = 'TLSv1'
self.old_config.enforce_mode = 'log-only'
self.new_config = Config.TLSPolicy(domain_suffix='.eff.org')
self.new_config.require_valid_certificate = True
self.new_config.min_tls_version = 'TLSv1.2'
self.new_config.enforce_mode = 'enforce'
def testUpdateDropsOldSettings(self):
logger.debug('old: %s' % self.old_config)
logger.debug('new: %s' % self.new_config)
tls_policy = self.old_config.update(self.new_config)
logger.debug('just generated: %s' % tls_policy)
self.assertFalse(any([tls_policy.require_tls, tls_policy.comment]))
def testMergeKeepsOldSettings(self):
logger.debug('old: %s' % self.old_config)
logger.debug('new: %s' % self.new_config)
tls_policy = self.old_config.merge(self.new_config, merge=True)
logger.debug('just generated: %s' % tls_policy)
self.assertTrue(all([tls_policy.require_tls, tls_policy.comment]))
def testUpdateGetsNameSet(self):
tls_policy = self.old_config.update(self.new_config)
self.assertEquals(tls_policy.domain_suffix, self.old_config.domain_suffix)
class TestAcceptableMX(unittest.TestCase):
def setUp(self):
self.old_config = Config.AcceptableMX(domain='eff.org')
self.old_config.add_acceptable_mx('.eff.org')
def testUpdateDropsOldMXs(self):
new_bogus_mx = '.testing.eff.org'
new_config = Config.AcceptableMX(domain='eff.org')
new_config.add_acceptable_mx(new_bogus_mx)
updated_config = self.old_config.update(new_config)
self.assertNotIn('.eff.org', updated_config.accept_mx_domains)
def testMergeKeepsOldMXs(self):
new_bogus_mx = '.testing.eff.org'
new_config = Config.AcceptableMX(domain='eff.org')
new_config.add_acceptable_mx(new_bogus_mx)
updated_config = self.old_config.merge(new_config)
self.assertListEqual(sorted(['.eff.org', '.testing.eff.org']),
sorted(updated_config.accept_mx_domains))
def testUpdateGetsNameSet(self):
new_policy = Config.AcceptableMX(domain=self.old_config.domain)
mx_policy = self.old_config.update(new_policy)
self.assertEquals(mx_policy.domain, self.old_config.domain)
if __name__ == '__main__':
unittest.main()