Skip site navigation (1)Skip section navigation (2)
Date:      Sun, 22 Jun 2014 18:03:53 +0000 (UTC)
From:      Alexander Motin <mav@FreeBSD.org>
To:        src-committers@freebsd.org, svn-src-all@freebsd.org, svn-src-stable@freebsd.org, svn-src-stable-10@freebsd.org
Subject:   svn commit: r267742 - stable/10/sys/rpc
Message-ID:  <201406221803.s5MI3r93082444@svn.freebsd.org>

next in thread | raw e-mail | index | archive | help
Author: mav
Date: Sun Jun 22 18:03:53 2014
New Revision: 267742
URL: http://svnweb.freebsd.org/changeset/base/267742

Log:
  MFC r267228:
  Split RPC pool threads into a number of smaller semi-isolated groups.
  
  The old design with a unified thread pool was good from the point of view
  of thread utilization.  But the single pool-wide mutex became a huge
  congestion point on systems with many CPUs.  To reduce the congestion,
  create several thread groups within a pool (one group for every 6 CPUs
  and 12 threads), each group with its own mutex.  Each connection is
  assigned to one of the groups in round-robin fashion during its
  registration.  File affinity code may still move requests between the
  groups, but otherwise groups are self-contained.

Modified:
  stable/10/sys/rpc/svc.c
  stable/10/sys/rpc/svc.h
  stable/10/sys/rpc/svc_generic.c
Directory Properties:
  stable/10/   (props changed)

Modified: stable/10/sys/rpc/svc.c
==============================================================================
--- stable/10/sys/rpc/svc.c	Sun Jun 22 18:02:39 2014	(r267741)
+++ stable/10/sys/rpc/svc.c	Sun Jun 22 18:03:53 2014	(r267742)
@@ -56,6 +56,7 @@ __FBSDID("$FreeBSD$");
 #include <sys/queue.h>
 #include <sys/socketvar.h>
 #include <sys/systm.h>
+#include <sys/smp.h>
 #include <sys/sx.h>
 #include <sys/ucred.h>
 
@@ -70,7 +71,7 @@ __FBSDID("$FreeBSD$");
 
 static struct svc_callout *svc_find(SVCPOOL *pool, rpcprog_t, rpcvers_t,
     char *);
-static void svc_new_thread(SVCPOOL *pool);
+static void svc_new_thread(SVCGROUP *grp);
 static void xprt_unregister_locked(SVCXPRT *xprt);
 static void svc_change_space_used(SVCPOOL *pool, int delta);
 static bool_t svc_request_space_available(SVCPOOL *pool);
@@ -79,11 +80,14 @@ static bool_t svc_request_space_availabl
 
 static int svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS);
 static int svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS);
