Source code for schrodinger.application.matsci.jobutils

"""
Copyright Schrodinger, LLC. All rights reserved.
"""

import contextlib
import json
import os
import os.path
import pathlib
import random
import re
import shutil
import sys
import tarfile
import tempfile
import zipfile
from collections import OrderedDict
from collections import defaultdict

import psutil

import schrodinger
from schrodinger import gpgpu
from schrodinger import structure
from schrodinger.application.desmond import cms
from schrodinger.application.desmond import license as dlicense
from schrodinger.application.matsci import msutils
from schrodinger.application.matsci import parserutils
from schrodinger.application.matsci import msprops
from schrodinger.application.matsci import textlogger
from schrodinger.infra import jobhub
from schrodinger.infra import mmjob
from schrodinger.job import jobcontrol
from schrodinger.job import launchapi
from schrodinger.job import launcher
from schrodinger.job import queue
from schrodinger.Qt import QtCore
from schrodinger.structure import workflow_action_menu as wam
from schrodinger.test.stu.common import zip_directory
from schrodinger.utils import cmdline
from schrodinger.utils import fileutils
from schrodinger.utils import license
from schrodinger.utils import subprocess

# NOTE(review): presumably a file-name suffix for amorphous cell output -
# not referenced by the functions visible here, confirm against callers
AMCELL_NO_SYSTEM_OUT = '-amcell.maegz'

# Keys of the dict returned by parse_restart_parameters_file
DRIVER = 'driver'
ARGS = 'args'

RESTART_PARAMETERS_FILENAME = 'parameters.cmd'

RESTART_PROGRAM_NAME = 'RestartWorkflow'
RESTART_DEFAULT_JOBNAME = 'restart_workflow'

CLEAN_AND_UNIQUIFY_MAX_LEN = 100

# Matches a dollar sign preceded by any character other than a backslash; the
# preceding character is captured as group 1
DOLLARS = re.compile(r'([^\\])\$')

# Name of the property written by set_source_path and read by get_source_path
SOURCE_PATH_PROP = msprops.SOURCE_PATH_PROP

# Tar format passed to tarfile.open by extract_job_data and archive_job_data
TGZ_FORMAT = tarfile.PAX_FORMAT

# Value of the path argument that makes add_subjob_files_to_backend take the
# directory from the JobControlJob itself
FROM_SUBJOB = 'from_subjob'
# Default name of the job attribute read by get_logging_tag
LOG_TAG = 'log_tag'
AUTOHOST = 'autohost'

# Holds the license object checked out by check_license when running inside
# Maestro; stays None until that happens
CHECKED_OUT_MATSCI_MAIN = None

WAM_TYPES = wam.WorkflowType


def get_logging_tag(job, tag=LOG_TAG):
    """
    Return the logging tag stored on a job.

    :type job: `schrodinger.job.queue.JobControlJob`
    :param job: The job to read the tag from

    :type tag: str
    :param tag: Name of the job attribute that holds the tag

    :rtype: str
    :return: The value of the attribute, or an empty string if the job has no
        such attribute
    """
    try:
        return getattr(job, tag)
    except AttributeError:
        return ""
def add_outfile_to_backend(file_fn,
                           backend=None,
                           set_structure_output=False,
                           stream=False,
                           wait=True):
    """
    Register an output file or directory with the job control backend.

    :type file_fn: str
    :param file_fn: File or directory name

    :type backend: `schrodinger.job._Backend` or None
    :param backend: Backend handle. If None, the current backend is looked up;
        if there is none, nothing is done.

    :type set_structure_output: bool
    :param set_structure_output: If True, also set the file as the structure
        output file

    :type stream: bool
    :param stream: If True, register the file as a log file so that it is
        streamed to the submission host

    :type wait: bool
    :param wait: Only relevant if stream is False. If True the file is
        registered as a normal output file that is copied back when the job
        has finished, otherwise it is copied to the launch host when this
        function is called.
    """
    backend = backend or jobcontrol.get_backend()
    if not backend:
        return

    # Pick the single registration call that matches the requested transfer
    if stream:
        register = backend.addLogFile
    elif wait:
        register = backend.addOutputFile
    else:
        register = backend.copyOutputFile
    register(file_fn)

    if set_structure_output:
        backend.setStructureOutputFile(file_fn)
def log_structures_found(qjob,
                         structure_files,
                         log,
                         jobstates_for_logging=None):
    """
    Log which structure files were found for a subjob.

    :type qjob: `schrodinger.job.queue.JobControlJob`
    :param qjob: The subjob to report on

    :type structure_files: dict
    :param structure_files: Keys are subjobs, values are sets of structure
        file names. A subjob without an entry is treated as having no files.

    :type log: callable
    :param log: function(msg) writes msg to log file

    :type jobstates_for_logging: None or iterable of `queue.JobState`
    :param jobstates_for_logging: Only subjobs in one of these states are
        logged. Defaults to the DONE and FAILED states.
    """
    if jobstates_for_logging is None:
        jobstates_for_logging = {queue.JobState.DONE, queue.JobState.FAILED}
    if qjob.state not in jobstates_for_logging:
        return

    # .get keeps a plain dict without an entry for this subjob from raising
    sfiles = structure_files.get(qjob, ())
    # When logging information about a job object, if the object has an
    # attribute named the module constant LOG_TAG, the value of that attribute
    # will precede the rest of the log message. This enables log file lines to
    # be attributed to specific jobs.
    msg = get_logging_tag(qjob)
    if sfiles:
        # Sort so the message does not depend on set iteration order
        msg += f"Found completed structures: {', '.join(sorted(sfiles))}"
    else:
        jobname = qjob.name if qjob.name else ' '.join(qjob._command)
        msg += f'No output structure found for {jobname}'
    log(msg)
def run_jobdj_and_add_files(jobdj,
                            log,
                            expect_exts=None,
                            exclude_exts=None,
                            jobdj_dir=os.curdir):
    """
    Run the subjobs queued in the jobdj, register their files with the current
    backend so they are copied back to the job directory, and collect the
    structure output files of each subjob.

    :type jobdj: `schrodinger.job.queue.JobDJ`
    :param jobdj: The JobDJ with queued up jobs to run

    :type log: callable
    :param log: function(msg) writes msg to log file

    :type expect_exts: None or list of str
    :param expect_exts: The expected extensions of the output files. Defaults
        to .cms, .mae and .zip.

    :type exclude_exts: None or list of str
    :param exclude_exts: Output files with these extensions are not copied
        back to the original job directory or documented into logger

    :type jobdj_dir: str
    :param jobdj_dir: The relative path from where the backend is created to
        where the jobdj is created. Using /scr/user/jobname/subdir1/subdir2 as
        an example, normally backend and jobdj are both created in
        /scr/user/jobname/ and jobdj_dir is os.curdir. If the jobdj is created
        inside /scr/user/jobname/subdir1/, jobdj_dir is subdir1, and a subjob
        may run in /scr/user/jobname/subdir1/subdir2
        (subjob.getCommandDir() gives subdir2).

    :rtype: list
    :return: A list of structure output file names, sorted alphabetically
    """
    backend = jobcontrol.get_backend()
    structure_files = defaultdict(set)
    completed_jobs = set()
    expect_exts = ['.cms', '.mae', '.zip'] if expect_exts is None \
        else expect_exts
    exclude_exts = [] if exclude_exts is None else exclude_exts

    def _finalize(qjob, force=False):
        """
        Process a status change of a JobDJ job and log the results of
        completed jobs.

        :param qjob: JobDJ's snapshot of job state at status change
        :type qjob: schrodinger.job.queue.BaseJob

        :param bool force: Whether to consider the job complete even if
            finalize_subjob does not report it as complete
        """
        # Ignore non job control jobs and status changes such as RUNNING
        if not isinstance(qjob, queue.JobControlJob) or not qjob.hasExited():
            return
        # Each job is finalized as soon as it is retryable failed, failed or
        # done so that its files are copied back even if the subjob fails or
        # the parent job gets killed before all subjobs finish
        finished = finalize_subjob(qjob,
                                   backend,
                                   structure_files,
                                   expect_exts,
                                   log,
                                   exclude_exts=exclude_exts,
                                   jobdj_dir=jobdj_dir)
        if not (finished or force):
            return
        completed_jobs.add(qjob)
        # Report the structures found (or not found) for this subjob
        log_structures_found(qjob, structure_files, log)

    # The callback fires on every job status change, e.g. RUNNING->DONE
    jobdj.run(status_change_callback=_finalize)

    # If multiple jobs complete simultaneously, some may be missing from the
    # status change callback, and jobs might be returned before isComplete
    # returns True. Forcing finalization here catches all of those cases.
    for qjob in jobdj.all_jobs:
        if qjob not in completed_jobs:
            _finalize(qjob, force=True)

    # Sorting ensures that structures for similar systems appear near each
    # other in the PT after incorporation
    return sorted(
        os.path.basename(cmd_dir_filename)
        for subjob_outfiles in structure_files.values()
        for cmd_dir_filename in subjob_outfiles)
