Source code for schrodinger.test.stu.process_management

"""
Manage processes started by STU. This includes killing and printing
status information.
"""

import datetime
import os
import subprocess

import psutil

from schrodinger.infra import mmjob


def format_process_status(process):
    """
    Build a full usage/status message about a process.

    The message lists the memory, CPU and process-count figures returned by
    `process_status` for ``process`` next to the machine-wide totals,
    followed by the process tree produced by `format_process_tree`.

    :param process: the process to report on
    :type process: psutil.Process

    :return: the multi-line status message
    :rtype: str
    """
    status = process_status(process)
    # The psutil figures are already percentages (12.5 means 12.5%), so they
    # are rendered with the "f" presentation type plus a literal percent
    # sign. The "%" presentation type would multiply them by 100 again.
    msg = (" memory: {process_memory:.2f}%, {total_memory:.2f}% total\n"
           " cpu: {process_cpu:.2f}%, {total_cpu:.2f}% total\n"
           " processes: {process_processes}, {total_processes}\n"
           "process tree:\n").format(**status)
    msg += format_process_tree(process)
    return msg
def process_status(process):
    """
    Collect resource usage figures for a process and its descendants.

    The memory and CPU percentages of ``process`` and of its descendants are
    summed up. A descendant whose figures cannot be read (it no longer
    exists, or access is denied) is left out of the process count.

    :param process: root of the process tree to inspect
    :type process: psutil.Process

    :return: dict with the keys ``process_memory``, ``process_cpu`` and
        ``process_processes`` for the tree, and ``total_memory``,
        ``total_cpu`` and ``total_processes`` for the whole machine
    :rtype: dict
    """
    memory = process.memory_percent()
    cpu = process.cpu_percent()
    count = 1
    descendants = process.children(recursive=True)
    for descendant in descendants:
        try:
            memory += descendant.memory_percent()
            cpu += descendant.cpu_percent()
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            # The descendant went away or may not be inspected.
            pass
        else:
            count += 1
    return {
        'process_memory': memory,
        'total_memory': psutil.virtual_memory().percent,
        'process_cpu': cpu,
        'total_cpu': psutil.cpu_percent(),
        'process_processes': count,
        'total_processes': len(psutil.pids()),
    }
def _format_psutil_process(process):
    """
    Pretty-print a psutil.Process object.

    :param process: the process to describe
    :type process: psutil.Process

    :return: a single line holding the pid, name, start time, command line
        and status of the process
    :rtype: str
    """
    # Read the command line once and reuse it: every call queries the
    # process again, and the process may exit between two calls.
    cmdline = process.cmdline()
    cmd = subprocess.list2cmdline(cmdline)
    date = datetime.datetime.fromtimestamp(process.create_time())
    name = process.name()
    # For an interpreter, the script it runs says more than the interpreter
    # name alone, so append the script's file name.
    if name in ('perl', 'python') and len(cmdline) > 1:
        name += ' ' + os.path.basename(cmdline[1])
    return (f'{process.pid} [{name}] started {date}: {cmd} '
            f'(status: {process.status()})')
def format_process_tree(process, maxdepth=100, ignore_zombies=False):
    """
    Render the process tree starting at ``process`` as one string.

    :param process: root of the tree to render
    :param maxdepth: depth limit handed on to `process_tree_lines`
    :param ignore_zombies: handed on to `process_tree_lines`

    :return: the lines yielded by `process_tree_lines`, joined by newlines
    :rtype: str
    """
    tree_lines = process_tree_lines(process,
                                    maxdepth,
                                    ignore_zombies=ignore_zombies)
    return '\n'.join(tree_lines)
def process_tree_lines(process, maxdepth=100, level=0, ignore_zombies=False):
    """
    Recursively yield one text line per process of the tree starting at
    ``process``.

    Each line is prefixed with one ``'^ '`` per nesting level. A process
    that cannot be described (it no longer exists or access is denied) is
    reported as "process exiting <pid>" and its children are not visited.

    :param process: root of the (sub)tree to describe
    :type process: psutil.Process
    :param maxdepth: children are not visited once ``level`` exceeds this
        value. The check runs after the process's own line has been
        produced, so lines at level ``maxdepth + 1`` still appear.
    :param level: nesting level of ``process``; used for the line prefix
    :param ignore_zombies: if True, skip processes whose status is 'zombie'
        (together with their children) and stay silent about processes that
        cannot be described

    :return: generator of str
    """
    prefix = '^ ' * level
    try:
        if ignore_zombies and process.status() == 'zombie':
            return
        yield prefix + _format_psutil_process(process)
    except (psutil.NoSuchProcess, psutil.AccessDenied):
        # Process is exiting
        if not ignore_zombies:
            yield prefix + f'process exiting {process.pid}'
        return
    if level > maxdepth:
        return
    try:
        children = process.children(recursive=False)
    except (psutil.NoSuchProcess, psutil.AccessDenied):
        # The process went away after its own line was produced. Its
        # children can no longer be listed, but that must not abort the
        # rest of the tree dump.
        return
    for child in children:
        yield from process_tree_lines(child,
                                      maxdepth,
                                      level + 1,
                                      ignore_zombies=ignore_zombies)
def kill_process_children(process, timeout=3):
    """
    Fatally wound and terminate all child subprocesses of 'process'.

    Every descendant is first sent ``terminate()``. Those that
    `psutil.wait_procs` still reports alive after ``timeout`` are sent
    ``kill()`` and waited for once more.

    Based on
    https://code.google.com/p/psutil/source/browse/test/test_psutil.py#257

    :param process: the process whose descendants are to be stopped
    :type process: psutil.Process
    :param timeout: timeout handed to each of the two `psutil.wait_procs`
        calls

    :return: True when no descendant is left alive
    :rtype: bool

    :raises AssertionError: if some descendants are still alive after both
        rounds
    """
    # Collect children first because the process generation is lazy
    child_procs = set(process.children(recursive=True))
    for child_proc in child_procs:
        try:
            child_proc.terminate()
        except psutil.NoSuchProcess:
            pass
    _, alive = psutil.wait_procs(child_procs, timeout=timeout)
    for child_proc in alive:
        try:
            child_proc.kill()
        except psutil.NoSuchProcess:
            pass
    _, alive = psutil.wait_procs(alive, timeout=timeout)
    if alive:
        alive_info = []
        for survivor in alive:
            try:
                alive_info.append(f'{survivor.pid}:{survivor.name()}')
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                # The name cannot be read any more; report the pid alone
                # rather than losing the failure message.
                alive_info.append(f'{survivor.pid}:')
        # Raised explicitly instead of using an assert statement so that the
        # check is still performed when Python runs with -O.
        raise AssertionError('Failed to kill all children of {}: {}'.format(
            process.pid, ' '.join(alive_info)))
    return True
def kill_launched_jobcontrol_jobs(stu_test_id: str) -> str:
    """
    Cancel the unfinished jobcontrol jobs launched by processes associated
    with one STU test, designated by stu_test_id.

    :return: One "canceling <job>" line per job that was cancelled. Empty
        str if no jobs were cancelled.
    """
    # STU puts this variable into the environment of each test it launches.
    # JobControl reports environment variables as single strings of the
    # form "VARIABLENAME=value".
    wanted_env = f'SCHRODINGER_STU_TEST_ID={stu_test_id}'
    cancelled = []
    tracker = mmjob.LatestJobs()
    tracker.update()
    for job in tracker.getUpdatedJobs():
        if not job.isComplete() and wanted_env in job.Envs:
            cancelled.append(f"canceling {job}\n")
            job.cancel()
    return "".join(cancelled)