+static int svcpool_threads_sysctl(SYSCTL_HANDLER_ARGS);
 
 SVCPOOL*
 svcpool_create(const char *name, struct sysctl_oid_list *sysctl_base)
 {
 	SVCPOOL *pool;
+	SVCGROUP *grp;
+	int g;
 
 	pool = malloc(sizeof(SVCPOOL), M_RPC, M_WAITOK|M_ZERO);
 	
@@ -91,15 +95,22 @@ svcpool_create(const char *name, struct 
 	pool->sp_name = name;
 	pool->sp_state = SVCPOOL_INIT;
 	pool->sp_proc = NULL;
-	TAILQ_INIT(&pool->sp_xlist);
-	TAILQ_INIT(&pool->sp_active);
 	TAILQ_INIT(&pool->sp_callouts);
 	TAILQ_INIT(&pool->sp_lcallouts);
-	LIST_INIT(&pool->sp_threads);
-	LIST_INIT(&pool->sp_idlethreads);
 	pool->sp_minthreads = 1;
 	pool->sp_maxthreads = 1;
-	pool->sp_threadcount = 0;
+	pool->sp_groupcount = 1;
+	for (g = 0; g < SVC_MAXGROUPS; g++) {
+		grp = &pool->sp_groups[g];
+		mtx_init(&grp->sg_lock, "sg_lock", NULL, MTX_DEF);
+		grp->sg_pool = pool;
+		grp->sg_state = SVCPOOL_ACTIVE;
+		TAILQ_INIT(&grp->sg_xlist);
+		TAILQ_INIT(&grp->sg_active);
+		LIST_INIT(&grp->sg_idlethreads);
+		grp->sg_minthreads = 1;
+		grp->sg_maxthreads = 1;
+	}
 
 	/*
 	 * Don't use more than a quarter of mbuf clusters or more than
@@ -114,12 +125,19 @@ svcpool_create(const char *name, struct 
 	if (sysctl_base) {
 		SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
 		    "minthreads", CTLTYPE_INT | CTLFLAG_RW,
-		    pool, 0, svcpool_minthread_sysctl, "I", "");
+		    pool, 0, svcpool_minthread_sysctl, "I",
+		    "Minimal number of threads");
 		SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
 		    "maxthreads", CTLTYPE_INT | CTLFLAG_RW,
-		    pool, 0, svcpool_maxthread_sysctl, "I", "");
+		    pool, 0, svcpool_maxthread_sysctl, "I",
+		    "Maximal number of threads");
+		SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
+		    "threads", CTLTYPE_INT | CTLFLAG_RD,
+		    pool, 0, svcpool_threads_sysctl, "I",
+		    "Current number of threads");
 		SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
-		    "threads", CTLFLAG_RD, &pool->sp_threadcount, 0, "");
+		    "groups", CTLFLAG_RD, &pool->sp_groupcount, 0,
+		    "Number of thread groups");
 
 		SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
 		    "request_space_used", CTLFLAG_RD,
@@ -158,20 +176,29 @@ svcpool_create(const char *name, struct 
 void
 svcpool_destroy(SVCPOOL *pool)
 {
+	SVCGROUP *grp;
 	SVCXPRT *xprt, *nxprt;
 	struct svc_callout *s;
 	struct svc_loss_callout *sl;
 	struct svcxprt_list cleanup;
+	int g;
 
 	TAILQ_INIT(&cleanup);
-	mtx_lock(&pool->sp_lock);
 
-	while (TAILQ_FIRST(&pool->sp_xlist)) {
-		xprt = TAILQ_FIRST(&pool->sp_xlist);
-		xprt_unregister_locked(xprt);
-		TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link);
+	for (g = 0; g < SVC_MAXGROUPS; g++) {
+		grp = &pool->sp_groups[g];
+		mtx_lock(&grp->sg_lock);
+		while ((xprt = TAILQ_FIRST(&grp->sg_xlist)) != NULL) {
+			xprt_unregister_locked(xprt);
+			TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link);
+		}
+		mtx_unlock(&grp->sg_lock);
+	}
+	TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) {
+		SVC_RELEASE(xprt);
 	}
 
+	mtx_lock(&pool->sp_lock);
 	while ((s = TAILQ_FIRST(&pool->sp_callouts)) != NULL) {
 		mtx_unlock(&pool->sp_lock);
 		svc_unreg(pool, s->sc_prog, s->sc_vers);
@@ -184,10 +211,10 @@ svcpool_destroy(SVCPOOL *pool)
 	}
 	mtx_unlock(&pool->sp_lock);
 
-	TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) {
-		SVC_RELEASE(xprt);
+	for (g = 0; g < SVC_MAXGROUPS; g++) {
+		grp = &pool->sp_groups[g];
+		mtx_destroy(&grp->sg_lock);
 	}
-
 	mtx_destroy(&pool->sp_lock);
 
 	if (pool->sp_rcache)
@@ -197,14 +224,23 @@ svcpool_destroy(SVCPOOL *pool)
 	free(pool, M_RPC);
 }
 
-static bool_t
-svcpool_active(SVCPOOL *pool)
+/*
+ * Sysctl handler to get the present thread count on a pool
+ */
+static int
+svcpool_threads_sysctl(SYSCTL_HANDLER_ARGS)
 {
-	enum svcpool_state state = pool->sp_state;
+	SVCPOOL *pool;
+	int threads, error, g;
 
-	if (state == SVCPOOL_INIT || state == SVCPOOL_CLOSING)
-		return (FALSE);
-	return (TRUE);
+	pool = oidp->oid_arg1;
+	threads = 0;
+	mtx_lock(&pool->sp_lock);
+	for (g = 0; g < pool->sp_groupcount; g++)
+		threads += pool->sp_groups[g].sg_threadcount;
+	mtx_unlock(&pool->sp_lock);
+	error = sysctl_handle_int(oidp, &threads, 0, req);
+	return (error);
 }
 
 /*
@@ -214,7 +250,7 @@ static int
 svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS)
 {
 	SVCPOOL *pool;
-	int newminthreads, error, n;
+	int newminthreads, error, g;
 
 	pool = oidp->oid_arg1;
 	newminthreads = pool->sp_minthreads;
@@ -223,21 +259,11 @@ svcpool_minthread_sysctl(SYSCTL_HANDLER_
 		if (newminthreads > pool->sp_maxthreads)
 			return (EINVAL);
 		mtx_lock(&pool->sp_lock);
-		if (newminthreads > pool->sp_minthreads
-		    && svcpool_active(pool)) {
-			/*
-			 * If the pool is running and we are
-			 * increasing, create some more threads now.
-			 */
-			n = newminthreads - pool->sp_threadcount;
-			if (n > 0) {
-				mtx_unlock(&pool->sp_lock);
-				while (n--)
-					svc_new_thread(pool);
-				mtx_lock(&pool->sp_lock);
-			}
-		}
 		pool->sp_minthreads = newminthreads;
+		for (g = 0; g < pool->sp_groupcount; g++) {
+			pool->sp_groups[g].sg_minthreads = max(1,
+			    pool->sp_minthreads / pool->sp_groupcount);
+		}
 		mtx_unlock(&pool->sp_lock);
 	}
 	return (error);
