# Source code for schrodinger.test.jobserver

"""
Convenience functions for dealing with job server
    (starting, killing, writing config file).
"""
import datetime
import getpass
import json
import logging
import os
import posixpath
import shutil
import socket
import sys
import tempfile
import time
from collections import namedtuple
from contextlib import contextmanager
from unittest.mock import patch
from typing import Optional

import paramiko
import yaml
import psutil
import backoff
import re

from schrodinger.application.licensing.licadmin import hostname_is_local
from schrodinger.job import jobcontrol
from schrodinger.job.server import jsc
from schrodinger.utils import fileutils
from schrodinger.utils import sshconfig
from schrodinger.utils import subprocess

# Name of the environment variable holding the path of the client-side job
# server config file.
SCHRODINGER_JOBSERVER_CONFIG_FILE = "SCHRODINGER_JOBSERVER_CONFIG_FILE"
# Name of the environment variable pointing at the localhost job server
# directory.
SCHRODINGER_LOCALHOST_JOBSERVER_DIRECTORY = "SCHRODINGER_LOCALHOST_JOBSERVER_DIRECTORY"

# Metadata describing one managed job server instance.
ServerInfo = namedtuple('ServerInfo', [
    'hostname', 'schrodinger', 'job_server_directory', 'username', 'pid',
    'job_server_port'
])

# Module logger; a stream handler is attached at import time and the level
# is set to INFO.
logger = logging.getLogger(__name__)
logger.addHandler(logging.StreamHandler())
logger.setLevel(logging.INFO)

# Path separator used when composing paths that may refer to a remote host.
LINUX_PATH = "/"

# Template of the per-program supervisord configuration file. The
# placeholders are filled in with str.format(proc_id=..., cmd=..., log=...).
INI_CONTENTS = """[program:{proc_id}]
command = {cmd}
redirect_stderr = true
stdout_logfile = {log}
# Only give the process a second to start before declaring it failed.
startsecs = 1
# Don't retry if it doesn't start.
startretries = 0
"""


def get_user(hostname):
    """
    Return the account name to use for the given host.

    :param hostname: name of the host; not consulted by the current
        implementation.
    :type hostname: str
    :returns: "buildbot" when the current login name contains "buildbot",
        otherwise the current login name.
    :rtype: str
    """
    login_name = getpass.getuser()
    # case for mac machines which has this account to use 'buildbot'
    return "buildbot" if "buildbot" in login_name else login_name
def get_supervisord_dir(ssh: paramiko.client.SSHClient, hostname: str) -> str:
    """
    Gets the remote directory to start or interact with the supervisord.

    :param ssh: ssh client connection established with the hostname.
    :param hostname: name of the host where supervisord is being setup.
    :returns: the remote supervisord directory, i.e. the expansion of ``~``
        reported by the remote login shell followed by
        ``/supervisord-<hostname>``.
    """
    # Ask a login shell on the remote side to expand "~".
    home_dir = run_command(ssh, ["echo", "~"], login=True).strip()
    return f"{home_dir}/supervisord-{hostname}"
def supv(ssh: paramiko.client.SSHClient, hostname: str):
    """
    Gets the command to work with remote supervisord.

    :param ssh: ssh client connection established with the hostname.
    :param hostname: name of the host where supervisord is setup.
    :returns: the supervisorctl command (as a list of arguments) to interact
        with remote supervisord.
    """
    root = get_supervisord_dir(ssh, hostname)
    return [
        f"{root}/supervisord/venv/bin/supervisorctl",
        "-c",
        f"{root}/supervisord/supervisord.conf",
    ]
def get_ini_file(ssh: paramiko.client.SSHClient, hostname: str, proc_id: str):
    """
    Returns the complete path of the configuration file to manage the
    process under supervisord in the remote host.

    :param ssh: ssh client connection established with the hostname
    :param hostname: name of the host where supervisord is setup.
    :param proc_id: program name to use as the basename of the
        configuration file.
    :returns: full path of the configuration file to manage the process
        under supervisord in the remote host.
    """
    supervisord_root = get_supervisord_dir(ssh, hostname)
    return f"{supervisord_root}/supervisord/conf/{proc_id}.ini"
def get_log_dir(ssh: paramiko.client.SSHClient, hostname: str):
    """
    Returns the complete path of the directory to access log files of
    processes being managed by supervisord in the remote host.

    :param ssh: ssh client connection established with the hostname
    :param hostname: name of the host where supervisord is setup.
    :returns: full path of the log directory.
    """
    supervisord_root = get_supervisord_dir(ssh, hostname)
    return f"{supervisord_root}/supervisord/logs"
def get_log_location(ssh, hostname, proc_id):
    """
    Returns the complete path of the log file corresponding to the managed
    process under supervisord.

    :param ssh: ssh client connection established with the hostname
    :param hostname: name of the host where supervisord is setup.
    :param proc_id: program name to use as the basename of the log file.
    :returns: full path of the log file corresponding to the process.
    """
    # The log directory lives on the remote host, so the path must be joined
    # with "/" regardless of the platform this code runs on.
    return posixpath.join(get_log_dir(ssh, hostname), f"{proc_id}.log")
