1 import sys, os, shutil, types
2 import subprocess
3 import getpass
4 import time
5 import atexit, signal
6 sys.path.append(os.path.normpath(os.path.abspath(os.path.dirname(__file__))+"/../.."))
7 from Utils.Timer import Timer
8 import Utils.Settings as Settings
9 import Utils.Parameters as Parameters
10 import tempfile
11
13
14
def __init__(self, account=None, workdir=None, settings=None, memory=None, cores=None, jobLimit=None, killGroup=True):
    """
    Configure a connection that runs commands locally (account is None) or
    on a remote "account" reached with ssh/scp.

    workdir   -- root directory that paths are mapped into (may be None)
    settings  -- path of the remote settings file (may be None)
    memory    -- converted with int(); 4194304 when omitted
    cores     -- converted with int(); 1 when omitted
    jobLimit  -- converted with int(); -1 when omitted, which disables the
                 job-count wait performed by submit
    killGroup -- for a local connection, call os.setpgrp() so that this
                 process becomes the leader of its own process group
    """
    self.account = account
    self.memory = int(4194304 if memory is None else memory)
    self.cores = int(1 if cores is None else cores)
    self.workDir = workdir

    # Labels used when reporting whether a file is missing, empty or non-empty
    self.NOT_EXIST = "NOT_EXIST"
    self.NONZERO = "NONZERO"
    self.ZERO = "ZERO"

    # Shell commands waiting to be joined into a single script
    self.commands = []

    self.compression = False
    self.remoteSettingsPath = settings
    self.cachedRemoteSettings = None  # filled lazily on the first remote settings lookup
    self._logs = {}  # job -> [stdout file, stderr file] opened by submit
    self.jobLimit = int(-1 if jobLimit is None else jobLimit)
    self.debug = False
    self.resubmitOnlyFinished = True

    if killGroup and self.account is None:
        os.setpgrp()
