Source code for schrodinger.utils.installation_check

import argparse
import difflib
import logging
import os
import re
import shutil
import socket
import sys
import tempfile
import time
from contextlib import contextmanager
from typing import List
from typing import Optional
from typing import TextIO

import distro

import pymmlibs

pymmlibs.mmerr_set_mmlibs()
# isort: split

from schrodinger.job import jobcontrol
from schrodinger.job import queue

from . import log
from . import mmutil
from . import subprocess
from .env import swap_ld_library_path

# Root of the Schrodinger installation, read from the environment. Importing
# this module raises KeyError if the SCHRODINGER variable is not set.
SCHRODINGER = os.environ["SCHRODINGER"]
# Directory under the installation that holds one subdirectory per queue.
QUEUES_DIR = os.path.join(SCHRODINGER, "queues")
# Suffix appended to executable names; ".exe" on Windows, empty elsewhere.
EXE = ""
if sys.platform == "win32":
    EXE = ".exe"

# TODO: Probably add a context manager to set up logging and remove it on exception
logger = log.get_output_logger("inst_check")

# Templates for the warnings reported when an expected queue directory or
# file is missing (see queue_config_file_pairs()).
dir_warning_template = "* WARNING: directory '{}' does not exist\n"
file_warning_template = "* WARNING: file '{}' does not exist\n"

# True while ensure_logfile() has its file handler attached to LOGGERS.
LOGFILE_CONFIGURED = False
# Loggers that _add_handler()/_remove_handler() operate on.
LOGGERS = (logger, queue.logger, queue.syslog)


def _add_handler(handler):
    """
    Attach `handler` to every logger listed in LOGGERS.

    :param handler: the logging handler to attach
    """
    for target in LOGGERS:
        target.addHandler(handler)


def _remove_handler(handler):
    """
    Detach `handler` from every logger listed in LOGGERS, then close it.

    :param handler: the logging handler to detach and close
    """
    for target in LOGGERS:
        target.removeHandler(handler)
    # Close only after no logger references the handler any more.
    handler.close()


@contextmanager
def ensure_logfile(output_dir=None):
    """
    Context manager for adding a logfile to LOGGERS.

    Nested entries are no-ops: only the outermost entry attaches the
    "inst-check.log" file handler, and only that entry removes it on exit.

    :param output_dir: directory in which "inst-check.log" is created;
        defaults to the current working directory
    """
    global LOGFILE_CONFIGURED

    if LOGFILE_CONFIGURED:
        # An enclosing context already owns the logfile handler.
        yield
        return

    log_dir = os.getcwd() if output_dir is None else output_dir
    file_handler = logging.FileHandler(os.path.join(log_dir, "inst-check.log"))
    LOGFILE_CONFIGURED = True
    _add_handler(file_handler)
    try:
        yield
    finally:
        _remove_handler(file_handler)
        LOGFILE_CONFIGURED = False
def main() -> int:
    """
    Run installation check.

    A new ``instcheck-<shorthost>-*`` directory is created under the current
    working directory. The log file, the diagnostic command output and, when
    host entries are selected, the test job output and the postmortem log
    are gathered there. The directory is finally archived as
    ``<directory>.zip``.

    :return: exit code (the postmortem return code, or 0 when no test jobs
        were run)
    """
    # In test jobs, the waiting for a reply from jserver (in particular, to a JPROXYPORT message)
    # is 5 minutes. Set the test job timeout to a bigger value, to make sure we collect the output.
    test_job_timeout = 360

    args = parse_args()

    shorthost = socket.gethostname().split(".")[0]
    cwd = os.getcwd()
    abs_inst_check_dir = tempfile.mkdtemp(prefix=f'instcheck-{shorthost}-',
                                          dir=cwd)
    # The relative form is used for chdir and for naming the archive.
    inst_check_dir = os.path.relpath(abs_inst_check_dir, cwd)
    logger.info(
        f"Results will be gathered in the directory {abs_inst_check_dir}")

    with ensure_logfile(abs_inst_check_dir):
        log_banner(logger)
        check_running_as_root(logger)
        try:
            test_pyqt()
        except RuntimeError as e:
            # A PyQt problem is reported but does not abort the check.
            logger.warning(f"\nWARNING: {e}\n")

        with cd(inst_check_dir):
            run_installation_check()

        selected_entries = get_host_entries(args)
        # TODO: Handle the exception which will be raised if the host file
        # contains an error.

        # For the run_test_jobs() call, we need to be in the
        # current directory, not in inst_check_dir, because run_test_jobs()
        # will call jobcontrol.get_hostfile(), and we need it to give us the
        # same host file that we are using.
        # TODO: Try to handle this better (also to make sure the host file
        # never gets overwritten)
        return_code = 0
        if selected_entries:
            jobids = run_test_jobs(selected_entries,
                                   launch_dir=inst_check_dir,
                                   timeout=test_job_timeout)
            with cd(inst_check_dir):
                return_code = run_postmortem(jobids)

        logger.info("* Archiving results")
        shutil.make_archive(inst_check_dir, 'zip', '.', inst_check_dir)
        logger.info(f"Archive: {inst_check_dir}.zip")
    return return_code
