SMT-RAT 24.02
Toolbox for Strategic and Parallel Satisfiability-Modulo-Theories Solving
SlurmBackend.h
1 #pragma once
2 
3 #include "Backend.h"
4 
5 #include <benchmax/logging.h>
7 
8 #include "slurm/SlurmSettings.h"
9 #include "slurm/SlurmUtilities.h"
10 
11 #include <filesystem>
12 #include <future>
13 #include <mutex>
14 #include <regex>
15 #include <thread>
16 
17 #include "../utils/parsing.h"
18 namespace benchmax {
19 
20 /**
21  * Backend for the Slurm workload manager.
22  *
23  * The execution model is as follows:
24  * We create multiple jobs, each consisting of multiple array jobs that each execute one slice of the task list.
25  * One array job executes Settings::slice_size entries of the task list.
26  * One job consists of Settings::array_size array jobs.
27  * We start as many jobs as necessary.
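 * For example (illustrative numbers only): with Settings::array_size = 1000 and
 * Settings::slice_size = 4, a single job covers up to 4000 tasks, so a task list
 * of 10000 entries is split into ceil(10000 / 4000) = 3 jobs.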
28  */
29 class SlurmBackend: public Backend {
30 private:
31  /// A job consists of a tool, an input file and a result.
32  using JobData = std::tuple<
33  const Tool*,
34  std::filesystem::path,
35  BenchmarkResult
36  >;
37 
38  /// Mutex for submission delay.
39  std::mutex mSubmissionMutex;
40  /// Mutex for slurmjobs file
41  std::mutex mSlurmjobMutex;
42 
43  /// Parse the content of an output file.
44  void parse_result_file(const Jobs& jobs, const std::filesystem::path& file, std::map<size_t, JobData>& results) {
45  BENCHMAX_LOG_DEBUG("benchmax.slurm", "Processing file " << file);
46  std::ifstream in(file);
47  std::string content((std::istreambuf_iterator<char>(in)), std::istreambuf_iterator<char>());
48  auto extension = file.extension();
49 
50  if (extension == ".out") {
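 // Expected layout of a .out file, one block per task (as matched below):
 //   Executing <commandline>
 //   # START <id> #<stdout># END <id> #<info># END DATA <id> #
 // <id> is 1-based; the trailing <info> part is optional and carries the
 // "exitcode" and "time" entries read via slurm::parse_result_info.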
51  std::regex filere("Executing (.+)\\n# START ([0-9]+) #([^#]*)# END \\2 #(?:([^#]*)# END DATA \\2 #)?");
52  auto reBegin = std::sregex_iterator(content.begin(), content.end(), filere);
53  auto reEnd = std::sregex_iterator();
54  for (auto i = reBegin; i != reEnd; ++i) {
55  std::size_t id = std::stoull((*i)[2]) - 1;
56  if (results.find(id) == results.end()) results.emplace(id, JobData {jobs.tools().begin()->get(), "", BenchmarkResult() });
57 
58  bool toolFound = false;
59  std::string cmdline = (*i)[1];
60  for (const auto& tool : jobs.tools()) {
61  auto t = tool->parseCommandline(cmdline);
62  if (t) {
63  std::get<0>(results[id]) = tool.get();
64  std::get<1>(results[id]) = std::filesystem::path(*t);
65  toolFound = true;
66  break;
67  }
68  }
69  if (!toolFound) {
70  BENCHMAX_LOG_WARN("benchmax.slurm", "Could not find tool for " << cmdline);
71  }
72 
73  auto& res = std::get<2>(results[id]);
74  res.stdout = (*i)[3];
75  res.exitCode = std::stoi(slurm::parse_result_info((*i)[4], "exitcode"));
76  res.time = std::chrono::milliseconds(std::stoi(slurm::parse_result_info((*i)[4], "time")));
77  BENCHMAX_LOG_DEBUG("benchmax.slurm", "Got " << res << " for task " << id << " from stdout");
78  }
79  } else if (extension == ".err") {
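 // .err files use the same framing, but without the "Executing" line:
 //   # START <id> #<stderr># END <id> #<info># END DATA <id> #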
80  std::regex filere("# START ([0-9]+) #([^#]*)# END \\1 #(?:([^#]*)# END DATA \\1 #)?");
81  auto reBegin = std::sregex_iterator(content.begin(), content.end(), filere);
82  auto reEnd = std::sregex_iterator();
83  for (auto i = reBegin; i != reEnd; ++i) {
84  std::size_t id = std::stoull((*i)[1]) - 1;
85  if (results.find(id) == results.end()) results.emplace(id, JobData {jobs.tools().begin()->get(), "", BenchmarkResult() });
86 
87  auto& res = std::get<2>(results[id]);
88  res.stderr = (*i)[2];
89  res.peak_memory_kbytes = parse_peak_memory(res.stderr);
90  BENCHMAX_LOG_DEBUG("benchmax.slurm", "Got " << res << " for task " << id << " from stderr");
91  }
92  } else {
93  BENCHMAX_LOG_WARN("benchmax.slurm", "Trying to parse output file with unexpected extension " << extension);
94  }
95  }
96 
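 /// Compute the half-open range [begin, end) of task-list indices covered by the n-th job.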
97  std::pair<std::size_t,std::size_t> get_job_range(std::size_t n, std::size_t numJobs) const {
98  std::size_t job_size = settings_slurm().array_size * settings_slurm().slice_size;
99  return std::make_pair(
100  job_size * n,
101  std::min(job_size * (n + 1), numJobs)
102  );
103  }
104 
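 /// Append a Slurm job id to the slurmjobs file in tmp_dir, so that a later
 /// benchmax invocation can check for termination and collect the results.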
105  void store_job_id(int jobid) {
106  mSlurmjobMutex.lock();
107  std::ofstream out(settings_slurm().tmp_dir + "/slurmjobs", std::ios_base::app);
108  out << jobid << std::endl;
109  out.close();
110  mSlurmjobMutex.unlock();
111  }
112 
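 /// Read all previously stored Slurm job ids; returns an empty vector if the
 /// slurmjobs file does not exist.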
113  std::vector<int> load_job_ids() {
114  std::vector<int> res;
115  std::ifstream in(settings_slurm().tmp_dir + "/slurmjobs");
116  if (!in) {
117  return res;
118  }
119  int jobid;
120  while(in >> jobid) {
121  res.push_back(jobid);
122  }
123  in.close();
124  return res;
125  }
126 
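 /// Delete the slurmjobs file once all jobs are finished and their results have been collected.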
127  void remove_job_ids() {
128  if( std::remove( (settings_slurm().tmp_dir + "/slurmjobs").c_str() ) != 0 ){
129  BENCHMAX_LOG_WARN("benchmax.slurm", settings_slurm().tmp_dir + "/slurmjobs file could not be deleted");
130  }
131  }
132 
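 /// Generate the jobs file and submit file for the n-th job and submit it via
 /// sbatch; either wait for termination or store the job id for a later run.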
133  void run_job_async(std::size_t n, const std::vector<JobData>& results, bool wait_for_termination) {
135 
136  std::string jobsfilename = settings_slurm().tmp_dir + "/jobs-" + std::to_string(settings_core().start_time) + "-" + std::to_string(n+1) + ".jobs";
137  auto job_range = get_job_range(n, results.size());
138  slurm::generate_jobs_file(jobsfilename, job_range, results);
139 
140  auto submitfile = slurm::generate_submit_file_chunked({
141  std::to_string(settings_core().start_time) + "-" + std::to_string(n),
142  jobsfilename,
143  settings_slurm().tmp_dir,
144  settings_benchmarks().limit_time,
145  settings_benchmarks().grace_time,
146  settings_benchmarks().limit_memory,
147  settings_slurm().array_size,
148  settings_slurm().slice_size,
149  job_range
150  });
151 
152  BENCHMAX_LOG_INFO("benchmax.slurm", "Delaying for " << settings_slurm().submission_delay);
153  {
154  std::lock_guard<std::mutex> guard(mSubmissionMutex);
155  std::this_thread::sleep_for(settings_slurm().submission_delay);
156  }
157  BENCHMAX_LOG_INFO("benchmax.slurm", "Submitting job now.");
158 
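 // Assemble the sbatch call; an illustrative result (values are examples only):
 //   sbatch --wait --array=1-1000 <sbatch_options> <submit file>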
159  std::stringstream cmd;
160  cmd << "sbatch";
161  if (wait_for_termination) cmd << " --wait";
162  cmd << " --array=1-" << std::to_string(settings_slurm().array_size);
163  cmd << " " << settings_slurm().sbatch_options;
164  cmd << " " + submitfile;
165  BENCHMAX_LOG_DEBUG("benchmax.slurm", "Command: " << cmd.str());
166  std::string output;
167  call_program(cmd.str(), output, true);
168  int jobid = slurm::parse_job_id(output);
169  if (wait_for_termination) {
170  BENCHMAX_LOG_INFO("benchmax.slurm", "Job terminated.");
171  } else {
172  store_job_id(jobid);
173  BENCHMAX_LOG_INFO("benchmax.slurm", "Job " << jobid << " was scheduled.");
174  }
175  }
176 
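 /// Parse all result files found in tmp_dir and register the results.
 /// If check_finished is set, first verify that all stored Slurm jobs have
 /// finished and return false otherwise.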
177  bool collect_results(const Jobs& jobs, bool check_finished) override {
178  if (check_finished) {
179  BENCHMAX_LOG_INFO("benchmax.slurm", "Check if job finished.");
180  auto jobids = load_job_ids();
181  if (jobids.size() == 0) {
182  BENCHMAX_LOG_ERROR("benchmax.slurm", "Jobids could not be determined!");
183  return false;
184  }
185  for (int jobid : jobids) {
186  if (!slurm::is_job_finished(jobid)) {
187  BENCHMAX_LOG_WARN("benchmax.slurm", "Job " << jobid << " is not finished yet.");
188  return false;
189  }
190  }
191  }
192 
193  BENCHMAX_LOG_INFO("benchmax.slurm", "Collecting results.");
194  std::map<size_t, JobData> results;
195  auto files = slurm::collect_result_files(settings_slurm().tmp_dir);
196  for (const auto& f: files) {
197  parse_result_file(jobs, f, results);
198  }
199  BENCHMAX_LOG_DEBUG("benchmax.slurm", "Parsed results.");
200  for (auto& [rid, r]: results) {
201  addResult(std::get<0>(r), std::get<1>(r), std::move(std::get<2>(r)));
202  }
203  if (settings_slurm().archive_log_file != "") {
204  slurm::archive_log_files({
205  settings_slurm().archive_log_file + "-" + std::to_string(settings_core().start_time) + ".tgz",
206  settings_slurm().tmp_dir
207  });
208  }
209  slurm::remove_log_files(files, !settings_slurm().keep_logs);
210 
211  if (check_finished) {
212  remove_job_ids();
213  }
214 
215  return true;
216  }
217 public:
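 /// This backend can run detached: jobs are submitted without waiting and the
 /// results are collected by a later invocation (see collect_results).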
218  bool suspendable() const {
219  return true;
220  }
221  /// Run all tools on all benchmarks using Slurm.
222  void run(const Jobs& jobs, bool wait_for_termination) {
223  if (load_job_ids().size() > 0) {
224  BENCHMAX_LOG_ERROR("benchmax.slurm", "Benchmax is still running in the specified tmp_dir! If this is not the case, please delete " + settings_slurm().tmp_dir + "/slurmjobs");
225  return;
226  }
227 
228  std::vector<JobData> results;
229  for (const auto& [tool, file]: jobs.randomized()) {
230  results.emplace_back(JobData { tool, file, BenchmarkResult() });
231  }
232  BENCHMAX_LOG_DEBUG("benchmax.slurm", "Gathered " << results.size() << " jobs");
233 
234  std::vector<std::future<void>> tasks;
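 // Number of jobs needed: ceil(results.size() / (array_size * slice_size)).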
235  std::size_t count = results.size() / (settings_slurm().array_size * settings_slurm().slice_size);
236  if (results.size() % (settings_slurm().array_size * settings_slurm().slice_size) > 0) count += 1;
237  for (std::size_t i = 0; i < count; ++i) {
238  tasks.emplace_back(std::async(std::launch::async,
239  [i,&results,wait_for_termination,this](){
240  return run_job_async(i, results, wait_for_termination);
241  }
242  ));
243  }
244  for (auto& f: tasks) {
245  f.wait();
246  }
247  if (wait_for_termination) {
248  BENCHMAX_LOG_DEBUG("benchmax.slurm", "All jobs terminated.");
249  } else {
250  BENCHMAX_LOG_DEBUG("benchmax.slurm", "All jobs scheduled.");
251  }
252  }
253 };
254 
255 }
#define BENCHMAX_LOG_DEBUG(channel, msg)
Log debug messages.
Definition: logging.h:55
#define BENCHMAX_LOG_WARN(channel, msg)
Log warnings.
Definition: logging.h:51
#define BENCHMAX_LOG_INFO(channel, msg)
Log informational messages.
Definition: logging.h:53
#define BENCHMAX_LOG_ERROR(channel, msg)
Log errors.
Definition: logging.h:49
Base class for all backends.
Definition: Backend.h:23
void addResult(const Tool *tool, const fs::path &file, BenchmarkResult &&result)
Add a result.
Definition: Backend.h:97
Represents a set of jobs, constructed as the cartesian product of a set of tools and a set of benchmarks.
Definition: Jobs.h:70
const auto & tools() const
Returns the set of tools.
Definition: Jobs.h:88
auto randomized() const
Returns all jobs in a pseudo-randomized order.
Definition: Jobs.h:109
Backend for the Slurm workload manager.
Definition: SlurmBackend.h:29
bool collect_results(const Jobs &jobs, bool check_finished) override
Definition: SlurmBackend.h:177
bool suspendable() const
Definition: SlurmBackend.h:218
void store_job_id(int jobid)
Definition: SlurmBackend.h:105
std::pair< std::size_t, std::size_t > get_job_range(std::size_t n, std::size_t numJobs) const
Definition: SlurmBackend.h:97
void run(const Jobs &jobs, bool wait_for_termination)
Run all tools on all benchmarks using Slurm.
Definition: SlurmBackend.h:222
std::vector< int > load_job_ids()
Definition: SlurmBackend.h:113
std::mutex mSubmissionMutex
Mutex for submission delay.
Definition: SlurmBackend.h:39
void run_job_async(std::size_t n, const std::vector< JobData > &results, bool wait_for_termination)
Definition: SlurmBackend.h:133
std::mutex mSlurmjobMutex
Mutex for slurmjobs file.
Definition: SlurmBackend.h:41
std::tuple< const Tool *, std::filesystem::path, BenchmarkResult > JobData
A job consists of a tool, an input file and a result.
Definition: SlurmBackend.h:36
void parse_result_file(const Jobs &jobs, const std::filesystem::path &file, std::map< size_t, JobData > &results)
Parse the content of an output file.
Definition: SlurmBackend.h:44
Base class for any tool.
Definition: Tool.h:38
void clear_directory(const fs::path &basedir)
Clear log files from directory.
void remove_log_files(const std::vector< fs::path > &files, bool remove)
Remove the given list of files.
void archive_log_files(const ArchiveProperties &p)
Put all log files into an archive.
std::string generate_submit_file_chunked(const ChunkedSubmitfileProperties &p)
void generate_jobs_file(const std::string &filename, std::pair< std::size_t, std::size_t > range, const Jobs &jobs)
int parse_job_id(const std::string &output)
Parses the job id from the output of sbatch.
std::vector< fs::path > collect_result_files(const fs::path &basedir)
Collects all result files in the given base directory for this job id.
std::string parse_result_info(const std::string &content, const std::string &name)
Parse a single result information from the output.
bool is_job_finished(int jobid)
Checks if the given job is finished.
int call_program(const std::string &commandline, std::string &stdout, bool print_to_stdout=false)
Runs an external program from some command line and records the output to stdout.
Definition: execute.h:18
const auto & settings_core()
Return the core settings.
Definition: Settings.h:81
const auto & settings_slurm()
Return the Slurm settings.
Definition: SlurmSettings.h:32
const auto & settings_benchmarks()
Return the benchmark settings.
Definition: benchmarks.h:41
std::size_t parse_peak_memory(const std::string &output)
Definition: parsing.h:4
Results for a single benchmark run (BenchmarkResult).