From owner-svn-ports-all@FreeBSD.ORG Fri Aug 10 08:19:23 2012 Return-Path: Delivered-To: svn-ports-all@freebsd.org Received: from mx1.freebsd.org (mx1.freebsd.org [IPv6:2001:4f8:fff6::34]) by hub.freebsd.org (Postfix) with ESMTP id 25DBF1065672; Fri, 10 Aug 2012 08:19:22 +0000 (UTC) (envelope-from trasz@FreeBSD.org) Received: from svn.freebsd.org (svn.freebsd.org [IPv6:2001:4f8:fff6::2c]) by mx1.freebsd.org (Postfix) with ESMTP id C75278FC0C; Fri, 10 Aug 2012 08:19:22 +0000 (UTC) Received: from svn.freebsd.org (localhost [127.0.0.1]) by svn.freebsd.org (8.14.4/8.14.4) with ESMTP id q7A8JMUK010455; Fri, 10 Aug 2012 08:19:22 GMT (envelope-from trasz@svn.freebsd.org) Received: (from trasz@localhost) by svn.freebsd.org (8.14.4/8.14.4/Submit) id q7A8JMcK010452; Fri, 10 Aug 2012 08:19:22 GMT (envelope-from trasz@svn.freebsd.org) Message-Id: <201208100819.q7A8JMcK010452@svn.freebsd.org> From: Edward Tomasz Napierala Date: Fri, 10 Aug 2012 08:19:22 +0000 (UTC) To: ports-committers@freebsd.org, svn-ports-all@freebsd.org, svn-ports-head@freebsd.org X-SVN-Group: ports-head MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Cc: Subject: svn commit: r302379 - in head/net/openvswitch: . files X-BeenThere: svn-ports-all@freebsd.org X-Mailman-Version: 2.1.5 Precedence: list List-Id: SVN commit messages for the ports tree List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , X-List-Received-Date: Fri, 10 Aug 2012 08:19:23 -0000 Author: trasz Date: Fri Aug 10 08:19:22 2012 New Revision: 302379 URL: http://svn.freebsd.org/changeset/ports/302379 Log: Add optional threading support; disabled by default. 
Added: head/net/openvswitch/files/threaded.diff (contents, props changed) Modified: head/net/openvswitch/Makefile Modified: head/net/openvswitch/Makefile ============================================================================== --- head/net/openvswitch/Makefile Fri Aug 10 08:08:27 2012 (r302378) +++ head/net/openvswitch/Makefile Fri Aug 10 08:19:22 2012 (r302379) @@ -30,20 +30,30 @@ MAN8= ovs-appctl.8 ovs-brcompatd.8 ovs- ovs-test.8 ovs-vsctl.8 ovs-vswitchd.8 \ ovs-vlan-bug-workaround.8 ovs-vlan-test.8 +OPTIONS_DEFINE= THREADED +THREADED_DESC= Experimental high-performance threading patch + .include .if ${OSVERSION} < 800000 BROKEN= does not compile .endif +.if ${PORT_OPTIONS:MTHREADED} +CONFIGURE_ARGS+=--enable-threaded=yes +.endif + AUTOTOOLSFILES= aclocal.m4 post-patch: @${REINPLACE_CMD} -e 's|1.11.1|%%AUTOMAKE_APIVER%%|g' \ -e 's|2.65|%%AUTOCONF_VERSION%%|g' \ ${WRKSRC}/aclocal.m4 - # Workaround for a makefile bug; if it builds without this line, remove it. - #${TOUCH} ${WRKSRC}/INSTALL +.if ${PORT_OPTIONS:MTHREADED} + @# We can't use EXTRA_PATCHES, since we need to apply this one + @# after files/patch-bsd-netdef.diff, not before. 
+ ${PATCH} ${PATCH_ARGS} < ${FILESDIR}/threaded.diff +.endif post-install: ${INSTALL_DATA} ${WRKSRC}/vswitchd/vswitch.ovsschema ${PREFIX}/share/openvswitch/ Added: head/net/openvswitch/files/threaded.diff ============================================================================== --- /dev/null 00:00:00 1970 (empty, because file is newly added) +++ head/net/openvswitch/files/threaded.diff Fri Aug 10 08:19:22 2012 (r302379) @@ -0,0 +1,1305 @@ +diff --git configure.ac configure.ac +index 5692b86..ff62627 100644 +--- configure.ac ++++ configure.ac +@@ -43,6 +43,7 @@ AC_SEARCH_LIBS([clock_gettime], [rt]) + AC_SEARCH_LIBS([timer_create], [rt]) + AC_SEARCH_LIBS([pcap_open_live], [pcap]) + ++OVS_CHECK_THREADED + OVS_CHECK_COVERAGE + OVS_CHECK_NDEBUG + OVS_CHECK_NETLINK +diff --git lib/automake.mk lib/automake.mk +index 13622b3..87bdd8d 100644 +--- lib/automake.mk ++++ lib/automake.mk +@@ -37,6 +37,7 @@ lib_libopenvswitch_a_SOURCES = \ + lib/daemon.c \ + lib/daemon.h \ + lib/dhcp.h \ ++ lib/dispatch.h \ + lib/dummy.c \ + lib/dummy.h \ + lib/dhparams.h \ +diff --git lib/dispatch.h lib/dispatch.h +new file mode 100644 +index 0000000..80ac9c7 +--- /dev/null ++++ lib/dispatch.h +@@ -0,0 +1,9 @@ ++#include ++#include "ofpbuf.h" ++ ++#ifndef DISPATCH_H ++#define DISPATCH_H 1 ++ ++typedef void (*pkt_handler)(u_char *user, struct ofpbuf* buf); ++ ++#endif /* DISPATCH_H */ +diff --git lib/dpif-netdev.c lib/dpif-netdev.c +index cade79e..509e2ef 100644 +--- lib/dpif-netdev.c ++++ lib/dpif-netdev.c +@@ -32,6 +32,15 @@ + #include + #include + ++#ifdef THREADED ++#include ++#include ++ ++#include "socket-util.h" ++#include "fatal-signal.h" ++#include "dispatch.h" ++#endif ++ + #include "csum.h" + #include "dpif.h" + #include "dpif-provider.h" +@@ -55,6 +64,16 @@ + #include "vlog.h" + + VLOG_DEFINE_THIS_MODULE(dpif_netdev); ++/* We could use these macros instead of using #ifdef and #endif every time we ++ * need to call the pthread_mutex_lock/unlock. 
++#ifdef THREADED ++#define LOCK(mutex) pthread_mutex_lock(mutex) ++#define UNLOCK(mutex) pthread_mutex_unlock(mutex) ++#else ++#define LOCK(mutex) ++#define UNLOCK(mutex) ++#endif ++*/ + + /* Configuration parameters. */ + enum { MAX_PORTS = 256 }; /* Maximum number of ports. */ +@@ -82,6 +101,21 @@ struct dp_netdev { + int open_cnt; + bool destroyed; + ++#ifdef THREADED ++ /* The pipe is used to signal the presence of a packet on the queue. ++ * - dpif_netdev_recv_wait() waits on pipe[0] ++ * - dpif_netdev_recv() extracts from the queue and reads pipe[0] ++ * - dp_netdev_output_userspace() sends to the queue and writes pipe[1] ++ */ ++ ++ int pipe[2]; /* signal a packet on the queue */ ++ struct pollfd *pipe_fd; ++ ++ pthread_mutex_t table_mutex; /* mutex for the flow table */ ++ pthread_mutex_t port_list_mutex; /* port list mutex */ ++ ++ /* The access to this queue is protected by the table_mutex mutex */ ++#endif + struct dp_netdev_queue queues[N_QUEUES]; + struct hmap flow_table; /* Flow table. */ + +@@ -102,6 +136,9 @@ struct dp_netdev_port { + struct list node; /* Element in dp_netdev's 'port_list'. */ + struct netdev *netdev; + char *type; /* Port type as requested by user. */ ++#ifdef THREADED ++ struct pollfd *poll_fd; /* To manage the poll loop in the thread. */ ++#endif + }; + + /* A flow in dp_netdev's 'flow_table'. */ +@@ -127,6 +164,11 @@ struct dpif_netdev { + unsigned int dp_serial; + }; + ++#ifdef THREADED ++/* XXX global Descriptor of the thread that manages the datapaths. */ ++pthread_t thread_p; ++#endif ++ + /* All netdev-based datapaths. 
*/ + static struct shash dp_netdevs = SHASH_INITIALIZER(&dp_netdevs); + +@@ -204,6 +246,23 @@ create_dp_netdev(const char *name, const struct dpif_class *class, + dp->class = class; + dp->name = xstrdup(name); + dp->open_cnt = 0; ++#ifdef THREADED ++ error = pipe(dp->pipe); ++ if (error) { ++ VLOG_ERR("Unable to create datapath thread pipe: %s", strerror(errno)); ++ return errno; ++ } ++ if (set_nonblocking(dp->pipe[0]) || set_nonblocking(dp->pipe[1])) { ++ VLOG_ERR("Unable to set nonblocking on datapath thread pipe: %s", ++ strerror(errno)); ++ return errno; ++ } ++ dp->pipe_fd = NULL; ++ VLOG_DBG("Datapath thread pipe created (%d, %d)", dp->pipe[0], dp->pipe[1]); ++ ++ pthread_mutex_init(&dp->table_mutex, NULL); ++ pthread_mutex_init(&dp->port_list_mutex, NULL); ++#endif + for (i = 0; i < N_QUEUES; i++) { + dp->queues[i].head = dp->queues[i].tail = 0; + } +@@ -221,6 +280,38 @@ create_dp_netdev(const char *name, const struct dpif_class *class, + return 0; + } + ++#ifdef THREADED ++static void * dp_thread_body(void *args OVS_UNUSED); ++ ++/* This is the function that is called in response of a fatal signal (e.g. 
++ * SIGTERM) */ ++static void ++dpif_netdev_exit_hook(void *aux OVS_UNUSED) ++{ ++ if (pthread_cancel(thread_p) == 0) { ++ pthread_join(thread_p, NULL); ++ } ++} ++ ++static int ++dpif_netdev_init(void) ++{ ++ static int error = -1; ++ ++ if (error < 0) { ++ fatal_signal_add_hook(dpif_netdev_exit_hook, NULL, NULL, true); ++ error = pthread_create(&thread_p, NULL, dp_thread_body, NULL); ++ if (error != 0) { ++ VLOG_ERR("Unable to create datapath thread: %s", strerror(errno)); ++ error = errno; ++ } else { ++ VLOG_DBG("Datapath thread started"); ++ } ++ } ++ return error; ++} ++#endif ++ + static int + dpif_netdev_open(const struct dpif_class *class, const char *name, + bool create, struct dpif **dpifp) +@@ -247,9 +338,14 @@ dpif_netdev_open(const struct dpif_class *class, const char *name, + } + + *dpifp = create_dpif_netdev(dp); ++#ifdef THREADED ++ dpif_netdev_init(); ++#endif + return 0; + } + ++/* table_mutex must be locked in THREADED mode. ++ */ + static void + dp_netdev_purge_queues(struct dp_netdev *dp) + { +@@ -273,11 +369,23 @@ dp_netdev_free(struct dp_netdev *dp) + struct dp_netdev_port *port, *next; + + dp_netdev_flow_flush(dp); ++#ifdef THREADED ++ pthread_mutex_lock(&dp->port_list_mutex); ++#endif + LIST_FOR_EACH_SAFE (port, next, node, &dp->port_list) { + do_del_port(dp, port->port_no); + } ++#ifdef THREADED ++ pthread_mutex_unlock(&dp->port_list_mutex); ++ pthread_mutex_lock(&dp->table_mutex); ++#endif + dp_netdev_purge_queues(dp); + hmap_destroy(&dp->flow_table); ++#ifdef THREADED ++ pthread_mutex_unlock(&dp->table_mutex); ++ pthread_mutex_destroy(&dp->table_mutex); ++ pthread_mutex_destroy(&dp->port_list_mutex); ++#endif + free(dp->name); + free(dp); + } +@@ -306,7 +414,13 @@ static int + dpif_netdev_get_stats(const struct dpif *dpif, struct dpif_dp_stats *stats) + { + struct dp_netdev *dp = get_dp_netdev(dpif); ++#ifdef THREADED ++ pthread_mutex_lock(&dp->table_mutex); ++#endif + stats->n_flows = hmap_count(&dp->flow_table); ++#ifdef THREADED ++ 
pthread_mutex_unlock(&dp->table_mutex); ++#endif + stats->n_hit = dp->n_hit; + stats->n_missed = dp->n_missed; + stats->n_lost = dp->n_lost; +@@ -354,13 +468,22 @@ do_add_port(struct dp_netdev *dp, const char *devname, const char *type, + port->port_no = port_no; + port->netdev = netdev; + port->type = xstrdup(type); ++#ifdef THREADED ++ port->poll_fd = NULL; ++#endif + + error = netdev_get_mtu(netdev, &mtu); + if (!error) { + max_mtu = mtu; + } + ++#ifdef THREADED ++ pthread_mutex_lock(&dp->port_list_mutex); ++#endif + list_push_back(&dp->port_list, &port->node); ++#ifdef THREADED ++ pthread_mutex_unlock(&dp->port_list_mutex); ++#endif + dp->ports[port_no] = port; + dp->serial++; + +@@ -448,15 +571,25 @@ get_port_by_name(struct dp_netdev *dp, + { + struct dp_netdev_port *port; + ++#ifdef THREADED ++ pthread_mutex_lock(&dp->port_list_mutex); ++#endif + LIST_FOR_EACH (port, node, &dp->port_list) { + if (!strcmp(netdev_get_name(port->netdev), devname)) { + *portp = port; ++#ifdef THREADED ++ pthread_mutex_unlock(&dp->port_list_mutex); ++#endif + return 0; + } + } ++#ifdef THREADED ++ pthread_mutex_unlock(&dp->port_list_mutex); ++#endif + return ENOENT; + } + ++/* In THREADED mode, must be called with port_list_mutex held. 
*/ + static int + do_del_port(struct dp_netdev *dp, uint16_t port_no) + { +@@ -531,7 +664,13 @@ dpif_netdev_get_max_ports(const struct dpif *dpif OVS_UNUSED) + static void + dp_netdev_free_flow(struct dp_netdev *dp, struct dp_netdev_flow *flow) + { ++#ifdef THREADED ++ pthread_mutex_lock(&dp->table_mutex); ++#endif + hmap_remove(&dp->flow_table, &flow->node); ++#ifdef THREADED ++ pthread_mutex_unlock(&dp->table_mutex); ++#endif + free(flow->actions); + free(flow); + } +@@ -620,7 +759,11 @@ dpif_netdev_port_poll_wait(const struct dpif *dpif_) + } + + static struct dp_netdev_flow * +-dp_netdev_lookup_flow(const struct dp_netdev *dp, const struct flow *key) ++#ifdef THREADED ++dp_netdev_lookup_flow_locked(struct dp_netdev *dp, const struct flow *key) ++#else ++dp_netdev_lookup_flow(struct dp_netdev *dp, const struct flow *key) ++#endif + { + struct dp_netdev_flow *flow; + +@@ -632,6 +775,19 @@ dp_netdev_lookup_flow(const struct dp_netdev *dp, const struct flow *key) + return NULL; + } + ++#ifdef THREADED ++static struct dp_netdev_flow * ++dp_netdev_lookup_flow(struct dp_netdev *dp, const struct flow *key) ++{ ++ struct dp_netdev_flow *flow; ++ ++ pthread_mutex_lock(&dp->table_mutex); ++ flow = dp_netdev_lookup_flow_locked(dp, key); ++ pthread_mutex_unlock(&dp->table_mutex); ++ return flow; ++} ++#endif ++ + static void + get_dpif_flow_stats(struct dp_netdev_flow *flow, struct dpif_flow_stats *stats) + { +@@ -729,7 +885,13 @@ add_flow(struct dpif *dpif, const struct flow *key, + return error; + } + ++#ifdef THREADED ++ pthread_mutex_lock(&dp->table_mutex); ++#endif + hmap_insert(&dp->flow_table, &flow->node, flow_hash(&flow->key, 0)); ++#ifdef THREADED ++ pthread_mutex_unlock(&dp->table_mutex); ++#endif + return 0; + } + +@@ -749,6 +911,7 @@ dpif_netdev_flow_put(struct dpif *dpif, const struct dpif_flow_put *put) + struct dp_netdev_flow *flow; + struct flow key; + int error; ++ int n_flows; + + error = dpif_netdev_flow_from_nlattrs(put->key, put->key_len, &key); + if 
(error) { +@@ -758,7 +921,14 @@ dpif_netdev_flow_put(struct dpif *dpif, const struct dpif_flow_put *put) + flow = dp_netdev_lookup_flow(dp, &key); + if (!flow) { + if (put->flags & DPIF_FP_CREATE) { +- if (hmap_count(&dp->flow_table) < MAX_FLOWS) { ++#ifdef THREADED ++ pthread_mutex_lock(&dp->table_mutex); ++#endif ++ n_flows = hmap_count(&dp->flow_table); ++#ifdef THREADED ++ pthread_mutex_unlock(&dp->table_mutex); ++#endif ++ if (n_flows < MAX_FLOWS) { + if (put->stats) { + memset(put->stats, 0, sizeof *put->stats); + } +@@ -843,7 +1013,13 @@ dpif_netdev_flow_dump_next(const struct dpif *dpif, void *state_, + struct dp_netdev_flow *flow; + struct hmap_node *node; + ++#ifdef THREADED ++ pthread_mutex_lock(&dp->table_mutex); ++#endif + node = hmap_at_position(&dp->flow_table, &state->bucket, &state->offset); ++#ifdef THREADED ++ pthread_mutex_unlock(&dp->table_mutex); ++#endif + if (!node) { + return EOF; + } +@@ -949,7 +1125,13 @@ static int + dpif_netdev_recv(struct dpif *dpif, struct dpif_upcall *upcall, + struct ofpbuf *buf) + { +- struct dp_netdev_queue *q = find_nonempty_queue(dpif); ++ struct dp_netdev_queue *q; ++#ifdef THREADED ++ struct dp_netdev *dp = get_dp_netdev(dpif); ++ char c; ++ pthread_mutex_lock(&dp->table_mutex); ++#endif ++ q = find_nonempty_queue(dpif); + if (q) { + struct dpif_upcall *u = q->upcalls[q->tail++ & QUEUE_MASK]; + *upcall = *u; +@@ -958,8 +1140,19 @@ dpif_netdev_recv(struct dpif *dpif, struct dpif_upcall *upcall, + ofpbuf_uninit(buf); + *buf = *upcall->packet; + ++#ifdef THREADED ++ /* Read a byte from the pipe to signal that a packet has been ++ * received. 
*/ ++ if (read(dp->pipe[0], &c, 1) < 0) { ++ VLOG_ERR("Pipe read error (from datapath): %s", strerror(errno)); ++ } ++ pthread_mutex_unlock(&dp->table_mutex); ++#endif + return 0; + } else { ++#ifdef THREADED ++ pthread_mutex_unlock(&dp->table_mutex); ++#endif + return EAGAIN; + } + } +@@ -967,19 +1160,32 @@ dpif_netdev_recv(struct dpif *dpif, struct dpif_upcall *upcall, + static void + dpif_netdev_recv_wait(struct dpif *dpif) + { ++#ifdef THREADED ++ struct dp_netdev *dp = get_dp_netdev(dpif); ++ ++ poll_fd_wait(dp->pipe[0], POLLIN); ++#else + if (find_nonempty_queue(dpif)) { + poll_immediate_wake(); + } else { + /* No messages ready to be received, and dp_wait() will ensure that we + * wake up to queue new messages, so there is nothing to do. */ + } ++#endif + } + + static void + dpif_netdev_recv_purge(struct dpif *dpif) + { + struct dpif_netdev *dpif_netdev = dpif_netdev_cast(dpif); ++#ifdef THREADED ++ struct dp_netdev *dp = get_dp_netdev(dpif); ++ pthread_mutex_lock(&dp->table_mutex); ++#endif + dp_netdev_purge_queues(dpif_netdev->dp); ++#ifdef THREADED ++ pthread_mutex_unlock(&dp->table_mutex); ++#endif + } + + static void +@@ -1003,7 +1209,12 @@ dp_netdev_port_input(struct dp_netdev *dp, struct dp_netdev_port *port, + return; + } + flow_extract(packet, 0, 0, port->port_no, &key); ++#ifdef THREADED ++ pthread_mutex_lock(&dp->table_mutex); ++ flow = dp_netdev_lookup_flow_locked(dp, &key); ++#else + flow = dp_netdev_lookup_flow(dp, &key); ++#endif + if (flow) { + dp_netdev_flow_used(flow, &key, packet); + dp_netdev_execute_actions(dp, packet, &key, +@@ -1013,8 +1224,22 @@ dp_netdev_port_input(struct dp_netdev *dp, struct dp_netdev_port *port, + dp->n_missed++; + dp_netdev_output_userspace(dp, packet, DPIF_UC_MISS, &key, 0); + } ++#ifdef THREADED ++ pthread_mutex_unlock(&dp->table_mutex); ++#endif + } + ++#ifdef THREADED ++static void ++dpif_netdev_run(struct dpif *dpif OVS_UNUSED) ++{ ++} ++ ++static void ++dpif_netdev_wait(struct dpif *dpif OVS_UNUSED) ++{ ++} 
++#else + static void + dpif_netdev_run(struct dpif *dpif) + { +@@ -1053,6 +1278,144 @@ dpif_netdev_wait(struct dpif *dpif) + netdev_recv_wait(port->netdev); + } + } ++#endif ++ ++#ifdef THREADED ++/* ++ * pcap callback argument ++ */ ++struct dispatch_arg { ++ struct dp_netdev *dp; /* update statistics */ ++ struct dp_netdev_port *port; /* argument to flow identifier function */ ++}; ++ ++/* Process a packet. ++ * ++ * The port_input function will send immediately if it finds a flow match and ++ * the associated action is ODPAT_OUTPUT or ODPAT_OUTPUT_GROUP. ++ * If a flow is not found or for the other actions, the packet is copied. ++ */ ++static void ++process_pkt(u_char *user, struct ofpbuf *buf) ++{ ++ struct dispatch_arg *arg = (struct dispatch_arg *)user; ++ ++ ofpbuf_padto(buf, ETH_TOTAL_MIN); ++ dp_netdev_port_input(arg->dp, arg->port, buf); ++} ++ ++/* Body of the thread that manages the datapaths */ ++static void* ++dp_thread_body(void *args OVS_UNUSED) ++{ ++ struct dp_netdev *dp; ++ struct dp_netdev_port *port; ++ struct dispatch_arg arg; ++ int error; ++ int n_fds; ++ uint32_t batch = 50; /* max number of pkts processed by the dispatch */ ++ int processed; /* actual number of pkts processed by the dispatch */ ++ char readbuf[1024]; ++ ++ sigset_t sigmask; ++ ++ /*XXX Since the poll involves all ports of all datapaths, the right fds ++ * size should be MAX_PORTS * max_number_of_datapaths */ ++ struct pollfd fds[MAX_PORTS + 1]; ++ ++ /* mask the fatal signals. In this way the main thread is delegate to ++ * manage this them. 
*/ ++ sigemptyset(&sigmask); ++ sigaddset(&sigmask, SIGTERM); ++ sigaddset(&sigmask, SIGALRM); ++ sigaddset(&sigmask, SIGINT); ++ sigaddset(&sigmask, SIGHUP); ++ ++ if (pthread_sigmask(SIG_BLOCK, &sigmask, NULL) != 0) { ++ VLOG_ERR("Error setting thread sigmask: %s", strerror(errno)); ++ } ++ ++ for(;;) { ++ struct shash_node *node; ++ n_fds = 0; ++ /* build the structure for poll */ ++ SHASH_FOR_EACH(node, &dp_netdevs) { ++ dp = (struct dp_netdev *)node->data; ++ fds[n_fds].fd = dp->pipe[1]; ++ fds[n_fds].events = POLLIN; ++ dp->pipe_fd = &fds[n_fds]; ++ n_fds++; ++ if (n_fds >= sizeof(fds) / sizeof(fds[0])) { ++ VLOG_ERR("Too many fds for poll adding pipe_fd"); ++ break; ++ } ++ pthread_mutex_lock(&dp->port_list_mutex); ++ LIST_FOR_EACH (port, node, &dp->port_list) { ++ /* insert an element in the fds structure */ ++ fds[n_fds].fd = netdev_get_fd(port->netdev); ++ fds[n_fds].events = POLLIN; ++ port->poll_fd = &fds[n_fds]; ++ n_fds++; ++ if (n_fds >= sizeof(fds) / sizeof(fds[0])) { ++ VLOG_ERR("Too many fds for poll adding port fd"); ++ break; ++ } ++ } ++ pthread_mutex_unlock(&dp->port_list_mutex); ++ } ++ ++ error = poll(fds, n_fds, 2000); ++ VLOG_DBG("dp_thread_body poll wakeup with cnt=%d", error); ++ ++ if (error < 0) { ++ if (errno == EINTR) { ++ /* XXX get this case in detach mode */ ++ continue; ++ } ++ VLOG_ERR("Datapath thread poll() error: %s\n", strerror(errno)); ++ /* XXX terminating the thread is probably not right */ ++ break; ++ } ++ pthread_testcancel(); ++ ++ SHASH_FOR_EACH (node, &dp_netdevs) { ++ dp = (struct dp_netdev *)node->data; ++ if (dp->pipe_fd && (dp->pipe_fd->revents & POLLIN)) { ++ VLOG_DBG("Signalled from main thread"); ++ while ((error = read(dp->pipe[1], readbuf, sizeof(readbuf))) > 0) ++ ; ++ if (error < 0 && errno != EAGAIN) { ++ VLOG_ERR("Pipe read error (to datapath): %s", strerror(errno)); ++ } ++ } ++ arg.dp = dp; ++ pthread_mutex_lock(&dp->port_list_mutex); ++ LIST_FOR_EACH (port, node, &dp->port_list) { ++ arg.port = port; 
++ if (port->poll_fd) { ++ VLOG_DBG("fd %d revents 0x%x", port->poll_fd->fd, port->poll_fd->revents); ++ } ++ if (port->poll_fd && (port->poll_fd->revents & POLLIN)) { ++ /* call the dispatch and process the packet into ++ * its callback. We process 'batch' packets at a time */ ++ processed = netdev_dispatch(port->netdev, batch, ++ process_pkt, (u_char *)&arg); ++ if (processed < 0) { /* pcap returns error */ ++ static struct vlog_rate_limit rl = ++ VLOG_RATE_LIMIT_INIT(1, 5); ++ VLOG_ERR_RL(&rl, ++ "error receiving data from XXX \n"); ++ } ++ } /* end of if poll */ ++ } /* end of port loop */ ++ pthread_mutex_unlock(&dp->port_list_mutex); ++ } /* end of dp loop */ ++ } /* for ;; */ ++ ++ return NULL; ++} ++ ++#endif /* THREADED */ + + static void + dp_netdev_set_dl(struct ofpbuf *packet, const struct ovs_key_ethernet *eth_key) +@@ -1068,11 +1431,19 @@ dp_netdev_output_port(struct dp_netdev *dp, struct ofpbuf *packet, + uint16_t out_port) + { + struct dp_netdev_port *p = dp->ports[out_port]; ++ char c = 0; ++ + if (p) { + netdev_send(p->netdev, packet); ++#ifdef THREADED ++ if (write(dp->pipe[0], &c, 1) < 0) { ++ VLOG_ERR("Pipe write error (to datapath): %s", strerror(errno)); ++ } ++#endif + } + } + ++/* In THREADED mode, must be called with table_mutex held. */ + static int + dp_netdev_output_userspace(struct dp_netdev *dp, const struct ofpbuf *packet, + int queue_no, const struct flow *flow, uint64_t arg) +@@ -1081,6 +1452,9 @@ dp_netdev_output_userspace(struct dp_netdev *dp, const struct ofpbuf *packet, + struct dpif_upcall *upcall; + struct ofpbuf *buf; + size_t key_len; ++#ifdef THREADED ++ char c = 0; ++#endif + + if (q->head - q->tail >= MAX_QUEUE_LEN) { + dp->n_lost++; +@@ -1102,6 +1476,12 @@ dp_netdev_output_userspace(struct dp_netdev *dp, const struct ofpbuf *packet, + upcall->userdata = arg; + + q->upcalls[q->head++ & QUEUE_MASK] = upcall; ++#ifdef THREADED ++ /* Write a byte on the pipe to advertise that a packet is ready. 
*/ ++ if (write(dp->pipe[1], &c, 1) < 0) { ++ VLOG_ERR("Pipe write error (from datapath): %s", strerror(errno)); ++ } ++#endif + + return 0; + } +@@ -1150,7 +1530,13 @@ dp_netdev_action_userspace(struct dp_netdev *dp, + + userdata_attr = nl_attr_find_nested(a, OVS_USERSPACE_ATTR_USERDATA); + userdata = userdata_attr ? nl_attr_get_u64(userdata_attr) : 0; ++#ifdef THREADED ++ pthread_mutex_lock(&dp->table_mutex); ++#endif + dp_netdev_output_userspace(dp, packet, DPIF_UC_ACTION, key, userdata); ++#ifdef THREADED ++ pthread_mutex_unlock(&dp->table_mutex); ++#endif + } + + static void +diff --git lib/netdev-bsd.c lib/netdev-bsd.c +index 0b1a37c..ff79367 100644 +--- lib/netdev-bsd.c ++++ lib/netdev-bsd.c +@@ -667,6 +667,89 @@ netdev_bsd_recv_wait(struct netdev *netdev_) + } + } + ++#ifdef THREADED ++ ++struct dispatch_arg { ++ pkt_handler h; ++ u_char *user; ++}; ++ ++static void ++dispatch_handler(u_char *user, const struct pcap_pkthdr *phdr, const u_char *pdata) ++{ ++ struct ofpbuf buf; ++ struct dispatch_arg *parg = (struct dispatch_arg*)user; ++ ++ ofpbuf_use_stub(&buf, (void*)pdata, phdr->caplen); ++ buf.size = phdr->caplen; ++ (*parg->h)(parg->user, &buf); ++ ofpbuf_uninit(&buf); ++} ++ ++static int ++netdev_bsd_dispatch_system(struct netdev_bsd *netdev, int batch, pkt_handler h, ++ u_char *user) ++{ ++ int ret; ++ struct dispatch_arg arg; ++ ++ arg.h = h; ++ arg.user = user; ++ ret = pcap_dispatch(netdev->pcap_handle, batch, dispatch_handler, (u_char*)&arg); ++ return ret; ++} ++ ++static int ++netdev_bsd_dispatch_tap(struct netdev_bsd *netdev, int batch, pkt_handler h, ++ u_char *user) ++{ ++ int ret; ++ int i; ++ const size_t size = VLAN_HEADER_LEN + ETH_HEADER_LEN + ETH_PAYLOAD_MAX; ++ OFPBUF_STACK_BUFFER(buf_, size); ++ ++ struct ofpbuf buf; ++ ofpbuf_use_stub(&buf, buf_, size); ++ for (i = 0; i < batch; i++) { ++ ret = netdev_bsd_recv_tap(netdev, buf.data, ofpbuf_tailroom(&buf)); ++ if (ret >= 0) { ++ buf.size += ret; ++ h(user, &buf); ++ } else if (ret != 
-EAGAIN) { ++ return -1; ++ } else { /* ret = EAGAIN */ ++ break; ++ } ++ ofpbuf_clear(&buf); ++ } ++ ofpbuf_uninit(&buf); ++ return i; ++} ++ ++static int ++netdev_bsd_dispatch(struct netdev *netdev_, int batch, pkt_handler h, ++ u_char *user) ++{ ++ struct netdev_bsd *netdev = netdev_bsd_cast(netdev_); ++ struct netdev_dev_bsd * netdev_dev = ++ netdev_dev_bsd_cast(netdev_get_dev(netdev_)); ++ ++ if (!strcmp(netdev_get_type(netdev_), "tap") && ++ netdev->netdev_fd == netdev_dev->tap_fd) { ++ return netdev_bsd_dispatch_tap(netdev, batch, h, user); ++ } else { ++ return netdev_bsd_dispatch_system(netdev, batch, h, user); ++ } ++} ++ ++static int ++netdev_bsd_get_fd(struct netdev *netdev_) ++{ ++ struct netdev_bsd *netdev = netdev_bsd_cast(netdev_); ++ return netdev->netdev_fd; ++} ++#endif ++ + /* Discards all packets waiting to be received from 'netdev'. */ + static int + netdev_bsd_drain(struct netdev *netdev_) +@@ -1263,6 +1346,10 @@ const struct netdev_class netdev_bsd_class = { + + netdev_bsd_recv, + netdev_bsd_recv_wait, ++#ifdef THREADED ++ netdev_bsd_dispatch, ++ netdev_bsd_get_fd, ++#endif + netdev_bsd_drain, + + netdev_bsd_send, +@@ -1323,6 +1410,10 @@ const struct netdev_class netdev_tap_class = { + + netdev_bsd_recv, + netdev_bsd_recv_wait, ++#ifdef THREADED ++ netdev_bsd_dispatch, ++ netdev_bsd_get_fd, ++#endif + netdev_bsd_drain, + + netdev_bsd_send, +diff --git lib/netdev-dummy.c lib/netdev-dummy.c +index b8c23c5..4e4801c 100644 +--- lib/netdev-dummy.c ++++ lib/netdev-dummy.c +@@ -20,6 +20,12 @@ + + #include + ++#ifdef THREADED ++#include ++#include ++#include "socket-util.h" ++#endif ++ + #include "flow.h" + #include "list.h" + #include "netdev-provider.h" +@@ -51,6 +57,10 @@ struct netdev_dummy { + struct list node; /* In netdev_dev_dummy's "devs" list. 
*/ + struct list recv_queue; + bool listening; ++#ifdef THREADED ++ pthread_mutex_t queue_mutex; ++ int s_pipe[2]; /* used to signal packet arrivals */ ++#endif + }; + + static struct shash dummy_netdev_devs = SHASH_INITIALIZER(&dummy_netdev_devs); +@@ -124,11 +134,30 @@ netdev_dummy_open(struct netdev_dev *netdev_dev_, struct netdev **netdevp) + { + struct netdev_dev_dummy *netdev_dev = netdev_dev_dummy_cast(netdev_dev_); + struct netdev_dummy *netdev; ++#ifdef THREADED ++ int error; ++#endif + + netdev = xmalloc(sizeof *netdev); + netdev_init(&netdev->netdev, netdev_dev_); + list_init(&netdev->recv_queue); + netdev->listening = false; ++#ifdef THREADED ++ error = pipe(netdev->s_pipe); ++ if (error) { ++ VLOG_ERR("Unable to create dummy pipe: %s", strerror(errno)); ++ free(netdev); ++ return errno; ++ } ++ if (set_nonblocking(netdev->s_pipe[0]) || ++ set_nonblocking(netdev->s_pipe[1])) { ++ VLOG_ERR("Unable to set nonblocking on dummy pipe: %s", ++ strerror(errno)); ++ free(netdev); ++ return errno; ++ } ++ pthread_mutex_init(&netdev->queue_mutex, NULL); ++#endif + + *netdevp = &netdev->netdev; + list_push_back(&netdev_dev->devs, &netdev->node); +@@ -141,6 +170,13 @@ netdev_dummy_close(struct netdev *netdev_) + struct netdev_dummy *netdev = netdev_dummy_cast(netdev_); + list_remove(&netdev->node); + ofpbuf_list_delete(&netdev->recv_queue); ++#ifdef THREADED ++ if (netdev->listening) { ++ close(netdev->s_pipe[0]); ++ close(netdev->s_pipe[1]); ++ } ++ pthread_mutex_destroy(&netdev->queue_mutex); ++#endif + free(netdev); + } + +@@ -158,12 +194,29 @@ netdev_dummy_recv(struct netdev *netdev_, void *buffer, size_t size) + struct netdev_dummy *netdev = netdev_dummy_cast(netdev_); + struct ofpbuf *packet; + size_t packet_size; ++#ifdef THREADED ++ char c; ++#endif + ++#ifdef THREADED ++ pthread_mutex_lock(&netdev->queue_mutex); ++#endif + if (list_is_empty(&netdev->recv_queue)) { ++#ifdef THREADED ++ pthread_mutex_unlock(&netdev->queue_mutex); ++#endif + return -EAGAIN; + 
} ++#ifdef THREADED ++ if (read(netdev->s_pipe[0], &c, 1) < 0) { ++ VLOG_ERR("Error reading dummy pipe: %s", strerror(errno)); ++ } ++#endif + + packet = ofpbuf_from_list(list_pop_front(&netdev->recv_queue)); ++#ifdef THREADED ++ pthread_mutex_unlock(&netdev->queue_mutex); ++#endif + if (packet->size > size) { + return -EMSGSIZE; + } +@@ -179,11 +232,60 @@ static void + netdev_dummy_recv_wait(struct netdev *netdev_) + { + struct netdev_dummy *netdev = netdev_dummy_cast(netdev_); +- if (!list_is_empty(&netdev->recv_queue)) { ++ int empty; ++ ++#ifdef THREADED ++ pthread_mutex_lock(&netdev->queue_mutex); ++#endif ++ empty = list_is_empty(&netdev->recv_queue); ++#ifdef THREADED ++ pthread_mutex_unlock(&netdev->queue_mutex); ++#endif ++ if (!empty) { + poll_immediate_wake(); + } + } + ++#ifdef THREADED ++static int ++netdev_dummy_dispatch(struct netdev *netdev_, int batch, pkt_handler h, ++ u_char *user) ++{ ++ int i; ++ struct netdev_dummy *netdev = netdev_dummy_cast(netdev_); ++ struct ofpbuf *packet; ++ VLOG_DBG("dispatch %d", batch); ++ ++ for (i = 0; i < batch; i++) { ++ char c; ++ if (read(netdev->s_pipe[0], &c, 1) < 0) { ++ if (errno == EAGAIN) ++ break; ++ VLOG_ERR("%s: error reading from the pipe: %s", ++ netdev_get_name(netdev_), strerror(errno)); ++ return -1; ++ } ++ pthread_mutex_lock(&netdev->queue_mutex); ++ if (list_is_empty(&netdev->recv_queue)) { ++ pthread_mutex_unlock(&netdev->queue_mutex); ++ return -EAGAIN; ++ } ++ packet = ofpbuf_from_list(list_pop_front(&netdev->recv_queue)); ++ pthread_mutex_unlock(&netdev->queue_mutex); ++ h(user, packet); ++ ofpbuf_delete(packet); ++ } ++ return i; ++} ++ ++static int ++netdev_dummy_get_fd(struct netdev *netdev_) *** DIFF OUTPUT TRUNCATED AT 1000 LINES ***