From 9a71b18b851c58bd7cb7c6a3fdfdaee23bdc3d94 Mon Sep 17 00:00:00 2001 From: pypoet Date: Fri, 23 Oct 2015 18:26:26 -0700 Subject: [PATCH] Fix updates and merges, add testing to make sure they stay fixed. --- Config.py | 65 ++++++++++++++++++++++++++++++++++++++------- TestConfig.py | 73 +++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 128 insertions(+), 10 deletions(-) create mode 100644 TestConfig.py diff --git a/Config.py b/Config.py index 740de35c0..9cfbad3f0 100644 --- a/Config.py +++ b/Config.py @@ -1,6 +1,7 @@ from datetime import datetime from dateutil import parser import json +import logging import pprint @@ -8,6 +9,10 @@ import pprint make do json in *and* out, differences between configs and config extension. """ +#TODO scope logging and handlers better, control verbosity by command line flags +logger = logging.getLogger(__name__) +logger.addHandler(logging.StreamHandler()) + def parse_bool_from_json(value, attr_name): if value in ('true', '1', 1, 'yes'): @@ -90,7 +95,11 @@ class BaseConfig(object): return s def update(self, newer_config, merge=False, **kwargs): + # removed 'merge' kw arg - and it was passed to constructor + # make a note to not do that, consume it on the param list fresh_config = self.__class__(**kwargs) + logger.debug('from parent update kwargs %s' % kwargs) + logger.debug('from parent update merge %s' % merge) if not isinstance(newer_config, self.__class__): raise ConfigError('Attempting to update a %s with a %s' % ( self.__class__, @@ -108,6 +117,7 @@ class BaseConfig(object): def merge(self, newer_config, **kwargs): kwargs['merge'] = True + logger.debug('from parent merge: %s' % kwargs) return self.update(newer_config, **kwargs) def to_json(self): @@ -163,8 +173,6 @@ class Config(BaseConfig): #TODO add this new_config = Config() raise NotImplemented - - def from_json_dict(self, json_dict): """Assign JSON data to Config properties and declare sub-objects. @@ -187,8 +195,7 @@ class Config(BaseConfig): elif key == 'acceptable-mxs': self.make_acceptable_mxs_dict(val) else: - #TODO log warning - print 'Unknown attribute "%s", skipping' % key + logger.warn('Unknown attribute "%s", skipping' % key) @property def author(self): @@ -294,8 +301,7 @@ class TLSPolicy(BaseConfig): elif key == 'require-valid-certificate': self.require_valid_certificate = val else: - #TODO wat, log this instead - print 'Unknown key %s' % key + logger.warn('Unknown key %s' % key) def is_valid(self): """Do simple check that config contains all required values. @@ -313,9 +319,17 @@ class TLSPolicy(BaseConfig): return True def update(self, newer_policy, **kwargs): + if not kwargs.get('domain_suffix'): + kwargs['domain_suffix'] = self.domain_suffix fresh_policy = super(self.__class__, self).update(newer_policy, - domain_suffix=self.domain_suffix) - fresh_policy.domain_suffix = self.domain_suffix + **kwargs) + logger.debug('from TLS child update %s' % kwargs) + return fresh_policy + + def merge(self, newer_policy, **kwargs): + logger.debug('from TLS child merge: %s' % kwargs) + fresh_policy = super(self.__class__, self).merge(newer_policy, + domain_suffix=self.domain_suffix) return fresh_policy @property @@ -385,6 +399,14 @@ class AcceptableMX(BaseConfig): unique_domain_suffixes.add(domain_suffix) self._data['accept-mx-domains'] = list(unique_domain_suffixes) + @property + def comment(self): + return self._data.get('comment') + + @comment.setter + def comment(self, value): + self._data['comment'] = verify_string(value, 'comment') + def is_valid(self): """Check to make sure there is one acceptable domain suffix. @@ -407,9 +429,32 @@ class AcceptableMX(BaseConfig): self.add_acceptable_mx(domain_suffix) else: self.add_acceptable_mx(val) + elif key == 'comment': + self.comment = val else: - #TODO add logging for this - print 'warning: unknown key %s' % key + logger.warn('warning: unknown key %s' % key) + + def update(self, newer_policy, **kwargs): + logger.debug('from MX child update got %s' % kwargs) + if not kwargs.get('domain'): + kwargs['domain'] = self.domain + fresh_policy = super(self.__class__, self).update(newer_policy, + **kwargs) + if kwargs.get('merge'): + new_accepted_mxs = set(self.accept_mx_domains) + new_accepted_mxs = new_accepted_mxs.union(newer_policy.accept_mx_domains) + else: + new_accepted_mxs = newer_policy.accept_mx_domains + for domain in new_accepted_mxs: + fresh_policy.add_acceptable_mx(domain) + + return fresh_policy + + def merge(self, newer_policy, **kwargs): + logger.debug('from MX child merge: %s' % kwargs) + fresh_policy = super(self.__class__, self).merge(newer_policy, + **kwargs) + return fresh_policy class ConfigError(ValueError): diff --git a/TestConfig.py b/TestConfig.py new file mode 100644 index 000000000..043eb2dca --- /dev/null +++ b/TestConfig.py @@ -0,0 +1,73 @@ +import copy +import logging +import unittest + +import Config + +logger = logging.getLogger(__name__) +logger.addHandler(logging.StreamHandler()) + + +class TestTLSPolicy(unittest.TestCase): + + def setUp(self): + self.old_config = Config.TLSPolicy(domain_suffix='.eff.org') + self.old_config.comment = 'Testing EFF.org TLS policy' + self.old_config.require_tls = True + self.old_config.require_valid_certificate = False + self.old_config.min_tls_version = 'TLSv1' + self.old_config.enforce_mode = 'log-only' + + self.new_config = Config.TLSPolicy(domain_suffix='.eff.org') + self.new_config.require_valid_certificate = True + self.new_config.min_tls_version = 'TLSv1.2' + self.new_config.enforce_mode = 'enforce' + + def testUpdateDropsOldSettings(self): + logger.debug('old: %s' % self.old_config) + logger.debug('new: %s' % self.new_config) + tls_policy = self.old_config.update(self.new_config) + logger.debug('just generated: %s' % tls_policy) + self.assertFalse(any([tls_policy.require_tls, tls_policy.comment])) + + def testMergeKeepsOldSettings(self): + logger.debug('old: %s' % self.old_config) + logger.debug('new: %s' % self.new_config) + tls_policy = self.old_config.merge(self.new_config, merge=True) + logger.debug('just generated: %s' % tls_policy) + self.assertTrue(all([tls_policy.require_tls, tls_policy.comment])) + + def testUpdateGetsNameSet(self): + tls_policy = self.old_config.update(self.new_config) + self.assertEquals(tls_policy.domain_suffix, self.old_config.domain_suffix) + + +class TestAcceptableMX(unittest.TestCase): + + def setUp(self): + self.old_config = Config.AcceptableMX(domain='eff.org') + self.old_config.add_acceptable_mx('.eff.org') + + def testUpdateDropsOldMXs(self): + new_bogus_mx = '.testing.eff.org' + new_config = Config.AcceptableMX(domain='eff.org') + new_config.add_acceptable_mx(new_bogus_mx) + updated_config = self.old_config.update(new_config) + self.assertNotIn('.eff.org', updated_config.accept_mx_domains) + + def testMergeKeepsOldMXs(self): + new_bogus_mx = '.testing.eff.org' + new_config = Config.AcceptableMX(domain='eff.org') + new_config.add_acceptable_mx(new_bogus_mx) + updated_config = self.old_config.merge(new_config) + self.assertListEqual(sorted(['.eff.org', '.testing.eff.org']), + sorted(updated_config.accept_mx_domains)) + + def testUpdateGetsNameSet(self): + new_policy = Config.AcceptableMX(domain=self.old_config.domain) + mx_policy = self.old_config.update(new_policy) + self.assertEquals(mx_policy.domain, self.old_config.domain) + + +if __name__ == '__main__': + unittest.main()