#!/usr/bin/env python

# DHARMA Project
# Copyright (C) 2003-2004 Yun Mao, University of Pennsylvania
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of version 2.1 of the GNU Lesser General Public
# License as published by the Free Software Foundation.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA

"""
vxargs: Visualized xargs with redirected output

The code below runs unchanged on Python 2.6+ and Python 3: every print call
takes a single pre-formatted string, so it behaves the same whether print
is a statement or a function.
"""
version = "0.3.3"

import os
import sys
import time
import signal
import errno
import curses
import random
import getopt

try:
    # Python 2 only. No longer used in this file; the name is kept so that
    # code doing "vxargs.commands" keeps working where the module exists.
    import commands
except ImportError:
    commands = None

try:
    _read_input = raw_input      # Python 2
except NameError:
    _read_input = input          # Python 3

update_rate = 1      # seconds between two screen refreshes (SIGALRM period)
final_stats = {}     # exit code -> number of jobs that ended with it
gsl = None           # the Slots instance the signal handler operates on
stopping = 0         # number of SIGINTs received so far

# File descriptors of the standard streams. spawn() redirects these numbers
# directly because an exec'ed program only knows about fds 1 and 2, whatever
# object sys.stdout / sys.stderr currently points to.
_STDOUT_FD = 1
_STDERR_FD = 2

# get_last_line() looks at most this many bytes back from the end of a file.
_TAIL_CHUNK = 4096


def getListFromFile(f):
    """Parse an argument list; a '#' line is the comment of the previous entry.

    Blank lines are skipped. A line starting with '#' is stored (without the
    '#') as the comment of the closest preceding argument, unless that
    argument already has a non-empty comment or there is no argument yet.

    @param f: file object (or any iterable of lines) of the host list file
    @return: a list of [argument, comment] two-element lists
    """
    hostlist = []
    for line in f:
        # startswith() instead of line[0]: safe for empty strings as well
        if not line.startswith('#'):
            if line.strip():
                hostlist.append([line.strip(), ''])
        elif hostlist and hostlist[-1][1] == '':
            hostlist[-1][1] = line.strip()[1:]
    return hostlist


def get_last_line_py(fn):
    """Return the stripped last line of a file by reading the whole file.

    @param fn: path of the file
    @return: (0, last_line) on success; (1, '') if the file cannot be read
             or is empty
    """
    try:
        with open(fn, 'r') as f:
            lines = f.readlines()
        if len(lines) > 0:
            return (0, lines[-1].strip())
    except IOError:
        pass
    return (1, '')


def get_last_line(fn):
    """Return the last line of a file, like "tail -n1 fn", without a shell.

    The file is read directly, so file names with spaces or shell
    metacharacters are safe and no helper process is started on every screen
    refresh. Only the last _TAIL_CHUNK bytes are inspected; a longer last
    line is returned truncated to its final _TAIL_CHUNK bytes.

    @param fn: path of the file
    @return: (0, last_line) on success (last_line is '' for an empty file);
             (1, '') if the file cannot be read
    """
    try:
        with open(fn, 'rb') as f:
            f.seek(0, 2)                        # 2 == os.SEEK_END
            size = f.tell()
            f.seek(max(0, size - _TAIL_CHUNK))
            data = f.read()
    except (IOError, OSError):
        return (1, '')
    # A trailing newline terminates the last line, it does not start a new one
    if data.endswith(b'\n'):
        data = data[:-1]
    last = data.rsplit(b'\n', 1)[-1]
    if not isinstance(last, str):
        # Python 3: bytes -> text; never fail on undecodable output
        last = last.decode('utf-8', 'replace')
    return (0, last)


