Date: Sat, 9 Aug 2008 01:26:33 +0200 From: "=?ISO-8859-1?Q?Ermal_Lu=E7i?=" <ermal.luci@gmail.com> To: "Mike Makonnen" <mtm@wubethiopia.com> Cc: freebsd-net@freebsd.org Subject: Re: Application layer classifier for ipfw Message-ID: <9a542da30808081626y7ce1fe58k483494a6cdbfae60@mail.gmail.com> In-Reply-To: <48945A79.50300@wubethiopia.com> References: <48918DB5.7020201@wubethiopia.com> <4891CD13.20600@freebsdbrasil.com.br> <48922E9D.1020507@elischer.org> <4893328C.2040105@freebsdbrasil.com.br> <4894447C.3000800@wubethiopia.com> <48945A79.50300@wubethiopia.com>
next in thread | previous in thread | raw e-mail | index | archive | help
On Sat, Aug 2, 2008 at 3:00 PM, Mike Makonnen <mtm@wubethiopia.com> wrote: > Mike Makonnen wrote: >> >> Patrick Tracanelli wrote: >>> >>> To let you know of my current (real world) tests: >>> >>> - Wireless Internet Provider 1: >>> - 4Mbit/s of Internet Traffic >>> - Classifying default protocols + soulseek + ssh >>> - Classifying 100Mbit/s of dump over ssh >>> >>> Results in: >>> No latency added, very low CPU usage, no packets dropping. >>> >>> - Wireless ISP 2: >>> - 21 Mbit/s of Internet Traffic >>> - Classifying default protocols + soulseek + ssh >>> >>> Results in: >>> No tcp or udp traffic at all; everything that gets diverted never >>> comes out of the divert socket, and ipfw-classifyd logs >>> >>> Aug 1 12:07:35 ourofino last message repeated 58 times >>> Aug 1 12:17:54 ourofino ipfw-classifyd: Loaded Protocol: bittorrent >>> (rule 50000) >>> Aug 1 12:17:54 ourofino ipfw-classifyd: Loaded Protocol: edonkey (rule >>> 50000) >>> Aug 1 12:17:54 ourofino ipfw-classifyd: Loaded Protocol: fasttrack (rule >>> 50000) >>> Aug 1 12:17:54 ourofino ipfw-classifyd: Loaded Protocol: gnutella (rule >>> 1000) >>> Aug 1 12:17:54 ourofino ipfw-classifyd: Loaded Protocol: soulseek (rule >>> 50000) >>> Aug 1 12:17:54 ourofino ipfw-classifyd: Loaded Protocol: ssh (rule >>> 50000) >>> Aug 1 12:18:28 ourofino ipfw-classifyd: unable to write to divert >>> socket: Operation not permitted >>> Aug 1 12:18:50 ourofino last message repeated 90 times >> >> Hmmm... this part means that the call to sendto(2) to write the packet >> back into network stack failed. This explains why you are not seein g any >> traffic comming back out of the divert socket, but I don't see why it would >> suddenly fail with a permission error. Could this be a kernel bug? >>> >>> Aug 1 12:18:51 ourofino ipfw-classifyd: packet dropped: input queue full >>> Aug 1 12:19:11 ourofino last message repeated 94 times >>> >>> Raised queue len a lot (up to 40960), when the application starts it uses >>> up to 25% CPU and a second after that, CPU usage gets lower the 0.1%. >> >> This looks like a deadlock. If it weren't able to process packets fast >> enough the cpu usage should be high even as it's spewing "packet dropped" >> messages. Can you send me some more information like memory usage and the >> firewall script you are using? How much of the 21Mbits/s of traffic is P2P? >> If you reduce the number of protocols you are trying to match against does >> the behavior change? Using netstat -w1 -I<interface> can you tell me how >> many packets per second we're talking about for 4Mbits/s and 21Mbit/s? Also, >> the timestamps from the log file seem to show that the daemon is running for >> approx. 34 sec. before the first "unable to write to write to divert socket" >> message. Is it passing traffic during this time? Thanks. >> >> I've uploaded a newer version. Can you try that also please. It includes: >> o SIGHUP forces it to re-read its configuration file >> o rc.d script >> o minor optimization (calls pthread_cond_signal with the mutex unlocked) >> o code cleanup >> >> Also, for your convenience I have attached a patch against the earlier >> version that removes a debugging printf that should remove spammage to your >> log files (the current version has it removed already). >> > > Ooops, a few minutes after I sent this email I found a couple of bugs (one > major, and one minor). They were in the original tarball as well as the > newer one I uploaded earlier today. I've uploaded a fixed version of the > code. Can you try that instead please. > > Also, to help track down performance issues I've modified the Makefile to > build a profiled version of the application so you can use gprof(1) to > figure out where any problems lie. > Does this sound about right for implementing the GC and implementing syntax as $protocol = dnpipe 20 $protocol2 = dnqueue 30 it has some extra goos for pf(4) and altq(4) $protocol3 = queue $queue name $protocol4 = tag TAGNAME $protocol5 = action block It adds 2 new options -e seconds for seconds before a flow is considered expired and -n #packets proccessed before kicking the GC. --- classifyd_old.c 2008-08-09 00:33:04.000000000 +0000 +++ classifyd.c 2008-08-09 00:33:34.000000000 +0000 @@ -28,13 +28,17 @@ #include <sys/types.h> #include <sys/socket.h> +#include <sys/ioctl.h> +#include <sys/time.h> +#include <net/if.h> #include <arpa/inet.h> #include <netinet/in.h> #include <netinet/in_systm.h> #include <netinet/ip.h> #include <netinet/tcp.h> #include <netinet/udp.h> +#include <net/pfvar.h> #include <assert.h> #include <err.h> @@ -53,6 +57,7 @@ #include <unistd.h> #include "hashtable.h" +#include "hashtable_private.h" #include "pathnames.h" #include "protocols.h" @@ -94,6 +99,7 @@ uint32_t if_datalen; /* length in bytes of if_data */ uint16_t if_pktcount; /* number of packets concatenated */ uint16_t if_fwrule; /* ipfw(4) rule associated with flow */ + time_t expire; /* flow expire time */ }; /* @@ -126,7 +132,7 @@ static struct ic_queue outQ; /* divert(4) socket */ -static int dvtS; +static int dvtS = 0; /* config file path */ static const char *conf = IC_CONFIG_PATH; @@ -137,12 +143,25 @@ /* List of protocols available to the system */ struct ic_protocols *fp; +/* Our hashtables */ +struct hashtable *sh = NULL, + *th = NULL, + *uh = NULL; + +/* signaled to kick garbage collector */ +static pthread_cond_t gq_condvar; + +/* number of packets before kicking garbage collector */ +static unsigned int npackets = 250; + +static time_t time_expire = 40; /* 40 seconds */ /* * Forward function declarations. */ void *classify_pthread(void *); void *read_pthread(void *); void *write_pthread(void *); +void *garbage_pthread(void *); static int equalkeys(void *, void *); static unsigned int hashfromkey(void *); static void test_re(void); @@ -155,7 +174,7 @@ { struct sockaddr_in addr; struct sigaction sa; - pthread_t classifytd, readtd, writetd; + pthread_t classifytd, readtd, writetd, garbagectd; const char *errstr; long long num; uint16_t port, qmaxsz; @@ -164,13 +183,27 @@ tflag = 0; port = IC_DPORT; qmaxsz = IC_QMAXSZ; - while ((ch = getopt(argc, argv, "htc:P:p:q:")) != -1) { + while ((ch = getopt(argc, argv, "n:e:htc:P:p:q:")) != -1) { switch(ch) { case 'c': conf = strdup(optarg); if (conf == NULL) err(EX_TEMPFAIL, "config file path"); break; + case 'e': + num = strtonum((const char *)optarg, 1, 400, &errstr); + if (num == 0 && errstr != NULL) { + errx(EX_USAGE, "invalud expire seconds: %s", errstr); + } + time_expire = (time_t)num; + break; + case 'n': + num = strtonum((const char *)optarg, 1, 65535, &errstr); + if (num == 0 && errstr != NULL) { + errx(EX_USAGE, "invalud expire seconds: %s", errstr); + } + npackets = (unsigned int)num; + break; case 'P': protoDir = strdup(optarg); if (protoDir == NULL) @@ -230,6 +263,9 @@ error = pthread_cond_init(&outQ.fq_condvar, NULL); if (error != 0) err(EX_OSERR, "unable to initialize output queue condvar"); + error = pthread_cond_init(&gq_condvar, NULL); + if (error != 0) + err(EX_OSERR, "unable to initialize garbage collector condvar"); /* * Create and bind the divert(4) socket. @@ -276,32 +312,80 @@ if (error == -1) err(EX_OSERR, "unable to set signal handler"); + /* + * There are 3 tables: udp, tcp, and tcp syn. + * The tcp syn table tracks connections for which a + * SYN packet has been sent but no reply has been returned + * yet. Once the SYN ACK reply is detected it is moved to + * the regular tcp connection tracking table. + */ + sh = create_hashtable(IC_HASHSZ, hashfromkey, equalkeys); + if (sh == NULL) { + syslog(LOG_ERR, "unable to create TCP (SYN) tracking table"); + error = EX_SOFTWARE; + goto cleanup; + } + th = create_hashtable(IC_HASHSZ, hashfromkey, equalkeys); + if (th == NULL) { + syslog(LOG_ERR, "unable to create TCP tracking table"); + error = EX_SOFTWARE; + goto cleanup; + } + uh = create_hashtable(IC_HASHSZ, hashfromkey, equalkeys); + if (uh == NULL) { + syslog(LOG_ERR, "unable to create UDP tracking table"); + error = EX_SOFTWARE; + goto cleanup; + } + /* * Create the various threads. */ error = pthread_create(&readtd, NULL, read_pthread, NULL); - if (error != 0) - err(EX_OSERR, "unable to create reader thread"); + if (error != 0) { + syslog(LOG_ERR, "unable to create reader thread"); + error = EX_OSERR; + goto cleanup; + } error = pthread_create(&classifytd, NULL, classify_pthread, NULL); - if (error != 0) - err(EX_OSERR, "unable to create classifier thread"); + if (error != 0) { + syslog(LOG_ERR, "unable to create classifier thread"); + error = EX_OSERR; + goto cleanup; + } error = pthread_create(&writetd, NULL, write_pthread, NULL); - if (error != 0) - err(EX_OSERR, "unable to create writer thread"); - + if (error != 0) { + syslog(LOG_ERR, "unable to create writer thread"); + error = EX_OSERR; + goto cleanup; + } + error = pthread_create(&garbagectd, NULL, garbage_pthread, NULL); + if (error != 0) { + syslog(LOG_ERR, "unable to create garbage collect thread"); + error = EX_OSERR; + goto cleanup; + } /* * Wait for our threads to exit. */ pthread_join(readtd, NULL); pthread_join(classifytd, NULL); pthread_join(writetd, NULL); - + pthread_join(garbagectd, NULL); /* * Cleanup */ - close(dvtS); +cleanup: + if (dvtS > 0) + close(dvtS); + if (sh != NULL) + hashtable_destroy(sh, 1); + if (th != NULL) + hashtable_destroy(th, 1); + if (uh != NULL) + hashtable_destroy(uh, 1); - return (0); + return (error); } void * @@ -310,6 +394,7 @@ struct ic_pkt *pkt; struct ip *ipp; int len; + unsigned int pcktcnt = 0; while (1) { pkt = (struct ic_pkt *)malloc(sizeof(struct ic_pkt)); @@ -353,6 +438,10 @@ STAILQ_INSERT_HEAD(&inQ.fq_pkthead, pkt, fp_link); inQ.fq_size++; pthread_mutex_unlock(&inQ.fq_mtx); + if (++pcktcnt > npackets) { + pcktcnt = 0; + pthread_cond_signal(&gq_condvar); + } pthread_cond_signal(&inQ.fq_condvar); } @@ -420,39 +509,19 @@ struct tcphdr *tcp; struct udphdr *udp; struct ic_pkt *pkt; - struct hashtable *sh, *th, *uh; struct protocol *proto; + struct timeval tv; regmatch_t pmatch; u_char *data, *payload; uint16_t trycount; int datalen, error; - /* - * There are 3 tables: udp, tcp, and tcp syn. - * The tcp syn table tracks connections for which a - * SYN packet has been sent but no reply has been returned - * yet. Once the SYN ACK reply is detected it is moved to - * the regular tcp connection tracking table. - */ - sh = create_hashtable(IC_HASHSZ, hashfromkey, equalkeys); - if (sh == NULL) { - syslog(LOG_ERR, "unable to create TCP (SYN) tracking table"); - exit(EX_SOFTWARE); - } - th = create_hashtable(IC_HASHSZ, hashfromkey, equalkeys); - if (th == NULL) { - syslog(LOG_ERR, "unable to create TCP tracking table"); - exit(EX_SOFTWARE); - } - uh = create_hashtable(IC_HASHSZ, hashfromkey, equalkeys); - if (uh == NULL) { - syslog(LOG_ERR, "unable to create UDP tracking table"); - exit(EX_SOFTWARE); - } - flow = NULL; key = NULL; while(1) { + while(gettimeofday(&tv, NULL) != 0) + ; + pthread_mutex_lock(&inQ.fq_mtx); pkt = STAILQ_LAST(&inQ.fq_pkthead, ic_pkt, fp_link); while (pkt == NULL) { @@ -528,6 +597,8 @@ free(pkt); continue; } + + flow->expire = tv.tv_sec; goto enqueue; /* * Handle session tear-down. @@ -583,8 +654,11 @@ * collecting IC_PKTMAXMATCH packets, just pass it through. */ } else if (flow->if_pktcount >= IC_PKTMAXMATCH && - flow->if_fwrule == 0) + flow->if_fwrule == 0) { + flow->expire = tv.tv_sec; goto enqueue; + } + flow->expire = tv.tv_sec; goto classify; } @@ -630,6 +704,7 @@ free(pkt); continue; } + flow->expire = tv.tv_sec; goto classify; } @@ -688,6 +763,7 @@ flow->if_datalen = datalen; flow->if_pktcount = 1; flow->if_fwrule = 0; + flow->expire = tv.tv_sec; if (hashtable_insert(uh, (void *)key, (void *)flow) == 0) { syslog(LOG_WARNING, "packet dropped: unable to insert into table"); @@ -715,19 +791,26 @@ flow->if_data = data; flow->if_datalen += datalen; flow->if_pktcount++; + flow->expire = tv.tv_sec; /* * If we haven't been able to classify this flow after * collecting IC_PKTMAXMATCH packets, just pass it through. */ } else if (flow->if_pktcount >= IC_PKTMAXMATCH && - flow->if_fwrule == 0) + flow->if_fwrule == 0) { + flow->expire = tv.tv_sec; goto enqueue; + } } else /* Not an TCP or UDP packet. */ goto enqueue; classify: - assert(flow != NULL); + if (flow == NULL) { + syslog(LOG_ERR, "flow is null argghhhhhhh"); + goto enqueue; + } + //assert(flow != NULL); /* * Inform divert(4) what rule to send it to by @@ -823,6 +906,80 @@ return (NULL); } +void * +garbage_pthread(void *arg __unused) +{ + char errbuf[LINE_MAX]; + struct entry *e, *f; + unsigned int i, flows_expired, error; + struct timeval tv; + + while (1) { + flows_expired = 0; + while (gettimeofday(&tv, NULL) != 0) + ; + tv.tv_sec -= time_expire; + + pthread_mutex_lock(&inQ.fq_mtx); + error = pthread_cond_wait(&gq_condvar, &inQ.fq_mtx); + if (error != 0) { + strerror_r(error, errbuf, sizeof(errbuf)); + syslog(EX_OSERR, "unable to wait on garbage collection: %s", + errbuf); + exit(EX_OSERR); + } + + for (i = 0; i < sh->tablelength; i++) { + e = sh->table[i]; + while (e != NULL) { + f = e; e = e->next; + if (((struct ip_flow *)f->v)->expire < tv.tv_sec) { + freekey(f->k); + sh->entrycount--; + if (f->v != NULL) + free(f->v); + free(f); + flows_expired++; + } + } + } + for (i = 0; i < th->tablelength; i++) { + e = th->table[i]; + while (e != NULL) { + f = e; e = e->next; + if (((struct ip_flow *)f->v)->expire < tv.tv_sec) { + freekey(f->k); + th->entrycount--; + if (f->v != NULL) + free(f->v); + free(f); + flows_expired++; + } + } + } + for (i = 0; i < uh->tablelength; i++) { + e = uh->table[i]; + while (e != NULL) { + f = e; e = e->next; + if (((struct ip_flow *)f->v)->expire < tv.tv_sec) { + freekey(f->k); + uh->entrycount--; + if (f->v != NULL) + free(f->v); + free(f); + flows_expired++; + } + } + } + + pthread_mutex_unlock(&inQ.fq_mtx); + + syslog(LOG_WARNING, "expired %u flows", flows_expired); + } + + return (NULL); +} + /* * NOTE: The protocol list (plist) passed as an argument is a global * variable. It is accessed from 3 functions: classify_pthread, @@ -840,12 +997,20 @@ static int read_config(const char *file, struct ic_protocols *plist) { + enum { bufsize = 2048 }; struct protocol *proto; properties props; - const char *errmsg, *name, *value; - int fd; + const char *errmsg, *name; + char *value; + int fd, fdpf; uint16_t rule; + char **ap, *argv[bufsize]; + fdpf = open("/dev/pf", O_RDONLY); + if (fdpf == -1) { + syslog(LOG_ERR, "unable to open /dev/pf"); + return (EX_OSERR); + } fd = open(file, O_RDONLY); if (fd == -1) { syslog(LOG_ERR, "unable to open configuration file"); @@ -863,10 +1028,48 @@ /* Do not match traffic against this pattern */ if (value == NULL) continue; - rule = strtonum(value, 1, 65535, &errmsg); - if (rule == 0) { + for (ap = argv; (*ap = strsep(&value, " \t")) != NULL;) + if (**ap != '\0') + if (++ap >= &argv[bufsize]) + break; + if (!strncmp(argv[0], "queue", strlen("queue"))) { + if (ioctl(fdpf, DIOCGETNAMEDALTQ, &rule)) { + syslog(LOG_WARNING, + "could not get ALTQ translation for" + " queue %s", argv[1]); + continue; + } + if (rule == 0) { + syslog(LOG_WARNING, + "queue %s does not exists!", argv[1]); + continue; + } + } else if (!strncmp(argv[0], "dnqueue", strlen("dnqueue"))) + rule = strtonum(argv[1], 1, 65535, &errmsg); + else if (!strncmp(argv[0], "dnpipe", strlen("dnpipe"))) + rule = strtonum(argv[1], 1, 65535, &errmsg); + else if (!strncmp(argv[0], "tag", strlen("tag"))) { + if (ioctl(fdpf, DIOCGETNAMEDTAG, &rule)) { + syslog(LOG_WARNING, + "could not get tag translation for" + " queue %s", argv[1]); + continue; + } + if (rule == 0) { + syslog(LOG_WARNING, + "tag %s does not exists!", argv[1]); + continue; + } + } else if (!strncmp(argv[0], "action", strlen("action"))) { + if (strncmp(argv[1], "block", strlen("block"))) + rule = PF_DROP; + else if (strncmp(argv[1], "allow", strlen("allow"))) + rule = PF_PASS; + else + continue; + } else { syslog(LOG_WARNING, - "invalid rule number for %s protocol: %s", + "invalid action specified for %s protocol: %s", proto->p_name, errmsg); continue; } @@ -953,10 +1156,14 @@ static void usage(const char *arg0) { - printf("usage: %s [-h] [-c file] [-p port] [-P dir] [-q length]\n", basename(arg0)); + printf("usage: %s [-h] [-c file] [-e seconds] [-n packets] " + "[-p port] [-P dir] [-q length]\n", basename(arg0)); printf("usage: %s -t -P dir\n", basename(arg0)); printf( " -c file : path to configuration file\n" + " -e secs : number of seconds before a flow is expired\n" " -h : this help screen\n" + " -n packets: number of packets before the garbage collector" + " tries to expire flows\n" " -P dir : directory containing protocol patterns\n" " -p port : port number of divert socket\n" " -q length : max length (in packets) of in/out queues\n" -- Ermal
Want to link to this message? Use this URL: <https://mail-archive.FreeBSD.org/cgi/mid.cgi?9a542da30808081626y7ce1fe58k483494a6cdbfae60>