50
51
52
# Remove the working subdirectory subDir (mapped with getRemotePath) on the
# remote account. For a local connection (account None) nothing is deleted.
54 if self.account == None:
55 print >> sys.stderr, "Local connection, remote directory not removed"
56 return
57 workSubDir = self.getRemotePath(subDir)
# Refuse to delete the work directory root itself.
# NOTE(review): this guard is an assert, so it disappears under python -O.
58 assert workSubDir != self.workDir, (self.workDir, subDir)
59 print >> sys.stderr, "Removing remote directory", workSubDir
# NOTE(review): the path is interpolated into a shell command unquoted.
60 self.run("rm -R " + workSubDir)
61
# True when there is no remote account, i.e. commands are run on this machine.
63 return self.account == None
64
# Map path into the working directory: any "account:" prefix is dropped, the
# remainder is made absolute (relative paths resolve against the local
# current directory) and re-rooted under self.workDir. With addAccount on a
# remote connection, "account:" is prefixed to the result.
# NOTE(review): when workDir is None the input is returned unchanged, so an
# existing "account:" prefix is kept and addAccount can add a second one.
66 if self.workDir != None:
67 path = os.path.normpath(self.workDir + "/" + os.path.abspath(path.split(":")[-1]))
68 if addAccount and self.account != None:
69 path = self.account + ":" + path
70 return path
71
# Inverse of the work-directory mapping: drop any "account:" prefix, make the
# path absolute, and if it starts with self.workDir strip that prefix so the
# result is rooted at "/".
# NOTE(review): startswith() is a plain string-prefix test, so with workDir
# "/work" a path such as "/work2/x" is also treated as inside it.
73 localPath = os.path.abspath(path.split(":")[-1])
74 if self.workDir != None and localPath.startswith(self.workDir):
75 localPath = os.path.normpath("/" + localPath[len(self.workDir):])
76
77 return localPath
78
# Look up the setting called name.
# Local connection: read the attribute from the imported Settings module,
# or None when it is absent.
# Remote connection: the first time, copy the remote settings file here with
# scp and parse every line containing "=" into self.cachedRemoteSettings
# (name and value whitespace-stripped, value also stripped of double quotes;
# nothing is evaluated). Lookups are then answered from that cache.
80 if self.account == None:
81 if hasattr(Settings, name):
82 return getattr(Settings, name)
83 else:
84 return None
85 elif self.cachedRemoteSettings == None:
86 self.cachedRemoteSettings = {}
87
88 if self.remoteSettingsPath == None:
# NOTE(review): run() returns a list of output lines, so rsp is a list here
# and the string concatenations below raise TypeError; rsp[0].strip() was
# probably intended -- confirm.
89 rsp = self.run("echo $TEES_SETTINGS")
90 else:
91 assert self.remoteSettingsPath != None
92 rsp = self.remoteSettingsPath
93
94 print >> sys.stderr, "Reading remote TEES_SETTINGS from", self.account + ":" + rsp
95 tempdir = tempfile.mkdtemp()
96 self.scp(self.account + ":" + rsp, tempdir + "/RemoteSettings.py")
97
98
99
# NOTE(review): the file handle and tempdir are not released if reading or
# parsing raises.
100 f = open(tempdir + "/RemoteSettings.py", "rt")
101 for line in f.readlines():
102 if "=" in line:
103 remoteName, remoteValue = line.split("=", 1)
104 remoteName = remoteName.strip()
105 remoteValue = remoteValue.strip().strip("\"")
106 self.cachedRemoteSettings[remoteName] = remoteValue
107 f.close()
108 shutil.rmtree(tempdir)
109
110 if name in self.cachedRemoteSettings:
111 return self.cachedRemoteSettings[name]
112 else:
113 return None
114
115
116
117
118
119
120
121
122
123
124
125
126
127
# Report whether filename exists by running "ls -lh" on it (remotely for a
# remote connection) and checking whether anything reached stdout. stderr is
# discarded (silent=True), so a missing file produces no output and False.
# NOTE(review): filename is passed to the shell unquoted.
129 stdout = self.run("ls -lh " + filename, silent=True)
130 if len(stdout) > 0:
131 return True
132 else:
133 return False
134
# Create directory dir, including parents, with "mkdir -p" (remotely for a
# remote connection).
# NOTE(review): the result is True only if mkdir wrote to stdout. "mkdir -p"
# normally prints nothing on success (errors go to stderr), so this usually
# returns False even when the directory was created.
136 stdout = self.run("mkdir -p " + dir)
137 if len(stdout) > 0:
138 return True
139 else:
140 return False
141
# Classify filename (mapped with getRemotePath) as self.NOT_EXIST, self.ZERO
# (exists but empty) or self.NONZERO.
143 filePath = self.getRemotePath(filename)
# Local connection: inspect the file directly.
144 if self.account == None:
145 if not os.path.exists(filePath):
146 return self.NOT_EXIST
147 elif os.path.getsize(filePath) == 0:
148 return self.ZERO
149 else:
150 return self.NONZERO
151 else:
# Remote connection: two output lines, the existence flag then the
# zero-size flag, each parsed as an integer.
# NOTE(review): "filetest" is not a POSIX utility; presumably the remote
# login shell provides it (e.g. a tcsh builtin) -- confirm.
152 lines = self.run("filetest -e " + filePath + "; filetest -z " + filePath)
153
154 if int(lines[0]) == 0:
155 return self.NOT_EXIST
156 if int(lines[1]) == 1:
157 return self.ZERO
158 else:
159 return self.NONZERO
160
def scp(self, par1, par2, verbose="transfer"):
    """
    General scp command, par1 and par2 must be full paths, including machine name

    Copy par1 to par2. Either argument may be "account:path" or a plain
    local path. If neither names an account, the file is copied locally
    with shutil.copy2. Otherwise the external "scp" command is run through
    the shell. The destination's parent directory is created first
    (remotely through self.mkdir when par2 is remote).

    verbose is the label prefixed to the progress message on stderr. None
    suppresses the message on both the local-copy and the scp path.
    """
    srcIsRemote = ":" in par1
    dstIsRemote = ":" in par2
    if not srcIsRemote and not dstIsRemote:
        # Plain local copy; dirname of a bare file name normalizes to "."
        dirPath = os.path.normpath(os.path.dirname(par2))
        if not os.path.exists(dirPath):
            os.makedirs(dirPath)
        if verbose is not None:
            print >> sys.stderr, verbose + "(local copy):", par1, par2
        shutil.copy2(par1, par2)
        return

    # A None label silences this path as well (None + str would raise).
    if verbose is not None:
        print >> sys.stderr, verbose + ": scp " + par1 + " " + par2
    if dstIsRemote:
        self.mkdir(os.path.dirname(par2.split(":")[-1]))
    else:
        localDir = os.path.dirname(par2)
        # dirname() is "" for a bare file name and os.makedirs("") would fail.
        if localDir and not os.path.exists(localDir):
            os.makedirs(localDir)
    # NOTE: paths reach the shell unquoted and scp's exit status is not checked.
    subprocess.call("scp " + par1 + " " + par2, shell=True)
