Package TEES :: Package Classifiers :: Module ExternalClassifier
[hide private]

Source Code for Module TEES.Classifiers.ExternalClassifier

  1  import sys,os 
  2  sys.path.append(os.path.dirname(os.path.abspath(__file__))+"/..") 
  3  import shutil, tempfile 
  4  import subprocess 
  5  import Core.ExampleUtils as ExampleUtils 
  6  import Utils.Libraries.combine as combine 
  7  import copy 
  8  import tempfile 
  9  import atexit 
 10  import gzip 
 11  import types, copy 
 12  from Classifier import Classifier 
 13  import Utils.Parameters as Parameters 
 14  import Utils.Settings as Settings 
 15  import Utils.Connection.Connection as Connection 
 16  from Utils.Connection.UnixConnection import UnixConnection 
def removeTempUnzipped(filename):
    """
    Exit hook that deletes a temporary uncompressed example file when nobody uses it.

    The user count kept next to the file ("<filename>-counter", managed by
    ExternalClassifier.getFileCounter) is read with removeIfZero=True. The file is
    deleted only when that count is 0, in which case the counter file is deleted too.
    If no counter file exists, getFileCounter returns None and the file is kept.
    """
    if not os.path.exists(filename):
        return
    remaining = ExternalClassifier.getFileCounter(filename, removeIfZero=True)
    if remaining == 0:
        os.remove(filename)
