Date: Mon, 20 Oct 2008 01:46:54 +0000 (UTC) From: Lawrence Stewart <lstewart@FreeBSD.org> To: src-committers@freebsd.org, svn-src-user@freebsd.org Subject: svn commit: r184066 - in user/lstewart/alq_varlen_8.x/sys: kern modules modules/alq sys Message-ID: <200810200146.m9K1ksJh057210@svn.freebsd.org>
next in thread | raw e-mail | index | archive | help
Author: lstewart Date: Mon Oct 20 01:46:54 2008 New Revision: 184066 URL: http://svn.freebsd.org/changeset/base/184066 Log: Initial import of a patch to turn ALQ(9) into a KLD and add support for variable length messages. Patch currently compiles fine and appears to be usable in basic testing, but needs more extensive testing and perhaps some additional refinement. Todo: - Update ALQ(9) man page - Regression testing Discussed with: jeff@, rwatson@ Added: user/lstewart/alq_varlen_8.x/sys/modules/alq/ user/lstewart/alq_varlen_8.x/sys/modules/alq/Makefile (contents, props changed) Modified: user/lstewart/alq_varlen_8.x/sys/kern/kern_alq.c user/lstewart/alq_varlen_8.x/sys/modules/Makefile user/lstewart/alq_varlen_8.x/sys/sys/alq.h Modified: user/lstewart/alq_varlen_8.x/sys/kern/kern_alq.c ============================================================================== --- user/lstewart/alq_varlen_8.x/sys/kern/kern_alq.c Mon Oct 20 00:35:02 2008 (r184065) +++ user/lstewart/alq_varlen_8.x/sys/kern/kern_alq.c Mon Oct 20 01:46:54 2008 (r184066) @@ -1,5 +1,6 @@ /*- * Copyright (c) 2002, Jeffrey Roberson <jeff@freebsd.org> + * Copyright (c) 2008, Lawrence Stewart <lstewart@freebsd.org> * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -51,14 +52,18 @@ __FBSDID("$FreeBSD$"); struct alq { int aq_entmax; /* Max entries */ int aq_entlen; /* Entry length */ + int aq_freebytes; /* Bytes available in buffer */ + int aq_buflen; /* Total length of our buffer */ char *aq_entbuf; /* Buffer for stored entries */ + int aq_writehead; + int aq_writetail; int aq_flags; /* Queue flags */ struct mtx aq_mtx; /* Queue lock */ struct vnode *aq_vp; /* Open vnode handle */ struct ucred *aq_cred; /* Credentials of the opening thread */ - struct ale *aq_first; /* First ent */ - struct ale *aq_entfree; /* First free ent */ - struct ale *aq_entvalid; /* First ent valid for writing */ + //struct ale *aq_first; /* First ent */ + //struct ale *aq_entfree; /* First free ent */ + //struct ale *aq_entvalid; /* First ent valid for writing */ LIST_ENTRY(alq) aq_act; /* List of active queues */ LIST_ENTRY(alq) aq_link; /* List of all queues */ }; @@ -182,8 +187,14 @@ ald_daemon(void) ALD_LOCK(); for (;;) { - while ((alq = LIST_FIRST(&ald_active)) == NULL) - msleep(&ald_active, &ald_mtx, PWAIT, "aldslp", 0); + while ((alq = LIST_FIRST(&ald_active)) == NULL + && !ald_shutingdown) + mtx_sleep(&ald_active, &ald_mtx, PWAIT, "aldslp", 0); + + if (ald_shutingdown) { + ALD_UNLOCK(); + break; + } ALQ_LOCK(alq); ald_deactivate(alq); @@ -191,9 +202,11 @@ ald_daemon(void) needwakeup = alq_doio(alq); ALQ_UNLOCK(alq); if (needwakeup) - wakeup(alq); + wakeup_one(alq); ALD_LOCK(); } + + kthread_exit(); } static void @@ -204,6 +217,12 @@ ald_shutdown(void *arg, int howto) ALD_LOCK(); ald_shutingdown = 1; + /* wake ald_daemon so that it exits*/ + wakeup(&ald_active); + + /* wait for ald_daemon to exit */ + mtx_sleep(ald_thread, &ald_mtx, PWAIT, "aldslp", 0); + while ((alq = LIST_FIRST(&ald_queues)) != NULL) { LIST_REMOVE(alq, aq_link); ALD_UNLOCK(); @@ -244,41 +263,45 @@ alq_doio(struct alq *alq) struct vnode *vp; struct uio auio; struct iovec aiov[2]; - struct ale *ale; - struct ale *alstart; int totlen; int iov; int vfslocked; + KASSERT(alq->aq_freebytes != alq->aq_buflen, + ("%s: queue emtpy!", __func__) + ); + vp = alq->aq_vp; td = curthread; totlen = 0; iov = 0; - alstart = ale = alq->aq_entvalid; - alq->aq_entvalid = NULL; - bzero(&aiov, sizeof(aiov)); bzero(&auio, sizeof(auio)); - do { - if (aiov[iov].iov_base == NULL) - aiov[iov].iov_base = ale->ae_data; - aiov[iov].iov_len += alq->aq_entlen; - totlen += alq->aq_entlen; - /* Check to see if we're wrapping the buffer */ - if (ale->ae_data + alq->aq_entlen != ale->ae_next->ae_data) - iov++; - ale->ae_flags &= ~AE_VALID; - ale = ale->ae_next; - } while (ale->ae_flags & AE_VALID); + /* start the write from the location of our buffer tail pointer */ + aiov[iov].iov_base = alq->aq_entbuf + alq->aq_writetail; + + if (alq->aq_writetail < alq->aq_writehead) { + /* buffer not wrapped */ + totlen = aiov[iov].iov_len = alq->aq_writehead - + alq->aq_writetail; + } else { + /* + * buffer wrapped, requires 2 aiov entries: + * - first is from writetail to end of buffer + * - second is from start of buffer to writehead + */ + aiov[iov].iov_len = alq->aq_buflen - alq->aq_writetail; + iov++; + aiov[iov].iov_base = alq->aq_entbuf; + aiov[iov].iov_len = alq->aq_writehead; + totlen = aiov[0].iov_len + aiov[1].iov_len; + } alq->aq_flags |= AQ_FLUSHING; ALQ_UNLOCK(alq); - if (iov == 2 || aiov[iov].iov_base == NULL) - iov--; - auio.uio_iov = &aiov[0]; auio.uio_offset = 0; auio.uio_segflg = UIO_SYSSPACE; @@ -308,8 +331,17 @@ alq_doio(struct alq *alq) ALQ_LOCK(alq); alq->aq_flags &= ~AQ_FLUSHING; - if (alq->aq_entfree == NULL) - alq->aq_entfree = alstart; + /* Adjust writetail as required, taking into account wrapping */ + alq->aq_writetail += (iov == 2) ? aiov[1].iov_len : totlen; + alq->aq_freebytes += totlen; + + /* + * If we just flushed the buffer completely, + * reset indexes to 0 to minimise buffer wraps + * This is also required to ensure alq_getn() can't wedge itself + */ + if (alq->aq_freebytes == alq->aq_buflen) + alq->aq_writehead = alq->aq_writetail = 0; if (alq->aq_flags & AQ_WANTED) { alq->aq_flags &= ~AQ_WANTED; @@ -340,13 +372,13 @@ alq_open(struct alq **alqp, const char * { struct thread *td; struct nameidata nd; - struct ale *ale; - struct ale *alp; struct alq *alq; - char *bufp; int flags; int error; - int i, vfslocked; + int vfslocked; + + KASSERT(size > 0, ("%s: size <= 0", __func__)); + KASSERT(count >= 0, ("%s: count < 0", __func__)); *alqp = NULL; td = curthread; @@ -365,31 +397,27 @@ alq_open(struct alq **alqp, const char * VFS_UNLOCK_GIANT(vfslocked); alq = malloc(sizeof(*alq), M_ALD, M_WAITOK|M_ZERO); - alq->aq_entbuf = malloc(count * size, M_ALD, M_WAITOK|M_ZERO); - alq->aq_first = malloc(sizeof(*ale) * count, M_ALD, M_WAITOK|M_ZERO); alq->aq_vp = nd.ni_vp; alq->aq_cred = crhold(cred); - alq->aq_entmax = count; - alq->aq_entlen = size; - alq->aq_entfree = alq->aq_first; mtx_init(&alq->aq_mtx, "ALD Queue", NULL, MTX_SPIN|MTX_QUIET); - bufp = alq->aq_entbuf; - ale = alq->aq_first; - alp = NULL; - - /* Match up entries with buffers */ - for (i = 0; i < count; i++) { - if (alp) - alp->ae_next = ale; - ale->ae_data = bufp; - alp = ale; - ale++; - bufp += size; + if (count > 0) { + /* fixed length messages */ + alq->aq_buflen = size * count; + alq->aq_entmax = count; + alq->aq_entlen = size; + } else { + /* variable length messages */ + alq->aq_buflen = size; + alq->aq_entmax = 0; + alq->aq_entlen = 0; } - alp->ae_next = alq->aq_first; + alq->aq_freebytes = alq->aq_buflen; + alq->aq_entbuf = malloc(alq->aq_buflen, M_ALD, M_WAITOK|M_ZERO); + + alq->aq_writehead = alq->aq_writetail = 0; if ((error = ald_add(alq)) != 0) return (error); @@ -403,46 +431,180 @@ alq_open(struct alq **alqp, const char * * wait or return an error depending on the value of waitok. */ int -alq_write(struct alq *alq, void *data, int waitok) +alq_write(struct alq *alq, void *data, int flags) { - struct ale *ale; + /* should only be called in fixed length message (legacy) mode */ + KASSERT(alq->aq_entmax > 0 && alq->aq_entlen > 0, + ("%s: fixed length write on variable length queue", __func__) + ); + return (alq_writen(alq, data, alq->aq_entlen, flags)); +} + +int +alq_writen(struct alq *alq, void *data, int len, int flags) +{ + int activate = 0; + int copy = len; + + KASSERT(len > 0 && len < alq->aq_buflen, + ("%s: len <= 0 || len > alq->aq_buflen", __func__) + ); + + ALQ_LOCK(alq); + + /* + * If the message is larger than our underlying buffer or + * there is not enough free space in our underlying buffer + * to accept the message and the user can't wait, return + */ + if ((len > alq->aq_buflen) || + ((flags & ALQ_NOWAIT) && (alq->aq_freebytes < len))) { + ALQ_UNLOCK(alq); + return (EWOULDBLOCK); + } + + /* + * ALQ_WAITOK or alq->aq_freebytes > len, + * either spin until we have enough free bytes (former) or skip (latter) + */ + while (alq->aq_freebytes < len && (alq->aq_flags & AQ_SHUTDOWN) == 0) { + alq->aq_flags |= AQ_WANTED; + msleep_spin(alq, &alq->aq_mtx, "alqwriten", 0); + } - if ((ale = alq_get(alq, waitok)) == NULL) + /* + * we need to serialise wakups to ensure records remain in order... + * therefore, wakeup the next thread in the queue waiting for + * alq resources to be available + * (technically this is only required if we actually entered the above + * while loop) + */ + wakeup_one(alq); + + /* bail if we're shutting down */ + if (alq->aq_flags & AQ_SHUTDOWN) { + ALQ_UNLOCK(alq); return (EWOULDBLOCK); + } + + /* + * if we need to wrap the buffer to accommodate the write, + * we'll need 2 calls to bcopy + */ + if ((alq->aq_buflen - alq->aq_writehead) < len) + copy = alq->aq_buflen - alq->aq_writehead; + + /* copy (part of) message to the buffer */ + bcopy(data, alq->aq_entbuf + alq->aq_writehead, copy); + alq->aq_writehead += copy; + + if (copy != len) { + /* + * wrap the buffer by copying the remainder of our message + * to the start of the buffer and resetting the head ptr + */ + bcopy(data, alq->aq_entbuf, len - copy); + alq->aq_writehead = copy; + } - bcopy(data, ale->ae_data, alq->aq_entlen); - alq_post(alq, ale); + alq->aq_freebytes -= len; + + if ((alq->aq_flags & AQ_ACTIVE) == 0) { + alq->aq_flags |= AQ_ACTIVE; + activate = 1; + } + + ALQ_UNLOCK(alq); + + if (activate) { + ALD_LOCK(); + ald_activate(alq); + ALD_UNLOCK(); + } return (0); } struct ale * -alq_get(struct alq *alq, int waitok) +alq_get(struct alq *alq, int flags) +{ + /* should only be called in fixed length message (legacy) mode */ + KASSERT(alq->aq_entmax > 0 && alq->aq_entlen > 0, + ("%s: fixed length get on variable length queue", __func__) + ); + return (alq_getn(alq, alq->aq_entlen, flags)); +} + +struct ale * +alq_getn(struct alq *alq, int len, int flags) { struct ale *ale; - struct ale *aln; + int contigbytes; - ale = NULL; + ale = malloc( sizeof(struct ale), + M_ALD, + (flags & ALQ_NOWAIT) ? M_NOWAIT : M_WAITOK + ); + + if (ale == NULL) + return (NULL); ALQ_LOCK(alq); - /* Loop until we get an entry or we're shutting down */ - while ((alq->aq_flags & AQ_SHUTDOWN) == 0 && - (ale = alq->aq_entfree) == NULL && - (waitok & ALQ_WAITOK)) { - alq->aq_flags |= AQ_WANTED; - msleep_spin(alq, &alq->aq_mtx, "alqget", 0); + /* determine the number of free contiguous bytes */ + if (alq->aq_writehead <= alq->aq_writetail) + contigbytes = alq->aq_freebytes; + else + contigbytes = alq->aq_buflen - alq->aq_writehead; + + /* + * If the message is larger than our underlying buffer or + * there is not enough free contiguous space in our underlying buffer + * to accept the message and the user can't wait, return + */ + if ((len > alq->aq_buflen) || + ((flags & ALQ_NOWAIT) && (contigbytes < len))) { + ALQ_UNLOCK(alq); + return (NULL); } - if (ale != NULL) { - aln = ale->ae_next; - if ((aln->ae_flags & AE_VALID) == 0) - alq->aq_entfree = aln; + /* + * ALQ_WAITOK or contigbytes > len, + * either spin until we have enough free contiguous bytes (former) + * or skip (latter) + */ + while (contigbytes < len && (alq->aq_flags & AQ_SHUTDOWN) == 0) { + alq->aq_flags |= AQ_WANTED; + msleep_spin(alq, &alq->aq_mtx, "alqgetn", 0); + if (alq->aq_writehead <= alq->aq_writetail) + contigbytes = alq->aq_freebytes; else - alq->aq_entfree = NULL; - } else + contigbytes = alq->aq_buflen - alq->aq_writehead; + } + + /* + * we need to serialise wakups to ensure records remain in order... + * therefore, wakeup the next thread in the queue waiting for + * alq resources to be available + * (technically this is only required if we actually entered the above + * while loop) + */ + wakeup_one(alq); + + /* bail if we're shutting down */ + if (alq->aq_flags & AQ_SHUTDOWN) { ALQ_UNLOCK(alq); + return (NULL); + } + /* + * If we are here, we have a contiguous number of bytes >= len + * available in our buffer starting at aq_writehead. + */ + ale->ae_data = alq->aq_entbuf + alq->aq_writehead; + ale->ae_datalen = len; + alq->aq_writehead += len; + alq->aq_freebytes -= len; return (ale); } @@ -452,11 +614,6 @@ alq_post(struct alq *alq, struct ale *al { int activate; - ale->ae_flags |= AE_VALID; - - if (alq->aq_entvalid == NULL) - alq->aq_entvalid = ale; - if ((alq->aq_flags & AQ_ACTIVE) == 0) { alq->aq_flags |= AQ_ACTIVE; activate = 1; @@ -464,11 +621,14 @@ alq_post(struct alq *alq, struct ale *al activate = 0; ALQ_UNLOCK(alq); + if (activate) { ALD_LOCK(); ald_activate(alq); ALD_UNLOCK(); } + + free(ale, M_ALD); } void @@ -487,7 +647,7 @@ alq_flush(struct alq *alq) ALQ_UNLOCK(alq); if (needwakeup) - wakeup(alq); + wakeup_one(alq); } /* @@ -509,7 +669,49 @@ alq_close(struct alq *alq) alq_shutdown(alq); mtx_destroy(&alq->aq_mtx); - free(alq->aq_first, M_ALD); free(alq->aq_entbuf, M_ALD); free(alq, M_ALD); } + +static int alq_load_handler(module_t mod, int what, void *arg) +{ + int ret = 0; + + switch(what) { + case MOD_LOAD: + case MOD_UNLOAD: + case MOD_SHUTDOWN: + break; + + case MOD_QUIESCE: + ALD_LOCK(); + /* only allow unload if there are no open queues */ + if (LIST_FIRST(&ald_queues) == NULL) { + ald_shutingdown = 1; + ALD_UNLOCK(); + ald_shutdown(NULL, 0); + mtx_destroy(&ald_mtx); + } else { + ALD_UNLOCK(); + ret = EBUSY; + } + break; + + default: + ret = EINVAL; + break; + } + + return (ret); +} + +/* basic module data */ +static moduledata_t alq_mod = +{ + "alq", + alq_load_handler, /* execution entry point for the module */ + NULL +}; + +DECLARE_MODULE(alq, alq_mod, SI_SUB_SMP, SI_ORDER_ANY); +MODULE_VERSION(alq, 1); Modified: user/lstewart/alq_varlen_8.x/sys/modules/Makefile ============================================================================== --- user/lstewart/alq_varlen_8.x/sys/modules/Makefile Mon Oct 20 00:35:02 2008 (r184065) +++ user/lstewart/alq_varlen_8.x/sys/modules/Makefile Mon Oct 20 01:46:54 2008 (r184066) @@ -17,6 +17,7 @@ SUBDIR= ${_3dfx} \ ${_aic} \ aic7xxx \ aio \ + alq \ ${_amd} \ amr \ ${_an} \ Added: user/lstewart/alq_varlen_8.x/sys/modules/alq/Makefile ============================================================================== --- /dev/null 00:00:00 1970 (empty, because file is newly added) +++ user/lstewart/alq_varlen_8.x/sys/modules/alq/Makefile Mon Oct 20 01:46:54 2008 (r184066) @@ -0,0 +1,10 @@ +# $FreeBSD$ + +.include <bsd.own.mk> + +.PATH: ${.CURDIR}/../../kern +KMOD=alq +SRCS=opt_mac.h vnode_if.h kern_alq.c + +.include <bsd.kmod.mk> + Modified: user/lstewart/alq_varlen_8.x/sys/sys/alq.h ============================================================================== --- user/lstewart/alq_varlen_8.x/sys/sys/alq.h Mon Oct 20 00:35:02 2008 (r184065) +++ user/lstewart/alq_varlen_8.x/sys/sys/alq.h Mon Oct 20 01:46:54 2008 (r184066) @@ -41,15 +41,15 @@ extern struct thread *ald_thread; * Async. Logging Entry */ struct ale { - struct ale *ae_next; /* Next Entry */ + //struct ale *ae_next; /* Next Entry */ char *ae_data; /* Entry buffer */ - int ae_flags; /* Entry flags */ + int ae_datalen; /* Length of buffer */ + //int ae_flags; /* Entry flags */ }; -#define AE_VALID 0x0001 /* Entry has valid data */ - +//#define AE_VALID 0x0001 /* Entry has valid data */ -/* waitok options */ +/* flags options */ #define ALQ_NOWAIT 0x0001 #define ALQ_WAITOK 0x0002 @@ -88,7 +88,8 @@ int alq_open(struct alq **, const char * * The system is shutting down. * 0 on success. */ -int alq_write(struct alq *alq, void *data, int waitok); +int alq_write(struct alq *alq, void *data, int flags); +int alq_writen(struct alq *alq, void *data, int len, int flags); /* * alq_flush: Flush the queue out to disk @@ -115,13 +116,14 @@ void alq_close(struct alq *alq); * * This leaves the queue locked until a subsequent alq_post. */ -struct ale *alq_get(struct alq *alq, int waitok); +struct ale *alq_get(struct alq *alq, int flags); +struct ale *alq_getn(struct alq *alq, int len, int flags); /* * alq_post: Schedule the ale retrieved by alq_get for writing. * alq The queue to post the entry to. * ale An asynch logging entry returned by alq_get. */ -void alq_post(struct alq *, struct ale *); +void alq_post(struct alq *alq, struct ale *ale); #endif /* _SYS_ALQ_H_ */
Want to link to this message? Use this URL: <https://mail-archive.FreeBSD.org/cgi/mid.cgi?200810200146.m9K1ksJh057210>