1 import sys,os
2 sys.path.append(os.path.dirname(os.path.abspath(__file__))+"/..")
3 import shutil, tempfile
4 import subprocess
5 import Core.ExampleUtils as ExampleUtils
6 import Utils.Libraries.combine as combine
7 import copy
8 import tempfile
9 import atexit
10 import gzip
11 import types, copy
12 from Classifier import Classifier
13 import Utils.Parameters as Parameters
14 import Utils.Settings as Settings
15 import Utils.Connection.Connection as Connection
16 from Utils.Connection.UnixConnection import UnixConnection
23
25 """
26 A wrapper for external classifier executables.
27 """
28
30 self.defaultEvaluator = None
31 if connection == None:
32 self.connection = UnixConnection()
33 else:
34 self.connection = connection
35 self.parameterGrid = None
36 self.state = None
37 self._job = None
38 self._prevJobStatus = None
39 self._filesToRelease = []
40
41 self.parameters = None
42 self.model = None
43 self.predictions = None
44
45 self.parameterFormat = "-%k %v"
46 self.trainDirSetting = None
47 self.trainCommand = None
48 self.classifyDirSetting = None
49 self.classifyCommand = None
50
53
55 if self._job != None:
56 self._prevJobStatus = self.connection.getJobStatus(self._job)
57 if self._prevJobStatus in ["FINISHED", "FAILED"]:
58 self.state = None
59 self._job = None
60 for filename in self._filesToRelease:
61 ExternalClassifier.getFileCounter(filename, add=-1, createIfNotExist=False)
62 self._filesToRelease = []
63 if self._prevJobStatus == None:
64 return "FINISHED"
65 else:
66 return self._prevJobStatus
67
69 assert self.getStatus() in ["FINISHED", "FAILED"]
70 self.state = stateName
71 self._job = None
72 self._prevJobStatus = None
73 if stateName == "TRAIN" or stateName == "OPTIMIZE":
74 self.model = None
75 self.parameters = None
76
77 self.predictions = None
78
79
80 @classmethod
82 """
83 Temporarily uncompress a file, usually a compressed example file. The uncompressed
84 file appears in the same location as the original file. The /tmp directory is not used,
85 as these examples are usually read by a classifier that is run in a separate process,
86 which on clusters might end up on a different node, where the local /tmp is no
87 longer accessible.
88 """
89 if not filename.endswith(".gz"):
90 return filename
91 tempfilename = filename[:-3] + "-unzipped-temp"
92
93 uncompress = False
94 if os.path.exists(tempfilename):
95 if os.path.getmtime(filename) > os.path.getmtime(tempfilename):
96 uncompress = True
97 else:
98 uncompress = True
99
100 if uncompress:
101 print >> sys.stderr, "Uncompressing example file", filename
102 subprocess.call("gunzip -cfv " + filename + " > " + tempfilename, shell=True)
103 assert os.path.exists(filename)
104 assert os.path.exists(tempfilename)
105 atexit.register(removeTempUnzipped, tempfilename)
106 return tempfilename
107
@classmethod
def getFileCounter(cls, filename, add=0, createIfNotExist=False, removeIfZero=False):
    """
    Keep track of the number of users of a temporary file.

    The count is stored as a single integer in a sidecar file named
    "<filename>-counter".

    @param filename: path of the tracked file; the counter file name is derived from it
    @param add: amount added to the stored count (use a negative value to release)
    @param createIfNotExist: if False and no counter file exists, return None without
                             creating one; if True, counting starts from 0
    @param removeIfZero: if the resulting count is 0, delete the counter file (and never
                         create one) instead of writing 0 to it
    @return: the updated count (clamped to be >= 0), or None when no counter file
             exists and createIfNotExist is False
    """
    counterPath = filename + "-counter"
    count = 0
    if os.path.exists(counterPath):
        # "with" guarantees the handle is closed even if reading fails
        with open(counterPath, "rt") as f:
            lines = f.readlines()
        assert len(lines) == 1, counterPath
        count = int(lines[0])
    elif not createIfNotExist:
        return None
    # Clamp at zero so an extra release cannot drive the count negative
    count = max(count + add, 0)
    if removeIfZero and count == 0:
        # Do not leave (or newly create) a counter file for an unused file
        if os.path.exists(counterPath):
            os.remove(counterPath)
    else:
        with open(counterPath, "wt") as f:
            f.write(str(count))
    return count
