From: Matthew Dillon <dillon@apollo.backplane.com>
Date: Sun, 27 Jun 1999 13:24:01 -0700 (PDT)
Message-Id: <199906272024.NAA15634@apollo.backplane.com>
To: freebsd-smp@FreeBSD.ORG
Subject: high-efficiency SMP locks - submission for review

    I would like to know what the SMP gurus think of this code.  I have
    not posted the complete patch, but have included the meat of the code.
    I am interested in comments relating to the implementation and
    performance aspects of the code.  I would also appreciate it if an
    inline assembly expert could look at my inline assembly.  I have
    tested it well and I believe it to be correct, but I don't write much
    inline assembly so...

    This code is designed to implement extremely low overhead
    SMP-compatible locks.  It will probably be used instead of lockmgr()
    locks for the buffer cache, and I hope to use it as a basis to replace
    lockmgr() locks in other modules later on.  The same structure can be
    used to implement both spin locks and normal blocking locks, and can
    handle shared, exclusive, and recursive-exclusive lock types.  The
    critical path has been optimized down to 20 instructions or so
    (compared to the hundreds in lockmgr()).

    As many of you may already know, lockmgr() locks are extremely
    expensive and require interrupt synchronization for their internal
    simplelocks to operate properly in a mixed process/interrupt
    environment.  The result is a horrendous amount of overhead for very
    little gain.  I am attempting to come up with a general locking module
    that can eventually replace lockmgr() locks.

                                        -Matt

    (for /usr/src/sys/i386/include/atomic.h)

    atomic_cmpex() returns oldv on success, something else on failure.

static __inline int
atomic_cmpex(volatile int *pint, int oldv, int newv)
{
    __asm __volatile (
        "/* %0 %1 */; lock; cmpxchgl %2,(%3)"
        : "=a" (oldv)
        : "a" (oldv), "r" (newv), "r" (pint)
    );
    return(oldv);
}
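
    To make the idiom concrete, here is a throwaway user-space sketch of
    the same compare-exchange retry loop the qlock inlines below are built
    on.  It is not part of the patch; GCC's __sync_val_compare_and_swap()
    builtin stands in for atomic_cmpex() (both return the old value on
    success), and the demo_* names are made up for illustration.

#include <errno.h>
#include <stdio.h>

static volatile int demo_count;         /* plays the role of lk->count */

/*
 * Add one shared reference, exactly like qlock_try_rd() minus the
 * overflow check: reload and retry whenever another cpu/thread wins
 * the compare-exchange race.
 */
static int
demo_try_ref(volatile int *pcount)
{
    int n;

    while ((n = *pcount) >= 0) {
        if (__sync_val_compare_and_swap(pcount, n, n + 1) == n)
            return(0);
    }
    return(EBUSY);                      /* exclusively held */
}

int
main(void)
{
    demo_try_ref(&demo_count);
    demo_try_ref(&demo_count);
    printf("count = %d\n", demo_count); /* prints 2 */
    return(0);
}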
/*
 * SYS/QLOCK.H
 *
 * (c)Copyright Matthew Dillon, 1999.  BSD copyright /usr/src/COPYRIGHT is
 * to apply with the exception that the author's name, "Matthew Dillon", is to
 * replace the "Regents of the University of California".
 *
 * Implement high efficiency SMP-safe shared/exclusive locks with recursion
 * capabilities.
 *
 * See kern/kern_qlock.c for a detailed explanation
 *
 * $Id: Exp $
 */

#ifndef _QLOCK_H_
#define _QLOCK_H_

#include

struct proc;

/*
 * qlock structure
 *
 *    count
 *        contains lock count.  Positive numbers indicate shared
 *        references, negative numbers indicate exclusive (and
 *        possibly recursive) references.  0 indicates that the
 *        lock is not held at all.
 *
 *    waiting
 *        If non-zero, indicates that someone is waiting for this lock.
 *        Processes waiting for a lock are woken up when the lock count
 *        passes through 0.
 *
 *    holder
 *        If an exclusive lock is being held, holder contains the process
 *        pointer of the holder.  Otherwise holder contains garbage
 *        (but may be useful in crash dumps).  If lock debugging is
 *        turned on and shared locks are being held, holder contains
 *        the last process to obtain the shared lock.
 *
 *    The holder field is not updated atomically, but this does
 *    not matter because it is only tested if an exclusive lock
 *    is already held.  We go through a little magic to deal with
 *    the one race that occurs in qlock_try_mwr().
 */
struct qlock {
    volatile int count;     /* lock refs: neg=exclusive, pos=shared */
    int waiting;            /* process(es) are waiting on lock */
    struct proc *holder;    /* holder of an exclusive lock */
};

#define QLK_SPIN    0
#define QLK_SWITCH  1

#ifdef KERNEL

/*
 * These functions may not be used outside this header file
 */
extern void qlock_init(struct qlock *lk);
extern void _qlock_rd(struct qlock *lk);
extern void _qlock_wr(struct qlock *lk);
extern void _qlock_mwr(struct qlock *lk);
extern int _qlock_sleepfail_rd(struct qlock *lk, char *wmesg, int catch, int timo);
extern int _qlock_sleepfail_wr(struct qlock *lk, char *wmesg, int catch, int timo);
extern void _qlock_upgrade(struct qlock *lk);
extern void _qlock_ovf_panic(struct qlock *lk);
extern void _qlock_op_panic(struct qlock *lk);
extern void _qunlock_rd_panic(struct qlock *lk);
extern void _qunlock_wr_panic(struct qlock *lk);

/*
 * Optimized inlined functions for general use
 */

/*
 * qlock_try_rd
 * qlock_try_wr
 * qlock_try_mwr
 *
 *    Attempt to obtain a shared lock, exclusive lock, or recursive-capable
 *    exclusive lock.  0 is returned on success, EBUSY on failure.  The loop
 *    exists only to manage SMP collisions; the routine does not block.
 *
 *    Note that qlock_*wr() needs to deal with the SMP race between the
 *    count going from 0 to -1 and the lock holder getting set.  It does
 *    this by running the lock through an intermediate stage that blocks
 *    other attempts to obtain exclusive locks until the holder can be set.
 */

static __inline int
qlock_try_rd(struct qlock *lk)
{
    int n;

    while ((n = lk->count) >= 0) {
#ifdef INVARIANTS
        if (n == 0x7FFFFFFF)
            _qlock_ovf_panic(lk);
#endif
        if (atomic_cmpex(&lk->count, n, n + 1) == n) {
#ifdef DEBUG_LOCKS
            lk->holder = curproc;
#endif
            return(0);
        }
    }
    return(EBUSY);
}

static __inline int
qlock_try_wr(struct qlock *lk)
{
    int n;

    while ((n = lk->count) == 0) {
        if (atomic_cmpex(&lk->count, 0, -(int)0x7FFFFFFF) == 0) {
            lk->holder = curproc;
            lk->count = -1;
            return(0);
        }
    }
    return(EBUSY);
}

static __inline int
qlock_try_mwr(struct qlock *lk)
{
    int n;

    while ((n = lk->count) <= 0) {
        if (n == 0) {
            if (atomic_cmpex(&lk->count, n, -(int)0x7FFFFFFF) == n) {
                lk->holder = curproc;
                lk->count = -1;
                return(0);
            }
            continue;
        }
        if (n == -(int)0x7FFFFFFF)
            continue;
        if (lk->holder != curproc)
            break;
#ifdef INVARIANTS
        if (n == -(int)0x7FFFFFFE)
            _qlock_ovf_panic(lk);
#endif
        if (atomic_cmpex(&lk->count, n, n - 1) == n)
            return(0);
    }
    return(EBUSY);
}

/*
 * qlock_spin_rd
 * qlock_spin_wr
 * qlock_spin_mwr
 *
 *    Obtain a shared lock, exclusive lock, or recursive-capable
 *    exclusive lock.  These calls spin until they can get the lock.
 */

static __inline void
qlock_spin_rd(struct qlock *lk)
{
    while (qlock_try_rd(lk) != 0)
        ;
}

static __inline void
qlock_spin_wr(struct qlock *lk)
{
    while (qlock_try_wr(lk) != 0)
        ;
}

static __inline void
qlock_spin_mwr(struct qlock *lk)
{
    while (qlock_try_mwr(lk) != 0)
        ;
}
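
/*
 * Hypothetical usage (not part of this header): the spin variants are
 * intended for very short critical sections in code that is not allowed
 * to block.  The demo_* names below are made up for illustration, and
 * qunlock_wr() is defined further down in this file, so the example is
 * shown out of order purely for readability.
 */
static struct qlock demo_stat_lock;     /* assumed qlock_init()ed at boot */
static int demo_stat_count;

static __inline void
demo_bump_stat(void)
{
    qlock_spin_wr(&demo_stat_lock);     /* spins, never sleeps */
    ++demo_stat_count;
    qunlock_wr(&demo_stat_lock);        /* releases and wakes any waiters */
}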

/*
 * qlock_require_rd
 * qlock_require_wr
 * qlock_require_mwr
 *
 *    Obtain a shared lock, exclusive lock, or recursive-capable
 *    exclusive lock.  We expect to be able to obtain the lock without
 *    having to block, and panic if we cannot.
 */

static __inline void
qlock_require_rd(struct qlock *lk)
{
    if (qlock_try_rd(lk) != 0)
        panic("qlock: failed to obtain shared lock %p", lk);
}

static __inline void
qlock_require_wr(struct qlock *lk)
{
    if (qlock_try_wr(lk) != 0)
        panic("qlock: failed to obtain exclusive lock %p", lk);
}

static __inline void
qlock_require_mwr(struct qlock *lk)
{
    if (qlock_try_mwr(lk) != 0)
        panic("qlock: failed to obtain m-exclusive lock %p", lk);
}

/*
 * qlock_rd
 * qlock_wr
 * qlock_mwr
 *
 *    Obtain a shared lock, exclusive lock, or recursive-capable
 *    exclusive lock.  These routines will block until they get
 *    the requested lock.
 */

static __inline void
qlock_rd(struct qlock *lk)
{
    if (qlock_try_rd(lk) != 0)
        _qlock_rd(lk);
}

static __inline void
qlock_wr(struct qlock *lk)
{
    if (qlock_try_wr(lk) != 0)
        _qlock_wr(lk);
}

static __inline void
qlock_mwr(struct qlock *lk)
{
    if (qlock_try_mwr(lk) != 0)
        _qlock_mwr(lk);
}

/*
 * qlock_sleepfail_rd
 * qlock_sleepfail_wr
 *
 *    Obtain a shared lock or an exclusive lock and include options for
 *    catching signals and timeouts.  Note that we do not support
 *    recursive exclusive locks (yet).
 *
 *    These routines will block until they get the requested lock.
 *    0 is returned if the lock was obtained, and an appropriate
 *    error is returned otherwise (similar to lockmgr() locks).
 */

static __inline int
qlock_sleepfail_rd(struct qlock *lk, char *wmesg, int catch, int timo)
{
    if (qlock_try_rd(lk) != 0)
        return(_qlock_sleepfail_rd(lk, wmesg, catch, timo));
    return(0);
}

static __inline int
qlock_sleepfail_wr(struct qlock *lk, char *wmesg, int catch, int timo)
{
    if (qlock_try_wr(lk) != 0)
        return(_qlock_sleepfail_wr(lk, wmesg, catch, timo));
    return(0);
}

/*
 * qunlock_rd
 * qunlock_wr
 * qunlock
 *
 *    Release a shared or exclusive lock.  The qunlock() function can release
 *    either type of lock while the qunlock_rd/wr functions can only release
 *    a specific type of lock.
 *
 *    Note that we do not bother clearing holder when the count transitions
 *    to 0.  If we were to do this, note that we would have to go through
 *    the special count state to avoid SMP races with holder.
 *
 *    These routines do not block.
 */

static __inline void
qunlock_rd(struct qlock *lk)
{
    for (;;) {
        int n;

        if ((n = lk->count) <= 0)
            _qunlock_rd_panic(lk);
        if (atomic_cmpex(&lk->count, n, n - 1) == n) {
            if (n == 1 && lk->waiting) {
                lk->waiting = 0;
                wakeup(lk);
            }
            break;
        }
    }
}

static __inline void
qunlock_wr(struct qlock *lk)
{
    for (;;) {
        int n;

        if ((n = lk->count) >= 0)
            _qunlock_wr_panic(lk);
        if (atomic_cmpex(&lk->count, n, n + 1) == n) {
            if (n == -1 && lk->waiting) {
                lk->waiting = 0;
                wakeup(lk);
            }
            break;
        }
    }
}

static __inline void
qunlock(struct qlock *lk)
{
    for (;;) {
        int n;
        int xn;

        if ((n = lk->count) == 0)
            _qunlock_wr_panic(lk);
        if (n < 0)
            xn = n + 1;
        else
            xn = n - 1;
        if (atomic_cmpex(&lk->count, n, xn) == n) {
            if (xn == 0 && lk->waiting) {
                lk->waiting = 0;
                wakeup(lk);
            }
            break;
        }
    }
}

/*
 * qlockfree:
 *
 *    Free a lock.  At the moment all we need to do is check that nobody
 *    is holding the lock, and panic if someone is.
 */

static __inline void
qlockfree(struct qlock *lk)
{
    if (lk->count != 0)
        panic("freed lock %p is locked", lk);
}

/*
 * qlockabscount:
 *
 *    Return the current lock count as a positive value.  The number of
 *    held shared or exclusive locks is returned.  A value of 0 indicates
 *    that no locks are being held.
 */

static __inline int
qlockabscount(struct qlock *lk)
{
    if (lk->count < 0)
        return(-lk->count);
    else
        return(lk->count);
}
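
/*
 * Hypothetical usage (not part of this header): a blocking reader/writer
 * pairing built from the inlines above.  Readers share the lock, the
 * updater excludes them.  struct demo_softc and its fields are made up
 * for illustration.
 */
struct demo_softc {
    struct qlock    sc_lock;            /* qlock_init()ed at attach time */
    int             sc_value;
};

static __inline int
demo_read(struct demo_softc *sc)
{
    int v;

    qlock_rd(&sc->sc_lock);             /* shared: many readers at once */
    v = sc->sc_value;
    qunlock_rd(&sc->sc_lock);
    return(v);
}

static __inline void
demo_update(struct demo_softc *sc, int v)
{
    qlock_wr(&sc->sc_lock);             /* exclusive: blocks until sole owner */
    sc->sc_value = v;
    qunlock_wr(&sc->sc_lock);
}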

/*
 * qlockcount:
 *
 *    Return the current lock count.  A positive value indicates that N
 *    shared locks are being held.  A negative value indicates that -N
 *    exclusive locks are being held (N can only be < -1 if recursive
 *    exclusive locks are being held).  A value of 0 indicates that no
 *    locks are being held.
 */

static __inline int
qlockcount(struct qlock *lk)
{
    return(lk->count);
}

/*
 * qlock_try_upgrade   shared -> exclusive
 * qlock_upgrade       shared -> exclusive
 * qlock_downgrade     exclusive -> shared
 *
 *    Upgrade or downgrade an existing lock.  Recursion is NOT supported
 *    for either.  Also note that in order to avoid odd deadlock situations,
 *    qlock_upgrade will release the currently held shared lock prior to
 *    attempting to obtain the new exclusive lock.  qlock_try_upgrade()
 *    on the other hand only succeeds if it is able to upgrade the lock
 *    atomically.
 *
 *    When upgrading a lock, we have to deal with a race between the
 *    lock holder field and the count going negative.  We do this by
 *    staging the count through a special value.
 */

static __inline int
qlock_try_upgrade(struct qlock *lk)
{
    if (lk->count == 1) {
        if (atomic_cmpex(&lk->count, 1, -(int)0x7FFFFFFF) == 1) {
            lk->holder = curproc;
            lk->count = -1;
            return(0);
        }
    }
    return(EBUSY);
}

static __inline void
qlock_upgrade(struct qlock *lk)
{
    if (qlock_try_upgrade(lk) == EBUSY)
        _qlock_upgrade(lk);
}

static __inline void
qlock_downgrade(struct qlock *lk)
{
    int n;

    if ((n = lk->count) != -1 ||
        lk->holder != curproc ||
        atomic_cmpex(&lk->count, n, 1) != n
    ) {
        _qlock_op_panic(lk);
    }
    if (lk->waiting) {
        lk->waiting = 0;
        wakeup(lk);
    }
}

#endif

#endif /* !_QLOCK_H_ */
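
    A note on the upgrade path: because qlock_upgrade() may drop the shared
    lock while it waits for the exclusive one (see note (3) in the table in
    kern_qlock.c below), any state examined under the shared lock has to be
    re-validated once the upgrade returns.  A hypothetical sketch, reusing
    the made-up demo_softc structure from the example above:

static __inline void
demo_set_flag(struct demo_softc *sc)
{
    qlock_rd(&sc->sc_lock);
    if (sc->sc_value != 0) {            /* already set, nothing to do */
        qunlock_rd(&sc->sc_lock);
        return;
    }
    if (qlock_try_upgrade(&sc->sc_lock) != 0) {
        /*
         * The atomic upgrade failed (other readers present).  The
         * blocking upgrade releases our shared lock first, so the
         * state must be re-checked once we own the exclusive lock.
         */
        qlock_upgrade(&sc->sc_lock);
        if (sc->sc_value != 0) {
            qunlock_wr(&sc->sc_lock);
            return;
        }
    }
    sc->sc_value = 1;                   /* exclusive, safe to modify */
    qunlock_wr(&sc->sc_lock);
}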

/*
 * KERN/KERN_QLOCK.C
 *
 * (c)Copyright Matthew Dillon, 1999.  BSD copyright /usr/src/COPYRIGHT is
 * to apply with the exception that the author's name, "Matthew Dillon", is to
 * replace the "Regents of the University of California".
 *
 * Provides highly efficient SMP-capable shared & exclusive locks.  Most of
 * the meat of the code is in sys/qlock.h.  These locks are designed to
 * operate efficiently in either an SMP or non-SMP environment.  A minimum
 * number of instructions and conditions are utilized to implement the locks.
 *
 * Please note that while we do not do it here, it is possible to generate
 * inlines that support single-entry-point calls with an internal switch.
 * If the lock type is passed to such an inline the compiler will be able
 * to optimize each instance to a single case statement in the switch,
 * producing optimal code.
 *
 * These locks are not meant to be complex and additional functionality should
 * not be added to them if it would affect the efficiency of existing calls.
 * Error returns are close to what lockmgr() gives us.  We use hybrid inline
 * functions to make the common case as fast and tight as possible.
 *
 * "recursion" is defined as being able to obtain multiple locks of the same
 * type from the same process context.  Note that qlocks allow only shared or
 * only exclusive locks to be associated with the structure at any given
 * moment, using negative values to count exclusive locks and positive values
 * to count shared locks.
 *
 * "blocks" is defined to mean that the call may block.
 *
 * Two types of locks are supported: shared and exclusive.  Shared locks
 * usually support recursion; exclusive locks can optionally support
 * recursion.  The more esoteric functions (upgrades, downgrades, ...)
 * generally cannot support recursion.  The general unlock code always
 * supports recursion.
 *
 *    call                         recurs   blocks   type
 *    -----                        ------   ------   -----
 *
 *    qlock_try_rd        inline   yes      no       shared
 *    qlock_try_wr        inline   no       no       exclusive
 *    qlock_try_mwr       inline   yes      no       exclusive
 *
 *    qlock_spin_rd       inline   yes      spins    shared
 *    qlock_spin_wr       inline   no       spins    exclusive
 *    qlock_spin_mwr      inline   yes      spins    exclusive
 *
 *    qlock_require_rd    inline   yes      no(1)    shared
 *    qlock_require_wr    inline   no       no(1)    exclusive
 *    qlock_require_mwr   inline   yes      no(1)    exclusive
 *
 *        note (1): guarantees lock is obtained non-blocking,
 *        panics if it cannot be obtained.
 *
 *    qlock_rd            hybrid   yes      yes      shared
 *    qlock_wr            hybrid   no       yes      exclusive
 *    qlock_mwr           hybrid   yes      yes      exclusive
 *
 *    qlock_sleepfail_rd  hybrid   no       yes(2)   shared
 *    qlock_sleepfail_wr  hybrid   no       yes(2)   exclusive
 *
 *        note (2): if the call blocks, it does not attempt to
 *        retry the lock prior to returning.
 *
 *    qunlock_rd          inline   yes      no       shared
 *    qunlock_wr          inline   yes      no       exclusive
 *    qunlock             inline   yes      no       shared or exclusive
 *
 *    qlock_upgrade       hybrid   no(3)    yes      shared->exclusive
 *    qlock_downgrade     inline   no(3)    no       exclusive->shared
 *
 *        note (3): NOTE! these routines cannot operate with
 *        recursive locks, and the lock may be lost for a period
 *        within a qlock_upgrade() call.
 *
 *    qlockfree           inline
 *    qlockabscount       inline
 */

#include
#include
#include
#include

/*
 * qlock_init:
 *
 *    Initialize a qlock.  At the moment we need only zero the fields,
 *    but this may change in the future.
 */

void
qlock_init(struct qlock *lk)
{
    lk->count = 0;
    lk->waiting = 0;
    lk->holder = NULL;
}

#if 0

/*
 * Inlines are available for these, but sometimes it is more efficient to
 * call a subroutine to deal with it.
 */

void
qlock_call_rd(struct qlock *lk)
{
    qlock_rd(lk);
}

void
qlock_call_wr(struct qlock *lk)
{
    qlock_wr(lk);
}

#endif

/*
 * These core routines are only called from sys/qlock.h and should never be
 * called directly.  They typically implement the non-trivial cases for the
 * inlines.
 */

void
_qlock_rd(struct qlock *lk)
{
    for (;;) {
        asleep(lk, PRIBIO + 4, "qlkrd", 0);
        lk->waiting = 1;
        if (qlock_try_rd(lk) == 0)
            return;
        await(-1, -1);
    }
}

void
_qlock_wr(struct qlock *lk)
{
    for (;;) {
        asleep(lk, PRIBIO + 4, "qlkwr", 0);
        lk->waiting = 1;
        if (qlock_try_wr(lk) == 0)
            return;
        await(-1, -1);
    }
}

void
_qlock_mwr(struct qlock *lk)
{
    for (;;) {
        asleep(lk, PRIBIO + 4, "qlkwr", 0);
        lk->waiting = 1;
        if (qlock_try_mwr(lk) == 0)
            return;
        await(-1, -1);
    }
}

int
_qlock_sleepfail_rd(struct qlock *lk, char *wmesg, int catch, int timo)
{
    int r = 0;

    asleep(lk, (PRIBIO + 4) | catch, wmesg, timo);
    lk->waiting = 1;
    if (qlock_try_rd(lk) != 0) {
        r = await(-1, -1);
        if (r == 0)
            r = ENOLCK;
    }
    return(r);
}

int
_qlock_sleepfail_wr(struct qlock *lk, char *wmesg, int catch, int timo)
{
    int r = 0;

    asleep(lk, (PRIBIO + 4) | catch, wmesg, timo);
    lk->waiting = 1;
    if (qlock_try_wr(lk) != 0) {
        r = await(-1, -1);
        if (r == 0)
            r = ENOLCK;
    }
    return(r);
}

void
_qlock_upgrade(struct qlock *lk)
{
    /*
     * First release the existing shared lock
     */
    for (;;) {
        int n = lk->count;

        if (n <= 0)
            _qlock_op_panic(lk);
        if (atomic_cmpex(&lk->count, n, n - 1) == n)
            break;
    }

    /*
     * Then obtain a new exclusive lock
     */
    qlock_wr(lk);
}
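
/*
 * Illustration only (not part of this file): the single-entry-point
 * inline mentioned in the comment at the top of this file.  Such an
 * inline would live in sys/qlock.h; when the lock type argument is a
 * compile-time constant the compiler folds the switch down to the one
 * case that is used, so the generated code matches a direct call to the
 * specific inline.  The QT_* constants are made up here.
 */
#define QT_RD   1
#define QT_WR   2
#define QT_MWR  3

static __inline int
qlock_try(struct qlock *lk, int type)
{
    switch (type) {
    case QT_RD:
        return(qlock_try_rd(lk));
    case QT_WR:
        return(qlock_try_wr(lk));
    case QT_MWR:
        return(qlock_try_mwr(lk));
    }
    return(EBUSY);                      /* unknown type */
}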

/*
 * The panic functions collapse the code overhead into one place, reducing
 * the codespace wastage of the inlines.
 */

void
_qlock_ovf_panic(struct qlock *lk)
{
    panic("qlock_rd/wr: %p count overflow", lk);
}

void
_qlock_op_panic(struct qlock *lk)
{
    panic("qlock: %p illegal operation on lock state %d", lk, lk->count);
}

void
_qunlock_rd_panic(struct qlock *lk)
{
    panic("qunlock_rd: %p not holding shared lock", lk);
}

void
_qunlock_wr_panic(struct qlock *lk)
{
    panic("qunlock_wr: %p not holding exclusive lock", lk);
}