From: Conrad Meyer
Reply-To: cem@freebsd.org
Date: Fri, 12 Aug 2016 15:13:37 -0700
Subject: Re: svn commit: r304021 - in head/sys: conf kern net sys
To: Stephen Hurd
Cc: src-committers, svn-src-all@freebsd.org, svn-src-head@freebsd.org
In-Reply-To: <201608122129.u7CLTiig028007@repo.freebsd.org>
List-Id: SVN commit messages for the src tree for head/-current

A manual page for gtaskqueue would be nice, if you have time.
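To make that concrete, here is roughly the sort of EXAMPLES section I had in mind, pieced together only from what is visible in the diff below (taskqgroup_create(), taskqgroup_adjust(), taskqgroup_attach(), grouptaskqueue_enqueue(), taskqgroup_detach()). Treat it as an untested sketch; in particular GROUPTASK_INIT() and the gt_task member are guesses on my part, since sys/gtaskqueue.h isn't quoted in full here:

#include <sys/param.h>
#include <sys/kernel.h>
#include <sys/smp.h>
#include <sys/gtaskqueue.h>

static struct taskqgroup *my_tqg;
static struct grouptask my_rx_gtask;

/* Deferred per-queue RX work runs from a group taskqueue thread. */
static void
my_rx_task(void *ctx)
{
	/* ... process the RX ring for this queue ... */
}

static void
my_attach(void *softc, int irq)
{
	/* One group; taskqgroup_adjust() creates the per-CPU queues. */
	my_tqg = taskqgroup_create("my_driver");
	taskqgroup_adjust(my_tqg, mp_ncpus, 1);

	/* Assumed name: GROUPTASK_INIT() initializes the embedded gtask. */
	GROUPTASK_INIT(&my_rx_gtask, 0, my_rx_task, softc);
	taskqgroup_attach(my_tqg, &my_rx_gtask, softc, irq, "my_rx");
}

/* The interrupt handler just kicks the grouptask. */
static void
my_intr(void *arg)
{
	/* Assumed member: gt_task is the struct gtask inside grouptask. */
	grouptaskqueue_enqueue(my_rx_gtask.gt_taskqueue,
	    &my_rx_gtask.gt_task);
}

static void
my_detach(void *softc)
{
	taskqgroup_detach(my_tqg, &my_rx_gtask);
}

Even just that much, plus a paragraph on the uniq and irq arguments, would go a long way.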
Thanks, Conrad On Fri, Aug 12, 2016 at 2:29 PM, Stephen Hurd wrote: > Author: shurd (ports committer) > Date: Fri Aug 12 21:29:44 2016 > New Revision: 304021 > URL: https://svnweb.freebsd.org/changeset/base/304021 > > Log: > Update iflib to support more NIC designs > > - Move group task queue into kern/subr_gtaskqueue.c > - Change intr_enable to return an int so it can be detected if it's not > implemented > - Allow different TX/RX queues per set to be different sizes > - Don't split up TX mbufs before transmit > - Allow a completion queue for TX as well as RX > - Pass the RX budget to isc_rxd_available() to allow an earlier return > and avoid multiple calls > > Submitted by: shurd > Reviewed by: gallatin > Approved by: scottl > Differential Revision: https://reviews.freebsd.org/D7393 > > Added: > head/sys/kern/subr_gtaskqueue.c (contents, props changed) > head/sys/sys/gtaskqueue.h (contents, props changed) > Modified: > head/sys/conf/files > head/sys/kern/subr_taskqueue.c > head/sys/net/ifdi_if.m > head/sys/net/iflib.c > head/sys/net/iflib.h > head/sys/sys/_task.h > head/sys/sys/taskqueue.h > > Modified: head/sys/conf/files > ============================================================================== > --- head/sys/conf/files Fri Aug 12 20:33:23 2016 (r304020) > +++ head/sys/conf/files Fri Aug 12 21:29:44 2016 (r304021) > @@ -3349,6 +3349,7 @@ kern/subr_disk.c standard > kern/subr_eventhandler.c standard > kern/subr_fattime.c standard > kern/subr_firmware.c optional firmware > +kern/subr_gtaskqueue.c standard > kern/subr_hash.c standard > kern/subr_hints.c standard > kern/subr_kdb.c standard > > Added: head/sys/kern/subr_gtaskqueue.c > ============================================================================== > --- /dev/null 00:00:00 1970 (empty, because file is newly added) > +++ head/sys/kern/subr_gtaskqueue.c Fri Aug 12 21:29:44 2016 (r304021) > @@ -0,0 +1,864 @@ > +/*- > + * Copyright (c) 2000 Doug Rabson > + * Copyright (c) 2014 Jeff Roberson > + * Copyright (c) 2016 Matthew Macy > + * All rights reserved. > + * > + * Redistribution and use in source and binary forms, with or without > + * modification, are permitted provided that the following conditions > + * are met: > + * 1. Redistributions of source code must retain the above copyright > + * notice, this list of conditions and the following disclaimer. > + * 2. Redistributions in binary form must reproduce the above copyright > + * notice, this list of conditions and the following disclaimer in the > + * documentation and/or other materials provided with the distribution. > + * > + * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND > + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE > + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE > + * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE > + * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL > + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS > + * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) > + * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT > + * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY > + * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF > + * SUCH DAMAGE. 
> + */ > + > +#include > +__FBSDID("$FreeBSD$"); > + > +#include > +#include > +#include > +#include > +#include > +#include > +#include > +#include > +#include > +#include > +#include > +#include > +#include > +#include > +#include > +#include > +#include > +#include > + > +static MALLOC_DEFINE(M_GTASKQUEUE, "taskqueue", "Task Queues"); > +static void gtaskqueue_thread_enqueue(void *); > +static void gtaskqueue_thread_loop(void *arg); > + > + > +struct gtaskqueue_busy { > + struct gtask *tb_running; > + TAILQ_ENTRY(gtaskqueue_busy) tb_link; > +}; > + > +static struct gtask * const TB_DRAIN_WAITER = (struct gtask *)0x1; > + > +struct gtaskqueue { > + STAILQ_HEAD(, gtask) tq_queue; > + gtaskqueue_enqueue_fn tq_enqueue; > + void *tq_context; > + char *tq_name; > + TAILQ_HEAD(, gtaskqueue_busy) tq_active; > + struct mtx tq_mutex; > + struct thread **tq_threads; > + int tq_tcount; > + int tq_spin; > + int tq_flags; > + int tq_callouts; > + taskqueue_callback_fn tq_callbacks[TASKQUEUE_NUM_CALLBACKS]; > + void *tq_cb_contexts[TASKQUEUE_NUM_CALLBACKS]; > +}; > + > +#define TQ_FLAGS_ACTIVE (1 << 0) > +#define TQ_FLAGS_BLOCKED (1 << 1) > +#define TQ_FLAGS_UNLOCKED_ENQUEUE (1 << 2) > + > +#define DT_CALLOUT_ARMED (1 << 0) > + > +#define TQ_LOCK(tq) \ > + do { \ > + if ((tq)->tq_spin) \ > + mtx_lock_spin(&(tq)->tq_mutex); \ > + else \ > + mtx_lock(&(tq)->tq_mutex); \ > + } while (0) > +#define TQ_ASSERT_LOCKED(tq) mtx_assert(&(tq)->tq_mutex, MA_OWNED) > + > +#define TQ_UNLOCK(tq) \ > + do { \ > + if ((tq)->tq_spin) \ > + mtx_unlock_spin(&(tq)->tq_mutex); \ > + else \ > + mtx_unlock(&(tq)->tq_mutex); \ > + } while (0) > +#define TQ_ASSERT_UNLOCKED(tq) mtx_assert(&(tq)->tq_mutex, MA_NOTOWNED) > + > +static __inline int > +TQ_SLEEP(struct gtaskqueue *tq, void *p, struct mtx *m, int pri, const char *wm, > + int t) > +{ > + if (tq->tq_spin) > + return (msleep_spin(p, m, wm, t)); > + return (msleep(p, m, pri, wm, t)); > +} > + > +static struct gtaskqueue * > +_gtaskqueue_create(const char *name, int mflags, > + taskqueue_enqueue_fn enqueue, void *context, > + int mtxflags, const char *mtxname __unused) > +{ > + struct gtaskqueue *queue; > + char *tq_name; > + > + tq_name = malloc(TASKQUEUE_NAMELEN, M_GTASKQUEUE, mflags | M_ZERO); > + if (!tq_name) > + return (NULL); > + > + snprintf(tq_name, TASKQUEUE_NAMELEN, "%s", (name) ? name : "taskqueue"); > + > + queue = malloc(sizeof(struct gtaskqueue), M_GTASKQUEUE, mflags | M_ZERO); > + if (!queue) > + return (NULL); > + > + STAILQ_INIT(&queue->tq_queue); > + TAILQ_INIT(&queue->tq_active); > + queue->tq_enqueue = enqueue; > + queue->tq_context = context; > + queue->tq_name = tq_name; > + queue->tq_spin = (mtxflags & MTX_SPIN) != 0; > + queue->tq_flags |= TQ_FLAGS_ACTIVE; > + if (enqueue == gtaskqueue_thread_enqueue) > + queue->tq_flags |= TQ_FLAGS_UNLOCKED_ENQUEUE; > + mtx_init(&queue->tq_mutex, tq_name, NULL, mtxflags); > + > + return (queue); > +} > + > + > +/* > + * Signal a taskqueue thread to terminate. 
> + */ > +static void > +gtaskqueue_terminate(struct thread **pp, struct gtaskqueue *tq) > +{ > + > + while (tq->tq_tcount > 0 || tq->tq_callouts > 0) { > + wakeup(tq); > + TQ_SLEEP(tq, pp, &tq->tq_mutex, PWAIT, "taskqueue_destroy", 0); > + } > +} > + > +static void > +gtaskqueue_free(struct gtaskqueue *queue) > +{ > + > + TQ_LOCK(queue); > + queue->tq_flags &= ~TQ_FLAGS_ACTIVE; > + gtaskqueue_terminate(queue->tq_threads, queue); > + KASSERT(TAILQ_EMPTY(&queue->tq_active), ("Tasks still running?")); > + KASSERT(queue->tq_callouts == 0, ("Armed timeout tasks")); > + mtx_destroy(&queue->tq_mutex); > + free(queue->tq_threads, M_GTASKQUEUE); > + free(queue->tq_name, M_GTASKQUEUE); > + free(queue, M_GTASKQUEUE); > +} > + > +int > +grouptaskqueue_enqueue(struct gtaskqueue *queue, struct gtask *gtask) > +{ > + TQ_LOCK(queue); > + if (gtask->ta_flags & TASK_ENQUEUED) { > + TQ_UNLOCK(queue); > + return (0); > + } > + STAILQ_INSERT_TAIL(&queue->tq_queue, gtask, ta_link); > + gtask->ta_flags |= TASK_ENQUEUED; > + TQ_UNLOCK(queue); > + if ((queue->tq_flags & TQ_FLAGS_BLOCKED) == 0) > + queue->tq_enqueue(queue->tq_context); > + return (0); > +} > + > +static void > +gtaskqueue_task_nop_fn(void *context) > +{ > +} > + > +/* > + * Block until all currently queued tasks in this taskqueue > + * have begun execution. Tasks queued during execution of > + * this function are ignored. > + */ > +static void > +gtaskqueue_drain_tq_queue(struct gtaskqueue *queue) > +{ > + struct gtask t_barrier; > + > + if (STAILQ_EMPTY(&queue->tq_queue)) > + return; > + > + /* > + * Enqueue our barrier after all current tasks, but with > + * the highest priority so that newly queued tasks cannot > + * pass it. Because of the high priority, we can not use > + * taskqueue_enqueue_locked directly (which drops the lock > + * anyway) so just insert it at tail while we have the > + * queue lock. > + */ > + GTASK_INIT(&t_barrier, 0, USHRT_MAX, gtaskqueue_task_nop_fn, &t_barrier); > + STAILQ_INSERT_TAIL(&queue->tq_queue, &t_barrier, ta_link); > + t_barrier.ta_flags |= TASK_ENQUEUED; > + > + /* > + * Once the barrier has executed, all previously queued tasks > + * have completed or are currently executing. > + */ > + while (t_barrier.ta_flags & TASK_ENQUEUED) > + TQ_SLEEP(queue, &t_barrier, &queue->tq_mutex, PWAIT, "-", 0); > +} > + > +/* > + * Block until all currently executing tasks for this taskqueue > + * complete. Tasks that begin execution during the execution > + * of this function are ignored. > + */ > +static void > +gtaskqueue_drain_tq_active(struct gtaskqueue *queue) > +{ > + struct gtaskqueue_busy tb_marker, *tb_first; > + > + if (TAILQ_EMPTY(&queue->tq_active)) > + return; > + > + /* Block taskq_terminate().*/ > + queue->tq_callouts++; > + > + /* > + * Wait for all currently executing taskqueue threads > + * to go idle. > + */ > + tb_marker.tb_running = TB_DRAIN_WAITER; > + TAILQ_INSERT_TAIL(&queue->tq_active, &tb_marker, tb_link); > + while (TAILQ_FIRST(&queue->tq_active) != &tb_marker) > + TQ_SLEEP(queue, &tb_marker, &queue->tq_mutex, PWAIT, "-", 0); > + TAILQ_REMOVE(&queue->tq_active, &tb_marker, tb_link); > + > + /* > + * Wakeup any other drain waiter that happened to queue up > + * without any intervening active thread. > + */ > + tb_first = TAILQ_FIRST(&queue->tq_active); > + if (tb_first != NULL && tb_first->tb_running == TB_DRAIN_WAITER) > + wakeup(tb_first); > + > + /* Release taskqueue_terminate(). 
*/ > + queue->tq_callouts--; > + if ((queue->tq_flags & TQ_FLAGS_ACTIVE) == 0) > + wakeup_one(queue->tq_threads); > +} > + > +void > +gtaskqueue_block(struct gtaskqueue *queue) > +{ > + > + TQ_LOCK(queue); > + queue->tq_flags |= TQ_FLAGS_BLOCKED; > + TQ_UNLOCK(queue); > +} > + > +void > +gtaskqueue_unblock(struct gtaskqueue *queue) > +{ > + > + TQ_LOCK(queue); > + queue->tq_flags &= ~TQ_FLAGS_BLOCKED; > + if (!STAILQ_EMPTY(&queue->tq_queue)) > + queue->tq_enqueue(queue->tq_context); > + TQ_UNLOCK(queue); > +} > + > +static void > +gtaskqueue_run_locked(struct gtaskqueue *queue) > +{ > + struct gtaskqueue_busy tb; > + struct gtaskqueue_busy *tb_first; > + struct gtask *gtask; > + > + KASSERT(queue != NULL, ("tq is NULL")); > + TQ_ASSERT_LOCKED(queue); > + tb.tb_running = NULL; > + > + while (STAILQ_FIRST(&queue->tq_queue)) { > + TAILQ_INSERT_TAIL(&queue->tq_active, &tb, tb_link); > + > + /* > + * Carefully remove the first task from the queue and > + * clear its TASK_ENQUEUED flag > + */ > + gtask = STAILQ_FIRST(&queue->tq_queue); > + KASSERT(gtask != NULL, ("task is NULL")); > + STAILQ_REMOVE_HEAD(&queue->tq_queue, ta_link); > + gtask->ta_flags &= ~TASK_ENQUEUED; > + tb.tb_running = gtask; > + TQ_UNLOCK(queue); > + > + KASSERT(gtask->ta_func != NULL, ("task->ta_func is NULL")); > + gtask->ta_func(gtask->ta_context); > + > + TQ_LOCK(queue); > + tb.tb_running = NULL; > + wakeup(gtask); > + > + TAILQ_REMOVE(&queue->tq_active, &tb, tb_link); > + tb_first = TAILQ_FIRST(&queue->tq_active); > + if (tb_first != NULL && > + tb_first->tb_running == TB_DRAIN_WAITER) > + wakeup(tb_first); > + } > +} > + > +static int > +task_is_running(struct gtaskqueue *queue, struct gtask *gtask) > +{ > + struct gtaskqueue_busy *tb; > + > + TQ_ASSERT_LOCKED(queue); > + TAILQ_FOREACH(tb, &queue->tq_active, tb_link) { > + if (tb->tb_running == gtask) > + return (1); > + } > + return (0); > +} > + > +static int > +gtaskqueue_cancel_locked(struct gtaskqueue *queue, struct gtask *gtask) > +{ > + > + if (gtask->ta_flags & TASK_ENQUEUED) > + STAILQ_REMOVE(&queue->tq_queue, gtask, gtask, ta_link); > + gtask->ta_flags &= ~TASK_ENQUEUED; > + return (task_is_running(queue, gtask) ? 
EBUSY : 0); > +} > + > +int > +gtaskqueue_cancel(struct gtaskqueue *queue, struct gtask *gtask) > +{ > + int error; > + > + TQ_LOCK(queue); > + error = gtaskqueue_cancel_locked(queue, gtask); > + TQ_UNLOCK(queue); > + > + return (error); > +} > + > +void > +gtaskqueue_drain(struct gtaskqueue *queue, struct gtask *gtask) > +{ > + > + if (!queue->tq_spin) > + WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL, __func__); > + > + TQ_LOCK(queue); > + while ((gtask->ta_flags & TASK_ENQUEUED) || task_is_running(queue, gtask)) > + TQ_SLEEP(queue, gtask, &queue->tq_mutex, PWAIT, "-", 0); > + TQ_UNLOCK(queue); > +} > + > +void > +gtaskqueue_drain_all(struct gtaskqueue *queue) > +{ > + > + if (!queue->tq_spin) > + WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL, __func__); > + > + TQ_LOCK(queue); > + gtaskqueue_drain_tq_queue(queue); > + gtaskqueue_drain_tq_active(queue); > + TQ_UNLOCK(queue); > +} > + > +static int > +_gtaskqueue_start_threads(struct gtaskqueue **tqp, int count, int pri, > + cpuset_t *mask, const char *name, va_list ap) > +{ > + char ktname[MAXCOMLEN + 1]; > + struct thread *td; > + struct gtaskqueue *tq; > + int i, error; > + > + if (count <= 0) > + return (EINVAL); > + > + vsnprintf(ktname, sizeof(ktname), name, ap); > + tq = *tqp; > + > + tq->tq_threads = malloc(sizeof(struct thread *) * count, M_GTASKQUEUE, > + M_NOWAIT | M_ZERO); > + if (tq->tq_threads == NULL) { > + printf("%s: no memory for %s threads\n", __func__, ktname); > + return (ENOMEM); > + } > + > + for (i = 0; i < count; i++) { > + if (count == 1) > + error = kthread_add(gtaskqueue_thread_loop, tqp, NULL, > + &tq->tq_threads[i], RFSTOPPED, 0, "%s", ktname); > + else > + error = kthread_add(gtaskqueue_thread_loop, tqp, NULL, > + &tq->tq_threads[i], RFSTOPPED, 0, > + "%s_%d", ktname, i); > + if (error) { > + /* should be ok to continue, taskqueue_free will dtrt */ > + printf("%s: kthread_add(%s): error %d", __func__, > + ktname, error); > + tq->tq_threads[i] = NULL; /* paranoid */ > + } else > + tq->tq_tcount++; > + } > + for (i = 0; i < count; i++) { > + if (tq->tq_threads[i] == NULL) > + continue; > + td = tq->tq_threads[i]; > + if (mask) { > + error = cpuset_setthread(td->td_tid, mask); > + /* > + * Failing to pin is rarely an actual fatal error; > + * it'll just affect performance. > + */ > + if (error) > + printf("%s: curthread=%llu: can't pin; " > + "error=%d\n", > + __func__, > + (unsigned long long) td->td_tid, > + error); > + } > + thread_lock(td); > + sched_prio(td, pri); > + sched_add(td, SRQ_BORING); > + thread_unlock(td); > + } > + > + return (0); > +} > + > +static int > +gtaskqueue_start_threads(struct gtaskqueue **tqp, int count, int pri, > + const char *name, ...) > +{ > + va_list ap; > + int error; > + > + va_start(ap, name); > + error = _gtaskqueue_start_threads(tqp, count, pri, NULL, name, ap); > + va_end(ap); > + return (error); > +} > + > +static inline void > +gtaskqueue_run_callback(struct gtaskqueue *tq, > + enum taskqueue_callback_type cb_type) > +{ > + taskqueue_callback_fn tq_callback; > + > + TQ_ASSERT_UNLOCKED(tq); > + tq_callback = tq->tq_callbacks[cb_type]; > + if (tq_callback != NULL) > + tq_callback(tq->tq_cb_contexts[cb_type]); > +} > + > +static void > +gtaskqueue_thread_loop(void *arg) > +{ > + struct gtaskqueue **tqp, *tq; > + > + tqp = arg; > + tq = *tqp; > + gtaskqueue_run_callback(tq, TASKQUEUE_CALLBACK_TYPE_INIT); > + TQ_LOCK(tq); > + while ((tq->tq_flags & TQ_FLAGS_ACTIVE) != 0) { > + /* XXX ? 
*/ > + gtaskqueue_run_locked(tq); > + /* > + * Because taskqueue_run() can drop tq_mutex, we need to > + * check if the TQ_FLAGS_ACTIVE flag wasn't removed in the > + * meantime, which means we missed a wakeup. > + */ > + if ((tq->tq_flags & TQ_FLAGS_ACTIVE) == 0) > + break; > + TQ_SLEEP(tq, tq, &tq->tq_mutex, 0, "-", 0); > + } > + gtaskqueue_run_locked(tq); > + /* > + * This thread is on its way out, so just drop the lock temporarily > + * in order to call the shutdown callback. This allows the callback > + * to look at the taskqueue, even just before it dies. > + */ > + TQ_UNLOCK(tq); > + gtaskqueue_run_callback(tq, TASKQUEUE_CALLBACK_TYPE_SHUTDOWN); > + TQ_LOCK(tq); > + > + /* rendezvous with thread that asked us to terminate */ > + tq->tq_tcount--; > + wakeup_one(tq->tq_threads); > + TQ_UNLOCK(tq); > + kthread_exit(); > +} > + > +static void > +gtaskqueue_thread_enqueue(void *context) > +{ > + struct gtaskqueue **tqp, *tq; > + > + tqp = context; > + tq = *tqp; > + wakeup_one(tq); > +} > + > + > +static struct gtaskqueue * > +gtaskqueue_create_fast(const char *name, int mflags, > + taskqueue_enqueue_fn enqueue, void *context) > +{ > + return _gtaskqueue_create(name, mflags, enqueue, context, > + MTX_SPIN, "fast_taskqueue"); > +} > + > + > +struct taskqgroup_cpu { > + LIST_HEAD(, grouptask) tgc_tasks; > + struct gtaskqueue *tgc_taskq; > + int tgc_cnt; > + int tgc_cpu; > +}; > + > +struct taskqgroup { > + struct taskqgroup_cpu tqg_queue[MAXCPU]; > + struct mtx tqg_lock; > + char * tqg_name; > + int tqg_adjusting; > + int tqg_stride; > + int tqg_cnt; > +}; > + > +struct taskq_bind_task { > + struct gtask bt_task; > + int bt_cpuid; > +}; > + > +static void > +taskqgroup_cpu_create(struct taskqgroup *qgroup, int idx) > +{ > + struct taskqgroup_cpu *qcpu; > + > + qcpu = &qgroup->tqg_queue[idx]; > + LIST_INIT(&qcpu->tgc_tasks); > + qcpu->tgc_taskq = gtaskqueue_create_fast(NULL, M_WAITOK, > + taskqueue_thread_enqueue, &qcpu->tgc_taskq); > + gtaskqueue_start_threads(&qcpu->tgc_taskq, 1, PI_SOFT, > + "%s_%d", qgroup->tqg_name, idx); > + qcpu->tgc_cpu = idx * qgroup->tqg_stride; > +} > + > +static void > +taskqgroup_cpu_remove(struct taskqgroup *qgroup, int idx) > +{ > + > + gtaskqueue_free(qgroup->tqg_queue[idx].tgc_taskq); > +} > + > +/* > + * Find the taskq with least # of tasks that doesn't currently have any > + * other queues from the uniq identifier. > + */ > +static int > +taskqgroup_find(struct taskqgroup *qgroup, void *uniq) > +{ > + struct grouptask *n; > + int i, idx, mincnt; > + int strict; > + > + mtx_assert(&qgroup->tqg_lock, MA_OWNED); > + if (qgroup->tqg_cnt == 0) > + return (0); > + idx = -1; > + mincnt = INT_MAX; > + /* > + * Two passes; First scan for a queue with the least tasks that > + * does not already service this uniq id. 
If that fails simply find > + * the queue with the least total tasks; > + */ > + for (strict = 1; mincnt == INT_MAX; strict = 0) { > + for (i = 0; i < qgroup->tqg_cnt; i++) { > + if (qgroup->tqg_queue[i].tgc_cnt > mincnt) > + continue; > + if (strict) { > + LIST_FOREACH(n, > + &qgroup->tqg_queue[i].tgc_tasks, gt_list) > + if (n->gt_uniq == uniq) > + break; > + if (n != NULL) > + continue; > + } > + mincnt = qgroup->tqg_queue[i].tgc_cnt; > + idx = i; > + } > + } > + if (idx == -1) > + panic("taskqgroup_find: Failed to pick a qid."); > + > + return (idx); > +} > + > +void > +taskqgroup_attach(struct taskqgroup *qgroup, struct grouptask *gtask, > + void *uniq, int irq, char *name) > +{ > + cpuset_t mask; > + int qid; > + > + gtask->gt_uniq = uniq; > + gtask->gt_name = name; > + gtask->gt_irq = irq; > + gtask->gt_cpu = -1; > + mtx_lock(&qgroup->tqg_lock); > + qid = taskqgroup_find(qgroup, uniq); > + qgroup->tqg_queue[qid].tgc_cnt++; > + LIST_INSERT_HEAD(&qgroup->tqg_queue[qid].tgc_tasks, gtask, gt_list); > + gtask->gt_taskqueue = qgroup->tqg_queue[qid].tgc_taskq; > + if (irq != -1 && smp_started) { > + CPU_ZERO(&mask); > + CPU_SET(qgroup->tqg_queue[qid].tgc_cpu, &mask); > + mtx_unlock(&qgroup->tqg_lock); > + intr_setaffinity(irq, &mask); > + } else > + mtx_unlock(&qgroup->tqg_lock); > +} > + > +int > +taskqgroup_attach_cpu(struct taskqgroup *qgroup, struct grouptask *gtask, > + void *uniq, int cpu, int irq, char *name) > +{ > + cpuset_t mask; > + int i, qid; > + > + qid = -1; > + gtask->gt_uniq = uniq; > + gtask->gt_name = name; > + gtask->gt_irq = irq; > + gtask->gt_cpu = cpu; > + mtx_lock(&qgroup->tqg_lock); > + if (smp_started) { > + for (i = 0; i < qgroup->tqg_cnt; i++) > + if (qgroup->tqg_queue[i].tgc_cpu == cpu) { > + qid = i; > + break; > + } > + if (qid == -1) { > + mtx_unlock(&qgroup->tqg_lock); > + return (EINVAL); > + } > + } else > + qid = 0; > + qgroup->tqg_queue[qid].tgc_cnt++; > + LIST_INSERT_HEAD(&qgroup->tqg_queue[qid].tgc_tasks, gtask, gt_list); > + gtask->gt_taskqueue = qgroup->tqg_queue[qid].tgc_taskq; > + if (irq != -1 && smp_started) { > + CPU_ZERO(&mask); > + CPU_SET(qgroup->tqg_queue[qid].tgc_cpu, &mask); > + mtx_unlock(&qgroup->tqg_lock); > + intr_setaffinity(irq, &mask); > + } else > + mtx_unlock(&qgroup->tqg_lock); > + return (0); > +} > + > +void > +taskqgroup_detach(struct taskqgroup *qgroup, struct grouptask *gtask) > +{ > + int i; > + > + mtx_lock(&qgroup->tqg_lock); > + for (i = 0; i < qgroup->tqg_cnt; i++) > + if (qgroup->tqg_queue[i].tgc_taskq == gtask->gt_taskqueue) > + break; > + if (i == qgroup->tqg_cnt) > + panic("taskqgroup_detach: task not in group\n"); > + qgroup->tqg_queue[i].tgc_cnt--; > + LIST_REMOVE(gtask, gt_list); > + mtx_unlock(&qgroup->tqg_lock); > + gtask->gt_taskqueue = NULL; > +} > + > +static void > +taskqgroup_binder(void *ctx) > +{ > + struct taskq_bind_task *gtask = (struct taskq_bind_task *)ctx; > + cpuset_t mask; > + int error; > + > + CPU_ZERO(&mask); > + CPU_SET(gtask->bt_cpuid, &mask); > + error = cpuset_setthread(curthread->td_tid, &mask); > + thread_lock(curthread); > + sched_bind(curthread, gtask->bt_cpuid); > + thread_unlock(curthread); > + > + if (error) > + printf("taskqgroup_binder: setaffinity failed: %d\n", > + error); > + free(gtask, M_DEVBUF); > +} > + > +static void > +taskqgroup_bind(struct taskqgroup *qgroup) > +{ > + struct taskq_bind_task *gtask; > + int i; > + > + /* > + * Bind taskqueue threads to specific CPUs, if they have been assigned > + * one. 
> + */ > + for (i = 0; i < qgroup->tqg_cnt; i++) { > + gtask = malloc(sizeof (*gtask), M_DEVBUF, M_NOWAIT); > + GTASK_INIT(&gtask->bt_task, 0, 0, taskqgroup_binder, gtask); > + gtask->bt_cpuid = qgroup->tqg_queue[i].tgc_cpu; > + grouptaskqueue_enqueue(qgroup->tqg_queue[i].tgc_taskq, > + &gtask->bt_task); > + } > +} > + > +static int > +_taskqgroup_adjust(struct taskqgroup *qgroup, int cnt, int stride) > +{ > + LIST_HEAD(, grouptask) gtask_head = LIST_HEAD_INITIALIZER(NULL); > + cpuset_t mask; > + struct grouptask *gtask; > + int i, old_cnt, qid; > + > + mtx_assert(&qgroup->tqg_lock, MA_OWNED); > + > + if (cnt < 1 || cnt * stride > mp_ncpus || !smp_started) { > + printf("taskqgroup_adjust failed cnt: %d stride: %d mp_ncpus: %d smp_started: %d\n", > + cnt, stride, mp_ncpus, smp_started); > + return (EINVAL); > + } > + if (qgroup->tqg_adjusting) { > + printf("taskqgroup_adjust failed: adjusting\n"); > + return (EBUSY); > + } > + qgroup->tqg_adjusting = 1; > + old_cnt = qgroup->tqg_cnt; > + mtx_unlock(&qgroup->tqg_lock); > + /* > + * Set up queue for tasks added before boot. > + */ > + if (old_cnt == 0) { > + LIST_SWAP(&gtask_head, &qgroup->tqg_queue[0].tgc_tasks, > + grouptask, gt_list); > + qgroup->tqg_queue[0].tgc_cnt = 0; > + } > + > + /* > + * If new taskq threads have been added. > + */ > + for (i = old_cnt; i < cnt; i++) > + taskqgroup_cpu_create(qgroup, i); > + mtx_lock(&qgroup->tqg_lock); > + qgroup->tqg_cnt = cnt; > + qgroup->tqg_stride = stride; > + > + /* > + * Adjust drivers to use new taskqs. > + */ > + for (i = 0; i < old_cnt; i++) { > + while ((gtask = LIST_FIRST(&qgroup->tqg_queue[i].tgc_tasks))) { > + LIST_REMOVE(gtask, gt_list); > + qgroup->tqg_queue[i].tgc_cnt--; > + LIST_INSERT_HEAD(&gtask_head, gtask, gt_list); > + } > + } > + > + while ((gtask = LIST_FIRST(&gtask_head))) { > + LIST_REMOVE(gtask, gt_list); > + if (gtask->gt_cpu == -1) > + qid = taskqgroup_find(qgroup, gtask->gt_uniq); > + else { > + for (i = 0; i < qgroup->tqg_cnt; i++) > + if (qgroup->tqg_queue[i].tgc_cpu == gtask->gt_cpu) { > + qid = i; > + break; > + } > + } > + qgroup->tqg_queue[qid].tgc_cnt++; > + LIST_INSERT_HEAD(&qgroup->tqg_queue[qid].tgc_tasks, gtask, > + gt_list); > + gtask->gt_taskqueue = qgroup->tqg_queue[qid].tgc_taskq; > + } > + /* > + * Set new CPU and IRQ affinity > + */ > + for (i = 0; i < cnt; i++) { > + qgroup->tqg_queue[i].tgc_cpu = i * qgroup->tqg_stride; > + CPU_ZERO(&mask); > + CPU_SET(qgroup->tqg_queue[i].tgc_cpu, &mask); > + LIST_FOREACH(gtask, &qgroup->tqg_queue[i].tgc_tasks, gt_list) { > + if (gtask->gt_irq == -1) > + continue; > + intr_setaffinity(gtask->gt_irq, &mask); > + } > + } > + mtx_unlock(&qgroup->tqg_lock); > + > + /* > + * If taskq thread count has been reduced.
> + */ > + for (i = cnt; i < old_cnt; i++) > + taskqgroup_cpu_remove(qgroup, i); > + > + mtx_lock(&qgroup->tqg_lock); > + qgroup->tqg_adjusting = 0; > + > + taskqgroup_bind(qgroup); > + > + return (0); > +} > + > +int > +taskqgroup_adjust(struct taskqgroup *qgroup, int cpu, int stride) > +{ > + int error; > + > + mtx_lock(&qgroup->tqg_lock); > + error = _taskqgroup_adjust(qgroup, cpu, stride); > + mtx_unlock(&qgroup->tqg_lock); > + > + return (error); > +} > + > +struct taskqgroup * > +taskqgroup_create(char *name) > +{ > + struct taskqgroup *qgroup; > + > + qgroup = malloc(sizeof(*qgroup), M_GTASKQUEUE, M_WAITOK | M_ZERO); > + mtx_init(&qgroup->tqg_lock, "taskqgroup", NULL, MTX_DEF); > + qgroup->tqg_name = name; > + LIST_INIT(&qgroup->tqg_queue[0].tgc_tasks); > + > + return (qgroup); > +} > + > +void > +taskqgroup_destroy(struct taskqgroup *qgroup) > +{ > + > +} > > Modified: head/sys/kern/subr_taskqueue.c > ============================================================================== > --- head/sys/kern/subr_taskqueue.c Fri Aug 12 20:33:23 2016 (r304020) > +++ head/sys/kern/subr_taskqueue.c Fri Aug 12 21:29:44 2016 (r304021) > @@ -261,22 +261,6 @@ taskqueue_enqueue_locked(struct taskqueu > } > > int > -grouptaskqueue_enqueue(struct taskqueue *queue, struct task *task) > -{ > - TQ_LOCK(queue); > - if (task->ta_pending) { > - TQ_UNLOCK(queue); > - return (0); > - } > - STAILQ_INSERT_TAIL(&queue->tq_queue, task, ta_link); > - task->ta_pending = 1; > - TQ_UNLOCK(queue); > - if ((queue->tq_flags & TQ_FLAGS_BLOCKED) == 0) > - queue->tq_enqueue(queue->tq_context); > - return (0); > -} > - > -int > taskqueue_enqueue(struct taskqueue *queue, struct task *task) > { > int res; > @@ -806,347 +790,3 @@ taskqueue_member(struct taskqueue *queue > } > return (ret); > } > - > -struct taskqgroup_cpu { > - LIST_HEAD(, grouptask) tgc_tasks; > - struct taskqueue *tgc_taskq; > - int tgc_cnt; > - int tgc_cpu; > -}; > - > -struct taskqgroup { > - struct taskqgroup_cpu tqg_queue[MAXCPU]; > - struct mtx tqg_lock; > - char * tqg_name; > - int tqg_adjusting; > - int tqg_stride; > - int tqg_cnt; > -}; > - > -struct taskq_bind_task { > - struct task bt_task; > - int bt_cpuid; > -}; > - > -static void > -taskqgroup_cpu_create(struct taskqgroup *qgroup, int idx) > -{ > - struct taskqgroup_cpu *qcpu; > - int i, j; > - > - qcpu = &qgroup->tqg_queue[idx]; > - LIST_INIT(&qcpu->tgc_tasks); > - qcpu->tgc_taskq = taskqueue_create_fast(NULL, M_WAITOK, > - taskqueue_thread_enqueue, &qcpu->tgc_taskq); > - taskqueue_start_threads(&qcpu->tgc_taskq, 1, PI_SOFT, > - "%s_%d", qgroup->tqg_name, idx); > - > - for (i = CPU_FIRST(), j = 0; j < idx * qgroup->tqg_stride; > - j++, i = CPU_NEXT(i)) { > - /* > - * Wait: evaluate the idx * qgroup->tqg_stride'th CPU, > - * potentially wrapping the actual count > - */ > - } > - qcpu->tgc_cpu = i; > -} > - > -static void > -taskqgroup_cpu_remove(struct taskqgroup *qgroup, int idx) > -{ > - > - taskqueue_free(qgroup->tqg_queue[idx].tgc_taskq); > -} > - > -/* > - * Find the taskq with least # of tasks that doesn't currently have any > - * other queues from the uniq identifier. > - */ > -static int > -taskqgroup_find(struct taskqgroup *qgroup, void *uniq) > -{ > - struct grouptask *n; > - int i, idx, mincnt; > - int strict; > - > - mtx_assert(&qgroup->tqg_lock, MA_OWNED); > - if (qgroup->tqg_cnt == 0) > - return (0); > - idx = -1; > - mincnt = INT_MAX; > - /* > - * Two passes; First scan for a queue with the least tasks that > - * does not already service this uniq id. 
If that fails simply find > - * the queue with the least total tasks; > - */ > - for (strict = 1; mincnt == INT_MAX; strict = 0) { > - for (i = 0; i < qgroup->tqg_cnt; i++) { > - if (qgroup->tqg_queue[i].tgc_cnt > mincnt) > - continue; > - if (strict) { > - LIST_FOREACH(n, > - &qgroup->tqg_queue[i].tgc_tasks, gt_list) > - if (n->gt_uniq == uniq) > - break; > - if (n != NULL) > - continue; > - } > - mincnt = qgroup->tqg_queue[i].tgc_cnt; > - idx = i; > - } > - } > - if (idx == -1) > - panic("taskqgroup_find: Failed to pick a qid."); > - > - return (idx); > -} > > *** DIFF OUTPUT TRUNCATED AT 1000 LINES *** >
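
One more thing that is probably worth a sentence in such a page: how the queues get spread across CPUs. Unlike the old code removed from subr_taskqueue.c above (which walked CPU_FIRST()/CPU_NEXT() and could wrap), the new _taskqgroup_adjust() appears to use a plain index-times-stride mapping, guarded by the cnt * stride > mp_ncpus EINVAL check. A trivial userland illustration of the layout, purely to show the arithmetic (the numbers are hypothetical):

#include <stdio.h>

int
main(void)
{
	/* Hypothetical group: 4 queues, stride 2, as on an 8-CPU box. */
	int cnt = 4, stride = 2;
	int i;

	/* Mirrors tgc_cpu = i * stride from _taskqgroup_adjust(). */
	for (i = 0; i < cnt; i++)
		printf("queue %d -> CPU %d\n", i, i * stride);
	return (0);
}

i.e. the group's queues land on CPUs 0, 2, 4 and 6.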