Skip site navigation (1) Skip section navigation (2)
Date:      Mon, 20 Oct 2008 01:46:54 +0000 (UTC)
From:      Lawrence Stewart <lstewart@FreeBSD.org>
To:        src-committers@freebsd.org, svn-src-user@freebsd.org
Subject:   svn commit: r184066 - in user/lstewart/alq_varlen_8.x/sys: kern modules modules/alq sys
Message-ID:  <200810200146.m9K1ksJh057210@svn.freebsd.org>

next in thread | raw e-mail | index | archive | help
Author: lstewart
Date: Mon Oct 20 01:46:54 2008
New Revision: 184066
URL: http://svn.freebsd.org/changeset/base/184066

Log:
  Initial import of a patch to turn ALQ(9) into a KLD and add support for variable
  length messages.
  
  Patch currently compiles fine and appears to be usable in basic testing,
  but needs more extensive testing and perhaps some additional refinement.
  
  Todo:
   - Update ALQ(9) man page
   - Regression testing
  Discussed with:	jeff@, rwatson@

Added:
  user/lstewart/alq_varlen_8.x/sys/modules/alq/
  user/lstewart/alq_varlen_8.x/sys/modules/alq/Makefile   (contents, props changed)
Modified:
  user/lstewart/alq_varlen_8.x/sys/kern/kern_alq.c
  user/lstewart/alq_varlen_8.x/sys/modules/Makefile
  user/lstewart/alq_varlen_8.x/sys/sys/alq.h

Modified: user/lstewart/alq_varlen_8.x/sys/kern/kern_alq.c
==============================================================================
--- user/lstewart/alq_varlen_8.x/sys/kern/kern_alq.c	Mon Oct 20 00:35:02 2008	(r184065)
+++ user/lstewart/alq_varlen_8.x/sys/kern/kern_alq.c	Mon Oct 20 01:46:54 2008	(r184066)
@@ -1,5 +1,6 @@
 /*-
  * Copyright (c) 2002, Jeffrey Roberson <jeff@freebsd.org>
+ * Copyright (c) 2008, Lawrence Stewart <lstewart@freebsd.org>
  * All rights reserved.
  *
  * Redistribution and use in source and binary forms, with or without
@@ -51,14 +52,18 @@ __FBSDID("$FreeBSD$");
 struct alq {
 	int	aq_entmax;		/* Max entries */
 	int	aq_entlen;		/* Entry length */
+	int	aq_freebytes;		/* Bytes available in buffer */
+	int	aq_buflen;		/* Total length of our buffer */
 	char	*aq_entbuf;		/* Buffer for stored entries */
+	int	aq_writehead;
+	int	aq_writetail;
 	int	aq_flags;		/* Queue flags */
 	struct mtx	aq_mtx;		/* Queue lock */
 	struct vnode	*aq_vp;		/* Open vnode handle */
 	struct ucred	*aq_cred;	/* Credentials of the opening thread */
-	struct ale	*aq_first;	/* First ent */
-	struct ale	*aq_entfree;	/* First free ent */
-	struct ale	*aq_entvalid;	/* First ent valid for writing */
+	//struct ale	*aq_first;	/* First ent */
+	//struct ale	*aq_entfree;	/* First free ent */
+	//struct ale	*aq_entvalid;	/* First ent valid for writing */
 	LIST_ENTRY(alq)	aq_act;		/* List of active queues */
 	LIST_ENTRY(alq)	aq_link;	/* List of all queues */
 };
@@ -182,8 +187,14 @@ ald_daemon(void)
 	ALD_LOCK();
 
 	for (;;) {
-		while ((alq = LIST_FIRST(&ald_active)) == NULL)
-			msleep(&ald_active, &ald_mtx, PWAIT, "aldslp", 0);
+		while ((alq = LIST_FIRST(&ald_active)) == NULL
+				&& !ald_shutingdown)
+			mtx_sleep(&ald_active, &ald_mtx, PWAIT, "aldslp", 0);
+
+		if (ald_shutingdown) {
+			ALD_UNLOCK();
+			break;
+		}
 
 		ALQ_LOCK(alq);
 		ald_deactivate(alq);
@@ -191,9 +202,11 @@ ald_daemon(void)
 		needwakeup = alq_doio(alq);
 		ALQ_UNLOCK(alq);
 		if (needwakeup)
-			wakeup(alq);
+			wakeup_one(alq);
 		ALD_LOCK();
 	}