187
def upload(self, src, dst=None, replace=True, compress=False, uncompress=False):
    """
    Mirror a file from "SRC" to "ACCOUNT:WORKDIR/DST"

    src is a local path and must not carry an "account:" prefix. dst
    defaults to src and is mapped with getRemotePath. For a local
    connection nothing is copied and src is returned unchanged.

    With replace == False an existing remote file is kept. When uncompress
    is set, an already uncompressed copy (the name without ".gz") also
    counts, and its path is returned. With self.compression or compress, a
    src not ending in ".gz" is first gzipped locally to src + ".gz". With
    self.compression or uncompress, a ".gz" destination is gunzipped on the
    remote side after the transfer.

    Returns the destination path as mapped by getRemotePath, minus ".gz"
    when the remote copy was gunzipped.
    """
    assert ":" not in src
    if self.account is None:
        return src
    target = self.getRemotePath(src if dst is None else dst)

    if replace == False:
        if self.exists(target) or (uncompress and target.endswith(".gz") and self.exists(target[:-3])):
            if uncompress and target.endswith(".gz"):
                target = target.rsplit(".", 1)[0]
            print >> sys.stderr, "Existing remote file", target, "not overwritten"
            return target

    if (self.compression or compress) and not src.endswith(".gz"):
        print >> sys.stderr, "Compressing " + src + ": ",
        subprocess.call("gzip -fv < " + src + " > " + src + ".gz", shell=True)
        src, target = src + ".gz", target + ".gz"
    self.mkdir(os.path.dirname(target))
    self.scp(src, self.account + ":" + target, verbose="upload")
    if (self.compression or uncompress) and target.endswith(".gz"):
        self.run("gunzip -fv " + target)
        target = target.rsplit(".", 1)[0]
    return target
216
def download(self, src, dst=None, replace=True, compress=False, uncompress=False):
    """
    Mirror a file from "ACCOUNT:WORKDIR/SRC" to "DST"

    src is either "ACCOUNT:PATH", where ACCOUNT must equal self.account, or
    a path that is mapped with getRemotePath. dst defaults to src and is
    mapped with getLocalPath. For a local connection nothing is copied and
    the mapped local path is returned.

    With replace == False an existing local file is kept. With
    self.compression or compress, a non-".gz" src is gzipped on the remote
    side before the transfer. With self.compression or uncompress, a ".gz"
    local copy is gunzipped afterwards.

    Returns the local path of the downloaded file.
    """
    if ":" in src:
        # Split on the first colon only so that paths containing ":" survive.
        srcAccount, src = src.split(":", 1)
        assert self.account == srcAccount
    else:
        src = self.getRemotePath(src)

    if dst == None:
        dst = src
    dst = self.getLocalPath(dst)
    assert ":" not in dst
    if self.account == None:
        return dst

    if replace == False and os.path.exists(dst):
        return dst
    if (self.compression or compress) and not src.endswith(".gz"):
        print >> sys.stderr, "Compressing " + src + ": ",
        # src is already the full remote path (the same one scp fetches
        # below), so it must not be prefixed with workDir again.
        self.run("gzip < " + src + " > " + src + ".gz")
        src = src + ".gz"
        dst = dst + ".gz"
    self.scp(self.account + ":" + src, dst, verbose="download")
    if (self.compression or uncompress) and dst.endswith(".gz"):
        subprocess.call("gunzip -f " + dst, shell=True)
        dst = dst.rsplit(".", 1)[0]
    return dst
