Package TEES :: Package Utils :: Package Connection :: Module ClusterConnection
[hide private]

Source Code for Module TEES.Utils.Connection.ClusterConnection

  1  import sys, os 
  2  import types 
  3  #import uuid 
  4  from UnixConnection import UnixConnection 
  5  import subprocess 
  6   
class ClusterConnection(UnixConnection):
    """
    Base connection for running TEES jobs through a cluster batch queue.

    Instead of executing commands directly, a job script is generated from
    ``self.jobTemplate`` and written to the standard input of
    ``self.submitCommand``. That command runs locally, or through ``ssh`` when an
    account is set. This base class initializes ``submitCommand``,
    ``jobListCommand`` and ``jobTemplate`` to None; cluster-specific subclasses
    are expected to define them.
    """

    def __init__(self, account=None, workdir=None, settings=None, memory=None, cores=None, modules=None, wallTime=None):
        """
        Create a cluster connection.

        account, workdir, settings, memory and cores are passed on to
        UnixConnection. If account is not None, jobs are submitted via
        ``ssh account``. wallTime is substituted for %wallTime in the job
        template. modules is the default list of environment modules to
        ``module load`` in every job script (None for no modules).
        """
        UnixConnection.__init__(self, killGroup=False, account=account, workdir=workdir, settings=settings, memory=memory, cores=cores)
        self.wallTime = wallTime
        self.modules = modules
        # Cluster-specific settings, to be defined by subclasses:
        # command that reads a job script from stdin and queues it (see submit)
        self.submitCommand = None
        # shell command that lists queued jobs (see getNumJobs)
        self.jobListCommand = None
        # job script with %placeholders (see makeJobScript)
        self.jobTemplate = None

    def getJobStatus(self, job):
        """
        Return the status of a job, as recorded in its job file.

        job is passed to _readJobFile. submit passes the path returned by
        _getJobPath(jobDir, jobName).

        Returns:
            None: _readJobFile returned None (the job does not exist).
            "QUEUED": no retcode has been recorded yet.
            "FINISHED": the recorded retcode is "0".
            "FAILED": any other recorded retcode.
        """
        jobAttr = self._readJobFile(job)
        if jobAttr is None:  # the job does not exist
            return None
        if "retcode" not in jobAttr:
            # The job script appends "retcode=..." only as its last step (see
            # makeJobScript). The job could also be RUNNING, but without the
            # cluster-specific job list the two states cannot be told apart.
            return "QUEUED"
        if jobAttr["retcode"] == "0":
            return "FINISHED"
        return "FAILED"

    def submit(self, script=None, jobDir=None, jobName=None, stdout=None, stderr=None):
        """
        Generate a job script and submit it to the cluster queue.

        The script is prepared with _getScript, wrapped with makeJobScript and
        piped to submitCommand. submitCommand runs locally, or through ssh if
        self.account is set.

        Returns the (stdoutdata, stderrdata) tuple of Popen.communicate. The
        submit command's stderr is merged into its stdout, so stderrdata is None.

        Raises Exception if self.resubmitOnlyFinished is set and the job's
        current status is "RUNNING" or "QUEUED".
        """
        prevStatus = self.getJobStatus(self._getJobPath(jobDir, jobName))
        # getJobStatus above never reports "RUNNING", but subclasses may
        # override it with a queue-aware version.
        if self.resubmitOnlyFinished and prevStatus in ("RUNNING", "QUEUED"):
            raise Exception("Tried to resubmit a job whose current status is " + str(prevStatus))

        script = self._getScript(script, "\n")
        script = self.makeJobScript(script, jobDir, jobName, stdout, stderr)
        if self.account is None:
            command = [self.submitCommand]
        else:
            # NOTE(review): ssh joins its arguments into one remote shell command.
            # The single quotes make the remote shell treat submitCommand as a
            # single word, so a submitCommand containing arguments would fail here.
            # Confirm this is intended.
            command = ["ssh", self.account, "'" + self.submitCommand + "'"]
        if self.debug:
            sys.stderr.write("------- Job script -------\n")
            sys.stderr.write(script + "\n")
            sys.stderr.write("--------------------------\n")
        # The job status file must exist before the job is submitted, so that
        # the job script can append its return code to it.
        self._writeJobFile(jobDir, jobName)
        if self.debug:
            sys.stderr.write("Submitting job %s with command %s\n" % (jobName, command))
        else:
            sys.stderr.write("Submitting job %s\n" % (jobName,))
        p = subprocess.Popen(command, stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.STDOUT)
        return p.communicate(input=script)

    def getNumJobs(self):
        """
        Get number of queued (pending or running) jobs.

        Runs jobListCommand through self.run and counts the output lines that
        contain the current user name.

        Raises Exception if the count command does not produce exactly one
        numeric line.
        """
        stdoutLines = self.run(self.jobListCommand + " | grep " + self.getUserName() + " | wc -l")
        # Validate explicitly: this is external command output, and an assert
        # would be stripped under "python -O".
        if len(stdoutLines) != 1 or not stdoutLines[0].strip().isdigit():
            raise Exception("Unexpected output when counting jobs: " + repr(stdoutLines))
        # NOTE(review): grep matches the user name as a regex substring anywhere
        # on a line. Users whose names contain this one may also be counted.
        return int(stdoutLines[0].strip())

    def getStreams(self, stdout, stderr, jobDir, jobName):
        """
        Resolve the stdout/stderr file paths for a job.

        A path that is None defaults to the job path (_getJobPath) plus
        ".stdout" / ".stderr". Both paths are then converted with getRemotePath.

        Returns the (stdout, stderr) tuple.
        """
        if stderr is None:
            stderr = self._getJobPath(jobDir, jobName) + ".stderr"
        stderr = self.getRemotePath(stderr)
        if stdout is None:
            stdout = self._getJobPath(jobDir, jobName) + ".stdout"
        stdout = self.getRemotePath(stdout)
        return stdout, stderr

    def makeJobScript(self, script, jobDir=None, jobName=None, stdout=None, stderr=None, wallTime=None, modules=None, cores=None, memory=None):
        """
        Build the cluster job script by filling in self.jobTemplate.

        Supported placeholders:
            %job, %stdoutDir, %stderrDir, %stdout, %stderr, %memory,
            %wallTime, %cores, %commands

        wallTime, modules, cores and memory default to the connection's own
        values when None.

        %commands expands to the following, in order:
            1. ``module load`` lines for the modules;
            2. an ``export TEES_SETTINGS=...`` line, if remoteSettingsPath is set;
            3. ``mkdir -p`` / ``cd`` into the remote jobDir, if one is given;
            4. the script itself;
            5. an ``echo retcode=$? >> <job file>`` line, which lets
               getJobStatus detect completion.

        Raises NotImplementedError if no jobTemplate has been defined.
        Raises ValueError if the template contains a "%" that is not one of the
        supported placeholders.
        """
        stdout, stderr = self.getStreams(stdout, stderr, jobDir, jobName)
        if memory is None:
            memory = self.memory
        if wallTime is None:
            wallTime = self.wallTime
        if cores is None:
            cores = self.cores
        if self.jobTemplate is None:
            raise NotImplementedError("No job template defined for " + self.__class__.__name__)

        # Order matters: "%stdoutDir" must be replaced before its prefix "%stdout".
        substitutions = [
            ("%job", jobName),
            ("%stdoutDir", os.path.dirname(stdout)),
            ("%stderrDir", os.path.dirname(stderr)),
            ("%stdout", stdout),
            ("%stderr", stderr),
            ("%memory", str(memory)),
            ("%wallTime", str(wallTime)),
            ("%cores", str(cores)),
        ]

        # Detect unknown placeholders on the bare template, so that "%" characters
        # in substituted values or in the job's own commands (e.g. "date +%Y")
        # are not mistaken for unfilled placeholders.
        skeleton = self.jobTemplate
        for placeholder, _ in substitutions:
            skeleton = skeleton.replace(placeholder, "")
        skeleton = skeleton.replace("%commands", "")
        if "%" in skeleton:
            raise ValueError("Unknown placeholder in job template: " + self.jobTemplate)

        template = self.jobTemplate
        for placeholder, value in substitutions:
            template = template.replace(placeholder, value)

        commands = []
        if modules is None:
            modules = self.modules
        if modules is not None:
            for module in modules:
                commands.append("module load " + module + "\n")
        if self.remoteSettingsPath is not None:  # Use a specific configuration file
            commands.append("export TEES_SETTINGS=" + self.remoteSettingsPath + "\n")
        if jobDir is not None:
            remoteJobDir = self.getRemotePath(jobDir)
            commands.append("mkdir -p " + remoteJobDir + "\n")  # ensure output directory exists
            commands.append("cd " + remoteJobDir + "\n\n")  # the program runs in the output directory
        commands.append(script + "\n")
        # Store the script's return value in the job file (read by getJobStatus)
        commands.append("echo retcode=$? >> " + self.getRemotePath(self._getJobPath(jobDir, jobName)))

        # Substituted last, so placeholder-like text inside the commands is left intact.
        return template.replace("%commands", "".join(commands))
108