Skip site navigation (1)Skip section navigation (2)
Date:      Thu, 26 Aug 2010 02:05:32 +0200
From:      Max Laier <max@love2party.net>
To:        freebsd-arch@freebsd.org
Cc:        Stephan Uphoff <ups@freebsd.org>
Subject:   Re: rmlock(9) two additions
Message-ID:  <201008260205.32416.max@love2party.net>
In-Reply-To: <4C73D0FA.5030102@freebsd.org>
References:  <201008160515.21412.max@love2party.net> <201008240045.15998.max@laiers.net> <4C73D0FA.5030102@freebsd.org>

next in thread | previous in thread | raw e-mail | index | archive | help
--Boundary-00=_M/adMKo7R9HbyBw
Content-Type: Text/Plain;
  charset="iso-8859-1"
Content-Transfer-Encoding: 7bit

On Tuesday 24 August 2010 16:02:34 Stephan Uphoff wrote:
> Yes - this is a problem that needs to be addressed.
> Fortunately most platforms won't need to be as strict and I suggest per
> platform parameters.
> An alternative that was in my original design was to use a bitmap for
> the rm_noreadtoken.
> Each CPU would then have an associated bit that will only be cleared by
> that cpu.
> This would also allow targeted IPIs to only the token holders.

Okay ... attached is a version with the bitmask idea.  It is not using per-
platform parameters yet.  But I believe it fixes the issue in general.  This 
comes at the cost of an additional memory reference to pc_cpumask on the exit 
conditional from the fast-path.  I don't think this is too much of a problem 
currently, as this will be in the cache already ... but I might be wrong.

For easier review, I've also attached my current version of kern_rmlock.c.

BTW, this also fixes the trylock race by trylocking the base lock.  Again, I 
don't think the race against other readers is a concern in the trylock API.

I'd like to move forward with this in some way ... any objections and/or input 
on what to do/fix before committing this?  Any ideas/pointers on how to best 
implement the per platform selector?

Thanks,
  Max

--Boundary-00=_M/adMKo7R9HbyBw
Content-Type: text/x-patch;
  charset="ISO-8859-1";
  name="rmlock.full.diff"
Content-Transfer-Encoding: 7bit
Content-Disposition: attachment;
	filename="rmlock.full.diff"

diff --git a/share/man/man9/Makefile b/share/man/man9/Makefile
index b438b90..e6d8881 100644
--- a/share/man/man9/Makefile
+++ b/share/man/man9/Makefile
@@ -986,6 +986,7 @@ MLINKS+=rman.9 rman_activate_resource.9 \
 MLINKS+=rmlock.9 rm_destroy.9 \
 	rmlock.9 rm_init.9 \
 	rmlock.9 rm_rlock.9 \
+	rmlock.9 rm_try_rlock.9 \
 	rmlock.9 rm_runlock.9 \
 	rmlock.9 RM_SYSINIT.9 \
 	rmlock.9 rm_wlock.9 \
diff --git a/share/man/man9/locking.9 b/share/man/man9/locking.9
index 005f476..3728319 100644
--- a/share/man/man9/locking.9
+++ b/share/man/man9/locking.9
@@ -301,7 +301,7 @@ one of the synchronization primitives discussed:
 .It mutex     Ta \&ok Ta \&ok-1 Ta \&no Ta \&ok Ta \&ok Ta \&no-3
 .It sx        Ta \&ok Ta \&ok Ta \&ok-2 Ta \&ok Ta \&ok Ta \&ok-4
 .It rwlock    Ta \&ok Ta \&ok Ta \&no Ta \&ok-2 Ta \&ok Ta \&no-3
-.It rmlock    Ta \&ok Ta \&ok Ta \&no Ta \&ok Ta \&ok-2 Ta \&no
+.It rmlock    Ta \&ok Ta \&ok Ta \&ok-5 Ta \&ok Ta \&ok-2 Ta \&ok-5
 .El
 .Pp
 .Em *1
@@ -326,6 +326,13 @@ Though one can sleep holding an sx lock, one can also use
 .Fn sx_sleep
 which will atomically release this primitive when going to sleep and
 reacquire it on wakeup.
+.Pp
+.Em *5
+.Em Read-mostly
+locks can be initialized to support sleeping while holding a write lock.
+See
+.Xr rmlock 9
+for details.
 .Ss Context mode table
 The next table shows what can be used in different contexts.
 At this time this is a rather easy to remember table.
diff --git a/share/man/man9/rmlock.9 b/share/man/man9/rmlock.9
index e99661d..28ac0a5 100644
--- a/share/man/man9/rmlock.9
+++ b/share/man/man9/rmlock.9
@@ -35,6 +35,7 @@
 .Nm rm_init_flags ,
 .Nm rm_destroy ,
 .Nm rm_rlock ,
