Skip site navigation (1)Skip section navigation (2)
Date:      Fri, 10 Aug 2012 08:19:22 +0000 (UTC)
From:      Edward Tomasz Napierala <trasz@FreeBSD.org>
To:        ports-committers@freebsd.org, svn-ports-all@freebsd.org, svn-ports-head@freebsd.org
Subject:   svn commit: r302379 - in head/net/openvswitch: . files
Message-ID:  <201208100819.q7A8JMcK010452@svn.freebsd.org>

next in thread | raw e-mail | index | archive | help
Author: trasz
Date: Fri Aug 10 08:19:22 2012
New Revision: 302379
URL: http://svn.freebsd.org/changeset/ports/302379

Log:
  Add optional threading support; disabled by default.

Added:
  head/net/openvswitch/files/threaded.diff   (contents, props changed)
Modified:
  head/net/openvswitch/Makefile

Modified: head/net/openvswitch/Makefile
==============================================================================
--- head/net/openvswitch/Makefile	Fri Aug 10 08:08:27 2012	(r302378)
+++ head/net/openvswitch/Makefile	Fri Aug 10 08:19:22 2012	(r302379)
@@ -30,20 +30,30 @@ MAN8=		ovs-appctl.8 ovs-brcompatd.8 ovs-
 		ovs-test.8 ovs-vsctl.8 ovs-vswitchd.8 \
 		ovs-vlan-bug-workaround.8 ovs-vlan-test.8
 
+OPTIONS_DEFINE=	THREADED
+THREADED_DESC=	Experimental high-performance threading patch
+
 .include <bsd.port.pre.mk>
 
 .if ${OSVERSION} < 800000
 BROKEN=		does not compile
 .endif
 
+.if ${PORT_OPTIONS:MTHREADED}
+CONFIGURE_ARGS+=--enable-threaded=yes
+.endif
+
 AUTOTOOLSFILES=	aclocal.m4
 
 post-patch:
 	@${REINPLACE_CMD} -e 's|1.11.1|%%AUTOMAKE_APIVER%%|g' \
 			  -e 's|2.65|%%AUTOCONF_VERSION%%|g' \
 			  ${WRKSRC}/aclocal.m4
-	# Workaround for a makefile bug; if it builds without this line, remove it.
-	#${TOUCH} ${WRKSRC}/INSTALL
+.if ${PORT_OPTIONS:MTHREADED}
+	@# We can't use EXTRA_PATCHES, since we need to apply this one
+	@# after files/patch-bsd-netdef.diff, not before.
+	${PATCH} ${PATCH_ARGS} < ${FILESDIR}/threaded.diff
+.endif
 
 post-install:
 	${INSTALL_DATA} ${WRKSRC}/vswitchd/vswitch.ovsschema ${PREFIX}/share/openvswitch/

