Date: Thu, 26 Sep 2013 13:03:30 GMT
From: ambarisha@FreeBSD.org
To: svn-soc-all@FreeBSD.org
Subject: socsvn commit: r257723 - in soc2013/ambarisha/head/usr.bin: dmget dms
Message-ID: <201309261303.r8QD3UDO066083@socsvn.freebsd.org>
next in thread | raw e-mail | index | archive | help
Author: ambarisha Date: Thu Sep 26 13:03:30 2013 New Revision: 257723 URL: http://svnweb.FreeBSD.org/socsvn/?view=rev&rev=257723 Log: Fixed some bugs in job migration and status dump Modified: soc2013/ambarisha/head/usr.bin/dmget/dmget.c soc2013/ambarisha/head/usr.bin/dmget/utils.c soc2013/ambarisha/head/usr.bin/dms/dm.h soc2013/ambarisha/head/usr.bin/dms/dms.c soc2013/ambarisha/head/usr.bin/dms/dms.h soc2013/ambarisha/head/usr.bin/dms/mirror.c soc2013/ambarisha/head/usr.bin/dms/utils.c soc2013/ambarisha/head/usr.bin/dms/utils.h soc2013/ambarisha/head/usr.bin/dms/worker.c Modified: soc2013/ambarisha/head/usr.bin/dmget/dmget.c ============================================================================== --- soc2013/ambarisha/head/usr.bin/dmget/dmget.c Thu Sep 26 13:01:43 2013 (r257722) +++ soc2013/ambarisha/head/usr.bin/dmget/dmget.c Thu Sep 26 13:03:30 2013 (r257723) @@ -281,7 +281,6 @@ ret = sigsafe_write(sock, reqbuf, bufsize); free(reqbuf); - if (ret == -1) return -1; @@ -406,9 +405,8 @@ while (!sigint) { struct dmmsg *msg; msg = recv_dmmsg(sock); - if (msg == NULL) { + if (msg == NULL) goto failure; - } if (sigint) { send_signal(sock, SIGINT); Modified: soc2013/ambarisha/head/usr.bin/dmget/utils.c ============================================================================== --- soc2013/ambarisha/head/usr.bin/dmget/utils.c Thu Sep 26 13:01:43 2013 (r257722) +++ soc2013/ambarisha/head/usr.bin/dmget/utils.c Thu Sep 26 13:03:30 2013 (r257723) @@ -13,10 +13,12 @@ int send_dmmsg(int socket, struct dmmsg msg) { + printf("IN DMMSG\n"); int bufsize = sizeof(bufsize); // Buffer size bufsize += 1; // Op bufsize += msg.len; // Signal number + printf("About to send message\n"); char *sndbuf = (char *) malloc(bufsize); if (sndbuf == NULL) { fprintf(stderr, "send_dmmsg: malloc: insufficient memory\n"); @@ -30,10 +32,14 @@ *(sndbuf + i) = msg.op; i++; - memcpy(sndbuf + i, msg.buf, msg.len); - i += msg.len; + if (msg.len != 0) { + memcpy(sndbuf + i, msg.buf, msg.len); + i += 
msg.len; + } int nbytes = write(socket, sndbuf, bufsize); + perror("send_dmmsg write"); + printf("%d bytes sent\n", nbytes); free(sndbuf); if (nbytes == -1) { @@ -79,6 +85,13 @@ } bufsize -= sizeof(msg->op); + + /* This is to accommodate for 0 length messages */ + if (bufsize == 0) { + msg->len = 0; + msg->buf = NULL; + return msg; + } msg->buf = (char *) malloc(bufsize); if (msg == NULL) { Modified: soc2013/ambarisha/head/usr.bin/dms/dm.h ============================================================================== --- soc2013/ambarisha/head/usr.bin/dms/dm.h Thu Sep 26 13:01:43 2013 (r257722) +++ soc2013/ambarisha/head/usr.bin/dms/dm.h Thu Sep 26 13:03:30 2013 (r257723) @@ -18,6 +18,8 @@ #define MAX_CHKSUM_LEN SHA_DIGEST_LENGTH /* TODO: Any better alternative? */ +typedef enum { RUNNING = 0, DONE, DUPLICATE } state_t; + struct dmres { int status; int errcode; @@ -68,6 +70,15 @@ char *buf; }; +struct dmsumm { + char name[64]; + char mirror[64]; + state_t state; + off_t size; + off_t rcvd; + long eta; +}; + struct xferstat { char name[64]; struct timeval start; /* start of transfer */ @@ -85,5 +96,7 @@ #define DMAUTHRESP 4 #define DMSIG 5 #define DMSTAT 6 +#define DMDUMPREQ 7 +#define DMDUMPRESP 8 #endif /* _DMCLIENT_H */ Modified: soc2013/ambarisha/head/usr.bin/dms/dms.c ============================================================================== --- soc2013/ambarisha/head/usr.bin/dms/dms.c Thu Sep 26 13:01:43 2013 (r257722) +++ soc2013/ambarisha/head/usr.bin/dms/dms.c Thu Sep 26 13:03:30 2013 (r257723) @@ -17,12 +17,14 @@ #include "utils.h" #include "mirror.h" -static int dm_err; -static char dm_errstr[512]; +#define MAX_SUMMS 32 -volatile sig_atomic_t stop; -struct dmjob *jobs; -pthread_mutex_t job_queue_mutex; +static int dm_err; +static char dm_errstr[512]; + +volatile sig_atomic_t stop; +struct dmjob *jobs; +pthread_mutex_t job_queue_mutex; extern struct dmmirr *mirrors; extern pthread_mutex_t mirror_list_mutex; @@ -103,7 +105,7 @@ new->next = head; } 
-static struct dmjob * +struct dmjob * rm_job(struct dmjob *head, struct dmjob *job) { if (job->next != NULL) @@ -123,6 +125,7 @@ { int ret; struct dmmirr *cur; + struct dmjob *dmjob = (struct dmjob *) malloc(sizeof(struct dmjob)); if (dmjob == NULL) { fprintf(stderr, "mk_dmjob: malloc: insufficient memory\n"); @@ -241,7 +244,7 @@ return dmreq; } -static void +void rm_dmreq(struct dmreq **dmreq) { if (*dmreq == NULL) @@ -263,6 +266,50 @@ free((*dmjob)->url); } +static void +send_job_summaries(int sock) +{ + struct dmsumm summs[MAX_SUMMS]; + int i, ret; + struct dmmsg dmmsg; + struct dmjob *tmp = jobs; + + /* Acquire job queue lock */ + ret = pthread_mutex_lock(&job_queue_mutex); + if (ret == -1) { + fprintf(stderr, "send_job_summaries: Attempt to acquire" + " job queue mutex failed\n"); + return; + } + + for (i = 0; i < MAX_SUMMS; i++) { + void *temp = tmp->url->doc; + strncpy(summs[i].name, tmp->url->doc, sizeof(summs[i].name)); + strncpy(summs[i].mirror, tmp->mirror->name, + sizeof(summs[i].name)); + + summs[i].state = tmp->state; + summs[i].size = tmp->oldstat.size; + summs[i].rcvd = tmp->oldstat.rcvd; + summs[i].eta = get_eta(&(tmp->oldstat)); + } + + ret = pthread_mutex_unlock(&job_queue_mutex); + if (ret == -1) { + fprintf(stderr, "send_job_summaries: Couldn't release " + "job queue lock\n"); + return; + } + /* Job queue lock released */ + + dmmsg.op = DMDUMPRESP; + dmmsg.len = i * sizeof(struct dmsumm); + dmmsg.buf = (char *)summs; + + send_dmmsg(sock, dmmsg); + return; +} + static int handle_request(int csock) { @@ -273,6 +320,7 @@ int ret; pid_t pid; + msg = recv_dmmsg(csock); if (msg == NULL) { report.status = -1; @@ -311,6 +359,9 @@ pthread_create(&(dmjob->worker), NULL, run_worker, dmjob); pthread_detach(dmjob->worker); goto done; + case DMDUMPREQ: + send_job_summaries(csock); + goto done; default: free_dmmsg(&msg); goto error; @@ -326,6 +377,7 @@ rm_dmjob(&dmjob); ret = -1; done: + free_dmmsg(&msg); return ret; } @@ -376,7 +428,6 @@ "select: %s\n", 
strerror(errno)); goto wrap_up; } - handle_request(csock); } @@ -414,6 +465,7 @@ int main(int argc, char **argv) { int sock, err; + sock = socket(AF_UNIX, SOCK_STREAM, 0); if (sock == -1) { fprintf(stderr, "main: socket: %s\n", strerror(errno)); Modified: soc2013/ambarisha/head/usr.bin/dms/dms.h ============================================================================== --- soc2013/ambarisha/head/usr.bin/dms/dms.h Thu Sep 26 13:01:43 2013 (r257722) +++ soc2013/ambarisha/head/usr.bin/dms/dms.h Thu Sep 26 13:03:30 2013 (r257723) @@ -18,13 +18,7 @@ int siginfo_en; unsigned timeout; int preempted; - - enum { - RUNNING = 0, - DONE, - DUPLICATE - } state; - + state_t state; pthread_t worker; struct dmreq *request; struct url *url; @@ -42,7 +36,7 @@ }; struct dmmirr { - char name[512]; + char name[256]; int index; enum { Modified: soc2013/ambarisha/head/usr.bin/dms/mirror.c ============================================================================== --- soc2013/ambarisha/head/usr.bin/dms/mirror.c Thu Sep 26 13:01:43 2013 (r257722) +++ soc2013/ambarisha/head/usr.bin/dms/mirror.c Thu Sep 26 13:03:30 2013 (r257723) @@ -86,6 +86,8 @@ mirror->remark = NOT_TRIED; } else if (strcmp(rem, "FAILED") == 0) { mirror->remark = FAILED; + } else if (strcmp(rem, "ACTIVE") == 0) { + mirror->remark = ACTIVE; } else { fprintf(stderr, "WARNING: Unknown mirror state in mirrors.list\n"); } @@ -122,7 +124,12 @@ case FAILED: fputs("FAILED\n", f); break; + case ACTIVE: + fputs("ACTIVE\n", f); + break; } + + fprintf(f, "%u\n", mirror->index); for(i = 0; i < MAX_SAMPLES; i++) { fprintf(f, "%ld\t%f\n", mirror->timestamps[i].tv_sec, @@ -143,6 +150,7 @@ for(i = 0; i < sizeof(MIRROR_LIST) / sizeof(MIRROR_LIST[0]); i++) { fwrite(MIRROR_LIST[i], strlen(MIRROR_LIST[i]), 1, f); fprintf(f, "\nNOT_TRIED\n"); + fprintf(f, "0\n"); for (j = 0; j < MAX_SAMPLES; j++) fprintf(f, "0\t0\n"); } Modified: soc2013/ambarisha/head/usr.bin/dms/utils.c 
============================================================================== --- soc2013/ambarisha/head/usr.bin/dms/utils.c Thu Sep 26 13:01:43 2013 (r257722) +++ soc2013/ambarisha/head/usr.bin/dms/utils.c Thu Sep 26 13:03:30 2013 (r257723) @@ -30,8 +30,10 @@ *(sndbuf + i) = msg.op; i++; - memcpy(sndbuf + i, msg.buf, msg.len); - i += msg.len; + if (msg.len != 0) { + memcpy(sndbuf + i, msg.buf, msg.len); + i += msg.len; + } int nbytes = write(socket, sndbuf, bufsize); free(sndbuf); @@ -79,6 +81,13 @@ } bufsize -= sizeof(msg->op); + + /* This is to accommodate for 0 length messages */ + if (bufsize == 0) { + msg->len = 0; + msg->buf = NULL; + return msg; + } msg->buf = (char *) malloc(bufsize); if (msg == NULL) { @@ -118,3 +127,14 @@ free(*msg); *msg = NULL; } + +long +get_eta(struct xferstat *xs) +{ + long eta, elapsed, speed, received, expected; + elapsed = xs->last.tv_sec - xs->start.tv_sec; + received = xs->rcvd - xs->offset; + expected = xs->size - xs->rcvd; + eta = (long)((double) elapsed * expected / received); + return eta; +} Modified: soc2013/ambarisha/head/usr.bin/dms/utils.h ============================================================================== --- soc2013/ambarisha/head/usr.bin/dms/utils.h Thu Sep 26 13:01:43 2013 (r257722) +++ soc2013/ambarisha/head/usr.bin/dms/utils.h Thu Sep 26 13:03:30 2013 (r257723) @@ -18,3 +18,6 @@ void free_dmmsg(struct dmmsg **msg); + +long +get_eta(struct xferstat *xs); Modified: soc2013/ambarisha/head/usr.bin/dms/worker.c ============================================================================== --- soc2013/ambarisha/head/usr.bin/dms/worker.c Thu Sep 26 13:01:43 2013 (r257722) +++ soc2013/ambarisha/head/usr.bin/dms/worker.c Thu Sep 26 13:03:30 2013 (r257723) @@ -93,20 +93,11 @@ } static long -get_eta(struct dmjob *dmjob, struct dmmirr *dmmirr) +mirr_eta(off_t size, struct dmmirr *dmmirr) { - long eta, elapsed, speed, received, expected; - if (dmmirr == dmjob->mirror) { - elapsed = dmjob->oldstat.last.tv_sec - 
dmjob->oldstat.start.tv_sec; - received = dmjob->oldstat.rcvd - dmjob->oldstat.offset; - expected = dmjob->oldstat.size - dmjob->oldstat.rcvd; - eta = (long)((double) elapsed * expected / received); - } else { - expected = dmjob->oldstat.size; - speed = get_average_speed(dmmirr); - eta = (long)((double) expected / speed); - } - + long expected, speed, eta; + speed = get_average_speed(dmmirr); + eta = (long)((double) size / speed); return eta; } @@ -377,8 +368,9 @@ /* TODO: Not good assumes type of opaque pthread_t type * Fix this by having a pthread_t -> id mapping */ - unsigned int tid = (unsigned long)pthread_self(); - char idstr[8]; + unsigned int tid = (unsigned int)pthread_self(); + char idstr[32]; + sprintf(idstr, ".%u", tid); char *tmpfn = (char *) malloc(strlen(fn) + strlen(idstr) + strlen(TMP_EXT) + 1); @@ -410,11 +402,10 @@ tmp = jobs; while (tmp != NULL) { - cureta = get_eta(tmp, tmp->mirror); - neweta = get_eta(tmp, dmmirr); + cureta = get_eta(&(tmp->oldstat)); + neweta = mirr_eta(tmp->oldstat.size, dmmirr); if (neweta < cureta) { - /* notify the current owner worker to let go */ tmp->preempted = 1; break; } @@ -456,6 +447,9 @@ tv.tv_usec = 0; ret = select(dmjob->client + 1, &fds, NULL, NULL, &tv); + if (!FD_ISSET(dmjob->client, &fds)) + break; + msg = recv_dmmsg(dmjob->client); sig = (int *)msg->buf; if (*sig == SIGINT) @@ -562,6 +556,7 @@ if (dmjob->preempted && dmjob->worker != tid) goto preempted; + /* check that size is as expected */ /*if (dmreq->S_size) { if (us.size == -1) { @@ -770,6 +765,7 @@ stat_end(&xs, dmjob); + goto signal; preempted: r = -1; @@ -932,9 +928,9 @@ tmpreq.path = get_tmpfn(dmreq->path); if (tmpreq.path == NULL) goto done; - + tmpjob.ofd = open(tmpreq.path, O_RDWR | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR); - + FILE *f = fetchXGet(tmpjob.url, us, flags); if (f == NULL) { close(tmpjob.ofd); @@ -946,6 +942,7 @@ ret = fetch(&tmpjob, f, *us); if (ret == -1) { f = NULL; + fprintf(stderr, "Failed now\n"); goto done; } @@ -984,6 +981,7 
@@ /* Notify the client of the same */ return -1; } + break; case MD5_CHKSUM: MD5_Init(&md5_ctx); @@ -998,6 +996,7 @@ fprintf(stderr, "dms: checksum mismatch\n"); return -1; } + break; default: break; @@ -1045,6 +1044,7 @@ { struct dmrep report; struct dmjob *tmp; + struct dmmirr *dmmirr; struct url_stat us; int ret; FILE *f; @@ -1086,7 +1086,7 @@ f = dmXGet(dmjob, &us); if (f == NULL && dmjob->preempted && dmjob->worker != pthread_self()) return NULL; - + /* Acquire job queue lock */ ret = pthread_mutex_lock(&job_queue_mutex); if (ret == -1) { @@ -1095,6 +1095,7 @@ return NULL; } + /* Serve any outstanding requests from the local tmp file */ tmp = jobs; while (tmp != NULL) { @@ -1129,18 +1130,24 @@ if (f != NULL) { tmppath = get_tmpfn(dmjob->request->path); if (tmppath != NULL) { - remove(tmppath); +// remove(tmppath); free(tmppath); } } + dmmirr = dmjob->mirror; + + rm_dmreq(&(dmjob->request)); + jobs = rm_job(jobs, dmjob); /* Check if this worker can prempt any downloads */ - dmjob = find_potential_job(dmjob->mirror); - if (dmjob != NULL) + tmp = find_potential_job(dmmirr); + if (tmp != NULL) { + dmjob = tmp; goto start; - - release_mirror(dmjob->mirror); + } + + release_mirror(dmmirr); /* TODO : What is this? Yew!! */ sleep(10); }
Want to link to this message? Use this URL: <https://mail-archive.FreeBSD.org/cgi/mid.cgi?201309261303.r8QD3UDO066083>