This commit is contained in:
Victor Julien 2026-04-09 01:15:52 -04:00 committed by GitHub
commit 335fdf006f
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
13 changed files with 181 additions and 112 deletions

View file

@ -159,11 +159,8 @@ int RunModeFilePcapAutoFp(void)
snprintf(tname, sizeof(tname), "%s#01", thread_name_autofp);
/* create the threads */
ThreadVars *tv_receivepcap =
TmThreadCreatePacketHandler(tname,
"packetpool", "packetpool",
queues, "flow",
"pktacqloop");
ThreadVars *tv_receivepcap = TmThreadCreatePacketHandler(
tname, "packetpool", "packetpool", queues, "flow", "pktacqloop");
SCFree(queues);
if (tv_receivepcap == NULL) {
@ -194,11 +191,8 @@ int RunModeFilePcapAutoFp(void)
SCLogDebug("tname %s, qname %s", tname, qname);
SCLogDebug("Assigning %s affinity to cpu %u", tname, cpu);
ThreadVars *tv_detect_ncpu =
TmThreadCreatePacketHandler(tname,
qname, "flow",
"packetpool", "packetpool",
"varslot");
ThreadVars *tv_detect_ncpu = TmThreadCreatePacketHandler(
tname, qname, "flow", "packetpool", "packetpool", "varslot");
if (tv_detect_ncpu == NULL) {
FatalError("TmThreadsCreate failed");
}

View file

@ -944,6 +944,7 @@ next_frame:
break;
}
}
TmThreadFlushOutQueue(ptv->tv);
if (emergency_flush) {
AFPDumpCounters(ptv);
}
@ -1084,9 +1085,11 @@ static int AFPReadFromRingV3(AFPThreadVars *ptv)
ptv->frame_offset = (ptv->frame_offset + 1) % ptv->req.v3.tp_block_nr;
/* return to maintenance task after one loop on the ring */
if (ptv->frame_offset == 0) {
TmThreadFlushOutQueue(ptv->tv);
SCReturnInt(AFP_READ_OK);
}
}
TmThreadFlushOutQueue(ptv->tv);
SCReturnInt(AFP_READ_OK);
}

View file

@ -1037,6 +1037,7 @@ TmEcode ReceiveNFQLoop(ThreadVars *tv, void *data, void *slot)
}
NFQRecvPkt(nq, ntv);
TmThreadFlushOutQueue(tv);
StatsSyncCountersIfSignalled(&tv->stats);
}
SCReturnInt(TM_ECODE_OK);

View file

@ -229,6 +229,7 @@ TmEcode PcapFileDispatch(PcapFileFileVars *ptv)
SCLogError("Pcap callback PcapFileCallbackLoop failed for %s", ptv->filename);
loop_result = TM_ECODE_FAILED;
}
TmThreadFlushOutQueue(ptv->shared->tv);
StatsSyncCountersIfSignalled(&ptv->shared->tv->stats);
}

View file