Added: head/net/openvswitch/files/threaded.diff
==============================================================================
--- /dev/null	00:00:00 1970	(empty, because file is newly added)
+++ head/net/openvswitch/files/threaded.diff	Fri Aug 10 08:19:22 2012	(r302379)
@@ -0,0 +1,1305 @@
+diff --git configure.ac configure.ac
+index 5692b86..ff62627 100644
+--- configure.ac
++++ configure.ac
+@@ -43,6 +43,7 @@ AC_SEARCH_LIBS([clock_gettime], [rt])
+ AC_SEARCH_LIBS([timer_create], [rt])
+ AC_SEARCH_LIBS([pcap_open_live], [pcap])
+ 
++OVS_CHECK_THREADED
+ OVS_CHECK_COVERAGE
+ OVS_CHECK_NDEBUG
+ OVS_CHECK_NETLINK
+diff --git lib/automake.mk lib/automake.mk
+index 13622b3..87bdd8d 100644
+--- lib/automake.mk
++++ lib/automake.mk
+@@ -37,6 +37,7 @@ lib_libopenvswitch_a_SOURCES = \
+ 	lib/daemon.c \
+ 	lib/daemon.h \
+ 	lib/dhcp.h \
++	lib/dispatch.h \
+ 	lib/dummy.c \
+ 	lib/dummy.h \
+ 	lib/dhparams.h \
+diff --git lib/dispatch.h lib/dispatch.h
+new file mode 100644
+index 0000000..80ac9c7
+--- /dev/null
++++ lib/dispatch.h
+@@ -0,0 +1,9 @@
++#include <sys/types.h>
++#include "ofpbuf.h"
++
++#ifndef DISPATCH_H
++#define DISPATCH_H 1
++
++typedef void (*pkt_handler)(u_char *user, struct ofpbuf* buf);
++
++#endif /* DISPATCH_H */
+diff --git lib/dpif-netdev.c lib/dpif-netdev.c
+index cade79e..509e2ef 100644
+--- lib/dpif-netdev.c
++++ lib/dpif-netdev.c
+@@ -32,6 +32,15 @@
+ #include <sys/stat.h>
+ #include <unistd.h>
+ 
++#ifdef THREADED
++#include <signal.h>
++#include <pthread.h>
++
++#include "socket-util.h"
++#include "fatal-signal.h"
++#include "dispatch.h"
++#endif
++
+ #include "csum.h"
+ #include "dpif.h"
+ #include "dpif-provider.h"
+@@ -55,6 +64,16 @@
+ #include "vlog.h"
+ 
+ VLOG_DEFINE_THIS_MODULE(dpif_netdev);
++/* We could use these macros instead of using #ifdef and #endif every time we
++ * need to call the pthread_mutex_lock/unlock.
++#ifdef THREADED
++#define LOCK(mutex) pthread_mutex_lock(mutex)
++#define UNLOCK(mutex) pthread_mutex_unlock(mutex)
++#else
++#define LOCK(mutex)
++#define UNLOCK(mutex)
++#endif
++*/
+ 
+ /* Configuration parameters. */
+ enum { MAX_PORTS = 256 };       /* Maximum number of ports. */
+@@ -82,6 +101,21 @@ struct dp_netdev {
+     int open_cnt;
+     bool destroyed;
+ 
++#ifdef THREADED
++    /* The pipe is used to signal the presence of a packet on the queue:
++     * - dpif_netdev_recv_wait() waits on pipe[0]
++     * - dpif_netdev_recv() extracts from the queue and reads pipe[0]
++     * - dp_netdev_output_userspace() adds to the queue and writes pipe[1]
++     */
++
++    int pipe[2];    /* signal a packet on the queue */
++    struct pollfd *pipe_fd;
++
++    pthread_mutex_t table_mutex;    /* mutex for the flow table */
++    pthread_mutex_t port_list_mutex;    /* port list mutex */
++
++    /* The access to this queue is protected by the table_mutex mutex */
++#endif
+     struct dp_netdev_queue queues[N_QUEUES];
+     struct hmap flow_table;     /* Flow table. */
+ 
+@@ -102,6 +136,9 @@ struct dp_netdev_port {
+     struct list node;           /* Element in dp_netdev's 'port_list'. */
+     struct netdev *netdev;
+     char *type;                 /* Port type as requested by user. */
++#ifdef THREADED
++    struct pollfd *poll_fd;     /* To manage the poll loop in the thread. */
++#endif
+ };
+ 
+ /* A flow in dp_netdev's 'flow_table'. */
+@@ -127,6 +164,11 @@ struct dpif_netdev {
+     unsigned int dp_serial;
+ };
+ 
++#ifdef THREADED
++/* XXX global Descriptor of the thread that manages the datapaths. */
++pthread_t thread_p;
++#endif
++
+ /* All netdev-based datapaths. */
+ static struct shash dp_netdevs = SHASH_INITIALIZER(&dp_netdevs);
+ 
+@@ -204,6 +246,23 @@ create_dp_netdev(const char *name, const struct dpif_class *class,
+     dp->class = class;
+     dp->name = xstrdup(name);
+     dp->open_cnt = 0;
++#ifdef THREADED
++    error = pipe(dp->pipe);
++    if (error) {
++        VLOG_ERR("Unable to create datapath thread pipe: %s", strerror(errno));
++        return errno;
++    }
++    if (set_nonblocking(dp->pipe[0]) || set_nonblocking(dp->pipe[1])) {
++        VLOG_ERR("Unable to set nonblocking on datapath thread pipe: %s",
++                 strerror(errno));
++        return errno;
++    }
++    dp->pipe_fd = NULL;
++    VLOG_DBG("Datapath thread pipe created (%d, %d)", dp->pipe[0], dp->pipe[1]);
++
++    pthread_mutex_init(&dp->table_mutex, NULL);
++    pthread_mutex_init(&dp->port_list_mutex, NULL);
++#endif
+     for (i = 0; i < N_QUEUES; i++) {
+         dp->queues[i].head = dp->queues[i].tail = 0;
+     }
+@@ -221,6 +280,38 @@ create_dp_netdev(const char *name, const struct dpif_class *class,
+     return 0;
+ }
+ 
++#ifdef THREADED
++static void * dp_thread_body(void *args OVS_UNUSED);
++
++/* This is the function that is called in response to a fatal signal (e.g.
++ * SIGTERM) */
++static void
++dpif_netdev_exit_hook(void *aux OVS_UNUSED)
++{
++    if (pthread_cancel(thread_p) == 0) {
++        pthread_join(thread_p, NULL);
++    }
++}
++
++static int
++dpif_netdev_init(void)
++{
++    static int error = -1;
++
++    if (error < 0) {
++        fatal_signal_add_hook(dpif_netdev_exit_hook, NULL, NULL, true);
++        error = pthread_create(&thread_p, NULL, dp_thread_body, NULL);
++        if (error != 0) {
++            VLOG_ERR("Unable to create datapath thread: %s", strerror(errno));
++            error = errno;
++        } else {
++            VLOG_DBG("Datapath thread started");
++        }
++    }
++    return error;
++}
++#endif
++
+ static int
+ dpif_netdev_open(const struct dpif_class *class, const char *name,
+                  bool create, struct dpif **dpifp)
+@@ -247,9 +338,14 @@ dpif_netdev_open(const struct dpif_class *class, const char *name,
+     }
+ 
+     *dpifp = create_dpif_netdev(dp);
++#ifdef THREADED
++    dpif_netdev_init();
++#endif
+     return 0;
+ }
+ 
++/* table_mutex must be locked in THREADED mode.
++ */
+ static void
+ dp_netdev_purge_queues(struct dp_netdev *dp)
+ {
+@@ -273,11 +369,23 @@ dp_netdev_free(struct dp_netdev *dp)
+     struct dp_netdev_port *port, *next;
+ 
+     dp_netdev_flow_flush(dp);
++#ifdef THREADED
++    pthread_mutex_lock(&dp->port_list_mutex);
++#endif
+     LIST_FOR_EACH_SAFE (port, next, node, &dp->port_list) {
+         do_del_port(dp, port->port_no);
+     }
++#ifdef THREADED
++    pthread_mutex_unlock(&dp->port_list_mutex);
++    pthread_mutex_lock(&dp->table_mutex);
++#endif
+     dp_netdev_purge_queues(dp);
+     hmap_destroy(&dp->flow_table);
++#ifdef THREADED
++    pthread_mutex_unlock(&dp->table_mutex);
++    pthread_mutex_destroy(&dp->table_mutex);
++    pthread_mutex_destroy(&dp->port_list_mutex);
++#endif
+     free(dp->name);
+     free(dp);
+ }
+@@ -306,7 +414,13 @@ static int
+ dpif_netdev_get_stats(const struct dpif *dpif, struct dpif_dp_stats *stats)
+ {
+     struct dp_netdev *dp = get_dp_netdev(dpif);
++#ifdef THREADED
++    pthread_mutex_lock(&dp->table_mutex);
++#endif
+     stats->n_flows = hmap_count(&dp->flow_table);
++#ifdef THREADED
++    pthread_mutex_unlock(&dp->table_mutex);
++#endif
+     stats->n_hit = dp->n_hit;
+     stats->n_missed = dp->n_missed;
+     stats->n_lost = dp->n_lost;
+@@ -354,13 +468,22 @@ do_add_port(struct dp_netdev *dp, const char *devname, const char *type,
+     port->port_no = port_no;
+     port->netdev = netdev;
+     port->type = xstrdup(type);
++#ifdef THREADED
++    port->poll_fd = NULL;
++#endif
+ 
+     error = netdev_get_mtu(netdev, &mtu);
+     if (!error) {
+         max_mtu = mtu;
+     }
+ 
++#ifdef THREADED
++    pthread_mutex_lock(&dp->port_list_mutex);
++#endif
+     list_push_back(&dp->port_list, &port->node);
++#ifdef THREADED
++    pthread_mutex_unlock(&dp->port_list_mutex);
++#endif
+     dp->ports[port_no] = port;
+     dp->serial++;
+ 
+@@ -448,15 +571,25 @@ get_port_by_name(struct dp_netdev *dp,
+ {
+     struct dp_netdev_port *port;
+ 
++#ifdef THREADED
++    pthread_mutex_lock(&dp->port_list_mutex);
++#endif
+     LIST_FOR_EACH (port, node, &dp->port_list) {
+         if (!strcmp(netdev_get_name(port->netdev), devname)) {
+             *portp = port;
++#ifdef THREADED
++            pthread_mutex_unlock(&dp->port_list_mutex);
++#endif
+             return 0;
+         }
+     }
++#ifdef THREADED
++    pthread_mutex_unlock(&dp->port_list_mutex);
++#endif
+     return ENOENT;
+ }
+ 
++/* In THREADED mode, must be called with port_list_mutex held. */
+ static int
+ do_del_port(struct dp_netdev *dp, uint16_t port_no)
+ {
+@@ -531,7 +664,13 @@ dpif_netdev_get_max_ports(const struct dpif *dpif OVS_UNUSED)
+ static void
+ dp_netdev_free_flow(struct dp_netdev *dp, struct dp_netdev_flow *flow)
+ {
++#ifdef THREADED
++    pthread_mutex_lock(&dp->table_mutex);
++#endif
+     hmap_remove(&dp->flow_table, &flow->node);
++#ifdef THREADED
++    pthread_mutex_unlock(&dp->table_mutex);
++#endif
+     free(flow->actions);
+     free(flow);
+ }
+@@ -620,7 +759,11 @@ dpif_netdev_port_poll_wait(const struct dpif *dpif_)
+ }
+ 
+ static struct dp_netdev_flow *
+-dp_netdev_lookup_flow(const struct dp_netdev *dp, const struct flow *key)
++#ifdef THREADED
++dp_netdev_lookup_flow_locked(struct dp_netdev *dp, const struct flow *key)
++#else
++dp_netdev_lookup_flow(struct dp_netdev *dp, const struct flow *key)
++#endif
+ {
+     struct dp_netdev_flow *flow;
+ 
+@@ -632,6 +775,19 @@ dp_netdev_lookup_flow(const struct dp_netdev *dp, const struct flow *key)
+     return NULL;
+ }
+ 
++#ifdef THREADED
++static struct dp_netdev_flow *
++dp_netdev_lookup_flow(struct dp_netdev *dp, const struct flow *key)
++{
++    struct dp_netdev_flow *flow;
++
++    pthread_mutex_lock(&dp->table_mutex);
++    flow = dp_netdev_lookup_flow_locked(dp, key);
++    pthread_mutex_unlock(&dp->table_mutex);
++    return flow;
++}
++#endif
++
+ static void
+ get_dpif_flow_stats(struct dp_netdev_flow *flow, struct dpif_flow_stats *stats)
+ {
+@@ -729,7 +885,13 @@ add_flow(struct dpif *dpif, const struct flow *key,
+         return error;
+     }
+ 
++#ifdef THREADED
++    pthread_mutex_lock(&dp->table_mutex);
++#endif
+     hmap_insert(&dp->flow_table, &flow->node, flow_hash(&flow->key, 0));
++#ifdef THREADED
++    pthread_mutex_unlock(&dp->table_mutex);
++#endif
+     return 0;
+ }
+ 
+@@ -749,6 +911,7 @@ dpif_netdev_flow_put(struct dpif *dpif, const struct dpif_flow_put *put)
+     struct dp_netdev_flow *flow;
+     struct flow key;
+     int error;
++    int n_flows;
+ 
+     error = dpif_netdev_flow_from_nlattrs(put->key, put->key_len, &key);
+     if (error) {
+@@ -758,7 +921,14 @@ dpif_netdev_flow_put(struct dpif *dpif, const struct dpif_flow_put *put)
+     flow = dp_netdev_lookup_flow(dp, &key);
+     if (!flow) {
+         if (put->flags & DPIF_FP_CREATE) {
+-            if (hmap_count(&dp->flow_table) < MAX_FLOWS) {
++#ifdef THREADED
++            pthread_mutex_lock(&dp->table_mutex);
++#endif
++            n_flows = hmap_count(&dp->flow_table);
++#ifdef THREADED
++            pthread_mutex_unlock(&dp->table_mutex);
++#endif
++            if (n_flows < MAX_FLOWS) {
+                 if (put->stats) {
+                     memset(put->stats, 0, sizeof *put->stats);
+                 }
+@@ -843,7 +1013,13 @@ dpif_netdev_flow_dump_next(const struct dpif *dpif, void *state_,
+     struct dp_netdev_flow *flow;
+     struct hmap_node *node;
+ 
++#ifdef THREADED
++    pthread_mutex_lock(&dp->table_mutex);
++#endif
+     node = hmap_at_position(&dp->flow_table, &state->bucket, &state->offset);
++#ifdef THREADED
++    pthread_mutex_unlock(&dp->table_mutex);
++#endif
+     if (!node) {
+         return EOF;
+     }
+@@ -949,7 +1125,13 @@ static int
+ dpif_netdev_recv(struct dpif *dpif, struct dpif_upcall *upcall,
+                  struct ofpbuf *buf)
+ {
+-    struct dp_netdev_queue *q = find_nonempty_queue(dpif);
++    struct dp_netdev_queue *q;
++#ifdef THREADED
++    struct dp_netdev *dp = get_dp_netdev(dpif);
++    char c;
++    pthread_mutex_lock(&dp->table_mutex);
++#endif
++    q = find_nonempty_queue(dpif);
+     if (q) {
+         struct dpif_upcall *u = q->upcalls[q->tail++ & QUEUE_MASK];
+         *upcall = *u;
+@@ -958,8 +1140,19 @@ dpif_netdev_recv(struct dpif *dpif, struct dpif_upcall *upcall,
+         ofpbuf_uninit(buf);
+         *buf = *upcall->packet;
+ 
++#ifdef THREADED
++        /* Read a byte from the pipe to signal that a packet has been
++         * received. */
++        if (read(dp->pipe[0], &c, 1) < 0) {
++            VLOG_ERR("Pipe read error (from datapath): %s", strerror(errno));
++        }
++        pthread_mutex_unlock(&dp->table_mutex);
++#endif
+         return 0;
+     } else {
++#ifdef THREADED
++        pthread_mutex_unlock(&dp->table_mutex);
++#endif
+         return EAGAIN;
+     }
+ }
+@@ -967,19 +1160,32 @@ dpif_netdev_recv(struct dpif *dpif, struct dpif_upcall *upcall,
+ static void
+ dpif_netdev_recv_wait(struct dpif *dpif)
+ {
++#ifdef THREADED
++    struct dp_netdev *dp = get_dp_netdev(dpif);
++
++    poll_fd_wait(dp->pipe[0], POLLIN);
++#else
+     if (find_nonempty_queue(dpif)) {
+         poll_immediate_wake();
+     } else {
+         /* No messages ready to be received, and dp_wait() will ensure that we
+          * wake up to queue new messages, so there is nothing to do. */
+     }
++#endif
+ }
+ 
+ static void
+ dpif_netdev_recv_purge(struct dpif *dpif)
+ {
+     struct dpif_netdev *dpif_netdev = dpif_netdev_cast(dpif);
++#ifdef THREADED
++    struct dp_netdev *dp = get_dp_netdev(dpif);
++    pthread_mutex_lock(&dp->table_mutex);
++#endif
+     dp_netdev_purge_queues(dpif_netdev->dp);
++#ifdef THREADED
++    pthread_mutex_unlock(&dp->table_mutex);
++#endif
+ }
+ 
+ static void
+@@ -1003,7 +1209,12 @@ dp_netdev_port_input(struct dp_netdev *dp, struct dp_netdev_port *port,
+         return;
+     }
+     flow_extract(packet, 0, 0, port->port_no, &key);
++#ifdef THREADED
++    pthread_mutex_lock(&dp->table_mutex);
++    flow = dp_netdev_lookup_flow_locked(dp, &key);
++#else
+     flow = dp_netdev_lookup_flow(dp, &key);
++#endif
+     if (flow) {
+         dp_netdev_flow_used(flow, &key, packet);
+         dp_netdev_execute_actions(dp, packet, &key,
+@@ -1013,8 +1224,22 @@ dp_netdev_port_input(struct dp_netdev *dp, struct dp_netdev_port *port,
+         dp->n_missed++;
+         dp_netdev_output_userspace(dp, packet, DPIF_UC_MISS, &key, 0);
+     }
++#ifdef THREADED
++    pthread_mutex_unlock(&dp->table_mutex);
++#endif
+ }
+ 
++#ifdef THREADED
++static void
++dpif_netdev_run(struct dpif *dpif OVS_UNUSED)
++{
++}
++
++static void
++dpif_netdev_wait(struct dpif *dpif OVS_UNUSED)
++{
++}
++#else
+ static void
+ dpif_netdev_run(struct dpif *dpif)
+ {
+@@ -1053,6 +1278,144 @@ dpif_netdev_wait(struct dpif *dpif)
+         netdev_recv_wait(port->netdev);
+     }
+ }
++#endif
++
++#ifdef THREADED
++/*
++ * pcap callback argument
++ */
++struct dispatch_arg {
++    struct dp_netdev *dp;   /* update statistics */
++    struct dp_netdev_port *port;    /* argument to flow identifier function */
++};
++
++/* Process a packet.
++ *
++ * The port_input function will send immediately if it finds a flow match and
++ * the associated action is ODPAT_OUTPUT or ODPAT_OUTPUT_GROUP.
++ * If a flow is not found or for the other actions, the packet is copied.
++ */
++static void
++process_pkt(u_char *user, struct ofpbuf *buf)
++{
++    struct dispatch_arg *arg = (struct dispatch_arg *)user;
++
++    ofpbuf_padto(buf, ETH_TOTAL_MIN);
++    dp_netdev_port_input(arg->dp, arg->port, buf);
++}
++
++/* Body of the thread that manages the datapaths */
++static void*
++dp_thread_body(void *args OVS_UNUSED)
++{
++    struct dp_netdev *dp;
++    struct dp_netdev_port *port;
++    struct dispatch_arg arg;
++    int error;
++    int n_fds;
++    uint32_t batch = 50; /* max number of pkts processed by the dispatch */
++    int processed;     /* actual number of pkts processed by the dispatch */
++    char readbuf[1024];
++
++    sigset_t sigmask;
++
++    /*XXX Since the poll involves all ports of all datapaths, the right fds
++     * size should be MAX_PORTS * max_number_of_datapaths */
++    struct pollfd fds[MAX_PORTS + 1]; 
++    
++    /* Mask the fatal signals in this thread, so that handling them is
++     * delegated to the main thread. */
++    sigemptyset(&sigmask);
++    sigaddset(&sigmask, SIGTERM);
++    sigaddset(&sigmask, SIGALRM);
++    sigaddset(&sigmask, SIGINT);
++    sigaddset(&sigmask, SIGHUP);
++
++    if (pthread_sigmask(SIG_BLOCK, &sigmask, NULL) != 0) {
++        VLOG_ERR("Error setting thread sigmask: %s", strerror(errno));
++    }
++
++    for(;;) {
++        struct shash_node *node;
++        n_fds = 0;
++        /* build the structure for poll */
++        SHASH_FOR_EACH(node, &dp_netdevs) {
++            dp = (struct dp_netdev *)node->data;
++            fds[n_fds].fd = dp->pipe[1];
++            fds[n_fds].events = POLLIN;
++            dp->pipe_fd = &fds[n_fds];
++            n_fds++;
++            if (n_fds >= sizeof(fds) / sizeof(fds[0])) {
++                VLOG_ERR("Too many fds for poll adding pipe_fd");
++                break;
++            }
++            pthread_mutex_lock(&dp->port_list_mutex);
++            LIST_FOR_EACH (port, node, &dp->port_list) {
++                /* insert an element in the fds structure */
++                fds[n_fds].fd = netdev_get_fd(port->netdev);
++                fds[n_fds].events = POLLIN;
++                port->poll_fd = &fds[n_fds];
++                n_fds++;
++                if (n_fds >= sizeof(fds) / sizeof(fds[0])) {
++                    VLOG_ERR("Too many fds for poll adding port fd");
++                    break;
++                }
++            }
++            pthread_mutex_unlock(&dp->port_list_mutex);
++        }
++
++        error = poll(fds, n_fds, 2000);
++        VLOG_DBG("dp_thread_body poll wakeup with cnt=%d", error);
++
++        if (error < 0) {
++            if (errno == EINTR) {
++                /* XXX get this case in detach mode */
++                continue;
++            }
++            VLOG_ERR("Datapath thread poll() error: %s\n", strerror(errno));
++            /* XXX terminating the thread is probably not right */
++            break;
++        }
++        pthread_testcancel();
++
++        SHASH_FOR_EACH (node, &dp_netdevs) {
++            dp = (struct dp_netdev *)node->data;
++            if (dp->pipe_fd && (dp->pipe_fd->revents & POLLIN)) {
++                VLOG_DBG("Signalled from main thread");
++                while ((error = read(dp->pipe[1], readbuf, sizeof(readbuf))) > 0)
++                        ;
++                if (error < 0 && errno != EAGAIN) {
++                    VLOG_ERR("Pipe read error (to datapath): %s", strerror(errno));
++                }
++            }
++            arg.dp = dp;
++            pthread_mutex_lock(&dp->port_list_mutex);
++            LIST_FOR_EACH (port, node, &dp->port_list) {
++                arg.port = port;
++                if (port->poll_fd) {
++                    VLOG_DBG("fd %d revents 0x%x", port->poll_fd->fd, port->poll_fd->revents);
++                }
++                if (port->poll_fd && (port->poll_fd->revents & POLLIN)) {
++                    /* Call the dispatch function, which passes each packet
++                     * to its callback, up to 'batch' packets at a time. */
++                    processed = netdev_dispatch(port->netdev, batch,
++                                         process_pkt, (u_char *)&arg);
++                    if (processed < 0) { /* pcap returns error */
++                        static struct vlog_rate_limit rl =
++                            VLOG_RATE_LIMIT_INIT(1, 5);
++                        VLOG_ERR_RL(&rl, 
++                                "error receiving data from XXX \n"); 
++                    }
++                } /* end of if poll */
++            } /* end of port loop */
++            pthread_mutex_unlock(&dp->port_list_mutex);
++        } /* end of dp loop */
++    } /* for ;; */
++
++    return NULL;
++}
++
++#endif /* THREADED */
+ 
+ static void
+ dp_netdev_set_dl(struct ofpbuf *packet, const struct ovs_key_ethernet *eth_key)
+@@ -1068,11 +1431,19 @@ dp_netdev_output_port(struct dp_netdev *dp, struct ofpbuf *packet,
+                       uint16_t out_port)
+ {
+     struct dp_netdev_port *p = dp->ports[out_port];
++    char c = 0;
++
+     if (p) {
+         netdev_send(p->netdev, packet);
++#ifdef THREADED
++        if (write(dp->pipe[0], &c, 1) < 0) {
++            VLOG_ERR("Pipe write error (to datapath): %s", strerror(errno));
++        }
++#endif
+     }
+ }
+ 
++/* In THREADED mode, must be called with table_mutex held. */
+ static int
+ dp_netdev_output_userspace(struct dp_netdev *dp, const struct ofpbuf *packet,
+                          int queue_no, const struct flow *flow, uint64_t arg)
+@@ -1081,6 +1452,9 @@ dp_netdev_output_userspace(struct dp_netdev *dp, const struct ofpbuf *packet,
+     struct dpif_upcall *upcall;
+     struct ofpbuf *buf;
+     size_t key_len;
++#ifdef THREADED
++    char c = 0;
++#endif
+ 
+     if (q->head - q->tail >= MAX_QUEUE_LEN) {
+         dp->n_lost++;
+@@ -1102,6 +1476,12 @@ dp_netdev_output_userspace(struct dp_netdev *dp, const struct ofpbuf *packet,
+     upcall->userdata = arg;
+ 
+     q->upcalls[q->head++ & QUEUE_MASK] = upcall;
++#ifdef THREADED
++    /* Write a byte on the pipe to advertise that a packet is ready. */
++    if (write(dp->pipe[1], &c, 1) < 0) {
++        VLOG_ERR("Pipe write error (from datapath): %s", strerror(errno));
++    }
++#endif
+ 
+     return 0;
+ }
+@@ -1150,7 +1530,13 @@ dp_netdev_action_userspace(struct dp_netdev *dp,
+ 
+     userdata_attr = nl_attr_find_nested(a, OVS_USERSPACE_ATTR_USERDATA);
+     userdata = userdata_attr ? nl_attr_get_u64(userdata_attr) : 0;
++#ifdef THREADED
++    pthread_mutex_lock(&dp->table_mutex);
++#endif
+     dp_netdev_output_userspace(dp, packet, DPIF_UC_ACTION, key, userdata);
++#ifdef THREADED
++    pthread_mutex_unlock(&dp->table_mutex);
++#endif
+ }
+ 
+ static void
+diff --git lib/netdev-bsd.c lib/netdev-bsd.c
+index 0b1a37c..ff79367 100644
+--- lib/netdev-bsd.c
++++ lib/netdev-bsd.c
+@@ -667,6 +667,89 @@ netdev_bsd_recv_wait(struct netdev *netdev_)
+     }
+ }
+ 
++#ifdef THREADED
++
++struct dispatch_arg {
++    pkt_handler h;
++    u_char *user;
++};
++
++static void
++dispatch_handler(u_char *user, const struct pcap_pkthdr *phdr, const u_char *pdata)
++{
++    struct ofpbuf buf;
++    struct dispatch_arg *parg = (struct dispatch_arg*)user;
++
++    ofpbuf_use_stub(&buf, (void*)pdata, phdr->caplen);
++    buf.size = phdr->caplen;
++    (*parg->h)(parg->user, &buf);
++    ofpbuf_uninit(&buf);
++}
++
++static int 
++netdev_bsd_dispatch_system(struct netdev_bsd *netdev, int batch, pkt_handler h, 
++                           u_char *user)
++{
++    int ret;
++    struct dispatch_arg arg;
++
++    arg.h = h;
++    arg.user = user;
++    ret = pcap_dispatch(netdev->pcap_handle, batch, dispatch_handler, (u_char*)&arg);
++    return ret;
++}
++
++static int 
++netdev_bsd_dispatch_tap(struct netdev_bsd *netdev, int batch, pkt_handler h, 
++                        u_char *user)
++{
++    int ret;
++    int i;
++    const size_t size = VLAN_HEADER_LEN + ETH_HEADER_LEN + ETH_PAYLOAD_MAX;
++    OFPBUF_STACK_BUFFER(buf_, size);
++
++    struct ofpbuf buf;
++    ofpbuf_use_stub(&buf, buf_, size);
++    for (i = 0; i < batch; i++) {
++        ret = netdev_bsd_recv_tap(netdev, buf.data, ofpbuf_tailroom(&buf));
++        if (ret >= 0) {
++            buf.size += ret;
++            h(user, &buf);
++        } else if (ret != -EAGAIN) {
++            return -1; 
++        } else { /* ret = EAGAIN */
++            break;
++        }
++        ofpbuf_clear(&buf);
++    }
++    ofpbuf_uninit(&buf);
++    return i;
++}
++
++static int
++netdev_bsd_dispatch(struct netdev *netdev_, int batch, pkt_handler h, 
++                    u_char *user)
++{
++    struct netdev_bsd *netdev = netdev_bsd_cast(netdev_);
++    struct netdev_dev_bsd * netdev_dev =
++        netdev_dev_bsd_cast(netdev_get_dev(netdev_));
++
++    if (!strcmp(netdev_get_type(netdev_), "tap") && 
++            netdev->netdev_fd == netdev_dev->tap_fd) {
++        return netdev_bsd_dispatch_tap(netdev, batch, h, user);
++    } else {
++        return netdev_bsd_dispatch_system(netdev, batch, h, user);
++    }
++}
++
++static int
++netdev_bsd_get_fd(struct netdev *netdev_)
++{
++    struct netdev_bsd *netdev = netdev_bsd_cast(netdev_);
++    return netdev->netdev_fd;
++}
++#endif
++
+ /* Discards all packets waiting to be received from 'netdev'. */
+ static int
+ netdev_bsd_drain(struct netdev *netdev_)
+@@ -1263,6 +1346,10 @@ const struct netdev_class netdev_bsd_class = {
+ 
+     netdev_bsd_recv,
+     netdev_bsd_recv_wait,
++#ifdef THREADED
++    netdev_bsd_dispatch,
++    netdev_bsd_get_fd,
++#endif
+     netdev_bsd_drain,
+ 
+     netdev_bsd_send,
+@@ -1323,6 +1410,10 @@ const struct netdev_class netdev_tap_class = {
+ 
+     netdev_bsd_recv,
+     netdev_bsd_recv_wait,
++#ifdef THREADED
++    netdev_bsd_dispatch,
++    netdev_bsd_get_fd,
++#endif
+     netdev_bsd_drain,
+ 
+     netdev_bsd_send,
+diff --git lib/netdev-dummy.c lib/netdev-dummy.c
+index b8c23c5..4e4801c 100644
+--- lib/netdev-dummy.c
++++ lib/netdev-dummy.c
+@@ -20,6 +20,12 @@
+ 
+ #include <errno.h>
+ 
++#ifdef THREADED
++#include <pthread.h>
++#include <unistd.h>
++#include "socket-util.h"
++#endif
++
+ #include "flow.h"
+ #include "list.h"
+ #include "netdev-provider.h"
+@@ -51,6 +57,10 @@ struct netdev_dummy {
+     struct list node;           /* In netdev_dev_dummy's "devs" list. */
+     struct list recv_queue;
+     bool listening;
++#ifdef THREADED
++    pthread_mutex_t queue_mutex;
++    int s_pipe[2];		/* used to signal packet arrivals */
++#endif
+ };
+ 
+ static struct shash dummy_netdev_devs = SHASH_INITIALIZER(&dummy_netdev_devs);
+@@ -124,11 +134,30 @@ netdev_dummy_open(struct netdev_dev *netdev_dev_, struct netdev **netdevp)
+ {
+     struct netdev_dev_dummy *netdev_dev = netdev_dev_dummy_cast(netdev_dev_);
+     struct netdev_dummy *netdev;
++#ifdef THREADED
++    int error;
++#endif
+ 
+     netdev = xmalloc(sizeof *netdev);
+     netdev_init(&netdev->netdev, netdev_dev_);
+     list_init(&netdev->recv_queue);
+     netdev->listening = false;
++#ifdef THREADED
++    error = pipe(netdev->s_pipe);
++    if (error) {
++        VLOG_ERR("Unable to create dummy pipe: %s", strerror(errno));
++        free(netdev);
++        return errno;
++    }
++    if (set_nonblocking(netdev->s_pipe[0]) ||
++        set_nonblocking(netdev->s_pipe[1])) {
++        VLOG_ERR("Unable to set nonblocking on dummy pipe: %s",
++                 strerror(errno));
++        free(netdev);
++        return errno;
++    }
++    pthread_mutex_init(&netdev->queue_mutex, NULL);
++#endif
+ 
+     *netdevp = &netdev->netdev;
+     list_push_back(&netdev_dev->devs, &netdev->node);
+@@ -141,6 +170,13 @@ netdev_dummy_close(struct netdev *netdev_)
+     struct netdev_dummy *netdev = netdev_dummy_cast(netdev_);
+     list_remove(&netdev->node);
+     ofpbuf_list_delete(&netdev->recv_queue);
++#ifdef THREADED
++    if (netdev->listening) {
++	    close(netdev->s_pipe[0]);
++	    close(netdev->s_pipe[1]);
++    }
++    pthread_mutex_destroy(&netdev->queue_mutex);
++#endif
+     free(netdev);
+ }
+ 
+@@ -158,12 +194,29 @@ netdev_dummy_recv(struct netdev *netdev_, void *buffer, size_t size)
+     struct netdev_dummy *netdev = netdev_dummy_cast(netdev_);
+     struct ofpbuf *packet;
+     size_t packet_size;
++#ifdef THREADED
++    char c;
++#endif
+ 
++#ifdef THREADED
++    pthread_mutex_lock(&netdev->queue_mutex);
++#endif
+     if (list_is_empty(&netdev->recv_queue)) {
++#ifdef THREADED
++        pthread_mutex_unlock(&netdev->queue_mutex);
++#endif
+         return -EAGAIN;
+     }
++#ifdef THREADED
++    if (read(netdev->s_pipe[0], &c, 1) < 0) {
++        VLOG_ERR("Error reading dummy pipe: %s", strerror(errno));
++    }
++#endif
+ 
+     packet = ofpbuf_from_list(list_pop_front(&netdev->recv_queue));
++#ifdef THREADED
++    pthread_mutex_unlock(&netdev->queue_mutex);
++#endif
+     if (packet->size > size) {
+         return -EMSGSIZE;
+     }
+@@ -179,11 +232,60 @@ static void
+ netdev_dummy_recv_wait(struct netdev *netdev_)
+ {
+     struct netdev_dummy *netdev = netdev_dummy_cast(netdev_);
+-    if (!list_is_empty(&netdev->recv_queue)) {
++    int empty;
++
++#ifdef THREADED
++    pthread_mutex_lock(&netdev->queue_mutex);
++#endif
++    empty = list_is_empty(&netdev->recv_queue);
++#ifdef THREADED
++    pthread_mutex_unlock(&netdev->queue_mutex);
++#endif
++    if (!empty) {
+         poll_immediate_wake();
+     }
+ }
+ 
++#ifdef THREADED
++static int
++netdev_dummy_dispatch(struct netdev *netdev_, int batch, pkt_handler h,
++                      u_char *user)
++{
++    int i;
++    struct netdev_dummy *netdev = netdev_dummy_cast(netdev_);
++    struct ofpbuf *packet;
++    VLOG_DBG("dispatch %d", batch);
++    
++    for (i = 0; i < batch; i++) {
++        char c;
++        if (read(netdev->s_pipe[0], &c, 1) < 0) {
++            if (errno == EAGAIN)
++                break;
++            VLOG_ERR("%s: error reading from the pipe: %s",
++                netdev_get_name(netdev_), strerror(errno));
++            return -1;
++        }
++        pthread_mutex_lock(&netdev->queue_mutex);
++        if (list_is_empty(&netdev->recv_queue)) {
++            pthread_mutex_unlock(&netdev->queue_mutex);
++            return -EAGAIN;
++        }
++        packet = ofpbuf_from_list(list_pop_front(&netdev->recv_queue));
++        pthread_mutex_unlock(&netdev->queue_mutex);
++        h(user, packet);
++        ofpbuf_delete(packet);
++    }
++    return i;
++}
++
++static int
++netdev_dummy_get_fd(struct netdev *netdev_)

*** DIFF OUTPUT TRUNCATED AT 1000 LINES ***



Want to link to this message? Use this URL: <https://mail-archive.FreeBSD.org/cgi/mid.cgi?201208100819.q7A8JMcK010452>