Skip site navigation (1) Skip section navigation (2)
Date:      Mon, 26 Apr 2010 13:48:23 +0000 (UTC)
From:      Lawrence Stewart <lstewart@FreeBSD.org>
To:        src-committers@freebsd.org, svn-src-all@freebsd.org, svn-src-head@freebsd.org
Subject:   svn commit: r207223 - in head: share/man/man9 sys/kern sys/sys
Message-ID:  <201004261348.o3QDmN3H000249@svn.freebsd.org>

next in thread | raw e-mail | index | archive | help
Author: lstewart
Date: Mon Apr 26 13:48:22 2010
New Revision: 207223
URL: http://svn.freebsd.org/changeset/base/207223

Log:
  - Rework the underlying ALQ storage to be a circular buffer, which amongst other
    things allows variable length messages to be easily supported.
  
  - Extend KPI with alq_writen() and alq_getn() to support variable length
    messages, which is enabled at ALQ creation time depending on the
    arguments passed to alq_open(). Also add variants of alq_open() and
    alq_post() that accept a flags argument. The KPI is still fully
    backwards compatible and shouldn't require any change in ALQ consumers
    unless they wish to utilise the new features.
  
  - Introduce the ALQ_NOACTIVATE and ALQ_ORDERED flags to allow ALQ consumers
    to have more control over IO scheduling and resource acquisition
    respectively.
  
  - Strengthen invariants checking.
  
  - Document ALQ changes in ALQ(9) man page.
  
  Sponsored by:	FreeBSD Foundation
  Reviewed by:	gnn, jeff, rpaulo, rwatson
  MFC after:	1 month

Modified:
  head/share/man/man9/alq.9
  head/sys/kern/kern_alq.c
  head/sys/sys/alq.h

Modified: head/share/man/man9/alq.9
==============================================================================
--- head/share/man/man9/alq.9	Mon Apr 26 12:03:55 2010	(r207222)
+++ head/share/man/man9/alq.9	Mon Apr 26 13:48:22 2010	(r207223)
@@ -1,7 +1,13 @@
 .\"
 .\" Copyright (c) 2003 Hiten Pandya <hmp@FreeBSD.org>
+.\" Copyright (c) 2009-2010 The FreeBSD Foundation
 .\" All rights reserved.
 .\"
+.\" Portions of this software were developed at the Centre for Advanced
+.\" Internet Architectures, Swinburne University of Technology, Melbourne,
+.\" Australia by Lawrence Stewart under sponsorship from the FreeBSD
+.\" Foundation.
+.\"
 .\" Redistribution and use in source and binary forms, with or without
 .\" modification, are permitted provided that the following conditions
 .\" are met:
@@ -25,21 +31,34 @@
 .\"
 .\" $FreeBSD$
 .\"
-.Dd May 16, 2003
+.Dd April 26, 2010
 .Dt ALQ 9
 .Os
 .Sh NAME
 .Nm alq ,
+.Nm alq_open_flags ,
 .Nm alq_open ,
+.Nm alq_writen ,
 .Nm alq_write ,
 .Nm alq_flush ,
 .Nm alq_close ,
+.Nm alq_getn ,
 .Nm alq_get ,
+.Nm alq_post_flags ,
 .Nm alq_post
 .Nd Asynchronous Logging Queues
 .Sh SYNOPSIS
 .In sys/alq.h
 .Ft int
+.Fo alq_open_flags
+.Fa "struct alq **app"
+.Fa "const char *file"
+.Fa "struct ucred *cred"
+.Fa "int cmode"
+.Fa "int size"
+.Fa "int flags"
+.Fc
+.Ft int
 .Fo alq_open
 .Fa "struct alq **app"
 .Fa "const char *file"
@@ -49,19 +68,25 @@
 .Fa "int count"
 .Fc
 .Ft int
-.Fn alq_write "struct alq *alq" "void *data" "int waitok"
+.Fn alq_writen "struct alq *alq" "void *data" "int len" "int flags"
+.Ft int
+.Fn alq_write "struct alq *alq" "void *data" "int flags"
 .Ft void
 .Fn alq_flush "struct alq *alq"
 .Ft void
 .Fn alq_close "struct alq *alq"
 .Ft struct ale *
-.Fn alq_get "struct alq *alq" "int waitok"
+.Fn alq_getn "struct alq *alq" "int len" "int flags"
+.Ft struct ale *
+.Fn alq_get "struct alq *alq" "int flags"
+.Ft void
+.Fn alq_post_flags "struct alq *alq" "struct ale *ale" "int flags"
 .Ft void
 .Fn alq_post "struct alq *alq" "struct ale *ale"
 .Sh DESCRIPTION
 The
 .Nm
-facility provides an asynchronous fixed length recording
+facility provides an asynchronous fixed or variable length recording
 mechanism, known as Asynchronous Logging Queues.
 It can record to any
 .Xr vnode 9 ,