def get_tls_config(cert_dir):
    """
    Build the TLS section of the web server configuration.

    :param cert_dir: path to wildcard certificates
    :type cert_dir: str
    :rtype: dict
    :returns: dict for webserver tls config
    """
    key_file = cert_dir + "/wild.schrodinger.com.key"
    chain_file = cert_dir + "/wild.schrodinger.com.pem"
    return {
        "certificate_key_file": key_file,
        "certificate_chain_file": chain_file,
    }
def get_job_server_directory(basedir: str, username: str, hostname: str) -> str:
    """
    Returns a unique path to a directory to store job server data in.

    The directory is created below ``<basedir>/<username>`` and its name
    starts with ``jobserver.<YYYYMMDD>.``.

    :param basedir: prefix of path, usually tmpdir on remote
    :param username: user name; used as a path component and as the login
        name for remote hosts
    :param hostname: hostname on which to create the job server directory
    :returns: Path to a directory
    """
    parent_dir = f"{basedir}/{username}"
    date_stamp = datetime.datetime.now().strftime("%Y%m%d")
    name_prefix = f"jobserver.{date_stamp}."

    if hostname_is_local(hostname):
        os.makedirs(parent_dir, exist_ok=True)
        return tempfile.mkdtemp(prefix=name_prefix, dir=parent_dir)

    with get_ssh_client(hostname, username) as ssh:
        # Ensure the parent directory exists
        run_command(ssh, ["mkdir", "-p", parent_dir])
        # Remote servers are all currently linux so should have the "mktemp"
        # command available.
        created = run_command(ssh, [
            "mktemp", "-d", f"--tmpdir={parent_dir}", "-t",
            f"{name_prefix}XXXXXXXXX"
        ])
    return created.strip()
def job_server_exe(schrodinger):
    """
    Return the path of the ``jobserverd`` executable below `schrodinger`.

    :param schrodinger: path to SCHRODINGER
    :type schrodinger: str
    :rtype: str
    """
    # Use / as pathsep because this constructs local and remote paths
    components = (schrodinger, "internal", "bin", "job_server", "jobserverd")
    return LINUX_PATH.join(components)
def job_server_setup_exe(schrodinger):
    """
    Return the path of the ``jsc_admin`` executable below `schrodinger`.

    :param schrodinger: path to SCHRODINGER
    :type schrodinger: str
    :rtype: str
    """
    components = (schrodinger, "internal", "bin", "job_server", "jsc_admin")
    return LINUX_PATH.join(components)
def run(schrodinger):
    """
    Return the path of the ``run`` launcher below `schrodinger`.

    :param schrodinger: path to SCHRODINGER
    :type schrodinger: str
    :rtype: str
    """
    return "{}/run".format(schrodinger)
def get_job_server_config(job_server_directory):
    """
    Return a path for a server-specific job server config in the job server
    directory.

    :param job_server_directory: base directory of the job server
    :type job_server_directory: str
    :rtype: str
    """
    config_name = "jobserver.test.config"
    return posixpath.join(job_server_directory, config_name)
def get_queue_type(hostname: str) -> str:
    """
    Return a queue type in the format for jsc_admin for a given hostname.

    The hostname is matched case-insensitively against known queue system
    markers; "UGE" is returned when none of them occurs in the name.
    """
    lowered = hostname.lower()
    # Order matters: the first marker found in the hostname wins.
    markers = (
        ("slurm", "Slurm"),
        ("torque", "Torque"),
        ("lsf", "LSF"),
        ("pbs", "PBS"),
    )
    for marker, queue_type in markers:
        if marker in lowered:
            return queue_type
    return "UGE"
def setup_host(hostname, schrodinger, job_server_directory, username,
               serve_queue_jobs):
    """
    Set up authentication in a new directory.

    Runs ``jsc_admin setup-server`` for `hostname`, which creates a new
    job_server config directory (including authentication information).

    :param hostname: name of host to set up
    :param schrodinger: path to SCHRODINGER
    :param job_server_directory: directory to create the configuration in
    :param username: username to use on hostname
    :param serve_queue_jobs: if true, the queue type is derived from the
        hostname; otherwise the "local" queue is configured
    """
    launcher = [
        run(schrodinger),
        job_server_setup_exe(schrodinger),
    ]
    queue_type = get_queue_type(hostname) if serve_queue_jobs else "local"
    setup_cmd = launcher + [
        'setup-server', '-host', hostname, "-dir", job_server_directory,
        "-queue", queue_type
    ]
    with get_ssh_client(hostname, username) as ssh:
        run_command(ssh, setup_cmd)
