Date: Wed, 17 Mar 2010 03:02:48 +0000 (UTC)
From: Lawrence Stewart <lstewart@FreeBSD.org>
To: src-committers@freebsd.org, svn-src-user@freebsd.org
Subject: svn commit: r205237 - in user/lstewart/alq_varlen_head/sys: kern sys
Message-ID: <201003170302.o2H32mpw075673@svn.freebsd.org>
next in thread | raw e-mail | index | archive | help
Author: lstewart Date: Wed Mar 17 03:02:48 2010 New Revision: 205237 URL: http://svn.freebsd.org/changeset/base/205237 Log: - Rework the way thread ordering is enforced so that it actually behaves as expected (issue discovered during detailed testing). Ordering is now an off-by-default option that can be enabled at ALQ creation time using the ALQ_ORDERED flag. - Add an alq_open_flags() KPI call to allow the new ALQ_ORDERED flag to be specified. alq_open() is now implemented as a wrapper around alq_open_flags. - Rename alq_postn() to alq_post_flags() to keep the naming consistent. - Keep a record of some useful debugging printf's (will be removed in a later diff). - Remove some no longer relevant assertions. - Introduce the AQ_VARLEN flag, used internally to indicate the ALQ is variable length message capable. - Protect alq_getn/alq_post from 0 length writes so that the "use less than you asked for" feature works in contexts where no data may be generated. Sponsored by: FreeBSD Foundation Modified: user/lstewart/alq_varlen_head/sys/kern/kern_alq.c user/lstewart/alq_varlen_head/sys/sys/alq.h Modified: user/lstewart/alq_varlen_head/sys/kern/kern_alq.c ============================================================================== --- user/lstewart/alq_varlen_head/sys/kern/kern_alq.c Wed Mar 17 02:48:14 2010 (r205236) +++ user/lstewart/alq_varlen_head/sys/kern/kern_alq.c Wed Mar 17 03:02:48 2010 (r205237) @@ -54,15 +54,16 @@ __FBSDID("$FreeBSD$"); /* Async. 
Logging Queue */ struct alq { + char *aq_entbuf; /* Buffer for stored entries */ int aq_entmax; /* Max entries */ int aq_entlen; /* Entry length */ int aq_freebytes; /* Bytes available in buffer */ int aq_buflen; /* Total length of our buffer */ - char *aq_entbuf; /* Buffer for stored entries */ int aq_writehead; /* Location for next write */ int aq_writetail; /* Flush starts at this location */ int aq_wrapearly; /* # bytes left blank at end of buf */ int aq_flags; /* Queue flags */ + int aq_waiters; /* Num threads waiting for resources */ struct ale aq_getpost; /* ALE for use by get/post */ struct mtx aq_mtx; /* Queue lock */ struct vnode *aq_vp; /* Open vnode handle */ @@ -75,6 +76,8 @@ struct alq { #define AQ_ACTIVE 0x0002 /* on the active list */ #define AQ_FLUSHING 0x0004 /* doing IO */ #define AQ_SHUTDOWN 0x0008 /* Queue no longer valid */ +#define AQ_ORDERED 0x0010 /* Queue enforces ordered writes */ +#define AQ_VARLEN 0x0020 /* Queue is variable length capable */ #define ALQ_LOCK(alq) mtx_lock_spin(&(alq)->aq_mtx) #define ALQ_UNLOCK(alq) mtx_unlock_spin(&(alq)->aq_mtx) @@ -200,7 +203,7 @@ ald_daemon(void) needwakeup = alq_doio(alq); ALQ_UNLOCK(alq); if (needwakeup) - wakeup_one(alq); + wakeup(alq); ALD_LOCK(); } @@ -334,6 +337,9 @@ alq_doio(struct alq *alq) totlen = aiov[0].iov_len + aiov[1].iov_len; } + /*printf("Flushing %d bytes to disk, aq_freebytes==%d\n", totlen, + alq->aq_freebytes);*/ + alq->aq_flags |= AQ_FLUSHING; ALQ_UNLOCK(alq); @@ -366,6 +372,9 @@ alq_doio(struct alq *alq) alq->aq_buflen; alq->aq_freebytes += totlen + wrapearly; + /*printf("Flushed %d bytes to disk, aq_freebytes==%d, AQ_WANTED==%d\n", + totlen, alq->aq_freebytes, alq->aq_flags & AQ_WANTED);*/ + /* * If we just flushed part of the buffer which wrapped, reset the * wrapearly indicator. @@ -374,8 +383,8 @@ alq_doio(struct alq *alq) alq->aq_wrapearly = 0; /* - * If we just flushed the buffer completely, - * reset indexes to 0 to minimise buffer wraps. 
+ * If we just flushed the buffer completely, reset indexes to 0 to + * minimise buffer wraps. * This is also required to ensure alq_getn() can't wedge itself. */ if (!HAS_PENDING_DATA(alq)) @@ -407,14 +416,15 @@ SYSINIT(ald, SI_SUB_LOCK, SI_ORDER_ANY, /* * Create the queue data structure, allocate the buffer, and open the file. */ + int -alq_open(struct alq **alqp, const char *file, struct ucred *cred, int cmode, - int size, int count) +alq_open_flags(struct alq **alqp, const char *file, struct ucred *cred, int cmode, + int size, int count, int flags) { struct thread *td; struct nameidata nd; struct alq *alq; - int flags; + int oflags; int error; int vfslocked; @@ -425,9 +435,9 @@ alq_open(struct alq **alqp, const char * td = curthread; NDINIT(&nd, LOOKUP, NOFOLLOW | MPSAFE, UIO_SYSSPACE, file, td); - flags = FWRITE | O_NOFOLLOW | O_CREAT; + oflags = FWRITE | O_NOFOLLOW | O_CREAT; - error = vn_open_cred(&nd, &flags, cmode, 0, cred, NULL); + error = vn_open_cred(&nd, &oflags, cmode, 0, cred, NULL); if (error) return (error); @@ -453,11 +463,14 @@ alq_open(struct alq **alqp, const char * alq->aq_buflen = size; alq->aq_entmax = 0; alq->aq_entlen = 0; + alq->aq_flags |= AQ_VARLEN; } alq->aq_freebytes = alq->aq_buflen; alq->aq_entbuf = malloc(alq->aq_buflen, M_ALD, M_WAITOK|M_ZERO); alq->aq_writehead = alq->aq_writetail = 0; + if (flags & ALQ_ORDERED) + alq->aq_flags |= AQ_ORDERED; if ((error = ald_add(alq)) != 0) { alq_destroy(alq); @@ -503,27 +516,47 @@ alq_writen(struct alq *alq, void *data, } /* + * If we want ordered writes and there are threads already waiting for + * resources to become available, spin until we're woken. 
+ */ + if (alq->aq_flags & AQ_ORDERED && alq->aq_waiters > 0) { + /*printf("tid %d order sleep, wants %d bytes (%d avail)\n", + curthread->td_tid, len, alq->aq_freebytes);*/ + alq->aq_waiters++; + msleep_spin(&alq->aq_waiters, &alq->aq_mtx, "alqwriten", 0); + alq->aq_waiters--; + /*printf("tid %d order woken, wants %d bytes (%d avail)\n", + curthread->td_tid, len, alq->aq_freebytes);*/ + } + + /* * ALQ_WAITOK or alq->aq_freebytes > len, either spin until - * we have enough free bytes (former) or skip (latter). However in the - * latter case, we can't skip if other threads are already - * waiting (AQ_WANTED is set), otherwise records can get out of order. + * we have enough free bytes (former) or skip (latter). */ - while ((alq->aq_freebytes < len && !(alq->aq_flags & AQ_SHUTDOWN)) - || alq->aq_flags & AQ_WANTED) { + while (alq->aq_freebytes < len && !(alq->aq_flags & AQ_SHUTDOWN)) { + /*printf("tid %d sleep, wants %d bytes (%d avail)\n", + curthread->td_tid, len, alq->aq_freebytes);*/ alq->aq_flags |= AQ_WANTED; + alq->aq_waiters++; msleep_spin(alq, &alq->aq_mtx, "alqwriten", 0); - KASSERT(!(alq->aq_flags & AQ_WANTED), - ("AQ_WANTED should have been unset!")); + alq->aq_waiters--; + /*printf("tid %d woken, wants %d bytes (%d avail)\n", + curthread->td_tid, len, alq->aq_freebytes);*/ } + /*printf("tid %d got %d bytes (%d avail, %d waiters)\n", + curthread->td_tid, len, alq->aq_freebytes, alq->aq_waiters);*/ + /* - * We need to serialise wakeups to ensure records remain in order. - * Therefore, wakeup the next thread in the queue waiting for - * ALQ resources to be available. - * (Technically this is only required if we actually entered the above - * while loop.) + * If there are waiters, wakeup the next thread in the queue waiting for + * ALQ resources. */ - wakeup_one(alq); + if (alq->aq_waiters > 0) { + if (alq->aq_flags & AQ_ORDERED) + wakeup_one(&alq->aq_waiters); + else + wakeup(alq); + } /* Bail if we're shutting down. 
*/ if (alq->aq_flags & AQ_SHUTDOWN) { @@ -569,6 +602,8 @@ alq_writen(struct alq *alq, void *data, activate = 1; } + KASSERT((HAS_PENDING_DATA(alq)), ("%s: queue emtpy!", __func__)); + ALQ_UNLOCK(alq); if (activate) { @@ -584,7 +619,7 @@ int alq_write(struct alq *alq, void *data, int flags) { /* Should only be called in fixed length message (legacy) mode. */ - KASSERT((alq->aq_entmax > 0 && alq->aq_entlen > 0), + KASSERT((!(alq->aq_flags & AQ_VARLEN)), ("%s: fixed length write on variable length queue", __func__)); return (alq_writen(alq, data, alq->aq_entlen, flags)); } @@ -651,19 +686,31 @@ alq_getn(struct alq *alq, int len, int f } /* + * If we want ordered writes and there are threads already waiting for + * resources to become available, spin until we're woken. + */ + if (alq->aq_flags & AQ_ORDERED && alq->aq_waiters > 0) { + /*printf("tid %d order sleep, wants %d bytes (%d avail)\n", + curthread->td_tid, len, alq->aq_freebytes);*/ + alq->aq_waiters++; + msleep_spin(&alq->aq_waiters, &alq->aq_mtx, "alqgetn", 0); + alq->aq_waiters--; + /*printf("tid %d order woken, wants %d bytes (%d avail)\n", + curthread->td_tid, len, alq->aq_freebytes);*/ + } + + /* * ALQ_WAITOK or contigbytes >= len, * either spin until we have enough free contiguous bytes (former) * or skip (latter). However, in the latter case, we can't skip if * other threads are already waiting (AQ_WANTED is set), otherwise * records can get out of order. 
*/ - while ((contigbytes < len && !(alq->aq_flags & AQ_SHUTDOWN)) - || alq->aq_flags & AQ_WANTED) { + while (contigbytes < len && !(alq->aq_flags & AQ_SHUTDOWN)) { alq->aq_flags |= AQ_WANTED; + alq->aq_waiters++; msleep_spin(alq, &alq->aq_mtx, "alqgetn", 0); - - KASSERT(!(alq->aq_flags & AQ_WANTED), - ("AQ_WANTED should have been unset!")); + alq->aq_waiters--; if (alq->aq_writehead <= alq->aq_writetail) contigbytes = alq->aq_freebytes; @@ -672,13 +719,15 @@ alq_getn(struct alq *alq, int len, int f } /* - * We need to serialise wakeups to ensure records remain in order. - * Therefore, wakeup the next thread in the queue waiting for - * ALQ resources to be available. - * (Technically this is only required if we actually entered the above - * while loop.) + * If there are waiters, wakeup the next thread in the queue waiting for + * ALQ resources. */ - wakeup_one(alq); + if (alq->aq_waiters > 0) { + if (alq->aq_flags & AQ_ORDERED) + wakeup_one(&alq->aq_waiters); + else + wakeup(alq); + } /* Bail if we're shutting down. */ if (alq->aq_flags & AQ_SHUTDOWN) { @@ -700,32 +749,39 @@ struct ale * alq_get(struct alq *alq, int flags) { /* Should only be called in fixed length message (legacy) mode. 
*/ - KASSERT((alq->aq_entmax > 0 && alq->aq_entlen > 0), + KASSERT((!(alq->aq_flags & AQ_VARLEN)), ("%s: fixed length get on variable length queue", __func__)); return (alq_getn(alq, alq->aq_entlen, flags)); } void -alq_postn(struct alq *alq, struct ale *ale, int flags) +alq_post_flags(struct alq *alq, struct ale *ale, int flags) { int activate; activate = 0; - if (!(alq->aq_flags & AQ_ACTIVE) && !(flags & ALQ_NOACTIVATE)) { - alq->aq_flags |= AQ_ACTIVE; - activate = 1; - } + if (ale->ae_bytesused > 0) { + if (!(alq->aq_flags & AQ_ACTIVE) && + !(flags & ALQ_NOACTIVATE)) { + alq->aq_flags |= AQ_ACTIVE; + activate = 1; + } - alq->aq_writehead += ale->ae_bytesused; - alq->aq_freebytes -= ale->ae_bytesused; + alq->aq_writehead += ale->ae_bytesused; + alq->aq_freebytes -= ale->ae_bytesused; - /* Wrap aq_writehead if we've filled to the end of the buffer. */ - if (alq->aq_writehead == alq->aq_buflen) - alq->aq_writehead = 0; + /* Wrap aq_writehead if we filled to the end of the buffer. */ + if (alq->aq_writehead == alq->aq_buflen) + alq->aq_writehead = 0; + + KASSERT((alq->aq_writehead >= 0 && + alq->aq_writehead < alq->aq_buflen), + ("%s: aq_writehead < 0 || aq_writehead >= aq_buflen", + __func__)); - KASSERT((alq->aq_writehead >= 0 && alq->aq_writehead < alq->aq_buflen), - ("%s: aq_writehead < 0 || aq_writehead >= aq_buflen", __func__)); + KASSERT((HAS_PENDING_DATA(alq)), ("%s: queue emtpy!", __func__)); + } ALQ_UNLOCK(alq); @@ -761,7 +817,7 @@ alq_flush(struct alq *alq) ALQ_UNLOCK(alq); if (needwakeup) - wakeup_one(alq); + wakeup(alq); } /* Modified: user/lstewart/alq_varlen_head/sys/sys/alq.h ============================================================================== --- user/lstewart/alq_varlen_head/sys/sys/alq.h Wed Mar 17 02:48:14 2010 (r205236) +++ user/lstewart/alq_varlen_head/sys/sys/alq.h Wed Mar 17 03:02:48 2010 (r205237) @@ -56,6 +56,7 @@ struct ale { #define ALQ_NOWAIT 0x0001 #define ALQ_WAITOK 0x0002 #define ALQ_NOACTIVATE 0x0004 +#define ALQ_ORDERED 
0x0010 /* Suggested mode for file creation. */ #define ALQ_DEFAULT_CMODE 0600 @@ -77,8 +78,15 @@ struct ale { * error from open or 0 on success */ struct ucred; -int alq_open(struct alq **, const char *file, struct ucred *cred, int cmode, - int size, int count); +int alq_open_flags(struct alq **alqp, const char *file, struct ucred *cred, int cmode, + int size, int count, int flags); + +static __inline int +alq_open(struct alq **alqp, const char *file, struct ucred *cred, int cmode, + int size, int count) +{ + return alq_open_flags(alqp, file, cred, cmode, size, count, 0); +} /* * alq_writen: Write data into the queue @@ -133,12 +141,12 @@ struct ale *alq_get(struct alq *alq, int * ale An asynch logging entry returned by alq_get. * flags ALQ_NOACTIVATE */ -void alq_postn(struct alq *alq, struct ale *ale, int flags); +void alq_post_flags(struct alq *alq, struct ale *ale, int flags); static __inline void alq_post(struct alq *alq, struct ale *ale) { - alq_postn(alq, ale, 0); + alq_post_flags(alq, ale, 0); } #endif /* _SYS_ALQ_H_ */
Want to link to this message? Use this URL: <https://mail-archive.FreeBSD.org/cgi/mid.cgi?201003170302.o2H32mpw075673>