Skip site navigation (1) Skip section navigation (2)
Date:      Thu, 26 Sep 2013 12:32:13 GMT
From:      ambarisha@FreeBSD.org
To:        svn-soc-all@FreeBSD.org
Subject:   socsvn commit: r257720 - soc2013/ambarisha/head/usr.bin/dms
Message-ID:  <201309261232.r8QCWDW6061725@socsvn.freebsd.org>

next in thread | raw e-mail | index | archive | help
Author: ambarisha
Date: Thu Sep 26 12:32:12 2013
New Revision: 257720
URL: http://svnweb.FreeBSD.org/socsvn/?view=rev&rev=257720

Log:
  Job migration first changes
  

Modified:
  soc2013/ambarisha/head/usr.bin/dms/dms.c
  soc2013/ambarisha/head/usr.bin/dms/dms.h
  soc2013/ambarisha/head/usr.bin/dms/mirror.c
  soc2013/ambarisha/head/usr.bin/dms/mirror.h
  soc2013/ambarisha/head/usr.bin/dms/worker.c

Modified: soc2013/ambarisha/head/usr.bin/dms/dms.c
==============================================================================
--- soc2013/ambarisha/head/usr.bin/dms/dms.c	Thu Sep 26 12:30:28 2013	(r257719)
+++ soc2013/ambarisha/head/usr.bin/dms/dms.c	Thu Sep 26 12:32:12 2013	(r257720)
@@ -144,6 +144,7 @@
 	dmjob->siginfo = 0;
 	dmjob->siginfo_en = 0;
 	dmjob->state = RUNNING;
+	dmjob->preempted = 0;
 	dmjob->url = NULL;
 	return dmjob;
 }

Modified: soc2013/ambarisha/head/usr.bin/dms/dms.h
==============================================================================
--- soc2013/ambarisha/head/usr.bin/dms/dms.h	Thu Sep 26 12:30:28 2013	(r257719)
+++ soc2013/ambarisha/head/usr.bin/dms/dms.h	Thu Sep 26 12:32:12 2013	(r257720)
@@ -7,6 +7,8 @@
 #define MINBUFSIZE		4096
 #define MAX_SAMPLES		256
 
+#include "dm.h"
+
 struct dmjob {
 	int		 ofd;
 	int	 	 client;
@@ -15,6 +17,7 @@
 	int	 	 siginfo;
 	int	 	 siginfo_en;
 	unsigned	 timeout;
+	int		 preempted;
 
 	enum {
 		RUNNING = 0,
@@ -26,6 +29,7 @@
 	struct dmreq 	*request;
 	struct url	*url;
 	struct dmmirr	*mirror;
+	struct xferstat	 oldstat;
 
 	struct dmjob 	*next;
 	struct dmjob	*prev;

Modified: soc2013/ambarisha/head/usr.bin/dms/mirror.c
==============================================================================
--- soc2013/ambarisha/head/usr.bin/dms/mirror.c	Thu Sep 26 12:30:28 2013	(r257719)
+++ soc2013/ambarisha/head/usr.bin/dms/mirror.c	Thu Sep 26 12:32:12 2013	(r257720)
@@ -103,6 +103,7 @@
 		/* TODO: What if fscanf fails? */
 	}
 
+	mirror->nconns = 0;
 	return mirror;
 }
 