@@ -250,8 +276,7 @@ static int
 svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS)
 {
 	SVCPOOL *pool;
-	SVCTHREAD *st;
-	int newmaxthreads, error;
+	int newmaxthreads, error, g;
 
 	pool = oidp->oid_arg1;
 	newmaxthreads = pool->sp_maxthreads;
@@ -260,17 +285,11 @@ svcpool_maxthread_sysctl(SYSCTL_HANDLER_
 		if (newmaxthreads < pool->sp_minthreads)
 			return (EINVAL);
 		mtx_lock(&pool->sp_lock);
-		if (newmaxthreads < pool->sp_maxthreads
-		    && svcpool_active(pool)) {
-			/*
-			 * If the pool is running and we are
-			 * decreasing, wake up some idle threads to
-			 * encourage them to exit.
-			 */
-			LIST_FOREACH(st, &pool->sp_idlethreads, st_ilink)
-				cv_signal(&st->st_cond);
-		}
 		pool->sp_maxthreads = newmaxthreads;
+		for (g = 0; g < pool->sp_groupcount; g++) {
+			pool->sp_groups[g].sg_maxthreads = max(1,
+			    pool->sp_maxthreads / pool->sp_groupcount);
+		}
 		mtx_unlock(&pool->sp_lock);
 	}
 	return (error);
@@ -283,13 +302,17 @@ void
 xprt_register(SVCXPRT *xprt)
 {
 	SVCPOOL *pool = xprt->xp_pool;
+	SVCGROUP *grp;
+	int g;
 
 	SVC_ACQUIRE(xprt);
-	mtx_lock(&pool->sp_lock);
+	g = atomic_fetchadd_int(&pool->sp_nextgroup, 1) % pool->sp_groupcount;
+	xprt->xp_group = grp = &pool->sp_groups[g];
+	mtx_lock(&grp->sg_lock);
 	xprt->xp_registered = TRUE;
 	xprt->xp_active = FALSE;
-	TAILQ_INSERT_TAIL(&pool->sp_xlist, xprt, xp_link);
-	mtx_unlock(&pool->sp_lock);
+	TAILQ_INSERT_TAIL(&grp->sg_xlist, xprt, xp_link);
+	mtx_unlock(&grp->sg_lock);
 }
 
 /*
@@ -300,29 +323,29 @@ xprt_register(SVCXPRT *xprt)
 static void
 xprt_unregister_locked(SVCXPRT *xprt)
 {
-	SVCPOOL *pool = xprt->xp_pool;
+	SVCGROUP *grp = xprt->xp_group;
 
-	mtx_assert(&pool->sp_lock, MA_OWNED);
+	mtx_assert(&grp->sg_lock, MA_OWNED);
 	KASSERT(xprt->xp_registered == TRUE,
 	    ("xprt_unregister_locked: not registered"));
 	xprt_inactive_locked(xprt);
-	TAILQ_REMOVE(&pool->sp_xlist, xprt, xp_link);
+	TAILQ_REMOVE(&grp->sg_xlist, xprt, xp_link);
 	xprt->xp_registered = FALSE;
 }
 
 void
 xprt_unregister(SVCXPRT *xprt)
 {
-	SVCPOOL *pool = xprt->xp_pool;
+	SVCGROUP *grp = xprt->xp_group;
 
-	mtx_lock(&pool->sp_lock);
+	mtx_lock(&grp->sg_lock);
 	if (xprt->xp_registered == FALSE) {
 		/* Already unregistered by another thread */
-		mtx_unlock(&pool->sp_lock);
+		mtx_unlock(&grp->sg_lock);
 		return;
 	}
 	xprt_unregister_locked(xprt);
-	mtx_unlock(&pool->sp_lock);
+	mtx_unlock(&grp->sg_lock);
 
 	SVC_RELEASE(xprt);
 }
