Skip site navigation (1)Skip section navigation (2)
Date:      Fri, 29 Apr 2022 23:12:45 GMT
From:      John Baldwin <jhb@FreeBSD.org>
To:        src-committers@FreeBSD.org, dev-commits-src-all@FreeBSD.org, dev-commits-src-branches@FreeBSD.org
Subject:   git: ca0ad465d1eb - stable/13 - cxgbei: Replace worker thread pools with per-connection kthreads.
Message-ID:  <202204292312.23TNCjlb045071@gitrepo.freebsd.org>

next in thread | raw e-mail | index | archive | help
The branch stable/13 has been updated by jhb:

URL: https://cgit.FreeBSD.org/src/commit/?id=ca0ad465d1eb0b1b1b5ffe90a12fc931f3ebd5bd

commit ca0ad465d1eb0b1b1b5ffe90a12fc931f3ebd5bd
Author:     John Baldwin <jhb@FreeBSD.org>
AuthorDate: 2022-02-08 00:20:40 +0000
Commit:     John Baldwin <jhb@FreeBSD.org>
CommitDate: 2022-04-29 23:09:02 +0000

    cxgbei: Replace worker thread pools with per-connection kthreads.
    
    Having a single pool of worker threads adds extra complexity and
    overhead.  The software backend also uses per-connection kthreads.
    
    Sponsored by:   Chelsio Communications
    
    (cherry picked from commit 511b83b167b02608a1f857fa2e0cc5fb3218e09d)
---
 sys/dev/cxgbe/cxgbei/cxgbei.c     | 237 +---------------------------
 sys/dev/cxgbe/cxgbei/cxgbei.h     |  22 +--
 sys/dev/cxgbe/cxgbei/icl_cxgbei.c | 320 ++++++++++++++++++++------------------
 3 files changed, 178 insertions(+), 401 deletions(-)

diff --git a/sys/dev/cxgbe/cxgbei/cxgbei.c b/sys/dev/cxgbe/cxgbei/cxgbei.c
index c06e39005197..979feace81dd 100644
--- a/sys/dev/cxgbe/cxgbei/cxgbei.c
+++ b/sys/dev/cxgbe/cxgbei/cxgbei.c
@@ -94,11 +94,6 @@ __FBSDID("$FreeBSD$");
 #include "tom/t4_tom.h"
 #include "cxgbei.h"
 
-static int worker_thread_count;
-static struct cxgbei_worker_thread *cwt_rx_threads, *cwt_tx_threads;
-
-static void cwt_queue_for_rx(struct icl_cxgbei_conn *icc);
-
 static void
 read_pdu_limits(struct adapter *sc, uint32_t *max_tx_data_len,
     uint32_t *max_rx_data_len, struct ppod_region *pr)
@@ -424,7 +419,7 @@ parse_pdu(struct socket *so, struct toepcb *toep, struct icl_cxgbei_conn *icc,
 	return (ip);
 }
 
