simulateAWS Namespace Reference

Functions

def runSimulations (smcTable, simDir, buildDir, paramStringTemplate, exeNames, material, samples, onNode=False, nodes=[], cores=0, infrastructure=[], verbose=False)
 
def runProcess (simDir, id)
 
def startAndWaitForSimulationsToFinish (simDir, samples)
 
def numberOfCommands (smcTable, exeNames)
 
def mergeOutputFiles (smcTable, simDir, exeNames, material, verbose=False)
 
def getFreeCores (nodes)
 

Function Documentation

◆ getFreeCores()

def simulateAWS.getFreeCores (   nodes)
def getFreeCores(nodes):
    cwd = os.getcwd()
    freeCores = []
    for i in nodes:
        node = 'node%r' % i
        # ask the remote node how many cores are free via the numOfCores.py helper
        num = subprocess.check_output('ssh -Y %s python %s/numOfCores.py exit' % (node, cwd), shell=True)
        # use local node
        #num = subprocess.check_output('python %s/numOfCores.py' % (cwd), shell=True)
        print('%i free cores on %s' % (int(num), node))
        # add the node once per free core
        for _ in range(int(num)):
            freeCores.append(node)
    return freeCores
Definition at line 240 of file simulateAWS.py.


Referenced by runSimulations().
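
getFreeCores() assumes a helper script numOfCores.py in the current working directory that, when run on a node, prints the number of cores currently free there. That helper is not part of this namespace; the sketch below only illustrates, under that assumption, what such a script could look like (total core count minus the 1-minute load average):

# Hypothetical sketch of the numOfCores.py helper that getFreeCores() runs on each node via ssh;
# it prints an estimate of the number of idle cores on the local machine.
import os

if __name__ == "__main__":
    total = os.cpu_count() or 1
    load1, _, _ = os.getloadavg()  # 1-, 5- and 15-minute load averages (Unix only)
    print(max(0, total - int(round(load1))))  # parsed by getFreeCores() via int(num)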

◆ mergeOutputFiles()

def simulateAWS.mergeOutputFiles (   smcTable,
  simDir,
  exeNames,
  material,
  verbose = False 
)
def mergeOutputFiles(smcTable, simDir, exeNames, material, verbose=False):
    # open smcTable
    file = open(smcTable)
    # ignore header line
    params = file.readline()
    if verbose:
        print("Writing combined data files in folder %s" % simDir)
    # reading smcTable line by line
    for line in file.readlines():
        # extracting parameters
        params = line.split()[2:]
        # convert list to tuple
        params = tuple(i for i in params)
        # read in output files
        paramStringOut = "_" + material + "_" + "_".join(params) + ".txt"
        out = ""
        # read files
        for executable in exeNames:
            # skip executables that are labeled -nodata
            if "-nodata" in executable:
                continue
            file = simDir + executable.split()[0] + paramStringOut
            #print("Reading in file: %s" % file)
            # merge the output files into a single file
            try:
                content = open(file).readline()
            except:
                open(file, 'w').write("0 0 0")
                content = "0 0 0"
                # raise Exception(
                #     "Directory %s doesn't contain the data files *%s.\nRemove the directory %s and run the simulations again" % (
                #     simDir, paramStringOut, simDir))
                # sys.exit(-1)
            out = out + content + " "
        # file that will be written
        outFile = simDir + "data" + paramStringOut
        if verbose:
            print(" data%s: %s" % (paramStringOut, out))
        open(outFile, "w").write(out)
Definition at line 199 of file simulateAWS.py.

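A hedged usage sketch: the call below (all file names, executables and the material label are illustrative) would look for files such as sim/AngleOfRepose_glassBeads_<p1>_<p2>.txt for every parameter row in smcTable.txt, concatenate their first lines, and write them to sim/data_glassBeads_<p1>_<p2>.txt. Executables tagged "-nodata" are skipped.

# Illustrative call; paths, executable names and material are assumptions, not values from the module.
mergeOutputFiles(smcTable="smcTable.txt",
                 simDir="sim/",
                 exeNames=["AngleOfRepose", "Drum -nodata"],
                 material="glassBeads",
                 verbose=True)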

◆ numberOfCommands()

def simulateAWS.numberOfCommands (   smcTable,
  exeNames 
)
def numberOfCommands(smcTable, exeNames):
    # count lines, ignore header line
    fileCount = open(smcTable)
    numObs = -1
    for line in fileCount.readlines(): numObs += 1
    fileCount.close()
    return len(exeNames) * numObs
Definition at line 191 of file simulateAWS.py.

Referenced by runSimulations().
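
For example (numbers purely illustrative), an smcTable with one header line plus 100 parameter rows and three entries in exeNames yields 3 * 100 = 300 commands:

# Illustrative check; the file name is a placeholder.
n = numberOfCommands("smcTable.txt", ["exe1", "exe2", "exe3"])
print(n)  # 300 for a table with a header line and 100 parameter rows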

◆ runProcess()

def simulateAWS.runProcess (   simDir,
  id 
)
def runProcess(simDir, id):
    # Output and error logs go to id.out and id.err.
    # When a process is killed/terminated, the output file will exist, but is empty.
    cmd = f"{simDir}{id}.sh"
    return subprocess.Popen(cmd)
Definition at line 140 of file simulateAWS.py.

Referenced by startAndWaitForSimulationsToFinish().
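
The comment in the listing mentions id.out and id.err log files, but the call above does not redirect output itself, so the redirection presumably happens in the generated shell scripts. As a sketch only, under the assumption that one wanted runProcess itself to capture the logs, a hypothetical variant could look like this:

# Hypothetical variant, not part of simulateAWS: capture stdout/stderr in <id>.out and <id>.err.
import subprocess

def runProcessWithLogs(simDir, id):
    out = open(f"{simDir}{id}.out", "w")
    err = open(f"{simDir}{id}.err", "w")
    return subprocess.Popen(f"{simDir}{id}.sh", stdout=out, stderr=err)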

◆ runSimulations()

def simulateAWS.runSimulations (   smcTable,
  simDir,
  buildDir,
  paramStringTemplate,
  exeNames,
  material,
  samples,
  onNode = False,
  nodes = [],
  cores = 0,
  infrastructure = [],
  verbose = False 
)
def runSimulations(smcTable, simDir, buildDir, paramStringTemplate, exeNames, material, samples,
                   onNode=False, nodes=[], cores=0, infrastructure=[], verbose=False):
    # # run make to produce the executables
    # print("Running make in the build directory %s" % buildDir)
    # try:
    #     output = subprocess.check_output('make -j 8 -C ' + buildDir, shell=True)
    # except:
    #     print(output)
    #     raise RuntimeError('Calling make in the buildDirectory failed')

    # make simulation directory absolute
    simDir = os.getcwd() + "/" + simDir
    # build directory is already absolute

    # make the simulation directory if necessary
    if not os.path.exists(simDir):
        if verbose:
            print('Creating directory for simulation output: ' + simDir)
        os.mkdir(simDir)

    # check if params file exists
    if not os.path.exists(smcTable):
        raise RuntimeError('smcTable does not exist')

    # if cores>0, the commands are split over #cores shell scripts
    if cores:
        # todo: fails if there are not enough commands per core
        numCmdTotal = numberOfCommands(smcTable, exeNames)
        numCmd = int(ceil(numCmdTotal / cores))
        cores = int(ceil(numCmdTotal / numCmd))
        print("Number of commands %d, per script %d, number of scripts: %d" % (numCmdTotal, numCmd, cores))
        # override buildDir (only necessary on mercuryCloud, since we cannot run the python script there)
        onMercuryCloud = False
        if onMercuryCloud:
            print("Adjusting script for mercuryCloud")
            buildDir = open('build').read().split()[0]
            print("Copy files to cluster (rsync -avz %s $cloud:) and execute ./run.sh in the sim directory before continuing" % material)
        elif infrastructure == "AWS":
            print("Adjusting script for AWS")
            buildDir = "../../."
        else:
            print("Adjusting script for msm3")
            buildDir = open('build').read().split()[0]
            #buildDir = "~/Code/Lab/build/Drivers/USER/MercuryLab/Buttercup/Calibration"
            print("1) cd %r\n2) Adjust node numbers in run.sh.\n3) Execute 'source ./run.sh'" % simDir)
        # set counter into which file the next command is written
        coresCounter = -1
        numCmdCounter = 0
        # write run.sh and create an empty file $coresCounter.sh
        open(simDir + "/run.sh", "w").write("#!/bin/bash\nchmod +x *.sh\n")
        for i in range(cores):
            if onMercuryCloud:
                open(simDir + "/run.sh", "a").write("nohup ./" + str(i) + ".sh &\n")
            elif infrastructure == "AWS":
                os.remove(simDir + "/run.sh")
                break
                # open(simDir+"/run.sh", "a").write("nohup ./"+str(i)+".sh 2>&1 &\n")
            else:
                n = nodes[i % len(nodes)]
                if n.isdigit():
                    n = "node%s" % n
                open(simDir + "/run.sh", "a").write("sleep 3\n ssh %s \"cd \"`pwd`\"; nohup ./%d.sh > %s.out\"&\n" % (n, i, i))
        # fix for windows systems
        # cmd = "cd " + simDir + " && sed -i -e 's/\r$//' run.sh"
        # subprocess.Popen(cmd.split(), shell=True)

    # open smcTable
    #print("Opening smcTable: %s" % smcTable)
    file = open(smcTable)
    # ignore header line
    params = file.readline()

    # reading smcTable line by line
    for line in file.readlines():
        # extracting parameters
        params = line.split()[2:]
        # convert list to tuple
        params = tuple(i for i in params)
        # set simulation parameters
        paramString = paramStringTemplate % params
        paramString += "-param _" + material + "_" + "_".join(params)
        #print("Running simulations for: %s" % paramString)

        if cores:
            for executable in exeNames:
                if numCmdCounter == 0:
                    coresCounter += 1
                    open(simDir + str(coresCounter) + ".sh", "w").write("")
                cmd = '%s/%s %s\n' % (buildDir, executable, paramString)
                #print("Writing to %r, %r" % (simDir+str(coresCounter)+".sh", coresCounter))
                open(simDir + str(coresCounter) + ".sh", "a").write("#!/bin/sh\n" + cmd)
                numCmdCounter = (numCmdCounter + 1) % numCmd
        elif onNode:
            # initialise the freeCores list
            freeCores = getFreeCores(nodes)
            for executable in exeNames:
                cmd = 'cd %s; %s/%s %s' % (simDir, buildDir, executable, paramString)
                # look for new free cores when needed
                while len(freeCores) == 0:
                    print("No free cores, waiting for nodes to free up")
                    # time to wait in seconds between checking for free nodes
                    time.sleep(300)
                    freeCores = getFreeCores(nodes)
                node = freeCores.pop()
                outname = paramString.replace(" ", "")
                sshcmd = "sleep 1\n ssh %s \"%s\"" % (node, cmd + ' &> ' + executable + '_' + outname + '.out &')
                print(sshcmd)
                subprocess.check_output(sshcmd, shell=True)
        else:
            for executable in exeNames:
                cmd = 'cd %s && %s/%s %s' % (simDir, buildDir, executable, paramString)
                if verbose:
                    print("Running in serial: ./%s %s" % (executable, paramString))
                subprocess.check_output(cmd, shell=True)

    if infrastructure == "AWS":
        cwd = os.getcwd()
        # startAndWaitForSimulationsToFinish changes into simDir; restore the working directory afterwards
        startAndWaitForSimulationsToFinish(simDir, samples)
        os.chdir(cwd)
    print("Runs finished.")

References getFreeCores(), numberOfCommands(), and startAndWaitForSimulationsToFinish().
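
A hedged usage sketch for the AWS path (every path, executable name, parameter template and count below is an assumption for illustration): with cores > 0 the function writes numbered shell scripts into simDir, and with infrastructure == "AWS" it then starts and supervises them via startAndWaitForSimulationsToFinish().

# Illustrative call; all arguments are placeholders, not values required by the module.
runSimulations(smcTable="smcTable.txt",
               simDir="sim/",
               buildDir="/home/user/build",
               paramStringTemplate="-angle %s -friction %s ",
               exeNames=["AngleOfRepose"],
               material="glassBeads",
               samples=16,
               cores=16,
               infrastructure="AWS",
               verbose=True)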

◆ startAndWaitForSimulationsToFinish()

def simulateAWS.startAndWaitForSimulationsToFinish (   simDir,
  samples 
)
def startAndWaitForSimulationsToFinish(simDir, samples):

    # Start all processes and add to array.
    processes = []
    cmd = "cd %s && chmod +x *.sh" % simDir
    subprocess.check_output(cmd, shell=True)
    # Change working directory to simDir to save output files in the right place
    os.chdir(simDir)
    for i in range(samples):
        processes.append(runProcess(simDir, i))
        print("Started process with PID %s" % processes[i].pid)
    # Finished processes are removed from the array.
    # Keep looping while more than killRatio * samples processes remain,
    # i.e. until (1 - killRatio) * samples simulations have finished.
    killRatio = 0.2
    while len(processes) > killRatio * samples:
        # iterate over a copy, since finished processes are removed from the list
        for p in list(processes):
            # poll() checks if the process is terminated and returns None when this is NOT the case.
            # Remove process from array when finished.
            if p.poll() is not None:
                processes.remove(p)
        # Sleep for a while to only check at certain intervals.
        time.sleep(0.5)

    print(f"At least {(1 - killRatio) * 100}% of processes finished.")
    print(f"Killing remaining {len(processes)} processes.")

    # 80% of processes are finished, now kill the remaining processes.
    for p in processes:
        #p.terminate()
        # Since we don't care anymore about the remaining processes, use kill instead of terminate.
        p.kill()

    # simulationOutput = []
    # while True:
    #     simulationOutput.clear()
    #     for file in os.listdir(simDir):
    #         if file.endswith(".txt"):
    #             simulationOutput.append(os.path.join(simDir, file))
    #     if len(simulationOutput) >= continueThreshold * samples:
    #         print("80% of simulations finished, quitting simulations")
    #         # Kill remaining PIDS HERE to be most efficient
    #         return

References runProcess().

Referenced by runSimulations().
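
A hedged usage sketch (the directory name and sample count are illustrative): the numbered scripts 0.sh, 1.sh, ... are expected to exist in simDir already, e.g. written by runSimulations() with cores > 0. Because the function chdirs into simDir, the caller is responsible for restoring the working directory, as runSimulations() does:

# Illustrative call; "sim/" and the sample count are placeholders.
cwd = os.getcwd()
startAndWaitForSimulationsToFinish("sim/", samples=16)
os.chdir(cwd)  # the function changes into simDir; restore the previous working directory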