def finalize_subjob(subjob,
                    backend,
                    structure_files,
                    structure_extensions,
                    log,
                    exclude_exts=None,
                    jobdj_dir=os.curdir):
    """
    Mark subjob output and log files for copying back to the job directory,
    and find the structure output files if there are any.

    :type subjob: `schrodinger.job.queue.JobControlJob`
    :param subjob: The subjob to mark files from

    :type backend: `schrodinger.job.jobcontrol._Backend` or None
    :param backend: The current backend or None if there isn't one

    :type structure_files: dict
    :param structure_files: Every output structure file found is added to this
        dict. Keys are subjobs, values are sets of structure file names.

    :type structure_extensions: list of str
    :param structure_extensions: The expected extensions of the structure
        files. A file matches if its extension starts with one of them.

    :type log: function
    :param log: function(msg) writes msg to log file

    :type exclude_exts: None or list of str
    :param exclude_exts: Output files with these extensions are not copied
        back to the original job directory or documented into logger

    :type jobdj_dir: str
    :param jobdj_dir: The relative path from where the backend is created to
        where the jobdj is created. Using /scr/user/jobname/subdir1/subdir2 as
        an example, normally backend and jobdj are both created in
        /scr/user/jobname/ and jobdj_dir is os.curdir. If the jobdj is created
        inside /scr/user/jobname/subdir1/, jobdj_dir is subdir1, and a subjob
        may run in /scr/user/jobname/subdir1/subdir2
        (subjob.getCommandDir() gives subdir2).

    :rtype: bool
    :return: True if the job has completed, False if not (including when the
        job has not been submitted yet)
    """
    ajob = subjob.getJob()
    if not ajob:
        # Job has not been submitted yet, or is just in that process
        return False

    if exclude_exts is None:
        exclude_exts = []

    # Work on a copy so that adding the custom files below can never modify
    # the list owned by the job object
    outfiles = list(ajob.OutputFiles)
    if hasattr(subjob, 'outfiles'):
        # Permittivity workflow may set outfiles outside the subjob and this
        # combines the standard ajob.OutputFiles with the customized
        # job.outfiles
        outfiles.extend(subjob.outfiles)

    sub_dir = subjob.getCommandDir()
    if jobdj_dir == os.curdir:
        path = sub_dir
    else:
        path = os.path.join(jobdj_dir, sub_dir)
    add_subjob_files_to_backend(ajob,
                                path=path,
                                backend=backend,
                                exclude_exts=exclude_exts)

    # str.endswith/startswith accept a tuple of candidates
    excluded = tuple(exclude_exts)
    expected = tuple(structure_extensions)
    for filename in outfiles:
        if sub_dir:
            filename = os.path.join(sub_dir, filename)
        if not os.path.exists(filename) or filename.endswith(excluded):
            continue
        extension = fileutils.splitext(filename)[1]
        if extension.startswith(expected):
            # setdefault works for both a plain dict and a defaultdict
            structure_files.setdefault(subjob, set()).add(filename)

    # Completed means finished running. It does not mean success or failure.
    if not ajob.isComplete():
        return False
    # When logging information about a job object, if the object has an
    # attribute named the module constant LOG_TAG, the value of that attribute
    # will precede the rest of the log message. This enables log file lines to
    # be attributed to specific jobs.
    msg = get_logging_tag(subjob)
    msg += f'Job {ajob.Name} completed'
    log(msg)
    return True
def add_subjob_files_to_backend(subjob,
                                path=None,
                                backend=None,
                                exclude_exts=None,
                                also_input=False):
    """
    Register all the output and log files of a subjob with the backend of this
    job so that they get copied back to the original job directory.

    :note: subjob log files are added as output files instead of log files.
        They will not be streamed back to the original job directory but
        instead copied back at the end of this job like a normal output file.

    :type subjob: `schrodinger.job.jobcontrol.Job` or
        `schrodinger.job.queue.JobControlJob`
    :param subjob: The subjob to add files from.

    :type path: str
    :param path: The path to the subjob directory from where the backend is
        created if it was not run in the same directory as this job. Use
        `FROM_SUBJOB` to get the subjob directory from a JobControlJob object -
        this will be ignored if subjob is a Job object.

    :type backend: `schrodinger.job.jobcontrol._Backend`
    :param backend: The backend if one exists. If not given, the current
        backend is looked up; if there is none, nothing is done.

    :type exclude_exts: None or list of str
    :param exclude_exts: Output files with these extensions are not copied
        back to the original job directory or documented into logger

    :param bool also_input: Also add the job input files to the backend
    """
    backend = backend or jobcontrol.get_backend()
    if not backend:
        return

    if isinstance(subjob, queue.JobControlJob):
        if path == FROM_SUBJOB:
            path = subjob.getCommandDir()
        subjob = subjob.getJob()
        if not subjob:
            return

    excluded = tuple(exclude_exts or ())

    # Subjob log files may have already been registered with this backend as
    # a log (streaming) file so that the subjob log gets streamed back into the
    # original job directory. It is an error to register a file with the same
    # backend as both a streaming and output file, so the streaming log files
    # are collected here and skipped below.
    this_job_logfiles = set(backend.getJob().LogFiles)

    subjob_files = subjob.OutputFiles + subjob.LogFiles
    if also_input:
        # InputFiles are stored with absolute paths, which breaks everything,
        # so only the base file name is used. Job control inputs that start
        # with '.' are ignored.
        basenames = (os.path.basename(x) for x in subjob.InputFiles)
        subjob_files.extend(x for x in basenames if not x.startswith('.'))

    for filename in subjob_files:
        # Jobcontrol adds files with absolute paths to the OutputFiles list.
        # We don't want those files and they cause a traceback with
        # addOutputFile and old job control (JOBCON-7659)
        if os.path.isabs(filename) or filename.endswith(excluded):
            continue
        if path:
            filename = os.path.join(path, filename)
        # Trying to register a log file as an out file both raises a traceback
        # and prints messages to the log file, so a pre-check is used rather
        # than a try/except to avoid the log file messages
        if filename not in this_job_logfiles:
            backend.addOutputFile(filename)
def determine_source_path(backend=None, job=None):
    """
    Determine the original job directory from the job control Job object of
    this process, falling back to the current directory when there is no
    backend to get a Job from.

    :type backend: `schrodinger.job.jobcontrol._Backend`
    :param backend: The job control backend. Used to obtain the job object if
        no job is supplied. If neither backend nor job is supplied, the
        backend is obtained from job control (if one exists).

    :type job: `schrodinger.job.jobcontrol.Job`
    :param job: The job control job for this process. If not supplied, it is
        obtained from the backend (if one exists).

    :rtype: str
    :return: The OrigLaunchDir property of the job, an empty string if the job
        has no such property, or the current directory if not running under
        job control.
    """
    if not job:
        backend = backend or jobcontrol.get_backend()
        if not backend:
            # Not running under job control, so running in the local directory
            return os.getcwd()
        job = backend.getJob()

    # A missing attribute is not expected, but is handled just to be safe
    return getattr(job, 'OrigLaunchDir', "")
def set_source_path(struct, backend=None, job=None, path=None):
    """
    Store the original job directory on a structure as the source path
    property. Unless a path is given explicitly, the directory is the one
    returned by determine_source_path for the given backend and job.

    :type struct: `schrodinger.structure.Structure`
    :param struct: The structure to set the property on. Both Structure and
        Cms objects are handled.

    :type backend: `schrodinger.job.jobcontrol._Backend`
    :param backend: The job control backend, passed on to
        determine_source_path.

    :type job: `schrodinger.job.jobcontrol.Job`
    :param job: The job control job for this process, passed on to
        determine_source_path.

    :param str path: Manually set this path as the source directory,
        overriding all other options

    :rtype: str
    :return: The directory set as the source path.
    """
    sourcedir = path or determine_source_path(backend=backend, job=job)

    # Note - deleting the property first is a workaround for SHARED-6890 just
    # in case we are working in Maestro
    if isinstance(struct, cms.Cms):
        struct.remove_cts_property(SOURCE_PATH_PROP)
        struct.set_cts_property(SOURCE_PATH_PROP, sourcedir)
        return sourcedir

    props = struct.property
    props.pop(SOURCE_PATH_PROP, None)
    props[SOURCE_PATH_PROP] = sourcedir
    return sourcedir
def get_source_path(source, existence_check=True):
    """
    Get the source path to the original job directory.

    :type source: `schrodinger.structure.Structure` or
        `schrodinger.project.ProjectRow`
    :param source: Either the ProjectRow or the structure to obtain the source
        information from. If a structure, can be either a Structure or a Cms
        object.

    :type existence_check: bool
    :param existence_check: If True (default), a blank string is returned if
        the source path does not exist. If False, the path is returned
        regardless of whether it exists or not.

    :raise TypeError: If the source path property cannot be read from source

    :rtype: str
    :return: The original job directory (including the job subdirectory stored
        on source, when that is usable) or a blank string if none is found
    """

    def usable(candidate):
        """Return True if candidate passes the optional existence check"""
        return not existence_check or os.path.exists(candidate)

    try:
        path = source.property.get(SOURCE_PATH_PROP, "")
    except (AttributeError, TypeError):
        # This is neither a Structure nor a ProjectRow
        raise TypeError('source must be a Structure or ProjectRow')

    if not path or not usable(path):
        return ""

    # Add on any job subdirectory for this structure
    subdir = source.property.get(msprops.SUBDIRECTORY_PROP)
    if subdir:
        subpath = os.path.join(path, subdir)
        if usable(subpath):
            return subpath
    return path
def get_file_path(struct, prop):
    """
    Get the path of the file named by a structure property, located in the
    source path of the structure.

    :param struct: The structure whose property is checked
    :type struct: `schrodinger.structure.Structure`

    :param prop: Name of the property pointing to a file
    :type prop: str

    :return: Path of the file, or None if the property is not set or the file
        does not exist
    :rtype: str or None
    """
    filename = struct.property.get(prop)
    if filename:
        filepath = os.path.join(get_source_path(struct), filename)
        if os.path.isfile(filepath):
            return filepath
    return None
