Date: Tue, 8 Feb 2022 00:21:17 GMT From: John Baldwin <jhb@FreeBSD.org> To: src-committers@FreeBSD.org, dev-commits-src-all@FreeBSD.org, dev-commits-src-main@FreeBSD.org Subject: git: 511b83b167b0 - main - cxgbei: Replace worker thread pools with per-connection kthreads. Message-ID: <202202080021.2180LHNE030736@gitrepo.freebsd.org>
next in thread | raw e-mail | index | archive | help
The branch main has been updated by jhb: URL: https://cgit.FreeBSD.org/src/commit/?id=511b83b167b02608a1f857fa2e0cc5fb3218e09d commit 511b83b167b02608a1f857fa2e0cc5fb3218e09d Author: John Baldwin <jhb@FreeBSD.org> AuthorDate: 2022-02-08 00:20:40 +0000 Commit: John Baldwin <jhb@FreeBSD.org> CommitDate: 2022-02-08 00:20:40 +0000 cxgbei: Replace worker thread pools with per-connection kthreads. Having a single pool of worker threads adds extra complexity and overhead. The software backend also uses per-connection kthreads. Sponsored by: Chelsio Communications --- sys/dev/cxgbe/cxgbei/cxgbei.c | 237 +--------------------------- sys/dev/cxgbe/cxgbei/cxgbei.h | 22 +-- sys/dev/cxgbe/cxgbei/icl_cxgbei.c | 320 ++++++++++++++++++++------------------ 3 files changed, 178 insertions(+), 401 deletions(-) diff --git a/sys/dev/cxgbe/cxgbei/cxgbei.c b/sys/dev/cxgbe/cxgbei/cxgbei.c index c06e39005197..979feace81dd 100644 --- a/sys/dev/cxgbe/cxgbei/cxgbei.c +++ b/sys/dev/cxgbe/cxgbei/cxgbei.c @@ -94,11 +94,6 @@ __FBSDID("$FreeBSD$"); #include "tom/t4_tom.h" #include "cxgbei.h" -static int worker_thread_count; -static struct cxgbei_worker_thread *cwt_rx_threads, *cwt_tx_threads; - -static void cwt_queue_for_rx(struct icl_cxgbei_conn *icc); - static void read_pdu_limits(struct adapter *sc, uint32_t *max_tx_data_len, uint32_t *max_rx_data_len, struct ppod_region *pr) @@ -424,7 +419,7 @@ parse_pdu(struct socket *so, struct toepcb *toep, struct icl_cxgbei_conn *icc, return (ip); } -static void +void parse_pdus(struct icl_cxgbei_conn *icc, struct sockbuf *sb) { struct icl_conn *ic = &icc->ic; @@ -588,7 +583,7 @@ do_rx_iscsi_ddp(struct sge_iq *iq, const struct rss_header *rss, struct mbuf *m) STAILQ_INSERT_TAIL(&icc->rcvd_pdus, ip, ip_next); if (!icc->rx_active) { icc->rx_active = true; - cwt_queue_for_rx(icc); + wakeup(&icc->rx_active); } SOCKBUF_UNLOCK(sb); INP_WUNLOCK(inp); @@ -831,7 +826,7 @@ do_rx_iscsi_cmp(struct sge_iq *iq, const struct rss_header *rss, struct mbuf *m) STAILQ_INSERT_TAIL(&icc->rcvd_pdus, ip, ip_next); if (!icc->rx_active) { icc->rx_active = true; - cwt_queue_for_rx(icc); + wakeup(&icc->rx_active); } SOCKBUF_UNLOCK(sb); INP_WUNLOCK(inp); @@ -928,222 +923,6 @@ static struct uld_info cxgbei_uld_info = { .deactivate = cxgbei_deactivate, }; -static void -cwt_rx_main(void *arg) -{ - struct cxgbei_worker_thread *cwt = arg; - struct icl_cxgbei_conn *icc = NULL; - struct icl_conn *ic; - struct icl_pdu *ip; - struct sockbuf *sb; - STAILQ_HEAD(, icl_pdu) rx_pdus = STAILQ_HEAD_INITIALIZER(rx_pdus); - - MPASS(cwt != NULL); - - mtx_lock(&cwt->cwt_lock); - MPASS(cwt->cwt_state == 0); - cwt->cwt_state = CWT_RUNNING; - cv_signal(&cwt->cwt_cv); - - while (__predict_true(cwt->cwt_state != CWT_STOP)) { - cwt->cwt_state = CWT_RUNNING; - while ((icc = TAILQ_FIRST(&cwt->icc_head)) != NULL) { - TAILQ_REMOVE(&cwt->icc_head, icc, rx_link); - mtx_unlock(&cwt->cwt_lock); - - ic = &icc->ic; - sb = &ic->ic_socket->so_rcv; - - SOCKBUF_LOCK(sb); - if (__predict_false(sbused(sb)) != 0) { - /* - * PDUs were received before the tid - * transitioned to ULP mode. Convert - * them to icl_cxgbei_pdus and insert - * them into the head of rcvd_pdus. - */ - parse_pdus(icc, sb); - } - MPASS(icc->rx_active); - if (__predict_true(!(sb->sb_state & SBS_CANTRCVMORE))) { - MPASS(STAILQ_EMPTY(&rx_pdus)); - STAILQ_SWAP(&icc->rcvd_pdus, &rx_pdus, icl_pdu); - SOCKBUF_UNLOCK(sb); - - /* Hand over PDUs to ICL. */ - while ((ip = STAILQ_FIRST(&rx_pdus)) != NULL) { - STAILQ_REMOVE_HEAD(&rx_pdus, ip_next); - ic->ic_receive(ip); - } - - SOCKBUF_LOCK(sb); - MPASS(STAILQ_EMPTY(&rx_pdus)); - } - MPASS(icc->rx_active); - if (STAILQ_EMPTY(&icc->rcvd_pdus) || - __predict_false(sb->sb_state & SBS_CANTRCVMORE)) { - icc->rx_active = false; - SOCKBUF_UNLOCK(sb); - - mtx_lock(&cwt->cwt_lock); - } else { - SOCKBUF_UNLOCK(sb); - - /* - * More PDUs were received while we were busy - * handing over the previous batch to ICL. - * Re-add this connection to the end of the - * queue. - */ - mtx_lock(&cwt->cwt_lock); - TAILQ_INSERT_TAIL(&cwt->icc_head, icc, - rx_link); - } - } - - /* Inner loop doesn't check for CWT_STOP, do that first. */ - if (__predict_false(cwt->cwt_state == CWT_STOP)) - break; - cwt->cwt_state = CWT_SLEEPING; - cv_wait(&cwt->cwt_cv, &cwt->cwt_lock); - } - - MPASS(TAILQ_FIRST(&cwt->icc_head) == NULL); - mtx_unlock(&cwt->cwt_lock); - kthread_exit(); -} - -static void -cwt_queue_for_rx(struct icl_cxgbei_conn *icc) -{ - struct cxgbei_worker_thread *cwt = &cwt_rx_threads[icc->cwt]; - - mtx_lock(&cwt->cwt_lock); - TAILQ_INSERT_TAIL(&cwt->icc_head, icc, rx_link); - if (cwt->cwt_state == CWT_SLEEPING) { - cwt->cwt_state = CWT_RUNNING; - cv_signal(&cwt->cwt_cv); - } - mtx_unlock(&cwt->cwt_lock); -} - -void -cwt_queue_for_tx(struct icl_cxgbei_conn *icc) -{ - struct cxgbei_worker_thread *cwt = &cwt_tx_threads[icc->cwt]; - - mtx_lock(&cwt->cwt_lock); - TAILQ_INSERT_TAIL(&cwt->icc_head, icc, tx_link); - if (cwt->cwt_state == CWT_SLEEPING) { - cwt->cwt_state = CWT_RUNNING; - cv_signal(&cwt->cwt_cv); - } - mtx_unlock(&cwt->cwt_lock); -} - -static int -start_worker_threads(void) -{ - struct proc *cxgbei_proc; - int i, rc; - struct cxgbei_worker_thread *cwt; - - worker_thread_count = min(mp_ncpus, 32); - cwt_rx_threads = malloc(worker_thread_count * sizeof(*cwt), M_CXGBE, - M_WAITOK | M_ZERO); - cwt_tx_threads = malloc(worker_thread_count * sizeof(*cwt), M_CXGBE, - M_WAITOK | M_ZERO); - - for (i = 0, cwt = &cwt_rx_threads[0]; i < worker_thread_count; - i++, cwt++) { - mtx_init(&cwt->cwt_lock, "cwt lock", NULL, MTX_DEF); - cv_init(&cwt->cwt_cv, "cwt cv"); - TAILQ_INIT(&cwt->icc_head); - } - - for (i = 0, cwt = &cwt_tx_threads[0]; i < worker_thread_count; - i++, cwt++) { - mtx_init(&cwt->cwt_lock, "cwt lock", NULL, MTX_DEF); - cv_init(&cwt->cwt_cv, "cwt cv"); - TAILQ_INIT(&cwt->icc_head); - } - - cxgbei_proc = NULL; - for (i = 0, cwt = &cwt_rx_threads[0]; i < worker_thread_count; - i++, cwt++) { - rc = kproc_kthread_add(cwt_rx_main, cwt, &cxgbei_proc, - &cwt->cwt_td, 0, 0, "cxgbei", "rx %d", i); - if (rc != 0) { - printf("cxgbei: failed to start rx thread #%d/%d (%d)\n", - i + 1, worker_thread_count, rc); - return (rc); - } - } - - for (i = 0, cwt = &cwt_tx_threads[0]; i < worker_thread_count; - i++, cwt++) { - rc = kproc_kthread_add(cwt_tx_main, cwt, &cxgbei_proc, - &cwt->cwt_td, 0, 0, "cxgbei", "tx %d", i); - if (rc != 0) { - printf("cxgbei: failed to start tx thread #%d/%d (%d)\n", - i + 1, worker_thread_count, rc); - return (rc); - } - } - - return (0); -} - -static void -stop_worker_threads1(struct cxgbei_worker_thread *threads) -{ - struct cxgbei_worker_thread *cwt; - int i; - - for (i = 0, cwt = &threads[0]; i < worker_thread_count; i++, cwt++) { - mtx_lock(&cwt->cwt_lock); - if (cwt->cwt_td != NULL) { - MPASS(cwt->cwt_state == CWT_RUNNING || - cwt->cwt_state == CWT_SLEEPING); - cwt->cwt_state = CWT_STOP; - cv_signal(&cwt->cwt_cv); - mtx_sleep(cwt->cwt_td, &cwt->cwt_lock, 0, "cwtstop", 0); - } - mtx_unlock(&cwt->cwt_lock); - mtx_destroy(&cwt->cwt_lock); - cv_destroy(&cwt->cwt_cv); - } - free(threads, M_CXGBE); -} - -static void -stop_worker_threads(void) -{ - - MPASS(worker_thread_count >= 0); - stop_worker_threads1(cwt_rx_threads); - stop_worker_threads1(cwt_tx_threads); -} - -/* Select a worker thread for a connection. */ -u_int -cxgbei_select_worker_thread(struct icl_cxgbei_conn *icc) -{ - struct adapter *sc = icc->sc; - struct toepcb *toep = icc->toep; - u_int i, n; - - n = worker_thread_count / sc->sge.nofldrxq; - if (n > 0) - i = toep->vi->pi->port_id * n + arc4random() % n; - else - i = arc4random() % worker_thread_count; - - CTR3(KTR_CXGBE, "%s: tid %u, cwt %u", __func__, toep->tid, i); - - return (i); -} - static int cxgbei_mod_load(void) { @@ -1154,15 +933,9 @@ cxgbei_mod_load(void) t4_register_cpl_handler(CPL_RX_ISCSI_DDP, do_rx_iscsi_ddp); t4_register_cpl_handler(CPL_RX_ISCSI_CMP, do_rx_iscsi_cmp); - rc = start_worker_threads(); - if (rc != 0) - return (rc); - rc = t4_register_uld(&cxgbei_uld_info); - if (rc != 0) { - stop_worker_threads(); + if (rc != 0) return (rc); - } t4_iterate(cxgbei_activate_all, NULL); @@ -1178,8 +951,6 @@ cxgbei_mod_unload(void) if (t4_unregister_uld(&cxgbei_uld_info) == EBUSY) return (EBUSY); - stop_worker_threads(); - t4_register_cpl_handler(CPL_ISCSI_HDR, NULL); t4_register_cpl_handler(CPL_ISCSI_DATA, NULL); t4_register_cpl_handler(CPL_RX_ISCSI_DDP, NULL); diff --git a/sys/dev/cxgbe/cxgbei/cxgbei.h b/sys/dev/cxgbe/cxgbei/cxgbei.h index b078f3110d62..d0e423ce5b2f 100644 --- a/sys/dev/cxgbe/cxgbei/cxgbei.h +++ b/sys/dev/cxgbe/cxgbei/cxgbei.h @@ -32,21 +32,6 @@ #include <dev/iscsi/icl.h> -enum { - CWT_SLEEPING = 1, - CWT_RUNNING = 2, - CWT_STOP = 3, -}; - -struct cxgbei_worker_thread { - struct mtx cwt_lock; - struct cv cwt_cv; - volatile int cwt_state; - struct thread *cwt_td; - - TAILQ_HEAD(, icl_cxgbei_conn) icc_head; -} __aligned(CACHE_LINE_SIZE); - #define CXGBEI_CONN_SIGNATURE 0x56788765 struct cxgbei_cmp { @@ -67,12 +52,12 @@ struct icl_cxgbei_conn { int ulp_submode; struct adapter *sc; struct toepcb *toep; - u_int cwt; /* Receive related. */ bool rx_active; /* protected by so_rcv lock */ + bool rx_exiting; /* protected by so_rcv lock */ STAILQ_HEAD(, icl_pdu) rcvd_pdus; /* protected by so_rcv lock */ - TAILQ_ENTRY(icl_cxgbei_conn) rx_link; /* protected by cwt lock */ + struct thread *rx_thread; struct cxgbei_cmp_head *cmp_table; /* protected by cmp_lock */ struct mtx cmp_lock; @@ -81,7 +66,7 @@ struct icl_cxgbei_conn { /* Transmit related. */ bool tx_active; /* protected by ic lock */ STAILQ_HEAD(, icl_pdu) sent_pdus; /* protected by ic lock */ - TAILQ_ENTRY(icl_cxgbei_conn) tx_link; /* protected by cwt lock */ + struct thread *tx_thread; }; static inline struct icl_cxgbei_conn * @@ -136,6 +121,7 @@ struct cxgbei_data { /* cxgbei.c */ u_int cxgbei_select_worker_thread(struct icl_cxgbei_conn *); void cwt_queue_for_tx(struct icl_cxgbei_conn *); +void parse_pdus(struct icl_cxgbei_conn *, struct sockbuf *); /* icl_cxgbei.c */ void cwt_tx_main(void *); diff --git a/sys/dev/cxgbe/cxgbei/icl_cxgbei.c b/sys/dev/cxgbe/cxgbei/icl_cxgbei.c index 516ab931a49c..296d4f2d270a 100644 --- a/sys/dev/cxgbe/cxgbei/icl_cxgbei.c +++ b/sys/dev/cxgbe/cxgbei/icl_cxgbei.c @@ -422,124 +422,133 @@ finalize_pdu(struct icl_cxgbei_conn *icc, struct icl_cxgbei_pdu *icp) } static void -cwt_push_pdus(struct icl_cxgbei_conn *icc, struct socket *so, struct mbufq *mq) +icl_cxgbei_tx_main(void *arg) { struct epoch_tracker et; + struct icl_cxgbei_conn *icc = arg; struct icl_conn *ic = &icc->ic; struct toepcb *toep = icc->toep; - struct inpcb *inp; - - /* - * Do not get inp from toep->inp as the toepcb might have - * detached already. - */ - inp = sotoinpcb(so); - CURVNET_SET(toep->vnet); - NET_EPOCH_ENTER(et); - INP_WLOCK(inp); - - ICL_CONN_UNLOCK(ic); - if (__predict_false(inp->inp_flags & (INP_DROPPED | INP_TIMEWAIT)) || - __predict_false((toep->flags & TPF_ATTACHED) == 0)) { - mbufq_drain(mq); - } else { - mbufq_concat(&toep->ulp_pduq, mq); - t4_push_pdus(icc->sc, toep, 0); - } - INP_WUNLOCK(inp); - NET_EPOCH_EXIT(et); - CURVNET_RESTORE(); - - ICL_CONN_LOCK(ic); -} - -void -cwt_tx_main(void *arg) -{ - struct cxgbei_worker_thread *cwt = arg; - struct icl_cxgbei_conn *icc; - struct icl_conn *ic; + struct socket *so = ic->ic_socket; + struct inpcb *inp = sotoinpcb(so); struct icl_pdu *ip; - struct socket *so; struct mbuf *m; struct mbufq mq; STAILQ_HEAD(, icl_pdu) tx_pdus = STAILQ_HEAD_INITIALIZER(tx_pdus); - MPASS(cwt != NULL); + mbufq_init(&mq, INT_MAX); - mtx_lock(&cwt->cwt_lock); - MPASS(cwt->cwt_state == 0); - cwt->cwt_state = CWT_RUNNING; - cv_signal(&cwt->cwt_cv); + ICL_CONN_LOCK(ic); + while (__predict_true(!ic->ic_disconnecting)) { + while (STAILQ_EMPTY(&icc->sent_pdus)) { + icc->tx_active = false; + mtx_sleep(&icc->tx_active, ic->ic_lock, 0, "-", 0); + if (__predict_false(ic->ic_disconnecting)) + goto out; + MPASS(icc->tx_active); + } - mbufq_init(&mq, INT_MAX); - while (__predict_true(cwt->cwt_state != CWT_STOP)) { - cwt->cwt_state = CWT_RUNNING; - while ((icc = TAILQ_FIRST(&cwt->icc_head)) != NULL) { - TAILQ_REMOVE(&cwt->icc_head, icc, tx_link); - mtx_unlock(&cwt->cwt_lock); + STAILQ_SWAP(&icc->sent_pdus, &tx_pdus, icl_pdu); + ICL_CONN_UNLOCK(ic); - ic = &icc->ic; + while ((ip = STAILQ_FIRST(&tx_pdus)) != NULL) { + STAILQ_REMOVE_HEAD(&tx_pdus, ip_next); - ICL_CONN_LOCK(ic); - MPASS(icc->tx_active); - STAILQ_SWAP(&icc->sent_pdus, &tx_pdus, icl_pdu); - ICL_CONN_UNLOCK(ic); + m = finalize_pdu(icc, ip_to_icp(ip)); + M_ASSERTPKTHDR(m); + MPASS((m->m_pkthdr.len & 3) == 0); - while ((ip = STAILQ_FIRST(&tx_pdus)) != NULL) { - STAILQ_REMOVE_HEAD(&tx_pdus, ip_next); + mbufq_enqueue(&mq, m); + } - m = finalize_pdu(icc, ip_to_icp(ip)); - M_ASSERTPKTHDR(m); - MPASS((m->m_pkthdr.len & 3) == 0); + ICL_CONN_LOCK(ic); + if (__predict_false(ic->ic_disconnecting) || + __predict_false(ic->ic_socket == NULL)) { + mbufq_drain(&mq); + break; + } - mbufq_enqueue(&mq, m); - } + CURVNET_SET(toep->vnet); + NET_EPOCH_ENTER(et); + INP_WLOCK(inp); - ICL_CONN_LOCK(ic); - so = ic->ic_socket; - if (__predict_false(ic->ic_disconnecting) || - __predict_false(so == NULL)) { - mbufq_drain(&mq); - icc->tx_active = false; - ICL_CONN_UNLOCK(ic); + ICL_CONN_UNLOCK(ic); + if (__predict_false(inp->inp_flags & (INP_DROPPED | + INP_TIMEWAIT)) || + __predict_false((toep->flags & TPF_ATTACHED) == 0)) { + mbufq_drain(&mq); + } else { + mbufq_concat(&toep->ulp_pduq, &mq); + t4_push_pdus(icc->sc, toep, 0); + } + INP_WUNLOCK(inp); + NET_EPOCH_EXIT(et); + CURVNET_RESTORE(); - mtx_lock(&cwt->cwt_lock); - continue; - } + ICL_CONN_LOCK(ic); + } +out: + ICL_CONN_UNLOCK(ic); - cwt_push_pdus(icc, so, &mq); + kthread_exit(); +} - MPASS(icc->tx_active); - if (STAILQ_EMPTY(&icc->sent_pdus)) { - icc->tx_active = false; - ICL_CONN_UNLOCK(ic); - - mtx_lock(&cwt->cwt_lock); - } else { - ICL_CONN_UNLOCK(ic); - - /* - * More PDUs were queued while we were - * busy sending the previous batch. - * Re-add this connection to the end - * of the queue. - */ - mtx_lock(&cwt->cwt_lock); - TAILQ_INSERT_TAIL(&cwt->icc_head, icc, - tx_link); - } +static void +icl_cxgbei_rx_main(void *arg) +{ + struct icl_cxgbei_conn *icc = arg; + struct icl_conn *ic = &icc->ic; + struct icl_pdu *ip; + struct sockbuf *sb; + STAILQ_HEAD(, icl_pdu) rx_pdus = STAILQ_HEAD_INITIALIZER(rx_pdus); + bool cantrcvmore; + + sb = &ic->ic_socket->so_rcv; + SOCKBUF_LOCK(sb); + while (__predict_true(!ic->ic_disconnecting)) { + while (STAILQ_EMPTY(&icc->rcvd_pdus)) { + icc->rx_active = false; + mtx_sleep(&icc->rx_active, SOCKBUF_MTX(sb), 0, "-", 0); + if (__predict_false(ic->ic_disconnecting)) + goto out; + MPASS(icc->rx_active); } - /* Inner loop doesn't check for CWT_STOP, do that first. */ - if (__predict_false(cwt->cwt_state == CWT_STOP)) - break; - cwt->cwt_state = CWT_SLEEPING; - cv_wait(&cwt->cwt_cv, &cwt->cwt_lock); + if (__predict_false(sbused(sb)) != 0) { + /* + * PDUs were received before the tid + * transitioned to ULP mode. Convert + * them to icl_cxgbei_pdus and insert + * them into the head of rcvd_pdus. + */ + parse_pdus(icc, sb); + } + cantrcvmore = (sb->sb_state & SBS_CANTRCVMORE) != 0; + MPASS(STAILQ_EMPTY(&rx_pdus)); + STAILQ_SWAP(&icc->rcvd_pdus, &rx_pdus, icl_pdu); + SOCKBUF_UNLOCK(sb); + + /* Hand over PDUs to ICL. */ + while ((ip = STAILQ_FIRST(&rx_pdus)) != NULL) { + STAILQ_REMOVE_HEAD(&rx_pdus, ip_next); + if (cantrcvmore) + icl_cxgbei_pdu_done(ip, ENOTCONN); + else + ic->ic_receive(ip); + } + + SOCKBUF_LOCK(sb); } +out: + /* + * Since ic_disconnecting is set before the SOCKBUF_MTX is + * locked in icl_cxgbei_conn_close, the loop above can exit + * before icl_cxgbei_conn_close can lock SOCKBUF_MTX and block + * waiting for the thread exit. + */ + while (!icc->rx_exiting) + mtx_sleep(&icc->rx_active, SOCKBUF_MTX(sb), 0, "-", 0); + SOCKBUF_UNLOCK(sb); - MPASS(TAILQ_FIRST(&cwt->icc_head) == NULL); - mtx_unlock(&cwt->cwt_lock); kthread_exit(); } @@ -678,7 +687,7 @@ icl_cxgbei_conn_pdu_queue_cb(struct icl_conn *ic, struct icl_pdu *ip, STAILQ_INSERT_TAIL(&icc->sent_pdus, ip, ip_next); if (!icc->tx_active) { icc->tx_active = true; - cwt_queue_for_tx(icc); + wakeup(&icc->tx_active); } } @@ -740,10 +749,8 @@ icl_cxgbei_setsockopt(struct icl_conn *ic, struct socket *so, int sspace, rs = max(recvspace, rspace); error = soreserve(so, ss, rs); - if (error != 0) { - icl_cxgbei_conn_close(ic); + if (error != 0) return (error); - } SOCKBUF_LOCK(&so->so_snd); so->so_snd.sb_flags |= SB_AUTOSIZE; SOCKBUF_UNLOCK(&so->so_snd); @@ -761,10 +768,8 @@ icl_cxgbei_setsockopt(struct icl_conn *ic, struct socket *so, int sspace, opt.sopt_val = &one; opt.sopt_valsize = sizeof(one); error = sosetopt(so, &opt); - if (error != 0) { - icl_cxgbei_conn_close(ic); + if (error != 0) return (error); - } return (0); } @@ -934,8 +939,10 @@ icl_cxgbei_conn_handoff(struct icl_conn *ic, int fd) fa.sc = NULL; fa.so = so; t4_iterate(find_offload_adapter, &fa); - if (fa.sc == NULL) - return (EINVAL); + if (fa.sc == NULL) { + error = EINVAL; + goto out; + } icc->sc = fa.sc; max_rx_pdu_len = ISCSI_BHS_SIZE + ic->ic_max_recv_data_segment_length; @@ -954,7 +961,8 @@ icl_cxgbei_conn_handoff(struct icl_conn *ic, int fd) tp = intotcpcb(inp); if (inp->inp_flags & (INP_DROPPED | INP_TIMEWAIT)) { INP_WUNLOCK(inp); - return (EBUSY); + error = ENOTCONN; + goto out; } /* @@ -968,11 +976,11 @@ icl_cxgbei_conn_handoff(struct icl_conn *ic, int fd) if (ulp_mode(toep) != ULP_MODE_NONE) { INP_WUNLOCK(inp); - return (EINVAL); + error = EINVAL; + goto out; } icc->toep = toep; - icc->cwt = cxgbei_select_worker_thread(icc); icc->ulp_submode = 0; if (ic->ic_header_crc32c) @@ -996,7 +1004,21 @@ icl_cxgbei_conn_handoff(struct icl_conn *ic, int fd) set_ulp_mode_iscsi(icc->sc, toep, icc->ulp_submode); INP_WUNLOCK(inp); - return (icl_cxgbei_setsockopt(ic, so, max_tx_pdu_len, max_rx_pdu_len)); + error = kthread_add(icl_cxgbei_tx_main, icc, NULL, &icc->tx_thread, 0, + 0, "%stx (cxgbei)", ic->ic_name); + if (error != 0) + goto out; + + error = kthread_add(icl_cxgbei_rx_main, icc, NULL, &icc->rx_thread, 0, + 0, "%srx (cxgbei)", ic->ic_name); + if (error != 0) + goto out; + + error = icl_cxgbei_setsockopt(ic, so, max_tx_pdu_len, max_rx_pdu_len); +out: + if (error != 0) + icl_cxgbei_conn_close(ic); + return (error); } void @@ -1027,60 +1049,58 @@ icl_cxgbei_conn_close(struct icl_conn *ic) ("destroying session with %d outstanding PDUs", ic->ic_outstanding_pdus)); #endif - ICL_CONN_UNLOCK(ic); CTR3(KTR_CXGBE, "%s: tid %d, icc %p", __func__, toep ? toep->tid : -1, icc); + + /* + * Wait for the transmit thread to stop processing + * this connection. + */ + if (icc->tx_thread != NULL) { + wakeup(&icc->tx_active); + mtx_sleep(icc->tx_thread, ic->ic_lock, 0, "conclo", 0); + } + + /* Discard PDUs queued for TX. */ + while (!STAILQ_EMPTY(&icc->sent_pdus)) { + ip = STAILQ_FIRST(&icc->sent_pdus); + STAILQ_REMOVE_HEAD(&icc->sent_pdus, ip_next); + icl_cxgbei_pdu_done(ip, ENOTCONN); + } + ICL_CONN_UNLOCK(ic); + inp = sotoinpcb(so); sb = &so->so_rcv; + + /* + * Wait for the receive thread to stop processing this + * connection. + */ + SOCKBUF_LOCK(sb); + if (icc->rx_thread != NULL) { + icc->rx_exiting = true; + wakeup(&icc->rx_active); + mtx_sleep(icc->rx_thread, SOCKBUF_MTX(sb), 0, "conclo", 0); + } + + /* + * Discard received PDUs not passed to the iSCSI layer. + */ + while (!STAILQ_EMPTY(&icc->rcvd_pdus)) { + ip = STAILQ_FIRST(&icc->rcvd_pdus); + STAILQ_REMOVE_HEAD(&icc->rcvd_pdus, ip_next); + icl_cxgbei_pdu_done(ip, ENOTCONN); + } + SOCKBUF_UNLOCK(sb); + INP_WLOCK(inp); if (toep != NULL) { /* NULL if connection was never offloaded. */ toep->ulpcb = NULL; - /* - * Wait for the cwt threads to stop processing this - * connection for transmit. - */ - while (icc->tx_active) - rw_sleep(inp, &inp->inp_lock, 0, "conclo", 1); - - /* Discard PDUs queued for TX. */ - while (!STAILQ_EMPTY(&icc->sent_pdus)) { - ip = STAILQ_FIRST(&icc->sent_pdus); - STAILQ_REMOVE_HEAD(&icc->sent_pdus, ip_next); - icl_cxgbei_pdu_done(ip, ENOTCONN); - } + /* Discard mbufs queued for TX. */ mbufq_drain(&toep->ulp_pduq); - /* - * Wait for the cwt threads to stop processing this - * connection for receive. - */ - SOCKBUF_LOCK(sb); - if (icc->rx_active) { - volatile bool *p = &icc->rx_active; - - SOCKBUF_UNLOCK(sb); - INP_WUNLOCK(inp); - - while (*p) - pause("conclo", 1); - - INP_WLOCK(inp); - SOCKBUF_LOCK(sb); - } - - /* - * Discard received PDUs not passed to the iSCSI - * layer. - */ - while (!STAILQ_EMPTY(&icc->rcvd_pdus)) { - ip = STAILQ_FIRST(&icc->rcvd_pdus); - STAILQ_REMOVE_HEAD(&icc->rcvd_pdus, ip_next); - icl_cxgbei_pdu_done(ip, ENOTCONN); - } - SOCKBUF_UNLOCK(sb); - /* * Grab a reference to use when waiting for the final * CPL to be received. If toep->inp is NULL, then
Want to link to this message? Use this URL: <https://mail-archive.FreeBSD.org/cgi/mid.cgi?202202080021.2180LHNE030736>