Skip site navigation (1)Skip section navigation (2)
Date:      Wed, 10 Mar 2010 05:06:47 +0000 (UTC)
From:      Lawrence Stewart <lstewart@FreeBSD.org>
To:        src-committers@freebsd.org, svn-src-user@freebsd.org
Subject:   svn commit: r204940 - in user/lstewart/alq_varlen_head: share/man/man9 sys/kern sys/sys
Message-ID:  <201003100506.o2A56lOs036765@svn.freebsd.org>

next in thread | raw e-mail | index | archive | help
Author: lstewart
Date: Wed Mar 10 05:06:47 2010
New Revision: 204940
URL: http://svn.freebsd.org/changeset/base/204940

Log:
  Add variable length message support to ALQ(9) (imported from the
  tcp_ffcaia2008_head project branch with tweaks). The KPI is completely backwards
  compatible and described in the updated man page. Invariants testing has been
  strengthened and style nits have also been addressed.
  
  Sponsored by:	FreeBSD Foundation

Modified:
  user/lstewart/alq_varlen_head/share/man/man9/alq.9
  user/lstewart/alq_varlen_head/sys/kern/kern_alq.c
  user/lstewart/alq_varlen_head/sys/sys/alq.h

Modified: user/lstewart/alq_varlen_head/share/man/man9/alq.9
==============================================================================
--- user/lstewart/alq_varlen_head/share/man/man9/alq.9	Wed Mar 10 02:17:57 2010	(r204939)
+++ user/lstewart/alq_varlen_head/share/man/man9/alq.9	Wed Mar 10 05:06:47 2010	(r204940)
@@ -1,7 +1,13 @@
 .\"
 .\" Copyright (c) 2003 Hiten Pandya <hmp@FreeBSD.org>
+.\" Copyright (c) 2009-2010 The FreeBSD Foundation
 .\" All rights reserved.
 .\"
+.\" Portions of this software were developed at the Centre for Advanced
+.\" Internet Architectures, Swinburne University of Technology, Melbourne,
+.\" Australia by Lawrence Stewart under sponsorship from the FreeBSD
+.\" Foundation.
+.\"
 .\" Redistribution and use in source and binary forms, with or without
 .\" modification, are permitted provided that the following conditions
 .\" are met:
@@ -25,17 +31,20 @@
 .\"
 .\" $FreeBSD$
 .\"
-.Dd May 16, 2003
+.Dd March 7, 2010
 .Dt ALQ 9
 .Os
 .Sh NAME
 .Nm alq ,
 .Nm alq_open ,
 .Nm alq_write ,
+.Nm alq_writen ,
 .Nm alq_flush ,
 .Nm alq_close ,
 .Nm alq_get ,
-.Nm alq_post
+.Nm alq_getn ,
+.Nm alq_post ,
+.Nm alq_postn
 .Nd Asynchronous Logging Queues
 .Sh SYNOPSIS
 .In sys/alq.h
@@ -49,19 +58,25 @@
 .Fa "int count"
 .Fc
 .Ft int
-.Fn alq_write "struct alq *alq" "void *data" "int waitok"
+.Fn alq_write "struct alq *alq" "void *data" "int flags"
+.Ft int
+.Fn alq_writen "struct alq *alq" "void *data" "int len" "int flags"
 .Ft void
 .Fn alq_flush "struct alq *alq"
 .Ft void
 .Fn alq_close "struct alq *alq"
 .Ft struct ale *
-.Fn alq_get "struct alq *alq" "int waitok"
+.Fn alq_get "struct alq *alq" "int flags"
+.Ft struct ale *
+.Fn alq_getn "struct alq *alq" "int len" "int flags"
 .Ft void
 .Fn alq_post "struct alq *alq" "struct ale *ale"
+.Ft void
+.Fn alq_postn "struct alq *alq" "struct ale *ale" "int flags"
 .Sh DESCRIPTION
 The
 .Nm
-facility provides an asynchronous fixed length recording
+facility provides an asynchronous fixed or variable length recording
 mechanism, known as Asynchronous Logging Queues.
 It can record to any
 .Xr vnode 9 ,
@@ -81,26 +96,37 @@ is defined as
 which has the following members:
 .Bd -literal -offset indent
 struct ale {
-	struct ale	*ae_next;	/* Next Entry */
-	char		*ae_data;	/* Entry buffer */
-	int		ae_flags;	/* Entry flags */
+	intptr_t	ae_bytesused;	/* # bytes written to ALE. */
+	char		*ae_data;	/* Write ptr. */
+	int		ae_pad;		/* Unused, compat. */
 };
 .Ed
 .Pp
-The
-.Va ae_flags
-field is for internal use, clients of the
+An
+.Nm
+can be created in either fixed or variable length mode.
+A variable length
 .Nm