@@ -333,11 +356,11 @@ xprt_unregister(SVCXPRT *xprt)
 static int
 xprt_assignthread(SVCXPRT *xprt)
 {
-	SVCPOOL *pool = xprt->xp_pool;
+	SVCGROUP *grp = xprt->xp_group;
 	SVCTHREAD *st;
 
-	mtx_assert(&pool->sp_lock, MA_OWNED);
-	st = LIST_FIRST(&pool->sp_idlethreads);
+	mtx_assert(&grp->sg_lock, MA_OWNED);
+	st = LIST_FIRST(&grp->sg_idlethreads);
 	if (st) {
 		LIST_REMOVE(st, st_ilink);
 		SVC_ACQUIRE(xprt);
@@ -354,10 +377,10 @@ xprt_assignthread(SVCXPRT *xprt)
 		 * from a socket upcall). Don't create more
 		 * than one thread per second.
 		 */
-		if (pool->sp_state == SVCPOOL_ACTIVE
-		    && pool->sp_lastcreatetime < time_uptime
-		    && pool->sp_threadcount < pool->sp_maxthreads) {
-			pool->sp_state = SVCPOOL_THREADWANTED;
+		if (grp->sg_state == SVCPOOL_ACTIVE
+		    && grp->sg_lastcreatetime < time_uptime
+		    && grp->sg_threadcount < grp->sg_maxthreads) {
+			grp->sg_state = SVCPOOL_THREADWANTED;
 		}
 	}
 	return (FALSE);
@@ -366,40 +389,40 @@ xprt_assignthread(SVCXPRT *xprt)
 void
 xprt_active(SVCXPRT *xprt)
 {
-	SVCPOOL *pool = xprt->xp_pool;
+	SVCGROUP *grp = xprt->xp_group;
 
-	mtx_lock(&pool->sp_lock);
+	mtx_lock(&grp->sg_lock);
 
 	if (!xprt->xp_registered) {
 		/*
 		 * Race with xprt_unregister - we lose.
 		 */
-		mtx_unlock(&pool->sp_lock);
+		mtx_unlock(&grp->sg_lock);
 		return;
 	}
 
 	if (!xprt->xp_active) {
 		xprt->xp_active = TRUE;
 		if (xprt->xp_thread == NULL) {
-			if (!svc_request_space_available(pool) ||
+			if (!svc_request_space_available(xprt->xp_pool) ||
 			    !xprt_assignthread(xprt))
-				TAILQ_INSERT_TAIL(&pool->sp_active, xprt,
+				TAILQ_INSERT_TAIL(&grp->sg_active, xprt,
 				    xp_alink);
 		}
 	}
 
-	mtx_unlock(&pool->sp_lock);
+	mtx_unlock(&grp->sg_lock);
 }
 
 void
 xprt_inactive_locked(SVCXPRT *xprt)
 {
-	SVCPOOL *pool = xprt->xp_pool;
+	SVCGROUP *grp = xprt->xp_group;
 
-	mtx_assert(&pool->sp_lock, MA_OWNED);
+	mtx_assert(&grp->sg_lock, MA_OWNED);
 	if (xprt->xp_active) {
 		if (xprt->xp_thread == NULL)
-			TAILQ_REMOVE(&pool->sp_active, xprt, xp_alink);
+			TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink);
 		xprt->xp_active = FALSE;
 	}
 }
@@ -407,11 +430,11 @@ xprt_inactive_locked(SVCXPRT *xprt)
 void
 xprt_inactive(SVCXPRT *xprt)
 {
-	SVCPOOL *pool = xprt->xp_pool;
+	SVCGROUP *grp = xprt->xp_group;
 
-	mtx_lock(&pool->sp_lock);
+	mtx_lock(&grp->sg_lock);
 	xprt_inactive_locked(xprt);
-	mtx_unlock(&pool->sp_lock);
+	mtx_unlock(&grp->sg_lock);
 }
 
 /*
@@ -991,14 +1014,14 @@ svc_executereq(struct svc_req *rqstp)
 }
 
 static void
-svc_checkidle(SVCPOOL *pool)
+svc_checkidle(SVCGROUP *grp)
 {
 	SVCXPRT *xprt, *nxprt;
 	time_t timo;
 	struct svcxprt_list cleanup;
 
 	TAILQ_INIT(&cleanup);
-	TAILQ_FOREACH_SAFE(xprt, &pool->sp_xlist, xp_link, nxprt) {
+	TAILQ_FOREACH_SAFE(xprt, &grp->sg_xlist, xp_link, nxprt) {
 		/*
 		 * Only some transports have idle timers. Don't time
 		 * something out which is just waking up.
@@ -1013,27 +1036,31 @@ svc_checkidle(SVCPOOL *pool)
 		}
 	}
 
-	mtx_unlock(&pool->sp_lock);
+	mtx_unlock(&grp->sg_lock);
 	TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) {
 		SVC_RELEASE(xprt);
 	}
-	mtx_lock(&pool->sp_lock);
-
+	mtx_lock(&grp->sg_lock);
 }
 
 static void
 svc_assign_waiting_sockets(SVCPOOL *pool)
 {
+	SVCGROUP *grp;
 	SVCXPRT *xprt;
+	int g;
 
-	mtx_lock(&pool->sp_lock);
-	while ((xprt = TAILQ_FIRST(&pool->sp_active)) != NULL) {
-		if (xprt_assignthread(xprt))
-			TAILQ_REMOVE(&pool->sp_active, xprt, xp_alink);
-		else
-			break;
+	for (g = 0; g < pool->sp_groupcount; g++) {
+		grp = &pool->sp_groups[g];
+		mtx_lock(&grp->sg_lock);
+		while ((xprt = TAILQ_FIRST(&grp->sg_active)) != NULL) {
+			if (xprt_assignthread(xprt))
+				TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink);
+			else
+				break;
+		}
+		mtx_unlock(&grp->sg_lock);
 	}
-	mtx_unlock(&pool->sp_lock);
 }
 
 static void
@@ -1067,8 +1094,9 @@ svc_request_space_available(SVCPOOL *poo
 }
 
 static void
-svc_run_internal(SVCPOOL *pool, bool_t ismaster)
+svc_run_internal(SVCGROUP *grp, bool_t ismaster)
 {
+	SVCPOOL *pool = grp->sg_pool;
 	SVCTHREAD *st, *stpref;
 	SVCXPRT *xprt;
 	enum xprt_stat stat;
@@ -1083,35 +1111,34 @@ svc_run_internal(SVCPOOL *pool, bool_t i
 	STAILQ_INIT(&st->st_reqs);
 	cv_init(&st->st_cond, "rpcsvc");
 
-	mtx_lock(&pool->sp_lock);
-	LIST_INSERT_HEAD(&pool->sp_threads, st, st_link);
+	mtx_lock(&grp->sg_lock);
 
 	/*
 	 * If we are a new thread which was spawned to cope with
 	 * increased load, set the state back to SVCPOOL_ACTIVE.
 	 */
-	if (pool->sp_state == SVCPOOL_THREADSTARTING)
-		pool->sp_state = SVCPOOL_ACTIVE;
+	if (grp->sg_state == SVCPOOL_THREADSTARTING)
+		grp->sg_state = SVCPOOL_ACTIVE;
 
-	while (pool->sp_state != SVCPOOL_CLOSING) {
+	while (grp->sg_state != SVCPOOL_CLOSING) {
 		/*
 		 * Create new thread if requested.
 		 */
-		if (pool->sp_state == SVCPOOL_THREADWANTED) {
-			pool->sp_state = SVCPOOL_THREADSTARTING;
-			pool->sp_lastcreatetime = time_uptime;
-			mtx_unlock(&pool->sp_lock);
-			svc_new_thread(pool);
-			mtx_lock(&pool->sp_lock);
+		if (grp->sg_state == SVCPOOL_THREADWANTED) {
+			grp->sg_state = SVCPOOL_THREADSTARTING;
+			grp->sg_lastcreatetime = time_uptime;
+			mtx_unlock(&grp->sg_lock);
+			svc_new_thread(grp);
+			mtx_lock(&grp->sg_lock);
 			continue;
 		}
 
 		/*
 		 * Check for idle transports once per second.
 		 */
-		if (time_uptime > pool->sp_lastidlecheck) {
-			pool->sp_lastidlecheck = time_uptime;
-			svc_checkidle(pool);
+		if (time_uptime > grp->sg_lastidlecheck) {
+			grp->sg_lastidlecheck = time_uptime;
+			svc_checkidle(grp);
 		}
 
 		xprt = st->st_xprt;
@@ -1119,7 +1146,7 @@ svc_run_internal(SVCPOOL *pool, bool_t i
 			/*
 			 * Enforce maxthreads count.
 			 */
-			if (pool->sp_threadcount > pool->sp_maxthreads)
+			if (grp->sg_threadcount > grp->sg_maxthreads)
 				break;
 
 			/*
@@ -1128,22 +1155,22 @@ svc_run_internal(SVCPOOL *pool, bool_t i
 			 * by a thread.
 			 */
 			if (svc_request_space_available(pool) &&
-			    (xprt = TAILQ_FIRST(&pool->sp_active)) != NULL) {
-				TAILQ_REMOVE(&pool->sp_active, xprt, xp_alink);
+			    (xprt = TAILQ_FIRST(&grp->sg_active)) != NULL) {
+				TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink);
 				SVC_ACQUIRE(xprt);
 				xprt->xp_thread = st;
 				st->st_xprt = xprt;
 				continue;
 			}
 
-			LIST_INSERT_HEAD(&pool->sp_idlethreads, st, st_ilink);
+			LIST_INSERT_HEAD(&grp->sg_idlethreads, st, st_ilink);
 			if (ismaster || (!ismaster &&
-			    pool->sp_threadcount > pool->sp_minthreads))
+			    grp->sg_threadcount > grp->sg_minthreads))
 				error = cv_timedwait_sig(&st->st_cond,
-				    &pool->sp_lock, 5 * hz);
+				    &grp->sg_lock, 5 * hz);
 			else
 				error = cv_wait_sig(&st->st_cond,
-				    &pool->sp_lock);
+				    &grp->sg_lock);
 			if (st->st_xprt == NULL)
 				LIST_REMOVE(st, st_ilink);
 
@@ -1152,19 +1179,19 @@ svc_run_internal(SVCPOOL *pool, bool_t i
 			 */
 			if (error == EWOULDBLOCK) {
 				if (!ismaster
-				    && (pool->sp_threadcount
-					> pool->sp_minthreads)
+				    && (grp->sg_threadcount
+					> grp->sg_minthreads)
 					&& !st->st_xprt)
 					break;
 			} else if (error) {
-				mtx_unlock(&pool->sp_lock);
+				mtx_unlock(&grp->sg_lock);
 				svc_exit(pool);
-				mtx_lock(&pool->sp_lock);
+				mtx_lock(&grp->sg_lock);
 				break;
 			}
 			continue;
 		}
-		mtx_unlock(&pool->sp_lock);
+		mtx_unlock(&grp->sg_lock);
 
 		/*
 		 * Drain the transport socket and queue up any RPCs.
@@ -1196,7 +1223,7 @@ svc_run_internal(SVCPOOL *pool, bool_t i
 				}
 			}
 		} while (rqstp == NULL && stat == XPRT_MOREREQS
-		    && pool->sp_state != SVCPOOL_CLOSING);
+		    && grp->sg_state != SVCPOOL_CLOSING);
 
 		/*
 		 * Move this transport to the end of the active list to
@@ -1204,16 +1231,16 @@ svc_run_internal(SVCPOOL *pool, bool_t i
 		 * If this was the last queued request, svc_getreq will end
 		 * up calling xprt_inactive to remove from the active list.
 		 */
-		mtx_lock(&pool->sp_lock);
+		mtx_lock(&grp->sg_lock);
 		xprt->xp_thread = NULL;
 		st->st_xprt = NULL;
 		if (xprt->xp_active) {
 			if (!svc_request_space_available(pool) ||
 			    !xprt_assignthread(xprt))
-				TAILQ_INSERT_TAIL(&pool->sp_active,
+				TAILQ_INSERT_TAIL(&grp->sg_active,
 				    xprt, xp_alink);
 		}
-		mtx_unlock(&pool->sp_lock);
+		mtx_unlock(&grp->sg_lock);
 		SVC_RELEASE(xprt);
 
 		/*
@@ -1230,7 +1257,7 @@ svc_run_internal(SVCPOOL *pool, bool_t i
 		}
 		mtx_unlock(&st->st_lock);
 		svc_change_space_used(pool, -sz);
-		mtx_lock(&pool->sp_lock);
+		mtx_lock(&grp->sg_lock);
 	}
 
 	if (st->st_xprt) {
@@ -1238,46 +1265,43 @@ svc_run_internal(SVCPOOL *pool, bool_t i
 		st->st_xprt = NULL;
 		SVC_RELEASE(xprt);
 	}
-
 	KASSERT(STAILQ_EMPTY(&st->st_reqs), ("stray reqs on exit"));
-	LIST_REMOVE(st, st_link);
-	pool->sp_threadcount--;
-
-	mtx_unlock(&pool->sp_lock);
-
 	mtx_destroy(&st->st_lock);
 	cv_destroy(&st->st_cond);
 	mem_free(st, sizeof(*st));
 
+	grp->sg_threadcount--;
 	if (!ismaster)
-		wakeup(pool);
+		wakeup(grp);
+	mtx_unlock(&grp->sg_lock);
 }
 
 static void
 svc_thread_start(void *arg)
 {
 
-	svc_run_internal((SVCPOOL *) arg, FALSE);
+	svc_run_internal((SVCGROUP *) arg, FALSE);
 	kthread_exit();
 }
 
 static void
-svc_new_thread(SVCPOOL *pool)
+svc_new_thread(SVCGROUP *grp)
 {
+	SVCPOOL *pool = grp->sg_pool;
 	struct thread *td;
 
-	pool->sp_threadcount++;
-	kthread_add(svc_thread_start, pool,
-	    pool->sp_proc, &td, 0, 0,
+	grp->sg_threadcount++;
+	kthread_add(svc_thread_start, grp, pool->sp_proc, &td, 0, 0,
 	    "%s: service", pool->sp_name);
 }
 
 void
 svc_run(SVCPOOL *pool)
 {
-	int i;
+	int g, i;
 	struct proc *p;
 	struct thread *td;
+	SVCGROUP *grp;
 
 	p = curproc;
 	td = curthread;
@@ -1285,35 +1309,56 @@ svc_run(SVCPOOL *pool)
 	    "%s: master", pool->sp_name);
 	pool->sp_state = SVCPOOL_ACTIVE;
 	pool->sp_proc = p;
-	pool->sp_lastcreatetime = time_uptime;
-	pool->sp_threadcount = 1;
 
-	for (i = 1; i < pool->sp_minthreads; i++) {
-		svc_new_thread(pool);
+	/* Choose group count based on number of threads and CPUs. */
+	pool->sp_groupcount = max(1, min(SVC_MAXGROUPS,
+	    min(pool->sp_maxthreads / 2, mp_ncpus) / 6));
+	for (g = 0; g < pool->sp_groupcount; g++) {
+		grp = &pool->sp_groups[g];
+		grp->sg_minthreads = max(1,
+		    pool->sp_minthreads / pool->sp_groupcount);
+		grp->sg_maxthreads = max(1,
+		    pool->sp_maxthreads / pool->sp_groupcount);
+		grp->sg_lastcreatetime = time_uptime;
+	}
+
+	/* Starting threads */
+	for (g = 0; g < pool->sp_groupcount; g++) {
+		grp = &pool->sp_groups[g];
+		for (i = ((g == 0) ? 1 : 0); i < grp->sg_minthreads; i++)
+			svc_new_thread(grp);
+	}
+	pool->sp_groups[0].sg_threadcount++;
+	svc_run_internal(&pool->sp_groups[0], TRUE);
+
+	/* Waiting for threads to stop. */
+	for (g = 0; g < pool->sp_groupcount; g++) {
+		grp = &pool->sp_groups[g];
+		mtx_lock(&grp->sg_lock);
+		while (grp->sg_threadcount > 0)
+			msleep(grp, &grp->sg_lock, 0, "svcexit", 0);
+		mtx_unlock(&grp->sg_lock);
 	}
-
-	svc_run_internal(pool, TRUE);
-
-	mtx_lock(&pool->sp_lock);
-	while (pool->sp_threadcount > 0)
-		msleep(pool, &pool->sp_lock, 0, "svcexit", 0);
-	mtx_unlock(&pool->sp_lock);
 }
 
 void
 svc_exit(SVCPOOL *pool)
 {
+	SVCGROUP *grp;
 	SVCTHREAD *st;
+	int g;
 
-	mtx_lock(&pool->sp_lock);
-
-	if (pool->sp_state != SVCPOOL_CLOSING) {
-		pool->sp_state = SVCPOOL_CLOSING;
-		LIST_FOREACH(st, &pool->sp_idlethreads, st_ilink)
-			cv_signal(&st->st_cond);
+	pool->sp_state = SVCPOOL_CLOSING;
+	for (g = 0; g < pool->sp_groupcount; g++) {
+		grp = &pool->sp_groups[g];
+		mtx_lock(&grp->sg_lock);
+		if (grp->sg_state != SVCPOOL_CLOSING) {
+			grp->sg_state = SVCPOOL_CLOSING;
+			LIST_FOREACH(st, &grp->sg_idlethreads, st_ilink)
+				cv_signal(&st->st_cond);
+		}
+		mtx_unlock(&grp->sg_lock);
 	}
-
-	mtx_unlock(&pool->sp_lock);
 }
 
 bool_t

Modified: stable/10/sys/rpc/svc.h
==============================================================================
--- stable/10/sys/rpc/svc.h	Sun Jun 22 18:02:39 2014	(r267741)
+++ stable/10/sys/rpc/svc.h	Sun Jun 22 18:03:53 2014	(r267742)
@@ -137,6 +137,7 @@ struct xp_ops2 {
 
 #ifdef _KERNEL
 struct __rpc_svcpool;
+struct __rpc_svcgroup;
 struct __rpc_svcthread;
 #endif
 
@@ -150,6 +151,7 @@ typedef struct __rpc_svcxprt {
 	volatile u_int	xp_refs;
 	struct sx	xp_lock;
 	struct __rpc_svcpool *xp_pool;  /* owning pool (see below) */
+	struct __rpc_svcgroup *xp_group; /* owning group (see below) */
 	TAILQ_ENTRY(__rpc_svcxprt) xp_link;
 	TAILQ_ENTRY(__rpc_svcxprt) xp_alink;
 	bool_t		xp_registered;	/* xprt_register has been called */
@@ -245,8 +247,6 @@ struct svc_loss_callout {
 };
 TAILQ_HEAD(svc_loss_callout_list, svc_loss_callout);
 
-struct __rpc_svcthread;
-
 /*
  * Service request
  */
@@ -296,7 +296,6 @@ typedef struct __rpc_svcthread {
 	SVCXPRT			*st_xprt; /* transport we are processing */
 	struct svc_reqlist	st_reqs;  /* RPC requests to execute */
 	struct cv		st_cond; /* sleeping for work */
-	LIST_ENTRY(__rpc_svcthread) st_link; /* all threads list */
 	LIST_ENTRY(__rpc_svcthread) st_ilink; /* idle threads list */
 	LIST_ENTRY(__rpc_svcthread) st_alink; /* application thread list */
 	int		st_p2;		/* application workspace */
@@ -305,6 +304,36 @@ typedef struct __rpc_svcthread {
 LIST_HEAD(svcthread_list, __rpc_svcthread);
 
 /*
+ * A thread group contain all information needed to assign subset of
+ * transports to subset of threads.  On systems with many CPUs and many
+ * threads that allows to reduce lock congestion and improve performance.
+ * Hundreds of threads on dozens of CPUs sharing the single pool lock do
+ * not scale well otherwise.
+ */
+TAILQ_HEAD(svcxprt_list, __rpc_svcxprt);
+enum svcpool_state {
+	SVCPOOL_INIT,		/* svc_run not called yet */
+	SVCPOOL_ACTIVE,		/* normal running state */
+	SVCPOOL_THREADWANTED,	/* new service thread requested */
+	SVCPOOL_THREADSTARTING,	/* new service thread started */
+	SVCPOOL_CLOSING		/* svc_exit called */
+};
+typedef struct __rpc_svcgroup {
+	struct mtx_padalign sg_lock;	/* protect the thread/req lists */
+	struct __rpc_svcpool	*sg_pool;
+	enum svcpool_state sg_state;	/* current pool state */
+	struct svcxprt_list sg_xlist;	/* all transports in the group */
+	struct svcxprt_list sg_active;	/* transports needing service */
+	struct svcthread_list sg_idlethreads; /* idle service threads */
+
+	int		sg_minthreads;	/* minimum service thread count */
+	int		sg_maxthreads;	/* maximum service thread count */
+	int		sg_threadcount; /* current service thread count */
+	time_t		sg_lastcreatetime; /* when we last started a thread */
+	time_t		sg_lastidlecheck;  /* when we last checked idle transports */
+} SVCGROUP;
+
+/*
  * In the kernel, we can't use global variables to store lists of
  * transports etc. since otherwise we could not have two unrelated RPC
  * services running, each on its own thread. We solve this by
@@ -316,32 +345,18 @@ LIST_HEAD(svcthread_list, __rpc_svcthrea
  * this to support something similar to the Solaris multi-threaded RPC
  * server.
  */
-TAILQ_HEAD(svcxprt_list, __rpc_svcxprt);
-enum svcpool_state {
-	SVCPOOL_INIT,		/* svc_run not called yet */
-	SVCPOOL_ACTIVE,		/* normal running state */
-	SVCPOOL_THREADWANTED,	/* new service thread requested */
-	SVCPOOL_THREADSTARTING,	/* new service thread started */
-	SVCPOOL_CLOSING		/* svc_exit called */
-};
 typedef SVCTHREAD *pool_assign_fn(SVCTHREAD *, struct svc_req *);
 typedef void pool_done_fn(SVCTHREAD *, struct svc_req *);
+#define	SVC_MAXGROUPS	16
 typedef struct __rpc_svcpool {
 	struct mtx_padalign sp_lock;	/* protect the transport lists */
 	const char	*sp_name;	/* pool name (e.g. "nfsd", "NLM" */
 	enum svcpool_state sp_state;	/* current pool state */
 	struct proc	*sp_proc;	/* process which is in svc_run */
-	struct svcxprt_list sp_xlist;	/* all transports in the pool */
-	struct svcxprt_list sp_active;	/* transports needing service */
 	struct svc_callout_list sp_callouts; /* (prog,vers)->dispatch list */
 	struct svc_loss_callout_list sp_lcallouts; /* loss->dispatch list */
-	struct svcthread_list sp_threads; /* service threads */
-	struct svcthread_list sp_idlethreads; /* idle service threads */
 	int		sp_minthreads;	/* minimum service thread count */
 	int		sp_maxthreads;	/* maximum service thread count */
-	int		sp_threadcount; /* current service thread count */
-	time_t		sp_lastcreatetime; /* when we last started a thread */
-	time_t		sp_lastidlecheck;  /* when we last checked idle transports */
 
 	/*
 	 * Hooks to allow an application to control request to thread
@@ -364,6 +379,10 @@ typedef struct __rpc_svcpool {
 
 	struct replay_cache *sp_rcache; /* optional replay cache */
 	struct sysctl_ctx_list sp_sysctl;
+
+	int		sp_groupcount;	/* Number of groups in the pool. */
+	int		sp_nextgroup;	/* Next group to assign port. */
+	SVCGROUP	sp_groups[SVC_MAXGROUPS]; /* Thread/port groups. */
 } SVCPOOL;
 
 #else

Modified: stable/10/sys/rpc/svc_generic.c
==============================================================================
--- stable/10/sys/rpc/svc_generic.c	Sun Jun 22 18:02:39 2014	(r267741)
+++ stable/10/sys/rpc/svc_generic.c	Sun Jun 22 18:03:53 2014	(r267742)
@@ -86,7 +86,8 @@ svc_create(
 	rpcvers_t versnum,		/* Version number */
 	const char *nettype)		/* Networktype token */
 {
-	int num = 0;
+	int g, num = 0;
+	SVCGROUP *grp;
 	SVCXPRT *xprt;
 	struct netconfig *nconf;
 	void *handle;
@@ -96,11 +97,14 @@ svc_create(
 		return (0);
 	}
 	while ((nconf = __rpc_getconf(handle)) != NULL) {
-		mtx_lock(&pool->sp_lock);
-		TAILQ_FOREACH(xprt, &pool->sp_xlist, xp_link) {
-			if (strcmp(xprt->xp_netid, nconf->nc_netid) == 0) {
+		for (g = 0; g < SVC_MAXGROUPS; g++) {
+			grp = &pool->sp_groups[g];
+			mtx_lock(&grp->sg_lock);
+			TAILQ_FOREACH(xprt, &grp->sg_xlist, xp_link) {
+				if (strcmp(xprt->xp_netid, nconf->nc_netid))
+					continue;
 				/* Found an old one, use it */
-				mtx_unlock(&pool->sp_lock);
+				mtx_unlock(&grp->sg_lock);
 				(void) rpcb_unset(prognum, versnum, nconf);
 				if (svc_reg(xprt, prognum, versnum,
 					dispatch, nconf) == FALSE) {
@@ -108,15 +112,15 @@ svc_create(
 		"svc_create: could not register prog %u vers %u on %s\n",
 					(unsigned)prognum, (unsigned)versnum,
 					 nconf->nc_netid);
-					mtx_lock(&pool->sp_lock);
+					mtx_lock(&grp->sg_lock);
 				} else {
 					num++;
-					mtx_lock(&pool->sp_lock);
+					mtx_lock(&grp->sg_lock);
 					break;
 				}
 			}
+			mtx_unlock(&grp->sg_lock);
 		}
-		mtx_unlock(&pool->sp_lock);
 		if (xprt == NULL) {
 			/* It was not found. Now create a new one */
 			xprt = svc_tp_create(pool, dispatch, prognum, versnum,



Want to link to this message? Use this URL: <https://mail-archive.FreeBSD.org/cgi/mid.cgi?201406221803.s5MI3r93082444>