+.Nm rm_try_rlock ,
 .Nm rm_wlock ,
 .Nm rm_runlock ,
 .Nm rm_wunlock ,
@@ -53,6 +54,8 @@
 .Fn rm_destroy "struct rmlock *rm"
 .Ft void
 .Fn rm_rlock "struct rmlock *rm"  "struct rm_priotracker* tracker"
+.Ft int
+.Fn rm_try_rlock "struct rmlock *rm"  "struct rm_priotracker* tracker"
 .Ft void
 .Fn rm_wlock "struct rmlock *rm"
 .Ft void
@@ -84,14 +87,16 @@ Although reader/writer locks look very similar to
 locks, their usage pattern is different.
 Reader/writer locks can be treated as mutexes (see
 .Xr mutex 9 )
-with shared/exclusive semantics.
+with shared/exclusive semantics unless initialized with
+.Dv RM_SLEEPABLE .
 Unlike
 .Xr sx 9 ,
 an
 .Nm
 can be locked while holding a non-spin mutex, and an
 .Nm
-cannot be held while sleeping.
+cannot be held while sleeping, again unless initialized with
+.Dv RM_SLEEPABLE .
 The
 .Nm
 locks have full priority propagation like mutexes.
@@ -135,6 +140,13 @@ to ignore this lock.
 .It Dv RM_RECURSE
 Allow threads to recursively acquire exclusive locks for
 .Fa rm .
+.It Dv RM_SLEEPABLE
+Allow writers to sleep while holding the lock.
+Readers must not sleep while holding the lock and can avoid sleeping
+when taking the lock by using
+.Fn rm_try_rlock
+instead of
+.Fn rm_rlock .
 .El
 .It Fn rm_rlock "struct rmlock *rm" "struct rm_priotracker* tracker"
 Lock
@@ -161,6 +173,13 @@ access on
 .Fa rm .
 This is called
 .Dq "recursing on a lock" .
+.It Fn rm_try_rlock "struct rmlock *rm" "struct rm_priotracker* tracker"
+Try to lock
+.Fa rm
+as a reader.
+.Fn rm_try_rlock
+will return 0 if the lock cannot be acquired immediately;
+otherwise the lock will be acquired and a non-zero value will be returned.
 .It Fn rm_wlock "struct rmlock *rm"
 Lock
 .Fa rm
diff --git a/sys/kern/kern_rmlock.c b/sys/kern/kern_rmlock.c
index a6a622e..0ab5d74 100644
--- a/sys/kern/kern_rmlock.c
+++ b/sys/kern/kern_rmlock.c
@@ -187,6 +187,8 @@ rm_cleanIPI(void *arg)
 	}
 }
 
+CTASSERT((RM_SLEEPABLE & LO_CLASSFLAGS) == RM_SLEEPABLE);
+
 void
 rm_init_flags(struct rmlock *rm, const char *name, int opts)
 {
@@ -197,9 +199,13 @@ rm_init_flags(struct rmlock *rm, const char *name, int opts)
 		liflags |= LO_WITNESS;
 	if (opts & RM_RECURSE)
 		liflags |= LO_RECURSABLE;
-	rm->rm_noreadtoken = 1;
+	rm->rm_writecpus = all_cpus;
 	LIST_INIT(&rm->rm_activeReaders);
-	mtx_init(&rm->rm_lock, name, "rmlock_mtx", MTX_NOWITNESS);
+	if (opts & RM_SLEEPABLE) {
+		liflags |= RM_SLEEPABLE;
+		sx_init_flags(&rm->rm_lock_sx, "rmlock_sx", SX_RECURSE);
+	} else
+		mtx_init(&rm->rm_lock_mtx, name, "rmlock_mtx", MTX_NOWITNESS);
 	lock_init(&rm->lock_object, &lock_class_rm, name, NULL, liflags);
 }
 
@@ -214,7 +220,10 @@ void
 rm_destroy(struct rmlock *rm)
 {
 
-	mtx_destroy(&rm->rm_lock);
+	if (rm->lock_object.lo_flags & RM_SLEEPABLE)
+		sx_destroy(&rm->rm_lock_sx);
+	else
+		mtx_destroy(&rm->rm_lock_mtx);
 	lock_destroy(&rm->lock_object);
 }
 