-interface should not modify this field.
-Behaviour is undefined if this field is modified.
+accommodates writes of varying length using
+.Fn alq_writen
+and
+.Fn alq_getn .
+A fixed length
+.Nm
+accommodates a fixed number of writes using
+.Fn alq_write
+and
+.Fn alq_get ,
+each of fixed size (set at queue creation time).
+Fixed length mode is deprecated in favour of variable length mode.
 .Sh FUNCTIONS
 The
 .Fn alq_open
-function creates a new logging queue.
+function creates a new asynchronous logging queue.
 The
 .Fa file
-argument is the name of the file to open for logging; if the file does not
-yet exist,
+argument is the name of the file to open for logging.
+If the file does not yet exist,
 .Fn alq_open
 will attempt to create it.
 The
@@ -115,30 +141,68 @@ a default creation mode suitable for mos
 The argument
 .Fa cred
 specifies the credentials to use when opening and performing I/O on the file.
-The size of each entry in the queue is determined by
-.Fa size .
-The
+To create a variable length mode
+.Nm ,
+the
+.Fa size
+argument should be set to the size (in bytes) of the underlying queue and the
+.Fa count
+argument should be set to 0.
+To create a fixed length mode
+.Nm ,
+the
+.Fa size
+argument should be set to the size (in bytes) of each write and the
 .Fa count
-argument determines the number of items to be stored in the
-asynchronous queue over an approximate period of a disk
-write operation.
+argument should be set to the number of
+.Fa size
+byte chunks to reserve capacity for.
 .Pp
 The
-.Fn alq_write
+.Fn alq_writen
 function writes
+.Fa len
+bytes from
 .Fa data
-to the designated queue,
+to the designated variable length mode queue
 .Fa alq .
-In the event that
-.Fn alq_write
-could not write the entry immediately, and
+If
+.Fn alq_writen
+could not write the entry immediately and
 .Dv ALQ_WAITOK
-is passed to
-.Fa waitok ,
-then
+is set in
+.Fa flags ,
+the function will be allowed to
+.Xr msleep_spin 9
+with the
+.Dq Li alqwriten
+wait message.
+A write will automatically schedule the queue
+.Fa alq
+to be flushed to disk.
+This behaviour can be controlled by passing ALQ_NOACTIVATE via
+.Fa flags
+to indicate that the write should not schedule
+.Fa alq
+to be flushed to disk.
+.Pp
+The
 .Fn alq_write
-will be allowed to
-.Xr tsleep 9 .
+function is implemented as a wrapper around
+.Fn alq_writen
+to provide backwards compatibility to consumers that have not been updated to
+utilise variable length mode queues.
+The function will write
+.Fa size
+bytes of data (where
+.Fa size
+was specified at queue creation time) from the
+.Fa data
+buffer to the
+.Fa alq .
+Note that it is an error to call
+.Fn alq_write
+on a variable length mode queue.
 .Pp
 The
 .Fn alq_flush
@@ -146,61 +210,138 @@ function is used for flushing
 .Fa alq
 to the log medium that was passed to
 .Fn alq_open .
+If
+.Fa alq
+has data to flush and is not already in the process of being flushed, the
+function will block doing IO.
+Otherwise, the function will return immediately.
 .Pp
 The
 .Fn alq_close
-function will close the asynchronous logging queue,
-.Fa alq ,
+function will close the asynchronous logging queue
+.Fa alq
 and flush all pending write requests to the log medium.
 It will free all resources that were previously allocated.
 .Pp
 The
-.Fn alq_get
-function returns the next available asynchronous logging entry
-from the queue,
-.Fa alq .
-This function leaves the queue in a locked state, until a subsequent
+.Fn alq_getn
+function returns an asynchronous log entry from
+.Fa alq ,
+initialised to point at a buffer capable of receiving
+.Fa len
+bytes of data.
+This function leaves
+.Fa alq
+in a locked state, until a subsequent
 .Fn alq_post
+or
+.Fn alq_postn
 call is made.
-In the event that
-.Fn alq_get
-could not retrieve an entry immediately, it will
-.Xr tsleep 9
+If
+.Fn alq_getn
+could not obtain
+.Fa len
+bytes of buffer immediately and
+.Dv ALQ_WAITOK
+is set in
+.Fa flags ,
+the function will be allowed to
+.Xr msleep_spin 9
 with the
-.Dq Li alqget
+.Dq Li alqgetn
 wait message.
+The caller can choose to write less than
+.Fa len
+bytes of data to the returned asynchronous log entry by setting the entry's
+ae_bytesused field to the number of bytes actually written.
+This must be done prior to calling
+.Fn alq_post .
 .Pp
 The
-.Fn alq_post
-function schedules the asynchronous logging entry,
-.Fa ale ,
-which is retrieved using the
 .Fn alq_get
