Date: Sat, 25 Oct 2008 01:46:29 +0000 (UTC) From: Alfred Perlstein <alfred@FreeBSD.org> To: src-committers@freebsd.org, svn-src-all@freebsd.org, svn-src-releng@freebsd.org Subject: svn commit: r184239 - in releng/6.4: lib/libthr lib/libthr/thread sys sys/kern Message-ID: <200810250146.m9P1kTxL004369@svn.freebsd.org>
next in thread | raw e-mail | index | archive | help
Author: alfred Date: Sat Oct 25 01:46:29 2008 New Revision: 184239 URL: http://svn.freebsd.org/changeset/base/184239 Log: Merge r184172 (pthread condvar race fix) into 6.4-release. Reviewed by: re, davidxu Approved by: re Modified: releng/6.4/lib/libthr/ (props changed) releng/6.4/lib/libthr/thread/thr_cond.c releng/6.4/lib/libthr/thread/thr_private.h releng/6.4/sys/ (props changed) releng/6.4/sys/kern/kern_umtx.c Modified: releng/6.4/lib/libthr/thread/thr_cond.c ============================================================================== --- releng/6.4/lib/libthr/thread/thr_cond.c Sat Oct 25 01:25:29 2008 (r184238) +++ releng/6.4/lib/libthr/thread/thr_cond.c Sat Oct 25 01:46:29 2008 (r184239) @@ -71,7 +71,7 @@ cond_init(pthread_cond_t *cond, const pt _thr_umtx_init(&pcond->c_lock); pcond->c_seqno = 0; pcond->c_waiters = 0; - pcond->c_wakeups = 0; + pcond->c_broadcast = 0; if (cond_attr == NULL || *cond_attr == NULL) { pcond->c_pshared = 0; pcond->c_clockid = CLOCK_REALTIME; @@ -122,7 +122,7 @@ _pthread_cond_destroy(pthread_cond_t *co else { /* Lock the condition variable structure: */ THR_LOCK_ACQUIRE(curthread, &(*cond)->c_lock); - if ((*cond)->c_waiters + (*cond)->c_wakeups != 0) { + if ((*cond)->c_waiters != 0) { THR_LOCK_RELEASE(curthread, &(*cond)->c_lock); return (EBUSY); } @@ -166,14 +166,13 @@ cond_cancel_handler(void *arg) cv = *(cci->cond); THR_LOCK_ACQUIRE(curthread, &cv->c_lock); - if (cv->c_seqno != cci->seqno && cv->c_wakeups != 0) { - if (cv->c_waiters > 0) { - cv->c_seqno++; - _thr_umtx_wake(&cv->c_seqno, 1); - } else - cv->c_wakeups--; - } else { - cv->c_waiters--; + if (--cv->c_waiters == 0) + cv->c_broadcast = 0; + if (cv->c_seqno != cci->seqno) { + _thr_umtx_wake(&cv->c_seqno, 1); + /* cv->c_seqno++; XXX why was this here? */ + _thr_umtx_wake(&cv->c_seqno, 1); + } THR_LOCK_RELEASE(curthread, &cv->c_lock); @@ -191,6 +190,7 @@ cond_wait_common(pthread_cond_t *cond, p long seq, oldseq; int oldcancel; int ret = 0; + int loops = -1; /* * If the condition variable is statically initialized, @@ -202,18 +202,24 @@ cond_wait_common(pthread_cond_t *cond, p cv = *cond; THR_LOCK_ACQUIRE(curthread, &cv->c_lock); + oldseq = cv->c_seqno; ret = _mutex_cv_unlock(mutex); if (ret) { THR_LOCK_RELEASE(curthread, &cv->c_lock); return (ret); } - oldseq = seq = cv->c_seqno; + seq = cv->c_seqno; cci.mutex = mutex; cci.cond = cond; cci.seqno = oldseq; cv->c_waiters++; - do { + /* + * loop if we have never been told to wake up + * or we lost a race. + */ + while (seq == oldseq /* || cv->c_wakeups == 0*/) { + loops++; THR_LOCK_RELEASE(curthread, &cv->c_lock); if (abstime != NULL) { @@ -232,24 +238,23 @@ cond_wait_common(pthread_cond_t *cond, p } else { ret = _thr_umtx_wait(&cv->c_seqno, seq, tsp); } + /* + * If we get back EINTR we want to loop as condvars + * do NOT return EINTR, they just restart. + */ THR_LOCK_ACQUIRE(curthread, &cv->c_lock); seq = cv->c_seqno; if (abstime != NULL && ret == ETIMEDOUT) break; - /* - * loop if we have never been told to wake up - * or we lost a race. - */ - } while (seq == oldseq || cv->c_wakeups == 0); - - if (seq != oldseq && cv->c_wakeups != 0) { - cv->c_wakeups--; - ret = 0; - } else { - cv->c_waiters--; } + + if (--cv->c_waiters == 0) + cv->c_broadcast = 0; + if (seq != oldseq) + ret = 0; + THR_LOCK_RELEASE(curthread, &cv->c_lock); _mutex_cv_lock(mutex); return (ret); @@ -298,7 +303,7 @@ cond_signal_common(pthread_cond_t *cond, { struct pthread *curthread = _get_curthread(); pthread_cond_t cv; - int ret = 0, oldwaiters; + int ret = 0; /* * If the condition variable is statically initialized, perform dynamic @@ -311,19 +316,15 @@ cond_signal_common(pthread_cond_t *cond, cv = *cond; /* Lock the condition variable structure. */ THR_LOCK_ACQUIRE(curthread, &cv->c_lock); + cv->c_seqno++; + if (cv->c_broadcast == 0) + cv->c_broadcast = broadcast; + if (cv->c_waiters) { - if (!broadcast) { - cv->c_wakeups++; - cv->c_waiters--; - cv->c_seqno++; + if (cv->c_broadcast) + _thr_umtx_wake(&cv->c_seqno, INT_MAX); + else _thr_umtx_wake(&cv->c_seqno, 1); - } else { - oldwaiters = cv->c_waiters; - cv->c_wakeups += cv->c_waiters; - cv->c_waiters = 0; - cv->c_seqno++; - _thr_umtx_wake(&cv->c_seqno, oldwaiters); - } } THR_LOCK_RELEASE(curthread, &cv->c_lock); return (ret); Modified: releng/6.4/lib/libthr/thread/thr_private.h ============================================================================== --- releng/6.4/lib/libthr/thread/thr_private.h Sat Oct 25 01:25:29 2008 (r184238) +++ releng/6.4/lib/libthr/thread/thr_private.h Sat Oct 25 01:46:29 2008 (r184239) @@ -166,7 +166,7 @@ struct pthread_cond { volatile umtx_t c_lock; volatile umtx_t c_seqno; volatile int c_waiters; - volatile int c_wakeups; + volatile int c_broadcast; int c_pshared; int c_clockid; }; Modified: releng/6.4/sys/kern/kern_umtx.c ============================================================================== --- releng/6.4/sys/kern/kern_umtx.c Sat Oct 25 01:25:29 2008 (r184238) +++ releng/6.4/sys/kern/kern_umtx.c Sat Oct 25 01:46:29 2008 (r184239) @@ -36,6 +36,7 @@ __FBSDID("$FreeBSD$"); #include <sys/malloc.h> #include <sys/mutex.h> #include <sys/proc.h> +#include <sys/sysctl.h> #include <sys/sysent.h> #include <sys/systm.h> #include <sys/sysproto.h> @@ -81,6 +82,8 @@ struct umtx_key { struct umtx_q { LIST_ENTRY(umtx_q) uq_next; /* Linked list for the hash. */ struct umtx_key uq_key; /* Umtx key. */ + int uq_flags; +#define UQF_UMTXQ 0x0001 struct thread *uq_thread; /* The thread waits on. */ LIST_ENTRY(umtx_q) uq_rqnext; /* Linked list for requeuing. */ vm_offset_t uq_addr; /* Umtx's virtual address. */ @@ -229,9 +232,7 @@ umtxq_insert(struct umtx_q *uq) mtx_assert(umtxq_mtx(chain), MA_OWNED); head = &umtxq_chains[chain].uc_queue; LIST_INSERT_HEAD(head, uq, uq_next); - mtx_lock_spin(&sched_lock); - uq->uq_thread->td_flags |= TDF_UMTXQ; - mtx_unlock_spin(&sched_lock); + uq->uq_flags |= UQF_UMTXQ; } /* @@ -241,12 +242,10 @@ static inline void umtxq_remove(struct umtx_q *uq) { mtx_assert(umtxq_mtx(umtxq_hash(&uq->uq_key)), MA_OWNED); - if (uq->uq_thread->td_flags & TDF_UMTXQ) { + if (uq->uq_flags & UQF_UMTXQ) { LIST_REMOVE(uq, uq_next); - /* turning off TDF_UMTXQ should be the last thing. */ - mtx_lock_spin(&sched_lock); - uq->uq_thread->td_flags &= ~TDF_UMTXQ; - mtx_unlock_spin(&sched_lock); + /* turning off UQF_UMTXQ should be the last thing. */ + uq->uq_flags &= ~UQF_UMTXQ; } } @@ -308,7 +307,7 @@ umtxq_sleep(struct thread *td, struct um static int umtx_key_get(struct thread *td, void *umtx, struct umtx_key *key) { -#if defined(UMTX_DYNAMIC_SHARED) || defined(UMTX_STATIC_SHARED) +#if defined(UMTX_STATIC_SHARED) vm_map_t map; vm_map_entry_t entry; vm_pindex_t pindex; @@ -321,20 +320,7 @@ umtx_key_get(struct thread *td, void *um &wired) != KERN_SUCCESS) { return EFAULT; } -#endif -#if defined(UMTX_DYNAMIC_SHARED) - key->type = UMTX_SHARED; - key->info.shared.offset = entry->offset + entry->start - - (vm_offset_t)umtx; - /* - * Add object reference, if we don't do this, a buggy application - * deallocates the object, the object will be reused by other - * applications, then unlock will wake wrong thread. - */ - vm_object_reference(key->info.shared.object); - vm_map_lookup_done(map, entry); -#elif defined(UMTX_STATIC_SHARED) if (VM_INHERIT_SHARE == entry->inheritance) { key->type = UMTX_SHARED; key->info.shared.offset = entry->offset + entry->start - @@ -380,74 +366,6 @@ umtxq_queue_me(struct thread *td, void * return (0); } -#if defined(UMTX_DYNAMIC_SHARED) -static void -fork_handler(void *arg, struct proc *p1, struct proc *p2, int flags) -{ - vm_map_t map; - vm_map_entry_t entry; - vm_object_t object; - vm_pindex_t pindex; - vm_prot_t prot; - boolean_t wired; - struct umtx_key key; - LIST_HEAD(, umtx_q) workq; - struct umtx_q *uq; - struct thread *td; - int onq; - - LIST_INIT(&workq); - - /* Collect threads waiting on umtxq */ - PROC_LOCK(p1); - FOREACH_THREAD_IN_PROC(p1, td) { - if (td->td_flags & TDF_UMTXQ) { - uq = td->td_umtxq; - if (uq) - LIST_INSERT_HEAD(&workq, uq, uq_rqnext); - } - } - PROC_UNLOCK(p1); - - LIST_FOREACH(uq, &workq, uq_rqnext) { - map = &p1->p_vmspace->vm_map; - if (vm_map_lookup(&map, uq->uq_addr, VM_PROT_WRITE, - &entry, &object, &pindex, &prot, &wired) != KERN_SUCCESS) { - continue; - } - key.type = UMTX_SHARED; - key.info.shared.object = object; - key.info.shared.offset = entry->offset + entry->start - - uq->uq_addr; - if (umtx_key_match(&key, &uq->uq_key)) { - vm_map_lookup_done(map, entry); - continue; - } - - umtxq_lock(&uq->uq_key); - umtxq_busy(&uq->uq_key); - if (uq->uq_thread->td_flags & TDF_UMTXQ) { - umtxq_remove(uq); - onq = 1; - } else - onq = 0; - umtxq_unbusy(&uq->uq_key); - umtxq_unlock(&uq->uq_key); - if (onq) { - vm_object_deallocate(uq->uq_key.info.shared.object); - uq->uq_key = key; - umtxq_lock(&uq->uq_key); - umtxq_busy(&uq->uq_key); - umtxq_insert(uq); - umtxq_unbusy(&uq->uq_key); - umtxq_unlock(&uq->uq_key); - vm_object_reference(uq->uq_key.info.shared.object); - } - vm_map_lookup_done(map, entry); - } -} -#endif - static int _do_lock(struct thread *td, struct umtx *umtx, long id, int timo) { @@ -526,7 +444,7 @@ _do_lock(struct thread *td, struct umtx * unlocking the umtx. */ umtxq_lock(&uq->uq_key); - if (old == owner && (td->td_flags & TDF_UMTXQ)) { + if (old == owner && (uq->uq_flags & UQF_UMTXQ)) { error = umtxq_sleep(td, &uq->uq_key, PCATCH, "umtx", timo); } @@ -705,7 +623,7 @@ _do_lock32(struct thread *td, uint32_t * * unlocking the umtx. */ umtxq_lock(&uq->uq_key); - if (old == owner && (td->td_flags & TDF_UMTXQ)) { + if (old == owner && (uq->uq_flags & UQF_UMTXQ)) { error = umtxq_sleep(td, &uq->uq_key, PCATCH, "umtx", timo); } @@ -825,35 +743,22 @@ do_wait(struct thread *td, struct umtx * tmp = fuword(&umtx->u_owner); else tmp = fuword32(&umtx->u_owner); + umtxq_lock(&uq->uq_key); if (tmp != id) { - umtxq_lock(&uq->uq_key); umtxq_remove(uq); - umtxq_unlock(&uq->uq_key); } else if (timeout == NULL) { - umtxq_lock(&uq->uq_key); - if (td->td_flags & TDF_UMTXQ) + if (uq->uq_flags & UQF_UMTXQ) error = umtxq_sleep(td, &uq->uq_key, PCATCH, "ucond", 0); - if (!(td->td_flags & TDF_UMTXQ)) - error = 0; - else - umtxq_remove(uq); - umtxq_unlock(&uq->uq_key); } else { getnanouptime(&ts); timespecadd(&ts, timeout); TIMESPEC_TO_TIMEVAL(&tv, timeout); for (;;) { - umtxq_lock(&uq->uq_key); - if (td->td_flags & TDF_UMTXQ) { + if (uq->uq_flags & UQF_UMTXQ) { error = umtxq_sleep(td, &uq->uq_key, PCATCH, - "ucond", tvtohz(&tv)); - } - if (!(td->td_flags & TDF_UMTXQ)) { - umtxq_unlock(&uq->uq_key); - goto out; + "ucondt", tvtohz(&tv)); } - umtxq_unlock(&uq->uq_key); if (error != ETIMEDOUT) break; getnanouptime(&ts2); @@ -865,14 +770,28 @@ do_wait(struct thread *td, struct umtx * timespecsub(&ts3, &ts2); TIMESPEC_TO_TIMEVAL(&tv, &ts3); } - umtxq_lock(&uq->uq_key); - umtxq_remove(uq); - umtxq_unlock(&uq->uq_key); } -out: + if (error != 0) { + if ((uq->uq_flags & UQF_UMTXQ) == 0) { + /* + * If we concurrently got do_cv_signal()d + * and we got an error or UNIX signals or a timeout, + * then, perform another umtxq_signal to avoid + * consuming the wakeup. This may cause supurious + * wakeup for another thread which was just queued, + * but SUSV3 explicitly allows supurious wakeup to + * occur, and indeed a kernel based implementation + * can not avoid it. + */ + if (!umtxq_signal(&uq->uq_key, 1)) + error = 0; + } + if (error == ERESTART) + error = EINTR; + } + umtxq_remove(uq); + umtxq_unlock(&uq->uq_key); umtx_key_release(&uq->uq_key); - if (error == ERESTART) - error = EINTR; return (error); }
Want to link to this message? Use this URL: <https://mail-archive.FreeBSD.org/cgi/mid.cgi?200810250146.m9P1kTxL004369>