mirror of
https://gitlab.nic.cz/knot/knot-dns.git
synced 2026-05-28 04:02:31 -04:00
Merge branch 'xfr_lock_nonrcu' into 'master'
nameserver/XFRout: multi-msg locked by rwlock instead RCU... See merge request knot/knot-dns!1765
This commit is contained in:
commit
43de9e7655
12 changed files with 179 additions and 12 deletions
|
|
@ -95,7 +95,7 @@ static void axfr_query_cleanup(knotd_qdata_t *qdata)
|
|||
mm_free(qdata->mm, axfr);
|
||||
|
||||
/* Allow zone changes (finished). */
|
||||
rcu_read_unlock();
|
||||
pthread_rwlock_unlock(&qdata->extra->contents->xfrout_lock);
|
||||
}
|
||||
|
||||
static void axfr_answer_finished(knotd_qdata_t *qdata, knot_pkt_t *pkt, int state)
|
||||
|
|
@ -171,7 +171,7 @@ static int axfr_query_init(knotd_qdata_t *qdata)
|
|||
qdata->extra->ext_finished = &axfr_answer_finished;
|
||||
|
||||
/* No zone changes during multipacket answer (unlocked in axfr_query_cleanup) */
|
||||
rcu_read_lock();
|
||||
pthread_rwlock_rdlock(&qdata->extra->contents->xfrout_lock);
|
||||
|
||||
return KNOT_EOK;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -148,7 +148,7 @@ static void ixfr_answer_cleanup(knotd_qdata_t *qdata)
|
|||
mm_free(qdata->mm, qdata->extra->ext);
|
||||
|
||||
/* Allow zone changes (finished). */
|
||||
rcu_read_unlock();
|
||||
pthread_rwlock_unlock(&qdata->extra->contents->xfrout_lock);
|
||||
}
|
||||
|
||||
static void ixfr_answer_finished(knotd_qdata_t *qdata, knot_pkt_t *pkt, int state)
|
||||
|
|
@ -229,7 +229,7 @@ static int ixfr_answer_init(knotd_qdata_t *qdata, uint32_t *serial_from)
|
|||
qdata->extra->ext_finished = &ixfr_answer_finished;
|
||||
|
||||
/* No zone changes during multipacket answer (unlocked in ixfr_answer_cleanup) */
|
||||
rcu_read_lock();
|
||||
pthread_rwlock_rdlock(&qdata->extra->contents->xfrout_lock);
|
||||
|
||||
return KNOT_EOK;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -16,7 +16,7 @@ const knot_layer_api_t *process_query_layer(void);
|
|||
/*! \brief Query processing intermediate data. */
|
||||
typedef struct knotd_qdata_extra {
|
||||
zone_t *zone; /*!< Zone from which is answered. */
|
||||
const zone_contents_t *contents; /*!< Zone contents from which is answered. */
|
||||
zone_contents_t *contents; /*!< Zone contents from which is answered. */
|
||||
list_t wildcards; /*!< Visited wildcards. */
|
||||
list_t rrsigs; /*!< Section RRSIGs. */
|
||||
uint8_t *opt_rr_pos; /*!< Place of the OPT RR in wire. */
|
||||
|
|
|
|||
|
|
@ -800,6 +800,7 @@ static bool counter_reach(counter_reach_t *counter, size_t increment, size_t lim
|
|||
*/
|
||||
typedef struct {
|
||||
struct rcu_head rcuhead;
|
||||
pthread_t *retry_thread;
|
||||
|
||||
zone_contents_t *free_contents;
|
||||
void (*free_method)(zone_contents_t *);
|
||||
|
|
@ -809,12 +810,27 @@ typedef struct {
|
|||
size_t new_cont_size;
|
||||
} update_clear_ctx_t;
|
||||
|
||||
static void *update_clear_retry(void *arg);
|
||||
|
||||
static void update_clear(struct rcu_head *param)
|
||||
{
|
||||
static counter_reach_t counter = { PTHREAD_MUTEX_INITIALIZER, 0 };
|
||||
|
||||
update_clear_ctx_t *ctx = (update_clear_ctx_t *)param;
|
||||
|
||||
if (ctx->free_contents != NULL) {
|
||||
if (pthread_rwlock_trywrlock(&ctx->free_contents->xfrout_lock) != 0) {
|
||||
if (*ctx->retry_thread == 0 &&
|
||||
pthread_create(ctx->retry_thread, NULL, update_clear_retry, ctx) == 0) {
|
||||
return;
|
||||
}
|
||||
log_zone_debug(ctx->free_contents->apex->owner,
|
||||
"disposal of old contents blocked by outstanding zone transfer");
|
||||
pthread_rwlock_wrlock(&ctx->free_contents->xfrout_lock);
|
||||
}
|
||||
pthread_rwlock_unlock(&ctx->free_contents->xfrout_lock);
|
||||
}
|
||||
|
||||
ctx->free_method(ctx->free_contents);
|
||||
apply_cleanup(ctx->cleanup_apply);
|
||||
free(ctx->cleanup_apply);
|
||||
|
|
@ -826,6 +842,12 @@ static void update_clear(struct rcu_head *param)
|
|||
free(ctx);
|
||||
}
|
||||
|
||||
/*!
 * \brief Thread start routine that re-runs update_clear() on the given context.
 *
 * update_clear() passes this function to pthread_create() together with its
 * own context pointer when the write lock on the old contents cannot be taken
 * immediately.
 *
 * \param arg  Context pointer handed over unchanged to update_clear().
 *
 * \return Always NULL.
 */
static void *update_clear_retry(void *arg)
{
	struct rcu_head *head = arg;

	update_clear(head);

	return NULL;
}
|
||||
|
||||
static void discard_adds_tree(zone_update_t *update)
|
||||
{
|
||||
additionals_tree_free(update->new_cont->adds_tree);
|
||||
|
|
@ -975,6 +997,7 @@ int zone_update_commit(conf_t *conf, zone_update_t *update)
|
|||
|
||||
update_clear_ctx_t *clear_ctx = calloc(1, sizeof(*clear_ctx));
|
||||
if (clear_ctx != NULL) {
|
||||
clear_ctx->retry_thread = &update->zone->update_clear_thr;
|
||||
clear_ctx->free_contents = old_contents;
|
||||
clear_ctx->free_method = (
|
||||
(update->flags & (UPDATE_FULL | UPDATE_HYBRID)) ?
|
||||
|
|
|
|||
|
|
@ -182,6 +182,7 @@ zone_contents_t *zone_contents_new(const knot_dname_t *apex_name, bool use_binod
|
|||
contents->apex->flags |= NODE_FLAGS_APEX;
|
||||
contents->max_ttl = UINT32_MAX;
|
||||
ATOMIC_INIT(contents->dnssec_expire, 0);
|
||||
pthread_rwlock_init(&contents->xfrout_lock, NULL);
|
||||
|
||||
return contents;
|
||||
|
||||
|
|
@ -513,6 +514,7 @@ void zone_contents_free(zone_contents_t *contents)
|
|||
additionals_tree_free(contents->adds_tree);
|
||||
|
||||
ATOMIC_DEINIT(contents->dnssec_expire);
|
||||
pthread_rwlock_destroy(&contents->xfrout_lock);
|
||||
|
||||
free(contents);
|
||||
}
|
||||
|
|
@ -523,6 +525,8 @@ void zone_contents_deep_free(zone_contents_t *contents)
|
|||
return;
|
||||
}
|
||||
|
||||
pthread_rwlock_wrlock(&contents->xfrout_lock);
|
||||
|
||||
if (contents != NULL) {
|
||||
// Delete NSEC3 tree.
|
||||
(void)zone_tree_apply(contents->nsec3_nodes,
|
||||
|
|
@ -533,6 +537,8 @@ void zone_contents_deep_free(zone_contents_t *contents)
|
|||
destroy_node_rrsets_from_tree, NULL);
|
||||
}
|
||||
|
||||
pthread_rwlock_unlock(&contents->xfrout_lock);
|
||||
|
||||
zone_contents_free(contents);
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -5,6 +5,8 @@
|
|||
|
||||
#pragma once
|
||||
|
||||
#include <pthread.h>
|
||||
|
||||
#include "contrib/atomic.h"
|
||||
#include "libdnssec/nsec.h"
|
||||
#include "libknot/rrtype/nsec3param.h"
|
||||
|
|
@ -24,6 +26,10 @@ typedef struct zone_contents {
|
|||
|
||||
trie_t *adds_tree; // "additionals tree" for reverse lookup of nodes affected by additionals
|
||||
|
||||
// Responding to normal queries is protected by rcu_read_lock, but for long
|
||||
// outgoing XFRs, a zone-specific lock is better.
|
||||
pthread_rwlock_t xfrout_lock;
|
||||
|
||||
dnssec_nsec3_params_t nsec3_params;
|
||||
knot_atomic_uint64_t dnssec_expire;
|
||||
size_t size;
|
||||
|
|
|
|||
|
|
@ -210,6 +210,13 @@ void zone_free(zone_t **zone_ptr)
|
|||
|
||||
zone_events_deinit(zone);
|
||||
|
||||
if (zone->update_clear_thr) {
|
||||
pthread_join(zone->update_clear_thr, NULL);
|
||||
}
|
||||
|
||||
/* Free zone contents. Possible wait for XFRout lock. */
|
||||
zone_contents_deep_free(zone->contents);
|
||||
|
||||
knot_dname_free(zone->name, NULL);
|
||||
|
||||
free_ddns_queue(zone);
|
||||
|
|
@ -228,9 +235,6 @@ void zone_free(zone_t **zone_ptr)
|
|||
pthread_mutex_destroy(&zone->preferred_lock);
|
||||
free(zone->preferred_master);
|
||||
|
||||
/* Free zone contents. */
|
||||
zone_contents_deep_free(zone->contents);
|
||||
|
||||
conf_deactivate_modules(&zone->query_modules, &zone->query_plan);
|
||||
|
||||
ptrlist_free(&zone->reverse_from, NULL);
|
||||
|
|
|
|||
|
|
@ -110,6 +110,9 @@ typedef struct zone
|
|||
/*! \brief Ensure one COW transaction on zone's trees at a time. */
|
||||
knot_sem_t cow_lock;
|
||||
|
||||
/*! \brief Special thread that performs update_clear if delayed by outstanding XFR. */
|
||||
pthread_t update_clear_thr;
|
||||
|
||||
/*! \brief Pointer to the running server with e.g. KASP db, journal DB, catalog... */
|
||||
struct server *server;
|
||||
|
||||
|
|
|
|||
93
tests-extra/tests/ixfr/slow/test.py
Normal file
93
tests-extra/tests/ixfr/slow/test.py
Normal file
|
|
@ -0,0 +1,93 @@
|
|||
#!/usr/bin/env python3
|
||||
|
||||
'''Test not blocking frequently updated zone1 by slow IXFR of zone2.'''
|
||||
|
||||
from dnstest.test import Test
|
||||
from dnstest.utils import *
|
||||
import random
|
||||
import threading
|
||||
import time
|
||||
|
||||
# Test environment: a single "knot" server hosting two zones.
t = Test(tsig=False)
|
||||
|
||||
master = t.server("knot")
|
||||
# zone_slow (the root zone) is the one fetched by the slow IXFR below.
zone_slow = t.zone(".")
|
||||
name_slow = zone_slow[0].name
|
||||
# zone_freq ("example.") is the one that receives frequent updates.
zone_freq = t.zone("example.")
|
||||
name_freq = zone_freq[0].name
|
||||
zones = zone_slow + zone_freq
|
||||
t.link(zones, master)
|
||||
# DNSSEC is enabled for the slow zone only; the script later issues
# "zone-sign" on it to produce changes for the IXFR.
master.dnssec(zone_slow).enable = True
|
||||
|
||||
# Passed to kdig as msgdelay by slow_ixfr().
# NOTE(review): presumably milliseconds between messages - confirm units.
MSGDELAY = 90
|
||||
|
||||
# NOTE(review): units and exact semantics of these timeouts are defined by
# the dnstest Server class, which is not visible here - confirm.
master.tcp_remote_io_timeout = 4000
|
||||
master.tcp_io_timeout = 4000
|
||||
|
||||
def slow_ixfr(server, zname, serial):
    '''Ask server.kdig for an IXFR of zname from the given serial, using MSGDELAY as msgdelay.'''
    query_type = "IXFR=" + str(serial)
    server.kdig(zname, query_type, msgdelay=MSGDELAY)
|
||||
|
||||
def send_update(up):
    '''Send the update via up.try_send(), ignoring any failure.

    The send is best-effort: an error raised by try_send() is deliberately
    dropped so that the calling (background) thread ends quietly.
    '''
    try:
        up.try_send()
    except Exception:
        # Only ordinary errors are swallowed; KeyboardInterrupt and
        # SystemExit are no longer caught, unlike with a bare "except:".
        pass
|
||||
|
||||
def send_up_bg(up):
    '''Start a new thread that runs send_update(up); the thread is not joined here.'''
    worker = threading.Thread(target=send_update, args=[up])
    worker.start()
|
||||
|
||||
def check_blocked(server, zname):
    '''Return the result of server.log_search for a line containing both "[zname]" and "blocked by".'''
    zone_tag = "[%s]" % zname
    return server.log_search(zone_tag, "blocked by")
|
||||
|
||||
# Start the test and wait for the slow zone; its first serial is kept in
# sfirst as the starting point of the IXFR requested below.
t.start()
|
||||
|
||||
serial = master.zone_wait(zone_slow)
|
||||
sfirst = serial
|
||||
# Re-sign the slow zone repeatedly, waiting for a new serial after each
# signing, so that the IXFR from sfirst spans many changes.
for i in range(12): # generating large enough IXFR so that it takes time to send
|
||||
master.ctl("zone-sign " + name_slow)
|
||||
serial = master.zone_wait(zone_slow, serial)
|
||||
|
||||
# Run the slow IXFR in a background thread.
# NOTE(review): this thread object is not kept and is not joined anywhere
# in the visible script.
threading.Thread(target=slow_ixfr, args=[master, name_slow, sfirst]).start()
|
||||
|
||||
# While the slow IXFR is running, send updates to both zones in background
# threads and check that only the slow zone reports being blocked.
# NOTE(review): indentation is not preserved in this view, so the exact
# extent of the loop body cannot be read off here - confirm against the
# original file.
for i in range(5):
|
||||
owner = "abc" + str(i)
|
||||
|
||||
# Update of the frequently updated zone.
upf = master.update(zone_freq)
|
||||
upf.add(owner, 3600, "A", "1.2.3.4")
|
||||
send_up_bg(upf)
|
||||
|
||||
t.sleep(0.5)
|
||||
|
||||
ups = master.update(zone_slow) # updating the slow zone verifies that it is still protected by its own lock
|
||||
ups.add(owner, 3600, "A", "1.2.3.4")
|
||||
send_up_bg(ups)
|
||||
|
||||
t.sleep(0.5)
|
||||
|
||||
# Outside valgrind mode, the record just added to the frequent zone must
# already be answered; in valgrind mode only the log is checked for a
# "blocked by" line mentioning the frequent zone.
if not master.valgrind: # in valgrind mode, processing a DDNS may take more than 2 secs!
|
||||
resp = master.dig(owner + "." + name_freq, "A")
|
||||
resp.check(rcode="NOERROR", rdata="1.2.3.4")
|
||||
else:
|
||||
if check_blocked(master, name_freq):
|
||||
set_err("BLOCKED " + name_freq)
|
||||
|
||||
t.sleep(2)
|
||||
|
||||
# The slow zone is expected to have logged a "blocked by" line.
if not check_blocked(master, name_slow):
|
||||
set_err("NOT BLOCKED " + name_slow)
|
||||
|
||||
# check zone_contents_deep_free() while an outgoing XFR is running
|
||||
|
||||
# Start another slow IXFR of the slow zone and give it a second to begin.
kd = threading.Thread(target=slow_ixfr, args=[master, name_slow, sfirst])
|
||||
kd.start()
|
||||
t.sleep(1)
|
||||
# Randomly pick one of two actions against the slow zone: a forced
# "zone-purge +expire", or removing the zone from the configuration in a
# conf-begin / conf-unset / conf-commit transaction.
if random.choice([False, True]):
|
||||
master.ctl("-f zone-purge +expire " + name_slow)
|
||||
else:
|
||||
confsock = master.ctl_sock_rnd()
|
||||
master.ctl("conf-begin", custom_parm=confsock)
|
||||
master.ctl("conf-unset zone[%s]" % name_slow, custom_parm=confsock)
|
||||
master.ctl("conf-commit", custom_parm=confsock)
|
||||
# Wait for the transfer thread to finish before ending the test.
kd.join()
|
||||
|
||||
t.end()
|
||||
|
|
@ -62,6 +62,8 @@ libknot_lib = get_binary("KNOT_TEST_LIBKNOT", repo_binary("src/.libs/libknot.so"
|
|||
knot_bin = get_binary("KNOT_TEST_KNOT", repo_binary("src/knotd"))
|
||||
# KNOT_TEST_KNOTC - Knot control binary.
|
||||
knot_ctl = get_binary("KNOT_TEST_KNOTC", repo_binary("src/knotc"))
|
||||
# KNOT_TEST_KDIG - Digging binary.
|
||||
kdig_bin = get_binary("KNOT_TEST_KDIG", repo_binary("src/kdig"))
|
||||
# KNOT_TEST_KEYMGR - Knot key management binary.
|
||||
keymgr_bin = get_binary("KNOT_TEST_KEYMGR", repo_binary("src/keymgr"))
|
||||
# KNOT_TEST_KJOURNALPRINT - Knot journal print binary.
|
||||
|
|
|
|||
|
|
@ -615,6 +615,36 @@ class Server(object):
|
|||
hostname3 = socket.gethostname()
|
||||
return ("", certfile, hostname1 or hostname2 or hostname3, ssearch(gcli_s, r'pin-sha256:([^\n]*)'))
|
||||
|
||||
def kdig(self, rname, rtype, rclass="IN", dnssec=None, validate=None, msgdelay=None):
    '''Run kdig against this server and return its right-stripped standard output.

    The query is sent to self.addr / self.port for rname, rtype and rclass.
    Truthy dnssec, validate and msgdelay add "+dnssec", "+validate" and
    "+msgdelay=<value>" to the command line. Standard output and standard
    error are appended to kdig.out and kdig.err in self.dir.

    Errors are reported through set_err(): "KDIG FAILED" on a non-zero exit
    code, "KDIG VALIDATION" when validation was requested and the expected
    "DNSSEC VALIDATION: <expect>!" text is missing from the output, where
    <expect> is validate itself if it is a string and "OK" otherwise. If
    validation was requested but kdig reports that validation support is not
    compiled in, the output is returned without any of these checks.
    '''
    argv = [params.kdig_bin, "@" + self.addr, "-p", str(self.port),
            "-q", rname, "-t", rtype, "-c", rclass]
    if dnssec:
        argv.append("+dnssec")
    if validate:
        argv.append("+validate")
    if msgdelay:
        argv.append("+msgdelay=" + str(msgdelay))

    result = run(argv, stdout=PIPE, stderr=PIPE, universal_newlines=True)
    out_s = result.stdout.rstrip()
    err_s = result.stderr.rstrip()

    # Keep a running record of everything kdig printed (stdout first, then stderr).
    for suffix, text in (("/kdig.out", out_s), ("/kdig.err", err_s)):
        with open(self.dir + suffix, mode="a") as log:
            log.write(text)
            log.write("\n")

    if validate and "validation support not compiled" in err_s:
        return out_s
    if result.returncode != 0:
        set_err("KDIG FAILED")

    expected = validate if isinstance(validate, str) else "OK"
    if validate and ("DNSSEC VALIDATION: %s!" % expected) not in out_s:
        set_err("KDIG VALIDATION")
    return out_s
|
||||
|
||||
def dig(self, rname, rtype, rclass="IN", udp=None, serial=None, timeout=None,
|
||||
tries=3, flags="", bufsize=None, edns=None, nsid=False, dnssec=False,
|
||||
log_no_sep=False, tsig=None, addr=None, source=None, xdp=None):
|
||||
|
|
@ -810,8 +840,8 @@ class Server(object):
|
|||
raise Failed("Can't send RAW data (%d bytes) to server='%s'" %
|
||||
(len(data), self.name))
|
||||
|
||||
def log_search(self, pattern):
|
||||
return fsearch(self.fout, pattern) or fsearch(self.ferr, pattern)
|
||||
def log_search(self, pattern, pattern2=None):
|
||||
return fsearch(self.fout, pattern, pattern2) or fsearch(self.ferr, pattern, pattern2)
|
||||
|
||||
def log_search_count(self, pattern):
|
||||
return fsearch_count(self.fout, pattern) + fsearch_count(self.ferr, pattern)
|
||||
|
|
|
|||
|
|
@ -57,10 +57,10 @@ def ssearch(s, pattern):
|
|||
else:
|
||||
return found.groups()[0]
|
||||
|
||||
def fsearch(fname, pattern):
|
||||
def fsearch(fname, pattern, pattern2=None):
|
||||
with open(fname) as f:
|
||||
for line in f:
|
||||
if pattern in line:
|
||||
if pattern in line and (pattern2 is None or pattern2 in line):
|
||||
return True
|
||||
return False
|
||||
|
||||
|
|
|
|||
Loading…
Reference in a new issue