1 """
2 Process a large number of input files
3 """
4
5 import sys, os
6 import time
7 import re
8 from Utils.Connection.Connection import getConnection
9
def getMaxJobsSetting(controlFilename):
    """
    Read the current job limit from a control file.

    The file must consist of exactly one line holding an integer. A value of
    zero is a request to stop: a notice is written to stderr and the process
    exits through sys.exit().

    NOTE(review): the "def" line of this function was missing from the
    reviewed source. The name and the single parameter were reconstructed
    from the call site getMaxJobsSetting(controlFilename) -- confirm.

    @param controlFilename: Path of the control file
    @return: The job limit read from the file, as an int (never 0)
    """
    # "with" closes the file even when reading raises; the manual
    # open/readlines/close sequence leaked the handle in that case.
    with open(controlFilename, "rt") as f:
        lines = f.readlines()
    assert len(lines) == 1
    value = int(lines[0])
    if value == 0:
        sys.stderr.write("Exit by control file request\n")
        sys.exit()
    return value
20
def getMaxJobs(maxJobs, controlFilename=None):
    """
    Determine the job limit that is currently in effect.

    An explicit maxJobs always wins. Otherwise the limit is read from the
    control file, if one is given, on every call.

    NOTE(review): the "def" line of this function was missing from the
    reviewed source. The signature was reconstructed from the call sites
    getMaxJobs(maxJobs, controlFilename); the default for controlFilename
    is an assumption -- confirm.

    @param maxJobs: A fixed job limit, or None to use the control file
    @param controlFilename: Path of a control file, or None
    @return: The job limit, or None if neither source is defined
    """
    if maxJobs is not None:
        return maxJobs
    if controlFilename is not None:
        return getMaxJobsSetting(controlFilename)
    return None
29
def prepareCommand(template, input=None, jobTag=None, output=None):
    """
    Fill in the placeholders of a command template.

    Recognized placeholders:
      %i  the input path as given
      %a  the absolute path of the input
      %b  the basename of the input
      %j  the job tag
      %o  the output path

    Values are inserted verbatim; no shell quoting is applied.

    NOTE(review): the "def" line of this function was missing from the
    reviewed source. The signature was reconstructed from the call
    prepareCommand(command, input, jobTag, output) and the names used in the
    body; the None defaults are an assumption -- confirm.

    @param template: The command template
    @param input: Value for %i, %a and %b. Must not be None if any of them is used.
    @param jobTag: Value for %j. Must not be None if %j is used.
    @param output: Value for %o. Must not be None if %o is used.
    @return: The template with all placeholders replaced
    """
    def substitute(match):
        code = match.group(0)
        if code == "%j":
            assert jobTag is not None
            return jobTag
        if code == "%o":
            assert output is not None
            return output
        # Remaining codes (%i, %a, %b) are all derived from the input
        assert input is not None
        if code == "%i":
            return input
        if code == "%a":
            return os.path.abspath(input)
        return os.path.basename(input)

    # A single pass over the template: text that was just substituted is
    # never scanned again, so an input such as "a%o.txt" is not corrupted by
    # a later replacement (chained str.replace calls had that problem).
    return re.sub("%[iabjo]", substitute, template)
43
def submitJob(command, input, connection, jobTag=None, output=None, regex=None, dummy=False, rerun=None, hideFinished=False):
    """
    Submit one job through the connection, unless it should be skipped.

    A job is skipped when the input is a ".job" control file, when the input
    does not match regex, or when the connection already reports a status for
    the job and that status is not listed in rerun.

    @param command: The command template, filled in with prepareCommand
    @param input: The input path, or None for a job identified by jobTag only
    @param connection: The connection object. Uses its "debug" attribute and its getJobStatusByName and submit methods.
    @param jobTag: Appended to the job name, or used as the job name if input is None (then it must not be None)
    @param output: If defined, used as the job directory
    @param regex: A compiled regular expression the input must match. Ignored if input is None.
    @param dummy: If True, nothing is submitted
    @param rerun: Status codes for which an already existing job is submitted again
    @param hideFinished: Do not print a notification when skipping an already processed job
    @return: True if the job was submitted (or would have been in dummy mode), False if it was skipped
    """
    def log(*parts):
        # Same text the print statement produced: items joined by single spaces
        sys.stderr.write(" ".join([str(part) for part in parts]) + "\n")

    if input is not None and input.endswith(".job"):
        if connection.debug:
            log("Skipped job control file", input)
        return False
    if connection.debug:
        log("Preparing to submit a job for input", input)
    # Without an input there is nothing to match; calling regex.match(None)
    # would raise a TypeError.
    if regex is not None and input is not None and regex.match(input) is None:
        if connection.debug:
            log("Regular expression did not match input, no job submitted")
        return False
    elif connection.debug and input is not None:
        log("Regular expression matched the input")

    if input is not None:
        # Derive the job name and directory from the input path
        jobDir = os.path.abspath(os.path.dirname(input))
        jobName = os.path.basename(input)
        if jobName == "":
            # The input ended with a slash: the last path component is the
            # job name and its parent the job directory. Taking
            # split("/")[0] yielded "" for every absolute path.
            jobDir, jobName = os.path.split(jobDir.rstrip("/"))
        if jobTag is not None:
            jobName += "-" + jobTag
        # A defined output directory replaces the job directory
        if output is not None:
            jobDir = output
    else:
        assert jobTag is not None
        jobName = jobTag
        jobDir = output

    log("Processing job", jobName, "for input", input)
    jobStatus = connection.getJobStatusByName(jobDir, jobName)
    if jobStatus is not None:
        if rerun is not None and jobStatus in rerun:
            log("Rerunning job with status", jobStatus)
        else:
            if jobStatus == "RUNNING":
                log("Skipping currently running job")
            elif not hideFinished:
                log("Skipping already processed job with status", jobStatus)
            return False

    command = prepareCommand(command, input, jobTag, output)

    if not dummy:
        connection.submit(command, jobDir, jobName)
    else:
        log("Dummy mode")
        # NOTE(review): indentation was lost in the reviewed source; the
        # command dump is assumed to belong to the dummy branch -- confirm.
        if connection.debug:
            log("------- Job command -------")
            log(command)
            log("--------------------------")
    return True