@@ -222,7 +231,10 @@ int
 rm_wowned(struct rmlock *rm)
 {
 
-	return (mtx_owned(&rm->rm_lock));
+	if (rm->lock_object.lo_flags & RM_SLEEPABLE)
+		return (sx_xlocked(&rm->rm_lock_sx));
+	else
+		return (mtx_owned(&rm->rm_lock_mtx));
 }
 
 void
@@ -241,8 +253,8 @@ rm_sysinit_flags(void *arg)
 	rm_init_flags(args->ra_rm, args->ra_desc, args->ra_opts);
 }
 
-static void
-_rm_rlock_hard(struct rmlock *rm, struct rm_priotracker *tracker)
+static int
+_rm_rlock_hard(struct rmlock *rm, struct rm_priotracker *tracker, int trylock)
 {
 	struct pcpu *pc;
 	struct rm_queue *queue;
@@ -252,9 +264,9 @@ _rm_rlock_hard(struct rmlock *rm, struct rm_priotracker *tracker)
 	pc = pcpu_find(curcpu);
 
 	/* Check if we just need to do a proper critical_exit. */
-	if (0 == rm->rm_noreadtoken) {
+	if (!(pc->pc_cpumask & rm->rm_writecpus)) {
 		critical_exit();
-		return;
+		return (1);
 	}
 
 	/* Remove our tracker from the per-cpu list. */
@@ -265,7 +277,7 @@ _rm_rlock_hard(struct rmlock *rm, struct rm_priotracker *tracker)
 		/* Just add back tracker - we hold the lock. */
 		rm_tracker_add(pc, tracker);
 		critical_exit();
-		return;
+		return (1);
 	}
 
 	/*
@@ -289,7 +301,7 @@ _rm_rlock_hard(struct rmlock *rm, struct rm_priotracker *tracker)
 				mtx_unlock_spin(&rm_spinlock);
 				rm_tracker_add(pc, tracker);
 				critical_exit();
-				return;
+				return (1);
 			}
 		}
 	}
@@ -297,20 +309,38 @@ _rm_rlock_hard(struct rmlock *rm, struct rm_priotracker *tracker)
 	sched_unpin();
 	critical_exit();
 
-	mtx_lock(&rm->rm_lock);
-	rm->rm_noreadtoken = 0;
-	critical_enter();
+	if (trylock) {
+		if (rm->lock_object.lo_flags & RM_SLEEPABLE) {
+			if (!sx_try_xlock(&rm->rm_lock_sx))
+				return (0);
+		} else {
+			if (!mtx_trylock(&rm->rm_lock_mtx))
+				return (0);
+		}
+	} else {
+		if (rm->lock_object.lo_flags & RM_SLEEPABLE)
+			sx_xlock(&rm->rm_lock_sx);
+		else
+			mtx_lock(&rm->rm_lock_mtx);
+	}
 
+	critical_enter();
 	pc = pcpu_find(curcpu);
+	rm->rm_writecpus &= ~pc->pc_cpumask;
 	rm_tracker_add(pc, tracker);
 	sched_pin();
 	critical_exit();
 
-	mtx_unlock(&rm->rm_lock);
+	if (rm->lock_object.lo_flags & RM_SLEEPABLE)
+		sx_xunlock(&rm->rm_lock_sx);
+	else
+		mtx_unlock(&rm->rm_lock_mtx);
+
+	return (1);
 }
 
-void
-_rm_rlock(struct rmlock *rm, struct rm_priotracker *tracker)
+int
+_rm_rlock(struct rmlock *rm, struct rm_priotracker *tracker, int trylock)
 {
 	struct thread *td = curthread;
 	struct pcpu *pc;
@@ -337,11 +367,11 @@ _rm_rlock(struct rmlock *rm, struct rm_priotracker *tracker)
 	 * Fast path to combine two common conditions into a single
 	 * conditional jump.
 	 */
-	if (0 == (td->td_owepreempt | rm->rm_noreadtoken))
-		return;
+	if (0 == (td->td_owepreempt | (rm->rm_writecpus & pc->pc_cpumask)))
+		return (1);
 
 	/* We do not have a read token and need to acquire one. */
-	_rm_rlock_hard(rm, tracker);
+	return _rm_rlock_hard(rm, tracker, trylock);
 }
 
 static void