def setup_supervisord(hostname, username):
    """
    Setup supervisord to start jobserver in given hostname.

    Creates a virtualenv under the remote supervisord directory if it does
    not exist yet. If ``supervisorctl`` is already present in it, supervisord
    is just (re)started. Otherwise supervisor is installed with pip, a
    ``supervisord.conf`` is written and supervisord is started.

    :param hostname: name of the remote host
    :type hostname: str
    :param username: username to use on hostname
    :type username: str
    """
    with get_ssh_client(hostname, username) as ssh:
        base_supervisord_dir = get_supervisord_dir(ssh, hostname)
        supervisord_home = f"{base_supervisord_dir}/supervisord"
        venv_dir = f"{base_supervisord_dir}/supervisord/venv"

        # "ls" failing is taken as "the venv does not exist yet".
        try:
            run_command(ssh, ['ls', venv_dir])
        except RuntimeError:
            try:
                run_command(ssh, ["python3", "-mvenv", venv_dir])
            except RuntimeError:
                # Fall back to the python2.7 virtualenv script.
                run_command(ssh, [
                    "/utils/bin/python2.7",
                    "/home/buildbot/scripts/virtualenv.py", venv_dir
                ])

        supervisorctl_path = f"{venv_dir}/bin/supervisorctl"
        try:
            run_command(ssh, ['ls', supervisorctl_path])
        except RuntimeError:
            run_command(
                ssh, ["bash", "-lc", f"{venv_dir}/bin/pip install supervisor"])
        else:
            logger.info("supervisorctl script exists - "
                        f"{supervisorctl_path}; skipping supervisord setup")
            run_supervisord(ssh, hostname)
            return

        # One directive per line; sections are separated by blank lines.
        conf_text = "\n".join([
            "",
            "[unix_http_server]",
            f"file={supervisord_home}/supervisor.sock",
            "",
            "[supervisord]",
            f"logfile={supervisord_home}/supervisord.log",
            f"pidfile={supervisord_home}/supervisord.pid",
            "",
            "[rpcinterface:supervisor]",
            "supervisor.rpcinterface_factory = "
            "supervisor.rpcinterface:make_main_rpcinterface",
            "",
            "[supervisorctl]",
            f"serverurl=unix://{supervisord_home}/supervisor.sock"
            " ; use a unix:// URL for a unix socket",
            "",
            "[include]",
            "files = conf/*.ini",
            "",
        ])
        with ssh.open_sftp() as ftp:
            with ftp.file(f"{supervisord_home}/supervisord.conf", "w") as fh:
                fh.write(conf_text)
        run_supervisord(ssh, hostname)
def setup_server(hostname,
                 schrodinger,
                 job_server_directory,
                 username,
                 append=True,
                 licensing=False,
                 use_certs=False,
                 certs_dir=None,
                 use_shared_supervisors=False,
                 use_ldap=True,
                 use_socket_auth=True,
                 hosts_yml_path: Optional[str] = None):
    """
    Setup new server on arbitrary ports.

    :param hostname: name of host to set up jobserver
    :type hostname: str
    :param schrodinger: path to SCHRODINGER
    :type schrodinger: str
    :param job_server_directory: base directory for job server
    :type job_server_directory: str
    :param username: username to use on hostname
    :type username: str
    :param append: If True, add server config to jobserver.config, If False,
        overwrite jobserver.config
    :type append: bool
    :param licensing: If True, pass licensing check params to queued jobs
    :type licensing: bool
    :param use_certs: If True, use wildcard certificates in standard
        internal locations
    :type use_certs: bool
    :param certs_dir: directory holding the wildcard certificates on
        `hostname`; when None and `use_certs` is set, the ``cert`` directory
        below the remote supervisord directory is used
    :type certs_dir: str or None
    :param bool use_shared_supervisors: Use shared supervisor executables
    :param bool use_ldap: Enable LDAP authentication
    :param bool use_socket_auth: Enable unix socket authentication
    :param hosts_yml_path: transfer a local hosts.yml file to the configured
        jobserver
    :returns: metadata of the started server
    :rtype: ServerInfo
    :raises RuntimeError: if certificates are requested but no wildcard
        certificate is found on `hostname`
    :raises FileNotFoundError: if `hosts_yml_path` does not exist locally
    """
    # For local jobserver, this cannot be a localhost for linux and need to be
    # proper address that will map to the machine.
    if sys.platform.startswith("linux") and hostname == "localhost":
        hostname = socket.getfqdn()

    serve_queue_jobs = not hostname_is_local(hostname)

    if serve_queue_jobs:
        setup_host(hostname, schrodinger, job_server_directory, username,
                   serve_queue_jobs)
        setup_supervisord(hostname, username)

        if use_certs or certs_dir:
            with get_ssh_client(hostname, username) as ssh:
                try:
                    if certs_dir is None:
                        base_supervisord_dir = get_supervisord_dir(
                            ssh, hostname)
                        certs_dir = f"{base_supervisord_dir}/supervisord/cert"
                    run_command(ssh,
                                ["ls", f"{certs_dir}/wild.schrodinger.com.pem"])
                except RuntimeError as err:
                    # Keep the failing remote command as the explicit cause.
                    raise RuntimeError(
                        f"You need a wildcard cert on {hostname} before "
                        "setting up jobserver under supervisord. \nYou can "
                        "specify the directory with --certs-dir.") from err

        if hosts_yml_path is not None:
            if not os.path.exists(hosts_yml_path):
                raise FileNotFoundError(
                    f"No file found for specified hosts_yml_path: {hosts_yml_path}"
                )
            transfer_hosts_yml(hostname, username, job_server_directory,
                               hosts_yml_path)

        modify_jobserver_yml(hostname, job_server_directory, username,
                             licensing, use_shared_supervisors, certs_dir,
                             use_ldap, use_socket_auth)

    server = start_server(hostname,
                          schrodinger,
                          job_server_directory,
                          username,
                          serve_queue_jobs=serve_queue_jobs)
    if serve_queue_jobs:
        create_job_server_config(hostname, username, job_server_directory)
    return server
