rrl: multi-prefix queries

WIP: batch queries

WIP: multi-prefix queries with unit tests, unoptimized

WIP: optimize multi-prefix queries

WIP: nits

- avoid -Warray-bounds for array[-1]
  I thought it was more readable that way, but warnings would be annoying.
- use `hash_t` more
- drop old FIXME: I believe it was just confused symbols, not code

WIP: deduplicate the choice of SipHash parameters
This commit is contained in:
Lukáš Ondráček 2024-01-17 14:29:11 +01:00 committed by Daniel Salzman
parent 7f488eaa17
commit 627777d643
6 changed files with 423 additions and 234 deletions

View file

@ -25,134 +25,22 @@
#include "libdnssec/error.h"
#include "libdnssec/random.h"
/* Limits (class, ipv6 remote, FIXME dname) */
#define RRL_CLSBLK_MAXLEN 16
/* CIDR block prefix lengths for v4/v6 */
#define RRL_V4_PREFIX_LEN 3 /* /24 */
#define RRL_V6_PREFIX_LEN 7 /* /56 */
/* Defaults */
#define RRL_PSIZE_LARGE 1024
// Hardcoded also in unit tests.
/* Classification */
enum {
CLS_NULL = 0 << 0, /* Empty bucket. */
CLS_NORMAL = 1 << 0, /* Normal response. */
CLS_ERROR = 1 << 1, /* Error response. */
CLS_NXDOMAIN = 1 << 2, /* NXDOMAIN (special case of error). */
CLS_EMPTY = 1 << 3, /* Empty response. */
CLS_LARGE = 1 << 4, /* Response size over threshold (1024k). */
CLS_WILDCARD = 1 << 5, /* Wildcard query. */
CLS_ANY = 1 << 6, /* ANY query (spec. class). */
CLS_DNSSEC = 1 << 7 /* DNSSEC related RR query (spec. class) */
};
#define RRL_V4_PREFIXES (uint8_t[]) { 24, 28, 32}
#define RRL_V4_PRICES (uint16_t[]) { 1 << 5, 1 << 7, 1 << 9}
/* Classification string. */
struct cls_name {
int code;
const char *name;
};
#define RRL_V6_PREFIXES (uint8_t[]) { 32, 56, 64, 128}
#define RRL_V6_PRICES (uint16_t[]) { 1 << 0, 1 << 5, 1 << 7, 1 << 9}
static const struct cls_name rrl_cls_names[] = {
{ CLS_NORMAL, "POSITIVE" },
{ CLS_ERROR, "ERROR" },
{ CLS_NXDOMAIN, "NXDOMAIN"},
{ CLS_EMPTY, "EMPTY"},
{ CLS_LARGE, "LARGE"},
{ CLS_WILDCARD, "WILDCARD"},
{ CLS_ANY, "ANY"},
{ CLS_DNSSEC, "DNSSEC"},
{ CLS_NULL, "NULL"},
{ CLS_NULL, NULL}
};
#define RRL_V4_PREFIXES_CNT (sizeof(RRL_V4_PREFIXES) / sizeof(*RRL_V4_PREFIXES))
#define RRL_V6_PREFIXES_CNT (sizeof(RRL_V6_PREFIXES) / sizeof(*RRL_V6_PREFIXES))
#define RRL_MAX_PREFIXES_CNT ((RRL_V4_PREFIXES_CNT > RRL_V6_PREFIXES_CNT) ? RRL_V4_PREFIXES_CNT : RRL_V6_PREFIXES_CNT)
static inline const char *rrl_clsstr(int code)
{
for (const struct cls_name *c = rrl_cls_names; c->name; c++) {
if (c->code == code) {
return c->name;
}
}
return "unknown class";
}
/* Bucket flags. */
enum {
RRL_BF_NULL = 0 << 0, /* No flags. */
RRL_BF_SSTART = 1 << 0, /* Bucket in slow-start after collision. */
RRL_BF_ELIMIT = 1 << 1 /* Bucket is rate-limited. */
};
static uint8_t rrl_clsid(rrl_req_t *p)
{
/* Check error code */
int ret = CLS_NULL;
switch (knot_wire_get_rcode(p->wire)) {
case KNOT_RCODE_NOERROR: ret = CLS_NORMAL; break;
case KNOT_RCODE_NXDOMAIN: return CLS_NXDOMAIN; break;
default: return CLS_ERROR; break;
}
/* Check if answered from a qname */
if (ret == CLS_NORMAL && p->flags & RRL_REQ_WILDCARD) {
return CLS_WILDCARD;
}
/* Check query type for spec. classes. */
if (p->query) {
switch(knot_pkt_qtype(p->query)) {
case KNOT_RRTYPE_ANY: /* ANY spec. class */
return CLS_ANY;
break;
case KNOT_RRTYPE_DNSKEY:
case KNOT_RRTYPE_RRSIG:
case KNOT_RRTYPE_DS: /* DNSSEC-related RR class. */
return CLS_DNSSEC;
break;
default:
break;
}
}
/* Check packet size for threshold. */
if (p->len >= RRL_PSIZE_LARGE) {
return CLS_LARGE;
}
/* Check ancount */
if (knot_wire_get_ancount(p->wire) == 0) {
return CLS_EMPTY;
}
return ret;
}
static int rrl_classify(uint8_t *dst, size_t maxlen, const struct sockaddr_storage *remote,
rrl_req_t *req, const knot_dname_t *name)
{
/* Class */
uint8_t cls = rrl_clsid(req);
*dst = cls;
int blklen = sizeof(cls);
/* Address (in network byteorder, adjust masks). */
uint64_t netblk = 0;
if (remote->ss_family == AF_INET6) {
struct sockaddr_in6 *ipv6 = (struct sockaddr_in6 *)remote;
memcpy(&netblk, &ipv6->sin6_addr, RRL_V6_PREFIX_LEN);
} else {
struct sockaddr_in *ipv4 = (struct sockaddr_in *)remote;
memcpy(&netblk, &ipv4->sin_addr, RRL_V4_PREFIX_LEN);
}
memcpy(dst + blklen, &netblk, sizeof(netblk));
blklen += sizeof(netblk);
/* Name not considered anymore. */
return blklen;
}
static void subnet_tostr(char *dst, size_t maxlen, const struct sockaddr_storage *ss)
/*
static void subnet_tostr(char *dst, size_t maxlen, const struct sockaddr_storage *ss) // TODO remove or adapt
{
const void *addr;
const char *suffix;
@ -196,6 +84,7 @@ static void rrl_log_state(knotd_mod_t *mod, const struct sockaddr_storage *ss,
knotd_mod_log(mod, LOG_NOTICE, "address/subnet %s, class %s, qname %s, %s limiting",
addr_str, rrl_clsstr(cls), qname_str, what);
}
*/
rrl_table_t *rrl_create(size_t size, uint32_t rate)
{
@ -218,17 +107,25 @@ int rrl_query(rrl_table_t *rrl, const struct sockaddr_storage *remote,
return KNOT_EINVAL;
}
uint8_t buf[RRL_CLSBLK_MAXLEN] ALIGNED(16) = { 0 };
size_t buf_len = rrl_classify(buf, RRL_CLSBLK_MAXLEN, remote, req, zone);
if (buf_len < 0) {
return KNOT_ERROR;
}
struct timespec now_ts = {0};
clock_gettime(CLOCK_MONOTONIC_COARSE, &now_ts);
uint32_t now = now_ts.tv_sec * 1000 + now_ts.tv_nsec / 1000000;
return KRU.limited(rrl, (char *)buf, now, 1<<9) ? KNOT_ELIMIT : KNOT_EOK; // TODO set price
uint8_t key[16] ALIGNED(16) = {0, };
if (remote->ss_family == AF_INET6) {
struct sockaddr_in6 *ipv6 = (struct sockaddr_in6 *)remote;
memcpy(key, &ipv6->sin6_addr, 16);
return KRU.limited_multi_prefix_or(rrl, now, 1, key, RRL_V6_PREFIXES, RRL_V6_PRICES, RRL_V6_PREFIXES_CNT)
? KNOT_ELIMIT : KNOT_EOK;
} else {
struct sockaddr_in *ipv4 = (struct sockaddr_in *)remote;
memcpy(key, &ipv4->sin_addr, 4);
return KRU.limited_multi_prefix_or(rrl, now, 0, key, RRL_V4_PREFIXES, RRL_V4_PRICES, RRL_V4_PREFIXES_CNT)
? KNOT_ELIMIT : KNOT_EOK;
}
}
bool rrl_slip_roll(int n_slip)