@@ -400,20 +430,26 @@ _rm_wlock(struct rmlock *rm)
 {
 	struct rm_priotracker *prio;
 	struct turnstile *ts;
+	cpumask_t readcpus;
 
-	mtx_lock(&rm->rm_lock);
+	if (rm->lock_object.lo_flags & RM_SLEEPABLE)
+		sx_xlock(&rm->rm_lock_sx);
+	else
+		mtx_lock(&rm->rm_lock_mtx);
 
-	if (rm->rm_noreadtoken == 0) {
+	if (rm->rm_writecpus != all_cpus) {
 		/* Get all read tokens back */
 
-		rm->rm_noreadtoken = 1;
+		readcpus = all_cpus & (all_cpus & ~rm->rm_writecpus);
+		rm->rm_writecpus = all_cpus;
 
 		/*
-		 * Assumes rm->rm_noreadtoken update is visible on other CPUs
+		 * Assumes rm->rm_writecpus update is visible on other CPUs
 		 * before rm_cleanIPI is called.
 		 */
 #ifdef SMP
-		smp_rendezvous(smp_no_rendevous_barrier,
+		smp_rendezvous_cpus(readcpus,
+		    smp_no_rendevous_barrier,
 		    rm_cleanIPI,
 		    smp_no_rendevous_barrier,
 		    rm);
@@ -439,7 +475,10 @@ void
 _rm_wunlock(struct rmlock *rm)
 {
 
-	mtx_unlock(&rm->rm_lock);
+	if (rm->lock_object.lo_flags & RM_SLEEPABLE)
+		sx_xunlock(&rm->rm_lock_sx);
+	else
+		mtx_unlock(&rm->rm_lock_mtx);
 }
 
 #ifdef LOCK_DEBUG
@@ -454,7 +493,11 @@ void _rm_wlock_debug(struct rmlock *rm, const char *file, int line)
 
 	LOCK_LOG_LOCK("RMWLOCK", &rm->lock_object, 0, 0, file, line);
 
-	WITNESS_LOCK(&rm->lock_object, LOP_EXCLUSIVE, file, line);
+	if (rm->lock_object.lo_flags & RM_SLEEPABLE)
+		WITNESS_LOCK(&rm->rm_lock_sx.lock_object, LOP_EXCLUSIVE,
+		    file, line);	
+	else
+		WITNESS_LOCK(&rm->lock_object, LOP_EXCLUSIVE, file, line);
 
 	curthread->td_locks++;
 
@@ -465,25 +508,35 @@ _rm_wunlock_debug(struct rmlock *rm, const char *file, int line)
 {
 
 	curthread->td_locks--;
-	WITNESS_UNLOCK(&rm->lock_object, LOP_EXCLUSIVE, file, line);
+	if (rm->lock_object.lo_flags & RM_SLEEPABLE)
+		WITNESS_UNLOCK(&rm->rm_lock_sx.lock_object, LOP_EXCLUSIVE,
+		    file, line);
+	else
+		WITNESS_UNLOCK(&rm->lock_object, LOP_EXCLUSIVE, file, line);
 	LOCK_LOG_LOCK("RMWUNLOCK", &rm->lock_object, 0, 0, file, line);
 	_rm_wunlock(rm);
 }
 
-void
+int
 _rm_rlock_debug(struct rmlock *rm, struct rm_priotracker *tracker,
-    const char *file, int line)
+    int trylock, const char *file, int line)
 {
-
+	if (!trylock && (rm->lock_object.lo_flags & RM_SLEEPABLE))
+		WITNESS_CHECKORDER(&rm->rm_lock_sx.lock_object, LOP_NEWORDER,
+		    file, line, NULL);
 	WITNESS_CHECKORDER(&rm->lock_object, LOP_NEWORDER, file, line, NULL);
 
-	_rm_rlock(rm, tracker);
+	if (_rm_rlock(rm, tracker, trylock)) {
+		LOCK_LOG_LOCK("RMRLOCK", &rm->lock_object, 0, 0, file, line);
 
-	LOCK_LOG_LOCK("RMRLOCK", &rm->lock_object, 0, 0, file, line);
+		WITNESS_LOCK(&rm->lock_object, 0, file, line);
 
-	WITNESS_LOCK(&rm->lock_object, 0, file, line);
+		curthread->td_locks++;
 
-	curthread->td_locks++;
+		return (1);
+	}
+
+	return (0);
 }
 
 void
@@ -517,12 +570,12 @@ _rm_wunlock_debug(struct rmlock *rm, const char *file, int line)
 	_rm_wunlock(rm);
 }
 
-void
+int
 _rm_rlock_debug(struct rmlock *rm, struct rm_priotracker *tracker,
-    const char *file, int line)
+    int trylock, const char *file, int line)
 {
 
-	_rm_rlock(rm, tracker);
+	return _rm_rlock(rm, tracker, trylock);
 }
 
 void
diff --git a/sys/sys/_rmlock.h b/sys/sys/_rmlock.h
index e5c68d5..75a159c 100644
--- a/sys/sys/_rmlock.h
+++ b/sys/sys/_rmlock.h
@@ -45,11 +45,15 @@ LIST_HEAD(rmpriolist,rm_priotracker);
 
 struct rmlock {
 	struct lock_object lock_object; 
-	volatile int 	rm_noreadtoken;
+	volatile cpumask_t rm_writecpus;
 	LIST_HEAD(,rm_priotracker) rm_activeReaders;
-	struct mtx	rm_lock;
-
+	union {
+		struct mtx _rm_lock_mtx;
+		struct sx _rm_lock_sx;
+	} _rm_lock;
 };
+#define	rm_lock_mtx	_rm_lock._rm_lock_mtx
+#define	rm_lock_sx	_rm_lock._rm_lock_sx
 
 struct rm_priotracker {
 	struct rm_queue rmp_cpuQueue; /* Must be first */
diff --git a/sys/sys/rmlock.h b/sys/sys/rmlock.h
index 9766f67..ef5776b 100644
--- a/sys/sys/rmlock.h
+++ b/sys/sys/rmlock.h
@@ -33,6 +33,7 @@
 #define _SYS_RMLOCK_H_
 
 #include <sys/mutex.h>
+#include <sys/sx.h>
 #include <sys/_lock.h>
 #include <sys/_rmlock.h>
 
@@ -43,6 +44,7 @@
  */
 #define	RM_NOWITNESS	0x00000001
 #define	RM_RECURSE	0x00000002
+#define	RM_SLEEPABLE	0x00000004
 
 void	rm_init(struct rmlock *rm, const char *name);
 void	rm_init_flags(struct rmlock *rm, const char *name, int opts);
@@ -53,14 +55,15 @@ void	rm_sysinit_flags(void *arg);
 
 void	_rm_wlock_debug(struct rmlock *rm, const char *file, int line);
 void	_rm_wunlock_debug(struct rmlock *rm, const char *file, int line);
-void	_rm_rlock_debug(struct rmlock *rm, struct rm_priotracker *tracker,
-	    const char *file, int line);
+int	_rm_rlock_debug(struct rmlock *rm, struct rm_priotracker *tracker,
+	    int trylock, const char *file, int line);
 void	_rm_runlock_debug(struct rmlock *rm,  struct rm_priotracker *tracker,
 	    const char *file, int line);
 
 void	_rm_wlock(struct rmlock *rm);
 void	_rm_wunlock(struct rmlock *rm);
-void	_rm_rlock(struct rmlock *rm, struct rm_priotracker *tracker);
+int	_rm_rlock(struct rmlock *rm, struct rm_priotracker *tracker,
+	    int trylock);
 void	_rm_runlock(struct rmlock *rm,  struct rm_priotracker *tracker);
 
 /*
@@ -74,14 +77,17 @@ void	_rm_runlock(struct rmlock *rm,  struct rm_priotracker *tracker);
 #define	rm_wlock(rm)	_rm_wlock_debug((rm), LOCK_FILE, LOCK_LINE)
 #define	rm_wunlock(rm)	_rm_wunlock_debug((rm), LOCK_FILE, LOCK_LINE)
 #define	rm_rlock(rm,tracker)  \
-    _rm_rlock_debug((rm),(tracker), LOCK_FILE, LOCK_LINE )
+    ((void)_rm_rlock_debug((rm),(tracker), 0, LOCK_FILE, LOCK_LINE ))
+#define	rm_try_rlock(rm,tracker)  \
+    _rm_rlock_debug((rm),(tracker), 1, LOCK_FILE, LOCK_LINE )
 #define	rm_runlock(rm,tracker)	\
     _rm_runlock_debug((rm), (tracker), LOCK_FILE, LOCK_LINE )
 #else
-#define	rm_wlock(rm)		_rm_wlock((rm))
-#define	rm_wunlock(rm)		_rm_wunlock((rm))
-#define	rm_rlock(rm,tracker)   	_rm_rlock((rm),(tracker))
-#define	rm_runlock(rm,tracker)	_rm_runlock((rm), (tracker))
+#define	rm_wlock(rm)			_rm_wlock((rm))
+#define	rm_wunlock(rm)			_rm_wunlock((rm))
+#define	rm_rlock(rm,tracker)		((void)_rm_rlock((rm),(tracker), 0))
+#define	rm_try_rlock(rm,tracker)	_rm_rlock((rm),(tracker), 1)
+#define	rm_runlock(rm,tracker)		_rm_runlock((rm), (tracker))
 #endif
 
 struct rm_args {

--Boundary-00=_M/adMKo7R9HbyBw--



Want to link to this message? Use this URL: <https://mail-archive.FreeBSD.org/cgi/mid.cgi?201008260205.32416.max>