def prepare_job_spec_builder(argv,
                             program_name,
                             default_jobname,
                             input_fn=None,
                             set_driver_reserves_cores=False,
                             schrodinger_product=None):
    """
    Prepare a generic job specification builder.

    If set_driver_reserves_cores is True, the script (driver) is expected to
    use all the cores (cpus), similar to umbrella mode in multisim. For an
    example see the stress-strain driver. For all other cases (such as in
    opto/hopping/amorphous) keep set_driver_reserves_cores False.

    :type argv: list
    :param argv: The list of command line arguments, including the script name
        at [0], similar to that returned by sys.argv

    :type program_name: str
    :param program_name: Program name

    :type default_jobname: str
    :param default_jobname: Default job name

    :type input_fn: str
    :param input_fn: Input filename. Only added to the builder if it is an
        existing file.

    :type set_driver_reserves_cores: bool
    :param set_driver_reserves_cores: If True, enable
        launchapi.setDriverReservesCores

    :type schrodinger_product: str
    :param schrodinger_product: A product directory to search for the
        script/executable. This should be the name of a directory under
        SCHRODINGER without the trailing version (i.e. the "-v*" part).

    :rtype: `launchapi.JobSpecificationArgsBuilder`
    :return: Job specification builder object
    """
    builder_options = {
        'use_jobname_log': True,
        'schrodinger_product': schrodinger_product,
        'program_name': program_name,
        'default_jobname': default_jobname,
    }
    job_builder = launchapi.JobSpecificationArgsBuilder(argv,
                                                        **builder_options)

    has_input = bool(input_fn) and os.path.isfile(input_fn)
    if has_input:
        job_builder.setInputFile(input_fn)
    if set_driver_reserves_cores:
        job_builder.setDriverReservesCores(True)
    return job_builder
def add_folder_to_job_builder(job_builder, folder_path):
    """
    Add the files of a folder (e.g. a trajectory folder) as input files of
    the job.

    :type job_builder: `launchapi.JobSpecificationArgsBuilder`
    :param job_builder: Job specification builder object.

    :type folder_path: str
    :param folder_path: Full path to the folder that needs to be copied.
    """
    for source_file, job_dir_path in fileutils.get_files_from_folder(
            folder_path):
        job_builder.setInputFile(source_file, runtime_path=job_dir_path)
def add_desmond_license_to_job_builder(job_builder,
                                       license_host=None,
                                       toplevel_required=True):
    """
    Add a desmond GPU license to the job builder.

    :param launchapi.JobSpecificationArgsBuilder job_builder: Job
        specification builder object

    :type license_host: str or None
    :param license_host: Host for which the license should be generated. If
        None, the host is taken from the top level args.

    :param bool toplevel_required: If True, it is required that the current
        process was executed through toplevel.py, which causes
        TOPLEVEL_HOST_ARGS to be set. If False and the top level args are not
        set, nothing is done.

    :rtype: str or None
    :return: None on success, error message on error
    """
    if license_host:
        args = [cmdline.FLAG_HOST, license_host]
    else:
        toplevel_args = os.getenv(jobcontrol.TOPLEVEL_HOST_ARGS_ENV)
        if not toplevel_args:
            if not toplevel_required:
                return None
            return ('ERROR: Must specify %s when using umbrella mode.' %
                    cmdline.FLAG_HOST)
        if cmdline.FLAG_SUBHOST in toplevel_args:
            return ('ERROR: Umbrella mode does not support the %s option.' %
                    cmdline.FLAG_SUBHOST)
        if not msutils.get_val_from_cmdline(toplevel_args, cmdline.FLAG_HOST):
            return 'ERROR: Could not find %s argument.' % cmdline.FLAG_HOST
        args = toplevel_args.split()

    # The license is read back from the command that add_md_lic returns
    licensed_cmd = dlicense.add_md_lic(args)
    lic = msutils.get_val_from_cmdline(licensed_cmd, cmdline.FLAG_LIC)
    if not lic:
        return 'ERROR: Could not assign a license.'

    # The license value has the form token:count
    token, count = lic.split(':')
    job_builder.addLicense(license.LICENSE_BY_NAME[token], count)
    return None
def add_desmond_license_to_job_spec(job_spec, license_host):
    """
    Add a desmond GPU license for the given license host to the job spec.

    :param launchapi.JobSpecification job_spec: Job specification

    :param str license_host: Host for which the license should be generated

    :rtype: str or None
    :return: None on success, error message on error
    """
    licensed_cmd = dlicense.add_md_lic([cmdline.FLAG_HOST, license_host])
    lic = msutils.get_val_from_cmdline(licensed_cmd, cmdline.FLAG_LIC)
    if lic:
        # The license value has the form token:count
        token, count = lic.split(':')
        job_spec.addLicense(license.LICENSE_BY_NAME[token], count)
        return None
    return 'ERROR: Could not assign a license.'
def parse_restart_parameters_file(path):
    """
    Parse a restart parameters file.

    Format of the file:
    1st line is the driver's filename
    2nd line is the original arguments passed to the driver

    :type path: str
    :param path: Path to the file with original arguments

    :rtype: dict
    :return: Dictionary with the driver filename under the DRIVER key and the
        whitespace-split list of arguments under the ARGS key
    """
    with open(path, 'r') as pfile:
        driver_line = pfile.readline()
        args_line = pfile.readline()
    return {
        DRIVER: driver_line.strip(),
        ARGS: args_line.strip().split(),
    }
def write_restart_parameters_file(driver, args, outpath):
    """
    Write the original arguments to a parameters file and add the file as an
    output file to any existing jobcontrol backend.

    :type driver: str
    :param driver: Driver's filename

    :type args: list
    :param args: Original arguments passed to driver

    :type outpath: str
    :param outpath: Path to the parameters file to write original arguments
    """
    # Each of these flags is removed (case-insensitively) together with the
    # value that follows it: the host, -TPP and, for now, the zip data flag
    flags_to_drop = ('-HOST', '-TPP', parserutils.FLAG_USEZIPDATA)
    args_str = ' '.join(args)
    for flag in flags_to_drop:
        args_str = re.sub(r'(?i)%s [^\s]+' % flag, '', args_str).strip()

    with open(outpath, 'w') as outfile:
        outfile.writelines(['%s\n' % driver, args_str + '\n'])

    backend = jobcontrol.get_backend()
    if backend:
        backend.addOutputFile(outpath)
def get_restart_id_filename(jobname):
    """
    Return the name of the file that holds the job id of the given restart
    job.

    :type jobname: str
    :param jobname: The name of the restart job

    :rtype: str
    :return: The name of the restart jobid file
    """
    return ''.join((jobname, '.jobid'))
def extract_job_data(options, add_to_backend=False):
    """
    Unpack the restart archive named by the zip data option into the current
    directory. Nothing is done if the option is not set. All the error
    handling is on the caller function.

    :param `argparse.Namespace` options: The object holding all the option
        values

    :param bool add_to_backend: Whether to add the extracted files to the JC
        backend so that they are returned by the job
    """
    path = get_option(options, parserutils.FLAG_USEZIPDATA)
    if not path:
        return

    with tarfile.open(name=path, mode='r:gz', format=TGZ_FORMAT) as tar:
        # NOTE(review): extractall() writes wherever the member names point;
        # confirm that the archive only ever comes from a trusted earlier run
        tar.extractall()
        backend = jobcontrol.get_backend() if add_to_backend else None
        if backend:
            for member_name in tar.getnames():
                backend.addOutputFile(member_name)
def archive_job_data(path, files_path):
    """
    Create a gzipped tar archive from the provided list of file paths. All the
    error handling is on the caller function.

    :type path: str
    :param path: Path to the new archive to be created

    :type files_path: list
    :param files_path: List of files to be archived. Duplicates are archived
        only once.
    """
    with tarfile.open(name=path, mode='w:gz', format=TGZ_FORMAT) as tar:
        # dict.fromkeys drops duplicates like set() does but keeps the order
        # given by the caller, so the member order of the archive is
        # reproducible from run to run
        for file_path in dict.fromkeys(files_path):
            tar.add(file_path)
def create_restart_launcher(script, prog, input_name, output_name, zip_name,
                            options, args):
    """
    Create a launcher for the restart of a job.

    :type script: str
    :param script: The script passed to the launcher

    :type prog: str
    :param prog: The program name passed to the launcher

    :type input_name: str
    :param input_name: File added to the launcher as an input file

    :type output_name: str
    :param output_name: File added to the launcher as an output file and set
        as the structure output file

    :type zip_name: str
    :param zip_name: Archive added to the launcher as an input file and passed
        to the script with the -use_zip_data flag

    :type options: `argparse.Namespace`
    :param options: The object holding the restart option values read by
        get_restart_options

    :type args: list
    :param args: Arguments to pass to the script. The list is not modified.

    :rtype: `schrodinger.job.launcher.Launcher`
    :return: The launcher for the restart job
    """
    (viewname, disp, proj, host, jobname) = get_restart_options(options)
    scriptlauncher = launcher.Launcher(script=script,
                                       jobname=jobname,
                                       viewname=viewname,
                                       disp=disp,
                                       proj=proj,
                                       prog=prog,
                                       copyscript=False,
                                       runtoplevel=True)
    script_args = ['-use_zip_data', zip_name] + args
    # Only pass a host when one was requested, as create_restart_jobcmd does,
    # so that a missing restart host never puts None into the argument list
    if host:
        script_args = ['-HOST', host] + script_args
    scriptlauncher.addScriptArgs(script_args)
    scriptlauncher.addInputFile(input_name)
    scriptlauncher.addInputFile(zip_name)
    scriptlauncher.addOutputFile(output_name)
    scriptlauncher.setStructureOutputFile(output_name)
    # No need to update the parameters file from the restart
    return scriptlauncher
