Date: Mon, 19 Aug 2013 20:51:29 GMT
From: ambarisha@FreeBSD.org
To: svn-soc-all@FreeBSD.org
Subject: socsvn commit: r256170 - soc2013/ambarisha/head/usr.bin/dms
Message-ID: <201308192051.r7JKpTk4013825@socsvn.freebsd.org>
next in thread | raw e-mail | index | archive | help
Author: ambarisha
Date: Mon Aug 19 20:51:29 2013
New Revision: 256170
URL: http://svnweb.FreeBSD.org/socsvn/?view=rev&rev=256170

Log:
  Added mirror profiling.

  DMS samples the download speeds from mirrors and picks the mirror which has the highest average speed over the past week. It gives preference to a mirror which hasn't been tried before.

Added:
  soc2013/ambarisha/head/usr.bin/dms/mirror.c
Modified:
  soc2013/ambarisha/head/usr.bin/dms/Makefile
  soc2013/ambarisha/head/usr.bin/dms/dms.c
  soc2013/ambarisha/head/usr.bin/dms/dms.h
  soc2013/ambarisha/head/usr.bin/dms/worker.c

Modified: soc2013/ambarisha/head/usr.bin/dms/Makefile ============================================================================== --- soc2013/ambarisha/head/usr.bin/dms/Makefile Mon Aug 19 20:48:15 2013 (r256169) +++ soc2013/ambarisha/head/usr.bin/dms/Makefile Mon Aug 19 20:51:29 2013 (r256170) @@ -1,7 +1,7 @@ # $FreeBSD$ .include <bsd.own.mk> -SRCS= utils.c dms.c worker.c +SRCS= mirror.c utils.c dms.c worker.c PROG= dms CSTD?= c99 .if ${MK_OPENSSL} != "no" Modified: soc2013/ambarisha/head/usr.bin/dms/dms.c ============================================================================== --- soc2013/ambarisha/head/usr.bin/dms/dms.c Mon Aug 19 20:48:15 2013 (r256169) +++ soc2013/ambarisha/head/usr.bin/dms/dms.c Mon Aug 19 20:51:29 2013 (r256170) @@ -18,8 +18,12 @@ static char dm_errstr[512]; int stop; + struct dmjob *jobs; -pthread_mutex_t job_queue_mutex; +pthread_mutex_t job_queue_mutex; + +extern struct dmmirr *mirrors; +extern pthread_mutex_t mirror_list_mutex; void *run_worker(struct dmjob *job); @@ -114,6 +118,8 @@ static struct dmjob * mk_dmjob(struct dmreq *dmreq, int client) { + int ret; + struct dmmirr *cur; struct dmjob *dmjob = (struct dmjob *) malloc(sizeof(struct dmjob)); if (dmjob == NULL) { fprintf(stderr, "mk_dmjob: malloc: insufficient memory\n"); @@ -128,6 +134,7 @@ return NULL; } + dmjob->mirror = get_mirror(); dmjob->client = client; dmjob->sigint = 0; dmjob->sigalrm = 0; @@
-187,6 +194,7 @@ break; } + memcpy(&(dmreq->flags), rcvbuf + i, sizeof(dmreq->flags)); i += sizeof(dmreq->flags); @@ -292,7 +300,6 @@ if (ret == -1) { fprintf(stderr, "handle_request: Couldn't release " "job queue lock\n"); - goto error; } /* Job queue lock released */ @@ -322,11 +329,12 @@ void sigint_handler(int sig) { + save_mirrors(); stop = 1; exit(1); // Temporary } -static state_t +static int service_job(struct dmjob *job, fd_set *fdset) { int ret = 0; @@ -334,19 +342,23 @@ /* TODO: Worker can't handle this signal yet */ //pthread_kill(job->worker, SIGUSR1); } - return (job->state); + + return ret; } static void run_event_loop(int socket) { int i, ret, maxfd = socket; - state_t state; struct dmjob *cur; void *retptr; + fd_set fdset; + jobs = NULL; job_queue_mutex = PTHREAD_MUTEX_INITIALIZER; - fd_set fdset; + mirrors = NULL; + mirror_list_mutex = PTHREAD_MUTEX_INITIALIZER; + load_mirrors(); signal(SIGINT, sigint_handler); while (!stop) { @@ -398,8 +410,8 @@ cur = jobs; while (cur != NULL) { - state = service_job(cur, &fdset); - if (state == DONE) { + ret = service_job(cur, &fdset); + if (ret > 0) { close(cur->client); jobs = rm_job(jobs, cur); rm_dmjob(&cur); Modified: soc2013/ambarisha/head/usr.bin/dms/dms.h ============================================================================== --- soc2013/ambarisha/head/usr.bin/dms/dms.h Mon Aug 19 20:48:15 2013 (r256169) +++ soc2013/ambarisha/head/usr.bin/dms/dms.h Mon Aug 19 20:51:29 2013 (r256170) @@ -3,20 +3,29 @@ #include <sys/types.h> -typedef enum {RUNNING=0, DONE, DUPLICATE} state_t; +#define MAX_LISTEN_QUEUE 5 +#define MINBUFSIZE 4096 +#define MAX_SAMPLES 256 struct dmjob { int ofd; int client; - state_t state; int sigint; int sigalrm; int siginfo; int siginfo_en; unsigned timeout; + + enum { + RUNNING = 0, + DONE, + DUPLICATE + } state; + pthread_t worker; struct dmreq *request; struct url *url; + struct dmmirr *mirror; struct dmjob *next; struct dmjob *prev; @@ -28,9 +37,24 @@ char *errstr; }; -#define 
DEBUG 1 +struct dmmirr { + char name[512]; + int index; + + enum { + NOT_TRIED = 0, + ACTIVE, + FAILED + } remark; + + struct timeval timestamps[MAX_SAMPLES]; + double samples[MAX_SAMPLES]; + int nconns; -#define MAX_LISTEN_QUEUE 5 -#define MINBUFSIZE 4096 + struct dmmirr *next; + struct dmmirr *prev; +}; + +#define DEBUG 1 #endif Added: soc2013/ambarisha/head/usr.bin/dms/mirror.c ============================================================================== --- /dev/null 00:00:00 1970 (empty, because file is newly added) +++ soc2013/ambarisha/head/usr.bin/dms/mirror.c Mon Aug 19 20:51:29 2013 (r256170) @@ -0,0 +1,310 @@ +#include <errno.h> +#include <sys/time.h> +#include "dm.h" +#include "dms.h" + +#define MAX_SAMPLES 256 +#define MAX_CONNS 5 +#define MIRRORS_FILE "mirrors.list" + +struct dmmirr *mirrors; +pthread_mutex_t mirror_list_mutex; + +static const char *MIRROR_LIST[] = { + "ftp.freebsd.org" +}; + +static struct dmmirr * +add_mirror(struct dmmirr *head, struct dmmirr *new) +{ + new->prev = NULL; + new->next = NULL; + + if (head == NULL) + return new; + + head->prev = new; + new->next = head; +} + +static struct dmmirr * +rm_mirror(struct dmmirr *head, struct dmmirr *mirror) +{ + if (mirror->next != NULL) + mirror->next->prev = mirror->prev; + + if (mirror->prev != NULL) + mirror->prev->next = mirror->next; + + if (mirror == head) + return mirror->next; + + return head; +} + +static double +get_speed(struct xferstat *xs) +{ + double delta = (xs->last.tv_sec + (xs->last.tv_usec / 1.e6)) + - (xs->last2.tv_sec + (xs->last2.tv_usec / 1.e6)); + if (delta == 0.0) + return -1.0; + return (xs->rcvd - xs->lastrcvd) / delta; +} + +static struct dmmirr * +read_mirror(FILE *f) +{ + int i; + struct dmmirr *mirror; + char buf[512], rem[64]; + + mirror = (struct dmmirr *) malloc(sizeof(struct dmmirr)); + if (mirror == NULL) { + fprintf(stderr, "read_mirror: Insufficient memory\n"); + return NULL; + } + + if (fgets(buf, 512, f) == NULL) { + free(mirror); + return NULL; + 
} + sscanf(buf, "%s\n", mirror->name); + + if (fgets(buf, 64, f) == NULL) { + fprintf(stderr, "WARNING: read_mirror: mirrors.list file corrupted\n"); + free(mirror); + return NULL; + } + sscanf(buf, "%s\n", rem); + + if (strcmp(rem, "NOT_TRIED") == 0) { + mirror->remark = NOT_TRIED; + } else if (strcmp(rem, "FAILED") == 0) { + mirror->remark = FAILED; + } else { + fprintf(stderr, "WARNING: Unknown mirror state in mirrors.list\n"); + } + + if (fgets(buf, 64, f) == NULL) { + fprintf(stderr, "WARNING: read_mirror: mirrors.list file corrupted\n"); + free(mirror); + return NULL; + } + sscanf(buf, "%d\n", &mirror->index); + + for(i = 0; i < MAX_SAMPLES; i++) { + fscanf(f, "%ld\t%f\n", &(mirror->timestamps[i].tv_sec), + &(mirror->samples[i])); + /* TODO: What if fscanf fails? */ + } + + return mirror; +} + +static void +write_mirror(struct dmmirr *mirror, FILE *f) +{ + int i; + + fputs(mirror->name, f); + fputc('\n', f); + + switch(mirror->remark) { + case NOT_TRIED: + fputs("NOT_TRIED\n", f); + break; + case FAILED: + fputs("FAILED\n", f); + break; + } + + for(i = 0; i < MAX_SAMPLES; i++) { + fprintf(f, "%ld\t%f\n", mirror->timestamps[i].tv_sec, + mirror->samples[i]); + } + + return; +} + +static int +init_mirrors_file(void) +{ + int i, j; + FILE *f = fopen(MIRRORS_FILE, "w"); + if (f == NULL) + return -1; + + for(i = 0; i < sizeof(MIRROR_LIST) / sizeof(MIRROR_LIST[0]); i++) { + fwrite(MIRROR_LIST[i], strlen(MIRROR_LIST[i]), 1, f); + fprintf(f, "\nNOT_TRIED\n"); + for (j = 0; j < MAX_SAMPLES; j++) + fprintf(f, "0\t0\n"); + } + + fclose(f); +} + +int +load_mirrors(void) +{ + int ret; + struct dmmirr *mirror; + + FILE *f = fopen(MIRRORS_FILE, "r"); + if (f == NULL && errno == ENOENT) { + init_mirrors_file(); + f = fopen(MIRRORS_FILE, "r"); + } else if (f == NULL) { + fprintf(stderr, "load_mirrors: fopen(%s) failed\n", + MIRRORS_FILE); + return -1; + } + + /* Profile list lock */ + ret = pthread_mutex_lock(&mirror_list_mutex); + if (ret == -1) { + fprintf(stderr, 
"get_mirror: Attempt to acquire" + " profile list mutex failed\n"); + return -1; + } + + mirror = read_mirror(f); + while(mirror != NULL) { + mirrors = add_mirror(mirrors, mirror); + mirror = read_mirror(f); + } + + ret = pthread_mutex_unlock(&mirror_list_mutex); + if (ret == -1) { + fprintf(stderr, "get_mirror: Couldn't release " + "profile list lock\n"); + return -1; + } + /* Profile list lock released */ + + fclose(f); + return 0; +} + +int +save_mirrors(void) +{ + int ret; + struct dmmirr *mirror = mirrors; + + FILE *f = fopen(MIRRORS_FILE, "w"); + + /* Profile list lock */ + ret = pthread_mutex_lock(&mirror_list_mutex); + if (ret == -1) { + fprintf(stderr, "get_mirror: Attempt to acquire" + " profile list mutex failed\n"); + return -1; + } + + while(mirror != NULL) { + write_mirror(mirror, f); + mirrors = rm_mirror(mirrors, mirror); + } + + ret = pthread_mutex_unlock(&mirror_list_mutex); + if (ret == -1) { + fprintf(stderr, "get_mirror: Couldn't release " + "profile list lock\n"); + return -1; + } + /* Profile list lock released */ + + fclose(f); + return 0; +} + +void +update_mirror(struct dmmirr *dmmirr, struct xferstat *xs) +{ + struct timeval tv; + double speed; + + gettimeofday(&tv, NULL); + if (tv.tv_sec - dmmirr->timestamps[dmmirr->index].tv_sec < 60) + return; + + speed = get_speed(xs); + + /* TODO: This assumes that workers and sites have 1-1 correspondence */ + dmmirr->index = (dmmirr->index + 1) % MAX_SAMPLES; + dmmirr->timestamps[dmmirr->index] = tv; + dmmirr->samples[dmmirr->index] = speed; + dmmirr->remark = ACTIVE; +} + +struct dmmirr * +get_mirror(void) +{ + struct dmmirr *cur, *tmp; + double tmpmax = -1.0; + int cnt, ret, i; + struct timeval now; + long week_sec; + double average; + + week_sec = 7 * 24 * 60 * 60; + + /* Profile list lock */ + ret = pthread_mutex_lock(&mirror_list_mutex); + if (ret == -1) { + fprintf(stderr, "get_mirror: Attempt to acquire" + " profile list mutex failed\n"); + return NULL; + } + + cur = mirrors; + tmp = NULL; + 
tmpmax = -1.0; + while (cur != NULL) { + if (cur->remark == NOT_TRIED) { + tmp = cur; + goto success; + } + + if (cur->remark == FAILED) + goto next; + if (cur->nconns > MAX_CONNS) + goto next; + + + i = cur->index; + cnt = 0; + average = 0.0; + do { + gettimeofday(&now, NULL); + if (cur->timestamps[i].tv_sec < now.tv_sec - week_sec) + break; + average = (average * cnt + cur->samples[i]) / (cnt + 1); + cnt++; + + i = (i - 1) % MAX_SAMPLES; + } while (i != cur->index); + + if (average > tmpmax) { + tmpmax = average; + tmp = cur; + } +next: + cur = cur->next; + } + + /* TODO: If we couldn't pick up a mirror? */ + +success: + ret = pthread_mutex_unlock(&mirror_list_mutex); + if (ret == -1) { + fprintf(stderr, "get_mirror: Couldn't release " + "profile list lock\n"); + return NULL; + } + /* Profile list lock released */ + + return tmp; +} Modified: soc2013/ambarisha/head/usr.bin/dms/worker.c ============================================================================== --- soc2013/ambarisha/head/usr.bin/dms/worker.c Mon Aug 19 20:48:15 2013 (r256169) +++ soc2013/ambarisha/head/usr.bin/dms/worker.c Mon Aug 19 20:51:29 2013 (r256170) @@ -180,6 +180,9 @@ xs->offset = offset; xs->rcvd = offset; xs->lastrcvd = offset; + + update_mirror(dmjob->mirror , xs); + if ((dmjob->request->flags & V_TTY) && dmjob->request->v_level > 0) stat_send(xs, 1); else if (dmjob->request->v_level > 0) @@ -190,6 +193,7 @@ stat_end(struct xferstat *xs, struct dmjob *dmjob) { gettimeofday(&xs->last, NULL); + update_mirror(dmjob->mirror , xs); if ((dmjob->request->flags & V_TTY) && dmjob->request->v_level > 0) { stat_send(xs, 2); putc('\n', stderr); @@ -203,10 +207,20 @@ stat_update(struct xferstat *xs, off_t rcvd, struct dmjob *dmjob) { xs->rcvd = rcvd; + update_mirror(dmjob->mirror , xs); if ((dmjob->request->flags & V_TTY) && dmjob->request->v_level > 0) stat_send(xs, 0); } +static void +select_mirror(struct dmjob *dmjob) +{ + dmjob->mirror = get_mirror(); + strcpy(dmjob->url->host, 
dmjob->mirror->name); + strcpy(dmjob->request->URL, dmjob->mirror->name); + strcat(dmjob->request->URL, dmjob->url->doc); +} + static int mk_url(struct dmjob *dmjob, char *flags) { @@ -228,12 +242,15 @@ fprintf(stderr, "warning: mk_url: URL empty\n"); goto failure; } + if ((dmjob->url = fetchParseURL(dmreq->URL)) == NULL) { warnx("%s: parse error", dmreq->URL); goto failure; } - /* if no scheme was specified, take a guess */ + /* Replace host name with the mirror name */ + select_mirror(dmjob); + if (!*(dmjob->url->scheme)) { if (!*(dmjob->url->host)) strcpy(dmjob->url->scheme, SCHEME_FILE); @@ -732,6 +749,7 @@ goto success; } */ + tmpreq.path = (char *) malloc(strlen(dmreq->path) + strlen(TMP_EXT)); if (tmpreq.path == NULL) { fprintf(stderr, "dmXGet: Insufficient memory\n");
Want to link to this message? Use this URL: <https://mail-archive.FreeBSD.org/cgi/mid.cgi?201308192051.r7JKpTk4013825>