133
def getExampleFile(self, examples, upload=True, replaceRemote=True, dummy=False):
    """
    Resolve an example file to the path the classifier job should read.

    The file is uploaded through self.connection (when upload is True). A local
    ".gz" file whose path was not changed by the upload is replaced by a
    temporarily uncompressed copy. The resolved path's user counter is then
    incremented via getFileCounter.

    @param examples: path to the example file, or None
    @param upload: if True, upload the file through self.connection
    @param replaceRemote: passed as "replace" to connection.upload
    @param dummy: if True, return the placeholder "DUMMY" without touching any file
    @return: the resolved example file path, "DUMMY", or None if examples is None
    @raise NotImplementedError: if examples is a list (not supported)
    """
    if examples is None:
        return None
    if dummy:
        return "DUMMY"
    if isinstance(examples, list):
        # An explicit raise (instead of "assert False") stays active under "python -O";
        # with the assert stripped, execution would continue with an undefined path.
        raise NotImplementedError("Lists of example files are not supported: " + str(examples))

    localPath = os.path.normpath(os.path.abspath(examples))
    examplesPath = localPath
    if upload:
        examplesPath = self.connection.upload(examplesPath, uncompress=True, replace=replaceRemote)
    if examplesPath == localPath and examplesPath.endswith(".gz"):
        examplesPath = ExternalClassifier.getUnzipped(examplesPath)
    # Register one more user of this file
    ExternalClassifier.getFileCounter(examplesPath, 1, createIfNotExist=True)
    print >> sys.stderr, self.__class__.__name__, "using example file", examples, "as", examplesPath
    return examplesPath
154
def train(self, examples, outDir, parameters, classifyExamples=None, finishBeforeReturn=False, replaceRemoteExamples=True, dummy=False):
    """
    Submit a training job, optionally followed by classification of classifyExamples.

    The train command is built from self.trainCommand, where "%p" is replaced by the
    parameter string, "%e" by the example file and "%m" by the model path. If
    classifyExamples is given, a classify command built from self.classifyCommand
    ("%e", "%m", "%c" = predictions path) is queued in the same job.

    @param examples: training example file
    @param outDir: directory for the model, predictions and job logs (created if missing)
    @param parameters: classifier parameters, normalized with Parameters.get
    @param classifyExamples: optional example file to classify with the new model
    @param finishBeforeReturn: if True, wait for the job and refresh the returned copy's status
    @param replaceRemoteExamples: passed on to getExampleFile as replaceRemote
    @param dummy: if True, no job is submitted; connection.getJob is used instead
    @return: a copy of this classifier in state "TRAIN" tracking the job
    """
    outDir = os.path.abspath(outDir)

    examples = self.getExampleFile(examples, replaceRemote=replaceRemoteExamples, dummy=dummy)
    classifyExamples = self.getExampleFile(classifyExamples, replaceRemote=replaceRemoteExamples, dummy=dummy)
    parameters = Parameters.get(parameters, valueListKey="c")
    trainDir = self.connection.getSetting(self.trainDirSetting)

    # The returned copy tracks this job; self is left unchanged
    classifier = copy.copy(self)
    classifier.setState("TRAIN")
    classifier.parameters = parameters
    # classifyExamples may be None, and releasing a None filename through
    # getFileCounter would raise a TypeError, so only real paths are tracked.
    classifier._filesToRelease = [x for x in (examples, classifyExamples) if x is not None]

    if not os.path.exists(outDir):
        os.makedirs(outDir)

    # Build the command-line parameter string and the id suffix used in file/job names
    paramParts = []
    idStr = ""
    for key in sorted(parameters.keys()):
        if key.startswith("TEES."):
            continue # keys with the "TEES." prefix are not passed to the executable
        value = parameters[key]
        if value is not None:
            paramParts.append(self.parameterFormat.replace("%k", key).replace("%v", str(value)).strip())
            idStr += "-" + str(key) + "_" + str(value)
        else:
            paramParts.append(self.parameterFormat.replace("%k", key).replace("%v", "").strip())
            idStr += "-" + str(key)
    paramString = " ".join(part for part in paramParts if part)
    classifier.parameterIdStr = idStr

    modelName = outDir + "/model" + idStr
    classifier.model = self.connection.getRemotePath(modelName, True)
    modelPath = self.connection.getRemotePath(modelName, False)
    trainCommand = os.path.join(trainDir, self.trainCommand)
    trainCommand = trainCommand.replace("%p", paramString).replace("%e", examples).replace("%m", modelPath).strip()
    self.connection.addCommand(trainCommand)

    if classifyExamples is not None:
        predictionsName = outDir + "/predictions" + idStr
        classifier.predictions = self.connection.getRemotePath(predictionsName, True)
        predictionsPath = self.connection.getRemotePath(predictionsName, False)
        classifyDir = self.connection.getSetting(self.classifyDirSetting)
        classifyCommand = os.path.join(classifyDir, self.classifyCommand).replace("%e", classifyExamples).replace("%m", modelPath).replace("%c", predictionsPath).strip()
        self.connection.addCommand(classifyCommand)

    jobName = self.trainCommand.split()[0] + idStr
    logPath = outDir + "/" + jobName
    if dummy:
        # Queued commands are discarded and nothing is submitted (optimize uses this
        # in its "RESULTS" step; presumably getJob looks up an earlier submission).
        self.connection.clearCommands()
        classifier._job = self.connection.getJob(jobDir=outDir, jobName=jobName)
    else:
        classifier._job = self.connection.submit(jobDir=outDir, jobName=jobName, stdout=logPath+".stdout")
    if finishBeforeReturn:
        self.connection.waitForJob(classifier._job)
        # Refresh the returned copy (not self): this records the final job status
        # and releases its example-file counters once the job has ended.
        classifier.getStatus()
    return classifier