-static void
+void
 parse_pdus(struct icl_cxgbei_conn *icc, struct sockbuf *sb)
 {
 	struct icl_conn *ic = &icc->ic;
@@ -588,7 +583,7 @@ do_rx_iscsi_ddp(struct sge_iq *iq, const struct rss_header *rss, struct mbuf *m)
 	STAILQ_INSERT_TAIL(&icc->rcvd_pdus, ip, ip_next);
 	if (!icc->rx_active) {
 		icc->rx_active = true;
-		cwt_queue_for_rx(icc);
+		wakeup(&icc->rx_active);
 	}
 	SOCKBUF_UNLOCK(sb);
 	INP_WUNLOCK(inp);
@@ -831,7 +826,7 @@ do_rx_iscsi_cmp(struct sge_iq *iq, const struct rss_header *rss, struct mbuf *m)
 	STAILQ_INSERT_TAIL(&icc->rcvd_pdus, ip, ip_next);
 	if (!icc->rx_active) {
 		icc->rx_active = true;
-		cwt_queue_for_rx(icc);
+		wakeup(&icc->rx_active);
 	}
 	SOCKBUF_UNLOCK(sb);
 	INP_WUNLOCK(inp);
@@ -928,222 +923,6 @@ static struct uld_info cxgbei_uld_info = {
 	.deactivate = cxgbei_deactivate,
 };
 
-static void
-cwt_rx_main(void *arg)
-{
-	struct cxgbei_worker_thread *cwt = arg;
-	struct icl_cxgbei_conn *icc = NULL;
-	struct icl_conn *ic;
-	struct icl_pdu *ip;
-	struct sockbuf *sb;
-	STAILQ_HEAD(, icl_pdu) rx_pdus = STAILQ_HEAD_INITIALIZER(rx_pdus);
-
-	MPASS(cwt != NULL);
-
-	mtx_lock(&cwt->cwt_lock);
-	MPASS(cwt->cwt_state == 0);
-	cwt->cwt_state = CWT_RUNNING;
-	cv_signal(&cwt->cwt_cv);
-
-	while (__predict_true(cwt->cwt_state != CWT_STOP)) {
-		cwt->cwt_state = CWT_RUNNING;
-		while ((icc = TAILQ_FIRST(&cwt->icc_head)) != NULL) {
-			TAILQ_REMOVE(&cwt->icc_head, icc, rx_link);
-			mtx_unlock(&cwt->cwt_lock);
-
-			ic = &icc->ic;
-			sb = &ic->ic_socket->so_rcv;
-
-			SOCKBUF_LOCK(sb);
-			if (__predict_false(sbused(sb)) != 0) {
-				/*
-				 * PDUs were received before the tid
-				 * transitioned to ULP mode.  Convert
-				 * them to icl_cxgbei_pdus and insert
-				 * them into the head of rcvd_pdus.
-				 */
-				parse_pdus(icc, sb);
-			}
-			MPASS(icc->rx_active);
-			if (__predict_true(!(sb->sb_state & SBS_CANTRCVMORE))) {
-				MPASS(STAILQ_EMPTY(&rx_pdus));
-				STAILQ_SWAP(&icc->rcvd_pdus, &rx_pdus, icl_pdu);
-				SOCKBUF_UNLOCK(sb);
-
-				/* Hand over PDUs to ICL. */
-				while ((ip = STAILQ_FIRST(&rx_pdus)) != NULL) {
-					STAILQ_REMOVE_HEAD(&rx_pdus, ip_next);
-					ic->ic_receive(ip);
-				}
-
-				SOCKBUF_LOCK(sb);
-				MPASS(STAILQ_EMPTY(&rx_pdus));
-			}
-			MPASS(icc->rx_active);
-			if (STAILQ_EMPTY(&icc->rcvd_pdus) ||
-			    __predict_false(sb->sb_state & SBS_CANTRCVMORE)) {
-				icc->rx_active = false;
-				SOCKBUF_UNLOCK(sb);
-
-				mtx_lock(&cwt->cwt_lock);
-			} else {
-				SOCKBUF_UNLOCK(sb);
-
-				/*
-				 * More PDUs were received while we were busy
-				 * handing over the previous batch to ICL.
-				 * Re-add this connection to the end of the
-				 * queue.
-				 */
-				mtx_lock(&cwt->cwt_lock);
-				TAILQ_INSERT_TAIL(&cwt->icc_head, icc,
-				    rx_link);
-			}
-		}
-
-		/* Inner loop doesn't check for CWT_STOP, do that first. */
-		if (__predict_false(cwt->cwt_state == CWT_STOP))
-			break;
-		cwt->cwt_state = CWT_SLEEPING;
-		cv_wait(&cwt->cwt_cv, &cwt->cwt_lock);
-	}
-
-	MPASS(TAILQ_FIRST(&cwt->icc_head) == NULL);
-	mtx_unlock(&cwt->cwt_lock);
-	kthread_exit();
-}
-
-static void
-cwt_queue_for_rx(struct icl_cxgbei_conn *icc)
-{
-	struct cxgbei_worker_thread *cwt = &cwt_rx_threads[icc->cwt];
-
-	mtx_lock(&cwt->cwt_lock);
-	TAILQ_INSERT_TAIL(&cwt->icc_head, icc, rx_link);
-	if (cwt->cwt_state == CWT_SLEEPING) {
-		cwt->cwt_state = CWT_RUNNING;
-		cv_signal(&cwt->cwt_cv);
-	}
-	mtx_unlock(&cwt->cwt_lock);
-}
-
-void
-cwt_queue_for_tx(struct icl_cxgbei_conn *icc)
-{
-	struct cxgbei_worker_thread *cwt = &cwt_tx_threads[icc->cwt];
-
-	mtx_lock(&cwt->cwt_lock);
-	TAILQ_INSERT_TAIL(&cwt->icc_head, icc, tx_link);
-	if (cwt->cwt_state == CWT_SLEEPING) {
-		cwt->cwt_state = CWT_RUNNING;
-		cv_signal(&cwt->cwt_cv);
-	}
-	mtx_unlock(&cwt->cwt_lock);
-}
-
-static int
-start_worker_threads(void)
-{
-	struct proc *cxgbei_proc;
-	int i, rc;
-	struct cxgbei_worker_thread *cwt;
-
-	worker_thread_count = min(mp_ncpus, 32);
-	cwt_rx_threads = malloc(worker_thread_count * sizeof(*cwt), M_CXGBE,
-	    M_WAITOK | M_ZERO);
-	cwt_tx_threads = malloc(worker_thread_count * sizeof(*cwt), M_CXGBE,
-	    M_WAITOK | M_ZERO);
-
-	for (i = 0, cwt = &cwt_rx_threads[0]; i < worker_thread_count;
-	     i++, cwt++) {
-		mtx_init(&cwt->cwt_lock, "cwt lock", NULL, MTX_DEF);
-		cv_init(&cwt->cwt_cv, "cwt cv");
-		TAILQ_INIT(&cwt->icc_head);
-	}
-
-	for (i = 0, cwt = &cwt_tx_threads[0]; i < worker_thread_count;
-	     i++, cwt++) {
-		mtx_init(&cwt->cwt_lock, "cwt lock", NULL, MTX_DEF);
-		cv_init(&cwt->cwt_cv, "cwt cv");
-		TAILQ_INIT(&cwt->icc_head);
-	}
-
-	cxgbei_proc = NULL;
-	for (i = 0, cwt = &cwt_rx_threads[0]; i < worker_thread_count;
-	     i++, cwt++) {
-		rc = kproc_kthread_add(cwt_rx_main, cwt, &cxgbei_proc,
-		    &cwt->cwt_td, 0, 0, "cxgbei", "rx %d", i);
-		if (rc != 0) {
-			printf("cxgbei: failed to start rx thread #%d/%d (%d)\n",
-			    i + 1, worker_thread_count, rc);
-			return (rc);
-		}
-	}
-
-	for (i = 0, cwt = &cwt_tx_threads[0]; i < worker_thread_count;
-	     i++, cwt++) {
-		rc = kproc_kthread_add(cwt_tx_main, cwt, &cxgbei_proc,
-		    &cwt->cwt_td, 0, 0, "cxgbei", "tx %d", i);
-		if (rc != 0) {
-			printf("cxgbei: failed to start tx thread #%d/%d (%d)\n",
-			    i + 1, worker_thread_count, rc);
-			return (rc);
-		}
-	}
-
-	return (0);
-}
-
-static void
-stop_worker_threads1(struct cxgbei_worker_thread *threads)
-{
-	struct cxgbei_worker_thread *cwt;
-	int i;
-
-	for (i = 0, cwt = &threads[0]; i < worker_thread_count; i++, cwt++) {
-		mtx_lock(&cwt->cwt_lock);
-		if (cwt->cwt_td != NULL) {
-			MPASS(cwt->cwt_state == CWT_RUNNING ||
-			    cwt->cwt_state == CWT_SLEEPING);
-			cwt->cwt_state = CWT_STOP;
-			cv_signal(&cwt->cwt_cv);
-			mtx_sleep(cwt->cwt_td, &cwt->cwt_lock, 0, "cwtstop", 0);
-		}
-		mtx_unlock(&cwt->cwt_lock);
-		mtx_destroy(&cwt->cwt_lock);
-		cv_destroy(&cwt->cwt_cv);
-	}
-	free(threads, M_CXGBE);
-}
-
-static void
-stop_worker_threads(void)
-{
-
-	MPASS(worker_thread_count >= 0);
-	stop_worker_threads1(cwt_rx_threads);
-	stop_worker_threads1(cwt_tx_threads);
-}
-
-/* Select a worker thread for a connection. */
-u_int
-cxgbei_select_worker_thread(struct icl_cxgbei_conn *icc)
-{
-	struct adapter *sc = icc->sc;
-	struct toepcb *toep = icc->toep;
-	u_int i, n;
-
-	n = worker_thread_count / sc->sge.nofldrxq;
-	if (n > 0)
-		i = toep->vi->pi->port_id * n + arc4random() % n;
-	else
-		i = arc4random() % worker_thread_count;
-
-	CTR3(KTR_CXGBE, "%s: tid %u, cwt %u", __func__, toep->tid, i);
-
-	return (i);
-}
-
 static int
 cxgbei_mod_load(void)
 {
@@ -1154,15 +933,9 @@ cxgbei_mod_load(void)
 	t4_register_cpl_handler(CPL_RX_ISCSI_DDP, do_rx_iscsi_ddp);
 	t4_register_cpl_handler(CPL_RX_ISCSI_CMP, do_rx_iscsi_cmp);
 
-	rc = start_worker_threads();
-	if (rc != 0)
-		return (rc);
-
 	rc = t4_register_uld(&cxgbei_uld_info);
-	if (rc != 0) {
-		stop_worker_threads();
+	if (rc != 0)
 		return (rc);
-	}
 
 	t4_iterate(cxgbei_activate_all, NULL);
 
@@ -1178,8 +951,6 @@ cxgbei_mod_unload(void)
 	if (t4_unregister_uld(&cxgbei_uld_info) == EBUSY)
 		return (EBUSY);
 
-	stop_worker_threads();
-
 	t4_register_cpl_handler(CPL_ISCSI_HDR, NULL);
 	t4_register_cpl_handler(CPL_ISCSI_DATA, NULL);
 	t4_register_cpl_handler(CPL_RX_ISCSI_DDP, NULL);
diff --git a/sys/dev/cxgbe/cxgbei/cxgbei.h b/sys/dev/cxgbe/cxgbei/cxgbei.h
index b078f3110d62..d0e423ce5b2f 100644
--- a/sys/dev/cxgbe/cxgbei/cxgbei.h
+++ b/sys/dev/cxgbe/cxgbei/cxgbei.h
@@ -32,21 +32,6 @@
 
 #include <dev/iscsi/icl.h>
 
-enum {
-	CWT_SLEEPING	= 1,
-	CWT_RUNNING	= 2,
-	CWT_STOP	= 3,
-};
-
-struct cxgbei_worker_thread {
-	struct mtx	cwt_lock;
-	struct cv	cwt_cv;
-	volatile int	cwt_state;
-	struct thread	*cwt_td;
-
-	TAILQ_HEAD(, icl_cxgbei_conn) icc_head;
-} __aligned(CACHE_LINE_SIZE);
-
 #define CXGBEI_CONN_SIGNATURE 0x56788765
 
 struct cxgbei_cmp {
@@ -67,12 +52,12 @@ struct icl_cxgbei_conn {
 	int ulp_submode;
 	struct adapter *sc;
 	struct toepcb *toep;
-	u_int cwt;
 
 	/* Receive related. */
 	bool rx_active;				/* protected by so_rcv lock */
+	bool rx_exiting;			/* protected by so_rcv lock */
 	STAILQ_HEAD(, icl_pdu) rcvd_pdus;	/* protected by so_rcv lock */
-	TAILQ_ENTRY(icl_cxgbei_conn) rx_link;	/* protected by cwt lock */
+	struct thread *rx_thread;
 
 	struct cxgbei_cmp_head *cmp_table;	/* protected by cmp_lock */
 	struct mtx cmp_lock;
@@ -81,7 +66,7 @@ struct icl_cxgbei_conn {
 	/* Transmit related. */
 	bool tx_active;				/* protected by ic lock */
 	STAILQ_HEAD(, icl_pdu) sent_pdus;	/* protected by ic lock */
-	TAILQ_ENTRY(icl_cxgbei_conn) tx_link;	/* protected by cwt lock */
+	struct thread *tx_thread;
 };
 
 static inline struct icl_cxgbei_conn *
@@ -136,6 +121,7 @@ struct cxgbei_data {
 /* cxgbei.c */
 u_int cxgbei_select_worker_thread(struct icl_cxgbei_conn *);
 void cwt_queue_for_tx(struct icl_cxgbei_conn *);
+void parse_pdus(struct icl_cxgbei_conn *, struct sockbuf *);
 
 /* icl_cxgbei.c */
 void cwt_tx_main(void *);
diff --git a/sys/dev/cxgbe/cxgbei/icl_cxgbei.c b/sys/dev/cxgbe/cxgbei/icl_cxgbei.c
index 10d5430c6413..0d4f9b07ae2f 100644
--- a/sys/dev/cxgbe/cxgbei/icl_cxgbei.c
+++ b/sys/dev/cxgbe/cxgbei/icl_cxgbei.c
@@ -422,124 +422,133 @@ finalize_pdu(struct icl_cxgbei_conn *icc, struct icl_cxgbei_pdu *icp)
 }
 
 static void
-cwt_push_pdus(struct icl_cxgbei_conn *icc, struct socket *so, struct mbufq *mq)
+icl_cxgbei_tx_main(void *arg)
 {
 	struct epoch_tracker et;
+	struct icl_cxgbei_conn *icc = arg;
 	struct icl_conn *ic = &icc->ic;
 	struct toepcb *toep = icc->toep;
-	struct inpcb *inp;
-
-	/*
-	 * Do not get inp from toep->inp as the toepcb might have
-	 * detached already.
-	 */
-	inp = sotoinpcb(so);
-	CURVNET_SET(toep->vnet);
-	NET_EPOCH_ENTER(et);
-	INP_WLOCK(inp);
-
-	ICL_CONN_UNLOCK(ic);
-	if (__predict_false(inp->inp_flags & (INP_DROPPED | INP_TIMEWAIT)) ||
-	    __predict_false((toep->flags & TPF_ATTACHED) == 0)) {
-		mbufq_drain(mq);
-	} else {
-		mbufq_concat(&toep->ulp_pduq, mq);
-		t4_push_pdus(icc->sc, toep, 0);
-	}
-	INP_WUNLOCK(inp);
-	NET_EPOCH_EXIT(et);
-	CURVNET_RESTORE();
-
-	ICL_CONN_LOCK(ic);
-}
-
-void
-cwt_tx_main(void *arg)
-{
-	struct cxgbei_worker_thread *cwt = arg;
-	struct icl_cxgbei_conn *icc;
-	struct icl_conn *ic;
+	struct socket *so = ic->ic_socket;
+	struct inpcb *inp = sotoinpcb(so);
 	struct icl_pdu *ip;
-	struct socket *so;
 	struct mbuf *m;
 	struct mbufq mq;
 	STAILQ_HEAD(, icl_pdu) tx_pdus = STAILQ_HEAD_INITIALIZER(tx_pdus);
 
-	MPASS(cwt != NULL);
+	mbufq_init(&mq, INT_MAX);
 
-	mtx_lock(&cwt->cwt_lock);
-	MPASS(cwt->cwt_state == 0);
-	cwt->cwt_state = CWT_RUNNING;
-	cv_signal(&cwt->cwt_cv);
+	ICL_CONN_LOCK(ic);
+	while (__predict_true(!ic->ic_disconnecting)) {
+		while (STAILQ_EMPTY(&icc->sent_pdus)) {
+			icc->tx_active = false;
+			mtx_sleep(&icc->tx_active, ic->ic_lock, 0, "-", 0);
+			if (__predict_false(ic->ic_disconnecting))
+				goto out;
+			MPASS(icc->tx_active);
+		}
 
-	mbufq_init(&mq, INT_MAX);
-	while (__predict_true(cwt->cwt_state != CWT_STOP)) {
-		cwt->cwt_state = CWT_RUNNING;
-		while ((icc = TAILQ_FIRST(&cwt->icc_head)) != NULL) {
-			TAILQ_REMOVE(&cwt->icc_head, icc, tx_link);
-			mtx_unlock(&cwt->cwt_lock);
+		STAILQ_SWAP(&icc->sent_pdus, &tx_pdus, icl_pdu);
+		ICL_CONN_UNLOCK(ic);
 
-			ic = &icc->ic;
+		while ((ip = STAILQ_FIRST(&tx_pdus)) != NULL) {
+			STAILQ_REMOVE_HEAD(&tx_pdus, ip_next);
 
-			ICL_CONN_LOCK(ic);
-			MPASS(icc->tx_active);
-			STAILQ_SWAP(&icc->sent_pdus, &tx_pdus, icl_pdu);
-			ICL_CONN_UNLOCK(ic);
+			m = finalize_pdu(icc, ip_to_icp(ip));
+			M_ASSERTPKTHDR(m);
+			MPASS((m->m_pkthdr.len & 3) == 0);
 
-			while ((ip = STAILQ_FIRST(&tx_pdus)) != NULL) {
-				STAILQ_REMOVE_HEAD(&tx_pdus, ip_next);
+			mbufq_enqueue(&mq, m);
+		}
 
-				m = finalize_pdu(icc, ip_to_icp(ip));
-				M_ASSERTPKTHDR(m);
-				MPASS((m->m_pkthdr.len & 3) == 0);
+		ICL_CONN_LOCK(ic);
+		if (__predict_false(ic->ic_disconnecting) ||
+		    __predict_false(ic->ic_socket == NULL)) {
+			mbufq_drain(&mq);
+			break;
+		}
 
-				mbufq_enqueue(&mq, m);
-			}
+		CURVNET_SET(toep->vnet);
+		NET_EPOCH_ENTER(et);
+		INP_WLOCK(inp);
 
-			ICL_CONN_LOCK(ic);
-			so = ic->ic_socket;
-			if (__predict_false(ic->ic_disconnecting) ||
-			    __predict_false(so == NULL)) {
-				mbufq_drain(&mq);
-				icc->tx_active = false;
-				ICL_CONN_UNLOCK(ic);
+		ICL_CONN_UNLOCK(ic);
+		if (__predict_false(inp->inp_flags & (INP_DROPPED |
+		    INP_TIMEWAIT)) ||
+		    __predict_false((toep->flags & TPF_ATTACHED) == 0)) {
+			mbufq_drain(&mq);
+		} else {
+			mbufq_concat(&toep->ulp_pduq, &mq);
+			t4_push_pdus(icc->sc, toep, 0);
+		}
+		INP_WUNLOCK(inp);
+		NET_EPOCH_EXIT(et);
+		CURVNET_RESTORE();
 
-				mtx_lock(&cwt->cwt_lock);
-				continue;
-			}
+		ICL_CONN_LOCK(ic);
+	}
+out:
+	ICL_CONN_UNLOCK(ic);
 
-			cwt_push_pdus(icc, so, &mq);
+	kthread_exit();
+}
 
-			MPASS(icc->tx_active);
-			if (STAILQ_EMPTY(&icc->sent_pdus)) {
-				icc->tx_active = false;
-				ICL_CONN_UNLOCK(ic);
-
-				mtx_lock(&cwt->cwt_lock);
-			} else {
-				ICL_CONN_UNLOCK(ic);
-
-				/*
-				 * More PDUs were queued while we were
-				 * busy sending the previous batch.
-				 * Re-add this connection to the end
-				 * of the queue.
-				 */
-				mtx_lock(&cwt->cwt_lock);
-				TAILQ_INSERT_TAIL(&cwt->icc_head, icc,
-				    tx_link);
-			}
+static void
+icl_cxgbei_rx_main(void *arg)
+{
+	struct icl_cxgbei_conn *icc = arg;
+	struct icl_conn *ic = &icc->ic;
+	struct icl_pdu *ip;
+	struct sockbuf *sb;
+	STAILQ_HEAD(, icl_pdu) rx_pdus = STAILQ_HEAD_INITIALIZER(rx_pdus);
+	bool cantrcvmore;
+
+	sb = &ic->ic_socket->so_rcv;
+	SOCKBUF_LOCK(sb);
+	while (__predict_true(!ic->ic_disconnecting)) {
+		while (STAILQ_EMPTY(&icc->rcvd_pdus)) {
+			icc->rx_active = false;
+			mtx_sleep(&icc->rx_active, SOCKBUF_MTX(sb), 0, "-", 0);
+			if (__predict_false(ic->ic_disconnecting))
+				goto out;
+			MPASS(icc->rx_active);
 		}
 
-		/* Inner loop doesn't check for CWT_STOP, do that first. */
-		if (__predict_false(cwt->cwt_state == CWT_STOP))
-			break;
-		cwt->cwt_state = CWT_SLEEPING;
-		cv_wait(&cwt->cwt_cv, &cwt->cwt_lock);
+		if (__predict_false(sbused(sb)) != 0) {
+			/*
+			 * PDUs were received before the tid
+			 * transitioned to ULP mode.  Convert
+			 * them to icl_cxgbei_pdus and insert
+			 * them into the head of rcvd_pdus.
+			 */
+			parse_pdus(icc, sb);
+		}
+		cantrcvmore = (sb->sb_state & SBS_CANTRCVMORE) != 0;
+		MPASS(STAILQ_EMPTY(&rx_pdus));
+		STAILQ_SWAP(&icc->rcvd_pdus, &rx_pdus, icl_pdu);
+		SOCKBUF_UNLOCK(sb);
+
+		/* Hand over PDUs to ICL. */
+		while ((ip = STAILQ_FIRST(&rx_pdus)) != NULL) {
+			STAILQ_REMOVE_HEAD(&rx_pdus, ip_next);
+			if (cantrcvmore)
+				icl_cxgbei_pdu_done(ip, ENOTCONN);
+			else
+				ic->ic_receive(ip);
+		}
+
+		SOCKBUF_LOCK(sb);
 	}
+out:
+	/*
+	 * Since ic_disconnecting is set before the SOCKBUF_MTX is
+	 * locked in icl_cxgbei_conn_close, the loop above can exit
+	 * before icl_cxgbei_conn_close can lock SOCKBUF_MTX and block
+	 * waiting for the thread exit.
+	 */
+	while (!icc->rx_exiting)
+		mtx_sleep(&icc->rx_active, SOCKBUF_MTX(sb), 0, "-", 0);
+	SOCKBUF_UNLOCK(sb);
 
-	MPASS(TAILQ_FIRST(&cwt->icc_head) == NULL);
-	mtx_unlock(&cwt->cwt_lock);
 	kthread_exit();
 }
 
@@ -678,7 +687,7 @@ icl_cxgbei_conn_pdu_queue_cb(struct icl_conn *ic, struct icl_pdu *ip,
 	STAILQ_INSERT_TAIL(&icc->sent_pdus, ip, ip_next);
 	if (!icc->tx_active) {
 		icc->tx_active = true;
-		cwt_queue_for_tx(icc);
+		wakeup(&icc->tx_active);
 	}
 }
 
@@ -740,10 +749,8 @@ icl_cxgbei_setsockopt(struct icl_conn *ic, struct socket *so, int sspace,
 	rs = max(recvspace, rspace);
 
 	error = soreserve(so, ss, rs);
-	if (error != 0) {
-		icl_cxgbei_conn_close(ic);
+	if (error != 0)
 		return (error);
-	}
 	SOCKBUF_LOCK(&so->so_snd);
 	so->so_snd.sb_flags |= SB_AUTOSIZE;
 	SOCKBUF_UNLOCK(&so->so_snd);
@@ -761,10 +768,8 @@ icl_cxgbei_setsockopt(struct icl_conn *ic, struct socket *so, int sspace,
 	opt.sopt_val = &one;
 	opt.sopt_valsize = sizeof(one);
 	error = sosetopt(so, &opt);
-	if (error != 0) {
-		icl_cxgbei_conn_close(ic);
+	if (error != 0)
 		return (error);
-	}
 
 	return (0);
 }
@@ -934,8 +939,10 @@ icl_cxgbei_conn_handoff(struct icl_conn *ic, int fd)
 	fa.sc = NULL;
 	fa.so = so;
 	t4_iterate(find_offload_adapter, &fa);
-	if (fa.sc == NULL)
-		return (EINVAL);
+	if (fa.sc == NULL) {
+		error = EINVAL;
+		goto out;
+	}
 	icc->sc = fa.sc;
 
 	max_rx_pdu_len = ISCSI_BHS_SIZE + ic->ic_max_recv_data_segment_length;
@@ -954,7 +961,8 @@ icl_cxgbei_conn_handoff(struct icl_conn *ic, int fd)
 	tp = intotcpcb(inp);
 	if (inp->inp_flags & (INP_DROPPED | INP_TIMEWAIT)) {
 		INP_WUNLOCK(inp);
-		return (EBUSY);
+		error = ENOTCONN;
+		goto out;
 	}
 
 	/*
@@ -968,11 +976,11 @@ icl_cxgbei_conn_handoff(struct icl_conn *ic, int fd)
 
 	if (ulp_mode(toep) != ULP_MODE_NONE) {
 		INP_WUNLOCK(inp);
-		return (EINVAL);
+		error = EINVAL;
+		goto out;
 	}
 
 	icc->toep = toep;
-	icc->cwt = cxgbei_select_worker_thread(icc);
 
 	icc->ulp_submode = 0;
 	if (ic->ic_header_crc32c)
@@ -996,7 +1004,21 @@ icl_cxgbei_conn_handoff(struct icl_conn *ic, int fd)
 	set_ulp_mode_iscsi(icc->sc, toep, icc->ulp_submode);
 	INP_WUNLOCK(inp);
 
-	return (icl_cxgbei_setsockopt(ic, so, max_tx_pdu_len, max_rx_pdu_len));
+	error = kthread_add(icl_cxgbei_tx_main, icc, NULL, &icc->tx_thread, 0,
+	    0, "%stx (cxgbei)", ic->ic_name);
+	if (error != 0)
+		goto out;
+
+	error = kthread_add(icl_cxgbei_rx_main, icc, NULL, &icc->rx_thread, 0,
+	    0, "%srx (cxgbei)", ic->ic_name);
+	if (error != 0)
+		goto out;
+
+	error = icl_cxgbei_setsockopt(ic, so, max_tx_pdu_len, max_rx_pdu_len);
+out:
+	if (error != 0)
+		icl_cxgbei_conn_close(ic);
+	return (error);
 }
 
 void
@@ -1027,60 +1049,58 @@ icl_cxgbei_conn_close(struct icl_conn *ic)
 	    ("destroying session with %d outstanding PDUs",
 	     ic->ic_outstanding_pdus));
 #endif
-	ICL_CONN_UNLOCK(ic);
 
 	CTR3(KTR_CXGBE, "%s: tid %d, icc %p", __func__, toep ? toep->tid : -1,
 	    icc);
+
+	/*
+	 * Wait for the transmit thread to stop processing
+	 * this connection.
+	 */
+	if (icc->tx_thread != NULL) {
+		wakeup(&icc->tx_active);
+		mtx_sleep(icc->tx_thread, ic->ic_lock, 0, "conclo", 0);
+	}
+
+	/* Discard PDUs queued for TX. */
+	while (!STAILQ_EMPTY(&icc->sent_pdus)) {
+		ip = STAILQ_FIRST(&icc->sent_pdus);
+		STAILQ_REMOVE_HEAD(&icc->sent_pdus, ip_next);
+		icl_cxgbei_pdu_done(ip, ENOTCONN);
+	}
+	ICL_CONN_UNLOCK(ic);
+
 	inp = sotoinpcb(so);
 	sb = &so->so_rcv;
+
+	/*
+	 * Wait for the receive thread to stop processing this
+	 * connection.
+	 */
+	SOCKBUF_LOCK(sb);
+	if (icc->rx_thread != NULL) {
+		icc->rx_exiting = true;
+		wakeup(&icc->rx_active);
+		mtx_sleep(icc->rx_thread, SOCKBUF_MTX(sb), 0, "conclo", 0);
+	}
+
+	/*
+	 * Discard received PDUs not passed to the iSCSI layer.
+	 */
+	while (!STAILQ_EMPTY(&icc->rcvd_pdus)) {
+		ip = STAILQ_FIRST(&icc->rcvd_pdus);
+		STAILQ_REMOVE_HEAD(&icc->rcvd_pdus, ip_next);
+		icl_cxgbei_pdu_done(ip, ENOTCONN);
+	}
+	SOCKBUF_UNLOCK(sb);
+
 	INP_WLOCK(inp);
 	if (toep != NULL) {	/* NULL if connection was never offloaded. */
 		toep->ulpcb = NULL;
 
-		/*
-		 * Wait for the cwt threads to stop processing this
-		 * connection for transmit.
-		 */
-		while (icc->tx_active)
-			rw_sleep(inp, &inp->inp_lock, 0, "conclo", 1);
-
-		/* Discard PDUs queued for TX. */
-		while (!STAILQ_EMPTY(&icc->sent_pdus)) {
-			ip = STAILQ_FIRST(&icc->sent_pdus);
-			STAILQ_REMOVE_HEAD(&icc->sent_pdus, ip_next);
-			icl_cxgbei_pdu_done(ip, ENOTCONN);
-		}
+		/* Discard mbufs queued for TX. */
 		mbufq_drain(&toep->ulp_pduq);
 
-		/*
-		 * Wait for the cwt threads to stop processing this
-		 * connection for receive.
-		 */
-		SOCKBUF_LOCK(sb);
-		if (icc->rx_active) {
-			volatile bool *p = &icc->rx_active;
-
-			SOCKBUF_UNLOCK(sb);
-			INP_WUNLOCK(inp);
-
-			while (*p)
-				pause("conclo", 1);
-
-			INP_WLOCK(inp);
-			SOCKBUF_LOCK(sb);
-		}
-
-		/*
-		 * Discard received PDUs not passed to the iSCSI
-		 * layer.
-		 */
-		while (!STAILQ_EMPTY(&icc->rcvd_pdus)) {
-			ip = STAILQ_FIRST(&icc->rcvd_pdus);
-			STAILQ_REMOVE_HEAD(&icc->rcvd_pdus, ip_next);
-			icl_cxgbei_pdu_done(ip, ENOTCONN);
-		}
-		SOCKBUF_UNLOCK(sb);
-
 		/*
 		 * Grab a reference to use when waiting for the final
 		 * CPL to be received.  If toep->inp is NULL, then



Want to link to this message? Use this URL: <https://mail-archive.FreeBSD.org/cgi/mid.cgi?202204292312.23TNCjlb045071>