@@ -168,7 +169,7 @@
 	/* Profile list lock */
 	ret = pthread_mutex_lock(&mirror_list_mutex);
 	if (ret == -1) {
-		fprintf(stderr, "get_mirror: Attempt to acquire"
+		fprintf(stderr, "load_mirrors: Attempt to acquire"
 				" profile list mutex failed\n");
 		return -1;
 	}
@@ -181,7 +182,7 @@
 
 	ret = pthread_mutex_unlock(&mirror_list_mutex);
 	if (ret == -1) {
-		fprintf(stderr, "get_mirror: Couldn't release "
+		fprintf(stderr, "load_mirrors: Couldn't release "
 				"profile list lock\n");
 		return -1;
 	}
@@ -202,7 +203,7 @@
 	/* Profile list lock */
 	ret = pthread_mutex_lock(&mirror_list_mutex);
 	if (ret == -1) {
-		fprintf(stderr, "get_mirror: Attempt to acquire"
+		fprintf(stderr, "save_mirrors: Attempt to acquire"
 				" profile list mutex failed\n");
 		return -1;
 	}
@@ -214,7 +215,7 @@
 
 	ret = pthread_mutex_unlock(&mirror_list_mutex);
 	if (ret == -1) {
-		fprintf(stderr, "get_mirror: Couldn't release "
+		fprintf(stderr, "save_mirrors: Couldn't release "
 				"profile list lock\n");
 		return -1;
 	}
@@ -229,6 +230,7 @@
 {
 	struct timeval tv;
 	double speed;
+	int ret;
 
 	gettimeofday(&tv, NULL);
 	if (tv.tv_sec - dmmirr->timestamps[dmmirr->index].tv_sec < 60)
@@ -236,11 +238,54 @@
 
 	speed = get_speed(xs);
 
+	/* Profile list lock */
+	ret = pthread_mutex_lock(&mirror_list_mutex);
+	if (ret == -1) {
+		fprintf(stderr, "update_mirror: Attempt to acquire"
+				" profile list mutex failed\n");
+		return;
+	}
+
 	/* TODO: This assumes that workers and sites have 1-1 correspondence */
 	dmmirr->index = (dmmirr->index + 1) % MAX_SAMPLES;
 	dmmirr->timestamps[dmmirr->index] = tv;
 	dmmirr->samples[dmmirr->index] = speed;
 	dmmirr->remark = ACTIVE;
+
+	ret = pthread_mutex_unlock(&mirror_list_mutex);
+	if (ret == -1) {
+		fprintf(stderr, "update_mirror: Couldn't release "
+				"profile list lock\n");
+		return;
+	}
+	/* Profile list lock released */
+}
+
+double
+get_average_speed(struct dmmirr *dmmirr)
+{
+	int i, cnt;
+	double average;
+	struct timeval now;
+	long week_sec;
+
+	week_sec = 7 * 24 * 60 * 60;
+
+	i = dmmirr->index;
+	cnt = 0;
+	average = 0.0;
+
+	do {
+		gettimeofday(&now, NULL);
+		if (dmmirr->timestamps[i].tv_sec <  now.tv_sec - week_sec)
+			break;
+		average = (average * cnt + dmmirr->samples[i]) / (cnt + 1);
+		cnt++;
+
+		i = (i - 1) % MAX_SAMPLES;
+	} while (i != dmmirr->index);
+
+	return average;
 }
 
 struct dmmirr *
@@ -249,12 +294,8 @@
 	struct dmmirr *cur, *tmp;
 	double tmpmax = -1.0;
 	int cnt, ret, i;
-	struct timeval now;
-	long week_sec;
 	double average;
 
-	week_sec = 7 * 24 * 60 * 60;
-
 	/* Profile list lock */
 	ret = pthread_mutex_lock(&mirror_list_mutex);
 	if (ret == -1) {
@@ -277,19 +318,7 @@
 		if (cur->nconns > MAX_CONNS)
 			goto next;
 
-
-		i = cur->index;
-		cnt = 0;
-		average = 0.0;
-		do {
-			gettimeofday(&now, NULL);
-			if (cur->timestamps[i].tv_sec <  now.tv_sec - week_sec)
-				break;
-			average = (average * cnt + cur->samples[i]) / (cnt + 1);
-			cnt++;
-
-			i = (i - 1) % MAX_SAMPLES;
-		} while (i != cur->index);
+		average = get_average_speed(cur);
 
 		if (average > tmpmax) {
 			tmpmax = average;
@@ -298,10 +327,10 @@
 next:
 		cur = cur->next;
 	}
-
 	/* TODO: If we couldn't pick up a mirror? */
 
 success:
+	tmp->nconns++;
 	ret = pthread_mutex_unlock(&mirror_list_mutex);
 	if (ret == -1) {
 		fprintf(stderr, "get_mirror: Couldn't release "
@@ -309,6 +338,32 @@
 		return NULL;
 	}
 	/* Profile list lock released */
-	
+
 	return tmp;
 }
+
+int
+release_mirror(struct dmmirr *dmmirr)
+{
+	int ret;
+
+	/* Profile list lock */
+	ret = pthread_mutex_lock(&mirror_list_mutex);
+	if (ret == -1) {
+		fprintf(stderr, "update_mirror: Attempt to acquire"
+				" profile list mutex failed\n");
+		return -1;
+	}
+
+	dmmirr->nconns--;
+
+	ret = pthread_mutex_unlock(&mirror_list_mutex);
+	if (ret == -1) {
+		fprintf(stderr, "update_mirror: Couldn't release "
+				"profile list lock\n");
+		return -1;
+	}
+	/* Profile list lock released */
+
+	return 0;
+}

Modified: soc2013/ambarisha/head/usr.bin/dms/mirror.h
==============================================================================
--- soc2013/ambarisha/head/usr.bin/dms/mirror.h	Thu Sep 26 12:30:28 2013	(r257719)
+++ soc2013/ambarisha/head/usr.bin/dms/mirror.h	Thu Sep 26 12:32:12 2013	(r257720)
@@ -8,5 +8,7 @@
 int save_mirrors(void);
 void update_mirror(struct dmmirr *, struct xferstat *);
 struct dmmirr *get_mirror(void);
+int release_mirror(struct dmmirr *);
+double get_average_speed(struct dmmirr *);
 
 #endif

Modified: soc2013/ambarisha/head/usr.bin/dms/worker.c
==============================================================================
--- soc2013/ambarisha/head/usr.bin/dms/worker.c	Thu Sep 26 12:30:28 2013	(r257719)
+++ soc2013/ambarisha/head/usr.bin/dms/worker.c	Thu Sep 26 12:32:12 2013	(r257720)
@@ -92,6 +92,24 @@
 	return strcmp(j1->request->URL, j2->request->URL);
 }
 
+static long
+get_eta(struct dmjob *dmjob, struct dmmirr *dmmirr)
+{
+	long eta, elapsed, speed, received, expected;
+	if (dmmirr == dmjob->mirror) {
+		elapsed = dmjob->oldstat.last.tv_sec - dmjob->oldstat.start.tv_sec;
+		received = dmjob->oldstat.rcvd - dmjob->oldstat.offset;
+		expected = dmjob->oldstat.size - dmjob->oldstat.rcvd;
+		eta = (long)((double) elapsed * expected / received);
+	} else {
+		expected = dmjob->oldstat.size;
+		speed = get_average_speed(dmmirr);
+		eta = (long)((double) expected / speed);
+	}
+
+	return eta;
+}
+
 static void
 stat_send(struct xferstat *xs, int force)
 {
@@ -177,6 +195,15 @@
 stat_start(struct xferstat *xs, const char *name, off_t size,
 	off_t offset, struct dmjob *dmjob)
 {
+	/* If there is no absolute progress because of a preemption,
+	 * do nothing. Otherwise update status in case there's a
+	 * preemption later
+	 */
+
+	if (dmjob->preempted != 0 &&
+		dmjob->oldstat.rcvd > xs->rcvd)
+			return;
+
 	snprintf(xs->name, sizeof xs->name, "%s", name);
 	gettimeofday(&xs->start, NULL);
 	xs->last.tv_sec = xs->last.tv_usec = 0;
@@ -185,7 +212,8 @@
 	xs->rcvd = offset;
 	xs->lastrcvd = offset;
 	
-	update_mirror(dmjob->mirror , xs);
+	update_mirror(dmjob->mirror, xs);
+	dmjob->oldstat = *xs;
 
 	if ((dmjob->request->flags & V_TTY) && dmjob->request->v_level > 0)
 		stat_send(xs, 1);
@@ -196,6 +224,16 @@
 static void
 stat_end(struct xferstat *xs, struct dmjob *dmjob)
 {
+	/* If there is no absolute progress because of a preemption,
+	 * do nothing. Otherwise update status in case there's a
+	 * preemption later
+	 */
+
+	if (dmjob->preempted != 0 &&
+		dmjob->oldstat.rcvd > xs->rcvd)
+			return;
+	dmjob->oldstat = *xs;
+
 	gettimeofday(&xs->last, NULL);
 	update_mirror(dmjob->mirror , xs);
 	if ((dmjob->request->flags & V_TTY) && dmjob->request->v_level > 0) {
@@ -210,7 +248,18 @@
 static void
 stat_update(struct xferstat *xs, off_t rcvd, struct dmjob *dmjob)
 {
+	/* If there is no absolute progress because of a preemption,
+	 * do nothing. Otherwise update status in case there's a
+	 * preemption later
+	 */
+
+	if (dmjob->preempted != 0 &&
+		dmjob->oldstat.rcvd > xs->rcvd)
+			return;
+
 	xs->rcvd = rcvd;
+	dmjob->oldstat = *xs;
+
 	update_mirror(dmjob->mirror , xs);
 	if ((dmjob->request->flags & V_TTY) && dmjob->request->v_level > 0)
 		stat_send(xs, 0);
@@ -231,6 +280,9 @@
 	struct dmreq *dmreq = dmjob->request;
 	struct stat sb;
 	int r;
+	
+	/* Init flags */
+	*flags = '\0'; 
 
 	if (dmjob->url != NULL)
 		return 0;
@@ -319,6 +371,46 @@
 	return (r);
 }
 
+static struct dmjob *
+find_potential_job(struct dmmirr *dmmirr)
+{
+	int ret;
+	long cureta, neweta;
+	struct dmjob *tmp;
+
+	/* Acquire job queue lock */
+	ret = pthread_mutex_lock(&job_queue_mutex);
+	if (ret == -1) {
+		fprintf(stderr, "handle_request: Attempt to acquire"
+				" job queue mutex failed\n");
+		return NULL;
+	}
+
+	tmp = jobs;
+	while (tmp != NULL) {
+		cureta = get_eta(tmp, tmp->mirror);
+		neweta = get_eta(tmp, dmmirr);
+	
+		if (neweta < cureta) {
+			/* notify the current owner worker to let go */
+			tmp->preempted = 1;
+			break;
+		}
+
+		tmp = tmp->next;
+	}
+
+	ret = pthread_mutex_unlock(&job_queue_mutex);
+	if (ret == -1) {
+		fprintf(stderr, "handle_request: Couldn't release "
+				"job queue lock\n");
+		return NULL;
+	}
+	/* Job queue lock released */
+
+	return tmp;
+}
+
 static int
 check_signal(int signum, struct dmjob *dmjob)
 {
@@ -926,6 +1018,7 @@
 		return NULL;
 	}
 
+	/* check if this is a duplicate */
 	tmp = jobs;
 	while (tmp != NULL) {
 		if (tmp != dmjob && compare_jobs(tmp, dmjob) == 0) {
@@ -940,12 +1033,11 @@
 	if (ret == -1) {
 		fprintf(stderr, "handle_request: Couldn't release "
 				"job queue lock\n");
-
 		return NULL;
 	}
 	/* Job queue lock released */
 
-	/* check if this is a duplicate */
+start:
 	ret = mk_url(dmjob, flags);	
 	dmjob->worker = pthread_self();
 
@@ -968,11 +1060,10 @@
 			continue;
 		}
 
-		if (f == NULL) {
+		if (f == NULL)
 			ret = -1;
-		} else {
+		else
 			ret = validate_and_copy(dmjob, f, us);		
-		}
 
 		report.status = ret;
 		report.errcode = fetchLastErrCode;
@@ -993,14 +1084,22 @@
 
 	/* remove the local tmp file */
 	if (f != NULL) {
-		tmppath = (char *) malloc(strlen(dmjob->request->path) + strlen(TMP_EXT));
+		tmppath = (char *) malloc(strlen(dmjob->request->path) +
+					strlen(TMP_EXT));
 		strcpy(tmppath, dmjob->request->path);
 		strcat(tmppath, TMP_EXT);
 
 		remove(tmppath);
 		free(tmppath);
 	}
+
+
+	/* Check if this worker can preempt any downloads */
+	dmjob = find_potential_job(dmjob->mirror);	
+	if (dmjob != NULL)
+		goto start;
 	
+	release_mirror(dmjob->mirror);
 	/* TODO : What is this? Yew!! */
 	sleep(10);
 }



Want to link to this message? Use this URL: <https://mail-archive.FreeBSD.org/cgi/mid.cgi?201309261232.r8QCWDW6061725>