View file

@ -16,7 +16,7 @@ struct decay_config {
};
/// Catch up the time drift with configurably slower decay.
static void update_time(struct load_cl *l, const uint32_t time_now,
static inline void update_time(struct load_cl *l, const uint32_t time_now,
const struct decay_config *decay)
{
uint32_t ticks;

View file

@ -72,11 +72,11 @@ void test_stage(struct test_ctx *ctx, uint32_t dur) {
size_t cat;
for (cat = 0; freq_bounds[cat] <= rnd; cat++);
uint64_t key[2] = {0,};
uint64_t key[2] ALIGNED(16) = {0};
key[0] = random() % (ctx->cats[cat].id_max - ctx->cats[cat].id_min + 1) + ctx->cats[cat].id_min;
ctx->cats[cat].total++;
ctx->cats[cat].passed += !KRU.limited(ctx->kru, (char *)key, ctx->time, ctx->price);
ctx->cats[cat].passed += !KRU.limited(ctx->kru, ctx->time, (uint8_t *)key, ctx->price);
}
}
}
@ -198,11 +198,15 @@ void test_multi_attackers(void) {
/*=== benchmarking time performance ===*/
#define TIMED_TESTS_TABLE_SIZE_LOG 16
#define TIMED_TESTS_PRICE (1 << 9)
#define TIMED_TESTS_QUERIES (1 << 26) // 28
#define TIMED_TESTS_MAX_THREADS 12
#define TIMED_TESTS_WAIT_BEFORE_SEC 2 // 60
#define TIMED_TESTS_TABLE_SIZE_LOG 16
#define TIMED_TESTS_PRICE (1 << 9)
#define TIMED_TESTS_QUERIES (1 << 26)
#define TIMED_TESTS_TIME_UPDATE_PERIOD 4
#define TIMED_TESTS_MAX_THREADS 64
#define TIMED_TESTS_WAIT_BEFORE_SEC 2
#define TIMED_TESTS_BATCH_SIZE 1 // each query still counted individually in MQPS; should be set to 1 if PREFIXES are set
#define TIMED_TESTS_PREFIXES (uint8_t []){64, 65, 66, 67} // one query contains all prefixes, MQPS is lowered
struct timed_test_ctx {
struct kru *kru;
@ -213,15 +217,42 @@ struct timed_test_ctx {
void *timed_runnable(void *arg) {
struct timed_test_ctx *ctx = arg;
uint64_t key[2] = {0,};
for (uint64_t i = ctx->first_query; i < TIMED_TESTS_QUERIES; i += ctx->increment) {
struct timespec now_ts = {0};
clock_gettime(CLOCK_MONOTONIC_COARSE, &now_ts);
uint32_t now_msec = now_ts.tv_sec * 1000 + now_ts.tv_nsec / 1000000;
struct timespec now_ts = {0};
uint32_t now_msec = 0;
uint64_t now_last_update = -TIMED_TESTS_TIME_UPDATE_PERIOD * ctx->increment;
key[0] = i * ctx->key_mult;
KRU.limited(ctx->kru, (char *)key, now_msec, TIMED_TESTS_PRICE);
#ifdef TIMED_TESTS_PREFIXES
uint16_t prices[sizeof(TIMED_TESTS_PREFIXES)];
#else
uint16_t prices[TIMED_TESTS_BATCH_SIZE];
#endif
for (size_t j = 0; j < sizeof(prices)/sizeof(*prices); j++) {
prices[j] = TIMED_TESTS_PRICE;
}
for (uint64_t i = ctx->first_query; i < TIMED_TESTS_QUERIES; ) {
if (i >= now_last_update + TIMED_TESTS_TIME_UPDATE_PERIOD * ctx->increment) {
clock_gettime(CLOCK_MONOTONIC_COARSE, &now_ts);
now_msec = now_ts.tv_sec * 1000 + now_ts.tv_nsec / 1000000;
now_last_update = i;
}
uint64_t key_values[TIMED_TESTS_BATCH_SIZE * 2] = {0,};
uint8_t *keys[TIMED_TESTS_BATCH_SIZE];
for (size_t j = 0; j < TIMED_TESTS_BATCH_SIZE; j++) {
key_values[2 * j] = i * ctx->key_mult;
key_values[2 * j + 1] = 0xFFFFFFFFFFFFFFFFll;
keys[j] = (uint8_t *)(key_values + 2 * j);
i += ctx->increment;
}
#ifdef TIMED_TESTS_PREFIXES
KRU.limited_multi_prefix_or(ctx->kru, now_msec, 0, keys[0], TIMED_TESTS_PREFIXES, prices, sizeof(TIMED_TESTS_PREFIXES));
#else
KRU.limited_multi_or_nobreak(ctx->kru, now_msec, keys, prices, TIMED_TESTS_BATCH_SIZE);
#endif
}
return NULL;
}
@ -235,7 +266,7 @@ void timed_tests() {
struct timespec wait_ts = {TIMED_TESTS_WAIT_BEFORE_SEC, 0};
for (int threads = 1; threads <= TIMED_TESTS_MAX_THREADS; threads++) {
for (int threads = 1; threads <= TIMED_TESTS_MAX_THREADS; threads *= 2) {
for (int collide = 0; collide < 2; collide++) {
nanosleep(&wait_ts, NULL);
printf("%3d threads, %-15s: ", threads, (collide ? "single query" : "unique queries"));

View file

@ -6,6 +6,14 @@
#include <stdint.h>
// FIXME: review the whole header; for now at least the main APIs should appear
#if __GNUC__ >= 4 || __clang_major__ >= 4
#define ALIGNED_CPU_CACHE __attribute__((aligned(64)))
#define ALIGNED(_bytes) __attribute__((aligned(_bytes)))
#else
#define ALIGNED_CPU_CACHE
#define ALIGNED(_bytes)
#endif
struct kru;
/// Usage: KRU.limited(...)
@ -28,20 +36,20 @@ struct kru_api {
/// Determine if a key should get limited (and update the KRU).
/// key needs to be aligned to a multiple of 16 bytes.
bool (*limited)(struct kru *kru, char key[static const 16],
uint32_t time_now, uint16_t price);
bool (*limited)(struct kru *kru, uint32_t time_now, uint8_t key[static const 16], uint16_t price);
/// Multiple queries. Returns OR of answers. Updates KRU only if no query is blocked (and possibly on race).
bool (*limited_multi_or)(struct kru *kru, uint32_t time_now, uint8_t **keys, uint16_t *prices, size_t queries_cnt);
/// Same as previous but without short-circuit evaluation; for time measurement purposes.
bool (*limited_multi_or_nobreak)(struct kru *kru, uint32_t time_now, uint8_t ** keys, uint16_t *prices, size_t queries_cnt);
/// Multiple queries based on different prefixes of a single key. Returns OR of answers. Updates KRU only if no query is blocked.
/// The key of i-th query consists of prefixes[i] bits of key, prefixes[i], and namespace.
bool (*limited_multi_prefix_or)(struct kru *kru, uint32_t time_now,
uint8_t namespace, uint8_t key[static 16], uint8_t *prefixes, uint16_t *prices, size_t queries_cnt);
};
// The functions are stored this way to make it easier to switch
// implementation based on detected CPU.
extern struct kru_api KRU;
extern const struct kru_api KRU_GENERIC, KRU_AVX2; // for tests only
#if __GNUC__ >= 4 || __clang_major__ >= 4
#define ALIGNED_CPU_CACHE __attribute__((aligned(64)))
#define ALIGNED(_bytes) __attribute__((aligned(_bytes)))
#else
#define ALIGNED_CPU_CACHE
#define ALIGNED(_bytes)
#endif

View file

@ -35,7 +35,6 @@ Size (`loads_bits` = log2 length):
- The length should probably be at least something like the square of the number of utilized CPUs.
But this most likely won't be a limiting factor.
*/
#include <stdlib.h>
#include <assert.h>
#include <stdatomic.h>
@ -67,8 +66,12 @@ typedef uint64_t hash_t;
#define AES_ROUNDS 4
#else
#include "contrib/openbsd/siphash.h"
/// 1,3 should be OK choice, probably. TODO: confirm
#define hash(_k, _p, _l) SipHash((_k), 1, 3, (_p), (_l))
enum {
SIPHASH_RC = 1,
SIPHASH_RF = 3,
};
#endif
@ -77,6 +80,7 @@ typedef uint64_t hash_t;
#include <x86intrin.h>
#endif
struct kru {
#if USE_AES
/// Hashing secret. Random but shared by all users of the table.
@ -134,13 +138,21 @@ static struct kru *kru_create(int capacity_log)
return kru;
}
/// Update limiting and return true iff it hit the limit instead.
static bool kru_limited(struct kru *kru, char key[static const 16], uint32_t time_now, uint16_t price)
struct query_ctx {
struct load_cl *l[TABLE_COUNT];
uint32_t time_now;
uint16_t price;
uint16_t id;
uint16_t *load;
};
/// Phase 1/3 of a query -- hash, prefetch, ctx init. Based on one 16-byte key.
static inline void kru_limited_prefetch(struct kru *kru, uint32_t time_now, uint8_t key[static 16], uint16_t price, struct query_ctx *ctx)
{
// Obtain hash of *buf.
uint64_t hash;
hash_t hash;
#if !USE_AES
hash = hash(&kru->hash_key, key, 16);
hash = SipHash(&kru->hash_key, SIPHASH_RC, SIPHASH_RF, key, 16);
#else
{
__m128i h; /// hashing state
@ -153,107 +165,207 @@ static bool kru_limited(struct kru *kru, char key[static const 16], uint32_t tim
}
memcpy(&hash, &h, sizeof(hash));
}
//FIXME: gcc 12 is apparently mixing code of hashing with update_time() ?!
#endif
// Choose the cache-lines to operate on
struct load_cl *l[TABLE_COUNT];
const uint32_t loads_mask = (1 << kru->loads_bits) - 1;
// Fetch the two cache-lines in parallel before we really touch them.
for (int li = 0; li < TABLE_COUNT; ++li) {
l[li] = &kru->load_cls[hash & loads_mask][li];
__builtin_prefetch(l[li], 0); // hope for read-only access
struct load_cl * const l = &kru->load_cls[hash & loads_mask][li];
__builtin_prefetch(l, 0); // hope for read-only access
hash >>= kru->loads_bits;
ctx->l[li] = l;
}
for (int li = 0; li < TABLE_COUNT; ++li)
update_time(l[li], time_now, &DECAY_32);
uint16_t id = hash;
hash >>= 16;
ctx->time_now = time_now;
ctx->price = price;
ctx->id = hash;
}
/// Phase 1/3 of a query -- hash, prefetch, ctx init. Based on a bit prefix of one 16-byte key.
static inline void kru_limited_prefetch_prefix(struct kru *kru, uint32_t time_now, uint8_t namespace, uint8_t key[static 16], uint8_t prefix, uint16_t price, struct query_ctx *ctx)
{
// Obtain hash of *buf.
hash_t hash;
#if !USE_AES
{
const int rc = SIPHASH_RC, rf = SIPHASH_RF;
// Hash prefix of key, prefix size, and namespace together.
SIPHASH_CTX hctx;
SipHash_Init(&hctx, &kru->hash_key);
SipHash_Update(&hctx, rc, rf, &namespace, sizeof(namespace));
SipHash_Update(&hctx, rc, rf, &prefix, sizeof(prefix));
SipHash_Update(&hctx, rc, rf, key, prefix / 8);
if (prefix % 8) {
const uint8_t masked_byte = key[prefix / 8] & (0xFF00 >> (prefix % 8));
SipHash_Update(&hctx, rc, rf, &masked_byte, 1);
}
hash = SipHash_End(&hctx, rc, rf);
}
#else
{
__m128i h; /// hashing state
h = _mm_load_si128((__m128i *)key);
{ // Keep only the prefix.
const uint8_t p = prefix;
// Prefix mask (1...0) -> little endian byte array (0x00 ... 0x00 0xFF ... 0xFF).
__m128i mask = _mm_set_epi64x(
(p < 64 ? (p == 0 ? 0 : -1ll << (64 - p)) : -1ll), // higher 64 bits (1...) -> second half of byte array (... 0xFF)
(p <= 64 ? 0 : -1ll << (128 - p))); // lower 64 bits (...0) -> first half of byte array (0x00 ...)
// Swap mask endianness (0x11 ... 0x11 0x00 ... 0x00).
mask = _mm_shuffle_epi8(mask,
_mm_set_epi8(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15));
// Apply mask.
h = _mm_and_si128(h, mask);
}
// Now do the the hashing itself.
__m128i *aes_key = (void*)kru->hash_key;
{
// Mix namespace and prefix size into the first aes key.
__m128i aes_key1 = _mm_insert_epi16(_mm_load_si128(aes_key), (namespace << 8) | prefix, 0);
h = _mm_aesenc_si128(h, aes_key1);
}
for (int j = 1; j < AES_ROUNDS; ++j) {
int key_id = j % (sizeof(kru->hash_key) / sizeof(__m128i));
h = _mm_aesenc_si128(h, _mm_load_si128(&aes_key[key_id]));
}
memcpy(&hash, &h, sizeof(hash));
}
#endif
// Choose the cache-lines to operate on
const uint32_t loads_mask = (1 << kru->loads_bits) - 1;
// Fetch the two cache-lines in parallel before we really touch them.
for (int li = 0; li < TABLE_COUNT; ++li) {
struct load_cl * const l = &kru->load_cls[hash & loads_mask][li];
__builtin_prefetch(l, 0); // hope for read-only access
hash >>= kru->loads_bits;
ctx->l[li] = l;
}
ctx->time_now = time_now;
ctx->price = price;
ctx->id = hash;
}
/// Phase 2/3 of a query -- returns answer with no state modification (except update_time).
static inline bool kru_limited_fetch(struct kru *kru, struct query_ctx *ctx)
{
for (int li = 0; li < TABLE_COUNT; ++li) {
update_time(ctx->l[li], ctx->time_now, &DECAY_32);
}
const uint16_t id = ctx->id;
// Find matching element. Matching 16 bits in addition to loads_bits.
uint16_t *load = NULL;
ctx->load = NULL;
#if !USE_AVX2
for (int li = 0; li < TABLE_COUNT; ++li)
for (int i = 0; i < LOADS_LEN; ++i)
if (l[li]->ids[i] == id) {
load = &l[li]->loads[i];
if (ctx->l[li]->ids[i] == id) {
ctx->load = &ctx->l[li]->loads[i];
goto load_found;
}
#else
const __m256i id_v = _mm256_set1_epi16(id);
for (int li = 0; li < TABLE_COUNT; ++li) {
static_assert(LOADS_LEN == 15 && sizeof(l[li]->ids[0]) == 2, "");
static_assert(LOADS_LEN == 15 && sizeof(ctx->l[li]->ids[0]) == 2, "");
// unfortunately we can't use aligned load here
__m256i ids_v = _mm256_loadu_si256(((__m256i *)&l[li]->ids[-1]));
__m256i ids_v = _mm256_loadu_si256((__m256i *)(ctx->l[li]->ids - 1));
__m256i match_mask = _mm256_cmpeq_epi16(ids_v, id_v);
if (_mm256_testz_si256(match_mask, match_mask))
continue; // no match of id
int index = _bit_scan_reverse(_mm256_movemask_epi8(match_mask)) / 2 - 1;
// there's a small possibility that we hit equality only on the -1 index
if (index >= 0) {
load = &l[li]->loads[index];
ctx->load = &ctx->l[li]->loads[index];
goto load_found;
}
}
#endif
// No match, so find position of the smallest load.
int min_li = 0;
int min_i = 0;
return false;
load_found:;
const uint16_t price = ctx->price;
const uint32_t limit = (1<<16) - price;
return (*ctx->load >= limit);
}
/// Phase 3/3 of a query -- state update, return value overrides previous answer in case of race.
/// Not needed if blocked by fetch phase.
static inline bool kru_limited_update(struct kru *kru, struct query_ctx *ctx)
{
_Atomic uint16_t *load_at;
if (!ctx->load) {
// No match, so find position of the smallest load.
int min_li = 0;
int min_i = 0;
#if !USE_SSE41
for (int li = 0; li < TABLE_COUNT; ++li)
for (int i = 0; i < LOADS_LEN; ++i)
if (l[li]->loads[i] < l[min_li]->loads[min_i]) {
min_li = li;
min_i = i;
}
for (int li = 0; li < TABLE_COUNT; ++li)
for (int i = 0; i < LOADS_LEN; ++i)
if (ctx->l[li]->loads[i] < ctx->l[min_li]->loads[min_i]) {
min_li = li;
min_i = i;
}
#else
int min_val = 0;
for (int li = 0; li < TABLE_COUNT; ++li) {
// BEWARE: we're relying on the exact memory layout of struct load_cl,
// where the .loads array take 15 16-bit values at the very end.
static_assert((offsetof(struct load_cl, loads) - 2) % 16 == 0,
"bad alignment of struct load_cl::loads");
static_assert(LOADS_LEN == 15 && sizeof(l[li]->loads[0]) == 2, "");
__m128i *l_v = ((__m128i *)(&l[li]->loads[-1]));
__m128i l0 = _mm_load_si128(l_v);
__m128i l1 = _mm_load_si128(l_v + 1);
// We want to avoid the first item in l0, so we maximize it.
l0 = _mm_insert_epi16(l0, (1<<16)-1, 0);
int min_val = 0;
for (int li = 0; li < TABLE_COUNT; ++li) {
// BEWARE: we're relying on the exact memory layout of struct load_cl,
// where the .loads array take 15 16-bit values at the very end.
static_assert((offsetof(struct load_cl, loads) - 2) % 16 == 0,
"bad alignment of struct load_cl::loads");
static_assert(LOADS_LEN == 15 && sizeof(ctx->l[li]->loads[0]) == 2, "");
__m128i *l_v = (__m128i *)(ctx->l[li]->loads - 1);
__m128i l0 = _mm_load_si128(l_v);
__m128i l1 = _mm_load_si128(l_v + 1);
// We want to avoid the first item in l0, so we maximize it.
// (but this function takes a signed integer, so -1 is the maximum)
l0 = _mm_insert_epi16(l0, -1, 0);
// Only one instruction can find minimum and its position,
// and it works on 8x uint16_t.
__m128i mp0 = _mm_minpos_epu16(l0);
__m128i mp1 = _mm_minpos_epu16(l1);
int min0 = _mm_extract_epi16(mp0, 0);
int min1 = _mm_extract_epi16(mp1, 0);
int min01, min_ix;
if (min0 < min1) {
min01 = min0;
min_ix = _mm_extract_epi16(mp0, 1);
} else {
min01 = min1;
min_ix = 8 + _mm_extract_epi16(mp1, 1);
}
// Only one instruction can find minimum and its position,
// and it works on 8x uint16_t.
__m128i mp0 = _mm_minpos_epu16(l0);
__m128i mp1 = _mm_minpos_epu16(l1);
int min0 = _mm_extract_epi16(mp0, 0);
int min1 = _mm_extract_epi16(mp1, 0);
int min01, min_ix;
if (min0 < min1) {
min01 = min0;
min_ix = _mm_extract_epi16(mp0, 1);
} else {
min01 = min1;
min_ix = 8 + _mm_extract_epi16(mp1, 1);
}
if (li == 0 || min_val > min01) {
min_li = li;
min_i = min_ix;
min_val = min01;
if (li == 0 || min_val > min01) {
min_li = li;
min_i = min_ix;
min_val = min01;
}
}
}
// now, min_i (and min_ix) is offset by one due to alignment of .loads
if (min_i != 0) // zero is very unlikely
--min_i;
// now, min_i (and min_ix) is offset by one due to alignment of .loads
if (min_i != 0) // zero is very unlikely
--min_i;
#endif
l[min_li]->ids[min_i] = id;
load = &l[min_li]->loads[min_i]; // TODO: goto load_found?
load_found:;
ctx->l[min_li]->ids[min_i] = ctx->id;
load_at = (_Atomic uint16_t *)&ctx->l[min_li]->loads[min_i];
} else {
load_at = (_Atomic uint16_t *)ctx->load;
}
static_assert(ATOMIC_CHAR16_T_LOCK_FREE == 2, "insufficient atomics");
_Atomic uint16_t *load_at = (_Atomic uint16_t *)load;
const uint16_t price = ctx->price;
const uint32_t limit = (1<<16) - price;
uint16_t load_orig = atomic_load_explicit(load_at, memory_order_relaxed);
do {
@ -264,7 +376,79 @@ load_found:;
return false;
}
static bool kru_limited_multi_or(struct kru *kru, uint32_t time_now, uint8_t **keys, uint16_t *prices, size_t queries_cnt)
{
struct query_ctx ctx[queries_cnt];
for (size_t i = 0; i < queries_cnt; i++) {
kru_limited_prefetch(kru, time_now, keys[i], prices[i], ctx + i);
}
for (size_t i = 0; i < queries_cnt; i++) {
if (kru_limited_fetch(kru, ctx + i))
return true;
}
bool ret = false;
for (size_t i = 0; i < queries_cnt; i++) {
ret |= kru_limited_update(kru, ctx + i);
}
return ret;
}
static bool kru_limited_multi_or_nobreak(struct kru *kru, uint32_t time_now, uint8_t **keys, uint16_t *prices, size_t queries_cnt)
{
struct query_ctx ctx[queries_cnt];
bool ret = false;
for (size_t i = 0; i < queries_cnt; i++) {
kru_limited_prefetch(kru, time_now, keys[i], prices[i], ctx + i);
}
for (size_t i = 0; i < queries_cnt; i++) {
if (kru_limited_fetch(kru, ctx + i))
ret = true;
}
if (ret) return true;
for (size_t i = 0; i < queries_cnt; i++) {
if (kru_limited_update(kru, ctx + i))
ret = true;
}
return ret;
}
static bool kru_limited_multi_prefix_or(struct kru *kru, uint32_t time_now, uint8_t namespace, uint8_t key[static 16], uint8_t *prefixes, uint16_t *prices, size_t queries_cnt)
{
struct query_ctx ctx[queries_cnt];
for (size_t i = 0; i < queries_cnt; i++) {
kru_limited_prefetch_prefix(kru, time_now, namespace, key, prefixes[i], prices[i], ctx + i);
}
for (size_t i = 0; i < queries_cnt; i++) {
if (kru_limited_fetch(kru, ctx + i))
return true;
}
bool ret = false;
for (size_t i = 0; i < queries_cnt; i++) {
ret |= kru_limited_update(kru, ctx + i);
}
return ret;
}
/// Update limiting and return true iff it hit the limit instead.
static bool kru_limited(struct kru *kru, uint32_t time_now, uint8_t key[static 16], uint16_t price)
{
return kru_limited_multi_or(kru, time_now, &key, &price, 1);
}
#define KRU_API_INITIALIZER { \
.create = kru_create, \
.limited = kru_limited, \
.limited_multi_or = kru_limited_multi_or, \
.limited_multi_or_nobreak = kru_limited_multi_or_nobreak, \
.limited_multi_prefix_or = kru_limited_multi_prefix_or, \
}

View file

@ -32,13 +32,16 @@ int fakeclock_gettime(clockid_t clockid, struct timespec *tp);
#include <stdatomic.h>
#define RRL_PRICE_LOG 9 // XXX same constant as the hardcoded one in rrl_query function
#define RRL_PRICE_LOG 9 // highest price of all prefixes, same for IPv4 and IPv6
#define RRL_THREADS 8
//#define RRL_SYNC_WITH_REAL_TIME
uint32_t initial_limit(uint16_t price_log) {
return (1 << (16 - price_log)) - 1;
}
// expected limits for parallel test
// expected limits for parallel test (for largest prefix)
#define RRL_FIRST_BLOCKED (1 << (16 - RRL_PRICE_LOG))
#define RRL_INITIAL_LIMIT_MIN (RRL_FIRST_BLOCKED - 1)
#define RRL_INITIAL_LIMIT_MAX (RRL_INITIAL_LIMIT_MIN + RRL_THREADS - 1) // races may occur on first insertion into table
@ -167,12 +170,26 @@ static void* rrl_runnable(void *arg)
}
}
int count_passing_queries(rrl_table_t *rrl, int addr_family, char *format, uint32_t min_value, uint32_t max_value, uint32_t max_queries) {
struct sockaddr_storage addr;
char addr_str[40];
rrl_req_t rq = {0,};
for (size_t i = 0; i < max_queries; i++) {
snprintf(addr_str, sizeof(addr_str), format, i % (max_value - min_value + 1) + min_value);
sockaddr_set(&addr, addr_family, addr_str, 0);
if (rrl_query(rrl, &addr, &rq, NULL, NULL) != KNOT_EOK) {
return i;
}
}
return -1;
}
void test_rrl(char *impl_name, rrl_req_t rq, knot_dname_t *zone) {
fakeclock_init();
/* 1. create rrl table */
/* create rrl table */
rrl_table_t *rrl = rrl_create(1, 1); // XXX parameters ignored
ok(rrl != NULL, "rrl(%s): create", impl_name);
@ -187,7 +204,7 @@ void test_rrl(char *impl_name, rrl_req_t rq, knot_dname_t *zone) {
}
/* 2. N unlimited requests. */
/* N unlimited requests. */
struct sockaddr_storage addr;
struct sockaddr_storage addr6;
sockaddr_set(&addr, AF_INET, "1.2.3.4", 0);
@ -202,24 +219,76 @@ void test_rrl(char *impl_name, rrl_req_t rq, knot_dname_t *zone) {
}
is_int(0, ret, "rrl(%s): unlimited IPv4/v6 requests", impl_name);
/* 3. limited request */
/* limited request */
ret = rrl_query(rrl, &addr, &rq, zone, NULL);
is_int(KNOT_ELIMIT, ret, "rrl(%s): blocked IPv4 request", impl_name);
/* 4. limited IPv6 request */
/* limited IPv6 request */
ret = rrl_query(rrl, &addr6, &rq, zone, NULL);
is_int(KNOT_ELIMIT, ret, "rrl(%s): blocked IPv6 request", impl_name);
/* 5. unblocked request */
/* different namespaces for IPv4 and IPv6 */
ret = count_passing_queries(rrl, AF_INET6, "0102:0304:%x::", 0,0xffff, 1 << 16);
is_int(initial_limit(0), ret, "rrl(%s): different namespaces for IPv4 and IPv6 (needs limit on IPv6 /32)", impl_name);
/* unblocked request */
fakeclock_tick = 32;
ret = rrl_query(rrl, &addr, &rq, zone, NULL);
is_int(KNOT_EOK, ret, "rrl(%s): unblocked IPv4 request", impl_name);
/* 6. unblocked IPv6 request */
/* unblocked IPv6 request */
ret = rrl_query(rrl, &addr6, &rq, zone, NULL);
is_int(KNOT_EOK, ret, "rrl(%s): unblocked IPv6 request", impl_name);
/* 7+. parallel tests */
/* IPv4 multi-prefix tests */
int limit = initial_limit(9);
ret = count_passing_queries(rrl, AF_INET, "128.0.0.0", 0,0, 1 << 16);
is_int(limit, ret, "rrl(%s): IPv4 limit for /32", impl_name);
ret = count_passing_queries(rrl, AF_INET, "128.0.0.1", 0,0, 1);
is_int(-1, ret, "rrl(%s): IPv4 limit for /32 not applied for /31", impl_name);
limit = initial_limit(7) - initial_limit(9) - 1;
ret = count_passing_queries(rrl, AF_INET, "128.0.0.%d", 2,15, 1 << 16);
is_int(limit, ret, "rrl(%s): IPv4 limit for /28", impl_name);
ret = count_passing_queries(rrl, AF_INET, "128.0.0.16", 0,0, 1);
is_int(-1, ret, "rrl(%s): IPv4 limit for /28 not applied for /27", impl_name);
limit = initial_limit(5) - initial_limit(7) - 1;
ret = count_passing_queries(rrl, AF_INET, "128.0.0.%d", 17,255, 1 << 16);
is_int(limit, ret, "rrl(%s): IPv4 limit for /24", impl_name);
ret = count_passing_queries(rrl, AF_INET, "128.0.1.0", 1,1, 1);
is_int(-1, ret, "rrl(%s): IPv4 limit for /24 not applied for /23", impl_name);
/* IPv6 multi-prefix tests */
limit = initial_limit(9);
ret = count_passing_queries(rrl, AF_INET6, "8000::", 0,0, 1 << 16);
is_int(limit, ret, "rrl(%s): IPv6 limit for /128", impl_name);
ret = count_passing_queries(rrl, AF_INET6, "8000::1", 0,0, 1);
is_int(-1, ret, "rrl(%s): IPv6 limit for /128 not applied for /127", impl_name);
limit = initial_limit(7) - initial_limit(9) - 1;
ret = count_passing_queries(rrl, AF_INET6, "8000:0:0:0:%02x00::", 0x01,0xff, 1 << 16);
is_int(limit, ret, "rrl(%s): IPv6 limit for /64", impl_name);
ret = count_passing_queries(rrl, AF_INET6, "8000:0:0:1::", 0,0, 1);
is_int(-1, ret, "rrl(%s): IPv6 limit for /64 not applied for /63", impl_name);
limit = initial_limit(5) - initial_limit(7) - 1;
ret = count_passing_queries(rrl, AF_INET6, "8000:0:0:00%02x::", 0x01,0xff, 1 << 16);
is_int(limit, ret, "rrl(%s): IPv6 limit for /56", impl_name);
ret = count_passing_queries(rrl, AF_INET6, "8000:0:0:0100", 1,1, 1);
is_int(-1, ret, "rrl(%s): IPv6 limit for /56 not applied for /55", impl_name);
// prefix /32 is not tested here
/* parallel tests */
struct stage stages[] = {
/* first tick, last tick, hosts */
{32, 32, {
@ -280,7 +349,7 @@ void test_rrl(char *impl_name, rrl_req_t rq, knot_dname_t *zone) {
uint32_t ticks = stages[si].last_tick - stages[si].first_tick + 1;
for (size_t i = 0; h[i].queries_per_tick; i++) {
ok( h[i].min_passed * ticks <= h[i].passed && h[i].passed <= h[i].max_passed * ticks,
"rrl(%s): stage %d, addr %s: %.2f <= %.4f <= %.2f", impl_name, si, h[i].addr_format, h[i].min_passed, (double)h[i].passed / ticks, h[i].max_passed);
"rrl(%s): parallel stage %d, addr %s: %.2f <= %.4f <= %.2f", impl_name, si, h[i].addr_format, h[i].min_passed, (double)h[i].passed / ticks, h[i].max_passed);
}
} while (stages[++si].first_tick);