def run_command(ssh, command, login=False):
    """
    Runs a command.

    :param ssh: a paramiko.SSHClient with an established connection to the
        remote machine. If ssh is None, the command will be invoked by
        subprocess.run
    :type ssh: paramiko.SSHClient
    :param command: The command to run as a list of string arguments
    :type command: list[str]
    :param login: If True, command requires login shell for ssh
    :type login: bool
    :return: The output of the executed command over ssh (the stdout lines
        joined with a newline); None if local using subprocess
    :rtype: str, or None
    :raises RuntimeError: if the remote command exits with a non-zero status

    This function is duplicated in server_management.py for backward
    compatibility.
    """
    # Imported locally so this block does not depend on a new module-level
    # import.
    import shlex

    logger.info(f"Running {command}")
    if not ssh:
        subprocess.run(command, universal_newlines=True, check=True)
        return None

    env = {"PYTHONIOENCODING": "utf-8"}
    command = subprocess.list2cmdline(command)
    if login:
        # shlex.quote yields the same '...' wrapping for ordinary commands
        # but stays correct when the command itself contains a single quote.
        command = f"bash --login -c {shlex.quote(command)}"
    _, out, err = ssh.exec_command(command, environment=env)
    output = out.readlines()
    error = err.readlines()
    logger.info(f"Stdout: {output}")
    exit_status = out.channel.recv_exit_status()
    if exit_status:
        raise RuntimeError(
            f"{command} exited with {exit_status}; subprocess stderr: {error}")
    elif len(error) != 0:
        logger.info(f"Stderr: {error}")
    return '\n'.join(output)
def setup_log_dir(log_dir, ssh=None):
    """
    Make sure the log directory exists.

    :param log_dir: path of the directory to create
    :type log_dir: str
    :param ssh: open ssh client to create the directory remotely, or None to
        create it on the local machine
    :type ssh: paramiko.SSHClient
    """
    if ssh:
        run_command(ssh, ["mkdir", "-p", log_dir])
    else:
        # exist_ok avoids the race between an existence check and the
        # creation when several processes set up the same directory.
        os.makedirs(log_dir, exist_ok=True)
def write_ini_file(ini_file, ini_contents, ssh=None):
    """
    Write the configuration file to manage jobserver.

    The parent directory of `ini_file` is created when needed.

    :param ini_file: path of the file to write
    :type ini_file: str
    :param ini_contents: text to write into the file
    :type ini_contents: str
    :param ssh: open ssh client to write the file remotely, or None to write
        it on the local machine
    :type ssh: paramiko.SSHClient
    """
    logger.info(f"Writing {ini_file}")
    # Empty when ini_file is a bare file name; nothing to create then.
    dirname = os.path.dirname(ini_file)
    if ssh:
        if dirname:
            run_command(ssh, ["mkdir", "-p", dirname])
        with ssh.open_sftp() as ftp:
            with ftp.file(ini_file, "w") as fh:
                fh.write(ini_contents)
    else:
        if dirname:
            os.makedirs(dirname, exist_ok=True)
        with open(ini_file, "w") as fh:
            fh.write(ini_contents)
def run_supervisord(ssh: paramiko.client.SSHClient, hostname: str):
    """
    Run the daemon with the configuration in the supervisord directory.

    :param ssh: a paramiko.SSHClient with an established connection to the
        remote machine.
    :param hostname: name of the host where supervisord is being setup.
    :raises RuntimeError: if starting supervisord fails; the error is
        suppressed when it reports that another program is already listening
        on the port.
    """
    base_supervisord_dir = get_supervisord_dir(ssh, hostname)
    # This is to pick path for queue commands.
    start_cmd = [
        "bash", "-lc",
        f'"{base_supervisord_dir}/supervisord/venv/bin/supervisord" -c "{base_supervisord_dir}/supervisord/supervisord.conf"'
    ]
    try:
        run_command(ssh, start_cmd)
    except RuntimeError as e:
        if "Another program is already listening" in str(e):
            return
        raise
