From 627777d6438e58967fbec2cc42d2dcf733cee533 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Luk=C3=A1=C5=A1=20Ondr=C3=A1=C4=8Dek?= Date: Wed, 17 Jan 2024 14:29:11 +0100 Subject: [PATCH] rrl: multi-prefix queries WIP: batch queries WIP: multi-prefix queries with unit tests, unoptimized WIP: optimize multi-prefix queries WIP: nits - avoid -Warray-bounds for array[-1] I thought it was more readable that way, but warnings would be annoying. - use `hash_t` more - drop old FIXME: I believe it was just confused symbols, not code WIP: deduplicate the choice of SipHash parameters --- src/knot/modules/rrl/functions.c | 155 +++---------- src/knot/modules/rrl/kru-decay.inc.c | 2 +- src/knot/modules/rrl/kru-tests.c | 61 +++-- src/knot/modules/rrl/kru.h | 32 ++- src/knot/modules/rrl/kru.inc.c | 318 +++++++++++++++++++++------ tests/modules/test_rrl.c | 89 +++++++- 6 files changed, 423 insertions(+), 234 deletions(-) diff --git a/src/knot/modules/rrl/functions.c b/src/knot/modules/rrl/functions.c index 559a4e6e1..464dea625 100644 --- a/src/knot/modules/rrl/functions.c +++ b/src/knot/modules/rrl/functions.c @@ -25,134 +25,22 @@ #include "libdnssec/error.h" #include "libdnssec/random.h" -/* Limits (class, ipv6 remote, FIXME dname) */ -#define RRL_CLSBLK_MAXLEN 16 /* CIDR block prefix lengths for v4/v6 */ -#define RRL_V4_PREFIX_LEN 3 /* /24 */ -#define RRL_V6_PREFIX_LEN 7 /* /56 */ -/* Defaults */ -#define RRL_PSIZE_LARGE 1024 +// Hardcoded also in unit tests. -/* Classification */ -enum { - CLS_NULL = 0 << 0, /* Empty bucket. */ - CLS_NORMAL = 1 << 0, /* Normal response. */ - CLS_ERROR = 1 << 1, /* Error response. */ - CLS_NXDOMAIN = 1 << 2, /* NXDOMAIN (special case of error). */ - CLS_EMPTY = 1 << 3, /* Empty response. */ - CLS_LARGE = 1 << 4, /* Response size over threshold (1024k). */ - CLS_WILDCARD = 1 << 5, /* Wildcard query. */ - CLS_ANY = 1 << 6, /* ANY query (spec. class). */ - CLS_DNSSEC = 1 << 7 /* DNSSEC related RR query (spec. 
class) */ -}; +#define RRL_V4_PREFIXES (uint8_t[]) { 24, 28, 32} +#define RRL_V4_PRICES (uint16_t[]) { 1 << 5, 1 << 7, 1 << 9} -/* Classification string. */ -struct cls_name { - int code; - const char *name; -}; +#define RRL_V6_PREFIXES (uint8_t[]) { 32, 56, 64, 128} +#define RRL_V6_PRICES (uint16_t[]) { 1 << 0, 1 << 5, 1 << 7, 1 << 9} -static const struct cls_name rrl_cls_names[] = { - { CLS_NORMAL, "POSITIVE" }, - { CLS_ERROR, "ERROR" }, - { CLS_NXDOMAIN, "NXDOMAIN"}, - { CLS_EMPTY, "EMPTY"}, - { CLS_LARGE, "LARGE"}, - { CLS_WILDCARD, "WILDCARD"}, - { CLS_ANY, "ANY"}, - { CLS_DNSSEC, "DNSSEC"}, - { CLS_NULL, "NULL"}, - { CLS_NULL, NULL} -}; +#define RRL_V4_PREFIXES_CNT (sizeof(RRL_V4_PREFIXES) / sizeof(*RRL_V4_PREFIXES)) +#define RRL_V6_PREFIXES_CNT (sizeof(RRL_V6_PREFIXES) / sizeof(*RRL_V6_PREFIXES)) +#define RRL_MAX_PREFIXES_CNT ((RRL_V4_PREFIXES_CNT > RRL_V6_PREFIXES_CNT) ? RRL_V4_PREFIXES_CNT : RRL_V6_PREFIXES_CNT) -static inline const char *rrl_clsstr(int code) -{ - for (const struct cls_name *c = rrl_cls_names; c->name; c++) { - if (c->code == code) { - return c->name; - } - } - return "unknown class"; -} - -/* Bucket flags. */ -enum { - RRL_BF_NULL = 0 << 0, /* No flags. */ - RRL_BF_SSTART = 1 << 0, /* Bucket in slow-start after collision. */ - RRL_BF_ELIMIT = 1 << 1 /* Bucket is rate-limited. */ -}; - -static uint8_t rrl_clsid(rrl_req_t *p) -{ - /* Check error code */ - int ret = CLS_NULL; - switch (knot_wire_get_rcode(p->wire)) { - case KNOT_RCODE_NOERROR: ret = CLS_NORMAL; break; - case KNOT_RCODE_NXDOMAIN: return CLS_NXDOMAIN; break; - default: return CLS_ERROR; break; - } - - /* Check if answered from a qname */ - if (ret == CLS_NORMAL && p->flags & RRL_REQ_WILDCARD) { - return CLS_WILDCARD; - } - - /* Check query type for spec. classes. */ - if (p->query) { - switch(knot_pkt_qtype(p->query)) { - case KNOT_RRTYPE_ANY: /* ANY spec. 
class */ - return CLS_ANY; - break; - case KNOT_RRTYPE_DNSKEY: - case KNOT_RRTYPE_RRSIG: - case KNOT_RRTYPE_DS: /* DNSSEC-related RR class. */ - return CLS_DNSSEC; - break; - default: - break; - } - } - - /* Check packet size for threshold. */ - if (p->len >= RRL_PSIZE_LARGE) { - return CLS_LARGE; - } - - /* Check ancount */ - if (knot_wire_get_ancount(p->wire) == 0) { - return CLS_EMPTY; - } - - return ret; -} - -static int rrl_classify(uint8_t *dst, size_t maxlen, const struct sockaddr_storage *remote, - rrl_req_t *req, const knot_dname_t *name) -{ - /* Class */ - uint8_t cls = rrl_clsid(req); - *dst = cls; - int blklen = sizeof(cls); - - /* Address (in network byteorder, adjust masks). */ - uint64_t netblk = 0; - if (remote->ss_family == AF_INET6) { - struct sockaddr_in6 *ipv6 = (struct sockaddr_in6 *)remote; - memcpy(&netblk, &ipv6->sin6_addr, RRL_V6_PREFIX_LEN); - } else { - struct sockaddr_in *ipv4 = (struct sockaddr_in *)remote; - memcpy(&netblk, &ipv4->sin_addr, RRL_V4_PREFIX_LEN); - } - memcpy(dst + blklen, &netblk, sizeof(netblk)); - blklen += sizeof(netblk); - - /* Name not considered anymore. 
*/ - - return blklen; -} - -static void subnet_tostr(char *dst, size_t maxlen, const struct sockaddr_storage *ss) +/* +static void subnet_tostr(char *dst, size_t maxlen, const struct sockaddr_storage *ss) // TODO remove or adapt { const void *addr; const char *suffix; @@ -196,6 +84,7 @@ static void rrl_log_state(knotd_mod_t *mod, const struct sockaddr_storage *ss, knotd_mod_log(mod, LOG_NOTICE, "address/subnet %s, class %s, qname %s, %s limiting", addr_str, rrl_clsstr(cls), qname_str, what); } +*/ rrl_table_t *rrl_create(size_t size, uint32_t rate) { @@ -218,17 +107,25 @@ int rrl_query(rrl_table_t *rrl, const struct sockaddr_storage *remote, return KNOT_EINVAL; } - uint8_t buf[RRL_CLSBLK_MAXLEN] ALIGNED(16) = { 0 }; - size_t buf_len = rrl_classify(buf, RRL_CLSBLK_MAXLEN, remote, req, zone); - if (buf_len < 0) { - return KNOT_ERROR; - } - struct timespec now_ts = {0}; clock_gettime(CLOCK_MONOTONIC_COARSE, &now_ts); uint32_t now = now_ts.tv_sec * 1000 + now_ts.tv_nsec / 1000000; - return KRU.limited(rrl, (char *)buf, now, 1<<9) ? KNOT_ELIMIT : KNOT_EOK; // TODO set price + uint8_t key[16] ALIGNED(16) = {0, }; + if (remote->ss_family == AF_INET6) { + struct sockaddr_in6 *ipv6 = (struct sockaddr_in6 *)remote; + memcpy(key, &ipv6->sin6_addr, 16); + + return KRU.limited_multi_prefix_or(rrl, now, 1, key, RRL_V6_PREFIXES, RRL_V6_PRICES, RRL_V6_PREFIXES_CNT) + ? KNOT_ELIMIT : KNOT_EOK; + + } else { + struct sockaddr_in *ipv4 = (struct sockaddr_in *)remote; + memcpy(key, &ipv4->sin_addr, 4); + + return KRU.limited_multi_prefix_or(rrl, now, 0, key, RRL_V4_PREFIXES, RRL_V4_PRICES, RRL_V4_PREFIXES_CNT) + ? KNOT_ELIMIT : KNOT_EOK; + } } bool rrl_slip_roll(int n_slip) diff --git a/src/knot/modules/rrl/kru-decay.inc.c b/src/knot/modules/rrl/kru-decay.inc.c index 68a425c90..3dd673adf 100644 --- a/src/knot/modules/rrl/kru-decay.inc.c +++ b/src/knot/modules/rrl/kru-decay.inc.c @@ -16,7 +16,7 @@ struct decay_config { }; /// Catch up the time drift with configurably slower decay. 
-static void update_time(struct load_cl *l, const uint32_t time_now, +static inline void update_time(struct load_cl *l, const uint32_t time_now, const struct decay_config *decay) { uint32_t ticks; diff --git a/src/knot/modules/rrl/kru-tests.c b/src/knot/modules/rrl/kru-tests.c index e78005e03..0cb160f11 100644 --- a/src/knot/modules/rrl/kru-tests.c +++ b/src/knot/modules/rrl/kru-tests.c @@ -72,11 +72,11 @@ void test_stage(struct test_ctx *ctx, uint32_t dur) { size_t cat; for (cat = 0; freq_bounds[cat] <= rnd; cat++); - uint64_t key[2] = {0,}; + uint64_t key[2] ALIGNED(16) = {0}; key[0] = random() % (ctx->cats[cat].id_max - ctx->cats[cat].id_min + 1) + ctx->cats[cat].id_min; ctx->cats[cat].total++; - ctx->cats[cat].passed += !KRU.limited(ctx->kru, (char *)key, ctx->time, ctx->price); + ctx->cats[cat].passed += !KRU.limited(ctx->kru, ctx->time, (uint8_t *)key, ctx->price); } } } @@ -198,11 +198,15 @@ void test_multi_attackers(void) { /*=== benchmarking time performance ===*/ -#define TIMED_TESTS_TABLE_SIZE_LOG 16 -#define TIMED_TESTS_PRICE (1 << 9) -#define TIMED_TESTS_QUERIES (1 << 26) // 28 -#define TIMED_TESTS_MAX_THREADS 12 -#define TIMED_TESTS_WAIT_BEFORE_SEC 2 // 60 +#define TIMED_TESTS_TABLE_SIZE_LOG 16 +#define TIMED_TESTS_PRICE (1 << 9) +#define TIMED_TESTS_QUERIES (1 << 26) +#define TIMED_TESTS_TIME_UPDATE_PERIOD 4 +#define TIMED_TESTS_MAX_THREADS 64 +#define TIMED_TESTS_WAIT_BEFORE_SEC 2 + +#define TIMED_TESTS_BATCH_SIZE 1 // each query still counted individually in MQPS; should be set to 1 if PREFIXES are set +#define TIMED_TESTS_PREFIXES (uint8_t []){64, 65, 66, 67} // one query contains all prefixes, MQPS is lowered struct timed_test_ctx { struct kru *kru; @@ -213,15 +217,42 @@ struct timed_test_ctx { void *timed_runnable(void *arg) { struct timed_test_ctx *ctx = arg; - uint64_t key[2] = {0,}; - for (uint64_t i = ctx->first_query; i < TIMED_TESTS_QUERIES; i += ctx->increment) { - struct timespec now_ts = {0}; - clock_gettime(CLOCK_MONOTONIC_COARSE, 
&now_ts); - uint32_t now_msec = now_ts.tv_sec * 1000 + now_ts.tv_nsec / 1000000; + struct timespec now_ts = {0}; + uint32_t now_msec = 0; + uint64_t now_last_update = -TIMED_TESTS_TIME_UPDATE_PERIOD * ctx->increment; - key[0] = i * ctx->key_mult; - KRU.limited(ctx->kru, (char *)key, now_msec, TIMED_TESTS_PRICE); +#ifdef TIMED_TESTS_PREFIXES + uint16_t prices[sizeof(TIMED_TESTS_PREFIXES)]; +#else + uint16_t prices[TIMED_TESTS_BATCH_SIZE]; +#endif + for (size_t j = 0; j < sizeof(prices)/sizeof(*prices); j++) { + prices[j] = TIMED_TESTS_PRICE; + } + + for (uint64_t i = ctx->first_query; i < TIMED_TESTS_QUERIES; ) { + if (i >= now_last_update + TIMED_TESTS_TIME_UPDATE_PERIOD * ctx->increment) { + clock_gettime(CLOCK_MONOTONIC_COARSE, &now_ts); + now_msec = now_ts.tv_sec * 1000 + now_ts.tv_nsec / 1000000; + now_last_update = i; + } + + uint64_t key_values[TIMED_TESTS_BATCH_SIZE * 2] = {0,}; + uint8_t *keys[TIMED_TESTS_BATCH_SIZE]; + + for (size_t j = 0; j < TIMED_TESTS_BATCH_SIZE; j++) { + key_values[2 * j] = i * ctx->key_mult; + key_values[2 * j + 1] = 0xFFFFFFFFFFFFFFFFll; + keys[j] = (uint8_t *)(key_values + 2 * j); + i += ctx->increment; + } + +#ifdef TIMED_TESTS_PREFIXES + KRU.limited_multi_prefix_or(ctx->kru, now_msec, 0, keys[0], TIMED_TESTS_PREFIXES, prices, sizeof(TIMED_TESTS_PREFIXES)); +#else + KRU.limited_multi_or_nobreak(ctx->kru, now_msec, keys, prices, TIMED_TESTS_BATCH_SIZE); +#endif } return NULL; } @@ -235,7 +266,7 @@ void timed_tests() { struct timespec wait_ts = {TIMED_TESTS_WAIT_BEFORE_SEC, 0}; - for (int threads = 1; threads <= TIMED_TESTS_MAX_THREADS; threads++) { + for (int threads = 1; threads <= TIMED_TESTS_MAX_THREADS; threads *= 2) { for (int collide = 0; collide < 2; collide++) { nanosleep(&wait_ts, NULL); printf("%3d threads, %-15s: ", threads, (collide ? 
"single query" : "unique queries")); diff --git a/src/knot/modules/rrl/kru.h b/src/knot/modules/rrl/kru.h index 17b882710..e728d7a79 100644 --- a/src/knot/modules/rrl/kru.h +++ b/src/knot/modules/rrl/kru.h @@ -6,6 +6,14 @@ #include // FIXME: review the whole header; for now at least the main APIs should appear +#if __GNUC__ >= 4 || __clang_major__ >= 4 + #define ALIGNED_CPU_CACHE __attribute__((aligned(64))) + #define ALIGNED(_bytes) __attribute__((aligned(_bytes))) +#else + #define ALIGNED_CPU_CACHE + #define ALIGNED(_bytes) +#endif + struct kru; /// Usage: KRU.limited(...) @@ -28,20 +36,20 @@ struct kru_api { /// Determine if a key should get limited (and update the KRU). /// key needs to be aligned to a multiple of 16 bytes. - bool (*limited)(struct kru *kru, char key[static const 16], - uint32_t time_now, uint16_t price); + bool (*limited)(struct kru *kru, uint32_t time_now, uint8_t key[static const 16], uint16_t price); + + /// Multiple queries. Returns OR of answers. Updates KRU only if no query is blocked (and possibly on race). + bool (*limited_multi_or)(struct kru *kru, uint32_t time_now, uint8_t **keys, uint16_t *prices, size_t queries_cnt); + + /// Same as previous but without short-circuit evaluation; for time measurement purposes. + bool (*limited_multi_or_nobreak)(struct kru *kru, uint32_t time_now, uint8_t ** keys, uint16_t *prices, size_t queries_cnt); + + /// Multiple queries based on different prefixes of a single key. Returns OR of answers. Updates KRU only if no query is blocked. + /// The key of i-th query consists of prefixes[i] bits of key, prefixes[i], and namespace. + bool (*limited_multi_prefix_or)(struct kru *kru, uint32_t time_now, + uint8_t namespace, uint8_t key[static 16], uint8_t *prefixes, uint16_t *prices, size_t queries_cnt); }; // The functions are stored this way to make it easier to switch // implementation based on detected CPU. 
extern struct kru_api KRU; extern const struct kru_api KRU_GENERIC, KRU_AVX2; // for tests only - - -#if __GNUC__ >= 4 || __clang_major__ >= 4 - #define ALIGNED_CPU_CACHE __attribute__((aligned(64))) - #define ALIGNED(_bytes) __attribute__((aligned(_bytes))) -#else - #define ALIGNED_CPU_CACHE - #define ALIGNED(_bytes) -#endif - diff --git a/src/knot/modules/rrl/kru.inc.c b/src/knot/modules/rrl/kru.inc.c index c72b8449c..0094a3d4c 100644 --- a/src/knot/modules/rrl/kru.inc.c +++ b/src/knot/modules/rrl/kru.inc.c @@ -35,7 +35,6 @@ Size (`loads_bits` = log2 length): - The length should probably be at least something like the square of the number of utilized CPUs. But this most likely won't be a limiting factor. */ - #include #include #include @@ -67,8 +66,12 @@ typedef uint64_t hash_t; #define AES_ROUNDS 4 #else #include "contrib/openbsd/siphash.h" + /// 1,3 should be OK choice, probably. TODO: confirm - #define hash(_k, _p, _l) SipHash((_k), 1, 3, (_p), (_l)) + enum { + SIPHASH_RC = 1, + SIPHASH_RF = 3, + }; #endif @@ -77,6 +80,7 @@ typedef uint64_t hash_t; #include #endif + struct kru { #if USE_AES /// Hashing secret. Random but shared by all users of the table. @@ -134,13 +138,21 @@ static struct kru *kru_create(int capacity_log) return kru; } -/// Update limiting and return true iff it hit the limit instead. -static bool kru_limited(struct kru *kru, char key[static const 16], uint32_t time_now, uint16_t price) +struct query_ctx { + struct load_cl *l[TABLE_COUNT]; + uint32_t time_now; + uint16_t price; + uint16_t id; + uint16_t *load; +}; + +/// Phase 1/3 of a query -- hash, prefetch, ctx init. Based on one 16-byte key. +static inline void kru_limited_prefetch(struct kru *kru, uint32_t time_now, uint8_t key[static 16], uint16_t price, struct query_ctx *ctx) { // Obtain hash of *buf. 
- uint64_t hash; + hash_t hash; #if !USE_AES - hash = hash(&kru->hash_key, key, 16); + hash = SipHash(&kru->hash_key, SIPHASH_RC, SIPHASH_RF, key, 16); #else { __m128i h; /// hashing state @@ -153,107 +165,207 @@ static bool kru_limited(struct kru *kru, char key[static const 16], uint32_t tim } memcpy(&hash, &h, sizeof(hash)); } - //FIXME: gcc 12 is apparently mixing code of hashing with update_time() ?! #endif // Choose the cache-lines to operate on - struct load_cl *l[TABLE_COUNT]; const uint32_t loads_mask = (1 << kru->loads_bits) - 1; // Fetch the two cache-lines in parallel before we really touch them. for (int li = 0; li < TABLE_COUNT; ++li) { - l[li] = &kru->load_cls[hash & loads_mask][li]; - __builtin_prefetch(l[li], 0); // hope for read-only access + struct load_cl * const l = &kru->load_cls[hash & loads_mask][li]; + __builtin_prefetch(l, 0); // hope for read-only access hash >>= kru->loads_bits; + ctx->l[li] = l; } - for (int li = 0; li < TABLE_COUNT; ++li) - update_time(l[li], time_now, &DECAY_32); - uint16_t id = hash; - hash >>= 16; + ctx->time_now = time_now; + ctx->price = price; + ctx->id = hash; +} + + +/// Phase 1/3 of a query -- hash, prefetch, ctx init. Based on a bit prefix of one 16-byte key. +static inline void kru_limited_prefetch_prefix(struct kru *kru, uint32_t time_now, uint8_t namespace, uint8_t key[static 16], uint8_t prefix, uint16_t price, struct query_ctx *ctx) +{ + // Obtain hash of *buf. + hash_t hash; + +#if !USE_AES + { + const int rc = SIPHASH_RC, rf = SIPHASH_RF; + + // Hash prefix of key, prefix size, and namespace together. 
+ SIPHASH_CTX hctx; + SipHash_Init(&hctx, &kru->hash_key); + SipHash_Update(&hctx, rc, rf, &namespace, sizeof(namespace)); + SipHash_Update(&hctx, rc, rf, &prefix, sizeof(prefix)); + SipHash_Update(&hctx, rc, rf, key, prefix / 8); + if (prefix % 8) { + const uint8_t masked_byte = key[prefix / 8] & (0xFF00 >> (prefix % 8)); + SipHash_Update(&hctx, rc, rf, &masked_byte, 1); + } + hash = SipHash_End(&hctx, rc, rf); + } +#else + { + + __m128i h; /// hashing state + h = _mm_load_si128((__m128i *)key); + + { // Keep only the prefix. + const uint8_t p = prefix; + + // Prefix mask (1...0) -> little endian byte array (0x00 ... 0x00 0xFF ... 0xFF). + __m128i mask = _mm_set_epi64x( + (p < 64 ? (p == 0 ? 0 : -1ll << (64 - p)) : -1ll), // higher 64 bits (1...) -> second half of byte array (... 0xFF) + (p <= 64 ? 0 : -1ll << (128 - p))); // lower 64 bits (...0) -> first half of byte array (0x00 ...) + + // Swap mask endianness (0xFF ... 0xFF 0x00 ... 0x00). + mask = _mm_shuffle_epi8(mask, + _mm_set_epi8(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15)); + + // Apply mask. + h = _mm_and_si128(h, mask); + } + + // Now do the hashing itself. + __m128i *aes_key = (void*)kru->hash_key; + { + // Mix namespace and prefix size into the first aes key. + __m128i aes_key1 = _mm_insert_epi16(_mm_load_si128(aes_key), (namespace << 8) | prefix, 0); + h = _mm_aesenc_si128(h, aes_key1); + } + for (int j = 1; j < AES_ROUNDS; ++j) { + int key_id = j % (sizeof(kru->hash_key) / sizeof(__m128i)); + h = _mm_aesenc_si128(h, _mm_load_si128(&aes_key[key_id])); + } + memcpy(&hash, &h, sizeof(hash)); + } +#endif + + // Choose the cache-lines to operate on + const uint32_t loads_mask = (1 << kru->loads_bits) - 1; + // Fetch the two cache-lines in parallel before we really touch them. 
+ for (int li = 0; li < TABLE_COUNT; ++li) { + struct load_cl * const l = &kru->load_cls[hash & loads_mask][li]; + __builtin_prefetch(l, 0); // hope for read-only access + hash >>= kru->loads_bits; + ctx->l[li] = l; + } + + ctx->time_now = time_now; + ctx->price = price; + ctx->id = hash; +} + +/// Phase 2/3 of a query -- returns answer with no state modification (except update_time). +static inline bool kru_limited_fetch(struct kru *kru, struct query_ctx *ctx) +{ + for (int li = 0; li < TABLE_COUNT; ++li) { + update_time(ctx->l[li], ctx->time_now, &DECAY_32); + } + + const uint16_t id = ctx->id; // Find matching element. Matching 16 bits in addition to loads_bits. - uint16_t *load = NULL; + ctx->load = NULL; #if !USE_AVX2 for (int li = 0; li < TABLE_COUNT; ++li) for (int i = 0; i < LOADS_LEN; ++i) - if (l[li]->ids[i] == id) { - load = &l[li]->loads[i]; + if (ctx->l[li]->ids[i] == id) { + ctx->load = &ctx->l[li]->loads[i]; goto load_found; } #else const __m256i id_v = _mm256_set1_epi16(id); for (int li = 0; li < TABLE_COUNT; ++li) { - static_assert(LOADS_LEN == 15 && sizeof(l[li]->ids[0]) == 2, ""); + static_assert(LOADS_LEN == 15 && sizeof(ctx->l[li]->ids[0]) == 2, ""); // unfortunately we can't use aligned load here - __m256i ids_v = _mm256_loadu_si256(((__m256i *)&l[li]->ids[-1])); + __m256i ids_v = _mm256_loadu_si256((__m256i *)(ctx->l[li]->ids - 1)); __m256i match_mask = _mm256_cmpeq_epi16(ids_v, id_v); if (_mm256_testz_si256(match_mask, match_mask)) continue; // no match of id int index = _bit_scan_reverse(_mm256_movemask_epi8(match_mask)) / 2 - 1; // there's a small possibility that we hit equality only on the -1 index if (index >= 0) { - load = &l[li]->loads[index]; + ctx->load = &ctx->l[li]->loads[index]; goto load_found; } } #endif - // No match, so find position of the smallest load. 
- int min_li = 0; - int min_i = 0; + return false; + +load_found:; + const uint16_t price = ctx->price; + const uint32_t limit = (1<<16) - price; + return (*ctx->load >= limit); +} + +/// Phase 3/3 of a query -- state update, return value overrides previous answer in case of race. +/// Not needed if blocked by fetch phase. +static inline bool kru_limited_update(struct kru *kru, struct query_ctx *ctx) +{ + _Atomic uint16_t *load_at; + if (!ctx->load) { + // No match, so find position of the smallest load. + int min_li = 0; + int min_i = 0; #if !USE_SSE41 - for (int li = 0; li < TABLE_COUNT; ++li) - for (int i = 0; i < LOADS_LEN; ++i) - if (l[li]->loads[i] < l[min_li]->loads[min_i]) { - min_li = li; - min_i = i; - } + for (int li = 0; li < TABLE_COUNT; ++li) + for (int i = 0; i < LOADS_LEN; ++i) + if (ctx->l[li]->loads[i] < ctx->l[min_li]->loads[min_i]) { + min_li = li; + min_i = i; + } #else - int min_val = 0; - for (int li = 0; li < TABLE_COUNT; ++li) { - // BEWARE: we're relying on the exact memory layout of struct load_cl, - // where the .loads array take 15 16-bit values at the very end. - static_assert((offsetof(struct load_cl, loads) - 2) % 16 == 0, - "bad alignment of struct load_cl::loads"); - static_assert(LOADS_LEN == 15 && sizeof(l[li]->loads[0]) == 2, ""); - __m128i *l_v = ((__m128i *)(&l[li]->loads[-1])); - __m128i l0 = _mm_load_si128(l_v); - __m128i l1 = _mm_load_si128(l_v + 1); - // We want to avoid the first item in l0, so we maximize it. - l0 = _mm_insert_epi16(l0, (1<<16)-1, 0); + int min_val = 0; + for (int li = 0; li < TABLE_COUNT; ++li) { + // BEWARE: we're relying on the exact memory layout of struct load_cl, + // where the .loads array take 15 16-bit values at the very end. 
+ static_assert((offsetof(struct load_cl, loads) - 2) % 16 == 0, + "bad alignment of struct load_cl::loads"); + static_assert(LOADS_LEN == 15 && sizeof(ctx->l[li]->loads[0]) == 2, ""); + __m128i *l_v = (__m128i *)(ctx->l[li]->loads - 1); + __m128i l0 = _mm_load_si128(l_v); + __m128i l1 = _mm_load_si128(l_v + 1); + // We want to avoid the first item in l0, so we maximize it. + // (but this function takes a signed integer, so -1 is the maximum) + l0 = _mm_insert_epi16(l0, -1, 0); - // Only one instruction can find minimum and its position, - // and it works on 8x uint16_t. - __m128i mp0 = _mm_minpos_epu16(l0); - __m128i mp1 = _mm_minpos_epu16(l1); - int min0 = _mm_extract_epi16(mp0, 0); - int min1 = _mm_extract_epi16(mp1, 0); - int min01, min_ix; - if (min0 < min1) { - min01 = min0; - min_ix = _mm_extract_epi16(mp0, 1); - } else { - min01 = min1; - min_ix = 8 + _mm_extract_epi16(mp1, 1); - } + // Only one instruction can find minimum and its position, + // and it works on 8x uint16_t. + __m128i mp0 = _mm_minpos_epu16(l0); + __m128i mp1 = _mm_minpos_epu16(l1); + int min0 = _mm_extract_epi16(mp0, 0); + int min1 = _mm_extract_epi16(mp1, 0); + int min01, min_ix; + if (min0 < min1) { + min01 = min0; + min_ix = _mm_extract_epi16(mp0, 1); + } else { + min01 = min1; + min_ix = 8 + _mm_extract_epi16(mp1, 1); + } - if (li == 0 || min_val > min01) { - min_li = li; - min_i = min_ix; - min_val = min01; + if (li == 0 || min_val > min01) { + min_li = li; + min_i = min_ix; + min_val = min01; + } } - } - // now, min_i (and min_ix) is offset by one due to alignment of .loads - if (min_i != 0) // zero is very unlikely - --min_i; + // now, min_i (and min_ix) is offset by one due to alignment of .loads + if (min_i != 0) // zero is very unlikely + --min_i; #endif - l[min_li]->ids[min_i] = id; - load = &l[min_li]->loads[min_i]; // TODO: goto load_found? 
-load_found:; + ctx->l[min_li]->ids[min_i] = ctx->id; + load_at = (_Atomic uint16_t *)&ctx->l[min_li]->loads[min_i]; + } else { + load_at = (_Atomic uint16_t *)ctx->load; + } static_assert(ATOMIC_CHAR16_T_LOCK_FREE == 2, "insufficient atomics"); - _Atomic uint16_t *load_at = (_Atomic uint16_t *)load; + const uint16_t price = ctx->price; const uint32_t limit = (1<<16) - price; uint16_t load_orig = atomic_load_explicit(load_at, memory_order_relaxed); do { @@ -264,7 +376,79 @@ load_found:; return false; } +static bool kru_limited_multi_or(struct kru *kru, uint32_t time_now, uint8_t **keys, uint16_t *prices, size_t queries_cnt) +{ + struct query_ctx ctx[queries_cnt]; + + for (size_t i = 0; i < queries_cnt; i++) { + kru_limited_prefetch(kru, time_now, keys[i], prices[i], ctx + i); + } + for (size_t i = 0; i < queries_cnt; i++) { + if (kru_limited_fetch(kru, ctx + i)) + return true; + } + bool ret = false; + + for (size_t i = 0; i < queries_cnt; i++) { + ret |= kru_limited_update(kru, ctx + i); + } + + return ret; +} + +static bool kru_limited_multi_or_nobreak(struct kru *kru, uint32_t time_now, uint8_t **keys, uint16_t *prices, size_t queries_cnt) +{ + struct query_ctx ctx[queries_cnt]; + bool ret = false; + + for (size_t i = 0; i < queries_cnt; i++) { + kru_limited_prefetch(kru, time_now, keys[i], prices[i], ctx + i); + } + for (size_t i = 0; i < queries_cnt; i++) { + if (kru_limited_fetch(kru, ctx + i)) + ret = true; + } + if (ret) return true; + + for (size_t i = 0; i < queries_cnt; i++) { + if (kru_limited_update(kru, ctx + i)) + ret = true; + } + + return ret; +} + +static bool kru_limited_multi_prefix_or(struct kru *kru, uint32_t time_now, uint8_t namespace, uint8_t key[static 16], uint8_t *prefixes, uint16_t *prices, size_t queries_cnt) +{ + struct query_ctx ctx[queries_cnt]; + + for (size_t i = 0; i < queries_cnt; i++) { + kru_limited_prefetch_prefix(kru, time_now, namespace, key, prefixes[i], prices[i], ctx + i); + } + + for (size_t i = 0; i < queries_cnt; i++) 
{ + if (kru_limited_fetch(kru, ctx + i)) + return true; + } + + bool ret = false; + for (size_t i = 0; i < queries_cnt; i++) { + ret |= kru_limited_update(kru, ctx + i); + } + + return ret; +} + +/// Update limiting and return true iff it hit the limit instead. +static bool kru_limited(struct kru *kru, uint32_t time_now, uint8_t key[static 16], uint16_t price) +{ + return kru_limited_multi_or(kru, time_now, &key, &price, 1); +} + #define KRU_API_INITIALIZER { \ .create = kru_create, \ .limited = kru_limited, \ + .limited_multi_or = kru_limited_multi_or, \ + .limited_multi_or_nobreak = kru_limited_multi_or_nobreak, \ + .limited_multi_prefix_or = kru_limited_multi_prefix_or, \ } diff --git a/tests/modules/test_rrl.c b/tests/modules/test_rrl.c index 2ccdc4b34..f8aea3ee0 100644 --- a/tests/modules/test_rrl.c +++ b/tests/modules/test_rrl.c @@ -32,13 +32,16 @@ int fakeclock_gettime(clockid_t clockid, struct timespec *tp); #include -#define RRL_PRICE_LOG 9 // XXX same constant as the hardcoded one in rrl_query function +#define RRL_PRICE_LOG 9 // highest price of all prefixes, same for IPv4 and IPv6 #define RRL_THREADS 8 //#define RRL_SYNC_WITH_REAL_TIME +uint32_t initial_limit(uint16_t price_log) { + return (1 << (16 - price_log)) - 1; +} -// expected limits for parallel test +// expected limits for parallel test (for largest prefix) #define RRL_FIRST_BLOCKED (1 << (16 - RRL_PRICE_LOG)) #define RRL_INITIAL_LIMIT_MIN (RRL_FIRST_BLOCKED - 1) #define RRL_INITIAL_LIMIT_MAX (RRL_INITIAL_LIMIT_MIN + RRL_THREADS - 1) // races may occur on first insertion into table @@ -167,12 +170,26 @@ static void* rrl_runnable(void *arg) } } +int count_passing_queries(rrl_table_t *rrl, int addr_family, char *format, uint32_t min_value, uint32_t max_value, uint32_t max_queries) { + struct sockaddr_storage addr; + char addr_str[40]; + rrl_req_t rq = {0,}; + + for (size_t i = 0; i < max_queries; i++) { + snprintf(addr_str, sizeof(addr_str), format, i % (max_value - min_value + 1) + min_value); + 
sockaddr_set(&addr, addr_family, addr_str, 0); + if (rrl_query(rrl, &addr, &rq, NULL, NULL) != KNOT_EOK) { + return i; + } + } + return -1; +} void test_rrl(char *impl_name, rrl_req_t rq, knot_dname_t *zone) { fakeclock_init(); - /* 1. create rrl table */ + /* create rrl table */ rrl_table_t *rrl = rrl_create(1, 1); // XXX parameters ignored ok(rrl != NULL, "rrl(%s): create", impl_name); @@ -187,7 +204,7 @@ void test_rrl(char *impl_name, rrl_req_t rq, knot_dname_t *zone) { } - /* 2. N unlimited requests. */ + /* N unlimited requests. */ struct sockaddr_storage addr; struct sockaddr_storage addr6; sockaddr_set(&addr, AF_INET, "1.2.3.4", 0); @@ -202,24 +219,76 @@ void test_rrl(char *impl_name, rrl_req_t rq, knot_dname_t *zone) { } is_int(0, ret, "rrl(%s): unlimited IPv4/v6 requests", impl_name); - /* 3. limited request */ + /* limited request */ ret = rrl_query(rrl, &addr, &rq, zone, NULL); is_int(KNOT_ELIMIT, ret, "rrl(%s): blocked IPv4 request", impl_name); - /* 4. limited IPv6 request */ + /* limited IPv6 request */ ret = rrl_query(rrl, &addr6, &rq, zone, NULL); is_int(KNOT_ELIMIT, ret, "rrl(%s): blocked IPv6 request", impl_name); - /* 5. unblocked request */ + /* different namespaces for IPv4 and IPv6 */ + ret = count_passing_queries(rrl, AF_INET6, "0102:0304:%x::", 0,0xffff, 1 << 16); + is_int(initial_limit(0), ret, "rrl(%s): different namespaces for IPv4 and IPv6 (needs limit on IPv6 /32)", impl_name); + + /* unblocked request */ fakeclock_tick = 32; ret = rrl_query(rrl, &addr, &rq, zone, NULL); is_int(KNOT_EOK, ret, "rrl(%s): unblocked IPv4 request", impl_name); - /* 6. unblocked IPv6 request */ + /* unblocked IPv6 request */ ret = rrl_query(rrl, &addr6, &rq, zone, NULL); is_int(KNOT_EOK, ret, "rrl(%s): unblocked IPv6 request", impl_name); - /* 7+. 
parallel tests */ + /* IPv4 multi-prefix tests */ + int limit = initial_limit(9); + ret = count_passing_queries(rrl, AF_INET, "128.0.0.0", 0,0, 1 << 16); + is_int(limit, ret, "rrl(%s): IPv4 limit for /32", impl_name); + + ret = count_passing_queries(rrl, AF_INET, "128.0.0.1", 0,0, 1); + is_int(-1, ret, "rrl(%s): IPv4 limit for /32 not applied for /31", impl_name); + + limit = initial_limit(7) - initial_limit(9) - 1; + ret = count_passing_queries(rrl, AF_INET, "128.0.0.%d", 2,15, 1 << 16); + is_int(limit, ret, "rrl(%s): IPv4 limit for /28", impl_name); + + ret = count_passing_queries(rrl, AF_INET, "128.0.0.16", 0,0, 1); + is_int(-1, ret, "rrl(%s): IPv4 limit for /28 not applied for /27", impl_name); + + limit = initial_limit(5) - initial_limit(7) - 1; + ret = count_passing_queries(rrl, AF_INET, "128.0.0.%d", 17,255, 1 << 16); + is_int(limit, ret, "rrl(%s): IPv4 limit for /24", impl_name); + + ret = count_passing_queries(rrl, AF_INET, "128.0.1.0", 1,1, 1); + is_int(-1, ret, "rrl(%s): IPv4 limit for /24 not applied for /23", impl_name); + + + /* IPv6 multi-prefix tests */ + limit = initial_limit(9); + ret = count_passing_queries(rrl, AF_INET6, "8000::", 0,0, 1 << 16); + is_int(limit, ret, "rrl(%s): IPv6 limit for /128", impl_name); + + ret = count_passing_queries(rrl, AF_INET6, "8000::1", 0,0, 1); + is_int(-1, ret, "rrl(%s): IPv6 limit for /128 not applied for /127", impl_name); + + limit = initial_limit(7) - initial_limit(9) - 1; + ret = count_passing_queries(rrl, AF_INET6, "8000:0:0:0:%02x00::", 0x01,0xff, 1 << 16); + is_int(limit, ret, "rrl(%s): IPv6 limit for /64", impl_name); + + ret = count_passing_queries(rrl, AF_INET6, "8000:0:0:1::", 0,0, 1); + is_int(-1, ret, "rrl(%s): IPv6 limit for /64 not applied for /63", impl_name); + + limit = initial_limit(5) - initial_limit(7) - 1; + ret = count_passing_queries(rrl, AF_INET6, "8000:0:0:00%02x::", 0x01,0xff, 1 << 16); + is_int(limit, ret, "rrl(%s): IPv6 limit for /56", impl_name); + + ret = count_passing_queries(rrl, 
AF_INET6, "8000:0:0:0100", 1,1, 1); + is_int(-1, ret, "rrl(%s): IPv6 limit for /56 not applied for /55", impl_name); + + // prefix /32 is not tested here + + + /* parallel tests */ struct stage stages[] = { /* first tick, last tick, hosts */ {32, 32, { @@ -280,7 +349,7 @@ void test_rrl(char *impl_name, rrl_req_t rq, knot_dname_t *zone) { uint32_t ticks = stages[si].last_tick - stages[si].first_tick + 1; for (size_t i = 0; h[i].queries_per_tick; i++) { ok( h[i].min_passed * ticks <= h[i].passed && h[i].passed <= h[i].max_passed * ticks, - "rrl(%s): stage %d, addr %s: %.2f <= %.4f <= %.2f", impl_name, si, h[i].addr_format, h[i].min_passed, (double)h[i].passed / ticks, h[i].max_passed); + "rrl(%s): parallel stage %d, addr %s: %.2f <= %.4f <= %.2f", impl_name, si, h[i].addr_format, h[i].min_passed, (double)h[i].passed / ticks, h[i].max_passed); } } while (stages[++si].first_tick);