210
212 assert self.getStatus() == "FINISHED" and self.model != None
213 self.model = self.connection.download(self.model, outPath)
214 if breakConnection:
215 self.connection = UnixConnection()
216 return self.model
217
219 assert self.getStatus() == "FINISHED" and self.predictions != None
220 self.predictions = self.connection.download(self.predictions, outPath)
221 return self.predictions
222
def classify(self, examples, output, model=None, finishBeforeReturn=False, replaceRemoteFiles=True):
    """
    Submit a job that classifies an example file with a trained model.

    The command is built from self.classifyCommand by substituting "%e" (examples),
    "%m" (uploaded model path) and "%c" (remote predictions path).

    @param examples: example file to classify (resolved through getExampleFile)
    @param output: path where the predictions are to be written
    @param model: model file; if None, self.model is used and stored on the returned copy
    @param finishBeforeReturn: if True, wait for the job and download the predictions
    @param replaceRemoteFiles: passed as the replace flag when uploading model and examples
    @return: a copy of this classifier in state "CLASSIFY" tracking the submitted job
    """
    outputPath = os.path.abspath(output)

    result = copy.copy(self)
    result.setState("CLASSIFY")

    if model is None:
        model = self.model
        result.model = model
    remoteModel = self.connection.upload(os.path.abspath(model), uncompress=True, replace=replaceRemoteFiles)

    result.predictions = self.connection.getRemotePath(outputPath, True)
    remotePredictions = self.connection.getRemotePath(outputPath, False)

    examplePath = self.getExampleFile(examples, replaceRemote=replaceRemoteFiles)
    result._filesToRelease = [examplePath]

    # Start from an empty command queue for this job
    self.connection.clearCommands()
    executableDir = self.connection.getSetting(self.classifyDirSetting)
    command = os.path.join(executableDir, self.classifyCommand)
    for placeholder, value in (("%e", examplePath), ("%m", remoteModel), ("%c", remotePredictions)):
        command = command.replace(placeholder, value)
    self.connection.addCommand(command.strip())

    jobDir = os.path.abspath(os.path.dirname(outputPath))
    jobName = self.classifyCommand.split()[0] + "-" + os.path.basename(remoteModel)
    result._job = self.connection.submit(jobDir=jobDir, jobName=jobName)

    if not finishBeforeReturn:
        return result
    self.connection.waitForJob(result._job)
    result.downloadPredictions()
    return result