+
+	kthread_exit();
 }
 
 static void
@@ -204,6 +217,12 @@ ald_shutdown(void *arg, int howto)
 	ALD_LOCK();
 	ald_shutingdown = 1;
 
+	/* wake ald_daemon so that it exits */
+	wakeup(&ald_active);
+
+	/* wait for ald_daemon to exit */
+	mtx_sleep(ald_thread, &ald_mtx, PWAIT, "aldslp", 0);
+
 	while ((alq = LIST_FIRST(&ald_queues)) != NULL) {
 		LIST_REMOVE(alq, aq_link);
 		ALD_UNLOCK();
@@ -244,41 +263,45 @@ alq_doio(struct alq *alq)
 	struct vnode *vp;
 	struct uio auio;
 	struct iovec aiov[2];
-	struct ale *ale;
-	struct ale *alstart;
 	int totlen;
 	int iov;
 	int vfslocked;
 
+	KASSERT(alq->aq_freebytes != alq->aq_buflen,
+		("%s: queue emtpy!", __func__)
+	);
+
 	vp = alq->aq_vp;
 	td = curthread;
 	totlen = 0;
 	iov = 0;
 
-	alstart = ale = alq->aq_entvalid;
-	alq->aq_entvalid = NULL;
-
 	bzero(&aiov, sizeof(aiov));
 	bzero(&auio, sizeof(auio));
 
-	do {
-		if (aiov[iov].iov_base == NULL)
-			aiov[iov].iov_base = ale->ae_data;
-		aiov[iov].iov_len += alq->aq_entlen;
-		totlen += alq->aq_entlen;
-		/* Check to see if we're wrapping the buffer */
-		if (ale->ae_data + alq->aq_entlen != ale->ae_next->ae_data)
-			iov++;
-		ale->ae_flags &= ~AE_VALID;
-		ale = ale->ae_next;
-	} while (ale->ae_flags & AE_VALID);
+	/* start the write from the location of our buffer tail pointer */
+	aiov[iov].iov_base = alq->aq_entbuf + alq->aq_writetail;
+
+	if (alq->aq_writetail < alq->aq_writehead) {
+		/* buffer not wrapped */
+		totlen = aiov[iov].iov_len =  alq->aq_writehead -
+							alq->aq_writetail;
+	} else {
+		/*
+		 * buffer wrapped, requires 2 aiov entries:
+		 * - first is from writetail to end of buffer
+		 * - second is from start of buffer to writehead
+		 */
+		aiov[iov].iov_len =  alq->aq_buflen - alq->aq_writetail;
+		iov++;
+		aiov[iov].iov_base = alq->aq_entbuf;
+		aiov[iov].iov_len =  alq->aq_writehead;
+		totlen = aiov[0].iov_len + aiov[1].iov_len;
+	}
 
 	alq->aq_flags |= AQ_FLUSHING;
 	ALQ_UNLOCK(alq);
 
-	if (iov == 2 || aiov[iov].iov_base == NULL)
-		iov--;
-
 	auio.uio_iov = &aiov[0];
 	auio.uio_offset = 0;
 	auio.uio_segflg = UIO_SYSSPACE;
@@ -308,8 +331,17 @@ alq_doio(struct alq *alq)
 	ALQ_LOCK(alq);
 	alq->aq_flags &= ~AQ_FLUSHING;
 
-	if (alq->aq_entfree == NULL)
-		alq->aq_entfree = alstart;
+	/* Adjust writetail as required, taking into account wrapping */
+	alq->aq_writetail += (iov == 2) ? aiov[1].iov_len : totlen;
+	alq->aq_freebytes += totlen;
+
+	/*
+	 * If we just flushed the buffer completely,
+	 * reset indexes to 0 to minimise buffer wraps
+	 * This is also required to ensure alq_getn() can't wedge itself
+	 */
+	if (alq->aq_freebytes == alq->aq_buflen)
+		alq->aq_writehead = alq->aq_writetail = 0;
 
 	if (alq->aq_flags & AQ_WANTED) {
 		alq->aq_flags &= ~AQ_WANTED;
@@ -340,13 +372,13 @@ alq_open(struct alq **alqp, const char *
 {
 	struct thread *td;
 	struct nameidata nd;
-	struct ale *ale;
-	struct ale *alp;
 	struct alq *alq;
-	char *bufp;
 	int flags;
 	int error;
-	int i, vfslocked;
+	int vfslocked;
+
+	KASSERT(size > 0, ("%s: size <= 0", __func__));
+	KASSERT(count >= 0, ("%s: count < 0", __func__));
 
 	*alqp = NULL;
 	td = curthread;
@@ -365,31 +397,27 @@ alq_open(struct alq **alqp, const char *
 	VFS_UNLOCK_GIANT(vfslocked);
 
 	alq = malloc(sizeof(*alq), M_ALD, M_WAITOK|M_ZERO);
-	alq->aq_entbuf = malloc(count * size, M_ALD, M_WAITOK|M_ZERO);
-	alq->aq_first = malloc(sizeof(*ale) * count, M_ALD, M_WAITOK|M_ZERO);
 	alq->aq_vp = nd.ni_vp;
 	alq->aq_cred = crhold(cred);
-	alq->aq_entmax = count;
-	alq->aq_entlen = size;
-	alq->aq_entfree = alq->aq_first;
 
 	mtx_init(&alq->aq_mtx, "ALD Queue", NULL, MTX_SPIN|MTX_QUIET);
 
-	bufp = alq->aq_entbuf;
-	ale = alq->aq_first;
-	alp = NULL;
-
-	/* Match up entries with buffers */
-	for (i = 0; i < count; i++) {
-		if (alp)
-			alp->ae_next = ale;
-		ale->ae_data = bufp;
-		alp = ale;
-		ale++;
-		bufp += size;
+	if (count > 0) {
+		/* fixed length messages */
+		alq->aq_buflen = size * count;
+		alq->aq_entmax = count;
+		alq->aq_entlen = size;
+	} else {
+		/* variable length messages */
+		alq->aq_buflen = size;
+		alq->aq_entmax = 0;
+		alq->aq_entlen = 0;
 	}
 
-	alp->ae_next = alq->aq_first;
+	alq->aq_freebytes = alq->aq_buflen;	
+	alq->aq_entbuf = malloc(alq->aq_buflen, M_ALD, M_WAITOK|M_ZERO);
+
+	alq->aq_writehead = alq->aq_writetail = 0;
 
 	if ((error = ald_add(alq)) != 0)
 		return (error);
@@ -403,46 +431,180 @@ alq_open(struct alq **alqp, const char *
  * wait or return an error depending on the value of waitok.
  */
 int
-alq_write(struct alq *alq, void *data, int waitok)
+alq_write(struct alq *alq, void *data, int flags)
 {
-	struct ale *ale;
+	/* should only be called in fixed length message (legacy) mode */
+	KASSERT(alq->aq_entmax > 0 && alq->aq_entlen > 0,
+		("%s: fixed length write on variable length queue", __func__)
+	);
+	return (alq_writen(alq, data, alq->aq_entlen, flags));
+}
+
+int
+alq_writen(struct alq *alq, void *data, int len, int flags)
+{
+	int activate = 0;
+	int copy = len;
+
+	KASSERT(len > 0 && len < alq->aq_buflen,
+		("%s: len <= 0 || len > alq->aq_buflen", __func__)
+	);
+
+	ALQ_LOCK(alq);
+
+	/*
+	 * If the message is larger than our underlying buffer or
+	 * there is not enough free space in our underlying buffer
+	 * to accept the message and the user can't wait, return
+	 */
+	if ((len > alq->aq_buflen) ||
+		((flags & ALQ_NOWAIT) && (alq->aq_freebytes < len))) {
+		ALQ_UNLOCK(alq);
+		return (EWOULDBLOCK);
+	}
+
+	/*
+	 * ALQ_WAITOK or alq->aq_freebytes > len,
+	 * either spin until we have enough free bytes (former) or skip (latter)
+	 */
+	while (alq->aq_freebytes < len && (alq->aq_flags & AQ_SHUTDOWN) == 0) {
+		alq->aq_flags |= AQ_WANTED;
+		msleep_spin(alq, &alq->aq_mtx, "alqwriten", 0);
+	}
 
-	if ((ale = alq_get(alq, waitok)) == NULL)
+	/*
+	 * we need to serialise wakeups to ensure records remain in order...
+	 * therefore, wakeup the next thread in the queue waiting for
+	 * alq resources to be available
+	 * (technically this is only required if we actually entered the above
+	 * while loop)
+	 */
+	wakeup_one(alq);
+
+	/* bail if we're shutting down */
+	if (alq->aq_flags & AQ_SHUTDOWN) {
+		ALQ_UNLOCK(alq);
 		return (EWOULDBLOCK);
+	}
+
+	/*
+	 * if we need to wrap the buffer to accommodate the write,
+	 * we'll need 2 calls to bcopy
+	 */
+	if ((alq->aq_buflen - alq->aq_writehead) < len)
+		copy = alq->aq_buflen - alq->aq_writehead;
+
+	/* copy (part of) message to the buffer */
+	bcopy(data, alq->aq_entbuf + alq->aq_writehead, copy);
+	alq->aq_writehead += copy;
+
+	if (copy != len) {
+		/*
+		 * wrap the buffer by copying the remainder of our message
+		 * to the start of the buffer and resetting the head ptr
+		 */
+		bcopy(data, alq->aq_entbuf, len - copy);
+		alq->aq_writehead = copy;
+	}
 
-	bcopy(data, ale->ae_data, alq->aq_entlen);
-	alq_post(alq, ale);
+	alq->aq_freebytes -= len;
+
+	if ((alq->aq_flags & AQ_ACTIVE) == 0) {
+		alq->aq_flags |= AQ_ACTIVE;
+		activate = 1;
+	}
+
+	ALQ_UNLOCK(alq);
+
+	if (activate) {
+		ALD_LOCK();
+		ald_activate(alq);
+		ALD_UNLOCK();
+	}
 
 	return (0);
 }
 
 struct ale *
-alq_get(struct alq *alq, int waitok)
+alq_get(struct alq *alq, int flags)
+{
+	/* should only be called in fixed length message (legacy) mode */
+	KASSERT(alq->aq_entmax > 0 && alq->aq_entlen > 0,
+		("%s: fixed length get on variable length queue", __func__)
+	);
+	return (alq_getn(alq, alq->aq_entlen, flags));
+}
+
+struct ale *
+alq_getn(struct alq *alq, int len, int flags)
 {
 	struct ale *ale;
-	struct ale *aln;
+	int contigbytes;
 
-	ale = NULL;
+	ale = malloc(	sizeof(struct ale),
+			M_ALD,
+			(flags & ALQ_NOWAIT) ? M_NOWAIT : M_WAITOK
+	);
+
+	if (ale == NULL)
+		return (NULL);
 
 	ALQ_LOCK(alq);
 
-	/* Loop until we get an entry or we're shutting down */
-	while ((alq->aq_flags & AQ_SHUTDOWN) == 0 && 
-	    (ale = alq->aq_entfree) == NULL &&
-	    (waitok & ALQ_WAITOK)) {
-		alq->aq_flags |= AQ_WANTED;
-		msleep_spin(alq, &alq->aq_mtx, "alqget", 0);
+	/* determine the number of free contiguous bytes */
+	if (alq->aq_writehead <= alq->aq_writetail)
+		contigbytes = alq->aq_freebytes;
+	else
+		contigbytes = alq->aq_buflen - alq->aq_writehead;
+
+	/*
+	 * If the message is larger than our underlying buffer or
+	 * there is not enough free contiguous space in our underlying buffer
+	 * to accept the message and the user can't wait, return
+	 */
+	if ((len > alq->aq_buflen) ||
+		((flags & ALQ_NOWAIT) && (contigbytes < len))) {
+		ALQ_UNLOCK(alq);
+		return (NULL);
 	}
 
-	if (ale != NULL) {
-		aln = ale->ae_next;
-		if ((aln->ae_flags & AE_VALID) == 0)
-			alq->aq_entfree = aln;
+	/*
+	 * ALQ_WAITOK or contigbytes > len,
+	 * either spin until we have enough free contiguous bytes (former)
+	 * or skip (latter)
+	 */
+	while (contigbytes < len && (alq->aq_flags & AQ_SHUTDOWN) == 0) {
+		alq->aq_flags |= AQ_WANTED;
+		msleep_spin(alq, &alq->aq_mtx, "alqgetn", 0);
+		if (alq->aq_writehead <= alq->aq_writetail)
+			contigbytes = alq->aq_freebytes;
 		else
-			alq->aq_entfree = NULL;
-	} else
+			contigbytes = alq->aq_buflen - alq->aq_writehead;
+	}
+
+	/*
+	 * we need to serialise wakeups to ensure records remain in order...
+	 * therefore, wakeup the next thread in the queue waiting for
+	 * alq resources to be available
+	 * (technically this is only required if we actually entered the above
+	 * while loop)
+	 */
+	wakeup_one(alq);
+
+	/* bail if we're shutting down */
+	if (alq->aq_flags & AQ_SHUTDOWN) {
 		ALQ_UNLOCK(alq);
+		return (NULL);
+	}
 
+	/*
+	 * If we are here, we have a contiguous number of bytes >= len
+	 * available in our buffer starting at aq_writehead.
+	 */
+	ale->ae_data = alq->aq_entbuf + alq->aq_writehead;
+	ale->ae_datalen = len;
+	alq->aq_writehead += len;
+	alq->aq_freebytes -= len;
 
 	return (ale);
 }
@@ -452,11 +614,6 @@ alq_post(struct alq *alq, struct ale *al
 {
 	int activate;
 
-	ale->ae_flags |= AE_VALID;
-
-	if (alq->aq_entvalid == NULL)
-		alq->aq_entvalid = ale;
-
 	if ((alq->aq_flags & AQ_ACTIVE) == 0) {
 		alq->aq_flags |= AQ_ACTIVE;
 		activate = 1;
@@ -464,11 +621,14 @@ alq_post(struct alq *alq, struct ale *al
 		activate = 0;
 
 	ALQ_UNLOCK(alq);
+
 	if (activate) {
 		ALD_LOCK();
 		ald_activate(alq);
 		ALD_UNLOCK();
 	}
+
+	free(ale, M_ALD);
 }
 
 void
@@ -487,7 +647,7 @@ alq_flush(struct alq *alq)
 	ALQ_UNLOCK(alq);
 
 	if (needwakeup)
-		wakeup(alq);
+		wakeup_one(alq);
 }
 
 /*
@@ -509,7 +669,49 @@ alq_close(struct alq *alq)
 	alq_shutdown(alq);
 
 	mtx_destroy(&alq->aq_mtx);
-	free(alq->aq_first, M_ALD);
 	free(alq->aq_entbuf, M_ALD);
 	free(alq, M_ALD);
 }
+
+static int alq_load_handler(module_t mod, int what, void *arg)
+{
+	int ret = 0;
+
+	switch(what) {
+		case MOD_LOAD:
+		case MOD_UNLOAD:
+		case MOD_SHUTDOWN:
+			break;
+		
+		case MOD_QUIESCE:
+			ALD_LOCK();
+			/* only allow unload if there are no open queues */
+			if (LIST_FIRST(&ald_queues) == NULL) {
+				ald_shutingdown = 1;
+				ALD_UNLOCK();
+				ald_shutdown(NULL, 0);
+				mtx_destroy(&ald_mtx);
+			} else {
+				ALD_UNLOCK();
+				ret = EBUSY;
+			}
+			break;
+		
+		default:
+			ret = EINVAL;
+			break;
+	}
+
+	return (ret);
+}
+
+/* basic module data */
+static moduledata_t alq_mod =
+{
+	"alq",
+	alq_load_handler, /* execution entry point for the module */
+	NULL
+};
+
+DECLARE_MODULE(alq, alq_mod, SI_SUB_SMP, SI_ORDER_ANY);
+MODULE_VERSION(alq, 1);

Modified: user/lstewart/alq_varlen_8.x/sys/modules/Makefile
==============================================================================
--- user/lstewart/alq_varlen_8.x/sys/modules/Makefile	Mon Oct 20 00:35:02 2008	(r184065)
+++ user/lstewart/alq_varlen_8.x/sys/modules/Makefile	Mon Oct 20 01:46:54 2008	(r184066)
@@ -17,6 +17,7 @@ SUBDIR=	${_3dfx} \
 	${_aic} \
 	aic7xxx \
 	aio \
+	alq \
 	${_amd} \
 	amr \
 	${_an} \

Added: user/lstewart/alq_varlen_8.x/sys/modules/alq/Makefile
==============================================================================
--- /dev/null	00:00:00 1970	(empty, because file is newly added)
+++ user/lstewart/alq_varlen_8.x/sys/modules/alq/Makefile	Mon Oct 20 01:46:54 2008	(r184066)
@@ -0,0 +1,10 @@
+# $FreeBSD$
+
+.include <bsd.own.mk>
+
+.PATH:  ${.CURDIR}/../../kern
+KMOD=alq
+SRCS=opt_mac.h vnode_if.h kern_alq.c
+
+.include <bsd.kmod.mk> 
+

Modified: user/lstewart/alq_varlen_8.x/sys/sys/alq.h
==============================================================================
--- user/lstewart/alq_varlen_8.x/sys/sys/alq.h	Mon Oct 20 00:35:02 2008	(r184065)
+++ user/lstewart/alq_varlen_8.x/sys/sys/alq.h	Mon Oct 20 01:46:54 2008	(r184066)
@@ -41,15 +41,15 @@ extern struct thread *ald_thread;
  * Async. Logging Entry
  */
 struct ale {
-	struct ale	*ae_next;	/* Next Entry */
+	//struct ale	*ae_next;	/* Next Entry */
 	char		*ae_data;	/* Entry buffer */
-	int		ae_flags;	/* Entry flags */
+	int		ae_datalen;	/* Length of buffer */
+	//int		ae_flags;	/* Entry flags */
 };
 
-#define	AE_VALID	0x0001		/* Entry has valid data */
- 
+//#define	AE_VALID	0x0001		/* Entry has valid data */
 
-/* waitok options */
+/* flags options */
 #define	ALQ_NOWAIT	0x0001
 #define	ALQ_WAITOK	0x0002
 
@@ -88,7 +88,8 @@ int alq_open(struct alq **, const char *
  *		The system is shutting down.
  *	0 on success.
  */
-int alq_write(struct alq *alq, void *data, int waitok);
+int alq_write(struct alq *alq, void *data, int flags);
+int alq_writen(struct alq *alq, void *data, int len, int flags);
 
 /*
  * alq_flush:  Flush the queue out to disk
@@ -115,13 +116,14 @@ void alq_close(struct alq *alq);
  *
  * This leaves the queue locked until a subsequent alq_post.
  */
-struct ale *alq_get(struct alq *alq, int waitok);
+struct ale *alq_get(struct alq *alq, int flags);
+struct ale *alq_getn(struct alq *alq, int len, int flags);
 
 /*
  * alq_post:  Schedule the ale retrieved by alq_get for writing.
  *	alq	The queue to post the entry to.
  *	ale	An asynch logging entry returned by alq_get.
  */
-void alq_post(struct alq *, struct ale *);
+void alq_post(struct alq *alq, struct ale *ale);
 
 #endif	/* _SYS_ALQ_H_ */



Want to link to this message? Use this URL: <https://mail-archive.FreeBSD.org/cgi/mid.cgi?200810200146.m9K1ksJh057210>