248
def run(self, script, chdirTo=None, silent=False):
    """
    Immediately run a command.

    script is a shell command line. It runs through the local shell when
    self.account is None, and otherwise as "ssh ACCOUNT '<script>'". When
    chdirTo is given, "cd chdirTo ; " is prepended. The command's stderr is
    inherited, or captured and discarded when silent is True.

    Returns the command's standard output split into lines that keep their
    trailing newline, exactly as file.readlines() would.
    """
    import io
    if chdirTo is not None:
        script = "cd " + chdirTo + " ; " + script
    if self.account is not None:
        # Wrapped in single quotes for the remote shell, so the script must
        # not itself contain single quotes.
        script = "ssh " + self.account + " '" + script + "'"
    if silent:
        stderrTarget = subprocess.PIPE
    else:
        stderrTarget = None
    process = subprocess.Popen(script, shell=True, stdout=subprocess.PIPE, stderr=stderrTarget)
    # communicate() drains stdout and stderr together and waits for the
    # child. Reading one pipe to EOF first can deadlock once the child fills
    # the other pipe, and without a wait the child is never reaped.
    stdoutData = process.communicate()[0]
    return io.BytesIO(stdoutData).readlines()
265
def _getScript(self, script=None, joinString=" ; "):
    """
    Return script when one is given. Otherwise join the queued
    self.commands with joinString, empty the queue, and return the joined
    text.
    """
    if script is not None:
        return script
    joined = joinString.join(self.commands)
    self.commands = []
    return joined
271
# Queue a command string in self.commands. The queue is joined and emptied
# when a script is built without an explicit script argument.
273 self.commands.append(string)
274
277
def getJob(self, jobDir, jobName):
    """Return the path of the status file for job jobName, i.e. "<jobDir>/<jobName>.job"."""
    return "".join((jobDir, "/", jobName, ".job"))
283
def _writeJobFile(self, jobDir, jobName, attrDict=None, append=False):
    """
    Write, or with append=True extend, the status file of job jobName in jobDir.

    The file holds one "key=value" line per entry. When not appending, a
    "name=<jobName>" line comes first; attrDict's items follow, sorted by
    key. The job directory is created if needed, and the text is streamed
    through "cat >" (or "cat >>"), locally or over ssh for a remote account.

    Returns self._getJobPath(jobDir, jobName), i.e. the unmapped job path.
    """
    if attrDict is None:  # avoid a shared mutable default
        attrDict = {}
    jobPath = self.getRemotePath(self._getJobPath(jobDir, jobName))
    lines = []
    if not append:
        lines.append("name=" + jobName + "\n")
        assert "name" not in attrDict
    for key in sorted(attrDict.keys()):
        lines.append(str(key) + "=" + str(attrDict[key]) + "\n")
    jobFileText = "".join(lines)

    jobDirPath = self.getRemotePath(jobDir)
    # os.path.exists only sees the local filesystem, so for a remote account
    # always let the remote side create the directory ("mkdir -p" is idempotent).
    if self.account is not None or not os.path.exists(jobDirPath):
        self.mkdir(jobDirPath)

    operator = ">>" if append else ">"
    catCommand = "cat " + operator + " " + jobPath
    if self.account is not None:
        catCommand = "ssh " + self.account + " '" + catCommand + "'"
    jobPopen = subprocess.Popen(catCommand, shell=True, stdin=subprocess.PIPE)
    jobPopen.communicate(input=jobFileText)
    return self._getJobPath(jobDir, jobName)
