diff --git a/dumbo/backends/__init__.py b/dumbo/backends/__init__.py index 70917ee..d399481 100644 --- a/dumbo/backends/__init__.py +++ b/dumbo/backends/__init__.py @@ -35,9 +35,10 @@ def create_filesystem(opts): ## due to circular dependencies. ## ################################################################# -from dumbo.backends import streaming, unix +from dumbo.backends import streaming, unix, punix backends = [ streaming.StreamingBackend(), + punix.PunixBackend(), unix.UnixBackend() - ] \ No newline at end of file + ] diff --git a/dumbo/backends/punix.py b/dumbo/backends/punix.py new file mode 100644 index 0000000..0956ffa --- /dev/null +++ b/dumbo/backends/punix.py @@ -0,0 +1,442 @@ +''' +Created on 8 Jul 2011 + +@author: jso +''' + +import sys, os, time, threading +import operator + +from dumbo.backends.common import Backend, Iteration, FileSystem +from dumbo.util import getopt, getopts, configopts, envdef, execute +from dumbo.cmd import decodepipe + +from multiprocessing import Pool + +from dumbo.backends.unix import UnixFileSystem + +import subprocess + +import Queue +import random + +master_debug = False + +class PunixBackend(Backend): + + def matches(self, opts): + return bool(getopt(opts, 'punix', delete=False)) + + def create_iteration(self, opts): + progopt = getopt(opts, 'prog') + return PunixIteration(progopt[0], opts) + + def create_filesystem(self, opts): + # are we given a specific shell? + shell = getopt(opts, "shell", delete=False) + if shell: + return UnixFileSystem(shell[0]) + else: + return UnixFileSystem() + +def doMap(*args, **kwargs): + f = None + retval = 1 + try: + pyenv, python, cmdenv, mapper, nReducers, tmpdir, output, addedopts, shell, mapi, filename_list, doReduces = args + + filenames = " ".join(["-file %s" % x for x in filename_list]) + + encodepipe = pyenv + ' ' + python + ' -m dumbo.cmd encodepipe ' + filenames + if addedopts['inputformat'] and addedopts['inputformat'][0] == 'code': + encodepipe += ' -alreadycoded yes' + + if doReduces: + cmd = "%s | %s %s %s | python -m dumbo.backends.punixSplitter %d %d %s" % (encodepipe, + pyenv, + cmdenv, + mapper, + mapi, + nReducers, + tmpdir) + + else: + outfile = os.sep.join([output, "part-%05d" % mapi]) + cmd = "%s | %s %s %s > '%s'" % (encodepipe, + pyenv, + cmdenv, + mapper, + outfile) + + + cmdStderr = open(os.sep.join([tmpdir, "m-%d-status" % mapi]), "w") + retval = execute(cmd, stderr=cmdStderr, executable=shell) + cmdStderr.close() + + + f = open(os.sep.join([tmpdir, "m-%d-status" % mapi]), "a") + print >>f, "return code:", retval + f.close() + + if retval != 0: + print "map %d failed" % mapi + + except Exception as e: + f = open(os.sep.join([tmpdir, "m-%d-status" % mapi]), "a") + print >>f, type(e), str(e) + f.close() + + return retval + +def doReduce(*args, **kwargs): + retval = 1 + try: + tmpdir, pyenv, cmdenv, reducer, output, shell, reducenum = args + + combinedInput = os.sep.join([tmpdir, "r-%d-all" % reducenum]) + + retval = 0 + if os.path.exists(combinedInput): + cmd = "LC_ALL=C sort -t $'\\t' --temporary-directory=%s --key=1 %s | %s %s %s > '%s'" % (tmpdir, combinedInput, pyenv, cmdenv, reducer, os.sep.join([output, "part-%05d" % reducenum])) + + cmdStderr = open(os.sep.join([tmpdir, "r-%d-status" % reducenum]), "w") + retval = execute(cmd, stderr=cmdStderr, executable=shell) + cmdStderr.close() + + f = open(os.sep.join([tmpdir, "r-%d-status" % reducenum]), "a") + print >>f, "return code:", retval + f.close() + + if not master_debug: + # clean up + os.remove(combinedInput) + + except Exception as e: + f = open(os.sep.join([tmpdir, "r-%d-status" % reducenum]), "a") + print >>f, type(e), str(e) + f.close() + + if retval != 0: + print "reduce %d failed" % reducenum + + return retval + +class MapErrOutputCopier(threading.Thread): + def __init__(self, tmpdir): + threading.Thread.__init__(self) + + self.tmpdir = tmpdir + self.dstFile = "%s.mapStatus.txt" % self.tmpdir + + self.q = Queue.Queue() + + def run(self): + f = open(self.dstFile, "w") + while True: + mapi = self.q.get() + if mapi is None: break + + # copy the file to dstFile and remove the original + toCopy = os.sep.join([self.tmpdir, "m-%d-status" % mapi]) + if os.path.exists(toCopy): + print >>f, "map %d" % mapi + g = open(toCopy) + for l in g: + if "WARNING: skipping bad value" in l: continue + if "reporter:counter:Dumbo" in l: continue + f.write(l) + g.close() + os.remove(toCopy) + f.close() + + def map_done(self, mapi): + self.q.put(mapi) + +class ReduceErrOutputCopier(threading.Thread): + def __init__(self, tmpdir): + threading.Thread.__init__(self) + + self.tmpdir = tmpdir + self.dstFile = "%s.reduceStatus.txt" % self.tmpdir + + self.q = Queue.Queue() + + def run(self): + f = open(self.dstFile, "w") + while True: + reducenum = self.q.get() + if reducenum is None: break + + # copy the file to dstFile and remove the original + toCopy = os.sep.join([self.tmpdir, "r-%d-status" % reducenum]) + if os.path.exists(toCopy): + print >>f, "reduce %d" % reducenum + g = open(toCopy) + for l in g: + f.write(l) + g.close() + os.remove(toCopy) + f.close() + + def reduce_done(self, reducenum): + self.q.put(reducenum) + +class ReduceInputCopier(threading.Thread): + def __init__(self, tmpdir, rnum): + threading.Thread.__init__(self) + + self.tmpdir = tmpdir + self.rnum = rnum + self.q = Queue.Queue() + + self.dstFile = os.sep.join([tmpdir, "r-%d-all" % rnum]) + + def run(self): + while True: + mapi = self.q.get() + if mapi is None: break + + # copy the file to dstFile and remove the original + toCopy = os.sep.join([self.tmpdir, "r-%d" % self.rnum, "m-%d" % mapi]) + if os.path.exists(toCopy): + subprocess.call("cat %s >> %s" % (toCopy, self.dstFile), shell=True, executable="/bin/bash") + os.remove(toCopy) + + os.rmdir(os.sep.join([self.tmpdir, "r-%d" % self.rnum])) + + def map_done(self, mapi): + self.q.put(mapi) + +class PunixIteration(Iteration): + + def __init__(self, prog, opts): + Iteration.__init__(self, prog, opts) + self.opts += configopts('punix', prog, self.opts) + + def run(self): + retval = Iteration.run(self) + if retval != 0: + return retval + addedopts = getopts(self.opts, ['input', + 'output', + 'mapper', + 'reducer', + 'libegg', + 'delinputs', + 'cmdenv', + 'inputformat', + 'outputformat', + 'numreducetasks', + 'python', + 'pypath', + 'tmpdir', + 'nmappers', + 'nreducers', + 'permapper', + 'shell']) + (mapper, reducer) = (addedopts['mapper'][0], addedopts['reducer'][0]) + if not addedopts['input'] or not addedopts['output']: + print >> sys.stderr, 'ERROR: input or output not specified' + return 1 + inputs = reduce(operator.concat, (input.split(' ') for input in + addedopts['input'])) + output = addedopts['output'][0] + try: os.makedirs(output) + except os.error as e: pass + + pyenv = envdef('PYTHONPATH', addedopts['libegg'], + shortcuts=dict(configopts('eggs', self.prog)), + extrapaths=addedopts['pypath']) + cmdenv = ' '.join("%s='%s'" % tuple(arg.split('=')) for arg in + addedopts['cmdenv']) + + shell = addedopts["shell"][0] + + python = addedopts['python'][0] + + mapTotal = len(inputs) + mapDoneCount = [0] + reduceDoneCount = [0] + + nMappers = int(addedopts["nmappers"][0]) + nReducers = int(addedopts["nreducers"][0]) + + # this is the number of files that will be handed to each mapper + permapper = int(addedopts["permapper"][0]) + + # start the mappers, reducers + mPool = Pool(nMappers) + rPool = Pool(nReducers) + + doReduces = not (addedopts['numreducetasks'] and addedopts['numreducetasks'][0] == '0') + + # set up the mapper output/reducer input directories + tmpdir = os.sep.join([addedopts['tmpdir'][0], "%s_%06d" % (time.strftime("%Y-%m-%d_%H-%M-%S", time.gmtime()), random.randint(0, 999999))]) + + mLock = threading.Lock() + mResults = {} + rLock = threading.Lock() + mByR = {} + rStarted = set() + rResults = {} + + # start the map status output copier + mapErrOutputCopier = MapErrOutputCopier(tmpdir) + mapErrOutputCopier.start() + + if doReduces: + # start the copy threads to handle map outputs + copyLock = threading.Lock() + copyThreads = {} + for i in range(nReducers): + copyThreads[i] = ReduceInputCopier(tmpdir, i) + copyThreads[i].start() + + # start the reduce status output copier + reduceErrOutputCopier = ReduceErrOutputCopier(tmpdir) + reduceErrOutputCopier.start() + + for i in range(nReducers): + try: os.makedirs(os.sep.join([tmpdir, "r-%d" % i])) + except os.error as e: pass + + mByR[i] = set() + + # do maps -- kick it all off + if permapper == 1: + for args in enumerate(inputs): + i, filename = args + args = pyenv, python, cmdenv, mapper, nReducers, tmpdir, output, addedopts, shell, i, [filename], doReduces + mLock.acquire() + mResults[i] = mPool.apply_async(doMap, args) + mLock.release() + else: + # multiple files per mapper... + remaining = list(inputs) + i = 0 + while remaining: + args = pyenv, python, cmdenv, mapper, nReducers, tmpdir, output, addedopts, shell, i, remaining[:permapper], doReduces + mLock.acquire() + mResults[i] = mPool.apply_async(doMap, args) + mLock.release() + + remaining = remaining[permapper:] + i += 1 + + # need to reset the mapTotal variable since we have fewer tasks... + mapTotal = i + + def reduceDone(): + # did anything finish? + rLock.acquire() + + done = [x for x in rResults if rResults[x].ready()] + for args in done: del rResults[args] # cleanup + + rLock.release() + + for reducenum in done: + #print "reduce %d done" % reducenum + reduceDoneCount[0] += 1 + + reduceErrOutputCopier.reduce_done(reducenum) + + def mapDone(): + # did anything finish? + mLock.acquire() + + done = [x for x in mResults if mResults[x].ready()] + for args in done: del mResults[args] # cleanup + + mLock.release() + + for args in done: + i = args + + mapDoneCount[0] += 1 + + mapErrOutputCopier.map_done(i) + + if doReduces: + #print "map %d done" % i + + # update the structures + for reducenum in range(nReducers): + # initiate the copy request... + copyThreads[reducenum].map_done(i) + + rLock.acquire() + + mByR[reducenum].add(i) + + # see if we can signal that's all the copier will have to handle? + if len(mByR[reducenum]) == mapTotal: + copyThreads[reducenum].map_done(None) + + rLock.release() + else: + # just move the map output file (unsorted) to the output directory + print "map %d done" % i + + + def copyDone(): + # did anything finish? + copyLock.acquire() + + done = [x for x in copyThreads if not copyThreads[x].is_alive()] + + for rnum in done: del copyThreads[rnum] # cleanup + + copyLock.release() + + for rnum in done: + rLock.acquire() + + rStarted.add(rnum) + args = tmpdir, pyenv, cmdenv, reducer, output, shell, rnum + rResults[rnum] = rPool.apply_async(doReduce, args) + + rLock.release() + + while reduceDoneCount[0] < nReducers: + # check for things finishing... + mapDone() + copyDone() + reduceDone() + + mLock.acquire() + haveMaps = len(mResults) + mLock.release() + + rLock.acquire() + haveReduces = len(rResults) + rLock.release() + + copyLock.acquire() + copyRunning = len(copyThreads) + copyLock.release() + + print "%d/%d/%d maps\t%d/%d copies\t%d/%d/%d reduces" % (haveMaps, mapDoneCount[0], mapTotal, copyRunning, nReducers, haveReduces, reduceDoneCount[0], nReducers) + + time.sleep(5) + + mPool.terminate() + mPool.join() + rPool.terminate() + rPool.join() + + # make sure the map status output is done before cleaning up the tmp dir + mapErrOutputCopier.map_done(None) + mapErrOutputCopier.join() + + if doReduces: + # make sure the reduce status output is done before cleaning up the tmp dir + reduceErrOutputCopier.reduce_done(None) + reduceErrOutputCopier.join() + + if not master_debug and len(os.listdir(tmpdir)) == 0: + os.rmdir(tmpdir) + + + return 0 # make sure we return an error if there is a problem. + diff --git a/dumbo/backends/punixSplitter.py b/dumbo/backends/punixSplitter.py new file mode 100644 index 0000000..ae9b3e6 --- /dev/null +++ b/dumbo/backends/punixSplitter.py @@ -0,0 +1,24 @@ +import os, sys + +# assume reducer directory has been created + +mapId = int(sys.argv[1]) +nReducers = int(sys.argv[2]) +outdir = sys.argv[3] + +files = {} + +from IPy import IP +import datetime + +for line in sys.stdin: + key, val = line.split("\t", 1) + outnum = hash(eval(key)) % nReducers + if outnum not in files: + f = open(os.sep.join([outdir, "r-%d" % outnum, "m-%d" % mapId]), "w") + files[outnum] = f + files[outnum].write(line) + +for i in files: + files[i].close() + diff --git a/dumbo/backends/unix.py b/dumbo/backends/unix.py index c370650..d7d94aa 100644 --- a/dumbo/backends/unix.py +++ b/dumbo/backends/unix.py @@ -21,8 +21,12 @@ def create_iteration(self, opts): return UnixIteration(opts['prog'][0], opts) def create_filesystem(self, opts): - return UnixFileSystem() - + # are we given a specific shell? + shell = getopt(opts, "shell", delete=False) + if shell: + return UnixFileSystem(shell[0]) + else: + return UnixFileSystem() class UnixIteration(Iteration): @@ -109,6 +113,9 @@ def run(self): class UnixFileSystem(FileSystem): + + def __init__(self, shell="/bin/sh"): + self.shell = shell def cat(self, path, opts): opts = Options(opts) @@ -116,16 +123,16 @@ def cat(self, path, opts): return decodepipe(opts) def ls(self, path, opts): - return execute("ls -l '%s'" % path, printcmd=False) - + return execute("ls -l '%s'" % path, printcmd=False, executable=self.shell) + def exists(self, path, opts): - return execute("test -e '%s'" % path, printcmd=False) - + return execute("test -e '%s'" % path, printcmd=False, executable=self.shell) + def rm(self, path, opts): - return execute("rm -rf '%s'" % path, printcmd=False) - + return execute("rm -rf '%s'" % path, printcmd=False, executable=self.shell) + def put(self, path1, path2, opts): - return execute("cp '%s' '%s'" % (path1, path2), printcmd=False) - + return execute("cp '%s' '%s'" % (path1, path2), printcmd=False, executable=self.shell) + def get(self, path1, path2, opts): - return execute("cp '%s' '%s'" % (path1, path2), printcmd=False) + return execute("cp '%s' '%s'" % (path1, path2), printcmd=False, executable=self.shell) diff --git a/dumbo/cmd.py b/dumbo/cmd.py index f0f045d..dae9667 100644 --- a/dumbo/cmd.py +++ b/dumbo/cmd.py @@ -86,12 +86,23 @@ def start(prog, return 1 prog = '-m ' + prog - return execute("%s %s" % (sys.executable, prog), - opts, - pyenv, - stdout=stdout, - stderr=stderr, - printcmd=False) + # try to grab the shell to use + shellopt = getopt(opts, "shell", delete=False) + if shellopt: + return execute("%s %s" % (sys.executable, prog), + opts, + pyenv, + stdout=stdout, + stderr=stderr, + printcmd=False, + executable=shellopt[0]) + else: + return execute("%s %s" % (sys.executable, prog), + opts, + pyenv, + stdout=stdout, + stderr=stderr, + printcmd=False) def cat(path, opts): @@ -135,6 +146,11 @@ def get(path1, path2, opts): opts += Options(configopts('get')) return create_filesystem(opts).get(path1, path2, opts) +import gzip +def openFile(f): + if f.endswith(".gz"): + return gzip.open(f) + return open(f) def encodepipe(opts=None): opts = opts or Options() @@ -143,7 +159,7 @@ def encodepipe(opts=None): opts.remove(*keys) ofiles = addedopts['file'] - files = map(open, ofiles) if ofiles else [sys.stdin] + files = map(openFile, ofiles) if ofiles else [sys.stdin] loadfun = loadcode if addedopts['alreadycoded'] else loadtext addpath = addedopts['addpath'] @@ -161,7 +177,7 @@ def encodepipe(opts=None): def decodepipe(opts=None): opts = opts or Options() ofiles = opts.pop('file') - files = map(open, ofiles) if ofiles else [sys.stdin] + files = map(openFile, ofiles) if ofiles else [sys.stdin] for _file in files: outputs = loadcode(line[:-1] for line in _file) diff --git a/dumbo/core.py b/dumbo/core.py index 12afe15..e66f539 100644 --- a/dumbo/core.py +++ b/dumbo/core.py @@ -85,6 +85,16 @@ def run(self): preoutputsopt = opts.pop('preoutputs') delinputsopt = opts.pop('delinputs') + # only do this for the first iteration... + if iter == 0: + # handle inputfile options here; we are past the point where a + # bunch of things would get dumped to the commandline. + inputfiles = getopt(opts, 'inputfile', delete=True) # this deletes the inputfile options from opts + for inputfile in inputfiles: + for l in open(inputfile): + infile = l.strip() + opts.append(("input", infile)) + job_inputs = opts['input'] if not job_inputs: print >> sys.stderr, 'ERROR: No input path specified' diff --git a/dumbo/util.py b/dumbo/util.py index d87d823..a80f641 100644 --- a/dumbo/util.py +++ b/dumbo/util.py @@ -242,7 +242,8 @@ def execute(cmd, precmd='', printcmd=True, stdout=sys.stdout, - stderr=sys.stderr): + stderr=sys.stderr, + executable="/bin/sh"): if precmd: cmd = ' '.join((precmd, cmd)) opts = opts or Options() @@ -251,14 +252,14 @@ def execute(cmd, cmd = ' '.join((cmd, args)) if printcmd: print >> stderr, 'EXEC:', cmd - return system(cmd, stdout, stderr) + return system(cmd, stdout, stderr, executable) -def system(cmd, stdout=sys.stdout, stderr=sys.stderr): +def system(cmd, stdout=sys.stdout, stderr=sys.stderr, executable="/bin/sh"): if sys.version[:3] == '2.4': return os.system(cmd) proc = subprocess.Popen(cmd, shell=True, stdout=stdout, - stderr=stderr) + stderr=stderr, executable=executable) return proc.wait()