From dc9643853d3ef2d2f7ffd8582b4299da09727128 Mon Sep 17 00:00:00 2001 From: John Baldwin Date: Sat, 7 May 2016 00:33:35 +0000 Subject: [PATCH] Use DDP to implement zerocopy TCP receive with aio_read(). Chelsio's TCP offload engine supports direct DMA of received TCP payload into wired user buffers. This feature is known as Direct-Data Placement. However, to scale well the adapter needs to prepare buffers for DDP before data arrives. aio_read() is more amenable to this requirement than read() as applications often call read() only after data is available in the socket buffer. When DDP is enabled, TOE sockets use the recently added pru_aio_queue protocol hook to claim aio_read(2) requests instead of letting them use the default AIO socket logic. The DDP feature supports scheduling DMA to two buffers at a time so that the second buffer is ready for use after the first buffer is filled. The aio/DDP code optimizes the case of an application ping-ponging between two buffers (similar to the zero-copy bpf(4) code) by keeping the two most recently used AIO buffers wired. If a buffer is reused, the aio/DDP code is able to reuse the vm_page_t array as well as page pod mappings (a kind of MMU mapping the Chelsio NIC uses to describe user buffers). The generation of the vmspace of the calling process is used in conjunction with the user buffer's address and length to determine if a user buffer matches a previously used buffer. If an application queues a buffer for AIO that does not match a previously used buffer then the least recently used buffer is unwired before the new buffer is wired. This ensures that no more than two user buffers per socket are ever wired. Note that this feature is best suited to applications sending a steady stream of data vs short bursts of traffic. Discussed with: np Relnotes: yes Sponsored by: Chelsio Communications --- sys/dev/cxgbe/offload.h | 2 - sys/dev/cxgbe/t4_main.c | 9 - sys/dev/cxgbe/tom/t4_cpl_io.c | 119 ++- sys/dev/cxgbe/tom/t4_ddp.c | 1818 +++++++++++++++++++++------------ sys/dev/cxgbe/tom/t4_tom.c | 36 +- sys/dev/cxgbe/tom/t4_tom.h | 63 +- 6 files changed, 1308 insertions(+), 739 deletions(-) diff --git a/sys/dev/cxgbe/offload.h b/sys/dev/cxgbe/offload.h index d1363c4a8c2..22612d5156a 100644 --- a/sys/dev/cxgbe/offload.h +++ b/sys/dev/cxgbe/offload.h @@ -145,8 +145,6 @@ struct uld_info { struct tom_tunables { int sndbuf; int ddp; - int indsz; - int ddp_thres; int rx_coalesce; int tx_align; }; diff --git a/sys/dev/cxgbe/t4_main.c b/sys/dev/cxgbe/t4_main.c index 1e86aa9516a..132d69e1c3e 100644 --- a/sys/dev/cxgbe/t4_main.c +++ b/sys/dev/cxgbe/t4_main.c @@ -4901,15 +4901,6 @@ t4_sysctls(struct adapter *sc) SYSCTL_ADD_INT(ctx, children, OID_AUTO, "ddp", CTLFLAG_RW, &sc->tt.ddp, 0, "DDP allowed"); - sc->tt.indsz = G_INDICATESIZE(t4_read_reg(sc, A_TP_PARA_REG5)); - SYSCTL_ADD_INT(ctx, children, OID_AUTO, "indsz", CTLFLAG_RW, - &sc->tt.indsz, 0, "DDP max indicate size allowed"); - - sc->tt.ddp_thres = - G_RXCOALESCESIZE(t4_read_reg(sc, A_TP_PARA_REG2)); - SYSCTL_ADD_INT(ctx, children, OID_AUTO, "ddp_thres", CTLFLAG_RW, - &sc->tt.ddp_thres, 0, "DDP threshold"); - sc->tt.rx_coalesce = 1; SYSCTL_ADD_INT(ctx, children, OID_AUTO, "rx_coalesce", CTLFLAG_RW, &sc->tt.rx_coalesce, 0, "receive coalescing"); diff --git a/sys/dev/cxgbe/tom/t4_cpl_io.c b/sys/dev/cxgbe/tom/t4_cpl_io.c index dd6d5b52c47..47109774bdb 100644 --- a/sys/dev/cxgbe/tom/t4_cpl_io.c +++ b/sys/dev/cxgbe/tom/t4_cpl_io.c @@ -343,7 +343,7 @@ send_rx_credits(struct adapter *sc, struct toepcb *toep, int credits) } void -t4_rcvd(struct toedev *tod, struct tcpcb *tp) +t4_rcvd_locked(struct toedev *tod, struct tcpcb *tp) { struct adapter *sc = tod->tod_softc; struct inpcb *inp = tp->t_inpcb; @@ -354,7 +354,7 @@ t4_rcvd(struct toedev *tod, struct tcpcb *tp) INP_WLOCK_ASSERT(inp); - SOCKBUF_LOCK(sb); + SOCKBUF_LOCK_ASSERT(sb); KASSERT(toep->sb_cc >= sbused(sb), ("%s: sb %p has more data (%d) than last time (%d).", __func__, sb, sbused(sb), toep->sb_cc)); @@ -372,6 +372,17 @@ t4_rcvd(struct toedev *tod, struct tcpcb *tp) tp->rcv_wnd += credits; tp->rcv_adv += credits; } +} + +void +t4_rcvd(struct toedev *tod, struct tcpcb *tp) +{ + struct inpcb *inp = tp->t_inpcb; + struct socket *so = inp->inp_socket; + struct sockbuf *sb = &so->so_rcv; + + SOCKBUF_LOCK(sb); + t4_rcvd_locked(tod, tp); SOCKBUF_UNLOCK(sb); } @@ -1042,7 +1053,6 @@ do_peer_close(struct sge_iq *iq, const struct rss_header *rss, struct mbuf *m) struct inpcb *inp = toep->inp; struct tcpcb *tp = NULL; struct socket *so; - struct sockbuf *sb; #ifdef INVARIANTS unsigned int opcode = G_CPL_OPCODE(be32toh(OPCODE_TID(cpl))); #endif @@ -1088,12 +1098,14 @@ do_peer_close(struct sge_iq *iq, const struct rss_header *rss, struct mbuf *m) tp->rcv_nxt++; /* FIN */ so = inp->inp_socket; - sb = &so->so_rcv; - SOCKBUF_LOCK(sb); - if (__predict_false(toep->ddp_flags & (DDP_BUF0_ACTIVE | DDP_BUF1_ACTIVE))) { - handle_ddp_close(toep, tp, sb, cpl->rcv_nxt); + if (toep->ulp_mode == ULP_MODE_TCPDDP) { + DDP_LOCK(toep); + if (__predict_false(toep->ddp_flags & + (DDP_BUF0_ACTIVE | DDP_BUF1_ACTIVE))) + handle_ddp_close(toep, tp, cpl->rcv_nxt); + DDP_UNLOCK(toep); } - socantrcvmore_locked(so); /* unlocks the sockbuf */ + socantrcvmore(so); if (toep->ulp_mode != ULP_MODE_RDMA) { KASSERT(tp->rcv_nxt == be32toh(cpl->rcv_nxt), @@ -1409,6 +1421,8 @@ do_rx_data(struct sge_iq *iq, const struct rss_header *rss, struct mbuf *m) tp->rcv_wnd -= len; tp->t_rcvtime = ticks; + if (toep->ulp_mode == ULP_MODE_TCPDDP) + DDP_LOCK(toep); so = inp_inpcbtosocket(inp); sb = &so->so_rcv; SOCKBUF_LOCK(sb); @@ -1418,6 +1432,8 @@ do_rx_data(struct sge_iq *iq, const struct rss_header *rss, struct mbuf *m) __func__, tid, len); m_freem(m); SOCKBUF_UNLOCK(sb); + if (toep->ulp_mode == ULP_MODE_TCPDDP) + DDP_UNLOCK(toep); INP_WUNLOCK(inp); INP_INFO_RLOCK(&V_tcbinfo); @@ -1446,6 +1462,10 @@ do_rx_data(struct sge_iq *iq, const struct rss_header *rss, struct mbuf *m) toep->rx_credits += newsize - hiwat; } + if (toep->ddp_waiting_count != 0 || toep->ddp_active_count != 0) + CTR3(KTR_CXGBE, "%s: tid %u, non-ddp rx (%d bytes)", __func__, + tid, len); + if (toep->ulp_mode == ULP_MODE_TCPDDP) { int changed = !(toep->ddp_flags & DDP_ON) ^ cpl->ddp_off; @@ -1458,47 +1478,22 @@ do_rx_data(struct sge_iq *iq, const struct rss_header *rss, struct mbuf *m) __func__)); /* Fell out of DDP mode */ - toep->ddp_flags &= ~(DDP_ON | DDP_BUF0_ACTIVE | - DDP_BUF1_ACTIVE); + toep->ddp_flags &= ~DDP_ON; + CTR1(KTR_CXGBE, "%s: fell out of DDP mode", + __func__); - if (ddp_placed) - insert_ddp_data(toep, ddp_placed); + insert_ddp_data(toep, ddp_placed); } } - if ((toep->ddp_flags & DDP_OK) == 0 && - time_uptime >= toep->ddp_disabled + DDP_RETRY_WAIT) { - toep->ddp_score = DDP_LOW_SCORE; - toep->ddp_flags |= DDP_OK; - CTR3(KTR_CXGBE, "%s: tid %u DDP_OK @ %u", - __func__, tid, time_uptime); - } - if (toep->ddp_flags & DDP_ON) { - /* - * CPL_RX_DATA with DDP on can only be an indicate. Ask - * soreceive to post a buffer or disable DDP. The - * payload that arrived in this indicate is appended to - * the socket buffer as usual. + * CPL_RX_DATA with DDP on can only be an indicate. + * Start posting queued AIO requests via DDP. The + * payload that arrived in this indicate is appended + * to the socket buffer as usual. */ - -#if 0 - CTR5(KTR_CXGBE, - "%s: tid %u (0x%x) DDP indicate (seq 0x%x, len %d)", - __func__, tid, toep->flags, be32toh(cpl->seq), len); -#endif - sb->sb_flags |= SB_DDP_INDICATE; - } else if ((toep->ddp_flags & (DDP_OK|DDP_SC_REQ)) == DDP_OK && - tp->rcv_wnd > DDP_RSVD_WIN && len >= sc->tt.ddp_thres) { - - /* - * DDP allowed but isn't on (and a request to switch it - * on isn't pending either), and conditions are ripe for - * it to work. Switch it on. - */ - - enable_ddp(sc, toep); + handle_ddp_indicate(toep); } } @@ -1516,8 +1511,16 @@ do_rx_data(struct sge_iq *iq, const struct rss_header *rss, struct mbuf *m) tp->rcv_wnd += credits; tp->rcv_adv += credits; } + + if (toep->ddp_waiting_count > 0 && sbavail(sb) != 0) { + CTR2(KTR_CXGBE, "%s: tid %u queueing AIO task", __func__, + tid); + ddp_queue_toep(toep); + } sorwakeup_locked(so); SOCKBUF_UNLOCK_ASSERT(sb); + if (toep->ulp_mode == ULP_MODE_TCPDDP) + DDP_UNLOCK(toep); INP_WUNLOCK(inp); CURVNET_RESTORE(); @@ -1680,6 +1683,7 @@ do_set_tcb_rpl(struct sge_iq *iq, const struct rss_header *rss, struct mbuf *m) struct adapter *sc = iq->adapter; const struct cpl_set_tcb_rpl *cpl = (const void *)(rss + 1); unsigned int tid = GET_TID(cpl); + struct toepcb *toep; #ifdef INVARIANTS unsigned int opcode = G_CPL_OPCODE(be32toh(OPCODE_TID(cpl))); #endif @@ -1691,6 +1695,12 @@ do_set_tcb_rpl(struct sge_iq *iq, const struct rss_header *rss, struct mbuf *m) if (is_ftid(sc, tid)) return (t4_filter_rpl(iq, rss, m)); /* TCB is a filter */ + toep = lookup_tid(sc, tid); + if (toep->ulp_mode == ULP_MODE_TCPDDP) { + handle_ddp_tcb_rpl(toep, cpl); + return (0); + } + /* * TOM and/or other ULPs don't request replies for CPL_SET_TCB or * CPL_SET_TCB_FIELD requests. This can easily change and when it does @@ -1731,6 +1741,31 @@ t4_set_tcb_field(struct adapter *sc, struct toepcb *toep, int ctrl, t4_wrq_tx(sc, wr); } +void +t4_set_tcb_field_rpl(struct adapter *sc, struct toepcb *toep, int ctrl, + uint16_t word, uint64_t mask, uint64_t val, uint8_t cookie) +{ + struct wrqe *wr; + struct cpl_set_tcb_field *req; + + KASSERT((cookie & ~M_COOKIE) == 0, ("%s: invalid cookie %#x", __func__, + cookie)); + wr = alloc_wrqe(sizeof(*req), ctrl ? toep->ctrlq : toep->ofld_txq); + if (wr == NULL) { + /* XXX */ + panic("%s: allocation failure.", __func__); + } + req = wrtod(wr); + + INIT_TP_WR_MIT_CPL(req, CPL_SET_TCB_FIELD, toep->tid); + req->reply_ctrl = htobe16(V_QUEUENO(toep->ofld_rxq->iq.abs_id)); + req->word_cookie = htobe16(V_WORD(word) | V_COOKIE(cookie)); + req->mask = htobe64(mask); + req->val = htobe64(val); + + t4_wrq_tx(sc, wr); +} + void t4_init_cpl_io_handlers(struct adapter *sc) { diff --git a/sys/dev/cxgbe/tom/t4_ddp.c b/sys/dev/cxgbe/tom/t4_ddp.c index 2aa774d0139..8cba8ca817b 100644 --- a/sys/dev/cxgbe/tom/t4_ddp.c +++ b/sys/dev/cxgbe/tom/t4_ddp.c @@ -31,7 +31,8 @@ __FBSDID("$FreeBSD$"); #include "opt_inet.h" #include -#include +#include +#include #include #include #include @@ -41,6 +42,7 @@ __FBSDID("$FreeBSD$"); #include #include #include +#include #include #include #include @@ -72,7 +74,10 @@ VNET_DECLARE(int, tcp_autorcvbuf_inc); VNET_DECLARE(int, tcp_autorcvbuf_max); #define V_tcp_autorcvbuf_max VNET(tcp_autorcvbuf_max) -static struct mbuf *get_ddp_mbuf(int len); +static void aio_ddp_requeue_task(void *context, int pending); +static void ddp_complete_all(struct toepcb *toep, int error); +static void t4_aio_cancel_active(struct kaiocb *job); +static void t4_aio_cancel_queued(struct kaiocb *job); #define PPOD_SZ(n) ((n) * sizeof(struct pagepod)) #define PPOD_SIZE (PPOD_SZ(1)) @@ -80,6 +85,10 @@ static struct mbuf *get_ddp_mbuf(int len); /* XXX: must match A_ULP_RX_TDDP_PSZ */ static int t4_ddp_pgsz[] = {4096, 4096 << 2, 4096 << 4, 4096 << 6}; +static TAILQ_HEAD(, pageset) ddp_orphan_pagesets; +static struct mtx ddp_orphan_pagesets_lock; +static struct task ddp_orphan_task; + #define MAX_DDP_BUFFER_SIZE (M_TCB_RX_DDP_BUF0_LEN) static int alloc_ppods(struct tom_data *td, int n, u_int *ppod_addr) @@ -112,33 +121,199 @@ pages_to_nppods(int npages, int ddp_pgsz) return (howmany(nsegs, PPOD_PAGES)); } +/* + * A page set holds information about a buffer used for DDP. The page + * set holds resources such as the VM pages backing the buffer (either + * held or wired) and the page pods associated with the buffer. + * Recently used page sets are cached to allow for efficient reuse of + * buffers (avoiding the need to re-fault in pages, hold them, etc.). + * Note that cached page sets keep the backing pages wired. The + * number of wired pages is capped by only allowing for two wired + * pagesets per connection. This is not a perfect cap, but is a + * trade-off for performance. + * + * If an application ping-pongs two buffers for a connection via + * aio_read(2) then those buffers should remain wired and expensive VM + * fault lookups should be avoided after each buffer has been used + * once. If an application uses more than two buffers then this will + * fall back to doing expensive VM fault lookups for each operation. + */ +static void +free_pageset(struct tom_data *td, struct pageset *ps) +{ + vm_page_t p; + int i; + + if (ps->nppods > 0) + free_ppods(td, ps->ppod_addr, ps->nppods); + + if (ps->flags & PS_WIRED) { + for (i = 0; i < ps->npages; i++) { + p = ps->pages[i]; + vm_page_lock(p); + vm_page_unwire(p, PQ_INACTIVE); + vm_page_unlock(p); + } + } else + vm_page_unhold_pages(ps->pages, ps->npages); + mtx_lock(&ddp_orphan_pagesets_lock); + TAILQ_INSERT_TAIL(&ddp_orphan_pagesets, ps, link); + taskqueue_enqueue(taskqueue_thread, &ddp_orphan_task); + mtx_unlock(&ddp_orphan_pagesets_lock); +} + +static void +ddp_free_orphan_pagesets(void *context, int pending) +{ + struct pageset *ps; + + mtx_lock(&ddp_orphan_pagesets_lock); + while (!TAILQ_EMPTY(&ddp_orphan_pagesets)) { + ps = TAILQ_FIRST(&ddp_orphan_pagesets); + TAILQ_REMOVE(&ddp_orphan_pagesets, ps, link); + mtx_unlock(&ddp_orphan_pagesets_lock); + if (ps->vm) + vmspace_free(ps->vm); + free(ps, M_CXGBE); + mtx_lock(&ddp_orphan_pagesets_lock); + } + mtx_unlock(&ddp_orphan_pagesets_lock); +} + +static void +recycle_pageset(struct toepcb *toep, struct pageset *ps) +{ + + DDP_ASSERT_LOCKED(toep); + if (!(toep->ddp_flags & DDP_DEAD) && ps->flags & PS_WIRED) { + KASSERT(toep->ddp_cached_count + toep->ddp_active_count < + nitems(toep->db), ("too many wired pagesets")); + TAILQ_INSERT_HEAD(&toep->ddp_cached_pagesets, ps, link); + toep->ddp_cached_count++; + } else + free_pageset(toep->td, ps); +} + +static void +ddp_complete_one(struct kaiocb *job, int error) +{ + long copied; + + /* + * If this job had copied data out of the socket buffer before + * it was cancelled, report it as a short read rather than an + * error. + */ + copied = job->uaiocb._aiocb_private.status; + if (copied != 0 || error == 0) + aio_complete(job, copied, 0); + else + aio_complete(job, -1, error); +} + static void free_ddp_buffer(struct tom_data *td, struct ddp_buffer *db) { - if (db == NULL) - return; + if (db->job) { + /* + * XXX: If we are un-offloading the socket then we + * should requeue these on the socket somehow. If we + * got a FIN from the remote end, then this completes + * any remaining requests with an EOF read. + */ + if (!aio_clear_cancel_function(db->job)) + ddp_complete_one(db->job, 0); + } - if (db->pages) - free(db->pages, M_CXGBE); + if (db->ps) + free_pageset(td, db->ps); +} - if (db->nppods > 0) - free_ppods(td, db->ppod_addr, db->nppods); +void +ddp_init_toep(struct toepcb *toep) +{ - free(db, M_CXGBE); + TAILQ_INIT(&toep->ddp_aiojobq); + TASK_INIT(&toep->ddp_requeue_task, 0, aio_ddp_requeue_task, toep); + toep->ddp_active_id = -1; + mtx_init(&toep->ddp_lock, "t4 ddp", NULL, MTX_DEF); +} + +void +ddp_uninit_toep(struct toepcb *toep) +{ + + mtx_destroy(&toep->ddp_lock); } void release_ddp_resources(struct toepcb *toep) { + struct pageset *ps; int i; + DDP_LOCK(toep); + toep->flags |= DDP_DEAD; for (i = 0; i < nitems(toep->db); i++) { - if (toep->db[i] != NULL) { - free_ddp_buffer(toep->td, toep->db[i]); - toep->db[i] = NULL; - } + free_ddp_buffer(toep->td, &toep->db[i]); } + while ((ps = TAILQ_FIRST(&toep->ddp_cached_pagesets)) != NULL) { + TAILQ_REMOVE(&toep->ddp_cached_pagesets, ps, link); + free_pageset(toep->td, ps); + } + ddp_complete_all(toep, 0); + DDP_UNLOCK(toep); +} + +#ifdef INVARIANTS +void +ddp_assert_empty(struct toepcb *toep) +{ + int i; + + MPASS(!(toep->ddp_flags & DDP_TASK_ACTIVE)); + for (i = 0; i < nitems(toep->db); i++) { + MPASS(toep->db[i].job == NULL); + MPASS(toep->db[i].ps == NULL); + } + MPASS(TAILQ_EMPTY(&toep->ddp_cached_pagesets)); + MPASS(TAILQ_EMPTY(&toep->ddp_aiojobq)); +} +#endif + +static void +complete_ddp_buffer(struct toepcb *toep, struct ddp_buffer *db, + unsigned int db_idx) +{ + unsigned int db_flag; + + toep->ddp_active_count--; + if (toep->ddp_active_id == db_idx) { + if (toep->ddp_active_count == 0) { + KASSERT(toep->db[db_idx ^ 1].job == NULL, + ("%s: active_count mismatch", __func__)); + toep->ddp_active_id = -1; + } else + toep->ddp_active_id ^= 1; + CTR2(KTR_CXGBE, "%s: ddp_active_id = %d", __func__, + toep->ddp_active_id); + } else { + KASSERT(toep->ddp_active_count != 0 && + toep->ddp_active_id != -1, + ("%s: active count mismatch", __func__)); + } + + db->cancel_pending = 0; + db->job = NULL; + recycle_pageset(toep, db->ps); + db->ps = NULL; + + db_flag = db_idx == 1 ? DDP_BUF1_ACTIVE : DDP_BUF0_ACTIVE; + KASSERT(toep->ddp_flags & db_flag, + ("%s: DDP buffer not active. toep %p, ddp_flags 0x%x", + __func__, toep, toep->ddp_flags)); + toep->ddp_flags &= ~db_flag; } /* XXX: handle_ddp_data code duplication */ @@ -147,28 +322,59 @@ insert_ddp_data(struct toepcb *toep, uint32_t n) { struct inpcb *inp = toep->inp; struct tcpcb *tp = intotcpcb(inp); - struct sockbuf *sb = &inp->inp_socket->so_rcv; - struct mbuf *m; + struct ddp_buffer *db; + struct kaiocb *job; + size_t placed; + long copied; + unsigned int db_flag, db_idx; INP_WLOCK_ASSERT(inp); - SOCKBUF_LOCK_ASSERT(sb); + DDP_ASSERT_LOCKED(toep); - m = get_ddp_mbuf(n); tp->rcv_nxt += n; #ifndef USE_DDP_RX_FLOW_CONTROL KASSERT(tp->rcv_wnd >= n, ("%s: negative window size", __func__)); tp->rcv_wnd -= n; #endif - - KASSERT(toep->sb_cc >= sbused(sb), - ("%s: sb %p has more data (%d) than last time (%d).", - __func__, sb, sbused(sb), toep->sb_cc)); - toep->rx_credits += toep->sb_cc - sbused(sb); -#ifdef USE_DDP_RX_FLOW_CONTROL - toep->rx_credits -= n; /* adjust for F_RX_FC_DDP */ +#ifndef USE_DDP_RX_FLOW_CONTROL + toep->rx_credits += n; #endif - sbappendstream_locked(sb, m, 0); - toep->sb_cc = sbused(sb); + CTR2(KTR_CXGBE, "%s: placed %u bytes before falling out of DDP", + __func__, n); + while (toep->ddp_active_count > 0) { + MPASS(toep->ddp_active_id != -1); + db_idx = toep->ddp_active_id; + db_flag = db_idx == 1 ? DDP_BUF1_ACTIVE : DDP_BUF0_ACTIVE; + MPASS((toep->ddp_flags & db_flag) != 0); + db = &toep->db[db_idx]; + job = db->job; + copied = job->uaiocb._aiocb_private.status; + placed = n; + if (placed > job->uaiocb.aio_nbytes - copied) + placed = job->uaiocb.aio_nbytes - copied; + if (!aio_clear_cancel_function(job)) { + /* + * Update the copied length for when + * t4_aio_cancel_active() completes this + * request. + */ + job->uaiocb._aiocb_private.status += placed; + } else if (copied + placed != 0) { + CTR4(KTR_CXGBE, + "%s: completing %p (copied %ld, placed %lu)", + __func__, job, copied, placed); + /* XXX: This always completes if there is some data. */ + aio_complete(job, copied + placed, 0); + } else if (aio_set_cancel_function(job, t4_aio_cancel_queued)) { + TAILQ_INSERT_HEAD(&toep->ddp_aiojobq, job, list); + toep->ddp_waiting_count++; + } else + aio_cancel(job); + n -= placed; + complete_ddp_buffer(toep, db, db_idx); + } + + MPASS(n == 0); } /* SET_TCB_FIELD sent as a ULP command looks like this */ @@ -236,42 +442,10 @@ mk_rx_data_ack_ulp(struct ulp_txpkt *ulpmc, struct toepcb *toep) return (ulpsc); } -static inline uint64_t -select_ddp_flags(struct socket *so, int flags, int db_idx) -{ - uint64_t ddp_flags = V_TF_DDP_INDICATE_OUT(0); - int waitall = flags & MSG_WAITALL; - int nb = so->so_state & SS_NBIO || flags & (MSG_DONTWAIT | MSG_NBIO); - - KASSERT(db_idx == 0 || db_idx == 1, - ("%s: bad DDP buffer index %d", __func__, db_idx)); - - if (db_idx == 0) { - ddp_flags |= V_TF_DDP_BUF0_VALID(1) | V_TF_DDP_ACTIVE_BUF(0); - if (waitall) - ddp_flags |= V_TF_DDP_PUSH_DISABLE_0(1); - else if (nb) - ddp_flags |= V_TF_DDP_BUF0_FLUSH(1); - else - ddp_flags |= V_TF_DDP_BUF0_FLUSH(0); - } else { - ddp_flags |= V_TF_DDP_BUF1_VALID(1) | V_TF_DDP_ACTIVE_BUF(1); - if (waitall) - ddp_flags |= V_TF_DDP_PUSH_DISABLE_1(1); - else if (nb) - ddp_flags |= V_TF_DDP_BUF1_FLUSH(1); - else - ddp_flags |= V_TF_DDP_BUF1_FLUSH(0); - } - - return (ddp_flags); -} - static struct wrqe * mk_update_tcb_for_ddp(struct adapter *sc, struct toepcb *toep, int db_idx, - int offset, uint64_t ddp_flags) + struct pageset *ps, int offset, uint64_t ddp_flags, uint64_t ddp_flags_mask) { - struct ddp_buffer *db = toep->db[db_idx]; struct wrqe *wr; struct work_request_hdr *wrh; struct ulp_txpkt *ulpmc; @@ -302,7 +476,7 @@ mk_update_tcb_for_ddp(struct adapter *sc, struct toepcb *toep, int db_idx, ulpmc = mk_set_tcb_field_ulp(ulpmc, toep, W_TCB_RX_DDP_BUF0_TAG + db_idx, V_TCB_RX_DDP_BUF0_TAG(M_TCB_RX_DDP_BUF0_TAG), - V_TCB_RX_DDP_BUF0_TAG(db->tag)); + V_TCB_RX_DDP_BUF0_TAG(ps->tag)); /* Update the current offset in the DDP buffer and its total length */ if (db_idx == 0) @@ -311,21 +485,18 @@ mk_update_tcb_for_ddp(struct adapter *sc, struct toepcb *toep, int db_idx, V_TCB_RX_DDP_BUF0_OFFSET(M_TCB_RX_DDP_BUF0_OFFSET) | V_TCB_RX_DDP_BUF0_LEN(M_TCB_RX_DDP_BUF0_LEN), V_TCB_RX_DDP_BUF0_OFFSET(offset) | - V_TCB_RX_DDP_BUF0_LEN(db->len)); + V_TCB_RX_DDP_BUF0_LEN(ps->len)); else ulpmc = mk_set_tcb_field_ulp(ulpmc, toep, W_TCB_RX_DDP_BUF1_OFFSET, V_TCB_RX_DDP_BUF1_OFFSET(M_TCB_RX_DDP_BUF1_OFFSET) | V_TCB_RX_DDP_BUF1_LEN((u64)M_TCB_RX_DDP_BUF1_LEN << 32), V_TCB_RX_DDP_BUF1_OFFSET(offset) | - V_TCB_RX_DDP_BUF1_LEN((u64)db->len << 32)); + V_TCB_RX_DDP_BUF1_LEN((u64)ps->len << 32)); /* Update DDP flags */ ulpmc = mk_set_tcb_field_ulp(ulpmc, toep, W_TCB_RX_DDP_FLAGS, - V_TF_DDP_BUF0_FLUSH(1) | V_TF_DDP_BUF1_FLUSH(1) | - V_TF_DDP_PUSH_DISABLE_0(1) | V_TF_DDP_PUSH_DISABLE_1(1) | - V_TF_DDP_BUF0_VALID(1) | V_TF_DDP_BUF1_VALID(1) | - V_TF_DDP_ACTIVE_BUF(1) | V_TF_DDP_INDICATE_OUT(1), ddp_flags); + ddp_flags_mask, ddp_flags); /* Gratuitous RX_DATA_ACK with RX_MODULATE set to speed up delivery. */ ulpmc = mk_rx_data_ack_ulp(ulpmc, toep); @@ -333,30 +504,20 @@ mk_update_tcb_for_ddp(struct adapter *sc, struct toepcb *toep, int db_idx, return (wr); } -static void -discourage_ddp(struct toepcb *toep) -{ - - if (toep->ddp_score && --toep->ddp_score == 0) { - toep->ddp_flags &= ~DDP_OK; - toep->ddp_disabled = time_uptime; - CTR3(KTR_CXGBE, "%s: tid %u !DDP_OK @ %u", - __func__, toep->tid, time_uptime); - } -} - static int handle_ddp_data(struct toepcb *toep, __be32 ddp_report, __be32 rcv_nxt, int len) { uint32_t report = be32toh(ddp_report); - unsigned int db_flag; + unsigned int db_idx; struct inpcb *inp = toep->inp; + struct ddp_buffer *db; struct tcpcb *tp; struct socket *so; struct sockbuf *sb; - struct mbuf *m; + struct kaiocb *job; + long copied; - db_flag = report & F_DDP_BUF_IDX ? DDP_BUF1_ACTIVE : DDP_BUF0_ACTIVE; + db_idx = report & F_DDP_BUF_IDX ? 1 : 0; if (__predict_false(!(report & F_DDP_INV))) CXGBE_UNIMPLEMENTED("DDP buffer still valid"); @@ -364,19 +525,24 @@ handle_ddp_data(struct toepcb *toep, __be32 ddp_report, __be32 rcv_nxt, int len) INP_WLOCK(inp); so = inp_inpcbtosocket(inp); sb = &so->so_rcv; + DDP_LOCK(toep); + + KASSERT(toep->ddp_active_id == db_idx, + ("completed DDP buffer (%d) != active_id (%d) for tid %d", db_idx, + toep->ddp_active_id, toep->tid)); + db = &toep->db[db_idx]; + job = db->job; + if (__predict_false(inp->inp_flags & (INP_DROPPED | INP_TIMEWAIT))) { - /* - * XXX: think a bit more. - * tcpcb probably gone, but socket should still be around - * because we always wait for DDP completion in soreceive no - * matter what. Just wake it up and let it clean up. + * This can happen due to an administrative tcpdrop(8). + * Just fail the request with ECONNRESET. */ - CTR5(KTR_CXGBE, "%s: tid %u, seq 0x%x, len %d, inp_flags 0x%x", __func__, toep->tid, be32toh(rcv_nxt), len, inp->inp_flags); - SOCKBUF_LOCK(sb); - goto wakeup; + if (aio_clear_cancel_function(job)) + ddp_complete_one(job, ECONNRESET); + goto completed; } tp = intotcpcb(inp); @@ -386,7 +552,7 @@ handle_ddp_data(struct toepcb *toep, __be32 ddp_report, __be32 rcv_nxt, int len) * sequence number of the next byte to receive. The length of * the data received for this message must be computed by * comparing the new and old values of rcv_nxt. - * + * * For RX_DATA_DDP, len might be non-zero, but it is only the * length of the most recent DMA. It does not include the * total length of the data received since the previous update @@ -400,15 +566,14 @@ handle_ddp_data(struct toepcb *toep, __be32 ddp_report, __be32 rcv_nxt, int len) KASSERT(tp->rcv_wnd >= len, ("%s: negative window size", __func__)); tp->rcv_wnd -= len; #endif - m = get_ddp_mbuf(len); - - SOCKBUF_LOCK(sb); - if (report & F_DDP_BUF_COMPLETE) - toep->ddp_score = DDP_HIGH_SCORE; - else - discourage_ddp(toep); +#ifdef VERBOSE_TRACES + CTR4(KTR_CXGBE, "%s: DDP[%d] placed %d bytes (%#x)", __func__, db_idx, + len, report); +#endif /* receive buffer autosize */ + CURVNET_SET(so->so_vnet); + SOCKBUF_LOCK(sb); if (sb->sb_flags & SB_AUTOSIZE && V_tcp_do_autorcvbuf && sb->sb_hiwat < V_tcp_autorcvbuf_max && @@ -422,57 +587,185 @@ handle_ddp_data(struct toepcb *toep, __be32 ddp_report, __be32 rcv_nxt, int len) else toep->rx_credits += newsize - hiwat; } + SOCKBUF_UNLOCK(sb); + CURVNET_RESTORE(); - KASSERT(toep->sb_cc >= sbused(sb), - ("%s: sb %p has more data (%d) than last time (%d).", - __func__, sb, sbused(sb), toep->sb_cc)); - toep->rx_credits += toep->sb_cc - sbused(sb); -#ifdef USE_DDP_RX_FLOW_CONTROL - toep->rx_credits -= len; /* adjust for F_RX_FC_DDP */ +#ifndef USE_DDP_RX_FLOW_CONTROL + toep->rx_credits += len; #endif - sbappendstream_locked(sb, m, 0); - toep->sb_cc = sbused(sb); -wakeup: - KASSERT(toep->ddp_flags & db_flag, - ("%s: DDP buffer not active. toep %p, ddp_flags 0x%x, report 0x%x", - __func__, toep, toep->ddp_flags, report)); - toep->ddp_flags &= ~db_flag; - sorwakeup_locked(so); - SOCKBUF_UNLOCK_ASSERT(sb); + if (db->cancel_pending) { + /* + * Update the job's length but defer completion to the + * TCB_RPL callback. + */ + job->uaiocb._aiocb_private.status += len; + goto out; + } else if (!aio_clear_cancel_function(job)) { + /* + * Update the copied length for when + * t4_aio_cancel_active() completes this request. + */ + job->uaiocb._aiocb_private.status += len; + } else { + copied = job->uaiocb._aiocb_private.status; +#ifdef VERBOSE_TRACES + CTR4(KTR_CXGBE, "%s: completing %p (copied %ld, placed %d)", + __func__, job, copied, len); +#endif + aio_complete(job, copied + len, 0); + t4_rcvd(&toep->td->tod, tp); + } + +completed: + complete_ddp_buffer(toep, db, db_idx); + if (toep->ddp_waiting_count > 0) + ddp_queue_toep(toep); +out: + DDP_UNLOCK(toep); INP_WUNLOCK(inp); + return (0); } void -handle_ddp_close(struct toepcb *toep, struct tcpcb *tp, struct sockbuf *sb, - __be32 rcv_nxt) +handle_ddp_indicate(struct toepcb *toep) { - struct mbuf *m; - int len; - SOCKBUF_LOCK_ASSERT(sb); + DDP_ASSERT_LOCKED(toep); + MPASS(toep->ddp_active_count == 0); + MPASS((toep->ddp_flags & (DDP_BUF0_ACTIVE | DDP_BUF1_ACTIVE)) == 0); + if (toep->ddp_waiting_count == 0) { + /* + * The pending requests that triggered the request for an + * an indicate were cancelled. Those cancels should have + * already disabled DDP. Just ignore this as the data is + * going into the socket buffer anyway. + */ + return; + } + CTR3(KTR_CXGBE, "%s: tid %d indicated (%d waiting)", __func__, + toep->tid, toep->ddp_waiting_count); + ddp_queue_toep(toep); +} + +enum { + DDP_BUF0_INVALIDATED = 0x2, + DDP_BUF1_INVALIDATED +}; + +void +handle_ddp_tcb_rpl(struct toepcb *toep, const struct cpl_set_tcb_rpl *cpl) +{ + unsigned int db_idx; + struct inpcb *inp = toep->inp; + struct ddp_buffer *db; + struct kaiocb *job; + long copied; + + if (cpl->status != CPL_ERR_NONE) + panic("XXX: tcp_rpl failed: %d", cpl->status); + + switch (cpl->cookie) { + case V_WORD(W_TCB_RX_DDP_FLAGS) | V_COOKIE(DDP_BUF0_INVALIDATED): + case V_WORD(W_TCB_RX_DDP_FLAGS) | V_COOKIE(DDP_BUF1_INVALIDATED): + /* + * XXX: This duplicates a lot of code with handle_ddp_data(). + */ + db_idx = G_COOKIE(cpl->cookie) - DDP_BUF0_INVALIDATED; + INP_WLOCK(inp); + DDP_LOCK(toep); + db = &toep->db[db_idx]; + + /* + * handle_ddp_data() should leave the job around until + * this callback runs once a cancel is pending. + */ + MPASS(db != NULL); + MPASS(db->job != NULL); + MPASS(db->cancel_pending); + + /* + * XXX: It's not clear what happens if there is data + * placed when the buffer is invalidated. I suspect we + * need to read the TCB to see how much data was placed. + * + * For now this just pretends like nothing was placed. + * + * XXX: Note that if we did check the PCB we would need to + * also take care of updating the tp, etc. + */ + job = db->job; + copied = job->uaiocb._aiocb_private.status; + if (copied == 0) { + CTR2(KTR_CXGBE, "%s: cancelling %p", __func__, job); + aio_cancel(job); + } else { + CTR3(KTR_CXGBE, "%s: completing %p (copied %ld)", + __func__, job, copied); + aio_complete(job, copied, 0); + t4_rcvd(&toep->td->tod, intotcpcb(inp)); + } + + complete_ddp_buffer(toep, db, db_idx); + if (toep->ddp_waiting_count > 0) + ddp_queue_toep(toep); + DDP_UNLOCK(toep); + INP_WUNLOCK(inp); + break; + default: + panic("XXX: unknown tcb_rpl offset %#x, cookie %#x", + G_WORD(cpl->cookie), G_COOKIE(cpl->cookie)); + } +} + +void +handle_ddp_close(struct toepcb *toep, struct tcpcb *tp, __be32 rcv_nxt) +{ + struct ddp_buffer *db; + struct kaiocb *job; + long copied; + unsigned int db_flag, db_idx; + int len, placed; + INP_WLOCK_ASSERT(toep->inp); + DDP_ASSERT_LOCKED(toep); len = be32toh(rcv_nxt) - tp->rcv_nxt; - /* Signal handle_ddp() to break out of its sleep loop. */ - toep->ddp_flags &= ~(DDP_BUF0_ACTIVE | DDP_BUF1_ACTIVE); - if (len == 0) - return; - tp->rcv_nxt += len; - KASSERT(toep->sb_cc >= sbused(sb), - ("%s: sb %p has more data (%d) than last time (%d).", - __func__, sb, sbused(sb), toep->sb_cc)); - toep->rx_credits += toep->sb_cc - sbused(sb); -#ifdef USE_DDP_RX_FLOW_CONTROL - toep->rx_credits -= len; /* adjust for F_RX_FC_DDP */ +#ifndef USE_DDP_RX_FLOW_CONTROL + toep->rx_credits += len; #endif - m = get_ddp_mbuf(len); + while (toep->ddp_active_count > 0) { + MPASS(toep->ddp_active_id != -1); + db_idx = toep->ddp_active_id; + db_flag = db_idx == 1 ? DDP_BUF1_ACTIVE : DDP_BUF0_ACTIVE; + MPASS((toep->ddp_flags & db_flag) != 0); + db = &toep->db[db_idx]; + job = db->job; + copied = job->uaiocb._aiocb_private.status; + placed = len; + if (placed > job->uaiocb.aio_nbytes - copied) + placed = job->uaiocb.aio_nbytes - copied; + if (!aio_clear_cancel_function(job)) { + /* + * Update the copied length for when + * t4_aio_cancel_active() completes this + * request. + */ + job->uaiocb._aiocb_private.status += placed; + } else { + CTR4(KTR_CXGBE, "%s: tid %d completed buf %d len %d", + __func__, toep->tid, db_idx, placed); + aio_complete(job, copied + placed, 0); + } + len -= placed; + complete_ddp_buffer(toep, db, db_idx); + } - sbappendstream_locked(sb, m, 0); - toep->sb_cc = sbused(sb); + MPASS(len == 0); + ddp_complete_all(toep, 0); } #define DDP_ERR (F_DDP_PPOD_MISMATCH | F_DDP_LLIMIT_ERR | F_DDP_ULIMIT_ERR |\ @@ -529,7 +822,7 @@ do_rx_ddp_complete(struct sge_iq *iq, const struct rss_header *rss, return (0); } -void +static void enable_ddp(struct adapter *sc, struct toepcb *toep) { @@ -540,6 +833,7 @@ enable_ddp(struct adapter *sc, struct toepcb *toep) CTR3(KTR_CXGBE, "%s: tid %u (time %u)", __func__, toep->tid, time_uptime); + DDP_ASSERT_LOCKED(toep); toep->ddp_flags |= DDP_SC_REQ; t4_set_tcb_field(sc, toep, 1, W_TCB_RX_DDP_FLAGS, V_TF_DDP_OFF(1) | V_TF_DDP_INDICATE_OUT(1) | @@ -550,81 +844,6 @@ enable_ddp(struct adapter *sc, struct toepcb *toep) V_TF_RCV_COALESCE_ENABLE(1), 0); } -static inline void -disable_ddp(struct adapter *sc, struct toepcb *toep) -{ - - KASSERT((toep->ddp_flags & (DDP_ON | DDP_SC_REQ)) == DDP_ON, - ("%s: toep %p has bad ddp_flags 0x%x", - __func__, toep, toep->ddp_flags)); - - CTR3(KTR_CXGBE, "%s: tid %u (time %u)", - __func__, toep->tid, time_uptime); - - toep->ddp_flags |= DDP_SC_REQ; - t4_set_tcb_field(sc, toep, 1, W_TCB_T_FLAGS, - V_TF_RCV_COALESCE_ENABLE(1), V_TF_RCV_COALESCE_ENABLE(1)); - t4_set_tcb_field(sc, toep, 1, W_TCB_RX_DDP_FLAGS, V_TF_DDP_OFF(1), - V_TF_DDP_OFF(1)); -} - -static int -hold_uio(struct uio *uio, vm_page_t **ppages, int *pnpages) -{ - struct vm_map *map; - struct iovec *iov; - vm_offset_t start, end; - vm_page_t *pp; - int n; - - KASSERT(uio->uio_iovcnt == 1, - ("%s: uio_iovcnt %d", __func__, uio->uio_iovcnt)); - KASSERT(uio->uio_td->td_proc == curproc, - ("%s: uio proc (%p) is not curproc (%p)", - __func__, uio->uio_td->td_proc, curproc)); - - map = &curproc->p_vmspace->vm_map; - iov = &uio->uio_iov[0]; - start = trunc_page((uintptr_t)iov->iov_base); - end = round_page((vm_offset_t)iov->iov_base + iov->iov_len); - n = howmany(end - start, PAGE_SIZE); - - if (end - start > MAX_DDP_BUFFER_SIZE) - return (E2BIG); - - pp = malloc(n * sizeof(vm_page_t), M_CXGBE, M_NOWAIT); - if (pp == NULL) - return (ENOMEM); - - if (vm_fault_quick_hold_pages(map, (vm_offset_t)iov->iov_base, - iov->iov_len, VM_PROT_WRITE, pp, n) < 0) { - free(pp, M_CXGBE); - return (EFAULT); - } - - *ppages = pp; - *pnpages = n; - - return (0); -} - -static int -bufcmp(struct ddp_buffer *db, vm_page_t *pages, int npages, int offset, int len) -{ - int i; - - if (db == NULL || db->npages != npages || db->offset != offset || - db->len != len) - return (1); - - for (i = 0; i < npages; i++) { - if (pages[i]->phys_addr != db->pages[i]->phys_addr) - return (1); - } - - return (0); -} - static int calculate_hcf(int n1, int n2) { @@ -647,12 +866,13 @@ calculate_hcf(int n1, int n2) return (b); } -static struct ddp_buffer * -alloc_ddp_buffer(struct tom_data *td, vm_page_t *pages, int npages, int offset, - int len) +static int +alloc_page_pods(struct tom_data *td, struct pageset *ps) { int i, hcf, seglen, idx, ppod, nppods; - struct ddp_buffer *db; + u_int ppod_addr; + + KASSERT(ps->nppods == 0, ("%s: page pods already allocated", __func__)); /* * The DDP page size is unrelated to the VM page size. We combine @@ -662,10 +882,11 @@ alloc_ddp_buffer(struct tom_data *td, vm_page_t *pages, int npages, int offset, * the page list. */ hcf = 0; - for (i = 0; i < npages; i++) { + for (i = 0; i < ps->npages; i++) { seglen = PAGE_SIZE; - while (i < npages - 1 && - pages[i]->phys_addr + PAGE_SIZE == pages[i + 1]->phys_addr) { + while (i < ps->npages - 1 && + ps->pages[i]->phys_addr + PAGE_SIZE == + ps->pages[i + 1]->phys_addr) { seglen += PAGE_SIZE; i++; } @@ -683,7 +904,7 @@ alloc_ddp_buffer(struct tom_data *td, vm_page_t *pages, int npages, int offset, ("%s: PAGE_SIZE %d, hcf %d", __func__, PAGE_SIZE, hcf)); CTR3(KTR_CXGBE, "%s: PAGE_SIZE %d, hcf %d", __func__, PAGE_SIZE, hcf); - return (NULL); + return (0); } for (idx = nitems(t4_ddp_pgsz) - 1; idx > 0; idx--) { @@ -693,40 +914,29 @@ alloc_ddp_buffer(struct tom_data *td, vm_page_t *pages, int npages, int offset, have_pgsz: MPASS(idx <= M_PPOD_PGSZ); - db = malloc(sizeof(*db), M_CXGBE, M_NOWAIT); - if (db == NULL) { - CTR1(KTR_CXGBE, "%s: malloc failed.", __func__); - return (NULL); + nppods = pages_to_nppods(ps->npages, t4_ddp_pgsz[idx]); + if (alloc_ppods(td, nppods, &ppod_addr) != 0) { + CTR4(KTR_CXGBE, "%s: no pods, nppods %d, npages %d, pgsz %d", + __func__, nppods, ps->npages, t4_ddp_pgsz[idx]); + return (0); } - nppods = pages_to_nppods(npages, t4_ddp_pgsz[idx]); - if (alloc_ppods(td, nppods, &db->ppod_addr) != 0) { - free(db, M_CXGBE); - CTR4(KTR_CXGBE, "%s: no pods, nppods %d, resid %d, pgsz %d", - __func__, nppods, len, t4_ddp_pgsz[idx]); - return (NULL); - } - ppod = (db->ppod_addr - td->ppod_start) / PPOD_SIZE; + ppod = (ppod_addr - td->ppod_start) / PPOD_SIZE; + ps->tag = V_PPOD_PGSZ(idx) | V_PPOD_TAG(ppod); + ps->ppod_addr = ppod_addr; + ps->nppods = nppods; - db->tag = V_PPOD_PGSZ(idx) | V_PPOD_TAG(ppod); - db->nppods = nppods; - db->npages = npages; - db->pages = pages; - db->offset = offset; - db->len = len; + CTR5(KTR_CXGBE, "New page pods. " + "ps %p, ddp_pgsz %d, ppod 0x%x, npages %d, nppods %d", + ps, t4_ddp_pgsz[idx], ppod, ps->npages, ps->nppods); - CTR6(KTR_CXGBE, "New DDP buffer. " - "ddp_pgsz %d, ppod 0x%x, npages %d, nppods %d, offset %d, len %d", - t4_ddp_pgsz[idx], ppod, db->npages, db->nppods, db->offset, - db->len); - - return (db); + return (1); } #define NUM_ULP_TX_SC_IMM_PPODS (256 / PPOD_SIZE) static int -write_page_pods(struct adapter *sc, struct toepcb *toep, struct ddp_buffer *db) +write_page_pods(struct adapter *sc, struct toepcb *toep, struct pageset *ps) { struct wrqe *wr; struct ulp_mem_io *ulpmc; @@ -736,17 +946,20 @@ write_page_pods(struct adapter *sc, struct toepcb *toep, struct ddp_buffer *db) u_int ppod_addr; uint32_t cmd; + KASSERT(!(ps->flags & PS_PPODS_WRITTEN), + ("%s: page pods already written", __func__)); + cmd = htobe32(V_ULPTX_CMD(ULP_TX_MEM_WRITE)); if (is_t4(sc)) cmd |= htobe32(F_ULP_MEMIO_ORDER); else cmd |= htobe32(F_T5_ULP_MEMIO_IMM); - ddp_pgsz = t4_ddp_pgsz[G_PPOD_PGSZ(db->tag)]; - ppod_addr = db->ppod_addr; - for (i = 0; i < db->nppods; ppod_addr += chunk) { + ddp_pgsz = t4_ddp_pgsz[G_PPOD_PGSZ(ps->tag)]; + ppod_addr = ps->ppod_addr; + for (i = 0; i < ps->nppods; ppod_addr += chunk) { /* How many page pods are we writing in this cycle */ - n = min(db->nppods - i, NUM_ULP_TX_SC_IMM_PPODS); + n = min(ps->nppods - i, NUM_ULP_TX_SC_IMM_PPODS); chunk = PPOD_SZ(n); len = roundup2(sizeof(*ulpmc) + sizeof(*ulpsc) + chunk, 16); @@ -768,15 +981,15 @@ write_page_pods(struct adapter *sc, struct toepcb *toep, struct ddp_buffer *db) ppod = (struct pagepod *)(ulpsc + 1); for (j = 0; j < n; i++, j++, ppod++) { ppod->vld_tid_pgsz_tag_color = htobe64(F_PPOD_VALID | - V_PPOD_TID(toep->tid) | db->tag); - ppod->len_offset = htobe64(V_PPOD_LEN(db->len) | - V_PPOD_OFST(db->offset)); + V_PPOD_TID(toep->tid) | ps->tag); + ppod->len_offset = htobe64(V_PPOD_LEN(ps->len) | + V_PPOD_OFST(ps->offset)); ppod->rsvd = 0; idx = i * PPOD_PAGES * (ddp_pgsz / PAGE_SIZE); for (k = 0; k < nitems(ppod->addr); k++) { - if (idx < db->npages) { + if (idx < ps->npages) { ppod->addr[k] = - htobe64(db->pages[idx]->phys_addr); + htobe64(ps->pages[idx]->phys_addr); idx += ddp_pgsz / PAGE_SIZE; } else ppod->addr[k] = 0; @@ -792,184 +1005,49 @@ write_page_pods(struct adapter *sc, struct toepcb *toep, struct ddp_buffer *db) t4_wrq_tx(sc, wr); } + ps->flags |= PS_PPODS_WRITTEN; return (0); } -/* - * Reuse, or allocate (and program the page pods for) a new DDP buffer. The - * "pages" array is handed over to this function and should not be used in any - * way by the caller after that. - */ -static int -select_ddp_buffer(struct adapter *sc, struct toepcb *toep, vm_page_t *pages, - int npages, int db_off, int db_len) -{ - struct ddp_buffer *db; - struct tom_data *td = sc->tom_softc; - int i, empty_slot = -1; - - /* Try to reuse */ - for (i = 0; i < nitems(toep->db); i++) { - if (bufcmp(toep->db[i], pages, npages, db_off, db_len) == 0) { - free(pages, M_CXGBE); - return (i); /* pages still held */ - } else if (toep->db[i] == NULL && empty_slot < 0) - empty_slot = i; - } - - /* Allocate new buffer, write its page pods. */ - db = alloc_ddp_buffer(td, pages, npages, db_off, db_len); - if (db == NULL) { - vm_page_unhold_pages(pages, npages); - free(pages, M_CXGBE); - return (-1); - } - if (write_page_pods(sc, toep, db) != 0) { - vm_page_unhold_pages(pages, npages); - free_ddp_buffer(td, db); - return (-1); - } - - i = empty_slot; - if (i < 0) { - i = arc4random() % nitems(toep->db); - free_ddp_buffer(td, toep->db[i]); - } - toep->db[i] = db; - - CTR5(KTR_CXGBE, "%s: tid %d, DDP buffer[%d] = %p (tag 0x%x)", - __func__, toep->tid, i, db, db->tag); - - return (i); -} - static void -wire_ddp_buffer(struct ddp_buffer *db) +wire_pageset(struct pageset *ps) { - int i; vm_page_t p; + int i; - for (i = 0; i < db->npages; i++) { - p = db->pages[i]; + KASSERT(!(ps->flags & PS_WIRED), ("pageset already wired")); + + for (i = 0; i < ps->npages; i++) { + p = ps->pages[i]; vm_page_lock(p); vm_page_wire(p); vm_page_unhold(p); vm_page_unlock(p); } + ps->flags |= PS_WIRED; } -static void -unwire_ddp_buffer(struct ddp_buffer *db) -{ - int i; - vm_page_t p; - - for (i = 0; i < db->npages; i++) { - p = db->pages[i]; - vm_page_lock(p); - vm_page_unwire(p, PQ_INACTIVE); - vm_page_unlock(p); - } -} - +/* + * Prepare a pageset for DDP. This wires the pageset and sets up page + * pods. + */ static int -handle_ddp(struct socket *so, struct uio *uio, int flags, int error) +prep_pageset(struct adapter *sc, struct toepcb *toep, struct pageset *ps) { - struct sockbuf *sb = &so->so_rcv; - struct tcpcb *tp = so_sototcpcb(so); - struct toepcb *toep = tp->t_toe; - struct adapter *sc = td_adapter(toep->td); - vm_page_t *pages; - int npages, db_idx, rc, buf_flag; - struct ddp_buffer *db; - struct wrqe *wr; - uint64_t ddp_flags; + struct tom_data *td = sc->tom_softc; - SOCKBUF_LOCK_ASSERT(sb); - -#if 0 - if (sbused(sb) + sc->tt.ddp_thres > uio->uio_resid) { - CTR4(KTR_CXGBE, "%s: sb_cc %d, threshold %d, resid %d", - __func__, sbused(sb), sc->tt.ddp_thres, uio->uio_resid); + if (!(ps->flags & PS_WIRED)) + wire_pageset(ps); + if (ps->nppods == 0 && !alloc_page_pods(td, ps)) { + return (0); } -#endif - - /* XXX: too eager to disable DDP, could handle NBIO better than this. */ - if (sbused(sb) >= uio->uio_resid || uio->uio_resid < sc->tt.ddp_thres || - uio->uio_resid > MAX_DDP_BUFFER_SIZE || uio->uio_iovcnt > 1 || - so->so_state & SS_NBIO || flags & (MSG_DONTWAIT | MSG_NBIO) || - error || so->so_error || sb->sb_state & SBS_CANTRCVMORE) - goto no_ddp; - - /* - * Fault in and then hold the pages of the uio buffers. We'll wire them - * a bit later if everything else works out. - */ - SOCKBUF_UNLOCK(sb); - if (hold_uio(uio, &pages, &npages) != 0) { - SOCKBUF_LOCK(sb); - goto no_ddp; - } - SOCKBUF_LOCK(sb); - if (__predict_false(so->so_error || sb->sb_state & SBS_CANTRCVMORE)) { - vm_page_unhold_pages(pages, npages); - free(pages, M_CXGBE); - goto no_ddp; + if (!(ps->flags & PS_PPODS_WRITTEN) && + write_page_pods(sc, toep, ps) != 0) { + return (0); } - /* - * Figure out which one of the two DDP buffers to use this time. - */ - db_idx = select_ddp_buffer(sc, toep, pages, npages, - (uintptr_t)uio->uio_iov->iov_base & PAGE_MASK, uio->uio_resid); - pages = NULL; /* handed off to select_ddp_buffer */ - if (db_idx < 0) - goto no_ddp; - db = toep->db[db_idx]; - buf_flag = db_idx == 0 ? DDP_BUF0_ACTIVE : DDP_BUF1_ACTIVE; - - /* - * Build the compound work request that tells the chip where to DMA the - * payload. - */ - ddp_flags = select_ddp_flags(so, flags, db_idx); - wr = mk_update_tcb_for_ddp(sc, toep, db_idx, sbused(sb), ddp_flags); - if (wr == NULL) { - /* - * Just unhold the pages. The DDP buffer's software state is - * left as-is in the toep. The page pods were written - * successfully and we may have an opportunity to use it in the - * future. - */ - vm_page_unhold_pages(db->pages, db->npages); - goto no_ddp; - } - - /* Wire (and then unhold) the pages, and give the chip the go-ahead. */ - wire_ddp_buffer(db); - t4_wrq_tx(sc, wr); - sb->sb_flags &= ~SB_DDP_INDICATE; - toep->ddp_flags |= buf_flag; - - /* - * Wait for the DDP operation to complete and then unwire the pages. - * The return code from the sbwait will be the final return code of this - * function. But we do need to wait for DDP no matter what. - */ - rc = sbwait(sb); - while (toep->ddp_flags & buf_flag) { - /* XXXGL: shouldn't here be sbwait() call? */ - sb->sb_flags |= SB_WAIT; - msleep(&sb->sb_acc, &sb->sb_mtx, PSOCK , "sbwait", 0); - } - unwire_ddp_buffer(db); - return (rc); -no_ddp: - disable_ddp(sc, toep); - discourage_ddp(toep); - sb->sb_flags &= ~SB_DDP_INDICATE; - return (0); + return (1); } void @@ -994,287 +1072,689 @@ t4_uninit_ddp(struct adapter *sc __unused, struct tom_data *td) } } -#define VNET_SO_ASSERT(so) \ - VNET_ASSERT(curvnet != NULL, \ - ("%s:%d curvnet is NULL, so=%p", __func__, __LINE__, (so))); -#define SBLOCKWAIT(f) (((f) & MSG_DONTWAIT) ? 0 : SBL_WAIT) static int -soreceive_rcvoob(struct socket *so, struct uio *uio, int flags) +pscmp(struct pageset *ps, struct vmspace *vm, vm_offset_t start, int npages, + int pgoff, int len) { - CXGBE_UNIMPLEMENTED(__func__); + if (ps->npages != npages || ps->offset != pgoff || ps->len != len) + return (1); + + return (ps->vm != vm || ps->vm_timestamp != vm->vm_map.timestamp); } -static char ddp_magic_str[] = "nothing to see here"; - -static struct mbuf * -get_ddp_mbuf(int len) -{ - struct mbuf *m; - - m = m_get(M_NOWAIT, MT_DATA); - if (m == NULL) - CXGBE_UNIMPLEMENTED("mbuf alloc failure"); - m->m_len = len; - m->m_data = &ddp_magic_str[0]; - - return (m); -} - -static inline int -is_ddp_mbuf(struct mbuf *m) -{ - - return (m->m_data == &ddp_magic_str[0]); -} - -/* - * Copy an mbuf chain into a uio limited by len if set. - */ static int -m_mbuftouio_ddp(struct uio *uio, struct mbuf *m, int len) +hold_aio(struct toepcb *toep, struct kaiocb *job, struct pageset **pps) { - int error, length, total; - int progress = 0; + struct vmspace *vm; + vm_map_t map; + vm_offset_t start, end, pgoff; + struct pageset *ps; + int n; - if (len > 0) - total = min(uio->uio_resid, len); - else - total = uio->uio_resid; + DDP_ASSERT_LOCKED(toep); - /* Fill the uio with data from the mbufs. */ - for (; m != NULL; m = m->m_next) { - length = min(m->m_len, total - progress); + /* + * The AIO subsystem will cancel and drain all requests before + * permitting a process to exit or exec, so p_vmspace should + * be stable here. + */ + vm = job->userproc->p_vmspace; + map = &vm->vm_map; + start = (uintptr_t)job->uaiocb.aio_buf; + pgoff = start & PAGE_MASK; + end = round_page(start + job->uaiocb.aio_nbytes); + start = trunc_page(start); - if (is_ddp_mbuf(m)) { - enum uio_seg segflag = uio->uio_segflg; - - uio->uio_segflg = UIO_NOCOPY; - error = uiomove(mtod(m, void *), length, uio); - uio->uio_segflg = segflag; - } else - error = uiomove(mtod(m, void *), length, uio); - if (error) - return (error); - - progress += length; + if (end - start > MAX_DDP_BUFFER_SIZE) { + /* + * Truncate the request to a short read. + * Alternatively, we could DDP in chunks to the larger + * buffer, but that would be quite a bit more work. + * + * When truncating, round the request down to avoid + * crossing a cache line on the final transaction. + */ + end = rounddown2(start + MAX_DDP_BUFFER_SIZE, CACHE_LINE_SIZE); +#ifdef VERBOSE_TRACES + CTR4(KTR_CXGBE, "%s: tid %d, truncating size from %lu to %lu", + __func__, toep->tid, (unsigned long)job->uaiocb.aio_nbytes, + (unsigned long)(end - (start + pgoff))); + job->uaiocb.aio_nbytes = end - (start + pgoff); +#endif + end = round_page(end); } + n = atop(end - start); + + /* + * Try to reuse a cached pageset. + */ + TAILQ_FOREACH(ps, &toep->ddp_cached_pagesets, link) { + if (pscmp(ps, vm, start, n, pgoff, + job->uaiocb.aio_nbytes) == 0) { + TAILQ_REMOVE(&toep->ddp_cached_pagesets, ps, link); + toep->ddp_cached_count--; + *pps = ps; + return (0); + } + } + + /* + * If there are too many cached pagesets to create a new one, + * free a pageset before creating a new one. + */ + KASSERT(toep->ddp_active_count + toep->ddp_cached_count <= + nitems(toep->db), ("%s: too many wired pagesets", __func__)); + if (toep->ddp_active_count + toep->ddp_cached_count == + nitems(toep->db)) { + KASSERT(toep->ddp_cached_count > 0, + ("no cached pageset to free")); + ps = TAILQ_LAST(&toep->ddp_cached_pagesets, pagesetq); + TAILQ_REMOVE(&toep->ddp_cached_pagesets, ps, link); + toep->ddp_cached_count--; + free_pageset(toep->td, ps); + } + DDP_UNLOCK(toep); + + /* Create a new pageset. */ + ps = malloc(sizeof(*ps) + n * sizeof(vm_page_t), M_CXGBE, M_WAITOK | + M_ZERO); + ps->pages = (vm_page_t *)(ps + 1); + ps->vm_timestamp = map->timestamp; + ps->npages = vm_fault_quick_hold_pages(map, start, end - start, + VM_PROT_WRITE, ps->pages, n); + + DDP_LOCK(toep); + if (ps->npages < 0) { + free(ps, M_CXGBE); + return (EFAULT); + } + + KASSERT(ps->npages == n, ("hold_aio: page count mismatch: %d vs %d", + ps->npages, n)); + + ps->offset = pgoff; + ps->len = job->uaiocb.aio_nbytes; + atomic_add_int(&vm->vm_refcnt, 1); + ps->vm = vm; + + CTR5(KTR_CXGBE, "%s: tid %d, new pageset %p for job %p, npages %d", + __func__, toep->tid, ps, job, ps->npages); + *pps = ps; return (0); } -/* - * Based on soreceive_stream() in uipc_socket.c - */ -int -t4_soreceive_ddp(struct socket *so, struct sockaddr **psa, struct uio *uio, - struct mbuf **mp0, struct mbuf **controlp, int *flagsp) +static void +ddp_complete_all(struct toepcb *toep, int error) { - int len = 0, error = 0, flags, oresid, ddp_handled = 0; - struct sockbuf *sb; - struct mbuf *m, *n = NULL; + struct kaiocb *job; - /* We only do stream sockets. */ - if (so->so_type != SOCK_STREAM) - return (EINVAL); - if (psa != NULL) - *psa = NULL; - if (controlp != NULL) - return (EINVAL); - if (flagsp != NULL) - flags = *flagsp &~ MSG_EOR; - else - flags = 0; - if (flags & MSG_OOB) - return (soreceive_rcvoob(so, uio, flags)); - if (mp0 != NULL) - *mp0 = NULL; - - sb = &so->so_rcv; - - /* Prevent other readers from entering the socket. */ - error = sblock(sb, SBLOCKWAIT(flags)); - SOCKBUF_LOCK(sb); - if (error) - goto out; - - /* Easy one, no space to copyout anything. */ - if (uio->uio_resid == 0) { - error = EINVAL; - goto out; + DDP_ASSERT_LOCKED(toep); + while (!TAILQ_EMPTY(&toep->ddp_aiojobq)) { + job = TAILQ_FIRST(&toep->ddp_aiojobq); + TAILQ_REMOVE(&toep->ddp_aiojobq, job, list); + toep->ddp_waiting_count--; + if (aio_clear_cancel_function(job)) + ddp_complete_one(job, error); } - oresid = uio->uio_resid; - - /* We will never ever get anything unless we are or were connected. */ - if (!(so->so_state & (SS_ISCONNECTED|SS_ISDISCONNECTED))) { - error = ENOTCONN; - goto out; - } - -restart: - SOCKBUF_LOCK_ASSERT(&so->so_rcv); - - if (sb->sb_flags & SB_DDP_INDICATE && !ddp_handled) { - - /* uio should be just as it was at entry */ - KASSERT(oresid == uio->uio_resid, - ("%s: oresid = %d, uio_resid = %zd, sbavail = %d", - __func__, oresid, uio->uio_resid, sbavail(sb))); - - error = handle_ddp(so, uio, flags, 0); - ddp_handled = 1; - if (error) - goto out; - } - - /* Abort if socket has reported problems. */ - if (so->so_error) { - if (sbavail(sb)) - goto deliver; - if (oresid > uio->uio_resid) - goto out; - error = so->so_error; - if (!(flags & MSG_PEEK)) - so->so_error = 0; - goto out; - } - - /* Door is closed. Deliver what is left, if any. */ - if (sb->sb_state & SBS_CANTRCVMORE) { - if (sbavail(sb)) - goto deliver; - else - goto out; - } - - /* Socket buffer is empty and we shall not block. */ - if (sbavail(sb) == 0 && - ((so->so_state & SS_NBIO) || (flags & (MSG_DONTWAIT|MSG_NBIO)))) { - error = EAGAIN; - goto out; - } - - /* Socket buffer got some data that we shall deliver now. */ - if (sbavail(sb) > 0 && !(flags & MSG_WAITALL) && - ((so->so_state & SS_NBIO) || - (flags & (MSG_DONTWAIT|MSG_NBIO)) || - sbavail(sb) >= sb->sb_lowat || - sbavail(sb) >= uio->uio_resid || - sbavail(sb) >= sb->sb_hiwat) ) { - goto deliver; - } - - /* On MSG_WAITALL we must wait until all data or error arrives. */ - if ((flags & MSG_WAITALL) && - (sbavail(sb) >= uio->uio_resid || sbavail(sb) >= sb->sb_lowat)) - goto deliver; - - /* - * Wait and block until (more) data comes in. - * NB: Drops the sockbuf lock during wait. - */ - error = sbwait(sb); - if (error) { - if (sb->sb_flags & SB_DDP_INDICATE && !ddp_handled) { - (void) handle_ddp(so, uio, flags, 1); - ddp_handled = 1; - } - goto out; - } - goto restart; - -deliver: - SOCKBUF_LOCK_ASSERT(&so->so_rcv); - KASSERT(sbavail(sb) > 0, ("%s: sockbuf empty", __func__)); - KASSERT(sb->sb_mb != NULL, ("%s: sb_mb == NULL", __func__)); - - if (sb->sb_flags & SB_DDP_INDICATE && !ddp_handled) - goto restart; - - /* Statistics. */ - if (uio->uio_td) - uio->uio_td->td_ru.ru_msgrcv++; - - /* Fill uio until full or current end of socket buffer is reached. */ - len = min(uio->uio_resid, sbavail(sb)); - if (mp0 != NULL) { - /* Dequeue as many mbufs as possible. */ - if (!(flags & MSG_PEEK) && len >= sb->sb_mb->m_len) { - for (*mp0 = m = sb->sb_mb; - m != NULL && m->m_len <= len; - m = m->m_next) { - len -= m->m_len; - uio->uio_resid -= m->m_len; - sbfree(sb, m); - n = m; - } - sb->sb_mb = m; - if (sb->sb_mb == NULL) - SB_EMPTY_FIXUP(sb); - n->m_next = NULL; - } - /* Copy the remainder. */ - if (len > 0) { - KASSERT(sb->sb_mb != NULL, - ("%s: len > 0 && sb->sb_mb empty", __func__)); - - m = m_copym(sb->sb_mb, 0, len, M_NOWAIT); - if (m == NULL) - len = 0; /* Don't flush data from sockbuf. */ - else - uio->uio_resid -= m->m_len; - if (*mp0 != NULL) - n->m_next = m; - else - *mp0 = m; - if (*mp0 == NULL) { - error = ENOBUFS; - goto out; - } - } - } else { - /* NB: Must unlock socket buffer as uiomove may sleep. */ - SOCKBUF_UNLOCK(sb); - error = m_mbuftouio_ddp(uio, sb->sb_mb, len); - SOCKBUF_LOCK(sb); - if (error) - goto out; - } - SBLASTRECORDCHK(sb); - SBLASTMBUFCHK(sb); - - /* - * Remove the delivered data from the socket buffer unless we - * were only peeking. - */ - if (!(flags & MSG_PEEK)) { - if (len > 0) - sbdrop_locked(sb, len); - - /* Notify protocol that we drained some data. */ - if ((so->so_proto->pr_flags & PR_WANTRCVD) && - (((flags & MSG_WAITALL) && uio->uio_resid > 0) || - !(flags & MSG_SOCALLBCK))) { - SOCKBUF_UNLOCK(sb); - VNET_SO_ASSERT(so); - (*so->so_proto->pr_usrreqs->pru_rcvd)(so, flags); - SOCKBUF_LOCK(sb); - } - } - - /* - * For MSG_WAITALL we may have to loop again and wait for - * more data to come in. - */ - if ((flags & MSG_WAITALL) && uio->uio_resid > 0) - goto restart; -out: - SOCKBUF_LOCK_ASSERT(sb); - SBLASTRECORDCHK(sb); - SBLASTMBUFCHK(sb); - SOCKBUF_UNLOCK(sb); - sbunlock(sb); - return (error); } +static void +aio_ddp_cancel_one(struct kaiocb *job) +{ + long copied; + + /* + * If this job had copied data out of the socket buffer before + * it was cancelled, report it as a short read rather than an + * error. + */ + copied = job->uaiocb._aiocb_private.status; + if (copied != 0) + aio_complete(job, copied, 0); + else + aio_cancel(job); +} + +/* + * Called when the main loop wants to requeue a job to retry it later. + * Deals with the race of the job being cancelled while it was being + * examined. + */ +static void +aio_ddp_requeue_one(struct toepcb *toep, struct kaiocb *job) +{ + + DDP_ASSERT_LOCKED(toep); + if (!(toep->ddp_flags & DDP_DEAD) && + aio_set_cancel_function(job, t4_aio_cancel_queued)) { + TAILQ_INSERT_HEAD(&toep->ddp_aiojobq, job, list); + toep->ddp_waiting_count++; + } else + aio_ddp_cancel_one(job); +} + +static void +aio_ddp_requeue(struct toepcb *toep) +{ + struct adapter *sc = td_adapter(toep->td); + struct socket *so; + struct sockbuf *sb; + struct inpcb *inp; + struct kaiocb *job; + struct ddp_buffer *db; + size_t copied, offset, resid; + struct pageset *ps; + struct mbuf *m; + uint64_t ddp_flags, ddp_flags_mask; + struct wrqe *wr; + int buf_flag, db_idx, error; + + DDP_ASSERT_LOCKED(toep); + +restart: + if (toep->ddp_flags & DDP_DEAD) { + MPASS(toep->ddp_waiting_count == 0); + MPASS(toep->ddp_active_count == 0); + return; + } + + if (toep->ddp_waiting_count == 0 || + toep->ddp_active_count == nitems(toep->db)) { + return; + } + + job = TAILQ_FIRST(&toep->ddp_aiojobq); + so = job->fd_file->f_data; + sb = &so->so_rcv; + SOCKBUF_LOCK(sb); + + /* We will never get anything unless we are or were connected. */ + if (!(so->so_state & (SS_ISCONNECTED|SS_ISDISCONNECTED))) { + SOCKBUF_UNLOCK(sb); + ddp_complete_all(toep, ENOTCONN); + return; + } + + KASSERT(toep->ddp_active_count == 0 || sbavail(sb) == 0, + ("%s: pending sockbuf data and DDP is active", __func__)); + + /* Abort if socket has reported problems. */ + /* XXX: Wait for any queued DDP's to finish and/or flush them? */ + if (so->so_error && sbavail(sb) == 0) { + toep->ddp_waiting_count--; + TAILQ_REMOVE(&toep->ddp_aiojobq, job, list); + if (!aio_clear_cancel_function(job)) { + SOCKBUF_UNLOCK(sb); + goto restart; + } + + /* + * If this job has previously copied some data, report + * a short read and leave the error to be reported by + * a future request. + */ + copied = job->uaiocb._aiocb_private.status; + if (copied != 0) { + SOCKBUF_UNLOCK(sb); + aio_complete(job, copied, 0); + goto restart; + } + error = so->so_error; + so->so_error = 0; + SOCKBUF_UNLOCK(sb); + aio_complete(job, -1, error); + goto restart; + } + + /* + * Door is closed. If there is pending data in the socket buffer, + * deliver it. If there are pending DDP requests, wait for those + * to complete. Once they have completed, return EOF reads. + */ + if (sb->sb_state & SBS_CANTRCVMORE && sbavail(sb) == 0) { + SOCKBUF_UNLOCK(sb); + if (toep->ddp_active_count != 0) + return; + ddp_complete_all(toep, 0); + return; + } + + /* + * If DDP is not enabled and there is no pending socket buffer + * data, try to enable DDP. + */ + if (sbavail(sb) == 0 && (toep->ddp_flags & DDP_ON) == 0) { + SOCKBUF_UNLOCK(sb); + + /* + * Wait for the card to ACK that DDP is enabled before + * queueing any buffers. Currently this waits for an + * indicate to arrive. This could use a TCB_SET_FIELD_RPL + * message to know that DDP was enabled instead of waiting + * for the indicate which would avoid copying the indicate + * if no data is pending. + * + * XXX: Might want to limit the indicate size to the size + * of the first queued request. + */ + if ((toep->ddp_flags & DDP_SC_REQ) == 0) + enable_ddp(sc, toep); + return; + } + SOCKBUF_UNLOCK(sb); + + /* + * If another thread is queueing a buffer for DDP, let it + * drain any work and return. + */ + if (toep->ddp_queueing != NULL) + return; + + /* Take the next job to prep it for DDP. */ + toep->ddp_waiting_count--; + TAILQ_REMOVE(&toep->ddp_aiojobq, job, list); + if (!aio_clear_cancel_function(job)) + goto restart; + toep->ddp_queueing = job; + + /* NB: This drops DDP_LOCK while it holds the backing VM pages. */ + error = hold_aio(toep, job, &ps); + if (error != 0) { + ddp_complete_one(job, error); + toep->ddp_queueing = NULL; + goto restart; + } + + SOCKBUF_LOCK(sb); + if (so->so_error && sbavail(sb) == 0) { + copied = job->uaiocb._aiocb_private.status; + if (copied != 0) { + SOCKBUF_UNLOCK(sb); + recycle_pageset(toep, ps); + aio_complete(job, copied, 0); + toep->ddp_queueing = NULL; + goto restart; + } + + error = so->so_error; + so->so_error = 0; + SOCKBUF_UNLOCK(sb); + recycle_pageset(toep, ps); + aio_complete(job, -1, error); + toep->ddp_queueing = NULL; + goto restart; + } + + if (sb->sb_state & SBS_CANTRCVMORE && sbavail(sb) == 0) { + SOCKBUF_UNLOCK(sb); + recycle_pageset(toep, ps); + if (toep->ddp_active_count != 0) { + /* + * The door is closed, but there are still pending + * DDP buffers. Requeue. These jobs will all be + * completed once those buffers drain. + */ + aio_ddp_requeue_one(toep, job); + toep->ddp_queueing = NULL; + return; + } + ddp_complete_one(job, 0); + ddp_complete_all(toep, 0); + toep->ddp_queueing = NULL; + return; + } + +sbcopy: + /* + * If the toep is dead, there shouldn't be any data in the socket + * buffer, so the above case should have handled this. + */ + MPASS(!(toep->ddp_flags & DDP_DEAD)); + + /* + * If there is pending data in the socket buffer (either + * from before the requests were queued or a DDP indicate), + * copy those mbufs out directly. + */ + copied = 0; + offset = ps->offset + job->uaiocb._aiocb_private.status; + MPASS(job->uaiocb._aiocb_private.status <= job->uaiocb.aio_nbytes); + resid = job->uaiocb.aio_nbytes - job->uaiocb._aiocb_private.status; + m = sb->sb_mb; + KASSERT(m == NULL || toep->ddp_active_count == 0, + ("%s: sockbuf data with active DDP", __func__)); + while (m != NULL && resid > 0) { + struct iovec iov[1]; + struct uio uio; + int error; + + iov[0].iov_base = mtod(m, void *); + iov[0].iov_len = m->m_len; + if (iov[0].iov_len > resid) + iov[0].iov_len = resid; + uio.uio_iov = iov; + uio.uio_iovcnt = 1; + uio.uio_offset = 0; + uio.uio_resid = iov[0].iov_len; + uio.uio_segflg = UIO_SYSSPACE; + uio.uio_rw = UIO_WRITE; + error = uiomove_fromphys(ps->pages, offset + copied, + uio.uio_resid, &uio); + MPASS(error == 0 && uio.uio_resid == 0); + copied += uio.uio_offset; + resid -= uio.uio_offset; + m = m->m_next; + } + if (copied != 0) { + sbdrop_locked(sb, copied); + job->uaiocb._aiocb_private.status += copied; + copied = job->uaiocb._aiocb_private.status; + inp = sotoinpcb(so); + if (!INP_TRY_WLOCK(inp)) { + /* + * The reference on the socket file descriptor in + * the AIO job should keep 'sb' and 'inp' stable. + * Our caller has a reference on the 'toep' that + * keeps it stable. + */ + SOCKBUF_UNLOCK(sb); + DDP_UNLOCK(toep); + INP_WLOCK(inp); + DDP_LOCK(toep); + SOCKBUF_LOCK(sb); + + /* + * If the socket has been closed, we should detect + * that and complete this request if needed on + * the next trip around the loop. + */ + } + t4_rcvd_locked(&toep->td->tod, intotcpcb(inp)); + INP_WUNLOCK(inp); + if (resid == 0 || toep->ddp_flags & DDP_DEAD) { + /* + * We filled the entire buffer with socket + * data, DDP is not being used, or the socket + * is being shut down, so complete the + * request. + */ + SOCKBUF_UNLOCK(sb); + recycle_pageset(toep, ps); + aio_complete(job, copied, 0); + toep->ddp_queueing = NULL; + goto restart; + } + + /* + * If DDP is not enabled, requeue this request and restart. + * This will either enable DDP or wait for more data to + * arrive on the socket buffer. + */ + if ((toep->ddp_flags & (DDP_ON | DDP_SC_REQ)) != DDP_ON) { + SOCKBUF_UNLOCK(sb); + recycle_pageset(toep, ps); + aio_ddp_requeue_one(toep, job); + toep->ddp_queueing = NULL; + goto restart; + } + + /* + * An indicate might have arrived and been added to + * the socket buffer while it was unlocked after the + * copy to lock the INP. If so, restart the copy. + */ + if (sbavail(sb) != 0) + goto sbcopy; + } + SOCKBUF_UNLOCK(sb); + + if (prep_pageset(sc, toep, ps) == 0) { + recycle_pageset(toep, ps); + aio_ddp_requeue_one(toep, job); + toep->ddp_queueing = NULL; + + /* + * XXX: Need to retry this later. Mostly need a trigger + * when page pods are freed up. + */ + printf("%s: prep_pageset failed\n", __func__); + return; + } + + /* Determine which DDP buffer to use. */ + if (toep->db[0].job == NULL) { + db_idx = 0; + } else { + MPASS(toep->db[1].job == NULL); + db_idx = 1; + } + + ddp_flags = 0; + ddp_flags_mask = 0; + if (db_idx == 0) { + ddp_flags |= V_TF_DDP_BUF0_VALID(1); + if (so->so_state & SS_NBIO) + ddp_flags |= V_TF_DDP_BUF0_FLUSH(1); + ddp_flags_mask |= V_TF_DDP_PSH_NO_INVALIDATE0(1) | + V_TF_DDP_PUSH_DISABLE_0(1) | V_TF_DDP_PSHF_ENABLE_0(1) | + V_TF_DDP_BUF0_FLUSH(1) | V_TF_DDP_BUF0_VALID(1); + buf_flag = DDP_BUF0_ACTIVE; + } else { + ddp_flags |= V_TF_DDP_BUF1_VALID(1); + if (so->so_state & SS_NBIO) + ddp_flags |= V_TF_DDP_BUF1_FLUSH(1); + ddp_flags_mask |= V_TF_DDP_PSH_NO_INVALIDATE1(1) | + V_TF_DDP_PUSH_DISABLE_1(1) | V_TF_DDP_PSHF_ENABLE_1(1) | + V_TF_DDP_BUF1_FLUSH(1) | V_TF_DDP_BUF1_VALID(1); + buf_flag = DDP_BUF1_ACTIVE; + } + MPASS((toep->ddp_flags & buf_flag) == 0); + if ((toep->ddp_flags & (DDP_BUF0_ACTIVE | DDP_BUF1_ACTIVE)) == 0) { + MPASS(db_idx == 0); + MPASS(toep->ddp_active_id == -1); + MPASS(toep->ddp_active_count == 0); + ddp_flags_mask |= V_TF_DDP_ACTIVE_BUF(1); + } + + /* + * The TID for this connection should still be valid. If DDP_DEAD + * is set, SBS_CANTRCVMORE should be set, so we shouldn't be + * this far anyway. Even if the socket is closing on the other + * end, the AIO job holds a reference on this end of the socket + * which will keep it open and keep the TCP PCB attached until + * after the job is completed. + */ + wr = mk_update_tcb_for_ddp(sc, toep, db_idx, ps, + job->uaiocb._aiocb_private.status, ddp_flags, ddp_flags_mask); + if (wr == NULL) { + recycle_pageset(toep, ps); + aio_ddp_requeue_one(toep, job); + toep->ddp_queueing = NULL; + + /* + * XXX: Need a way to kick a retry here. + * + * XXX: We know the fixed size needed and could + * preallocate this using a blocking request at the + * start of the task to avoid having to handle this + * edge case. + */ + printf("%s: mk_update_tcb_for_ddp failed\n", __func__); + return; + } + + if (!aio_set_cancel_function(job, t4_aio_cancel_active)) { + free_wrqe(wr); + recycle_pageset(toep, ps); + aio_ddp_cancel_one(job); + toep->ddp_queueing = NULL; + goto restart; + } + +#ifdef VERBOSE_TRACES + CTR5(KTR_CXGBE, "%s: scheduling %p for DDP[%d] (flags %#lx/%#lx)", + __func__, job, db_idx, ddp_flags, ddp_flags_mask); +#endif + /* Give the chip the go-ahead. */ + t4_wrq_tx(sc, wr); + db = &toep->db[db_idx]; + db->cancel_pending = 0; + db->job = job; + db->ps = ps; + toep->ddp_queueing = NULL; + toep->ddp_flags |= buf_flag; + toep->ddp_active_count++; + if (toep->ddp_active_count == 1) { + MPASS(toep->ddp_active_id == -1); + toep->ddp_active_id = db_idx; + CTR2(KTR_CXGBE, "%s: ddp_active_id = %d", __func__, + toep->ddp_active_id); + } + goto restart; +} + +void +ddp_queue_toep(struct toepcb *toep) +{ + + DDP_ASSERT_LOCKED(toep); + if (toep->ddp_flags & DDP_TASK_ACTIVE) + return; + toep->ddp_flags |= DDP_TASK_ACTIVE; + hold_toepcb(toep); + soaio_enqueue(&toep->ddp_requeue_task); +} + +static void +aio_ddp_requeue_task(void *context, int pending) +{ + struct toepcb *toep = context; + + DDP_LOCK(toep); + aio_ddp_requeue(toep); + toep->ddp_flags &= ~DDP_TASK_ACTIVE; + DDP_UNLOCK(toep); + + free_toepcb(toep); +} + +static void +t4_aio_cancel_active(struct kaiocb *job) +{ + struct socket *so = job->fd_file->f_data; + struct tcpcb *tp = so_sototcpcb(so); + struct toepcb *toep = tp->t_toe; + struct adapter *sc = td_adapter(toep->td); + uint64_t valid_flag; + int i; + + DDP_LOCK(toep); + if (aio_cancel_cleared(job)) { + DDP_UNLOCK(toep); + aio_ddp_cancel_one(job); + return; + } + + for (i = 0; i < nitems(toep->db); i++) { + if (toep->db[i].job == job) { + /* Should only ever get one cancel request for a job. */ + MPASS(toep->db[i].cancel_pending == 0); + + /* + * Invalidate this buffer. It will be + * cancelled or partially completed once the + * card ACKs the invalidate. + */ + valid_flag = i == 0 ? V_TF_DDP_BUF0_VALID(1) : + V_TF_DDP_BUF1_VALID(1); + t4_set_tcb_field_rpl(sc, toep, 1, W_TCB_RX_DDP_FLAGS, + valid_flag, 0, i + DDP_BUF0_INVALIDATED); + toep->db[i].cancel_pending = 1; + CTR2(KTR_CXGBE, "%s: request %p marked pending", + __func__, job); + break; + } + } + DDP_UNLOCK(toep); +} + +static void +t4_aio_cancel_queued(struct kaiocb *job) +{ + struct socket *so = job->fd_file->f_data; + struct tcpcb *tp = so_sototcpcb(so); + struct toepcb *toep = tp->t_toe; + + DDP_LOCK(toep); + if (!aio_cancel_cleared(job)) { + TAILQ_REMOVE(&toep->ddp_aiojobq, job, list); + toep->ddp_waiting_count--; + if (toep->ddp_waiting_count == 0) + ddp_queue_toep(toep); + } + CTR2(KTR_CXGBE, "%s: request %p cancelled", __func__, job); + DDP_UNLOCK(toep); + + aio_ddp_cancel_one(job); +} + +int +t4_aio_queue_ddp(struct socket *so, struct kaiocb *job) +{ + struct tcpcb *tp = so_sototcpcb(so); + struct toepcb *toep = tp->t_toe; + + + /* Ignore writes. */ + if (job->uaiocb.aio_lio_opcode != LIO_READ) + return (EOPNOTSUPP); + + DDP_LOCK(toep); + + /* + * XXX: Think about possibly returning errors for ENOTCONN, + * etc. Perhaps the caller would only queue the request + * if it failed with EOPNOTSUPP? + */ + +#ifdef VERBOSE_TRACES + CTR2(KTR_CXGBE, "%s: queueing %p", __func__, job); +#endif + if (!aio_set_cancel_function(job, t4_aio_cancel_queued)) + panic("new job was cancelled"); + TAILQ_INSERT_TAIL(&toep->ddp_aiojobq, job, list); + job->uaiocb._aiocb_private.status = 0; + toep->ddp_waiting_count++; + toep->ddp_flags |= DDP_OK; + + /* + * Try to handle this request synchronously. If this has + * to block because the task is running, it will just bail + * and let the task handle it instead. + */ + aio_ddp_requeue(toep); + DDP_UNLOCK(toep); + return (0); +} + +int +t4_ddp_mod_load(void) +{ + + TAILQ_INIT(&ddp_orphan_pagesets); + mtx_init(&ddp_orphan_pagesets_lock, "ddp orphans", NULL, MTX_DEF); + TASK_INIT(&ddp_orphan_task, 0, ddp_free_orphan_pagesets, NULL); + return (0); +} + +void +t4_ddp_mod_unload(void) +{ + + taskqueue_drain(taskqueue_thread, &ddp_orphan_task); + MPASS(TAILQ_EMPTY(&ddp_orphan_pagesets)); + mtx_destroy(&ddp_orphan_pagesets_lock); +} #endif diff --git a/sys/dev/cxgbe/tom/t4_tom.c b/sys/dev/cxgbe/tom/t4_tom.c index fe7ec7608a7..452c47e5de9 100644 --- a/sys/dev/cxgbe/tom/t4_tom.c +++ b/sys/dev/cxgbe/tom/t4_tom.c @@ -41,6 +41,7 @@ __FBSDID("$FreeBSD$"); #include #include #include +#include #include #include #include @@ -152,6 +153,7 @@ alloc_toepcb(struct vi_info *vi, int txqid, int rxqid, int flags) if (toep == NULL) return (NULL); + refcount_init(&toep->refcount, 1); toep->td = sc->tom_softc; toep->vi = vi; toep->tx_total = tx_credits; @@ -165,19 +167,32 @@ alloc_toepcb(struct vi_info *vi, int txqid, int rxqid, int flags) toep->txsd_avail = txsd_total; toep->txsd_pidx = 0; toep->txsd_cidx = 0; + ddp_init_toep(toep); return (toep); } +struct toepcb * +hold_toepcb(struct toepcb *toep) +{ + + refcount_acquire(&toep->refcount); + return (toep); +} + void free_toepcb(struct toepcb *toep) { + if (refcount_release(&toep->refcount) == 0) + return; + KASSERT(!(toep->flags & TPF_ATTACHED), ("%s: attached to an inpcb", __func__)); KASSERT(!(toep->flags & TPF_CPL_PENDING), ("%s: CPL pending", __func__)); + ddp_uninit_toep(toep); free(toep, M_CXGBE); } @@ -259,6 +274,8 @@ undo_offload_socket(struct socket *so) mtx_lock(&td->toep_list_lock); TAILQ_REMOVE(&td->toep_list, toep, link); mtx_unlock(&td->toep_list_lock); + + free_toepcb(toep); } static void @@ -283,9 +300,9 @@ release_offload_resources(struct toepcb *toep) */ MPASS(mbufq_len(&toep->ulp_pduq) == 0); MPASS(mbufq_len(&toep->ulp_pdu_reclaimq) == 0); - - if (toep->ulp_mode == ULP_MODE_TCPDDP) - release_ddp_resources(toep); +#ifdef INVARIANTS + ddp_assert_empty(toep); +#endif if (toep->l2te) t4_l2t_release(toep->l2te); @@ -389,6 +406,8 @@ final_cpl_received(struct toepcb *toep) CTR6(KTR_CXGBE, "%s: tid %d, toep %p (0x%x), inp %p (0x%x)", __func__, toep->tid, toep, toep->flags, inp, inp->inp_flags); + if (toep->ulp_mode == ULP_MODE_TCPDDP) + release_ddp_resources(toep); toep->inp = NULL; toep->flags &= ~TPF_CPL_PENDING; mbufq_drain(&toep->ulp_pdu_reclaimq); @@ -599,7 +618,6 @@ set_tcpddp_ulp_mode(struct toepcb *toep) toep->ulp_mode = ULP_MODE_TCPDDP; toep->ddp_flags = DDP_OK; - toep->ddp_score = DDP_LOW_SCORE; } int @@ -1109,12 +1127,16 @@ t4_tom_mod_load(void) int rc; struct protosw *tcp_protosw, *tcp6_protosw; + rc = t4_ddp_mod_load(); + if (rc != 0) + return (rc); + tcp_protosw = pffindproto(PF_INET, IPPROTO_TCP, SOCK_STREAM); if (tcp_protosw == NULL) return (ENOPROTOOPT); bcopy(tcp_protosw, &ddp_protosw, sizeof(ddp_protosw)); bcopy(tcp_protosw->pr_usrreqs, &ddp_usrreqs, sizeof(ddp_usrreqs)); - ddp_usrreqs.pru_soreceive = t4_soreceive_ddp; + ddp_usrreqs.pru_aio_queue = t4_aio_queue_ddp; ddp_protosw.pr_usrreqs = &ddp_usrreqs; tcp6_protosw = pffindproto(PF_INET6, IPPROTO_TCP, SOCK_STREAM); @@ -1122,7 +1144,7 @@ t4_tom_mod_load(void) return (ENOPROTOOPT); bcopy(tcp6_protosw, &ddp6_protosw, sizeof(ddp6_protosw)); bcopy(tcp6_protosw->pr_usrreqs, &ddp6_usrreqs, sizeof(ddp6_usrreqs)); - ddp6_usrreqs.pru_soreceive = t4_soreceive_ddp; + ddp6_usrreqs.pru_aio_queue = t4_aio_queue_ddp; ddp6_protosw.pr_usrreqs = &ddp6_usrreqs; TIMEOUT_TASK_INIT(taskqueue_thread, &clip_task, 0, t4_clip_task, NULL); @@ -1162,6 +1184,8 @@ t4_tom_mod_unload(void) taskqueue_cancel_timeout(taskqueue_thread, &clip_task, NULL); } + t4_ddp_mod_unload(); + return (0); } #endif /* TCP_OFFLOAD */ diff --git a/sys/dev/cxgbe/tom/t4_tom.h b/sys/dev/cxgbe/tom/t4_tom.h index f61888deb3d..09238a49b34 100644 --- a/sys/dev/cxgbe/tom/t4_tom.h +++ b/sys/dev/cxgbe/tom/t4_tom.h @@ -74,6 +74,8 @@ enum { DDP_ON = (1 << 2), /* DDP is turned on */ DDP_BUF0_ACTIVE = (1 << 3), /* buffer 0 in use (not invalidated) */ DDP_BUF1_ACTIVE = (1 << 4), /* buffer 1 in use (not invalidated) */ + DDP_TASK_ACTIVE = (1 << 5), /* requeue task is queued / running */ + DDP_DEAD = (1 << 6), /* toepcb is shutting down */ }; struct ofld_tx_sdesc { @@ -81,19 +83,36 @@ struct ofld_tx_sdesc { uint8_t tx_credits; /* firmware tx credits (unit is 16B) */ }; -struct ddp_buffer { - uint32_t tag; /* includes color, page pod addr, and DDP page size */ +struct pageset { + TAILQ_ENTRY(pageset) link; + vm_page_t *pages; + int npages; + int flags; u_int ppod_addr; int nppods; - int offset; + uint32_t tag; /* includes color, page pod addr, and DDP page size */ + int offset; /* offset in first page */ int len; - int npages; - vm_page_t *pages; + struct vmspace *vm; + u_int vm_timestamp; +}; + +TAILQ_HEAD(pagesetq, pageset); + +#define PS_WIRED 0x0001 /* Pages wired rather than held. */ +#define PS_PPODS_WRITTEN 0x0002 /* Page pods written to the card. */ + +struct ddp_buffer { + struct pageset *ps; + + struct kaiocb *job; + int cancel_pending; }; struct toepcb { TAILQ_ENTRY(toepcb) link; /* toep_list */ u_int flags; /* miscellaneous flags */ + int refcount; struct tom_data *td; struct inpcb *inp; /* backpointer to host stack's PCB */ struct vi_info *vi; /* virtual interface */ @@ -121,9 +140,16 @@ struct toepcb { struct mbufq ulp_pdu_reclaimq; u_int ddp_flags; - struct ddp_buffer *db[2]; - time_t ddp_disabled; - uint8_t ddp_score; + struct ddp_buffer db[2]; + TAILQ_HEAD(, pageset) ddp_cached_pagesets; + TAILQ_HEAD(, kaiocb) ddp_aiojobq; + u_int ddp_waiting_count; + u_int ddp_active_count; + u_int ddp_cached_count; + int ddp_active_id; /* the currently active DDP buffer */ + struct task ddp_requeue_task; + struct kaiocb *ddp_queueing; + struct mtx ddp_lock; /* Tx software descriptor */ uint8_t txsd_total; @@ -133,6 +159,10 @@ struct toepcb { struct ofld_tx_sdesc txsd[]; }; +#define DDP_LOCK(toep) mtx_lock(&(toep)->ddp_lock) +#define DDP_UNLOCK(toep) mtx_unlock(&(toep)->ddp_lock) +#define DDP_ASSERT_LOCKED(toep) mtx_assert(&(toep)->ddp_lock, MA_OWNED) + struct flowc_tx_params { uint32_t snd_nxt; uint32_t rcv_nxt; @@ -242,6 +272,7 @@ mbuf_ulp_submode(struct mbuf *m) /* t4_tom.c */ struct toepcb *alloc_toepcb(struct vi_info *, int, int, int); +struct toepcb *hold_toepcb(struct toepcb *); void free_toepcb(struct toepcb *); void offload_socket(struct socket *, struct toepcb *); void undo_offload_socket(struct socket *); @@ -289,11 +320,14 @@ void send_flowc_wr(struct toepcb *, struct flowc_tx_params *); void send_reset(struct adapter *, struct toepcb *, uint32_t); void make_established(struct toepcb *, uint32_t, uint32_t, uint16_t); void t4_rcvd(struct toedev *, struct tcpcb *); +void t4_rcvd_locked(struct toedev *, struct tcpcb *); int t4_tod_output(struct toedev *, struct tcpcb *); int t4_send_fin(struct toedev *, struct tcpcb *); int t4_send_rst(struct toedev *, struct tcpcb *); void t4_set_tcb_field(struct adapter *, struct toepcb *, int, uint16_t, uint64_t, uint64_t); +void t4_set_tcb_field_rpl(struct adapter *, struct toepcb *, int, uint16_t, + uint64_t, uint64_t, uint8_t); void t4_push_frames(struct adapter *sc, struct toepcb *toep, int drop); void t4_push_pdus(struct adapter *sc, struct toepcb *toep, int drop); @@ -302,10 +336,17 @@ void t4_init_ddp(struct adapter *, struct tom_data *); void t4_uninit_ddp(struct adapter *, struct tom_data *); int t4_soreceive_ddp(struct socket *, struct sockaddr **, struct uio *, struct mbuf **, struct mbuf **, int *); -void enable_ddp(struct adapter *, struct toepcb *toep); +int t4_aio_queue_ddp(struct socket *, struct kaiocb *); +int t4_ddp_mod_load(void); +void t4_ddp_mod_unload(void); +void ddp_assert_empty(struct toepcb *); +void ddp_init_toep(struct toepcb *); +void ddp_uninit_toep(struct toepcb *); +void ddp_queue_toep(struct toepcb *); void release_ddp_resources(struct toepcb *toep); -void handle_ddp_close(struct toepcb *, struct tcpcb *, struct sockbuf *, - uint32_t); +void handle_ddp_close(struct toepcb *, struct tcpcb *, uint32_t); +void handle_ddp_indicate(struct toepcb *); +void handle_ddp_tcb_rpl(struct toepcb *, const struct cpl_set_tcb_rpl *); void insert_ddp_data(struct toepcb *, uint32_t); #endif