Package TEES :: Package Utils :: Package Connection :: Module SLURMConnection
[hide private]

Source Code for Module TEES.Utils.Connection.SLURMConnection

  1  import sys, os 
  2  import types 
  3  #import uuid 
  4  import subprocess 
  5  from ClusterConnection import ClusterConnection 
  6   
# Bash job script submitted with sbatch. The %-prefixed words (%job, %stderr,
# %stdout, %memory, %wallTime, %cores, %stderrDir, %stdoutDir, %commands) are
# placeholders. The code that substitutes them is not in this module; it is
# presumably ClusterConnection, which reads self.jobTemplate -- confirm there.
# NOTE(review): %stderr and %stdout are prefixes of %stderrDir and %stdoutDir.
# If substitution is plain string replacement, the *Dir placeholders must be
# replaced first -- verify the order used by the filler.
# NOTE(review): SLURM opens the -e/-o files when the job starts, before the
# "mkdir -p" lines below run. Those directories likely need to exist at
# submission time -- confirm.
SLURMJobTemplate = """#!/bin/bash -l 
##execution shell environment 

## name of your job
#SBATCH -J %job
## system error message output file
#SBATCH -e %stderr
## system message output file
#SBATCH -o %stdout
## a per-process (soft) memory limit
## limit is specified in MB
## example: 1 GB is 1000
#SBATCH --mem-per-cpu=%memory
## how long a job takes, wallclock time hh:mm:ss
#SBATCH -t %wallTime
## number of processes
#SBATCH -n %cores

mkdir -p %stderrDir
mkdir -p %stdoutDir

%commands"""
 29   
class SLURMConnection(ClusterConnection):
    """
    For using the Simple Linux Utility for Resource Management
    (SLURM, https://slurm.schedmd.com/).

    Jobs are submitted with ``sbatch`` and polled with ``sacct``. The job
    script text comes from ``SLURMJobTemplate``, stored as
    ``self.jobTemplate``. It is presumably filled in by ClusterConnection --
    confirm there.
    """

    # sacct job state codes, grouped by the generic status reported to callers.
    # COMPLETED is handled separately in getJobStatus because the final status
    # also depends on the return code recorded in the job file.
    _SACCT_RUNNING_STATES = ("RUNNING", "COMPLETING")
    _SACCT_FAILED_STATES = ("FAILED", "CANCELLED", "NODE_FAIL", "PREEMPTED", "TIMEOUT",
                            "BOOT_FAIL", "DEADLINE", "OUT_OF_MEMORY")
    _SACCT_QUEUED_STATES = ("PENDING", "RESIZING", "SUSPENDED", "REQUEUED")

    def __init__(self, account=None, workdir=None, settings=None, wallTime=None, memory=None, cores=None, modules=None):
        """
        Create a SLURM connection.

        wallTime defaults to "48:00:00" (hh:mm:ss). memory defaults to 4000,
        which is used as --mem-per-cpu in the job template (MB). All other
        arguments are passed unchanged to ClusterConnection.__init__.
        """
        if wallTime is None:
            wallTime = "48:00:00"
        if memory is None:
            memory = 4000
        ClusterConnection.__init__(self, account=account, workdir=workdir, settings=settings,
                                   memory=memory, cores=cores, modules=modules, wallTime=wallTime)
        self.submitCommand = "sbatch"
        self.jobListCommand = "squeue"
        self.jobTemplate = SLURMJobTemplate

    def submit(self, script=None, jobDir=None, jobName=None, stdout=None, stderr=None):
        """
        Submit a job through ClusterConnection.submit and record its SLURM id.

        The sbatch output (and error output, if any) is echoed to stderr. The
        numeric id from sbatch's "Submitted batch job <id>" reply is appended
        to the job file under the key "SLURMID". Returns the result of
        _writeJobFile.

        Raises AssertionError (carrying sbatch's stdout) if the reply does
        not start with "Submitted batch job".
        """
        pstdout, pstderr = ClusterConnection.submit(self, script, jobDir, jobName, stdout, stderr)
        if pstderr is not None:
            sys.stderr.write(str(pstderr) + "\n")
        sys.stderr.write(str(pstdout) + "\n")
        prefix = "Submitted batch job"
        reply = (pstdout or "").strip()
        if not reply.startswith(prefix):
            # An explicit raise (not assert), so the check survives python -O.
            raise AssertionError(pstdout)
        # Take the token right after the prefix rather than the last one.
        # With --clusters, sbatch appends " on cluster <name>".
        jobId = int(reply[len(prefix):].split()[0])
        return self._writeJobFile(jobDir, jobName, {"SLURMID": jobId}, append=True)

    @classmethod
    def _classifySacctState(cls, state):
        """
        Map a sacct State value to "RUNNING", "COMPLETED", "FAILED" or
        "QUEUED". Return None if the state is not recognised.

        sacct truncates values wider than their column and marks this with a
        trailing "+". For example, "CANCELLED by 123" is shown as "CANCELLED+"
        and "OUT_OF_MEMORY" as "OUT_OF_ME+". A "+"-terminated value is
        therefore matched as a prefix of the known state names.
        """
        truncated = state.endswith("+")
        state = state.rstrip("+")
        groups = (("RUNNING", cls._SACCT_RUNNING_STATES),
                  ("COMPLETED", ("COMPLETED",)),
                  ("FAILED", cls._SACCT_FAILED_STATES),
                  ("QUEUED", cls._SACCT_QUEUED_STATES))
        for status, names in groups:
            for name in names:
                if name == state or (truncated and state and name.startswith(state)):
                    return status
        return None

    def getJobStatus(self, job):
        """
        Return the status of a submitted job.

        Returns None if the job file cannot be read, and "FAILED" if the file
        has no SLURMID (submission failed). Otherwise the job's sacct line
        gives "RUNNING", "FINISHED", "FAILED" or "QUEUED". A COMPLETED job
        counts as FINISHED only when the job file records retcode "0", and is
        reported as RUNNING while no retcode has been written yet. A job that
        sacct does not list (yet) is reported as "QUEUED".

        Raises AssertionError for a sacct state this class does not recognise.
        """
        jobAttr = self._readJobFile(job)
        if jobAttr is None:  # job does not exist
            return None
        if "SLURMID" not in jobAttr:
            return "FAILED"  # submitting the job failed
        slurmId = str(jobAttr["SLURMID"])
        # Request the columns explicitly so parsing does not depend on sacct's
        # default layout or on a user-set SACCT_FORMAT.
        command = "sacct -u " + self.getUserName() + " -j " + slurmId + " --format=JobID,State"
        for line in self.run(command):
            line = line.strip()
            splits = line.split()
            # Skip blank/short lines, the header, separator lines and job
            # steps (e.g. "<id>.batch").
            if len(splits) < 2 or splits[0] != slurmId:
                continue
            if self.debug:
                sys.stderr.write("sacct: " + line + "\n")
            state = splits[1]
            status = self._classifySacctState(state)
            if status == "COMPLETED":
                if "retcode" not in jobAttr:  # job file may not have been updated yet
                    return "RUNNING"
                return "FINISHED" if jobAttr["retcode"] == "0" else "FAILED"
            if status is None:
                raise AssertionError(state)
            return status
        return "QUEUED"