def monitor_job_server_with_supervisord(cmd, hostname, job_server_directory,
                                        username):
    """
    Monitor the given job server command under supervisord.

    Writes a supervisord program configuration for `cmd`, asks supervisord
    to pick it up and shows the status of the new program. If that fails,
    the configuration file is removed again and the error is re-raised.

    :param cmd: job server command as a list of arguments
    :type cmd: list[str]
    :param hostname: name of the host where supervisord is setup
    :type hostname: str
    :param job_server_directory: base directory of the job server; its
        basename is used as the supervisord program name
    :type job_server_directory: str
    :param username: username to use on hostname
    :type username: str
    :returns: contents of the job server's runstate file
    """
    program = os.path.basename(job_server_directory)
    with get_ssh_client(hostname, username) as ssh:
        log_path = get_log_location(ssh, hostname, program)
        ini_path = get_ini_file(ssh, hostname, program)
        ini_text = INI_CONTENTS.format(proc_id=program,
                                       cmd=" ".join(cmd),
                                       log=log_path)
        setup_log_dir(get_log_dir(ssh, hostname), ssh)
        write_ini_file(ini_path, ini_text, ssh)
        try:
            update_supervisord(ssh, hostname, program)
            logger.info(f"Show status of newly started program {program}")
            try:
                run_command(ssh, supv(ssh, hostname) + ["status", program])
            except RuntimeError:
                logger.info("Supervisord status command failed with an error; "
                            "attempting to tail the process logs.")
                run_command(ssh, supv(ssh, hostname) + ["tail", program])
                raise
        except BaseException:
            # cleanup the ini file on exception, so the next run doesn't pick
            # this one.
            run_command(ssh, ["rm", "-rf", ini_path])
            raise
        return get_ports_from_file(job_server_directory, ssh=ssh)
@contextmanager def _get_file_handle(ssh, log_filename): """ :param ssh: open ssh client, or None to read locally :type ssh: paramiko.SSHClient :param log_filename: path of file to open :type log_filename: str :rtype: yields readable file-like object """ if ssh: with ssh.open_sftp() as ftp: with ftp.file(log_filename, "r") as fh: yield fh else: with open(log_filename) as fh: yield fh
def get_ports_from_file(job_server_directory, ssh=None):
    """
    Read the ``runstate`` file of a job server to get the pid and ports.

    The read is retried every 2 seconds, for up to 60 seconds, while the
    file does not exist.

    :param job_server_directory: base directory of the job server; the file
        ``<job_server_directory>/runstate`` is read
    :type job_server_directory: str
    :param ssh: open ssh client, or None to read locally
    :type ssh: paramiko.SSHClient
    :returns: the JSON document stored in the runstate file
    :raises FileNotFoundError: if the file is still missing after the
        retries are exhausted
    """
    filename = job_server_directory + "/runstate"
    host = "localhost"
    if ssh:
        _, out, _ = ssh.exec_command("hostname")
        raw_host = out.read()
        # The channel returns bytes; decode so the log line shows a plain
        # host name instead of a bytes repr.
        if isinstance(raw_host, bytes):
            raw_host = raw_host.decode("utf-8", errors="replace")
        host = raw_host.strip()
    logger.info(f"Reading {filename} from {host}")

    sleep_time = 2
    waited = 0
    while True:
        try:
            with _get_file_handle(ssh, filename) as fh:
                return json.loads(fh.read())
        except FileNotFoundError:
            # wait for successful read for 60s
            if waited >= 60:
                raise
            logger.info(f"Failed to read ports from {filename} from {host}, "
                        f"retry in {sleep_time}s")
            time.sleep(sleep_time)
            waited += sleep_time
def start_server(hostname,
                 schrodinger,
                 job_server_directory,
                 username,
                 serve_queue_jobs=False):
    """
    Start a job_server in the `schrodinger` directory on the `hostname`
    provided, using the `job_server_directory` as its local storage.

    :param serve_queue_jobs: if true, the server is started under
        supervisord on `hostname`; otherwise a localhost server is started
    :returns: metadata of the started server
    :rtype: ServerInfo
    """
    if serve_queue_jobs:
        server_cmd = [
            run(schrodinger),
            job_server_exe(schrodinger),
            "--dir",
            job_server_directory,
            "--with-low-performance-db",
        ]
        runstate = monitor_job_server_with_supervisord(
            server_cmd, hostname, job_server_directory, username)
        return ServerInfo(hostname=hostname,
                          schrodinger=schrodinger,
                          job_server_directory=job_server_directory,
                          username=username,
                          pid=runstate['pid'],
                          job_server_port=runstate['job_server_port'])
    return start_localhost_server(hostname, schrodinger, job_server_directory)
def create_job_server_config(hostname, username, job_server_directory):
    """
    Copy the user_authentication that is created automatically at server
    setup from the remote server machine to the local launch host and dump it
    into a job server config file.

    The config file path is taken from the environment variable named by
    SCHRODINGER_JOBSERVER_CONFIG_FILE. Any existing entry for `hostname` is
    replaced.

    :param hostname: name of hostname where job server config is located
    :type hostname: str
    :param username: username to use on hostname
    :type username: str
    :param job_server_directory: base directory of the job server on hostname
    :type job_server_directory: str
    """
    with get_ssh_client(hostname, username) as ssh:
        with ssh.open_sftp() as ftp:
            runstate_path = posixpath.join(job_server_directory, "runstate")
            with ftp.file(runstate_path, "r") as fh:
                runstate = json.loads(fh.read())
            with ftp.file(runstate["user_certificate"], "r") as fh:
                cert_text = str(fh.read(), encoding="ascii", errors="strict")
                auth = json.loads(cert_text)
    jobport = runstate["job_server_port"]

    config_path = os.environ[SCHRODINGER_JOBSERVER_CONFIG_FILE]
    # A missing or empty file is treated as an empty list of servers.
    entries = []
    if os.path.exists(config_path) and os.path.getsize(config_path) > 0:
        with open(config_path) as fh:
            entries = json.loads(fh.read())

    # Drop any previous entry for this host before adding the new one.
    entries = [
        entry for entry in entries if hostname != entry.get("hostname", "")
    ]
    entries.append({"hostname": hostname, "jobport": jobport, "auth": auth})

    with open(config_path, "w") as fh:
        json.dump(entries, fh)