98
def waitForJobs(maxJobs, submitCount, connection, controlFilename=None, sleepTime=15):
    """
    Block until the number of jobs reported by the connection is below the limit.

    The job count and the limit are queried and reported on stderr once on
    entry and again after every sleep. If no limit is defined, the function
    returns right after the first report.

    @param maxJobs: A fixed job limit, or None
    @param submitCount: Number of jobs submitted so far. Only used in the status message.
    @param connection: The connection object. Its getNumJobs method is called.
    @param controlFilename: Control file passed on to getMaxJobs
    @param sleepTime: Seconds to sleep between checks
    """
    while True:
        running = connection.getNumJobs()
        ceiling = getMaxJobs(maxJobs, controlFilename)
        status = ["Current jobs", str(running) + ", max jobs", str(ceiling) + ", submitted jobs", str(submitCount)]
        sys.stderr.write(" ".join(status) + "\n")
        if ceiling is None or running < ceiling:
            return
        time.sleep(sleepTime)
109
def getOutputDir(currentDir, currentItem, input, output=None):
    """
    Map a directory of the input tree to the matching directory under output.

    The part of currentDir below input is appended to output, so that the
    input directory structure is replicated there.

    @param currentDir: The directory currently being processed. Expected to be input itself or a directory below it.
    @param currentItem: Not used
    @param input: The root of the input tree
    @param output: The root of the output tree, or None
    @return: The output directory for currentDir, or None if output is None
    """
    if output is None:
        return None
    # Cut the absolute input root off the front of the absolute current dir
    relativeCurrentDir = os.path.abspath(currentDir)[len(os.path.abspath(input)):]
    # Drop the leading separator, otherwise os.path.join would treat the
    # remainder as an absolute path and discard "output". os.sep is included
    # because abspath returns platform separators.
    relativeCurrentDir = relativeCurrentDir.lstrip("/" + os.sep)
    return os.path.join(output, relativeCurrentDir)