@ -88,7 +88,7 @@ typedef struct ThreadVars_ {
/** incoming queue and handler */
Tmq *inq;
struct Packet_ * (*tmqh_in)(struct ThreadVars_ *);
struct PacketQueueNoLock_ (*TmqhInFn)(struct ThreadVars_ *);
SC_ATOMIC_DECLARE(uint32_t, flags);
@ -103,7 +103,7 @@ typedef struct ThreadVars_ {
/** outgoing queue and handler */
Tmq *outq;
void *outctx;
void (*tmqh_out)(struct ThreadVars_ *, struct Packet_ *);
void (*TmqhOutFn)(struct ThreadVars_ *, struct Packet_ *);
/** Queue for decoders to temporarily store extra packets they
* generate. These packets are generated as part of the tunnel

View file

@ -24,6 +24,8 @@
#ifndef SURICATA_TM_QUEUEHANDLERS_H
#define SURICATA_TM_QUEUEHANDLERS_H
#include "packet-queue.h"
enum {
TMQH_NOT_SET,
TMQH_SIMPLE,
@ -35,12 +37,15 @@ enum {
typedef struct Tmqh_ {
const char *name;
Packet *(*InHandler)(ThreadVars *);
PacketQueueNoLock (*InHandler)(ThreadVars *);
void (*InShutdownHandler)(ThreadVars *);
void (*OutHandler)(ThreadVars *, Packet *);
void (*OutFlush)(ThreadVars *);
void *(*OutHandlerCtxSetup)(const char *);
void (*OutHandlerCtxFree)(void *);
#ifdef UNITTESTS
void (*RegisterTests)(void);
#endif
} Tmqh;
extern Tmqh tmqh_table[TMQH_SIZE];

View file

@ -316,11 +316,11 @@ static void *TmThreadsSlotPktAcqLoop(void *td)
TmEcode r = TM_ECODE_OK;
/* check if we are setup properly */
if (s == NULL || s->PktAcqLoop == NULL || tv->tmqh_in == NULL || tv->tmqh_out == NULL) {
if (s == NULL || s->PktAcqLoop == NULL || tv->TmqhInFn == NULL || tv->TmqhOutFn == NULL) {
SCLogError("TmSlot or ThreadVars badly setup: s=%p,"
" PktAcqLoop=%p, tmqh_in=%p,"
" tmqh_out=%p",
s, s ? s->PktAcqLoop : NULL, tv->tmqh_in, tv->tmqh_out);
" PktAcqLoop=%p, TmqhInFn=%p,"
" TmqhOutFn=%p",
s, s ? s->PktAcqLoop : NULL, tv->TmqhInFn, tv->TmqhOutFn);
TmThreadsSetFlag(tv, THV_CLOSED | THV_RUNNING_DONE);
pthread_exit(NULL);
return NULL;
@ -386,10 +386,10 @@ static void *TmThreadsLib(void *td)
TmSlot *s = tv->tm_slots;
/* check if we are setup properly */
if (s == NULL || tv->tmqh_in == NULL || tv->tmqh_out == NULL) {
SCLogError("TmSlot or ThreadVars badly setup: s=%p, tmqh_in=%p,"
" tmqh_out=%p",
s, tv->tmqh_in, tv->tmqh_out);
if (s == NULL || tv->TmqhInFn == NULL || tv->TmqhOutFn == NULL) {
SCLogError("TmSlot or ThreadVars badly setup: s=%p, TmqhInFn=%p,"
" TmqhOutFn=%p",
s, tv->TmqhInFn, tv->TmqhOutFn);
TmThreadsSetFlag(tv, THV_CLOSED | THV_RUNNING_DONE);
return NULL;
}
@ -428,7 +428,7 @@ static void *TmThreadsSlotVar(void *td)
SCDropCaps(tv);
/* check if we are setup properly */
if (s == NULL || tv->tmqh_in == NULL || tv->tmqh_out == NULL) {
if (s == NULL || tv->TmqhInFn == NULL || tv->TmqhOutFn == NULL) {
TmThreadsSetFlag(tv, THV_CLOSED | THV_RUNNING_DONE);
pthread_exit(NULL);
return NULL;
@ -489,39 +489,43 @@ static void *TmThreadsSlotVar(void *td)
s = (TmSlot *)tv->tm_slots;
while (run) {
/* input a packet */
p = tv->tmqh_in(tv);
/* get packet(s) from the queue. Can get more than one so should loop
* over it. */
PacketQueueNoLock q = tv->TmqhInFn(tv);
do {
p = PacketDequeueNoLock(&q);
/* if we didn't get a packet see if we need to do some housekeeping */
if (unlikely(p == NULL)) {
if (tv->flow_queue && SC_ATOMIC_GET(tv->flow_queue->non_empty)) {
p = PacketGetFromQueueOrAlloc();
if (p != NULL) {
p->flags |= PKT_PSEUDO_STREAM_END;
PKT_SET_SRC(p, PKT_SRC_CAPTURE_TIMEOUT);
/* if we didn't get a packet see if we need to do some housekeeping */
if (unlikely(p == NULL)) {
if (tv->flow_queue && SC_ATOMIC_GET(tv->flow_queue->non_empty)) {
p = PacketGetFromQueueOrAlloc();
if (p != NULL) {
p->flags |= PKT_PSEUDO_STREAM_END;
PKT_SET_SRC(p, PKT_SRC_CAPTURE_TIMEOUT);
}
}
}
}
if (p != NULL) {
/* run the thread module(s) */
r = TmThreadsSlotVarRun(tv, p, s);
if (r == TM_ECODE_FAILED) {
TmqhOutputPacketpool(tv, p);
TmThreadsSetFlag(tv, THV_FAILED);
break;
if (p != NULL) {
/* run the thread module(s) */
r = TmThreadsSlotVarRun(tv, p, s);
if (r == TM_ECODE_FAILED) {
TmqhOutputPacketpool(tv, p);
TmThreadsSetFlag(tv, THV_FAILED);
break;
}
/* output the packet */
tv->TmqhOutFn(tv, p);
/* now handle the stream pq packets */
TmThreadsHandleInjectedPackets(tv);
}
/* output the packet */
tv->tmqh_out(tv, p);
/* now handle the stream pq packets */
TmThreadsHandleInjectedPackets(tv);
}
if (TmThreadsCheckFlag(tv, (THV_KILL | THV_REQ_FLOW_LOOP))) {
run = false;
}
if (TmThreadsCheckFlag(tv, (THV_KILL | THV_REQ_FLOW_LOOP))) {
run = false;
}
} while (q.len != 0);
}
if (!SCTmThreadsSlotPacketLoopFinish(tv)) {
goto error;
@ -987,9 +991,9 @@ ThreadVars *TmThreadCreate(const char *name, const char *inq_name, const char *i
if (tmqh == NULL)
goto error;
tv->tmqh_in = tmqh->InHandler;
tv->TmqhInFn = tmqh->InHandler;
tv->inq_id = (uint8_t)id;
SCLogDebug("tv->tmqh_in %p", tv->tmqh_in);
SCLogDebug("tv->TmqhInFn %p", tv->TmqhInFn);
}
/* set the outgoing queue */
@ -1005,7 +1009,7 @@ ThreadVars *TmThreadCreate(const char *name, const char *inq_name, const char *i
if (tmqh == NULL)
goto error;
tv->tmqh_out = tmqh->OutHandler;
tv->TmqhOutFn = tmqh->OutHandler;
tv->outq_id = (uint8_t)id;
if (outq_name != NULL && strcmp(outq_name, "packetpool") != 0) {

View file

@ -27,6 +27,7 @@
#include "tmqh-packetpool.h"
#include "tm-threads-common.h"
#include "tm-queuehandlers.h"
#include "tm-modules.h"
#include "flow.h" // for the FlowQueue
@ -180,7 +181,7 @@ static inline bool TmThreadsHandleInjectedPackets(ThreadVars *tv)
TmThreadsSlotProcessPktFail(tv, extra_p);
break;
}
tv->tmqh_out(tv, extra_p);
tv->TmqhOutFn(tv, extra_p);
}
return true;
} else {
@ -194,7 +195,7 @@ static inline bool TmThreadsHandleInjectedPackets(ThreadVars *tv)
static inline TmEcode TmThreadsSlotProcessPkt(ThreadVars *tv, TmSlot *s, Packet *p)
{
if (s == NULL) {
tv->tmqh_out(tv, p);
tv->TmqhOutFn(tv, p);
return TM_ECODE_OK;
}
@ -204,7 +205,7 @@ static inline TmEcode TmThreadsSlotProcessPkt(ThreadVars *tv, TmSlot *s, Packet
return TM_ECODE_FAILED;
}
tv->tmqh_out(tv, p);
tv->TmqhOutFn(tv, p);
TmThreadsHandleInjectedPackets(tv);
@ -257,7 +258,7 @@ static inline void TmThreadsCaptureHandleTimeout(ThreadVars *tv, Packet *p)
/* packet could have been passed to us that we won't use
* return it to the pool. */
if (p != NULL)
tv->tmqh_out(tv, p);
tv->TmqhOutFn(tv, p);
}
static inline void TmThreadsCaptureBreakLoop(ThreadVars *tv)
@ -278,6 +279,18 @@ static inline void TmThreadsCaptureBreakLoop(ThreadVars *tv)
}
}
static inline void TmThreadFlushOutQueue(ThreadVars *tv)
{
if (tv->outctx != NULL && tv->outq_id != TMQH_NOT_SET) {
Tmqh *qh = TmqhGetQueueHandlerByID(tv->outq_id);
if (qh != NULL) {
if (qh->OutFlush != NULL) {
qh->OutFlush(tv);
}
}
}
}
void TmThreadsSealThreads(void);
void TmThreadsUnsealThreads(void);
void TmThreadsListThreads(void);

View file

@ -1,4 +1,4 @@
/* Copyright (C) 2007-2020 Open Information Security Foundation
/* Copyright (C) 2007-2025 Open Information Security Foundation
*
* You can copy, redistribute or modify this Program under the terms of
* the GNU General Public License version 2 as published by the Free
@ -40,13 +40,18 @@
#include "conf.h"
#include "util-unittest.h"
Packet *TmqhInputFlow(ThreadVars *t);
PacketQueueNoLock TmqhInputFlow(ThreadVars *t);
void TmqhOutputFlowHash(ThreadVars *t, Packet *p);
void TmqhOutputFlowIPPair(ThreadVars *t, Packet *p);
static void TmqhOutputFlowFTPHash(ThreadVars *t, Packet *p);
static void TmqhOutputFlush(ThreadVars *tv);
void *TmqhOutputFlowSetupCtx(const char *queue_str);
void TmqhOutputFlowFreeCtx(void *ctx);
#ifdef UNITTESTS
void TmqhFlowRegisterTests(void);
#endif
static uint16_t g_sync_pkts = 8;
void TmqhFlowRegister(void)
{
@ -54,8 +59,9 @@ void TmqhFlowRegister(void)
tmqh_table[TMQH_FLOW].InHandler = TmqhInputFlow;
tmqh_table[TMQH_FLOW].OutHandlerCtxSetup = TmqhOutputFlowSetupCtx;
tmqh_table[TMQH_FLOW].OutHandlerCtxFree = TmqhOutputFlowFreeCtx;
#ifdef UNITTESTS
tmqh_table[TMQH_FLOW].RegisterTests = TmqhFlowRegisterTests;
#endif
const char *scheduler = NULL;
if (SCConfGet("autofp-scheduler", &scheduler) == 1) {
if (strcasecmp(scheduler, "round-robin") == 0) {
@ -79,13 +85,19 @@ void TmqhFlowRegister(void)
} else {
tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowHash;
}
intmax_t sync_pkts;
if (SCConfGetInt("autofp-batch-size", &sync_pkts) != 1)
sync_pkts = 8;
g_sync_pkts = (uint16_t)sync_pkts;
SCLogNotice("autofp-block-size: %u", g_sync_pkts);
tmqh_table[TMQH_FLOW].OutFlush = TmqhOutputFlush;
}
void TmqhFlowPrintAutofpHandler(void)
{
#define PRINT_IF_FUNC(f, msg) \
if (tmqh_table[TMQH_FLOW].OutHandler == (f)) \
SCLogConfig("AutoFP mode using \"%s\" flow load balancer", (msg))
#define PRINT_IF_FUNC(f, msg) \
if (tmqh_table[TMQH_FLOW].OutHandler == (f)) \
SCLogConfig("AutoFP mode using \"%s\" flow load balancer", (msg))
PRINT_IF_FUNC(TmqhOutputFlowHash, "Hash");
PRINT_IF_FUNC(TmqhOutputFlowIPPair, "IPPair");
@ -95,27 +107,21 @@ void TmqhFlowPrintAutofpHandler(void)
}
/* same as 'simple' */
Packet *TmqhInputFlow(ThreadVars *tv)
PacketQueueNoLock TmqhInputFlow(ThreadVars *tv)
{
PacketQueue *q = tv->inq->pq;
StatsSyncCountersIfSignalled(&tv->stats);
PacketQueue *q = tv->inq->pq;
SCMutexLock(&q->mutex_q);
if (q->len == 0) {
/* if we have no packets in queue, wait... */
SCCondWait(&q->cond_q, &q->mutex_q);
}
if (q->len > 0) {
Packet *p = PacketDequeue(q);
SCMutexUnlock(&q->mutex_q);
return p;
} else {
/* return NULL if we have no pkt. Should only happen on signals. */
SCMutexUnlock(&q->mutex_q);
return NULL;
}
PacketQueueNoLock pq = { .top = q->top, .bot = q->bot, q->len };
q->bot = q->top = NULL;
q->len = 0;
SCMutexUnlock(&q->mutex_q);
return pq;
}
static int StoreQueueId(TmqhFlowCtx *ctx, char *name)
@ -148,6 +154,7 @@ static int StoreQueueId(TmqhFlowCtx *ctx, char *name)
memset(ctx->queues + (ctx->size - 1), 0, sizeof(TmqhFlowMode));
}
ctx->queues[ctx->size - 1].q = tmq->pq;
SCLogNotice("size %d, mem %u", ctx->size, (uint32_t)(ctx->size * sizeof(TmqhFlowMode)));
return 0;
}
@ -172,6 +179,8 @@ void *TmqhOutputFlowSetupCtx(const char *queue_str)
TmqhFlowCtx *ctx = SCCalloc(1, sizeof(TmqhFlowCtx));
if (unlikely(ctx == NULL))
return NULL;
SCLogNotice("ctx %p", ctx);
ctx->last_flow_qid = 0xffff;
char *str = SCStrdup(queue_str);
if (unlikely(str == NULL)) {
@ -181,16 +190,16 @@ void *TmqhOutputFlowSetupCtx(const char *queue_str)
/* parse the comma separated string */
do {
char *comma = strchr(tstr,',');
char *comma = strchr(tstr, ',');
if (comma != NULL) {
*comma = '\0';
char *qname = tstr;
int r = StoreQueueId(ctx,qname);
int r = StoreQueueId(ctx, qname);
if (r < 0)
goto error;
} else {
char *qname = tstr;
int r = StoreQueueId(ctx,qname);
int r = StoreQueueId(ctx, qname);
if (r < 0)
goto error;
}
@ -211,20 +220,61 @@ void TmqhOutputFlowFreeCtx(void *ctx)
{
TmqhFlowCtx *fctx = (TmqhFlowCtx *)ctx;
SCLogPerf("AutoFP - Total flow handler queues - %" PRIu16,
fctx->size);
SCLogPerf("AutoFP - Total flow handler queues - %" PRIu16, fctx->size);
SCFree(fctx->queues);
SCFree(fctx);
}
static void Flush(ThreadVars *tv, TmqhFlowCtx *ctx, const uint32_t new_qid)
{
SCLogDebug("flushing %p with new_qid %x last_flow_hash_cnt %u last_flow_qid %x", ctx, new_qid,
ctx->last_flow_hash_cnt, ctx->last_flow_qid);
const uint32_t old_qid = ctx->last_flow_qid;
ctx->last_flow_qid = new_qid;
ctx->last_flow_hash_cnt = 1;
const uint32_t qid = old_qid;
if (qid < ctx->size) {
SCLogDebug("flushing %p qid %x", ctx, qid);
PacketQueue *q = ctx->queues[qid].q;
SCMutexLock(&q->mutex_q);
do {
Packet *p = PacketDequeueNoLock(&ctx->last_flow_hash_queue);
PacketEnqueue(q, p);
} while (ctx->last_flow_hash_queue.len > 0);
SCCondSignal(&q->cond_q);
SCMutexUnlock(&q->mutex_q);
}
}
static void TmqhOutputFlush(ThreadVars *tv)
{
TmqhFlowCtx *ctx = (TmqhFlowCtx *)tv->outctx;
Flush(tv, ctx, 0xffff);
}
void TmqhOutputFlowHash(ThreadVars *tv, Packet *p)
{
uint32_t qid;
TmqhFlowCtx *ctx = (TmqhFlowCtx *)tv->outctx;
if (p->flags & PKT_WANTS_FLOW) {
uint32_t hash = p->flow_hash;
const uint32_t hash = p->flow_hash;
qid = hash % ctx->size;
if (qid != ctx->last_flow_qid || ctx->last_flow_hash_cnt == g_sync_pkts) {
Flush(tv, ctx, qid);
} else {
ctx->last_flow_hash_cnt++;
if (ctx->last_flow_hash_cnt_max < ctx->last_flow_hash_cnt) {
ctx->last_flow_hash_cnt_max = ctx->last_flow_hash_cnt;
SCLogNotice("max same hash cnt %u", ctx->last_flow_hash_cnt_max);
}
PacketEnqueueNoLock(&ctx->last_flow_hash_queue, p);
return;
}
} else {
qid = ctx->last++;
@ -391,16 +441,12 @@ static int TmqhOutputFlowSetupCtxTest03(void)
PASS;
}
#endif /* UNITTESTS */
void TmqhFlowRegisterTests(void)
{
#ifdef UNITTESTS
UtRegisterTest("TmqhOutputFlowSetupCtxTest01",
TmqhOutputFlowSetupCtxTest01);
UtRegisterTest("TmqhOutputFlowSetupCtxTest02",
TmqhOutputFlowSetupCtxTest02);
UtRegisterTest("TmqhOutputFlowSetupCtxTest03",
TmqhOutputFlowSetupCtxTest03);
#endif
UtRegisterTest("TmqhOutputFlowSetupCtxTest03", TmqhOutputFlowSetupCtxTest03);
}
#endif /* UNITTESTS */

View file

@ -1,4 +1,4 @@
/* Copyright (C) 2007-2010 Open Information Security Foundation
/* Copyright (C) 2007-2026 Open Information Security Foundation
*
* You can copy, redistribute or modify this Program under the terms of
* the GNU General Public License version 2 as published by the Free
@ -33,12 +33,18 @@ typedef struct TmqhFlowMode_ {
* \param queues array of queue id's this flow handler outputs to */
typedef struct TmqhFlowCtx_ {
uint16_t size;
/* last for round robin non-flow distributions */
uint16_t last;
uint16_t last_flow_qid;
uint32_t last_flow_hash_cnt;
uint32_t last_flow_hash_cnt_max;
PacketQueueNoLock last_flow_hash_queue;
TmqhFlowMode *queues;
} TmqhFlowCtx;
void TmqhFlowRegister (void);
void TmqhFlowRegister(void);
void TmqhFlowRegisterTests(void);
void TmqhFlowPrintAutofpHandler(void);

View file

@ -48,6 +48,8 @@ static inline PktPool *GetThreadPacketPool(void)
return &thread_pkt_pool;
}
static PacketQueueNoLock TmqhInputPacketpool(ThreadVars *);
/**
* \brief TmqhPacketpoolRegister
* \initonly
@ -297,9 +299,11 @@ void PacketPoolDestroy(void)
#endif /* DEBUG_VALIDATION */
}
Packet *TmqhInputPacketpool(ThreadVars *tv)
static PacketQueueNoLock TmqhInputPacketpool(ThreadVars *tv)
{
return PacketPoolGetPacket();
Packet *p = PacketPoolGetPacket();
PacketQueueNoLock pq = { .top = p, .bot = p, .len = (p != NULL) };
return pq;
}
void TmqhOutputPacketpool(ThreadVars *t, Packet *p)

View file

@ -72,7 +72,6 @@ typedef struct PktPool_ {
PktPoolLockedStack return_stack;
} PktPool;
Packet *TmqhInputPacketpool(ThreadVars *);
void TmqhOutputPacketpool(ThreadVars *, Packet *);
void TmqhReleasePacketsToPacketPool(PacketQueue *);
void TmqhPacketpoolRegister(void);

View file

@ -32,9 +32,9 @@
#include "tm-queuehandlers.h"
#include "tmqh-simple.h"
Packet *TmqhInputSimple(ThreadVars *t);
void TmqhOutputSimple(ThreadVars *t, Packet *p);
void TmqhInputSimpleShutdownHandler(ThreadVars *);
static PacketQueueNoLock TmqhInputSimple(ThreadVars *t);
static void TmqhOutputSimple(ThreadVars *t, Packet *p);
static void TmqhInputSimpleShutdownHandler(ThreadVars *);
void TmqhSimpleRegister (void)
{
@ -44,31 +44,24 @@ void TmqhSimpleRegister (void)
tmqh_table[TMQH_SIMPLE].OutHandler = TmqhOutputSimple;
}
Packet *TmqhInputSimple(ThreadVars *t)
PacketQueueNoLock TmqhInputSimple(ThreadVars *tv)
{
PacketQueue *q = t->inq->pq;
StatsSyncCountersIfSignalled(&t->stats);
StatsSyncCountersIfSignalled(&tv->stats);
PacketQueue *q = tv->inq->pq;
SCMutexLock(&q->mutex_q);
if (q->len == 0) {
/* if we have no packets in queue, wait... */
SCCondWait(&q->cond_q, &q->mutex_q);
}
if (q->len > 0) {
Packet *p = PacketDequeue(q);
SCMutexUnlock(&q->mutex_q);
return p;
} else {
/* return NULL if we have no pkt. Should only happen on signals. */
SCMutexUnlock(&q->mutex_q);
return NULL;
}
PacketQueueNoLock pq = { .top = q->top, .bot = q->bot, q->len };
q->bot = q->top = NULL;
q->len = 0;
SCMutexUnlock(&q->mutex_q);
return pq;
}
void TmqhInputSimpleShutdownHandler(ThreadVars *tv)
static void TmqhInputSimpleShutdownHandler(ThreadVars *tv)
{
int i;
@ -83,7 +76,7 @@ void TmqhInputSimpleShutdownHandler(ThreadVars *tv)
}
}
void TmqhOutputSimple(ThreadVars *t, Packet *p)
static void TmqhOutputSimple(ThreadVars *t, Packet *p)
{
SCLogDebug("Packet %p, p->root %p, alloced %s", p, p->root, BOOL2STR(p->pool == NULL));