class Slot:
    """One screen line / one running job."""

    def __init__(self, outdir, num, screen, timeout, name, count):
        """
        @param outdir: directory holding <name>.out / <name>.err, or ''
        @param num: slot number; the job is drawn on screen row num+2
        @param screen: curses window, or None for plain mode
        @param timeout: job timeout in seconds; 0 disables the timeout
        @param name: the argument this job was started for
        @param count: sequence number shown in front of the name
        """
        self.outdir = outdir
        self.slotnum = num
        self.screen = screen
        self.comment = ""
        self.startTime = time.time()
        self.timeout = timeout
        self.name = name
        self.count = count

    def drawLine(self, comment='', done=False):
        """Redraw this slot's screen line; does nothing in plain mode.

        @param comment: text to show and remember; '' reuses the previously
                        remembered comment
        @param done: if True only the comment is drawn, otherwise elapsed
                     time, count, name and the last line of the job's stderr
                     (preferred) or stdout file, falling back to the comment
        """
        if self.screen is None:
            return
        if comment == '':
            comment = self.comment
        else:
            self.comment = comment
        stdscr = self.screen
        elapsed = time.time() - self.startTime
        try:
            y, x = stdscr.getmaxyx()
            spaces = ' ' * x
            stdscr.addstr(self.slotnum + 2, 0, spaces)  # title occupies two lines
            if done:
                stdscr.addstr(self.slotnum + 2, 0, comment[:x])
            else:
                # construct the string
                output = "(%3ds)%3d: %s " % (round(elapsed), self.count, self.name)
                spaceleft = x - len(output)
                if self.outdir and spaceleft > 1:
                    outfn = os.path.join(self.outdir, '%s.out' % self.name)
                    errfn = os.path.join(self.outdir, '%s.err' % self.name)
                    lout = get_last_line(outfn)
                    lerr = get_last_line(errfn)
                    if lerr[0] == 0 and lerr[1]:
                        output += lerr[1]
                    elif lout[0] == 0 and lout[1]:
                        output += lout[1]
                    else:
                        output += comment
                else:
                    output += comment
                stdscr.addstr(self.slotnum + 2, 0, output[:x])
            stdscr.refresh()
        except curses.error:
            # some of them will be out of screen, ignore it
            pass

    def update(self, pid):
        """Periodic refresh: redraw, and enforce the timeout if one is set.

        @param pid: pid (== process group id) of the job in this slot
        """
        self.drawLine()
        if self.timeout > 0:
            self.kill(pid)

    def kill(self, pid):
        """Signal the job's process group according to how overdue it is.

        Not overdue: nothing. Overdue by 0-2s: SIGINT, by more than 2s:
        SIGTERM, by more than 3s: SIGKILL. A vanished process is ignored.

        @param pid: pid of the job; the signal goes to process group -pid
        @raise OSError: for any os.kill failure other than ESRCH
        """
        overtime = time.time() - self.startTime - self.timeout
        try:
            if overtime > 3:
                # expired more than 3 seconds, send -9
                os.kill(-pid, signal.SIGKILL)
            elif overtime > 2:
                # expired more than 2 seconds, send -15
                os.kill(-pid, signal.SIGTERM)
            elif overtime >= 0:
                os.kill(-pid, signal.SIGINT)
        except OSError as e:
            if e.errno != errno.ESRCH:   # ESRCH: no such process
                raise                    # bare raise keeps the traceback

    def stop(self, pid):
        """stop current pid b/c we caught SIGINT twice

        Backdates startTime so the job counts as just expired, then applies
        the normal kill() escalation.
        """
        self.startTime = time.time() - self.timeout
        self.kill(pid)