def create_restart_jobcmd(driver_path, zip_fn, restart_options, args,
                          default_jobname):
    """
    Generate the command-line list for the job restart.

    :type driver_path: str
    :param driver_path: Path to the driver

    :type zip_fn: str
    :param zip_fn: Filename of the archive with all restart files

    :type restart_options: `argparse.Namespace`
    :param restart_options: The object holding all the option values

    :type args: list
    :param args: List of the arguments passed to the original run. The list is
        not modified.

    :type default_jobname: str
    :param default_jobname: Job name used when the restart options do not
        provide one

    :rtype: list
    :return: List of parameters ready for job submission
    """
    (viewname, disp, proj, host,
     jobname) = get_restart_options(restart_options)

    # The command is assembled in a new list so that the list passed in by
    # the caller does not accumulate flags
    cmd = [
        '$SCHRODINGER/run', driver_path, '-JOBNAME',
        jobname if jobname else default_jobname
    ]
    optional_flags = (('-VIEWNAME', viewname), ('-DISP', disp),
                      ('-PROJ', proj), ('-HOST', host))
    for flag, value in optional_flags:
        if value:
            cmd.extend([flag, value])
    cmd.extend([parserutils.FLAG_USEZIPDATA, zip_fn])
    cmd.extend(args)
    return cmd
def write_idfile(jobobj):
    """
    Store the job id in a file as a signal to the GUI that a new job has been
    launched. The file name comes from get_restart_id_filename.

    :type jobobj: `schrodinger.job.jobcontrol.Job`
    :param jobobj: The job whose name determines the file name and whose job
        id is written to the file
    """
    filename = get_restart_id_filename(jobobj.name)
    # The context manager closes the file even if the write fails
    with open(filename, 'w') as idfile:
        idfile.write(jobobj.jobid)
def get_string_from_flag(flag):
    """
    Return the flag without its leading dash.

    :type flag: str
    :param flag: The flag for the desired option

    :rtype: str
    :return: The flag with one leading dash removed, or the flag unchanged if
        it does not start with a dash
    """
    return flag[1:] if flag.startswith('-') else flag
def get_option(options, flag):
    """
    Return the option value associated with flag.

    :type options: `argparse.Namespace`
    :param options: The object holding all the option values

    :type flag: str
    :param flag: The flag for the desired option. A leading dash is removed
        before the lookup.

    :rtype: any
    :return: The value associated with flag, or None if flag (minus the
        leading dash) is not found as a property on options
    """
    return getattr(options, get_string_from_flag(flag), None)
def set_option(options, flag, value):
    """
    Set the option value associated with flag.

    :type options: `argparse.Namespace`
    :param options: The object holding all the option values

    :type flag: str
    :param flag: The flag for the desired option. If the string starts with a
        '-', the '-' is removed.

    :type value: any
    :param value: The value to store on options under the flag name
    """
    attribute_name = get_string_from_flag(flag)
    setattr(options, attribute_name, value)
def get_restart_options(options):
    """
    Get the command line options from the -restart_x flags.

    :type options: `argparse.Namespace`
    :param options: The object holding all the option values

    :rtype: tuple
    :return: tuple of the values of the restart (viewname, incorporation,
        project, host:cpu, jobname) options; an option that is not found on
        options is given as None
    """
    restart_flags = (
        parserutils.FLAG_RESTART_VIEWNAME,
        parserutils.FLAG_RESTART_DISP,
        parserutils.FLAG_RESTART_PROJ,
        parserutils.FLAG_RESTART_HOST,
        parserutils.FLAG_RESTART_JOBNAME,
    )
    return tuple(get_option(options, flag) for flag in restart_flags)
def seed_random_number_generator(options, log=None):
    """
    Seed the random number generator based on the command line options. If
    there is no seed in the command line options, a random value is used.

    :type options: `argparse.Namespace`
    :param options: The command line options from argparse. Passing in None
        for options has the same effect as if the seed flag does not exist on
        options (i.e. a random value will be generated).

    :type log: function
    :param log: A function to log the seed value. Should take a single str
        argument.

    :rtype: int
    :return: The seed used for the generator
    """
    requested = get_option(options, parserutils.FLAG_RANDOM_SEED)
    if requested is None:
        seed = random.randint(parserutils.RANDOM_SEED_MIN,
                              parserutils.RANDOM_SEED_MAX)
    else:
        seed = requested
    random.seed(seed)
    if log:
        log(f'Random number generator seeded with {seed}')
    return seed
def check_license(panel=None,
                  token=license.MATERIALSCIENCE_MAIN,
                  name="",
                  as_validator=False,
                  fall_back_tokens=None):
    """
    Check if a valid token exists. If called from Maestro, also check out and
    hold a MATERIALSCIENCE_MAIN token.

    :type panel: schrodinger.ui.qt.appframework.AppFramework
    :param panel: panel to use to put up an error dialog if no license

    :type token: `schrodinger.utils.license` constant
    :param token: A token type from the schrodinger.utils.license module, such
        as MATERIALSCIENCE_MAIN or MATERIALSCIENCE_GA

    :type name: str
    :param name: The user-identifiable name for the token - used for error
        messages. If not provided, the string used in the license module for
        this token (if one exists) will be used.

    :type as_validator: bool
    :param as_validator: If True, this function will work as an AF2 validation
        method. Instead of posting a dialog or printing a message for a failed
        license check, it will return (False, error_message).

    :type fall_back_tokens: list or None
    :param fall_back_tokens: if present specifies that if the intended license
        check from the given token or name fails then attempt to get a valid
        license by running through this ordered list of fall back tokens, each
        of which is a `schrodinger.utils.license` constant for a non-MATSCI
        product

    :raise AssertionError: If any of the fall back tokens is a MATSCI token

    :rtype: bool or (bool, str)
    :return: True if valid license exists. If no valid license exists, False
        will be returned by default, but (False, msg) will be returned if
        as_validator=True. Note that (False, msg) evaluates to True so must be
        handled by the calling routine as not a boolean if as_validator=True.
    """
    global CHECKED_OUT_MATSCI_MAIN

    fall_back_tokens = fall_back_tokens or []

    # below is code that assumes that all calls of this function made from
    # GUIs are to check out and hold on to a MATSCI main license, this is to
    # prevent more than the allotted number of users from simultaneously
    # running Maestro sessions using MATSCI products, in the case of using
    # valid fall back tokens we do not want to block users due to exceeding
    # the allotted number of users and so we use that code below, however
    # we need to then ensure that the fall back tokens do NOT include those
    # for any MATSCI products because otherwise that would prevent holding
    # on to the license and thus allow unlimited users from GUIs
    #
    # This is raised explicitly rather than with an assert statement so that
    # the check is still enforced when Python runs with optimization (-O)
    if any(license.is_matsci(token=x) for x in fall_back_tokens):
        raise AssertionError(
            'fall_back_tokens must not include MATSCI license tokens')

    if not name:
        try:
            name = license.LICENSE_NAMES[token]
        except KeyError:
            pass

    msg = ''

    # Check out and hold a MATERIALSCIENCE_MAIN license if calling from Maestro
    if schrodinger.get_maestro():
        if not CHECKED_OUT_MATSCI_MAIN or not CHECKED_OUT_MATSCI_MAIN.isValid():
            # Tampering with licensing is a violation of the license agreement
            CHECKED_OUT_MATSCI_MAIN = license.License(
                license.MATERIALSCIENCE_MAIN)
            if not CHECKED_OUT_MATSCI_MAIN.isValid() and not fall_back_tokens:
                msg = ('There are no remaining MATERIALSCIENCE_MAIN license '
                       'tokens. Materials Science features cannot be used in '
                       'Maestro.')

    # Tampering with licensing is a violation of the license agreement
    if not msg:
        # The requested token is tried first, then the fall back tokens in
        # the order given; the else clause runs only if none is available
        for candidate in [token] + fall_back_tokens:
            if license.is_license_available(candidate):
                break
        else:
            msg = (f'No {name} license token is available. '
                   'No calculation can be run.')

    if msg:
        if as_validator:
            return (False, msg)
        elif panel:
            panel.error(msg)
        else:
            print(msg)
        return False
    return True
def check_licenses(*tokens, as_validator=False):
    """
    Check that every one of the given license tokens is available, stopping at
    the first one that is not. The individual checks are delegated to
    `check_license`.

    :type tokens: `schrodinger.utils.license` constants
    :param tokens: The license tokens to check, passed as positional arguments

    :type as_validator: bool
    :param as_validator: If True, this function will work as an AF2 validation
        method. Instead of posting a dialog or printing a message for a failed
        license check, it will return (False, error_message).

    :rtype: bool or (bool, str)
    :return: True if all checks pass. Otherwise the result of the first
        failing check is returned: False by default, or (False, msg) if
        as_validator=True. Note that (False, msg) evaluates to True so it must
        not be treated as a plain boolean by the caller.
    """
    # The generator is lazy, so no token after the first failure is checked
    outcomes = (check_license(token=one_token, as_validator=as_validator)
                for one_token in tokens)
    return next((result for result in outcomes if result is not True), True)
