SMT-RAT  24.02
Toolbox for Strategic and Parallel Satisfiability-Modulo-Theories Solving
SSHConnection.h
Go to the documentation of this file.
1 #pragma once
2 
3 #include <mutex>
4 #include <sys/stat.h>
5 #include <libssh/libssh.h>
6 #include <libssh/sftp.h>
7 
8 #include "Node.h"
9 #include "SSHSettings.h"
11 #include <benchmax/logging.h>
12 
13 #include "../../utils/parsing.h"
14 
15 #define SSH_LOCKED(expr) { std::lock_guard<std::mutex> guard(mutex); expr; }
16 
17 namespace benchmax {
18 namespace ssh {
19 
20 /// A wrapper class that manages a single SSH connection as specified in a Node object (with all its channels).
22 private:
23  /// The node object.
25  /// The number of currently active channels.
26  std::size_t curChannels;
27  /// The maximal number of channels allowed.
28  std::size_t maxChannels;
29  /// The number of currently running jobs.
30  std::atomic<std::size_t> curJobs;
31  /// The SSH session handle.
32  ssh_session session;
33  /// Mutex.
34  std::mutex mutex;
35  /// Verbosity needed due to libssh interface.
36  int verbosity;
37 
38  /// Parse a duration from stdout.
39  std::chrono::milliseconds parse_duration(const std::string& output) const {
40  std::regex re("Start: ([0-9]+).*End: ([0-9]+)", std::regex::extended); //".*End: (\\d+)$");
41  std::smatch m;
42  if (std::regex_search(output, m, re)) {
43  std::size_t p;
44  std::size_t start = std::stoull(m[1].str(), &p);
45  std::size_t end = std::stoull(m[2].str(), &p);
46  return std::chrono::milliseconds(end - start);
47  } else {
48  return std::chrono::milliseconds(0);
49  }
50  }
51 
52  /// Allocate a new channel from the current SSH session.
53  ssh_channel getChannel() {
54  std::lock_guard<std::mutex> guard(mutex);
55  BENCHMAX_LOG_DEBUG("benchmax.ssh", this << " Allocating channel, currently " << curChannels << " / " << maxChannels);
56  assert(!busy());
57  ssh_channel channel = ssh_channel_new(session);
58  if (channel == nullptr) {
59  BENCHMAX_LOG_ERROR("benchmax.ssh", this << " Failed to create new ssh channel: " << ssh_get_error(session));
60  exit(1);
61  }
62  int rc = ssh_channel_open_session(channel);
63  if (rc != SSH_OK) {
64  BENCHMAX_LOG_ERROR("benchmax.ssh", this << " Failed to open ssh channel: " << ssh_get_error(session));
65  exit(1);
66  }
67  curChannels++;
68  return channel;
69  }
70 
71  /// Get a new SCP session for file transfer.
72  ssh_scp getSCP(int mode, const std::string& basedir) {
73  std::lock_guard<std::mutex> guard(mutex);
74  BENCHMAX_LOG_DEBUG("benchmax.ssh", this << " Allocating scp, currently " << curChannels << " / " << maxChannels);
75  assert(!busy());
76  ssh_scp scp = ssh_scp_new(session, mode, basedir.c_str());
77  if (scp == nullptr) {
78  BENCHMAX_LOG_ERROR("benchmax.ssh", this << " Failed to create new scp session: " << ssh_get_error(session));
79  exit(1);
80  }
81  int rc = ssh_scp_init(scp);
82  if (rc != SSH_OK) {
83  BENCHMAX_LOG_ERROR("benchmax.ssh", this << " Failed to initialize scp session: " << ssh_get_error(session));
84  exit(1);
85  }
86  curChannels++;
87  return scp;
88  }
89 
90  /// Get a new SFTP session for file transfer.
91  sftp_session getSFTP() {
92  std::lock_guard<std::mutex> guard(mutex);
93  BENCHMAX_LOG_DEBUG("benchmax.ssh", this << " Allocating sftp, currently " << curChannels << " / " << maxChannels);
94  assert(!busy());
95  sftp_session sftp = sftp_new(session);
96  if (sftp == nullptr) {
97  BENCHMAX_LOG_ERROR("benchmax.ssh", this << " Failed to create new sftp session: " << ssh_get_error(session));
98  exit(1);
99  }
100  int rc = sftp_init(sftp);
101  if (rc != SSH_OK) {
102  BENCHMAX_LOG_ERROR("benchmax.ssh", this << " Failed to initialize sftp session: " << ssh_get_error(session));
103  exit(1);
104  }
105  curChannels++;
106  return sftp;
107  }
108 
109  /// Terminate a SSH channel.
110  void destroy(ssh_channel channel) {
111  std::lock_guard<std::mutex> guard(mutex);
112  BENCHMAX_LOG_DEBUG("benchmax.ssh", this << " Destroying channel, currently " << curChannels << " / " << maxChannels);
113  ssh_channel_close(channel);
114  ssh_channel_free(channel);
115  curChannels--;
116  }
117 
118  /// Terminate a SCP session.
119  void destroy(ssh_scp scp) {
120  std::lock_guard<std::mutex> guard(mutex);
121  BENCHMAX_LOG_DEBUG("benchmax.ssh", this << " Destroying scp, currently " << curChannels << " / " << maxChannels);
122  ssh_scp_close(scp);
123  ssh_scp_free(scp);
124  curChannels--;
125  }
126 
127  /// Terminate a SFTP session.
128  void destroy(sftp_session sftp) {
129  std::lock_guard<std::mutex> guard(mutex);
130  BENCHMAX_LOG_DEBUG("benchmax.ssh", this << " Destroying sftp, currently " << curChannels << " / " << maxChannels);
131  sftp_free(sftp);
132  curChannels--;
133  }
134 public:
135  /// Create a new connection for the given node.
136  SSHConnection(const Node& n): node(n), curChannels(0), maxChannels(node.cores), curJobs(0) {
137  session = ssh_new();
138  if (session == nullptr) {
139  BENCHMAX_LOG_ERROR("benchmax.ssh", this << " Failed to create SSH session.");
140  }
141  verbosity = SSH_LOG_NOLOG;
142  ssh_options_set(session, SSH_OPTIONS_LOG_VERBOSITY, &verbosity);
143  ssh_options_set(session, SSH_OPTIONS_HOST, node.hostname.c_str());
144  ssh_options_set(session, SSH_OPTIONS_PORT, &(node.port));
145  ssh_options_set(session, SSH_OPTIONS_USER, node.username.c_str());
146 
147  int rc = ssh_connect(session);
148  if (rc != SSH_OK) {
149  BENCHMAX_LOG_ERROR("benchmax.ssh", this << " Failed to connect to " << node.username << "@" << node.hostname);
150  exit(1);
151  }
152  BENCHMAX_LOG_DEBUG("benchmax.ssh", this << " Connected to " << node.username << "@"<< node.hostname);
153 
154  rc = ssh_userauth_publickey_auto(session, nullptr, nullptr);
155  if (rc != SSH_AUTH_SUCCESS) {
156  rc = ssh_userauth_password(session, nullptr, node.password.c_str());
157  if (rc != SSH_AUTH_SUCCESS) {
158  BENCHMAX_LOG_ERROR("benchmax.ssh", this << " Failed to authenticate as " << node.username << ".");
159  exit(1);
160  }
161  }
162  BENCHMAX_LOG_DEBUG("benchmax.ssh", this << " Authenticated as " << node.username << ".");
163  }
164  /// Wait for all channels to terminate.
166  while (curChannels > 0) {
167  std::this_thread::sleep_for(std::chrono::milliseconds(100));
168  }
169  ssh_disconnect(session);
170  ssh_free(session);
171  }
172  /// Return the node.
173  const Node& getNode() const {
174  return node;
175  }
176  /// Check if a new job could be started.
177  bool jobFree() {
178  return curJobs < maxChannels;
179  }
180  /// Increase job counter.
181  void newJob() {
182  assert(curJobs < maxChannels);
183  curJobs++;
184  }
185  /// Decrease job counter.
186  void finishJob() {
187  curJobs--;
188  }
189  /// Current number of jobs.
190  std::size_t jobs() const {
191  return curJobs;
192  }
193  /// Check if all channels are busy.
194  bool busy() {
195  //BENCHMAX_LOG_DEBUG("benchmax.ssh", "Currently " << curChannels << " / " << maxChannels);
196  return curChannels >= maxChannels;
197  }
198 
199  /// Create a temporary directory on the remote.
200  std::string createTmpDir(const std::string& folder) {
201  BENCHMAX_LOG_DEBUG("benchmax.ssh", this << " Creating directory " << folder);
202  ssh_scp scp = getSCP(SSH_SCP_WRITE | SSH_SCP_RECURSIVE, settings_ssh().tmpdir.c_str());
203  int rc;
204  SSH_LOCKED(rc = ssh_scp_push_directory(scp, folder.c_str(), S_IRWXU));
205  if (rc != SSH_OK) {
206  BENCHMAX_LOG_ERROR("benchmax.ssh", this << " Failed to create temporary directory \"" << folder << "\": " << ssh_get_error(session));
207  exit(1);
208  }
209  destroy(scp);
210  return settings_ssh().tmpdir + folder + "/";
211  }
212 
213  /// Remove a (temporary) directory on the remote.
214  void removeDir(const std::string& folder) {
215  BENCHMAX_LOG_DEBUG("benchmax.ssh", this << " Removing directory " << folder);
216  sftp_session sftp = getSFTP();
217  sftp_dir dir;
218  sftp_attributes attr;
219  int rc;
220  SSH_LOCKED(dir = sftp_opendir(sftp, folder.c_str()));
221  if (dir == nullptr) {
222  BENCHMAX_LOG_ERROR("benchmax.ssh", this << " Failed to open directory \"" << folder << "\": " << ssh_get_error(session));
223  exit(1);
224  }
225  while (true) {
226  SSH_LOCKED(attr = sftp_readdir(sftp, dir));
227  if (attr == nullptr) break;
228  if (std::string(attr->name) == ".") continue;
229  if (std::string(attr->name) == "..") continue;
230  SSH_LOCKED(rc = sftp_unlink(sftp, (folder + std::string(attr->name)).c_str()));
231  if (rc != SSH_OK) {
232  BENCHMAX_LOG_WARN("benchmax.ssh", this << " Failed to unlink \"" << attr->name << "\": " << ssh_get_error(session));
233  }
234  sftp_attributes_free(attr);
235  }
236  SSH_LOCKED(rc = sftp_closedir(dir));
237  if (rc != SSH_OK) {
238  BENCHMAX_LOG_ERROR("benchmax.ssh", this << " Failed to close directory \"" << folder << "\": " << ssh_get_error(session));
239  exit(1);
240  }
241  SSH_LOCKED(rc = sftp_rmdir(sftp, folder.c_str()));
242  if (rc != SSH_OK) {
243  BENCHMAX_LOG_ERROR("benchmax.ssh", this << " Failed to remove directory \"" << folder << "\": " << ssh_get_error(session));
244  exit(1);
245  }
246  destroy(sftp);
247  }
248 
249  /// Upload a file to the remote.
250  bool uploadFile(const fs::path& local, const std::string& base, const std::string& remote, int mode = S_IRUSR | S_IWUSR) {
251  BENCHMAX_LOG_DEBUG("benchmax.ssh", this << " Pushing file " << base << remote);
252  ssh_scp scp = getSCP(SSH_SCP_WRITE | SSH_SCP_RECURSIVE, base.c_str());
253  std::ifstream tmp(local.native(), std::ios::binary | std::ios::ate);
254  int rc;
255  SSH_LOCKED(rc = ssh_scp_push_file(scp, remote.c_str(), (std::size_t)tmp.tellg(), mode));
256  if (rc != SSH_OK) {
257  BENCHMAX_LOG_ERROR("benchmax.ssh", this << " Failed to create remote file " << remote << " from local file " << local << ": " << ssh_get_error(session));
258  destroy(scp);
259  return false;
260  }
261  std::ifstream in(local.native(), std::ios::binary);
262  char buf[1024];
263  while (!in.eof()) {
264  in.read(buf, sizeof(buf));
265  std::size_t bytes = (std::size_t)in.gcount();
266  SSH_LOCKED(ssh_scp_write(scp, buf, bytes));
267  }
268  destroy(scp);
269  return true;
270  }
271 
272  /// Execute a command on the remote.
273  bool executeCommand(const std::string& cmd, BenchmarkResult& result) {
274  BENCHMAX_LOG_DEBUG("benchmax.ssh", this << " Executing command " << cmd);
275  ssh_channel channel = getChannel();
276  std::stringstream call;
277  call << "date +\"Start: %s%3N\" ; ";
278  call << "cd " << settings_ssh().basedir << " ; ";
279  auto timeout = (std::chrono::seconds(settings_benchmarks().limit_time) + std::chrono::seconds(3)).count();
280  if (settings_ssh().use_wallclock) call << "timeout " << timeout << "s ";
281  else call << "ulimit -S -t " << timeout << " && ";
282  call << "ulimit -S -v " << settings_benchmarks().limit_memory.kibi() << " && ";
283  call << "/usr/bin/time -v ";
284  call << cmd << " ; rc=$? ;";
285  call << "date +\"End: %s%3N\" ; exit $rc";
286  int rc;
287  SSH_LOCKED(rc = ssh_channel_request_exec(channel, call.str().c_str()));
288  if (rc != SSH_OK) {
289  BENCHMAX_LOG_ERROR("benchmax.ssh", this << " Failed to execute: " << cmd);
290  return false;
291  }
292  result.stdout = "";
293  result.stderr = "";
294  bool collectOutput = true;
295  char buf[512];
296  int n;
297  int eof = 0;
298  while (eof == 0) {
299  SSH_LOCKED(eof = ssh_channel_is_eof(channel));
300  SSH_LOCKED(n = ssh_channel_read_nonblocking(channel, buf, sizeof(buf), 0));
301  if (n > 0 && collectOutput) result.stdout += std::string(buf, std::size_t(n));
302  SSH_LOCKED(n = ssh_channel_read_nonblocking(channel, buf, sizeof(buf), 1));
303  if (n > 0 && collectOutput) result.stderr += std::string(buf, std::size_t(n));
304  collectOutput = (result.stdout.size() < 1000000) && (result.stderr.size() < 1000000);
305  std::this_thread::yield();
306  std::this_thread::sleep_for(std::chrono::milliseconds(10));
307  }
308  if (!collectOutput) {
309  result.additional.emplace("output", "truncated");
310  result.stdout = "";
311  result.stderr = "";
312  } else {
313  BENCHMAX_LOG_DEBUG("benchmax.ssh", "stdout = " << result.stdout);
314  BENCHMAX_LOG_DEBUG("benchmax.ssh", "stderr = " << result.stderr);
315  }
316  SSH_LOCKED(result.exitCode = ssh_channel_get_exit_status(channel));
317  result.time = parse_duration(result.stdout);
318  result.peak_memory_kbytes = parse_peak_memory(result.stderr);
319  destroy(channel);
320  return true;
321  }
322 };
323 
324 }
325 }
#define SSH_LOCKED(expr)
Definition: SSHConnection.h:15
#define BENCHMAX_LOG_DEBUG(channel, msg)
Log debug messages.
Definition: logging.h:55
#define BENCHMAX_LOG_WARN(channel, msg)
Log warnings.
Definition: logging.h:51
#define BENCHMAX_LOG_ERROR(channel, msg)
Log errors.
Definition: logging.h:49
A wrapper class that manages a single SSH connection as specified in a Node object (with all its chan...
Definition: SSHConnection.h:21
bool executeCommand(const std::string &cmd, BenchmarkResult &result)
Execute a command on the remote.
ssh_scp getSCP(int mode, const std::string &basedir)
Get a new SCP session for file transfer.
Definition: SSHConnection.h:72
sftp_session getSFTP()
Get a new SFTP session for file transfer.
Definition: SSHConnection.h:91
void destroy(sftp_session sftp)
Terminate a SFTP session.
ssh_session session
The SSH session handle.
Definition: SSHConnection.h:32
std::size_t maxChannels
The maximal number of channels allowed.
Definition: SSHConnection.h:28
SSHConnection(const Node &n)
Create a new connection for the given node.
std::size_t jobs() const
Current number of jobs.
Node node
The node object.
Definition: SSHConnection.h:24
std::string createTmpDir(const std::string &folder)
Create a temporary directory on the remote.
void destroy(ssh_scp scp)
Terminate a SCP session.
~SSHConnection()
Wait for all channels to terminate.
bool busy()
Check if all channels are busy.
void finishJob()
Decrease job counter.
std::chrono::milliseconds parse_duration(const std::string &output) const
Parse a duration from stdout.
Definition: SSHConnection.h:39
const Node & getNode() const
Return the node.
std::size_t curChannels
The number of currently active channels.
Definition: SSHConnection.h:26
ssh_channel getChannel()
Allocate a new channel from the current SSH session.
Definition: SSHConnection.h:53
void destroy(ssh_channel channel)
Terminate a SSH channel.
void newJob()
Increase job counter.
bool uploadFile(const fs::path &local, const std::string &base, const std::string &remote, int mode=S_IRUSR|S_IWUSR)
Upload a file to the remote.
bool jobFree()
Check if a new job could be started.
std::atomic< std::size_t > curJobs
The number of currently running jobs.
Definition: SSHConnection.h:30
int verbosity
Verbosity needed due to libssh interface.
Definition: SSHConnection.h:36
void removeDir(const std::string &folder)
Remove a (temporary) directory on the remote.
const auto & settings_ssh()
Return the SSH settings.
Definition: SSHSettings.h:27
const auto & settings_benchmarks()
Return the benchmark settings.
Definition: benchmarks.h:41
std::size_t parse_peak_memory(const std::string &output)
Definition: parsing.h:4
Results for a single benchmark run.
Specification of a compuation node for the SSH backend.
Definition: Node.h:7
std::string username
Username.
Definition: Node.h:11
int port
Port (default is 22)
Definition: Node.h:15
std::string hostname
Hostname to connect to.
Definition: Node.h:9
std::string password
Password (only used if public key authentication fails).
Definition: Node.h:13