@@ -81,26 +106,37 @@ is defined as
 which has the following members:
 .Bd -literal -offset indent
 struct ale {
-	struct ale	*ae_next;	/* Next Entry */
-	char		*ae_data;	/* Entry buffer */
-	int		ae_flags;	/* Entry flags */
+	intptr_t	ae_bytesused;	/* # bytes written to ALE. */
+	char		*ae_data;	/* Write ptr. */
+	int		ae_pad;		/* Unused, compat. */
 };
 .Ed
 .Pp
-The
-.Va ae_flags
-field is for internal use, clients of the
+An
 .Nm
-interface should not modify this field.
-Behaviour is undefined if this field is modified.
+can be created in either fixed or variable length mode.
+A variable length
+.Nm
+accommodates writes of varying length using
+.Fn alq_writen
+and
+.Fn alq_getn .
+A fixed length
+.Nm
+accommodates a fixed number of writes using
+.Fn alq_write
+and
+.Fn alq_get ,
+each of fixed size (set at queue creation time).
+Fixed length mode is deprecated in favour of variable length mode.
 .Sh FUNCTIONS
 The
-.Fn alq_open
-function creates a new logging queue.
+.Fn alq_open_flags
+function creates a new variable length asynchronous logging queue.
 The
 .Fa file
-argument is the name of the file to open for logging; if the file does not
-yet exist,
+argument is the name of the file to open for logging.
+If the file does not yet exist,
 .Fn alq_open
 will attempt to create it.
 The
@@ -112,33 +148,99 @@ as the requested creation mode, to be us
 Consumers of this API may wish to pass
 .Dv ALQ_DEFAULT_CMODE ,
 a default creation mode suitable for most applications.
-The argument
+The
 .Fa cred
-specifies the credentials to use when opening and performing I/O on the file.
-The size of each entry in the queue is determined by
-.Fa size .
+argument specifies the credentials to use when opening and performing I/O on the file.
 The
+.Fa size
+argument sets the size (in bytes) of the underlying queue.
+The ALQ_ORDERED flag may be passed in via
+.Fa flags
+to indicate that the ordering of writer threads waiting for a busy
+.Nm
+to free up resources should be preserved.
+.Pp
+The deprecated
+.Fn alq_open
+function is implemented as a wrapper around
+.Fn alq_open_flags
+to provide backwards compatibility to consumers that have not been updated to
+utilise the newer
+.Fn alq_open_flags
+function.
+It passes all arguments through to
+.Fn alq_open_flags
+untouched except for
+.Fa size
+and
+.Fa count ,
+and sets
+.Fa flags
+to 0.
+To create a variable length mode
+.Nm ,
+the
+.Fa size
+argument should be set to the size (in bytes) of the underlying queue and the
+.Fa count
+argument should be set to 0.
+To create a fixed length mode
+.Nm ,
+the
+.Fa size
+argument should be set to the size (in bytes) of each write and the
 .Fa count
-argument determines the number of items to be stored in the
-asynchronous queue over an approximate period of a disk
-write operation.
+argument should be set to the number of
+.Fa size
+byte chunks to reserve capacity for.
 .Pp
 The
-.Fn alq_write
+.Fn alq_writen
 function writes
+.Fa len
+bytes from
 .Fa data
-to the designated queue,
+to the designated variable length mode queue
 .Fa alq .
-In the event that
-.Fn alq_write
-could not write the entry immediately, and
+If
+.Fn alq_writen
+could not write the entry immediately and
 .Dv ALQ_WAITOK
-is passed to
-.Fa waitok ,
-then
+is set in
+.Fa flags ,
+the function will be allowed to
+.Xr msleep_spin 9
+with the
+.Dq Li alqwnord
+or
+.Dq Li alqwnres
+wait message.
+A write will automatically schedule the queue
+.Fa alq
+to be flushed to disk.
+This behaviour can be controlled by passing ALQ_NOACTIVATE via
+.Fa flags
+to indicate that the write should not schedule
+.Fa alq
+to be flushed to disk.
+.Pp
+The deprecated
+.Fn alq_write
+function is implemented as a wrapper around
+.Fn alq_writen
+to provide backwards compatibility to consumers that have not been updated to
+utilise variable length mode queues.
+The function will write
+.Fa size
+bytes of data (where
+.Fa size
+was specified at queue creation time) from the
+.Fa data
+buffer to the
+.Fa alq .
+Note that it is an error to call
 .Fn alq_write
-will be allowed to
-.Xr tsleep 9 .
+on a variable length mode queue.
 .Pp
 The
 .Fn alq_flush
@@ -146,61 +248,136 @@ function is used for flushing
 .Fa alq
 to the log medium that was passed to
 .Fn alq_open .
+If
+.Fa alq
+has data to flush and is not already in the process of being flushed, the
+function will block doing IO.
+Otherwise, the function will return immediately.
 .Pp
 The
 .Fn alq_close