def modify_jobserver_yml(hostname,
                         job_server_directory,
                         username,
                         licensing,
                         use_shared_supervisors,
                         certs_dir=None,
                         use_ldap=True,
                         use_socket_auth=True):
    """
    Modify the jobserver config to respect licensing and web server
    certificates for a multi-user queue server.

    The original ``jobserver.yml`` is first copied to ``jobserver.yml.orig``
    and the modified configuration is written back to ``jobserver.yml``.

    :param hostname: name of hostname where job server config is located
    :type hostname: str
    :param job_server_directory: path to job server on hostname
    :type job_server_directory: str
    :param username: username to use on hostname
    :type username: str
    :param licensing: If True, enable license checking on job_server
    :type licensing: bool
    :param certs_dir: If provided, use wildcard certificates from that
        directory.
    :type certs_dir: str
    :param bool use_shared_supervisors: Use shared supervisor executables
    :param bool use_ldap: Enable LDAP authentication
    :param bool use_socket_auth: Enable unix socket authentication
    :raises RuntimeError: if the backup copy of jobserver.yml cannot be made
    """
    config_directory = LINUX_PATH.join([job_server_directory, "config"])
    jobserver_yml = LINUX_PATH.join([config_directory, "jobserver.yml"])
    jobserver_yml_orig = LINUX_PATH.join(
        [config_directory, "jobserver.yml.orig"])

    with get_ssh_client(hostname, username) as ssh:
        # run_command waits for the exit status, so the backup is complete
        # (or an error is raised) before it is read back below. A bare
        # exec_command returns without waiting for the copy to finish.
        run_command(ssh, ["cp", jobserver_yml, jobserver_yml_orig])
        with ssh.open_sftp() as ftp:
            with ftp.file(jobserver_yml_orig, "r") as fh:
                config = yaml.load(fh.read(), yaml.SafeLoader)

            config["server_mode"] = "multi-user"
            # Port 0 for all services (see setup_server: "arbitrary ports").
            config["job_server"]["port"] = 0
            config["file_store"]["port"] = 0
            config["web_server"]["port"] = 0
            config["job_server"]["use_local_auth"] = use_socket_auth

            if use_ldap:
                config["ldap_auth"]["addr"] = "ldap1.schrodinger.com:636"
                config["ldap_auth"][
                    "bind_dn_template"] = "uid={{.User}},ou=people,dc=schrodinger,dc=com"
                config["ldap_auth"]["insecure_tls_skip_verify"] = True
            else:
                # This field is set by default in jsc_admin setup-server for
                # internal multi-user deployments so needs to be explicitly
                # disabled.
                config["ldap_auth"]["addr"] = ""

            config["job_server"]["license_checking"] = bool(licensing)

            if certs_dir:
                config["web_server"]["tls"] = get_tls_config(certs_dir)

            if use_shared_supervisors:
                config["job_server"][
                    "supervisor_directory"] = "/nfs/working/builds/job_server_execs"

            if hostname.startswith("pdxgpu"):
                config["job_server"][
                    "schrodinger_hosts_hostname"] = "pdxgpusub1.schrodinger.com"

            # allow deployment of test job servers on test systems which don't
            # have enough fds
            config["file_descriptors"] = 1024

            with ftp.file(jobserver_yml, "w") as fh:
                fh.write(yaml.dump(config, default_flow_style=False))
def transfer_hosts_yml(hostname: str, username: str, job_server_directory: str,
                       local_hosts_yml_path: str):
    """
    Use the provided hosts.yml file to configure jobserver host entries,
    instead of reading from schrodinger.hosts installations.

    :param hostname: name of the host running the job server
    :param username: username to use on hostname
    :param job_server_directory: base directory of the job server on
        hostname; the file is uploaded to ``config/hosts.yml`` below it
    :param local_hosts_yml_path: path of the local hosts.yml file to upload
    """
    remote_config_dir = LINUX_PATH.join([job_server_directory, "config"])
    remote_target = LINUX_PATH.join([remote_config_dir, "hosts.yml"])
    with get_ssh_client(hostname, username) as ssh, ssh.open_sftp() as ftp:
        ftp.put(local_hosts_yml_path, remote_target)