-function,
-for writing to the asynchronous logging queue,
+function is implemented as a wrapper around
+.Fn alq_getn
+to provide backwards compatibility to consumers that have not been updated to
+utilise variable length mode queues.
+The asynchronous log entry returned will be initialised to point at a buffer
+capable of receiving
+.Fa size
+bytes of data (where
+.Fa size
+was specified at queue creation time).
+Note that it is an error to call
+.Fn alq_get
+on a variable length mode queue.
+.Pp
+The
+.Fn alq_postn
+function schedules the asynchronous log entry
+.Fa ale
+(obtained from
+.Fn alq_getn
+or
+.Fn alq_get )
+for writing to
 .Fa alq .
-This function leaves the queue,
-.Fa alq ,
+The ALQ_NOACTIVATE flag may be passed in via
+.Fa flags
+to indicate that the queue should not be immediately scheduled to be flushed to
+disk.
+This function leaves
+.Fa alq
 in an unlocked state.
+.Pp
+The
+.Fn alq_post
+function is implemented as a wrapper around
+.Fn alq_postn
+to provide backwards compatibility to consumers that have not been updated to
+utilise the newer
+.Fn alq_postn
+function.
+It simply passes
+.Fa alq
+and
+.Fa ale
+through to
+.Fn alq_postn
+untouched, and sets
+.Fa flags
+to 0.
 .Sh IMPLEMENTATION NOTES
 The
+.Fn alq_writen
+and
 .Fn alq_write
-function is a wrapper around the
+functions both perform a
+.Xr bcopy 3
+from the supplied
+.Fa data
+buffer into the underlying
+.Nm
+buffer.
+Performance critical code paths may wish to consider using
+.Fn alq_getn
+(variable length queues) or
+.Fn alq_get
+(fixed length queues) to avoid the extra memory copy. Note that a queue
+remains locked between calls to
+.Fn alq_getn
+or
 .Fn alq_get
 and
 .Fn alq_post
-functions; by using these functions separately, a call
-to
-.Fn bcopy
-can be avoided for performance critical code paths.
+or
+.Fn alq_postn ,
+so this method of writing to a queue is unsuitable for situations where the
+time between calls may be substantial.
 .Sh LOCKING
-Each asynchronous queue is protected by a spin mutex.
+Each asynchronous logging queue is protected by a spin mutex.
 .Pp
 Functions
-.Fn alq_flush ,
-.Fn alq_open
+.Fn alq_flush
 and
-.Fn alq_post
+.Fn alq_open
 may attempt to acquire an internal sleep mutex, and should
 consequently not be used in contexts where sleeping is
 not allowed.
@@ -214,32 +355,36 @@ if it fails to open
 or else it returns 0.
 .Pp
 The
+.Fn alq_writen
+and
 .Fn alq_write
-function returns
+functions return
 .Er EWOULDBLOCK
 if
 .Dv ALQ_NOWAIT
-was provided as a value to
-.Fa waitok
-and either the queue is full, or when the system is shutting down.
+was set in
+.Fa flags
+and either the queue is full or the system is shutting down.
 .Pp
 The
+.Fn alq_getn
+and
 .Fn alq_get
-function returns
-.Dv NULL ,
+functions return
+.Dv NULL
 if
 .Dv ALQ_NOWAIT
-was provided as a value to
-.Fa waitok
-and either the queue is full, or when the system is shutting down.
+was set in
+.Fa flags
+and either the queue is full or the system is shutting down.
 .Pp
 NOTE: invalid arguments to non-void functions will result in
 undefined behaviour.
 .Sh SEE ALSO
-.Xr syslog 3 ,
-.Xr kthread 9 ,
+.Xr kproc 9 ,
 .Xr ktr 9 ,
-.Xr tsleep 9 ,
+.Xr msleep_spin 9 ,
+.Xr syslog 3 ,
 .Xr vnode 9
 .Sh HISTORY
 The
@@ -250,7 +395,11 @@ Asynchronous Logging Queues (ALQ) facili
 The
 .Nm
 facility was written by
-.An Jeffrey Roberson Aq jeff@FreeBSD.org .
+.An Jeffrey Roberson Aq jeff@FreeBSD.org
+and extended by
+.An Lawrence Stewart Aq lstewart@freebsd.org .
 .Pp
 This manual page was written by
-.An Hiten Pandya Aq hmp@FreeBSD.org .
+.An Hiten Pandya Aq hmp@FreeBSD.org
+and revised by
+.An Lawrence Stewart Aq lstewart@freebsd.org .

