From owner-svn-src-head@FreeBSD.ORG Sun Dec 23 23:03:45 2012 Return-Path: Delivered-To: svn-src-head@freebsd.org Received: from mx1.freebsd.org (mx1.freebsd.org [69.147.83.52]) by hub.freebsd.org (Postfix) with ESMTP id B29C2715; Sun, 23 Dec 2012 23:03:45 +0000 (UTC) (envelope-from luigi@FreeBSD.org) Received: from svn.freebsd.org (svn.freebsd.org [IPv6:2001:1900:2254:2068::e6a:0]) by mx1.freebsd.org (Postfix) with ESMTP id 952428FC0A; Sun, 23 Dec 2012 23:03:45 +0000 (UTC) Received: from svn.freebsd.org (localhost [127.0.0.1]) by svn.freebsd.org (8.14.5/8.14.5) with ESMTP id qBNN3j7i084786; Sun, 23 Dec 2012 23:03:45 GMT (envelope-from luigi@svn.freebsd.org) Received: (from luigi@localhost) by svn.freebsd.org (8.14.5/8.14.5/Submit) id qBNN3jBD084785; Sun, 23 Dec 2012 23:03:45 GMT (envelope-from luigi@svn.freebsd.org) Message-Id: <201212232303.qBNN3jBD084785@svn.freebsd.org> From: Luigi Rizzo Date: Sun, 23 Dec 2012 23:03:45 +0000 (UTC) To: src-committers@freebsd.org, svn-src-all@freebsd.org, svn-src-head@freebsd.org Subject: svn commit: r244644 - head/tools/tools/netrate/netreceive X-SVN-Group: head MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit X-BeenThere: svn-src-head@freebsd.org X-Mailman-Version: 2.1.14 Precedence: list List-Id: SVN commit messages for the src tree for head/-current List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , X-List-Received-Date: Sun, 23 Dec 2012 23:03:45 -0000 Author: luigi Date: Sun Dec 23 23:03:45 2012 New Revision: 244644 URL: http://svnweb.freebsd.org/changeset/base/244644 Log: small cleanup of the code, and add support for running multiple threads on each socket. Modified: head/tools/tools/netrate/netreceive/netreceive.c Modified: head/tools/tools/netrate/netreceive/netreceive.c ============================================================================== --- head/tools/tools/netrate/netreceive/netreceive.c Sun Dec 23 22:43:27 2012 (r244643) +++ head/tools/tools/netrate/netreceive/netreceive.c Sun Dec 23 23:03:45 2012 (r244644) @@ -43,27 +43,158 @@ #define MAXSOCK 20 +#include +#include +#include /* clock_getres() */ + +static int round_to(int n, int l) +{ + return ((n + l - 1)/l)*l; +} + +/* + * Each socket uses multiple threads so the receiver is + * more efficient. A collector thread runs the stats. + */ +struct td_desc { + pthread_t td_id; + uint64_t count; /* rx counter */ + int fd; + char *buf; + int buflen; +}; + static void usage(void) { - fprintf(stderr, "netreceive [port]\n"); + fprintf(stderr, "netreceive port [nthreads]\n"); exit(-1); } +static __inline void +timespec_add(struct timespec *tsa, struct timespec *tsb) +{ + + tsa->tv_sec += tsb->tv_sec; + tsa->tv_nsec += tsb->tv_nsec; + if (tsa->tv_nsec >= 1000000000) { + tsa->tv_sec++; + tsa->tv_nsec -= 1000000000; + } +} + +static __inline void +timespec_sub(struct timespec *tsa, struct timespec *tsb) +{ + + tsa->tv_sec -= tsb->tv_sec; + tsa->tv_nsec -= tsb->tv_nsec; + if (tsa->tv_nsec < 0) { + tsa->tv_sec--; + tsa->tv_nsec += 1000000000; + } +} + +static void * +rx_body(void *data) +{ + struct td_desc *t = data; + struct pollfd fds; + int y; + + fds.fd = t->fd; + fds.events = POLLIN; + + for (;;) { + if (poll(&fds, 1, -1) < 0) + perror("poll on thread"); + if (!(fds.revents & POLLIN)) + continue; + for (;;) { + y = recv(t->fd, t->buf, t->buflen, MSG_DONTWAIT); + if (y < 0) + break; + t->count++; + } + } + return NULL; +} + +int +make_threads(struct td_desc **tp, int *s, int nsock, int nthreads) +{ + int i, si, nt = nsock * nthreads; + int lb = round_to(nt * sizeof (struct td_desc *), 64); + int td_len = round_to(sizeof(struct td_desc), 64); // cache align + char *m = calloc(1, lb + td_len * nt); + + printf("td len %d -> %d\n", (int)sizeof(struct td_desc) , td_len); + /* pointers plus the structs */ + if (m == NULL) { + perror("no room for pointers!"); + exit(1); + } + tp = (struct td_desc **)m; + m += lb; /* skip the pointers */ + for (si = i = 0; i < nt; i++, m += td_len) { + tp[i] = (struct td_desc *)m; + tp[i]->fd = s[si]; + if (++si == nsock) + si = 0; + if (pthread_create(&tp[i]->td_id, NULL, rx_body, tp[i])) { + perror("unable to create thread"); + exit(1); + } + } +} + +int +main_thread(struct td_desc **tp, int nsock, int nthreads) +{ + uint64_t c0, c1; + struct timespec now, then, delta; + /* now the parent collects and prints results */ + c0 = c1 = 0; + clock_gettime(CLOCK_REALTIME, &then); + fprintf(stderr, "start at %ld.%09ld\n", then.tv_sec, then.tv_nsec); + while (1) { + int i, nt = nsock * nthreads; + int64_t dn; + uint64_t pps; + + if (poll(NULL, 0, 500) < 0) + perror("poll"); + c0 = 0; + for (i = 0; i < nt; i++) { + c0 += tp[i]->count; + } + dn = c0 - c1; + clock_gettime(CLOCK_REALTIME, &now); + delta = now; + timespec_sub(&delta, &then); + then = now; + pps = dn; + pps = (pps * 1000000000) / (delta.tv_sec*1000000000 + delta.tv_nsec + 1); + fprintf(stderr, "%d pkts in %ld.%09ld ns %ld pps\n", + (int)dn, delta.tv_sec, delta.tv_nsec, (long)pps); + c1 = c0; + } +} + int main(int argc, char *argv[]) { struct addrinfo hints, *res, *res0; char *dummy, *packet; int port; - int error, v, i; + int error, v, nthreads = 1; + struct td_desc **tp; const char *cause = NULL; int s[MAXSOCK]; - struct pollfd fds[MAXSOCK]; int nsock; - if (argc != 2) + if (argc < 2) usage(); memset(&hints, 0, sizeof(hints)); @@ -74,6 +205,10 @@ main(int argc, char *argv[]) port = strtoul(argv[1], &dummy, 10); if (port < 1 || port > 65535 || *dummy != '\0') usage(); + if (argc > 2) + nthreads = strtoul(argv[2], &dummy, 10); + if (nthreads < 1 || nthreads > 64) + usage(); packet = malloc(65536); if (packet == NULL) { @@ -110,9 +245,6 @@ main(int argc, char *argv[]) continue; } (void) listen(s[nsock], 5); - fds[nsock].fd = s[nsock]; - fds[nsock].events = POLLIN; - nsock++; } if (nsock == 0) { @@ -121,21 +253,12 @@ main(int argc, char *argv[]) /*NOTREACHED*/ } - printf("netreceive listening on UDP port %d\n", (u_short)port); + printf("netreceive %d sockets x %d threads listening on UDP port %d\n", + nsock, nthreads, (u_short)port); + + make_threads(tp, s, nsock, nthreads); + main_thread(tp, nsock, nthreads); - while (1) { - if (poll(fds, nsock, -1) < 0) - perror("poll"); - for (i = 0; i < nsock; i++) { - if (fds[i].revents & POLLIN) { - if (recv(s[i], packet, 65536, 0) < 0) - perror("recv"); - } - if ((fds[i].revents &~ POLLIN) != 0) - perror("poll"); - } - } - /*NOTREACHED*/ freeaddrinfo(res0); }