def create_run_dir(panel, jobname):
    """
    Create a subdirectory to run a job in, asking the user and removing
    existing directories if needed.

    :type panel: schrodinger.ui.qt.appframework.AppFramework
    :param panel: panel used to ask the user whether an existing directory
        may be deleted. Only used if the directory already exists.

    :type jobname: str
    :param jobname: The name of the job. The directory will be jobname + _dir

    :rtype: str or None
    :return: The path to the directory or None if an existing directory was
        found and the user elected not to remove it
    """
    outdir = os.path.join(os.getcwd(), jobname + '_dir')
    if os.path.exists(outdir):
        if not panel.question('The job directory, %s, already exists.\nWould'
                              ' you like to delete its contents and '
                              'continue?' % os.path.basename(outdir)):
            return None

        def force_remove(func, path, excinfo):
            # Try to remove any difficult to rm file. shutil.rmtree reports
            # failed file deletions with os.unlink (a different function
            # object than os.remove), so all three removers are accepted.
            if func in (os.rmdir, os.remove, os.unlink):
                fileutils.force_remove(path)
            else:
                # Not a removal failure - propagate the original error
                raise excinfo[1]

        shutil.rmtree(outdir, onerror=force_remove)
    os.mkdir(outdir)
    return outdir
def string_to_value(string):
    """
    Interpret a text string read from a file as a Python value.

    The words True, False and None become the corresponding Python objects.
    Anything else is converted to an int if possible, otherwise to a float if
    possible, and is otherwise returned unchanged.

    :type string: str
    :param string: The string to convert

    :return: string converted to, in order of preference: [True|False|None],
        int, float, or input type
    """
    special_words = {'True': True, 'False': False, 'None': None}
    if string in special_words:
        return special_words[string]

    # Try the numeric types from most to least specific
    for converter in (int, float):
        try:
            return converter(string)
        except ValueError:
            continue
    return string
@contextlib.contextmanager
def working_directory(path):
    """
    Context manager that runs the enclosed code with `path` as the current
    working directory. The directory that was current on entry is restored on
    exit, including when the enclosed code raises.

    :type path: str
    :param path: The directory to change into
    """
    original_dir = os.getcwd()
    os.chdir(path)
    try:
        yield
    finally:
        # Always go back, even if the body raised
        os.chdir(original_dir)
class StringCleaner(object):
    """
    Manages the cleaning of strings.
    """

    def __init__(self, extra_replacement_pairs=None, separator='-'):
        """
        Populate an instance with some defaults. The replacement dictionary
        needs to be set such that the most specific replacements occur last.
        This is because the replacements should be done in a certain order, for
        example ('C:\\\\', '') should be done before (':', '') and
        ('\\\\', ''), and because people tend to append to an iterable rather
        than prepend we will traverse the iterable backwards.

        :type extra_replacement_pairs: list of tuples
        :param extra_replacement_pairs: each tuple in this list contains a
            single replacement pair, i.e. a single substring to be replaced and
            a single substring to replace it.

        :type separator: str
        :param separator: in the case of non-unique strings this is the string
            that separates the non-unique part from the number of times used
            which is the unique part.
        """
        # Characters that are simply removed
        removed = [
            '\\', '/', r'\\', '?', '%', '*', ':', '|', '"', '>', '<', '(', ')',
            '+', ','
        ]
        base_pairs = [(substring, '') for substring in removed]
        combigld_pairs = [(' ', '_'), (';', ''), (']', ''), ('[', ''),
                          ('][', '-'), ('[]', '')]

        all_pairs = base_pairs + combigld_pairs
        if extra_replacement_pairs:
            all_pairs.extend(extra_replacement_pairs)

        self.replacement_dict = OrderedDict(all_pairs)
        self.separator = separator
        # Maps each name handed out so far to the number of times it was used
        self.prev_names = {}

    def cleanAndUniquify(self,
                         input_str,
                         clear_prev=False,
                         max_len=CLEAN_AND_UNIQUIFY_MAX_LEN):
        """
        Shorten if necessary, replace certain characters in an input string
        and then uniquify the string by comparing with a dictionary of previous
        names and number of times used.

        The shortening is applied to the input only, so the unique suffix may
        make the result longer than max_len.

        :type input_str: str
        :param input_str: the input string we want cleaned and uniquified

        :type clear_prev: bool
        :param clear_prev: specify if the dictionary of previous names should
            first be cleared

        :type max_len: int
        :param max_len: maximum length of the input_str allowed, otherwise it
            will be shortened to the max_len value

        :rtype: str
        :return: the input string now cleaned and uniquified
        """
        # Shorten string if necessary
        output_str = input_str[:max_len]

        # Most specific replacements are stored last, so go backwards
        for from_substr, to_substr in reversed(
                list(self.replacement_dict.items())):
            output_str = output_str.replace(from_substr, to_substr)

        if clear_prev:
            self.prev_names.clear()

        times_used = self.prev_names.get(output_str)
        if not times_used:
            self.prev_names[output_str] = 1
            return output_str

        # The cleaned name was already handed out. Append a counter, skipping
        # any suffixed name that has itself been handed out before.
        while True:
            times_used += 1
            candidate = output_str + self.separator + str(times_used)
            if not self.prev_names.get(candidate):
                break
        self.prev_names[output_str] = times_used
        # Record the generated name so a later identical input cannot collide
        self.prev_names[candidate] = 1
        return candidate
def clean_string(string, default='title'):
    """
    Make the given string acceptable as a file name by cleaning it with a
    freshly created `StringCleaner`. A blank string is replaced by the value
    of default before cleaning.

    :type string: str
    :param string: The string to clean.

    :type default: str
    :param default: The name to use if string is blank

    :rtype: str
    :return: A string usable as a filename
    """
    to_clean = string if string else default
    return StringCleaner().cleanAndUniquify(to_clean)
def zip_and_set_incorporation(zipname, filelist):
    """
    Zip up all the requested files and set the resulting archive as the job
    control backend structure output file (if running under job control).

    :type zipname: str
    :param zipname: The name of the archive to create

    :type filelist: list
    :param filelist: Each item in filelist is the name of a file to add to file
        zipname
    """
    # The context manager closes the archive even if adding a file fails
    with zipfile.ZipFile(zipname, 'w') as zipper:
        for filename in filelist:
            zipper.write(filename)

    backend = jobcontrol.get_backend()
    if backend:
        backend.addOutputFile(zipname)
        backend.setStructureOutputFile(zipname)
class CellRunInfo(object):
    """
    Holds the information for the run for a single cell
    """

    def __init__(self,
                 structs,
                 basename,
                 replicate,
                 multiple_cells,
                 component=None,
                 repeat_unit_info=None):
        """
        Create a CellRunInfo object

        :type structs: list
        :param structs: The list of structures to include in the cell

        :type basename: str
        :param basename: The generic basename for job files. This will be
            modified based on the value of replicate, component and
            multiple_cells to form the base name for files for this specific
            cell.

        :type replicate: int
        :param replicate: Which replicate this is for - 1-based

        :type multiple_cells: bool
        :param multiple_cells: Whether there will be multiple replicates of
            this cell

        :type component: int or None
        :param component: The structure number this cell is for, or None if
            this is a mixed structure cell

        :type repeat_unit_info: tuple or None
        :param repeat_unit_info: The first item is the sequence of monomer
            one-letter codes that give the repeat unit sequence. The second
            item is a tag to be added to the polymer name for that sequence
            (used for enumerated sequences). The sequence is only stored if
            the tag is not empty.
        """
        self.structs = structs
        self.replicate = replicate
        self.cru = None

        # Assemble the cell-specific name piece by piece
        name_parts = [basename]
        if component:
            name_parts.append('_component%d' % component)
        else:
            name_parts.append('_all_components')
        if repeat_unit_info and repeat_unit_info[1]:
            self.cru = repeat_unit_info[0]
            name_parts.append('_%s' % repeat_unit_info[1])
        if multiple_cells:
            name_parts.append('_%d' % replicate)
        self.basename = ''.join(name_parts)

        self.is_cg = all(
            msutils.is_coarse_grain(struct, by_atom=True)
            for struct in structs)