Modified: user/lstewart/alq_varlen_head/sys/kern/kern_alq.c
==============================================================================
--- user/lstewart/alq_varlen_head/sys/kern/kern_alq.c	Wed Mar 10 02:17:57 2010	(r204939)
+++ user/lstewart/alq_varlen_head/sys/kern/kern_alq.c	Wed Mar 10 05:06:47 2010	(r204940)
@@ -56,14 +56,17 @@ __FBSDID("$FreeBSD$");
 struct alq {
 	int	aq_entmax;		/* Max entries */
 	int	aq_entlen;		/* Entry length */
+	int	aq_freebytes;		/* Bytes available in buffer */
+	int	aq_buflen;		/* Total length of our buffer */
 	char	*aq_entbuf;		/* Buffer for stored entries */
+	int	aq_writehead;		/* Location for next write */
+	int	aq_writetail;		/* Flush starts at this location */
+	int	aq_wrapearly;		/* # bytes left blank at end of buf */
 	int	aq_flags;		/* Queue flags */
+	struct	ale	aq_getpost;	/* ALE for use by get/post */
 	struct mtx	aq_mtx;		/* Queue lock */
 	struct vnode	*aq_vp;		/* Open vnode handle */
 	struct ucred	*aq_cred;	/* Credentials of the opening thread */
-	struct ale	*aq_first;	/* First ent */
-	struct ale	*aq_entfree;	/* First free ent */
-	struct ale	*aq_entvalid;	/* First ent valid for writing */
 	LIST_ENTRY(alq)	aq_act;		/* List of active queues */
 	LIST_ENTRY(alq)	aq_link;	/* List of all queues */
 };
