Package TEES :: Module batch
[hide private]

Source Code for Module TEES.batch

  1  """ 
  2  Process a large number of input files 
  3  """ 
  4   
  5  import sys, os 
  6  import time 
  7  import re 
  8  from Utils.Connection.Connection import getConnection 
  9   
def getMaxJobsFromFile(controlFilename):
    """
    Read the current job limit from a control file.

    The file must contain a single integer. Surrounding whitespace and blank
    lines are ignored, so a file saved by an editor with extra trailing
    newlines is still accepted. The file is re-read on every call, which is
    what allows the limit to be changed while batch processing is running.

    A value of 0 is a stop request: a message is written to stderr and the
    process exits through sys.exit().

    @param controlFilename: Path to the control file
    @return: The job limit read from the file (a non-zero integer)
    @raise ValueError: If the file does not contain exactly one non-empty line,
                       or that line is not an integer
    """
    # 'with' guarantees the file is closed even if reading fails
    with open(controlFilename, "rt") as f:
        lines = [line.strip() for line in f if line.strip() != ""]
    if len(lines) != 1:
        raise ValueError("Control file %s must contain exactly one number, found %d non-empty lines"
                         % (controlFilename, len(lines)))
    value = int(lines[0])
    if value == 0:
        sys.stderr.write("Exit by control file request\n")
        sys.exit()
    return value
20
def getMaxJobs(maxJobs, controlFilename=None):
    """
    Determine the job limit currently in effect.

    An explicit maxJobs always takes precedence. Otherwise, if a control file
    is given, the limit is read from it (see getMaxJobsFromFile). Without
    either, there is no limit.

    @param maxJobs: An explicit job limit, or None
    @param controlFilename: Path to a control file holding the limit, or None
    @return: The job limit, or None when no limit is defined
    """
    if maxJobs is not None:
        return maxJobs
    if controlFilename is not None:
        # Previously called the undefined getMaxJobsSetting, raising NameError
        return getMaxJobsFromFile(controlFilename)
    return None
29
def prepareCommand(template, input=None, jobTag=None, output=None):
    """
    Fill in the placeholders of a job command template.

    Supported placeholders, substituted in this order (each step operates on
    the result of the previous one):
      %i  the input path as given
      %a  the absolute input path
      %b  the base name of the input path
      %j  the job tag
      %o  the output directory

    @param template: The command template
    @param input: Input path, required if %i, %a or %b is present
    @param jobTag: Job tag, required if %j is present
    @param output: Output directory, required if %o is present
    @return: The command with all placeholders replaced
    """
    command = template
    if any(code in command for code in ("%i", "%a", "%b")):
        assert input != None
        substitutions = (("%i", input),
                         ("%a", os.path.abspath(input)),
                         ("%b", os.path.basename(input)))
        for code, value in substitutions:
            command = command.replace(code, value)
    if "%j" in command:
        assert jobTag != None
        command = command.replace("%j", jobTag)
    if "%o" in command:
        assert output != None
        command = command.replace("%o", output)
    return command
43
def submitJob(command, input, connection, jobTag=None, output=None, regex=None, dummy=False, rerun=None, hideFinished=False):
    """
    Submit a single job, unless it is filtered out or already exists.

    The job name is the base name of the input (or the input directory's name,
    optionally suffixed with "-" + jobTag). The job control file is kept next
    to the input, or in the output directory when one is given. Inputless jobs
    are named by jobTag and use output as their job directory.

    @param command: Command template, filled in with prepareCommand
    @param input: Input file or directory, or None for an inputless job
    @param connection: Connection object used to query job status and submit jobs
    @param jobTag: Job tag appended to the job name (required for inputless jobs)
    @param output: Output directory, also used as the job directory when given
    @param regex: Compiled regular expression an input must match, or None
    @param dummy: If True, the job is not actually submitted
    @param rerun: Collection of job status values for which an existing job is resubmitted
    @param hideFinished: Do not report skipped already-processed jobs
    @return: True if a job was submitted (or would have been, in dummy mode), otherwise False
    """
    if input is not None and input.endswith(".job"):
        if connection.debug:
            sys.stderr.write("Skipped job control file %s\n" % input)
        return False
    if connection.debug:
        sys.stderr.write("Preparing to submit a job for input %s\n" % input)
    # Only jobs with an input can be filtered; regex.match(None) would raise TypeError
    if regex is not None and input is not None:
        if regex.match(input) is None:
            if connection.debug:
                sys.stderr.write("Regular expression did not match input, no job submitted\n")
            return False
        if connection.debug:
            sys.stderr.write("Regular expression matched the input\n")

    if input is not None:
        # Determine job name and directory from the input file
        jobDir = os.path.abspath(os.path.dirname(input))
        jobName = os.path.basename(input)
        if jobName == "":
            # The input is a directory given with a trailing separator: name the job
            # after the directory and keep the job control file on the same level as
            # the directory, i.e. in its parent (split("/")[0] gave "" for absolute paths)
            jobDir, jobName = os.path.split(jobDir)
        if jobTag is not None:
            jobName += "-" + jobTag
        # A defined output directory means the job file goes there
        if output is not None:
            jobDir = output
    else:  # inputless job
        assert jobTag is not None, "An inputless job requires a job tag"
        jobName = jobTag
        jobDir = output

    sys.stderr.write("Processing job %s for input %s\n" % (jobName, input))
    jobStatus = connection.getJobStatusByName(jobDir, jobName)
    if jobStatus is not None:
        if rerun is not None and jobStatus in rerun:
            sys.stderr.write("Rerunning job with status %s\n" % jobStatus)
        else:
            if jobStatus == "RUNNING":
                sys.stderr.write("Skipping currently running job\n")
            elif not hideFinished:
                sys.stderr.write("Skipping already processed job with status %s\n" % jobStatus)
            return False

    command = prepareCommand(command, input, jobTag, output)

    if not dummy:
        connection.submit(command, jobDir, jobName)
    else:
        sys.stderr.write("Dummy mode\n")
        if connection.debug:
            sys.stderr.write("------- Job command -------\n")
            sys.stderr.write("%s\n" % command)
            sys.stderr.write("--------------------------\n")
    return True
