Date: Thu, 26 Sep 2013 12:32:13 GMT
From: ambarisha@FreeBSD.org
To: svn-soc-all@FreeBSD.org
Subject: socsvn commit: r257720 - soc2013/ambarisha/head/usr.bin/dms
Message-ID: <201309261232.r8QCWDW6061725@socsvn.freebsd.org>
next in thread | raw e-mail | index | archive | help
Author: ambarisha Date: Thu Sep 26 12:32:12 2013 New Revision: 257720 URL: http://svnweb.FreeBSD.org/socsvn/?view=rev&rev=257720 Log: Job migration first changes Modified: soc2013/ambarisha/head/usr.bin/dms/dms.c soc2013/ambarisha/head/usr.bin/dms/dms.h soc2013/ambarisha/head/usr.bin/dms/mirror.c soc2013/ambarisha/head/usr.bin/dms/mirror.h soc2013/ambarisha/head/usr.bin/dms/worker.c Modified: soc2013/ambarisha/head/usr.bin/dms/dms.c ============================================================================== --- soc2013/ambarisha/head/usr.bin/dms/dms.c Thu Sep 26 12:30:28 2013 (r257719) +++ soc2013/ambarisha/head/usr.bin/dms/dms.c Thu Sep 26 12:32:12 2013 (r257720) @@ -144,6 +144,7 @@ dmjob->siginfo = 0; dmjob->siginfo_en = 0; dmjob->state = RUNNING; + dmjob->preempted = 0; dmjob->url = NULL; return dmjob; } Modified: soc2013/ambarisha/head/usr.bin/dms/dms.h ============================================================================== --- soc2013/ambarisha/head/usr.bin/dms/dms.h Thu Sep 26 12:30:28 2013 (r257719) +++ soc2013/ambarisha/head/usr.bin/dms/dms.h Thu Sep 26 12:32:12 2013 (r257720) @@ -7,6 +7,8 @@ #define MINBUFSIZE 4096 #define MAX_SAMPLES 256 +#include "dm.h" + struct dmjob { int ofd; int client; @@ -15,6 +17,7 @@ int siginfo; int siginfo_en; unsigned timeout; + int preempted; enum { RUNNING = 0, @@ -26,6 +29,7 @@ struct dmreq *request; struct url *url; struct dmmirr *mirror; + struct xferstat oldstat; struct dmjob *next; struct dmjob *prev; Modified: soc2013/ambarisha/head/usr.bin/dms/mirror.c ============================================================================== --- soc2013/ambarisha/head/usr.bin/dms/mirror.c Thu Sep 26 12:30:28 2013 (r257719) +++ soc2013/ambarisha/head/usr.bin/dms/mirror.c Thu Sep 26 12:32:12 2013 (r257720) @@ -103,6 +103,7 @@ /* TODO: What if fscanf fails? 
*/ } + mirror->nconns = 0; return mirror; } @@ -168,7 +169,7 @@ /* Profile list lock */ ret = pthread_mutex_lock(&mirror_list_mutex); if (ret == -1) { - fprintf(stderr, "get_mirror: Attempt to acquire" + fprintf(stderr, "load_mirrors: Attempt to acquire" " profile list mutex failed\n"); return -1; } @@ -181,7 +182,7 @@ ret = pthread_mutex_unlock(&mirror_list_mutex); if (ret == -1) { - fprintf(stderr, "get_mirror: Couldn't release " + fprintf(stderr, "load_mirrors: Couldn't release " "profile list lock\n"); return -1; } @@ -202,7 +203,7 @@ /* Profile list lock */ ret = pthread_mutex_lock(&mirror_list_mutex); if (ret == -1) { - fprintf(stderr, "get_mirror: Attempt to acquire" + fprintf(stderr, "save_mirrors: Attempt to acquire" " profile list mutex failed\n"); return -1; } @@ -214,7 +215,7 @@ ret = pthread_mutex_unlock(&mirror_list_mutex); if (ret == -1) { - fprintf(stderr, "get_mirror: Couldn't release " + fprintf(stderr, "save_mirrors: Couldn't release " "profile list lock\n"); return -1; } @@ -229,6 +230,7 @@ { struct timeval tv; double speed; + int ret; gettimeofday(&tv, NULL); if (tv.tv_sec - dmmirr->timestamps[dmmirr->index].tv_sec < 60) @@ -236,11 +238,54 @@ speed = get_speed(xs); + /* Profile list lock */ + ret = pthread_mutex_lock(&mirror_list_mutex); + if (ret == -1) { + fprintf(stderr, "update_mirror: Attempt to acquire" + " profile list mutex failed\n"); + return; + } + /* TODO: This assumes that workers and sites have 1-1 correspondence */ dmmirr->index = (dmmirr->index + 1) % MAX_SAMPLES; dmmirr->timestamps[dmmirr->index] = tv; dmmirr->samples[dmmirr->index] = speed; dmmirr->remark = ACTIVE; + + ret = pthread_mutex_unlock(&mirror_list_mutex); + if (ret == -1) { + fprintf(stderr, "update_mirror: Couldn't release " + "profile list lock\n"); + return; + } + /* Profile list lock released */ +} + +double +get_average_speed(struct dmmirr *dmmirr) +{ + int i, cnt; + double average; + struct timeval now; + long week_sec; + + week_sec = 7 * 24 * 60 * 60; + + i = 
dmmirr->index; + cnt = 0; + average = 0.0; + + do { + gettimeofday(&now, NULL); + if (dmmirr->timestamps[i].tv_sec < now.tv_sec - week_sec) + break; + average = (average * cnt + dmmirr->samples[i]) / (cnt + 1); + cnt++; + + i = (i - 1) % MAX_SAMPLES; + } while (i != dmmirr->index); + + return average; } struct dmmirr * @@ -249,12 +294,8 @@ struct dmmirr *cur, *tmp; double tmpmax = -1.0; int cnt, ret, i; - struct timeval now; - long week_sec; double average; - week_sec = 7 * 24 * 60 * 60; - /* Profile list lock */ ret = pthread_mutex_lock(&mirror_list_mutex); if (ret == -1) { @@ -277,19 +318,7 @@ if (cur->nconns > MAX_CONNS) goto next; - - i = cur->index; - cnt = 0; - average = 0.0; - do { - gettimeofday(&now, NULL); - if (cur->timestamps[i].tv_sec < now.tv_sec - week_sec) - break; - average = (average * cnt + cur->samples[i]) / (cnt + 1); - cnt++; - - i = (i - 1) % MAX_SAMPLES; - } while (i != cur->index); + average = get_average_speed(cur); if (average > tmpmax) { tmpmax = average; @@ -298,10 +327,10 @@ next: cur = cur->next; } - /* TODO: If we couldn't pick up a mirror? 
*/ success: + tmp->nconns++; ret = pthread_mutex_unlock(&mirror_list_mutex); if (ret == -1) { fprintf(stderr, "get_mirror: Couldn't release " @@ -309,6 +338,32 @@ return NULL; } /* Profile list lock released */ - + return tmp; } + +int +release_mirror(struct dmmirr *dmmirr) +{ + int ret; + + /* Profile list lock */ + ret = pthread_mutex_lock(&mirror_list_mutex); + if (ret == -1) { + fprintf(stderr, "update_mirror: Attempt to acquire" + " profile list mutex failed\n"); + return -1; + } + + dmmirr->nconns--; + + ret = pthread_mutex_unlock(&mirror_list_mutex); + if (ret == -1) { + fprintf(stderr, "update_mirror: Couldn't release " + "profile list lock\n"); + return -1; + } + /* Profile list lock released */ + + return 0; +} Modified: soc2013/ambarisha/head/usr.bin/dms/mirror.h ============================================================================== --- soc2013/ambarisha/head/usr.bin/dms/mirror.h Thu Sep 26 12:30:28 2013 (r257719) +++ soc2013/ambarisha/head/usr.bin/dms/mirror.h Thu Sep 26 12:32:12 2013 (r257720) @@ -8,5 +8,7 @@ int save_mirrors(void); void update_mirror(struct dmmirr *, struct xferstat *); struct dmmirr *get_mirror(void); +int release_mirror(struct dmmirr *); +double get_average_speed(struct dmmirr *); #endif Modified: soc2013/ambarisha/head/usr.bin/dms/worker.c ============================================================================== --- soc2013/ambarisha/head/usr.bin/dms/worker.c Thu Sep 26 12:30:28 2013 (r257719) +++ soc2013/ambarisha/head/usr.bin/dms/worker.c Thu Sep 26 12:32:12 2013 (r257720) @@ -92,6 +92,24 @@ return strcmp(j1->request->URL, j2->request->URL); } +static long +get_eta(struct dmjob *dmjob, struct dmmirr *dmmirr) +{ + long eta, elapsed, speed, received, expected; + if (dmmirr == dmjob->mirror) { + elapsed = dmjob->oldstat.last.tv_sec - dmjob->oldstat.start.tv_sec; + received = dmjob->oldstat.rcvd - dmjob->oldstat.offset; + expected = dmjob->oldstat.size - dmjob->oldstat.rcvd; + eta = (long)((double) elapsed * expected / 
received); + } else { + expected = dmjob->oldstat.size; + speed = get_average_speed(dmmirr); + eta = (long)((double) expected / speed); + } + + return eta; +} + static void stat_send(struct xferstat *xs, int force) { @@ -177,6 +195,15 @@ stat_start(struct xferstat *xs, const char *name, off_t size, off_t offset, struct dmjob *dmjob) { + /* If there is no absolute progress because of a premption, + * do nothing. Otherwise update status incase there's a + * preemption later + */ + + if (dmjob->preempted != 0 && + dmjob->oldstat.rcvd > xs->rcvd) + return; + snprintf(xs->name, sizeof xs->name, "%s", name); gettimeofday(&xs->start, NULL); xs->last.tv_sec = xs->last.tv_usec = 0; @@ -185,7 +212,8 @@ xs->rcvd = offset; xs->lastrcvd = offset; - update_mirror(dmjob->mirror , xs); + update_mirror(dmjob->mirror, xs); + dmjob->oldstat = *xs; if ((dmjob->request->flags & V_TTY) && dmjob->request->v_level > 0) stat_send(xs, 1); @@ -196,6 +224,16 @@ static void stat_end(struct xferstat *xs, struct dmjob *dmjob) { + /* If there is no absolute progress because of a premption, + * do nothing. Otherwise update status incase there's a + * preemption later + */ + + if (dmjob->preempted != 0 && + dmjob->oldstat.rcvd > xs->rcvd) + return; + dmjob->oldstat = *xs; + gettimeofday(&xs->last, NULL); update_mirror(dmjob->mirror , xs); if ((dmjob->request->flags & V_TTY) && dmjob->request->v_level > 0) { @@ -210,7 +248,18 @@ static void stat_update(struct xferstat *xs, off_t rcvd, struct dmjob *dmjob) { + /* If there is no absolute progress because of a premption, + * do nothing. 
Otherwise update status incase there's a + * preemption later + */ + + if (dmjob->preempted != 0 && + dmjob->oldstat.rcvd > xs->rcvd) + return; + xs->rcvd = rcvd; + dmjob->oldstat = *xs; + update_mirror(dmjob->mirror , xs); if ((dmjob->request->flags & V_TTY) && dmjob->request->v_level > 0) stat_send(xs, 0); @@ -231,6 +280,9 @@ struct dmreq *dmreq = dmjob->request; struct stat sb; int r; + + /* Init flags */ + *flags = '\0'; if (dmjob->url != NULL) return 0; @@ -319,6 +371,46 @@ return (r); } +static struct dmjob * +find_potential_job(struct dmmirr *dmmirr) +{ + int ret; + long cureta, neweta; + struct dmjob *tmp; + + /* Acquire job queue lock */ + ret = pthread_mutex_lock(&job_queue_mutex); + if (ret == -1) { + fprintf(stderr, "handle_request: Attempt to acquire" + " job queue mutex failed\n"); + return NULL; + } + + tmp = jobs; + while (tmp != NULL) { + cureta = get_eta(tmp, tmp->mirror); + neweta = get_eta(tmp, dmmirr); + + if (neweta < cureta) { + /* notify the current owner worker to let go */ + tmp->preempted = 1; + break; + } + + tmp = tmp->next; + } + + ret = pthread_mutex_unlock(&job_queue_mutex); + if (ret == -1) { + fprintf(stderr, "handle_request: Couldn't release " + "job queue lock\n"); + return NULL; + } + /* Job queue lock released */ + + return tmp; +} + static int check_signal(int signum, struct dmjob *dmjob) { @@ -926,6 +1018,7 @@ return NULL; } + /* check if this is a duplicate */ tmp = jobs; while (tmp != NULL) { if (tmp != dmjob && compare_jobs(tmp, dmjob) == 0) { @@ -940,12 +1033,11 @@ if (ret == -1) { fprintf(stderr, "handle_request: Couldn't release " "job queue lock\n"); - return NULL; } /* Job queue lock released */ - /* check if this is a duplicate */ +start: ret = mk_url(dmjob, flags); dmjob->worker = pthread_self(); @@ -968,11 +1060,10 @@ continue; } - if (f == NULL) { + if (f == NULL) ret = -1; - } else { + else ret = validate_and_copy(dmjob, f, us); - } report.status = ret; report.errcode = fetchLastErrCode; @@ -993,14 +1084,22 @@ /* 
remove the local tmp file */ if (f != NULL) { - tmppath = (char *) malloc(strlen(dmjob->request->path) + strlen(TMP_EXT)); + tmppath = (char *) malloc(strlen(dmjob->request->path) + + strlen(TMP_EXT)); strcpy(tmppath, dmjob->request->path); strcat(tmppath, TMP_EXT); remove(tmppath); free(tmppath); } + + + /* Check if this worker can prempt any downloads */ + dmjob = find_potential_job(dmjob->mirror); + if (dmjob != NULL) + goto start; + release_mirror(dmjob->mirror); /* TODO : What is this? Yew!! */ sleep(10); }
Want to link to this message? Use this URL: <https://mail-archive.FreeBSD.org/cgi/mid.cgi?201309261232.r8QCWDW6061725>