class MultijobDriver(object):
    """
    Resubmit the driver as subjobs
    """

    COMPOSITION_FLAG = '-composition'
    COMPOSITION_FLAG_SHORT = '-c'

    def __init__(self,
                 runlist,
                 options,
                 args,
                 log,
                 remote_script,
                 default_jobname,
                 basename=None):
        """
        Create multiple cells in parallel running a subjob for each cell. Zip
        up the resulting cms files into a jobname.zip file and set it as the
        structure output file to be incorporated.

        :type runlist: list of `CellRunInfo`
        :param runlist: Each item of runlist will generate a subjob and a
            single cell

        :type options: `argparse.Namespace`
        :param options: The command line options

        :type args: iterable
        :param args: The command line arguments as passed in by sys

        :type log: function
        :param log: function(msg) writes msg to log file

        :type remote_script: string
        :param remote_script: the dir and name for driver to resubmit

        :type default_jobname: string
        :param default_jobname: Default job name

        :type basename: str or None
        :param basename: The generic basename defined from inputfile. Ignored
            if options has an output_basename attribute.
        """
        # Precedence: options.output_basename, then basename, then the default
        if hasattr(options, 'output_basename'):
            chosen_basename = options.output_basename
        else:
            chosen_basename = basename if basename else default_jobname

        self.multijob_driver(runlist, options, args, chosen_basename, log,
                             remote_script)

    def remove_flag(self, args, flag, and_value=False):
        """
        Remove the first occurrence of a flag from the command line flags.
        The list is modified in place.

        :type args: list
        :param args: The list of command line arguments

        :type flag: str
        :param flag: The flag to remove

        :type and_value: bool
        :param and_value: Also remove the value associated with the flag - it
            is assumed that this is the following list item
        """
        if flag not in args:
            # Flag was not used, so we don't need to do anything
            return
        start = args.index(flag)
        width = 2 if and_value else 1
        del args[start:start + width]

    def replace_value(self, args, old_value, new_value):
        """
        Replace the first list item equal to old_value with the new value.
        The list is modified in place; nothing happens if old_value is absent.

        :type args: list
        :param args: The list of command line arguments

        :type old_value: str
        :param old_value: The value to replace

        :type new_value: str
        :param new_value: The value to replace old_value with
        """
        try:
            position = args.index(old_value)
        except ValueError:
            return
        args[position] = new_value

    def multijob_driver(self, runlist, options, args, basename, log,
                        remote_script):
        """
        Create multiple cells in parallel running a subjob for each cell. Zip
        up the resulting cms files into a jobname.zip file and set it as the
        structure output file to be incorporated.

        :type runlist: list of `CellRunInfo`
        :param runlist: Each item of runlist will generate a subjob and a
            single cell

        :type options: `argparse.Namespace`
        :param options: The command line options

        :type args: iterable
        :param args: The command line arguments as passed in by sys

        :type basename: str
        :param basename: The base name of the zipped .zip or .maegz file for
            all job files.

        :type log: function
        :param log: function(msg) writes msg to log file

        :type remote_script: string
        :param remote_script: the dir and name for driver to resubmit
        """
        log('Setting up job queue')
        jobdj = queue.JobDJ(verbosity='normal',
                            max_failures=queue.NOLIMIT,
                            max_retries=3)
        for host, procs in jobdj._hosts.items():
            log('Host: %s, processors:%d' % (host, procs))

        # Arguments for mixed-structure subjobs: drop the flags that only
        # make sense for the driver itself
        mixed_args = list(args)
        for unwanted in ['-homogeneous', '-no_disordered_cell']:
            self.remove_flag(mixed_args, unwanted)
        self.remove_flag(mixed_args, '-ncells', and_value=True)

        # Arguments for homogeneous subjobs: these have a single component
        # so the composition flag (long or short form) is dropped as well
        single_args = mixed_args[:]
        for composition_flag in (self.COMPOSITION_FLAG,
                                 self.COMPOSITION_FLAG_SHORT):
            self.remove_flag(single_args, composition_flag, and_value=True)

        # Create a subjob for each cell to create
        for runinfo in runlist:
            is_single = len(runinfo.structs) == 1
            cmdargs = list(single_args if is_single else mixed_args)

            if options.seed:
                # Each replicate gets its own seed
                replicate_seed = options.seed * runinfo.replicate
                self.replace_value(cmdargs, str(options.seed),
                                   str(replicate_seed))
            if runinfo.cru:
                self.replace_value(cmdargs, options.repeat_unit, runinfo.cru)

            # Set up the input and output file names
            for attribute in ('output_basename', 'title'):
                if hasattr(options, attribute):
                    self.replace_value(cmdargs, getattr(options, attribute),
                                       runinfo.basename)
            subinput = runinfo.basename + '.maegz'
            fileutils.force_remove(subinput)
            for struct in runinfo.structs:
                struct.append(subinput)
            self.replace_value(cmdargs, options.input_file, subinput)

            jobdj.addJob(RobustSubmissionJob(['run', remote_script] + cmdargs))
        log('Number of jobs to run: %d' % jobdj.total_added)

        # Run all jobs and make sure we grab their output
        if options.no_system:
            expect_exts = ['.maegz']
        else:
            expect_exts = ['.cms']
        st_files = run_jobdj_and_add_files(jobdj, log, expect_exts=expect_exts)
        backend = jobcontrol.get_backend()

        if options.no_system:
            # Read all the output structures and compile them into one file
            outname = basename + AMCELL_NO_SYSTEM_OUT
            writer = structure.StructureWriter(outname)
            title_counts = {}
            for struct in structure.MultiFileStructureReader(st_files):
                # same component of different structures needs different
                # titles in PT
                title = struct.title
                if title in title_counts:
                    title_counts[title] += 1
                    struct.title = title + '_' + str(title_counts[title])
                else:
                    title_counts[title] = 0
                writer.append(struct)
            writer.close()
            if backend:
                backend.addOutputFile(outname)
                backend.setStructureOutputFile(outname)
        else:
            # Desmond CMS files need to be incorporated into a zip file.
            # Zip up all the cms files and incorporate them all
            jobname = backend.getJob().Name if backend else basename
            outname = jobname + '.zip'
            zip_and_set_incorporation(outname, st_files)
        log('Output compiled in %s' % outname)
def get_jobname(default_jobname):
    """
    Return the job name reported by job control for the given default, or the
    default itself if job control reports nothing.

    :type default_jobname: str
    :param default_jobname: default_jobname of the current module. Must not be
        empty.

    :rtype: string
    :return: Jobname
    """
    assert default_jobname
    found_name = jobcontrol.get_jobname(default_jobname)
    return found_name if found_name else default_jobname
def get_procs():
    """
    Get number of processors from backend or command-line arguments.

    The processor count of the first host entry is used. 1 is returned if
    there are no hosts or the first host has no processor count.

    :rtype: int
    :return: Number of processors
    """
    # Under job control the backend host list is used, otherwise the command
    # line host list. Each list is queried once only.
    if jobcontrol.get_backend():
        hosts = jobcontrol.get_backend_host_list()
    else:
        hosts = jobcontrol.get_command_line_host_list()
    if not hosts:
        return 1
    return hosts[0][1] or 1
def memory_usage_psutil():
    """
    Return the memory usage of the current process in MB, taken from the first
    field of the psutil memory info for this process id.

    :rtype: float
    :return: memory usage in MB
    """
    bytes_per_mb = float(2**20)
    this_process = psutil.Process(os.getpid())
    used_bytes = this_process.memory_info()[0]
    return used_bytes / bytes_per_mb
def get_size_of(an_object, size_unit='megabytes'):
    """
    Return the size of an object in size_unit, as reported by sys.getsizeof.
    The object can be any type of object. All built-in objects will return
    correct results, but this does not have to hold true for third-party
    extensions as it is implementation specific.

    :param an_object: the object to measure
    :type an_object: any type of python object

    :param size_unit: one of 'bytes', 'kilobytes', 'megabytes' or 'gigabytes'
    :type size_unit: str

    :raise KeyError: if size_unit is not one of the supported units

    :return: the size of an object in size_unit
    :rtype: float
    """
    # Power of 1024 that converts bytes into each supported unit
    exponents = dict(bytes=0, kilobytes=1, megabytes=2, gigabytes=3)
    divisor = 1024**exponents[size_unit]
    return sys.getsizeof(an_object) / divisor
def get_jobhosts(ncpu=None):
    """
    Return the job hosts from backend or command line.

    :param ncpu: number of processors to use for any host that does not
        specify its own processor count
    :type ncpu: int or None

    :rtype: list of tuple
    :return: (host name, number of processors) for each host. If no hosts
        have been specified, a single (localhost, ncpu) entry is returned.
    """
    if jobcontrol.get_backend():
        hosts = jobcontrol.get_backend_host_list()
    else:
        hosts = jobcontrol.get_command_line_host_list()

    if not hosts:
        # If no hosts have been specified, use (localhost, ncpu)
        return [(queue.LOCALHOST_ENTRY_NAME, ncpu)]

    if ncpu:
        # Fill in the processor count for entries that do not have one. New
        # tuples are built because the entries themselves may be immutable.
        hosts = [(name, cpus or ncpu) for name, cpus in hosts]
    return hosts
def get_jobhost():
    """
    Return the first job host from backend or command line.

    :rtype: list or None
    :return: [host name, number of processors] for the first host, with the
        number of processors set to 1 if the host does not specify it. None
        if there are no hosts.
    """
    all_hosts = get_jobhosts()
    if not all_hosts:
        return None

    name, cpus = all_hosts[0][0], all_hosts[0][1]
    if cpus is None:
        return [name, 1]
    return list(all_hosts[0])
def get_jobhost_name():
    """
    Return the name of the first job host from backend or command line.

    :rtype: str or None
    :return: the host name, or None if there is no host
    """
    first_host = get_jobhost()
    if not first_host:
        return None
    return first_host[0]
def get_backend_hosts_str():
    """
    Get backend host(s) as passed to the -HOST flag.

    This can be useful when a subjob needs to resubmit its own subjobs to the
    original queue via subhost and the current job may be running on localhost
    due to smart distribution being enabled in the parent job.

    For the hierarchy of submission: driver.py -> opto_driver/pdft_driver ->
    backend subjobs

    opto_driver.py will get 'localhost' as HOST because of the smart
    distribution, its subjobs still need to go to the queue. In order to do
    this, opto_driver.py can be submitted with the string returned here as its
    -SUBHOST. We want to keep smart distribution in the driver.py jobdj, so
    that opto_driver is not taking another slot in the queue.

    :rtype: str or None
    :return: Backend hosts or None if the host is to be determined by the JC
    """
    # Must be running under JC
    if not jobcontrol.get_backend():
        return None

    backend_hosts = jobcontrol.get_backend_host_list()
    if not backend_hosts:
        return None

    as_text = jobcontrol.host_list_to_str(backend_hosts)
    # An empty string is reported as None
    return as_text if as_text else None
def is_hostname_known(hostname):
    """
    Check whether hostname is one of the host entries known to job control.

    :type hostname: str
    :param hostname: the hostname to check against

    :rtype: bool
    :return: True, if the hostname matches the name of a known host entry
    """
    known_names = set()
    for entry in jobcontrol.get_hosts():
        known_names.add(entry.name)
    return hostname in known_names