def copy_server_log(server, destdir):
    """
    Copy jobserverd log file(s).

    Every file in the ``logs`` directory of the job server directory is
    copied into `destdir`.

    :param server: metadata of the job server
    :type server: ServerInfo
    :param destdir: local directory to copy the log files into
    :type destdir: str
    """
    if not hostname_is_local(server.hostname):
        with get_ssh_client(server.hostname, server.username) as sshclient:
            with sshclient.open_sftp() as sftp:
                remote_logs = server.job_server_directory + "/logs"
                for filename in sftp.listdir(remote_logs):
                    sftp.get(f'{remote_logs}/{filename}',
                             f'{destdir}/{filename}')
        return

    local_logs = os.path.join(server.job_server_directory, "logs")
    for entry in os.listdir(local_logs):
        shutil.copy(os.path.join(local_logs, entry), destdir)
def clean_localhost_server(server):
    """
    Clean localhost jobserver.

    Currently a no-op.

    :param server: metadata of the job server
    :type server: ServerInfo
    """
    return None
def clean_remotehost_server(ssh, server):
    """
    Clean remotehost jobserver.

    Removes the supervisord configuration and log file of the server's
    program and asks supervisord to re-read its configuration.

    :param ssh: ssh client connection established with the server's host
    :type ssh: paramiko.SSHClient
    :param server: metadata of the job server
    :type server: ServerInfo
    """
    # clean server files in supervisord
    program = os.path.basename(server.job_server_directory)
    log_path = get_log_location(ssh, server.hostname, program)
    ini_path = get_ini_file(ssh, server.hostname, program)
    for stale_file in (ini_path, log_path):
        run_command(ssh, ["rm", "-rf", stale_file])
    update_supervisord(ssh, server.hostname, program)
def cancel_active_jobs(server: ServerInfo):
    """
    Cancels all active jobs managed by the given localhost server. And waits
    for the job supervisor processes to exit before the function returns.

    :param server: metadata of the localhost job server
    :raises RuntimeError: if the jobs are not reported as gone within the
        retry window
    """

    @backoff.on_predicate(backoff.expo, max_time=128)
    def wait_for_canceled_jobs(job_env):
        # A successful "list" falls through and returns None, which makes
        # backoff retry; only the "no active jobs" failure ends the wait.
        try:
            subprocess.check_output([jsc(server.schrodinger), "list"],
                                    stderr=subprocess.STDOUT,
                                    env=job_env)
        except subprocess.CalledProcessError as e:
            if 'No active jobs were found' not in str(e.output):
                raise
            # This is to allow job_supervisor(s) to wrap up after its
            # respective job is marked as completed.
            time.sleep(120)
            return True

    env = os.environ.copy()
    env["SCHRODINGER_LOCALHOST_JOBSERVER_DIRECTORY"] = server.job_server_directory
    env["SCHRODINGER_JOBSERVER_CONFIG_FILE"] = "not-existing"
    cancel_output = subprocess.check_output(
        [jsc(server.schrodinger), "cancel", "all"], env=env)
    canceled = re.findall("canceling job .*", str(cancel_output))

    # If there are active jobs that we initiated kill request, wait for the
    # jobs to cancel before we return.
    if canceled and not wait_for_canceled_jobs(env):
        raise RuntimeError(
            f"timed out in canceling localhost jobs managed by the server {server}"
        )
def kill_server(server: ServerInfo, cancel_jobs: Optional[bool] = False):
    """
    Kill the job_server on `server.hostname` that is using
    `server.job_server_directory` as its local storage.

    :param server: The metadata maintained about the managed jobserver.
    :param cancel_jobs: cancel the active jobs before killing the jobserver
        (only consulted for a localhost server).
    """
    if not hostname_is_local(server.hostname):
        with get_ssh_client(server.hostname, server.username) as ssh:
            program = os.path.basename(server.job_server_directory)
            run_command(ssh, supv(ssh, server.hostname) + ["stop", program])
            clean_remotehost_server(ssh, server)
        return

    if cancel_jobs:
        cancel_active_jobs(server)
    stop_localhost_server(server)
    if sys.platform != 'win32':
        clean_localhost_server(server)
def clean_server_dir(server):
    """
    Remove `job_server_directory` on `hostname`.

    :param server: metadata of the job server
    :type server: ServerInfo
    :raises OSError: if the local directory cannot be removed; running job
        processes are logged before the error is re-raised
    """
    if not hostname_is_local(server.hostname):
        remove_cmd = ["rm", "-r", server.job_server_directory]
        with get_ssh_client(server.hostname, server.username) as ssh:
            run_command(ssh, remove_cmd)
        return

    try:
        # NOTE (JOBCON-7529): force_rmtree retries removing files for up to
        # 1s. We may need to increase this timeout.
        fileutils.force_rmtree(server.job_server_directory)
    except OSError:
        # Log all job processes for now; we can try to limit this to
        # processes started from the job server directory if this ends up
        # being too spammy.
        log_job_processes()
        raise
