5 #include <libssh/libssh.h>
6 #include <libssh/sftp.h>
13 #include "../../utils/parsing.h"
15 #define SSH_LOCKED(expr) { std::lock_guard<std::mutex> guard(mutex); expr; }
40 std::regex re(
"Start: ([0-9]+).*End: ([0-9]+)", std::regex::extended);
42 if (std::regex_search(output, m, re)) {
44 std::size_t start = std::stoull(m[1].str(), &p);
45 std::size_t end = std::stoull(m[2].str(), &p);
46 return std::chrono::milliseconds(end - start);
48 return std::chrono::milliseconds(0);
54 std::lock_guard<std::mutex> guard(
mutex);
57 ssh_channel channel = ssh_channel_new(
session);
58 if (channel ==
nullptr) {
62 int rc = ssh_channel_open_session(channel);
72 ssh_scp
getSCP(
int mode,
const std::string& basedir) {
73 std::lock_guard<std::mutex> guard(
mutex);
76 ssh_scp scp = ssh_scp_new(
session, mode, basedir.c_str());
81 int rc = ssh_scp_init(scp);
92 std::lock_guard<std::mutex> guard(
mutex);
95 sftp_session sftp = sftp_new(
session);
96 if (sftp ==
nullptr) {
100 int rc = sftp_init(sftp);
111 std::lock_guard<std::mutex> guard(
mutex);
113 ssh_channel_close(channel);
114 ssh_channel_free(channel);
120 std::lock_guard<std::mutex> guard(
mutex);
129 std::lock_guard<std::mutex> guard(
mutex);
154 rc = ssh_userauth_publickey_auto(
session,
nullptr,
nullptr);
155 if (rc != SSH_AUTH_SUCCESS) {
157 if (rc != SSH_AUTH_SUCCESS) {
167 std::this_thread::sleep_for(std::chrono::milliseconds(100));
202 ssh_scp scp =
getSCP(SSH_SCP_WRITE | SSH_SCP_RECURSIVE,
settings_ssh().tmpdir.c_str());
204 SSH_LOCKED(rc = ssh_scp_push_directory(scp, folder.c_str(), S_IRWXU));
206 BENCHMAX_LOG_ERROR(
"benchmax.ssh",
this <<
" Failed to create temporary directory \"" << folder <<
"\": " << ssh_get_error(
session));
218 sftp_attributes attr;
220 SSH_LOCKED(dir = sftp_opendir(sftp, folder.c_str()));
221 if (dir ==
nullptr) {
222 BENCHMAX_LOG_ERROR(
"benchmax.ssh",
this <<
" Failed to open directory \"" << folder <<
"\": " << ssh_get_error(
session));
227 if (attr ==
nullptr)
break;
228 if (std::string(attr->name) ==
".")
continue;
229 if (std::string(attr->name) ==
"..")
continue;
230 SSH_LOCKED(rc = sftp_unlink(sftp, (folder + std::string(attr->name)).c_str()));
232 BENCHMAX_LOG_WARN(
"benchmax.ssh",
this <<
" Failed to unlink \"" << attr->name <<
"\": " << ssh_get_error(
session));
234 sftp_attributes_free(attr);
238 BENCHMAX_LOG_ERROR(
"benchmax.ssh",
this <<
" Failed to close directory \"" << folder <<
"\": " << ssh_get_error(
session));
241 SSH_LOCKED(rc = sftp_rmdir(sftp, folder.c_str()));
243 BENCHMAX_LOG_ERROR(
"benchmax.ssh",
this <<
" Failed to remove directory \"" << folder <<
"\": " << ssh_get_error(
session));
250 bool uploadFile(
const fs::path& local,
const std::string& base,
const std::string& remote,
int mode = S_IRUSR | S_IWUSR) {
252 ssh_scp scp =
getSCP(SSH_SCP_WRITE | SSH_SCP_RECURSIVE, base.c_str());
253 std::ifstream tmp(local.native(), std::ios::binary | std::ios::ate);
255 SSH_LOCKED(rc = ssh_scp_push_file(scp, remote.c_str(), (std::size_t)tmp.tellg(), mode));
257 BENCHMAX_LOG_ERROR(
"benchmax.ssh",
this <<
" Failed to create remote file " << remote <<
" from local file " << local <<
": " << ssh_get_error(
session));
261 std::ifstream in(local.native(), std::ios::binary);
264 in.read(buf,
sizeof(buf));
265 std::size_t bytes = (std::size_t)in.gcount();
276 std::stringstream call;
277 call <<
"date +\"Start: %s%3N\" ; ";
279 auto timeout = (std::chrono::seconds(
settings_benchmarks().limit_time) + std::chrono::seconds(3)).count();
280 if (
settings_ssh().use_wallclock) call <<
"timeout " << timeout <<
"s ";
281 else call <<
"ulimit -S -t " << timeout <<
" && ";
283 call <<
"/usr/bin/time -v ";
284 call << cmd <<
" ; rc=$? ;";
285 call <<
"date +\"End: %s%3N\" ; exit $rc";
287 SSH_LOCKED(rc = ssh_channel_request_exec(channel, call.str().c_str()));
294 bool collectOutput =
true;
299 SSH_LOCKED(eof = ssh_channel_is_eof(channel));
300 SSH_LOCKED(n = ssh_channel_read_nonblocking(channel, buf,
sizeof(buf), 0));
301 if (n > 0 && collectOutput)
result.stdout += std::string(buf, std::size_t(n));
302 SSH_LOCKED(n = ssh_channel_read_nonblocking(channel, buf,
sizeof(buf), 1));
303 if (n > 0 && collectOutput)
result.stderr += std::string(buf, std::size_t(n));
304 collectOutput = (
result.stdout.size() < 1000000) && (
result.stderr.size() < 1000000);
305 std::this_thread::yield();
306 std::this_thread::sleep_for(std::chrono::milliseconds(10));
308 if (!collectOutput) {
309 result.additional.emplace(
"output",
"truncated");
#define BENCHMAX_LOG_DEBUG(channel, msg)
Log debug messages.
#define BENCHMAX_LOG_WARN(channel, msg)
Log warnings.
#define BENCHMAX_LOG_ERROR(channel, msg)
Log errors.
A wrapper class that manages a single SSH connection as specified in a Node object (with all its chan...
bool executeCommand(const std::string &cmd, BenchmarkResult &result)
Execute a command on the remote.
ssh_scp getSCP(int mode, const std::string &basedir)
Get a new SCP session for file transfer.
sftp_session getSFTP()
Get a new SFTP session for file transfer.
void destroy(sftp_session sftp)
Terminate a SFTP session.
ssh_session session
The SSH session handle.
std::size_t maxChannels
The maximal number of channels allowed.
SSHConnection(const Node &n)
Create a new connection for the given node.
std::size_t jobs() const
Current number of jobs.
Node node
The node object.
std::string createTmpDir(const std::string &folder)
Create a temporary directory on the remote.
void destroy(ssh_scp scp)
Terminate a SCP session.
~SSHConnection()
Wait for all channels to terminate.
bool busy()
Check if all channels are busy.
void finishJob()
Decrease job counter.
std::chrono::milliseconds parse_duration(const std::string &output) const
Parse a duration from stdout.
const Node & getNode() const
Return the node.
std::size_t curChannels
The number of currently active channels.
ssh_channel getChannel()
Allocate a new channel from the current SSH session.
void destroy(ssh_channel channel)
Terminate a SSH channel.
void newJob()
Increase job counter.
bool uploadFile(const fs::path &local, const std::string &base, const std::string &remote, int mode=S_IRUSR|S_IWUSR)
Upload a file to the remote.
bool jobFree()
Check if a new job could be started.
std::atomic< std::size_t > curJobs
The number of currently running jobs.
int verbosity
Verbosity needed due to libssh interface.
void removeDir(const std::string &folder)
Remove a (temporary) directory on the remote.
const auto & settings_ssh()
Return the SSH settings.
const auto & settings_benchmarks()
Return the benchmark settings.
std::size_t parse_peak_memory(const std::string &output)
Results for a single benchmark run.
Specification of a compuation node for the SSH backend.
std::string username
Username.
int port
Port (default is 22)
std::string hostname
Hostname to connect to.
std::string password
Password (only used if public key authentication fails).