Services: Unbound DNS: Blocklists - split logic in update_blocklist() so we can reuse it easily in list_configuration().

Functionally this shouldn't change anything, but when building additional handlers, it's practical to show priorities of the ones that are registered.

To use the list action, simply call:

./blocklists.py list
This commit is contained in:
Ad Schellevis 2026-02-15 13:44:15 +01:00
parent 4a6094b001
commit f399b33df9
2 changed files with 38 additions and 14 deletions

View file

@ -43,8 +43,9 @@ if __name__ == '__main__':
help="Action to perform"
)
# update subcommand
# update/list subcommands
subparsers.add_parser("update", help="Update blocklist")
subparsers.add_parser("list", help="List blocklist configuration sorted by priority")
args = parser.parse_args()
@ -54,3 +55,5 @@ if __name__ == '__main__':
parser_obj.modify_blocklist(args.uuid, args.domain, args.action)
elif args.command == "update":
parser_obj.update_blocklist()
elif args.command == "list":
parser_obj.list_configuration()

View file

@ -204,18 +204,9 @@ class BlocklistParser:
# atomically replace the current dnsbl so unbound can pick up on it
os.replace('/var/unbound/data/dnsbl.json.new', '/var/unbound/data/dnsbl.json')
def update_blocklist(self):
merged_result = {
'data': {},
'config': {
'general': {
'has_wildcards': False
}
}
}
# collect data from all handlers
def _get_policies(self, add_handler=False):
# collect and sort policies by priority
policies = {}
blocklists = {}
for hidx, handler in enumerate(self.handlers):
for policy in handler.get_policies():
policies[policy['id']] = policy.copy()
@ -229,6 +220,17 @@ class BlocklistParser:
# persist sort criteria, network prio and handler index
policies[policy['id']]['prio'] = prio
policies[policy['id']]['hidx'] = hidx
if add_handler:
policies[policy['id']]['handler'] = handler
# sort policies by priority (smallest net first)
policies = dict(sorted(policies.items(), key=lambda x: (x[1]['prio'], x[1]['hidx'])))
return policies
def _get_blocklists(self):
# return a dictionary with domains and policies
blocklists = {}
for handler in self.handlers:
# blocklist items per domain and policy
for domain, policy, cnf in handler.blocklists_iter():
if domain not in blocklists:
@ -236,9 +238,20 @@ class BlocklistParser:
if policy not in blocklists[domain]:
blocklists[domain][policy] = []
blocklists[domain][policy].append(cnf)
return blocklists
# sort policies by priority (smallest net first)
policies = dict(sorted(policies.items(), key=lambda x: (x[1]['prio'], x[1]['hidx'])))
def update_blocklist(self):
merged_result = {
'data': {},
'config': {
'general': {
'has_wildcards': False
}
}
}
# collect data from all handlers
policies = self._get_policies()
blocklists = self._get_blocklists()
# build domain list sorted by policy priority
data = {}
@ -280,3 +293,11 @@ class BlocklistParser:
handlers.append(cls())
handlers.sort(key=lambda h: h.priority)
self.handlers = handlers
def list_configuration(self):
# small debug function showing configured lists and their priorities
for policy in self._get_policies(True).values():
print (policy['handler'].__class__.__name__)
for item in policy.items():
if item[0] not in ['handler']:
print("\t%s = %s" % (item[0], item[1]))