From owner-svn-soc-all@FreeBSD.ORG Mon Aug 19 20:46:33 2013 Return-Path: Delivered-To: svn-soc-all@FreeBSD.org Received: from mx1.freebsd.org (mx1.freebsd.org [IPv6:2001:1900:2254:206a::19:1]) (using TLSv1 with cipher ADH-AES256-SHA (256/256 bits)) (No client certificate requested) by hub.freebsd.org (Postfix) with ESMTP id 25ECFEC1 for ; Mon, 19 Aug 2013 20:46:33 +0000 (UTC) (envelope-from ambarisha@FreeBSD.org) Received: from socsvn.freebsd.org (socsvn.freebsd.org [IPv6:2001:1900:2254:206a::50:2]) (using TLSv1 with cipher DHE-RSA-AES256-SHA (256/256 bits)) (No client certificate requested) by mx1.freebsd.org (Postfix) with ESMTPS id 0505127DB for ; Mon, 19 Aug 2013 20:46:33 +0000 (UTC) Received: from socsvn.freebsd.org ([127.0.1.124]) by socsvn.freebsd.org (8.14.7/8.14.7) with ESMTP id r7JKkWSt029019 for ; Mon, 19 Aug 2013 20:46:32 GMT (envelope-from ambarisha@FreeBSD.org) Received: (from www@localhost) by socsvn.freebsd.org (8.14.7/8.14.6/Submit) id r7JKkW6c029016 for svn-soc-all@FreeBSD.org; Mon, 19 Aug 2013 20:46:32 GMT (envelope-from ambarisha@FreeBSD.org) Date: Mon, 19 Aug 2013 20:46:32 GMT Message-Id: <201308192046.r7JKkW6c029016@socsvn.freebsd.org> X-Authentication-Warning: socsvn.freebsd.org: www set sender to ambarisha@FreeBSD.org using -f From: ambarisha@FreeBSD.org To: svn-soc-all@FreeBSD.org Subject: socsvn commit: r256168 - soc2013/ambarisha/head/usr.bin/dms MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit X-BeenThere: svn-soc-all@freebsd.org X-Mailman-Version: 2.1.14 Precedence: list List-Id: SVN commit messages for the entire Summer of Code repository List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , X-List-Received-Date: Mon, 19 Aug 2013 20:46:33 -0000 Author: ambarisha Date: Mon Aug 19 20:46:32 2013 New Revision: 256168 URL: http://svnweb.FreeBSD.org/socsvn/?view=rev&rev=256168 Log: Thread-proofing the globals. The global job queue is now protected by a mutex lock Modified: soc2013/ambarisha/head/usr.bin/dms/dms.c soc2013/ambarisha/head/usr.bin/dms/worker.c Modified: soc2013/ambarisha/head/usr.bin/dms/dms.c ============================================================================== --- soc2013/ambarisha/head/usr.bin/dms/dms.c Mon Aug 19 20:45:05 2013 (r256167) +++ soc2013/ambarisha/head/usr.bin/dms/dms.c Mon Aug 19 20:46:32 2013 (r256168) @@ -17,8 +17,9 @@ static int dm_err; static char dm_errstr[512]; -int stop; -struct dmjob *jobs; +int stop; +struct dmjob *jobs; +pthread_mutex_t job_queue_mutex; void *run_worker(struct dmjob *job); @@ -261,7 +262,25 @@ if ((dmjob = mk_dmjob(dmreq, csock)) == NULL) goto error; + /* Acquire job queue lock */ + ret = pthread_mutex_lock(&job_queue_mutex); + if (ret == -1) { + fprintf(stderr, "handle_request: Attempt to acquire" + " job queue mutex failed\n"); + goto error; + } + jobs = add_job(jobs, dmjob); + + ret = pthread_mutex_unlock(&job_queue_mutex); + if (ret == -1) { + fprintf(stderr, "handle_request: Couldn't release " + "job queue lock\n"); + + goto error; + } + /* Job queue lock released */ + pthread_create(&(dmjob->worker), NULL, run_worker, dmjob); pthread_detach(dmjob->worker); goto done; @@ -310,6 +329,7 @@ struct dmjob *cur; void *retptr; jobs = NULL; + job_queue_mutex = PTHREAD_MUTEX_INITIALIZER; fd_set fdset; signal(SIGINT, sigint_handler); @@ -320,6 +340,14 @@ maxfd = socket; FD_SET(socket, &fdset); + /* Acquire job queue lock */ + ret = pthread_mutex_lock(&job_queue_mutex); + if (ret == -1) { + fprintf(stderr, "handle_request: Attempt to acquire" + " job queue mutex failed\n"); + return -1; + } + cur = jobs; while (cur != NULL) { FD_SET(cur->client, &fdset); @@ -328,6 +356,15 @@ cur = cur->next; } + ret = pthread_mutex_unlock(&job_queue_mutex); + if (ret == -1) { + fprintf(stderr, "handle_request: Couldn't release " + "job queue lock\n"); + + return -1; + } + /* Job queue lock released */ + ret = select(maxfd + 1, &fdset, NULL, NULL, NULL); if (ret == -1) { fprintf(stderr, "run_event_loop: " @@ -335,6 +372,14 @@ goto wrap_up; } + /* Acquire job queue lock */ + ret = pthread_mutex_lock(&job_queue_mutex); + if (ret == -1) { + fprintf(stderr, "handle_request: Attempt to acquire" + " job queue mutex failed\n"); + return -1; + } + cur = jobs; while (cur != NULL) { state = service_job(cur, &fdset); @@ -346,6 +391,15 @@ cur = cur->next; } + ret = pthread_mutex_unlock(&job_queue_mutex); + if (ret == -1) { + fprintf(stderr, "handle_request: Couldn't release " + "job queue lock\n"); + + return -1; + } + /* Job queue lock released */ + if (FD_ISSET(socket, &fdset)) { struct sockaddr_un cliaddr; size_t cliaddrlen = sizeof(cliaddr); @@ -362,6 +416,14 @@ wrap_up: /* Notify all running workers that we've to wrap up */ + /* Acquire job queue lock */ + ret = pthread_mutex_lock(&job_queue_mutex); + if (ret == -1) { + fprintf(stderr, "handle_request: Attempt to acquire" + " job queue mutex failed\n"); + return -1; + } + cur = jobs; while (cur != NULL) { if (cur->state == RUNNING) @@ -371,6 +433,16 @@ jobs = rm_job(jobs, cur); cur = cur->next; } + + ret = pthread_mutex_unlock(&job_queue_mutex); + if (ret == -1) { + fprintf(stderr, "handle_request: Couldn't release " + "job queue lock\n"); + + return -1; + } + /* Job queue lock released */ + } int main(int argc, char **argv) Modified: soc2013/ambarisha/head/usr.bin/dms/worker.c ============================================================================== --- soc2013/ambarisha/head/usr.bin/dms/worker.c Mon Aug 19 20:45:05 2013 (r256167) +++ soc2013/ambarisha/head/usr.bin/dms/worker.c Mon Aug 19 20:46:32 2013 (r256168) @@ -13,8 +13,9 @@ #include "dm.h" -static const char *prefixes = " kMGTP"; +static const char *prefixes = " kMGTP"; extern struct dmjob *jobs; +extern pthread_mutex_t job_queue_mutex; #define TMP_EXT ".tmp" @@ -22,8 +23,8 @@ authenticate(struct url *url) { struct dmmsg msg; - struct dmjob *cur = jobs; - int bufsize = 0, i = 0, schlen, hlen; + struct dmjob *cur; + int bufsize = 0, i = 0, schlen, hlen, ret; schlen = strlen(url->scheme) + 1; hlen = strlen(url->host) + 1; @@ -46,6 +47,15 @@ msg.op = DMAUTHREQ; msg.len = bufsize; + /* Acquire job queue lock */ + ret = pthread_mutex_lock(&job_queue_mutex); + if (ret == -1) { + fprintf(stderr, "handle_request: Attempt to acquire" + " job queue mutex failed\n"); + return -1; + } + + cur = jobs; while (cur != NULL) { /* TODO: May be a more thorough comparison? */ if (cur->url != url) { @@ -53,13 +63,22 @@ continue; } - + /* TODO: How do we figure out which request's * authentication credentials to use ??? * */ } + ret = pthread_mutex_unlock(&job_queue_mutex); + if (ret == -1) { + fprintf(stderr, "handle_request: Couldn't release " + "job queue lock\n"); + + return -1; + } + /* Job queue lock released */ + return 1; } @@ -72,6 +91,7 @@ static void stat_send(struct xferstat *xs, int force) { + int ret; struct dmjob *cur; struct dmmsg msg; pthread_t self = pthread_self(); @@ -89,14 +109,30 @@ msg.op = DMSTAT; msg.buf = buf; msg.len = sizeof(*xs) + sizeof(force); - cur = jobs; + /* Acquire job queue lock */ + ret = pthread_mutex_lock(&job_queue_mutex); + if (ret == -1) { + fprintf(stderr, "handle_request: Attempt to acquire" + " job queue mutex failed\n"); + goto done; + } + + cur = jobs; while (cur != NULL) { if (pthread_equal(self, cur->worker) != 0) send_dmmsg(cur->client, msg); cur = cur->next; } + ret = pthread_mutex_unlock(&job_queue_mutex); + if (ret == -1) { + fprintf(stderr, "handle_request: Couldn't release " + "job queue lock\n"); + } + /* Job queue lock released */ + +done: free(buf); return; } @@ -740,6 +776,7 @@ int *clisig; pthread_t tid = pthread_self(); if (sig == SIGUSR1) { + /* TODO: Umm...Locking? */ while (tmp != NULL) { if (pthread_equal(tid, tmp->worker) != 0) break; @@ -802,8 +839,14 @@ char *tmppath; char flags[8]; - /* check if this is a duplicate */ - ret = mk_url(dmjob, flags); + /* Acquire job queue lock */ + ret = pthread_mutex_lock(&job_queue_mutex); + if (ret == -1) { + fprintf(stderr, "handle_request: Attempt to acquire" + " job queue mutex failed\n"); + return -1; + } + tmp = jobs; while (tmp != NULL) { if (tmp != dmjob && compare_jobs(tmp, dmjob) == 0) { @@ -814,11 +857,30 @@ tmp = tmp->next; } + ret = pthread_mutex_unlock(&job_queue_mutex); + if (ret == -1) { + fprintf(stderr, "handle_request: Couldn't release " + "job queue lock\n"); + + return -1; + } + /* Job queue lock released */ + + /* check if this is a duplicate */ + ret = mk_url(dmjob, flags); dmjob->worker = pthread_self(); /* fetch the remote file into a local tmp file */ f = dmXGet(dmjob, &us); + /* Acquire job queue lock */ + ret = pthread_mutex_lock(&job_queue_mutex); + if (ret == -1) { + fprintf(stderr, "handle_request: Attempt to acquire" + " job queue mutex failed\n"); + return -1; + } + /* Serve any outstanding requests from the local tmp file */ tmp = jobs; while (tmp != NULL) { @@ -843,6 +905,15 @@ tmp = tmp->next; } + ret = pthread_mutex_unlock(&job_queue_mutex); + if (ret == -1) { + fprintf(stderr, "handle_request: Couldn't release " + "job queue lock\n"); + + return -1; + } + /* Job queue lock released */ + /* remove the local tmp file */ if (f != NULL) { tmppath = (char *) malloc(strlen(dmjob->request->path) + strlen(TMP_EXT));