def log_job_processes():
    """
    Log information about running jobserverd/job_supervisor/job_supervisord
    processes, which may be preventing removal of the job server directory.
    """
    job_exes = ("jobserverd", "job_supervisor", "job_supervisord")
    # There may be multiple job_supervisor processes running because
    # of parallel STU test execution; print them all.
    found = False
    # Fetch the name through process_iter so that processes which vanish or
    # deny access during the scan do not raise; their attribute values are
    # None instead.
    for proc in psutil.process_iter(attrs=["name", "cmdline"]):
        name = (proc.info["name"] or "").lower()
        if any(exe in name for exe in job_exes):
            logger.info(
                f"found {name} running with command line {proc.info['cmdline']}"
            )
            found = True
    if not found:
        logger.info("did not find any running job server/supervisor processes")
def update_supervisord(ssh: paramiko.client.SSHClient, hostname: str,
                       proc_id: str):
    """
    Re-read the configuration file corresponding to the given process group
    name and restarts the supervised program if the configuration has changed
    on disk.

    :param ssh: ssh client connection established with the hostname.
    :param hostname: name of the host where supervisord is setup to monitor
        the given supervised program.
    :param proc_id: group name of the supervised program.
    """
    update_cmd = supv(ssh, hostname) + ["update", proc_id]
    run_command(ssh, update_cmd)
def start_localhost_server(hostname, schrodinger, job_server_directory):
    """
    Start a job server on the local machine via ``jsc local-server-start``.

    :param hostname: host name to record in the returned metadata
    :param schrodinger: path to SCHRODINGER
    :param job_server_directory: base directory for job server
    :returns: metadata of the started server
    :rtype: ServerInfo
    """
    start_cmd = [
        jsc(schrodinger), "local-server-start", "-dir", job_server_directory
    ]
    run_command(None, start_cmd)
    local_user = get_username_from_host_entry("localhost")
    runstate = get_ports_from_file(job_server_directory, ssh=None)
    return ServerInfo(
        hostname=hostname,
        schrodinger=schrodinger,
        job_server_directory=job_server_directory,
        username=local_user,
        pid=runstate['pid'],
        job_server_port=runstate['job_server_port'],
    )
def stop_localhost_server(server):
    """
    Stop the jobserver running in localhost.

    :param server: metadata of the job server
    :type server: ServerInfo
    """
    stop_cmd = [
        jsc(server.schrodinger), "local-server-stop", "-dir",
        server.job_server_directory
    ]
    run_command(None, stop_cmd)
@contextmanager
def _get_temp_ssh_key():
    """
    Yield the path of a temporary ssh key file, or None.

    On platforms other than win32 None is yielded. On win32 the ppk key
    found by sshconfig is converted into a temporary file, whose path is
    yielded; the file is removed afterwards.
    """
    if sys.platform != "win32":
        yield None
        return

    # Only the file name is needed; the handle is closed right away.
    with tempfile.NamedTemporaryFile(delete=False) as placeholder:
        key_path = placeholder.name
    try:
        ppk_path, _ = sshconfig.find_key_pair()
        sshconfig._convert_ppk_openssh(ppk_path, key_path)
        yield key_path
    finally:
        os.remove(key_path)
@contextmanager
def get_ssh_client(hostname, username):
    """
    Return ssh client for hostname. Closes ssh connection automatically,
    also when the body of the ``with`` block (or the connection attempt)
    raises.

    For a local hostname no connection is made and None is yielded.

    :param hostname: name of remote host
    :type hostname: str
    :param username: username to connect as
    :type username: str
    """
    if hostname_is_local(hostname):
        yield
        return

    ssh = paramiko.SSHClient()
    try:
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        # The temporary key file is only needed while connecting.
        with _get_temp_ssh_key() as key_filename:
            logger.info(f"Connecting to {username}@{hostname}")
            ssh.connect(hostname, username=username, key_filename=key_filename)
        yield ssh
    finally:
        logger.info(f"Disconnecting from {username}@{hostname}")
        ssh.close()
def get_username_from_host_entry(host_entry_name):
    """
    Return username from host_entry_name to find the correct remote user.
    This user name is useful for directory creation, and remote
    authentication.

    The user configured on the host entry wins; otherwise the current login
    name is used, with anything up to the last "+" removed.

    :param host_entry_name: name of host entry (localhost, bolt-gpu)
    :type host_entry_name: str
    :rtype: str
    """
    configured_user = jobcontrol.get_host(host_entry_name).user
    if configured_user:
        return configured_user
    # msys2 username on domain looks like "<domain>+<user>"; rpartition
    # returns the whole name unchanged when there is no "+".
    return getpass.getuser().rpartition("+")[2]
@contextmanager
def schrodinger_jobserver_config():
    """
    Set job server configuration to a temporary location. Cleans up the file
    after use.

    While the context is active, the environment variable named by
    SCHRODINGER_JOBSERVER_CONFIG_FILE points at a freshly created empty
    temporary file.

    This function is duplicated in server_management.py for backwards
    compatibility.
    """
    fd, config_path = tempfile.mkstemp(prefix="")
    os.close(fd)
    override = {SCHRODINGER_JOBSERVER_CONFIG_FILE: config_path}
    with patch.dict(os.environ, override):
        try:
            yield
        finally:
            os.remove(config_path)