Package TEES :: Package Tools :: Module ProcessUtils
[hide private]

Source Code for Module TEES.Tools.ProcessUtils

  1  import sys, os, codecs, time, signal 
  2  sys.path.append(os.path.dirname(os.path.abspath(__file__))+"/..") 
  3  from Utils.ProgressCounter import ProgressCounter 
  4   
  5  try: 
  6      import xml.etree.cElementTree as ET 
  7  except ImportError: 
  8      import cElementTree as ET 
  9   
10 -class ProcessWrapper:
11 """ 12 Killing a process spawned by a shell is not really possible (at least in Python). 13 This becomes a problem, if a tool requires multiple (e.g. piped) processes to be 14 ran. With ProcessWrapper, all processes can be called directly from Python so 15 that their ids are known and they can be killed if they hang. A ProcessWrapper can 16 be passed as a parameter to ProcessUtils functions in place of a subprocess.Popen 17 object. 18 """
19 - def __init__(self, processes):
20 self.processes = processes # subprocesses
21
22 - def kill(self):
23 """ 24 Kill all subprocesses 25 """ 26 for process in self.processes: 27 try: 28 process.kill() 29 except: 30 pass 31 for process in self.processes: 32 poll = process.poll() 33 #print poll 34 while poll == None: 35 poll = process.poll() 36 time.sleep(1)
37 #print poll 38
39 - def poll(self):
40 """ 41 If any subprocess is running, returns None (not finished). 42 """ 43 for process in self.processes: 44 if process.poll() == None: 45 return None 46 return "FINISHED"
47
48 -def waitForProcess(process, numCorpusSentences, measureByGap, outputFile, counterName, updateMessage, timeout=None):
49 """ 50 Waits for a process to finish, and tracks the number of entities it writes 51 to it's outputfile. If writing a sentence takes longer than the timeout, 52 the process is considered stalled and is killed. 53 """ 54 maxStartupTime = 600 # Give extra time for the process to start up (even if it creates immediately an empty output file) 55 counter = ProgressCounter(numCorpusSentences, counterName) 56 counter.showMilliseconds = True 57 prevNumSentences = 0 # Number of output sentences on previous check 58 finalCheckLeft = True # Make one final check to update counters 59 processStatus = None # When None, process not finished 60 prevTime = time.time() 61 startTime = time.time() 62 # Wait until process is finished and periodically check it's progress. 63 while processStatus == None or finalCheckLeft: 64 if processStatus != None: # Extra loop to let counters finish 65 finalCheckLeft = False # Done only once 66 if os.path.exists(outputFile[0]): # Output file has already appeared on disk 67 # Measure number of sentences in output file 68 numSentences = 0 69 f = codecs.open(outputFile[0], "rt", **outputFile[1]) 70 for line in f: 71 if measureByGap: 72 if line.strip() == "": 73 numSentences += 1 74 else: 75 numSentences += 1 76 f.close() 77 # Update status 78 if numSentences - prevNumSentences != 0: # Process has progressed 79 counter.update(numSentences - prevNumSentences, updateMessage + ": ") 80 if finalCheckLeft: # This is a normal loop, not the final check 81 # Startuptime hasn't yet passed or process has made progress 82 if time.time() - startTime < maxStartupTime or numSentences - prevNumSentences != 0: 83 #if prevNumSentences == 0 or numSentences - prevNumSentences != 0: 84 prevTime = time.time() # reset timeout 85 else: # Nothing happened on this update, check whether process hung 86 elapsedTime = time.time() - prevTime 87 if timeout != None and elapsedTime > timeout: 88 print >> sys.stderr, "Process timed out (" + str(elapsedTime) + " vs. " + str(timeout) + ")" 89 print >> sys.stderr, "Killing process" 90 process.kill() 91 prevNumSentences = numSentences 92 time.sleep(1) 93 else: # Output file doesn't exist yet 94 prevTime = time.time() # reset counter if output file hasn't been created 95 processStatus = process.poll() # Get process status, None == still running 96 97 counter.markFinished() # If we get this far, don't show the error message even if process didn't finish 98 return (numSentences, numCorpusSentences)
99
100 -def makeSubset(input, workdir, fromLine):
101 """ 102 Make a subset of the input data from "fromLine" to end of input file. 103 """ 104 newInput = os.path.join(workdir, "input-from-" + str(fromLine)) 105 newInputFile = codecs.open(newInput, "wt", "utf-8") 106 107 inputFile = codecs.open(input, "rt", "utf-8") 108 lineCount = -1 109 for line in inputFile: 110 lineCount += 1 111 if lineCount < fromLine: 112 continue 113 newInputFile.write(line) 114 inputFile.close() 115 newInputFile.close() 116 return newInput
117
118 -def mergeOutput(dir, numCorpusSentences, measureByGap, outputArgs={}):
119 """ 120 Merge output files (multiple files may have been created if program failed on a sentence) 121 """ 122 filenames = os.listdir(dir) 123 outputs = [] 124 for filename in filenames: 125 if filename.find("output-from") != -1: 126 outputs.append( (int(filename.rsplit("-", 1)[-1]), filename) ) 127 outputs.sort() # Order output sets by their first sentence index 128 #print outputs 129 130 mergedOutput = codecs.open(os.path.join(dir, "merged-output"), "wt", **outputArgs) 131 132 missingSentences = 0 133 numSentences = 0 134 # Go through output subsets in order 135 for i in range(len(outputs)): 136 f = codecs.open(os.path.join(dir, outputs[i][1]), "rt", **outputArgs) 137 for line in f: # Copy to merged file 138 mergedOutput.write(line) 139 if measureByGap: 140 if line.strip() == "": 141 numSentences += 1 142 else: 143 numSentences += 1 144 f.close() 145 # If sentences are missing from output, write empty lines in merged output 146 if i < len(outputs) - 1: # not last output 147 while numSentences < outputs[i+1][0]: # Start of next subset not reached yet 148 mergedOutput.write("\n") 149 numSentences += 1 150 missingSentences += 1 151 else: # last of the output subsets 152 while numSentences < numCorpusSentences: # End of whole data not reached yet 153 mergedOutput.write("\n") 154 numSentences += 1 155 missingSentences += 1 156 mergedOutput.close() 157 return missingSentences
158
159 -def getSubsetEndPos(subsetFileName, measureByGap):
160 """ 161 Return the sentence count to which this process reached by counting 162 the sentences in the output file. 163 """ 164 if subsetFileName.find("-from-") == -1: 165 return 0 166 numSentences = getLines(subsetFileName, measureByGap) 167 subsetPos = int(subsetFileName.rsplit("-", 1)[-1]) 168 return subsetPos + numSentences
169
170 -def getLines(filename, measureByGap):
171 """ 172 Number of sentences in the file, measured either in lines, or by empty "gap" lines 173 """ 174 numSentences = 0 175 f = codecs.open(filename, "rt", "utf-8") 176 for line in f: 177 if measureByGap: 178 if line.strip() == "": 179 numSentences += 1 180 else: 181 numSentences += 1 182 f.close() 183 return numSentences
184
185 -def runSentenceProcess(launchProcess, programDir, input, workdir, measureByGap, counterName, updateMessage, timeout=None, processArgs={}, outputArgs={}):
186 """ 187 Runs a process on input sentences, and in case of problems skips one sentence and 188 reruns the process on the remaining ones. 189 """ 190 # Count input sentences 191 input = os.path.abspath(input) 192 origInput = input 193 numCorpusSentences = 0 194 inputFile = codecs.open(input, "rt", "utf-8") 195 for line in inputFile: 196 numCorpusSentences += 1 197 inputFile.close() 198 199 if "encoding" not in outputArgs: 200 outputArgs["encoding"] = "utf-8" 201 202 cwd = os.getcwd() 203 os.chdir(programDir) 204 finished = False 205 startLine = 0 206 while not finished: 207 # Count lines in input file (input data must be in a one sentence per line -format) 208 inputLines = 0 209 inputFile = codecs.open(input, "rt", "utf-8") 210 for line in inputFile: 211 inputLines += 1 212 inputFile.close() 213 214 output = os.path.join(workdir, "output-from-" + str(startLine)) 215 process = launchProcess(input, output, **processArgs) 216 result = waitForProcess(process, inputLines, measureByGap, (output, outputArgs), counterName, updateMessage, timeout) 217 if result[0] != result[1]: 218 gap = 1 219 startLine = getSubsetEndPos(output, measureByGap) + gap 220 if startLine >= numCorpusSentences: 221 finished = True 222 else: 223 print >> sys.stderr, "Process failed for sentence " + str(startLine-gap) + ", rerunning from sentence", startLine 224 input = makeSubset(origInput, workdir, startLine) 225 else: 226 finished = True 227 os.chdir(cwd) 228 229 numMissedSentences = mergeOutput(workdir, numCorpusSentences, measureByGap, outputArgs=outputArgs) 230 if numMissedSentences == 0: 231 print >> sys.stderr, "Processed succesfully all sentences" 232 else: 233 print >> sys.stderr, "Warning, processing failed for", numMissedSentences, "out of", numCorpusSentences, "sentences" 234 return os.path.abspath(os.path.join(workdir, "merged-output"))
235
236 -def getElementIndex(parent, element):
237 index = 0 238 for e in parent: 239 if e == element: 240 return index 241 index += 1 242 return -1
243
244 -def getPrevElementIndex(parent, eTag):
245 index = 0 246 elemIndex = -1 247 for element in parent: 248 if element.tag == eTag: 249 elemIndex = index 250 index += 1 251 return elemIndex
252
253 -def getElementByAttrib(parent, tag, attDict):
254 for element in parent.getiterator(): 255 if element.tag == tag: 256 found = True 257 for k, v in attDict.iteritems(): 258 if element.get(k) != v: 259 found = False 260 if found: 261 return element 262 return None
263
264 -def setDefaultElement(parent, name):
265 element = parent.find(name) 266 if element == None: 267 element = ET.Element(name) 268 parent.append(element) 269 return element
270