304
# Read the status file of job (mapped with getRemotePath) and return its
# "key=value" lines as a dict of strings, or None when the file does not exist.
306 jobPath = self.getRemotePath(job)
307 if not self.exists(jobPath):
308 if self.debug:
309 print >> sys.stderr, "Job status file", jobPath, "does not exist"
310 return None
311 jobLines = self.run("cat " + jobPath)
312 if self.debug:
313 print >> sys.stderr, "Job status file", jobPath, "=", jobLines
314
# NOTE(review): every line must contain "="; a blank or malformed line
# raises ValueError, and a repeated key trips the assert.
315 attrDict = {}
316 for line in jobLines:
317 key, value = line.strip().split("=", 1)
318 assert key not in attrDict, (key, value, attrDict, jobLines)
319 attrDict[key] = value
320
321 return attrDict
322
def submit(self, script=None, jobDir=None, jobName=None, stdout=None, stderr=None):
    """
    Queue a command.

    Start script as a background process, locally or through ssh for a
    remote account. When script is None, the commands queued in
    self.commands are joined into one script. Returns the job path
    accepted by the status and wait methods.

    - If self.jobLimit is not -1, first wait until the job count is within
      that limit.
    - With jobDir the script runs after "cd jobDir". In every case
      "retcode=<exit status>" is appended to the job status file when the
      script ends.
    - stdout/stderr may be None, open file objects, or local file names.
      Names are opened for writing and registered in self._logs.

    Raises AssertionError when self.resubmitOnlyFinished is set and the
    previous run of this job still reports "RUNNING".
    """
    if self.jobLimit != -1:
        self.waitForJobCount(self.jobLimit)
    script = self._getScript(script)
    jobPath = self._getJobPath(jobDir, jobName)
    if jobDir != None:
        script = "cd " + jobDir + "; " + script
    script += "; echo retcode=$? >> " + self.getRemotePath(jobPath)
    if self.debug:
        print >> sys.stderr, "------- Job script -------"
        print >> sys.stderr, script
        print >> sys.stderr, "--------------------------"

    # Check the previous run before touching the log files: opening them in
    # "wt" mode would truncate the logs of a job that is still running.
    prevStatus = self.getJobStatus(jobPath)
    if self.resubmitOnlyFinished and prevStatus == "RUNNING":
        # An explicit raise (not "assert False") keeps this guard under python -O.
        raise AssertionError(prevStatus)

    logFiles = [None, None]
    if isinstance(stdout, types.StringTypes):
        print >> sys.stderr, "Job", jobName + "'s stdout at local file", stdout
        logFiles[0] = stdout = open(stdout, "wt")
    if isinstance(stderr, types.StringTypes):
        print >> sys.stderr, "Job", jobName + "'s stderr at local file", stderr
        logFiles[1] = stderr = open(stderr, "wt")

    # Start a fresh status file (name line only) before launching the process
    self._writeJobFile(jobDir, jobName)
    if self.account == None:
        command = script
    else:
        command = "ssh " + self.account + " '" + script + "'"
    jobPopen = subprocess.Popen(command, shell=True, stdout=stdout, stderr=stderr)

    # "time" is padded by 10 s. The status check accepts the PID as this
    # job's process only if that process started no later than this value.
    jobArgs = {"PID": jobPopen.pid, "time": time.time() + 10}
    job = self._writeJobFile(jobDir, jobName, jobArgs, append=True)

    if logFiles != [None, None]:
        assert job not in self._logs
        self._logs[job] = logFiles
    print >> sys.stderr, "Submitted job", jobArgs["PID"], jobArgs["time"]
    return job