class Slots:
    """Pool of at most `max` concurrently running jobs."""

    def __init__(self, max, screen, timeout, outdir):
        """
        @param max: maximum number of concurrent jobs
        @param screen: curses window, or None for plain mode
        @param timeout: per-job timeout in seconds (0: none)
        @param outdir: directory for status / list files, or ''
        """
        self.maxChild = max
        self.slots = list(range(self.maxChild))   # free slot numbers
        # pid -> Slot of the running jobs. Per instance on purpose: a
        # class-level dict would be shared by every Slots object.
        self.pids = {}
        self.screen = screen
        self.t = timeout
        self.outdir = outdir

    def getSlot(self, name, count):
        """Return a fresh Slot, blocking until a job ends if the pool is full.

        @param name: argument the new job runs for
        @param count: sequence number of the new job
        @raise RuntimeError: if the pool is full and there is no child to
                             wait for (see waitJobs)
        """
        if not self.slots:
            # it's empty, wait until other jobs finish
            slot = self.waitJobs().slotnum
        else:
            slot = self.slots.pop(0)
        return Slot(self.outdir, slot, self.screen, self.t, name, count)

    def mapPID(self, pid, slot):
        """@param slot: slot object
        """
        self.pids[pid] = slot

    def waitJobs(self):
        """Wait for one child to end, record its result, return its Slot.

        With an outdir the exit code (status >> 8) is written to
        <name>.status, the name is appended to killed_list if the low status
        byte is non-zero, and to abnormal_list if the exit code is non-zero.
        final_stats is updated in every case.

        @raise RuntimeError: if there are no child processes
        @raise KeyError: if the reaped pid was never registered via mapPID
        """
        while 1:
            try:
                pid, status = os.wait()
                break
            except OSError as e:
                if e.errno == errno.ECHILD:   # no child processes
                    raise RuntimeError('no child processes when waiting')
                if e.errno != errno.EINTR:
                    # only an interrupted wait() is worth retrying
                    raise
        slot = self.pids[pid]
        if self.outdir:
            with open(os.path.join(self.outdir, '%s.status' % slot.name), 'w') as f:
                f.write('%d' % (status >> 8))
            if (status & 0xFF) != 0:
                with open(os.path.join(self.outdir, 'killed_list'), 'a') as f:
                    f.write('%s\n' % (slot.name))
            if status >> 8:
                with open(os.path.join(self.outdir, 'abnormal_list'), 'a') as f:
                    f.write('%s\n' % (slot.name))
        del self.pids[pid]
        s = status >> 8
        final_stats[s] = final_stats.get(s, 0) + 1
        return slot

    def update(self):
        """Refresh every running job's line and enforce its timeout."""
        for k, v in list(self.pids.items()):
            v.update(k)

    def timeout(self):
        """SIGALRM action: refresh everything and re-arm the alarm."""
        self.update()
        signal.alarm(update_rate)

    def drawTitle(self, stuff):
        """Show `stuff` in the two title rows, or print it in plain mode."""
        if self.screen:
            y, x = self.screen.getmaxyx()
            spaces = ' ' * (x * 2)
            self.screen.addstr(0, 0, spaces)
            self.screen.addstr(0, 0, stuff[:x * 2])
            self.screen.refresh()
        else:
            print(stuff)

    def stop(self):
        """SIGINT action: announce the stop; from the 2nd SIGINT on, kill jobs."""
        if stopping == 1:
            msg = 'Stopping -- Waiting current jobs done. Press Ctrl-C again to kill current jobs.'
        else:
            msg = 'Stopping -- Killing current jobs'
        self.drawTitle(msg)
        if stopping > 1:
            for k, v in list(self.pids.items()):
                v.stop(k)
        return


def handler(signum, frame_unused):
    """Signal handler: SIGALRM refreshes the display, SIGINT requests a stop."""
    global gsl
    global stopping
    if signum == signal.SIGALRM:
        gsl.timeout()
    if signum == signal.SIGINT:
        stopping += 1
        gsl.stop()


def generateCommands(cmd_line, args):
    """Substitute every '{}' in the command words by the argument.

    @param cmd_line: list of command words
    @param args: [argument, comment] entry; only args[0] is used
    @return: new list of command words
    """
    return [per_arg.replace('{}', args[0]) for per_arg in cmd_line]


