1 import sys, os
2 import types
3
4 from UnixConnection import UnixConnection
5 import subprocess
6
def __init__(self, account=None, workdir=None, settings=None, memory=None, cores=None, modules=None, wallTime=None):
    """
    Set up a connection to a batch-queue cluster.

    account, workdir, settings, memory and cores are forwarded unchanged
    to UnixConnection.__init__, which is always called with killGroup=False.

    modules  -- iterable of module names; makeJobScript emits one
                "module load <name>" line per entry (None = no modules)
    wallTime -- default value substituted for the %wallTime placeholder
                of the job template
    """
    # Everything that is not scheduler-specific is handled by the base class.
    UnixConnection.__init__(
        self,
        account=account,
        workdir=workdir,
        settings=settings,
        memory=memory,
        cores=cores,
        killGroup=False,
    )

    # Per-connection defaults used by makeJobScript when no per-call value is given.
    self.modules = modules
    self.wallTime = wallTime

    # Scheduler-specific values, left unset here.
    # NOTE(review): presumably assigned by concrete subclasses -- confirm.
    self.jobTemplate = None
    self.jobListCommand = None
    self.submitCommand = None
15
17 jobAttr = self._readJobFile(job)
18
19 if jobAttr == None:
20 return None
21
22 if "retcode" not in jobAttr:
23 return "QUEUED"
24 elif jobAttr["retcode"] == "0":
25 return "FINISHED"
26 else:
27 return "FAILED"
28
def submit(self, script=None, jobDir=None, jobName=None, stdout=None, stderr=None):
    """
    Build a job script from the template and pipe it to the submit command.

    script  -- passed through self._getScript(script, "\n") before being
               wrapped by makeJobScript
    jobDir  -- job directory, used for the job file and the default streams
    jobName -- name of the job
    stdout, stderr -- optional stream paths (see getStreams for defaults)

    Returns the tuple produced by Popen.communicate(). Because stderr is
    redirected to stdout, the second element is None.

    Raises Exception if self.resubmitOnlyFinished is set and the job's
    current status is "RUNNING" or "QUEUED".
    """
    prevStatus = self.getJobStatus(self._getJobPath(jobDir, jobName))
    if self.resubmitOnlyFinished and prevStatus in ("RUNNING", "QUEUED"):
        # One formatted message: passing the status as a second Exception
        # argument made str(e) render as a tuple repr.
        raise Exception("Tried to resubmit a job whose current status is " + str(prevStatus))

    script = self._getScript(script, "\n")
    script = self.makeJobScript(script, jobDir, jobName, stdout, stderr)

    if self.account is None:
        # No account configured: run the submit command directly.
        command = [self.submitCommand]
    else:
        # Run through ssh; the submit command is handed over as one quoted argument.
        command = ["ssh", self.account, "'" + self.submitCommand + "'"]

    if self.debug:
        print >> sys.stderr, "------- Job script -------"
        print >> sys.stderr, script
        print >> sys.stderr, "--------------------------"

    # The job file is written before the job is handed to the scheduler.
    self._writeJobFile(jobDir, jobName)
    if self.debug:
        print >> sys.stderr, "Submitting job", jobName, "with command", command
    else:
        print >> sys.stderr, "Submitting job", jobName

    # The script is fed to the submit command on its standard input.
    p = subprocess.Popen(command, stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.STDOUT)
    return p.communicate(input=script)
55
57 """
58 Get number of queued (pending or running) jobs
59 """
60 stdoutLines = self.run(self.jobListCommand + " | grep " + self.getUserName() + " | wc -l")
61 assert len(stdoutLines) == 1, stdoutLines
62 assert stdoutLines[0].strip().isdigit(), stdoutLines
63 return int(stdoutLines[0].strip())
64
def getStreams(self, stdout, stderr, jobDir, jobName):
    """
    Resolve the stdout/stderr paths for a job.

    A stream given by the caller is returned untouched. A stream that is
    None defaults to the job path with a ".stdout" / ".stderr" suffix,
    converted with self.getRemotePath.

    Returns a (stdout, stderr) tuple.
    """
    # stderr is resolved before stdout, matching the helper call order.
    if stderr is None:
        stderr = self.getRemotePath(self._getJobPath(jobDir, jobName) + ".stderr")
    if stdout is None:
        stdout = self.getRemotePath(self._getJobPath(jobDir, jobName) + ".stdout")
    return stdout, stderr
73
def makeJobScript(self, script, jobDir=None, jobName=None, stdout=None, stderr=None, wallTime=None, modules=None, cores=None, memory=None):
    """
    Fill in self.jobTemplate and return the complete job script.

    script  -- shell commands to run as the job
    jobDir  -- if given, the script creates this directory and changes into it
    jobName -- substituted for the %job placeholder
    stdout, stderr -- stream paths, resolved with getStreams
    wallTime, modules, cores, memory -- per-call overrides; None falls back
               to the corresponding attribute of this connection

    Placeholders recognised in the template: %job, %stdoutDir, %stderrDir,
    %stdout, %stderr, %memory, %wallTime, %cores and %commands.

    Raises AssertionError if the template contains a "%" that does not
    belong to one of these placeholders.
    """
    stdout, stderr = self.getStreams(stdout, stderr, jobDir, jobName)
    if memory is None:
        memory = self.memory
    if wallTime is None:
        wallTime = self.wallTime
    if cores is None:
        cores = self.cores
    if modules is None:
        modules = self.modules

    template = self.jobTemplate

    # Check for unknown placeholders on the raw template only. Doing this
    # after substitution rejected every job whose script or substituted
    # values legitimately contained "%" (for example "date +%s").
    unresolved = template
    for placeholder in ("%stdoutDir", "%stderrDir", "%stdout", "%stderr",
                        "%memory", "%wallTime", "%cores", "%commands", "%job"):
        unresolved = unresolved.replace(placeholder, "")
    assert "%" not in unresolved, template

    template = template.replace("%job", jobName)
    # The *Dir placeholders must be replaced before %stdout / %stderr,
    # which are prefixes of them.
    template = template.replace("%stdoutDir", os.path.dirname(stdout))
    template = template.replace("%stderrDir", os.path.dirname(stderr))
    template = template.replace("%stdout", stdout)
    template = template.replace("%stderr", stderr)
    template = template.replace("%memory", str(memory))
    template = template.replace("%wallTime", str(wallTime))
    template = template.replace("%cores", str(cores))

    parts = []
    if modules is not None:
        for module in modules:
            parts.append("module load " + module + "\n")
    if self.remoteSettingsPath is not None:
        parts.append("export TEES_SETTINGS=" + self.remoteSettingsPath + "\n")
    if jobDir is not None:
        remoteJobDir = self.getRemotePath(jobDir)
        parts.append("mkdir -p " + remoteJobDir + "\n")
        parts.append("cd " + remoteJobDir + "\n\n")
    parts.append(script + "\n")
    # Append the exit status of the script to the job file; getJobStatus
    # reads the "retcode" entry back from it.
    parts.append("echo retcode=$? >> " + self.getRemotePath(self._getJobPath(jobDir, jobName)))

    # Substituted last, so the script text is never scanned for placeholders.
    return template.replace("%commands", "".join(parts))
108