-function will close the asynchronous logging queue,
-.Fa alq ,
+function will close the asynchronous logging queue
+.Fa alq
 and flush all pending write requests to the log medium.
 It will free all resources that were previously allocated.
 .Pp
 The
-.Fn alq_get
-function returns the next available asynchronous logging entry
-from the queue,
-.Fa alq .
-This function leaves the queue in a locked state, until a subsequent
+.Fn alq_getn
+function returns an asynchronous log entry from
+.Fa alq ,
+initialised to point at a buffer capable of receiving
+.Fa len
+bytes of data.
+This function leaves
+.Fa alq
+in a locked state, until a subsequent
 .Fn alq_post
+or
+.Fn alq_post_flags
 call is made.
-In the event that
-.Fn alq_get
-could not retrieve an entry immediately, it will
-.Xr tsleep 9
+If
+.Fn alq_getn
+could not obtain
+.Fa len
+bytes of buffer immediately and
+.Dv ALQ_WAITOK
+is set in
+.Fa flags ,
+the function will be allowed to
+.Xr msleep_spin 9
 with the
-.Dq Li alqget
+.Dq Li alqgnord
+or
+.Dq Li alqgnres
 wait message.
+The caller can choose to write less than
+.Fa len
+bytes of data to the returned asynchronous log entry by setting the entry's
+ae_bytesused field to the number of bytes actually written.
+This must be done prior to calling
+.Fn alq_post .
 .Pp
-The
-.Fn alq_post
-function schedules the asynchronous logging entry,
-.Fa ale ,
-which is retrieved using the
+The deprecated
 .Fn alq_get
-function,
-for writing to the asynchronous logging queue,
+function is implemented as a wrapper around
+.Fn alq_getn
+to provide backwards compatibility to consumers that have not been updated to
+utilise variable length mode queues.
+The asynchronous log entry returned will be initialised to point at a buffer
+capable of receiving
+.Fa size
+bytes of data (where
+.Fa size
+was specified at queue creation time).
+Note that it is an error to call
+.Fn alq_get
+on a variable length mode queue.
+.Pp
+The
+.Fn alq_post_flags
+function schedules the asynchronous log entry
+.Fa ale
+(obtained from
+.Fn alq_getn
+or
+.Fn alq_get )
+for writing to
 .Fa alq .
-This function leaves the queue,
-.Fa alq ,
+The ALQ_NOACTIVATE flag may be passed in via
+.Fa flags
+to indicate that the queue should not be immediately scheduled to be flushed to
+disk.
+This function leaves
+.Fa alq
 in an unlocked state.
+.Pp
+The
+.Fn alq_post
+function is implemented as a wrapper around
+.Fn alq_post_flags
+to provide backwards compatibility to consumers that have not been updated to
+utilise the newer
+.Fn alq_post_flags
+function.
+It simply passes all arguments through to
+.Fn alq_post_flags
+untouched, and sets
+.Fa flags
+to 0.
 .Sh IMPLEMENTATION NOTES
 The
+.Fn alq_writen
+and
 .Fn alq_write
-function is a wrapper around the
+functions both perform a
+.Xr bcopy 3
+from the supplied
+.Fa data
+buffer into the underlying
+.Nm
+buffer.
+Performance critical code paths may wish to consider using
+.Fn alq_getn
+(variable length queues) or
+.Fn alq_get
+(fixed length queues) to avoid the extra memory copy. Note that a queue
+remains locked between calls to
+.Fn alq_getn
+or
 .Fn alq_get
 and
 .Fn alq_post
-functions; by using these functions separately, a call
-to
-.Fn bcopy
-can be avoided for performance critical code paths.
+or
+.Fn alq_post_flags ,
+so this method of writing to a queue is unsuitable for situations where the
+time between calls may be substantial.
 .Sh LOCKING
-Each asynchronous queue is protected by a spin mutex.
+Each asynchronous logging queue is protected by a spin mutex.
 .Pp
 Functions
-.Fn alq_flush ,
-.Fn alq_open
+.Fn alq_flush
 and
-.Fn alq_post
+.Fn alq_open
 may attempt to acquire an internal sleep mutex, and should
 consequently not be used in contexts where sleeping is
 not allowed.
@@ -214,32 +391,36 @@ if it fails to open
 or else it returns 0.
 .Pp
 The
+.Fn alq_writen
+and
 .Fn alq_write
-function returns
+functions return
 .Er EWOULDBLOCK
 if
 .Dv ALQ_NOWAIT
-was provided as a value to
-.Fa waitok
-and either the queue is full, or when the system is shutting down.
+was set in
+.Fa flags
+and either the queue is full or the system is shutting down.
 .Pp
 The
+.Fn alq_getn
+and
 .Fn alq_get