def spawn(cmdline, outfn, errfn, setpgrp=False):
    """A cleverer spawn that lets you redirect stdout and stderr to
    outfn and errfn.  Returns pid of child.
    You can't do this with os.spawn, sadly.

    The child never returns from this function: if the redirection or the
    exec fails it reports "error before execution: ..." on its stderr and
    leaves through os._exit(255). os._exit is used rather than sys.exit so
    the child cannot unwind the parent's stack (curses cleanup, the job
    loop) that it inherited through fork().

    @param cmdline: list of command words; cmdline[0] is looked up in PATH
    @param outfn: file that receives the child's stdout (truncated)
    @param errfn: file that receives the child's stderr (truncated)
    @param setpgrp: if True the child becomes leader of a new process group
    @return: pid of the child (in the parent)
    """
    pid = os.fork()
    if pid == 0:
        # child
        try:
            out = open(outfn, 'w')
            os.dup2(out.fileno(), _STDOUT_FD)
            err = open(errfn, 'w')
            os.dup2(err.fileno(), _STDERR_FD)
            if setpgrp:
                os.setpgrp()
            os.execvp(cmdline[0], cmdline)
        except Exception as e:
            # os.write: unbuffered, so the text is not lost by os._exit
            msg = "error before execution: %s\n" % (e,)
            if not isinstance(msg, bytes):
                msg = msg.encode('utf-8', 'replace')
            os.write(_STDERR_FD, msg)
        finally:
            # only reached when exec did not replace the process image
            os._exit(255)
    # father process
    return pid


def start(win, max_child, hlist, outdir, randomize, command_line, timeout):
    """Run the command once per entry of hlist, at most max_child at a time.

    Installs `handler` for SIGALRM and SIGINT and publishes the job pool in
    the module global `gsl` for it. Stops launching new jobs once SIGINT has
    been received, then waits for all running jobs.

    @param win: curses window, or None for plain output
    @param max_child: maximum number of concurrent jobs
    @param hlist: list of [argument, comment] entries; shuffled in place if
                  randomize is set
    @param outdir: output directory, or '' to send output to /dev/null
    @param randomize: shuffle hlist before starting
    @param command_line: list of command words containing '{}' placeholders
    @param timeout: per-job timeout in seconds (0: none)
    """
    global gsl
    global stopping
    total = len(hlist)
    if randomize:
        random.shuffle(hlist)

    signal.signal(signal.SIGALRM, handler)
    signal.signal(signal.SIGINT, handler)
    signal.alarm(update_rate)

    sl = Slots(max_child, win, timeout, outdir)
    gsl = sl
    count = 0
    for i in hlist:
        slot = sl.getSlot(i[0], count)
        if stopping > 0:
            slot.drawLine('Done', done=True)
            break

        count += 1
        slot.drawLine(i[1])
        x = generateCommands(command_line, i)

        sl.drawTitle("%d/%d:%s" % (count, total, ' '.join(x)))

        outpath = '/dev/null'
        errpath = '/dev/null'
        if outdir:
            outpath = os.path.join(outdir, '%s.out' % i[0])
            errpath = os.path.join(outdir, '%s.err' % i[0])

        pid = spawn(x, outpath, errpath, setpgrp=True)
        sl.mapPID(pid, slot)

    while sl.pids:
        try:
            slot = sl.waitJobs()
        except RuntimeError:
            sys.stderr.write('Warning: lost tracking of %d jobs\n' % len(sl.pids))
            return
        slot.drawLine('Done', done=True)  # Done


def get_output(outdir, argument_list, out=True, err=False, status=False):
    """
    For post processing the output dir.

    @param out: decide whether to process *.out files
    @param err: decide whether to process *.err files
    @param status: decide whether to process *.status files

    @return: (out, err, status): out is a hash table, in which the
    keys are the arguments, and the values are the string of the
    output, if available. err is similar. the values of hash table
    status is the value of exit status in int. Arguments whose file is
    missing (or whose status is not an integer) are left out.

    @raise RuntimeError: if out, err and status are all False
    """
    if not out and not err and not status:
        raise RuntimeError("one of out, err and status has to be True")

    result = ({}, {}, {})
    mapping = ('out', 'err', 'status')
    p = []
    if out:
        p.append(0)
    if err:
        p.append(1)
    if status:
        p.append(2)
    for arg in argument_list:
        basefn = os.path.join(outdir, arg)
        for i in p:
            fn = '.'.join((basefn, mapping[i]))  # basefn.ext
            try:
                with open(fn) as f:
                    result[i][arg] = f.read()
            except IOError:
                pass
    if not status:
        return result
    int_status = {}
    for k, v in result[2].items():
        try:
            int_status[k] = int(v.strip())
        except ValueError:
            pass
    return result[0], result[1], int_status


