Date:      Fri, 13 May 2011 10:41:59 -0400
From:      Max Laier <max@love2party.net>
To:        freebsd-current@freebsd.org
Cc:        Andriy Gapon <avg@freebsd.org>
Subject:   Re: proposed smp_rendezvous change
Message-ID:  <201105131041.59981.max@love2party.net>
In-Reply-To: <4DCD357D.6000109@FreeBSD.org>
References:  <4DCD357D.6000109@FreeBSD.org>

On Friday 13 May 2011 09:43:25 Andriy Gapon wrote:
> This is a change in the vein of what I've been doing in the xcpu branch,
> and it's supposed to fix the issue exposed by the recent commit that
> (probably unintentionally) stress-tests smp_rendezvous in the TSC code.
> 
> Non-essential changes:
> - ditch the initial, and in my opinion useless, pre-setup rendezvous in
> smp_rendezvous_action()
> - shift smp_rv_waiters indexes by one because of the above
> 
> Essential changes (the fix):
> - re-use the freed smp_rv_waiters[2] to indicate that a slave/target is
> really fully done with the rendezvous (i.e. it's not going to access any
> members of the smp_rv_* pseudo-structure anymore)
> - spin on smp_rv_waiters[2] upon _entry_ to smp_rendezvous_cpus() so that
> the smp_rv_* pseudo-structure is not re-used too early
> 
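
In condensed form, the slave's exit path after this change reads as
follows (same logic as the diff below; a sketch, not the literal patch
text):

	/* spin on exit rendezvous */
	atomic_add_int(&smp_rv_waiters[1], 1);
	if (local_teardown_func == smp_no_rendevous_barrier) {
		/* fully done; will not access smp_rv_* again */
		atomic_add_int(&smp_rv_waiters[2], 1);
		return;
	}
	while (smp_rv_waiters[1] < smp_rv_ncpus)
		cpu_spinwait();
	atomic_add_int(&smp_rv_waiters[2], 1);	/* fully done */

	/* teardown function */
	if (local_teardown_func != NULL)
		local_teardown_func(local_func_arg);

The master then spins on smp_rv_waiters[2] before setting up a new
rendezvous, so the pseudo-structure is never overwritten while a slave
is still looking at it.
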
> Index: sys/kern/subr_smp.c
> ===================================================================
> --- sys/kern/subr_smp.c	(revision 221835)
> +++ sys/kern/subr_smp.c	(working copy)
> @@ -316,19 +316,14 @@
>  	void (*local_action_func)(void*)   = smp_rv_action_func;
>  	void (*local_teardown_func)(void*) = smp_rv_teardown_func;
> 
> -	/* Ensure we have up-to-date values. */
> -	atomic_add_acq_int(&smp_rv_waiters[0], 1);
> -	while (smp_rv_waiters[0] < smp_rv_ncpus)
> -		cpu_spinwait();
> -
>  	/* setup function */
>  	if (local_setup_func != smp_no_rendevous_barrier) {
>  		if (smp_rv_setup_func != NULL)
>  			smp_rv_setup_func(smp_rv_func_arg);
> 
>  		/* spin on entry rendezvous */
> -		atomic_add_int(&smp_rv_waiters[1], 1);
> -		while (smp_rv_waiters[1] < smp_rv_ncpus)
> +		atomic_add_int(&smp_rv_waiters[0], 1);
> +		while (smp_rv_waiters[0] < smp_rv_ncpus)
>                  	cpu_spinwait();
>  	}
> 
> @@ -337,12 +332,16 @@
>  		local_action_func(local_func_arg);
> 
>  	/* spin on exit rendezvous */
> -	atomic_add_int(&smp_rv_waiters[2], 1);
> -	if (local_teardown_func == smp_no_rendevous_barrier)
> +	atomic_add_int(&smp_rv_waiters[1], 1);
> +	if (local_teardown_func == smp_no_rendevous_barrier) {
> +		atomic_add_int(&smp_rv_waiters[2], 1);
>                  return;
> -	while (smp_rv_waiters[2] < smp_rv_ncpus)
> +	}
> +	while (smp_rv_waiters[1] < smp_rv_ncpus)
>  		cpu_spinwait();
> 
> +	atomic_add_int(&smp_rv_waiters[2], 1);
> +
>  	/* teardown function */
>  	if (local_teardown_func != NULL)
>  		local_teardown_func(local_func_arg);
> @@ -377,6 +376,10 @@
>  	/* obtain rendezvous lock */
>  	mtx_lock_spin(&smp_ipi_mtx);
> 
> +	/* Wait for any previous unwaited rendezvous to finish. */
> +	while (atomic_load_acq_int(&smp_rv_waiters[2]) < ncpus)

this ncpus isn't the one you are looking for: at this point smp_rv_ncpus
still holds the CPU count of the _previous_ rendezvous, while the ncpus
local already belongs to the new one.  I have a patch of my own attached.
It might be overdoing it to some extent, but it has been running very well
for quite some time on our system, which now uses rmlock heavily.
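
(A fixed version of that entry wait would spin against the old count
instead, roughly like this -- an untested sketch, not code from the
attached patch:)

	/* Drain the previous rendezvous before smp_rv_* is reused. */
	while (atomic_load_acq_int(&smp_rv_waiters[2]) < smp_rv_ncpus)
		cpu_spinwait();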

The rmlock diff is unrelated, but since I have had this diff around for some
time now ...

> +		cpu_spinwait();
> +
>  	/* set static function pointers */
>  	smp_rv_ncpus = ncpus;
>  	smp_rv_setup_func = setup_func;
> @@ -395,7 +398,7 @@
>  		smp_rendezvous_action();
> 
>  	if (teardown_func == smp_no_rendevous_barrier)
> -		while (atomic_load_acq_int(&smp_rv_waiters[2]) < ncpus)
> +		while (atomic_load_acq_int(&smp_rv_waiters[1]) < ncpus)
>  			cpu_spinwait();
> 
>  	/* release lock */

[Attachment: bug71970.diff]

diff --git a/src/sys/kern/kern_rmlock.c b/src/sys/kern/kern_rmlock.c
index f91e61a..a30c16b 100644
--- a/src/sys/kern/kern_rmlock.c
+++ b/src/sys/kern/kern_rmlock.c
@@ -154,6 +154,7 @@ rm_tracker_remove(struct pcpu *pc, struct rm_priotracker *tracker)
 static void
 rm_cleanIPI(void *arg)
 {
+	TAILQ_HEAD(,rm_priotracker) tmp_list = TAILQ_HEAD_INITIALIZER(tmp_list);
 	struct pcpu *pc;
 	struct rmlock *rm = arg;
 	struct rm_priotracker *tracker;
@@ -165,12 +166,12 @@ rm_cleanIPI(void *arg)
 		tracker = (struct rm_priotracker *)queue;
 		if (tracker->rmp_rmlock == rm && tracker->rmp_flags == 0) {
 			tracker->rmp_flags = RMPF_ONQUEUE;
-			mtx_lock_spin(&rm_spinlock);
-			LIST_INSERT_HEAD(&rm->rm_activeReaders, tracker,
-			    rmp_qentry);
-			mtx_unlock_spin(&rm_spinlock);
+			TAILQ_INSERT_HEAD(&tmp_list, tracker, rmp_qentry);
 		}
 	}
+	mtx_lock_spin(&rm_spinlock);
+	TAILQ_CONCAT(&rm->rm_activeReaders, &tmp_list, rmp_qentry);
+	mtx_unlock_spin(&rm_spinlock);
 }
 
 CTASSERT((RM_SLEEPABLE & LO_CLASSFLAGS) == RM_SLEEPABLE);
@@ -186,7 +187,7 @@ rm_init_flags(struct rmlock *rm, const char *name, int opts)
 	if (opts & RM_RECURSE)
 		liflags |= LO_RECURSABLE;
 	rm->rm_writecpus = all_cpus;
-	LIST_INIT(&rm->rm_activeReaders);
+	TAILQ_INIT(&rm->rm_activeReaders);
 	if (opts & RM_SLEEPABLE) {
 		liflags |= RM_SLEEPABLE;
 		sx_init_flags(&rm->rm_lock_sx, "rmlock_sx", SX_RECURSE);
@@ -281,7 +282,7 @@ _rm_rlock_hard(struct rmlock *rm, struct rm_priotracker *tracker, int trylock)
 			if ((atracker->rmp_rmlock == rm) &&
 			    (atracker->rmp_thread == tracker->rmp_thread)) {
 				mtx_lock_spin(&rm_spinlock);
-				LIST_INSERT_HEAD(&rm->rm_activeReaders,
+				TAILQ_INSERT_HEAD(&rm->rm_activeReaders,
 				    tracker, rmp_qentry);
 				tracker->rmp_flags = RMPF_ONQUEUE;
 				mtx_unlock_spin(&rm_spinlock);
@@ -373,7 +374,8 @@ _rm_unlock_hard(struct thread *td,struct rm_priotracker *tracker)
 		return;
 
 	mtx_lock_spin(&rm_spinlock);
-	LIST_REMOVE(tracker, rmp_qentry);
+	TAILQ_REMOVE(&tracker->rmp_rmlock->rm_activeReaders, tracker,
+	    rmp_qentry);
 
 	if (tracker->rmp_flags & RMPF_SIGNAL) {
 		struct rmlock *rm;
@@ -445,7 +447,7 @@ _rm_wlock(struct rmlock *rm)
 #endif
 
 		mtx_lock_spin(&rm_spinlock);
-		while ((prio = LIST_FIRST(&rm->rm_activeReaders)) != NULL) {
+		while ((prio = TAILQ_FIRST(&rm->rm_activeReaders)) != NULL) {
 			ts = turnstile_trywait(&rm->lock_object);
 			prio->rmp_flags = RMPF_ONQUEUE | RMPF_SIGNAL;
 			mtx_unlock_spin(&rm_spinlock);
diff --git a/src/sys/kern/subr_smp.c b/src/sys/kern/subr_smp.c
index 98c81cc..e61d6f8 100644
--- a/src/sys/kern/subr_smp.c
+++ b/src/sys/kern/subr_smp.c
@@ -112,7 +112,8 @@ static void (*volatile smp_rv_setup_func)(void *arg);
 static void (*volatile smp_rv_action_func)(void *arg);
 static void (*volatile smp_rv_teardown_func)(void *arg);
 static void * volatile smp_rv_func_arg;
-static volatile int smp_rv_waiters[3];
+static volatile int smp_rv_waiters[4];
+static volatile cpumask_t smp_rv_targets;
 
 /* 
  * Shared mutex to restrict busywaits between smp_rendezvous() and
@@ -347,15 +348,41 @@ restart_cpus(cpumask_t map)
 void
 smp_rendezvous_action(void)
 {
-	void* local_func_arg = smp_rv_func_arg;
-	void (*local_setup_func)(void*)   = smp_rv_setup_func;
-	void (*local_action_func)(void*)   = smp_rv_action_func;
-	void (*local_teardown_func)(void*) = smp_rv_teardown_func;
+	int tmp, local_ncpu;
+	cpumask_t local_mask;
+	void* local_func_arg;
+	void (*local_setup_func)(void*);
+	void (*local_action_func)(void*);
+	void (*local_teardown_func)(void*);
 
 	/* Ensure we have up-to-date values. */
-	atomic_add_acq_int(&smp_rv_waiters[0], 1);
-	while (smp_rv_waiters[0] < smp_rv_ncpus)
+	curthread->td_critnest++;	/* critical_enter */
+	tmp = 0;
+	while ((local_mask = atomic_load_acq_int(&smp_rv_targets)) == 0) {
 		cpu_spinwait();
+		tmp++;
+	}
+#ifdef INVARIANTS
+	if (tmp != 0)
+		printf("%s: CPU%d got IPI too early and spun %d times\n",
+		    __func__, curcpu, tmp);
+#endif
+	if ((local_mask & (1 << curcpu)) == 0) {
+#ifdef INVARIANTS
+		printf("%s: %d not in %x\n", __func__, curcpu, local_mask);
+#endif
+		curthread->td_critnest--;	/* critical_exit */
+		return;
+	}
+	atomic_add_int(&smp_rv_waiters[0], 1);
+	local_ncpu = smp_rv_ncpus;
+	while ((tmp = smp_rv_waiters[0]) < local_ncpu)
+		cpu_spinwait();
+	KASSERT(tmp == local_ncpu, ("stale value for smp_rv_waiters"));
+	local_func_arg = smp_rv_func_arg;
+	local_setup_func = smp_rv_setup_func;
+	local_action_func = smp_rv_action_func;
+	local_teardown_func = smp_rv_teardown_func;
 
 	/* setup function */
 	if (local_setup_func != smp_no_rendevous_barrier) {
@@ -364,7 +391,7 @@ smp_rendezvous_action(void)
 
 		/* spin on entry rendezvous */
 		atomic_add_int(&smp_rv_waiters[1], 1);
-		while (smp_rv_waiters[1] < smp_rv_ncpus)
+		while (smp_rv_waiters[1] < local_ncpu)
                 	cpu_spinwait();
 	}
 
@@ -372,16 +399,22 @@ smp_rendezvous_action(void)
 	if (local_action_func != NULL)
 		local_action_func(local_func_arg);
 
-	/* spin on exit rendezvous */
 	atomic_add_int(&smp_rv_waiters[2], 1);
-	if (local_teardown_func == smp_no_rendevous_barrier)
-                return;
-	while (smp_rv_waiters[2] < smp_rv_ncpus)
+	if (local_teardown_func == smp_no_rendevous_barrier) {
+		atomic_add_int(&smp_rv_waiters[3], 1);
+		curthread->td_critnest--;	/* critical_exit */
+		return;
+	}
+	/* wait until all CPUs are done with the main action */
+	while (smp_rv_waiters[2] < local_ncpu)
 		cpu_spinwait();
 
+	atomic_add_int(&smp_rv_waiters[3], 1);
 	/* teardown function */
 	if (local_teardown_func != NULL)
 		local_teardown_func(local_func_arg);
+
+	curthread->td_critnest--;	/* critical_exit */
 }
 
 void
@@ -418,21 +451,31 @@ smp_rendezvous_cpus(cpumask_t map,
 	smp_rv_action_func = action_func;
 	smp_rv_teardown_func = teardown_func;
 	smp_rv_func_arg = arg;
+	smp_rv_waiters[0] = 0;
 	smp_rv_waiters[1] = 0;
 	smp_rv_waiters[2] = 0;
-	atomic_store_rel_int(&smp_rv_waiters[0], 0);
+	smp_rv_waiters[3] = 0;
+	atomic_store_rel_int(&smp_rv_targets, map);
 
 	/* signal other processors, which will enter the IPI with interrupts off */
 	ipi_selected(map & ~(1 << curcpu), IPI_RENDEZVOUS);
 
 	/* Check if the current CPU is in the map */
-	if ((map & (1 << curcpu)) != 0)
+	if ((map & (1 << curcpu)) != 0) {
 		smp_rendezvous_action();
-
-	if (teardown_func == smp_no_rendevous_barrier)
-		while (atomic_load_acq_int(&smp_rv_waiters[2]) < ncpus)
+		/* wait until all CPUs will exit smp_rendezvous_action() */
+		while (atomic_load_acq_int(&smp_rv_waiters[3]) < ncpus)
 			cpu_spinwait();
+	} else {
+		/* wait until all CPUs will exit smp_rendezvous_action() */
+		while (smp_rv_waiters[2] < ncpus)
+			cpu_spinwait();
+	}
 
+#ifdef INVARIANTS
+	smp_rv_waiters[0] = 0xdead;
+	smp_rv_targets = 0;
+#endif
 	/* release lock */
 	mtx_unlock_spin(&smp_ipi_mtx);
 }
diff --git a/src/sys/sys/_rmlock.h b/src/sys/sys/_rmlock.h
index 75a159c..1f1e36e 100644
--- a/src/sys/sys/_rmlock.h
+++ b/src/sys/sys/_rmlock.h
@@ -41,12 +41,12 @@
  * Mostly reader/occasional writer lock.
  */
 
-LIST_HEAD(rmpriolist,rm_priotracker);
+TAILQ_HEAD(rmpriolist,rm_priotracker);
 
 struct rmlock {
 	struct lock_object lock_object; 
 	volatile cpumask_t rm_writecpus;
-	LIST_HEAD(,rm_priotracker) rm_activeReaders;
+	TAILQ_HEAD(,rm_priotracker) rm_activeReaders;
 	union {
 		struct mtx _rm_lock_mtx;
 		struct sx _rm_lock_sx;
@@ -60,7 +60,7 @@ struct rm_priotracker {
 	struct rmlock *rmp_rmlock;
 	struct thread *rmp_thread;
 	int rmp_flags;
-	LIST_ENTRY(rm_priotracker) rmp_qentry;
+	TAILQ_ENTRY(rm_priotracker) rmp_qentry;
 };
 
 #endif /* !_SYS__RMLOCK_H_ */