@@ -76,6 +79,8 @@ struct alq {
 #define	ALQ_LOCK(alq)	mtx_lock_spin(&(alq)->aq_mtx)
 #define	ALQ_UNLOCK(alq)	mtx_unlock_spin(&(alq)->aq_mtx)
 
+#define HAS_PENDING_DATA(alq) ((alq)->aq_freebytes != (alq)->aq_buflen)
+
 static MALLOC_DEFINE(M_ALD, "ALD", "ALD");
 
 /*
@@ -90,7 +95,7 @@ static struct proc *ald_proc;
 #define	ALD_LOCK()	mtx_lock(&ald_mtx)
 #define	ALD_UNLOCK()	mtx_unlock(&ald_mtx)
 
-/* Daemon functions */
+/* Daemon functions. */
 static int ald_add(struct alq *);
 static int ald_rem(struct alq *);
 static void ald_startup(void *);
@@ -99,7 +104,7 @@ static void ald_shutdown(void *, int);
 static void ald_activate(struct alq *);
 static void ald_deactivate(struct alq *);
 
-/* Internal queue functions */
+/* Internal queue functions. */
 static void alq_shutdown(struct alq *);
 static void alq_destroy(struct alq *);
 static int alq_doio(struct alq *);
@@ -125,7 +130,7 @@ ald_add(struct alq *alq)
 
 /*
  * Remove a queue from the global list unless we're shutting down.  If so,
- * the ald will take care of cleaning up it's resources.
+ * the ALD will take care of cleaning up its resources.
  */
 static int
 ald_rem(struct alq *alq)
@@ -242,7 +247,21 @@ alq_shutdown(struct alq *alq)
 	/* Stop any new writers. */
 	alq->aq_flags |= AQ_SHUTDOWN;
 
-	/* Drain IO */
+	/*
+	 * If the ALQ isn't active but has unwritten data (possible if
+	 * the ALQ_NOACTIVATE flag has been used), explicitly activate the
+	 * ALQ here so that the pending data gets flushed by the ald_daemon.
+	 */
+	if (!(alq->aq_flags & AQ_ACTIVE) && HAS_PENDING_DATA(alq)) {
+		alq->aq_flags |= AQ_ACTIVE;
+		ALQ_UNLOCK(alq);
+		ALD_LOCK();
+		ald_activate(alq);
+		ALD_UNLOCK();
+		ALQ_LOCK(alq);
+	}
+
+	/* Drain IO. */
 	while (alq->aq_flags & AQ_ACTIVE) {
 		alq->aq_flags |= AQ_WANTED;
 		msleep_spin(alq, &alq->aq_mtx, "aldclose", 0);
@@ -260,7 +279,6 @@ alq_destroy(struct alq *alq)
 	alq_shutdown(alq);
 
 	mtx_destroy(&alq->aq_mtx);
-	free(alq->aq_first, M_ALD);
 	free(alq->aq_entbuf, M_ALD);
 	free(alq, M_ALD);
 }
@@ -276,58 +294,62 @@ alq_doio(struct alq *alq)
 	struct vnode *vp;
 	struct uio auio;
 	struct iovec aiov[2];
-	struct ale *ale;
-	struct ale *alstart;
 	int totlen;
 	int iov;
 	int vfslocked;
+	int wrapearly;
+
+	KASSERT((HAS_PENDING_DATA(alq)), ("%s: queue emtpy!", __func__));
 
 	vp = alq->aq_vp;
 	td = curthread;
 	totlen = 0;
-	iov = 0;
-
-	alstart = ale = alq->aq_entvalid;
-	alq->aq_entvalid = NULL;
+	iov = 1;
+	wrapearly = alq->aq_wrapearly;
 
 	bzero(&aiov, sizeof(aiov));
 	bzero(&auio, sizeof(auio));
 
-	do {
-		if (aiov[iov].iov_base == NULL)
-			aiov[iov].iov_base = ale->ae_data;
-		aiov[iov].iov_len += alq->aq_entlen;
-		totlen += alq->aq_entlen;
-		/* Check to see if we're wrapping the buffer */
-		if (ale->ae_data + alq->aq_entlen != ale->ae_next->ae_data)
-			iov++;
-		ale->ae_flags &= ~AE_VALID;
-		ale = ale->ae_next;
-	} while (ale->ae_flags & AE_VALID);
+	/* Start the write from the location of our buffer tail pointer. */
+	aiov[0].iov_base = alq->aq_entbuf + alq->aq_writetail;
+
+	if (alq->aq_writetail < alq->aq_writehead) {
+		/* Buffer not wrapped. */
+		totlen = aiov[0].iov_len = alq->aq_writehead - alq->aq_writetail;
+	} else if (alq->aq_writehead == 0) {
+		/* Buffer not wrapped (special case to avoid an empty iov). */
+		totlen = aiov[0].iov_len = alq->aq_buflen - alq->aq_writetail -
+		    wrapearly;
+	} else {
+		/*
+		 * Buffer wrapped, requires 2 aiov entries:
+		 * - first is from writetail to end of buffer
+		 * - second is from start of buffer to writehead
+		 */
+		aiov[0].iov_len = alq->aq_buflen - alq->aq_writetail -
+		    wrapearly;
+		iov++;
+		aiov[1].iov_base = alq->aq_entbuf;
+		aiov[1].iov_len =  alq->aq_writehead;
+		totlen = aiov[0].iov_len + aiov[1].iov_len;
+	}
 
 	alq->aq_flags |= AQ_FLUSHING;
 	ALQ_UNLOCK(alq);
 
-	if (iov == 2 || aiov[iov].iov_base == NULL)
-		iov--;
-
 	auio.uio_iov = &aiov[0];
 	auio.uio_offset = 0;
 	auio.uio_segflg = UIO_SYSSPACE;
 	auio.uio_rw = UIO_WRITE;
-	auio.uio_iovcnt = iov + 1;
+	auio.uio_iovcnt = iov;
 	auio.uio_resid = totlen;
 	auio.uio_td = td;
 
-	/*
-	 * Do all of the junk required to write now.
-	 */
+	/* Do all of the junk required to write now. */
 	vfslocked = VFS_LOCK_GIANT(vp->v_mount);
 	vn_start_write(vp, &mp, V_WAIT);
 	vn_lock(vp, LK_EXCLUSIVE | LK_RETRY);
-	/*
-	 * XXX: VOP_WRITE error checks are ignored.
-	 */
+	/* XXX: VOP_WRITE error checks are ignored. */
 #ifdef MAC
 	if (mac_vnode_check_write(alq->aq_cred, NOCRED, vp) == 0)
 #endif
@@ -339,8 +361,28 @@ alq_doio(struct alq *alq)
 	ALQ_LOCK(alq);
 	alq->aq_flags &= ~AQ_FLUSHING;
 
-	if (alq->aq_entfree == NULL)
-		alq->aq_entfree = alstart;
+	/* Adjust writetail as required, taking into account wrapping. */
+	alq->aq_writetail = (alq->aq_writetail + totlen + wrapearly) %
+	    alq->aq_buflen;
+	alq->aq_freebytes += totlen + wrapearly;
+
+	/*
+	 * If we just flushed part of the buffer which wrapped, reset the
+	 * wrapearly indicator.
+	 */
+	if (wrapearly)
+		alq->aq_wrapearly = 0;
+
+	/*
+	 * If we just flushed the buffer completely,
+	 * reset indexes to 0 to minimise buffer wraps.
+	 * This is also required to ensure alq_getn() can't wedge itself.
+	 */
+	if (!HAS_PENDING_DATA(alq))
+		alq->aq_writehead = alq->aq_writetail = 0;
+
+	KASSERT((alq->aq_writetail >= 0 && alq->aq_writetail < alq->aq_buflen),
+	    ("%s: aq_writetail < 0 || aq_writetail >= aq_buflen", __func__));
 
 	if (alq->aq_flags & AQ_WANTED) {
 		alq->aq_flags &= ~AQ_WANTED;
@@ -360,7 +402,7 @@ SYSINIT(aldthread, SI_SUB_KTHREAD_IDLE, 
 SYSINIT(ald, SI_SUB_LOCK, SI_ORDER_ANY, ald_startup, NULL);
 
 
-/* User visible queue functions */
+/* User visible queue functions. */
 
 /*
  * Create the queue data structure, allocate the buffer, and open the file.
@@ -371,13 +413,13 @@ alq_open(struct alq **alqp, const char *
 {
 	struct thread *td;
 	struct nameidata nd;
-	struct ale *ale;
-	struct ale *alp;
 	struct alq *alq;
-	char *bufp;
 	int flags;
 	int error;
-	int i, vfslocked;
+	int vfslocked;
+
+	KASSERT((size > 0), ("%s: size <= 0", __func__));
+	KASSERT((count >= 0), ("%s: count < 0", __func__));
 
 	*alqp = NULL;
 	td = curthread;
@@ -391,36 +433,31 @@ alq_open(struct alq **alqp, const char *
 
 	vfslocked = NDHASGIANT(&nd);
 	NDFREE(&nd, NDF_ONLY_PNBUF);
-	/* We just unlock so we hold a reference */
+	/* We just unlock so we hold a reference. */
 	VOP_UNLOCK(nd.ni_vp, 0);
 	VFS_UNLOCK_GIANT(vfslocked);
 
 	alq = malloc(sizeof(*alq), M_ALD, M_WAITOK|M_ZERO);
-	alq->aq_entbuf = malloc(count * size, M_ALD, M_WAITOK|M_ZERO);
-	alq->aq_first = malloc(sizeof(*ale) * count, M_ALD, M_WAITOK|M_ZERO);
 	alq->aq_vp = nd.ni_vp;
 	alq->aq_cred = crhold(cred);
-	alq->aq_entmax = count;
-	alq->aq_entlen = size;
-	alq->aq_entfree = alq->aq_first;
 
 	mtx_init(&alq->aq_mtx, "ALD Queue", NULL, MTX_SPIN|MTX_QUIET);
 
-	bufp = alq->aq_entbuf;
-	ale = alq->aq_first;
-	alp = NULL;
-
-	/* Match up entries with buffers */
-	for (i = 0; i < count; i++) {
-		if (alp)
-			alp->ae_next = ale;
-		ale->ae_data = bufp;
-		alp = ale;
-		ale++;
-		bufp += size;
+	if (count > 0) {
+		/* Fixed length messages. */
+		alq->aq_buflen = size * count;
+		alq->aq_entmax = count;
+		alq->aq_entlen = size;
+	} else {
+		/* Variable length messages. */
+		alq->aq_buflen = size;
+		alq->aq_entmax = 0;
+		alq->aq_entlen = 0;
 	}
 
-	alp->ae_next = alq->aq_first;
+	alq->aq_freebytes = alq->aq_buflen;
+	alq->aq_entbuf = malloc(alq->aq_buflen, M_ALD, M_WAITOK|M_ZERO);
+	alq->aq_writehead = alq->aq_writetail = 0;
 
 	if ((error = ald_add(alq)) != 0) {
 		alq_destroy(alq);
@@ -437,39 +474,205 @@ alq_open(struct alq **alqp, const char *
  * wait or return an error depending on the value of waitok.
  */
 int
-alq_write(struct alq *alq, void *data, int waitok)
+alq_writen(struct alq *alq, void *data, int len, int flags)
 {
-	struct ale *ale;
+	int activate, copy;
+
+	KASSERT((len > 0 && len <= alq->aq_buflen),
+	    ("%s: len <= 0 || len > aq_buflen", __func__));
+
+	activate = 0;
+	copy = len;
+
+	ALQ_LOCK(alq);
+
+	/*
+	 * Fail to perform the write and return EWOULDBLOCK if:
+	 * - The message is larger than our underlying buffer.
+	 * - There is insufficient free space in our underlying buffer
+	 *   to accept the message and the user can't wait for space.
+	 * - There is insufficient free space in our underlying buffer
+	 *   to accept the message and the alq is inactive due to prior
+	 *   use of the ALQ_NOACTIVATE flag (which would lead to deadlock).
+	 */
+	if (len > alq->aq_buflen ||
+	    (((flags & ALQ_NOWAIT) || (!(alq->aq_flags & AQ_ACTIVE) &&
+	    HAS_PENDING_DATA(alq))) && alq->aq_freebytes < len)) {
+		ALQ_UNLOCK(alq);
+		return (EWOULDBLOCK);
+	}
+
+	/*
+	 * ALQ_WAITOK or alq->aq_freebytes > len, either spin until
+	 * we have enough free bytes (former) or skip (latter). However in the
+	 * latter case, we can't skip if other threads are already
+	 * waiting (AQ_WANTED is set), otherwise records can get out of order.
+	 */
+	while ((alq->aq_freebytes < len && !(alq->aq_flags & AQ_SHUTDOWN))
+	    || alq->aq_flags & AQ_WANTED) {
+		alq->aq_flags |= AQ_WANTED;
+		msleep_spin(alq, &alq->aq_mtx, "alqwriten", 0);
+		KASSERT(!(alq->aq_flags & AQ_WANTED),
+		    ("AQ_WANTED should have been unset!"));
+	}
+
+	/*
+	 * We need to serialise wakeups to ensure records remain in order.
+	 * Therefore, wakeup the next thread in the queue waiting for
+	 * ALQ resources to be available.
+	 * (Technically this is only required if we actually entered the above
+	 * while loop.)
+	 */
+	wakeup_one(alq);
 
-	if ((ale = alq_get(alq, waitok)) == NULL)
+	/* Bail if we're shutting down. */
+	if (alq->aq_flags & AQ_SHUTDOWN) {
+		ALQ_UNLOCK(alq);
 		return (EWOULDBLOCK);
+	}
+
+	/*
+	 * If we need to wrap the buffer to accommodate the write,
+	 * we'll need 2 calls to bcopy.
+	 */
+	if ((alq->aq_buflen - alq->aq_writehead) < len)
+		copy = alq->aq_buflen - alq->aq_writehead;
+
+	/* Copy message (or part thereof if wrap required) to the buffer. */
+	bcopy(data, alq->aq_entbuf + alq->aq_writehead, copy);
+	alq->aq_writehead += copy;
+
+	if (alq->aq_writehead >= alq->aq_buflen) {
+		KASSERT((alq->aq_writehead == alq->aq_buflen),
+		    ("alq->aq_writehead (%d) > alq->aq_buflen (%d)",
+		    alq->aq_writehead,
+		    alq->aq_buflen));
+		alq->aq_writehead = 0;
+	}
+
+	if (copy != len) {
+		/*
+		 * Wrap the buffer by copying the remainder of our message
+		 * to the start of the buffer and resetting aq_writehead.
+		 */
+		bcopy(((uint8_t *)data)+copy, alq->aq_entbuf, len - copy);
+		alq->aq_writehead = len - copy;
+	}
+
+	KASSERT((alq->aq_writehead >= 0 && alq->aq_writehead < alq->aq_buflen),
+	    ("%s: aq_writehead < 0 || aq_writehead >= aq_buflen", __func__));
+
+	alq->aq_freebytes -= len;
+
+	if (!(alq->aq_flags & AQ_ACTIVE) && !(flags & ALQ_NOACTIVATE)) {
+		alq->aq_flags |= AQ_ACTIVE;
+		activate = 1;
+	}
+
+	ALQ_UNLOCK(alq);
 
-	bcopy(data, ale->ae_data, alq->aq_entlen);
-	alq_post(alq, ale);
+	if (activate) {
+		ALD_LOCK();
+		ald_activate(alq);
+		ALD_UNLOCK();
+	}
 
 	return (0);
 }
 
+int
+alq_write(struct alq *alq, void *data, int flags)
+{
+	/* Should only be called in fixed length message (legacy) mode. */
+	KASSERT((alq->aq_entmax > 0 && alq->aq_entlen > 0),
+	    ("%s: fixed length write on variable length queue", __func__));
+	return (alq_writen(alq, data, alq->aq_entlen, flags));
+}
+
+/*
+ * Retrieve a pointer for the ALQ to write directly into, avoiding bcopy.
+ */
 struct ale *
-alq_get(struct alq *alq, int waitok)
+alq_getn(struct alq *alq, int len, int flags)
 {
-	struct ale *ale;
-	struct ale *aln;
+	int contigbytes;
 
-	ale = NULL;
+	KASSERT((len > 0 && len <= alq->aq_buflen),
+	    ("%s: len <= 0 || len > alq->aq_buflen", __func__));
 
 	ALQ_LOCK(alq);
 
-	/* Loop until we get an entry or we're shutting down */
-	while ((alq->aq_flags & AQ_SHUTDOWN) == 0 && 
-	    (ale = alq->aq_entfree) == NULL &&
-	    (waitok & ALQ_WAITOK)) {
+	/*
+	 * Determine the number of free contiguous bytes.
+	 * We ensure elsewhere that if aq_writehead == aq_writetail because
+	 * the buffer is empty, they will both be set to 0 and therefore
+	 * aq_freebytes == aq_buflen and is fully contiguous.
+	 * If they are equal and the buffer is not empty, aq_freebytes will
+	 * be 0 indicating the buffer is full.
+	 */
+	if (alq->aq_writehead <= alq->aq_writetail)
+		contigbytes = alq->aq_freebytes;
+	else {
+		contigbytes = alq->aq_buflen - alq->aq_writehead;
+
+		if (contigbytes < len) {
+			/*
+			 * Insufficient space at end of buffer to handle a
+			 * contiguous write. Wrap early if there's space at
+			 * the beginning. This will leave a hole at the end
+			 * of the buffer which we will have to skip over when
+			 * flushing the buffer to disk.
+			 */
+			if (alq->aq_writetail >= len || flags & ALQ_WAITOK) {
+				/* Keep track of # bytes left blank. */
+				alq->aq_wrapearly = contigbytes;
+				/* Do the wrap and adjust counters. */
+				contigbytes = alq->aq_freebytes =
+				    alq->aq_writetail;
+				alq->aq_writehead = 0;
+			}
+		}
+	}
+
+	/*
+	 * Return a NULL ALE if:
+	 * - The message is larger than our underlying buffer.
+	 * - There is insufficient free space in our underlying buffer
+	 *   to accept the message and the user can't wait for space.
+	 * - There is insufficient free space in our underlying buffer
+	 *   to accept the message and the alq is inactive due to prior
+	 *   use of the ALQ_NOACTIVATE flag (which would lead to deadlock).
+	 */
+	if (len > alq->aq_buflen ||
+	    (((flags & ALQ_NOWAIT) || (!(alq->aq_flags & AQ_ACTIVE) &&
+	    HAS_PENDING_DATA(alq))) && contigbytes < len)) {
+		ALQ_UNLOCK(alq);
+		return (NULL);
+	}
+
+	/*
+	 * ALQ_WAITOK or contigbytes >= len,
+	 * either spin until we have enough free contiguous bytes (former)
+	 * or skip (latter). However, in the latter case, we can't skip if
+	 * other threads are already waiting (AQ_WANTED is set), otherwise
+	 * records can get out of order.
+	 */
+	while ((contigbytes < len && !(alq->aq_flags & AQ_SHUTDOWN))
+	    || alq->aq_flags & AQ_WANTED) {
 		alq->aq_flags |= AQ_WANTED;
-		msleep_spin(alq, &alq->aq_mtx, "alqget", 0);
+		msleep_spin(alq, &alq->aq_mtx, "alqgetn", 0);
+
+		KASSERT(!(alq->aq_flags & AQ_WANTED),
+		    ("AQ_WANTED should have been unset!"));
+
+		if (alq->aq_writehead <= alq->aq_writetail)
+			contigbytes = alq->aq_freebytes;
+		else
+			contigbytes = alq->aq_buflen - alq->aq_writehead;
 	}
 
 	/*
-	 * We need to serialise wakups to ensure records remain in order...
+	 * We need to serialise wakeups to ensure records remain in order.
 	 * Therefore, wakeup the next thread in the queue waiting for
 	 * ALQ resources to be available.
 	 * (Technically this is only required if we actually entered the above
@@ -477,36 +680,55 @@ alq_get(struct alq *alq, int waitok)
 	 */
 	wakeup_one(alq);
 
-	if (ale != NULL) {
-		aln = ale->ae_next;
-		if ((aln->ae_flags & AE_VALID) == 0)
-			alq->aq_entfree = aln;
-		else
-			alq->aq_entfree = NULL;
-	} else
+	/* Bail if we're shutting down. */
+	if (alq->aq_flags & AQ_SHUTDOWN) {
 		ALQ_UNLOCK(alq);
+		return (NULL);
+	}
+
+	/*
+	 * If we are here, we have a contiguous number of bytes >= len
+	 * available in our buffer starting at aq_writehead.
+	 */
+	alq->aq_getpost.ae_data = alq->aq_entbuf + alq->aq_writehead;
+	alq->aq_getpost.ae_bytesused = len;
 
+	return (&alq->aq_getpost);
+}
 
-	return (ale);
+struct ale *
+alq_get(struct alq *alq, int flags)
+{
+	/* Should only be called in fixed length message (legacy) mode. */
+	KASSERT((alq->aq_entmax > 0 && alq->aq_entlen > 0),
+	    ("%s: fixed length get on variable length queue", __func__));
+	return (alq_getn(alq, alq->aq_entlen, flags));
 }
 
 void
-alq_post(struct alq *alq, struct ale *ale)
+alq_postn(struct alq *alq, struct ale *ale, int flags)
 {
 	int activate;
 
-	ale->ae_flags |= AE_VALID;
+	activate = 0;
 
-	if (alq->aq_entvalid == NULL)
-		alq->aq_entvalid = ale;
-
-	if ((alq->aq_flags & AQ_ACTIVE) == 0) {
+	if (!(alq->aq_flags & AQ_ACTIVE) && !(flags & ALQ_NOACTIVATE)) {
 		alq->aq_flags |= AQ_ACTIVE;
 		activate = 1;
-	} else
-		activate = 0;
+	}
+

*** DIFF OUTPUT TRUNCATED AT 1000 LINES ***



Want to link to this message? Use this URL: <https://mail-archive.FreeBSD.org/cgi/mid.cgi?201003100506.o2A56lOs036765>