Package TEES :: Package Utils :: Package Connection :: Module UnixConnection

Source Code for Module TEES.Utils.Connection.UnixConnection

  1  import sys, os, shutil, types 
  2  import subprocess 
  3  import getpass 
  4  import time 
  5  import atexit, signal 
  6  sys.path.append(os.path.normpath(os.path.abspath(os.path.dirname(__file__))+"/../..")) 
  7  from Utils.Timer import Timer 
  8  import Utils.Settings as Settings 
  9  import Utils.Parameters as Parameters 
 10  import tempfile 
 11   
class UnixConnection:
    """
    Run shell commands, move files and track jobs on the local machine or on
    a remote Unix account.

    With account=None every command runs locally through the shell. With an
    account ("user@host"), commands are wrapped as ssh ACCOUNT '...' and files
    are moved with scp. Submitted jobs are tracked through small key=value
    status files (JOBDIR/JOBNAME.job) holding the job name, the PID of the
    launching process, a reference time and, once the job ends, its return code.
    """

    def __init__(self, account=None, workdir=None, settings=None, memory=None, cores=None, jobLimit=None, killGroup=True):
        """
        Create a connection.

        account   -- "user@host" of a remote account, or None for a local connection
        workdir   -- remote directory under which paths are mirrored (None = no mirroring)
        settings  -- remote path of the TEES settings file (None = use the remote $TEES_SETTINGS)
        memory    -- memory value stored on the connection (default 4194304; units are not defined here)
        cores     -- number of cores stored on the connection (default 1)
        jobLimit  -- submit() first waits until at most this many jobs run (-1, the default, = no limit)
        killGroup -- for local connections, call os.setpgrp() to make this process a group leader
        """
        self.account = account
        if memory is None:
            memory = 4194304
        self.memory = int(memory)
        if cores is None:
            cores = 1
        self.cores = int(cores)
        self.workDir = workdir
        # State constants returned by getFileStatus
        self.NOT_EXIST = "NOT_EXIST"
        self.NONZERO = "NONZERO"
        self.ZERO = "ZERO"
        # Batch command queue, consumed by _getScript (and thus by submit)
        self.commands = []
        self.compression = False
        self.remoteSettingsPath = settings
        self.cachedRemoteSettings = None
        self._logs = {} # job -> [stdout file, stderr file] opened by submit()
        if jobLimit is None:
            jobLimit = -1
        self.jobLimit = int(jobLimit)
        self.debug = False
        self.resubmitOnlyFinished = True
        # Make this process a process group leader, so its group id equals its PID;
        # getNumJobs counts processes whose parent PID equals this group id.
        if self.account is None and killGroup:
            os.setpgrp()

    def clearWorkDir(self, subDir=""):
        """
        Remove the remote directory WORKDIR/subDir; local connections only print a message.

        An AssertionError is raised instead of removing the work directory itself.
        """
        if self.account is None:
            print >> sys.stderr, "Local connection, remote directory not removed"
            return
        workSubDir = self.getRemotePath(subDir)
        assert workSubDir != self.workDir, (self.workDir, subDir) # prevent removal of the whole remote work directory
        print >> sys.stderr, "Removing remote directory", workSubDir
        self.run("rm -R " + workSubDir)

    def isLocal(self):
        """Return True for a local connection (no account)."""
        return self.account is None

    def getRemotePath(self, path, addAccount=False):
        """
        Map a path to its location on the remote side.

        If a work directory is set, any "ACCOUNT:" prefix is dropped and the
        absolute form of the path is placed under the work directory. With
        addAccount=True, remote connections prefix the result with "ACCOUNT:".
        """
        if self.workDir is not None: # a working directory has been set
            path = os.path.normpath(self.workDir + "/" + os.path.abspath(path.split(":")[-1]))
        if addAccount and self.account is not None: # this connection refers to a remote machine
            path = self.account + ":" + path
        return path

    def getLocalPath(self, path):
        """
        Map a (possibly remote) path back to a local absolute path.

        Any "ACCOUNT:" prefix is dropped, and a leading work directory
        component is removed, so that getLocalPath(getRemotePath(p)) gives the
        absolute form of p.
        """
        localPath = os.path.abspath(path.split(":")[-1])
        if self.workDir is not None:
            workDir = os.path.normpath(self.workDir)
            # Only strip whole path components ("/work" must not match "/workdir2"),
            # and avoid a "//" prefix, which os.path.normpath would preserve.
            if localPath == workDir or localPath.startswith(workDir.rstrip("/") + "/"):
                localPath = "/" + localPath[len(workDir):].lstrip("/")
        return localPath

    def getSetting(self, name):
        """
        Return the value of TEES setting `name`, or None if it is not defined.

        Local connections read the imported Settings module. Remote connections
        download the remote settings file once, parse its simple NAME = value
        lines as strings and cache the result for later calls.
        """
        if self.account is None:
            return getattr(Settings, name, None)
        if self.cachedRemoteSettings is None:
            self.cachedRemoteSettings = {}
            # Determine location of remote TEES_SETTINGS
            if self.remoteSettingsPath is None: # not yet known, so look for environment variable
                # run() returns a list of output lines; join them into a single path
                rsp = "".join(self.run("echo $TEES_SETTINGS")).strip()
            else: # download from defined location
                rsp = self.remoteSettingsPath
            if rsp:
                self._readRemoteSettings(rsp)
            else:
                print >> sys.stderr, "Remote TEES_SETTINGS not defined on", self.account
        return self.cachedRemoteSettings.get(name)

    def _readRemoteSettings(self, remotePath):
        """Download the remote settings file and add its NAME = value lines to self.cachedRemoteSettings."""
        print >> sys.stderr, "Reading remote TEES_SETTINGS from", self.account + ":" + remotePath
        tempdir = tempfile.mkdtemp()
        try:
            localPath = tempdir + "/RemoteSettings.py"
            self.scp(self.account + ":" + remotePath, localPath)
            # Read remote settings as a text file (limited to simple variables).
            # Evaluating it as Python could fail, since it may contain code
            # dependent on the remote environment.
            with open(localPath, "rt") as f:
                for line in f:
                    if "=" in line:
                        remoteName, remoteValue = line.split("=", 1)
                        self.cachedRemoteSettings[remoteName.strip()] = remoteValue.strip().strip("\"")
        finally:
            shutil.rmtree(tempdir) # remove the downloaded copy even if reading fails

    def exists(self, filename):
        """Return True if `ls -lh filename` prints anything (its stderr is suppressed)."""
        stdout = self.run("ls -lh " + filename, silent=True)
        return len(stdout) > 0

    def mkdir(self, dir):
        """
        Run `mkdir -p dir`.

        The return value only reports whether mkdir wrote anything to stdout,
        not whether the directory exists.
        """
        stdout = self.run("mkdir -p " + dir)
        return len(stdout) > 0

    def getFileStatus(self, filename):
        """
        Return self.NOT_EXIST, self.ZERO (empty file) or self.NONZERO for the remote path of filename.
        """
        filePath = self.getRemotePath(filename)
        if self.account is None:
            if not os.path.exists(filePath):
                return self.NOT_EXIST
            elif os.path.getsize(filePath) == 0:
                return self.ZERO
            else:
                return self.NONZERO
        # NOTE(review): `filetest` is not a standard Unix command; presumably it is
        # available on the intended remote hosts and prints 0/1 per test — confirm.
        lines = self.run("filetest -e " + filePath + "; filetest -z " + filePath)
        if int(lines[0]) == 0:
            return self.NOT_EXIST
        if int(lines[1]) == 1:
            return self.ZERO
        return self.NONZERO

    def scp(self, par1, par2, verbose="transfer"):
        """
        General scp command, par1 and par2 must be full paths, including machine name

        A path of the form "ACCOUNT:PATH" is remote; anything else is local. If
        both are local the file is copied with shutil.copy2, otherwise with scp.
        The destination directory is created first. verbose is the prefix of
        the progress message; None suppresses the message.
        """
        if ":" not in par1 and ":" not in par2:
            # local copy
            dirPath = os.path.normpath(os.path.dirname(par2))
            if not os.path.exists(dirPath):
                os.makedirs(dirPath)
            if verbose is not None:
                print >> sys.stderr, verbose + "(local copy):", par1, par2
            shutil.copy2(par1, par2)
            return
        # remote copy
        if verbose is not None:
            print >> sys.stderr, verbose + ": scp " + par1 + " " + par2
        if ":" in par2: # remote destination, create its directory over ssh
            remoteDir = os.path.dirname(par2.split(":")[-1])
            if remoteDir:
                self.mkdir(remoteDir)
        else:
            localDir = os.path.dirname(par2)
            if localDir and not os.path.exists(localDir): # a bare file name has no directory to create
                os.makedirs(localDir)
        subprocess.call("scp " + par1 + " " + par2, shell=True)

    def upload(self, src, dst=None, replace=True, compress=False, uncompress=False):
        """
        Mirror a file from "SRC" to "ACCOUNT:WORKDIR/DST"

        Local connections return src unchanged. With replace == False an
        existing remote file (or, with uncompress, its already uncompressed
        version) is kept. If self.compression or compress is set, the file is
        gzipped locally before transfer; if self.compression or uncompress is
        set, a transferred .gz file is gunzipped remotely. Returns the remote
        path of the resulting file.
        """
        assert ":" not in src # must be a local file
        if self.account is None: # local connection...
            return src # ...no upload required
        if dst is None: # default upload location
            dst = src
        dst = self.getRemotePath(dst)

        if replace == False and (self.exists(dst) or (uncompress and dst.endswith(".gz") and self.exists(dst[:-3]))):
            if uncompress and dst.endswith(".gz"): # has been uncompressed already
                dst = dst.rsplit(".", 1)[0]
            print >> sys.stderr, "Existing remote file", dst, "not overwritten"
            return dst
        if (self.compression or compress) and not src.endswith(".gz"):
            print >> sys.stderr, "Compressing " + src + ": ",
            subprocess.call("gzip -fv < " + src + " > " + src + ".gz", shell=True)
            src += ".gz"
            dst += ".gz"
        self.mkdir(os.path.dirname(dst))
        self.scp(src, self.account + ":" + dst, verbose="upload")
        if (self.compression or uncompress) and dst.endswith(".gz"):
            self.run("gunzip -fv " + dst)
            dst = dst.rsplit(".", 1)[0]
        return dst

    def download(self, src, dst=None, replace=True, compress=False, uncompress=False):
        """
        Mirror a file from "ACCOUNT:WORKDIR/SRC" to "DST"

        src is either "ACCOUNT:PATH" (account must match this connection) or a
        path mapped through getRemotePath. Local connections return the local
        path without copying. With replace == False an existing local file is
        kept. If self.compression or compress is set, the file is gzipped
        remotely before transfer; if self.compression or uncompress is set, a
        downloaded .gz file is gunzipped locally. Returns the local path.
        """
        # Determine src path
        if ":" in src: # src is a full pathname, with a machine name
            srcAccount, src = src.split(":", 1)
            assert self.account == srcAccount # check that the account corresponds to this connection
        else: # src is a remote path relative to remote workdir
            src = self.getRemotePath(src)
        # Determine dst path
        if dst is None: # default download location
            dst = src
        dst = self.getLocalPath(dst)
        assert ":" not in dst # must be a local file
        if self.account is None: # local connection ...
            return dst # ... no download required

        if replace == False and os.path.exists(dst):
            return dst # already downloaded
        if (self.compression or compress) and not src.endswith(".gz"):
            print >> sys.stderr, "Compressing " + src + ": ",
            # src is already the full remote path (the same one scp copies below)
            self.run("gzip < " + src + " > " + src + ".gz")
            src = src + ".gz"
            dst = dst + ".gz"
        self.scp(self.account + ":" + src, dst, verbose="download")
        if (self.compression or uncompress) and dst.endswith(".gz"):
            subprocess.call("gunzip -f " + dst, shell=True)
            dst = dst.rsplit(".", 1)[0]
        return dst

    def run(self, script, chdirTo=None, silent=False):
        """
        Immediately run a command.

        The command runs through the local shell, or through ssh for a remote
        account, optionally after cd'ing to chdirTo. With silent=True its
        stderr is captured and discarded. Returns stdout as a list of lines,
        each keeping its trailing newline (like file.readlines()).
        """
        if chdirTo is not None:
            script = "cd " + chdirTo + " ; " + script
        if self.account is not None: # run on the remote machine
            script = "ssh " + self.account + " '" + script + "'"
        stderr = subprocess.PIPE if silent else None
        # communicate() drains stdout and stderr together and reaps the child;
        # reading one pipe to EOF before the other deadlocks once the other
        # pipe's buffer fills. universal_newlines yields text on Python 2 and 3.
        p = subprocess.Popen(script, shell=True, stdout=subprocess.PIPE, stderr=stderr, universal_newlines=True)
        output = p.communicate()[0]
        pieces = output.split("\n")
        # Rebuild readlines()-style output; a final unterminated line is kept as is
        return [piece + "\n" for piece in pieces[:-1]] + ([pieces[-1]] if pieces[-1] else [])

    def _getScript(self, script=None, joinString=" ; "):
        """Return script, or, if it is None, the queued commands joined by joinString (emptying the queue)."""
        if script is None: # use command buffer
            script = joinString.join(self.commands)
            self.commands = []
        return script

    def addCommand(self, string):
        """Append a command to the batch queue."""
        self.commands.append(string)

    def clearCommands(self):
        """Empty the batch queue."""
        self.commands = []

    def getJob(self, jobDir, jobName):
        """Return the job identifier (job status file path) for jobName in jobDir."""
        return self._getJobPath(jobDir, jobName)

    def _getJobPath(self, jobDir, jobName):
        return jobDir + "/" + jobName + ".job"

    def _writeJobFile(self, jobDir, jobName, attrDict=None, append=False):
        """
        Write (or, with append=True, extend) the job status file with key=value lines.

        A new file starts with "name=JOBNAME"; attrDict entries follow in sorted
        key order. Returns the job identifier (unmapped job file path).
        """
        if attrDict is None:
            attrDict = {}
        jobPath = self.getRemotePath(self._getJobPath(jobDir, jobName))
        jobFileText = ""
        if not append:
            jobFileText += "name=" + jobName + "\n"
            assert "name" not in attrDict
        for key in sorted(attrDict.keys()):
            jobFileText += str(key) + "=" + str(attrDict[key]) + "\n"
        remoteJobDir = self.getRemotePath(jobDir)
        # os.path.exists only sees the local file system, so for a remote account the
        # directory is always created (mkdir -p is harmless when it already exists)
        if self.account is not None or not os.path.exists(remoteJobDir):
            self.mkdir(remoteJobDir)
        operator = ">>" if append else ">"
        command = "cat " + operator + " " + jobPath
        if self.account is not None: # write on the remote machine
            command = "ssh " + self.account + " '" + command + "'"
        jobPopen = subprocess.Popen(command, shell=True, stdin=subprocess.PIPE, universal_newlines=True)
        jobPopen.communicate(input=jobFileText)
        return self._getJobPath(jobDir, jobName)

    def _readJobFile(self, job):
        """Parse a job status file into a dict of string values, or return None if it does not exist."""
        jobPath = self.getRemotePath(job)
        if not self.exists(jobPath):
            if self.debug:
                print >> sys.stderr, "Job status file", jobPath, "does not exist"
            return None
        jobLines = self.run("cat " + jobPath)
        if self.debug:
            print >> sys.stderr, "Job status file", jobPath, "=", jobLines
        attrDict = {}
        for line in jobLines:
            line = line.strip()
            if not line: # tolerate blank lines
                continue
            key, value = line.split("=", 1)
            assert key not in attrDict, (key, value, attrDict, jobLines)
            attrDict[key] = value
        return attrDict

    def submit(self, script=None, jobDir=None, jobName=None, stdout=None, stderr=None):
        """
        Queue a command.

        Starts script (or the queued commands, when script is None) in the
        background after cd'ing to jobDir, and records it in the job status file
        JOBDIR/JOBNAME.job; the script's exit status is appended to that file as
        "retcode=N". stdout/stderr may be file objects or local file names; named
        files are opened here and closed by getJobStatus once the job has ended.
        Returns the job identifier to pass to getJobStatus.

        NOTE(review): jobDir is used to build the job file path, so it is
        effectively required even though it defaults to None.
        """
        if self.jobLimit != -1:
            self.waitForJobCount(self.jobLimit)
        script = self._getScript(script)
        jobPath = self._getJobPath(jobDir, jobName)
        # Refuse to restart a job whose previous instance is still running. This is
        # checked before any log file is opened, so a refused job leaks no handles.
        prevStatus = self.getJobStatus(jobPath)
        if self.resubmitOnlyFinished and prevStatus == "RUNNING":
            assert False, prevStatus
        logFiles = [None, None]
        if isinstance(stdout, types.StringTypes):
            print >> sys.stderr, "Job", jobName + "'s stdout at local file", stdout
            logFiles[0] = stdout = open(stdout, "wt")
        if isinstance(stderr, types.StringTypes):
            print >> sys.stderr, "Job", jobName + "'s stderr at local file", stderr
            logFiles[1] = stderr = open(stderr, "wt")
        if jobDir is not None:
            script = "cd " + jobDir + "; " + script
        script += "; echo retcode=$? >> " + self.getRemotePath(jobPath) # store return value
        if self.debug:
            print >> sys.stderr, "------- Job script -------"
            print >> sys.stderr, script
            print >> sys.stderr, "--------------------------"
        # The job status file must be open before the job is submitted, so that the return code can be
        # written to it.
        self._writeJobFile(jobDir, jobName)
        # Submit the job
        if self.account is None: # a local process
            jobPopen = subprocess.Popen(script, shell=True, stdout=stdout, stderr=stderr)
        else:
            jobPopen = subprocess.Popen("ssh " + self.account + " '" + script + "'", shell=True, stdout=stdout, stderr=stderr)
        # The 'time' attribute marks a time after the program has started. When checking for the PID,
        # only those programs whose start time is before 'time' are considered.
        # NOTE(review): for a remote account this is the PID of the local ssh process,
        # while getJobStatus looks it up with ps on the remote host — confirm intent.
        jobArgs = {"PID":jobPopen.pid, "time":time.time() + 10}
        job = self._writeJobFile(jobDir, jobName, jobArgs, append=True)
        # Keep track of log files so they can be closed
        if logFiles != [None, None]:
            assert job not in self._logs
            self._logs[job] = logFiles
        print >> sys.stderr, "Submitted job", jobArgs["PID"], jobArgs["time"]
        return job

    def _closeLogs(self, job):
        """Close and forget the log files submit() opened for job, if any."""
        logFiles = self._logs.pop(job, None)
        if logFiles is not None:
            for logFile in logFiles:
                if logFile is not None:
                    logFile.close()

    def getUserName(self):
        """Return the user part of the account, or the local user name for a local connection."""
        if self.account is not None:
            return self.account.split("@")[0]
        return getpass.getuser()

    def getNumJobs(self, includeQueued=True):
        """
        Count the user's processes whose parent PID equals this process's group id.

        includeQueued is accepted for interface compatibility but is not used.
        NOTE(review): for a remote account ps runs remotely while the group id is
        the local one, so the count is presumably not meaningful there.
        """
        stdoutLines = self.run("ps -u " + self.getUserName() + " -o ppid")
        groupId = str(os.getpgrp())
        return sum(1 for line in stdoutLines if line.strip() == groupId)

    def waitForJobCount(self, targetCount=0, pollIntervalSeconds=60, verbose=True):
        """
        Block until getNumJobs() reports at most targetCount jobs.

        A targetCount of -1 returns immediately. The job count is re-checked
        every pollIntervalSeconds seconds.
        """
        if targetCount == -1:
            return
        numJobs = self.getNumJobs()
        if numJobs <= targetCount:
            return
        waitTimer = Timer()
        accountName = self.account
        if accountName is None:
            accountName = "local"
        while numJobs > targetCount:
            if verbose:
                print >> sys.stderr, "\rWaiting for " + str(numJobs) + " on " + accountName + " (limit=" + str(targetCount) + ")", waitTimer.elapsedTimeToString(),
            time.sleep(pollIntervalSeconds)
            numJobs = self.getNumJobs()

    def waitForJob(self, job, pollIntervalSeconds=10):
        """Block until the job's status is FINISHED or FAILED."""
        while self.getJobStatus(job) not in ["FINISHED", "FAILED"]:
            time.sleep(pollIntervalSeconds)

    def waitForJobs(self, jobs, pollIntervalSeconds=60, timeout=None, verbose=True):
        """
        Poll the given jobs until none is queued or running, or until timeout seconds have passed.

        Returns a dict counting the jobs per status ("FINISHED", "QUEUED",
        "FAILED", "RUNNING") at the last poll.
        """
        print >> sys.stderr, "Waiting for results"
        waitTimer = Timer()
        accountName = self.account
        if accountName is None:
            accountName = "local"
        while True:
            jobStatus = {"FINISHED":0, "QUEUED":0, "FAILED":0, "RUNNING":0}
            for job in jobs:
                jobStatus[self.getJobStatus(job)] += 1
            jobStatusString = str(jobStatus["QUEUED"]) + " queued, " + str(jobStatus["RUNNING"]) + " running, " + str(jobStatus["FINISHED"]) + " finished, " + str(jobStatus["FAILED"]) + " failed"
            if jobStatus["QUEUED"] + jobStatus["RUNNING"] == 0:
                if verbose:
                    print >> sys.stderr, "\nAll runs done (" + jobStatusString + ")"
                break
            if timeout is not None and waitTimer.getElapsedTime() >= timeout:
                if verbose:
                    print >> sys.stderr, "\nTimed out, ", waitTimer.elapsedTimeToString()
                break
            # Sleep for one poll interval, redrawing a 10-step progress bar every 5 seconds
            sleepTimer = Timer()
            if verbose:
                sleepString = " [          ] "
                print >> sys.stderr, "\rWaiting for " + str(len(jobs)) + " on " + accountName + "(" + jobStatusString + "),", waitTimer.elapsedTimeToString() + sleepString,
            while sleepTimer.getElapsedTime() < pollIntervalSeconds:
                if verbose:
                    steps = int(10 * sleepTimer.getElapsedTime() / pollIntervalSeconds) + 1
                    sleepString = " [" + steps * "." + (10-steps) * " " + "] "
                    print >> sys.stderr, "\rWaiting for " + str(len(jobs)) + " on " + accountName + "(" + jobStatusString + "),", waitTimer.elapsedTimeToString() + sleepString,
                time.sleep(5)
        return jobStatus

    def getJobStatusByName(self, jobDir, jobName):
        """Return getJobStatus for jobName in jobDir."""
        return self.getJobStatus(self._getJobPath(jobDir, jobName))

    def getJobStatus(self, job):
        """
        Return "FINISHED", "FAILED" or "RUNNING" for a submitted job, or None if its status file does not exist.

        A recorded retcode of "0" means FINISHED; any other retcode means FAILED.
        Without a retcode the job is RUNNING if a process with the recorded PID
        exists and started no later than the recorded reference time (a guard
        against a reused PID); otherwise it is FAILED. Log files opened by
        submit() are closed once the job is no longer running.
        """
        jobAttr = self._readJobFile(job)
        if jobAttr is None:
            return None
        # Check for a finished process
        if "retcode" in jobAttr:
            self._closeLogs(job)
            if jobAttr["retcode"] == "0":
                return "FINISHED"
            return "FAILED"
        # Check for a running process
        startLimit = float(jobAttr["time"])
        currentTime = time.time()
        processes = []
        for line in self.run("ps -p " + jobAttr["PID"] + " -o etime, --no-heading"):
            line = line.strip()
            if not line:
                continue
            # Elapsed time is parsed as [[DD-]HH:]MM:SS
            days = 0
            if "-" in line:
                days, line = line.split("-")
            hours = 0
            if line.count(":") == 2:
                hours, minutes, seconds = line.split(":")
            else:
                assert line.count(":") == 1, line
                minutes, seconds = line.split(":")
            elapsedTime = int(days) * 86400 + int(hours) * 3600 + int(minutes) * 60 + int(seconds)
            if currentTime - elapsedTime <= startLimit: # skip processes started after submit time (won't work with stopped processes)
                processes.append(jobAttr["PID"])
        assert len(processes) <= 1
        if len(processes) == 1:
            return "RUNNING"
        self._closeLogs(job)
        return "FAILED" # failed without writing return code
if __name__=="__main__":
    # Manual round-trip check: upload a local file to a remote account and
    # download it back. The account and file path are developer-specific.
    # NOTE(review): treating "remoteTest" as the remote work directory is an
    # assumption — confirm before relying on this script.
    c = UnixConnection(account="jakrbj@louhi.csc.fi", workdir="remoteTest")
    f = "/usr/share/biotext/Autumn2010/TriggerEdgeTest/TriggerEdge2TestDeterminismTest101103/uploadtest"
    c.upload(f)
    c.download(os.path.basename(f), "delme")