@contextmanager
def cd(path):
    """
    Context manager that changes the working directory to `path` and
    restores the previous working directory on exit, even on error.

    :param path: directory to change into
    """
    previous_dir = os.getcwd()
    os.chdir(path)
    try:
        yield
    finally:
        os.chdir(previous_dir)
def get_host_entries(args):
    """
    Select the host entries (jobcontrol.Host objects) to run test jobs on.

    With -nojobs nothing is selected. Without any -test arguments every
    entry returned by jobcontrol.get_hosts() is selected. Otherwise only the
    entries named by -test are selected, in the order given; a warning is
    logged for each name that is absent from the hosts file.

    :param args: parsed command line arguments (uses `nojobs` and `test`)
    :return: list of selected host entries
    """
    if args.nojobs:
        return []

    available = jobcontrol.get_hosts()
    requested_names = args.test
    if not requested_names:
        return available

    by_name = {host.name: host for host in available}
    chosen = []
    for wanted in requested_names:
        if wanted in by_name:
            chosen.append(by_name[wanted])
        else:
            logger.warning(
                f"WARNING: There is no entry '{wanted}' in your hosts file")
    return chosen
def run_test_jobs(entries, launch_dir=".", duration=10, timeout=None):
    """
    Run one testapp job per host entry (jobcontrol.Host objects).

    :param entries: host entries to test
    :param launch_dir: base directory the jobs are launched from
    :param duration: duration passed to each TestJob
    :param timeout: timeout passed to each TestJob
    :return: list of all jobids, succeeded and failed, that were run
    """
    # Need to make sure that jobs, which will be run from launch_dir,
    # will use the same host file as the one that the top-level process gets.
    # Therefore, copy the file into launch_dir, to make sure jobs use it.
    # (NB: If we abolish the non-default host file, this step will no longer be needed.)
    if launch_dir != ".":
        hosts_copy = os.path.join(launch_dir, "schrodinger.hosts")
        shutil.copy(jobcontrol.get_hostfile(), hosts_copy)

    # TestJobDJ forms the right command line for each entry; a TestJob with
    # no explicit dir runs in the subdirectory launch_dir/<host>.
    jobdj = TestJobDJ(entries, basedir=launch_dir, verbosity="normal")
    job_count = len(entries)
    for _ in range(job_count):
        jobdj.addJob(TestJob(duration=duration, timeout=timeout))

    try:
        jobdj.run()
    except Exception:
        # Best effort: still report whatever jobids were collected.
        logger.exception("ERROR: running test jobs failed")
    return jobdj.completedJobids()
def get_postmortem_command(jobids: List[str]) -> List[str]:
    """
    Build the postmortem command line for the given jobids.

    The "jsc postmortem" form is used when the JOB_SERVER feature flag is
    enabled, the plain "postmortem" form otherwise.

    :param jobids: jobids to append to the command
    :return: the command as an argument list
    """
    if mmutil.feature_flag_is_enabled(mmutil.JOB_SERVER):
        base_cmd = ["jsc", "postmortem", "-without-redaction"]
    else:
        base_cmd = ["postmortem", "-suppressinstcheck"]
    return base_cmd + jobids
def run_postmortem(jobids):
    """
    Run postmortem for the given jobids, writing its combined stdout and
    stderr to "postmortem.log" in the current directory.

    :param jobids: jobids to pass to the postmortem command
    :return: return code of the postmortem command
    """
    logger.info("* Running postmortem")
    cmd = get_postmortem_command(jobids)
    with open("postmortem.log", 'w') as logf:
        proc = subprocess.run(cmd, stderr=subprocess.STDOUT, stdout=logf)

    exit_status = proc.returncode
    if exit_status:
        logger.warning(
            f'postmortem {cmd} failed with return code: {exit_status}')
    return exit_status