247
def optimize(self, examples, outDir, parameters, classifyExamples, classIds, step="BOTH", evaluator=None, determineThreshold=False, timeout=None, downloadAllModels=False):
    """
    Train one model per parameter combination and select the best one by evaluation.

    @param examples: training example file
    @param outDir: output directory shared by all training jobs
    @param parameters: parameter grid, expanded with Parameters.getCombinations
    @param classifyExamples: example file used to evaluate each trained model
    @param classIds: class ids passed on to the evaluator
    @param step: "SUBMIT" only submits the jobs, "RESULTS" only collects them
                 (train is called with dummy=True), "BOTH" does both
    @param evaluator: evaluator object; defaults to self.defaultEvaluator
    @param determineThreshold: if True, pick a threshold per combination with
                               evaluator.threshold before the final evaluation
    @param timeout: currently unused by this method
    @param downloadAllModels: if True, download the model of every finished combination
    @return: for "SUBMIT", a copy in state "OPTIMIZE"; otherwise a copy of the best
             trained classifier with its threshold set and its model downloaded
    @raise RuntimeError: if no combination produced results
    """
    assert step in ["BOTH", "SUBMIT", "RESULTS"], step
    outDir = os.path.abspath(outDir)

    combinations = Parameters.getCombinations(Parameters.get(parameters, valueListKey="c"))
    trained = []
    for combination in combinations:
        # Remote example files are replaced only for the first combination
        trained.append( self.train(examples, outDir, combination, classifyExamples, replaceRemoteExamples=(len(trained) == 0), dummy=(step == "RESULTS")) )
    if step == "SUBMIT":
        classifier = copy.copy(self)
        classifier.setState("OPTIMIZE")
        return classifier

    finalJobStatus = self.connection.waitForJobs([x.getJob() for x in trained])

    print >> sys.stderr, "Evaluating results"

    if evaluator is None:
        evaluator = self.defaultEvaluator
    bestResult = None # [evaluation, trained classifier, combination, threshold]
    for trainedClassifier, combination in zip(trained, combinations):
        idStr = trainedClassifier.parameterIdStr
        if trainedClassifier.getStatus() != "FINISHED":
            print >> sys.stderr, "No results for combination" + idStr
            continue
        predictions = trainedClassifier.downloadPredictions()
        if downloadAllModels:
            trainedClassifier.downloadModel()

        print >> sys.stderr, "*** Evaluating results for combination" + idStr + " ***"
        threshold = None
        if determineThreshold:
            print >> sys.stderr, "Thresholding, original micro =",
            evaluation = evaluator.evaluate(classifyExamples, predictions, classIds, os.path.join(outDir, "evaluation-before-threshold" + idStr + ".csv"), verbose=False)
            print >> sys.stderr, evaluation.microF.toStringConcise()
            threshold, bestF = evaluator.threshold(classifyExamples, predictions)
            print >> sys.stderr, "threshold =", threshold, "at binary fscore", str(bestF)[0:6]
        evaluation = evaluator.evaluate(classifyExamples, ExampleUtils.loadPredictions(predictions, threshold=threshold), classIds, os.path.join(outDir, "evaluation" + idStr + ".csv"))
        if bestResult is None or evaluation.compare(bestResult[0]) > 0:
            bestResult = [evaluation, trainedClassifier, combination, threshold]
        if not self.connection.isLocal():
            # presumably a local copy downloaded from the remote side; no longer needed
            os.remove(predictions)

    print >> sys.stderr, "*** Evaluation complete", finalJobStatus, "***"
    if bestResult is None:
        # Without this check, indexing None below would raise an opaque TypeError
        raise RuntimeError("No parameter combination produced results (final job status: " + str(finalJobStatus) + ")")
    print >> sys.stderr, "Selected parameters", bestResult[2]
    classifier = copy.copy(bestResult[1])
    classifier.threshold = bestResult[3]
    classifier.downloadModel()
    return classifier
302