Source code for schrodinger.application.matsci.genetic_optimization

"""
Classes and functions for the genetic optimization module.

Copyright Schrodinger, LLC. All rights reserved."""

# Contributor: Thomas F. Hughes

import ast
import glob
import logging
import math
import ntpath
import os
import re
import shutil
import string
import sys
import tarfile
import traceback
from collections import namedtuple
from functools import reduce

import numpy
from pyevolve import Consts
from pyevolve import GenomeBase
from pyevolve import GSimpleGA
from pyevolve import Scaling
from pyevolve import Selectors
from pyevolve import Util
from scipy import constants

import schrodinger.structure as structure
from schrodinger.application.jaguar.input import JaguarInput
from schrodinger.application.jaguar.output import JaguarOutput
from schrodinger.application.macromodel.input import ConfSearch
from schrodinger.application.matsci import parserutils
from schrodinger.application.matsci import jobutils
from schrodinger.application.matsci import optoelectronics
from schrodinger.application.matsci import msprops
from schrodinger.application.matsci import rxn_path
from schrodinger.application.matsci import textlogger
from schrodinger.application.matsci import rxn_channel
from schrodinger.infra import mm
from schrodinger.job import queue
from schrodinger.job.util import hunt
from schrodinger.structure import workflow_action_menu as wam
from schrodinger.structutils import analyze
from schrodinger.structutils import build
from schrodinger.structutils import color
from schrodinger.structutils import transform
from schrodinger.utils import fileutils
from schrodinger.utils import imputils
from schrodinger.utils import subprocess

# MATSCI-5160 - in contrast to the random module
# the methods in the numpy.random module are consistent
# between Python 2 and 3
my_random = numpy.random.RandomState()
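
# Illustrative usage (not part of the original module): once seeded, a
# numpy.random.RandomState gives draws that are reproducible across Python
# versions, which is why it is used here in place of the stdlib random module:
#
#     my_random.seed(12345)
#     my_random.randint(0, 10)  # same draw for this seed on Python 2 and 3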

PYEVOLVE_LOG_EXT = '-pyevolve.log'

IN_MAE_EXT = '-in.mae'
OUT_MAE_EXT = '-out.mae'

BOND_CROSSOVER = 'bond'
CROSSOVER_CHOICES = [BOND_CROSSOVER]
DEFAULT_CROSSOVERS = [BOND_CROSSOVER]

ELEMENTAL_MUTATOR = 'elemental'
FRAGMENT_MUTATOR = 'fragment'
ISOELECTRONIC_MUTATOR = 'isoelectronic'
MUTATOR_CHOICES = [ELEMENTAL_MUTATOR, FRAGMENT_MUTATOR, ISOELECTRONIC_MUTATOR]
DEFAULT_MUTATORS = [FRAGMENT_MUTATOR, ISOELECTRONIC_MUTATOR]

GENERATIONS = 10
POPULATION = 8

CROSSOVER_RATE = 90.0
MUTATION_RATE = 90.0

RANK_SELECTION = 'rank'
ROULETTE_WHEEL_SELECTION = 'roulette_wheel'
TOURNAMENT_SELECTION = 'tournament'
TOURNAMENT_SELECTION_WITH_ROULETTE = 'tournament_with_roulette'
UNIFORM_SELECTION = 'uniform'
SELECTION_DICT = {
    RANK_SELECTION: Selectors.GRankSelector,
    ROULETTE_WHEEL_SELECTION: Selectors.GRouletteWheel,
    TOURNAMENT_SELECTION: Selectors.GTournamentSelectorAlternative,
    TOURNAMENT_SELECTION_WITH_ROULETTE: Selectors.GTournamentSelector,
    UNIFORM_SELECTION: Selectors.GUniformSelector
}
SELECTION_CHOICES = [RANK_SELECTION, ROULETTE_WHEEL_SELECTION, \
    TOURNAMENT_SELECTION, TOURNAMENT_SELECTION_WITH_ROULETTE, UNIFORM_SELECTION]
DEFAULT_SELECTION = ROULETTE_WHEEL_SELECTION

TOURNAMENT_SIZE = 2

UNPRODUCTIVE_TERM = 'unproductive'
FIRST_PROPERTY_TERM = 'first_property'
ALL_PROPERTIES_TERM = 'all_properties'
MAX_GENERATIONS_TERM = 'max_generations'
TERM_CHOICES = [UNPRODUCTIVE_TERM, FIRST_PROPERTY_TERM, ALL_PROPERTIES_TERM, \
    MAX_GENERATIONS_TERM]
DEFAULT_TERMS = [UNPRODUCTIVE_TERM, ALL_PROPERTIES_TERM]

NUM_UNPRODUCTIVE = 6

LINEAR_SCALING = 'linear'
POWER_LAW_SCALING = 'power_law'
SIGMA_TRUNCATION_SCALING = 'sigma_truncation'
BOLTZMANN_SCALING = 'boltzmann'
SCALING_DICT = {
    LINEAR_SCALING: Scaling.LinearScaling,
    POWER_LAW_SCALING: Scaling.PowerLawScaling,
    SIGMA_TRUNCATION_SCALING: Scaling.SigmaTruncScaling,
    BOLTZMANN_SCALING: Scaling.BoltzmannScaling
}
SCALING_CHOICES = [
    LINEAR_SCALING, POWER_LAW_SCALING, SIGMA_TRUNCATION_SCALING,
    BOLTZMANN_SCALING
]
DEFAULT_SCALING = SIGMA_TRUNCATION_SCALING
ALLOWS_NEGATIVE_SCORES = [SIGMA_TRUNCATION_SCALING, BOLTZMANN_SCALING]

ELITISM = 1

RANDOM_SEED = None
RANDOM_INT_BOUND = 1000000

NO_MINIMIZE = False

INDIVIDUAL_KEY = 'i_matsci_individual_index'
GENERATION_KEY = 'i_matsci_generation'
STRUCTURE_SCORE_KEY = 'r_matsci_structure_score'  # this is a type of raw score
RAW_SCORE_KEY = 'r_matsci_raw_score'
FIT_SCORE_KEY = 'r_matsci_fit_score'
SKIP_KEY = 'b_matsci_skipped'
FAILURE_KEY = 'b_matsci_failed'

DEFAULT_EVAL_KWARGS = {}

ORGANIC = 'organic'
N_HETEROCYCLES = 'N-heterocycles'
O_HETEROCYCLES = 'O-heterocycles'
S_HETEROCYCLES = 'S-heterocycles'
MIXED_HETEROCYCLES = 'Mixed-heterocycles'
COMBIGLIDE_DEFAULT = 'combiglide_default'
OPTOELECTRONICS = 'optoelectronics'
ALL = 'all'
MMSHARE_MAIN_DATA = hunt('mmshare', dir='data')
FRAGMENT_LIBS = {
    ORGANIC: os.path.join(MMSHARE_MAIN_DATA, 'res', 'organic.bld'),
    N_HETEROCYCLES: os.path.join(MMSHARE_MAIN_DATA, 'res',
                                 'N-heterocycles.bld'),
    O_HETEROCYCLES: os.path.join(MMSHARE_MAIN_DATA, 'res',
                                 'O-heterocycles.bld'),
    S_HETEROCYCLES: os.path.join(MMSHARE_MAIN_DATA, 'res',
                                 'S-heterocycles.bld'),
    MIXED_HETEROCYCLES: os.path.join(MMSHARE_MAIN_DATA, 'res',
                                     'Mixed-heterocycles.bld'),
    COMBIGLIDE_DEFAULT: os.path.join(MMSHARE_MAIN_DATA, 'cg',
                                     'interactive_palettes',
                                     'default_palette.mae'),
    OPTOELECTRONICS: os.path.join(MMSHARE_MAIN_DATA, 'genetic_optimization',
                                  'optoelectronics.mae')
}
FRAGMENT_LIBS_DEFAULT = [OPTOELECTRONICS]

ENTRY_NAME_KEY = 's_m_entry_name'

GROW_NAME_KEY = 's_m_grow_name'
PDB_ATOM_NAME_KEY = 's_m_pdb_atom_name'
PDB_RES_NAME_KEY = 's_m_pdb_residue_name'

CROSSOVER_PARENTS_KEY = 's_matsci_crossover_parents'
CROSSOVER_APPLIED_KEY = 's_matsci_crossover_applied'
MUTATION_PARENT_KEY = 's_matsci_mutation_parent'
MUTATION_APPLIED_KEY = 's_matsci_mutation_applied'

EVALUATOR_JOBS_DIR = 'evaluator_jobs'
GENER_SUBDIR = 'generation_'

NUM_DECIMAL = '%.2f'
FIELD_WIDTH = 10

# this is used to handle a ZeroDivisionError
INFINITE_SCORE = 1000000000

# allow setting a bad score for both failed and skipped jobs
BAD_SCORE = -10000000

RXN_KEY = rxn_channel.Products.RXN_REPRESENTATION_KEY

SMALL_CHILD_FREQ = 0.75
SIMILAR_STDEV_CHILDREN_FREQ = 0.75

FILE_BASE_NAME = 'genopt'

NO_OPEN_SHELL = False

TERM_THRESH = 0.0001

# penalize data
NONE = 'none'
ATYPICAL = 'atypical'
ATYPICAL_PATTERNS = dict([
    ('three-atom or longer chains containing N, O, P, and S', \
        '[$([#7,#8,#15,#16][#7,#8,#15,#16][#7,#8,#15,#16])]'),
    ('two or more different halogens', 'F.Cl_F.Br_F.I_Cl.Br_Cl.I_Br.I')
])
ATYP_MW_TARGET = 1100
ATYP_MW_WEIGHT = 1.0
ATYP_NAT_TARGET = 200
ATYP_NAT_WEIGHT = 1.0
ATYP_NEL_TARGET = 5
ATYP_NEL_WEIGHT = 15
ATYPICAL_PROPS = [
    'name=natoms target=%d comparator=lt weight=%.1f' %
    (ATYP_NAT_TARGET, ATYP_NAT_WEIGHT),
    'name=molecular_weight target=%d comparator=lt weight=%.1f' %
    (ATYP_MW_TARGET, ATYP_MW_WEIGHT),
    'name=nelements target=%d comparator=lt weight=%.1f' %
    (ATYP_NEL_TARGET, ATYP_NEL_WEIGHT),
    'name=smarts patterns=%s minimax=min weight=15.0' % \
        '_'.join(ATYPICAL_PATTERNS.values())
]
DIHYDROGEN = 'dihydrogen'
DIHYDROGEN_PATTERNS = dict([('dihydrogen molecule', '[H][H]')])
DIHYDROGEN_PROPS = ['name=smarts patterns=%s minimax=min weight=100.0' % \
    '_'.join(list(DIHYDROGEN_PATTERNS.values()))]
PENALIZE_PROTOCOLS = dict([(ATYPICAL, (ATYPICAL_PATTERNS, ATYPICAL_PROPS)),
                           (DIHYDROGEN, (DIHYDROGEN_PATTERNS, DIHYDROGEN_PROPS))
                          ])
PENALIZE_CHOICES = [NONE, ATYPICAL, DIHYDROGEN]
PENALIZE_DEFAULT = [ATYPICAL, DIHYDROGEN]
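
# Illustrative example (not part of the original module): the penalty
# protocols above are plain property-specification strings using the
# whitespace-delimited <key>=<value> format consumed by
# Property.parsePropertyString defined later in this module, e.g.
#
#     DIHYDROGEN_PROPS[0]
#     # 'name=smarts patterns=[H][H] minimax=min weight=100.0'
#
#     prop = Property()
#     prop.parsePropertyString('index=1 ' + DIHYDROGEN_PROPS[0])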

# base evaluations of structures with structure scores below this threshold
# will be skipped
STRUCTURE_SCORE_THRESHOLD = -50.0

# list of file extensions, files created by the base evaluator with these
# extensions will be copied back to the launch host as log files
EXTS_TO_RETURN = ['.spm']

STOICH_BASE_EXT_SEP = '.'

CONFORMATIONAL_SEARCH = False
CONF_SEARCH_FAILED_KEY = 'b_matsci_Conf_Search_Failed'

# see MATSCI-1988 and MMOD-1809, zero is reserved
CONF_SEARCH_SEED_RANGE = [1, 78593]

GENER_LOG_FILE_SEP = '-'

REMAINDER = 'remainder'
PREVIOUS = 'previous'
FRAGMENT = 'fragment'
FREEZER_CHOICES = [REMAINDER, PREVIOUS, FRAGMENT]
FREEZERS_DEFAULT = [REMAINDER, PREVIOUS]

FROM_FREEZER_KEY = 's_matsci_From_Freezer'

REMAINDER_FILE_EXT = '-remainder.mae'

NO_CHILD = 'no_child'
BAD_STRUCTURE = 'bad_structure'
INOCULATE_CHOICES = [NONE, NO_CHILD, BAD_STRUCTURE]
INOCULATE_DEFAULT = [NO_CHILD, BAD_STRUCTURE]

INOCULATE_KEY = 's_matsci_Inoculate'

TRIES_FROM_LIBS = 3
TRIES_FRAGMENT_MUTATOR = 3

EVALUATOR_RELATIVE_DIR = [os.curdir] + 3 * [os.pardir]

CM3_PER_BOHR3 = (100 * constants.physical_constants['Bohr radius'][0])**3
VACUUM_PERMITTIVITY_AU = 1. / (4 * constants.pi)  # atomic units
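
# Note (not part of the original module): these two constants feed the
# Lorentz-Lorenz/Clausius-Mossotti relation used below in
# RefractiveIndex.setRefractiveIndexProperty, i.e. with
# x = alpha * N / (3 * eps0), where alpha is the isotropic polarizability in
# bohr^3, N the number density in particles/bohr^3 and
# eps0 = VACUUM_PERMITTIVITY_AU, the refractive index is
#
#     n = math.sqrt((1 + 2 * x) / (1 - x))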

PropertyInfo = namedtuple(
    'PropertyInfo',
    ['key', 'units', 'is_positive', 'class_evaluator', 'class_kwargs'])

EV = 'eV'
NM = 'nm'
NM_X_INTENSITY = 'nm*Intensity'
ANGSTROM = 'Ang.'

NAME_PATTERN = re.compile(r'(name=){1}(\w+)')

OXIDATION_PROP = optoelectronics.OXIDATION
REDUCTION_PROP = optoelectronics.REDUCTION
HOLE_PROP = optoelectronics.HOLE_REORG
ELECTRON_PROP = optoelectronics.ELECTRON_REORG
TRIPLET_BASE_PROP = optoelectronics.TRIPLET
TRIPLET_PROP = TRIPLET_BASE_PROP
TRIPLET_RMSD_PROP = TRIPLET_BASE_PROP + '_rmsd'
TRIPLET_REORG_PROP = optoelectronics.TRIPLET_REORG
SPECTRUM_BASE_PROP = optoelectronics.WINDOW
SPECTRUM_LMAX_PROP = SPECTRUM_BASE_PROP + '_lmax'
SPECTRUM_RAREA_PROP = SPECTRUM_BASE_PROP + '_rarea'
SPECTRUM_GAREA_PROP = SPECTRUM_BASE_PROP + '_garea'
SPECTRUM_BAREA_PROP = SPECTRUM_BASE_PROP + '_barea'
TADF_BASE_PROP = optoelectronics.TADF  # TADF is thermally activated delayed fluorescence
TADF_T1_PROP = TADF_BASE_PROP
TADF_T2_PROP = TADF_BASE_PROP.replace('t1', 't2')
TADF_T3_PROP = TADF_BASE_PROP.replace('t1', 't3')
FL_EMISSION_BASE_PROP = optoelectronics.FL_EMISSION
FL_EMISSION_EMAX_PROP = FL_EMISSION_BASE_PROP + '_emax'
FL_EMISSION_RAREA_PROP = FL_EMISSION_BASE_PROP + '_rarea'
FL_EMISSION_GAREA_PROP = FL_EMISSION_BASE_PROP + '_garea'
FL_EMISSION_BAREA_PROP = FL_EMISSION_BASE_PROP + '_barea'
FL_EMISSION_STOKES_SHIFT_PROP = FL_EMISSION_BASE_PROP + '_stokes_shift'


def _monomer_evaluate_smarts_canvas(st, pattern):
    """
    Wrapper for doing Canvas SMARTS evaluation of polymer monomers.

    :type st: schrodinger.structure.Structure
    :param st: the structure

    :type pattern: str
    :param pattern: the pattern

    :rtype: list
    :return: contains lists of matching indices
    """

    # It is unclear whether the following is a bug or a feature, but it is
    # undesirable for this application: if the given structure is a polymer
    # monomer containing head and tail atoms marked with msprops.ROLE_PROP
    # and the given pattern matches the tail atom, then a hydrogen bonded to
    # the tail atom is also picked up as a match even though it is not a real
    # match.  Work around this by passing a copy of the structure with the
    # grow properties removed.
    stp = st.copy()
    rxn_channel.remove_grow_properties(stp)
    return analyze.evaluate_smarts_canvas(stp, pattern)
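

# Illustrative usage (not part of the original module): the wrapper above is
# called exactly like analyze.evaluate_smarts_canvas and returns one list of
# atom indices per match, e.g.
#
#     st = structure.Structure.read('monomer.mae')  # hypothetical input file
#     matches = _monomer_evaluate_smarts_canvas(st, '[H][H]')
#     n_matches = len(matches)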


class ClassEvaluator:
    """
    Manage a class evaluator.
    """

    SEPARATOR = '_'
    HOST_STR = 'host_str'

    def __init__(self, structs, properties):
        """
        Create an instance.

        :type structs: list of schrodinger.structure.Structure
        :param structs: contains input structures

        :type properties: list
        :param properties: contains Property
        """

        self.structs = structs
        self.properties = properties
        self.job_dj = self.getQueue(self.properties)

    def getBaseName(self, struct, aproperty):
        """
        Get the base name.

        :type struct: schrodinger.structure.Structure
        :param struct: the structure

        :type aproperty: Property
        :param aproperty: the property

        :rtype: str
        :return: the base name
        """

        return struct.title + self.SEPARATOR + aproperty.name

    def runIt(self):
        """
        Run it.

        :raise RuntimeError: for any issue
        """

        # needs to be reimplemented, raise a RuntimeError for any issue

    def setQueue(self, job_dj):
        """
        Set a JobDJ to run jobs.

        :type job_dj: `schrodinger.job.queue.JobDJ`
        :param job_dj: the queue
        """

        self.job_dj = job_dj

    def getQueue(self, properties):
        """
        Return a JobDJ to run jobs.

        :type properties: list
        :param properties: contains Property

        :rtype: JobDJ
        :return: the queue
        """

        job_dj = getattr(self, 'job_dj', None)
        if job_dj:
            return job_dj
        # the -HOST string has been saved in each of the properties
        # class kwargs because the -HOST string of the main GO job
        # will have been removed from the command line by the time
        # the tasks infrastructure starts the parallel tasks, these
        # properties will always have the same -HOST string so just
        # use the first property, the number of simultaneous
        # subjobs, aka the number of processors, in the -HOST string
        # is used for both (1) the number of parallel tasks (which is
        # the number of molecules that are simultaneously scored) and (2)
        # the number of simultaneous subjobs of the JobDJ available to
        # each task or molecule which is used to run the subjobs needed
        # to compute the molecule's score, the host in the -HOST string
        # sets only the host used for the JobDJ in (2) since tasks
        # always run on localhost
        if not properties:
            return None
        host_str = properties[0].class_kwargs[self.HOST_STR]
        return jobutils.create_queue(host=host_str)

    @staticmethod
    def getHostStr(host_str=None):
        """
        Return the host string.

        :type host_str: str
        :param host_str: the host string, for example 'localhost:4'

        :rtype: str
        :return: the host string
        """

        return host_str or f'{queue.LOCALHOST_ENTRY_NAME}:1'
class StructureEvaluator(ClassEvaluator):
    """
    Manage structure evaluation.
    """

    SMARTS_PATTERN_SEPARATOR = '_'
    SMARTS_PROP = 'smarts'
    MOL_WEIGHT_PROP = 'molecular_weight'
    NATOMS_PROP = 'natoms'
    NELEMENTS_PROP = 'nelements'
    PROPERTIES = {SMARTS_PROP, MOL_WEIGHT_PROP, NATOMS_PROP, NELEMENTS_PROP}
    SMARTS_KEY = 'i_matsci_SMARTS_property_%s'
    MOL_WEIGHT_KEY = 'r_m_Molecular_weight'
    NATOMS_KEY = 'i_m_Number_of_atoms'
    NELEMENTS_KEY = 'i_m_Number_of_elements'
    NO_UNITS = 'None'
    MOL_WEIGHT_UNITS = 'g/mol'
    PATTERNS = 'patterns'

    def __init__(self, structs, properties):
        """
        See parent class for documentation.
        """

        super(StructureEvaluator, self).__init__(structs, properties)
        self.structure_properties = [
            aproperty for aproperty in self.properties
            if aproperty.name in self.PROPERTIES
        ]

    @staticmethod
    def getInfo(key, units, patterns=None, host_str=None):
        """
        Return a PropertyInfo.

        :type key: str
        :param key: the property key

        :type units: str
        :param units: the property units

        :type patterns: list
        :param patterns: the SMARTS patterns

        :type host_str: str
        :param host_str: the host string, for example 'localhost:4'

        :rtype: PropertyInfo
        :return: the property information
        """

        host_str = ClassEvaluator.getHostStr(host_str=host_str)
        obj = StructureEvaluator
        class_kwargs = {}
        if patterns:
            class_kwargs[obj.PATTERNS] = patterns
        class_kwargs[obj.HOST_STR] = host_str
        return PropertyInfo(key=key,
                            units=units,
                            is_positive=True,
                            class_evaluator=obj,
                            class_kwargs=class_kwargs)

    def runIt(self):
        """
        Run it.
        """

        for struct in self.structs:
            for aproperty in self.structure_properties:
                if aproperty.name == StructureEvaluator.SMARTS_PROP:
                    value = sum(
                        len(_monomer_evaluate_smarts_canvas(struct, pattern))
                        for pattern in aproperty.class_kwargs[self.PATTERNS])
                elif aproperty.name == StructureEvaluator.MOL_WEIGHT_PROP:
                    value = struct.total_weight
                elif aproperty.name == StructureEvaluator.NATOMS_PROP:
                    value = struct.atom_total
                elif aproperty.name == StructureEvaluator.NELEMENTS_PROP:
                    value = len(get_element_histogram(struct))
                struct.property[aproperty.key] = value
class ChemInformatics(ClassEvaluator):
    """
    Manage cheminformatics jobs.
    """

    UNITS = 'unknown'
    PROP_BASE_NAME = None
    CHECK_MSG = ''
    OUT_EXT = '.mae'

    def __init__(self, structs, properties):
        """
        See parent class for documentation.
        """

        super().__init__(structs, properties)
        self.cheminfo_properties = [
            aproperty for aproperty in self.properties
            if aproperty.class_kwargs.get(self.MODEL_OPTION)
        ]
        self.xtra_infiles = []
[docs] @classmethod def getInfo(cls, name, units, model_file, tpp=1, host_str=None): """ Return a PropertyInfo. :type cls: type :param cls: the calling class :type name: str :param name: the property name :type units: str :param units: the property units :type model_file: str :param model_file: the model file :type tpp: int :param tpp: the threads per process :type host_str: str :param host_str: the host string, for example 'localhost:4' :rtype: PropertyInfo :return: the property information """ host_str = super().getHostStr(host_str=host_str) if units is None: units = cls.UNITS key = cls.KEY % (name, units) class_kwargs = {} class_kwargs[cls.MODEL_OPTION] = model_file class_kwargs[cls.HOST_STR] = host_str return PropertyInfo(key=key, units=units, is_positive=False, class_evaluator=cls, class_kwargs=class_kwargs)
    def getModelFile(self, aproperty):
        """
        Return the model file from the given property.

        :type aproperty: Property
        :param aproperty: the property

        :rtype: str
        :return: the model file
        """

        model_path = aproperty.class_kwargs[self.MODEL_OPTION]
        return os.path.basename(model_path)

    def copyModelFiles(self):
        """
        Copy the model files to the CWD.
        """

        for aproperty in self.cheminfo_properties:
            model_path = aproperty.class_kwargs[self.MODEL_OPTION]
            model_infile = self.getModelFile(aproperty)
            if not os.path.exists(model_infile):
                shutil.copy(model_path, model_infile)
[docs] def makeMaestroInfile(self, struct, aproperty): """ Make Maestro infile. :type struct: schrodinger.structure.Structure :param struct: the structure :type aproperty: Property :param aproperty: the property :rtype: str :return: the Maestro input file name """ mae_infile = self.getBaseName(struct, aproperty) + IN_MAE_EXT struct.write(mae_infile) return mae_infile
[docs] def getPropertyValue(self, property_outfile, prop_key=None): """ Get the property value. :type property_outfile: str :param property_outfile: the property output file :type prop_key: str or None :param prop_key: if the property output file is a Maestro file then this specifies the structure property key :raise RuntimeError: if property output file doesn't exist or doesn't contain the property value :rtype: float :return: the property value """ if not os.path.exists(property_outfile): msg = (f'No {self.LABEL} property output file named ' f'{property_outfile} found.') raise RuntimeError(msg) if prop_key: st = structure.Structure.read(property_outfile) value = st.property.get(prop_key) if value is None: msg = ('The property value can not be found in the ' f'{self.LABEL} property output file.') raise RuntimeError(msg) return value
    def generateExtraInput(self, struct):
        """
        Generate any extra input files needed for the actual property
        evaluation.

        :type struct: schrodinger.structure.Structure
        :param struct: the structure
        """

        pass
[docs] def getCmd(self, mae_infile, model_infile, prop_name, xtra_infile, property_outfile, job_name): """ Get the command line. :type mae_infile: str :param mae_infile: Maestro file containing the structure :type model_infile: str :param model_infile: the cheminformatics model file :type prop_name: str :param prop_name: a property name possibly used to identify the property value :type xtra_infile: str :param xtra_infile: an additional input file :type property_outfile: str :param property_outfile: name of the output file containing the property value :type job_name: str :param job_name: the job name :rtype: list :return: the command line """ # In the most general case each cheminformatics job will at least need # (1) a structure, (2) a model file, (3) a property name, (4) an input # file for specifying the type of calculation, (5) an output file # containing the property prediction, and (6) a job name. This method # should return the command line for running the job. pass
[docs] def runPrediction(self, struct): """ Run the prediction. :type struct: schrodinger.structure.Structure :param struct: the structure :raise RuntimeError: if it fails """ property_outfiles = [] prop_keys = [] for idx, aproperty in enumerate(self.cheminfo_properties): mae_infile = self.makeMaestroInfile(struct, aproperty) model_infile = self.getModelFile(aproperty) prop_name = base_name = self.getBaseName(struct, aproperty) if self.xtra_infiles: xtra_infile = self.xtra_infiles[idx] else: xtra_infile = None property_outfile = f'{base_name}{ChemInformatics.OUT_EXT}' property_outfiles.append(property_outfile) if self.PROP_BASE_NAME: prop_keys.append( self.PROP_BASE_NAME.format(prop_name=prop_name)) else: prop_keys.append(None) job_name = f'{base_name}_{self.LABEL}' cmd = self.getCmd(mae_infile, model_infile, prop_name, xtra_infile, property_outfile, job_name) jc_job = jobutils.RobustSubmissionJob(cmd, name=job_name) jc_job.property_outfile = property_outfile self.job_dj.addJob(jc_job) self.job_dj.run() for aproperty, property_outfile, prop_key in zip( self.cheminfo_properties, property_outfiles, prop_keys): value = self.getPropertyValue(property_outfile, prop_key=prop_key) struct.property[aproperty.key] = value
    def setUp(self):
        """
        Do any necessary set up prior to the actual property prediction.
        """

        pass

    def runIt(self):
        """
        Run it.
        """

        self.copyModelFiles()
        self.setUp()
        for struct in self.structs:
            self.generateExtraInput(struct)
            self.runPrediction(struct)
[docs] @classmethod def getModelFiles(cls, property_lists): """ Return file names of any model files. :type property_lists: list :param property_lists: contains lists of property specifications :rtype: list :return: file names of the model files """ files = [] property_strings = Property.getPropertyStrings(property_lists) for property_string in property_strings: model_path = \ Property.getKwargs(property_string, cls.MODEL_OPTION) if model_path is not None: files.append(model_path) return files
[docs] @classmethod def checkModelFiles(cls, model_files, host_str=None): """ Check the given model files. :type model_files: list[str] :param model_files: the names of the model files :type host_str: str :param host_str: the host string, for example 'localhost:4' :raise RuntimeError: if there is an issue """ host_str = cls.getHostStr(host_str=host_str) no_exist_msg = ('The following model file that you have ' 'specified can not be found: %s.') bad_ext_msg = ('The following model file does not have a ' 'valid *%s extension: %s.') for model_file in model_files: if not os.path.exists(model_file): raise RuntimeError(no_exist_msg % model_file) if not model_file.endswith(cls.EXT): raise RuntimeError(bad_ext_msg % (cls.EXT, model_file)) st = structure.create_new_structure() st.title = 'validate' st.addAtom('C', 0.0, 0.0, 0.0) st.addAtom('C', 1.0, 0.0, 0.0) order = 1 st.addBond(1, 2, order) props = [] for model_file in model_files: base_name = fileutils.get_basename(model_file).split(cls.EXT)[0] class_kwargs = {} class_kwargs[cls.MODEL_OPTION] = os.path.abspath(model_file) class_kwargs[cls.HOST_STR] = host_str prop = Property(key=f'r_matsci_{base_name}', name=base_name, class_kwargs=class_kwargs) props.append(prop) obj = cls([st], props) schrod_tmp = fileutils.get_directory_path(fileutils.TEMP) validate_dir = fileutils.get_next_filename_prefix( os.path.join(schrod_tmp, "validate_model"), " ") os.mkdir(validate_dir) try: with fileutils.chdir(validate_dir): obj.runIt() except RuntimeError as err: fileutils.force_rmtree(validate_dir) model_files = ', '.join(model_files) msg = (f'{str(err)} One of the model files {model_files} might be ' f'invalid.{cls.CHECK_MSG}') raise RuntimeError(msg) fileutils.force_rmtree(validate_dir)
[docs] @staticmethod def getAllModelFiles(property_lists, check=False, host_str=None): """ Return file names of any model files. :type property_lists: list :param property_lists: contains lists of property specifications :type check: bool :param check: whether to check the model files :type host_str: str :param host_str: the host string, for example 'localhost:4' :rtype: list :return: file names of the model files """ all_model_paths = [] for cls in [CanvasKPLS, AutoQSAR, DeepChem]: model_paths = cls.getModelFiles(property_lists) all_model_paths.extend(model_paths) if model_paths and check: cls.checkModelFiles(model_paths, host_str=host_str) return all_model_paths
[docs]class CanvasKPLS(ChemInformatics): """ Manage Canvas KPLS jobs. """ EXT = 'kpls.tar.gz' FP_TEXT_FILE = 'fpInfo.txt' MODEL_OPTION = 'kpls_model' KEY = 'r_matsci_KPLS_%s/%s' PATH = os.path.join(MMSHARE_MAIN_DATA, 'genetic_optimization', 'canvas_kpls_models') FP_EXT = '.fp' VALUE_PATTERN = re.compile(r'\s*1\s+unknown.*\s+(-?\d+\.?\d*)$') ALLOWED_FP_TYPES = [ 'linear', 'maccs', 'radial', 'molprint2D', 'torsion', 'pairwise', 'triplet', 'quartet', 'dendritic' ] # precision is either 32- or 64-bit SINGLE_PRECISION = 32 DOUBLE_PRECISION = 64 BIT_EXT = '-bit' LABEL = 'canvasKPLS' CHECK_MSG = (' Model files must correspond to a single property type ' 'and as input take fingerprint files featuring one of ' f'the following fingerprint types, {ALLOWED_FP_TYPES}, ' 'as well as either no atom type or one of the 12 known atom ' 'types (integers in [1, 12]). See ' '$SCHRODINGER/utilities/canvasFPGen -h for more details.')
[docs] def makeFingerPrintInfile(self, mae_infile, name, job_dj=None): """ Make fingerprint infile or add the job to do so to the given queue. :type mae_infile: str :param mae_infile: the Maestro input file name :type name: str :param name: the property name :type job_dj: None or `schrodinger.job.queue.JobDJ` :param job_dj: if given then add the fingerprint job to this queue and return :raise RuntimeError: if canvasFPGen fails :rtype: str :return: the Canvas fingerprint input file name """ def finalizer(job): if not os.path.isfile(job.fp_infile): raise RuntimeError('canvasFPGen failed') precision, fptype, atomtypes = self.fp_options_dict[name] basename = mae_infile.split(IN_MAE_EXT)[0] fp_infile = basename + self.FP_EXT job_name = f'{basename}_canvasFPGen' cmd = ['canvasFPGen'] if precision == self.DOUBLE_PRECISION: cmd += ['-xp'] cmd += ['-fptype', fptype] if atomtypes is not None: cmd += ['-atomtype', str(atomtypes)] cmd += ['-imae', mae_infile] cmd += ['-o', fp_infile] if job_dj: # This will run the canvas job in the backend, which we don't want # for subprocess cmd += ['-JOB', job_name] jc_job = jobutils.RobustSubmissionJob(cmd, name=job_name) jc_job.fp_infile = fp_infile jc_job.addFinalizer(finalizer) job_dj.addJob(jc_job) else: try: output = subprocess.check_output(cmd) except subprocess.CalledProcessError as err: raise RuntimeError('canvasFPGen failed: ' + str(err)) if not os.path.isfile(fp_infile): raise RuntimeError('canvasFPGen failed to produce output file') return fp_infile
[docs] def getPropertyValue(self, property_outfile, prop_key=None): """ See parent class. """ super().getPropertyValue(property_outfile, prop_key=prop_key) with open(property_outfile, 'r') as afile: for line in afile: match = re.match(self.VALUE_PATTERN, line) if match: value = float(match.groups()[0]) break else: value = None if value is None: msg = ('The property value can not be found in the ' f'{self.LABEL} property output file.') raise RuntimeError(msg) return value
[docs] def generateExtraInput(self, struct): """ See parent class. """ # here we need to create input Canvas fingerprint files which # are needed for the final property predictions for aproperty in self.cheminfo_properties: mae_infile = self.makeMaestroInfile(struct, aproperty) xtra_infile = self.makeFingerPrintInfile(mae_infile, aproperty.name, job_dj=self.job_dj) self.xtra_infiles.append(xtra_infile) self.job_dj.run()
    def makeFpOptionsDict(self):
        """
        Make the fingerprint options dictionary.
        """

        self.fp_options_dict = {}
        for aproperty in self.cheminfo_properties:
            model_infile = self.getModelFile(aproperty)
            precision, fptype, atomtypes = CanvasKPLS.getFpOptions(model_infile)
            self.fp_options_dict[aproperty.name] = (precision, fptype,
                                                    atomtypes)

    def getCmd(self, mae_infile, model_infile, prop_name, xtra_infile,
               property_outfile, job_name):
        """
        See parent class.
        """

        cmd = ['canvasKPLS']
        cmd += ['-infp', xtra_infile]
        cmd += ['-out', property_outfile]
        cmd += ['-test', '-unknown', '-imod', model_infile]
        cmd += ['-JOB', job_name]
        return cmd

    def setUp(self):
        """
        See parent class.
        """

        self.makeFpOptionsDict()
[docs] @staticmethod def getFpOptions(model_file): """ Return fingerprint options obtained from the given Canvas KPLS model file. :type model_file: str :param model_file: the name of the Canvas KPLS model file :rtype: int, str, int or None :return: contains (1) precision, (2) fingerprint type, and (3) atom type if present :raise RuntimeError: if there is anything wrong with the Canvas KPLS model file """ # get contents of fingerprint information file if not tarfile.is_tarfile(model_file): msg = ('The Canvas KPLS model file, %s, is not ' 'a valid tar archive file.' % model_file) raise RuntimeError(msg) try: with tarfile.open(name=model_file) as tar: fh = tar.extractfile(CanvasKPLS.FP_TEXT_FILE) # see MATSCI-5226 where Python 3 bytes decoding is needed lines = [ line.decode('utf-8').strip('\n') for line in fh.readlines() ] fh.close() except KeyError: msg = ('The Canvas KPLS model file, %s, does not ' 'contain the fingerprint information file, %s.' % (model_file, CanvasKPLS.FP_TEXT_FILE)) raise RuntimeError(msg) # build fingerprint information dict fp_info_dict = {'path': lines[0]} for line in lines[1:]: if line.count('=') == 1: key, value = line.split('=') try: value = int(value) except ValueError: pass fp_info_dict[key] = value elif line.endswith(CanvasKPLS.BIT_EXT): fp_info_dict['precision'] = int( line.split(CanvasKPLS.BIT_EXT)[0]) else: fp_info_dict['properties'] = [ prop.strip() for prop in line.split(',') ] # get data fptype = fp_info_dict.get('type') atomtypes = fp_info_dict.get('code') precision = fp_info_dict.get('precision') props = fp_info_dict.get('properties', []) # check data msg = None error_msg = ('The Canvas KPLS model file, %s, can not be used ' 'for genetic optimization because something is wrong ' 'with the %s file. ' % (model_file, CanvasKPLS.FP_TEXT_FILE)) if fptype not in CanvasKPLS.ALLOWED_FP_TYPES: msg = error_msg + ('The fingerprint type, %s, is not one ' 'of the following supported types: %s.' % (fptype, CanvasKPLS.ALLOWED_FP_TYPES)) elif atomtypes is not None and not (1 <= atomtypes <= 12): msg = error_msg + ('The atom type, %s, is not one of ' 'the 12 standard types, i.e. in [1, 12].' % atomtypes) elif precision is None or precision not in [ CanvasKPLS.SINGLE_PRECISION, CanvasKPLS.DOUBLE_PRECISION ]: msg = error_msg + ('The precision, %s, is neither %s nor %s.' % (precision, CanvasKPLS.SINGLE_PRECISION, CanvasKPLS.DOUBLE_PRECISION)) elif len(props) != 1: msg = error_msg + ('More than a single property has been ' 'specified: %s.' % props) if msg: raise RuntimeError(msg) return precision, fptype, atomtypes
class AutoQSAR(ChemInformatics):
    """
    Manage AutoQSAR jobs.
    """

    EXT = 'qzip'
    MODEL_OPTION = 'auto_qsar_model'
    KEY = 'r_matsci_Auto_QSAR_%s/%s'
    PROP_BASE_NAME = 'r_autoqsar_Pred_{prop_name}'
    IN_EXT = '.inp'
    LABEL = 'autoqsar'
    def writeAutoQSARInFile(self, mae_infile, model_infile, base_name,
                            prop_name):
        """
        Write the AutoQSAR input file.

        :type mae_infile: str
        :param mae_infile: the input Maestro file name

        :type model_infile: str
        :param model_infile: the input AutoQSAR model file name

        :type base_name: str
        :param base_name: the base name to use for the AutoQSAR input file

        :type prop_name: str
        :param prop_name: the property name

        :rtype: str
        :return: the name of the AutoQSAR input file
        """

        file_name = f'{base_name}{self.IN_EXT}'
        with open(file_name, 'w') as afile:
            # write one keyword per line
            afile.write(f'LIGANDS_FILE {mae_infile}\n')
            afile.write(f'MODEL_FILE {model_infile}\n')
            afile.write('MODEL_SET -best,\n')
            afile.write(f'OUT_PRED_PROP_BASE {prop_name}\n')
        return file_name
[docs] def generateExtraInput(self, struct): """ See parent class. """ # here we need to create AutoQSAR input files which # are needed for the final property predictions for aproperty in self.cheminfo_properties: mae_infile = self.makeMaestroInfile(struct, aproperty) model_infile = self.getModelFile(aproperty) prop_name = base_name = self.getBaseName(struct, aproperty) xtra_infile = self.writeAutoQSARInFile(mae_infile, model_infile, base_name, prop_name) self.xtra_infiles.append(xtra_infile)
    def getCmd(self, mae_infile, model_infile, prop_name, xtra_infile,
               property_outfile, job_name):
        """
        See parent class.
        """

        cmd = ['autoqsar', model_infile, '-test']
        cmd += ['-i', mae_infile]
        cmd += ['-pred', property_outfile]
        cmd += ['-best', '-unknown']
        cmd += ['-prop', prop_name]
        cmd += ['-JOB', job_name]
        return cmd
class DeepChem(ChemInformatics):
    """
    Manage DeepChem jobs.
    """

    EXT = 'qzip'
    MODEL_OPTION = 'deepchem_model'
    KEY = 'r_matsci_DeepChem_%s/%s'
    PROP_BASE_NAME = 'r_m_{prop_name}_score'
    LABEL = 'deepchem'

    def getCmd(self, mae_infile, model_infile, prop_name, xtra_infile,
               property_outfile, job_name):
        """
        See parent class.
        """

        cmd = ['ligand_ml_driver.py', model_infile, '-predict']
        cmd += ['-i', mae_infile]
        cmd += ['-pred', property_outfile]
        cmd += ['-prop', prop_name]
        cmd += ['-JOB', job_name]
        return cmd
class Jaguar(ClassEvaluator):
    """
    Manage Jaguar jobs.
    """

    JAGUAR_OPTIONS = 'jaguar_options'
    TPP = 'tpp'
    JAGUAR_OUTPUT_ATTR = 'jaguar_output_attr'
    IN_EXT = '.in'
    OUT_EXT = '.out'

    def __init__(self, structs, properties):
        """
        See parent class for documentation.
        """

        super(Jaguar, self).__init__(structs, properties)
        self.jaguar_properties = [
            aproperty for aproperty in self.properties
            if aproperty.class_kwargs.get(self.JAGUAR_OPTIONS)
        ]
[docs] def loadQueue(self, job_dj): """ Load the JobDJ with jobs. :type job_dj: JobDJ :param job_dj: the queue """ base_cmd = ['jaguar', 'run'] for struct in self.structs: kwargs = {'structure': struct} for aproperty in self.jaguar_properties: ji = JaguarInput(**kwargs) ji.setValues(aproperty.class_kwargs[self.JAGUAR_OPTIONS]) ji.setStructure(struct, set_molchg_multip=False) if ji.sectionDefined('valid'): ji.deleteSection('valid') base_name = self.getBaseName(struct, aproperty) jag_in_file = base_name + self.IN_EXT ji.saveAs(jag_in_file) cmd = list(base_cmd) tpp = aproperty.class_kwargs.get(self.TPP, 1) if tpp > 1: cmd.extend([parserutils.FLAG_TPP, str(tpp)]) cmd.append(jag_in_file) jc_job = jobutils.RobustSubmissionJob(cmd, name=base_name) job_dj.addJob(jc_job)
[docs] def postProcess(self, silent=False): """ Post process the jobs and set the final results. :type silent: bool :param silent: if True, don't raise. :raise RuntimeError: if there is a problem """ for struct in self.structs: for aproperty in self.jaguar_properties: jag_out_file = self.getBaseName(struct, aproperty) + Jaguar.OUT_EXT if not os.path.exists(jag_out_file): if silent: continue msg = ('Jaguar job output file can not be found.') raise RuntimeError(msg) jo = JaguarOutput(jag_out_file) if jo.status != JaguarOutput.OK or jo.fatal_error: if silent: continue if jo.fatal_error: err = jo.fatal_error.strip().replace('\n', '') else: err = str(jo.status) msg = (f'Jaguar job died with the error: {err}.') raise RuntimeError(msg.format()) attr = aproperty.class_kwargs.get(self.JAGUAR_OUTPUT_ATTR) if attr: struct.property[aproperty.key] = getattr(jo, attr)
[docs] def runIt(self, silent=False): """ Run it. :type silent: bool :param silent: if True, don't raise. :raise RuntimeError: if there is a problem """ self.loadQueue(self.job_dj) self.job_dj.run() if not silent and (not self.job_dj.isComplete() or self.job_dj.total_failed): msg = ('Jaguar job has failed.') raise RuntimeError(msg) self.postProcess(silent=silent)
class GlassTransitionTemperature(CanvasKPLS):
    """
    Manage glass transition temperature jobs.
    """

    UNITS = 'C'
    KEY = 'r_matsci_KPLS_Tg/{units}'.format(units=UNITS)
    PROP = 'kpls_tg'
    FILE = 'Tg250.kpls.tar.gz'
[docs] @staticmethod def getInfo(host_str=None): """ Return a PropertyInfo. :type host_str: str :param host_str: the host string, for example 'localhost:4' :rtype: PropertyInfo :return: the property information """ host_str = ClassEvaluator.getHostStr(host_str=host_str) obj = GlassTransitionTemperature afile = os.path.join(obj.PATH, obj.FILE) class_kwargs = {} class_kwargs[obj.MODEL_OPTION] = afile class_kwargs[obj.HOST_STR] = host_str return PropertyInfo(key=obj.KEY, units=obj.UNITS, is_positive=False, class_evaluator=obj, class_kwargs=class_kwargs)
class RefractiveIndex(CanvasKPLS, Jaguar):
    """
    Manage refractive index jobs.
    """

    UNITS = 'none'
    KEY = 'r_matsci_Refractive_Index_298K/{units}'.format(units=UNITS)
    PROP = 'refractive_index'
    FILE = '01a_kpls5_amorphous_density_HT.kpls.tar.gz'
    MASS_DENSITY_KEY = 'r_matsci_Mass_Density/g/cm^3'
    ISOTROPIC_POLARIZABILITY_KEY = 'r_matsci_Isotropic_Polarizability/bohr^3'

    def __init__(self, structs, properties):
        """
        See parent classes for documentation.
        """

        CanvasKPLS.__init__(self, structs, properties)
        Jaguar.__init__(self, structs, properties)
[docs] @staticmethod def getInfo(jaguar_options=None, tpp=1, host_str=None): """ Return a PropertyInfo. :type jaguar_options: dict :param jaguar_options: contain Jaguar options :type tpp: int :param tpp: the threads per process :type host_str: str :param host_str: the host string, for example 'localhost:4' :rtype: PropertyInfo :return: the property information """ host_str = ClassEvaluator.getHostStr(host_str=host_str) if not jaguar_options: jaguar_options = {} options = jaguar_options.copy() options['ipolar'] = -1 obj = RefractiveIndex afile = os.path.join(obj.PATH, obj.FILE) class_kwargs = {} class_kwargs[obj.MODEL_OPTION] = afile class_kwargs[obj.JAGUAR_OPTIONS] = options class_kwargs[obj.TPP] = tpp class_kwargs[obj.JAGUAR_OUTPUT_ATTR] = 'polar_alpha' class_kwargs[obj.HOST_STR] = host_str return PropertyInfo(key=obj.KEY, units=obj.UNITS, is_positive=False, class_evaluator=obj, class_kwargs=class_kwargs)
[docs] @classmethod def setRefractiveIndexProperty(cls, struct, mass_density, polarizability): """ Set the refractive index property on the given structure. :type struct: schrodinger.structure.Structure :param struct: the structure :type mass_density: float :param mass_density: the mass density in g/cm^3 :type polarizability: float :param polarizability: the isotropically averaged polarizability in atomic units of bohr^3 :raise ValueError: Abnormal small total_weight or large polarizability """ # the equations used to compute the refractive index are taken from # https://chemrxiv.org/articles/Combining_First-Principles_and_ \ # Data_Modeling_for_the_Accurate_Prediction_of_the_Refractive_ \ # Index_of_Organic_Polymers/5446564 number_density = mass_density * constants.N_A / struct.total_weight number_density *= CM3_PER_BOHR3 # particles/bohr^3 ratio = polarizability * number_density / (3. * VACUUM_PERMITTIVITY_AU) refractive_index = math.sqrt((1 + 2 * ratio) / (1 - ratio)) struct.property[cls.KEY] = refractive_index struct.property[cls.MASS_DENSITY_KEY] = mass_density struct.property[cls.ISOTROPIC_POLARIZABILITY_KEY] = polarizability
[docs] def runIt(self): """ Run it. """ # in turn the mass densities and polarizabilities from the Canvas KPLS # and Jaguar jobs are stored as the refractive index structure property # as this method works by assembling the final property value using the # results from intermediate calculations CanvasKPLS.runIt(self) mass_densities = [struct.property[self.KEY] for struct in self.structs] Jaguar.runIt(self) polarizabilities = [ struct.property[self.KEY] for struct in self.structs ] data = zip(self.structs, mass_densities, polarizabilities) for struct, mass_density, polarizability in data: self.setRefractiveIndexProperty(struct, mass_density, polarizability)
[docs]class CustomClassEvaluator(ClassEvaluator): """ Manage a custom class evaluator. """ UNITS = 'unknown' KEY = 'r_matsci_{name}/{units}' CUSTOM_CLASS_FILE_OPTION = 'custom_class_file' # define some dictionaries, since the RUN_DICT is the only # one that must have all keys defined do not use a name space # object UNITS_DICT = {'stub': UNITS} RUN_DICT = {'stub': 'runStub'} # files specified here should be pathless and located in the same # directory as the python file defining the subclass of this class EXTRA_INPUT_FILES_DICT = {'stub': []}
[docs] def __init__(self, structs, properties): """ See parent class for documentation. """ super().__init__(structs, properties) self.custom_properties = [ aproperty for aproperty in self.properties if aproperty.class_kwargs.get(self.CUSTOM_CLASS_FILE_OPTION) ] # the API of ClassEvaluator is general and takes multiple # structures however when used in genetic optimization only # a single structure is passed because property evaluation # of multiple structures is done in parallel and handled # elsewhere, to be more clear in this custom class offer # a new attribute self.st = self.structs[0]
[docs] @staticmethod def getInfo(name, units, custom_class_file, tpp=1, host_str=None): """ Return a PropertyInfo. :type name: str :param name: the property name :type units: str :param units: the property units :type custom_class_file: str :param custom_class_file: the Python file containing the custom class :type tpp: int :param tpp: the threads per process :type host_str: str :param host_str: the host string, for example 'localhost:4' :rtype: PropertyInfo :return: the property information """ host_str = ClassEvaluator.getHostStr(host_str=host_str) CustomClassEvaluator.checkModule(custom_class_file, name) custom_module = CustomClassEvaluator.getModule(custom_class_file) custom_class = custom_module.CustomClassEvaluator if units is None: units = custom_class.UNITS_DICT.get(name, CustomClassEvaluator.UNITS) key = CustomClassEvaluator.KEY.format(name=name, units=units) class_kwargs = {} class_kwargs[ CustomClassEvaluator.CUSTOM_CLASS_FILE_OPTION] = custom_class_file class_kwargs[Jaguar.TPP] = tpp class_kwargs[CustomClassEvaluator.HOST_STR] = host_str return PropertyInfo(key=key, units=units, is_positive=False, class_evaluator=custom_class, class_kwargs=class_kwargs)
[docs] @staticmethod def getModule(path): """ Return the module from the given path. :type path: str :param path: the path to the module python file :raise RuntimeError: if there is a problem :rtype: object :return: the module """ if not os.path.exists(path): raise RuntimeError(f'The python file {path} can not be found.') # note that imputils.import_module_from_file caches the import, # likely multiple exception types so catch the bare exception try: custom_module = imputils.import_module_from_file(path) except Exception as err: raise RuntimeError(f'The python file {path} can not be ' f'imported: {str(err)}') return custom_module
[docs] @staticmethod def getExtraInputFiles(path, name): """ Return the extra input files for the given property name. :type path: str :param path: the path to the module python file :type name: str :param name: the name of the property :raise RuntimeError: if there is a problem :rtype: list :return: the extra input files """ custom_module = CustomClassEvaluator.getModule(path) adict = custom_module.CustomClassEvaluator.EXTRA_INPUT_FILES_DICT adir = os.path.dirname(path) files = [] for afile in adict.get(name, []): files.append(os.path.join(adir, afile)) return files
[docs] @staticmethod def checkDict(path, dict_name): """ Check dictionary. :type path: str :param path: the path to the module python file :type dict_name: str :param dict_name: the name of the dictionary to check :raise RuntimeError: if there is a problem """ custom_module = CustomClassEvaluator.getModule(path) custom_class = custom_module.CustomClassEvaluator adict = getattr(custom_class, dict_name) if not isinstance(adict, dict): raise RuntimeError( f'The {dict_name} for the CustomClassEvaluator class in the ' f'python file {path} must be a python dictionary.')
[docs] @staticmethod def checkModule(path, name): """ Check the module. :type path: str :param path: the path to the module python file :type name: str or None :param name: if given the name of the property using this module will be checked against the module :raise RuntimeError: if there is a problem """ # check class custom_module = CustomClassEvaluator.getModule(path) custom_class = getattr(custom_module, 'CustomClassEvaluator', None) if not custom_class: raise RuntimeError(f'The python file {path} is missing the ' 'CustomClassEvaluator class.') if CustomClassEvaluator not in custom_class.__bases__: raise RuntimeError( 'The CustomClassEvaluator class in the ' f'python file {path} must be a subclass of go.CustomClassEvaluator.' ) # check units dict CustomClassEvaluator.checkDict(path, 'UNITS_DICT') # check run dict CustomClassEvaluator.checkDict(path, 'RUN_DICT') run_dict = custom_class.RUN_DICT run_dict.pop('stub', None) if not run_dict: raise RuntimeError( 'The CustomClassEvaluator class in the ' f'python file {path} must define the RUN_DICT dictionary.') if name and name not in run_dict: raise RuntimeError( 'The RUN_DICT for the CustomClassEvaluator ' f'class in the python file {path} must map the property name ' f'{name} to a run method that calculates this property.') for value in run_dict.values(): method = getattr(custom_class, value, None) if not method: raise RuntimeError( 'The CustomClassEvaluator class in the ' f'python file {path} must define the {value} method.') if type(method).__name__ != 'function': raise RuntimeError( f'The {value} method defined for the CustomClassEvaluator ' f'class in the python file {path} must be a python function.' ) # check extra input files dict CustomClassEvaluator.checkDict(path, 'EXTRA_INPUT_FILES_DICT') if name: files = CustomClassEvaluator.getExtraInputFiles(path, name) for afile in files: if not os.path.exists(afile): raise RuntimeError( f'The extra input file {afile} can not be ' 'found.')
[docs] @staticmethod def addInputFiles(job_builder, property_lists): """ Add input files from the given properties to the job builder. :type job_builder: `launchapi.JobSpecificationArgsBuilder` :param job_builder: Job specification builder object :type property_lists: list :param property_lists: contains lists of property specifications :raise RuntimeError: if there is a problem """ # use .addForceInputFile so that all input files end up in the # job directory property_strings = Property.getPropertyStrings(property_lists) for property_string in property_strings: path = Property.getKwargs( property_string, CustomClassEvaluator.CUSTOM_CLASS_FILE_OPTION) if path is not None: name = Property.getKwargs(property_string, 'name') CustomClassEvaluator.checkModule(path, name) job_builder.setInputFile(path) files = CustomClassEvaluator.getExtraInputFiles(path, name) for afile in files: job_builder.setInputFile(afile)
[docs] def runStub(self, st, job_dj, tpp): """ Run stub. :type st: schrodinger.structure.Structure :param st: the structure :type job_dj: JobDJ :param job_dj: the queue :type tpp: int :param tpp: the threads per process :raise RuntimeError: if there is a problem :rtype: float :return: the property value """ raise RuntimeError('Stub function should not be used.')
[docs] def runIt(self): """ Run it. :raise RuntimeError: if there is a problem """ for aproperty in self.custom_properties: run_method = self.RUN_DICT.get(aproperty.name, None) if run_method: run_method = getattr(self, run_method, None) if not run_method: raise RuntimeError( 'Use the RUN_DICT constant of this class to ' 'define the name of the run method of this class for property ' f'{aproperty.name}, for example by defining a key string ' f'"{aproperty.name}" and a value string ' f'"run{aproperty.name.capitalize()}". Additionally method ' 'by this name must be defined that runs the property ' 'evaluation and returns the property.') tpp = aproperty.class_kwargs.get(Jaguar.TPP, 1) value = run_method(self.st, self.job_dj, tpp) self.st.property[aproperty.key] = value
# yapf: disable PROPERTIES = ( StructureEvaluator.SMARTS_PROP, StructureEvaluator.MOL_WEIGHT_PROP, StructureEvaluator.NATOMS_PROP, StructureEvaluator.NELEMENTS_PROP, OXIDATION_PROP, REDUCTION_PROP, HOLE_PROP, ELECTRON_PROP, TRIPLET_PROP, TRIPLET_RMSD_PROP, TRIPLET_REORG_PROP, TADF_T1_PROP, TADF_T2_PROP, TADF_T3_PROP, SPECTRUM_LMAX_PROP, SPECTRUM_RAREA_PROP, SPECTRUM_GAREA_PROP, SPECTRUM_BAREA_PROP, FL_EMISSION_EMAX_PROP, FL_EMISSION_RAREA_PROP, FL_EMISSION_GAREA_PROP, FL_EMISSION_BAREA_PROP, FL_EMISSION_STOKES_SHIFT_PROP, GlassTransitionTemperature.PROP, RefractiveIndex.PROP ) # yapf: enable
[docs]def get_script_property_info_dict(host_str=None): """ Return a (name, PropertyInfo) dict for script based properties. :type host_str: str :param host_str: the host string, for example 'localhost:4' :rtype: dict :return: contains (name, PropertyInfo) """ host_str = ClassEvaluator.getHostStr(host_str=host_str) class_kwargs = {ClassEvaluator.HOST_STR: host_str} get_info = lambda x: PropertyInfo(key=x[0], units=x[1], is_positive=x[2], class_evaluator=None, class_kwargs=class_kwargs) # yapf: disable info_dict = dict([ (OXIDATION_PROP, get_info([msprops.OE_OXIDATION, EV, False])), (REDUCTION_PROP, get_info([msprops.OE_REDUCTION, EV, False])), (HOLE_PROP, get_info([msprops.OE_HOLE_REORG, EV, True])), (ELECTRON_PROP, get_info([msprops.OE_ELECTRON_REORG, EV, True])), (TRIPLET_PROP, get_info([msprops.OE_TRIPLET_ENERGY, EV, False])), (TRIPLET_RMSD_PROP, get_info([msprops.OE_T1S0_RMSD, ANGSTROM, True])), (TRIPLET_REORG_PROP, get_info([msprops.OE_TRIPLET_REORG, EV, True])), (TADF_T1_PROP, get_info([msprops.OE_TADF_GAP, EV, False])), (TADF_T2_PROP, get_info([msprops.OE_TADF_GAP2, EV, False])), (TADF_T3_PROP, get_info([msprops.OE_TADF_GAP3, EV, False])), (SPECTRUM_LMAX_PROP, get_info([msprops.OE_LMAX, NM, True])), (SPECTRUM_RAREA_PROP, get_info([msprops.OE_RED_AREA, NM_X_INTENSITY, True])), (SPECTRUM_GAREA_PROP, get_info([msprops.OE_GREEN_AREA, NM_X_INTENSITY, True])), (SPECTRUM_BAREA_PROP, get_info([msprops.OE_BLUE_AREA, NM_X_INTENSITY, True])), (FL_EMISSION_EMAX_PROP, get_info([msprops.OE_EMAX, NM, True])), (FL_EMISSION_RAREA_PROP, get_info([msprops.OE_EMS_RED_AREA, NM_X_INTENSITY, True])), (FL_EMISSION_GAREA_PROP, get_info([msprops.OE_EMS_GREEN_AREA, NM_X_INTENSITY, True])), (FL_EMISSION_BAREA_PROP, get_info([msprops.OE_EMS_BLUE_AREA, NM_X_INTENSITY, True])), (FL_EMISSION_STOKES_SHIFT_PROP, get_info([msprops.OE_STOKES_SHIFT, NM, False])) ]) # yapf: enable return info_dict
[docs]def get_property_info(name, jaguar_options=None, tpp=None, patterns=None, host_str=None): """ Return a PropertyInfo for the given name and properties. :type name: str :param name: the property name :type jaguar_options: dict :param jaguar_options: contains Jaguar options :type tpp: int :param tpp: the threads per process :type patterns: list :param patterns: the SMARTS patterns :type host_str: str :param host_str: the host string, for example 'localhost:4' :rtype: PropertyInfo or None :return: the PropertyInfo """ se = StructureEvaluator gtt = GlassTransitionTemperature ri = RefractiveIndex if name == se.SMARTS_PROP: return se.getInfo(se.SMARTS_KEY, se.NO_UNITS, patterns=patterns, host_str=host_str) if name == se.MOL_WEIGHT_PROP: return se.getInfo(se.MOL_WEIGHT_KEY, se.MOL_WEIGHT_UNITS, host_str=host_str) if name == se.NATOMS_PROP: return se.getInfo(se.NATOMS_KEY, se.NO_UNITS, host_str=host_str) if name == se.NELEMENTS_PROP: return se.getInfo(se.NELEMENTS_KEY, se.NO_UNITS, host_str=host_str) if name == gtt.PROP: return gtt.getInfo(host_str=host_str) if name == ri.PROP: return ri.getInfo(jaguar_options=jaguar_options, tpp=tpp, host_str=host_str) return get_script_property_info_dict(host_str=host_str).get(name)
[docs]def get_random_csearch_seed(this_random=None): """ Return a random csearch seed. :type this_random: numpy.random.RandomState or None :param this_random: random state, if None use the module constant :rtype: int :return: the seed """ this_random = this_random or my_random lb, ub = CONF_SEARCH_SEED_RANGE return this_random.randint(lb, high=ub + 1)
class PropertySyntaxError(Exception):
    pass


class UnknownPropertySuboptionError(Exception):
    pass
[docs]class Property: """ Manage a property to be used in a genetic optimization. """ MAX = 'max' MIN = 'min' EQUALS = 'eq' GREATER_THAN = 'gt' LESS_THAN = 'lt' SUB_OPTIONS = [ 'index', 'key', 'name', 'units', 'minimax', 'target', 'comparator', 'error', 'weight', 'positive', 'patterns', 'summarize', CanvasKPLS.MODEL_OPTION, CustomClassEvaluator.CUSTOM_CLASS_FILE_OPTION, 'class_kwargs' ]
[docs] def __init__(self, index=1, key=None, name=None, units=None, minimax=None, target=None, comparator=None, error=None, weight=1.0, positive=None, summarize=None, class_kwargs=None): """ Create an instance. :type index: int :param index: a numeric index used to refer to this Property instance, a default of 1 is used :type key: str :param key: the schrodinger.structure.Structure property key to be optimized :type name: str :param name: specify a name for the property, this name will be, for example used in any `*log` files, etc. :type units: str :param units: enter the units that the property is in, for example eV, nm, etc. :type minimax: str :param minimax: to minimize or maximize this property then set this option to the class constants MIN or MAX :type target: float :param target: if instead of maximizing or minimizing the property, the genetic optimization is supposed to handle a specific value then enter that value using this option. :type comparator: str :param comparator: specify here how the target value and computed values are to be compared, i.e. either the class constants EQUALS for =, GREATER_THAN for >, or LESS_THAN for <. :type error: float :param error: if equality to a target value has been specified then this option allows the user to control the error bounds of the target value, if not specified then a default of 10% of the specified target value will be used. :type weight: float :param weight: specify the weight to use for this property, if the genetic optimization is to be run on several properties then the weight allows the user to bias the solution. This option can also be used to control a situation where more than a single property is desired and where those properties are quantified using different physical units such that the numbers might be orders of magnitude apart from one another, for example comparing eV and nm. A default of 1.0 is used. :type positive: bool :param positive: True if this property can only take on positive values, for example as in the area of a surface, False otherwise, for example as in temperature in Celcius. The default is False. :type summarize: bool :param summarize: if True then print a summary of this property, False otherwise :type class_kwargs: dict or None :param class_kwargs: contains kwargs for class based evaluation of this property """ self.setAttributes(index, key, name, units, minimax, target, comparator, error, weight, positive, summarize, class_kwargs)
[docs] def setAttributes(self, index=1, key=None, name=None, units=None, minimax=None, target=None, comparator=None, error=None, weight=1.0, positive=None, summarize=None, class_kwargs=None): """ Set some attributes for this class. :type index: int :param index: a numeric index used to refer to this Property instance, a default of 1 is used :type key: str :param key: the schrodinger.structure.Structure property key to be optimized :type name: str :param name: specify a name for the property, this name will be, for example used in any `*log` files, etc. :type units: str :param units: enter the units that the property is in, for example eV, nm, etc. :type minimax: str :param minimax: to minimize or maximize this property then set this option to the class constants MIN or MAX :type target: float :param target: if instead of maximizing or minimizing the property, the genetic optimization is supposed to handle a specific value then enter that value using this option. :type comparator: str :param comparator: specify here how the target value and computed values are to be compared, i.e. either the class constants EQUALS for =, GREATER_THAN for >, or LESS_THAN for <. :type error: float :param error: if equality to a target value has been specified then this option allows the user to control the error bounds of the target value, if not specified then a default of 10% of the specified target value will be used. :type weight: float :param weight: specify the weight to use for this property, if the genetic optimization is to be run on several properties then the weight allows the user to bias the solution. This option can also be used to control a situation where more than a single property is desired and where those properties are quantified using different physical units such that the numbers might be orders of magnitude apart from one another, for example comparing eV and nm. A default of 1.0 is used. :type positive: bool :param positive: True if this property can only take on positive values, for example as in the area of a surface, False otherwise, for example as in temperature in Celcius. The default is False. :type summarize: bool :param summarize: if True then print a summary of this property, False otherwise :type class_kwargs: dict or None :param class_kwargs: contains kwargs for class based evaluation of this property """ # make sure that we leave with a bool but handle 'False' case def handle_boolean(prop): try: prop = ast.literal_eval(prop) except ValueError: pass return bool(prop) # set some attrs, handle units self.index = index self.key = key self.name = name if units == 'None': self.units = None else: self.units = units self.minimax = minimax self.target = target self.comparator = comparator self.weight = weight self.positive = handle_boolean(positive) # if the user has specified equality to a specific target value but # has not specified any error bounds then use +/-10% of the target # value as the error bounds if self.target is not None and self.comparator == self.EQUALS and error is None: self.error = 0.10 * self.target else: self.error = error # by default summarize is off for structure properties and on # for other properties if summarize is None: if self.name in StructureEvaluator.PROPERTIES: summarize = False else: summarize = True self.summarize = handle_boolean(summarize) self.class_kwargs = class_kwargs or {}
[docs]    def setClassKwargs(self, class_kwargs):
        """
        Set the class kwargs.

        :type class_kwargs: dict or None
        :param class_kwargs: contains kwargs for class based evaluation of
            this property
        """

        self.class_kwargs = class_kwargs
[docs] def parsePropertyString(self, property_string): """ Parse the attributes of this class from a string representation of the property specifications. For example, 'index=1 key=r_matsci_Reduction_Potential_(eV) name=reduction units=eV target=1.28 comparator=eq error=0.05 weight=0.5' or 'index=2 key=r_matsci_Oxidation_Potential_(eV) name=oxidation units=eV minimax=max weight=2.5' :type property_string: str :param property_string: the string representation of the property specifications :raise PropertySyntaxError: if there is something wrong with the property syntax :raise UnknownPropertySuboptionError: if an unknown property suboption is found """ BAD_SYNTAX_MSG = ( 'A property must be specified using whitespace ' 'delimited <key>=<value> pairs, where <key> and <value> are ' 'void of whitespace. The following property has incorrect syntax: %s.' ) NO_VALID_SUBOPTION_MSG = ('You have specified the following invalid ' 'property sub-option: %s.') # build kwargs kwargs = {} for option in property_string.split(): try: key, value = option.split('=') except ValueError: raise PropertySyntaxError(BAD_SYNTAX_MSG % property_string) if key not in self.SUB_OPTIONS: raise UnknownPropertySuboptionError(NO_VALID_SUBOPTION_MSG % key) # see MATSCI-4321 where numeric names weren't allowed, # i.e. 'name=4' if key != 'name': try: value = float(value) except (ValueError, TypeError): pass kwargs[key] = value # coming out of the previous loop all numeric values are floats but some # values are supposed to be integers so change them back kwargs['index'] = int(kwargs['index']) self.setAttributes(**kwargs)
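    # Illustrative usage sketch (not part of the original module flow; it
    # reuses the example string from the parsePropertyString docstring and
    # assumes a Property built with the defaults defined in __init__ above):
    #
    #     prop = Property()
    #     prop.parsePropertyString(
    #         'index=1 key=r_matsci_Reduction_Potential_(eV) name=reduction '
    #         'units=eV target=1.28 comparator=eq error=0.05 weight=0.5')
    #     prop.checkProperty()  # validates the index/key/name/condition settings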
[docs]    def checkProperty(self):
        """
        Check this property instance.
        """

        NO_VALID_INDEX_MSG = (
            'A valid integer index is needed to create a '
            'Property instance. Please specify such an index using '
            '\'index=<index>\'.')
        NO_VALID_KEY_MSG = (
            'A valid string Maestro structure property key is '
            'needed to create a Property instance. Please specify such a key '
            'using \'key=<key>\'.')
        NO_VALID_NAME_MSG = (
            'A valid string property name is needed to create a '
            'Property instance. Please specify such a name using '
            '\'name=<name>\'.')
        INVALID_UNITS_MSG = ('If units are specified then they must be a '
                             'string. Please specify units using '
                             '\'units=<units>\'.')
        NO_VALID_CONDITION_MSG = (
            'A valid minimax or target value condition '
            'is needed to create a Property instance. Please specify such '
            'options using \'minimax=<minimax>\' or \'target=<target>\'.')
        INVALID_MINIMAX_MSG = (
            'If a minimax condition is desired then it must '
            'be specified as either %s or %s. If not specified then a target '
            'condition must be specified using \'target=<target>\'.' %
            (self.MIN, self.MAX))
        MULTI_CONDITION_MSG = (
            'A property can only be specified using either a '
            'minimax or target value condition. Please specify a single '
            'condition using either \'minimax=<minimax>\' or '
            '\'target=<target>\'.')
        INVALID_TARGET_MSG = (
            'If specified a target value must be a number. If '
            'not specified then a minimax condition must be specified using '
            '\'minimax=<minimax>\'.')
        INVALID_COMPARATOR_MSG = (
            'If a comparator is necessary then it must be '
            'either %s, %s, or %s. Please specify such a comparator using '
            '\'comparator=<comparator>\'.' %
            (self.EQUALS, self.GREATER_THAN, self.LESS_THAN))
        REQUIRED_COMPARATOR_MSG = (
            'When specifying a target value condition a '
            'valid comparator from the following list is required: %s, %s, or %s. '
            'Please specify such a comparator using \'comparator=<comparator>\'.'
            % (self.EQUALS, self.GREATER_THAN, self.LESS_THAN))
        INVALID_ERROR_MSG = (
            'If you need to specify an error bound then it must '
            'be a number.')
        NO_VALID_WEIGHT_MSG = (
            'A valid numeric weight >0 is needed to create a '
            'Property instance.')
        INVALID_SMARTS_MSG = ('The following SMARTS pattern is invalid, %s.')

        # we must have a valid index
        if type(self.index) != int:
            raise ValueError(NO_VALID_INDEX_MSG)

        # we must have a valid key
        if type(self.key) != str:
            raise ValueError(NO_VALID_KEY_MSG)

        # we must have a valid name
        if type(self.name) != str:
            raise ValueError(NO_VALID_NAME_MSG)

        # we must have valid units
        if self.units is not None:
            if type(self.units) != str:
                raise ValueError(INVALID_UNITS_MSG)

        # we must have either a minimax or target condition
        if self.minimax is None and self.target is None:
            raise ValueError(NO_VALID_CONDITION_MSG)

        # minimax condition must be valid
        if self.minimax not in [None, self.MIN, self.MAX]:
            raise ValueError(INVALID_MINIMAX_MSG)

        # we must have a single condition to meet
        if self.minimax is not None and self.target is not None:
            raise ValueError(MULTI_CONDITION_MSG)

        # turn off comparator and error if minimax is set and target
        # is not
        if self.minimax is not None and self.target is None:
            self.comparator = None
            self.error = None

        # we must have a valid target
        if self.target is not None:
            try:
                self.target = float(self.target)
            except ValueError:
                raise ValueError(INVALID_TARGET_MSG)

        # comparator must be valid
        if self.comparator not in [None, self.EQUALS, self.GREATER_THAN,
                                   self.LESS_THAN]:
            raise ValueError(INVALID_COMPARATOR_MSG)

        # if target has been specified then comparator must also be
        if self.target is not None and self.comparator is None:
            raise ValueError(REQUIRED_COMPARATOR_MSG)

        # we must have a valid error
        if self.error is not None:
            try:
                self.error = float(self.error)
            except ValueError:
                raise ValueError(INVALID_ERROR_MSG)

        # if greater-than or less-than a target value has been specified
        # then turn off the error
        if self.target is not None and self.comparator in \
                [self.GREATER_THAN, self.LESS_THAN]:
            self.error = None

        # we must have a valid weight
        try:
            self.weight = float(self.weight)
            assert self.weight > 0
        except (ValueError, TypeError, AssertionError):
            raise ValueError(NO_VALID_WEIGHT_MSG)

        # we must have valid SMARTS
        if self.class_kwargs:
            patterns = self.class_kwargs.get(StructureEvaluator.PATTERNS, [])
            for pattern in patterns:
                valid, msg = analyze.validate_smarts_canvas(pattern)
                if not valid:
                    raise ValueError(INVALID_SMARTS_MSG % pattern)
def __repr__(self): """ Define a representation of the class. """ # yapf: disable kwargs = dict([ ('index', str(self.index)), ('key', str(self.key)), ('name', str(self.name)), ('units', str(self.units)), ('minimax', str(self.minimax)), ('target', '%.2f' % self.target if self.target is not None else str(None)), ('comparator', str(self.comparator)), ('error', '%.2f' % self.error if self.error is not None else str(None)), ('weight', '%.2f' % self.weight), ('positive', str(self.positive)), ('summarize', str(self.summarize)), ('class_kwargs', str(self.class_kwargs)) ]) # yapf: enable return ' '.join([key + '=' + value for key, value in kwargs.items()]) def __eq__(self, other_property): """ Define an equality of the class. """ if self.name == other_property.name == StructureEvaluator.SMARTS_PROP: patterns = self.class_kwargs.get(StructureEvaluator.PATTERNS, []) other_patterns = other_property.class_kwargs.get( StructureEvaluator.PATTERNS, []) return patterns == other_patterns else: return self.name == other_property.name
[docs] def isScriptProperty(self): """ Return True if this property is a script property, False otherwise. :rtype: bool :return: return True if this property is a script property, False otherwise """ return (len(self.class_kwargs) == 1 and self.name not in StructureEvaluator.PROPERTIES)
[docs] @staticmethod def getPropertyStrings(property_lists): """ Return property strings from the given property lists. :type property_lists: list :param property_lists: contains lists of property specifications :rtype: list :return: contains string representations of the property specifications """ property_strings = [] for property_list in property_lists: property_string = ' '.join(property_list) property_string = re.sub(r'\s=', '=', property_string) property_string = re.sub(r'=\s', '=', property_string) property_strings.append(property_string) return property_strings
[docs] @staticmethod def getRelPath(file_name): """ Return the relative path to the given file name. :type file_name: str :param file_name: the file name :rtype: str :return: the relative path to the file """ # some input files are only located in the top level job # directory but are used by all subjobs which are located in their own # subdirectories, this function returns the path to the shared input # file relative to the subdirectories # MATSCI-5053: ntpath covers both Linux and Windows path # When Linux remote host receives abs path from Windows localhost, # os.path.basename('C:\\Users\\test3.kpls.tar.gz') gives # 'C:\\Users\\test3.kpls.tar.gz' instead of 'test.kpls.tar.gz' path = EVALUATOR_RELATIVE_DIR + [ntpath.basename(file_name)] return os.path.join(*path)
[docs]    @staticmethod
    def getKwargs(property_string, option_substrings, add_relative_paths=None):
        """
        Return kwargs of the given property options from the given property
        string.

        :type property_string: str
        :param property_string: the string representation of the property
            specifications, containing options as '<option_substring>=<value>'

        :type option_substrings: list or str
        :param option_substrings: contains the option substrings for the
            needed values, a single occurrence or list of occurrences may be
            passed

        :type add_relative_paths: list
        :param add_relative_paths: contains options for which relative paths
            should be added, such relative paths might be needed for correctly
            parallelizing the evaluation stage of the genetic optimization as
            they will be needed to copy otherwise shared files into local
            subdirectories

        :rtype: dict, str, or None
        :return: the extracted dictionary of kwargs or single kwarg depending
            on the input option_substrings or None if nothing is found
        """

        if add_relative_paths is None:
            add_relative_paths = []
        if isinstance(option_substrings, list):
            f_option_substrings = list(option_substrings)
        else:
            f_option_substrings = [option_substrings]
        kwargs = {}
        for option_substring in f_option_substrings:
            splitter = option_substring.rstrip('=') + '='
            if splitter not in property_string:
                continue
            value = property_string.split(splitter)[1]
            if not value:
                continue
            value = value.split()[0]
            if option_substring in add_relative_paths:
                value = Property.getRelPath(value)
            kwargs[splitter[:-1]] = value
        if not kwargs:
            return
        elif isinstance(option_substrings, list):
            return kwargs
        else:
            return list(kwargs.values())[0]
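    # Illustrative sketch of getKwargs (hypothetical values): passing a single
    # option substring returns the bare value, passing a list returns a dict.
    #
    #     pstring = 'index=1 key=r_some_key name=gap units=eV minimax=max'
    #     Property.getKwargs(pstring, 'units')            # -> 'eV'
    #     Property.getKwargs(pstring, ['name', 'units'])  # -> {'name': 'gap', 'units': 'eV'}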
[docs] @staticmethod def rmKwargs(property_string, option_substrings): """ Return a copy of the given property string with all of the given property option substrings removed. :type property_string: str :param property_string: the string representation of the property specifications, containing options as '<option_substring>=<value>' :type option_substrings: list :param option_substrings: contains the option substrings to be removed :rtype: str :return: the string representation of the property specifications less the options substrings that were to be removed """ kwargs = Property.getKwargs(property_string, option_substrings) if kwargs is not None: for pair in kwargs.items(): token = ' ' + '='.join(pair) if token not in property_string: token = token[1:] property_string = property_string.replace(token, '') return property_string
[docs]    @staticmethod
    def addKwargs(property_string, kwargs):
        """
        Add the given options to the given property string.

        :type property_string: str
        :param property_string: the string representation of the property
            specifications, containing options as '<option_substring>=<value>'

        :type kwargs: dict
        :param kwargs: key-value option pairs to add to the property string

        :rtype: str
        :return: the string representation of the property specifications
            containing the new options
        """

        f_property_string = property_string
        for pair in list(kwargs.items()):
            f_property_string += ' ' + '='.join(pair)
        return f_property_string
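    # Illustrative sketch of an rmKwargs/addKwargs round trip (hypothetical
    # values; option values must be strings since they are joined with '='):
    #
    #     pstring = 'index=1 name=gap minimax=max'
    #     trimmed = Property.rmKwargs(pstring, ['minimax'])   # 'index=1 name=gap'
    #     Property.addKwargs(trimmed, {'target': '1.5', 'comparator': 'eq'})
    #     # -> 'index=1 name=gap target=1.5 comparator=eq'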
[docs] @staticmethod def getCustomInfo(property_string, name, tpp=1, host_str=None): """ Return a PropertyInfo. :type property_string: str :param property_string: the string representation of the property specifications, containing options as '<option_substring>=<value>' :type name: str :param name: the property name :type tpp: int :param tpp: the threads per process :type host_str: str :param host_str: the host string, for example 'localhost:4' :raise RuntimeError: if there is a problem :rtype: PropertyInfo :return: the property information """ units = Property.getKwargs(property_string, 'units') pairs = [(CanvasKPLS, 'MODEL_OPTION'), (AutoQSAR, 'MODEL_OPTION'), (DeepChem, 'MODEL_OPTION'), (CustomClassEvaluator, 'CUSTOM_CLASS_FILE_OPTION')] infos = [] for cls, option_attr in pairs: option = getattr(cls, option_attr) if option_attr == 'MODEL_OPTION': add_relative_paths = [option] else: add_relative_paths = None afile = Property.getKwargs(property_string, option, add_relative_paths=add_relative_paths) if not afile: continue infos.append( cls.getInfo(name, units, afile, tpp=tpp, host_str=host_str)) if len(infos) > 1: options_str = ','.join( getattr(cls, option_attr) for cls, option_attr in pairs) raise RuntimeError('The following sub-options can not be used ' f'simultaneously: {options_str}.') elif not infos: raise RuntimeError(f'The property name {name} is unknown.') return infos[0]
[docs]def set_title_to_stoichiometry(astructure,
                               toappend=None,
                               separation=STOICH_BASE_EXT_SEP):
    """
    Set the structure title to be the stoichiometry of the structure.

    :type astructure: schrodinger.structure.Structure
    :param astructure: the structure

    :type toappend: str
    :param toappend: a string to append to the stoichiometry

    :type separation: str
    :param separation: used to separate the stoichiometry and the toappend str
    """

    formula = analyze.generate_molecular_formula(astructure)
    if toappend:
        formula += separation + toappend
    astructure.title = formula
    astructure.property[ENTRY_NAME_KEY] = astructure.title
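# Illustrative sketch (not part of the original module flow; assumes a benzene
# Structure `st`, for example read with structure.StructureReader, and the
# module-level STOICH_BASE_EXT_SEP separator defined above):
#
#     set_title_to_stoichiometry(st, toappend='abc123')
#     st.title  # e.g. 'C6H6' + STOICH_BASE_EXT_SEP + 'abc123'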
[docs]class StructureGenome(GenomeBase.GenomeBase): """ Manage a genome. The genome, aka chromosome, is the solution to the problem trying to be solved via genetic optimization. It is referred to as being composed of genes that are manipulated by the crossover and mutation operators. In our genetic optimization module this genome is basically just a schrodinger.structure.Structure object. """
[docs] def __init__(self): """ Create an instance. """ GenomeBase.GenomeBase.__init__(self) # use attributes of genome for parameters that are unique # to this genome and use the genome.setParams() method for # parameters that are shared by all genomes in the population self.astructure = None self.basename_ext = None self.seed = None self.skip = None self.failure = None self.infile = None self.outfile = None
[docs] def copy(self, genome): """ Copy the current genome to the provided genome. :type genome: StructureGenome :param genome: a new genome instance to which to copy the current genome """ GenomeBase.GenomeBase.copy(self, genome) if self.astructure: genome.astructure = self.astructure.copy() # use a unique random extension for the job sub-directory of # this genome to avoid a threading race condition with regard # to creating such sub-directories when doing the evaluation # step in the genetic optimization chars = list( my_random.choice(list(string.ascii_letters + string.digits), size=6, replace=False)) genome.basename_ext = ''.join(chars) genome.seed = my_random.randint(RANDOM_INT_BOUND + 1) genome.skip = None genome.failure = None genome.infile = None genome.outfile = None
[docs] def clone(self): """ Clone the current genome. :rtype: StructureGenome :return: genome """ genome = StructureGenome() self.copy(genome) return genome
[docs] def updateStructureProperties(self, index, generation): """ Update some structure properties. :type index: int :param index: the index of this individual :type generation: int :param generation: this generation """ self.astructure.property[FIT_SCORE_KEY] = self.getFitnessScore() self.astructure.property[RAW_SCORE_KEY] = self.getRawScore() self.astructure.property[GENERATION_KEY] = generation self.astructure.property[INDIVIDUAL_KEY] = index self.astructure.property[SKIP_KEY] = bool(self.skip) self.astructure.property[FAILURE_KEY] = bool(self.failure)
[docs] def resetParentProperties(self): """ Reset the crossover and mutation parent structure properties. """ self.astructure.property[CROSSOVER_PARENTS_KEY] = '' self.astructure.property[MUTATION_PARENT_KEY] = '' self.astructure.property[CROSSOVER_APPLIED_KEY] = '' self.astructure.property[MUTATION_APPLIED_KEY] = '' self.astructure.property[SKIP_KEY] = False self.astructure.property[FAILURE_KEY] = False self.astructure.property.pop(CONF_SEARCH_FAILED_KEY, None) self.astructure.property.pop(FROM_FREEZER_KEY, None) self.astructure.property.pop(INOCULATE_KEY, None)
[docs] def removeProperties(self): """ Remove some structure properties. """ props_to_remove = self.getParam('props_to_remove') if props_to_remove: for prop_to_remove in props_to_remove: self.astructure.property.pop(prop_to_remove, None) self.astructure.property.pop(STRUCTURE_SCORE_KEY, None) # MATSCI-2893 - remove EZ stereochemistry properties mm.mmct_ct_clear_stereo(self.astructure)
[docs] def optimizeGeometry(self): """ Optimize the geometry of this genome's structure using OPLS. """ if not self.getParam('no_minimize'): rxn_channel.minimize_geometry(self.astructure)
[docs] def addPreviousFreezerFile(self, freezer_file): """ Add the given file to the list of previous freezer files. :type freezer_file: str :param freezer_file: the name of the file to be added """ # the dictionary accessed with an individual's setParams method # is shared by all individuals in the population and so it would # seem that you only need to set it for one individual and it would # be set for all, this is the behavior when run in serial, # however, the other individuals appear to not be updated # in this way when running in parallel, so in general we will set it # for all individuals (done in the loop that calls this method) freezer_files = self.getParam('freezer_files') if freezer_file not in freezer_files[PREVIOUS]: freezer_files[PREVIOUS].append(freezer_file) self.setParams(freezer_files=freezer_files)
[docs] def evaluate(self, **args): """ Evaluate the score of this individual. :type args: dict :param args: dictionary of genetic optimization parameters created and used by pyevolve """ # this is to properly catch exceptions generated by # child processes try: GenomeBase.GenomeBase.evaluate(self, **args) except Exception as msg: traceback.print_exc() raise msg
# initializers
[docs]def from_initial_population(genome, **args): """ Draw a unique genome from the initial population. :type genome: StructureGenome :param genome: a genome :type args: dict :param args: dictionary of genetic optimization parameters created and used by pyevolve """ initial_population = genome.getParam('initial_population') genome.astructure = initial_population.pop() set_title_to_stoichiometry(genome.astructure, toappend=genome.basename_ext) genome.resetParentProperties() genome.removeProperties() genome.setParams(initial_population=initial_population)
[docs]def get_num_simple_bonds(astructure): """ Return the number of simple bonds in the provided structure. The definition of a simple bond follows from that used in the reaction channel module and is an acyclic single order bond that may involve a hydrogen atom. :type astructure: schrodinger.structure.Structure :param astructure: the structure for which to get the number of simple bonds :rtype: int :return: the number of simple bonds """ num_simple_bonds = 0 for bond in astructure.bond: if not any(rxn_channel.CheckInput().checkIfNotSimpleBond( astructure, bond.atom1.index, bond.atom2.index, False)): num_simple_bonds += 1 return num_simple_bonds
[docs]def combine_two_structures(astructure, bstructure, offset=10.0): """ Combine two structure objects into a single structure object using somewhat arbitrary placement. :type astructure: schrodinger.structure.Structure :param astructure: the first of the structures to be combined :type bstructure: schrodinger.structure.Structure :param bstructure: the second of the structures to be combined :type offset: float :param offset: the final distance between the structures will be the sum of the molecular VDW radii plus this offset in Angstrom :rtype: schrodinger.structure.Structure :return: the combined structure object """ THRESH = 0.1 # get a target distance used to separate the two structures aatom1, aatom2, adistance = rxn_path.max_pair_vdw_distance(astructure) batom1, batom2, bdistance = rxn_path.max_pair_vdw_distance(bstructure) target_distance = adistance / 2 + bdistance / 2 + offset # get some parameters acentroid = transform.get_centroid(astructure) bcentroid = transform.get_centroid(bstructure) atobvector = bcentroid - acentroid current_distance = transform.get_vector_magnitude(atobvector) # if the two structures are on top of each other then use the # x-axis as the vector along which to separate them if current_distance > THRESH: atobunitvector = transform.get_normalized_vector(atobvector) else: atobunitvector = numpy.append(transform.X_AXIS, 0.0) # translate the second structure delta = target_distance - current_distance delta_vector = delta * atobunitvector delta_vector = numpy.delete(delta_vector, 3) delta_x = numpy.dot(delta_vector, transform.X_AXIS) delta_y = numpy.dot(delta_vector, transform.Y_AXIS) delta_z = numpy.dot(delta_vector, transform.Z_AXIS) transform.translate_structure(bstructure, delta_x, delta_y, delta_z) # combine the structures cstructure = astructure.copy() cstructure.extend(bstructure) return cstructure
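# Illustrative sketch (assumes two Structure objects `st1` and `st2`, for
# example read with structure.StructureReader): the returned structure holds
# both molecules, with the second translated so the pair is separated by
# roughly the sum of their VDW extents plus the offset.
#
#     combo = combine_two_structures(st1, st2, offset=10.0)
#     combo.atom_total == st1.atom_total + st2.atom_total  # True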
# crossovers
[docs]def bond_crossover(genome, **args):
    """
    Perform a crossover operation by swapping molecular fragments at two
    randomly chosen bonds, i.e. a double displacement reaction channel.

    :type genome: StructureGenome
    :param genome: a genome

    :type args: dict
    :param args: dictionary of genetic optimization parameters created and
        used by pyevolve

    :rtype: tuple
    :return: tuple containing the sister and brother StructureGenome
    """

    # get, clone, and reset mom and dad genomes into brother and
    # sister genomes
    gmom = args['mom']
    gdad = args['dad']
    gsister = gmom.clone()
    gbrother = gdad.clone()
    gsister.resetStats()
    gbrother.resetStats()
    gsister.removeProperties()
    gbrother.removeProperties()

    # get parents string
    gmom_title = gmom.astructure.title
    gdad_title = gdad.astructure.title
    parents = gmom_title + ' + ' + gdad_title
    parents_basename_ext = gmom.basename_ext + '.' + gdad.basename_ext

    # get some parameters
    no_minimization = gsister.getParam('no_minimize')
    freezer_files = gsister.getParam('freezer_files')
    inoculate = gsister.getParam('inoculate')

    # prep for reaction channel by combining the structures
    combo = combine_two_structures(gsister.astructure, gbrother.astructure)

    # first try to find a single unique random double displacement channel that
    # does not involve hydrogen, if this is not possible (for example consider
    # A-A + A-A or A-A + A-B, i.e. ethane + ethane or ethane + methanol) then
    # try to find a single unique random double displacement channel that may
    # involve hydrogen, note that here it is possible that the dihydrogen
    # molecule may be produced as a child (for example consider the case where
    # both bonds involve hydrogen, i.e. A-H + B-H), if even with enabling
    # hydrogens we still can't get a unique channel, for example with
    # parents like H-H + H-H or A-H + H-H, then for this worst case scenario
    # we either just return brother and sister the same as mom and dad, with
    # the understanding that a different genetic operator will make them
    # amenable to bond crossover (each structure in the initial population is
    # guaranteed to be a candidate for at least one of the specified genetic
    # operators) or we draw children from the freezer depending on inoculate
    no_reactive_h = True
    channel_defs = rxn_channel.ChannelDefinitions()
    random_seed = my_random.randint(RANDOM_INT_BOUND + 1)
    channel_defs.addRandomChannels(combo,
                                   num_random_channels=1,
                                   random_types=['double_displacement'],
                                   no_reactive_h=no_reactive_h,
                                   random_seed=random_seed,
                                   unique=True,
                                   no_reactive_polymer_monomer_backbone=False,
                                   no_reactive_polymer_monomer_sidechain=False)
    if not channel_defs.definitions:
        no_reactive_h = False
        channel_defs = rxn_channel.ChannelDefinitions()
        random_seed = my_random.randint(RANDOM_INT_BOUND + 1)
        channel_defs.addRandomChannels(
            combo,
            num_random_channels=1,
            random_types=['double_displacement'],
            no_reactive_h=no_reactive_h,
            random_seed=random_seed,
            unique=True,
            no_reactive_polymer_monomer_backbone=False,
            no_reactive_polymer_monomer_sidechain=False)
        if not channel_defs.definitions:
            if NO_CHILD in inoculate:
                gsister.astructure = get_freezer_structure(
                    freezer_files,
                    inoculate=NO_CHILD,
                    crossover_applied=BOND_CROSSOVER,
                    basename_ext=gsister.basename_ext)
                gbrother.astructure = get_freezer_structure(
                    freezer_files,
                    inoculate=NO_CHILD,
                    crossover_applied=BOND_CROSSOVER,
                    basename_ext=gbrother.basename_ext)
            return (gsister, gbrother)

    # run the reaction channel module
    mae_out_file = 'crossover' + '.' + parents_basename_ext + '.mae'
    channels = rxn_channel.Channels(combo,
                                    channel_defs,
                                    mae_out_file=mae_out_file,
                                    isolate_products=True,
                                    no_minimization=no_minimization,
                                    no_reactive_h=no_reactive_h,
                                    unique=True)
    channels.orchestrate()

    # in the general case the 5 returned structures are (1) reactant, combination of
    # two structures i.e. A-B + C-D, (2) A-C, (3) B-D, (4) A-D, and (5) B-C however
    # we may also have only a single sub-channel, i.e. 3 structures
    structures = [
        astructure for astructure in structure.StructureReader(mae_out_file)
    ]
    structures = structures[1:]
    fileutils.force_remove(mae_out_file)

    # handle cases like A-A + B-B (1 unique product),
    # A-B + C-C (2 unique products), and A-B + C-D
    # (4 unique products), note that it is also possible to get
    # 3 unique products here for example when the A-C and B-D
    # or A-D and B-C products have the same SMILES
    if len(structures) == 1:
        mol_one = structures[0]
        if NO_CHILD in inoculate:
            mol_two = get_freezer_structure(freezer_files,
                                            inoculate=NO_CHILD,
                                            crossover_applied=BOND_CROSSOVER,
                                            basename_ext=gbrother.basename_ext)
        else:
            mol_two = mol_one.copy()
    elif len(structures) == 2:
        mol_one, mol_two = structures
    elif len(structures) == 3:
        # of the three we want to pick the two that came from the same reaction
        # sub-channel
        first_rxn = structures[0].property[RXN_KEY]
        second_rxn = structures[1].property[RXN_KEY]
        if first_rxn == second_rxn:
            mol_one = structures[0]
            mol_two = structures[1]
        else:
            mol_one = structures[1]
            mol_two = structures[2]
    elif len(structures) == 4:
        mol_ac, mol_bd, mol_ad, mol_bc = structures
        # if both reactive bonds were in side chains then prevent using
        # the channel that features a product that is not a monomer, otherwise
        # choose between the A-C + B-D and A-D + B-C products by minimizing
        # the standard deviation in the size, do this with the specified
        # probability using a biased coin flip
        acbd_are_monomers = rxn_channel.are_monomers([mol_ac, mol_bd])
        adbc_are_monomers = rxn_channel.are_monomers([mol_ad, mol_bc])
        if acbd_are_monomers and not adbc_are_monomers:
            mol_one, mol_two = mol_ac, mol_bd
        elif not acbd_are_monomers and adbc_are_monomers:
            mol_one, mol_two = mol_ad, mol_bc
        else:
            first_rxn_std = numpy.std(
                numpy.array([mol_ac.atom_total, mol_bd.atom_total]))
            second_rxn_std = numpy.std(
                numpy.array([mol_ad.atom_total, mol_bc.atom_total]))
            if first_rxn_std <= second_rxn_std:
                small_stdev = mol_ac, mol_bd
                large_stdev = mol_ad, mol_bc
            else:
                small_stdev = mol_ad, mol_bc
                large_stdev = mol_ac, mol_bd
            if Util.randomFlipCoin(SIMILAR_STDEV_CHILDREN_FREQ):
                mol_one, mol_two = small_stdev
            else:
                mol_one, mol_two = large_stdev

    # the following can happen for example if a polymer monomer
    # reacts with itself by just swapping equivalent side chains,
    # in this case one channel produces products that are equivalent
    # to the reactants and thus are deduped while the other channel
    # produces one product that functions as a monomer and one product
    # that is not a monomer
    mol_one_is_monomer = rxn_channel.are_monomers([mol_one])
    mol_two_is_monomer = rxn_channel.are_monomers([mol_two])
    if (mol_one_is_monomer and not mol_two_is_monomer) or \
            (not mol_one_is_monomer and mol_two_is_monomer):
        if NO_CHILD in inoculate:
            mol_one = get_freezer_structure(freezer_files,
                                            inoculate=NO_CHILD,
                                            crossover_applied=BOND_CROSSOVER,
                                            basename_ext=gsister.basename_ext)
            mol_two = get_freezer_structure(freezer_files,
                                            inoculate=NO_CHILD,
                                            crossover_applied=BOND_CROSSOVER,
                                            basename_ext=gbrother.basename_ext)
        else:
            mol_one = gsister.astructure.copy()
            mol_two = gbrother.astructure.copy()

    gsister.astructure = mol_one
    gbrother.astructure = mol_two

    # update properties
    set_title_to_stoichiometry(gsister.astructure,
                               toappend=gsister.basename_ext)
    set_title_to_stoichiometry(gbrother.astructure,
                               toappend=gbrother.basename_ext)

    # clean up
    gsister.astructure.property.pop(RXN_KEY, None)
    gbrother.astructure.property.pop(RXN_KEY, None)

    # set parents string
    gsister.astructure.property[CROSSOVER_APPLIED_KEY] = BOND_CROSSOVER
    if not gsister.astructure.property.get(FROM_FREEZER_KEY):
        gsister.astructure.property[CROSSOVER_PARENTS_KEY] = parents
    gbrother.astructure.property[CROSSOVER_APPLIED_KEY] = BOND_CROSSOVER
    if not gbrother.astructure.property.get(FROM_FREEZER_KEY):
        gbrother.astructure.property[CROSSOVER_PARENTS_KEY] = parents

    return (gsister, gbrother)
[docs]def get_element_mutator_dict(astructure): """ Return a dictionary where the keys contain the indicies of the mutatable atoms and the values contain those elements that the keyed atom may be mutated to. :type astructure: schrodinger.structure.Structure :param astructure: the structure to be mutated :rtype: dict :return: keys are atom indicies of those atoms that are mutatable and values are those elements that the atom can be mutated to """ # group hydrogen with the halogens, the following is not meant to # be exhaustive, keys are SMARTS patterns where the mutatable atom # is first and values are lists containing the elements that the # mutatable atom may be mutated to, currently the key-value pairs # may be in any order ALL_PATTERNS_DICT = { '[H]': ['F', 'Cl', 'Br', 'I'], '[C]': ['Si'], '[N]': ['P'], '[O]': ['S'], '[F]': ['H', 'Cl', 'Br', 'I'], '[Si]': ['C'], '[p-0X2]': ['N'], '[P-0X2]': ['N'], '[P-0X3]': ['N'], '[S-0X1]': ['O'], '[S-0X2]': ['O'], '[Cl]': ['H', 'F', 'Br', 'I'], '[Br]': ['H', 'F', 'Cl', 'I'], '[I]': ['H', 'F', 'Cl', 'Br'], '[Ti]': ['Zr', 'Hf'], '[Zr]': ['Ti', 'Hf'], '[Hf]': ['Ti', 'Zr'], '[V]': ['Nb', 'Ta'], '[Nb]': ['V', 'Ta'], '[Ta]': ['V', 'Nb'], '[Cr]': ['Mo', 'W'], '[Mo]': ['Cr', 'W'], '[W]': ['Cr', 'Mo'], '[Mn]': ['Tc', 'Re'], '[Tc]': ['Mn', 'Re'], '[Re]': ['Mn', 'Tc'], '[Fe]': ['Ru', 'Os'], '[Ru]': ['Fe', 'Os'], '[Os]': ['Fe', 'Ru'], '[Co]': ['Rh', 'Ir'], '[Rh]': ['Co', 'Ir'], '[Ir]': ['Co', 'Rh'], '[Ni]': ['Pd', 'Pt'], '[Pd]': ['Ni', 'Pt'], '[Pt]': ['Ni', 'Pd'], '[Cu]': ['Ag', 'Au'], '[Ag]': ['Cu', 'Au'], '[Au]': ['Cu', 'Ag'], '[Zn]': ['Cd', 'Hg'], '[Cd]': ['Zn', 'Hg'], '[Hg]': ['Zn', 'Cd'] } mutatable_dict = {} for pattern, mutate_to in ALL_PATTERNS_DICT.items(): matches = _monomer_evaluate_smarts_canvas(astructure, pattern) for match in matches: if not rxn_channel.get_grow_idxs(astructure, idxs=[match[0]]): mutatable_dict[match[0]] = mutate_to return mutatable_dict
[docs]def get_isoelectronic_mutator_indicies(astructure): """ Return a list of atom indicies that can be mutated by the isoelectronic mutator. :type astructure: schrodinger.structure.Structure :param astructure: the structure to be mutated :rtype: list :return: mutatable indicies """ def find_heavy_atom(astructure, match): for i in match: atom = astructure.atom[i] if atom.atomic_number > 1: return atom # isoelectronic SMARTS include aromatics NINE_ELECTRONS_SINGLE = [ '[#1][C-0X4]([#1])[#1]', '[#1][N-0X3][#1]', '[#1][O-0X2]', '[F-0X1]' ] EIGHT_ELECTRONS_SINGLE = ['[#1][C-0X4][#1]', '[#1][N-0X3]', '[O-0X2]'] EIGHT_ELECTRONS_DOUBLE = ['[#1][C-0X3][#1]', '[#1][N-0X2]', '[O-0X1]'] SEVEN_ELECTRONS_SINGLE = ['[#1][C-0X4]', '[N-0X3]'] SEVEN_ELECTRONS_DOUBLE = [ '[#1][c-0X3]', '[#1][C-0X3]', '[n-0X2]', '[N-0X2]' ] SEVEN_ELECTRONS_TRIPLE = ['[#1][C-0X2]', '[N-0X1]'] ALL_PATTERNS = NINE_ELECTRONS_SINGLE + EIGHT_ELECTRONS_SINGLE + \ EIGHT_ELECTRONS_DOUBLE + SEVEN_ELECTRONS_SINGLE + SEVEN_ELECTRONS_DOUBLE + \ SEVEN_ELECTRONS_TRIPLE # collect mutatable indicies mutatable_indicies = set() for pattern in ALL_PATTERNS: matches = _monomer_evaluate_smarts_canvas(astructure, pattern) for match in matches: heavy_atom = find_heavy_atom(astructure, match) idxs = [heavy_atom.index ] + [atom.index for atom in heavy_atom.bonded_atoms] if not rxn_channel.get_grow_idxs(astructure, idxs=idxs): mutatable_indicies.add(heavy_atom.index) mutatable_indicies = list(mutatable_indicies) mutatable_indicies.sort() return mutatable_indicies
[docs]def get_child_like_parent(parent_st, children_sts, definition):
    """
    Return the child structure that is most like the provided parent.

    :type parent_st: schrodinger.structure.Structure
    :param parent_st: the parent structure

    :type children_sts: list of schrodinger.structure.Structure
    :param children_sts: the children structures

    :type definition: two-element list
    :param definition: each sublist contains two atom indices describing the
        reactive bonds in parent and fragment structures which created the
        children

    :rtype: schrodinger.structure.Structure
    :return: the sought child structure
    """

    # from the chosen channel determine how many atoms are on either side
    # of the reactive bond (inclusive) in the parent structure
    bond_parent, bond_fragment = definition
    aside_indicies = rxn_channel.indicies_from_bonds_deep(
        parent_st, bond_parent[0], [bond_parent[1]])
    aside_natoms = len(aside_indicies)
    bside_natoms = parent_st.atom_total - aside_natoms

    # set up the following data structure to easily handle the
    # categorization of product structures given that the number
    # of product structures may be less than four due to the
    # unique option passed to reaction channel, here is the
    # list of possible a1_b1_a2_b2 for the following reaction
    # nomenclature, i.e. parent(A-B) + fragment(C-D) --> (1)
    # A-C (aka a1) + B-D (aka b1) or (2) A-D (aka a2) + B-C
    # (aka b2):
    #
    # [[ a1, None], [None, None]]  A-A + C-C would skip second channel
    #                              and eliminate b1 via SMILES
    # [[ a1,   b1], [None, None]]  A-B + C-C would skip second channel
    # [[ a1, None], [  a2,   b2]]  A-B + C-D and eliminate b1 via SMILES
    # [[ a1,   b1], [  a2, None]]  A-B + C-D and eliminate b2 via SMILES
    # [[ a1,   b1], [  a2,   b2]]  A-B + C-D
    #
    NO_RXN = 2 * [None]
    a1_b1_a2_b2 = [list(NO_RXN), list(NO_RXN)]
    for astructure in children_sts:
        pieces = astructure.title.split('.')
        structure_idx = int(pieces.pop()) - 1
        reaction_idx = int(pieces.pop()) - 1
        a1_b1_a2_b2[reaction_idx][structure_idx] = astructure

    # Mutation is meant to create a child structure by introducing
    # a small amount of new genetic information into the parent
    # structure and so the choice of child will be biased toward
    # that containing the larger portion of the parent, if there is
    # only one child like this then proceed in the obvious way, otherwise
    # prefer smaller children unless they differ from the parent
    # by a single atom (which really makes them medium sized children)
    # in which case flip a biased coin to choose between the smallest
    # child (that is most like the parent and least like the fragment)
    # and largest child (that is most like the parent and most like the
    # fragment), note that this chunk of code was added as a
    # preventative measure against increasingly larger children

    # handle one (in the if) or two (in the else) reactions
    natoms_func = lambda astructure: astructure.atom_total
    if NO_RXN in a1_b1_a2_b2:
        # in this case there is only one child that contains the
        # A-part of the parent and possibly one child that contains
        # the B-part of the parent, the fragment component for these
        # two children is identical so just pick the largest child
        a1, b1 = a1_b1_a2_b2[0]
        if not b1:
            child = a1
        elif not a1:
            child = b1
        else:
            child = sorted([a1, b1], key=natoms_func).pop()
    else:
        # in this case we are guaranteed to have two children that
        # contain the A-part of the parent and either one or two
        # children that contain the B-part of the parent, for those
        # that contain A and those that contain B determine the smaller
        # and the larger, the inequality sets whether we will want to
        # pick from A or from B in order to obtain a child that resembles
        # the parent more than the fragment
        #
        # for a polymer monomer reacting at the side chain (A'-B), where the
        # prime indicates that part with grow indices, with a non-monomer (C-D)
        # fragment we have:
        #
        # A'-B + C-D --> (1) A'-C + B-D and (2) A'-D + B-C
        #
        # but the child must be a monomer and so only A'-C and A'-D are
        # possible meaning that b1 and b2 can only ever be None
        def get_small_big(pair):
            sts = [st for st in pair if st]
            if not sts:
                return
            elif len(sts) == 1:
                return (sts[0], None)
            elif len(sts) == 2:
                return tuple(sorted(sts, key=natoms_func))

        a1_a2, b1_b2 = map(get_small_big, zip(*a1_b1_a2_b2))
        if a1_a2 and not b1_b2:
            small, big = a1_a2
            natoms = aside_natoms
        elif not a1_a2 and b1_b2:
            small, big = b1_b2
            natoms = bside_natoms
        elif a1_a2 and b1_b2:
            if aside_natoms >= bside_natoms:
                small, big = a1_a2
                natoms = aside_natoms
            else:
                small, big = b1_b2
                natoms = bside_natoms

        # pick the smaller if (1) there is only a single option, (2)
        # the smaller is different from the parent by more than a single
        # atom (this avoids always picking the child that for example
        # contains a single hydrogen from the fragment), or (3) if the
        # smaller child is different from the parent by only a single
        # atom then roll a biased coin to decide between picking that
        # smaller child or just pick the larger child
        if not big or small.atom_total != natoms + 1 or \
                Util.randomFlipCoin(SMALL_CHILD_FREQ):
            child = small
        else:
            child = big

    return child
# mutators
[docs]def elemental_mutator(genome, **args): """ Perform a random elemental mutation to an element in the same column (as known as group) of the periodic table. Note that hydrogen and the halogens are considered to belong to the same column. :type genome: StructureGenome :param genome: a genome :type args: dict :param args: dictionary of genetic optimization parameters created and used by pyevolve :rtype: int :return: the number of mutations applied, appears to never be used in PyEvolve """ # get the mutation probability and test whether to perform a # single mutation at the requested mutation rate mutprob = args['pmut'] if not Util.randomFlipCoin(mutprob): nmutations = 0 return nmutations # reset the genome genome.removeProperties() # get some parameters freezer_files = genome.getParam('freezer_files') inoculate = genome.getParam('inoculate') # get mutatable indicies and return if this structure is not # mutatable mutatable_indicies = get_element_mutator_dict(genome.astructure) if not mutatable_indicies: nmutations = 0 if NO_CHILD in inoculate: genome.astructure = get_freezer_structure( freezer_files, inoculate=NO_CHILD, mutation_applied=ELEMENTAL_MUTATOR, basename_ext=genome.basename_ext) nmutations = 1 return nmutations # get parent string parent = genome.astructure.title # perform the single mutation index = my_random.choice(list(mutatable_indicies)) elements = mutatable_indicies[index] genome.astructure.atom[index].element = my_random.choice(elements) # finish up, remove_basename_ext covers the case where the mutation is performed # directly following a crossover, perform geometry optimization set_title_to_stoichiometry(genome.astructure, toappend=genome.basename_ext) color.apply_color_scheme(genome.astructure, 'element') genome.astructure.property[MUTATION_PARENT_KEY] = remove_basename_ext( parent) genome.astructure.property[MUTATION_APPLIED_KEY] = ELEMENTAL_MUTATOR genome.optimizeGeometry() nmutations = 1 return nmutations
[docs]def fragment_mutator(genome, **args):
    """
    Randomly mutate the genome by swapping a molecular fragment on one side
    of a bond with a similar fragment from a library.

    :type genome: StructureGenome
    :param genome: a genome

    :type args: dict
    :param args: dictionary of genetic optimization parameters created and
        used by pyevolve

    :rtype: int
    :return: the number of mutations applied, appears to never be used in
        PyEvolve
    """

    # get the mutation probability and test whether to perform a
    # single mutation at the requested mutation rate
    mutprob = args['pmut']
    if not Util.randomFlipCoin(mutprob):
        nmutations = 0
        return nmutations

    # reset the genome
    genome.removeProperties()

    # get some parameters
    freezer_files = genome.getParam('freezer_files')
    inoculate = genome.getParam('inoculate')

    # get the number of mutatable bonds, i.e. the number of simple bonds,
    # and return if this structure is not mutatable
    num_simple_bonds = get_num_simple_bonds(genome.astructure)
    if not num_simple_bonds:
        nmutations = 0
        if NO_CHILD in inoculate:
            genome.astructure = get_freezer_structure(
                freezer_files,
                inoculate=NO_CHILD,
                mutation_applied=FRAGMENT_MUTATOR,
                basename_ext=genome.basename_ext)
            nmutations = 1
        return nmutations

    # get parent string
    parent = genome.astructure.title
    parent_basename_ext = genome.basename_ext

    # build the fragment dict
    fragment_dict = {FRAGMENT: genome.getParam('fragment_libs')}

    is_monomer = rxn_channel.are_monomers([genome.astructure])

    # perform the single mutation
    found_fragment = False
    num_unproductive = 0
    while not found_fragment:
        if num_unproductive == TRIES_FRAGMENT_MUTATOR:
            nmutations = 0
            if NO_CHILD in inoculate:
                genome.astructure = get_freezer_structure(
                    freezer_files,
                    inoculate=NO_CHILD,
                    mutation_applied=FRAGMENT_MUTATOR,
                    basename_ext=genome.basename_ext)
                nmutations = 1
            return nmutations

        # get a random fragment
        fragment_st = get_random_structure(fragment_dict)
        if not fragment_st:
            num_unproductive += 1
            continue
        fragment_st.property.pop(FROM_FREEZER_KEY, None)
        rxn_channel.remove_grow_properties(fragment_st)

        # arbitrarily combine the two structure objects into a single structure
        # object to prep for reaction channel
        combo = combine_two_structures(genome.astructure, fragment_st)

        # find a single unique random double displacement channel or, in the
        # worst case where we can't, just return the number of mutations
        # applied, trying random fragments TRIES_FRAGMENT_MUTATOR times before
        # giving up, having to do more than a single pass is very rare because
        # here, as opposed to bond crossover, no_reactive_h is False, so it
        # only happens for A-A + A-A or A-A + A-B scenarios, i.e. if the
        # genome is Cl-Cl and the fragment is Cl-Cl (ethane + ethane or
        # ethane + methanol is now fine because of the reactive H)
        channel_defs = rxn_channel.ChannelDefinitions()
        random_seed = my_random.randint(RANDOM_INT_BOUND + 1)
        channel_defs.addRandomChannels(
            combo,
            num_random_channels=1,
            random_types=['double_displacement'],
            no_reactive_h=False,
            random_seed=random_seed,
            unique=True,
            bin_rxn_types=True,
            no_reactive_polymer_monomer_backbone=True,
            no_reactive_polymer_monomer_sidechain=False)
        if not channel_defs.definitions:
            num_unproductive += 1
        else:
            found_fragment = True

    # run the reaction channel module
    mae_out_file = 'mutate' + '.' + parent_basename_ext + '.mae'
    channels = rxn_channel.Channels(
        combo,
        channel_defs,
        mae_out_file=mae_out_file,
        isolate_products=True,
        no_minimization=genome.getParam('no_minimize'),
        no_reactive_h=False,
        unique=True)
    channels.orchestrate()

    # in the general case the 5 returned structures are (1) reactant, combination of
    # two structures i.e. A-B + C-D, (2) A-C, (3) B-D, (4) A-D, and (5) B-C however
    # we may also have only a single sub-channel, i.e. 3 structures
    structures = [
        astructure for astructure in structure.StructureReader(mae_out_file)
    ]
    structures = structures[1:]
    fileutils.force_remove(mae_out_file)

    # require a monomer
    if is_monomer:
        structures = [st for st in structures if rxn_channel.are_monomers([st])]

    # with a certain probability get that child structure which is most like
    # the parent
    genome.astructure = get_child_like_parent(
        genome.astructure, structures, channel_defs.definitions[0].listdef)

    # clean up
    genome.astructure.property.pop(RXN_KEY, None)

    # finish up, remove_basename_ext covers the case where the mutation is performed
    # directly following a crossover
    set_title_to_stoichiometry(genome.astructure, toappend=genome.basename_ext)
    genome.astructure.property[MUTATION_PARENT_KEY] = remove_basename_ext(
        parent)
    genome.astructure.property[MUTATION_APPLIED_KEY] = FRAGMENT_MUTATOR

    nmutations = 1
    return nmutations
[docs]def isoelectronic_mutator(genome, **args): """ Perform a random isoelectronic mutation from the following sets of series CH3X, NH2X, OHX, and FX, CH2XY, NHXY, OXY, and CHXYZ and NXYZ, where X, Y, and Z are non-H-bonds. :type genome: StructureGenome :param genome: a genome :type args: dict :param args: dictionary of genetic optimization parameters created and used by pyevolve :rtype: int :return: the number of mutations applied, appears to never be used in PyEvolve """ # get the mutation probability and test whether to perform a # single mutation at the requested mutation rate mutprob = args['pmut'] if not Util.randomFlipCoin(mutprob): nmutations = 0 return nmutations # reset the genome genome.removeProperties() # get some parameters freezer_files = genome.getParam('freezer_files') inoculate = genome.getParam('inoculate') # get mutatable indicies and return if this structure is not # mutatable mutatable_indicies = get_isoelectronic_mutator_indicies(genome.astructure) if not mutatable_indicies: nmutations = 0 if NO_CHILD in inoculate: genome.astructure = get_freezer_structure( freezer_files, inoculate=NO_CHILD, mutation_applied=ISOELECTRONIC_MUTATOR, basename_ext=genome.basename_ext) nmutations = 1 return nmutations # get parent string parent = genome.astructure.title # perform the single mutation index = my_random.choice(mutatable_indicies) target_heavy = genome.astructure.atom[index] target_hydrogens = [bonded_atom.index for bonded_atom in \ target_heavy.bonded_atoms if bonded_atom.atomic_number == 1] to_elements = ['C', 'N', 'O', 'F'] for tmpidx in range(9 - target_heavy.atomic_number - len(target_hydrogens)): to_elements.pop() to_elements.remove(target_heavy.element) to_element = my_random.choice(to_elements) renumber_map = genome.astructure.deleteAtoms(target_hydrogens, renumber_map=True) new_index = renumber_map[index] target_heavy = genome.astructure.atom[new_index] target_heavy.element = to_element build.add_hydrogens(genome.astructure, atom_list=[target_heavy]) # finish up, remove_basename_ext covers the case where the mutation is performed # directly following a crossover, perform geometry optimization set_title_to_stoichiometry(genome.astructure, toappend=genome.basename_ext) color.apply_color_scheme(genome.astructure, 'element') genome.astructure.property[MUTATION_PARENT_KEY] = remove_basename_ext( parent) genome.astructure.property[MUTATION_APPLIED_KEY] = ISOELECTRONIC_MUTATOR genome.optimizeGeometry() nmutations = 1 return nmutations
[docs]def get_loggable_float(afloat,
                       num_decimal=NUM_DECIMAL,
                       field_width=FIELD_WIDTH):
    """
    Return a float as a string with the specified format.

    :type afloat: float
    :param afloat: a float to convert to a string

    :type num_decimal: str
    :param num_decimal: the format of the string representation

    :type field_width: int
    :param field_width: the field width of the final string

    :rtype: str
    :return: the float as a string
    """

    afloat = num_decimal % afloat
    afloat = afloat.rjust(field_width)
    return afloat
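# Illustrative sketch (NUM_DECIMAL and FIELD_WIDTH are the module-level
# defaults defined above, e.g. a '%.Nf' format string and a column width):
#
#     get_loggable_float(3.14159)           # right-justified, e.g. '    3.142'
#     get_loggable_float(2.5, '%.1f', 8)    # '     2.5'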
# callbacks
[docs]def uniquify_titles_callback(ga_obj): """ Callback to uniquify titles of the individuals. :type ga_obj: GSimpleGA.GSimpleGA :param ga_obj: the entire current state of the genetic optimization """ generation = ga_obj.getCurrentGeneration() population = ga_obj.getPopulation() for index, indiv in enumerate(population, 1): ext = '_%s_%s' % (str(generation), str(index)) # titles coming into here look like either (1) <stoichiometry>.<basename_ext> # if the structure passed through the operators or (2) <stoichiometry>_%s_%s # if it did not (either because of elitism or the rates of operators), the order # of splitting is irrelevant because there will never be both patterns title = indiv.astructure.title.split('_')[0] title = remove_basename_ext(title) + ext indiv.astructure.title = title indiv.astructure.property[ENTRY_NAME_KEY] = title if indiv.skip: indiv.skip.updateTitle(title) if indiv.failure: indiv.failure.updateTitle(title)
[docs]def prepare_next_generation_dirs_callback(ga_obj): """ Callback to update the generation property of the genomes and to create a subdirectory to hold the next series of evaluations. :type ga_obj: GSimpleGA.GSimpleGA :param ga_obj: the entire current state of the genetic optimization """ # the dictionary accessed with an individual's setParams method # is shared by all individuals in the population and so it would # seem that you only need to set it for one individual and it would # be set for all, this is the behavior when run in serial, # however, the other individuals appear to not be updated # in this way when running in parallel, so in general we will set it # for all individuals next_generation = ga_obj.getCurrentGeneration() + 1 for indiv in ga_obj.getPopulation(): indiv.setParams(generation=next_generation) os.mkdir(os.path.join(EVALUATOR_JOBS_DIR, GENER_SUBDIR + \ str(next_generation)))
[docs]def manage_skips_callback(ga_obj): """ Callback to manage skips in the evaluation. :type ga_obj: GSimpleGA.GSimpleGA :param ga_obj: the entire current state of the genetic optimization """ skips = ga_obj.getParam('skips') generation = ga_obj.getCurrentGeneration() skips[generation] = [] population = ga_obj.getPopulation() for indiv in population: skip = indiv.skip if skip: skips[generation].append(skip) ga_obj.setParams(skips=skips)
[docs]def manage_failures_callback(ga_obj): """ Callback to manage failures in the evaluation. :type ga_obj: GSimpleGA.GSimpleGA :param ga_obj: the entire current state of the genetic optimization """ jobbe = ga_obj.getParam('jobbe') failures = ga_obj.getParam('failures') generation = ga_obj.getCurrentGeneration() failures[generation] = [] # copy subjob directories of failed jobs back to the launch host population = ga_obj.getPopulation() for indiv in population: failure = indiv.failure if failure: failures[generation].append(failure) if jobbe: basename = indiv.infile.split(IN_MAE_EXT)[0] apath = os.path.join(EVALUATOR_JOBS_DIR, GENER_SUBDIR + \ str(generation), basename) jobbe.copyOutputFile(apath) ga_obj.setParams(failures=failures) if len(failures[generation]) == len(population): logger = ga_obj.getParam('logger') if generation: logger.info('') print_bad_jobs(failures, logger, bad_type='fail') EXIT_MSG = ('All subjobs from generation %s have failed. ' 'This indicates that something is wrong with the ' 'provided input or job control settings. The subjob ' 'directories of failed jobs are automatically copied ' 'back to the launch host and should help isolate the ' 'problem. This script will now exit.') logger.error(EXIT_MSG % generation) sys.exit(EXIT_MSG % generation)
[docs]def logging_summary_callback(ga_obj):
    """
    Callback to log progress.

    :type ga_obj: GSimpleGA.GSimpleGA
    :param ga_obj: the entire current state of the genetic optimization
    """

    logger = ga_obj.getParam('logger')
    if not logger:
        return

    def get_minmaxavgstdbest(key):
        scores = []
        for indiv in pop:
            # if this individual has been marked as a failure then
            # do not summarize any structure-based or physics-based
            # properties, regardless of which caused the failure
            if not indiv.failure:
                score = indiv.astructure.property.get(key)
                # the next two lines are needed to allow summarizing
                # structure-based properties of skipped individuals,
                # which are never simultaneously marked as failures,
                # but do not have data for physics-based properties
                if score is None:
                    continue
                scores.append(score)
        if not scores:
            return None
        min_score = min(scores)
        max_score = max(scores)
        avg_score = numpy.mean(numpy.array(scores))
        std_score = numpy.std(numpy.array(scores))
        best_score = pop.bestRaw().astructure.property[key]
        return [min_score, max_score, avg_score, std_score, best_score]

    def print_header_one():
        HEADER = 'Summary'
        logger.info(HEADER)
        EMPTY_TAG = '-' * FIELD_WIDTH
        all_loggables = [EMPTY_TAG] * 4
        for name_and_units in prop_statistics_dict:
            loggable = name_and_units.center(FIELD_WIDTH * 5, '-')
            loggable = '|' + loggable[1:]
            all_loggables.append(loggable)
        EMPTY_TAG = '|' + EMPTY_TAG[1:]
        all_loggables.append(EMPTY_TAG)
        logger.info('%s' * len(all_loggables) % tuple(all_loggables))

    def print_header_two():
        GEN_TAG = 'Gen.'.ljust(FIELD_WIDTH)
        PERCENT_TAG = '% Comp.'.rjust(FIELD_WIDTH)
        NUM_SKIP_TAG = 'Skipped'.rjust(FIELD_WIDTH)
        NUM_FAIL_TAG = 'Failed'.rjust(FIELD_WIDTH)
        MIN_TAG = 'Min'.rjust(FIELD_WIDTH)
        MAX_TAG = 'Max'.rjust(FIELD_WIDTH)
        AVG_TAG = 'Avg'.rjust(FIELD_WIDTH)
        STD_TAG = 'Std'.rjust(FIELD_WIDTH)
        BEST_TAG = 'Best'.rjust(FIELD_WIDTH)
        TIME_TAG = 'Time/s'.rjust(FIELD_WIDTH)
        all_loggables = [GEN_TAG, PERCENT_TAG, NUM_SKIP_TAG, NUM_FAIL_TAG]
        all_loggables.extend([MIN_TAG, MAX_TAG, AVG_TAG, STD_TAG, BEST_TAG] *
                             len(prop_statistics_dict))
        all_loggables.append(TIME_TAG)
        HEADER = '%s' * len(all_loggables) % tuple(all_loggables)
        logger.info(HEADER)
        logger.info('-' * len(HEADER))

    # collect by optimizable property a dictionary containing
    # the min, max, avg, and std of that property over the
    # current generation, also collect the value of that property
    # for the current best individual (best defined in the context
    # of all properties considered)
    pop = ga_obj.getPopulation()
    prop_statistics_dict = {}
    for aproperty in ga_obj.getParam('properties'):
        if aproperty.summarize:
            results = get_minmaxavgstdbest(aproperty.key)
            if results:
                results = list(map(get_loggable_float, results))
            else:
                results = ['N/A'.rjust(FIELD_WIDTH)] * 5
            aname = aproperty.name
            if aname == StructureEvaluator.SMARTS_PROP:
                aname += '_%s' % aproperty.index
            if aproperty.units:
                name_and_units = aname + '/' + aproperty.units
            else:
                name_and_units = aname
            prop_statistics_dict[name_and_units] = results

    # if it is the first generation then log a header including
    # boundaries to separate the descriptors for the different
    # sought properties and name the columns
    generation = ga_obj.getCurrentGeneration()
    if generation == 0:
        print_header_one()
        print_header_two()

    # get string representations of some quantities to log
    percent_done = 100 * float(generation) / float(ga_obj.getGenerations())
    percent_done = NUM_DECIMAL % percent_done
    percent_done = str(percent_done + '%').rjust(FIELD_WIDTH)
    atime = get_loggable_float(ga_obj.printTimeElapsed())
    skips = ga_obj.getParam('skips')
    num_skips = len(skips[generation])
    num_skips = str(num_skips).rjust(FIELD_WIDTH)
    failures = ga_obj.getParam('failures')
    num_failures = len(failures[generation])
    num_failures = str(num_failures).rjust(FIELD_WIDTH)
    generation = str(generation).ljust(FIELD_WIDTH)

    # for every generation log the important data
    all_loggables = [generation, percent_done, num_skips, num_failures]
    for minmaxavgstdbest in prop_statistics_dict.values():
        all_loggables.extend(minmaxavgstdbest)
    all_loggables.append(atime)
    logger.info('%s' * len(all_loggables) % tuple(all_loggables))
[docs]def molecule_history_callback(ga_obj): """ Callback to append all structures from all generations to individual log files. :type ga_obj: GSimpleGA.GSimpleGA :param ga_obj: the entire current state of the genetic optimization """ jobbe = ga_obj.getParam('jobbe') subjob_data_exists = os.path.exists(EVALUATOR_JOBS_DIR) generation = ga_obj.getCurrentGeneration() generation_log_file_name = \ get_generation_log_file_name(ga_obj.getParam('file_base_name'), generation) population = ga_obj.getPopulation() molecule_history_writer = structure.MaestroWriter(generation_log_file_name) for index, individual in enumerate(population, 1): if PREVIOUS in individual.getParam('freezer_files'): individual.addPreviousFreezerFile(generation_log_file_name) individual.updateStructureProperties(index, generation) molecule_history_writer.append(individual.astructure) individual.resetParentProperties() if jobbe and subjob_data_exists and not individual.skip and \ not individual.failure: basename = individual.infile.split(IN_MAE_EXT)[0] apath = os.path.join(EVALUATOR_JOBS_DIR, GENER_SUBDIR + \ str(generation), basename) for content in os.listdir(apath): root, ext = fileutils.splitext(content) if os.path.isfile(os.path.join(apath, content)) and \ ext in EXTS_TO_RETURN: shutil.copy(os.path.join(apath, content), os.getcwd()) jobbe.copyOutputFile(content) molecule_history_writer.close() if jobbe: jobbe.copyOutputFile(generation_log_file_name)
# terminators
[docs]def first_property(ga_obj): """ Terminate when the first property has been matched. :type ga_obj: GSimpleGA.GSimpleGA :param ga_obj: the entire current state of the genetic optimization :rtype: bool :return: True to terminate, False otherwise """ logger = ga_obj.getParam('logger') # loop over the current values for the sought properties and for the first # one that meets the specified criterion exit for aproperty in ga_obj.getParam('properties'): key, target, error, name, comparator, units = aproperty.key, aproperty.target, \ aproperty.error, aproperty.name, aproperty.comparator, aproperty.units if units is None: units = '' current = ga_obj.bestIndividual().astructure.property.get(key) if current is None: continue TERM_MSG = '' if comparator == Property.EQUALS: if target - error <= current <= target + error: TERM_MSG = ('The first property has been matched and so evolution ' 'will terminate, i.e. the current value of %s of %.2f%s is within %.2f%s ' 'of the target value of %.2f%s.') % (name, current, units, error, \ units, target, units) elif comparator == Property.GREATER_THAN: if current > target: TERM_MSG = ( 'The first property has been matched and so evolution ' 'will terminate, i.e. the current value of %s of %.2f%s is greater than ' 'the target value of %.2f%s.') % (name, current, units, target, units) elif comparator == Property.LESS_THAN: if current < target: TERM_MSG = ( 'The first property has been matched and so evolution ' 'will terminate, i.e. the current value of %s of %.2f%s is less than ' 'the target value of %.2f%s.') % (name, current, units, target, units) if TERM_MSG: logger.info('') logger.info(TERM_MSG) logger.info('') ga_obj.setParams(terminated=True) return True # if we didn't meet the first criterion and did specify the unproductive # criterion as a second then run it now if ga_obj.getParam('num_unproductive'): return unproductive(ga_obj)
[docs]def all_properties(ga_obj): """ Terminate when all properties have been matched. :type ga_obj: GSimpleGA.GSimpleGA :param ga_obj: the entire current state of the genetic optimization :rtype: bool :return: True to terminate, False otherwise """ # if we have only a single property then call first_property properties = ga_obj.getParam('properties') if len(properties) == 1: return first_property(ga_obj) attrs = ('key', 'minimax', 'target', 'error', 'comparator') for prop in properties: values = map(lambda x: getattr(prop, x), attrs) key, minimax, target, error, comparator = values current = ga_obj.bestIndividual().astructure.property.get(key) if current is None: break if minimax in [Property.MIN, Property.MAX]: break if comparator == Property.EQUALS and not (target - error <= current <= target + error): break if comparator == Property.GREATER_THAN and current <= target: break if comparator == Property.LESS_THAN and current >= target: break else: logger = ga_obj.getParam('logger') logger.info('') logger.info('All properties have been matched and so evolution ' 'will terminate.') logger.info('') ga_obj.setParams(terminated=True) return True if ga_obj.getParam('num_unproductive'): return unproductive(ga_obj)
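# Note on the for/else used in all_properties above (illustrative sketch, not
# part of this module): the ``else`` branch of a ``for`` loop runs only when
# the loop finishes without hitting ``break``, which is how "every property
# matched" is detected without a separate flag.
def _example_all_match(values, target):
    """Return True only if every value equals the target."""
    for value in values:
        if value != target:
            break
    else:
        return True
    return False

# only the first call completes the loop without breaking
assert _example_all_match([1, 1, 1], 1) and not _example_all_match([1, 2], 1)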
[docs]def unproductive(ga_obj): """ Terminate if the maximum number of unproductive generations has been reached. :type ga_obj: GSimpleGA.GSimpleGA :param ga_obj: the entire current state of the genetic optimization :rtype: bool :return: True to terminate, False otherwise """ logger = ga_obj.getParam('logger') num_unproductive = ga_obj.getParam('num_unproductive') TERM_MSG = ( 'The maximum number of %s unproductive generations has been reached ' 'and so evolution will terminate.') current_best = ga_obj.bestIndividual().getRawScore() # if generation zero then store the current best and start a counter for the # number of unproductive generations, if not generation zero then if the # current and previous are different then reset the counter, otherwise # increment the counter until it reaches the exit condition if ga_obj.getCurrentGeneration() == 0: ga_obj.setParams(prev_gener_best=current_best, unproductive_so_far=0) else: previous_best = ga_obj.getParam('prev_gener_best') if abs(current_best - previous_best) < TERM_THRESH: unproductive_so_far = ga_obj.getParam('unproductive_so_far') + 1 ga_obj.setParams(unproductive_so_far=unproductive_so_far) if unproductive_so_far == num_unproductive: logger.info('') logger.info(TERM_MSG % num_unproductive) logger.info('') ga_obj.setParams(terminated=True) return True else: ga_obj.setParams(prev_gener_best=current_best, unproductive_so_far=0)
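# Illustrative sketch (not part of this module) of the bookkeeping done by
# unproductive above: the previous generation's best raw score is cached, a
# counter is incremented when the new best is unchanged to within a threshold,
# and reset otherwise.  The function and the default threshold below are
# hypothetical stand-ins for the TERM_THRESH-based logic in the real code.
def _example_should_terminate(prev_best, current_best, counter, max_unproductive,
                              thresh=1.0e-8):
    """Return (terminate, new_counter) after one generation of bookkeeping."""
    if abs(current_best - prev_best) < thresh:
        counter += 1
    else:
        counter = 0
    return counter >= max_unproductive, counter

# e.g. with an unchanged best score the counter advances toward termination
assert _example_should_terminate(1.0, 1.0, 2, 3) == (True, 3)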
class Failure(Skip):
    """
    Manage a failure.
    """
class CheckInput:
    """
    Manage checking user input.
    """
[docs] def checkMaeFile(self, input_file, logger=None): """ Check that a file exists and is `*mae`. :type input_file: str :param input_file: the name of the input file :type logger: logging.Logger :param logger: output logger """ # throw error if the input file can not be found NO_EXIST_MSG = ('The following Maestro input file that you have\n' 'specified can not be found: %s. Please specify\n' 'a valid Maestro input file. This script will now\n' 'exit.') if not os.path.exists(input_file): if logger: logger.error(NO_EXIST_MSG % input_file) sys.exit(NO_EXIST_MSG % input_file) # throw error if the input file is not a Maestro file NOT_MAESTRO_FILE_MSG = ( 'The following input file that you have specified\n' 'is not a Maestro file: %s. Please specify a valid\n' 'Maestro file. This script will now exit.') if not fileutils.is_maestro_file(input_file): if logger: logger.error(NOT_MAESTRO_FILE_MSG % input_file) sys.exit(NOT_MAESTRO_FILE_MSG % input_file) # throw error if there are no structures in the Maestro file NO_STRUCTURES_MSG = ( 'The following input file that you have specified\n' 'contains no structures: %s. Please specify a valid\n' 'Maestro file. This script will now exit.') if not structure.count_structures(input_file): if logger: logger.error(NO_STRUCTURES_MSG % input_file) sys.exit(NO_STRUCTURES_MSG % input_file)
[docs] def checkOperators(self, operators, logger=None): """ Check the operators. :type operators: list :param operators: contains tuples of the operator functions and their weights :type logger: logging.Logger :param logger: output logger """ NO_OPERATORS_MSG = ( 'You have specified non-zero crossover and/or mutation ' 'rates yet you have not specified any crossover and/or mutation operators ' 'to apply. This script will now exit.') if not operators: if logger: logger.error(NO_OPERATORS_MSG) sys.exit(NO_OPERATORS_MSG) BAD_WEIGHTS_MSG = ( 'The sum of the operator weights of at least one of the ' 'specified sets of genetic operators is not 1 but rather %s. Please ' 'specify weights that sum to 1. This script will now exit.') functions, weights = list(zip(*operators)) asum = sum(list(weights)) if asum != 1: if logger: logger.error(BAD_WEIGHTS_MSG % asum) sys.exit(BAD_WEIGHTS_MSG % asum)
[docs] def checkRates(self, crossover_rate, mutation_rate, logger=None): """ Check the specified rates of crossover and mutation. :type crossover_rate: float :param crossover_rate: the rate of crossover as a percentage :type mutation_rate: float :param mutation_rate: the rate of mutation as a percentage :type logger: logging.Logger :param logger: output logger """ BAD_RATE_MSG = ( 'Crossover and mutation rates should be specified as ' 'percentages and should be 0 <= rate <= 100. In order to progress ' 'the optimization these two values can not both be zero. You have ' 'specified an invalid rate for at least one of the operators. This ' 'script will now exit.') both_zero = crossover_rate == 0 and mutation_rate == 0 if crossover_rate < 0 or crossover_rate > 100 or mutation_rate < 0 or \ mutation_rate > 100 or both_zero: if logger: logger.error(BAD_RATE_MSG) sys.exit(BAD_RATE_MSG)
[docs] def checkInitialPopulation(self, initial_population, crossover_names, mutator_names, crossover_rate, mutation_rate, no_open_shell, logger=None): """ Check the initial population. :type initial_population: list :param initial_population: the initial population of schrodinger.structure.Structure :type crossover_names: list :param crossover_names: contains the function names of the crossover operators to be used :type mutator_names: list :param mutator_names: contains the function names of the mutation operators to be used :type crossover_rate: float :param crossover_rate: the rate of crossover :type mutation_rate: float :param mutation_rate: the rate of mutation :type no_open_shell: bool :param no_open_shell: if True then check for open shell structures otherwise do not :type logger: logging.Logger :param logger: output logger """ # throw error if less than two structures are provided LESS_THAN_TWO_MSG = ( 'The initial population that you have ' 'specified contains less than two structures. An initial ' 'population must contain at least two structures. This ' 'script will now exit.') if len(initial_population) < 2: if logger: logger.error(LESS_THAN_TWO_MSG) sys.exit(LESS_THAN_TWO_MSG) # check for open shell structures if no_open_shell: OPEN_SHELL_MSG = ( 'The following list of input structures have open ' 'shells and therefore can not be processed for genetic optimization. ' 'Please reconsider such structures. Here are the titles: %s.') open_shell_titles = [] for astructure in initial_population: if structure_is_open_shell(astructure, ignore_charge=True): open_shell_titles.append(astructure.title) if open_shell_titles: if logger: logger.error(OPEN_SHELL_MSG % open_shell_titles) sys.exit(OPEN_SHELL_MSG % open_shell_titles) # check initial population against all requested operators NO_BOND_CROSSOVER_MSG = ( 'The following list of input structures do not ' 'have any simple bonds which are needed when performing bond crossover. ' 'Simple bonds are acyclic single order bonds that may involve a hydrogen ' 'atom. Here are the titles of such structures: %s.') NO_ELEMENTAL_MUTATION_MSG = ( 'The following list of input structures do not ' 'have any elements that are mutatable by the elemental mutation operator. ' 'Here are the titles of such structures: %s.') NO_ISOELECTRONIC_MUTATION_MSG = ( 'The following list of input structures do ' 'not have any functional groups that are mutatable by the isoelectronic ' 'mutation operator. Here are the titles of such structures: %s.') FAILED_ALL_MSG = ( 'The following list of input structures can not be operated ' 'on by any of the requested mutation operators. Such structures will inhibit ' 'evolutionary progress by taking up space in a population of fixed size that ' 'could otherwise be occupied by other candidate molecules. Regardless of ' 'their score they will remain in the population over successive generations ' 'until whatever selection protocol naturally removes them. For these reasons ' 'this script will now exit. Please reconsider your initial population. 
' 'Here are the titles of such structures: %s.') bond_crossover_fails = [] elemental_mutation_fails = [] isoelectronic_mutation_fails = [] for astructure in initial_population: if crossover_rate: if bond_crossover.__name__ in crossover_names: if not get_num_simple_bonds(astructure): bond_crossover_fails.append(astructure.title) if mutation_rate: if elemental_mutator.__name__ in mutator_names: if not get_element_mutator_dict(astructure): elemental_mutation_fails.append(astructure.title) if isoelectronic_mutator.__name__ in mutator_names: if not get_isoelectronic_mutator_indicies(astructure): isoelectronic_mutation_fails.append(astructure.title) # log individual failures if bond_crossover_fails and logger: logger.warning(NO_BOND_CROSSOVER_MSG % bond_crossover_fails) if elemental_mutation_fails and logger: logger.warning(NO_ELEMENTAL_MUTATION_MSG % elemental_mutation_fails) if isoelectronic_mutation_fails and logger: logger.warning(NO_ISOELECTRONIC_MUTATION_MSG % isoelectronic_mutation_fails) # collect and log titles of structures that fail all specified operators to_intersect = [] if crossover_rate: if bond_crossover.__name__ in crossover_names: to_intersect.append(set(bond_crossover_fails)) if mutation_rate: if elemental_mutator.__name__ in mutator_names: to_intersect.append(set(elemental_mutation_fails)) if isoelectronic_mutator.__name__ in mutator_names: to_intersect.append(set(isoelectronic_mutation_fails)) if to_intersect: failed_all = reduce(lambda x, y: x & y, to_intersect) if failed_all: if logger: logger.error(FAILED_ALL_MSG % list(failed_all)) sys.exit(FAILED_ALL_MSG % list(failed_all))
[docs] def checkPopulationParam(self, population, num_structures_given, logger=None): """ Check the population parameter. :type population: int :param population: the size of the population to use in the genetic optimization :type num_structures_given: int :param num_structures_given: the number of structures provided to the genetic optimization :type logger: logging.Logger :param logger: output logger """ BAD_POP_MSG = ( 'The population size, i.e. %s, that you have ' 'specified to use in the genetic optimization is either greater ' 'than the number of usable initial structures, i.e. %s, that you have ' 'provided or less than 2 which is the minimum number of individuals ' 'required for genetic optimization. Please provide a value, N, that is ' '2 <= N <= number of usable initial structures. This script will now ' 'exit.') if population > num_structures_given or population < 2: if logger: logger.error(BAD_POP_MSG % (population, num_structures_given)) sys.exit(BAD_POP_MSG % (population, num_structures_given))
[docs] def checkFragmentLibs(self, fragment_libs, logger=None): """ Check the specified fragment libraries. :type fragment_libs: list :param fragment_libs: strings specifying fragment libraries to be used :type logger: logging.Logger :param logger: output logger :rtype: list :return: valid user provided fragment files """ categories = set(FRAGMENT_LIBS) categories.add(ALL) fragment_files = set(fragment_libs).difference(categories) for fragment_file in fragment_files: self.checkMaeFile(fragment_file, logger) fragment_files = list(fragment_files) fragment_files.sort() return fragment_files
[docs] def checkProperties(self, properties, logger=None): """ Check the list of properties. :type properties: list :param properties: contains Property instances :type logger: logging.Logger :param logger: output logger """ NO_PROPERTIES_MSG = ('No valid properties have been defined ' 'and so this script will now exit.') BAD_PROPERTIES_MSG = ( 'The specified properties with the ' 'following numbering are not valid Property instances ' 'and so this script will now exit, i.e. %s.') if not properties: if logger: logger.error(NO_PROPERTIES_MSG) sys.exit(NO_PROPERTIES_MSG) bad_properties = [] for aproperty in properties: if not isinstance(aproperty, Property): bad_properties.append(aproperty.index) if bad_properties: if logger: logger.error(BAD_PROPERTIES_MSG % bad_properties) sys.exit(BAD_PROPERTIES_MSG % bad_properties)
    def checkGenerations(self, generations, logger=None):
        """
        Check the specified number of generations.

        :type generations: int
        :param generations: the number of generations

        :type logger: logging.Logger
        :param logger: output logger
        """

        NOT_ENOUGH_GENERATIONS_MSG = (
            'You have specified a number '
            'of generations that is less than 1. You must perform '
            'at least a single generation. This script will now '
            'exit.')

        if generations < 1:
            if logger:
                logger.error(NOT_ENOUGH_GENERATIONS_MSG)
            sys.exit(NOT_ENOUGH_GENERATIONS_MSG)
    def checkSelection(self, selection, logger=None):
        """
        Check the specified selection protocol.

        :type selection: str
        :param selection: the selection protocol to use

        :type logger: logging.Logger
        :param logger: output logger
        """

        BAD_SELECTION_MSG = ('The selection protocol that you '
                             'have requested is unsupported. Please choose a '
                             'supported selection protocol from the following '
                             'list: %s. This script will now exit.')

        if selection not in SELECTION_CHOICES:
            if logger:
                logger.error(BAD_SELECTION_MSG % SELECTION_CHOICES)
            sys.exit(BAD_SELECTION_MSG % SELECTION_CHOICES)
[docs] def checkTournamentSize(self, tournament_size, population, logger=None): """ Check the specified tournament size. :type tournament_size: int :param tournament_size: the size of tournament to use in tournament based selection :type population: int :param population: the size of population to use :type logger: logging.Logger :param logger: output logger """ BAD_SIZE_MSG = ( 'The tournament size used in tournament based ' 'selection must be at least two (as is the case in classic ' 'binary tournament selection) and not exceed the number of ' 'individuals in the population. This script will now exit.') if tournament_size < 2 or tournament_size > population: if logger: logger.error(BAD_SIZE_MSG) sys.exit(BAD_SIZE_MSG)
[docs] def checkTerminationParams(self, terminators, num_unproductive, logger=None): """ Check the termination parameters. :type terminators: list :param terminators: the list of terminators to use :type num_unproductive: int :param num_unproductive: used when the unproductive termination option is active, it is the generation number on which to exit if the score hasn't improved :type logger: logging.Logger :param logger: output logger :rtype: list and int :return: valid terminators and valid num_unproductive """ BAD_TERM_MSG = ( 'You have specified at least one termination ' 'protocol that is not supported. The following are the ' 'supported termination protocols: %s. This script will now ' 'exit.') if not set(terminators).issubset(set(TERM_CHOICES)): if logger: logger.error(BAD_TERM_MSG % TERM_CHOICES) sys.exit(BAD_TERM_MSG % TERM_CHOICES) NUM_UNPRODUCTIVE_MSG = ( 'You have included a termination criteria ' 'based on the number of unproductive generations but the number ' 'of such generations that you have provided is less than one. ' 'Please provide a value of one or greater. This script will now ' 'exit.') if MAX_GENERATIONS_TERM in terminators: terminators = [MAX_GENERATIONS_TERM] num_unproductive = 0 else: if UNPRODUCTIVE_TERM in terminators: if num_unproductive < 1: if logger: logger.error(NUM_UNPRODUCTIVE_MSG) sys.exit(NUM_UNPRODUCTIVE_MSG) else: num_unproductive = 0 if FIRST_PROPERTY_TERM in terminators and \ ALL_PROPERTIES_TERM in terminators: terminators.remove(FIRST_PROPERTY_TERM) return terminators, num_unproductive
[docs] def checkScaling(self, scaling, properties, logger=None): """ Check the scaling. :type scaling: str :param scaling: the scaling protocol to use in the genetic optimization :type properties: list :param properties: the properties to be optimized :type logger: logging.Logger :param logger: output logger """ BAD_SCALING_MSG = ( 'You have specified an unsupported ' 'scaling protocol. Please choose a supported protocol ' 'from the following list: %s. This script will now ' 'exit.') if scaling not in SCALING_CHOICES: if logger: logger.error(BAD_SCALING_MSG % SCALING_CHOICES) sys.exit(BAD_SCALING_MSG % SCALING_CHOICES) NEGATIVE_ISSUE_MSG = ( 'You have specified to optimize a ' 'property that can take on negative, as well as positive, ' 'values but you have specified a scaling protocol that ' 'can only be used for positive scores. Please choose ' 'a scaling protocol from the following list of protocols ' 'that can be used for negative scores: %s. This script ' 'will now exit.') negative_property_present = False for aproperty in properties: if not aproperty.positive: negative_property_present = True if scaling not in ALLOWS_NEGATIVE_SCORES and \ negative_property_present: if logger: logger.error(NEGATIVE_ISSUE_MSG % ALLOWS_NEGATIVE_SCORES) sys.exit(NEGATIVE_ISSUE_MSG % ALLOWS_NEGATIVE_SCORES)
    def checkElitism(self, elitism, population, logger=None):
        """
        Check the elitism.

        :type elitism: int
        :param elitism: the number of elite individuals to use

        :type population: int
        :param population: the size of population to use

        :type logger: logging.Logger
        :param logger: output logger
        """

        BAD_ELITISM_MSG = ('The elitism parameter must be >= 0 and '
                           'less than the population size. This script will now '
                           'exit.')

        if elitism < 0 or elitism >= population:
            if logger:
                logger.error(BAD_ELITISM_MSG)
            sys.exit(BAD_ELITISM_MSG)
[docs] def checkConformationalSearch(self, conformational_search, logger=None): """ Check the conformational search. :type conformational_search: bool or str :param conformational_search: specifies whether a conformational search is to be performed, if a string is given specifies a file used to set options :type logger: logging.Logger :param logger: output logger """ if type(conformational_search) is bool: return no_file_msg = ('The simplified Macromodel input file, %s, ' 'can not be found.') if not os.path.exists(conformational_search): if logger: logger.error(no_file_msg % conformational_search) sys.exit(no_file_msg % conformational_search)
[docs] def checkFreezers(self, freezers, pop_size, input_size, logger=None): """ Check the freezers. :type freezers: list :param freezers: collection of freezers to use :type pop_size: int :param pop_size: the size of the population :type input_size: int :param input_size: the number of structures given :type logger: logging.Logger :param logger: output logger :rtype: list :return: collection of freezers to use """ unsupported_msg = ( 'The following freezers are not supported, ' '%s. Please choose any from the following list, %s.') unsupported = set(freezers).difference(set(FREEZER_CHOICES)) if unsupported: if logger: logger.error(unsupported_msg % (unsupported, FREEZER_CHOICES)) sys.exit(unsupported_msg % (unsupported, FREEZER_CHOICES)) no_remainder_msg = ( 'You have specified to use the remainder of ' 'input structures as a freezer but you are already using all ' 'of those structures in your population. This freezer will ' 'therefore be skipped.') no_freezers_msg = ('There are now no freezers available.') if REMAINDER in freezers and pop_size == input_size: if logger: logger.warning(no_remainder_msg) freezers.remove(REMAINDER) if not freezers: if logger: logger.error(no_freezers_msg) sys.exit(no_freezers_msg) return freezers
    def checkInoculate(self, inoculate, logger=None):
        """
        Check the inoculate options.

        :type inoculate: list
        :param inoculate: circumstances in which to inoculate

        :type logger: logging.Logger
        :param logger: output logger
        """

        unsupported_msg = (
            'The following inoculate options are not supported, '
            '%s. Please choose any from the following list, %s.')
        unsupported = set(inoculate).difference(set(INOCULATE_CHOICES))
        if unsupported:
            if logger:
                logger.error(unsupported_msg % (unsupported, INOCULATE_CHOICES))
            sys.exit(unsupported_msg % (unsupported, INOCULATE_CHOICES))
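# Illustrative sketch (not part of this module) of the intersection logic used
# in CheckInput.checkInitialPopulation above: a structure is fatal only if it
# is unusable by *every* requested operator, i.e. it appears in the
# intersection of the per-operator failure sets computed with functools.reduce
# (imported at the top of this module).  The titles below are made up.
_example_fail_sets = [
    {'mol_a', 'mol_b'},  # failed bond crossover
    {'mol_b', 'mol_c'},  # failed elemental mutation
    {'mol_b'},           # failed isoelectronic mutation
]
_example_failed_all = reduce(lambda x, y: x & y, _example_fail_sets)
assert _example_failed_all == {'mol_b'}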
class GeneticOptimization:
    """
    Manage the genetic optimization.
    """

    MSGWIDTH = 80
[docs] def __init__(self, initial_population, properties, structure_score_threshold=STRUCTURE_SCORE_THRESHOLD, eval_kwargs=DEFAULT_EVAL_KWARGS, crossovers=None, mutators=None, fragment_libs=FRAGMENT_LIBS_DEFAULT, script_evaluator=None, generations=GENERATIONS, population=POPULATION, crossover_rate=CROSSOVER_RATE, mutation_rate=MUTATION_RATE, selection=DEFAULT_SELECTION, tournament_size=TOURNAMENT_SIZE, terminators=DEFAULT_TERMS, num_unproductive=NUM_UNPRODUCTIVE, scaling=DEFAULT_SCALING, elitism=ELITISM, random_seed=RANDOM_SEED, no_minimize=NO_MINIMIZE, file_base_name=FILE_BASE_NAME, no_open_shell=NO_OPEN_SHELL, props_to_remove=None, jobbe=None, conformational_search=CONFORMATIONAL_SEARCH, freezers=FREEZERS_DEFAULT, inoculate=INOCULATE_DEFAULT, class_evaluators=None, logger=None): """ Create an instance. :type initial_population: list :param initial_population: the initial population of schrodinger.structure.Structure :type properties: list of Property :param properties: the properties to be optimized, including structural properties as well as more physical calculable observables :type structure_score_threshold: float :param structure_score_threshold: if structure-based properties are being sought and if the base evaluator will be used then subjobs on structures with structure scores below this value will not be launched but rather such structures treated as skips :type eval_kwargs: dict :param eval_kwargs: a dictionary that will be available in all evaluator functions :type crossovers: list :param crossovers: contains two-element tuples each of which holds a crossover operator to be used in the optimization along with a weight :type mutators: list :param mutators: contains two-element tuples each of which holds a mutation operator to be used in the optimization along with a weight :type fragment_libs: list :param fragment_libs: strings specifying fragment libraries to be used, can be either module constants from FRAGMENT_LIBS.keys() (or ALL if all of those are desired) or the names of Maestro files (including the file extensions) containing fragments collected by the user :type script_evaluator: method :param script_evaluator: the evaluator function to be called to score individuals during the optimization, takes a StructureGenome and returns a JobDJ :type generations: int :param generations: the number of generations for which to run the optimization :type population: int :param population: the population size to use in the optimization, can be less-than-or-equal-to the length of initial_population :type crossover_rate: float :param crossover_rate: the rate of crossover as a percentage :type mutation_rate: float :param mutation_rate: the rate of mutation as a percentage :type selection: str :param selection: the selection protocol used to select individuals to the gene pool for the upcoming generation :type tournament_size: int :param tournament_size: the size of tournament to use if using tournament based selection, unused if a tournament based selection is not being used :type terminators: list :param terminators: list of strings that specify the termination protocols to be used to terminate the optimization, typically more than one is specified only if the unproductive protocol is being used :type num_unproductive: int :param num_unproductive: if the unproductive protocol is being used to terminate the optimization then this integer specifies how many unproductive cycles are allowed before terminating, unused if a different termination protocol is used :type scaling: str :param 
scaling: specifies the scaling protocol to use, scaling scales the raw scores of the individuals to produce fitness scores to ease selection in cases where raw scores are nearly equal :type elitism: int :param elitism: specify the number of elite individuals guaranteed to be added to the gene pool for the upcoming generation, zero disables elitism :type random_seed: None or int :param random_seed: the random seed, if None then system time will be used :type no_minimize: bool :param no_minimize: specify that the offspring structures generated by the crossover and mutation operators not be geometry optimized prior to selection :type file_base_name: str :param file_base_name: base name to use for output and generation log files :type no_open_shell: bool :param no_open_shell: if True then do not allow the processing of open shell molecules, False otherwise :type props_to_remove: list :param props_to_remove: a list of structure property keys to be removed prior to the evaluation stage :type jobbe: schrodinger.job.jobcontrol._Backend :param jobbe: the jobcontrol backend of the driver job :type conformational_search: bool or str :param conformational_search: specifies whether a Macromodel conformational search will be performed prior to evaluation, when a string it specifies a simplified Macromodel input file containing extra options :type freezers: list :param freezers: a collection of freezers containing structures that are used to swap out individuals from the population :type inoculate: list :param inoculate: the list of circumstances under which to use the structure freezers :type class_evaluators: dict :param class_evaluators: keys are the evaluator classes to be called to score individuals during the optimization, each must inherit ClassEvaluator, values are lists of Property to be passed to the class evaluator :type logger: logging.Logger :param logger: output logger """ self.initial_population = initial_population self.properties = properties self.structure_score_threshold = structure_score_threshold self.eval_kwargs = eval_kwargs if not crossovers: self.crossovers = [(bond_crossover, 1)] else: self.crossovers = crossovers if not mutators: self.mutators = \ [(fragment_mutator, 0.5), (isoelectronic_mutator, 0.5)] else: self.mutators = mutators self.fragment_libs = list(map(os.path.basename, fragment_libs)) if not script_evaluator: self.script_evaluator = optoelectronics_evaluator else: self.script_evaluator = script_evaluator self.generations = generations self.population = population self.crossover_rate = crossover_rate self.mutation_rate = mutation_rate self.selection = selection self.tournament_size = tournament_size self.terminators = terminators self.num_unproductive = num_unproductive self.scaling = scaling self.elitism = elitism self.random_seed = random_seed self.no_minimize = no_minimize self.file_base_name = file_base_name self.no_open_shell = no_open_shell self.props_to_remove = props_to_remove self.jobbe = jobbe if type(conformational_search) is bool: self.conformational_search = conformational_search else: self.conformational_search = os.path.basename(conformational_search) self.freezers = freezers self.inoculate = inoculate if class_evaluators is None: class_evaluators = {} self.class_evaluators = class_evaluators self.logger = logger self.pyevolve_random_seed = None self.initializers = [from_initial_population] self.skips = {} self.failures = {} self.ga_obj = None self.crossover_names = None self.mutator_names = None
[docs] def setRootLoggerForPyEvolve(self): """ Set up the root logger for PyEvolve. """ # can't seem to get pyevolve.logEnable() (in __init__.py) to work so do it by hand, # PyEvolve code will use the root logger (as well as a few print statements), # the root logger comes in with a NullHandler so first remove that if isinstance(logging.root.handlers[0], logging.FileHandler): logging.root.handlers[0].close() logging.root.removeHandler(logging.root.handlers[0]) # just use their original format aformat = '%(asctime)s [%(module)s:%(funcName)s:%(lineno)d] %(levelname)s %(message)s' # build root logger to use FileHandler handler = logging.FileHandler(self.file_base_name + PYEVOLVE_LOG_EXT, mode='w') formatter = logging.Formatter(aformat) handler.setFormatter(formatter) logging.root.addHandler(handler) logging.root.setLevel(logging.DEBUG) # log a header HEADER = 'PyEvolve log' logging.info(HEADER) logging.info('-' * len(HEADER)) logging.info('')
[docs] def setOperatorNames(self): """ Set the operator names. """ self.crossover_names = [] for atuple in self.crossovers: crossover, weight = atuple self.crossover_names.append(crossover.__name__) self.mutator_names = [] for atuple in self.mutators: mutator, weight = atuple self.mutator_names.append(mutator.__name__)
[docs] def checkInputParams(self): """ Check the input parameters. """ # check rates CheckInput().checkRates(self.crossover_rate, self.mutation_rate, self.logger) # check crossover and mutation operators and weights if self.crossover_rate: CheckInput().checkOperators(self.crossovers, self.logger) if self.mutation_rate: CheckInput().checkOperators(self.mutators, self.logger) # check the initial population CheckInput().checkInitialPopulation(self.initial_population, self.crossover_names, self.mutator_names, self.crossover_rate, self.mutation_rate, self.no_open_shell, self.logger) # check the specified size of the population, this can be less than the # number of structures in the initial population CheckInput().checkPopulationParam(self.population, len(self.initial_population), self.logger) # check any provided fragment files CheckInput().checkFragmentLibs(self.fragment_libs, self.logger) # check properties CheckInput().checkProperties(self.properties, self.logger) # check generations CheckInput().checkGenerations(self.generations, self.logger) # check selection CheckInput().checkSelection(self.selection, self.logger) # check tournament size CheckInput().checkTournamentSize(self.tournament_size, self.population, self.logger) # check termination parameters, note that if the user has only specified to # terminate after all requested generations and all specified properties involve # equality to a value, then the optimization will proceed until that number # of generations even though it may have found the best solution, i.e. that # with all of the desired property equalities, the reason for this is that the # user might also be interested in the next closest solutions for which the # closeness might improve over the course of those additional generations # (depending on the selection protocol), how best to terminate a genetic # optimization is somewhat ambiguous anyway and we certainly want to honor # (without modification) the case where the user wants to run all requested # generations self.terminators, self.num_unproductive = \ CheckInput().checkTerminationParams(self.terminators, self.num_unproductive, \ self.logger) # check scaling protocol CheckInput().checkScaling(self.scaling, self.properties, self.logger) # check elitism CheckInput().checkElitism(self.elitism, self.population, self.logger) # check random seed rxn_channel.CheckInput().checkRandomSeed(self.random_seed, self.logger) # check the conformational search CheckInput().checkConformationalSearch(self.conformational_search, self.logger) # check the freezers self.freezers = CheckInput().checkFreezers(self.freezers, self.population, len(self.initial_population), self.logger) # check the inoculate CheckInput().checkInoculate(self.inoculate, self.logger)
[docs] def printProperties(self): """ Log the set of sought properties and their details. """ HEADER = 'The following properties have been defined:' prop_strs = [] for property_obj in self.properties: prop_strs.append(repr(property_obj)) if prop_strs: self.logger.info(HEADER) self.logger.info('-' * len(HEADER)) for prop_str in prop_strs: self.logger.info(prop_str) self.logger.info('')
[docs] def printParams(self): """ Log the parameters. """ HEADER = 'Genetic Optimization Parameters' num_candidate_sts = textlogger.get_param_string( 'Num. candidate structures given', len(self.initial_population), self.MSGWIDTH) population = textlogger.get_param_string('Population', self.population, self.MSGWIDTH) structure_score_threshold = textlogger.get_param_string( 'Structure score threshold', get_loggable_float(self.structure_score_threshold).strip(), self.MSGWIDTH) generations = textlogger.get_param_string('Generations', self.generations, self.MSGWIDTH) crossovers = textlogger.get_param_string( 'Crossovers', ', '.join(self.crossover_names), self.MSGWIDTH) crossover_rate = textlogger.get_param_string('Crossover rate', self.crossover_rate, self.MSGWIDTH) mutators = textlogger.get_param_string('Mutators', ', '.join(self.mutator_names), self.MSGWIDTH) mutation_rate = textlogger.get_param_string('Mutation rate', self.mutation_rate, self.MSGWIDTH) fragment_libs = textlogger.get_param_string( 'Fragment libraries', ', '.join(self.fragment_libs), self.MSGWIDTH) elitism = textlogger.get_param_string('Elitism', self.elitism, self.MSGWIDTH) no_minimize = textlogger.get_param_string('No minimization', self.no_minimize, self.MSGWIDTH) selection = textlogger.get_param_string('Selection', self.selection, self.MSGWIDTH) tournament_size = textlogger.get_param_string('Tournament size', self.tournament_size, self.MSGWIDTH) scaling = textlogger.get_param_string('Scaling', self.scaling, self.MSGWIDTH) terminators = textlogger.get_param_string('Terminators', ', '.join(self.terminators), self.MSGWIDTH) num_unproductive = textlogger.get_param_string('Num. unproductive', self.num_unproductive, self.MSGWIDTH) evaluators = [] script_props = [ aproperty for aproperty in self.properties if aproperty.isScriptProperty() ] if script_props: evaluators.append(self.script_evaluator.__name__) if self.class_evaluators: evaluators.extend( [aclass.__name__ for aclass in self.class_evaluators]) evaluators = ', '.join(evaluators) evaluators = textlogger.get_param_string('Evaluators', evaluators, self.MSGWIDTH) n_procs = jobutils.get_procs() n_molecules = textlogger.get_param_string('# simultaneous molecules', n_procs, self.MSGWIDTH) n_sim_subjobs = textlogger.get_param_string( '# simultaneous subjobs per molecule', n_procs, self.MSGWIDTH) tpp = self.eval_kwargs.get(parserutils.FLAG_TPP) if tpp is None: tpp = 1 tpp = textlogger.get_param_string('# threads per relevant subjob', tpp, self.MSGWIDTH) base_name = textlogger.get_param_string('File base name', self.file_base_name, self.MSGWIDTH) no_open_shells = textlogger.get_param_string('No open shells', self.no_open_shell, self.MSGWIDTH) conformational_search = textlogger.get_param_string('Conformational search', \ self.conformational_search, self.MSGWIDTH) freezers = textlogger.get_param_string('Structure freezers', \ ', '.join(self.freezers), self.MSGWIDTH) inoculate = textlogger.get_param_string('Inoculate, i.e. 
draw from freezer, if', \ ', '.join(self.inoculate), self.MSGWIDTH) self.logger.info(HEADER) self.logger.info('-' * len(HEADER)) self.logger.info(num_candidate_sts) self.logger.info(population) self.logger.info(structure_score_threshold) self.logger.info(generations) self.logger.info(crossovers) self.logger.info(crossover_rate) self.logger.info(mutators) self.logger.info(mutation_rate) self.logger.info(fragment_libs) self.logger.info(elitism) self.logger.info(no_minimize) self.logger.info(selection) self.logger.info(tournament_size) self.logger.info(scaling) self.logger.info(terminators) self.logger.info(num_unproductive) self.logger.info(evaluators) self.logger.info(n_molecules) self.logger.info(n_sim_subjobs) self.logger.info(tpp) self.logger.info(base_name) self.logger.info(no_open_shells) self.logger.info(conformational_search) self.logger.info(freezers) self.logger.info(inoculate) self.logger.info('')
[docs] def initializeGenome(self): """ Initialize a genome. :rtype: StructureGenome :return: a genome """ genome = StructureGenome() for initializator in self.initializers: genome.initializator.add(initializator) for atuple in self.crossovers: crossover, weight = atuple genome.crossover.add(crossover, weight) for atuple in self.mutators: mutator, weight = atuple genome.mutator.add(mutator, weight) if len(self.crossovers) > 1: genome.crossover.setRandomApply(True) if len(self.mutators) > 1: genome.mutator.setRandomApply(True) genome.evaluator.set(base_evaluator) # set up freezer files freezer_files = {} if REMAINDER in self.freezers: nremainder = len(self.initial_population) - self.population my_random.shuffle(self.initial_population) remainder_file = self.file_base_name + REMAINDER_FILE_EXT awriter = structure.MaestroWriter(remainder_file) for astructure in self.initial_population[-1 * nremainder:]: awriter.append(astructure) awriter.close() freezer_files[REMAINDER] = [remainder_file] self.initial_population = self.initial_population[:-1 * nremainder] if FRAGMENT in self.freezers: freezer_files[FRAGMENT] = self.fragment_libs if PREVIOUS in self.freezers: freezer_files[PREVIOUS] = [] # use genome.setParams for parameters shared by all of the genomes in # a population, use an attribute for a property of that specific individual genome.setParams( initial_population=self.initial_population, script_evaluator=self.script_evaluator, eval_kwargs=self.eval_kwargs, generation=0, properties=self.properties, no_minimize=self.no_minimize, fragment_libs=self.fragment_libs, no_open_shell=self.no_open_shell, props_to_remove=self.props_to_remove, freezer_files=freezer_files, conformational_search=self.conformational_search, structure_score_threshold=self.structure_score_threshold, inoculate=self.inoculate, class_evaluators=self.class_evaluators) return genome
[docs] def initializeGA(self, genome): """ Initialize the genetic optimization. :type genome: StructureGenome :param genome: a genome """ self.ga_obj = GSimpleGA.GSimpleGA(genome, seed=self.pyevolve_random_seed, interactiveMode=False) self.ga_obj.setParams(skips=self.skips, failures=self.failures, logger=self.logger, properties=self.properties, num_unproductive=self.num_unproductive, terminated=False, file_base_name=self.file_base_name, jobbe=self.jobbe) self.ga_obj.setGenerations(self.generations) self.ga_obj.setPopulationSize(self.population) self.ga_obj.setCrossoverRate(float(self.crossover_rate) / 100) self.ga_obj.setMutationRate(float(self.mutation_rate) / 100) # set selection protocol self.ga_obj.selector.set(SELECTION_DICT[self.selection]) if MAX_GENERATIONS_TERM in self.terminators: pass elif ALL_PROPERTIES_TERM in self.terminators: self.ga_obj.terminationCriteria.set(all_properties) elif FIRST_PROPERTY_TERM in self.terminators: self.ga_obj.terminationCriteria.set(first_property) elif UNPRODUCTIVE_TERM in self.terminators: self.ga_obj.terminationCriteria.set(unproductive) self.ga_obj.setElitism(bool(self.elitism)) if self.elitism: self.ga_obj.setElitismReplacement(self.elitism) self.ga_obj.setSortType(Consts.sortType['scaled']) self.ga_obj.setMinimax(Consts.minimaxType['maximize']) # set full copy to True because in general the structure # object of our genome will change inside the evaluator function n_sim_subjobs = jobutils.get_procs() self.ga_obj.setMultiProcessing(flag=True, full_copy=True, max_processes=n_sim_subjobs) # set up callbacks self.ga_obj.stepCallback.set(uniquify_titles_callback) self.ga_obj.stepCallback.add(prepare_next_generation_dirs_callback) self.ga_obj.stepCallback.add(manage_skips_callback) self.ga_obj.stepCallback.add(manage_failures_callback) self.ga_obj.stepCallback.add(logging_summary_callback) self.ga_obj.stepCallback.add(molecule_history_callback) # get initial population object pop = self.ga_obj.getPopulation() # set tournament size parameter on initial population pop.setParams(tournamentPool=self.tournament_size) # set scaling protocol on initial population pop.scaleMethod.set(SCALING_DICT[self.scaling])
[docs] def setMonomerGrowAtoms(self): """ Set the monomer grow atoms using the mark monomer module convention rather than the polymer builder module convention. """ # see MATSCI-6080 where for density used in refractive index monomers # marked using the mark monomer module convention are needed, just make # this the standard for monomer recognition in this module # currently the treatment of polymer monomers in this module does # not distinguish between head and tail atoms but rather simply # considers both as a general grow atom so just arbitrarily update # using the head property for st in self.initial_population: for idx in rxn_channel.get_grow_idxs(st): atom = st.atom[idx] if rxn_channel.is_polymer_builder_grow_atom(atom) and \ not rxn_channel.is_mark_monomer_grow_atom(atom): atom.property[msprops.ROLE_PROP] = msprops.HEAD
[docs] def runIt(self): """ Run the components of the genetic optimization. """ self.setRootLoggerForPyEvolve() self.setOperatorNames() self.checkInputParams() self.setMonomerGrowAtoms() self.random_seed = rxn_channel.init_random_seed(self.random_seed, self.logger) my_random.seed(self.random_seed) self.pyevolve_random_seed = my_random.randint(RANDOM_INT_BOUND + 1) if self.logger: self.printProperties() self.printParams() genome = self.initializeGenome() self.initializeGA(genome) os.mkdir(EVALUATOR_JOBS_DIR) os.mkdir(os.path.join(EVALUATOR_JOBS_DIR, GENER_SUBDIR + '0')) self.ga_obj.evolve(freq_stats=0) # if the optimization was not terminated in the strict sense # but has simply run all requested generations then call # some callbacks by hand if not self.ga_obj.getParam('terminated'): uniquify_titles_callback(self.ga_obj) manage_skips_callback(self.ga_obj) manage_failures_callback(self.ga_obj) logging_summary_callback(self.ga_obj) molecule_history_callback(self.ga_obj) if self.logger: self.logger.info('') # log the skips self.skips = self.ga_obj.getParam('skips') total_skips = sum([len(skips) for skips in self.skips.values()]) if total_skips and self.logger: print_bad_jobs(self.skips, self.logger, bad_type='skip') # log the failures self.failures = self.ga_obj.getParam('failures') total_failures = sum( [len(failures) for failures in self.failures.values()]) if total_failures and self.logger: print_bad_jobs(self.failures, self.logger, bad_type='fail') # create final output file generation_log_files = [] for index in range(self.ga_obj.getCurrentGeneration() + 1): afile = get_generation_log_file_name(self.file_base_name, index) generation_log_files.append(afile) output_file_name = get_output_file_name(self.file_base_name) with wam.WorkflowMenuMaestroWriter( output_file_name, wam.WorkflowType.MS_GENETIC_OPTIMIZATION) as output_writer: for astructure in structure.MultiFileStructureReader( generation_log_files): jobutils.set_source_path(astructure) output_writer.append(astructure) # log that this genetic optimization is complete if self.logger: self.logger.info('This genetic optimization is now complete.') self.logger.info('')
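# Illustrative sketch (not part of this module) of the remainder-freezer split
# performed in GeneticOptimization.initializeGenome above: after shuffling, the
# last ``nremainder`` structures are written to the freezer file and the rest
# are kept as the working population.  Plain integers stand in for structures.
_example_structures = [1, 2, 3, 4, 5, 6]
_example_population_size = 4
_example_nremainder = len(_example_structures) - _example_population_size
_example_frozen = _example_structures[-_example_nremainder:]
_example_kept = _example_structures[:-_example_nremainder]
assert _example_frozen == [5, 6] and _example_kept == [1, 2, 3, 4]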
def get_output_file_name(basename):
    """
    Get the output file name from the basename.

    :type basename: str
    :param basename: base name to use

    :rtype: str
    :return: output_file_name, name of output file
    """

    return basename + OUT_MAE_EXT
def get_generation_log_file_name(basename, generation):
    """
    Get the generation log file name.

    :type basename: str
    :param basename: base name to use

    :type generation: int
    :param generation: the generation

    :rtype: str
    :return: generation_log_file_name, name of generation log file
    """

    return basename + GENER_LOG_FILE_SEP + str(generation) + '.mae'
# evaluators
[docs]def get_structure_score(astructure, properties, conformational_search, seed=None, this_random=None): """ Return the structure score for the provided structure. :type astructure: schrodinger.structure.Structure :param astructure: the structure to score :type properties: list of Property :param properties: the properties used in scoring :type conformational_search: bool or str :param conformational_search: specifies whether a Macromodel conformational search will be performed prior to evaluation, when a string it specifies a simplified Macromodel input file containing extra options :type seed: int or None :param seed: random seed used in conformational search or None if conformational search is not being done :type this_random: numpy.random.RandomState or None :param this_random: random state, if None use the module constant :rtype: float :return: the structure score """ # the properties will always have the same HOST_STR so just use # the first property host_str = properties[0].class_kwargs[ClassEvaluator.HOST_STR] this_random = this_random or my_random score = astructure.property.get(STRUCTURE_SCORE_KEY) if score is not None: return score # it is ok to pass on the exception as the input file will be there # and we are simply marking any failed jobs with a structure property if conformational_search: if conformational_search is True: macromodel_options_file = None else: macromodel_options_file = conformational_search try: get_low_energy_conformers( astructure, macromodel_options_file=macromodel_options_file, overwrite=True, seed=seed, this_random=this_random, host_str=host_str) except ValueError as msg: pass obj = StructureEvaluator([astructure], properties) obj.runIt() score = 0 for aproperty in properties: if aproperty.name not in StructureEvaluator.PROPERTIES: continue indiv_score = astructure.property[aproperty.key] if aproperty.minimax == Property.MAX: score += aproperty.weight * indiv_score elif aproperty.minimax == Property.MIN: score += aproperty.weight * -1 * indiv_score else: if aproperty.comparator == Property.EQUALS: try: score_tmp = aproperty.weight / abs(indiv_score - aproperty.target) except ZeroDivisionError: score_tmp = INFINITE_SCORE score += score_tmp elif aproperty.comparator == Property.GREATER_THAN: if indiv_score <= aproperty.target: score += aproperty.weight * (indiv_score - aproperty.target) elif aproperty.comparator == Property.LESS_THAN: if indiv_score >= aproperty.target: score += aproperty.weight * (aproperty.target - indiv_score) astructure.property[STRUCTURE_SCORE_KEY] = score return score
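# Illustrative sketch (not part of this module) of the scoring used in
# get_structure_score above: maximized properties contribute weight * value,
# minimized properties contribute weight * -value, and equality targets
# contribute a hyperbolic weight / |value - target| term that grows as the
# target is approached (capped by INFINITE_SCORE in the real code).  The
# function and cap below are hypothetical.
def _example_equals_term(value, target, weight=1.0, cap=1.0e10):
    """Return the equality-target contribution to a score."""
    try:
        return weight / abs(value - target)
    except ZeroDivisionError:
        return cap

# e.g. a value of 2.9 against a target of 3.0 contributes 10.0 to the score
assert abs(_example_equals_term(2.9, 3.0) - 10.0) < 1.0e-6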
[docs]def structure_evaluator(genome, this_random=None): """ This is the structure evaulator. :type genome: StructureGenome :param genome: a genome :type this_random: numpy.random.RandomState or None :param this_random: random state, if None use the module constant :rtype: float :return: the score for this individual """ this_random = this_random or my_random conf_seed = get_random_csearch_seed(this_random=this_random) freezer_seed = this_random.randint(RANDOM_INT_BOUND + 1) freezer_files = genome.getParam('freezer_files') crossover_applied = genome.astructure.property[CROSSOVER_APPLIED_KEY] mutation_applied = genome.astructure.property[MUTATION_APPLIED_KEY] properties = genome.getParam('properties') structure_score_threshold = genome.getParam('structure_score_threshold') # do the conformational search in the evaluator rather than # at the end of the operators this way it can be done in # parallel over individuals but only if this genome came # from an operation, i.e. if this genome came directly # from the previous generation and skipped the operation, # either because it was elite or because of the rates of # operators, then do not perform the conformational search # for reasons of consistency (for example in the case of an # elite individual it could change its status due to the # stochastic nature of the search), include the conformational # search step in this structure evaluator, as well as the # base evaluator, in case we implement a structure based # property that depends on conformation conformational_search = genome.getParam('conformational_search') # structure titles coming into here look like either (1) # <stoichiometry>.<basename_ext> if the structure passed through # the operators or (2) <stoichiometry>_%d_%d if it did not # (either because of elitism or the rates of operators) if genome.astructure.title.count('_'): conformational_search = False basename = genome.astructure.title.split('_')[0] + STOICH_BASE_EXT_SEP + \ genome.basename_ext genome.astructure.title = basename score = get_structure_score(genome.astructure, properties, conformational_search, seed=conf_seed, this_random=this_random) if score < structure_score_threshold and \ BAD_STRUCTURE in genome.getParam('inoculate'): astructure = get_freezer_structure( freezer_files, structure_score_threshold=structure_score_threshold, properties=properties, conformational_search=genome.getParam('conformational_search'), inoculate=BAD_STRUCTURE, crossover_applied=crossover_applied, mutation_applied=mutation_applied, basename_ext=genome.basename_ext, seed=freezer_seed, this_random=this_random) if astructure: if astructure.property[STRUCTURE_SCORE_KEY] > score: score = astructure.property[STRUCTURE_SCORE_KEY] genome.astructure = astructure return score
[docs]def base_evaluator(genome): """ This is the base evaulator used to wrap all other evaluators. :type genome: StructureGenome :param genome: a genome :rtype: float :return: the score for this individual """ # the properties will always have the same HOST_STR so just use # the first property host_str = genome.getParam('properties')[0].class_kwargs[ ClassEvaluator.HOST_STR] this_random = numpy.random.RandomState() this_random.seed(genome.seed) class_evaluators = genome.getParam('class_evaluators') if class_evaluators.get(StructureEvaluator, None): structure_score = structure_evaluator(genome, this_random=this_random) else: structure_score = None structure_score_threshold = genome.getParam('structure_score_threshold') conf_seed = get_random_csearch_seed(this_random=this_random) # structure titles coming into here look like either (1) # <stoichiometry>.<basename_ext> if the structure passed through # the operators or (2) <stoichiometry>_%d_%d if it did not # (either because of elitism or the rates of operators) if genome.astructure.title.count('_'): operated_on = False basename = genome.astructure.title.split('_')[0] + STOICH_BASE_EXT_SEP + \ genome.basename_ext genome.astructure.title = basename else: operated_on = True basename = genome.astructure.title # do the conformational search in the evaluator rather than # at the end of the operators this way it can be done in # parallel over individuals, if the structure_evaluator was # not first called then get the lowest energy conformer now # but only if this genome came from an operation, i.e. if # this genome came directly from the previous generation # and skipped the operation, either because it was elite # or because of the rates of operators, then do not perform # the conformational search for reasons of consistency (for # example in the case of an elite individual it could change # its status due to the stochastic nature of the search), # it is ok to pass on the exception as the input file will be there # and we are simply marking any failed jobs with a structure property conformational_search = genome.getParam('conformational_search') if operated_on and conformational_search and structure_score is None: if conformational_search is True: macromodel_options_file = None else: macromodel_options_file = conformational_search try: get_low_energy_conformers( genome.astructure, macromodel_options_file=macromodel_options_file, overwrite=True, seed=conf_seed, this_random=this_random, host_str=host_str) except ValueError as msg: pass # make subdirectory to hold this evaluation current_cwd = os.getcwd() generation = genome.getParam('generation') generation_dir = os.path.join(current_cwd, EVALUATOR_JOBS_DIR, GENER_SUBDIR + str(generation)) tmp_dir = os.path.join(generation_dir, basename) os.mkdir(tmp_dir) # put an input Maestro file for this individual in the subdirectory infile = basename + IN_MAE_EXT genome.infile = infile genome.astructure.write(os.path.join(tmp_dir, infile)) # get and call the actual evaluator which returns the # JobDJ script_evaluator = genome.getParam('script_evaluator') script_jobdj = script_evaluator(genome) # handle skipped jobs if not (structure_score is None or structure_score >= structure_score_threshold): SKIP_MSG = ( 'Structure score is %.2f which is less than the %.2f threshold and ' 'is therefore being skipped.') with open(os.path.join(tmp_dir, basename + '-skipped.txt'), 'w') as skip_file: skip_file.write(SKIP_MSG % (structure_score, structure_score_threshold)) genome.skip = Skip( genome, SKIP_MSG % 
(structure_score, structure_score_threshold)) return BAD_SCORE # run the script based properties script_props = [ aproperty for aproperty in genome.getParam('properties') if aproperty.isScriptProperty() ] if script_props: with fileutils.chdir(tmp_dir): script_jobdj.run() outfile = os.path.join(tmp_dir, genome.outfile) if not script_jobdj.isComplete() or script_jobdj.total_failed: FAIL_MSG = ('Subjob has a bad exit status.') elif not os.path.exists(outfile): FAIL_MSG = ('Subjob output summary Maestro file can\'t be found.') else: FAIL_MSG = None if FAIL_MSG: genome.failure = Failure(genome, FAIL_MSG) return BAD_SCORE else: outfile = os.path.join(tmp_dir, genome.infile) opt_out_st = structure.Structure.read(outfile) # run the class based properties class_props = [] for class_evaluator, properties in class_evaluators.items(): if class_evaluator == StructureEvaluator: continue class_props.extend(properties) obj = class_evaluator([opt_out_st], properties) try: with fileutils.chdir(tmp_dir): obj.runIt() except RuntimeError as err: genome.failure = Failure(genome, str(err)) return BAD_SCORE # process successful job by tabulating the total score or return a failure, # only consider properties that are not structure properties as those are # handled in structure_evaluator because they do not require an external # calculation and are therefore much easier to manage properties = script_props + class_props score = 0 if structure_score: score += structure_score for aproperty in properties: key, minimax, target, comparator, weight = aproperty.key, \ aproperty.minimax, aproperty.target, aproperty.comparator, \ aproperty.weight indiv_score = opt_out_st.property.get(key) if indiv_score is None: FAIL_MSG = ('Subjob output summary Maestro file has no key: %s.') genome.failure = Failure(genome, FAIL_MSG % key) return BAD_SCORE else: # use simple linear and hyperbolic functions if minimax == Property.MAX: score += weight * indiv_score elif minimax == Property.MIN: score += weight * -1 * indiv_score else: if comparator == Property.EQUALS: try: score_tmp = weight / abs(indiv_score - target) except ZeroDivisionError: score_tmp = INFINITE_SCORE score += score_tmp elif comparator == Property.GREATER_THAN: if indiv_score <= target: score += weight * (indiv_score - target) elif comparator == Property.LESS_THAN: if indiv_score >= target: score += weight * (target - indiv_score) # this is how the genome is changed during evaluation genome.astructure = opt_out_st return score
[docs]def optoelectronics_evaluator(genome): """ Run an optoelectronics job. :type genome: StructureGenome :param genome: a genome :rtype: JobDJ :return: the JobDJ object for this individual, it is run in the base evaluator """ # the properties will always have the same HOST_STR so just use # the first property host_str = genome.getParam('properties')[0].class_kwargs[ ClassEvaluator.HOST_STR] # handle kwargs and input and output files eval_kwargs = genome.getParam('eval_kwargs') infile = genome.infile basename = infile.split(IN_MAE_EXT)[0] outfile = basename + OUT_MAE_EXT genome.outfile = outfile # build args containing user specified kwargs plus files, for # files shared by all sub-jobs just use a relative path (always # three sub-dirs deep) to the top-level directory args = [] for key, value in eval_kwargs.items(): args.append(key) if key == '-settings': path = EVALUATOR_RELATIVE_DIR + [value] args.append(os.path.join(*path)) else: args.extend(value.split(' ')) args.extend([infile, outfile]) # build the actual JobDJ object for this individual, # the actual evaluation is run on the same node as the driver and are # run from this individuals subdirectory PATH_ARGS = [ '..', '..', 'python', 'scripts', 'optoelectronics_gui_dir', 'optoelectronics_driver.py' ] SCRIPTPATH = hunt('mmshare', dir='exec') SCRIPTPATH = os.path.join(SCRIPTPATH, *PATH_ARGS) cmd = [SCRIPTPATH] + args job_dj = jobutils.create_queue(host=host_str) jc_job = jobutils.RobustSubmissionJob(cmd, name=basename) job_dj.addJob(jc_job) return job_dj
def apply_uniform_operator_weights(operators):
    """
    Set the operator weights uniformly.

    :type operators: list
    :param operators: a list of two-element tuples, each tuple contains first
        an operator function and second a weight

    :rtype: list
    :return: list of two-element tuples of operators and uniform weights
    """

    num_ops = len(operators)
    uniform_weight = 1.0 / float(num_ops)
    functions, weights = list(zip(*operators))
    weights = (uniform_weight,) * num_ops
    operators = list(zip(functions, weights))
    return operators
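# Illustrative usage sketch (not part of this module): whatever weights were
# supplied with the operators are replaced by equal weights that sum to 1.
# The lambdas below are hypothetical stand-ins for crossover/mutation
# operator functions.
_example_ops = [(lambda genome: genome, 0.7), (lambda genome: genome, 0.3)]
_example_uniform = apply_uniform_operator_weights(_example_ops)
assert [weight for _, weight in _example_uniform] == [0.5, 0.5]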
def structure_is_open_shell(astructure, ignore_charge=True):
    """
    Return True if the provided structure is open shell, i.e. has an odd
    number of electrons.

    :type astructure: schrodinger.structure.Structure
    :param astructure: the structure in question

    :type ignore_charge: bool
    :param ignore_charge: if True then ignore any structure.formal_charge
        settings

    :rtype: bool
    :return: True if the provided structure is open shell, False otherwise
    """

    nelectrons = sum([aatom.atomic_number for aatom in astructure.atom])
    if not ignore_charge:
        nelectrons -= astructure.formal_charge

    return bool(nelectrons % 2)
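# Example (illustrative sketch, not part of the original module): a neutral
# methyl radical (CH3, 6 + 3*1 = 9 electrons) is open shell, while benzene
# (C6H6, 6*6 + 6*1 = 42 electrons) is not; with ignore_charge=False the
# formal charge is subtracted from the electron count before the parity
# check.  The Structure objects below are hypothetical:
#
#     >>> structure_is_open_shell(methyl_radical_st)
#     True
#     >>> structure_is_open_shell(benzene_st)
#     False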
def get_element_histogram(astructure):
    """
    Return a dictionary where keys are elements and values are the numbers
    of atoms of a given element.

    :type astructure: schrodinger.structure.Structure
    :param astructure: the structure in question

    :rtype: dict
    :return: dictionary with element histogram, keys are elements (strs)
        and values are numbers (ints)
    """

    pattern = re.compile(r'([A-Z][a-z]?)(\d*)')
    formula = analyze.generate_molecular_formula(astructure)
    matches = re.findall(pattern, formula)
    elements, numbers = list(zip(*matches))
    numbers = list(numbers)
    while '' in numbers:
        numbers[numbers.index('')] = '1'
    numbers = tuple([int(number) for number in numbers])

    return dict(zip(elements, numbers))
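# Example (illustrative sketch, not part of the original module): assuming
# the generated molecular formula string for a benzene Structure is 'C6H6',
# the regular expression above splits it into (element, count) pairs and
# missing counts default to 1:
#
#     >>> get_element_histogram(benzene_st)  # hypothetical Structure
#     {'C': 6, 'H': 6}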
def remove_basename_ext(stoich_ext):
    """
    Remove the basename extension from the given string and return the
    remainder which is the stoichiometry.  Do this instead of having to
    recompute the stoichiometry which can be expensive.

    :type stoich_ext: str
    :param stoich_ext: contains the stoichiometry and basename extension

    :rtype: str
    :return: stoichiometry
    """

    return stoich_ext.split(STOICH_BASE_EXT_SEP)[0]
def get_low_energy_conformers(astructure_in,
                              macromodel_options_file=None,
                              remove_files=False,
                              overwrite=False,
                              seed=None,
                              this_random=None,
                              host_str=None):
    """
    Return the lowest energy conformers from a Macromodel conformational
    search.

    :type astructure_in: schrodinger.structure.Structure
    :param astructure_in: the structure to search for conformations

    :type macromodel_options_file: str or None
    :param macromodel_options_file: the name of a simplified Macromodel input
        file that contains any options to use in addition to those used by
        default in a conformational search or None if there are none and you
        just want to use the defaults

    :type remove_files: bool
    :param remove_files: if the job is successful, specifies whether to remove
        all files created for it after it finishes

    :type overwrite: bool
    :param overwrite: if True then the coordinates of the input structure will
        be overwritten by those of the lowest energy conformer and that
        structure alone returned by this function

    :type seed: int or None
    :param seed: used to seed the random number generator used in the
        Macromodel conformational search, should be in CONF_SEARCH_SEED_RANGE,
        if None then if a CONFSEARCH_SEED has been specified in
        macromodel_options_file it will be used, otherwise a random int in
        CONF_SEARCH_SEED_RANGE will be used

    :type this_random: numpy.random.RandomState or None
    :param this_random: random state, if None use the module constant

    :type host_str: str
    :param host_str: the host string, for example 'localhost:4'

    :rtype: list of schrodinger.structure.Structure, int
    :return: the structures of the lowest energy conformers sorted by
        increasing energy and the seed used in the conformational search
        (same as input if input was given either as seed or in
        macromodel_options_file)
    """

    host_str = ClassEvaluator.getHostStr(host_str=host_str)

    this_random = this_random or my_random

    if macromodel_options_file and not \
            os.path.exists(macromodel_options_file):
        astructure_in.property[CONF_SEARCH_FAILED_KEY] = True
        no_options_file_msg = ('The specified Macromodel options '
                               'file, %s, can not be found.')
        raise ValueError(no_options_file_msg % macromodel_options_file)

    base_name = jobutils.StringCleaner().cleanAndUniquify(astructure_in.title)
    file_basename = base_name + '_confsearch'
    macromodel_in = file_basename + '.inp'
    macromodel_structure_in = file_basename + '_in.mae'
    macromodel_structure_out = file_basename + '_out.mae'

    astructure_in.write(macromodel_structure_in)

    confsearch = ConfSearch(macromodel_options_file)

    # handle seed
    seed_key = 'CONFSEARCH_SEED'
    if seed is None:
        seed = confsearch.get(seed_key, None)
        if seed is None:
            seed = get_random_csearch_seed(this_random=this_random)
    if seed < CONF_SEARCH_SEED_RANGE[0] or CONF_SEARCH_SEED_RANGE[1] < seed:
        bad_seed_msg = ('Macromodel requires a seed in %s.')
        raise ValueError(bad_seed_msg % CONF_SEARCH_SEED_RANGE)
    confsearch[seed_key] = seed

    confsearch['INPUT_STRUCTURE_FILE'] = macromodel_structure_in
    confsearch['OUTPUT_STRUCTURE_FILE'] = macromodel_structure_out
    confsearch.writeSimplified(macromodel_in)

    macromodel_cmd = ['macromodel', '-WAIT', macromodel_in]
    job_dj = jobutils.create_queue(host=host_str)
    jc_job = jobutils.RobustSubmissionJob(macromodel_cmd, name=file_basename)
    job_dj.addJob(jc_job)
    job_dj.run()

    # see MATSCI-1974 for the reasoning behind the third check
    if not job_dj.isComplete() or job_dj.total_failed:
        astructure_in.property[CONF_SEARCH_FAILED_KEY] = True
        job_fail_msg = ('Macromodel conformational search job has failed.')
        raise ValueError(job_fail_msg)
    elif not os.path.exists(macromodel_structure_out):
        astructure_in.property[CONF_SEARCH_FAILED_KEY] = True
        no_structure_out_msg = ('The Macromodel conformational search '
                                'structure output file, %s, can not be found.')
        raise ValueError(no_structure_out_msg % macromodel_structure_out)
    elif not structure.count_structures(macromodel_structure_out):
        astructure_in.property[CONF_SEARCH_FAILED_KEY] = True
        no_structure_msg = (
            'The Macromodel conformational search '
            'structure output file, %s, contains zero structures.')
        raise ValueError(no_structure_msg % macromodel_structure_out)
    else:
        conformers = [conformer for conformer in \
            structure.StructureReader(macromodel_structure_out)]

    if remove_files:
        pattern = '*' + file_basename + '*'
        for afile in glob.glob(pattern):
            fileutils.force_remove(afile)

    if overwrite:
        astructure_in.property[CONF_SEARCH_FAILED_KEY] = False
        # Structure.setXYZ returns None, so overwrite the coordinates in
        # place and return the input structure itself as the sole conformer
        astructure_in.setXYZ(conformers[0].getXYZ())
        conformers = [astructure_in]

    return conformers, seed
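# Example (illustrative sketch, not part of the original module): running a
# conformational search with default Macromodel options, cleaning up the
# scratch files and keeping only the lowest energy conformer by overwriting
# the input coordinates; the Structure and host string are hypothetical:
#
#     >>> conformers, used_seed = get_low_energy_conformers(
#     ...     st, remove_files=True, overwrite=True, host_str='localhost:2')
#     >>> len(conformers)
#     1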
def get_random_structure(structure_libs,
                         tries_from_libs=TRIES_FROM_LIBS,
                         structure_score_threshold=None,
                         properties=None,
                         conformational_search=CONFORMATIONAL_SEARCH,
                         seed=None,
                         this_random=None):
    """
    From the given dictionary of libraries return a random structure.

    :type structure_libs: dict
    :param structure_libs: keys are strings specifying the types of libraries
        to be used and can be module constants from FREEZER_CHOICES.keys(),
        values are lists of libraries by type and can be either module
        constants from FRAGMENT_LIBS.keys(), ALL, or the names of Maestro
        files (including the file extensions)

    :type tries_from_libs: int
    :param tries_from_libs: the number of times to try before giving up

    :type structure_score_threshold: float or None
    :param structure_score_threshold: specifies that a structure with a
        structure score greater-than-or-equal-to this threshold is sought,
        the best of the considered structures will be returned and will
        contain several structure properties related to the scoring

    :type properties: list of Property or None
    :param properties: the properties used in structure scoring

    :type conformational_search: bool or str
    :param conformational_search: specifies whether a Macromodel
        conformational search will be performed prior to evaluation, when a
        string it specifies a simplified Macromodel input file containing
        extra options

    :type seed: int or None
    :param seed: if not None specifies that random should be reseeded with
        the given value

    :type this_random: numpy.random.RandomState or None
    :param this_random: random state, if None use the module constant

    :rtype: schrodinger.structure.Structure or None
    :return: the random structure or None if one couldn't be found
    """

    this_random = this_random or my_random

    if seed is not None:
        this_random.seed(seed)

    structure_libs_copy = structure_libs.copy()

    if FRAGMENT in structure_libs_copy:
        fragments = structure_libs_copy[FRAGMENT]
        if ALL in fragments:
            fragments = set(fragments)
            fragments.update(set(FRAGMENT_LIBS))
            fragments.remove(ALL)
            fragments = list(fragments)
            fragments.sort()
            structure_libs_copy[FRAGMENT] = fragments

    # if we are on the zeroth generation then we have no previous
    # structures and if that was the only freezer requested then
    # we will not be able to find a structure
    if PREVIOUS in structure_libs_copy:
        if not structure_libs_copy[PREVIOUS]:
            del structure_libs_copy[PREVIOUS]
    if not structure_libs_copy:
        return

    num_unproductive = 0
    found = False
    structures_and_scores = []
    while not found:
        if num_unproductive == tries_from_libs:
            if structures_and_scores:
                best_structure, best_score = \
                    sorted(structures_and_scores, key=lambda x: x[1]).pop()
            else:
                best_structure = None
            return best_structure

        # the fragment library has not been checked so do that on-the-fly,
        # we are ignoring genome.getParam('no_open_shell') here because that
        # is not exposed and always True
        structure_lib_type = this_random.choice(list(structure_libs_copy))
        if structure_lib_type == FRAGMENT:
            closed_shell = simple_bonds = True
        else:
            closed_shell = simple_bonds = False
        structure_lib = this_random.choice(
            structure_libs_copy[structure_lib_type])

        # if the library type is fragment then the structure_lib can be
        # a key from FRAGMENT_LIBS.keys() rather than an explicit file name
        structure_file = FRAGMENT_LIBS.get(structure_lib)
        if not structure_file:
            structure_file = structure_lib

        num_structures = structure.count_structures(structure_file)
        index = this_random.randint(1, high=num_structures + 1)
        astructure = structure.Structure.read(structure_file, index=index)

        # see MATSCI-7256 where a fragment library may contain molecules that
        # feature dummy atoms used as attachment site markers
        for atom in astructure.atom:
            if atom.atomic_number == -2 and atom.bond_total == 1:
                atom.atomic_number = 1

        if closed_shell:
            if structure_is_open_shell(astructure, ignore_charge=True):
                num_unproductive += 1
                continue

        if simple_bonds:
            if not get_num_simple_bonds(astructure):
                num_unproductive += 1
                continue

        # set some properties, remove grow names and PDB atom and
        # residue names that may be present in database structures
        # (this is just to prevent the auto-labeling when importing
        # structures into Maestro)
        astructure.property[FROM_FREEZER_KEY] = structure_lib_type
        astructure.property[CROSSOVER_PARENTS_KEY] = ''
        astructure.property[MUTATION_PARENT_KEY] = ''
        astructure.property[CROSSOVER_APPLIED_KEY] = ''
        astructure.property[MUTATION_APPLIED_KEY] = ''
        if structure_lib_type == FRAGMENT:
            for aatom in astructure.atom:
                aatom.property[GROW_NAME_KEY] = ''
                aatom.property[PDB_ATOM_NAME_KEY] = ''
                aatom.property[PDB_RES_NAME_KEY] = ''

        if structure_score_threshold is not None:
            set_title_to_stoichiometry(astructure)
            conf_seed = get_random_csearch_seed(this_random=this_random)
            structure_score = get_structure_score(astructure,
                                                  properties,
                                                  conformational_search,
                                                  seed=conf_seed,
                                                  this_random=this_random)
            structures_and_scores.append((astructure, structure_score))
            if structure_score < structure_score_threshold:
                num_unproductive += 1
                continue

        found = True

    return astructure
def get_freezer_structure(structure_libs,
                          tries_from_libs=TRIES_FROM_LIBS,
                          structure_score_threshold=None,
                          properties=None,
                          conformational_search=CONFORMATIONAL_SEARCH,
                          inoculate=NO_CHILD,
                          crossover_applied=None,
                          mutation_applied=None,
                          basename_ext=None,
                          seed=None,
                          this_random=None):
    """
    Return a random structure from the freezer and update that structure's
    properties.

    :type structure_libs: dict
    :param structure_libs: keys are strings specifying the types of libraries
        to be used and can be module constants from FREEZER_CHOICES.keys(),
        values are lists of libraries by type and can be either module
        constants from FRAGMENT_LIBS.keys(), ALL, or the names of Maestro
        files (including the file extensions)

    :type tries_from_libs: int
    :param tries_from_libs: the number of times to try before giving up

    :type structure_score_threshold: float or None
    :param structure_score_threshold: specifies that a structure with a
        structure score greater-than-or-equal-to this threshold is sought,
        the best of the considered structures will be returned and will
        contain several structure properties related to the scoring

    :type properties: list of Property or None
    :param properties: the properties used in structure scoring

    :type conformational_search: bool or str
    :param conformational_search: specifies whether a Macromodel
        conformational search will be performed prior to evaluation, when a
        string it specifies a simplified Macromodel input file containing
        extra options

    :type inoculate: str
    :param inoculate: specify the reason for drawing from the freezer, which
        is an inoculate option from INOCULATE_CHOICES

    :type crossover_applied: str or None
    :param crossover_applied: specify the intended crossover operator or None
        if there isn't to be one

    :type mutation_applied: str or None
    :param mutation_applied: specify the intended mutation operator or None
        if there isn't to be one

    :type basename_ext: str or None
    :param basename_ext: specify an extension to append to the stoichiometry
        which is used to set the title of the returned structure

    :type seed: int or None
    :param seed: if not None specifies that random should be reseeded with
        the given value

    :type this_random: numpy.random.RandomState or None
    :param this_random: random state, if None use the module constant

    :rtype: schrodinger.structure.Structure or None
    :return: the random structure or None if one couldn't be found
    """

    this_random = this_random or my_random

    astructure = get_random_structure(
        structure_libs,
        tries_from_libs=tries_from_libs,
        structure_score_threshold=structure_score_threshold,
        properties=properties,
        conformational_search=conformational_search,
        seed=seed,
        this_random=this_random)
    if not astructure:
        return

    astructure.property[INOCULATE_KEY] = inoculate
    if crossover_applied:
        astructure.property[CROSSOVER_APPLIED_KEY] = crossover_applied
    if mutation_applied:
        astructure.property[MUTATION_APPLIED_KEY] = mutation_applied

    if basename_ext is None:
        basename_ext = ''
    set_title_to_stoichiometry(astructure, toappend=basename_ext)

    return astructure