def test_pyqt():
    """
    Determine if we have correct libraries to load PyQt.

    The check only runs on Linux. It imports schrodinger.Qt.QtGui in a
    python3 subprocess and returns silently when that succeeds.

    :raises RuntimeError: if the import fails; the message suggests how to
        fix the problem for the detected distribution and, when no specific
        suggestion applies, includes the output of the failed import.
    """
    if not sys.platform.startswith("linux"):
        return

    try:
        subprocess.check_output(
            ["python3", "-c", "import schrodinger.Qt.QtGui"],
            stderr=subprocess.STDOUT,
            universal_newlines=True)
    except subprocess.CalledProcessError as e:
        output = e.output
    else:
        return

    epilog = "\nFailure to install these libraries may result in random segfaults, even on a server system that otherwise does not need X11."

    # distro.id() returns the distribution id as a plain string.
    dist = distro.id()
    debian_like = dist in ("debian", "ubuntu")

    if debian_like and "libfontconfig.so.1" in output:
        msg = "Please 'apt-get install fontconfig' to use Schrodinger software."
    elif dist in ("redhat", "centos"):
        # Always advised on these distributions, whatever the output says
        # (the original condition was short-circuited with `True or`).
        msg = "Please 'yum install fontconfig libX11' to use Schrodinger software."
    else:
        # No specific hint applies: give the generic guidance and show the
        # captured output so the actual failure is visible.
        msg = ""
        if not debian_like:
            msg = f"Your OS {dist} is unrecognized. "
        msg += "Please install libX11 and/or libfontconfig packages to make the following command work: \n\n"
        msg += "$SCHRODINGER/run python3 -c 'import schrodinger.Qt.QtGui' to work.\n\n"
        msg += "If you are having trouble, please contact support at "
        msg += "https://www.schrodinger.com/supportcenter with "
        msg += "output of this script. "
        msg += "\n\nOutput:\n" + output
    raise RuntimeError(msg + epilog)
def get_installation_check_commands():
    """
    Build the map of diagnostic commands to run.

    :return: dict mapping a command description (also used as the base name
        of its log file) to the argument list of the command. Some entries
        are added only on specific platforms.
    """
    utilities_dir = os.path.join(SCHRODINGER, "utilities")
    licutil = os.path.join(utilities_dir, "licutil" + EXE)

    cmd_map = {}
    cmd_map["gfxinfo"] = [os.path.join(SCHRODINGER, "gfxinfo" + EXE)]
    cmd_map["machid"] = [os.path.join(SCHRODINGER, "machid" + EXE)]
    cmd_map["toplevel"] = ["ls", "-lL", SCHRODINGER, utilities_dir]
    cmd_map["license-tokens"] = [licutil, "-list"]
    cmd_map["license-jobs"] = [licutil, "-jobs"]
    cmd_map["licadmin-info"] = [
        os.path.join(SCHRODINGER, "licadmin" + EXE), "info"
    ]
    cmd_map["env"] = ["env"]
    cmd_map["nvidia-smi"] = ["nvidia-smi", "-q"]
    cmd_map["query-gpgpu"] = [os.path.join(utilities_dir, "query_gpgpu"), "-a"]

    platform_name = sys.platform
    on_windows = platform_name == "win32"
    if on_windows:
        cmd_map["dll-checker"] = [
            os.path.join(SCHRODINGER, "tools", "dll_checker_util.exe")
        ]
    elif platform_name.startswith("darwin"):
        cmd_map["sysinfo"] = [
            "system_profiler", "SPSoftwareDataType", "SPEthernetDataType",
            "SPNetworkDataType"
        ]
    elif platform_name.startswith("linux"):
        cmd_map["quota"] = ["quota"]
        cmd_map["sestatus"] = ["sestatus", "-v", "-b"]

    if not on_windows:
        cmd_map["ulimit"] = ["bash", "-c", "ulimit -a"]

    return cmd_map