def _wipe_dir(outdir):
    """Delete what "rm -f outdir/*" would: non-hidden, non-directory entries."""
    for entry in os.listdir(outdir):
        if entry.startswith('.'):
            continue                      # the shell's * does not match dotfiles
        path = os.path.join(outdir, entry)
        if os.path.isdir(path) and not os.path.islink(path):
            continue                      # rm without -r leaves directories
        try:
            os.remove(path)
        except OSError:
            pass                          # -f: best effort


def main():
    """Command line entry point; see usage() for the options."""
    options = 'hP:ra:o:yt:pn'
    long_opts = ['help', 'max-procs=', 'randomize', 'args=', 'output=',
                 'noprompt', 'timeout=', 'plain', 'version', 'no-exec']
    try:
        opts, args = getopt.getopt(sys.argv[1:], options, long_opts)
    except getopt.GetoptError:
        print("Unknown options")
        usage()
        sys.exit(1)

    # set default values
    ask_prompt = True
    maxchild = 30
    randomize = False
    hostfile = sys.stdin
    outdir = os.environ.get('VXARGS_OUTDIR', '')
    timeout = 0
    plain = False
    no_exec = False

    for o, a in opts:
        if o in ['--version']:
            print("vxargs version %s" % version)
            print("Copyright (c) 2004 Yun Mao (maoy@cis.upenn.edu)")
            print("Freely distributed under GNU LGPL License")
            sys.exit(1)
        elif o in ['-h', '--help']:
            usage()
            sys.exit(1)
        elif o in ['-r', '--randomize']:
            randomize = True
        elif o in ['-P', '--max-procs']:
            maxchild = int(a)
        elif o in ['-a', '--args']:
            try:
                hostfile = open(a, 'r')
            except IOError as e:
                print("argument file %s has error: %s" % (a, str(e)))
                sys.exit(3)
        elif o in ['-o', '--output']:
            outdir = a
            if a == '/dev/null':
                outdir = ''
        elif o in ['-y', '--noprompt']:
            ask_prompt = False
        elif o in ['-t', '--timeout']:
            timeout = int(a)
        elif o in ['-p', '--plain']:
            plain = True
        elif o in ['-n', '--no-exec']:
            no_exec = True
        else:
            print('Unknown options')
            usage()
            sys.exit(1)

    if len(args) < 1:
        print("No command given.")
        usage()
        sys.exit(1)

    # now test outdir
    if outdir:
        if os.path.exists(outdir):
            if not os.path.isdir(outdir):
                print("%s exists and is not a dir, won't continue" % outdir)
                sys.exit(3)
            elif no_exec:
                # dry run: announce only, never touch the directory
                print("%s is the destination dir and would be destroyed." % (outdir))
            else:
                if ask_prompt:
                    if hostfile == sys.stdin:
                        # stdin carries the argument list, so it cannot also
                        # answer the confirmation question
                        print("You must specify --noprompt (-y) option if no --args (-a) or --no-exec (-n) is given. Doing so will destroy folder %s." % (outdir))
                        sys.exit(3)
                    result = _read_input("%s exists. Continue will destroy everything in it. Are you sure? (y/n) " % (outdir))
                    if result not in ['y', 'Y']:
                        sys.exit(3)
                # confirmed, or -y given: wipe without going through a shell
                _wipe_dir(outdir)
        else:
            if not no_exec:
                try:
                    os.makedirs(outdir)
                except OSError as e:
                    # tolerate losing a creation race, like "mkdir -p"
                    if not (e.errno == errno.EEXIST and os.path.isdir(outdir)):
                        print("cannot create output dir %s: %s" % (outdir, str(e)))
                        sys.exit(3)

    hlist = getListFromFile(hostfile)
    if hostfile is not sys.stdin:
        hostfile.close()

    if no_exec:
        for i in hlist:
            real_cmdline = generateCommands(args, i)
            print(' '.join(real_cmdline))
        sys.exit(0)

    if plain:
        # no fancy output
        return start(None, maxchild, hlist, outdir, randomize, args, timeout)
    else:
        # use fancy curses-based animation
        try:
            curses.wrapper(start, maxchild, hlist, outdir, randomize, args, timeout)
        except curses.error:
            sys.exit(4)
    # post execution, output some stats
    total = 0
    for k, v in final_stats.items():
        print("exit code %d: %d job(s)" % (k, v))
        total += v
    print("total number of jobs: %d" % total)


