Date: Mon, 19 Aug 2013 20:46:32 GMT From: ambarisha@FreeBSD.org To: svn-soc-all@FreeBSD.org Subject: socsvn commit: r256168 - soc2013/ambarisha/head/usr.bin/dms Message-ID: <201308192046.r7JKkW6c029016@socsvn.freebsd.org>
next in thread | raw e-mail | index | archive | help
Author: ambarisha Date: Mon Aug 19 20:46:32 2013 New Revision: 256168 URL: http://svnweb.FreeBSD.org/socsvn/?view=rev&rev=256168 Log: Thread-proofing the globals. The global job queue is now protected by a mutex lock Modified: soc2013/ambarisha/head/usr.bin/dms/dms.c soc2013/ambarisha/head/usr.bin/dms/worker.c Modified: soc2013/ambarisha/head/usr.bin/dms/dms.c ============================================================================== --- soc2013/ambarisha/head/usr.bin/dms/dms.c Mon Aug 19 20:45:05 2013 (r256167) +++ soc2013/ambarisha/head/usr.bin/dms/dms.c Mon Aug 19 20:46:32 2013 (r256168) @@ -17,8 +17,9 @@ static int dm_err; static char dm_errstr[512]; -int stop; -struct dmjob *jobs; +int stop; +struct dmjob *jobs; +pthread_mutex_t job_queue_mutex; void *run_worker(struct dmjob *job); @@ -261,7 +262,25 @@ if ((dmjob = mk_dmjob(dmreq, csock)) == NULL) goto error; + /* Acquire job queue lock */ + ret = pthread_mutex_lock(&job_queue_mutex); + if (ret == -1) { + fprintf(stderr, "handle_request: Attempt to acquire" + " job queue mutex failed\n"); + goto error; + } + jobs = add_job(jobs, dmjob); + + ret = pthread_mutex_unlock(&job_queue_mutex); + if (ret == -1) { + fprintf(stderr, "handle_request: Couldn't release " + "job queue lock\n"); + + goto error; + } + /* Job queue lock released */ + pthread_create(&(dmjob->worker), NULL, run_worker, dmjob); pthread_detach(dmjob->worker); goto done; @@ -310,6 +329,7 @@ struct dmjob *cur; void *retptr; jobs = NULL; + job_queue_mutex = PTHREAD_MUTEX_INITIALIZER; fd_set fdset; signal(SIGINT, sigint_handler); @@ -320,6 +340,14 @@ maxfd = socket; FD_SET(socket, &fdset); + /* Acquire job queue lock */ + ret = pthread_mutex_lock(&job_queue_mutex); + if (ret == -1) { + fprintf(stderr, "handle_request: Attempt to acquire" + " job queue mutex failed\n"); + return -1; + } + cur = jobs; while (cur != NULL) { FD_SET(cur->client, &fdset); @@ -328,6 +356,15 @@ cur = cur->next; } + ret = pthread_mutex_unlock(&job_queue_mutex); + if (ret == -1) { + fprintf(stderr, "handle_request: Couldn't release " + "job queue lock\n"); + + return -1; + } + /* Job queue lock released */ + ret = select(maxfd + 1, &fdset, NULL, NULL, NULL); if (ret == -1) { fprintf(stderr, "run_event_loop: " @@ -335,6 +372,14 @@ goto wrap_up; } + /* Acquire job queue lock */ + ret = pthread_mutex_lock(&job_queue_mutex); + if (ret == -1) { + fprintf(stderr, "handle_request: Attempt to acquire" + " job queue mutex failed\n"); + return -1; + } + cur = jobs; while (cur != NULL) { state = service_job(cur, &fdset); @@ -346,6 +391,15 @@ cur = cur->next; } + ret = pthread_mutex_unlock(&job_queue_mutex); + if (ret == -1) { + fprintf(stderr, "handle_request: Couldn't release " + "job queue lock\n"); + + return -1; + } + /* Job queue lock released */ + if (FD_ISSET(socket, &fdset)) { struct sockaddr_un cliaddr; size_t cliaddrlen = sizeof(cliaddr); @@ -362,6 +416,14 @@ wrap_up: /* Notify all running workers that we've to wrap up */ + /* Acquire job queue lock */ + ret = pthread_mutex_lock(&job_queue_mutex); + if (ret == -1) { + fprintf(stderr, "handle_request: Attempt to acquire" + " job queue mutex failed\n"); + return -1; + } + cur = jobs; while (cur != NULL) { if (cur->state == RUNNING) @@ -371,6 +433,16 @@ jobs = rm_job(jobs, cur); cur = cur->next; } + + ret = pthread_mutex_unlock(&job_queue_mutex); + if (ret == -1) { + fprintf(stderr, "handle_request: Couldn't release " + "job queue lock\n"); + + return -1; + } + /* Job queue lock released */ + } int main(int argc, char **argv) Modified: soc2013/ambarisha/head/usr.bin/dms/worker.c ============================================================================== --- soc2013/ambarisha/head/usr.bin/dms/worker.c Mon Aug 19 20:45:05 2013 (r256167) +++ soc2013/ambarisha/head/usr.bin/dms/worker.c Mon Aug 19 20:46:32 2013 (r256168) @@ -13,8 +13,9 @@ #include "dm.h" -static const char *prefixes = " kMGTP"; +static const char *prefixes = " kMGTP"; extern struct dmjob *jobs; +extern pthread_mutex_t job_queue_mutex; #define TMP_EXT ".tmp" @@ -22,8 +23,8 @@ authenticate(struct url *url) { struct dmmsg msg; - struct dmjob *cur = jobs; - int bufsize = 0, i = 0, schlen, hlen; + struct dmjob *cur; + int bufsize = 0, i = 0, schlen, hlen, ret; schlen = strlen(url->scheme) + 1; hlen = strlen(url->host) + 1; @@ -46,6 +47,15 @@ msg.op = DMAUTHREQ; msg.len = bufsize; + /* Acquire job queue lock */ + ret = pthread_mutex_lock(&job_queue_mutex); + if (ret == -1) { + fprintf(stderr, "handle_request: Attempt to acquire" + " job queue mutex failed\n"); + return -1; + } + + cur = jobs; while (cur != NULL) { /* TODO: May be a more thorough comparison? */ if (cur->url != url) { @@ -53,13 +63,22 @@ continue; } - + /* TODO: How do we figure out which request's * authentication credentials to use ??? * */ } + ret = pthread_mutex_unlock(&job_queue_mutex); + if (ret == -1) { + fprintf(stderr, "handle_request: Couldn't release " + "job queue lock\n"); + + return -1; + } + /* Job queue lock released */ + return 1; } @@ -72,6 +91,7 @@ static void stat_send(struct xferstat *xs, int force) { + int ret; struct dmjob *cur; struct dmmsg msg; pthread_t self = pthread_self(); @@ -89,14 +109,30 @@ msg.op = DMSTAT; msg.buf = buf; msg.len = sizeof(*xs) + sizeof(force); - cur = jobs; + /* Acquire job queue lock */ + ret = pthread_mutex_lock(&job_queue_mutex); + if (ret == -1) { + fprintf(stderr, "handle_request: Attempt to acquire" + " job queue mutex failed\n"); + goto done; + } + + cur = jobs; while (cur != NULL) { if (pthread_equal(self, cur->worker) != 0) send_dmmsg(cur->client, msg); cur = cur->next; } + ret = pthread_mutex_unlock(&job_queue_mutex); + if (ret == -1) { + fprintf(stderr, "handle_request: Couldn't release " + "job queue lock\n"); + } + /* Job queue lock released */ + +done: free(buf); return; } @@ -740,6 +776,7 @@ int *clisig; pthread_t tid = pthread_self(); if (sig == SIGUSR1) { + /* TODO: Umm...Locking? */ while (tmp != NULL) { if (pthread_equal(tid, tmp->worker) != 0) break; @@ -802,8 +839,14 @@ char *tmppath; char flags[8]; - /* check if this is a duplicate */ - ret = mk_url(dmjob, flags); + /* Acquire job queue lock */ + ret = pthread_mutex_lock(&job_queue_mutex); + if (ret == -1) { + fprintf(stderr, "handle_request: Attempt to acquire" + " job queue mutex failed\n"); + return -1; + } + tmp = jobs; while (tmp != NULL) { if (tmp != dmjob && compare_jobs(tmp, dmjob) == 0) { @@ -814,11 +857,30 @@ tmp = tmp->next; } + ret = pthread_mutex_unlock(&job_queue_mutex); + if (ret == -1) { + fprintf(stderr, "handle_request: Couldn't release " + "job queue lock\n"); + + return -1; + } + /* Job queue lock released */ + + /* check if this is a duplicate */ + ret = mk_url(dmjob, flags); dmjob->worker = pthread_self(); /* fetch the remote file into a local tmp file */ f = dmXGet(dmjob, &us); + /* Acquire job queue lock */ + ret = pthread_mutex_lock(&job_queue_mutex); + if (ret == -1) { + fprintf(stderr, "handle_request: Attempt to acquire" + " job queue mutex failed\n"); + return -1; + } + /* Serve any outstanding requests from the local tmp file */ tmp = jobs; while (tmp != NULL) { @@ -843,6 +905,15 @@ tmp = tmp->next; } + ret = pthread_mutex_unlock(&job_queue_mutex); + if (ret == -1) { + fprintf(stderr, "handle_request: Couldn't release " + "job queue lock\n"); + + return -1; + } + /* Job queue lock released */ + /* remove the local tmp file */ if (f != NULL) { tmppath = (char *) malloc(strlen(dmjob->request->path) + strlen(TMP_EXT));
Want to link to this message? Use this URL: <https://mail-archive.FreeBSD.org/cgi/mid.cgi?201308192046.r7JKkW6c029016>