365
# Close and forget the local stdout/stderr log files registered for job in
# self._logs, if there are any.
367 if job in self._logs:
368 if self._logs[job][0] != None:
369 self._logs[job][0].close()
370 if self._logs[job][1] != None:
371 self._logs[job][1].close()
372 del self._logs[job]
373
# User name for process listings. For a remote connection this is the part
# of account before "@" (the whole account string if it has no "@", which
# may be a host alias rather than a user name). Otherwise it is the local
# login name.
375 if self.account != None:
376 return self.account.split("@")[0]
377 else:
378 return getpass.getuser()
379
381
# Count this user's processes (from "ps -u USER -o ppid") whose parent PID
# equals this process's process-group id. For a local connection built with
# killGroup, __init__ calls os.setpgrp(), which makes the group id equal to
# this process's PID, so the count is the number of direct child processes.
# NOTE(review): for a remote account "ps" runs on the remote host, but it is
# compared against the local group id, so the count is probably
# meaningless there -- confirm.
382 stdoutLines = self.run("ps -u " + self.getUserName() + " -o ppid")
383 groupId = str(os.getpgrp())
384 numProcesses = 0
385 for line in stdoutLines:
386 if line.strip() == groupId:
387 numProcesses += 1
388 return numProcesses
389
def waitForJobCount(self, targetCount=0, pollIntervalSeconds=60, verbose=True):
    """
    Block until at most targetCount jobs are running.

    The count comes from self.getNumJobs() and is re-checked every
    pollIntervalSeconds seconds. A targetCount of -1 means "no limit" and
    returns at once. With verbose, a progress line is written to stderr
    while waiting. It is redrawn in place with a carriage return and ended
    with a newline when the wait is over.
    """
    if targetCount == -1:
        return
    numJobs = self.getNumJobs()
    if numJobs <= targetCount:
        return
    accountName = self.account
    if accountName == None:
        accountName = "local"
    waitTimer = Timer()
    while numJobs > targetCount:
        if verbose:
            print >> sys.stderr, "\rWaiting for " + str(numJobs) + " on " + accountName + " (limit=" + str(targetCount) + ")", waitTimer.elapsedTimeToString(),
        # Pause between polls instead of re-querying in a tight loop.
        time.sleep(pollIntervalSeconds)
        numJobs = self.getNumJobs()
    if verbose:
        print >> sys.stderr
400
def waitForJob(self, job, pollIntervalSeconds=10):
    """
    Block until getJobStatus(job) reports "FINISHED" or "FAILED",
    re-checking every pollIntervalSeconds seconds.
    """
    # NOTE(review): any other status, including None for a missing status
    # file, keeps this loop running indefinitely.
    finalStates = ("FINISHED", "FAILED")
    while True:
        if self.getJobStatus(job) in finalStates:
            break
        time.sleep(pollIntervalSeconds)
404
def waitForJobs(self, jobs, pollIntervalSeconds=60, timeout=None, verbose=True):
    """
    Wait until none of jobs is QUEUED or RUNNING.

    Every job's status is polled every pollIntervalSeconds seconds. If
    timeout (seconds since the wait started) is given and has passed,
    waiting stops early. With verbose, the per-state counts and a ten-step
    progress bar for the current poll interval are redrawn on stderr.

    Returns a dict with the number of "FINISHED", "QUEUED", "FAILED" and
    "RUNNING" jobs seen in the last poll.
    """
    print >> sys.stderr, "Waiting for results"
    accountName = self.account
    if accountName == None:
        accountName = "local"
    waitTimer = Timer()
    while True:
        jobStatus = {"FINISHED":0, "QUEUED":0, "FAILED":0, "RUNNING":0}
        for job in jobs:
            # NOTE(review): the status check returns None when a job's status
            # file is missing, which raises KeyError here -- confirm whether
            # such jobs should count as failed.
            jobStatus[self.getJobStatus(job)] += 1
        jobStatusString = str(jobStatus["QUEUED"]) + " queued, " + str(jobStatus["RUNNING"]) + " running, " + str(jobStatus["FINISHED"]) + " finished, " + str(jobStatus["FAILED"]) + " failed"
        if jobStatus["QUEUED"] + jobStatus["RUNNING"] == 0:
            if verbose:
                print >> sys.stderr, "\nAll runs done (" + jobStatusString + ")"
            break

        # The timeout is measured from the start of the whole wait.
        if timeout != None and waitTimer.getElapsedTime() >= timeout:
            if verbose:
                print >> sys.stderr, "\nTimed out, ", waitTimer.elapsedTimeToString()
            break

        sleepTimer = Timer()
        if verbose:
            sleepString = " [ ] "
            print >> sys.stderr, "\rWaiting for " + str(len(jobs)) + " on " + accountName + "(" + jobStatusString + "),", waitTimer.elapsedTimeToString() + sleepString,
        while sleepTimer.getElapsedTime() < pollIntervalSeconds:
            if verbose:
                steps = int(10 * sleepTimer.getElapsedTime() / pollIntervalSeconds) + 1
                sleepString = " [" + steps * "." + (10-steps) * " " + "] "
                print >> sys.stderr, "\rWaiting for " + str(len(jobs)) + " on " + accountName + "(" + jobStatusString + "),", waitTimer.elapsedTimeToString() + sleepString,
            time.sleep(5)
    return jobStatus