def _run_command(cmd: List[str], cmd_output: TextIO = sys.stdout, log_failure: bool = True, timeout: int = 1200) -> Optional[int]: """ Runs the given command, writing combined stdout and stderr to the given cmd_output. :param cmd_output: The file where command output will be written. :param log_failure: If set to True, errors will be logged using installation_check's logger.info. Otherwise, they will be printed to the cmd_output. :param timeout: number of secs to run commands :return: return code of the command if it ran successfully; otherwise returns None. """ cmdstr = subprocess.list2cmdline(cmd) try: proc = subprocess.run(cmd, stdout=cmd_output, stderr=subprocess.STDOUT, shell=False, timeout=timeout) except (OSError, ValueError, subprocess.TimeoutExpired) as e: msg = f"Failed to run cmd '{cmdstr}': '{e}'" if log_failure: logger.info(f"- {msg}") else: print(f"({msg})", file=cmd_output) return None return proc.returncode
def run_installation_check(output_dir="."):
    """
    Run the diagnostic commands, writing each command's output to
    "<description>.log" in `output_dir`, then record the queue config diffs.

    Note: licadmin will write the license - info file in the current
    directory, not in output_dir. We may want to remove the output_dir
    argument and leave it to the caller to cd to the correct directory

    :param output_dir: directory receiving the log files; created if missing
    """
    cmd_map = get_installation_check_commands()
    with ensure_logfile():
        if not os.path.exists(output_dir):
            os.makedirs(output_dir)

        for description, cmd in cmd_map.items():
            log_name = description + ".log"
            cmdstr = subprocess.list2cmdline(cmd)
            with open(os.path.join(output_dir, log_name), "w") as fh:
                errcode = _run_command(cmd, cmd_output=fh)

            if errcode is None:
                # _run_command has already reported the failure.
                continue
            if errcode:
                logger.info(f"- Command '{cmdstr} > {log_name}' "
                            f"failed with error code {errcode}")
            else:
                logger.info(f"* saved output of '{cmdstr}' "
                            f"in file '{log_name}'")

        # log queue file diffs
        queue_diffs = queue_config_diffs()
        if not queue_diffs:
            logger.info("* queue config files do not differ "
                        "from their original versions")
        else:
            queue_diff_log = os.path.join(output_dir, "queue-diff.log")
            logger.info(f"* saved queue config file "
                        f"diffs in file '{queue_diff_log}'")
            with open(queue_diff_log, 'w') as fh:
                fh.writelines(queue_diffs)
def queue_inst_file(queue_name, filename):
    """
    Return the path of `filename` in the installed directory of the queue
    `queue_name` under QUEUES_DIR.
    """
    return os.path.join(QUEUES_DIR, queue_name, filename)
def queue_orig_dir(queue_name):
    """
    Return the path of the 'orig' subdirectory of the queue `queue_name`
    under QUEUES_DIR.
    """
    queue_dir = os.path.join(QUEUES_DIR, queue_name)
    return os.path.join(queue_dir, 'orig')
def queue_orig_file(queue_name, filename):
    """
    Return the path of `filename` in the 'orig' subdirectory of the queue
    `queue_name`.
    """
    orig_dir = queue_orig_dir(queue_name)
    return os.path.join(orig_dir, filename)
def queue_config_diffs():
    """
    Collect the warnings and diffs for the config and template queue files
    (original version from the installation vs what's in the user's queues
    dir).

    :return: list of strings: the warnings from queue_config_file_pairs()
        followed by the diff lines of every file pair
    """
    pairs, report = queue_config_file_pairs()
    # `report` starts as the warnings list; the diff lines are appended to it.
    for inst_filepath, orig_filepath in pairs:
        report.extend(queue_config_files_diff(inst_filepath, orig_filepath))
    return report
def queue_config_file_pairs():
    """
    Return a list of pairs of config/template queue files to be compared,
    and a list of warnings.

    A pair is ``[installed_file, original_file]``: one file from the user's
    installation and the original version of the same file (from the 'orig'
    subdirectory). A warning is produced when the queues directory itself
    is missing, whenever a queue's 'orig' subdirectory is not found, or when
    a user's or original file that should be present is not.

    :return: tuple ``(pairs, warnings)``
    """
    config_files = ['config', 'template.sh']
    pairs = []
    warnings = []

    # A missing queues directory is reported, not allowed to crash the
    # diagnostic run.
    if not os.path.isdir(QUEUES_DIR):
        warnings.append(dir_warning_template.format(QUEUES_DIR))
        return pairs, warnings

    # os.listdir() order is arbitrary; sort so the report is reproducible.
    for queue_name in sorted(os.listdir(QUEUES_DIR)):
        orig_dir = queue_orig_dir(queue_name)
        if not os.path.isdir(orig_dir):
            warnings.append(dir_warning_template.format(orig_dir))
            continue
        for filename in config_files:
            inst_filepath = queue_inst_file(queue_name, filename)
            orig_filepath = queue_orig_file(queue_name, filename)
            if not os.path.isfile(inst_filepath):
                warnings.append(file_warning_template.format(inst_filepath))
                continue
            if not os.path.isfile(orig_filepath):
                warnings.append(file_warning_template.format(orig_filepath))
                continue
            pairs.append([inst_filepath, orig_filepath])
    return pairs, warnings
