Date:      Sun, 27 Jun 1999 13:24:01 -0700 (PDT)
From:      Matthew Dillon <dillon@apollo.backplane.com>
To:        freebsd-smp@FreeBSD.ORG
Subject:   high-efficiency SMP locks - submission for review
Message-ID:  <199906272024.NAA15634@apollo.backplane.com>

    I would like to know what the SMP gurus think of this code.  I have
    not posted the complete patch, but have included the meat of the code. 
    I am interested in comments relating to the implementation & performance
    aspects of the code.  I would also appreciate it if an inline assembly
    expert could look at my inline assembly.  I have tested it well and
    I believe it to be correct, but I don't write much inline assembly so...

    This code is designed to implement extremely low overhead SMP-compatible 
    locks.  It will probably be used instead of lockmgr() locks for the
    buffer cache, and I hope to use it as a basis to replace lockmgr() locks
    in other modules later on.  The same structure can be used to implement
    both spin locks and normal blocking locks, and can handle shared, 
    exclusive, and recursive-exclusive lock types.  The critical path has 
    been optimized down to 20 instructions or so ( compared to the hundreds
    in lockmgr() ).
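
    To make the intended usage concrete, here is a minimal sketch of how a
    subsystem might embed one of these locks.  This is an illustration only,
    not part of the patch -- the 'mysoftc' structure and routines below are
    made up:

/*
 * Illustration only ( not part of the patch ).  A hypothetical subsystem
 * embeds a qlock, takes shared locks for readers and a blocking exclusive
 * lock for updates.
 */
#include <sys/param.h>
#include <sys/proc.h>
#include <sys/qlock.h>

struct mysoftc {
	struct qlock	sc_lock;
	int		sc_data;
};

static void
mysoftc_init(struct mysoftc *sc)
{
	qlock_init(&sc->sc_lock);
}

static int
mysoftc_read(struct mysoftc *sc)
{
	int v;

	qlock_rd(&sc->sc_lock);		/* shared lock, blocks until granted */
	v = sc->sc_data;
	qunlock_rd(&sc->sc_lock);
	return(v);
}

static void
mysoftc_update(struct mysoftc *sc, int v)
{
	qlock_wr(&sc->sc_lock);		/* exclusive lock, blocks until granted */
	sc->sc_data = v;
	qunlock_wr(&sc->sc_lock);
}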

    As many of you may already know, lockmgr() locks are extremely expensive
    and require interrupt synchronization for their internal simplelocks to
    operate properly in a mixed process/interrupt environment.  The result is
    a horrendous amount of overhead for very little gain.  I am attempting
    to come up with a general locking module that can eventually replace
    lockmgr() locks.
				
						-Matt

( for /usr/src/sys/i386/include/atomic.h )

	atomic_cmpex() returns oldv on success; on failure it returns the
	current ( differing ) contents of *pint.


static __inline int
atomic_cmpex(volatile int *pint, int oldv, int newv)
{
        /*
         * If *pint == oldv, atomically replace it with newv.  cmpxchgl
         * leaves the prior contents of *pint in %eax, so the caller gets
         * oldv back on success and the differing value on failure.  The
         * "memory" and "cc" clobbers keep the compiler from caching memory
         * contents or relying on condition codes across the exchange.
         */
        __asm __volatile ("/* %0 %1 */; lock; cmpxchgl %2,(%3)"
                          : "=a" (oldv)
                          : "a" (oldv), "r" (newv), "r" (pint)
                          : "memory", "cc"
                        );
        return(oldv);
}
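
	The usual way to use atomic_cmpex() is in a retry loop: read the
	current value, compute the new value from it, and retry if the
	compare-exchange fails because another cpu modified the word in
	between.  As an illustration only ( not part of the patch ), a
	hypothetical atomic add built on the primitive:

static __inline void
atomic_add_example(volatile int *p, int n)
{
	int v;

	/*
	 * Illustration only.  Retry until the compare-exchange succeeds,
	 * i.e. until *p is still equal to the value we read.
	 */
	do {
		v = *p;
	} while (atomic_cmpex(p, v, v + n) != v);
}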


/*
 * SYS/QLOCK.H
 *
 * (c)Copyright Matthew Dillon, 1999.  BSD copyright /usr/src/COPYRIGHT is
 * to apply with the exception that the author's name, "Matthew Dillon", is to
 * replace the "Regents of the University of California".
 *
 * Implement high efficiency SMP-safe shared/exclusive locks with recursion
 * capabilities.
 *
 * See kern/kern_qlock.c for a detailed explanation
 *
 * $Id: Exp $
 */

#ifndef	_QLOCK_H_
#define	_QLOCK_H_

#include <machine/atomic.h>

struct proc;

/*
 * qlock structure
 *	
 *	count
 *		contains lock count.  Positive numbers indicate shared 
 *		references, negative numbers indicate exclusive ( and 
 *		possibly recursive ) references.  0 indicates that the
 *		lock is not held at all.
 *
 *	waiting
 *		If non-zero, indicates that someone is waiting for this lock.
 *		Processes waiting for a lock are woken up when the lock count
 *		passes through 0.
 *
 *	holder
 *		If an exclusive lock is being held, holder contains the process
 *		pointer of the holder.  Otherwise holder contains garbage
 *		(but may be useful in crash dumps).  If lock debugging is 
 *		turned on and shared locks are being held, holder contains 
 *		the last process to obtain the shared lock.
 *
 *		The holder field is not updated atomically, but this does
 *		not matter because it is only tested if an exclusive lock
 *		is already held.  We go through a little magic to deal with
 *		the one race that occurs in qlock_try_mwr().
 */

struct qlock {
	volatile int count;	/* lock refs: neg=exclusive, pos=shared */
	int	waiting;	/* process(es) are waiting on lock	*/
	struct proc *holder;	/* holder of an exclusive lock		*/
};

#define QLK_SPIN	0
#define QLK_SWITCH	1

#ifdef KERNEL

/*
 * These functions are internal to the lock implementation and should only
 * be called via the inlines in this header file.
 */

extern void	qlock_init(struct qlock *lk);
extern void	_qlock_rd(struct qlock *lk);
extern void	_qlock_wr(struct qlock *lk);
extern void	_qlock_mwr(struct qlock *lk);
extern int	_qlock_sleepfail_rd(struct qlock *lk, char *wmesg, int catch, int timo);
extern int	_qlock_sleepfail_wr(struct qlock *lk, char *wmesg, int catch, int timo);
extern void	_qlock_upgrade(struct qlock *lk);
extern void	_qlock_ovf_panic(struct qlock *lk);
extern void	_qlock_op_panic(struct qlock *lk);
extern void	_qunlock_rd_panic(struct qlock *lk);
extern void	_qunlock_wr_panic(struct qlock *lk);

/*
 * Optimized inlined functions for general use
 */

/*
 *	qlock_try_rd
 *	qlock_try_wr
 *	qlock_try_mwr
 *
 *	Attempt to obtain a shared lock, exclusive lock, or recursive-capable
 *	exclusive lock.  0 is returned on success, EBUSY on failure.  The loop
 *	exists only to manage SMP collisions; these routines do not block.
 *
 *	Note that qlock_*wr() needs to deal with the SMP race between the
 *	count going from 0 to -1 and the lock holder getting set.  It does
 *	this by running the lock through an intermediate stage that blocks
 *	other attempts to obtain exclusive locks until the holder can be set.
 */

static __inline int
qlock_try_rd(struct qlock *lk)
{
	int n;

	while ((n = lk->count) >= 0) {
#ifdef INVARIANTS
		if (n == 0x7FFFFFFF)
			_qlock_ovf_panic(lk);
#endif
		if (atomic_cmpex(&lk->count, n, n + 1) == n) {
#ifdef DEBUG_LOCKS
			lk->holder = curproc;
#endif
			return(0);
		}
	}
	return(EBUSY);
}

static __inline int
qlock_try_wr(struct qlock *lk)
{
	int n;

	while ((n = lk->count) == 0) {
		if (atomic_cmpex(&lk->count, 0, -(int)0x7FFFFFFF) == 0) {
			lk->holder = curproc;
			lk->count = -1;
			return(0);
		}
	}
	return(EBUSY);
}

static __inline int
qlock_try_mwr(struct qlock *lk)
{
	int n;

	while ((n = lk->count) <= 0) {
		if (n == 0) {
			if (atomic_cmpex(&lk->count, n, -(int)0x7FFFFFFF) == n) {
				lk->holder = curproc;
				lk->count = -1;
				return(0);
			}
			continue;
		}
		if (n == -(int)0x7FFFFFFF)
			continue;
		if (lk->holder != curproc)
			break;
#ifdef INVARIANTS
		if (n == -(int)0x7FFFFFFE)
			_qlock_ovf_panic(lk);
#endif
		if (atomic_cmpex(&lk->count, n, n - 1) == n)
			return(0);
	}
	return(EBUSY);
}

/*
 *	qlock_spin_rd
 *	qlock_spin_wr
 *	qlock_spin_mwr
 *
 *	Obtain a shared lock, exclusive lock, or recursive-capable
 *	exclusive lock.  These calls spin until they can get the lock.
 */

static __inline void
qlock_spin_rd(struct qlock *lk)
{
	while (qlock_try_rd(lk) != 0)
		;
}

static __inline void
qlock_spin_wr(struct qlock *lk)
{
	while (qlock_try_wr(lk) != 0)
		;
}

static __inline void
qlock_spin_mwr(struct qlock *lk)
{
	while (qlock_try_mwr(lk) != 0)
		;
}

/*
 *	qlock_require_rd
 *	qlock_require_wr
 *	qlock_require_mwr
 *
 *	Obtain a shared lock, exclusive lock, or recursive-capable
 *	exclusive lock.  We expect to be able to obtain the lock without
 *	having to block, and panic if we cannot.
 */

static __inline void
qlock_require_rd(struct qlock *lk)
{
	if (qlock_try_rd(lk) != 0)
		panic("qlock: failed to obtain shared lock %p", lk);
}

static __inline void
qlock_require_wr(struct qlock *lk)
{
	if (qlock_try_wr(lk) != 0)
		panic("qlock: failed to obtain exclusive lock %p", lk);
}

static __inline void
qlock_require_mwr(struct qlock *lk)
{
	if (qlock_try_mwr(lk) != 0)
		panic("qlock: failed to obtain m-exclusive lock %p", lk);
}

/*
 *	qlock_rd
 *	qlock_wr
 *	qlock_mwr
 *
 *	Obtain a shared lock, exclusive lock, or recursive-capable
 *	exclusive lock.  These routines will block until they get
 *	the requested lock.
 */

static __inline void
qlock_rd(struct qlock *lk)
{
	if (qlock_try_rd(lk) != 0)
		_qlock_rd(lk);
}

static __inline void
qlock_wr(struct qlock *lk)
{
	if (qlock_try_wr(lk) != 0)
		_qlock_wr(lk);
}


static __inline void
qlock_mwr(struct qlock *lk)
{
	if (qlock_try_mwr(lk) != 0)
		_qlock_mwr(lk);
}

/*
 *	qlock_sleepfail_rd
 *	qlock_sleepfail_wr
 *
 *	Obtain a shared lock or an exclusive lock and include options for
 *	catching signals and timeouts.  Note that we do not support
 *	recursive exclusive locks (yet).
 *	
 *	These routines will block until they get the requested lock.
 *	0 is returned if the lock was obtained, and an appropriate 
 *	error is returned otherwise (similar to lockmgr() locks).
 */

static __inline int
qlock_sleepfail_rd(struct qlock *lk, char *wmesg, int catch, int timo)
{
	if (qlock_try_rd(lk) != 0)
		return(_qlock_sleepfail_rd(lk, wmesg, catch, timo));
	return(0);
}

static __inline int
qlock_sleepfail_wr(struct qlock *lk, char *wmesg, int catch, int timo)
{
	if (qlock_try_wr(lk) != 0)
		return(_qlock_sleepfail_wr(lk, wmesg, catch, timo));
	return(0);
}

/*
 *	qunlock_rd
 *	qunlock_wr
 *	qunlock
 *
 *	Release a shared or exclusive lock.  The qunlock() function can release
 *	either type of lock while the qunlock_rd/wr functions can only release
 *	a specific type of lock.
 *
 *	Note that we do not bother clearing holder when the count transitions
 *	to 0.  If we did, we would have to go through the special count
 *	state to avoid SMP races with holder.
 *
 *	These routines do not block.
 */


static __inline void
qunlock_rd(struct qlock *lk)
{
	for (;;) {
		int n;

		if ((n = lk->count) <= 0)
			_qunlock_rd_panic(lk);
		if (atomic_cmpex(&lk->count, n, n - 1) == n) {
			if (n == 1 && lk->waiting) {
				lk->waiting = 0;
				wakeup(lk);
			}
			break;
		}
	}
}

static __inline void
qunlock_wr(struct qlock *lk)
{
	for (;;) {
		int n;

		if ((n = lk->count) >= 0)
			_qunlock_wr_panic(lk);
		if (atomic_cmpex(&lk->count, n, n + 1) == n) {
			if (n == -1 && lk->waiting) {
				lk->waiting = 0;
				wakeup(lk);
			}
			break;
		}
	}
}

static __inline void
qunlock(struct qlock *lk)
{
	for (;;) {
		int n;
		int xn;

		if ((n = lk->count) == 0)
			_qunlock_wr_panic(lk);
		if (n < 0)
			xn = n + 1;
		else
			xn = n - 1;
		if (atomic_cmpex(&lk->count, n, xn) == n) {
			if (xn == 0 && lk->waiting) {
				lk->waiting = 0;
				wakeup(lk);
			}
			break;
		}
	}
}

/*
 *	qlockfree:
 *
 *	Free a lock.  At the moment all we need to do is check that nobody
 *	is holding the lock, and panic if someone is.
 */

static __inline void
qlockfree(struct qlock *lk)
{
	if (lk->count != 0)
		panic("freed lock %p is locked", lk);
}

/*
 *	qlockabscount:
 *
 *	Return the current lock count as a positive value.  The number of
 *	held shared or exclusive locks is returned.  A value of 0 indicates
 *	that no locks are being held.
 */

static __inline int
qlockabscount(struct qlock *lk)
{
	if (lk->count < 0)
		return(-lk->count);
	else
		return(lk->count);
}

/*
 *	qlockcount:
 *
 *	Return the current lock count.  A positive value indicates that N
 *	shared locks are being held.  A negative value indicates that -N
 *	exclusive locks are being held (N can only be < -1 if recursive
 *	exclusive locks are being held).  A value of 0 indicates that no
 *	locks are being held.
 */

static __inline int
qlockcount(struct qlock *lk)
{
	return(lk->count);
}

/*
 *	qlock_try_upgrade	shared -> exclusive
 *	qlock_upgrade		shared -> exclusive
 *	qlock_downgrade		exclusive -> shared
 *
 *	Upgrade or downgrade an existing lock.  Recursion is NOT supported
 *	for either.  Also note that in order to avoid odd deadlock situations,
 *	qlock_upgrade will release the currently held shared lock prior to
 *	attempting to obtain the new exclusive lock.  qlock_try_upgrade()
 *	on the other hand only succeeds if it is able to upgrade the lock
 *	atomically.
 *
 *	When upgrading a lock, we have to deal with a race between the
 *	lock holder field and the count going negative.  We do this by
 *	staging the count through a special value.
 */

static __inline int
qlock_try_upgrade(struct qlock *lk)
{
	if (lk->count == 1) {
		if (atomic_cmpex(&lk->count, 1, -(int)0x7FFFFFFF) == 1) {
			lk->holder = curproc;
			lk->count = -1;
			return(0);
		}
	}
	return(EBUSY);
}

static __inline void
qlock_upgrade(struct qlock *lk)
{
	if (qlock_try_upgrade(lk) == EBUSY)
		_qlock_upgrade(lk);
}

static __inline void
qlock_downgrade(struct qlock *lk)
{
	int n;

	if (
	    (n = lk->count) != -1 || 
	    lk->holder != curproc ||
	    atomic_cmpex(&lk->count, n, 1) != n
	) {
		_qlock_op_panic(lk);
	}
	if (lk->waiting) {
		lk->waiting = 0;
		wakeup(lk);
	}
}

#endif

#endif /* !_QLOCK_H_ */



/*
 * KERN/KERN_QLOCK.C
 *
 * (c)Copyright Matthew Dillon, 1999.  BSD copyright /usr/src/COPYRIGHT is
 * to apply with the exception that the author's name, "Matthew Dillon", is to
 * replace the "Regents of the University of California".
 *
 * Provides highly efficient SMP-capable shared & exclusive locks.  Most of
 * the meat of the code is in sys/qlock.h.  These locks are designed to
 * operate efficiently in either an SMP or non-SMP environment.  A minimum
 * number of instructions and conditions are utilized to implement the locks.
 *
 * Please note that while we do not do it here, it is possible to generate
 * inlines that support single-entry-point calls with an internal switch.
 * If the lock type is passed to such an inline as a constant, the compiler
 * is able to optimize each instance down to the single matching case in the
 * switch, producing optimal code ( a sketch follows the includes below ).
 *
 * These locks are not meant to be complex and additional functionality should
 * not be added to them if it would affect the efficiency of existing calls.
 * Error returns are close to what lockmgr() gives us.  We use hybrid inline
 * functions to make the common case as fast and tight as possible.
 *
 * "recursion" is defined as being able to obtain multiple locks of the same
 * type from the same process context.  Note that qlocks allow only shared or
 * only exclusive locks to be associated with the structure at any given 
 * moment, using negative values to count exclusive locks and positive values
 * to count shared locks.
 *
 * "blocks" is defined to mean that the call may block.
 *
 * Two types of locks are supported: shared and exclusive.  Shared locks
 * usually support recursion.  Exclusive locks can optionally support
 * recursion.  The more esoteric functions (upgrades, downgrades, ...)
 * generally cannot support recursion.  The general unlock code always 
 * supports recursion.
 *
 *				call	recurs	blocks	type
 *				-----	-----	-----	-----
 *
 *	qlock_try_rd		inline	yes	no	shared
 *	qlock_try_wr		inline	no	no	exclusive
 *	qlock_try_mwr		inline	yes	no	exclusive
 *
 *	qlock_spin_rd		inline	yes	spins	shared
 *	qlock_spin_wr		inline	no	spins	exclusive
 *	qlock_spin_mwr		inline	yes	spins	exclusive
 *
 *	qlock_require_rd	inline	yes	no(1)	shared
 *	qlock_require_wr	inline	no	no(1)	exclusive
 *	qlock_require_mwr	inline	yes	no(1)	exclusive
 *
 *		note (1): guarantees lock is obtained non-blocking,
 *		panics if it cannot be obtained.
 *
 *	qlock_rd		hybrid	yes	yes	shared
 *	qlock_wr		hybrid	no	yes	exclusive
 *	qlock_mwr		hybrid	yes	yes	exclusive
 *	
 *	qlock_sleepfail_rd	hybrid	no	yes(2)	shared
 *	qlock_sleepfail_wr	hybrid	no	yes(2)	exclusive
 *
 *		note (2): if the call blocks, it does not attempt to
 *		retry the lock prior to returning.
 *
 *	qunlock_rd		inline	yes	no	shared
 *	qunlock_wr		inline	yes	no	exclusive
 *	qunlock			inline	yes	no	shared or exclusive
 *
 *	qlock_upgrade		hybrid	no(3)	yes	shared->exclusive
 *	qlock_downgrade		inline	no(3)	no	exclusive->shared
 *
 *		note (3): NOTE! these routines cannot operate with
 *		recursive locks, and the lock may be lost for a period
 *		within a qlock_upgrade() call.
 *
 *	qlockfree		inline
 *	qlockabscount		inline
 */

#include <sys/param.h>
#include <sys/proc.h>
#include <sys/systm.h>
#include <sys/qlock.h>
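
/*
 * Hypothetical sketch ( illustration only, not part of this submission ) of
 * the single-entry-point form described in the comment above.  When 'type'
 * is a compile-time constant at the call site the compiler folds the switch
 * down to the one matching case, so the generated code is the same as
 * calling the specific inline directly.
 */
#if 0

#define QLT_RD		0	/* shared */
#define QLT_WR		1	/* exclusive */
#define QLT_MWR		2	/* exclusive, recursion-capable */

static __inline int
qlock_try(struct qlock *lk, int type)
{
	switch(type) {
	case QLT_RD:
		return(qlock_try_rd(lk));
	case QLT_WR:
		return(qlock_try_wr(lk));
	default:
		return(qlock_try_mwr(lk));
	}
}

#endif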

/*
 *	qlock_init:
 *
 *	Initialize a qlock.  At the moment we need only zero the fields,
 *	but this may change in the future.
 */

void
qlock_init(struct qlock *lk)
{
	lk->count = 0;
	lk->waiting = 0;
	lk->holder = NULL;
}

#if 0

/*
 * Inlines are available for these, but sometimes it is more efficient to
 * call a subroutine to deal with it.
 */

void
qlock_call_rd(struct qlock *lk)
{
	qlock_rd(lk);
}

void
qlock_call_wr(struct qlock *lk)
{
	qlock_wr(lk);
}

#endif

/*
 * These core routines are only called from sys/qlock.h and should never be
 * called directly.  They typically implement the non-trivial cases for the
 * inlines.
 */

void
_qlock_rd(struct qlock *lk)
{
	for (;;) {
		asleep(lk, PRIBIO + 4, "qlkrd", 0);
		lk->waiting = 1;
		if (qlock_try_rd(lk) == 0)
			return;
		await(-1, -1);
	}
}

void
_qlock_wr(struct qlock *lk)
{
	for (;;) {
		asleep(lk, PRIBIO + 4, "qlkwr", 0);
		lk->waiting = 1;
		if (qlock_try_wr(lk) == 0)
			return;
		await(-1, -1);
	}
}

void
_qlock_mwr(struct qlock *lk)
{
	for (;;) {
		asleep(lk, PRIBIO + 4, "qlkwr", 0);
		lk->waiting = 1;
		if (qlock_try_mwr(lk) == 0)
			return;
		await(-1, -1);
	}
}

int 
_qlock_sleepfail_rd(struct qlock *lk, char *wmesg, int catch, int timo)
{
	int r = 0;

	asleep(lk, (PRIBIO + 4) | catch, wmesg, timo);
	lk->waiting = 1;
	if (qlock_try_rd(lk) != 0) {
		r = await(-1, -1);
		if (r == 0)
			r = ENOLCK;
	}
	return(r);
}   
 
int 
_qlock_sleepfail_wr(struct qlock *lk, char *wmesg, int catch, int timo)
{                       
	int r = 0;

	asleep(lk, (PRIBIO + 4) | catch, wmesg, timo); 
	lk->waiting = 1;
	if (qlock_try_wr(lk) != 0) {
		r = await(-1, -1);
		if (r == 0)
			r = ENOLCK;
	}
	return(r);
}

void
_qlock_upgrade(struct qlock *lk)
{
	/*
	 * First release the existing shared lock
	 */

	for (;;) {
		int n = lk->count;

		if (n <= 0)
			_qlock_op_panic(lk);
		if (atomic_cmpex(&lk->count, n, n - 1) == n)
			break;
	}

	/*
	 * Then obtain a new exclusive lock
	 */
	qlock_wr(lk);
}

/*
 * The panic functions collapse the code overhead into one place, reducing
 * the codespace wastage of the inlines.
 */

void
_qlock_ovf_panic(struct qlock *lk)
{
	panic("qlock_rd/wr: %p count overflow", lk);
}

void
_qlock_op_panic(struct qlock *lk)
{
	panic("qlock: %p illegal operation on lock state %d", lk, lk->count);
}

void
_qunlock_rd_panic(struct qlock *lk)
{
	panic("qunlock_rd: %p not holding shared lock", lk);
}

void
_qunlock_wr_panic(struct qlock *lk)
{
	panic("qunlock_wr: %p not holding exclusive lock", lk);
}