23
class ExternalClassifier(Classifier):
    """
    A wrapper for external classifier executables.

    Commands are run as jobs through a connection object (a local UnixConnection
    when none is given). train() and classify() return a shallow copy of this
    classifier which follows the submitted job and holds the model and
    predictions paths.
    """

    def __init__(self, connection=None):
        """
        @param connection: connection used for running commands and moving files;
                           a local UnixConnection is created when None
        """
        self.defaultEvaluator = None
        if connection is None:
            self.connection = UnixConnection() # A local connection
        else:
            self.connection = connection
        self.parameterGrid = None
        self.state = None
        self._job = None
        self._prevJobStatus = None
        # Example files whose "-counter" user count is decremented when the job ends
        self._filesToRelease = []

        self.parameters = None
        self.model = None
        self.predictions = None

        # Command templates, presumably set by subclasses. In parameterFormat %k is
        # replaced by a parameter name and %v by its value; the train command uses
        # %p (parameters), %e (examples) and %m (model), the classify command uses
        # %e, %m and %c (predictions output).
        self.parameterFormat = "-%k %v"
        self.trainDirSetting = None
        self.trainCommand = None
        self.classifyDirSetting = None
        self.classifyCommand = None

    def getJob(self):
        """Return the handle of the job this classifier follows, or None."""
        return self._job

    def getStatus(self):
        """
        Poll the followed job and return its status.

        When the job has ended ("FINISHED" or "FAILED") the classifier leaves its
        state and decrements the user counters of the example files it held.
        Returns "FINISHED" when no job status has been recorded.
        """
        if self._job is not None:
            self._prevJobStatus = self.connection.getJobStatus(self._job)
            if self._prevJobStatus in ["FINISHED", "FAILED"]:
                self.state = None
                self._job = None
                for filename in self._filesToRelease:
                    if filename is None: # e.g. train() called without classifyExamples
                        continue
                    ExternalClassifier.getFileCounter(filename, add=-1, createIfNotExist=False)
                self._filesToRelease = []
        if self._prevJobStatus is None:
            return "FINISHED"
        return self._prevJobStatus

    def setState(self, stateName):
        """
        Prepare the classifier for a new operation (e.g. "TRAIN", "OPTIMIZE", "CLASSIFY").

        The previous job must have ended. "TRAIN" and "OPTIMIZE" also discard the
        model and parameters; predictions are discarded for every state.
        """
        assert self.getStatus() in ["FINISHED", "FAILED"]
        self.state = stateName
        self._job = None
        self._prevJobStatus = None
        if stateName in ("TRAIN", "OPTIMIZE"):
            self.model = None
            self.parameters = None
        # for all states
        self.predictions = None

    @classmethod
    def getUnzipped(cls, filename):
        """
        Temporarily uncompress a file, usually a compressed example file. The uncompressed
        file appears in the same location as the original file. The /tmp directory is not
        used, as these examples are usually used by a classifier that is run in a separate
        process, which on clusters might end up on a different node, where the local /tmp
        is no longer accessible.

        Files not ending in ".gz" are returned unchanged. The uncompressed copy
        ("<name>-unzipped-temp") is (re)created when missing or older than the
        compressed file, and is then registered for removal at interpreter exit.
        """
        if not filename.endswith(".gz"):
            return filename
        tempfilename = filename[:-3] + "-unzipped-temp"
        # Uncompress if the copy does not exist, or the compressed file has changed since
        uncompress = (not os.path.exists(tempfilename)) or os.path.getmtime(filename) > os.path.getmtime(tempfilename)
        if uncompress:
            print >> sys.stderr, "Uncompressing example file", filename
            # No shell: paths with spaces/metacharacters work, and a failing gunzip
            # raises instead of leaving a truncated (but newer) file to be reused.
            try:
                with open(tempfilename, "wb") as out:
                    subprocess.check_call(["gunzip", "-cfv", filename], stdout=out)
            except Exception:
                if os.path.exists(tempfilename):
                    os.remove(tempfilename)
                raise
            assert os.path.exists(filename)
            assert os.path.exists(tempfilename)
            atexit.register(removeTempUnzipped, tempfilename) # mark for deletion
        return tempfilename

    @classmethod
    def getFileCounter(cls, filename, add=0, createIfNotExist=False, removeIfZero=False):
        """
        Keep track of the number of users on a temporary file.

        The count is stored as a single integer in "<filename>-counter".
        @param add: amount added to the stored count (the result is clamped at 0)
        @param createIfNotExist: start from 0 if the counter file is missing;
                                 otherwise a missing counter file returns None
        @param removeIfZero: delete the counter file instead of writing it when
                             the resulting count is 0
        @return: the updated count, or None if there was no counter file
        """
        counterPath = filename + "-counter"
        count = 0
        if os.path.exists(counterPath):
            with open(counterPath, "rt") as f:
                lines = f.readlines()
            assert len(lines) == 1, counterPath
            count = int(lines[0])
        elif not createIfNotExist:
            return None
        count = max(count + add, 0)
        if removeIfZero and count == 0 and os.path.exists(counterPath):
            os.remove(counterPath)
        else:
            with open(counterPath, "wt") as f:
                f.write(str(count))
        return count

    def getExampleFile(self, examples, upload=True, replaceRemote=True, dummy=False):
        """
        Resolve an example file to the path the external command should read.

        The file is uploaded through the connection when upload is True. A local
        ".gz" file that was not uploaded is uncompressed next to the original and
        its user counter is increased (released later by getStatus).
        @return: None for None input, "DUMMY" in dummy mode, otherwise the path
        @raise NotImplementedError: if examples is an in-memory list
        """
        if examples is None:
            return None
        if dummy:
            return "DUMMY"
        if isinstance(examples, list):
            # Writing in-memory examples to a file is not implemented
            raise NotImplementedError("Example lists are not supported, give an example file path")
        examplesPath = os.path.normpath(os.path.abspath(examples))

        localPath = examplesPath
        if upload:
            examplesPath = self.connection.upload(examplesPath, uncompress=True, replace=replaceRemote)
        if examplesPath == localPath and examplesPath.endswith(".gz"): # no upload happened
            examplesPath = ExternalClassifier.getUnzipped(examplesPath) # uncompress if not yet uncompressed
            ExternalClassifier.getFileCounter(examplesPath, 1, createIfNotExist=True) # increase user counter in any case
        print >> sys.stderr, self.__class__.__name__, "using example file", examples, "as", examplesPath
        return examplesPath

    def train(self, examples, outDir, parameters, classifyExamples=None, finishBeforeReturn=False, replaceRemoteExamples=True, dummy=False):
        """
        Train a model, and optionally classify classifyExamples with it, as one job.

        @param examples: training example file
        @param outDir: directory for the model, predictions and job log (created if missing)
        @param parameters: classifier parameters, parsed with Parameters.get
        @param classifyExamples: optional example file classified with the new model
        @param finishBeforeReturn: wait for the job to end before returning
        @param replaceRemoteExamples: passed to the connection upload as "replace"
        @param dummy: submit nothing, reconnect to a previously submitted job instead
        @return: a new classifier instance following the job and holding the model path
        """
        outDir = os.path.abspath(outDir)

        examples = self.getExampleFile(examples, replaceRemote=replaceRemoteExamples, dummy=dummy)
        classifyExamples = self.getExampleFile(classifyExamples, replaceRemote=replaceRemoteExamples, dummy=dummy)
        parameters = Parameters.get(parameters, valueListKey="c")
        trainDir = self.connection.getSetting(self.trainDirSetting)

        # Return a new classifier instance for following the training process and using the model
        classifier = copy.copy(self)
        classifier.setState("TRAIN")
        classifier.parameters = parameters
        # "DUMMY" placeholders and missing (None) files hold no user counter to release
        if dummy:
            classifier._filesToRelease = []
        else:
            classifier._filesToRelease = [x for x in (examples, classifyExamples) if x is not None]
        # Train
        if not os.path.exists(outDir):
            os.makedirs(outDir)
        trainCommand = os.path.join(trainDir, self.trainCommand)
        idStr = ""
        paramString = ""
        for key in sorted(parameters.keys()):
            if key.startswith("TEES."): # skipped: neither passed to the command nor part of the id
                continue
            if len(paramString) > 0 and not paramString.endswith(" "):
                paramString += " "
            value = parameters[key]
            if value is not None:
                paramString += self.parameterFormat.replace("%k", key).replace("%v", str(value)).strip()
                idStr += "-" + str(key) + "_" + str(value)
            else:
                paramString += self.parameterFormat.replace("%k", key).replace("%v", "").strip()
                idStr += "-" + str(key)
        classifier.parameterIdStr = idStr
        classifier.model = self.connection.getRemotePath(outDir + "/model" + idStr, True)
        modelPath = self.connection.getRemotePath(outDir + "/model" + idStr, False)
        trainCommand = trainCommand.replace("%p", paramString).replace("%e", examples).replace("%m", modelPath).strip()
        self.connection.addCommand(trainCommand)
        # Classify with the trained model (optional)
        if classifyExamples is not None:
            classifier.predictions = self.connection.getRemotePath(outDir + "/predictions" + idStr, True)
            predictionsPath = self.connection.getRemotePath(outDir + "/predictions" + idStr, False)
            classifyDir = self.connection.getSetting(self.classifyDirSetting)
            classifyCommand = os.path.join(classifyDir, self.classifyCommand).replace("%e", classifyExamples).replace("%m", modelPath).replace("%c", predictionsPath).strip()
            self.connection.addCommand(classifyCommand)
        # Run the process
        jobName = self.trainCommand.split()[0] + idStr
        logPath = outDir + "/" + jobName
        if dummy: # return a classifier that connects to an existing job
            self.connection.clearCommands()
            classifier._job = self.connection.getJob(jobDir=outDir, jobName=jobName)
        else: # submit the job
            classifier._job = self.connection.submit(jobDir=outDir, jobName=jobName, stdout=logPath+".stdout")
            if finishBeforeReturn:
                self.connection.waitForJob(classifier._job)
                # Poll the returned classifier (not self) so its status is updated
                # and its example files are released
                classifier.getStatus()
        return classifier

    def downloadModel(self, outPath=None, breakConnection=True):
        """
        Download the model of a finished job and return its local path.

        @param breakConnection: afterwards switch this instance to a local UnixConnection
        """
        assert self.getStatus() == "FINISHED" and self.model is not None
        self.model = self.connection.download(self.model, outPath)
        if breakConnection:
            self.connection = UnixConnection() # A local connection
        return self.model

    def downloadPredictions(self, outPath=None):
        """Download the predictions of a finished job and return their local path."""
        assert self.getStatus() == "FINISHED" and self.predictions is not None
        self.predictions = self.connection.download(self.predictions, outPath)
        return self.predictions

    def classify(self, examples, output, model=None, finishBeforeReturn=False, replaceRemoteFiles=True):
        """
        Classify an example file with a model, writing predictions to output.

        @param model: model path; defaults to this classifier's own model
        @param finishBeforeReturn: wait for the job and download the predictions
        @param replaceRemoteFiles: passed to the connection uploads as "replace"
        @return: a new classifier instance following the classification job
        """
        output = os.path.abspath(output)
        # Return a new classifier instance for following the classification process
        classifier = copy.copy(self)
        classifier.setState("CLASSIFY")
        # Classify
        if model is None:
            classifier.model = model = self.model
        model = os.path.abspath(model)
        model = self.connection.upload(model, uncompress=True, replace=replaceRemoteFiles)
        classifier.predictions = self.connection.getRemotePath(output, True)
        predictionsPath = self.connection.getRemotePath(output, False)
        examples = self.getExampleFile(examples, replaceRemote=replaceRemoteFiles)
        classifier._filesToRelease = [examples]
        self.connection.clearCommands()
        classifyDir = self.connection.getSetting(self.classifyDirSetting)
        classifyCommand = os.path.join(classifyDir, self.classifyCommand).replace("%e", examples).replace("%m", model).replace("%c", predictionsPath).strip()
        self.connection.addCommand(classifyCommand)
        jobName = self.classifyCommand.split()[0] + "-" + os.path.basename(model)
        classifier._job = self.connection.submit(jobDir=os.path.abspath(os.path.dirname(output)), jobName=jobName)
        if finishBeforeReturn:
            self.connection.waitForJob(classifier._job)
            classifier.downloadPredictions()
        return classifier

    def optimize(self, examples, outDir, parameters, classifyExamples, classIds, step="BOTH", evaluator=None, determineThreshold=False, timeout=None, downloadAllModels=False):
        """
        Train one model per parameter combination, evaluate each on classifyExamples
        and return a classifier holding the best model.

        @param step: "SUBMIT" only submits the training jobs, "RESULTS" reconnects to
                     previously submitted jobs and evaluates them, "BOTH" does both
        @param evaluator: object providing evaluate() and threshold(); defaults to
                          self.defaultEvaluator
        @param determineThreshold: also search a decision threshold with evaluator.threshold
        @param timeout: currently unused
        @param downloadAllModels: download the model of every combination, not only the best
        @raise RuntimeError: if no parameter combination finished successfully
        """
        assert step in ["BOTH", "SUBMIT", "RESULTS"], step
        outDir = os.path.abspath(outDir)
        # Initialize training (or reconnect to existing jobs)
        combinations = Parameters.getCombinations(Parameters.get(parameters, valueListKey="c"))
        trained = []
        for combination in combinations:
            # Only the first job replaces remote example files, the rest reuse them
            trained.append(self.train(examples, outDir, combination, classifyExamples, replaceRemoteExamples=(len(trained) == 0), dummy=(step == "RESULTS")))
        if step == "SUBMIT": # Return already
            classifier = copy.copy(self)
            classifier.setState("OPTIMIZE")
            return classifier

        # Wait for the training to finish
        finalJobStatus = self.connection.waitForJobs([x.getJob() for x in trained])
        # Evaluate the results
        print >> sys.stderr, "Evaluating results"
        bestResult = None
        if evaluator is None:
            evaluator = self.defaultEvaluator
        for combination, candidate in zip(combinations, trained):
            idStr = candidate.parameterIdStr
            # Get predictions
            if candidate.getStatus() != "FINISHED":
                print >> sys.stderr, "No results for combination" + idStr
                continue
            predictions = candidate.downloadPredictions()
            if downloadAllModels:
                candidate.downloadModel()
            # Compare to other results
            print >> sys.stderr, "*** Evaluating results for combination" + idStr + " ***"
            threshold = None
            if determineThreshold:
                print >> sys.stderr, "Thresholding, original micro =",
                evaluation = evaluator.evaluate(classifyExamples, predictions, classIds, os.path.join(outDir, "evaluation-before-threshold" + idStr + ".csv"), verbose=False)
                print >> sys.stderr, evaluation.microF.toStringConcise()
                threshold, bestF = evaluator.threshold(classifyExamples, predictions)
                print >> sys.stderr, "threshold =", threshold, "at binary fscore", str(bestF)[0:6]
            evaluation = evaluator.evaluate(classifyExamples, ExampleUtils.loadPredictions(predictions, threshold=threshold), classIds, os.path.join(outDir, "evaluation" + idStr + ".csv"))
            if bestResult is None or evaluation.compare(bestResult[0]) > 0:
                bestResult = [evaluation, candidate, combination, threshold]
            if not self.connection.isLocal():
                os.remove(predictions) # remove predictions to save space
        print >> sys.stderr, "*** Evaluation complete", finalJobStatus, "***"
        if bestResult is None:
            raise RuntimeError("No parameter combination produced results (job status: " + str(finalJobStatus) + ")")
        print >> sys.stderr, "Selected parameters", bestResult[2]
        classifier = copy.copy(bestResult[1])
        classifier.threshold = bestResult[3]
        classifier.downloadModel()
        return classifier
302