def is_jobhost_gpu_available():
    """
    Check whether the gpu is available on the host. First check SUBHOST, if
    defined. Then, check HOST, if defined. At last, check localhost.

    :rtype: bool
    :return: True means gpu is available.
    """
    hostname = get_jobhost_name()
    runs_locally = hostname in [None, queue.LOCALHOST_ENTRY_NAME]
    if runs_locally:
        # The calculation runs on local
        return gpgpu.is_any_gpu_available()

    # This checks the gpu availability for SUBHOST, if defined.
    # This checks the gpu availability for HOST, if SUBHOST not defined
    # and HOST defined.
    host_entry = jobcontrol.get_host(hostname)
    # Define the GPU availability based on gpgpu (SUPPORT-128375)
    return bool(host_entry.gpgpu)
def add_zipfile_to_backend(adir):
    """
    Zip the given directory into <adir>.zip and register that archive through
    `add_outfile_to_backend`.

    :type adir: str
    :param adir: the directory
    """
    archive_name = f'{adir}.zip'
    zip_directory(adir, fileobj=archive_name)
    # No backend object is supplied here; None is passed through as-is
    add_outfile_to_backend(archive_name, None)
def get_backend_first_host():
    """
    Get the first host of the backend host list. Must be called while running
    under job control.

    :rtype: str, int
    :return: host, number of processors. (localhost, 1) if the backend host
        list is empty.
    """
    assert jobcontrol.get_backend()
    backend_hosts = jobcontrol.get_backend_host_list()
    if not backend_hosts:
        return (queue.LOCALHOST_ENTRY_NAME, 1)
    return backend_hosts[0]
def write_cms_with_wam(cms_model, filename, wam_type):
    """
    Write the cms model to a file with the provided WAM property

    :param `cms.Cms` cms_model: The cms model to write to file

    :param str filename: The cms path

    :param int wam_type: One of the enums defined in workflow_action_menu.h
    """
    full_system_ct = cms_model._raw_fsys_ct
    with wam.WorkflowMenuMaestroWriter(filename, wam_type) as wam_writer:
        # The full system CT goes through the WAM writer
        wam_writer.append(full_system_ct)
        # The component CTs are then appended to the same file directly
        for component_ct in cms_model.comp_ct:
            component_ct.append(filename, "CMS")
def add_wam_to_cms(filename, wam_type):
    """
    Read the cms file and write it back to the same path with the WAM type
    added

    :param str filename: The cms path

    :param int wam_type: One of the enums defined in workflow_action_menu.h
    """
    write_cms_with_wam(cms.Cms(filename), filename, wam_type)
def write_mae_with_wam(structs, filename, wam_type=None):
    """
    Write the structures to a Maestro file using the WAM writer

    :param list(structure.Structure) structs: The structures to write to file

    :param str filename: The mae path

    :param int wam_type: One of the enums defined in workflow_action_menu.h

    :raise ValueError: If the file path is not for a Maestro file
    """
    file_format = fileutils.get_structure_file_format(filename)
    if file_format != structure.MAESTRO:
        raise ValueError(f"{filename} is not a Maestro file path.")

    with wam.WorkflowMenuMaestroWriter(filename, wam_type) as wam_writer:
        for one_struct in structs:
            wam_writer.append(one_struct)
def set_structure_wam(struct, wam_type):
    """
    Sets the WAM property for the passed structure. For a cms model the
    property is set on its full system CT.

    :param `structure.Structure` struct: The structure to set the WAM for

    :param int wam_type: One of the enums defined in workflow_action_menu.h
    """
    target = struct._raw_fsys_ct if isinstance(struct, cms.Cms) else struct
    wam.set_workflow_action_menu(target, wam_type)
def remove_structure_wam(struct):
    """
    Remove the structure WAM property from the passed structure. Nothing
    happens for a plain structure that does not have the property.

    :param structure.Structure struct: The structure to remove property from
    """
    if not isinstance(struct, cms.Cms):
        struct.property.pop(msprops.STRUCTURE_WAM_PROP, None)
        return
    # cms models remove the property through their own method
    struct.remove_cts_property(msprops.STRUCTURE_WAM_PROP)
def get_smart_distribution_from_environ(default=True):
    """
    Get smart distribution of the queue value based on the
    SCHRODINGER_MATSCI_REUSE_DRIVER_HOST environment variable.

    :param bool default: Value returned if the env variable is not set or
        cannot be interpreted as a boolean. Must be aligned with the default
        value of the `queue.JobDJ.smart_distribution`

    :rtype: bool
    :return: Smart distribution value
    """
    setting = os.environ.get('SCHRODINGER_MATSCI_REUSE_DRIVER_HOST')
    if setting is not None:
        try:
            return msutils.setting_to_bool(str(setting))
        except ValueError:
            # Invalid string - fall through to the default
            pass
    return default
def set_smart_distribution_from_environ(jobq, log=None):
    """
    Set smart distribution of the queue to on/off based on the environment
    variable. The queue is left untouched if the variable gives no usable
    value.

    :type jobq: `schrodinger.job.queue.JobDJ`
    :param jobq: The JobDJ object

    :type log: function or None
    :param log: A function to log smart distribution status. Should take a
        single str argument.
    """
    # None is used as the default so that "not set" can be told apart
    requested = get_smart_distribution_from_environ(default=None)
    if requested is not None:
        jobq.setSmartDistribution(requested)
    if log:
        log('Run some subjobs locally: %s' % jobq.smart_distribution)
class RobustSubmissionJob(queue.JobControlJob):
    """
    A JobControlJob object that will retry to submit multiple times for
    fizzled jobs (if the queue setting is such) but will not attempt to retry
    a job that died.
    """

    def retryFailure(self, max_retries=0):
        """
        Determine if the job should be retried or not. This overwrites the
        parent method to not retry jobs that have a status of "died" as that
        will indicate that Jaguar failed, which it almost certainly will again.

        :type max_retries: int
        :param max_retries: The queue's max_retries parameter

        :rtype: bool
        :return: True if the job should be retried, False if not
        """
        job = self.getJob()
        died = bool(job) and job.isComplete() and job.ExitStatus == "died"
        if died:
            # don't retry the job (MATSCI-1020)
            return False
        # Everything else follows the parent class rules
        return queue.JobControlJob.retryFailure(self, max_retries=max_retries)
def create_queue(options=None, host=None, **kwargs):
    """
    Create a JobDJ job with some default values and the given keyword arguments

    Current defaults:
    - verbosity: normal
    - max_failures: NOLIMIT
    - max_retries: 3

    :type options: argparse Namespace object
    :param options: the hostlist will be formed from the options.host property
        if not supplied by the host argument

    :type host: str
    :param host: The host string to use to create the queue

    All other keyword arguments will be passed on to the JobDJ object

    :rtype: `schrodinger.job.queue.JobDJ`
    :return: The JobDJ object
    """
    if host == AUTOHOST:
        # This will cause JobDJ to figure out the host and processors from the
        # environment
        hostlist = None
    else:
        if not host:
            # Fall back on options.host, and then on localhost
            host = getattr(options, 'host', None) or queue.LOCALHOST_ENTRY_NAME
        hostlist = jobcontrol.host_str_to_list(host)

    # Caller-supplied keywords take precedence over the defaults
    settings = {
        'verbosity': 'normal',
        'max_failures': queue.NOLIMIT,
        'max_retries': 3
    }
    settings.update(kwargs)
    jobq = queue.JobDJ(hosts=hostlist, **settings)

    # JobDJ uses smart distribution by default, which runs a subjob on the same
    # host as the driver but not in the queue. This is done to avoid having
    # the driver queue slot idle. We can't do this if subjobs will occupy more
    # than one thread.
    threads_per_process = getattr(options, 'TPP', None) or 0
    if threads_per_process > 1:
        jobq.disableSmartDistribution()

    return jobq
def get_all_subjobs(job):
    """
    Get all subjobs (and recursively all subjobs of subjobs) of the given job.
    Each subjob is immediately followed in the result by its own descendants.

    :param `jobcontrol.Job` job: The job to get subjobs of

    :rtype: list
    :return: A list of all subjobs stemming from the current job. Each item is
        a `jobcontrol.Job` object
    """
    # Note - it is possible for some subjobs to be missed if they are running
    # on a remote machine under legacy job control. See the discussion in the RB
    # for MATSCI-10086 for additional details.
    descendants = []
    for subjob_id in job.SubJobs:
        child = get_job_from_hub(subjob_id)
        descendants += [child, *get_all_subjobs(child)]
    return descendants
def is_job_server_job(job):
    """
    Check if the job is/was run under Job Server

    :param `jobcontrol.Job` job: The job to check

    :rtype: bool
    :return: Whether the job was run under job server
    """
    job_id = job.JobID
    return mmjob.mmjob_is_job_server_job(job_id)
def is_downloadable_job_server_job(job):
    """
    Check if the job is/was run under Job Server and has not yet downloaded
    its files

    :param `jobcontrol.Job` job: The job to check

    :rtype: bool
    :return: Whether the job was run under job server and has not downloaded
        files
    """
    if not is_job_server_job(job):
        return False
    return not job.isDownloaded()
def get_job_from_hub(jobid):
    """
    Get a job object for the given job id from the jobhub job manager

    :param str jobid: The job id

    :rtype: `jobcontrol.Job`
    :return: The job object for this job id

    :raise: `jobhub.StdException` If there is no job found for jobid
    """
    return jobhub.get_job_manager().getJob(jobid)