def queue_config_files_diff(inst_filepath, orig_filepath):
    """
    Return a context diff (list of strings) of two config/template queue
    files, going from the original file to the installed one.

    :param inst_filepath: path of the file in the user's installation
    :param orig_filepath: path of the original version of the file
    :return: the diff lines; empty if the files do not differ
    """
    with open(inst_filepath) as inst_fh, open(orig_filepath) as orig_fh:
        installed_lines = inst_fh.readlines()
        original_lines = orig_fh.readlines()
    diff_lines = difflib.context_diff(original_lines,
                                      installed_lines,
                                      fromfile=orig_filepath,
                                      tofile=inst_filepath)
    return list(diff_lines)
def log_banner(logger):
    """
    Log the banner: title, the SCHRODINGER directory and the absolute path
    of the hosts file in use.

    :param logger: logger the banner is written to
    """
    heading = (
        "Schrodinger installation check",
        "------------------------------",
        f"SCHRODINGER: {SCHRODINGER}",
    )
    for text in heading:
        logger.info(text)
    hosts_file = os.path.abspath(jobcontrol.get_hostfile())
    logger.info("Hosts file: " + hosts_file)
def check_running_as_root(logger):
    """
    Log a warning if the effective user is root. Nothing is checked on
    Windows.

    :param logger: logger the warning is written to
    """
    if sys.platform == 'win32':
        return
    if os.geteuid() != 0:
        return

    warn = (
        "WARNING: You are running the installation check utility as 'root'. "
        "You should avoid doing that. Running as root may not reveal some of "
        "the problems you will encounter as an ordinary user, or, on the contrary, "
        "report problems (e.g., with passwordless ssh) that you will *not* encounter "
        "as an ordinary user.")
    logger.warning(warn)
def parse_args(args=None):
    """
    Parse cmdline arguments.

    The options -testall, -test and -nojobs are mutually exclusive.

    :param args: argument list to parse; None lets argparse read sys.argv
    :return: the parsed argparse.Namespace
    """
    cli = argparse.ArgumentParser(prog="installation_check")
    job_selection = cli.add_mutually_exclusive_group()
    job_selection.add_argument(
        '-testall',
        action='store_true',
        help='Run tests on all host entries (default)')
    job_selection.add_argument(
        '-test',
        action='append',
        help='Run test on a specific host entry (can be used multiple times)')
    job_selection.add_argument('-nojobs',
                               action='store_true',
                               help='Do not run any jobs')
    return cli.parse_args(args=args)
class TestJobDJ(queue.JobDJ):
    """
    A flavor of JobDJ that takes a list of host entries (jobcontrol.Host
    objects) and lets the caller schedule exactly one job per entry.
    Create an instance of this class, add the jobs, and call run().
    """

    def __init__(self, entries, basedir=None, *args, **kwargs):
        """
        :param entries: host entries (jobcontrol.Host objects) to run on
        :param basedir: base directory for the jobs; defaults to the
            current working directory
        """
        # One slot per entry name.
        host_slots = [(entry.name, 1) for entry in entries]
        super().__init__(*args, hosts=host_slots, **kwargs)
        self._basedir = basedir or os.getcwd()
        self._entry_by_name = {entry.name: entry for entry in entries}
        self.max_retries = 0
        self.max_failures = queue.NOLIMIT
        self.disableSmartDistribution()

    def basedir(self):
        """
        Return the base directory given at construction time.
        """
        return self._basedir

    def entryByName(self, name):
        """
        Return the host entry registered under `name` (KeyError if absent).
        """
        return self._entry_by_name[name]

    def _availableHost(self, *args, **kwargs):
        """
        After getting an available host, delete it from the host pool.
        This way, no more than one job per host will be scheduled.
        """
        picked = super()._availableHost(*args, **kwargs)
        del self._hosts[picked]
        return picked

    def _checkSubmitted(self, job):
        """
        We need to disable rescheduling queue jobs to non-queue hosts,
        which the original JobDJ does in this method
        """
        pass

    def completedJobids(self):
        """
        The jobids of all jobs that have completed (with success or failure)
        """
        collected = [finished._job_id for finished in self.done_jobs]
        for failed in self.failed_jobs:
            # if there ever was a jobid, it has been recorded in .launch_error
            error_lines = str(failed.launch_error).splitlines()
            hits = map(jobcontrol.jobid_re.search, error_lines)
            collected.extend(hit.group(1) for hit in hits if hit)
        return collected