98
def waitForJobs(maxJobs, submitCount, connection, controlFilename=None, sleepTime=15):
    """
    Block while the number of active jobs is at or above the job limit.

    The limit comes from getMaxJobs and is re-evaluated on every poll, so a
    control file can change it while waiting. When no limit is defined the
    function reports the current state once and returns immediately.

    @param maxJobs: Explicit job limit, or None
    @param submitCount: Number of jobs submitted so far (only reported)
    @param connection: Connection object providing getNumJobs()
    @param controlFilename: Optional control file holding the job limit
    @param sleepTime: Seconds to sleep between polls
    """
    def report(active, limit):
        sys.stderr.write("Current jobs %s, max jobs %s, submitted jobs %s\n" % (active, limit, submitCount))

    active = connection.getNumJobs()
    limit = getMaxJobs(maxJobs, controlFilename)
    report(active, limit)
    if limit is None:
        return
    while active >= limit:
        time.sleep(sleepTime)
        active = connection.getNumJobs()
        limit = getMaxJobs(maxJobs, controlFilename)
        report(active, limit)
109
def getOutputDir(currentDir, currentItem, input, output=None):
    """
    Map a directory inside the input tree to its counterpart in the output tree.

    @param currentDir: A directory inside the input tree (as yielded by os.walk)
    @param currentItem: The item being processed in currentDir (not used for the mapping)
    @param input: Root of the input tree
    @param output: Root of the output tree, or None
    @return: output joined with currentDir's path relative to input, or None if output is None
    """
    if output is None:
        return None
    # Strip the input root prefix to obtain the relative path. For the root itself this
    # is "", and os.path.join then returns output with a trailing separator.
    relativeCurrentDir = os.path.abspath(currentDir)[len(os.path.abspath(input)):]
    relativeCurrentDir = relativeCurrentDir.lstrip("/")
    return os.path.join(output, relativeCurrentDir)
118
def batch(command, input=None, connection=None, jobTag=None, output=None, regex=None, regexDir=None, dummy=False, rerun=None,
          hideFinished=False, controlFilename=None, sleepTime=None, debug=False, limit=None, loop=False):
    """
    Process a large number of input files

    @param command: The command template for each job. Placeholders %i, %a, %b, %j and %o are filled in by prepareCommand
    @param input: An input file or directory. A directory will be processed recursively; both its files and subdirectories are job inputs
    @param connection: A connection definition, passed to getConnection, used for submitting the jobs
    @param jobTag: The name of the job file, usually if input is not defined. Can be used in the command template.
    @param output: An optional output directory. The input directory tree will be replicated here.
    @param regex: A regular expression for selecting input files
    @param regexDir: A regular expression for input directories. Items in a directory whose path does not match are not
                     processed; its subdirectories are still visited and tested separately.
    @param dummy: In dummy mode, jobs are not submitted. Good for testing
    @param rerun: A job is normally submitted only if it does not already exist. If an existing job needs to be resubmitted, this defines the status codes, usually FAILED or FINISHED
    @param hideFinished: Do not print a notification when skipping an existing job
    @param controlFilename: A file with only one number inside it. This is the job limit, and can be changed while batch.py is running. A value of 0 stops processing.
    @param sleepTime: The time to wait between checks when waiting for jobs to finish, and between passes in loop mode. Default is 15 seconds.
    @param debug: Enables debug output on the connection and during job submission.
    @param limit: Maximum number of jobs. Overrides controlFilename
    @param loop: Loop over the input directory. Otherwise process it once.
    """
    if sleepTime is None:
        sleepTime = 15
    connection = getConnection(connection)
    connection.debug = debug
    if input is None or os.path.isfile(input):
        # An inputless job or a single input file results in at most one submission
        waitForJobs(limit, 0, connection, controlFilename, sleepTime)
        submitJob(command, input, connection, jobTag, output, regex, dummy, rerun, hideFinished)
        return

    if not os.path.isdir(input):
        # os.walk silently yields nothing for a missing path, so make the no-op visible
        sys.stderr.write("Warning: input %s is neither a file nor a directory\n" % input)

    # Walk the directory tree, once or repeatedly in loop mode
    submitCount = 0
    while True:
        waitForJobs(limit, submitCount, connection, controlFilename, sleepTime)
        for currentDir, subDirs, files in os.walk(input):
            if regexDir is not None and regexDir.match(currentDir) is None:
                sys.stderr.write("Skipping directory %s\n" % currentDir)
                continue
            sys.stderr.write("Processing directory %s\n" % currentDir)
            for item in sorted(subDirs) + sorted(files):  # process both directories and files
                itemOutput = getOutputDir(currentDir, item, input, output)
                if submitJob(command, os.path.join(currentDir, item), connection, jobTag, itemOutput,
                             regex, dummy, rerun, hideFinished):
                    submitCount += 1
                    # number of submitted jobs has increased, so check if we need to wait
                    waitForJobs(limit, submitCount, connection, controlFilename, sleepTime)
        if not loop:
            break
        # Pause between passes; otherwise loop mode re-scans the tree and polls the
        # connection continuously even when there is nothing new to submit
        time.sleep(sleepTime)
