Date: Mon, 19 Aug 2013 20:39:15 GMT From: ambarisha@FreeBSD.org To: svn-soc-all@FreeBSD.org Subject: socsvn commit: r256164 - in soc2013/ambarisha/head/usr.bin: dmget dms Message-ID: <201308192039.r7JKdFjV078331@socsvn.freebsd.org>
next in thread | raw e-mail | index | archive | help
Author: ambarisha Date: Mon Aug 19 20:39:15 2013 New Revision: 256164 URL: http://svnweb.FreeBSD.org/socsvn/?view=rev&rev=256164 Log: Duplicate request handing. When it receives a request, DMS downloads the requested file into a temporary file. It then uses this file to serve both the original request that started this download and any duplicate requests received in the mean time. Modified: soc2013/ambarisha/head/usr.bin/dmget/utils.c soc2013/ambarisha/head/usr.bin/dms/Makefile soc2013/ambarisha/head/usr.bin/dms/dms.c soc2013/ambarisha/head/usr.bin/dms/dms.h soc2013/ambarisha/head/usr.bin/dms/utils.c soc2013/ambarisha/head/usr.bin/dms/worker.c Modified: soc2013/ambarisha/head/usr.bin/dmget/utils.c ============================================================================== --- soc2013/ambarisha/head/usr.bin/dmget/utils.c Mon Aug 19 20:31:57 2013 (r256163) +++ soc2013/ambarisha/head/usr.bin/dmget/utils.c Mon Aug 19 20:39:15 2013 (r256164) @@ -185,18 +185,27 @@ { int bufsize = 0; int err; - struct dmmsg *msg; + struct dmmsg *msg = (struct dmmsg *) Malloc(sizeof(struct dmmsg)); err = Read(sock, &bufsize, sizeof(bufsize)); if (err == 0) { /* set dms_error */ +#if DEBUG + fprintf(stderr, "recv_msg: remote end closed connection\n"); +#endif + free(msg); return (NULL); } + bufsize -= sizeof(bufsize); err = Read(sock, &(msg->op), sizeof(msg->op)); if (err == 0) { /* set dms_error */ +#if DEBUG + fprintf(stderr, "recv_msg: remote end closed connection\n"); +#endif + free(msg); return (NULL); } bufsize -= sizeof(msg->op); @@ -209,6 +218,10 @@ free(msg->buf); msg->len = 0; /* set dms_error */ +#if DEBUG + fprintf(stderr, "recv_msg: remote end closed connection\n"); +#endif + free(msg); return (NULL); } @@ -218,6 +231,7 @@ void free_msg(struct dmmsg **msg) { + free((*msg)->buf); free(*msg); *msg = NULL; Modified: soc2013/ambarisha/head/usr.bin/dms/Makefile ============================================================================== --- soc2013/ambarisha/head/usr.bin/dms/Makefile Mon Aug 19 20:31:57 2013 (r256163) +++ soc2013/ambarisha/head/usr.bin/dms/Makefile Mon Aug 19 20:39:15 2013 (r256164) @@ -11,5 +11,6 @@ DPADD= ${LIBFETCH} LDADD= -lfetch .endif +CFLAGS+= -g .include <bsd.prog.mk> Modified: soc2013/ambarisha/head/usr.bin/dms/dms.c ============================================================================== --- soc2013/ambarisha/head/usr.bin/dms/dms.c Mon Aug 19 20:31:57 2013 (r256163) +++ soc2013/ambarisha/head/usr.bin/dms/dms.c Mon Aug 19 20:39:15 2013 (r256164) @@ -200,6 +200,7 @@ dmjob = mk_dmjob(dmreq, csock); jobs = add_job(jobs, dmjob); pthread_create(&(dmjob->worker), NULL, run_worker, dmjob); + pthread_detach(dmjob->worker); break; default: goto error; @@ -211,7 +212,7 @@ error: ret = -1; done: - free_msg(msg); + free_msg(&msg); return ret; } @@ -222,16 +223,15 @@ exit(1); // Temporary } -static int +static state_t service_job(struct dmjob *job, fd_set *fdset) { int ret = 0; - if (FD_ISSET(job->client, fdset)) - pthread_kill(job->worker, SIGUSR1); - - if (job->state == DONE) - ret = 1; - return (ret); + if (FD_ISSET(job->client, fdset)) { + /* TODO: Worker can't handle this signal yet */ + //pthread_kill(job->worker, SIGUSR1); + } + return (job->state); } static void @@ -248,6 +248,7 @@ /* Prepare fdset and make select call */ FD_ZERO(&fdset); + maxfd = socket; FD_SET(socket, &fdset); cur = jobs; @@ -257,37 +258,37 @@ maxfd = cur->client; cur = cur->next; } - + Select(maxfd + 1, &fdset, NULL, NULL, NULL); - if (FD_ISSET(socket, &fdset)) { - struct sockaddr_un cliaddr; - size_t cliaddrlen = sizeof(cliaddr); - int csock = Accept(socket, (struct sockaddr *) &cliaddr, - &cliaddrlen); - handle_request(csock); - } - cur = jobs; while (cur != NULL) { ret = service_job(cur, &fdset); - if (ret == 1) { + if (ret == DONE) { close(cur->client); - pthread_join(cur->worker, &retptr); jobs = rm_job(jobs, cur); } cur = cur->next; } - + + if (FD_ISSET(socket, &fdset)) { + struct sockaddr_un cliaddr; + size_t cliaddrlen = sizeof(cliaddr); + int csock = Accept(socket, (struct sockaddr *) &cliaddr, + &cliaddrlen); + handle_request(csock); + } } + /* Notify all running workers that we've to wrap up */ cur = jobs; - while (cur != NULL) { - close(cur->client); - ret = service_job(cur, &fdset); - /* TODO: Force the worker to quit as well */ + while (cur != NULL) { + if (cur->state == RUNNING) + pthread_kill(cur->worker, SIGINT); + + rm_dmreq(&(cur->request)); jobs = rm_job(jobs, cur); - cur = jobs; + cur = cur->next; } } Modified: soc2013/ambarisha/head/usr.bin/dms/dms.h ============================================================================== --- soc2013/ambarisha/head/usr.bin/dms/dms.h Mon Aug 19 20:31:57 2013 (r256163) +++ soc2013/ambarisha/head/usr.bin/dms/dms.h Mon Aug 19 20:39:15 2013 (r256164) @@ -3,7 +3,7 @@ #include <sys/types.h> -typedef enum {RUNNING=0, DONE=1} state_t; +typedef enum {RUNNING=0, DONE, DUPLICATE} state_t; struct dmjob { int ofd; @@ -13,9 +13,10 @@ int sigalrm; int siginfo; int siginfo_en; + unsigned timeout; + pthread_t worker; struct dmreq *request; struct url *url; - pthread_t *worker; struct dmjob *next; struct dmjob *prev; Modified: soc2013/ambarisha/head/usr.bin/dms/utils.c ============================================================================== --- soc2013/ambarisha/head/usr.bin/dms/utils.c Mon Aug 19 20:31:57 2013 (r256163) +++ soc2013/ambarisha/head/usr.bin/dms/utils.c Mon Aug 19 20:39:15 2013 (r256164) @@ -183,30 +183,29 @@ struct dmmsg * recv_msg(int sock) { - printf("in recv_msg\n"); int bufsize = 0; int err; - struct dmmsg *msg; + struct dmmsg *msg = (struct dmmsg *) Malloc(sizeof(struct dmmsg)); err = Read(sock, &bufsize, sizeof(bufsize)); if (err == 0) { /* set dms_error */ #if DEBUG fprintf(stderr, "recv_msg: remote end closed connection\n"); #endif + free(msg); return (NULL); } - printf("bufsize = %d\n", bufsize); bufsize -= sizeof(bufsize); - printf("sock = %d\n", sock); err = Read(sock, &(msg->op), sizeof(msg->op)); if (err == 0) { /* set dms_error */ #if DEBUG fprintf(stderr, "recv_msg: remote end closed connection\n"); #endif + free(msg); return (NULL); } bufsize -= sizeof(msg->op); @@ -222,6 +221,7 @@ #if DEBUG fprintf(stderr, "recv_msg: remote end closed connection\n"); #endif + free(msg); return (NULL); } @@ -231,6 +231,7 @@ void free_msg(struct dmmsg **msg) { + free((*msg)->buf); free(*msg); *msg = NULL; Modified: soc2013/ambarisha/head/usr.bin/dms/worker.c ============================================================================== --- soc2013/ambarisha/head/usr.bin/dms/worker.c Mon Aug 19 20:31:57 2013 (r256163) +++ soc2013/ambarisha/head/usr.bin/dms/worker.c Mon Aug 19 20:39:15 2013 (r256164) @@ -6,12 +6,17 @@ #include <string.h> #include <errno.h> #include <signal.h> +#include <fcntl.h> +#include <unistd.h> #include "dms.h" #include "dm.h" + extern struct dmjob *jobs; +#define TMP_EXT ".tmp" + static int authenticate(struct url *url) { @@ -53,6 +58,12 @@ free_msg(&rcvmsg); } +static int +compare_jobs(struct dmjob *j1, struct dmjob *j2) +{ + return strcmp(j1->request->URL, j2->request->URL); +} + static void stat_send(int csock, struct xferstat *xs, int force) { @@ -143,29 +154,14 @@ } static int -fetch(struct dmjob *dmjob) +mk_url(struct dmjob *dmjob, char *flags) { - struct url_stat us; - struct stat sb, nsb; - struct xferstat xs; - FILE *f, *of; - size_t size, readcnt, wr; - off_t count; - char flags[8]; - const char *slash; - char *tmppath; - int r; - unsigned timeout; - char *ptr; - char *buf; struct dmreq *dmreq = dmjob->request; + struct stat sb; + int r; - f = of = NULL; - tmppath = NULL; - - timeout = 0; - *flags = 0; - count = 0; + if (dmjob->url != NULL) + return 0; /* set verbosity level */ if (dmreq->v_level > 1) @@ -174,7 +170,6 @@ fetchDebug = 1; /* parse URL */ - dmjob->url = NULL; if (*dmreq->URL == '\0') { warnx("empty URL"); goto failure; @@ -212,7 +207,7 @@ strcat(flags, "d"); if (dmreq->flags & U_FLAG) strcat(flags, "l"); - timeout = dmreq->T_secs ? dmreq->T_secs : dmreq->ftp_timeout; + dmjob->timeout = dmreq->T_secs ? dmreq->T_secs : dmreq->ftp_timeout; } /* HTTP specific flags */ @@ -222,7 +217,7 @@ strcat(flags, "d"); if ((dmreq->flags & A_FLAG)) strcat(flags, "A"); - timeout = dmreq->T_secs ? dmreq->T_secs : dmreq->http_timeout; + dmjob->timeout = dmreq->T_secs ? dmreq->T_secs : dmreq->http_timeout; if (dmreq->flags & i_FLAG) { if (stat(dmreq->i_filename, &sb)) { warn("%s: stat()", dmreq->i_filename); @@ -234,27 +229,58 @@ } /* set the protocol timeout. */ - fetchTimeout = timeout; + fetchTimeout = dmjob->timeout; + goto success; - /* just print size */ - if (dmreq->flags & s_FLAG) { - // if (timeout) - // alarm(timeout); - r = fetchStat(dmjob->url, &us, flags); - if (timeout) - alarm(0); - if (dmjob->sigalrm || dmjob->sigint) - goto signal; - if (r == -1) { - warnx("%s", fetchLastErrString); - goto failure; - } - if (us.size == -1) - printf("Unknown\n"); - else - printf("%jd\n", (intmax_t)us.size); - goto success; - } +signal: + /* report that we were timedout/interrupted */ +failure: + free(dmjob->url->doc); + free(dmjob->url); + dmjob->url = NULL; +success: + return (r); +} + +static int +fetch(struct dmjob *dmjob, FILE *f, struct url_stat us) +{ + struct stat sb, nsb; + struct xferstat xs; + FILE *of; + size_t size, readcnt, wr; + off_t count; + char flags[8]; + const char *slash; + char *tmppath; + int r; + char *ptr; + char *buf; + struct dmreq *dmreq = dmjob->request; + + of = NULL; + tmppath = NULL; + + dmjob->timeout = 0; + *flags = 0; + count = 0; + + r = mk_url(dmjob, flags); + + /* Initialize signal flags */ + dmjob->sigint = 0; + dmjob->sigalrm = 0; + dmjob->siginfo = 0; + + /* Set timeout */ + if (strcmp(dmjob->url->scheme, SCHEME_FTP) == 0) + dmjob->timeout = dmreq->T_secs ? dmreq->T_secs : dmreq->ftp_timeout; + else if (strcmp(dmjob->url->scheme, SCHEME_HTTP) == 0 || + strcmp(dmjob->url->scheme, SCHEME_HTTPS) == 0) + dmjob->timeout = dmreq->T_secs ? dmreq->T_secs : dmreq->http_timeout; + + /* set the protocol timeout. */ + fetchTimeout = dmjob->timeout; /* * If the -r flag was specified, we have to compare the local @@ -289,24 +315,12 @@ } /* start the transfer */ - if (timeout) - alarm(timeout); - f = fetchXGet(dmjob->url, &us, flags); - if (timeout) - alarm(0); + if (dmjob->timeout) + alarm(dmjob->timeout); + if (dmjob->sigalrm || dmjob->sigint) goto signal; - if (f == NULL) { - warnx("%s: %s", dmreq->URL, fetchLastErrString); - if ((dmreq->flags & i_FLAG) && strcmp(dmjob->url->scheme, SCHEME_HTTP) == 0 - && fetchLastErrCode == FETCH_OK - && strcmp(fetchLastErrString, "Not Modified") == 0) { - /* HTTP Not Modified Response, return OK. */ - r = 0; - goto done; - } else - goto failure; - } + if (dmjob->sigint) goto signal; @@ -422,14 +436,11 @@ * from scratch if we want the whole file */ dmjob->url->offset = 0; - if ((f = fetchXGet(dmjob->url, &us, flags)) == NULL) { - warnx("%s: %s", dmreq->URL, fetchLastErrString); - goto failure; - } if (dmjob->sigint) goto signal; } + /* construct a temp file name */ if (sb.st_size != -1 && S_ISREG(sb.st_mode)) { if ((slash = strrchr(dmreq->path, '/')) == NULL) @@ -448,6 +459,7 @@ chmod(tmppath, sb.st_mode & ALLPERMS); } } + if (of == NULL) of = fdopen(dmjob->ofd, "w"); if (of == NULL) { @@ -485,10 +497,12 @@ if ((readcnt = fread(buf, 1, size, f)) < size) { if (ferror(f) && errno == EINTR && !dmjob->sigint) clearerr(f); - else if (readcnt == 0) + else if (readcnt == 0) { break; + } } + stat_update(&xs, count += readcnt, dmjob); for (ptr = buf; readcnt > 0; ptr += wr, readcnt -= wr) if ((wr = fwrite(ptr, 1, readcnt, of)) < readcnt) { @@ -500,6 +514,7 @@ if (readcnt != 0) break; } + if (!dmjob->sigalrm) dmjob->sigalrm = ferror(f) && errno == ETIMEDOUT; dmjob->siginfo_en = 0; @@ -589,9 +604,89 @@ fetchFreeURL(dmjob->url); if (tmppath != NULL) free(tmppath); + return (r); } +FILE * +dmXGet(struct dmjob *dmjob, struct url_stat *us) +{ + char flags[8]; + int ret; + struct dmjob tmpjob; + struct dmreq tmpreq; + struct dmreq *dmreq = dmjob->request; + + /* populate tmpjob */ + + /* TODO : Modify stat_* to udpate jobs of progress, + * right now we just put the msgs on stderr + * */ + tmpjob.client = STDERR_FILENO; + + tmpjob.request = &tmpreq; + tmpreq.v_level = dmreq->v_level; + tmpreq.ftp_timeout = dmjob->request->ftp_timeout; + tmpreq.http_timeout = dmjob->request->http_timeout; + tmpreq.B_size = dmjob->request->B_size; + tmpreq.S_size = dmjob->request->S_size; + tmpreq.T_secs = dmjob->request->T_secs; + tmpreq.flags = dmjob->request->flags; + tmpreq.family = dmjob->request->family; + + tmpreq.i_filename = (char *) Malloc(strlen(dmreq->i_filename)); + strcpy(tmpreq.i_filename, dmreq->i_filename); + + tmpreq.URL = (char *) Malloc(strlen(dmreq->URL)); + strcpy(tmpreq.URL, dmreq->URL); + + tmpjob.url = NULL; + ret = mk_url(&tmpjob, flags); + if (ret <= 0) { + + } + + /* special case : -s flag + if (tmpreq.flags & s_FLAG) { + if (dmjob->timeout) + alarm(dmjob->timeout); + r = fetchStat(dmjob->url, us, flags); + if (dmjob->timeout) + alarm(0); + if (dmjob->sigalrm || dmjob->sigint) + goto signal; + if (r == -1) { + warnx("%s", fetchLastErrString); + goto failure; + } + if (us->size == -1) + printf("Unknown\n"); + else + printf("%jd\n", (intmax_t)us->size); + goto success; + } + */ + tmpreq.path = (char *) Malloc(strlen(dmreq->path) + strlen(TMP_EXT)); + strcpy(tmpreq.path, dmreq->path); + strcat(tmpreq.path, TMP_EXT); + + tmpjob.ofd = open(tmpreq.path, O_RDWR | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR); + + FILE *f = fetchXGet(tmpjob.url, us, flags); + fetch(&tmpjob, f, *us); + fclose(f); + + f = fopen(tmpreq.path, "r"); + + free(tmpjob.url->doc); + free(tmpjob.url); + free(tmpreq.i_filename); + free(tmpreq.URL); + free(tmpreq.path); + + return f; +} + static void send_report(int sock, struct dmrep report, char op) { @@ -621,6 +716,9 @@ free(buf); } +/* TODO: This handler isn't registered as SIGUSR1 interrupts the download + * Figure out a proper way to handle this + * */ void sig_handler(int sig) { @@ -630,7 +728,7 @@ pthread_t tid = pthread_self(); if (sig == SIGUSR1) { while (tmp != NULL) { - if (pthread_equal(tid, *(tmp->worker)) != 0) + if (pthread_equal(tid, tmp->worker) != 0) break; tmp = tmp->next; } @@ -641,8 +739,9 @@ tmp->sigint = 1; else if (*clisig == SIGINFO) tmp->siginfo = 1; - else if (*clisig == SIGALRM) - tmp->siginfo = 1; + else if (*clisig == SIGALRM) { + tmp->sigalrm = 1; + } } } @@ -650,13 +749,61 @@ run_worker(struct dmjob *dmjob) { struct dmrep report; + struct dmjob *tmp; + struct url_stat us; + int err; + FILE *f; + char *tmppath; + char flags[8]; + + /* check if this is a duplicate */ + mk_url(dmjob, flags); + tmp = jobs; + while (tmp != NULL) { + if (tmp != dmjob && compare_jobs(tmp, dmjob) == 0) { + dmjob->state = DUPLICATE; + dmjob->worker = tmp->worker; + return NULL; + } + tmp = tmp->next; + } + + /* fetch the remote file into a local tmp file */ + f = dmXGet(dmjob, &us); + + /* Serve any outstanding requests from the local tmp file */ + tmp = jobs; + while (tmp != NULL) { + if (compare_jobs(tmp, dmjob) != 0) { + tmp = tmp->next; + continue; + } - int err = fetch(dmjob); - report.status = err; - report.errcode = fetchLastErrCode; - report.errstr = fetchLastErrString; - send_report(dmjob->client, report, DMRESP); - dmjob->state = RUNNING; + if (f == NULL) { + err = -1; + } else { + fseek(f, 0, SEEK_SET); + err = fetch(dmjob, f, us); + } + + report.status = err; + report.errcode = fetchLastErrCode; + report.errstr = fetchLastErrString; + send_report(dmjob->client, report, DMRESP); + + tmp->state = DONE; + tmp = tmp->next; + } + /* remove the local tmp file */ + if (f != NULL) { + tmppath = (char *) Malloc(strlen(dmjob->request->path) + strlen(TMP_EXT)); + strcpy(tmppath, dmjob->request->path); + strcat(tmppath, TMP_EXT); - return NULL; + remove(tmppath); + free(tmppath); + } + + /* TODO : What is this? Yew!! */ + sleep(10); }
Want to link to this message? Use this URL: <https://mail-archive.FreeBSD.org/cgi/mid.cgi?201308192039.r7JKdFjV078331>