118
def batch(command, input=None, connection=None, jobTag=None, output=None, regex=None, regexDir=None, dummy=False, rerun=None,
          hideFinished=False, controlFilename=None, sleepTime=None, debug=False, limit=None, loop=False):
    """
    Submit a job for a single input, or for every item of an input directory tree.

    If input is None or an existing file, exactly one job is handled.
    Otherwise input is walked with os.walk and a job is attempted for every
    subdirectory and file found, in sorted order (directories first).

    @param command: The command template for the jobs
    @param input: An input file or directory. A directory is processed recursively
    @param connection: Passed to getConnection to obtain the connection used for submitting the jobs
    @param jobTag: The job tag, needed if input is not defined. Can be used in the command template.
    @param output: An optional output directory. The input directory tree is mirrored below it.
    @param regex: A compiled regular expression for selecting inputs
    @param regexDir: A compiled regular expression for directory paths. The items of a non-matching directory are skipped; the walk still visits its subdirectories.
    @param dummy: In dummy mode jobs are not submitted. Good for testing
    @param rerun: Status codes for which an already existing job is submitted again, usually FAILED or FINISHED
    @param hideFinished: Do not print a notification when skipping an existing job
    @param controlFilename: A file with only one number inside it. This is the job limit, and can be changed while the batch is running.
    @param sleepTime: Seconds to wait between checks when waiting for jobs to finish. Defaults to 15.
    @param debug: Stored as the "debug" attribute of the connection
    @param limit: Maximum number of jobs. Takes precedence over controlFilename
    @param loop: Keep looping over the input directory. Otherwise process it once.
    """
    if sleepTime is None:
        sleepTime = 15
    connection = getConnection(connection)
    connection.debug = debug

    # A missing input or a plain file means exactly one job
    if input is None or os.path.isfile(input):
        waitForJobs(limit, 0, connection, controlFilename, sleepTime)
        submitJob(command, input, connection, jobTag, output, regex, dummy, rerun, hideFinished)
        return

    submitCount = 0
    while True:
        waitForJobs(limit, submitCount, connection, controlFilename, sleepTime)
        for dirPath, dirNames, fileNames in os.walk(input):
            if regexDir is not None and regexDir.match(dirPath) is None:
                sys.stderr.write(" ".join(["Skipping directory", str(dirPath)]) + "\n")
                continue
            sys.stderr.write(" ".join(["Processing directory", str(dirPath)]) + "\n")
            for item in sorted(dirNames) + sorted(fileNames):
                itemPath = os.path.join(dirPath, item)
                itemOutput = getOutputDir(dirPath, item, input, output)
                if submitJob(command, itemPath, connection, jobTag, itemOutput, regex, dummy, rerun, hideFinished):
                    submitCount += 1
                    # NOTE(review): indentation was lost in the reviewed
                    # source; waiting is assumed to follow a successful
                    # submission only -- confirm.
                    waitForJobs(limit, submitCount, connection, controlFilename, sleepTime)
        # The tree is always processed once, and again only in loop mode
        if not loop:
            break
167
if __name__=="__main__":
    # Command line entry point: parse the options and hand them to batch().

    # Use Psyco if it is installed; it is optional
    try:
        import psyco
        psyco.full()
        sys.stderr.write("Found Psyco, using\n")
    except ImportError:
        sys.stderr.write("Psyco not installed\n")

    from optparse import OptionParser
    optparser = OptionParser(description="Batch process a tree of input files")
    optparser.add_option("-c", "--command", default=None, dest="command", help="")
    optparser.add_option("-i", "--input", default=None, dest="input", help="Input file or directory. A directory will be processed recursively")
    optparser.add_option("-n", "--connection", default=None, dest="connection", help="")
    optparser.add_option("-r", "--regex", default=None, dest="regex", help="")
    optparser.add_option("-d", "--regexDir", default=None, dest="regexDir", help="")
    optparser.add_option("-j", "--job", default=None, dest="job", help="job name")
    optparser.add_option("-o", "--output", default=None, dest="output", help="")
    optparser.add_option("-l", "--limit", default=None, dest="limit", help="")
    optparser.add_option("--debug", default=False, action="store_true", dest="debug", help="Print jobs on screen")
    optparser.add_option("--controlFile", default=None, dest="controlFile", help="")
    optparser.add_option("--sleepTime", default=None, dest="sleepTime", help="")
    optparser.add_option("--dummy", default=False, action="store_true", dest="dummy", help="Don't submit jobs")
    optparser.add_option("--rerun", default=None, dest="rerun", help="Rerun jobs which have one of these states (comma-separated list)")
    optparser.add_option("--maxJobs", default=None, type="int", dest="maxJobs", help="Maximum number of jobs in queue/running")
    optparser.add_option("--hideFinished", default=False, action="store_true", dest="hideFinished", help="")
    optparser.add_option("--loop", default=False, action="store_true", dest="loop", help="Continuously loop through the input directory")
    (options, args) = optparser.parse_args()

    # An assert would be stripped under "python -O"; report the missing
    # option through the parser instead.
    if options.command is None:
        optparser.error("no command defined, use -c/--command")
    if options.limit is not None:
        options.limit = int(options.limit)
    elif options.maxJobs is not None:
        # --maxJobs was parsed but never used; treat it as the job limit
        # when --limit is not given.
        options.limit = options.maxJobs
    if options.rerun is not None:
        options.rerun = options.rerun.split(",")
    if options.sleepTime is not None:
        options.sleepTime = int(options.sleepTime)
    if options.regex is not None:
        options.regex = re.compile(options.regex)
    if options.regexDir is not None:
        options.regexDir = re.compile(options.regexDir)
    batch(command=options.command, input=options.input, connection=options.connection, jobTag=options.job,
          output=options.output,
          regex=options.regex, regexDir=options.regexDir, dummy=options.dummy, rerun=options.rerun,
          hideFinished=options.hideFinished, controlFilename=options.controlFile, sleepTime=options.sleepTime,
          debug=options.debug, limit=options.limit, loop=options.loop)
208