if __name__=="__main__":
    # Import Psyco if available
    try:
        import psyco
        psyco.full()
        sys.stderr.write("Found Psyco, using\n")
    except ImportError:
        sys.stderr.write("Psyco not installed\n")

    from optparse import OptionParser
    optparser = OptionParser(description="Batch process a tree of input files")
    optparser.add_option("-c", "--command", default=None, dest="command", help="Command template for each job. Placeholders: %i input, %a absolute input path, %b input base name, %j job tag, %o output directory")
    optparser.add_option("-i", "--input", default=None, dest="input", help="Input file or directory. A directory will be processed recursively")
    optparser.add_option("-n", "--connection", default=None, dest="connection", help="Connection definition for submitting the jobs")
    optparser.add_option("-r", "--regex", default=None, dest="regex", help="Regular expression for selecting inputs")
    optparser.add_option("-d", "--regexDir", default=None, dest="regexDir", help="Regular expression for selecting the input directories whose items are processed")
    optparser.add_option("-j", "--job", default=None, dest="job", help="job name")
    optparser.add_option("-o", "--output", default=None, dest="output", help="Output directory. The input directory tree will be replicated here")
    optparser.add_option("-l", "--limit", default=None, type="int", dest="limit", help="Maximum number of jobs in queue/running. Overrides --controlFile")
    optparser.add_option("--debug", default=False, action="store_true", dest="debug", help="Print jobs on screen")
    optparser.add_option("--controlFile", default=None, dest="controlFile", help="A file containing only the job limit, which can be changed while running. A value of 0 stops processing")
    optparser.add_option("--sleepTime", default=None, type="int", dest="sleepTime", help="Seconds to wait between job queue checks (default 15)")
    optparser.add_option("--dummy", default=False, action="store_true", dest="dummy", help="Don't submit jobs")
    optparser.add_option("--rerun", default=None, dest="rerun", help="Rerun jobs which have one of these states (comma-separated list)")
    optparser.add_option("--maxJobs", default=None, type="int", dest="maxJobs", help="Maximum number of jobs in queue/running (alias for --limit, which takes precedence)")
    optparser.add_option("--hideFinished", default=False, action="store_true", dest="hideFinished", help="Do not print a notification when skipping an existing job")
    optparser.add_option("--loop", default=False, action="store_true", dest="loop", help="Continuously loop through the input directory")
    (options, args) = optparser.parse_args()

    # A proper usage error instead of an assert, which is stripped under -O
    if options.command is None:
        optparser.error("A command template must be given with -c/--command")
    # --maxJobs used to be parsed but ignored; it now acts as an alias for --limit
    limit = options.limit if options.limit is not None else options.maxJobs
    if options.rerun is not None: options.rerun = options.rerun.split(",")
    if options.regex is not None: options.regex = re.compile(options.regex)
    if options.regexDir is not None: options.regexDir = re.compile(options.regexDir)
    batch(command=options.command, input=options.input, connection=options.connection, jobTag=options.job,
          output=options.output,
          regex=options.regex, regexDir=options.regexDir, dummy=options.dummy, rerun=options.rerun,
          hideFinished=options.hideFinished, controlFilename=options.controlFile, sleepTime=options.sleepTime,
          debug=options.debug, limit=limit, loop=options.loop)