def usage():
    """Print the manual page to stdout."""
    print("""\
NAME

  vxargs - build and execute command lines from an argument list file
  with visualization and parallelism, and output redirection.

DESCRIPTION

  vxargs reads a list of arguments from a txt file or standard input,
  delimited by newlines, and executes the command one or more times
  with initial arguments in which {} is substituted by the arguments
  read from the file or standard input. The current executing commands
  and progress will be dynamically updated on the screen. Stdout and
  stderr of each command will be redirected to separate files. A list
  of all processes with a non-zero exit status is generated in file
  abnormal_list. A list of all timeout processes is generated in file
  killed_list.

SYNOPSIS

  vxargs [OPTIONS] command [initial-arguments]

OPTIONS

  --help
    Print a summary of the options to vxargs and exit.

  --max-procs=max-procs, -P max-procs
    Run up to max-procs processes at a time; the default is 30.

  --randomize, -r [OPTIONAL]
    Randomize the host list before all execution.

  --args=filename, -a filename
    The arguments file. If unspecified, the arguments will be read from
    standard input, and -y option must be specified.

  --output=outdir, -o outdir
    output directory for stdout and stderr files
    The default value is specified by the environment variable VXARGS_OUTDIR.
    If it is unspecified, both stdout and stderr will be redirected
    to /dev/null.
    Note that if the directory existed before execution, everything
    inside will be wiped.

  --timeout=timeout, -t timeout
    The maximal time in second for each command to execute. timeout=0
    means infinite.  0 (i.e. infinite) is the default value. When the time is up,
    vxargs will send signal SIGINT to the process. If the process does not
    stop after 2 seconds, vxargs will send SIGTERM signal, and send SIGKILL
    if it still keeps running after 3 seconds.

  --noprompt, -y
    Wipe out the outdir without confirmation.

  --no-exec, -n
    Print the commands that would be executed, but do not execute them.

  --plain, -p
    Don't use curses-based output, but plain output to stdout
    instead. It will be less exciting, but will do the same job
    effectively. It is useful if one wants to start vxargs from cron
    or by another program that doesn't want to see the output.
    By default, vxargs uses the curses-based output.

  --version
    Display current version and copyright information.

EXAMPLES:
  Suppose the iplist.txt file has following content:
$ cat iplist.txt
216.165.109.79
#planetx.scs.cs.nyu.edu
158.130.6.254
#planetlab1.cis.upenn.edu
158.130.6.253
#planetlab2.cis.upenn.edu
128.232.103.203
#planetlab3.xeno.cl.cam.ac.uk

Note that lines starting with '#' will be interpreted as comment for
the previous lines, which is optional, for visualization purpose only.

$ vxargs -a iplist.txt -o /tmp/result -P 10 ssh upenn_dharma@{} "hostname;uptime"

...[ UI output]...

$ cat /tmp/result/*
planetlab3.xeno.cl.cam.ac.uk
 03:13:21 up 4 days, 14:36,  0 users,  load average: 0.36, 0.44, 0.44
planetlab2.cis.upenn.edu
 03:13:20  up 26 days, 16:19,  0 users,  load average: 8.11, 7.41, 7.41
planetlab1.cis.upenn.edu
 03:13:19  up 22 days, 20:02,  0 users,  load average: 13.60, 12.55, 12.59
ssh: connect to host 216.165.109.79 port 22: Connection timed out
$

other examples:
cat iplist.txt | vxargs -o /tmp/result rsync -az -e ssh --delete mirror $SLICE@{}:

vxargs -a iplist.txt -o /tmp/result ssh {} killall -9 java

For more information, please visit http://dharma.cis.upenn.edu/planetlab/vxargs/
""")


if __name__ == '__main__':
    main()