Date: Sat, 23 Apr 2011 21:02:25 +0000 (UTC) From: Florent Thoumie <flz@FreeBSD.org> To: src-committers@freebsd.org, svn-src-projects@freebsd.org Subject: svn commit: r220976 - in projects/portbuild: conf qmanager scripts Message-ID: <201104232102.p3NL2PpT003585@svn.freebsd.org>
next in thread | raw e-mail | index | archive | help
Author: flz Date: Sat Apr 23 21:02:25 2011 New Revision: 220976 URL: http://svn.freebsd.org/changeset/base/220976 Log: Import qmanager into portbuild. Added: projects/portbuild/qmanager/ projects/portbuild/qmanager/acl.py projects/portbuild/qmanager/dumpdb.py (contents, props changed) projects/portbuild/qmanager/packagebuild (contents, props changed) projects/portbuild/qmanager/qclient (contents, props changed) projects/portbuild/qmanager/qmanager (contents, props changed) projects/portbuild/qmanager/qmanager.py (contents, props changed) projects/portbuild/qmanager/qmanagerclient.py projects/portbuild/qmanager/qmanagerhandler.py projects/portbuild/qmanager/qmanagerobj.py projects/portbuild/qmanager/schema.sql Modified: projects/portbuild/conf/server.conf projects/portbuild/scripts/dopackages Modified: projects/portbuild/conf/server.conf ============================================================================== --- projects/portbuild/conf/server.conf Sat Apr 23 20:59:58 2011 (r220975) +++ projects/portbuild/conf/server.conf Sat Apr 23 21:02:25 2011 (r220976) @@ -72,7 +72,6 @@ PDISPATCH_TIMEOUT=360000 # qmanager definitions (note: Python script, so avoid {}) # -QMANAGER_PATH=/var/portbuild/evil/qmanager QMANAGER_DATABASE_FILE=qdb.sl3 QMANAGER_SOCKET_FILE=/tmp/.qmgr Added: projects/portbuild/qmanager/acl.py ============================================================================== --- /dev/null 00:00:00 1970 (empty, because file is newly added) +++ projects/portbuild/qmanager/acl.py Sat Apr 23 21:02:25 2011 (r220976) @@ -0,0 +1,156 @@ +# Validate a (uid, (gids)) tuple against an ACL. + +import pwd, grp + +def getuidbyname(username): + if str(username).isdigit(): + return int(username) + return pwd.getpwnam(username)[2] + +def getgidbyname(grname): + if str(grname).isdigit(): + return int(grname) + return grp.getgrnam(grname)[2] + +class ACLElement(object): + """ Component of an ACL. """ + + def __init__(self, name, uidlist, gidlist, sense): + self.name = name + self.uidlist = [getuidbyname(uid) for uid in uidlist] + self.gidlist = [getgidbyname(gid) for gid in gidlist] + self.sense = bool(sense) + + def validate(self, uid, gids): + """ Validate an ACL Element. In order to match, the following must + hold: + + * uid is a subset of self.uidlist, or self.uidlist is empty + * one of the gids must be present in self.gidlist, or + self.gidlist is empty + + If both conditions hold, then the validation returns self.sense + + Returns: True/False if Element matches + None if Element fails to match + """ + + if (len(self.uidlist) == 0 or uid in self.uidlist) and \ + (len(self.gidlist) == 0 or set(gids).intersection(self.gidlist)): + return self.sense + return None + +class ACL(object): + """ List of ACLElements that form an ACL """ + + def __init__(self, acllist): + self.acls = acllist + + def validate(self, uid, gids): + uid=getuidbyname(uid) + gids=set(getgidbyname(gid) for gid in gids) + + for acl in self.acls: + res=acl.validate(uid, gids) + if res is not None: + return res + return False + +if __name__ == "__main__": + + from sys import exit + + assert getuidbyname(123) == 123 + assert getuidbyname('123') == 123 + + try: + ACLElement("test", ["foobar"], [""], True) + except KeyError: + pass + + try: + ACLElement("test", [123, "foobar"], [""], True) + except KeyError: + pass + + assert ACLElement("test", [123], [], True) != None + assert ACLElement("test", ["123"], [], True) != None + + acl = ACL([ACLElement("el 1", ["kris"], [], True), + ACLElement("el 2", [], ["wheel"], True), + ACLElement("el 3", [], [], False)]) + + assert acl.validate(getuidbyname('kris'), []) == True + assert acl.validate(getuidbyname('simon'), []) == False + assert acl.validate(getuidbyname('simon'), [getgidbyname('devel'), getgidbyname('wheel')]) == True + assert acl.validate(getuidbyname('root'), [pwd.getpwnam('root')[3]]) == True + + acl = ACL([ACLElement("el 1", ["kris"], ["distcc"], True), + ACLElement("el 2", [], ["wheel"], True), + ACLElement("el 3", [], [], False)]) + assert acl.validate("kris", ["wheel"]) == True + assert acl.validate("kris", ["staff"]) == False + + acl = ACL([ACLElement("", ('kris',), (), True), + ACLElement("", (), ('wheel', 'devel'), True), + ACLElement("", (), (), False)]) + + assert acl.validate(getuidbyname('simon'), [getgidbyname('devel'), getgidbyname('wheel')]) == True + assert acl.validate(getuidbyname('simon'), [getgidbyname('staff'), getgidbyname('wheel')]) == True + assert acl.validate(getuidbyname('simon'), [getgidbyname('staff')]) == False + + acl = ACL([ACLElement("", ('kris',), (), True), + ACLElement("", (), ('devel',), True), + ACLElement("", (), ('wheel',), True), + ACLElement("", (), (), False)]) + + assert acl.validate(getuidbyname('simon'), [getgidbyname('devel'), getgidbyname('wheel')]) == True + assert acl.validate(getuidbyname('simon'), [getgidbyname('staff'), getgidbyname('wheel')]) == True + assert acl.validate(getuidbyname('simon'), [getgidbyname('staff')]) == False + + acl = ACL([ACLElement("", ('kris',), (), True), + ACLElement("", (), ('devel',), False), + ACLElement("", (), ('wheel',), True), + ACLElement("", (), (), False)]) + + assert acl.validate(getuidbyname('simon'), [getgidbyname('devel'), getgidbyname('wheel')]) == False + assert acl.validate(getuidbyname('simon'), [getgidbyname('staff'), getgidbyname('wheel')]) == True + assert acl.validate(getuidbyname('simon'), [getgidbyname('staff')]) == False + + + acl = ACL([ACLElement("", ('kris',), (), True), + ACLElement("", (), ('devel',), True), + ACLElement("", (), ('wheel',), False), + ACLElement("", (), (), False)]) + + assert acl.validate(getuidbyname('simon'), [getgidbyname('devel'), getgidbyname('wheel')]) == True + assert acl.validate(getuidbyname('simon'), [getgidbyname('staff'), getgidbyname('wheel')]) == False + assert acl.validate(getuidbyname('simon'), [getgidbyname('staff')]) == False + + + acl = ACL([ACLElement("", ('kris',), (), True), + ACLElement("", (), ('devel',), True), + ACLElement("", (), ('wheel',), False), + ACLElement("", (), (), True)]) + + assert acl.validate(getuidbyname('simon'), []) == True + assert acl.validate(getuidbyname('simon'), [getgidbyname('devel'), getgidbyname('wheel')]) == True + assert acl.validate(getuidbyname('simon'), [getgidbyname('staff'), getgidbyname('wheel')]) == False + assert acl.validate(getuidbyname('simon'), [getgidbyname('staff')]) == True + + acl = ACL([ACLElement("", ('kris',), (), False), + ACLElement("", (), ('devel',), True), + ACLElement("", (), ('wheel',), False), + ACLElement("", (), (), True)]) + + assert acl.validate(getuidbyname('simon'), []) == True + assert acl.validate(getuidbyname('kris'), []) == False + assert acl.validate(getuidbyname('simon'), [getgidbyname('devel'), getgidbyname('wheel')]) == True + assert acl.validate(getuidbyname('simon'), [getgidbyname('staff'), getgidbyname('wheel')]) == False + assert acl.validate(getuidbyname('simon'), [getgidbyname('staff')]) == True + + acl = ACL([ACLElement("", (4206,), set([]), True), + ACLElement("", (), set([]), False)]) + + assert acl.validate(4206, (4206, 31337)) == True + assert acl.validate(4201, (4201, 31337)) == False Added: projects/portbuild/qmanager/dumpdb.py ============================================================================== --- /dev/null 00:00:00 1970 (empty, because file is newly added) +++ projects/portbuild/qmanager/dumpdb.py Sat Apr 23 21:02:25 2011 (r220976) @@ -0,0 +1,140 @@ +# +# try doing some SQL reads as a test +# +from freebsd_config import * + +import os, threading, socket, Queue + +from signal import * +from sys import exc_info +from itertools import chain + +from qmanagerobj import * + +CONFIG_DIR="/var/portbuild" +CONFIG_SUBDIR="conf" +CONFIG_FILENAME="server.conf" + +# pieces of qmanagerobj.startup +def obj_startup(filename): + + engine = create_engine('sqlite:///' + filename, echo=True) + Session = sessionmaker(bind=engine) + session = Session() + + Base.metadata.create_all(engine) + + return (engine, session) + + +def show_acl( session ): + + acls = session.query(QManagerACL) + acls = acls.order_by('name') + + print + print 'starting dump of acl table:' + print + + for acl in acls: + + print + print "name: %s" % acl.name + # list + print "uidlist: " + str( acl.uidlist ) + # list + print "gidlist: " + str( acl.gidlist ) + print "sense: " + str( acl.sense ) + + +def show_jobs( session ): + + jobs = session.query(Job) + jobs = jobs.order_by('id') + + print + print 'starting dump of Job table:' + print + + for job in jobs: + + print + # job ID + print "job id: " + `job.id` + # Name of job + print "name: " + job.name + # priority of request + print "priority: " + `job.priority` + # job type + print "type: " + job.type + # uid of job owner + print "owner: " + `job.owner` + # gids of job owner (tuple) + #print str( type( job.gids ) ) + print "gids: " + str( job.gids ) + # machines that satisfied initial query (list) + #print str( type( job.machines ) ) + print "machines: " + str( job.machines ) + # Time job started/blocked (must not be modified when job is + # blocked or it will corrupt the heapq) + print "startttime: " + `job.starttime` + # initial machine description in case we have to revalidate (list) + # print str( type( job.mdl ) ) + print "mdl: " + str( job.mdl ) + # True --> job is running; False --> job is blocked + print "running: " + str( job.running ) + + +def show_machines( session ): + + machines = session.query(Machine) + machines = machines.order_by('name') + + print + print 'starting dump of Machines table:' + print + + for machine in machines: + + print + print "name: %s" % machine.name + # list + print "acl: " + str( machine.acl ) + # boolean + print "haszfs: " + str( machine.haszfs ) + # boolean + print "online: " + str( machine.online ) + + +def show_machines_for_arch( engine, arch ): + + mdl = ["arch = %s" % arch] + + q = SQL.construct(Machine, mdl) + res = engine.execute(Machine.__table__.select(q)) + result = [SQL.to_dict(Machine, i) for i in res] + + print + for machine in result: + print "machine for %s : %s " % ( arch, machine[ 'name' ] ) + + +# main + +if __name__ == '__main__': + + print "acquiring engine and session" + config = getConfig( CONFIG_DIR, CONFIG_SUBDIR, CONFIG_FILENAME ) + QMANAGER_PATH = config.get( 'QMANAGER_PATH' ) + QMANAGER_DATABASE_FILE = config.get( 'QMANAGER_DATABASE_FILE' ) + (engine, session) = obj_startup( \ + os.path.join( QMANAGER_PATH, QMANAGER_DATABASE_FILE ) ) + print "acquired engine and session" + # print "engine = '" + str( engine ) + "', session = '" + str( session ) + "'" + + show_acl( session ) + show_machines( session ) + show_jobs( session ) + + show_machines_for_arch( engine, 'i386' ) + show_machines_for_arch( engine, 'amd64' ) Added: projects/portbuild/qmanager/packagebuild ============================================================================== --- /dev/null 00:00:00 1970 (empty, because file is newly added) +++ projects/portbuild/qmanager/packagebuild Sat Apr 23 21:02:25 2011 (r220976) @@ -0,0 +1,649 @@ +#!/usr/bin/env python + +# Improved build dispatcher. Invoked on server-side from dopackages. + +# We try to build leaf packages (those +# which can be built immediately without requiring additional +# dependencies to be built) in the order such that the ones required +# by the longest dependency chains are built first. +# +# This has the effect of favouring deep parts of the package tree and +# evening out the depth over time, hopefully avoiding the situation +# where the entire cluster waits for a deep part of the tree to +# build on a small number of machines +# +# We can dynamically respond to changes in build machine availability, +# since the queue manager will block jobs that cannot be immediately +# satisfied and will unblock us when a job slot becomes available. +# +# When a package build fails, it is requeued with a lower priority +# such that it will rebuild again as soon as no "phase 1" packages +# are available to build. This prevents the cluster staying idle +# until the last phase 1 package builds. +# +# Other advantages are that this system is easily customizable and in +# the future will let us customize things like the matching policy of +# jobs to machines. For example, we could avoid dispatching multiple +# openoffice builds to the same system. +# +# TODO: +# * Combine build prep stages? +# - initial check for file up-to-date +# * check mtime for package staleness (cf make) +# * option to skip phase 2 + +from qmanagerclient import * + +from freebsd_config import * + +import os, string, sys, threading, time, subprocess +#import random +from itertools import chain +#import gc +from stat import * + +from Queue import Queue +from heapq import * + +CONFIG_DIR="/var/portbuild" +CONFIG_SUBDIR="conf" +CONFIG_FILENAME="server.conf" + +config = getConfig( CONFIG_DIR, CONFIG_SUBDIR, CONFIG_FILENAME ) +QMANAGER_MAX_JOB_ATTEMPTS = int( \ + config.get( 'QMANAGER_MAX_JOB_ATTEMPTS' ) ) +QMANAGER_PRIORITY_PACKAGES = string.split( \ + config.get( 'QMANAGER_PRIORITY_PACKAGES' ) ) +QMANAGER_RUNAWAY_PERCENTAGE = float( \ + config.get( 'QMANAGER_RUNAWAY_PERCENTAGE' ) ) +QMANAGER_RUNAWAY_THRESHOLD = int( \ + config.get( 'QMANAGER_RUNAWAY_THRESHOLD' ) ) + +DEBUG = False + +categories = {} +ports = {} + +# When a build fails we requeue it with a lower priority such that it +# will never preempt a phase 1 build but will build when spare +# capacity is available. +PHASE2_BASE_PRIO=1000 + +# Process success quickly so other jobs are started +SUCCESS_PRIO = -1000 + +# Failure should be a less common event :) +FAILURE_PRIO = -900 + +# Port status codes +PENDING = 1 # Yet to build +PHASE2 = 2 # Failed once + +class PriorityQueue(Queue): + """Variant of Queue that retrieves open entries in + priority order (lowest first). + Entries are typically tuples of the form: (priority number, + data) + This class can be found at: Python-2.6a3/Lib/Queue.py + """ + maxsize = 0 + + def _init(self, maxsize): + self.queue = [] + + def _qsize(self, len=len): + return len(self.queue) + + def _put(self, item, heappush=heappush): + heappush(self.queue, item) + + def _get(self, heappop=heappop): + return heappop(self.queue) + +class Index(object): + + def __init__(self, indexfile): + self.indexfile = indexfile + + def parse(self, targets = None): + + print "[MASTER] Read index" + f = file(self.indexfile) + index = f.readlines() + f.close() + f = None + del f + + lines=[] + print "[MASTER] Phase 1" + for i in index: + (name, path, prefix, comment, descr, maintainer, categories, bdep, + rdep, www, edep, pdep, fdep) = i.rstrip().split("|") + + if targets is None or name in targets: + lines.append((name, bdep, rdep, edep, pdep, fdep)) + + Port(name, path, "", "", "", "", + categories, "") + index = None + del index + + print "[MASTER] Phase 2" + for (name, bdep, rdep, edep, pdep, fdep) in lines: + ports[name].setdeps(bdep, rdep, edep, pdep, fdep) + + lines = None + del lines + print "[MASTER] Done" + +def depthindex(targets): + """ Initial population of depth tree """ + + for i in targets: + i.depth_recursive() + +class Port(object): + + def __init__(self, name, path, prefix, comment, descr, maintainer, + cats, www): + + __slots__ = ["name", "path", "prefix", "comment", "descr", + "maintainer", "www", "bdep", "rdep", "edep", "pdep", + "fdep", "alldep", "parents", "depth", "categories"] + + self.name = name + self.path = path + self.prefix = prefix + self.comment = comment + self.descr = descr + self.maintainer = maintainer + self.www = www + + # Populated later + self.bdep = [] + self.rdep = [] + self.edep = [] + self.pdep = [] + self.fdep = [] + + self.alldep = [] + self.parents = [] + self.id = None # XXX + + self.status = PENDING + self.attempts = 0 + + # Whether the package build has completed and is hanging around + # to resolve dependencies for others XXX use status + self.done = False + + # Depth is the maximum length of the dependency chain of this port + self.depth = None + + self.categories=[] + scats = cats.split() + if len(scats) != len(set(scats)): + print "[MASTER] Warning: port %s includes duplicated categories: %s" % (name, cats) + + for c in set(scats): + try: + cat = categories[c] + except KeyError: + cat = Category(c) + + self.categories.append(cat) + cat.add(self) + + ports[name] = self + + def remove(self): + """ Clean ourselves up but don't touch references in other objects; +they still need to know about us as dependencies etc """ + + self.fdep = None + self.edep = None + self.pdep = None + self.bdep = None + self.rdep = None + self.alldep = None + self.parents = None + + for cat in self.categories: + cat.remove(self) + + ports[self.name] = None + del ports[self.name] + del self + + def destroy(self): + """ Remove a package and all references to it """ + + for pkg in self.alldep: + if pkg.parents is not None: + # Already removed but not destroyed + try: + pkg.parents.remove(self) + except ValueError: + continue + + for pkg in self.parents: + try: + pkg.fdep.remove(self) + except ValueError: + pass + try: + pkg.edep.remove(self) + except ValueError: + pass + try: + pkg.pdep.remove(self) + except ValueError: + pass + try: + pkg.bdep.remove(self) + except ValueError: + pass + try: + pkg.rdep.remove(self) + except ValueError: + pass + pkg.alldep.remove(self) + + sys.exc_clear() + + self.remove() + + def setdeps(self, bdep, rdep, edep, pdep, fdep): + self.fdep = [ports[p] for p in fdep.split()] + self.edep = [ports[p] for p in edep.split()] + self.pdep = [ports[p] for p in pdep.split()] + self.bdep = [ports[p] for p in bdep.split()] + self.rdep = [ports[p] for p in rdep.split()] + + self.alldep = list(set(chain(self.fdep, self.edep, self.pdep, + self.bdep, self.rdep))) + + for p in self.alldep: + p.parents.append(self) + + def depth_recursive(self): + + """ + Recursively populate the depth tree up from a given package + through dependencies, assuming empty values on entries not yet + visited + """ + + if self.depth is None: + if len(self.parents) > 0: + max = 0 + for i in self.parents: + w = i.depth_recursive() + if w > max: + max = w + self.depth = max + 1 + else: + self.depth = 1 + for port in QMANAGER_PRIORITY_PACKAGES: + if self.name.startswith(port): + # Artificial boost to try and get it building earlier + self.depth = 100 + return self.depth + + def destroy_recursive(self): + """ Remove a port and everything that depends on it """ + + parents=set([self]) + + while len(parents) > 0: + pkg = parents.pop() + assert pkg.depth is not None + parents.update(pkg.parents) + pkg.destroy() + + def success(self): + """ Build succeeded and possibly uncovered some new leaves """ + + parents = self.parents[:] + self.done = True + self.remove() + + newleafs = [p for p in parents if all(c.done for c in p.alldep)] + return newleafs + + def failure(self): + """ Build failed """ + + self.destroy_recursive() + + def packagename(self, arch, branch, buildid): + """ Return the path where a package may be found""" + + return "/var/portbuild/%s/%s/builds/%s/packages/All/%s.tbz" \ + % (arch, branch, buildid, self.name) + + def is_stale(self, arch, branch, buildid): + """ Does a package need to be (re)-built? + + Returns: False: if it exists and has newer mtime than all of + its dependencies. + True: otherwise + """ + + my_pkgname = self.packagename(arch, branch, buildid) + pkg_exists = os.path.exists(my_pkgname) + + if pkg_exists: + my_mtime = os.stat(my_pkgname)[ST_MTIME] + + dep_packages = [pkg.packagename(arch, branch, buildid) + for pkg in self.alldep] + deps_exist = all(os.path.exists(pkg) for pkg in dep_packages) + return not (pkg_exists and deps_exist and + all(os.stat(pkg)[ST_MTIME] <= my_mtime + for pkg in dep_packages)) + +class Category(object): + def __init__(self, name): + self.name = name + self.ports = {} + categories[name] = self + + def add(self, port): + self.ports[port] = port + + def remove(self, port): + self.ports[port]=None + del self.ports[port] + +def gettargets(targets): + """ split command line arguments into list of packages to build. + Returns set or iterable of all ports that will be built including + dependencies """ + + plist = set() + if len(targets) == 0: + targets = ["all"] + for i in targets: + if i == "all": + return ports.itervalues() + + if i.endswith("-all"): + cat = i.rpartition("-")[0] + plist.update(p.name for p in categories[cat].ports) + elif i.rstrip(".tbz") in ports: + plist.update([ports[i.rstrip(".tbz")].name]) + else: + raise KeyError, i + + # Compute transitive closure of all dependencies + pleft=plist.copy() + while len(pleft) > 0: + pkg = pleft.pop() + new = [p.name for p in ports[pkg].alldep] + plist.update(new) + pleft.update(new) + + for p in set(ports.keys()).difference(plist): + ports[p].destroy() + + return [ports[p] for p in plist] + +class worker(threading.Thread): + + # Protects threads + lock = threading.Lock() + + # Running threads, used for collecting status + threads = {} + + def __init__(self, mach, job, arch, branch, buildid, queue): + threading.Thread.__init__(self) + self.machine = mach + self.job = job + self.arch = arch + self.branch = branch + self.buildid = buildid + self.queue = queue + + self.setDaemon(True) + + def run(self): + pkg = self.job + + print "[MASTER] Running job %s" % (pkg.name), + if pkg.status == PHASE2: + print " (phase 2)" + else: + print + try: + runenv={'HOME':"/root", + 'PATH':'/sbin:/bin:/usr/sbin:/usr/bin:/usr/games:/usr/local/sbin:/usr/local/bin:/var/portbuild/scripts', + 'FD':" ".join(["%s.tbz" % p.name for p in pkg.fdep]), + 'ED':" ".join(["%s.tbz" % p.name for p in pkg.edep]), + 'PD':" ".join(["%s.tbz" % p.name for p in pkg.pdep]), + 'BD':" ".join(["%s.tbz" % p.name for p in pkg.bdep]), + 'RD':" ".join(["%s.tbz" % p.name for p in pkg.rdep])} + for var in ["NOCLEAN", "NO_RESTRICTED", "NOPLISTCHECK", "NO_DISTFILES", "FETCH_ORIGINAL", "TRYBROKEN" ]: + if var in os.environ: + runenv[var] = os.environ.get(var) + build = subprocess.Popen( + ["/bin/sh", "/var/portbuild/scripts/pdispatch", + self.arch, self.branch, self.buildid, self.machine, + "/var/portbuild/scripts/portbuild", "%s.tbz" % pkg.name, + pkg.path], + env=runenv, + stderr=subprocess.STDOUT, stdout=subprocess.PIPE, bufsize=0) + except OSError, e: + print >>sys.stderr, "[%s:%s]: Execution failed: %s" % \ + (pkg.id, pkg.name, e) + while True: + try: + line = build.stdout.readline() + except: + print "[%s:%s]: Failed reading from build script" % \ + (pkg.id, pkg.name) + break + if line == "": + break + print "[%s:%s] %s" % (pkg.id, pkg.name, line.rstrip()) + + retcode = build.wait() + +# time.sleep(random.randint(0,60)) +# +# r = random.random() +# if r < 0.1: +# retcode = 1 +# elif r < 0.15: +# retcode = 254 +# else: +# retcode = 0 + + conn = QManagerClientConn(stderr = sys.stderr) + timeout = 1 + try: + (code, vars) = conn.command("release", {'id':pkg.id}) + except RequestError, e: + print "[MASTER] Error releasing job %s (%s): %s" % (pkg.name, pkg.id, e.value) + + if DEBUG: + print "[MASTER] got retcode %d from pkg %s" % (retcode, pkg.name) + if retcode == 254: + # Requeue soft failure at original priority + # XXX exponential backoff? + time.sleep(60) +# print "Requeueing %s" % pkg.id + self.queue.put((-pkg.depth, pkg)) + elif retcode == 253: + # setting up a machine, we should immediately retry + self.queue.put((-pkg.depth, pkg)) + elif retcode == 0: + self.queue.put((SUCCESS_PRIO, pkg)) + else: + self.queue.put((FAILURE_PRIO, pkg)) + + # Clean up + worker.lock.acquire() + worker.threads[self]=None + del worker.threads[self] + worker.lock.release() + + @staticmethod + def dispatch(mach, job, arch, branch, buildid, queue): + wrk = worker(mach, job, arch, branch, buildid, queue) + + worker.lock.acquire() + worker.threads[wrk] = wrk + worker.lock.release() + + wrk.start() + +def main(arch, branch, buildid, args): + global index + + basedir="/var/portbuild/"+arch+"/"+branch+"/builds/"+buildid + portsdir=basedir+"/ports" + + # get the major branch number. + branchbase = branch.split("-")[ 0 ] + # XXX ERWLA - Ugly hack + branchbase = branchbase.split(".")[ 0 ] + indexfile=portsdir+"/INDEX-"+branchbase + + print "[MASTER] parseindex..." + index = Index(indexfile) + index.parse() + print "[MASTER] length = %s" % len(ports) + + print "[MASTER] Finding targets..." + targets = gettargets(args) + + print "[MASTER] Calculating depth..." + depthindex(targets) + + print "[MASTER] Pruning duds..." + dudsfile=basedir+"/duds" + for line in file(dudsfile): + try: + dud = ports[line.rstrip()] + except KeyError: + continue + print "[MASTER] Skipping %s (duds)" % dud.name + dud.destroy_recursive() + + queue = PriorityQueue() + # XXX can do this while parsing index if we prune targets/duds + # first + for pkg in ports.itervalues(): + if len(pkg.alldep) == 0: + queue.put((-pkg.depth, pkg)) + + # XXX check osversion, pool + mdl=["arch = %s" % arch] + + # Main work loop + completed_jobs = 0 + failed_jobs = 0 + while len(ports) > 0: + print "[MASTER] Ports remaining=%s, Queue length=%s" % (len(ports), queue.qsize()) + + if len(ports) < 10: + print "[MASTER] Remaining ports: %s" % ports.keys() + + (prio, job) = queue.get() + if DEBUG: + print "[MASTER] Job %s pulled from queue with prio %d" % ( job.name, prio ) + if prio == SUCCESS_PRIO: + print "[MASTER] Job %s succeeded" % job.name + for new in job.success(): + queue.put((-new.depth, new)) + completed_jobs = completed_jobs + 1 + continue + elif prio == FAILURE_PRIO: + if job.status == PHASE2: + print "[MASTER] Job %s failed" % job.name + job.failure() + continue + else: + # XXX MCL 20110421 + completed_jobs = completed_jobs + 1 + failed_jobs = failed_jobs + 1 + if DEBUG: + print "[MASTER] jobs: %d failed jobs out of %d:" % \ + ( failed_jobs, completed_jobs ) + if completed_jobs > QMANAGER_RUNAWAY_THRESHOLD and \ + float( failed_jobs ) / completed_jobs > QMANAGER_RUNAWAY_PERCENTAGE: + print "[MASTER] ERROR: runaway build detected: %d failed jobs out of %d:" % \ + ( failed_jobs, completed_jobs ) + print "[MASTER] RUN TERMINATED." + break + + job.attempts = job.attempts + 1 + # XXX MCL in theory, if all this code worked correctly, + # this condition would never trigger. In practice, + # however, it does, so bomb out before filling portmgr's + # mbox. + # XXX MCL 20110422 perhaps this code has been fixed now; + # XXX it did not use to work: + if job.attempts > QMANAGER_MAX_JOB_ATTEMPTS: + print "[MASTER] Job %s failed %d times; RUN TERMINATED." % ( job.name, job.attempts ) + break + else: + # Requeue at low priority + print "[MASTER] Job %s failed (requeued for phase 2)" % job.name + job.status = PHASE2 + queue.put((PHASE2_BASE_PRIO-job.depth, job)) + continue + elif job.status == PHASE2: + depth = -(prio - PHASE2_BASE_PRIO) + else: + depth = -prio + + print "[MASTER] Working on job %s, depth %d" % (job.name, depth) + if job.is_stale(arch, branch, buildid): + conn = QManagerClientConn(stderr = sys.stderr) + (code, vars) = conn.command("acquire", + {"name":job.name, + "type":"%s/%s/%s package" % \ + (arch, branch, buildid), + "priority":10, "mdl":mdl}) + + if code[0] == "2": + machine=vars['machine'] + job.id=vars['id'] +# print "Got ID %s" % job.id + + worker.dispatch(machine, job, arch, branch, buildid, queue) + else: + print "[MASTER] Error acquiring job %s: %s" % (pkg.name, code) + else: + print "[MASTER] Skipping %s since it already exists" % job.name + for new in job.success(): + queue.put((-new.depth, new)) + + print "[MASTER] Waiting for threads" + threads = worker.threads.copy() + + for t in threads: + print "[MASTER] Outstanding thread: %s" % t.job.name + + for t in threads: + print "[MASTER] Waiting for thread %s" % t.job.name + t.join() + + print "[MASTER] Finished" + +if __name__ == "__main__": + + try: + main(sys.argv[1], sys.argv[2], sys.argv[3], sys.argv[4:]) + sys.exit( 0 ) + except Exception, e: + # XXX MCL TODO move this above + print "packagebuild: Exception:" + try: + print str( e ) + except: + pass + sys.exit( 1 ) Added: projects/portbuild/qmanager/qclient ============================================================================== --- /dev/null 00:00:00 1970 (empty, because file is newly added) +++ projects/portbuild/qmanager/qclient Sat Apr 23 21:02:25 2011 (r220976) @@ -0,0 +1,242 @@ +#!/usr/bin/env python + +# queue manager client + +# TODO: +# * pretty-print command output + +import socket, os, sys + +from optparse import OptionParser +from qmanagerclient import * + +def error(msg): + print >>sys.stderr, "%s: %s" % (sys.argv[0], msg.rstrip()) + sys.exit(1) + +def buildquery(option, opt, values, parser): + """ + Turn command line options into a query description + + Modifies: + query global + + Raises: + ValueError if bogus arguments to numeric operators + + """ + global query + + numopt = False + if opt in ("-m", "--machine"): + key="name" + *** DIFF OUTPUT TRUNCATED AT 1000 LINES ***
Want to link to this message? Use this URL: <https://mail-archive.FreeBSD.org/cgi/mid.cgi?201104232102.p3NL2PpT003585>