-function returns
-.Dv NULL ,
+functions return
+.Dv NULL
 if
 .Dv ALQ_NOWAIT
-was provided as a value to
-.Fa waitok
-and either the queue is full, or when the system is shutting down.
+was set in
+.Fa flags
+and either the queue is full or the system is shutting down.
 .Pp
 NOTE: invalid arguments to non-void functions will result in
 undefined behaviour.
 .Sh SEE ALSO
-.Xr syslog 3 ,
-.Xr kthread 9 ,
+.Xr kproc 9 ,
 .Xr ktr 9 ,
-.Xr tsleep 9 ,
+.Xr msleep_spin 9 ,
+.Xr syslog 3 ,
 .Xr vnode 9
 .Sh HISTORY
 The
@@ -250,7 +431,11 @@ Asynchronous Logging Queues (ALQ) facili
 The
 .Nm
 facility was written by
-.An Jeffrey Roberson Aq jeff@FreeBSD.org .
+.An Jeffrey Roberson Aq jeff@FreeBSD.org
+and extended by
+.An Lawrence Stewart Aq lstewart@freebsd.org .
 .Pp
 This manual page was written by
-.An Hiten Pandya Aq hmp@FreeBSD.org .
+.An Hiten Pandya Aq hmp@FreeBSD.org
+and revised by
+.An Lawrence Stewart Aq lstewart@freebsd.org .

Modified: head/sys/kern/kern_alq.c
==============================================================================
--- head/sys/kern/kern_alq.c	Mon Apr 26 12:03:55 2010	(r207222)
+++ head/sys/kern/kern_alq.c	Mon Apr 26 13:48:22 2010	(r207223)
@@ -55,16 +55,23 @@ __FBSDID("$FreeBSD$");
 
 /* Async. Logging Queue */
 struct alq {
+	char	*aq_entbuf;		/* Buffer for stored entries */
 	int	aq_entmax;		/* Max entries */
 	int	aq_entlen;		/* Entry length */
-	char	*aq_entbuf;		/* Buffer for stored entries */
+	int	aq_freebytes;		/* Bytes available in buffer */
+	int	aq_buflen;		/* Total length of our buffer */
+	int	aq_writehead;		/* Location for next write */
+	int	aq_writetail;		/* Flush starts at this location */
+	int	aq_wrapearly;		/* # bytes left blank at end of buf */
 	int	aq_flags;		/* Queue flags */
+	int	aq_waiters;		/* Num threads waiting for resources
+					 * NB: Used as a wait channel so must
+					 * not be first field in the alq struct
+					 */
+	struct	ale	aq_getpost;	/* ALE for use by get/post */
 	struct mtx	aq_mtx;		/* Queue lock */
 	struct vnode	*aq_vp;		/* Open vnode handle */
 	struct ucred	*aq_cred;	/* Credentials of the opening thread */
-	struct ale	*aq_first;	/* First ent */
-	struct ale	*aq_entfree;	/* First free ent */
-	struct ale	*aq_entvalid;	/* First ent valid for writing */
 	LIST_ENTRY(alq)	aq_act;		/* List of active queues */
 	LIST_ENTRY(alq)	aq_link;	/* List of all queues */
 };