class TestJob(queue.JobControlJob):
    """
    A flavor of JobControlJob to be used with TestJobDJ. Its command line
    is formed by doCommand() once the host is known, and the job is named
    after the host entry that it gets to run on.
    """

    def __init__(self, command_dir=None, duration=10, timeout=None):
        """
        :type command_dir: str
        :param command_dir: The launch directory of the job

        :type duration: int
        :param duration: The duration of the (testapp) job

        :type timeout: int
        :param timeout: Timeout (in seconds) after which the job will be
            killed. If None, the job is allowed to run indefinitely.
        """
        # The command is not known yet, so None is passed for it.
        super().__init__(None, command_dir=command_dir, timeout=timeout)
        self._duration = duration
        self._jobdj = self.getJobDJ()

    def doCommand(self, host, *args, **kwargs):
        """
        Form the command for the entry named `host` and launch it. Any
        failure while doing so is logged, not propagated.
        """
        host_entry = self._jobdj.entryByName(host)
        try:
            self._command = self.formCommand(host_entry)
            super().doCommand(host, *args, **kwargs)
        except Exception:
            logger.exception(f"ERROR: running test job on host {host} failed")

    def run(self, host, *args, **kwargs):
        """
        Run the job from the subdirectory <basedir>/<host>.
        """
        base = self._jobdj.basedir()
        self._command_dir = os.path.join(base, host)
        return super().run(host, *args, **kwargs)

    def formCommand(self, entry):
        """
        Generate a command line appropriate for a given host entry

        :param entry: host entry; `name` and `queue` are read from it
        :return: the command as an argument list
        """
        uses_queue = entry.queue
        program = "para_testapp" if uses_queue else "testapp"
        command = [program, "-j", entry.name, "-t", str(self._duration)]
        if uses_queue:
            command += ["-n", "2"]
        command.append("--print-gpu-info")
        return command
def find_user_libstdcpp():
    """
    Returns path to system libstdc++.so.6 (or default libstdc++.so.6 before
    SCHRODINGER environment was found).

    The path is taken from the `ldd` output for testapp_backend, run inside
    swap_ld_library_path().

    :raises RuntimeError: if no libstdc++.so.6 line is found in the output
    """
    LIB = "libstdc++.so.6"
    cpp_program = os.path.join(os.environ['SCHRODINGER_EXEC'],
                               "testapp_backend")
    with swap_ld_library_path():
        completed = subprocess.run(["ldd", cpp_program],
                                   check=True,
                                   universal_newlines=True,
                                   stdout=subprocess.PIPE,
                                   stderr=subprocess.STDOUT)
        ldd_output = completed.stdout

    for raw_line in ldd_output.splitlines():
        entry = raw_line.strip()
        if not entry.startswith(LIB):
            continue
        # The third space-separated field of the matching line is returned.
        return entry.split(" ")[2]
    raise RuntimeError(f"Could not find {LIB} in {ldd_output}")
def get_libstdcpp_version(library):
    """
    Returns the highest GLIBCXX minor version found in a given library.

    :param str library: pathname to a libstdc++.so.6
    :return: the largest N among all "GLIBCXX_3.4.N" strings in the file
    :rtype: int
    :raises RuntimeError: if the file contains no "GLIBCXX_3.4.N" string
    """
    with open(library, "rb") as fh:
        text = fh.read()

    # GLIBCXX API has been 3.4 since gcc-3.4, with only minor
    # version changes https://gcc.gnu.org/onlinedocs/libstdc++/manual/abi.html
    versions = [
        int(match.group(1))
        for match in re.finditer(rb"GLIBCXX_3\.4\.([0-9]+)", text)
    ]
    if not versions:
        raise RuntimeError(f"Could not find GLIBCXX version from {library}")
    return max(versions)
def get_bundled_libstdcpp():
    """
    Return the normalized path of the bundled libstdc++.so.6, located at
    ../../lib/Linux-x86_64 relative to the MMSHARE_EXEC directory.

    :raises KeyError: if MMSHARE_EXEC is not set in the environment
    """
    mmshare_exec = os.environ["MMSHARE_EXEC"]
    bundled = os.path.join(mmshare_exec, os.pardir, os.pardir, "lib",
                           "Linux-x86_64", "libstdc++.so.6")
    return os.path.normpath(bundled)