Date: Tue, 19 Aug 2008 23:26:05 -0300 From: =?ISO-8859-1?Q?Daniel_Dias_Gon=E7alves?= <ddg@yan.com.br> To: =?ISO-8859-1?Q?Ermal_Lu=E7i?= <ermal.luci@gmail.com> Cc: Mike Makonnen <mtm@wubethiopia.com>, freebsd-net@freebsd.org Subject: Re: Application layer classifier for ipfw Message-ID: <48AB80BD.0@yan.com.br> In-Reply-To: <9a542da30808081626y7ce1fe58k483494a6cdbfae60@mail.gmail.com> References: <48918DB5.7020201@wubethiopia.com> <4891CD13.20600@freebsdbrasil.com.br> <48922E9D.1020507@elischer.org> <4893328C.2040105@freebsdbrasil.com.br> <4894447C.3000800@wubethiopia.com> <48945A79.50300@wubethiopia.com> <9a542da30808081626y7ce1fe58k483494a6cdbfae60@mail.gmail.com>
next in thread | previous in thread | raw e-mail | index | archive | help
Ermal Luçi escreveu: > On Sat, Aug 2, 2008 at 3:00 PM, Mike Makonnen <mtm@wubethiopia.com> wrote: > >> Mike Makonnen wrote: >> >>> Patrick Tracanelli wrote: >>> >>>> To let you know of my current (real world) tests: >>>> >>>> - Wireless Internet Provider 1: >>>> - 4Mbit/s of Internet Traffic >>>> - Classifying default protocols + soulseek + ssh >>>> - Classifying 100Mbit/s of dump over ssh >>>> >>>> Results in: >>>> No latency added, very low CPU usage, no packets dropping. >>>> >>>> - Wireless ISP 2: >>>> - 21 Mbit/s of Internet Traffic >>>> - Classifying default protocols + soulseek + ssh >>>> >>>> Results in: >>>> No tcp or udp traffic at all; everything that gets diverted never >>>> comes out of the divert socket, and ipfw-classifyd logs >>>> >>>> Aug 1 12:07:35 ourofino last message repeated 58 times >>>> Aug 1 12:17:54 ourofino ipfw-classifyd: Loaded Protocol: bittorrent >>>> (rule 50000) >>>> Aug 1 12:17:54 ourofino ipfw-classifyd: Loaded Protocol: edonkey (rule >>>> 50000) >>>> Aug 1 12:17:54 ourofino ipfw-classifyd: Loaded Protocol: fasttrack (rule >>>> 50000) >>>> Aug 1 12:17:54 ourofino ipfw-classifyd: Loaded Protocol: gnutella (rule >>>> 1000) >>>> Aug 1 12:17:54 ourofino ipfw-classifyd: Loaded Protocol: soulseek (rule >>>> 50000) >>>> Aug 1 12:17:54 ourofino ipfw-classifyd: Loaded Protocol: ssh (rule >>>> 50000) >>>> Aug 1 12:18:28 ourofino ipfw-classifyd: unable to write to divert >>>> socket: Operation not permitted >>>> Aug 1 12:18:50 ourofino last message repeated 90 times >>>> >>> Hmmm... this part means that the call to sendto(2) to write the packet >>> back into network stack failed. This explains why you are not seein g any >>> traffic comming back out of the divert socket, but I don't see why it would >>> suddenly fail with a permission error. Could this be a kernel bug? >>> >>>> Aug 1 12:18:51 ourofino ipfw-classifyd: packet dropped: input queue full >>>> Aug 1 12:19:11 ourofino last message repeated 94 times >>>> >>>> Raised queue len a lot (up to 40960), when the application starts it uses >>>> up to 25% CPU and a second after that, CPU usage gets lower the 0.1%. >>>> >>> This looks like a deadlock. If it weren't able to process packets fast >>> enough the cpu usage should be high even as it's spewing "packet dropped" >>> messages. Can you send me some more information like memory usage and the >>> firewall script you are using? How much of the 21Mbits/s of traffic is P2P? >>> If you reduce the number of protocols you are trying to match against does >>> the behavior change? Using netstat -w1 -I<interface> can you tell me how >>> many packets per second we're talking about for 4Mbits/s and 21Mbit/s? Also, >>> the timestamps from the log file seem to show that the daemon is running for >>> approx. 34 sec. before the first "unable to write to write to divert socket" >>> message. Is it passing traffic during this time? Thanks. >>> >>> I've uploaded a newer version. Can you try that also please. It includes: >>> o SIGHUP forces it to re-read its configuration file >>> o rc.d script >>> o minor optimization (calls pthread_cond_signal with the mutex unlocked) >>> o code cleanup >>> >>> Also, for your convenience I have attached a patch against the earlier >>> version that removes a debugging printf that should remove spammage to your >>> log files (the current version has it removed already). >>> >>> >> Ooops, a few minutes after I sent this email I found a couple of bugs (one >> major, and one minor). They were in the original tarball as well as the >> newer one I uploaded earlier today. I've uploaded a fixed version of the >> code. Can you try that instead please. >> >> Also, to help track down performance issues I've modified the Makefile to >> build a profiled version of the application so you can use gprof(1) to >> figure out where any problems lie. >> >> > > Does this sound about right for implementing the GC and implementing syntax as > $protocol = dnpipe 20 > $protocol2 = dnqueue 30 > it has some extra goos for pf(4) and altq(4) > $protocol3 = queue $queue name > $protocol4 = tag TAGNAME > $protocol5 = action block > > It adds 2 new options -e seconds for seconds before a flow is > considered expired and -n #packets proccessed before kicking the GC. > > --- classifyd_old.c 2008-08-09 00:33:04.000000000 +0000 > +++ classifyd.c 2008-08-09 00:33:34.000000000 +0000 > @@ -28,13 +28,17 @@ > > #include <sys/types.h> > #include <sys/socket.h> > +#include <sys/ioctl.h> > +#include <sys/time.h> > > +#include <net/if.h> > #include <arpa/inet.h> > #include <netinet/in.h> > #include <netinet/in_systm.h> > #include <netinet/ip.h> > #include <netinet/tcp.h> > #include <netinet/udp.h> > +#include <net/pfvar.h> > > #include <assert.h> > #include <err.h> > @@ -53,6 +57,7 @@ > #include <unistd.h> > > #include "hashtable.h" > +#include "hashtable_private.h" > #include "pathnames.h" > #include "protocols.h" > > @@ -94,6 +99,7 @@ > uint32_t if_datalen; /* length in bytes of if_data */ > uint16_t if_pktcount; /* number of packets concatenated */ > uint16_t if_fwrule; /* ipfw(4) rule associated with flow */ > + time_t expire; /* flow expire time */ > }; > > /* > @@ -126,7 +132,7 @@ > static struct ic_queue outQ; > > /* divert(4) socket */ > -static int dvtS; > +static int dvtS = 0; > > /* config file path */ > static const char *conf = IC_CONFIG_PATH; > @@ -137,12 +143,25 @@ > /* List of protocols available to the system */ > struct ic_protocols *fp; > > +/* Our hashtables */ > +struct hashtable *sh = NULL, > + *th = NULL, > + *uh = NULL; > + > +/* signaled to kick garbage collector */ > +static pthread_cond_t gq_condvar; > + > +/* number of packets before kicking garbage collector */ > +static unsigned int npackets = 250; > + > +static time_t time_expire = 40; /* 40 seconds */ > /* > * Forward function declarations. > */ > void *classify_pthread(void *); > void *read_pthread(void *); > void *write_pthread(void *); > +void *garbage_pthread(void *); > static int equalkeys(void *, void *); > static unsigned int hashfromkey(void *); > static void test_re(void); > @@ -155,7 +174,7 @@ > { > struct sockaddr_in addr; > struct sigaction sa; > - pthread_t classifytd, readtd, writetd; > + pthread_t classifytd, readtd, writetd, garbagectd; > const char *errstr; > long long num; > uint16_t port, qmaxsz; > @@ -164,13 +183,27 @@ > tflag = 0; > port = IC_DPORT; > qmaxsz = IC_QMAXSZ; > - while ((ch = getopt(argc, argv, "htc:P:p:q:")) != -1) { > + while ((ch = getopt(argc, argv, "n:e:htc:P:p:q:")) != -1) { > switch(ch) { > case 'c': > conf = strdup(optarg); > if (conf == NULL) > err(EX_TEMPFAIL, "config file path"); > break; > + case 'e': > + num = strtonum((const char *)optarg, 1, 400, &errstr); > + if (num == 0 && errstr != NULL) { > + errx(EX_USAGE, "invalud expire seconds: %s", errstr); > + } > + time_expire = (time_t)num; > + break; > + case 'n': > + num = strtonum((const char *)optarg, 1, > 65535, &errstr); > + if (num == 0 && errstr != NULL) { > + errx(EX_USAGE, "invalud expire > seconds: %s", errstr); > + } > + npackets = (unsigned int)num; > + break; > case 'P': > protoDir = strdup(optarg); > if (protoDir == NULL) > @@ -230,6 +263,9 @@ > error = pthread_cond_init(&outQ.fq_condvar, NULL); > if (error != 0) > err(EX_OSERR, "unable to initialize output queue condvar"); > + error = pthread_cond_init(&gq_condvar, NULL); > + if (error != 0) > + err(EX_OSERR, "unable to initialize garbage collector > condvar"); > > /* > * Create and bind the divert(4) socket. > @@ -276,32 +312,80 @@ > if (error == -1) > err(EX_OSERR, "unable to set signal handler"); > > + /* > + * There are 3 tables: udp, tcp, and tcp syn. > + * The tcp syn table tracks connections for which a > + * SYN packet has been sent but no reply has been returned > + * yet. Once the SYN ACK reply is detected it is moved to > + * the regular tcp connection tracking table. > + */ > + sh = create_hashtable(IC_HASHSZ, hashfromkey, equalkeys); > + if (sh == NULL) { > + syslog(LOG_ERR, "unable to create TCP (SYN) tracking table"); > + error = EX_SOFTWARE; > + goto cleanup; > + } > + th = create_hashtable(IC_HASHSZ, hashfromkey, equalkeys); > + if (th == NULL) { > + syslog(LOG_ERR, "unable to create TCP tracking table"); > + error = EX_SOFTWARE; > + goto cleanup; > + } > + uh = create_hashtable(IC_HASHSZ, hashfromkey, equalkeys); > + if (uh == NULL) { > + syslog(LOG_ERR, "unable to create UDP tracking table"); > + error = EX_SOFTWARE; > + goto cleanup; > + } > + > /* > * Create the various threads. > */ > error = pthread_create(&readtd, NULL, read_pthread, NULL); > - if (error != 0) > - err(EX_OSERR, "unable to create reader thread"); > + if (error != 0) { > + syslog(LOG_ERR, "unable to create reader thread"); > + error = EX_OSERR; > + goto cleanup; > + } > error = pthread_create(&classifytd, NULL, classify_pthread, NULL); > - if (error != 0) > - err(EX_OSERR, "unable to create classifier thread"); > + if (error != 0) { > + syslog(LOG_ERR, "unable to create classifier thread"); > + error = EX_OSERR; > + goto cleanup; > + } > error = pthread_create(&writetd, NULL, write_pthread, NULL); > - if (error != 0) > - err(EX_OSERR, "unable to create writer thread"); > - > + if (error != 0) { > + syslog(LOG_ERR, "unable to create writer thread"); > + error = EX_OSERR; > + goto cleanup; > + } > + error = pthread_create(&garbagectd, NULL, garbage_pthread, NULL); > + if (error != 0) { > + syslog(LOG_ERR, "unable to create garbage collect thread"); > + error = EX_OSERR; > + goto cleanup; > + } > /* > * Wait for our threads to exit. > */ > pthread_join(readtd, NULL); > pthread_join(classifytd, NULL); > pthread_join(writetd, NULL); > - > + pthread_join(garbagectd, NULL); > /* > * Cleanup > */ > - close(dvtS); > +cleanup: > + if (dvtS > 0) > + close(dvtS); > + if (sh != NULL) > + hashtable_destroy(sh, 1); > + if (th != NULL) > + hashtable_destroy(th, 1); > + if (uh != NULL) > + hashtable_destroy(uh, 1); > > - return (0); > + return (error); > } > > void * > @@ -310,6 +394,7 @@ > struct ic_pkt *pkt; > struct ip *ipp; > int len; > + unsigned int pcktcnt = 0; > > while (1) { > pkt = (struct ic_pkt *)malloc(sizeof(struct ic_pkt)); > @@ -353,6 +438,10 @@ > STAILQ_INSERT_HEAD(&inQ.fq_pkthead, pkt, fp_link); > inQ.fq_size++; > pthread_mutex_unlock(&inQ.fq_mtx); > + if (++pcktcnt > npackets) { > + pcktcnt = 0; > + pthread_cond_signal(&gq_condvar); > + } > pthread_cond_signal(&inQ.fq_condvar); > } > > @@ -420,39 +509,19 @@ > struct tcphdr *tcp; > struct udphdr *udp; > struct ic_pkt *pkt; > - struct hashtable *sh, *th, *uh; > struct protocol *proto; > + struct timeval tv; > regmatch_t pmatch; > u_char *data, *payload; > uint16_t trycount; > int datalen, error; > > - /* > - * There are 3 tables: udp, tcp, and tcp syn. > - * The tcp syn table tracks connections for which a > - * SYN packet has been sent but no reply has been returned > - * yet. Once the SYN ACK reply is detected it is moved to > - * the regular tcp connection tracking table. > - */ > - sh = create_hashtable(IC_HASHSZ, hashfromkey, equalkeys); > - if (sh == NULL) { > - syslog(LOG_ERR, "unable to create TCP (SYN) tracking table"); > - exit(EX_SOFTWARE); > - } > - th = create_hashtable(IC_HASHSZ, hashfromkey, equalkeys); > - if (th == NULL) { > - syslog(LOG_ERR, "unable to create TCP tracking table"); > - exit(EX_SOFTWARE); > - } > - uh = create_hashtable(IC_HASHSZ, hashfromkey, equalkeys); > - if (uh == NULL) { > - syslog(LOG_ERR, "unable to create UDP tracking table"); > - exit(EX_SOFTWARE); > - } > - > flow = NULL; > key = NULL; > while(1) { > + while(gettimeofday(&tv, NULL) != 0) > + ; > + > pthread_mutex_lock(&inQ.fq_mtx); > pkt = STAILQ_LAST(&inQ.fq_pkthead, ic_pkt, fp_link); > while (pkt == NULL) { > @@ -528,6 +597,8 @@ > free(pkt); > continue; > } > + > + flow->expire = tv.tv_sec; > goto enqueue; > /* > * Handle session tear-down. > @@ -583,8 +654,11 @@ > * collecting IC_PKTMAXMATCH packets, just pass it through. > */ > } else if (flow->if_pktcount >= IC_PKTMAXMATCH && > - flow->if_fwrule == 0) > + flow->if_fwrule == 0) { > + flow->expire = tv.tv_sec; > goto enqueue; > + } > + flow->expire = tv.tv_sec; > goto classify; > } > > @@ -630,6 +704,7 @@ > free(pkt); > continue; > } > + flow->expire = tv.tv_sec; > goto classify; > } > > @@ -688,6 +763,7 @@ > flow->if_datalen = datalen; > flow->if_pktcount = 1; > flow->if_fwrule = 0; > + flow->expire = tv.tv_sec; > if (hashtable_insert(uh, (void *)key, (void *)flow) == 0) { > syslog(LOG_WARNING, > "packet dropped: unable to insert into table"); > @@ -715,19 +791,26 @@ > flow->if_data = data; > flow->if_datalen += datalen; > flow->if_pktcount++; > + flow->expire = tv.tv_sec; > /* > * If we haven't been able to classify this flow after > * collecting IC_PKTMAXMATCH packets, just pass it through. > */ > } else if (flow->if_pktcount >= IC_PKTMAXMATCH && > - flow->if_fwrule == 0) > + flow->if_fwrule == 0) { > + flow->expire = tv.tv_sec; > goto enqueue; > + } > } else > /* Not an TCP or UDP packet. */ > goto enqueue; > > classify: > - assert(flow != NULL); > + if (flow == NULL) { > + syslog(LOG_ERR, "flow is null argghhhhhhh"); > + goto enqueue; > + } > + //assert(flow != NULL); > > /* > * Inform divert(4) what rule to send it to by > @@ -823,6 +906,80 @@ > return (NULL); > } > > +void * > +garbage_pthread(void *arg __unused) > +{ > + char errbuf[LINE_MAX]; > + struct entry *e, *f; > + unsigned int i, flows_expired, error; > + struct timeval tv; > + > + while (1) { > + flows_expired = 0; > + while (gettimeofday(&tv, NULL) != 0) > + ; > + tv.tv_sec -= time_expire; > + > + pthread_mutex_lock(&inQ.fq_mtx); > + error = pthread_cond_wait(&gq_condvar, &inQ.fq_mtx); > + if (error != 0) { > + strerror_r(error, errbuf, sizeof(errbuf)); > + syslog(EX_OSERR, "unable to wait on garbage > collection: %s", > + errbuf); > + exit(EX_OSERR); > + } > + > + for (i = 0; i < sh->tablelength; i++) { > + e = sh->table[i]; > + while (e != NULL) { > + f = e; e = e->next; > + if (((struct ip_flow *)f->v)->expire < tv.tv_sec) { > + freekey(f->k); > + sh->entrycount--; > + if (f->v != NULL) > + free(f->v); > + free(f); > + flows_expired++; > + } > + } > + } > + for (i = 0; i < th->tablelength; i++) { > + e = th->table[i]; > + while (e != NULL) { > + f = e; e = e->next; > + if (((struct ip_flow *)f->v)->expire > < tv.tv_sec) { > + freekey(f->k); > + th->entrycount--; > + if (f->v != NULL) > + free(f->v); > + free(f); > + flows_expired++; > + } > + } > + } > + for (i = 0; i < uh->tablelength; i++) { > + e = uh->table[i]; > + while (e != NULL) { > + f = e; e = e->next; > + if (((struct ip_flow *)f->v)->expire > < tv.tv_sec) { > + freekey(f->k); > + uh->entrycount--; > + if (f->v != NULL) > + free(f->v); > + free(f); > + flows_expired++; > + } > + } > + } > + > + pthread_mutex_unlock(&inQ.fq_mtx); > + > + syslog(LOG_WARNING, "expired %u flows", flows_expired); > + } > + > + return (NULL); > +} > + > /* > * NOTE: The protocol list (plist) passed as an argument is a global > * variable. It is accessed from 3 functions: classify_pthread, > @@ -840,12 +997,20 @@ > static int > read_config(const char *file, struct ic_protocols *plist) > { > + enum { bufsize = 2048 }; > struct protocol *proto; > properties props; > - const char *errmsg, *name, *value; > - int fd; > + const char *errmsg, *name; > + char *value; > + int fd, fdpf; > uint16_t rule; > + char **ap, *argv[bufsize]; > > + fdpf = open("/dev/pf", O_RDONLY); > + if (fdpf == -1) { > + syslog(LOG_ERR, "unable to open /dev/pf"); > + return (EX_OSERR); > + } > fd = open(file, O_RDONLY); > if (fd == -1) { > syslog(LOG_ERR, "unable to open configuration file"); > @@ -863,10 +1028,48 @@ > /* Do not match traffic against this pattern */ > if (value == NULL) > continue; > - rule = strtonum(value, 1, 65535, &errmsg); > - if (rule == 0) { > + for (ap = argv; (*ap = strsep(&value, " \t")) != NULL;) > + if (**ap != '\0') > + if (++ap >= &argv[bufsize]) > + break; > + if (!strncmp(argv[0], "queue", strlen("queue"))) { > + if (ioctl(fdpf, DIOCGETNAMEDALTQ, &rule)) { > + syslog(LOG_WARNING, > + "could not get ALTQ translation for" > + " queue %s", argv[1]); > + continue; > + } > + if (rule == 0) { > + syslog(LOG_WARNING, > + "queue %s does not exists!", argv[1]); > + continue; > + } > + } else if (!strncmp(argv[0], "dnqueue", strlen("dnqueue"))) > + rule = strtonum(argv[1], 1, 65535, &errmsg); > + else if (!strncmp(argv[0], "dnpipe", strlen("dnpipe"))) > + rule = strtonum(argv[1], 1, 65535, &errmsg); > + else if (!strncmp(argv[0], "tag", strlen("tag"))) { > + if (ioctl(fdpf, DIOCGETNAMEDTAG, &rule)) { > + syslog(LOG_WARNING, > + "could not get tag translation for" > + " queue %s", argv[1]); > + continue; > + } > + if (rule == 0) { > + syslog(LOG_WARNING, > + "tag %s does not exists!", argv[1]); > + continue; > + } > + } else if (!strncmp(argv[0], "action", strlen("action"))) { > + if (strncmp(argv[1], "block", strlen("block"))) > + rule = PF_DROP; > + else if (strncmp(argv[1], "allow", strlen("allow"))) > + rule = PF_PASS; > + else > + continue; > + } else { > syslog(LOG_WARNING, > - "invalid rule number for %s protocol: %s", > + "invalid action specified for %s protocol: %s", > proto->p_name, errmsg); > continue; > } > @@ -953,10 +1156,14 @@ > static void > usage(const char *arg0) > { > - printf("usage: %s [-h] [-c file] [-p port] [-P dir] [-q length]\n", > basename(arg0)); > + printf("usage: %s [-h] [-c file] [-e seconds] [-n packets] " > + "[-p port] [-P dir] [-q length]\n", basename(arg0)); > printf("usage: %s -t -P dir\n", basename(arg0)); > printf( " -c file : path to configuration file\n" > + " -e secs : number of seconds before a flow is expired\n" > " -h : this help screen\n" > + " -n packets: number of packets before the garbage collector" > + " tries to expire flows\n" > " -P dir : directory containing protocol patterns\n" > " -p port : port number of divert socket\n" > " -q length : max length (in packets) of in/out queues\n" > > Some progress in solution of presented problems ? -- Daniel
Want to link to this message? Use this URL: <https://mail-archive.FreeBSD.org/cgi/mid.cgi?48AB80BD.0>