@@ -73,10 +80,14 @@ struct alq {
 #define	AQ_ACTIVE	0x0002		/* on the active list */
 #define	AQ_FLUSHING	0x0004		/* doing IO */
 #define	AQ_SHUTDOWN	0x0008		/* Queue no longer valid */
+#define	AQ_ORDERED	0x0010		/* Queue enforces ordered writes */
+#define	AQ_LEGACY	0x0020		/* Legacy queue (fixed length writes) */
 
 #define	ALQ_LOCK(alq)	mtx_lock_spin(&(alq)->aq_mtx)
 #define	ALQ_UNLOCK(alq)	mtx_unlock_spin(&(alq)->aq_mtx)
 
+#define HAS_PENDING_DATA(alq) ((alq)->aq_freebytes != (alq)->aq_buflen)
+
 static MALLOC_DEFINE(M_ALD, "ALD", "ALD");
 
 /*
@@ -205,7 +216,7 @@ ald_daemon(void)
 		needwakeup = alq_doio(alq);
 		ALQ_UNLOCK(alq);
 		if (needwakeup)
-			wakeup(alq);
+			wakeup_one(alq);
 		ALD_LOCK();
 	}
 
@@ -252,6 +263,20 @@ alq_shutdown(struct alq *alq)
 	/* Stop any new writers. */
 	alq->aq_flags |= AQ_SHUTDOWN;
 
+	/*
+	 * If the ALQ isn't active but has unwritten data (possible if
+	 * the ALQ_NOACTIVATE flag has been used), explicitly activate the
+	 * ALQ here so that the pending data gets flushed by the ald_daemon.
+	 */
+	if (!(alq->aq_flags & AQ_ACTIVE) && HAS_PENDING_DATA(alq)) {
+		alq->aq_flags |= AQ_ACTIVE;
+		ALQ_UNLOCK(alq);
+		ALD_LOCK();
+		ald_activate(alq);
+		ALD_UNLOCK();
+		ALQ_LOCK(alq);
+	}
+
 	/* Drain IO */
 	while (alq->aq_flags & AQ_ACTIVE) {
 		alq->aq_flags |= AQ_WANTED;
@@ -271,7 +296,6 @@ alq_destroy(struct alq *alq)
 	alq_shutdown(alq);
 
 	mtx_destroy(&alq->aq_mtx);
-	free(alq->aq_first, M_ALD);
 	free(alq->aq_entbuf, M_ALD);
 	free(alq, M_ALD);
 }
@@ -287,46 +311,54 @@ alq_doio(struct alq *alq)
 	struct vnode *vp;
 	struct uio auio;
 	struct iovec aiov[2];
-	struct ale *ale;
-	struct ale *alstart;
 	int totlen;
 	int iov;
 	int vfslocked;
+	int wrapearly;
+
+	KASSERT((HAS_PENDING_DATA(alq)), ("%s: queue empty!", __func__));
 
 	vp = alq->aq_vp;
 	td = curthread;
 	totlen = 0;
-	iov = 0;
-
-	alstart = ale = alq->aq_entvalid;
-	alq->aq_entvalid = NULL;
+	iov = 1;
+	wrapearly = alq->aq_wrapearly;
 
 	bzero(&aiov, sizeof(aiov));
 	bzero(&auio, sizeof(auio));
 
-	do {
-		if (aiov[iov].iov_base == NULL)
-			aiov[iov].iov_base = ale->ae_data;
-		aiov[iov].iov_len += alq->aq_entlen;
-		totlen += alq->aq_entlen;
-		/* Check to see if we're wrapping the buffer */
-		if (ale->ae_data + alq->aq_entlen != ale->ae_next->ae_data)
-			iov++;
-		ale->ae_flags &= ~AE_VALID;
-		ale = ale->ae_next;
-	} while (ale->ae_flags & AE_VALID);
+	/* Start the write from the location of our buffer tail pointer. */
+	aiov[0].iov_base = alq->aq_entbuf + alq->aq_writetail;
+
+	if (alq->aq_writetail < alq->aq_writehead) {
+		/* Buffer not wrapped. */
+		totlen = aiov[0].iov_len = alq->aq_writehead - alq->aq_writetail;
+	} else if (alq->aq_writehead == 0) {
+		/* Buffer not wrapped (special case to avoid an empty iov). */
+		totlen = aiov[0].iov_len = alq->aq_buflen - alq->aq_writetail -
+		    wrapearly;
+	} else {
+		/*
+		 * Buffer wrapped, requires 2 aiov entries:
+		 * - first is from writetail to end of buffer
+		 * - second is from start of buffer to writehead
+		 */
+		aiov[0].iov_len = alq->aq_buflen - alq->aq_writetail -
+		    wrapearly;
+		iov++;
+		aiov[1].iov_base = alq->aq_entbuf;
+		aiov[1].iov_len =  alq->aq_writehead;
+		totlen = aiov[0].iov_len + aiov[1].iov_len;
+	}
 
 	alq->aq_flags |= AQ_FLUSHING;
 	ALQ_UNLOCK(alq);
 
-	if (iov == 2 || aiov[iov].iov_base == NULL)
-		iov--;
-
 	auio.uio_iov = &aiov[0];
 	auio.uio_offset = 0;
 	auio.uio_segflg = UIO_SYSSPACE;
 	auio.uio_rw = UIO_WRITE;
-	auio.uio_iovcnt = iov + 1;
+	auio.uio_iovcnt = iov;
 	auio.uio_resid = totlen;
 	auio.uio_td = td;
 
@@ -350,8 +382,28 @@ alq_doio(struct alq *alq)
 	ALQ_LOCK(alq);
 	alq->aq_flags &= ~AQ_FLUSHING;
 
-	if (alq->aq_entfree == NULL)
-		alq->aq_entfree = alstart;
+	/* Adjust writetail as required, taking into account wrapping. */
+	alq->aq_writetail = (alq->aq_writetail + totlen + wrapearly) %
+	    alq->aq_buflen;
+	alq->aq_freebytes += totlen + wrapearly;
+
+	/*
+	 * If we just flushed part of the buffer which wrapped, reset the
+	 * wrapearly indicator.
+	 */
+	if (wrapearly)
+		alq->aq_wrapearly = 0;
+
+	/*
+	 * If we just flushed the buffer completely, reset indexes to 0 to
+	 * minimise buffer wraps.
+	 * This is also required to ensure alq_getn() can't wedge itself.
+	 */
+	if (!HAS_PENDING_DATA(alq))
+		alq->aq_writehead = alq->aq_writetail = 0;
+
+	KASSERT((alq->aq_writetail >= 0 && alq->aq_writetail < alq->aq_buflen),
+	    ("%s: aq_writetail < 0 || aq_writetail >= aq_buflen", __func__));
 
 	if (alq->aq_flags & AQ_WANTED) {
 		alq->aq_flags &= ~AQ_WANTED;
@@ -376,27 +428,27 @@ SYSINIT(ald, SI_SUB_LOCK, SI_ORDER_ANY, 
 /*
  * Create the queue data structure, allocate the buffer, and open the file.
  */
+
 int
-alq_open(struct alq **alqp, const char *file, struct ucred *cred, int cmode,
-    int size, int count)
+alq_open_flags(struct alq **alqp, const char *file, struct ucred *cred, int cmode,
+    int size, int flags)
 {
 	struct thread *td;
 	struct nameidata nd;
-	struct ale *ale;
-	struct ale *alp;
 	struct alq *alq;
-	char *bufp;
-	int flags;
+	int oflags;
 	int error;
-	int i, vfslocked;
+	int vfslocked;
+
+	KASSERT((size > 0), ("%s: size <= 0", __func__));
 
 	*alqp = NULL;
 	td = curthread;
 
 	NDINIT(&nd, LOOKUP, NOFOLLOW | MPSAFE, UIO_SYSSPACE, file, td);
-	flags = FWRITE | O_NOFOLLOW | O_CREAT;
+	oflags = FWRITE | O_NOFOLLOW | O_CREAT;
 
-	error = vn_open_cred(&nd, &flags, cmode, 0, cred, NULL);
+	error = vn_open_cred(&nd, &oflags, cmode, 0, cred, NULL);
 	if (error)
 		return (error);
 
@@ -407,31 +459,20 @@ alq_open(struct alq **alqp, const char *
 	VFS_UNLOCK_GIANT(vfslocked);
 
 	alq = malloc(sizeof(*alq), M_ALD, M_WAITOK|M_ZERO);
-	alq->aq_entbuf = malloc(count * size, M_ALD, M_WAITOK|M_ZERO);
-	alq->aq_first = malloc(sizeof(*ale) * count, M_ALD, M_WAITOK|M_ZERO);
 	alq->aq_vp = nd.ni_vp;
 	alq->aq_cred = crhold(cred);
-	alq->aq_entmax = count;
-	alq->aq_entlen = size;
-	alq->aq_entfree = alq->aq_first;
 
 	mtx_init(&alq->aq_mtx, "ALD Queue", NULL, MTX_SPIN|MTX_QUIET);
 
-	bufp = alq->aq_entbuf;
-	ale = alq->aq_first;
-	alp = NULL;
-
-	/* Match up entries with buffers */
-	for (i = 0; i < count; i++) {
-		if (alp)
-			alp->ae_next = ale;
-		ale->ae_data = bufp;
-		alp = ale;
-		ale++;
-		bufp += size;
-	}
-
-	alp->ae_next = alq->aq_first;
+	alq->aq_buflen = size;
+	alq->aq_entmax = 0;
+	alq->aq_entlen = 0;
+
+	alq->aq_freebytes = alq->aq_buflen;
+	alq->aq_entbuf = malloc(alq->aq_buflen, M_ALD, M_WAITOK|M_ZERO);
+	alq->aq_writehead = alq->aq_writetail = 0;
+	if (flags & ALQ_ORDERED)
+		alq->aq_flags |= AQ_ORDERED;
 
 	if ((error = ald_add(alq)) != 0) {
 		alq_destroy(alq);
@@ -443,77 +484,405 @@ alq_open(struct alq **alqp, const char *
 	return (0);
 }
 
+/*
+ * Backwards compatible wrapper around alq_open_flags().
+ *
+ * If count > 0, a legacy (fixed length) queue is created with a buffer of
+ * size * count bytes, and aq_entmax/aq_entlen record the entry count and
+ * entry size.  If count == 0, a variable length queue with a buffer of
+ * size bytes is created.
+ *
+ * Returns the value returned by alq_open_flags().
+ */
+int
+alq_open(struct alq **alqp, const char *file, struct ucred *cred, int cmode,
+    int size, int count)
+{
+	int ret;
+
+	KASSERT((count >= 0), ("%s: count < 0", __func__));
+
+	if (count > 0) {
+		ret = alq_open_flags(alqp, file, cred, cmode, size*count, 0);
+		/*
+		 * alq_open_flags() sets *alqp to NULL before it can fail
+		 * (e.g. when vn_open_cred() returns an error), so only mark
+		 * the queue as legacy if it was actually created.
+		 */
+		if (ret == 0) {
+			(*alqp)->aq_flags |= AQ_LEGACY;
+			(*alqp)->aq_entmax = count;
+			(*alqp)->aq_entlen = size;
+		}
+	} else
+		ret = alq_open_flags(alqp, file, cred, cmode, size, 0);
+
+	return (ret);
+}
+
+
 /*
  * Copy a new entry into the queue.  If the operation would block either
  * wait or return an error depending on the value of waitok.
  */
 int
-alq_write(struct alq *alq, void *data, int waitok)
+alq_writen(struct alq *alq, void *data, int len, int flags)
 {
-	struct ale *ale;
+	int activate, copy, ret;
+	void *waitchan;
+
+	KASSERT((len > 0 && len <= alq->aq_buflen),
+	    ("%s: len <= 0 || len > aq_buflen", __func__));
 
-	if ((ale = alq_get(alq, waitok)) == NULL)
+	activate = ret = 0;
+	copy = len;
+	waitchan = NULL;
+
+	ALQ_LOCK(alq);
+
+	/*
+	 * Fail to perform the write and return EWOULDBLOCK if:
+	 * - The message is larger than our underlying buffer.
+	 * - The ALQ is being shutdown.
+	 * - There is insufficient free space in our underlying buffer
+	 *   to accept the message and the user can't wait for space.
+	 * - There is insufficient free space in our underlying buffer
+	 *   to accept the message and the alq is inactive due to prior
+	 *   use of the ALQ_NOACTIVATE flag (which would lead to deadlock).
+	 */
+	if (len > alq->aq_buflen ||
+	    alq->aq_flags & AQ_SHUTDOWN ||
+	    (((flags & ALQ_NOWAIT) || (!(alq->aq_flags & AQ_ACTIVE) &&
+	    HAS_PENDING_DATA(alq))) && alq->aq_freebytes < len)) {
+		ALQ_UNLOCK(alq);
 		return (EWOULDBLOCK);
+	}
 
-	bcopy(data, ale->ae_data, alq->aq_entlen);
-	alq_post(alq, ale);
+	/*
+	 * If we want ordered writes and there is already at least one thread
+	 * waiting for resources to become available, sleep until we're woken.
+	 */
+	if (alq->aq_flags & AQ_ORDERED && alq->aq_waiters > 0) {
+		KASSERT(!(flags & ALQ_NOWAIT),
+		    ("%s: ALQ_NOWAIT set but incorrectly ignored!", __func__));
+		alq->aq_waiters++;
+		msleep_spin(&alq->aq_waiters, &alq->aq_mtx, "alqwnord", 0);
+		alq->aq_waiters--;
+	}
 
-	return (0);
+	/*
+	 * (ALQ_WAITOK && aq_freebytes < len) or aq_freebytes >= len, either
+	 * enter while loop and sleep until we have enough free bytes (former)
+	 * or skip (latter). If AQ_ORDERED is set, only 1 thread at a time will
+	 * be in this loop. Otherwise, multiple threads may be sleeping here
+	 * competing for ALQ resources.
+	 */
+	while (alq->aq_freebytes < len && !(alq->aq_flags & AQ_SHUTDOWN)) {
+		KASSERT(!(flags & ALQ_NOWAIT),
+		    ("%s: ALQ_NOWAIT set but incorrectly ignored!", __func__));
+		alq->aq_flags |= AQ_WANTED;
+		alq->aq_waiters++;
+		if (waitchan)
+			wakeup(waitchan);
+		msleep_spin(alq, &alq->aq_mtx, "alqwnres", 0);
+		alq->aq_waiters--;
+
+		/*
+		 * If we're the first thread to wake after an AQ_WANTED wakeup
+		 * but there isn't enough free space for us, we're going to loop
+		 * and sleep again. If there are other threads waiting in this
+		 * loop, schedule a wakeup so that they can see if the space
+		 * they require is available.
+		 */
+		if (alq->aq_waiters > 0 && !(alq->aq_flags & AQ_ORDERED) &&
+		    alq->aq_freebytes < len && !(alq->aq_flags & AQ_WANTED))
+			waitchan = alq;
+		else
+			waitchan = NULL;
+	}
+
+	/*
+	 * If there are waiters, we need to signal the waiting threads after we
+	 * complete our work. The alq ptr is used as a wait channel for threads
+	 * requiring resources to be freed up. In the AQ_ORDERED case, threads
+	 * are not allowed to concurrently compete for resources in the above
+	 * while loop, so we use a different wait channel in this case.
+	 */
+	if (alq->aq_waiters > 0) {
+		if (alq->aq_flags & AQ_ORDERED)
+			waitchan = &alq->aq_waiters;
+		else
+			waitchan = alq;
+	} else
+		waitchan = NULL;
+
+	/* Bail if we're shutting down. */
+	if (alq->aq_flags & AQ_SHUTDOWN) {
+		ret = EWOULDBLOCK;
+		goto unlock;
+	}
+
+	/*
+	 * If we need to wrap the buffer to accommodate the write,
+	 * we'll need 2 calls to bcopy.
+	 */
+	if ((alq->aq_buflen - alq->aq_writehead) < len)
+		copy = alq->aq_buflen - alq->aq_writehead;
+
+	/* Copy message (or part thereof if wrap required) to the buffer. */
+	bcopy(data, alq->aq_entbuf + alq->aq_writehead, copy);
+	alq->aq_writehead += copy;
+
+	if (alq->aq_writehead >= alq->aq_buflen) {
+		KASSERT((alq->aq_writehead == alq->aq_buflen),
+		    ("%s: alq->aq_writehead (%d) > alq->aq_buflen (%d)",
+		    __func__,
+		    alq->aq_writehead,
+		    alq->aq_buflen));
+		alq->aq_writehead = 0;
+	}
+
+	if (copy != len) {
+		/*
+		 * Wrap the buffer by copying the remainder of our message
+		 * to the start of the buffer and resetting aq_writehead.
+		 */
+		bcopy(((uint8_t *)data)+copy, alq->aq_entbuf, len - copy);
+		alq->aq_writehead = len - copy;
+	}
+
+	KASSERT((alq->aq_writehead >= 0 && alq->aq_writehead < alq->aq_buflen),
+	    ("%s: aq_writehead < 0 || aq_writehead >= aq_buflen", __func__));
+
+	alq->aq_freebytes -= len;
+
+	if (!(alq->aq_flags & AQ_ACTIVE) && !(flags & ALQ_NOACTIVATE)) {
+		alq->aq_flags |= AQ_ACTIVE;
+		activate = 1;
+	}
+
+	KASSERT((HAS_PENDING_DATA(alq)), ("%s: queue empty!", __func__));
+
+unlock:
+	ALQ_UNLOCK(alq);
+
+	if (activate) {
+		ALD_LOCK();
+		ald_activate(alq);
+		ALD_UNLOCK();
+	}
+
+	/* NB: We rely on wakeup_one waking threads in a FIFO manner. */
+	if (waitchan != NULL)
+		wakeup_one(waitchan);
+
+	return (ret);
+}
+
+int
+alq_write(struct alq *alq, void *data, int flags)
+{
+	/* Should only be called in fixed length message (legacy) mode. */
+	KASSERT((alq->aq_flags & AQ_LEGACY),
+	    ("%s: fixed length write on variable length queue", __func__));
+	return (alq_writen(alq, data, alq->aq_entlen, flags));
 }
 
+/*
+ * Retrieve a pointer for the ALQ to write directly into, avoiding bcopy.
+ */
 struct ale *
-alq_get(struct alq *alq, int waitok)
+alq_getn(struct alq *alq, int len, int flags)
 {
-	struct ale *ale;
-	struct ale *aln;
+	int contigbytes;
+	void *waitchan;
+
+	KASSERT((len > 0 && len <= alq->aq_buflen),
+	    ("%s: len <= 0 || len > alq->aq_buflen", __func__));
 
-	ale = NULL;
+	waitchan = NULL;
 
 	ALQ_LOCK(alq);
 
-	/* Loop until we get an entry or we're shutting down */
-	while ((alq->aq_flags & AQ_SHUTDOWN) == 0 && 
-	    (ale = alq->aq_entfree) == NULL &&
-	    (waitok & ALQ_WAITOK)) {
+	/*
+	 * Determine the number of free contiguous bytes.
+	 * We ensure elsewhere that if aq_writehead == aq_writetail because
+	 * the buffer is empty, they will both be set to 0 and therefore
+	 * aq_freebytes == aq_buflen and is fully contiguous.
+	 * If they are equal and the buffer is not empty, aq_freebytes will
+	 * be 0 indicating the buffer is full.
+	 */
+	if (alq->aq_writehead <= alq->aq_writetail)
+		contigbytes = alq->aq_freebytes;
+	else {
+		contigbytes = alq->aq_buflen - alq->aq_writehead;
+
+		if (contigbytes < len) {
+			/*
+			 * Insufficient space at end of buffer to handle a
+			 * contiguous write. Wrap early if there's space at
+			 * the beginning. This will leave a hole at the end
+			 * of the buffer which we will have to skip over when
+			 * flushing the buffer to disk.
+			 */
+			if (alq->aq_writetail >= len || flags & ALQ_WAITOK) {
+				/* Keep track of # bytes left blank. */
+				alq->aq_wrapearly = contigbytes;
+				/* Do the wrap and adjust counters. */
+				contigbytes = alq->aq_freebytes =
+				    alq->aq_writetail;
+				alq->aq_writehead = 0;
+			}
+		}
+	}
+
+	/*

*** DIFF OUTPUT TRUNCATED AT 1000 LINES ***



Want to link to this message? Use this URL: <https://mail-archive.FreeBSD.org/cgi/mid.cgi?201004261348.o3QDmN3H000249>