class JSFilePathData:
    """
    Manage info for a file that belongs to a currently running job server job

    The object has attributes for the associated job and file name. Attribute
    requests not found on this class will be passed on to the pathlib.Path
    file name attribute.
    """

    def __init__(self, path, job):
        """
        Create a JSFilePathData object

        :param str path: The name of the file
        :param `jobcontrol.Job` job: The job this file is associated with
        """
        # Note - it would be far more clear just to add a job attribute to the
        # pathlib.Path object and avoid this class altogether, but Path objects
        # do not allow adding arbitrary attributes.
        self.path = pathlib.Path(path)
        self.job = job

    def __repr__(self):
        return self.job.JobId + ':' + str(self.path)

    def __getattr__(self, attribute):
        """
        Pass on any unknown attribute requests to the path attribute

        :param str attribute: The requested attribute

        :raise AttributeError: If the attribute is not found on this object or
            on the path object
        """
        missing = AttributeError(f'{self.__class__.__name__} has no attribute'
                                 f' {attribute}')
        # Read path straight from the instance dict. Going through self.path
        # would re-enter this method without end when path has not been set
        # yet, as happens for the bare instance created while copying or
        # unpickling.
        try:
            path = self.__dict__['path']
        except KeyError:
            raise missing from None
        try:
            return getattr(path, attribute)
        except AttributeError:
            raise missing from None
class FileDownloadError(Exception):
    """
    The exception FileDownloader raises for any error it encounters
    """
[docs]class FileDownloader: """ Manage retrieving file information from a running Job Server job and downloaded associated files """
[docs] def __init__(self): """ Create a FileDownloader instance """ # The system process communicating with Job Server self.process = None # An error message describing any error that occurred self.error = None # The path to the most recent stdout files created by this class self.out_filename = None # The path to any stdout files created by this class self.out_filenames = [] # The files available for download from the server. Each item is a # JSFilePathData object self.available_filenames = [] # Any temp directories that should be removed by this class when temp # files are removed. Must be added manually by calling code self.temp_directories = [] # If this class is used from Maestro or a stand-alone GUI, try to # clean up any temp files it creates when the process shuts down. app = QtCore.QCoreApplication.instance() if app: app.aboutToQuit.connect(self.cleanFiles)
[docs] @staticmethod def interpretJobServerError(msg, checkhost=True): """ Form a more user-friendly error message for some known Job Server errors :param str msg: The Job Server error message :param bool checkhost: Whether to try to extract the host name from the message :rtype: str :return: A modified error message, or the original message if no modification is found """ def _get_host_name_in_line(line): if not all(x in line for x in '[@:]'): return None, None # Expects a line with format: [job server @ localhost:41789]... chunk = line.split('@ ')[1] host = chunk.split(':')[0] rest = chunk.split(']')[1] return host, rest # Job server error messages have one line per server, with each line # giving the error for that server. We reduce the amount of technical # jargon in the messages to just a short line per server. new_msg = "" lookup = 'lookup' for line in msg.split('\n'): if checkhost: host, rest = _get_host_name_in_line(line) if not host: new_msg += line + '\n' continue new_msg += host + ': ' else: rest = line if 'connection error' in rest and lookup in rest: # Error messages such as: # [job server @ boltsub3.schrodinger.com:8030] rpc error: code = # Unavailable desc = connection error: desc = "transport: Error # while dialing dial tcp: lookup boltsub3.schrodinger.com on # 127.0.0.53:53: no such host" new_msg += 'Unable to contact host\n' elif 'desc =' in rest: # Error messages such as: # [job server @ boltsub3.schrodinger.com:8030] rpc error: code = # Unknown desc = 'bob' is not a valid job server JobId tokens = rest.split('=') new_msg += tokens[-1].strip() + '\n' else: new_msg += rest + '\n' new_msg = new_msg.rstrip('\n') return new_msg
[docs] def listAvailableFiles(self, job, subjobs=True, wait=False): """ Get file names available on the server for the given job :param `jobcontrol.Job` job: The job to get file names for :param bool subjobs: Also get file names for subjobs of the given job :param bool wait: If True, wait until the process finishes and return the file info. If False, start a process to retrieve the file names and return. The calling function will need to call the parseAvailableFiles method when the process completes. Note that using wait=True can freeze a GUI for a long time until the network process times out. See the jobdirdlg.InteractiveFileDownloader class. :rtype: list or None :return: If wait is False, the process attribute is the running process. If wait is True, a list of `JSFilePathData` objects is returned for each file found on the server. :raise `FileDownloadError`: if an error occurs while retrieving names """ self.available_filenames = [] self.process = None # Gather a list of all jobs to get file names for all_jobs = [job] if subjobs: all_jobs.extend(get_all_subjobs(job)) # Form the command to get the file names written to a json file ids = {x.JobId: x for x in all_jobs} cmd = ['jsc', 'list-files', '-json'] cmd.extend(ids.keys()) tdir = fileutils.get_directory_path(fileutils.TEMP) temp_file = tempfile.NamedTemporaryFile(suffix='.json', dir=tdir, delete=False) self.out_filename = temp_file.name self.out_filenames.append(self.out_filename) # Start the process self.process = subprocess.Popen(cmd, stdout=temp_file, stderr=subprocess.PIPE, text=True) if not wait: return # Wait for the process to finish and check for errors code = self.process.wait() if code: self.raiseProcessError() # Parse the output data return self.parseAvailableFiles()
[docs] def raiseProcessError(self): """ Raise an exception with the stderr text from the current process :raise `FileDownloadError`: Raised with stderr output from the process """ msg = self.process.stderr.read() raise FileDownloadError(self.interpretJobServerError(msg))
[docs] def parseAvailableFiles(self): """ Parse the available file data from a process started by get_files_available_on_server :rtype: list :return: Each item of the list is a `JSFilePathData` object for a file found on the server :raise RuntimeError: If no process has been started :raise `FileDownloadError`: If unable to parse the data """ if not self.process: raise RuntimeError('There is no process output to parse') with open(self.out_filename, 'r') as jfile: data = json.load(jfile) errors = {} # Data is a list of dictionaries. Each dictionary contains the data # for one job. The keys in that dictionary are created in a Go # program and not available as constants anywhere accessible to Python for jobdata in data: jobid = jobdata['jobId'] jerrors = jobdata['errors'] # Either errors is None, downloadableFiles is None, or both are. # There is no case where errors is populated if downloadableFiles # were found. if jerrors: msg = "" # There will be one error per certified server for jerror in jerrors: # Form the server host name - the JC team says that # serverAddress will always be present for errors. msg += jerror['serverAddress'].split(':')[0] + ': ' # Make a friendly error message for this server msg += self.interpretJobServerError(jerror['userMessage'], checkhost=False) msg += '\n' errors[jobid] = msg else: # No errors occurred try: this_job = get_job_from_hub(jobid) except jobhub.StdException: # Unclear if this can actually happen, here for safety errors[jobid] = f'Unable to convert {jobid} to a job' continue for name in jobdata['downloadableFiles']: self.available_filenames.append( JSFilePathData(name, this_job)) if errors and not self.available_filenames: # There could in theory be a bunch of completely different errors # for every subjob. 
However, it's more likely there's a bunch of # subjobs with the same error, so only show one rather than # innundate the user with messages for error in errors.values(): numerr = len(errors) jobp = 'job' if numerr == 1 else 'jobs' msg = ('Errors occurred obtaining the list of files for ' f'{numerr} {jobp}. Example error is:\n{error}') raise FileDownloadError(msg) return self.available_filenames
[docs] def downloadJobFileToTemp(self, job, filename, temp_path=None, wait=False): """ Tail the given file on the server to a local file. This does not count as "downloading" the file in Job Server terms (i.e. the file will still be downloaded to the job directory at the end of the job if requested by Job Server settings). It is up to the calling code to remove the temporary file created by this method. :param `jobcontrol.Job` job: The job the file is associated with :param str filename: The name of the file :param str temp_path: The path to the file to write. If not given, a temp path will be created. :param bool wait: If wait is True, wait for the file to download and return the name of the temp file. If wait is False, start the download process and return None while the process runs. Note that using wait=True can freeze a GUI for a long time until the network process times out. See the jobdirdlg.InteractiveFileDownloader class :rtype: str or None :return: If wait is False, the process attribute is the running process and None is returned. If wait is True, the full path of the file created is returned. :raise `FileDownloadError`: if an error occurs while downloading """ self.process = None # Determine the file path if temp_path: temp_file = open(temp_path, 'w') else: ext = fileutils.get_file_extension(filename) tdir = fileutils.get_directory_path(fileutils.TEMP) temp_file = tempfile.NamedTemporaryFile(suffix=ext, dir=tdir, delete=False) self.out_filename = temp_file.name self.out_filenames.append(self.out_filename) # Create the command and start the process cmd = ['jsc', 'tail-file', '--name', filename, job.JobID] self.process = subprocess.Popen(cmd, stdout=temp_file, stderr=subprocess.PIPE, text=True) if not wait: return # Check that the process completed successfully code = self.process.wait() if code: self.raiseProcessError() return self.out_filename
[docs] def cleanFiles(self): """ Remove any files created by this downloader """ for afile in self.out_filenames: fileutils.force_remove(afile) self.out_filenames = [] self.out_filename = None for adir in self.temp_directories: fileutils.force_rmtree(adir, ignore_errors=True)
def register_driver_log(job_builder, default_job_name):
    """
    Register the driver log file upfront as a streamed output file in the
    given job builder specification.

    :type job_builder: `launchapi.JobSpecificationArgsBuilder`
    :param job_builder: job specification builder object

    :type default_job_name: str
    :param default_job_name: the default job name to fall back on
    """
    # The log file is named after the job name plus the driver log extension
    driver_log = '{}{}'.format(get_jobname(default_job_name),
                               textlogger.DRIVER_LOG_EXT)
    job_builder.setOutputFile(driver_log, stream=True)