437
440
442
# Derive a job's state from its status file:
#   None       -- no status file
#   "FINISHED" -- a "retcode" line equal to "0" was recorded
#   "FAILED"   -- a non-zero retcode was recorded, or there is no retcode
#                 and the recorded PID is not a live process started by
#                 this job
#   "RUNNING"  -- no retcode yet, and the PID is alive with a start time no
#                 later than the recorded "time" value
# This check never returns "QUEUED". Local log files are closed once the job
# is found finished or failed.
443 jobAttr = self._readJobFile(job)
444
445 if jobAttr == None:
446 return None
447
448 if "retcode" in jobAttr:
449 if jobAttr["retcode"] == "0":
450 self._closeLogs(job)
451 return "FINISHED"
452 else:
453 self._closeLogs(job)
454 return "FAILED"
455
456
# NOTE(review): a status file that so far holds only the "name" line (no
# "time"/"PID" yet) would raise KeyError below.
457 jobAttr["time"] = float(jobAttr["time"])
458 currentTime = time.time()
459 processes = []
# Elapsed run time of the PID, formatted by ps as [[dd-]hh:]mm:ss.
# NOTE(review): the trailing comma in "-o etime," looks unintended -- confirm
# that the target ps accepts it.
460 for line in self.run("ps -p " + jobAttr["PID"] + " -o etime, --no-heading"):
461 line = line.strip()
462 days = 0
463 if "-" in line:
464 days, line = line.split("-")
465 hours = 0
466 if line.count(":") == 2:
467 hours, minutes, seconds = line.split(":")
468 else:
469 assert line.count(":") == 1, line
470 minutes, seconds = line.split(":")
471 elapsedTime = int(days) * 86400 + int(hours) * 3600 + int(minutes) * 60 + int(seconds)
472
473
# currentTime - elapsedTime is the process start time. A process that
# started after the recorded time is taken to be an unrelated process that
# reused the PID.
474 if currentTime - elapsedTime <= jobAttr["time"]:
475 processes.append(jobAttr["PID"])
476 assert len(processes) <= 1
477 if len(processes) == 1:
478 return "RUNNING"
479 else:
480 self._closeLogs(job)
481 return "FAILED"
482
# Ad-hoc manual check: upload a file to a remote account and download it back.
# NOTE(review): CSCConnection is neither defined nor imported in the visible
# source, so running this module directly would raise NameError. The account
# and file path are hard-coded for one user's environment.
483 if __name__=="__main__":
484 c = CSCConnection("remoteTest", "jakrbj@louhi.csc.fi", True)
485 f = "/usr/share/biotext/Autumn2010/TriggerEdgeTest/TriggerEdge2TestDeterminismTest101103/uploadtest"
486 c.upload(f)
487 c.download(os.path.basename(f), "delme")
488
489
490
491
492